package org.apache.kafka.coordinator.group;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.errors.FencedMemberEpochException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
import org.apache.kafka.common.errors.IllegalGenerationException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnsupportedAssignorException;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.Group;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
import org.apache.kafka.coordinator.group.consumer.Assignment;
import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
import org.apache.kafka.coordinator.group.consumer.CurrentAssignmentBuilder;
import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
import org.apache.kafka.coordinator.group.generic.GenericGroup;
import org.apache.kafka.coordinator.group.generic.GenericGroupMember;
import org.apache.kafka.coordinator.group.generic.GenericGroupState;
import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
import org.apache.kafka.coordinator.group.runtime.CoordinatorTimer;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineHashSet;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/coordinator/group/GroupMetadataManager.class */
public class GroupMetadataManager {
    // Log context used to derive this class's logger; also handed to newly created GenericGroup instances.
    private final LogContext logContext;
    // Logger obtained from logContext for GroupMetadataManager.
    private final Logger log;
    // Registry backing the timeline collections below; also passed to newly created ConsumerGroup instances.
    private final SnapshotRegistry snapshotRegistry;
    // Clock; read at the start of a consumer group heartbeat and passed to new GenericGroup instances.
    private final Time time;
    // Timer used to schedule and cancel the per-member session and revocation timeouts.
    private final CoordinatorTimer<Void, Record> timer;
    // Server-side assignors keyed by PartitionAssignor#name().
    private final Map<String, PartitionAssignor> assignors;
    // First assignor of the configured list; used when the group expresses no preferred server assignor.
    private final PartitionAssignor defaultAssignor;
    // All known groups keyed by group id.
    private final TimelineHashMap<String, Group> groups;
    // NOTE(review): presumably topic name -> ids of groups subscribed to it; not used in the
    // portion of the file reviewed here -- confirm against its writers.
    private final TimelineHashMap<String, TimelineHashSet<String>> groupsByTopics;
    // Member count at which a consumer group rejects members it does not already contain.
    private final int consumerGroupMaxSize;
    // Heartbeat interval (ms) returned to members in every consumer group heartbeat response.
    private final int consumerGroupHeartbeatIntervalMs;
    // Delay (ms) of the per-member session timeout timer.
    private final int consumerGroupSessionTimeoutMs;
    // Added to the current time to compute a group's next metadata refresh deadline.
    private final int consumerGroupMetadataRefreshIntervalMs;
    // Current metadata image; its topics() and cluster() feed the subscription metadata computation.
    private MetadataImage metadataImage;
    // Shared result carrying no records and an already-completed future holding null.
    static final CoordinatorResult<Void, Record> EMPTY_RESULT = new CoordinatorResult<>(Collections.emptyList(), CompletableFuture.completedFuture(null));
    // Generic group settings below are only stored by the constructor in the portion of the file
    // reviewed here; NOTE(review): their semantics are inferred from their names -- confirm at use sites.
    private final int genericGroupMaxSize;
    private final int genericGroupInitialRebalanceDelayMs;
    private final int genericGroupNewMemberJoinTimeoutMs;
    private final int genericGroupMinSessionTimeoutMs;
    private final int genericGroupMaxSessionTimeoutMs;

    /* loaded from: input_file:org/apache/kafka/coordinator/group/GroupMetadataManager$Builder.class */
    /**
     * Fluent builder for {@link GroupMetadataManager}.
     *
     * <p>A timer and a non-empty list of consumer group assignors are mandatory; the log context,
     * snapshot registry, metadata image and clock fall back to defaults in {@link #build()}.
     */
    public static class Builder {
        private LogContext logContext = null;
        private SnapshotRegistry snapshotRegistry = null;
        private Time time = null;
        private CoordinatorTimer<Void, Record> timer = null;
        private List<PartitionAssignor> consumerGroupAssignors = null;
        private int consumerGroupMaxSize = Integer.MAX_VALUE;
        private int consumerGroupHeartbeatIntervalMs = 5000;
        private int consumerGroupMetadataRefreshIntervalMs = Integer.MAX_VALUE;
        private MetadataImage metadataImage = null;
        private int consumerGroupSessionTimeoutMs = 45000;
        private int genericGroupMaxSize = Integer.MAX_VALUE;
        // Plain literal (milliseconds) on purpose: a group coordinator default must not be
        // borrowed from an unrelated ZooKeeper server constant.
        private int genericGroupInitialRebalanceDelayMs = 3000;
        private int genericGroupNewMemberJoinTimeoutMs = GroupCoordinatorConfig.GENERIC_GROUP_NEW_MEMBER_JOIN_TIMEOUT_MS;
        private int genericGroupMinSessionTimeoutMs;
        private int genericGroupMaxSessionTimeoutMs;

        /** Sets the log context; when left unset, {@link #build()} creates a fresh one. */
        public Builder withLogContext(LogContext logContext) {
            this.logContext = logContext;
            return this;
        }

        /** Sets the snapshot registry; when left unset, {@link #build()} creates one from the log context. */
        public Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
            this.snapshotRegistry = snapshotRegistry;
            return this;
        }

        /** Sets the clock; when left unset, {@link #build()} uses {@code Time.SYSTEM}. */
        public Builder withTime(Time time) {
            this.time = time;
            return this;
        }

        /** Sets the coordinator timer. Mandatory. */
        public Builder withTimer(CoordinatorTimer<Void, Record> coordinatorTimer) {
            this.timer = coordinatorTimer;
            return this;
        }

        /** Sets the server-side assignors. Mandatory and must not be empty; the first entry becomes the default. */
        public Builder withConsumerGroupAssignors(List<PartitionAssignor> list) {
            this.consumerGroupAssignors = list;
            return this;
        }

        /** Sets the maximum number of members of a consumer group. */
        public Builder withConsumerGroupMaxSize(int i) {
            this.consumerGroupMaxSize = i;
            return this;
        }

        /** Sets the consumer group session timeout in milliseconds. */
        Builder withConsumerGroupSessionTimeout(int i) {
            this.consumerGroupSessionTimeoutMs = i;
            return this;
        }

        /** Sets the heartbeat interval, in milliseconds, returned to consumer group members. */
        public Builder withConsumerGroupHeartbeatInterval(int i) {
            this.consumerGroupHeartbeatIntervalMs = i;
            return this;
        }

        /** Sets the consumer group metadata refresh interval in milliseconds. */
        Builder withConsumerGroupMetadataRefreshIntervalMs(int i) {
            this.consumerGroupMetadataRefreshIntervalMs = i;
            return this;
        }

        /** Sets the initial metadata image; when left unset, {@link #build()} uses {@code MetadataImage.EMPTY}. */
        Builder withMetadataImage(MetadataImage metadataImage) {
            this.metadataImage = metadataImage;
            return this;
        }

        /** Sets the generic group maximum size. */
        Builder withGenericGroupMaxSize(int i) {
            this.genericGroupMaxSize = i;
            return this;
        }

        /** Sets the generic group initial rebalance delay in milliseconds. */
        public Builder withGenericGroupInitialRebalanceDelayMs(int i) {
            this.genericGroupInitialRebalanceDelayMs = i;
            return this;
        }

        /** Sets the generic group new-member join timeout in milliseconds. */
        public Builder withGenericGroupNewMemberJoinTimeoutMs(int i) {
            this.genericGroupNewMemberJoinTimeoutMs = i;
            return this;
        }

        /** Sets the generic group minimum session timeout in milliseconds. */
        public Builder withGenericGroupMinSessionTimeoutMs(int i) {
            this.genericGroupMinSessionTimeoutMs = i;
            return this;
        }

        /** Sets the generic group maximum session timeout in milliseconds. */
        public Builder withGenericGroupMaxSessionTimeoutMs(int i) {
            this.genericGroupMaxSessionTimeoutMs = i;
            return this;
        }

