package io.confluent.kafka.schemaregistry.storage;

import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryException;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
import io.confluent.kafka.schemaregistry.storage.exceptions.SerializationException;
import io.confluent.kafka.schemaregistry.storage.exceptions.StoreException;
import io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException;
import io.confluent.kafka.schemaregistry.storage.exceptions.StoreTimeoutException;
import io.confluent.kafka.schemaregistry.storage.serialization.Serializer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/kafka/schemaregistry/storage/KafkaStore.class */
public class KafkaStore<K, V> implements Store<K, V> {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) KafkaStore.class);
    private final String topic;
    private final int desiredReplicationFactor;
    private final String groupId;
    private final StoreUpdateHandler<K, V> storeUpdateHandler;
    private final Serializer<K, V> serializer;
    private final Store<K, V> localStore;
    private final int initTimeout;
    private final int timeout;
    private final String bootstrapBrokers;
    private KafkaProducer<byte[], byte[]> producer;
    private KafkaStoreReaderThread<K, V> kafkaTopicReader;
    private final K noopKey;
    private final SchemaRegistryConfig config;
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    private final CountDownLatch initLatch = new CountDownLatch(1);
    private volatile long lastWrittenOffset = -1;
    private final Lock lock = new ReentrantLock();

    public KafkaStore(SchemaRegistryConfig schemaRegistryConfig, StoreUpdateHandler<K, V> storeUpdateHandler, Serializer<K, V> serializer, Store<K, V> store, K k) throws SchemaRegistryException {
        this.topic = schemaRegistryConfig.getString(SchemaRegistryConfig.KAFKASTORE_TOPIC_CONFIG);
        this.desiredReplicationFactor = schemaRegistryConfig.getInt(SchemaRegistryConfig.KAFKASTORE_TOPIC_REPLICATION_FACTOR_CONFIG).intValue();
        this.groupId = schemaRegistryConfig.getString(SchemaRegistryConfig.KAFKASTORE_GROUP_ID_CONFIG).isEmpty() ? String.format("schema-registry-%s-%d", schemaRegistryConfig.getString(SchemaRegistryConfig.HOST_NAME_CONFIG), Integer.valueOf(KafkaSchemaRegistry.getSchemeAndPortForIdentity(schemaRegistryConfig.getInt("port").intValue(), schemaRegistryConfig.getList("listeners"), schemaRegistryConfig.interInstanceProtocol()).port)) : schemaRegistryConfig.getString(SchemaRegistryConfig.KAFKASTORE_GROUP_ID_CONFIG);
        this.initTimeout = schemaRegistryConfig.getInt(SchemaRegistryConfig.KAFKASTORE_INIT_TIMEOUT_CONFIG).intValue();
        this.timeout = schemaRegistryConfig.getInt(SchemaRegistryConfig.KAFKASTORE_TIMEOUT_CONFIG).intValue();
        this.storeUpdateHandler = storeUpdateHandler;
        this.serializer = serializer;
        this.localStore = store;
        this.noopKey = k;
        this.config = schemaRegistryConfig;
        this.bootstrapBrokers = schemaRegistryConfig.bootstrapBrokers();
        log.info("Initializing KafkaStore with broker endpoints: " + this.bootstrapBrokers);
    }

    @Override // io.confluent.kafka.schemaregistry.storage.Store
    public void init() throws StoreInitializationException {
        if (this.initialized.get()) {
            throw new StoreInitializationException("Illegal state while initializing store. Store was already initialized");
        }
        this.localStore.init();
        createOrVerifySchemaTopic();
        Properties properties = new Properties();
        addSchemaRegistryConfigsToClientProperties(this.config, properties);
        properties.put("bootstrap.servers", this.bootstrapBrokers);
        properties.put(ProducerConfig.ACKS_CONFIG, "-1");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        properties.put("retries", 0);
        this.producer = new KafkaProducer<>(properties);
        this.kafkaTopicReader = new KafkaStoreReaderThread<>(this.bootstrapBrokers, this.topic, this.groupId, this.storeUpdateHandler, this.serializer, this.localStore, this.producer, this.noopKey, this.initialized, this.config);
        this.kafkaTopicReader.start();
        try {
            waitUntilKafkaReaderReachesLastOffset(this.initTimeout);
            if (!this.initialized.compareAndSet(false, true)) {
                throw new StoreInitializationException("Illegal state while initializing store. Store was already initialized");
            }
            this.initLatch.countDown();
        } catch (StoreException e) {
            throw new StoreInitializationException(e);
        }
    }

    public static void addSchemaRegistryConfigsToClientProperties(SchemaRegistryConfig schemaRegistryConfig, Properties properties) {
        properties.putAll(schemaRegistryConfig.originalsWithPrefix("kafkastore."));
    }

    /* JADX WARN: Failed to calculate best type for var: r7v2 ??
    java.lang.NullPointerException
     */
    /* JADX WARN: Failed to calculate best type for var: r8v0 ??
    java.lang.NullPointerException
     */
    /* JADX WARN: Multi-variable type inference failed. Error: java.lang.NullPointerException
     */
    /* JADX WARN: Not initialized variable reg: 7, insn: 0x007f: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r7 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) A[TRY_LEAVE], block:B:26:0x007f */
    /* JADX WARN: Not initialized variable reg: 8, insn: 0x0083: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r8 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]), block:B:28:0x0083 */
    /* JADX WARN: Type inference failed for: r7v2, types: [org.apache.kafka.clients.admin.AdminClient] */
    /* JADX WARN: Type inference failed for: r8v0, types: [java.lang.Throwable] */
    private void createOrVerifySchemaTopic() throws StoreInitializationException {
        ?? r7;
        ?? r8;
        Properties properties = new Properties();
        addSchemaRegistryConfigsToClientProperties(this.config, properties);
        properties.put("bootstrap.servers", this.bootstrapBrokers);
        try {
            try {
                AdminClient create = AdminClient.create(properties);
                Throwable th = null;
                if (create.listTopics().names().get(this.initTimeout, TimeUnit.MILLISECONDS).contains(this.topic)) {
                    verifySchemaTopic(create);
                } else {
                    createSchemaTopic(create);
                }
                if (create != null) {
                    if (0 != 0) {
                        try {
                            create.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        create.close();
                    }
                }
            } catch (Throwable th3) {
                if (r7 != 0) {
                    if (r8 != 0) {
                        try {
                            r7.close();
                        } catch (Throwable th4) {
                            r8.addSuppressed(th4);
                        }
                    } else {
                        r7.close();
                    }
                }
                throw th3;
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new StoreInitializationException("Failed trying to create or validate schema topic configuration", e);
        } catch (TimeoutException e2) {
            throw new StoreInitializationException("Timed out trying to create or validate schema topic configuration", e2);
        }
    }

    private void createSchemaTopic(AdminClient adminClient) throws StoreInitializationException, InterruptedException, ExecutionException, TimeoutException {
        log.info("Creating schemas topic {}", this.topic);
        int size = adminClient.describeCluster().nodes().get(this.initTimeout, TimeUnit.MILLISECONDS).size();
        if (size <= 0) {
            throw new StoreInitializationException("No live Kafka brokers");
        }
        int min = Math.min(size, this.desiredReplicationFactor);
        if (min < this.desiredReplicationFactor) {
            log.warn("Creating the schema topic " + this.topic + " using a replication factor of " + min + ", which is less than the desired one of " + this.desiredReplicationFactor + ". If this is a production environment, it's crucial to add more brokers and increase the replication factor of the topic.");
        }
        NewTopic newTopic = new NewTopic(this.topic, 1, (short) min);
        HashMap hashMap = new HashMap(this.config.originalsWithPrefix("kafkastore.topic.config."));
        hashMap.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
        newTopic.configs(hashMap);
        try {
            adminClient.createTopics(Collections.singleton(newTopic)).all().get(this.initTimeout, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            if (!(e.getCause() instanceof TopicExistsException)) {
                throw e;
            }
            verifySchemaTopic(adminClient);
        }
    }

    private void verifySchemaTopic(AdminClient adminClient) throws StoreInitializationException, InterruptedException, ExecutionException, TimeoutException {
        log.info("Validating schemas topic {}", this.topic);
        TopicDescription topicDescription = adminClient.describeTopics(Collections.singleton(this.topic)).all().get(this.initTimeout, TimeUnit.MILLISECONDS).get(this.topic);
        int size = topicDescription.partitions().size();
        if (size != 1) {
            throw new StoreInitializationException("The schema topic " + this.topic + " should have only 1 partition but has " + size);
        }
        if (topicDescription.partitions().get(0).replicas().size() < this.desiredReplicationFactor) {
            log.warn("The replication factor of the schema topic " + this.topic + " is less than the desired one of " + this.desiredReplicationFactor + ". If this is a production environment, it's crucial to add more brokers and increase the replication factor of the topic.");
        }
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, this.topic);
        String value = adminClient.describeConfigs(Collections.singleton(configResource)).all().get(this.initTimeout, TimeUnit.MILLISECONDS).get(configResource).get(TopicConfig.CLEANUP_POLICY_CONFIG).value();
        if (value == null || !TopicConfig.CLEANUP_POLICY_COMPACT.equals(value)) {
            log.error("The retention policy of the schema topic " + this.topic + " is incorrect. You must configure the topic to 'compact' cleanup policy to avoid Kafka deleting your schemas after a week. Refer to Kafka documentation for more details on cleanup policies");
            throw new StoreInitializationException("The retention policy of the schema topic " + this.topic + " is incorrect. Expected cleanup.policy to be 'compact' but it is " + value);
        }
    }

    public void waitUntilKafkaReaderReachesLastOffset(int i) throws StoreException {
        waitUntilKafkaReaderReachesOffset(getLatestOffset(i), i);
    }

    public void waitUntilKafkaReaderReachesLastOffset(String str, int i) throws StoreException {
        long lastOffset = lastOffset(str);
        if (lastOffset == -1) {
            lastOffset = getLatestOffset(i);
        }
        waitUntilKafkaReaderReachesOffset(lastOffset, i);
    }

    private void waitUntilKafkaReaderReachesOffset(long j, int i) throws StoreException {
        log.info("Wait to catch up until the offset at {}", Long.valueOf(j));
        this.kafkaTopicReader.waitUntilOffset(j, i, TimeUnit.MILLISECONDS);
        log.info("Reached offset at {}", Long.valueOf(j));
    }

    public void markLastWrittenOffsetInvalid() {
        this.lastWrittenOffset = -1L;
    }

    @Override // io.confluent.kafka.schemaregistry.storage.Store
    public V get(K k) throws StoreException {
        return this.localStore.get(k);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // io.confluent.kafka.schemaregistry.storage.Store
    public V put(K k, V v) throws StoreTimeoutException, StoreException {
        assertInitialized();
        if (k == 0) {
            throw new StoreException("Key should not be null");
        }
        V v2 = get(k);
        try {
            ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(this.topic, 0, this.serializer.serializeKey(k), v == null ? null : this.serializer.serializeValue(v));
            boolean z = false;
            try {
                try {
                    try {
                        try {
                            log.trace("Sending record to KafkaStore topic: " + producerRecord);
                            RecordMetadata recordMetadata = this.producer.send(producerRecord).get(this.timeout, TimeUnit.MILLISECONDS);
                            log.trace("Waiting for the local store to catch up to offset " + recordMetadata.offset());
                            this.lastWrittenOffset = recordMetadata.offset();
                            if (k instanceof SubjectKey) {
                                setLastOffset(((SubjectKey) k).getSubject(), recordMetadata.offset());
                            }
                            waitUntilKafkaReaderReachesOffset(recordMetadata.offset(), this.timeout);
                            z = true;
                            if (1 == 0) {
                                markLastWrittenOffsetInvalid();
                            }
                            return v2;
                        } catch (TimeoutException e) {
                            throw new StoreTimeoutException("Put operation timed out while waiting for an ack from Kafka", e);
                        }
                    } catch (ExecutionException e2) {
                        throw new StoreException("Put operation failed while waiting for an ack from Kafka", e2);
                    }
                } catch (InterruptedException e3) {
                    throw new StoreException("Put operation interrupted while waiting for an ack from Kafka", e3);
                } catch (KafkaException e4) {
                    throw new StoreException("Put operation to Kafka failed", e4);
                }
            } catch (Throwable th) {
                if (!z) {
                    markLastWrittenOffsetInvalid();
                }
                throw th;
            }
        } catch (SerializationException e5) {
            throw new StoreException("Error serializing schema while creating the Kafka produce record", e5);
        }
    }

    @Override // io.confluent.kafka.schemaregistry.storage.Store
    public CloseableIterator<V> getAll(K k, K k2) throws StoreException {
        assertInitialized();
        return this.localStore.getAll(k, k2);
    }

    @Override // io.confluent.kafka.schemaregistry.storage.Store
    public void putAll(Map<K, V> map) throws StoreException {
        assertInitialized();
        for (Map.Entry<K, V> entry : map.entrySet()) {
            put(entry.getKey(), entry.getValue());
        }
    }

    @Override // io.confluent.kafka.schemaregistry.storage.Store
    public V delete(K k) throws StoreException {
        assertInitialized();
        return put(k, null);
    }

    @Override // io.confluent.kafka.schemaregistry.storage.Store
    public CloseableIterator<K> getAllKeys() throws StoreException {
        assertInitialized();
        return this.localStore.getAllKeys();
    }

    @Override // io.confluent.kafka.schemaregistry.storage.Store
    public void flush() throws StoreException {
        this.localStore.flush();
    }

    @Override // io.confluent.kafka.schemaregistry.storage.Store
    public void close() {
        try {
            if (this.kafkaTopicReader != null) {
                this.kafkaTopicReader.shutdown();
            }
            if (this.producer != null) {
                this.producer.close();
                log.info("Kafka store producer shut down");
            }
            this.localStore.close();
            if (this.storeUpdateHandler != null) {
                this.storeUpdateHandler.close();
            }
            log.info("Kafka store shut down complete");
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public void waitForInit() throws InterruptedException {
        if (this.initLatch.getCount() > 0) {
            this.initLatch.await();
        }
    }

    public boolean initialized() {
        return this.initialized.get();
    }

    KafkaStoreReaderThread<K, V> getKafkaStoreReaderThread() {
        return this.kafkaTopicReader;
    }

    private void assertInitialized() throws StoreException {
        if (!this.initialized.get()) {
            throw new StoreException("Illegal state. Store not initialized yet");
        }
    }

    private long getLatestOffset(int i) throws StoreException {
        if (this.lastWrittenOffset >= 0) {
            return this.lastWrittenOffset;
        }
        try {
            ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(this.topic, 0, this.serializer.serializeKey(this.noopKey), null);
            try {
                log.trace("Sending Noop record to KafkaStore to find last offset.");
                this.lastWrittenOffset = this.producer.send(producerRecord).get(i, TimeUnit.MILLISECONDS).offset();
                log.trace("Noop record's offset is " + this.lastWrittenOffset);
                return this.lastWrittenOffset;
            } catch (Exception e) {
                throw new StoreException("Failed to write Noop record to kafka store.", e);
            }
        } catch (SerializationException e2) {
            throw new StoreException("Failed to serialize noop key.", e2);
        }
    }

    public long lastOffset(String str) {
        return this.lastWrittenOffset;
    }

    public void setLastOffset(String str, long j) {
        this.lastWrittenOffset = j;
    }

    public Lock leaderLock() {
        return this.lock;
    }

    public Lock lockFor(String str) {
        return this.lock;
    }
}
