package kafka.catalog.event;

import com.google.protobuf.Timestamp;
import com.google.protobuf.util.Timestamps;
import io.confluent.protobuf.events.catalog.v1.MetadataEvent;
import io.confluent.protobuf.events.catalog.v1.TopicMetadata;
import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import kafka.catalog.CatalogTopicConfig;
import kafka.catalog.MetadataEventUtils;
import kafka.catalog.ZKMetadataCollector;
import kafka.catalog.ZKMetadataCollectorContext;
import kafka.restore.configmap.ConfigmapUtil;
import kafka.server.KafkaConfig;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.storage.internals.log.LogConfig;

/* loaded from: input_file:kafka/catalog/event/BrokerDefaultConfigChangeEvent.class */
/**
 * Metadata collector event that carries a previous and a new broker {@link KafkaConfig}.
 *
 * <p>When run, it compares the two configs on the properties listed in
 * {@link CatalogTopicConfig#BROKER_DEFAULT_CONFIGS_TO_PROPAGATE}. If any of them differ, it walks
 * every topic of every logical cluster known to the local store and, for each topic, rewrites the
 * retention-ms, retention-bytes and cleanup-policy fields of the stored topic metadata whose
 * broker-level value changed and that the topic does not override itself.
 */
public class BrokerDefaultConfigChangeEvent extends MetadataCollectorEvent {
    /** Broker property names whose values are compared between the old and the new config. */
    private static final String LOG_RETENTION_MS_PROP = KafkaConfig.LogRetentionTimeMillisProp();
    private static final String LOG_RETENTION_BYTES_PROP = KafkaConfig.LogRetentionBytesProp();
    private static final String CLEANUP_POLICY_PROP = KafkaConfig.LogCleanupPolicyProp();

    /** Broker config before the change. */
    private final KafkaConfig oldConfig;
    /** Broker config after the change. */
    private final KafkaConfig newConfig;

    /**
     * Creates the event.
     *
     * @param zKMetadataCollector collector handed to the superclass constructor
     * @param kafkaConfig broker config before the change
     * @param kafkaConfig2 broker config after the change
     * @param time clock handed to the superclass constructor
     */
    public BrokerDefaultConfigChangeEvent(ZKMetadataCollector zKMetadataCollector, KafkaConfig kafkaConfig, KafkaConfig kafkaConfig2, Time time) {
        super(zKMetadataCollector, time);
        this.newConfig = kafkaConfig2;
        this.oldConfig = kafkaConfig;
    }

    /**
     * Propagates the changed broker defaults into the metadata of every known topic.
     *
     * <p>A failure while handling a single topic is logged and recorded on the
     * {@code collectorEventHandleErrorSensor}; the remaining topics are still processed.
     *
     * @throws Exception if anything outside the per-topic handling fails
     */
    public void run() throws Exception {
        final ZKMetadataCollectorContext collectorContext = context();
        if (hasCatalogInterestedChange(this.oldConfig, this.newConfig)) {
            // Every topic touched by this event gets the same update time.
            final Timestamp observedAt = Timestamps.fromMillis(this.eventObservedTimeMillis);
            for (String logicalCluster : collectorContext.localStore().logicalClusters()) {
                for (String topic : collectorContext.localStore().topics(logicalCluster)) {
                    try {
                        // Start from the stored metadata so untouched fields are preserved.
                        final TopicMetadata.Builder refreshed = TopicMetadata.newBuilder()
                                .mergeFrom(collectorContext.localStore().topicMetadataEvent(topic).getTopicMetadata())
                                .setUpdateTime(observedAt);
                        final boolean anyFieldUpdated =
                                propagateBrokerConfigChange(collectorContext, this.oldConfig, this.newConfig, topic, refreshed);
                        if (anyFieldUpdated) {
                            LOG.info("In BrokerDefaultConfigChangeEvent, propagating new KafkaConfig: {} for topic: '{}'", this.newConfig, topic);
                            final MetadataEvent updatedEvent = MetadataEvent.newBuilder().setTopicMetadata(refreshed.build()).build();
                            collectorContext.localStore().addTopicMetadataEvent(logicalCluster, topic, updatedEvent);
                        }
                    } catch (Exception failure) {
                        // Best effort: one bad topic must not stop propagation to the others.
                        LOG.error("Skip propagating new KafkaConfig: {} to topic '{}' due to", new Object[]{this.newConfig, topic, failure});
                        collectorContext.catalogMetrics().collectorEventHandleErrorSensor.record();
                    }
                }
            }
        } else {
            LOG.debug("No catalog interested config to propagate.");
        }
    }

