package kafka.link;

import java.util.Collections;
import java.util.Properties;
import kafka.cluster.Partition;
import kafka.log.AbstractLog;
import kafka.server.ConfigType$;
import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.server.ReplicaManager;
import kafka.server.link.ClusterLinkConfig$;
import kafka.server.link.ClusterLinkMetrics$;
import kafka.server.link.ConnectionMode;
import kafka.server.link.ConnectionMode$Inbound$;
import kafka.server.link.ConnectionMode$Outbound$;
import kafka.server.link.FailedClusterLink$;
import kafka.server.link.SecureLinkConfigEncoder;
import kafka.server.link.TopicLinkMirror$;
import kafka.utils.CoreUtils$;
import kafka.utils.JaasTestUtils$;
import kafka.utils.TestUtils$;
import kafka.zk.AdminZkClient;
import org.apache.kafka.clients.admin.ClusterLinkDescription;
import org.apache.kafka.clients.admin.ClusterLinkListing;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.MirrorTopicDescription;
import org.apache.kafka.common.ClusterLinkError;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.TopicDeletionDisabledException;
import org.apache.kafka.common.replica.ReplicaStatus;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import scala.Function0;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.IterableOnceOps;
import scala.collection.IterableOps;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.Seq;
import scala.collection.StringOps$;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set;
import scala.collection.mutable.Buffer;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LongRef;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;
import scala.runtime.RichLong$;
import scala.runtime.ScalaRunTime$;

