package com.linkedin.kafka.cruisecontrol.executor;

import com.linkedin.kafka.cruisecontrol.common.SbkAdminUtils;
import com.linkedin.kafka.cruisecontrol.executor.ExecutionTask;
import com.linkedin.kafka.cruisecontrol.executor.Executor;
import com.linkedin.kafka.cruisecontrol.executor.ExecutorState;
import com.linkedin.kafka.cruisecontrol.monitor.LoadMonitor;
import io.confluent.shaded.org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.kafka.clients.admin.AlterPartitionReassignmentsResult;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.utils.Time;

/**
 * Runs the inter-broker replica movement tasks of an execution.
 *
 * <p>Tasks are drained from the {@code executionTaskManager} in batches. For each batch the
 * replication throttles are set, the tasks are marked in progress and the corresponding partition
 * reassignments are submitted through the admin client. Tasks that fail with an error considered
 * retriable during the first pass are collected and re-submitted once, after waiting
 * {@code retryWaitMs}. Throttles are cleared as tasks finish.
 */
@NotThreadSafe
public class ExecutorInterBrokerReplicaMovement extends AbstractExecutorReplicaMovement {
    /** Polling interval, in milliseconds, used while waiting out the retry back-off. */
    private static final long SLEEP_INTERVAL_MS = 100L;

    private final Set<Integer> removedBrokers;
    private final LoadMonitor loadMonitor;
    private final long retryWaitMs;
    private final Time time;
    private final Cluster cluster;
    /** Number of pending inter-broker partition movements captured at construction time. */
    private final int numTotalPartitionMovements;
    /** Remaining inter-broker data to move (MB) captured at construction time. */
    private final long totalDataToMoveInMB;

    /**
     * Result of resolving a batch of tasks into admin reassignment requests: the reassignments to
     * submit, plus the tasks whose partition description failed and that should be retried later.
     */
    public static class NewReplicaAssignments {
        Map<TopicPartition, Optional<NewPartitionReassignment>> newReplicaAssignments;
        List<ExecutionTask> tasksToRetry;

        /**
         * @param map  the reassignments to submit, keyed by topic partition
         * @param list the tasks to retry later
         */
        public NewReplicaAssignments(Map<TopicPartition, Optional<NewPartitionReassignment>> map, List<ExecutionTask> list) {
            this.newReplicaAssignments = map;
            this.tasksToRetry = list;
        }
    }

    /**
     * Creates the movement and snapshots the number of pending inter-broker partition movements
     * and the remaining data to move from the given task manager; both are used for progress logs.
     *
     * @param str                       forwarded to the superclass (presumably the execution UUID; confirm against the parent class)
     * @param executionTaskManager      source of the tasks to execute and of the progress counters
     * @param set                       forwarded to the superclass (presumably the recently removed brokers; confirm against the parent class)
     * @param replicationThrottleHelper helper used to set and clear replication throttles
     * @param confluentAdmin            admin client used to submit partition reassignments
     * @param sbkAdminUtils             admin utilities used to describe partitions and list ongoing reassignments
     * @param atomicBoolean             forwarded to the superclass (presumably the stop-requested flag; confirm against the parent class)
     * @param set2                      brokers being removed, passed to the throttle helper
     * @param loadMonitor               load monitor passed to the throttle helper when setting throttles
     * @param cluster                   cluster snapshot used when reloading the tasks to be retried
     * @param time                      clock used for the retry back-off
     * @param j                         time to wait, in milliseconds, before retrying failed tasks
     */
    public ExecutorInterBrokerReplicaMovement(String str, ExecutionTaskManager executionTaskManager, Set<Integer> set, ReplicationThrottleHelper replicationThrottleHelper, ConfluentAdmin confluentAdmin, SbkAdminUtils sbkAdminUtils, AtomicBoolean atomicBoolean, Set<Integer> set2, LoadMonitor loadMonitor, Cluster cluster, Time time, long j) {
        super(str, executionTaskManager, set, replicationThrottleHelper, confluentAdmin, sbkAdminUtils, atomicBoolean);
        this.cluster = cluster;
        this.time = time;
        this.retryWaitMs = j;
        this.loadMonitor = loadMonitor;
        this.removedBrokers = set2;
        this.numTotalPartitionMovements = this.executionTaskManager.numPendingInterBrokerPartitionMovements();
        this.totalDataToMoveInMB = this.executionTaskManager.remainingInterBrokerDataToMoveInMB();
    }

