package io.confluent.controlcenter.rest;

import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableScheduledFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.inject.Inject;
import io.confluent.controlcenter.consumption.ConsumerHelper;
import io.confluent.controlcenter.rest.RestModule;
import io.confluent.controlcenter.rest.res.ConsumeToSocket;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.websocket.CloseReason;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.runtime.distributed.ConnectProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/controlcenter/rest/AbstractConsumerResource.class */
/**
 * Base WebSocket endpoint that attaches a Kafka consumer to a client session.
 *
 * <p>When a session opens, the request parameters {@code topic}, {@code partition},
 * {@code limit} and at most one of {@code fromBeginning}, {@code offset}, {@code timestamp}
 * are read. A consumer obtained from {@link #getConsumer(String, Session)} is assigned the
 * matching partitions, positioned, and handed to a {@link ConsumeToSocket} task that is
 * scheduled with a fixed one-second delay. The task is cancelled when the session closes and
 * the consumer is closed once the scheduled future completes.
 *
 * <p>Invalid parameters cause the session to be closed with
 * {@link CloseReason.CloseCodes#CANNOT_ACCEPT}.
 */
public abstract class AbstractConsumerResource {
    private static final Logger log = LoggerFactory.getLogger(AbstractConsumerResource.class);

    /** Value of the {@code partition} parameter meaning "every partition of the topic". */
    private static final long ALL_PARTITIONS = -1L;

    /**
     * Maximum number of characters of client-supplied input echoed back in a close reason.
     * {@link CloseReason} rejects reason phrases longer than 123 UTF-8 bytes, so echoed input
     * has to be bounded; 24 chars (at most 72 bytes) leaves room for the surrounding text.
     */
    private static final int MAX_ECHOED_VALUE_LENGTH = 24;

    private final ListeningScheduledExecutorService executorService;
    private final SchemaRegistryClient schemaRegistryClient;

    /** Future of the scheduled consume task for this endpoint; {@code null} until one is scheduled. */
    private ListenableScheduledFuture<?> current;

    /**
     * @param listeningScheduledExecutorService executor on which the consume task and its
     *     completion listener run
     * @param schemaRegistryClient client passed through to {@link ConsumeToSocket}; may be
     *     {@code null}
     */
    @Inject
    public AbstractConsumerResource(@RestModule.WebSockets ListeningScheduledExecutorService listeningScheduledExecutorService, @Nullable SchemaRegistryClient schemaRegistryClient) {
        this.executorService = listeningScheduledExecutorService;
        this.schemaRegistryClient = schemaRegistryClient;
    }

    /**
     * Supplies the consumer used for the given session.
     *
     * @param str the {@code clusterId} path parameter of the request
     * @param session the session being opened
     * @return the consumer to read from; ownership passes to this class, which closes it
     */
    abstract Consumer<byte[], byte[]> getConsumer(String str, Session session);

    /**
     * Returns the last value of request parameter {@code str}, stripped of surrounding
     * whitespace, or the empty string when the parameter is absent.
     */
    private static String extract(Map<String, List<String>> map, String str) {
        List<String> values = map.get(str);
        String last = values == null ? "" : Iterables.getLast(values, "");
        return StringUtils.stripToEmpty(last);
    }

    /**
     * Reads request parameter {@code str} as a {@code long}.
     *
     * @param session session to close with {@code CANNOT_ACCEPT} when the value is not a number
     * @param map request parameters
     * @param str parameter name
     * @param j value returned when the parameter is absent or invalid
     * @return the parsed value, or {@code j}
     * @throws IOException if closing the session fails
     */
    private static long extractAsLong(Session session, Map<String, List<String>> map, String str, long j) throws IOException {
        String raw = extract(map, str);
        if (StringUtils.isEmpty(raw)) {
            return j;
        }
        try {
            return Long.parseLong(raw);
        } catch (NumberFormatException e) {
            log.warn("parsing " + str, e);
            // Report the offending input, not the default value.
            reject(session, StringUtils.abbreviate(raw, MAX_ECHOED_VALUE_LENGTH) + " is invalid " + str);
            return j;
        }
    }

    /** Closes {@code session} with {@code CANNOT_ACCEPT} and the given reason if it is still open. */
    private static void reject(Session session, String reason) throws IOException {
        if (session.isOpen()) {
            session.close(new CloseReason(CloseReason.CloseCodes.CANNOT_ACCEPT, reason));
        }
    }

    /** Best-effort close of {@code consumer}; failures are logged and otherwise ignored. */
    private static void closeQuietly(Consumer<byte[], byte[]> consumer) {
        try {
            log.trace("closing consumer");
            consumer.close();
        } catch (Throwable th) {
            log.warn("unable to close consumer", th);
        }
    }

    /**
     * Lists the partitions of {@code topic} that match the requested partition.
     *
     * @param partition partition number to select, or {@link #ALL_PARTITIONS} for all of them
     * @return the matching partitions; empty when the topic has none or none match
     */
    private static List<TopicPartition> matchingPartitions(Consumer<byte[], byte[]> consumer, String topic, long partition) {
        List<TopicPartition> matches = Lists.newArrayList();
        List<PartitionInfo> infos = consumer.partitionsFor(topic);
        if (infos == null) {
            // Treat missing metadata the same as "no partitions" instead of failing with an NPE.
            return matches;
        }
        for (PartitionInfo info : infos) {
            if (partition == ALL_PARTITIONS || info.partition() == partition) {
                matches.add(new TopicPartition(topic, info.partition()));
            }
        }
        return matches;
    }

