package io.confluent.controlcenter.streams;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import io.confluent.common.metrics.MeasurableStat;
import io.confluent.common.metrics.MetricName;
import io.confluent.common.metrics.Metrics;
import io.confluent.common.metrics.Sensor;
import io.confluent.common.metrics.stats.Count;
import io.confluent.common.metrics.stats.Max;
import io.confluent.common.metrics.stats.Min;
import io.confluent.common.metrics.stats.Rate;
import io.confluent.controlcenter.record.Controlcenter;
import io.confluent.controlcenter.streams.StreamsModule;
import io.confluent.controlcenter.streams.aggregation.MetricEvent;
import io.confluent.controlcenter.streams.aggregation.MetricsAggregation;
import io.confluent.controlcenter.util.ClusterTopicPartition;
import io.confluent.controlcenter.util.MinMeasurableStat;
import io.confluent.metrics.record.ConfluentMetric;
import io.confluent.monitoring.record.Monitoring;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/controlcenter/streams/WindowExtractor.class */
/**
 * {@link TimestampExtractor} that derives a record's timestamp from the payload (or key) of the
 * known Control Center message types and, when metrics have been configured through
 * {@link #setTopicNames}, records every positive extracted timestamp on a per
 * cluster/topic/partition {@link Sensor}.
 *
 * <p>Topic names and the sensor cache are held in static fields that are populated by
 * {@link #setTopicNames}; until that method has run, no metrics are recorded.
 */
public class WindowExtractor implements TimestampExtractor {
    private static final Logger log = LoggerFactory.getLogger(WindowExtractor.class);
    private static String monitoringTopicName = null;
    private static String metricsTopicName = null;
    private static LoadingCache<ClusterTopicPartition, Sensor> metrics = null;

    /**
     * Stores the monitoring/metrics topic names and (re)builds the sensor cache backed by the
     * given metrics registry.
     *
     * @param metrics2 registry in which the progress sensors are looked up and created
     * @param topicStoreMaster source of the monitoring and metrics topic names
     */
    @Inject
    public static void setTopicNames(@StreamsModule.C3StreamsMetrics final Metrics metrics2, TopicStoreMaster topicStoreMaster) {
        monitoringTopicName = topicStoreMaster.getMonitoringTopicName();
        metricsTopicName = topicStoreMaster.getMetricsTopicName();
        log.info("setting topic names {}", monitoringTopicName);
        metrics = CacheBuilder.newBuilder().build(new CacheLoader<ClusterTopicPartition, Sensor>() {
            // One aggregate stat per input topic kind; each is attached to the per-cluster
            // sensor and receives the Max gauge of every topic/partition sensor created below.
            // NOTE(review): presumably MinMeasurableStat reports the minimum over its gauges - confirm.
            private final MinMeasurableStat monitoringInputTopicGauges = new MinMeasurableStat();
            private final MinMeasurableStat metricsInputTopicGauges = new MinMeasurableStat();

            /**
             * Returns the sensor registered under {@code name}, creating it (with count, rate,
             * max-timestamp and min-timestamp metrics) when it does not exist yet.
             *
             * @param name sensor name, also used as the metric name prefix
             * @param aggregate optional stat that is handed the new sensor's Max gauge; may be null
             * @param parents parent sensors for a newly created sensor; ignored if the sensor exists
             */
            public synchronized Sensor getOrCreateSensor(String name, MinMeasurableStat aggregate, Sensor... parents) {
                Sensor existing = metrics2.getSensor(name);
                if (existing != null) {
                    return existing;
                }
                Sensor created = metrics2.sensor(name, parents);
                String prefix = created.name();
                created.add(new MetricName(prefix + ".count", "WindowExtractor"), new Count());
                created.add(new MetricName(prefix + ".rate", "WindowExtractor"), new Rate(new Count()));
                Max maxTimestamp = new Max();
                created.add(new MetricName(prefix + ".timestamp.max", "WindowExtractor"), maxTimestamp);
                created.add(new MetricName(prefix + ".timestamp.min", "WindowExtractor"), new Min());
                if (aggregate != null) {
                    aggregate.addGauge(maxTimestamp);
                }
                return created;
            }

            /**
             * Returns the sensor registered under {@code name}, creating it (with tagged count,
             * rate, timestamp and min metrics) when it does not exist yet.
             *
             * @param name sensor name, also used as the metric name prefix
             * @param tags tags applied to every metric of a newly created sensor
             * @param minStat stat registered under the {@code .min} metric name
             */
            public synchronized Sensor getOrCreateSensor(String name, Map<String, String> tags, MeasurableStat minStat) {
                Sensor existing = metrics2.getSensor(name);
                if (existing != null) {
                    return existing;
                }
                Sensor created = metrics2.sensor(name);
                String prefix = created.name();
                created.add(new MetricName(prefix + ".count", "WindowExtractor", tags), new Count());
                created.add(new MetricName(prefix + ".rate", "WindowExtractor", tags), new Rate(new Count()));
                created.add(new MetricName(prefix + ".timestamp", "WindowExtractor", tags), new Min());
                created.add(new MetricName(prefix + ".min", "WindowExtractor", tags), minStat);
                return created;
            }

            /**
             * Builds the sensor chain for one cluster/topic/partition: an optional per-cluster
             * progress sensor (only for the monitoring or metrics input topic), a per-topic
             * sensor whose parent is that progress sensor, and the returned per-partition
             * sensor whose parent is the per-topic sensor.
             */
            @Override
            public Sensor load(ClusterTopicPartition clusterTopicPartition) throws Exception {
                // Lock on the registry itself so the whole lookup/create sequence is atomic
                // with respect to other code synchronizing on the same registry.
                synchronized (metrics2) {
                    String topic = clusterTopicPartition.topicPartition.topic();
                    Sensor clusterSensor = null;
                    MinMeasurableStat aggregate = null;
                    WindowExtractor.log.debug("making sensor for key={}", clusterTopicPartition);
                    if (clusterTopicPartition.cluster != null) {
                        if (WindowExtractor.monitoringTopicName != null && WindowExtractor.monitoringTopicName.equals(topic)) {
                            aggregate = this.monitoringInputTopicGauges;
                            clusterSensor = getOrCreateSensor(
                                "monitoring-input-topic-progress-" + clusterTopicPartition.cluster,
                                ImmutableMap.of("input", "monitoring", "progress", "input-topic", MetricsAggregation.CLUSTER_DIMENSION, clusterTopicPartition.cluster),
                                aggregate);
                        } else if (WindowExtractor.metricsTopicName != null && WindowExtractor.metricsTopicName.equals(topic)) {
                            aggregate = this.metricsInputTopicGauges;
                            clusterSensor = getOrCreateSensor(
                                "metrics-input-topic-progress-" + clusterTopicPartition.cluster,
                                ImmutableMap.of("input", "metrics", "progress", "input-topic", MetricsAggregation.CLUSTER_DIMENSION, clusterTopicPartition.cluster),
                                aggregate);
                        }
                    }
                    // A null array (not an array holding null) is passed when there is no parent.
                    Sensor[] topicParents = clusterSensor == null ? (Sensor[]) null : new Sensor[]{clusterSensor};
                    Sensor topicSensor = getOrCreateSensor(topic, aggregate, topicParents);
                    return getOrCreateSensor(clusterTopicPartition.topicPartition.toString(), aggregate, topicSensor);
                }
            }
        });
    }

