package org.apache.kafka.streams.processor.internals.assignment;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.class */
public class HighAvailabilityTaskAssignor implements TaskAssignor {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) HighAvailabilityTaskAssignor.class);

    @Override // org.apache.kafka.streams.processor.internals.assignment.TaskAssignor
    public boolean assign(Map<UUID, ClientState> map, Set<TaskId> set, Set<TaskId> set2, AssignorConfiguration.AssignmentConfigs assignmentConfigs) {
        TreeSet treeSet = new TreeSet(set2);
        TreeMap treeMap = new TreeMap(map);
        assignActiveStatefulTasks(treeMap, treeSet);
        assignStandbyReplicaTasks(treeMap, treeSet, assignmentConfigs.numStandbyReplicas);
        AtomicInteger atomicInteger = new AtomicInteger(assignmentConfigs.maxWarmupReplicas);
        Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients = tasksToCaughtUpClients(treeSet, treeMap, assignmentConfigs.acceptableRecoveryLag);
        TreeMap treeMap2 = new TreeMap();
        int assignActiveTaskMovements = TaskMovement.assignActiveTaskMovements(tasksToCaughtUpClients, treeMap, treeMap2, atomicInteger);
        int assignStandbyTaskMovements = TaskMovement.assignStandbyTaskMovements(tasksToCaughtUpClients, treeMap, atomicInteger, treeMap2);
        assignStatelessActiveTasks(treeMap, Utils.diff(TreeSet::new, set, treeSet));
        boolean z = assignActiveTaskMovements + assignStandbyTaskMovements > 0;
        log.info("Decided on assignment: " + treeMap + " with" + (z ? "" : " no") + " followup probing rebalance.");
        return z;
    }

    private static void assignActiveStatefulTasks(SortedMap<UUID, ClientState> sortedMap, SortedSet<TaskId> sortedSet) {
        Iterator<ClientState> it = null;
        for (TaskId taskId : sortedSet) {
            if (it == null || !it.hasNext()) {
                it = sortedMap.values().iterator();
            }
            it.next().assignActive(taskId);
        }
        balanceTasksOverThreads(sortedMap, (v0) -> {
            return v0.activeTasks();
        }, (v0, v1) -> {
            v0.unassignActive(v1);
        }, (v0, v1) -> {
            v0.assignActive(v1);
        });
    }

    private static void assignStandbyReplicaTasks(TreeMap<UUID, ClientState> treeMap, Set<TaskId> set, int i) {
        UUID poll;
        Map map = (Map) set.stream().collect(Collectors.toMap(taskId -> {
            return taskId;
        }, taskId2 -> {
            return Integer.valueOf(i);
        }));
        ConstrainedPrioritySet constrainedPrioritySet = new ConstrainedPrioritySet((uuid, taskId3) -> {
            return Boolean.valueOf(!((ClientState) treeMap.get(uuid)).hasAssignedTask(taskId3));
        }, uuid2 -> {
            return Double.valueOf(((ClientState) treeMap.get(uuid2)).assignedTaskLoad());
        });
        constrainedPrioritySet.offerAll(treeMap.keySet());
        for (TaskId taskId4 : set) {
            int intValue = ((Integer) map.get(taskId4)).intValue();
            while (intValue > 0 && (poll = constrainedPrioritySet.poll(taskId4)) != null) {
                treeMap.get(poll).assignStandby(taskId4);
                intValue--;
                constrainedPrioritySet.offer(poll);
            }
            if (intValue > 0) {
                log.warn("Unable to assign {} of {} standby tasks for task [{}]. There is not enough available capacity. You should increase the number of application instances to maintain the requested number of standby replicas.", Integer.valueOf(intValue), Integer.valueOf(i), taskId4);
            }
        }
        balanceTasksOverThreads(treeMap, (v0) -> {
            return v0.standbyTasks();
        }, (v0, v1) -> {
            v0.unassignStandby(v1);
        }, (v0, v1) -> {
            v0.assignStandby(v1);
        });
    }

    private static void balanceTasksOverThreads(SortedMap<UUID, ClientState> sortedMap, Function<ClientState, Set<TaskId>> function, BiConsumer<ClientState, TaskId> biConsumer, BiConsumer<ClientState, TaskId> biConsumer2) {
        boolean z = true;
        while (z) {
            z = false;
            for (Map.Entry<UUID, ClientState> entry : sortedMap.entrySet()) {
                UUID key = entry.getKey();
                ClientState value = entry.getValue();
                for (Map.Entry<UUID, ClientState> entry2 : sortedMap.entrySet()) {
                    UUID key2 = entry2.getKey();
                    ClientState value2 = entry2.getValue();
                    if (!key.equals(key2)) {
                        Iterator it = new TreeSet(function.apply(value)).iterator();
                        while (shouldMoveATask(value, value2) && it.hasNext()) {
                            TaskId taskId = (TaskId) it.next();
                            if (!value2.hasAssignedTask(taskId)) {
                                biConsumer.accept(value, taskId);
                                biConsumer2.accept(value2, taskId);
                                z = true;
                            }
                        }
                    }
                }
            }
        }
    }

    private static boolean shouldMoveATask(ClientState clientState, ClientState clientState2) {
        double assignedTaskLoad = clientState.assignedTaskLoad() - clientState2.assignedTaskLoad();
        if (assignedTaskLoad <= ConfluentConfigs.IP_CONNECTION_CREATION_RATE_THROTTLE_ENABLE_THRESHOLD_DEFAULT) {
            return false;
        }
        double assignedTaskCount = ((clientState.assignedTaskCount() - 1.0d) / clientState.capacity()) - ((clientState2.assignedTaskCount() + 1.0d) / clientState2.capacity());
        return assignedTaskCount >= ConfluentConfigs.IP_CONNECTION_CREATION_RATE_THROTTLE_ENABLE_THRESHOLD_DEFAULT && assignedTaskCount < assignedTaskLoad;
    }

    private static void assignStatelessActiveTasks(TreeMap<UUID, ClientState> treeMap, Iterable<TaskId> iterable) {
        ConstrainedPrioritySet constrainedPrioritySet = new ConstrainedPrioritySet((uuid, taskId) -> {
            return true;
        }, uuid2 -> {
            return Double.valueOf(((ClientState) treeMap.get(uuid2)).activeTaskLoad());
        });
        constrainedPrioritySet.offerAll(treeMap.keySet());
        for (TaskId taskId2 : iterable) {
            UUID poll = constrainedPrioritySet.poll(taskId2);
            treeMap.get(poll).assignActive(taskId2);
            constrainedPrioritySet.offer(poll);
        }
    }

    private static Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients(Set<TaskId> set, Map<UUID, ClientState> map, long j) {
        HashMap hashMap = new HashMap();
        for (TaskId taskId : set) {
            TreeSet treeSet = new TreeSet();
            for (Map.Entry<UUID, ClientState> entry : map.entrySet()) {
                UUID key = entry.getKey();
                long lagFor = entry.getValue().lagFor(taskId);
                if (activeRunning(lagFor) || unbounded(j) || acceptable(j, lagFor)) {
                    treeSet.add(key);
                }
            }
            hashMap.put(taskId, treeSet);
        }
        return hashMap;
    }

    private static boolean unbounded(long j) {
        return j == Long.MAX_VALUE;
    }

    private static boolean acceptable(long j, long j2) {
        return j2 >= 0 && j2 <= j;
    }

    private static boolean activeRunning(long j) {
        return j == -2;
    }
}
