package org.apache.kafka.connect.runtime.tracing;

import io.confluent.shaded.org.slf4j.Logger;
import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Confluent;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.runtime.TransformationStage;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.TopicAdmin;

/**
 * Base implementation of {@link Tracer} that turns {@link TraceRecord}s into Kafka
 * {@link ProducerRecord}s and sends them through a dedicated trace producer.
 *
 * <p>For each record written, the configured trace transformations are applied, the key, value and
 * headers are serialized with the converters taken from the {@link TracerConfig}, the destination
 * topic is created (or looked up) once via the {@link TopicAdmin}, and the record is handed to the
 * producer. Subclasses supply the {@link TraceRecordBuilder} through {@link #newTraceRecordBuilder()}.
 */
@Confluent
/* loaded from: input_file:org/apache/kafka/connect/runtime/tracing/AbstractTracer.class */
public abstract class AbstractTracer implements Tracer {
    /** Upper bound given to the producer and to the admin client when closing them. */
    private static final Duration CLOSE_TIMEOUT = Duration.ofSeconds(30L);

    protected final TracerConfig tracerConfig;
    protected final Producer<byte[], byte[]> traceProducer;
    protected final TopicAdmin admin;
    private final TracingContext tracingContext;
    private final Converter keyConverter;
    private final Converter valueConverter;
    private final HeaderConverter headerConverter;
    private final List<TransformationStage<TraceRecord>> traceRecordTransformations;
    private final Logger log;
    /** Topics for which {@link #maybeCreateTopic(String)} no longer attempts creation/lookup. */
    private final Set<String> skipTopicsCreationSet = ConcurrentHashMap.newKeySet();
    /** Builder currently accumulating trace data; {@code null} until {@link #start()} is called. */
    private TraceRecordBuilder currentTraceRecordBuilder;

    /**
     * Creates a tracer bound to one connector task.
     *
     * @param connectorTaskId task this tracer belongs to; used for the tracing context and log prefix
     * @param tracerConfig    source of the converters, transformations and trace topic settings
     * @param producer        producer used to send trace records; closed by {@link #close()}
     * @param topicAdmin      admin client used to create/find trace topics; closed by {@link #close()}
     */
    public AbstractTracer(ConnectorTaskId connectorTaskId, TracerConfig tracerConfig, Producer<byte[], byte[]> producer, TopicAdmin topicAdmin) {
        this.tracerConfig = tracerConfig;
        this.traceProducer = producer;
        this.admin = topicAdmin;
        this.keyConverter = tracerConfig.keyConverter();
        this.valueConverter = tracerConfig.valueConverter();
        this.headerConverter = tracerConfig.headerConverter();
        this.traceRecordTransformations = Collections.unmodifiableList(tracerConfig.traceTransformations());
        this.tracingContext = new TracingContext(connectorTaskId, tracerConfig);
        this.log = new LogContext(String.format("[%s-tracer-%s] ", connectorTaskId.toString(), tracingContext().traceID())).logger(AbstractTracer.class);
    }

    /**
     * Serializes the Connect headers of a trace record with the configured header converter.
     *
     * @param traceRecord record whose headers are converted
     * @return the converted headers; empty when the record has no headers
     */
    private RecordHeaders convertHeaders(TraceRecord traceRecord) {
        RecordHeaders converted = new RecordHeaders();
        Headers connectHeaders = traceRecord.headers();
        if (connectHeaders == null) {
            return converted;
        }
        String topic = traceRecord.topic();
        for (Header header : connectHeaders) {
            String key = header.key();
            converted.add(key, this.headerConverter.fromConnectHeader(topic, key, header.schema(), header.value()));
        }
        return converted;
    }

