package io.confluent.kafkarest.response;

import com.fasterxml.jackson.annotation.JsonValue;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.MappingIterator;
import com.fasterxml.jackson.databind.RuntimeJsonMappingException;
import com.fasterxml.jackson.jaxrs.base.JsonMappingExceptionMapper;
import com.fasterxml.jackson.jaxrs.base.JsonParseExceptionMapper;
import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableList;
import io.confluent.kafkarest.common.CompletableFutures;
import io.confluent.kafkarest.exceptions.BadRequestException;
import io.confluent.kafkarest.exceptions.RestConstraintViolationExceptionMapper;
import io.confluent.kafkarest.exceptions.StatusCodeException;
import io.confluent.kafkarest.exceptions.v3.ErrorResponse;
import io.confluent.kafkarest.exceptions.v3.V3ExceptionMapper;
import io.confluent.rest.entities.ErrorMessage;
import io.confluent.rest.exceptions.KafkaExceptionMapper;
import io.confluent.rest.exceptions.RestConstraintViolationException;
import io.confluent.rest.exceptions.WebApplicationExceptionMapper;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;
import java.util.function.Function;
import javax.annotation.Nullable;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
import javax.ws.rs.ext.ExceptionMapper;
import org.glassfish.jersey.server.ChunkedOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/kafkarest/response/StreamingResponse.class */
public abstract class StreamingResponse<T> {
    /** Logger shared by this class and its nested helpers. */
    private static final Logger log = LoggerFactory.getLogger(StreamingResponse.class);

    /**
     * Converts a failure into the {@link ErrorResponse} that is written into the stream.
     *
     * <p>Each entry pairs an exception type with the JAX-RS mapper that renders it, plus two
     * extractors that pull the error code and the message out of the rendered {@link Response}.
     * The two Jackson entries report a fixed 400 error code and use the response entity as the
     * message; the remaining entries read both values from the entity. Entries are registered
     * from the most specific type to the most general one, and the Kafka mapper is the fallback.
     */
    private static final CompositeErrorMapper EXCEPTION_MAPPER =
        new CompositeErrorMapper.Builder()
            .putMapper(
                JsonMappingException.class,
                new JsonMappingExceptionMapper(),
                rendered -> Response.Status.BAD_REQUEST.getStatusCode(),
                rendered -> (String) rendered.getEntity())
            .putMapper(
                JsonParseException.class,
                new JsonParseExceptionMapper(),
                rendered -> Response.Status.BAD_REQUEST.getStatusCode(),
                rendered -> (String) rendered.getEntity())
            .putMapper(
                StatusCodeException.class,
                new V3ExceptionMapper(),
                rendered -> ((ErrorResponse) rendered.getEntity()).getErrorCode(),
                rendered -> ((ErrorResponse) rendered.getEntity()).getMessage())
            .putMapper(
                RestConstraintViolationException.class,
                new RestConstraintViolationExceptionMapper(),
                rendered -> ((ErrorResponse) rendered.getEntity()).getErrorCode(),
                rendered -> ((ErrorResponse) rendered.getEntity()).getMessage())
            .putMapper(
                WebApplicationException.class,
                new WebApplicationExceptionMapper(null),
                rendered -> ((ErrorMessage) rendered.getEntity()).getErrorCode(),
                rendered -> ((ErrorMessage) rendered.getEntity()).getMessage())
            .setDefaultMapper(
                new KafkaExceptionMapper(null),
                rendered -> ((ErrorMessage) rendered.getEntity()).getErrorCode(),
                rendered -> ((ErrorMessage) rendered.getEntity()).getMessage())
            .build();

    /** Supplies the chunked output that {@code resume} streams results into. */
    private final ChunkedOutputFactory chunkedOutputFactory;

    /* loaded from: input_file:io/confluent/kafkarest/response/StreamingResponse$AsyncResponseQueue.class */
    private static final class AsyncResponseQueue {
        private final ChunkedOutput<ResultOrError> sink;
        private CompletableFuture<Void> tail;
        private volatile boolean sinkClosed;

