package org.apache.kafka.queue;

import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

@Timeout(60)
/* loaded from: input_file:org/apache/kafka/queue/KafkaEventQueueTest.class */
public class KafkaEventQueueTest {
    private static final long ONE_HOUR_NS = TimeUnit.NANOSECONDS.convert(1, TimeUnit.HOURS);

    /* loaded from: input_file:org/apache/kafka/queue/KafkaEventQueueTest$ExceptionTrapperEvent.class */
    static class ExceptionTrapperEvent implements EventQueue.Event {
        final CompletableFuture<Throwable> exception = new CompletableFuture<>();

        ExceptionTrapperEvent() {
        }

        @Override // org.apache.kafka.queue.EventQueue.Event
        public void run() throws Exception {
            this.exception.complete(null);
        }

        @Override // org.apache.kafka.queue.EventQueue.Event
        public void handleException(Throwable th) {
            this.exception.complete(th);
        }
    }

    /* loaded from: input_file:org/apache/kafka/queue/KafkaEventQueueTest$FutureEvent.class */
    private static class FutureEvent<T> implements EventQueue.Event {
        private final CompletableFuture<T> future;
        private final Supplier<T> supplier;

        FutureEvent(CompletableFuture<T> completableFuture, Supplier<T> supplier) {
            this.future = completableFuture;
            this.supplier = supplier;
        }

        @Override // org.apache.kafka.queue.EventQueue.Event
        public void run() throws Exception {
            this.future.complete(this.supplier.get());
        }

        @Override // org.apache.kafka.queue.EventQueue.Event
        public void handleException(Throwable th) {
            this.future.completeExceptionally(th);
        }
    }

    /* loaded from: input_file:org/apache/kafka/queue/KafkaEventQueueTest$InterruptibleEvent.class */
    private static class InterruptibleEvent implements EventQueue.Event {
        private final CompletableFuture<Void> runFuture = new CompletableFuture<>();
        private final CompletableFuture<Thread> queueThread;
        private final AtomicInteger numCallsToRun;
        private final AtomicInteger numInterruptedExceptionsSeen;

        InterruptibleEvent(CompletableFuture<Thread> completableFuture, AtomicInteger atomicInteger, AtomicInteger atomicInteger2) {
            this.queueThread = completableFuture;
            this.numCallsToRun = atomicInteger;
            this.numInterruptedExceptionsSeen = atomicInteger2;
        }

        @Override // org.apache.kafka.queue.EventQueue.Event
        public void run() throws Exception {
            this.numCallsToRun.incrementAndGet();
            this.queueThread.complete(Thread.currentThread());
            this.runFuture.get();
        }

        @Override // org.apache.kafka.queue.EventQueue.Event
        public void handleException(Throwable th) {
            if (th instanceof InterruptedException) {
                this.numInterruptedExceptionsSeen.incrementAndGet();
                Thread.currentThread().interrupt();
            }
        }
    }

    @Test
    public void testCreateAndClose() throws Exception {
        new KafkaEventQueue(Time.SYSTEM, new LogContext(), "testCreateAndClose").close();
    }

