package io.confluent.telemetry.events.exporter.kafka.async;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.confluent.telemetry.events.exporter.Exporter;
import java.io.Closeable;
import java.time.Clock;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/telemetry/events/exporter/kafka/async/AsyncKafkaExporter.class */
/**
 * {@link Exporter} that hands events to a Kafka {@link Producer} from a single background thread.
 *
 * <p>{@link #emit(Object)} never blocks: it places the event in a bounded in-memory buffer and
 * returns a future. A dedicated worker thread ({@link EmitterThread}) takes events from the buffer,
 * serializes them and sends them through the producer. The returned future completes with
 * {@code true} once the producer acknowledges the record, with {@code false} if the event was
 * dropped (buffer full, exporter closed, no topic available), or exceptionally if sending failed.
 *
 * @param <T> type of the events accepted by this exporter
 */
public class AsyncKafkaExporter<T> implements Exporter<T> {
    private static final Logger log = LoggerFactory.getLogger(AsyncKafkaExporter.class);
    private final EmitterThread<T> emitterThread;
    private final BlockingDeque<Tuple<T>> buffer;
    private final ExecutorService executorService;
    private final Producer<String, byte[]> producer;
    private volatile boolean isClosed = false;

    /**
     * Fluent builder for {@link AsyncKafkaExporter}.
     *
     * @param <T> type of the events accepted by the exporter being built
     */
    public static class Builder<T> {
        protected Properties producerProperties;
        protected int bufferSize = 500;
        protected TopicSupplier<T> topicSupplier;
        protected Producer<String, byte[]> kafkaProducer;
        protected Function<T, ProducerRecord<String, byte[]>> dataSerializer;

        protected Builder() {
        }

        /** Sets the properties used to construct the underlying {@link KafkaProducer}. Required. */
        public Builder<T> withProducerProperties(Properties properties) {
            this.producerProperties = properties;
            return this;
        }

        /** Sets the capacity of the in-memory event buffer (defaults to 500; must be at least 1). */
        public Builder<T> withBufferSize(int bufferSize) {
            this.bufferSize = bufferSize;
            return this;
        }

        /** Sets the supplier consulted for each event to decide whether a topic is available. */
        public Builder<T> withTopicSupplier(TopicSupplier<T> topicSupplier) {
            this.topicSupplier = topicSupplier;
            return this;
        }

        /** Sets the function that converts an event into the record handed to the producer. */
        public Builder<T> withDataSerializer(Function<T, ProducerRecord<String, byte[]>> dataSerializer) {
            this.dataSerializer = dataSerializer;
            return this;
        }

        /**
         * Creates the exporter.
         *
         * @throws NullPointerException if no producer properties were supplied
         * @throws IllegalArgumentException if the configured buffer size is less than 1
         */
        public AsyncKafkaExporter<T> build() {
            Objects.requireNonNull(this.producerProperties, "producerProperties");
            return new AsyncKafkaExporter<>(this.producerProperties, this.bufferSize, this.topicSupplier, this.dataSerializer);
        }
    }

    /**
     * Worker that drains the shared buffer and sends each event through the producer.
     *
     * <p>All sends and the producer shutdown are performed while holding the producer's monitor, so
     * no record is handed to the producer after {@link #close()} has started closing it.
     */
    private static class EmitterThread<T> implements Runnable, Closeable {
        /** Minimum delay between two "failed to produce" warnings. */
        private static final int ERROR_LOG_INTERVAL_MS = 5000;
        private final Producer<String, byte[]> producer;
        private final TopicSupplier<T> topicSupplier;
        private final BlockingDeque<Tuple<T>> deque;
        private final Function<T, ProducerRecord<String, byte[]>> dataSerializer;
        protected final AtomicLong droppedEventCount = new AtomicLong();
        protected final AtomicReference<Exception> droppedEventException = new AtomicReference<>();
        protected long lastLoggedTimestamp = 0;
        protected long lastLoggedCount = 0;
        private volatile boolean isClosed = false;

        public EmitterThread(Producer<String, byte[]> producer, TopicSupplier<T> topicSupplier, BlockingDeque<Tuple<T>> deque, Function<T, ProducerRecord<String, byte[]>> dataSerializer) {
            this.producer = producer;
            this.dataSerializer = dataSerializer;
            this.topicSupplier = topicSupplier;
            this.deque = deque;
        }

