package io.confluent.connect.replicator.util;

import io.confluent.connect.replicator.util.ReplicatorAdminClient;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import kafka.admin.AdminOperationException;
import kafka.admin.BrokerMetadata;
import kafka.admin.RackAwareMode;
import kafka.admin.RackAwareMode$Safe$;
import kafka.common.TopicPlacement;
import kafka.server.ConfigType;
import kafka.zk.AdminZkClient;
import kafka.zk.KafkaZkClient;
import kafka.zk.TopicsZNode;
import kafka.zookeeper.ZNodeChildChangeHandler;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.collection.Iterator;
import scala.collection.JavaConversions;
import scala.collection.Seq;

/**
 * {@link ReplicatorAdminClient} implementation that talks to ZooKeeper through
 * {@link KafkaZkClient} and {@link AdminZkClient}.
 *
 * <p>Broker and topic metadata are cached and re-read once {@code maxAgeMs} milliseconds have
 * elapsed since the previous refresh. A ZooKeeper child-change handler on the topics path also
 * triggers a topic metadata refresh on a dedicated single-threaded executor.
 *
 * <p>Cached state ({@code brokers}, {@code topicMetadata}, {@code interestedTopics} and the two
 * refresh deadlines) is guarded by this instance's monitor.
 *
 * @deprecated this class is marked deprecated; the intended replacement is not visible from this
 *     file.
 */
@Deprecated
public class ReplicatorAdminClientWithZk implements ReplicatorAdminClient {
    // NOTE(review): the logger is registered under the interface name rather than this class.
    // Left unchanged so existing logging configuration keeps working - confirm this is intended.
    private static final Logger log = LoggerFactory.getLogger(ReplicatorAdminClient.class);
    private static final RackAwareMode RACK_AWARE_SAFE = RackAwareMode$Safe$.MODULE$;
    private static final String TOPIC_METADATA_PATH = TopicsZNode.path();
    /** Deadline value that makes the next topic metadata lookup refresh unconditionally. */
    private static final int FORCE_REFRESH_DEADLINE = 0;
    private final KafkaZkClient kafkaZkClient;
    private final AdminZkClient adminZkClient;
    private final Time time;
    private final long maxAgeMs;
    private Collection<BrokerMetadata> brokers;
    private long nextBrokerMetadataRefresh;
    private long nextTopicMetadataRefresh;
    private ReplicatorAdminClient.TopicMetadataListener metadataListener;
    private ZNodeChildChangeHandler childChangeHandler;
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private Map<String, TopicMetadata> topicMetadata = new HashMap<>();
    private Set<String> interestedTopics = Collections.emptySet();

    /**
     * Creates a client on top of an existing ZooKeeper client.
     *
     * @param kafkaZkClient ZooKeeper client used for all reads and writes; closed by {@link #close()}
     * @param time clock used to evaluate the cache deadlines
     * @param j maximum age, in milliseconds, of cached broker and topic metadata
     */
    public ReplicatorAdminClientWithZk(KafkaZkClient kafkaZkClient, Time time, long j) {
        this.kafkaZkClient = kafkaZkClient;
        this.time = time;
        this.maxAgeMs = j;
        // Both deadlines start at "now", so the first lookup of either kind refreshes.
        long now = time.milliseconds();
        this.nextTopicMetadataRefresh = now;
        this.nextBrokerMetadataRefresh = now;
        this.adminZkClient = new AdminZkClient(kafkaZkClient);
    }

    /**
     * Returns the cluster id stored in ZooKeeper.
     *
     * @return the cluster id, or {@code null} when ZooKeeper reports none
     */
    @Override // io.confluent.connect.replicator.util.ReplicatorAdminClient
    public String clusterId() {
        // Option.getOrElse takes a by-name default (a Function0 from Java); passing null there
        // would be dereferenced for an empty Option, so test the Option explicitly instead.
        Option<String> clusterId = this.kafkaZkClient.getClusterId();
        return clusterId.isDefined() ? clusterId.get() : null;
    }

    /**
     * Fetches the topic-level configuration of a topic.
     *
     * @param str topic name
     * @return the configuration returned by {@link AdminZkClient} for that topic
     */
    @Override // io.confluent.connect.replicator.util.ReplicatorAdminClient
    public Properties topicConfig(String str) {
        return this.adminZkClient.fetchEntityConfig(ConfigType.Topic(), str);
    }

    /**
     * Replaces the topic-level configuration of a topic.
     *
     * @param str topic name
     * @param properties configuration handed to {@link AdminZkClient#changeTopicConfig}
     */
    @Override // io.confluent.connect.replicator.util.ReplicatorAdminClient
    public void changeTopicConfig(String str, Properties properties) {
        this.adminZkClient.changeTopicConfig(str, properties);
    }

    /**
     * Returns the number of brokers in the cached broker metadata, refreshing it first when it is
     * missing or older than {@code maxAgeMs}.
     *
     * @return the broker count
     */
    @Override // io.confluent.connect.replicator.util.ReplicatorAdminClient
    public synchronized int aliveBrokers() {
        if (this.brokers == null || this.time.milliseconds() >= this.nextBrokerMetadataRefresh) {
            refreshBrokerMetadata();
        }
        return this.brokers.size();
    }

