package org.apache.kafka.connect.runtime.standalone;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.errors.AlreadyExistsException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.runtime.AbstractHerder;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.HerderConnectorContext;
import org.apache.kafka.connect.runtime.HerderRequest;
import org.apache.kafka.connect.runtime.SessionKey;
import org.apache.kafka.connect.runtime.SinkConnectorConfig;
import org.apache.kafka.connect.runtime.SourceConnectorConfig;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
import org.apache.kafka.connect.runtime.rest.InternalRequestSignature;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.storage.MemoryConfigBackingStore;
import org.apache.kafka.connect.storage.MemoryStatusBackingStore;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link Herder} implementation that manages connectors and tasks inside a single process.
 *
 * <p>The public constructor wires the herder to a {@link MemoryStatusBackingStore} and a
 * {@link MemoryConfigBackingStore}. A cached {@link ClusterConfigState} snapshot is refreshed by
 * {@link ConfigUpdateListener} whenever the config store reports a change. Delayed connector
 * restarts are run on a dedicated single-threaded scheduler.
 *
 * <p>Most state-touching methods are {@code synchronized} on the herder instance, and the config
 * listener takes the same monitor before replacing the cached snapshot.
 */
public class StandaloneHerder extends AbstractHerder {
    private static final Logger log = LoggerFactory.getLogger(StandaloneHerder.class);

    /** Source of the sequence numbers that identify {@link StandaloneHerderRequest}s. */
    private final AtomicLong requestSeqNum;

    /** Single-threaded scheduler that runs delayed restart requests. */
    private final ScheduledExecutorService requestExecutorService;

    /** Most recent snapshot taken from the config backing store. */
    private ClusterConfigState configState;

    /**
     * Listener registered on the config backing store. Every notification re-reads the store's
     * snapshot while holding the herder monitor, so the cached {@code configState} is replaced
     * atomically with respect to the herder's {@code synchronized} methods.
     */
    private class ConfigUpdateListener implements ConfigBackingStore.UpdateListener {
        private ConfigUpdateListener() {
        }

        /** Replaces the cached config state with a fresh snapshot from the backing store. */
        private void refreshConfigState() {
            synchronized (StandaloneHerder.this) {
                configState = configBackingStore.snapshot();
            }
        }

        @Override // org.apache.kafka.connect.storage.ConfigBackingStore.UpdateListener
        public void onConnectorConfigRemove(String connectorName) {
            refreshConfigState();
        }

        @Override // org.apache.kafka.connect.storage.ConfigBackingStore.UpdateListener
        public void onConnectorConfigUpdate(String connectorName) {
            refreshConfigState();
        }

        @Override // org.apache.kafka.connect.storage.ConfigBackingStore.UpdateListener
        public void onTaskConfigUpdate(Collection<ConnectorTaskId> taskIds) {
            refreshConfigState();
        }

        /**
         * Refreshes the snapshot, pushes the connector's new target state to the worker and, when
         * the connector is now {@link TargetState#STARTED}, recomputes its task configurations.
         */
        @Override // org.apache.kafka.connect.storage.ConfigBackingStore.UpdateListener
        public void onConnectorTargetStateChange(String connectorName) {
            synchronized (StandaloneHerder.this) {
                configState = configBackingStore.snapshot();
                TargetState targetState = configState.targetState(connectorName);
                worker.setTargetState(connectorName, targetState);
                if (targetState == TargetState.STARTED) {
                    updateConnectorTasks(connectorName);
                }
            }
        }

        @Override // org.apache.kafka.connect.storage.ConfigBackingStore.UpdateListener
        public void onSessionKeyUpdate(SessionKey sessionKey) {
            // Intentionally a no-op: this herder does nothing with session keys.
        }
    }

    /**
     * Handle for a scheduled herder request. Identity (equals/hashCode) is based solely on the
     * sequence number; cancelling delegates to the underlying scheduled future.
     */
    static class StandaloneHerderRequest implements HerderRequest {
        private final long seq;
        private final ScheduledFuture<?> future;

