package io.confluent.controlcenter.streams.verify;

import com.google.common.base.MoreObjects;
import com.google.common.collect.BoundType;
import com.google.common.collect.Range;
import com.google.common.collect.RangeSet;
import com.google.common.collect.TreeRangeSet;
import io.confluent.controlcenter.errors.AlreadyShutdownException;
import io.confluent.controlcenter.errors.DuplicateSequenceNumberException;
import io.confluent.controlcenter.errors.SequenceAfterShutdownException;
import io.confluent.controlcenter.errors.ShutdownSequenceNotHighestException;
import io.confluent.controlcenter.record.Controlcenter;
import io.confluent.controlcenter.streams.aggregation.MetricsAggregation;
import io.confluent.monitoring.common.MonitoringMessageUtil;
import io.confluent.monitoring.common.TimeBucket;
import io.confluent.monitoring.record.Monitoring;
import io.netty.handler.codec.rtsp.RtspHeaders;
import java.util.ArrayList;
import java.util.Objects;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/controlcenter/streams/verify/MonitoringVerifier.class */
/**
 * Tracks which monitoring sequence numbers have been received for one monitoring session
 * (identified by the fields copied into the base monitoring message) and reports the time
 * windows of sequences that are considered missing.
 *
 * <p>Received sequences are stored as half-open ranges {@code [sequence, next)} in a
 * {@link RangeSet}. Gaps between those ranges that have timed out are filled in and reported by
 * {@link #getMissingMonitoringData(long)}. The verifier is {@linkplain #isDone() done} once a
 * shutdown has been recorded and the range set has been cleared.
 *
 * <p>The complete state can be exported with {@link #createVerifierInfo()} and restored with
 * {@link #MonitoringVerifier(Controlcenter.VerifierInfo)}.
 *
 * <p>NOTE(review): the class mutates its fields without any synchronization; callers presumably
 * confine an instance to a single thread - confirm.
 */
public class MonitoringVerifier {
    /** Window history size used by the two-argument constructor. */
    private static final int DEFAULT_WINDOW_HISTORY_SIZE = 10;
    /** Normalized (in sample periods) window distance used when none could be derived. */
    private static final long DEFAULT_NORMALIZED_WINDOW_DISTANCE = 10;
    /** Lower clamp for the normalized window distance. */
    private static final long MIN_NORMALIZED_WINDOW_DISTANCE = 1;
    /** Upper clamp for the normalized window distance. */
    private static final long MAX_NORMALIZED_WINDOW_DISTANCE = 600;
    /** At most this many (highest) sequences of a single gap are reported individually. */
    private static final long MAX_CONSECUTIVE_REPORTED_MISSING_SEQUENCES = 20;
    /** Number of sample periods after the highest range before the following sequence times out. */
    private static final long TAIL_TIMEOUT_SAMPLE_PERIODS = 5;
    private static final Logger log = LoggerFactory.getLogger(MonitoringVerifier.class);

    private final Monitoring.MonitoringMessage.Builder baseMonitoringMessageBuilder;
    private final long timeout;
    private final long windowHistorySize;
    /** Received (and already timed-out) sequences as half-open ranges. */
    private final RangeSet<SequenceWithMetadata> ranges = TreeRangeSet.create();
    private boolean shutdownReceived;
    private boolean sessionEnded;
    private boolean maybeSessionEnded;
    /** Last non-negative timestamp passed to {@link #addSequence}; {@code -1} until one is seen. */
    private long lastSequenceTime = -1L;
    /** Most recent sequence for which window information is known; {@code null} if none. */
    private SequenceWithMetadata lastSequenceWithWindow;
    /** Largest {@code maxWindow - minWindow} seen so far; {@code -1} if none. */
    private long expectedWindowDistance = -1L;

    /**
     * Creates a verifier with the default window history size.
     *
     * @param monitoringMessage message whose identifying fields seed the base monitoring message
     * @param j timeout applied when deciding whether a missing sequence has timed out
     */
    public MonitoringVerifier(Monitoring.MonitoringMessage monitoringMessage, long j) {
        this(monitoringMessage, j, DEFAULT_WINDOW_HISTORY_SIZE);
    }

