package io.confluent.telemetry.reporter;

import io.confluent.shaded.com.google.common.annotations.VisibleForTesting;
import io.confluent.shaded.com.google.common.base.Preconditions;
import io.confluent.shaded.com.google.common.collect.ImmutableList;
import io.confluent.shaded.com.google.common.collect.ImmutableMap;
import io.confluent.shaded.com.google.common.collect.Maps;
import io.confluent.shaded.com.google.common.collect.Sets;
import io.confluent.shaded.io.confluent.telemetry.client.TelemetryHttpClient;
import io.confluent.shaded.io.confluent.telemetry.events.EventEmitterConfig;
import io.confluent.shaded.io.confluent.telemetry.events.EventEmitterImpl;
import io.confluent.shaded.io.confluent.telemetry.events.EventLogger;
import io.confluent.shaded.io.confluent.telemetry.events.EventLoggerConfig;
import io.confluent.shaded.io.confluent.telemetry.events.EventLoggerFactory;
import io.confluent.shaded.io.confluent.telemetry.events.exporter.http.EventHttpExporter;
import io.confluent.shaded.io.opencensus.proto.resource.v1.Resource;
import io.confluent.telemetry.BrokerConfigUtils;
import io.confluent.telemetry.ConfluentTelemetryConfig;
import io.confluent.telemetry.Context;
import io.confluent.telemetry.MetricsCollectorTask;
import io.confluent.telemetry.api.events.Event;
import io.confluent.telemetry.api.events.EventEmitter;
import io.confluent.telemetry.api.events.EventEmitterProvider;
import io.confluent.telemetry.api.events.NoOpEventEmitter;
import io.confluent.telemetry.collector.ConfluentMetricNamingConvention;
import io.confluent.telemetry.collector.KafkaMetricsCollector;
import io.confluent.telemetry.collector.MetricsCollector;
import io.confluent.telemetry.config.NamedFilterSet;
import io.confluent.telemetry.config.remote.RemoteConfigConfiguration;
import io.confluent.telemetry.config.remote.RemoteConfiguration;
import io.confluent.telemetry.config.remote.polling.HttpRemoteConfigurationSource;
import io.confluent.telemetry.emitter.Emitter;
import io.confluent.telemetry.emitter.TelemetryEmitter;
import io.confluent.telemetry.exporter.Exporter;
import io.confluent.telemetry.exporter.ExporterConfig;
import io.confluent.telemetry.exporter.http.HttpExporter;
import io.confluent.telemetry.exporter.http.HttpExporterConfig;
import io.confluent.telemetry.exporter.kafka.KafkaExporter;
import io.confluent.telemetry.exporter.kafka.KafkaExporterConfig;
import io.confluent.telemetry.provider.KafkaServerProvider;
import io.confluent.telemetry.provider.Provider;
import io.confluent.telemetry.provider.ProviderRegistry;
import io.confluent.telemetry.provider.Utils;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import kafka.server.KafkaConfig;
import org.apache.kafka.common.ClusterResource;
import org.apache.kafka.common.ClusterResourceListener;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/telemetry/reporter/TelemetryReporter.class */
public class TelemetryReporter implements MetricsReporter, ClusterResourceListener, EventEmitterProvider {
    private static final Logger log = LoggerFactory.getLogger(TelemetryReporter.class);
    // Domain handed to the naming convention of the reporter's own (self) metrics collector.
    public static final String SELF_METRICS_DOMAIN = "io.confluent.telemetry";
    // Namespace of the KafkaMetricsContext that backs the self-metrics registry.
    public static final String SELF_METRICS_NAMESPACE = "confluent.telemetry";
    // Config map as handed to configure(); read by contextChange() and initConfig().
    private Map<String, Object> rawOriginalConfig;
    // Config built once in initConfig(); its originals are the base of every reconfiguration.
    private ConfluentTelemetryConfig originalConfig;
    // Currently effective config; replaced by reconfigure().
    private ConfluentTelemetryConfig config;
    // Built in initContext() from the active provider's resource plus the configured labels.
    private volatile Context ctx;
    // Null until startMetricCollectorTask() ran; contextChange() uses that to detect first-time initialization.
    private MetricsCollectorTask collectorTask;
    // Exporters keyed by their configured name.
    private final Map<String, Exporter> exporters;
    // All collectors handed to the collector task (self metrics, Kafka metrics, provider extras).
    private final List<MetricsCollector> collectors;
    // Collector that receives the init/metricChange/metricRemoval callbacks of this reporter.
    private volatile KafkaMetricsCollector kafkaMetricsCollector;
    private volatile TelemetryEmitter emitter;
    // Provider resolved from the "_namespace" context label; null when no provider matched.
    private Provider activeProvider;
    // Only created by initRemoteConfig() when remote configuration is enabled; otherwise null.
    private HttpRemoteConfigurationSource remoteConfigSource;
    private final EventLoggerFactory eventLoggerFactory;

    // Null while config events are disabled or after closeEventLogger().
    @VisibleForTesting
    volatile EventLogger configEventLogger;
    // Assigned in configure() and reconfigureEventEmitter(); not initialized by the constructors.
    private volatile Optional<EventEmitterImpl> eventEmitterOpt;
    // Registry for the reporter's own metrics; created in initSelfMetrics().
    private Metrics selfMetrics;
    // Default filter set of the config, unioned with previously known / remotely received filters.
    private NamedFilterSet metricFilters;
    // Names of the filters in effect; starts as the default filter name, replaced by remote configuration.
    private Set<String> activeFilters;

    /**
     * Creates a reporter whose event logger factory builds a plain {@link EventLogger}
     * and configures it with the map passed to the factory.
     */
    public TelemetryReporter() {
        this(loggerConfig -> {
            EventLogger logger = new EventLogger();
            logger.configure(loggerConfig);
            return logger;
        });
    }

