package kafka.tier.topic;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import kafka.cluster.BrokerEndPoint;
import kafka.server.BrokerBlockingSender;
import kafka.server.KafkaConfig;
import kafka.server.RemoteLeaderEndPoint;
import kafka.server.RemoteLeaderRequestBuilder;
import kafka.server.ReplicaManager;
import kafka.tier.TopicIdPartition;
import kafka.tier.domain.TierObjectMetadata;
import kafka.tier.state.OffsetAndEpoch;
import kafka.tier.state.TierPartitionState;
import kafka.tier.tools.RecoveryUtils;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.PartitionResult;
import org.apache.kafka.clients.admin.ReplicaStatusOptions;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:kafka/tier/topic/TierTopicDataLossValidator.class */
public class TierTopicDataLossValidator {
    private static final Logger log = LoggerFactory.getLogger(TierTopicDataLossValidator.class);
    private static final int DATA_LOSS_DETECTION_TIMEOUT_MS = 1800000;
    private static final int DATA_LOSS_DETECTION_THREAD_POOL_SIZE_MAX = 10;
    private final TierTopicManagerConfig config;
    private final TierTopic tierTopic;
    private final ReplicaManager replicaManager;
    private final Time time;
    private final Metrics metrics;
    private final Map<Integer, RemoteLeaderEndPoint> brokerToLeaderEndPoint = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:kafka/tier/topic/TierTopicDataLossValidator$OffsetForLeaderEpochTask.class */
    public class OffsetForLeaderEpochTask implements Callable<Map<TopicPartition, OffsetForLeaderEpochResponseData.EpochEndOffset>> {
        final int leaderId;
        final Map<TopicPartition, OffsetForLeaderEpochRequestData.OffsetForLeaderPartition> request;

        OffsetForLeaderEpochTask(int i, Map<TopicPartition, OffsetForLeaderEpochRequestData.OffsetForLeaderPartition> map) {
            this.leaderId = i;
            this.request = map;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Map<TopicPartition, OffsetForLeaderEpochResponseData.EpochEndOffset> call() throws Exception {
            RemoteLeaderEndPoint leaderEndPoint = TierTopicDataLossValidator.this.getLeaderEndPoint(TierTopicDataLossValidator.this.config.brokerId, this.leaderId, TierTopicDataLossValidator.this.brokerToLeaderEndPoint);
            if (leaderEndPoint == null) {
                TierTopicDataLossValidator.log.error("Failed to get LeaderEndPoint for node {}", Integer.valueOf(this.leaderId));
                return Collections.emptyMap();
            }
            TierTopicDataLossValidator.log.debug("Sending OffsetForLeaderEpoch request to node {} for {} partitions: {}", new Object[]{Integer.valueOf(this.leaderId), Integer.valueOf(this.request.size()), this.request});
            Map<TopicPartition, OffsetForLeaderEpochResponseData.EpochEndOffset> fetchEpochEndOffsetsAsJava = leaderEndPoint.fetchEpochEndOffsetsAsJava(this.request);
            TierTopicDataLossValidator.log.debug("Completed OffsetForLeaderEpoch request to node {} for {} partitions: {}", new Object[]{Integer.valueOf(this.leaderId), Integer.valueOf(fetchEpochEndOffsetsAsJava.size()), fetchEpochEndOffsetsAsJava});
            return fetchEpochEndOffsetsAsJava;
        }
    }

