package org.apache.kafka.connect.runtime;

import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.TransactionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/connect/runtime/WorkerTransactionContext.class */
public class WorkerTransactionContext implements TransactionContext {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) WorkerTransactionContext.class);
    private final Set<SourceRecord> commitableRecords = new HashSet();
    private final Set<SourceRecord> abortableRecords = new HashSet();
    private boolean batchCommitRequested = false;
    private boolean batchAbortRequested = false;

    @Override // org.apache.kafka.connect.source.TransactionContext
    public synchronized void commitTransaction() {
        this.batchCommitRequested = true;
    }

    @Override // org.apache.kafka.connect.source.TransactionContext
    public synchronized void commitTransaction(SourceRecord sourceRecord) {
        Objects.requireNonNull(sourceRecord, "Source record used to define transaction boundaries may not be null");
        this.commitableRecords.add(sourceRecord);
    }

    @Override // org.apache.kafka.connect.source.TransactionContext
    public synchronized void abortTransaction() {
        this.batchAbortRequested = true;
    }

    @Override // org.apache.kafka.connect.source.TransactionContext
    public synchronized void abortTransaction(SourceRecord sourceRecord) {
        Objects.requireNonNull(sourceRecord, "Source record used to define transaction boundaries may not be null");
        this.abortableRecords.add(sourceRecord);
    }

    public synchronized boolean shouldCommitBatch() {
        checkBatchRequestsConsistency();
        boolean z = this.batchCommitRequested;
        this.batchCommitRequested = false;
        return z;
    }

    public synchronized boolean shouldAbortBatch() {
        checkBatchRequestsConsistency();
        boolean z = this.batchAbortRequested;
        this.batchAbortRequested = false;
        return z;
    }

    public synchronized boolean shouldCommitOn(SourceRecord sourceRecord) {
        checkRecordRequestConsistency(sourceRecord);
        return this.commitableRecords.remove(sourceRecord);
    }

    public synchronized boolean shouldAbortOn(SourceRecord sourceRecord) {
        checkRecordRequestConsistency(sourceRecord);
        return this.abortableRecords.remove(sourceRecord);
    }

    private void checkBatchRequestsConsistency() {
        if (this.batchCommitRequested && this.batchAbortRequested) {
            throw new IllegalStateException("Connector requested both commit and abort of same transaction");
        }
    }

    private void checkRecordRequestConsistency(SourceRecord sourceRecord) {
        if (this.commitableRecords.contains(sourceRecord) && this.abortableRecords.contains(sourceRecord)) {
            log.trace("Connector will fail as it has requested both commit and abort of transaction for same record: {}", sourceRecord);
            throw new IllegalStateException(String.format("Connector requested both commit and abort of same record against topic/partition %s/%s", sourceRecord.topic(), sourceRecord.kafkaPartition()));
        }
    }
}
