package io.confluent.ksql.metrics;

import io.confluent.common.utils.Time;
import io.confluent.ksql.metrics.TopicSensors;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.shaded.com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.connect.runtime.tracing.TraceRecordBuilderImpl;

/* loaded from: input_file:io/confluent/ksql/metrics/ProducerCollector.class */
public class ProducerCollector implements MetricCollector, ProducerInterceptor<Object, Object> {
    public static final String PRODUCER_MESSAGES_PER_SEC = "messages-per-sec";
    public static final String PRODUCER_TOTAL_MESSAGES = "total-messages";
    private MetricCollectors metricsCollectors;
    private Metrics metrics;
    private final Map<String, TopicSensors<ProducerRecord<Object, Object>>> topicSensors = new HashMap();
    private String id;
    private Time time;

    @Override // org.apache.kafka.common.Configurable
    public void configure(Map<String, ?> map) {
        configure((String) map.get("client.id"), (MetricCollectors) Objects.requireNonNull(map.get(KsqlConfig.KSQL_INTERNAL_METRIC_COLLECTORS_CONFIG)));
    }

    void configure(String str, MetricCollectors metricCollectors) {
        this.metricsCollectors = metricCollectors;
        this.metrics = metricCollectors.getMetrics();
        this.id = metricCollectors.addCollector(str, this);
        this.time = metricCollectors.getTime();
    }

    @Override // org.apache.kafka.clients.producer.ProducerInterceptor
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception exc) {
    }

    @Override // org.apache.kafka.clients.producer.ProducerInterceptor
    public ProducerRecord<Object, Object> onSend(ProducerRecord<Object, Object> producerRecord) {
        collect(producerRecord, false);
        return producerRecord;
    }

    private void collect(ProducerRecord<Object, Object> producerRecord, boolean z) {
        collect(z, producerRecord.topic().toLowerCase());
    }

    private void collect(boolean z, String str) {
        this.topicSensors.computeIfAbsent(getKey(str), str2 -> {
            return new TopicSensors(str, buildSensors(str2));
        }).increment(null, z);
    }

    private List<TopicSensors.SensorMetric<ProducerRecord<Object, Object>>> buildSensors(String str) {
        ArrayList arrayList = new ArrayList();
        synchronized (this.metrics) {
            addSensor(str, PRODUCER_MESSAGES_PER_SEC, new Rate(), arrayList);
            addSensor(str, PRODUCER_TOTAL_MESSAGES, new CumulativeSum(), arrayList);
        }
        return arrayList;
    }

    private void addSensor(String str, String str2, MeasurableStat measurableStat, List<TopicSensors.SensorMetric<ProducerRecord<Object, Object>>> list) {
        String str3 = "prod-" + str + "-" + str2 + "-" + this.id;
        MetricName metricName = new MetricName(str2, "producer-metrics", "producer-" + str3, ImmutableMap.of(TraceRecordBuilderImpl.KEY, str, "id", this.id));
        Sensor sensor = this.metrics.getSensor(str3);
        final Sensor sensor2 = this.metrics.sensor(str3);
        if (sensor == null || this.metrics.metrics().get(metricName) == null) {
            sensor2.add(metricName, measurableStat);
        }
        list.add(new TopicSensors.SensorMetric<ProducerRecord<Object, Object>>(sensor2, this.metrics.metrics().get(metricName), this.time, false) { // from class: io.confluent.ksql.metrics.ProducerCollector.1
            /* JADX INFO: Access modifiers changed from: package-private */
            @Override // io.confluent.ksql.metrics.TopicSensors.SensorMetric
            public void record(ProducerRecord<Object, Object> producerRecord) {
                sensor2.record(1.0d);
                super.record((AnonymousClass1) producerRecord);
            }
        });
    }

    private String getKey(String str) {
        return str;
    }

    @Override // org.apache.kafka.clients.producer.ProducerInterceptor, java.lang.AutoCloseable
    public void close() {
        this.metricsCollectors.remove(this.id);
        this.topicSensors.values().forEach(topicSensors -> {
            topicSensors.close(this.metrics);
        });
    }

    @Override // io.confluent.ksql.metrics.MetricCollector
    public Collection<TopicSensors.Stat> stats(String str, boolean z) {
        return MetricUtils.stats(str, z, this.topicSensors.values());
    }

    @Override // io.confluent.ksql.metrics.MetricCollector
    public double aggregateStat(String str, boolean z) {
        return MetricUtils.aggregateStat(str, z, this.topicSensors.values());
    }

    public String toString() {
        return getClass().getSimpleName() + " " + this.id + " " + this.topicSensors.toString();
    }
}
