package kafka.catalog;

import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import kafka.catalog.event.MetadataCollectorEvent;
import kafka.catalog.exceptions.CollectorContextNotInitializedException;
import kafka.server.KafkaConfig;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.slf4j.Logger;

/* loaded from: input_file:kafka/catalog/ZKTopicMetadataCollectorEventQueueIntegrationTest.class */
public class ZKTopicMetadataCollectorEventQueueIntegrationTest {

    @Mock
    private KafkaConfig mockConfig;

    @Mock
    private KafkaZkClient mockZkClient;

    @Mock
    private Logger mockLogger;
    private ZKTopicMetadataCollector collector;
    private ZKTopicMetadataCollectorConfig config;
    private ArgumentCaptor<MetadataCollectorEvent> submittedEvent;
    private Metrics metrics;
    private Time time;
    private MetadataCollectorEventQueue eventQueue;

    /* loaded from: input_file:kafka/catalog/ZKTopicMetadataCollectorEventQueueIntegrationTest$TestControllerActiveEvent.class */
    public class TestControllerActiveEvent extends MetadataCollectorEvent {
        private final boolean shouldActive;
        private final int expectedEpoch;
        private final CountDownLatch countDownLatch;

        public TestControllerActiveEvent(ZKTopicMetadataCollector zKTopicMetadataCollector, CountDownLatch countDownLatch, int i, boolean z, Time time) {
            super(zKTopicMetadataCollector, time);
            this.expectedEpoch = i;
            this.countDownLatch = countDownLatch;
            this.shouldActive = z;
        }

        public void run() throws Exception {
            Assertions.assertEquals(Boolean.valueOf(this.shouldActive), Boolean.valueOf(this.collector.isActive()));
            if (this.shouldActive) {
                Assertions.assertEquals(this.expectedEpoch, ((ZKTopicMetadataCollectorContext) this.collector.collectorContext().get()).epoch());
            }
            this.countDownLatch.countDown();
        }
    }

    @BeforeEach
    void setUp() {
        MockitoAnnotations.openMocks(this);
        this.submittedEvent = ArgumentCaptor.forClass(MetadataCollectorEvent.class);
        this.time = new MockTime();
        this.metrics = new Metrics();
        this.config = new ZKTopicMetadataCollectorConfig(30, 300, 1, 1, 1, "foo");
        this.eventQueue = new MetadataCollectorEventQueue(this.time);
        this.collector = new ZKTopicMetadataCollector(this.mockConfig, this.config, this.mockZkClient, this.metrics, this.mockLogger, this.eventQueue, Optional.empty(), false, this.time);
    }

    @AfterEach
    void tearDown() {
        this.collector.shutdown();
    }

    @Test
    public void testCollectorLifeCycle() throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(4);
        this.collector.enable(Collections.EMPTY_MAP, 0);
        this.eventQueue.append(new TestControllerActiveEvent(this.collector, countDownLatch, 0, true, this.time));
        TestUtils.waitForCondition(() -> {
            return countDownLatch.getCount() == 3;
        }, "Test Latch never decrement");
        this.collector.disable();
        this.eventQueue.append(new TestControllerActiveEvent(this.collector, countDownLatch, -1, false, this.time));
        TestUtils.waitForCondition(() -> {
            return countDownLatch.getCount() == 2;
        }, "Test Latch never decrement");
        this.collector.enable(Collections.EMPTY_MAP, 1);
        this.eventQueue.append(new TestControllerActiveEvent(this.collector, countDownLatch, 1, true, this.time));
        TestUtils.waitForCondition(() -> {
            return countDownLatch.getCount() == 1;
        }, "Test Latch never decrement");
        this.collector.disable();
        this.eventQueue.append(new TestControllerActiveEvent(this.collector, countDownLatch, -1, false, this.time));
        TestUtils.waitForCondition(() -> {
            return countDownLatch.getCount() == 0;
        }, "Test Latch never decrement");
        this.collector.shutdown();
        Assertions.assertThrows(IllegalStateException.class, () -> {
            this.eventQueue.append(new TestControllerActiveEvent(this.collector, countDownLatch, -1, false, this.time));
        }, "Should not enqueue event after collector is shutdown.");
    }

    @Test
    void testDoubleDisable() throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        this.collector.enable(Collections.EMPTY_MAP, 1);
        this.collector.disable();
        this.collector.disable();
        this.eventQueue.append(new TestControllerActiveEvent(this.collector, countDownLatch, -1, false, this.time));
        TestUtils.waitForCondition(() -> {
            return countDownLatch.getCount() == 0;
        }, "Test Latch never decrement");
    }

    @Test
    void testDoubleShutdown() {
        this.collector.shutdown();
        this.collector.shutdown();
        ((Logger) Mockito.verify(this.mockLogger, Mockito.times(2))).info((String) ArgumentMatchers.eq("Finished shutdown."));
    }

    @Test
    public void testCollectorListenBeforeStart() throws InterruptedException {
        final AtomicReference atomicReference = new AtomicReference(null);
        this.eventQueue.append(new MetadataCollectorEvent(this.collector, this.time) { // from class: kafka.catalog.ZKTopicMetadataCollectorEventQueueIntegrationTest.1
            public void run() throws Exception {
                context();
            }

            public void handleException(Throwable th) {
                atomicReference.set(th.getClass());
            }
        });
        TestUtils.waitForCondition(() -> {
            return ((Class) atomicReference.get()).equals(CollectorContextNotInitializedException.class);
        }, "Didn't get expected exception.");
    }

    @Test
    public void testEnableCollectorAfterShutdown() {
        ArgumentCaptor forClass = ArgumentCaptor.forClass(IllegalStateException.class);
        this.collector.shutdown();
        this.collector.enable(Collections.emptyMap(), 1);
        ((Logger) Mockito.verify(this.mockLogger)).error((String) ArgumentMatchers.any(), (Throwable) forClass.capture());
        Assertions.assertNotNull(forClass.getValue());
    }
}
