package kafka.server.cell;

import com.yammer.metrics.core.MetricsRegistry;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import kafka.common.CellLoadDescriptionInternal;
import kafka.controller.ClusterBalanceManager;
import kafka.controller.NoOpDataBalanceManager;
import kafka.server.KafkaConfig;
import kafka.utils.MockTime;
import org.apache.kafka.common.CellLoad;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.controller.metrics.CellControllerMetrics;
import org.apache.kafka.server.util.Scheduler;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:kafka/server/cell/CellLoadRefresherTest.class */
/**
 * Tests for {@link CellLoadRefresher}.
 *
 * <p>Each test builds a refresher around a stubbed {@link NoOpDataBalanceManager} and a consumer
 * callback that records what the refresher reported (the cell loads, the timestamp and how many
 * times the callback fired). Two tests drive the refresher through a real scheduler via
 * {@code start()}; the other two call {@code refreshCellLoads()} directly.
 */
class CellLoadRefresherTest {
    /** Canned cell loads handed back by the stubbed balance manager. */
    private static final List<CellLoad> CELL_LOADS_0 = Arrays.asList(
            new CellLoad(0, 0.1d),
            new CellLoad(1, 0.2d),
            new CellLoad(2, 0.3d),
            new CellLoad(3, 0.4d),
            new CellLoad(4, 0.5d));

    private final KafkaConfig config = kafkaConfig(true);

    // State recorded by the consumer callback. In the scheduler-driven tests the callback may run
    // on a different thread than the one polling these values, so they must be safely published:
    // the two fields are volatile and the counter is atomic.
    private final AtomicInteger refreshTimes = new AtomicInteger();
    private volatile Set<CellLoad> cellLoads;
    private volatile long timestamp;

    private CellLoadRefresher refresher;
    private Scheduler scheduler;
    private MetricsRegistry metricsRegistry;
    private CellControllerMetrics metrics;

    /**
     * Consumer handed to every refresher under test. The counter is incremented last so that a
     * reader observing a given count also observes the loads and timestamp written before it.
     */
    private final BiConsumer<Set<CellLoad>, Long> cellLoadConsumer = (loads, reportedTimestamp) -> {
        this.cellLoads = loads;
        this.timestamp = reportedTimestamp;
        this.refreshTimes.incrementAndGet();
    };

    /** Starts a fresh scheduler and resets the recorded callback state and metrics. */
    @BeforeEach
    public void setUp() {
        this.scheduler = CellLoadRefresher.createDefaultScheduler();
        this.scheduler.startup();
        this.cellLoads = new HashSet<>();
        this.refreshTimes.set(0);
        this.timestamp = 0L;
        this.metricsRegistry = new MetricsRegistry();
        this.metrics = new CellControllerMetrics(this.metricsRegistry, new MockTime(0L, 0L), new LogContext());
    }

    /** Shuts down the scheduler and always releases the metrics registry, even if shutdown throws. */
    @AfterEach
    public void tearDown() throws Exception {
        try {
            this.scheduler.shutdown();
        } finally {
            this.metricsRegistry.shutdown();
        }
    }

    /** A started refresher keeps invoking the consumer and delivers the stubbed cell loads. */
    @Test
    void testRefresh() throws Exception {
        this.refresher = newRefresher(respondingBalanceManager(), new MockTime(0L, 0L), this.config);
        this.refresher.start();
        TestUtils.waitForCondition(
                () -> this.refreshTimes.get() >= 5,
                "Expected cell loads to be refreshed");
        TestUtils.waitForCondition(
                () -> new HashSet<>(CELL_LOADS_0).equals(this.cellLoads),
                "Expected cell loads to be refreshed");
    }

