package io.confluent.kafkarest.resources.v2;

import io.confluent.kafkarest.ConsumerReadCallback;
import io.confluent.kafkarest.Errors;
import io.confluent.kafkarest.KafkaRestContext;
import io.confluent.kafkarest.UriUtils;
import io.confluent.kafkarest.Versions;
import io.confluent.kafkarest.entities.ConsumerRecord;
import io.confluent.kafkarest.entities.TopicPartitionOffset;
import io.confluent.kafkarest.entities.v2.BinaryConsumerRecord;
import io.confluent.kafkarest.entities.v2.CommitOffsetsResponse;
import io.confluent.kafkarest.entities.v2.ConsumerAssignmentRequest;
import io.confluent.kafkarest.entities.v2.ConsumerAssignmentResponse;
import io.confluent.kafkarest.entities.v2.ConsumerCommittedRequest;
import io.confluent.kafkarest.entities.v2.ConsumerCommittedResponse;
import io.confluent.kafkarest.entities.v2.ConsumerOffsetCommitRequest;
import io.confluent.kafkarest.entities.v2.ConsumerSeekRequest;
import io.confluent.kafkarest.entities.v2.ConsumerSeekToRequest;
import io.confluent.kafkarest.entities.v2.ConsumerSubscriptionRecord;
import io.confluent.kafkarest.entities.v2.ConsumerSubscriptionResponse;
import io.confluent.kafkarest.entities.v2.CreateConsumerInstanceRequest;
import io.confluent.kafkarest.entities.v2.CreateConsumerInstanceResponse;
import io.confluent.kafkarest.entities.v2.JsonConsumerRecord;
import io.confluent.kafkarest.entities.v2.SchemaConsumerRecord;
import io.confluent.kafkarest.extension.ResourceAccesslistFeature;
import io.confluent.kafkarest.v2.BinaryKafkaConsumerState;
import io.confluent.kafkarest.v2.JsonKafkaConsumerState;
import io.confluent.kafkarest.v2.KafkaConsumerManager;
import io.confluent.kafkarest.v2.KafkaConsumerState;
import io.confluent.kafkarest.v2.SchemaKafkaConsumerState;
import io.confluent.rest.annotations.PerformanceMetric;
import java.time.Duration;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.validation.Valid;
import javax.validation.constraints.NotNull;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.UriInfo;

@Path("/consumers")
@Consumes({"application/vnd.kafka.binary.v2+json", Versions.KAFKA_V2_JSON_AVRO, Versions.KAFKA_V2_JSON_JSON, Versions.KAFKA_V2_JSON_JSON_SCHEMA, Versions.KAFKA_V2_JSON_PROTOBUF, Versions.KAFKA_V2_JSON})
@ResourceAccesslistFeature.ResourceName("api.v2.consumers.*")
@Produces({Versions.KAFKA_V2_JSON_BINARY_WEIGHTED_LOW, Versions.KAFKA_V2_JSON_AVRO_WEIGHTED_LOW, Versions.KAFKA_V2_JSON_JSON_WEIGHTED_LOW, Versions.KAFKA_V2_JSON_JSON_SCHEMA_WEIGHTED_LOW, Versions.KAFKA_V2_JSON_PROTOBUF_WEIGHTED_LOW, Versions.KAFKA_V2_JSON_WEIGHTED})
/* loaded from: input_file:io/confluent/kafkarest/resources/v2/ConsumersResource.class */
public final class ConsumersResource {
    private final KafkaRestContext ctx;

    public ConsumersResource(KafkaRestContext kafkaRestContext) {
        this.ctx = kafkaRestContext;
    }

    @Path("/{group}")
    @Valid
    @ResourceAccesslistFeature.ResourceName("api.v2.consumers.create")
    @POST
    @PerformanceMetric("consumer.create+v2")
    public CreateConsumerInstanceResponse createGroup(@Context UriInfo uriInfo, @PathParam("group") String str, @Valid CreateConsumerInstanceRequest createConsumerInstanceRequest) {
        if (createConsumerInstanceRequest == null) {
            createConsumerInstanceRequest = CreateConsumerInstanceRequest.PROTOTYPE;
        }
        String createConsumer = this.ctx.getKafkaConsumerManager().createConsumer(str, createConsumerInstanceRequest.toConsumerInstanceConfig());
        return new CreateConsumerInstanceResponse(createConsumer, UriUtils.absoluteUri(this.ctx.getConfig(), uriInfo, "consumers", str, "instances", createConsumer));
    }