    /**
     * Creates a reporter that obtains its config event logger from the given factory.
     * The exporter registry and collector list start empty, and the active filter set
     * starts as the single default named filter.
     *
     * @param eventLoggerFactory factory used by {@code initEventLogger} to create the event logger
     */
    @VisibleForTesting
    public TelemetryReporter(EventLoggerFactory eventLoggerFactory) {
        this.eventLoggerFactory = eventLoggerFactory;
        this.activeFilters = Collections.singleton(ConfluentTelemetryConfig.DEFAULT_NAMED_FILTER_NAME);
        this.collectors = new CopyOnWriteArrayList<>();
        this.exporters = new ConcurrentHashMap<>();
    }

    /**
     * Remembers the raw configuration for later use by {@code contextChange()} and creates
     * the event emitter (if the configuration enables one).
     *
     * @param map configuration handed to this reporter
     */
    @SuppressWarnings("unchecked")
    public synchronized void configure(Map<String, ?> map) {
        // A Map<String, ?> cannot be assigned to a Map<String, Object> field without a cast.
        // The cast is unchecked but safe for reads: every value is an Object.
        this.rawOriginalConfig = (Map<String, Object>) map;
        this.eventEmitterOpt = createEventEmitter(map);
    }

    /**
     * Validates a proposed reconfiguration by building the resulting configuration and
     * discarding it; any exception raised while building propagates to the caller.
     *
     * @param map proposed configuration
     * @throws ConfigException declared for callers; raised by the configuration build
     */
    public void validateReconfiguration(Map<String, ?> map) throws ConfigException {
        this.createConfiguration(map, false);
    }

    /**
     * Applies a dynamic reconfiguration: rebuilds the effective config, refreshes the metric
     * filters, the event logger, the remote config source, the exporters and the event emitter,
     * and finally recomputes the emitter predicate.
     *
     * <p>A "dynamic" config event is logged/emitted only when a provider is active.
     *
     * @param map the new configuration values
     * @throws IllegalStateException if the reporter was not initialized through {@code contextChange()}
     */
    public synchronized void reconfigure(Map<String, ?> map) {
        Preconditions.checkState(this.config != null && this.emitter != null, "contextChange() was not called before reconfigure()");
        ConfluentTelemetryConfig previousConfig = this.config;
        this.config = createConfiguration(map, true);
        this.metricFilters = this.config.getDefaultFilterSet().union(this.metricFilters);
        reconfigureEventLogger(previousConfig, this.config);

        // Build the config event only after the null check: the original dereferenced
        // activeProvider while constructing the event and only then tested it for null.
        Provider provider = this.activeProvider;
        if (provider != null) {
            Event configEvent = Utils.configEvent(this.config.originals(), provider.configInclude(), this.ctx.getResource(), provider, provider.domain() + "/config/dynamic");
            EventLogger eventLogger = this.configEventLogger;
            if (eventLogger != null) {
                eventLogger.log(configEvent);
            }
            this.eventEmitterOpt.ifPresent(eventEmitterImpl -> eventEmitterImpl.emit(configEvent));
        }

        reconfigureRemoteConfigSource(this.config);
        reconfigureExporters(previousConfig, this.config);
        reconfigureEventEmitter(map);
        updateEmitterPredicate();
    }

    /**
     * Builds a config from the originals of {@code originalConfig}, overlaid with the
     * reconfigurable, non-null entries of the reconciled input map.
     *
     * @param map proposed configuration values
     * @param z   passed through unchanged as the second constructor argument of
     *            {@link ConfluentTelemetryConfig}
     * @return the resulting configuration
     */
    private ConfluentTelemetryConfig createConfiguration(Map<String, ?> map, boolean z) {
        Map<String, ?> reconciled = ConfluentTelemetryConfig.reconcileConfigs(map);
        Map<String, Object> merged = new HashMap<>(this.originalConfig.originals());
        Map<String, ?> dynamicOverrides = onlyReconfigurables(reconciled);
        merged.putAll(dynamicOverrides);
        return new ConfluentTelemetryConfig(merged, z);
    }

    /**
     * Creates one exporter per entry, applies its metrics predicate, attaches the
     * self-metrics registry and registers it under its name.
     *
     * @param map exporter configs keyed by exporter name
     * @throws IllegalStateException if a config is neither a Kafka nor an HTTP exporter config
     */
    private void initExporters(Map<String, ExporterConfig> map) {
        for (Map.Entry<String, ExporterConfig> namedConfig : map.entrySet()) {
            String exporterName = namedConfig.getKey();
            ExporterConfig exporterConfig = namedConfig.getValue();
            log.info("Creating {} exporter named '{}'", exporterConfig.getType().name(), exporterName);
            Exporter created = newExporter(exporterName, exporterConfig);
            configureExporterPredicate(exporterConfig, created);
            created.setMetricsRegistry(this.selfMetrics);
            this.exporters.put(exporterName, created);
        }
    }

    /** Instantiates the exporter implementation matching the concrete config type. */
    private static Exporter newExporter(String exporterName, ExporterConfig exporterConfig) {
        if (exporterConfig instanceof KafkaExporterConfig) {
            return KafkaExporter.newBuilder((KafkaExporterConfig) exporterConfig).setName(exporterName).build();
        }
        if (exporterConfig instanceof HttpExporterConfig) {
            return new HttpExporter(exporterName, (HttpExporterConfig) exporterConfig);
        }
        throw new IllegalStateException("Unexpected exporter config: " + exporterConfig.getClass());
    }

