package io.confluent.controlcenter.streams.verify;

import io.confluent.controlcenter.record.Controlcenter;
import io.confluent.monitoring.common.Clock;
import io.confluent.monitoring.record.Monitoring;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/controlcenter/streams/verify/ThroughTopicVerifierTransformerSupplier.class */
/**
 * Supplies {@link ValueTransformer} instances that unwrap a
 * {@code VerifiableMonitoringMessage} into its {@code MonitoringMessage} and drop
 * messages whose GUID is already present in the named key-value state store.
 */
public class ThroughTopicVerifierTransformerSupplier implements ValueTransformerSupplier<Controlcenter.VerifiableMonitoringMessage, Monitoring.MonitoringMessage> {
    private final String storeName;
    private final Clock clock;

    /**
     * Transformer that records, per GUID, the time a message was last seen and only
     * forwards the first occurrence of each GUID.
     *
     * <p>Declared {@code static} so it carries no hidden reference to the enclosing
     * supplier; the clock it needs is passed in explicitly.
     */
    private static final class ThroughTopicVerifierTransformer implements ValueTransformer<Controlcenter.VerifiableMonitoringMessage, Monitoring.MonitoringMessage> {
        private static final Logger log = LoggerFactory.getLogger(ThroughTopicVerifierTransformer.class);

        private final String storeName;
        private final Clock clock;
        private KeyValueStore<String, Long> kvStore;

        ThroughTopicVerifierTransformer(String storeName, Clock clock) {
            this.storeName = storeName;
            this.clock = clock;
        }

        /**
         * Looks up the state store named at construction time.
         *
         * @param processorContext context used to resolve the state store
         */
        @Override // org.apache.kafka.streams.kstream.ValueTransformer
        public void init(ProcessorContext processorContext) {
            // The store is looked up by name only, so its key/value types cannot be
            // checked here; they are assumed to match how the store was registered.
            @SuppressWarnings("unchecked")
            KeyValueStore<String, Long> store =
                    (KeyValueStore<String, Long>) processorContext.getStateStore(this.storeName);
            this.kvStore = store;
        }

        /**
         * Returns the wrapped monitoring message the first time its GUID is seen.
         *
         * <p>The store entry for the GUID is refreshed with the current clock time on
         * every call, including for duplicates.
         *
         * @param verifiableMonitoringMessage incoming message; may be {@code null}
         * @return the wrapped message, or {@code null} when the input is {@code null},
         *         the GUID was already in the store, or the store access failed
         */
        @Override // org.apache.kafka.streams.kstream.ValueTransformer
        public Monitoring.MonitoringMessage transform(Controlcenter.VerifiableMonitoringMessage verifiableMonitoringMessage) {
            if (verifiableMonitoringMessage == null) {
                return null;
            }
            String guid = verifiableMonitoringMessage.getGuid();
            try {
                Long lastSeen = this.kvStore.get(guid);
                this.kvStore.put(guid, this.clock.currentTimeMillis());
                if (lastSeen != null) {
                    log.warn("dropping duplicate message key={} lastSeen={} store={}", guid, lastSeen, this.storeName);
                    return null;
                }
                return verifiableMonitoringMessage.getMonitoringMessage();
            } catch (Exception e) {
                // Best effort: a store failure drops the message rather than failing the stream.
                // The exception is passed as the trailing argument with no placeholder of its own
                // so that SLF4J treats it as the throwable and logs its stack trace.
                log.warn("failed to update key={} store={}", guid, this.storeName, e);
                return null;
            }
        }

        /** Logs that the transformer is closing; no resources are released here. */
        @Override // org.apache.kafka.streams.kstream.ValueTransformer
        public void close() {
            log.info("closing store={}", this.storeName);
        }
    }

    /**
     * @param storeName name of the key-value state store used to track seen GUIDs
     * @param clock     time source for the last-seen timestamps written to the store
     */
    public ThroughTopicVerifierTransformerSupplier(String storeName, Clock clock) {
        this.storeName = storeName;
        this.clock = clock;
    }

    /**
     * Creates a new transformer bound to this supplier's store name and clock.
     *
     * @return a fresh transformer instance
     */
    @Override // org.apache.kafka.streams.kstream.ValueTransformerSupplier
    public ValueTransformer<Controlcenter.VerifiableMonitoringMessage, Monitoring.MonitoringMessage> get() {
        return new ThroughTopicVerifierTransformer(this.storeName, this.clock);
    }
}