    @Path("/{group}/instances/{instance}")
    @ResourceAccesslistFeature.ResourceName("api.v2.consumers.delete")
    @DELETE
    @PerformanceMetric("consumer.delete+v2")
    public void deleteGroup(@PathParam("group") String str, @PathParam("instance") String str2) {
        this.ctx.getKafkaConsumerManager().deleteConsumer(str, str2);
    }

    @Path("/{group}/instances/{instance}/subscription")
    @ResourceAccesslistFeature.ResourceName("api.v2.consumers.subscribe")
    @POST
    @PerformanceMetric("consumer.subscribe+v2")
    public void subscribe(@Context UriInfo uriInfo, @PathParam("group") String str, @PathParam("instance") String str2, @NotNull @Valid ConsumerSubscriptionRecord consumerSubscriptionRecord) {
        try {
            this.ctx.getKafkaConsumerManager().subscribe(str, str2, consumerSubscriptionRecord);
        } catch (IllegalStateException e) {
            throw Errors.illegalStateException(e);
        }
    }

    @GET
    @Path("/{group}/instances/{instance}/subscription")
    @ResourceAccesslistFeature.ResourceName("api.v2.consumers.get-subscription")
    @PerformanceMetric("consumer.subscription+v2")
    public ConsumerSubscriptionResponse subscription(@Context UriInfo uriInfo, @PathParam("group") String str, @PathParam("instance") String str2) {
        return this.ctx.getKafkaConsumerManager().subscription(str, str2);
    }

    @Path("/{group}/instances/{instance}/subscription")
    @ResourceAccesslistFeature.ResourceName("api.v2.consumers.unsubscribe")
    @DELETE
    @PerformanceMetric("consumer.unsubscribe+v2")
    public void unsubscribe(@Context UriInfo uriInfo, @PathParam("group") String str, @PathParam("instance") String str2) {
        this.ctx.getKafkaConsumerManager().unsubscribe(str, str2);
    }

    @GET
    @Path("/{group}/instances/{instance}/records")
    @ResourceAccesslistFeature.ResourceName("api.v2.consumers.consume-binary")
    @Produces({"application/vnd.kafka.binary.v2+json", Versions.KAFKA_V2_JSON_WEIGHTED})
    @PerformanceMetric("consumer.records.read-binary+v2")
    public void readRecordBinary(@Suspended AsyncResponse asyncResponse, @PathParam("group") String str, @PathParam("instance") String str2, @QueryParam("timeout") @DefaultValue("-1") long j, @QueryParam("max_bytes") @DefaultValue("-1") long j2) {
        readRecords(asyncResponse, str, str2, Duration.ofMillis(j), j2, BinaryKafkaConsumerState.class, BinaryConsumerRecord::fromConsumerRecord);
    }

    @GET
    @Path("/{group}/instances/{instance}/records")
    @ResourceAccesslistFeature.ResourceName("api.v2.consumers.consume-json")
    @Produces({Versions.KAFKA_V2_JSON_JSON_WEIGHTED_LOW})
    @PerformanceMetric("consumer.records.read-json+v2")
    public void readRecordJson(@Suspended AsyncResponse asyncResponse, @PathParam("group") String str, @PathParam("instance") String str2, @QueryParam("timeout") @DefaultValue("-1") long j, @QueryParam("max_bytes") @DefaultValue("-1") long j2) {
        readRecords(asyncResponse, str, str2, Duration.ofMillis(j), j2, JsonKafkaConsumerState.class, JsonConsumerRecord::fromConsumerRecord);
    }