    /**
     * Installs the metrics predicate of an exporter: the predicate built from the exporter's
     * own config when present, otherwise the currently active subset of the metric filters.
     *
     * @param exporterConfig config the predicate is derived from
     * @param exporter       exporter to update
     */
    private void configureExporterPredicate(ExporterConfig exporterConfig, Exporter exporter) {
        // The fallback is computed eagerly (orElse), exactly as before.
        exporter.reconfigurePredicate(
            exporterConfig.buildMetricsPredicate()
                .orElse(this.metricFilters.subset(this.activeFilters)));
    }

    /**
     * Pushes updated settings to exporters that already exist: HTTP exporters get their
     * dynamic fields refreshed, and every exporter gets its metrics predicate recomputed.
     *
     * <p>Entries without a registered exporter are skipped with a warning. This method is also
     * reached from {@code onRemoteConfigurationReceived}, which is not synchronized and can run
     * while {@code reconfigure()} has replaced the config but not yet created new exporters.
     *
     * @param map exporter configs keyed by exporter name
     */
    private void updateExporters(Map<String, ExporterConfig> map) {
        for (Map.Entry<String, ExporterConfig> entry : map.entrySet()) {
            String exporterName = entry.getKey();
            ExporterConfig exporterConfig = entry.getValue();
            Exporter exporter = this.exporters.get(exporterName);
            if (exporter == null) {
                // Previously this fell through and failed with a NullPointerException.
                log.warn("No exporter named '{}' is registered; skipping its update", exporterName);
                continue;
            }
            if (exporter instanceof HttpExporter) {
                ((HttpExporter) exporter).setDynamicFields((HttpExporterConfig) exporterConfig);
            }
            configureExporterPredicate(exporterConfig, exporter);
        }
    }

    /**
     * Unregisters and closes the exporters named by the given entries. A failure for one
     * exporter is logged as a warning and does not prevent the others from being closed.
     *
     * @param map exporter configs keyed by exporter name
     */
    private void closeExporters(Map<String, ExporterConfig> map) {
        map.forEach((exporterName, exporterConfig) -> {
            log.info("Closing {} exporter named '{}'", exporterConfig.getType().name(), exporterName);
            try {
                Exporter removed = this.exporters.remove(exporterName);
                removed.close();
            } catch (Exception e) {
                log.warn("exception closing {} exporter named '{}'", exporterConfig.getType(), exporterName, e);
            }
        });
    }

    /**
     * Reconciles the exporter registry with a new config, in this order: exporters enabled only
     * in the old config are closed, exporters enabled in both are updated, and exporters enabled
     * only in the new config are created.
     *
     * @param confluentTelemetryConfig  previously effective config
     * @param confluentTelemetryConfig2 new config
     */
    private void reconfigureExporters(ConfluentTelemetryConfig confluentTelemetryConfig, ConfluentTelemetryConfig confluentTelemetryConfig2) {
        Set<String> previouslyEnabled = confluentTelemetryConfig.enabledExporters().keySet();
        Set<String> nowEnabled = confluentTelemetryConfig2.enabledExporters().keySet();

        Set<String> removed = Sets.difference(previouslyEnabled, nowEnabled);
        closeExporters(confluentTelemetryConfig2.allExportersWithNames(removed));

        Set<String> retained = Sets.intersection(previouslyEnabled, nowEnabled);
        updateExporters(confluentTelemetryConfig2.allExportersWithNames(retained));

        Set<String> added = Sets.difference(nowEnabled, previouslyEnabled);
        initExporters(confluentTelemetryConfig2.allExportersWithNames(added));
    }

    /**
     * Refreshes proxy settings and credentials of the remote configuration source from the
     * '_confluent' HTTP exporter config. Does nothing when remote configuration is disabled.
     *
     * <p>The source is only created by {@code initRemoteConfig()} when remote configuration is
     * enabled at initialization time. If none exists, the update is skipped with a warning
     * instead of failing with a {@link NullPointerException}.
     *
     * @param confluentTelemetryConfig the new effective config
     * @throws ConfigException if remote configuration is enabled but the '_confluent' HTTP
     *                         exporter config is missing
     */
    private void reconfigureRemoteConfigSource(ConfluentTelemetryConfig confluentTelemetryConfig) {
        if (!confluentTelemetryConfig.getRemoteConfigConfiguration().isEnabled()) {
            return;
        }
        HttpExporterConfig httpExporterConfig = confluentTelemetryConfig.allHttpExporters().get(ConfluentTelemetryConfig.EXPORTER_CONFLUENT_NAME);
        if (httpExporterConfig == null) {
            throw new ConfigException("Exporter '_confluent' config to use the HttpRemoteConfigurationSource.");
        }
        HttpRemoteConfigurationSource source = this.remoteConfigSource;
        if (source == null) {
            log.warn("Remote configuration is enabled but no remote configuration source was started; skipping its reconfiguration");
            return;
        }
        source.setProxyConfig(httpExporterConfig.getProxyConfig());
        source.setCredentials(httpExporterConfig.getCredentials());
    }

    /**
     * Lists every config name that may be changed dynamically: the top-level reconfigurable
     * names, plus the exporter-level reconfigurable names prefixed for each configured
     * exporter, plus the HTTP-specific ones prefixed for each HTTP exporter.
     *
     * @return a new mutable set of config names
     * @throws IllegalStateException if no config has been built yet
     */
    public Set<String> reconfigurableConfigs() {
        if (this.config == null) {
            throw new IllegalStateException("contextChange() was not called before reconfigurableConfigs()");
        }
        Set<String> names = new HashSet<>(ConfluentTelemetryConfig.RECONFIGURABLES);
        for (String exporterName : this.config.allExporters().keySet()) {
            for (String suffix : ExporterConfig.RECONFIGURABLES) {
                names.add(ConfluentTelemetryConfig.exporterPrefixForName(exporterName) + suffix);
            }
        }
        for (String httpExporterName : this.config.allHttpExporters().keySet()) {
            for (String suffix : HttpExporterConfig.RECONFIGURABLE_CONFIGS) {
                names.add(ConfluentTelemetryConfig.exporterPrefixForName(httpExporterName) + suffix);
            }
        }
        return names;
    }