    /**
     * Validates the request parameters and, if they are acceptable, starts the scheduled
     * consume task for this session.
     *
     * @param session the session being opened
     * @param str the {@code clusterId} path parameter, forwarded to {@link #getConsumer}
     * @throws IOException if closing the session fails
     * @throws ExecutionException declared for calls made while preparing the consumer
     * @throws InterruptedException declared for calls made while preparing the consumer
     */
    @OnOpen
    public void onSessionOpened(Session session, @PathParam("clusterId") String str) throws IOException, ExecutionException, InterruptedException {
        Map<String, List<String>> params = session.getRequestParameterMap();
        String fromBeginning = extract(params, "fromBeginning");
        String offset = extract(params, "offset");
        String timestamp = extract(params, "timestamp");
        if (!atMostOne(fromBeginning, offset, timestamp)) {
            reject(session, "only one of fromBeginning, offset, timestamp may be set");
            return;
        }

        ConsumerHelper.Position position;
        if ("true".equalsIgnoreCase(fromBeginning)) {
            position = new ConsumerHelper.Position(ConsumerHelper.Position.Type.EARLIEST, null);
        } else if (!StringUtils.isEmpty(offset)) {
            position = getPosition(session, ConsumerHelper.Position.Type.OFFSET, offset);
        } else if (!StringUtils.isEmpty(timestamp)) {
            position = getPosition(session, ConsumerHelper.Position.Type.TIMESTAMP, timestamp);
        } else {
            position = new ConsumerHelper.Position(ConsumerHelper.Position.Type.LATEST, null);
        }
        if (position == null) {
            // getPosition already closed the session with the reason.
            return;
        }

        long limit = extractAsLong(session, params, "limit", Long.MAX_VALUE);
        long partition = extractAsLong(session, params, "partition", ALL_PARTITIONS);
        if (!session.isOpen()) {
            return;
        }

        final Consumer<byte[], byte[]> consumer = getConsumer(str, session);
        // Until the consume task is scheduled this method owns the consumer and must close it
        // on every exit path, including exceptions.
        boolean handedOff = false;
        try {
            String topic = extract(params, "topic");
            log.info("trace getting tps");
            List<TopicPartition> assignment = matchingPartitions(consumer, topic, partition);
            if (assignment.isEmpty()) {
                session.close();
                return;
            }
            log.info("assigning={}", assignment);
            consumer.assign(assignment);
            ConsumerHelper.setPosition(consumer, position);
            log.info("trace position set");
            ListenableScheduledFuture<?> future = this.executorService.scheduleWithFixedDelay(new ConsumeToSocket(session, consumer, limit, this.schemaRegistryClient), 0L, 1L, TimeUnit.SECONDS);
            this.current = future;
            handedOff = true;
            // Close the consumer once the task is finished or cancelled.
            future.addListener(() -> closeQuietly(consumer), this.executorService);
        } finally {
            if (!handedOff) {
                closeQuietly(consumer);
            }
        }
    }

    /**
     * Parses {@code str} as a non-negative start position of the given type.
     *
     * @return the position, or {@code null} after closing the session with
     *     {@code CANNOT_ACCEPT} when {@code str} is not a number or is negative
     * @throws IOException if closing the session fails
     */
    private ConsumerHelper.Position getPosition(Session session, ConsumerHelper.Position.Type type, String str) throws IOException {
        long value;
        try {
            value = Long.parseLong(str);
        } catch (NumberFormatException e) {
            log.warn("parsing " + type, e);
            reject(session, StringUtils.abbreviate(str, MAX_ECHOED_VALUE_LENGTH) + " is invalid " + type);
            return null;
        }
        // Zero is a valid offset/timestamp; only negative values are rejected.
        if (value < 0) {
            reject(session, type + " must be nonnegative");
            return null;
        }
        return new ConsumerHelper.Position(type, Long.valueOf(value));
    }

    /** Returns {@code true} when no more than one of the three strings is non-empty. */
    private boolean atMostOne(String str, String str2, String str3) {
        int present = 0;
        if (!StringUtils.isEmpty(str)) {
            present++;
        }
        if (!StringUtils.isEmpty(str2)) {
            present++;
        }
        if (!StringUtils.isEmpty(str3)) {
            present++;
        }
        return present <= 1;
    }

    /** Logs incoming client messages at trace level; they are otherwise ignored. */
    @OnMessage
    public void onMessageReceived(String str, Session session) {
        log.trace("got message={}", str);
    }

    /** Cancels the scheduled consume task, if one was started, interrupting it if running. */
    @OnClose
    public void onClose(Session session, CloseReason closeReason) {
        log.trace("closeReason={}", closeReason);
        if (this.current != null) {
            this.current.cancel(true);
        }
    }

    /** Logs an error reported for this endpoint. */
    @OnError
    public void onErrorReceived(Throwable th) {
        // NOTE(review): the log message is a constant borrowed from Kafka Connect, presumably a
        // decompiler substitution for a plain string literal — confirm before replacing it.
        log.error(ConnectProtocol.ERROR_KEY_NAME, th);
    }
}