    /**
     * Creates a verifier for the session described by {@code monitoringMessage}.
     *
     * @param monitoringMessage message whose identifying fields seed the base monitoring message;
     *     its sample period is raised to {@code TimeBucket.SIZE} when smaller
     * @param j timeout applied when deciding whether a missing sequence has timed out
     * @param i window history size
     */
    public MonitoringVerifier(Monitoring.MonitoringMessage monitoringMessage, long j, int i) {
        this.baseMonitoringMessageBuilder =
                Monitoring.MonitoringMessage.newBuilder(MonitoringMessageUtil.baseMonitoringMessage())
                        .setClusterId(monitoringMessage.getClusterId())
                        .setWindow(monitoringMessage.getWindow())
                        .setSession(monitoringMessage.getSession())
                        .setGroup(monitoringMessage.getGroup())
                        .setClientId(monitoringMessage.getClientId())
                        .setClientType(monitoringMessage.getClientType())
                        .setTopic(monitoringMessage.getTopic())
                        .setPartition(monitoringMessage.getPartition())
                        .setMinWindow(monitoringMessage.getMinWindow())
                        .setMaxWindow(monitoringMessage.getMaxWindow())
                        // Never track a sample period shorter than one time bucket.
                        .setSamplePeriod(Math.max(monitoringMessage.getSamplePeriod(), TimeBucket.SIZE));
        this.timeout = j;
        this.windowHistorySize = i;
    }

    /**
     * Restores a verifier from state previously produced by {@link #createVerifierInfo()}.
     *
     * @param verifierInfo persisted verifier state
     */
    public MonitoringVerifier(Controlcenter.VerifierInfo verifierInfo) {
        this.baseMonitoringMessageBuilder = Monitoring.MonitoringMessage.newBuilder(verifierInfo.getBaseMonitoringMessage());
        this.timeout = verifierInfo.getTimeout();
        this.shutdownReceived = verifierInfo.getShutdownReceived();
        this.sessionEnded = verifierInfo.getSessionEnded();
        this.maybeSessionEnded = verifierInfo.getMaybeSessionEnded();
        this.lastSequenceTime = verifierInfo.getLastSequenceTime();
        this.windowHistorySize = verifierInfo.getWindowHistorySize();
        this.expectedWindowDistance = verifierInfo.getExpectedWindowDistance();
        // createVerifierInfo() stores sequence -1 as the marker for "no sequence with window".
        if (verifierInfo.getLastSequenceWithWindow() != null && verifierInfo.getLastSequenceWithWindow().getSequence() > -1) {
            this.lastSequenceWithWindow = new SequenceWithMetadata(verifierInfo.getLastSequenceWithWindow());
        }
        for (Controlcenter.SequenceInfoPair pair : verifierInfo.getRangesList()) {
            this.ranges.add(Range.closedOpen(new SequenceWithMetadata(pair.getLower()), new SequenceWithMetadata(pair.getUpper())));
        }
    }

    /**
     * Exports the complete state of this verifier.
     *
     * @return a {@code VerifierInfo} from which an equal verifier can be restored
     */
    public Controlcenter.VerifierInfo createVerifierInfo() {
        ArrayList<Controlcenter.SequenceInfoPair> rangePairs = new ArrayList<>();
        for (Range<SequenceWithMetadata> range : this.ranges.asRanges()) {
            rangePairs.add(Controlcenter.SequenceInfoPair.newBuilder()
                    .setLower(range.lowerEndpoint().toSequenceInfo())
                    .setUpper(range.upperEndpoint().toSequenceInfo())
                    .build());
        }
        Controlcenter.VerifierInfo.Builder info = Controlcenter.VerifierInfo.newBuilder()
                .setBaseMonitoringMessage(this.baseMonitoringMessageBuilder)
                .setTimeout(this.timeout)
                .setShutdownReceived(this.shutdownReceived)
                .setSessionEnded(this.sessionEnded)
                .setMaybeSessionEnded(this.maybeSessionEnded)
                .setLastSequenceTime(this.lastSequenceTime)
                .setWindowHistorySize(this.windowHistorySize)
                .setExpectedWindowDistance(this.expectedWindowDistance)
                .addAllRanges(rangePairs);
        if (this.lastSequenceWithWindow != null) {
            info.setLastSequenceWithWindow(this.lastSequenceWithWindow.toSequenceInfo());
        } else {
            // Sequence -1 marks "absent"; the restoring constructor checks for it.
            info.setLastSequenceWithWindow(Controlcenter.SequenceInfo.newBuilder().setSequence(-1L).build());
        }
        return info.build();
    }

