package io.confluent.security.audit.telemetry.exporter;

import com.google.common.util.concurrent.Runnables;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.format.EventFormat;
import io.cloudevents.core.message.Encoding;
import io.cloudevents.core.message.MessageWriter;
import io.cloudevents.core.provider.EventFormatProvider;
import io.cloudevents.jackson.JsonFormat;
import io.cloudevents.kafka.KafkaMessageFactory;
import io.confluent.telemetry.api.events.Event;
import io.confluent.telemetry.events.EventLoggerConfig;
import io.confluent.telemetry.events.EventUtils;
import io.confluent.telemetry.events.Extensions;
import io.confluent.telemetry.events.exporter.Exporter;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.internals.ProducerMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.utils.Utils;
import org.codehaus.plexus.util.SelectorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/security/audit/telemetry/exporter/NonBlockingKafkaExporter.class */
/**
 * {@link Exporter} that serializes {@link Event}s as CloudEvents and sends them to Kafka through
 * a {@code KafkaProducer<String, byte[]>}.
 *
 * <p>The destination topic is taken from the event's route extension, falling back to
 * {@link NonBlockingKafkaExporterConfig#DEFAULT_TOPIC}. Topic existence is tracked through a
 * {@link TopicManager}; depending on configuration, missing topics are either created
 * ({@code ensureTopics}) or merely looked up ({@code checkTopics}).
 *
 * <p>When the "blocking" option is disabled, a single-threaded scheduler periodically re-runs
 * the topic/metadata check in the background.
 */
public class NonBlockingKafkaExporter implements Exporter<Event> {
    private static final Logger log = LoggerFactory.getLogger(NonBlockingKafkaExporter.class);

    /** Upper bound, in milliseconds, for waiting on topic creation/lookup and on topic metadata. */
    private static final long TOPIC_READY_TIMEOUT_MS = 15000;

    /** Pause, in milliseconds, between two metadata polls while waiting for topics. */
    private static final long TOPIC_PARTITION_TIMEOUT_MS = 1000;

