package io.confluent.controlcenter.kafka;

import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.AsyncCallable;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import io.confluent.controlcenter.data.ClusterMetadataDao;
import io.confluent.controlcenter.kafka.ClusterView;
import io.confluent.controlcenter.rest.res.KafkaCluster;
import io.confluent.controlcenter.util.ConfigUtils;
import io.confluent.controlcenter.util.RetryUtils;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import javax.annotation.Nullable;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.KafkaFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/controlcenter/kafka/ClusterManager.class */
public class ClusterManager implements AdminClientSupplier<String>, ClusterView {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ClusterManager.class);
    private static final int CONFIG_RETRY_BACKOFF = 1000;
    private final AdminClientFactory clientFactory;
    private final ClusterMetadataDao clusterMetadataDao;
    private final ListeningScheduledExecutorService exec = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setThreadFactory(Executors.defaultThreadFactory()).setNameFormat("cluster-manager-%d").setDaemon(true).build()));
    private final Collection<ClusterView.ClusterCallback> callbacks = Collections.synchronizedList(Lists.newLinkedList());
    private final ConcurrentMap<String, String> configKey = Maps.newConcurrentMap();
    private final ConcurrentMap<String, Map<String, Object>> configurations = Maps.newConcurrentMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/controlcenter/kafka/ClusterManager$CallbackConsumer.class */
    public interface CallbackConsumer {
        void apply(ClusterView.ClusterCallback clusterCallback);
    }

    @Inject
    public ClusterManager(AdminClientFactory adminClientFactory, ClusterMetadataDao clusterMetadataDao) {
        this.clientFactory = adminClientFactory;
        this.clusterMetadataDao = clusterMetadataDao;
    }

    private void addConfig(String str, Map<String, Object> map) {
        Preconditions.checkNotNull(str, "Configuration name must not be null");
        Preconditions.checkNotNull(map, "Configuration must not be null");
        Preconditions.checkArgument(this.configurations.putIfAbsent(str, ImmutableMap.copyOf((Map) map)) == null, "Cluster configuration %s already exists", str);
    }

    public String register(String str, Map<String, Object> map) {
        addConfig(str, map);
        try {
            return resolveAndRegister(str).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.warn("interrupted registering cluster {}", str);
            throw new RuntimeException("cluster registration interrupted", e);
        } catch (ExecutionException e2) {
            throw Throwables.propagate(e2.getCause());
        }
    }

    public ListenableFuture<String> addConfiguration(String str, Map<String, Object> map) {
        addConfig(str, map);
        return markForRegistration(str);
    }

    private ListenableFuture<String> markForRegistration(final String str) {
        log.debug("marked cluster configuration {} for registration", str);
        return RetryUtils.retryWithJitter(this.exec, new AsyncCallable<String>() { // from class: io.confluent.controlcenter.kafka.ClusterManager.1
            @Override // com.google.common.util.concurrent.AsyncCallable
            public ListenableFuture<String> call() {
                try {
                    ListenableFuture<String> resolveAndRegister = ClusterManager.this.resolveAndRegister(str);
                    Futures.addCallback(resolveAndRegister, new FutureCallback<String>() { // from class: io.confluent.controlcenter.kafka.ClusterManager.1.1
                        @Override // com.google.common.util.concurrent.FutureCallback
                        public void onSuccess(@Nullable String str2) {
                        }

                        @Override // com.google.common.util.concurrent.FutureCallback
                        public void onFailure(Throwable th) {
                            ClusterManager.log.warn("attempt to register cluster configuration {} failed, will retry", str, th);
                        }
                    });
                    return resolveAndRegister;
                } catch (Exception e) {
                    ClusterManager.log.warn("failed to register cluster configuration {}, please validate the configuration", str, e);
                    throw e;
                }
            }
        }, 1000);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public ListenableFuture<String> resolveAndRegister(final String str) {
        final Map<String, Object> map = this.configurations.get(str);
        Preconditions.checkNotNull(map, "Configuration {} does not exist");
        return Futures.transform(lookupClusterId(map), new Function<String, String>() { // from class: io.confluent.controlcenter.kafka.ClusterManager.2
            @Override // com.google.common.base.Function
            public String apply(String str2) {
                ClusterManager.this.registerAndUpdateMetadata(str, map, str2);
                return str2;
            }
        }, this.exec);
    }

    private ListenableFuture<String> lookupClusterId(Map<String, Object> map) {
        final AdminClient createClient = this.clientFactory.createClient(map);
        KafkaFuture<String> clusterId = createClient.describeCluster().clusterId();
        final SettableFuture create = SettableFuture.create();
        clusterId.whenComplete(new KafkaFuture.BiConsumer<String, Throwable>() { // from class: io.confluent.controlcenter.kafka.ClusterManager.3
            @Override // org.apache.kafka.common.KafkaFuture.BiConsumer
            public void accept(String str, Throwable th) {
                if (th != null) {
                    create.setException(th);
                } else {
                    create.set(str);
                }
                createClient.close();
            }
        });
        return create;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void registerAndUpdateMetadata(String str, Map<String, Object> map, String str2) {
        registerCluster(str, str2);
        this.clusterMetadataDao.asyncUpdateKafkaClusterPreservingName(str2, new KafkaCluster(str2, str, Collections.emptyList(), ConfigUtils.getList(map, "bootstrap.servers")));
    }

    private void registerCluster(String str, final String str2) {
        String put = this.configKey.put(str2, str);
        if (put != null) {
            log.info("cluster {} updated with configuration {} (was {})", str2, str, put);
            fireCallbacks(new CallbackConsumer() { // from class: io.confluent.controlcenter.kafka.ClusterManager.4
                @Override // io.confluent.controlcenter.kafka.ClusterManager.CallbackConsumer
                public void apply(ClusterView.ClusterCallback clusterCallback) {
                    clusterCallback.clusterUpdated(str2);
                }
            });
        } else {
            log.info("cluster {} registered with configuration {}", str2, str);
            fireCallbacks(new CallbackConsumer() { // from class: io.confluent.controlcenter.kafka.ClusterManager.5
                @Override // io.confluent.controlcenter.kafka.ClusterManager.CallbackConsumer
                public void apply(ClusterView.ClusterCallback clusterCallback) {
                    clusterCallback.clusterAdded(str2);
                }
            });
        }
    }

    private void fireCallbacks(CallbackConsumer callbackConsumer) {
        for (ClusterView.ClusterCallback clusterCallback : this.callbacks) {
            try {
                callbackConsumer.apply(clusterCallback);
            } catch (Exception e) {
                log.error("Error executing callback {}", clusterCallback, e);
            }
        }
    }

    @Override // io.confluent.controlcenter.kafka.AdminClientSupplier
    public AdminClient getClient(String str) {
        return this.clientFactory.createClient(getConfigs(str));
    }

    public Map<String, Object> getConfigs(String str) {
        String str2 = this.configKey.get(str);
        Preconditions.checkArgument(str2 != null, "Unknown cluster id %s", str);
        return this.configurations.get(str2);
    }

    @Override // io.confluent.controlcenter.kafka.ClusterView
    public void registerClusterCallback(ClusterView.ClusterCallback clusterCallback) {
        this.callbacks.add(clusterCallback);
    }

    public Map<String, Map<String, Object>> getAllClusterConfigsWithClusterId() {
        HashMap hashMap = new HashMap();
        for (String str : this.configKey.keySet()) {
            Map<String, Object> orDefault = this.configurations.getOrDefault(this.configKey.get(str), null);
            if (orDefault != null) {
                hashMap.put(str, orDefault);
            }
        }
        return hashMap;
    }
}