    /** Re-reads broker metadata and pushes the broker deadline {@code maxAgeMs} into the future. */
    private void refreshBrokerMetadata() {
        log.debug("Refreshing broker metadata...");
        this.brokers = JavaConversions.asJavaCollection(this.adminZkClient.getBrokerMetadatas(RACK_AWARE_SAFE, Option.empty()));
        this.nextBrokerMetadataRefresh = this.time.milliseconds() + this.maxAgeMs;
    }

    /**
     * Sets the topics whose metadata is tracked and the listener notified after each refresh, and
     * registers a child-change handler on the topics path.
     *
     * <p>Cached entries for topics outside {@code set} are dropped. If the cache then holds fewer
     * entries than {@code set}, the next lookup is forced to refresh.
     *
     * @param set topic names of interest; stored by reference, not copied
     * @param topicMetadataListener listener invoked after each topic metadata refresh; may be null
     */
    @Override // io.confluent.connect.replicator.util.ReplicatorAdminClient
    public synchronized void setInterestedTopics(Set<String> set, ReplicatorAdminClient.TopicMetadataListener topicMetadataListener) {
        this.metadataListener = topicMetadataListener;
        this.interestedTopics = set;
        this.topicMetadata.keySet().retainAll(set);
        if (this.topicMetadata.size() < set.size()) {
            this.nextTopicMetadataRefresh = FORCE_REFRESH_DEADLINE;
        }
        this.childChangeHandler = new ZNodeChildChangeHandler() {
            @Override // kafka.zookeeper.ZNodeChildChangeHandler
            public String path() {
                return TOPIC_METADATA_PATH;
            }

            @Override // kafka.zookeeper.ZNodeChildChangeHandler
            public void handleChildChange() {
                // Hand the work to the executor so the calling thread is not blocked on the
                // monitor or on further ZooKeeper reads.
                executor.execute(ReplicatorAdminClientWithZk.this::refreshInterestedTopicsInCluster);
            }
        };
        this.kafkaZkClient.registerZNodeChildChangeHandler(this.childChangeHandler);
        // Result intentionally discarded. NOTE(review): presumably this read is what arms the
        // child watch for the handler registered above - confirm against KafkaZkClient.
        this.kafkaZkClient.getAllTopicsInCluster();
    }

    /**
     * Refreshes metadata for the interested topics that currently exist in the cluster. Does
     * nothing when none of the interested topics exist.
     */
    private synchronized void refreshInterestedTopicsInCluster() {
        Set<String> clusterTopics = JavaConversions.setAsJavaSet(this.kafkaZkClient.getAllTopicsInCluster());
        Set<String> present = new HashSet<>();
        for (String topic : clusterTopics) {
            if (this.interestedTopics.contains(topic)) {
                present.add(topic);
            }
        }
        if (!present.isEmpty()) {
            refreshTopicMetadata(present);
        }
    }

    /**
     * Returns the topic metadata cache, refreshing it first when its deadline has passed.
     * Callers must hold this instance's monitor.
     */
    private Map<String, TopicMetadata> topicMetadata() {
        if (this.time.milliseconds() >= this.nextTopicMetadataRefresh) {
            refreshTopicMetadata(this.interestedTopics);
        }
        return this.topicMetadata;
    }

    /**
     * Looks up cached metadata for a topic.
     *
     * @param str topic name
     * @return the metadata, or {@code null} when the topic is not in the cache
     */
    @Override // io.confluent.connect.replicator.util.ReplicatorAdminClient
    public synchronized TopicMetadata topicMetadata(String str) {
        return topicMetadata().get(str);
    }

    /**
     * Tells whether a partition exists according to the cached metadata.
     *
     * @param topicPartition partition to check
     * @return {@code true} when the topic is cached and the partition index is below its
     *     partition count
     */
    @Override // io.confluent.connect.replicator.util.ReplicatorAdminClient
    public synchronized boolean partitionExists(TopicPartition topicPartition) {
        TopicMetadata metadata = topicMetadata().get(topicPartition.topic());
        return metadata != null && topicPartition.partition() < metadata.numPartitions();
    }

    /**
     * Tells whether a topic is present in the cached metadata.
     *
     * @param str topic name
     * @return {@code true} when the cache holds an entry for the topic
     */
    @Override // io.confluent.connect.replicator.util.ReplicatorAdminClient
    public synchronized boolean topicExists(String str) {
        return topicMetadata().get(str) != null;
    }

    /**
     * Creates a topic.
     *
     * @param str topic name
     * @param i number of partitions
     * @param s replication factor
     * @param properties topic configuration
     * @return {@code true} when the topic was created, {@code false} when it already existed
     */
    @Override // io.confluent.connect.replicator.util.ReplicatorAdminClient
    public boolean createTopic(String str, int i, short s, Properties properties) {
        try {
            log.info("Creating topic {} with {} partitions, replication factor {}, and config {}", str, i, s, properties);
            this.adminZkClient.createTopic(str, i, s, properties, RACK_AWARE_SAFE, false);
            return true;
        } catch (TopicExistsException e) {
            log.debug("Failed to create topic {} because it already exists. Ignoring exception", str, e);
            return false;
        } finally {
            // Invalidate after the attempt: invalidating beforehand lets a concurrent refresh
            // re-cache the pre-creation state for another maxAgeMs.
            forceTopicMetadataRefresh();
        }
    }