    /**
     * When the balance manager throws on every query, the refresher keeps querying it and the
     * consumer is never invoked.
     */
    @Test
    void testRefreshEvenAfterException() throws Exception {
        final AtomicInteger queryAttempts = new AtomicInteger();
        NoOpDataBalanceManager failingManager = new NoOpDataBalanceManager(NoOpDataBalanceManager.class.getSimpleName(), null) {
            public void cellLoad(List<Integer> ignored, ClusterBalanceManager.BalanceManagerStatusQueryClientCallback<CellLoadDescriptionInternal> callback) {
                queryAttempts.incrementAndGet();
                throw new ApiException();
            }
        };
        this.refresher = newRefresher(failingManager, new MockTime(), this.config);
        this.refresher.start();
        TestUtils.waitForCondition(
                () -> queryAttempts.get() >= 5,
                "Expected scheduler to keep retrying");
        Assertions.assertEquals(0, this.refreshTimes.get());
        Assertions.assertEquals(Collections.emptySet(), this.cellLoads);
    }

    /** A direct {@code refreshCellLoads()} call invokes the consumer exactly once with the stubbed loads. */
    @Test
    void testRefreshCellLoads() {
        this.refresher = newRefresher(respondingBalanceManager(), new MockTime(0L, 0L), this.config);
        this.refresher.refreshCellLoads();
        Assertions.assertEquals(1, this.refreshTimes.get());
        Assertions.assertEquals(new HashSet<>(CELL_LOADS_0), this.cellLoads);
        // NOTE(review): presumably the timestamp is read from the MockTime(0, 0) clock — confirm.
        Assertions.assertEquals(0L, this.timestamp);
    }

    /** With the cell/refresher flags set to false, {@code refreshCellLoads()} never reaches the consumer. */
    @Test
    void testRefreshCellLoadsTurnedOff() {
        this.refresher = newRefresher(respondingBalanceManager(), new MockTime(0L, 0L), kafkaConfig(false));
        this.refresher.refreshCellLoads();
        Assertions.assertEquals(0, this.refreshTimes.get());
        Assertions.assertEquals(Collections.emptySet(), this.cellLoads);
        Assertions.assertEquals(0L, this.timestamp);
    }

    /**
     * Builds a refresher wired to this test's consumer, scheduler and metrics.
     *
     * @param balanceManager stub that answers (or fails) cell-load queries
     * @param time clock passed to the refresher
     * @param kafkaConfig broker configuration passed to the refresher
     * @return a new, not yet started refresher
     */
    private CellLoadRefresher newRefresher(NoOpDataBalanceManager balanceManager, MockTime time, KafkaConfig kafkaConfig) {
        // NOTE(review): the trailing 100L is presumably the refresh interval in ms — confirm
        // against the CellLoadRefresher constructor.
        return new CellLoadRefresher(this.cellLoadConsumer, balanceManager, time, this.scheduler, kafkaConfig, this.metrics, 100L);
    }

    /** Returns a balance manager stub that answers every cell-load query with {@link #CELL_LOADS_0}. */
    private static NoOpDataBalanceManager respondingBalanceManager() {
        return new NoOpDataBalanceManager(NoOpDataBalanceManager.class.getSimpleName(), null) {
            public void cellLoad(List<Integer> ignored, ClusterBalanceManager.BalanceManagerStatusQueryClientCallback<CellLoadDescriptionInternal> callback) {
                callback.respond(ApiError.NONE, Optional.of(new CellLoadDescriptionInternal(CELL_LOADS_0)));
            }
        };
    }

    /**
     * Creates a minimal broker config for these tests.
     *
     * @param z value written to both {@code confluent.cells.enable} and
     *     {@code confluent.cells.load.refresher.enable}
     * @return the resulting {@link KafkaConfig}
     */
    private static KafkaConfig kafkaConfig(boolean z) {
        String enabled = String.valueOf(z);
        HashMap<String, String> props = new HashMap<>();
        props.put(KafkaConfig.ZkConnectProp(), "127.0.0.1:0000");
        props.put(KafkaConfig.TierFetcherOffsetCacheSizeProp(), "0");
        props.put("confluent.cells.enable", enabled);
        props.put("confluent.cells.load.refresher.enable", enabled);
        return new KafkaConfig(props);
    }
}
