package kafka.api;

import java.time.Duration;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import kafka.integration.KafkaServerTestHarness;
import kafka.log.AbstractLog;
import kafka.log.LogManager;
import kafka.log.ProducerStateEntry;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.scalactic.source.Position;
import org.scalatest.Assertions$;
import scala.Function3;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Buffer$;
import scala.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BooleanRef;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.RichInt$;

/* compiled from: TransactionsTest.scala */
@ScalaSignature(bytes = "\u0006\u0001\tec\u0001B\u0001\u0003\u0001\u001d\u0011\u0001\u0003\u0016:b]N\f7\r^5p]N$Vm\u001d;\u000b\u0005\r!\u0011aA1qS*\tQ!A\u0003lC\u001a\\\u0017m\u0001\u0001\u0014\u0005\u0001A\u0001CA\u0005\r\u001b\u0005Q!BA\u0006\u0005\u0003-Ig\u000e^3he\u0006$\u0018n\u001c8\n\u00055Q!AF&bM.\f7+\u001a:wKJ$Vm\u001d;ICJtWm]:\t\u000b=\u0001A\u0011\u0001\t\u0002\rqJg.\u001b;?)\u0005\t\u0002C\u0001\n\u0001\u001b\u0005\u0011\u0001b\u0002\u000b\u0001\u0005\u0004%\t!F\u0001\u000b]Vl7+\u001a:wKJ\u001cX#\u0001\f\u0011\u0005]QR\"\u0001\r\u000b\u0003e\tQa]2bY\u0006L!a\u0007\r\u0003\u0007%sG\u000f\u0003\u0004\u001e\u0001\u0001\u0006IAF\u0001\f]Vl7+\u001a:wKJ\u001c\b\u0005C\u0004 \u0001\t\u0007I\u0011A\u000b\u00025Q\u0014\u0018M\\:bGRLwN\\1m!J|G-^2fe\u000e{WO\u001c;\t\r\u0005\u0002\u0001\u0015!\u0003\u0017\u0003m!(/\u00198tC\u000e$\u0018n\u001c8bYB\u0013x\u000eZ;dKJ\u001cu.\u001e8uA!91\u0005\u0001b\u0001\n\u0003)\u0012A\u0007;sC:\u001c\u0018m\u0019;j_:\fGnQ8ogVlWM]\"pk:$\bBB\u0013\u0001A\u0003%a#A\u000eue\u0006t7/Y2uS>t\u0017\r\\\"p]N,X.\u001a:D_VtG\u000f\t\u0005\bO\u0001\u0011\r\u0011\"\u0001\u0016\u0003uqwN\u001c+sC:\u001c\u0018m\u0019;j_:\fGnQ8ogVlWM]\"pk:$\bBB\u0015\u0001A\u0003%a#\u0001\u0010o_:$&/\u00198tC\u000e$\u0018n\u001c8bY\u000e{gn];nKJ\u001cu.\u001e8uA!91\u0006\u0001b\u0001\n\u0003a\u0013A\u0002;pa&\u001c\u0017'F\u0001.!\tq3'D\u00010\u0015\t\u0001\u0014'\u0001\u0003mC:<'\"\u0001\u001a\u0002\t)\fg/Y\u0005\u0003i=\u0012aa\u0015;sS:<\u0007B\u0002\u001c\u0001A\u0003%Q&A\u0004u_BL7-\r\u0011\t\u000fa\u0002!\u0019!C\u0001Y\u00051Ao\u001c9jGJBaA\u000f\u0001!\u0002\u0013i\u0013a\u0002;pa&\u001c'\u0007\t\u0005\by\u0001\u0011\r\u0011\"\u0001\u0016\u00035qW/\u001c)beRLG/[8og\"1a\b\u0001Q\u0001\nY\taB\\;n!\u0006\u0014H/\u001b;j_:\u001c\b\u0005C\u0004A\u0001\t\u0007I\u0011A!\u0002-Q\u0014\u0018M\\:bGRLwN\\1m!J|G-^2feN,\u0012A\u0011\t\u0004\u0007\"SU\"\u0001#\u000b\u0005\u00153\u0015aB7vi\u0006\u0014G.\u001a\u0006\u0003\u000fb\t!bY8mY\u0016\u001cG/[8o\u0013\tIEI\u000
1\u0004Ck\u001a4WM\u001d\t\u0005\u0017V;v+D\u0001M\u0015\tie*\u0001\u0005qe>$WoY3s\u0015\ty\u0005+A\u0004dY&,g\u000e^:\u000b\u0005\u0015\t&B\u0001*T\u0003\u0019\t\u0007/Y2iK*\tA+A\u0002pe\u001eL!A\u0016'\u0003\u001b-\u000bgm[1Qe>$WoY3s!\r9\u0002LW\u0005\u00033b\u0011Q!\u0011:sCf\u0004\"aF.\n\u0005qC\"\u0001\u0002\"zi\u0016DaA\u0018\u0001!\u0002\u0013\u0011\u0015a\u0006;sC:\u001c\u0018m\u0019;j_:\fG\u000e\u0015:pIV\u001cWM]:!\u0011\u001d\u0001\u0007A1A\u0005\u0002\u0005\fa\u0003\u001e:b]N\f7\r^5p]\u0006d7i\u001c8tk6,'o]\u000b\u0002EB\u00191\tS2\u0011\t\u0011<wkV\u0007\u0002K*\u0011aMT\u0001\tG>t7/^7fe&\u0011\u0001.\u001a\u0002\u000e\u0017\u000647.Y\"p]N,X.\u001a:\t\r)\u0004\u0001\u0015!\u0003c\u0003]!(/\u00198tC\u000e$\u0018n\u001c8bY\u000e{gn];nKJ\u001c\b\u0005C\u0004m\u0001\t\u0007I\u0011A1\u000239|g\u000e\u0016:b]N\f7\r^5p]\u0006d7i\u001c8tk6,'o\u001d\u0005\u0007]\u0002\u0001\u000b\u0011\u00022\u000259|g\u000e\u0016:b]N\f7\r^5p]\u0006d7i\u001c8tk6,'o\u001d\u0011\t\u000bA\u0004A\u0011I9\u0002\u001f\u001d,g.\u001a:bi\u0016\u001cuN\u001c4jON,\u0012A\u001d\t\u0004gR4X\"\u0001$\n\u0005U4%aA*fcB\u0011qO_\u0007\u0002q*\u0011\u0011\u0010B\u0001\u0007g\u0016\u0014h/\u001a:\n\u0005mD(aC&bM.\f7i\u001c8gS\u001eDQ! 
\u0001\u0005By\fQa]3u+B$\u0012a \t\u0004/\u0005\u0005\u0011bAA\u00021\t!QK\\5uQ\ra\u0018q\u0001\t\u0005\u0003\u0013\ty!\u0004\u0002\u0002\f)\u0019\u0011QB*\u0002\u000b),h.\u001b;\n\t\u0005E\u00111\u0002\u0002\u0007\u0005\u00164wN]3\t\r\u0005U\u0001\u0001\"\u0011\u007f\u0003!!X-\u0019:E_^t\u0007\u0006BA\n\u00033\u0001B!!\u0003\u0002\u001c%!\u0011QDA\u0006\u0005\u0015\te\r^3s\u0011\u0019\t\t\u0003\u0001C\u0001}\u0006)B/Z:u\u0005\u0006\u001c\u0018n\u0019+sC:\u001c\u0018m\u0019;j_:\u001c\b\u0006BA\u0010\u0003K\u0001B!!\u0003\u0002(%!\u0011\u0011FA\u0006\u0005\u0011!Vm\u001d;\t\r\u00055\u0002\u0001\"\u0001\u007f\u0003I\"Xm\u001d;SK\u0006$7i\\7nSR$X\rZ\"p]N,X.\u001a:TQ>,H\u000e\u001a(piN+W-\u00168eK\u000eLG-\u001a3ECR\f\u0007\u0006BA\u0016\u0003KAa!a\r\u0001\t\u0003q\u0018A\u000b;fgR$U\r\\1zK\u00124U\r^2i\u0013:\u001cG.\u001e3fg\u0006\u0013wN\u001d;fIR\u0013\u0018M\\:bGRLwN\u001c\u0015\u0005\u0003c\t)\u0003\u0003\u0004\u0002:\u0001!\tA`\u0001\u001bi\u0016\u001cHoU3oI>3gm]3ug^KG\u000f[$s_V\u0004\u0018\n\u001a\u0015\u0005\u0003o\t)\u0003\u0003\u0004\u0002@\u0001!\tA`\u0001!i\u0016\u001cHoU3oI>3gm]3ug^KG\u000f[$s_V\u0004X*\u001a;bI\u0006$\u0018\r\u000b\u0003\u0002>\u0005\u0015\u0002bBA#\u0001\u0011%\u0011qI\u0001\u000bg\u0016tGm\u00144gg\u0016$HcA@\u0002J!A\u00111JA\"\u0001\u0004\ti%\u0001\u0004d_6l\u0017\u000e\u001e\t\t/\u0005=#*a\u0015d\u007f&\u0019\u0011\u0011\u000b\r\u0003\u0013\u0019+hn\u0019;j_:\u001c\u0004\u0003BA+\u00037r1aFA,\u0013\r\tI\u0006G\u0001\u0007!J,G-\u001a4\n\u0007Q\niFC\u0002\u0002ZaAa!!\u0019\u0001\t\u0003q\u0018a\u0005;fgR4UM\\2j]\u001e|enQ8n[&$\b\u0006BA0\u0003KAa!a\u001a\u0001\t\u0003q\u0018\u0001\u0007;fgR4UM\\2j]\u001e|enU3oI>3gm]3ug\"\"\u0011QMA\u0013\u0011\u0019\ti\u0007\u0001C\u0001}\u0006aC/Z:u\u001f\u001a47/\u001a;NKR\fG-\u0019;b\u0013:\u001cVM\u001c3PM\u001a\u001cX\r^:U_R\u0013\u0018M\\:bGRLwN\u001c\u0015\u0005\u0003W\n)\u0003\u0003\u0004\u0002t\u0001!\tA`\u0001\u0012i\u0016\u001cHOR3oG&twm\u00148TK:$\u0007\u0006BA9\u0003KAa!!\u001f\u0001\t\u0
003q\u0018A\u0007;fgR4UM\\2j]\u001e|e.\u00113e!\u0006\u0014H/\u001b;j_:\u001c\b\u0006BA<\u0003KAa!a \u0001\t\u0003q\u0018A\t;fgR4UM\\2j]\u001e|e\u000e\u0016:b]N\f7\r^5p]\u0016C\b/\u001b:bi&|g\u000e\u000b\u0003\u0002~\u0005\u0015\u0002BBAC\u0001\u0011\u0005a0\u0001\u000fuKN$X*\u001e7uSBdW-T1sW\u0016\u00148o\u00148f\u0019\u0016\fG-\u001a:)\t\u0005\r\u0015Q\u0005\u0005\u0007\u0003\u0017\u0003A\u0011\u0001@\u0002IQ,7\u000f^\"p]N,7-\u001e;jm\u0016d\u0017PU;o\u0013:LG\u000f\u0016:b]N\f7\r^5p]ND\u0003\"!#\u0002&\u0005=\u0015\u0011S\u0001\tKb\u0004Xm\u0019;fI\u000e\u0012\u00111\u0013\t\u0005\u0003+\u000bY*\u0004\u0002\u0002\u0018*\u0019\u0011\u0011\u0014)\u0002\r\r|W.\\8o\u0013\u0011\ti*a&\u0003\u001d-\u000bgm[1Fq\u000e,\u0007\u000f^5p]\"1\u0011\u0011\u0015\u0001\u0005\u0002y\fA\u0004^3ti\u000e{W.\\5u)J\fgn]1di&|g\u000eV5nK>,H\u000f\u000b\u0005\u0002 \u0006\u0015\u0012qRASG\t\t9\u000b\u0005\u0003\u0002*\u0006=VBAAV\u0015\u0011\ti+a&\u0002\r\u0015\u0014(o\u001c:t\u0013\u0011\t\t,a+\u0003!QKW.Z8vi\u0016C8-\u001a9uS>t\u0007BBA[\u0001\u0011\u0005a0\u0001\u000euKN$()^7q)J\fgn]1di&|g.\u00197Fa>\u001c\u0007\u000e\u000b\u0003\u00024\u0006\u0015\u0002bBA^\u0001\u0011%\u0011QX\u0001(g\u0016tG\r\u0016:b]N\f7\r^5p]\u0006dW*Z:tC\u001e,7oV5uQZ\u000bG.^3SC:<W\rF\u0006��\u0003\u007f\u000b\t-!2\u0002J\u00065\u0007BB'\u0002:\u0002\u0007!\n\u0003\u0005\u0002D\u0006e\u0006\u0019AA*\u0003\u0015!x\u000e]5d\u0011\u001d\t9-!/A\u0002Y\tQa\u001d;beRDq!a3\u0002:\u0002\u0007a#A\u0002f]\u0012D\u0001\"a4\u0002:\u0002\u0007\u0011\u0011[\u0001\u0010o&dGNQ3D_6l\u0017\u000e\u001e;fIB\u0019q#a5\n\u0007\u0005U\u0007DA\u0004C_>dW-\u00198\t\u000f\u0005e\u0007\u0001\"\u0003\u0002\\\u0006Y1/\u001a:wKJ\u0004&o\u001c9t)\t\ti\u000e\u0005\u0003\u0002`\u0006\u0015XBAAq\u0015\r\t\u0019/M\u0001\u0005kRLG.\u0003\u0003\u0002h\u0006\u0005(A\u0003)s_B,'\u000f^5fg\"9\u00111\u001e\u0001\u0005\n\u00055\u0018aG2sK\u0006$XMU3bI\u000e{W.\\5ui\u0016$7i\u001c8tk6,'\u000fF\u0004d\u0003_\f\u00190a>\t\u0015\u0005E\u0018\u0011\u001eI\u
0001\u0002\u0004\t\u0019&A\u0003he>,\b\u000fC\u0005\u0002v\u0006%\b\u0013!a\u0001-\u0005qQ.\u0019=Q_2d'+Z2pe\u0012\u001c\bBCA}\u0003S\u0004\n\u00111\u0001\u0002^\u0006)\u0001O]8qg\"9\u0011Q \u0001\u0005\n\u0005}\u0018!H2sK\u0006$XMU3bIVs7m\\7nSR$X\rZ\"p]N,X.\u001a:\u0015\u0007\r\u0014\t\u0001\u0003\u0005\u0002r\u0006m\b\u0019AA*\u0011\u001d\u0011)\u0001\u0001C\u0005\u0005\u000f\t1d\u0019:fCR,GK]1og\u0006\u001cG/[8oC2\u0004&o\u001c3vG\u0016\u0014H#\u0003&\u0003\n\t5!q\u0003B\u000e\u0011!\u0011YAa\u0001A\u0002\u0005M\u0013a\u0004;sC:\u001c\u0018m\u0019;j_:\fG.\u00133\t\u0015\t=!1\u0001I\u0001\u0002\u0004\u0011\t\"\u0001\u000bue\u0006t7/Y2uS>tG+[7f_V$Xj\u001d\t\u0004/\tM\u0011b\u0001B\u000b1\t!Aj\u001c8h\u0011)\u0011IBa\u0001\u0011\u0002\u0003\u0007!\u0011C\u0001\u000b[\u0006D(\t\\8dW6\u001b\b\"\u0003B\u000f\u0005\u0007\u0001\n\u00111\u0001\u0017\u0003E!W\r\\5wKJLH+[7f_V$Xj\u001d\u0005\n\u0005C\u0001\u0011\u0013!C\u0005\u0005G\tQe\u0019:fCR,GK]1og\u0006\u001cG/[8oC2\u0004&o\u001c3vG\u0016\u0014H\u0005Z3gCVdG\u000f\n\u001a\u0016\u0005\t\u0015\"\u0006\u0002B\t\u0005OY#A!\u000b\u0011\t\t-\"QG\u0007\u0003\u0005[QAAa\f\u00032\u0005IQO\\2iK\u000e\\W\r\u001a\u0006\u0004\u0005gA\u0012AC1o]>$\u0018\r^5p]&!!q\u0007B\u0017\u0005E)hn\u00195fG.,GMV1sS\u0006t7-\u001a\u0005\n\u0005w\u0001\u0011\u0013!C\u0005\u0005G\tQe\u0019:fCR,GK]1og\u0006\u001cG/[8oC2\u0004&o\u001c3vG\u0016\u0014H\u0005Z3gCVdG\u000fJ\u001a\t\u0013\t}\u0002!%A\u0005\n\t\u0005\u0013!J2sK\u0006$X\r\u0016:b]N\f7\r^5p]\u0006d\u0007K]8ek\u000e,'\u000f\n3fM\u0006,H\u000e\u001e\u00135+\t\u0011\u0019EK\u0002\u0017\u0005OA\u0011Ba\u0012\u0001#\u0003%IA!\u0013\u0002K\r\u0014X-\u0019;f%\u0016\fGmQ8n[&$H/\u001a3D_:\u001cX/\\3sI\u0011,g-Y;mi\u0012\nTC\u0001B&U\u0011\t\u0019Fa\n\t\u0013\t=\u0003!%A\u0005\n\t\u0005\u0013!J2sK\u0006$XMU3bI\u000e{W.\\5ui\u0016$7i\u001c8tk6,'\u000f\n3fM\u0006,H\u000e\u001e\u00133\u0011%\u0011\u0019\u0006AI\u0001\n\u0013\u0011)&A\u0013de\u0016\fG/\u001a*fC\u0012\u001cu.\\7jiR,GmQ8ogVlWM\u001d\u0013eK
\u001a\fW\u000f\u001c;%gU\u0011!q\u000b\u0016\u0005\u0003;\u00149\u0003")
/* loaded from: input_file:kafka/api/TransactionsTest.class */
public class TransactionsTest extends KafkaServerTestHarness {
    // Fixed test parameters; each field below is exposed through a same-named accessor method.
    // Passed to createBrokerConfigs(...) in mo1331generateConfigs() and to createTopic(...) in setUp().
    private final int numServers = 3;
    // Exclusive upper bound of the 0-until-N range iterated by the first closure in setUp().
    private final int transactionalProducerCount = 2;
    // Exclusive upper bound of the 0-until-N range iterated by the second closure in setUp().
    private final int transactionalConsumerCount = 1;
    // Exclusive upper bound of the 0-until-N range iterated by the third closure in setUp().
    private final int nonTransactionalConsumerCount = 1;
    // Names of the two topics created in setUp().
    private final String topic1 = "topic1";
    private final String topic2 = "topic2";
    // Passed to createTopic(...) for both topics in setUp().
    private final int numPartitions = 4;
    // Client buffers, created empty (built from Nil); tearDown() runs a per-element closure over each.
    // NOTE(review): presumably filled by the setUp()/create* closures, whose bodies are not visible here.
    private final Buffer<KafkaProducer<byte[], byte[]>> transactionalProducers = Buffer$.MODULE$.apply(Nil$.MODULE$);
    private final Buffer<KafkaConsumer<byte[], byte[]>> transactionalConsumers = Buffer$.MODULE$.apply(Nil$.MODULE$);
    private final Buffer<KafkaConsumer<byte[], byte[]>> nonTransactionalConsumers = Buffer$.MODULE$.apply(Nil$.MODULE$);

