package io.confluent.controlcenter.kafka;

import com.google.common.base.Throwables;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.util.concurrent.UncheckedExecutionException;
import io.confluent.controlcenter.ReferenceCountingHolder;
import io.confluent.controlcenter.kafka.ClusterView;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.Consumer;

/* loaded from: input_file:io/confluent/controlcenter/kafka/CachingConsumerSupplier.class */
public class CachingConsumerSupplier<K, V> implements ConsumerSupplier<K, V, String> {
    private static final int DEFAULT_MAX_CACHE_SIZE = 100;
    private static final long DEFAULT_CACHE_EXPIRATION_MS = TimeUnit.MINUTES.toMillis(15);
    private final Cache<String, ReferenceCountingHolder<Consumer<K, V>>> cache = CacheBuilder.newBuilder().maximumSize(100).expireAfterAccess(DEFAULT_CACHE_EXPIRATION_MS, TimeUnit.MILLISECONDS).removalListener(new RemovalListener<String, ReferenceCountingHolder<Consumer<K, V>>>() { // from class: io.confluent.controlcenter.kafka.CachingConsumerSupplier.1
        @Override // com.google.common.cache.RemovalListener
        public void onRemoval(RemovalNotification<String, ReferenceCountingHolder<Consumer<K, V>>> removalNotification) {
            removalNotification.getValue().close();
        }
    }).build();
    private final ConsumerSupplier<K, V, String> consumerSupplier;

    public CachingConsumerSupplier(ClusterView clusterView, ConsumerSupplier<K, V, String> consumerSupplier) {
        this.consumerSupplier = consumerSupplier;
        clusterView.registerClusterCallback(new ClusterView.ClusterCallback() { // from class: io.confluent.controlcenter.kafka.CachingConsumerSupplier.2
            @Override // io.confluent.controlcenter.kafka.ClusterView.ClusterCallback
            public void clusterUpdated(String str) {
                CachingConsumerSupplier.this.cache.invalidate(str);
            }

            @Override // io.confluent.controlcenter.kafka.ClusterView.ClusterCallback
            public void clusterAdded(String str) {
            }
        });
    }

    @Override // io.confluent.controlcenter.kafka.ConsumerSupplier
    public Consumer<K, V> getConsumer(final String str) {
        try {
            final ReferenceCountingHolder<Consumer<K, V>> referenceCountingHolder = this.cache.get(str, new Callable<ReferenceCountingHolder<Consumer<K, V>>>() { // from class: io.confluent.controlcenter.kafka.CachingConsumerSupplier.3
                @Override // java.util.concurrent.Callable
                public ReferenceCountingHolder<Consumer<K, V>> call() throws Exception {
                    return new ReferenceCountingHolder<>(CachingConsumerSupplier.this.consumerSupplier.getConsumer(str));
                }
            });
            referenceCountingHolder.increment();
            return new DelegatingConsumer<K, V>(referenceCountingHolder.get()) { // from class: io.confluent.controlcenter.kafka.CachingConsumerSupplier.4
                final AtomicBoolean closed = new AtomicBoolean(false);

                @Override // io.confluent.controlcenter.kafka.DelegatingConsumer, org.apache.kafka.clients.consumer.Consumer, java.io.Closeable, java.lang.AutoCloseable
                public void close() {
                    release();
                }

                @Override // io.confluent.controlcenter.kafka.DelegatingConsumer, org.apache.kafka.clients.consumer.Consumer
                public void close(long j, TimeUnit timeUnit) {
                    release();
                }

                private void release() {
                    if (this.closed.compareAndSet(false, true)) {
                        ((Consumer) referenceCountingHolder.get()).unsubscribe();
                        referenceCountingHolder.decrement();
                    }
                }
            };
        } catch (UncheckedExecutionException e) {
            Throwables.throwIfUnchecked(e.getCause());
            throw new RuntimeException(e);
        } catch (ExecutionException e2) {
            throw new RuntimeException(e2.getCause());
        }
    }
}
