package com.linkedin.kafka.cruisecontrol.common;

import com.linkedin.kafka.cruisecontrol.ConfigFetchErrorHandler;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import io.confluent.databalancer.utils.OperationRetryer;
import io.confluent.databalancer.utils.RetryableResult;
import io.confluent.shaded.org.slf4j.Logger;
import io.confluent.shaded.org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.DescribeConfigsOptions;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.utils.Time;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/common/BatchedConfigsFetcher.class */
/**
 * Fetches configs for a set of named entities of a single {@link ConfigResource.Type} via
 * {@code Admin.describeConfigs}, issuing one call per batch of resources.
 *
 * <p>The batch size is read from {@link KafkaCruiseControlConfig#DESCRIBE_CONFIGS_BATCH_SIZE_CONFIG};
 * each batch is run through an {@link OperationRetryer}. Instances are created through
 * {@link #of(Admin, KafkaCruiseControlConfig, ConfigResource.Type, Time)} and the returned {@link Builder}.
 */
public class BatchedConfigsFetcher {
    private static final Logger LOG = LoggerFactory.getLogger(BatchedConfigsFetcher.class);
    private static final Duration DESCRIBE_CONFIG_PER_BATCH_WAIT_RETRY_DELAY = Duration.ofSeconds(1);
    /** Total {@link #getConfigs()} duration (in ms) above which a warning is logged. */
    private static final long SLOW_FETCH_WARN_THRESHOLD_MS = 15000L;

    private final Duration describeConfigsPerBatchMaxRetryDuration;
    private final List<ConfigResource> configResources;
    private final KafkaCruiseControlConfig config;
    private final Admin adminClient;
    private final Time time;
    private final int timeoutMs;
    private final boolean includeSynonyms;
    private final boolean ignoreUnknownTopicOrPartition;
    private final ConfigFetchErrorHandler errorHandler;

    /**
     * Fluent builder for {@link BatchedConfigsFetcher}. The entities to fetch must be supplied through
     * {@link #entities(Collection)} or {@link #entity(String)} before calling {@link #build()}.
     */
    public static class Builder {
        private final KafkaCruiseControlConfig config;
        private final Admin adminClient;
        private final ConfigResource.Type type;
        private final Time time;
        private Collection<String> entities;
        private int timeoutMs;
        private final Duration describeConfigsPerBatchMaxRetryDuration;
        private boolean includeSynonyms;
        private boolean ignoreUnknownTopicOrPartitionException;
        private ConfigFetchErrorHandler errorHandler;

        private Builder(Admin admin, KafkaCruiseControlConfig kafkaCruiseControlConfig, ConfigResource.Type type, Time time) {
            this.adminClient = admin;
            this.config = kafkaCruiseControlConfig;
            this.type = type;
            this.time = time;
            this.timeoutMs = kafkaCruiseControlConfig.getInt(KafkaCruiseControlConfig.DESCRIBE_CONFIGS_RESPONSE_TIMEOUT_MS_CONFIG).intValue();
            // Multiply as long: an int product could overflow before being widened for Duration.ofMillis(long).
            this.describeConfigsPerBatchMaxRetryDuration = Duration.ofMillis(3L * this.timeoutMs);
        }

        /**
         * Sets the names of the entities whose configs should be fetched, replacing any previous value.
         *
         * @param collection entity names; the collection is stored as given (not copied)
         * @return this builder
         */
        public Builder entities(Collection<String> collection) {
            this.entities = collection;
            return this;
        }

        /**
         * Sets a single entity name to fetch configs for, replacing any previous value.
         *
         * @param str the entity name
         * @return this builder
         */
        public Builder entity(String str) {
            this.entities = Collections.singletonList(str);
            return this;
        }

        /**
         * Overrides the timeout (in ms) used for the describeConfigs options and for waiting on each
         * per-resource future.
         *
         * <p>NOTE(review): the per-batch max retry duration is computed once in the constructor from the
         * configured default timeout and is not recomputed here - confirm this is intended.
         *
         * @param i timeout in milliseconds
         * @return this builder
         */
        public Builder timeout(int i) {
            this.timeoutMs = i;
            return this;
        }

