package kafka.server.link;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import kafka.server.link.ClusterLinkScheduler;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConfigsResult;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.PolicyViolationException;
import scala.$less$colon$less$;
import scala.Function0;
import scala.MatchError;
import scala.Option;
import scala.Some;
import scala.Tuple2;
import scala.collection.IterableOnceOps;
import scala.collection.Map;
import scala.collection.Set;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Map$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: ClusterLinkSyncTopicsConfigs.scala */
@ScalaSignature(bytes = "\u0006\u0005\u0005}a\u0001\u0002\b\u0010\u0001YA\u0001b\b\u0001\u0003\u0002\u0003\u0006I\u0001\t\u0005\tG\u0001\u0011\t\u0011)A\u0005I!Aq\u0005\u0001BC\u0002\u0013\u0005\u0001\u0006\u0003\u0005-\u0001\t\u0005\t\u0015!\u0003*\u0011%i\u0003A!A!\u0002\u0013qC\u0007\u0003\u00057\u0001\t\u0005\t\u0015!\u00038\u0011\u0015Q\u0004\u0001\"\u0001<\u0011\u001d\u0011\u0005\u00011A\u0005\n\rCq\u0001\u0012\u0001A\u0002\u0013%Q\t\u0003\u0004L\u0001\u0001\u0006KA\f\u0005\u0006\u0019\u0002!\t&\u0014\u0005\u0006#\u0002!IA\u0015\u0005\u0006i\u0002!I!\u001e\u0002\u001d\u00072,8\u000f^3s\u0019&t7nU=oGR{\u0007/[2t\u0007>tg-[4t\u0015\t\u0001\u0012#\u0001\u0003mS:\\'B\u0001\n\u0014\u0003\u0019\u0019XM\u001d<fe*\tA#A\u0003lC\u001a\\\u0017m\u0001\u0001\u0014\u0005\u00019\u0002C\u0001\r\u001d\u001d\tI\"$D\u0001\u0010\u0013\tYr\"\u0001\u000bDYV\u001cH/\u001a:MS:\\7k\u00195fIVdWM]\u0005\u0003;y\u0011A\u0002U3sS>$\u0017n\u0019+bg.T!aG\b\u0002\u001b\rd\u0017.\u001a8u\u001b\u0006t\u0017mZ3s!\tI\u0012%\u0003\u0002#\u001f\tA2\t\\;ti\u0016\u0014H*\u001b8l\u00072LWM\u001c;NC:\fw-\u001a:\u0002\u001f5,G/\u00193bi\u0006l\u0015M\\1hKJ\u0004\"!G\u0013\n\u0005\u0019z!AG\"mkN$XM\u001d'j].lU\r^1eCR\fW*\u00198bO\u0016\u0014\u0018!\u00033fgR\fE-\\5o+\u0005I\u0003CA\r+\u0013\tYsBA\u000eM_\u000e\fGn\u00117vgR,'\u000fT5oW\u0006#W.\u001b8DY&,g\u000e^\u0001\u000bI\u0016\u001cH/\u00113nS:\u0004\u0013AD:z]\u000eLe\u000e^3sm\u0006dWj\u001d\t\u0003_Ij\u0011\u0001\r\u0006\u0002c\u0005)1oY1mC&\u00111\u0007\r\u0002\u0004\u0013:$\u0018BA\u001b\u001d\u0003E\u0011Xm]2iK\u0012,H.\u001a#fY\u0006LXj]\u0001\b[\u0016$(/[2t!\tI\u0002(\u0003\u0002:\u001f\t\u00112\t\\;ti\u0016\u0014H*\u001b8l\u001b\u0016$(/[2t\u0003\u0019a\u0014N\\5u}Q1A(\u0010 @\u0001\u0006\u0003\"!\u0007\u0001\t\u000b}9\u0001\u0019\u0001\u0011\t\u000b\r:\u0001\u0019\u0001\u0013\t\u000b\u001d:\u0001\u0019A\u0015\t\u000b5:\u0001\u0019\u0001\u0018\t\u000bY:\u0001\u0019A\u001c\u0002!Q\f7o[:PkR\u001cH/\u00198eS:<W#\u0001\u0018\u0002)Q\f7o[:PkR\u001cH/\u00198eS:<w\fJ3r)\t1\u0015\n\u0005\u00020\u000f&\u0011\u0001\n\r\u0002\u0005+:LG\u000fC\u0004K\u0013\u0005\u0005\t\u0019\u0001\u0018\u0002\u0007a$\u0013'A\tuCN\\7oT;ugR\fg\u000eZ5oO\u0002\n1A];o)\u0005q\u0005CA\u0018P\u0013\t\u0001\u0006GA\u0004C_>dW-\u00198\u00025!\fg\u000e\u001a7f\t\u0016\u001c8M]5cKR{\u0007/[2D_:4\u0017nZ:\u0015\u00059\u001b\u0006\"\u0002+\r\u0001\u0004)\u0016A\u0002:fgVdG\u000f\u0005\u0003W3nCW\"A,\u000b\u0005a\u0003\u0014AC2pY2,7\r^5p]&\u0011!l\u0016\u0002\u0004\u001b\u0006\u0004\bC\u0001/g\u001b\u0005i&B\u00010`\u0003\u0019\u0019wN\u001c4jO*\u0011\u0001-Y\u0001\u0007G>lWn\u001c8\u000b\u0005Q\u0011'BA2e\u0003\u0019\t\u0007/Y2iK*\tQ-A\u0002pe\u001eL!aZ/\u0003\u001d\r{gNZ5h%\u0016\u001cx.\u001e:dKB\u0019\u0011N\u001b7\u000e\u0003}K!a[0\u0003\u0017-\u000bgm[1GkR,(/\u001a\t\u0003[Jl\u0011A\u001c\u0006\u0003_B\fQ!\u00193nS:T!!]1\u0002\u000f\rd\u0017.\u001a8ug&\u00111O\u001c\u0002\u0007\u0007>tg-[4\u0002/!\fg\u000e\u001a7f\u00032$XM\u001d+pa&\u001c7i\u001c8gS\u001e\u001cHc\u0001(wu\")A+\u0004a\u0001oB\u0011Q\u000e_\u0005\u0003s:\u0014!#\u00117uKJ\u001cuN\u001c4jON\u0014Vm];mi\")10\u0004a\u0001y\u0006qQ\u000f\u001d3bi\u0016$7i\u001c8gS\u001e\u001c\bcB?\u0002\u0002\u0005\r\u0011\u0011D\u0007\u0002}*\u0011qpV\u0001\b[V$\u0018M\u00197f\u0013\tQf\u0010\u0005\u0003\u0002\u0006\u0005Ma\u0002BA\u0004\u0003\u001f\u00012!!\u00031\u001b\t\tYAC\u0002\u0002\u000eU\ta\u0001\u0010:p_Rt\u0014bAA\ta\u00051\u0001K]3eK\u001aLA!!\u0006\u0002\u0018\t11\u000b\u001e:j]\u001eT1!!\u00051!\u0011y\u00131\u00047\n\u0007\u0005u\u0001G\u0001\u0004PaRLwN\u001c")
/* loaded from: input_file:kafka/server/link/ClusterLinkSyncTopicsConfigs.class */
public class ClusterLinkSyncTopicsConfigs extends ClusterLinkScheduler.PeriodicTask {
    private final ClusterLinkClientManager clientManager;
    private final ClusterLinkMetadataManager metadataManager;
    private final LocalClusterLinkAdminClient destAdmin;
    private final ClusterLinkMetrics metrics;
    private int tasksOutstanding;

