package kafka.link;

import java.io.File;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import kafka.log.LogConfig$;
import kafka.server.AbstractFetcherManager;
import kafka.server.DynamicConfig$Client$;
import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig$;
import kafka.server.LeaderEndPoint;
import kafka.server.link.ClusterLinkConfig$;
import kafka.server.link.ClusterLinkFactory;
import kafka.server.link.ClusterLinkFetcherManager;
import kafka.server.link.ClusterLinkFetcherThread;
import kafka.server.link.ClusterLinkFilterJson$;
import kafka.server.link.ClusterLinkLeaderEndPoint;
import kafka.server.link.ClusterLinkLeaderRequestBuilder;
import kafka.server.link.ClusterLinkMetadataManager;
import kafka.server.link.ClusterLinkTopicState;
import kafka.server.link.ConnectionMode$Outbound$;
import kafka.server.link.FetchResponseSize;
import kafka.server.link.LinkMode$Destination$;
import kafka.server.link.TopicLinkMirror$;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ClusterLinkDescription;
import org.apache.kafka.clients.admin.ClusterLinkListing;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.MirrorTopicDescription;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.config.provider.FileConfigProvider;
import org.apache.kafka.common.errors.ClusterLinkInUseException;
import org.apache.kafka.common.errors.ClusterLinkNotFoundException;
import org.apache.kafka.common.errors.ClusterLinkPausedException;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.InvalidPartitionsException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.TopicDeletionDisabledException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.common.replica.ReplicaStatus;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.GenTraversable;
import scala.collection.IterableLike;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.SeqLike;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.$colon;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Buffer$;
import scala.collection.mutable.HashMap;
import scala.collection.mutable.Map$;
import scala.jdk.CollectionConverters$;
import scala.math.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LongRef;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;
import scala.runtime.RichLong$;