    /** @return the client id of the base monitoring message */
    public String getClientId() {
        return this.baseMonitoringMessageBuilder.getClientId();
    }

    /** @return the topic of the base monitoring message */
    public String getTopic() {
        return this.baseMonitoringMessageBuilder.getTopic();
    }

    /** @return the partition of the base monitoring message */
    public int getPartition() {
        return this.baseMonitoringMessageBuilder.getPartition();
    }

    /** @return the sample period of the base monitoring message */
    public long getSamplePeriod() {
        return this.baseMonitoringMessageBuilder.getSamplePeriod();
    }

    /** @return the session of the base monitoring message */
    public String getSession() {
        return this.baseMonitoringMessageBuilder.getSession();
    }

    /** @return a freshly built message from the base monitoring message builder */
    public Monitoring.MonitoringMessage getCopyOfBaseMonitoringMessage() {
        return this.baseMonitoringMessageBuilder.build();
    }

    /** @return the last non-negative timestamp recorded by {@link #addSequence}, or {@code -1} */
    public long lastActivityTimeMs() {
        return this.lastSequenceTime;
    }

    /**
     * Records a received monitoring message.
     *
     * @param monitoringMessage the received message
     * @param j timestamp associated with the message; recorded as last activity time when
     *     non-negative
     * @throws IllegalStateException if the verifier is already done
     * @throws DuplicateSequenceNumberException if the sequence is already covered by a range
     * @throws AlreadyShutdownException if a second shutdown message arrives
     * @throws ShutdownSequenceNotHighestException if a shutdown message does not carry the
     *     highest sequence seen
     * @throws SequenceAfterShutdownException if, after shutdown, a sequence above every tracked
     *     sequence arrives
     */
    public void addSequence(Monitoring.MonitoringMessage monitoringMessage, long j) {
        if (isDone()) {
            throw new IllegalStateException("Received monitoring message after all expected sequences were received. Sequence=" + monitoringMessage.getSequence());
        }
        // Any new message proves the session is still alive.
        this.maybeSessionEnded = false;
        SequenceWithMetadata received = new SequenceWithMetadata(monitoringMessage, j);
        ensureSequenceNotDuplicate(received);
        SequenceWithMetadata next = received.nextWithTimestamp(Long.valueOf(received.timestamp()));
        if (monitoringMessage.getShutdown()) {
            if (this.shutdownReceived) {
                throw new AlreadyShutdownException("One more non-duplicate sequence with shutdown. " + received);
            }
            if (!this.ranges.isEmpty()) {
                Range<SequenceWithMetadata> span = this.ranges.span();
                if (span.contains(received) || received.sequence() < span.lowerEndpoint().sequence()) {
                    throw new ShutdownSequenceNotHighestException("Sequence with shutdown flag is not highest: " + received);
                }
            }
            this.shutdownReceived = true;
        } else if (this.shutdownReceived && this.ranges.span().intersection(Range.atLeast(received)).isEmpty()) {
            // ranges cannot be empty here: shutdownReceived with empty ranges means isDone().
            throw new SequenceAfterShutdownException("Sequence number after shutdown is higher than any other sequence number " + received);
        }
        this.ranges.add(Range.closedOpen(received, next));
        if (j >= 0) {
            this.lastSequenceTime = j;
        }
        long window = monitoringMessage.getType() == Monitoring.MessageType.NORMAL ? monitoringMessage.getWindow() : -1L;
        updateWindowDistanceAndLastSequence(received, window);
        verifySequencesBeforeShutdownAndMaybeSetDone();
    }