    public TierTopicDataLossValidator(TierTopicManagerConfig tierTopicManagerConfig, TierTopic tierTopic, ReplicaManager replicaManager, Time time, Metrics metrics) {
        this.config = tierTopicManagerConfig;
        this.tierTopic = tierTopic;
        this.replicaManager = replicaManager;
        this.time = time;
        this.metrics = metrics;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean detectDataLossInTierTopicHead(Producer<byte[], byte[]> producer) throws InterruptedException {
        if (!this.config.enableTierTopicDataLossDetection.booleanValue()) {
            log.info("Skip data loss detection in tier topic as {} is disabled.", KafkaConfig.TierTopicDataLossDetectionEnableProp());
            return false;
        }
        if (this.replicaManager.logManager().hadCleanShutdown()) {
            log.info("Skip data loss detection in tier topic as broker had clean shutdown.");
            return false;
        }
        log.info("Broker did not have clean shutdown. Validating data loss in tier topic head.");
        long milliseconds = this.time.milliseconds();
        Map<TopicPartition, OffsetAndEpoch> tierTopicMaxOffsetAndEpochByFtps = getTierTopicMaxOffsetAndEpochByFtps(this.tierTopic);
        if (tierTopicMaxOffsetAndEpochByFtps.isEmpty()) {
            log.info("Skip data loss detection in tier topic as there are no tier topic partitions to validate.");
            return false;
        }
        ConfluentAdmin createAdminClient = createAdminClient();
        if (createAdminClient == null) {
            log.warn("Skip data loss detection in tier topic as admin client is not created.");
            return false;
        }
        ExecutorService executorService = null;
        try {
            try {
                HashMap hashMap = new HashMap();
                Map<TopicPartition, PartitionResult> tierTopicPartitionLeaderInfo = getTierTopicPartitionLeaderInfo(tierTopicMaxOffsetAndEpochByFtps.keySet(), createAdminClient);
                while (true) {
                    if (tierTopicMaxOffsetAndEpochByFtps.isEmpty()) {
                        break;
                    }
                    log.info("Try to detect data loss on {} tier topic partitions", Integer.valueOf(tierTopicMaxOffsetAndEpochByFtps.size()));
                    Map<Integer, Map<TopicPartition, OffsetForLeaderEpochRequestData.OffsetForLeaderPartition>> buildRequests = buildRequests(tierTopicMaxOffsetAndEpochByFtps, tierTopicPartitionLeaderInfo);
                    if (executorService == null && !buildRequests.isEmpty()) {
                        executorService = Executors.newFixedThreadPool(Math.min(buildRequests.size(), 10));
                    }
                    ArrayList arrayList = new ArrayList();
                    for (Map.Entry<Integer, Map<TopicPartition, OffsetForLeaderEpochRequestData.OffsetForLeaderPartition>> entry : buildRequests.entrySet()) {
                        arrayList.add(executorService.submit(new OffsetForLeaderEpochTask(entry.getKey().intValue(), entry.getValue())));
                    }
                    HashMap hashMap2 = new HashMap();
                    arrayList.forEach(future -> {
                        try {
                            hashMap2.putAll((Map) future.get());
                        } catch (Exception e) {
                            log.error("Failed to finish the OffsetForLeaderEpoch task", e);
                        }
                    });
                    Map<TopicPartition, PartitionResult> tierTopicPartitionLeaderInfo2 = getTierTopicPartitionLeaderInfo(tierTopicMaxOffsetAndEpochByFtps.keySet(), createAdminClient);
                    validateDataLoss(tierTopicMaxOffsetAndEpochByFtps, hashMap, hashMap2, tierTopicPartitionLeaderInfo, tierTopicPartitionLeaderInfo2);
                    tierTopicPartitionLeaderInfo = tierTopicPartitionLeaderInfo2;
                    if (this.time.milliseconds() - milliseconds > 1800000) {
                        log.warn("Tier topic data loss detection timed out after {} ms. {} partitions remaining to be validated: {}", new Object[]{Integer.valueOf(DATA_LOSS_DETECTION_TIMEOUT_MS), Integer.valueOf(tierTopicMaxOffsetAndEpochByFtps.size()), tierTopicMaxOffsetAndEpochByFtps.keySet()});
                        break;
                    }
                    Thread.sleep(TimeUnit.SECONDS.toMillis(15L));
                }
                fenceUserTopicPartitions(hashMap, producer);
                log.info("Finished detecting data loss in tier topic in {} ms - {} partitions are found to have data loss: {}", new Object[]{Long.valueOf(this.time.milliseconds() - milliseconds), Integer.valueOf(hashMap.size()), hashMap.keySet()});
                return !hashMap.isEmpty();
            } catch (Exception e) {
                log.error("Failed to detect data loss in tier topic head", e);
                throw e;
            }
        } finally {
            Iterator<RemoteLeaderEndPoint> it = this.brokerToLeaderEndPoint.values().iterator();
            while (it.hasNext()) {
                it.next().close();
            }
            if (executorService != null) {
                executorService.shutdown();
            }
            createAdminClient.close();
        }
    }

    void validateDataLoss(Map<TopicPartition, OffsetAndEpoch> map, Map<TopicPartition, Long> map2, Map<TopicPartition, OffsetForLeaderEpochResponseData.EpochEndOffset> map3, Map<TopicPartition, PartitionResult> map4, Map<TopicPartition, PartitionResult> map5) {
        map3.forEach((topicPartition, epochEndOffset) -> {
            OffsetAndEpoch offsetAndEpoch = (OffsetAndEpoch) map.getOrDefault(topicPartition, OffsetAndEpoch.EMPTY);
            PartitionResult partitionResult = (PartitionResult) map4.getOrDefault(topicPartition, new PartitionResult(Collections.emptyList()));
            PartitionResult partitionResult2 = (PartitionResult) map5.getOrDefault(topicPartition, new PartitionResult(Collections.emptyList()));
            if (offsetAndEpoch != OffsetAndEpoch.EMPTY && partitionResult.leaderId() == partitionResult2.leaderId() && partitionResult.leaderEpoch().equals(partitionResult2.leaderEpoch())) {
                if (epochEndOffset.errorCode() != Errors.NONE.code()) {
                    log.error("OffsetForLeaderEpoch request to node {} for partition {} returned error code {}", new Object[]{Integer.valueOf(partitionResult2.leaderId()), topicPartition, Short.valueOf(epochEndOffset.errorCode())});
                    return;
                }
                map.remove(topicPartition);
                if (!(epochEndOffset.leaderEpoch() != offsetAndEpoch.epoch().orElse(-1).intValue() || epochEndOffset.endOffset() <= offsetAndEpoch.offset())) {
                    log.debug("No data loss detected in partition {}: Leader epoch in the response {}, leader epoch in the request {}; end offset in response {}, highest committed offset in request {}", new Object[]{topicPartition, Integer.valueOf(epochEndOffset.leaderEpoch()), offsetAndEpoch.epoch(), Long.valueOf(epochEndOffset.endOffset()), Long.valueOf(offsetAndEpoch.offset())});
                } else {
                    map2.put(topicPartition, Long.valueOf(epochEndOffset.endOffset()));
                    log.error("Data loss detected in partition {}: Leader epoch in the response {}, leader epoch in the request {}; end offset in response {}, highest committed offset in request {}", new Object[]{topicPartition, Integer.valueOf(epochEndOffset.leaderEpoch()), offsetAndEpoch.epoch(), Long.valueOf(epochEndOffset.endOffset()), Long.valueOf(offsetAndEpoch.offset())});
                }
            }
        });
    }

    Map<TopicPartition, OffsetAndEpoch> getTierTopicMaxOffsetAndEpochByFtps(TierTopic tierTopic) {
        HashMap hashMap = new HashMap();
        this.replicaManager.logManager().allLogs().foreach(abstractLog -> {
            if (abstractLog.isDeleted() || abstractLog.isStray() || !abstractLog.topicIdPartition().isDefined()) {
                return null;
            }
            TierPartitionState tierPartitionState = abstractLog.tierPartitionState();
            if (tierPartitionState.status().hasError() || tierPartitionState.lastLocalMaterializedSrcOffsetAndEpoch() == OffsetAndEpoch.EMPTY || !tierPartitionState.lastLocalMaterializedSrcOffsetAndEpoch().epoch().isPresent()) {
                return null;
            }
            TopicPartition tierTopicPartition = tierTopic.toTierTopicPartition((TopicIdPartition) abstractLog.topicIdPartition().get());
            if (tierPartitionState.lastLocalMaterializedSrcOffsetAndEpoch().offset() <= ((OffsetAndEpoch) hashMap.getOrDefault(tierTopicPartition, OffsetAndEpoch.EMPTY)).offset()) {
                return null;
            }
            hashMap.put(tierTopicPartition, tierPartitionState.lastLocalMaterializedSrcOffsetAndEpoch());
            return null;
        });
        return hashMap;
    }

    ConfluentAdmin createAdminClient() {
        if (this.config.interBrokerClientConfigs.get().isEmpty()) {
            log.warn("Failed to create admin client - cannot resolve bootstrap server.");
            return null;
        }
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.config.interBrokerClientConfigs.get().get("bootstrap.servers"));
        return ConfluentAdmin.create(properties);
    }

