package io.confluent.telemetry.collector;

import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.Metric;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.MetricsRegistry;
import io.confluent.shaded.com.google.common.annotations.VisibleForTesting;
import io.confluent.telemetry.MetricKey;
import io.confluent.telemetry.emitter.Emitter;
import io.confluent.telemetry.metrics.SinglePointMetric;
import java.time.Clock;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import kafka.controller.PartitionSLOMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/telemetry/collector/SLOMetricsCollector.class */
/**
 * Collects a fixed set of partition SLO gauges from a Yammer {@link MetricsRegistry}.
 *
 * <p>Once {@link #start()} has been called, a single scheduler thread runs {@link #pollMetrics()}
 * every {@value #POLLING_PERIOD_MS} ms. Each poll reads the gauges matched by
 * {@link #SLO_METRICS_PREDICATE}, stamps the value with the current time truncated to the minute
 * and appends it to a bounded in-memory queue. {@link #collect(Emitter)} later drains every
 * buffered point whose minute has already ended and passes it to the emitter.
 *
 * <p>The queue and its size counter are concurrent types because polling happens on the scheduler
 * thread while {@link #collect(Emitter)} is invoked by whoever drives this collector.
 */
public class SLOMetricsCollector implements MetricsCollector {
    private static final Logger log = LoggerFactory.getLogger(SLOMetricsCollector.class);

    /** Suffix handed to the naming strategy when deriving the key of the emitted metric. */
    private static final String FIXED_RATE_SUFFIX = "fixed_rate";

    /**
     * Bound on {@link #polledSize}: a metric is only enqueued while the incremented counter stays
     * strictly below this value, so at most {@code MAX_SIZE - 1} metrics are buffered.
     */
    private static final int MAX_SIZE = 10000;

    /** Initial delay and period of the polling task, in milliseconds. */
    private static final int POLLING_PERIOD_MS = 30000;

    /** Yammer metric type that an SLO metric must carry to be polled. */
    private static final String CONTROLLER_METRIC_TYPE = "KafkaController";

    /** Names of the gauges that are polled. */
    private static final List<String> SLO_METRICS = Arrays.asList(
            PartitionSLOMetrics.GlobalPartitionAvailabilityMetric(),
            PartitionSLOMetrics.GlobalUnderMinIsrPartitionCountMetric(),
            PartitionSLOMetrics.TenantPartitionAvailabilitySLOMetric());

    /** Accepts a metric when its type is {@code KafkaController} and its name is an SLO metric. */
    private static final Predicate<MetricName> SLO_METRICS_PREDICATE = metricName ->
            CONTROLLER_METRIC_TYPE.equals(metricName.getType())
                    && SLO_METRICS.contains(metricName.getName());

    private final ScheduledExecutorService executor;
    private final MetricsRegistry metricsRegistry;
    private final MetricNamingStrategy<MetricName> metricNamingStrategy;
    private final Clock clock;

    /** Polled data points waiting to be emitted by {@link #collect(Emitter)}. */
    @VisibleForTesting
    final Queue<MetricAndTimestamp> polledMetrics;

    /**
     * Number of elements in {@link #polledMetrics}, tracked separately from the queue so the
     * bound check does not need to ask the queue for its size.
     */
    @VisibleForTesting
    final AtomicInteger polledSize;

    /** A polled data point together with the (minute-truncated) instant it was polled at. */
    public static class MetricAndTimestamp {
        final SinglePointMetric metric;
        final Instant timestamp;

        MetricAndTimestamp(SinglePointMetric singlePointMetric, Instant instant) {
            this.metric = singlePointMetric;
            this.timestamp = instant;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            MetricAndTimestamp other = (MetricAndTimestamp) obj;
            // Null-safe so equals() accepts everything hashCode() (Objects.hash) accepts.
            return Objects.equals(this.metric, other.metric)
                    && Objects.equals(this.timestamp, other.timestamp);
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.metric, this.timestamp);
        }