    /**
     * Finds timed-out gaps in the received sequences, marks them as handled and returns the time
     * windows they are estimated to cover.
     *
     * <p>Scanning stops at the first gap for which no window range has been collected so far.
     *
     * @param j current time on the same clock as the timestamps passed to {@link #addSequence}
     * @return the window ranges with missing monitoring data, or {@code null} when there is
     *     nothing to report (including when no sequence is tracked or {@code j} is negative)
     * @throws IllegalStateException if the tracked ranges are not half-open or not in increasing
     *     sequence order
     */
    public RangeSet<Long> getMissingMonitoringData(long j) {
        if (this.ranges.isEmpty() || j < 0) {
            return null;
        }
        if (this.lastSequenceTime < 0) {
            this.lastSequenceTime = j;
        }
        SequenceWithMetadata highestUpper = this.ranges.asDescendingSetOfRanges().iterator().next().upperEndpoint();
        long localDelta = this.lastSequenceTime <= 0 ? 0L : j - this.lastSequenceTime;
        long thresholdTimestamp = (highestUpper.timestamp() - this.timeout) + localDelta;
        log.trace("thresholdTimestamp={}, local delta = {}", thresholdTimestamp, localDelta);

        // Timed-out gaps are collected separately and merged after the loop so that this.ranges
        // is not modified while it is being iterated.
        RangeSet<SequenceWithMetadata> timedOutGaps = null;
        RangeSet<Long> missingWindows = null;
        Range<SequenceWithMetadata> previous = null;
        Long gapStart = null;
        for (Range<SequenceWithMetadata> current : this.ranges.asRanges()) {
            SequenceWithMetadata lower = current.lowerEndpoint();
            if (previous != null) {
                if (previous.upperBoundType() == BoundType.CLOSED) {
                    throw new IllegalStateException("Upper bound of any range must always be open");
                }
                gapStart = Long.valueOf(previous.upperEndpoint().sequence());
            } else if (lower.sequence() > 0) {
                // Nothing received before the first range: the gap starts at sequence 0.
                gapStart = 0L;
            }
            long gapEnd = lower.sequence() - 1;
            if (gapStart != null) {
                if (gapStart.longValue() > gapEnd) {
                    throw new IllegalStateException("Ranges must be in increasing sequence order");
                }
                long timedOutTimestamp = getTimedOutSequenceTimestamp(j, previous == null ? null : previous.upperEndpoint(), lower.timestamp());
                if (timedOutTimestamp >= 0) {
                    log.warn("Missing sequence numbers=[{}, {}] from client={}, topic={}, partition={}, estimated timestamp={}, next timestamp={}",
                            gapStart, gapEnd, getClientId(), getTopic(), getPartition(), timedOutTimestamp, lower.timestamp());
                    missingWindows = collectMissingWindows(gapStart.longValue(), gapEnd, lower, missingWindows);
                    if (timedOutGaps == null) {
                        timedOutGaps = TreeRangeSet.create();
                    }
                    timedOutGaps.add(Range.closedOpen(new SequenceWithMetadata(gapStart.longValue(), Long.valueOf(timedOutTimestamp)), lower));
                }
                if (missingWindows == null) {
                    break;
                }
            }
            previous = current;
        }
        if (timedOutGaps != null && !timedOutGaps.isEmpty()) {
            this.ranges.addAll(timedOutGaps);
        }
        RangeSet<Long> result = maybeTimeoutSequenceAfterHighest(highestUpper, thresholdTimestamp, missingWindows);
        if (result != null) {
            verifySequencesBeforeShutdownAndMaybeSetDone();
        }
        return result;
    }

    /** Marks the session as ended. */
    public void notifySessionEnded() {
        this.sessionEnded = true;
    }

    /** @return {@code true} once a shutdown has been recorded and no ranges remain tracked */
    public boolean isDone() {
        return this.shutdownReceived && this.ranges.isEmpty();
    }

    /** @return {@code true} if the verifier is done, or the session ended or possibly ended */
    public boolean isSessionNotActive() {
        return isDone() || this.sessionEnded || this.maybeSessionEnded;
    }

    /** @return {@code true} if {@link #notifySessionEnded()} was called (or restored as such) */
    protected boolean isSessionEnded() {
        return this.sessionEnded;
    }

    /** Throws if {@code sequenceWithMetadata} is already covered by a tracked range. */
    private void ensureSequenceNotDuplicate(SequenceWithMetadata sequenceWithMetadata) {
        if (this.ranges.contains(sequenceWithMetadata)) {
            throw new DuplicateSequenceNumberException("Duplicate sequence number: " + sequenceWithMetadata.sequence());
        }
    }

