package io.confluent.security.store.kafka.clients;

import java.time.Duration;
import java.util.Collections;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/security/store/kafka/clients/KafkaUtils.class */
public class KafkaUtils {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) KafkaUtils.class);
    private static final int RETRY_BACKOFF_MS = 10;

    public static void waitForTopic(String str, int i, Time time, Duration duration, Function<String, Set<Integer>> function, Consumer<String> consumer) {
        long millis = duration.toMillis();
        long milliseconds = time.milliseconds() + duration.toMillis();
        Set set = (Set) IntStream.range(0, i).boxed().collect(Collectors.toSet());
        Set<Integer> emptySet = Collections.emptySet();
        boolean z = false;
        while (true) {
            Throwable th = null;
            Throwable th2 = null;
            try {
                emptySet = function.apply(str);
            } catch (UnknownTopicOrPartitionException e) {
                if (!z && consumer != null) {
                    try {
                        log.debug("Topic not found, attempting to create topic {}", str);
                        consumer.accept(str);
                        z = true;
                    } catch (InvalidReplicationFactorException | RetriableException e2) {
                        log.debug("Failed to create topic " + str, e2);
                        th2 = e2;
                    }
                }
            } catch (RetriableException e3) {
                log.debug("Partition info could not be obtained for " + str, e3);
                th = e3;
            }
            if (emptySet != null && !emptySet.isEmpty()) {
                if (set.equals(emptySet)) {
                    log.debug("Topic {} has the expected {} partitions, returning", str, Integer.valueOf(i));
                    return;
                } else if (emptySet.size() >= i || ((Integer) Collections.max(emptySet)).intValue() > i - 1) {
                    break;
                } else {
                    log.debug("Topic {} has partitions {}, waiting for 0-{}", str, emptySet, Integer.valueOf(i - 1));
                }
            }
            long milliseconds2 = milliseconds - time.milliseconds();
            if (milliseconds2 <= 0) {
                Throwable th3 = th2 != null ? th2 : th;
                throw new TimeoutException(String.format("Full metadata for topic %s not available within timeout %s ms, available partitions %s, expected 0-%d. This happened during %s call. This means that not enough healthy brokers were found to satisfy partitions required for topic to become usable. If this error is preventing broker startup, you may want to increase timeout using confluent.authorizer.init.timeout.ms config property.", str, Long.valueOf(millis), emptySet, Integer.valueOf(i - 1), th3 == th2 ? "Create" : "Describe"), th3);
            }
            time.sleep(Math.min(milliseconds2, 10L));
        }
        throw new IllegalStateException(String.format("Unexpected partitions for topic %s: expected 0-%d, got %s", str, Integer.valueOf(i - 1), emptySet));
    }
}
