package kafka.coordinator.transaction;

import com.yammer.metrics.core.MetricName;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import kafka.common.RequestAndCompletionHandler;
import kafka.server.KafkaConfig$;
import kafka.server.MetadataCache;
import kafka.server.RequestLocal;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.requests.WriteTxnMarkersRequest;
import org.apache.kafka.common.requests.WriteTxnMarkersResponse;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import scala.$less$colon$less$;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.IterableOnceOps;
import scala.collection.mutable.Set;
import scala.collection.mutable.Set$;
import scala.jdk.CollectionConverters$;
import scala.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichLong$;
import scala.runtime.ScalaRunTime$;
import scala.util.Try;
import scala.util.Try$;

/* compiled from: TransactionMarkerChannelManagerTest.scala */
@ScalaSignature(bytes = "\u0006\u0005\t}a\u0001B\u001f?\u0001\u0015CQ\u0001\u0014\u0001\u0005\u00025Cq\u0001\u0015\u0001C\u0002\u0013%\u0011\u000b\u0003\u0004Y\u0001\u0001\u0006IA\u0015\u0005\b3\u0002\u0011\r\u0011\"\u0003[\u0011\u00191\u0007\u0001)A\u00057\"9q\r\u0001b\u0001\n\u0013A\u0007B\u00027\u0001A\u0003%\u0011\u000eC\u0004n\u0001\t\u0007I\u0011\u00028\t\rU\u0004\u0001\u0015!\u0003p\u0011\u001d1\bA1A\u0005\n9Daa\u001e\u0001!\u0002\u0013y\u0007b\u0002=\u0001\u0005\u0004%I!\u001f\u0005\u0007{\u0002\u0001\u000b\u0011\u0002>\t\u000fy\u0004!\u0019!C\u0005s\"1q\u0010\u0001Q\u0001\niD\u0011\"!\u0001\u0001\u0005\u0004%I!a\u0001\t\u0011\u0005U\u0001\u0001)A\u0005\u0003\u000bA\u0011\"a\u0006\u0001\u0005\u0004%I!a\u0001\t\u0011\u0005e\u0001\u0001)A\u0005\u0003\u000bA\u0011\"a\u0007\u0001\u0005\u0004%I!!\b\t\u0011\u0005\u0015\u0002\u0001)A\u0005\u0003?A\u0011\"a\n\u0001\u0005\u0004%I!!\b\t\u0011\u0005%\u0002\u0001)A\u0005\u0003?A\u0011\"a\u000b\u0001\u0005\u0004%I!!\f\t\u0011\u0005U\u0002\u0001)A\u0005\u0003_A\u0011\"a\u000e\u0001\u0005\u0004%I!!\f\t\u0011\u0005e\u0002\u0001)A\u0005\u0003_A\u0011\"a\u000f\u0001\u0005\u0004%I!!\u0010\t\u0011\u0005\u0015\u0003\u0001)A\u0005\u0003\u007fA\u0011\"a\u0012\u0001\u0005\u0004%I!!\u0010\t\u0011\u0005%\u0003\u0001)A\u0005\u0003\u007fA\u0011\"a\u0013\u0001\u0005\u0004%I!!\u0010\t\u0011\u00055\u0003\u0001)A\u0005\u0003\u007fA\u0011\"a\u0014\u0001\u0005\u0004%I!!\u0010\t\u0011\u0005E\u0003\u0001)A\u0005\u0003\u007fA\u0011\"a\u0015\u0001\u0005\u0004%I!!\u0016\t\u0011\u0005\r\u0004\u0001)A\u0005\u0003/B\u0011\"!\u001a\u0001\u0005\u0004%I!a\u001a\t\u0011\u0005=\u0004\u0001)A\u0005\u0003SB\u0011\"!\u001d\u0001\u0005\u0004%I!a\u001a\t\u0011\u0005M\u0004\u0001)A\u0005\u0003SB\u0011\"!\u001e\u0001\u0005\u0004%I!a\u001e\t\u0011\u0005u\u0005\u0001)A\u0005\u0003sB\u0011\"a(\u0001\u0005\u0004%I!!)\t\u0011\u0005=\u0006\u0001)A\u0005\u0003GC\u0011\"!-\u0001\u0005\u0004%I!a-\t\u0011\u0005m\u0006\u0001)A\u0005\u0003kCq!!0\u0001\t\u0013\ty\fC\u0004\u
0002B\u0002!\t!a0\t\u000f\u0005e\u0007\u0001\"\u0001\u0002@\"9\u0011Q\u001c\u0001\u0005\u0002\u0005}\u0006bBAq\u0001\u0011\u0005\u0011q\u0018\u0005\b\u0003K\u0004A\u0011AA`\u0011\u001d\tI\u000f\u0001C\u0001\u0003\u007fCq!!<\u0001\t\u0003\ty\fC\u0004\u0002r\u0002!\t!a0\t\u000f\u0005U\b\u0001\"\u0001\u0002@\"9\u0011\u0011 \u0001\u0005\u0002\u0005}\u0006bBA\u007f\u0001\u0011%\u0011q \u0005\b\u00057\u0001A\u0011AA`\u0005\r\"&/\u00198tC\u000e$\u0018n\u001c8NCJ\\WM]\"iC:tW\r\\'b]\u0006<WM\u001d+fgRT!a\u0010!\u0002\u0017Q\u0014\u0018M\\:bGRLwN\u001c\u0006\u0003\u0003\n\u000b1bY8pe\u0012Lg.\u0019;pe*\t1)A\u0003lC\u001a\\\u0017m\u0001\u0001\u0014\u0005\u00011\u0005CA$K\u001b\u0005A%\"A%\u0002\u000bM\u001c\u0017\r\\1\n\u0005-C%AB!osJ+g-\u0001\u0004=S:LGO\u0010\u000b\u0002\u001dB\u0011q\nA\u0007\u0002}\u0005iQ.\u001a;bI\u0006$\u0018mQ1dQ\u0016,\u0012A\u0015\t\u0003'Zk\u0011\u0001\u0016\u0006\u0003+\n\u000baa]3sm\u0016\u0014\u0018BA,U\u00055iU\r^1eCR\f7)Y2iK\u0006qQ.\u001a;bI\u0006$\u0018mQ1dQ\u0016\u0004\u0013!\u00048fi^|'o[\"mS\u0016tG/F\u0001\\!\taF-D\u0001^\u0015\tqv,A\u0004dY&,g\u000e^:\u000b\u0005\r\u0003'BA1c\u0003\u0019\t\u0007/Y2iK*\t1-A\u0002pe\u001eL!!Z/\u0003\u001b9+Go^8sW\u000ec\u0017.\u001a8u\u00039qW\r^<pe.\u001cE.[3oi\u0002\nq\u0002\u001e=o'R\fG/Z'b]\u0006<WM]\u000b\u0002SB\u0011qJ[\u0005\u0003Wz\u0012q\u0003\u0016:b]N\f7\r^5p]N#\u0018\r^3NC:\fw-\u001a:\u0002!QDhn\u0015;bi\u0016l\u0015M\\1hKJ\u0004\u0013A\u00039beRLG/[8ocU\tq\u000e\u0005\u0002qg6\t\u0011O\u0003\u0002s?\u000611m\\7n_:L!\u0001^9\u0003\u001dQ{\u0007/[2QCJ$\u0018\u000e^5p]\u0006Y\u0001/\u0019:uSRLwN\\\u0019!\u0003)\u0001\u0018M\u001d;ji&|gNM\u0001\fa\u0006\u0014H/\u001b;j_:\u0014\u0004%A\u0004ce>\\WM]\u0019\u0016\u0003i\u0004\"\u0001]>\n\u0005q\f(\u0001\u0002(pI\u0016\f\u0001B\u0019:pW\u0016\u0014\u0018\u0007I\u0001\bEJ|7.\u001a:3\u0003!\u0011'o\\6feJ\u0002\u0013\u0001\u0005;sC:\u001c\u0018m\u0019;j_:\fG.\u001332+\t\t)\u0001\u0005\u0003\u0002\b\u0005EQBAA\u0005\u0015\u0011\tY!!\u0004\u0002\t1\fgn\u0
01a\u0006\u0003\u0003\u001f\tAA[1wC&!\u00111CA\u0005\u0005\u0019\u0019FO]5oO\u0006\tBO]1og\u0006\u001cG/[8oC2LE-\r\u0011\u0002!Q\u0014\u0018M\\:bGRLwN\\1m\u0013\u0012\u0014\u0014!\u0005;sC:\u001c\u0018m\u0019;j_:\fG.\u001333A\u0005Y\u0001O]8ek\u000e,'/\u001332+\t\ty\u0002E\u0002H\u0003CI1!a\tI\u0005\u0011auN\\4\u0002\u0019A\u0014x\u000eZ;dKJLE-\r\u0011\u0002\u0017A\u0014x\u000eZ;dKJLEMM\u0001\raJ|G-^2fe&#'\u0007I\u0001\u000eaJ|G-^2fe\u0016\u0003xn\u00195\u0016\u0005\u0005=\u0002cA$\u00022%\u0019\u00111\u0007%\u0003\u000bMCwN\u001d;\u0002\u001dA\u0014x\u000eZ;dKJ,\u0005o\\2iA\u0005\tB.Y:u!J|G-^2fe\u0016\u0003xn\u00195\u0002%1\f7\u000f\u001e)s_\u0012,8-\u001a:Fa>\u001c\u0007\u000eI\u0001\u0013ibtGk\u001c9jGB\u000b'\u000f^5uS>t\u0017'\u0006\u0002\u0002@A\u0019q)!\u0011\n\u0007\u0005\r\u0003JA\u0002J]R\f1\u0003\u001e=o)>\u0004\u0018n\u0019)beRLG/[8oc\u0001\n!\u0003\u001e=o)>\u0004\u0018n\u0019)beRLG/[8oe\u0005\u0019B\u000f\u001f8U_BL7\rU1si&$\u0018n\u001c83A\u0005\u00012m\\8sI&t\u0017\r^8s\u000bB|7\r[\u0001\u0012G>|'\u000fZ5oCR|'/\u00129pG\"\u0004\u0013\u0001\u0004;y]RKW.Z8vi6\u001b\u0018!\u0004;y]RKW.Z8vi6\u001b\b%A\u0005uq:\u0014Vm];miV\u0011\u0011q\u000b\t\u0005\u00033\ny&\u0004\u0002\u0002\\)\u0019\u0011QL9\u0002\u0011I,\u0017/^3tiNLA!!\u0019\u0002\\\t\tBK]1og\u0006\u001cG/[8o%\u0016\u001cX\u000f\u001c;\u0002\u0015QDhNU3tk2$\b%\u0001\u0007uq:lU\r^1eCR\f\u0017'\u0006\u0002\u0002jA\u0019q*a\u001b\n\u0007\u00055dHA\nUe\u0006t7/Y2uS>tW*\u001a;bI\u0006$\u0018-A\u0007uq:lU\r^1eCR\f\u0017\u0007I\u0001\ribtW*\u001a;bI\u0006$\u0018MM\u0001\u000eibtW*\u001a;bI\u0006$\u0018M\r\u0011\u0002-\r\f\u0007\u000f^;sK\u0012,%O]8sg\u000e\u000bG\u000e\u001c2bG.,\"!!\u001f\u0011\r\u0005m\u0014\u0011QAC\u001b\t\tiHC\u0002\u0002��\t\fq!\\8dW&$x.\u0003\u0003\u0002\u0004\u0006u$AD!sOVlWM\u001c;DCB$xN\u001d\t\b\u000f\u0006\u001d\u00151RAL\u0013\r\tI\t\u0013\u0002\n\rVt7\r^5p]F\u0002B!!$\u0002\u00146\u0011\u0011q\u0012\u0006\u0004\u0003#\u000b\u0018\u0001\u00039s_R|7m\u001c7\n\t\u0005U\u0015q\u
0012\u0002\u0007\u000bJ\u0014xN]:\u0011\u0007\u001d\u000bI*C\u0002\u0002\u001c\"\u0013A!\u00168ji\u000692-\u00199ukJ,G-\u0012:s_J\u001c8)\u00197mE\u0006\u001c7\u000eI\u0001\u0005i&lW-\u0006\u0002\u0002$B!\u0011QUAV\u001b\t\t9KC\u0002\u0002*F\fQ!\u001e;jYNLA!!,\u0002(\nAQj\\2l)&lW-A\u0003uS6,\u0007%\u0001\bdQ\u0006tg.\u001a7NC:\fw-\u001a:\u0016\u0005\u0005U\u0006cA(\u00028&\u0019\u0011\u0011\u0018 \u0003?Q\u0013\u0018M\\:bGRLwN\\'be.,'o\u00115b]:,G.T1oC\u001e,'/A\bdQ\u0006tg.\u001a7NC:\fw-\u001a:!\u0003%iwnY6DC\u000eDW\r\u0006\u0002\u0002\u0018\u0006\u00013\u000f[8vY\u0012|e\u000e\\=Xe&$X\r\u0016=o\u0007>l\u0007\u000f\\3uS>twJ\\2fQ\r\t\u0014Q\u0019\t\u0005\u0003\u000f\f).\u0004\u0002\u0002J*!\u00111ZAg\u0003\r\t\u0007/\u001b\u0006\u0005\u0003\u001f\f\t.A\u0004kkBLG/\u001a:\u000b\u0007\u0005M'-A\u0003kk:LG/\u0003\u0003\u0002X\u0006%'\u0001\u0002+fgR\fqf\u001d5pk2$w)\u001a8fe\u0006$X-R7qifl\u0015\r],iK:tuNU3rk\u0016\u001cHo](viN$\u0018M\u001c3j]\u001eD3AMAc\u0003)\u001a\bn\\;mI\u001e+g.\u001a:bi\u0016\u0014V-];fgR\u0004VM\u001d)beRLG/[8o!\u0016\u0014(I]8lKJD3aMAc\u00035\u001a\bn\\;mI:{GoR3oKJ\fG/\u001a(foJ+\u0017/^3tiNLeMU3rk\u0016\u001cH/\u00138GY&<\u0007\u000e\u001e\u0015\u0004i\u0005\u0015\u0017aJ:i_VdGmU6jaN+g\u000eZ'be.,'o],iK:dU-\u00193fe:{GOR8v]\u0012D3!NAc\u0003I\u001a\bn\\;mIN\u000bg/\u001a$pe2\u000bG/\u001a:XQ\u0016tG*Z1eKJ,fn\u001b8po:\u0014U\u000f\u001e(pi\u00063\u0018-\u001b7bE2,\u0007f\u0001\u001c\u0002F\u0006A4\u000f[8vY\u0012\u0014V-\\8wK6\u000b'o[3sg\u001a{'\u000f\u0016=o!\u0006\u0014H/\u001b;j_:<\u0006.\u001a8QCJ$\u0018\u000e^5p]\u0016k\u0017n\u001a:bi\u0016$\u0007fA\u001c\u0002F\u000694\u000f[8vY\u0012\u001cu.\u001c9mKR,\u0017\t\u001d9f]\u0012$v\u000eT8h\u001f:,e\u000e\u001a+y]^CWM\\*f]\u0012l\u0015M]6feN\u001cVoY2fK\u0012D3\u0001OAc\u0003U\u001a\bn\\;mI\u0006\u0013wN\u001d;BaB,g\u000e\u001a+p\u0019><wJ\\#oIRChn\u00165f]:{GoQ8pe\u0012Lg.\u0019;pe\u0016\u0013(o\u001c:)\u0007e\n)-\u0001 
tQ>,H\u000e\u001a*fiJL\u0018\t\u001d9f]\u0012$v\u000eT8h\u001f:,e\u000e\u001a+y]^CWM\\\"p_J$\u0017N\\1u_Jtu\u000e^!wC&d\u0017M\u00197f\u000bJ\u0014xN\u001d\u0015\u0004u\u0005\u0015\u0017!E2sK\u0006$X\rU5e\u000bJ\u0014xN]'baR!!\u0011\u0001B\f!!\u0011\u0019A!\u0003\u0003\u000e\tEQB\u0001B\u0003\u0015\u0011\u00119!!\u0004\u0002\tU$\u0018\u000e\\\u0005\u0005\u0005\u0017\u0011)AA\u0004ICNDW*\u00199\u0011\t\u0005\u001d!qB\u0005\u0005\u0003G\tI\u0001E\u0004\u0003\u0004\tMq.a#\n\t\tU!Q\u0001\u0002\u0004\u001b\u0006\u0004\bb\u0002B\rw\u0001\u0007\u00111R\u0001\u0007KJ\u0014xN]:\u0002;MDw.\u001e7e\u0007J,\u0017\r^3NKR\u0014\u0018nY:P]N#\u0018M\u001d;j]\u001eD3\u0001PAc\u0001")
/* loaded from: input_file:kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.class */
public class TransactionMarkerChannelManagerTest {
    // Mockito mocks for the collaborators referenced by the tests in this class.
    private final MetadataCache metadataCache = (MetadataCache) Mockito.mock(MetadataCache.class);
    private final NetworkClient networkClient = (NetworkClient) Mockito.mock(NetworkClient.class);
    private final TransactionStateManager txnStateManager = (TransactionStateManager) Mockito.mock(TransactionStateManager.class);
    // Two partitions (0 and 1) of the same topic, "topic1".
    private final TopicPartition partition1 = new TopicPartition("topic1", 0);
    private final TopicPartition partition2 = new TopicPartition("topic1", 1);
    // Two distinct broker nodes (ids 1 and 2) that the tests return as partition leaders.
    private final Node broker1 = new Node(1, "host", 10);
    private final Node broker2 = new Node(2, "otherhost", 10);
    // Identifiers of the two transactions used by the tests.
    private final String transactionalId1 = "txnId1";
    private final String transactionalId2 = "txnId2";
    private final long producerId1 = 0;
    private final long producerId2 = 1;
    private final short producerEpoch = (short) 0;
    private final short lastProducerEpoch = -1;
    // Values that mockCache() makes txnStateManager.partitionFor(...) return for
    // transactionalId1 and transactionalId2 respectively.
    private final int txnTopicPartition1 = 0;
    private final int txnTopicPartition2 = 1;
    private final int coordinatorEpoch = 0;
    private final int txnTimeoutMs = 0;
    private final TransactionResult txnResult = TransactionResult.COMMIT;
    // Metadata for transactionalId1, built with PrepareCommit$.MODULE$ and a mutable set holding
    // partition1 and partition2. These initialisers read the fields declared above through their
    // accessors, so the declaration order matters.
    // NOTE(review): the meaning of the repeated producerId argument and of the two trailing zeros
    // is defined by the TransactionMetadata constructor, which is not visible here - confirm.
    private final TransactionMetadata txnMetadata1 = new TransactionMetadata(transactionalId1(), producerId1(), producerId1(), producerEpoch(), lastProducerEpoch(), txnTimeoutMs(), PrepareCommit$.MODULE$, (Set) Set$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new TopicPartition[]{partition1(), partition2()})), 0, 0);
    // Metadata for transactionalId2; same construction, but its set holds only partition1.
    private final TransactionMetadata txnMetadata2 = new TransactionMetadata(transactionalId2(), producerId2(), producerId2(), producerEpoch(), lastProducerEpoch(), txnTimeoutMs(), PrepareCommit$.MODULE$, (Set) Set$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new TopicPartition[]{partition1()})), 0, 0);
    // Captor for the Function1<Errors, BoxedUnit> argument passed to appendTransactionToLog.
    private final ArgumentCaptor<Function1<Errors, BoxedUnit>> capturedErrorsCallback = ArgumentCaptor.forClass(Function1.class);
    private final MockTime time = new MockTime();
    // Declared final with no initialiser here; presumably assigned in a constructor that lies
    // outside this part of the file - confirm.
    private final TransactionMarkerChannelManager channelManager;

    /** Returns the mocked {@code MetadataCache} held by this test instance. */
    private MetadataCache metadataCache() {
        return metadataCache;
    }

    /** Returns the mocked {@code NetworkClient} held by this test instance. */
    private NetworkClient networkClient() {
        return networkClient;
    }

    /** Returns the mocked {@code TransactionStateManager} held by this test instance. */
    private TransactionStateManager txnStateManager() {
        return txnStateManager;
    }

    /** Returns the first data partition fixture (partition 0 of "topic1"). */
    private TopicPartition partition1() {
        return partition1;
    }

    /** Returns the second data partition fixture (partition 1 of "topic1"). */
    private TopicPartition partition2() {
        return partition2;
    }

    /** Returns the first broker node fixture (id 1). */
    private Node broker1() {
        return broker1;
    }

    /** Returns the second broker node fixture (id 2). */
    private Node broker2() {
        return broker2;
    }

    /** Returns the transactional id of the first test transaction. */
    private String transactionalId1() {
        return transactionalId1;
    }

    /** Returns the transactional id of the second test transaction. */
    private String transactionalId2() {
        return transactionalId2;
    }

    /** Returns the producer id used for the first test transaction. */
    private long producerId1() {
        return producerId1;
    }

    /** Returns the producer id used for the second test transaction. */
    private long producerId2() {
        return producerId2;
    }

    /** Returns the producer epoch fixture shared by both test transactions. */
    private short producerEpoch() {
        return producerEpoch;
    }

    /** Returns the last-producer-epoch fixture shared by both test transactions. */
    private short lastProducerEpoch() {
        return lastProducerEpoch;
    }

    /** Returns the partition number that {@code mockCache()} associates with transactionalId1. */
    private int txnTopicPartition1() {
        return txnTopicPartition1;
    }

    /** Returns the partition number that {@code mockCache()} associates with transactionalId2. */
    private int txnTopicPartition2() {
        return txnTopicPartition2;
    }

    /** Returns the coordinator epoch fixture used throughout the tests. */
    private int coordinatorEpoch() {
        return coordinatorEpoch;
    }

    /** Returns the transaction timeout fixture passed to the test transaction metadata. */
    private int txnTimeoutMs() {
        return txnTimeoutMs;
    }

    /** Returns the transaction result fixture ({@code TransactionResult.COMMIT}). */
    private TransactionResult txnResult() {
        return txnResult;
    }

    /** Returns the metadata fixture for transactionalId1 (covers partition1 and partition2). */
    private TransactionMetadata txnMetadata1() {
        return txnMetadata1;
    }

    /** Returns the metadata fixture for transactionalId2 (covers partition1 only). */
    private TransactionMetadata txnMetadata2() {
        return txnMetadata2;
    }

    /** Returns the captor used to grab the {@code Function1<Errors, BoxedUnit>} callback argument. */
    private ArgumentCaptor<Function1<Errors, BoxedUnit>> capturedErrorsCallback() {
        return capturedErrorsCallback;
    }

    /** Returns the {@code MockTime} instance owned by this test. */
    private MockTime time() {
        return time;
    }

    /** Returns the {@code TransactionMarkerChannelManager} instance the tests in this class exercise. */
    private TransactionMarkerChannelManager channelManager() {
        return channelManager;
    }

    /**
     * Stubs the mocked transaction state manager for both test transactions:
     * {@code partitionFor} yields txnTopicPartition1 / txnTopicPartition2, and
     * {@code getTransactionState} yields a {@code Right(Some(...))} wrapping the coordinator epoch
     * together with the matching transaction metadata.
     */
    private void mockCache() {
        // Plain value objects; building them up front does not touch any mock.
        CoordinatorEpochAndTxnMetadata firstState = new CoordinatorEpochAndTxnMetadata(coordinatorEpoch(), txnMetadata1());
        CoordinatorEpochAndTxnMetadata secondState = new CoordinatorEpochAndTxnMetadata(coordinatorEpoch(), txnMetadata2());

        Mockito.when(BoxesRunTime.boxToInteger(txnStateManager().partitionFor(transactionalId1())))
                .thenReturn(BoxesRunTime.boxToInteger(txnTopicPartition1()));
        Mockito.when(BoxesRunTime.boxToInteger(txnStateManager().partitionFor(transactionalId2())))
                .thenReturn(BoxesRunTime.boxToInteger(txnTopicPartition2()));

        Mockito.when(txnStateManager().getTransactionState((String) ArgumentMatchers.eq(transactionalId1())))
                .thenReturn(package$.MODULE$.Right().apply(new Some(firstState)));
        Mockito.when(txnStateManager().getTransactionState((String) ArgumentMatchers.eq(transactionalId2())))
                .thenReturn(package$.MODULE$.Right().apply(new Some(secondState)));
    }

    /**
     * Adds the markers for txnMetadata2 from a single-thread executor while this thread holds
     * txnMetadata2's lock, polls until the expected WriteTxnMarkers exchange has been observed,
     * and then verifies the {@code appendTransactionToLog} call on the mocked state manager
     * (Mockito's default verification mode expects exactly one invocation).
     *
     * <p>The lock release and executor shutdown happen exactly once, in a {@code finally} block,
     * and the assertions run after it. Previously the assertions ran inside the {@code try} after
     * the lock had already been released, so a failing assertion reached a handler that unlocked
     * a second time. If the lock is a {@code ReentrantLock} (presumably - confirm against
     * {@code TransactionMetadata}), that second unlock throws
     * {@code IllegalMonitorStateException} and hides the real failure.
     */
    @Test
    public void shouldOnlyWriteTxnCompletionOnce() {
        mockCache();
        TxnTransitMetadata prepareComplete = txnMetadata2().prepareComplete(time().milliseconds());
        Mockito.when(metadataCache().getPartitionLeaderEndpoint((String) ArgumentMatchers.eq(partition1().topic()), ArgumentMatchers.eq(partition1().partition()), (ListenerName) ArgumentMatchers.any())).thenReturn(new Some(broker1()));

        // Stubbing of a Unit-returning method: the mock invocation comes first and Mockito.when
        // then attaches the answer to that most recent invocation.
        txnStateManager().appendTransactionToLog((String) ArgumentMatchers.eq(transactionalId2()), ArgumentMatchers.eq(coordinatorEpoch()), (TxnTransitMetadata) ArgumentMatchers.eq(prepareComplete), (Function1) capturedErrorsCallback().capture(), (Function1) ArgumentMatchers.any(), (RequestLocal) ArgumentMatchers.any());
        Mockito.when(BoxedUnit.UNIT).thenAnswer(invocationOnMock -> {
            $anonfun$shouldOnlyWriteTxnCompletionOnce$1(this, prepareComplete, invocationOnMock);
            return BoxedUnit.UNIT;
        });

        final long maxWaitMs = 15000L;
        final long pauseMs = 100L;
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(1);
        Future submit;
        txnMetadata2().lock().lock();
        try {
            submit = newFixedThreadPool.submit(() -> {
                return Try$.MODULE$.apply(() -> {
                    this.channelManager().addTxnMarkersToSend(this.coordinatorEpoch(), this.txnResult(), this.txnMetadata2(), prepareComplete);
                });
            });
            ClientResponse clientResponse = new ClientResponse(new RequestHeader(ApiKeys.WRITE_TXN_MARKERS, (short) 0, "client", 1), (RequestCompletionHandler) null, (String) null, time().milliseconds(), time().milliseconds(), false, (UnsupportedVersionException) null, (AuthenticationException) null, new WriteTxnMarkersResponse(Collections.singletonMap(Predef$.MODULE$.long2Long(producerId2()), Collections.singletonMap(partition1(), Errors.NONE))));

            // Poll until the condition holds, failing once the overall wait budget is exceeded.
            long currentTimeMillis = System.currentTimeMillis();
            while (!$anonfun$shouldOnlyWriteTxnCompletionOnce$4(this, clientResponse)) {
                if (System.currentTimeMillis() > currentTimeMillis + maxWaitMs) {
                    Assertions.fail("Timed out waiting for expected WriteTxnMarkers request");
                }
                Thread.sleep(Math.min(maxWaitMs, pauseMs));
            }
        } finally {
            // Single release point for both the normal and the exceptional path.
            txnMetadata2().lock().unlock();
            newFixedThreadPool.shutdown();
        }

        Assertions.assertNotNull(submit);
        Assertions.assertTrue(((Try) submit.get()).isSuccess(), new StringBuilder(38).append("Add marker task failed with exception ").append(((Try) submit.get()).get()).toString());
        ((TransactionStateManager) Mockito.verify(txnStateManager())).appendTransactionToLog((String) ArgumentMatchers.eq(transactionalId2()), ArgumentMatchers.eq(coordinatorEpoch()), (TxnTransitMetadata) ArgumentMatchers.eq(prepareComplete), (Function1) capturedErrorsCallback().capture(), (Function1) ArgumentMatchers.any(), (RequestLocal) ArgumentMatchers.any());
    }

    /** With no markers added, {@code generateRequests()} must yield an empty collection. */
    @Test
    public void shouldGenerateEmptyMapWhenNoRequestsOutstanding() {
        boolean nothingToSend = channelManager().generateRequests().isEmpty();
        Assertions.assertTrue(nothingToSend);
    }

    /**
     * With partition1 led by broker1 and partition2 led by broker2, adding markers for both
     * transactions must queue them per broker and per transaction-topic partition, and
     * {@code generateRequests()} must then produce one WriteTxnMarkers request per broker.
     * A second {@code generateRequests()} call must return nothing.
     */
    @Test
    public void shouldGenerateRequestPerPartitionPerBroker() {
        mockCache();

        Some leaderOfPartition1 = new Some(broker1());
        Some leaderOfPartition2 = new Some(broker2());
        Mockito.when(metadataCache().getPartitionLeaderEndpoint((String) ArgumentMatchers.eq(partition1().topic()), ArgumentMatchers.eq(partition1().partition()), (ListenerName) ArgumentMatchers.any()))
                .thenReturn(leaderOfPartition1);
        Mockito.when(metadataCache().getPartitionLeaderEndpoint((String) ArgumentMatchers.eq(partition2().topic()), ArgumentMatchers.eq(partition2().partition()), (ListenerName) ArgumentMatchers.any()))
                .thenReturn(leaderOfPartition2);

        // Each transition is prepared immediately before its markers are enqueued.
        TxnTransitMetadata firstTransit = txnMetadata1().prepareComplete(time().milliseconds());
        channelManager().addTxnMarkersToSend(coordinatorEpoch(), txnResult(), txnMetadata1(), firstTransit);
        TxnTransitMetadata secondTransit = txnMetadata2().prepareComplete(time().milliseconds());
        channelManager().addTxnMarkersToSend(coordinatorEpoch(), txnResult(), txnMetadata2(), secondTransit);

        int firstBrokerId = broker1().id();
        int secondBrokerId = broker2().id();
        Assertions.assertEquals(2, channelManager().numTxnsWithPendingMarkers());
        Assertions.assertEquals(2, ((TxnMarkerQueue) channelManager().queueForBroker(firstBrokerId).get()).totalNumMarkers());
        Assertions.assertEquals(1, ((TxnMarkerQueue) channelManager().queueForBroker(firstBrokerId).get()).totalNumMarkers(txnTopicPartition1()));
        Assertions.assertEquals(1, ((TxnMarkerQueue) channelManager().queueForBroker(firstBrokerId).get()).totalNumMarkers(txnTopicPartition2()));
        Assertions.assertEquals(1, ((TxnMarkerQueue) channelManager().queueForBroker(secondBrokerId).get()).totalNumMarkers());
        Assertions.assertEquals(1, ((TxnMarkerQueue) channelManager().queueForBroker(secondBrokerId).get()).totalNumMarkers(txnTopicPartition1()));
        Assertions.assertEquals(0, ((TxnMarkerQueue) channelManager().queueForBroker(secondBrokerId).get()).totalNumMarkers(txnTopicPartition2()));

        // Expected requests: broker1 gets both producers' markers for partition1,
        // broker2 gets producer 1's marker for partition2.
        WriteTxnMarkersRequest expectedForBroker1 = new WriteTxnMarkersRequest.Builder(
                ApiKeys.WRITE_TXN_MARKERS.latestVersion(),
                Arrays.asList(
                        new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1(), producerEpoch(), coordinatorEpoch(), txnResult(), Arrays.asList(partition1())),
                        new WriteTxnMarkersRequest.TxnMarkerEntry(producerId2(), producerEpoch(), coordinatorEpoch(), txnResult(), Arrays.asList(partition1()))))
                .build();
        WriteTxnMarkersRequest expectedForBroker2 = new WriteTxnMarkersRequest.Builder(
                ApiKeys.WRITE_TXN_MARKERS.latestVersion(),
                Arrays.asList(
                        new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1(), producerEpoch(), coordinatorEpoch(), txnResult(), Arrays.asList(partition2()))))
                .build();
        Object expectedByBroker = Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{
                Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(broker1()), expectedForBroker1),
                Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(broker2()), expectedForBroker2)}));

        Object actualByBroker = ((IterableOnceOps) channelManager().generateRequests().map(
                handler -> new Tuple2(handler.destination(), handler.request().build())))
                .toMap($less$colon$less$.MODULE$.refl());
        Assertions.assertEquals(expectedByBroker, actualByBroker);

        boolean drained = channelManager().generateRequests().isEmpty();
        Assertions.assertTrue(drained);
    }

    /**
     * While broker1 reports in-flight requests and broker2 has an entry in
     * {@code unsentRequests()}, {@code generateRequests()} must return nothing. After both
     * conditions are lifted, it must return the per-broker WriteTxnMarkers requests once, and
     * nothing on the following call.
     */
    @Test
    public void shouldNotGenerateNewRequestsIfRequestInFlight() {
        Some leaderOfPartition1 = new Some(broker1());
        Some leaderOfPartition2 = new Some(broker2());
        Mockito.when(metadataCache().getPartitionLeaderEndpoint((String) ArgumentMatchers.eq(partition1().topic()), ArgumentMatchers.eq(partition1().partition()), (ListenerName) ArgumentMatchers.any()))
                .thenReturn(leaderOfPartition1);
        Mockito.when(metadataCache().getPartitionLeaderEndpoint((String) ArgumentMatchers.eq(partition2().topic()), ArgumentMatchers.eq(partition2().partition()), (ListenerName) ArgumentMatchers.any()))
                .thenReturn(leaderOfPartition2);

        // Block broker1 via the network client and broker2 via an already-queued request.
        Mockito.when(BoxesRunTime.boxToBoolean(networkClient().hasInFlightRequests((String) ArgumentMatchers.eq(broker1().idString()))))
                .thenReturn(BoxesRunTime.boxToBoolean(true));
        ClientRequest queuedForBroker2 = new ClientRequest(broker2().idString(), (AbstractRequest.Builder) null, 0, "0", time().milliseconds(), true, 10000, (RequestCompletionHandler) null, (Optional) null);
        channelManager().unsentRequests().put(broker2(), queuedForBroker2);

        TxnTransitMetadata firstTransit = txnMetadata1().prepareComplete(time().milliseconds());
        channelManager().addTxnMarkersToSend(coordinatorEpoch(), txnResult(), txnMetadata1(), firstTransit);
        TxnTransitMetadata secondTransit = txnMetadata2().prepareComplete(time().milliseconds());
        channelManager().addTxnMarkersToSend(coordinatorEpoch(), txnResult(), txnMetadata2(), secondTransit);

        boolean blocked = channelManager().generateRequests().isEmpty();
        Assertions.assertTrue(blocked);

        // Lift both blocks.
        Mockito.when(BoxesRunTime.boxToBoolean(networkClient().hasInFlightRequests((String) ArgumentMatchers.eq(broker1().idString()))))
                .thenReturn(BoxesRunTime.boxToBoolean(false));
        channelManager().unsentRequests().clearUnsentRequests(broker2());

        WriteTxnMarkersRequest expectedForBroker1 = new WriteTxnMarkersRequest.Builder(
                ApiKeys.WRITE_TXN_MARKERS.latestVersion(),
                Arrays.asList(
                        new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1(), producerEpoch(), coordinatorEpoch(), txnResult(), Arrays.asList(partition1())),
                        new WriteTxnMarkersRequest.TxnMarkerEntry(producerId2(), producerEpoch(), coordinatorEpoch(), txnResult(), Arrays.asList(partition1()))))
                .build();
        WriteTxnMarkersRequest expectedForBroker2 = new WriteTxnMarkersRequest.Builder(
                ApiKeys.WRITE_TXN_MARKERS.latestVersion(),
                Arrays.asList(
                        new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1(), producerEpoch(), coordinatorEpoch(), txnResult(), Arrays.asList(partition2()))))
                .build();
        Object expectedByBroker = Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{
                Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(broker1()), expectedForBroker1),
                Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(broker2()), expectedForBroker2)}));

        Object actualByBroker = ((IterableOnceOps) channelManager().generateRequests().map(
                handler -> new Tuple2(handler.destination(), handler.request().build())))
                .toMap($less$colon$less$.MODULE$.refl());
        Assertions.assertEquals(expectedByBroker, actualByBroker);

        boolean drained = channelManager().generateRequests().isEmpty();
        Assertions.assertTrue(drained);
    }

    /**
     * When the metadata cache returns {@code None} as partition1's leader and broker2 as
     * partition2's leader, adding markers for txnMetadata1 must queue a single marker for
     * broker2 and create no queue for broker1.
     */
    @Test
    public void shouldSkipSendMarkersWhenLeaderNotFound() {
        mockCache();

        Mockito.when(metadataCache().getPartitionLeaderEndpoint((String) ArgumentMatchers.eq(partition1().topic()), ArgumentMatchers.eq(partition1().partition()), (ListenerName) ArgumentMatchers.any()))
                .thenReturn(None$.MODULE$);
        Some leaderOfPartition2 = new Some(broker2());
        Mockito.when(metadataCache().getPartitionLeaderEndpoint((String) ArgumentMatchers.eq(partition2().topic()), ArgumentMatchers.eq(partition2().partition()), (ListenerName) ArgumentMatchers.any()))
                .thenReturn(leaderOfPartition2);

        TxnTransitMetadata transit = txnMetadata1().prepareComplete(time().milliseconds());
        channelManager().addTxnMarkersToSend(coordinatorEpoch(), txnResult(), txnMetadata1(), transit);

        int firstBrokerId = broker1().id();
        int secondBrokerId = broker2().id();
        Assertions.assertEquals(1, channelManager().numTxnsWithPendingMarkers());
        Assertions.assertEquals(1, ((TxnMarkerQueue) channelManager().queueForBroker(secondBrokerId).get()).totalNumMarkers());
        boolean noQueueForBroker1 = channelManager().queueForBroker(firstBrokerId).isEmpty();
        Assertions.assertTrue(noQueueForBroker1);
        Assertions.assertEquals(1, ((TxnMarkerQueue) channelManager().queueForBroker(secondBrokerId).get()).totalNumMarkers(txnTopicPartition1()));
        Assertions.assertEquals(0, ((TxnMarkerQueue) channelManager().queueForBroker(secondBrokerId).get()).totalNumMarkers(txnTopicPartition2()));
    }

    /**
     * The metadata cache returns {@code Node.noNode()} as partition1's leader for the first four
     * lookups and broker1 afterwards; partition2 is always led by broker2. After adding markers
     * for both transactions, the partition1 markers must sit in the unknown-broker queue. The
     * first {@code generateRequests()} call must yield only broker2's request and the second
     * call must yield broker1's request.
     */
    @Test
    public void shouldSaveForLaterWhenLeaderUnknownButNotAvailable() {
        mockCache();

        Mockito.when(metadataCache().getPartitionLeaderEndpoint((String) ArgumentMatchers.eq(partition1().topic()), ArgumentMatchers.eq(partition1().partition()), (ListenerName) ArgumentMatchers.any()))
                .thenReturn(new Some(Node.noNode()))
                .thenReturn(new Some(Node.noNode()))
                .thenReturn(new Some(Node.noNode()))
                .thenReturn(new Some(Node.noNode()))
                .thenReturn(new Some(broker1()))
                .thenReturn(new Some(broker1()));
        Mockito.when(metadataCache().getPartitionLeaderEndpoint((String) ArgumentMatchers.eq(partition2().topic()), ArgumentMatchers.eq(partition2().partition()), (ListenerName) ArgumentMatchers.any()))
                .thenReturn(new Some(broker2()));

        TxnTransitMetadata firstTransit = txnMetadata1().prepareComplete(time().milliseconds());
        channelManager().addTxnMarkersToSend(coordinatorEpoch(), txnResult(), txnMetadata1(), firstTransit);
        TxnTransitMetadata secondTransit = txnMetadata2().prepareComplete(time().milliseconds());
        channelManager().addTxnMarkersToSend(coordinatorEpoch(), txnResult(), txnMetadata2(), secondTransit);

        int firstBrokerId = broker1().id();
        int secondBrokerId = broker2().id();
        Assertions.assertEquals(2, channelManager().numTxnsWithPendingMarkers());
        Assertions.assertEquals(1, ((TxnMarkerQueue) channelManager().queueForBroker(secondBrokerId).get()).totalNumMarkers());
        boolean noQueueForBroker1 = channelManager().queueForBroker(firstBrokerId).isEmpty();
        Assertions.assertTrue(noQueueForBroker1);
        Assertions.assertEquals(1, ((TxnMarkerQueue) channelManager().queueForBroker(secondBrokerId).get()).totalNumMarkers(txnTopicPartition1()));
        Assertions.assertEquals(0, ((TxnMarkerQueue) channelManager().queueForBroker(secondBrokerId).get()).totalNumMarkers(txnTopicPartition2()));

        // Markers whose leader is not yet known are parked in the unknown-broker queue.
        Assertions.assertEquals(2, channelManager().queueForUnknownBroker().totalNumMarkers());
        Assertions.assertEquals(1, channelManager().queueForUnknownBroker().totalNumMarkers(txnTopicPartition1()));
        Assertions.assertEquals(1, channelManager().queueForUnknownBroker().totalNumMarkers(txnTopicPartition2()));

        WriteTxnMarkersRequest expectedForBroker1 = new WriteTxnMarkersRequest.Builder(
                ApiKeys.WRITE_TXN_MARKERS.latestVersion(),
                Arrays.asList(
                        new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1(), producerEpoch(), coordinatorEpoch(), txnResult(), Arrays.asList(partition1())),
                        new WriteTxnMarkersRequest.TxnMarkerEntry(producerId2(), producerEpoch(), coordinatorEpoch(), txnResult(), Arrays.asList(partition1()))))
                .build();
        WriteTxnMarkersRequest expectedForBroker2 = new WriteTxnMarkersRequest.Builder(
                ApiKeys.WRITE_TXN_MARKERS.latestVersion(),
                Arrays.asList(
                        new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1(), producerEpoch(), coordinatorEpoch(), txnResult(), Arrays.asList(partition2()))))
                .build();

        // First round: only broker2's request is produced.
        Object expectedFirstRound = Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{
                Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(broker2()), expectedForBroker2)}));
        Object actualFirstRound = ((IterableOnceOps) channelManager().generateRequests().map(
                handler -> new Tuple2(handler.destination(), handler.request().build())))
                .toMap($less$colon$less$.MODULE$.refl());
        Assertions.assertEquals(expectedFirstRound, actualFirstRound);

        // Second round: broker1's request is produced.
        Object expectedSecondRound = Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{
                Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(broker1()), expectedForBroker1)}));
        Object actualSecondRound = ((IterableOnceOps) channelManager().generateRequests().map(
                handler -> new Tuple2(handler.destination(), handler.request().build())))
                .toMap($less$colon$less$.MODULE$.refl());
        Assertions.assertEquals(expectedSecondRound, actualSecondRound);
    }

    /**
     * After markers for both transactions are queued (partition1 on broker1, partition2 on
     * broker2), calling {@code removeMarkersForTxnTopicPartition(txnTopicPartition1())} must
     * leave only the markers that belong to txnTopicPartition2.
     */
    @Test
    public void shouldRemoveMarkersForTxnPartitionWhenPartitionEmigrated() {
        mockCache();

        Some leaderOfPartition1 = new Some(broker1());
        Some leaderOfPartition2 = new Some(broker2());
        Mockito.when(metadataCache().getPartitionLeaderEndpoint((String) ArgumentMatchers.eq(partition1().topic()), ArgumentMatchers.eq(partition1().partition()), (ListenerName) ArgumentMatchers.any()))
                .thenReturn(leaderOfPartition1);
        Mockito.when(metadataCache().getPartitionLeaderEndpoint((String) ArgumentMatchers.eq(partition2().topic()), ArgumentMatchers.eq(partition2().partition()), (ListenerName) ArgumentMatchers.any()))
                .thenReturn(leaderOfPartition2);

        TxnTransitMetadata firstTransit = txnMetadata1().prepareComplete(time().milliseconds());
        channelManager().addTxnMarkersToSend(coordinatorEpoch(), txnResult(), txnMetadata1(), firstTransit);
        TxnTransitMetadata secondTransit = txnMetadata2().prepareComplete(time().milliseconds());
        channelManager().addTxnMarkersToSend(coordinatorEpoch(), txnResult(), txnMetadata2(), secondTransit);

        int firstBrokerId = broker1().id();
        int secondBrokerId = broker2().id();

        // State before the removal.
        Assertions.assertEquals(2, channelManager().numTxnsWithPendingMarkers());
        Assertions.assertEquals(2, ((TxnMarkerQueue) channelManager().queueForBroker(firstBrokerId).get()).totalNumMarkers());
        Assertions.assertEquals(1, ((TxnMarkerQueue) channelManager().queueForBroker(firstBrokerId).get()).totalNumMarkers(txnTopicPartition1()));
        Assertions.assertEquals(1, ((TxnMarkerQueue) channelManager().queueForBroker(firstBrokerId).get()).totalNumMarkers(txnTopicPartition2()));
        Assertions.assertEquals(1, ((TxnMarkerQueue) channelManager().queueForBroker(secondBrokerId).get()).totalNumMarkers());
        Assertions.assertEquals(1, ((TxnMarkerQueue) channelManager().queueForBroker(secondBrokerId).get()).totalNumMarkers(txnTopicPartition1()));
        Assertions.assertEquals(0, ((TxnMarkerQueue) channelManager().queueForBroker(secondBrokerId).get()).totalNumMarkers(txnTopicPartition2()));

        channelManager().removeMarkersForTxnTopicPartition(txnTopicPartition1());

        // State after the removal.
        Assertions.assertEquals(1, channelManager().numTxnsWithPendingMarkers());
        Assertions.assertEquals(1, ((TxnMarkerQueue) channelManager().queueForBroker(firstBrokerId).get()).totalNumMarkers());
        Assertions.assertEquals(0, ((TxnMarkerQueue) channelManager().queueForBroker(firstBrokerId).get()).totalNumMarkers(txnTopicPartition1()));
        Assertions.assertEquals(1, ((TxnMarkerQueue) channelManager().queueForBroker(firstBrokerId).get()).totalNumMarkers(txnTopicPartition2()));
        Assertions.assertEquals(0, ((TxnMarkerQueue) channelManager().queueForBroker(secondBrokerId).get()).totalNumMarkers());
        Assertions.assertEquals(0, ((TxnMarkerQueue) channelManager().queueForBroker(secondBrokerId).get()).totalNumMarkers(txnTopicPartition1()));
        Assertions.assertEquals(0, ((TxnMarkerQueue) channelManager().queueForBroker(secondBrokerId).get()).totalNumMarkers(txnTopicPartition2()));
    }

    /**
     * Verifies the success path: every generated WriteTxnMarkers request is completed with a
     * response built from {@code Errors.NONE}, the stubbed transaction-log append completes the
     * transition and reports {@code Errors.NONE}, and afterwards no markers are pending and
     * {@code txnMetadata2()} is in {@code CompleteCommit} with no pending state.
     */
    @Test
    public void shouldCompleteAppendToLogOnEndTxnWhenSendMarkersSucceed() {
        mockCache();
        Mockito.when(metadataCache().getPartitionLeaderEndpoint((String) ArgumentMatchers.eq(partition1().topic()), ArgumentMatchers.eq(partition1().partition()), (ListenerName) ArgumentMatchers.any())).thenReturn(new Some(broker1()));
        Mockito.when(metadataCache().getPartitionLeaderEndpoint((String) ArgumentMatchers.eq(partition2().topic()), ArgumentMatchers.eq(partition2().partition()), (ListenerName) ArgumentMatchers.any())).thenReturn(new Some(broker2()));
        TxnTransitMetadata prepareComplete = txnMetadata2().prepareComplete(time().milliseconds());

        // appendTransactionToLog produces no value that could be handed to Mockito.when(...), so it
        // is stubbed with doAnswer(...).when(mock). The previous form invoked the mock and then
        // called when(BoxedUnit.UNIT), which only worked through Mockito's "last invocation" state.
        Mockito.doAnswer(invocationOnMock -> {
            $anonfun$shouldCompleteAppendToLogOnEndTxnWhenSendMarkersSucceed$1(this, prepareComplete, invocationOnMock);
            return BoxedUnit.UNIT;
        }).when(txnStateManager()).appendTransactionToLog((String) ArgumentMatchers.eq(transactionalId2()), ArgumentMatchers.eq(coordinatorEpoch()), (TxnTransitMetadata) ArgumentMatchers.eq(prepareComplete), (Function1) capturedErrorsCallback().capture(), (Function1) ArgumentMatchers.any(), (RequestLocal) ArgumentMatchers.any());

        channelManager().addTxnMarkersToSend(coordinatorEpoch(), txnResult(), txnMetadata2(), prepareComplete);
        Iterable generateRequests = channelManager().generateRequests();
        WriteTxnMarkersResponse writeTxnMarkersResponse = new WriteTxnMarkersResponse(createPidErrorMap(Errors.NONE));
        // Complete every generated request with the same successful response.
        generateRequests.foreach(requestAndCompletionHandler -> {
            $anonfun$shouldCompleteAppendToLogOnEndTxnWhenSendMarkersSucceed$2(writeTxnMarkersResponse, requestAndCompletionHandler);
            return BoxedUnit.UNIT;
        });

        ((TransactionStateManager) Mockito.verify(txnStateManager())).appendTransactionToLog((String) ArgumentMatchers.eq(transactionalId2()), ArgumentMatchers.eq(coordinatorEpoch()), (TxnTransitMetadata) ArgumentMatchers.eq(prepareComplete), (Function1) capturedErrorsCallback().capture(), (Function1) ArgumentMatchers.any(), (RequestLocal) ArgumentMatchers.any());
        Assertions.assertEquals(0, channelManager().numTxnsWithPendingMarkers());
        Assertions.assertEquals(0, ((TxnMarkerQueue) channelManager().queueForBroker(broker1().id()).get()).totalNumMarkers());
        Assertions.assertEquals(None$.MODULE$, txnMetadata2().pendingState());
        Assertions.assertEquals(CompleteCommit$.MODULE$, txnMetadata2().state());
    }

    /**
     * Verifies the abort path: the stubbed transaction-log append clears the pending state of
     * {@code txnMetadata2()} and reports {@code Errors.NOT_COORDINATOR}. Afterwards the append has
     * been attempted once, no markers are pending, and {@code txnMetadata2()} is still in
     * {@code PrepareCommit}.
     */
    @Test
    public void shouldAbortAppendToLogOnEndTxnWhenNotCoordinatorError() {
        mockCache();
        Mockito.when(metadataCache().getPartitionLeaderEndpoint((String) ArgumentMatchers.eq(partition1().topic()), ArgumentMatchers.eq(partition1().partition()), (ListenerName) ArgumentMatchers.any())).thenReturn(new Some(broker1()));
        Mockito.when(metadataCache().getPartitionLeaderEndpoint((String) ArgumentMatchers.eq(partition2().topic()), ArgumentMatchers.eq(partition2().partition()), (ListenerName) ArgumentMatchers.any())).thenReturn(new Some(broker2()));
        TxnTransitMetadata prepareComplete = txnMetadata2().prepareComplete(time().milliseconds());

        // appendTransactionToLog produces no value that could be handed to Mockito.when(...), so it
        // is stubbed with doAnswer(...).when(mock). The previous form invoked the mock and then
        // called when(BoxedUnit.UNIT), which only worked through Mockito's "last invocation" state.
        Mockito.doAnswer(invocationOnMock -> {
            $anonfun$shouldAbortAppendToLogOnEndTxnWhenNotCoordinatorError$1(this, invocationOnMock);
            return BoxedUnit.UNIT;
        }).when(txnStateManager()).appendTransactionToLog((String) ArgumentMatchers.eq(transactionalId2()), ArgumentMatchers.eq(coordinatorEpoch()), (TxnTransitMetadata) ArgumentMatchers.eq(prepareComplete), (Function1) capturedErrorsCallback().capture(), (Function1) ArgumentMatchers.any(), (RequestLocal) ArgumentMatchers.any());

        channelManager().addTxnMarkersToSend(coordinatorEpoch(), txnResult(), txnMetadata2(), prepareComplete);
        Iterable generateRequests = channelManager().generateRequests();
        WriteTxnMarkersResponse writeTxnMarkersResponse = new WriteTxnMarkersResponse(createPidErrorMap(Errors.NONE));
        // The marker writes themselves succeed; only the log append reports an error.
        generateRequests.foreach(requestAndCompletionHandler -> {
            $anonfun$shouldAbortAppendToLogOnEndTxnWhenNotCoordinatorError$2(writeTxnMarkersResponse, requestAndCompletionHandler);
            return BoxedUnit.UNIT;
        });

        ((TransactionStateManager) Mockito.verify(txnStateManager())).appendTransactionToLog((String) ArgumentMatchers.eq(transactionalId2()), ArgumentMatchers.eq(coordinatorEpoch()), (TxnTransitMetadata) ArgumentMatchers.eq(prepareComplete), (Function1) capturedErrorsCallback().capture(), (Function1) ArgumentMatchers.any(), (RequestLocal) ArgumentMatchers.any());
        Assertions.assertEquals(0, channelManager().numTxnsWithPendingMarkers());
        Assertions.assertEquals(0, ((TxnMarkerQueue) channelManager().queueForBroker(broker1().id()).get()).totalNumMarkers());
        Assertions.assertEquals(None$.MODULE$, txnMetadata2().pendingState());
        Assertions.assertEquals(PrepareCommit$.MODULE$, txnMetadata2().state());
    }

    /**
     * Verifies the retry path: the first stubbed transaction-log append reports
     * {@code Errors.COORDINATOR_NOT_AVAILABLE}, the second completes the transition and reports
     * {@code Errors.NONE}. Afterwards the append has been attempted twice, no markers are pending,
     * and {@code txnMetadata2()} is in {@code CompleteCommit} with no pending state.
     */
    @Test
    public void shouldRetryAppendToLogOnEndTxnWhenCoordinatorNotAvailableError() {
        mockCache();
        Mockito.when(metadataCache().getPartitionLeaderEndpoint((String) ArgumentMatchers.eq(partition1().topic()), ArgumentMatchers.eq(partition1().partition()), (ListenerName) ArgumentMatchers.any())).thenReturn(new Some(broker1()));
        Mockito.when(metadataCache().getPartitionLeaderEndpoint((String) ArgumentMatchers.eq(partition2().topic()), ArgumentMatchers.eq(partition2().partition()), (ListenerName) ArgumentMatchers.any())).thenReturn(new Some(broker2()));
        TxnTransitMetadata prepareComplete = txnMetadata2().prepareComplete(time().milliseconds());

        // appendTransactionToLog produces no value that could be handed to Mockito.when(...), so it
        // is stubbed with doAnswer(...).when(mock). The previous form invoked the mock and then
        // called when(BoxedUnit.UNIT), which only worked through Mockito's "last invocation" state.
        // Chained doAnswer calls are consumed in order: failure first, then success.
        Mockito.doAnswer(invocationOnMock -> {
            $anonfun$shouldRetryAppendToLogOnEndTxnWhenCoordinatorNotAvailableError$1(this, invocationOnMock);
            return BoxedUnit.UNIT;
        }).doAnswer(invocationOnMock2 -> {
            $anonfun$shouldRetryAppendToLogOnEndTxnWhenCoordinatorNotAvailableError$2(this, prepareComplete, invocationOnMock2);
            return BoxedUnit.UNIT;
        }).when(txnStateManager()).appendTransactionToLog((String) ArgumentMatchers.eq(transactionalId2()), ArgumentMatchers.eq(coordinatorEpoch()), (TxnTransitMetadata) ArgumentMatchers.eq(prepareComplete), (Function1) capturedErrorsCallback().capture(), (Function1) ArgumentMatchers.any(), (RequestLocal) ArgumentMatchers.any());

        channelManager().addTxnMarkersToSend(coordinatorEpoch(), txnResult(), txnMetadata2(), prepareComplete);
        Iterable generateRequests = channelManager().generateRequests();
        WriteTxnMarkersResponse writeTxnMarkersResponse = new WriteTxnMarkersResponse(createPidErrorMap(Errors.NONE));
        generateRequests.foreach(requestAndCompletionHandler -> {
            $anonfun$shouldRetryAppendToLogOnEndTxnWhenCoordinatorNotAvailableError$3(writeTxnMarkersResponse, requestAndCompletionHandler);
            return BoxedUnit.UNIT;
        });
        // Second pass; presumably this is what drives the retried append - confirm against
        // TransactionMarkerChannelManager.generateRequests.
        channelManager().generateRequests();

        ((TransactionStateManager) Mockito.verify(txnStateManager(), Mockito.times(2))).appendTransactionToLog((String) ArgumentMatchers.eq(transactionalId2()), ArgumentMatchers.eq(coordinatorEpoch()), (TxnTransitMetadata) ArgumentMatchers.eq(prepareComplete), (Function1) capturedErrorsCallback().capture(), (Function1) ArgumentMatchers.any(), (RequestLocal) ArgumentMatchers.any());
        Assertions.assertEquals(0, channelManager().numTxnsWithPendingMarkers());
        Assertions.assertEquals(0, ((TxnMarkerQueue) channelManager().queueForBroker(broker1().id()).get()).totalNumMarkers());
        Assertions.assertEquals(None$.MODULE$, txnMetadata2().pendingState());
        Assertions.assertEquals(CompleteCommit$.MODULE$, txnMetadata2().state());
    }

    /**
     * Builds the error map passed to the {@code WriteTxnMarkersResponse} constructor in these tests:
     * a single entry keyed by {@code producerId2()} whose value maps {@code partition1()} to the
     * given error.
     *
     * @param errors the error to associate with {@code partition1()}
     * @return a new map containing exactly one producer entry
     */
    private HashMap<Long, Map<TopicPartition, Errors>> createPidErrorMap(Errors errors) {
        // Typed rather than raw, so both put calls are checked by the compiler.
        Map<TopicPartition, Errors> partitionErrors = new HashMap<>();
        partitionErrors.put(partition1(), errors);

        HashMap<Long, Map<TopicPartition, Errors>> errorsByProducerId = new HashMap<>();
        errorsByProducerId.put(Predef$.MODULE$.long2Long(producerId2()), partitionErrors);
        return errorsByProducerId;
    }

    /**
     * Checks that the default Yammer metrics registry contains exactly one metric matching each of
     * the two MBean-name predicates ({@code UnknownDestinationQueueSize} and
     * {@code LogAppendRetryQueueSize}).
     */
    @Test
    public void shouldCreateMetricsOnStarting() {
        scala.collection.mutable.Map registeredMetrics =
                CollectionConverters$.MODULE$.MapHasAsScala(KafkaYammerMetrics.defaultRegistry().allMetrics()).asScala();

        int unknownDestinationMatches = registeredMetrics.count(entry ->
                BoxesRunTime.boxToBoolean($anonfun$shouldCreateMetricsOnStarting$1(entry)));
        Assertions.assertEquals(1, unknownDestinationMatches);

        int logAppendRetryMatches = registeredMetrics.count(entry ->
                BoxesRunTime.boxToBoolean($anonfun$shouldCreateMetricsOnStarting$2(entry)));
        Assertions.assertEquals(1, logAppendRetryMatches);
    }

    /**
     * Synthetic lambda body (named after {@code shouldOnlyWriteTxnCompletionOnce}): completes the
     * transition of {@code txnMetadata2()} to the given metadata, then invokes the captured errors
     * callback with {@code Errors.NONE}. The mock invocation argument is not used.
     */
    public static final /* synthetic */ void $anonfun$shouldOnlyWriteTxnCompletionOnce$1(TransactionMarkerChannelManagerTest transactionMarkerChannelManagerTest, TxnTransitMetadata txnTransitMetadata, InvocationOnMock invocationOnMock) {
        transactionMarkerChannelManagerTest.txnMetadata2().completeTransitionTo(txnTransitMetadata);
        Function1 errorsCallback = (Function1) transactionMarkerChannelManagerTest.capturedErrorsCallback().getValue();
        errorsCallback.apply(Errors.NONE);
    }

    /**
     * Synthetic lambda body (named after {@code shouldOnlyWriteTxnCompletionOnce}): asks the channel
     * manager for requests and, if there are any, asserts there is exactly one and completes its
     * handler with the given response.
     *
     * @return {@code true} if a request was generated and completed, {@code false} if none was
     *     generated
     */
    public static final /* synthetic */ boolean $anonfun$shouldOnlyWriteTxnCompletionOnce$4(TransactionMarkerChannelManagerTest transactionMarkerChannelManagerTest, ClientResponse clientResponse) {
        Iterable pendingRequests = transactionMarkerChannelManagerTest.channelManager().generateRequests();
        if (pendingRequests.nonEmpty()) {
            Assertions.assertEquals(1, pendingRequests.size());
            RequestAndCompletionHandler onlyRequest = (RequestAndCompletionHandler) pendingRequests.head();
            onlyRequest.handler().onComplete(clientResponse);
            return true;
        }
        return false;
    }

    /**
     * Synthetic lambda body (named after {@code shouldOnlyWriteTxnCompletionOnce}): supplies a fixed
     * message text. Presumably the failure message of a wait-until condition - confirm against the
     * caller.
     */
    public static final /* synthetic */ String $anonfun$shouldOnlyWriteTxnCompletionOnce$5() {
        final String timeoutMessage = "Timed out waiting for expected WriteTxnMarkers request";
        return timeoutMessage;
    }

    /**
     * Answer body for the stubbed {@code appendTransactionToLog} in
     * {@code shouldCompleteAppendToLogOnEndTxnWhenSendMarkersSucceed}: completes the transition of
     * {@code txnMetadata2()} to the given metadata, then invokes the captured errors callback with
     * {@code Errors.NONE}. The mock invocation argument is not used.
     */
    public static final /* synthetic */ void $anonfun$shouldCompleteAppendToLogOnEndTxnWhenSendMarkersSucceed$1(TransactionMarkerChannelManagerTest transactionMarkerChannelManagerTest, TxnTransitMetadata txnTransitMetadata, InvocationOnMock invocationOnMock) {
        transactionMarkerChannelManagerTest.txnMetadata2().completeTransitionTo(txnTransitMetadata);
        Function1 errorsCallback = (Function1) transactionMarkerChannelManagerTest.capturedErrorsCallback().getValue();
        errorsCallback.apply(Errors.NONE);
    }

    /**
     * Per-request body used by {@code shouldCompleteAppendToLogOnEndTxnWhenSendMarkersSucceed}:
     * wraps the given response in a {@code ClientResponse} with a {@code WRITE_TXN_MARKERS} request
     * header and passes it to the request's completion handler.
     */
    public static final /* synthetic */ void $anonfun$shouldCompleteAppendToLogOnEndTxnWhenSendMarkersSucceed$2(WriteTxnMarkersResponse writeTxnMarkersResponse, RequestAndCompletionHandler requestAndCompletionHandler) {
        RequestCompletionHandler completionHandler = requestAndCompletionHandler.handler();
        RequestHeader header = new RequestHeader(ApiKeys.WRITE_TXN_MARKERS, (short) 0, "client", 1);
        ClientResponse response = new ClientResponse(header, (RequestCompletionHandler) null, (String) null, 0L, 0L, false, (UnsupportedVersionException) null, (AuthenticationException) null, writeTxnMarkersResponse);
        completionHandler.onComplete(response);
    }

    /**
     * Answer body for the stubbed {@code appendTransactionToLog} in
     * {@code shouldAbortAppendToLogOnEndTxnWhenNotCoordinatorError}: resets the pending state of
     * {@code txnMetadata2()} to {@code None}, then invokes the captured errors callback with
     * {@code Errors.NOT_COORDINATOR}. The mock invocation argument is not used.
     */
    public static final /* synthetic */ void $anonfun$shouldAbortAppendToLogOnEndTxnWhenNotCoordinatorError$1(TransactionMarkerChannelManagerTest transactionMarkerChannelManagerTest, InvocationOnMock invocationOnMock) {
        transactionMarkerChannelManagerTest.txnMetadata2().pendingState_$eq(None$.MODULE$);
        Function1 errorsCallback = (Function1) transactionMarkerChannelManagerTest.capturedErrorsCallback().getValue();
        errorsCallback.apply(Errors.NOT_COORDINATOR);
    }

    /**
     * Per-request body used by {@code shouldAbortAppendToLogOnEndTxnWhenNotCoordinatorError}: wraps
     * the given response in a {@code ClientResponse} with a {@code WRITE_TXN_MARKERS} request header
     * and passes it to the request's completion handler.
     */
    public static final /* synthetic */ void $anonfun$shouldAbortAppendToLogOnEndTxnWhenNotCoordinatorError$2(WriteTxnMarkersResponse writeTxnMarkersResponse, RequestAndCompletionHandler requestAndCompletionHandler) {
        RequestCompletionHandler completionHandler = requestAndCompletionHandler.handler();
        RequestHeader header = new RequestHeader(ApiKeys.WRITE_TXN_MARKERS, (short) 0, "client", 1);
        ClientResponse response = new ClientResponse(header, (RequestCompletionHandler) null, (String) null, 0L, 0L, false, (UnsupportedVersionException) null, (AuthenticationException) null, writeTxnMarkersResponse);
        completionHandler.onComplete(response);
    }

    /**
     * First answer body for the stubbed {@code appendTransactionToLog} in
     * {@code shouldRetryAppendToLogOnEndTxnWhenCoordinatorNotAvailableError}: invokes the captured
     * errors callback with {@code Errors.COORDINATOR_NOT_AVAILABLE}. The mock invocation argument is
     * not used.
     */
    public static final /* synthetic */ void $anonfun$shouldRetryAppendToLogOnEndTxnWhenCoordinatorNotAvailableError$1(TransactionMarkerChannelManagerTest transactionMarkerChannelManagerTest, InvocationOnMock invocationOnMock) {
        Function1 errorsCallback = (Function1) transactionMarkerChannelManagerTest.capturedErrorsCallback().getValue();
        errorsCallback.apply(Errors.COORDINATOR_NOT_AVAILABLE);
    }

    /**
     * Second answer body for the stubbed {@code appendTransactionToLog} in
     * {@code shouldRetryAppendToLogOnEndTxnWhenCoordinatorNotAvailableError}: completes the
     * transition of {@code txnMetadata2()} to the given metadata, then invokes the captured errors
     * callback with {@code Errors.NONE}. The mock invocation argument is not used.
     */
    public static final /* synthetic */ void $anonfun$shouldRetryAppendToLogOnEndTxnWhenCoordinatorNotAvailableError$2(TransactionMarkerChannelManagerTest transactionMarkerChannelManagerTest, TxnTransitMetadata txnTransitMetadata, InvocationOnMock invocationOnMock) {
        transactionMarkerChannelManagerTest.txnMetadata2().completeTransitionTo(txnTransitMetadata);
        Function1 errorsCallback = (Function1) transactionMarkerChannelManagerTest.capturedErrorsCallback().getValue();
        errorsCallback.apply(Errors.NONE);
    }

    /**
     * Per-request body used by {@code shouldRetryAppendToLogOnEndTxnWhenCoordinatorNotAvailableError}:
     * wraps the given response in a {@code ClientResponse} with a {@code WRITE_TXN_MARKERS} request
     * header and passes it to the request's completion handler.
     */
    public static final /* synthetic */ void $anonfun$shouldRetryAppendToLogOnEndTxnWhenCoordinatorNotAvailableError$3(WriteTxnMarkersResponse writeTxnMarkersResponse, RequestAndCompletionHandler requestAndCompletionHandler) {
        RequestCompletionHandler completionHandler = requestAndCompletionHandler.handler();
        RequestHeader header = new RequestHeader(ApiKeys.WRITE_TXN_MARKERS, (short) 0, "client", 1);
        ClientResponse response = new ClientResponse(header, (RequestCompletionHandler) null, (String) null, 0L, 0L, false, (UnsupportedVersionException) null, (AuthenticationException) null, writeTxnMarkersResponse);
        completionHandler.onComplete(response);
    }

    /**
     * Predicate over a registry entry: true when the MBean name of the entry's first element (a
     * {@code MetricName}) equals the {@code UnknownDestinationQueueSize} name of
     * {@code TransactionMarkerChannelManager}.
     *
     * @throws MatchError if the tuple itself is {@code null}
     */
    public static final /* synthetic */ boolean $anonfun$shouldCreateMetricsOnStarting$1(Tuple2 tuple2) {
        if (tuple2 != null) {
            MetricName metricName = (MetricName) tuple2._1();
            // Constant-first equals yields false for a null MBean name.
            return "kafka.coordinator.transaction:type=TransactionMarkerChannelManager,name=UnknownDestinationQueueSize".equals(metricName.getMBeanName());
        }
        throw new MatchError((Object) null);
    }

    /**
     * Predicate over a registry entry: true when the MBean name of the entry's first element (a
     * {@code MetricName}) equals the {@code LogAppendRetryQueueSize} name of
     * {@code TransactionMarkerChannelManager}.
     *
     * @throws MatchError if the tuple itself is {@code null}
     */
    public static final /* synthetic */ boolean $anonfun$shouldCreateMetricsOnStarting$2(Tuple2 tuple2) {
        if (tuple2 != null) {
            MetricName metricName = (MetricName) tuple2._1();
            // Constant-first equals yields false for a null MBean name.
            return "kafka.coordinator.transaction:type=TransactionMarkerChannelManager,name=LogAppendRetryQueueSize".equals(metricName.getMBeanName());
        }
        throw new MatchError((Object) null);
    }

    /**
     * Creates the {@code TransactionMarkerChannelManager} under test from a broker configuration
     * produced by {@code TestUtils.createBrokerConfig} and the values returned by
     * {@code metadataCache()}, {@code networkClient()}, {@code txnStateManager()} and {@code time()}.
     */
    public TransactionMarkerChannelManagerTest() {
        // One RandomPort() lookup per port argument, evaluated in argument order. The local names
        // follow the presumed parameter order of TestUtils.createBrokerConfig - TODO confirm.
        int port = TestUtils$.MODULE$.RandomPort();
        int saslPlaintextPort = TestUtils$.MODULE$.RandomPort();
        int sslPort = TestUtils$.MODULE$.RandomPort();
        int saslSslPort = TestUtils$.MODULE$.RandomPort();

        this.channelManager = new TransactionMarkerChannelManager(
                KafkaConfig$.MODULE$.fromProps(
                        TestUtils$.MODULE$.createBrokerConfig(1, "localhost:2181", true, true, port, None$.MODULE$, None$.MODULE$, None$.MODULE$, true, false, saslPlaintextPort, false, sslPort, false, saslSslPort, None$.MODULE$, 1, false, 1, (short) 1)),
                metadataCache(),
                networkClient(),
                txnStateManager(),
                time());
    }
}