    /**
     * Adds window ranges for the missing sequences {@code gapStart..gapEnd}, reporting at most
     * the {@link #MAX_CONSECUTIVE_REPORTED_MISSING_SEQUENCES} highest of them.
     */
    private RangeSet<Long> collectMissingWindows(long gapStart, long gapEnd, SequenceWithMetadata nextReceived, RangeSet<Long> windows) {
        long firstReported = gapStart;
        if (gapEnd - gapStart >= MAX_CONSECUTIVE_REPORTED_MISSING_SEQUENCES) {
            log.warn("Missing {} consecutive sequences; will report {} highest sequences for client={}, topic={}, partition={}",
                    (gapEnd - gapStart) + 1, MAX_CONSECUTIVE_REPORTED_MISSING_SEQUENCES, getClientId(), getTopic(), getPartition());
            firstReported = (gapEnd - MAX_CONSECUTIVE_REPORTED_MISSING_SEQUENCES) + 1;
        }
        RangeSet<Long> result = windows;
        for (long sequence = firstReported; sequence <= gapEnd; sequence++) {
            result = addMissingWindowRange(sequence, nextReceived, result, sequence == firstReported);
        }
        return result;
    }

    /**
     * Times out the sequence following the highest tracked range once enough sample periods have
     * passed without a shutdown.
     *
     * @param sequenceWithMetadata upper endpoint of the highest tracked range
     * @param j threshold timestamp the tail timeout is compared against
     * @param rangeSet window ranges collected so far; may be {@code null}
     * @return {@code rangeSet}, extended with an estimated window when the tail timed out
     */
    private RangeSet<Long> maybeTimeoutSequenceAfterHighest(SequenceWithMetadata sequenceWithMetadata, long j, RangeSet<Long> rangeSet) {
        if (this.shutdownReceived) {
            return rangeSet;
        }
        long tailTimeout = sequenceWithMetadata.timestamp() + (TAIL_TIMEOUT_SAMPLE_PERIODS * getSamplePeriod());
        if (tailTimeout > j) {
            return rangeSet;
        }
        long sequence = sequenceWithMetadata.sequence();
        if (this.sessionEnded) {
            log.info("Session ended and timed out at least one sequence number ({}) after the highest received sequence number. Done", sequence);
            // Clearing the ranges together with the shutdown flag makes isDone() true.
            this.ranges.clear();
            this.shutdownReceived = true;
            return addMissingWindowRange(sequence, rangeSet, true);
        }
        if (!this.maybeSessionEnded) {
            log.info("Timed out sequence={} after the highest received sequence for topic={}, partition={}, clientID={}. Session maybe ended.", sequence, getTopic(), getPartition(), getClientId());
            this.ranges.add(Range.closedOpen(new SequenceWithMetadata(0L), sequenceWithMetadata.nextWithTimestamp(Long.valueOf(tailTimeout))));
            this.maybeSessionEnded = true;
            return addMissingWindowRange(sequence, rangeSet, true);
        }
        return rangeSet;
    }

    /**
     * After shutdown, clears the tracked ranges once they form one contiguous range starting at
     * sequence 0, which makes {@link #isDone()} true.
     */
    private void verifySequencesBeforeShutdownAndMaybeSetDone() {
        if (!this.shutdownReceived) {
            return;
        }
        Set<Range<SequenceWithMetadata>> tracked = this.ranges.asRanges();
        if (tracked.size() == 1 && tracked.iterator().next().lowerEndpoint().sequence() == 0) {
            log.debug("MonitoringVerifier got all sequence numbers and a shutdown flag!");
            this.ranges.clear();
        }
    }

    /**
     * Estimates the timestamp of a gap and checks it against the timeout.
     *
     * @param j current time
     * @param sequenceWithMetadata upper endpoint of the range before the gap, or {@code null}
     * @param j2 timestamp of the first sequence after the gap
     * @return the later of the two neighbouring timestamps when it is at least {@code timeout}
     *     before {@code j}; {@code -1} otherwise
     */
    private long getTimedOutSequenceTimestamp(long j, SequenceWithMetadata sequenceWithMetadata, long j2) {
        long estimate = j2;
        if (sequenceWithMetadata != null && sequenceWithMetadata.timestamp() > 0) {
            estimate = Math.max(estimate, sequenceWithMetadata.timestamp());
        }
        return estimate <= j - this.timeout ? estimate : -1L;
    }