/* compiled from: ClusterLinkIntegrationTest.scala */
@Tag("integration")
@ScalaSignature(bytes = "\u0006\u0001\u0011%b\u0001B!C\u0001\u001dCQ\u0001\u0014\u0001\u0005\u00025Cqa\u0014\u0001C\u0002\u0013\u0005\u0001\u000b\u0003\u0004X\u0001\u0001\u0006I!\u0015\u0005\b1\u0002\u0011\r\u0011\"\u0001Q\u0011\u0019I\u0006\u0001)A\u0005#\"9!\f\u0001b\u0001\n\u0003Y\u0006B\u00023\u0001A\u0003%A\fC\u0004f\u0001\t\u0007I\u0011\u00014\t\rE\u0004\u0001\u0015!\u0003h\u0011\u001d\u0011\bA1A\u0005\u0002\u0019Daa\u001d\u0001!\u0002\u00139\u0007\"\u0002;\u0001\t\u0003)\bbBA\u0019\u0001\u0011\u0005\u00111\u0007\u0005\b\u0003{\u0001A\u0011AA \u0011\u001d\tI\u0005\u0001C\u0001\u0003\u0017Bq!!\u0016\u0001\t\u0003\t9\u0006C\u0004\u0002b\u0001!\t!a\u0019\t\u000f\u00055\u0004\u0001\"\u0001\u0002p!9\u0011\u0011\u0010\u0001\u0005\u0002\u0005m\u0004bBAC\u0001\u0011\u0005\u0011q\u0011\u0005\b\u0003#\u0003A\u0011AAJ\u0011\u001d\ti\n\u0001C\u0001\u0003?Cq!!+\u0001\t\u0003\tY\u000bC\u0004\u00026\u0002!\t!a.\t\u000f\u0005\u0005\u0007\u0001\"\u0001\u0002D\"9\u0011Q\u001a\u0001\u0005\u0002\u0005=\u0007b\u0002B\u0019\u0001\u0011\u0005!1\u0007\u0005\b\u0005o\u0001A\u0011\u0002B\u001d\u0011\u001d\u0011\t\u0005\u0001C\u0005\u0005\u0007BqA!\u0019\u0001\t\u0003\u0011\u0019\u0007C\u0004\u0003n\u0001!\tAa\u001c\t\u000f\te\u0004\u0001\"\u0001\u0003|!9!Q\u0011\u0001\u0005\u0002\t\u001d\u0005b\u0002BI\u0001\u0011\u0005!1\u0013\u0005\b\u0005;\u0003A\u0011\u0001BP\u0011\u001d\u0011I\u000b\u0001C\u0001\u0005WCqA!.\u0001\t\u0003\u00119\fC\u0004\u0003B\u0002!\tAa1\t\u000f\t5\u0007\u0001\"\u0001\u0003P\"9!\u0011\u001c\u0001\u0005\u0002\tm\u0007b\u0002Bs\u0001\u0011\u0005!q\u001d\u0005\b\u0005c\u0004A\u0011\u0001Bz\u0011\u001d\u0011i\u0010\u0001C\t\u0005\u007fDqaa\u0001\u0001\t#\u0019)\u0001C\u0005\u0004\u0016\u0001\t\n\u0011\"\u0005\u0004\u0018!91Q\u0006\u0001\u0005\u0002\r=\u0002bBB\u001d\u0001\u0011\u000511\b\u0005\b\u0007\u000b\u0002A\u0011AB$\u0011\u001d\u0019\t\u0006\u0001C\u0001\u0007'Bqa!\u0018\u0001\t\u0003\u0019y\u0006C\u0004\u0004j\u0001!\taa\u001b\t\u000f\rU\u0004\u0001\"\u0001\u
0004x!91\u0011\u0011\u0001\u0005\u0002\r\r\u0005bBBG\u0001\u0011%1q\u0012\u0005\n\u0007+\u0003\u0011\u0013!C\u0005\u0007/Aqaa&\u0001\t\u0003\u0019I\nC\u0004\u0004$\u0002!\ta!*\t\u000f\r=\u0006\u0001\"\u0001\u00042\"911\u0018\u0001\u0005\u0002\ru\u0006bBBd\u0001\u0011\u00051\u0011\u001a\u0005\b\u0007'\u0004A\u0011ABk\u0011\u001d\u0019y\u000e\u0001C\u0001\u0007CDqaa;\u0001\t\u0003\u0019i\u000fC\u0004\u0005\u0010\u0001!I\u0001\"\u0005\u00035\rcWo\u001d;fe2Kgn[%oi\u0016<'/\u0019;j_:$Vm\u001d;\u000b\u0005\r#\u0015\u0001\u00027j].T\u0011!R\u0001\u0006W\u000647.Y\u0002\u0001'\t\u0001\u0001\n\u0005\u0002J\u00156\t!)\u0003\u0002L\u0005\n\u0011\u0013IY:ue\u0006\u001cGo\u00117vgR,'\u000fT5oW&sG/Z4sCRLwN\u001c+fgR\fa\u0001P5oSRtD#\u0001(\u0011\u0005%\u0003\u0011AD8gMN,G\u000fV8D_6l\u0017\u000e^\u000b\u0002#B\u0011!+V\u0007\u0002'*\tA+A\u0003tG\u0006d\u0017-\u0003\u0002W'\n!Aj\u001c8h\u0003=ygMZ:fiR{7i\\7nSR\u0004\u0013AC:z]\u000e\u0004VM]5pI\u0006Y1/\u001f8d!\u0016\u0014\u0018n\u001c3!\u00035\u0019wN\\:v[\u0016\u0014xI]8vaV\tA\f\u0005\u0002^E6\taL\u0003\u0002`A\u0006!A.\u00198h\u0015\u0005\t\u0017\u0001\u00026bm\u0006L!a\u00190\u0003\rM#(/\u001b8h\u00039\u0019wN\\:v[\u0016\u0014xI]8va\u0002\n1\u0002^8qS\u000e4\u0015\u000e\u001c;feV\tq\r\u0005\u0002i_:\u0011\u0011.\u001c\t\u0003UNk\u0011a\u001b\u0006\u0003Y\u001a\u000ba\u0001\u0010:p_Rt\u0014B\u00018T\u0003\u0019\u0001&/\u001a3fM&\u00111\r\u001d\u0006\u0003]N\u000bA\u0002^8qS\u000e4\u0015\u000e\u001c;fe\u0002\na#\u001b8dYV$W-\u00117m)>\u0004\u0018nY:GS2$XM]\u0001\u0018S:\u001cG.\u001e3f\u00032dGk\u001c9jGN4\u0015\u000e\u001c;fe\u0002\nQ\u0003^3ti\u000e\u0013X-\u0019;f\u001b&\u0014(o\u001c:U_BL7\r\u0006\u0002wsB\u0011!k^\u0005\u0003qN\u0013A!\u00168ji\")!\u0010\u0004a\u0001O\u00061\u0011/^8sk6Dc\u0001\u0004?\u0002\u0016\u0005]\u0001cA?\u0002\u00125\taPC\u0002��\u0003\u0003\t\u0001\u0002\u001d:pm&$WM\u001d\u0006\u0005\u0003\u0007\t)!\u0001\u0004qCJ\fWn\u001d\u0006\u0005\u0003\u000f\tI!A\u0004kkBLG/\u001a:\u000b\t\u0005-\u0011QB\u0
001\u0006UVt\u0017\u000e\u001e\u0006\u0003\u0003\u001f\t1a\u001c:h\u0013\r\t\u0019B \u0002\f-\u0006dW/Z*pkJ\u001cW-A\u0004tiJLgnZ:-\t\u0005e\u0011QD\u0011\u0003\u00037\t!A_6\"\u0005\u0005}\u0011!B6sC\u001a$\bf\u0002\u0007\u0002$\u0005-\u0012Q\u0006\t\u0005\u0003K\t9#\u0004\u0002\u0002\u0002%!\u0011\u0011FA\u0001\u0005E\u0001\u0016M]1nKR,'/\u001b>fIR+7\u000f^\u0001\u0005]\u0006lW-\t\u0002\u00020\u0005A2\u0010Z5ta2\f\u0017PT1nKvt\u0013/^8sk6l4\u0010M?\u0002WQ,7\u000f^\"sK\u0006$X-T5se>\u0014Hk\u001c9jG\u0006sGMV3sS\u001aL8k\\;sG\u0016$v\u000e]5d\u0013\u0012$2A^A\u001b\u0011\u0015QX\u00021\u0001hQ\u0019iA0!\u0006\u0002:1\"\u0011\u0011DA\u000fQ\u001di\u00111EA\u0016\u0003[\tq\u0004^3tiR\u0013\u0018M\\:bGRLwN\\:XSRDW*\u001b:s_J$v\u000e]5d)\r1\u0018\u0011\t\u0005\u0006u:\u0001\ra\u001a\u0015\u0007\u001dq\f)\"!\u0012-\t\u0005e\u0011Q\u0004\u0015\b\u001d\u0005\r\u00121FA\u0017\u0003\u0015\"Xm\u001d;Ti>\u0004X*\u001b:s_J$v\u000e]5d/&$\b.\u00138wC2LGMU3rk\u0016\u001cH\u000fF\u0002w\u0003\u001bBQA_\bA\u0002\u001dDca\u0004?\u0002\u0016\u0005EC\u0006BA\r\u0003;AsaDA\u0012\u0003W\ti#\u0001\buKN$8\u000b^8q\u001b&\u0014(o\u001c:\u0015\u0007Y\fI\u0006C\u0003{!\u0001\u0007q\r\u000b\u0004\u0011y\u0006U\u0011Q\f\u0017\u0005\u00033\ti\u0002K\u0004\u0011\u0003G\tY#!\f\u0002OQ,7\u000f^*u_Bl\u0015N\u001d:pe^KG\u000f[*pkJ\u001cWm\u00117vgR,'o\u00155vi\u0012|wO\u001c\u000b\u0004m\u0006\u0015\u0004\"\u0002>\u0012\u0001\u00049\u0007FB\t}\u0003+\tI\u0007\f\u0003\u0002\u001a\u0005u\u0001fB\t\u0002$\u0005-\u0012QF\u0001#i\u0016\u001cHo\u0011:fCR,\u0017I\u001c3EK2,G/Z!oIJ+7M]3bi\u0016d\u0015N\\6\u0015\u0007Y\f\t\bC\u0003{%\u0001\u0007q\r\u000b\u0004\u0013y\u0006U\u0011Q\u000f\u0017\u0005\u00033\ti\u0002K\u0004\u0013\u0003G\tY#!\f\u0002)Q,7\u000f^'jeJ|'OT3x%\u0016\u001cwN\u001d3t)\r1\u0018Q\u0010\u0005\u0006uN\u0001\ra\u001a\u0015\u0007'q\f)\"!!-\t\u0005e\u0011Q\u0004\u0015\b'\u0005\r\u00121FA\u0017\u0003e!Xm\u001d;NSJ\u0014xN]#ySN$\u0018N\\4SK\u000e|'\u000fZ:\u0015\u0007Y\fI\tC\u0003{)\u0001\u0007
q\r\u000b\u0004\u0015y\u0006U\u0011Q\u0012\u0017\u0005\u00033\ti\u0002K\u0004\u0015\u0003G\tY#!\f\u0002'Q,7\u000f\u001e+pa&\u001c7i\u001c8gS\u001e\u001c\u0016P\\2\u0015\u0007Y\f)\nC\u0003{+\u0001\u0007q\r\u000b\u0004\u0016y\u0006U\u0011\u0011\u0014\u0017\u0005\u00033\ti\u0002K\u0004\u0016\u0003G\tY#!\f\u0002-Q,7\u000f\u001e'jgR$Um]2sS\n,W*\u001b:s_J$2A^AQ\u0011\u0015Qh\u00031\u0001hQ\u00191B0!\u0006\u0002&2\"\u0011\u0011DA\u000fQ\u001d1\u00121EA\u0016\u0003[\ta\u0003^3tiN{WO]2f\u00072,8\u000f^3s#V|G/\u0019\u000b\u0004m\u00065\u0006\"\u0002>\u0018\u0001\u00049\u0007FB\f}\u0003+\t\t\f\f\u0003\u0002\u001a\u0005u\u0001fB\f\u0002$\u0005-\u0012QF\u0001 i\u0016\u001cH\u000fR3ti&t\u0017\r^5p]\u000ecWo\u001d;fe2Kgn[)v_R\fGc\u0001<\u0002:\")!\u0010\u0007a\u0001O\"2\u0001\u0004`A\u000b\u0003{cC!!\u0007\u0002\u001e!:\u0001$a\t\u0002,\u00055\u0012A\u000b;fgR$Um\u001d;j]\u0006$\u0018n\u001c8DYV\u001cH/\u001a:MS:\\'I]8lKJdUM^3m#V|G/\u0019\u000b\u0004m\u0006\u0015\u0007\"\u0002>\u001a\u0001\u00049\u0007FB\r}\u0003+\tI\r\f\u0003\u0002\u001a\u0005u\u0001fB\r\u0002$\u0005-\u0012QF\u0001\"m\u0016\u0014\u0018NZ=EKN$\u0018N\\1uS>t7\t\\;ti\u0016\u0014H*\u001b8l#V|G/\u0019\u000b\u0007\u0003#\fiNa\u0001\u0011\t\u0005M\u0017\u0011\\\u0007\u0003\u0003+T1!a6a\u0003\u0011)H/\u001b7\n\t\u0005m\u0017Q\u001b\u0002\u0005+VKE\tC\u0004\u0002`j\u0001\r!!9\u0002\u0013I,7o\\;sG\u0016\u001c\bCBAr\u0003S\fi/\u0004\u0002\u0002f*\u0019\u0011q]*\u0002\u0015\r|G\u000e\\3di&|g.\u0003\u0003\u0002l\u0006\u0015(aA*fcB!\u0011q^A��\u001b\t\t\tP\u0003\u0003\u0002t\u0006U\u0018AB2p]\u001aLwM\u0003\u0003\u0002x\u0006e\u0018AB2p[6|gNC\u0002F\u0003wTA!!@\u0002\u000e\u00051\u0011\r]1dQ\u0016LAA!\u0001\u0002r\nq1i\u001c8gS\u001e\u0014Vm]8ve\u000e,\u0007b\u0002B\u00035\u0001\u0007!qA\u0001\ncV|G/Y'pI\u0016\u0004BA!\u0003\u0003,9!!1\u0002B\u0013\u001d\u0011\u0011iA!\t\u000f\t\t=!q\u0004\b\u0005\u0005#\u0011iB\u0004\u0003\u0003\u0014\tma\u0002\u0002B\u000b\u00053q1A\u001bB\f\u0013\t\ty!\u0003\u0003\u0002~\u00065\u0011bA#\u00
02|&!\u0011q_A}\u0013\u0011\t\u00190!>\n\t\t\r\u0012\u0011_\u0001\nS:$XM\u001d8bYNLAAa\n\u0003*\u0005\u00012i\u001c8gYV,g\u000e^\"p]\u001aLwm\u001d\u0006\u0005\u0005G\t\t0\u0003\u0003\u0003.\t=\"\u0001F\"mkN$XM\u001d'j].\fVo\u001c;b\u001b>$WM\u0003\u0003\u0003(\t%\u0012a\u0004<fe&4\u00170U;pi\u0006lu\u000eZ3\u0015\u0007Y\u0014)\u0004C\u0004\u0003\u0006m\u0001\rAa\u0002\u0002A\u0011,7\u000f^\"mkN$XM\u001d'j].\u0014V\r\u001d7jG\u0006\u001cH\u000b\u001b:piRdW\r\u001a\u000b\u0003\u0005w\u00012A\u0015B\u001f\u0013\r\u0011yd\u0015\u0002\b\u0005>|G.Z1o\u0003]1XM]5gs\u001a+Go\u00195SKN\u0004xN\\:f'&TX\rF\u0003w\u0005\u000b\u0012I\u0005C\u0004\u0003Hu\u0001\r!!5\u0002\r1Lgn[%e\u0011\u001d\u0011Y%\ba\u0001\u0005\u001b\nA\"\u001a=qK\u000e$X\rZ*ju\u0016\u0004RA\u0015B(\u0005'J1A!\u0015T\u0005\u0019y\u0005\u000f^5p]B!!Q\u000bB/\u001b\t\u00119FC\u0002D\u00053R1Aa\u0017E\u0003\u0019\u0019XM\u001d<fe&!!q\fB,\u0005E1U\r^2i%\u0016\u001c\bo\u001c8tKNK'0Z\u00011i\u0016\u001cH\u000fR3ti&t\u0017\r^5p]\u000ecWo\u001d;fe2Kgn[)v_R\fw+\u001b;i\u0005J|7.\u001a:SKN$\u0018M\u001d;\u0015\u0007Y\u0014)\u0007C\u0003{=\u0001\u0007q\r\u000b\u0004\u001fy\u0006U!\u0011\u000e\u0017\u0005\u00033\ti\u0002K\u0004\u001f\u0003G\tY#!\f\u0002KQ,7\u000f\u001e#fgRLg.\u0019;j_:d\u0015m\u001a'j].4U\r^2iKJ$\u0006N]8ui2,Gc\u0001<\u0003r!)!p\ba\u0001O\"2q\u0004`A\u000b\u0005kbC!!\u0007\u0002\u001e!:q$a\t\u0002,\u00055\u0012!\u0005;fgR\fE\r\u001a)beRLG/[8ogR\u0019aO! 
\t\u000bi\u0004\u0003\u0019A4)\r\u0001b\u0018Q\u0003BAY\u0011\tI\"!\b)\u000f\u0001\n\u0019#a\u000b\u0002.\u0005YB/Z:u\u00032$XM]\"mkN$XM\u001d'j].\u001cuN\u001c4jON$2A\u001eBE\u0011\u0015Q\u0018\u00051\u0001hQ\u0019\tC0!\u0006\u0003\u000e2\"\u0011\u0011DA\u000fQ\u001d\t\u00131EA\u0016\u0003[\t\u0011\u0006^3ti>3gm]3u\u001b&<'/\u0019;j_:<\u0016\u000e\u001e5BI\u0012,GmQ8ogVlWM]$s_V\u0004Hc\u0001<\u0003\u0016\")!P\ta\u0001O\"2!\u0005`A\u000b\u00053cC!!\u0007\u0002\u001e!:!%a\t\u0002,\u00055\u0012!\t;fgR|eMZ:fi6KwM]1uS>tw+\u001b;i\u0003\u0012$W\r\u001a+pa&\u001cGc\u0001<\u0003\"\")!p\ta\u0001O\"21\u0005`A\u000b\u0005KcC!!\u0007\u0002\u001e!:1%a\t\u0002,\u00055\u0012\u0001\u0005;fgR$Um\u001d;SK\u0006$wJ\u001c7z)\r1(Q\u0016\u0005\u0006u\u0012\u0002\ra\u001a\u0015\u0007Iq\f)B!--\t\u0005e\u0011Q\u0004\u0015\bI\u0005\r\u00121FA\u0017\u0003q!Xm\u001d;EK2,G/Z\"mkN$XM\u001d'j].\u001cE.Z1okB$2A\u001eB]\u0011\u0015QX\u00051\u0001hQ\u0019)C0!\u0006\u0003>2\"\u0011\u0011DA\u000fQ\u001d)\u00131EA\u0016\u0003[\t\u0001\u0005^3ti6K'O]8sK\u0012$v\u000e]5d\u001b\u0006\u00148.\u001a3G_J$U\r\\3uKR\u0019aO!2\t\u000bi4\u0003\u0019A4)\r\u0019b\u0018Q\u0003BeY\u0011\tI\"!\b)\u000f\u0019\n\u0019#a\u000b\u0002.\u0005qA/Z:u!\u0006,8/\u001a+pa&\u001cGc\u0001<\u0003R\")!p\na\u0001O\"2q\u0005`A\u000b\u0005+dC!!\u0007\u0002\u001e!:q%a\t\u0002,\u00055\u0012\u0001\u0006;fgR\u0004\u0016-^:f\u00072,8\u000f^3s\u0019&t7\u000eF\u0002w\u0005;DQA\u001f\u0015A\u0002\u001dDc\u0001\u000b?\u0002\u0016\t\u0005H\u0006BA\r\u0003;As\u0001KA\u0012\u0003W\ti#A\tuKN$(+\u001a9mS\u000e\f7\u000b^1ukN$2A\u001eBu\u0011\u0015Q\u0018\u00061\u0001hQ\u0019IC0!\u0006\u0003n2\"\u0011\u0011DA\u000fQ\u001dI\u00131EA\u0016\u0003[\t\u0011\u0003^3ti\u0006+Ho\\'jeJ|'/\u001b8h)\r1(Q\u001f\u0005\u0006u*\u0002\ra\u001a\u0015\u0007Uq\f)B!?-\t\u0005e\u0011Q\u0004\u0015\bU\u0005\r\u00121FA\u0017\u0003=\tW\u000f^8NSJ\u0014xN\u001d+pa&\u001cGc\u0001<\u0004\u0002!)\u0001l\u000ba\u0001#\u0006iB-Z:u\u0019&t7\u000e\u0015:paN4uN]!vi>l\u0015N\u001d:pe
&tw\r\u0006\u0004\u0004\b\r51\u0011\u0003\t\u0005\u0003'\u001cI!\u0003\u0003\u0004\f\u0005U'A\u0003)s_B,'\u000f^5fg\"11q\u0002\u0017A\u0002\u001d\fA\u0002^8qS\u000e4\u0015\u000e\u001c;feND\u0011ba\u0005-!\u0003\u0005\rAa\u000f\u0002'\r|gNZ5hkJ,G*\u001b8l!J,g-\u001b=\u0002O\u0011,7\u000f\u001e'j].\u0004&o\u001c9t\r>\u0014\u0018)\u001e;p\u001b&\u0014(o\u001c:j]\u001e$C-\u001a4bk2$HEM\u000b\u0003\u00073QCAa\u000f\u0004\u001c-\u00121Q\u0004\t\u0005\u0007?\u0019I#\u0004\u0002\u0004\")!11EB\u0013\u0003%)hn\u00195fG.,GMC\u0002\u0004(M\u000b!\"\u00198o_R\fG/[8o\u0013\u0011\u0019Yc!\t\u0003#Ut7\r[3dW\u0016$g+\u0019:jC:\u001cW-A\u001auKN$H*Y:u\r\u0016$8\r[3e\u001f\u001a47/\u001a;Qe>lw\u000e^3e\u001b&\u0014(o\u001c:U_BL7\rR3tGJL\u0007\u000f^5p]R\u0019ao!\r\t\u000bit\u0003\u0019A4)\r9b\u0018QCB\u001bY\u0011\tI\"!\b)\u000f9\n\u0019#a\u000b\u0002.\u0005)D/Z:u\u0019\u0006\u001cHOR3uG\",Gm\u00144gg\u0016$h)Y5mK\u0012|e/\u001a:NSJ\u0014xN\u001d+pa&\u001cG)Z:de&\u0004H/[8o)\r18Q\b\u0005\u0006u>\u0002\ra\u001a\u0015\u0007_q\f)b!\u0011-\t\u0005e\u0011Q\u0004\u0015\b_\u0005\r\u00121FA\u0017\u0003)\"Xm\u001d;BkR|W*\u001b:s_JLgn\u001a(p\u001fZ,'\u000f\\1qa&tw\rV8qS\u000e4\u0015\u000e\u001c;feN$2A^B%\u0011\u0015Q\b\u00071\u0001hQ\u0019\u0001D0!\u0006\u0004N1\"\u0011\u0011DA\u000fQ\u001d\u0001\u00141EA\u0016\u0003[\tq\u0005^3ti\u0006+Ho\\'jeJ|'/\u001b8h\u00032dwn^:MS:\\7i\u001c8gS\u001e,\u0006\u000fZ1uKR\u0019ao!\u0016\t\u000bi\f\u0004\u0019A4)\rEb\u0018QCB-Y\u0011\tI\"!\b)\u000fE\n\u0019#a\u000b\u0002.\u0005!C/Z:u\u001b&\u0014(o\u001c:U_BL7mQ8v]R4uN]'vYRL\u0007\u000f\\3MS:\\7\u000fF\u0002w\u0007CBQA\u001f\u001aA\u0002\u001dDcA\r?\u0002\u0016\r\u0015D\u0006BA\r\u0003;AsAMA\u0012\u0003W\ti#A\u0012uKN$\u0018)\u001e;p\u001b&\u0014(o\u001c:j]\u001e,\u0006\u000fZ1uK\u0016C\u0018n\u001d;j]\u001ed\u0015N\\6\u0015\u0007Y\u001ci\u0007C\u0003{g\u0001\u0007q\r\u000b\u00044y\u0006U1\u0011\u000f\u0017\u0005\u00033\ti\u0002K\u00044\u0003G\tY#!\f\u0002MQ,7\u000f^!vi>l\u0015N\u001d:pe&tw-\u00113eS:<\u0017\
t\u001a3ji&|g.\u00197U_BL7\rF\u0002w\u0007sBQA\u001f\u001bA\u0002\u001dDc\u0001\u000e?\u0002\u0016\ruD\u0006BA\r\u0003;As\u0001NA\u0012\u0003W\ti#\u0001\u0011uKN$\u0018)\u001e;p\u001b&\u0014(o\u001c:j]\u001etu.\u0012=jgRLgn\u001a+pa&\u001cGc\u0001<\u0004\u0006\")!0\u000ea\u0001O\"2Q\u0007`A\u000b\u0007\u0013cC!!\u0007\u0002\u001e!:Q'a\t\u0002,\u00055\u0012A\r;fgRd\u0015m\u001d;GKR\u001c\u0007.\u001a3PM\u001a\u001cX\r^*u_B\u0004X\rZ'jeJ|'\u000fV8qS\u000e$Um]2sSB$\u0018n\u001c8\u0015\u0007Y\u001c\t\nC\u0005\u0004\u0014Z\u0002\n\u00111\u0001\u0003<\u00059\u0001O]8n_R,\u0017\u0001\u0010;fgRd\u0015m\u001d;GKR\u001c\u0007.\u001a3PM\u001a\u001cX\r^*u_B\u0004X\rZ'jeJ|'\u000fV8qS\u000e$Um]2sSB$\u0018n\u001c8%I\u00164\u0017-\u001e7uIE\nA\u0004^3ti\u0012+G.\u001a;f\u0003V$x.T5se>\u0014X\r\u001a+pa&\u001c7\u000fF\u0002w\u00077CQA\u001f\u001dA\u0002\u001dDc\u0001\u000f?\u0002\u0016\r}E\u0006BA\r\u0003;As\u0001OA\u0012\u0003W\ti#\u0001\u0012uKN$\u0018J\u001c;feZ\fGn\u00115b]\u001e,gi\u001c:QKJLw\u000eZ5d)\u0006\u001c8n\u001d\u000b\u0004m\u000e\u001d\u0006\"\u0002>:\u0001\u00049\u0007FB\u001d}\u0003+\u0019Y\u000b\f\u0003\u0002\u001a\u0005u\u0001fB\u001d\u0002$\u0005-\u0012QF\u0001\u001fi\u0016\u001cH/\u0012=uKJt\u0017\r\\5{K\u0012\u0004\u0016m]:x_J$7i\u001c8gS\u001e$2A^BZ\u0011\u0015Q(\b1\u0001hQ\u0019QD0!\u0006\u000482\"\u0011\u0011DA\u000fQ\u001dQ\u00141EA\u0016\u0003[\t\u0011\u0006^3ti6K'O]8s\r\u0006LGn\u001c<fe^CWM\\*pkJ\u001cW-S:V]\u00064\u0018-\u001b7bE2,Gc\u0001<\u0004@\")!p\u000fa\u0001O\"21\b`A\u000b\u0007\u0007dC!!\u0007\u0002\u001e!:1(a\t\u0002,\u00055\u0012\u0001\n;fgR\u001cE.^:uKJd\u0015N\\6NKR\fG-\u0019;b)>\u0004\u0018nY\"sK\u0006$\u0018n\u001c8\u0015\u0007Y\u001cY\rC\u0003{y\u0001\u0007q\r\u000b\u0004=y\u0006U1q\u001a\u0017\u0005\u00033\ti\u0002K\u0004=\u0003G\tY#!\f\u0002/Q,7\u000f\u001e#fg\u000e\u0014\u0018NY3DYV\u001cH/\u001a:MS:\\Gc\u0001<\u0004X\")!0\u0010a\u0001O\"2Q\b`A\u000b\u00077d#!!\u0007)\u000fu\n\u0019#a\u000b\u0002.\u00051C/Z:u\u00032$XM]\"mkN$XM\u001d
'j].<\u0016\u000e\u001e5J]Z\fG.\u001b3D_:4\u0017nZ:\u0015\u0007Y\u001c\u0019\u000fC\u0003{}\u0001\u0007q\r\u000b\u0004?y\u0006U1q\u001d\u0017\u0003\u00033AsAPA\u0012\u0003W\ti#A\rwKJLg-\u001f#fg\u000e\u0014\u0018NY3MS:\\7OU3tk2$H#\u0002<\u0004p\u0012-\u0001bBBy\u007f\u0001\u000711_\u0001\u000eI\u0016\u001cH\u000fT5oWN#\u0018\r^3\u0011\t\rUHQ\u0001\b\u0005\u0007o$\t!\u0004\u0002\u0004z*!11`B\u007f\u0003\u0015\tG-\\5o\u0015\u0011\u0019y0!?\u0002\u000f\rd\u0017.\u001a8ug&!A1AB}\u0003Y\u0019E.^:uKJd\u0015N\\6EKN\u001c'/\u001b9uS>t\u0017\u0002\u0002C\u0004\t\u0013\u0011\u0011\u0002T5oWN#\u0018\r^3\u000b\t\u0011\r1\u0011 \u0005\b\t\u001by\u0004\u0019ABz\u0003=\u0019x.\u001e:dK2Kgn[*uCR,\u0017!\b<fe&4\u0017pU1tY*\u000b\u0017m]\"p]\u001aLw-\u00128def\u0004H/\u001a3\u0015\u0007Y$\u0019\u0002C\u0004\u0003H\u0001\u0003\r!!5)\u000f\u0001!9\u0002b\t\u0005&A!A\u0011\u0004C\u0010\u001b\t!YB\u0003\u0003\u0005\u001e\u0005\u0015\u0011aA1qS&!A\u0011\u0005C\u000e\u0005\r!\u0016mZ\u0001\u0006m\u0006dW/Z\u0011\u0003\tO\t1\"\u001b8uK\u001e\u0014\u0018\r^5p]\u0002")
/* loaded from: input_file:kafka/link/ClusterLinkIntegrationTest.class */
public class ClusterLinkIntegrationTest extends AbstractClusterLinkIntegrationTest {
    // Fixed offset value (10) exposed through offsetToCommit(); how tests use it is not visible in this section.
    private final long offsetToCommit = 10;
    // Fixed period value (100) exposed through syncPeriod(); presumably milliseconds — TODO confirm against callers.
    private final long syncPeriod = 100;
    // Consumer group id exposed through consumerGroup().
    private final String consumerGroup = "testGroup";
    // JSON document with a single "topicFilters" entry: name = topic(), patternType "literal", filterType "include".
    // stripMargin() removes the leading whitespace and '|' margin marker from every line of the literal.
    // NOTE(review): topic() is invoked while this field is being initialised, i.e. during construction.
    private final String topicFilter = new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(181).append("|{\n        |\"topicFilters\": [\n        |  {\n        |     \"name\": \"").append(topic()).append("\",\n        |     \"patternType\": \"literal\",\n        |     \"filterType\": \"include\"\n        |  }\n        |]}\n        |").toString())).stripMargin();
    // Same JSON shape as topicFilter, but the single entry uses the name "*" (patternType "literal", filterType "include").
    private final String includeAllTopicsFilter = new StringOps(Predef$.MODULE$.augmentString("|{\n        |\"topicFilters\": [\n        |  {\n        |     \"name\": \"*\",\n        |     \"patternType\": \"literal\",\n        |     \"filterType\": \"include\"\n        |  }\n        |]}\n        |")).stripMargin();

    /**
     * Accessor for the {@code offsetToCommit} field of this test class.
     *
     * @return the value stored in {@code offsetToCommit} (initialised to 10)
     */
    public long offsetToCommit() {
        return offsetToCommit;
    }

    /**
     * Accessor for the {@code syncPeriod} field of this test class.
     *
     * @return the value stored in {@code syncPeriod} (initialised to 100)
     */
    public long syncPeriod() {
        return syncPeriod;
    }

    /**
     * Accessor for the {@code consumerGroup} field of this test class.
     *
     * @return the value stored in {@code consumerGroup} (initialised to {@code "testGroup"})
     */
    public String consumerGroup() {
        return consumerGroup;
    }

    /**
     * Accessor for the {@code topicFilter} field of this test class.
     *
     * @return the JSON filter string built at construction time that includes the topic
     *         returned by {@code topic()} as a literal pattern
     */
    public String topicFilter() {
        return topicFilter;
    }

    /**
     * Accessor for the {@code includeAllTopicsFilter} field of this test class.
     *
     * @return the JSON filter string whose single include entry uses the name {@code "*"}
     */
    public String includeAllTopicsFilter() {
        return includeAllTopicsFilter;
    }

    /**
     * Creates a source topic with a retention of "10000", creates a cluster link (adding the
     * link-prefix property when {@code clusterLinkPrefix()} is non-empty), mirrors the topic on
     * the destination cluster and then checks:
     * <ul>
     *   <li>partition count, replication factor and retention reported by the
     *       {@code CreateTopicsResult} for the (prefixed) mirror topic;</li>
     *   <li>the retention reported by {@code describeTopicConfig} on the destination;</li>
     *   <li>the prefix-count metric, only for a non-empty prefix with the "zk" quorum;</li>
     *   <li>that both the link listing and the link description selected by the synthetic
     *       predicates report exactly the (prefixed) mirror topic.</li>
     * </ul>
     *
     * @param str quorum mode supplied by {@code @ValueSource} ("zk" or "kraft")
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testCreateMirrorTopic(String str) {
        // Give the source topic an explicit retention so its propagation can be asserted below.
        Properties sourceTopicProps = new Properties();
        sourceTopicProps.put(LogConfig$.MODULE$.RetentionMsProp(), "10000");
        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(
                topic(),
                numPartitions(),
                replicationFactor(),
                sourceTopicProps,
                source.createTopic$default$5(),
                source.createTopic$default$6());

        // Only configure the prefix property when a prefix is actually in use.
        Properties linkProps = destLinkProps(destLinkProps$default$1());
        if (!clusterLinkPrefix().isEmpty()) {
            linkProps.setProperty(ClusterLinkConfig$.MODULE$.ClusterLinkPrefixProp(), clusterLinkPrefix());
        }
        createClusterLink(linkName(), linkProps, createClusterLink$default$3(), true);

        ClusterLinkTestHarness dest = destCluster();
        CreateTopicsResult mirrorResult = dest.linkTopic(
                topic(),
                replicationFactor(),
                linkName(),
                dest.linkTopic$default$4(),
                clusterLinkPrefix());

        // The mirror topic name is the link prefix followed by the source topic name.
        Assertions.assertEquals(numPartitions(), (Integer) mirrorResult.numPartitions(clusterLinkPrefix() + topic()).get());
        Assertions.assertEquals(replicationFactor(), (Integer) mirrorResult.replicationFactor(clusterLinkPrefix() + topic()).get());
        Assertions.assertEquals("10000", ((Config) mirrorResult.config(clusterLinkPrefix() + topic()).get()).get(LogConfig$.MODULE$.RetentionMsProp()).value());
        Assertions.assertEquals("10000", destCluster().describeTopicConfig(clusterLinkPrefix() + topic()).get(LogConfig$.MODULE$.RetentionMsProp()).value());

        if (!clusterLinkPrefix().isEmpty() && str.equals("zk")) {
            verifyLinkWithClusterLinkPrefixCountMetric(1, linkName(), None$.MODULE$);
        }

        // listClusterLinks: first listing accepted by the synthetic predicate must expose exactly the mirror topic.
        Object expectedListedTopics = Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new String[]{clusterLinkPrefix() + topic()}));
        ClusterLinkListing matchingListing = (ClusterLinkListing) ((IterableLike) destCluster().listClusterLinks(true).filter(listing -> {
            return BoxesRunTime.boxToBoolean($anonfun$testCreateMirrorTopic$1(this, listing));
        })).head();
        Object actualListedTopics = ((TraversableOnce) CollectionConverters$.MODULE$.collectionAsScalaIterableConverter((Collection) matchingListing.topics().get()).asScala()).toSet();
        Assertions.assertEquals(expectedListedTopics, actualListedTopics);

        // describeClusterLinks: same expectation for the first description accepted by its predicate.
        Object expectedDescribedTopics = Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new String[]{clusterLinkPrefix() + topic()}));
        ClusterLinkDescription matchingDescription = (ClusterLinkDescription) ((IterableLike) destCluster().describeClusterLinks(true).filter(description -> {
            return BoxesRunTime.boxToBoolean($anonfun$testCreateMirrorTopic$2(this, description));
        })).head();
        Object actualDescribedTopics = ((TraversableOnce) CollectionConverters$.MODULE$.collectionAsScalaIterableConverter((Collection) matchingDescription.topics().get()).asScala()).toSet();
        Assertions.assertEquals(expectedDescribedTopics, actualDescribedTopics);
    }

    /**
     * Creates a source topic, records its topic id, creates a cluster link (adding the
     * link-prefix property when {@code clusterLinkPrefix()} is non-empty) and mirrors the topic
     * on the destination cluster. Asserts the partition count and replication factor reported
     * for the (prefixed) mirror topic, then evaluates a synthetic predicate that receives the
     * recorded source topic id against the first destination broker's cluster link metadata
     * manager. In KRaft mode, metadata consistency is awaited on each cluster before it is read.
     *
     * @param str quorum mode supplied by {@code @ValueSource} ("zk" or "kraft"); the body itself
     *            branches on {@code isKraftTest()} rather than on this argument
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testCreateMirrorTopicAndVerifySourceTopicId(String str) {
        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(
                topic(),
                numPartitions(),
                replicationFactor(),
                source.createTopic$default$4(),
                source.createTopic$default$5(),
                source.createTopic$default$6());
        if (isKraftTest()) {
            TestUtils$.MODULE$.ensureConsistentKRaftMetadata(sourceCluster().brokers(), sourceCluster().controllerServer(), TestUtils$.MODULE$.ensureConsistentKRaftMetadata$default$3());
        }
        // Captured by the predicate lambda at the end of the test.
        Uuid sourceTopicId = sourceCluster().describeTopic(topic()).topicId();

        // Only configure the prefix property when a prefix is actually in use.
        Properties linkProps = destLinkProps(destLinkProps$default$1());
        if (!clusterLinkPrefix().isEmpty()) {
            linkProps.setProperty(ClusterLinkConfig$.MODULE$.ClusterLinkPrefixProp(), clusterLinkPrefix());
        }
        createClusterLink(linkName(), linkProps, createClusterLink$default$3(), true);

        ClusterLinkTestHarness dest = destCluster();
        CreateTopicsResult mirrorResult = dest.linkTopic(
                topic(),
                replicationFactor(),
                linkName(),
                dest.linkTopic$default$4(),
                clusterLinkPrefix());
        // The mirror topic name is the link prefix followed by the source topic name.
        Assertions.assertEquals(numPartitions(), (Integer) mirrorResult.numPartitions(clusterLinkPrefix() + topic()).get());
        Assertions.assertEquals(replicationFactor(), (Integer) mirrorResult.replicationFactor(clusterLinkPrefix() + topic()).get());
        if (isKraftTest()) {
            TestUtils$.MODULE$.ensureConsistentKRaftMetadata(destCluster().brokers(), destCluster().controllerServer(), TestUtils$.MODULE$.ensureConsistentKRaftMetadata$default$3());
        }

        // NOTE(review): the boolean returned by exists(...) is discarded, so unless the predicate
        // asserts internally this step verifies nothing. Consider wrapping it in assertTrue once
        // it is confirmed that the metadata manager is present in every quorum mode.
        KafkaBroker firstDestBroker = (KafkaBroker) destCluster().brokers().head();
        firstDestBroker.clusterLinkManager().clusterLinkMetadataManager().exists(metadataManager -> {
            return BoxesRunTime.boxToBoolean($anonfun$testCreateMirrorTopicAndVerifySourceTopicId$1(this, sourceTopicId, metadataManager));
        });
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testTransactionsWithMirrorTopic(String str) {
        Assumptions.assumeFalse(new StringOps(Predef$.MODULE$.augmentString(clusterLinkPrefix())).nonEmpty());
        String str2 = "anotherTopic";
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.createTopic("anotherTopic", numPartitions(), replicationFactor(), destCluster.createTopic$default$4(), destCluster.createTopic$default$5(), destCluster.createTopic$default$6());
        createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3(), true);
        ClusterLinkTestHarness destCluster2 = destCluster();
        destCluster2.linkTopic(topic(), replicationFactor(), linkName(), destCluster2.linkTopic$default$4(), destCluster2.linkTopic$default$5());
        Properties properties = new Properties();
        properties.setProperty("transactional.id", "test_txn");
        properties.setProperty("acks", "all");
        ClusterLinkTestHarness destCluster3 = destCluster();
        KafkaProducer<byte[], byte[]> createProducer = destCluster3.createProducer(destCluster3.createProducer$default$1(), destCluster3.createProducer$default$2(), properties);
        try {
            createProducer.initTransactions();
            Properties properties2 = new Properties();
            properties2.setProperty("group.id", "testGroup");
            properties2.setProperty("isolation.level", "read_committed");
            ClusterLinkTestHarness destCluster4 = destCluster();
            KafkaConsumer<byte[], byte[]> createConsumer = destCluster4.createConsumer(destCluster4.createConsumer$default$1(), destCluster4.createConsumer$default$2(), properties2, destCluster4.createConsumer$default$4());
            try {
                createConsumer.assign((Collection) CollectionConverters$.MODULE$.seqAsJavaListConverter(partitions(partitions$default$1())).asJava());
                Seq consumeRecords = TestUtils$.MODULE$.consumeRecords(createConsumer, producedRecords().size(), 20000L);
                Map map = (Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(((TraversableOnce) ((TraversableLike) CollectionConverters$.MODULE$.mapAsScalaMapConverter(createConsumer.endOffsets((Collection) CollectionConverters$.MODULE$.seqAsJavaListConverter(partitions(partitions$default$1())).asJava())).asScala()).map(tuple2 -> {
                    if (tuple2 == null) {
                        throw new MatchError((Object) null);
                    }
                    return new Tuple2((TopicPartition) tuple2._1(), new OffsetAndMetadata(Predef$.MODULE$.Long2long((Long) tuple2._2())));
                }, Map$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms())).asJava();
                createProducer.beginTransaction();
                consumeRecords.foreach(consumerRecord -> {
                    return createProducer.send(new ProducerRecord(str2, Predef$.MODULE$.int2Integer(consumerRecord.partition()), Predef$.MODULE$.long2Long(consumerRecord.timestamp()), consumerRecord.key(), consumerRecord.value()));
                });
                createProducer.sendOffsetsToTransaction(map, new ConsumerGroupMetadata("testGroup"));
                createProducer.commitTransaction();
                Assertions.assertEquals(map, createConsumer.committed((Set) CollectionConverters$.MODULE$.setAsJavaSetConverter(partitions(partitions$default$1()).toSet()).asJava()));
                createProducer.beginTransaction();
                ExecutionException executionException = (ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
                    this.produceRecords(createProducer, this.topic(), 1, this.produceRecords$default$4());
                });
                Assertions.assertTrue(executionException.getMessage().matches(".*Could not add partitions to transaction due to errors.*INVALID_REQUEST.*"), new StringBuilder(17).append("Unexpected error ").append(executionException.getMessage()).toString());
                createProducer.abortTransaction();
                producedRecords().clear();
                ClusterLinkTestHarness destCluster5 = destCluster();
                destCluster5.unlinkTopic(topic(), linkName(), destCluster5.unlinkTopic$default$3(), false);
                TestUtils$ testUtils$ = TestUtils$.MODULE$;
                long waitUntilTrue$default$3 = TestUtils$.MODULE$.waitUntilTrue$default$3();
                long waitUntilTrue$default$4 = TestUtils$.MODULE$.waitUntilTrue$default$4();
                if (testUtils$ == null) {
                    throw null;
                }
                long currentTimeMillis = System.currentTimeMillis();
                while (!$anonfun$testTransactionsWithMirrorTopic$4(this)) {
                    if (System.currentTimeMillis() > currentTimeMillis + waitUntilTrue$default$3) {
                        Assertions.fail($anonfun$testTransactionsWithMirrorTopic$8());
                    }
                    Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$3), waitUntilTrue$default$4));
                }
                createProducer.beginTransaction();
                produceRecords(createProducer, topic(), 10, produceRecords$default$4());
                createProducer.commitTransaction();
                consumeRecords(createConsumer, consumeRecords$default$2());
                createConsumer.close();
            } catch (Throwable th) {
                createConsumer.close();
                throw th;
            }
        } finally {
            createProducer.close();
        }
    }

    /**
     * Exercises the rejection paths of stopping (unlinking) a mirror topic, plus one successful stop.
     *
     * <p>In order: unlinking a destination topic that does not exist must raise
     * {@code UnknownTopicOrPartitionException}; unlinking a topic that was created directly on the
     * destination cluster (not through {@code linkTopic}) must raise {@code InvalidRequestException};
     * unlinking a linked topic must bring it to {@code STOPPED}; and unlinking that topic a second
     * time must raise {@code InvalidRequestException} again.
     *
     * @param str value supplied by {@code @ValueSource} ("zk" or "kraft"); not read by this method
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testStopMirrorTopicWithInvalidRequest(String str) {
        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(topic(), numPartitions(), replicationFactor(), source.createTopic$default$4(), source.createTopic$default$5(), source.createTopic$default$6());
        produceToSourceCluster(100);

        // Only propagate the prefix to the link configuration when one is configured.
        Properties linkProps = destLinkProps(destLinkProps$default$1());
        if (new StringOps(Predef$.MODULE$.augmentString(clusterLinkPrefix())).nonEmpty()) {
            linkProps.setProperty(ClusterLinkConfig$.MODULE$.ClusterLinkPrefixProp(), clusterLinkPrefix());
        }
        createClusterLink(linkName(), linkProps, createClusterLink$default$3(), createClusterLink$default$4());

        // Case 1: the (prefixed) topic does not exist on the destination yet.
        Assertions.assertThrows(UnknownTopicOrPartitionException.class,
            () -> destCluster().unlinkTopic(clusterLinkPrefix() + topic(), linkName(), false, false));

        // Case 2: the topic exists but was created directly, not via linkTopic.
        ClusterLinkTestHarness destForPlainTopic = destCluster();
        destForPlainTopic.createTopic(clusterLinkPrefix() + topic(), numPartitions(), replicationFactor(), destForPlainTopic.createTopic$default$4(), destForPlainTopic.createTopic$default$5(), destForPlainTopic.createTopic$default$6());
        Assertions.assertThrows(InvalidRequestException.class,
            () -> destCluster().unlinkTopic(clusterLinkPrefix() + topic(), linkName(), false, false));
        destCluster().deleteTopic(clusterLinkPrefix() + topic(), true);

        // Case 3: a linked topic can be stopped once ...
        ClusterLinkTestHarness destForLink = destCluster();
        destForLink.linkTopic(topic(), replicationFactor(), linkName(), destForLink.linkTopic$default$4(), clusterLinkPrefix());
        ClusterLinkTestHarness destForStop = destCluster();
        destForStop.unlinkTopic(clusterLinkPrefix() + topic(), linkName(), destForStop.unlinkTopic$default$3(), destForStop.unlinkTopic$default$4());
        waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.STOPPED, clusterLinkPrefix() + topic());
        verifyTimeToStopMirrorTopicPromoteMetric(linkName());

        // ... but a second stop request on the already-stopped topic is rejected.
        Assertions.assertThrows(InvalidRequestException.class,
            () -> destCluster().unlinkTopic(clusterLinkPrefix() + topic(), linkName(), false, false));

        // Cleanup.
        destCluster().deleteTopic(clusterLinkPrefix() + topic(), true);
        ClusterLinkTestHarness destForCleanup = destCluster();
        destForCleanup.deleteClusterLink(linkName(), destForCleanup.deleteClusterLink$default$2(), destForCleanup.deleteClusterLink$default$3());
    }

    /**
     * Stops a mirror topic twice, once per {@code unlinkTopic} variant, and checks the matching metric.
     *
     * <p>Round one passes an explicit {@code false} as the fourth {@code unlinkTopic} argument and
     * then verifies the failover metric. Round two re-links the topic, uses the default fourth
     * argument, and verifies the promote metric. Both rounds wait for the {@code STOPPED} state in
     * the mirror description and in the replica status.
     *
     * @param str value supplied by {@code @ValueSource} ("zk" or "kraft"); not read by this method
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testStopMirror(String str) {
        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(topic(), numPartitions(), replicationFactor(), source.createTopic$default$4(), source.createTopic$default$5(), source.createTopic$default$6());
        produceToSourceCluster(100);

        // Only propagate the prefix to the link configuration when one is configured.
        Properties linkProps = destLinkProps(destLinkProps$default$1());
        if (new StringOps(Predef$.MODULE$.augmentString(clusterLinkPrefix())).nonEmpty()) {
            linkProps.setProperty(ClusterLinkConfig$.MODULE$.ClusterLinkPrefixProp(), clusterLinkPrefix());
        }
        createClusterLink(linkName(), linkProps, createClusterLink$default$3(), createClusterLink$default$4());

        // Round 1: stop with an explicit `false` fourth argument, then expect the failover metric.
        ClusterLinkTestHarness destForFirstLink = destCluster();
        destForFirstLink.linkTopic(topic(), replicationFactor(), linkName(), destForFirstLink.linkTopic$default$4(), clusterLinkPrefix());
        ClusterLinkTestHarness destForFirstStop = destCluster();
        destForFirstStop.unlinkTopic(clusterLinkPrefix() + topic(), linkName(), destForFirstStop.unlinkTopic$default$3(), false);
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED);
        waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.STOPPED, clusterLinkPrefix() + topic());
        verifyTimeToStopMirrorTopicFailoverMetric(linkName());
        destCluster().deleteTopic(clusterLinkPrefix() + topic(), true);

        // Round 2: re-link, stop with the default fourth argument, then expect the promote metric.
        ClusterLinkTestHarness destForSecondLink = destCluster();
        destForSecondLink.linkTopic(topic(), replicationFactor(), linkName(), destForSecondLink.linkTopic$default$4(), clusterLinkPrefix());
        ClusterLinkTestHarness destForSecondStop = destCluster();
        destForSecondStop.unlinkTopic(clusterLinkPrefix() + topic(), linkName(), destForSecondStop.unlinkTopic$default$3(), destForSecondStop.unlinkTopic$default$4());
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED);
        waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.STOPPED, clusterLinkPrefix() + topic());
        verifyTimeToStopMirrorTopicPromoteMetric(linkName());

        // Cleanup.
        destCluster().deleteTopic(clusterLinkPrefix() + topic(), true);
        ClusterLinkTestHarness destForCleanup = destCluster();
        destForCleanup.deleteClusterLink(linkName(), destForCleanup.deleteClusterLink$default$2(), destForCleanup.deleteClusterLink$default$3());
    }

    /**
     * Verifies how stopping a mirror topic behaves while every source-cluster broker is down.
     *
     * <p>The test only proceeds when {@code clusterLinkPrefix()} is empty. It runs three scenarios
     * against the same link:
     * <ol>
     *   <li>source killed, {@code unlinkTopic} with an explicit {@code false} fourth argument:
     *       the mirror reaches {@code STOPPED} and the failover metric is verified;</li>
     *   <li>source killed, {@code unlinkTopic} with default arguments: the mirror stays in
     *       {@code PENDING_STOPPED} / {@code SOURCE_UNAVAILABLE} until the source is restarted,
     *       then reaches {@code STOPPED} and the promote metric is verified;</li>
     *   <li>source killed, {@code unlinkTopic} with default arguments leaves the mirror pending,
     *       and a follow-up {@code unlinkTopic} with {@code false} moves it to {@code STOPPED};
     *       the failover metric is verified.</li>
     * </ol>
     *
     * @param str value supplied by {@code @ValueSource} ("zk" or "kraft"); not read by this method
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testStopMirrorWithSourceClusterShutdown(String str) {
        // Abort (skip) the test when a cluster link prefix is configured.
        Assumptions.assumeFalse(new StringOps(Predef$.MODULE$.augmentString(clusterLinkPrefix())).nonEmpty());
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        produceToSourceCluster(100);
        // Short client timeouts and an aggressive availability check (100 ms interval, 2 consecutive
        // failures) -- presumably so the dead source is detected quickly; confirm against ClusterLinkConfig.
        Properties destLinkProps = destLinkProps(destLinkProps$default$1());
        destLinkProps.setProperty("request.timeout.ms", "1000");
        destLinkProps.setProperty("default.api.timeout.ms", "1000");
        destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.AvailabilityCheckMsProp(), "100");
        destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.AvailabilityCheckConsecutiveFailureThresholdProp(), "2");
        createClusterLink(linkName(), destLinkProps, createClusterLink$default$3(), createClusterLink$default$4());
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.linkTopic(topic(), replicationFactor(), linkName(), destCluster.linkTopic$default$4(), destCluster.linkTopic$default$5());

        // Scenario 1: source down, unlink with explicit `false` -> STOPPED without the source.
        sourceCluster().killAllBrokers();
        ClusterLinkTestHarness destCluster2 = destCluster();
        destCluster2.unlinkTopic(topic(), linkName(), destCluster2.unlinkTopic$default$3(), false);
        waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.STOPPED, waitUntilMirrorState$default$2());
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED);
        verifyTimeToStopMirrorTopicFailoverMetric(linkName());

        // Scenario 2: source down, unlink with defaults -> pending until the source returns.
        restartSource$1();
        restartMirrorTopic$1();
        sourceCluster().killAllBrokers();
        ClusterLinkTestHarness destCluster3 = destCluster();
        destCluster3.unlinkTopic(topic(), linkName(), destCluster3.unlinkTopic$default$3(), destCluster3.unlinkTopic$default$4());
        // Fixed one-second pause before checking that the mirror is (still) pending.
        Thread.sleep(1000L);
        waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.PENDING_STOPPED, waitUntilMirrorState$default$2());
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.SOURCE_UNAVAILABLE);
        restartSource$1();
        waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.STOPPED, waitUntilMirrorState$default$2());
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED);
        verifyTimeToStopMirrorTopicPromoteMetric(linkName());

        // Scenario 3: source down, unlink with defaults stays pending; a second unlink with
        // `false` then completes the stop while the source is still down.
        // NOTE(review): restartSource$1() is invoked here although the source was already
        // restarted a few lines above -- confirm whether the helper is idempotent or this is redundant.
        restartSource$1();
        restartMirrorTopic$1();
        sourceCluster().killAllBrokers();
        ClusterLinkTestHarness destCluster4 = destCluster();
        destCluster4.unlinkTopic(topic(), linkName(), destCluster4.unlinkTopic$default$3(), destCluster4.unlinkTopic$default$4());
        Thread.sleep(1000L);
        waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.PENDING_STOPPED, waitUntilMirrorState$default$2());
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.SOURCE_UNAVAILABLE);
        ClusterLinkTestHarness destCluster5 = destCluster();
        destCluster5.unlinkTopic(topic(), linkName(), destCluster5.unlinkTopic$default$3(), false);
        waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.STOPPED, waitUntilMirrorState$default$2());
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED);
        verifyTimeToStopMirrorTopicFailoverMetric(linkName());

        // Cleanup on the destination cluster.
        destCluster().deleteTopic(topic(), true);
        ClusterLinkTestHarness destCluster6 = destCluster();
        destCluster6.deleteClusterLink(linkName(), destCluster6.deleteClusterLink$default$2(), destCluster6.deleteClusterLink$default$3());
    }

    /**
     * Creates a cluster link, deletes it, recreates it under the same name, and checks that each
     * incarnation is listed/described correctly and mirrors data.
     *
     * <p>The test only proceeds when {@code clusterLinkPrefix()} is empty. After the second
     * incarnation is deleted with {@code deleteLinkNoVerify} and all destination brokers are
     * restarted, it polls a condition on the broker's cluster link metadata manager for the second
     * link id. When {@code useSourceInitiatedLink()} is false it finally checks that link validation
     * fails with {@code SaslAuthenticationException} for a deliberately wrong JAAS config.
     *
     * @param str value supplied by {@code @ValueSource} ("zk" or "kraft"); not read by this method
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testCreateAndDeleteAndRecreateLink(String str) {
        // Holds the (last computed value, success flag) pair produced by the polling loop below.
        Tuple2 $minus$greater$extension;
        // Abort (skip) the test when a cluster link prefix is configured.
        Assumptions.assumeFalse(new StringOps(Predef$.MODULE$.augmentString(clusterLinkPrefix())).nonEmpty());
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        // First incarnation of the link.
        UUID createClusterLink = createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4());
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.linkTopic(topic(), replicationFactor(), linkName(), destCluster.linkTopic$default$4(), destCluster.linkTopic$default$5());
        // Both the listing and the description selected by the (not visible here) filter predicates
        // must report exactly {topic()} as the link's topics.
        Assertions.assertEquals(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new String[]{topic()})), ((TraversableOnce) CollectionConverters$.MODULE$.collectionAsScalaIterableConverter((Collection) ((ClusterLinkListing) ((IterableLike) destCluster().listClusterLinks(true).filter(clusterLinkListing -> {
            return BoxesRunTime.boxToBoolean($anonfun$testCreateAndDeleteAndRecreateLink$1(this, clusterLinkListing));
        })).head()).topics().get()).asScala()).toSet());
        Assertions.assertEquals(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new String[]{topic()})), ((TraversableOnce) CollectionConverters$.MODULE$.collectionAsScalaIterableConverter((Collection) ((ClusterLinkDescription) ((IterableLike) destCluster().describeClusterLinks(true).filter(clusterLinkDescription -> {
            return BoxesRunTime.boxToBoolean($anonfun$testCreateAndDeleteAndRecreateLink$2(this, clusterLinkDescription));
        })).head()).topics().get()).asScala()).toSet());
        produceToSourceCluster(20);
        waitAndVerifyMetricsAndMirror(topic(), createClusterLink);
        // Tear down the first incarnation; no links may remain on the destination.
        destCluster().deleteTopic(topic(), true);
        ClusterLinkTestHarness destCluster2 = destCluster();
        destCluster2.deleteClusterLink(linkName(), destCluster2.deleteClusterLink$default$2(), destCluster2.deleteClusterLink$default$3());
        Assertions.assertTrue(destCluster().listClusterLinks(true).isEmpty());
        Assertions.assertTrue(destCluster().describeClusterLinks(true).isEmpty());
        // Bounce the first destination broker.
        KafkaBroker kafkaBroker = (KafkaBroker) destCluster().brokers().head();
        destCluster().killBrokerById(kafkaBroker.config().brokerId());
        ClusterLinkTestHarness destCluster3 = destCluster();
        destCluster3.restartDeadBrokers(destCluster3.restartDeadBrokers$default$1());
        // Inlined TestUtils.computeUntilTrue: recompute a Seq from the bounced broker until the
        // predicate accepts it or the timeout (default$2) elapses; what the Seq contains is
        // defined in the $anonfun$...$3 helper, which is outside this block.
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long computeUntilTrue$default$2 = TestUtils$.MODULE$.computeUntilTrue$default$2();
        long computeUntilTrue$default$3 = TestUtils$.MODULE$.computeUntilTrue$default$3();
        // Decompiler-emitted null guard on the singleton.
        if (testUtils$ == null) {
            throw null;
        }
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            Seq $anonfun$testCreateAndDeleteAndRecreateLink$3 = $anonfun$testCreateAndDeleteAndRecreateLink$3(kafkaBroker);
            if ($anonfun$testCreateAndDeleteAndRecreateLink$4($anonfun$testCreateAndDeleteAndRecreateLink$3)) {
                // Predicate satisfied: (value, true).
                $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc($anonfun$testCreateAndDeleteAndRecreateLink$3), BoxesRunTime.boxToBoolean(true));
                break;
            } else {
                if (System.currentTimeMillis() > currentTimeMillis + computeUntilTrue$default$2) {
                    // Timed out: (last value, false). No failure is raised here; the assertion
                    // after the loop decides based on the value alone.
                    $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc($anonfun$testCreateAndDeleteAndRecreateLink$3), BoxesRunTime.boxToBoolean(false));
                    break;
                }
                // Sleep for the smaller of the timeout and the pause interval before retrying.
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(computeUntilTrue$default$2), computeUntilTrue$default$3));
            }
        }
        if ($minus$greater$extension == null) {
            throw new MatchError((Object) null);
        }
        // The computed Seq must be empty.
        Assertions.assertEquals(Nil$.MODULE$, (Seq) $minus$greater$extension._1());
        // For source-initiated links the source side holds a link object too; remove it as well.
        if (useSourceInitiatedLink()) {
            ClusterLinkTestHarness sourceCluster2 = sourceCluster();
            sourceCluster2.deleteClusterLink(linkName(), sourceCluster2.deleteClusterLink$default$2(), sourceCluster2.deleteClusterLink$default$3());
            Assertions.assertTrue(sourceCluster().listClusterLinks(true).isEmpty());
            Assertions.assertTrue(sourceCluster().describeClusterLinks(true).isEmpty());
        }
        // Second incarnation under the same link name; same checks as for the first.
        UUID createClusterLink2 = createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4());
        ClusterLinkTestHarness destCluster4 = destCluster();
        destCluster4.linkTopic(topic(), replicationFactor(), linkName(), destCluster4.linkTopic$default$4(), destCluster4.linkTopic$default$5());
        Assertions.assertEquals(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new String[]{topic()})), ((TraversableOnce) CollectionConverters$.MODULE$.collectionAsScalaIterableConverter((Collection) ((ClusterLinkListing) ((IterableLike) destCluster().listClusterLinks(true).filter(clusterLinkListing2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$testCreateAndDeleteAndRecreateLink$5(this, clusterLinkListing2));
        })).head()).topics().get()).asScala()).toSet());
        Assertions.assertEquals(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new String[]{topic()})), ((TraversableOnce) CollectionConverters$.MODULE$.collectionAsScalaIterableConverter((Collection) ((ClusterLinkDescription) ((IterableLike) destCluster().describeClusterLinks(true).filter(clusterLinkDescription2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$testCreateAndDeleteAndRecreateLink$6(this, clusterLinkDescription2));
        })).head()).topics().get()).asScala()).toSet());
        produceToSourceCluster(20);
        waitAndVerifyMetricsAndMirror(topic(), createClusterLink2);
        // Delete the second incarnation without verification, then restart every destination broker.
        destCluster().deleteLinkNoVerify(linkName(), true);
        destCluster().killAllBrokers();
        ClusterLinkTestHarness destCluster5 = destCluster();
        destCluster5.restartDeadBrokers(destCluster5.restartDeadBrokers$default$1());
        destCluster().updateBootstrapServers();
        // NOTE(review): kafkaBroker was captured before the brokers were killed and restarted;
        // this assumes the harness restarts the same broker object -- confirm.
        Option clusterLinkMetadataManager = kafkaBroker.clusterLinkManager().clusterLinkMetadataManager();
        Assertions.assertTrue(clusterLinkMetadataManager.isDefined());
        // Inlined TestUtils.waitUntilTrue: poll the condition on the metadata manager for the
        // second link id, failing the test once the timeout (default$3) elapses.
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        long waitUntilTrue$default$3 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$4 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$2 == null) {
            throw null;
        }
        long currentTimeMillis2 = System.currentTimeMillis();
        while (!$anonfun$testCreateAndDeleteAndRecreateLink$7(clusterLinkMetadataManager, createClusterLink2)) {
            if (System.currentTimeMillis() > currentTimeMillis2 + waitUntilTrue$default$3) {
                Assertions.fail($anonfun$testCreateAndDeleteAndRecreateLink$8(createClusterLink2));
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$3), waitUntilTrue$default$4));
        }
        // The credential check below is skipped for source-initiated links.
        if (useSourceInitiatedLink()) {
            return;
        }
        // Corrupt the JAAS config by replacing the link name with "wrong-<linkName>" and expect
        // link validation to fail authentication.
        Properties destLinkProps = destLinkProps(destLinkProps$default$1());
        destLinkProps.setProperty("sasl.jaas.config", destLinkProps.getProperty("sasl.jaas.config").replace(linkName(), new StringBuilder(6).append("wrong-").append(linkName()).toString()));
        verifyValidateLinkFailure(destLinkProps, SaslAuthenticationException.class, "Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-256");
    }

    /**
     * Links a topic first and produces afterwards, then verifies metrics and the mirrored data.
     *
     * <p>When the parameter equals {@code "zk"} the test additionally calls
     * {@code verifySaslJaasConfigEncrypted} for the created link id.
     *
     * @param str value supplied by {@code @ValueSource} ("zk" or "kraft")
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testMirrorNewRecords(String str) {
        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(topic(), numPartitions(), replicationFactor(), source.createTopic$default$4(), source.createTopic$default$5(), source.createTopic$default$6());

        // Only propagate the prefix to the link configuration when one is configured.
        Properties linkProps = destLinkProps(destLinkProps$default$1());
        if (new StringOps(Predef$.MODULE$.augmentString(clusterLinkPrefix())).nonEmpty()) {
            linkProps.setProperty(ClusterLinkConfig$.MODULE$.ClusterLinkPrefixProp(), clusterLinkPrefix());
        }
        UUID linkId = createClusterLink(linkName(), linkProps, createClusterLink$default$3(), createClusterLink$default$4());

        ClusterLinkTestHarness destForLink = destCluster();
        destForLink.linkTopic(topic(), replicationFactor(), linkName(), destForLink.linkTopic$default$4(), clusterLinkPrefix());

        // Records are produced only after the mirror topic exists.
        produceToSourceCluster(20);
        consume(sourceCluster(), "");
        waitAndVerifyMetricsAndMirror(topic(), linkId);

        if (str.equals("zk")) {
            verifySaslJaasConfigEncrypted(linkId);
        }

        ClusterLinkTestHarness destForCleanup = destCluster();
        destForCleanup.deleteClusterLink(linkName(), destForCleanup.deleteClusterLink$default$2(), destForCleanup.deleteClusterLink$default$3());
    }

    /**
     * Produces to the source topic before the link and mirror topic exist, then verifies that
     * metrics and the mirrored data are as expected once the topic is linked.
     *
     * @param str value supplied by {@code @ValueSource} ("zk" or "kraft"); not read by this method
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testMirrorExistingRecords(String str) {
        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(topic(), numPartitions(), replicationFactor(), source.createTopic$default$4(), source.createTopic$default$5(), source.createTopic$default$6());

        // Records exist on the source before any link is created.
        produceToSourceCluster(20);

        // Only propagate the prefix to the link configuration when one is configured.
        Properties linkProps = destLinkProps(destLinkProps$default$1());
        if (new StringOps(Predef$.MODULE$.augmentString(clusterLinkPrefix())).nonEmpty()) {
            linkProps.setProperty(ClusterLinkConfig$.MODULE$.ClusterLinkPrefixProp(), clusterLinkPrefix());
        }
        UUID linkId = createClusterLink(linkName(), linkProps, createClusterLink$default$3(), createClusterLink$default$4());

        ClusterLinkTestHarness dest = destCluster();
        dest.linkTopic(topic(), replicationFactor(), linkName(), dest.linkTopic$default$4(), clusterLinkPrefix());
        waitAndVerifyMetricsAndMirror(topic(), linkId);
    }

    /**
     * Alters a topic config on the source after the topic is linked and waits for a condition
     * (defined in a helper outside this method) to hold, then verifies mirroring and metrics.
     *
     * <p>The link is created with {@code metadata.max.age.ms=1000}; the source topic is then
     * altered to {@code delete.retention.ms=80000000}.
     *
     * @param str value supplied by {@code @ValueSource} ("zk" or "kraft"); not read by this method
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testTopicConfigSync(String str) {
        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(topic(), numPartitions(), replicationFactor(), source.createTopic$default$4(), source.createTopic$default$5(), source.createTopic$default$6());
        produceToSourceCluster(20);

        Properties linkProps = destLinkProps((scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2("metadata.max.age.ms", "1000")})));
        if (new StringOps(Predef$.MODULE$.augmentString(clusterLinkPrefix())).nonEmpty()) {
            linkProps.setProperty(ClusterLinkConfig$.MODULE$.ClusterLinkPrefixProp(), clusterLinkPrefix());
        }
        UUID linkId = createClusterLink(linkName(), linkProps, createClusterLink$default$3(), createClusterLink$default$4());

        ClusterLinkTestHarness dest = destCluster();
        dest.linkTopic(topic(), replicationFactor(), linkName(), dest.linkTopic$default$4(), clusterLinkPrefix());

        sourceCluster().alterTopic(topic(), (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2("delete.retention.ms", "80000000")})));

        // Inlined TestUtils.waitUntilTrue with its default timeout and pause.
        TestUtils$ testUtils = TestUtils$.MODULE$;
        long timeoutMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long pauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
        // Decompiler-emitted null guard on the singleton, kept as-is.
        if (testUtils == null) {
            throw null;
        }
        long startMs = System.currentTimeMillis();
        while (true) {
            if ($anonfun$testTopicConfigSync$1(this)) {
                break;
            }
            if (System.currentTimeMillis() > startMs + timeoutMs) {
                Assertions.fail($anonfun$testTopicConfigSync$2());
            }
            // Never sleep longer than the overall timeout.
            Thread.sleep(Math.min(timeoutMs, pauseMs));
        }

        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        verifyBasicLinkMetrics(linkId, verifyBasicLinkMetrics$default$2(), verifyBasicLinkMetrics$default$3());
        verifyTopicConfigChangeMetrics();
        verifyMirror(topic(), verifyMirror$default$2(), verifyMirror$default$3());
    }

    /**
     * Checks {@code listMirrorTopics} and {@code describeMirrorTopic} across a mirror topic's
     * lifecycle: active, stopped, deleted, re-created, paused, and finally after the link is deleted.
     *
     * <p>Expectations exercised in order:
     * <ul>
     *   <li>the source cluster lists no mirror topics;</li>
     *   <li>after linking, the destination lists the topic and describes it as {@code ACTIVE};</li>
     *   <li>after unlinking, the default listing is empty, {@code listMirrorTopics(true)} still
     *       contains the topic, and its described state is {@code STOPPED};</li>
     *   <li>after deleting the topic, {@code listMirrorTopics(true)} is empty and describing it
     *       raises {@code UnknownTopicOrPartitionException};</li>
     *   <li>after re-linking and pausing, the described state is {@code PAUSED};</li>
     *   <li>after deleting the link, a final condition (defined in a helper outside this method)
     *       must become true before the timeout.</li>
     * </ul>
     *
     * @param str value supplied by {@code @ValueSource} ("zk" or "kraft"); not read by this method
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testListDescribeMirror(String str) {
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());

        // The source cluster has no mirror topics.
        scala.collection.immutable.Set empty = Predef$.MODULE$.Set().empty();
        ClusterLinkTestHarness sourceCluster2 = sourceCluster();
        Assertions.assertEquals(empty, sourceCluster2.listMirrorTopics(sourceCluster2.listMirrorTopics$default$1()));

        // Link the topic and check that it is listed and described as ACTIVE.
        createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4());
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.linkTopic(topic(), (short) 2, linkName(), destCluster.linkTopic$default$4(), destCluster.linkTopic$default$5());
        GenTraversable apply = Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new String[]{topic()}));
        ClusterLinkTestHarness destCluster2 = destCluster();
        Assertions.assertEquals(apply, destCluster2.listMirrorTopics(destCluster2.listMirrorTopics$default$1()));
        MirrorTopicDescription describeMirrorTopic = destCluster().describeMirrorTopic(topic());
        Assertions.assertEquals(linkName(), describeMirrorTopic.linkName());
        Assertions.assertEquals(topic(), describeMirrorTopic.sourceTopic());
        Assertions.assertEquals(MirrorTopicDescription.State.ACTIVE, describeMirrorTopic.state());
        waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.ACTIVE, waitUntilMirrorState$default$2());

        // Stop the mirror, then poll (inlined TestUtils.waitUntilTrue) until the helper condition holds.
        ClusterLinkTestHarness destCluster3 = destCluster();
        destCluster3.unlinkTopic(topic(), linkName(), destCluster3.unlinkTopic$default$3(), destCluster3.unlinkTopic$default$4());
        long stopTimeoutMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long stopPauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long stopWaitStartMs = System.currentTimeMillis();
        while (!$anonfun$testListDescribeMirror$1(this)) {
            if (System.currentTimeMillis() > stopWaitStartMs + stopTimeoutMs) {
                Assertions.fail($anonfun$testListDescribeMirror$2());
            }
            // Never sleep longer than the overall timeout.
            Thread.sleep(Math.min(stopTimeoutMs, stopPauseMs));
        }
        waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.STOPPED, waitUntilMirrorState$default$2());

        // A stopped mirror is absent from the default listing but present with the flag set to true.
        scala.collection.immutable.Set empty2 = Predef$.MODULE$.Set().empty();
        ClusterLinkTestHarness destCluster4 = destCluster();
        Assertions.assertEquals(empty2, destCluster4.listMirrorTopics(destCluster4.listMirrorTopics$default$1()));
        Assertions.assertEquals(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new String[]{topic()})), destCluster().listMirrorTopics(true));
        // Expected value first, so a failure reports STOPPED as the expectation.
        Assertions.assertEquals(MirrorTopicDescription.State.STOPPED, destCluster().describeMirrorTopic(topic()).state());

        // Once the topic is deleted it is neither listed nor describable.
        destCluster().deleteTopic(topic(), true);
        Assertions.assertEquals(Predef$.MODULE$.Set().empty(), destCluster().listMirrorTopics(true));
        Assertions.assertThrows(UnknownTopicOrPartitionException.class, () -> {
            this.destCluster().describeMirrorTopic(this.topic());
        });

        // Re-link, then pause.
        ClusterLinkTestHarness destCluster5 = destCluster();
        destCluster5.linkTopic(topic(), (short) 2, linkName(), destCluster5.linkTopic$default$4(), destCluster5.linkTopic$default$5());
        GenTraversable apply2 = Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new String[]{topic()}));
        ClusterLinkTestHarness destCluster6 = destCluster();
        Assertions.assertEquals(apply2, destCluster6.listMirrorTopics(destCluster6.listMirrorTopics$default$1()));
        Assertions.assertEquals(MirrorTopicDescription.State.ACTIVE, destCluster().describeMirrorTopic(topic()).state());
        destCluster().pauseTopic(topic(), true);
        Assertions.assertEquals(MirrorTopicDescription.State.PAUSED, destCluster().describeMirrorTopic(topic()).state());
        waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.PAUSED, waitUntilMirrorState$default$2());

        // Delete the link (second argument true) while the mirror topic still exists, then poll
        // until the helper condition holds.
        ClusterLinkTestHarness destCluster7 = destCluster();
        destCluster7.deleteClusterLink(linkName(), true, destCluster7.deleteClusterLink$default$3());
        long deleteTimeoutMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long deletePauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long deleteWaitStartMs = System.currentTimeMillis();
        while (!$anonfun$testListDescribeMirror$4(this)) {
            if (System.currentTimeMillis() > deleteWaitStartMs + deleteTimeoutMs) {
                Assertions.fail($anonfun$testListDescribeMirror$5());
            }
            Thread.sleep(Math.min(deleteTimeoutMs, deletePauseMs));
        }
    }

    /**
     * Runs {@code verifyQuota} with a quota setter that goes through a source-cluster admin
     * client, then verifies that the mirror catches up and the basic link metrics are as expected.
     *
     * <p>The test only proceeds when {@code clusterLinkPrefix()} is empty. The link is created
     * with the replica fetch max bytes property set to {@code "100"}.
     *
     * @param str value supplied by {@code @ValueSource} ("zk" or "kraft"); not read by this method
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testSourceClusterQuota(String str) {
        // Abort (skip) the test when a cluster link prefix is configured.
        Assumptions.assumeFalse(new StringOps(Predef$.MODULE$.augmentString(clusterLinkPrefix())).nonEmpty());

        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(topic(), numPartitions(), replicationFactor(), source.createTopic$default$4(), source.createTopic$default$5(), source.createTopic$default$6());

        UUID linkId = createClusterLink(linkName(), destLinkProps((scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2(KafkaConfig$.MODULE$.ReplicaFetchMaxBytesProp(), "100")}))), createClusterLink$default$3(), createClusterLink$default$4());

        ClusterLinkTestHarness dest = destCluster();
        dest.linkTopic(topic(), replicationFactor(), linkName(), dest.linkTopic$default$4(), dest.linkTopic$default$5());

        ClusterLinkTestHarness sourceForAdmin = sourceCluster();
        ConfluentAdmin sourceAdmin = sourceForAdmin.createConfluentAdminClient(sourceForAdmin.createConfluentAdminClient$default$1());

        // First callback applies a quota value via the admin client; second reports throttling.
        verifyQuota(quota -> {
            setQuota$1(quota, sourceAdmin);
        }, () -> {
            return throttled$1();
        }, false);

        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        verifyBasicLinkMetrics(linkId, verifyBasicLinkMetrics$default$2(), verifyBasicLinkMetrics$default$3());
    }

    /**
     * Verifies the destination cluster link quota in {@code TOTAL_INBOUND} mode for the broker
     * config resource with an empty name, and checks the fetch response size before and after
     * {@code confluent.cluster.link.fetch.response.total.bytes} is set to {@code 10000}.
     *
     * <p>The test only proceeds when {@code clusterLinkPrefix()} is empty. Before the config
     * change no fetch response size is expected ({@code None}); afterwards
     * {@code FetchResponseSize(5000, 10000)} is expected.
     *
     * @param str value supplied by {@code @ValueSource} ("zk" or "kraft"); not read by this method
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testDestinationClusterLinkQuota(String str) {
        // Abort (skip) the test when a cluster link prefix is configured.
        Assumptions.assumeFalse(new StringOps(Predef$.MODULE$.augmentString(clusterLinkPrefix())).nonEmpty());

        // Single-element Scala list holding the BROKER resource with an empty name.
        $colon.colon brokerResources = new $colon.colon(new ConfigResource(ConfigResource.Type.BROKER, ""), Nil$.MODULE$);
        UUID linkId = verifyDestinationClusterLinkQuota(brokerResources, ConfluentConfigs.ClusterLinkQuotaMode.TOTAL_INBOUND);
        verifyFetchResponseSize(linkId, None$.MODULE$);

        ClusterLinkTestHarness dest = destCluster();
        ConfluentAdmin destAdmin = dest.createConfluentAdminClient(dest.createConfluentAdminClient$default$1());
        AlterConfigOp setTotalBytes = new AlterConfigOp(new ConfigEntry("confluent.cluster.link.fetch.response.total.bytes", "10000"), AlterConfigOp.OpType.SET);

        // Map every resource to the one SET operation, convert to a Java map, and apply it.
        Map opsByResource = (Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(((TraversableOnce) brokerResources.map(resource -> {
            return new Tuple2(resource, CollectionConverters$.MODULE$.asJavaCollectionConverter(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new AlterConfigOp[]{setTotalBytes}))).asJavaCollection());
        }, Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms())).asJava();
        destAdmin.incrementalAlterConfigs(opsByResource).all().get();

        verifyFetchResponseSize(linkId, new Some(new FetchResponseSize(5000, 10000)));
    }

    /**
     * Runs the destination quota verification in {@code CLUSTER_LINK_ONLY} mode against one
     * config resource per destination broker.
     *
     * <p>Each destination broker is first mapped to an int by a synthetic helper and that int is
     * then mapped to the resource handed to {@code verifyDestinationClusterLinkQuota}
     * (presumably broker id -> per-broker BROKER resource; the helpers are not visible here).
     *
     * <p>The test is skipped when {@code clusterLinkPrefix()} is non-empty.
     *
     * @param str quorum mode supplied by {@code @ValueSource} ("zk" or "kraft"); not read in this body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testDestinationClusterLinkBrokerLevelQuota(String str) {
        Assumptions.assumeFalse(new StringOps(Predef$.MODULE$.augmentString(clusterLinkPrefix())).nonEmpty());

        TraversableLike perBrokerInts = (TraversableLike) destCluster().brokers().map(broker -> BoxesRunTime.boxToInteger($anonfun$testDestinationClusterLinkBrokerLevelQuota$1(broker)), Buffer$.MODULE$.canBuildFrom());
        SeqLike perBrokerResources = (SeqLike) perBrokerInts.map(boxed -> $anonfun$testDestinationClusterLinkBrokerLevelQuota$2(BoxesRunTime.unboxToInt(boxed)), Buffer$.MODULE$.canBuildFrom());

        verifyDestinationClusterLinkQuota(perBrokerResources.toSeq(), ConfluentConfigs.ClusterLinkQuotaMode.CLUSTER_LINK_ONLY);
    }

    /**
     * Sets up a single-partition mirrored topic and verifies destination-side link quotas.
     *
     * <p>After creating the topic, the link and the mirror, the method sets
     * {@code confluent.cluster.link.replication.quota.mode} to the given mode on every resource in
     * {@code seq} through a destination admin client. It then runs {@code verifyQuota} with a setter
     * that targets the same resources and the {@link #destClusterLinkReplicasThrottled()} check,
     * waits for the mirror, verifies the basic link metrics and finally calls
     * {@code verifyQuotaMode}.
     *
     * @param seq config resources on which the quota mode (and later the quota) is applied
     * @param clusterLinkQuotaMode quota mode to configure and verify
     * @return the id returned by {@code createClusterLink}
     */
    public UUID verifyDestinationClusterLinkQuota(Seq<ConfigResource> seq, ConfluentConfigs.ClusterLinkQuotaMode clusterLinkQuotaMode) {
        numPartitions_$eq(1);

        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(topic(), numPartitions(), replicationFactor(), source.createTopic$default$4(), source.createTopic$default$5(), source.createTopic$default$6());
        UUID linkId = createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4());
        ClusterLinkTestHarness dest = destCluster();
        dest.linkTopic(topic(), replicationFactor(), linkName(), dest.linkTopic$default$4(), dest.linkTopic$default$5());

        ClusterLinkTestHarness adminHarness = destCluster();
        ConfluentAdmin destAdmin = adminHarness.createConfluentAdminClient(adminHarness.createConfluentAdminClient$default$1());
        ConfigEntry quotaModeEntry = new ConfigEntry("confluent.cluster.link.replication.quota.mode", clusterLinkQuotaMode.toString());
        AlterConfigOp setQuotaMode = new AlterConfigOp(quotaModeEntry, AlterConfigOp.OpType.SET);

        // Pair every resource with the single SET operation, then convert to a java.util.Map.
        TraversableOnce opsPerResource = (TraversableOnce) seq.map(resource -> Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(resource), CollectionConverters$.MODULE$.asJavaCollectionConverter(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new AlterConfigOp[]{setQuotaMode}))).asJavaCollection()), Seq$.MODULE$.canBuildFrom());
        destAdmin.incrementalAlterConfigs((Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(opsPerResource.toMap(Predef$.MODULE$.$conforms())).asJava()).all().get();

        verifyQuota(quota -> setQuota$2(quota, seq, destAdmin), () -> destClusterLinkReplicasThrottled(), true);

        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        verifyBasicLinkMetrics(linkId, verifyBasicLinkMetrics$default$2(), verifyBasicLinkMetrics$default$3());
        verifyQuotaMode(clusterLinkQuotaMode);
        return linkId;
    }

    /**
     * Applies the quota mode on the source cluster and verifies it on the partition-0 leader.
     *
     * <p>Sets {@code confluent.cluster.link.replication.quota.mode} and {@code producer_byte_rate}
     * ({@code "100000"}) on the BROKER resource with an empty name, polls until the synthetic
     * condition on the source leader of partition 0 holds (failing the test once the
     * {@code waitUntilTrue} default timeout elapses), produces 20 records and then checks the
     * cluster link quota metrics on that leader.
     *
     * @param clusterLinkQuotaMode mode to configure; the metrics check receives {@code true} only
     *     when it equals {@code TOTAL_INBOUND}
     */
    public void verifyQuotaMode(ConfluentConfigs.ClusterLinkQuotaMode clusterLinkQuotaMode) {
        ClusterLinkTestHarness source = sourceCluster();
        ConfluentAdmin sourceAdmin = source.createConfluentAdminClient(source.createConfluentAdminClient$default$1());
        ConfigResource brokerResource = new ConfigResource(ConfigResource.Type.BROKER, "");
        AlterConfigOp setQuotaMode = new AlterConfigOp(new ConfigEntry("confluent.cluster.link.replication.quota.mode", clusterLinkQuotaMode.toString()), AlterConfigOp.OpType.SET);
        AlterConfigOp setProducerRate = new AlterConfigOp(new ConfigEntry("producer_byte_rate", "100000"), AlterConfigOp.OpType.SET);
        sourceAdmin.incrementalAlterConfigs(Collections.singletonMap(brokerResource, CollectionConverters$.MODULE$.asJavaCollectionConverter(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new AlterConfigOp[]{setQuotaMode, setProducerRate}))).asJavaCollection())).all().get();

        KafkaBroker leader = sourceCluster().partitionLeader(new TopicPartition(topic(), 0));

        // Poll with the TestUtils.waitUntilTrue defaults: argument 3 bounds the total wait and the
        // sleep between attempts is the smaller of arguments 3 and 4.
        long timeoutMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long pauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long startMs = System.currentTimeMillis();
        for (;;) {
            if ($anonfun$verifyQuotaMode$1(leader, clusterLinkQuotaMode)) {
                break;
            }
            if (System.currentTimeMillis() > startMs + timeoutMs) {
                Assertions.fail($anonfun$verifyQuotaMode$2());
            }
            Thread.sleep(Math.min(timeoutMs, pauseMs));
        }

        produceToSourceCluster(20);
        verifyClusterLinkQuotaMetrics(new $colon.colon(leader, Nil$.MODULE$), clusterLinkQuotaMode.equals(ConfluentConfigs.ClusterLinkQuotaMode.TOTAL_INBOUND));
    }

    /**
     * Reports whether {@code yammerMetricMaxValue} returns a value greater than zero for the
     * {@code ThrottledClusterLinkReplicasPerSec} metric of {@code ReplicaManager}.
     *
     * <p>JADX note: the access modifier of this method was changed from private by the decompiler.
     *
     * @return {@code true} when the reported metric value is strictly positive
     */
    public boolean destClusterLinkReplicasThrottled() {
        double throttledReplicasMetric = yammerMetricMaxValue("kafka.server:type=ReplicaManager,name=ThrottledClusterLinkReplicasPerSec", None$.MODULE$);
        return throttledReplicasMetric > 0.0d;
    }

    /**
     * Checks the per-partition fetch size and the fetch response size used by a link fetcher.
     *
     * <p>The fetcher manager is the first one, across the destination brokers, that passes the
     * synthetic filter; the leader endpoint comes from the first thread in its
     * {@code fetcherThreadMap} field (read through {@code TestUtils.fieldValue}). Expected values
     * are taken from {@code option} when it is defined, otherwise from the manager's current
     * config ({@code replicaFetchMaxBytes} / {@code replicaFetchResponseMaxBytes}).
     *
     * <p>For each of the two values the method polls, bounded by the {@code computeUntilTrue}
     * defaults, until the synthetic predicate accepts the observed value, then re-reads the value
     * and asserts it equals the expectation.
     *
     * @param uuid id of the cluster link whose fetcher manager is inspected
     * @param option explicit expected sizes, or empty to expect the configured sizes
     */
    private void verifyFetchResponseSize(UUID uuid, Option<FetchResponseSize> option) {
        TraversableLike managersPerBroker = (TraversableLike) destCluster().brokers().map(broker -> (ClusterLinkFetcherManager) broker.clusterLinkManager().fetcherManager(uuid).get(), Buffer$.MODULE$.canBuildFrom());
        IterableLike acceptedManagers = (IterableLike) managersPerBroker.filter(candidate -> BoxesRunTime.boxToBoolean($anonfun$verifyFetchResponseSize$2(candidate)));
        ClusterLinkFetcherManager fetcherManager = (ClusterLinkFetcherManager) acceptedManagers.head();

        HashMap fetcherThreads = (HashMap) TestUtils.fieldValue(fetcherManager, AbstractFetcherManager.class, "fetcherThreadMap");
        ClusterLinkFetcherThread firstThread = (ClusterLinkFetcherThread) fetcherThreads.values().head();
        LeaderEndPoint leader = firstThread.leader();

        Object expectedPerPartition = option.map(sizes -> BoxesRunTime.boxToInteger(sizes.perPartitionSize())).getOrElse(() -> fetcherManager.currentConfig().replicaFetchMaxBytes());
        Object expectedResponse = option.map(sizes -> BoxesRunTime.boxToInteger(sizes.responseSize())).getOrElse(() -> fetcherManager.currentConfig().replicaFetchResponseMaxBytes());

        // Wait for the per-partition fetch size; give up silently on timeout and let the assertion report.
        long fetchWaitMs = TestUtils$.MODULE$.computeUntilTrue$default$2();
        long fetchPauseMs = TestUtils$.MODULE$.computeUntilTrue$default$3();
        long fetchStartMs = System.currentTimeMillis();
        while (!$anonfun$verifyFetchResponseSize$8(expectedPerPartition, fetchSize$1(leader))
                && System.currentTimeMillis() <= fetchStartMs + fetchWaitMs) {
            Thread.sleep(Math.min(fetchWaitMs, fetchPauseMs));
        }
        Assertions.assertEquals(expectedPerPartition, BoxesRunTime.boxToInteger(fetchSize$1(leader)));

        // Same procedure for the overall fetch response size.
        long responseWaitMs = TestUtils$.MODULE$.computeUntilTrue$default$2();
        long responsePauseMs = TestUtils$.MODULE$.computeUntilTrue$default$3();
        long responseStartMs = System.currentTimeMillis();
        while (!$anonfun$verifyFetchResponseSize$10(expectedResponse, fetchResponseSize$1(leader))
                && System.currentTimeMillis() <= responseStartMs + responseWaitMs) {
            Thread.sleep(Math.min(responseWaitMs, responsePauseMs));
        }
        Assertions.assertEquals(expectedResponse, BoxesRunTime.boxToInteger(fetchResponseSize$1(leader)));
    }

    /**
     * Checks that the destination link quota is still applied after the partition leader on the
     * destination cluster has been shut down and started again.
     *
     * <p>Flow: create a single-partition mirrored topic, set the cluster link IO max bytes per
     * second to {@code "100"} on the BROKER resource with an empty name, shut down the destination
     * leader of partition 0, wait for a leader change, start the broker again and wait until the
     * synthetic predicate accepts the computed set for that broker (the failure message calls this
     * set the ISR). Then, inside a retry loop, run the synthetic step, assert that leadership moved
     * back to the restarted broker and produce until {@link #destClusterLinkReplicasThrottled()}
     * reports throttling.
     *
     * <p>The test is skipped for source-initiated links and when {@code clusterLinkPrefix()} is
     * non-empty.
     *
     * @param str quorum mode supplied by {@code @ValueSource} ("zk" or "kraft"); not read in this body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testDestinationClusterLinkQuotaWithBrokerRestart(String str) {
        Tuple2 $minus$greater$extension;
        Assumptions.assumeFalse(useSourceInitiatedLink());
        Assumptions.assumeFalse(new StringOps(Predef$.MODULE$.augmentString(clusterLinkPrefix())).nonEmpty());
        numPartitions_$eq(1);
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4());
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.linkTopic(topic(), replicationFactor(), linkName(), destCluster.linkTopic$default$4(), destCluster.linkTopic$default$5());
        ClusterLinkTestHarness destCluster2 = destCluster();
        // The admin client lives in an ObjectRef because it is replaced after the restart and is
        // handed to a synthetic helper further down.
        ObjectRef create = ObjectRef.create(destCluster2.createConfluentAdminClient(destCluster2.createConfluentAdminClient$default$1()));
        // Configure the link IO byte-rate limit ("100") before any broker is restarted.
        ((ConfluentAdmin) create.elem).incrementalAlterConfigs(Collections.singletonMap(new ConfigResource(ConfigResource.Type.BROKER, ""), CollectionConverters$.MODULE$.asJavaCollectionConverter(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new AlterConfigOp[]{new AlterConfigOp(new ConfigEntry(KafkaConfig$.MODULE$.ClusterLinkIoMaxBytesPerSecondProp(), "100"), AlterConfigOp.OpType.SET)}))).asJavaCollection())).all().get();
        // This client is closed here; a fresh one is created after updateBootstrapServers() below.
        ((ConfluentAdmin) create.elem).close();
        TopicPartition topicPartition = new TopicPartition(topic(), 0);
        Tuple2<Object, Object> shutdownLeader = destCluster().shutdownLeader(topicPartition);
        if (shutdownLeader == null) {
            throw new MatchError((Object) null);
        }
        // First element: id of the broker that was shut down (it is passed to startBroker below).
        // Second element: presumably the leader epoch at shutdown - TODO confirm against the harness.
        int _1$mcI$sp = shutdownLeader._1$mcI$sp();
        Tuple2<Object, Object> waitForLeaderChange = destCluster().waitForLeaderChange(topicPartition, _1$mcI$sp, shutdownLeader._2$mcI$sp());
        if (waitForLeaderChange == null) {
            throw new MatchError((Object) null);
        }
        // Result of the leader change while the original leader is down; both values are fed into
        // the second waitForLeaderChange call inside the retry loop.
        int _1$mcI$sp2 = waitForLeaderChange._1$mcI$sp();
        int _2$mcI$sp = waitForLeaderChange._2$mcI$sp();
        destCluster().startBroker(_1$mcI$sp);
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long computeUntilTrue$default$2 = TestUtils$.MODULE$.computeUntilTrue$default$2();
        long computeUntilTrue$default$3 = TestUtils$.MODULE$.computeUntilTrue$default$3();
        if (testUtils$ == null) {
            throw null;
        }
        // Poll until the predicate accepts the computed set for the restarted broker, or until the
        // computeUntilTrue default wait elapses. The tuple keeps (last computed set, success flag).
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            scala.collection.immutable.Set $anonfun$testDestinationClusterLinkQuotaWithBrokerRestart$1 = $anonfun$testDestinationClusterLinkQuotaWithBrokerRestart$1(this, topicPartition);
            if ($anonfun$testDestinationClusterLinkQuotaWithBrokerRestart$3(_1$mcI$sp, $anonfun$testDestinationClusterLinkQuotaWithBrokerRestart$1)) {
                $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc($anonfun$testDestinationClusterLinkQuotaWithBrokerRestart$1), BoxesRunTime.boxToBoolean(true));
                break;
            } else {
                if (System.currentTimeMillis() > currentTimeMillis + computeUntilTrue$default$2) {
                    $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc($anonfun$testDestinationClusterLinkQuotaWithBrokerRestart$1), BoxesRunTime.boxToBoolean(false));
                    break;
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(computeUntilTrue$default$2), computeUntilTrue$default$3));
            }
        }
        if ($minus$greater$extension == null) {
            throw new MatchError((Object) null);
        }
        Assertions.assertTrue($minus$greater$extension._2$mcZ$sp(), new StringBuilder(42).append("Broker ").append(_1$mcI$sp).append(" is not part of ISR ").append((scala.collection.immutable.Set) $minus$greater$extension._1()).append(" for partition ").append(topicPartition).toString());
        destCluster().updateBootstrapServers();
        ClusterLinkTestHarness destCluster3 = destCluster();
        create.elem = destCluster3.createConfluentAdminClient(destCluster3.createConfluentAdminClient$default$1());
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        if (testUtils$2 == null) {
            throw null;
        }
        // Retry loop: the body is re-run on AssertionError for up to 15 seconds. The sleep between
        // attempts starts at 1 ms and grows by min(current, 1000) ms after every failure.
        LongRef create2 = LongRef.create(1L);
        long currentTimeMillis2 = System.currentTimeMillis();
        while (true) {
            try {
                // Opaque synthetic step using the new admin client and the partition; presumably it
                // triggers a preferred leader election (see the assertion message) - TODO confirm.
                $anonfun$testDestinationClusterLinkQuotaWithBrokerRestart$4(create, topicPartition);
                // NOTE(review): the observed leader is passed in JUnit's "expected" position and the
                // restarted broker id in the "actual" position - confirm whether that is intended.
                Assertions.assertEquals(destCluster().waitForLeaderChange(topicPartition, _1$mcI$sp2, _2$mcI$sp)._1$mcI$sp(), _1$mcI$sp, "Preferred leader not elected");
                ClusterLinkTestHarness sourceCluster2 = sourceCluster();
                produceUntil(sourceCluster2.createProducer(sourceCluster2.createProducer$default$1(), sourceCluster2.createProducer$default$2(), sourceCluster2.createProducer$default$3()), () -> {
                    return this.destClusterLinkReplicasThrottled();
                }, "Destination quota not applied after broker restart");
                return;
            } catch (AssertionError e) {
                // Out of retry budget: surface the last assertion failure unchanged.
                if (System.currentTimeMillis() - currentTimeMillis2 > 15000) {
                    throw e;
                }
                if (testUtils$2.logger().underlying().isInfoEnabled()) {
                    testUtils$2.logger().underlying().info(testUtils$2.msgWithLogIdent(TestUtils$.$anonfun$retry$1(create2)));
                }
                Thread.sleep(create2.elem);
                create2.elem += package$.MODULE$.min(create2.elem, 1000L);
            }
        }
    }

    /**
     * Checks the {@code destination-lag-link-fetcher-throttle-total} metric for several values of
     * the link fetcher flow control setting.
     *
     * <p>With a two-partition mirrored topic in place, the metric total is expected to be exactly
     * zero after mirroring with the settings {@code "-2"}, {@code "-1"} and {@code "10485760"},
     * and strictly positive after mirroring with {@code "0"}. Finally the {@code link-fetcher-count}
     * total is expected to be {@code 2.0}.
     *
     * <p>The test is skipped when {@code clusterLinkPrefix()} is non-empty.
     *
     * @param str quorum mode supplied by {@code @ValueSource} ("zk" or "kraft"); not read in this body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testDestinationLagLinkFetcherThrottle(String str) {
        Assumptions.assumeFalse(new StringOps(Predef$.MODULE$.augmentString(clusterLinkPrefix())).nonEmpty());
        numPartitions_$eq(2);

        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(topic(), numPartitions(), replicationFactor(), source.createTopic$default$4(), source.createTopic$default$5(), source.createTopic$default$6());
        createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4());
        ClusterLinkTestHarness dest = destCluster();
        dest.linkTopic(topic(), replicationFactor(), linkName(), dest.linkTopic$default$4(), dest.linkTopic$default$5());

        // Settings for which no lag throttling is expected to be recorded.
        for (String flowControl : new String[]{"-2", "-1", "10485760"}) {
            Assertions.assertTrue(lagThrottleTotalAfterMirroring(flowControl) == 0.0d);
        }
        // The setting for which throttling is expected to be recorded.
        Assertions.assertTrue(lagThrottleTotalAfterMirroring("0") > 0.0d);

        Assertions.assertEquals(2.0d, totalKafkaMetricValue(destCluster().aliveServers(), "link-fetcher-count", totalKafkaMetricValue$default$3(), totalKafkaMetricValue$default$4(), totalKafkaMetricValue$default$5()));
    }

    /**
     * Applies the given link fetcher flow control setting on the destination cluster, produces 30
     * records to the source, waits for the mirror and returns the resulting
     * {@code destination-lag-link-fetcher-throttle-total} across the alive destination servers.
     */
    private double lagThrottleTotalAfterMirroring(String flowControl) {
        ClusterLinkTestHarness dest = destCluster();
        dest.alterClusterLink(linkName(), (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.LinkFetcherFlowControlProp()), flowControl)})), dest.alterClusterLink$default$3());
        produceToSourceCluster(30);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        return totalKafkaMetricValue(destCluster().aliveServers(), "destination-lag-link-fetcher-throttle-total", totalKafkaMetricValue$default$3(), totalKafkaMetricValue$default$4(), totalKafkaMetricValue$default$5());
    }

    /**
     * Verifies that partitions added to the source topic show up on the mirror.
     *
     * <p>Starts with one partition and a link using {@code metadata.max.age.ms=1000} (plus the
     * cluster link prefix property when a prefix is configured), mirrors some records, grows the
     * source topic to four partitions and then polls, bounded by the {@code computeUntilTrue}
     * defaults, until the synthetic predicate accepts the observed count. The last observed count
     * must equal {@code numPartitions()}. Afterwards it mirrors more records and verifies the link
     * metrics, the add-partition metrics and the mirror itself.
     *
     * <p>In "kraft" mode the test is skipped for source-initiated links.
     *
     * @param str quorum mode supplied by {@code @ValueSource} ("zk" or "kraft")
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testAddPartitions(String str) {
        if (str.equals("kraft")) {
            Assumptions.assumeFalse(useSourceInitiatedLink());
        }
        numPartitions_$eq(1);

        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(topic(), numPartitions(), replicationFactor(), source.createTopic$default$4(), source.createTopic$default$5(), source.createTopic$default$6());

        Tuple2 metadataAgeOverride = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("metadata.max.age.ms"), "1000");
        Properties linkProps = destLinkProps((scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{metadataAgeOverride})));
        boolean prefixed = new StringOps(Predef$.MODULE$.augmentString(clusterLinkPrefix())).nonEmpty();
        if (prefixed) {
            linkProps.setProperty(ClusterLinkConfig$.MODULE$.ClusterLinkPrefixProp(), clusterLinkPrefix());
        }
        UUID linkId = createClusterLink(linkName(), linkProps, createClusterLink$default$3(), createClusterLink$default$4());

        ClusterLinkTestHarness dest = destCluster();
        dest.linkTopic(topic(), replicationFactor(), linkName(), dest.linkTopic$default$4(), clusterLinkPrefix());
        produceToSourceCluster(4);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());

        // Grow the source topic and give the mirror time to pick up the new partitions.
        numPartitions_$eq(4);
        sourceCluster().createPartitions(topic(), numPartitions());
        produceToSourceCluster(8);

        long waitMs = TestUtils$.MODULE$.computeUntilTrue$default$2();
        long pauseMs = TestUtils$.MODULE$.computeUntilTrue$default$3();
        long startMs = System.currentTimeMillis();
        int observedPartitions = $anonfun$testAddPartitions$1(this);
        while (!$anonfun$testAddPartitions$2(this, observedPartitions)
                && System.currentTimeMillis() <= startMs + waitMs) {
            Thread.sleep(Math.min(waitMs, pauseMs));
            observedPartitions = $anonfun$testAddPartitions$1(this);
        }
        Assertions.assertEquals(numPartitions(), observedPartitions);

        produceToSourceCluster(8);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        verifyBasicLinkMetrics(linkId, verifyBasicLinkMetrics$default$2(), verifyBasicLinkMetrics$default$3());
        verifyAddPartitionMetrics();
        verifyMirror(topic(), verifyMirror$default$2(), verifyMirror$default$3());
    }

    /**
     * Verifies that cluster link configs can be altered while mirroring keeps working.
     *
     * <p>Covered in order:
     * <ol>
     *   <li>changing {@code metadata.max.age.ms} from 10000 to 60000 on the connecting cluster;</li>
     *   <li>restarting all brokers of the other cluster and updating {@code bootstrap.servers};</li>
     *   <li>rejecting changes of link mode and connection mode with
     *       {@code InvalidConfigurationException};</li>
     *   <li>pointing the truststore location at a copy of the current truststore;</li>
     *   <li>moving all partitions to one broker on each cluster, expecting one fetcher thread, then
     *       raising the number of link fetchers to 3 and waiting for the thread count to follow.</li>
     * </ol>
     *
     * <p>The truststore copy is a temp file registered with {@link File#deleteOnExit()} so that it
     * does not accumulate in the temp directory; it cannot be deleted earlier because the link
     * config keeps pointing at it.
     *
     * <p>The test is skipped when {@code clusterLinkPrefix()} is non-empty.
     *
     * @param str quorum mode supplied by {@code @ValueSource} ("zk" or "kraft"); not read in this body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testAlterClusterLinkConfigs(String str) {
        Assumptions.assumeFalse(new StringOps(Predef$.MODULE$.augmentString(clusterLinkPrefix())).nonEmpty());
        numPartitions_$eq(8);
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        UUID linkId = createClusterLink(linkName(), destLinkProps((scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("metadata.max.age.ms"), "10000")}))), sourceLinkProps((scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("metadata.max.age.ms"), "10000")}))), createClusterLink$default$4());
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.linkTopic(topic(), replicationFactor(), linkName(), destCluster.linkTopic$default$4(), destCluster.linkTopic$default$5());
        produceToSourceCluster(8);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());

        // remoteCluster is the destination unless the destination itself is the connecting
        // cluster, in which case it is the source.
        ClusterLinkTestHarness connectingCluster = super.connectingCluster();
        ClusterLinkTestHarness destCluster2 = destCluster();
        ClusterLinkTestHarness remoteCluster = (connectingCluster != null ? !connectingCluster.equals(destCluster2) : destCluster2 != null) ? destCluster() : sourceCluster();

        // 1. Alter metadata.max.age.ms and confirm the new value is reported.
        connectingCluster.alterClusterLink(linkName(), (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("metadata.max.age.ms"), "60000")})), connectingCluster.alterClusterLink$default$3());
        produceToSourceCluster(8);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        Assertions.assertEquals("60000", connectingCluster.describeClusterLink(linkName()).get("metadata.max.age.ms").value());

        // 2. Restart the remote cluster and point the link at its current bootstrap servers.
        remoteCluster.killAllBrokers();
        remoteCluster.startAllBrokers();
        connectingCluster.alterClusterLink(linkName(), (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("bootstrap.servers"), remoteCluster.bootstrapServers(remoteCluster.bootstrapServers$default$1()))})), connectingCluster.alterClusterLink$default$3());
        produceToSourceCluster(8);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());

        // 3. Link mode and connection mode must not be alterable; the current values stay in place.
        Assertions.assertThrows(InvalidConfigurationException.class, () -> {
            ClusterLinkTestHarness dest = this.destCluster();
            dest.alterClusterLink(this.linkName(), (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.LinkModeProp()), "SOURCE")})), dest.alterClusterLink$default$3());
        });
        Assertions.assertThrows(InvalidConfigurationException.class, () -> {
            connectingCluster.alterClusterLink(this.linkName(), (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ConnectionModeProp()), "INBOUND")})), connectingCluster.alterClusterLink$default$3());
        });
        Assertions.assertEquals(LinkMode$Destination$.MODULE$, ((ClusterLinkFactory.ConnectionManager) ((KafkaBroker) destCluster().brokers().head()).clusterLinkManager().connectionManager(linkId).get()).currentConfig().linkMode());
        Assertions.assertEquals(ConnectionMode$Outbound$.MODULE$, ((ClusterLinkFactory.ConnectionManager) ((KafkaBroker) connectingCluster.brokers().head()).clusterLinkManager().connectionManager(linkId).get()).currentConfig().connectionMode());

        // 4. Switch the truststore location to a copy of the current truststore. For
        // source-initiated links the config key carries the local prefix.
        String truststoreKey = useSourceInitiatedLink() ? new StringBuilder(23).append(ClusterLinkConfig$.MODULE$.LocalPrefix()).append("ssl.truststore.location").toString() : "ssl.truststore.location";
        File currentTruststore = new File(connectingCluster.describeClusterLink(linkName()).get(truststoreKey).value());
        File truststoreCopy = File.createTempFile("truststore", ".jks");
        // Fix: the copy used to be left behind in the temp directory after every run. It is
        // registered for deletion at JVM exit rather than deleted here because the link config
        // references it for the remainder of the test.
        truststoreCopy.deleteOnExit();
        Files.copy(currentTruststore.toPath(), truststoreCopy.toPath(), StandardCopyOption.REPLACE_EXISTING);
        connectingCluster.alterClusterLink(linkName(), (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(truststoreKey), truststoreCopy.getAbsolutePath())})), connectingCluster.alterClusterLink$default$3());
        produceToSourceCluster(8);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        verifyBasicLinkMetrics(linkId, verifyBasicLinkMetrics$default$2(), verifyBasicLinkMetrics$default$3());

        // 5. Reassign every partition to the first broker of each cluster, which is expected to
        // leave a single fetcher thread for the link.
        int destBrokerId = ((KafkaBroker) destCluster().brokers().head()).config().brokerId();
        destCluster().alterPartitionAssignment((Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(((TraversableOnce) partitions(partitions$default$1()).map(topicPartition -> {
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(topicPartition), Optional.of(new NewPartitionReassignment(Collections.singletonList(Predef$.MODULE$.int2Integer(destBrokerId)))));
        }, Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms())).asJava());
        destCluster().ensureConsistentKRaftMetadata();
        int sourceBrokerId = ((KafkaBroker) sourceCluster().brokers().head()).config().brokerId();
        sourceCluster().alterPartitionAssignment((Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(((TraversableOnce) partitions(partitions$default$1()).map(topicPartition2 -> {
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(topicPartition2), Optional.of(new NewPartitionReassignment(Collections.singletonList(Predef$.MODULE$.int2Integer(sourceBrokerId)))));
        }, Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms())).asJava());
        sourceCluster().ensureConsistentKRaftMetadata();
        Assertions.assertEquals(1, maxFetcherThreadCount(linkId));

        // Raise the fetcher count and poll (bounded by the computeUntilTrue defaults) until the
        // synthetic predicate accepts the observed thread count; the last observation must be 3.
        ClusterLinkTestHarness destForFetchers = destCluster();
        destForFetchers.alterClusterLink(linkName(), (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.NumClusterLinkFetchersProp()), "3")})), destForFetchers.alterClusterLink$default$3());
        long waitMs = TestUtils$.MODULE$.computeUntilTrue$default$2();
        long pauseMs = TestUtils$.MODULE$.computeUntilTrue$default$3();
        long startMs = System.currentTimeMillis();
        int observedThreads = maxFetcherThreadCount(linkId);
        while (!$anonfun$testAlterClusterLinkConfigs$6(observedThreads)
                && System.currentTimeMillis() <= startMs + waitMs) {
            Thread.sleep(Math.min(waitMs, pauseMs));
            observedThreads = maxFetcherThreadCount(linkId);
        }
        Assertions.assertEquals(3, observedThreads);

        produceToSourceCluster(8);
        consume(destCluster(), consume$default$2());
    }

    /**
     * Exercises consumer offset migration when a second consumer group is added to the link's
     * group filter via {@code alterClusterLink}.
     *
     * <p>Sequence: create the source topic; create a link with offset sync enabled and a group
     * filter built from {@code consumerGroup()}; mirror the topic; commit an offset on the source
     * and call {@code verifyOffsetMigration} for it. The link is then altered with a filter JSON
     * that lists both {@code consumerGroup()} and {@code "testGroup2"}; offset 20 is committed on
     * the source for both groups and {@code verifyOffsetMigration} is called for each. Finally
     * records are produced, link metrics are verified, and the mirror topic and link are removed.
     *
     * @param str quorum value supplied by {@code @ValueSource} ("zk" or "kraft"); not read here
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testOffsetMigrationWithAddedConsumerGroup(String str) {
        final String addedGroup = "testGroup2";

        // Filter JSON with two literal "include" entries; passed through StringOps.stripMargin().
        String bothGroupsFilter = new StringOps(Predef$.MODULE$.augmentString(
                "|{\n          |\"groupFilters\": [\n          |  {\n          |     \"name\": \""
                        + consumerGroup()
                        + "\",\n          |     \"patternType\": \"literal\",\n          |     \"filterType\": \"include\"\n          |  },\n          |  {\n          |     \"name\": \""
                        + addedGroup
                        + "\",\n          |     \"patternType\": \"literal\",\n          |     \"filterType\": \"include\"\n          |  }\n          |]}\n          |")).stripMargin();

        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(
                topic(),
                numPartitions(),
                replicationFactor(),
                source.createTopic$default$4(),
                source.createTopic$default$5(),
                source.createTopic$default$6());

        // Initial link configuration: offset sync on, filter covering only consumerGroup().
        Properties linkProps = destLinkProps(destLinkProps$default$1());
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncEnableProp(), "true");
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetGroupFiltersProp(), consumerGroupFilter(consumerGroup()));
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncMsProp(), String.valueOf(syncPeriod()));
        boolean prefixConfigured = new StringOps(Predef$.MODULE$.augmentString(clusterLinkPrefix())).nonEmpty();
        if (prefixConfigured) {
            linkProps.setProperty(ClusterLinkConfig$.MODULE$.ClusterLinkPrefixProp(), clusterLinkPrefix());
        }
        UUID linkId = createClusterLink(linkName(), linkProps, createClusterLink$default$3(), createClusterLink$default$4());

        ClusterLinkTestHarness destForLinking = destCluster();
        destForLinking.linkTopic(topic(), (short) 2, linkName(), destForLinking.linkTopic$default$4(), clusterLinkPrefix());

        commitOffsets(sourceCluster(), topic(), 0, offsetToCommit(), consumerGroup());
        verifyOffsetMigration(clusterLinkPrefix() + topic(), 0, offsetToCommit(), consumerGroup());

        // Replacement link configs carrying the two-group filter.
        Tuple2 syncEnabledEntry = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncEnableProp()), "true");
        Tuple2 groupFilterEntry = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ConsumerOffsetGroupFiltersProp()), bothGroupsFilter);
        Tuple2 syncPeriodEntry = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncMsProp()), String.valueOf(syncPeriod()));
        scala.collection.Map<String, String> alteredConfigs = (scala.collection.Map) scala.collection.Map$.MODULE$.apply(
                Predef$.MODULE$.wrapRefArray(new Tuple2[]{syncEnabledEntry, groupFilterEntry, syncPeriodEntry}));
        ClusterLinkTestHarness destForAlter = destCluster();
        destForAlter.alterClusterLink(linkName(), alteredConfigs, destForAlter.alterClusterLink$default$3());

        // Commit the same offset for both groups on the source, then check each on the mirror topic.
        commitOffsets(sourceCluster(), topic(), 0, 20L, consumerGroup());
        commitOffsets(sourceCluster(), topic(), 0, 20L, addedGroup);
        verifyOffsetMigration(clusterLinkPrefix() + topic(), 0, 20L, consumerGroup());
        verifyOffsetMigration(clusterLinkPrefix() + topic(), 0, 20L, addedGroup);

        produceToSourceCluster(10);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        verifyBasicLinkMetrics(linkId, linkProps, verifyBasicLinkMetrics$default$3());
        verifyConsumerOffsetMigrationMetrics();

        // Cleanup: unlink the mirror topic, then delete the link.
        ClusterLinkTestHarness destForUnlink = destCluster();
        destForUnlink.unlinkTopic(
                clusterLinkPrefix() + topic(),
                linkName(),
                destForUnlink.unlinkTopic$default$3(),
                destForUnlink.unlinkTopic$default$4());
        ClusterLinkTestHarness destForDelete = destCluster();
        destForDelete.deleteClusterLink(
                linkName(),
                destForDelete.deleteClusterLink$default$2(),
                destForDelete.deleteClusterLink$default$3());
    }

    /**
     * Exercises consumer offset migration when a second mirror topic is added to an existing link,
     * then unlinks a mirror topic while a consumer in the synced group is subscribed to it.
     *
     * <p>Sequence: create {@code topic()} and {@code "linkedTopic2"} on the source; create a link
     * with offset sync enabled for {@code consumerGroup()}; mirror the first topic and call
     * {@code verifyOffsetMigration} for it; mirror the second topic, commit offset 20 for both
     * topics and call {@code verifyOffsetMigration} for each. After metrics checks the first mirror
     * is unlinked, the group filter is altered with "include" replaced by "exclude", and a
     * destination consumer with {@code group.id = consumerGroup()} subscribes to the second topic.
     * While a background task polls that consumer, the second mirror is unlinked and the test
     * waits for its mirror state to become STOPPED before deleting the link.
     *
     * <p>The single-thread executor is always shut down via {@code finally}, including when
     * submitting the background task itself fails.
     *
     * @param str quorum value supplied by {@code @ValueSource} ("zk" or "kraft"); not read here
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testOffsetMigrationWithAddedTopic(String str) {
        final String secondTopic = "linkedTopic2";

        ClusterLinkTestHarness firstTopicSource = sourceCluster();
        firstTopicSource.createTopic(
                topic(),
                numPartitions(),
                replicationFactor(),
                firstTopicSource.createTopic$default$4(),
                firstTopicSource.createTopic$default$5(),
                firstTopicSource.createTopic$default$6());
        ClusterLinkTestHarness secondTopicSource = sourceCluster();
        secondTopicSource.createTopic(
                secondTopic,
                numPartitions(),
                replicationFactor(),
                secondTopicSource.createTopic$default$4(),
                secondTopicSource.createTopic$default$5(),
                secondTopicSource.createTopic$default$6());

        // Link configuration: offset sync on, filter covering consumerGroup().
        Properties linkProps = destLinkProps(destLinkProps$default$1());
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncEnableProp(), "true");
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetGroupFiltersProp(), consumerGroupFilter(consumerGroup()));
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncMsProp(), String.valueOf(syncPeriod()));
        if (new StringOps(Predef$.MODULE$.augmentString(clusterLinkPrefix())).nonEmpty()) {
            linkProps.setProperty(ClusterLinkConfig$.MODULE$.ClusterLinkPrefixProp(), clusterLinkPrefix());
        }
        UUID linkId = createClusterLink(linkName(), linkProps, createClusterLink$default$3(), createClusterLink$default$4());

        // Mirror the first topic and check its committed offset.
        ClusterLinkTestHarness firstMirrorDest = destCluster();
        firstMirrorDest.linkTopic(topic(), (short) 2, linkName(), firstMirrorDest.linkTopic$default$4(), clusterLinkPrefix());
        commitOffsets(sourceCluster(), topic(), 0, offsetToCommit(), consumerGroup());
        verifyOffsetMigration(clusterLinkPrefix() + topic(), 0, offsetToCommit(), consumerGroup());

        // Add the second mirror topic and check offsets for both topics.
        ClusterLinkTestHarness secondMirrorDest = destCluster();
        secondMirrorDest.linkTopic(secondTopic, (short) 2, linkName(), secondMirrorDest.linkTopic$default$4(), clusterLinkPrefix());
        commitOffsets(sourceCluster(), topic(), 0, 20L, consumerGroup());
        commitOffsets(sourceCluster(), secondTopic, 0, 20L, consumerGroup());
        verifyOffsetMigration(clusterLinkPrefix() + topic(), 0, 20L, consumerGroup());
        verifyOffsetMigration(clusterLinkPrefix() + secondTopic, 0, 20L, consumerGroup());

        produceToSourceCluster(10);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        verifyBasicLinkMetrics(linkId, linkProps, verifyBasicLinkMetrics$default$3());
        verifyConsumerOffsetMigrationMetrics();

        // Unlink the first mirror (third argument explicitly false) and wait for it to stop.
        ClusterLinkTestHarness firstUnlinkDest = destCluster();
        firstUnlinkDest.unlinkTopic(clusterLinkPrefix() + topic(), linkName(), false, firstUnlinkDest.unlinkTopic$default$4());
        waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.STOPPED, clusterLinkPrefix() + topic());

        // Re-use the original group filter with every "include" turned into "exclude".
        alterClusterLink(linkName(), (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
                Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                        Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ConsumerOffsetGroupFiltersProp()),
                        consumerGroupFilter(consumerGroup()).replaceAll("include", "exclude"))})));

        // Destination consumer in the same group, subscribed to the second mirror topic.
        Properties consumerProps = new Properties();
        consumerProps.setProperty("group.id", consumerGroup());
        ClusterLinkTestHarness consumerDest = destCluster();
        KafkaConsumer groupConsumer = consumerDest.createConsumer(
                consumerDest.createConsumer$default$1(),
                consumerDest.createConsumer$default$2(),
                consumerProps,
                consumerDest.createConsumer$default$4());
        groupConsumer.subscribe(Collections.singleton(secondTopic));
        // NOTE(review): this loop has no timeout; it spins until the consumer reports an assignment.
        do {
            groupConsumer.poll(Duration.ofMillis(10L));
        } while (groupConsumer.assignment().isEmpty());

        ExecutorService backgroundPoller = Executors.newSingleThreadExecutor();
        try {
            // One more poll is issued from a background thread while the mirror is being unlinked.
            backgroundPoller.submit(() -> {
                return groupConsumer.poll(Duration.ofMillis(10L));
            });
            ClusterLinkTestHarness secondUnlinkDest = destCluster();
            secondUnlinkDest.unlinkTopic(
                    clusterLinkPrefix() + secondTopic,
                    linkName(),
                    secondUnlinkDest.unlinkTopic$default$3(),
                    secondUnlinkDest.unlinkTopic$default$4());
            waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.STOPPED, clusterLinkPrefix() + secondTopic);
        } finally {
            backgroundPoller.shutdownNow();
        }

        // The executor is already shut down at this point, as in the original success path.
        ClusterLinkTestHarness deleteDest = destCluster();
        deleteDest.deleteClusterLink(linkName(), deleteDest.deleteClusterLink$default$2(), deleteDest.deleteClusterLink$default$3());
    }

    /**
     * Checks that a mirror topic on the destination rejects direct modification while linked and
     * accepts writes again once it has been unlinked.
     *
     * <p>While the topic is mirrored the test expects: a direct produce to fail with an
     * {@code ExecutionException} whose cause is an {@code InvalidRequestException} and whose
     * message contains "Cannot append records to read-only mirror topic"; and
     * {@code createPartitions} to throw {@code InvalidPartitionsException}. A further admin-based
     * check is delegated to {@code $anonfun$testDestReadOnly$3}. After unlinking,
     * {@code verifyTopicWritable} is called and records are produced with the same producer.
     *
     * <p>The test is skipped (JUnit assumption) when {@code str} is "kraft" and
     * {@code useSourceInitiatedLink()} is true, or when {@code clusterLinkPrefix()} is non-empty.
     *
     * @param str quorum value supplied by {@code @ValueSource} ("zk" or "kraft")
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testDestReadOnly(String str) {
        if (str.equals("kraft")) {
            Assumptions.assumeFalse(useSourceInitiatedLink());
        }
        boolean prefixConfigured = new StringOps(Predef$.MODULE$.augmentString(clusterLinkPrefix())).nonEmpty();
        Assumptions.assumeFalse(prefixConfigured);

        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(
                topic(),
                numPartitions(),
                replicationFactor(),
                source.createTopic$default$4(),
                source.createTopic$default$5(),
                source.createTopic$default$6());

        // Link created with metadata.max.age.ms overridden to 10000.
        UUID linkId = createClusterLink(
                linkName(),
                destLinkProps((scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
                        Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("metadata.max.age.ms"), "10000")}))),
                createClusterLink$default$3(),
                createClusterLink$default$4());
        ClusterLinkTestHarness destForLinking = destCluster();
        destForLinking.linkTopic(
                topic(),
                replicationFactor(),
                linkName(),
                destForLinking.linkTopic$default$4(),
                destForLinking.linkTopic$default$5());
        produceToSourceCluster(4);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());

        // Producing straight to the mirror topic must fail.
        ClusterLinkTestHarness destForProducer = destCluster();
        KafkaProducer<byte[], byte[]> destProducer = destForProducer.createProducer(
                destForProducer.createProducer$default$1(),
                destForProducer.createProducer$default$2(),
                destForProducer.createProducer$default$3());
        ExecutionException produceFailure = (ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
            destProducer.send(new ProducerRecord(this.topic(), Predef$.MODULE$.int2Integer(0), Predef$.MODULE$.long2Long(0L), "key".getBytes(), "value".getBytes())).get(15L, TimeUnit.SECONDS);
        });
        Assertions.assertTrue(produceFailure.getCause() instanceof InvalidRequestException);
        Assertions.assertTrue(
                produceFailure.getMessage().contains("Cannot append records to read-only mirror topic"),
                "Unexpected error " + produceFailure.getMessage());

        // Adding partitions directly on the destination must fail as well.
        Assertions.assertThrows(InvalidPartitionsException.class, () -> {
            this.destCluster().createPartitions(this.topic(), 8);
        });
        destCluster().withAdmin(admin -> {
            $anonfun$testDestReadOnly$3(this, admin);
            return BoxedUnit.UNIT;
        });

        // Mirroring is exercised again after the rejected operations.
        produceToSourceCluster(4);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        verifyBasicLinkMetrics(linkId, verifyBasicLinkMetrics$default$2(), verifyBasicLinkMetrics$default$3());

        // After unlinking, the topic is checked for writability and produced to directly.
        ClusterLinkTestHarness destForUnlink = destCluster();
        destForUnlink.unlinkTopic(
                topic(),
                linkName(),
                destForUnlink.unlinkTopic$default$3(),
                destForUnlink.unlinkTopic$default$4());
        destCluster().verifyTopicWritable(topic(), numPartitions());
        produceRecords(destProducer, topic(), 10, produceRecords$default$4());

        ClusterLinkTestHarness destForDelete = destCluster();
        destForDelete.deleteClusterLink(
                linkName(),
                destForDelete.deleteClusterLink$default$2(),
                destForDelete.deleteClusterLink$default$3());
    }

    /**
     * Checks that deleting a cluster link that still has mirror topics fails with the default
     * arguments, and that deleting it with the second argument set to {@code true} leaves the
     * destination with no mirror topics and no cluster links.
     *
     * <p>Five topic names are generated by {@code $anonfun$testDeleteClusterLinkCleanup$1} for the
     * indices 0..4; each is created on the source and linked on the destination. The test is
     * skipped (JUnit assumption) when {@code clusterLinkPrefix()} is non-empty.
     *
     * @param str quorum value supplied by {@code @ValueSource} ("zk" or "kraft"); not read here
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testDeleteClusterLinkCleanup(String str) {
        boolean prefixConfigured = new StringOps(Predef$.MODULE$.augmentString(clusterLinkPrefix())).nonEmpty();
        Assumptions.assumeFalse(prefixConfigured);

        // Link created with metadata.max.age.ms overridden to 10000.
        createClusterLink(
                linkName(),
                destLinkProps((scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
                        Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("metadata.max.age.ms"), "10000")}))),
                createClusterLink$default$3(),
                createClusterLink$default$4());

        // Topic names for indices 0 until 5, collected into a set.
        scala.collection.immutable.Set topicNames = ((TraversableOnce) RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), 5).map(
                index -> $anonfun$testDeleteClusterLinkCleanup$1(BoxesRunTime.unboxToInt(index)),
                IndexedSeq$.MODULE$.canBuildFrom())).toSet();

        // Create every topic on the source and mirror it on the destination.
        topicNames.foreach(topicName -> {
            ClusterLinkTestHarness sourceHarness = this.sourceCluster();
            sourceHarness.createTopic(
                    topicName,
                    this.numPartitions(),
                    this.replicationFactor(),
                    sourceHarness.createTopic$default$4(),
                    sourceHarness.createTopic$default$5(),
                    sourceHarness.createTopic$default$6());
            ClusterLinkTestHarness destHarness = this.destCluster();
            return destHarness.linkTopic(
                    topicName,
                    this.replicationFactor(),
                    this.linkName(),
                    destHarness.linkTopic$default$4(),
                    destHarness.linkTopic$default$5());
        });

        int expectedMirrorCount = topicNames.size();
        ClusterLinkTestHarness destForListing = destCluster();
        Assertions.assertEquals(
                expectedMirrorCount,
                destForListing.listMirrorTopics(destForListing.listMirrorTopics$default$1()).size());

        // Deleting with default arguments is expected to be refused while mirrors exist.
        Assertions.assertThrows(ClusterLinkInUseException.class, () -> {
            ClusterLinkTestHarness destInUse = this.destCluster();
            destInUse.deleteClusterLink(
                    this.linkName(),
                    destInUse.deleteClusterLink$default$2(),
                    destInUse.deleteClusterLink$default$3());
        });

        // Same call with the second argument set to true.
        ClusterLinkTestHarness destForForcedDelete = destCluster();
        destForForcedDelete.deleteClusterLink(linkName(), true, destForForcedDelete.deleteClusterLink$default$3());

        // Nothing should remain: no mirror topics, no links, and describing the link must fail.
        ClusterLinkTestHarness destForMirrorCheck = destCluster();
        Assertions.assertTrue(
                destForMirrorCheck.listMirrorTopics(destForMirrorCheck.listMirrorTopics$default$1()).isEmpty());
        ClusterLinkTestHarness destForLinkCheck = destCluster();
        Assertions.assertTrue(
                destForLinkCheck.listClusterLinks(destForLinkCheck.listClusterLinks$default$1()).isEmpty());
        Assertions.assertThrows(ClusterLinkNotFoundException.class, () -> {
            this.destCluster().describeClusterLink(this.linkName());
        });
    }

    /**
     * Exercises deletion of a mirror topic on the destination after {@code shutdownLeader} has been
     * called for its first partition, followed by deletion of the cluster link.
     *
     * <p>Sequence: create and mirror {@code topic()}; build one entry per partition via
     * {@code $anonfun$testMirroredTopicMarkedForDelete$1}; call {@code shutdownLeader} for the
     * first entry; select a subset of destination brokers; delete the mirror topic (second
     * argument {@code false}); run {@code $anonfun$testMirroredTopicMarkedForDelete$3} for each
     * selected broker; delete the link, passing the selected brokers as the third argument.
     *
     * @param str quorum value supplied by {@code @ValueSource} ("zk" or "kraft"); not read here
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testMirroredTopicMarkedForDelete(String str) {
        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(
                topic(),
                numPartitions(),
                replicationFactor(),
                source.createTopic$default$4(),
                source.createTopic$default$5(),
                source.createTopic$default$6());

        Properties linkProps = destLinkProps(destLinkProps$default$1());
        boolean prefixConfigured = new StringOps(Predef$.MODULE$.augmentString(clusterLinkPrefix())).nonEmpty();
        if (prefixConfigured) {
            linkProps.setProperty(ClusterLinkConfig$.MODULE$.ClusterLinkPrefixProp(), clusterLinkPrefix());
        }
        UUID linkId = createClusterLink(linkName(), linkProps, createClusterLink$default$3(), createClusterLink$default$4());

        ClusterLinkTestHarness destForLinking = destCluster();
        destForLinking.linkTopic(
                topic(),
                replicationFactor(),
                linkName(),
                destForLinking.linkTopic$default$4(),
                clusterLinkPrefix());

        // One element per partition index in [0, numPartitions()); the head is used as a TopicPartition.
        IndexedSeq mirrorPartitions = (IndexedSeq) RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), numPartitions()).map(
                partitionIndex -> $anonfun$testMirroredTopicMarkedForDelete$1(this, BoxesRunTime.unboxToInt(partitionIndex)),
                IndexedSeq$.MODULE$.canBuildFrom());

        // First component of shutdownLeader's result; presumably the id of the broker that was
        // shut down - confirm against ClusterLinkTestHarness.shutdownLeader.
        int shutdownLeaderId = destCluster().shutdownLeader((TopicPartition) mirrorPartitions.head())._1$mcI$sp();

        // Brokers chosen by the $2 predicate relative to that value (presumably the ones still running).
        Seq<KafkaBroker> selectedBrokers = ((SeqLike) destCluster().brokers().filter(
                broker -> BoxesRunTime.boxToBoolean($anonfun$testMirroredTopicMarkedForDelete$2(shutdownLeaderId, broker)))).toSeq();

        destCluster().deleteTopic(clusterLinkPrefix() + topic(), false);

        // Per-broker check delegated to the $3 helper.
        selectedBrokers.foreach(broker -> {
            $anonfun$testMirroredTopicMarkedForDelete$3(mirrorPartitions, linkId, broker);
            return BoxedUnit.UNIT;
        });

        ClusterLinkTestHarness destForDelete = destCluster();
        destForDelete.deleteClusterLink(linkName(), destForDelete.deleteClusterLink$default$2(), selectedBrokers);
    }

    /**
     * Exercises pausing and resuming a single mirror topic.
     *
     * <p>Expectations encoded by the assertions:
     * <ul>
     *   <li>pausing before the mirror topic exists throws {@code UnknownTopicOrPartitionException};</li>
     *   <li>after pausing (issued twice), the mirror description state becomes PAUSED, the
     *       condition in {@code $anonfun$testPauseTopic$2} becomes true within the wait timeout,
     *       and the "mirror-topic-count" metric is verified with state "PausedMirror";</li>
     *   <li>{@code leaderOffsets$1()} is unchanged one second after producing more records to the
     *       source;</li>
     *   <li>after un-pausing (issued twice), mirroring catches up, the description state is ACTIVE
     *       and every partition's leader replica reports mirror state ACTIVE;</li>
     *   <li>pausing after the topic has been unlinked throws {@code InvalidRequestException}.</li>
     * </ul>
     *
     * @param str quorum value supplied by {@code @ValueSource} ("zk" or "kraft"); not read here
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testPauseTopic(String str) {
        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(
                topic(),
                numPartitions(),
                replicationFactor(),
                source.createTopic$default$4(),
                source.createTopic$default$5(),
                source.createTopic$default$6());

        Properties linkProps = destLinkProps(destLinkProps$default$1());
        boolean prefixConfigured = new StringOps(Predef$.MODULE$.augmentString(clusterLinkPrefix())).nonEmpty();
        if (prefixConfigured) {
            linkProps.setProperty(ClusterLinkConfig$.MODULE$.ClusterLinkPrefixProp(), clusterLinkPrefix());
        }
        UUID linkId = createClusterLink(linkName(), linkProps, createClusterLink$default$3(), createClusterLink$default$4());

        // The mirror topic has not been created yet, so pausing it must fail.
        Assertions.assertThrows(UnknownTopicOrPartitionException.class, () -> {
            ClusterLinkTestHarness destBeforeLink = this.destCluster();
            destBeforeLink.pauseTopic(this.clusterLinkPrefix() + this.topic(), destBeforeLink.pauseTopic$default$2());
        });

        ClusterLinkTestHarness destForLinking = destCluster();
        destForLinking.linkTopic(
                topic(),
                replicationFactor(),
                linkName(),
                destForLinking.linkTopic$default$4(),
                clusterLinkPrefix());
        produceToSourceCluster(8);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        verifyBasicLinkMetrics(linkId, verifyBasicLinkMetrics$default$2(), verifyBasicLinkMetrics$default$3());

        // The pause request is issued twice in a row with the default second argument.
        ClusterLinkTestHarness destFirstPause = destCluster();
        destFirstPause.pauseTopic(clusterLinkPrefix() + topic(), destFirstPause.pauseTopic$default$2());
        ClusterLinkTestHarness destSecondPause = destCluster();
        destSecondPause.pauseTopic(clusterLinkPrefix() + topic(), destSecondPause.pauseTopic$default$2());
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.PAUSED);

        // Inlined wait loop: poll the $2 condition until it holds, failing once the timeout elapses.
        TestUtils$ testUtilsModule = TestUtils$.MODULE$;
        long timeoutMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long pauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtilsModule == null) {
            // Null guard carried over from the inlined helper; `throw null` raises NullPointerException.
            throw null;
        }
        long waitStartMs = System.currentTimeMillis();
        while (true) {
            if ($anonfun$testPauseTopic$2(this)) {
                break;
            }
            if (System.currentTimeMillis() > waitStartMs + timeoutMs) {
                Assertions.fail($anonfun$testPauseTopic$6());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(timeoutMs), pauseMs));
        }

        verifyMirrorTopicCountMetric(
                "mirror-topic-count",
                (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
                        Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("state"), "PausedMirror")})),
                verifyMirrorTopicCountMetric$default$3(),
                verifyMirrorTopicCountMetric$default$4());

        // Snapshot, produce to the source, wait a second, and expect the snapshot to be unchanged.
        Seq offsetsWhilePaused = leaderOffsets$1();
        produceToSourceCluster(8);
        Thread.sleep(1000L);
        Assertions.assertEquals(offsetsWhilePaused, leaderOffsets$1());

        // Un-pause (second argument false), again issued twice.
        destCluster().pauseTopic(clusterLinkPrefix() + topic(), false);
        destCluster().pauseTopic(clusterLinkPrefix() + topic(), false);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        verifyBasicLinkMetrics(linkId, verifyBasicLinkMetrics$default$2(), verifyBasicLinkMetrics$default$3());
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE);

        // Every partition's leader replica must report an ACTIVE mirror state.
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), numPartitions()).foreach$mVc$sp(partition -> {
            ReplicaStatus.MirrorInfo.State expectedState = ReplicaStatus.MirrorInfo.State.ACTIVE;
            ClusterLinkTestHarness destForStatus = this.destCluster();
            Assertions.assertEquals(expectedState, ((ReplicaStatus.MirrorInfo) ((ReplicaStatus) ((IterableLike) destForStatus.replicaStatus(this.clusterLinkPrefix() + this.topic(), partition, destForStatus.replicaStatus$default$3()).filter(
                    status -> BoxesRunTime.boxToBoolean(status.isLeader()))).head()).mirrorInfo().get()).state());
        });

        ClusterLinkTestHarness destForUnlink = destCluster();
        destForUnlink.unlinkTopic(
                clusterLinkPrefix() + topic(),
                linkName(),
                destForUnlink.unlinkTopic$default$3(),
                destForUnlink.unlinkTopic$default$4());

        // Once unlinked, a pause request on the topic must be rejected.
        Assertions.assertThrows(InvalidRequestException.class, () -> {
            ClusterLinkTestHarness destAfterUnlink = this.destCluster();
            destAfterUnlink.pauseTopic(this.clusterLinkPrefix() + this.topic(), destAfterUnlink.pauseTopic$default$2());
        });

        ClusterLinkTestHarness destForDelete = destCluster();
        destForDelete.deleteClusterLink(
                linkName(),
                destForDelete.deleteClusterLink$default$2(),
                destForDelete.deleteClusterLink$default$3());
    }

    /**
     * Exercises pausing and resuming an entire cluster link.
     *
     * <p>Sequence and expectations encoded by the assertions:
     * <ol>
     *   <li>Create the source topic with delete-retention "10000", create a link with 100 ms
     *       topic-config sync, consumer-offset sync and metadata max age, mirror the topic and
     *       check that offset 10 is migrated.</li>
     *   <li>Set {@code ClusterLinkPausedProp} to "true". On the source, add two partitions, change
     *       delete-retention to "20000", produce records and commit offset 20.</li>
     *   <li>Expect the mirror description state LINK_PAUSED, mirror state PAUSED, and
     *       {@code verifyDescribeLinksResult(PAUSED, PAUSED)}. Pausing the topic itself moves the
     *       description state to PAUSED; un-pausing the topic returns it to LINK_PAUSED.</li>
     *   <li>Linking a new topic while the link is paused throws
     *       {@code ClusterLinkPausedException}.</li>
     *   <li>After a 250 ms sleep the destination still has the original partition count,
     *       delete-retention "10000" and committed offset 10.</li>
     *   <li>Set {@code ClusterLinkPausedProp} to "false" and expect ACTIVE states; the partition
     *       count and delete-retention on the destination are polled until they match the source
     *       changes (or the poll times out, in which case the assertion fails), and offset 20 is
     *       migrated.</li>
     * </ol>
     *
     * <p>The test is skipped (JUnit assumption) when {@code clusterLinkPrefix()} is non-empty.
     *
     * @param str quorum value supplied by {@code @ValueSource} ("zk" or "kraft"); not read here
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testPauseClusterLink(String str) {
        Assumptions.assumeFalse(new StringOps(Predef$.MODULE$.augmentString(clusterLinkPrefix())).nonEmpty());

        int initialPartitions = numPartitions();
        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(
                topic(),
                initialPartitions,
                replicationFactor(),
                source.createTopic$default$4(),
                source.createTopic$default$5(),
                source.createTopic$default$6());
        sourceCluster().alterTopic(topic(), (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
                Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(LogConfig$.MODULE$.DeleteRetentionMsProp()), "10000")})));

        // Short sync intervals so that changes (or their absence) show up quickly.
        // ConsumerOffsetSyncMsProp is set once; the original assigned the same value twice.
        Properties linkProps = destLinkProps(destLinkProps$default$1());
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.TopicConfigSyncMsProp(), "100");
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncMsProp(), "100");
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncEnableProp(), "true");
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetGroupFiltersProp(), consumerGroupFilter(consumerGroup()));
        linkProps.setProperty("metadata.max.age.ms", "100");
        UUID linkId = createClusterLink(linkName(), linkProps, createClusterLink$default$3(), createClusterLink$default$4());

        ClusterLinkTestHarness destForLinking = destCluster();
        destForLinking.linkTopic(
                topic(),
                replicationFactor(),
                linkName(),
                destForLinking.linkTopic$default$4(),
                destForLinking.linkTopic$default$5());
        produceToSourceCluster(8);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        verifyBasicLinkMetrics(linkId, linkProps, verifyBasicLinkMetrics$default$3());
        commitOffsets(sourceCluster(), topic(), 0, 10, consumerGroup());
        verifyOffsetMigration(topic(), 0, 10, consumerGroup());
        verifyConsumerOffsetMigrationMetrics();

        // Pause the whole link, then change the source in ways that would normally be synced.
        alterClusterLink(linkName(), (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
                Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp()), "true")})));
        int expandedPartitions = initialPartitions + 2;
        sourceCluster().createPartitions(topic(), expandedPartitions);
        sourceCluster().alterTopic(topic(), (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
                Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(LogConfig$.MODULE$.DeleteRetentionMsProp()), "20000")})));
        produceToSourceCluster(8);
        commitOffsets(sourceCluster(), topic(), 0, 20, consumerGroup());

        verifyPausedLinkMetrics();
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.LINK_PAUSED);
        waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.PAUSED, waitUntilMirrorState$default$2());
        verifyDescribeLinksResult(ClusterLinkDescription.LinkState.PAUSED, ClusterLinkDescription.LinkState.PAUSED);

        // Topic-level pause on top of the link-level pause, then topic-level un-pause.
        ClusterLinkTestHarness destForTopicPause = destCluster();
        destForTopicPause.pauseTopic(topic(), destForTopicPause.pauseTopic$default$2());
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.PAUSED);
        waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.PAUSED, waitUntilMirrorState$default$2());
        destCluster().pauseTopic(topic(), false);
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.LINK_PAUSED);
        waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.PAUSED, waitUntilMirrorState$default$2());

        // New mirror topics cannot be created while the link is paused.
        Assertions.assertThrows(ClusterLinkPausedException.class, () -> {
            ClusterLinkTestHarness destWhilePaused = this.destCluster();
            destWhilePaused.linkTopic(
                    "paused-topic",
                    this.replicationFactor(),
                    this.linkName(),
                    destWhilePaused.linkTopic$default$4(),
                    destWhilePaused.linkTopic$default$5());
        });

        // Give the 100 ms sync tasks time to run, then check that nothing was propagated.
        Thread.sleep(250L);
        Assertions.assertEquals(initialPartitions, destCluster().describeTopic(topic()).partitions().size());
        Assertions.assertEquals("10000", destCluster().describeTopicConfig(topic()).get(LogConfig$.MODULE$.DeleteRetentionMsProp()).value());
        Assertions.assertEquals(10, destCluster().getOffset(topic(), 0, consumerGroup()));

        // Resume the link.
        alterClusterLink(linkName(), (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
                Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp()), "false")})));
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE);
        waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.ACTIVE, waitUntilMirrorState$default$2());

        // Poll the value computed by the $2 helper until the $3 predicate accepts it against
        // expandedPartitions or the timeout elapses; the last observed value is asserted either way.
        long partitionTimeoutMs = TestUtils$.MODULE$.computeUntilTrue$default$2();
        long partitionPauseMs = TestUtils$.MODULE$.computeUntilTrue$default$3();
        long partitionPollStartMs = System.currentTimeMillis();
        int observedPartitions;
        while (true) {
            observedPartitions = $anonfun$testPauseClusterLink$2(this);
            if ($anonfun$testPauseClusterLink$3(expandedPartitions, observedPartitions)) {
                break;
            }
            if (System.currentTimeMillis() > partitionPollStartMs + partitionTimeoutMs) {
                break;
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(partitionTimeoutMs), partitionPauseMs));
        }
        Assertions.assertEquals(expandedPartitions, observedPartitions);

        // Same polling scheme for the string computed by the $4 helper, checked against "20000".
        long retentionTimeoutMs = TestUtils$.MODULE$.computeUntilTrue$default$2();
        long retentionPauseMs = TestUtils$.MODULE$.computeUntilTrue$default$3();
        long retentionPollStartMs = System.currentTimeMillis();
        String observedRetention;
        while (true) {
            observedRetention = $anonfun$testPauseClusterLink$4(this);
            if ($anonfun$testPauseClusterLink$5("20000", observedRetention)) {
                break;
            }
            if (System.currentTimeMillis() > retentionPollStartMs + retentionTimeoutMs) {
                break;
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(retentionTimeoutMs), retentionPauseMs));
        }
        Assertions.assertEquals("20000", observedRetention);

        verifyOffsetMigration(topic(), 0, 20, consumerGroup());
        verifyBasicLinkMetrics(linkId, linkProps, verifyBasicLinkMetrics$default$3());

        // Cleanup: unlink the mirror topic, then delete the link.
        ClusterLinkTestHarness destForUnlink = destCluster();
        destForUnlink.unlinkTopic(
                topic(),
                linkName(),
                destForUnlink.unlinkTopic$default$3(),
                destForUnlink.unlinkTopic$default$4());
        ClusterLinkTestHarness destForDelete = destCluster();
        destForDelete.deleteClusterLink(
                linkName(),
                destForDelete.deleteClusterLink$default$2(),
                destForDelete.deleteClusterLink$default$3());
    }

    /**
     * Checks the replica status reported for partition 0 of the test topic at three points:
     * on the source cluster before a link exists, on the destination cluster while the topic
     * is being mirrored, and on the destination cluster after the topic has been unlinked.
     *
     * <p>The test is skipped when a cluster link prefix is configured.
     *
     * @param str quorum mode supplied by {@code @ValueSource} ("zk" or "kraft"); not read by the body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testReplicaStatus(String str) {
        Assumptions.assumeFalse(new StringOps(Predef$.MODULE$.augmentString(clusterLinkPrefix())).nonEmpty());
        numPartitions_$eq(1);
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());

        // Source side, no link yet: neither the leader nor the follower carries link or mirror info.
        Seq<ReplicaStatus> replicaStatus = sourceCluster().replicaStatus(topic(), 0, true);
        Assertions.assertEquals(2, replicaStatus.size());
        ReplicaStatus replicaStatus2 = (ReplicaStatus) ((IterableLike) replicaStatus.filter(replicaStatus3 -> {
            return BoxesRunTime.boxToBoolean(replicaStatus3.isLeader());
        })).head();
        Assertions.assertEquals(Optional.empty(), replicaStatus2.linkName());
        Assertions.assertEquals(Optional.empty(), replicaStatus2.mirrorInfo());
        ReplicaStatus replicaStatus4 = (ReplicaStatus) ((IterableLike) replicaStatus.filterNot(replicaStatus5 -> {
            return BoxesRunTime.boxToBoolean(replicaStatus5.isLeader());
        })).head();
        Assertions.assertEquals(Optional.empty(), replicaStatus4.linkName());
        Assertions.assertEquals(Optional.empty(), replicaStatus4.mirrorInfo());

        // Timestamp taken before the link exists; later fetch times must not be earlier than this.
        long milliseconds = Time.SYSTEM.milliseconds();
        createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4());
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.linkTopic(topic(), replicationFactor(), linkName(), destCluster.linkTopic$default$4(), destCluster.linkTopic$default$5());
        // NOTE(review): the meaning of this value is defined by $anonfun$testReplicaStatus$3, which is outside this block.
        int i = destCluster().isKRaftTest() ? 0 : 1;

        // Inlined TestUtils.waitUntilTrue: poll the condition, failing once the default timeout has elapsed.
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long waitUntilTrue$default$3 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$4 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$ == null) {
            throw null;
        }
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$testReplicaStatus$3(this, i)) {
            if (System.currentTimeMillis() > currentTimeMillis + waitUntilTrue$default$3) {
                Assertions.fail($anonfun$testReplicaStatus$4());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$3), waitUntilTrue$default$4));
        }

        // Destination side after the first batch of 10 records, queried with the third argument set to true.
        produceToSourceCluster(10);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        Seq<ReplicaStatus> replicaStatus6 = destCluster().replicaStatus(topic(), 0, true);
        Assertions.assertEquals(4, replicaStatus6.size());
        ReplicaStatus replicaStatus7 = (ReplicaStatus) ((IterableLike) replicaStatus6.filter(replicaStatus8 -> {
            return BoxesRunTime.boxToBoolean($anonfun$testReplicaStatus$5(replicaStatus8));
        })).head();
        Assertions.assertTrue(replicaStatus7.mirrorInfo().isPresent());
        ReplicaStatus.MirrorInfo mirrorInfo = (ReplicaStatus.MirrorInfo) replicaStatus7.mirrorInfo().get();
        Assertions.assertEquals(ReplicaStatus.MirrorInfo.State.ACTIVE, mirrorInfo.state());
        Assertions.assertTrue(milliseconds <= mirrorInfo.lastFetchTimeMs(), new StringBuilder(14).append("Expected: ").append(milliseconds).append(" <= ").append(mirrorInfo.lastFetchTimeMs()).toString());
        Assertions.assertEquals(10, mirrorInfo.lastFetchSourceHighWatermark());
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE);
        Assertions.assertEquals(Optional.empty(), ((ReplicaStatus) ((IterableLike) replicaStatus6.filter(replicaStatus9 -> {
            return BoxesRunTime.boxToBoolean($anonfun$testReplicaStatus$6(replicaStatus9));
        })).head()).mirrorInfo());
        ReplicaStatus replicaStatus10 = (ReplicaStatus) ((IterableLike) replicaStatus6.filter(replicaStatus11 -> {
            return BoxesRunTime.boxToBoolean($anonfun$testReplicaStatus$7(replicaStatus11));
        })).head();
        Assertions.assertEquals(linkName(), replicaStatus10.linkName().get());
        Assertions.assertEquals(Optional.empty(), replicaStatus10.mirrorInfo());
        ReplicaStatus replicaStatus12 = (ReplicaStatus) ((IterableLike) replicaStatus6.filter(replicaStatus13 -> {
            return BoxesRunTime.boxToBoolean($anonfun$testReplicaStatus$8(replicaStatus13));
        })).head();
        Assertions.assertEquals(linkName(), replicaStatus12.linkName().get());
        Assertions.assertEquals(Optional.empty(), replicaStatus12.mirrorInfo());

        // Second batch: query with the third argument set to false and expect only two replicas back.
        long lastFetchTimeMs = mirrorInfo.lastFetchTimeMs();
        produceToSourceCluster(10);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        Seq<ReplicaStatus> replicaStatus14 = destCluster().replicaStatus(topic(), 0, false);
        Assertions.assertEquals(2, replicaStatus14.size());
        ReplicaStatus replicaStatus15 = (ReplicaStatus) ((IterableLike) replicaStatus14.filter(replicaStatus16 -> {
            return BoxesRunTime.boxToBoolean($anonfun$testReplicaStatus$9(replicaStatus16));
        })).head();
        Assertions.assertTrue(replicaStatus15.mirrorInfo().isPresent());
        ReplicaStatus.MirrorInfo mirrorInfo2 = (ReplicaStatus.MirrorInfo) replicaStatus15.mirrorInfo().get();
        Assertions.assertEquals(ReplicaStatus.MirrorInfo.State.ACTIVE, mirrorInfo2.state());
        Assertions.assertTrue(lastFetchTimeMs <= mirrorInfo2.lastFetchTimeMs(), new StringBuilder(14).append("Expected: ").append(lastFetchTimeMs).append(" <= ").append(mirrorInfo2.lastFetchTimeMs()).toString());
        Assertions.assertEquals(10 * 2, mirrorInfo2.lastFetchSourceHighWatermark());
        Assertions.assertEquals(Optional.empty(), ((ReplicaStatus) ((IterableLike) replicaStatus14.filter(replicaStatus17 -> {
            return BoxesRunTime.boxToBoolean($anonfun$testReplicaStatus$10(replicaStatus17));
        })).head()).mirrorInfo());

        // Unlink the topic and wait (inlined waitUntilTrue again) until the polled condition holds;
        // the condition stores the replica statuses it observed into the holder.
        ClusterLinkTestHarness destCluster2 = destCluster();
        destCluster2.unlinkTopic(topic(), linkName(), destCluster2.unlinkTopic$default$3(), destCluster2.unlinkTopic$default$4());
        ObjectRef create = ObjectRef.create((Object) null);
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        long waitUntilTrue$default$32 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$42 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$2 == null) {
            throw null;
        }
        long currentTimeMillis2 = System.currentTimeMillis();
        while (!$anonfun$testReplicaStatus$11(this, create)) {
            if (System.currentTimeMillis() > currentTimeMillis2 + waitUntilTrue$default$32) {
                Assertions.fail($anonfun$testReplicaStatus$12());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$32), waitUntilTrue$default$42));
        }
        ReplicaStatus replicaStatus18 = (ReplicaStatus) ((IterableLike) ((Seq) create.elem).filter(replicaStatus19 -> {
            return BoxesRunTime.boxToBoolean($anonfun$testReplicaStatus$13(replicaStatus19));
        })).head();
        Assertions.assertTrue(replicaStatus18.mirrorInfo().isPresent());
        ReplicaStatus.MirrorInfo mirrorInfo3 = (ReplicaStatus.MirrorInfo) replicaStatus18.mirrorInfo().get();

        // After unlinking, either PENDING_STOPPED or STOPPED is accepted. Constant-first equals
        // keeps the check null-safe, so a null state fails the assertion instead of throwing an NPE.
        ReplicaStatus.MirrorInfo.State stateAfterUnlink = mirrorInfo3.state();
        Assertions.assertTrue(ReplicaStatus.MirrorInfo.State.PENDING_STOPPED.equals(stateAfterUnlink)
                || ReplicaStatus.MirrorInfo.State.STOPPED.equals(stateAfterUnlink));
        Assertions.assertEquals(-1L, mirrorInfo3.lastFetchTimeMs());
        Assertions.assertEquals(-1L, mirrorInfo3.lastFetchSourceHighWatermark());
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED);
        Assertions.assertEquals(Optional.empty(), ((ReplicaStatus) ((IterableLike) ((Seq) create.elem).filter(replicaStatus20 -> {
            return BoxesRunTime.boxToBoolean($anonfun$testReplicaStatus$14(replicaStatus20));
        })).head()).mirrorInfo());
        ClusterLinkTestHarness destCluster3 = destCluster();
        destCluster3.deleteClusterLink(linkName(), destCluster3.deleteClusterLink$default$2(), destCluster3.deleteClusterLink$default$3());
    }

    /**
     * Runs the auto-mirroring scenario: sets up the base auto-mirrored topic, widens the link's
     * topic filter to include all topics, creates a second source topic and waits for its mirror,
     * checks the auto-mirror created/failed metric totals, then excludes and deletes that mirror,
     * and finally covers a destination topic whose name conflicts with a new source topic.
     *
     * @param str quorum mode supplied by {@code @ValueSource} ("zk" or "kraft"); not read by the body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testAutoMirroring(String str) {
        autoMirrorTopic(syncPeriod());

        // Switch the link to the include-all filter and add a second topic on the source.
        String secondTopic = topic() + "-2";
        ClusterLinkTestHarness destForIncludeAll = destCluster();
        destForIncludeAll.alterClusterLink(
                linkName(),
                (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
                        Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                                Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.TopicFiltersProp()),
                                includeAllTopicsFilter())})),
                destForIncludeAll.alterClusterLink$default$3());
        ClusterLinkTestHarness sourceForSecondTopic = sourceCluster();
        sourceForSecondTopic.createTopic(
                secondTopic,
                sourceForSecondTopic.createTopic$default$2(),
                sourceForSecondTopic.createTopic$default$3(),
                sourceForSecondTopic.createTopic$default$4(),
                sourceForSecondTopic.createTopic$default$5(),
                sourceForSecondTopic.createTopic$default$6());
        waitForAutoMirrorCreation(clusterLinkPrefix() + secondTopic);

        // Let several sync periods pass before reading the metric totals.
        Thread.sleep(syncPeriod() * 5);
        Assertions.assertEquals(
                0.0d,
                totalKafkaMetricValue(destCluster().aliveServers(), "auto-mirror-create-failed-total", totalKafkaMetricValue$default$3(), totalKafkaMetricValue$default$4(), totalKafkaMetricValue$default$5()));
        Assertions.assertEquals(
                2.0d,
                totalKafkaMetricValue(destCluster().aliveServers(), "auto-mirror-created-total", totalKafkaMetricValue$default$3(), totalKafkaMetricValue$default$4(), totalKafkaMetricValue$default$5()));

        // Keep the include-all entry but add an explicit exclude for the second topic.
        String excludeSecondTopicFilter = new StringOps(Predef$.MODULE$.augmentString(
                "|{\n          |\"topicFilters\": [\n          |  {\n          |     \"name\": \"*\",\n          |     \"patternType\": \"literal\",\n          |     \"filterType\": \"include\"\n          |  },\n          |  {\n          |     \"name\": \""
                        + secondTopic
                        + "\",\n          |     \"patternType\": \"literal\",\n          |     \"filterType\": \"exclude\"\n          |  }\n          |]}\n          |")).stripMargin();
        ClusterLinkTestHarness destForExclude = destCluster();
        destForExclude.alterClusterLink(
                linkName(),
                (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
                        Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                                Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.TopicFiltersProp()),
                                excludeSecondTopicFilter)})),
                destForExclude.alterClusterLink$default$3());
        destCluster().deleteTopic(clusterLinkPrefix() + secondTopic, true);
        Thread.sleep(1000L);
        ClusterLinkTestHarness destForListing = destCluster();
        Assertions.assertFalse(
                destForListing.listMirrorTopics(destForListing.listMirrorTopics$default$1())
                        .contains(clusterLinkPrefix() + secondTopic));

        // A destination topic with the mirror's name exists first; once it is deleted the mirror is expected to appear.
        String conflictTopic = topic() + "-conflict";
        ClusterLinkTestHarness destForConflict = destCluster();
        destForConflict.createTopic(
                clusterLinkPrefix() + conflictTopic,
                destForConflict.createTopic$default$2(),
                destForConflict.createTopic$default$3(),
                destForConflict.createTopic$default$4(),
                destForConflict.createTopic$default$5(),
                destForConflict.createTopic$default$6());
        ClusterLinkTestHarness sourceForConflict = sourceCluster();
        sourceForConflict.createTopic(
                conflictTopic,
                sourceForConflict.createTopic$default$2(),
                sourceForConflict.createTopic$default$3(),
                sourceForConflict.createTopic$default$4(),
                sourceForConflict.createTopic$default$5(),
                sourceForConflict.createTopic$default$6());
        destCluster().deleteTopic(clusterLinkPrefix() + conflictTopic, false);
        waitForAutoMirrorCreation(clusterLinkPrefix() + conflictTopic);

        // Cleanup: unlink both remaining mirrors, then remove the link.
        ClusterLinkTestHarness destForConflictUnlink = destCluster();
        destForConflictUnlink.unlinkTopic(clusterLinkPrefix() + conflictTopic, linkName(), false, destForConflictUnlink.unlinkTopic$default$4());
        ClusterLinkTestHarness destForBaseUnlink = destCluster();
        destForBaseUnlink.unlinkTopic(clusterLinkPrefix() + topic(), linkName(), destForBaseUnlink.unlinkTopic$default$3(), destForBaseUnlink.unlinkTopic$default$4());
        ClusterLinkTestHarness destForLinkDelete = destCluster();
        destForLinkDelete.deleteClusterLink(linkName(), destForLinkDelete.deleteClusterLink$default$2(), destForLinkDelete.deleteClusterLink$default$3());
    }

    /**
     * Creates the test topic on the source cluster, produces 20 records to it, creates a cluster
     * link with auto-mirroring properties, and waits for the mirror topic to be created and to
     * catch up before verifying the link and auto-mirroring metrics.
     *
     * @param j base period; the link's retry timeout property is set to ten times this value
     */
    public void autoMirrorTopic(long j) {
        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(
                topic(),
                numPartitions(),
                replicationFactor(),
                source.createTopic$default$4(),
                source.createTopic$default$5(),
                source.createTopic$default$6());
        produceToSourceCluster(20);

        Properties linkProps = destLinkPropsForAutoMirroring(topicFilter(), true);
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.RetryTimeoutMsProp(), String.valueOf(j * 10));
        UUID linkId = createClusterLink(linkName(), linkProps, createClusterLink$default$3(), createClusterLink$default$4());

        waitForAutoMirrorCreation(clusterLinkPrefix() + topic());
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        verifyBasicLinkMetrics(linkId, verifyBasicLinkMetrics$default$2(), verifyBasicLinkMetrics$default$3());
        verifyAutoMirroringSuccessMetric();
    }

    /**
     * Builds destination link properties with auto-mirroring enabled.
     *
     * <p>The returned properties always carry the auto-mirroring enable flag, the given topic
     * filter and {@code metadata.max.age.ms} set to the sync period. The cluster link prefix
     * property is added only when a non-empty prefix is configured and {@code z} is true.
     *
     * @param str topic filter value stored under the topic filters property
     * @param z   whether the configured cluster link prefix, if any, should be included
     * @return the result of passing the assembled config map to {@code destLinkProps}
     */
    public Properties destLinkPropsForAutoMirroring(String str, boolean z) {
        scala.collection.mutable.Map linkConfigs = Map$.MODULE$.apply(Nil$.MODULE$);
        linkConfigs.put(ClusterLinkConfig$.MODULE$.AutoMirroringEnableProp(), "true");
        linkConfigs.put(ClusterLinkConfig$.MODULE$.TopicFiltersProp(), str);
        linkConfigs.put("metadata.max.age.ms", Long.toString(syncPeriod()));

        boolean prefixConfigured = new StringOps(Predef$.MODULE$.augmentString(clusterLinkPrefix())).nonEmpty();
        if (prefixConfigured) {
            if (z) {
                linkConfigs.put(ClusterLinkConfig$.MODULE$.ClusterLinkPrefixProp(), clusterLinkPrefix());
            }
        }
        return destLinkProps(linkConfigs);
    }

    /**
     * Default value for the second parameter of
     * {@code destLinkPropsForAutoMirroring(String, boolean)}.
     *
     * @return always {@code false}
     */
    public boolean destLinkPropsForAutoMirroring$default$2() {
        final boolean defaultValue = false;
        return defaultValue;
    }

    /**
     * Runs {@code testLastFetchedOffsetStoppedMirrorTopicDescription} with that helper's
     * default argument.
     *
     * @param str quorum mode supplied by {@code @ValueSource} ("zk" or "kraft"); not read by the body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testLastFetchedOffsetPromotedMirrorTopicDescription(String str) {
        // NOTE(review): presumably the default selects the "promote" path, going by the test name — confirm against the helper.
        boolean helperDefault = testLastFetchedOffsetStoppedMirrorTopicDescription$default$1();
        testLastFetchedOffsetStoppedMirrorTopicDescription(helperDefault);
    }

    /**
     * Runs {@code testLastFetchedOffsetStoppedMirrorTopicDescription} with its argument
     * explicitly set to {@code false}.
     *
     * @param str quorum mode supplied by {@code @ValueSource} ("zk" or "kraft"); not read by the body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testLastFetchedOffsetFailedOverMirrorTopicDescription(String str) {
        // NOTE(review): presumably false selects the "failover" path, going by the test name — confirm against the helper.
        final boolean helperArgument = false;
        testLastFetchedOffsetStoppedMirrorTopicDescription(helperArgument);
    }

    /**
     * Verifies that auto-mirroring links are rejected with {@code InvalidConfigurationException}
     * when their topic filters overlap with another link's filters, both on link creation and on
     * alteration, and that a filter becomes usable by the second link once the first link has
     * been deleted.
     *
     * <p>The test is skipped when a cluster link prefix is configured.
     *
     * @param str quorum mode supplied by {@code @ValueSource} ("zk" or "kraft"); not read by the body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testAutoMirroringNoOverlappingTopicFilters(String str) {
        Assumptions.assumeFalse(new StringOps(Predef$.MODULE$.augmentString(clusterLinkPrefix())).nonEmpty());

        // First link: literal includes for the test topic and "linkedTopicTwo".
        String twoTopicFilter = new StringOps(Predef$.MODULE$.augmentString(
                "|{\n          |\"topicFilters\": [\n          |  {\n          |     \"name\": \""
                        + topic()
                        + "\",\n          |     \"patternType\": \"literal\",\n          |     \"filterType\": \"include\"\n          |  },\n          |  {\n          |    \"name\": \""
                        + "linkedTopicTwo"
                        + "\",\n          |    \"patternType\": \"literal\",\n          |    \"filterType\": \"include\"\n          |  }\n          |]}\n          |")).stripMargin();
        Properties firstLinkProps = destLinkPropsForAutoMirroring(twoTopicFilter, destLinkPropsForAutoMirroring$default$2());
        createClusterLink(linkName(), firstLinkProps, createClusterLink$default$3(), createClusterLink$default$4());

        // Creating a second link with the very same properties must be rejected.
        String secondLink = linkName() + "-2";
        Assertions.assertThrows(
                InvalidConfigurationException.class,
                () -> this.createClusterLink(secondLink, firstLinkProps, this.createClusterLink$default$3(), this.createClusterLink$default$4()));

        // The second link is created with the include-all filter instead.
        Properties includeAllProps = destLinkPropsForAutoMirroring(includeAllTopicsFilter(), destLinkPropsForAutoMirroring$default$2());
        UUID secondLinkId = createClusterLink(secondLink, includeAllProps, createClusterLink$default$3(), createClusterLink$default$4());

        // Altering the second link to the first link's topic filter is rejected with a specific message...
        InvalidConfigurationException overlapError = Assertions.assertThrows(InvalidConfigurationException.class, () -> {
            ClusterLinkTestHarness dest = this.destCluster();
            dest.alterClusterLink(
                    secondLink,
                    (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
                            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                                    Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.TopicFiltersProp()),
                                    this.topicFilter())})),
                    dest.alterClusterLink$default$3());
        });
        Assertions.assertEquals(
                "Found overlapping topic filter(s): new topic filter linkedTopic overlaps with existing topic filter from cluster link testLink.",
                overlapError.getMessage());
        // ...and leaves the second link's filter unchanged.
        Assertions.assertEquals(
                ClusterLinkFilterJson$.MODULE$.parse(includeAllTopicsFilter()),
                ((ClusterLinkFactory.ConnectionManager) ((KafkaBroker) destCluster().brokers().head())
                        .clusterLinkManager().connectionManager(secondLinkId).get())
                        .currentConfig().topicFilters());

        // Remove the first link (and its source-side counterpart for source-initiated links).
        ClusterLinkTestHarness destForFirstDelete = destCluster();
        destForFirstDelete.deleteClusterLink(linkName(), destForFirstDelete.deleteClusterLink$default$2(), destForFirstDelete.deleteClusterLink$default$3());
        if (useSourceInitiatedLink()) {
            ClusterLinkTestHarness sourceForDelete = sourceCluster();
            sourceForDelete.deleteClusterLink(linkName(), sourceForDelete.deleteClusterLink$default$2(), sourceForDelete.deleteClusterLink$default$3());
            ClusterLinkTestHarness sourceForList = sourceCluster();
            Assertions.assertTrue(sourceForList.listClusterLinks(sourceForList.listClusterLinks$default$1()).size() == 1);
            ClusterLinkTestHarness sourceForDescribe = sourceCluster();
            Assertions.assertTrue(sourceForDescribe.describeClusterLinks(sourceForDescribe.describeClusterLinks$default$1()).size() == 1);
        }

        // With the first link gone the same alteration succeeds; applying it a second time is also accepted.
        ClusterLinkTestHarness destForFirstAlter = destCluster();
        destForFirstAlter.alterClusterLink(
                secondLink,
                (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
                        Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                                Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.TopicFiltersProp()),
                                topicFilter())})),
                destForFirstAlter.alterClusterLink$default$3());
        Assertions.assertEquals(
                ClusterLinkFilterJson$.MODULE$.parse(topicFilter()),
                ((ClusterLinkFactory.ConnectionManager) ((KafkaBroker) destCluster().brokers().head())
                        .clusterLinkManager().connectionManager(secondLinkId).get())
                        .currentConfig().topicFilters());
        ClusterLinkTestHarness destForRepeatAlter = destCluster();
        destForRepeatAlter.alterClusterLink(
                secondLink,
                (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
                        Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                                Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.TopicFiltersProp()),
                                topicFilter())})),
                destForRepeatAlter.alterClusterLink$default$3());
        Assertions.assertEquals(
                ClusterLinkFilterJson$.MODULE$.parse(topicFilter()),
                ((ClusterLinkFactory.ConnectionManager) ((KafkaBroker) destCluster().brokers().head())
                        .clusterLinkManager().connectionManager(secondLinkId).get())
                        .currentConfig().topicFilters());

        // Recreate the first link with include-all, then switch the second link to include-all as well.
        createClusterLink(linkName(), includeAllProps, createClusterLink$default$3(), createClusterLink$default$4());
        ClusterLinkTestHarness destForIncludeAll = destCluster();
        destForIncludeAll.alterClusterLink(
                secondLink,
                (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
                        Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                                Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.TopicFiltersProp()),
                                includeAllTopicsFilter())})),
                destForIncludeAll.alterClusterLink$default$3());
        Assertions.assertEquals(
                ClusterLinkFilterJson$.MODULE$.parse(includeAllTopicsFilter()),
                ((ClusterLinkFactory.ConnectionManager) ((KafkaBroker) destCluster().brokers().head())
                        .clusterLinkManager().connectionManager(secondLinkId).get())
                        .currentConfig().topicFilters());

        // Cleanup: delete both links on the destination.
        ClusterLinkTestHarness destForCleanupFirst = destCluster();
        destForCleanupFirst.deleteClusterLink(linkName(), destForCleanupFirst.deleteClusterLink$default$2(), destForCleanupFirst.deleteClusterLink$default$3());
        ClusterLinkTestHarness destForCleanupSecond = destCluster();
        destForCleanupSecond.deleteClusterLink(secondLink, destForCleanupSecond.deleteClusterLink$default$2(), destForCleanupSecond.deleteClusterLink$default$3());
    }

    /**
     * Pauses an auto-mirroring link, checks that a second link with the same topic filter is
     * still rejected with {@code InvalidConfigurationException}, then creates the second link
     * with a filter for "linkedTopicTwo" only, waits for both mirrors, un-pauses the first link
     * and cleans up.
     *
     * <p>The test is skipped when a cluster link prefix is configured.
     *
     * @param str quorum mode supplied by {@code @ValueSource} ("zk" or "kraft"); not read by the body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testAutoMirroringAllowsLinkConfigUpdate(String str) {
        Assumptions.assumeFalse(new StringOps(Predef$.MODULE$.augmentString(clusterLinkPrefix())).nonEmpty());

        // First link auto-mirrors the test topic.
        ClusterLinkTestHarness sourceForFirstTopic = sourceCluster();
        sourceForFirstTopic.createTopic(
                topic(),
                numPartitions(),
                replicationFactor(),
                sourceForFirstTopic.createTopic$default$4(),
                sourceForFirstTopic.createTopic$default$5(),
                sourceForFirstTopic.createTopic$default$6());
        produceToSourceCluster(20);
        createClusterLink(
                linkName(),
                destLinkPropsForAutoMirroring(topicFilter(), destLinkPropsForAutoMirroring$default$2()),
                createClusterLink$default$3(),
                createClusterLink$default$4());
        waitForAutoMirrorCreation(topic());

        // Pause the first link, then add a second source topic.
        ClusterLinkTestHarness destForPause = destCluster();
        destForPause.alterClusterLink(
                linkName(),
                (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
                        Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                                Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp()),
                                "true")})),
                destForPause.alterClusterLink$default$3());
        ClusterLinkTestHarness sourceForSecondTopic = sourceCluster();
        sourceForSecondTopic.createTopic(
                "linkedTopicTwo",
                numPartitions(),
                replicationFactor(),
                sourceForSecondTopic.createTopic$default$4(),
                sourceForSecondTopic.createTopic$default$5(),
                sourceForSecondTopic.createTopic$default$6());
        produceToSourceCluster(20);

        // A second link reusing the first link's topic filter is rejected.
        Properties overlappingProps = destLinkPropsForAutoMirroring(topicFilter(), destLinkPropsForAutoMirroring$default$2());
        String secondLink = linkName() + "-2";
        Assertions.assertThrows(
                InvalidConfigurationException.class,
                () -> this.createClusterLink(secondLink, overlappingProps, this.createClusterLink$default$3(), this.createClusterLink$default$4()));

        // A second link whose filter only includes "linkedTopicTwo" is accepted.
        String secondTopicOnlyFilter = new StringOps(Predef$.MODULE$.augmentString(
                "|{\n          |\"topicFilters\": [\n          |  {\n          |     \"name\": \""
                        + "linkedTopicTwo"
                        + "\",\n          |     \"patternType\": \"literal\",\n          |     \"filterType\": \"include\"\n          |  }\n          |]}\n          |")).stripMargin();
        Properties secondLinkProps = destLinkPropsForAutoMirroring(secondTopicOnlyFilter, destLinkPropsForAutoMirroring$default$2());
        createClusterLink(secondLink, secondLinkProps, createClusterLink$default$3(), createClusterLink$default$4());
        waitForAutoMirrorCreation(topic());
        waitForAutoMirrorCreation("linkedTopicTwo");

        // Un-pause the first link.
        ClusterLinkTestHarness destForResume = destCluster();
        destForResume.alterClusterLink(
                linkName(),
                (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
                        Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                                Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp()),
                                "false")})),
                destForResume.alterClusterLink$default$3());

        // Cleanup: unlink both mirrors and delete both links.
        ClusterLinkTestHarness destForFirstUnlink = destCluster();
        destForFirstUnlink.unlinkTopic(topic(), linkName(), destForFirstUnlink.unlinkTopic$default$3(), false);
        ClusterLinkTestHarness destForSecondUnlink = destCluster();
        destForSecondUnlink.unlinkTopic("linkedTopicTwo", secondLink, destForSecondUnlink.unlinkTopic$default$3(), false);
        ClusterLinkTestHarness destForFirstDelete = destCluster();
        destForFirstDelete.deleteClusterLink(linkName(), true, destForFirstDelete.deleteClusterLink$default$3());
        ClusterLinkTestHarness destForSecondDelete = destCluster();
        destForSecondDelete.deleteClusterLink(secondLink, true, destForSecondDelete.deleteClusterLink$default$3());
    }

    /**
     * Creates two cluster links, each mirroring one topic, and verifies through
     * {@code verifyMirrorTopicCount} that the "mirror-topic-count" metric tagged with the
     * {@code TopicLinkMirror} state name is 1 for each link.
     *
     * <p>The test is skipped when a cluster link prefix is configured.
     *
     * @param str quorum mode supplied by {@code @ValueSource} ("zk" or "kraft"); not read by the body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testMirrorTopicCountForMultipleLinks(String str) {
        Assumptions.assumeFalse(new StringOps(Predef$.MODULE$.augmentString(clusterLinkPrefix())).nonEmpty());

        // First link mirrors the default test topic.
        ClusterLinkTestHarness sourceForFirstTopic = sourceCluster();
        sourceForFirstTopic.createTopic(
                topic(),
                numPartitions(),
                replicationFactor(),
                sourceForFirstTopic.createTopic$default$4(),
                sourceForFirstTopic.createTopic$default$5(),
                sourceForFirstTopic.createTopic$default$6());
        createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4());
        ClusterLinkTestHarness destForFirstTopic = destCluster();
        destForFirstTopic.linkTopic(topic(), replicationFactor(), linkName(), destForFirstTopic.linkTopic$default$4(), destForFirstTopic.linkTopic$default$5());

        // Second link mirrors a second topic; both names carry a "Two" suffix.
        String secondLink = linkName() + "Two";
        String secondTopic = topic() + "Two";
        ClusterLinkTestHarness sourceForSecondTopic = sourceCluster();
        sourceForSecondTopic.createTopic(
                secondTopic,
                numPartitions(),
                replicationFactor(),
                sourceForSecondTopic.createTopic$default$4(),
                sourceForSecondTopic.createTopic$default$5(),
                sourceForSecondTopic.createTopic$default$6());
        createClusterLink(secondLink, createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4());
        ClusterLinkTestHarness destForSecondTopic = destCluster();
        destForSecondTopic.linkTopic(secondTopic, replicationFactor(), secondLink, destForSecondTopic.linkTopic$default$4(), destForSecondTopic.linkTopic$default$5());

        // Each link is expected to report exactly one mirror topic.
        verifyMirrorTopicCount(
                "mirror-topic-count",
                (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
                        Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                                Predef$.MODULE$.ArrowAssoc("state"),
                                TopicLinkMirror$.MODULE$.name())})),
                1,
                linkName(),
                verifyMirrorTopicCount$default$5());
        verifyMirrorTopicCount(
                "mirror-topic-count",
                (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
                        Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                                Predef$.MODULE$.ArrowAssoc("state"),
                                TopicLinkMirror$.MODULE$.name())})),
                1,
                secondLink,
                verifyMirrorTopicCount$default$5());
    }

    /**
     * Enables auto-mirroring on an existing link. While a destination topic with the mirror's
     * name exists the alteration is expected to throw {@code InvalidConfigurationException};
     * after that topic is deleted the same alteration is applied, the mirror topic is awaited,
     * and mirroring of 10 produced records is awaited before cleanup.
     *
     * @param str quorum mode supplied by {@code @ValueSource} ("zk" or "kraft"); not read by the body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testAutoMirroringUpdateExistingLink(String str) {
        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(
                topic(),
                numPartitions(),
                replicationFactor(),
                source.createTopic$default$4(),
                source.createTopic$default$5(),
                source.createTopic$default$6());

        // Create the link without auto-mirroring; add the prefix property only when a prefix is configured.
        Properties linkProps = destLinkProps((scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
                Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                        Predef$.MODULE$.ArrowAssoc("metadata.max.age.ms"),
                        Long.toString(syncPeriod()))})));
        boolean prefixConfigured = new StringOps(Predef$.MODULE$.augmentString(clusterLinkPrefix())).nonEmpty();
        if (prefixConfigured) {
            linkProps.setProperty(ClusterLinkConfig$.MODULE$.ClusterLinkPrefixProp(), clusterLinkPrefix());
        }
        createClusterLink(linkName(), linkProps, createClusterLink$default$3(), createClusterLink$default$4());

        // Pre-create a destination topic that carries the mirror topic's name.
        ClusterLinkTestHarness destForConflict = destCluster();
        destForConflict.createTopic(
                clusterLinkPrefix() + topic(),
                numPartitions(),
                replicationFactor(),
                destForConflict.createTopic$default$4(),
                destForConflict.createTopic$default$5(),
                destForConflict.createTopic$default$6());

        // Enabling auto-mirroring is rejected while that topic exists.
        scala.collection.mutable.Map autoMirrorConfigs = Map$.MODULE$.apply(Nil$.MODULE$);
        autoMirrorConfigs.put(ClusterLinkConfig$.MODULE$.AutoMirroringEnableProp(), "true");
        autoMirrorConfigs.put(ClusterLinkConfig$.MODULE$.TopicFiltersProp(), topicFilter());
        Assertions.assertThrows(InvalidConfigurationException.class, () -> {
            ClusterLinkTestHarness dest = this.destCluster();
            dest.alterClusterLink(this.linkName(), autoMirrorConfigs, dest.alterClusterLink$default$3());
        });

        // After deleting the topic the same alteration is applied and the mirror gets created.
        destCluster().deleteTopic(clusterLinkPrefix() + topic(), true);
        ClusterLinkTestHarness destForAlter = destCluster();
        destForAlter.alterClusterLink(linkName(), autoMirrorConfigs, destForAlter.alterClusterLink$default$3());
        waitForAutoMirrorCreation(clusterLinkPrefix() + topic());
        produceToSourceCluster(10);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());

        // Cleanup: unlink the mirror and delete the link.
        ClusterLinkTestHarness destForUnlink = destCluster();
        destForUnlink.unlinkTopic(clusterLinkPrefix() + topic(), linkName(), destForUnlink.unlinkTopic$default$3(), false);
        ClusterLinkTestHarness destForDelete = destCluster();
        destForDelete.deleteClusterLink(linkName(), destForDelete.deleteClusterLink$default$2(), destForDelete.deleteClusterLink$default$3());
    }

    /**
     * Creates an auto-mirroring link, waits for the first topic's mirror, then alters the link's
     * topic filters to also include a second source topic and waits for that mirror as well.
     * Finishes by unlinking both mirror topics and deleting the link.
     *
     * @param str value supplied by {@code @ValueSource} ("zk" or "kraft"); not read in this body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testAutoMirroringAddingAdditionalTopic(String str) {
        // First source topic plus a link built from destLinkPropsForAutoMirroring(topicFilter(), true).
        ClusterLinkTestHarness firstSource = sourceCluster();
        firstSource.createTopic(topic(), numPartitions(), replicationFactor(), firstSource.createTopic$default$4(), firstSource.createTopic$default$5(), firstSource.createTopic$default$6());
        createClusterLink(linkName(), destLinkPropsForAutoMirroring(topicFilter(), true), createClusterLink$default$3(), createClusterLink$default$4());
        waitForAutoMirrorCreation(clusterLinkPrefix() + topic());

        // Second source topic, created after the link already exists.
        ClusterLinkTestHarness secondSource = sourceCluster();
        secondSource.createTopic("linkedTopic2", numPartitions(), replicationFactor(), secondSource.createTopic$default$4(), secondSource.createTopic$default$5(), secondSource.createTopic$default$6());

        // Margin-formatted filter JSON with two literal "include" entries: topic() and linkedTopic2.
        String rawFilterJson = "|{\n          |\"topicFilters\": [\n          |  {\n          |     \"name\": \""
                + topic()
                + "\",\n          |     \"patternType\": \"literal\",\n          |     \"filterType\": \"include\"\n          |  },\n          |  {\n          |     \"name\": \""
                + "linkedTopic2"
                + "\",\n          |     \"patternType\": \"literal\",\n          |     \"filterType\": \"include\"\n          |  }\n          |]}\n          |";
        String bothTopicsFilter = new StringOps(Predef$.MODULE$.augmentString(rawFilterJson)).stripMargin();

        // Apply the widened filter and wait for the second mirror to show up.
        ClusterLinkTestHarness alterTarget = destCluster();
        alterTarget.alterClusterLink(linkName(), (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.TopicFiltersProp()), bothTopicsFilter)})), alterTarget.alterClusterLink$default$3());
        waitForAutoMirrorCreation(clusterLinkPrefix() + "linkedTopic2");

        // Cleanup: unlink both mirrors, then delete the link.
        destCluster().unlinkTopic(clusterLinkPrefix() + topic(), linkName(), false, false);
        ClusterLinkTestHarness unlinkTarget = destCluster();
        unlinkTarget.unlinkTopic(clusterLinkPrefix() + "linkedTopic2", linkName(), unlinkTarget.unlinkTopic$default$3(), false);
        ClusterLinkTestHarness deleteTarget = destCluster();
        deleteTarget.deleteClusterLink(linkName(), true, deleteTarget.deleteClusterLink$default$3());
    }

    /**
     * Exercises link creation and filter alteration while a destination topic named
     * {@code clusterLinkPrefix() + topic()} already exists: both operations using
     * {@code topicFilter()} are expected to throw {@link InvalidConfigurationException} until that
     * topic is deleted, after which the same alteration is applied (twice) and the link's current
     * topic filters are checked each time.
     *
     * @param str value supplied by {@code @ValueSource} ("zk" or "kraft"); not read in this body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testAutoMirroringNoExistingTopic(String str) {
        // Create the destination topic up front, before any link exists.
        ClusterLinkTestHarness dest = destCluster();
        dest.createTopic(clusterLinkPrefix() + topic(), dest.createTopic$default$2(), dest.createTopic$default$3(), dest.createTopic$default$4(), dest.createTopic$default$5(), dest.createTopic$default$6());

        // Creating the link with topicFilter() must be rejected.
        Properties autoMirrorProps = destLinkPropsForAutoMirroring(topicFilter(), true);
        Assertions.assertThrows(InvalidConfigurationException.class, () -> this.createClusterLink(this.linkName(), autoMirrorProps, this.createClusterLink$default$3(), this.createClusterLink$default$4()));

        // With includeAllTopicsFilter() the same properties are accepted.
        autoMirrorProps.setProperty(ClusterLinkConfig$.MODULE$.TopicFiltersProp(), includeAllTopicsFilter());
        UUID linkId = createClusterLink(linkName(), autoMirrorProps, createClusterLink$default$3(), createClusterLink$default$4());

        // Altering the filters to topicFilter() must be rejected while the topic still exists.
        Assertions.assertThrows(InvalidConfigurationException.class, () -> {
            ClusterLinkTestHarness harness = this.destCluster();
            harness.alterClusterLink(this.linkName(), (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.TopicFiltersProp()), this.topicFilter())})), harness.alterClusterLink$default$3());
        });
        Object expectedAfterRejection = ClusterLinkFilterJson$.MODULE$.parse(includeAllTopicsFilter());
        Object actualAfterRejection = ((ClusterLinkFactory.ConnectionManager) ((KafkaBroker) destCluster().brokers().head()).clusterLinkManager().connectionManager(linkId).get()).currentConfig().topicFilters();
        Assertions.assertEquals(expectedAfterRejection, actualAfterRejection);

        // Delete the destination topic; the alteration is then applied.
        destCluster().deleteTopic(clusterLinkPrefix() + topic(), true);
        ClusterLinkTestHarness firstAlter = destCluster();
        firstAlter.alterClusterLink(linkName(), (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.TopicFiltersProp()), topicFilter())})), firstAlter.alterClusterLink$default$3());
        Object expectedAfterFirstAlter = ClusterLinkFilterJson$.MODULE$.parse(topicFilter());
        Object actualAfterFirstAlter = ((ClusterLinkFactory.ConnectionManager) ((KafkaBroker) destCluster().brokers().head()).clusterLinkManager().connectionManager(linkId).get()).currentConfig().topicFilters();
        Assertions.assertEquals(expectedAfterFirstAlter, actualAfterFirstAlter);

        // Apply the identical alteration a second time and re-check the filters.
        ClusterLinkTestHarness secondAlter = destCluster();
        secondAlter.alterClusterLink(linkName(), (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.TopicFiltersProp()), topicFilter())})), secondAlter.alterClusterLink$default$3());
        Object expectedAfterSecondAlter = ClusterLinkFilterJson$.MODULE$.parse(topicFilter());
        Object actualAfterSecondAlter = ((ClusterLinkFactory.ConnectionManager) ((KafkaBroker) destCluster().brokers().head()).clusterLinkManager().connectionManager(linkId).get()).currentConfig().topicFilters();
        Assertions.assertEquals(expectedAfterSecondAlter, actualAfterSecondAlter);

        ClusterLinkTestHarness cleanup = destCluster();
        cleanup.deleteClusterLink(linkName(), cleanup.deleteClusterLink$default$2(), cleanup.deleteClusterLink$default$3());
    }

    /**
     * Mirrors a single-partition topic, produces 10 records, then unlinks the mirror and checks
     * what the replica status and the mirror topic description report before and after the stop.
     *
     * <p>While ACTIVE, the mirror info's last fetched source high watermark is expected to be 10.
     * Once STOPPED, it is expected to be -1 and the description is expected to carry exactly one
     * stopped log end offset, equal to 10.
     *
     * @param z forwarded unchanged as the last argument of {@code unlinkTopic}
     */
    private void testLastFetchedOffsetStoppedMirrorTopicDescription(boolean z) {
        numPartitions_$eq(1);
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        Properties destLinkProps = destLinkProps(destLinkProps$default$1());
        // Only set the prefix property when a prefix is configured for this run.
        if (new StringOps(Predef$.MODULE$.augmentString(clusterLinkPrefix())).nonEmpty()) {
            destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.ClusterLinkPrefixProp(), clusterLinkPrefix());
        }
        createClusterLink(linkName(), destLinkProps, createClusterLink$default$3(), createClusterLink$default$4());
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.linkTopic(topic(), replicationFactor(), linkName(), destCluster.linkTopic$default$4(), clusterLinkPrefix());
        // Value handed to the wait predicate below: 0 for KRaft runs, 1 otherwise.
        int i = destCluster().isKRaftTest() ? 0 : 1;
        // Inlined polling loop: repeat until the predicate holds, failing once more than
        // waitUntilTrue$default$3 milliseconds have elapsed since the loop started.
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long waitUntilTrue$default$3 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$4 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$ == null) {
            throw null;
        }
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$testLastFetchedOffsetStoppedMirrorTopicDescription$1(this, i)) {
            if (System.currentTimeMillis() > currentTimeMillis + waitUntilTrue$default$3) {
                Assertions.fail($anonfun$testLastFetchedOffsetStoppedMirrorTopicDescription$2());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$3), waitUntilTrue$default$4));
        }
        produceToSourceCluster(10);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());

        // Replica status while the mirror is still active.
        ClusterLinkTestHarness destCluster2 = destCluster();
        ReplicaStatus replicaStatus = (ReplicaStatus) ((IterableLike) destCluster2.replicaStatus(new StringBuilder(0).append(clusterLinkPrefix()).append(topic()).toString(), 0, destCluster2.replicaStatus$default$3()).filter(replicaStatus2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$testLastFetchedOffsetStoppedMirrorTopicDescription$3(replicaStatus2));
        })).head();
        Assertions.assertTrue(replicaStatus.mirrorInfo().isPresent());
        ReplicaStatus.MirrorInfo mirrorInfo = (ReplicaStatus.MirrorInfo) replicaStatus.mirrorInfo().get();
        Assertions.assertEquals(ReplicaStatus.MirrorInfo.State.ACTIVE, mirrorInfo.state());
        Assertions.assertEquals(10, mirrorInfo.lastFetchSourceHighWatermark());

        // Stop mirroring and wait for the STOPPED state.
        ClusterLinkTestHarness destCluster3 = destCluster();
        destCluster3.unlinkTopic(new StringBuilder(0).append(clusterLinkPrefix()).append(topic()).toString(), linkName(), destCluster3.unlinkTopic$default$3(), z);
        waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.STOPPED, new StringBuilder(0).append(clusterLinkPrefix()).append(topic()).toString());

        // Replica status after the stop.
        ClusterLinkTestHarness destCluster4 = destCluster();
        ReplicaStatus replicaStatus3 = (ReplicaStatus) ((IterableLike) destCluster4.replicaStatus(new StringBuilder(0).append(clusterLinkPrefix()).append(topic()).toString(), 0, destCluster4.replicaStatus$default$3()).filter(replicaStatus4 -> {
            return BoxesRunTime.boxToBoolean($anonfun$testLastFetchedOffsetStoppedMirrorTopicDescription$4(replicaStatus4));
        })).head();
        Assertions.assertTrue(replicaStatus3.mirrorInfo().isPresent());
        ReplicaStatus.MirrorInfo mirrorInfo2 = (ReplicaStatus.MirrorInfo) replicaStatus3.mirrorInfo().get();
        Assertions.assertEquals(ReplicaStatus.MirrorInfo.State.STOPPED, mirrorInfo2.state());
        Assertions.assertEquals(-1L, mirrorInfo2.lastFetchSourceHighWatermark());

        // Mirror topic description after the stop.
        MirrorTopicDescription describeMirrorTopic = destCluster().describeMirrorTopic(new StringBuilder(0).append(clusterLinkPrefix()).append(topic()).toString());
        // Expected value first, so a failure message labels expected/actual correctly.
        Assertions.assertEquals(MirrorTopicDescription.State.STOPPED, describeMirrorTopic.state());
        Assertions.assertEquals(1, describeMirrorTopic.stoppedLogEndOffsets().size());
        Assertions.assertEquals(10, Predef$.MODULE$.Long2long((Long) describeMirrorTopic.stoppedLogEndOffsets().get(0)));
    }

    /**
     * Accessor that, judging by its {@code $default$1} name, supplies the default for the first
     * parameter of {@code testLastFetchedOffsetStoppedMirrorTopicDescription}.
     *
     * @return always {@code true}
     */
    private boolean testLastFetchedOffsetStoppedMirrorTopicDescription$default$1() {
        final boolean defaultFlag = true;
        return defaultFlag;
    }

    /**
     * Walks an auto-mirrored topic through several delete scenarios on the destination cluster:
     * a delete that is expected to throw {@link TopicDeletionDisabledException} while the mirror
     * is listed, a delete after unlinking (followed by waiting for auto-mirror creation again),
     * a delete after the source topic is removed and the mirror reaches FAILED, and finally a
     * delete after the link's filters are altered to exclude the topic, after which the topic
     * must no longer be listed as a mirror.
     *
     * @param str value supplied by {@code @ValueSource} ("zk" or "kraft"); not read in this body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testDeleteAutoMirroredTopics(String str) {
        autoMirrorTopic(syncPeriod());

        // Deleting the mirror topic at this point must be refused, and it must still be listed.
        Assertions.assertThrows(TopicDeletionDisabledException.class, () -> this.destCluster().deleteTopic(this.clusterLinkPrefix() + this.topic(), false));
        ClusterLinkTestHarness listingBefore = destCluster();
        boolean listedBefore = listingBefore.listMirrorTopics(listingBefore.listMirrorTopics$default$1()).contains(clusterLinkPrefix() + topic());
        Assertions.assertTrue(listedBefore);

        // Unlink, wait for STOPPED, delete, then wait for the mirror to be auto-created again.
        ClusterLinkTestHarness unlinkTarget = destCluster();
        unlinkTarget.unlinkTopic(clusterLinkPrefix() + topic(), linkName(), unlinkTarget.unlinkTopic$default$3(), unlinkTarget.unlinkTopic$default$4());
        waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.STOPPED, clusterLinkPrefix() + topic());
        destCluster().deleteTopic(clusterLinkPrefix() + topic(), false);
        waitForAutoMirrorCreation(clusterLinkPrefix() + topic());
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());

        // Remove the source topic, wait for FAILED, and delete the destination topic.
        sourceCluster().deleteTopic(topic(), true);
        waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.FAILED, clusterLinkPrefix() + topic());
        destCluster().deleteTopic(clusterLinkPrefix() + topic(), true);

        // Recreate the source topic with data and wait for the mirror to be auto-created.
        ClusterLinkTestHarness recreatedSource = sourceCluster();
        recreatedSource.createTopic(topic(), numPartitions(), replicationFactor(), recreatedSource.createTopic$default$4(), recreatedSource.createTopic$default$5(), recreatedSource.createTopic$default$6());
        produceToSourceCluster(20);
        waitForAutoMirrorCreation(clusterLinkPrefix() + topic());

        // New link config: auto-mirroring on, include "*" but exclude topic(), and the sync period
        // as metadata.max.age.ms.
        scala.collection.Map<String, String> excludeTopicConfig = (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.AutoMirroringEnableProp()), "true"),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.TopicFiltersProp()), new StringOps(Predef$.MODULE$.augmentString(
                "|{\n          |\"topicFilters\": [\n          |  {\n          |     \"name\": \"*\",\n          |     \"patternType\": \"literal\",\n          |     \"filterType\": \"include\"\n          |  },\n          |  {\n          |     \"name\": \""
                    + topic()
                    + "\",\n          |     \"patternType\": \"literal\",\n          |     \"filterType\": \"exclude\"\n          |  }\n          |]}\n          |")).stripMargin()),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("metadata.max.age.ms"), String.valueOf(syncPeriod()))
        }));
        ClusterLinkTestHarness alterTarget = destCluster();
        alterTarget.alterClusterLink(linkName(), excludeTopicConfig, alterTarget.alterClusterLink$default$3());

        // After the filter change the destination topic is deleted and must no longer be listed.
        destCluster().deleteTopic(clusterLinkPrefix() + topic(), true);
        ClusterLinkTestHarness listingAfter = destCluster();
        boolean listedAfter = listingAfter.listMirrorTopics(listingAfter.listMirrorTopics$default$1()).contains(clusterLinkPrefix() + topic());
        Assertions.assertFalse(listedAfter);

        ClusterLinkTestHarness cleanup = destCluster();
        cleanup.deleteClusterLink(linkName(), cleanup.deleteClusterLink$default$2(), cleanup.deleteClusterLink$default$3());
    }

    /**
     * Creates a link whose consumer-offset-sync, topic-config-sync and metadata-max-age intervals
     * are all set to 300000, then lowers each of them in turn to {@code syncPeriod()} via
     * {@code alterClusterLink} and, after each change, waits for or verifies the corresponding
     * effect (topic config change, offset migration, auto-mirror creation).
     *
     * @param str value supplied by {@code @ValueSource} ("zk" or "kraft"); not read in this body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testIntervalChangeForPeriodicTasks(String str) {
        Properties linkProps = destLinkProps(destLinkProps$default$1());
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncEnableProp(), "true");
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetGroupFiltersProp(), consumerGroupFilter(consumerGroup()));
        // All three intervals start at the same large value.
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncMsProp(), String.valueOf(300000));
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.TopicConfigSyncMsProp(), String.valueOf(300000));
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.AutoMirroringEnableProp(), "true");
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.TopicFiltersProp(), includeAllTopicsFilter());
        linkProps.setProperty("metadata.max.age.ms", String.valueOf(300000));
        boolean prefixConfigured = new StringOps(Predef$.MODULE$.augmentString(clusterLinkPrefix())).nonEmpty();
        if (prefixConfigured) {
            linkProps.setProperty(ClusterLinkConfig$.MODULE$.ClusterLinkPrefixProp(), clusterLinkPrefix());
        }
        createClusterLink(linkName(), linkProps, createClusterLink$default$3(), createClusterLink$default$4());

        // 1) Lower the topic-config sync interval, change a source topic config, and wait.
        alterClusterLink(linkName(), (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.TopicConfigSyncMsProp()), String.valueOf(syncPeriod()))})));
        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(topic(), numPartitions(), 2, source.createTopic$default$4(), source.createTopic$default$5(), source.createTopic$default$6());
        ClusterLinkTestHarness linkTarget = destCluster();
        linkTarget.linkTopic(topic(), (short) 2, linkName(), linkTarget.linkTopic$default$4(), clusterLinkPrefix());
        sourceCluster().alterTopic(topic(), (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("delete.retention.ms"), "80000000")})));
        // Inlined polling loop: repeat until the predicate holds, failing once more than
        // maxWaitMs milliseconds have elapsed since the loop started.
        TestUtils$ testUtilsModule = TestUtils$.MODULE$;
        long maxWaitMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long pauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtilsModule == null) {
            throw null;
        }
        long startedAtMs = System.currentTimeMillis();
        while (!$anonfun$testIntervalChangeForPeriodicTasks$1(this)) {
            if (System.currentTimeMillis() > startedAtMs + maxWaitMs) {
                Assertions.fail($anonfun$testIntervalChangeForPeriodicTasks$2());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(maxWaitMs), pauseMs));
        }
        verifyTopicConfigChangeMetrics();

        // 2) Commit an offset on the source, lower the offset sync interval, verify migration.
        commitOffsets(sourceCluster(), topic(), 0, offsetToCommit(), consumerGroup());
        alterClusterLink(linkName(), (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncMsProp()), String.valueOf(syncPeriod()))})));
        verifyOffsetMigration(clusterLinkPrefix() + topic(), 0, offsetToCommit(), consumerGroup());

        // 3) Remove the mirror, lower metadata.max.age.ms, and wait for auto-mirror creation.
        ClusterLinkTestHarness firstUnlink = destCluster();
        firstUnlink.unlinkTopic(clusterLinkPrefix() + topic(), linkName(), firstUnlink.unlinkTopic$default$3(), firstUnlink.unlinkTopic$default$4());
        waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.STOPPED, clusterLinkPrefix() + topic());
        destCluster().deleteTopic(clusterLinkPrefix() + topic(), true);
        alterClusterLink(linkName(), (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("metadata.max.age.ms"), String.valueOf(syncPeriod()))})));
        waitForAutoMirrorCreation(clusterLinkPrefix() + topic());

        // Cleanup: switch auto-mirroring off, unlink, and delete the link.
        alterClusterLink(linkName(), (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.AutoMirroringEnableProp()), "false")})));
        ClusterLinkTestHarness secondUnlink = destCluster();
        secondUnlink.unlinkTopic(clusterLinkPrefix() + topic(), linkName(), secondUnlink.unlinkTopic$default$3(), secondUnlink.unlinkTopic$default$4());
        ClusterLinkTestHarness cleanup = destCluster();
        cleanup.deleteClusterLink(linkName(), cleanup.deleteClusterLink$default$2(), cleanup.deleteClusterLink$default$3());
    }

    /**
     * Creates a link whose {@code sasl.jaas.config} is a {@code ${file:...}} reference to a temp
     * file resolved through {@link FileConfigProvider}, checks that mirroring works and that
     * {@code verifySaslJaasConfigEncrypted} passes before and after an unrelated config change,
     * and finally checks that a {@code sasl.jaas.config} value of
     * {@code someprovider:link.props:sasl.jaas.config} is rejected on link creation.
     * Skipped (via assumption) when {@code useSourceInitiatedLink()} is true.
     *
     * @param str value supplied by {@code @ValueSource} ("zk" or "kraft"); not read in this body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testExternalizedPasswordConfig(String str) {
        Assumptions.assumeFalse(useSourceInitiatedLink());
        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(topic(), numPartitions(), replicationFactor(), source.createTopic$default$4(), source.createTopic$default$5(), source.createTopic$default$6());
        produceToSourceCluster(20);

        Properties linkProps = destLinkProps(destLinkProps$default$1());
        boolean prefixConfigured = new StringOps(Predef$.MODULE$.augmentString(clusterLinkPrefix())).nonEmpty();
        if (prefixConfigured) {
            linkProps.setProperty(ClusterLinkConfig$.MODULE$.ClusterLinkPrefixProp(), clusterLinkPrefix());
        }

        // Write the current JAAS config to a temp file and point the property at that file.
        String jaasFilePath = TestUtils.tempFile("sasl.jaas.config=" + linkProps.getProperty("sasl.jaas.config")).getAbsolutePath();
        String externalizedJaas = new StringOps("${file:%s:sasl.jaas.config}").format(Predef$.MODULE$.genericWrapArray(new Object[]{jaasFilePath}));
        linkProps.put("sasl.jaas.config", externalizedJaas);
        linkProps.setProperty("config.providers", "file");
        linkProps.setProperty("config.providers.file.class", FileConfigProvider.class.getName());

        UUID linkId = createClusterLink(linkName(), linkProps, createClusterLink$default$3(), createClusterLink$default$4());
        ClusterLinkTestHarness linkTarget = destCluster();
        linkTarget.linkTopic(topic(), replicationFactor(), linkName(), linkTarget.linkTopic$default$4(), clusterLinkPrefix());
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());

        // The encryption check runs both before and after altering an unrelated link config.
        verifySaslJaasConfigEncrypted(linkId);
        alterClusterLink(linkName(), (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.RetryTimeoutMsProp()), "1000")})));
        verifySaslJaasConfigEncrypted(linkId);
        produceToSourceCluster(20);
        verifyMirror(topic(), verifyMirror$default$2(), verifyMirror$default$3());

        // This value must be rejected for both values of createClusterLink's last argument.
        linkProps.setProperty("sasl.jaas.config", "someprovider:link.props:sasl.jaas.config");
        Assertions.assertThrows(InvalidConfigurationException.class, () -> this.createClusterLink("invalidLink1", linkProps, this.createClusterLink$default$3(), true));
        Assertions.assertThrows(InvalidConfigurationException.class, () -> this.createClusterLink("invalidLink2", linkProps, this.createClusterLink$default$3(), false));
    }

    /**
     * Mirrors a single-partition topic over a link configured with short timeouts and
     * availability-check settings, then calls {@code updateCredentials} on one of the clusters and
     * waits for the mirror to report an authentication-failed or source-unavailable state. Finally
     * unlinks the topic and waits for STOPPED in both the replica status and the mirror topic
     * description. Skipped (via assumption) when a cluster link prefix is configured.
     *
     * @param str value supplied by {@code @ValueSource} ("zk" or "kraft"); not read in this body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testMirrorFailoverWhenSourceIsUnavailable(String str) {
        Assumptions.assumeFalse(new StringOps(Predef$.MODULE$.augmentString(clusterLinkPrefix())).nonEmpty());
        numPartitions_$eq(1);
        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(topic(), numPartitions(), replicationFactor(), source.createTopic$default$4(), source.createTopic$default$5(), source.createTopic$default$6());

        // Link properties with short timeouts and availability-check settings.
        Properties linkProps = destLinkProps(destLinkProps$default$1());
        linkProps.setProperty("metadata.max.age.ms", "100");
        linkProps.setProperty("request.timeout.ms", "1000");
        linkProps.setProperty("default.api.timeout.ms", "1000");
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.ReverseConnectionSetupTimeoutMsProp(), "1000");
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.RetryTimeoutMsProp(), "1000");
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.AvailabilityCheckMsProp(), "100");
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.AvailabilityCheckConsecutiveFailureThresholdProp(), "2");
        boolean prefixConfigured = new StringOps(Predef$.MODULE$.augmentString(clusterLinkPrefix())).nonEmpty();
        if (prefixConfigured) {
            linkProps.setProperty(ClusterLinkConfig$.MODULE$.ClusterLinkPrefixProp(), clusterLinkPrefix());
        }
        createClusterLink(linkName(), linkProps, createClusterLink$default$3(), createClusterLink$default$4());
        ClusterLinkTestHarness linkTarget = destCluster();
        linkTarget.linkTopic(topic(), replicationFactor(), linkName(), linkTarget.linkTopic$default$4(), clusterLinkPrefix());

        // Value handed to the wait predicate below: 0 for KRaft runs, 1 otherwise.
        int predicateArg = destCluster().isKRaftTest() ? 0 : 1;
        // Inlined polling loop: repeat until the predicate holds, failing once more than
        // maxWaitMs milliseconds have elapsed since the loop started.
        TestUtils$ testUtilsModule = TestUtils$.MODULE$;
        long maxWaitMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long pauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtilsModule == null) {
            throw null;
        }
        long startedAtMs = System.currentTimeMillis();
        while (!$anonfun$testMirrorFailoverWhenSourceIsUnavailable$1(this, predicateArg)) {
            if (System.currentTimeMillis() > startedAtMs + maxWaitMs) {
                Assertions.fail($anonfun$testMirrorFailoverWhenSourceIsUnavailable$2());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(maxWaitMs), pauseMs));
        }

        // Healthy baseline: data mirrored and both views report ACTIVE.
        produceToSourceCluster(10);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.ACTIVE, clusterLinkPrefix() + topic());
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE);

        // Update credentials on the destination for source-initiated links, else on the source.
        ClusterLinkTestHarness credentialTarget;
        if (useSourceInitiatedLink()) {
            credentialTarget = destCluster();
        } else {
            credentialTarget = sourceCluster();
        }
        updateCredentials(credentialTarget);
        waitUntilOneOfMirrorStates((scala.collection.immutable.Set) Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new ReplicaStatus.MirrorInfo.State[]{ReplicaStatus.MirrorInfo.State.SOURCE_AUTHENTICATION_FAILED, ReplicaStatus.MirrorInfo.State.SOURCE_UNAVAILABLE})));
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.SOURCE_UNAVAILABLE);
        verifyDescribeLinksResult(ClusterLinkDescription.LinkState.UNAVAILABLE, ClusterLinkDescription.LinkState.ACTIVE);

        // Unlink and wait for STOPPED in both views.
        ClusterLinkTestHarness unlinkTarget = destCluster();
        unlinkTarget.unlinkTopic(clusterLinkPrefix() + topic(), linkName(), unlinkTarget.unlinkTopic$default$3(), false);
        waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.STOPPED, clusterLinkPrefix() + topic());
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED);
    }

    /**
     * Kills the destination brokers, sets three {@code confluent.cluster.link.metadata.topic.*}
     * entries in the destination server config, restarts the brokers, refreshes the bootstrap
     * servers, and then removes the same three entries again. Skipped (via assumption) when a
     * cluster link prefix is configured.
     *
     * @param str value supplied by {@code @ValueSource} ("zk" or "kraft"); not read in this body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testClusterLinkMetadataTopicCreation(String str) {
        Assumptions.assumeFalse(new StringOps(Predef$.MODULE$.augmentString(clusterLinkPrefix())).nonEmpty());

        // Key/value pairs applied before the restart and removed afterwards, in this order.
        final String[][] metadataTopicOverrides = {
            {"confluent.cluster.link.metadata.topic.enable", "true"},
            {"confluent.cluster.link.metadata.topic.partitions", "2"},
            {"confluent.cluster.link.metadata.topic.replication.factor", "1"},
        };

        destCluster().killAllBrokers();
        for (String[] override : metadataTopicOverrides) {
            destCluster().serverConfig().setProperty(override[0], override[1]);
        }
        destCluster().restartDeadBrokers(true);
        destCluster().updateBootstrapServers();
        for (String[] override : metadataTopicOverrides) {
            destCluster().serverConfig().remove(override[0]);
        }
    }

    /**
     * Checks what {@code describeClusterLinks} reports for a healthy link and again after the
     * link's stored connection-mode config has been overwritten with the value "invalid".
     *
     * <p>After the overwrite and the wait for a corrupt-configs failure, the destination cluster
     * is expected to describe exactly one link with this test's link name, state FAILED, and
     * UNKNOWN link mode and connection mode.
     *
     * @param str value supplied by {@code @ValueSource} ("zk" only); not read in this body
     */
    @ValueSource(strings = {"zk"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testDescribeClusterLink(String str) {
        UUID createClusterLink = createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4());
        verifyDescribeLinksResult(ClusterLinkDescription.LinkState.ACTIVE, ClusterLinkDescription.LinkState.ACTIVE);
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.linkTopic(topic(), replicationFactor(), linkName(), destCluster.linkTopic$default$4(), destCluster.linkTopic$default$5());

        // Overwrite the link's connection-mode config with an invalid value.
        destCluster().updateZkLinkConfig(createClusterLink, ClusterLinkConfig$.MODULE$.ConnectionModeProp(), "invalid");
        ClusterLinkTestHarness destCluster2 = destCluster();
        ConfluentAdmin createConfluentAdminClient = destCluster2.createConfluentAdminClient(destCluster2.createConfluentAdminClient$default$1());
        try {
            waitForFailure(createConfluentAdminClient, FailureType$CorruptConfigs$.MODULE$);
        } finally {
            // Close the admin client even when the wait fails, so it is not leaked.
            createConfluentAdminClient.close();
        }

        ClusterLinkTestHarness destCluster3 = destCluster();
        Seq<ClusterLinkDescription> describeClusterLinks = destCluster3.describeClusterLinks(destCluster3.describeClusterLinks$default$1());
        Assertions.assertEquals(1, describeClusterLinks.size());
        Assertions.assertEquals(linkName(), ((ClusterLinkDescription) describeClusterLinks.head()).linkName());
        Assertions.assertEquals(ClusterLinkDescription.LinkState.FAILED, ((ClusterLinkDescription) describeClusterLinks.head()).linkState());
        Assertions.assertEquals(ClusterLinkDescription.LinkMode.UNKNOWN, ((ClusterLinkDescription) describeClusterLinks.head()).linkMode());
        Assertions.assertEquals(ClusterLinkDescription.ConnectionMode.UNKNOWN, ((ClusterLinkDescription) describeClusterLinks.head()).connectionMode());
    }

    /**
     * Creates a link with consumer offset sync enabled, overwrites its stored consumer-offset
     * group-filters config with the string "{", and then checks that the link can still be
     * altered with a new group filter, can mirror a topic, and can be unlinked and deleted.
     *
     * @param str value supplied by {@code @ValueSource} ("zk" only); not read in this body
     */
    @ValueSource(strings = {"zk"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testAlterClusterLinkWithInvalidConfigs(String str) {
        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(topic(), numPartitions(), replicationFactor(), source.createTopic$default$4(), source.createTopic$default$5(), source.createTopic$default$6());

        Properties linkProps = destLinkProps((scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncEnableProp()), "true")})));
        boolean prefixConfigured = new StringOps(Predef$.MODULE$.augmentString(clusterLinkPrefix())).nonEmpty();
        if (prefixConfigured) {
            linkProps.setProperty(ClusterLinkConfig$.MODULE$.ClusterLinkPrefixProp(), clusterLinkPrefix());
        }

        // Create the link, then overwrite its stored group-filters config with "{".
        ClusterLinkTestHarness corruptTarget = destCluster();
        UUID linkId = createClusterLink(linkName(), linkProps, createClusterLink$default$3(), true);
        corruptTarget.updateZkLinkConfig(linkId, ClusterLinkConfig$.MODULE$.ConsumerOffsetGroupFiltersProp(), "{");

        // Alter the link with a group filter derived from includeAllTopicsFilter().
        ClusterLinkTestHarness alterTarget = destCluster();
        alterTarget.alterClusterLink(linkName(), (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ConsumerOffsetGroupFiltersProp()), includeAllTopicsFilter().replace("topic", "group"))})), alterTarget.alterClusterLink$default$3());

        // The link must still mirror data afterwards.
        ClusterLinkTestHarness linkTarget = destCluster();
        linkTarget.linkTopic(topic(), replicationFactor(), linkName(), linkTarget.linkTopic$default$4(), clusterLinkPrefix());
        produceToSourceCluster(100);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());

        // Cleanup: unlink the mirror and delete the link.
        ClusterLinkTestHarness unlinkTarget = destCluster();
        unlinkTarget.unlinkTopic(clusterLinkPrefix() + topic(), linkName(), unlinkTarget.unlinkTopic$default$3(), unlinkTarget.unlinkTopic$default$4());
        ClusterLinkTestHarness cleanup = destCluster();
        cleanup.deleteClusterLink(linkName(), cleanup.deleteClusterLink$default$2(), cleanup.deleteClusterLink$default$3());
    }

    /**
     * Asserts the result of describing cluster links on both the source and destination clusters.
     *
     * <p>When {@code useSourceInitiatedLink()} is true the source cluster must describe exactly one
     * link (SOURCE mode, OUTBOUND connection) and the destination link must be INBOUND; otherwise
     * the source cluster must describe no links and the destination link must be OUTBOUND.
     *
     * @param linkState expected state of the link described by the destination cluster
     * @param linkState2 expected state of the link described by the source cluster
     */
    public void verifyDescribeLinksResult(ClusterLinkDescription.LinkState linkState, ClusterLinkDescription.LinkState linkState2) {
        ClusterLinkTestHarness source = sourceCluster();
        Seq<ClusterLinkDescription> sourceLinks = source.describeClusterLinks(source.describeClusterLinks$default$1());
        if (!useSourceInitiatedLink()) {
            Assertions.assertTrue(sourceLinks.isEmpty());
        } else {
            Assertions.assertEquals(1, sourceLinks.size());
            ClusterLinkDescription sourceLink = (ClusterLinkDescription) sourceLinks.head();
            Assertions.assertEquals(linkName(), sourceLink.linkName());
            Assertions.assertEquals(linkState2, sourceLink.linkState());
            Assertions.assertEquals(ClusterLinkDescription.LinkMode.SOURCE, sourceLink.linkMode());
            Assertions.assertEquals(ClusterLinkDescription.ConnectionMode.OUTBOUND, sourceLink.connectionMode());
        }

        ClusterLinkTestHarness dest = destCluster();
        Seq<ClusterLinkDescription> destLinks = dest.describeClusterLinks(dest.describeClusterLinks$default$1());
        Assertions.assertEquals(1, destLinks.size());
        ClusterLinkDescription destLink = (ClusterLinkDescription) destLinks.head();
        Assertions.assertEquals(linkState, destLink.linkState());
        Assertions.assertEquals(ClusterLinkDescription.LinkMode.DESTINATION, destLink.linkMode());
        ClusterLinkDescription.ConnectionMode expectedConnectionMode = useSourceInitiatedLink()
            ? ClusterLinkDescription.ConnectionMode.INBOUND
            : ClusterLinkDescription.ConnectionMode.OUTBOUND;
        Assertions.assertEquals(expectedConnectionMode, destLink.connectionMode());
    }

    /**
     * Asserts that the "sasl.jaas.config" property held for the given link by the first broker of
     * the connecting cluster is present and does not contain the substring "secret-".
     *
     * @param uuid id of the cluster link whose stored properties are inspected
     */
    private void verifySaslJaasConfigEncrypted(UUID uuid) {
        KafkaBroker firstBroker = (KafkaBroker) connectingCluster().brokers().head();
        ClusterLinkMetadataManager metadataManager =
            (ClusterLinkMetadataManager) firstBroker.clusterLinkManager().clusterLinkMetadataManager().get();
        String jaasConfig = metadataManager.getClusterLinkConfigProps(uuid).getProperty("sasl.jaas.config");
        Assertions.assertNotNull(jaasConfig);
        Assertions.assertFalse(jaasConfig.contains("secret-"), "Password not encrypted: " + jaasConfig);
    }

    /**
     * Tests whether a listing's link name equals the test's link name, treating two nulls as equal.
     *
     * @param clusterLinkIntegrationTest test instance supplying the expected link name
     * @param clusterLinkListing listing whose link name is checked
     * @return {@code true} when both names are null or are equal
     */
    public static final /* synthetic */ boolean $anonfun$testCreateMirrorTopic$1(ClusterLinkIntegrationTest clusterLinkIntegrationTest, ClusterLinkListing clusterLinkListing) {
        final String actualName = clusterLinkListing.linkName();
        final String expectedName = clusterLinkIntegrationTest.linkName();
        if (actualName == null) {
            return expectedName == null;
        }
        return actualName.equals(expectedName);
    }

    /**
     * Tests whether a description's link name equals the test's link name, treating two nulls as equal.
     *
     * @param clusterLinkIntegrationTest test instance supplying the expected link name
     * @param clusterLinkDescription description whose link name is checked
     * @return {@code true} when both names are null or are equal
     */
    public static final /* synthetic */ boolean $anonfun$testCreateMirrorTopic$2(ClusterLinkIntegrationTest clusterLinkIntegrationTest, ClusterLinkDescription clusterLinkDescription) {
        final String actualName = clusterLinkDescription.linkName();
        final String expectedName = clusterLinkIntegrationTest.linkName();
        if (actualName == null) {
            return expectedName == null;
        }
        return actualName.equals(expectedName);
    }

    /**
     * Tests whether a (topic name, topic state) pair refers to the prefixed mirror topic and carries
     * the expected source topic id.
     *
     * @param clusterLinkIntegrationTest test instance supplying the link prefix and topic name
     * @param uuid expected source topic id (may be null, in which case the state's id must be null)
     * @param tuple2 pair whose first element is compared to the prefixed topic name and whose second
     *     element is a {@code ClusterLinkTopicState}
     * @return {@code true} only when both the name and the source topic id match
     */
    public static final /* synthetic */ boolean $anonfun$testCreateMirrorTopicAndVerifySourceTopicId$2(ClusterLinkIntegrationTest clusterLinkIntegrationTest, Uuid uuid, Tuple2 tuple2) {
        Object candidateName = tuple2._1();
        String mirrorTopic = clusterLinkIntegrationTest.clusterLinkPrefix() + clusterLinkIntegrationTest.topic();
        boolean nameMatches = candidateName == null ? mirrorTopic == null : candidateName.equals(mirrorTopic);
        if (!nameMatches) {
            return false;
        }
        Uuid actualSourceId = ((ClusterLinkTopicState) tuple2._2()).sourceTopicId();
        if (actualSourceId == null) {
            return uuid == null;
        }
        return actualSourceId.equals(uuid);
    }

    /**
     * Tests whether the metadata manager's topic state for the prefixed mirror topic contains an
     * entry with the expected source topic id.
     *
     * @param clusterLinkIntegrationTest test instance supplying the link prefix and topic name
     * @param uuid expected source topic id
     * @param clusterLinkMetadataManager metadata manager that is queried
     * @return {@code true} when at least one returned entry matches
     */
    public static final /* synthetic */ boolean $anonfun$testCreateMirrorTopicAndVerifySourceTopicId$1(ClusterLinkIntegrationTest clusterLinkIntegrationTest, Uuid uuid, ClusterLinkMetadataManager clusterLinkMetadataManager) {
        String mirrorTopic = clusterLinkIntegrationTest.clusterLinkPrefix() + clusterLinkIntegrationTest.topic();
        return clusterLinkMetadataManager
            .clusterLinkTopicState(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new String[]{mirrorTopic})))
            .exists(tuple2 -> BoxesRunTime.boxToBoolean($anonfun$testCreateMirrorTopicAndVerifySourceTopicId$2(clusterLinkIntegrationTest, uuid, tuple2)));
    }

    /**
     * Tests whether no online partition on any destination broker reports {@code linkedUpdatesOnly()}.
     *
     * <p>For every topic partition of the test, the online partitions across the destination
     * brokers are collected and filtered to those with {@code linkedUpdatesOnly()} set.
     *
     * @param clusterLinkIntegrationTest test instance supplying the partitions and destination cluster
     * @return {@code true} when the filtered collection is empty
     */
    public static final /* synthetic */ boolean $anonfun$testTransactionsWithMirrorTopic$4(ClusterLinkIntegrationTest clusterLinkIntegrationTest) {
        SeqLike linkedOnlyPartitions = (SeqLike) clusterLinkIntegrationTest.partitions(clusterLinkIntegrationTest.partitions$default$1()).flatMap(
            topicPartition -> (Buffer) ((TraversableLike) clusterLinkIntegrationTest.destCluster().brokers().flatMap(
                kafkaBroker -> Option$.MODULE$.option2Iterable(kafkaBroker.replicaManager().onlinePartition(topicPartition)),
                Buffer$.MODULE$.canBuildFrom())).filter(
                    partition -> BoxesRunTime.boxToBoolean(partition.linkedUpdatesOnly())),
            Seq$.MODULE$.canBuildFrom());
        return linkedOnlyPartitions.isEmpty();
    }

    /**
     * Supplies the fixed message "Mirror not stopped".
     *
     * @return the message text
     */
    public static final /* synthetic */ String $anonfun$testTransactionsWithMirrorTopic$8() {
        final String message = "Mirror not stopped";
        return message;
    }

    /**
     * Deletes the test topic on the destination cluster and links it again, using the harness
     * defaults for the last two {@code linkTopic} arguments.
     */
    private final void restartMirrorTopic$1() {
        // Remove the existing topic first (second argument passed as true).
        destCluster().deleteTopic(topic(), true);

        ClusterLinkTestHarness dest = destCluster();
        dest.linkTopic(
            topic(),
            replicationFactor(),
            linkName(),
            dest.linkTopic$default$4(),
            dest.linkTopic$default$5());
    }

    /**
     * Restarts dead source brokers and refreshes the source cluster's bootstrap servers.
     *
     * <p>Unless the link is source-initiated, the destination cluster's link is also altered so
     * that its "bootstrap.servers" config holds the source cluster's current bootstrap servers.
     */
    private final void restartSource$1() {
        ClusterLinkTestHarness restarted = sourceCluster();
        restarted.restartDeadBrokers(restarted.restartDeadBrokers$default$1());
        sourceCluster().updateBootstrapServers();

        if (!useSourceInitiatedLink()) {
            ClusterLinkTestHarness dest = destCluster();
            String link = linkName();
            ClusterLinkTestHarness source = sourceCluster();
            Tuple2[] bootstrapOverride = new Tuple2[]{
                Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                    Predef$.MODULE$.ArrowAssoc("bootstrap.servers"),
                    source.bootstrapServers(source.bootstrapServers$default$1()))
            };
            dest.alterClusterLink(
                link,
                (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(bootstrapOverride)),
                dest.alterClusterLink$default$3());
        }
    }

    /**
     * Tests whether a listing's link name equals the test's link name, treating two nulls as equal.
     *
     * @param clusterLinkIntegrationTest test instance supplying the expected link name
     * @param clusterLinkListing listing whose link name is checked
     * @return {@code true} when both names are null or are equal
     */
    public static final /* synthetic */ boolean $anonfun$testCreateAndDeleteAndRecreateLink$1(ClusterLinkIntegrationTest clusterLinkIntegrationTest, ClusterLinkListing clusterLinkListing) {
        final String actualName = clusterLinkListing.linkName();
        final String expectedName = clusterLinkIntegrationTest.linkName();
        if (actualName == null) {
            return expectedName == null;
        }
        return actualName.equals(expectedName);
    }

    /**
     * Tests whether a description's link name equals the test's link name, treating two nulls as equal.
     *
     * @param clusterLinkIntegrationTest test instance supplying the expected link name
     * @param clusterLinkDescription description whose link name is checked
     * @return {@code true} when both names are null or are equal
     */
    public static final /* synthetic */ boolean $anonfun$testCreateAndDeleteAndRecreateLink$2(ClusterLinkIntegrationTest clusterLinkIntegrationTest, ClusterLinkDescription clusterLinkDescription) {
        final String actualName = clusterLinkDescription.linkName();
        final String expectedName = clusterLinkIntegrationTest.linkName();
        if (actualName == null) {
            return expectedName == null;
        }
        return actualName.equals(expectedName);
    }

    /**
     * Lists the cluster links known to the given broker's cluster link manager.
     *
     * @param kafkaBroker broker whose link manager is queried
     * @return the sequence returned by {@code listClusterLinks()}
     */
    public static final /* synthetic */ Seq $anonfun$testCreateAndDeleteAndRecreateLink$3(KafkaBroker kafkaBroker) {
        Seq brokerLinks = kafkaBroker.clusterLinkManager().listClusterLinks();
        return brokerLinks;
    }

    /**
     * Tests whether the given sequence equals the empty list {@code Nil}.
     *
     * @param seq sequence to compare; a null sequence only matches a null {@code Nil} singleton
     * @return {@code true} when {@code seq.equals(Nil)}
     */
    public static final /* synthetic */ boolean $anonfun$testCreateAndDeleteAndRecreateLink$4(Seq seq) {
        final Nil$ emptyList = Nil$.MODULE$;
        if (seq == null) {
            return emptyList == null;
        }
        return seq.equals(emptyList);
    }

    /**
     * Tests whether a listing's link name equals the test's link name, treating two nulls as equal.
     *
     * @param clusterLinkIntegrationTest test instance supplying the expected link name
     * @param clusterLinkListing listing whose link name is checked
     * @return {@code true} when both names are null or are equal
     */
    public static final /* synthetic */ boolean $anonfun$testCreateAndDeleteAndRecreateLink$5(ClusterLinkIntegrationTest clusterLinkIntegrationTest, ClusterLinkListing clusterLinkListing) {
        final String actualName = clusterLinkListing.linkName();
        final String expectedName = clusterLinkIntegrationTest.linkName();
        if (actualName == null) {
            return expectedName == null;
        }
        return actualName.equals(expectedName);
    }

    /**
     * Tests whether a description's link name equals the test's link name, treating two nulls as equal.
     *
     * @param clusterLinkIntegrationTest test instance supplying the expected link name
     * @param clusterLinkDescription description whose link name is checked
     * @return {@code true} when both names are null or are equal
     */
    public static final /* synthetic */ boolean $anonfun$testCreateAndDeleteAndRecreateLink$6(ClusterLinkIntegrationTest clusterLinkIntegrationTest, ClusterLinkDescription clusterLinkDescription) {
        final String actualName = clusterLinkDescription.linkName();
        final String expectedName = clusterLinkIntegrationTest.linkName();
        if (actualName == null) {
            return expectedName == null;
        }
        return actualName.equals(expectedName);
    }

    /**
     * Tests whether the metadata manager held by the option no longer knows the given link.
     *
     * @param option option holding a {@code ClusterLinkMetadataManager}; {@code get()} is called on it
     * @param uuid id of the cluster link
     * @return {@code true} when {@code clusterLinkExists(uuid)} is false
     */
    public static final /* synthetic */ boolean $anonfun$testCreateAndDeleteAndRecreateLink$7(Option option, UUID uuid) {
        ClusterLinkMetadataManager metadataManager = (ClusterLinkMetadataManager) option.get();
        boolean linkStillExists = metadataManager.clusterLinkExists(uuid);
        return !linkStillExists;
    }

    /**
     * Builds the message "Cluster link &lt;uuid&gt; is not deleted" for the given link id.
     *
     * @param uuid id of the cluster link that is embedded in the message
     * @return the formatted message
     */
    public static final /* synthetic */ String $anonfun$testCreateAndDeleteAndRecreateLink$8(UUID uuid) {
        return "Cluster link " + uuid + " is not deleted";
    }

    /**
     * Tests whether the prefixed mirror topic on the destination cluster has
     * "delete.retention.ms" set to "80000000".
     *
     * @param clusterLinkIntegrationTest test instance supplying the destination cluster, prefix and topic
     * @return {@code true} when the described config value equals "80000000"
     */
    public static final /* synthetic */ boolean $anonfun$testTopicConfigSync$1(ClusterLinkIntegrationTest clusterLinkIntegrationTest) {
        ClusterLinkTestHarness dest = clusterLinkIntegrationTest.destCluster();
        String mirrorTopic = clusterLinkIntegrationTest.clusterLinkPrefix() + clusterLinkIntegrationTest.topic();
        String retentionValue = dest.describeTopicConfig(mirrorTopic).get("delete.retention.ms").value();
        return retentionValue.equals("80000000");
    }

    /**
     * Supplies the fixed message "Topic configs did not get propagated".
     *
     * @return the message text
     */
    public static final /* synthetic */ String $anonfun$testTopicConfigSync$2() {
        final String message = "Topic configs did not get propagated";
        return message;
    }

    /**
     * Tests whether the destination cluster describes the test topic's mirror state as STOPPED.
     *
     * @param clusterLinkIntegrationTest test instance supplying the destination cluster and topic
     * @return {@code true} when the described state equals {@code MirrorTopicDescription.State.STOPPED}
     */
    public static final /* synthetic */ boolean $anonfun$testListDescribeMirror$1(ClusterLinkIntegrationTest clusterLinkIntegrationTest) {
        final MirrorTopicDescription.State currentState =
            clusterLinkIntegrationTest.destCluster().describeMirrorTopic(clusterLinkIntegrationTest.topic()).state();
        final MirrorTopicDescription.State stopped = MirrorTopicDescription.State.STOPPED;
        if (currentState == null) {
            return stopped == null;
        }
        return currentState.equals(stopped);
    }

    /**
     * Supplies the fixed message "Mirror took too long to stop.".
     *
     * @return the message text
     */
    public static final /* synthetic */ String $anonfun$testListDescribeMirror$2() {
        final String message = "Mirror took too long to stop.";
        return message;
    }

    /**
     * Tests whether {@code listMirrorTopics(true)} on the destination cluster returns nothing.
     *
     * @param clusterLinkIntegrationTest test instance supplying the destination cluster
     * @return {@code true} when the listing is empty
     */
    public static final /* synthetic */ boolean $anonfun$testListDescribeMirror$4(ClusterLinkIntegrationTest clusterLinkIntegrationTest) {
        ClusterLinkTestHarness dest = clusterLinkIntegrationTest.destCluster();
        boolean noMirrorTopics = dest.listMirrorTopics(true).isEmpty();
        return noMirrorTopics;
    }

    /**
     * Supplies the fixed message "Mirror state not removed".
     *
     * @return the message text
     */
    public static final /* synthetic */ String $anonfun$testListDescribeMirror$5() {
        final String message = "Mirror state not removed";
        return message;
    }

    /**
     * Sets the consumer byte-rate quota override for the link's user and waits up to 15 seconds
     * for the alteration to complete.
     *
     * <p>Decompiler note: the access modifier of this method was changed from private.
     *
     * @param j quota value, converted to a {@code Double} for the alteration
     * @param confluentAdmin admin client used to alter the client quota
     */
    /* JADX INFO: Access modifiers changed from: private */
    public final void setQuota$1(long j, ConfluentAdmin confluentAdmin) {
        // Quota entity keyed by "user" -> the user name derived from the link name.
        Tuple2[] userEntry = new Tuple2[]{
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("user"), linkUserName(linkName()))
        };
        Map entityEntries = (Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(
            scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(userEntry))).asJava();
        ClientQuotaEntity quotaEntity = new ClientQuotaEntity(entityEntries);

        ClientQuotaAlteration.Op quotaOp = new ClientQuotaAlteration.Op(
            DynamicConfig$Client$.MODULE$.ConsumerByteRateOverrideProp(), Predef$.MODULE$.double2Double(j));
        ClientQuotaAlteration alteration = new ClientQuotaAlteration(quotaEntity, Collections.singleton(quotaOp));

        confluentAdmin.alterClientQuotas(Collections.singleton(alteration)).all().get(15L, TimeUnit.SECONDS);
    }

    /**
     * Tests whether the maximum "fetch-throttle-time-max" metric in group "cluster-link" for this
     * link, taken across the destination brokers, is greater than zero.
     *
     * <p>Decompiler note: the access modifier of this method was changed from private.
     *
     * @return {@code true} when the metric maximum is positive
     */
    /* JADX INFO: Access modifiers changed from: private */
    public final boolean throttled$1() {
        double maxThrottleTime = kafkaMetricMaxValue(
            destCluster().brokers(),
            "fetch-throttle-time-max",
            "cluster-link",
            new Some(linkName()),
            kafkaMetricMaxValue$default$5(),
            kafkaMetricMaxValue$default$6());
        return maxThrottleTime > 0.0d;
    }

    /**
     * Reads the broker id from the given broker's config.
     *
     * @param kafkaBroker broker whose id is read
     * @return the configured broker id
     */
    public static final /* synthetic */ int $anonfun$testDestinationClusterLinkBrokerLevelQuota$1(KafkaBroker kafkaBroker) {
        final int brokerId = kafkaBroker.config().brokerId();
        return brokerId;
    }

    /**
     * Creates a BROKER-type config resource named after the decimal form of the given id.
     *
     * @param i broker id
     * @return the config resource for that broker
     */
    public static final /* synthetic */ ConfigResource $anonfun$testDestinationClusterLinkBrokerLevelQuota$2(int i) {
        String resourceName = String.valueOf(i);
        return new ConfigResource(ConfigResource.Type.BROKER, resourceName);
    }

    /**
     * Sets {@code KafkaConfig.ClusterLinkIoMaxBytesPerSecondProp} to the given value on every
     * supplied config resource via an incremental alter, and waits for completion.
     *
     * <p>Decompiler note: the access modifier of this method was changed from private.
     *
     * @param j value to set, written in decimal form
     * @param seq config resources to alter
     * @param confluentAdmin admin client used to issue the alteration
     */
    /* JADX INFO: Access modifiers changed from: private */
    public static final void setQuota$2(long j, Seq seq, ConfluentAdmin confluentAdmin) {
        ConfigEntry quotaEntry = new ConfigEntry(KafkaConfig$.MODULE$.ClusterLinkIoMaxBytesPerSecondProp(), Long.toString(j));
        AlterConfigOp setQuotaOp = new AlterConfigOp(quotaEntry, AlterConfigOp.OpType.SET);

        // Pair every resource with a single-element collection holding the SET operation.
        TraversableOnce opsByResource = (TraversableOnce) seq.map(
            configResource -> Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                Predef$.MODULE$.ArrowAssoc(configResource),
                CollectionConverters$.MODULE$.asJavaCollectionConverter(
                    Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new AlterConfigOp[]{setQuotaOp}))).asJavaCollection()),
            Seq$.MODULE$.canBuildFrom());
        Map alterations = (Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(
            opsByResource.toMap(Predef$.MODULE$.$conforms())).asJava();

        confluentAdmin.incrementalAlterConfigs(alterations).all().get();
    }

    /**
     * Tests whether the broker's configured cluster link quota mode equals the expected mode,
     * treating two nulls as equal.
     *
     * @param kafkaBroker broker whose config is read
     * @param clusterLinkQuotaMode expected quota mode
     * @return {@code true} when the modes are both null or are equal
     */
    public static final /* synthetic */ boolean $anonfun$verifyQuotaMode$1(KafkaBroker kafkaBroker, ConfluentConfigs.ClusterLinkQuotaMode clusterLinkQuotaMode) {
        final ConfluentConfigs.ClusterLinkQuotaMode configuredMode = kafkaBroker.config().clusterLinkQuotaMode();
        if (configuredMode == null) {
            return clusterLinkQuotaMode == null;
        }
        return configuredMode.equals(clusterLinkQuotaMode);
    }

    /**
     * Supplies the fixed message "Quota mode not updated".
     *
     * @return the message text
     */
    public static final /* synthetic */ String $anonfun$verifyQuotaMode$2() {
        final String message = "Quota mode not updated";
        return message;
    }

    /**
     * Tests whether the fetcher manager reports at least one fetcher thread.
     *
     * @param clusterLinkFetcherManager fetcher manager to inspect
     * @return {@code true} when {@code fetcherThreadCount()} is greater than zero
     */
    public static final /* synthetic */ boolean $anonfun$verifyFetchResponseSize$2(ClusterLinkFetcherManager clusterLinkFetcherManager) {
        return 0 < clusterLinkFetcherManager.fetcherThreadCount();
    }

    /**
     * Reads the "fetchSize" field declared on {@code ClusterLinkLeaderEndPoint} from the given
     * endpoint through {@code TestUtils.fieldValue} and unboxes it to an int.
     *
     * <p>Decompiler note: the access modifier of this method was changed from private.
     *
     * @param leaderEndPoint endpoint whose field is read
     * @return the unboxed field value
     */
    /* JADX INFO: Access modifiers changed from: private */
    public static final int fetchSize$1(LeaderEndPoint leaderEndPoint) {
        Object rawFetchSize = TestUtils.fieldValue(leaderEndPoint, ClusterLinkLeaderEndPoint.class, "fetchSize");
        return BoxesRunTime.unboxToInt(rawFetchSize);
    }

    /**
     * Reads the "requestBuilder" field declared on {@code ClusterLinkLeaderEndPoint} from the given
     * endpoint through {@code TestUtils.fieldValue}.
     *
     * @param leaderEndPoint endpoint whose field is read
     * @return the field value cast to {@code ClusterLinkLeaderRequestBuilder}
     */
    private static final ClusterLinkLeaderRequestBuilder fetcherThreadLeaderRequestBuilder$1(LeaderEndPoint leaderEndPoint) {
        Object rawBuilder = TestUtils.fieldValue(leaderEndPoint, ClusterLinkLeaderEndPoint.class, "requestBuilder");
        return (ClusterLinkLeaderRequestBuilder) rawBuilder;
    }

    /**
     * Reads the "fetchResponseSize" field declared on {@code ClusterLinkLeaderRequestBuilder} from
     * the endpoint's request builder and unboxes it to an int.
     *
     * <p>Decompiler note: the access modifier of this method was changed from private.
     *
     * @param leaderEndPoint endpoint whose request builder is inspected
     * @return the unboxed field value
     */
    /* JADX INFO: Access modifiers changed from: private */
    public static final int fetchResponseSize$1(LeaderEndPoint leaderEndPoint) {
        ClusterLinkLeaderRequestBuilder requestBuilder = fetcherThreadLeaderRequestBuilder$1(leaderEndPoint);
        Object rawResponseSize = TestUtils.fieldValue(requestBuilder, ClusterLinkLeaderRequestBuilder.class, "fetchResponseSize");
        return BoxesRunTime.unboxToInt(rawResponseSize);
    }

    /**
     * Compares the boxed form of an int with an arbitrary object using {@code BoxesRunTime.equals}.
     *
     * @param obj value to compare against
     * @param i int that is boxed before comparison
     * @return the result of {@code BoxesRunTime.equals(boxed i, obj)}
     */
    public static final /* synthetic */ boolean $anonfun$verifyFetchResponseSize$8(Object obj, int i) {
        Object boxedValue = BoxesRunTime.boxToInteger(i);
        return BoxesRunTime.equals(boxedValue, obj);
    }

    /**
     * Compares the boxed form of an int with an arbitrary object using {@code BoxesRunTime.equals}.
     *
     * @param obj value to compare against
     * @param i int that is boxed before comparison
     * @return the result of {@code BoxesRunTime.equals(boxed i, obj)}
     */
    public static final /* synthetic */ boolean $anonfun$verifyFetchResponseSize$10(Object obj, int i) {
        Object boxedValue = BoxesRunTime.boxToInteger(i);
        return BoxesRunTime.equals(boxedValue, obj);
    }

    /**
     * Collects the ids of the in-sync replica nodes of one partition of the test topic, as
     * described by the destination cluster.
     *
     * @param clusterLinkIntegrationTest test instance supplying the destination cluster and topic
     * @param topicPartition partition whose index selects the described partition info
     * @return immutable set of boxed node ids taken from {@code isr()}
     */
    public static final /* synthetic */ scala.collection.immutable.Set $anonfun$testDestinationClusterLinkQuotaWithBrokerRestart$1(ClusterLinkIntegrationTest clusterLinkIntegrationTest, TopicPartition topicPartition) {
        TopicPartitionInfo partitionInfo = (TopicPartitionInfo) clusterLinkIntegrationTest.destCluster()
            .describeTopic(clusterLinkIntegrationTest.topic())
            .partitions()
            .get(topicPartition.partition());
        TraversableLike isrNodes = (TraversableLike) CollectionConverters$.MODULE$.asScalaBufferConverter(partitionInfo.isr()).asScala();
        return ((TraversableOnce) isrNodes.map(
            node -> BoxesRunTime.boxToInteger(node.id()),
            Buffer$.MODULE$.canBuildFrom())).toSet();
    }

    /**
     * Tests whether the set contains the boxed form of the given int.
     *
     * @param i value looked up in the set
     * @param set set that is searched
     * @return {@code true} when the set contains the value
     */
    public static final /* synthetic */ boolean $anonfun$testDestinationClusterLinkQuotaWithBrokerRestart$3(int i, scala.collection.immutable.Set set) {
        Object boxedId = BoxesRunTime.boxToInteger(i);
        return set.contains(boxedId);
    }

    /**
     * Runs a preferred leader election for one partition and waits up to 15 seconds for it.
     *
     * <p>Any throwable raised by the election fails the test with the message
     * "Preferred leader election failed"; the throwable is attached as the failure's cause so the
     * underlying reason is not lost.
     *
     * @param objectRef reference whose {@code elem} holds the {@code ConfluentAdmin} to use
     * @param topicPartition partition for which the preferred leader is elected
     */
    public static final /* synthetic */ void $anonfun$testDestinationClusterLinkQuotaWithBrokerRestart$4(ObjectRef objectRef, TopicPartition topicPartition) {
        try {
            ((ConfluentAdmin) objectRef.elem).electLeaders(ElectionType.PREFERRED, Collections.singleton(topicPartition)).all().get(15L, TimeUnit.SECONDS);
        } catch (Throwable cause) {
            // Keep the original failure visible in the test report instead of discarding it.
            Assertions.fail("Preferred leader election failed", cause);
        }
    }

    /**
     * Counts the partitions of the prefixed mirror topic as described by the destination cluster.
     *
     * @param clusterLinkIntegrationTest test instance supplying the destination cluster, prefix and topic
     * @return number of described partitions
     */
    public static final /* synthetic */ int $anonfun$testAddPartitions$1(ClusterLinkIntegrationTest clusterLinkIntegrationTest) {
        ClusterLinkTestHarness dest = clusterLinkIntegrationTest.destCluster();
        String mirrorTopic = clusterLinkIntegrationTest.clusterLinkPrefix() + clusterLinkIntegrationTest.topic();
        return dest.describeTopic(mirrorTopic).partitions().size();
    }

    /**
     * Tests whether the given count equals the test's {@code numPartitions()}.
     *
     * @param clusterLinkIntegrationTest test instance supplying the expected partition count
     * @param i observed partition count
     * @return {@code true} when the counts are equal
     */
    public static final /* synthetic */ boolean $anonfun$testAddPartitions$2(ClusterLinkIntegrationTest clusterLinkIntegrationTest, int i) {
        final int expectedPartitions = clusterLinkIntegrationTest.numPartitions();
        return expectedPartitions == i;
    }

    /**
     * Tests whether the given value equals 3.
     *
     * @param i value to test
     * @return {@code true} when {@code i} is 3
     */
    public static final /* synthetic */ boolean $anonfun$testAlterClusterLinkConfigs$6(int i) {
        final int expected = 3;
        return expected == i;
    }

    /**
     * Applies one incremental config change to a resource and checks whether it is accepted.
     *
     * <p>The tuple carries a config name and an optional value: {@code Some(value)} issues a
     * {@code SET} and {@code None} issues a {@code DELETE}. The change is expected to succeed only
     * when the config name equals {@code LogConfig.UncleanLeaderElectionEnableProp}; for any other
     * name the call must fail with an {@link ExecutionException} whose cause is an
     * {@code InvalidConfigurationException}.
     *
     * <p>The decompiled form reused one mistyped local for the operation, the expected-success flag
     * and the caught exception; each now has its own correctly typed variable, and the catch block
     * inspects the exception that was actually thrown.
     *
     * @param confluentAdmin admin client used to issue the alteration
     * @param configResource resource whose config is altered
     * @param tuple2 (config name, optional config value) pair
     * @throws MatchError when {@code tuple2} is null or its second element is neither
     *     {@code Some} nor {@code None}
     */
    public static final /* synthetic */ void $anonfun$testDestReadOnly$4(ConfluentAdmin confluentAdmin, ConfigResource configResource, Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError((Object) null);
        }
        String configName = (String) tuple2._1();
        Option<?> configValue = (Option<?>) tuple2._2();
        String uncleanLeaderProp = LogConfig$.MODULE$.UncleanLeaderElectionEnableProp();
        boolean alterationAllowed = configName != null ? configName.equals(uncleanLeaderProp) : uncleanLeaderProp == null;

        AlterConfigOp alterConfigOp;
        if (configValue instanceof Some) {
            String newValue = (String) ((Some<?>) configValue).value();
            alterConfigOp = new AlterConfigOp(new ConfigEntry(configName, newValue), AlterConfigOp.OpType.SET);
        } else if (None$.MODULE$.equals(configValue)) {
            alterConfigOp = new AlterConfigOp(new ConfigEntry(configName, (String) null), AlterConfigOp.OpType.DELETE);
        } else {
            throw new MatchError(configValue);
        }

        try {
            confluentAdmin.incrementalAlterConfigs((Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(configResource), Collections.singleton(alterConfigOp))}))).asJava()).all().get();
            // Reaching this line means the alteration succeeded, which must have been expected.
            Assertions.assertTrue(alterationAllowed);
        } catch (ExecutionException e) {
            Assertions.assertTrue(e.getCause() instanceof InvalidConfigurationException);
            Assertions.assertFalse(alterationAllowed);
        }
    }

    /**
     * Checks which config alterations are permitted on the test topic.
     *
     * <p>First, {@code alterConfigs} with an empty config must fail within 20 seconds with an
     * {@link ExecutionException} whose cause is an {@code InvalidRequestException}. Then four
     * incremental alterations (set and delete of {@code UncleanLeaderElectionEnableProp}, set and
     * delete of {@code CleanupPolicyProp}) are each checked by
     * {@code $anonfun$testDestReadOnly$4}.
     *
     * <p>The decompiled form stored the config resource in a local typed as
     * {@code ExecutionException}, reassigned it from {@code Assertions.fail} and read it in the
     * catch block; the resource now has its own variable and the catch block inspects the caught
     * exception.
     *
     * @param clusterLinkIntegrationTest test instance supplying the topic name
     * @param confluentAdmin admin client used to issue the alterations
     */
    public static final /* synthetic */ void $anonfun$testDestReadOnly$3(ClusterLinkIntegrationTest clusterLinkIntegrationTest, ConfluentAdmin confluentAdmin) {
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, clusterLinkIntegrationTest.topic());
        try {
            confluentAdmin.alterConfigs((Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object) configResource), new Config(CollectionConverters$.MODULE$.asJavaCollectionConverter(List$.MODULE$.empty()).asJavaCollection()))}))).asJava()).all().get(20L, TimeUnit.SECONDS);
            Assertions.fail("alterConfigs() on a mirror topic should fail");
        } catch (ExecutionException e) {
            Assertions.assertTrue(e.getCause() instanceof InvalidRequestException);
        }
        // Each entry is (config name, Some(value) to set | None to delete).
        new $colon.colon(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(LogConfig$.MODULE$.UncleanLeaderElectionEnableProp()), new Some("true")), new $colon.colon(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(LogConfig$.MODULE$.UncleanLeaderElectionEnableProp()), None$.MODULE$), new $colon.colon(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(LogConfig$.MODULE$.CleanupPolicyProp()), new Some("compact")), new $colon.colon(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(LogConfig$.MODULE$.CleanupPolicyProp()), None$.MODULE$), Nil$.MODULE$)))).foreach(tuple2 -> {
            $anonfun$testDestReadOnly$4(confluentAdmin, configResource, tuple2);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Builds the topic name "topic-&lt;i&gt;" for the given index.
     *
     * @param i index appended to the "topic-" prefix
     * @return the topic name
     */
    public static final /* synthetic */ String $anonfun$testDeleteClusterLinkCleanup$1(int i) {
        return "topic-" + i;
    }

    /**
     * Creates the topic partition with the given index on the prefixed mirror topic.
     *
     * @param clusterLinkIntegrationTest test instance supplying the link prefix and topic name
     * @param i partition index
     * @return the topic partition
     */
    public static final /* synthetic */ TopicPartition $anonfun$testMirroredTopicMarkedForDelete$1(ClusterLinkIntegrationTest clusterLinkIntegrationTest, int i) {
        String mirrorTopic = clusterLinkIntegrationTest.clusterLinkPrefix() + clusterLinkIntegrationTest.topic();
        return new TopicPartition(mirrorTopic, i);
    }

    /**
     * Tests whether the broker's configured id differs from the given id.
     *
     * @param i broker id to exclude
     * @param kafkaBroker broker whose id is read
     * @return {@code true} when the ids differ
     */
    public static final /* synthetic */ boolean $anonfun$testMirroredTopicMarkedForDelete$2(int i, KafkaBroker kafkaBroker) {
        final int brokerId = kafkaBroker.config().brokerId();
        return !(brokerId == i);
    }

    /**
     * Tests whether the broker's replica manager has no online partition for any of the given
     * topic partitions.
     *
     * @param indexedSeq topic partitions to look up
     * @param kafkaBroker broker whose replica manager is queried
     * @return {@code true} when no lookup yields an online partition
     */
    public static final /* synthetic */ boolean $anonfun$testMirroredTopicMarkedForDelete$4(IndexedSeq indexedSeq, KafkaBroker kafkaBroker) {
        SeqLike onlinePartitions = (SeqLike) indexedSeq.flatMap(
            topicPartition -> Option$.MODULE$.option2Iterable(kafkaBroker.replicaManager().onlinePartition(topicPartition)),
            IndexedSeq$.MODULE$.canBuildFrom());
        return onlinePartitions.isEmpty();
    }

    /**
     * Supplies the fixed message "Partitions not offline after topic deletion".
     *
     * @return the message text
     */
    public static final /* synthetic */ String $anonfun$testMirroredTopicMarkedForDelete$6() {
        final String message = "Partitions not offline after topic deletion";
        return message;
    }

    /**
     * Tests whether the broker's fetcher manager for the given link is empty.
     *
     * @param kafkaBroker broker whose cluster link manager is queried
     * @param uuid id of the cluster link; the fetcher manager option for it is unwrapped with {@code get()}
     * @return {@code true} when the fetcher manager reports {@code isEmpty()}
     */
    public static final /* synthetic */ boolean $anonfun$testMirroredTopicMarkedForDelete$7(KafkaBroker kafkaBroker, UUID uuid) {
        ClusterLinkFetcherManager fetcherManager =
            (ClusterLinkFetcherManager) kafkaBroker.clusterLinkManager().fetcherManager(uuid).get();
        return fetcherManager.isEmpty();
    }

    /**
     * Supplies the fixed message "Fetcher manager not empty after topic deletion".
     *
     * @return the message text
     */
    public static final /* synthetic */ String $anonfun$testMirroredTopicMarkedForDelete$8() {
        final String message = "Fetcher manager not empty after topic deletion";
        return message;
    }

    /**
     * Waits until the broker has no online partitions for the given topic partitions, then waits
     * until its fetcher manager for the given link is empty.
     *
     * <p>Each wait polls its condition, sleeping {@code min(timeout, pause)} milliseconds between
     * attempts, and fails the test with a condition-specific message once the timeout has elapsed.
     * Timeout and pause come from the defaults of {@code TestUtils.waitUntilTrue}
     * ({@code waitUntilTrue$default$3} and {@code waitUntilTrue$default$4}).
     *
     * <p>The dead {@code TestUtils$} null checks and the unused locals holding the singleton were
     * removed: the singleton is dereferenced before those checks could ever trigger.
     *
     * @param indexedSeq topic partitions expected to go offline
     * @param uuid id of the cluster link whose fetcher manager must become empty
     * @param kafkaBroker broker that is polled
     */
    public static final /* synthetic */ void $anonfun$testMirroredTopicMarkedForDelete$3(IndexedSeq indexedSeq, UUID uuid, KafkaBroker kafkaBroker) {
        // Phase 1: partitions must go offline.
        long offlineTimeoutMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long offlinePauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long offlineStartMs = System.currentTimeMillis();
        while (!$anonfun$testMirroredTopicMarkedForDelete$4(indexedSeq, kafkaBroker)) {
            if (System.currentTimeMillis() > offlineStartMs + offlineTimeoutMs) {
                Assertions.fail($anonfun$testMirroredTopicMarkedForDelete$6());
            }
            Thread.sleep(Math.min(offlineTimeoutMs, offlinePauseMs));
        }

        // Phase 2: the link's fetcher manager must drain.
        long fetcherTimeoutMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long fetcherPauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long fetcherStartMs = System.currentTimeMillis();
        while (!$anonfun$testMirroredTopicMarkedForDelete$7(kafkaBroker, uuid)) {
            if (System.currentTimeMillis() > fetcherStartMs + fetcherTimeoutMs) {
                Assertions.fail($anonfun$testMirroredTopicMarkedForDelete$8());
            }
            Thread.sleep(Math.min(fetcherTimeoutMs, fetcherPauseMs));
        }
    }

    /**
     * Tests whether the mirror info reports the PAUSED state.
     *
     * @param mirrorInfo mirror info whose state is checked
     * @return {@code true} when the state equals {@code ReplicaStatus.MirrorInfo.State.PAUSED}
     */
    public static final /* synthetic */ boolean $anonfun$testPauseTopic$5(ReplicaStatus.MirrorInfo mirrorInfo) {
        final ReplicaStatus.MirrorInfo.State currentState = mirrorInfo.state();
        final ReplicaStatus.MirrorInfo.State paused = ReplicaStatus.MirrorInfo.State.PAUSED;
        if (currentState == null) {
            return paused == null;
        }
        return currentState.equals(paused);
    }

    /**
     * Tests whether, for every partition index below {@code numPartitions()}, the leader replica of
     * the prefixed mirror topic on the destination cluster has mirror info in the PAUSED state.
     *
     * <p>A leader without mirror info counts as not paused.
     *
     * @param clusterLinkIntegrationTest test instance supplying the destination cluster, prefix and topic
     * @return {@code true} when all partitions satisfy the condition
     */
    public static final /* synthetic */ boolean $anonfun$testPauseTopic$2(ClusterLinkIntegrationTest clusterLinkIntegrationTest) {
        return RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), clusterLinkIntegrationTest.numPartitions()).forall(i -> {
            ClusterLinkTestHarness dest = clusterLinkIntegrationTest.destCluster();
            String mirrorTopic = clusterLinkIntegrationTest.clusterLinkPrefix() + clusterLinkIntegrationTest.topic();
            ReplicaStatus leader = (ReplicaStatus) ((IterableLike) dest.replicaStatus(mirrorTopic, i, dest.replicaStatus$default$3()).filter(
                replicaStatus -> BoxesRunTime.boxToBoolean(replicaStatus.isLeader()))).head();
            return Option$.MODULE$.apply(leader.mirrorInfo().orElse(null)).exists(
                mirrorInfo -> BoxesRunTime.boxToBoolean($anonfun$testPauseTopic$5(mirrorInfo)));
        });
    }

    /**
     * Supplies the fixed message "Topic's partitions not paused".
     *
     * @return the message text
     */
    public static final /* synthetic */ String $anonfun$testPauseTopic$6() {
        final String message = "Topic's partitions not paused";
        return message;
    }

    /**
     * Collects the log end offset of the leader replica for every partition index below
     * {@code numPartitions()} of the prefixed mirror topic on the destination cluster.
     *
     * @return sequence of leader log end offsets, in partition-index order
     */
    private final Seq leaderOffsets$1() {
        scala.collection.immutable.Seq offsetsByPartition = (scala.collection.immutable.Seq) RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), numPartitions()).map(i -> {
            ClusterLinkTestHarness dest = this.destCluster();
            String mirrorTopic = this.clusterLinkPrefix() + this.topic();
            ReplicaStatus leader = (ReplicaStatus) ((IterableLike) dest.replicaStatus(mirrorTopic, i, dest.replicaStatus$default$3()).filter(
                replicaStatus -> BoxesRunTime.boxToBoolean(replicaStatus.isLeader()))).head();
            return leader.logEndOffset();
        }, IndexedSeq$.MODULE$.canBuildFrom());
        return offsetsByPartition.toSeq();
    }

    /**
     * Counts the partitions of the test topic as described by the destination cluster.
     *
     * @param clusterLinkIntegrationTest test instance supplying the destination cluster and topic
     * @return number of described partitions
     */
    public static final /* synthetic */ int $anonfun$testPauseClusterLink$2(ClusterLinkIntegrationTest clusterLinkIntegrationTest) {
        ClusterLinkTestHarness dest = clusterLinkIntegrationTest.destCluster();
        final int partitionCount = dest.describeTopic(clusterLinkIntegrationTest.topic()).partitions().size();
        return partitionCount;
    }

    /**
     * Tests whether two ints are equal.
     *
     * @param i first value
     * @param i2 second value
     * @return {@code true} when the values are equal
     */
    public static final /* synthetic */ boolean $anonfun$testPauseClusterLink$3(int i, int i2) {
        if (i2 != i) {
            return false;
        }
        return true;
    }

    /**
     * Reads the value of {@code LogConfig.DeleteRetentionMsProp} from the test topic's config as
     * described by the destination cluster.
     *
     * @param clusterLinkIntegrationTest test instance supplying the destination cluster and topic
     * @return the described config value
     */
    public static final /* synthetic */ String $anonfun$testPauseClusterLink$4(ClusterLinkIntegrationTest clusterLinkIntegrationTest) {
        ClusterLinkTestHarness dest = clusterLinkIntegrationTest.destCluster();
        String retentionValue = dest.describeTopicConfig(clusterLinkIntegrationTest.topic())
            .get(LogConfig$.MODULE$.DeleteRetentionMsProp())
            .value();
        return retentionValue;
    }

    /**
     * Tests whether two strings are equal, treating two nulls as equal.
     *
     * @param str first string
     * @param str2 second string; its {@code equals} is used when it is non-null
     * @return {@code true} when both are null or are equal
     */
    public static final /* synthetic */ boolean $anonfun$testPauseClusterLink$5(String str, String str2) {
        if (str2 == null) {
            return str == null;
        }
        return str2.equals(str);
    }

    /**
     * Tests whether the destination cluster's leader epoch for partition 0 of the test topic has
     * reached the given epoch.
     *
     * @param clusterLinkIntegrationTest test instance supplying the destination cluster and topic
     * @param i minimum expected leader epoch
     * @return {@code true} when the leader epoch is at least {@code i}
     */
    public static final /* synthetic */ boolean $anonfun$testReplicaStatus$3(ClusterLinkIntegrationTest clusterLinkIntegrationTest, int i) {
        ClusterLinkTestHarness dest = clusterLinkIntegrationTest.destCluster();
        TopicPartition firstPartition = new TopicPartition(clusterLinkIntegrationTest.topic(), 0);
        return dest.leaderEpoch(firstPartition) >= i;
    }

    /**
     * Supplies the fixed message "Destination leader epoch not updated".
     *
     * @return the message text
     */
    public static final /* synthetic */ String $anonfun$testReplicaStatus$4() {
        final String message = "Destination leader epoch not updated";
        return message;
    }

    /**
     * Tests whether the replica is a leader that carries no link name.
     *
     * @param replicaStatus replica status to inspect
     * @return {@code true} for a leader whose {@code linkName()} is absent
     */
    public static final /* synthetic */ boolean $anonfun$testReplicaStatus$5(ReplicaStatus replicaStatus) {
        if (!replicaStatus.isLeader()) {
            return false;
        }
        return !replicaStatus.linkName().isPresent();
    }

    /**
     * Tests whether the replica is a non-leader that carries no link name.
     *
     * @param replicaStatus replica status to inspect
     * @return {@code true} for a non-leader whose {@code linkName()} is absent
     */
    public static final /* synthetic */ boolean $anonfun$testReplicaStatus$6(ReplicaStatus replicaStatus) {
        return !replicaStatus.isLeader() && !replicaStatus.linkName().isPresent();
    }

    /**
     * Tests whether the replica is a leader that carries a link name.
     *
     * @param replicaStatus replica status to inspect
     * @return {@code true} for a leader whose {@code linkName()} is present
     */
    public static final /* synthetic */ boolean $anonfun$testReplicaStatus$7(ReplicaStatus replicaStatus) {
        if (!replicaStatus.isLeader()) {
            return false;
        }
        return replicaStatus.linkName().isPresent();
    }

    /**
     * Tests whether the replica is a non-leader that carries a link name.
     *
     * @param replicaStatus replica status to inspect
     * @return {@code true} for a non-leader whose {@code linkName()} is present
     */
    public static final /* synthetic */ boolean $anonfun$testReplicaStatus$8(ReplicaStatus replicaStatus) {
        if (replicaStatus.isLeader()) {
            return false;
        }
        return replicaStatus.linkName().isPresent();
    }

    /**
     * Tests whether the replica is a leader that carries no link name.
     *
     * @param replicaStatus replica status to inspect
     * @return {@code true} for a leader whose {@code linkName()} is absent
     */
    public static final /* synthetic */ boolean $anonfun$testReplicaStatus$9(ReplicaStatus replicaStatus) {
        if (!replicaStatus.isLeader()) {
            return false;
        }
        return !replicaStatus.linkName().isPresent();
    }

    /**
     * Tests whether the replica is a non-leader that carries no link name.
     *
     * @param replicaStatus replica status to inspect
     * @return {@code true} for a non-leader whose {@code linkName()} is absent
     */
    public static final /* synthetic */ boolean $anonfun$testReplicaStatus$10(ReplicaStatus replicaStatus) {
        return !replicaStatus.isLeader() && !replicaStatus.linkName().isPresent();
    }

    /**
     * Fetches the replica status of partition 0 of the test topic from the destination cluster,
     * stores it in the given reference, and tests whether it has exactly two entries.
     *
     * @param clusterLinkIntegrationTest test instance supplying the destination cluster and topic
     * @param objectRef reference whose {@code elem} is overwritten with the fetched statuses
     * @return {@code true} when the fetched sequence has size 2
     */
    public static final /* synthetic */ boolean $anonfun$testReplicaStatus$11(ClusterLinkIntegrationTest clusterLinkIntegrationTest, ObjectRef objectRef) {
        ClusterLinkTestHarness dest = clusterLinkIntegrationTest.destCluster();
        // Publish the latest statuses to the caller before evaluating the condition.
        objectRef.elem = dest.replicaStatus(clusterLinkIntegrationTest.topic(), 0, true);
        final int statusCount = ((Seq) objectRef.elem).size();
        return statusCount == 2;
    }

    /**
     * Supplies the fixed message "Cluster link not removed from topic's partition".
     *
     * @return the message text
     */
    public static final /* synthetic */ String $anonfun$testReplicaStatus$12() {
        final String message = "Cluster link not removed from topic's partition";
        return message;
    }

    /**
     * Tests whether the replica is a leader that carries no link name.
     *
     * @param replicaStatus replica status to inspect
     * @return {@code true} for a leader whose {@code linkName()} is absent
     */
    public static final /* synthetic */ boolean $anonfun$testReplicaStatus$13(ReplicaStatus replicaStatus) {
        if (!replicaStatus.isLeader()) {
            return false;
        }
        return !replicaStatus.linkName().isPresent();
    }

    /**
     * Tests whether the replica is a non-leader that carries no link name.
     *
     * @param replicaStatus replica status to inspect
     * @return {@code true} for a non-leader whose {@code linkName()} is absent
     */
    public static final /* synthetic */ boolean $anonfun$testReplicaStatus$14(ReplicaStatus replicaStatus) {
        return !replicaStatus.isLeader() && !replicaStatus.linkName().isPresent();
    }

    /**
     * Tests whether the destination cluster's leader epoch for partition 0 of the prefixed mirror
     * topic has reached the given epoch.
     *
     * @param clusterLinkIntegrationTest test instance supplying the destination cluster, prefix and topic
     * @param i minimum expected leader epoch
     * @return {@code true} when the leader epoch is at least {@code i}
     */
    public static final /* synthetic */ boolean $anonfun$testLastFetchedOffsetStoppedMirrorTopicDescription$1(ClusterLinkIntegrationTest clusterLinkIntegrationTest, int i) {
        ClusterLinkTestHarness dest = clusterLinkIntegrationTest.destCluster();
        String mirrorTopic = clusterLinkIntegrationTest.clusterLinkPrefix() + clusterLinkIntegrationTest.topic();
        TopicPartition firstPartition = new TopicPartition(mirrorTopic, 0);
        return dest.leaderEpoch(firstPartition) >= i;
    }

    /**
     * Supplies the fixed message "Destination leader epoch not updated".
     *
     * @return the message text
     */
    public static final /* synthetic */ String $anonfun$testLastFetchedOffsetStoppedMirrorTopicDescription$2() {
        final String message = "Destination leader epoch not updated";
        return message;
    }

    /**
     * Tests whether the replica is a leader that carries no link name.
     *
     * @param replicaStatus replica status to inspect
     * @return {@code true} for a leader whose {@code linkName()} is absent
     */
    public static final /* synthetic */ boolean $anonfun$testLastFetchedOffsetStoppedMirrorTopicDescription$3(ReplicaStatus replicaStatus) {
        if (!replicaStatus.isLeader()) {
            return false;
        }
        return !replicaStatus.linkName().isPresent();
    }

    /**
     * Tests whether the replica is a leader that carries no link name.
     *
     * @param replicaStatus replica status to inspect
     * @return {@code true} for a leader whose {@code linkName()} is absent
     */
    public static final /* synthetic */ boolean $anonfun$testLastFetchedOffsetStoppedMirrorTopicDescription$4(ReplicaStatus replicaStatus) {
        if (!replicaStatus.isLeader()) {
            return false;
        }
        return !replicaStatus.linkName().isPresent();
    }

    /**
     * Tests whether the prefixed mirror topic on the destination cluster has
     * "delete.retention.ms" set to "80000000".
     *
     * @param clusterLinkIntegrationTest test instance supplying the destination cluster, prefix and topic
     * @return {@code true} when the described config value equals "80000000"
     */
    public static final /* synthetic */ boolean $anonfun$testIntervalChangeForPeriodicTasks$1(ClusterLinkIntegrationTest clusterLinkIntegrationTest) {
        ClusterLinkTestHarness dest = clusterLinkIntegrationTest.destCluster();
        String mirrorTopic = clusterLinkIntegrationTest.clusterLinkPrefix() + clusterLinkIntegrationTest.topic();
        String retentionValue = dest.describeTopicConfig(mirrorTopic).get("delete.retention.ms").value();
        return retentionValue.equals("80000000");
    }

    /**
     * Supplies the fixed message "Topic configs did not get propagated".
     *
     * @return the message text
     */
    public static final /* synthetic */ String $anonfun$testIntervalChangeForPeriodicTasks$2() {
        final String message = "Topic configs did not get propagated";
        return message;
    }

    /**
     * Tests whether the destination cluster's leader epoch for partition 0 of the test topic has
     * reached the given epoch.
     *
     * @param clusterLinkIntegrationTest test instance supplying the destination cluster and topic
     * @param i minimum expected leader epoch
     * @return {@code true} when the leader epoch is at least {@code i}
     */
    public static final /* synthetic */ boolean $anonfun$testMirrorFailoverWhenSourceIsUnavailable$1(ClusterLinkIntegrationTest clusterLinkIntegrationTest, int i) {
        ClusterLinkTestHarness dest = clusterLinkIntegrationTest.destCluster();
        TopicPartition firstPartition = new TopicPartition(clusterLinkIntegrationTest.topic(), 0);
        return dest.leaderEpoch(firstPartition) >= i;
    }

    /**
     * Supplies the fixed message "Destination leader epoch not updated".
     *
     * @return the message text
     */
    public static final /* synthetic */ String $anonfun$testMirrorFailoverWhenSourceIsUnavailable$2() {
        final String message = "Destination leader epoch not updated";
        return message;
    }
}
