package org.apache.kafka.metadata.migration;

import com.damnhandy.uri.template.UriTemplate;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.MetadataRecordType;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.controller.QuorumFeatures;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.image.loader.LoaderManifest;
import org.apache.kafka.image.loader.LoaderManifestType;
import org.apache.kafka.image.publisher.MetadataPublisher;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.metadata.migration.TopicMigrationClient;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.fault.FaultHandler;
import org.apache.kafka.server.util.Deadline;
import org.apache.kafka.server.util.FutureUtils;
import org.apache.kafka.storage.internals.log.ProducerStateManager;
import org.apache.zookeeper.server.persistence.FileSnap;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/metadata/migration/KRaftMigrationDriver.class */
public class KRaftMigrationDriver implements MetadataPublisher {
    /** Completion handler that ignores whatever {@link Throwable} (or {@code null}) it is handed. */
    private static final Consumer<Throwable> NO_OP_HANDLER = th -> {
    };
    /** 300000 ms, i.e. five minutes. Presumably the upper bound on waiting for a metadata commit -- confirm against its users. */
    private static final int METADATA_COMMIT_MAX_WAIT_MS = 300000;
    /** Clock used to compute deadlines for deferred polls and for waits on the metadata layer. */
    private final Time time;
    /** Log context carrying the "[KRaftMigrationDriver id=N] " prefix; also handed to the event queue. */
    private final LogContext logContext;
    private final Logger log;
    /** Id of this controller node; compared against the KRaft leader id to decide whether this driver is active. */
    private final int nodeId;
    /** Client used to read and write migration state and metadata in ZooKeeper. */
    private final MigrationClient zkMigrationClient;
    /** Writer that turns KRaft snapshots and deltas into ZK write operations. */
    private final KRaftMigrationZkWriter zkMetadataWriter;
    /** Sends RPCs to brokers based on a metadata image or delta. */
    private final LegacyPropagator propagator;
    /** Receives the record batches read from ZK during the initial migration. */
    private final ZkRecordConsumer zkRecordConsumer;
    /** Queue on which every {@code MigrationEvent} of this driver is run. */
    private final KafkaEventQueue eventQueue;
    /** Receives errors that the events do not treat as retriable. */
    private final FaultHandler faultHandler;
    /** Invoked with this driver once the migration state has been recovered from ZK. */
    private final Consumer<MetadataPublisher> initialZkLoadHandler;
    /** Most recent KRaft leader and epoch, stored by {@code KRaftLeaderEvent}. */
    private volatile LeaderAndEpoch leaderAndEpoch;
    /** Current state of the driver's state machine; changed only through {@code transitionTo}. */
    private volatile MigrationDriverState migrationState;
    /** Latest ZK migration leadership state; replaced by {@code applyMigrationOperation}. */
    private volatile ZkMigrationLeadershipState migrationLeadershipState;
    /** Latest metadata image, stored by {@code MetadataChangeEvent}; starts as {@code MetadataImage.EMPTY}. */
    private volatile MetadataImage image;
    private volatile QuorumFeatures quorumFeatures;
    /** Set to {@code true} by the first {@code MetadataChangeEvent} that runs; {@code false} until then. */
    private volatile boolean firstPublish;

    /* loaded from: input_file:org/apache/kafka/metadata/migration/KRaftMigrationDriver$BecomeZkControllerEvent.class */
    /**
     * Event that tries to claim ZK controller leadership for this node and, on success,
     * moves the driver on to either the initial migration or a KRaft-to-ZK sync.
     */
    class BecomeZkControllerEvent extends MigrationEvent {
        BecomeZkControllerEvent() {
            super();
        }

        /**
         * Runs only while the driver is in {@code BECOME_CONTROLLER}. A ZK controller epoch
         * version of -1 after the claim attempt means the claim did not succeed; the driver
         * then stays in its current state so that a later poll retries.
         */
        @Override
        public void run() throws Exception {
            final KRaftMigrationDriver driver = KRaftMigrationDriver.this;
            if (!driver.checkDriverState(MigrationDriverState.BECOME_CONTROLLER)) {
                return;
            }
            driver.applyMigrationOperation("Claiming ZK controller leadership", driver.zkMigrationClient::claimControllerLeadership);
            if (driver.migrationLeadershipState.zkControllerEpochZkVersion() == -1) {
                driver.log.debug("Unable to claim leadership, will retry until we learn of a different KRaft leader");
                return;
            }
            // An already-completed initial migration only needs a sync; otherwise migrate first.
            final MigrationDriverState nextState = driver.migrationLeadershipState.initialZkMigrationComplete()
                ? MigrationDriverState.SYNC_KRAFT_TO_ZK
                : MigrationDriverState.ZK_MIGRATION;
            driver.transitionTo(nextState);
        }
    }

    /* loaded from: input_file:org/apache/kafka/metadata/migration/KRaftMigrationDriver$KRaftLeaderEvent.class */
    /**
     * Event recording a new KRaft leader/epoch and switching the driver between its
     * active (this node is the leader) and inactive roles.
     */
    class KRaftLeaderEvent extends MigrationEvent {
        private final LeaderAndEpoch leaderAndEpoch;

        KRaftLeaderEvent(LeaderAndEpoch leaderAndEpoch) {
            super();
            this.leaderAndEpoch = leaderAndEpoch;
        }