    /**
     * Creates a topic when both the partition count and the replication factor are given.
     *
     * @param str topic name
     * @param optional number of partitions; must be present
     * @param optional2 replication factor; must be present
     * @param properties topic configuration
     * @return {@code true} when the topic was created, {@code false} when it already existed
     * @throws InvalidConfigurationException when either optional value is absent
     */
    @Override // io.confluent.connect.replicator.util.ReplicatorAdminClient
    public boolean createTopic(String str, Optional<Integer> optional, Optional<Short> optional2, Properties properties) {
        if (!optional.isPresent() || !optional2.isPresent()) {
            throw new InvalidConfigurationException(getClass().getName() + " does not support parameters with default options. Please specify the number of partitions and replication factor explicitly.");
        }
        return createTopic(str, optional.get().intValue(), optional2.get().shortValue(), properties);
    }

    /**
     * Increases the partition count of an existing topic.
     *
     * @param str topic name
     * @param i partition count handed to {@link AdminZkClient} as the new total
     * @throws AdminOperationException when ZooKeeper returns no assignment for the topic
     */
    @Override // io.confluent.connect.replicator.util.ReplicatorAdminClient
    @SuppressWarnings({"unchecked", "rawtypes"})
    public void addPartitions(String str, int i) {
        Set<String> topicNames = new HashSet<>(Arrays.asList(str));
        Map<TopicPartition, ?> currentAssignment = JavaConversions.mapAsJavaMap(this.kafkaZkClient.getFullReplicaAssignmentForTopics(JavaConversions.asScalaSet(topicNames).toSet()));
        if (currentAssignment.isEmpty()) {
            throw new AdminOperationException("The topic " + str + " does not exist");
        }
        // Re-key the assignment by partition index. The map is kept raw because the value type
        // is not imported in this file; this is the reason for the suppression above.
        Map assignmentByPartition = new HashMap();
        for (Map.Entry<TopicPartition, ?> entry : currentAssignment.entrySet()) {
            assignmentByPartition.put(Integer.valueOf(entry.getKey().partition()), entry.getValue());
        }
        log.debug("Increasing the number of partitions for topic {} to {}", str, i);
        this.adminZkClient.addPartitions(str, JavaConversions.mapAsScalaMap(assignmentByPartition), this.adminZkClient.getBrokerMetadatas(RACK_AWARE_SAFE, Option.empty()), i, Option.empty(), false, Option.empty());
        forceTopicMetadataRefresh();
    }

    /** Makes the next topic metadata lookup refresh, under the monitor that guards the deadline. */
    private synchronized void forceTopicMetadataRefresh() {
        this.nextTopicMetadataRefresh = FORCE_REFRESH_DEADLINE;
    }

    /**
     * Replaces the topic metadata cache with the partition counts of the given topics, resets the
     * refresh deadline and notifies the listener, if one is set.
     *
     * <p>Topics reported with zero partitions are left out of the cache. This method is not
     * synchronized itself; every caller in this class already holds the instance monitor.
     * NOTE(review): the decompiler widened this from private to public; visibility is left as
     * found in case other code relies on it.
     *
     * @param set names of the topics to read
     */
    public void refreshTopicMetadata(Set<String> set) {
        log.debug("Refreshing topic metadata...");
        scala.collection.Map<String, Seq<Object>> partitionsByTopic = this.kafkaZkClient.getPartitionsForTopics(JavaConversions.asScalaSet(set).toSet());
        // Clear only after the ZooKeeper read has returned, so a failed read keeps the old cache.
        this.topicMetadata.clear();
        Iterator<String> topics = partitionsByTopic.keysIterator();
        while (topics.hasNext()) {
            String topic = topics.next();
            int partitionCount = partitionsByTopic.get(topic).get().size();
            if (partitionCount > 0) {
                this.topicMetadata.put(topic, new TopicMetadata(topic, partitionCount));
            }
        }
        this.nextTopicMetadataRefresh = this.time.milliseconds() + this.maxAgeMs;
        if (this.metadataListener != null) {
            this.metadataListener.onTopicMetadataRefresh();
        }
    }

    /**
     * Closes the ZooKeeper client, then shuts the executor down and waits up to 30 seconds for
     * queued refresh tasks to finish. If the wait is interrupted, the thread's interrupt flag is
     * restored.
     */
    @Override // io.confluent.connect.replicator.util.ReplicatorAdminClient, java.lang.AutoCloseable
    public void close() {
        // NOTE(review): the client is closed before the executor drains, so a refresh task that
        // is still queued would run against a closed client. Order kept as found - confirm.
        this.kafkaZkClient.close();
        this.executor.shutdown();
        try {
            this.executor.awaitTermination(30L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
