package org.apache.kafka.connect.runtime.tracing;

import io.confluent.shaded.com.google.common.collect.ImmutableMap;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.Confluent;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.runtime.errors.Stage;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.source.SourceRecord;

@Confluent
/* loaded from: input_file:org/apache/kafka/connect/runtime/tracing/TraceRecordBuilderImpl.class */
public class TraceRecordBuilderImpl implements TraceRecordBuilder {
    public static final String KEY = "key";
    public static final String VALUE = "value";
    public static final String HEADERS = "headers";
    public static final String TYPE = "type";
    public static final String TOPIC = "topic";
    public static final String PARTITION = "partition";
    public static final String CONNECTOR = "connector";
    public static final String RECORD = "record";
    public static final String METADATA = "metadata";
    public static final String ERROR = "error";
    public static final String ERROR_MESSAGE = "error_message";
    public static final String ERROR_TRACE = "error_trace";
    public static final String ADDITIONAL_ERROR_INFO = "error_info";
    public static final String SOURCE_TYPE = "SOURCE";
    public static final String SINK_TYPE = "SINK";
    private final TracingContext context;
    private final TracerConfig tracerConfig;
    private static final String RECORD_SCHEMA_NAME = "TRANSFORMED_RECORD";
    private static final String TRACE_SCHEMA_NAME = "TRACE_SCHEMA";
    private static final String TRACE_ERROR_SCHEMA_NAME = "TRACE_ERROR";
    private static final String ERROR_TYPE = "Type";
    private static final String SINK_RECORD_ERROR = "SinkRecord error";
    private static final String CONVERSION_ERROR = "Conversion Error";
    private static final String CONVERSION_TYPE = "Conversion Type";
    private static final String TASK_ERROR = "Task error";
    private static final String TRACING_ERROR = "Tracing error";
    private static final String FAILED_RECORD = "Failed Record";
    private static final String EXECUTING_CLASS = "Executing Class";
    private static final String ERROR_STAGE = "Stage";
    private static final String DATA_ERROR = "Data error";
    private static final String DATA_ERROR_INFO = "Info";
    public static final Schema ERROR_SCHEMA;
    public static final Schema ERROR_SCHEMA_WITH_TRACE;
    private static final String TRACE_METADATA_SCHEMA_NAME = "TRACE_METADATA";
    public static final String SOURCE_PARTITION = "source_partition";
    public static final String SOURCE_OFFSET = "source_offset";
    public static final String OFFSET = "offset";
    public static final String CORRELATION_ID = "correlation_id";
    public static final String CURRENT_STEP = "current_step";
    public static final String TOTAL_STEP = "total_step";
    public static final String TRANSFORMATION_NAME = "transformation_name";
    public static final String TRANSFORMATION_TYPE = "transformation_type";
    public static final Schema METADATA_SCHEMA = SchemaBuilder.struct().name(TRACE_METADATA_SCHEMA_NAME).field(SOURCE_PARTITION, SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).optional().build()).field(SOURCE_OFFSET, SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).optional().build()).field("topic", SchemaBuilder.OPTIONAL_STRING_SCHEMA).field("partition", SchemaBuilder.OPTIONAL_INT32_SCHEMA).field(OFFSET, SchemaBuilder.OPTIONAL_INT64_SCHEMA).field("connector", Schema.STRING_SCHEMA).field(CORRELATION_ID, SchemaBuilder.OPTIONAL_STRING_SCHEMA).field(CURRENT_STEP, Schema.OPTIONAL_INT32_SCHEMA).field(TOTAL_STEP, Schema.OPTIONAL_INT32_SCHEMA).field("type", SchemaBuilder.OPTIONAL_STRING_SCHEMA).field(TRANSFORMATION_NAME, SchemaBuilder.OPTIONAL_STRING_SCHEMA).field(TRANSFORMATION_TYPE, SchemaBuilder.OPTIONAL_STRING_SCHEMA).build();
    private final List<TraceRecord> records = new ArrayList();
    private final Map<String, Integer> recordStepCount = new HashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/connect/runtime/tracing/TraceRecordBuilderImpl$TraceMetadata.class */
    public static class TraceMetadata {
        private final Optional<Map<String, String>> sourcePartition;
        private final Optional<Map<String, String>> sourceOffset;
        private final Optional<String> type;
        private final Optional<Long> offset;
        private final Optional<String> topic;
        private final Optional<Integer> kafkaPartition;
        private final Optional<Long> timestamp;
        private static final TraceMetadata EMPTY = new TraceMetadata(Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty());

        private TraceMetadata(Optional<Map<String, String>> optional, Optional<Map<String, String>> optional2, Optional<String> optional3, Optional<Long> optional4, Optional<String> optional5, Optional<Integer> optional6, Optional<Long> optional7) {
            this.sourcePartition = optional;
            this.sourceOffset = optional2;
            this.type = optional3;
            this.offset = optional4;
            this.topic = optional5;
            this.kafkaPartition = optional6;
            this.timestamp = optional7;
        }

        public static TraceMetadata extractFromConnectRecord(ConnectRecord connectRecord) {
            if (connectRecord == null) {
                return EMPTY;
            }
            Optional empty = Optional.empty();
            Optional empty2 = Optional.empty();
            Optional empty3 = Optional.empty();
            Optional empty4 = Optional.empty();
            if (connectRecord instanceof SourceRecord) {
                empty = Optional.ofNullable(TraceRecordBuilderImpl.stringMap(((SourceRecord) connectRecord).sourcePartition()));
                empty2 = Optional.ofNullable(TraceRecordBuilderImpl.stringMap(((SourceRecord) connectRecord).sourceOffset()));
                empty3 = Optional.of(TraceRecordBuilderImpl.SOURCE_TYPE);
            } else if (connectRecord instanceof SinkRecord) {
                empty4 = Optional.of(Long.valueOf(((SinkRecord) connectRecord).kafkaOffset()));
                empty3 = Optional.of(TraceRecordBuilderImpl.SINK_TYPE);
            }
            return new TraceMetadata(empty, empty2, empty3, empty4, Optional.ofNullable(connectRecord.topic()), Optional.ofNullable(connectRecord.kafkaPartition()), Optional.ofNullable(connectRecord.timestamp()));
        }

        public static TraceMetadata extractFromConsumerRecord(ConsumerRecord<?, ?> consumerRecord) {
            return consumerRecord == null ? EMPTY : new TraceMetadata(Optional.empty(), Optional.empty(), Optional.of(TraceRecordBuilderImpl.SINK_TYPE), Optional.of(Long.valueOf(consumerRecord.offset())), Optional.of(consumerRecord.topic()), Optional.of(Integer.valueOf(consumerRecord.partition())), Optional.of(Long.valueOf(consumerRecord.timestamp())));
        }

        public Optional<Integer> kafkaPartition() {
            return this.kafkaPartition;
        }

        public Optional<String> topic() {
            return this.topic;
        }

        public Optional<Long> offset() {
            return this.offset;
        }

        public Optional<String> type() {
            return this.type;
        }

        public Optional<Map<String, String>> sourceOffset() {
            return this.sourceOffset;
        }

        public Optional<Map<String, String>> sourcePartition() {
            return this.sourcePartition;
        }

        public Optional<Long> timestamp() {
            return this.timestamp;
        }
    }

    public TraceRecordBuilderImpl(TracerConfig tracerConfig, TracingContext tracingContext) {
        this.context = tracingContext;
        this.tracerConfig = tracerConfig;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static Map<String, String> stringMap(Map<String, ?> map) {
        return (Map) map.entrySet().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry -> {
            return entry.getValue().toString();
        }));
    }

    private String stackTraceError(Throwable th) {
        StringWriter stringWriter = new StringWriter();
        th.printStackTrace(new PrintWriter((Writer) stringWriter, true));
        return stringWriter.getBuffer().toString();
    }

    private void correlateAndIncrementStep(Struct struct) {
        String str = (String) Arrays.asList(this.context.traceID().toString(), this.context.connectorTaskId().toString(), this.context.processedRecordCount().toString()).stream().collect(Collectors.joining("-"));
        struct.put(CORRELATION_ID, str);
        Integer valueOf = Integer.valueOf(this.recordStepCount.getOrDefault(str, 0).intValue() + 1);
        struct.put(CURRENT_STEP, valueOf);
        struct.put(TOTAL_STEP, (Object) (-1));
        this.recordStepCount.put(str, valueOf);
    }

    private Struct createErrorObject(SchemaBuilder schemaBuilder, Throwable th, Optional<Map<String, String>> optional, boolean z) {
        Schema schema = z ? ERROR_SCHEMA_WITH_TRACE : ERROR_SCHEMA;
        schemaBuilder.field("error", schema);
        Struct struct = new Struct(schema);
        struct.put(ERROR_MESSAGE, th.getMessage());
        if (z) {
            struct.put(ERROR_TRACE, stackTraceError(th));
        }
        optional.ifPresent(map -> {
            struct.put(ADDITIONAL_ERROR_INFO, map);
        });
        return struct;
    }

    private Object actualOrNull(Schema schema, Object obj) {
        return (schema != null || obj == null) ? obj : obj.toString();
    }

    private <R extends ConnectRecord<R>> SchemaAndValue createSchemaAndValue(TraceMetadata traceMetadata, Optional<R> optional, Optional<String> optional2, Optional<String> optional3, Optional<Throwable> optional4, Optional<Map<String, String>> optional5, boolean z, boolean z2, boolean z3) {
        Optional ofNullable;
        Struct struct = new Struct(METADATA_SCHEMA);
        traceMetadata.sourcePartition().ifPresent(map -> {
            struct.put(SOURCE_PARTITION, map);
        });
        traceMetadata.sourceOffset().ifPresent(map2 -> {
            struct.put(SOURCE_OFFSET, map2);
        });
        traceMetadata.type().ifPresent(str -> {
            struct.put("type", str);
        });
        traceMetadata.offset().ifPresent(l -> {
            struct.put(OFFSET, l);
        });
        traceMetadata.topic().ifPresent(str2 -> {
            struct.put("topic", str2);
        });
        struct.put("connector", this.context.connectorTaskId().connector());
        traceMetadata.kafkaPartition().ifPresent(num -> {
            struct.put("partition", num);
        });
        optional2.ifPresent(str3 -> {
            struct.put(TRANSFORMATION_NAME, str3);
        });
        optional3.ifPresent(str4 -> {
            struct.put(TRANSFORMATION_TYPE, str4);
        });
        SchemaBuilder field = SchemaBuilder.struct().name(TRACE_SCHEMA_NAME).field("metadata", METADATA_SCHEMA);
        Optional<U> flatMap = optional4.flatMap(th -> {
            return Optional.of(createErrorObject(field, th, optional5, z3));
        });
        if (optional.isPresent() || z2) {
            Schema schema = (Schema) optional.map(connectRecord -> {
                return SchemaBuilder.struct().name(RECORD_SCHEMA_NAME).field(KEY, connectRecord.keySchema() != null ? connectRecord.keySchema() : Schema.OPTIONAL_STRING_SCHEMA).field("value", connectRecord.valueSchema() != null ? connectRecord.valueSchema() : Schema.OPTIONAL_STRING_SCHEMA).field("headers", SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA).optional().build()).optional().build();
            }).orElse(Schema.OPTIONAL_STRING_SCHEMA);
            field.field("record", schema);
            ofNullable = Optional.ofNullable(optional.map(connectRecord2 -> {
                Struct struct2 = new Struct(schema);
                struct2.put(KEY, actualOrNull(connectRecord2.keySchema(), connectRecord2.key()));
                struct2.put("value", actualOrNull(connectRecord2.valueSchema(), connectRecord2.value()));
                struct2.put("headers", convertHeaders(connectRecord2.headers()));
                return struct2;
            }).orElse(null));
        } else {
            ofNullable = Optional.empty();
        }
        Schema build = field.build();
        Struct struct2 = new Struct(build);
        struct2.put("metadata", struct);
        flatMap.ifPresent(struct3 -> {
            struct2.put("error", struct3);
        });
        if (optional.isPresent() || z2) {
            struct2.put("record", ofNullable.orElse(null));
        }
        if (z) {
            correlateAndIncrementStep(struct);
        }
        return new SchemaAndValue(build, struct2);
    }

    private <R extends ConnectRecord<R>> SchemaAndValue createSchemaAndValue(TraceMetadata traceMetadata, Optional<R> optional, Optional<String> optional2, Optional<String> optional3, Optional<Throwable> optional4, Optional<Map<String, String>> optional5) {
        return createSchemaAndValue(traceMetadata, optional, optional2, optional3, optional4, optional5, true, false, true);
    }

    private Object convertHeaders(Headers headers) {
        if (headers == null) {
            return null;
        }
        return StreamSupport.stream(headers.spliterator(), false).collect(Collectors.toMap((v0) -> {
            return v0.key();
        }, header -> {
            if (header.value() != null) {
                return header.value().toString();
            }
            return null;
        }));
    }

    private void updateStepCountWithTotalSteps(TraceRecord traceRecord) {
        Struct struct = (Struct) ((Struct) traceRecord.value()).get("metadata");
        struct.put(TOTAL_STEP, this.recordStepCount.get(struct.getString(CORRELATION_ID)));
    }

    private <R extends ConnectRecord<R>> TraceRecord dataExceptionRecord(R r, TraceMetadata traceMetadata, DataException dataException, String str) {
        try {
            return createTraceRecord(createSchemaAndValue(traceMetadata, Optional.empty(), Optional.empty(), Optional.empty(), Optional.of(dataException), Optional.of(ImmutableMap.of(ERROR_TYPE, DATA_ERROR, DATA_ERROR_INFO, str, FAILED_RECORD, r.toString())), true, false, false));
        } catch (Exception e) {
            throw new ConnectorTracingException("Failed to create trace record for a data exception, Context " + this.context, e);
        }
    }

    @Override // org.apache.kafka.connect.runtime.tracing.TraceRecordBuilder
    public <R extends ConnectRecord<R>> TraceRecordBuilder appendRecord(R r) {
        TraceMetadata extractFromConnectRecord = TraceMetadata.extractFromConnectRecord(r);
        try {
            this.records.add(createTraceRecord(createSchemaAndValue(extractFromConnectRecord, Optional.ofNullable(r), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty())));
            return this;
        } catch (Exception e) {
            if (e instanceof DataException) {
                this.records.add(dataExceptionRecord(r, extractFromConnectRecord, (DataException) e, "Invalid Record"));
            }
            throw new ConnectorTracingException("Failed to append connect record, Context" + this.context, e);
        }
    }

    @Override // org.apache.kafka.connect.runtime.tracing.TraceRecordBuilder
    public <R extends ConnectRecord<R>> TraceRecordBuilder appendTransformedRecord(String str, Class<?> cls, R r, R r2) {
        TraceMetadata extractFromConnectRecord = TraceMetadata.extractFromConnectRecord(r == null ? r2 : r);
        try {
            this.records.add(createTraceRecord(createSchemaAndValue(extractFromConnectRecord, Optional.ofNullable(r), Optional.of(str), Optional.of(cls.getName()), Optional.empty(), Optional.empty(), true, true, false)));
            return this;
        } catch (Exception e) {
            if (e instanceof DataException) {
                this.records.add(dataExceptionRecord(r, extractFromConnectRecord, (DataException) e, "Invalid Transformed Record"));
            }
            throw new ConnectorTracingException("Failed to append transformed record for transformation " + str + ", Context " + this.context, e);
        }
    }

    @Override // org.apache.kafka.connect.runtime.tracing.TraceRecordBuilder
    public TraceRecordBuilder appendTransformationError(String str, Class<?> cls, Throwable th) {
        try {
            this.records.add(createTraceRecord(createSchemaAndValue(TraceMetadata.EMPTY, Optional.empty(), Optional.of(str), Optional.of(cls.getName()), Optional.of(th), Optional.empty())));
            return this;
        } catch (Exception e) {
            throw new ConnectorTracingException("Failed to append transformation error due to failed transformation " + str + " caused by: " + th.getMessage() + ", Context " + this.context, e);
        }
    }

    @Override // org.apache.kafka.connect.runtime.tracing.TraceRecordBuilder
    public TraceRecordBuilder appendSourceConversionError(Stage stage, SourceRecord sourceRecord, Throwable th) {
        try {
            this.records.add(createTraceRecord(createSchemaAndValue(TraceMetadata.extractFromConnectRecord(sourceRecord), Optional.empty(), Optional.empty(), Optional.empty(), Optional.of(th), Optional.of(ImmutableMap.of(ERROR_TYPE, CONVERSION_ERROR, CONVERSION_TYPE, stage.toString())))));
            return this;
        } catch (Exception e) {
            throw new ConnectorTracingException("Failed to append source conversion error caused by " + th.getMessage() + ", Context" + this.context, e);
        }
    }

    @Override // org.apache.kafka.connect.runtime.tracing.TraceRecordBuilder
    public TraceRecordBuilder appendSinkConversionError(Stage stage, ConsumerRecord consumerRecord, Throwable th) {
        try {
            this.records.add(createTraceRecord(createSchemaAndValue(TraceMetadata.extractFromConsumerRecord(consumerRecord), Optional.empty(), Optional.empty(), Optional.empty(), Optional.of(th), Optional.of(ImmutableMap.of(ERROR_TYPE, CONVERSION_ERROR, CONVERSION_TYPE, stage.toString())))));
            return this;
        } catch (Exception e) {
            throw new ConnectorTracingException("Failed to append sink conversion error  caused by: " + th.getMessage() + ", Context " + this.context, e);
        }
    }

    @Override // org.apache.kafka.connect.runtime.tracing.TraceRecordBuilder
    public TraceRecord createSinkRecordError(SinkRecord sinkRecord, Throwable th) {
        TraceMetadata extractFromConnectRecord = TraceMetadata.extractFromConnectRecord(sinkRecord);
        try {
            return createTraceRecord(createSchemaAndValue(extractFromConnectRecord, Optional.of(sinkRecord), Optional.empty(), Optional.empty(), Optional.of(th), Optional.of(ImmutableMap.of(ERROR_TYPE, SINK_RECORD_ERROR)), false, true, true));
        } catch (Exception e) {
            if (e instanceof DataException) {
                this.records.add(dataExceptionRecord(sinkRecord, extractFromConnectRecord, (DataException) e, "Invalid Sink Record"));
            }
            throw new ConnectorTracingException("Failed to create sink record error for record:  caused by " + th.getMessage() + ", Context" + this.context, e);
        }
    }

    @Override // org.apache.kafka.connect.runtime.tracing.TraceRecordBuilder
    public TraceRecordBuilder appendError(String str, Class<?> cls, Throwable th) {
        try {
            this.records.add(createTraceRecord(createSchemaAndValue(TraceMetadata.EMPTY, Optional.empty(), Optional.empty(), Optional.empty(), Optional.of(th), Optional.of(ImmutableMap.of(ERROR_TYPE, TASK_ERROR, EXECUTING_CLASS, cls.getName(), ERROR_STAGE, str)))));
            return this;
        } catch (Exception e) {
            throw new ConnectorTracingException("Failed to append error on stage: " + str + " caused by: " + th.getMessage() + ", Context" + this.context, e);
        }
    }

    @Override // org.apache.kafka.connect.runtime.tracing.TraceRecordBuilder
    public TraceRecord createTracingError(TraceRecord traceRecord, Throwable th) {
        try {
            return createTraceRecord(createSchemaAndValue(TraceMetadata.EMPTY, Optional.empty(), Optional.empty(), Optional.empty(), Optional.of(th), Optional.of(ImmutableMap.of(ERROR_TYPE, TRACING_ERROR, FAILED_RECORD, traceRecord.toString())), false, false, false));
        } catch (Exception e) {
            throw new ConnectorTracingException("Failed to create error for trace record:  caused by " + th.getMessage() + ", Context" + this.context, e);
        }
    }

    @Override // org.apache.kafka.connect.runtime.tracing.TraceRecordBuilder
    public List<TraceRecord> build() {
        try {
            this.records.forEach(this::updateStepCountWithTotalSteps);
            return this.records;
        } catch (Exception e) {
            throw new ConnectorTracingException("Failed to build list of trace record, Context " + this.context, e);
        }
    }

    private <R extends ConnectRecord<R>> TraceRecord createTraceRecord(SchemaAndValue schemaAndValue) {
        return new TraceRecord(this.tracerConfig.traceTopic(), null, Schema.STRING_SCHEMA, this.context.connectorTaskId().toString(), schemaAndValue.schema(), schemaAndValue.value(), null, null);
    }

    static {
        SchemaBuilder field = SchemaBuilder.struct().optional().name(TRACE_ERROR_SCHEMA_NAME).field(ERROR_MESSAGE, Schema.STRING_SCHEMA).field(ADDITIONAL_ERROR_INFO, SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).optional());
        ERROR_SCHEMA = field.build();
        ERROR_SCHEMA_WITH_TRACE = field.field(ERROR_TRACE, Schema.STRING_SCHEMA).build();
    }
}