    @Test
    public void testHandleEvents() throws Exception {
        KafkaEventQueue kafkaEventQueue = new KafkaEventQueue(Time.SYSTEM, new LogContext(), "testHandleEvents");
        AtomicInteger atomicInteger = new AtomicInteger(0);
        CompletableFuture completableFuture = new CompletableFuture();
        kafkaEventQueue.prepend(new FutureEvent(completableFuture, () -> {
            Assertions.assertEquals(1, atomicInteger.incrementAndGet());
            return 1;
        }));
        CompletableFuture completableFuture2 = new CompletableFuture();
        kafkaEventQueue.appendWithDeadline(Time.SYSTEM.nanoseconds() + TimeUnit.SECONDS.toNanos(60L), new FutureEvent(completableFuture2, () -> {
            Assertions.assertEquals(2, atomicInteger.incrementAndGet());
            return 2;
        }));
        CompletableFuture completableFuture3 = new CompletableFuture();
        kafkaEventQueue.append(new FutureEvent(completableFuture3, () -> {
            Assertions.assertEquals(3, atomicInteger.incrementAndGet());
            return 3;
        }));
        Assertions.assertEquals(1, (Integer) completableFuture.get());
        Assertions.assertEquals(3, (Integer) completableFuture3.get());
        Assertions.assertEquals(2, (Integer) completableFuture2.get());
        CompletableFuture completableFuture4 = new CompletableFuture();
        kafkaEventQueue.appendWithDeadline(Time.SYSTEM.nanoseconds() + TimeUnit.SECONDS.toNanos(60L), new FutureEvent(completableFuture4, () -> {
            Assertions.assertEquals(4, atomicInteger.incrementAndGet());
            return 4;
        }));
        completableFuture4.get();
        kafkaEventQueue.beginShutdown("testHandleEvents");
        kafkaEventQueue.close();
    }

