package org.apache.kafka.controller;

import io.confluent.kafka.link.ClusterLinkUtils;
import io.confluent.shaded.org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.AlterMirrorTopicsRequestData;
import org.apache.kafka.common.message.AlterMirrorTopicsResponseData;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.metadata.MirrorTopicChangeRecord;
import org.apache.kafka.common.metadata.MirrorTopicRecord;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.metadata.MirrorTopic;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.mutable.BoundedList;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineHashSet;

/* loaded from: input_file:org/apache/kafka/controller/MirrorTopicControlManager.class */
public class MirrorTopicControlManager {
    private final SnapshotRegistry snapshotRegistry;
    private final Logger log;
    private final Time time;
    private final Function<String, Optional<Uuid>> clusterLinkResolver;
    private final Function<String, Optional<Uuid>> topicIdResolver;
    private final TimelineHashMap<Uuid, TimelineHashSet<Uuid>> linksToMirrorTopics;
    private final TimelineHashMap<Uuid, MirrorTopic> mirrorTopics;

    public MirrorTopicControlManager(SnapshotRegistry snapshotRegistry, LogContext logContext, Time time, Function<String, Optional<Uuid>> function, Function<String, Optional<Uuid>> function2) {
        this.snapshotRegistry = snapshotRegistry;
        this.time = time;
        this.topicIdResolver = function;
        this.clusterLinkResolver = function2;
        this.log = logContext.logger(MirrorTopicControlManager.class);
        this.linksToMirrorTopics = new TimelineHashMap<>(snapshotRegistry, 0);
        this.mirrorTopics = new TimelineHashMap<>(snapshotRegistry, 0);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isMirrorTopic(Uuid uuid) {
        return this.mirrorTopics.containsKey(uuid);
    }

    Optional<MirrorTopic> mirrorTopic(Uuid uuid) {
        return Optional.ofNullable(this.mirrorTopics.get(uuid));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isMirrorTopic(String str) {
        return this.topicIdResolver.apply(str).filter(this::isMirrorTopic).isPresent();
    }

    Optional<Uuid> clusterLinkIdForTopicId(Uuid uuid) {
        return Optional.ofNullable(this.mirrorTopics.get(uuid)).map((v0) -> {
            return v0.linkId();
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Set<Uuid> topicIdsForClusterLinkId(Uuid uuid, boolean z) {
        TimelineHashSet<Uuid> timelineHashSet = this.linksToMirrorTopics.get(uuid);
        return timelineHashSet == null ? Collections.emptySet() : z ? (Set) timelineHashSet.stream().filter(uuid2 -> {
            return !this.mirrorTopics.get(uuid2).mirrorState().equals(MirrorTopic.State.STOPPED);
        }).collect(Collectors.toSet()) : timelineHashSet;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Set<String> topicsInUse(Uuid uuid) {
        TimelineHashSet<Uuid> timelineHashSet = this.linksToMirrorTopics.get(uuid);
        return timelineHashSet == null ? Collections.emptySet() : (Set) timelineHashSet.stream().filter(uuid2 -> {
            return this.mirrorTopics.get(uuid2).mirrorState() != MirrorTopic.State.STOPPED;
        }).map(uuid3 -> {
            return this.mirrorTopics.get(uuid3).topicName();
        }).collect(Collectors.toSet());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ControllerResult<AlterMirrorTopicsResponseData> alterMirrorTopics(AlterMirrorTopicsRequestData alterMirrorTopicsRequestData) {
        BoundedList newArrayBacked = BoundedList.newArrayBacked(10000);
        AlterMirrorTopicsResponseData alterMirrorTopicsResponseData = new AlterMirrorTopicsResponseData();
        alterMirrorTopicsResponseData.setAlterMirrorResults(new ArrayList());
        for (AlterMirrorTopicsRequestData.AlterMirrorTopic alterMirrorTopic : alterMirrorTopicsRequestData.alterMirrorTopics()) {
            newArrayBacked.getClass();
            ApiError alterMirrorState = alterMirrorState(alterMirrorTopic, (v1) -> {
                r2.add(v1);
            });
            AlterMirrorTopicsResponseData.AlterMirrorResult alterMirrorResult = new AlterMirrorTopicsResponseData.AlterMirrorResult();
            alterMirrorResult.setTopic(alterMirrorResult.topic());
            alterMirrorResult.setErrorCode(alterMirrorState.error().code());
            alterMirrorResult.setErrorMessage(alterMirrorState.message());
            alterMirrorTopicsResponseData.alterMirrorResults().add(alterMirrorResult);
        }
        alterMirrorTopicsResponseData.setErrorCode(Errors.NONE.code());
        return alterMirrorTopicsRequestData.validateOnly() ? ControllerResult.of(Collections.emptyList(), alterMirrorTopicsResponseData) : ControllerResult.of(newArrayBacked, alterMirrorTopicsResponseData);
    }

    ApiError alterMirrorState(AlterMirrorTopicsRequestData.AlterMirrorTopic alterMirrorTopic, Consumer<ApiMessageAndVersion> consumer) {
        MirrorTopic stopped;
        Optional<Uuid> apply = this.topicIdResolver.apply(alterMirrorTopic.topic());
        if (!apply.isPresent()) {
            return new ApiError(Errors.UNKNOWN_TOPIC_ID, "No such topic '" + alterMirrorTopic.topic() + "'.");
        }
        MirrorTopic mirrorTopic = this.mirrorTopics.get(apply.get());
        if (mirrorTopic == null) {
            return new ApiError(Errors.INVALID_REQUEST, "Topic '" + alterMirrorTopic.topic() + "' is not a mirror topic.");
        }
        try {
            MirrorTopic.State fromStateName = MirrorTopic.State.fromStateName(alterMirrorTopic.mirrorTopicState());
            if (!isAllowedStateChange(mirrorTopic.mirrorState(), fromStateName)) {
                return new ApiError(Errors.INVALID_REQUEST, "Illegal state transition " + mirrorTopic.mirrorState() + " to " + fromStateName + " for mirror topic " + mirrorTopic.topicId());
            }
            long milliseconds = this.time.milliseconds();
            switch (fromStateName) {
                case MIRROR:
                    stopped = MirrorTopic.mirror(mirrorTopic, milliseconds, alterMirrorTopic.mirrorStartOffsets());
                    break;
                case PAUSED:
                    stopped = MirrorTopic.paused(mirrorTopic, milliseconds, alterMirrorTopic.topicLevelPause(), alterMirrorTopic.linkLevelPause(), mirrorTopic.mirrorState(), mirrorTopic.mirrorStartOffsets(), alterMirrorTopic.mirrorTopicError());
                    break;
                case FAILED:
                    stopped = MirrorTopic.failed(mirrorTopic, milliseconds, alterMirrorTopic.mirrorTopicError());
                    break;
                case PENDING_STOPPED:
                    stopped = MirrorTopic.pendingStopped(mirrorTopic, milliseconds, alterMirrorTopic.promoted());
                    break;
                case STOPPED:
                    stopped = MirrorTopic.stopped(mirrorTopic, milliseconds, alterMirrorTopic.stoppedLogEndOffsets());
                    break;
                default:
                    return new ApiError(Errors.INVALID_REQUEST, "Cannot transition mirror topic " + mirrorTopic.topicId() + " from " + mirrorTopic.mirrorState() + " to unknown state " + alterMirrorTopic.mirrorTopicState());
            }
            consumer.accept(new ApiMessageAndVersion(MirrorTopic.toChangeRecord(stopped), (short) 0));
            return ApiError.NONE;
        } catch (IllegalArgumentException e) {
            return new ApiError(Errors.INVALID_REQUEST, "Unknown mirror topic state " + alterMirrorTopic.mirrorTopicState() + " for mirror topic " + mirrorTopic.topicId());
        }
    }

    private boolean isAllowedStateChange(MirrorTopic.State state, MirrorTopic.State state2) {
        if (state == state2) {
            return true;
        }
        switch (state) {
            case MIRROR:
                return state2 == MirrorTopic.State.PAUSED || state2 == MirrorTopic.State.PENDING_STOPPED || state2 == MirrorTopic.State.FAILED;
            case PAUSED:
                return state2 == MirrorTopic.State.MIRROR || state2 == MirrorTopic.State.PENDING_STOPPED || state2 == MirrorTopic.State.FAILED;
            case FAILED:
                return state2 == MirrorTopic.State.PAUSED || state2 == MirrorTopic.State.PENDING_STOPPED;
            case PENDING_STOPPED:
                return state2 == MirrorTopic.State.STOPPED;
            case STOPPED:
                return false;
            default:
                this.log.error("Unhandled current mirror topic state '" + state + "'");
                return false;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ApiError maybeAddMirrorTopicRecord(CreateTopicsRequestData.CreatableTopic creatableTopic, Uuid uuid, Consumer<ApiMessageAndVersion> consumer) {
        if (creatableTopic.linkName() == null && creatableTopic.mirrorTopic() == null) {
            return ApiError.NONE;
        }
        if (creatableTopic.linkName() == null || creatableTopic.mirrorTopic() == null) {
            return new ApiError(Errors.INVALID_REQUEST, "Link name and mirror topic name must be provided together");
        }
        try {
            ClusterLinkUtils.validateLinkNameOrThrow(creatableTopic.linkName());
            Optional<Uuid> apply = this.clusterLinkResolver.apply(creatableTopic.linkName());
            if (!apply.isPresent()) {
                return new ApiError(Errors.CLUSTER_LINK_NOT_FOUND, "Cluster link " + creatableTopic.linkName() + " was not found.");
            }
            consumer.accept(new ApiMessageAndVersion(new MirrorTopicRecord().setMirrorTopicState(MirrorTopic.State.MIRROR.stateName()).setTopicName(creatableTopic.name()).setTopicId(uuid).setClusterLinkId(apply.get()).setClusterLinkName(creatableTopic.linkName()).setMirrorStartOffsets(creatableTopic.mirrorStartOffsets()).setSourceTopicId(creatableTopic.sourceTopicId()).setSourceTopicName(creatableTopic.mirrorTopic()), (short) 0));
            return ApiError.NONE;
        } catch (Throwable th) {
            return ApiError.fromThrowable(th);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void failMirrorTopic(Uuid uuid, short s, Consumer<ApiMessageAndVersion> consumer) {
        MirrorTopic mirrorTopic = this.mirrorTopics.get(uuid);
        if (mirrorTopic == null) {
            this.log.error("Could not fail mirror topic {} since it does not exist", uuid);
        } else if (mirrorTopic.mirrorState() == MirrorTopic.State.MIRROR) {
            consumer.accept(new ApiMessageAndVersion(MirrorTopic.toChangeRecord(MirrorTopic.failed(mirrorTopic, this.time.milliseconds(), s)), (short) 0));
        } else {
            this.log.info("Not failing mirror topic {} since it is not currently active", uuid);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void unLinkMirrorTopics(Uuid uuid, String str) {
        TimelineHashSet<Uuid> remove = this.linksToMirrorTopics.remove(uuid);
        if (remove != null) {
            this.log.info("Removing mirror topic metadata for {} topics due to deleted cluster link {} with ID {}.", Integer.valueOf(remove.size()), str, uuid);
            TimelineHashMap<Uuid, MirrorTopic> timelineHashMap = this.mirrorTopics;
            timelineHashMap.getClass();
            remove.forEach((v1) -> {
                r1.remove(v1);
            });
            if (this.log.isDebugEnabled()) {
                this.log.debug("Removed mirror topic metadata for topic IDs {}.", remove);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void deleteMirrorTopic(Uuid uuid, String str) {
        MirrorTopic remove = this.mirrorTopics.remove(uuid);
        if (remove != null) {
            this.log.info("Removing mirror state from topic {} with ID {}", str, uuid);
            TimelineHashSet<Uuid> timelineHashSet = this.linksToMirrorTopics.get(remove.linkId());
            if (timelineHashSet != null) {
                timelineHashSet.remove(uuid);
            }
        }
    }

    public void replay(MirrorTopicRecord mirrorTopicRecord) {
        this.linksToMirrorTopics.computeIfAbsent(mirrorTopicRecord.clusterLinkId(), uuid -> {
            return new TimelineHashSet(this.snapshotRegistry, 0);
        }).add(mirrorTopicRecord.topicId());
        this.mirrorTopics.put(mirrorTopicRecord.topicId(), MirrorTopic.fromRecord(mirrorTopicRecord));
        this.log.info("Created mirror topic {} with topic ID {}, link name {}, and link ID {}.", mirrorTopicRecord.topicName(), mirrorTopicRecord.topicId(), mirrorTopicRecord.clusterLinkName(), mirrorTopicRecord.clusterLinkId());
    }

    public void replay(MirrorTopicChangeRecord mirrorTopicChangeRecord) {
        MirrorTopic stopped;
        MirrorTopic.State fromStateName = MirrorTopic.State.fromStateName(mirrorTopicChangeRecord.mirrorTopicState());
        MirrorTopic mirrorTopic = this.mirrorTopics.get(mirrorTopicChangeRecord.topicId());
        if (mirrorTopic == null) {
            throw new IllegalStateException("Attempting to update unknown mirror topic " + mirrorTopicChangeRecord.topicId());
        }
        switch (fromStateName) {
            case MIRROR:
                stopped = MirrorTopic.mirror(mirrorTopic, mirrorTopicChangeRecord.timeMs(), mirrorTopicChangeRecord.mirrorStartOffsets());
                break;
            case PAUSED:
                stopped = MirrorTopic.paused(mirrorTopic, mirrorTopicChangeRecord.timeMs(), mirrorTopicChangeRecord.topicLevelPause(), mirrorTopicChangeRecord.linkLevelPause(), MirrorTopic.State.fromStateName(mirrorTopicChangeRecord.previousToPausedState()), mirrorTopicChangeRecord.mirrorStartOffsets(), mirrorTopicChangeRecord.mirrorTopicError());
                break;
            case FAILED:
                stopped = MirrorTopic.failed(mirrorTopic, mirrorTopicChangeRecord.timeMs(), mirrorTopicChangeRecord.mirrorTopicError());
                break;
            case PENDING_STOPPED:
                stopped = MirrorTopic.pendingStopped(mirrorTopic, mirrorTopicChangeRecord.timeMs(), mirrorTopicChangeRecord.promoted());
                break;
            case STOPPED:
                stopped = MirrorTopic.stopped(mirrorTopic, mirrorTopicChangeRecord.timeMs(), mirrorTopicChangeRecord.stoppedLogEndOffsets());
                break;
            default:
                throw new RuntimeException("Cannot update mirror topic state for topic ID " + mirrorTopicChangeRecord.topicId() + ", unknown mirror state " + mirrorTopicChangeRecord.mirrorTopicState());
        }
        this.log.info("Updating mirror topic {} from {} to {}", stopped.topicId(), mirrorTopic.mirrorState(), fromStateName);
        this.mirrorTopics.put(mirrorTopicChangeRecord.topicId(), stopped);
    }

    void snapshotRecord(Uuid uuid, String str, long j, Consumer<ApiMessageAndVersion> consumer) {
        MirrorTopic mirrorTopic = this.mirrorTopics.get(uuid, j);
        if (mirrorTopic != null) {
            consumer.accept(new ApiMessageAndVersion(MirrorTopic.toSnapshotRecord(mirrorTopic, str), (short) 0));
        }
    }
}
