package io.confluent.telemetry.exporter.http;

import io.confluent.shaded.com.google.common.annotations.VisibleForTesting;
import io.confluent.shaded.io.confluent.telemetry.client.BufferingAsyncTelemetryHttpClient;
import io.confluent.shaded.io.confluent.telemetry.client.BufferingAsyncTelemetryHttpClientBatchResult;
import io.confluent.shaded.io.confluent.telemetry.client.BufferingAsyncTelemetryHttpClientStats;
import io.confluent.shaded.io.confluent.telemetry.client.Credentials;
import io.confluent.shaded.io.confluent.telemetry.client.ProxyConfig;
import io.confluent.shaded.io.confluent.telemetry.client.TelemetryHttpClient;
import io.confluent.shaded.io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
import io.confluent.shaded.io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse;
import io.confluent.shaded.io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import io.confluent.telemetry.exporter.AbstractExporter;
import io.confluent.telemetry.metrics.SerializedMetric;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import org.apache.kafka.common.metrics.Metrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/telemetry/exporter/http/HttpExporter.class */
/**
 * Exporter that forwards serialized metrics to the telemetry HTTP endpoint
 * ({@link TelemetryHttpClient#V1_METRICS_ENDPOINT}) through a
 * {@link BufferingAsyncTelemetryHttpClient}.
 *
 * <p>Credentials and proxy settings are held in {@code volatile} fields and handed to the
 * HTTP client through suppliers, so a later call to {@link #setDynamicFields(HttpExporterConfig)}
 * changes what the suppliers return without rebuilding the client.
 */
public class HttpExporter extends AbstractExporter {
    /** Metric group under which this exporter registers its own statistics. */
    public static final String SELF_METRICS_GROUP = "HttpExporter";

    private static final Logger log = LoggerFactory.getLogger(HttpExporter.class);

    /** Wraps a batch of buffered {@link ResourceMetrics} into a single export request. */
    private static final Function<Collection<ResourceMetrics>, ExportMetricsServiceRequest> REQUEST_CONVERTER =
            resourceMetrics -> ExportMetricsServiceRequest.newBuilder()
                    .addAllResourceMetrics(resourceMetrics)
                    .build();

    private final BufferingAsyncTelemetryHttpClient<ResourceMetrics, ExportMetricsServiceRequest, ExportMetricsServiceResponse> bufferingClient;

    /** Current credentials; while this is {@code null}, {@link #doEmit} drops metrics. */
    @VisibleForTesting
    volatile Credentials credentials;

    /** Current proxy configuration, read by the HTTP client through a supplier. */
    @VisibleForTesting
    volatile ProxyConfig proxyConfig;

    /** Exporter name, used as the {@code exporterName} tag on self-metrics. */
    private final String name;

    /**
     * Builds the buffering HTTP client from the given configuration and starts listening
     * for batch results so that failed batches are logged.
     *
     * @param str the exporter name; must not be {@code null}
     * @param httpExporterConfig configuration supplying client builders, credentials and proxy settings
     * @throws NullPointerException if {@code str} is {@code null}
     */
    public HttpExporter(String str, HttpExporterConfig httpExporterConfig) {
        this.name = Objects.requireNonNull(str, "exporter name must not be null");
        this.bufferingClient = httpExporterConfig.getBufferingAsyncClientBuilder()
                .setClient(httpExporterConfig.configureClientDefaults(new TelemetryHttpClient.Builder())
                        .setRequestSerializer((outputStream, request) -> request.writeTo(outputStream))
                        .setResponseDeserializer(ExportMetricsServiceResponse::parseFrom)
                        .setEndpoint(TelemetryHttpClient.V1_METRICS_ENDPOINT)
                        // Suppliers read the volatile fields on every call, so dynamic
                        // reconfiguration is picked up without rebuilding the client.
                        .setCredentialsSupplier(() -> this.credentials)
                        .setProxyConfigSupplier(() -> this.proxyConfig)
                        .build())
                .setCreateRequestFn(REQUEST_CONVERTER)
                .build();

        // The previous code only called doOnNext(...) and discarded the returned stream, so
        // nothing was ever subscribed and the callback never ran. Subscribe explicitly, with
        // an error handler so a failure of the stream itself is logged rather than left
        // unhandled.
        // NOTE(review): assumes getBatchResults() is an RxJava-style stream that offers
        // subscribe(onNext, onError) and terminates when the client is closed - confirm.
        this.bufferingClient.getBatchResults().subscribe(
                this::trackMetricResponses,
                error -> log.error("Confluent Telemetry batch result stream terminated with an error", error));

        setDynamicFields(httpExporterConfig);
    }