        @Override
        public String toString() {
            return "MetricAndTimestamp{metric=" + this.metric + ", timestamp=" + this.timestamp + '}';
        }
    }

    /**
     * Map key used by {@link #collect(Emitter)} to keep a single data point per metric key and
     * timestamp.
     */
    public static class MetricKeyAndTimestamp {
        final MetricKey metricKey;
        final Instant timestamp;

        MetricKeyAndTimestamp(MetricKey metricKey, Instant instant) {
            this.metricKey = metricKey;
            this.timestamp = instant;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            MetricKeyAndTimestamp other = (MetricKeyAndTimestamp) obj;
            // Null-safe so equals() accepts everything hashCode() (Objects.hash) accepts.
            return Objects.equals(this.metricKey, other.metricKey)
                    && Objects.equals(this.timestamp, other.timestamp);
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.metricKey, this.timestamp);
        }

        @Override
        public String toString() {
            return "MetricKeyAndTimestamp{metricKey=" + this.metricKey + ", timestamp=" + this.timestamp + '}';
        }
    }

    /**
     * Creates a collector that timestamps polled values with the system UTC clock.
     *
     * @param metricsRegistry registry the SLO gauges are read from
     * @param metricNamingStrategy strategy used to build the emitted metric keys
     */
    public SLOMetricsCollector(MetricsRegistry metricsRegistry, MetricNamingStrategy<MetricName> metricNamingStrategy) {
        this(metricsRegistry, metricNamingStrategy, Clock.systemUTC());
    }

    /**
     * Creates a collector with an explicit clock.
     *
     * @param metricsRegistry registry the SLO gauges are read from
     * @param metricNamingStrategy strategy used to build the emitted metric keys
     * @param clock time source for both polling and collecting
     */
    @VisibleForTesting
    SLOMetricsCollector(MetricsRegistry metricsRegistry, MetricNamingStrategy<MetricName> metricNamingStrategy, Clock clock) {
        this.executor = Executors.newScheduledThreadPool(1);
        this.polledMetrics = new ConcurrentLinkedQueue<>();
        this.polledSize = new AtomicInteger(0);
        this.metricsRegistry = metricsRegistry;
        this.metricNamingStrategy = metricNamingStrategy;
        this.clock = clock;
    }

    /** Schedules the periodic poll; the first poll runs one polling period after this call. */
    @Override // io.confluent.telemetry.collector.MetricsCollector
    public void start() {
        log.info("Begin polling SLO metrics every {} seconds", TimeUnit.MILLISECONDS.toSeconds(POLLING_PERIOD_MS));
        this.executor.scheduleAtFixedRate(
                this::pollMetricsSafely, POLLING_PERIOD_MS, POLLING_PERIOD_MS, TimeUnit.MILLISECONDS);
    }

    /** Shuts the scheduler down; no further polls are started afterwards. */
    @Override // io.confluent.telemetry.collector.MetricsCollector
    public void stop() {
        log.info("Shutting down SLO collector thread");
        this.executor.shutdown();
    }

    /**
     * Emits every buffered data point whose timestamp lies before the start of the current minute
     * and removes it from the buffer. Points stamped with the current minute stay queued.
     *
     * <p>If several buffered points share the same metric key and timestamp, only the most
     * recently polled one is emitted.
     *
     * @param emitter receiver of the drained data points
     */
    @Override // io.confluent.telemetry.collector.MetricsCollector
    public void collect(Emitter emitter) {
        if (this.polledSize.get() == 0) {
            return;
        }
        Instant currentMinute = this.clock.instant().truncatedTo(ChronoUnit.MINUTES);
        // Insertion-ordered so emission follows polling order; put() overwrites duplicates, which
        // keeps the latest value for a given (key, timestamp).
        Map<MetricKeyAndTimestamp, SinglePointMetric> ready = new LinkedHashMap<>(SLO_METRICS.size());
        Iterator<MetricAndTimestamp> it = this.polledMetrics.iterator();
        while (it.hasNext()) {
            MetricAndTimestamp polled = it.next();
            if (polled.timestamp.isBefore(currentMinute)) {
                ready.put(new MetricKeyAndTimestamp(polled.metric.key(), polled.timestamp), polled.metric);
                it.remove();
                this.polledSize.decrementAndGet();
            }
        }
        ready.forEach((keyAndTimestamp, metric) -> {
            log.trace("Emit SLO metric {} for timestamp {}", keyAndTimestamp.metricKey, keyAndTimestamp.timestamp);
            emitter.emitMetric(metric);
        });
    }

    /**
     * Runs {@link #pollMetrics()} for the scheduler and keeps runtime failures from escaping.
     * {@code scheduleAtFixedRate} cancels all future runs once a run throws, so a single failing
     * gauge read would otherwise stop SLO polling for good.
     */
    private void pollMetricsSafely() {
        try {
            pollMetrics();
        } catch (RuntimeException e) {
            log.error("Failed to poll SLO metrics, will retry on the next polling interval", e);
        }
    }

    /**
     * Reads every registered metric accepted by {@link #SLO_METRICS_PREDICATE} and buffers its
     * current value. Only gauges are read; numeric values are buffered as they are, boolean values
     * as {@code 1} / {@code 0}, and everything else is logged at debug level and skipped.
     */
    @VisibleForTesting
    void pollMetrics() {
        for (Map.Entry<MetricName, Metric> entry : this.metricsRegistry.allMetrics().entrySet()) {
            MetricName metricName = entry.getKey();
            if (!SLO_METRICS_PREDICATE.test(metricName)) {
                continue;
            }
            Metric metric = entry.getValue();
            MetricKey derivedMetricKey = this.metricNamingStrategy.derivedMetricKey(
                    this.metricNamingStrategy.metricKey(metricName), FIXED_RATE_SUFFIX);
            if (!(metric instanceof Gauge)) {
                log.debug("Ignoring {} metric = {}", metricName, metric);
                continue;
            }
            Object value = ((Gauge<?>) metric).value();
            // Minute granularity: collect() compares against the start of the current minute.
            Instant polledAt = this.clock.instant().truncatedTo(ChronoUnit.MINUTES);
            if (value instanceof Number) {
                enqueueMetric(new MetricAndTimestamp(
                        SinglePointMetric.gauge(derivedMetricKey, (Number) value, polledAt), polledAt));
            } else if (value instanceof Boolean) {
                enqueueMetric(new MetricAndTimestamp(
                        SinglePointMetric.gauge(derivedMetricKey, ((Boolean) value).booleanValue() ? 1L : 0L, polledAt),
                        polledAt));
            } else {
                log.debug("Ignoring {} value = {}", metricName, value);
            }
        }
    }

    /**
     * Appends a data point to the buffer unless the buffer is full, in which case the point is
     * dropped with a warning.
     */
    private void enqueueMetric(MetricAndTimestamp metricAndTimestamp) {
        // Reserve a slot first so the bound holds without asking the queue for its size.
        if (this.polledSize.incrementAndGet() < MAX_SIZE) {
            this.polledMetrics.offer(metricAndTimestamp);
        } else {
            // Give the reserved slot back: nothing was queued, and leaving the counter raised
            // would make it drift above the real queue size after every dropped metric.
            this.polledSize.decrementAndGet();
            log.warn("Dropping metric {}, queue is full!", metricAndTimestamp);
        }
    }
}