    /**
     * Reacts to a new metrics context: resolves the provider for the context's
     * "_namespace" label, validates and configures it, performs the one-time
     * initialization of the reporter (config, context, remote config, self metrics,
     * collectors, exporters, emitter, collector task) and finally emits/logs a
     * "static" config event.
     *
     * <p>Returns early (after logging) when the "_namespace" label is missing, when no
     * provider is registered for it, or when the provider rejects the context.
     *
     * @param metricsContext the new metrics context
     * @throws IllegalStateException if {@code configure()} has not been called yet
     */
    public synchronized void contextChange(MetricsContext metricsContext) {
        Preconditions.checkState(this.rawOriginalConfig != null, "configure() was not called before contextChange()");
        log.debug("metricsContext {}", metricsContext.contextLabels());
        if (!metricsContext.contextLabels().containsKey("_namespace")) {
            log.error("_namespace not found in metrics context. Metrics collection is disabled");
            return;
        }
        // NOTE(review): collectors are stopped on every call, but the only start() calls visible
        // in this class are in startMetricCollectorTask(), which runs on first initialization
        // only — confirm that stop() is harmless on repeated context changes.
        this.collectors.forEach((v0) -> {
            v0.stop();
        });
        // May become null here; later code paths must tolerate a missing provider.
        this.activeProvider = ProviderRegistry.getProvider((String) metricsContext.contextLabels().get("_namespace"));
        if (this.activeProvider == null) {
            log.error("No provider was detected for context {}. Available providers {}.", metricsContext.contextLabels(), ProviderRegistry.providers.keySet());
            return;
        }
        log.debug("provider {} is selected.", this.activeProvider.getClass().getCanonicalName());
        if (!this.activeProvider.validate(metricsContext, this.rawOriginalConfig)) {
            log.warn("Validation failed for {} context {}", this.activeProvider.getClass(), metricsContext.contextLabels());
            return;
        }
        // collectorTask == null means the reporter has not been started yet:
        // the provider is configured only once, before its first contextChange().
        if (this.collectorTask == null) {
            this.activeProvider.configure(this.rawOriginalConfig);
        }
        this.activeProvider.contextChange(metricsContext);
        // One-time initialization; the order matters because later steps read state
        // (config, ctx, selfMetrics, collectors, exporters) created by earlier ones.
        if (this.collectorTask == null) {
            initConfig();
            initContext();
            initRemoteConfig();
            initSelfMetrics();
            initCollectors();
            initExporters(this.config.enabledExporters());
            createEmitter();
            updateEmitterPredicate();
            startMetricCollectorTask();
        }
        Event configEvent = Utils.configEvent(this.config.originals(), this.activeProvider.configInclude(), this.ctx.getResource(), this.activeProvider, this.activeProvider.domain() + "/config/static");
        // The event emitter gets the provider's resource labels before the event is emitted.
        this.eventEmitterOpt.ifPresent(eventEmitterImpl -> {
            eventEmitterImpl.setEventLabels(this.activeProvider.resource().getLabelsMap());
            eventEmitterImpl.emit(configEvent);
        });
        if (this.configEventLogger != null) {
            this.configEventLogger.log(configEvent);
        }
    }

    /**
     * Builds the initial configuration from the raw config plus provider-injected defaults,
     * initializes the event logger when config events are enabled, and seeds the effective
     * config and the metric filters from that initial configuration.
     */
    private void initConfig() {
        Map<String, Object> withProviderConfigs = injectProviderConfigs(this.activeProvider, this.rawOriginalConfig);
        ConfluentTelemetryConfig initialConfig = new ConfluentTelemetryConfig(withProviderConfigs);
        this.originalConfig = initialConfig;
        maybeInitEventLogger(initialConfig);
        this.config = initialConfig;
        this.metricFilters = initialConfig.getDefaultFilterSet();
    }

    /**
     * Creates the reporter {@link Context}: the provider's resource extended with the
     * configured labels, the provider's domain, and the configured debug flag.
     */
    private void initContext() {
        Resource.Builder resourceBuilder = this.activeProvider.resource().toBuilder();
        Map<String, String> configuredLabels = this.config.getLabels();
        // The original method-reference capture null-checked the builder at this point.
        resourceBuilder.getClass();
        for (Map.Entry<String, String> label : configuredLabels.entrySet()) {
            resourceBuilder.putLabels(label.getKey(), label.getValue());
        }
        boolean debugEnabled = this.config.getBoolean(ConfluentTelemetryConfig.DEBUG_ENABLED);
        this.ctx = new Context(resourceBuilder.build(), this.activeProvider.domain(), debugEnabled);
    }

    /**
     * Derives the event logger configuration from the '_confluent' exporter: its originals,
     * with the event exporter class set to {@link EventHttpExporter}.
     *
     * @param confluentTelemetryConfig config to read the '_confluent' exporter from
     * @return the map returned by the exporter config's {@code originals()}, with the exporter class added
     * @throws ConfigException if the '_confluent' exporter is not configured
     */
    private Map<String, Object> toEventLoggerConfig(ConfluentTelemetryConfig confluentTelemetryConfig) {
        ExporterConfig confluentExporter = confluentTelemetryConfig.allExporters().get(ConfluentTelemetryConfig.EXPORTER_CONFLUENT_NAME);
        if (confluentExporter != null) {
            Map<String, Object> loggerConfig = confluentExporter.originals();
            loggerConfig.put(EventLoggerConfig.EVENT_EXPORTER_CLASS_CONFIG, EventHttpExporter.class.getCanonicalName());
            return loggerConfig;
        }
        throw new ConfigException("Expected exporter '_confluent' to exist but it does not.");
    }

