package io.confluent.connect.replicator.offsets;

import io.confluent.connect.replicator.DeadlineManager;
import io.confluent.connect.replicator.ReplicatorSourceTask;
import io.confluent.connect.replicator.ReplicatorSourceTaskConfig;
import io.confluent.connect.replicator.Translator;
import io.confluent.connect.replicator.util.ReplicatorAdminClient;
import io.confluent.connect.replicator.util.Utils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/connect/replicator/offsets/OffsetManager.class */
/**
 * Positions the source consumer for a set of source partitions and pauses the
 * partitions whose destination is not ready yet.
 *
 * <p>Starting offsets are looked up from the offsets stored through the Connect
 * offset storage reader and from the offsets committed by the consumer; the order
 * in which the two are consulted is chosen by
 * {@code ReplicatorSourceTaskConfig.getOffsetStart()}. Partitions for which neither
 * source has an offset are rewound to the beginning.
 */
public class OffsetManager {
    private static final Logger log = LoggerFactory.getLogger(OffsetManager.class);

    /**
     * Amount, in milliseconds, added to the {@link DeadlineManager} clock when the
     * {@code TRY_RESUME} deadline is scheduled after pausing partitions.
     */
    private static final long TRY_RESUME_DELAY_MS = 5000L;

    private ReplicatorSourceTaskConfig config;
    private volatile Consumer<byte[], byte[]> consumer;
    private SourceTaskContext context;
    private final DeadlineManager deadlineManager;

    /**
     * Immutable description of one source partition: where it replicates to (as a
     * human-readable label used in log messages) and whether it must be paused.
     */
    public static class TopicPartitionInfo {
        private final TopicPartition sourcePartition;
        private final String destinationId;
        private final boolean pause;

        /**
         * @param topicPartition the source partition
         * @param str            label of the destination, used only in log messages
         * @param z              {@code true} if the source partition has to be paused
         */
        public TopicPartitionInfo(TopicPartition topicPartition, String str, boolean z) {
            this.sourcePartition = topicPartition;
            this.destinationId = str;
            this.pause = z;
        }

        /** @return the source partition */
        public TopicPartition getSourcePartition() {
            return this.sourcePartition;
        }

        /** @return the label of the destination this partition replicates to */
        public String getDestinationId() {
            return this.destinationId;
        }

        /** @return {@code true} if the source partition has to be paused */
        public boolean isPaused() {
            return this.pause;
        }

        @Override
        public String toString() {
            return "TopicPartitionInfo{sourcePartition=" + this.sourcePartition
                    + ", destinationId=" + this.destinationId
                    + ", pause=" + this.pause + '}';
        }
    }

    /**
     * @param replicatorSourceTaskConfig task configuration
     * @param consumer                   consumer reading the source partitions
     * @param sourceTaskContext          Connect task context, used to read stored offsets
     * @param deadlineManager            receives the {@code TRY_RESUME} deadline whenever
     *                                   partitions are paused
     */
    public OffsetManager(ReplicatorSourceTaskConfig replicatorSourceTaskConfig, Consumer<byte[], byte[]> consumer, SourceTaskContext sourceTaskContext, DeadlineManager deadlineManager) {
        this.config = replicatorSourceTaskConfig;
        this.consumer = consumer;
        this.context = sourceTaskContext;
        this.deadlineManager = deadlineManager;
    }

    /** Replaces the task configuration used by subsequent calls. */
    public void setConfig(ReplicatorSourceTaskConfig replicatorSourceTaskConfig) {
        this.config = replicatorSourceTaskConfig;
    }

    /** Replaces the consumer used by subsequent calls. */
    public void setConsumer(Consumer<byte[], byte[]> consumer) {
        this.consumer = consumer;
    }

    /** Replaces the Connect task context used by subsequent calls. */
    public void setContext(SourceTaskContext sourceTaskContext) {
        this.context = sourceTaskContext;
    }