    /**
     * Accessor for the {@code numServers} field.
     *
     * @return the value the field was initialised with (3)
     */
    public int numServers() {
        return numServers;
    }

    /**
     * Accessor for the {@code transactionalProducerCount} field.
     *
     * @return the value the field was initialised with (2)
     */
    public int transactionalProducerCount() {
        return transactionalProducerCount;
    }

    /**
     * Accessor for the {@code transactionalConsumerCount} field.
     *
     * @return the value the field was initialised with (1)
     */
    public int transactionalConsumerCount() {
        return transactionalConsumerCount;
    }

    /**
     * Accessor for the {@code nonTransactionalConsumerCount} field.
     *
     * @return the value the field was initialised with (1)
     */
    public int nonTransactionalConsumerCount() {
        return nonTransactionalConsumerCount;
    }

    /**
     * Accessor for the {@code topic1} field.
     *
     * @return the topic name held by the field ("topic1")
     */
    public String topic1() {
        return topic1;
    }

    /**
     * Accessor for the {@code topic2} field.
     *
     * @return the topic name held by the field ("topic2")
     */
    public String topic2() {
        return topic2;
    }

    /**
     * Accessor for the {@code numPartitions} field.
     *
     * @return the value the field was initialised with (4)
     */
    public int numPartitions() {
        return numPartitions;
    }