        /**
         * Fills in defaults for the optional collaborators, validates the mandatory ones and
         * creates the manager.
         *
         * @return a new {@link GroupMetadataManager}
         * @throws IllegalArgumentException if no timer was set, or the assignor list is null or empty
         */
        public GroupMetadataManager build() {
            if (this.logContext == null) {
                this.logContext = new LogContext();
            }
            // Must come after the log context default: the registry is created from it.
            if (this.snapshotRegistry == null) {
                this.snapshotRegistry = new SnapshotRegistry(this.logContext);
            }
            if (this.metadataImage == null) {
                this.metadataImage = MetadataImage.EMPTY;
            }
            if (this.time == null) {
                this.time = Time.SYSTEM;
            }
            if (this.timer == null) {
                throw new IllegalArgumentException("Timer must be set.");
            }
            if (this.consumerGroupAssignors == null || this.consumerGroupAssignors.isEmpty()) {
                throw new IllegalArgumentException("Assignors must be set before building.");
            }
            return new GroupMetadataManager(
                this.snapshotRegistry,
                this.logContext,
                this.time,
                this.timer,
                this.consumerGroupAssignors,
                this.metadataImage,
                this.consumerGroupMaxSize,
                this.consumerGroupSessionTimeoutMs,
                this.consumerGroupHeartbeatIntervalMs,
                this.consumerGroupMetadataRefreshIntervalMs,
                this.genericGroupMaxSize,
                this.genericGroupInitialRebalanceDelayMs,
                this.genericGroupNewMemberJoinTimeoutMs,
                this.genericGroupMinSessionTimeoutMs,
                this.genericGroupMaxSessionTimeoutMs);
        }
    }

    /**
     * Creates the manager; only reachable through {@link Builder#build()}.
     *
     * @param snapshotRegistry registry backing the timeline maps
     * @param logContext       context used to derive the logger
     * @param time             clock
     * @param coordinatorTimer timer for member timeouts
     * @param list             server-side assignors; the first one becomes the default
     * @param metadataImage    initial metadata image
     * @param i                consumer group max size
     * @param i2               consumer group session timeout (ms)
     * @param i3               consumer group heartbeat interval (ms)
     * @param i4               consumer group metadata refresh interval (ms)
     * @param i5               generic group max size
     * @param i6               generic group initial rebalance delay (ms)
     * @param i7               generic group new member join timeout (ms)
     * @param i8               generic group min session timeout (ms)
     * @param i9               generic group max session timeout (ms)
     */
    private GroupMetadataManager(SnapshotRegistry snapshotRegistry, LogContext logContext, Time time, CoordinatorTimer<Void, Record> coordinatorTimer, List<PartitionAssignor> list, MetadataImage metadataImage, int i, int i2, int i3, int i4, int i5, int i6, int i7, int i8, int i9) {
        // Collaborators.
        this.logContext = logContext;
        this.log = logContext.logger(GroupMetadataManager.class);
        this.snapshotRegistry = snapshotRegistry;
        this.time = time;
        this.timer = coordinatorTimer;
        this.metadataImage = metadataImage;

        // Assignors are indexed by name; the head of the list is the fallback.
        this.assignors = list.stream().collect(Collectors.toMap(PartitionAssignor::name, Function.identity()));
        this.defaultAssignor = list.get(0);

        // Group state lives in snapshot-aware collections.
        this.groups = new TimelineHashMap<>(snapshotRegistry, 0);
        this.groupsByTopics = new TimelineHashMap<>(snapshotRegistry, 0);

        // Consumer group settings.
        this.consumerGroupMaxSize = i;
        this.consumerGroupSessionTimeoutMs = i2;
        this.consumerGroupHeartbeatIntervalMs = i3;
        this.consumerGroupMetadataRefreshIntervalMs = i4;

        // Generic group settings.
        this.genericGroupMaxSize = i5;
        this.genericGroupInitialRebalanceDelayMs = i6;
        this.genericGroupNewMemberJoinTimeoutMs = i7;
        this.genericGroupMinSessionTimeoutMs = i8;
        this.genericGroupMaxSessionTimeoutMs = i9;
    }

    /**
     * @return the metadata image currently held by this manager
     */
    public MetadataImage image() {
        return metadataImage;
    }

    /**
     * Looks up a group by id.
     *
     * @param str the group id
     * @return the group registered under that id
     * @throws GroupIdNotFoundException if no group is registered under that id
     */
    public Group group(String str) throws GroupIdNotFoundException {
        final Group found = groups.get(str);
        if (found != null) {
            return found;
        }
        throw new GroupIdNotFoundException(String.format("Group %s not found.", str));
    }

    /**
     * Returns the consumer group registered under the given id, optionally creating it.
     *
     * @param str the group id
     * @param z   whether to create and register the group when it does not exist
     * @return the existing or newly created consumer group
     * @throws GroupIdNotFoundException if the group does not exist and {@code z} is false, or
     *                                  if the id belongs to a group that is not a consumer group
     */
    ConsumerGroup getOrMaybeCreateConsumerGroup(String str, boolean z) throws GroupIdNotFoundException {
        final Group existing = groups.get(str);

        if (existing == null) {
            if (!z) {
                throw new GroupIdNotFoundException(String.format("Consumer group %s not found.", str));
            }
            final ConsumerGroup created = new ConsumerGroup(snapshotRegistry, str);
            groups.put(str, created);
            return created;
        }

        if (existing.type() != Group.GroupType.CONSUMER) {
            throw new GroupIdNotFoundException(String.format("Group %s is not a consumer group.", str));
        }
        return (ConsumerGroup) existing;
    }

    /**
     * Returns the generic group registered under the given id, optionally creating it in the
     * {@code EMPTY} state.
     *
     * @param str the group id
     * @param z   whether to create and register the group when it does not exist
     * @return the existing or newly created generic group
     * @throws UnknownMemberIdException if the group does not exist and {@code z} is false
     * @throws GroupIdNotFoundException if the id belongs to a group that is not a generic group
     */
    public GenericGroup getOrMaybeCreateGenericGroup(String str, boolean z) throws UnknownMemberIdException, GroupIdNotFoundException {
        final Group existing = groups.get(str);

        if (existing == null) {
            if (!z) {
                // A missing group is deliberately surfaced as an unknown member here.
                throw new UnknownMemberIdException(String.format("Generic group %s not found.", str));
            }
            final GenericGroup created = new GenericGroup(logContext, str, GenericGroupState.EMPTY, time);
            groups.put(str, created);
            return created;
        }

        if (existing.type() != Group.GroupType.GENERIC) {
            throw new GroupIdNotFoundException(String.format("Group %s is not a generic group.", str));
        }
        return (GenericGroup) existing;
    }

    /**
     * Drops the group registered under the given id, if any.
     *
     * @param str the group id
     */
    private void removeGroup(String str) {
        groups.remove(str);
    }

    /**
     * Rejects a value that is present but empty; {@code null} is accepted.
     *
     * @param str  the value to check
     * @param str2 the error message to raise
     * @throws InvalidRequestException if {@code str} is the empty string
     */
    private void throwIfEmptyString(String str, String str2) throws InvalidRequestException {
        if (str == null) {
            return;
        }
        if (str.isEmpty()) {
            throw new InvalidRequestException(str2);
        }
    }

    /**
     * Rejects any value that is set.
     *
     * @param obj the value that must be {@code null}
     * @param str the error message to raise
     * @throws InvalidRequestException if {@code obj} is not {@code null}
     */
    private void throwIfNotNull(Object obj, String str) throws InvalidRequestException {
        final boolean absent = obj == null;
        if (!absent) {
            throw new InvalidRequestException(str);
        }
    }

    /**
     * Validates a consumer group heartbeat request before it is processed.
     *
     * <p>Checks, in order: non-empty group/instance/rack ids, absence of the not yet supported
     * regex subscription and client-side assignors, the epoch-specific requirements, and finally
     * that the requested server assignor (if any) is known.
     *
     * @param consumerGroupHeartbeatRequestData the request to validate
     * @throws InvalidRequestException      if any field is invalid for the request's member epoch
     * @throws UnsupportedAssignorException if the requested server assignor is not configured
     */
    private void throwIfConsumerGroupHeartbeatRequestIsInvalid(ConsumerGroupHeartbeatRequestData consumerGroupHeartbeatRequestData) throws InvalidRequestException, UnsupportedAssignorException {
        throwIfEmptyString(consumerGroupHeartbeatRequestData.groupId(), "GroupId can't be empty.");
        throwIfEmptyString(consumerGroupHeartbeatRequestData.instanceId(), "InstanceId can't be empty.");
        throwIfEmptyString(consumerGroupHeartbeatRequestData.rackId(), "RackId can't be empty.");
        throwIfNotNull(consumerGroupHeartbeatRequestData.subscribedTopicRegex(), "SubscribedTopicRegex is not supported yet.");
        throwIfNotNull(consumerGroupHeartbeatRequestData.clientAssignors(), "Client side assignors are not supported yet.");

        final int epoch = consumerGroupHeartbeatRequestData.memberEpoch();
        if (epoch > 0 || epoch == -1) {
            // Regular heartbeat or leave (-1): the member must identify itself.
            throwIfEmptyString(consumerGroupHeartbeatRequestData.memberId(), "MemberId can't be empty.");
        } else if (epoch == 0) {
            // (Re-)join: the full subscription must be provided and nothing may be owned yet.
            if (consumerGroupHeartbeatRequestData.rebalanceTimeoutMs() == -1) {
                throw new InvalidRequestException("RebalanceTimeoutMs must be provided in first request.");
            }
            final List<ConsumerGroupHeartbeatRequestData.TopicPartitions> owned = consumerGroupHeartbeatRequestData.topicPartitions();
            if (owned == null || !owned.isEmpty()) {
                throw new InvalidRequestException("TopicPartitions must be empty when (re-)joining.");
            }
            final List<String> topicNames = consumerGroupHeartbeatRequestData.subscribedTopicNames();
            if (topicNames == null || topicNames.isEmpty()) {
                throw new InvalidRequestException("SubscribedTopicNames must be set in first request.");
            }
        } else {
            throw new InvalidRequestException("MemberEpoch is invalid.");
        }

        final String requestedAssignor = consumerGroupHeartbeatRequestData.serverAssignor();
        if (requestedAssignor != null && !assignors.containsKey(requestedAssignor)) {
            throw new UnsupportedAssignorException("ServerAssignor " + requestedAssignor + " is not supported. Supported assignors: " + String.join(", ", assignors.keySet()) + ".");
        }
    }

    /**
     * Tells whether every owned partition is part of the given assignment.
     *
     * @param list the partitions reported as owned, grouped by topic; may be {@code null}
     * @param map  the assignment to check against, keyed by topic id
     * @return {@code false} if {@code list} is {@code null}, if it names a topic absent from
     *         {@code map}, or if it contains a partition missing from that topic's set;
     *         {@code true} otherwise
     */
    private boolean isSubset(List<ConsumerGroupHeartbeatRequestData.TopicPartitions> list, Map<Uuid, Set<Integer>> map) {
        if (list == null) {
            return false;
        }
        for (ConsumerGroupHeartbeatRequestData.TopicPartitions owned : list) {
            final Set<Integer> assigned = map.get(owned.topicId());
            if (assigned == null || !assigned.containsAll(owned.partitions())) {
                return false;
            }
        }
        return true;
    }

    /**
     * Rejects a member that would join a group already at capacity. Members the group already
     * contains are always let through.
     *
     * @param consumerGroup the group being joined
     * @param str           the member id; empty for a member that has no id yet
     * @throws GroupMaxSizeReachedException if the group is full and the member is not part of it
     */
    private void throwIfConsumerGroupIsFull(ConsumerGroup consumerGroup, String str) throws GroupMaxSizeReachedException {
        if (consumerGroup.numMembers() < consumerGroupMaxSize) {
            return;
        }
        final boolean alreadyMember = !str.isEmpty() && consumerGroup.hasMember(str);
        if (!alreadyMember) {
            throw new GroupMaxSizeReachedException("The consumer group has reached its maximum capacity of " + consumerGroupMaxSize + " members.");
        }
    }

    /**
     * Fences a member whose reported epoch does not line up with the coordinator's view.
     *
     * <p>An epoch ahead of the coordinator is always rejected. An epoch behind is tolerated only
     * when it equals the member's previous epoch and the owned partitions are a subset of the
     * partitions currently assigned to the member.
     *
     * @param consumerGroupMember the member as known by the coordinator
     * @param i                   the member epoch received in the request
     * @param list                the partitions the member reports as owned; may be {@code null}
     * @throws FencedMemberEpochException if the received epoch is not acceptable
     */
    private void throwIfMemberEpochIsInvalid(ConsumerGroupMember consumerGroupMember, int i, List<ConsumerGroupHeartbeatRequestData.TopicPartitions> list) {
        if (i > consumerGroupMember.memberEpoch()) {
            throw new FencedMemberEpochException("The consumer group member has a greater member epoch (" + i + ") than the one known by the group coordinator (" + consumerGroupMember.memberEpoch() + "). The member must abandon all its partitions and rejoin.");
        }
        if (i == consumerGroupMember.memberEpoch()) {
            return;
        }
        // From here on the received epoch is behind the coordinator's.
        final boolean acceptablePreviousEpoch =
            i == consumerGroupMember.previousMemberEpoch() && isSubset(list, consumerGroupMember.assignedPartitions());
        if (!acceptablePreviousEpoch) {
            throw new FencedMemberEpochException("The consumer group member has a smaller member epoch (" + i + ") than the one known by the group coordinator (" + consumerGroupMember.memberEpoch() + "). The member must abandon all its partitions and rejoin.");
        }
    }

    /**
     * Builds the assignment section of a heartbeat response for the given member.
     *
     * @param consumerGroupMember the member whose assignment is reported
     * @return an assignment carrying the member's assigned partitions, plus its pending
     *         partitions when the member is in the {@code ASSIGNING} state
     */
    private ConsumerGroupHeartbeatResponseData.Assignment createResponseAssignment(ConsumerGroupMember consumerGroupMember) {
        final List<ConsumerGroupHeartbeatResponseData.TopicPartitions> assigned =
            fromAssignmentMap(consumerGroupMember.assignedPartitions());
        final ConsumerGroupHeartbeatResponseData.Assignment response =
            new ConsumerGroupHeartbeatResponseData.Assignment().setAssignedTopicPartitions(assigned);

        if (consumerGroupMember.state() == ConsumerGroupMember.MemberState.ASSIGNING) {
            response.setPendingTopicPartitions(fromAssignmentMap(consumerGroupMember.partitionsPendingAssignment()));
        }
        return response;
    }

    /**
     * Converts a topic-id to partition-set map into the list form used by heartbeat responses.
     *
     * @param map partitions keyed by topic id
     * @return one {@code TopicPartitions} entry per map entry, each holding a copy of the partitions
     */
    private List<ConsumerGroupHeartbeatResponseData.TopicPartitions> fromAssignmentMap(Map<Uuid, Set<Integer>> map) {
        final List<ConsumerGroupHeartbeatResponseData.TopicPartitions> converted = new ArrayList<>();
        for (Map.Entry<Uuid, Set<Integer>> topic : map.entrySet()) {
            converted.add(new ConsumerGroupHeartbeatResponseData.TopicPartitions()
                .setTopicId(topic.getKey())
                .setPartitions(new ArrayList<>(topic.getValue())));
        }
        return converted;
    }

    /**
     * Handles a regular (non-leave) consumer group heartbeat.
     *
     * <p>The method (1) creates or updates the member's subscription and, when the subscription
     * or the subscription metadata changed, bumps the group epoch; (2) computes a new target
     * assignment when the group epoch is ahead of the assignment epoch; (3) reconciles the
     * member's current assignment with the target assignment; and (4) (re)schedules the member's
     * session timeout and builds the response.
     *
     * @param str   the group id
     * @param str2  the member id; an empty id is replaced by a freshly generated one
     * @param i     the member epoch from the request; 0 means the member is (re-)joining
     * @param str3  the instance id, or {@code null}
     * @param str4  the rack id, or {@code null}
     * @param i2    the rebalance timeout in ms, converted through {@code Utils.ofSentinel}
     * @param str5  the client id
     * @param str6  the client host
     * @param list  the subscribed topic names, or {@code null}
     * @param str7  the subscribed topic regex, or {@code null}
     * @param str8  the preferred server assignor name, or {@code null}
     * @param list2 the partitions the member reports as owned, or {@code null}
     * @return the records to persist together with the heartbeat response
     * @throws ApiException if the group or member cannot be resolved, the group is full, the
     *                      member epoch is fenced, or the target assignment cannot be computed
     *                      (reported as {@link UnknownServerException})
     */
    private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGroupHeartbeat(String str, String str2, int i, String str3, String str4, int i2, String str5, String str6, List<String> list, String str7, String str8, List<ConsumerGroupHeartbeatRequestData.TopicPartitions> list2) throws ApiException {
        final long currentTimeMs = this.time.milliseconds();
        final List<Record> records = new ArrayList<>();

        // Group and member are only created when the member (re-)joins with epoch 0.
        final boolean createIfNotExists = i == 0;
        final ConsumerGroup group = getOrMaybeCreateConsumerGroup(str, createIfNotExists);
        throwIfConsumerGroupIsFull(group, str2);

        final String memberId = str2.isEmpty() ? Uuid.randomUuid().toString() : str2;
        final ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
        throwIfMemberEpochIsInvalid(member, i, list2);

        if (i == 0) {
            this.log.info("[GroupId {}] Member {} joins the consumer group.", str, memberId);
        }

        // 1. Create or update the member's subscription.
        int groupEpoch = group.groupEpoch();
        Map<String, TopicMetadata> subscriptionMetadata = group.subscriptionMetadata();
        ConsumerGroupMember updatedMember = new ConsumerGroupMember.Builder(member)
            .maybeUpdateInstanceId(Optional.ofNullable(str3))
            .maybeUpdateRackId(Optional.ofNullable(str4))
            .maybeUpdateRebalanceTimeoutMs(Utils.ofSentinel(i2))
            .maybeUpdateServerAssignorName(Optional.ofNullable(str8))
            .maybeUpdateSubscribedTopicNames(Optional.ofNullable(list))
            .maybeUpdateSubscribedTopicRegex(Optional.ofNullable(str7))
            .setClientId(str5)
            .setClientHost(str6)
            .build();

        boolean bumpGroupEpoch = false;
        if (!updatedMember.equals(member)) {
            records.add(RecordHelpers.newMemberSubscriptionRecord(str, updatedMember));
            if (!updatedMember.subscribedTopicNames().equals(member.subscribedTopicNames())) {
                this.log.info("[GroupId {}] Member {} updated its subscribed topics to: {}.", str, memberId, updatedMember.subscribedTopicNames());
                bumpGroupEpoch = true;
            }
            if (!updatedMember.subscribedTopicRegex().equals(member.subscribedTopicRegex())) {
                this.log.info("[GroupId {}] Member {} updated its subscribed regex to: {}.", str, memberId, updatedMember.subscribedTopicRegex());
                bumpGroupEpoch = true;
            }
        }

        // The subscription metadata is recomputed when the subscription changed or when the
        // group's metadata refresh deadline has passed.
        if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
            subscriptionMetadata = group.computeSubscriptionMetadata(member, updatedMember, this.metadataImage.topics(), this.metadataImage.cluster());
            if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
                this.log.info("[GroupId {}] Computed new subscription metadata: {}.", str, subscriptionMetadata);
                bumpGroupEpoch = true;
                records.add(RecordHelpers.newGroupSubscriptionMetadataRecord(str, subscriptionMetadata));
            }
            if (bumpGroupEpoch) {
                groupEpoch++;
                records.add(RecordHelpers.newGroupEpochRecord(str, groupEpoch));
                this.log.info("[GroupId {}] Bumped group epoch to {}.", str, groupEpoch);
            }
            group.setMetadataRefreshDeadline(currentTimeMs + this.consumerGroupMetadataRefreshIntervalMs, groupEpoch);
        }

        // 2. Compute a new target assignment if the group epoch is ahead of the assignment epoch.
        int targetAssignmentEpoch = group.assignmentEpoch();
        Assignment targetAssignment = group.targetAssignment(memberId);
        if (groupEpoch > targetAssignmentEpoch) {
            try {
                final String assignorName = group.computePreferredServerAssignor(member, updatedMember).orElse(this.defaultAssignor.name());
                final TargetAssignmentBuilder.TargetAssignmentResult assignmentResult =
                    new TargetAssignmentBuilder(str, groupEpoch, this.assignors.get(assignorName))
                        .withMembers(group.members())
                        .withSubscriptionMetadata(subscriptionMetadata)
                        .withTargetAssignment(group.targetAssignment())
                        .addOrUpdateMember(memberId, updatedMember)
                        .build();
                this.log.info("[GroupId {}] Computed a new target assignment for epoch {}: {}.", str, groupEpoch, assignmentResult.targetAssignment());
                records.addAll(assignmentResult.records());
                targetAssignment = assignmentResult.targetAssignment().get(memberId);
                targetAssignmentEpoch = groupEpoch;
            } catch (PartitionAssignorException e) {
                final String message = String.format("Failed to compute a new target assignment for epoch %d: %s", groupEpoch, e.getMessage());
                this.log.error("[GroupId {}] {}.", str, message);
                throw new UnknownServerException(message, e);
            }
        }

        // 3. Reconcile the member's current assignment with the target assignment. Only needed
        // when the member is not stable or a newer target assignment exists.
        boolean assignmentUpdated = false;
        if (updatedMember.state() != ConsumerGroupMember.MemberState.STABLE || updatedMember.targetMemberEpoch() != targetAssignmentEpoch) {
            // Keep the pre-reconciliation instance so that a change can be detected afterwards.
            final ConsumerGroupMember beforeReconciliation = updatedMember;
            updatedMember = new CurrentAssignmentBuilder(updatedMember)
                .withTargetAssignment(targetAssignmentEpoch, targetAssignment)
                .withCurrentPartitionEpoch(group::currentPartitionEpoch)
                .withOwnedTopicPartitions(list2)
                .build();

            // NOTE(review): identity comparison assumes CurrentAssignmentBuilder#build() hands
            // back the input instance when nothing changed -- confirm against that builder.
            if (updatedMember != beforeReconciliation) {
                assignmentUpdated = true;
                records.add(RecordHelpers.newCurrentAssignmentRecord(str, updatedMember));
                this.log.info("[GroupId {}] Member {} transitioned from {} to {}.", str, memberId, member.currentAssignmentSummary(), updatedMember.currentAssignmentSummary());
                if (updatedMember.state() == ConsumerGroupMember.MemberState.REVOKING) {
                    scheduleConsumerGroupRevocationTimeout(str, memberId, updatedMember.rebalanceTimeoutMs(), updatedMember.memberEpoch());
                } else {
                    cancelConsumerGroupRevocationTimeout(str, memberId);
                }
            }
        }

        // 4. Every accepted heartbeat restarts the member's session timeout.
        scheduleConsumerGroupSessionTimeout(str, memberId);

        final ConsumerGroupHeartbeatResponseData response = new ConsumerGroupHeartbeatResponseData()
            .setMemberId(updatedMember.memberId())
            .setMemberEpoch(updatedMember.memberEpoch())
            .setHeartbeatIntervalMs(this.consumerGroupHeartbeatIntervalMs);

        // The assignment is only returned when the member reported its owned partitions, when it
        // just (re-)joined, or when its assignment changed during this heartbeat.
        if (list2 != null || i == 0 || assignmentUpdated) {
            response.setAssignment(createResponseAssignment(updatedMember));
        }
        return new CoordinatorResult<>(records, response);
    }

    /**
     * Handles a heartbeat through which a member leaves its consumer group.
     *
     * @param str  the group id
     * @param str2 the member id
     * @return the records fencing the member, with a response carrying member epoch -1
     * @throws ApiException if the group or the member cannot be resolved
     */
    private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGroupLeave(String str, String str2) throws ApiException {
        final ConsumerGroup group = getOrMaybeCreateConsumerGroup(str, false);
        final ConsumerGroupMember leavingMember = group.getOrMaybeCreateMember(str2, false);

        log.info("[GroupId " + str + "] Member " + str2 + " left the consumer group.");

        final List<Record> fencingRecords = consumerGroupFenceMember(group, leavingMember);
        final ConsumerGroupHeartbeatResponseData response = new ConsumerGroupHeartbeatResponseData()
            .setMemberId(str2)
            .setMemberEpoch(-1);
        return new CoordinatorResult<>(fencingRecords, response);
    }

    /**
     * Produces the records that remove a member from its group and cancels the member's timers.
     *
     * <p>The records are, in order: tombstones for the member's current assignment, target
     * assignment and subscription; a new subscription metadata record when the metadata computed
     * without the member differs from the group's; and a record bumping the group epoch by one.
     *
     * @param consumerGroup       the group the member belongs to
     * @param consumerGroupMember the member to fence
     * @return the records to persist
     */
    private List<Record> consumerGroupFenceMember(ConsumerGroup consumerGroup, ConsumerGroupMember consumerGroupMember) {
        final List<Record> records = new ArrayList<>();

        // Wipe everything stored for the member.
        records.add(RecordHelpers.newCurrentAssignmentTombstoneRecord(consumerGroup.groupId(), consumerGroupMember.memberId()));
        records.add(RecordHelpers.newTargetAssignmentTombstoneRecord(consumerGroup.groupId(), consumerGroupMember.memberId()));
        records.add(RecordHelpers.newMemberSubscriptionTombstoneRecord(consumerGroup.groupId(), consumerGroupMember.memberId()));

        // Recompute the subscription metadata with the member replaced by null.
        final Map<String, TopicMetadata> metadataWithoutMember = consumerGroup.computeSubscriptionMetadata(
            consumerGroupMember, null, metadataImage.topics(), metadataImage.cluster());
        final boolean metadataChanged = !metadataWithoutMember.equals(consumerGroup.subscriptionMetadata());
        if (metadataChanged) {
            log.info("[GroupId {}] Computed new subscription metadata: {}.", consumerGroup.groupId(), metadataWithoutMember);
            records.add(RecordHelpers.newGroupSubscriptionMetadataRecord(consumerGroup.groupId(), metadataWithoutMember));
        }

        // The group epoch is always bumped when a member is fenced.
        final int bumpedEpoch = consumerGroup.groupEpoch() + 1;
        records.add(RecordHelpers.newGroupEpochRecord(consumerGroup.groupId(), bumpedEpoch));

        // The member no longer needs its timers.
        cancelConsumerGroupSessionTimeout(consumerGroup.groupId(), consumerGroupMember.memberId());
        cancelConsumerGroupRevocationTimeout(consumerGroup.groupId(), consumerGroupMember.memberId());
        return records;
    }

    /**
     * Schedules the session timeout of a member. When the timer fires, the member is fenced
     * from its group; if the group or the member no longer exists, nothing is written.
     *
     * @param str  the group id
     * @param str2 the member id
     */
    private void scheduleConsumerGroupSessionTimeout(String str, String str2) {
        timer.schedule(consumerGroupSessionTimeoutKey(str, str2), consumerGroupSessionTimeoutMs, TimeUnit.MILLISECONDS, true, () -> {
            try {
                final ConsumerGroup group = getOrMaybeCreateConsumerGroup(str, false);
                final ConsumerGroupMember expiredMember = group.getOrMaybeCreateMember(str2, false);
                log.info("[GroupId {}] Member {} fenced from the group because its session expired.", str, str2);
                return new CoordinatorResult<>(consumerGroupFenceMember(group, expiredMember));
            } catch (GroupIdNotFoundException groupGone) {
                // Nothing left to fence.
                log.debug("[GroupId {}] Could not fence {} because the group does not exist.", str, str2);
                return new CoordinatorResult<>(Collections.emptyList());
            } catch (UnknownMemberIdException memberGone) {
                // Nothing left to fence.
                log.debug("[GroupId {}] Could not fence {} because the member does not exist.", str, str2);
                return new CoordinatorResult<>(Collections.emptyList());
            }
        });
    }

    /**
     * Cancels the session timeout of a member.
     *
     * @param str  the group id
     * @param str2 the member id
     */
    private void cancelConsumerGroupSessionTimeout(String str, String str2) {
        timer.cancel(consumerGroupSessionTimeoutKey(str, str2));
    }

    /**
     * Schedules the revocation timeout of a member. When the timer fires, the member is fenced
     * only if it is still in the {@code REVOKING} state at the epoch captured when the timer was
     * scheduled; in every other case nothing is written.
     *
     * @param str  the group id
     * @param str2 the member id
     * @param j    the delay, in milliseconds, after which the timer fires
     * @param i    the member epoch the timeout applies to
     */
    private void scheduleConsumerGroupRevocationTimeout(String str, String str2, long j, int i) {
        this.timer.schedule(consumerGroupRevocationTimeoutKey(str, str2), j, TimeUnit.MILLISECONDS, true, () -> {
            try {
                final ConsumerGroup group = getOrMaybeCreateConsumerGroup(str, false);
                final ConsumerGroupMember member = group.getOrMaybeCreateMember(str2, false);

                // The member may have finished revoking, or moved to another epoch, since the
                // timer was armed; the timeout is stale in that case.
                if (member.state() != ConsumerGroupMember.MemberState.REVOKING || member.memberEpoch() != i) {
                    this.log.debug("[GroupId {}] Ignoring revocation timeout for {} because the member state does not match the expected state.", str, str2);
                    return new CoordinatorResult<>(Collections.emptyList());
                }

                this.log.info("[GroupId {}] Member {} fenced from the group because it failed to revoke partitions within {}ms.", str, str2, j);
                return new CoordinatorResult<>(consumerGroupFenceMember(group, member));
            } catch (GroupIdNotFoundException e) {
                // Fixed format string: the placeholder used to be followed by a stray '}'.
                this.log.debug("[GroupId {}] Could not fence {} because the group does not exist.", str, str2);
                return new CoordinatorResult<>(Collections.emptyList());
            } catch (UnknownMemberIdException e) {
                this.log.debug("[GroupId {}] Could not fence {} because the member does not exist.", str, str2);
                return new CoordinatorResult<>(Collections.emptyList());
            }
        });
    }

    /**
     * Cancels the timer registered under the revocation-timeout key built from the two arguments.
     *
     * @param str  first key component (used as the group id by the matching schedule method)
     * @param str2 second key component (used as the member id by the matching schedule method)
     */
    private void cancelConsumerGroupRevocationTimeout(String str, String str2) {
        String revocationTimeoutKey = consumerGroupRevocationTimeoutKey(str, str2);
        this.timer.cancel(revocationTimeoutKey);
    }

    /**
     * Entry point for a ConsumerGroupHeartbeat request.
     * <p>
     * The request is validated first. A member epoch of {@code -1} is routed to the leave path;
     * any other epoch is routed to the regular heartbeat handling with the request fields unpacked.
     *
     * @param requestContext                   the request context; supplies the client id and address
     * @param consumerGroupHeartbeatRequestData the heartbeat request
     * @return the coordinator result produced by the leave or heartbeat handling
     * @throws ApiException declared by this method; presumably raised by validation or the handlers
     */
    public CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGroupHeartbeat(RequestContext requestContext, ConsumerGroupHeartbeatRequestData consumerGroupHeartbeatRequestData) throws ApiException {
        throwIfConsumerGroupHeartbeatRequestIsInvalid(consumerGroupHeartbeatRequestData);

        if (consumerGroupHeartbeatRequestData.memberEpoch() == -1) {
            return consumerGroupLeave(
                consumerGroupHeartbeatRequestData.groupId(),
                consumerGroupHeartbeatRequestData.memberId());
        }

        return consumerGroupHeartbeat(
            consumerGroupHeartbeatRequestData.groupId(),
            consumerGroupHeartbeatRequestData.memberId(),
            consumerGroupHeartbeatRequestData.memberEpoch(),
            consumerGroupHeartbeatRequestData.instanceId(),
            consumerGroupHeartbeatRequestData.rackId(),
            consumerGroupHeartbeatRequestData.rebalanceTimeoutMs(),
            requestContext.clientId(),
            requestContext.clientAddress.toString(),
            consumerGroupHeartbeatRequestData.subscribedTopicNames(),
            consumerGroupHeartbeatRequestData.subscribedTopicRegex(),
            consumerGroupHeartbeatRequestData.serverAssignor(),
            consumerGroupHeartbeatRequestData.topicPartitions());
    }

    /**
     * Replays a ConsumerGroupMemberMetadata record.
     * <p>
     * A non-null value creates or updates the member (creating the group if needed). A null value
     * is a tombstone: the member is removed, but only after checking that its member epoch is
     * {@code -1} and that it has no entry in the target assignment. In both cases the
     * topic-to-groups index is then updated from the subscription change.
     *
     * @param consumerGroupMemberMetadataKey   the record key carrying the group id and member id
     * @param consumerGroupMemberMetadataValue the record value, or {@code null} for a tombstone
     * @throws IllegalStateException if a tombstone arrives before the member's current or target
     *                               assignment was cleared
     */
    public void replay(ConsumerGroupMemberMetadataKey consumerGroupMemberMetadataKey, ConsumerGroupMemberMetadataValue consumerGroupMemberMetadataValue) {
        String groupId = consumerGroupMemberMetadataKey.groupId();
        String memberId = consumerGroupMemberMetadataKey.memberId();
        boolean isTombstone = consumerGroupMemberMetadataValue == null;

        ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, !isTombstone);
        // Snapshot the subscriptions before mutating the group so the index can be diffed afterwards.
        Set<String> previousTopicNames = new HashSet<>(group.subscribedTopicNames());

        if (isTombstone) {
            ConsumerGroupMember existingMember = group.getOrMaybeCreateMember(memberId, false);
            if (existingMember.memberEpoch() != -1) {
                throw new IllegalStateException("Received a tombstone record to delete member " + memberId + " but did not receive ConsumerGroupCurrentMemberAssignmentValue tombstone.");
            }
            if (group.targetAssignment().containsKey(memberId)) {
                throw new IllegalStateException("Received a tombstone record to delete member " + memberId + " but did not receive ConsumerGroupTargetAssignmentMetadataValue tombstone.");
            }
            group.removeMember(memberId);
        } else {
            ConsumerGroupMember currentMember = group.getOrMaybeCreateMember(memberId, true);
            ConsumerGroupMember updatedMember = new ConsumerGroupMember.Builder(currentMember)
                .updateWith(consumerGroupMemberMetadataValue)
                .build();
            group.updateMember(updatedMember);
        }

        updateGroupsByTopics(groupId, previousTopicNames, group.subscribedTopicNames());
    }

    /**
     * Returns the ids of the groups recorded as subscribed to the given topic.
     *
     * @param str the topic name used as key in the topic-to-groups index
     * @return the indexed set for the topic, or an empty set when the topic has no entry
     */
    public Set<String> groupsSubscribedToTopic(String str) {
        Set<String> subscribedGroupIds = this.groupsByTopics.get(str);
        if (subscribedGroupIds == null) {
            return Collections.emptySet();
        }
        return subscribedGroupIds;
    }

    /**
     * Adds a group to the index entry of a topic, creating the entry when it is missing.
     *
     * @param str  the group id to add
     * @param str2 the topic name
     */
    private void subscribeGroupToTopic(String str, String str2) {
        TimelineHashSet<String> groupIdsForTopic = this.groupsByTopics.computeIfAbsent(
            str2,
            topicName -> new TimelineHashSet<>(this.snapshotRegistry, 1));
        groupIdsForTopic.add(str);
    }

    /**
     * Removes a group from the index entry of a topic and drops the entry once it becomes empty.
     *
     * @param str  the group id to remove
     * @param str2 the topic name
     */
    private void unsubscribeGroupFromTopic(String str, String str2) {
        this.groupsByTopics.computeIfPresent(str2, (topicName, groupIdsForTopic) -> {
            groupIdsForTopic.remove(str);
            // Returning null from the remapping function removes the mapping for this topic.
            return groupIdsForTopic.isEmpty() ? null : groupIdsForTopic;
        });
    }

    /**
     * Brings the topic-to-groups index in line with a group's subscription change.
     * <p>
     * Topics present only in the old set are unsubscribed, topics present only in the new set are
     * subscribed. The two fast paths handle an empty old or new set without membership checks.
     *
     * @param str  the group id
     * @param set  the topic names the group was subscribed to before the change
     * @param set2 the topic names the group is subscribed to after the change
     */
    private void updateGroupsByTopics(String str, Set<String> set, Set<String> set2) {
        if (set.isEmpty()) {
            for (String addedTopic : set2) {
                subscribeGroupToTopic(str, addedTopic);
            }
            return;
        }

        if (set2.isEmpty()) {
            for (String removedTopic : set) {
                unsubscribeGroupFromTopic(str, removedTopic);
            }
            return;
        }

        for (String oldTopic : set) {
            if (!set2.contains(oldTopic)) {
                unsubscribeGroupFromTopic(str, oldTopic);
            }
        }
        for (String newTopic : set2) {
            if (!set.contains(newTopic)) {
                subscribeGroupToTopic(str, newTopic);
            }
        }
    }

    /**
     * Replays a ConsumerGroupMetadata record.
     * <p>
     * A non-null value sets the group epoch (creating the group if needed). A null value is a
     * tombstone and removes the group, provided it has no members, no target assignment entries
     * and an assignment epoch of {@code -1}.
     *
     * @param consumerGroupMetadataKey   the record key carrying the group id
     * @param consumerGroupMetadataValue the record value, or {@code null} for a tombstone
     * @throws IllegalStateException if a tombstone arrives while the group still holds state
     */
    public void replay(ConsumerGroupMetadataKey consumerGroupMetadataKey, ConsumerGroupMetadataValue consumerGroupMetadataValue) {
        String groupId = consumerGroupMetadataKey.groupId();

        if (consumerGroupMetadataValue == null) {
            ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false);
            if (!group.members().isEmpty()) {
                throw new IllegalStateException("Received a tombstone record to delete group " + groupId + " but the group still has " + group.members().size() + " members.");
            }
            if (!group.targetAssignment().isEmpty()) {
                throw new IllegalStateException("Received a tombstone record to delete group " + groupId + " but the target assignment still has " + group.targetAssignment().size() + " members.");
            }
            if (group.assignmentEpoch() != -1) {
                throw new IllegalStateException("Received a tombstone record to delete group " + groupId + " but did not receive ConsumerGroupTargetAssignmentMetadataValue tombstone.");
            }
            removeGroup(groupId);
        } else {
            ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, true);
            group.setGroupEpoch(consumerGroupMetadataValue.epoch());
        }
    }

    /**
     * Replays a ConsumerGroupPartitionMetadata record.
     * <p>
     * A non-null value replaces the group's subscription metadata with one entry per topic in the
     * record, keyed by topic name. A null value (tombstone) resets it to an empty map.
     *
     * @param consumerGroupPartitionMetadataKey   the record key carrying the group id
     * @param consumerGroupPartitionMetadataValue the record value, or {@code null} for a tombstone
     */
    public void replay(ConsumerGroupPartitionMetadataKey consumerGroupPartitionMetadataKey, ConsumerGroupPartitionMetadataValue consumerGroupPartitionMetadataValue) {
        ConsumerGroup group = getOrMaybeCreateConsumerGroup(consumerGroupPartitionMetadataKey.groupId(), false);

        if (consumerGroupPartitionMetadataValue != null) {
            Map<String, TopicMetadata> subscriptionMetadata = new HashMap<>();
            consumerGroupPartitionMetadataValue.topics().forEach(topic ->
                subscriptionMetadata.put(topic.topicName(), TopicMetadata.fromRecord(topic)));
            group.setSubscriptionMetadata(subscriptionMetadata);
        } else {
            group.setSubscriptionMetadata(Collections.emptyMap());
        }
    }

    /**
     * Replays a ConsumerGroupTargetAssignmentMember record.
     * <p>
     * A null value (tombstone) removes the member's target assignment; otherwise the target
     * assignment is replaced with the one decoded from the record.
     *
     * @param consumerGroupTargetAssignmentMemberKey   the record key carrying group and member id
     * @param consumerGroupTargetAssignmentMemberValue the record value, or {@code null} for a tombstone
     */
    public void replay(ConsumerGroupTargetAssignmentMemberKey consumerGroupTargetAssignmentMemberKey, ConsumerGroupTargetAssignmentMemberValue consumerGroupTargetAssignmentMemberValue) {
        String groupId = consumerGroupTargetAssignmentMemberKey.groupId();
        String memberId = consumerGroupTargetAssignmentMemberKey.memberId();
        ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false);

        if (consumerGroupTargetAssignmentMemberValue == null) {
            group.removeTargetAssignment(memberId);
            return;
        }
        group.updateTargetAssignment(memberId, Assignment.fromRecord(consumerGroupTargetAssignmentMemberValue));
    }

    /**
     * Replays a ConsumerGroupTargetAssignmentMetadata record.
     * <p>
     * A non-null value sets the group's target assignment epoch. A null value (tombstone) resets
     * the epoch to {@code -1}, but only when the target assignment has no entries left.
     *
     * @param consumerGroupTargetAssignmentMetadataKey   the record key carrying the group id
     * @param consumerGroupTargetAssignmentMetadataValue the record value, or {@code null} for a tombstone
     * @throws IllegalStateException if a tombstone arrives while the target assignment is not empty
     */
    public void replay(ConsumerGroupTargetAssignmentMetadataKey consumerGroupTargetAssignmentMetadataKey, ConsumerGroupTargetAssignmentMetadataValue consumerGroupTargetAssignmentMetadataValue) {
        String groupId = consumerGroupTargetAssignmentMetadataKey.groupId();
        ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false);

        if (consumerGroupTargetAssignmentMetadataValue == null) {
            if (!group.targetAssignment().isEmpty()) {
                throw new IllegalStateException("Received a tombstone record to delete target assignment of " + groupId + " but the assignment still has " + group.targetAssignment().size() + " members.");
            }
            group.setTargetAssignmentEpoch(-1);
            return;
        }
        group.setTargetAssignmentEpoch(consumerGroupTargetAssignmentMetadataValue.assignmentEpoch());
    }

    /**
     * Replays a ConsumerGroupCurrentMemberAssignment record.
     * <p>
     * A non-null value updates the existing member from the record. A null value (tombstone)
     * resets the member's epochs to {@code -1} and clears its assigned, pending-revocation and
     * pending-assignment partitions. The group and member are looked up without creating them.
     *
     * @param consumerGroupCurrentMemberAssignmentKey   the record key carrying group and member id
     * @param consumerGroupCurrentMemberAssignmentValue the record value, or {@code null} for a tombstone
     */
    public void replay(ConsumerGroupCurrentMemberAssignmentKey consumerGroupCurrentMemberAssignmentKey, ConsumerGroupCurrentMemberAssignmentValue consumerGroupCurrentMemberAssignmentValue) {
        String groupId = consumerGroupCurrentMemberAssignmentKey.groupId();
        String memberId = consumerGroupCurrentMemberAssignmentKey.memberId();
        ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false);
        ConsumerGroupMember existingMember = group.getOrMaybeCreateMember(memberId, false);

        ConsumerGroupMember updatedMember;
        if (consumerGroupCurrentMemberAssignmentValue == null) {
            updatedMember = new ConsumerGroupMember.Builder(existingMember)
                .setMemberEpoch(-1)
                .setPreviousMemberEpoch(-1)
                .setTargetMemberEpoch(-1)
                .setAssignedPartitions(Collections.emptyMap())
                .setPartitionsPendingRevocation(Collections.emptyMap())
                .setPartitionsPendingAssignment(Collections.emptyMap())
                .build();
        } else {
            updatedMember = new ConsumerGroupMember.Builder(existingMember)
                .updateWith(consumerGroupCurrentMemberAssignmentValue)
                .build();
        }
        group.updateMember(updatedMember);
    }

    /**
     * Stores the new metadata image and asks every affected consumer group to refresh its metadata.
     * <p>
     * Affected groups are those indexed as subscribed to a topic that the delta reports as changed
     * or deleted. Deleted topic names are resolved through {@code metadataDelta.image()}. Nothing
     * beyond storing the image happens when the delta has no topics delta.
     *
     * @param metadataImage the new metadata image
     * @param metadataDelta the delta associated with the new image
     */
    public void onNewMetadataImage(MetadataImage metadataImage, MetadataDelta metadataDelta) {
        this.metadataImage = metadataImage;

        Optional.ofNullable(metadataDelta.topicsDelta()).ifPresent(topicsDelta -> {
            Set<String> impactedGroupIds = new HashSet<>();

            topicsDelta.changedTopics().forEach((topicId, topicDelta) ->
                impactedGroupIds.addAll(groupsSubscribedToTopic(topicDelta.name())));

            topicsDelta.deletedTopicIds().forEach(topicId ->
                impactedGroupIds.addAll(groupsSubscribedToTopic(metadataDelta.image().topics().getTopic(topicId).name())));

            for (String groupId : impactedGroupIds) {
                Group group = this.groups.get(groupId);
                if (group != null && group.type() == Group.GroupType.CONSUMER) {
                    ((ConsumerGroup) group).requestMetadataRefresh();
                }
            }
        });
    }

    /**
     * Re-arms timers for every loaded group.
     * <p>
     * For consumer groups, a session timeout is scheduled for each member, plus a revocation
     * timeout for members in the {@code REVOKING} state. For generic groups, each member's
     * heartbeat is rescheduled and a rebalance is prepared when the group size exceeds
     * {@code genericGroupMaxSize}. Other group types are ignored.
     */
    public void onLoaded() {
        this.groups.forEach((groupId, group) -> {
            switch (group.type()) {
                case CONSUMER:
                    ConsumerGroup consumerGroup = (ConsumerGroup) group;
                    this.log.info("Loaded consumer group {} with {} members.", groupId, consumerGroup.members().size());
                    // The inner lambda needs its own parameter name: reusing the outer one does not
                    // compile and would conflate the group id with the member id.
                    consumerGroup.members().forEach((memberId, member) -> {
                        this.log.debug("Loaded member {} in consumer group {}.", memberId, groupId);
                        scheduleConsumerGroupSessionTimeout(groupId, memberId);
                        if (member.state() == ConsumerGroupMember.MemberState.REVOKING) {
                            scheduleConsumerGroupRevocationTimeout(groupId, memberId, member.rebalanceTimeoutMs(), member.memberEpoch());
                        }
                    });
                    break;
                case GENERIC:
                    GenericGroup genericGroup = (GenericGroup) group;
                    this.log.info("Loaded generic group {} with {} members.", groupId, genericGroup.allMembers().size());
                    genericGroup.allMembers().forEach(member -> {
                        this.log.debug("Loaded member {} in generic group {}.", member.memberId(), groupId);
                        rescheduleGenericGroupMemberHeartbeat(genericGroup, member);
                    });
                    if (genericGroup.size() > this.genericGroupMaxSize) {
                        prepareRebalance(genericGroup, "Freshly-loaded group " + groupId + " (size " + genericGroup.size() + ") is over capacity " + this.genericGroupMaxSize + ". Rebalancing in order to give a chance for consumers to commit offsets");
                    }
                    break;
                default:
                    break;
            }
        });
    }

    /**
     * Builds the timer key for a session timeout.
     *
     * @param str  first key component (used as the group id by callers in this class)
     * @param str2 second key component (used as the member id by callers in this class)
     * @return {@code "session-timeout-" + str + "-" + str2}
     */
    public static String consumerGroupSessionTimeoutKey(String str, String str2) {
        return String.join("-", "session-timeout", str, str2);
    }

    /**
     * Builds the timer key for a revocation timeout.
     *
     * @param str  first key component (used as the group id by callers in this class)
     * @param str2 second key component (used as the member id by callers in this class)
     * @return {@code "revocation-timeout-" + str + "-" + str2}
     */
    public static String consumerGroupRevocationTimeoutKey(String str, String str2) {
        return String.join("-", "revocation-timeout", str, str2);
    }

    /**
     * Replays a GroupMetadata record for a generic group.
     * <p>
     * A null value (tombstone) removes the group. Otherwise a new {@code GenericGroup} is rebuilt
     * from the record: one member per member-metadata entry, in state {@code EMPTY} when there are
     * no members and {@code STABLE} otherwise. The rebuilt group replaces any previous entry for
     * the group id, and its subscribed topics are recomputed.
     *
     * @param groupMetadataKey   the record key carrying the group id
     * @param groupMetadataValue the record value, or {@code null} for a tombstone
     */
    public void replay(GroupMetadataKey groupMetadataKey, GroupMetadataValue groupMetadataValue) {
        String groupId = groupMetadataKey.group();
        if (groupMetadataValue == null) {
            removeGroup(groupId);
            return;
        }

        List<GenericGroupMember> loadedMembers = new ArrayList<>();
        for (GroupMetadataValue.MemberMetadata memberMetadata : groupMetadataValue.members()) {
            // A stored rebalance timeout of -1 falls back to the session timeout.
            int rebalanceTimeoutMs = memberMetadata.rebalanceTimeout() == -1
                ? memberMetadata.sessionTimeout()
                : memberMetadata.rebalanceTimeout();

            JoinGroupRequestData.JoinGroupRequestProtocolCollection supportedProtocols =
                new JoinGroupRequestData.JoinGroupRequestProtocolCollection();
            // Add the protocol element itself; casting it to the collection type is invalid.
            supportedProtocols.add(new JoinGroupRequestData.JoinGroupRequestProtocol()
                .setName(groupMetadataValue.protocol())
                .setMetadata(memberMetadata.subscription()));

            loadedMembers.add(new GenericGroupMember(
                memberMetadata.memberId(),
                Optional.ofNullable(memberMetadata.groupInstanceId()),
                memberMetadata.clientId(),
                memberMetadata.clientHost(),
                rebalanceTimeoutMs,
                memberMetadata.sessionTimeout(),
                groupMetadataValue.protocolType(),
                supportedProtocols,
                memberMetadata.assignment()));
        }

        String rawProtocolType = groupMetadataValue.protocolType();
        Optional<String> protocolType = (rawProtocolType == null || rawProtocolType.isEmpty())
            ? Optional.empty()
            : Optional.of(rawProtocolType);
        // A stored timestamp of -1 is treated as absent.
        Optional<Long> currentStateTimestamp = groupMetadataValue.currentStateTimestamp() == -1
            ? Optional.empty()
            : Optional.of(groupMetadataValue.currentStateTimestamp());

        GenericGroup genericGroup = new GenericGroup(
            this.logContext,
            groupId,
            loadedMembers.isEmpty() ? GenericGroupState.EMPTY : GenericGroupState.STABLE,
            this.time,
            groupMetadataValue.generation(),
            protocolType,
            Optional.ofNullable(groupMetadataValue.protocol()),
            Optional.ofNullable(groupMetadataValue.leader()),
            currentStateTimestamp);

        loadedMembers.forEach(member -> genericGroup.add(member, null));
        this.groups.put(groupId, genericGroup);
        genericGroup.setSubscribedTopics(genericGroup.computeSubscribedTopics());
    }

    /**
     * Handles a JoinGroup request for a generic group.
     * <p>
     * The session timeout is validated against the configured bounds first. The group is then
     * looked up, and created only when the request carries an empty member id. If the group does
     * not accept the member, the member is removed and the response carries
     * {@code GROUP_MAX_SIZE_REACHED}; otherwise the request is dispatched to the new-member or
     * existing-member path. When the group id was not known before this call and the join produced
     * no records, an empty group metadata record is returned so that the new group is persisted.
     * Any throwable raised while handling the join is converted into an error response.
     *
     * @param requestContext       the request context
     * @param joinGroupRequestData the JoinGroup request
     * @param completableFuture    the future completed with the JoinGroup response
     * @return the records to append, or {@code EMPTY_RESULT} when there is nothing to write
     */
    public CoordinatorResult<Void, Record> genericGroupJoin(RequestContext requestContext, JoinGroupRequestData joinGroupRequestData, CompletableFuture<JoinGroupResponseData> completableFuture) {
        String groupId = joinGroupRequestData.groupId();
        String memberId = joinGroupRequestData.memberId();
        int sessionTimeoutMs = joinGroupRequestData.sessionTimeoutMs();

        if (sessionTimeoutMs < this.genericGroupMinSessionTimeoutMs || sessionTimeoutMs > this.genericGroupMaxSessionTimeoutMs) {
            completableFuture.complete(new JoinGroupResponseData().setMemberId(memberId).setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code()));
            return EMPTY_RESULT;
        }

        boolean isUnknownMember = memberId.equals("");
        // Captured before the lookup below, which may create the group.
        boolean isNewGroup = !this.groups.containsKey(groupId);
        try {
            GenericGroup group = getOrMaybeCreateGenericGroup(groupId, isUnknownMember);

            CoordinatorResult<Void, Record> result = EMPTY_RESULT;
            if (!acceptJoiningMember(group, memberId)) {
                group.remove(memberId);
                completableFuture.complete(new JoinGroupResponseData().setMemberId("").setErrorCode(Errors.GROUP_MAX_SIZE_REACHED.code()));
            } else if (isUnknownMember) {
                result = genericGroupJoinNewMember(requestContext, joinGroupRequestData, group, completableFuture);
            } else {
                result = genericGroupJoinExistingMember(requestContext, joinGroupRequestData, group, completableFuture);
            }

            if (isNewGroup && result == EMPTY_RESULT) {
                // Typed future so the failure argument is a Throwable rather than Object.
                CompletableFuture<Void> appendFuture = new CompletableFuture<>();
                appendFuture.whenComplete((ignored, th) -> {
                    if (th != null) {
                        this.log.warn("Failed to write empty metadata for group {}: {}", group.groupId(), th.getMessage());
                        completableFuture.complete(new JoinGroupResponseData().setErrorCode(appendGroupMetadataErrorToResponseError(Errors.forException(th)).code()));
                    }
                });
                return new CoordinatorResult<>(Collections.singletonList(RecordHelpers.newEmptyGroupMetadataRecord(group, this.metadataImage.features().metadataVersion())), appendFuture);
            }
            return result;
        } catch (Throwable th2) {
            completableFuture.complete(new JoinGroupResponseData().setMemberId(memberId).setErrorCode(Errors.forException(th2).code()));
            return EMPTY_RESULT;
        }
    }

    /**
     * Completes the join phase when the group is in {@code PREPARING_REBALANCE}, all members have
     * joined and the previous state was not {@code EMPTY}.
     *
     * @param genericGroup the group to inspect
     * @return the result of completing the join, or {@code EMPTY_RESULT} when the conditions do not hold
     */
    private CoordinatorResult<Void, Record> maybeCompleteJoinPhase(GenericGroup genericGroup) {
        boolean readyToComplete = genericGroup.isInState(GenericGroupState.PREPARING_REBALANCE)
            && genericGroup.hasAllMembersJoined()
            && genericGroup.previousState() != GenericGroupState.EMPTY;
        if (!readyToComplete) {
            return EMPTY_RESULT;
        }
        return completeGenericGroupJoin(genericGroup);
    }

    /**
     * Handles a JoinGroup request from a member that has no member id yet.
     * <p>
     * The request is rejected when the group is {@code DEAD} or does not support the requested
     * protocols. Otherwise a member id is generated and the request is dispatched to the static or
     * dynamic path, depending on whether a group instance id is present.
     *
     * @param requestContext       the request context
     * @param joinGroupRequestData the JoinGroup request
     * @param genericGroup         the target group
     * @param completableFuture    the future completed with the JoinGroup response
     * @return the records to append, or {@code EMPTY_RESULT}
     */
    private CoordinatorResult<Void, Record> genericGroupJoinNewMember(RequestContext requestContext, JoinGroupRequestData joinGroupRequestData, GenericGroup genericGroup, CompletableFuture<JoinGroupResponseData> completableFuture) {
        if (genericGroup.isInState(GenericGroupState.DEAD)) {
            completableFuture.complete(new JoinGroupResponseData().setMemberId("").setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()));
            return EMPTY_RESULT;
        }

        if (!genericGroup.supportsProtocols(joinGroupRequestData.protocolType(), joinGroupRequestData.protocols())) {
            completableFuture.complete(new JoinGroupResponseData().setMemberId("").setErrorCode(Errors.INCONSISTENT_GROUP_PROTOCOL.code()));
            return EMPTY_RESULT;
        }

        Optional<String> groupInstanceId = Optional.ofNullable(joinGroupRequestData.groupInstanceId());
        String newMemberId = genericGroup.generateMemberId(requestContext.clientId(), groupInstanceId);
        if (groupInstanceId.isPresent()) {
            return genericGroupJoinNewStaticMember(requestContext, joinGroupRequestData, genericGroup, newMemberId, completableFuture);
        }
        return genericGroupJoinNewDynamicMember(requestContext, joinGroupRequestData, genericGroup, newMemberId, completableFuture);
    }

    /**
     * Handles a JoinGroup request from a static member that has no member id yet.
     * <p>
     * If the group instance id is already mapped to a member id, that member is replaced by the
     * new one; otherwise the new member is added to the group.
     *
     * @param requestContext       the request context
     * @param joinGroupRequestData the JoinGroup request
     * @param genericGroup         the target group
     * @param str                  the newly generated member id
     * @param completableFuture    the future completed with the JoinGroup response
     * @return the records to append, or {@code EMPTY_RESULT}
     */
    private CoordinatorResult<Void, Record> genericGroupJoinNewStaticMember(RequestContext requestContext, JoinGroupRequestData joinGroupRequestData, GenericGroup genericGroup, String str, CompletableFuture<JoinGroupResponseData> completableFuture) {
        String groupInstanceId = joinGroupRequestData.groupInstanceId();
        String previousMemberId = genericGroup.staticMemberId(groupInstanceId);

        if (previousMemberId == null) {
            this.log.info("Static member with groupInstanceId={} and unknown member id joins group {} in {} state. Created a new member id {} for this member and added to the group.", groupInstanceId, genericGroup.groupId(), genericGroup.currentState(), str);
            return addMemberThenRebalanceOrCompleteJoin(requestContext, joinGroupRequestData, genericGroup, str, completableFuture);
        }

        this.log.info("Static member with groupInstanceId={} and unknown member id joins group {} in {} state. Replacing previously mapped member {} with this groupInstanceId.", groupInstanceId, genericGroup.groupId(), genericGroup.currentState(), previousMemberId);
        return updateStaticMemberThenRebalanceOrCompleteJoin(requestContext, joinGroupRequestData, genericGroup, previousMemberId, str, completableFuture);
    }

    /**
     * Handles a JoinGroup request from a dynamic member that has no member id yet.
     * <p>
     * When the request's API version requires a known member id, the generated id is recorded as a
     * pending member, a heartbeat expiration is scheduled after the session timeout, and the
     * response carries {@code MEMBER_ID_REQUIRED} together with the id. Otherwise the member is
     * added to the group right away.
     *
     * @param requestContext       the request context
     * @param joinGroupRequestData the JoinGroup request
     * @param genericGroup         the target group
     * @param str                  the newly generated member id
     * @param completableFuture    the future completed with the JoinGroup response
     * @return the records to append, or {@code EMPTY_RESULT}
     */
    private CoordinatorResult<Void, Record> genericGroupJoinNewDynamicMember(RequestContext requestContext, JoinGroupRequestData joinGroupRequestData, GenericGroup genericGroup, String str, CompletableFuture<JoinGroupResponseData> completableFuture) {
        boolean mustRejoinWithKnownId = JoinGroupRequest.requiresKnownMemberId(requestContext.apiVersion());

        if (mustRejoinWithKnownId) {
            this.log.info("Dynamic member with unknown member id joins group {} in {} state. Created a new member id {} and requesting the member to rejoin with this id.", genericGroup.groupId(), genericGroup.currentState(), str);
            genericGroup.addPendingMember(str);
            String heartbeatKey = genericGroupHeartbeatKey(genericGroup.groupId(), str);
            this.timer.schedule(heartbeatKey, joinGroupRequestData.sessionTimeoutMs(), TimeUnit.MILLISECONDS, false,
                () -> expireGenericGroupMemberHeartbeat(genericGroup, str));
            completableFuture.complete(new JoinGroupResponseData().setMemberId(str).setErrorCode(Errors.MEMBER_ID_REQUIRED.code()));
            return EMPTY_RESULT;
        }

        this.log.info("Dynamic member with unknown member id joins group {} in state {}. Created a new member id {} and added the member to the group.", genericGroup.groupId(), genericGroup.currentState(), str);
        return addMemberThenRebalanceOrCompleteJoin(requestContext, joinGroupRequestData, genericGroup, str, completableFuture);
    }

    /**
     * Handles a JoinGroup request from a member that already has a member id.
     * <p>
     * The request is rejected when the group is {@code DEAD} or does not support the requested
     * protocols. A pending member is added to the group. Any other member is validated and then
     * handled according to the group state:
     * <ul>
     *   <li>{@code PREPARING_REBALANCE}: the member is updated and the join may complete.</li>
     *   <li>{@code COMPLETING_REBALANCE}: if the member's protocols changed, it is updated and a
     *       rebalance may start; otherwise the current generation is returned, with the member
     *       list included only for the leader.</li>
     *   <li>{@code STABLE}: the leader, or a member whose protocols changed, is updated and a
     *       rebalance may start; otherwise the current generation is returned.</li>
     *   <li>Any other state: the response carries {@code UNKNOWN_MEMBER_ID}.</li>
     * </ul>
     * A {@code KafkaException} raised by validation or by the state handling is mapped to an
     * error response.
     *
     * @param requestContext       the request context
     * @param joinGroupRequestData the JoinGroup request
     * @param genericGroup         the target group
     * @param completableFuture    the future completed with the JoinGroup response
     * @return the records to append, or {@code EMPTY_RESULT}
     * @throws IllegalStateException if a pending member rejoins with a group instance id
     */
    private CoordinatorResult<Void, Record> genericGroupJoinExistingMember(RequestContext requestContext, JoinGroupRequestData joinGroupRequestData, GenericGroup genericGroup, CompletableFuture<JoinGroupResponseData> completableFuture) {
        String memberId = joinGroupRequestData.memberId();
        String groupInstanceId = joinGroupRequestData.groupInstanceId();

        if (genericGroup.isInState(GenericGroupState.DEAD)) {
            completableFuture.complete(new JoinGroupResponseData().setMemberId(memberId).setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()));
            return EMPTY_RESULT;
        }

        if (!genericGroup.supportsProtocols(joinGroupRequestData.protocolType(), joinGroupRequestData.protocols())) {
            completableFuture.complete(new JoinGroupResponseData().setMemberId(memberId).setErrorCode(Errors.INCONSISTENT_GROUP_PROTOCOL.code()));
            return EMPTY_RESULT;
        }

        if (genericGroup.isPendingMember(memberId)) {
            if (groupInstanceId != null) {
                throw new IllegalStateException("Received unexpected JoinGroup with groupInstanceId=" + groupInstanceId + " for pending member with memberId=" + memberId);
            }
            this.log.debug("Pending dynamic member with id {} joins group {} in {} state. Adding to the group now.", memberId, genericGroup.groupId(), genericGroup.currentState());
            return addMemberThenRebalanceOrCompleteJoin(requestContext, joinGroupRequestData, genericGroup, memberId, completableFuture);
        }

        try {
            genericGroup.validateMember(memberId, groupInstanceId, "join-group");
            GenericGroupMember member = genericGroup.member(memberId);

            if (genericGroup.isInState(GenericGroupState.PREPARING_REBALANCE)) {
                return updateMemberThenRebalanceOrCompleteJoin(joinGroupRequestData, genericGroup, member, "Member " + member.memberId() + " is joining group during " + genericGroup.stateAsString() + "; client reason: " + JoinGroupRequest.joinReason(joinGroupRequestData), completableFuture);
            }

            if (genericGroup.isInState(GenericGroupState.COMPLETING_REBALANCE)) {
                if (!member.matches(joinGroupRequestData.protocols())) {
                    return updateMemberThenRebalanceOrCompleteJoin(joinGroupRequestData, genericGroup, member, "Updating metadata for member " + memberId + " during " + genericGroup.stateAsString() + "; client reason: " + JoinGroupRequest.joinReason(joinGroupRequestData), completableFuture);
                }
                // Only the leader receives the member list.
                List<JoinGroupResponseData.JoinGroupResponseMember> responseMembers;
                if (genericGroup.isLeader(memberId)) {
                    responseMembers = genericGroup.currentGenericGroupMembers();
                } else {
                    responseMembers = Collections.emptyList();
                }
                completableFuture.complete(new JoinGroupResponseData()
                    .setMembers(responseMembers)
                    .setMemberId(memberId)
                    .setGenerationId(genericGroup.generationId())
                    .setProtocolName(genericGroup.protocolName().orElse(null))
                    .setProtocolType(genericGroup.protocolType().orElse(null))
                    .setLeader(genericGroup.leaderOrNull())
                    .setSkipAssignment(false));
                return EMPTY_RESULT;
            }

            if (genericGroup.isInState(GenericGroupState.STABLE)) {
                if (genericGroup.isLeader(memberId)) {
                    return updateMemberThenRebalanceOrCompleteJoin(joinGroupRequestData, genericGroup, member, "Leader " + memberId + " re-joining group during " + genericGroup.stateAsString() + "; client reason: " + JoinGroupRequest.joinReason(joinGroupRequestData), completableFuture);
                }
                if (!member.matches(joinGroupRequestData.protocols())) {
                    return updateMemberThenRebalanceOrCompleteJoin(joinGroupRequestData, genericGroup, member, "Updating metadata for member " + memberId + " during " + genericGroup.stateAsString() + "; client reason: " + JoinGroupRequest.joinReason(joinGroupRequestData), completableFuture);
                }
                completableFuture.complete(new JoinGroupResponseData()
                    .setMembers(Collections.emptyList())
                    .setMemberId(memberId)
                    .setGenerationId(genericGroup.generationId())
                    .setProtocolName(genericGroup.protocolName().orElse(null))
                    .setProtocolType(genericGroup.protocolType().orElse(null))
                    .setLeader(genericGroup.leaderOrNull())
                    .setSkipAssignment(false));
                return EMPTY_RESULT;
            }

            this.log.warn("Attempt to add rejoining member {} of group {} in unexpected group state {}", memberId, genericGroup.groupId(), genericGroup.stateAsString());
            completableFuture.complete(new JoinGroupResponseData().setMemberId(memberId).setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()));
            return EMPTY_RESULT;
        } catch (KafkaException e) {
            completableFuture.complete(new JoinGroupResponseData().setMemberId(memberId).setErrorCode(Errors.forException(e).code()).setProtocolType(null).setProtocolName(null));
            return EMPTY_RESULT;
        }
    }

    /**
     * Completes the join phase of a generic group rebalance.
     * <p>
     * The pending join timer is cancelled and dynamic members that have not rejoined are removed,
     * together with their heartbeat timers. Then:
     * <ul>
     *   <li>If the group is {@code DEAD}, nothing else happens.</li>
     *   <li>If no leader could be elected although members remain, the join timer is re-armed
     *       with the group's rebalance timeout.</li>
     *   <li>Otherwise the next generation is initialised. An empty group yields a group metadata
     *       record to persist; a non-empty group has every member's join future completed, its
     *       heartbeat rescheduled and a pending-sync timer scheduled.</li>
     * </ul>
     *
     * @param genericGroup the group whose join phase is completed
     * @return the group metadata record when the group became empty, otherwise {@code EMPTY_RESULT}
     */
    private CoordinatorResult<Void, Record> completeGenericGroupJoin(GenericGroup genericGroup) {
        this.timer.cancel(genericGroupJoinKey(genericGroup.groupId()));
        String groupId = genericGroup.groupId();

        // Typed map so that the element type is known when iterating the values below.
        Map<String, GenericGroupMember> notYetRejoinedDynamicMembers = genericGroup.notYetRejoinedMembers().entrySet().stream()
            .filter(entry -> !entry.getValue().isStaticMember())
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

        if (!notYetRejoinedDynamicMembers.isEmpty()) {
            notYetRejoinedDynamicMembers.values().forEach(failedMember -> {
                genericGroup.remove(failedMember.memberId());
                this.timer.cancel(genericGroupHeartbeatKey(genericGroup.groupId(), failedMember.memberId()));
            });
            this.log.info("Group {} removed dynamic members who haven't joined: {}", groupId, notYetRejoinedDynamicMembers.keySet());
        }

        if (genericGroup.isInState(GenericGroupState.DEAD)) {
            this.log.info("Group {} is dead, skipping rebalance stage.", groupId);
            return EMPTY_RESULT;
        }

        if (!genericGroup.maybeElectNewJoinedLeader() && !genericGroup.allMembers().isEmpty()) {
            // No member rejoined: try again after another rebalance timeout.
            this.log.error("Group {} could not complete rebalance because no members rejoined.", groupId);
            this.timer.schedule(genericGroupJoinKey(groupId), genericGroup.rebalanceTimeoutMs(), TimeUnit.MILLISECONDS, false,
                () -> completeGenericGroupJoin(genericGroup));
            return EMPTY_RESULT;
        }

        genericGroup.initNextGeneration();

        if (genericGroup.isInState(GenericGroupState.EMPTY)) {
            this.log.info("Group {} with generation {} is now empty.", groupId, genericGroup.generationId());
            // Typed future so the failure argument is a Throwable rather than Object.
            CompletableFuture<Void> appendFuture = new CompletableFuture<>();
            appendFuture.whenComplete((ignored, th) -> {
                if (th != null) {
                    this.log.warn("Failed to write empty metadata for group {}: {}", genericGroup.groupId(), th.getMessage());
                }
            });
            return new CoordinatorResult<>(Collections.singletonList(RecordHelpers.newGroupMetadataRecord(genericGroup, Collections.emptyMap(), this.metadataImage.features().metadataVersion())), appendFuture);
        }

        this.log.info("Stabilized group {} generation {} with {} members.", groupId, genericGroup.generationId(), genericGroup.size());
        genericGroup.allMembers().forEach(member -> {
            // Only the leader receives the member list.
            List<JoinGroupResponseData.JoinGroupResponseMember> responseMembers = Collections.emptyList();
            if (genericGroup.isLeader(member.memberId())) {
                responseMembers = genericGroup.currentGenericGroupMembers();
            }
            genericGroup.completeJoinFuture(member, new JoinGroupResponseData()
                .setMembers(responseMembers)
                .setMemberId(member.memberId())
                .setGenerationId(genericGroup.generationId())
                .setProtocolName(genericGroup.protocolName().orElse(null))
                .setProtocolType(genericGroup.protocolType().orElse(null))
                .setLeader(genericGroup.leaderOrNull())
                .setSkipAssignment(false)
                .setErrorCode(Errors.NONE.code()));
            rescheduleGenericGroupMemberHeartbeat(genericGroup, member);
            member.setIsNew(false);
            genericGroup.addPendingSyncMember(member.memberId());
        });
        schedulePendingSync(genericGroup);
        return EMPTY_RESULT;
    }

    /**
     * Schedules the pending-sync expiration for a group, delayed by the group's rebalance timeout.
     * <p>
     * The generation id handed to the expiration is read from the group inside the timer callback,
     * that is, when the callback runs rather than when it is scheduled.
     *
     * @param genericGroup the group to schedule the expiration for
     */
    private void schedulePendingSync(GenericGroup genericGroup) {
        String syncKey = genericGroupSyncKey(genericGroup.groupId());
        int delayMs = genericGroup.rebalanceTimeoutMs();
        this.timer.schedule(syncKey, delayMs, TimeUnit.MILLISECONDS, false,
            () -> expirePendingSync(genericGroup, genericGroup.generationId()));
    }

    /**
     * Handles the expiration of a member's heartbeat timer.
     * <p>
     * Nothing is done for a {@code DEAD} group, for an unknown member, or for a member that has
     * satisfied its heartbeat. A pending member is removed, and a regular member that has not
     * satisfied its heartbeat is removed from the group.
     *
     * @param genericGroup the group the member belongs to
     * @param str          the member id
     * @return the result of removing the member, or {@code EMPTY_RESULT} when nothing was removed
     */
    private CoordinatorResult<Void, Record> expireGenericGroupMemberHeartbeat(GenericGroup genericGroup, String str) {
        if (genericGroup.isInState(GenericGroupState.DEAD)) {
            this.log.info("Received notification of heartbeat expiration for member {} after group {} had already been unloaded or deleted.", str, genericGroup.groupId());
            return EMPTY_RESULT;
        }

        if (genericGroup.isPendingMember(str)) {
            this.log.info("Pending member {} in group {} has been removed after session timeout expiration.", str, genericGroup.groupId());
            return removePendingMemberAndUpdateGenericGroup(genericGroup, str);
        }

        if (!genericGroup.hasMemberId(str)) {
            this.log.debug("Member {} has already been removed from the group.", str);
            return EMPTY_RESULT;
        }

        GenericGroupMember expiredCandidate = genericGroup.member(str);
        if (expiredCandidate.hasSatisfiedHeartbeat()) {
            return EMPTY_RESULT;
        }

        this.log.info("Member {} in group {} has failed, removing it from the group.", expiredCandidate.memberId(), genericGroup.groupId());
        return removeMemberAndUpdateGenericGroup(genericGroup, expiredCandidate, "removing member " + expiredCandidate.memberId() + " on heartbeat expiration.");
    }

    /**
     * Removes a member from the group and drives the group forward accordingly.
     *
     * <p>The member's join future is completed with {@code UNKNOWN_MEMBER_ID} and an empty
     * member id before the member is removed. Afterwards:
     * <ul>
     *   <li>in Stable or CompletingRebalance state, a rebalance is prepared or the join completed;</li>
     *   <li>in PreparingRebalance state with all members joined, the join phase is completed;</li>
     *   <li>otherwise nothing else happens.</li>
     * </ul>
     *
     * @param genericGroup       the group to update.
     * @param genericGroupMember the member to remove.
     * @param str                the reason passed on when a rebalance is prepared.
     * @return the result of the follow-up action, or {@code EMPTY_RESULT}.
     */
    private CoordinatorResult<Void, Record> removeMemberAndUpdateGenericGroup(GenericGroup genericGroup, GenericGroupMember genericGroupMember, String str) {
        JoinGroupResponseData unknownMemberResponse = new JoinGroupResponseData()
            .setMemberId("")
            .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code());
        genericGroup.completeJoinFuture(genericGroupMember, unknownMemberResponse);
        genericGroup.remove(genericGroupMember.memberId());

        if (genericGroup.isInState(GenericGroupState.STABLE) || genericGroup.isInState(GenericGroupState.COMPLETING_REBALANCE)) {
            return maybePrepareRebalanceOrCompleteJoin(genericGroup, str);
        }
        if (genericGroup.isInState(GenericGroupState.PREPARING_REBALANCE) && genericGroup.hasAllMembersJoined()) {
            return completeGenericGroupJoin(genericGroup);
        }
        return EMPTY_RESULT;
    }

    /**
     * Removes a pending member from the group and, if the group is preparing a rebalance and
     * every member has joined, completes the join phase.
     *
     * @param genericGroup the group to update.
     * @param str          the id of the pending member to remove.
     * @return the result of completing the join, or {@code EMPTY_RESULT} when the join is not completed.
     */
    private CoordinatorResult<Void, Record> removePendingMemberAndUpdateGenericGroup(GenericGroup genericGroup, String str) {
        genericGroup.remove(str);

        if (genericGroup.isInState(GenericGroupState.PREPARING_REBALANCE) && genericGroup.hasAllMembersJoined()) {
            return completeGenericGroupJoin(genericGroup);
        }
        return EMPTY_RESULT;
    }

    /**
     * Updates an existing member with the protocols and timeouts carried by the join request,
     * then either prepares a rebalance or tries to complete the join phase.
     *
     * @param joinGroupRequestData the join request supplying protocols and timeouts.
     * @param genericGroup         the group the member belongs to.
     * @param genericGroupMember   the member to update.
     * @param str                  the reason passed on when a rebalance is prepared.
     * @param completableFuture    the response future handed to the group with the update.
     * @return the result of {@code maybePrepareRebalanceOrCompleteJoin}.
     */
    private CoordinatorResult<Void, Record> updateMemberThenRebalanceOrCompleteJoin(JoinGroupRequestData joinGroupRequestData, GenericGroup genericGroup, GenericGroupMember genericGroupMember, String str, CompletableFuture<JoinGroupResponseData> completableFuture) {
        genericGroup.updateMember(
            genericGroupMember,
            joinGroupRequestData.protocols(),
            joinGroupRequestData.rebalanceTimeoutMs(),
            joinGroupRequestData.sessionTimeoutMs(),
            completableFuture
        );

        return maybePrepareRebalanceOrCompleteJoin(genericGroup, str);
    }

    /**
     * Creates a new member from the join request, adds it to the group, schedules its heartbeat
     * expiration with the new-member join timeout, and then either prepares a rebalance or tries
     * to complete the join phase.
     *
     * <p>If the group is preparing a rebalance and its previous state was Empty, the group is
     * flagged as having had a new member added.
     *
     * @param requestContext       the request context supplying client id and address.
     * @param joinGroupRequestData the join request.
     * @param genericGroup         the group to join.
     * @param str                  the id assigned to the new member.
     * @param completableFuture    the response future registered together with the member.
     * @return the result of {@code maybePrepareRebalanceOrCompleteJoin}.
     */
    private CoordinatorResult<Void, Record> addMemberThenRebalanceOrCompleteJoin(RequestContext requestContext, JoinGroupRequestData joinGroupRequestData, GenericGroup genericGroup, String str, CompletableFuture<JoinGroupResponseData> completableFuture) {
        GenericGroupMember newMember = new GenericGroupMember(
            str,
            Optional.ofNullable(joinGroupRequestData.groupInstanceId()),
            requestContext.clientId(),
            requestContext.clientAddress().toString(),
            joinGroupRequestData.rebalanceTimeoutMs(),
            joinGroupRequestData.sessionTimeoutMs(),
            joinGroupRequestData.protocolType(),
            joinGroupRequestData.protocols()
        );
        newMember.setIsNew(true);

        boolean joinedDuringInitialRebalance = genericGroup.isInState(GenericGroupState.PREPARING_REBALANCE)
            && genericGroup.previousState() == GenericGroupState.EMPTY;
        if (joinedDuringInitialRebalance) {
            genericGroup.setNewMemberAdded(true);
        }

        genericGroup.add(newMember, completableFuture);
        rescheduleGenericGroupMemberHeartbeat(genericGroup, newMember, this.genericGroupNewMemberJoinTimeoutMs);

        String reason = "Adding new member " + str + " with group instance id " + joinGroupRequestData.groupInstanceId()
            + "; client reason: " + JoinGroupRequest.joinReason(joinGroupRequestData);
        return maybePrepareRebalanceOrCompleteJoin(genericGroup, reason);
    }

    /**
     * Prepares a rebalance when the group can rebalance; otherwise tries to complete the join phase.
     *
     * @param genericGroup the group to act on.
     * @param str          the reason passed to {@code prepareRebalance}.
     * @return the result of whichever action was taken.
     */
    private CoordinatorResult<Void, Record> maybePrepareRebalanceOrCompleteJoin(GenericGroup genericGroup, String str) {
        if (genericGroup.canRebalance()) {
            return prepareRebalance(genericGroup, str);
        }
        return maybeCompleteJoinPhase(genericGroup);
    }

    /**
     * Moves the group into the PreparingRebalance state.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>If the group is completing a rebalance, member assignments are reset and
     *       {@code REBALANCE_IN_PROGRESS} is propagated.</li>
     *   <li>Any pending sync expiration is removed.</li>
     *   <li>If the group is Empty, the initial rebalance delay is scheduled under the join key.</li>
     *   <li>The group transitions to PreparingRebalance.</li>
     * </ol>
     *
     * @param genericGroup the group to rebalance.
     * @param str          the reason for the rebalance; only used for logging here.
     * @return {@code EMPTY_RESULT} when the group was Empty, otherwise the result of
     *         {@code maybeCompleteJoinElseSchedule}.
     */
    CoordinatorResult<Void, Record> prepareRebalance(GenericGroup genericGroup, String str) {
        if (genericGroup.isInState(GenericGroupState.COMPLETING_REBALANCE)) {
            resetAndPropagateAssignmentWithError(genericGroup, Errors.REBALANCE_IN_PROGRESS);
        }
        removeSyncExpiration(genericGroup);

        // Must be evaluated before the state transition below.
        final boolean wasEmpty = genericGroup.isInState(GenericGroupState.EMPTY);
        if (wasEmpty) {
            final int delayMs = this.genericGroupInitialRebalanceDelayMs;
            final int remainingMs = Math.max(genericGroup.rebalanceTimeoutMs() - this.genericGroupInitialRebalanceDelayMs, 0);
            this.timer.schedule(
                genericGroupJoinKey(genericGroup.groupId()),
                delayMs,
                TimeUnit.MILLISECONDS,
                false,
                () -> tryCompleteInitialRebalanceElseSchedule(genericGroup, delayMs, remainingMs)
            );
        }

        genericGroup.transitionTo(GenericGroupState.PREPARING_REBALANCE);
        this.log.info("Preparing to rebalance group {} in state {} with old generation {} (reason: {}).", genericGroup.groupId(), genericGroup.currentState(), genericGroup.generationId(), str);

        if (wasEmpty) {
            return EMPTY_RESULT;
        }
        return maybeCompleteJoinElseSchedule(genericGroup);
    }

    /**
     * Completes the join phase immediately when all members have joined; otherwise schedules the
     * completion under the group's join key after the group's rebalance timeout.
     *
     * @param genericGroup the group whose join phase is in progress.
     * @return the result of completing the join, or {@code EMPTY_RESULT} when it was only scheduled.
     */
    private CoordinatorResult<Void, Record> maybeCompleteJoinElseSchedule(GenericGroup genericGroup) {
        if (!genericGroup.hasAllMembersJoined()) {
            this.timer.schedule(
                genericGroupJoinKey(genericGroup.groupId()),
                genericGroup.rebalanceTimeoutMs(),
                TimeUnit.MILLISECONDS,
                false,
                () -> completeGenericGroupJoin(genericGroup)
            );
            return EMPTY_RESULT;
        }
        return completeGenericGroupJoin(genericGroup);
    }

    /**
     * Completes the initial rebalance, or extends the delay when a new member was added and
     * there is still time remaining.
     *
     * <p>When extending, the group's new-member flag is cleared, the next delay is the smaller of
     * the configured initial rebalance delay and the remaining time, and the remaining time is
     * reduced by the delay that just elapsed (never below zero).
     *
     * @param genericGroup the group performing its initial rebalance.
     * @param i            the delay, in milliseconds, that was used for the timeout that just fired.
     * @param i2           the remaining time budget in milliseconds.
     * @return the result of completing the join, or {@code EMPTY_RESULT} when rescheduled.
     */
    private CoordinatorResult<Void, Record> tryCompleteInitialRebalanceElseSchedule(GenericGroup genericGroup, int i, int i2) {
        if (genericGroup.newMemberAdded() && i2 != 0) {
            genericGroup.setNewMemberAdded(false);
            final int nextDelayMs = Math.min(this.genericGroupInitialRebalanceDelayMs, i2);
            final int nextRemainingMs = Math.max(i2 - i, 0);
            this.timer.schedule(
                genericGroupJoinKey(genericGroup.groupId()),
                nextDelayMs,
                TimeUnit.MILLISECONDS,
                false,
                () -> tryCompleteInitialRebalanceElseSchedule(genericGroup, nextDelayMs, nextRemainingMs)
            );
            return EMPTY_RESULT;
        }
        return completeGenericGroupJoin(genericGroup);
    }

    /**
     * Resets every member's assignment to the empty assignment and propagates the given error
     * to the members through {@code propagateAssignment}.
     *
     * @param genericGroup the group, which must be in the CompletingRebalance state.
     * @param errors       the error to propagate.
     * @throws IllegalStateException if the group is not in the CompletingRebalance state.
     */
    private void resetAndPropagateAssignmentWithError(GenericGroup genericGroup, Errors errors) {
        boolean completingRebalance = genericGroup.isInState(GenericGroupState.COMPLETING_REBALANCE);
        if (!completingRebalance) {
            throw new IllegalStateException("Group " + genericGroup.groupId() + " must be in " + GenericGroupState.COMPLETING_REBALANCE.name() + " state but is in " + genericGroup.currentState() + ".");
        }

        for (GenericGroupMember member : genericGroup.allMembers()) {
            member.setAssignment(GenericGroupMember.EMPTY_ASSIGNMENT);
        }
        propagateAssignment(genericGroup, errors);
    }

    /**
     * Stores the given assignment on every member and propagates it with {@code Errors.NONE}.
     * Members without an entry in the map receive the empty assignment.
     *
     * @param genericGroup the group, which must be in the CompletingRebalance state.
     * @param map          the assignment keyed by member id.
     * @throws IllegalStateException if the group is not in the CompletingRebalance state.
     */
    private void setAndPropagateAssignment(GenericGroup genericGroup, Map<String, byte[]> map) {
        boolean completingRebalance = genericGroup.isInState(GenericGroupState.COMPLETING_REBALANCE);
        if (!completingRebalance) {
            throw new IllegalStateException("The group must be in CompletingRebalance state to set and propagate assignment.");
        }

        for (GenericGroupMember member : genericGroup.allMembers()) {
            byte[] memberAssignment = map.getOrDefault(member.memberId(), GenericGroupMember.EMPTY_ASSIGNMENT);
            member.setAssignment(memberAssignment);
        }
        propagateAssignment(genericGroup, Errors.NONE);
    }

    /**
     * Completes the sync future of every member with its current assignment and the given error.
     *
     * <p>The group's protocol name and type are only included when the error is
     * {@code Errors.NONE}; otherwise they are sent as {@code null}. A member's heartbeat is
     * rescheduled only when {@code completeSyncFuture} returns {@code true} for it.
     *
     * @param genericGroup the group whose members are notified.
     * @param errors       the error code to send along with the assignment.
     */
    private void propagateAssignment(GenericGroup genericGroup, Errors errors) {
        final boolean succeeded = errors == Errors.NONE;
        final String protocolName = succeeded ? genericGroup.protocolName().orElse(null) : null;
        final String protocolType = succeeded ? genericGroup.protocolType().orElse(null) : null;

        for (GenericGroupMember member : genericGroup.allMembers()) {
            if (!member.hasAssignment() && succeeded) {
                this.log.warn("Sending empty assignment to member {} of {} for generation {} with no errors", member.memberId(), genericGroup.groupId(), genericGroup.generationId());
            }

            SyncGroupResponseData response = new SyncGroupResponseData()
                .setProtocolName(protocolName)
                .setProtocolType(protocolType)
                .setAssignment(member.assignment())
                .setErrorCode(errors.code());

            boolean futureCompleted = genericGroup.completeSyncFuture(member, response);
            if (futureCompleted) {
                rescheduleGenericGroupMemberHeartbeat(genericGroup, member);
            }
        }
    }

    /**
     * Reschedules the member's heartbeat expiration using the member's own session timeout.
     *
     * @param genericGroup       the group the member belongs to.
     * @param genericGroupMember the member whose heartbeat expiration is rescheduled.
     */
    public void rescheduleGenericGroupMemberHeartbeat(GenericGroup genericGroup, GenericGroupMember genericGroupMember) {
        final long sessionTimeoutMs = genericGroupMember.sessionTimeoutMs();
        rescheduleGenericGroupMemberHeartbeat(genericGroup, genericGroupMember, sessionTimeoutMs);
    }

    /**
     * Schedules the member's heartbeat expiration under the member's heartbeat key.
     * When the timeout fires, {@code expireGenericGroupMemberHeartbeat} is invoked for the member.
     *
     * @param genericGroup       the group the member belongs to.
     * @param genericGroupMember the member whose heartbeat expiration is scheduled.
     * @param j                  the timeout in milliseconds.
     */
    private void rescheduleGenericGroupMemberHeartbeat(GenericGroup genericGroup, GenericGroupMember genericGroupMember, long j) {
        String heartbeatKey = genericGroupHeartbeatKey(genericGroup.groupId(), genericGroupMember.memberId());
        this.timer.schedule(
            heartbeatKey,
            j,
            TimeUnit.MILLISECONDS,
            false,
            () -> expireGenericGroupMemberHeartbeat(genericGroup, genericGroupMember.memberId())
        );
    }

    /**
     * Clears the group's pending sync members and cancels the timeout registered under the
     * group's sync key.
     *
     * @param genericGroup the group whose sync expiration is removed.
     */
    private void removeSyncExpiration(GenericGroup genericGroup) {
        genericGroup.clearPendingSyncMembers();

        String syncKey = genericGroupSyncKey(genericGroup.groupId());
        this.timer.cancel(syncKey);
    }

    /**
     * Handles the expiration of the group's pending sync phase.
     *
     * <ul>
     *   <li>A generation mismatch, or a group that is Dead, Empty or PreparingRebalance, is only
     *       logged as unexpected.</li>
     *   <li>If the group is CompletingRebalance or Stable and has not received a sync from all
     *       members, the members still pending a sync are removed, their heartbeat timers are
     *       cancelled, and a rebalance is prepared.</li>
     * </ul>
     *
     * @param genericGroup the group whose sync phase expired.
     * @param i            the generation id the expiration was scheduled for.
     * @return the result of {@code prepareRebalance} when members were removed, otherwise
     *         {@code EMPTY_RESULT}.
     */
    private CoordinatorResult<Void, Record> expirePendingSync(GenericGroup genericGroup, int i) {
        if (i != genericGroup.generationId()) {
            this.log.error("Received unexpected notification of sync expiration for {} with an old generation {} while the group has {}.", genericGroup.groupId(), i, genericGroup.generationId());
        } else if (genericGroup.isInState(GenericGroupState.DEAD) || genericGroup.isInState(GenericGroupState.EMPTY) || genericGroup.isInState(GenericGroupState.PREPARING_REBALANCE)) {
            this.log.error("Received unexpected notification of sync expiration after group {} already transitioned to {} state.", genericGroup.groupId(), genericGroup.stateAsString());
        } else if ((genericGroup.isInState(GenericGroupState.COMPLETING_REBALANCE) || genericGroup.isInState(GenericGroupState.STABLE)) && !genericGroup.hasReceivedSyncFromAllMembers()) {
            // Iterate over a typed copy of the pending member ids (a raw HashSet would turn the
            // lambda parameter into Object). The copy presumably also guards against
            // genericGroup.remove mutating the pending set while iterating - TODO confirm.
            Set<String> pendingSyncMembers = new HashSet<>(genericGroup.allPendingSyncMembers());
            pendingSyncMembers.forEach(memberId -> {
                genericGroup.remove(memberId);
                this.timer.cancel(genericGroupHeartbeatKey(genericGroup.groupId(), memberId));
            });
            this.log.debug("Group {} removed members who haven't sent their sync requests: {}", genericGroup.groupId(), pendingSyncMembers);
            return prepareRebalance(genericGroup, "Removing " + pendingSyncMembers + " on pending sync request expiration");
        }
        return EMPTY_RESULT;
    }

    /**
     * Decides whether a joining member can be accepted given the group's state and maximum size.
     *
     * <ul>
     *   <li>Empty or Dead: always accepted.</li>
     *   <li>PreparingRebalance: accepted if the member is already known and awaiting a join
     *       response, or if fewer than the maximum number of members are awaiting one.</li>
     *   <li>CompletingRebalance or Stable: accepted if the member is already known, or if the
     *       group size is below the maximum.</li>
     * </ul>
     *
     * @param genericGroup the group being joined.
     * @param str          the id of the joining member.
     * @return {@code true} if the member can be accepted.
     * @throws IllegalStateException if the group state is not one of the handled states.
     */
    private boolean acceptJoiningMember(GenericGroup genericGroup, String str) {
        switch (genericGroup.currentState()) {
            case EMPTY:
            case DEAD:
                return true;

            case PREPARING_REBALANCE:
                if (genericGroup.hasMemberId(str) && genericGroup.member(str).isAwaitingJoin()) {
                    return true;
                }
                return genericGroup.numAwaitingJoinResponse() < this.genericGroupMaxSize;

            case COMPLETING_REBALANCE:
            case STABLE:
                if (genericGroup.hasMemberId(str)) {
                    return true;
                }
                return genericGroup.size() < this.genericGroupMaxSize;

            default:
                throw new IllegalStateException("Unknown group state: " + genericGroup.stateAsString());
        }
    }

    /**
     * Replaces a static member's id with a new one, updates the member from the join request,
     * and then decides how the group proceeds.
     *
     * <ul>
     *   <li>CompletingRebalance: a rebalance is prepared.</li>
     *   <li>Empty or Dead: an {@link IllegalStateException} is thrown.</li>
     *   <li>Any other non-Stable state: the join phase is completed if possible.</li>
     *   <li>Stable with a changed selected protocol: a rebalance is prepared or the join completed.</li>
     *   <li>Stable with an unchanged selected protocol: no rebalance is triggered; a group
     *       metadata record is returned and the join response is sent once the append future
     *       completes. If the append fails, the member update and the id replacement are
     *       reverted and the response future is completed with the mapped error.</li>
     * </ul>
     *
     * @param requestContext       the request context; its API version decides whether
     *                             assignment skipping is supported.
     * @param joinGroupRequestData the join request.
     * @param genericGroup         the group the static member belongs to.
     * @param str                  the old member id of the static member.
     * @param str2                 the new member id of the static member.
     * @param completableFuture    the join response future.
     * @return the coordinator result for the chosen path.
     * @throws IllegalStateException if the group is Empty or Dead after the member update.
     */
    private CoordinatorResult<Void, Record> updateStaticMemberThenRebalanceOrCompleteJoin(RequestContext requestContext, JoinGroupRequestData joinGroupRequestData, GenericGroup genericGroup, String str, String str2, CompletableFuture<JoinGroupResponseData> completableFuture) {
        // Captured before the replacement so responses can report the leader as it was on entry.
        String currentLeader = genericGroup.leaderOrNull();
        GenericGroupMember newMember = genericGroup.replaceStaticMember(joinGroupRequestData.groupInstanceId(), str, str2);
        rescheduleGenericGroupMemberHeartbeat(genericGroup, newMember);

        // Snapshot the member's settings before updateMember so they can be restored if
        // persisting the group metadata fails.
        int oldRebalanceTimeoutMs = newMember.rebalanceTimeoutMs();
        int oldSessionTimeoutMs = newMember.sessionTimeoutMs();
        JoinGroupRequestData.JoinGroupRequestProtocolCollection oldProtocols = newMember.supportedProtocols();
        genericGroup.updateMember(newMember, joinGroupRequestData.protocols(), joinGroupRequestData.rebalanceTimeoutMs(), joinGroupRequestData.sessionTimeoutMs(), completableFuture);

        if (!genericGroup.isInState(GenericGroupState.STABLE)) {
            if (genericGroup.isInState(GenericGroupState.COMPLETING_REBALANCE)) {
                return prepareRebalance(genericGroup, "Updating metadata for static member " + newMember.memberId() + " with instance id " + joinGroupRequestData.groupInstanceId() + "; client reason: " + JoinGroupRequest.joinReason(joinGroupRequestData));
            }
            if (genericGroup.isInState(GenericGroupState.EMPTY) || genericGroup.isInState(GenericGroupState.DEAD)) {
                throw new IllegalStateException("Group " + genericGroup.groupId() + " was not supposed to be in the state " + genericGroup.stateAsString() + " when the unknown static member " + joinGroupRequestData.groupInstanceId() + " rejoins.");
            }
            return maybeCompleteJoinPhase(genericGroup);
        }

        String groupInstanceId = joinGroupRequestData.groupInstanceId();
        if (!genericGroup.protocolName().orElse("").equals(genericGroup.selectProtocol())) {
            return maybePrepareRebalanceOrCompleteJoin(genericGroup, "Group's selectedProtocol will change because static member " + newMember.memberId() + " with instance id " + groupInstanceId + " joined with change of protocol; client reason: " + JoinGroupRequest.joinReason(joinGroupRequestData));
        }

        this.log.info("Static member which joins during Stable stage and doesn't affect the selected protocol will not trigger a rebalance.");
        Map<String, byte[]> groupAssignment = genericGroup.groupAssignment();

        // Parameterized (rather than raw) so the callback's throwable is typed as Throwable.
        CompletableFuture<Void> appendFuture = new CompletableFuture<>();
        appendFuture.whenComplete((ignored, th) -> {
            if (th != null) {
                this.log.warn("Failed to persist metadata for group {} static member {} with group instance id {} due to {}. Reverting to old member id {}.", genericGroup.groupId(), str2, groupInstanceId, th.getMessage(), str);
                // Undo the member update first, then swap the member id back to the old one.
                genericGroup.updateMember(newMember, oldProtocols, oldRebalanceTimeoutMs, oldSessionTimeoutMs, null);
                rescheduleGenericGroupMemberHeartbeat(genericGroup, genericGroup.replaceStaticMember(groupInstanceId, str2, str));
                completableFuture.complete(new JoinGroupResponseData().setMembers(Collections.emptyList()).setMemberId("").setGenerationId(genericGroup.generationId()).setProtocolName(genericGroup.protocolName().orElse(null)).setProtocolType(genericGroup.protocolType().orElse(null)).setLeader(currentLeader).setSkipAssignment(false).setErrorCode(appendGroupMetadataErrorToResponseError(Errors.forException(th)).code()));
                return;
            }
            if (!JoinGroupRequest.supportsSkippingAssignment(requestContext.apiVersion())) {
                genericGroup.completeJoinFuture(newMember, new JoinGroupResponseData().setMembers(Collections.emptyList()).setMemberId(str2).setGenerationId(genericGroup.generationId()).setProtocolName(genericGroup.protocolName().orElse(null)).setProtocolType(genericGroup.protocolType().orElse(null)).setLeader(currentLeader).setSkipAssignment(false));
            } else {
                // Only the leader receives the member list and is told to skip the assignment.
                boolean isLeader = genericGroup.isLeader(str2);
                genericGroup.completeJoinFuture(newMember, new JoinGroupResponseData().setMembers(isLeader ? genericGroup.currentGenericGroupMembers() : Collections.emptyList()).setMemberId(str2).setGenerationId(genericGroup.generationId()).setProtocolName(genericGroup.protocolName().orElse(null)).setProtocolType(genericGroup.protocolType().orElse(null)).setLeader(genericGroup.leaderOrNull()).setSkipAssignment(isLeader));
            }
        });
        return new CoordinatorResult<>(Collections.singletonList(RecordHelpers.newGroupMetadataRecord(genericGroup, groupAssignment, this.metadataImage.features().metadataVersion())), appendFuture);
    }

    /**
     * Handles a SyncGroup request for a generic group.
     *
     * <p>Depending on validation and the group state, the response future is completed
     * immediately (validation error, Empty, PreparingRebalance, Stable) or stored on the member
     * until the assignment is propagated (CompletingRebalance). When the request comes from the
     * leader during CompletingRebalance, the assignment carried by the request is collected,
     * members without an entry are given the empty assignment, and a group metadata record is
     * returned; the assignment is propagated to the members once the append future completes.
     *
     * <p>Any {@link Throwable} raised while handling the request is converted into an error
     * code on the response future.
     *
     * @param requestContext       the request context (not read by this method).
     * @param syncGroupRequestData the SyncGroup request.
     * @param completableFuture    the future to complete with the SyncGroup response.
     * @return a result carrying the group metadata record for a leader sync during
     *         CompletingRebalance, otherwise {@code EMPTY_RESULT}.
     */
    public CoordinatorResult<Void, Record> genericGroupSync(RequestContext requestContext, SyncGroupRequestData syncGroupRequestData, CompletableFuture<SyncGroupResponseData> completableFuture) throws UnknownMemberIdException, GroupIdNotFoundException {
        String groupId = syncGroupRequestData.groupId();
        String memberId = syncGroupRequestData.memberId();
        try {
            GenericGroup group = getOrMaybeCreateGenericGroup(groupId, false);
            Optional<Errors> validationError = validateSyncGroup(group, syncGroupRequestData);
            if (validationError.isPresent()) {
                completableFuture.complete(new SyncGroupResponseData().setErrorCode(validationError.get().code()));
            } else if (group.isInState(GenericGroupState.EMPTY)) {
                completableFuture.complete(new SyncGroupResponseData().setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()));
            } else if (group.isInState(GenericGroupState.PREPARING_REBALANCE)) {
                completableFuture.complete(new SyncGroupResponseData().setErrorCode(Errors.REBALANCE_IN_PROGRESS.code()));
            } else if (group.isInState(GenericGroupState.COMPLETING_REBALANCE)) {
                // Park the response on the member; it is completed when the assignment is propagated.
                group.member(memberId).setAwaitingSyncFuture(completableFuture);
                removePendingSyncMember(group, syncGroupRequestData.memberId());
                if (group.isLeader(memberId)) {
                    this.log.info("Assignment received from leader {} for group {} for generation {}. The group has {} members, {} of which are static.", memberId, groupId, group.generationId(), group.size(), group.allStaticMemberIds().size());

                    // Collect the assignment sent by the leader. Without this step every member
                    // would end up with the empty assignment.
                    Map<String, byte[]> assignment = new HashMap<>();
                    syncGroupRequestData.assignments().forEach(memberAssignment ->
                        assignment.put(memberAssignment.memberId(), memberAssignment.assignment())
                    );

                    // Members the leader did not mention get the empty assignment.
                    Map<String, byte[]> missingAssignments = new HashMap<>();
                    group.allMembers().forEach(member -> {
                        if (!assignment.containsKey(member.memberId())) {
                            missingAssignments.put(member.memberId(), GenericGroupMember.EMPTY_ASSIGNMENT);
                        }
                    });
                    assignment.putAll(missingAssignments);
                    if (!missingAssignments.isEmpty()) {
                        // Log the member ids only; the byte[] values carry no readable information.
                        this.log.warn("Setting empty assignments for members {} of {} for generation {}.", missingAssignments.keySet(), groupId, group.generationId());
                    }

                    CompletableFuture<Void> appendFuture = new CompletableFuture<>();
                    appendFuture.whenComplete((ignored, th) -> {
                        // Only act if the group is still completing the same generation's rebalance.
                        if (group.isInState(GenericGroupState.COMPLETING_REBALANCE) && syncGroupRequestData.generationId() == group.generationId()) {
                            if (th == null) {
                                setAndPropagateAssignment(group, assignment);
                                group.transitionTo(GenericGroupState.STABLE);
                            } else {
                                Errors error = Errors.forException(th);
                                resetAndPropagateAssignmentWithError(group, error);
                                maybePrepareRebalanceOrCompleteJoin(group, "Error " + error + " when storing group assignment during SyncGroup (member: " + memberId + ").");
                            }
                        }
                    });
                    return new CoordinatorResult<>(Collections.singletonList(RecordHelpers.newGroupMetadataRecord(group, assignment, this.metadataImage.features().metadataVersion())), appendFuture);
                }
            } else if (group.isInState(GenericGroupState.STABLE)) {
                removePendingSyncMember(group, memberId);
                completableFuture.complete(new SyncGroupResponseData().setProtocolType(group.protocolType().orElse(null)).setProtocolName(group.protocolName().orElse(null)).setAssignment(group.member(memberId).assignment()).setErrorCode(Errors.NONE.code()));
            } else if (group.isInState(GenericGroupState.DEAD)) {
                throw new IllegalStateException("Reached unexpected condition for Dead group " + groupId);
            }
            return EMPTY_RESULT;
        } catch (Throwable th2) {
            completableFuture.complete(new SyncGroupResponseData().setErrorCode(Errors.forException(th2).code()));
            return EMPTY_RESULT;
        }
    }

    /**
     * Maps an error raised while appending group metadata to the error returned to the client.
     *
     * <ul>
     *   <li>{@code UNKNOWN_TOPIC_OR_PARTITION}, {@code NOT_ENOUGH_REPLICAS} map to
     *       {@code COORDINATOR_NOT_AVAILABLE}</li>
     *   <li>{@code NOT_LEADER_OR_FOLLOWER}, {@code KAFKA_STORAGE_ERROR} map to
     *       {@code NOT_COORDINATOR}</li>
     *   <li>{@code MESSAGE_TOO_LARGE}, {@code RECORD_LIST_TOO_LARGE}, {@code INVALID_FETCH_SIZE}
     *       map to {@code UNKNOWN_SERVER_ERROR}</li>
     *   <li>anything else is returned unchanged.</li>
     * </ul>
     *
     * @param errors the append error.
     * @return the error to report in the response.
     */
    static Errors appendGroupMetadataErrorToResponseError(Errors errors) {
        final Errors responseError;
        switch (errors) {
            case UNKNOWN_TOPIC_OR_PARTITION:
            case NOT_ENOUGH_REPLICAS:
                responseError = Errors.COORDINATOR_NOT_AVAILABLE;
                break;

            case NOT_LEADER_OR_FOLLOWER:
            case KAFKA_STORAGE_ERROR:
                responseError = Errors.NOT_COORDINATOR;
                break;

            case MESSAGE_TOO_LARGE:
            case RECORD_LIST_TOO_LARGE:
            case INVALID_FETCH_SIZE:
                responseError = Errors.UNKNOWN_SERVER_ERROR;
                break;

            default:
                responseError = errors;
                break;
        }
        return responseError;
    }

    /**
     * Validates a SyncGroup request against the group.
     *
     * <p>Checks, in order: the group is not Dead, the member is valid for the group, the
     * generation id matches, and the protocol type and name are consistent with the group's.
     *
     * @param genericGroup         the group the request targets.
     * @param syncGroupRequestData the SyncGroup request.
     * @return the first validation error found, or an empty optional when the request is valid.
     */
    private Optional<Errors> validateSyncGroup(GenericGroup genericGroup, SyncGroupRequestData syncGroupRequestData) {
        if (genericGroup.isInState(GenericGroupState.DEAD)) {
            return Optional.of(Errors.COORDINATOR_NOT_AVAILABLE);
        }

        try {
            genericGroup.validateMember(syncGroupRequestData.memberId(), syncGroupRequestData.groupInstanceId(), "sync-group");

            if (syncGroupRequestData.generationId() != genericGroup.generationId()) {
                return Optional.of(Errors.ILLEGAL_GENERATION);
            }

            boolean typeMismatch = isProtocolInconsistent(syncGroupRequestData.protocolType(), genericGroup.protocolType().orElse(null));
            if (typeMismatch || isProtocolInconsistent(syncGroupRequestData.protocolName(), genericGroup.protocolName().orElse(null))) {
                return Optional.of(Errors.INCONSISTENT_GROUP_PROTOCOL);
            }

            return Optional.empty();
        } catch (KafkaException e) {
            return Optional.of(Errors.forException(e));
        }
    }

    /**
     * Removes the member from the group's pending sync members and cancels the group's sync
     * timeout when it is no longer needed.
     *
     * <p>In Empty, Dead and PreparingRebalance states the timeout is always cancelled; in
     * CompletingRebalance and Stable states it is cancelled only once a sync has been received
     * from all members.
     *
     * @param genericGroup the group to update.
     * @param str          the id of the member that is no longer pending a sync.
     * @throws IllegalStateException if the group state is not one of the handled states.
     */
    private void removePendingSyncMember(GenericGroup genericGroup, String str) {
        genericGroup.removePendingSyncMember(str);
        final String syncKey = genericGroupSyncKey(genericGroup.groupId());

        switch (genericGroup.currentState()) {
            case EMPTY:
            case DEAD:
            case PREPARING_REBALANCE:
                this.timer.cancel(syncKey);
                break;

            case COMPLETING_REBALANCE:
            case STABLE:
                if (genericGroup.hasReceivedSyncFromAllMembers()) {
                    this.timer.cancel(syncKey);
                }
                break;

            default:
                throw new IllegalStateException("Unknown group state: " + genericGroup.stateAsString());
        }
    }

    /**
     * Handles a Heartbeat request for a generic group.
     *
     * <p>After validation, the response depends on the group state:
     * <ul>
     *   <li>Empty: {@code UNKNOWN_MEMBER_ID}.</li>
     *   <li>PreparingRebalance: the member's heartbeat is rescheduled and
     *       {@code REBALANCE_IN_PROGRESS} is returned.</li>
     *   <li>CompletingRebalance or Stable: the member's heartbeat is rescheduled and a response
     *       with no error code set is returned.</li>
     *   <li>Any other state: an {@link IllegalStateException} is thrown.</li>
     * </ul>
     *
     * @param requestContext       the request context (not read by this method).
     * @param heartbeatRequestData the Heartbeat request.
     * @return the Heartbeat response.
     */
    public HeartbeatResponseData genericGroupHeartbeat(RequestContext requestContext, HeartbeatRequestData heartbeatRequestData) {
        GenericGroup group = getOrMaybeCreateGenericGroup(heartbeatRequestData.groupId(), false);
        validateGenericGroupHeartbeat(group, heartbeatRequestData.memberId(), heartbeatRequestData.groupInstanceId(), heartbeatRequestData.generationId());

        switch (group.currentState()) {
            case EMPTY:
                return new HeartbeatResponseData().setErrorCode(Errors.UNKNOWN_MEMBER_ID.code());

            case PREPARING_REBALANCE: {
                GenericGroupMember member = group.member(heartbeatRequestData.memberId());
                rescheduleGenericGroupMemberHeartbeat(group, member);
                return new HeartbeatResponseData().setErrorCode(Errors.REBALANCE_IN_PROGRESS.code());
            }

            case COMPLETING_REBALANCE:
            case STABLE: {
                GenericGroupMember member = group.member(heartbeatRequestData.memberId());
                rescheduleGenericGroupMemberHeartbeat(group, member);
                return new HeartbeatResponseData();
            }

            case DEAD:
            default:
                throw new IllegalStateException("Reached unexpected state " + group.currentState() + " for group " + group.groupId());
        }
    }

    /**
     * Validates a heartbeat against the group: the group must not be Dead, the member must be
     * valid for the group, and the generation id must match the group's.
     *
     * @param genericGroup the group the heartbeat targets.
     * @param str          the member id.
     * @param str2         the group instance id, passed through to member validation.
     * @param i            the generation id sent by the member.
     * @throws CoordinatorNotAvailableException declared for the Dead-group case, which throws
     *                                          {@code Errors.COORDINATOR_NOT_AVAILABLE.exception()}.
     * @throws IllegalGenerationException       declared for the generation mismatch case, which
     *                                          throws {@code Errors.ILLEGAL_GENERATION.exception()}.
     */
    private void validateGenericGroupHeartbeat(GenericGroup genericGroup, String str, String str2, int i) throws CoordinatorNotAvailableException, IllegalGenerationException {
        if (genericGroup.isInState(GenericGroupState.DEAD)) {
            throw Errors.COORDINATOR_NOT_AVAILABLE.exception();
        }

        genericGroup.validateMember(str, str2, "heartbeat");

        final int currentGenerationId = genericGroup.generationId();
        if (currentGenerationId != i) {
            throw Errors.ILLEGAL_GENERATION.exception();
        }
    }

    /**
     * Tells whether two protocol values conflict. A {@code null} on either side never conflicts.
     *
     * @param str  the first value.
     * @param str2 the second value.
     * @return {@code true} only when both values are non-null and differ.
     */
    private boolean isProtocolInconsistent(String str, String str2) {
        if (str == null || str2 == null) {
            return false;
        }
        return !str2.equals(str);
    }

    /**
     * Builds the timer key used for a member's heartbeat expiration.
     *
     * @param str  the group id.
     * @param str2 the member id.
     * @return {@code "heartbeat-<groupId>-<memberId>"}.
     */
    static String genericGroupHeartbeatKey(String str, String str2) {
        // String.join renders null elements as "null", exactly like string concatenation.
        return String.join("-", "heartbeat", str, str2);
    }

    /**
     * Builds the timer key used for a group's join phase.
     *
     * @param str the group id.
     * @return {@code "join-<groupId>"}.
     */
    static String genericGroupJoinKey(String str) {
        final StringBuilder key = new StringBuilder("join-");
        key.append(str);
        return key.toString();
    }

    /**
     * Builds the timer key used for a group's sync phase.
     *
     * @param str the group id.
     * @return {@code "sync-<groupId>"}.
     */
    static String genericGroupSyncKey(String str) {
        // String.valueOf keeps the "null" rendering that plain concatenation produces.
        return "sync-".concat(String.valueOf(str));
    }
}
