package io.confluent.controlcenter.rest;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.inject.Inject;
import io.confluent.common.security.auth.JwtPrincipal;
import io.confluent.controlcenter.errors.ProduceMessageException;
import io.confluent.controlcenter.rest.RestModule;
import io.confluent.controlcenter.util.PrincipalUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.websocket.CloseReason;
import javax.websocket.EncodeException;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/controlcenter/rest/AbstractProducerResource.class */
public abstract class AbstractProducerResource {
    protected String version;
    protected String topic;
    protected KafkaProducer<byte[], byte[]> producer;
    private static final String V2 = "2.0";
    private static final String V3 = "3.0";
    private final ListeningScheduledExecutorService executorService;
    private static final Logger log = LoggerFactory.getLogger((Class<?>) AbstractProducerResource.class);
    public static final String KEY = "KEY";
    public static final String VALUE = "VALUE";
    public static final Set<String> PROPS = ImmutableSet.of(KEY, VALUE);
    private static final byte[] NULL = "null".getBytes(StandardCharsets.UTF_8);

    @Inject
    public AbstractProducerResource(@RestModule.WebSockets ListeningScheduledExecutorService listeningScheduledExecutorService) {
        this.executorService = listeningScheduledExecutorService;
    }

    abstract KafkaProducer<byte[], byte[]> getProducer(String str, Session session);

    @OnMessage
    public void onMessage(Session session, String str) {
        log.trace("got message={}", str);
        if (this.producer == null) {
            throw new ProduceMessageException("no producer registered yet");
        }
        String str2 = this.version;
        boolean z = -1;
        switch (str2.hashCode()) {
            case 49524:
                if (str2.equals("2.0")) {
                    z = false;
                    break;
                }
                break;
            case 50485:
                if (str2.equals(V3)) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                onMessageV2(session, str);
                return;
            case true:
                onMessageV3(session, str);
                return;
            default:
                throw new ProduceMessageException("faulty version specified");
        }
    }

    @OnOpen
    public void onOpen(Session session, @PathParam("clusterId") String str, @PathParam("version") String str2) {
        if (!str2.equals("2.0") && !str2.equals(V3)) {
            throw new ProduceMessageException("faulty version specified");
        }
        this.version = str2;
        this.topic = extract(session.getRequestParameterMap(), "topic");
        this.producer = getProducer(str, session);
        maybeScheduleSessionTimeout(session);
    }

    @OnClose
    public void onClose(Session session, CloseReason closeReason) throws IOException {
        log.trace("closeReason={}", closeReason);
        if (this.producer != null) {
            try {
                log.trace("closing producer");
                this.producer.flush();
                this.producer.close();
            } catch (Throwable th) {
                log.warn("unable to close producer", th);
            }
        }
    }

    @OnError
    public void onError(Session session, Throwable th) throws IOException, EncodeException {
        log.warn(th.getLocalizedMessage());
        session.getBasicRemote().sendObject(Response.serverError().entity(th.getLocalizedMessage()).build());
        session.close(new CloseReason(CloseReason.CloseCodes.UNEXPECTED_CONDITION, th.getLocalizedMessage()));
    }

    protected void onMessageV2(Session session, String str) {
        produceMessage(session, null, str.getBytes(StandardCharsets.UTF_8));
    }

    protected void onMessageV3(Session session, String str) {
        try {
            JsonObject asJsonObject = JsonParser.parseString(str).getAsJsonObject();
            if (!asJsonObject.keySet().equals(PROPS)) {
                throw new ProduceMessageException("JSON message does not follow required schema");
            }
            produceMessage(session, getBytesFromJson(asJsonObject.get(KEY)), getBytesFromJson(asJsonObject.get(VALUE)));
        } catch (Exception e) {
            throw new ProduceMessageException(e, "failed to parse JSON message");
        }
    }

    private void produceMessage(Session session, byte[] bArr, byte[] bArr2) {
        try {
            this.producer.send(new ProducerRecord<>(this.topic, bArr, bArr2)).get(3000L, TimeUnit.MILLISECONDS);
            session.getBasicRemote().sendObject(Response.ok().build());
        } catch (Exception e) {
            throw new ProduceMessageException(e, "Kafka internal produce error");
        }
    }

    private static byte[] getBytesFromJson(JsonElement jsonElement) {
        byte[] bytes = jsonElement.toString().getBytes(StandardCharsets.UTF_8);
        if (Arrays.equals(bytes, NULL)) {
            bytes = null;
        }
        return bytes;
    }

    private void maybeScheduleSessionTimeout(Session session) {
        JwtPrincipal jwtPrincipalOrNull = PrincipalUtils.jwtPrincipalOrNull(session.getUserPrincipal());
        if (jwtPrincipalOrNull != null) {
            this.executorService.schedule(() -> {
                try {
                    session.close();
                    log.info("Producer session closed for principal={}", jwtPrincipalOrNull.getName());
                } catch (IOException e) {
                    log.warn("Could not force close websockets session", (Throwable) e);
                }
            }, Util.getTokenLifetimeMs(jwtPrincipalOrNull.getJwt()) - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
    }

    private static String extract(Map<String, List<String>> map, String str) {
        return StringUtils.stripToEmpty(map.containsKey(str) ? (String) Iterables.getLast(map.get(str), "") : "");
    }
}
