package io.confluent.security.audit.telemetry.exporter;

import java.io.Closeable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.DescribeTopicsOptions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/security/audit/telemetry/exporter/TopicManager.class */
public class TopicManager implements Closeable {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) TopicManager.class);
    private final ConcurrentHashMap<String, Boolean> topicExists;
    private final ConcurrentHashMap<String, TopicSpec> topicMap;
    private final ThreadPoolExecutor reconcileJobExecutor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, new ArrayBlockingQueue(1), new ThreadPoolExecutor.AbortPolicy());
    private final AdminClient adminClient;
    private final Map<String, String> defaultTopicConfig;
    private final Integer defaultTopicPartitions;
    private final Integer defaultTopicReplicas;
    private final Integer timeOutMs;
    private Future<Boolean> reconcileFuture;

    /* loaded from: input_file:io/confluent/security/audit/telemetry/exporter/TopicManager$Builder.class */
    public static final class Builder {
        private Properties adminClientProperties;
        private Integer defaultTopicPartitions;
        private Integer defaultTopicReplicas;
        private Map<String, String> defaultTopicConfig;
        private Integer timeOutMs;
        private Map<String, TopicSpec> topics;

        private Builder() {
            this.defaultTopicConfig = new HashMap();
        }

        public Builder setDefaultTopicPartitions(Integer num) {
            this.defaultTopicPartitions = num;
            return this;
        }

        public Builder setDefaultTopicReplicas(Integer num) {
            this.defaultTopicReplicas = num;
            return this;
        }

        public Builder setAdminClientProperties(Properties properties) {
            this.adminClientProperties = properties;
            return this;
        }

        public Builder setDefaultTopicConfig(Map<String, String> map) {
            this.defaultTopicConfig = map;
            return this;
        }

        public Builder setTimeOutMs(Integer num) {
            this.timeOutMs = num;
            return this;
        }

        public Builder setTopics(Map<String, TopicSpec> map) {
            this.topics = map;
            return this;
        }

        public TopicManager build() {
            Objects.requireNonNull(this.adminClientProperties, "Admin client properties is required.");
            Objects.requireNonNull(this.timeOutMs, "timeout is required");
            return new TopicManager(this.adminClientProperties, this.defaultTopicConfig, this.defaultTopicPartitions, this.defaultTopicReplicas, this.timeOutMs, this.topics);
        }
    }

    public TopicManager(Properties properties, Map<String, String> map, Integer num, Integer num2, Integer num3, Map<String, TopicSpec> map2) {
        this.adminClient = AdminClient.create(properties);
        this.defaultTopicConfig = map;
        this.defaultTopicPartitions = num;
        this.defaultTopicReplicas = num2;
        this.timeOutMs = num3;
        this.reconcileJobExecutor.setThreadFactory(runnable -> {
            Thread thread = new Thread(runnable, "confluent-event-topic-manager-task-scheduler");
            thread.setDaemon(true);
            thread.setUncaughtExceptionHandler((thread2, th) -> {
                log.error("Uncaught exception in thread '{}':", thread2.getName(), th);
            });
            return thread;
        });
        this.reconcileJobExecutor.allowCoreThreadTimeOut(true);
        this.topicExists = new ConcurrentHashMap<>();
        this.topicMap = new ConcurrentHashMap<>();
        map2.entrySet().stream().forEach(entry -> {
            this.topicMap.put(entry.getKey(), entry.getValue());
            this.topicExists.put(entry.getKey(), false);
        });
        checkTopics();
    }

    public static Builder newBuilder() {
        return new Builder();
    }

    public Future<Boolean> ensureTopics() {
        try {
            this.reconcileFuture = this.reconcileJobExecutor.submit(this::reconcile);
        } catch (RejectedExecutionException e) {
        }
        return this.reconcileFuture;
    }

    public Future<Boolean> checkTopics() {
        try {
            this.reconcileFuture = this.reconcileJobExecutor.submit(this::check);
        } catch (RejectedExecutionException e) {
        }
        return this.reconcileFuture;
    }

    private void mergeDefaults(TopicSpec topicSpec) {
        if (topicSpec.partitions() <= 0) {
            topicSpec.setPartitions(this.defaultTopicPartitions.intValue());
        }
        if (topicSpec.replicationFactor() <= 0) {
            topicSpec.setReplicationFactor(this.defaultTopicReplicas.intValue());
        }
        if (this.defaultTopicConfig != null) {
            this.defaultTopicConfig.entrySet().stream().forEach(entry -> {
            });
        }
    }

    private void updateTopicExistsWithDescribedTopics() {
        for (Map.Entry<String, KafkaFuture<TopicDescription>> entry : this.adminClient.describeTopics(this.topicMap.keySet(), new DescribeTopicsOptions().timeoutMs(this.timeOutMs)).topicNameValues().entrySet()) {
            try {
                TopicDescription topicDescription = entry.getValue().get(this.timeOutMs.intValue(), TimeUnit.MILLISECONDS);
                this.topicExists.put(topicDescription.name(), true);
                log.debug("Event log topic {} ready with {} partitions", entry.getKey(), Integer.valueOf(topicDescription.partitions().size()));
            } catch (InterruptedException e) {
                log.warn("interrupted while describing event log topic {}", entry.getKey(), e);
            } catch (ExecutionException | TimeoutException e2) {
                if (e2.getCause() instanceof UnknownTopicOrPartitionException) {
                    this.topicExists.put(entry.getKey(), false);
                } else {
                    log.error("error while describing topics", (Throwable) e2);
                }
            }
        }
    }

    public boolean check() {
        updateTopicExistsWithDescribedTopics();
        return this.topicExists.values().stream().allMatch(bool -> {
            return bool.booleanValue();
        });
    }

    public boolean reconcile() {
        try {
            updateTopicExistsWithDescribedTopics();
            for (Map.Entry<String, KafkaFuture<Void>> entry : this.adminClient.createTopics((List) ((Set) this.topicExists.entrySet().stream().filter(entry2 -> {
                return !((Boolean) entry2.getValue()).booleanValue();
            }).map((v0) -> {
                return v0.getKey();
            }).collect(Collectors.toSet())).stream().map(str -> {
                TopicSpec topicSpec = this.topicMap.get(str);
                mergeDefaults(topicSpec);
                return new NewTopic(str, topicSpec.partitions(), (short) topicSpec.replicationFactor()).configs(topicSpec.config());
            }).collect(Collectors.toList()), new CreateTopicsOptions().timeoutMs(this.timeOutMs)).values().entrySet()) {
                try {
                    entry.getValue().get(this.timeOutMs.intValue(), TimeUnit.MILLISECONDS);
                    this.topicExists.put(entry.getKey(), true);
                } catch (ExecutionException | TimeoutException e) {
                    if (e.getCause() instanceof TopicExistsException) {
                        this.topicExists.put(entry.getKey(), true);
                    } else {
                        log.error("error while creating topic " + entry.getKey(), (Throwable) e);
                    }
                }
            }
        } catch (InterruptedException e2) {
            log.warn("Event log topic initialization interrupted");
        }
        return this.topicExists.values().stream().allMatch(bool -> {
            return bool.booleanValue();
        });
    }

    public void addTopic(TopicSpec topicSpec) {
        this.topicMap.put(topicSpec.name(), topicSpec);
    }

    public boolean topicExists(String str) {
        return this.topicExists.getOrDefault(str, false).booleanValue();
    }

    public boolean topicManaged(String str) {
        return this.topicMap.containsKey(str);
    }

    public Set<String> managedTopics() {
        return new HashSet(this.topicMap.keySet());
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.reconcileJobExecutor.shutdownNow();
        this.adminClient.close();
    }
}