    Map<TopicPartition, PartitionResult> getTierTopicPartitionLeaderInfo(Set<TopicPartition> set, ConfluentAdmin confluentAdmin) {
        HashMap hashMap = new HashMap();
        confluentAdmin.replicaStatus(set, new ReplicaStatusOptions()).partitionResults().forEach((topicPartition, kafkaFuture) -> {
            PartitionResult partitionResult;
            try {
                partitionResult = (PartitionResult) kafkaFuture.get();
            } catch (Exception e) {
                log.warn("Failed to get PartitionResult for tier topic partition {}", topicPartition, e);
                partitionResult = new PartitionResult(Collections.emptyList());
            }
            hashMap.put(topicPartition, partitionResult);
        });
        return hashMap;
    }

    private Map<Integer, Map<TopicPartition, OffsetForLeaderEpochRequestData.OffsetForLeaderPartition>> buildRequests(Map<TopicPartition, OffsetAndEpoch> map, Map<TopicPartition, PartitionResult> map2) {
        HashMap hashMap = new HashMap();
        map2.forEach((topicPartition, partitionResult) -> {
            OffsetAndEpoch offsetAndEpoch = (OffsetAndEpoch) map.getOrDefault(topicPartition, OffsetAndEpoch.EMPTY);
            if (partitionResult.leaderId() == -1 || !offsetAndEpoch.epoch().isPresent()) {
                return;
            }
            ((Map) hashMap.computeIfAbsent(Integer.valueOf(partitionResult.leaderId()), num -> {
                return new HashMap();
            })).put(topicPartition, new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition().setPartition(topicPartition.partition()).setLeaderEpoch(offsetAndEpoch.epoch().get().intValue()));
        });
        return hashMap;
    }