    /**
     * Chooses the starting position of every given source partition and pauses the
     * ones whose destination is not ready.
     *
     * <p>For a topic that has a translator, readiness and the "seek to beginning on
     * pause" flag come from the translator. Otherwise the destination is considered
     * ready when partition preservation is disabled or the destination partition
     * exists. A partition that is not ready and has the flag set is rewound to the
     * beginning and paused; every other partition is positioned from stored offsets
     * and paused only if its destination is not ready.
     *
     * @param sourcePartitions source partitions to initialize
     * @param translators      translators keyed by source topic name
     * @param adminClient      used to check whether a destination partition exists
     */
    public void initializePartitionOffset(Collection<TopicPartition> sourcePartitions, Map<String, Translator> translators, ReplicatorAdminClient adminClient) {
        Set<TopicPartitionInfo> toSeek = new HashSet<>();
        List<TopicPartitionInfo> toRewindAndPause = new ArrayList<>();
        for (TopicPartition sourcePartition : sourcePartitions) {
            Translator translator = translators.get(sourcePartition.topic());
            boolean destinationReady;
            String destinationId;
            boolean seekToBeginningOnPause;
            if (translator != null) {
                destinationReady = translator.isDestinationReady();
                destinationId = "translator for " + translator.topic();
                seekToBeginningOnPause = translator.seekToBeginningOnPause();
            } else {
                TopicPartition destPartition = ReplicatorSourceTask.toDestPartition(sourcePartition, this.config);
                destinationReady = !this.config.getTopicPreservePartitions() || adminClient.partitionExists(destPartition);
                destinationId = "partition " + destPartition;
                seekToBeginningOnPause = false;
            }
            if (!destinationReady && seekToBeginningOnPause) {
                toRewindAndPause.add(new TopicPartitionInfo(sourcePartition, destinationId, true));
            } else {
                toSeek.add(new TopicPartitionInfo(sourcePartition, destinationId, !destinationReady));
            }
        }
        if (!toRewindAndPause.isEmpty()) {
            for (TopicPartitionInfo info : toRewindAndPause) {
                log.trace("Seeking to the beginning and pausing source partition {} since destination {} is not ready yet", info.getSourcePartition(), info.getDestinationId());
            }
            seekToBeginning(toRewindAndPause.stream()
                    .map(TopicPartitionInfo::getSourcePartition)
                    .collect(Collectors.toList()));
            pauseSourcePartitions(toRewindAndPause);
        }
        seekSourcePartitions(toSeek);
    }

    /**
     * Pauses the source partitions of the given entries and schedules the
     * {@code TRY_RESUME} deadline {@value #TRY_RESUME_DELAY_MS} ms after the
     * deadline manager's current reading. Does nothing besides logging when the
     * list is empty.
     *
     * @param list entries whose source partitions are paused (the {@code pause}
     *             flag of the entries is not consulted here)
     */
    public void pauseSourcePartitions(List<TopicPartitionInfo> list) {
        Set<TopicPartition> sourcePartitions = list.stream()
                .map(TopicPartitionInfo::getSourcePartition)
                .collect(Collectors.toSet());
        log.debug("Pausing the following topic partitions: {}", sourcePartitions);
        if (list.isEmpty()) {
            return;
        }
        this.consumer.pause(sourcePartitions);
        this.deadlineManager.set(DeadlineManager.DeadlineType.TRY_RESUME,
                Long.valueOf(this.deadlineManager.getMilliSeconds() + TRY_RESUME_DELAY_MS));
    }

    /**
     * Seeks every partition that has an offset in Connect's offset storage and
     * removes those partitions from {@code partitions}; partitions without a stored
     * offset are left in the set for the caller to handle.
     *
     * @param partitions mutable working set of partitions that still need a position
     */
    // Raw map keys are used because the generic type returned by
    // Utils.toConnectPartition is not visible from this class; the keys are only
    // handed back to the offset reader and used for look-ups, never inspected.
    @SuppressWarnings({"unchecked", "rawtypes"})
    private void seekUsingConnectOffsets(Set<TopicPartition> partitions) {
        log.debug("Seeking to the following partitions using connect offsets: {}", partitions);
        if (partitions.isEmpty()) {
            return;
        }
        Map<Map, TopicPartition> sourceByConnectPartition = partitions.stream()
                .collect(Collectors.toMap(Utils::toConnectPartition, tp -> tp));
        Map<Map, Map<String, Object>> storedOffsets =
                this.context.offsetStorageReader().offsets((Collection) sourceByConnectPartition.keySet());
        Set<TopicPartition> positioned = new HashSet<>();
        for (Map.Entry<Map, TopicPartition> entry : sourceByConnectPartition.entrySet()) {
            Map<String, Object> storedOffset = storedOffsets.get(entry.getKey());
            if (storedOffset != null) {
                seekToConnectOffset(entry.getValue(), storedOffset);
                positioned.add(entry.getValue());
            }
        }
        partitions.removeAll(positioned);
    }

