package kafka.server.cell;

import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import kafka.common.CellLoadDescriptionInternal;
import kafka.controller.ClusterBalanceManager;
import kafka.server.KafkaConfig;
import org.apache.kafka.common.CellLoad;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.controller.metrics.CellMetrics;
import org.apache.kafka.server.util.KafkaScheduler;
import org.apache.kafka.server.util.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.runtime.BoxedUnit;

/**
 * Periodically asks the {@link ClusterBalanceManager} for cell loads and hands every non-empty
 * result to a caller-supplied consumer and, when one was provided, to {@link CellMetrics}.
 *
 * <p>The refresh task is registered with the injected {@link Scheduler} by {@link #start()}.
 * Each run is skipped while {@code KafkaConfig.cellLoadRefresherEnabled()} returns
 * {@code false}; the flag is re-read on every run.
 */
public class CellLoadRefresher {
    /** Refresh interval used by the public constructor: ten minutes, expressed in milliseconds. */
    public static final long DEFAULT_REFRESH_INTERVAL_MS = TimeUnit.MINUTES.toMillis(10);

    private static final Logger log = LoggerFactory.getLogger(CellLoadRefresher.class);

    private final BiConsumer<Set<CellLoad>, Long> cellLoadConsumer;
    private final ClusterBalanceManager balanceManager;
    private final Scheduler scheduler;
    private final long refreshIntervalMs;
    private final Time time;
    private final KafkaConfig config;
    /** Empty when the refresher was constructed with a {@code null} {@link CellMetrics}. */
    private final Optional<CellMetrics> metricsOpt;

    /**
     * Creates the {@link KafkaScheduler} this class offers as its default scheduler.
     *
     * @return a new, not yet started by this method, scheduler instance
     */
    public static Scheduler createDefaultScheduler() {
        return new KafkaScheduler(1, true, "cell-load-refresher-", false);
    }

    /**
     * Creates a refresher that runs every {@link #DEFAULT_REFRESH_INTERVAL_MS} milliseconds.
     *
     * <p>Note the argument order: here {@code kafkaConfig} precedes {@code scheduler}, whereas the
     * package-private constructor takes them the other way round.
     *
     * @param cellLoadConsumer      receives each non-empty set of cell loads together with the
     *                              timestamp (ms) at which the response was received
     * @param clusterBalanceManager source of the cell load information
     * @param time                  clock used for timestamps and latency logging
     * @param kafkaConfig           configuration consulted for the enabled flag on every run
     * @param scheduler             scheduler on which the periodic task is registered
     * @param cellMetrics           metrics sink to update; may be {@code null}
     */
    public CellLoadRefresher(BiConsumer<Set<CellLoad>, Long> cellLoadConsumer,
                             ClusterBalanceManager clusterBalanceManager,
                             Time time,
                             KafkaConfig kafkaConfig,
                             Scheduler scheduler,
                             CellMetrics cellMetrics) {
        this(cellLoadConsumer, clusterBalanceManager, time, scheduler, kafkaConfig, cellMetrics,
                DEFAULT_REFRESH_INTERVAL_MS);
    }

    /**
     * Creates a refresher with an explicit refresh interval.
     *
     * @param cellLoadConsumer      receives each non-empty set of cell loads together with the
     *                              timestamp (ms) at which the response was received
     * @param clusterBalanceManager source of the cell load information
     * @param time                  clock used for timestamps and latency logging
     * @param scheduler             scheduler on which the periodic task is registered
     * @param kafkaConfig           configuration consulted for the enabled flag on every run
     * @param cellMetrics           metrics sink to update; may be {@code null}
     * @param refreshIntervalMs     value passed to the scheduler as the last argument of
     *                              {@code schedule(...)}
     */
    CellLoadRefresher(BiConsumer<Set<CellLoad>, Long> cellLoadConsumer,
                      ClusterBalanceManager clusterBalanceManager,
                      Time time,
                      Scheduler scheduler,
                      KafkaConfig kafkaConfig,
                      CellMetrics cellMetrics,
                      long refreshIntervalMs) {
        this.cellLoadConsumer = cellLoadConsumer;
        this.balanceManager = clusterBalanceManager;
        this.scheduler = scheduler;
        this.time = time;
        this.config = kafkaConfig;
        this.refreshIntervalMs = refreshIntervalMs;
        this.metricsOpt = Optional.ofNullable(cellMetrics);
    }

    /**
     * Registers {@link #refreshCellLoads()} with the scheduler under the name
     * {@code "CellLoadRefresher"}, passing {@code 0} and the configured refresh interval as the
     * timing arguments (presumably initial delay and period; confirm against {@link Scheduler}).
     */
    public void start() {
        this.scheduler.schedule("CellLoadRefresher", this::refreshCellLoads, 0L, this.refreshIntervalMs);
        log.info("CellLoadRefresher scheduler has now started");
    }

    /**
     * Performs one refresh: requests cell loads from the balance manager and processes the
     * response in the supplied callback.
     *
     * <p>Does nothing when the refresher is disabled in the configuration. In the callback, an
     * error response is logged and dropped, and an empty result is logged but not forwarded.
     *
     * @return always {@link BoxedUnit#UNIT}
     */
    BoxedUnit refreshCellLoads() {
        if (!this.config.cellLoadRefresherEnabled()) {
            return BoxedUnit.UNIT;
        }
        final long requestedAtMs = this.time.milliseconds();
        this.balanceManager.cellLoad(Collections.emptyList(), (apiError, descriptionOpt) -> {
            if (apiError.error() != Errors.NONE) {
                log.error("Received error due to {}", apiError);
                return;
            }
            // NOTE(review): the cast is kept because the callback's generic signature is not
            // visible from this file; drop it if the Optional is already typed.
            CellLoadDescriptionInternal description = (CellLoadDescriptionInternal) descriptionOpt
                    .orElse(new CellLoadDescriptionInternal(Collections.emptyList()));
            // Typed copy of the reported loads; duplicates collapse via set semantics.
            Set<CellLoad> cellLoads = new HashSet<>(description.getCellLoad());
            final long receivedAtMs = this.time.milliseconds();
            log.info("Received cellLoads of length {}, and took {} milliseconds",
                    cellLoads.size(), receivedAtMs - requestedAtMs);
            if (cellLoads.isEmpty()) {
                // Nothing to publish; leave the consumer and metrics untouched.
                return;
            }
            this.cellLoadConsumer.accept(cellLoads, receivedAtMs);
            this.metricsOpt.ifPresent(metrics -> metrics.updateCellLoads(cellLoads, receivedAtMs));
        });
        return BoxedUnit.UNIT;
    }
}