    /**
     * Applies changed broker-level retention and cleanup-policy values to a topic metadata builder.
     *
     * <p>A field is written only when its broker-level value differs between the two configs and
     * the topic's own config overrides do not contain the corresponding topic-level key. When the
     * new broker value is {@code null}, the value is taken from a {@link LogConfig} built from the
     * context's original config instead.
     *
     * @param zKMetadataCollectorContext context giving access to the local store and the original config
     * @param kafkaConfig broker config before the change
     * @param kafkaConfig2 broker config after the change
     * @param str name of the topic whose overrides are consulted
     * @param builder topic metadata builder that receives the new values
     * @return {@code true} if at least one field was set on {@code builder}
     */
    public static boolean propagateBrokerConfigChange(ZKMetadataCollectorContext zKMetadataCollectorContext, KafkaConfig kafkaConfig, KafkaConfig kafkaConfig2, String str, TopicMetadata.Builder builder) {
        final Set<String> topicOverrides = zKMetadataCollectorContext.localStore().topicConfigOverrides(str);
        final LogConfig fallbackLogConfig = new LogConfig(zKMetadataCollectorContext.originalConfig().extractLogConfigMap(), Collections.emptySet());
        boolean anyFieldUpdated = false;

        // retention.ms
        final Object newRetentionMs = kafkaConfig2.get(LOG_RETENTION_MS_PROP);
        if (!(Objects.equals(kafkaConfig.get(LOG_RETENTION_MS_PROP), newRetentionMs) || topicOverrides.contains(ConfigmapUtil.RETENTION_MS))) {
            final long retentionMs;
            if (newRetentionMs == null) {
                retentionMs = fallbackLogConfig.retentionMs;
            } else {
                retentionMs = ((Long) newRetentionMs).longValue();
            }
            builder.setRetentionMs(retentionMs);
            anyFieldUpdated = true;
        }

        // retention.bytes
        final Object newRetentionBytes = kafkaConfig2.get(LOG_RETENTION_BYTES_PROP);
        if (!(Objects.equals(kafkaConfig.get(LOG_RETENTION_BYTES_PROP), newRetentionBytes) || topicOverrides.contains("retention.bytes"))) {
            final long retentionBytes;
            if (newRetentionBytes == null) {
                retentionBytes = fallbackLogConfig.retentionSize;
            } else {
                retentionBytes = ((Long) newRetentionBytes).longValue();
            }
            builder.setRetentionBytes(retentionBytes);
            anyFieldUpdated = true;
        }

        // cleanup.policy
        final Object newCleanupPolicy = kafkaConfig2.get(CLEANUP_POLICY_PROP);
        if (!(Objects.equals(kafkaConfig.get(CLEANUP_POLICY_PROP), newCleanupPolicy) || topicOverrides.contains(ConfigmapUtil.CLEANUP_POLICY))) {
            final LogConfig policySource;
            if (newCleanupPolicy == null) {
                policySource = fallbackLogConfig;
            } else {
                policySource = new LogConfig(kafkaConfig2.extractLogConfigMap());
            }
            builder.setCleanupPolicy(MetadataEventUtils.cleanupPolicyFromLogConfig(policySource));
            anyFieldUpdated = true;
        }

        return anyFieldUpdated;
    }

    /**
     * Tells whether the two configs differ on any property listed in
     * {@link CatalogTopicConfig#BROKER_DEFAULT_CONFIGS_TO_PROPAGATE}.
     *
     * @param kafkaConfig broker config before the change
     * @param kafkaConfig2 broker config after the change
     * @return {@code true} as soon as one listed property has unequal values, {@code false} otherwise
     */
    public static boolean hasCatalogInterestedChange(KafkaConfig kafkaConfig, KafkaConfig kafkaConfig2) {
        for (String property : CatalogTopicConfig.BROKER_DEFAULT_CONFIGS_TO_PROPAGATE) {
            final Object before = kafkaConfig.get(property);
            final Object after = kafkaConfig2.get(property);
            if (Objects.equals(before, after)) {
                continue;
            }
            return true;
        }
        return false;
    }

    /** Renders the event with both the old and the new config. */
    @Override
    public String toString() {
        return new StringBuilder("BrokerDefaultConfigChangeEvent(oldConfig=")
                .append(this.oldConfig)
                .append(", newConfig=")
                .append(this.newConfig)
                .append(')')
                .toString();
    }
}