    /**
     * Handles every partition that has an offset committed by the consumer and
     * removes those partitions from {@code partitions}; partitions without a
     * committed offset are left in the set for the caller to handle.
     *
     * @param partitions mutable working set of partitions that still need a position
     */
    private void seekUsingConsumerOffsets(Set<TopicPartition> partitions) {
        if (partitions.isEmpty()) {
            return;
        }
        Map<TopicPartition, OffsetAndMetadata> committed = this.consumer.committed(partitions);
        Set<TopicPartition> positioned = new HashSet<>();
        for (TopicPartition partition : partitions) {
            OffsetAndMetadata offset = committed.get(partition);
            if (offset != null) {
                seekToConsumerOffset(partition, offset);
                positioned.add(partition);
            }
        }
        partitions.removeAll(positioned);
    }

    /**
     * Positions the given partitions from stored offsets, rewinds the ones without
     * any stored offset to the beginning, and finally pauses the entries flagged as
     * paused.
     *
     * @throws IllegalArgumentException if the configured offset start type is not
     *                                  handled by this method
     */
    private void seekSourcePartitions(Collection<TopicPartitionInfo> collection) {
        log.debug("Seeking to appropriate offsets for the source partitions...");
        if (collection.isEmpty()) {
            return;
        }
        // The helpers below remove partitions from this set as they position them,
        // so it must be a set that is guaranteed to be mutable.
        Set<TopicPartition> unpositioned = collection.stream()
                .map(TopicPartitionInfo::getSourcePartition)
                .collect(Collectors.toCollection(HashSet::new));
        switch (this.config.getOffsetStart()) {
            case CONNECT:
                seekUsingConnectOffsets(unpositioned);
                seekUsingConsumerOffsets(unpositioned);
                break;
            case CONSUMER:
                seekUsingConsumerOffsets(unpositioned);
                seekUsingConnectOffsets(unpositioned);
                break;
            default:
                throw new IllegalArgumentException("Unsupported offset start type");
        }
        if (!unpositioned.isEmpty()) {
            seekToBeginning(unpositioned);
        }
        List<TopicPartitionInfo> toPause = collection.stream()
                .filter(TopicPartitionInfo::isPaused)
                .collect(Collectors.toList());
        // Only the partitions that are really paused are reported as such.
        for (TopicPartitionInfo info : toPause) {
            log.trace("Pausing source partition {} since destination {} is not ready yet", info.getSourcePartition(), info.getDestinationId());
        }
        pauseSourcePartitions(toPause);
    }

    /**
     * Rewinds the consumer to the beginning of each given source partition.
     *
     * @param collection source partitions to rewind
     */
    public void seekToBeginning(Collection<TopicPartition> collection) {
        log.debug("Seeking to the beginning of source partition for the following topic partitions {}", collection);
        this.consumer.seekToBeginning(collection);
    }

    /**
     * Seeks the consumer to the position right after the offset stored by Connect.
     *
     * @param topicPartition source partition to seek
     * @param map            stored Connect offset; the value under {@code Utils.OFFSET}
     *                       is read as a {@code Long}
     */
    private void seekToConnectOffset(TopicPartition topicPartition, Map<String, Object> map) {
        long storedOffset = ((Long) map.get(Utils.OFFSET)).longValue();
        log.debug("Seeking to offset {} committed by Connect for source partition {}", storedOffset, topicPartition);
        // Resume one past the stored offset.
        this.consumer.seek(topicPartition, storedOffset + 1);
    }

    /**
     * Records that the consumer's committed offset is used for the partition.
     *
     * <p>No seek is issued here. NOTE(review): presumably the consumer starts from
     * its committed offset on its own when no explicit position was set - confirm.
     */
    private void seekToConsumerOffset(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata) {
        log.debug("Using consumer committed offset {} for source partition {}", offsetAndMetadata.offset(), topicPartition);
    }
}