    /** @return the executor state constant that identifies this phase. */
    @Override
    public ExecutorState.State state() {
        return ExecutorState.State.INTER_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS;
    }

    /** @return an in-progress executor state built from the current inter-broker task summary and concurrency settings. */
    @Override
    public ExecutorState executorState() {
        return ExecutorState.operationInProgress(
            state(),
            this.executionTaskManager.getExecutionTasksSummary(Collections.singleton(ExecutionTask.TaskType.INTER_BROKER_REPLICA_ACTION)),
            this.executionTaskManager.interBrokerPartitionMovementConcurrency(),
            this.executionTaskManager.intraBrokerPartitionMovementConcurrency(),
            this.executionTaskManager.leadershipMovementConcurrency(),
            this.uuid,
            this.recentlyRemovedBrokers);
    }

    /** @return the task type handled by this movement. */
    @Override
    public ExecutionTask.TaskType taskType() {
        return ExecutionTask.TaskType.INTER_BROKER_REPLICA_ACTION;
    }

    /**
     * Executes all inter-broker movements: a first pass in which failures may be deferred, one
     * retry pass for the deferred tasks, and finally a wait until no task is in execution anymore.
     *
     * @param executionTaskWaiter used to block until at least one in-flight task finishes
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public void move(Executor.ExecutionTaskWaiter executionTaskWaiter) throws InterruptedException {
        LOG.info("Starting {} inter-broker partition movements.", this.numTotalPartitionMovements);
        List<ExecutionTask> tasksToRetry = doMove(this.numTotalPartitionMovements, executionTaskWaiter, true);
        maybeRetryTasks(tasksToRetry, executionTaskWaiter);

        // Drain whatever is still in flight (e.g. after a stop request) and release its throttles.
        Set<ExecutionTask> inExecutionTasks = this.executionTaskManager.inExecutionTasks();
        while (!inExecutionTasks.isEmpty()) {
            LOG.info("Waiting for {} tasks moving {} MB to finish.", inExecutionTasks.size(), this.executionTaskManager.inExecutionInterBrokerDataToMoveInMB());
            List<ExecutionTask> finishedTasks = executionTaskWaiter.waitForAnyTaskToFinish(this);
            inExecutionTasks = this.executionTaskManager.inExecutionTasks();
            clearThrottles(finishedTasks, inExecutionTasks);
        }
    }

    /**
     * Re-submits the tasks deferred by the first pass, after waiting {@code retryWaitMs}. Does
     * nothing if there is nothing to retry or if a stop was requested before or during the wait.
     *
     * @param tasksToRetry the tasks the first pass deferred
     * @param waiter       used to block until at least one in-flight task finishes
     */
    private void maybeRetryTasks(List<ExecutionTask> tasksToRetry, Executor.ExecutionTaskWaiter waiter) throws InterruptedException {
        if (tasksToRetry.isEmpty()) {
            LOG.debug("No inter-broker partition movement tasks to retry.");
            return;
        }
        // Captured before the reload below, which moves these tasks back to the pending set.
        int numToRetry = this.executionTaskManager.numInterBrokerPartitionMovementsToBeRetried();
        if (this.stopRequested.get()) {
            LOG.info("Skipping retry of {} tasks because replica reassignment was interrupted", numToRetry);
            return;
        }
        LOG.info("Will be retrying {} failed tasks in {} ms", numToRetry, this.retryWaitMs);
        if (sleepFor(this.retryWaitMs)) {
            LOG.info("Skipping retry of {} tasks because replica reassignment was interrupted", numToRetry);
            return;
        }
        this.executionTaskManager.reloadInterBrokerTasksToBeRetried(this.cluster);
        LOG.info("Retrying {} tasks", this.executionTaskManager.numPendingInterBrokerPartitionMovements());
        // Second pass: failures are no longer deferred.
        doMove(numToRetry, waiter, false);
    }

