package org.apache.kafka.queue;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.TreeMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.queue.EventQueue;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/queue/KafkaEventQueue.class */
public final class KafkaEventQueue implements EventQueue {
    private final Time time;
    private final Logger log;
    private final Thread eventHandlerThread;
    private final ReentrantLock lock = new ReentrantLock();
    private final EventHandler eventHandler = new EventHandler();
    private long closingTimeNs = ConfluentConfigs.PASSWORD_ENCODER_OLD_SECRET_TTL_MS_DEFAULT;
    private EventQueue.Event cleanupEvent = null;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/queue/KafkaEventQueue$EventContext.class */
    public static class EventContext {
        private final EventQueue.Event event;
        private final EventQueue.EventInsertionType insertionType;
        private EventContext prev = this;
        private EventContext next = this;
        private OptionalLong deadlineNs = OptionalLong.empty();
        private String tag;

        EventContext(EventQueue.Event event, EventQueue.EventInsertionType eventInsertionType, String str) {
            this.event = event;
            this.insertionType = eventInsertionType;
            this.tag = str;
        }

        void insertAfter(EventContext eventContext) {
            this.next.prev = eventContext;
            eventContext.next = this.next;
            eventContext.prev = this;
            this.next = eventContext;
        }

        void insertBefore(EventContext eventContext) {
            this.prev.next = eventContext;
            eventContext.prev = this.prev;
            eventContext.next = this;
            this.prev = eventContext;
        }

        void remove() {
            this.prev.next = this.next;
            this.next.prev = this.prev;
            this.prev = this;
            this.next = this;
        }

        boolean isSingleton() {
            return this.prev == this && this.next == this;
        }

        void run(Logger logger) throws InterruptedException {
            try {
                this.event.run();
            } catch (InterruptedException e) {
                throw e;
            } catch (Exception e2) {
                try {
                    this.event.handleException(e2);
                } catch (Throwable th) {
                    logger.error("Unexpected exception in handleException", th);
                }
            }
        }

        void completeWithTimeout() {
            completeWithException(new TimeoutException());
        }

        void completeWithException(Throwable th) {
            this.event.handleException(th);
        }
    }

    /* loaded from: input_file:org/apache/kafka/queue/KafkaEventQueue$EventHandler.class */
    private class EventHandler implements Runnable {
        private final Map<String, EventContext> tagToEventContext;
        private final EventContext head;
        private final TreeMap<Long, EventContext> deadlineMap;
        private final Condition cond;

        private EventHandler() {
            this.tagToEventContext = new HashMap();
            this.head = new EventContext(null, null, null);
            this.deadlineMap = new TreeMap<>();
            this.cond = KafkaEventQueue.this.lock.newCondition();
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                handleEvents();
                KafkaEventQueue.this.cleanupEvent.run();
            } catch (Throwable th) {
                KafkaEventQueue.this.log.warn("event handler thread exiting with exception", th);
            }
        }

        private void remove(EventContext eventContext) {
            eventContext.remove();
            if (eventContext.deadlineNs.isPresent()) {
                this.deadlineMap.remove(Long.valueOf(eventContext.deadlineNs.getAsLong()));
                eventContext.deadlineNs = OptionalLong.empty();
            }
            if (eventContext.tag != null) {
                this.tagToEventContext.remove(eventContext.tag, eventContext);
                eventContext.tag = null;
            }
        }

        private void handleEvents() throws InterruptedException {
            EventContext eventContext = null;
            EventContext eventContext2 = null;
            while (true) {
                if (eventContext != null) {
                    eventContext.completeWithTimeout();
                    eventContext = null;
                } else if (eventContext2 != null) {
                    eventContext2.run(KafkaEventQueue.this.log);
                    eventContext2 = null;
                }
                KafkaEventQueue.this.lock.lock();
                try {
                    long j = Long.MAX_VALUE;
                    Map.Entry<Long, EventContext> firstEntry = this.deadlineMap.firstEntry();
                    if (firstEntry != null) {
                        long nanoseconds = KafkaEventQueue.this.time.nanoseconds();
                        long longValue = firstEntry.getKey().longValue();
                        EventContext value = firstEntry.getValue();
                        if (longValue <= nanoseconds) {
                            if (value.insertionType == EventQueue.EventInsertionType.DEFERRED) {
                                remove(value);
                                eventContext2 = value;
                            } else {
                                remove(value);
                                eventContext = value;
                            }
                            KafkaEventQueue.this.lock.unlock();
                        } else if (KafkaEventQueue.this.closingTimeNs <= nanoseconds) {
                            remove(value);
                            eventContext = value;
                            KafkaEventQueue.this.lock.unlock();
                        } else {
                            j = longValue - nanoseconds;
                        }
                    }
                    if (this.head.next != this.head) {
                        eventContext2 = this.head.next;
                        remove(eventContext2);
                        KafkaEventQueue.this.lock.unlock();
                    } else {
                        if (KafkaEventQueue.this.closingTimeNs != ConfluentConfigs.PASSWORD_ENCODER_OLD_SECRET_TTL_MS_DEFAULT && this.deadlineMap.isEmpty()) {
                            return;
                        }
                        if (KafkaEventQueue.this.closingTimeNs != ConfluentConfigs.PASSWORD_ENCODER_OLD_SECRET_TTL_MS_DEFAULT) {
                            long nanoseconds2 = KafkaEventQueue.this.time.nanoseconds();
                            if (j > KafkaEventQueue.this.closingTimeNs - nanoseconds2) {
                                j = KafkaEventQueue.this.closingTimeNs - nanoseconds2;
                            }
                        }
                        if (j == ConfluentConfigs.PASSWORD_ENCODER_OLD_SECRET_TTL_MS_DEFAULT) {
                            this.cond.await();
                        } else {
                            this.cond.awaitNanos(j);
                        }
                        KafkaEventQueue.this.lock.unlock();
                    }
                } finally {
                    KafkaEventQueue.this.lock.unlock();
                }
            }
        }

