package io.confluent.telemetry;

import io.confluent.shaded.com.google.common.base.Verify;
import io.confluent.shaded.io.opencensus.proto.metrics.v1.Metric;
import io.confluent.shaded.io.opencensus.proto.metrics.v1.MetricDescriptor;
import io.confluent.shaded.io.opencensus.proto.metrics.v1.Point;
import io.confluent.telemetry.collector.MetricsCollector;
import io.confluent.telemetry.exporter.AbstractExporter;
import io.confluent.telemetry.exporter.Exporter;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/telemetry/MetricsCollectorTask.class */
public class MetricsCollectorTask implements MetricsCollector {
    private static final Logger log = LoggerFactory.getLogger(MetricsCollectorTask.class);
    private static final String COLLECTED_TOTAL_METRIC_NAME = MetricsUtils.fullMetricName("io.confluent.telemetry", "metrics", "collected/delta");
    private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
    private final Context context;
    private final Supplier<Collection<Exporter>> exportersSupplier;
    private final Collection<MetricsCollector> collectors;
    private final long collectIntervalMs;
    private volatile Predicate<MetricKey> metricsPredicate;

    /* loaded from: input_file:io/confluent/telemetry/MetricsCollectorTask$CompositeExporter.class */
    private static class CompositeExporter extends AbstractExporter {
        private final Supplier<Collection<Exporter>> exportersSupplier;
        private final AtomicLong collectedMetricsCount = new AtomicLong();

        public CompositeExporter(Supplier<Collection<Exporter>> supplier) {
            this.exportersSupplier = supplier;
        }

        public long getCountAndReset() {
            return this.collectedMetricsCount.getAndSet(0L);
        }

        @Override // io.confluent.telemetry.exporter.AbstractExporter, io.confluent.telemetry.exporter.Exporter
        public void reconfigurePredicate(Predicate<MetricKey> predicate) {
        }

        @Override // io.confluent.telemetry.exporter.AbstractExporter, io.confluent.telemetry.exporter.Exporter
        public void doEmit(MetricKey metricKey, Metric metric) {
            boolean z = false;
            Iterator<Exporter> it = this.exportersSupplier.get().iterator();
            while (it.hasNext()) {
                if (it.next().emit(metricKey, metric)) {
                    z = true;
                }
            }
            if (z) {
                this.collectedMetricsCount.incrementAndGet();
            }
        }

        @Override // java.lang.AutoCloseable
        public void close() {
        }
    }

    public MetricsCollectorTask(Context context, Supplier<Collection<Exporter>> supplier, Collection<MetricsCollector> collection, long j, Predicate<MetricKey> predicate) {
        Verify.verify(j > 0, "collection interval cannot be less than 1", new Object[0]);
        this.exportersSupplier = (Supplier) Objects.requireNonNull(supplier);
        this.collectors = (Collection) Objects.requireNonNull(collection);
        this.context = (Context) Objects.requireNonNull(context);
        this.collectIntervalMs = j;
        this.metricsPredicate = predicate;
        this.executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
        this.executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        this.executor.setThreadFactory(runnable -> {
            Thread thread = new Thread(runnable, "confluent-telemetry-metrics-collector-task-scheduler");
            thread.setDaemon(true);
            thread.setUncaughtExceptionHandler((thread2, th) -> {
                log.error("Uncaught exception in thread '{}':", thread2.getName(), th);
            });
            return thread;
        });
    }

    public void start() {
        log.info("Starting Confluent telemetry reporter with an interval of {} ms for resource = (type = {})", Long.valueOf(this.collectIntervalMs), this.context.getResource().getType());
        log.debug("Telemetry reporter resource labels: {}", this.context.getResource().getLabelsMap());
        schedule();
    }

    private void schedule() {
        this.executor.scheduleAtFixedRate(this::collectAndExport, this.collectIntervalMs, this.collectIntervalMs, TimeUnit.MILLISECONDS);
    }

    private void collectAndExport() {
        this.collectors.forEach(this::collectAndExport);
    }

    private void collectAndExport(MetricsCollector metricsCollector) {
        try {
            CompositeExporter compositeExporter = new CompositeExporter(this.exportersSupplier);
            Throwable th = null;
            try {
                try {
                    metricsCollector.collect(compositeExporter);
                    long countAndReset = compositeExporter.getCountAndReset();
                    log.trace("Collected {} metrics from {}", Long.valueOf(countAndReset), metricsCollector);
                    compositeExporter.getClass();
                    emitMetricsCollectedMetric(metricsCollector, countAndReset, compositeExporter::emit);
                    if (compositeExporter != null) {
                        if (0 != 0) {
                            try {
                                compositeExporter.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            compositeExporter.close();
                        }
                    }
                } catch (Throwable th3) {
                    th = th3;
                    throw th3;
                }
            } finally {
            }
        } catch (Throwable th4) {
            log.error("Error while collecting metrics for collector = {})", metricsCollector, th4);
        }
    }

    public void close() {
        this.executor.shutdown();
    }

    private void emitMetricsCollectedMetric(MetricsCollector metricsCollector, long j, BiConsumer<MetricKey, Metric> biConsumer) {
        String simpleName = metricsCollector.getClass().getSimpleName();
        HashMap hashMap = new HashMap();
        hashMap.put(MetricsCollector.LABEL_COLLECTOR, simpleName);
        if (this.context.isDebugEnabled()) {
            hashMap.put(MetricsCollector.LABEL_LIBRARY, "none");
        }
        MetricKey metricKey = new MetricKey(COLLECTED_TOTAL_METRIC_NAME, hashMap);
        if (this.metricsPredicate.test(metricKey)) {
            biConsumer.accept(metricKey, this.context.metricWithSinglePointTimeseries(COLLECTED_TOTAL_METRIC_NAME, MetricDescriptor.Type.CUMULATIVE_INT64, hashMap, Point.newBuilder().setTimestamp(MetricsUtils.now()).setInt64Value(j).build()));
        }
    }

    @Override // io.confluent.telemetry.collector.MetricsCollector
    public void collect(Exporter exporter) {
    }

    @Override // io.confluent.telemetry.collector.MetricsCollector
    public void reconfigurePredicate(Predicate<MetricKey> predicate) {
        this.metricsPredicate = predicate;
    }
}