    @Test
    public void testTimeouts() throws Exception {
        KafkaEventQueue kafkaEventQueue = new KafkaEventQueue(Time.SYSTEM, new LogContext(), "testTimeouts");
        AtomicInteger atomicInteger = new AtomicInteger(0);
        CompletableFuture completableFuture = new CompletableFuture();
        kafkaEventQueue.append(new FutureEvent(completableFuture, () -> {
            Assertions.assertEquals(1, atomicInteger.incrementAndGet());
            return 1;
        }));
        CompletableFuture completableFuture2 = new CompletableFuture();
        kafkaEventQueue.append(new FutureEvent(completableFuture2, () -> {
            Assertions.assertEquals(2, atomicInteger.incrementAndGet());
            Time.SYSTEM.sleep(1L);
            return 2;
        }));
        CompletableFuture completableFuture3 = new CompletableFuture();
        kafkaEventQueue.appendWithDeadline(Time.SYSTEM.nanoseconds() + 1, new FutureEvent(completableFuture3, () -> {
            atomicInteger.incrementAndGet();
            return 3;
        }));
        CompletableFuture completableFuture4 = new CompletableFuture();
        kafkaEventQueue.append(new FutureEvent(completableFuture4, () -> {
            atomicInteger.incrementAndGet();
            return 4;
        }));
        Assertions.assertEquals(1, (Integer) completableFuture.get());
        Assertions.assertEquals(2, (Integer) completableFuture2.get());
        Assertions.assertEquals(4, (Integer) completableFuture4.get());
        Assertions.assertEquals(TimeoutException.class, ((ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
        })).getCause().getClass());
        kafkaEventQueue.close();
        Assertions.assertEquals(3, atomicInteger.get());
    }

    @Test
    public void testScheduleDeferred() throws Exception {
        CompletableFuture completableFuture;
        KafkaEventQueue kafkaEventQueue = new KafkaEventQueue(Time.SYSTEM, new LogContext(), "testAppendDeferred");
        AtomicLong atomicLong = new AtomicLong(0L);
        do {
            atomicLong.addAndGet(1L);
            completableFuture = new CompletableFuture();
            kafkaEventQueue.scheduleDeferred(null, optionalLong -> {
                return OptionalLong.of(Time.SYSTEM.nanoseconds() + 1000000);
            }, new FutureEvent(completableFuture, () -> {
                return Boolean.valueOf(atomicLong.get() % 2 == 0);
            }));
            CompletableFuture completableFuture2 = new CompletableFuture();
            kafkaEventQueue.append(new FutureEvent(completableFuture2, () -> {
                return Long.valueOf(atomicLong.addAndGet(1L));
            }));
            completableFuture2.get();
        } while (!((Boolean) completableFuture.get()).booleanValue());
        kafkaEventQueue.close();
    }

    @Test
    public void testScheduleDeferredWithTagReplacement() throws Exception {
        KafkaEventQueue kafkaEventQueue = new KafkaEventQueue(Time.SYSTEM, new LogContext(), "testScheduleDeferredWithTagReplacement");
        AtomicInteger atomicInteger = new AtomicInteger(0);
        CompletableFuture completableFuture = new CompletableFuture();
        kafkaEventQueue.scheduleDeferred("foo", optionalLong -> {
            return OptionalLong.of(Time.SYSTEM.nanoseconds() + ONE_HOUR_NS);
        }, new FutureEvent(completableFuture, () -> {
            return Integer.valueOf(atomicInteger.addAndGet(1000));
        }));
        CompletableFuture completableFuture2 = new CompletableFuture();
        kafkaEventQueue.scheduleDeferred("foo", optionalLong2 -> {
            return OptionalLong.of(optionalLong2.orElse(0L) - ONE_HOUR_NS);
        }, new FutureEvent(completableFuture2, () -> {
            return Integer.valueOf(atomicInteger.addAndGet(1));
        }));
        Assertions.assertFalse(completableFuture.isDone());
        Assertions.assertEquals(1, (Integer) completableFuture2.get());
        Assertions.assertEquals(1, atomicInteger.get());
        kafkaEventQueue.close();
    }

    @Test
    public void testDeferredIsQueuedAfterTriggering() throws Exception {
        MockTime mockTime = new MockTime(0L, 100000L, 1L);
        KafkaEventQueue kafkaEventQueue = new KafkaEventQueue(mockTime, new LogContext(), "testDeferredIsQueuedAfterTriggering");
        AtomicInteger atomicInteger = new AtomicInteger(0);
        List asList = Arrays.asList(new CompletableFuture(), new CompletableFuture(), new CompletableFuture());
        kafkaEventQueue.scheduleDeferred("foo", optionalLong -> {
            return OptionalLong.of(2L);
        }, new FutureEvent((CompletableFuture) asList.get(0), () -> {
            return Integer.valueOf(atomicInteger.getAndIncrement());
        }));
        kafkaEventQueue.append(new FutureEvent((CompletableFuture) asList.get(1), () -> {
            return Integer.valueOf(atomicInteger.getAndAdd(1));
        }));
        Assertions.assertEquals(0, (Integer) ((CompletableFuture) asList.get(1)).get());
        mockTime.sleep(1L);
        kafkaEventQueue.append(new FutureEvent((CompletableFuture) asList.get(2), () -> {
            return Integer.valueOf(atomicInteger.getAndAdd(1));
        }));
        Assertions.assertEquals(1, (Integer) ((CompletableFuture) asList.get(0)).get());
        Assertions.assertEquals(2, (Integer) ((CompletableFuture) asList.get(2)).get());
        kafkaEventQueue.close();
    }

    @Test
    public void testShutdownBeforeDeferred() throws Exception {
        KafkaEventQueue kafkaEventQueue = new KafkaEventQueue(Time.SYSTEM, new LogContext(), "testShutdownBeforeDeferred");
        AtomicInteger atomicInteger = new AtomicInteger(0);
        CompletableFuture completableFuture = new CompletableFuture();
        kafkaEventQueue.scheduleDeferred("myDeferred", optionalLong -> {
            return OptionalLong.of(Time.SYSTEM.nanoseconds() + TimeUnit.HOURS.toNanos(1L));
        }, new FutureEvent(completableFuture, () -> {
            return Integer.valueOf(atomicInteger.getAndAdd(1));
        }));
        kafkaEventQueue.beginShutdown("testShutdownBeforeDeferred");
        Assertions.assertEquals(RejectedExecutionException.class, ((ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
        })).getCause().getClass());
        Assertions.assertEquals(0, atomicInteger.get());
        kafkaEventQueue.close();
    }

    @Test
    public void testRejectedExecutionException() throws Exception {
        KafkaEventQueue kafkaEventQueue = new KafkaEventQueue(Time.SYSTEM, new LogContext(), "testRejectedExecutionException");
        kafkaEventQueue.close();
        final CompletableFuture completableFuture = new CompletableFuture();
        kafkaEventQueue.append(new EventQueue.Event() { // from class: org.apache.kafka.queue.KafkaEventQueueTest.1
            @Override // org.apache.kafka.queue.EventQueue.Event
            public void run() throws Exception {
                completableFuture.complete(null);
            }

            @Override // org.apache.kafka.queue.EventQueue.Event
            public void handleException(Throwable th) {
                completableFuture.completeExceptionally(th);
            }
        });
        Assertions.assertEquals(RejectedExecutionException.class, ((ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
        })).getCause().getClass());
    }

    @Test
    public void testSize() throws Exception {
        KafkaEventQueue kafkaEventQueue = new KafkaEventQueue(Time.SYSTEM, new LogContext(), "testEmpty");
        Assertions.assertTrue(kafkaEventQueue.isEmpty());
        CompletableFuture completableFuture = new CompletableFuture();
        kafkaEventQueue.append(() -> {
        });
        Assertions.assertFalse(kafkaEventQueue.isEmpty());
        Assertions.assertEquals(1, kafkaEventQueue.size());
        kafkaEventQueue.append(() -> {
        });
        Assertions.assertEquals(2, kafkaEventQueue.size());
        completableFuture.complete(null);
        TestUtils.waitForCondition(() -> {
            return kafkaEventQueue.isEmpty();
        }, "Failed to see the queue become empty.");
        kafkaEventQueue.scheduleDeferred("later", optionalLong -> {
            return OptionalLong.of(Time.SYSTEM.nanoseconds() + TimeUnit.HOURS.toNanos(1L));
        }, () -> {
        });
        Assertions.assertFalse(kafkaEventQueue.isEmpty());
        kafkaEventQueue.scheduleDeferred("soon", optionalLong2 -> {
            return OptionalLong.of(Time.SYSTEM.nanoseconds() + TimeUnit.MILLISECONDS.toNanos(1L));
        }, () -> {
        });
        Assertions.assertFalse(kafkaEventQueue.isEmpty());
        kafkaEventQueue.cancelDeferred("later");
        kafkaEventQueue.cancelDeferred("soon");
        TestUtils.waitForCondition(() -> {
            return kafkaEventQueue.isEmpty();
        }, "Failed to see the queue become empty.");
        kafkaEventQueue.close();
        Assertions.assertTrue(kafkaEventQueue.isEmpty());
    }

    @Test
    public void testHandleExceptionThrowingAnException() throws Exception {
        KafkaEventQueue kafkaEventQueue = new KafkaEventQueue(Time.SYSTEM, new LogContext(), "testHandleExceptionThrowingAnException");
        CompletableFuture completableFuture = new CompletableFuture();
        kafkaEventQueue.append(() -> {
        });
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        kafkaEventQueue.append(new EventQueue.Event() { // from class: org.apache.kafka.queue.KafkaEventQueueTest.2
            @Override // org.apache.kafka.queue.EventQueue.Event
            public void run() throws Exception {
                atomicInteger.incrementAndGet();
                throw new IllegalStateException("First exception");
            }

            @Override // org.apache.kafka.queue.EventQueue.Event
            public void handleException(Throwable th) {
                if (th instanceof IllegalStateException) {
                    atomicInteger.incrementAndGet();
                    throw new RuntimeException("Second exception");
                }
            }
        });
        kafkaEventQueue.append(() -> {
            atomicInteger.incrementAndGet();
        });
        Assertions.assertEquals(3, kafkaEventQueue.size());
        completableFuture.complete(null);
        TestUtils.waitForCondition(() -> {
            return atomicInteger.get() == 3;
        }, "Failed to see all events execute as planned.");
        kafkaEventQueue.close();
    }

    @Test
    public void testInterruptedExceptionHandling() throws Exception {
        KafkaEventQueue kafkaEventQueue = new KafkaEventQueue(Time.SYSTEM, new LogContext(), "testInterruptedExceptionHandling");
        CompletableFuture completableFuture = new CompletableFuture();
        AtomicInteger atomicInteger = new AtomicInteger(0);
        AtomicInteger atomicInteger2 = new AtomicInteger(0);
        kafkaEventQueue.append(new InterruptibleEvent(completableFuture, atomicInteger, atomicInteger2));
        kafkaEventQueue.append(new InterruptibleEvent(completableFuture, atomicInteger, atomicInteger2));
        kafkaEventQueue.append(new InterruptibleEvent(completableFuture, atomicInteger, atomicInteger2));
        kafkaEventQueue.append(new InterruptibleEvent(completableFuture, atomicInteger, atomicInteger2));
        ((Thread) completableFuture.get()).interrupt();
        TestUtils.retryOnExceptionWithTimeout(30000L, () -> {
            Assertions.assertEquals(1, atomicInteger.get());
        });
        TestUtils.retryOnExceptionWithTimeout(30000L, () -> {
            Assertions.assertEquals(3, atomicInteger2.get());
        });
        kafkaEventQueue.close();
    }

    @Test
    public void testInterruptedWithEmptyQueue() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        KafkaEventQueue kafkaEventQueue = new KafkaEventQueue(Time.SYSTEM, new LogContext(), "testInterruptedWithEmptyQueue", () -> {
            completableFuture.complete(null);
        });
        CompletableFuture completableFuture2 = new CompletableFuture();
        kafkaEventQueue.append(() -> {
            completableFuture2.complete(Thread.currentThread());
        });
        TestUtils.retryOnExceptionWithTimeout(30000L, () -> {
            Assertions.assertEquals(0, kafkaEventQueue.size());
        });
        ((Thread) completableFuture2.get()).interrupt();
        completableFuture.get();
        ExceptionTrapperEvent exceptionTrapperEvent = new ExceptionTrapperEvent();
        kafkaEventQueue.append(exceptionTrapperEvent);
        Assertions.assertEquals(InterruptedException.class, exceptionTrapperEvent.exception.get().getClass());
        kafkaEventQueue.close();
        ExceptionTrapperEvent exceptionTrapperEvent2 = new ExceptionTrapperEvent();
        kafkaEventQueue.append(exceptionTrapperEvent2);
        Assertions.assertEquals(RejectedExecutionException.class, exceptionTrapperEvent2.exception.get().getClass());
    }

    @Test
    public void testInterruptedWithDeferredEvents() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        KafkaEventQueue kafkaEventQueue = new KafkaEventQueue(Time.SYSTEM, new LogContext(), "testInterruptedWithDeferredEvents", () -> {
            completableFuture.complete(null);
        });
        CompletableFuture completableFuture2 = new CompletableFuture();
        kafkaEventQueue.append(() -> {
            completableFuture2.complete(Thread.currentThread());
        });
        ExceptionTrapperEvent exceptionTrapperEvent = new ExceptionTrapperEvent();
        ExceptionTrapperEvent exceptionTrapperEvent2 = new ExceptionTrapperEvent();
        kafkaEventQueue.scheduleDeferred("ie2", optionalLong -> {
            return OptionalLong.of(Time.SYSTEM.nanoseconds() + TimeUnit.HOURS.toNanos(2L));
        }, exceptionTrapperEvent2);
        kafkaEventQueue.scheduleDeferred("ie1", optionalLong2 -> {
            return OptionalLong.of(Time.SYSTEM.nanoseconds() + TimeUnit.HOURS.toNanos(1L));
        }, exceptionTrapperEvent);
        TestUtils.retryOnExceptionWithTimeout(30000L, () -> {
            Assertions.assertEquals(2, kafkaEventQueue.size());
        });
        ((Thread) completableFuture2.get()).interrupt();
        completableFuture.get();
        Assertions.assertEquals(InterruptedException.class, exceptionTrapperEvent.exception.get().getClass());
        Assertions.assertEquals(InterruptedException.class, exceptionTrapperEvent2.exception.get().getClass());
        kafkaEventQueue.close();
    }
}
