package org.apache.kafka.connect.runtime;

import com.google.common.annotations.VisibleForTesting;
import io.confluent.logevents.connect.LogEventState;
import io.confluent.logevents.connect.LogEventsConfig;
import io.confluent.logevents.connect.LogEventsEmitter;
import io.confluent.logevents.connect.LogEventsKafkaEmitter;
import io.confluent.telemetry.events.exporter.kafka.KafkaExporterConfig;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.FenceProducersOptions;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.Confluent;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.MetricNameTemplate;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigRequest;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.health.ConnectorType;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.apache.kafka.connect.runtime.AbstractStatus;
import org.apache.kafka.connect.runtime.ConnectMetrics;
import org.apache.kafka.connect.runtime.ConnectorStatus;
import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter;
import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
import org.apache.kafka.connect.runtime.errors.ErrorReporter;
import org.apache.kafka.connect.runtime.errors.LogReporter;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.TraceReporter;
import org.apache.kafka.connect.runtime.errors.TraceWorkerErrantRecordReporter;
import org.apache.kafka.connect.runtime.errors.WorkerErrantRecordReporter;
import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
import org.apache.kafka.connect.runtime.rest.resources.ConnectResource;
import org.apache.kafka.connect.runtime.tracing.ConnectTracer;
import org.apache.kafka.connect.runtime.tracing.Tracer;
import org.apache.kafka.connect.runtime.tracing.TracerConfig;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.storage.ConnectorOffsetBackingStore;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.KafkaOffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.LoggingContext;
import org.apache.kafka.connect.util.SinkUtils;
import org.apache.kafka.connect.util.TopicAdmin;
import org.apache.kafka.connect.util.TopicCreationGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/connect/runtime/Worker.class */
public class Worker {
    public static final long CONNECTOR_GRACEFUL_SHUTDOWN_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(5);
    public static final long EXECUTOR_SHUTDOWN_TERMINATION_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(1);
    private static final Logger log = LoggerFactory.getLogger((Class<?>) Worker.class);
    public static final String FAIL_NON_TOLERATED_EXCEPTION = "fail.non.tolerated.exception";
    protected Herder herder;
    private final ExecutorService executor;
    private final Time time;
    private final String workerId;
    private final String kafkaClusterId;
    private final Plugins plugins;
    private final ConnectMetrics metrics;
    private final WorkerMetricsGroup workerMetricsGroup;
    private ConnectorStatusMetricsGroup connectorStatusMetricsGroup;
    private final WorkerConfig config;
    private final Converter internalKeyConverter;
    private final Converter internalValueConverter;
    private final OffsetBackingStore globalOffsetBackingStore;
    private final ConcurrentMap<String, WorkerConnector> connectors;
    private final ConcurrentMap<ConnectorTaskId, WorkerTask> tasks;
    private Optional<SourceTaskOffsetCommitter> sourceTaskOffsetCommitter;
    private final WorkerConfigTransformer workerConfigTransformer;
    private final ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy;
    private final WorkerConfigDecorator workerConfigDecorator;
    private final LogEventsKafkaEmitter logEventsKafkaEmitter;
    private ScheduledExecutorService logEventStateCleanupService;
    private LogEventsConfig logEventsConfig;
    private final ConcurrentMap<String, LogEventState> connectorLogEventStateMap;
    private final ConcurrentMap<String, LogEventState> taskLogEventStateMap;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/connect/runtime/Worker$ConnectorStatusMetricsGroup.class */
    public static class ConnectorStatusMetricsGroup {
        private final ConnectMetrics connectMetrics;
        private final ConnectMetricsRegistry registry;
        private final ConcurrentMap<String, ConnectMetrics.MetricGroup> connectorStatusMetrics = new ConcurrentHashMap();
        private final Herder herder;
        private final ConcurrentMap<ConnectorTaskId, WorkerTask> tasks;

        protected ConnectorStatusMetricsGroup(ConnectMetrics connectMetrics, ConcurrentMap<ConnectorTaskId, WorkerTask> concurrentMap, Herder herder) {
            this.connectMetrics = connectMetrics;
            this.registry = connectMetrics.registry();
            this.tasks = concurrentMap;
            this.herder = herder;
        }

        protected ConnectMetrics.LiteralSupplier<Long> taskCounter(String str) {
            return j -> {
                return Long.valueOf(this.tasks.keySet().stream().filter(connectorTaskId -> {
                    return connectorTaskId.connector().equals(str);
                }).count());
            };
        }

        protected ConnectMetrics.LiteralSupplier<Long> taskStatusCounter(String str, AbstractStatus.State state) {
            return j -> {
                return Long.valueOf(this.tasks.values().stream().filter(workerTask -> {
                    return workerTask.id().connector().equals(str) && this.herder.taskStatus(workerTask.id()).state().equalsIgnoreCase(state.toString());
                }).count());
            };
        }

        protected synchronized void recordTaskAdded(ConnectorTaskId connectorTaskId) {
            if (this.connectorStatusMetrics.containsKey(connectorTaskId.connector())) {
                return;
            }
            String connector = connectorTaskId.connector();
            ConnectMetrics.MetricGroup group = this.connectMetrics.group(this.registry.workerGroupName(), this.registry.connectorTagName(), connector);
            group.addValueMetric(this.registry.connectorTotalTaskCount, taskCounter(connector));
            for (Map.Entry<MetricNameTemplate, AbstractStatus.State> entry : this.registry.connectorStatusMetrics.entrySet()) {
                group.addValueMetric(entry.getKey(), taskStatusCounter(connector, entry.getValue()));
            }
            this.connectorStatusMetrics.put(connectorTaskId.connector(), group);
        }

        protected synchronized void recordTaskRemoved(ConnectorTaskId connectorTaskId) {
            if (this.tasks.keySet().stream().noneMatch(connectorTaskId2 -> {
                return connectorTaskId2.connector().equals(connectorTaskId.connector());
            })) {
                this.connectorStatusMetrics.get(connectorTaskId.connector()).close();
                this.connectorStatusMetrics.remove(connectorTaskId.connector());
            }
        }

        protected synchronized void close() {
            Iterator<ConnectMetrics.MetricGroup> it = this.connectorStatusMetrics.values().iterator();
            while (it.hasNext()) {
                it.next().close();
            }
        }

        protected ConnectMetrics.MetricGroup metricGroup(String str) {
            return this.connectorStatusMetrics.get(str);
        }
    }

    /* loaded from: input_file:org/apache/kafka/connect/runtime/Worker$ExactlyOnceSourceTaskBuilder.class */
    class ExactlyOnceSourceTaskBuilder extends TaskBuilder {
        private final Runnable preProducerCheck;
        private final Runnable postProducerCheck;

        public ExactlyOnceSourceTaskBuilder(ConnectorTaskId connectorTaskId, ClusterConfigState clusterConfigState, TaskStatus.Listener listener, TargetState targetState, Runnable runnable, Runnable runnable2) {
            super(connectorTaskId, clusterConfigState, listener, targetState);
            this.preProducerCheck = runnable;
            this.postProducerCheck = runnable2;
        }

        @Override // org.apache.kafka.connect.runtime.Worker.TaskBuilder
        public WorkerTask doBuild(Task task, ConnectorTaskId connectorTaskId, ClusterConfigState clusterConfigState, TaskStatus.Listener listener, TargetState targetState, ConnectorConfig connectorConfig, Converter converter, Converter converter2, HeaderConverter headerConverter, ClassLoader classLoader, ErrorHandlingMetrics errorHandlingMetrics, Class<? extends Connector> cls, RetryWithToleranceOperator retryWithToleranceOperator) {
            TransformationChain transformationChain;
            SourceConnectorConfig sourceConnectorConfig = new SourceConnectorConfig(Worker.this.plugins, connectorConfig.originalsStrings(), Worker.this.config.topicCreationEnable());
            TopicAdmin topicAdmin = new TopicAdmin(Worker.adminConfigs(connectorTaskId.connector(), "connector-adminclient-" + connectorTaskId, Worker.this.config, sourceConnectorConfig, cls, Worker.this.connectorClientConfigOverridePolicy, Worker.this.kafkaClusterId, ConnectorType.SOURCE));
            this.closeableResources.accept(topicAdmin, "topic admin for task" + connectorTaskId);
            Map<String, TopicCreationGroup> map = null;
            if (Worker.this.config.topicCreationEnable() && sourceConnectorConfig.usesTopicCreation()) {
                map = TopicCreationGroup.configuredGroups(sourceConnectorConfig);
            }
            Optional connectorTracer = Worker.this.connectorTracer(connectorConfig, connectorTaskId, null, topicAdmin);
            connectorTracer.ifPresent(tracer -> {
                this.closeableResources.accept(tracer, "tracer for task " + connectorTaskId);
            });
            List<ErrorReporter> sourceTaskReporters = Worker.this.sourceTaskReporters(connectorTaskId, sourceConnectorConfig, errorHandlingMetrics);
            sourceTaskReporters.forEach(errorReporter -> {
                this.closeableResources.accept(errorReporter, "reporter " + errorReporter + " for task " + connectorTaskId);
            });
            if (connectorTracer.isPresent()) {
                transformationChain = new TracingTransformationChain((Tracer) connectorTracer.get(), retryWithToleranceOperator);
                this.closeableResources.accept(transformationChain, "transformations for task " + connectorTaskId);
                TraceReporter traceReporter = new TraceReporter((Tracer) connectorTracer.get());
                this.closeableResources.accept(traceReporter, "trace reporter for task " + connectorTaskId);
                sourceTaskReporters.add(traceReporter);
            } else {
                transformationChain = new TransformationChain(sourceConnectorConfig.transformationStages(), retryWithToleranceOperator);
                this.closeableResources.accept(transformationChain, "transformations for task " + connectorTaskId);
            }
            Worker.log.info("Initializing: {}", transformationChain);
            retryWithToleranceOperator.reporters(sourceTaskReporters);
            Map<String, Object> exactlyOnceSourceTaskProducerConfigs = Worker.exactlyOnceSourceTaskProducerConfigs(connectorTaskId, Worker.this.config, sourceConnectorConfig, cls, Worker.this.connectorClientConfigOverridePolicy, Worker.this.kafkaClusterId);
            KafkaProducer kafkaProducer = new KafkaProducer(exactlyOnceSourceTaskProducerConfigs);
            this.closeableResources.accept(kafkaProducer, "producer for task " + connectorTaskId);
            ConnectorOffsetBackingStore offsetStoreForExactlyOnceSourceTask = Worker.this.offsetStoreForExactlyOnceSourceTask(connectorTaskId, sourceConnectorConfig, cls, kafkaProducer, exactlyOnceSourceTaskProducerConfigs, topicAdmin);
            offsetStoreForExactlyOnceSourceTask.configure(Worker.this.config);
            ExactlyOnceWorkerSourceTask exactlyOnceWorkerSourceTask = new ExactlyOnceWorkerSourceTask(connectorTaskId, (SourceTask) task, listener, targetState, converter, converter2, headerConverter, transformationChain, kafkaProducer, topicAdmin, map, new OffsetStorageReaderImpl(offsetStoreForExactlyOnceSourceTask, connectorTaskId.connector(), Worker.this.internalKeyConverter, Worker.this.internalValueConverter), new OffsetStorageWriter(offsetStoreForExactlyOnceSourceTask, connectorTaskId.connector(), Worker.this.internalKeyConverter, Worker.this.internalValueConverter), offsetStoreForExactlyOnceSourceTask, Worker.this.config, clusterConfigState, Worker.this.metrics, errorHandlingMetrics, classLoader, Worker.this.time, retryWithToleranceOperator, Worker.this.herder.statusBackingStore(), sourceConnectorConfig, Worker.this.executor, this.preProducerCheck, this.postProducerCheck);
            connectorTracer.ifPresent(tracer2 -> {
                Worker.log.info("Using tracing id {} for connector '{}' and task {}", tracer2.tracingContext().traceID(), connectorTaskId.connector(), Integer.valueOf(connectorTaskId.task()));
                exactlyOnceWorkerSourceTask.useTracer(tracer2);
            });
            return exactlyOnceWorkerSourceTask;
        }
    }