/* compiled from: ClusterLinkFailureTest.scala */
@Tag("integration")
@ScalaSignature(bytes = "\u0006\u0005\t\rh\u0001B\u0016-\u0001EBQA\u000e\u0001\u0005\u0002]Bq!\u000f\u0001C\u0002\u0013\u0005#\b\u0003\u0004B\u0001\u0001\u0006Ia\u000f\u0005\b\u0005\u0002\u0011\r\u0011\"\u0003D\u0011\u0019a\u0005\u0001)A\u0005\t\"9Q\n\u0001b\u0001\n\u0013q\u0005B\u0002*\u0001A\u0003%q\nC\u0004T\u0001\t\u0007I\u0011\u0002(\t\rQ\u0003\u0001\u0015!\u0003P\u0011\u001d)\u0006A1A\u0005\nYCaA\u0017\u0001!\u0002\u00139\u0006\"C.\u0001\u0001\u0004\u0005\r\u0011\"\u0003]\u0011%Q\u0007\u00011AA\u0002\u0013%1\u000eC\u0005r\u0001\u0001\u0007\t\u0011)Q\u0005;\"I!\u000f\u0001a\u0001\u0002\u0004%Ia\u001d\u0005\nu\u0002\u0001\r\u00111A\u0005\nmD\u0011\" \u0001A\u0002\u0003\u0005\u000b\u0015\u0002;\t\u000by\u0004A\u0011I@\t\u000f\u0005\u0005\u0002\u0001\"\u0001\u0002$!9\u0011\u0011\u000f\u0001\u0005\u0002\u0005M\u0004bBAB\u0001\u0011\u0005\u0011Q\u0011\u0005\b\u0003#\u0003A\u0011AAJ\u0011\u001d\ty\n\u0001C\u0001\u0003CCq!!,\u0001\t\u0003\ty\u000bC\u0004\u0002<\u0002!\t!!0\t\u000f\u0005%\u0007\u0001\"\u0001\u0002L\"9\u0011q\u001b\u0001\u0005\u0002\u0005e\u0007bBAs\u0001\u0011\u0005\u0011q\u001d\u0005\b\u0003g\u0004A\u0011AA{\u0011\u001d\u0011\t\u0001\u0001C\u0001\u0005\u0007AqAa\u0004\u0001\t\u0003\u0011\t\u0002C\u0004\u0003\u001e\u0001!\tAa\b\t\u000f\t-\u0002\u0001\"\u0001\u0003.!9!\u0011\b\u0001\u0005\u0002\tm\u0002b\u0002B$\u0001\u0011\u0005!\u0011\n\u0005\b\u0005+\u0002A\u0011\u0001B,\u0011\u001d\u0011\t\u0007\u0001C\u0001\u0005GBqAa\u001c\u0001\t#\u0011\t\bC\u0005\u0003\u0010\u0002\t\n\u0011\"\u0005\u0003\u0012\"9!q\u0015\u0001\u0005\n\t%\u0006b\u0002BV\u0001\u0011%!Q\u0016\u0005\b\u0005+\u0004A\u0011\u0002BU\u0005Y\u0019E.^:uKJd\u0015N\\6GC&dWO]3UKN$(BA\u0017/\u0003\u0011a\u0017N\\6\u000b\u0003=\nQa[1gW\u0006\u001c\u0001a\u0005\u0002\u0001eA\u00111\u0007N\u0007\u0002Y%\u0011Q\u0007\f\u0002#\u0003\n\u001cHO]1di\u000ecWo\u001d;fe2Kgn[%oi\u0016<'/\u0019;j_:$Vm\u001d;\u0002\rqJg.\u001b;?)\u0005A\u0004CA\u001a\u0001\u0003E\u0011X\r\u001d7jG\u0006$\u0018n\u001c8GC\u000e$xN]\u000b\u0002wA\u0011AhP\u0007\u0002{)\ta(A\u0003tG\u0006d\u0017-\u0003\u0002A{\t)1\u000b[8si\u0006\u0011\"/\u001a9mS\u000e\fG/[8o\r\u0006\u001cGo\u001c:!\u00035\u0019wN\\:v[\u0016\u0014xI]8vaV\tA\t\u0005\u0002F\u00156\taI\u0003\u0002H\u0011\u0006!A.\u00198h\u0015\u0005I\u0015\u0001\u00026bm\u0006L!a\u0013$\u0003\rM#(/\u001b8h\u00039\u0019wN\\:v[\u0016\u0014xI]8va\u0002\naa]=oG6\u001bX#A(\u0011\u0005q\u0002\u0016BA)>\u0005\u0011auN\\4\u0002\u000fMLhnY'tA\u00059!/\u001a;ss6\u001b\u0018\u0001\u0003:fiJLXj\u001d\u0011\u0002\u00159,XNU3d_J$7/F\u0001X!\ta\u0004,\u0003\u0002Z{\t\u0019\u0011J\u001c;\u0002\u00179,XNU3d_J$7\u000fI\u0001\nI\u0016\u001cH/\u00113nS:,\u0012!\u0018\t\u0003=\"l\u0011a\u0018\u0006\u0003A\u0006\fQ!\u00193nS:T!AY2\u0002\u000f\rd\u0017.\u001a8ug*\u0011q\u0006\u001a\u0006\u0003K\u001a\fa!\u00199bG\",'\"A4\u0002\u0007=\u0014x-\u0003\u0002j?\nq1i\u001c8gYV,g\u000e^!e[&t\u0017!\u00043fgR\fE-\\5o?\u0012*\u0017\u000f\u0006\u0002m_B\u0011A(\\\u0005\u0003]v\u0012A!\u00168ji\"9\u0001/DA\u0001\u0002\u0004i\u0016a\u0001=%c\u0005QA-Z:u\u0003\u0012l\u0017N\u001c\u0011\u0002\r1Lgn[%e+\u0005!\bCA;y\u001b\u00051(BA<d\u0003\u0019\u0019w.\\7p]&\u0011\u0011P\u001e\u0002\u0005+VLG-\u0001\u0006mS:\\\u0017\nZ0%KF$\"\u0001\u001c?\t\u000fA\u0004\u0012\u0011!a\u0001i\u00069A.\u001b8l\u0013\u0012\u0004\u0013!B:fiV\u0003Hc\u00017\u0002\u0002!9\u00111\u0001\nA\u0002\u0005\u0015\u0011\u0001\u0003;fgRLeNZ8\u0011\t\u0005\u001d\u0011QC\u0007\u0003\u0003\u0013QA!a\u0003\u0002\u000e\u0005\u0019\u0011\r]5\u000b\t\u0005=\u0011\u0011C\u0001\bUV\u0004\u0018\u000e^3s\u0015\r\t\u0019BZ\u0001\u0006UVt\u0017\u000e^\u0005\u0005\u0003/\tIA\u0001\u0005UKN$\u0018J\u001c4pQ\r\u0011\u00121\u0004\t\u0005\u0003\u000f\ti\"\u0003\u0003\u0002 \u0005%!A\u0003\"fM>\u0014X-R1dQ\u00061C/Z:u\u001b&\u0014(o\u001c:U_BL7m\u00149fe\u0006$\u0018n\u001c8t\u001f:d\u0015N\\6GC&dWO]3\u0015\u000b1\f)#!\u0010\t\u000f\u0005\u001d2\u00031\u0001\u0002*\u00051\u0011/^8sk6\u0004B!a\u000b\u0002:9!\u0011QFA\u001b!\r\ty#P\u0007\u0003\u0003cQ1!a\r1\u0003\u0019a$o\\8u}%\u0019\u0011qG\u001f\u0002\rA\u0013X\rZ3g\u0013\rY\u00151\b\u0006\u0004\u0003oi\u0004bBA '\u0001\u0007\u0011\u0011I\u0001\fG>|'\u000fZ5oCR|'\u000fE\u0002=\u0003\u0007J1!!\u0012>\u0005\u001d\u0011un\u001c7fC:DsaEA%\u00033\nY\u0006\u0005\u0003\u0002L\u0005USBAA'\u0015\u0011\ty%!\u0015\u0002\u0011A\u0014xN^5eKJTA!a\u0015\u0002\u000e\u00051\u0001/\u0019:b[NLA!a\u0016\u0002N\taQ*\u001a;i_\u0012\u001cv.\u001e:dK\u0006)a/\u00197vK2\u0012\u0011QL\u0011\u0003\u0003?\naB_6D_6\u0014\u0017N\\1uS>t7\u000fK\u0004\u0014\u0003G\nY'!\u001c\u0011\t\u0005\u0015\u0014qM\u0007\u0003\u0003#JA!!\u001b\u0002R\t\t\u0002+\u0019:b[\u0016$XM]5{K\u0012$Vm\u001d;\u0002\t9\fW.Z\u0011\u0003\u0003_\n\u0001f\u001f3jgBd\u0017-\u001f(b[\u0016lh&];peVlWh\u001f\u0019~]\r|wN\u001d3j]\u0006$xN]\u001f|cu\fQ\u0004^3tiN{WO]2f\u00072,8\u000f^3s\u001d>$\u0018I^1jY\u0006\u0014G.\u001a\u000b\u0006Y\u0006U\u0014q\u000f\u0005\b\u0003O!\u0002\u0019AA\u0015\u0011\u001d\ty\u0004\u0006a\u0001\u0003\u0003Bs\u0001FA%\u00033\nY\b\f\u0002\u0002~\u0005\u0012\u0011qP\u0001\u0010C2d7i\\7cS:\fG/[8og\":A#a\u0019\u0002l\u00055\u0014!\u0007;fgR$U\r\\3uK2Kgn\u001b#ve&twMU3uef$R\u0001\\AD\u0003\u0013Cq!a\n\u0016\u0001\u0004\tI\u0003C\u0004\u0002@U\u0001\r!!\u0011)\u000fU\tI%!\u0017\u0002\u000e2\u0012\u0011Q\f\u0015\b+\u0005\r\u00141NA7\u0003e!Xm\u001d;BkRDWM\u001c;jG\u0006$\u0018n\u001c8GC&dWO]3\u0015\u000b1\f)*a&\t\u000f\u0005\u001db\u00031\u0001\u0002*!9\u0011q\b\fA\u0002\u0005\u0005\u0003f\u0002\f\u0002J\u0005e\u00131\u0014\u0017\u0003\u0003{BsAFA2\u0003W\ni'\u0001\ruKN$\u0018)\u001e;i_JL'0\u0019;j_:4\u0015-\u001b7ve\u0016$R\u0001\\AR\u0003KCq!a\n\u0018\u0001\u0004\tI\u0003C\u0004\u0002@]\u0001\r!!\u0011)\u000f]\tI%!\u0017\u0002*2\u0012\u0011Q\u0010\u0015\b/\u0005\r\u00141NA7\u0003I!Xm\u001d;D_J\u0014X\u000f\u001d;D_:4\u0017nZ:\u0015\u000b1\f\t,a-\t\u000f\u0005\u001d\u0002\u00041\u0001\u0002*!9\u0011q\b\rA\u0002\u0005\u0005\u0003f\u0002\r\u0002J\u0005e\u0013q\u0017\u0017\u0003\u0003;Bs\u0001GA2\u0003W\ni'\u0001\fuKN$XK\u001c3fG>$\u0017M\u00197f\u0007>tg-[4t)\u0015a\u0017qXAa\u0011\u001d\t9#\u0007a\u0001\u0003SAq!a\u0010\u001a\u0001\u0004\t\t\u0005K\u0004\u001a\u0003\u0013\nI&!2-\u0005\u0005u\u0003fB\r\u0002d\u0005-\u0014QN\u0001\u001fi\u0016\u001cH\u000fT5ti\u0012+7o\u0019:jE\u0016<\u0016\u000e\u001e5pkR\u001cuN\u001c4jON$R\u0001\\Ag\u0003\u001fDq!a\n\u001b\u0001\u0004\tI\u0003C\u0004\u0002@i\u0001\r!!\u0011)\u000fi\tI%!\u0017\u0002T2\u0012\u0011Q\f\u0015\b5\u0005\r\u00141NA7\u0003U!Xm\u001d;T_V\u00148-\u001a+pa&\u001cG)\u001a7fi\u0016$R\u0001\\An\u0003;Dq!a\n\u001c\u0001\u0004\tI\u0003C\u0004\u0002@m\u0001\r!!\u0011)\u000fm\tI%!\u0017\u0002b2\u0012\u0011Q\u0010\u0015\b7\u0005\r\u00141NA7\u00035\"Xm\u001d;T_V\u00148-\u001a+pa&\u001c'+Z2sK\u0006$X\rR3uK\u000e$\u0018n\u001c8Vg&tw\rV8qS\u000eLEm\u001d\u000b\u0006Y\u0006%\u00181\u001e\u0005\b\u0003Oa\u0002\u0019AA\u0015\u0011\u001d\ty\u0004\ba\u0001\u0003\u0003Bs\u0001HA%\u00033\ny\u000f\f\u0002\u0002~!:A$a\u0019\u0002l\u00055\u0014a\u0006;fgR\u001cv.\u001e:dK2+\u0017\rZ3s\u0007\"\fgnZ3t)\u0015a\u0017q_A}\u0011\u001d\t9#\ba\u0001\u0003SAq!a\u0010\u001e\u0001\u0004\t\t\u0005K\u0004\u001e\u0003\u0013\nI&!@-\u0005\u0005u\u0003fB\u000f\u0002d\u0005-\u0014QN\u00012i\u0016\u001cH/\u00168dY\u0016\fgnU8ve\u000e,G*Z1eKJ,E.Z2uS>tw+\u001b;i\t\u0016\u001cH/\u00129pG\"\f\u0005.Z1e)\u0015a'Q\u0001B\u0004\u0011\u001d\t9C\ba\u0001\u0003SAq!a\u0010\u001f\u0001\u0004\t\t\u0005K\u0004\u001f\u0003\u0013\nIFa\u0003-\u0005\u0005u\u0003f\u0002\u0010\u0002d\u0005-\u0014QN\u0001\u0016i\u0016\u001cH\u000fR3ti2+\u0017\rZ3s\u0007\"\fgnZ3t)\u0015a'1\u0003B\u000b\u0011\u001d\t9c\ba\u0001\u0003SAq!a\u0010 \u0001\u0004\t\t\u0005K\u0004 \u0003\u0013\nIF!\u0007-\u0005\u0005u\u0003fB\u0010\u0002d\u0005-\u0014QN\u0001\u001ei\u0016\u001cH\u000fR3ti\u001a{G\u000e\\8xKJ\f\u0005.Z1e\u001f\u001adU-\u00193feR)AN!\t\u0003$!9\u0011q\u0005\u0011A\u0002\u0005%\u0002bBA A\u0001\u0007\u0011\u0011\t\u0015\bA\u0005%\u0013\u0011\fB\u0014Y\t\ti\u0006K\u0004!\u0003G\nY'!\u001c\u0002]Q,7\u000f\u001e#fgR\u0014V\r\u001d7jG\u0006$\u0018n\u001c8O_RLU\u000e]1di\u0016$')_*pkJ\u001cWMR1jYV\u0014Xm\u001d\u000b\u0006Y\n=\"\u0011\u0007\u0005\b\u0003O\t\u0003\u0019AA\u0015\u0011\u001d\ty$\ta\u0001\u0003\u0003Bs!IA%\u00033\u0012)\u0004\f\u0002\u0002~!:\u0011%a\u0019\u0002l\u00055\u0014A\u0011;fgR$Um\u001d;SKBd\u0017nY1uS>tgj\u001c;J[B\f7\r^3e\u0005f\u001cv.\u001e:dK\u001a\u000b\u0017\u000e\\;sKN<\u0016\u000e\u001e5PY\u0012lUm]:bO\u00164uN]7biR)AN!\u0010\u0003@!9\u0011q\u0005\u0012A\u0002\u0005%\u0002bBA E\u0001\u0007\u0011\u0011\t\u0015\bE\u0005%\u0013\u0011\fB\"Y\t\ti\bK\u0004#\u0003G\nY'!\u001c\u00029Q,7\u000f\u001e#fgRLg.\u0019;j_:D\u0015n\u001a5XCR,'/\\1sWR)ANa\u0013\u0003N!9\u0011qE\u0012A\u0002\u0005%\u0002bBA G\u0001\u0007\u0011\u0011\t\u0015\bG\u0005%\u0013\u0011\fB)Y\t\ti\bK\u0004$\u0003G\nY'!\u001c\u0002wQ,7\u000f\u001e(p)J,hnY1uS>t')\u001a7po\"Kw\r[,bi\u0016\u0014X.\u0019:l/&$\b.R7qifdU-\u00193fe\u0016\u0003xn\u00195DC\u000eDW\rF\u0002m\u00053Bq!a\n%\u0001\u0004\tI\u0003K\u0004%\u0003\u0013\nIF!\u0018-\u0005\u0005u\u0004f\u0002\u0013\u0002d\u0005-\u0014QN\u0001.i\u0016\u001cH\u000fR3mKR,\u0017)\u001e;p\u0007J,\u0017\r^3e\u001b&\u0014(o\u001c:U_BL7MR8s\r\u0006LG.\u001a3MS:\\G#\u00027\u0003f\t\u001d\u0004bBA\u0014K\u0001\u0007\u0011\u0011\u0006\u0005\b\u0003\u007f)\u0003\u0019AA!Q\u001d)\u0013\u0011JA-\u0005Wb#!! )\u000f\u0015\n\u0019'a\u001b\u0002n\u0005Ab/\u001a:jMf4\u0015-\u001b7ve\u0016\fe\u000e\u001a*fG>4XM]=\u0015\u00131\u0014\u0019H! \u0003\b\n-\u0005b\u0002B;M\u0001\u0007!qO\u0001\fM\u0006LG.\u001e:f)f\u0004X\rE\u00024\u0005sJ1Aa\u001f-\u0005-1\u0015-\u001b7ve\u0016$\u0016\u0010]3\t\u000f\t}d\u00051\u0001\u0003\u0002\u0006a1\u000f^1si\u001a\u000b\u0017\u000e\\;sKB!AHa!m\u0013\r\u0011))\u0010\u0002\n\rVt7\r^5p]BBqA!#'\u0001\u0004\u0011\t)A\u0006ti>\u0004h)Y5mkJ,\u0007\"\u0003BGMA\u0005\t\u0019AA!\u0003\u00152XM]5gs\u00063\u0018-\u001b7bE&d\u0017\u000e^=NKR\u0014\u0018nY!gi\u0016\u0014(+Z2pm\u0016\u0014\u00180\u0001\u0012wKJLg-\u001f$bS2,(/Z!oIJ+7m\u001c<fef$C-\u001a4bk2$H\u0005N\u000b\u0003\u0005'SC!!\u0011\u0003\u0016.\u0012!q\u0013\t\u0005\u00053\u0013\u0019+\u0004\u0002\u0003\u001c*!!Q\u0014BP\u0003%)hn\u00195fG.,GMC\u0002\u0003\"v\n!\"\u00198o_R\fG/[8o\u0013\u0011\u0011)Ka'\u0003#Ut7\r[3dW\u0016$g+\u0019:jC:\u001cW-A\bxC&$hi\u001c:SK\u000e|g/\u001a:z)\u0005a\u0017AC1dY\nKg\u000eZ5oOR1!q\u0016B^\u0005\u0017\u0004BA!-\u000386\u0011!1\u0017\u0006\u0004\u0005k3\u0018aA1dY&!!\u0011\u0018BZ\u0005)\t5\r\u001c\"j]\u0012Lgn\u001a\u0005\b\u0005{K\u0003\u0019\u0001B`\u00031\u0011Xm]8ve\u000e,G+\u001f9f!\u0011\u0011\tMa2\u000e\u0005\t\r'b\u0001Bcm\u0006A!/Z:pkJ\u001cW-\u0003\u0003\u0003J\n\r'\u0001\u0004*fg>,(oY3UsB,\u0007b\u0002BgS\u0001\u0007!qZ\u0001\u000fa\u0016\u0014X.[:tS>tG+\u001f9f!\u0011\u0011\tL!5\n\t\tM'1\u0017\u0002\u0012\u0003\u000ed\u0007+\u001a:nSN\u001c\u0018n\u001c8UsB,\u0017aC1eI2Kgn[!dYNDs\u0001\u0001Bm\u00033\u0012y\u000e\u0005\u0003\u0002\b\tm\u0017\u0002\u0002Bo\u0003\u0013\u00111\u0001V1hC\t\u0011\t/A\u0006j]R,wM]1uS>t\u0007")
/* loaded from: input_file:kafka/link/ClusterLinkFailureTest.class */
public class ClusterLinkFailureTest extends AbstractClusterLinkIntegrationTest {
    private final short replicationFactor = 3;
    private final String consumerGroup = "testGroup";
    private final long syncMs = 100;
    private final long retryMs = 1000;
    private final int numRecords = 20;
    private ConfluentAdmin destAdmin;
    private Uuid linkId;

    @Override // kafka.link.AbstractClusterLinkIntegrationTest
    public short replicationFactor() {
        return this.replicationFactor;
    }

    private String consumerGroup() {
        return this.consumerGroup;
    }

    private long syncMs() {
        return this.syncMs;
    }

    private long retryMs() {
        return this.retryMs;
    }

    private int numRecords() {
        return this.numRecords;
    }

    private ConfluentAdmin destAdmin() {
        return this.destAdmin;
    }

