package io.confluent.connect.replicator.offsets;

import io.confluent.connect.replicator.util.Utils;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/connect/replicator/offsets/ConsumerOffsetsTopicCommitter.class */
public class ConsumerOffsetsTopicCommitter {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ConsumerOffsetsTopicCommitter.class);
    private final Consumer<byte[], byte[]> consumer;
    private final ConcurrentMap<TopicPartition, Long> offsets;
    private final AtomicBoolean doCommit;
    private ConcurrentMap<TopicPartition, Long> numUncommittedRecords;
    private final Time time;
    private final long commitOffsetPeriodMs;
    private long lastCommitOffsetMs;

    public ConsumerOffsetsTopicCommitter(Consumer<byte[], byte[]> consumer) {
        this(consumer, false, Time.SYSTEM, -1);
    }

    public ConsumerOffsetsTopicCommitter(Consumer<byte[], byte[]> consumer, boolean z, Time time, int i) {
        this.offsets = new ConcurrentHashMap();
        this.doCommit = new AtomicBoolean(false);
        this.lastCommitOffsetMs = 0L;
        this.consumer = consumer;
        if (z) {
            this.numUncommittedRecords = new ConcurrentHashMap();
        }
        this.time = time;
        this.commitOffsetPeriodMs = i;
    }

    public void commitRecords(List<ConsumerRecord<byte[], byte[]>> list) {
        log.debug("Committing consumer offsets...");
        for (ConsumerRecord<byte[], byte[]> consumerRecord : list) {
            commitRecord(consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset());
        }
    }

    public void commitRecord(SourceRecord sourceRecord, RecordMetadata recordMetadata) {
        Map<String, ?> sourcePartition = sourceRecord.sourcePartition();
        if (sourcePartition == null) {
            log.warn("Could not get source partition map from source record with destination topic {}", sourceRecord.topic());
            return;
        }
        String str = (String) sourcePartition.get("topic");
        if (str == null) {
            log.warn("Could not get source topic from source record with destination topic {}", sourceRecord.topic());
            return;
        }
        Integer num = (Integer) sourcePartition.get(Utils.PARTITION);
        if (num == null) {
            log.warn("Could not get source partition from source record with destination topic {}", sourceRecord.topic());
            return;
        }
        Map<String, ?> sourceOffset = sourceRecord.sourceOffset();
        if (sourceOffset == null) {
            log.warn("Could not get source offset map from source record with destination topic {}", sourceRecord.topic());
            return;
        }
        Long l = (Long) sourceOffset.get(Utils.OFFSET);
        if (l == null) {
            log.warn("Could not get source offset from source record with destination topic {}", sourceRecord.topic());
        } else if (recordMetadata != null || this.numUncommittedRecords == null) {
            commitRecord(str, num.intValue(), l.longValue());
        } else {
            maybeCommitFilteredRecordByConnect(str, num.intValue(), l.longValue());
        }
    }

    public void commitRecord(String str, int i, long j) {
        Long computeIfPresent;
        TopicPartition topicPartition = new TopicPartition(str, i);
        if (this.numUncommittedRecords != null && (computeIfPresent = this.numUncommittedRecords.computeIfPresent(topicPartition, (topicPartition2, l) -> {
            return Long.valueOf(l.longValue() - 1);
        })) != null && computeIfPresent.longValue() < 0) {
            log.error("Number of uncommitted records is {} for topic partition {} and offset {}", computeIfPresent, topicPartition, Long.valueOf(j));
        }
        this.offsets.put(topicPartition, Long.valueOf(j));
    }

    private void maybeCommitFilteredRecordByConnect(String str, int i, long j) {
        TopicPartition topicPartition = new TopicPartition(str, i);
        Long computeIfPresent = this.numUncommittedRecords.computeIfPresent(topicPartition, (topicPartition2, l) -> {
            return Long.valueOf(l.longValue() - 1);
        });
        if (computeIfPresent == null) {
            log.error("Topic partition {} not found for offset {}", topicPartition, Long.valueOf(j));
        } else if (computeIfPresent.longValue() < 0) {
            log.error("Number of uncommitted records is {} for topic partition {} and offset {}", computeIfPresent, topicPartition, Long.valueOf(j));
        } else if (computeIfPresent.longValue() == 0) {
            this.offsets.put(topicPartition, Long.valueOf(j));
        }
    }

    public void maybeCommitFilteredRecordByReplicator(String str, int i, long j) {
        TopicPartition topicPartition = new TopicPartition(str, i);
        Long l = this.numUncommittedRecords != null ? this.numUncommittedRecords.get(topicPartition) : null;
        if (l == null || l.longValue() == 0) {
            this.offsets.put(topicPartition, Long.valueOf(j));
        } else if (l.longValue() < 0) {
            log.error("Number of uncommitted records is {} for topic partition {} and offset {}", l, topicPartition, Long.valueOf(j));
        }
    }

    public void updateNumUncommittedRecords(TopicPartition topicPartition, long j) {
        if (this.numUncommittedRecords == null || j == 0 || this.numUncommittedRecords.computeIfPresent(topicPartition, (topicPartition2, l) -> {
            return Long.valueOf(l.longValue() + j);
        }) != null) {
            return;
        }
        this.numUncommittedRecords.put(topicPartition, Long.valueOf(j));
    }

    public void commit() {
        this.doCommit.set(true);
    }

    public void checkCommit() {
        if (this.numUncommittedRecords == null) {
            if (this.doCommit.getAndSet(false)) {
                commitConsumerOffset();
                return;
            }
            return;
        }
        long milliseconds = this.time.milliseconds();
        if (milliseconds < this.lastCommitOffsetMs) {
            this.lastCommitOffsetMs = milliseconds;
        } else {
            if (milliseconds - this.lastCommitOffsetMs < this.commitOffsetPeriodMs) {
                return;
            }
            this.lastCommitOffsetMs = milliseconds;
            commitConsumerOffset();
        }
    }

    private void commitConsumerOffset() {
        log.debug("Writing {} internal offset records", Integer.valueOf(this.offsets.size()));
        HashMap hashMap = new HashMap();
        for (TopicPartition topicPartition : this.offsets.keySet()) {
            Long l = this.offsets.get(topicPartition);
            if (l != null) {
                hashMap.put(topicPartition, new OffsetAndMetadata(l.longValue() + 1, ""));
            }
        }
        try {
            this.consumer.commitSync(hashMap);
            ConcurrentMap<TopicPartition, Long> concurrentMap = this.offsets;
            concurrentMap.getClass();
            hashMap.forEach((v1, v2) -> {
                r1.remove(v1, v2);
            });
        } catch (KafkaException e) {
            log.warn("Commit of offsets threw an unexpected exception: {}", hashMap, e);
            if (e.getCause() instanceof WakeupException) {
                log.debug("Commit of offsets threw WakeupException. This is expected on the consumer thread and may happen from time to time: {}", hashMap);
            } else {
                log.error("Commit of offsets threw an unexpected exception: {}", hashMap, e);
            }
            this.doCommit.set(true);
        }
    }
}
