package org.apache.kafka.connect.runtime;

import io.confluent.logevents.connect.LogEvents;
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigTransformer;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigRequest;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.health.ConnectorType;
import org.apache.kafka.connect.runtime.AbstractStatus;
import org.apache.kafka.connect.runtime.ConnectorStatus;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
import org.apache.kafka.connect.runtime.isolation.PluginType;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConfigValueInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.ConverterConfig;
import org.apache.kafka.connect.storage.ConverterType;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.predicates.Predicate;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/connect/runtime/AbstractHerder.class */
public abstract class AbstractHerder implements Herder, TaskStatus.Listener, ConnectorStatus.Listener {
    private final String workerId;
    protected final Worker worker;
    private final String kafkaClusterId;
    protected final StatusBackingStore statusBackingStore;
    protected final ConfigBackingStore configBackingStore;
    private final ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy;
    private final ExecutorService connectorExecutor;
    private final Logger log = LoggerFactory.getLogger((Class<?>) AbstractHerder.class);
    protected volatile boolean running = false;
    private final ConcurrentMap<String, Connector> tempConnectors = new ConcurrentHashMap();

    public AbstractHerder(Worker worker, String str, String str2, StatusBackingStore statusBackingStore, ConfigBackingStore configBackingStore, ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
        this.worker = worker;
        this.worker.herder = this;
        this.workerId = str;
        this.kafkaClusterId = str2;
        this.statusBackingStore = statusBackingStore;
        this.configBackingStore = configBackingStore;
        this.connectorClientConfigOverridePolicy = connectorClientConfigOverridePolicy;
        this.connectorExecutor = Executors.newCachedThreadPool();
    }

    @Override // org.apache.kafka.connect.runtime.Herder
    public String kafkaClusterId() {
        return this.kafkaClusterId;
    }

    protected abstract int generation();

