package org.apache.kafka.coordinator.group.runtime;

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.coordinator.group.runtime.EventAccumulator;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.class */
public class MultiThreadedEventProcessor implements CoordinatorEventProcessor {
    private final Logger log;
    private final List<EventProcessorThread> threads;
    private volatile boolean shuttingDown = false;
    private final EventAccumulator<TopicPartition, CoordinatorEvent> accumulator = new EventAccumulator<>();

    /* loaded from: input_file:org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor$EventProcessorThread.class */
    private class EventProcessorThread extends Thread {
        private final Logger log;

        EventProcessorThread(String str) {
            super(str);
            this.log = new LogContext("[" + str + "]: ").logger(EventProcessorThread.class);
            setDaemon(false);
        }

        private void handleEvents() {
            while (!MultiThreadedEventProcessor.this.shuttingDown) {
                CoordinatorEvent coordinatorEvent = (CoordinatorEvent) MultiThreadedEventProcessor.this.accumulator.poll();
                if (coordinatorEvent != null) {
                    try {
                        this.log.debug("Executing event: {}.", coordinatorEvent);
                        coordinatorEvent.run();
                    } catch (Throwable th) {
                        this.log.error("Failed to run event {} due to: {}.", coordinatorEvent, th.getMessage(), th);
                        coordinatorEvent.complete(th);
                    } finally {
                        MultiThreadedEventProcessor.this.accumulator.done(coordinatorEvent);
                    }
                }
            }
        }

        private void drainEvents() {
            EventAccumulator.Event poll = MultiThreadedEventProcessor.this.accumulator.poll(0L, TimeUnit.MILLISECONDS);
            while (true) {
                CoordinatorEvent coordinatorEvent = (CoordinatorEvent) poll;
                if (coordinatorEvent == null) {
                    return;
                }
                try {
                    this.log.debug("Draining event: {}.", coordinatorEvent);
                    coordinatorEvent.complete(new RejectedExecutionException("EventProcessor is closed."));
                } catch (Throwable th) {
                    this.log.error("Failed to reject event {} due to: {}.", coordinatorEvent, th.getMessage(), th);
                } finally {
                    MultiThreadedEventProcessor.this.accumulator.done(coordinatorEvent);
                }
                poll = MultiThreadedEventProcessor.this.accumulator.poll(0L, TimeUnit.MILLISECONDS);
            }
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            this.log.info("Starting");
            try {
                handleEvents();
            } catch (Throwable th) {
                this.log.error("Exiting with exception.", th);
            }
            if (MultiThreadedEventProcessor.this.shuttingDown) {
                this.log.info("Shutting down. Draining the remaining events.");
                try {
                    drainEvents();
                } catch (Throwable th2) {
                    this.log.error("Draining threw exception.", th2);
                }
                this.log.info("Shutdown completed");
            }
        }
    }

    public MultiThreadedEventProcessor(LogContext logContext, String str, int i) {
        this.log = logContext.logger(MultiThreadedEventProcessor.class);
        this.threads = (List) IntStream.range(0, i).mapToObj(i2 -> {
            return new EventProcessorThread(str + i2);
        }).collect(Collectors.toList());
        this.threads.forEach((v0) -> {
            v0.start();
        });
    }

    @Override // org.apache.kafka.coordinator.group.runtime.CoordinatorEventProcessor
    public void enqueue(CoordinatorEvent coordinatorEvent) throws RejectedExecutionException {
        this.accumulator.add(coordinatorEvent);
    }

    public synchronized void beginShutdown() {
        if (this.shuttingDown) {
            this.log.debug("Event processor is already shutting down.");
            return;
        }
        this.log.info("Shutting down event processor.");
        this.accumulator.close();
        this.shuttingDown = true;
    }

    @Override // java.lang.AutoCloseable
    public void close() throws InterruptedException {
        beginShutdown();
        Iterator<EventProcessorThread> it = this.threads.iterator();
        while (it.hasNext()) {
            it.next().join();
        }
        this.log.info("Event processor closed.");
    }
}