    private void destAdmin_$eq(ConfluentAdmin confluentAdmin) {
        this.destAdmin = confluentAdmin;
    }

    private Uuid linkId() {
        return this.linkId;
    }

    private void linkId_$eq(Uuid uuid) {
        this.linkId = uuid;
    }

    @Override // kafka.link.AbstractClusterLinkIntegrationTest
    @BeforeEach
    public void setUp(TestInfo testInfo) {
        if (sourceCluster() == null && destCluster() == null) {
            SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
            ClusterLinkTestHarness$ clusterLinkTestHarness$ = ClusterLinkTestHarness$.MODULE$;
            sourceCluster_$eq(new ClusterLinkTestHarness(SecurityProtocol.SASL_SSL, None$.MODULE$, 0, 3));
            SecurityProtocol securityProtocol2 = SecurityProtocol.SASL_PLAINTEXT;
            ClusterLinkTestHarness$ clusterLinkTestHarness$2 = ClusterLinkTestHarness$.MODULE$;
            destCluster_$eq(new ClusterLinkTestHarness(SecurityProtocol.SASL_PLAINTEXT, None$.MODULE$, 100, 3));
            sourceCluster().serverConfig().setProperty(KafkaConfig$.MODULE$.AuthorizerClassNameProp(), authorizerClassName(testInfo));
            sourceCluster().serverConfig().setProperty("super.users", new StringBuilder(11).append("User:").append(JaasTestUtils$.MODULE$.KafkaScramAdmin()).append(";User:").append(JaasTestUtils$.MODULE$.KafkaScramUser2()).toString());
        }
        super.setUp(testInfo);
        addLinkAcls();
        ClusterLinkTestHarness destCluster = destCluster();
        destAdmin_$eq(destCluster.createConfluentAdminClient(destCluster.createConfluentAdminClient$default$1()));
    }