    /**
     * Sleeps for the given duration in {@link #SLEEP_INTERVAL_MS} slices so that a stop request is
     * noticed promptly.
     *
     * @param durationMs how long to wait, in milliseconds
     * @return {@code true} if a stop was requested before or when the wait ended
     */
    private boolean sleepFor(long durationMs) {
        long deadlineMs = this.time.hiResClockMs() + durationMs;
        while (this.time.hiResClockMs() < deadlineMs) {
            if (this.stopRequested.get()) {
                return true;
            }
            this.time.sleep(SLEEP_INTERVAL_MS);
        }
        return this.stopRequested.get();
    }

    /**
     * Repeatedly submits a batch, waits for any task to finish, logs progress and clears throttles,
     * until there is neither pending nor in-flight work or a stop is requested.
     *
     * @param numPendingMovements number of movements expected to be pending when the loop starts
     * @param waiter              used to block until at least one in-flight task finishes
     * @param retryAllowed        whether retriable failures are deferred instead of propagated
     * @return the tasks that were deferred for a later retry
     */
    private List<ExecutionTask> doMove(int numPendingMovements, Executor.ExecutionTaskWaiter waiter, boolean retryAllowed) throws InterruptedException {
        List<ExecutionTask> tasksToRetry = new ArrayList<>();
        boolean workRemaining = numPendingMovements > 0 || !this.executionTaskManager.inExecutionTasks().isEmpty();
        while (workRemaining) {
            if (this.stopRequested.get()) {
                LOG.info("Stopping inter-broker replica reassignment because it was interrupted.");
                break;
            }
            List<ExecutionTask> deferredTasks = executeBatch(retryAllowed);
            tasksToRetry.addAll(deferredTasks);
            List<ExecutionTask> finishedTasks = waiter.waitForAnyTaskToFinish(this);
            logProgress(finishedTasks);
            int numPending = this.executionTaskManager.numPendingInterBrokerPartitionMovements();
            Set<ExecutionTask> inExecutionTasks = this.executionTaskManager.inExecutionTasks();
            // Deferred tasks had throttles set in executeBatch, so release them together with the finished ones.
            List<ExecutionTask> tasksToUnthrottle = new ArrayList<>(finishedTasks);
            tasksToUnthrottle.addAll(deferredTasks);
            clearThrottles(tasksToUnthrottle, inExecutionTasks);
            workRemaining = numPending > 0 || !inExecutionTasks.isEmpty();
        }
        return tasksToRetry;
    }

    /**
     * Drains the next batch of eligible tasks, sets their throttles, marks them in progress and
     * submits their reassignments.
     *
     * @param retryAllowed whether retriable failures are deferred instead of propagated
     * @return the tasks of this batch that were marked to be retried (empty if none)
     */
    private List<ExecutionTask> executeBatch(boolean retryAllowed) throws InterruptedException {
        List<ExecutionTask> tasksToExecute = this.executionTaskManager.drainInterBrokerReplicaMovementTasks();
        if (tasksToExecute.isEmpty()) {
            LOG.info("There were no eligible tasks to execute. In execution tasks: {},",
                this.executionTaskManager.inExecutionTasks().stream().map(ExecutionTask::executionId).collect(Collectors.toList()));
            return Collections.emptyList();
        }
        LOG.info("Executor will execute {} task(s): {} as part of operation {}", tasksToExecute.size(), tasksToExecute, this.uuid);
        setThrottles(tasksToExecute);
        this.executionTaskManager.markTasksInProgress(tasksToExecute);
        List<ExecutionTask> tasksToRetry = executeReplicaReassignmentTasks(tasksToExecute, retryAllowed);
        if (!tasksToRetry.isEmpty()) {
            this.executionTaskManager.markTasksToBeRetried(tasksToRetry);
            LOG.info("{} tasks failed with a retriable error. Saving them for later (tasks: {})", tasksToRetry.size(), tasksToRetry);
        }
        return tasksToRetry;
    }

