package io.confluent.telemetry;

import io.confluent.shaded.com.google.common.base.Verify;
import io.confluent.shaded.io.opencensus.proto.metrics.v1.Metric;
import io.confluent.shaded.io.opencensus.proto.metrics.v1.MetricDescriptor;
import io.confluent.shaded.io.opencensus.proto.metrics.v1.Point;
import io.confluent.telemetry.collector.MetricsCollector;
import io.confluent.telemetry.exporter.Exporter;
import java.util.Collection;
import java.util.HashMap;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/telemetry/MetricsCollectorTask.class */
public class MetricsCollectorTask {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) MetricsCollectorTask.class);
    private final Context context;
    private final Set<Exporter> exporters;
    private final Collection<MetricsCollector> collectors;
    private final long collectIntervalMs;
    private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
    private final ConcurrentMap<MetricsCollector, AtomicLong> metricsCollected = new ConcurrentHashMap();
    private final ConcurrentMap<MetricsCollector, AtomicLong> metricsSent = new ConcurrentHashMap();

    public MetricsCollectorTask(Context context, Set<Exporter> set, Collection<MetricsCollector> collection, long j) {
        Verify.verify(j > 0, "collection interval cannot be less than 1", new Object[0]);
        this.exporters = (Set) Objects.requireNonNull(set);
        Verify.verify(!set.isEmpty(), "At least one exporter must be enabled", new Object[0]);
        this.collectors = (Collection) Objects.requireNonNull(collection);
        this.context = (Context) Objects.requireNonNull(context);
        this.collectIntervalMs = j;
        this.executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
        this.executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        this.executor.setThreadFactory(runnable -> {
            Thread thread = new Thread(runnable, "confluent-telemetry-metrics-collector-task-scheduler");
            thread.setDaemon(true);
            thread.setUncaughtExceptionHandler((thread2, th) -> {
                log.error("Uncaught exception in thread '{}':", thread2.getName(), th);
            });
            return thread;
        });
    }

    private long metricsCollectedAddAndGet(MetricsCollector metricsCollector, long j) {
        return this.metricsCollected.getOrDefault(metricsCollector, new AtomicLong()).addAndGet(j);
    }

    private Metric buildMetricsCollectedMetric(MetricsCollector metricsCollector, long j) {
        String simpleName = metricsCollector.getClass().getSimpleName();
        HashMap hashMap = new HashMap();
        hashMap.put(MetricsCollector.LABEL_COLLECTOR, simpleName);
        if (this.context.isDebugEnabled()) {
            hashMap.put(MetricsCollector.LABEL_LIBRARY, "none");
        }
        return this.context.metricWithSinglePointTimeseries("io.confluent.telemetry/metrics_collector_task/metrics_collected_total", MetricDescriptor.Type.CUMULATIVE_INT64, hashMap, Point.newBuilder().setTimestamp(MetricsUtils.now()).setInt64Value(metricsCollectedAddAndGet(metricsCollector, j)).build());
    }

    private long metricsSent(MetricsCollector metricsCollector) {
        return this.metricsSent.getOrDefault(metricsCollector, new AtomicLong()).get();
    }

    private Metric buildMetricsSentMetric(MetricsCollector metricsCollector) {
        String simpleName = metricsCollector.getClass().getSimpleName();
        HashMap hashMap = new HashMap();
        hashMap.put(MetricsCollector.LABEL_COLLECTOR, simpleName);
        if (this.context.isDebugEnabled()) {
            hashMap.put(MetricsCollector.LABEL_LIBRARY, "none");
        }
        return this.context.metricWithSinglePointTimeseries("io.confluent.telemetry/metrics_collector_task/metrics_sent_total", MetricDescriptor.Type.CUMULATIVE_INT64, hashMap, Point.newBuilder().setTimestamp(MetricsUtils.now()).setInt64Value(metricsSent(metricsCollector)).build());
    }

    private void updateMetricsSent(MetricsCollector metricsCollector, int i) {
        this.metricsSent.getOrDefault(metricsCollector.getClass().getSimpleName(), new AtomicLong()).addAndGet(i);
    }

    public void start() {
        log.info("Starting Confluent telemetry reporter with an interval of {} ms", Long.valueOf(this.collectIntervalMs));
        schedule();
    }

    private void schedule() {
        this.executor.scheduleAtFixedRate(this::collectAndExport, this.collectIntervalMs, this.collectIntervalMs, TimeUnit.MILLISECONDS);
    }

    private void collectAndExport() {
        this.collectors.forEach(this::collectAndExport);
    }

    private void collectAndExport(MetricsCollector metricsCollector) {
        try {
            Collection<Metric> collect = metricsCollector.collect();
            log.trace("Collected {} metrics from {}", Integer.valueOf(collect.size()), metricsCollector);
            collect.add(buildMetricsCollectedMetric(metricsCollector, collect.size()));
            collect.add(buildMetricsSentMetric(metricsCollector));
            for (Exporter exporter : this.exporters) {
                try {
                    exporter.export(collect);
                } catch (Throwable th) {
                    log.error("Error exporting metrics via {}", exporter, th);
                }
            }
        } catch (Throwable th2) {
            log.error("Error while collecting metrics for {}", metricsCollector, th2);
        }
    }

    public void close() {
        this.executor.shutdown();
    }
}
