package kafka.server.link;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import kafka.server.link.ClusterLinkScheduler;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConfigsResult;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.PolicyViolationException;
import scala.$less$colon$less$;
import scala.Function0;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Some;
import scala.Tuple2;
import scala.collection.IterableOnceOps;
import scala.collection.Set;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Map;
import scala.collection.mutable.Map$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: ClusterLinkSyncTopicsConfigs.scala */
@ScalaSignature(bytes = "\u0006\u0005\u00055b\u0001B\t\u0013\u0001eA\u0001B\t\u0001\u0003\u0002\u0003\u0006Ia\t\u0005\tM\u0001\u0011\t\u0011)A\u0005O!A!\u0006\u0001BC\u0002\u0013\u00051\u0006\u0003\u00050\u0001\t\u0005\t\u0015!\u0003-\u0011%\u0001\u0004A!A!\u0002\u0013\tt\u0007\u0003\u0005:\u0001\t\u0005\t\u0015!\u0003;\u0011\u0015i\u0004\u0001\"\u0001?\u0011\u001d)\u0005A1A\u0005\n\u0019CaA\u001b\u0001!\u0002\u00139\u0005bB6\u0001\u0001\u0004%I\u0001\u001c\u0005\b[\u0002\u0001\r\u0011\"\u0003o\u0011\u0019!\b\u0001)Q\u0005c!)Q\u000f\u0001C)m\")!\u0010\u0001C\u0005w\"9\u00111\u0004\u0001\u0005\n\u0005u\u0001BBA\u0016\u0001\u0011\u0005aI\u0001\u000fDYV\u001cH/\u001a:MS:\\7+\u001f8d)>\u0004\u0018nY:D_:4\u0017nZ:\u000b\u0005M!\u0012\u0001\u00027j].T!!\u0006\f\u0002\rM,'O^3s\u0015\u00059\u0012!B6bM.\f7\u0001A\n\u0003\u0001i\u0001\"aG\u0010\u000f\u0005qiR\"\u0001\n\n\u0005y\u0011\u0012\u0001F\"mkN$XM\u001d'j].\u001c6\r[3ek2,'/\u0003\u0002!C\ta\u0001+\u001a:j_\u0012L7\rV1tW*\u0011aDE\u0001\u000eG2LWM\u001c;NC:\fw-\u001a:\u0011\u0005q!\u0013BA\u0013\u0013\u0005a\u0019E.^:uKJd\u0015N\\6DY&,g\u000e^'b]\u0006<WM]\u0001\u0010[\u0016$\u0018\rZ1uC6\u000bg.Y4feB\u0011A\u0004K\u0005\u0003SI\u0011!d\u00117vgR,'\u000fT5oW6+G/\u00193bi\u0006l\u0015M\\1hKJ\f\u0011\u0002Z3ti\u0006#W.\u001b8\u0016\u00031\u0002\"\u0001H\u0017\n\u00059\u0012\"a\u0007'pG\u0006d7\t\\;ti\u0016\u0014H*\u001b8l\u0003\u0012l\u0017N\\\"mS\u0016tG/\u0001\u0006eKN$\u0018\tZ7j]\u0002\nab]=oG&sG/\u001a:wC2l5\u000f\u0005\u00023k5\t1GC\u00015\u0003\u0015\u00198-\u00197b\u0013\t14GA\u0002J]RL!\u0001O\u0010\u0002#I,7o\u00195fIVdW\rR3mCfl5/A\u0004nKR\u0014\u0018nY:\u0011\u0005qY\u0014B\u0001\u001f\u0013\u0005I\u0019E.^:uKJd\u0015N\\6NKR\u0014\u0018nY:\u0002\rqJg.\u001b;?)\u0019y\u0004)\u0011\"D\tB\u0011A\u0004\u0001\u0005\u0006E\u001d\u0001\ra\t\u0005\u0006M\u001d\u0001\ra\n\u0005\u0006U\u001d\u0001\r\u0001\f\u0005\u0006a\u001d\u0001\r!\r\u0005\u0006s\u001d\u0001\rAO\u0001\bG>tg-[4t+\u00059\u0005\u0003\u0002%N\u001fjk\u0011!\u0013\u0006\u0003\u0015.\u000bq!\\;uC\ndWM\u0003\u0002Mg\u0005Q1m\u001c7mK\u000e$\u0018n\u001c8\n\u00059K%aA'baB\u0011\u0001k\u0016\b\u0003#V\u0003\"AU\u001a\u000e\u0003MS!\u0001\u0016\r\u0002\rq\u0012xn\u001c;?\u0013\t16'\u0001\u0004Qe\u0016$WMZ\u0005\u00031f\u0013aa\u0015;sS:<'B\u0001,4!\r\u00114,X\u0005\u00039N\u0012aa\u00149uS>t\u0007C\u00010i\u001b\u0005y&B\u00011b\u0003\u0015\tG-\\5o\u0015\t\u00117-A\u0004dY&,g\u000e^:\u000b\u0005]!'BA3g\u0003\u0019\t\u0007/Y2iK*\tq-A\u0002pe\u001eL!![0\u0003\r\r{gNZ5h\u0003!\u0019wN\u001c4jON\u0004\u0013\u0001\u0005;bg.\u001cx*\u001e;ti\u0006tG-\u001b8h+\u0005\t\u0014\u0001\u0006;bg.\u001cx*\u001e;ti\u0006tG-\u001b8h?\u0012*\u0017\u000f\u0006\u0002peB\u0011!\u0007]\u0005\u0003cN\u0012A!\u00168ji\"91oCA\u0001\u0002\u0004\t\u0014a\u0001=%c\u0005\tB/Y:lg>+Ho\u001d;b]\u0012Lgn\u001a\u0011\u0002\u0007I,h\u000eF\u0001x!\t\u0011\u00040\u0003\u0002zg\t9!i\\8mK\u0006t\u0017A\u00075b]\u0012dW\rR3tGJL'-\u001a+pa&\u001c7i\u001c8gS\u001e\u001cHCA<}\u0011\u0015ih\u00021\u0001\u007f\u0003\u0019\u0011Xm];miB9q0!\u0001\u0002\u0004\u0005MQ\"A&\n\u00059[\u0005\u0003BA\u0003\u0003\u001fi!!a\u0002\u000b\t\u0005%\u00111B\u0001\u0007G>tg-[4\u000b\u0007\u000551-\u0001\u0004d_6lwN\\\u0005\u0005\u0003#\t9A\u0001\bD_:4\u0017n\u001a*fg>,(oY3\u0011\u000b\u0005U\u0011qC/\u000e\u0005\u0005-\u0011\u0002BA\r\u0003\u0017\u00111bS1gW\u00064U\u000f^;sK\u00069\u0002.\u00198eY\u0016\fE\u000e^3s)>\u0004\u0018nY\"p]\u001aLwm\u001d\u000b\u0006o\u0006}\u0011q\u0005\u0005\u0007{>\u0001\r!!\t\u0011\u0007y\u000b\u0019#C\u0002\u0002&}\u0013!#\u00117uKJ\u001cuN\u001c4jON\u0014Vm];mi\"1\u0011\u0011F\bA\u0002\u001d\u000ba\"\u001e9eCR,GmQ8oM&<7/\u0001\tdkJ\u0014XM\u001c;D_:4\u0017nZ'ba\u0002")
/* loaded from: input_file:kafka/server/link/ClusterLinkSyncTopicsConfigs.class */
public class ClusterLinkSyncTopicsConfigs extends ClusterLinkScheduler.PeriodicTask {
    private final ClusterLinkClientManager clientManager;
    private final ClusterLinkMetadataManager metadataManager;
    private final LocalClusterLinkAdminClient destAdmin;
    private final ClusterLinkMetrics metrics;
    private final Map<String, Option<Config>> configs;
    private int tasksOutstanding;