        /**
         * Sets the value passed to {@code DescribeConfigsOptions.includeSynonyms}.
         *
         * @param z whether synonyms are requested
         * @return this builder
         */
        public Builder includeSynonyms(boolean z) {
            this.includeSynonyms = z;
            return this;
        }

        /**
         * Controls whether a failure caused by {@link UnknownTopicOrPartitionException} is logged and the
         * resource skipped instead of failing the batch attempt. Only consulted when no error handler is set.
         *
         * @param z {@code true} to skip such resources
         * @return this builder
         */
        public Builder ignoreUnknownTopicOrPartitionException(boolean z) {
            this.ignoreUnknownTopicOrPartitionException = z;
            return this;
        }

        /**
         * Sets the handler invoked for every per-resource failure. When set, it replaces the built-in
         * failure handling in {@link BatchedConfigsFetcher#getConfigs()}.
         *
         * @param configFetchErrorHandler the handler, or {@code null} for the built-in handling
         * @return this builder
         */
        public Builder errorHandler(ConfigFetchErrorHandler configFetchErrorHandler) {
            this.errorHandler = configFetchErrorHandler;
            return this;
        }

        /**
         * Creates the fetcher from the values collected so far.
         *
         * @return a new {@link BatchedConfigsFetcher}
         * @throws IllegalStateException if no entities were provided (or they were set to {@code null})
         */
        public BatchedConfigsFetcher build() {
            if (this.entities == null) {
                throw new IllegalStateException("Entities to fetch configs for is not provided.");
            }
            return new BatchedConfigsFetcher(
                    this.adminClient,
                    this.config,
                    this.entities,
                    this.type,
                    this.time,
                    this.timeoutMs,
                    this.describeConfigsPerBatchMaxRetryDuration,
                    this.includeSynonyms,
                    this.ignoreUnknownTopicOrPartitionException,
                    this.errorHandler);
        }
    }

    /**
     * Starts building a fetcher for resources of the given type.
     *
     * @param admin                    admin client used for the describeConfigs calls
     * @param kafkaCruiseControlConfig source of the default response timeout and of the batch size
     * @param type                     resource type applied to every entity name
     * @param time                     clock used for retries and for measuring the total duration
     * @return a new {@link Builder}
     */
    public static Builder of(Admin admin, KafkaCruiseControlConfig kafkaCruiseControlConfig, ConfigResource.Type type, Time time) {
        return new Builder(admin, kafkaCruiseControlConfig, type, time);
    }

    private BatchedConfigsFetcher(Admin admin, KafkaCruiseControlConfig kafkaCruiseControlConfig, Collection<String> collection, ConfigResource.Type type, Time time, int i, Duration duration, boolean z, boolean z2, ConfigFetchErrorHandler configFetchErrorHandler) {
        this.adminClient = admin;
        this.config = kafkaCruiseControlConfig;
        this.includeSynonyms = z;
        this.ignoreUnknownTopicOrPartition = z2;
        this.time = time;
        this.timeoutMs = i;
        this.describeConfigsPerBatchMaxRetryDuration = duration;
        // Each entity name becomes one ConfigResource of the requested type.
        this.configResources = collection.stream()
                .map(name -> new ConfigResource(type, name))
                .collect(Collectors.toList());
        this.errorHandler = configFetchErrorHandler;
    }