    private KafkaProducer<String, byte[]> producer;
    private boolean createTopic;
    private Properties producerProperties;
    private String defaultRoute;
    private TopicManager topicManager;
    // Written by the background refresh thread and read via lastMetadataRefresh(); volatile so
    // that readers on other threads observe the latest value.
    private volatile Instant metadataRefreshed;
    private ScheduledThreadPoolExecutor metadataRefresh;
    private NonBlockingKafkaExporterConfig config;
    private volatile boolean isClosing = false;
    private Encoding encoding = Encoding.BINARY;
    private final EventFormat structuredEventFormat =
            EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE);

    /**
     * Configures the exporter: resolves the CloudEvent encoding, creates the producer and the
     * topic manager, optionally starts the periodic metadata refresh, and runs one synchronous
     * topic/metadata check.
     *
     * @param map raw configuration values
     * @throws RuntimeException if the configured CloudEvent encoding is neither
     *     {@code "structured"} nor {@code "binary"}
     */
    @Override
    public void configure(Map<String, ?> map) {
        this.config = new NonBlockingKafkaExporterConfig(map);
        this.createTopic =
                this.config.getBoolean(NonBlockingKafkaExporterConfig.TOPIC_CREATE_CONFIG).booleanValue();
        this.defaultRoute = NonBlockingKafkaExporterConfig.DEFAULT_TOPIC;

        // Validate the encoding before any resource is allocated, so that a bad value does not
        // leave a half-constructed producer behind.
        this.encoding = parseEncoding(
                new EventLoggerConfig(map).getString(EventLoggerConfig.CLOUD_EVENT_ENCODING_CONFIG));

        this.producerProperties = this.config.producerProperties();
        this.producer = new KafkaProducer<>(this.producerProperties);

        this.topicManager = TopicManager.newBuilder()
                .setAdminClientProperties(this.config.clientProperties(AdminClientConfig.configNames()))
                .setDefaultTopicConfig(this.config.defaultTopicConfig())
                .setDefaultTopicPartitions(this.config.getInt(NonBlockingKafkaExporterConfig.TOPIC_PARTITIONS_CONFIG))
                .setDefaultTopicReplicas(this.config.getInt(NonBlockingKafkaExporterConfig.TOPIC_REPLICAS_CONFIG))
                .setTimeOutMs(this.config.getInt(NonBlockingKafkaExporterConfig.REQUEST_TIMEOUT_MS_CONFIG))
                .setTopics(this.config.getTopicSpecs())
                .build();
        this.metadataRefreshed = Instant.MIN;

        if (!isBlocking()) {
            this.metadataRefresh = new ScheduledThreadPoolExecutor(1);
            long maxAgeMs = new ProducerConfig(this.producerProperties)
                    .getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG).longValue();
            // Refresh at half of the shorter of the two expiry windows. scheduleAtFixedRate
            // rejects a non-positive period, hence the lower bound of 1 ms.
            long refreshPeriodMs = Math.max(1L, Math.min(maxAgeMs, ProducerMetadata.TOPIC_EXPIRY_MS) / 2);
            this.metadataRefresh.scheduleAtFixedRate(
                    this::refreshTopicsQuietly, refreshPeriodMs, refreshPeriodMs, TimeUnit.MILLISECONDS);
        }
        ensureOrCheckTopicsWithMetadata();
    }

    /**
     * Maps the configured encoding name onto the CloudEvents {@link Encoding}.
     *
     * @throws RuntimeException if the name is not recognised
     */
    private static Encoding parseEncoding(String name) {
        switch (name) {
            case "structured":
                return Encoding.STRUCTURED;
            case "binary":
                return Encoding.BINARY;
            default:
                throw new RuntimeException("unknown encoding " + name);
        }
    }

    /** Reads the "blocking" flag from the exporter configuration. */
    private boolean isBlocking() {
        return this.config.getBoolean(NonBlockingKafkaExporterConfig.EVENT_LOGGER_LOG_BLOCKING_CONFIG)
                .booleanValue();
    }

    /** Asks the topic manager to create the managed topics, or only to look them up. */
    private Future<Boolean> requestTopics() {
        return this.createTopic ? this.topicManager.ensureTopics() : this.topicManager.checkTopics();
    }

    /**
     * Body of the periodic refresh task. An exception escaping a task scheduled with
     * {@code scheduleAtFixedRate} suppresses all later runs, so failures are logged instead.
     */
    private void refreshTopicsQuietly() {
        try {
            ensureOrCheckTopicsWithMetadata();
        } catch (RuntimeException e) {
            log.warn("Periodic event log topic metadata refresh failed", e);
        }
    }

    /**
     * Emits an event without success/failure callbacks.
     *
     * @param event the event to send
     * @return a future as described on {@link #emit(Event, Runnable, Runnable)}
     */
    @Override
    public CompletableFuture<Boolean> emit(Event event) {
        return emit(event, Runnables.doNothing(), Runnables.doNothing());
    }

    /**
     * Sends an event to the topic selected by {@code route(event)}.
     *
     * @param event the event to send
     * @param runnable callback run once the producer acknowledges the record
     * @param runnable2 callback run when the record is not produced (exporter closing, thread
     *     interrupted, send failure, or any other error raised while emitting)
     * @return a future completed with {@code true} on acknowledgement, with {@code false} when
     *     the send was skipped because the exporter is closing or the thread is interrupted,
     *     or exceptionally with the underlying failure
     */
    public CompletableFuture<Boolean> emit(Event event, Runnable runnable, Runnable runnable2) {
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        try {
            String topic = route(event);
            if (!this.topicManager.topicExists(topic)) {
                awaitTopic(topic);
            }
            if (Thread.currentThread().isInterrupted() || this.isClosing) {
                runnable2.run();
                log.warn("Failed to produce event log message because audit logger is closing. Message: {}", EventUtils.toJson(event));
                result.complete(false);
            } else {
                log.trace("Generated event log message : {}", event);
                this.producer.send(createProducerRecord(event, topic), (metadata, error) -> {
                    if (error == null) {
                        runnable.run();
                        log.debug("Produced event log message of size {} with offset {} to topic partition {}-{}",
                                metadata.serializedValueSize(), metadata.offset(), metadata.topic(), metadata.partition());
                        result.complete(true);
                    } else {
                        runnable2.run();
                        // Trailing throwable: SLF4J logs its stack trace through the logger
                        // instead of writing it to stderr.
                        log.error("Failed to produce event log message: {}", EventUtils.toJson(event), error);
                        result.completeExceptionally(error);
                    }
                });
            }
        } catch (InterruptException e) {
            // NOTE(review): the failure callback is not invoked on this path, matching the
            // previous behaviour — confirm this is intentional.
            result.completeExceptionally(e);
        } catch (Throwable t) {
            runnable2.run();
            log.warn("Failed to produce event log message {}. Message: {}", t.toString(), EventUtils.toJson(event), t);
            result.completeExceptionally(t);
        }
        return result;
    }

    /**
     * Triggers creation/lookup of the managed topics for a route that is not yet known. In
     * non-blocking mode the request is fired and the call fails immediately; in blocking mode
     * it waits up to {@link #TOPIC_READY_TIMEOUT_MS} for the result.
     *
     * @throws RuntimeException always in non-blocking mode, or when the wait fails
     */
    private void awaitTopic(String topic) {
        try {
            Future<Boolean> topicsReady = requestTopics();
            if (!isBlocking()) {
                throw new RuntimeException("Topic " + topic + " not found on cluster ["
                        + this.config.getString(NonBlockingKafkaExporterConfig.BOOTSTRAP_SERVERS_CONFIG) + "]");
            }
            if (topicsReady.get(TOPIC_READY_TIMEOUT_MS, TimeUnit.MILLISECONDS).booleanValue()) {
                log.debug("all topics created successfully");
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller before translating the exception.
            Thread.currentThread().interrupt();
            throw new RuntimeException("error while creating topics", e);
        } catch (Exception e) {
            throw new RuntimeException("error while creating topics", e);
        }
    }

    /** Builds the Kafka record for an event using the configured CloudEvent encoding. */
    private ProducerRecord<String, byte[]> createProducerRecord(Event event, String str) {
        CloudEvent cloudEvent = EventUtils.toCloudEvent(event);
        // The record key is null; the target type pins the writer's key type to String.
        MessageWriter<?, ProducerRecord<String, byte[]>> writer = KafkaMessageFactory.createWriter(str, null);
        switch (this.encoding) {
            case STRUCTURED:
                return writer.writeStructured(cloudEvent, this.structuredEventFormat);
            case BINARY:
                return writer.writeBinary(cloudEvent);
            default:
                throw new RuntimeException("Unknown encoding " + this.encoding);
        }
    }

    /**
     * Reports whether the topic for an event can be written to right now.
     *
     * <p>If the topic is not known to exist but is managed, a creation/lookup request is fired
     * (its result is not awaited) and {@code false} is returned.
     *
     * @param event the event whose route is checked
     * @return {@code true} only if the topic exists and the producer has partition metadata
     */
    @Override
    public boolean routeReady(Event event) {
        String topic = route(event);
        if (this.topicManager.topicExists(topic)) {
            return metadataReady(topic);
        }
        if (this.topicManager.topicManaged(topic)) {
            requestTopics();
        }
        return false;
    }

    /**
     * Checks whether the producer can list at least one partition for the topic. Any exception
     * from the lookup is logged at trace level and treated as "not ready".
     */
    private boolean metadataReady(String str) {
        try {
            List<PartitionInfo> partitions = this.producer.partitionsFor(str);
            if (partitions != null && !partitions.isEmpty()) {
                log.debug("Event log topic {} is ready with {} partitions", str, partitions.size());
                return true;
            }
        } catch (Exception e) {
            log.trace("Exception while checking for event log partitions", e);
        }
        log.debug("Event log topic {} is NOT ready", str);
        return false;
    }

    /** Returns the event's route extension if present, otherwise the default topic. */
    private String route(Event event) {
        if (event.extensionNames().contains(Extensions.ROUTE)) {
            return event.extension(Extensions.ROUTE);
        }
        return this.defaultRoute;
    }

    /**
     * Stops the background refresh, flushes and closes the producer, and closes the topic
     * manager. Each resource is released even if releasing an earlier one throws.
     */
    @Override
    public void close() {
        this.isClosing = true;
        try {
            if (this.metadataRefresh != null) {
                this.metadataRefresh.shutdownNow();
            }
        } finally {
            try {
                if (this.producer != null) {
                    try {
                        this.producer.flush();
                    } finally {
                        this.producer.close(Duration.ZERO);
                    }
                }
            } finally {
                if (this.topicManager != null) {
                    this.topicManager.close();
                }
            }
        }
    }

    /** Only the topic configuration can be changed at runtime. */
    @Override
    public Set<String> reconfigurableConfigs() {
        return Utils.mkSet(NonBlockingKafkaExporterConfig.TOPIC_CONFIG);
    }

    /**
     * Validates a prospective configuration by parsing it and resolving its topic specs.
     *
     * @param map the prospective configuration
     * @throws ConfigException if the configuration is rejected
     */
    @Override
    public void validateReconfiguration(Map<String, ?> map) throws ConfigException {
        new NonBlockingKafkaExporterConfig(map).getTopicSpecs();
    }

    /**
     * Registers every topic spec from the new configuration with the topic manager and re-runs
     * the topic/metadata check.
     *
     * @param map the new configuration
     */
    @Override
    public void reconfigure(Map<String, ?> map) {
        new NonBlockingKafkaExporterConfig(map).getTopicSpecs().values()
                .forEach(topicSpec -> this.topicManager.addTopic(topicSpec));
        ensureOrCheckTopicsWithMetadata();
    }

    /**
     * Creates or looks up the managed topics, then polls until the producer has partition
     * metadata for all of them or {@link #TOPIC_READY_TIMEOUT_MS} has elapsed. Records the
     * completion time in {@code metadataRefreshed}.
     */
    private void ensureOrCheckTopicsWithMetadata() {
        boolean allTopicsExist = false;
        try {
            allTopicsExist = requestTopics().get(TOPIC_READY_TIMEOUT_MS, TimeUnit.MILLISECONDS).booleanValue();
            if (allTopicsExist) {
                log.debug("all topics exist");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("error while checking topics", e);
        } catch (Exception e) {
            log.error("error while checking topics", e);
        }

        // NOTE(review): the returned set is mutated below; this assumes managedTopics() hands
        // out a mutable set that is safe to modify — confirm against TopicManager.
        Set<String> pending = this.topicManager.managedTopics();
        pending.removeIf(this::metadataReady);

        // Waiting only makes sense if the topics are expected to exist (we create them, or the
        // lookup reported that they are all present).
        if (this.createTopic || allTopicsExist) {
            long waitedMs = 0;
            while (!pending.isEmpty() && waitedMs < TOPIC_READY_TIMEOUT_MS) {
                log.info("Event logger is waiting for metadata for topics: {}", pending);
                try {
                    Thread.sleep(TOPIC_PARTITION_TIMEOUT_MS);
                } catch (InterruptedException e) {
                    // Stop waiting promptly (e.g. on shutdownNow()) and keep the flag set.
                    Thread.currentThread().interrupt();
                    break;
                }
                waitedMs += TOPIC_PARTITION_TIMEOUT_MS;
                pending.removeIf(this::metadataReady);
            }
        }

        if (pending.isEmpty()) {
            log.debug("Event logger has metadata for all topics");
        } else {
            log.warn("Event logger is missing metadata for topics: {}", pending);
        }
        this.metadataRefreshed = Instant.now();
    }

    /**
     * @return when the last topic/metadata check finished, or {@link Instant#MIN} if
     *     {@code configure} has set up the exporter but no check has completed yet
     */
    public Instant lastMetadataRefresh() {
        return this.metadataRefreshed;
    }

    /** @return the parsed exporter configuration set by {@code configure} */
    public NonBlockingKafkaExporterConfig config() {
        return this.config;
    }
}