    /**
     * Initializes the event logger, but only when config events are enabled in the given config.
     *
     * @param confluentTelemetryConfig config carrying the enable flag and the logger settings
     */
    private void maybeInitEventLogger(ConfluentTelemetryConfig confluentTelemetryConfig) {
        boolean configEventsEnabled = confluentTelemetryConfig.getBoolean(ConfluentTelemetryConfig.CONFIG_EVENTS_ENABLE_CONFIG);
        if (!configEventsEnabled) {
            return;
        }
        initEventLogger(confluentTelemetryConfig);
    }

    /**
     * Brings the event logger in line with a config change:
     * disabled now → close it; enabled before and now → reconfigure it;
     * newly enabled → initialize it.
     *
     * <p>The logger is (re)configured from the new config passed in, not from the
     * {@code config} field, so the method no longer depends on the caller having updated
     * that field first.
     *
     * @param confluentTelemetryConfig  previously effective config
     * @param confluentTelemetryConfig2 new config
     */
    private void reconfigureEventLogger(ConfluentTelemetryConfig confluentTelemetryConfig, ConfluentTelemetryConfig confluentTelemetryConfig2) {
        boolean wasEnabled = confluentTelemetryConfig.getBoolean(ConfluentTelemetryConfig.CONFIG_EVENTS_ENABLE_CONFIG).booleanValue();
        boolean isEnabled = confluentTelemetryConfig2.getBoolean(ConfluentTelemetryConfig.CONFIG_EVENTS_ENABLE_CONFIG).booleanValue();
        if (!isEnabled) {
            closeEventLogger();
        } else if (wasEnabled) {
            reconfigureEventLogger(confluentTelemetryConfig2);
        } else {
            initEventLogger(confluentTelemetryConfig2);
        }
    }

    /**
     * Creates the config event logger through the factory unless one already exists,
     * in which case only a warning is logged.
     *
     * @param confluentTelemetryConfig config the logger settings are derived from
     */
    private void initEventLogger(ConfluentTelemetryConfig confluentTelemetryConfig) {
        if (this.configEventLogger == null) {
            log.info("Initializing the event logger");
            Map<String, Object> loggerConfig = toEventLoggerConfig(confluentTelemetryConfig);
            this.configEventLogger = this.eventLoggerFactory.create(loggerConfig);
            return;
        }
        log.warn("Trying to initialize the event logger but it's already initialized! Will not initialize another one...");
    }

    /**
     * Reconfigures the existing event logger from the given config; logs a warning when
     * no logger has been initialized.
     *
     * @param confluentTelemetryConfig config the logger settings are derived from
     */
    private void reconfigureEventLogger(ConfluentTelemetryConfig confluentTelemetryConfig) {
        if (this.configEventLogger != null) {
            this.configEventLogger.reconfigure(toEventLoggerConfig(confluentTelemetryConfig));
            return;
        }
        log.warn("Trying to reconfigure the event logger but it's not initialized!");
    }

    /**
     * Closes the config event logger if one exists.
     *
     * <p>The field is cleared even when {@code close()} throws. Otherwise a failed close would
     * leave the stale logger in place and {@code initEventLogger} would refuse to create a new one.
     */
    private void closeEventLogger() {
        EventLogger eventLogger = this.configEventLogger;
        if (eventLogger == null) {
            return;
        }
        log.info("Closing the event logger");
        try {
            eventLogger.close();
        } catch (Exception e) {
            log.warn("Exception closing event logger", e);
        } finally {
            this.configEventLogger = null;
        }
    }

    /**
     * Creates the collector task with the configured collection interval, starts every
     * collector and then starts the task.
     */
    private void startMetricCollectorTask() {
        long collectIntervalMs = this.config.getLong(ConfluentTelemetryConfig.COLLECT_INTERVAL_CONFIG);
        log.info("Starting Confluent telemetry reporter with an interval of {} ms for resource = (type = {})", collectIntervalMs, this.ctx.getResource().getType());
        log.debug("Telemetry reporter resource labels: {}", this.ctx.getResource().getLabelsMap());
        this.collectorTask = new MetricsCollectorTask(this.collectors, collectIntervalMs, this.emitter);
        for (MetricsCollector collector : this.collectors) {
            collector.start();
        }
        this.collectorTask.start();
    }

    /**
     * Sets up the registry for the reporter's own metrics. The registry reports to JMX and to
     * a dedicated {@link KafkaMetricsCollector}, which is also added to the collector list.
     */
    private void initSelfMetrics() {
        final KafkaMetricsCollector selfCollector = new KafkaMetricsCollector(ConfluentMetricNamingConvention.forKafkaMetrics(SELF_METRICS_DOMAIN, this.ctx.isDebugEnabled(), this.ctx.isDebugEnabled()));
        this.collectors.add(selfCollector);

        MetricsReporter jmxReporter = new JmxReporter();
        // Forwards registry callbacks to the self-metrics collector; close/configure are no-ops.
        MetricsReporter collectorBridge = new MetricsReporter() {
            public void init(List<KafkaMetric> metrics) {
                selfCollector.init(metrics);
            }

            public void metricChange(KafkaMetric metric) {
                selfCollector.metricChange(metric);
            }

            public void metricRemoval(KafkaMetric metric) {
                selfCollector.metricRemoval(metric);
            }

            public void close() {
            }

            public void configure(Map<String, ?> configs) {
            }
        };
        this.selfMetrics = new Metrics(new MetricConfig(), ImmutableList.of(jmxReporter, collectorBridge), Time.SYSTEM, new KafkaMetricsContext(SELF_METRICS_NAMESPACE));
    }