        /**
         * Stores the new leader on the driver, updates the migration leadership state with
         * the new controller id and epoch, and transitions to
         * {@code WAIT_FOR_CONTROLLER_QUORUM} (leader) or {@code INACTIVE} (not leader).
         */
        @Override
        public void run() throws Exception {
            final KRaftMigrationDriver driver = KRaftMigrationDriver.this;
            driver.leaderAndEpoch = this.leaderAndEpoch;

            if (!this.leaderAndEpoch.isLeader(driver.nodeId)) {
                // Record whoever the leader is, falling back to the EMPTY state's controller id.
                driver.applyMigrationOperation("Became inactive migration driver", state ->
                    state.withNewKRaftController(
                        this.leaderAndEpoch.leaderId().orElse(ZkMigrationLeadershipState.EMPTY.kraftControllerId()),
                        this.leaderAndEpoch.epoch()));
                driver.transitionTo(MigrationDriverState.INACTIVE);
                return;
            }

            // This node is the leader: load (or create) the recovery state from ZK first.
            driver.applyMigrationOperation("Became active migration driver", state ->
                driver.zkMigrationClient.getOrCreateMigrationRecoveryState(state)
                    .withNewKRaftController(driver.nodeId, this.leaderAndEpoch.epoch()));
            driver.transitionTo(MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/metadata/migration/KRaftMigrationDriver$MetadataChangeEvent.class */
    /**
     * Event that handles one published KRaft metadata change (a snapshot or a delta):
     * it stores the new image on the driver and, when dual-write is allowed, replicates
     * the change to ZooKeeper and sends RPCs to brokers.
     */
    public class MetadataChangeEvent extends MigrationEvent {
        private final MetadataDelta delta;
        private final MetadataImage image;
        private final MetadataProvenance provenance;
        private final boolean isSnapshot;
        private final Consumer<Throwable> completionHandler;

        /**
         * @param metadataDelta      the delta that produced {@code metadataImage}
         * @param metadataImage      the new metadata image
         * @param metadataProvenance provenance of the change; only used in log output
         * @param z                  {@code true} if the change is a snapshot, {@code false} for a delta
         * @param consumer           called with {@code null} when the event finishes, or with the
         *                           error when it fails
         */
        MetadataChangeEvent(MetadataDelta metadataDelta, MetadataImage metadataImage, MetadataProvenance metadataProvenance, boolean z, Consumer<Throwable> consumer) {
            super();
            this.delta = metadataDelta;
            this.image = metadataImage;
            this.provenance = metadataProvenance;
            this.isSnapshot = z;
            this.completionHandler = consumer;
        }

        @Override
        public void run() throws Exception {
            KRaftMigrationDriver.this.firstPublish = true;
            MetadataImage prevImage = KRaftMigrationDriver.this.image;
            KRaftMigrationDriver.this.image = this.image;
            // Plain literal for the log label, so it does not hinge on an unrelated ZooKeeper constant.
            String metadataType = this.isSnapshot ? "snapshot" : "delta";

            if (!KRaftMigrationDriver.this.migrationState.allowDualWrite()) {
                KRaftMigrationDriver.this.log.trace("Received metadata {}, but the controller is not in dual-write mode. Ignoring the change to be replicated to Zookeeper", metadataType);
                this.completionHandler.accept(null);
                return;
            }
            if (this.image.highestOffsetAndEpoch().compareTo(KRaftMigrationDriver.this.migrationLeadershipState.offsetAndEpoch()) < 0) {
                KRaftMigrationDriver.this.log.info("Ignoring {} {} which contains metadata that has already been written to ZK.", metadataType, this.provenance);
                this.completionHandler.accept(null);
                return;
            }

            // Sorted map, so the summary of ZK writes logs in a stable key order.
            Map<String, Integer> dualWriteCounts = new TreeMap<>();
            if (this.isSnapshot) {
                KRaftMigrationDriver.this.zkMetadataWriter.handleSnapshot(
                    this.image,
                    KRaftMigrationDriver.countingOperationConsumer(dualWriteCounts, KRaftMigrationDriver.this::applyMigrationOperation));
            } else {
                KRaftMigrationDriver.this.zkMetadataWriter.handleDelta(
                    prevImage,
                    this.image,
                    this.delta,
                    KRaftMigrationDriver.countingOperationConsumer(dualWriteCounts, KRaftMigrationDriver.this::applyMigrationOperation));
            }
            if (dualWriteCounts.isEmpty()) {
                KRaftMigrationDriver.this.log.trace("Did not make any ZK writes when handling KRaft {}", metadataType);
            } else {
                KRaftMigrationDriver.this.log.debug("Made the following ZK writes when handling KRaft {}: {}", metadataType, dualWriteCounts);
            }

            // Persist how far ZK has caught up with KRaft.
            ZkMigrationLeadershipState stateWithNewOffset = KRaftMigrationDriver.this.migrationLeadershipState.withKRaftMetadataOffsetAndEpoch(
                this.image.highestOffsetAndEpoch().offset(),
                this.image.highestOffsetAndEpoch().epoch());
            KRaftMigrationDriver.this.applyMigrationOperation("Updating ZK migration state after " + metadataType,
                ignored -> KRaftMigrationDriver.this.zkMigrationClient.setMigrationRecoveryState(stateWithNewOffset));

            // RPCs are only sent when the topics or cluster part of the delta changed.
            if (this.delta.topicsDelta() == null && this.delta.clusterDelta() == null) {
                KRaftMigrationDriver.this.log.trace("Not sending RPCs to brokers for metadata {} since no relevant metadata has changed", metadataType);
            } else {
                KRaftMigrationDriver.this.log.trace("Sending RPCs to brokers for metadata {}.", metadataType);
                KRaftMigrationDriver.this.propagator.sendRPCsToBrokersFromMetadataDelta(this.delta, this.image, KRaftMigrationDriver.this.migrationLeadershipState.zkControllerEpoch());
            }
            this.completionHandler.accept(null);
        }

        /** Reports the failure to the completion handler before the shared event error handling runs. */
        @Override
        public void handleException(Throwable th) {
            this.completionHandler.accept(th);
            super.handleException(th);
        }
    }

    /* loaded from: input_file:org/apache/kafka/metadata/migration/KRaftMigrationDriver$MigrateMetadataEvent.class */
    /**
     * Event that performs the initial migration: reads all metadata from ZooKeeper, hands
     * it to the metadata layer in batches, and records the resulting offset and epoch in ZK.
     */
    class MigrateMetadataEvent extends MigrationEvent {
        MigrateMetadataEvent() {
            super();
        }

        /**
         * Runs only while the driver is in {@code ZK_MIGRATION}. Any failure after the
         * migration has begun aborts it on the record consumer and is then passed to the
         * shared event error handling.
         */
        @Override
        public void run() throws Exception {
            if (!KRaftMigrationDriver.this.checkDriverState(MigrationDriverState.ZK_MIGRATION)) {
                return;
            }
            Set<Integer> brokersInMetadata = new HashSet<>();
            KRaftMigrationDriver.this.log.info("Starting ZK migration");
            KRaftMigrationDriver.this.zkRecordConsumer.beginMigration();
            try {
                AtomicInteger migratedRecordCount = new AtomicInteger(0);
                Consumer<List<ApiMessageAndVersion>> batchConsumer = batch -> {
                    try {
                        if (KRaftMigrationDriver.this.log.isTraceEnabled()) {
                            KRaftMigrationDriver.this.log.trace("Migrating {} records from ZK: {}", batch.size(), KRaftMigrationDriver.recordBatchToString(batch));
                        } else {
                            KRaftMigrationDriver.this.log.info("Migrating {} records from ZK", batch.size());
                        }
                        // Block until the metadata layer has committed this batch, bounded by the driver's own timeout.
                        FutureUtils.waitWithLogging(
                            KRaftMigrationDriver.this.log,
                            "",
                            "the metadata layer to commit migration record batch",
                            KRaftMigrationDriver.this.zkRecordConsumer.acceptBatch(batch),
                            Deadline.fromDelay(KRaftMigrationDriver.this.time, METADATA_COMMIT_MAX_WAIT_MS, TimeUnit.MILLISECONDS),
                            KRaftMigrationDriver.this.time);
                        migratedRecordCount.addAndGet(batch.size());
                    } catch (Throwable th) {
                        // The consumer interface has no checked exceptions; wrap and let the outer catch handle it.
                        throw new RuntimeException(th);
                    }
                };
                KRaftMigrationDriver.this.zkMigrationClient.readAllMetadata(batchConsumer, brokersInMetadata::add);

                OffsetAndEpoch offsetAndEpoch = (OffsetAndEpoch) FutureUtils.waitWithLogging(
                    KRaftMigrationDriver.this.log,
                    "",
                    "the metadata layer to complete the migration",
                    KRaftMigrationDriver.this.zkRecordConsumer.completeMigration(),
                    Deadline.fromDelay(KRaftMigrationDriver.this.time, METADATA_COMMIT_MAX_WAIT_MS, TimeUnit.MILLISECONDS),
                    KRaftMigrationDriver.this.time);
                KRaftMigrationDriver.this.log.info("Completed migration of metadata from Zookeeper to KRaft. A total of {} metadata records were generated. The current metadata offset is now {} with an epoch of {}. Saw {} brokers in the migrated metadata {}.",
                    migratedRecordCount.get(), offsetAndEpoch.offset(), offsetAndEpoch.epoch(), brokersInMetadata.size(), brokersInMetadata);

                ZkMigrationLeadershipState stateWithNewOffset = KRaftMigrationDriver.this.migrationLeadershipState.withKRaftMetadataOffsetAndEpoch(offsetAndEpoch.offset(), offsetAndEpoch.epoch());
                KRaftMigrationDriver.this.applyMigrationOperation("Finished initial migration of ZK metadata to KRaft",
                    ignored -> KRaftMigrationDriver.this.zkMigrationClient.setMigrationRecoveryState(stateWithNewOffset));
                KRaftMigrationDriver.this.transitionTo(MigrationDriverState.SYNC_KRAFT_TO_ZK);
            } catch (Throwable th) {
                KRaftMigrationDriver.this.zkRecordConsumer.abortMigration();
                super.handleException(th);
            }
        }
    }

    /* loaded from: input_file:org/apache/kafka/metadata/migration/KRaftMigrationDriver$MigrationEvent.class */
    /**
     * Base class of every event this driver puts on its queue; supplies shared error
     * handling and a short name for log output.
     */
    abstract class MigrationEvent implements EventQueue.Event {
        MigrationEvent() {
        }

        /**
         * Routes a failure by type: authentication errors and unrecognised errors go to the
         * fault handler, other migration client errors are logged at info level, and a
         * rejected execution is logged at debug level.
         */
        @Override
        public void handleException(Throwable th) {
            final KRaftMigrationDriver driver = KRaftMigrationDriver.this;
            // The auth check comes first, ahead of the more general client-exception check.
            if (th instanceof MigrationClientAuthException) {
                driver.faultHandler.handleFault("Encountered ZooKeeper authentication in " + this, th);
                return;
            }
            if (th instanceof MigrationClientException) {
                final String message = String.format("Encountered ZooKeeper error during event %s. Will retry.", this);
                driver.log.info(message, th.getCause());
                return;
            }
            if (th instanceof RejectedExecutionException) {
                driver.log.debug("Not processing {} because the event queue is closed.", this);
                return;
            }
            driver.faultHandler.handleFault("Unhandled error in " + this, th);
        }

        /** Returns the simple class name of the concrete event. */
        @Override
        public String toString() {
            return this.getClass().getSimpleName();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/metadata/migration/KRaftMigrationDriver$PollEvent.class */
    /**
     * Periodic event that looks at the driver state, enqueues the event that handles that
     * state (if any), and schedules the next poll one second later.
     */
    public class PollEvent extends MigrationEvent {
        PollEvent() {
            super();
        }

        @Override
        public void run() throws Exception {
            final KRaftMigrationDriver driver = KRaftMigrationDriver.this;
            MigrationEvent stateHandler = null;
            switch (driver.migrationState) {
                case UNINITIALIZED:
                    // Handled inline rather than through a queued event.
                    driver.recoverMigrationStateFromZK();
                    break;
                case WAIT_FOR_CONTROLLER_QUORUM:
                    stateHandler = new WaitForControllerQuorumEvent();
                    break;
                case WAIT_FOR_BROKERS:
                    stateHandler = new WaitForZkBrokersEvent();
                    break;
                case BECOME_CONTROLLER:
                    stateHandler = new BecomeZkControllerEvent();
                    break;
                case ZK_MIGRATION:
                    stateHandler = new MigrateMetadataEvent();
                    break;
                case SYNC_KRAFT_TO_ZK:
                    stateHandler = new SyncKRaftMetadataEvent();
                    break;
                case KRAFT_CONTROLLER_TO_BROKER_COMM:
                    stateHandler = new SendRPCsToBrokersEvent();
                    break;
                default:
                    // Remaining states need no periodic work.
                    break;
            }
            if (stateHandler != null) {
                driver.eventQueue.append(stateHandler);
            }

            final long nextPollDeadlineNs = driver.time.nanoseconds() + TimeUnit.SECONDS.toNanos(1L);
            driver.eventQueue.scheduleDeferred("poll", new EventQueue.DeadlineFunction(nextPollDeadlineNs), new PollEvent());
        }
    }

    /* loaded from: input_file:org/apache/kafka/metadata/migration/KRaftMigrationDriver$SendRPCsToBrokersEvent.class */
    /**
     * Event that sends RPCs built from the full metadata image to the brokers and then
     * moves the driver into {@code DUAL_WRITE}.
     */
    class SendRPCsToBrokersEvent extends MigrationEvent {
        SendRPCsToBrokersEvent() {
            super();
        }

        /**
         * Runs only while the driver is in {@code KRAFT_CONTROLLER_TO_BROKER_COMM}, and does
         * nothing while the current image is behind the offset and epoch recorded in the
         * migration leadership state.
         */
        @Override
        public void run() throws Exception {
            final KRaftMigrationDriver driver = KRaftMigrationDriver.this;
            if (!driver.checkDriverState(MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM)) {
                return;
            }
            final boolean imageIsBehind =
                driver.image.highestOffsetAndEpoch().compareTo(driver.migrationLeadershipState.offsetAndEpoch()) < 0;
            if (imageIsBehind) {
                driver.log.trace("Ignoring using metadata image since migration leadership state is at a greater offset and epoch {}", driver.migrationLeadershipState.offsetAndEpoch());
                return;
            }
            driver.log.trace("Sending RPCs to broker before moving to dual-write mode using at offset and epoch {}", driver.image.highestOffsetAndEpoch());
            driver.propagator.sendRPCsToBrokersFromMetadataImage(driver.image, driver.migrationLeadershipState.zkControllerEpoch());
            driver.transitionTo(MigrationDriverState.DUAL_WRITE);
        }
    }

    /* loaded from: input_file:org/apache/kafka/metadata/migration/KRaftMigrationDriver$SyncKRaftMetadataEvent.class */
    /**
     * Event that writes the whole current KRaft image to ZooKeeper and then moves the
     * driver to {@code KRAFT_CONTROLLER_TO_BROKER_COMM}.
     */
    class SyncKRaftMetadataEvent extends MigrationEvent {
        SyncKRaftMetadataEvent() {
            super();
        }

        /** Runs only while the driver is in {@code SYNC_KRAFT_TO_ZK}. */
        @Override
        public void run() throws Exception {
            final KRaftMigrationDriver driver = KRaftMigrationDriver.this;
            if (!driver.checkDriverState(MigrationDriverState.SYNC_KRAFT_TO_ZK)) {
                return;
            }
            driver.log.info("Performing a full metadata sync from KRaft to ZK.");
            // Sorted map, so the write summary below logs in a stable key order.
            final Map<String, Integer> writeCounts = new TreeMap<>();
            driver.zkMetadataWriter.handleSnapshot(
                driver.image,
                KRaftMigrationDriver.countingOperationConsumer(writeCounts, driver::applyMigrationOperation));
            driver.log.info("Made the following ZK writes when reconciling with KRaft state: {}", writeCounts);
            driver.transitionTo(MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM);
        }
    }

    /* loaded from: input_file:org/apache/kafka/metadata/migration/KRaftMigrationDriver$WaitForControllerQuorumEvent.class */
    /**
     * Event that decides, from the ZK migration state in the current image, whether the
     * driver may move past {@code WAIT_FOR_CONTROLLER_QUORUM}.
     */
    class WaitForControllerQuorumEvent extends MigrationEvent {
        WaitForControllerQuorumEvent() {
            super();
        }

        /**
         * Runs only while the driver is in {@code WAIT_FOR_CONTROLLER_QUORUM} and after at
         * least one metadata publish has been seen.
         */
        @Override
        public void run() throws Exception {
            final KRaftMigrationDriver driver = KRaftMigrationDriver.this;
            if (!driver.checkDriverState(MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM)) {
                return;
            }
            if (!driver.firstPublish) {
                driver.log.trace("Waiting until we have received metadata before proceeding with migration");
                return;
            }
            switch (driver.image.features().zkMigrationState()) {
                case NONE:
                    driver.log.error("The controller's ZkMigrationState is NONE which means this cluster should not be migrated from ZooKeeper. This controller should not be configured with 'zookeeper.metadata.migration.enable' set to true. Will not proceed with a migration.");
                    driver.transitionTo(MigrationDriverState.INACTIVE);
                    break;
                case PRE_MIGRATION:
                    // Stay in this state until the controller quorum reports ready.
                    if (driver.isControllerQuorumReadyForMigration()) {
                        driver.log.debug("Controller Quorum is ready for Zk to KRaft migration. Now waiting for ZK brokers.");
                        driver.transitionTo(MigrationDriverState.WAIT_FOR_BROKERS);
                    }
                    break;
                case MIGRATION:
                    if (!driver.migrationLeadershipState.initialZkMigrationComplete()) {
                        driver.log.error("KRaft controller indicates an active migration, but the ZK state does not.");
                        driver.transitionTo(MigrationDriverState.INACTIVE);
                    } else {
                        driver.log.debug("Migration is in already progress, not waiting on ZK brokers.");
                        driver.transitionTo(MigrationDriverState.BECOME_CONTROLLER);
                    }
                    break;
                case POST_MIGRATION:
                    driver.log.error("KRaft controller indicates a completed migration, but the migration driver is somehow active.");
                    driver.transitionTo(MigrationDriverState.INACTIVE);
                    break;
                default:
                    break;
            }
        }
    }

    /* loaded from: input_file:org/apache/kafka/metadata/migration/KRaftMigrationDriver$WaitForZkBrokersEvent.class */
    /**
     * Event that moves the driver from {@code WAIT_FOR_BROKERS} to
     * {@code BECOME_CONTROLLER} once the ZK brokers are ready for migration.
     */
    class WaitForZkBrokersEvent extends MigrationEvent {
        WaitForZkBrokersEvent() {
            super();
        }

        @Override
        public void run() throws Exception {
            final KRaftMigrationDriver driver = KRaftMigrationDriver.this;
            if (!driver.checkDriverState(MigrationDriverState.WAIT_FOR_BROKERS)) {
                return;
            }
            // Brokers are only checked once the state check has passed.
            if (!driver.areZkBrokersReadyForMigration()) {
                return;
            }
            driver.log.debug("Zk brokers are registered and ready for migration");
            driver.transitionTo(MigrationDriverState.BECOME_CONTROLLER);
        }
    }

    /**
     * Creates a driver in the {@code UNINITIALIZED} state with an empty metadata image, an
     * unknown leader and an empty migration leadership state.
     *
     * @param i                 id of this controller node
     * @param zkRecordConsumer  receiver of the record batches read from ZK
     * @param migrationClient   client for ZooKeeper; also backs the ZK metadata writer
     * @param legacyPropagator  sender of RPCs to brokers
     * @param consumer          invoked with this driver once the ZK migration state is recovered
     * @param faultHandler      receiver of errors the events do not treat as retriable
     * @param quorumFeatures    used to check whether the controller quorum is ready
     * @param time              clock used for deadlines
     */
    public KRaftMigrationDriver(int i, ZkRecordConsumer zkRecordConsumer, MigrationClient migrationClient, LegacyPropagator legacyPropagator, Consumer<MetadataPublisher> consumer, FaultHandler faultHandler, QuorumFeatures quorumFeatures, Time time) {
        // Injected collaborators.
        this.nodeId = i;
        this.time = time;
        this.zkRecordConsumer = zkRecordConsumer;
        this.zkMigrationClient = migrationClient;
        this.propagator = legacyPropagator;
        this.initialZkLoadHandler = consumer;
        this.faultHandler = faultHandler;
        this.quorumFeatures = quorumFeatures;

        // Logging; the context is created first because the logger and the queue both use it.
        this.logContext = new LogContext("[KRaftMigrationDriver id=" + i + "] ");
        this.log = this.logContext.logger(KRaftMigrationDriver.class);

        // Initial driver state.
        this.migrationState = MigrationDriverState.UNINITIALIZED;
        this.migrationLeadershipState = ZkMigrationLeadershipState.EMPTY;
        this.image = MetadataImage.EMPTY;
        this.firstPublish = false;
        this.leaderAndEpoch = LeaderAndEpoch.UNKNOWN;

        // NOTE(review): the queue is built on Time.SYSTEM rather than the injected time -- confirm this is intentional.
        this.eventQueue = new KafkaEventQueue(Time.SYSTEM, this.logContext, "controller-" + i + "-migration-driver-");
        this.zkMetadataWriter = new KRaftMigrationZkWriter(migrationClient);
    }

    /**
     * Convenience constructor that delegates to the eight-argument constructor with
     * {@code Time.SYSTEM} as the clock.
     */
    public KRaftMigrationDriver(int i, ZkRecordConsumer zkRecordConsumer, MigrationClient migrationClient, LegacyPropagator legacyPropagator, Consumer<MetadataPublisher> consumer, FaultHandler faultHandler, QuorumFeatures quorumFeatures) {
        this(
            i,
            zkRecordConsumer,
            migrationClient,
            legacyPropagator,
            consumer,
            faultHandler,
            quorumFeatures,
            Time.SYSTEM);
    }

    /** Starts the driver by prepending the first {@code PollEvent} to the event queue. */
    public void start() {
        final PollEvent firstPoll = new PollEvent();
        this.eventQueue.prepend(firstPoll);
    }

    /**
     * Begins shutting down the event queue and then closes it.
     *
     * @throws InterruptedException declared for callers; presumably raised while closing the queue
     */
    public void shutdown() throws InterruptedException {
        final KafkaEventQueue queue = this.eventQueue;
        queue.beginShutdown("KRaftMigrationDriver#shutdown");
        this.log.debug("Shutting down KRaftMigrationDriver");
        queue.close();
    }

    /**
     * Reads the driver state from the event queue.
     *
     * @return a future completed with the driver state by an event appended to the queue
     */
    public CompletableFuture<MigrationDriverState> migrationState() {
        final CompletableFuture<MigrationDriverState> stateFuture = new CompletableFuture<>();
        this.eventQueue.append(() -> stateFuture.complete(this.migrationState));
        return stateFuture;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Loads (or creates) the migration recovery state in ZooKeeper, notifies the initial
     * load handler with this driver, and transitions to {@code INACTIVE}.
     */
    public void recoverMigrationStateFromZK() {
        applyMigrationOperation("Recovering migration state from ZK", this.zkMigrationClient::getOrCreateMigrationRecoveryState);

        final String initialMigrationStatus;
        if (this.migrationLeadershipState.initialZkMigrationComplete()) {
            initialMigrationStatus = "done";
        } else {
            initialMigrationStatus = "not done";
        }
        this.log.info("Initial migration of ZK metadata is {}.", initialMigrationStatus);

        this.initialZkLoadHandler.accept(this);
        transitionTo(MigrationDriverState.INACTIVE);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Asks the quorum features whether any controller is not ready for the migration.
     *
     * @return {@code true} when no "not ready" reason is reported; otherwise logs the
     *         reason and returns {@code false}
     */
    public boolean isControllerQuorumReadyForMigration() {
        final Optional<String> notReadyReason = this.quorumFeatures.reasonAllControllersZkMigrationNotReady();
        if (notReadyReason.isPresent()) {
            this.log.info("Still waiting for all controller nodes ready to begin the migration. due to:" + notReadyReason.get());
            return false;
        }
        return true;
    }

    /**
     * Removes from {@code set} every broker id that the image lists as a migrating ZK broker.
     * The set is modified in place, so afterwards it holds only the ids still missing.
     *
     * @param metadataImage image whose cluster brokers are inspected
     * @param set           broker ids expected to be present; mutated by this call
     * @return {@code true} if at least one id is left in {@code set}
     */
    private boolean imageDoesNotContainAllBrokers(MetadataImage metadataImage, Set<Integer> set) {
        for (BrokerRegistration registration : metadataImage.cluster().brokers().values()) {
            if (!registration.isMigratingZkBroker()) {
                continue;
            }
            set.remove(registration.id());
        }
        final boolean allBrokersFound = set.isEmpty();
        return !allBrokersFound;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Checks whether every ZK broker is registered with KRaft as a migrating ZK broker.
     * Two sets of broker ids are checked against the current image: the brokers registered
     * in ZK, and the brokers that appear in the topic assignments read from ZK.
     *
     * @return {@code true} only when a metadata image has been published, KRaft and ZK both
     *         know at least one broker, and both sets of ids are fully covered by the image
     */
    public boolean areZkBrokersReadyForMigration() {
        if (!this.firstPublish) {
            this.log.info("Waiting for initial metadata publish before checking if Zk brokers are registered.");
            return false;
        }
        if (this.image.cluster().isEmpty()) {
            this.log.info("No brokers are known to KRaft, waiting for brokers to register.");
            return false;
        }

        Set<Integer> zkBrokerRegistrations = this.zkMigrationClient.readBrokerIds();
        if (zkBrokerRegistrations.isEmpty()) {
            this.log.info("No brokers are registered in ZK, waiting for brokers to register.");
            return false;
        }
        // The check removes the ids it finds, so the set logged below holds only the missing ones.
        if (imageDoesNotContainAllBrokers(this.image, zkBrokerRegistrations)) {
            this.log.info("Still waiting for ZK brokers {} to register with KRaft.", zkBrokerRegistrations);
            return false;
        }

        // Collect every broker id referenced by the assignments of any topic in ZK.
        Set<Integer> zkBrokersWithAssignments = new HashSet<>();
        this.zkMigrationClient.topicClient().iterateTopics(
            EnumSet.of(TopicMigrationClient.TopicVisitorInterest.TOPICS),
            (topicName, topicId, assignments) -> assignments.values().forEach(zkBrokersWithAssignments::addAll));
        if (imageDoesNotContainAllBrokers(this.image, zkBrokersWithAssignments)) {
            this.log.info("Still waiting for ZK brokers {} found in metadata to register with KRaft.", zkBrokersWithAssignments);
            return false;
        }
        return true;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Applies an operation to the current migration leadership state, logs the outcome,
     * and stores the resulting state on the driver.
     *
     * @param str                     description of the operation, used as the log message prefix
     * @param kRaftMigrationOperation function from the current state to the new state
     */
    public void applyMigrationOperation(String str, KRaftMigrationOperation kRaftMigrationOperation) {
        final ZkMigrationLeadershipState before = this.migrationLeadershipState;
        final ZkMigrationLeadershipState after = kRaftMigrationOperation.apply(before);

        if (after.loggableChangeSinceState(before)) {
            this.log.info("{}. Transitioned migration state from {} to {}", str, before, after);
        } else if (!after.equals(before)) {
            // Changed, but not in a way the state considers worth info-level logging.
            this.log.trace("{}. Transitioned migration state from {} to {}", str, before, after);
        } else {
            this.log.trace("{}. Kept migration state as {}", str, after);
        }
        this.migrationLeadershipState = after;
    }

    /**
     * Tells whether the driver may move from its current state to the given state.
     * Staying in the same state is always allowed; moving to {@code UNINITIALIZED} never is.
     *
     * @param migrationDriverState the proposed next state
     * @return {@code true} if the transition is permitted
     */
    private boolean isValidStateChange(MigrationDriverState migrationDriverState) {
        if (this.migrationState == migrationDriverState) {
            return true;
        }
        if (migrationDriverState == MigrationDriverState.UNINITIALIZED) {
            return false;
        }
        // Most states may fall back to INACTIVE; compute that once.
        final boolean toInactive = migrationDriverState == MigrationDriverState.INACTIVE;
        switch (this.migrationState) {
            case UNINITIALIZED:
            case DUAL_WRITE:
                return toInactive;
            case INACTIVE:
                return migrationDriverState == MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM;
            case WAIT_FOR_CONTROLLER_QUORUM:
                return toInactive
                    || migrationDriverState == MigrationDriverState.BECOME_CONTROLLER
                    || migrationDriverState == MigrationDriverState.WAIT_FOR_BROKERS;
            case WAIT_FOR_BROKERS:
                return toInactive
                    || migrationDriverState == MigrationDriverState.BECOME_CONTROLLER;
            case BECOME_CONTROLLER:
                return toInactive
                    || migrationDriverState == MigrationDriverState.ZK_MIGRATION
                    || migrationDriverState == MigrationDriverState.SYNC_KRAFT_TO_ZK;
            case ZK_MIGRATION:
                return toInactive
                    || migrationDriverState == MigrationDriverState.SYNC_KRAFT_TO_ZK;
            case SYNC_KRAFT_TO_ZK:
                return toInactive
                    || migrationDriverState == MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM;
            case KRAFT_CONTROLLER_TO_BROKER_COMM:
                return toInactive
                    || migrationDriverState == MigrationDriverState.DUAL_WRITE;
            default:
                this.log.error("Migration driver trying to transition from an unknown state {}", this.migrationState);
                return false;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Checks that the driver is currently in the expected state and logs at INFO level when
     * it is not.
     *
     * <p>NOTE(review): the last log argument is {@code getClass().getSimpleName()} evaluated on
     * this driver instance, so it reports the driver's own class name rather than identifying
     * the event that was skipped — confirm whether that is intended.
     *
     * @param migrationDriverState the state the caller expects the driver to be in
     * @return {@code true} if the current state equals the expected one, {@code false} otherwise
     */
    public boolean checkDriverState(MigrationDriverState migrationDriverState) {
        final boolean inExpectedState = this.migrationState.equals(migrationDriverState);
        if (!inExpectedState) {
            this.log.info("Expected driver state {} but found {}. Not running this event {}.", migrationDriverState, this.migrationState, getClass().getSimpleName());
            return false;
        }
        return true;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Moves the driver into the given state after validating the transition.
     *
     * <p>A real change of state is logged at DEBUG level; re-entering the current state is
     * logged at TRACE level. In both cases the field is assigned afterwards.
     *
     * @param migrationDriverState the state to enter
     * @throws IllegalStateException if {@code isValidStateChange} rejects the transition
     */
    public void transitionTo(MigrationDriverState migrationDriverState) {
        final boolean permitted = isValidStateChange(migrationDriverState);
        if (!permitted) {
            final String reason = String.format("Invalid transition in migration driver from %s to %s", this.migrationState, migrationDriverState);
            throw new IllegalStateException(reason);
        }
        final boolean sameState = this.migrationState == migrationDriverState;
        if (sameState) {
            // Re-entering the current state is only worth a trace line.
            this.log.trace("{} transitioning from {} to {} state", Integer.valueOf(this.nodeId), this.migrationState, migrationDriverState);
        } else {
            this.log.debug("{} transitioning from {} to {} state", Integer.valueOf(this.nodeId), this.migrationState, migrationDriverState);
        }
        this.migrationState = migrationDriverState;
    }

    /**
     * Returns the fixed name of this publisher.
     *
     * @return the constant string {@code "KRaftMigrationDriver"}
     */
    @Override // org.apache.kafka.image.publisher.MetadataPublisher
    public String name() {
        return "KRaftMigrationDriver";
    }

    /**
     * Reacts to a controller change by appending a {@code KRaftLeaderEvent} carrying the new
     * leader and epoch to the driver's event queue.
     *
     * @param leaderAndEpoch the leader/epoch pair wrapped into the queued event
     */
    @Override // org.apache.kafka.image.publisher.MetadataPublisher
    public void onControllerChange(LeaderAndEpoch leaderAndEpoch) {
        final KRaftLeaderEvent leaderEvent = new KRaftLeaderEvent(leaderAndEpoch);
        this.eventQueue.append(leaderEvent);
    }

    /**
     * Forwards a metadata update to {@code enqueueMetadataChangeEvent}, using the manifest's
     * provenance, a flag telling whether the manifest type is {@code SNAPSHOT}, and
     * {@code NO_OP_HANDLER} as the completion handler.
     *
     * @param metadataDelta  the delta passed through unchanged
     * @param metadataImage  the image passed through unchanged
     * @param loaderManifest source of the provenance and of the snapshot flag
     */
    @Override // org.apache.kafka.image.publisher.MetadataPublisher
    public void onMetadataUpdate(MetadataDelta metadataDelta, MetadataImage metadataImage, LoaderManifest loaderManifest) {
        final MetadataProvenance provenance = loaderManifest.provenance();
        final boolean fromSnapshot = LoaderManifestType.SNAPSHOT == loaderManifest.type();
        enqueueMetadataChangeEvent(metadataDelta, metadataImage, provenance, fromSnapshot, NO_OP_HANDLER);
    }

    /**
     * Wraps the arguments in a {@code MetadataChangeEvent} and appends it to the event queue.
     *
     * @param metadataDelta      passed to the event constructor
     * @param metadataImage      passed to the event constructor
     * @param metadataProvenance passed to the event constructor
     * @param z                  passed to the event constructor; {@code onMetadataUpdate}
     *                           supplies whether the loader manifest type is {@code SNAPSHOT}
     * @param consumer           passed to the event constructor; presumably invoked by the
     *                           event on completion — verify against {@code MetadataChangeEvent}
     */
    void enqueueMetadataChangeEvent(MetadataDelta metadataDelta, MetadataImage metadataImage, MetadataProvenance metadataProvenance, boolean z, Consumer<Throwable> consumer) {
        final MetadataChangeEvent changeEvent = new MetadataChangeEvent(metadataDelta, metadataImage, metadataProvenance, z, consumer);
        this.eventQueue.append(changeEvent);
    }

    /**
     * Closes this driver by closing its event queue.
     *
     * @throws Exception declared by the overridden signature; anything raised while closing
     *                   the event queue propagates to the caller
     */
    @Override // org.apache.kafka.image.publisher.MetadataPublisher, java.lang.AutoCloseable
    public void close() throws Exception {
        // The event queue is the only resource this method releases.
        this.eventQueue.close();
    }

    /**
     * Builds an operation consumer that counts invocations per key before delegating.
     *
     * <p>For every call, the returned consumer increments the entry of {@code map} stored under
     * its first argument (starting at 1 when the key is absent or mapped to {@code null}) and
     * then hands its second and third arguments to {@code biConsumer}.
     *
     * @param map        mutable map that receives the per-key counts; it is updated in place
     * @param biConsumer delegate invoked after the count has been updated
     * @return a consumer that counts and then delegates
     */
    static KRaftMigrationOperationConsumer countingOperationConsumer(Map<String, Integer> map, BiConsumer<String, KRaftMigrationOperation> biConsumer) {
        return (countKey, opLabel, operation) -> {
            // merge() stores 1 for a missing (or null-valued) key and otherwise adds 1 to the
            // existing count. It also avoids a nested lambda whose parameter would clash with
            // the enclosing lambda's parameter names.
            map.merge(countKey, 1, Integer::sum);
            biConsumer.accept(opLabel, operation);
        };
    }

    /**
     * Renders a batch of records as a single bracketed, comma-separated string.
     *
     * <p>Records whose API key is not {@code MetadataRecordType.CONFIG_RECORD} are rendered with
     * their own {@code toString()}. Config records are rendered by hand with only the resource
     * type, resource name and config name; the config value is deliberately left out
     * (presumably so that values are not written to logs — confirm with callers).
     *
     * @param collection the records to render; an empty collection yields {@code "[]"}
     * @return the rendered batch, e.g. {@code "[a,b]"}
     */
    static String recordBatchToString(Collection<ApiMessageAndVersion> collection) {
        String joined = collection.stream().map(apiMessageAndVersion -> {
            if (apiMessageAndVersion.message().apiKey() != MetadataRecordType.CONFIG_RECORD.id()) {
                return apiMessageAndVersion.toString();
            }
            ConfigRecord configRecord = (ConfigRecord) apiMessageAndVersion.message();
            // Hand-built representation that omits the record's value.
            StringBuilder sb = new StringBuilder();
            sb.append("ApiMessageAndVersion(");
            sb.append("ConfigRecord(");
            sb.append("resourceType=").append(configRecord.resourceType());
            sb.append(", resourceName=").append(configRecord.resourceName());
            sb.append(", name=").append(configRecord.name());
            sb.append(")");
            sb.append(" at version ").append(apiMessageAndVersion.version());
            sb.append(")");
            return sb.toString();
        // Plain literal separator: this formatting must not depend on a constant that belongs
        // to an unrelated URI-template library.
        }).collect(Collectors.joining(","));
        return "[" + joined + "]";
    }
}