        /**
         * Takes events from the buffer and produces them until the worker is closed, the shutdown
         * sentinel (null data with an already completed future) is seen, the thread is interrupted,
         * or an unexpected error occurs. On exit every event still buffered is completed with
         * {@code false}.
         */
        @Override
        public void run() {
            Tuple<T> current = null;
            try {
                while (!this.isClosed) {
                    // A failing take() must not be blamed on the previously processed event.
                    current = null;
                    current = this.deque.take();
                    if (current.data == null && current.future.isDone()) {
                        break;
                    }
                    produce(current);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                // Interruption is the expected wake-up signal during close(); only report it otherwise.
                if (!this.isClosed) {
                    AsyncKafkaExporter.log.error("Exception while getting record from queue, Events would not be emitted anymore.", e);
                }
            } catch (Throwable t) {
                AsyncKafkaExporter.log.error("Exception while getting record from queue, Events would not be emitted anymore.", t);
                if (current != null) {
                    current.future.completeExceptionally(t);
                }
            } finally {
                failPending();
            }
        }

        /** Completes every event still in the buffer with {@code false}. */
        private void failPending() {
            Tuple<T> pending;
            while ((pending = this.deque.poll()) != null) {
                pending.future.complete(false);
            }
        }

        /**
         * Sends one event through the producer and wires the outcome into the event's future:
         * {@code true} when the producer acknowledges the record, {@code false} when the event is
         * dropped (no topic, worker closed or thread interrupted), exceptional completion when
         * serialization or sending fails.
         */
        public void produce(Tuple<T> tuple) {
            try {
                if (!this.topicSupplier.topic(tuple.data).isPresent()) {
                    this.droppedEventCount.incrementAndGet();
                    maybeLogException();
                    tuple.future.complete(false);
                    return;
                }
                synchronized (this.producer) {
                    if (Thread.currentThread().isInterrupted() || this.isClosed) {
                        tuple.future.complete(false);
                    } else {
                        if (AsyncKafkaExporter.log.isTraceEnabled()) {
                            AsyncKafkaExporter.log.trace("Generated telemetry message : {}", tuple.data);
                        }
                        this.producer.send(this.dataSerializer.apply(tuple.data), (recordMetadata, exc) -> {
                            if (exc != null) {
                                this.droppedEventCount.incrementAndGet();
                                this.droppedEventException.compareAndSet(null, exc);
                                tuple.future.completeExceptionally(exc);
                            } else {
                                // Without this the caller's future would never complete on success.
                                tuple.future.complete(true);
                            }
                        });
                    }
                    maybeLogException();
                }
            } catch (Throwable th) {
                tuple.future.completeExceptionally(th);
            }
        }

        /**
         * Logs a warning summarising the events dropped since the last warning, at most once per
         * {@link #ERROR_LOG_INTERVAL_MS}. The first recorded failure (if any) is attached and reset.
         */
        private void maybeLogException() {
            long droppedTotal = this.droppedEventCount.get();
            long droppedSinceLastLog = droppedTotal - this.lastLoggedCount;
            if (droppedSinceLastLog <= 0) {
                return;
            }
            long now = Clock.systemUTC().millis();
            if (this.lastLoggedTimestamp + ERROR_LOG_INTERVAL_MS < now) {
                AsyncKafkaExporter.log.warn("Failed to produce {} telemetry messages {}", Long.valueOf(droppedSinceLastLog), this.droppedEventException.getAndSet(null));
                this.lastLoggedTimestamp = now;
                this.lastLoggedCount = droppedTotal;
            }
        }

        /**
         * Marks the worker as closed and closes the producer, waiting up to 15 seconds. Runs under
         * the producer's monitor so it cannot interleave with a send in {@link #produce(Tuple)}.
         */
        @Override
        public void close() {
            if (this.producer == null) {
                this.isClosed = true;
                return;
            }
            synchronized (this.producer) {
                this.isClosed = true;
                this.producer.close(Duration.ofSeconds(15L));
            }
        }
    }

    /**
     * Buffer entry pairing an event with the future reported back to the caller of
     * {@link AsyncKafkaExporter#emit(Object)}. An entry with {@code null} data and an already
     * completed future is used as the shutdown sentinel.
     */
    public static class Tuple<T> {
        final T data;
        final CompletableFuture<Boolean> future;

        Tuple(T data, CompletableFuture<Boolean> future) {
            this.data = data;
            this.future = future;
        }
    }