    /**
     * Adds the window range for missing sequence {@code j}, derived from the next received
     * sequence.
     *
     * @param j the missing sequence number
     * @param sequenceWithMetadata the first received sequence after the gap
     * @param rangeSet window ranges collected so far; may be {@code null}
     * @param z whether a missing-window-information warning may be logged for this sequence
     * @return the (possibly newly created) range set, or {@code rangeSet} unchanged when the next
     *     sequence has no window info and is within the window history
     * @throws IllegalArgumentException if {@code j} is not below the next received sequence
     */
    private RangeSet<Long> addMissingWindowRange(long j, SequenceWithMetadata sequenceWithMetadata, RangeSet<Long> rangeSet, boolean z) {
        long distance = sequenceWithMetadata.sequence() - j;
        if (distance < 1) {
            throw new IllegalArgumentException("Invalid missing sequence");
        }
        if (sequenceWithMetadata.minWindow() < 0) {
            // No window info on the next sequence: nothing to report within the history,
            // otherwise fall back to an estimate from the last sequence with window info.
            return distance <= this.windowHistorySize ? rangeSet : addMissingWindowRange(j, rangeSet, z);
        }
        RangeSet<Long> result = rangeSet != null ? rangeSet : TreeRangeSet.<Long>create();
        long minWindow = sequenceWithMetadata.minWindow();
        long maxWindow = sequenceWithMetadata.maxWindow();
        if (distance > this.windowHistorySize) {
            log.info("Missed too many consecutive sequence numbers for client={}, topic={}, partition={}. Will estimate time window range with missed monitoring data.", getClientId(), getTopic(), getPartition());
            long padding = getExpectedDistanceMs(distance - this.windowHistorySize);
            minWindow = Math.max(0L, minWindow - padding);
            maxWindow += padding;
        }
        result.add(Range.closed(minWindow, maxWindow));
        log.info("Missing sequence={} window range=[{}...{}] for clientId={}, topic={}, partition={}", j, minWindow, maxWindow, getClientId(), getTopic(), getPartition());
        return result;
    }

    /**
     * Adds an estimated window range for missing sequence {@code j}, based on the last sequence
     * for which window information is known. Adds {@code [0, 0]} when no such sequence exists.
     *
     * @param j the missing sequence number
     * @param rangeSet window ranges collected so far; may be {@code null}
     * @param z whether to log a warning when no window information is available
     * @return the (possibly newly created) range set
     * @throws IllegalArgumentException if {@code j} equals the last sequence with window info
     */
    private RangeSet<Long> addMissingWindowRange(long j, RangeSet<Long> rangeSet, boolean z) {
        RangeSet<Long> result = rangeSet != null ? rangeSet : TreeRangeSet.<Long>create();
        SequenceWithMetadata last = this.lastSequenceWithWindow;
        if (last == null || last.minWindow() < 0 || last.maxWindow() < 0 || last.sequence() < 0) {
            if (z) {
                log.warn("No information about windows received for this session for client={}, topic={}, partition={}", getClientId(), getTopic(), getPartition());
            }
            result.add(Range.closed(0L, 0L));
            return result;
        }
        long distance = Math.abs(j - last.sequence());
        if (distance < 1) {
            throw new IllegalArgumentException("Missing sequence expected to be different from last seen");
        }
        long padding = getExpectedDistanceMs(distance + 1);
        long minWindow = Math.max(0L, last.minWindow() - padding);
        long maxWindow = last.maxWindow() + padding;
        result.add(Range.closed(minWindow, maxWindow));
        log.warn("No window info in the next non-missing sequence. Estimated window range for missing sequence={} [{}...{}], lastSequenceWithWindow={} for clientId={}, topic={}, partition={}",
                j, minWindow, maxWindow, last, getClientId(), getTopic(), getPartition());
        return result;
    }