    @MethodSource({"zkCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testMirrorTopicOperationsOnLinkFailure(String str, boolean z) {
        Tuple2 $minus$greater$extension;
        linkId_$eq(setupLinkAndMirrorForFailureTest(syncMs(), retryMs(), consumerGroup(), setupLinkAndMirrorForFailureTest$default$4()));
        produceToSourceCluster(10);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE);
        destCluster().updateZkLinkConfig(linkId(), ClusterLinkConfig$.MODULE$.ConnectionModeProp(), "invalid");
        waitForFailure(destAdmin(), FailureType$CorruptConfigs$.MODULE$, waitForFailure$default$3());
        String str2 = "linkedTopic2";
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.createTopic("linkedTopic2", numPartitions(), 2, sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        Assertions.assertThrows(InvalidRequestException.class, () -> {
            ClusterLinkTestHarness destCluster = this.destCluster();
            destCluster.linkTopic(str2, (short) 2, this.linkName(), destCluster.linkTopic$default$4(), destCluster.linkTopic$default$5());
        });
        destCluster().deleteTopic(topic(), true);
        destCluster().updateZkLinkConfig(linkId(), ClusterLinkConfig$.MODULE$.ConnectionModeProp(), ((ConnectionMode) (useSourceInitiatedLink() ? ConnectionMode$Inbound$.MODULE$ : ConnectionMode$Outbound$.MODULE$)).name());
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            ClusterLinkListing $anonfun$testMirrorTopicOperationsOnLinkFailure$2 = $anonfun$testMirrorTopicOperationsOnLinkFailure$2(this);
            if ($anonfun$testMirrorTopicOperationsOnLinkFailure$2.available()) {
                $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc($anonfun$testMirrorTopicOperationsOnLinkFailure$2), BoxesRunTime.boxToBoolean(true));
                break;
            } else {
                if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                    $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc($anonfun$testMirrorTopicOperationsOnLinkFailure$2), BoxesRunTime.boxToBoolean(false));
                    break;
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
            }
        }
        Tuple2 tuple2 = $minus$greater$extension;
        if (tuple2 == null) {
            throw new MatchError((Object) null);
        }
        Assertions.assertTrue(((ClusterLinkListing) tuple2._1()).available(), "Link not available");
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.linkTopic(topic(), (short) 2, linkName(), destCluster.linkTopic$default$4(), destCluster.linkTopic$default$5());
        waitForRecovery();
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testSourceClusterNotAvailable(String str, boolean z) {
        Assumptions.assumeFalse(useSourceInitiatedLink());
        verifyFailureAndRecovery(FailureType$NetworkFailure$.MODULE$, () -> {
            this.sourceCluster().killAllBrokers();
        }, () -> {
            this.restartCluster(this.sourceCluster(), this.restartCluster$default$2());
        }, true);
    }

    @MethodSource({"zkCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testDeleteLinkDuringRetry(String str, boolean z) {
        Assumptions.assumeFalse(useSourceInitiatedLink());
        linkId_$eq(setupLinkAndMirrorForFailureTest(syncMs(), retryMs(), consumerGroup(), setupLinkAndMirrorForFailureTest$default$4()));
        produceToSourceCluster(numRecords());
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        destCluster().updateZkLinkConfig(linkId(), "bootstrap.servers", "999.999.999.999:0000");
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$testDeleteLinkDuringRetry$1(this)) {
            if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                Assertions.fail("cluster link didn't fail as expected");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
        }
        verifyFailureRetryMetric(new Some(linkName()));
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.deleteClusterLink(linkName(), true, destCluster.deleteClusterLink$default$3());
        ClusterLinkTestHarness destCluster2 = destCluster();
        Assertions.assertTrue(destCluster2.listClusterLinks(destCluster2.listClusterLinks$default$1(), destCluster2.listClusterLinks$default$2()).isEmpty());
        Assertions.assertFalse(destCluster().brokers().exists(kafkaBroker -> {
            return BoxesRunTime.boxToBoolean($anonfun$testDeleteLinkDuringRetry$3(this, kafkaBroker));
        }));
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testAuthenticationFailure(String str, boolean z) {
        ClusterLinkTestHarness destCluster = useSourceInitiatedLink() ? destCluster() : sourceCluster();
        ObjectRef create = ObjectRef.create((Object) null);
        verifyFailureAndRecovery(FailureType$AuthenticationFailure$.MODULE$, () -> {
            create.elem = this.updateCredentials(destCluster);
        }, () -> {
            this.alterClusterLink(this.linkName(), (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("sasl.jaas.config"), (String) create.elem)})));
        }, verifyFailureAndRecovery$default$4());
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testAuthorizationFailure(String str, boolean z) {
        AclBinding aclBinding = aclBinding(ResourceType.TOPIC, AclPermissionType.DENY);
        verifyFailureAndRecovery(FailureType$AuthorizationFailure$.MODULE$, () -> {
            this.sourceCluster().addAcls(new $colon.colon(aclBinding, Nil$.MODULE$));
        }, () -> {
            this.sourceCluster().deleteAcls(new $colon.colon(aclBinding, Nil$.MODULE$));
        }, verifyFailureAndRecovery$default$4());
    }

    @MethodSource({"zkCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testCorruptConfigs(String str, boolean z) {
        ConnectionMode$Inbound$ connectionMode$Inbound$ = useSourceInitiatedLink() ? ConnectionMode$Inbound$.MODULE$ : ConnectionMode$Outbound$.MODULE$;
        verifyFailureAndRecovery(FailureType$CorruptConfigs$.MODULE$, () -> {
            this.destCluster().updateZkLinkConfig(this.linkId(), ClusterLinkConfig$.MODULE$.ConnectionModeProp(), "invalid");
        }, () -> {
            this.destCluster().updateZkLinkConfig(this.linkId(), ClusterLinkConfig$.MODULE$.ConnectionModeProp(), ((ConnectionMode) connectionMode$Inbound$).name());
        }, verifyFailureAndRecovery$default$4());
    }

    @MethodSource({"zkCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testUndecodableConfigs(String str, boolean z) {
        Assumptions.assumeFalse(useSourceInitiatedLink());
        Properties properties = new Properties();
        verifyFailureAndRecovery(FailureType$UndecodableConfigs$.MODULE$, () -> {
            this.makeConfigUndecodable$1(properties);
        }, () -> {
            this.reconfigureLink$1(properties);
        }, verifyFailureAndRecovery$default$4());
    }

    @MethodSource({"zkCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testListDescribeWithoutConfigs(String str, boolean z) {
        Assumptions.assumeFalse(useSourceInitiatedLink());
        useBidirectionalLink_$eq(false);
        linkId_$eq(setupLinkAndMirrorForFailureTest(syncMs(), retryMs(), consumerGroup(), setupLinkAndMirrorForFailureTest$default$4()));
        produceToSourceCluster(numRecords());
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        destCluster().updateZkLinkConfig(linkId(), "bootstrap.servers", "invalid");
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$testListDescribeWithoutConfigs$1(this)) {
            if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                Assertions.fail("cluster link didn't fail as expected");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
        }
        ClusterLinkTestHarness destCluster = destCluster();
        Seq<ClusterLinkListing> listClusterLinks = destCluster.listClusterLinks(destCluster.listClusterLinks$default$1(), destCluster.listClusterLinks$default$2());
        Assertions.assertEquals(((ClusterLinkListing) listClusterLinks.head()).linkName(), linkName());
        Assertions.assertEquals(((ClusterLinkListing) listClusterLinks.head()).clusterLinkError(), ClusterLinkError.UNKNOWN);
        createClusterLink("test-link2", createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5());
        ClusterLinkTestHarness destCluster2 = destCluster();
        Seq<ClusterLinkDescription> describeClusterLinks = destCluster2.describeClusterLinks(destCluster2.describeClusterLinks$default$1(), destCluster2.describeClusterLinks$default$2(), destCluster2.describeClusterLinks$default$3());
        Assertions.assertEquals(describeClusterLinks.size(), 2);
        Assertions.assertTrue(((IterableOnceOps) describeClusterLinks.filter(clusterLinkDescription -> {
            return BoxesRunTime.boxToBoolean($anonfun$testListDescribeWithoutConfigs$3(this, clusterLinkDescription));
        })).exists(clusterLinkDescription2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$testListDescribeWithoutConfigs$4(clusterLinkDescription2));
        }));
        Assertions.assertTrue(((IterableOnceOps) describeClusterLinks.filter(clusterLinkDescription3 -> {
            return BoxesRunTime.boxToBoolean($anonfun$testListDescribeWithoutConfigs$5(clusterLinkDescription3));
        })).exists(clusterLinkDescription4 -> {
            return BoxesRunTime.boxToBoolean($anonfun$testListDescribeWithoutConfigs$6(clusterLinkDescription4));
        }));
        Assertions.assertTrue(((IterableOnceOps) describeClusterLinks.filter(clusterLinkDescription5 -> {
            return BoxesRunTime.boxToBoolean($anonfun$testListDescribeWithoutConfigs$7(this, clusterLinkDescription5));
        })).exists(clusterLinkDescription6 -> {
            return BoxesRunTime.boxToBoolean($anonfun$testListDescribeWithoutConfigs$8(clusterLinkDescription6));
        }));
        Assertions.assertTrue(((IterableOnceOps) describeClusterLinks.filter(clusterLinkDescription7 -> {
            return BoxesRunTime.boxToBoolean($anonfun$testListDescribeWithoutConfigs$9(clusterLinkDescription7));
        })).exists(clusterLinkDescription8 -> {
            return BoxesRunTime.boxToBoolean($anonfun$testListDescribeWithoutConfigs$10(clusterLinkDescription8));
        }));
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testSourceTopicDelete(String str, boolean z) {
        linkId_$eq(setupLinkAndMirrorForFailureTest(syncMs(), retryMs(), consumerGroup(), setupLinkAndMirrorForFailureTest$default$4()));
        produceToSourceCluster(10);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        commitOffsets(sourceCluster(), topic(), 0, 5L, consumerGroup());
        verifyOffsetMigration(topic(), 0, 5L, consumerGroup(), verifyOffsetMigration$default$5());
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE);
        Uuid uuid = sourceCluster().describeTopic(topic()).topicId();
        sourceCluster().deleteTopic(topic(), true);
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.FAILED);
        waitForFailure(destAdmin(), FailureType$.MODULE$.SourceTopicMayBeDeleted(), waitForFailure$default$3());
        Assertions.assertEquals(uuid, destCluster().describeMirrorTopic(topic()).sourceTopicId());
        Properties properties = new Properties();
        properties.put("max.message.bytes", "100000");
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.createTopic(topic(), 1, replicationFactor(), properties, sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        produceToSourceCluster(10);
        truncate(10);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        commitOffsets(sourceCluster(), topic(), 0, 10L, consumerGroup());
        verifyOffsetMigration(topic(), 0, 5L, consumerGroup(), verifyOffsetMigration$default$5());
        Assertions.assertEquals(numPartitions(), destCluster().describeTopic(topic()).partitions().size());
        Option apply = Option$.MODULE$.apply(destCluster().describeTopicConfig(topic()).get("max.message.bytes"));
        Assertions.assertTrue(apply.nonEmpty());
        Assertions.assertNotEquals("100000", ((ConfigEntry) apply.get()).value());
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.FAILED);
        destCluster().killAllBrokers();
        destAdmin_$eq((ConfluentAdmin) restartCluster(destCluster(), !useSourceInitiatedLink()).get());
        waitForFailure(destAdmin(), FailureType$.MODULE$.SourceTopicMayBeDeleted(), waitForFailure$default$3());
        verifyMirror(topic(), verifyMirror$default$2(), false, verifyMirror$default$4());
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testSourceTopicRecreateDetectionUsingTopicIds(String str, boolean z) {
        numPartitions_$eq(1);
        linkId_$eq(setupLinkAndMirrorForFailureTest(300000L, 300000L, consumerGroup(), setupLinkAndMirrorForFailureTest$default$4()));
        produceToSourceCluster(20);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        TopicPartition topicPartition = new TopicPartition(topic(), 0);
        sourceCluster().changeLeader(topicPartition);
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$testSourceTopicRecreateDetectionUsingTopicIds$1(this, topicPartition)) {
            if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                Assertions.fail("Destination epoch not bumped");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
        }
        produceToSourceCluster(5);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        sourceCluster().deleteTopic(topic(), true);
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        produceToSourceCluster(20);
        truncate(20);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        waitForFailure(destAdmin(), FailureType$.MODULE$.SourceTopicIdChanged(), waitForFailure$default$3());
        destCluster().killAllBrokers();
        destAdmin_$eq((ConfluentAdmin) restartCluster(destCluster(), !useSourceInitiatedLink()).get());
        verifyMirror(topic(), verifyMirror$default$2(), false, false);
    }

    @MethodSource({"zkCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testSourceLeaderChanges(String str, boolean z) {
        linkId_$eq(createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5()));
        verifyMirrorWithSourceEpochChanges(false);
        verifyBasicLinkMetrics(linkId(), verifyBasicLinkMetrics$default$2(), verifyBasicLinkMetrics$default$3());
        verifyLinkedLeaderChangeMetrics();
        verifyMirror(topic(), verifyMirror$default$2(), verifyMirror$default$3(), false);
    }

    @MethodSource({"zkCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testUncleanSourceLeaderElectionWithDestEpochAhead(String str, boolean z) {
        linkId_$eq(createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5()));
        verifyMirrorWithSourceEpochChanges(true);
        verifyLinkedLeaderChangeMetrics();
        verifyMirror(topic(), verifyMirror$default$2(), verifyMirror$default$3(), false);
    }

    @MethodSource({"zkCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testDestLeaderChanges(String str, boolean z) {
        numPartitions_$eq(sourceCluster().brokerCount());
        TopicPartition topicPartition = (TopicPartition) partitions(partitions$default$1(), partitions$default$2(), partitions$default$3()).head();
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        linkId_$eq(createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5()));
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.linkTopic(topic(), replicationFactor(), linkName(), destCluster.linkTopic$default$4(), destCluster.linkTopic$default$5());
        produceToSourceCluster(numPartitions());
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        verifyBasicLinkMetrics(linkId(), verifyBasicLinkMetrics$default$2(), verifyBasicLinkMetrics$default$3());
        Tuple2<Object, Object> shutdownLeader = destCluster().shutdownLeader(topicPartition);
        if (shutdownLeader == null) {
            throw new MatchError((Object) null);
        }
        int _1$mcI$sp = shutdownLeader._1$mcI$sp();
        produceToSourceCluster(numPartitions());
        waitForMirror((Seq) destCluster().brokers().filter(kafkaBroker -> {
            return BoxesRunTime.boxToBoolean($anonfun$testDestLeaderChanges$1(this, _1$mcI$sp, kafkaBroker));
        }), waitForMirror$default$2());
        Tuple2<Object, Object> shutdownLeader2 = destCluster().shutdownLeader(topicPartition);
        if (shutdownLeader2 == null) {
            throw new MatchError((Object) null);
        }
        int _1$mcI$sp2 = shutdownLeader2._1$mcI$sp();
        destCluster().startBroker(_1$mcI$sp);
        produceToSourceCluster(numPartitions());
        waitForMirror((Seq) destCluster().brokers().filter(kafkaBroker2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$testDestLeaderChanges$2(this, _1$mcI$sp2, kafkaBroker2));
        }), waitForMirror$default$2());
        destCluster().startBroker(_1$mcI$sp2);
        produceToSourceCluster(numPartitions());
        verifyMirror(topic(), verifyMirror$default$2(), verifyMirror$default$3(), false);
    }

    @MethodSource({"zkCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testDestFollowerAheadOfLeader(String str, boolean z) {
        numPartitions_$eq(1);
        TopicPartition topicPartition = (TopicPartition) partitions(partitions$default$1(), partitions$default$2(), partitions$default$3()).head();
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.createTopic(topic(), numPartitions(), 2, sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        linkId_$eq(createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5()));
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.linkTopic(topic(), (short) 2, linkName(), destCluster.linkTopic$default$4(), destCluster.linkTopic$default$5());
        Tuple2<Object, Object> shutdownLeader = destCluster().shutdownLeader(topicPartition);
        if (shutdownLeader == null) {
            throw new MatchError((Object) null);
        }
        int _1$mcI$sp = shutdownLeader._1$mcI$sp();
        int _2$mcI$sp = shutdownLeader._2$mcI$sp();
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        Seq<KafkaBroker> aliveServers = destCluster().aliveServers();
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        int awaitLeaderAndEpochChange = testUtils$.awaitLeaderAndEpochChange(aliveServers, topicPartition, _1$mcI$sp, _2$mcI$sp, 15000L);
        produceToSourceCluster(100);
        RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), 3).foreach$mVc$sp(i -> {
            this.sourceCluster().bounceLeader(topicPartition);
            this.produceToSourceCluster(100);
        });
        Tuple2<Object, Object> shutdownLeader2 = sourceCluster().shutdownLeader(topicPartition);
        if (shutdownLeader2 == null) {
            throw new MatchError((Object) null);
        }
        int _1$mcI$sp2 = shutdownLeader2._1$mcI$sp();
        produceToSourceCluster(100);
        waitForMirror(new $colon.colon(destCluster().serverWithBrokerId(awaitLeaderAndEpochChange), Nil$.MODULE$), waitForMirror$default$2());
        verifyBasicLinkMetrics(linkId(), verifyBasicLinkMetrics$default$2(), verifyBasicLinkMetrics$default$3());
        Tuple2<Object, Object> shutdownLeader3 = destCluster().shutdownLeader(topicPartition);
        if (shutdownLeader3 == null) {
            throw new MatchError((Object) null);
        }
        int _2$mcI$sp2 = shutdownLeader3._2$mcI$sp();
        sourceCluster().shutdownLeader(topicPartition);
        truncate(100);
        sourceCluster().startBroker(_1$mcI$sp2);
        destCluster().startBroker(_1$mcI$sp);
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        Seq<KafkaBroker> aliveServers2 = destCluster().aliveServers();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        Assertions.assertEquals(_1$mcI$sp, testUtils$3.awaitLeaderAndEpochChange(aliveServers2, topicPartition, awaitLeaderAndEpochChange, _2$mcI$sp2, 15000L));
        destCluster().startBroker(awaitLeaderAndEpochChange);
        produceToSourceCluster(100);
        verifyMirror(topic(), verifyMirror$default$2(), verifyMirror$default$3(), false);
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testDestReplicationNotImpactedBySourceFailures(String str, boolean z) {
        Assumptions.assumeFalse(useSourceInitiatedLink());
        createLinkAndMirrorWithPartialReplicationAndShutdownSource(new Properties(), createLinkAndMirrorWithPartialReplicationAndShutdownSource$default$2(), createLinkAndMirrorWithPartialReplicationAndShutdownSource$default$3());
        verifyMirror(topic(), verifyMirror$default$2(), false, false);
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testDestReplicationNotImpactedBySourceFailuresWithOldMessageFormat(String str, boolean z) {
        Assumptions.assumeFalse(useSourceInitiatedLink());
        Properties properties = new Properties();
        properties.setProperty("message.format.version", "0.9.0");
        createLinkAndMirrorWithPartialReplicationAndShutdownSource(properties, createLinkAndMirrorWithPartialReplicationAndShutdownSource$default$2(), createLinkAndMirrorWithPartialReplicationAndShutdownSource$default$3());
        verifyMirror(topic(), verifyMirror$default$2(), false, false);
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testDestinationHighWatermark(String str, boolean z) {
        Tuple2 $minus$greater$extension;
        Tuple2 $minus$greater$extension2;
        Tuple2 $minus$greater$extension3;
        Tuple2 $minus$greater$extension4;
        Tuple2 $minus$greater$extension5;
        Tuple2 $minus$greater$extension6;
        Assumptions.assumeFalse(useSourceInitiatedLink());
        numPartitions_$eq(1);
        TopicPartition topicPartition = (TopicPartition) partitions(partitions$default$1(), partitions$default$2(), partitions$default$3()).head();
        Properties properties = new Properties();
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.createTopic(topic(), numPartitions(), replicationFactor(), properties, sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        linkId_$eq(createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5()));
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.linkTopic(topic(), replicationFactor(), linkName(), destCluster.linkTopic$default$4(), destCluster.linkTopic$default$5());
        LongRef create = LongRef.create(0L);
        TestUtils$.MODULE$.waitUntilLeaderIsKnown(destCluster().brokers(), topicPartition, TestUtils$.MODULE$.waitUntilLeaderIsKnown$default$3());
        Buffer buffer = (Buffer) ((IterableOps) destCluster().brokers().map(kafkaBroker -> {
            return kafkaBroker.replicaManager();
        })).filter(replicaManager -> {
            return BoxesRunTime.boxToBoolean($anonfun$testDestinationHighWatermark$9(topicPartition, replicaManager));
        });
        ReplicaManager replicaManager2 = sourceCluster().partitionLeader(topicPartition).replicaManager();
        produceToSourceCluster(100);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        buffer.foreach(replicaManager3 -> {
            $anonfun$testDestinationHighWatermark$10(this, topicPartition, create, replicaManager3);
            return BoxedUnit.UNIT;
        });
        create.elem = 50L;
        replicaManager2.deleteRecords(15000L, (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(topicPartition), BoxesRunTime.boxToLong(create.elem))})), map -> {
            $anonfun$testDestinationHighWatermark$11(map);
            return BoxedUnit.UNIT;
        });
        produceToSourceCluster(100);
        long verifyLog$default$2$1 = verifyLog$default$2$1();
        AbstractLog abstractLog = (AbstractLog) replicaManager2.getLog(topicPartition).get();
        int size = producedRecords().size();
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            long logStartOffset = abstractLog.logStartOffset();
            Long boxToLong = BoxesRunTime.boxToLong(logStartOffset);
            if ($anonfun$testDestinationHighWatermark$3(create, logStartOffset)) {
                $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToLong), BoxesRunTime.boxToBoolean(true));
                break;
            } else {
                if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                    $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToLong), BoxesRunTime.boxToBoolean(false));
                    break;
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
            }
        }
        Tuple2 tuple2 = $minus$greater$extension;
        if (tuple2 == null) {
            throw new MatchError((Object) null);
        }
        Assertions.assertEquals(create.elem, tuple2._1$mcJ$sp(), new StringBuilder(29).append("Unexpected log start offset: ").append(new StringBuilder(67).append("isLeader=").append(replicaManager2.onlinePartition(topicPartition).exists(partition -> {
            return BoxesRunTime.boxToBoolean(partition.isLeader());
        })).append(" ").append("actual (start=").append(abstractLog.logStartOffset()).append(", end=").append(abstractLog.logEndOffset()).append(", hwm=").append(abstractLog.highWatermark()).append(") ").append("expected (start=").append(create.elem).append(", end=").append(size).append(", hwm=").append(verifyLog$default$2$1).append(")").toString()).toString());
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        long currentTimeMillis2 = System.currentTimeMillis();
        while (true) {
            long logEndOffset = abstractLog.logEndOffset();
            Long boxToLong2 = BoxesRunTime.boxToLong(logEndOffset);
            if ($anonfun$testDestinationHighWatermark$5(size, logEndOffset)) {
                $minus$greater$extension2 = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToLong2), BoxesRunTime.boxToBoolean(true));
                break;
            } else {
                if (System.currentTimeMillis() > currentTimeMillis2 + 15000) {
                    $minus$greater$extension2 = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToLong2), BoxesRunTime.boxToBoolean(false));
                    break;
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
            }
        }
        Tuple2 tuple22 = $minus$greater$extension2;
        if (tuple22 == null) {
            throw new MatchError((Object) null);
        }
        Assertions.assertEquals(size, tuple22._1$mcJ$sp(), new StringBuilder(27).append("Unexpected log end offset: ").append(new StringBuilder(67).append("isLeader=").append(replicaManager2.onlinePartition(topicPartition).exists(partition2 -> {
            return BoxesRunTime.boxToBoolean(partition2.isLeader());
        })).append(" ").append("actual (start=").append(abstractLog.logStartOffset()).append(", end=").append(abstractLog.logEndOffset()).append(", hwm=").append(abstractLog.highWatermark()).append(") ").append("expected (start=").append(create.elem).append(", end=").append(size).append(", hwm=").append(verifyLog$default$2$1).append(")").toString()).toString());
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        long currentTimeMillis3 = System.currentTimeMillis();
        while (true) {
            long highWatermark = abstractLog.highWatermark();
            Long boxToLong3 = BoxesRunTime.boxToLong(highWatermark);
            if ($anonfun$testDestinationHighWatermark$7(verifyLog$default$2$1, highWatermark)) {
                $minus$greater$extension3 = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToLong3), BoxesRunTime.boxToBoolean(true));
                break;
            } else {
                if (System.currentTimeMillis() > currentTimeMillis3 + 15000) {
                    $minus$greater$extension3 = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToLong3), BoxesRunTime.boxToBoolean(false));
                    break;
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
            }
        }
        Tuple2 tuple23 = $minus$greater$extension3;
        if (tuple23 == null) {
            throw new MatchError((Object) null);
        }
        Assertions.assertEquals(verifyLog$default$2$1, tuple23._1$mcJ$sp(), new StringBuilder(27).append("Unexpected high watermark: ").append(new StringBuilder(67).append("isLeader=").append(replicaManager2.onlinePartition(topicPartition).exists(partition22 -> {
            return BoxesRunTime.boxToBoolean(partition22.isLeader());
        })).append(" ").append("actual (start=").append(abstractLog.logStartOffset()).append(", end=").append(abstractLog.logEndOffset()).append(", hwm=").append(abstractLog.highWatermark()).append(") ").append("expected (start=").append(create.elem).append(", end=").append(size).append(", hwm=").append(verifyLog$default$2$1).append(")").toString()).toString());
        buffer.foreach(replicaManager4 -> {
            $anonfun$testDestinationHighWatermark$12(this, topicPartition, create, replicaManager4);
            return BoxedUnit.UNIT;
        });
        int brokerId = ((ReplicaManager) ((IterableOps) buffer.filter(replicaManager5 -> {
            return BoxesRunTime.boxToBoolean($anonfun$testDestinationHighWatermark$13(topicPartition, replicaManager5));
        })).head()).config().brokerId();
        destCluster().serverWithBrokerId(brokerId).replicaManager().replicaFetcherManager().closeAllFetchers();
        bufferedProduceToSource$1(100);
        long size2 = producedRecords().size() - 100;
        ReplicaManager verifyLog$default$1$1 = verifyLog$default$1$1(topicPartition);
        AbstractLog abstractLog2 = (AbstractLog) verifyLog$default$1$1.getLog(topicPartition).get();
        int size3 = producedRecords().size();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        long currentTimeMillis4 = System.currentTimeMillis();
        while (true) {
            long logStartOffset2 = abstractLog2.logStartOffset();
            Long boxToLong4 = BoxesRunTime.boxToLong(logStartOffset2);
            if ($anonfun$testDestinationHighWatermark$3(create, logStartOffset2)) {
                $minus$greater$extension4 = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToLong4), BoxesRunTime.boxToBoolean(true));
                break;
            } else {
                if (System.currentTimeMillis() > currentTimeMillis4 + 15000) {
                    $minus$greater$extension4 = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToLong4), BoxesRunTime.boxToBoolean(false));
                    break;
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
            }
        }
        Tuple2 tuple24 = $minus$greater$extension4;
        if (tuple24 == null) {
            throw new MatchError((Object) null);
        }
        Assertions.assertEquals(create.elem, tuple24._1$mcJ$sp(), new StringBuilder(29).append("Unexpected log start offset: ").append(new StringBuilder(67).append("isLeader=").append(verifyLog$default$1$1.onlinePartition(topicPartition).exists(partition222 -> {
            return BoxesRunTime.boxToBoolean(partition222.isLeader());
        })).append(" ").append("actual (start=").append(abstractLog2.logStartOffset()).append(", end=").append(abstractLog2.logEndOffset()).append(", hwm=").append(abstractLog2.highWatermark()).append(") ").append("expected (start=").append(create.elem).append(", end=").append(size3).append(", hwm=").append(size2).append(")").toString()).toString());
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        long currentTimeMillis5 = System.currentTimeMillis();
        while (true) {
            long logEndOffset2 = abstractLog2.logEndOffset();
            Long boxToLong5 = BoxesRunTime.boxToLong(logEndOffset2);
            if ($anonfun$testDestinationHighWatermark$5(size3, logEndOffset2)) {
                $minus$greater$extension5 = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToLong5), BoxesRunTime.boxToBoolean(true));
                break;
            } else {
                if (System.currentTimeMillis() > currentTimeMillis5 + 15000) {
                    $minus$greater$extension5 = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToLong5), BoxesRunTime.boxToBoolean(false));
                    break;
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
            }
        }
        Tuple2 tuple25 = $minus$greater$extension5;
        if (tuple25 == null) {
            throw new MatchError((Object) null);
        }
        Assertions.assertEquals(size3, tuple25._1$mcJ$sp(), new StringBuilder(27).append("Unexpected log end offset: ").append(new StringBuilder(67).append("isLeader=").append(verifyLog$default$1$1.onlinePartition(topicPartition).exists(partition2222 -> {
            return BoxesRunTime.boxToBoolean(partition2222.isLeader());
        })).append(" ").append("actual (start=").append(abstractLog2.logStartOffset()).append(", end=").append(abstractLog2.logEndOffset()).append(", hwm=").append(abstractLog2.highWatermark()).append(") ").append("expected (start=").append(create.elem).append(", end=").append(size3).append(", hwm=").append(size2).append(")").toString()).toString());
        TestUtils$ testUtils$16 = TestUtils$.MODULE$;
        TestUtils$ testUtils$17 = TestUtils$.MODULE$;
        TestUtils$ testUtils$18 = TestUtils$.MODULE$;
        long currentTimeMillis6 = System.currentTimeMillis();
        while (true) {
            long highWatermark2 = abstractLog2.highWatermark();
            Long boxToLong6 = BoxesRunTime.boxToLong(highWatermark2);
            if ($anonfun$testDestinationHighWatermark$7(size2, highWatermark2)) {
                $minus$greater$extension6 = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToLong6), BoxesRunTime.boxToBoolean(true));
                break;
            } else {
                if (System.currentTimeMillis() > currentTimeMillis6 + 15000) {
                    $minus$greater$extension6 = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToLong6), BoxesRunTime.boxToBoolean(false));
                    break;
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
            }
        }
        Tuple2 tuple26 = $minus$greater$extension6;
        if (tuple26 == null) {
            throw new MatchError((Object) null);
        }
        Assertions.assertEquals(size2, tuple26._1$mcJ$sp(), new StringBuilder(27).append("Unexpected high watermark: ").append(new StringBuilder(67).append("isLeader=").append(verifyLog$default$1$1.onlinePartition(topicPartition).exists(partition22222 -> {
            return BoxesRunTime.boxToBoolean(partition22222.isLeader());
        })).append(" ").append("actual (start=").append(abstractLog2.logStartOffset()).append(", end=").append(abstractLog2.logEndOffset()).append(", hwm=").append(abstractLog2.highWatermark()).append(") ").append("expected (start=").append(create.elem).append(", end=").append(size3).append(", hwm=").append(size2).append(")").toString()).toString());
        destCluster().shutdownBroker(brokerId);
        destCluster().startBroker(brokerId);
        produceToSourceCluster(100);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        verifyLog$1(verifyLog$default$1$1(topicPartition), verifyLog$default$2$1(), topicPartition, create);
        alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp()), "true")})));
        destCluster().serverWithBrokerId(brokerId).replicaManager().replicaFetcherManager().closeAllFetchers();
        bufferedProduceToSource$1(100);
        create.elem = ((AbstractLog) replicaManager2.getLog(topicPartition).get()).logEndOffset() - 50;
        replicaManager2.deleteRecords(15000L, (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(topicPartition), BoxesRunTime.boxToLong(create.elem))})), map2 -> {
            $anonfun$testDestinationHighWatermark$15(map2);
            return BoxedUnit.UNIT;
        });
        verifyLog$1(replicaManager2, verifyLog$default$2$1(), topicPartition, create);
        alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp()), "false")})));
        create.elem -= 50;
        verifyLog$1(verifyLog$default$1$1(topicPartition), create.elem, topicPartition, create);
        create.elem += 50;
        destCluster().shutdownBroker(brokerId);
        destCluster().startBroker(brokerId);
        produceToSourceCluster(100);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        verifyLog$1(verifyLog$default$1$1(topicPartition), verifyLog$default$2$1(), topicPartition, create);
        alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp()), "true")})));
        destCluster().serverWithBrokerId(brokerId).replicaManager().replicaFetcherManager().closeAllFetchers();
        bufferedProduceToSource$1(100);
        create.elem = ((AbstractLog) replicaManager2.getLog(topicPartition).get()).logEndOffset();
        replicaManager2.deleteRecords(15000L, (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(topicPartition), BoxesRunTime.boxToLong(create.elem))})), map3 -> {
            $anonfun$testDestinationHighWatermark$16(map3);
            return BoxedUnit.UNIT;
        });
        verifyLog$1(replicaManager2, verifyLog$default$2$1(), topicPartition, create);
        ClusterLinkTestHarness destCluster2 = destCluster();
        destCluster2.alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp()), "false")})), new $colon.colon(destCluster().partitionLeader(topicPartition), Nil$.MODULE$), destCluster2.alterClusterLink$default$4(), destCluster2.alterClusterLink$default$5());
        verifyLog$1(verifyLog$default$1$1(topicPartition), verifyLog$default$2$1(), topicPartition, create);
        bufferedProduceToSource$1(100);
        verifyLog$1(verifyLog$default$1$1(topicPartition), create.elem, topicPartition, create);
        destCluster().shutdownBroker(brokerId);
        produceToSourceCluster(100);
        verifyLog$1(verifyLog$default$1$1(topicPartition), verifyLog$default$2$1(), topicPartition, create);
        destCluster().startBroker(brokerId);
        verifyLog$1(destCluster().serverWithBrokerId(brokerId).replicaManager(), verifyLog$default$2$1(), topicPartition, create);
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testNoTruncationBelowHighWatermarkWithEmptyLeaderEpochCache(String str) {
        Assumptions.assumeFalse(useSourceInitiatedLink());
        Assumptions.assumeTrue(clusterLinkPrefix().isEmpty());
        verifyNoTruncationBelowHighWatermarkWithEmptyLeaderEpochCache();
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testDeleteAutoCreatedMirrorTopicForFailedLink(String str, boolean z) {
        String str2 = "autoCreatedTopic";
        linkId_$eq(setupLinkAndMirrorForFailureTest(syncMs(), retryMs(), consumerGroup(), (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.AutoMirroringEnableProp()), "true"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.TopicFiltersProp()), StringOps$.MODULE$.stripMargin$extension(Predef$.MODULE$.augmentString(new StringBuilder(197).append("|{\n          |\"topicFilters\": [\n          |  {\n          |     \"name\": \"").append("autoCreatedTopic").append("\",\n          |     \"patternType\": \"literal\",\n          |     \"filterType\": \"include\"\n          |  }\n          |]}\n          |").toString())))}))));
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.createTopic("autoCreatedTopic", numPartitions(), 2, sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        ClusterLinkTestHarness sourceCluster2 = sourceCluster();
        sourceCluster2.createTopic("nonAutoCreatedTopic", numPartitions(), 2, sourceCluster2.createTopic$default$4(), sourceCluster2.createTopic$default$5(), sourceCluster2.createTopic$default$6());
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.linkTopic("nonAutoCreatedTopic", (short) 2, linkName(), destCluster.linkTopic$default$4(), destCluster.linkTopic$default$5());
        waitForAutoMirrorCreation("autoCreatedTopic");
        ClusterLinkTestHarness destCluster2 = destCluster();
        destCluster2.alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("bootstrap.servers"), "999.999.999.999:0000")})), destCluster2.alterClusterLink$default$3(), destCluster2.alterClusterLink$default$4(), false);
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$testDeleteAutoCreatedMirrorTopicForFailedLink$1(this)) {
            if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                Assertions.fail("cluster link didn't fail as expected");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
        }
        destCluster().deleteTopic(topic(), true);
        Assertions.assertThrows(TopicDeletionDisabledException.class, () -> {
            this.destCluster().deleteTopic(str2, false);
        });
        destCluster().killAllBrokers();
        ClusterLinkTestHarness destCluster3 = destCluster();
        destCluster3.restartDeadBrokers(destCluster3.restartDeadBrokers$default$1());
        destCluster().updateBootstrapServers();
        destCluster().deleteTopic("nonAutoCreatedTopic", true);
        Assertions.assertThrows(TopicDeletionDisabledException.class, () -> {
            this.destCluster().deleteTopic(str2, false);
        });
    }

    public void verifyFailureAndRecovery(FailureType failureType, Function0<BoxedUnit> function0, Function0<BoxedUnit> function02, boolean z) {
        linkId_$eq(setupLinkAndMirrorForFailureTest(syncMs(), retryMs(), consumerGroup(), setupLinkAndMirrorForFailureTest$default$4()));
        produceToSourceCluster(numRecords());
        commitOffsets(sourceCluster(), topic(), 0, 5L, consumerGroup());
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        function0.apply$mcV$sp();
        waitForFailure(destAdmin(), failureType, waitForFailure$default$3());
        function02.apply$mcV$sp();
        waitForRecovery();
        produceToSourceCluster(numRecords());
        commitOffsets(sourceCluster(), topic(), 0, 10L, consumerGroup());
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        verifyOffsetMigration(topic(), 0, 10L, consumerGroup(), verifyOffsetMigration$default$5());
        if (z) {
            double unavailabilityCountMetricValueOnDest = unavailabilityCountMetricValueOnDest(linkName(), unavailabilityCountMetricValueOnDest$default$2());
            Thread.sleep(5 * retryMs());
            Assertions.assertEquals(unavailabilityCountMetricValueOnDest, unavailabilityCountMetricValueOnDest(linkName(), unavailabilityCountMetricValueOnDest$default$2()), 0.0d);
        }
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.unlinkTopic(topic(), linkName(), destCluster.unlinkTopic$default$3(), false, destCluster.unlinkTopic$default$5(), destCluster.unlinkTopic$default$6());
        ClusterLinkTestHarness destCluster2 = destCluster();
        destCluster2.deleteClusterLink(linkName(), destCluster2.deleteClusterLink$default$2(), destCluster2.deleteClusterLink$default$3());
    }

    public boolean verifyFailureAndRecovery$default$4() {
        return false;
    }

    private void waitForRecovery() {
        Tuple2 $minus$greater$extension;
        Tuple2 $minus$greater$extension2;
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            ClusterLinkListing $anonfun$waitForRecovery$1 = $anonfun$waitForRecovery$1(this);
            if ($anonfun$waitForRecovery$1.available()) {
                $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc($anonfun$waitForRecovery$1), BoxesRunTime.boxToBoolean(true));
                break;
            } else {
                if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                    $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc($anonfun$waitForRecovery$1), BoxesRunTime.boxToBoolean(false));
                    break;
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
            }
        }
        Tuple2 tuple2 = $minus$greater$extension;
        if (tuple2 == null) {
            throw new MatchError((Object) null);
        }
        Assertions.assertTrue(((ClusterLinkListing) tuple2._1()).available(), "Link not available");
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE);
        Set set = (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new ReplicaStatus.MirrorInfo.State[]{ReplicaStatus.MirrorInfo.State.ACTIVE}));
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        long currentTimeMillis2 = System.currentTimeMillis();
        while (true) {
            Set $anonfun$waitForRecovery$3 = $anonfun$waitForRecovery$3(this);
            if ($anonfun$waitForRecovery$4(set, $anonfun$waitForRecovery$3)) {
                $minus$greater$extension2 = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc($anonfun$waitForRecovery$3), BoxesRunTime.boxToBoolean(true));
                break;
            } else {
                if (System.currentTimeMillis() > currentTimeMillis2 + 15000) {
                    $minus$greater$extension2 = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc($anonfun$waitForRecovery$3), BoxesRunTime.boxToBoolean(false));
                    break;
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
            }
        }
        Tuple2 tuple22 = $minus$greater$extension2;
        if (tuple22 == null) {
            throw new MatchError((Object) null);
        }
        Assertions.assertEquals(set, (Set) tuple22._1());
        waitForLinkCountMetric(destinationLinkMode(), "active", destCluster());
        Assertions.assertEquals(1.0d, totalKafkaMetricValue(destCluster().aliveServers(), "mirror-topic-count", (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("state"), TopicLinkMirror$.MODULE$.name())})), totalKafkaMetricValue$default$4(), totalKafkaMetricValue$default$5()));
        Assertions.assertEquals(numPartitions(), totalKafkaMetricValue(destCluster().aliveServers(), "mirror-partition-count", totalKafkaMetricValue$default$3(), totalKafkaMetricValue$default$4(), totalKafkaMetricValue$default$5()));
        kafkaMetricMaxValue(destCluster().aliveServers(), "broker-failed-link-count", ClusterLinkMetrics$.MODULE$.metricsGroup(), new Some(linkName()), (Map) Map$.MODULE$.empty(), false, kafkaMetricMaxValue$default$7());
    }

    private AclBinding aclBinding(ResourceType resourceType, AclPermissionType aclPermissionType) {
        return new AclBinding(new ResourcePattern(resourceType, "*", PatternType.LITERAL), new AccessControlEntry(new StringBuilder(5).append("User:").append(linkUserName(linkName())).toString(), "*", AclOperation.ALL, aclPermissionType));
    }

    private void addLinkAcls() {
        sourceCluster().addAcls(new $colon.colon(aclBinding(ResourceType.TOPIC, AclPermissionType.ALLOW), new $colon.colon(aclBinding(ResourceType.CLUSTER, AclPermissionType.ALLOW), new $colon.colon(aclBinding(ResourceType.GROUP, AclPermissionType.ALLOW), Nil$.MODULE$))));
    }

    public static final /* synthetic */ ClusterLinkListing $anonfun$testMirrorTopicOperationsOnLinkFailure$2(ClusterLinkFailureTest clusterLinkFailureTest) {
        return clusterLinkFailureTest.linkInfo(clusterLinkFailureTest.destAdmin());
    }

    public static final /* synthetic */ boolean $anonfun$testDeleteLinkDuringRetry$1(ClusterLinkFailureTest clusterLinkFailureTest) {
        return ((KafkaBroker) clusterLinkFailureTest.destCluster().brokers().head()).clusterLinkManager().linkState(clusterLinkFailureTest.linkName()).equals(FailedClusterLink$.MODULE$);
    }

    public static final /* synthetic */ String $anonfun$testDeleteLinkDuringRetry$2() {
        return "cluster link didn't fail as expected";
    }

    public static final /* synthetic */ boolean $anonfun$testDeleteLinkDuringRetry$3(ClusterLinkFailureTest clusterLinkFailureTest, KafkaBroker kafkaBroker) {
        return kafkaBroker.clusterLinkManager().getLinkRetryState(clusterLinkFailureTest.linkName()).nonEmpty();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final void makeConfigUndecodable$1(Properties properties) {
        Properties decode = ((KafkaBroker) destCluster().brokers().head()).clusterLinkManager().configEncoder().decode(((KafkaServer) destCluster().servers().head()).zkClient().getEntityConfigs(ConfigType$.MODULE$.ClusterLink(), CoreUtils$.MODULE$.toJavaUUID(linkId()).toString()));
        properties.putAll(decode);
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        TestUtils$ testUtils$16 = TestUtils$.MODULE$;
        Properties createBrokerConfig = testUtils$.createBrokerConfig(1, "localhost:2181", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1, false);
        createBrokerConfig.setProperty(KafkaConfig$.MODULE$.PasswordEncoderSecretProp(), "bad-secret");
        new AdminZkClient(((KafkaServer) destCluster().servers().head()).zkClient()).changeClusterLinkConfig(linkId(), new SecureLinkConfigEncoder(KafkaConfig$.MODULE$.fromProps(createBrokerConfig)).encode(decode));
    }

    public static final /* synthetic */ boolean $anonfun$testUndecodableConfigs$2(Tuple2 tuple2) {
        Object _1 = tuple2._1();
        String AvailabilityCheckMsProp = ClusterLinkConfig$.MODULE$.AvailabilityCheckMsProp();
        return _1 == null ? AvailabilityCheckMsProp == null : _1.equals(AvailabilityCheckMsProp);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final void reconfigureLink$1(Properties properties) {
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.CLUSTER_LINK, linkName());
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        KafkaFuture all = destAdmin().describeConfigs(Collections.singleton(configResource)).all();
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        testUtils$.assertFutureExceptionTypeEquals(all, InvalidConfigurationException.class, None$.MODULE$);
        Assertions.assertThrows(InvalidConfigurationException.class, () -> {
            this.alterClusterLink(this.linkName(), (Map) CollectionConverters$.MODULE$.PropertiesHasAsScala(properties).asScala().filterNot(tuple2 -> {
                return BoxesRunTime.boxToBoolean($anonfun$testUndecodableConfigs$2(tuple2));
            }));
        });
        alterClusterLink(linkName(), CollectionConverters$.MODULE$.PropertiesHasAsScala(properties).asScala());
    }

    public static final /* synthetic */ boolean $anonfun$testListDescribeWithoutConfigs$1(ClusterLinkFailureTest clusterLinkFailureTest) {
        return ((KafkaBroker) clusterLinkFailureTest.destCluster().brokers().head()).clusterLinkManager().linkState(clusterLinkFailureTest.linkName()).equals(FailedClusterLink$.MODULE$);
    }

    public static final /* synthetic */ String $anonfun$testListDescribeWithoutConfigs$2() {
        return "cluster link didn't fail as expected";
    }

    public static final /* synthetic */ boolean $anonfun$testListDescribeWithoutConfigs$3(ClusterLinkFailureTest clusterLinkFailureTest, ClusterLinkDescription clusterLinkDescription) {
        return clusterLinkDescription.linkName().equals(clusterLinkFailureTest.linkName());
    }

    public static final /* synthetic */ boolean $anonfun$testListDescribeWithoutConfigs$4(ClusterLinkDescription clusterLinkDescription) {
        return clusterLinkDescription.linkState().equals(ClusterLinkDescription.LinkState.FAILED);
    }

    public static final /* synthetic */ boolean $anonfun$testListDescribeWithoutConfigs$5(ClusterLinkDescription clusterLinkDescription) {
        return clusterLinkDescription.linkName().equals("test-link2");
    }

    public static final /* synthetic */ boolean $anonfun$testListDescribeWithoutConfigs$6(ClusterLinkDescription clusterLinkDescription) {
        return clusterLinkDescription.linkState().equals(ClusterLinkDescription.LinkState.ACTIVE);
    }

    public static final /* synthetic */ boolean $anonfun$testListDescribeWithoutConfigs$7(ClusterLinkFailureTest clusterLinkFailureTest, ClusterLinkDescription clusterLinkDescription) {
        return clusterLinkDescription.linkName().equals(clusterLinkFailureTest.linkName());
    }

    public static final /* synthetic */ boolean $anonfun$testListDescribeWithoutConfigs$8(ClusterLinkDescription clusterLinkDescription) {
        return clusterLinkDescription.clusterLinkError().equals(ClusterLinkError.UNKNOWN);
    }

    public static final /* synthetic */ boolean $anonfun$testListDescribeWithoutConfigs$9(ClusterLinkDescription clusterLinkDescription) {
        return clusterLinkDescription.linkName().equals("test-link2");
    }

    public static final /* synthetic */ boolean $anonfun$testListDescribeWithoutConfigs$10(ClusterLinkDescription clusterLinkDescription) {
        return clusterLinkDescription.clusterLinkError().equals(ClusterLinkError.NO_ERROR);
    }

    public static final /* synthetic */ boolean $anonfun$testSourceTopicRecreateDetectionUsingTopicIds$1(ClusterLinkFailureTest clusterLinkFailureTest, TopicPartition topicPartition) {
        return clusterLinkFailureTest.destCluster().leaderEpoch(topicPartition) > 0;
    }

    public static final /* synthetic */ String $anonfun$testSourceTopicRecreateDetectionUsingTopicIds$2() {
        return "Destination epoch not bumped";
    }

    public static final /* synthetic */ boolean $anonfun$testDestLeaderChanges$1(ClusterLinkFailureTest clusterLinkFailureTest, int i, KafkaBroker kafkaBroker) {
        KafkaBroker serverWithBrokerId = clusterLinkFailureTest.destCluster().serverWithBrokerId(i);
        return kafkaBroker == null ? serverWithBrokerId != null : !kafkaBroker.equals(serverWithBrokerId);
    }

    public static final /* synthetic */ boolean $anonfun$testDestLeaderChanges$2(ClusterLinkFailureTest clusterLinkFailureTest, int i, KafkaBroker kafkaBroker) {
        KafkaBroker serverWithBrokerId = clusterLinkFailureTest.destCluster().serverWithBrokerId(i);
        return kafkaBroker == null ? serverWithBrokerId != null : !kafkaBroker.equals(serverWithBrokerId);
    }

    public static final /* synthetic */ boolean $anonfun$testDestinationHighWatermark$3(LongRef longRef, long j) {
        return j == longRef.elem;
    }

    public static final /* synthetic */ boolean $anonfun$testDestinationHighWatermark$5(int i, long j) {
        return j == ((long) i);
    }

    public static final /* synthetic */ boolean $anonfun$testDestinationHighWatermark$7(long j, long j2) {
        return j2 == j;
    }

    private final void verifyLog$1(ReplicaManager replicaManager, long j, TopicPartition topicPartition, LongRef longRef) {
        Tuple2 $minus$greater$extension;
        Tuple2 $minus$greater$extension2;
        Tuple2 $minus$greater$extension3;
        AbstractLog abstractLog = (AbstractLog) replicaManager.getLog(topicPartition).get();
        int size = producedRecords().size();
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            long logStartOffset = abstractLog.logStartOffset();
            Long boxToLong = BoxesRunTime.boxToLong(logStartOffset);
            if (logStartOffset == longRef.elem) {
                $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToLong), BoxesRunTime.boxToBoolean(true));
                break;
            } else {
                if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                    $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToLong), BoxesRunTime.boxToBoolean(false));
                    break;
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
            }
        }
        Tuple2 tuple2 = $minus$greater$extension;
        if (tuple2 == null) {
            throw new MatchError((Object) null);
        }
        Assertions.assertEquals(longRef.elem, tuple2._1$mcJ$sp(), new StringBuilder(29).append("Unexpected log start offset: ").append(new StringBuilder(67).append("isLeader=").append(replicaManager.onlinePartition(topicPartition).exists(partition22222 -> {
            return BoxesRunTime.boxToBoolean(partition22222.isLeader());
        })).append(" ").append("actual (start=").append(abstractLog.logStartOffset()).append(", end=").append(abstractLog.logEndOffset()).append(", hwm=").append(abstractLog.highWatermark()).append(") ").append("expected (start=").append(longRef.elem).append(", end=").append(size).append(", hwm=").append(j).append(")").toString()).toString());
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        long currentTimeMillis2 = System.currentTimeMillis();
        while (true) {
            long logEndOffset = abstractLog.logEndOffset();
            Long boxToLong2 = BoxesRunTime.boxToLong(logEndOffset);
            if ($anonfun$testDestinationHighWatermark$5(size, logEndOffset)) {
                $minus$greater$extension2 = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToLong2), BoxesRunTime.boxToBoolean(true));
                break;
            } else {
                if (System.currentTimeMillis() > currentTimeMillis2 + 15000) {
                    $minus$greater$extension2 = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToLong2), BoxesRunTime.boxToBoolean(false));
                    break;
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
            }
        }
        Tuple2 tuple22 = $minus$greater$extension2;
        if (tuple22 == null) {
            throw new MatchError((Object) null);
        }
        Assertions.assertEquals(size, tuple22._1$mcJ$sp(), new StringBuilder(27).append("Unexpected log end offset: ").append(new StringBuilder(67).append("isLeader=").append(replicaManager.onlinePartition(topicPartition).exists(partition222222 -> {
            return BoxesRunTime.boxToBoolean(partition222222.isLeader());
        })).append(" ").append("actual (start=").append(abstractLog.logStartOffset()).append(", end=").append(abstractLog.logEndOffset()).append(", hwm=").append(abstractLog.highWatermark()).append(") ").append("expected (start=").append(longRef.elem).append(", end=").append(size).append(", hwm=").append(j).append(")").toString()).toString());
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        long currentTimeMillis3 = System.currentTimeMillis();
        while (true) {
            long highWatermark = abstractLog.highWatermark();
            Long boxToLong3 = BoxesRunTime.boxToLong(highWatermark);
            if ($anonfun$testDestinationHighWatermark$7(j, highWatermark)) {
                $minus$greater$extension3 = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToLong3), BoxesRunTime.boxToBoolean(true));
                break;
            } else {
                if (System.currentTimeMillis() > currentTimeMillis3 + 15000) {
                    $minus$greater$extension3 = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToLong3), BoxesRunTime.boxToBoolean(false));
                    break;
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
            }
        }
        Tuple2 tuple23 = $minus$greater$extension3;
        if (tuple23 == null) {
            throw new MatchError((Object) null);
        }
        Assertions.assertEquals(j, tuple23._1$mcJ$sp(), new StringBuilder(27).append("Unexpected high watermark: ").append(new StringBuilder(67).append("isLeader=").append(replicaManager.onlinePartition(topicPartition).exists(partition2222222 -> {
            return BoxesRunTime.boxToBoolean(partition2222222.isLeader());
        })).append(" ").append("actual (start=").append(abstractLog.logStartOffset()).append(", end=").append(abstractLog.logEndOffset()).append(", hwm=").append(abstractLog.highWatermark()).append(") ").append("expected (start=").append(longRef.elem).append(", end=").append(size).append(", hwm=").append(j).append(")").toString()).toString());
    }

    private final ReplicaManager verifyLog$default$1$1(TopicPartition topicPartition) {
        return destCluster().partitionLeader(topicPartition).replicaManager();
    }

    private final long verifyLog$default$2$1() {
        return producedRecords().size();
    }

    private final void bufferedProduceToSource$1(int i) {
        sourceCluster().producerConfig().setProperty("linger.ms", "100");
        produceToSourceCluster(i);
        sourceCluster().producerConfig().remove("linger.ms");
    }

    public static final /* synthetic */ boolean $anonfun$testDestinationHighWatermark$9(TopicPartition topicPartition, ReplicaManager replicaManager) {
        return replicaManager.getLog(topicPartition).nonEmpty();
    }

    public static final /* synthetic */ void $anonfun$testDestinationHighWatermark$10(ClusterLinkFailureTest clusterLinkFailureTest, TopicPartition topicPartition, LongRef longRef, ReplicaManager replicaManager) {
        Tuple2 $minus$greater$extension;
        Tuple2 $minus$greater$extension2;
        Tuple2 $minus$greater$extension3;
        long verifyLog$default$2$1 = clusterLinkFailureTest.verifyLog$default$2$1();
        AbstractLog abstractLog = (AbstractLog) replicaManager.getLog(topicPartition).get();
        int size = clusterLinkFailureTest.producedRecords().size();
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            long logStartOffset = abstractLog.logStartOffset();
            Long boxToLong = BoxesRunTime.boxToLong(logStartOffset);
            if (logStartOffset == longRef.elem) {
                $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToLong), BoxesRunTime.boxToBoolean(true));
                break;
            } else {
                if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                    $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToLong), BoxesRunTime.boxToBoolean(false));
                    break;
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
            }
        }
        Tuple2 tuple2 = $minus$greater$extension;
        if (tuple2 == null) {
            throw new MatchError((Object) null);
        }
        Assertions.assertEquals(longRef.elem, tuple2._1$mcJ$sp(), new StringBuilder(29).append("Unexpected log start offset: ").append(new StringBuilder(67).append("isLeader=").append(replicaManager.onlinePartition(topicPartition).exists(partition2222222 -> {
            return BoxesRunTime.boxToBoolean(partition2222222.isLeader());
        })).append(" ").append("actual (start=").append(abstractLog.logStartOffset()).append(", end=").append(abstractLog.logEndOffset()).append(", hwm=").append(abstractLog.highWatermark()).append(") ").append("expected (start=").append(longRef.elem).append(", end=").append(size).append(", hwm=").append(verifyLog$default$2$1).append(")").toString()).toString());
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        long currentTimeMillis2 = System.currentTimeMillis();
        while (true) {
            long logEndOffset = abstractLog.logEndOffset();
            Long boxToLong2 = BoxesRunTime.boxToLong(logEndOffset);
            if ($anonfun$testDestinationHighWatermark$5(size, logEndOffset)) {
                $minus$greater$extension2 = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToLong2), BoxesRunTime.boxToBoolean(true));
                break;
            } else {
                if (System.currentTimeMillis() > currentTimeMillis2 + 15000) {
                    $minus$greater$extension2 = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToLong2), BoxesRunTime.boxToBoolean(false));
                    break;
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
            }
        }
        Tuple2 tuple22 = $minus$greater$extension2;
        if (tuple22 == null) {
            throw new MatchError((Object) null);
        }
        Assertions.assertEquals(size, tuple22._1$mcJ$sp(), new StringBuilder(27).append("Unexpected log end offset: ").append(new StringBuilder(67).append("isLeader=").append(replicaManager.onlinePartition(topicPartition).exists(partition22222222 -> {
            return BoxesRunTime.boxToBoolean(partition22222222.isLeader());
        })).append(" ").append("actual (start=").append(abstractLog.logStartOffset()).append(", end=").append(abstractLog.logEndOffset()).append(", hwm=").append(abstractLog.highWatermark()).append(") ").append("expected (start=").append(longRef.elem).append(", end=").append(size).append(", hwm=").append(verifyLog$default$2$1).append(")").toString()).toString());
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        long currentTimeMillis3 = System.currentTimeMillis();
        while (true) {
            long highWatermark = abstractLog.highWatermark();
            Long boxToLong3 = BoxesRunTime.boxToLong(highWatermark);
            if ($anonfun$testDestinationHighWatermark$7(verifyLog$default$2$1, highWatermark)) {
                $minus$greater$extension3 = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToLong3), BoxesRunTime.boxToBoolean(true));
                break;
            } else {
                if (System.currentTimeMillis() > currentTimeMillis3 + 15000) {
                    $minus$greater$extension3 = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToLong3), BoxesRunTime.boxToBoolean(false));
                    break;
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
            }
        }
        Tuple2 tuple23 = $minus$greater$extension3;
        if (tuple23 == null) {
            throw new MatchError((Object) null);
        }
        Assertions.assertEquals(verifyLog$default$2$1, tuple23._1$mcJ$sp(), new StringBuilder(27).append("Unexpected high watermark: ").append(new StringBuilder(67).append("isLeader=").append(replicaManager.onlinePartition(topicPartition).exists(partition222222222 -> {
            return BoxesRunTime.boxToBoolean(partition222222222.isLeader());
        })).append(" ").append("actual (start=").append(abstractLog.logStartOffset()).append(", end=").append(abstractLog.logEndOffset()).append(", hwm=").append(abstractLog.highWatermark()).append(") ").append("expected (start=").append(longRef.elem).append(", end=").append(size).append(", hwm=").append(verifyLog$default$2$1).append(")").toString()).toString());
    }

    public static final /* synthetic */ void $anonfun$testDestinationHighWatermark$11(Map map) {
    }

    public static final /* synthetic */ void $anonfun$testDestinationHighWatermark$12(ClusterLinkFailureTest clusterLinkFailureTest, TopicPartition topicPartition, LongRef longRef, ReplicaManager replicaManager) {
        Tuple2 $minus$greater$extension;
        Tuple2 $minus$greater$extension2;
        Tuple2 $minus$greater$extension3;
        long verifyLog$default$2$1 = clusterLinkFailureTest.verifyLog$default$2$1();
        AbstractLog abstractLog = (AbstractLog) replicaManager.getLog(topicPartition).get();
        int size = clusterLinkFailureTest.producedRecords().size();
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            long logStartOffset = abstractLog.logStartOffset();
            Long boxToLong = BoxesRunTime.boxToLong(logStartOffset);
            if (logStartOffset == longRef.elem) {
                $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToLong), BoxesRunTime.boxToBoolean(true));
                break;
            } else {
                if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                    $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToLong), BoxesRunTime.boxToBoolean(false));
                    break;
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
            }
        }
        Tuple2 tuple2 = $minus$greater$extension;
        if (tuple2 == null) {
            throw new MatchError((Object) null);
        }
        Assertions.assertEquals(longRef.elem, tuple2._1$mcJ$sp(), new StringBuilder(29).append("Unexpected log start offset: ").append(new StringBuilder(67).append("isLeader=").append(replicaManager.onlinePartition(topicPartition).exists(partition222222222 -> {
            return BoxesRunTime.boxToBoolean(partition222222222.isLeader());
        })).append(" ").append("actual (start=").append(abstractLog.logStartOffset()).append(", end=").append(abstractLog.logEndOffset()).append(", hwm=").append(abstractLog.highWatermark()).append(") ").append("expected (start=").append(longRef.elem).append(", end=").append(size).append(", hwm=").append(verifyLog$default$2$1).append(")").toString()).toString());
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        long currentTimeMillis2 = System.currentTimeMillis();
        while (true) {
            long logEndOffset = abstractLog.logEndOffset();
            Long boxToLong2 = BoxesRunTime.boxToLong(logEndOffset);
            if ($anonfun$testDestinationHighWatermark$5(size, logEndOffset)) {
                $minus$greater$extension2 = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToLong2), BoxesRunTime.boxToBoolean(true));
                break;
            } else {
                if (System.currentTimeMillis() > currentTimeMillis2 + 15000) {
                    $minus$greater$extension2 = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToLong2), BoxesRunTime.boxToBoolean(false));
                    break;
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
            }
        }
        Tuple2 tuple22 = $minus$greater$extension2;
        if (tuple22 == null) {
            throw new MatchError((Object) null);
        }
        Assertions.assertEquals(size, tuple22._1$mcJ$sp(), new StringBuilder(27).append("Unexpected log end offset: ").append(new StringBuilder(67).append("isLeader=").append(replicaManager.onlinePartition(topicPartition).exists(partition2222222222 -> {
            return BoxesRunTime.boxToBoolean(partition2222222222.isLeader());
        })).append(" ").append("actual (start=").append(abstractLog.logStartOffset()).append(", end=").append(abstractLog.logEndOffset()).append(", hwm=").append(abstractLog.highWatermark()).append(") ").append("expected (start=").append(longRef.elem).append(", end=").append(size).append(", hwm=").append(verifyLog$default$2$1).append(")").toString()).toString());
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        long currentTimeMillis3 = System.currentTimeMillis();
        while (true) {
            long highWatermark = abstractLog.highWatermark();
            Long boxToLong3 = BoxesRunTime.boxToLong(highWatermark);
            if ($anonfun$testDestinationHighWatermark$7(verifyLog$default$2$1, highWatermark)) {
                $minus$greater$extension3 = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToLong3), BoxesRunTime.boxToBoolean(true));
                break;
            } else {
                if (System.currentTimeMillis() > currentTimeMillis3 + 15000) {
                    $minus$greater$extension3 = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToLong3), BoxesRunTime.boxToBoolean(false));
                    break;
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
            }
        }
        Tuple2 tuple23 = $minus$greater$extension3;
        if (tuple23 == null) {
            throw new MatchError((Object) null);
        }
        Assertions.assertEquals(verifyLog$default$2$1, tuple23._1$mcJ$sp(), new StringBuilder(27).append("Unexpected high watermark: ").append(new StringBuilder(67).append("isLeader=").append(replicaManager.onlinePartition(topicPartition).exists(partition22222222222 -> {
            return BoxesRunTime.boxToBoolean(partition22222222222.isLeader());
        })).append(" ").append("actual (start=").append(abstractLog.logStartOffset()).append(", end=").append(abstractLog.logEndOffset()).append(", hwm=").append(abstractLog.highWatermark()).append(") ").append("expected (start=").append(longRef.elem).append(", end=").append(size).append(", hwm=").append(verifyLog$default$2$1).append(")").toString()).toString());
    }

    public static final /* synthetic */ boolean $anonfun$testDestinationHighWatermark$14(Partition partition) {
        return !partition.isLeader();
    }

    public static final /* synthetic */ boolean $anonfun$testDestinationHighWatermark$13(TopicPartition topicPartition, ReplicaManager replicaManager) {
        return replicaManager.onlinePartition(topicPartition).exists(partition -> {
            return BoxesRunTime.boxToBoolean($anonfun$testDestinationHighWatermark$14(partition));
        });
    }

    public static final /* synthetic */ void $anonfun$testDestinationHighWatermark$15(Map map) {
    }

    public static final /* synthetic */ void $anonfun$testDestinationHighWatermark$16(Map map) {
    }

    public static final /* synthetic */ boolean $anonfun$testDeleteAutoCreatedMirrorTopicForFailedLink$1(ClusterLinkFailureTest clusterLinkFailureTest) {
        return ((KafkaBroker) clusterLinkFailureTest.destCluster().brokers().head()).clusterLinkManager().linkState(clusterLinkFailureTest.linkName()).equals(FailedClusterLink$.MODULE$);
    }

    public static final /* synthetic */ String $anonfun$testDeleteAutoCreatedMirrorTopicForFailedLink$2() {
        return "cluster link didn't fail as expected";
    }

    public static final /* synthetic */ ClusterLinkListing $anonfun$waitForRecovery$1(ClusterLinkFailureTest clusterLinkFailureTest) {
        return clusterLinkFailureTest.linkInfo(clusterLinkFailureTest.destAdmin());
    }

    public static final /* synthetic */ Set $anonfun$waitForRecovery$3(ClusterLinkFailureTest clusterLinkFailureTest) {
        return clusterLinkFailureTest.mirrorPartitionStates(clusterLinkFailureTest.destAdmin(), clusterLinkFailureTest.mirrorPartitionStates$default$2());
    }

    public static final /* synthetic */ boolean $anonfun$waitForRecovery$4(Set set, Set set2) {
        return set2 == null ? set == null : set2.equals(set);
    }
}