    /**
     * Creates the main Kafka metrics collector for the context's domain and registers it,
     * followed by any extra collectors contributed by the active provider.
     */
    private void initCollectors() {
        KafkaMetricsCollector mainCollector = new KafkaMetricsCollector(ConfluentMetricNamingConvention.forKafkaMetrics(this.ctx.getDomain(), this.ctx.isDebugEnabled(), this.ctx.isDebugEnabled()));
        this.kafkaMetricsCollector = mainCollector;
        this.collectors.add(mainCollector);
        this.collectors.addAll(this.activeProvider.extraCollectors(this.ctx));
    }

    /**
     * Returns the live exporter registry (not a copy), keyed by exporter name.
     */
    @VisibleForTesting
    Map<String, Exporter> getExporters() {
        return exporters;
    }

    /**
     * Returns the live list of registered collectors (not a copy).
     */
    @VisibleForTesting
    public List<MetricsCollector> getCollectors() {
        return collectors;
    }

    /**
     * Shuts the reporter down: stops the collector task and the collectors, closes the event
     * logger, every exporter and the event emitter, stops the remote configuration source and
     * finally closes the self-metrics registry.
     *
     * <p>The self-metrics registry is created with a {@link JmxReporter} and was previously
     * never closed. It is closed last because exporters and the emitter are handed this
     * registry and may still use it while they shut down.
     */
    public void close() {
        log.info("Stopping TelemetryReporter collectorTask");
        if (this.collectorTask != null) {
            this.collectorTask.close();
        }
        this.collectors.forEach(MetricsCollector::stop);
        closeEventLogger();
        for (Exporter exporter : this.exporters.values()) {
            try {
                exporter.close();
            } catch (Exception e) {
                log.error("Error while closing {}", exporter, e);
            }
        }
        closeEventEmitter();
        if (this.remoteConfigSource != null) {
            log.info("Stopping TelemetryReporter remoteConfigTask");
            this.remoteConfigSource.stop();
        }
        if (this.selfMetrics != null) {
            try {
                this.selfMetrics.close();
            } catch (Exception e) {
                log.error("Error while closing {}", this.selfMetrics, e);
            }
        }
    }

    /**
     * {@link ClusterResourceListener} callback; intentionally a no-op in this reporter.
     *
     * @param clusterResource the updated cluster resource (ignored)
     */
    public synchronized void onUpdate(ClusterResource clusterResource) {
    }

    /**
     * Forwards the initial metric list to the Kafka metrics collector, if it exists yet.
     *
     * @param list metrics that already exist when the reporter is registered
     */
    public void init(List<KafkaMetric> list) {
        KafkaMetricsCollector collector = this.kafkaMetricsCollector;
        if (collector == null) {
            return;
        }
        collector.init(list);
    }

    /**
     * Creates and starts the HTTP remote configuration source, once, when remote configuration
     * is enabled. Received configurations are delivered to {@link #onRemoteConfigurationReceived}.
     *
     * @throws ConfigException if remote configuration is enabled but the '_confluent' HTTP
     *                         exporter config is missing
     */
    private void initRemoteConfig() {
        if (this.remoteConfigSource != null) {
            return;
        }
        RemoteConfigConfiguration remoteSettings = this.config.getRemoteConfigConfiguration();
        if (!remoteSettings.isEnabled()) {
            return;
        }
        HttpExporterConfig confluentExporter = this.config.allHttpExporters().get(ConfluentTelemetryConfig.EXPORTER_CONFLUENT_NAME);
        if (confluentExporter == null) {
            throw new ConfigException("Exporter '_confluent' config to use the HttpRemoteConfigurationSource.");
        }
        HttpRemoteConfigurationSource source = createHttpRemoteConfigurationSource(this.ctx.getResource(), remoteSettings, confluentExporter);
        this.remoteConfigSource = source;
        source.setConfigurationChangeCallback(this::onRemoteConfigurationReceived);
        source.start();
    }

    /**
     * Builds an (unstarted) HTTP remote configuration source. The HTTP client builder takes its
     * defaults from the exporter config, which also supplies proxy settings and credentials.
     *
     * @param resource                  resource passed to the source
     * @param remoteConfigConfiguration supplies the refresh interval
     * @param httpExporterConfig        supplies client defaults, proxy config and credentials
     * @return the configured source
     */
    private static HttpRemoteConfigurationSource createHttpRemoteConfigurationSource(Resource resource, RemoteConfigConfiguration remoteConfigConfiguration, HttpExporterConfig httpExporterConfig) {
        TelemetryHttpClient.Builder clientBuilder = new TelemetryHttpClient.Builder();
        httpExporterConfig.configureClientDefaults(clientBuilder);
        long refreshInterval = remoteConfigConfiguration.getRefreshInterval();
        HttpRemoteConfigurationSource source = new HttpRemoteConfigurationSource(clientBuilder, refreshInterval, resource);
        source.setProxyConfig(httpExporterConfig.getProxyConfig());
        source.setCredentials(httpExporterConfig.getCredentials());
        return source;
    }

