package io.confluent.shaded.io.confluent.telemetry.events.exporter.http;

import io.confluent.shaded.com.google.common.annotations.VisibleForTesting;
import io.confluent.shaded.com.google.common.collect.Sets;
import io.confluent.shaded.com.google.protobuf.MessageLite;
import io.confluent.shaded.io.confluent.telemetry.client.BufferingAsyncTelemetryHttpClient;
import io.confluent.shaded.io.confluent.telemetry.client.BufferingAsyncTelemetryHttpClientBatchResult;
import io.confluent.shaded.io.confluent.telemetry.client.Credentials;
import io.confluent.shaded.io.confluent.telemetry.client.ProxyConfig;
import io.confluent.shaded.io.confluent.telemetry.events.exporter.Exporter;
import io.confluent.shaded.io.confluent.telemetry.events.exporter.ExporterConfig;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.apache.kafka.common.config.ConfigException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/shaded/io/confluent/telemetry/events/exporter/http/HttpExporter.class */
/**
 * {@link Exporter} that forwards every emitted item to a
 * {@link BufferingAsyncTelemetryHttpClient}.
 *
 * <p>{@code requestConverter}, {@code responseDeserializer} and {@code endpoint} are not assigned
 * anywhere in this class; {@link #configure(Map)} passes whatever values they hold to the client
 * builders, so they have to be populated (for example by a subclass) before it is called.
 *
 * <p>Credentials and proxy settings are handed to the client as suppliers that read volatile
 * fields, which lets {@link #reconfigure(Map)} replace them without rebuilding the client.
 *
 * @param <Data> type of a single item passed to {@link #emit(Object)}
 * @param <Req> protobuf request message built from a collection of {@code Data}
 * @param <Resp> protobuf response message produced by {@code responseDeserializer}
 */
public class HttpExporter<Data, Req extends MessageLite, Resp extends MessageLite> implements Exporter<Data> {
    private static final Logger log = LoggerFactory.getLogger(HttpExporter.class);
    protected Function<Collection<Data>, Req> requestConverter;
    protected Function<ByteBuffer, Resp> responseDeserializer;
    protected BufferingAsyncTelemetryHttpClient<Data, Req, Resp> bufferingClient;
    protected String endpoint;
    private HttpExporterConfig config;
    private volatile Credentials credentials;
    private volatile ProxyConfig proxyConfig;

    /**
     * Parses {@code map} into an {@link HttpExporterConfig}, builds the buffering client from it
     * and installs the initial credentials and proxy settings.
     *
     * @param map raw configuration properties
     */
    public void configure(Map<String, ?> map) {
        this.config = new HttpExporterConfig(map);
        // setClientAndSubscribe() also attaches the batch-result handler, so it must not be
        // registered a second time here.
        setClientAndSubscribe(this.config.getBufferingAsyncClientBuilder()
                .setClient(this.config.getClientBuilder()
                        .setResponseDeserializer(this.responseDeserializer)
                        .setEndpoint(this.endpoint)
                        // Suppliers read the volatile fields so reconfigure() takes effect
                        // without rebuilding the client.
                        .setCredentialsSupplier(() -> this.credentials)
                        .setProxyConfigSupplier(() -> this.proxyConfig)
                        .build())
                .setCreateRequestFn(this.requestConverter)
                .build());
        setDynamicFields(this.config);
    }

    /** Copies the values that may change at runtime out of {@code httpExporterConfig}. */
    private void setDynamicFields(HttpExporterConfig httpExporterConfig) {
        this.credentials = httpExporterConfig.getCredentials();
        this.proxyConfig = httpExporterConfig.getProxyConfig();
    }

    /**
     * Installs {@code bufferingAsyncTelemetryHttpClient} as the client used by this exporter and
     * starts consuming its batch results so that failed batches are logged.
     *
     * @param bufferingAsyncTelemetryHttpClient client to use for all subsequent submissions
     */
    @VisibleForTesting
    void setClientAndSubscribe(BufferingAsyncTelemetryHttpClient<Data, Req, Resp> bufferingAsyncTelemetryHttpClient) {
        this.bufferingClient = bufferingAsyncTelemetryHttpClient;
        // NOTE(review): assumes getBatchResults() returns a lazy reactive stream (RxJava/Reactor
        // style) where doOnNext(...) alone attaches nothing until subscribe(...) is called, so
        // the result handler has to be passed to subscribe(...) to ever run - confirm against
        // the client's declared return type.
        // An explicit error callback is supplied so a terminal stream error is logged rather than
        // left to the reactive library's default unhandled-error path.
        this.bufferingClient.getBatchResults().subscribe(
                this::trackResponses,
                error -> log.error("Confluent Telemetry batch result stream failed", error));
    }

    /** Logs the throwable attached to an unsuccessful batch result; successful ones are ignored. */
    private void trackResponses(BufferingAsyncTelemetryHttpClientBatchResult<Data, Resp> bufferingAsyncTelemetryHttpClientBatchResult) {
        if (bufferingAsyncTelemetryHttpClientBatchResult.isSuccess()) {
            return;
        }
        log.error("Confluent Telemetry Failure", bufferingAsyncTelemetryHttpClientBatchResult.getThrowable());
    }

    /**
     * Submits {@code data} to the buffering client.
     *
     * @param data item to export
     * @return an already-completed future: {@code false} when no credentials are set or no client
     *     has been installed yet, {@code true} once the item has been passed to the client. The
     *     value reflects the hand-off only; the outcome of the request itself is reported
     *     through the client's batch results.
     */
    @Override // io.confluent.shaded.io.confluent.telemetry.events.exporter.Exporter
    public CompletableFuture<Boolean> emit(Data data) {
        BufferingAsyncTelemetryHttpClient<Data, Req, Resp> client = this.bufferingClient;
        // The client is only assigned by configure()/setClientAndSubscribe(); credentials can be
        // set earlier through reconfigure(), so both have to be checked.
        if (this.credentials == null || client == null) {
            return CompletableFuture.completedFuture(false);
        }
        client.submit(Collections.singleton(data));
        return CompletableFuture.completedFuture(true);
    }

    /**
     * Closes the buffering client, if one was ever installed.
     *
     * @throws IOException declared by the overridden signature
     */
    @Override // java.lang.AutoCloseable
    public void close() throws IOException {
        BufferingAsyncTelemetryHttpClient<Data, Req, Resp> client = this.bufferingClient;
        // Closing an exporter that was never (successfully) configured must not throw.
        if (client != null) {
            client.close();
        }
    }

    /**
     * Re-reads credentials and proxy settings from {@code map}. The client itself is not rebuilt.
     *
     * @param map new raw configuration properties
     */
    public void reconfigure(Map<String, ?> map) {
        setDynamicFields(new HttpExporterConfig(map));
    }

    /**
     * @return the union of {@link ExporterConfig#RECONFIGURABLES} and
     *     {@link HttpExporterConfig#RECONFIGURABLE_CONFIGS}
     */
    public Set<String> reconfigurableConfigs() {
        return Sets.union(ExporterConfig.RECONFIGURABLES, HttpExporterConfig.RECONFIGURABLE_CONFIGS);
    }

    /**
     * Performs no validation; every map is accepted.
     *
     * @param map proposed configuration properties (unused)
     * @throws ConfigException never thrown by this implementation
     */
    public void validateReconfiguration(Map<String, ?> map) throws ConfigException {
    }
}
