package io.confluent.telemetry.exporter.http;

import io.confluent.shaded.com.google.common.annotations.VisibleForTesting;
import io.confluent.shaded.com.google.common.collect.ImmutableMap;
import io.confluent.shaded.com.google.protobuf.InvalidProtocolBufferException;
import io.confluent.shaded.com.google.protobuf.Timestamp;
import io.confluent.shaded.io.confluent.telemetry.client.BufferingAsyncTelemetryHttpClient;
import io.confluent.shaded.io.confluent.telemetry.client.BufferingAsyncTelemetryHttpClientBatchResult;
import io.confluent.shaded.io.confluent.telemetry.client.BufferingAsyncTelemetryHttpClientStats;
import io.confluent.shaded.io.confluent.telemetry.client.TelemetryHttpClient;
import io.confluent.shaded.io.opencensus.proto.agent.metrics.v1.ExportMetricsServiceRequest;
import io.confluent.shaded.io.opencensus.proto.agent.metrics.v1.ExportMetricsServiceResponse;
import io.confluent.shaded.io.opencensus.proto.metrics.v1.Metric;
import io.confluent.shaded.io.opencensus.proto.metrics.v1.MetricDescriptor;
import io.confluent.shaded.io.opencensus.proto.metrics.v1.Point;
import io.confluent.telemetry.Context;
import io.confluent.telemetry.MetricKey;
import io.confluent.telemetry.MetricsUtils;
import io.confluent.telemetry.collector.MetricsCollector;
import io.confluent.telemetry.collector.MetricsCollectorProvider;
import io.confluent.telemetry.exporter.AbstractExporter;
import io.confluent.telemetry.exporter.Exporter;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/telemetry/exporter/http/HttpExporter.class */
/**
 * Exporter that hands metrics to a {@link BufferingAsyncTelemetryHttpClient} and, through
 * {@link MetricsCollectorProvider}, publishes that client's own batch/item/send-time statistics
 * as metrics.
 *
 * <p>Emission is gated by {@link #canEmitMetrics}: while it is {@code false}, {@link #doEmit}
 * silently discards every metric.
 */
public class HttpExporter extends AbstractExporter implements MetricsCollectorProvider {
    private static final String DOMAIN = "io.confluent.telemetry";
    private static final String METRIC_GROUP = "exporter/http";
    private static final String BATCHES_TOTAL_METRIC_NAME = MetricsUtils.fullMetricName(DOMAIN, METRIC_GROUP, "batches_total");
    private static final String ITEMS_TOTAL_METRIC_NAME = MetricsUtils.fullMetricName(DOMAIN, METRIC_GROUP, "items_total");
    private static final String SEND_TIME_SEC_METRIC_NAME = MetricsUtils.fullMetricName(DOMAIN, METRIC_GROUP, "send_time_seconds");
    private static final Logger log = LoggerFactory.getLogger(HttpExporter.class);

    /** Conversion factor applied to the client's millisecond send time before it is reported. */
    private static final double SECONDS_PER_MILLISECOND = 0.001d;

    /** Wraps a collection of metrics into a single export request. */
    private static final Function<Collection<Metric>, ExportMetricsServiceRequest> REQUEST_CONVERTER =
        metrics -> ExportMetricsServiceRequest.newBuilder().addAllMetrics(metrics).build();

    /**
     * Parses a response body; a malformed payload is rethrown unchecked (cause preserved)
     * because {@link Function#apply} cannot declare checked exceptions.
     */
    private static final Function<ByteBuffer, ExportMetricsServiceResponse> RESPONSE_DESERIALIZER = buffer -> {
        try {
            return ExportMetricsServiceResponse.parseFrom(buffer);
        } catch (InvalidProtocolBufferException e) {
            throw new RuntimeException(e);
        }
    };

    private final BufferingAsyncTelemetryHttpClient<Metric, ExportMetricsServiceRequest, ExportMetricsServiceResponse> bufferingClient;

    /**
     * Whether {@link #doEmit} forwards metrics to the client. Kept {@code public} only so that
     * existing external readers/writers of this field keep compiling.
     */
    public volatile boolean canEmitMetrics;