    /**
     * Extracts the timestamp of a record from its value or, failing that, its key.
     *
     * @param consumerRecord record whose timestamp is wanted
     * @param j previously extracted timestamp, used as fallback
     * @return the extracted timestamp when it is positive; otherwise {@code j} when
     *     {@code j >= 0}; otherwise {@code 0}
     */
    @Override
    public long extract(ConsumerRecord<Object, Object> consumerRecord, long j) {
        long timestamp = 0;
        String clusterId = null;
        try {
            Object value = consumerRecord.value();
            Object key = consumerRecord.key();
            if (value instanceof Monitoring.MonitoringMessage) {
                Monitoring.MonitoringMessage message = (Monitoring.MonitoringMessage) value;
                timestamp = message.getWindow();
                clusterId = message.getClusterId();
            } else if (value instanceof Controlcenter.ClientGroup) {
                timestamp = ((Controlcenter.ClientGroup) value).getWindow();
            } else if (value instanceof Controlcenter.WindowedGrouping) {
                timestamp = ((Controlcenter.WindowedGrouping) value).getWindow();
            } else if (value instanceof Controlcenter.VerifiableMonitoringMessage) {
                Controlcenter.VerifiableMonitoringMessage verifiable = (Controlcenter.VerifiableMonitoringMessage) value;
                timestamp = verifiable.getMonitoringMessage().getWindow();
                clusterId = verifiable.getMonitoringMessage().getClusterId();
            } else if (value instanceof Controlcenter.TriggerEvent) {
                timestamp = ((Controlcenter.TriggerEvent) value).getWindow();
            } else if (value instanceof ConfluentMetric.MetricsMessage) {
                ConfluentMetric.MetricsMessage message = (ConfluentMetric.MetricsMessage) value;
                timestamp = message.getTimestamp();
                clusterId = message.getClusterId();
            } else if (key instanceof MetricEvent) {
                // Metric events carry no window of their own; use the record's timestamp.
                timestamp = consumerRecord.timestamp();
            } else if (key instanceof Controlcenter.WindowedClusterGroup) {
                timestamp = ((Controlcenter.WindowedClusterGroup) key).getWindow();
            } else {
                log.warn("unable to extract message timestamp: unknown message type");
            }
        } catch (Exception e) {
            // Best effort: fall through to the previous-timestamp fallback below.
            log.warn("unable to extract message timestamp: error extracting timestamp", e);
        }

        if (timestamp > 0) {
            if (metrics != null) {
                ClusterTopicPartition sensorKey = new ClusterTopicPartition(
                    clusterId, new TopicPartition(consumerRecord.topic(), consumerRecord.partition()));
                metrics.getUnchecked(sensorKey).record(timestamp);
            }
            return timestamp;
        }
        if (j >= 0) {
            log.debug("Extracted timestamp {} <=0, will return previousTimestamp {}", timestamp, j);
            return j;
        }
        log.debug("Extracted timestamp {} <=0 and previousTimestamp < 0, will return 0", timestamp);
        return 0L;
    }
}
