package io.confluent.telemetry.client;

import io.confluent.shaded.com.google.common.base.Preconditions;
import io.confluent.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.confluent.shaded.com.google.protobuf.MessageLite;
import io.confluent.shaded.io.reactivex.BackpressureOverflowStrategy;
import io.confluent.shaded.io.reactivex.BackpressureStrategy;
import io.confluent.shaded.io.reactivex.Flowable;
import io.confluent.shaded.io.reactivex.Observable;
import io.confluent.shaded.io.reactivex.Scheduler;
import io.confluent.shaded.io.reactivex.Single;
import io.confluent.shaded.io.reactivex.disposables.Disposable;
import io.confluent.shaded.io.reactivex.schedulers.Schedulers;
import io.confluent.shaded.io.reactivex.subjects.PublishSubject;
import io.confluent.shaded.io.reactivex.subjects.Subject;
import io.confluent.shaded.org.slf4j.Logger;
import io.confluent.shaded.org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/* loaded from: input_file:io/confluent/telemetry/client/BufferingAsyncTelemetryHttpClient.class */
public class BufferingAsyncTelemetryHttpClient<T, S extends MessageLite, R> implements AutoCloseable {
    public static final int DEFAULT_MAX_BATCH_SIZE = 1000;
    public static final int DEFAULT_MAX_PENDING_BATCHES = 10;
    public static final int DEFAULT_MAX_INFLIGHT_SUBMISSIONS = 1;
    private final Subject<T> incomingSubject;
    private final Flowable<BufferingAsyncTelemetryHttpClientBatchResult<T, R>> statusFlowable;
    private final Disposable subscription;
    private final Duration maxBatchDuration;
    private final int maxBatchSize;
    private final TelemetryHttpClient<S, R> client;
    private final Scheduler scheduler;
    private final int maxPendingBatches;
    private final int maxInflightSubmissions;
    private final Function<Collection<T>, S> createRequestFn;
    private final BufferingAsyncTelemetryHttpClientStatsCollector statsCollector;
    public static final Duration DEFAULT_MAX_BATCH_DURATION = Duration.ofSeconds(5);
    public static final Scheduler DEFAULT_SCHEDULER = Schedulers.from(Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("buffering-async-telemetry-http-client-%d").build()));
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) BufferingAsyncTelemetryHttpClient.class);

    /* loaded from: input_file:io/confluent/telemetry/client/BufferingAsyncTelemetryHttpClient$Builder.class */
    public static final class Builder<T, S extends MessageLite, R> {
        private Scheduler scheduler;
        private Duration maxBatchDuration;
        private int maxBatchSize;
        private TelemetryHttpClient<S, R> client;
        private int maxPendingBatches;
        private int maxInflightSubmissions;
        private Function<Collection<T>, S> createRequestFn;

        private Builder() {
            this.scheduler = BufferingAsyncTelemetryHttpClient.DEFAULT_SCHEDULER;
            this.maxBatchDuration = BufferingAsyncTelemetryHttpClient.DEFAULT_MAX_BATCH_DURATION;
            this.maxBatchSize = 1000;
            this.maxPendingBatches = 10;
            this.maxInflightSubmissions = 1;
        }

        public Builder<T, S, R> setMaxBatchDuration(Duration duration) {
            Preconditions.checkArgument(duration.toMillis() > 0);
            this.maxBatchDuration = duration;
            return this;
        }

        public Builder<T, S, R> setMaxBatchSize(int i) {
            Preconditions.checkArgument(i > 0);
            this.maxBatchSize = i;
            return this;
        }

        public Builder<T, S, R> setClient(TelemetryHttpClient<S, R> telemetryHttpClient) {
            Preconditions.checkNotNull(telemetryHttpClient);
            this.client = telemetryHttpClient;
            return this;
        }

        public Builder<T, S, R> setScheduler(Scheduler scheduler) {
            Preconditions.checkNotNull(scheduler);
            this.scheduler = scheduler;
            return this;
        }

        public Builder<T, S, R> setMaxPendingBatches(int i) {
            Preconditions.checkArgument(i > 0);
            this.maxPendingBatches = i;
            return this;
        }

        public Builder<T, S, R> setMaxInflightSubmissions(int i) {
            Preconditions.checkArgument(i > 0);
            this.maxInflightSubmissions = i;
            return this;
        }

        public Builder<T, S, R> setCreateRequestFn(Function<Collection<T>, S> function) {
            this.createRequestFn = function;
            return this;
        }

        public BufferingAsyncTelemetryHttpClient<T, S, R> build() {
            Preconditions.checkState(this.client != null, "Must specify a client when building %s", BufferingAsyncTelemetryHttpClient.class);
            Preconditions.checkState(this.createRequestFn != null, "Must specify a createRequestFn when building %s", BufferingAsyncTelemetryHttpClient.class);
            return new BufferingAsyncTelemetryHttpClient<>(this);
        }
    }

    private BufferingAsyncTelemetryHttpClient(Builder<T, S, R> builder) {
        this.statsCollector = new BufferingAsyncTelemetryHttpClientStatsCollector();
        this.client = ((Builder) builder).client;
        this.scheduler = ((Builder) builder).scheduler;
        this.maxBatchDuration = ((Builder) builder).maxBatchDuration;
        this.maxBatchSize = ((Builder) builder).maxBatchSize;
        this.maxPendingBatches = ((Builder) builder).maxPendingBatches;
        this.maxInflightSubmissions = ((Builder) builder).maxInflightSubmissions;
        this.createRequestFn = ((Builder) builder).createRequestFn;
        this.incomingSubject = PublishSubject.create().toSerialized();
        logger.debug("Creating {} with args: maxBatchDurationMs[{}], maxBatchSize[{}], maxPendingBatches[{}],maxInflightSubmissions[{}]", BufferingAsyncTelemetryHttpClient.class.getSimpleName(), Long.valueOf(this.maxBatchDuration.toMillis()), Integer.valueOf(this.maxBatchSize), Integer.valueOf(this.maxPendingBatches), Integer.valueOf(this.maxInflightSubmissions));
        this.statusFlowable = this.incomingSubject.buffer(this.maxBatchDuration.toMillis(), TimeUnit.MILLISECONDS, this.scheduler, this.maxBatchSize).filter(list -> {
            return list.size() > 0;
        }).map(list2 -> {
            return new BufferingAsyncTelemetryHttpClientBatch(this.statsCollector.recordBatch(list2.size()), list2);
        }).toFlowable(BackpressureStrategy.MISSING).onBackpressureBuffer(this.maxPendingBatches, () -> {
            this.statsCollector.recordDroppedBatch();
            logger.debug("Dropping batch due to backpressure.");
        }, BackpressureOverflowStrategy.DROP_OLDEST).flatMapSingle(bufferingAsyncTelemetryHttpClientBatch -> {
            Instant now = Instant.now();
            long batchId = bufferingAsyncTelemetryHttpClientBatch.getBatchId();
            List<T> items = bufferingAsyncTelemetryHttpClientBatch.getItems();
            return this.client.submit(this.createRequestFn.apply(items)).map(telemetryHttpResponse -> {
                this.statsCollector.recordSuccessfulBatch(items.size(), now);
                return new BufferingAsyncTelemetryHttpClientBatchResult(bufferingAsyncTelemetryHttpClientBatch, Duration.between(now, Instant.now()), telemetryHttpResponse, null);
            }).onErrorResumeNext(th -> {
                this.statsCollector.recordFailedBatch(items.size(), now);
                logger.debug(String.format("Failed to submit batch %s", Long.valueOf(batchId)), th);
                return Single.just(new BufferingAsyncTelemetryHttpClientBatchResult(bufferingAsyncTelemetryHttpClientBatch, Duration.between(now, Instant.now()), null, th));
            });
        }, false, this.maxInflightSubmissions).share();
        this.subscription = this.statusFlowable.subscribe();
    }

    public static <T, S extends MessageLite, R> Builder<T, S, R> newBuilder() {
        return new Builder<>();
    }

    public void submit(Collection<T> collection) {
        Subject<T> subject = this.incomingSubject;
        Objects.requireNonNull(subject);
        collection.forEach(subject::onNext);
    }

    public Observable<BufferingAsyncTelemetryHttpClientBatchResult<T, R>> getBatchResults() {
        return this.statusFlowable.toObservable();
    }

    public BufferingAsyncTelemetryHttpClientStats stats() {
        return this.statsCollector.snapshot();
    }

    @Override // java.lang.AutoCloseable
    public void close() throws IOException {
        this.incomingSubject.onComplete();
        this.subscription.dispose();
        this.client.close();
    }
}