    public LocalClusterLinkAdminClient destAdmin() {
        return this.destAdmin;
    }

    private Map<String, Option<Config>> configs() {
        return this.configs;
    }

    private int tasksOutstanding() {
        return this.tasksOutstanding;
    }

    private void tasksOutstanding_$eq(int i) {
        this.tasksOutstanding = i;
    }

    @Override // kafka.server.link.ClusterLinkScheduler.PeriodicTask
    public boolean run() {
        if (tasksOutstanding() != 0) {
            warn(() -> {
                return "Number of outstanding tasks was not 0 at the beginning of run. Resetting to 0 and continuing on.";
            });
            tasksOutstanding_$eq(0);
        }
        Set<String> topics = this.clientManager.getTopics();
        Set keySet = configs().keySet();
        keySet.diff(topics).foreach(str -> {
            return this.configs().remove(str);
        });
        topics.diff(keySet).foreach(str2 -> {
            return this.configs().put(str2, None$.MODULE$);
        });
        scala.collection.immutable.Set set = ((IterableOnceOps) configs().keys().map(str3 -> {
            return new ConfigResource(ConfigResource.Type.TOPIC, str3);
        })).toSet();
        if (set.nonEmpty()) {
            trace(() -> {
                return new StringBuilder(86).append("Attempting to retrieve topic configs from source cluster for following mirror topics: ").append(topics).toString();
            });
            DescribeConfigsResult describeConfigs = this.clientManager.getAdmin().describeConfigs(CollectionConverters$.MODULE$.SetHasAsJava(set).asJava());
            scheduleWhenComplete(describeConfigs.all(), (Function0<Object>) () -> {
                return this.handleDescribeTopicConfigs(CollectionConverters$.MODULE$.MapHasAsScala(describeConfigs.values()).asScala().toMap($less$colon$less$.MODULE$.refl()));
            });
            tasksOutstanding_$eq(tasksOutstanding() + 1);
        }
        return tasksOutstanding() == 0;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean handleDescribeTopicConfigs(scala.collection.Map<ConfigResource, KafkaFuture<Config>> map) {
        tasksOutstanding_$eq(tasksOutstanding() - 1);
        HashMap hashMap = new HashMap();
        Map map2 = (Map) Map$.MODULE$.apply(Nil$.MODULE$);
        map.foreach(tuple2 -> {
            $anonfun$handleDescribeTopicConfigs$1(this, hashMap, map2, tuple2);
            return BoxedUnit.UNIT;
        });
        if (!hashMap.isEmpty()) {
            AlterConfigsResult incrementalAlterMirrorTopicConfigs = destAdmin().incrementalAlterMirrorTopicConfigs(hashMap);
            scheduleWhenComplete(incrementalAlterMirrorTopicConfigs.all(), (Function0<Object>) () -> {
                return this.handleAlterTopicConfigs(incrementalAlterMirrorTopicConfigs, map2);
            });
            tasksOutstanding_$eq(tasksOutstanding() + 1);
        }
        return tasksOutstanding() == 0;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean handleAlterTopicConfigs(AlterConfigsResult alterConfigsResult, Map<String, Option<Config>> map) {
        tasksOutstanding_$eq(tasksOutstanding() - 1);
        CollectionConverters$.MODULE$.SetHasAsScala(alterConfigsResult.values().entrySet()).asScala().foreach(entry -> {
            $anonfun$handleAlterTopicConfigs$1(this, map, entry);
            return BoxedUnit.UNIT;
        });
        return tasksOutstanding() == 0;
    }

    public Map<String, Option<Config>> currentConfigMap() {
        return configs();
    }

    public static final /* synthetic */ boolean $anonfun$handleDescribeTopicConfigs$3(Config config, Config config2) {
        return !config2.equals(config);
    }

    public static final /* synthetic */ boolean $anonfun$handleDescribeTopicConfigs$8(HashSet hashSet, Tuple2 tuple2) {
        return hashSet.add(new AlterConfigOp(new ConfigEntry((String) tuple2._1(), (String) tuple2._2()), AlterConfigOp.OpType.SET));
    }

    public static final /* synthetic */ void $anonfun$handleDescribeTopicConfigs$1(ClusterLinkSyncTopicsConfigs clusterLinkSyncTopicsConfigs, HashMap hashMap, Map map, Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError((Object) null);
        }
        ConfigResource configResource = (ConfigResource) tuple2._1();
        KafkaFuture kafkaFuture = (KafkaFuture) tuple2._2();
        String name = configResource.name();
        clusterLinkSyncTopicsConfigs.configs().get(name).foreach(option -> {
            try {
                Config config = (Config) kafkaFuture.get();
                if (!option.forall(config2 -> {
                    return BoxesRunTime.boxToBoolean($anonfun$handleDescribeTopicConfigs$3(config, config2));
                })) {
                    return BoxedUnit.UNIT;
                }
                clusterLinkSyncTopicsConfigs.trace(() -> {
                    return new StringBuilder(42).append("Topic configuration for source topic ").append(name).append(" is: ").append(config).toString();
                });
                clusterLinkSyncTopicsConfigs.debug(() -> {
                    return new StringBuilder(72).append("Detected new remote configuration for mirror topic '").append(name).append("' on cluster link '").append(clusterLinkSyncTopicsConfigs.clientManager.linkData().linkName()).append("'").toString();
                });
                MirrorTopicConfigSyncRules mirrorTopicConfigSyncRules = clusterLinkSyncTopicsConfigs.clientManager.topicConfigSyncRules();
                Properties topicConfig = clusterLinkSyncTopicsConfigs.metadataManager.getTopicConfig(name);
                Properties restrictValidateTopicConfigPolicy = ClusterLinkUtils$.MODULE$.restrictValidateTopicConfigPolicy(name, new MirrorTopicConfigsDelegate(topicConfig, config, mirrorTopicConfigSyncRules).updateMirrorProps(name), clusterLinkSyncTopicsConfigs.clientManager.alterConfigPolicy());
                if (restrictValidateTopicConfigPolicy == null) {
                    if (topicConfig != null) {
                        clusterLinkSyncTopicsConfigs.debug(() -> {
                            return new StringBuilder(67).append("Updating local configuration for mirror topic '").append(name).append("' on cluster link '").append(clusterLinkSyncTopicsConfigs.clientManager.linkData().linkName()).append("'").toString();
                        });
                        scala.collection.mutable.Set diff = CollectionConverters$.MODULE$.SetHasAsScala(restrictValidateTopicConfigPolicy.entrySet()).asScala().diff(CollectionConverters$.MODULE$.SetHasAsScala(topicConfig.entrySet()).asScala());
                        clusterLinkSyncTopicsConfigs.trace(() -> {
                            return new StringBuilder(44).append("Adding configs ").append(diff).append(" for topic ").append(name).append(" on target cluster").toString();
                        });
                        HashSet hashSet = new HashSet();
                        CollectionConverters$.MODULE$.PropertiesHasAsScala(restrictValidateTopicConfigPolicy).asScala().foreach(tuple22 -> {
                            return BoxesRunTime.boxToBoolean($anonfun$handleDescribeTopicConfigs$8(hashSet, tuple22));
                        });
                        hashMap.put(new ConfigResource(ConfigResource.Type.TOPIC, name), hashSet);
                        return map.put(name, new Some(config));
                    }
                    return (!((Option) clusterLinkSyncTopicsConfigs.configs().apply(name)).isEmpty() || hashMap.containsKey(new ConfigResource(ConfigResource.Type.TOPIC, name))) ? BoxedUnit.UNIT : clusterLinkSyncTopicsConfigs.configs().put(name, new Some(config));
                }
            } catch (PolicyViolationException e) {
                clusterLinkSyncTopicsConfigs.warn(() -> {
                    return new StringBuilder(0).append(new StringBuilder(74).append("Could not update mirror topic '").append(name).append("' configuration due to policy violation on ").toString()).append(new StringBuilder(14).append("cluster link ").append(clusterLinkSyncTopicsConfigs.clientManager.linkData().linkName()).append("'").toString()).toString();
                }, () -> {
                    return e;
                });
                clusterLinkSyncTopicsConfigs.metrics.topicConfigUpdateFailedSensor().record();
                return BoxedUnit.UNIT;
            } catch (Throwable th) {
                clusterLinkSyncTopicsConfigs.warn(() -> {
                    return new StringBuilder(0).append(new StringBuilder(76).append("Error encountered while processing remote configuration for mirror topic '").append(name).append("' ").toString()).append(new StringBuilder(17).append("on cluster link ").append(clusterLinkSyncTopicsConfigs.clientManager.linkData().linkName()).append("'").toString()).toString();
                }, () -> {
                    return th;
                });
                clusterLinkSyncTopicsConfigs.metrics.topicConfigUpdateFailedSensor().record();
                return BoxedUnit.UNIT;
            }
        });
    }

    public static final /* synthetic */ void $anonfun$handleAlterTopicConfigs$1(ClusterLinkSyncTopicsConfigs clusterLinkSyncTopicsConfigs, Map map, Map.Entry entry) {
        String name = ((ConfigResource) entry.getKey()).name();
        try {
            ((KafkaFuture) entry.getValue()).get();
            clusterLinkSyncTopicsConfigs.configs().put(name, map.apply(name));
            clusterLinkSyncTopicsConfigs.metrics.topicConfigUpdateSensor().record();
            clusterLinkSyncTopicsConfigs.trace(() -> {
                return new StringBuilder(52).append("Successfully updated configuration for mirror topic ").append(name).toString();
            });
        } catch (Throwable th) {
            clusterLinkSyncTopicsConfigs.warn(() -> {
                return new StringBuilder(65).append("Error encountered while altering configuration on mirror topic ").append(name).append(": ").append(th).toString();
            });
            clusterLinkSyncTopicsConfigs.metrics.topicConfigUpdateFailedSensor().record();
        }
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public ClusterLinkSyncTopicsConfigs(ClusterLinkClientManager clusterLinkClientManager, ClusterLinkMetadataManager clusterLinkMetadataManager, LocalClusterLinkAdminClient localClusterLinkAdminClient, int i, ClusterLinkMetrics clusterLinkMetrics) {
        super(clusterLinkClientManager.scheduler(), "ClusterLinkSyncTopicsConfigs", i);
        this.clientManager = clusterLinkClientManager;
        this.metadataManager = clusterLinkMetadataManager;
        this.destAdmin = localClusterLinkAdminClient;
        this.metrics = clusterLinkMetrics;
        this.configs = (scala.collection.mutable.Map) Map$.MODULE$.apply(Nil$.MODULE$);
        this.tasksOutstanding = 0;
    }
}