    /**
     * Optionally runs the trace transformations over a record and serializes the result.
     *
     * <p>Only the transformation chain is guarded here; exceptions thrown by the converters
     * propagate to the caller unchanged.
     *
     * @param traceRecord            record to process
     * @param applyTransformations   whether to run the configured transformations first
     * @param recoverWithErrorRecord when a transformation fails: {@code true} emits a tracing-error
     *                               record built by the current {@link TraceRecordBuilder} instead,
     *                               {@code false} rethrows as {@link ConnectorTracingException}
     * @return the serialized record, or {@code null} if a transformation dropped the record
     */
    private ProducerRecord<byte[], byte[]> convertAndTransform(TraceRecord traceRecord, boolean applyTransformations, boolean recoverWithErrorRecord) {
        this.log.trace("Begin transformation and conversion of TraceRecord {}", traceRecord);
        TraceRecord current = traceRecord;
        if (applyTransformations) {
            try {
                for (TransformationStage<TraceRecord> stage : this.traceRecordTransformations) {
                    current = stage.apply(current);
                    if (current == null) {
                        return null;
                    }
                }
            } catch (Exception e) {
                if (!recoverWithErrorRecord) {
                    throw new ConnectorTracingException("Connector Tracing failed to convert and transform record ", e);
                }
                this.log.debug("Conversion/transformation on trace record failed with an exception. Failed record can be logged by switching the log level to TRACE Creating new trace record for the encountered exception. Please check the trace configs ", e);
                // Return the error record itself. Falling through would serialize the record whose
                // transformation just failed and silently discard the error record.
                // The error record is converted without transformations and without further recovery.
                return convertAndTransform(traceRecordBuilder().createTracingError(current, e), false, false);
            }
        }
        String topic = current.topic();
        byte[] serializedKey = this.keyConverter.fromConnectData(topic, current.keySchema(), current.key());
        byte[] serializedValue = this.valueConverter.fromConnectData(topic, current.valueSchema(), current.value());
        RecordHeaders serializedHeaders = convertHeaders(current);
        this.log.trace("Successfully created ProducerRecord for traceRecord");
        return new ProducerRecord<>(current.topic(), current.kafkaPartition(), ConnectUtils.checkAndConvertTimestamp(current.timestamp()), serializedKey, serializedValue, serializedHeaders);
    }

    /**
     * Creates the trace topic, or confirms that it exists, unless that was already settled.
     *
     * <p>A topic is added to the skip set once the admin client reports it created/found, or once a
     * failure whose cause is not a {@link RetriableException} occurs. Failures with a retriable
     * cause leave the topic out of the skip set so the next call tries again. No exception escapes
     * this method.
     *
     * @param topic name of the trace topic
     */
    private void maybeCreateTopic(String topic) {
        try {
            if (this.skipTopicsCreationSet.contains(topic)) {
                return;
            }
            this.log.info("Attempting to create new or find existing trace topic: '{}'", topic);
            if (this.admin.createOrFindTopic(TopicAdmin.defineTopic(topic).partitions(this.tracerConfig.getInt(TracerConfig.TOPIC_PARTITION_CONFIG).intValue()).replicationFactor(this.tracerConfig.getShort(TracerConfig.TOPIC_REPLICATION_FACTOR).shortValue()).config(this.tracerConfig.originalsWithPrefix(TracerConfig.TOPIC_CONFIGS_PREFIX)).build())) {
                this.skipTopicsCreationSet.add(topic);
                this.log.info("Trace topic '{}' created/existed on broker, commencing message production", topic);
            }
        } catch (Exception e) {
            if (e.getCause() instanceof RetriableException) {
                this.log.debug("Creation/lookup of trace topic '{}' failed but will be retried in next iteration. Current trace message will be sent to broker assuming failure conditions may be resolved shortly.", topic, e);
            } else {
                this.log.warn("Creation/lookup of trace topic '{}' failed. Skipping creation and assuming broker side topic creation is enabled", topic, e);
                this.skipTopicsCreationSet.add(topic);
            }
        }
    }

    /**
     * Supplies a fresh builder; invoked by {@link #start()} and after every {@link #buildRecords()}.
     *
     * @return a new trace record builder
     */
    public abstract TraceRecordBuilder newTraceRecordBuilder();