        Exception enqueue(EventContext eventContext, Function<OptionalLong, OptionalLong> function) {
            EventContext put;
            KafkaEventQueue.this.lock.lock();
            try {
                if (KafkaEventQueue.this.closingTimeNs != ConfluentConfigs.PASSWORD_ENCODER_OLD_SECRET_TTL_MS_DEFAULT) {
                    RejectedExecutionException rejectedExecutionException = new RejectedExecutionException();
                    KafkaEventQueue.this.lock.unlock();
                    return rejectedExecutionException;
                }
                OptionalLong empty = OptionalLong.empty();
                if (eventContext.tag != null && (put = this.tagToEventContext.put(eventContext.tag, eventContext)) != null) {
                    empty = put.deadlineNs;
                    remove(put);
                }
                OptionalLong apply = function.apply(empty);
                boolean isSingleton = this.head.isSingleton();
                boolean z = false;
                switch (eventContext.insertionType) {
                    case APPEND:
                        this.head.insertBefore(eventContext);
                        if (isSingleton) {
                            z = true;
                            break;
                        }
                        break;
                    case PREPEND:
                        this.head.insertAfter(eventContext);
                        if (isSingleton) {
                            z = true;
                            break;
                        }
                        break;
                    case DEFERRED:
                        if (!apply.isPresent()) {
                            RuntimeException runtimeException = new RuntimeException("You must specify a deadline for deferred events.");
                            KafkaEventQueue.this.lock.unlock();
                            return runtimeException;
                        }
                        break;
                }
                if (apply.isPresent()) {
                    long asLong = apply.getAsLong();
                    long longValue = this.deadlineMap.isEmpty() ? ConfluentConfigs.PASSWORD_ENCODER_OLD_SECRET_TTL_MS_DEFAULT : this.deadlineMap.firstKey().longValue();
                    while (this.deadlineMap.putIfAbsent(Long.valueOf(asLong), eventContext) != null) {
                        asLong++;
                    }
                    eventContext.deadlineNs = OptionalLong.of(asLong);
                    if (asLong <= longValue) {
                        z = true;
                    }
                }
                if (z) {
                    this.cond.signal();
                }
                return null;
            } finally {
                KafkaEventQueue.this.lock.unlock();
            }
        }

        void cancelDeferred(String str) {
            KafkaEventQueue.this.lock.lock();
            try {
                EventContext eventContext = this.tagToEventContext.get(str);
                if (eventContext != null) {
                    remove(eventContext);
                }
            } finally {
                KafkaEventQueue.this.lock.unlock();
            }
        }

        void wakeUp() {
            KafkaEventQueue.this.lock.lock();
            try {
                KafkaEventQueue.this.eventHandler.cond.signal();
            } finally {
                KafkaEventQueue.this.lock.unlock();
            }
        }
    }

    public KafkaEventQueue(Time time, LogContext logContext, String str) {
        this.time = time;
        this.log = logContext.logger(KafkaEventQueue.class);
        this.eventHandlerThread = new KafkaThread(str + "EventHandler", this.eventHandler, false);
        this.eventHandlerThread.start();
    }

    @Override // org.apache.kafka.queue.EventQueue
    public void enqueue(EventQueue.EventInsertionType eventInsertionType, String str, Function<OptionalLong, OptionalLong> function, EventQueue.Event event) {
        EventContext eventContext = new EventContext(event, eventInsertionType, str);
        Exception enqueue = this.eventHandler.enqueue(eventContext, function);
        if (enqueue != null) {
            eventContext.completeWithException(enqueue);
        }
    }

    @Override // org.apache.kafka.queue.EventQueue
    public void cancelDeferred(String str) {
        this.eventHandler.cancelDeferred(str);
    }

    @Override // org.apache.kafka.queue.EventQueue
    public void beginShutdown(String str, EventQueue.Event event, long j, TimeUnit timeUnit) {
        if (j < 0) {
            throw new IllegalArgumentException("beginShutdown must be called with a non-negative timeout.");
        }
        Objects.requireNonNull(event);
        this.lock.lock();
        try {
            if (this.cleanupEvent != null) {
                this.log.debug("{}: Event queue is already shutting down.", str);
                this.lock.unlock();
                return;
            }
            this.log.info("{}: shutting down event queue.", str);
            this.cleanupEvent = event;
            long nanoseconds = this.time.nanoseconds() + timeUnit.toNanos(j);
            if (this.closingTimeNs >= nanoseconds) {
                this.closingTimeNs = nanoseconds;
            }
            this.eventHandler.cond.signal();
            this.lock.unlock();
        } catch (Throwable th) {
            this.lock.unlock();
            throw th;
        }
    }

    @Override // org.apache.kafka.queue.EventQueue
    public void wakeup() {
        this.eventHandler.wakeUp();
    }

    @Override // org.apache.kafka.queue.EventQueue, java.lang.AutoCloseable
    public void close() throws InterruptedException {
        beginShutdown("KafkaEventQueue#close");
        this.eventHandlerThread.join();
        this.log.info("closed event queue.");
    }
}