    /** Sets replication throttles for the proposals of the given tasks, holding the throttle helper's monitor. */
    private void setThrottles(List<ExecutionTask> tasks) throws InterruptedException {
        synchronized (this.throttleHelper) {
            this.throttleHelper.setThrottles(
                tasks.stream().map(ExecutionTask::proposal).collect(Collectors.toList()),
                this.loadMonitor,
                this.removedBrokers);
        }
    }

    /** Clears throttles for the completed tasks given the tasks still in execution, holding the throttle helper's monitor. */
    private void clearThrottles(List<ExecutionTask> completedTasks, Set<ExecutionTask> inExecutionTasks) {
        synchronized (this.throttleHelper) {
            this.throttleHelper.clearThrottles(completedTasks, new ArrayList<ExecutionTask>(inExecutionTasks), this.removedBrokers);
        }
    }

    /**
     * Submits the partition reassignments for the given tasks.
     *
     * @param list the tasks to execute; {@code null} or empty yields an empty result
     * @param z    whether failures considered retriable are returned for a later retry instead of
     *             being propagated
     * @return the tasks that should be retried later (possibly empty)
     * @throws InterruptedException declared by the underlying admin lookups
     */
    List<ExecutionTask> executeReplicaReassignmentTasks(List<ExecutionTask> list, boolean z) throws InterruptedException {
        if (list == null || list.isEmpty()) {
            return Collections.emptyList();
        }
        Map<TopicPartition, ExecutionTask> tasksByPartition = list.stream()
            .collect(Collectors.toMap(task -> task.proposal().topicPartition(), task -> task));
        NewReplicaAssignments assignments = determineReassignmentReplicas(tasksByPartition, z);
        if (assignments.newReplicaAssignments.isEmpty()) {
            return assignments.tasksToRetry;
        }
        List<ExecutionTask> tasksToRetry = new ArrayList<>(assignments.tasksToRetry);
        tasksToRetry.addAll(alterPartitionReassignments(tasksByPartition, assignments.newReplicaAssignments, z));
        return tasksToRetry;
    }

    /**
     * Resolves each task into the reassignment to submit, based on the current replicas of its
     * partition and on any reassignment already in progress for it.
     *
     * <p>When describing a partition failed: with {@code retryAllowed} the task is deferred;
     * otherwise an {@link UnknownTopicOrPartitionException} is logged and ignored and any other
     * error is rethrown. In all of these cases no reassignment is produced for the partition.
     *
     * @param tasksByPartition the tasks to resolve, keyed by topic partition
     * @param retryAllowed     whether describe failures are deferred instead of propagated
     * @return the reassignments to submit and the tasks to retry
     */
    private NewReplicaAssignments determineReassignmentReplicas(Map<TopicPartition, ExecutionTask> tasksByPartition, boolean retryAllowed) throws InterruptedException {
        Set<TopicPartition> partitions = tasksByPartition.keySet();
        Map<TopicPartition, Executor.PartitionReplicas> inProgressTargets = this.adminUtils.listTargetReplicasBeingReassigned(Optional.of(partitions));
        Map<TopicPartition, SbkAdminUtils.ReplicaDescription> currentReplicas = this.adminUtils.getReplicasForPartitions(partitions);
        Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments = new HashMap<>();
        List<ExecutionTask> tasksToRetry = new ArrayList<>();
        for (ExecutionTask task : tasksByPartition.values()) {
            TopicPartition topicPartition = task.proposal().topicPartition();
            SbkAdminUtils.ReplicaDescription description = Objects.requireNonNull(currentReplicas.get(topicPartition));
            if (description.isFailed()) {
                Throwable failure = description.throwableOpt.get();
                if (retryAllowed) {
                    LOG.warn("Describing topic partition {} failed due to {}. Will retry it later.", topicPartition, failure);
                    tasksToRetry.add(task);
                } else if (failure instanceof UnknownTopicOrPartitionException) {
                    LOG.info("Topic partition {} could not be found. It is possible that the topic was deleted while the reassignment execution was taking place. Ignoring this exception...", topicPartition);
                } else {
                    SbkAdminUtils.sneakyThrow(failure);
                }
                // The description is unusable: never derive a placement from a failed lookup.
                continue;
            }
            Optional<Executor.PartitionReplicas> replicasToWrite =
                replicasToWrite(task, description.replicaSet, Optional.ofNullable(inProgressTargets.get(topicPartition)));
            if (replicasToWrite.isPresent()) {
                Executor.PartitionReplicas target = replicasToWrite.get();
                reassignments.put(topicPartition, Optional.of(NewPartitionReassignment.ofReplicasAndObservers(target.replicas(), target.observers())));
            } else {
                LOG.warn("We couldn't find a good partition placement based on the data we received we received from the latest Admin API calls. TopicPartition: {}", topicPartition);
            }
        }
        return new NewReplicaAssignments(reassignments, tasksToRetry);
    }