    /**
     * Builds the HTTP client and buffering client from the given configuration, then applies
     * the configuration's emit flag and metrics predicate.
     *
     * @param httpExporterConfig source of the client builders, emit flag and metrics predicate
     */
    public HttpExporter(HttpExporterConfig httpExporterConfig) {
        this(httpExporterConfig.getBufferingAsyncClientBuilder()
            .setClient(httpExporterConfig.getClientBuilder()
                .setResponseDeserializer(RESPONSE_DESERIALIZER)
                .setEndpoint(TelemetryHttpClient.V1_METRICS_ENDPOINT)
                .build())
            .setCreateRequestFn(REQUEST_CONVERTER)
            .build());
        this.canEmitMetrics = httpExporterConfig.canEmitMetrics();
        reconfigurePredicate(httpExporterConfig.buildMetricsPredicate());
    }

    /**
     * Wraps an already-built buffering client. Emission starts disabled.
     *
     * @param bufferingAsyncTelemetryHttpClient client that receives submitted metrics
     */
    @VisibleForTesting
    HttpExporter(BufferingAsyncTelemetryHttpClient<Metric, ExportMetricsServiceRequest, ExportMetricsServiceResponse> bufferingAsyncTelemetryHttpClient) {
        this.canEmitMetrics = false;
        this.bufferingClient = bufferingAsyncTelemetryHttpClient;
        // NOTE(review): the value returned by doOnNext(...) is discarded. If getBatchResults()
        // returns a reactive stream whose operators yield a new, unsubscribed stream,
        // trackMetricResponses is never invoked and failures are never logged. Left unchanged
        // because the stream type is not visible here - TODO confirm and subscribe if intended.
        this.bufferingClient.getBatchResults().doOnNext(this::trackMetricResponses);
    }

    /** Overrides the emit flag. */
    @VisibleForTesting
    void setCanEmitMetrics(boolean z) {
        this.canEmitMetrics = z;
    }

    /** Logs the failure cause of an unsuccessful batch; successful batches are ignored. */
    private void trackMetricResponses(BufferingAsyncTelemetryHttpClientBatchResult<Metric, ExportMetricsServiceResponse> batchResult) {
        if (!batchResult.isSuccess()) {
            log.error("Confluent Telemetry Metrics Failure", batchResult.getThrowable());
        }
    }

    /** Returns the underlying client's current statistics. */
    @VisibleForTesting
    BufferingAsyncTelemetryHttpClientStats stats() {
        return this.bufferingClient.stats();
    }

    /**
     * Submits a single metric to the buffering client, or drops it when emission is disabled.
     *
     * @param metricKey key of the metric (not used by this exporter)
     * @param metric metric to submit
     */
    @Override
    public void doEmit(MetricKey metricKey, Metric metric) {
        if (this.canEmitMetrics) {
            this.bufferingClient.submit(Collections.singleton(metric));
        }
    }

    /** Closes the underlying buffering client. */
    @Override
    public void close() throws IOException {
        this.bufferingClient.close();
    }