        /**
         * @param seq    sequence number identifying this request
         * @param future the scheduled work backing this request
         */
        public StandaloneHerderRequest(long seq, ScheduledFuture<?> future) {
            this.seq = seq;
            this.future = future;
        }

        /** Cancels the scheduled work without interrupting it if it is already running. */
        @Override // org.apache.kafka.connect.runtime.HerderRequest
        public void cancel() {
            this.future.cancel(false);
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof StandaloneHerderRequest)) {
                return false;
            }
            return this.seq == ((StandaloneHerderRequest) obj).seq;
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.seq);
        }
    }

    /**
     * Creates a herder backed by a new {@link MemoryStatusBackingStore} and a new
     * {@link MemoryConfigBackingStore}, using {@code worker.workerId()} as the worker id.
     *
     * @param worker                             the worker that runs connectors and tasks
     * @param kafkaClusterId                     forwarded unchanged to the {@link AbstractHerder}
     *                                           constructor (presumably the Kafka cluster id; confirm
     *                                           against AbstractHerder)
     * @param connectorClientConfigOverridePolicy policy forwarded to {@link AbstractHerder}
     */
    public StandaloneHerder(Worker worker, String kafkaClusterId, ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
        this(worker, worker.workerId(), kafkaClusterId, new MemoryStatusBackingStore(), new MemoryConfigBackingStore(worker.configTransformer()), connectorClientConfigOverridePolicy);
    }

    /**
     * Package-private constructor that allows the backing stores to be supplied explicitly.
     * Registers a {@link ConfigUpdateListener} on the given config store.
     */
    StandaloneHerder(Worker worker, String workerId, String kafkaClusterId, StatusBackingStore statusBackingStore, MemoryConfigBackingStore configBackingStore, ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
        super(worker, workerId, kafkaClusterId, statusBackingStore, configBackingStore, connectorClientConfigOverridePolicy);
        this.requestSeqNum = new AtomicLong();
        this.configState = ClusterConfigState.EMPTY;
        this.requestExecutorService = Executors.newSingleThreadScheduledExecutor();
        configBackingStore.setUpdateListener(new ConfigUpdateListener());
    }

    /** Starts the services provided by the base herder. */
    @Override // org.apache.kafka.connect.runtime.Herder
    public synchronized void start() {
        log.info("Herder starting");
        startServices();
        log.info("Herder started");
    }

    /**
     * Shuts down the request scheduler (waiting up to 30 seconds for it to terminate), stops every
     * connector together with its tasks, and finally stops the base herder services.
     *
     * <p>If the calling thread is interrupted while waiting for the scheduler, outstanding requests
     * are cancelled, the remaining shutdown steps still run, and the thread's interrupt status is
     * restored before this method returns.
     */
    @Override // org.apache.kafka.connect.runtime.Herder
    public synchronized void stop() {
        log.info("Herder stopping");
        boolean interrupted = false;
        requestExecutorService.shutdown();
        try {
            if (!requestExecutorService.awaitTermination(30L, TimeUnit.SECONDS)) {
                requestExecutorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            // Do not lose the interrupt: cancel outstanding requests now and re-assert the flag
            // once cleanup is finished, so the cleanup itself is not cut short by it.
            requestExecutorService.shutdownNow();
            interrupted = true;
        }
        try {
            for (String connectorName : connectors()) {
                removeConnectorTasks(connectorName);
                worker.stopConnector(connectorName);
            }
            stopServices();
            log.info("Herder stopped");
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** @return always {@code 0} for this herder */
    @Override // org.apache.kafka.connect.runtime.AbstractHerder
    public int generation() {
        return 0;
    }

    /** Completes the callback with the current collection of connector names. */
    @Override // org.apache.kafka.connect.runtime.Herder
    public synchronized void connectors(Callback<Collection<String>> callback) {
        callback.onCompletion(null, connectors());
    }

    /**
     * Completes the callback with the connector's info, or with a {@link NotFoundException} when
     * no info is available for that name.
     */
    @Override // org.apache.kafka.connect.runtime.Herder
    public synchronized void connectorInfo(String connectorName, Callback<ConnectorInfo> callback) {
        ConnectorInfo info = connectorInfo(connectorName);
        if (info == null) {
            callback.onCompletion(new NotFoundException("Connector " + connectorName + " not found"), null);
        } else {
            callback.onCompletion(null, info);
        }
    }

    /**
     * Builds a {@link ConnectorInfo} from the cached config state.
     *
     * @return the info, or {@code null} if the cached state does not contain the connector
     */
    private ConnectorInfo createConnectorInfo(String connectorName) {
        if (!configState.contains(connectorName)) {
            return null;
        }
        Map<String, String> rawConfig = configState.rawConnectorConfig(connectorName);
        return new ConnectorInfo(connectorName, rawConfig, configState.tasks(connectorName), connectorTypeForClass(rawConfig.get("connector.class")));
    }

    /** @return the connector's configuration as held in the cached config state */
    @Override // org.apache.kafka.connect.runtime.AbstractHerder
    protected Map<String, String> config(String connectorName) {
        return configState.connectorConfig(connectorName);
    }

    /**
     * Completes the callback with the connector's configuration, obtained via
     * {@link #connectorInfo(String, Callback)}; any error from that lookup is passed through.
     */
    @Override // org.apache.kafka.connect.runtime.Herder
    public void connectorConfig(String connectorName, final Callback<Map<String, String>> callback) {
        connectorInfo(connectorName, new Callback<ConnectorInfo>() {
            @Override // org.apache.kafka.connect.util.Callback
            public void onCompletion(Throwable error, ConnectorInfo info) {
                if (error != null) {
                    callback.onCompletion(error, null);
                } else {
                    callback.onCompletion(null, info.config());
                }
            }
        });
    }

    /**
     * Stops the connector and its tasks and removes its configuration from the backing store.
     * Completes the callback with a {@link NotFoundException} if the connector is unknown, or with
     * the {@link ConnectException} raised while removing it.
     */
    @Override // org.apache.kafka.connect.runtime.Herder
    public synchronized void deleteConnectorConfig(String connectorName, Callback<Herder.Created<ConnectorInfo>> callback) {
        try {
            if (!configState.contains(connectorName)) {
                callback.onCompletion(new NotFoundException("Connector " + connectorName + " not found", null), null);
                return;
            }
            removeConnectorTasks(connectorName);
            worker.stopConnector(connectorName);
            configBackingStore.removeConnectorConfig(connectorName);
            onDeletion(connectorName);
            callback.onCompletion(null, new Herder.Created<>(false, null));
        } catch (ConnectException e) {
            callback.onCompletion(e, null);
        }
    }

    /**
     * Validates and stores a connector configuration, then (re)starts the connector and its tasks.
     *
     * @param connectorName name of the connector to create or update
     * @param config        the connector configuration to store
     * @param allowReplace  when {@code false} and the connector already exists, the callback is
     *                      completed with an {@link AlreadyExistsException} and nothing is changed
     * @param callback      completed with a {@link Herder.Created} whose flag is {@code true} when
     *                      the connector did not exist before, or with the error that occurred
     */
    @Override // org.apache.kafka.connect.runtime.Herder
    public synchronized void putConnectorConfig(String connectorName, Map<String, String> config, boolean allowReplace, Callback<Herder.Created<ConnectorInfo>> callback) {
        try {
            if (maybeAddConfigErrors(validateConnectorConfig(config), callback)) {
                return;
            }
            boolean created = !configState.contains(connectorName);
            if (!created) {
                if (!allowReplace) {
                    callback.onCompletion(new AlreadyExistsException("Connector " + connectorName + " already exists"), null);
                    return;
                }
                // Replacing an existing connector: stop it before the new config is stored.
                worker.stopConnector(connectorName);
            }
            configBackingStore.putConnectorConfig(connectorName, config);
            if (!startConnector(connectorName)) {
                callback.onCompletion(new ConnectException("Failed to start connector: " + connectorName), null);
            } else {
                updateConnectorTasks(connectorName);
                callback.onCompletion(null, new Herder.Created<>(created, createConnectorInfo(connectorName)));
            }
        } catch (ConnectException e) {
            callback.onCompletion(e, null);
        }
    }

    /**
     * Recomputes the task configurations of the named connector if the worker knows it;
     * otherwise only logs an error.
     */
    @Override // org.apache.kafka.connect.runtime.Herder
    public synchronized void requestTaskReconfiguration(String connectorName) {
        if (worker.connectorNames().contains(connectorName)) {
            updateConnectorTasks(connectorName);
        } else {
            log.error("Task that requested reconfiguration does not exist: {}", connectorName);
        }
    }

    /**
     * Completes the callback with one {@link TaskInfo} (task id plus raw task config) per task of
     * the connector, or with a {@link NotFoundException} if the connector is unknown.
     */
    @Override // org.apache.kafka.connect.runtime.Herder
    public synchronized void taskConfigs(String connectorName, Callback<List<TaskInfo>> callback) {
        if (!configState.contains(connectorName)) {
            callback.onCompletion(new NotFoundException("Connector " + connectorName + " not found", null), null);
            return;
        }
        List<TaskInfo> taskInfos = new ArrayList<>();
        for (ConnectorTaskId taskId : configState.tasks(connectorName)) {
            taskInfos.add(new TaskInfo(taskId, configState.rawTaskConfig(taskId)));
        }
        callback.onCompletion(null, taskInfos);
    }

    /**
     * Not supported by this herder.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // org.apache.kafka.connect.runtime.Herder
    public void putTaskConfigs(String connectorName, List<Map<String, String>> configs, Callback<Void> callback, InternalRequestSignature requestSignature) {
        throw new UnsupportedOperationException("Kafka Connect in standalone mode does not support externally setting task configurations.");
    }

    /**
     * Stops the given task, waits for it to stop, and starts it again with its current config.
     * The callback is completed exactly once: with a {@link NotFoundException} if the connector or
     * the task is unknown (in which case nothing is stopped or started), with a
     * {@link ConnectException} if the task fails to start, or with {@code null} on success.
     */
    @Override // org.apache.kafka.connect.runtime.Herder
    public synchronized void restartTask(ConnectorTaskId connectorTaskId, Callback<Void> callback) {
        String connectorName = connectorTaskId.connector();
        if (!configState.contains(connectorName)) {
            callback.onCompletion(new NotFoundException("Connector " + connectorName + " not found", null), null);
            // Must not fall through: the callback has already been completed.
            return;
        }
        Map<String, String> taskConfig = configState.taskConfig(connectorTaskId);
        if (taskConfig == null) {
            callback.onCompletion(new NotFoundException("Task " + connectorTaskId + " not found", null), null);
            // Must not fall through: there is no config to restart the task with.
            return;
        }
        Map<String, String> connectorConfig = configState.connectorConfig(connectorName);
        TargetState targetState = configState.targetState(connectorName);
        worker.stopAndAwaitTask(connectorTaskId);
        if (worker.startTask(connectorTaskId, configState, connectorConfig, taskConfig, this, targetState)) {
            callback.onCompletion(null, null);
        } else {
            callback.onCompletion(new ConnectException("Failed to start task: " + connectorTaskId), null);
        }
    }

    /**
     * Stops and restarts the named connector. The callback is completed exactly once: with a
     * {@link NotFoundException} if the connector is unknown (in which case nothing is stopped or
     * started), with a {@link ConnectException} if the connector fails to start, or with
     * {@code null} on success.
     */
    @Override // org.apache.kafka.connect.runtime.Herder
    public synchronized void restartConnector(String connectorName, Callback<Void> callback) {
        if (!configState.contains(connectorName)) {
            callback.onCompletion(new NotFoundException("Connector " + connectorName + " not found", null), null);
            // Must not fall through: the callback has already been completed.
            return;
        }
        worker.stopConnector(connectorName);
        if (startConnector(connectorName)) {
            callback.onCompletion(null, null);
        } else {
            callback.onCompletion(new ConnectException("Failed to start connector: " + connectorName), null);
        }
    }

    /**
     * Schedules {@link #restartConnector(String, Callback)} to run on the request scheduler after
     * the given delay.
     *
     * @param delayMs       delay before the restart is attempted, in milliseconds
     * @param connectorName name of the connector to restart
     * @param callback      completed by the delayed restart
     * @return a handle that can be used to cancel the scheduled restart
     */
    @Override // org.apache.kafka.connect.runtime.Herder
    public synchronized HerderRequest restartConnector(long delayMs, final String connectorName, final Callback<Void> callback) {
        ScheduledFuture<?> future = requestExecutorService.schedule(
                () -> restartConnector(connectorName, callback), delayMs, TimeUnit.MILLISECONDS);
        return new StandaloneHerderRequest(requestSeqNum.incrementAndGet(), future);
    }

    /** Asks the worker to start the connector using its config and target state from the cached snapshot. */
    private boolean startConnector(String connectorName) {
        return worker.startConnector(connectorName, configState.connectorConfig(connectorName), new HerderConnectorContext(this, connectorName), this, configState.targetState(connectorName));
    }

    /**
     * Asks the worker for the connector's task configurations, wrapping the connector config as a
     * sink or source config depending on what the worker reports for the connector.
     */
    private List<Map<String, String>> recomputeTaskConfigs(String connectorName) {
        Map<String, String> connectorConfig = configState.connectorConfig(connectorName);
        return worker.connectorTaskConfigs(connectorName, worker.isSinkConnector(connectorName) ? new SinkConnectorConfig(plugins(), connectorConfig) : new SourceConnectorConfig(plugins(), connectorConfig));
    }

    /** Starts every task listed for the connector in the cached snapshot, in the given target state. */
    private void createConnectorTasks(String connectorName, TargetState targetState) {
        Map<String, String> connectorConfig = configState.connectorConfig(connectorName);
        for (ConnectorTaskId taskId : configState.tasks(connectorName)) {
            worker.startTask(taskId, configState, connectorConfig, configState.taskConfig(taskId), this, targetState);
        }
    }

    /**
     * Stops the connector's tasks (waiting for them), removes their configs from the backing store
     * and reports each task as deleted. Does nothing when the connector has no tasks.
     */
    private void removeConnectorTasks(String connectorName) {
        List<ConnectorTaskId> tasks = configState.tasks(connectorName);
        if (tasks.isEmpty()) {
            return;
        }
        worker.stopAndAwaitTasks(tasks);
        configBackingStore.removeTaskConfigs(connectorName);
        tasks.forEach(this::onDeletion);
    }

    /**
     * Recomputes the connector's task configurations and, if they differ from the ones in the
     * cached snapshot, replaces the running tasks with tasks created from the new configurations.
     * Skipped (with an info log) when the worker reports that the connector is not running.
     *
     * <p>Visibility is kept as found in this file; the only callers visible here are within this
     * class and its nested listener.
     */
    public void updateConnectorTasks(String connectorName) {
        if (!worker.isRunning(connectorName)) {
            log.info("Skipping update of connector {} since it is not running", connectorName);
            return;
        }
        List<Map<String, String>> newTaskConfigs = recomputeTaskConfigs(connectorName);
        if (newTaskConfigs.equals(configState.allTaskConfigs(connectorName))) {
            // Nothing changed, so the running tasks can be left untouched.
            return;
        }
        removeConnectorTasks(connectorName);
        configBackingStore.putTaskConfigs(connectorName, reverseTransform(connectorName, configState, newTaskConfigs));
        createConnectorTasks(connectorName, configState.targetState(connectorName));
    }
}