    /* JADX INFO: Access modifiers changed from: protected */
    public void startServices() {
        this.worker.start();
        this.statusBackingStore.start();
        this.configBackingStore.start();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void stopServices() {
        this.statusBackingStore.stop();
        this.configBackingStore.stop();
        this.worker.stop();
        this.connectorExecutor.shutdown();
        Utils.closeQuietly(this.connectorClientConfigOverridePolicy, "connector client config override policy");
    }

    @Override // org.apache.kafka.connect.runtime.Herder
    public boolean isRunning() {
        return this.running;
    }

    @Override // org.apache.kafka.connect.runtime.ConnectorStatus.Listener
    public void onStartup(String str) {
        this.statusBackingStore.put(new ConnectorStatus(str, AbstractStatus.State.RUNNING, this.workerId, generation()));
    }

    @Override // org.apache.kafka.connect.runtime.ConnectorStatus.Listener
    public void onStop(String str) {
        this.statusBackingStore.put(new ConnectorStatus(str, AbstractStatus.State.STOPPED, this.workerId, generation()));
    }

    @Override // org.apache.kafka.connect.runtime.ConnectorStatus.Listener
    public void onPause(String str) {
        this.statusBackingStore.put(new ConnectorStatus(str, AbstractStatus.State.PAUSED, this.workerId, generation()));
    }

    @Override // org.apache.kafka.connect.runtime.ConnectorStatus.Listener
    public void onResume(String str) {
        this.statusBackingStore.put(new ConnectorStatus(str, AbstractStatus.State.RUNNING, this.workerId, generation()));
    }

    @Override // org.apache.kafka.connect.runtime.ConnectorStatus.Listener
    public void onShutdown(String str) {
        this.statusBackingStore.putSafe(new ConnectorStatus(str, AbstractStatus.State.UNASSIGNED, this.workerId, generation()));
    }

    @Override // org.apache.kafka.connect.runtime.ConnectorStatus.Listener
    public void onFailure(String str, Throwable th) {
        Optional empty;
        if (this.worker.logEventsDeduplicateErrors()) {
            Worker worker = this.worker;
            worker.getClass();
            empty = Optional.of(LogEvents.createLogEventState(worker::createIfAbsentConnectorLogEventState, str, this.worker.logFailureEventResetTime()));
        } else {
            empty = Optional.empty();
        }
        Optional optional = empty;
        Worker worker2 = this.worker;
        worker2.getClass();
        LogEvents.connectorFailedEvent(worker2::logEventsEmitter, str, th, optional);
        this.statusBackingStore.putSafe(new ConnectorStatus(str, AbstractStatus.State.FAILED, trace(th), this.workerId, generation()));
    }

    @Override // org.apache.kafka.connect.runtime.TaskStatus.Listener
    public void onStartup(ConnectorTaskId connectorTaskId) {
        this.statusBackingStore.put(new TaskStatus(connectorTaskId, AbstractStatus.State.RUNNING, this.workerId, generation()));
    }

    @Override // org.apache.kafka.connect.runtime.TaskStatus.Listener
    public void onFailure(ConnectorTaskId connectorTaskId, Throwable th) {
        Optional empty;
        if (this.worker.logEventsDeduplicateErrors()) {
            Worker worker = this.worker;
            worker.getClass();
            empty = Optional.of(LogEvents.createLogEventState(worker::createIfAbsentTaskLogEventState, connectorTaskId.toString(), this.worker.logFailureEventResetTime()));
        } else {
            empty = Optional.empty();
        }
        Optional optional = empty;
        Worker worker2 = this.worker;
        worker2.getClass();
        LogEvents.connectorTaskFailedEvent(worker2::logEventsEmitter, connectorTaskId.connector(), connectorTaskId.toString(), th, optional);
        this.statusBackingStore.putSafe(new TaskStatus(connectorTaskId, AbstractStatus.State.FAILED, this.workerId, generation(), trace(th)));
    }

    @Override // org.apache.kafka.connect.runtime.TaskStatus.Listener
    public void onShutdown(ConnectorTaskId connectorTaskId) {
        this.statusBackingStore.putSafe(new TaskStatus(connectorTaskId, AbstractStatus.State.UNASSIGNED, this.workerId, generation()));
    }

    @Override // org.apache.kafka.connect.runtime.TaskStatus.Listener
    public void onResume(ConnectorTaskId connectorTaskId) {
        this.statusBackingStore.put(new TaskStatus(connectorTaskId, AbstractStatus.State.RUNNING, this.workerId, generation()));
    }

    @Override // org.apache.kafka.connect.runtime.TaskStatus.Listener
    public void onPause(ConnectorTaskId connectorTaskId) {
        this.statusBackingStore.put(new TaskStatus(connectorTaskId, AbstractStatus.State.PAUSED, this.workerId, generation()));
    }

    @Override // org.apache.kafka.connect.runtime.ConnectorStatus.Listener
    public void onDeletion(String str) {
        Iterator<TaskStatus> it = this.statusBackingStore.getAll(str).iterator();
        while (it.hasNext()) {
            onDeletion(it.next().id());
        }
        this.statusBackingStore.put(new ConnectorStatus(str, AbstractStatus.State.DESTROYED, this.workerId, generation()));
    }

    @Override // org.apache.kafka.connect.runtime.TaskStatus.Listener
    public void onDeletion(ConnectorTaskId connectorTaskId) {
        this.statusBackingStore.put(new TaskStatus(connectorTaskId, AbstractStatus.State.DESTROYED, this.workerId, generation()));
    }

    @Override // org.apache.kafka.connect.runtime.ConnectorStatus.Listener
    public void onRestart(String str) {
        this.statusBackingStore.put(new ConnectorStatus(str, AbstractStatus.State.RESTARTING, this.workerId, generation()));
    }

    @Override // org.apache.kafka.connect.runtime.TaskStatus.Listener
    public void onRestart(ConnectorTaskId connectorTaskId) {
        this.statusBackingStore.put(new TaskStatus(connectorTaskId, AbstractStatus.State.RESTARTING, this.workerId, generation()));
    }

    @Override // org.apache.kafka.connect.runtime.Herder
    public void pauseConnector(String str) {
        if (!this.configBackingStore.contains(str)) {
            throw new NotFoundException("Unknown connector " + str);
        }
        this.configBackingStore.putTargetState(str, TargetState.PAUSED);
    }

    @Override // org.apache.kafka.connect.runtime.Herder
    public void resumeConnector(String str) {
        if (!this.configBackingStore.contains(str)) {
            throw new NotFoundException("Unknown connector " + str);
        }
        this.configBackingStore.putTargetState(str, TargetState.STARTED);
    }

    @Override // org.apache.kafka.connect.runtime.Herder
    public Plugins plugins() {
        return this.worker.getPlugins();
    }

    protected abstract Map<String, String> rawConfig(String str);

    @Override // org.apache.kafka.connect.runtime.Herder
    public void connectorConfig(String str, Callback<Map<String, String>> callback) {
        connectorInfo(str, (th, connectorInfo) -> {
            if (th != null) {
                callback.onCompletion(th, null);
            } else {
                callback.onCompletion(null, connectorInfo.config());
            }
        });
    }

    @Override // org.apache.kafka.connect.runtime.Herder
    public Collection<String> connectors() {
        return this.configBackingStore.snapshot().connectors();
    }

    @Override // org.apache.kafka.connect.runtime.Herder
    public ConnectorInfo connectorInfo(String str) {
        ClusterConfigState snapshot = this.configBackingStore.snapshot();
        if (!snapshot.contains(str)) {
            return null;
        }
        Map<String, String> rawConnectorConfig = snapshot.rawConnectorConfig(str);
        return new ConnectorInfo(str, rawConnectorConfig, snapshot.tasks(str), connectorType(rawConnectorConfig));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Map<String, String> filterSensitiveConfig(Map<String, String> map) {
        HashMap hashMap = new HashMap(map);
        hashMap.keySet().removeIf(str -> {
            return str.startsWith("confluent.license");
        });
        hashMap.keySet().removeIf(str2 -> {
            return str2.startsWith("confluent.topic");
        });
        return hashMap;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Map<ConnectorTaskId, Map<String, String>> buildTasksConfig(String str) {
        ClusterConfigState snapshot = this.configBackingStore.snapshot();
        if (!snapshot.contains(str)) {
            return Collections.emptyMap();
        }
        HashMap hashMap = new HashMap();
        for (ConnectorTaskId connectorTaskId : snapshot.tasks(str)) {
            hashMap.put(connectorTaskId, filterSensitiveConfig(snapshot.taskConfig(connectorTaskId)));
        }
        return hashMap;
    }

    @Override // org.apache.kafka.connect.runtime.Herder
    public ConnectorStateInfo connectorStatus(String str) {
        ConnectorStatus connectorStatus = this.statusBackingStore.get(str);
        if (connectorStatus == null) {
            throw new NotFoundException("No status found for connector " + str);
        }
        Collection<TaskStatus> all = this.statusBackingStore.getAll(str);
        ConnectorStateInfo.ConnectorState connectorState = new ConnectorStateInfo.ConnectorState(connectorStatus.state().toString(), connectorStatus.workerId(), connectorStatus.trace());
        ArrayList arrayList = new ArrayList();
        for (TaskStatus taskStatus : all) {
            arrayList.add(new ConnectorStateInfo.TaskState(taskStatus.id().task(), taskStatus.state().toString(), taskStatus.workerId(), taskStatus.trace()));
        }
        Collections.sort(arrayList);
        return new ConnectorStateInfo(str, connectorState, arrayList, connectorType(rawConfig(str)));
    }

    @Override // org.apache.kafka.connect.runtime.Herder
    public ActiveTopicsInfo connectorActiveTopics(String str) {
        return new ActiveTopicsInfo(str, (Collection) this.statusBackingStore.getAllTopics(str).stream().map((v0) -> {
            return v0.topic();
        }).collect(Collectors.toList()));
    }

    @Override // org.apache.kafka.connect.runtime.Herder
    public void resetConnectorActiveTopics(String str) {
        this.statusBackingStore.getAllTopics(str).stream().forEach(topicStatus -> {
            this.statusBackingStore.deleteTopic(topicStatus.connector(), topicStatus.topic());
        });
    }

    @Override // org.apache.kafka.connect.runtime.Herder
    public StatusBackingStore statusBackingStore() {
        return this.statusBackingStore;
    }

    @Override // org.apache.kafka.connect.runtime.Herder
    public ConnectorStateInfo.TaskState taskStatus(ConnectorTaskId connectorTaskId) {
        TaskStatus taskStatus = this.statusBackingStore.get(connectorTaskId);
        if (taskStatus == null) {
            throw new NotFoundException("No status found for task " + connectorTaskId);
        }
        return new ConnectorStateInfo.TaskState(connectorTaskId.task(), taskStatus.state().toString(), taskStatus.workerId(), taskStatus.trace());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Map<String, ConfigValue> validateSinkConnectorConfig(ConfigDef configDef, Map<String, String> map) {
        return SinkConnectorConfig.validate(configDef.validateAll(map), map);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Map<String, ConfigValue> validateSourceConnectorConfig(SourceConnector sourceConnector, ConfigDef configDef, Map<String, String> map) {
        return configDef.validateAll(map);
    }

    private ConfigInfos validateHeaderConverterConfig(Map<String, String> map, ConfigValue configValue) {
        String str = map.get("header.converter");
        if (str == null || configValue == null || !configValue.errorMessages().isEmpty()) {
            return null;
        }
        try {
            try {
                ConfigDef config = ((HeaderConverter) Utils.newInstance(str, HeaderConverter.class)).config();
                if (config == null) {
                    this.log.warn("{}.config() has returned a null ConfigDef; no further preflight config validation for this converter will be performed", str);
                    return null;
                }
                Map<String, String> map2 = (Map) map.entrySet().stream().filter(entry -> {
                    return ((String) entry.getKey()).startsWith("header.converter.");
                }).collect(Collectors.toMap(entry2 -> {
                    return ((String) entry2.getKey()).substring("header.converter.".length());
                }, (v0) -> {
                    return v0.getValue();
                }));
                map2.put(ConverterConfig.TYPE_CONFIG, ConverterType.HEADER.getName());
                try {
                    return prefixedConfigInfos(config.configKeys(), config.validate(map2), "header.converter.");
                } catch (RuntimeException e) {
                    this.log.error("Failed to perform custom config validation for header converter of type {}", str, e);
                    configValue.addErrorMessage("Failed to perform custom config validation for header converter" + (e.getMessage() != null ? ": " + e.getMessage() : ""));
                    return null;
                }
            } catch (RuntimeException e2) {
                this.log.error("Failed to load ConfigDef from header converter of type {}", str, e2);
                configValue.addErrorMessage("Failed to load ConfigDef from header converter" + (e2.getMessage() != null ? ": " + e2.getMessage() : ""));
                return null;
            }
        } catch (ClassNotFoundException | RuntimeException e3) {
            this.log.error("Failed to instantiate header converter class {}; this should have been caught by prior validation logic", str, e3);
            configValue.addErrorMessage("Failed to load class " + str + (e3.getMessage() != null ? ": " + e3.getMessage() : ""));
            return null;
        }
    }

    @Override // org.apache.kafka.connect.runtime.Herder
    public void validateConnectorConfig(Map<String, String> map, Callback<ConfigInfos> callback) {
        validateConnectorConfig(map, callback, true);
    }

    @Override // org.apache.kafka.connect.runtime.Herder
    public void validateConnectorConfig(Map<String, String> map, Callback<ConfigInfos> callback, boolean z) {
        this.connectorExecutor.submit(() -> {
            try {
                callback.onCompletion(null, validateConnectorConfig((Map<String, String>) map, z));
            } catch (Throwable th) {
                callback.onCompletion(th, null);
            }
        });
    }

    public Optional<RestartPlan> buildRestartPlan(RestartRequest restartRequest) {
        String connectorName = restartRequest.connectorName();
        ConnectorStatus connectorStatus = this.statusBackingStore.get(connectorName);
        if (connectorStatus == null) {
            return Optional.empty();
        }
        return Optional.of(new RestartPlan(restartRequest, new ConnectorStateInfo(connectorName, new ConnectorStateInfo.ConnectorState((restartRequest.shouldRestartConnector(connectorStatus) ? AbstractStatus.State.RESTARTING : connectorStatus.state()).toString(), connectorStatus.workerId(), connectorStatus.trace()), (List) this.statusBackingStore.getAll(connectorName).stream().map(taskStatus -> {
            return new ConnectorStateInfo.TaskState(taskStatus.id().task(), (restartRequest.shouldRestartTask(taskStatus) ? AbstractStatus.State.RESTARTING : taskStatus.state()).toString(), taskStatus.workerId(), taskStatus.trace());
        }).collect(Collectors.toList()), connectorType(rawConfig(connectorName)))));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean connectorUsesConsumer(ConnectorType connectorType, Map<String, String> map) {
        return connectorType == ConnectorType.SINK;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean connectorUsesAdmin(ConnectorType connectorType, Map<String, String> map) {
        return connectorType == ConnectorType.SOURCE ? SourceConnectorConfig.usesTopicCreation(map) : !Utils.isBlank(map.getOrDefault(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, "").trim());
    }

    protected boolean connectorUsesProducer(ConnectorType connectorType, Map<String, String> map) {
        return connectorType == ConnectorType.SOURCE || !Utils.isBlank(map.getOrDefault(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, "").trim());
    }

    ConfigInfos validateConnectorConfig(Map<String, String> map, boolean z) {
        ConnectorType connectorType;
        ConfigDef enrich;
        Map<String, ConfigValue> validateSinkConnectorConfig;
        if (this.worker.configTransformer() != null) {
            map = this.worker.configTransformer().transform(map);
        }
        String str = map.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
        if (str == null) {
            throw new BadRequestException("Connector config " + map + " contains no connector type");
        }
        Connector connector = getConnector(str);
        LoaderSwap withClassLoader = plugins().withClassLoader(plugins().connectorLoader(str));
        Throwable th = null;
        try {
            if (connector instanceof SourceConnector) {
                connectorType = ConnectorType.SOURCE;
                enrich = ConnectorConfig.enrich(plugins(), SourceConnectorConfig.configDef(), map, false);
                validateSinkConnectorConfig = validateSourceConnectorConfig((SourceConnector) connector, enrich, map);
            } else {
                connectorType = ConnectorType.SINK;
                enrich = ConnectorConfig.enrich(plugins(), SinkConnectorConfig.configDef(), map, false);
                validateSinkConnectorConfig = validateSinkConnectorConfig(enrich, map);
            }
            Map<String, ConfigValue> map2 = validateSinkConnectorConfig;
            map.entrySet().stream().filter(entry -> {
                return entry.getValue() == null;
            }).map((v0) -> {
                return v0.getKey();
            }).forEach(str2 -> {
                ((ConfigValue) map2.computeIfAbsent(str2, ConfigValue::new)).addErrorMessage("Null value can not be supplied as the configuration value.");
            });
            ArrayList arrayList = new ArrayList(validateSinkConnectorConfig.values());
            LinkedHashMap linkedHashMap = new LinkedHashMap(enrich.configKeys());
            LinkedHashSet linkedHashSet = new LinkedHashSet(enrich.groups());
            ConfigDef config = connector.config();
            if (null == config) {
                throw new BadRequestException(String.format("%s.config() must return a ConfigDef that is not null.", connector.getClass().getName()));
            }
            String str3 = map.get("name");
            Config validate = connector.validate(this.worker.configDecorator().decorateConnectorConfig(str3, connector, config, map));
            if (null == validate) {
                throw new BadRequestException(String.format("%s.validate() must return a Config that is not null.", connector.getClass().getName()));
            }
            linkedHashMap.putAll(config.configKeys());
            linkedHashSet.addAll(config.groups());
            arrayList.addAll(validate.configValues());
            ConfigInfos validateHeaderConverterConfig = validateHeaderConverterConfig(map, validateSinkConnectorConfig.get("header.converter"));
            ConfigInfos decorateValidationResult = this.worker.configDecorator().decorateValidationResult(str3, connector, config, map, generateResult(str, linkedHashMap, arrayList, new ArrayList(linkedHashSet)));
            AbstractConfig abstractConfig = new AbstractConfig(new ConfigDef(), map, z);
            ConfigInfos configInfos = null;
            ConfigInfos configInfos2 = null;
            ConfigInfos configInfos3 = null;
            if (connectorUsesProducer(connectorType, map)) {
                configInfos = validateClientOverrides(str3, ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX, abstractConfig, ProducerConfig.configDef(), connector.getClass(), connectorType, ConnectorClientConfigRequest.ClientType.PRODUCER, this.connectorClientConfigOverridePolicy);
            }
            if (connectorUsesAdmin(connectorType, map)) {
                configInfos3 = validateClientOverrides(str3, ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX, abstractConfig, AdminClientConfig.configDef(), connector.getClass(), connectorType, ConnectorClientConfigRequest.ClientType.ADMIN, this.connectorClientConfigOverridePolicy);
            }
            if (connectorUsesConsumer(connectorType, map)) {
                configInfos2 = validateClientOverrides(str3, ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX, abstractConfig, ConsumerConfig.configDef(), connector.getClass(), connectorType, ConnectorClientConfigRequest.ClientType.CONSUMER, this.connectorClientConfigOverridePolicy);
            }
            ConfigInfos mergeConfigInfos = mergeConfigInfos(str, decorateValidationResult, configInfos, configInfos2, configInfos3, validateHeaderConverterConfig);
            if (withClassLoader != null) {
                if (0 != 0) {
                    try {
                        withClassLoader.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    withClassLoader.close();
                }
            }
            return mergeConfigInfos;
        } catch (Throwable th3) {
            if (withClassLoader != null) {
                if (0 != 0) {
                    try {
                        withClassLoader.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    withClassLoader.close();
                }
            }
            throw th3;
        }
    }

    private static ConfigInfos mergeConfigInfos(String str, ConfigInfos... configInfosArr) {
        int i = 0;
        LinkedList linkedList = new LinkedList();
        LinkedHashSet linkedHashSet = new LinkedHashSet();
        for (ConfigInfos configInfos : configInfosArr) {
            if (configInfos != null) {
                i += configInfos.errorCount();
                linkedList.addAll(configInfos.values());
                linkedHashSet.addAll(configInfos.groups());
            }
        }
        return new ConfigInfos(str, i, new ArrayList(linkedHashSet), linkedList);
    }

    private static ConfigInfos validateClientOverrides(String str, String str2, AbstractConfig abstractConfig, ConfigDef configDef, Class<? extends Connector> cls, ConnectorType connectorType, ConnectorClientConfigRequest.ClientType clientType, ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
        HashMap hashMap = new HashMap();
        for (Map.Entry<String, Object> entry : abstractConfig.originalsWithPrefix(str2).entrySet()) {
            String key = entry.getKey();
            Object value = entry.getValue();
            ConfigDef.ConfigKey configKey = configDef.configKeys().get(key);
            hashMap.put(key, configKey != null ? ConfigDef.parseType(key, value, configKey.type) : value);
        }
        return prefixedConfigInfos(configDef.configKeys(), connectorClientConfigOverridePolicy.validate(new ConnectorClientConfigRequest(str, connectorType, cls, hashMap, clientType)), str2);
    }

    private static ConfigInfos prefixedConfigInfos(Map<String, ConfigDef.ConfigKey> map, List<ConfigValue> list, String str) {
        int i = 0;
        LinkedHashSet linkedHashSet = new LinkedHashSet();
        ArrayList arrayList = new ArrayList();
        if (list == null) {
            return new ConfigInfos("", 0, new ArrayList(linkedHashSet), arrayList);
        }
        for (ConfigValue configValue : list) {
            ConfigDef.ConfigKey configKey = map.get(configValue.name());
            ConfigKeyInfo configKeyInfo = null;
            if (configKey != null) {
                if (configKey.group != null) {
                    linkedHashSet.add(configKey.group);
                }
                configKeyInfo = convertConfigKey(configKey, str);
            }
            ConfigValue configValue2 = new ConfigValue(str + configValue.name(), configValue.value(), configValue.recommendedValues(), configValue.errorMessages());
            if (configValue2.errorMessages().size() > 0) {
                i++;
            }
            arrayList.add(new ConfigInfo(configKeyInfo, convertConfigValue(configValue2, configKey != null ? configKey.type : null)));
        }
        return new ConfigInfos("", i, new ArrayList(linkedHashSet), arrayList);
    }

    public static ConfigInfos generateResult(String str, Map<String, ConfigDef.ConfigKey> map, List<ConfigValue> list, List<String> list2) {
        int i = 0;
        LinkedList linkedList = new LinkedList();
        HashMap hashMap = new HashMap();
        for (ConfigValue configValue : list) {
            String name = configValue.name();
            hashMap.put(name, configValue);
            if (!map.containsKey(name)) {
                linkedList.add(new ConfigInfo(null, convertConfigValue(configValue, null)));
                i += configValue.errorMessages().size();
            }
        }
        for (Map.Entry<String, ConfigDef.ConfigKey> entry : map.entrySet()) {
            String key = entry.getKey();
            ConfigKeyInfo convertConfigKey = convertConfigKey(entry.getValue());
            ConfigDef.Type type = entry.getValue().type;
            ConfigValueInfo configValueInfo = null;
            if (hashMap.containsKey(key)) {
                ConfigValue configValue2 = (ConfigValue) hashMap.get(key);
                configValueInfo = convertConfigValue(configValue2, type);
                i += configValue2.errorMessages().size();
            }
            linkedList.add(new ConfigInfo(convertConfigKey, configValueInfo));
        }
        return new ConfigInfos(str, i, list2, linkedList);
    }

    public static ConfigKeyInfo convertConfigKey(ConfigDef.ConfigKey configKey) {
        return convertConfigKey(configKey, "");
    }

    private static ConfigKeyInfo convertConfigKey(ConfigDef.ConfigKey configKey, String str) {
        String convertToString;
        String str2 = str + configKey.name;
        ConfigDef.Type type = configKey.type;
        String name = configKey.type.name();
        boolean z = false;
        if (ConfigDef.NO_DEFAULT_VALUE.equals(configKey.defaultValue)) {
            convertToString = null;
            z = true;
        } else {
            convertToString = ConfigDef.convertToString(configKey.defaultValue, type);
        }
        return new ConfigKeyInfo(str2, name, z, convertToString, configKey.importance.name(), configKey.documentation, configKey.group, configKey.orderInGroup, configKey.width.name(), configKey.displayName, configKey.dependents);
    }

    private static ConfigValueInfo convertConfigValue(ConfigValue configValue, ConfigDef.Type type) {
        String convertToString = configValue.value() instanceof String ? (String) configValue.value() : ConfigDef.convertToString(configValue.value(), type);
        LinkedList linkedList = new LinkedList();
        if (type == ConfigDef.Type.LIST) {
            Iterator<Object> it = configValue.recommendedValues().iterator();
            while (it.hasNext()) {
                linkedList.add(ConfigDef.convertToString(it.next(), ConfigDef.Type.STRING));
            }
        } else {
            Iterator<Object> it2 = configValue.recommendedValues().iterator();
            while (it2.hasNext()) {
                linkedList.add(ConfigDef.convertToString(it2.next(), type));
            }
        }
        return new ConfigValueInfo(configValue.name(), convertToString, linkedList, configValue.errorMessages(), configValue.visible());
    }

    protected Connector getConnector(String str) {
        return this.tempConnectors.computeIfAbsent(str, str2 -> {
            return plugins().newConnector(str2);
        });
    }

    public org.apache.kafka.connect.runtime.rest.entities.ConnectorType connectorType(Map<String, String> map) {
        String str;
        if (map != null && (str = map.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG)) != null) {
            try {
                return org.apache.kafka.connect.runtime.rest.entities.ConnectorType.from(getConnector(str).getClass());
            } catch (ConnectException e) {
                this.log.warn("Unable to retrieve connector type", (Throwable) e);
                return org.apache.kafka.connect.runtime.rest.entities.ConnectorType.UNKNOWN;
            }
        }
        return org.apache.kafka.connect.runtime.rest.entities.ConnectorType.UNKNOWN;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final boolean maybeAddConfigErrors(ConfigInfos configInfos, Callback<Herder.Created<ConnectorInfo>> callback) {
        int errorCount = configInfos.errorCount();
        boolean z = errorCount > 0;
        if (z) {
            StringBuilder sb = new StringBuilder();
            sb.append("Connector configuration is invalid and contains the following ").append(errorCount).append(" error(s):");
            Iterator<ConfigInfo> it = configInfos.values().iterator();
            while (it.hasNext()) {
                Iterator<String> it2 = it.next().configValue().errors().iterator();
                while (it2.hasNext()) {
                    sb.append('\n').append(it2.next());
                }
            }
            callback.onCompletion(new BadRequestException(sb.append("\nYou can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`").toString()), null);
        }
        return z;
    }

    private String trace(Throwable th) {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        try {
            th.printStackTrace(new PrintStream((OutputStream) byteArrayOutputStream, false, StandardCharsets.UTF_8.name()));
            return byteArrayOutputStream.toString(StandardCharsets.UTF_8.name());
        } catch (UnsupportedEncodingException e) {
            return null;
        }
    }

    public static List<Map<String, String>> reverseTransform(String str, ClusterConfigState clusterConfigState, List<Map<String, String>> list) {
        Map<String, String> rawConnectorConfig = clusterConfigState.rawConnectorConfig(str);
        Set<String> keysWithVariableValues = keysWithVariableValues(rawConnectorConfig, ConfigTransformer.DEFAULT_PATTERN);
        ArrayList arrayList = new ArrayList();
        Iterator<Map<String, String>> it = list.iterator();
        while (it.hasNext()) {
            HashMap hashMap = new HashMap(it.next());
            for (String str2 : keysWithVariableValues) {
                if (hashMap.containsKey(str2)) {
                    hashMap.put(str2, rawConnectorConfig.get(str2));
                }
            }
            arrayList.add(hashMap);
        }
        return arrayList;
    }

    public boolean taskConfigsChanged(ClusterConfigState clusterConfigState, String str, List<Map<String, String>> list) {
        int taskCount = clusterConfigState.taskCount(str);
        boolean z = false;
        if (list.size() != taskCount) {
            this.log.debug("Connector {} task count changed from {} to {}", str, Integer.valueOf(taskCount), Integer.valueOf(list.size()));
            z = true;
        } else {
            for (int i = 0; i < taskCount; i++) {
                if (!list.get(i).equals(clusterConfigState.taskConfig(new ConnectorTaskId(str, i)))) {
                    this.log.debug("Connector {} has change in configuration for task {}-{}", str, str, Integer.valueOf(i));
                    z = true;
                }
            }
        }
        if (z) {
            this.log.debug("Reconfiguring connector {}: writing new updated configurations for tasks", str);
        } else {
            this.log.debug("Skipping reconfiguration of connector {} as generated configs appear unchanged", str);
        }
        return z;
    }

    static Set<String> keysWithVariableValues(Map<String, String> map, Pattern pattern) {
        HashSet hashSet = new HashSet();
        for (Map.Entry<String, String> entry : map.entrySet()) {
            if (entry.getValue() != null && pattern.matcher(entry.getValue()).find()) {
                hashSet.add(entry.getKey());
            }
        }
        return hashSet;
    }

    @Override // org.apache.kafka.connect.runtime.Herder
    public List<ConfigKeyInfo> connectorPluginConfig(String str) {
        ConfigDef config;
        Plugins plugins = plugins();
        try {
            try {
                LoaderSwap withClassLoader = plugins.withClassLoader(plugins.pluginClass(str).getClassLoader());
                Throwable th = null;
                try {
                    Object newPlugin = plugins.newPlugin(str);
                    PluginType from = PluginType.from(newPlugin.getClass());
                    ConfigDef configDef = null;
                    switch (from) {
                        case SINK:
                            configDef = SinkConnectorConfig.configDef();
                            config = ((SinkConnector) newPlugin).config();
                            break;
                        case SOURCE:
                            configDef = SourceConnectorConfig.configDef();
                            config = ((SourceConnector) newPlugin).config();
                            break;
                        case CONVERTER:
                            config = ((Converter) newPlugin).config();
                            break;
                        case HEADER_CONVERTER:
                            config = ((HeaderConverter) newPlugin).config();
                            break;
                        case TRANSFORMATION:
                            config = ((Transformation) newPlugin).config();
                            break;
                        case PREDICATE:
                            config = ((Predicate) newPlugin).config();
                            break;
                        default:
                            throw new BadRequestException("Invalid plugin type " + from + ". Valid types are sink, source, converter, header_converter, transformation, predicate.");
                    }
                    LinkedHashMap linkedHashMap = new LinkedHashMap(config.configKeys());
                    if (configDef != null) {
                        Map<String, ConfigDef.ConfigKey> configKeys = configDef.configKeys();
                        linkedHashMap.getClass();
                        configKeys.forEach((v1, v2) -> {
                            r1.putIfAbsent(v1, v2);
                        });
                    }
                    ArrayList arrayList = new ArrayList();
                    Iterator it = linkedHashMap.values().iterator();
                    while (it.hasNext()) {
                        arrayList.add(convertConfigKey((ConfigDef.ConfigKey) it.next()));
                    }
                    return arrayList;
                } finally {
                    if (withClassLoader != null) {
                        if (0 != 0) {
                            try {
                                withClassLoader.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            withClassLoader.close();
                        }
                    }
                }
            } catch (ClassNotFoundException e) {
                throw new ConnectException("Failed to load plugin class or one of its dependencies", e);
            }
        } catch (ClassNotFoundException e2) {
            throw new NotFoundException("Unknown plugin " + str + ".");
        }
    }

    @Override // org.apache.kafka.connect.runtime.Herder
    public void connectorOffsets(String str, Callback<ConnectorOffsets> callback) {
        ClusterConfigState snapshot = this.configBackingStore.snapshot();
        try {
            if (snapshot.contains(str)) {
                this.worker.connectorOffsets(str, snapshot.connectorConfig(str), callback);
            } else {
                callback.onCompletion(new NotFoundException("Connector " + str + " not found"), null);
            }
        } catch (Throwable th) {
            callback.onCompletion(th, null);
        }
    }
}