    /**
     * Submits the reassignments and waits for the combined result.
     *
     * @param tasksByPartition the submitted tasks, keyed by topic partition
     * @param reassignments    the reassignments to submit
     * @param retryAllowed     whether an invalid-replica-assignment failure is turned into a list
     *                         of tasks to retry instead of being rethrown
     * @return the tasks whose reassignment was rejected as an invalid replica assignment when
     *         {@code retryAllowed} is set; otherwise an empty list (other failures are rethrown)
     */
    private List<ExecutionTask> alterPartitionReassignments(Map<TopicPartition, ExecutionTask> tasksByPartition, Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments, boolean retryAllowed) {
        AlterPartitionReassignmentsResult result = this.adminClient.alterPartitionReassignments(reassignments);
        try {
            result.all().get();
        } catch (Throwable failure) {
            if (retryAllowed && isInvalidReplicaAssignmentException(failure)) {
                return filterInvalidReplicaAssignments(result).stream()
                    .map(tasksByPartition::get)
                    .collect(Collectors.toList());
            }
            SbkAdminUtils.sneakyThrow(failure);
        }
        return Collections.emptyList();
    }

    /**
     * Inspects the per-partition futures of a reassignment result.
     *
     * @return the partitions whose future failed with an invalid replica assignment; any other
     *         per-partition failure is rethrown
     */
    private List<TopicPartition> filterInvalidReplicaAssignments(AlterPartitionReassignmentsResult result) {
        List<TopicPartition> invalidPartitions = new ArrayList<>();
        for (Map.Entry<TopicPartition, KafkaFuture<Void>> entry : result.values().entrySet()) {
            try {
                entry.getValue().get();
            } catch (Throwable failure) {
                if (isInvalidReplicaAssignmentException(failure)) {
                    invalidPartitions.add(entry.getKey());
                } else {
                    SbkAdminUtils.sneakyThrow(failure);
                }
            }
        }
        return invalidPartitions;
    }

    /** @return whether the throwable is an {@link ExecutionException} caused by an {@link InvalidReplicaAssignmentException}. */
    private boolean isInvalidReplicaAssignmentException(Throwable failure) {
        // instanceof is false for null, so no separate null check on the cause is needed.
        return failure instanceof ExecutionException && failure.getCause() instanceof InvalidReplicaAssignmentException;
    }