    /**
     * Accessor for the {@code transactionalProducers} buffer.
     *
     * @return the same mutable buffer instance held by this test (not a copy)
     */
    public Buffer<KafkaProducer<byte[], byte[]>> transactionalProducers() {
        return transactionalProducers;
    }

    /**
     * Accessor for the {@code transactionalConsumers} buffer.
     *
     * @return the same mutable buffer instance held by this test (not a copy)
     */
    public Buffer<KafkaConsumer<byte[], byte[]>> transactionalConsumers() {
        return transactionalConsumers;
    }

    /**
     * Accessor for the {@code nonTransactionalConsumers} buffer.
     *
     * @return the same mutable buffer instance held by this test (not a copy)
     */
    public Buffer<KafkaConsumer<byte[], byte[]>> nonTransactionalConsumers() {
        return nonTransactionalConsumers;
    }

    /**
     * Produces the broker configurations for this test.
     *
     * <p>Calls {@code TestUtils.createBrokerConfigs} with {@link #numServers()} and
     * {@code zkConnect()}, using that method's own default values for parameters 3 through 16,
     * and maps every result through {@code TransactionsTest$$anonfun$generateConfigs$1}.
     *
     * @return the mapped sequence of {@link KafkaConfig} instances
     */
    @Override // kafka.integration.KafkaServerTestHarness
    /* renamed from: generateConfigs */
    public Seq<KafkaConfig> mo1331generateConfigs() {
        TestUtils$ utils = TestUtils$.MODULE$;
        return (Seq) utils.createBrokerConfigs(
                numServers(),
                zkConnect(),
                utils.createBrokerConfigs$default$3(),
                utils.createBrokerConfigs$default$4(),
                utils.createBrokerConfigs$default$5(),
                utils.createBrokerConfigs$default$6(),
                utils.createBrokerConfigs$default$7(),
                utils.createBrokerConfigs$default$8(),
                utils.createBrokerConfigs$default$9(),
                utils.createBrokerConfigs$default$10(),
                utils.createBrokerConfigs$default$11(),
                utils.createBrokerConfigs$default$12(),
                utils.createBrokerConfigs$default$13(),
                utils.createBrokerConfigs$default$14(),
                utils.createBrokerConfigs$default$15(),
                utils.createBrokerConfigs$default$16())
            .map(new TransactionsTest$$anonfun$generateConfigs$1(this), Seq$.MODULE$.canBuildFrom());
    }

    /**
     * Per-test fixture setup.
     *
     * <p>Runs the harness {@code setUp()}, creates {@link #topic1()} and {@link #topic2()} with
     * {@link #numPartitions()} and {@link #numServers()} and a topic config whose
     * {@code MinInSyncReplicasProp} entry is "2", then invokes one closure per index for each of
     * the three client counts.
     * NOTE(review): the closures presumably create the producers/consumers; their bodies are not
     * visible in this file section.
     */
    @Override // kafka.integration.KafkaServerTestHarness, kafka.zk.ZooKeeperTestHarness
    @Before
    public void setUp() {
        super.setUp();

        Properties topicConfig = new Properties();
        topicConfig.put(KafkaConfig$.MODULE$.MinInSyncReplicasProp(), Integer.toString(2));
        createTopic(topic1(), numPartitions(), numServers(), topicConfig);
        createTopic(topic2(), numPartitions(), numServers(), topicConfig);

        // One closure invocation per index in [0, count) for each client kind, in this order.
        RichInt$.MODULE$
            .until$extension0(Predef$.MODULE$.intWrapper(0), transactionalProducerCount())
            .foreach(new TransactionsTest$$anonfun$setUp$1(this));
        RichInt$.MODULE$
            .until$extension0(Predef$.MODULE$.intWrapper(0), transactionalConsumerCount())
            .foreach(new TransactionsTest$$anonfun$setUp$2(this));
        RichInt$.MODULE$
            .until$extension0(Predef$.MODULE$.intWrapper(0), nonTransactionalConsumerCount())
            .foreach(new TransactionsTest$$anonfun$setUp$3(this));
    }

    /**
     * Per-test fixture teardown.
     *
     * <p>Applies a per-element closure to every transactional producer, transactional consumer
     * and non-transactional consumer (in that order), then runs the harness {@code tearDown()}.
     * NOTE(review): the closures presumably close each client; their bodies are not visible here.
     *
     * <p>The harness teardown is placed in a {@code finally} block so that it still runs when one
     * of the client callbacks throws; otherwise the harness resources would be left behind for
     * subsequent tests.
     */
    @Override // kafka.integration.KafkaServerTestHarness, kafka.zk.ZooKeeperTestHarness
    @After
    public void tearDown() {
        try {
            transactionalProducers().foreach(new TransactionsTest$$anonfun$tearDown$1(this));
            transactionalConsumers().foreach(new TransactionsTest$$anonfun$tearDown$2(this));
            nonTransactionalConsumers().foreach(new TransactionsTest$$anonfun$tearDown$3(this));
        } finally {
            // Must run even if a client callback above failed.
            super.tearDown();
        }
    }

