package kafka.server.link;

import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import kafka.controller.KafkaController;
import kafka.server.link.ClusterLinkScheduler;
import org.apache.kafka.clients.admin.DescribeAclsResult;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.server.authorizer.AclCreateResult;
import org.apache.kafka.server.authorizer.AclDeleteResult;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.apache.kafka.server.authorizer.Authorizer;
import scala.Function0;
import scala.MatchError;
import scala.None$;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.IterableOnce;
import scala.collection.IterableOnceOps;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.ListBuffer;
import scala.collection.mutable.ListBuffer$;
import scala.collection.mutable.Set;
import scala.collection.mutable.Set$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: ClusterLinkSyncAcls.scala */
@ScalaSignature(bytes = "\u0006\u0005\u0005mc\u0001B\t\u0013\u0001eA\u0001B\t\u0001\u0003\u0006\u0004%\ta\t\u0005\tO\u0001\u0011\t\u0011)A\u0005I!A\u0001\u0006\u0001B\u0001B\u0003%\u0011\u0006\u0003\u0005/\u0001\t\u0005\t\u0015!\u00030\u0011\u0015\u0011\u0004\u0001\"\u00014\u0011\u001dA\u0004A1A\u0005\neBa!\u0015\u0001!\u0002\u0013Q\u0004b\u0002*\u0001\u0001\u0004%Ia\u0015\u0005\b1\u0002\u0001\r\u0011\"\u0003Z\u0011\u0019y\u0006\u0001)Q\u0005)\")\u0001\r\u0001C)C\")Q\r\u0001C\u0005M\")\u0001\u0010\u0001C\u0005s\"9\u0011Q\u0007\u0001\u0005\n\u0005]\u0002BBA,\u0001\u0011\u0005\u0011\b\u0003\u0004\u0002Z\u0001!\ta\u0015\u0002\u0014\u00072,8\u000f^3s\u0019&t7nU=oG\u0006\u001bGn\u001d\u0006\u0003'Q\tA\u0001\\5oW*\u0011QCF\u0001\u0007g\u0016\u0014h/\u001a:\u000b\u0003]\tQa[1gW\u0006\u001c\u0001a\u0005\u0002\u00015A\u00111d\b\b\u00039ui\u0011AE\u0005\u0003=I\tAc\u00117vgR,'\u000fT5oWN\u001b\u0007.\u001a3vY\u0016\u0014\u0018B\u0001\u0011\"\u00051\u0001VM]5pI&\u001cG+Y:l\u0015\tq\"#A\u0007dY&,g\u000e^'b]\u0006<WM]\u000b\u0002IA\u0011A$J\u0005\u0003MI\u0011\u0001d\u00117vgR,'\u000fT5oW\u000ec\u0017.\u001a8u\u001b\u0006t\u0017mZ3s\u00039\u0019G.[3oi6\u000bg.Y4fe\u0002\n!bY8oiJ|G\u000e\\3s!\tQC&D\u0001,\u0015\tAc#\u0003\u0002.W\ty1*\u00194lC\u000e{g\u000e\u001e:pY2,'/A\u0004nKR\u0014\u0018nY:\u0011\u0005q\u0001\u0014BA\u0019\u0013\u0005I\u0019E.^:uKJd\u0015N\\6NKR\u0014\u0018nY:\u0002\rqJg.\u001b;?)\u0011!TGN\u001c\u0011\u0005q\u0001\u0001\"\u0002\u0012\u0006\u0001\u0004!\u0003\"\u0002\u0015\u0006\u0001\u0004I\u0003\"\u0002\u0018\u0006\u0001\u0004y\u0013!D2veJ,g\u000e^!dYN+G/F\u0001;!\rY$\tR\u0007\u0002y)\u0011QHP\u0001\b[V$\u0018M\u00197f\u0015\ty\u0004)\u0001\u0006d_2dWm\u0019;j_:T\u0011!Q\u0001\u0006g\u000e\fG.Y\u0005\u0003\u0007r\u00121aU3u!\t)u*D\u0001G\u0015\t9\u0005*A\u0002bG2T!!\u0013&\u0002\r\r|W.\\8o\u0015\t92J\u0003\u0002M\u001b\u00061\u0011\r]1dQ\u0016T\u0011AT\u0001\u0004_J<\u0017B\u0001)G\u0005)\t5\r\u001c\"j]\u0012LgnZ\u0001\u000fGV\u0014(/\u001a8u\u0003\u000ed7+\u001a;!\u0003A!\u0018m]6t\u001fV$8\u000f^1oI&tw-F\u0001U!\t)f+D\u0001A\u0013\t9\u0006IA\u0002J]R\fA\u0003^1tWN|U\u000f^:uC:$\u0017N\\4`I\u0015\fHC\u0001.^!\t)6,\u0003\u0002]\u0001\n!QK\\5u\u0011\u001dq\u0016\"!AA\u0002Q\u000b1\u0001\u001f\u00132\u0003E!\u0018m]6t\u001fV$8\u000f^1oI&tw\rI\u0001\u0004eVtG#\u00012\u0011\u0005U\u001b\u0017B\u00013A\u0005\u001d\u0011un\u001c7fC:\f!\"\u001e9eCR,\u0017i\u00197t)\t\u0011w\rC\u0003i\u0019\u0001\u0007\u0011.\u0001\u0006gkR,(/\u001a'jgR\u00042a\u000f6m\u0013\tYGH\u0001\u0006MSN$()\u001e4gKJ\u00042!\u001c8q\u001b\u0005A\u0015BA8I\u0005-Y\u0015MZ6b\rV$XO]3\u0011\u0007E4H)D\u0001s\u0015\t\u0019H/\u0001\u0003vi&d'\"A;\u0002\t)\fg/Y\u0005\u0003oJ\u0014!bQ8mY\u0016\u001cG/[8o\u0003u\tG\rZ!dYN\fe\u000e\u001a'pO\u000e\u0013X-\u0019;j_:<\u0016M\u001d8j]\u001e\u001cHC\u00022{\u0003W\t\t\u0004C\u0003|\u001b\u0001\u0007A0A\nbG2\u001c%/Z1uKJ+7/\u001e7u\u0019&\u001cH\u000fE\u0003~\u0003\u0017\t\tBD\u0002\u007f\u0003\u000fq1a`A\u0003\u001b\t\t\tAC\u0002\u0002\u0004a\ta\u0001\u0010:p_Rt\u0014\"A!\n\u0007\u0005%\u0001)A\u0004qC\u000e\\\u0017mZ3\n\t\u00055\u0011q\u0002\u0002\u0005\u0019&\u001cHOC\u0002\u0002\n\u0001\u0003b!a\u0005\u0002\u001a\u0005uQBAA\u000b\u0015\r\t9B]\u0001\u000bG>t7-\u001e:sK:$\u0018\u0002BA\u000e\u0003+\u0011\u0011cQ8na2,G/\u00192mK\u001a+H/\u001e:f!\u0011\ty\"a\n\u000e\u0005\u0005\u0005\"\u0002BA\u0012\u0003K\t!\"Y;uQ>\u0014\u0018N_3s\u0015\t)\"*\u0003\u0003\u0002*\u0005\u0005\"aD!dY\u000e\u0013X-\u0019;f%\u0016\u001cX\u000f\u001c;\t\u000f\u00055R\u00021\u0001\u00020\u0005q1M]3bi\u0016$\u0017i\u00197MSN$\b\u0003B?\u0002\f\u0011Ca!a\r\u000e\u0001\u0004Q\u0014aC1eI\u0016$\u0017i\u00197TKR\f\u0001\u0005Z3mKR,\u0017i\u00197t\u0003:$Gj\\4EK2,G/[8o/\u0006\u0014h.\u001b8hgR9!-!\u000f\u0002H\u0005-\u0003bBA\u001e\u001d\u0001\u0007\u0011QH\u0001\u0014C\u000edG)\u001a7fi\u0016\u0014Vm];mi2K7\u000f\u001e\t\u0006{\u0006-\u0011q\b\t\u0007\u0003'\tI\"!\u0011\u0011\t\u0005}\u00111I\u0005\u0005\u0003\u000b\n\tCA\bBG2$U\r\\3uKJ+7/\u001e7u\u0011\u0019\tIE\u0004a\u0001u\u0005iA-\u001a7fi\u0016$\u0017i\u00197TKRDq!!\u0014\u000f\u0001\u0004\ty%\u0001\u000beK2,G/\u001a3BG24\u0015\u000e\u001c;fe2K7\u000f\u001e\t\u0006{\u0006-\u0011\u0011\u000b\t\u0004\u000b\u0006M\u0013bAA+\r\n\u0001\u0012i\u00197CS:$\u0017N\\4GS2$XM]\u0001\u0011O\u0016$8)\u001e:sK:$\u0018i\u00197TKR\fqcY;se\u0016tG\u000fV1tWN|U\u000f^:uC:$\u0017N\\4")
/* loaded from: input_file:kafka/server/link/ClusterLinkSyncAcls.class */
public class ClusterLinkSyncAcls extends ClusterLinkScheduler.PeriodicTask {
    private final ClusterLinkClientManager clientManager;
    private final KafkaController controller;
    private final ClusterLinkMetrics metrics;
    private final Set<AclBinding> currentAclSet;
    private int tasksOutstanding;