    @GET
    @Path("/{group}/instances/{instance}/records")
    @ResourceAccesslistFeature.ResourceName("api.v2.consumers.consume-avro")
    @Produces({Versions.KAFKA_V2_JSON_AVRO_WEIGHTED_LOW})
    @PerformanceMetric("consumer.records.read-avro+v2")
    public void readRecordAvro(@Suspended AsyncResponse asyncResponse, @PathParam("group") String str, @PathParam("instance") String str2, @QueryParam("timeout") @DefaultValue("-1") long j, @QueryParam("max_bytes") @DefaultValue("-1") long j2) {
        readRecords(asyncResponse, str, str2, Duration.ofMillis(j), j2, SchemaKafkaConsumerState.class, SchemaConsumerRecord::fromConsumerRecord);
    }

    @GET
    @Path("/{group}/instances/{instance}/records")
    @ResourceAccesslistFeature.ResourceName("api.v2.consumers.consume-json-schema")
    @Produces({Versions.KAFKA_V2_JSON_JSON_SCHEMA_WEIGHTED_LOW})
    @PerformanceMetric("consumer.records.read-jsonschema+v2")
    public void readRecordJsonSchema(@Suspended AsyncResponse asyncResponse, @PathParam("group") String str, @PathParam("instance") String str2, @QueryParam("timeout") @DefaultValue("-1") long j, @QueryParam("max_bytes") @DefaultValue("-1") long j2) {
        readRecords(asyncResponse, str, str2, Duration.ofMillis(j), j2, SchemaKafkaConsumerState.class, SchemaConsumerRecord::fromConsumerRecord);
    }

    @GET
    @Path("/{group}/instances/{instance}/records")
    @ResourceAccesslistFeature.ResourceName("api.v2.consumers.consume-protobuf")
    @Produces({Versions.KAFKA_V2_JSON_PROTOBUF_WEIGHTED_LOW})
    @PerformanceMetric("consumer.records.read-protobuf+v2")
    public void readRecordProtobuf(@Suspended AsyncResponse asyncResponse, @PathParam("group") String str, @PathParam("instance") String str2, @QueryParam("timeout") @DefaultValue("-1") long j, @QueryParam("max_bytes") @DefaultValue("-1") long j2) {
        readRecords(asyncResponse, str, str2, Duration.ofMillis(j), j2, SchemaKafkaConsumerState.class, SchemaConsumerRecord::fromConsumerRecord);
    }

    @Path("/{group}/instances/{instance}/offsets")
    @ResourceAccesslistFeature.ResourceName("api.v2.consumers.commit-offsets")
    @POST
    @PerformanceMetric("consumer.commit-offsets+v2")
    public void commitOffsets(@Suspended final AsyncResponse asyncResponse, @PathParam("group") String str, @PathParam("instance") String str2, @QueryParam("async") @DefaultValue("false") String str3, @Valid ConsumerOffsetCommitRequest consumerOffsetCommitRequest) {
        this.ctx.getKafkaConsumerManager().commitOffsets(str, str2, str3, consumerOffsetCommitRequest, new KafkaConsumerManager.CommitCallback() { // from class: io.confluent.kafkarest.resources.v2.ConsumersResource.1
            @Override // io.confluent.kafkarest.v2.KafkaConsumerManager.CommitCallback
            public void onCompletion(List<TopicPartitionOffset> list, Exception exc) {
                if (exc != null) {
                    asyncResponse.resume((Throwable) exc);
                } else {
                    asyncResponse.resume(CommitOffsetsResponse.fromOffsets(list));
                }
            }
        });
    }

    @GET
    @Path("/{group}/instances/{instance}/offsets")
    @ResourceAccesslistFeature.ResourceName("api.v2.consumers.get-committed-offsets")
    @PerformanceMetric("consumer.committed-offsets+v2")
    public ConsumerCommittedResponse committedOffsets(@PathParam("group") String str, @PathParam("instance") String str2, @Valid ConsumerCommittedRequest consumerCommittedRequest) {
        if (consumerCommittedRequest == null) {
            throw Errors.partitionNotFoundException();
        }
        return this.ctx.getKafkaConsumerManager().committed(str, str2, consumerCommittedRequest);
    }