    /**
     * Creates an exporter around an existing producer and starts the background worker.
     *
     * @param bufferSize capacity of the event buffer; must be at least 1
     * @param topicSupplier consulted per event to decide whether a topic is available
     * @param producer producer used to send records; closed by {@link #close()}
     * @param dataSerializer converts an event into the record to send
     * @throws IllegalArgumentException if {@code bufferSize} is less than 1
     */
    @VisibleForTesting
    protected AsyncKafkaExporter(int bufferSize, TopicSupplier<T> topicSupplier, Producer<String, byte[]> producer, Function<T, ProducerRecord<String, byte[]>> dataSerializer) {
        this.buffer = new LinkedBlockingDeque<>(requireValidBufferSize(bufferSize));
        this.executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("async-kafka-exporter-%d").build());
        this.producer = producer;
        this.emitterThread = new EmitterThread<>(this.producer, topicSupplier, this.buffer, dataSerializer);
        this.executorService.submit(this.emitterThread);
    }

    /**
     * Creates an exporter with a new {@link KafkaProducer} built from {@code properties}.
     *
     * @throws IllegalArgumentException if {@code bufferSize} is less than 1
     */
    public AsyncKafkaExporter(Properties properties, int bufferSize, TopicSupplier<T> topicSupplier, Function<T, ProducerRecord<String, byte[]>> dataSerializer) {
        // Arguments are evaluated left to right: validate the size before a producer is created,
        // so an invalid size cannot leave an unclosed producer behind.
        this(requireValidBufferSize(bufferSize), topicSupplier, new KafkaProducer<String, byte[]>(properties), dataSerializer);
    }

    /** Returns {@code bufferSize} unchanged, or throws if it is less than 1. */
    private static int requireValidBufferSize(int bufferSize) {
        if (bufferSize < 1) {
            throw new IllegalArgumentException("Buffer size can not be less than 1");
        }
        return bufferSize;
    }

    /** Returns a new, empty {@link Builder}. */
    public static <T> Builder<T> newBuilder() {
        return new Builder<>();
    }

    /** This exporter has no reconfigurable settings, so the returned set is always empty. */
    @Override // org.apache.kafka.common.Reconfigurable
    public Set<String> reconfigurableConfigs() {
        return Collections.emptySet();
    }

    /** No-op: nothing is reconfigurable, so there is nothing to validate. */
    @Override // org.apache.kafka.common.Reconfigurable
    public void validateReconfiguration(Map<String, ?> map) throws ConfigException {
    }

    /** No-op: this exporter ignores reconfiguration. */
    @Override // org.apache.kafka.common.Reconfigurable
    public void reconfigure(Map<String, ?> map) {
    }

    /** No-op: all configuration is supplied through the constructor. */
    @Override // org.apache.kafka.common.Configurable
    public void configure(Map<String, ?> map) {
    }

    /**
     * Queues an event for asynchronous export without blocking.
     *
     * @param event the event to export
     * @return a future that completes with {@code true} when the producer acknowledges the record,
     *     with {@code false} if the event was dropped (exporter closed, buffer full, no topic), or
     *     exceptionally if serialization or sending failed
     */
    @Override // io.confluent.telemetry.events.exporter.Exporter
    public CompletableFuture<Boolean> emit(T event) {
        if (this.isClosed) {
            return CompletableFuture.completedFuture(false);
        }
        Tuple<T> tuple = new Tuple<>(event, new CompletableFuture<Boolean>());
        if (!this.buffer.offer(tuple)) {
            return CompletableFuture.completedFuture(false);
        }
        // close() may have run between the check above and the offer; if the entry is still in the
        // buffer nobody is left to process it, so withdraw it instead of leaving the future pending.
        if (this.isClosed && this.buffer.remove(tuple)) {
            tuple.future.complete(false);
        }
        return tuple.future;
    }

    /** Exposes the internal event buffer for tests. */
    @VisibleForTesting
    public BlockingDeque<Tuple<T>> buffer() {
        return this.buffer;
    }

    /**
     * Stops accepting events, closes the producer and stops the background worker. Events still in
     * the buffer when the worker stops are completed with {@code false}.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        // Reject new events before anything else so late emit() calls cannot strand entries.
        this.isClosed = true;
        // Best-effort sentinel; it is silently dropped when the buffer is full.
        this.buffer.offer(new Tuple<T>(null, CompletableFuture.completedFuture(true)));
        this.emitterThread.close();
        // shutdownNow() interrupts the worker, so it cannot stay parked in take() forever when the
        // sentinel could not be enqueued.
        this.executorService.shutdownNow();
    }

    /** Returns the underlying Kafka producer. */
    public Object producer() {
        return this.producer;
    }
}
