package io.confluent.kafkarest.resources.v3;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.MappingIterator;
import com.fasterxml.jackson.databind.node.NullNode;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Multimap;
import com.google.protobuf.ByteString;
import io.confluent.kafkarest.controllers.ProduceController;
import io.confluent.kafkarest.controllers.RecordSerializer;
import io.confluent.kafkarest.controllers.SchemaManager;
import io.confluent.kafkarest.entities.EmbeddedFormat;
import io.confluent.kafkarest.entities.ProduceResult;
import io.confluent.kafkarest.entities.RegisteredSchema;
import io.confluent.kafkarest.entities.v3.ProduceRequest;
import io.confluent.kafkarest.entities.v3.ProduceResponse;
import io.confluent.kafkarest.exceptions.BadRequestException;
import io.confluent.kafkarest.extension.ResourceBlocklistFeature;
import io.confluent.kafkarest.response.StreamingResponse;
import io.confluent.rest.annotations.PerformanceMetric;
import java.time.Instant;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collector;
import javax.inject.Inject;
import javax.inject.Provider;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;

/**
 * JAX-RS resource for {@code POST /v3/clusters/{clusterId}/topics/{topicName}/records}.
 *
 * <p>Each {@link ProduceRequest} read from the request body is handled independently: the key and
 * value schemas are resolved through the {@link SchemaManager}, the payloads are serialized by the
 * {@link RecordSerializer}, and the record is handed to the {@link ProduceController}. The
 * resulting {@link ProduceResponse}s are written back through a {@link StreamingResponse}.
 */
@Path("/v3/clusters/{clusterId}/topics/{topicName}/records")
@ResourceBlocklistFeature.ResourceName("api.v3.produce.*")
public final class ProduceAction {

    /**
     * Collects request headers into an immutable multimap keyed by header name. A multimap is used
     * so that several headers sharing the same name are all retained.
     */
    private static final Collector<
                    ProduceRequest.ProduceRequestHeader,
                    ImmutableMultimap.Builder<String, Optional<ByteString>>,
                    ImmutableMultimap<String, Optional<ByteString>>>
            PRODUCE_REQUEST_HEADER_COLLECTOR =
                    Collector.of(
                            ImmutableMultimap::builder,
                            (builder, header) -> builder.put(header.getName(), header.getValue()),
                            (left, right) -> left.putAll(right.build()),
                            ImmutableMultimap.Builder::build);

    private final Provider<SchemaManager> schemaManager;
    private final Provider<RecordSerializer> recordSerializer;
    private final Provider<ProduceController> produceController;

    /**
     * Creates the resource from injected providers.
     *
     * @throws NullPointerException if any provider is {@code null}
     */
    @Inject
    public ProduceAction(Provider<SchemaManager> schemaManager, Provider<RecordSerializer> recordSerializer, Provider<ProduceController> produceController) {
        this.schemaManager = Objects.requireNonNull(schemaManager);
        this.recordSerializer = Objects.requireNonNull(recordSerializer);
        this.produceController = Objects.requireNonNull(produceController);
    }

    /**
     * Produces every record found in the request body to the given topic and resumes
     * {@code asyncResponse} with the streamed results.
     *
     * @param asyncResponse suspended response that the streaming response is resumed on
     * @param clusterId value of the {@code clusterId} path parameter
     * @param topicName value of the {@code topicName} path parameter
     * @param requests iterator over the produce requests contained in the request body
     */
    @Consumes({"application/json"})
    @POST
    @Produces({"application/json"})
    @ResourceBlocklistFeature.ResourceName("api.v3.produce.produce-to-topic")
    @PerformanceMetric("v3.produce.produce-to-topic")
    public void produce(@Suspended AsyncResponse asyncResponse, @PathParam("clusterId") String clusterId, @PathParam("topicName") String topicName, MappingIterator<ProduceRequest> requests) throws Exception {
        // Obtain the controller once and share it across all records of this HTTP request.
        ProduceController controller = this.produceController.get();
        StreamingResponse.from(requests)
                .compose(request -> produce(clusterId, topicName, request, controller))
                .resume(asyncResponse);
    }

    /**
     * Resolves schemas, serializes key and value, and produces a single record.
     *
     * @return a future completing with the response describing the produced record
     */
    private CompletableFuture<ProduceResponse> produce(String clusterId, String topicName, ProduceRequest request, ProduceController controller) {
        Optional<RegisteredSchema> keySchema =
                request.getKey().flatMap(key -> getSchema(topicName, true, key));
        Optional<EmbeddedFormat> keyFormat = resolveFormat(keySchema, request.getKey());
        Optional<ByteString> serializedKey =
                serialize(topicName, keyFormat, keySchema, request.getKey(), true);

        Optional<RegisteredSchema> valueSchema =
                request.getValue().flatMap(value -> getSchema(topicName, false, value));
        Optional<EmbeddedFormat> valueFormat = resolveFormat(valueSchema, request.getValue());

        Multimap<String, Optional<ByteString>> headers =
                request.getHeaders().stream().collect(PRODUCE_REQUEST_HEADER_COLLECTOR);
        Optional<ByteString> serializedValue =
                serialize(topicName, valueFormat, valueSchema, request.getValue(), false);

        return controller
                .produce(
                        clusterId,
                        topicName,
                        request.getPartitionId(),
                        headers,
                        serializedKey,
                        serializedValue,
                        // orElseGet: only read the clock when the request carries no timestamp.
                        request.getTimestamp().orElseGet(Instant::now))
                .thenApply(
                        result ->
                                toProduceResponse(
                                        clusterId,
                                        topicName,
                                        keyFormat,
                                        keySchema,
                                        valueFormat,
                                        valueSchema,
                                        result));
    }

