package io.confluent.controlcenter.healthcheck;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Collections2;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.google.common.collect.Multisets;
import com.google.common.collect.UnmodifiableIterator;
import com.google.common.hash.Hashing;
import com.segment.analytics.Analytics;
import com.segment.analytics.messages.IdentifyMessage;
import com.segment.analytics.messages.TrackMessage;
import io.confluent.C3Version;
import io.confluent.command.record.alert.CommandAlert;
import io.confluent.controlcenter.BootstrapClientSupplier;
import io.confluent.controlcenter.FeatureFlags;
import io.confluent.controlcenter.data.ClusterMetadataDao;
import io.confluent.controlcenter.kafka.AdminSupplier;
import io.confluent.controlcenter.license.LicenseModule;
import io.confluent.controlcenter.record.Controlcenter;
import io.confluent.controlcenter.rest.res.KafkaClusterDisplay;
import io.confluent.controlcenter.rest.res.StatusMeasurement;
import io.confluent.controlcenter.util.TopicInfo;
import io.confluent.controlcenter.util.versions.KafkaVersionTracker;
import io.confluent.monitoring.common.Clock;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.metrics.Metrics;
import org.slf4j.Logger;

/* loaded from: input_file:io/confluent/controlcenter/healthcheck/HealthCheck.class */
public abstract class HealthCheck implements Runnable {
    Logger log;
    private static final int HEALTHCHECK_TIMEOUT = 15;
    static final String METRIC_GROUP = "healthcheck";
    private final BootstrapClientSupplier bootstrapClient;
    private final AdminSupplier<String> adminClientSupplier;
    private final ClusterMetadataDao clusterMetadataDao;
    private final Map<String, TopicInfo> healthCheckTopics;
    private final Map<String, Map<CommandAlert.BrokerTriggerMetricType, Controlcenter.TriggerMeasurement>> clusterStatus;
    private final Clock clock;
    private final String sessionId;
    private final String controlCenterId;
    private String bootstrapClusterId;
    private Node controller;
    private final KafkaVersionTracker kafkaVersionTracker;
    private final Analytics analytics;
    private final LicenseModule.LicenseHolder lincenseHolder;
    private final FeatureFlags flags;
    private final boolean phoneHomeEnabled;
    private volatile int misconfiguredTopicsGaugeValue;
    private volatile int missingTopicConfigurationsValue;
    private volatile int clusterOfflineGaugeValue;
    private volatile int logLocationPersistentGaugeValue;
    volatile int streamsStatusGaugeValue;
    private static final long METRICS_PERIOD = TimeUnit.HOURS.toMillis(1);
    private static final LoadingCache<String, String> clusterHash = CacheBuilder.newBuilder().maximumSize(1024).build(new CacheLoader<String, String>() { // from class: io.confluent.controlcenter.healthcheck.HealthCheck.1
        @Override // com.google.common.cache.CacheLoader
        public String load(String str) throws Exception {
            return Hashing.sha256().hashString(str, StandardCharsets.UTF_8).toString();
        }
    });
    private Multimap<Integer, Node> previousNodes = ImmutableMultimap.of();
    private long lastSubmit = 0;

    public HealthCheck(BootstrapClientSupplier bootstrapClientSupplier, AdminSupplier adminSupplier, ClusterMetadataDao clusterMetadataDao, Set<TopicInfo> set, Map<String, Map<CommandAlert.BrokerTriggerMetricType, Controlcenter.TriggerMeasurement>> map, Clock clock, Logger logger, boolean z, String str, String str2, KafkaVersionTracker kafkaVersionTracker, Analytics analytics, LicenseModule.LicenseHolder licenseHolder, FeatureFlags featureFlags, Metrics metrics) {
        this.phoneHomeEnabled = z;
        this.bootstrapClient = bootstrapClientSupplier;
        this.adminClientSupplier = adminSupplier;
        this.clusterMetadataDao = clusterMetadataDao;
        this.healthCheckTopics = Maps.uniqueIndex(set, new Function<TopicInfo, String>() { // from class: io.confluent.controlcenter.healthcheck.HealthCheck.2
            @Override // com.google.common.base.Function, java.util.function.Function
            public String apply(TopicInfo topicInfo) {
                return topicInfo.name;
            }
        });
        this.clusterStatus = map;
        this.clock = clock;
        this.log = logger;
        this.sessionId = str;
        this.controlCenterId = str2;
        this.kafkaVersionTracker = kafkaVersionTracker;
        this.analytics = analytics;
        this.lincenseHolder = licenseHolder;
        this.flags = featureFlags;
        if (z) {
            logger.info("CONTROL CENTER UI\n\nBy using Control Center, subject to any license you may have with Confluent, you agree to the Confluent Data Protection Agreement.  In particular, please note that the version check feature of Control Center is enabled.\n\nWith this enabled, this instance is configured to collect and report certain data (version information, time stamped session IDs, instance ID, instance uptime, license key for subscription customers, IP address, and other product data)  to Confluent, Inc. (\"Confluent\") or its parent, subsidiaries, affiliates or service providers every hour.  By proceeding with `confluent.support.metrics.enable=true`, you agree to all such collection, transfer and use of Version information by Confluent. You can turn the version check feature off by setting `confluent.support.metrics.enable=false` in the Control Center configuration and restarting Control Center.  See the Confluent Enterprise documentation for further information.\n");
        }
        createMetrics(metrics);
    }