    /* loaded from: input_file:org/apache/kafka/connect/runtime/Worker$SinkTaskBuilder.class */
    class SinkTaskBuilder extends TaskBuilder {
        public SinkTaskBuilder(ConnectorTaskId connectorTaskId, ClusterConfigState clusterConfigState, TaskStatus.Listener listener, TargetState targetState) {
            super(connectorTaskId, clusterConfigState, listener, targetState);
        }

        @Override // org.apache.kafka.connect.runtime.Worker.TaskBuilder
        public WorkerTask doBuild(Task task, ConnectorTaskId connectorTaskId, ClusterConfigState clusterConfigState, TaskStatus.Listener listener, TargetState targetState, ConnectorConfig connectorConfig, Converter converter, Converter converter2, HeaderConverter headerConverter, ClassLoader classLoader, ErrorHandlingMetrics errorHandlingMetrics, Class<? extends Connector> cls, RetryWithToleranceOperator retryWithToleranceOperator) {
            TransformationChain transformationChain;
            Worker.log.info("Initializing: {}", new TransformationChain(connectorConfig.transformationStages(), retryWithToleranceOperator));
            SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(Worker.this.plugins, connectorConfig.originalsStrings());
            List<ErrorReporter> sinkTaskReporters = Worker.this.sinkTaskReporters(connectorTaskId, sinkConnectorConfig, errorHandlingMetrics, cls);
            sinkTaskReporters.forEach(errorReporter -> {
                this.closeableResources.accept(errorReporter, "reporter " + errorReporter + " for task " + connectorTaskId);
            });
            Optional connectorTracer = Worker.this.connectorTracer(connectorConfig, connectorTaskId, null, null);
            connectorTracer.ifPresent(tracer -> {
                this.closeableResources.accept(tracer, "tracer for task " + connectorTaskId);
            });
            if (connectorTracer.isPresent()) {
                transformationChain = new TracingTransformationChain((Tracer) connectorTracer.get(), retryWithToleranceOperator);
                this.closeableResources.accept(transformationChain, "transformations for task " + connectorTaskId);
                TraceReporter traceReporter = new TraceReporter((Tracer) connectorTracer.get());
                this.closeableResources.accept(traceReporter, "trace reporter for task " + connectorTaskId);
                sinkTaskReporters.add(traceReporter);
            } else {
                transformationChain = new TransformationChain(connectorConfig.transformationStages(), retryWithToleranceOperator);
                this.closeableResources.accept(transformationChain, "transformations for task " + connectorTaskId);
            }
            Worker.log.info("Initializing: {}", transformationChain);
            retryWithToleranceOperator.reporters(sinkTaskReporters);
            WorkerErrantRecordReporter createWorkerErrantRecordReporter = Worker.this.createWorkerErrantRecordReporter(sinkConnectorConfig, retryWithToleranceOperator, converter, converter2, headerConverter, connectorTracer);
            KafkaConsumer kafkaConsumer = new KafkaConsumer(Worker.baseConsumerConfigs(connectorTaskId, "connector-consumer-" + connectorTaskId, Worker.this.config, connectorConfig, cls, Worker.this.connectorClientConfigOverridePolicy, Worker.this.kafkaClusterId, ConnectorType.SINK));
            this.closeableResources.accept(kafkaConsumer, "consumer for task " + connectorTaskId);
            WorkerSinkTask workerSinkTask = new WorkerSinkTask(connectorTaskId, (SinkTask) task, listener, targetState, Worker.this.config, clusterConfigState, Worker.this.metrics, converter, converter2, errorHandlingMetrics, headerConverter, transformationChain, kafkaConsumer, classLoader, Worker.this.time, retryWithToleranceOperator, createWorkerErrantRecordReporter, Worker.this.herder.statusBackingStore());
            connectorTracer.ifPresent(tracer2 -> {
                Worker.log.info("Using tracing id {} for connector '{}' and task {}", tracer2.tracingContext().traceID(), connectorTaskId.connector(), Integer.valueOf(connectorTaskId.task()));
                workerSinkTask.useTracer(tracer2);
            });
            return workerSinkTask;
        }
    }

    /* loaded from: input_file:org/apache/kafka/connect/runtime/Worker$SourceTaskBuilder.class */
    class SourceTaskBuilder extends TaskBuilder {
        public SourceTaskBuilder(ConnectorTaskId connectorTaskId, ClusterConfigState clusterConfigState, TaskStatus.Listener listener, TargetState targetState) {
            super(connectorTaskId, clusterConfigState, listener, targetState);
        }