    /**
     * Picks the format for a key or value: the format of the resolved schema when there is one,
     * otherwise whatever format (if any) the request data itself declares.
     */
    private static Optional<EmbeddedFormat> resolveFormat(Optional<RegisteredSchema> schema, Optional<ProduceRequest.ProduceRequestData> data) {
        return schema
                .map(registered -> Optional.of(registered.getFormat()))
                .orElse(data.flatMap(ProduceRequest.ProduceRequestData::getFormat));
    }

    /**
     * Looks up the schema for the given request data.
     *
     * @param topicName topic the record is produced to
     * @param isKey {@code true} when {@code data} is the record key, {@code false} for the value
     * @param data key or value section of the produce request
     * @return the schema, or empty when the declared format does not require one
     * @throws BadRequestException if the schema manager rejects the request with an
     *     {@link IllegalArgumentException}
     */
    private Optional<RegisteredSchema> getSchema(String topicName, boolean isKey, ProduceRequest.ProduceRequestData data) {
        Optional<EmbeddedFormat> format = data.getFormat();
        // An explicitly declared schemaless format short-circuits the lookup. When no format is
        // given at all the schema manager is still consulted.
        if (format.isPresent() && !format.get().requiresSchema()) {
            return Optional.empty();
        }
        try {
            return Optional.of(
                    this.schemaManager
                            .get()
                            .getSchema(
                                    topicName,
                                    format,
                                    data.getSubject(),
                                    data.getSubjectNameStrategy().map(Function.identity()),
                                    data.getSchemaId(),
                                    data.getSchemaVersion(),
                                    data.getRawSchema(),
                                    isKey));
        } catch (IllegalArgumentException e) {
            throw new BadRequestException(e.getMessage(), e);
        }
    }

    /**
     * Serializes the key or value payload.
     *
     * <p>A missing format falls back to {@link EmbeddedFormat#BINARY}; missing data is passed to
     * the serializer as a JSON null node.
     */
    private Optional<ByteString> serialize(String topicName, Optional<EmbeddedFormat> format, Optional<RegisteredSchema> schema, Optional<ProduceRequest.ProduceRequestData> data, boolean isKey) {
        JsonNode payload =
                data.map(ProduceRequest.ProduceRequestData::getData).orElse(NullNode.getInstance());
        return this.recordSerializer
                .get()
                .serialize(format.orElse(EmbeddedFormat.BINARY), topicName, schema, payload, isKey);
    }

    /**
     * Builds the response for a produced record.
     *
     * <p>The key (respectively value) section is only present when a key (value) format was
     * resolved; its subject, schema id and schema version are only present when a schema was
     * resolved.
     *
     * <p>NOTE(review): within this file the method is only called from {@code produce}; the
     * visibility is left as-is to avoid changing the class's public surface.
     */
    public static ProduceResponse toProduceResponse(String clusterId, String topicName, Optional<EmbeddedFormat> keyFormat, Optional<RegisteredSchema> keySchema, Optional<EmbeddedFormat> valueFormat, Optional<RegisteredSchema> valueSchema, ProduceResult result) {
        return ProduceResponse.builder()
                .setClusterId(clusterId)
                .setTopicName(topicName)
                .setPartitionId(result.getPartitionId())
                .setOffset(result.getOffset())
                .setTimestamp(result.getTimestamp())
                .setKey(
                        keyFormat.map(
                                format ->
                                        ProduceResponse.ProduceResponseData.builder()
                                                .setType(keyFormat)
                                                .setSubject(keySchema.map(RegisteredSchema::getSubject))
                                                .setSchemaId(keySchema.map(RegisteredSchema::getSchemaId))
                                                .setSchemaVersion(keySchema.map(RegisteredSchema::getSchemaVersion))
                                                .setSize(result.getSerializedKeySize())
                                                .build()))
                .setValue(
                        valueFormat.map(
                                format ->
                                        ProduceResponse.ProduceResponseData.builder()
                                                .setType(valueFormat)
                                                .setSubject(valueSchema.map(RegisteredSchema::getSubject))
                                                .setSchemaId(valueSchema.map(RegisteredSchema::getSchemaId))
                                                .setSchemaVersion(valueSchema.map(RegisteredSchema::getSchemaVersion))
                                                .setSize(result.getSerializedValueSize())
                                                .build()))
                .build();
    }
}