    /**
     * Refreshes the fields that may change at runtime (credentials and proxy configuration)
     * from the given configuration.
     *
     * @param httpExporterConfig configuration to read the current values from
     */
    public void setDynamicFields(HttpExporterConfig httpExporterConfig) {
        this.credentials = httpExporterConfig.getCredentials();
        this.proxyConfig = httpExporterConfig.getProxyConfig();
    }

    /**
     * Logs an error for every batch result that was not successful; successful batches are
     * ignored.
     *
     * @param batchResult the outcome of one submitted batch
     */
    private void trackMetricResponses(BufferingAsyncTelemetryHttpClientBatchResult<ResourceMetrics, ExportMetricsServiceResponse> batchResult) {
        if (!batchResult.isSuccess()) {
            log.error("Confluent Telemetry Metrics Failure", batchResult.getThrowable());
        }
    }

    /** Returns the statistics object of the underlying buffering client. */
    @VisibleForTesting
    BufferingAsyncTelemetryHttpClientStats stats() {
        return this.bufferingClient.stats();
    }

    /**
     * Submits a single metric to the buffering client. The metric is silently dropped
     * while no credentials are configured.
     *
     * @param serializedMetric the metric to export
     */
    @Override // io.confluent.telemetry.exporter.AbstractExporter
    public void doEmit(SerializedMetric serializedMetric) {
        if (this.credentials == null) {
            return;
        }
        this.bufferingClient.submit(Collections.singleton(serializedMetric.metric()));
    }

    /**
     * Closes the underlying buffering client.
     *
     * @throws IOException declared by the signature; propagated if closing the client fails with one
     */
    @Override // java.lang.AutoCloseable
    public void close() throws IOException {
        this.bufferingClient.close();
    }

    /**
     * Registers this exporter's self-metrics in the given registry. Every metric belongs to
     * {@link #SELF_METRICS_GROUP}, carries an {@code exporterName} tag, and reads its value
     * from the buffering client's statistics each time it is evaluated.
     *
     * @param metrics the registry to add the metrics to
     */
    @Override // io.confluent.telemetry.exporter.Exporter
    public void setMetricsRegistry(Metrics metrics) {
        Map<String, String> tags = Collections.singletonMap("exporterName", this.name);
        metrics.addMetric(metrics.metricName("items-total", SELF_METRICS_GROUP, tags),
                (config, now) -> this.bufferingClient.stats().getTotalItems());
        metrics.addMetric(metrics.metricName("items-failed", SELF_METRICS_GROUP, tags),
                (config, now) -> this.bufferingClient.stats().getTotalFailedItems());
        metrics.addMetric(metrics.metricName("items-succeeded", SELF_METRICS_GROUP, tags),
                (config, now) -> this.bufferingClient.stats().getTotalSuccessfulItems());
        metrics.addMetric(metrics.metricName("batches-total", SELF_METRICS_GROUP, tags),
                (config, now) -> this.bufferingClient.stats().getTotalBatches());
        metrics.addMetric(metrics.metricName("batches-dropped", SELF_METRICS_GROUP, tags),
                (config, now) -> this.bufferingClient.stats().getTotalDroppedBatches());
        metrics.addMetric(metrics.metricName("batches-failed", SELF_METRICS_GROUP, tags),
                (config, now) -> this.bufferingClient.stats().getTotalFailedBatches());
        metrics.addMetric(metrics.metricName("batches-succeeded", SELF_METRICS_GROUP, tags),
                (config, now) -> this.bufferingClient.stats().getTotalSuccessfulBatches());
        metrics.addMetric(metrics.metricName("send-time-total-millis", SELF_METRICS_GROUP, tags),
                (config, now) -> this.bufferingClient.stats().getTotalSendTimeMs());
    }
}