    @VisibleForTesting
    final void setLogger(Logger logger) {
        this.log = logger;
    }

    @Override // java.lang.Runnable
    public void run() {
        try {
            doHealthChecks();
        } catch (Throwable th) {
            this.log.warn("unable to perform health check", th);
        }
        try {
            sendMetrics();
        } catch (Throwable th2) {
            this.log.warn("unable to send metrics", th2);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void doHealthChecks() throws Throwable {
        checkBootstrapKafkaCluster();
        checkManagedKafkaClusters();
        checkTopics();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void createMetrics(Metrics metrics) {
        metrics.addMetric(metrics.metricName("misconfigured-topics", "healthcheck"), (metricConfig, j) -> {
            return this.misconfiguredTopicsGaugeValue;
        });
        metrics.addMetric(metrics.metricName("missing-topic-configurations", "healthcheck"), (metricConfig2, j2) -> {
            return this.missingTopicConfigurationsValue;
        });
        metrics.addMetric(metrics.metricName("broker-log-persistent-dir", "healthcheck"), (metricConfig3, j3) -> {
            return this.logLocationPersistentGaugeValue;
        });
        metrics.addMetric(metrics.metricName("cluster-offline", "healthcheck"), (metricConfig4, j4) -> {
            return this.clusterOfflineGaugeValue;
        });
    }

    final void checkBootstrapKafkaCluster() throws Throwable {
        Admin admin;
        Throwable th;
        this.log.debug("attempting to describe bootstrap cluster");
        boolean z = false;
        try {
            admin = this.bootstrapClient.get();
            th = null;
        } catch (Exception e) {
            if (e.getCause() instanceof TimeoutException) {
                this.log.warn("timeout trying to connect to kafka cluster");
            }
            updateClusterStatus(this.bootstrapClusterId, CommandAlert.StatusValue.OFFLINE);
            z = true;
        }
        try {
            try {
                DescribeClusterResult describeCluster = admin.describeCluster();
                updateBootstrapClusterId(describeCluster.clusterId().get(15L, TimeUnit.SECONDS));
                updatedNodes(FluentIterable.from(describeCluster.nodes().get()).index(new Function<Node, Integer>() { // from class: io.confluent.controlcenter.healthcheck.HealthCheck.3
                    @Override // com.google.common.base.Function, java.util.function.Function
                    public Integer apply(Node node) {
                        return Integer.valueOf(node.id());
                    }
                }));
                updateController(describeCluster.controller().get());
                checkBrokerConfigs();
                updateClusterStatus(this.bootstrapClusterId, CommandAlert.StatusValue.ONLINE);
                if (admin != null) {
                    if (0 != 0) {
                        try {
                            admin.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        admin.close();
                    }
                }
                this.clusterOfflineGaugeValue = z ? 1 : 0;
            } finally {
            }
        } finally {
        }
    }

    final void checkManagedKafkaClusters() {
        if (!this.clusterMetadataDao.ready()) {
            this.log.debug("Waiting for ClusterMetadataDao to be ready");
            return;
        }
        for (KafkaClusterDisplay kafkaClusterDisplay : this.clusterMetadataDao.getKafkaClustersForManagement().clusters) {
            this.log.debug("attempting to describe cluster:{}", kafkaClusterDisplay.displayName);
            try {
                Admin client = this.adminClientSupplier.getClient(kafkaClusterDisplay.clusterId);
                Throwable th = null;
                try {
                    try {
                        String str = client.describeCluster().clusterId().get(15L, TimeUnit.SECONDS);
                        if (str == null || !str.equals(kafkaClusterDisplay.clusterId)) {
                            this.log.warn("current clusterId={} does not match initial={} for cluster={}", str, kafkaClusterDisplay.clusterId, kafkaClusterDisplay.displayName);
                        }
                        updateClusterStatus(kafkaClusterDisplay.clusterId, CommandAlert.StatusValue.ONLINE);
                        if (client != null) {
                            if (0 != 0) {
                                try {
                                    client.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                client.close();
                            }
                        }
                    } catch (Throwable th3) {
                        th = th3;
                        throw th3;
                        break;
                    }
                } catch (Throwable th4) {
                    if (client != null) {
                        if (th != null) {
                            try {
                                client.close();
                            } catch (Throwable th5) {
                                th.addSuppressed(th5);
                            }
                        } else {
                            client.close();
                        }
                    }
                    throw th4;
                    break;
                }
            } catch (Exception e) {
                if (e.getCause() instanceof TimeoutException) {
                    this.log.warn("timeout trying to connect to kafka cluster");
                }
                updateClusterStatus(kafkaClusterDisplay.clusterId, CommandAlert.StatusValue.OFFLINE);
            }
        }
    }

    private void sendMetrics() throws Throwable {
        long currentTimeMillis = this.clock.currentTimeMillis();
        if (!this.phoneHomeEnabled || this.lastSubmit >= currentTimeMillis - METRICS_PERIOD) {
            return;
        }
        if (this.analytics != null && this.lastSubmit > 0) {
            this.analytics.enqueue(IdentifyMessage.builder().userId(this.sessionId).traits(ImmutableMap.builder().put("controlCenterInstance", this.controlCenterId).put("clusterId", clusterHash.get(this.bootstrapClusterId)).put("audience", this.lincenseHolder.currentLicense.audienceString()).put("c3Session", this.sessionId).put("c3Version", C3Version.getVersionString()).putAll(this.flags.asMap()).build()));
            for (String str : this.kafkaVersionTracker.clusters()) {
                try {
                    this.analytics.enqueue(TrackMessage.builder("Kafka Cluster Info").userId(this.sessionId).properties(ImmutableMap.builder().put("clusterId", clusterHash.get(str)).put("brokers", this.kafkaVersionTracker.brokerCountForCluster(str)).putAll(this.kafkaVersionTracker.versionCountsForCluster(str)).build()));
                } catch (Exception e) {
                    this.log.info("failed to report for cluster={}", str);
                }
            }
        }
        this.lastSubmit = currentTimeMillis;
    }

    final void checkTopics() throws Throwable {
        try {
            Admin admin = this.bootstrapClient.get();
            Throwable th = null;
            try {
                final Set<String> set = admin.listTopics().names().get(15L, TimeUnit.SECONDS);
                this.log.trace("extantTopics={}", set);
                ImmutableList list = FluentIterable.from(this.healthCheckTopics.values()).filter(new Predicate<TopicInfo>() { // from class: io.confluent.controlcenter.healthcheck.HealthCheck.5
                    @Override // com.google.common.base.Predicate
                    public boolean apply(TopicInfo topicInfo) {
                        if (set.contains(topicInfo.name)) {
                            return true;
                        }
                        HealthCheck.this.log.trace("not checking topic={} because it hasn't been created", topicInfo);
                        return false;
                    }
                }).transform(new Function<TopicInfo, ConfigResource>() { // from class: io.confluent.controlcenter.healthcheck.HealthCheck.4
                    @Override // com.google.common.base.Function, java.util.function.Function
                    public ConfigResource apply(TopicInfo topicInfo) {
                        return new ConfigResource(ConfigResource.Type.TOPIC, topicInfo.name);
                    }
                }).toList();
                Map<ConfigResource, Config> map = admin.describeConfigs(list).all().get(15L, TimeUnit.SECONDS);
                this.log.trace("retrieved topicConfigs={}", map);
                int i = 0;
                int i2 = 0;
                UnmodifiableIterator<ConfigResource> it = list.iterator();
                while (it.hasNext()) {
                    ConfigResource next = it.next();
                    if (map.containsKey(next)) {
                        Config config = map.get(next);
                        TopicInfo topicInfo = this.healthCheckTopics.get(next.name());
                        for (ConfigEntry configEntry : topicInfo.config.entries()) {
                            ConfigEntry configEntry2 = config.get(configEntry.name());
                            String value = configEntry2 == null ? "unset" : configEntry2.value();
                            if (!value.equals(configEntry.value())) {
                                if (topicInfo.validateConfig) {
                                    this.log.warn("misconfigured topic={} config={} value={} expected={}", next.name(), configEntry.name(), value, configEntry.value());
                                } else {
                                    this.log.info("misconfigured topic={} config={} value={} expected={}", next.name(), configEntry.name(), value, configEntry.value());
                                }
                                i++;
                            }
                        }
                    } else {
                        i2++;
                        this.log.info("missing config information for topic={}", next.name());
                    }
                }
                this.misconfiguredTopicsGaugeValue = i;
                this.missingTopicConfigurationsValue = i2;
                if (admin != null) {
                    if (0 != 0) {
                        try {
                            admin.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        admin.close();
                    }
                }
            } finally {
            }
        } catch (ExecutionException e) {
            if (!(e.getCause() instanceof UnsupportedVersionException)) {
                throw e.getCause();
            }
            this.log.info(e.getLocalizedMessage());
        }
    }

    /* JADX WARN: Finally extract failed */
    private void checkBrokerConfigs() throws Throwable {
        try {
            Admin admin = this.bootstrapClient.get();
            Throwable th = null;
            try {
                for (Map.Entry<ConfigResource, Config> entry : admin.describeConfigs(Collections2.transform(this.previousNodes.keySet(), new Function<Integer, ConfigResource>() { // from class: io.confluent.controlcenter.healthcheck.HealthCheck.6
                    @Override // com.google.common.base.Function, java.util.function.Function
                    public ConfigResource apply(Integer num) {
                        return new ConfigResource(ConfigResource.Type.BROKER, num.toString());
                    }
                })).all().get(15L, TimeUnit.SECONDS).entrySet()) {
                    String name = entry.getKey().name();
                    Config value = entry.getValue();
                    ConfigEntry configEntry = value.get(KafkaConfig.MetricReporterClassesProp());
                    if (configEntry == null || !configEntry.value().contains("ConfluentMetricsReporter")) {
                        this.log.warn("broker={} is not instrumented with ConfluentMetricsReporter", name);
                    }
                    validateLogDirs(name, value);
                }
                if (admin != null) {
                    if (0 != 0) {
                        try {
                            admin.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        admin.close();
                    }
                }
            } catch (Throwable th3) {
                if (admin != null) {
                    if (0 != 0) {
                        try {
                            admin.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        admin.close();
                    }
                }
                throw th3;
            }
        } catch (ExecutionException e) {
            if (e.getCause() instanceof UnsupportedVersionException) {
                this.log.info(e.getLocalizedMessage());
            } else {
                if (!(e.getCause() instanceof TimeoutException)) {
                    throw e.getCause();
                }
                this.log.warn("timeout trying to connect to kafka cluster");
            }
        }
    }

    final void validateLogDirs(String str, Config config) {
        ConfigEntry configEntry = config.get(KafkaConfig.LogDirsProp());
        List<String> emptyList = Collections.emptyList();
        if (configEntry == null || configEntry.isDefault()) {
            ConfigEntry configEntry2 = config.get(KafkaConfig.LogDirProp());
            if (configEntry2 != null) {
                emptyList = ImmutableList.of(configEntry2.value());
            } else {
                this.log.info("broker={} unable to determine log directory", str);
            }
        } else {
            emptyList = ImmutableList.copyOf(configEntry.value().split(","));
        }
        boolean z = true;
        for (String str2 : emptyList) {
            if (str2.startsWith("/tmp")) {
                this.log.error("broker={} is storing logs in {}, Kafka expects to store log data in a persistent location", str, str2);
                z = false;
            }
        }
        this.logLocationPersistentGaugeValue = z ? 1 : 0;
    }

    private void updateController(Node node) {
        if (this.controller == null || !this.controller.equals(node)) {
            this.log.info("new controller={}", node);
            this.controller = node;
        }
    }

    private void updatedNodes(Multimap<Integer, Node> multimap) {
        if (!this.previousNodes.keys().equals(multimap.keys())) {
            this.log.info("broker id set has changed new={} removed={}", Multimaps.filterKeys(multimap, Predicates.in(Multisets.difference(multimap.keys(), this.previousNodes.keys()))), Multimaps.filterKeys(this.previousNodes, Predicates.in(Multisets.difference(this.previousNodes.keys(), multimap.keys()))));
        }
        for (Integer num : multimap.keySet()) {
            Collection<Node> collection = multimap.get(num);
            if (collection.size() != 1) {
                this.log.warn("found count={} brokers with id={}", Integer.valueOf(collection.size()), num);
            }
        }
        this.previousNodes = multimap;
    }

    final void updateBootstrapClusterId(String str) {
        if (this.bootstrapClusterId == null) {
            this.log.info("current clusterId={}", str);
        } else if (!this.bootstrapClusterId.equals(str)) {
            this.log.error("current clusterId={} does not match initial={}", str, this.bootstrapClusterId);
            this.clusterStatus.remove(this.bootstrapClusterId);
        }
        this.bootstrapClusterId = str;
    }

    private void updateClusterStatus(String str, CommandAlert.StatusValue statusValue) {
        updateClusterMetric(str, Controlcenter.MetricMeasurement.newBuilder().setBrokerMetric(CommandAlert.BrokerTriggerMetricType.CLUSTER_STATUS).setStatusValue(statusValue).build());
    }

    private void updateClusterMetric(String str, Controlcenter.MetricMeasurement metricMeasurement) {
        if (str == null || str.isEmpty()) {
            this.log.warn("ClusterId is not defined, cannot record healthcheck status");
            return;
        }
        if (metricMeasurement == null) {
            return;
        }
        CommandAlert.BrokerTriggerMetricType brokerMetric = metricMeasurement.getBrokerMetric();
        Controlcenter.TriggerMeasurement build = Controlcenter.TriggerMeasurement.newBuilder().setClusterId(str).setComponentType(Controlcenter.ComponentType.BROKER_CLUSTER).setMetricMeasurement(metricMeasurement).setArrivalTime(this.clock.currentTimeMillis()).build();
        Map<CommandAlert.BrokerTriggerMetricType, Controlcenter.TriggerMeasurement> map = this.clusterStatus.get(str);
        if (map == null) {
            map = new ConcurrentHashMap();
            this.clusterStatus.put(str, map);
        }
        Controlcenter.TriggerMeasurement triggerMeasurement = map.get(brokerMetric);
        if (triggerMeasurement == null || !triggerMeasurement.getMetricMeasurement().equals(metricMeasurement)) {
            map.put(brokerMetric, build);
        }
    }

    public String getBootstrapClusterId() {
        return this.bootstrapClusterId;
    }

    public Map<String, StatusMeasurement> getAllClusterStatus() {
        HashMap hashMap = new HashMap();
        for (Map.Entry<String, Map<CommandAlert.BrokerTriggerMetricType, Controlcenter.TriggerMeasurement>> entry : this.clusterStatus.entrySet()) {
            Controlcenter.TriggerMeasurement triggerMeasurement = entry.getValue().get(CommandAlert.BrokerTriggerMetricType.CLUSTER_STATUS);
            if (triggerMeasurement != null) {
                hashMap.put(entry.getKey(), StatusMeasurement.fromTriggerMeasurement(triggerMeasurement));
            }
        }
        return hashMap;
    }

    @VisibleForTesting
    int getMisconfiguredTopicsGaugeValue() {
        return this.misconfiguredTopicsGaugeValue;
    }

    @VisibleForTesting
    int getMissingTopicConfigurationsValue() {
        return this.missingTopicConfigurationsValue;
    }

    @VisibleForTesting
    int getIsClusterOfflineGaugeValue() {
        return this.clusterOfflineGaugeValue;
    }

    @VisibleForTesting
    int getStreamsStatusGaugeValue() {
        return this.streamsStatusGaugeValue;
    }

    @VisibleForTesting
    int getLogLocationPersistentGaugeValue() {
        return this.logLocationPersistentGaugeValue;
    }
}
