package io.confluent.kafka.jms;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import java.io.Serializable;
import java.util.Arrays;
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import org.apache.kafka.clients.consumer.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/kafka/jms/KafkaSession.class */
class KafkaSession implements Session {
    static final Logger log = LoggerFactory.getLogger((Class<?>) KafkaSession.class);
    final String clientID;
    final int acknowledgeMode;
    final KafkaConnection connection;
    final ClientState clientState;
    final JMSClientConfig jmsClientConfig;
    ProducerFactory producerFactory;
    ConsumerFactory consumerFactory;

    public KafkaSession(JMSClientConfig jMSClientConfig, int i, KafkaConnection kafkaConnection, ClientState clientState) {
        this.jmsClientConfig = jMSClientConfig;
        this.acknowledgeMode = i;
        this.connection = kafkaConnection;
        this.clientState = clientState;
        this.clientID = this.jmsClientConfig.clientID;
        this.producerFactory = new ProducerFactoryImpl(this.jmsClientConfig.producer);
        this.consumerFactory = new ConsumerFactoryImpl(this.jmsClientConfig.consumer);
    }

    @Override // javax.jms.Session
    public BytesMessage createBytesMessage() throws JMSException {
        return new KafkaBytesMessage();
    }

    @Override // javax.jms.Session
    public MapMessage createMapMessage() throws JMSException {
        return new KafkaMapMessage();
    }

    @Override // javax.jms.Session
    public Message createMessage() throws JMSException {
        return new KafkaMessage(MessageType.Message);
    }

    @Override // javax.jms.Session
    public ObjectMessage createObjectMessage() throws JMSException {
        return new KafkaObjectMessage();
    }

    @Override // javax.jms.Session
    public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException {
        ObjectMessage createObjectMessage = createObjectMessage();
        createObjectMessage.setObject(serializable);
        return createObjectMessage;
    }

    @Override // javax.jms.Session
    public StreamMessage createStreamMessage() throws JMSException {
        return new KafkaStreamMessage();
    }

    @Override // javax.jms.Session
    public TextMessage createTextMessage() throws JMSException {
        return new KafkaTextMessage();
    }

    @Override // javax.jms.Session
    public TextMessage createTextMessage(String str) throws JMSException {
        TextMessage createTextMessage = createTextMessage();
        createTextMessage.setText(str);
        return createTextMessage;
    }

    @Override // javax.jms.Session
    @Unsupported("Transactions are not supported by Kafka. This will always return false.")
    public boolean getTransacted() throws JMSException {
        return false;
    }

    @Override // javax.jms.Session
    public int getAcknowledgeMode() throws JMSException {
        return 1;
    }

    @Override // javax.jms.Session
    @Unsupported("Transactions are not supported by Kafka. This is a noop.")
    public void commit() throws JMSException {
    }

    @Override // javax.jms.Session
    @Unsupported("Transactions are not supported by Kafka. This is a noop.")
    public void rollback() throws JMSException {
    }

    @Override // javax.jms.Session
    public void close() throws JMSException {
        this.clientState.close(this);
    }

    @Override // javax.jms.Session
    @Unsupported("Transactions are not supported by Kafka")
    public void recover() throws JMSException {
    }

    @Override // javax.jms.Session
    @Unsupported("Session based MessageListeners are not supported.")
    public MessageListener getMessageListener() throws JMSException {
        return null;
    }

    @Override // javax.jms.Session
    @Unsupported("Session based MessageListeners are not supported.")
    public void setMessageListener(MessageListener messageListener) throws JMSException {
    }

    @Override // javax.jms.Session, java.lang.Runnable
    public void run() {
    }

    @Override // javax.jms.Session
    public MessageProducer createProducer(Destination destination) throws JMSException {
        KafkaMessageProducer kafkaMessageProducer = new KafkaMessageProducer(this.producerFactory.create(), this.clientID, JMSPreconditions.checkDestination(destination));
        this.clientState.register(this, kafkaMessageProducer);
        return kafkaMessageProducer;
    }

    @Override // javax.jms.Session
    public MessageConsumer createConsumer(Destination destination) throws JMSException {
        return createConsumer(destination, null, false);
    }

    @Override // javax.jms.Session
    @Unsupported("Message selectors are not not supported. An exception will be thrown if one is set.")
    public MessageConsumer createConsumer(Destination destination, String str) throws JMSException {
        return createConsumer(destination, str, false);
    }

    @Override // javax.jms.Session
    @Unsupported("Message selectors are not not supported. An exception will be thrown if one is set.")
    public MessageConsumer createConsumer(Destination destination, String str, boolean z) throws JMSException {
        if (null != str) {
            throw new UnsupportedOperationException("messageSelectors are not implemented.");
        }
        KafkaMessageConsumer kafkaMessageConsumer = new KafkaMessageConsumer(this.jmsClientConfig, this.consumerFactory.create(), JMSPreconditions.checkDestination(destination), this.connection);
        this.clientState.register(this, kafkaMessageConsumer);
        return kafkaMessageConsumer;
    }

    @Override // javax.jms.Session
    public Queue createQueue(String str) throws JMSException {
        Preconditions.checkNotNull(str, "queueName cannot be null");
        return new KafkaDestination(str);
    }

    @Override // javax.jms.Session
    public Topic createTopic(String str) throws JMSException {
        Preconditions.checkNotNull(str, "topicName cannot be null");
        return new KafkaDestination(str);
    }

    @Override // javax.jms.Session
    @Unsupported("Durable Subscribers are not supported.")
    public TopicSubscriber createDurableSubscriber(Topic topic, String str) throws JMSException {
        throw new UnsupportedOperationException();
    }

    @Override // javax.jms.Session
    @Unsupported("Durable Subscribers are not supported.")
    public TopicSubscriber createDurableSubscriber(Topic topic, String str, String str2, boolean z) throws JMSException {
        throw new UnsupportedOperationException();
    }

    @Override // javax.jms.Session
    public QueueBrowser createBrowser(Queue queue) throws JMSException {
        return createBrowser(queue, null);
    }

    @Override // javax.jms.Session
    @Unsupported("QueueBrowsers with message selectors are not supported.")
    public QueueBrowser createBrowser(Queue queue, String str) throws JMSException {
        if (!Strings.isNullOrEmpty(str)) {
            throw new UnsupportedOperationException("QueueBrowsers with message selectors are not supported.");
        }
        KafkaDestination checkDestination = JMSPreconditions.checkDestination(queue);
        Consumer create = this.consumerFactory.create();
        create.subscribe(Arrays.asList(checkDestination.topic));
        return new KafkaQueueBrowser(checkDestination, str, new KafkaMessageQueue(this.jmsClientConfig, create));
    }

    @Override // javax.jms.Session
    @Unsupported("Kafka does not have a concept of a temporary topic.")
    public TemporaryQueue createTemporaryQueue() throws JMSException {
        throw new UnsupportedOperationException();
    }

    @Override // javax.jms.Session
    @Unsupported("Kafka does not have a concept of a temporary topic.")
    public TemporaryTopic createTemporaryTopic() throws JMSException {
        throw new UnsupportedOperationException();
    }

    @Override // javax.jms.Session
    @Unsupported("Durable Subscribers are not supported.")
    public void unsubscribe(String str) throws JMSException {
    }
}