    /**
     * Callback for remote configuration updates. When the update carries filters, they are
     * unioned with the config's default filter set, the active filter names are replaced, and
     * the exporter and emitter predicates are recomputed. Updates without filters are ignored.
     *
     * @param remoteConfiguration the received remote configuration
     */
    public void onRemoteConfigurationReceived(RemoteConfiguration remoteConfiguration) {
        NamedFilterSet remoteFilters = remoteConfiguration.getFilters();
        if (remoteFilters == null) {
            return;
        }
        this.metricFilters = this.config.getDefaultFilterSet().union(remoteFilters);
        this.activeFilters = remoteConfiguration.getActiveFilters();
        updateExporters(this.config.enabledExporters());
        updateEmitterPredicate();
    }

    /**
     * Forwards an added or changed metric to the Kafka metrics collector, if it exists yet.
     *
     * @param kafkaMetric the metric that was added or changed
     */
    public void metricChange(KafkaMetric kafkaMetric) {
        KafkaMetricsCollector collector = this.kafkaMetricsCollector;
        if (collector == null) {
            return;
        }
        collector.metricChange(kafkaMetric);
    }

    /**
     * Forwards a metric removal to the Kafka metrics collector, if it exists yet.
     *
     * @param kafkaMetric the metric that was removed
     */
    public void metricRemoval(KafkaMetric kafkaMetric) {
        KafkaMetricsCollector collector = this.kafkaMetricsCollector;
        if (collector == null) {
            return;
        }
        collector.metricRemoval(kafkaMetric);
    }

    /**
     * Restricts a config map to the reconfigurable names that have a non-null value.
     *
     * @param map config values to filter
     * @return a new map holding only reconfigurable entries with non-null values
     */
    private Map<String, ?> onlyReconfigurables(Map<String, ?> map) {
        Map<String, Object> reconfigurable = new HashMap<>();
        for (String name : reconfigurableConfigs()) {
            Object value = map.get(name);
            if (value != null) {
                reconfigurable.put(name, value);
            }
        }
        return reconfigurable;
    }

    /**
     * Recomputes the emitter predicate as the logical OR of all exporter predicates.
     * With no exporters registered, the predicate rejects everything.
     */
    private void updateEmitterPredicate() {
        this.emitter.reconfigurePredicate((Predicate) getExporters().values().stream()
            .map(exporter -> exporter.getPredicate())
            .reduce((left, right) -> left.or(right))
            .orElse(keyed -> false));
    }

    /**
     * Returns a copy of the given configs with every key prefixed by the exporter prefix for
     * {@code str}; entries whose value is null are dropped.
     *
     * @param str exporter name the prefix is derived from
     * @param map un-prefixed exporter configs
     * @return a new map with prefixed keys
     */
    private static Map<String, Object> prefixedExporterConfigs(String str, Map<String, Object> map) {
        String prefix = ConfluentTelemetryConfig.exporterPrefixForName(str);
        Map<String, Object> prefixed = new HashMap<>();
        for (Map.Entry<String, Object> config : map.entrySet()) {
            if (config.getValue() != null) {
                prefixed.put(prefix + config.getKey(), config.getValue());
            }
        }
        return prefixed;
    }

    /**
     * Augments the raw config with provider-derived defaults: first the provider's default
     * metrics include list (when none is configured), then the local exporter settings
     * (when running inside a broker).
     *
     * @param provider the active provider
     * @param map      the raw configuration
     * @return the configuration with provider defaults applied
     */
    protected static Map<String, Object> injectProviderConfigs(Provider provider, Map<String, Object> map) {
        Map<String, Object> withIncludeDefaults = maybeInjectProviderDefaultIncludeConfig(provider, map);
        return maybeInjectLocalExporter(provider, withIncludeDefaults);
    }

    /**
     * Returns the config unchanged when a metrics include list (or its alias) is already
     * configured; otherwise returns a copy with the provider's default include list applied.
     *
     * @param provider supplies the default include regexes
     * @param map      the configuration to inspect
     * @return {@code map} itself, or an updated copy
     */
    private static Map<String, Object> maybeInjectProviderDefaultIncludeConfig(Provider provider, Map<String, Object> map) {
        boolean includeConfigured = map.containsKey(ConfluentTelemetryConfig.METRICS_INCLUDE_CONFIG)
            || map.containsKey(ConfluentTelemetryConfig.METRICS_INCLUDE_CONFIG_ALIAS);
        if (includeConfigured) {
            return map;
        }
        return updateMetricsInclude(map, provider.metricsIncludeRegexDefault());
    }

    /**
     * Returns a copy of the config whose metrics include setting is the reporter's own
     * self-metrics pattern followed by the given regexes.
     *
     * @param map  the configuration to copy
     * @param list include regexes to append after the self-metrics pattern
     * @return a new map with the metrics include setting replaced
     */
    private static Map<String, Object> updateMetricsInclude(Map<String, Object> map, List<String> list) {
        // The element is added as a String. The previous code cast the literal to
        // ImmutableList.Builder, which is not a valid conversion.
        ImmutableList<String> includeRegexes = ImmutableList.<String>builder()
            .add("io.confluent.telemetry/.*")
            .addAll(list)
            .build();
        Map<String, Object> updated = new HashMap<>(map);
        updated.put(ConfluentTelemetryConfig.METRICS_INCLUDE_CONFIG, ConfluentTelemetryConfig.joinIncludeRegexList(includeRegexes));
        return updated;
    }

    /**
     * Tells whether the reporter runs inside a broker: the provider must be a
     * {@link KafkaServerProvider} and the configured process roles must not be
     * controller-only (roles containing "controller" but not "broker").
     *
     * @param provider the active provider
     * @param map      configuration holding the process roles
     * @return {@code true} when running inside a broker
     */
    private static boolean isRunningInsideBroker(Provider provider, Map<String, Object> map) {
        if (provider instanceof KafkaServerProvider) {
            String processRoles = map.getOrDefault(KafkaConfig.ProcessRolesProp(), "").toString();
            return !processRoles.contains("controller") || processRoles.contains("broker");
        }
        return false;
    }