    /**
     * Describes the configs of all resources, one describeConfigs call per batch, and merges the
     * per-batch results into a single map.
     *
     * <p>Per-resource failures are handled as follows:
     * <ul>
     *   <li>If an error handler is set, it is invoked with the resource and the exception. For a
     *       single-resource batch its return value becomes the result of that attempt; for larger
     *       batches the return value is discarded and the resource is left out of the batch result.</li>
     *   <li>Otherwise, if ignoring unknown topics/partitions is enabled and the exception's cause is an
     *       {@link UnknownTopicOrPartitionException}, the resource is logged and skipped.</li>
     *   <li>Otherwise the attempt ends with {@code RetryableResult.Incomplete} (presumably making the
     *       retryer try the whole batch again - confirm against {@link OperationRetryer}).</li>
     * </ul>
     *
     * <p>A warning is logged when the whole operation takes longer than
     * {@value #SLOW_FETCH_WARN_THRESHOLD_MS} ms.
     *
     * @return the fetched configs keyed by resource; resources that were skipped, or whose batch
     *         produced a {@code null} result from the retryer, are absent
     */
    @SuppressWarnings("unchecked")
    public Map<ConfigResource, Config> getConfigs() {
        int batchSize = this.config.getInt(KafkaCruiseControlConfig.DESCRIBE_CONFIGS_BATCH_SIZE_CONFIG).intValue();
        Collection<List<ConfigResource>> batches = partitionIntoBatches(batchSize);
        LOG.debug("Going to use batch size of {} to get config of {} entities, resulting in {} describeConfigs call", batchSize, this.configResources.size(), batches.size());

        DescribeConfigsOptions options = new DescribeConfigsOptions().timeoutMs(this.timeoutMs).includeSynonyms(this.includeSynonyms);
        OperationRetryer retryer = new OperationRetryer(this.time, this.describeConfigsPerBatchMaxRetryDuration, DESCRIBE_CONFIG_PER_BATCH_WAIT_RETRY_DELAY, "Get entity configuration.");

        long startMs = this.time.milliseconds();
        Map<ConfigResource, Config> result = new HashMap<>();
        for (List<ConfigResource> batch : batches) {
            try {
                // The retryer's generic signature is not visible here, hence the explicit (unchecked) cast.
                Map<ConfigResource, Config> batchConfigs = (Map<ConfigResource, Config>) retryer.runWithRetries(() -> {
                    DescribeConfigsResult describeResult = this.adminClient.describeConfigs(batch, options);
                    Map<ConfigResource, Config> fetched = new HashMap<>();
                    for (Map.Entry<ConfigResource, KafkaFuture<Config>> entry : describeResult.values().entrySet()) {
                        try {
                            fetched.put(entry.getKey(), entry.getValue().get(this.timeoutMs, TimeUnit.MILLISECONDS));
                        } catch (Exception e) {
                            // NOTE(review): this also catches an InterruptedException raised while waiting on
                            // the future, without restoring the interrupt flag - confirm that is acceptable.
                            if (this.errorHandler != null) {
                                if (batch.size() == 1) {
                                    return this.errorHandler.handleError(entry.getKey(), e);
                                }
                                // Multi-resource batch: the handler is notified, its result is not used.
                                this.errorHandler.handleError(entry.getKey(), e);
                            } else if (this.ignoreUnknownTopicOrPartition && e.getCause() instanceof UnknownTopicOrPartitionException) {
                                LOG.error("Ignoring {} as it doesn't exist anymore.", entry.getKey().name());
                            } else {
                                LOG.error("Error when describing config for resource: {}", batch, e);
                                return RetryableResult.Incomplete.instance();
                            }
                        }
                    }
                    return RetryableResult.Success.of(fetched);
                });
                if (batchConfigs != null) {
                    result.putAll(batchConfigs);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                LOG.error("Interrupted when describing config for resource: {}", batch, e);
                SbkAdminUtils.sneakyThrow(e);
            }
        }

        long elapsedMs = this.time.milliseconds() - startMs;
        if (elapsedMs > SLOW_FETCH_WARN_THRESHOLD_MS) {
            LOG.warn("getConfigs completed successfully after a considerable period of time - {} ms", elapsedMs);
        }
        return result;
    }

    /**
     * Splits the resources into groups of at most {@code batchSize} consecutive elements; a
     * non-positive batch size yields a single group holding every resource.
     */
    private Collection<List<ConfigResource>> partitionIntoBatches(int batchSize) {
        if (batchSize <= 0) {
            return Collections.singleton(this.configResources);
        }
        // Sequential stream: the running counter assigns element i to group i / batchSize.
        AtomicInteger position = new AtomicInteger();
        return this.configResources.stream()
                .collect(Collectors.groupingBy(resource -> position.getAndIncrement() / batchSize))
                .values();
    }
}