    @Path("/{group}/instances/{instance}/positions/beginning")
    @ResourceAccesslistFeature.ResourceName("api.v2.consumers.seek-to-beginning")
    @POST
    @PerformanceMetric("consumer.seek-to-beginning+v2")
    public void seekToBeginning(@Context UriInfo uriInfo, @PathParam("group") String str, @PathParam("instance") String str2, @NotNull @Valid ConsumerSeekToRequest consumerSeekToRequest) {
        try {
            this.ctx.getKafkaConsumerManager().seekToBeginning(str, str2, consumerSeekToRequest);
        } catch (IllegalStateException e) {
            throw Errors.illegalStateException(e);
        }
    }

    @Path("/{group}/instances/{instance}/positions/end")
    @ResourceAccesslistFeature.ResourceName("api.v2.consumers.seek-to-end")
    @POST
    @PerformanceMetric("consumer.seek-to-end+v2")
    public void seekToEnd(@Context UriInfo uriInfo, @PathParam("group") String str, @PathParam("instance") String str2, @NotNull @Valid ConsumerSeekToRequest consumerSeekToRequest) {
        try {
            this.ctx.getKafkaConsumerManager().seekToEnd(str, str2, consumerSeekToRequest);
        } catch (IllegalStateException e) {
            throw Errors.illegalStateException(e);
        }
    }

    @Path("/{group}/instances/{instance}/positions")
    @ResourceAccesslistFeature.ResourceName("api.v2.consumers.seek-to-offset")
    @POST
    @PerformanceMetric("consumer.seek-to-offset+v2")
    public void seekToOffset(@Context UriInfo uriInfo, @PathParam("group") String str, @PathParam("instance") String str2, @NotNull @Valid ConsumerSeekRequest consumerSeekRequest) {
        try {
            this.ctx.getKafkaConsumerManager().seek(str, str2, consumerSeekRequest);
        } catch (IllegalStateException e) {
            throw Errors.illegalStateException(e);
        }
    }

    @Path("/{group}/instances/{instance}/assignments")
    @ResourceAccesslistFeature.ResourceName("api.v2.consumers.assign")
    @POST
    @PerformanceMetric("consumer.assign+v2")
    public void assign(@Context UriInfo uriInfo, @PathParam("group") String str, @PathParam("instance") String str2, @NotNull @Valid ConsumerAssignmentRequest consumerAssignmentRequest) {
        try {
            this.ctx.getKafkaConsumerManager().assign(str, str2, consumerAssignmentRequest);
        } catch (IllegalStateException e) {
            throw Errors.illegalStateException(e);
        }
    }

    @GET
    @Path("/{group}/instances/{instance}/assignments")
    @ResourceAccesslistFeature.ResourceName("api.v2.consumers.get-assignments")
    @PerformanceMetric("consumer.assignment+v2")
    public ConsumerAssignmentResponse assignment(@Context UriInfo uriInfo, @PathParam("group") String str, @PathParam("instance") String str2) {
        return this.ctx.getKafkaConsumerManager().assignment(str, str2);
    }

    private <KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT> void readRecords(@Suspended final AsyncResponse asyncResponse, String str, String str2, Duration duration, long j, Class<? extends KafkaConsumerState<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT>> cls, final Function<ConsumerRecord<ClientKeyT, ClientValueT>, ?> function) {
        this.ctx.getKafkaConsumerManager().readRecords(str, str2, cls, duration, j <= 0 ? Long.MAX_VALUE : j, new ConsumerReadCallback<ClientKeyT, ClientValueT>() { // from class: io.confluent.kafkarest.resources.v2.ConsumersResource.2
            @Override // io.confluent.kafkarest.ConsumerReadCallback
            public void onCompletion(List<ConsumerRecord<ClientKeyT, ClientValueT>> list, Exception exc) {
                if (exc != null) {
                    asyncResponse.resume((Throwable) exc);
                } else {
                    asyncResponse.resume(list.stream().map(function).collect(Collectors.toList()));
                }
            }
        });
    }
}
