package io.confluent.controlcenter.streams.monitoring;

import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import io.confluent.controlcenter.keys.Keys;
import io.confluent.controlcenter.record.Controlcenter;
import io.confluent.controlcenter.serialization.OrderedKeyPrefixedSerdeSupplier;
import io.confluent.controlcenter.streams.AbstractMonitoringMessageRollupStreamExtension;
import io.confluent.controlcenter.streams.C3Stream;
import io.confluent.controlcenter.streams.TopicStoreMaster;
import io.confluent.controlcenter.streams.TopicStoreModule;
import io.confluent.controlcenter.streams.TypeMapper;
import io.confluent.controlcenter.streams.verify.Verifiable;
import io.confluent.monitoring.record.Monitoring;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/controlcenter/streams/monitoring/MonitoringStreamExtension.class */
/**
 * Rollup stream extension for {@link Monitoring.MonitoringMessage} records.
 *
 * <p>Supplies the pieces the abstract rollup extension asks for: the extension name (taken from
 * the monitoring stream store), one type mapper per lookup key type, the windowed aggregation
 * step, and the shuffle step, which is delegated to the injected {@link Verifiable}.
 */
public class MonitoringStreamExtension
        extends AbstractMonitoringMessageRollupStreamExtension<Monitoring.MonitoringMessage> {

    private static final Logger log = LoggerFactory.getLogger(MonitoringStreamExtension.class);

    /** Performs the transform applied in {@link #shuffle(C3Stream)}. */
    private final Verifiable verifiable;

    /** Store handed to {@link Verifiable#transform} during the shuffle step. */
    private final TopicStoreMaster.Store<String, Controlcenter.VerifiableMonitoringMessage, Long>
            monitoringMsgExtensionStore;

    /** Store providing this extension's name and the serdes used for grouping and aggregation. */
    private final TopicStoreMaster.Store<Bytes, Monitoring.MonitoringMessage, Monitoring.MonitoringMessage>
            monitoringStreamStore;

    /** Source of the serdes passed to the type mappers and to the shuffle transform. */
    private final OrderedKeyPrefixedSerdeSupplier<Keys.KeyType, Monitoring.MonitoringMessage>
            orderedKeyPrefixedSerdeSupplier;

    /**
     * Creates the extension from its injected collaborators.
     *
     * @param verifiable transform applied in the shuffle step
     * @param store store bound with {@code @MonitoringMsgExtensionStore}
     * @param store2 store bound with {@code @MonitoringStreamStore}
     * @param orderedKeyPrefixedSerdeSupplier supplier of key-prefixed serdes
     */
    @Inject
    public MonitoringStreamExtension(
            Verifiable verifiable,
            @TopicStoreModule.MonitoringMsgExtensionStore
                    TopicStoreMaster.Store<String, Controlcenter.VerifiableMonitoringMessage, Long> store,
            @TopicStoreModule.MonitoringStreamStore
                    TopicStoreMaster.Store<Bytes, Monitoring.MonitoringMessage, Monitoring.MonitoringMessage> store2,
            OrderedKeyPrefixedSerdeSupplier<Keys.KeyType, Monitoring.MonitoringMessage> orderedKeyPrefixedSerdeSupplier) {
        this.verifiable = verifiable;
        monitoringMsgExtensionStore = store;
        monitoringStreamStore = store2;
        this.orderedKeyPrefixedSerdeSupplier = orderedKeyPrefixedSerdeSupplier;
    }

    /**
     * Returns the name of the monitoring stream store.
     */
    @Override // io.confluent.controlcenter.streams.AbstractMonitoringMessageRollupStreamExtension
    protected String name() {
        return monitoringStreamStore.name;
    }

    /**
     * Returns one {@link MonitoringMessageTypeMapper} per lookup key type, in a fixed order.
     */
    @Override // io.confluent.controlcenter.streams.AbstractMonitoringMessageRollupStreamExtension
    protected Iterable<? extends TypeMapper<Void, Monitoring.MonitoringMessage>> getMappers() {
        return ImmutableList.of(
                newLookupMapper(Keys.KeyType.LOOKUP_CLIENTTYPE),
                newLookupMapper(Keys.KeyType.LOOKUP_CLIENTTYPE_GROUP),
                newLookupMapper(Keys.KeyType.LOOKUP_CLIENTTYPE_CLIENT),
                newLookupMapper(Keys.KeyType.LOOKUP_CLIENTTYPE_TOPIC),
                newLookupMapper(Keys.KeyType.LOOKUP_CLIENTTYPE_TOPICPARTITION),
                newLookupMapper(Keys.KeyType.LOOKUP_CLIENTTYPE_CLIENT_TOPIC_PARTITION),
                newLookupMapper(Keys.KeyType.LOOKUP_CLIENTTYPE_GROUP_TOPICPARTITION));
    }

    /** Builds a mapper around the serde the supplier returns for {@code keyType}. */
    private MonitoringMessageTypeMapper newLookupMapper(Keys.KeyType keyType) {
        return new MonitoringMessageTypeMapper(orderedKeyPrefixedSerdeSupplier.get(keyType));
    }

    /**
     * Groups the stream by key, windows it, and aggregates each window into a single message.
     *
     * @param kStream keyed monitoring messages to roll up
     * @param windows window definition applied to the grouped stream
     * @param str name under which the aggregate is materialized
     * @return the windowed aggregate table
     */
    @Override // io.confluent.controlcenter.streams.AbstractMonitoringMessageRollupStreamExtension
    protected KTable<Windowed<Bytes>, Monitoring.MonitoringMessage> filterAndAggregateRollup(
            KStream<Bytes, Monitoring.MonitoringMessage> kStream, Windows<Window> windows, String str) {
        // Grouping uses the store's key/value serdes; the materialized state uses the aggregate serde.
        Serialized<Bytes, Monitoring.MonitoringMessage> grouping =
                Serialized.with(monitoringStreamStore.keySerde, monitoringStreamStore.valueSerde);

        // NOTE(review): the decompiler reported a type-inference failure for this method; the
        // Materialized chain may need an explicit type witness to compile - confirm against a build.
        return kStream
                .groupByKey(grouping)
                .windowedBy(windows)
                .aggregate(
                        new MonitoringMessageInitializer(),
                        MonitoringMessageAggregator.stripped(),
                        Materialized.as(str)
                                .withKeySerde(monitoringStreamStore.keySerde)
                                .withValueSerde(monitoringStreamStore.aggregateSerde));
    }

    /**
     * Delegates the shuffle step to {@link Verifiable#transform}, passing the supplier's default
     * serde and the monitoring message extension store.
     */
    @Override // io.confluent.controlcenter.streams.AbstractMonitoringMessageRollupStreamExtension
    protected C3Stream<Bytes, Monitoring.MonitoringMessage> shuffle(
            C3Stream<Bytes, Monitoring.MonitoringMessage> c3Stream) {
        return verifiable.transform(
                c3Stream, orderedKeyPrefixedSerdeSupplier.get(), monitoringMsgExtensionStore);
    }
}