    /**
     * Sends two records in a transaction that is aborted and two in a transaction that is
     * committed, subscribes one consumer from each consumer buffer to both topics, then consumes
     * 2 records with the transactional consumer and 4 with the non-transactional one.
     * Per-record verification is done by the two closure classes passed to {@code foreach}.
     */
    @Test
    public void testBasicTransactions() {
        TestUtils$ utils = TestUtils$.MODULE$;
        KafkaProducer producer = (KafkaProducer) transactionalProducers().head();
        KafkaConsumer transactionalConsumer = (KafkaConsumer) transactionalConsumers().head();
        KafkaConsumer nonTransactionalConsumer = (KafkaConsumer) nonTransactionalConsumers().head();

        producer.initTransactions();

        // First transaction: records "2" and "4" (expected status false), flushed, then aborted.
        producer.beginTransaction();
        producer.send(utils.producerRecordWithExpectedTransactionStatus(topic2(), (Integer) null, "2", "2", false));
        producer.send(utils.producerRecordWithExpectedTransactionStatus(topic1(), (Integer) null, "4", "4", false));
        producer.flush();
        producer.abortTransaction();

        // Second transaction: records "1" and "3" (expected status true), committed.
        producer.beginTransaction();
        producer.send(utils.producerRecordWithExpectedTransactionStatus(topic1(), (Integer) null, "1", "1", true));
        producer.send(utils.producerRecordWithExpectedTransactionStatus(topic2(), (Integer) null, "3", "3", true));
        producer.commitTransaction();

        // Both consumers subscribe to the same two topics.
        Collection bothTopics = (Collection) JavaConverters$.MODULE$.seqAsJavaListConverter(List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{topic1(), topic2()}))).asJava();
        transactionalConsumer.subscribe(bothTopics);
        nonTransactionalConsumer.subscribe(bothTopics);

        Seq fromTransactional = utils.consumeRecords(transactionalConsumer, 2, utils.consumeRecords$default$3());
        fromTransactional.foreach(new TransactionsTest$$anonfun$testBasicTransactions$1(this));

        Seq fromNonTransactional = utils.consumeRecords(nonTransactionalConsumer, 4, utils.consumeRecords$default$3());
        fromNonTransactional.foreach(new TransactionsTest$$anonfun$testBasicTransactions$2(this, List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"1", "2", "3", "4"})).toSet()));
    }

    /**
     * Interleaves two open transactions on partition 0 of both topics: the producer created with
     * id "other" writes before and after the first producer's four records and then commits,
     * while the first producer's transaction is left open.
     *
     * <p>The non-transactional consumer is expected to consume 8 records and to resolve the later
     * timestamp on both partitions. The transactional consumer is expected to consume 2 records,
     * and its timestamp lookup for the later timestamp must return no entry for either partition.
     */
    @Test
    public void testReadCommittedConsumerShouldNotSeeUndecidedData() {
        KafkaProducer producer1 = (KafkaProducer) transactionalProducers().head();
        KafkaProducer<byte[], byte[]> producer2 = kafka$api$TransactionsTest$$createTransactionalProducer("other", kafka$api$TransactionsTest$$createTransactionalProducer$default$2(), kafka$api$TransactionsTest$$createTransactionalProducer$default$3(), kafka$api$TransactionsTest$$createTransactionalProducer$default$4());
        KafkaConsumer transactionalConsumer = (KafkaConsumer) transactionalConsumers().head();
        KafkaConsumer nonTransactionalConsumer = (KafkaConsumer) nonTransactionalConsumers().head();

        producer1.initTransactions();
        producer2.initTransactions();
        producer1.beginTransaction();
        producer2.beginTransaction();

        Integer partition0 = Predef$.MODULE$.int2Integer(0);

        // producer2 writes its first pair of records with the earlier timestamp.
        long earlierTimestamp = System.currentTimeMillis();
        Long earlierBoxed = Predef$.MODULE$.long2Long(earlierTimestamp);
        producer2.send(new ProducerRecord(topic1(), partition0, earlierBoxed, "x".getBytes(), "1".getBytes()));
        producer2.send(new ProducerRecord(topic2(), partition0, earlierBoxed, "x".getBytes(), "1".getBytes()));
        producer2.flush();

        // Everything from here on uses a timestamp one millisecond later.
        long laterTimestamp = earlierTimestamp + 1;
        Long laterBoxed = Predef$.MODULE$.long2Long(laterTimestamp);
        producer1.send(new ProducerRecord(topic1(), partition0, laterBoxed, "a".getBytes(), "1".getBytes()));
        producer1.send(new ProducerRecord(topic1(), partition0, laterBoxed, "b".getBytes(), "2".getBytes()));
        producer1.send(new ProducerRecord(topic2(), partition0, laterBoxed, "c".getBytes(), "3".getBytes()));
        producer1.send(new ProducerRecord(topic2(), partition0, laterBoxed, "d".getBytes(), "4".getBytes()));
        producer1.flush();

        // producer2 finishes and commits; producer1's transaction is never completed in this test.
        producer2.send(new ProducerRecord(topic1(), partition0, laterBoxed, "x".getBytes(), "2".getBytes()));
        producer2.send(new ProducerRecord(topic2(), partition0, laterBoxed, "x".getBytes(), "2".getBytes()));
        producer2.commitTransaction();

        TopicPartition tp1 = new TopicPartition(topic1(), 0);
        TopicPartition tp2 = new TopicPartition(topic2(), 0);
        Collection bothPartitions = (Collection) JavaConverters$.MODULE$.setAsJavaSetConverter(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new TopicPartition[]{tp1, tp2}))).asJava();

        // Non-transactional consumer: all 8 records, and the later timestamp resolves on both partitions.
        nonTransactionalConsumer.assign(bothPartitions);
        TestUtils$.MODULE$.consumeRecords(nonTransactionalConsumer, 8, TestUtils$.MODULE$.consumeRecords$default$3());
        Map timestampQuery = (Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(tp1), Predef$.MODULE$.long2Long(laterTimestamp)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(tp2), Predef$.MODULE$.long2Long(laterTimestamp))}))).asJava();
        Map visibleOffsets = nonTransactionalConsumer.offsetsForTimes(timestampQuery);
        Assert.assertEquals(2L, visibleOffsets.size());
        Assert.assertEquals(laterTimestamp, ((OffsetAndTimestamp) visibleOffsets.get(tp1)).timestamp());
        Assert.assertEquals(laterTimestamp, ((OffsetAndTimestamp) visibleOffsets.get(tp2)).timestamp());
        nonTransactionalConsumer.unsubscribe();

        // Transactional consumer: only 2 records, and the later timestamp does not resolve.
        transactionalConsumer.assign(bothPartitions);
        Seq consumed = TestUtils$.MODULE$.consumeRecords(transactionalConsumer, 2, TestUtils$.MODULE$.consumeRecords$default$3());
        consumed.foreach(new TransactionsTest$$anonfun$testReadCommittedConsumerShouldNotSeeUndecidedData$1(this));
        Assert.assertEquals(2L, transactionalConsumer.assignment().size());
        transactionalConsumer.seekToEnd(transactionalConsumer.assignment());
        ((IterableLike) JavaConverters$.MODULE$.asScalaSetConverter(transactionalConsumer.assignment()).asScala()).foreach(new TransactionsTest$$anonfun$testReadCommittedConsumerShouldNotSeeUndecidedData$2(this, transactionalConsumer));
        Map hiddenOffsets = transactionalConsumer.offsetsForTimes(timestampQuery);
        Assert.assertNull(hiddenOffsets.get(tp1));
        Assert.assertNull(hiddenOffsets.get(tp2));
    }

    /**
     * Interleaves an aborted transaction (keys "y") with a committed one (keys "x") on partition 0
     * of {@link #topic1()}, then reads with a consumer configured with
     * {@code fetch.min.bytes=100000} and {@code fetch.max.wait.ms=100}.
     *
     * <p>Exactly two records are expected: ("x","1") at offset 0 and ("x","2") at offset 3.
     */
    @Test
    public void testDelayedFetchIncludesAbortedTransaction() {
        KafkaProducer abortingProducer = (KafkaProducer) transactionalProducers().head();
        KafkaProducer<byte[], byte[]> committingProducer = kafka$api$TransactionsTest$$createTransactionalProducer("other", kafka$api$TransactionsTest$$createTransactionalProducer$default$2(), kafka$api$TransactionsTest$$createTransactionalProducer$default$3(), kafka$api$TransactionsTest$$createTransactionalProducer$default$4());

        abortingProducer.initTransactions();
        committingProducer.initTransactions();
        abortingProducer.beginTransaction();
        committingProducer.beginTransaction();

        Integer partition0 = Predef$.MODULE$.int2Integer(0);

        // Flushes after each step fix the write order on the partition: x1, y1, y2, x2.
        committingProducer.send(new ProducerRecord(topic1(), partition0, "x".getBytes(), "1".getBytes()));
        committingProducer.flush();
        abortingProducer.send(new ProducerRecord(topic1(), partition0, "y".getBytes(), "1".getBytes()));
        abortingProducer.send(new ProducerRecord(topic1(), partition0, "y".getBytes(), "2".getBytes()));
        abortingProducer.flush();
        committingProducer.send(new ProducerRecord(topic1(), partition0, "x".getBytes(), "2".getBytes()));
        committingProducer.flush();

        abortingProducer.abortTransaction();
        committingProducer.commitTransaction();

        Properties fetchConfig = new Properties();
        fetchConfig.put("fetch.min.bytes", "100000");
        fetchConfig.put("fetch.max.wait.ms", "100");
        KafkaConsumer<byte[], byte[]> consumer = kafka$api$TransactionsTest$$createReadCommittedConsumer(createReadCommittedConsumer$default$1(), kafka$api$TransactionsTest$$createReadCommittedConsumer$default$2(), fetchConfig);
        consumer.assign((Collection) JavaConverters$.MODULE$.setAsJavaSetConverter(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new TopicPartition[]{new TopicPartition(topic1(), 0)}))).asJava());

        Seq records = TestUtils$.MODULE$.consumeRecords(consumer, 2, TestUtils$.MODULE$.consumeRecords$default$3());
        Assert.assertEquals(2L, records.size());

        ConsumerRecord first = (ConsumerRecord) records.head();
        Assert.assertEquals("x", new String((byte[]) first.key()));
        Assert.assertEquals("1", new String((byte[]) first.value()));
        Assert.assertEquals(0L, first.offset());

        ConsumerRecord last = (ConsumerRecord) records.last();
        Assert.assertEquals("x", new String((byte[]) last.key()));
        Assert.assertEquals("2", new String((byte[]) last.value()));
        Assert.assertEquals(3L, last.offset());
    }

    /**
     * Runs the shared {@code sendOffset} scenario with the offset-commit callback implemented by
     * {@code TransactionsTest$$anonfun$testSendOffsetsWithGroupId$1}.
     */
    @Test
    public void testSendOffsetsWithGroupId() {
        Function3<KafkaProducer<byte[], byte[]>, String, KafkaConsumer<byte[], byte[]>, BoxedUnit> commit = new TransactionsTest$$anonfun$testSendOffsetsWithGroupId$1(this);
        sendOffset(commit);
    }

    /**
     * Runs the shared {@code sendOffset} scenario with the offset-commit callback implemented by
     * {@code TransactionsTest$$anonfun$testSendOffsetsWithGroupMetadata$1}.
     */
    @Test
    public void testSendOffsetsWithGroupMetadata() {
        Function3<KafkaProducer<byte[], byte[]>, String, KafkaConsumer<byte[], byte[]>, BoxedUnit> commit = new TransactionsTest$$anonfun$testSendOffsetsWithGroupMetadata$1(this);
        sendOffset(commit);
    }

    /**
     * Shared consume-transform-produce scenario used by the send-offsets tests.
     *
     * <p>Seeds {@link #topic1()} with 500 numbered records, then repeatedly polls a batch, opens a
     * transaction, hands every polled record to {@code TransactionsTest$$anonfun$sendOffset$1},
     * and invokes {@code function3} with the producer, the group id "foobar-consumer-group" and
     * the consumer. The commit flag is flipped on every iteration: committed iterations add the
     * batch size to the processed count, aborted ones reset the consumer to its committed
     * positions. The consumer is closed whether the loop finishes or throws.
     *
     * <p>Afterwards {@link #topic2()} is read and must yield 500 values, all distinct.
     *
     * @param function3 callback invoked inside each transaction with (producer, group id, consumer)
     */
    private void sendOffset(Function3<KafkaProducer<byte[], byte[]>, String, KafkaConsumer<byte[], byte[]>, BoxedUnit> function3) {
        final int numSeededMessages = 500;
        final String consumerGroupId = "foobar-consumer-group";

        TestUtils$.MODULE$.seedTopicWithNumberedRecords(topic1(), numSeededMessages, servers());
        KafkaProducer producer = (KafkaProducer) transactionalProducers().head();
        KafkaConsumer<byte[], byte[]> consumer = kafka$api$TransactionsTest$$createReadCommittedConsumer(consumerGroupId, numSeededMessages / 4, kafka$api$TransactionsTest$$createReadCommittedConsumer$default$3());
        consumer.subscribe((Collection) JavaConverters$.MODULE$.seqAsJavaListConverter(List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{topic1()}))).asJava());
        producer.initTransactions();

        // Ref holders because the closure classes below receive and read them.
        BooleanRef shouldCommit = BooleanRef.create(false);
        IntRef recordsProcessed = IntRef.create(0);
        try {
            while (recordsProcessed.elem < numSeededMessages) {
                Seq batch = TestUtils$.MODULE$.pollUntilAtLeastNumRecords(consumer, Math.min(10, numSeededMessages - recordsProcessed.elem), TestUtils$.MODULE$.pollUntilAtLeastNumRecords$default$3());
                producer.beginTransaction();
                // Alternate between committing and aborting, starting with a commit.
                shouldCommit.elem = !shouldCommit.elem;
                batch.foreach(new TransactionsTest$$anonfun$sendOffset$1(this, producer, shouldCommit));
                function3.apply(producer, consumerGroupId, consumer);
                if (!shouldCommit.elem) {
                    producer.abortTransaction();
                    debug(new TransactionsTest$$anonfun$sendOffset$3(this, recordsProcessed, batch));
                    TestUtils$.MODULE$.resetToCommittedPositions(consumer);
                } else {
                    producer.commitTransaction();
                    recordsProcessed.elem += batch.size();
                    debug(new TransactionsTest$$anonfun$sendOffset$2(this, recordsProcessed, batch));
                }
            }
        } finally {
            consumer.close();
        }

        KafkaConsumer verifyingConsumer = (KafkaConsumer) transactionalConsumers().apply(0);
        verifyingConsumer.subscribe((Collection) JavaConverters$.MODULE$.seqAsJavaListConverter(List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{topic2()}))).asJava());
        Seq values = (Seq) TestUtils$.MODULE$.pollUntilAtLeastNumRecords(verifyingConsumer, numSeededMessages, TestUtils$.MODULE$.pollUntilAtLeastNumRecords$default$3()).map(new TransactionsTest$$anonfun$1(this), Seq$.MODULE$.canBuildFrom());
        Set uniqueValues = values.toSet();

        String countMessage = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Expected ", " values in ", "."})).s(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(numSeededMessages), topic2()}));
        Assert.assertEquals(countMessage, numSeededMessages, values.size());

        String uniqueMessage = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Expected ", " unique messages in ", "."})).s(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(values.size()), topic2()}));
        Assert.assertEquals(uniqueMessage, values.size(), uniqueValues.size());
    }

    /**
     * Verifies that a producer is fenced on commit after a second producer has initialised
     * transactions.
     *
     * <p>The first producer opens a transaction and sends two records (expected status false);
     * the second producer then initialises, opens its own transaction and sends two records
     * (expected status true). Committing with the first producer must raise
     * {@link ProducerFencedException}. The second producer then commits and the transactional
     * consumer must receive 2 records, verified by
     * {@code TransactionsTest$$anonfun$testFencingOnCommit$1}.
     *
     * <p>The "commit unexpectedly succeeded" assertion is raised outside the {@code try} block.
     * Raised inside it, ScalaTest's failure exception (a {@code RuntimeException}) would be caught
     * by the generic {@code catch (Exception e)} handler and re-reported with the misleading
     * "unexpected exception" message.
     */
    @Test
    public void testFencingOnCommit() {
        KafkaProducer producer1 = (KafkaProducer) transactionalProducers().apply(0);
        KafkaProducer producer2 = (KafkaProducer) transactionalProducers().apply(1);
        KafkaConsumer consumer = (KafkaConsumer) transactionalConsumers().apply(0);
        consumer.subscribe((Collection) JavaConverters$.MODULE$.seqAsJavaListConverter(List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{topic1(), topic2()}))).asJava());

        producer1.initTransactions();
        producer1.beginTransaction();
        producer1.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(topic1(), (Integer) null, "1", "1", false));
        producer1.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(topic2(), (Integer) null, "3", "3", false));

        producer2.initTransactions();
        producer2.beginTransaction();
        producer2.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(topic1(), (Integer) null, "2", "4", true));
        producer2.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(topic2(), (Integer) null, "2", "4", true));

        boolean fenced = false;
        try {
            producer1.commitTransaction();
        } catch (ProducerFencedException expected) {
            // This is the outcome the test requires.
            fenced = true;
        } catch (Exception e) {
            throw Assertions$.MODULE$.fail("Got an unexpected exception from a fenced producer.", e, new Position("TransactionsTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 335));
        }
        if (!fenced) {
            // Reported here, outside the try, so the handler above cannot rewrap it.
            throw Assertions$.MODULE$.fail("Should not be able to commit transactions from a fenced producer.", new Position("TransactionsTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 330));
        }

        producer2.commitTransaction();
        TestUtils$.MODULE$.consumeRecords(consumer, 2, TestUtils$.MODULE$.consumeRecords$default$3()).foreach(new TransactionsTest$$anonfun$testFencingOnCommit$1(this));
    }

    /**
     * Expects {@code sendOffsetsToTransaction} on the first transactional producer to be
     * rejected with a {@link ProducerFencedException} once the second producer has
     * initialised and begun its own transaction. Afterwards the second producer commits
     * and two records are consumed and handed to the per-record check closure.
     */
    @Test
    public void testFencingOnSendOffsets() {
        KafkaProducer firstProducer = (KafkaProducer) transactionalProducers().apply(0);
        KafkaProducer secondProducer = (KafkaProducer) transactionalProducers().apply(1);
        KafkaConsumer consumer = (KafkaConsumer) transactionalConsumers().apply(0);
        Collection subscribedTopics = (Collection) JavaConverters$.MODULE$.seqAsJavaListConverter(List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{topic1(), topic2()}))).asJava();
        consumer.subscribe(subscribedTopics);

        // First producer opens a transaction and writes one record per topic.
        firstProducer.initTransactions();
        firstProducer.beginTransaction();
        firstProducer.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(topic1(), (Integer) null, "1", "1", false));
        firstProducer.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(topic2(), (Integer) null, "3", "3", false));

        // Second producer initialises afterwards and writes its own records.
        secondProducer.initTransactions();
        secondProducer.beginTransaction();
        secondProducer.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(topic1(), (Integer) null, "2", "4", true));
        secondProducer.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(topic2(), (Integer) null, "2", "4", true));

        try {
            Map offsets = (Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(new TopicPartition("foobartopic", 0)), new OffsetAndMetadata(110L))}))).asJava();
            firstProducer.sendOffsetsToTransaction(offsets, "foobarGroup");
            throw Assertions$.MODULE$.fail("Should not be able to send offsets from a fenced producer.", new Position("TransactionsTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 368));
        } catch (ProducerFencedException expected) {
            // This is the outcome the test is looking for; continue with the second producer.
        } catch (Exception e) {
            throw Assertions$.MODULE$.fail("Got an unexpected exception from a fenced producer.", e, new Position("TransactionsTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 373));
        }

        secondProducer.commitTransaction();
        TestUtils$.MODULE$.consumeRecords(consumer, 2, TestUtils$.MODULE$.consumeRecords$default$3()).foreach(new TransactionsTest$$anonfun$testFencingOnSendOffsets$1(this));
    }

    /**
     * Sends an {@link OffsetAndMetadata} carrying an offset, an {@code Optional} integer and
     * a metadata string through {@code sendOffsetsToTransaction} for group {@code "group"},
     * commits, and then waits until the condition closure (which receives the partition,
     * a read-committed consumer and the expected value) reports success.
     */
    @Test
    public void testOffsetMetadataInSendOffsetsToTransaction() {
        TopicPartition partition = new TopicPartition(topic1(), 0);
        KafkaProducer producer = (KafkaProducer) transactionalProducers().head();
        KafkaConsumer<byte[], byte[]> consumer = kafka$api$TransactionsTest$$createReadCommittedConsumer("group", kafka$api$TransactionsTest$$createReadCommittedConsumer$default$2(), kafka$api$TransactionsTest$$createReadCommittedConsumer$default$3());
        Collection topics = (Collection) JavaConverters$.MODULE$.seqAsJavaListConverter(List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{topic1()}))).asJava();
        consumer.subscribe(topics);

        producer.initTransactions();
        producer.beginTransaction();
        OffsetAndMetadata expectedOffset = new OffsetAndMetadata(110L, Optional.of(Predef$.MODULE$.int2Integer(15)), "some metadata");
        Map offsetsToCommit = (Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(partition), expectedOffset)}))).asJava();
        producer.sendOffsetsToTransaction(offsetsToCommit, "group");
        producer.commitTransaction();

        // The second pooled producer is initialised before polling for the committed offset.
        KafkaProducer otherProducer = (KafkaProducer) transactionalProducers().apply(1);
        otherProducer.initTransactions();

        TestUtils$.MODULE$.waitUntilTrue(new TransactionsTest$$anonfun$testOffsetMetadataInSendOffsetsToTransaction$1(this, partition, consumer, expectedOffset), new TransactionsTest$$anonfun$testOffsetMetadataInSendOffsetsToTransaction$2(this), TestUtils$.MODULE$.waitUntilTrue$default$3(), TestUtils$.MODULE$.waitUntilTrue$default$4());
    }

    /**
     * Expects a {@code send} on the first transactional producer to fail with a
     * {@link ProducerFencedException} (thrown directly or wrapped in an
     * {@link ExecutionException}) after the second producer has initialised and written
     * records. The second producer then commits and two records are consumed.
     */
    @Test
    public void testFencingOnSend() {
        KafkaProducer firstProducer = (KafkaProducer) transactionalProducers().apply(0);
        KafkaProducer secondProducer = (KafkaProducer) transactionalProducers().apply(1);
        KafkaConsumer consumer = (KafkaConsumer) transactionalConsumers().apply(0);
        Collection subscribedTopics = (Collection) JavaConverters$.MODULE$.seqAsJavaListConverter(List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{topic1(), topic2()}))).asJava();
        consumer.subscribe(subscribedTopics);

        firstProducer.initTransactions();
        firstProducer.beginTransaction();
        firstProducer.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(topic1(), (Integer) null, "1", "1", false));
        firstProducer.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(topic2(), (Integer) null, "3", "3", false));

        // The second producer's sends are awaited so they are acknowledged before the first producer retries.
        secondProducer.initTransactions();
        secondProducer.beginTransaction();
        secondProducer.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(topic1(), (Integer) null, "2", "4", true)).get();
        secondProducer.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(topic2(), (Integer) null, "2", "4", true)).get();

        try {
            RecordMetadata unexpectedAck = (RecordMetadata) firstProducer.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(topic1(), (Integer) null, "1", "5", false)).get();
            // Reaching this point means the send was acknowledged: log diagnostics, then fail.
            error(new TransactionsTest$$anonfun$testFencingOnSend$1(this, unexpectedAck));
            servers().foreach(new TransactionsTest$$anonfun$testFencingOnSend$2(this));
            throw Assertions$.MODULE$.fail("Should not be able to send messages from a fenced producer.", new Position("TransactionsTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 435));
        } catch (ExecutionException wrapped) {
            Assert.assertTrue(wrapped.getCause() instanceof ProducerFencedException);
        } catch (ProducerFencedException fenced) {
            firstProducer.close();
        } catch (Exception unexpected) {
            throw Assertions$.MODULE$.fail("Got an unexpected exception from a fenced producer.", unexpected, new Position("TransactionsTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 442));
        }

        secondProducer.commitTransaction();
        TestUtils$.MODULE$.consumeRecords(consumer, 2, TestUtils$.MODULE$.consumeRecords$default$3()).foreach(new TransactionsTest$$anonfun$testFencingOnSend$3(this));
    }

    /**
     * Expects the first transactional producer to be fenced when it starts a new
     * transaction and sends after the second producer has initialised and written
     * records. The fencing may surface either directly as a
     * {@link ProducerFencedException} or wrapped in an {@link ExecutionException}.
     * The second producer then commits and two records are consumed.
     *
     * <p>The {@code ProducerFencedException} handler is placed before the generic
     * {@code Exception} handler: a more specific catch clause listed after a broader one
     * is unreachable and rejected by the Java compiler.
     */
    @Test
    public void testFencingOnAddPartitions() {
        KafkaProducer firstProducer = (KafkaProducer) transactionalProducers().apply(0);
        KafkaProducer secondProducer = (KafkaProducer) transactionalProducers().apply(1);
        KafkaConsumer consumer = (KafkaConsumer) transactionalConsumers().apply(0);
        consumer.subscribe((Collection) JavaConverters$.MODULE$.seqAsJavaListConverter(List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{topic1(), topic2()}))).asJava());

        // The first producer completes (aborts) a transaction before the second one starts.
        firstProducer.initTransactions();
        firstProducer.beginTransaction();
        firstProducer.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(topic1(), (Integer) null, "1", "1", false));
        firstProducer.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(topic2(), (Integer) null, "3", "3", false));
        firstProducer.abortTransaction();

        secondProducer.initTransactions();
        secondProducer.beginTransaction();
        secondProducer.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(topic1(), (Integer) null, "2", "4", true)).get(20L, TimeUnit.SECONDS);
        secondProducer.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(topic2(), (Integer) null, "2", "4", true)).get(20L, TimeUnit.SECONDS);

        try {
            firstProducer.beginTransaction();
            RecordMetadata unexpectedAck = (RecordMetadata) firstProducer.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(topic1(), (Integer) null, "1", "5", false)).get();
            // Reaching this point means the send was acknowledged: log diagnostics, then fail.
            error(new TransactionsTest$$anonfun$testFencingOnAddPartitions$1(this, unexpectedAck));
            servers().foreach(new TransactionsTest$$anonfun$testFencingOnAddPartitions$2(this));
            throw Assertions$.MODULE$.fail("Should not be able to send messages from a fenced producer.", new Position("TransactionsTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 482));
        } catch (ProducerFencedException expected) {
            // Fencing surfaced directly; nothing more to verify on the first producer.
        } catch (ExecutionException wrapped) {
            Assert.assertTrue(wrapped.getCause() instanceof ProducerFencedException);
        } catch (Exception unexpected) {
            throw Assertions$.MODULE$.fail("Got an unexpected exception from a fenced producer.", unexpected, new Position("TransactionsTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 488));
        }

        secondProducer.commitTransaction();
        TestUtils$.MODULE$.consumeRecords(consumer, 2, TestUtils$.MODULE$.consumeRecords$default$3()).foreach(new TransactionsTest$$anonfun$testFencingOnAddPartitions$3(this));
    }

    /**
     * Creates a producer named {@code "expiringProducer"} with {@code 100L} as its second
     * creation argument, writes one record, sleeps, and expects the next send to fail with
     * a {@link ProducerFencedException} (directly or wrapped in an
     * {@link ExecutionException}). It then checks that a read-uncommitted consumer sees
     * exactly the record with value {@code "1"} while a read-committed consumer sees
     * nothing within the polling window.
     */
    @Test
    public void testFencingOnTransactionExpiration() {
        KafkaProducer<byte[], byte[]> expiringProducer = kafka$api$TransactionsTest$$createTransactionalProducer("expiringProducer", 100L, kafka$api$TransactionsTest$$createTransactionalProducer$default$3(), kafka$api$TransactionsTest$$createTransactionalProducer$default$4());
        expiringProducer.initTransactions();
        expiringProducer.beginTransaction();

        RecordMetadata firstAck = (RecordMetadata) expiringProducer.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(topic1(), (Integer) null, "1", "1", false)).get();
        Assert.assertTrue(firstAck.hasOffset());

        // Pause before the second send; presumably long enough for the transaction to expire — TODO confirm.
        Thread.sleep(600L);

        try {
            expiringProducer.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(topic1(), (Integer) null, "2", "2", false)).get();
            throw Assertions$.MODULE$.fail("should have raised a ProducerFencedException since the transaction has expired", new Position("TransactionsTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 516));
        } catch (ExecutionException wrapped) {
            Assert.assertTrue(wrapped.getCause() instanceof ProducerFencedException);
        } catch (ProducerFencedException expected) {
            // Fencing surfaced directly; fall through to the consumer checks.
        }

        // The read-uncommitted consumer must see the single record that was written.
        KafkaConsumer uncommittedReader = (KafkaConsumer) nonTransactionalConsumers().head();
        uncommittedReader.subscribe((Collection) JavaConverters$.MODULE$.seqAsJavaListConverter(List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{topic1()}))).asJava());
        Seq visibleRecords = TestUtils$.MODULE$.consumeRecords(uncommittedReader, 1, TestUtils$.MODULE$.consumeRecords$default$3());
        Assert.assertEquals(1L, visibleRecords.size());
        Assert.assertEquals("1", TestUtils$.MODULE$.recordValueAsString((ConsumerRecord) visibleRecords.head()));

        // The read-committed consumer must see nothing.
        KafkaConsumer committedReader = (KafkaConsumer) transactionalConsumers().head();
        committedReader.subscribe((Collection) JavaConverters$.MODULE$.seqAsJavaListConverter(List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{topic1()}))).asJava());
        Assert.assertTrue(TestUtils$.MODULE$.consumeRecordsFor(committedReader, 1000L).isEmpty());
    }

    /**
     * Writes 10000 values across two 10-partition topics inside a transaction that is
     * aborted, then 1000 more values in a committed transaction. A read-committed consumer
     * is expected to yield 1000 records and a read-uncommitted consumer 11000 records,
     * each checked by its own per-record closure.
     */
    @Test
    public void testMultipleMarkersOneLeader() {
        final String replicatedTopic = "largeTopic";
        final String singleReplicaTopic = "largeTopicOneReplica";

        KafkaProducer<byte[], byte[]> producer = (KafkaProducer) transactionalProducers().head();
        KafkaConsumer committedReader = (KafkaConsumer) transactionalConsumers().head();
        KafkaConsumer uncommittedReader = (KafkaConsumer) nonTransactionalConsumers().head();

        Properties replicatedTopicConfig = new Properties();
        replicatedTopicConfig.put(KafkaConfig$.MODULE$.MinInSyncReplicasProp(), Integer.toString(2));
        createTopic(replicatedTopic, 10, numServers(), replicatedTopicConfig);
        createTopic(singleReplicaTopic, 10, 1, new Properties());

        producer.initTransactions();

        // Aborted transaction: values 0..4999 and 5000..9999.
        producer.beginTransaction();
        sendTransactionalMessagesWithValueRange(producer, replicatedTopic, 0, 5000, false);
        sendTransactionalMessagesWithValueRange(producer, singleReplicaTopic, 5000, 10000, false);
        producer.abortTransaction();

        // Committed transaction: values 10000..10999.
        producer.beginTransaction();
        sendTransactionalMessagesWithValueRange(producer, replicatedTopic, 10000, 11000, true);
        producer.commitTransaction();

        committedReader.subscribe((Collection) JavaConverters$.MODULE$.seqAsJavaListConverter(List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{singleReplicaTopic, replicatedTopic}))).asJava());
        uncommittedReader.subscribe((Collection) JavaConverters$.MODULE$.seqAsJavaListConverter(List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{singleReplicaTopic, replicatedTopic}))).asJava());

        Seq committedRecords = TestUtils$.MODULE$.consumeRecords(committedReader, 1000, TestUtils$.MODULE$.consumeRecords$default$3());
        committedRecords.foreach(new TransactionsTest$$anonfun$testMultipleMarkersOneLeader$1(this));

        Seq allRecords = TestUtils$.MODULE$.consumeRecords(uncommittedReader, 11000, TestUtils$.MODULE$.consumeRecords$default$3());
        allRecords.foreach(new TransactionsTest$$anonfun$testMultipleMarkersOneLeader$2(this, ((TraversableOnce) package$.MODULE$.Range().apply(0, 11000).map(new TransactionsTest$$anonfun$2(this), IndexedSeq$.MODULE$.canBuildFrom())).toSet()));
    }

    /**
     * Calls {@code initTransactions()} twice on the same producer; the test passes only if
     * a {@link KafkaException} is thrown (see the {@code expected} attribute).
     */
    @Test(expected = KafkaException.class)
    public void testConsecutivelyRunInitTransactions() {
        KafkaProducer<byte[], byte[]> producer = kafka$api$TransactionsTest$$createTransactionalProducer("normalProducer", kafka$api$TransactionsTest$$createTransactionalProducer$default$2(), kafka$api$TransactionsTest$$createTransactionalProducer$default$3(), kafka$api$TransactionsTest$$createTransactionalProducer$default$4());
        producer.initTransactions();
        // The second initialisation is the call under test.
        producer.initTransactions();
        throw Assertions$.MODULE$.fail("Should have raised a KafkaException", new Position("TransactionsTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 586));
    }

    /**
     * Starts a transaction, sends one record, applies a closure to every server index
     * in {@code [0, servers().size())}, and then expects {@code commitTransaction()} to
     * throw a {@link TimeoutException}. The producer is always closed with a zero timeout.
     */
    @Test(expected = TimeoutException.class)
    public void testCommitTransactionTimeout() {
        KafkaProducer<byte[], byte[]> producer = kafka$api$TransactionsTest$$createTransactionalProducer("transactionalProducer", kafka$api$TransactionsTest$$createTransactionalProducer$default$2(), 1000L, kafka$api$TransactionsTest$$createTransactionalProducer$default$4());
        producer.initTransactions();
        producer.beginTransaction();

        ProducerRecord record = new ProducerRecord(topic1(), "foobar".getBytes());
        producer.send(record);

        // Per-broker action supplied by the closure (presumably shuts each broker down — TODO confirm).
        int firstIndex = Predef$.MODULE$.intWrapper(0);
        RichInt$.MODULE$.until$extension0(firstIndex, servers().size()).foreach$mVc$sp(new TransactionsTest$$anonfun$testCommitTransactionTimeout$1(this));

        try {
            producer.commitTransaction();
        } finally {
            producer.close(Duration.ZERO);
        }
    }

    /**
     * Checks that the producer epoch recorded in the partition leader's log for
     * {@code test-topic-0} increases after a send times out while the leader is down and
     * the surrounding transaction is aborted.
     *
     * <p>Flow: commit one record and capture the producer id and epoch from the leader's
     * producer state; start a new transaction, kill the leader, send, wait, restart the
     * brokers and assert the pending send fails with {@link TimeoutException}; abort;
     * commit a further transaction; consume five records; finally compare the epoch now
     * stored for the same producer id against the captured one.
     */
    @Test
    public void testBumpTransactionalEpoch() {
        final String testTopic = "test-topic";
        KafkaProducer<byte[], byte[]> producer = kafka$api$TransactionsTest$$createTransactionalProducer("transactionalProducer", kafka$api$TransactionsTest$$createTransactionalProducer$default$2(), kafka$api$TransactionsTest$$createTransactionalProducer$default$3(), 5000);
        KafkaConsumer consumer = (KafkaConsumer) transactionalConsumers().head();
        try {
            createTopic(testTopic, numPartitions(), 1, new Properties());
            int leaderId = TestUtils$.MODULE$.waitUntilLeaderIsKnown(servers(), new TopicPartition(testTopic, 0), TestUtils$.MODULE$.waitUntilLeaderIsKnown$default$3());

            // First committed transaction, so the leader has producer state to inspect.
            producer.initTransactions();
            producer.beginTransaction();
            producer.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(testTopic, Predef$.MODULE$.int2Integer(0), "4", "4", true));
            producer.commitTransaction();

            // Capture the producer id and epoch currently stored on the leader.
            LogManager leaderLogs = ((KafkaServer) servers().apply(leaderId)).logManager();
            AbstractLog partitionLog = (AbstractLog) leaderLogs.getLog(new TopicPartition(testTopic, 0), leaderLogs.getLog$default$2()).get();
            Tuple2 firstProducerState = (Tuple2) partitionLog.producerStateManager().activeProducers().head();
            ProducerStateEntry initialEntry = (ProducerStateEntry) firstProducerState._2();
            long producerId = initialEntry.producerId();
            short initialEpoch = initialEntry.producerEpoch();

            // Second transaction: the leader is killed mid-transaction so the last send cannot complete.
            producer.beginTransaction();
            producer.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(topic1(), (Integer) null, "2", "2", false));
            producer.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(testTopic, Predef$.MODULE$.int2Integer(0), "4", "4", false));
            killBroker(leaderId);
            Future pendingSend = producer.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(testTopic, Predef$.MODULE$.int2Integer(0), "3", "3", false));
            Thread.sleep(6000L);
            restartDeadBrokers();
            TestUtils.assertFutureThrows(pendingSend, TimeoutException.class);
            producer.abortTransaction();

            // Third transaction: must commit successfully after the abort.
            producer.beginTransaction();
            producer.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(topic2(), (Integer) null, "2", "2", true));
            producer.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(topic1(), (Integer) null, "4", "4", true));
            producer.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(testTopic, Predef$.MODULE$.int2Integer(0), "1", "1", true));
            producer.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(testTopic, Predef$.MODULE$.int2Integer(0), "3", "3", true));
            producer.commitTransaction();

            consumer.subscribe((Collection) JavaConverters$.MODULE$.seqAsJavaListConverter(List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{topic1(), topic2(), testTopic}))).asJava());
            TestUtils$.MODULE$.consumeRecords(consumer, 5, TestUtils$.MODULE$.consumeRecords$default$3()).foreach(new TransactionsTest$$anonfun$testBumpTransactionalEpoch$1(this));

            // Re-read the leader's producer state: the epoch for the same producer id must have grown.
            LogManager leaderLogsAfter = ((KafkaServer) servers().apply(leaderId)).logManager();
            AbstractLog partitionLogAfter = (AbstractLog) leaderLogsAfter.getLog(new TopicPartition(testTopic, 0), leaderLogsAfter.getLog$default$2()).get();
            ProducerStateEntry entryAfter = (ProducerStateEntry) partitionLogAfter.producerStateManager().activeProducers().apply(BoxesRunTime.boxToLong(producerId));
            Assert.assertTrue(entryAfter.producerEpoch() > initialEpoch);
        } finally {
            producer.close(Duration.ZERO);
        }
    }

    /**
     * Applies the per-value send closure to every integer in {@code [i, i2)} and then
     * flushes the producer.
     *
     * @param kafkaProducer producer handed to the closure and flushed afterwards
     * @param str           forwarded to the closure (callers in this file pass a topic name)
     * @param i             first value, inclusive
     * @param i2            end value, exclusive
     * @param z             forwarded to the closure unchanged
     */
    private void sendTransactionalMessagesWithValueRange(KafkaProducer<byte[], byte[]> kafkaProducer, String str, int i, int i2, boolean z) {
        int rangeStart = Predef$.MODULE$.intWrapper(i);
        RichInt$.MODULE$.until$extension0(rangeStart, i2).foreach(new TransactionsTest$$anonfun$sendTransactionalMessagesWithValueRange$1(this, kafkaProducer, str, z));
        kafkaProducer.flush();
    }

    /**
     * Builds the broker property overrides used by this test class. All values are
     * stored as strings.
     *
     * @return a fresh {@link Properties} instance holding the overrides
     */
    public Properties kafka$api$TransactionsTest$$serverProps() {
        Properties overrides = new Properties();
        overrides.put(KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), Boolean.toString(false));
        overrides.put(KafkaConfig$.MODULE$.OffsetsTopicPartitionsProp(), Integer.toString(1));
        overrides.put(KafkaConfig$.MODULE$.TransactionsTopicPartitionsProp(), Integer.toString(3));
        overrides.put(KafkaConfig$.MODULE$.TransactionsTopicReplicationFactorProp(), Integer.toString(2));
        overrides.put(KafkaConfig$.MODULE$.TransactionsTopicMinISRProp(), Integer.toString(2));
        overrides.put(KafkaConfig$.MODULE$.ControlledShutdownEnableProp(), Boolean.toString(true));
        overrides.put(KafkaConfig$.MODULE$.UncleanLeaderElectionEnableProp(), Boolean.toString(false));
        overrides.put(KafkaConfig$.MODULE$.AutoLeaderRebalanceEnableProp(), Boolean.toString(false));
        overrides.put(KafkaConfig$.MODULE$.GroupInitialRebalanceDelayMsProp(), "0");
        overrides.put(KafkaConfig$.MODULE$.TransactionsAbortTimedOutTransactionCleanupIntervalMsProp(), "200");
        return overrides;
    }

    /**
     * Creates a consumer against the current servers' broker list and registers it in
     * {@code transactionalConsumers()}.
     *
     * @param str        forwarded as the second argument of {@code createConsumer}
     *                   (callers in this file pass a group name)
     * @param i          forwarded as the sixth argument of {@code createConsumer}
     * @param properties not read by this method
     *                   (NOTE(review): looks unused — confirm whether it should be forwarded)
     * @return the newly created consumer
     */
    public KafkaConsumer<byte[], byte[]> kafka$api$TransactionsTest$$createReadCommittedConsumer(String str, int i, Properties properties) {
        String brokerList = TestUtils$.MODULE$.getBrokerListStrFromServers(servers(), TestUtils$.MODULE$.getBrokerListStrFromServers$default$2());
        KafkaConsumer<byte[], byte[]> consumer = TestUtils$.MODULE$.createConsumer(brokerList, str, TestUtils$.MODULE$.createConsumer$default$3(), false, true, i, TestUtils$.MODULE$.createConsumer$default$7(), TestUtils$.MODULE$.createConsumer$default$8(), TestUtils$.MODULE$.createConsumer$default$9(), TestUtils$.MODULE$.createConsumer$default$10(), TestUtils$.MODULE$.createConsumer$default$11());
        transactionalConsumers().$plus$eq(consumer);
        return consumer;
    }

    /** Default value for the first argument of the read-committed consumer factory. */
    private String createReadCommittedConsumer$default$1() {
        final String defaultGroup = "group";
        return defaultGroup;
    }

    /** Default value for the second argument of the read-committed consumer factory. */
    public int kafka$api$TransactionsTest$$createReadCommittedConsumer$default$2() {
        final int defaultValue = 500;
        return defaultValue;
    }

    /**
     * Default value for the third argument of the read-committed consumer factory:
     * a new, empty {@link Properties} instance on every call.
     */
    public Properties kafka$api$TransactionsTest$$createReadCommittedConsumer$default$3() {
        final Properties emptyProps = new Properties();
        return emptyProps;
    }

    /**
     * Creates a consumer against the current servers' broker list, using the
     * {@code createConsumer} defaults for everything except the second argument and the
     * fourth ({@code false}), and registers it in {@code nonTransactionalConsumers()}.
     *
     * @param str forwarded as the second argument of {@code createConsumer}
     * @return the newly created consumer
     */
    public KafkaConsumer<byte[], byte[]> kafka$api$TransactionsTest$$createReadUncommittedConsumer(String str) {
        String brokerList = TestUtils$.MODULE$.getBrokerListStrFromServers(servers(), TestUtils$.MODULE$.getBrokerListStrFromServers$default$2());
        KafkaConsumer<byte[], byte[]> consumer = TestUtils$.MODULE$.createConsumer(brokerList, str, TestUtils$.MODULE$.createConsumer$default$3(), false, TestUtils$.MODULE$.createConsumer$default$5(), TestUtils$.MODULE$.createConsumer$default$6(), TestUtils$.MODULE$.createConsumer$default$7(), TestUtils$.MODULE$.createConsumer$default$8(), TestUtils$.MODULE$.createConsumer$default$9(), TestUtils$.MODULE$.createConsumer$default$10(), TestUtils$.MODULE$.createConsumer$default$11());
        nonTransactionalConsumers().$plus$eq(consumer);
        return consumer;
    }

    /**
     * Creates a transactional producer for the current servers and registers it in
     * {@code transactionalProducers()}.
     *
     * @param str forwarded as the first argument of {@code TestUtils.createTransactionalProducer}
     * @param j   forwarded as the fourth argument
     * @param j2  forwarded as the fifth argument
     * @param i   forwarded as the sixth argument
     * @return the newly created producer
     */
    public KafkaProducer<byte[], byte[]> kafka$api$TransactionsTest$$createTransactionalProducer(String str, long j, long j2, int i) {
        KafkaProducer<byte[], byte[]> producer = TestUtils$.MODULE$.createTransactionalProducer(str, servers(), TestUtils$.MODULE$.createTransactionalProducer$default$3(), j, j2, i);
        transactionalProducers().$plus$eq(producer);
        return producer;
    }

    /** Default value for the second argument of the transactional producer factory. */
    public long kafka$api$TransactionsTest$$createTransactionalProducer$default$2() {
        return 60_000L;
    }

    /** Default value for the third argument of the transactional producer factory. */
    public long kafka$api$TransactionsTest$$createTransactionalProducer$default$3() {
        return 60_000L;
    }

    /** Default value for the fourth argument of the transactional producer factory. */
    public int kafka$api$TransactionsTest$$createTransactionalProducer$default$4() {
        return 120_000;
    }
}
