package org.apache.kafka.connect.runtime;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.runtime.ConnectMetrics;
import org.apache.kafka.connect.runtime.SubmittedRecords;
import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.Stage;
import org.apache.kafka.connect.runtime.errors.ToleranceType;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
import org.apache.kafka.connect.storage.ConnectorOffsetBackingStore;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.TopicAdmin;
import org.apache.kafka.connect.util.TopicCreation;
import org.apache.kafka.connect.util.TopicCreationGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.class */
public abstract class AbstractWorkerSourceTask extends WorkerTask {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) AbstractWorkerSourceTask.class);
    // Milliseconds; the same value execute() waits after sendRecords() returns false.
    private static final long SEND_FAILED_BACKOFF_MS = 100;
    protected final WorkerConfig workerConfig;
    // Passed to SourceTask.initialize() in initializeAndStart().
    protected final WorkerSourceTaskContext sourceTaskContext;
    // Started in initializeAndStart() and stopped in close().
    protected final ConnectorOffsetBackingStore offsetStore;
    protected final OffsetStorageWriter offsetWriter;
    // May be null: closeProducer() guards against that before closing it.
    protected final Producer<byte[], byte[]> producer;
    // The wrapped connector task that is polled for records.
    private final SourceTask task;
    private final Converter keyConverter;
    private final Converter valueConverter;
    private final HeaderConverter headerConverter;
    private final TransformationChain<SourceRecord> transformationChain;
    // May be null: close() checks before closing it.
    private final TopicAdmin admin;
    private final CloseableOffsetStorageReader offsetReader;
    private final SourceTaskMetricsGroup sourceTaskMetricsGroup;
    // Counted down by stop(); execute() waits on it as an interruptible backoff.
    private final CountDownLatch stopRequestedLatch;
    // When true, the send callback calls recordActiveTopic() for each successfully written record.
    private final boolean topicTrackingEnabled;
    private final TopicCreation topicCreation;
    // Runs the zero-timeout producer close requested by cancel().
    private final Executor closeExecutor;
    // Batch awaiting dispatch; null means "nothing pending", which makes execute() poll the task again.
    List<SourceRecord> toSend;
    // Populated by initialize() and handed to SourceTask.start() in initializeAndStart().
    protected Map<String, String> taskConfig;
    // Set just before SourceTask.initialize() is attempted; close() only stops the task when this is true.
    protected boolean started;
    // Set by closeProducer(); read by the send callback to choose between trace and error logging.
    private volatile boolean producerClosed;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/connect/runtime/AbstractWorkerSourceTask$SourceRecordWriteCounter.class */
    /**
     * Tracks how many records of one polled batch are still outstanding and reports the number
     * of records accounted for to the metrics group exactly once per batch.
     *
     * <p>The counter starts at the batch size and is decremented for every record that is either
     * written or skipped. When it reaches zero, or when {@link #retryRemaining()} is called,
     * {@code batchSize - counter} is passed to {@code SourceTaskMetricsGroup.recordWrite(int)}.
     * Later calls never record a second time.
     */
    public static class SourceRecordWriteCounter {
        private final SourceTaskMetricsGroup metricsGroup;
        private final int batchSize;
        private boolean completed = false;
        private int counter;

        /**
         * @param i                      number of records in the batch; asserted to be positive
         * @param sourceTaskMetricsGroup metrics group that receives the write count; asserted non-null
         */
        public SourceRecordWriteCounter(int i, SourceTaskMetricsGroup sourceTaskMetricsGroup) {
            // Plain assert statements replace the hand-rolled $assertionsDisabled plumbing; for a
            // nested class the compiler derives the flag from the outermost class, as before.
            assert i > 0;
            assert sourceTaskMetricsGroup != null;
            this.batchSize = i;
            this.counter = i;
            this.metricsGroup = sourceTaskMetricsGroup;
        }

        /** Accounts for one record of the batch that will not be written. */
        public void skipRecord() {
            accountForOneRecord();
        }

        /** Accounts for one record of the batch that was written. */
        public void completeRecord() {
            accountForOneRecord();
        }

        /** Reports the records accounted for so far and closes out this counter. */
        public void retryRemaining() {
            finishedAllWrites();
        }

        /** Decrements the outstanding count and reports the batch once it reaches zero. */
        private void accountForOneRecord() {
            if (this.counter > 0 && --this.counter == 0) {
                finishedAllWrites();
            }
        }

        /** Records {@code batchSize - counter} on the metrics group, at most once. */
        private void finishedAllWrites() {
            if (this.completed) {
                return;
            }
            this.metricsGroup.recordWrite(this.batchSize - this.counter);
            this.completed = true;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/connect/runtime/AbstractWorkerSourceTask$SourceTaskMetricsGroup.class */
    /**
     * Owns the sensors of one source task: records polled, records written, the number of
     * records currently in flight, and the value reported for each poll batch.
     */
    public static class SourceTaskMetricsGroup implements AutoCloseable {
        private final ConnectMetrics.MetricGroup metricGroup;
        private final Sensor sourceRecordPoll;
        private final Sensor sourceRecordWrite;
        private final Sensor sourceRecordActiveCount;
        private final Sensor pollTime;
        // Running difference between polled and written records; never allowed below zero.
        private int activeRecordCount;

        /**
         * Looks up the metric group tagged with the connector name and task number, then
         * registers the four sensors and their metrics on it.
         *
         * @param connectorTaskId supplies the connector name and task number used as tag values
         * @param connectMetrics  supplies the metric name registry and the metric group
         */
        public SourceTaskMetricsGroup(ConnectorTaskId connectorTaskId, ConnectMetrics connectMetrics) {
            final ConnectMetricsRegistry names = connectMetrics.registry();
            final ConnectMetrics.MetricGroup group = connectMetrics.group(
                    names.sourceTaskGroupName(),
                    names.connectorTagName(), connectorTaskId.connector(),
                    names.taskTagName(), Integer.toString(connectorTaskId.task()));
            // NOTE(review): the group is closed before any sensor is registered; presumably this
            // clears metrics left behind by an earlier instance with the same tags. Confirm.
            group.close();
            metricGroup = group;

            final Sensor pollSensor = group.sensor("source-record-poll");
            pollSensor.add(group.metricName(names.sourceRecordPollRate), new Rate());
            pollSensor.add(group.metricName(names.sourceRecordPollTotal), new CumulativeSum());
            sourceRecordPoll = pollSensor;

            final Sensor writeSensor = group.sensor("source-record-write");
            writeSensor.add(group.metricName(names.sourceRecordWriteRate), new Rate());
            writeSensor.add(group.metricName(names.sourceRecordWriteTotal), new CumulativeSum());
            sourceRecordWrite = writeSensor;

            final Sensor batchTimeSensor = group.sensor("poll-batch-time");
            batchTimeSensor.add(group.metricName(names.sourceRecordPollBatchTimeMax), new Max());
            batchTimeSensor.add(group.metricName(names.sourceRecordPollBatchTimeAvg), new Avg());
            pollTime = batchTimeSensor;

            final Sensor activeSensor = group.sensor("source-record-active-count");
            activeSensor.add(group.metricName(names.sourceRecordActiveCount), new Value());
            activeSensor.add(group.metricName(names.sourceRecordActiveCountMax), new Max());
            activeSensor.add(group.metricName(names.sourceRecordActiveCountAvg), new Avg());
            sourceRecordActiveCount = activeSensor;
        }

        /** Closes the underlying metric group. */
        @Override // java.lang.AutoCloseable
        public void close() {
            metricGroup.close();
        }

        /**
         * Records one poll: {@code i} on the poll sensor, {@code j} on the batch-time sensor,
         * and the in-flight count increased by {@code i}.
         *
         * @param i number of records returned by the poll
         * @param j value for the "poll-batch-time" sensor
         */
        void recordPoll(int i, long j) {
            sourceRecordPoll.record(i);
            pollTime.record(j);
            activeRecordCount += i;
            sourceRecordActiveCount.record(activeRecordCount);
        }

        /**
         * Records {@code i} written records and lowers the in-flight count by the same amount,
         * clamping it at zero.
         *
         * @param i number of records to report as written
         */
        void recordWrite(int i) {
            sourceRecordWrite.record(i);
            activeRecordCount = Math.max(0, activeRecordCount - i);
            sourceRecordActiveCount.record(activeRecordCount);
        }

        /** @return the metric group the sensors are registered on */
        protected ConnectMetrics.MetricGroup metricGroup() {
            return metricGroup;
        }
    }

    /** Hook invoked first in {@link #initializeAndStart()}, before the offset store is started. */
    protected abstract void prepareToInitializeTask();

    /**
     * Hook invoked by {@link #execute()} once before the send loop starts and again each time
     * the task resumes after a pause.
     */
    protected abstract void prepareToEnterSendLoop();

    /** Hook invoked at the top of every iteration of the loop in {@link #execute()}. */
    protected abstract void beginSendIteration();

    /** Hook invoked by {@link #execute()} immediately before the source task is polled. */
    protected abstract void prepareToPollTask();

    /**
     * Hook invoked by {@link #sendRecords()} when a record produces no producer record or the
     * retry operator reports a failure.
     *
     * @param sourceRecord the record as polled, before transformations were applied
     */
    protected abstract void recordDropped(SourceRecord sourceRecord);

    /**
     * Hook invoked by {@link #sendRecords()} just before a record is handed to the producer.
     * If a value is present, {@code sendRecords()} calls {@code ack()} on it from the producer
     * callback (on success, or on failure when the error tolerance is {@code ALL}) and
     * {@code drop()} on it when the send attempt throws a retriable exception.
     *
     * @param sourceRecord   the record as polled, before transformations were applied
     * @param producerRecord the converted record about to be sent
     */
    protected abstract Optional<SubmittedRecords.SubmittedRecord> prepareToSendRecord(SourceRecord sourceRecord, ProducerRecord<byte[], byte[]> producerRecord);

    /**
     * Hook invoked by {@link #sendRecords()} after the send attempt for a record has returned
     * without a retriable or {@link ConnectException} failure.
     *
     * @param sourceRecord the record as polled, before transformations were applied
     */
    protected abstract void recordDispatched(SourceRecord sourceRecord);

    /** Hook invoked by {@link #execute()} after {@link #sendRecords()} returns {@code true}. */
    protected abstract void batchDispatched();

    /**
     * Hook invoked from the producer callback when a record was written without error.
     *
     * @param sourceRecord   the record as polled, before transformations were applied
     * @param producerRecord the converted record that was sent
     * @param recordMetadata metadata passed to the producer callback
     */
    protected abstract void recordSent(SourceRecord sourceRecord, ProducerRecord<byte[], byte[]> producerRecord, RecordMetadata recordMetadata);

    /**
     * Hook invoked when sending a record failed.
     *
     * @param z              {@code true} when {@code producer.send} itself threw a
     *                       {@link KafkaException}; {@code false} when the failure was reported
     *                       through the producer callback
     * @param producerRecord the converted record that failed
     * @param sourceRecord   the record as polled, before transformations were applied
     * @param exc            the failure
     */
    protected abstract void producerSendFailed(boolean z, ProducerRecord<byte[], byte[]> producerRecord, SourceRecord sourceRecord, Exception exc);

    /**
     * Hook invoked at the end of {@link #execute()}.
     *
     * @param z {@code true} when {@code execute()} is exiting because of a
     *          {@link RuntimeException}; {@code false} otherwise
     */
    protected abstract void finalOffsetCommit(boolean z);

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Stores the collaborators of this task and creates its metrics group, stop latch and
     * topic-creation helper. The identity, listener, target state, class loader, metrics, retry
     * operator, clock and status store are forwarded to the {@code WorkerTask} constructor.
     */
    public AbstractWorkerSourceTask(ConnectorTaskId connectorTaskId, SourceTask sourceTask, TaskStatus.Listener listener, TargetState targetState, Converter converter, Converter converter2, HeaderConverter headerConverter, TransformationChain<SourceRecord> transformationChain, WorkerSourceTaskContext workerSourceTaskContext, Producer<byte[], byte[]> producer, TopicAdmin topicAdmin, Map<String, TopicCreationGroup> map, CloseableOffsetStorageReader closeableOffsetStorageReader, OffsetStorageWriter offsetStorageWriter, ConnectorOffsetBackingStore connectorOffsetBackingStore, WorkerConfig workerConfig, ConnectMetrics connectMetrics, ClassLoader classLoader, Time time, RetryWithToleranceOperator retryWithToleranceOperator, StatusBackingStore statusBackingStore, Executor executor) {
        super(connectorTaskId, listener, targetState, classLoader, connectMetrics, retryWithToleranceOperator, time, statusBackingStore);

        // Lifecycle state.
        this.started = false;
        this.producerClosed = false;
        this.stopRequestedLatch = new CountDownLatch(1);

        // The wrapped task and the context handed to it on initialization.
        this.task = sourceTask;
        this.sourceTaskContext = workerSourceTaskContext;

        // Record conversion pipeline: converter is used for keys, converter2 for values.
        this.keyConverter = converter;
        this.valueConverter = converter2;
        this.headerConverter = headerConverter;
        this.transformationChain = transformationChain;

        // Kafka clients and the executor used to close the producer on cancel().
        this.producer = producer;
        this.admin = topicAdmin;
        this.closeExecutor = executor;

        // Offset handling.
        this.offsetReader = closeableOffsetStorageReader;
        this.offsetWriter = offsetStorageWriter;
        this.offsetStore = connectorOffsetBackingStore;

        // Metrics and worker-level configuration.
        this.sourceTaskMetricsGroup = new SourceTaskMetricsGroup(connectorTaskId, connectMetrics);
        this.workerConfig = workerConfig;
        this.topicTrackingEnabled = workerConfig.getBoolean(WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG);
        this.topicCreation = TopicCreation.newTopicCreation(workerConfig, map);
    }

    /**
     * Stores the task configuration as a string map; {@link #initializeAndStart()} later passes
     * it to the source task. Any {@link Throwable} raised while reading the configuration is
     * logged and handed to {@code onFailure} instead of being propagated.
     *
     * @param taskConfig configuration whose {@code originalsStrings()} are stored
     */
    @Override // org.apache.kafka.connect.runtime.WorkerTask
    public void initialize(TaskConfig taskConfig) {
        try {
            this.taskConfig = taskConfig.originalsStrings();
        } catch (Throwable th) {
            log.error("{} Task failed initialization and will not be started.", this, th);
            onFailure(th);
        }
    }

    /**
     * Runs the subclass initialization hook, starts the offset store, then initializes and
     * starts the wrapped source task with the configuration captured by {@code initialize}.
     */
    @Override // org.apache.kafka.connect.runtime.WorkerTask
    protected void initializeAndStart() {
        prepareToInitializeTask();
        this.offsetStore.start();
        // Set before the task is touched: close() only calls task.stop() when this flag is set,
        // so the task is still stopped if initialize() or start() below throws.
        this.started = true;
        this.task.initialize(this.sourceTaskContext);
        this.task.start(this.taskConfig);
        log.info("{} Source task finished initialization and start", this);
    }

    /**
     * Cancels the task: delegates to the superclass, closes the offset reader on the calling
     * thread, and schedules a zero-timeout producer close on {@code closeExecutor}.
     */
    @Override // org.apache.kafka.connect.runtime.WorkerTask
    public void cancel() {
        super.cancel();
        offsetReader.close();
        // NOTE(review): the producer close is handed to the executor rather than run inline,
        // presumably so the caller of cancel() cannot be blocked by it. Confirm.
        final Runnable abortProducer = () -> closeProducer(Duration.ZERO);
        closeExecutor.execute(abortProducer);
    }

    /**
     * Requests a stop: delegates to the superclass, then releases the stop latch so that a
     * backoff wait in {@link #execute()} returns immediately.
     */
    @Override // org.apache.kafka.connect.runtime.WorkerTask
    public void stop() {
        super.stop();
        this.stopRequestedLatch.countDown();
    }

    /** Closes this task's source metrics group quietly, then lets the superclass remove its own metrics. */
    @Override // org.apache.kafka.connect.runtime.WorkerTask
    public void removeMetrics() {
        Utils.closeQuietly(this.sourceTaskMetricsGroup, "source task metrics tracker");
        super.removeMetrics();
    }

    /**
     * Releases everything this task owns, each step through {@code Utils.closeQuietly}: the
     * source task (only if a start was attempted), the producer, the topic admin (if present),
     * the transformation chain, the retry operator, the offset reader, and finally the offset
     * backing store.
     */
    @Override // org.apache.kafka.connect.runtime.WorkerTask
    protected void close() {
        if (started) {
            Utils.closeQuietly(task::stop, "source task");
        }

        closeProducer(Duration.ofSeconds(30L));

        if (admin != null) {
            Utils.closeQuietly(() -> admin.close(Duration.ofSeconds(30L)), "source task admin");
        }

        Utils.closeQuietly(transformationChain, "transformation chain");
        Utils.closeQuietly(retryWithToleranceOperator, "retry operator");
        Utils.closeQuietly(offsetReader, "offset reader");
        Utils.closeQuietly(offsetStore::stop, "offset backing store");
    }

    /**
     * Marks the producer as closed and closes it quietly with the given timeout. Does nothing
     * when there is no producer.
     *
     * @param duration timeout passed to {@code Producer.close}
     */
    private void closeProducer(Duration duration) {
        if (producer == null) {
            return;
        }
        // Flag first so that send callbacks which fail because of this close log at trace level.
        producerClosed = true;
        Utils.closeQuietly(() -> producer.close(duration), "source task producer");
    }

    /**
     * Main loop of the task. Until a stop is requested it:
     * <ol>
     *   <li>runs {@link #beginSendIteration()};</li>
     *   <li>if the task should pause, notifies {@code onPause}, waits for the unpause, and on
     *       resume calls {@code onResume} and {@link #prepareToEnterSendLoop()};</li>
     *   <li>otherwise polls the source task when no batch is pending and tries to send the
     *       pending batch, backing off for {@code SEND_FAILED_BACKOFF_MS} when
     *       {@link #sendRecords()} reports that the batch must be retried.</li>
     * </ol>
     * A final offset commit is always attempted: with {@code true} when the loop failed with a
     * {@link RuntimeException} (which is then rethrown), with {@code false} otherwise.
     */
    @Override // org.apache.kafka.connect.runtime.WorkerTask
    public void execute() {
        try {
            prepareToEnterSendLoop();
            while (!isStopping()) {
                beginSendIteration();

                if (shouldPause()) {
                    onPause();
                    if (awaitUnpause()) {
                        onResume();
                        prepareToEnterSendLoop();
                    }
                    continue;
                }

                if (this.toSend == null) {
                    prepareToPollTask();
                    log.trace("{} Nothing to send to Kafka. Polling source for additional records", this);
                    long pollStartMs = this.time.milliseconds();
                    this.toSend = poll();
                    if (this.toSend != null) {
                        recordPollReturned(this.toSend.size(), this.time.milliseconds() - pollStartMs);
                    }
                }
                if (this.toSend == null) {
                    // The poll returned nothing; start the next iteration.
                    continue;
                }

                log.trace("{} About to send {} records to Kafka", this, this.toSend.size());
                if (sendRecords()) {
                    batchDispatched();
                } else {
                    // Waiting on the latch instead of sleeping lets stop() cut the backoff short.
                    this.stopRequestedLatch.await(SEND_FAILED_BACKOFF_MS, TimeUnit.MILLISECONDS);
                }
            }
        } catch (InterruptedException ignored) {
            // Deliberately ignored: fall through to the final offset commit and exit.
        } catch (RuntimeException e) {
            try {
                finalOffsetCommit(true);
            } catch (Exception offsetException) {
                log.error("Failed to commit offsets for already-failing task", offsetException);
            }
            throw e;
        }
        finalOffsetCommit(false);
    }

    /**
     * Transforms, converts and dispatches every record of the pending batch in {@code toSend}.
     *
     * <p>Records for which conversion yields {@code null}, or for which the retry operator
     * reports a failure, are dropped and reported through {@link #recordDropped(SourceRecord)}.
     * All other records are handed to the producer; the outcome of each send is handled in the
     * producer callback.
     *
     * <p>If a retriable exception is thrown while a record is being sent, {@code toSend} is
     * trimmed to the sub-list starting at that record, so the next attempt resumes exactly
     * there. Dropped records count as processed as well; otherwise the trimmed list would start
     * too early and records that were already dispatched would be sent a second time.
     *
     * @return {@code true} if the whole batch was processed and {@code toSend} was reset to
     *         {@code null}; {@code false} if the remainder of the batch has to be retried
     * @throws ConnectException if sending a record fails with a non-retriable Connect error
     */
    boolean sendRecords() {
        int processed = 0;
        recordBatch(this.toSend.size());
        final SourceRecordWriteCounter counter = this.toSend.size() > 0
                ? new SourceRecordWriteCounter(this.toSend.size(), this.sourceTaskMetricsGroup)
                : null;
        for (final SourceRecord preTransformRecord : this.toSend) {
            this.retryWithToleranceOperator.sourceRecord(preTransformRecord);
            final SourceRecord record = this.transformationChain.apply(preTransformRecord);
            final ProducerRecord<byte[], byte[]> producerRecord = convertTransformedRecord(record);
            if (producerRecord == null || this.retryWithToleranceOperator.failed()) {
                counter.skipRecord();
                recordDropped(preTransformRecord);
                // A dropped record is finished: advance past it so that a later retry does not
                // start before records that were already dispatched.
                processed++;
                continue;
            }

            log.trace("{} Appending record to the topic {} with key {}, value {}", this, record.topic(), record.key(), record.value());
            final Optional<SubmittedRecords.SubmittedRecord> submittedRecord = prepareToSendRecord(preTransformRecord, producerRecord);
            try {
                final String topic = producerRecord.topic();
                maybeCreateTopic(topic);
                this.producer.send(producerRecord, (recordMetadata, exc) -> {
                    if (exc == null) {
                        counter.completeRecord();
                        log.trace("{} Wrote record successfully: topic {} partition {} offset {}", this, recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
                        recordSent(preTransformRecord, producerRecord, recordMetadata);
                        submittedRecord.ifPresent(SubmittedRecords.SubmittedRecord::ack);
                        if (this.topicTrackingEnabled) {
                            recordActiveTopic(producerRecord.topic());
                        }
                        return;
                    }

                    if (this.producerClosed) {
                        log.trace("{} failed to send record to {}; this is expected as the producer has already been closed", this, topic, exc);
                    } else {
                        log.error("{} failed to send record to {}: ", this, topic, exc);
                    }
                    log.trace("{} Failed record: {}", this, preTransformRecord);
                    producerSendFailed(false, producerRecord, preTransformRecord, exc);
                    // With error tolerance ALL the failed record is counted as skipped and acked.
                    if (this.retryWithToleranceOperator.getErrorToleranceType() == ToleranceType.ALL) {
                        counter.skipRecord();
                        submittedRecord.ifPresent(SubmittedRecords.SubmittedRecord::ack);
                    }
                });
            } catch (RetriableException | org.apache.kafka.connect.errors.RetriableException e) {
                log.warn("{} Failed to send record to topic '{}' and partition '{}'. Backing off before retrying: ", this, producerRecord.topic(), producerRecord.partition(), e);
                // Keep the failing record and everything after it for the next attempt.
                this.toSend = this.toSend.subList(processed, this.toSend.size());
                submittedRecord.ifPresent(SubmittedRecords.SubmittedRecord::drop);
                counter.retryRemaining();
                return false;
            } catch (ConnectException e) {
                log.warn("{} Failed to send record to topic '{}' and partition '{}' due to an unrecoverable exception: ", this, producerRecord.topic(), producerRecord.partition(), e);
                log.trace("{} Failed to send {} with unrecoverable exception: ", this, producerRecord, e);
                throw e;
            } catch (KafkaException e) {
                // Synchronous, non-retriable producer failure: the subclass decides what to do.
                producerSendFailed(true, producerRecord, preTransformRecord, e);
            }
            processed++;
            recordDispatched(preTransformRecord);
        }
        this.toSend = null;
        return true;
    }

    /**
     * Polls the wrapped source task once.
     *
     * @return whatever {@code SourceTask.poll()} returned, or {@code null} when the poll failed
     *         with a retriable exception (Kafka or Connect), which is logged as a warning
     * @throws InterruptedException propagated from the underlying poll
     */
    protected List<SourceRecord> poll() throws InterruptedException {
        try {
            return this.task.poll();
        } catch (RetriableException | org.apache.kafka.connect.errors.RetriableException e) {
            log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e);
            // A null result makes execute() poll again on its next iteration.
            return null;
        }
    }

    /**
     * Converts a transformed source record into the producer record to be written. Headers,
     * key and value are each converted through the retry operator under their own stage.
     *
     * @param sourceRecord the record after transformations; may be {@code null}
     * @return the producer record, or {@code null} when {@code sourceRecord} is {@code null} or
     *         the retry operator reports a failure after the conversions
     */
    protected ProducerRecord<byte[], byte[]> convertTransformedRecord(SourceRecord sourceRecord) {
        if (sourceRecord == null) {
            return null;
        }

        final RecordHeaders kafkaHeaders = (RecordHeaders) retryWithToleranceOperator.execute(
                () -> convertHeaderFor(sourceRecord),
                Stage.HEADER_CONVERTER,
                headerConverter.getClass());

        final byte[] serializedKey = (byte[]) retryWithToleranceOperator.execute(
                () -> keyConverter.fromConnectData(sourceRecord.topic(), kafkaHeaders, sourceRecord.keySchema(), sourceRecord.key()),
                Stage.KEY_CONVERTER,
                keyConverter.getClass());

        final byte[] serializedValue = (byte[]) retryWithToleranceOperator.execute(
                () -> valueConverter.fromConnectData(sourceRecord.topic(), kafkaHeaders, sourceRecord.valueSchema(), sourceRecord.value()),
                Stage.VALUE_CONVERTER,
                valueConverter.getClass());

        // Checked once, after all three conversions have been attempted.
        if (retryWithToleranceOperator.failed()) {
            return null;
        }

        return new ProducerRecord<>(
                sourceRecord.topic(),
                sourceRecord.kafkaPartition(),
                ConnectUtils.checkAndConvertTimestamp(sourceRecord.timestamp()),
                serializedKey,
                serializedValue,
                kafkaHeaders);
    }

    /**
     * Makes sure the given topic exists when the topic-creation helper says creation is
     * required. An existing topic is only registered with the helper; a missing one is created
     * from the first matching topic creation group.
     *
     * @param str name of the topic the next record will be sent to
     * @throws ConnectException if the topic neither exists nor could be created
     */
    private void maybeCreateTopic(String str) {
        final boolean creationRequired = topicCreation.isTopicCreationRequired(str);
        if (!creationRequired) {
            log.trace("Topic creation by the connector is disabled or the topic {} was previously created.If auto.create.topics.enable is enabled on the broker, the topic will be created with default settings", str);
            return;
        }

        log.info("The task will send records to topic '{}' for the first time. Checking whether topic exists", str);
        final boolean alreadyExists = !admin.describeTopics(str).isEmpty();
        if (alreadyExists) {
            log.info("Topic '{}' already exists.", str);
            topicCreation.addTopic(str);
            return;
        }

        log.info("Creating topic '{}'", str);
        final TopicCreationGroup matchedGroup = topicCreation.findFirstGroup(str);
        log.debug("Topic '{}' matched topic creation group: {}", str, matchedGroup);
        final NewTopic topicSpec = matchedGroup.newTopic(str);
        final TopicAdmin.TopicCreationResponse outcome = admin.createOrFindTopics(topicSpec);

        if (outcome.isCreated(topicSpec.name())) {
            topicCreation.addTopic(str);
            log.info("Created topic '{}' using creation group {}", topicSpec, matchedGroup);
        } else if (outcome.isExisting(topicSpec.name())) {
            topicCreation.addTopic(str);
            log.info("Found existing topic '{}'", topicSpec);
        } else {
            // Neither created nor found: fail the task.
            log.warn("Request to create new topic '{}' failed", str);
            throw new ConnectException("Task failed to create new topic " + topicSpec + ". Ensure that the task is authorized to create topics or that the topic exists and restart the task");
        }
    }

    /**
     * Converts the Connect headers of a record into Kafka record headers using the header
     * converter.
     *
     * @param sourceRecord record whose headers are converted
     * @return a new header collection; empty when the record has no headers object
     */
    protected RecordHeaders convertHeaderFor(SourceRecord sourceRecord) {
        final Headers connectHeaders = sourceRecord.headers();
        final RecordHeaders converted = new RecordHeaders();
        if (connectHeaders == null) {
            return converted;
        }

        final String topic = sourceRecord.topic();
        for (Header connectHeader : connectHeaders) {
            final String headerKey = connectHeader.key();
            converted.add(headerKey, headerConverter.fromConnectHeader(topic, headerKey, connectHeader.schema(), connectHeader.value()));
        }
        return converted;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Forwards a record and its metadata to {@code SourceTask.commitRecord}. Anything the task
     * throws is logged and swallowed, so a failure here never propagates to the caller.
     *
     * @param sourceRecord   the record passed to the task
     * @param recordMetadata the metadata passed to the task
     */
    public void commitTaskRecord(SourceRecord sourceRecord, RecordMetadata recordMetadata) {
        try {
            this.task.commitRecord(sourceRecord, recordMetadata);
        } catch (Throwable th) {
            log.error("{} Exception thrown while calling task.commitRecord()", this, th);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Calls {@code SourceTask.commit()}. Anything the task throws is logged and swallowed, so a
     * failure here never propagates to the caller.
     */
    public void commitSourceTask() {
        try {
            this.task.commit();
        } catch (Throwable th) {
            log.error("{} Exception thrown while calling task.commit()", this, th);
        }
    }

    /**
     * Reports a completed poll to the source task metrics group.
     *
     * @param i number of records the poll returned
     * @param j value recorded for the poll; execute() passes the elapsed {@code time.milliseconds()} difference
     */
    protected void recordPollReturned(int i, long j) {
        this.sourceTaskMetricsGroup.recordPoll(i, j);
    }

    /** @return the metrics group created for this task in the constructor */
    SourceTaskMetricsGroup sourceTaskMetricsGroup() {
        return this.sourceTaskMetricsGroup;
    }

    /** Bridge method (marked synthetic in this source) that forwards unchanged to the superclass. */
    @Override // org.apache.kafka.connect.runtime.WorkerTask
    public /* bridge */ /* synthetic */ void transitionTo(TargetState targetState) {
        super.transitionTo(targetState);
    }

    /** Bridge method (marked synthetic in this source) that forwards unchanged to the superclass. */
    @Override // org.apache.kafka.connect.runtime.WorkerTask
    public /* bridge */ /* synthetic */ boolean shouldPause() {
        return super.shouldPause();
    }

    /** Bridge method (marked synthetic in this source) that forwards unchanged to the superclass. */
    @Override // org.apache.kafka.connect.runtime.WorkerTask, java.lang.Runnable
    public /* bridge */ /* synthetic */ void run() {
        super.run();
    }

    /** Bridge method (marked synthetic in this source) that forwards unchanged to the superclass. */
    @Override // org.apache.kafka.connect.runtime.WorkerTask
    public /* bridge */ /* synthetic */ boolean awaitStop(long j) {
        return super.awaitStop(j);
    }

    /** Bridge method (marked synthetic in this source) that forwards unchanged to the superclass. */
    @Override // org.apache.kafka.connect.runtime.WorkerTask
    public /* bridge */ /* synthetic */ ClassLoader loader() {
        return super.loader();
    }

    /** Bridge method (marked synthetic in this source) that forwards unchanged to the superclass. */
    @Override // org.apache.kafka.connect.runtime.WorkerTask
    public /* bridge */ /* synthetic */ ConnectorTaskId id() {
        return super.id();
    }
}