        @Override // org.apache.kafka.connect.runtime.Worker.TaskBuilder
        public WorkerTask doBuild(Task task, ConnectorTaskId connectorTaskId, ClusterConfigState clusterConfigState, TaskStatus.Listener listener, TargetState targetState, ConnectorConfig connectorConfig, Converter converter, Converter converter2, HeaderConverter headerConverter, ClassLoader classLoader, ErrorHandlingMetrics errorHandlingMetrics, Class<? extends Connector> cls, RetryWithToleranceOperator retryWithToleranceOperator) {
            TransformationChain transformationChain;
            SourceConnectorConfig sourceConnectorConfig = new SourceConnectorConfig(Worker.this.plugins, connectorConfig.originalsStrings(), Worker.this.config.topicCreationEnable());
            Map<String, Object> baseProducerConfigs = Worker.baseProducerConfigs(connectorTaskId, "connector-producer-" + connectorTaskId, Worker.this.config, sourceConnectorConfig, cls, Worker.this.connectorClientConfigOverridePolicy, Worker.this.kafkaClusterId);
            KafkaProducer kafkaProducer = new KafkaProducer(baseProducerConfigs);
            this.closeableResources.accept(kafkaProducer, "producer for task " + connectorTaskId);
            TopicAdmin topicAdmin = null;
            boolean sourceConnectorTopicCreationEnabled = Worker.this.sourceConnectorTopicCreationEnabled(sourceConnectorConfig);
            if (sourceConnectorTopicCreationEnabled || Worker.this.regularSourceTaskUsesConnectorSpecificOffsetsStore(sourceConnectorConfig)) {
                topicAdmin = new TopicAdmin(Worker.adminConfigs(connectorTaskId.connector(), "connector-adminclient-" + connectorTaskId, Worker.this.config, sourceConnectorConfig, cls, Worker.this.connectorClientConfigOverridePolicy, Worker.this.kafkaClusterId, ConnectorType.SOURCE));
                this.closeableResources.accept(topicAdmin, "topic admin for task" + connectorTaskId);
            }
            Map<String, TopicCreationGroup> configuredGroups = sourceConnectorTopicCreationEnabled ? TopicCreationGroup.configuredGroups(sourceConnectorConfig) : null;
            Optional connectorTracer = Worker.this.connectorTracer(connectorConfig, connectorTaskId, null, topicAdmin);
            connectorTracer.ifPresent(tracer -> {
                this.closeableResources.accept(tracer, "tracer for task " + connectorTaskId);
            });
            List<ErrorReporter> sourceTaskReporters = Worker.this.sourceTaskReporters(connectorTaskId, sourceConnectorConfig, errorHandlingMetrics);
            sourceTaskReporters.forEach(errorReporter -> {
                this.closeableResources.accept(errorReporter, "reporter " + errorReporter + " for task " + connectorTaskId);
            });
            if (connectorTracer.isPresent()) {
                transformationChain = new TracingTransformationChain((Tracer) connectorTracer.get(), retryWithToleranceOperator);
                this.closeableResources.accept(transformationChain, "transformations for task " + connectorTaskId);
                TraceReporter traceReporter = new TraceReporter((Tracer) connectorTracer.get());
                this.closeableResources.accept(traceReporter, "trace reporter for task " + connectorTaskId);
                sourceTaskReporters.add(traceReporter);
            } else {
                transformationChain = new TransformationChain(sourceConnectorConfig.transformationStages(), retryWithToleranceOperator);
                this.closeableResources.accept(transformationChain, "transformations for task " + connectorTaskId);
            }
            Worker.log.info("Initializing: {}", transformationChain);
            retryWithToleranceOperator.reporters(sourceTaskReporters);
            ConnectorOffsetBackingStore offsetStoreForRegularSourceTask = Worker.this.offsetStoreForRegularSourceTask(connectorTaskId, sourceConnectorConfig, cls, kafkaProducer, baseProducerConfigs, topicAdmin);
            offsetStoreForRegularSourceTask.configure(Worker.this.config);
            OffsetStorageReaderImpl offsetStorageReaderImpl = new OffsetStorageReaderImpl(offsetStoreForRegularSourceTask, connectorTaskId.connector(), Worker.this.internalKeyConverter, Worker.this.internalValueConverter);
            this.closeableResources.accept(offsetStorageReaderImpl, "offset reader for task " + connectorTaskId);
            WorkerSourceTask workerSourceTask = new WorkerSourceTask(connectorTaskId, (SourceTask) task, listener, targetState, converter, converter2, errorHandlingMetrics, headerConverter, transformationChain, kafkaProducer, topicAdmin, configuredGroups, offsetStorageReaderImpl, new OffsetStorageWriter(offsetStoreForRegularSourceTask, connectorTaskId.connector(), Worker.this.internalKeyConverter, Worker.this.internalValueConverter), offsetStoreForRegularSourceTask, Worker.this.config, clusterConfigState, Worker.this.metrics, classLoader, Worker.this.time, retryWithToleranceOperator, Worker.this.herder.statusBackingStore(), Worker.this.executor);
            connectorTracer.ifPresent(tracer2 -> {
                Worker.log.info("Using tracing id {} for connector '{}' and task {}", tracer2.tracingContext().traceID(), connectorTaskId.connector(), Integer.valueOf(connectorTaskId.task()));
                workerSourceTask.useTracer(tracer2);
            });
            return workerSourceTask;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/connect/runtime/Worker$TaskBuilder.class */
    public abstract class TaskBuilder {
        private final ConnectorTaskId id;
        private final ClusterConfigState configState;
        private final TaskStatus.Listener statusListener;
        private final TargetState initialState;
        private Task task = null;
        private ConnectorConfig connectorConfig = null;
        private Converter keyConverter = null;
        private Converter valueConverter = null;
        private HeaderConverter headerConverter = null;
        private ClassLoader classLoader = null;
        protected BiConsumer<AutoCloseable, String> closeableResources = null;

        public TaskBuilder(ConnectorTaskId connectorTaskId, ClusterConfigState clusterConfigState, TaskStatus.Listener listener, TargetState targetState) {
            this.id = connectorTaskId;
            this.configState = clusterConfigState;
            this.statusListener = listener;
            this.initialState = targetState;
        }

        public TaskBuilder withTask(Task task) {
            this.task = task;
            return this;
        }

        public TaskBuilder withConnectorConfig(ConnectorConfig connectorConfig) {
            this.connectorConfig = connectorConfig;
            return this;
        }

        public TaskBuilder withKeyConverter(Converter converter) {
            this.keyConverter = converter;
            return this;
        }

        public TaskBuilder withValueConverter(Converter converter) {
            this.valueConverter = converter;
            return this;
        }

        public TaskBuilder withHeaderConverter(HeaderConverter headerConverter) {
            this.headerConverter = headerConverter;
            return this;
        }

        public TaskBuilder withClassloader(ClassLoader classLoader) {
            this.classLoader = classLoader;
            return this;
        }

        public TaskBuilder withCloseableResourcesRecorder(BiConsumer<AutoCloseable, String> biConsumer) {
            this.closeableResources = biConsumer;
            return this;
        }

        public WorkerTask build() {
            Objects.requireNonNull(this.task, "Task cannot be null");
            Objects.requireNonNull(this.connectorConfig, "Connector config used by task cannot be null");
            Objects.requireNonNull(this.keyConverter, "Key converter used by task cannot be null");
            Objects.requireNonNull(this.valueConverter, "Value converter used by task cannot be null");
            Objects.requireNonNull(this.headerConverter, "Header converter used by task cannot be null");
            Objects.requireNonNull(this.classLoader, "Classloader used by task cannot be null");
            ErrorHandlingMetrics errorHandlingMetrics = Worker.this.errorHandlingMetrics(this.id);
            this.closeableResources.accept(errorHandlingMetrics, "error handling metrics for task " + this.id);
            Class<? extends Connector> connectorClass = Worker.this.plugins.connectorClass(this.connectorConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
            RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(this.connectorConfig.errorRetryTimeout(), this.connectorConfig.errorMaxDelayInMillis(), this.connectorConfig.errorToleranceType(), Time.SYSTEM, errorHandlingMetrics, Boolean.valueOf((String) Worker.this.config.originals().getOrDefault(Worker.FAIL_NON_TOLERATED_EXCEPTION, "false")).booleanValue());
            this.closeableResources.accept(retryWithToleranceOperator, "retry operator for task " + this.id);
            return doBuild(this.task, this.id, this.configState, this.statusListener, this.initialState, this.connectorConfig, this.keyConverter, this.valueConverter, this.headerConverter, this.classLoader, errorHandlingMetrics, connectorClass, retryWithToleranceOperator);
        }

        abstract WorkerTask doBuild(Task task, ConnectorTaskId connectorTaskId, ClusterConfigState clusterConfigState, TaskStatus.Listener listener, TargetState targetState, ConnectorConfig connectorConfig, Converter converter, Converter converter2, HeaderConverter headerConverter, ClassLoader classLoader, ErrorHandlingMetrics errorHandlingMetrics, Class<? extends Connector> cls, RetryWithToleranceOperator retryWithToleranceOperator);
    }

    public Worker(String str, Time time, Plugins plugins, WorkerConfig workerConfig, OffsetBackingStore offsetBackingStore, ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
        this(str, time, plugins, workerConfig, offsetBackingStore, Executors.newCachedThreadPool(), connectorClientConfigOverridePolicy);
    }

    Worker(String str, Time time, Plugins plugins, WorkerConfig workerConfig, OffsetBackingStore offsetBackingStore, ExecutorService executorService, ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
        this.connectors = new ConcurrentHashMap();
        this.tasks = new ConcurrentHashMap();
        this.logEventStateCleanupService = null;
        this.connectorLogEventStateMap = new ConcurrentHashMap();
        this.taskLogEventStateMap = new ConcurrentHashMap();
        this.kafkaClusterId = workerConfig.kafkaClusterId();
        this.metrics = new ConnectMetrics(str, workerConfig, time, this.kafkaClusterId);
        this.executor = executorService;
        this.workerId = str;
        this.time = time;
        this.plugins = plugins;
        this.config = workerConfig;
        this.connectorClientConfigOverridePolicy = connectorClientConfigOverridePolicy;
        this.workerMetricsGroup = new WorkerMetricsGroup(this.connectors, this.tasks, this.metrics);
        Map<String, String> singletonMap = Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false");
        this.internalKeyConverter = plugins.newInternalConverter(true, JsonConverter.class.getName(), singletonMap);
        this.internalValueConverter = plugins.newInternalConverter(false, JsonConverter.class.getName(), singletonMap);
        this.globalOffsetBackingStore = offsetBackingStore;
        this.workerConfigTransformer = initConfigTransformer();
        this.workerConfigDecorator = WorkerConfigDecorator.initialize(this.config, this.workerConfigTransformer);
        this.logEventsKafkaEmitter = new LogEventsKafkaEmitter();
    }

    private WorkerConfigTransformer initConfigTransformer() {
        List<String> list = this.config.getList("config.providers");
        HashMap hashMap = new HashMap();
        for (String str : list) {
            hashMap.put(str, this.plugins.newConfigProvider(this.config, "config.providers." + str, Plugins.ClassLoaderUsage.PLUGINS));
        }
        return new WorkerConfigTransformer(this, hashMap);
    }

    public WorkerConfigTransformer configTransformer() {
        return this.workerConfigTransformer;
    }

    public WorkerConfigDecorator configDecorator() {
        return this.workerConfigDecorator;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Herder herder() {
        return this.herder;
    }

    public LogEventsEmitter logEventsEmitter() {
        return this.logEventsKafkaEmitter;
    }

    public ConcurrentMap<String, LogEventState> connectorLogEventStateMap() {
        return this.connectorLogEventStateMap;
    }

    public ConcurrentMap<String, LogEventState> taskLogEventStateMap() {
        return this.taskLogEventStateMap;
    }

    public LogEventState createIfAbsentConnectorLogEventState(Map.Entry<String, LogEventState> entry) {
        this.connectorLogEventStateMap.putIfAbsent(entry.getKey(), entry.getValue());
        return this.connectorLogEventStateMap.get(entry.getKey());
    }

    public LogEventState createIfAbsentTaskLogEventState(Map.Entry<String, LogEventState> entry) {
        this.taskLogEventStateMap.putIfAbsent(entry.getKey(), entry.getValue());
        return this.taskLogEventStateMap.get(entry.getKey());
    }

    public long logFailureEventResetTime() {
        return this.logEventsConfig.logFailureEventResetTime();
    }

    public boolean logEventsDeduplicateErrors() {
        return this.logEventsConfig.logEventsDeduplicateErrors().booleanValue();
    }

    public void start() {
        log.info("Worker starting");
        this.globalOffsetBackingStore.start();
        this.sourceTaskOffsetCommitter = this.config.exactlyOnceSourceEnabled() ? Optional.empty() : Optional.of(new SourceTaskOffsetCommitter(this.config));
        this.connectorStatusMetricsGroup = new ConnectorStatusMetricsGroup(this.metrics, this.tasks, this.herder);
        this.logEventsKafkaEmitter.start(this.config.originals());
        this.logEventsConfig = this.logEventsKafkaEmitter.logEventsConfig();
        if (this.logEventsConfig.logEventsDeduplicateErrors().booleanValue()) {
            this.logEventStateCleanupService = Executors.newSingleThreadScheduledExecutor();
            this.logEventStateCleanupService.scheduleAtFixedRate(cleanupLogEventState(), this.logEventsConfig.logFailureEventResetTime(), LogEventsConfig.LOG_EVENTS_FAILURE_EVENT_MAP_CLEANUP_TIME_DEFAULT.longValue(), TimeUnit.MILLISECONDS);
        }
        log.info("Worker started");
    }

    public void stop() {
        log.info("Worker stopping");
        long milliseconds = this.time.milliseconds() + this.config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG).longValue();
        if (!this.connectors.isEmpty()) {
            log.warn("Shutting down connectors {} uncleanly; herder should have shut down connectors before the Worker is stopped", this.connectors.keySet());
            stopAndAwaitConnectors();
        }
        if (!this.tasks.isEmpty()) {
            log.warn("Shutting down tasks {} uncleanly; herder should have shut down tasks before the Worker is stopped", this.tasks.keySet());
            stopAndAwaitTasks();
        }
        long milliseconds2 = milliseconds - this.time.milliseconds();
        this.sourceTaskOffsetCommitter.ifPresent(sourceTaskOffsetCommitter -> {
            sourceTaskOffsetCommitter.close(milliseconds2);
        });
        this.globalOffsetBackingStore.stop();
        this.metrics.stop();
        this.logEventsKafkaEmitter.stop();
        log.info("Worker stopped");
        this.workerMetricsGroup.close();
        this.connectorStatusMetricsGroup.close();
        this.workerConfigTransformer.close();
        this.executor.shutdown();
        try {
            if (!this.executor.awaitTermination(EXECUTOR_SHUTDOWN_TERMINATION_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
                this.executor.shutdownNow();
                if (!this.executor.awaitTermination(EXECUTOR_SHUTDOWN_TERMINATION_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
                    log.error("Executor did not terminate in time");
                }
            }
        } catch (InterruptedException e) {
            this.executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        if (this.logEventsConfig.logEventsDeduplicateErrors().booleanValue()) {
            this.logEventStateCleanupService.shutdownNow();
        }
    }

    public void startConnector(String str, Map<String, String> map, CloseableConnectorContext closeableConnectorContext, ConnectorStatus.Listener listener, TargetState targetState, Callback<TargetState> callback) {
        ConnectorConfig connectorConfig;
        ConnectorOffsetBackingStore offsetStoreForExactlyOnceSourceConnector;
        OffsetStorageReaderImpl offsetStorageReaderImpl;
        ConnectorStatus.Listener wrapStatusListener = this.workerMetricsGroup.wrapStatusListener(listener);
        LoggingContext forConnector = LoggingContext.forConnector(str);
        Throwable th = null;
        try {
            if (this.connectors.containsKey(str)) {
                callback.onCompletion(new ConnectException("Connector with name " + str + " already exists"), null);
                if (forConnector != null) {
                    if (0 == 0) {
                        forConnector.close();
                        return;
                    }
                    try {
                        forConnector.close();
                        return;
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                        return;
                    }
                }
                return;
            }
            String str2 = map.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
            ClassLoader connectorLoader = this.plugins.connectorLoader(str2);
            try {
                LoaderSwap withClassLoader = this.plugins.withClassLoader(connectorLoader);
                Throwable th3 = null;
                try {
                    try {
                        log.info("Creating connector {} of type {}", str, str2);
                        Connector newConnector = this.plugins.newConnector(str2);
                        Map<String, String> decorateConnectorConfig = configDecorator().decorateConnectorConfig(str, newConnector, newConnector.config(), map);
                        if (ConnectUtils.isSinkConnector(newConnector)) {
                            connectorConfig = new SinkConnectorConfig(this.plugins, decorateConnectorConfig);
                            offsetStorageReaderImpl = null;
                            offsetStoreForExactlyOnceSourceConnector = null;
                        } else {
                            SourceConnectorConfig sourceConnectorConfig = new SourceConnectorConfig(this.plugins, decorateConnectorConfig, this.config.topicCreationEnable());
                            connectorConfig = sourceConnectorConfig;
                            offsetStoreForExactlyOnceSourceConnector = this.config.exactlyOnceSourceEnabled() ? offsetStoreForExactlyOnceSourceConnector(sourceConnectorConfig, str, newConnector) : offsetStoreForRegularSourceConnector(sourceConnectorConfig, str, newConnector);
                            offsetStoreForExactlyOnceSourceConnector.configure(this.config);
                            offsetStorageReaderImpl = new OffsetStorageReaderImpl(offsetStoreForExactlyOnceSourceConnector, str, this.internalKeyConverter, this.internalValueConverter);
                        }
                        WorkerConnector workerConnector = new WorkerConnector(str, newConnector, connectorConfig, closeableConnectorContext, this.metrics, wrapStatusListener, offsetStorageReaderImpl, offsetStoreForExactlyOnceSourceConnector, connectorLoader);
                        if (this.config.taskStatusMetricsEnabled().booleanValue()) {
                            workerConnector.metrics().addHerderMetrics(this.herder);
                        }
                        log.info("Instantiated connector {} with version {} of type {}", str, newConnector.version(), newConnector.getClass());
                        workerConnector.transitionTo(targetState, callback);
                        if (withClassLoader != null) {
                            if (0 != 0) {
                                try {
                                    withClassLoader.close();
                                } catch (Throwable th4) {
                                    th3.addSuppressed(th4);
                                }
                            } else {
                                withClassLoader.close();
                            }
                        }
                        if (this.connectors.putIfAbsent(str, workerConnector) != null) {
                            callback.onCompletion(new ConnectException("Connector with name " + str + " already exists"), null);
                            if (forConnector != null) {
                                if (0 == 0) {
                                    forConnector.close();
                                    return;
                                }
                                try {
                                    forConnector.close();
                                    return;
                                } catch (Throwable th5) {
                                    th.addSuppressed(th5);
                                    return;
                                }
                            }
                            return;
                        }
                        this.executor.submit(this.plugins.withClassLoader(connectorLoader, workerConnector));
                        log.info("Finished creating connector {}", str);
                        if (forConnector != null) {
                            if (0 == 0) {
                                forConnector.close();
                                return;
                            }
                            try {
                                forConnector.close();
                            } catch (Throwable th6) {
                                th.addSuppressed(th6);
                            }
                        }
                    } catch (Throwable th7) {
                        th3 = th7;
                        throw th7;
                    }
                } catch (Throwable th8) {
                    if (withClassLoader != null) {
                        if (th3 != null) {
                            try {
                                withClassLoader.close();
                            } catch (Throwable th9) {
                                th3.addSuppressed(th9);
                            }
                        } else {
                            withClassLoader.close();
                        }
                    }
                    throw th8;
                }
            } catch (Throwable th10) {
                log.error("Failed to start connector {}", str, th10);
                wrapStatusListener.onFailure(str, th10);
                callback.onCompletion(th10, null);
                if (forConnector != null) {
                    if (0 == 0) {
                        forConnector.close();
                        return;
                    }
                    try {
                        forConnector.close();
                    } catch (Throwable th11) {
                        th.addSuppressed(th11);
                    }
                }
            }
        } catch (Throwable th12) {
            if (forConnector != null) {
                if (0 != 0) {
                    try {
                        forConnector.close();
                    } catch (Throwable th13) {
                        th.addSuppressed(th13);
                    }
                } else {
                    forConnector.close();
                }
            }
            throw th12;
        }
    }

    public boolean isSinkConnector(String str) {
        WorkerConnector workerConnector = this.connectors.get(str);
        if (workerConnector == null) {
            throw new ConnectException("Connector " + str + " not found in this worker.");
        }
        LoaderSwap withClassLoader = this.plugins.withClassLoader(workerConnector.loader());
        Throwable th = null;
        try {
            try {
                boolean isSinkConnector = workerConnector.isSinkConnector();
                if (withClassLoader != null) {
                    if (0 != 0) {
                        try {
                            withClassLoader.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        withClassLoader.close();
                    }
                }
                return isSinkConnector;
            } finally {
            }
        } catch (Throwable th3) {
            if (withClassLoader != null) {
                if (th != null) {
                    try {
                        withClassLoader.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    withClassLoader.close();
                }
            }
            throw th3;
        }
    }

    public List<Map<String, String>> connectorTaskConfigs(String str, ConnectorConfig connectorConfig) {
        ArrayList arrayList = new ArrayList();
        LoggingContext forConnector = LoggingContext.forConnector(str);
        Throwable th = null;
        try {
            log.trace("Reconfiguring connector tasks for {}", str);
            WorkerConnector workerConnector = this.connectors.get(str);
            if (workerConnector == null) {
                throw new ConnectException("Connector " + str + " not found in this worker.");
            }
            int intValue = connectorConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG).intValue();
            Map<String, String> originalsStrings = connectorConfig.originalsStrings();
            Connector connector = workerConnector.connector();
            LoaderSwap withClassLoader = this.plugins.withClassLoader(workerConnector.loader());
            Throwable th2 = null;
            try {
                try {
                    String name = connector.taskClass().getName();
                    Iterator<Map<String, String>> it = connector.taskConfigs(intValue).iterator();
                    while (it.hasNext()) {
                        HashMap hashMap = new HashMap(it.next());
                        hashMap.put(TaskConfig.TASK_CLASS_CONFIG, name);
                        if (originalsStrings.containsKey("topics")) {
                            hashMap.put("topics", originalsStrings.get("topics"));
                        }
                        if (originalsStrings.containsKey("topics.regex")) {
                            hashMap.put("topics.regex", originalsStrings.get("topics.regex"));
                        }
                        arrayList.add(hashMap);
                    }
                    if (withClassLoader != null) {
                        if (0 != 0) {
                            try {
                                withClassLoader.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            withClassLoader.close();
                        }
                    }
                    return arrayList;
                } finally {
                }
            } catch (Throwable th4) {
                if (withClassLoader != null) {
                    if (th2 != null) {
                        try {
                            withClassLoader.close();
                        } catch (Throwable th5) {
                            th2.addSuppressed(th5);
                        }
                    } else {
                        withClassLoader.close();
                    }
                }
                throw th4;
            }
        } finally {
            if (forConnector != null) {
                if (0 != 0) {
                    try {
                        forConnector.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    forConnector.close();
                }
            }
        }
    }

    private void stopConnector(String str) {
        LoggingContext forConnector = LoggingContext.forConnector(str);
        Throwable th = null;
        try {
            WorkerConnector workerConnector = this.connectors.get(str);
            log.info("Stopping connector {}", str);
            if (workerConnector == null) {
                log.warn("Ignoring stop request for unowned connector {}", str);
                if (forConnector != null) {
                    if (0 == 0) {
                        forConnector.close();
                        return;
                    }
                    try {
                        forConnector.close();
                        return;
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                        return;
                    }
                }
                return;
            }
            LoaderSwap withClassLoader = this.plugins.withClassLoader(workerConnector.loader());
            Throwable th3 = null;
            try {
                try {
                    workerConnector.shutdown();
                    if (withClassLoader != null) {
                        if (0 != 0) {
                            try {
                                withClassLoader.close();
                            } catch (Throwable th4) {
                                th3.addSuppressed(th4);
                            }
                        } else {
                            withClassLoader.close();
                        }
                    }
                    if (forConnector != null) {
                        if (0 == 0) {
                            forConnector.close();
                            return;
                        }
                        try {
                            forConnector.close();
                        } catch (Throwable th5) {
                            th.addSuppressed(th5);
                        }
                    }
                } catch (Throwable th6) {
                    th3 = th6;
                    throw th6;
                }
            } catch (Throwable th7) {
                if (withClassLoader != null) {
                    if (th3 != null) {
                        try {
                            withClassLoader.close();
                        } catch (Throwable th8) {
                            th3.addSuppressed(th8);
                        }
                    } else {
                        withClassLoader.close();
                    }
                }
                throw th7;
            }
        } catch (Throwable th9) {
            if (forConnector != null) {
                if (0 != 0) {
                    try {
                        forConnector.close();
                    } catch (Throwable th10) {
                        th.addSuppressed(th10);
                    }
                } else {
                    forConnector.close();
                }
            }
            throw th9;
        }
    }

    private void stopConnectors(Collection<String> collection) {
        Iterator<String> it = collection.iterator();
        while (it.hasNext()) {
            stopConnector(it.next());
        }
    }

    private void awaitStopConnector(String str, long j) {
        LoggingContext forConnector = LoggingContext.forConnector(str);
        Throwable th = null;
        try {
            WorkerConnector remove = this.connectors.remove(str);
            if (remove == null) {
                log.warn("Ignoring await stop request for non-present connector {}", str);
                if (forConnector != null) {
                    if (0 == 0) {
                        forConnector.close();
                        return;
                    }
                    try {
                        forConnector.close();
                        return;
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                        return;
                    }
                }
                return;
            }
            if (remove.awaitShutdown(j)) {
                log.debug("Graceful stop of connector {} succeeded.", str);
            } else {
                log.error("Connector '{}' failed to properly shut down, has become unresponsive, and may be consuming external resources. Correct the configuration for this connector or remove the connector. After fixing the connector, it may be necessary to restart this worker to release any consumed resources.", str);
                remove.cancel();
            }
            if (forConnector != null) {
                if (0 == 0) {
                    forConnector.close();
                    return;
                }
                try {
                    forConnector.close();
                } catch (Throwable th3) {
                    th.addSuppressed(th3);
                }
            }
        } catch (Throwable th4) {
            if (forConnector != null) {
                if (0 != 0) {
                    try {
                        forConnector.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    forConnector.close();
                }
            }
            throw th4;
        }
    }

    private void awaitStopConnectors(Collection<String> collection) {
        long milliseconds = this.time.milliseconds() + CONNECTOR_GRACEFUL_SHUTDOWN_TIMEOUT_MS;
        Iterator<String> it = collection.iterator();
        while (it.hasNext()) {
            awaitStopConnector(it.next(), Math.max(0L, milliseconds - this.time.milliseconds()));
        }
    }

    public void stopAndAwaitConnectors() {
        stopAndAwaitConnectors(new ArrayList(this.connectors.keySet()));
    }

    public void stopAndAwaitConnectors(Collection<String> collection) {
        stopConnectors(collection);
        awaitStopConnectors(collection);
    }

    public void stopAndAwaitConnector(String str) {
        stopConnector(str);
        awaitStopConnectors(Collections.singletonList(str));
    }

    public Set<String> connectorNames() {
        return this.connectors.keySet();
    }

    public boolean isRunning(String str) {
        WorkerConnector workerConnector = this.connectors.get(str);
        return workerConnector != null && workerConnector.isRunning();
    }

    public boolean startSinkTask(ConnectorTaskId connectorTaskId, ClusterConfigState clusterConfigState, Map<String, String> map, Map<String, String> map2, TaskStatus.Listener listener, TargetState targetState) {
        HashMap hashMap = new HashMap();
        SinkTaskBuilder sinkTaskBuilder = new SinkTaskBuilder(connectorTaskId, clusterConfigState, listener, targetState);
        hashMap.getClass();
        return startTask(connectorTaskId, map, map2, listener, hashMap, sinkTaskBuilder.withCloseableResourcesRecorder((v1, v2) -> {
            r7.put(v1, v2);
        }));
    }

    public boolean startSourceTask(ConnectorTaskId connectorTaskId, ClusterConfigState clusterConfigState, Map<String, String> map, Map<String, String> map2, TaskStatus.Listener listener, TargetState targetState) {
        HashMap hashMap = new HashMap();
        SourceTaskBuilder sourceTaskBuilder = new SourceTaskBuilder(connectorTaskId, clusterConfigState, listener, targetState);
        hashMap.getClass();
        return startTask(connectorTaskId, map, map2, listener, hashMap, sourceTaskBuilder.withCloseableResourcesRecorder((v1, v2) -> {
            r7.put(v1, v2);
        }));
    }

    public boolean startExactlyOnceSourceTask(ConnectorTaskId connectorTaskId, ClusterConfigState clusterConfigState, Map<String, String> map, Map<String, String> map2, TaskStatus.Listener listener, TargetState targetState, Runnable runnable, Runnable runnable2) {
        return startTask(connectorTaskId, map, map2, listener, new HashMap(), new ExactlyOnceSourceTaskBuilder(connectorTaskId, clusterConfigState, listener, targetState, runnable, runnable2));
    }

    @VisibleForTesting
    WorkerTask buildSourceWorkerTask(ClusterConfigState clusterConfigState, ConnectorConfig connectorConfig, ConnectorTaskId connectorTaskId, Task task, TaskStatus.Listener listener, TargetState targetState, Converter converter, Converter converter2, HeaderConverter headerConverter, ClassLoader classLoader, BiConsumer<AutoCloseable, String> biConsumer) {
        return buildWorkerTask(connectorConfig, task, converter, converter2, headerConverter, classLoader, biConsumer, new SourceTaskBuilder(connectorTaskId, clusterConfigState, listener, targetState).withCloseableResourcesRecorder(biConsumer));
    }

    @VisibleForTesting
    WorkerTask buildSinkWorkerTask(ClusterConfigState clusterConfigState, ConnectorConfig connectorConfig, ConnectorTaskId connectorTaskId, Task task, TaskStatus.Listener listener, TargetState targetState, Converter converter, Converter converter2, HeaderConverter headerConverter, ClassLoader classLoader, BiConsumer<AutoCloseable, String> biConsumer) {
        return buildWorkerTask(connectorConfig, task, converter, converter2, headerConverter, classLoader, biConsumer, new SinkTaskBuilder(connectorTaskId, clusterConfigState, listener, targetState).withCloseableResourcesRecorder(biConsumer));
    }

    WorkerTask buildWorkerTask(ConnectorConfig connectorConfig, Task task, Converter converter, Converter converter2, HeaderConverter headerConverter, ClassLoader classLoader, BiConsumer<AutoCloseable, String> biConsumer, TaskBuilder taskBuilder) {
        return taskBuilder.withTask(task).withConnectorConfig(connectorConfig).withKeyConverter(converter).withValueConverter(converter2).withHeaderConverter(headerConverter).withClassloader(classLoader).withCloseableResourcesRecorder(biConsumer).build();
    }

    @VisibleForTesting
    boolean startTask(ConnectorTaskId connectorTaskId, Map<String, String> map, Map<String, String> map2, TaskStatus.Listener listener, Map<AutoCloseable, String> map3, TaskBuilder taskBuilder) {
        TaskStatus.Listener wrapStatusListener = this.workerMetricsGroup.wrapStatusListener(listener);
        LoggingContext forTask = LoggingContext.forTask(connectorTaskId);
        Throwable th = null;
        try {
            log.info("Creating task {}", connectorTaskId);
            if (this.tasks.containsKey(connectorTaskId)) {
                throw new ConnectException("Task already exists in this worker: " + connectorTaskId);
            }
            this.connectorStatusMetricsGroup.recordTaskAdded(connectorTaskId);
            ClassLoader connectorLoader = this.plugins.connectorLoader(map.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
            try {
                LoaderSwap withClassLoader = this.plugins.withClassLoader(connectorLoader);
                Throwable th2 = null;
                try {
                    try {
                        ConnectorConfig connectorConfig = new ConnectorConfig(this.plugins, map);
                        TaskConfig taskConfig = new TaskConfig(map2);
                        Class<? extends U> asSubclass = taskConfig.getClass(TaskConfig.TASK_CLASS_CONFIG).asSubclass(Task.class);
                        Task newTask = this.plugins.newTask(asSubclass);
                        log.info("Instantiated task {} with version {} of type {}", connectorTaskId, newTask.version(), asSubclass.getName());
                        Converter newConverter = this.plugins.newConverter(connectorConfig, "key.converter", Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER);
                        Converter newConverter2 = this.plugins.newConverter(connectorConfig, "value.converter", Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER);
                        HeaderConverter newHeaderConverter = this.plugins.newHeaderConverter(connectorConfig, "header.converter", Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER);
                        map3.put(newHeaderConverter, "header converter for task " + connectorTaskId);
                        if (newConverter == null) {
                            newConverter = this.plugins.newConverter(this.config, "key.converter", Plugins.ClassLoaderUsage.PLUGINS);
                            log.info("Set up the key converter {} for task {} using the worker config", newConverter.getClass(), connectorTaskId);
                        } else {
                            log.info("Set up the key converter {} for task {} using the connector config", newConverter.getClass(), connectorTaskId);
                        }
                        if (newConverter2 == null) {
                            newConverter2 = this.plugins.newConverter(this.config, "value.converter", Plugins.ClassLoaderUsage.PLUGINS);
                            log.info("Set up the value converter {} for task {} using the worker config", newConverter2.getClass(), connectorTaskId);
                        } else {
                            log.info("Set up the value converter {} for task {} using the connector config", newConverter2.getClass(), connectorTaskId);
                        }
                        if (newHeaderConverter == null) {
                            newHeaderConverter = this.plugins.newHeaderConverter(this.config, "header.converter", Plugins.ClassLoaderUsage.PLUGINS);
                            map3.put(newHeaderConverter, "header converter for task " + connectorTaskId);
                            log.info("Set up the header converter {} for task {} using the worker config", newHeaderConverter.getClass(), connectorTaskId);
                        } else {
                            log.info("Set up the header converter {} for task {} using the connector config", newHeaderConverter.getClass(), connectorTaskId);
                        }
                        map3.getClass();
                        WorkerTask buildWorkerTask = buildWorkerTask(connectorConfig, newTask, newConverter, newConverter2, newHeaderConverter, connectorLoader, (v1, v2) -> {
                            r7.put(v1, v2);
                        }, taskBuilder);
                        buildWorkerTask.initialize(taskConfig);
                        if (withClassLoader != null) {
                            if (0 != 0) {
                                try {
                                    withClassLoader.close();
                                } catch (Throwable th3) {
                                    th2.addSuppressed(th3);
                                }
                            } else {
                                withClassLoader.close();
                            }
                        }
                        if (this.tasks.putIfAbsent(connectorTaskId, buildWorkerTask) != null) {
                            buildWorkerTask.getClass();
                            Utils.closeQuietly(buildWorkerTask::doClose, "duplicate instance of task " + connectorTaskId);
                            throw new ConnectException("Task already exists in this worker: " + connectorTaskId);
                        }
                        this.executor.submit(this.plugins.withClassLoader(connectorLoader, buildWorkerTask));
                        if (buildWorkerTask instanceof WorkerSourceTask) {
                            this.sourceTaskOffsetCommitter.ifPresent(sourceTaskOffsetCommitter -> {
                                sourceTaskOffsetCommitter.schedule(connectorTaskId, (WorkerSourceTask) buildWorkerTask);
                            });
                        }
                        return true;
                    } finally {
                    }
                } catch (Throwable th4) {
                    if (withClassLoader != null) {
                        if (th2 != null) {
                            try {
                                withClassLoader.close();
                            } catch (Throwable th5) {
                                th2.addSuppressed(th5);
                            }
                        } else {
                            withClassLoader.close();
                        }
                    }
                    throw th4;
                }
            } catch (Throwable th6) {
                log.error("Failed to start task {}", connectorTaskId, th6);
                this.connectorStatusMetricsGroup.recordTaskRemoved(connectorTaskId);
                wrapStatusListener.onFailure(connectorTaskId, th6);
                map3.forEach(Utils::closeQuietly);
                if (forTask != null) {
                    if (0 != 0) {
                        try {
                            forTask.close();
                        } catch (Throwable th7) {
                            th.addSuppressed(th7);
                        }
                    } else {
                        forTask.close();
                    }
                }
                return false;
            }
        } finally {
            if (forTask != null) {
                if (0 != 0) {
                    try {
                        forTask.close();
                    } catch (Throwable th8) {
                        th.addSuppressed(th8);
                    }
                } else {
                    forTask.close();
                }
            }
        }
    }

    public KafkaFuture<Void> fenceZombies(String str, int i, Map<String, String> map) {
        return fenceZombies(str, i, map, Admin::create);
    }

    /* JADX WARN: Failed to calculate best type for var: r18v0 ??
    java.lang.NullPointerException
     */
    /* JADX WARN: Failed to calculate best type for var: r19v0 ??
    java.lang.NullPointerException
     */
    /* JADX WARN: Multi-variable type inference failed. Error: java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.RegisterArg.getSVar()" because the return value of "jadx.core.dex.nodes.InsnNode.getResult()" is null
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.collectRelatedVars(AbstractTypeConstraint.java:31)
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.<init>(AbstractTypeConstraint.java:19)
    	at jadx.core.dex.visitors.typeinference.TypeSearch$1.<init>(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeMoveConstraint(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeConstraint(TypeSearch.java:361)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.collectConstraints(TypeSearch.java:341)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.run(TypeSearch.java:60)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.runMultiVariableSearch(FixTypesVisitor.java:116)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Not initialized variable reg: 18, insn: 0x016b: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r18 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) A[TRY_LEAVE], block:B:42:0x016b */
    /* JADX WARN: Not initialized variable reg: 19, insn: 0x0170: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r19 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]), block:B:44:0x0170 */
    /* JADX WARN: Type inference failed for: r18v0, types: [org.apache.kafka.connect.runtime.isolation.LoaderSwap] */
    /* JADX WARN: Type inference failed for: r19v0, types: [java.lang.Throwable] */
    KafkaFuture<Void> fenceZombies(String str, int i, Map<String, String> map, Function<Map<String, Object>, Admin> function) {
        ?? r18;
        ?? r19;
        log.debug("Fencing out {} task producers for source connector {}", Integer.valueOf(i), str);
        LoggingContext forConnector = LoggingContext.forConnector(str);
        Throwable th = null;
        try {
            try {
                LoaderSwap withClassLoader = this.plugins.withClassLoader(this.plugins.connectorLoader(map.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG)));
                Throwable th2 = null;
                SourceConnectorConfig sourceConnectorConfig = new SourceConnectorConfig(this.plugins, map, this.config.topicCreationEnable());
                Admin apply = function.apply(adminConfigs(str, "connector-worker-adminclient-" + str, this.config, sourceConnectorConfig, this.plugins.connectorClass(sourceConnectorConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG)), this.connectorClientConfigOverridePolicy, this.kafkaClusterId, ConnectorType.SOURCE));
                try {
                    KafkaFuture<Void> whenComplete = apply.fenceProducers((Collection) IntStream.range(0, i).mapToObj(i2 -> {
                        return new ConnectorTaskId(str, i2);
                    }).map(this::taskTransactionalId).collect(Collectors.toList()), new FenceProducersOptions().timeoutMs(Integer.valueOf((int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS))).all().whenComplete((r8, th3) -> {
                        if (th3 == null) {
                            log.debug("Finished fencing out {} task producers for source connector {}", Integer.valueOf(i), str);
                        }
                        Utils.closeQuietly(apply, "Zombie fencing admin for connector " + str);
                    });
                    if (withClassLoader != null) {
                        if (0 != 0) {
                            try {
                                withClassLoader.close();
                            } catch (Throwable th4) {
                                th2.addSuppressed(th4);
                            }
                        } else {
                            withClassLoader.close();
                        }
                    }
                    return whenComplete;
                } catch (Exception e) {
                    Utils.closeQuietly(apply, "Zombie fencing admin for connector " + str);
                    throw e;
                }
            } catch (Throwable th5) {
                if (r18 != 0) {
                    if (r19 != 0) {
                        try {
                            r18.close();
                        } catch (Throwable th6) {
                            r19.addSuppressed(th6);
                        }
                    } else {
                        r18.close();
                    }
                }
                throw th5;
            }
        } finally {
            if (forConnector != null) {
                if (0 != 0) {
                    try {
                        forConnector.close();
                    } catch (Throwable th7) {
                        th.addSuppressed(th7);
                    }
                } else {
                    forConnector.close();
                }
            }
        }
    }

    static Map<String, Object> exactlyOnceSourceTaskProducerConfigs(ConnectorTaskId connectorTaskId, WorkerConfig workerConfig, ConnectorConfig connectorConfig, Class<? extends Connector> cls, ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy, String str) {
        Map<String, Object> baseProducerConfigs = baseProducerConfigs(connectorTaskId, "connector-producer-" + connectorTaskId, workerConfig, connectorConfig, cls, connectorClientConfigOverridePolicy, str);
        if (!connectorConfig.originals().containsKey("producer.override.enable.idempotence") && !workerConfig.originals().containsKey("producer.enable.idempotence")) {
            baseProducerConfigs.remove(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG);
        }
        ConnectUtils.ensureProperty(baseProducerConfigs, ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, ConfluentConfigs.AUDIT_LOGGER_ENABLE_DEFAULT, "for connectors when exactly-once source support is enabled", false);
        ConnectUtils.ensureProperty(baseProducerConfigs, ProducerConfig.TRANSACTIONAL_ID_CONFIG, taskTransactionalId(workerConfig.groupId(), connectorTaskId.connector(), connectorTaskId.task()), "for connectors when exactly-once source support is enabled", true);
        return baseProducerConfigs;
    }

    static Map<String, Object> baseProducerConfigs(String str, String str2, WorkerConfig workerConfig, ConnectorConfig connectorConfig, Class<? extends Connector> cls, ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy, String str3) {
        return baseProducerConfigs(str, null, str2, workerConfig, connectorConfig, cls, connectorClientConfigOverridePolicy, str3);
    }

    static Map<String, Object> baseProducerConfigs(ConnectorTaskId connectorTaskId, String str, WorkerConfig workerConfig, ConnectorConfig connectorConfig, Class<? extends Connector> cls, ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy, String str2) {
        return baseProducerConfigs(connectorTaskId.connector(), connectorTaskId, str, workerConfig, connectorConfig, cls, connectorClientConfigOverridePolicy, str2);
    }

    static Map<String, Object> baseProducerConfigs(String str, ConnectorTaskId connectorTaskId, String str2, WorkerConfig workerConfig, ConnectorConfig connectorConfig, Class<? extends Connector> cls, ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy, String str3) {
        HashMap hashMap = new HashMap();
        hashMap.put("bootstrap.servers", workerConfig.bootstrapServers());
        hashMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        hashMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        hashMap.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.toString(Long.MAX_VALUE));
        hashMap.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
        hashMap.put(ProducerConfig.ACKS_CONFIG, ConfluentConfigs.CHECKSUM_ENABLED_FILES_ALL);
        hashMap.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
        hashMap.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
        hashMap.put("client.id", str2);
        hashMap.putAll(workerConfig.originalsWithPrefix(KafkaExporterConfig.PREFIX_PRODUCER));
        hashMap.putAll(telemetryReporterConfig(workerConfig));
        ConnectUtils.addMetricsContextProperties(hashMap, workerConfig, str3, connectorTaskId);
        ConnectUtils.addConfluentMetricsContextProperties(hashMap);
        hashMap.putAll(connectorClientConfigOverrides(str, connectorConfig, cls, ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX, ConnectorType.SOURCE, ConnectorClientConfigRequest.ClientType.PRODUCER, connectorClientConfigOverridePolicy));
        return hashMap;
    }

    static Map<String, Object> exactlyOnceSourceOffsetsConsumerConfigs(String str, String str2, WorkerConfig workerConfig, ConnectorConfig connectorConfig, Class<? extends Connector> cls, ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy, String str3) {
        Map<String, Object> baseConsumerConfigs = baseConsumerConfigs(str, str2, workerConfig, connectorConfig, cls, connectorClientConfigOverridePolicy, str3, ConnectorType.SOURCE);
        ConnectUtils.ensureProperty(baseConsumerConfigs, ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT), "for source connectors' offset consumers when exactly-once source support is enabled", false);
        return baseConsumerConfigs;
    }

    static Map<String, Object> regularSourceOffsetsConsumerConfigs(String str, String str2, WorkerConfig workerConfig, ConnectorConfig connectorConfig, Class<? extends Connector> cls, ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy, String str3) {
        Map<String, Object> baseConsumerConfigs = baseConsumerConfigs(str, str2, workerConfig, connectorConfig, cls, connectorClientConfigOverridePolicy, str3, ConnectorType.SOURCE);
        baseConsumerConfigs.putIfAbsent(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));
        return baseConsumerConfigs;
    }

    static Map<String, Object> baseConsumerConfigs(String str, String str2, WorkerConfig workerConfig, ConnectorConfig connectorConfig, Class<? extends Connector> cls, ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy, String str3, ConnectorType connectorType) {
        return baseConsumerConfigs(str, null, str2, workerConfig, connectorConfig, cls, connectorClientConfigOverridePolicy, str3, connectorType);
    }

    static Map<String, Object> baseConsumerConfigs(ConnectorTaskId connectorTaskId, String str, WorkerConfig workerConfig, ConnectorConfig connectorConfig, Class<? extends Connector> cls, ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy, String str2, ConnectorType connectorType) {
        return baseConsumerConfigs(connectorTaskId.connector(), connectorTaskId, str, workerConfig, connectorConfig, cls, connectorClientConfigOverridePolicy, str2, connectorType);
    }

    static Map<String, Object> baseConsumerConfigs(String str, ConnectorTaskId connectorTaskId, String str2, WorkerConfig workerConfig, ConnectorConfig connectorConfig, Class<? extends Connector> cls, ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy, String str3, ConnectorType connectorType) {
        HashMap hashMap = new HashMap();
        hashMap.put("group.id", SinkUtils.consumerGroupId(str));
        hashMap.put("client.id", str2);
        hashMap.put("bootstrap.servers", workerConfig.bootstrapServers());
        hashMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        hashMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        hashMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        hashMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        hashMap.putAll(workerConfig.originalsWithPrefix("consumer."));
        hashMap.putAll(telemetryReporterConfig(workerConfig));
        ConnectUtils.addMetricsContextProperties(hashMap, workerConfig, str3, connectorTaskId);
        ConnectUtils.addConfluentMetricsContextProperties(hashMap);
        hashMap.putAll(connectorClientConfigOverrides(str, connectorConfig, cls, ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX, connectorType, ConnectorClientConfigRequest.ClientType.CONSUMER, connectorClientConfigOverridePolicy));
        return hashMap;
    }

    static Map<String, Object> adminConfigs(String str, String str2, WorkerConfig workerConfig, ConnectorConfig connectorConfig, Class<? extends Connector> cls, ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy, String str3, ConnectorType connectorType) {
        return adminConfigs(str, null, str2, workerConfig, connectorConfig, cls, connectorClientConfigOverridePolicy, str3, connectorType);
    }

    static Map<String, Object> adminConfigs(ConnectorTaskId connectorTaskId, String str, WorkerConfig workerConfig, ConnectorConfig connectorConfig, Class<? extends Connector> cls, ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy, String str2, ConnectorType connectorType) {
        return adminConfigs(connectorTaskId.connector(), connectorTaskId, str, workerConfig, connectorConfig, cls, connectorClientConfigOverridePolicy, str2, connectorType);
    }

    private static Map<String, Object> adminConfigs(String str, ConnectorTaskId connectorTaskId, String str2, WorkerConfig workerConfig, ConnectorConfig connectorConfig, Class<? extends Connector> cls, ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy, String str3, ConnectorType connectorType) {
        HashMap hashMap = new HashMap();
        Map map = (Map) workerConfig.originals().entrySet().stream().filter(entry -> {
            return (((String) entry.getKey()).startsWith("admin.") || ((String) entry.getKey()).startsWith(KafkaExporterConfig.PREFIX_PRODUCER) || ((String) entry.getKey()).startsWith("consumer.")) ? false : true;
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }));
        hashMap.put("bootstrap.servers", workerConfig.bootstrapServers());
        hashMap.put("client.id", str2);
        hashMap.putAll(map);
        hashMap.putAll(workerConfig.originalsWithPrefix("admin."));
        hashMap.putAll(telemetryReporterConfig(workerConfig));
        hashMap.putAll(connectorClientConfigOverrides(str, connectorConfig, cls, ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX, connectorType, ConnectorClientConfigRequest.ClientType.ADMIN, connectorClientConfigOverridePolicy));
        ConnectUtils.addMetricsContextProperties(hashMap, workerConfig, str3, connectorTaskId);
        ConnectUtils.addConfluentMetricsContextProperties(hashMap);
        return hashMap;
    }

    private static Map<String, Object> telemetryReporterConfig(WorkerConfig workerConfig) {
        HashMap hashMap = new HashMap();
        if (workerConfig.originals().containsKey("metric.reporters")) {
            hashMap.put("metric.reporters", workerConfig.originals().get("metric.reporters"));
            hashMap.putAll(workerConfig.originalsWithPrefix(WorkerConfig.CONFLUENT_TELEMETRY_PREFIX, false));
        }
        return hashMap;
    }

    private static Map<String, Object> connectorClientConfigOverrides(String str, ConnectorConfig connectorConfig, Class<? extends Connector> cls, String str2, ConnectorType connectorType, ConnectorClientConfigRequest.ClientType clientType, ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
        Map<String, Object> originalsWithPrefix = connectorConfig.originalsWithPrefix(str2);
        List list = (List) connectorClientConfigOverridePolicy.validate(new ConnectorClientConfigRequest(str, connectorType, cls, originalsWithPrefix, clientType)).stream().filter(configValue -> {
            return configValue.errorMessages().size() > 0;
        }).collect(Collectors.toList());
        if (list.size() > 0) {
            throw new ConnectException("Client Config Overrides not allowed " + list);
        }
        return originalsWithPrefix;
    }

    private String taskTransactionalId(ConnectorTaskId connectorTaskId) {
        return taskTransactionalId(this.config.groupId(), connectorTaskId.connector(), connectorTaskId.task());
    }

    public static String taskTransactionalId(String str, String str2, int i) {
        return String.format("%s-%s-%d", str, str2, Integer.valueOf(i));
    }

    ErrorHandlingMetrics errorHandlingMetrics(ConnectorTaskId connectorTaskId) {
        return new ErrorHandlingMetrics(connectorTaskId, this.metrics);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public List<ErrorReporter> sinkTaskReporters(ConnectorTaskId connectorTaskId, SinkConnectorConfig sinkConnectorConfig, ErrorHandlingMetrics errorHandlingMetrics, Class<? extends Connector> cls) {
        ArrayList arrayList = new ArrayList();
        try {
            arrayList.add(new LogReporter(connectorTaskId, sinkConnectorConfig, errorHandlingMetrics));
            String dlqTopicName = sinkConnectorConfig.dlqTopicName();
            if (dlqTopicName != null && !dlqTopicName.isEmpty()) {
                arrayList.add(DeadLetterQueueReporter.createAndSetup(adminConfigs(connectorTaskId, "connector-dlq-adminclient-", this.config, sinkConnectorConfig, cls, this.connectorClientConfigOverridePolicy, this.kafkaClusterId, ConnectorType.SINK), connectorTaskId, sinkConnectorConfig, baseProducerConfigs(connectorTaskId, "connector-dlq-producer-" + connectorTaskId, this.config, sinkConnectorConfig, cls, this.connectorClientConfigOverridePolicy, this.kafkaClusterId), errorHandlingMetrics));
            }
            return arrayList;
        } catch (Throwable th) {
            arrayList.forEach(errorReporter -> {
                Utils.closeQuietly(errorReporter, "reporter " + errorReporter + " for task " + connectorTaskId);
            });
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public List<ErrorReporter> sourceTaskReporters(ConnectorTaskId connectorTaskId, ConnectorConfig connectorConfig, ErrorHandlingMetrics errorHandlingMetrics) {
        ArrayList arrayList = new ArrayList();
        try {
            arrayList.add(new LogReporter(connectorTaskId, connectorConfig, errorHandlingMetrics));
            return arrayList;
        } catch (Throwable th) {
            arrayList.forEach(errorReporter -> {
                Utils.closeQuietly(errorReporter, "reporter " + errorReporter + " for task " + connectorTaskId);
            });
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public WorkerErrantRecordReporter createWorkerErrantRecordReporter(SinkConnectorConfig sinkConnectorConfig, RetryWithToleranceOperator retryWithToleranceOperator, Converter converter, Converter converter2, HeaderConverter headerConverter, Optional<Tracer> optional) {
        if (sinkConnectorConfig.enableErrantRecordReporter()) {
            return optional.isPresent() ? new TraceWorkerErrantRecordReporter(retryWithToleranceOperator, converter, converter2, headerConverter, optional.get()) : new WorkerErrantRecordReporter(retryWithToleranceOperator, converter, converter2, headerConverter);
        }
        return null;
    }

    private void stopTask(ConnectorTaskId connectorTaskId) {
        LoggingContext forTask = LoggingContext.forTask(connectorTaskId);
        Throwable th = null;
        try {
            WorkerTask workerTask = this.tasks.get(connectorTaskId);
            if (workerTask == null) {
                log.warn("Ignoring stop request for unowned task {}", connectorTaskId);
                if (forTask != null) {
                    if (0 == 0) {
                        forTask.close();
                        return;
                    }
                    try {
                        forTask.close();
                        return;
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                        return;
                    }
                }
                return;
            }
            log.info("Stopping task {}", workerTask.id());
            if (workerTask instanceof WorkerSourceTask) {
                this.sourceTaskOffsetCommitter.ifPresent(sourceTaskOffsetCommitter -> {
                    sourceTaskOffsetCommitter.remove(workerTask.id());
                });
            }
            LoaderSwap withClassLoader = this.plugins.withClassLoader(workerTask.loader());
            Throwable th3 = null;
            try {
                workerTask.stop();
                if (withClassLoader != null) {
                    if (0 != 0) {
                        try {
                            withClassLoader.close();
                        } catch (Throwable th4) {
                            th3.addSuppressed(th4);
                        }
                    } else {
                        withClassLoader.close();
                    }
                }
                if (forTask != null) {
                    if (0 == 0) {
                        forTask.close();
                        return;
                    }
                    try {
                        forTask.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                }
            } catch (Throwable th6) {
                if (withClassLoader != null) {
                    if (0 != 0) {
                        try {
                            withClassLoader.close();
                        } catch (Throwable th7) {
                            th3.addSuppressed(th7);
                        }
                    } else {
                        withClassLoader.close();
                    }
                }
                throw th6;
            }
        } catch (Throwable th8) {
            if (forTask != null) {
                if (0 != 0) {
                    try {
                        forTask.close();
                    } catch (Throwable th9) {
                        th.addSuppressed(th9);
                    }
                } else {
                    forTask.close();
                }
            }
            throw th8;
        }
    }

    private void stopTasks(Collection<ConnectorTaskId> collection) {
        Iterator<ConnectorTaskId> it = collection.iterator();
        while (it.hasNext()) {
            stopTask(it.next());
        }
    }

    private void awaitStopTask(ConnectorTaskId connectorTaskId, long j) {
        LoggingContext forTask = LoggingContext.forTask(connectorTaskId);
        Throwable th = null;
        try {
            WorkerTask remove = this.tasks.remove(connectorTaskId);
            if (remove == null) {
                log.warn("Ignoring await stop request for non-present task {}", connectorTaskId);
                if (forTask != null) {
                    if (0 == 0) {
                        forTask.close();
                        return;
                    }
                    try {
                        forTask.close();
                        return;
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                        return;
                    }
                }
                return;
            }
            if (remove.awaitStop(j)) {
                log.debug("Graceful stop of task {} succeeded.", remove.id());
            } else {
                log.error("Graceful stop of task {} failed.", remove.id());
                remove.cancel();
            }
            try {
                remove.removeMetrics();
                this.connectorStatusMetricsGroup.recordTaskRemoved(connectorTaskId);
                if (forTask != null) {
                    if (0 == 0) {
                        forTask.close();
                        return;
                    }
                    try {
                        forTask.close();
                    } catch (Throwable th3) {
                        th.addSuppressed(th3);
                    }
                }
            } catch (Throwable th4) {
                this.connectorStatusMetricsGroup.recordTaskRemoved(connectorTaskId);
                throw th4;
            }
        } catch (Throwable th5) {
            if (forTask != null) {
                if (0 != 0) {
                    try {
                        forTask.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    forTask.close();
                }
            }
            throw th5;
        }
    }

    private void awaitStopTasks(Collection<ConnectorTaskId> collection) {
        long milliseconds = this.time.milliseconds() + this.config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG).longValue();
        Iterator<ConnectorTaskId> it = collection.iterator();
        while (it.hasNext()) {
            awaitStopTask(it.next(), Math.max(0L, milliseconds - this.time.milliseconds()));
        }
    }

    private Runnable cleanupLogEventState() {
        return () -> {
            List<String> list = (List) this.connectorLogEventStateMap.entrySet().stream().filter(entry -> {
                return ((LogEventState) entry.getValue()).shouldReset();
            }).map((v0) -> {
                return v0.getKey();
            }).collect(Collectors.toList());
            log.debug("Potentially cleaning up LogEventState for connectorIds: {}", list);
            cleanupLogEventStateEntries(this.connectorLogEventStateMap, list);
            List<String> list2 = (List) this.taskLogEventStateMap.entrySet().stream().filter(entry2 -> {
                return ((LogEventState) entry2.getValue()).shouldReset();
            }).map((v0) -> {
                return v0.getKey();
            }).collect(Collectors.toList());
            log.debug("Potentially cleaning up LogEventState for taskIds: {}", list2);
            cleanupLogEventStateEntries(this.taskLogEventStateMap, list2);
        };
    }

    private void cleanupLogEventStateEntries(Map<String, LogEventState> map, List<String> list) {
        Iterator<String> it = list.iterator();
        while (it.hasNext()) {
            map.computeIfPresent(it.next(), (str, logEventState) -> {
                if (logEventState.shouldReset()) {
                    return null;
                }
                return logEventState;
            });
        }
    }

    public void stopAndAwaitTasks() {
        stopAndAwaitTasks(new ArrayList(this.tasks.keySet()));
    }

    public void stopAndAwaitTasks(Collection<ConnectorTaskId> collection) {
        stopTasks(collection);
        awaitStopTasks(collection);
    }

    public void stopAndAwaitTask(ConnectorTaskId connectorTaskId) {
        stopTask(connectorTaskId);
        awaitStopTasks(Collections.singletonList(connectorTaskId));
    }

    public Set<ConnectorTaskId> taskIds() {
        return this.tasks.keySet();
    }

    public Converter getInternalKeyConverter() {
        return this.internalKeyConverter;
    }

    public Converter getInternalValueConverter() {
        return this.internalValueConverter;
    }

    public Plugins getPlugins() {
        return this.plugins;
    }

    public String workerId() {
        return this.workerId;
    }

    public boolean isTopicCreationEnabled() {
        return this.config.topicCreationEnable();
    }

    public ConnectMetrics metrics() {
        return this.metrics;
    }

    public void setTargetState(String str, TargetState targetState, Callback<TargetState> callback) {
        LoaderSwap withClassLoader;
        log.info("Setting connector {} state to {}", str, targetState);
        WorkerConnector workerConnector = this.connectors.get(str);
        if (workerConnector != null) {
            withClassLoader = this.plugins.withClassLoader(workerConnector.loader());
            Throwable th = null;
            try {
                try {
                    workerConnector.transitionTo(targetState, callback);
                    if (withClassLoader != null) {
                        if (0 != 0) {
                            try {
                                withClassLoader.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            withClassLoader.close();
                        }
                    }
                } finally {
                }
            } finally {
            }
        }
        for (Map.Entry<ConnectorTaskId, WorkerTask> entry : this.tasks.entrySet()) {
            if (entry.getKey().connector().equals(str)) {
                WorkerTask value = entry.getValue();
                withClassLoader = this.plugins.withClassLoader(value.loader());
                Throwable th3 = null;
                try {
                    try {
                        value.transitionTo(targetState);
                        if (withClassLoader != null) {
                            if (0 != 0) {
                                try {
                                    withClassLoader.close();
                                } catch (Throwable th4) {
                                    th3.addSuppressed(th4);
                                }
                            } else {
                                withClassLoader.close();
                            }
                        }
                    } finally {
                    }
                } finally {
                }
            }
        }
    }

    public void connectorOffsets(String str, Map<String, String> map, Callback<ConnectorOffsets> callback) {
        String str2 = map.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
        LoaderSwap withClassLoader = this.plugins.withClassLoader(this.plugins.connectorLoader(str2));
        Throwable th = null;
        try {
            Connector newConnector = this.plugins.newConnector(str2);
            if (ConnectUtils.isSinkConnector(newConnector)) {
                log.debug("Fetching offsets for sink connector: {}", str);
                sinkConnectorOffsets(str, newConnector, map, callback);
            } else {
                log.debug("Fetching offsets for source connector: {}", str);
                sourceConnectorOffsets(str, newConnector, map, callback);
            }
            if (withClassLoader != null) {
                if (0 == 0) {
                    withClassLoader.close();
                    return;
                }
                try {
                    withClassLoader.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (withClassLoader != null) {
                if (0 != 0) {
                    try {
                        withClassLoader.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    withClassLoader.close();
                }
            }
            throw th3;
        }
    }

    private void sinkConnectorOffsets(String str, Connector connector, Map<String, String> map, Callback<ConnectorOffsets> callback) {
        sinkConnectorOffsets(str, connector, map, callback, Admin::create);
    }

    void sinkConnectorOffsets(String str, Connector connector, Map<String, String> map, Callback<ConnectorOffsets> callback, Function<Map<String, Object>, Admin> function) {
        Map<String, Object> adminConfigs = adminConfigs(str, "connector-worker-adminclient-" + str, this.config, (ConnectorConfig) new SinkConnectorConfig(this.plugins, map), (Class<? extends Connector>) connector.getClass(), this.connectorClientConfigOverridePolicy, this.kafkaClusterId, ConnectorType.SINK);
        String str2 = (String) baseConsumerConfigs(str, "connector-consumer-", this.config, (ConnectorConfig) new SinkConnectorConfig(this.plugins, map), (Class<? extends Connector>) connector.getClass(), this.connectorClientConfigOverridePolicy, this.kafkaClusterId, ConnectorType.SINK).get("group.id");
        Admin apply = function.apply(adminConfigs);
        try {
            apply.listConsumerGroupOffsets(str2, new ListConsumerGroupOffsetsOptions().timeoutMs(Integer.valueOf((int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS))).partitionsToOffsetAndMetadata().whenComplete((map2, th) -> {
                if (th != null) {
                    log.error("Failed to retrieve consumer group offsets for sink connector {}", str, th);
                    callback.onCompletion(new ConnectException("Failed to retrieve consumer group offsets for sink connector " + str, th), null);
                } else {
                    callback.onCompletion(null, SinkUtils.consumerGroupOffsetsToConnectorOffsets(map2));
                }
                Utils.closeQuietly(apply, "Offset fetch admin for sink connector " + str);
            });
        } catch (Throwable th2) {
            Utils.closeQuietly(apply, "Offset fetch admin for sink connector " + str);
            callback.onCompletion(new ConnectException("Failed to retrieve consumer group offsets for sink connector " + str, th2), null);
        }
    }

    private void sourceConnectorOffsets(String str, Connector connector, Map<String, String> map, Callback<ConnectorOffsets> callback) {
        SourceConnectorConfig sourceConnectorConfig = new SourceConnectorConfig(this.plugins, map, this.config.topicCreationEnable());
        ConnectorOffsetBackingStore offsetStoreForExactlyOnceSourceConnector = this.config.exactlyOnceSourceEnabled() ? offsetStoreForExactlyOnceSourceConnector(sourceConnectorConfig, str, connector) : offsetStoreForRegularSourceConnector(sourceConnectorConfig, str, connector);
        sourceConnectorOffsets(str, offsetStoreForExactlyOnceSourceConnector, new OffsetStorageReaderImpl(offsetStoreForExactlyOnceSourceConnector, str, this.internalKeyConverter, this.internalValueConverter), callback);
    }

    void sourceConnectorOffsets(String str, ConnectorOffsetBackingStore connectorOffsetBackingStore, CloseableOffsetStorageReader closeableOffsetStorageReader, Callback<ConnectorOffsets> callback) {
        this.executor.submit(() -> {
            try {
                try {
                    connectorOffsetBackingStore.configure(this.config);
                    connectorOffsetBackingStore.start();
                    callback.onCompletion(null, new ConnectorOffsets((List) closeableOffsetStorageReader.offsets(connectorOffsetBackingStore.connectorPartitions(str)).entrySet().stream().map(entry -> {
                        return new ConnectorOffset((Map) entry.getKey(), (Map) entry.getValue());
                    }).collect(Collectors.toList())));
                    Utils.closeQuietly(closeableOffsetStorageReader, "Offset reader for connector " + str);
                    connectorOffsetBackingStore.getClass();
                    Utils.closeQuietly(connectorOffsetBackingStore::stop, "Offset store for connector " + str);
                } catch (Throwable th) {
                    callback.onCompletion(th, null);
                    Utils.closeQuietly(closeableOffsetStorageReader, "Offset reader for connector " + str);
                    connectorOffsetBackingStore.getClass();
                    Utils.closeQuietly(connectorOffsetBackingStore::stop, "Offset store for connector " + str);
                }
            } catch (Throwable th2) {
                Utils.closeQuietly(closeableOffsetStorageReader, "Offset reader for connector " + str);
                connectorOffsetBackingStore.getClass();
                Utils.closeQuietly(connectorOffsetBackingStore::stop, "Offset store for connector " + str);
                throw th2;
            }
        });
    }

    ConnectorStatusMetricsGroup connectorStatusMetricsGroup() {
        return this.connectorStatusMetricsGroup;
    }

    WorkerMetricsGroup workerMetricsGroup() {
        return this.workerMetricsGroup;
    }

    ConnectorOffsetBackingStore offsetStoreForRegularSourceConnector(SourceConnectorConfig sourceConnectorConfig, String str, Connector connector) {
        String offsetsTopic = sourceConnectorConfig.offsetsTopic();
        Map<String, Object> baseProducerConfigs = baseProducerConfigs(str, "connector-producer-" + str, this.config, (ConnectorConfig) sourceConnectorConfig, (Class<? extends Connector>) connector.getClass(), this.connectorClientConfigOverridePolicy, this.kafkaClusterId);
        if (!(offsetsTopic != null && this.config.connectorOffsetsTopicsPermitted())) {
            return ConnectorOffsetBackingStore.withOnlyWorkerStore(() -> {
                return LoggingContext.forConnector(str);
            }, this.globalOffsetBackingStore, this.config.offsetsTopic());
        }
        KafkaConsumer kafkaConsumer = new KafkaConsumer(regularSourceOffsetsConsumerConfigs(str, "connector-consumer-" + str, this.config, sourceConnectorConfig, connector.getClass(), this.connectorClientConfigOverridePolicy, this.kafkaClusterId));
        TopicAdmin topicAdmin = new TopicAdmin(adminConfigs(str, "connector-adminclient-" + str, this.config, (ConnectorConfig) sourceConnectorConfig, (Class<? extends Connector>) connector.getClass(), this.connectorClientConfigOverridePolicy, this.kafkaClusterId, ConnectorType.SOURCE));
        KafkaOffsetBackingStore forConnector = KafkaOffsetBackingStore.forConnector(offsetsTopic, kafkaConsumer, topicAdmin, this.internalKeyConverter);
        return sameOffsetTopicAsWorker(offsetsTopic, baseProducerConfigs) ? ConnectorOffsetBackingStore.withOnlyConnectorStore(() -> {
            return LoggingContext.forConnector(str);
        }, forConnector, offsetsTopic, topicAdmin) : ConnectorOffsetBackingStore.withConnectorAndWorkerStores(() -> {
            return LoggingContext.forConnector(str);
        }, this.globalOffsetBackingStore, forConnector, offsetsTopic, topicAdmin);
    }

    ConnectorOffsetBackingStore offsetStoreForExactlyOnceSourceConnector(SourceConnectorConfig sourceConnectorConfig, String str, Connector connector) {
        String str2 = (String) Optional.ofNullable(sourceConnectorConfig.offsetsTopic()).orElse(this.config.offsetsTopic());
        Map<String, Object> baseProducerConfigs = baseProducerConfigs(str, "connector-producer-" + str, this.config, (ConnectorConfig) sourceConnectorConfig, (Class<? extends Connector>) connector.getClass(), this.connectorClientConfigOverridePolicy, this.kafkaClusterId);
        KafkaConsumer kafkaConsumer = new KafkaConsumer(exactlyOnceSourceOffsetsConsumerConfigs(str, "connector-consumer-" + str, this.config, sourceConnectorConfig, connector.getClass(), this.connectorClientConfigOverridePolicy, this.kafkaClusterId));
        TopicAdmin topicAdmin = new TopicAdmin(adminConfigs(str, "connector-adminclient-" + str, this.config, (ConnectorConfig) sourceConnectorConfig, (Class<? extends Connector>) connector.getClass(), this.connectorClientConfigOverridePolicy, this.kafkaClusterId, ConnectorType.SOURCE));
        KafkaOffsetBackingStore forConnector = KafkaOffsetBackingStore.forConnector(str2, kafkaConsumer, topicAdmin, this.internalKeyConverter);
        return sameOffsetTopicAsWorker(str2, baseProducerConfigs) ? ConnectorOffsetBackingStore.withOnlyConnectorStore(() -> {
            return LoggingContext.forConnector(str);
        }, forConnector, str2, topicAdmin) : ConnectorOffsetBackingStore.withConnectorAndWorkerStores(() -> {
            return LoggingContext.forConnector(str);
        }, this.globalOffsetBackingStore, forConnector, str2, topicAdmin);
    }

    ConnectorOffsetBackingStore offsetStoreForRegularSourceTask(ConnectorTaskId connectorTaskId, SourceConnectorConfig sourceConnectorConfig, Class<? extends Connector> cls, Producer<byte[], byte[]> producer, Map<String, Object> map, TopicAdmin topicAdmin) {
        String offsetsTopic = sourceConnectorConfig.offsetsTopic();
        if (!regularSourceTaskUsesConnectorSpecificOffsetsStore(sourceConnectorConfig)) {
            return ConnectorOffsetBackingStore.withOnlyWorkerStore(() -> {
                return LoggingContext.forTask(connectorTaskId);
            }, this.globalOffsetBackingStore, this.config.offsetsTopic());
        }
        Objects.requireNonNull(topicAdmin, "Source tasks require a non-null topic admin when configured to use their own offsets topic");
        KafkaOffsetBackingStore forTask = KafkaOffsetBackingStore.forTask(sourceConnectorConfig.offsetsTopic(), producer, new KafkaConsumer(regularSourceOffsetsConsumerConfigs(connectorTaskId.connector(), "connector-consumer-" + connectorTaskId, this.config, sourceConnectorConfig, cls, this.connectorClientConfigOverridePolicy, this.kafkaClusterId)), topicAdmin, this.internalKeyConverter);
        return sameOffsetTopicAsWorker(sourceConnectorConfig.offsetsTopic(), map) ? ConnectorOffsetBackingStore.withOnlyConnectorStore(() -> {
            return LoggingContext.forTask(connectorTaskId);
        }, forTask, offsetsTopic, topicAdmin) : ConnectorOffsetBackingStore.withConnectorAndWorkerStores(() -> {
            return LoggingContext.forTask(connectorTaskId);
        }, this.globalOffsetBackingStore, forTask, offsetsTopic, topicAdmin);
    }

    ConnectorOffsetBackingStore offsetStoreForExactlyOnceSourceTask(ConnectorTaskId connectorTaskId, SourceConnectorConfig sourceConnectorConfig, Class<? extends Connector> cls, Producer<byte[], byte[]> producer, Map<String, Object> map, TopicAdmin topicAdmin) {
        Objects.requireNonNull(topicAdmin, "Source tasks require a non-null topic admin when exactly-once support is enabled");
        KafkaConsumer kafkaConsumer = new KafkaConsumer(exactlyOnceSourceOffsetsConsumerConfigs(connectorTaskId.connector(), "connector-consumer-" + connectorTaskId, this.config, sourceConnectorConfig, cls, this.connectorClientConfigOverridePolicy, this.kafkaClusterId));
        String str = (String) Optional.ofNullable(sourceConnectorConfig.offsetsTopic()).orElse(this.config.offsetsTopic());
        KafkaOffsetBackingStore forTask = KafkaOffsetBackingStore.forTask(str, producer, kafkaConsumer, topicAdmin, this.internalKeyConverter);
        return sameOffsetTopicAsWorker(str, map) ? ConnectorOffsetBackingStore.withOnlyConnectorStore(() -> {
            return LoggingContext.forTask(connectorTaskId);
        }, forTask, str, topicAdmin) : ConnectorOffsetBackingStore.withConnectorAndWorkerStores(() -> {
            return LoggingContext.forTask(connectorTaskId);
        }, this.globalOffsetBackingStore, forTask, str, topicAdmin);
    }

    private boolean sameOffsetTopicAsWorker(String str, Map<String, Object> map) {
        HashSet hashSet = new HashSet(this.config.getList("bootstrap.servers"));
        HashSet hashSet2 = new HashSet();
        try {
            hashSet2.addAll((List) ConfigDef.parseType("bootstrap.servers", map.getOrDefault("bootstrap.servers", "").toString(), ConfigDef.Type.LIST));
            return str.equals(this.config.offsetsTopic()) && hashSet.equals(hashSet2);
        } catch (Exception e) {
            throw new ConnectException("Failed to parse bootstrap servers property in producer config", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean regularSourceTaskUsesConnectorSpecificOffsetsStore(SourceConnectorConfig sourceConnectorConfig) {
        return sourceConnectorConfig.offsetsTopic() != null && this.config.connectorOffsetsTopicsPermitted();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean sourceConnectorTopicCreationEnabled(SourceConnectorConfig sourceConnectorConfig) {
        return this.config.topicCreationEnable() && sourceConnectorConfig.usesTopicCreation();
    }

    /* JADX INFO: Access modifiers changed from: private */
    @Confluent
    public Optional<Tracer> connectorTracer(ConnectorConfig connectorConfig, ConnectorTaskId connectorTaskId, KafkaProducer<byte[], byte[]> kafkaProducer, TopicAdmin topicAdmin) {
        TracerConfig tracerConfig = new TracerConfig(this.plugins, connectorConfig);
        if (!tracerConfig.isTracingEnabled()) {
            return Optional.empty();
        }
        log.info("Setting up tracing for connector '{}' and task {}", connectorTaskId.connector(), Integer.valueOf(connectorTaskId.task()));
        Class<? extends Connector> connectorClass = this.plugins.connectorClass(connectorConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
        if (kafkaProducer == null) {
            Map<String, Object> baseProducerConfigs = baseProducerConfigs(connectorTaskId, "Connector-Tracer-Producer-" + connectorTaskId, this.config, connectorConfig, connectorClass, this.connectorClientConfigOverridePolicy, this.kafkaClusterId);
            ProducerConfig producerConfig = new ProducerConfig(baseProducerConfigs);
            baseProducerConfigs.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.valueOf(2 * (producerConfig.getLong(ProducerConfig.LINGER_MS_CONFIG).intValue() + producerConfig.getInt("request.timeout.ms").intValue())));
            log.info("Creating new producer for sending trace records");
            kafkaProducer = new KafkaProducer<>(baseProducerConfigs);
        }
        if (topicAdmin == null) {
            log.info("Creating new admin client for creating trace topic(s)");
            topicAdmin = new TopicAdmin(adminConfigs(connectorTaskId, "Connector-Tracer-Admin-" + connectorTaskId, this.config, connectorConfig, connectorClass, this.connectorClientConfigOverridePolicy, this.kafkaClusterId, ConnectorType.SINK));
        }
        return Optional.of(new ConnectTracer(connectorTaskId, tracerConfig, kafkaProducer, topicAdmin));
    }
}