    /**
     * Returns a copy of the config that, when running inside a broker, is pre-populated with
     * the local exporter's defaults, the derived local producer configs and, if available,
     * the balance replication factor as "topic.replicas". Entries of {@code map} are applied
     * last and therefore override the injected values.
     *
     * @param provider the active provider
     * @param map      the configuration to extend
     * @return a new map with the local exporter settings injected where applicable
     */
    private static Map<String, Object> maybeInjectLocalExporter(Provider provider, Map<String, Object> map) {
        Map<String, Object> merged = new HashMap<>();
        if (isRunningInsideBroker(provider, map)) {
            String localExporter = ConfluentTelemetryConfig.EXPORTER_LOCAL_NAME;
            merged.putAll(prefixedExporterConfigs(localExporter, ConfluentTelemetryConfig.EXPORTER_LOCAL_DEFAULTS));
            merged.putAll(prefixedExporterConfigs(localExporter, BrokerConfigUtils.deriveLocalProducerConfigs(map)));
            String replicationFactor = BrokerConfigUtils.getBalanceReplicationFactor(map);
            if (replicationFactor != null) {
                merged.putAll(prefixedExporterConfigs(localExporter, ImmutableMap.of("topic.replicas", replicationFactor)));
            }
        }
        // User-supplied values win over everything injected above.
        merged.putAll(map);
        return merged;
    }

    /**
     * Returns the current reporter context, or {@code null} before it has been initialized.
     */
    @VisibleForTesting
    Context getContext() {
        return ctx;
    }

    /**
     * Returns the telemetry emitter.
     *
     * @return the emitter created during initialization
     * @throws IllegalStateException if the emitter has not been created yet
     */
    public Emitter emitter() {
        TelemetryEmitter current = this.emitter;
        if (current != null) {
            return current;
        }
        throw new IllegalStateException("emitter() was called before the Emitter was instantiated.");
    }

    /**
     * Creates the telemetry emitter for the current context. The emitter is given a supplier
     * over the live exporter registry rather than a snapshot, plus the self-metrics registry.
     */
    private void createEmitter() {
        this.emitter = new TelemetryEmitter(this.ctx, this.exporters::values, this.selfMetrics);
    }

    /**
     * Returns the event emitter, or the no-op emitter when none is configured.
     *
     * <p>The volatile field is read once, so a concurrent {@code reconfigureEventEmitter} cannot
     * replace it between the presence check and the read. A field that was never assigned
     * ({@code configure()} not called yet) yields the no-op emitter instead of a
     * {@link NullPointerException}.
     *
     * @return the active event emitter, never {@code null}
     */
    @Override // io.confluent.telemetry.api.events.EventEmitterProvider
    public EventEmitter eventEmitter() {
        Optional<EventEmitterImpl> current = this.eventEmitterOpt;
        if (current != null && current.isPresent()) {
            return current.get();
        }
        return NoOpEventEmitter.INSTANCE;
    }

    /**
     * Closes the event emitter if one exists; failures are logged, not propagated.
     *
     * <p>Tolerates a reporter on which {@code configure()} was never called: the field is
     * still unassigned then, and {@code close()} must not fail with a
     * {@link NullPointerException}.
     */
    private void closeEventEmitter() {
        Optional<EventEmitterImpl> current = this.eventEmitterOpt;
        if (current == null) {
            return;
        }
        current.ifPresent(eventEmitterImpl -> {
            try {
                eventEmitterImpl.close();
            } catch (Exception e) {
                log.error("Error while closing {}", eventEmitterImpl, e);
            }
        });
    }

    /**
     * Creates an event emitter from the given configuration when {@code maybeInitEventEmitter}
     * reports that one is needed.
     *
     * @param map configuration for the emitter
     * @return the new emitter, or an empty {@link Optional} when none is needed
     */
    private Optional<EventEmitterImpl> createEventEmitter(Map<String, ?> map) {
        if (!maybeInitEventEmitter(map)) {
            return Optional.empty();
        }
        return Optional.of(new EventEmitterImpl(map));
    }

    /**
     * Replaces the event emitter: closes the current one and creates a new one from the given
     * configuration.
     *
     * <p>Event labels are taken from the active provider's resource. They are applied before
     * the new emitter is published, and skipped when no provider is active, since
     * {@code reconfigure()} treats the provider as nullable.
     *
     * @param map configuration for the new emitter
     */
    private void reconfigureEventEmitter(Map<String, ?> map) {
        closeEventEmitter();
        Optional<EventEmitterImpl> replacement = createEventEmitter(map);
        Provider provider = this.activeProvider;
        if (provider != null) {
            replacement.ifPresent(eventEmitterImpl -> eventEmitterImpl.setEventLabels(provider.resource().getLabelsMap()));
        }
        this.eventEmitterOpt = replacement;
    }

    /**
     * Decides whether an event emitter is needed: true when at least one enabled exporter
     * config for the {@code events} type is a Kafka exporter.
     *
     * @param map configuration to inspect
     * @return {@code true} if an event emitter should be created
     */
    private boolean maybeInitEventEmitter(Map<String, ?> map) {
        EventEmitterConfig emitterConfig = new EventEmitterConfig(map);
        Map<String, Map<String, Object>> enabledExporters = emitterConfig.getEnabledExporterConfigs(EventEmitterConfig.EventType.events);
        if (enabledExporters == null || enabledExporters.isEmpty()) {
            return false;
        }
        for (Map.Entry<String, Map<String, Object>> namedExporter : enabledExporters.entrySet()) {
            if (emitterConfig.isKafkaExporter(namedExporter.getValue())) {
                return true;
            }
        }
        return false;
    }
}