    /**
     * Transforms, serializes and sends one trace record.
     *
     * @param traceRecord record to send
     * @param callback    optional callback invoked with the producer's completion result; may be
     *                    {@code null}
     * @return the producer's future, or an already-completed future holding {@code null} when the
     *         record was dropped by a transformation or a {@link RetriableException} was caught
     * @throws ConnectorTracingException for any non-retriable failure
     */
    @Override // org.apache.kafka.connect.runtime.tracing.Tracer
    public Future<RecordMetadata> writeTraceRecord(TraceRecord traceRecord, Callback callback) {
        try {
            ProducerRecord<byte[], byte[]> producerRecord = convertAndTransform(traceRecord, true, true);
            if (producerRecord != null) {
                maybeCreateTopic(producerRecord.topic());
                return this.traceProducer.send(producerRecord, (recordMetadata, exc) -> {
                    if (null == exc) {
                        this.log.trace("Successfully sent trace record {}", recordMetadata);
                    } else {
                        this.log.trace("Error {} occurred while sending record {}", exc, recordMetadata);
                    }
                    if (callback != null) {
                        callback.onCompletion(recordMetadata, exc);
                    }
                });
            }
        } catch (RetriableException e) {
            // Retriable failures are logged and swallowed. The record content is only logged at TRACE.
            if (this.log.isTraceEnabled()) {
                this.log.trace("Exception sending trace record {}", traceRecord, e);
            } else {
                this.log.error("Exception sending trace record. (Enable trace log level to see records.)", e);
            }
        } catch (ConnectorTracingException e2) {
            throw e2;
        } catch (Exception e3) {
            throw new ConnectorTracingException("Failed to write trace record ", e3);
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * @return the builder currently in use, or {@code null} if {@link #start()} has not been called
     */
    @Override // org.apache.kafka.connect.runtime.tracing.Tracer
    public TraceRecordBuilder traceRecordBuilder() {
        return this.currentTraceRecordBuilder;
    }

    /**
     * Builds the records accumulated by the current builder and replaces it with a fresh one.
     *
     * @return the records produced by the builder that was current when this method was called
     */
    @Override // org.apache.kafka.connect.runtime.tracing.Tracer
    public List<TraceRecord> buildRecords() {
        List<TraceRecord> built = traceRecordBuilder().build();
        this.log.debug("Trace record build returned {} records", built.size());
        this.currentTraceRecordBuilder = newTraceRecordBuilder();
        return built;
    }

    /** Logs the start of tracing and installs the first {@link TraceRecordBuilder}. */
    @Override // org.apache.kafka.connect.runtime.tracing.Tracer
    public void start() {
        this.log.info("Starting tracer for {}", this.tracingContext.connectorTaskId());
        this.currentTraceRecordBuilder = newTraceRecordBuilder();
    }

    /**
     * @return the tracing context created for this tracer's connector task
     */
    @Override // org.apache.kafka.connect.runtime.tracing.Tracer
    public final TracingContext tracingContext() {
        return this.tracingContext;
    }

    /**
     * Writes any pending records, then closes the trace producer and the topic admin client.
     *
     * <p>Every step is attempted even if an earlier one fails, so a failed final write cannot leave
     * the producer or admin client open. The first failure becomes the cause of the thrown
     * exception; later failures are attached to it as suppressed exceptions.
     *
     * @throws ConnectorTracingException if any shutdown step failed
     */
    @Override // org.apache.kafka.connect.runtime.tracing.Tracer, java.lang.AutoCloseable
    public void close() {
        Exception failure = null;
        try {
            this.log.info("Closing producer and topic admin for connector tracing");
            if (null != traceRecordBuilder()) {
                buildAndWriteRecords();
            }
        } catch (Exception e) {
            failure = e;
        }
        try {
            this.traceProducer.close(CLOSE_TIMEOUT);
        } catch (Exception e) {
            failure = mergeFailure(failure, e);
        }
        try {
            this.admin.close(CLOSE_TIMEOUT);
        } catch (Exception e) {
            failure = mergeFailure(failure, e);
        }
        if (failure != null) {
            throw new ConnectorTracingException("Failed to stop tracer for " + this.tracingContext.connectorTaskId(), failure);
        }
        this.log.info("Successfully stopped tracer for {} ", this.tracingContext.connectorTaskId());
    }

    /** Keeps the first failure as primary and attaches any later one to it as suppressed. */
    private static Exception mergeFailure(Exception primary, Exception next) {
        if (primary == null) {
            return next;
        }
        if (primary != next) {
            // addSuppressed rejects self-suppression, hence the identity check.
            primary.addSuppressed(next);
        }
        return primary;
    }

    /**
     * @return the configuration this tracer was constructed with
     */
    @Override // org.apache.kafka.connect.runtime.tracing.Tracer
    public TracerConfig tracerConfig() {
        return this.tracerConfig;
    }
}