    private void fenceUserTopicPartitions(Map<TopicPartition, Long> map, Producer<byte[], byte[]> producer) {
        if (map.isEmpty()) {
            log.debug("No tier topic partition has data loss, no need to fence user topic partitions.");
            return;
        }
        if (!this.config.enableTierTopicFencingDuringDataLoss.booleanValue()) {
            log.info("Skip fencing user topic partitions as {} is disabled", KafkaConfig.TierTopicFencingDuringDataLossEnableProp());
            return;
        }
        HashMap hashMap = new HashMap();
        this.replicaManager.logManager().allLogs().foreach(abstractLog -> {
            if (abstractLog.isStray() || abstractLog.isDeleted() || !abstractLog.topicIdPartition().isDefined() || abstractLog.tierPartitionState().status().hasError()) {
                return null;
            }
            long offset = abstractLog.tierPartitionState().lastLocalMaterializedSrcOffsetAndEpoch().offset();
            if (offset < ((Long) map.getOrDefault(this.tierTopic.toTierTopicPartition((TopicIdPartition) abstractLog.topicIdPartition().get()), Long.valueOf(TierObjectMetadata.DEFAULT_STATE_CHANGE_TIMESTAMP))).longValue()) {
                return null;
            }
            hashMap.put(abstractLog.topicIdPartition().get(), Long.valueOf(offset));
            return null;
        });
        log.info("Fencing {} user topic partitions for {} tier topic partitions", Integer.valueOf(hashMap.size()), Integer.valueOf(map.size()));
        for (Map.Entry entry : hashMap.entrySet()) {
            TopicIdPartition topicIdPartition = (TopicIdPartition) entry.getKey();
            try {
                RecoveryUtils.injectTierTopicEventsUntilOffset(producer, topicIdPartition, this.tierTopic.topicName(), this.tierTopic.numPartitions().getAsInt(), ((Long) entry.getValue()).longValue() + 1, false);
            } catch (Exception e) {
                log.error("Failed to send tier partition fence event for partition {}", topicIdPartition, e);
            }
        }
    }

    RemoteLeaderEndPoint getLeaderEndPoint(int i, int i2, Map<Integer, RemoteLeaderEndPoint> map) {
        RemoteLeaderEndPoint remoteLeaderEndPoint = map.get(Integer.valueOf(i2));
        if (remoteLeaderEndPoint != null) {
            return remoteLeaderEndPoint;
        }
        Node node = (Node) this.replicaManager.metadataCache().getAliveBrokerNode(i2, this.replicaManager.config().interBrokerListenerName()).getOrElse(() -> {
            return Node.noNode();
        });
        if (node == Node.noNode()) {
            log.error("No broker node found for id {} when building RemoteLeaderEndPoint.", Integer.valueOf(i2));
            return null;
        }
        RemoteLeaderEndPoint remoteLeaderEndPoint2 = new RemoteLeaderEndPoint(String.format("[RemoteLeaderEndPoint NodeId=%d]", Integer.valueOf(node.id())), new BrokerBlockingSender(new BrokerEndPoint(node.id(), node.host(), node.port()), this.replicaManager.config(), this.metrics, this.time, -1, String.format("tier-topic-data-loss-detection-%d-to-%d-", Integer.valueOf(i), Integer.valueOf(i2)), new LogContext(), "tier-topic-data-loss-validator"), null, new RemoteLeaderRequestBuilder(this.replicaManager.config(), () -> {
            return this.replicaManager.metadataCache().metadataVersion();
        }, this.replicaManager.brokerEpochSupplier()), null, this.replicaManager.config(), this.replicaManager, null, () -> {
            return this.replicaManager.metadataCache().metadataVersion();
        });
        log.debug("Created a remote LeaderEndPoint for broker {}", Integer.valueOf(i2));
        map.put(Integer.valueOf(i2), remoteLeaderEndPoint2);
        return remoteLeaderEndPoint2;
    }
}