    public LocalClusterLinkAdminClient destAdmin() {
        return this.destAdmin;
    }

    private int tasksOutstanding() {
        return this.tasksOutstanding;
    }

    private void tasksOutstanding_$eq(int i) {
        this.tasksOutstanding = i;
    }

    @Override // kafka.server.link.ClusterLinkScheduler.PeriodicTask
    public boolean run() {
        if (tasksOutstanding() != 0) {
            warn(() -> {
                return "Number of outstanding tasks was not 0 at the beginning of run. Resetting to 0 and continuing on.";
            });
            tasksOutstanding_$eq(0);
        }
        Set<String> topics = this.clientManager.getTopics();
        scala.collection.immutable.Set set = ((IterableOnceOps) topics.map(str -> {
            return new ConfigResource(ConfigResource.Type.TOPIC, str);
        })).toSet();
        if (set.nonEmpty()) {
            trace(() -> {
                return new StringBuilder(86).append("Attempting to retrieve topic configs from source cluster for following mirror topics: ").append(topics).toString();
            });
            DescribeConfigsResult describeConfigs = this.clientManager.getAdmin().describeConfigs(CollectionConverters$.MODULE$.SetHasAsJava(set).asJava());
            scheduleWhenComplete(describeConfigs.all(), (Function0<Object>) () -> {
                return this.handleDescribeTopicConfigs(CollectionConverters$.MODULE$.MapHasAsScala(describeConfigs.values()).asScala().toMap($less$colon$less$.MODULE$.refl()));
            });
            tasksOutstanding_$eq(tasksOutstanding() + 1);
        }
        return tasksOutstanding() == 0;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean handleDescribeTopicConfigs(Map<ConfigResource, KafkaFuture<Config>> map) {
        tasksOutstanding_$eq(tasksOutstanding() - 1);
        HashMap hashMap = new HashMap();
        scala.collection.mutable.Map map2 = (scala.collection.mutable.Map) Map$.MODULE$.apply(Nil$.MODULE$);
        map.foreach(tuple2 -> {
            Option option;
            Config config;
            Properties topicConfig;
            Properties restrictValidateTopicConfigPolicy;
            if (tuple2 == null) {
                throw new MatchError((Object) null);
            }
            ConfigResource configResource = (ConfigResource) tuple2._1();
            KafkaFuture kafkaFuture = (KafkaFuture) tuple2._2();
            String name = configResource.name();
            try {
                config = (Config) kafkaFuture.get();
                this.trace(() -> {
                    return new StringBuilder(42).append("Topic configuration for source topic ").append(name).append(" is: ").append(config).toString();
                });
                MirrorTopicConfigSyncRules mirrorTopicConfigSyncRules = this.clientManager.topicConfigSyncRules();
                topicConfig = this.metadataManager.getTopicConfig(name);
                restrictValidateTopicConfigPolicy = ClusterLinkUtils$.MODULE$.restrictValidateTopicConfigPolicy(name, new MirrorTopicConfigsDelegate(topicConfig, config, mirrorTopicConfigSyncRules).updateMirrorProps(name), this.clientManager.alterConfigPolicy());
            } catch (PolicyViolationException e) {
                this.warn(() -> {
                    return new StringBuilder(0).append(new StringBuilder(74).append("Could not update mirror topic '").append(name).append("' configuration due to policy violation on ").toString()).append(new StringBuilder(14).append("cluster link ").append(this.clientManager.linkData().linkName()).append("'").toString()).toString();
                }, () -> {
                    return e;
                });
                this.metrics.topicConfigUpdateFailedSensor().record();
                option = BoxedUnit.UNIT;
            } catch (Throwable th) {
                this.warn(() -> {
                    return new StringBuilder(0).append(new StringBuilder(76).append("Error encountered while processing remote configuration for mirror topic '").append(name).append("' ").toString()).append(new StringBuilder(17).append("on cluster link ").append(this.clientManager.linkData().linkName()).append("'").toString()).toString();
                }, () -> {
                    return th;
                });
                this.metrics.topicConfigUpdateFailedSensor().record();
                option = BoxedUnit.UNIT;
            }
            if (restrictValidateTopicConfigPolicy == null) {
                if (topicConfig != null) {
                    this.debug(() -> {
                        return new StringBuilder(67).append("Updating local configuration for mirror topic '").append(name).append("' on cluster link '").append(this.clientManager.linkData().linkName()).append("'").toString();
                    });
                    scala.collection.mutable.Set diff = CollectionConverters$.MODULE$.SetHasAsScala(restrictValidateTopicConfigPolicy.entrySet()).asScala().diff(CollectionConverters$.MODULE$.SetHasAsScala(topicConfig.entrySet()).asScala());
                    this.trace(() -> {
                        return new StringBuilder(44).append("Adding configs ").append(diff).append(" for topic ").append(name).append(" on target cluster").toString();
                    });
                    HashSet hashSet = new HashSet();
                    CollectionConverters$.MODULE$.PropertiesHasAsScala(restrictValidateTopicConfigPolicy).asScala().foreach(tuple2 -> {
                        return BoxesRunTime.boxToBoolean($anonfun$handleDescribeTopicConfigs$5(hashSet, tuple2));
                    });
                    hashMap.put(new ConfigResource(ConfigResource.Type.TOPIC, name), hashSet);
                    option = map2.put(name, new Some(config));
                    return option;
                }
                option = BoxedUnit.UNIT;
                return option;
            }
        });
        if (!hashMap.isEmpty()) {
            AlterConfigsResult incrementalAlterMirrorTopicConfigs = destAdmin().incrementalAlterMirrorTopicConfigs(hashMap);
            scheduleWhenComplete(incrementalAlterMirrorTopicConfigs.all(), (Function0<Object>) () -> {
                return this.handleAlterTopicConfigs(incrementalAlterMirrorTopicConfigs, map2);
            });
            tasksOutstanding_$eq(tasksOutstanding() + 1);
        }
        return tasksOutstanding() == 0;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean handleAlterTopicConfigs(AlterConfigsResult alterConfigsResult, scala.collection.mutable.Map<String, Option<Config>> map) {
        tasksOutstanding_$eq(tasksOutstanding() - 1);
        CollectionConverters$.MODULE$.SetHasAsScala(alterConfigsResult.values().entrySet()).asScala().foreach(entry -> {
            $anonfun$handleAlterTopicConfigs$1(this, entry);
            return BoxedUnit.UNIT;
        });
        return tasksOutstanding() == 0;
    }

    public static final /* synthetic */ boolean $anonfun$handleDescribeTopicConfigs$5(HashSet hashSet, Tuple2 tuple2) {
        return hashSet.add(new AlterConfigOp(new ConfigEntry((String) tuple2._1(), (String) tuple2._2()), AlterConfigOp.OpType.SET));
    }

    public static final /* synthetic */ void $anonfun$handleAlterTopicConfigs$1(ClusterLinkSyncTopicsConfigs clusterLinkSyncTopicsConfigs, Map.Entry entry) {
        String name = ((ConfigResource) entry.getKey()).name();
        try {
            ((KafkaFuture) entry.getValue()).get();
            clusterLinkSyncTopicsConfigs.metrics.topicConfigUpdateSensor().record();
            clusterLinkSyncTopicsConfigs.trace(() -> {
                return new StringBuilder(52).append("Successfully updated configuration for mirror topic ").append(name).toString();
            });
        } catch (Throwable th) {
            clusterLinkSyncTopicsConfigs.warn(() -> {
                return new StringBuilder(65).append("Error encountered while altering configuration on mirror topic ").append(name).append(": ").append(th).toString();
            });
            clusterLinkSyncTopicsConfigs.metrics.topicConfigUpdateFailedSensor().record();
        }
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public ClusterLinkSyncTopicsConfigs(ClusterLinkClientManager clusterLinkClientManager, ClusterLinkMetadataManager clusterLinkMetadataManager, LocalClusterLinkAdminClient localClusterLinkAdminClient, int i, ClusterLinkMetrics clusterLinkMetrics) {
        super(clusterLinkClientManager.scheduler(), "ClusterLinkSyncTopicsConfigs", i);
        this.clientManager = clusterLinkClientManager;
        this.metadataManager = clusterLinkMetadataManager;
        this.destAdmin = localClusterLinkAdminClient;
        this.metrics = clusterLinkMetrics;
        this.tasksOutstanding = 0;
    }
}