    /**
     * Creates a collector that reports this exporter's client statistics: batch totals by
     * status (dropped/success/failed), item totals by status (success/failed) and the total
     * send time in seconds.
     *
     * @param predicate initial filter deciding which metric keys are emitted
     * @param context used to build the emitted metrics and to decide whether debug labels are added
     * @return a collector bound to this exporter's buffering client
     */
    @Override
    public MetricsCollector collector(final Predicate<MetricKey> predicate, final Context context) {
        return new MetricsCollector() {
            private volatile Predicate<MetricKey> metricsPredicate = predicate;

            @Override
            public void collect(Exporter exporter) {
                BufferingAsyncTelemetryHttpClientStats stats = HttpExporter.this.bufferingClient.stats();
                Timestamp now = MetricsUtils.now();

                Map<String, String> baseLabels = new HashMap<>();
                if (context.isDebugEnabled()) {
                    baseLabels.put(MetricsCollector.LABEL_LIBRARY, "none");
                }

                // ImmutableMap keeps insertion order, so statuses are emitted in the order listed.
                emitStatusCounters(exporter, HttpExporter.BATCHES_TOTAL_METRIC_NAME, baseLabels, now,
                    ImmutableMap.of(
                        "dropped", Long.valueOf(stats.getTotalDroppedBatches()),
                        "success", Long.valueOf(stats.getTotalSuccessfulBatches()),
                        "failed", Long.valueOf(stats.getTotalFailedBatches())));
                emitStatusCounters(exporter, HttpExporter.ITEMS_TOTAL_METRIC_NAME, baseLabels, now,
                    ImmutableMap.of(
                        "success", Long.valueOf(stats.getTotalSuccessfulItems()),
                        "failed", Long.valueOf(stats.getTotalFailedItems())));

                MetricKey sendTimeKey = new MetricKey(HttpExporter.SEND_TIME_SEC_METRIC_NAME, baseLabels);
                if (this.metricsPredicate.test(sendTimeKey)) {
                    double sendTimeSeconds = stats.getTotalSendTimeMs() * HttpExporter.SECONDS_PER_MILLISECOND;
                    exporter.emit(sendTimeKey, context.metricWithSinglePointTimeseries(
                        HttpExporter.SEND_TIME_SEC_METRIC_NAME,
                        MetricDescriptor.Type.CUMULATIVE_DOUBLE,
                        baseLabels,
                        Point.newBuilder().setTimestamp(now).setDoubleValue(sendTimeSeconds).build()));
                }
            }

            /**
             * Emits one cumulative int64 metric per entry of {@code totalsByStatus}, labelled
             * with the base labels plus a {@code status} label, skipping keys the predicate rejects.
             */
            private void emitStatusCounters(Exporter exporter, String metricName, Map<String, String> baseLabels,
                                            Timestamp timestamp, Map<String, Long> totalsByStatus) {
                totalsByStatus.forEach((status, total) -> {
                    Map<String, String> labels = ImmutableMap.<String, String>builder()
                        .putAll(baseLabels)
                        .put("status", status)
                        .build();
                    MetricKey metricKey = new MetricKey(metricName, labels);
                    if (this.metricsPredicate.test(metricKey)) {
                        exporter.emit(metricKey, context.metricWithSinglePointTimeseries(
                            metricName,
                            MetricDescriptor.Type.CUMULATIVE_INT64,
                            labels,
                            Point.newBuilder().setTimestamp(timestamp).setInt64Value(total).build()));
                    }
                });
            }

            @Override
            public void reconfigurePredicate(Predicate<MetricKey> newPredicate) {
                this.metricsPredicate = newPredicate;
            }
        };
    }

    /**
     * Applies a new configuration to the running exporter: metrics predicate, emit flag, API
     * credentials, proxy URL (only when {@code proxy.url} is set) and proxy credentials.
     *
     * @param httpExporterConfig configuration to apply
     */
    public void reconfigure(HttpExporterConfig httpExporterConfig) {
        reconfigurePredicate(httpExporterConfig.buildMetricsPredicate());
        String apiKey = httpExporterConfig.getString("api.key");
        String apiSecret = httpExporterConfig.getApiSecretOrEmpty();
        this.canEmitMetrics = httpExporterConfig.canEmitMetrics();
        this.bufferingClient.updateCredentials(apiKey, apiSecret);
        // A null proxy.url leaves the client's current proxy URL untouched.
        Optional.ofNullable(httpExporterConfig.getString("proxy.url"))
            .map(URI::create)
            .ifPresent(this.bufferingClient::updateProxyUrl);
        this.bufferingClient.updateProxyCredentials(
            httpExporterConfig.getString("proxy.username"),
            httpExporterConfig.getString("proxy.password"));
    }

    /** Returns the underlying buffering client. */
    @VisibleForTesting
    BufferingAsyncTelemetryHttpClient<Metric, ExportMetricsServiceRequest, ExportMetricsServiceResponse> getClient() {
        return this.bufferingClient;
    }
}