        private AsyncResponseQueue(ChunkedOutputFactory chunkedOutputFactory) {
            this.sinkClosed = false;
            this.sink = chunkedOutputFactory.getChunkedOutput();
            this.tail = CompletableFuture.completedFuture(null);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void asyncResume(AsyncResponse asyncResponse) {
            asyncResponse.resume(Response.ok(this.sink).build());
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean isClosed() {
            return this.sinkClosed;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void push(CompletableFuture<ResultOrError> completableFuture) {
            StreamingResponse.log.debug("Pushing to response queue");
            this.tail = CompletableFuture.allOf(this.tail, completableFuture).thenApply(r6 -> {
                try {
                    if (this.sinkClosed || this.sink.isClosed()) {
                        this.sinkClosed = true;
                        return null;
                    }
                    ResultOrError resultOrError = (ResultOrError) completableFuture.join();
                    StreamingResponse.log.debug("Writing to sink");
                    this.sink.write(resultOrError);
                    return null;
                } catch (IOException e) {
                    StreamingResponse.log.error("Error when writing streaming result to response channel.", (Throwable) e);
                    return null;
                }
            });
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void close() {
            this.tail.whenComplete((r5, th) -> {
                try {
                    this.sinkClosed = true;
                    this.sink.close();
                } catch (IOException e) {
                    StreamingResponse.log.error("Error when closing response channel.", (Throwable) e);
                }
            });
        }
    }

    /* loaded from: input_file:io/confluent/kafkarest/response/StreamingResponse$ComposingStreamingResponse.class */
    /**
     * A streaming response whose elements are produced by applying an asynchronous transform to
     * each element of an upstream streaming response.
     *
     * @param <I> element type of the upstream response
     * @param <O> element type produced by the transform
     */
    private static final class ComposingStreamingResponse<I, O> extends StreamingResponse<O> {
        private final StreamingResponse<I> streamingResponseInput;
        private final Function<? super I, ? extends CompletableFuture<O>> transform;

        private ComposingStreamingResponse(StreamingResponse<I> streamingResponse, Function<? super I, ? extends CompletableFuture<O>> function, ChunkedOutputFactory chunkedOutputFactory) {
            super(chunkedOutputFactory);
            this.streamingResponseInput = Objects.requireNonNull(streamingResponse);
            this.transform = Objects.requireNonNull(function);
        }

        /** Delegates to the upstream response. */
        @Override
        public boolean hasNext() {
            return this.streamingResponseInput.hasNext();
        }

        /**
         * Takes the next upstream element and chains the transform onto it. A failure of either
         * the upstream future or the transform surfaces as a failure of the returned future.
         */
        @Override
        public CompletableFuture<O> next() {
            // No cast is needed: thenCompose accepts any function yielding a CompletionStage<O>.
            return this.streamingResponseInput.next().thenCompose(this.transform);
        }

        /** Closes the upstream response. */
        @Override
        public void close() {
            this.streamingResponseInput.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/kafkarest/response/StreamingResponse$CompositeErrorMapper.class */
    /**
     * An ordered list of type-specific error mappers with a mandatory catch-all fallback.
     */
    public static final class CompositeErrorMapper {
        private final List<ErrorMapper<?>> mappers;
        private final ErrorMapper<Throwable> defaultMapper;

        /** Collects mappers in registration order; a default mapper must be set before build. */
        private static final class Builder {
            private final ImmutableList.Builder<ErrorMapper<?>> mappers;
            private ErrorMapper<Throwable> defaultMapper;

            private Builder() {
                this.mappers = ImmutableList.builder();
            }

            /**
             * Registers a mapper for one exception type.
             *
             * @param cls exception type handled by this entry (subclasses included)
             * @param exceptionMapper JAX-RS mapper that renders the exception as a response
             * @param function extracts the error code from the rendered response
             * @param function2 extracts the error message from the rendered response
             * @return this builder
             */
            public <T extends Throwable> Builder putMapper(Class<T> cls, ExceptionMapper<T> exceptionMapper, Function<Response, Integer> function, Function<Response, String> function2) {
                this.mappers.add(new ErrorMapper<T>(cls, exceptionMapper, function, function2));
                return this;
            }

            /**
             * Sets the fallback used when no registered mapper handles the error.
             *
             * @return this builder
             */
            public Builder setDefaultMapper(ExceptionMapper<Throwable> exceptionMapper, Function<Response, Integer> function, Function<Response, String> function2) {
                this.defaultMapper = new ErrorMapper<>(Throwable.class, exceptionMapper, function, function2);
                return this;
            }

            /**
             * @throws NullPointerException if no default mapper was set
             */
            public CompositeErrorMapper build() {
                return new CompositeErrorMapper(this.mappers.build(), this.defaultMapper);
            }
        }

        private CompositeErrorMapper(List<ErrorMapper<?>> list, ErrorMapper<Throwable> errorMapper) {
            this.mappers = Objects.requireNonNull(list);
            this.defaultMapper = Objects.requireNonNull(errorMapper);
        }

        /**
         * Renders the error with the first registered mapper whose type matches, in registration
         * order, or with the default mapper when none matches.
         */
        public ErrorResponse toErrorResponse(Throwable th) {
            for (ErrorMapper<?> errorMapper : this.mappers) {
                if (errorMapper.handles(th)) {
                    return errorMapper.toErrorResponse(th);
                }
            }
            return this.defaultMapper.toErrorResponse(th);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * The error variant of {@link ResultOrError}: wraps the {@link ErrorResponse} describing why
     * one element of the stream failed. Instances are created via
     * {@link ResultOrError#error(ErrorResponse)}.
     */
    @AutoValue
    public static abstract class ErrorHolder extends ResultOrError {
        /**
         * Returns the wrapped error. Marked {@code @JsonValue}, so Jackson should serialize the
         * holder as this value rather than as a wrapper object.
         */
        @JsonValue
        public abstract ErrorResponse getError();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/kafkarest/response/StreamingResponse$ErrorMapper.class */
    /**
     * Binds an exception type to the JAX-RS mapper that renders it and to the functions that
     * extract an error code and message from the rendered response.
     *
     * @param <T> exception type handled by this mapper
     */
    public static final class ErrorMapper<T extends Throwable> {
        private final Class<T> errorClass;
        private final ExceptionMapper<T> mapper;
        private final Function<Response, Integer> errorCode;
        private final Function<Response, String> message;

        private ErrorMapper(Class<T> cls, ExceptionMapper<T> exceptionMapper, Function<Response, Integer> function, Function<Response, String> function2) {
            this.errorClass = Objects.requireNonNull(cls);
            this.mapper = Objects.requireNonNull(exceptionMapper);
            this.errorCode = Objects.requireNonNull(function);
            this.message = Objects.requireNonNull(function2);
        }

        /** Returns whether the given error is an instance of this mapper's exception type. */
        public boolean handles(Throwable th) {
            return this.errorClass.isInstance(th);
        }

        /**
         * Renders the error through the wrapped mapper and converts the outcome to an
         * {@link ErrorResponse}.
         *
         * @throws ClassCastException if the error is not an instance of the handled type; callers
         *     are expected to check {@link #handles(Throwable)} first
         */
        public ErrorResponse toErrorResponse(Throwable th) {
            // Narrow to T in a checked way; ExceptionMapper<T>.toResponse does not accept Throwable.
            Response response = this.mapper.toResponse(this.errorClass.cast(th));
            return ErrorResponse.create(this.errorCode.apply(response), this.message.apply(response));
        }
    }

    /* loaded from: input_file:io/confluent/kafkarest/response/StreamingResponse$InputStreamingResponse.class */
    private static class InputStreamingResponse<T> extends StreamingResponse<T> {
        private final MappingIterator<T> mappingIteratorInput;

        private InputStreamingResponse(MappingIterator<T> mappingIterator, ChunkedOutputFactory chunkedOutputFactory) {
            super(chunkedOutputFactory);
            this.mappingIteratorInput = (MappingIterator) Objects.requireNonNull(mappingIterator);
        }

        @Override // io.confluent.kafkarest.response.StreamingResponse
        public void close() {
            try {
                this.mappingIteratorInput.close();
            } catch (IOException e) {
                StreamingResponse.log.error("Error when closing the request stream.", (Throwable) e);
            }
        }

        @Override // io.confluent.kafkarest.response.StreamingResponse
        public boolean hasNext() {
            try {
                return this.mappingIteratorInput.hasNext();
            } catch (RuntimeJsonMappingException e) {
                throw new BadRequestException(String.format("Error processing JSON: %s", e.getMessage()), e);
            } catch (RuntimeException e2) {
                throw new BadRequestException(String.format("Error processing message: %s", e2.getMessage()), e2);
            }
        }

        @Override // io.confluent.kafkarest.response.StreamingResponse
        public CompletableFuture<T> next() {
            try {
                return CompletableFuture.completedFuture(this.mappingIteratorInput.nextValue());
            } catch (Throwable th) {
                return CompletableFutures.failedFuture(th);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * The success variant of {@link ResultOrError}: wraps the value produced for one element of
     * the stream. Instances are created via {@link ResultOrError#result(Object)}.
     *
     * @param <T> type of the wrapped result
     */
    @AutoValue
    public static abstract class ResultHolder<T> extends ResultOrError {
        /**
         * Returns the wrapped result. Marked {@code @JsonValue}, so Jackson should serialize the
         * holder as this value rather than as a wrapper object.
         */
        @JsonValue
        public abstract T getResult();
    }

    /* loaded from: input_file:io/confluent/kafkarest/response/StreamingResponse$ResultOrError.class */
    /**
     * One element written to the response stream: either a successful result or an error.
     * The two static factories are the only way this file creates instances.
     */
    public static abstract class ResultOrError {
        /**
         * Wraps a successful result.
         *
         * @param t the value to wrap
         * @param <T> type of the value
         */
        public static <T> ResultHolder<T> result(T t) {
            // Diamond instead of a raw type, so the element type is checked by the compiler.
            return new AutoValue_StreamingResponse_ResultHolder<>(t);
        }

        /**
         * Wraps an error.
         *
         * @param errorResponse the error to wrap
         */
        public static ErrorHolder error(ErrorResponse errorResponse) {
            return new AutoValue_StreamingResponse_ErrorHolder(errorResponse);
        }
    }

    /**
     * Stores the factory that later creates the chunked output for {@code resume}.
     *
     * @throws NullPointerException if {@code chunkedOutputFactory} is null
     */
    StreamingResponse(ChunkedOutputFactory chunkedOutputFactory) {
        Objects.requireNonNull(chunkedOutputFactory);
        this.chunkedOutputFactory = chunkedOutputFactory;
    }

    /**
     * Creates a streaming response that reads its elements from the given Jackson iterator.
     *
     * @param mappingIterator source of the elements; must not be null
     * @param chunkedOutputFactory creates the chunked output the response is streamed into
     * @param <T> element type
     * @throws NullPointerException if either argument is null
     */
    public static <T> StreamingResponse<T> from(MappingIterator<T> mappingIterator, ChunkedOutputFactory chunkedOutputFactory) {
        return new InputStreamingResponse<>(mappingIterator, chunkedOutputFactory);
    }

    /**
     * Returns a streaming response that applies the given asynchronous transform to every
     * element of this one. This response becomes the upstream of the returned response, which
     * shares the same chunked output factory.
     *
     * @param function transform applied to each element; must not be null
     * @param <O> element type of the resulting response
     * @throws NullPointerException if {@code function} is null
     */
    public final <O> StreamingResponse<O> compose(Function<? super T, ? extends CompletableFuture<O>> function) {
        // The upstream response (this) is the first constructor argument.
        return new ComposingStreamingResponse<>(this, function, this.chunkedOutputFactory);
    }

    /**
     * Resumes the suspended request with a chunked response and streams every element of this
     * response into it.
     *
     * <p>Each element's future is queued as soon as it is obtained. The loop stops when the input
     * is exhausted or the response queue reports itself closed. An exception thrown while
     * iterating, including one from {@code hasNext()}, is converted to an error element and
     * queued as the last chunk. In all cases the input and the queue are closed exactly once,
     * after the loop has ended.
     *
     * @param asyncResponse the suspended JAX-RS response to resume
     */
    public final void resume(AsyncResponse asyncResponse) {
        log.debug("Resuming StreamingResponse");
        AsyncResponseQueue responseQueue = new AsyncResponseQueue(this.chunkedOutputFactory);
        responseQueue.asyncResume(asyncResponse);
        try {
            // The whole loop sits inside the try, so cleanup runs once after the last element
            // rather than after every iteration.
            while (hasNext() && !responseQueue.isClosed()) {
                responseQueue.push(next().handle(this::handleNext));
            }
        } catch (Exception e) {
            log.debug("Exception thrown when processing stream ", e);
            responseQueue.push(CompletableFuture.completedFuture(ResultOrError.error(EXCEPTION_MAPPER.toErrorResponse(e))));
        } finally {
            close();
            responseQueue.close();
        }
    }

    /**
     * Converts the outcome of one element's future into the value written to the stream.
     *
     * @param t the result; only meaningful when {@code th} is null
     * @param th the failure, or null on success
     * @return a result holder on success, otherwise an error holder built from the failure
     */
    private ResultOrError handleNext(T t, @Nullable Throwable th) {
        if (th == null) {
            return ResultOrError.result(t);
        }
        log.debug("Error processing streaming operation.", th);
        // A future that failed directly passes its exception unwrapped, so the cause may be
        // null; map the exception itself in that case rather than handing null to the mappers.
        Throwable cause = th.getCause();
        return ResultOrError.error(EXCEPTION_MAPPER.toErrorResponse(cause != null ? cause : th));
    }

    /**
     * Returns whether another element can be obtained from {@link #next()}. {@code resume} keeps
     * streaming for as long as this returns true.
     */
    abstract boolean hasNext();

    /** Releases the input behind this response. {@code resume} calls this when streaming ends. */
    abstract void close();

    /**
     * Returns a future for the next element. {@code resume} queues the outcome of this future,
     * result or failure, for writing to the response.
     */
    abstract CompletableFuture<T> next();
}
