package org.apache.kafka.connect.cli;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.KafkaConfigBackingStore;
import org.apache.kafka.connect.storage.KafkaOffsetBackingStore;
import org.apache.kafka.connect.storage.KafkaStatusBackingStore;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.SharedTopicAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/connect/cli/ConnectDistributed.class */
/**
 * Command-line entry point that starts a Kafka Connect worker using the
 * distributed herder.
 *
 * <p>The expected command line is described by {@link #usage()}. The start-up
 * sequence itself is driven by {@link AbstractConnectCli}; this class only
 * supplies the distributed-specific pieces: the {@link DistributedConfig} and
 * the {@link DistributedHerder} together with its Kafka-backed stores.
 */
public class ConnectDistributed extends AbstractConnectCli<DistributedConfig> {
    private static final Logger log = LoggerFactory.getLogger(ConnectDistributed.class);

    /**
     * Creates the CLI wrapper.
     *
     * @param strArr the raw command-line arguments, forwarded unchanged to the base class
     */
    public ConnectDistributed(String... strArr) {
        super(strArr);
    }

    /**
     * Returns the usage string for this command.
     *
     * @return the usage string
     */
    @Override
    protected String usage() {
        return "ConnectDistributed worker.properties";
    }

    /**
     * Builds the {@link DistributedHerder} and everything it depends on: a
     * shared topic admin, the offset/status/config backing stores and the
     * {@link Worker}.
     *
     * @param distributedConfig the distributed worker configuration
     * @param str the identifier passed to the {@link Worker} constructor as its first argument
     * @param plugins the plugin registry, used here to create the internal offset converter
     * @param connectorClientConfigOverridePolicy the override policy handed to both the worker and the herder
     * @param restServer the REST server whose advertised URL is passed to the herder
     * @param restClient the REST client passed to the herder
     * @return the fully wired distributed herder
     */
    @Override
    public Herder createHerder(DistributedConfig distributedConfig, String str, Plugins plugins, ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy, RestServer restServer, RestClient restClient) {
        String kafkaClusterId = distributedConfig.kafkaClusterId();
        String clientIdBase = ConnectUtils.clientIdBase(distributedConfig);

        // One admin client configuration, derived from the worker's original
        // properties, is used for the single SharedTopicAdmin that every
        // backing store below receives.
        Map<String, Object> adminProps = new HashMap<>(distributedConfig.originals());
        ConnectUtils.addMetricsContextProperties(adminProps, distributedConfig, kafkaClusterId);
        adminProps.put("client.id", clientIdBase + "shared-admin");
        SharedTopicAdmin sharedAdmin = new SharedTopicAdmin(adminProps);

        // Offsets are (de)serialized with an internal JSON converter that has
        // schemas disabled.
        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(
                sharedAdmin,
                () -> clientIdBase,
                plugins.newInternalConverter(
                        true,
                        JsonConverter.class.getName(),
                        Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false")));
        offsetBackingStore.configure(distributedConfig);

        Worker worker = new Worker(str, Time.SYSTEM, plugins, distributedConfig, offsetBackingStore, connectorClientConfigOverridePolicy);
        WorkerConfigTransformer configTransformer = worker.configTransformer();
        Converter internalValueConverter = worker.getInternalValueConverter();

        KafkaStatusBackingStore statusBackingStore = new KafkaStatusBackingStore(Time.SYSTEM, internalValueConverter, sharedAdmin, clientIdBase);
        statusBackingStore.configure(distributedConfig);

        KafkaConfigBackingStore configBackingStore = new KafkaConfigBackingStore(
                internalValueConverter,
                distributedConfig,
                configTransformer,
                sharedAdmin,
                clientIdBase);

        // The shared admin is also handed to the herder as its final argument.
        // NOTE(review): presumably so the herder closes it on shutdown - confirm
        // against DistributedHerder.
        return new DistributedHerder(
                distributedConfig,
                Time.SYSTEM,
                worker,
                kafkaClusterId,
                statusBackingStore,
                configBackingStore,
                restServer.advertisedUrl().toString(),
                restClient,
                connectorClientConfigOverridePolicy,
                Collections.emptyList(),
                sharedAdmin);
    }

    /**
     * Creates the distributed worker configuration from raw properties.
     *
     * <p>The erased bridge for this covariant override is generated by the
     * compiler; declaring it by hand would clash with this method because
     * both have the same erasure.
     *
     * @param map the worker properties
     * @return a new {@link DistributedConfig} built from {@code map}
     */
    @Override
    protected DistributedConfig createConfig(Map<String, String> map) {
        return new DistributedConfig(map);
    }

    /**
     * Program entry point: constructs the CLI with the given arguments and runs it.
     *
     * @param strArr the command-line arguments
     */
    public static void main(String[] strArr) {
        new ConnectDistributed(strArr).run();
    }
}