    /**
     * Decides which replicas and observers, if any, should be written for the task's partition.
     *
     * @param task              the task to place; must be in the {@code IN_PROGRESS} state
     * @param currentReplicas   the partition's current replicas as returned by the admin lookup
     * @param inProgressTarget  the target of a reassignment already in progress for the partition, if any
     * @return the proposal's new replicas and observers when no reassignment is in progress; empty
     *         when the task is not in progress, the current replicas are unknown, or an in-progress
     *         reassignment is accepted as matching
     * @throws RuntimeException if an in-progress reassignment differs from the proposal in both
     *                          its replica list and its observer list
     */
    private Optional<Executor.PartitionReplicas> replicasToWrite(ExecutionTask task, List<Integer> currentReplicas, Optional<Executor.PartitionReplicas> inProgressTarget) {
        if (task.state() != ExecutionTask.State.IN_PROGRESS) {
            LOG.warn("The inter-broker movement is trying to place a task which has already been scheduled. Task ID: {} Task state: {}", task.executionId(), task.state());
            return Optional.empty();
        }
        if (currentReplicas == null || currentReplicas.isEmpty()) {
            LOG.warn("Could not determine the replicas for partition {}. It is possible the topic or partition doesn't exist.", task.proposal().topicPartition());
            return Optional.empty();
        }
        List<Integer> newReplicas = task.proposal().newReplicas().stream()
            .map(replica -> replica.brokerId())
            .collect(Collectors.toList());
        List<Integer> newObservers = task.proposal().newObservers().stream()
            .map(observer -> observer.brokerId())
            .collect(Collectors.toList());
        if (!inProgressTarget.isPresent()) {
            return Optional.of(new Executor.PartitionReplicas(newReplicas, newObservers));
        }
        LOG.debug("Task {} is being reassigned already.", task.executionId());
        List<Integer> inProgressReplicas = inProgressTarget.get().replicas();
        List<Integer> inProgressObservers = inProgressTarget.get().observers();
        // NOTE(review): with '||' a mismatch is only reported when BOTH lists differ, so a different
        // replica list with identical (e.g. empty) observers is silently accepted. Confirm whether
        // '&&' was intended; left unchanged to avoid throwing in cases that are tolerated today.
        if (newReplicas.equals(inProgressReplicas) || newObservers.equals(inProgressObservers)) {
            return Optional.empty();
        }
        throw new RuntimeException("The provided new replica list " + newReplicas + " is different from the in progress replica list " + inProgressReplicas + " for " + task.proposal().topicPartition());
    }

    /**
     * Logs which tasks just completed together with the overall progress in partition movements
     * and in moved data.
     *
     * @param completedTasks the tasks that finished since the last progress report
     */
    private void logProgress(List<ExecutionTask> completedTasks) {
        int numFinishedMovements = this.executionTaskManager.numFinishedInterBrokerPartitionMovements();
        long finishedDataInMB = this.executionTaskManager.finishedInterBrokerDataMovementInMB();
        LOG.info("{} execution tasks just completed (IDs: {}). Total: {}/{} ({}%) inter-broker partition movements completed. {}MB/{}MB ({}%) of data has been moved.",
            completedTasks.size(),
            completedTasks.stream().map(ExecutionTask::executionId).collect(Collectors.toList()),
            numFinishedMovements,
            this.numTotalPartitionMovements,
            percentage(numFinishedMovements, this.numTotalPartitionMovements),
            finishedDataInMB,
            this.totalDataToMoveInMB,
            percentage(finishedDataInMB, this.totalDataToMoveInMB));
        if (LOG.isDebugEnabled()) {
            LOG.debug("Remaining data to start moving into destination brokers: {}MB", this.executionTaskManager.remainingInterBrokerDataToMoveByDestinationBroker());
        }
    }

    /**
     * Formats {@code done / total} as a percentage with two decimals.
     *
     * @return {@code "100"} when {@code total} is zero (nothing to do counts as complete, and this
     *         avoids printing NaN or Infinity); otherwise the percentage formatted with {@link Locale#US}
     */
    private static String percentage(long done, long total) {
        if (total == 0) {
            return "100";
        }
        return String.format(Locale.US, "%.2f", (done * 100.0d) / total);
    }
}
