package io.confluent.controlcenter.streams.alert;

import io.confluent.controlcenter.alert.record.Alert;
import io.confluent.controlcenter.streams.internals.KeyValueStoreFacade;
import io.confluent.serializers.OrderedKeyUberSerde;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/controlcenter/streams/alert/AlertHistoryProcessorSupplier.class */
public class AlertHistoryProcessorSupplier implements ProcessorSupplier<Alert.AlertInfo, Alert.AlertInfo> {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) AlertHistoryProcessorSupplier.class);
    private static final long DEFAULT_RETAIN_MS = TimeUnit.DAYS.toMillis(7);
    private static final long DEFAULT_PUNCTUATE_INTERVAL_MS = TimeUnit.MINUTES.toMillis(10);
    private static final long MAX_PUNCTUATE_ITEMS = 1000;
    private final String storeName;
    private final OrderedKeyUberSerde<Alert.AlertInfo> alertKeySerde;
    private final long storeRetainMs;
    private final long punctuateIntervalMs;
    private final long maxPunctuateItems;
    private final OrderedKeyUberSerde<Alert.AlertInfo> alertRangeSerde;

    public AlertHistoryProcessorSupplier(String str, long j, long j2, long j3, OrderedKeyUberSerde<Alert.AlertInfo> orderedKeyUberSerde, OrderedKeyUberSerde<Alert.AlertInfo> orderedKeyUberSerde2) {
        this.storeName = str;
        this.storeRetainMs = j;
        this.punctuateIntervalMs = j2;
        this.maxPunctuateItems = j3;
        this.alertKeySerde = orderedKeyUberSerde;
        this.alertRangeSerde = orderedKeyUberSerde2;
    }

    public AlertHistoryProcessorSupplier(String str, OrderedKeyUberSerde<Alert.AlertInfo> orderedKeyUberSerde, OrderedKeyUberSerde<Alert.AlertInfo> orderedKeyUberSerde2) {
        this(str, DEFAULT_RETAIN_MS, DEFAULT_PUNCTUATE_INTERVAL_MS, 1000L, orderedKeyUberSerde, orderedKeyUberSerde2);
    }

    @Override // org.apache.kafka.streams.processor.ProcessorSupplier
    public Processor<Alert.AlertInfo, Alert.AlertInfo> get() {
        return new AbstractProcessor<Alert.AlertInfo, Alert.AlertInfo>() { // from class: io.confluent.controlcenter.streams.alert.AlertHistoryProcessorSupplier.1
            private KeyValueStore<Bytes, Alert.AlertInfo> kvStore;
            private long lastPunctuateTimeMs = -1;

            @Override // org.apache.kafka.streams.processor.AbstractProcessor, org.apache.kafka.streams.processor.Processor
            public void init(ProcessorContext processorContext) {
                super.init(processorContext);
                StateStore stateStore = processorContext.getStateStore(AlertHistoryProcessorSupplier.this.storeName);
                if (stateStore instanceof TimestampedKeyValueStore) {
                    this.kvStore = new KeyValueStoreFacade((TimestampedKeyValueStore) stateStore);
                } else {
                    this.kvStore = (KeyValueStore) stateStore;
                }
            }

            /* JADX WARN: Code restructure failed: missing block: B:24:0x00f7, code lost:
            
                io.confluent.controlcenter.streams.alert.AlertHistoryProcessorSupplier.log.debug("stopping punctuate (after removing max items) at key={} store={}", r0.key, r8.this$0.storeName);
             */
            /* JADX WARN: Failed to calculate best type for var: r15v0 ??
            java.lang.NullPointerException
             */
            /* JADX WARN: Failed to calculate best type for var: r16v0 ??
            java.lang.NullPointerException
             */
            /* JADX WARN: Multi-variable type inference failed. Error: java.lang.NullPointerException
             */
            /* JADX WARN: Not initialized variable reg: 15, insn: 0x0163: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r15 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) A[TRY_LEAVE], block:B:46:0x0163 */
            /* JADX WARN: Not initialized variable reg: 16, insn: 0x0168: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r16 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]), block:B:48:0x0168 */
            /* JADX WARN: Type inference failed for: r15v0, types: [org.apache.kafka.streams.state.KeyValueIterator] */
            /* JADX WARN: Type inference failed for: r16v0, types: [java.lang.Throwable] */
            @Override // org.apache.kafka.streams.processor.Processor
            /*
                Code decompiled incorrectly, please refer to instructions dump.
                To view partially-correct add '--show-bad-code' argument
            */
            public void process(io.confluent.controlcenter.alert.record.Alert.AlertInfo r9, io.confluent.controlcenter.alert.record.Alert.AlertInfo r10) {
                /*
                    Method dump skipped, instructions count: 438
                    To view this dump add '--comments-level debug' option
                */
                throw new UnsupportedOperationException("Method not decompiled: io.confluent.controlcenter.streams.alert.AlertHistoryProcessorSupplier.AnonymousClass1.process(io.confluent.controlcenter.alert.record.Alert$AlertInfo, io.confluent.controlcenter.alert.record.Alert$AlertInfo):void");
            }
        };
    }
}