    public ClusterLinkClientManager clientManager() {
        return this.clientManager;
    }

    private Set<AclBinding> currentAclSet() {
        return this.currentAclSet;
    }

    private int tasksOutstanding() {
        return this.tasksOutstanding;
    }

    private void tasksOutstanding_$eq(int i) {
        this.tasksOutstanding = i;
    }

    @Override // kafka.server.link.ClusterLinkScheduler.PeriodicTask
    public boolean run() {
        if (this.controller.isActive()) {
            if (tasksOutstanding() != 0) {
                warn(() -> {
                    return "Number of outstanding tasks was not 0 at the beginning of run. Resetting to 0 and continuing on.";
                });
                tasksOutstanding_$eq(0);
            }
            ListBuffer<AclBindingFilter> aclBindingFilters = AclJson$.MODULE$.toAclBindingFilters((AclFiltersJson) clientManager().currentConfig().aclFilters().get());
            ListBuffer listBuffer = (ListBuffer) ListBuffer$.MODULE$.apply(Nil$.MODULE$);
            aclBindingFilters.foreach(aclBindingFilter -> {
                Some some;
                try {
                    this.trace(() -> {
                        return "Attempting to retrieve ACLs from source cluster";
                    });
                    some = new Some(this.clientManager().getAdmin().describeAcls(aclBindingFilter));
                } catch (ExecutionException e) {
                    Throwable cause = e.getCause();
                    if (cause instanceof AuthorizationException) {
                        this.warn(() -> {
                            return "Unable to retrieve ACLs on source cluster. Please enable DESCRIBE ACLs on the source cluster to proceed with ACL migration.";
                        });
                        some = None$.MODULE$;
                    } else {
                        if (cause == null) {
                            throw new MatchError((Object) null);
                        }
                        this.warn(() -> {
                            return new StringBuilder(78).append("Unexpected error encountered while trying to retrieve ACLs on source cluster: ").append(e).toString();
                        });
                        some = None$.MODULE$;
                    }
                } catch (Throwable th) {
                    this.warn(() -> {
                        return new StringBuilder(78).append("Unexpected error encountered while trying to retrieve ACLs on source cluster: ").append(th).toString();
                    });
                    some = None$.MODULE$;
                }
                Some some2 = some;
                return some2.isDefined() ? listBuffer.$plus$eq(some2) : BoxedUnit.UNIT;
            });
            if (listBuffer.nonEmpty()) {
                ListBuffer listBuffer2 = (ListBuffer) listBuffer.map(option -> {
                    return ((DescribeAclsResult) option.get()).values();
                });
                scheduleWhenComplete(KafkaFuture.allOf((KafkaFuture[]) listBuffer2.toArray(ClassTag$.MODULE$.apply(KafkaFuture.class))), (Function0<Object>) () -> {
                    return this.updateAcls(listBuffer2);
                });
                tasksOutstanding_$eq(tasksOutstanding() + 1);
            }
        }
        return tasksOutstanding() == 0;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean updateAcls(ListBuffer<KafkaFuture<Collection<AclBinding>>> listBuffer) {
        tasksOutstanding_$eq(tasksOutstanding() - 1);
        Set set = (Set) Set$.MODULE$.apply(Nil$.MODULE$);
        listBuffer.foreach(kafkaFuture -> {
            return set.$plus$plus$eq(CollectionConverters$.MODULE$.CollectionHasAsScala((Collection) kafkaFuture.get()).asScala());
        });
        trace(() -> {
            return new StringBuilder(42).append("Result of describeAcls on source cluster: ").append(set).toString();
        });
        Set diff = currentAclSet().diff(set);
        Set diff2 = set.diff(currentAclSet());
        trace(() -> {
            return new StringBuilder(48).append("Removing following ACLs on destination cluster: ").append(diff).toString();
        });
        trace(() -> {
            return new StringBuilder(46).append("Adding following ACLs on destination cluster: ").append(diff2).toString();
        });
        clientManager().getAuthorizer().foreach(authorizer -> {
            $anonfun$updateAcls$5(this, diff2, diff, authorizer);
            return BoxedUnit.UNIT;
        });
        return tasksOutstanding() == 0;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean addAclsAndLogCreationWarnings(List<CompletableFuture<AclCreateResult>> list, List<AclBinding> list2, Set<AclBinding> set) {
        ((List) list.zip(list2)).foreach(tuple2 -> {
            $anonfun$addAclsAndLogCreationWarnings$1(this, set, tuple2);
            return BoxedUnit.UNIT;
        });
        currentAclSet().$plus$plus$eq(set);
        tasksOutstanding_$eq(tasksOutstanding() - 1);
        return tasksOutstanding() == 0;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean deleteAclsAndLogDeletionWarnings(List<CompletableFuture<AclDeleteResult>> list, Set<AclBinding> set, List<AclBindingFilter> list2) {
        ((List) list.zip(list2)).foreach(tuple2 -> {
            $anonfun$deleteAclsAndLogDeletionWarnings$1(this, set, tuple2);
            return BoxedUnit.UNIT;
        });
        currentAclSet().$minus$minus$eq(set);
        tasksOutstanding_$eq(tasksOutstanding() - 1);
        return tasksOutstanding() == 0;
    }

    public Set<AclBinding> getCurrentAclSet() {
        return currentAclSet();
    }

    public int currentTasksOutstanding() {
        return tasksOutstanding();
    }

    public static final /* synthetic */ void $anonfun$updateAcls$5(ClusterLinkSyncAcls clusterLinkSyncAcls, Set set, Set set2, Authorizer authorizer) {
        if (set.nonEmpty()) {
            try {
                List list = set.toList();
                Buffer buffer = (Buffer) CollectionConverters$.MODULE$.ListHasAsScala(authorizer.createAcls((AuthorizableRequestContext) null, CollectionConverters$.MODULE$.SeqHasAsJava(list).asJava(), Optional.empty())).asScala().map(completionStage -> {
                    return completionStage.toCompletableFuture();
                });
                clusterLinkSyncAcls.scheduleWhenComplete(CompletableFuture.allOf((CompletableFuture[]) buffer.toArray(ClassTag$.MODULE$.apply(CompletableFuture.class))), (Function0<Object>) () -> {
                    return clusterLinkSyncAcls.addAclsAndLogCreationWarnings(buffer.toList(), list, set);
                });
                clusterLinkSyncAcls.tasksOutstanding_$eq(clusterLinkSyncAcls.tasksOutstanding() + 1);
            } catch (Throwable th) {
                clusterLinkSyncAcls.warn(() -> {
                    return new StringBuilder(81).append("Unexpected error encountered while trying to create ACLs on destination cluster: ").append(th).toString();
                });
            }
        }
        if (set2.nonEmpty()) {
            try {
                List list2 = ((IterableOnceOps) set2.map(aclBinding -> {
                    return aclBinding.toFilter();
                })).toList();
                Buffer buffer2 = (Buffer) CollectionConverters$.MODULE$.ListHasAsScala(authorizer.deleteAcls((AuthorizableRequestContext) null, CollectionConverters$.MODULE$.SeqHasAsJava(list2).asJava(), Optional.empty())).asScala().map(completionStage2 -> {
                    return completionStage2.toCompletableFuture();
                });
                clusterLinkSyncAcls.scheduleWhenComplete(CompletableFuture.allOf((CompletableFuture[]) buffer2.toArray(ClassTag$.MODULE$.apply(CompletableFuture.class))), (Function0<Object>) () -> {
                    return clusterLinkSyncAcls.deleteAclsAndLogDeletionWarnings(buffer2.toList(), set2, list2);
                });
                clusterLinkSyncAcls.tasksOutstanding_$eq(clusterLinkSyncAcls.tasksOutstanding() + 1);
            } catch (Throwable th2) {
                clusterLinkSyncAcls.warn(() -> {
                    return new StringBuilder(81).append("Unexpected error encountered while trying to delete ACLs on destination cluster: ").append(th2).toString();
                });
            }
        }
    }

    public static final /* synthetic */ void $anonfun$addAclsAndLogCreationWarnings$1(ClusterLinkSyncAcls clusterLinkSyncAcls, Set set, Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError((Object) null);
        }
        CompletableFuture completableFuture = (CompletableFuture) tuple2._1();
        AclBinding aclBinding = (AclBinding) tuple2._2();
        try {
            AclCreateResult aclCreateResult = (AclCreateResult) completableFuture.get();
            if (!aclCreateResult.exception().isPresent()) {
                clusterLinkSyncAcls.metrics.aclsAddedSensor().record();
                return;
            }
            clusterLinkSyncAcls.warn(() -> {
                return new StringBuilder(64).append("Encountered the following exception while trying to create ACL: ").append(aclCreateResult.exception().get()).toString();
            });
            set.$minus$eq(aclBinding);
            clusterLinkSyncAcls.metrics.aclsAddFailedSensor().record();
        } catch (Throwable th) {
            clusterLinkSyncAcls.warn(() -> {
                return new StringBuilder(57).append("Unexpected error encountered while trying to create ACL: ").append(th).toString();
            });
            set.$minus$eq(aclBinding);
            clusterLinkSyncAcls.metrics.aclsAddFailedSensor().record();
        }
    }

    public static final /* synthetic */ void $anonfun$deleteAclsAndLogDeletionWarnings$2(ClusterLinkSyncAcls clusterLinkSyncAcls, Set set, AclDeleteResult.AclBindingDeleteResult aclBindingDeleteResult) {
        if (!aclBindingDeleteResult.exception().isPresent()) {
            clusterLinkSyncAcls.metrics.aclsDeletedSensor().record();
            return;
        }
        clusterLinkSyncAcls.warn(() -> {
            return new StringBuilder(64).append("Encountered the following exception while trying to delete ACL: ").append(aclBindingDeleteResult.exception().get()).toString();
        });
        set.$minus$eq(aclBindingDeleteResult.aclBinding());
        clusterLinkSyncAcls.metrics.aclsDeleteFailedSensor().record();
    }

    public static final /* synthetic */ void $anonfun$deleteAclsAndLogDeletionWarnings$1(ClusterLinkSyncAcls clusterLinkSyncAcls, Set set, Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError((Object) null);
        }
        CompletableFuture completableFuture = (CompletableFuture) tuple2._1();
        AclBindingFilter aclBindingFilter = (AclBindingFilter) tuple2._2();
        try {
            CollectionConverters$.MODULE$.CollectionHasAsScala(((AclDeleteResult) completableFuture.get()).aclBindingDeleteResults()).asScala().foreach(aclBindingDeleteResult -> {
                $anonfun$deleteAclsAndLogDeletionWarnings$2(clusterLinkSyncAcls, set, aclBindingDeleteResult);
                return BoxedUnit.UNIT;
            });
        } catch (Throwable th) {
            clusterLinkSyncAcls.warn(() -> {
                return new StringBuilder(57).append("Unexpected error encountered while trying to delete ACL: ").append(th).toString();
            });
            set.$minus$minus$eq((IterableOnce) set.filter(aclBinding -> {
                return BoxesRunTime.boxToBoolean(aclBindingFilter.matches(aclBinding));
            }));
            clusterLinkSyncAcls.metrics.aclsDeleteFailedSensor().record();
        }
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public ClusterLinkSyncAcls(ClusterLinkClientManager clusterLinkClientManager, KafkaController kafkaController, ClusterLinkMetrics clusterLinkMetrics) {
        super(clusterLinkClientManager.scheduler(), "ClusterLinkSyncAcls", Predef$.MODULE$.Integer2int(clusterLinkClientManager.currentConfig().aclSyncMs()));
        this.clientManager = clusterLinkClientManager;
        this.controller = kafkaController;
        this.metrics = clusterLinkMetrics;
        this.currentAclSet = (Set) Set$.MODULE$.apply(Nil$.MODULE$);
        this.tasksOutstanding = 0;
    }
}