    /**
     * Remembers the latest sequence with window information and tracks the widest
     * {@code maxWindow - minWindow} seen.
     *
     * @param sequenceWithMetadata the sequence just received
     * @param j the message window, or a negative value when not applicable
     */
    private void updateWindowDistanceAndLastSequence(SequenceWithMetadata sequenceWithMetadata, long j) {
        boolean hasWindowRange = sequenceWithMetadata.minWindow() >= 0 && sequenceWithMetadata.maxWindow() >= 0;
        if (!hasWindowRange) {
            if (j >= 0) {
                this.lastSequenceWithWindow = sequenceWithMetadata.withMinMaxWindow(Long.valueOf(j));
            }
            return;
        }
        this.lastSequenceWithWindow = sequenceWithMetadata;
        long distance = sequenceWithMetadata.maxWindow() - sequenceWithMetadata.minWindow();
        if (distance < 0) {
            log.error("maxWindow < minWindow. Not able to record expected window range.");
        } else {
            this.expectedWindowDistance = Math.max(this.expectedWindowDistance, distance);
        }
    }

    /**
     * Estimates the time distance spanned by {@code j} consecutive sequences.
     *
     * @param j number of sequences
     * @return {@code j} times the per-sequence distance, where that distance is the normalized
     *     window distance (clamped to the min/max constants) multiplied by the sample period
     */
    private long getExpectedDistanceMs(long j) {
        long samplePeriod = getSamplePeriod();
        long normalized = (this.expectedWindowDistance < 0 || this.windowHistorySize <= 1)
                ? DEFAULT_NORMALIZED_WINDOW_DISTANCE
                : (this.expectedWindowDistance / (this.windowHistorySize - 1)) / samplePeriod;
        long clamped = Math.min(Math.max(normalized, MIN_NORMALIZED_WINDOW_DISTANCE), MAX_NORMALIZED_WINDOW_DISTANCE);
        long perSequenceDistance = clamped * samplePeriod;
        log.trace("Calculated normalized distance ={}, history size={}, expectedWindowDistance={}, default normalized distance = {}",
                perSequenceDistance, this.windowHistorySize, this.expectedWindowDistance, DEFAULT_NORMALIZED_WINDOW_DISTANCE);
        return perSequenceDistance * j;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        MonitoringVerifier other = (MonitoringVerifier) obj;
        // Cheap primitive comparisons first, message and range comparisons last.
        return this.lastSequenceTime == other.lastSequenceTime
                && this.shutdownReceived == other.shutdownReceived
                && this.sessionEnded == other.sessionEnded
                && this.maybeSessionEnded == other.maybeSessionEnded
                && this.timeout == other.timeout
                && this.windowHistorySize == other.windowHistorySize
                && this.expectedWindowDistance == other.expectedWindowDistance
                && Objects.equals(this.lastSequenceWithWindow, other.lastSequenceWithWindow)
                && Objects.equals(this.ranges, other.ranges)
                && Objects.equals(this.baseMonitoringMessageBuilder.buildPartial(), other.baseMonitoringMessageBuilder.buildPartial());
    }

    @Override
    public int hashCode() {
        return Objects.hash(this.baseMonitoringMessageBuilder.buildPartial(), this.lastSequenceTime, this.ranges, this.shutdownReceived, this.sessionEnded, this.maybeSessionEnded, this.timeout, this.windowHistorySize, this.expectedWindowDistance, this.lastSequenceWithWindow);
    }

    @Override
    public String toString() {
        return MoreObjects.toStringHelper(this)
                .add("clientId", getClientId())
                .add("topic", getTopic())
                .add(MetricsAggregation.PARTITION_DIMENSION, getPartition())
                .add("session", getSession())
                .add(RtspHeaders.Values.TIMEOUT, this.timeout)
                .add("windowHistorySize", this.windowHistorySize)
                .add("shutdownReceived", this.shutdownReceived)
                .add("maybeSessionEnded", this.maybeSessionEnded)
                .add("sessionEnded", this.sessionEnded)
                .add("lastSequenceTime", this.lastSequenceTime)
                .add("expectedWindowDistance", this.expectedWindowDistance)
                .add("lastSequenceWithWindow", this.lastSequenceWithWindow)
                .add("ranges", this.ranges)
                .toString();
    }
}
