package org.apache.kafka.connect.runtime;

import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.common.network.ClientInformation;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.AbstractStatus;
import org.apache.kafka.connect.runtime.ConnectMetrics;
import org.apache.kafka.connect.runtime.ConnectorStatus;
import org.apache.kafka.connect.sink.SinkConnectorContext;
import org.apache.kafka.connect.source.SourceConnectorContext;
import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
import org.apache.kafka.connect.storage.ConnectorOffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.LoggingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/connect/runtime/WorkerConnector.class */
public class WorkerConnector implements Runnable {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) WorkerConnector.class);
    private static final String THREAD_NAME_PREFIX = "connector-thread-";
    private final String connName;
    private final Map<String, String> config;
    private final ConnectorStatus.Listener statusListener;
    private final ClassLoader loader;
    private final CloseableConnectorContext ctx;
    private final Connector connector;
    private final ConnectorMetricsGroup metrics;
    private final CloseableOffsetStorageReader offsetStorageReader;
    private final ConnectorOffsetBackingStore offsetStore;
    private State state = State.INIT;
    private final AtomicReference<TargetState> pendingTargetStateChange = new AtomicReference<>();
    private final AtomicReference<Callback<TargetState>> pendingStateChangeCallback = new AtomicReference<>();
    private final CountDownLatch shutdownLatch = new CountDownLatch(1);
    private volatile boolean stopping = false;
    private volatile boolean cancelled = false;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/connect/runtime/WorkerConnector$ConnectorMetricsGroup.class */
    public class ConnectorMetricsGroup implements ConnectorStatus.Listener, AutoCloseable {
        private volatile AbstractStatus.State state;
        private final ConnectMetrics.MetricGroup metricGroup;
        private final ConnectorStatus.Listener delegate;
        private final ConnectMetricsRegistry registry;

        /* JADX INFO: Access modifiers changed from: protected */
        public void addHerderMetrics(Herder herder) {
            this.metricGroup.addValueMetric(this.registry.legacyConnectorTotalTaskCount, j -> {
                if (herder.connectorStatus(WorkerConnector.this.connName) == null) {
                    return null;
                }
                return Long.valueOf(r0.tasks().size());
            });
        }

        public ConnectorMetricsGroup(ConnectMetrics connectMetrics, AbstractStatus.State state, ConnectorStatus.Listener listener) {
            Objects.requireNonNull(connectMetrics);
            Objects.requireNonNull(WorkerConnector.this.connector);
            Objects.requireNonNull(state);
            Objects.requireNonNull(listener);
            this.delegate = listener;
            this.state = state;
            this.registry = connectMetrics.registry();
            this.metricGroup = connectMetrics.group(this.registry.connectorGroupName(), this.registry.connectorTagName(), WorkerConnector.this.connName);
            this.metricGroup.close();
            this.metricGroup.addImmutableValueMetric(this.registry.connectorType, WorkerConnector.this.connectorType());
            this.metricGroup.addImmutableValueMetric(this.registry.connectorClass, WorkerConnector.this.connector.getClass().getName());
            this.metricGroup.addImmutableValueMetric(this.registry.connectorVersion, WorkerConnector.this.connector.version());
            this.metricGroup.addValueMetric(this.registry.connectorStatus, j -> {
                return this.state.toString().toLowerCase(Locale.getDefault());
            });
        }

        @Override // java.lang.AutoCloseable
        public void close() {
            this.metricGroup.close();
        }

        @Override // org.apache.kafka.connect.runtime.ConnectorStatus.Listener
        public void onStartup(String str) {
            this.state = AbstractStatus.State.RUNNING;
            synchronized (this) {
                if (!WorkerConnector.this.cancelled) {
                    this.delegate.onStartup(str);
                }
            }
        }

        @Override // org.apache.kafka.connect.runtime.ConnectorStatus.Listener
        public void onShutdown(String str) {
            this.state = AbstractStatus.State.UNASSIGNED;
            synchronized (this) {
                if (!WorkerConnector.this.cancelled) {
                    this.delegate.onShutdown(str);
                }
            }
        }

        @Override // org.apache.kafka.connect.runtime.ConnectorStatus.Listener
        public void onPause(String str) {
            this.state = AbstractStatus.State.PAUSED;
            synchronized (this) {
                if (!WorkerConnector.this.cancelled) {
                    this.delegate.onPause(str);
                }
            }
        }

        @Override // org.apache.kafka.connect.runtime.ConnectorStatus.Listener
        public void onResume(String str) {
            this.state = AbstractStatus.State.RUNNING;
            synchronized (this) {
                if (!WorkerConnector.this.cancelled) {
                    this.delegate.onResume(str);
                }
            }
        }

        @Override // org.apache.kafka.connect.runtime.ConnectorStatus.Listener
        public void onFailure(String str, Throwable th) {
            this.state = AbstractStatus.State.FAILED;
            synchronized (this) {
                if (!WorkerConnector.this.cancelled) {
                    this.delegate.onFailure(str, th);
                }
            }
        }

        @Override // org.apache.kafka.connect.runtime.ConnectorStatus.Listener
        public void onDeletion(String str) {
            this.state = AbstractStatus.State.DESTROYED;
            this.delegate.onDeletion(str);
        }

        @Override // org.apache.kafka.connect.runtime.ConnectorStatus.Listener
        public void onRestart(String str) {
            this.state = AbstractStatus.State.RESTARTING;
            this.delegate.onRestart(str);
        }

        boolean isUnassigned() {
            return this.state == AbstractStatus.State.UNASSIGNED;
        }

        boolean isRunning() {
            return this.state == AbstractStatus.State.RUNNING;
        }

        boolean isPaused() {
            return this.state == AbstractStatus.State.PAUSED;
        }

        boolean isFailed() {
            return this.state == AbstractStatus.State.FAILED;
        }

        protected ConnectMetrics.MetricGroup metricGroup() {
            return this.metricGroup;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/connect/runtime/WorkerConnector$State.class */
    public enum State {
        INIT,
        STOPPED,
        STARTED,
        FAILED
    }

    /* loaded from: input_file:org/apache/kafka/connect/runtime/WorkerConnector$WorkerConnectorContext.class */
    private abstract class WorkerConnectorContext implements ConnectorContext {
        private WorkerConnectorContext() {
        }

        @Override // org.apache.kafka.connect.connector.ConnectorContext
        public void requestTaskReconfiguration() {
            WorkerConnector.this.ctx.requestTaskReconfiguration();
        }

        @Override // org.apache.kafka.connect.connector.ConnectorContext
        public void raiseError(Exception exc) {
            WorkerConnector.log.error("{} Connector raised an error", WorkerConnector.this, exc);
            WorkerConnector.this.onFailure(exc);
            WorkerConnector.this.ctx.raiseError(exc);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/connect/runtime/WorkerConnector$WorkerSinkConnectorContext.class */
    public class WorkerSinkConnectorContext extends WorkerConnectorContext implements SinkConnectorContext {
        private WorkerSinkConnectorContext() {
            super();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/connect/runtime/WorkerConnector$WorkerSourceConnectorContext.class */
    public class WorkerSourceConnectorContext extends WorkerConnectorContext implements SourceConnectorContext {
        private final OffsetStorageReader offsetStorageReader;

        WorkerSourceConnectorContext(OffsetStorageReader offsetStorageReader) {
            super();
            this.offsetStorageReader = offsetStorageReader;
        }

        @Override // org.apache.kafka.connect.source.SourceConnectorContext
        public OffsetStorageReader offsetStorageReader() {
            return this.offsetStorageReader;
        }
    }

    public WorkerConnector(String str, Connector connector, ConnectorConfig connectorConfig, CloseableConnectorContext closeableConnectorContext, ConnectMetrics connectMetrics, ConnectorStatus.Listener listener, CloseableOffsetStorageReader closeableOffsetStorageReader, ConnectorOffsetBackingStore connectorOffsetBackingStore, ClassLoader classLoader) {
        this.connName = str;
        this.config = connectorConfig.originalsStrings();
        this.loader = classLoader;
        this.ctx = closeableConnectorContext;
        this.connector = connector;
        this.metrics = new ConnectorMetricsGroup(connectMetrics, AbstractStatus.State.UNASSIGNED, listener);
        this.statusListener = this.metrics;
        this.offsetStorageReader = closeableOffsetStorageReader;
        this.offsetStore = connectorOffsetBackingStore;
    }

    public ClassLoader loader() {
        return this.loader;
    }

    @Override // java.lang.Runnable
    public void run() {
        LoggingContext.clear();
        try {
            LoggingContext forConnector = LoggingContext.forConnector(this.connName);
            Throwable th = null;
            try {
                String name = Thread.currentThread().getName();
                try {
                    Thread.currentThread().setName(THREAD_NAME_PREFIX + this.connName);
                    doRun();
                    Thread.currentThread().setName(name);
                    if (forConnector != null) {
                        if (0 != 0) {
                            try {
                                forConnector.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            forConnector.close();
                        }
                    }
                } catch (Throwable th3) {
                    Thread.currentThread().setName(name);
                    throw th3;
                }
            } finally {
            }
        } finally {
            this.shutdownLatch.countDown();
        }
    }

    void doRun() {
        TargetState andSet;
        Callback<TargetState> andSet2;
        initialize();
        while (!this.stopping) {
            synchronized (this) {
                andSet = this.pendingTargetStateChange.getAndSet(null);
                andSet2 = this.pendingStateChangeCallback.getAndSet(null);
            }
            if (andSet != null && !this.stopping) {
                doTransitionTo(andSet, andSet2);
            }
            synchronized (this) {
                if (this.pendingTargetStateChange.get() == null && !this.stopping) {
                    try {
                        wait();
                    } catch (InterruptedException e) {
                    }
                }
            }
        }
        doShutdown();
    }

    void initialize() {
        try {
            if (!isSourceConnector() && !isSinkConnector()) {
                throw new ConnectException("Connector implementations must be a subclass of either SourceConnector or SinkConnector");
            }
            log.debug("{} Initializing connector {}", this, this.connName);
            if (isSinkConnector()) {
                SinkConnectorConfig.validate(this.config);
                this.connector.initialize(new WorkerSinkConnectorContext());
            } else {
                Objects.requireNonNull(this.offsetStore, "Offset store cannot be null for source connectors");
                Objects.requireNonNull(this.offsetStorageReader, "Offset reader cannot be null for source connectors");
                this.offsetStore.start();
                this.connector.initialize(new WorkerSourceConnectorContext(this.offsetStorageReader));
            }
        } catch (Throwable th) {
            log.error("{} Error initializing connector", this, th);
            onFailure(th);
        }
    }

    private boolean doStart() throws Throwable {
        try {
            switch (this.state) {
                case STARTED:
                    return false;
                case INIT:
                case STOPPED:
                    this.connector.start(this.config);
                    this.state = State.STARTED;
                    return true;
                default:
                    throw new IllegalArgumentException("Cannot start connector in state " + this.state);
            }
        } catch (Throwable th) {
            log.error("{} Error while starting connector", this, th);
            onFailure(th);
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void onFailure(Throwable th) {
        this.statusListener.onFailure(this.connName, th);
        this.state = State.FAILED;
    }

    private void resume() throws Throwable {
        if (doStart()) {
            this.statusListener.onResume(this.connName);
        }
    }

    private void start() throws Throwable {
        if (doStart()) {
            this.statusListener.onStartup(this.connName);
        }
    }

    public boolean isRunning() {
        return this.state == State.STARTED;
    }

    private void pause() {
        try {
            switch (this.state) {
                case STARTED:
                    this.connector.stop();
                    break;
                case INIT:
                    break;
                case STOPPED:
                    return;
                default:
                    throw new IllegalArgumentException("Cannot pause connector in state " + this.state);
            }
            this.statusListener.onPause(this.connName);
            this.state = State.STOPPED;
        } catch (Throwable th) {
            log.error("{} Error while shutting down connector", this, th);
            this.statusListener.onFailure(this.connName, th);
            this.state = State.FAILED;
        }
    }

    public synchronized void shutdown() {
        log.info("Scheduled shutdown for {}", this);
        this.stopping = true;
        notify();
    }

    void doShutdown() {
        try {
            try {
                TargetState andSet = this.pendingTargetStateChange.getAndSet(null);
                Callback<TargetState> andSet2 = this.pendingStateChangeCallback.getAndSet(null);
                if (andSet2 != null) {
                    andSet2.onCompletion(new ConnectException("Could not begin changing connector state to " + andSet.name() + " as the connector has been scheduled for shutdown"), null);
                }
                if (this.state == State.STARTED) {
                    this.connector.stop();
                }
                this.state = State.STOPPED;
                this.statusListener.onShutdown(this.connName);
                log.info("Completed shutdown for {}", this);
                Utils.closeQuietly(this.ctx, "connector context for " + this.connName);
                Utils.closeQuietly(this.metrics, "connector metrics for " + this.connName);
                Utils.closeQuietly(this.offsetStorageReader, "offset reader for " + this.connName);
                if (this.offsetStore != null) {
                    ConnectorOffsetBackingStore connectorOffsetBackingStore = this.offsetStore;
                    connectorOffsetBackingStore.getClass();
                    Utils.closeQuietly(connectorOffsetBackingStore::stop, "offset backing store for " + this.connName);
                }
            } catch (Throwable th) {
                log.error("{} Error while shutting down connector", this, th);
                this.state = State.FAILED;
                this.statusListener.onFailure(this.connName, th);
                Utils.closeQuietly(this.ctx, "connector context for " + this.connName);
                Utils.closeQuietly(this.metrics, "connector metrics for " + this.connName);
                Utils.closeQuietly(this.offsetStorageReader, "offset reader for " + this.connName);
                if (this.offsetStore != null) {
                    ConnectorOffsetBackingStore connectorOffsetBackingStore2 = this.offsetStore;
                    connectorOffsetBackingStore2.getClass();
                    Utils.closeQuietly(connectorOffsetBackingStore2::stop, "offset backing store for " + this.connName);
                }
            }
        } catch (Throwable th2) {
            Utils.closeQuietly(this.ctx, "connector context for " + this.connName);
            Utils.closeQuietly(this.metrics, "connector metrics for " + this.connName);
            Utils.closeQuietly(this.offsetStorageReader, "offset reader for " + this.connName);
            if (this.offsetStore != null) {
                ConnectorOffsetBackingStore connectorOffsetBackingStore3 = this.offsetStore;
                connectorOffsetBackingStore3.getClass();
                Utils.closeQuietly(connectorOffsetBackingStore3::stop, "offset backing store for " + this.connName);
            }
            throw th2;
        }
    }

    public synchronized void cancel() {
        this.statusListener.onShutdown(this.connName);
        Utils.closeQuietly(this.ctx, "connector context for " + this.connName);
        Utils.closeQuietly(this.offsetStorageReader, "offset reader for " + this.connName);
        this.cancelled = true;
    }

    public boolean awaitShutdown(long j) {
        try {
            return this.shutdownLatch.await(j, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            return false;
        }
    }

    public void transitionTo(TargetState targetState, Callback<TargetState> callback) {
        Callback<TargetState> andSet;
        TargetState andSet2;
        synchronized (this) {
            andSet = this.pendingStateChangeCallback.getAndSet(callback);
            andSet2 = this.pendingTargetStateChange.getAndSet(targetState);
            notify();
        }
        if (andSet != null) {
            andSet.onCompletion(new ConnectException("Could not begin changing connector state to " + andSet2.name() + " before another request to change state was made; the new request (which is to change the state to " + targetState.name() + ") has pre-empted this one"), null);
        }
    }

    void doTransitionTo(TargetState targetState, Callback<TargetState> callback) {
        if (this.state == State.FAILED) {
            callback.onCompletion(new ConnectException(this + " Cannot transition connector to " + targetState + " since it has failed"), null);
            return;
        }
        try {
            doTransitionTo(targetState);
            callback.onCompletion(null, targetState);
        } catch (Throwable th) {
            callback.onCompletion(new ConnectException("Failed to transition connector " + this.connName + " to state " + targetState, th), null);
        }
    }

    private void doTransitionTo(TargetState targetState) throws Throwable {
        log.debug("{} Transition connector to {}", this, targetState);
        if (targetState == TargetState.PAUSED) {
            pause();
        } else {
            if (targetState != TargetState.STARTED) {
                throw new IllegalArgumentException("Unhandled target state " + targetState);
            }
            if (this.state == State.INIT) {
                start();
            } else {
                resume();
            }
        }
    }

    public boolean isSinkConnector() {
        return ConnectUtils.isSinkConnector(this.connector);
    }

    public boolean isSourceConnector() {
        return ConnectUtils.isSourceConnector(this.connector);
    }

    protected String connectorType() {
        return isSinkConnector() ? "sink" : isSourceConnector() ? "source" : ClientInformation.UNKNOWN_NAME_OR_VERSION;
    }

    public Connector connector() {
        return this.connector;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ConnectorMetricsGroup metrics() {
        return this.metrics;
    }

    public String toString() {
        return "WorkerConnector{id=" + this.connName + '}';
    }
}
