package io.confluent.controlcenter.streams.verify;

import com.google.common.collect.Lists;
import io.confluent.controlcenter.record.Controlcenter;
import io.confluent.controlcenter.streams.TopicStoreMaster;
import io.confluent.monitoring.common.Clock;
import io.confluent.monitoring.record.Monitoring;
import io.confluent.serializers.UberSerde;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/controlcenter/streams/verify/MonitoringHeartbeatSender.class */
/**
 * {@link Runnable} that publishes a heartbeat {@link Monitoring.MonitoringMessage} to every
 * partition of the configured publish topic and of each internal publish topic.
 *
 * <p>Each run stamps a copy of the base message with the current clock time (as both timestamp
 * and window), sends it to all partitions, and then waits for the broker acknowledgements for
 * at most {@code timeout} milliseconds measured from the start of the run.
 *
 * <p>Failures are logged and never propagated out of {@link #run()}.
 * NOTE(review): presumably this is driven by a periodic scheduler, which is why nothing may
 * escape {@code run()} — confirm against the caller.
 */
public class MonitoringHeartbeatSender implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(MonitoringHeartbeatSender.class);

    /** Maximum time, in milliseconds, a single run waits for send acknowledgements. */
    private final long timeout;
    /** Template message; timestamp and window are overwritten on every heartbeat. */
    private final Monitoring.MonitoringMessage baseMonitoringMessage;
    /** Topic that receives heartbeats with a {@code null} key. */
    private final String publishTopic;
    /** Topics that receive heartbeats keyed by a serialized {@code WindowedClusterGroup}. */
    private final Set<TopicStoreMaster.Topic<Controlcenter.WindowedClusterGroup, Monitoring.MonitoringMessage, Void, Void>> internalPublishTopics;
    private final UberSerde<Monitoring.MonitoringMessage> monitoringMessageSerde;
    private final Clock clock;
    private final KafkaProducer<byte[], byte[]> heartbeatProducer;

    /**
     * Creates a heartbeat sender.
     *
     * @param publishTopic topic that receives un-keyed heartbeats
     * @param internalPublishTopics topics that receive keyed heartbeats
     * @param baseMonitoringMessage template message copied for every heartbeat
     * @param heartbeatProducer producer used to send heartbeats; closed by {@link #close()}
     * @param monitoringMessageSerde serde used to serialize the heartbeat message
     * @param timeout maximum time in milliseconds to wait for acknowledgements per run
     * @param clock time source for the heartbeat timestamp and the acknowledgement deadline
     */
    public MonitoringHeartbeatSender(
            String publishTopic,
            Set<TopicStoreMaster.Topic<Controlcenter.WindowedClusterGroup, Monitoring.MonitoringMessage, Void, Void>> internalPublishTopics,
            Monitoring.MonitoringMessage baseMonitoringMessage,
            KafkaProducer<byte[], byte[]> heartbeatProducer,
            UberSerde<Monitoring.MonitoringMessage> monitoringMessageSerde,
            long timeout,
            Clock clock) {
        this.timeout = timeout;
        this.publishTopic = publishTopic;
        this.internalPublishTopics = internalPublishTopics;
        this.baseMonitoringMessage = baseMonitoringMessage;
        this.heartbeatProducer = heartbeatProducer;
        this.monitoringMessageSerde = monitoringMessageSerde;
        this.clock = clock;
    }

    /**
     * Returns the number of partitions the producer reports for the given topic.
     *
     * @param topic topic name to look up
     * @return partition count as reported by {@code KafkaProducer.partitionsFor}
     */
    public int numberOfTopicPartitions(String topic) {
        return this.heartbeatProducer.partitionsFor(topic).size();
    }

    /**
     * Sends one round of heartbeats. Any {@link Throwable} is logged and swallowed so that
     * nothing propagates to whatever is executing this runnable.
     */
    @Override
    public void run() {
        try {
            sendHeartbeat();
        } catch (Throwable th) {
            log.error("Terminating heartbeat thread", th);
        }
    }

    /**
     * Closes the underlying producer. A failure to close is logged and not rethrown; the
     * success message is only logged when the close actually completed.
     */
    public void close() {
        try {
            this.heartbeatProducer.close();
            log.info("Closed monitoring heartbeat producer");
        } catch (Exception e) {
            log.error("Failed to close monitoring heartbeat producer", e);
        }
    }

    /**
     * Builds a heartbeat stamped with the current time, sends it to every partition of every
     * target topic, and waits for acknowledgements until the deadline
     * ({@code start + timeout}) passes. Failures are logged as warnings.
     */
    private void sendHeartbeat() {
        long now = this.clock.currentTimeMillis();
        Monitoring.MonitoringMessage heartbeat = Monitoring.MonitoringMessage.newBuilder(this.baseMonitoringMessage)
                .setTimestamp(now)
                .setWindow(now)
                .build();
        try {
            List<Future<RecordMetadata>> pendingAcks = new ArrayList<>();

            // The main publish topic is sent without a key.
            sendHeartbeatToAllPartitions(this.publishTopic, null, heartbeat, pendingAcks);

            // Internal topics are keyed by a WindowedClusterGroup whose window is 0.
            for (TopicStoreMaster.Topic<Controlcenter.WindowedClusterGroup, Monitoring.MonitoringMessage, Void, Void> topic : this.internalPublishTopics) {
                byte[] key = topic.keySerde.serialize(Controlcenter.WindowedClusterGroup.newBuilder().setWindow(0L).build());
                sendHeartbeatToAllPartitions(topic.name, key, heartbeat, pendingAcks);
            }

            // The deadline is shared by all acknowledgements: each wait only gets what is left.
            long deadline = now + this.timeout;
            for (Future<RecordMetadata> ack : pendingAcks) {
                long remaining = deadline - this.clock.currentTimeMillis();
                if (remaining <= 0) {
                    // Out of time; remaining sends are still reported by their callbacks.
                    break;
                }
                ack.get(remaining, TimeUnit.MILLISECONDS);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the executing thread can observe the interruption.
            Thread.currentThread().interrupt();
            log.warn("Failed to publish monitoring heartbeat message", e);
        } catch (Throwable th) {
            log.warn("Failed to publish monitoring heartbeat message", th);
        }
    }

    /**
     * Sends the heartbeat to each partition of {@code topic} and appends the resulting futures
     * to {@code pendingAcks}. Per-partition send failures are logged by the send callback.
     *
     * @param topic topic to publish to
     * @param key serialized record key, or {@code null} for an un-keyed record
     * @param heartbeat message to serialize and send
     * @param pendingAcks collector for the futures returned by the producer
     */
    private void sendHeartbeatToAllPartitions(final String topic, byte[] key, Monitoring.MonitoringMessage heartbeat, List<Future<RecordMetadata>> pendingAcks) {
        int partitionCount = this.heartbeatProducer.partitionsFor(topic).size();
        // The payload is identical for every partition, so serialize it once per topic.
        byte[] value = this.monitoringMessageSerde.serialize(heartbeat);
        for (int i = 0; i < partitionCount; i++) {
            final int partition = i;
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, Integer.valueOf(partition), key, value);
            pendingAcks.add(this.heartbeatProducer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                    if (exc != null) {
                        log.warn("Failed to publish monitoring heartbeat message to topic-partition [{}-{}]", topic, Integer.valueOf(partition), exc);
                    }
                }
            }));
        }
    }
}
