package io.confluent.kafka.multitenant.integration.test;

import io.confluent.kafka.multitenant.PhysicalClusterMetadata;
import io.confluent.kafka.multitenant.Utils;
import io.confluent.kafka.server.plugins.policy.AlterConfigPolicy;
import io.confluent.kafkarest.KafkaRestConfig;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import kafka.metrics.BrokerLoadConfig;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.test.TestUtils;
import org.apache.maven.artifact.Artifact;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:io/confluent/kafka/multitenant/integration/test/MultiTenantKafkaIntegrationTest.class */
public class MultiTenantKafkaIntegrationTest extends AbstractMultiTenantKafkaIntegrationTest {
    private final ListenerName externalListenerName = new ListenerName("external");
    private final String externalListenerPrefix = this.externalListenerName.configPrefix();

    @Test
    public void testMultiTenantMetadataInstances() {
        setUp();
        createPhysicalAndLogicalClusters();
        List list = (List) this.physicalCluster.kafkaCluster().brokers().stream().map(kafkaServer -> {
            Object obj = kafkaServer.config().values().get(KafkaConfig.BrokerSessionUuidProp());
            return obj == null ? "" : obj.toString();
        }).distinct().collect(Collectors.toList());
        Assertions.assertEquals(2, list.size(), "Expect each broker to have unique session UUID.");
        list.forEach(str -> {
            Assertions.assertNotNull(PhysicalClusterMetadata.getInstance(str), "Expect valid instance of PhysicalClusterMetadata for broker session UUID " + str);
        });
    }

    @Test
    public void testProduceConsume() throws Throwable {
        setUp();
        createPhysicalAndLogicalClusters();
        this.testHarness.produceConsume(this.logicalCluster1.user(11), this.logicalCluster1.user(12), "testtopic", "group1", 0);
        this.testHarness.produceConsume(this.logicalCluster2.user(21), this.logicalCluster2.user(22), "testtopic", "group1", 1000);
    }

    @Test
    public void testUpdatingBrokerLoadMetricTenantTag() throws Throwable {
        setUp();
        this.physicalCluster = this.testHarness.start(brokerProps());
        addLkcFileAndSyncMetadata(Utils.LC_META_ABC);
        assertBrokerLoadMetricTenantTag(Utils.LC_META_ABC.logicalClusterId());
        addLkcFileAndSyncMetadata(Utils.LC_META_XYZ);
        assertBrokerLoadMetricTenantTag(Utils.LC_META_ABC.logicalClusterId());
        assertBrokerLoadMetricTenantTag(Utils.LC_META_XYZ.logicalClusterId());
        deleteLkcFileAndSyncMetadata(Utils.LC_META_ABC);
        assertBrokerLoadMetricTenantTag(Utils.LC_META_XYZ.logicalClusterId());
    }

    private void assertBrokerLoadMetricTenantTag(String str) throws Throwable {
        Map singletonMap = Collections.singletonMap("tenant", str);
        TestUtils.waitForCondition(() -> {
            Stream<R> map = this.physicalCluster.kafkaCluster().brokers().stream().map(kafkaServer -> {
                return kafkaServer.socketServer().brokerLoad().get().brokerLoadPercentMetric(str).get().metricName().tags();
            });
            singletonMap.getClass();
            return map.allMatch((v1) -> {
                return r1.equals(v1);
            });
        }, String.format("Expect that the broker's broker load metric tenant tag to be %s", str));
    }

    @Test
    public void testAlterBrokerConfigsWhenConfigDisabled() throws Exception {
        setUp();
        createPhysicalAndLogicalClusters();
        expectAlterBrokerConfigsViaExternalListenerRejected(this.testHarness.createAdminClient(this.logicalCluster1.adminUser()), this.physicalCluster.superAdminClient(), new ConfigResource(ConfigResource.Type.BROKER, "0"));
    }

    @Test
    public void testAlterClusterConfigsWhenConfigDisabled() throws Exception {
        setUp();
        createPhysicalAndLogicalClusters();
        expectAlterBrokerConfigsViaExternalListenerRejected(this.testHarness.createAdminClient(this.logicalCluster1.adminUser()), this.physicalCluster.superAdminClient(), new ConfigResource(ConfigResource.Type.BROKER, ""));
    }

    private void expectAlterBrokerConfigsViaExternalListenerRejected(AdminClient adminClient, AdminClient adminClient2, ConfigResource configResource) throws Exception {
        KafkaServer kafkaServer = this.physicalCluster.kafkaCluster().kafkas().get(0).kafkaServer();
        List<String> list = kafkaServer.config().getList("ssl.cipher.suites");
        int intValue = kafkaServer.config().messageMaxBytes().intValue();
        Map singletonMap = Collections.singletonMap(configResource, new Config(Arrays.asList(new ConfigEntry(KafkaConfig.MessageMaxBytesProp(), KafkaRestConfig.PRODUCE_MAX_REQUESTS_PER_SECOND_DEFAULT), new ConfigEntry(KafkaConfig.AutoCreateTopicsEnableProp(), "true"), new ConfigEntry("ssl.cipher.suites", "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256"))));
        Assertions.assertEquals(PolicyViolationException.class, ((ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
            adminClient.alterConfigs(singletonMap).all().get();
        })).getCause().getClass());
        Assertions.assertEquals(intValue, kafkaServer.config().messageMaxBytes().intValue());
        Assertions.assertEquals(false, kafkaServer.config().autoCreateTopicsEnable());
        Assertions.assertEquals(list, kafkaServer.config().get("ssl.cipher.suites"));
        Map singletonMap2 = Collections.singletonMap(configResource, Arrays.asList(new AlterConfigOp(new ConfigEntry(KafkaConfig.MessageMaxBytesProp(), "15000"), AlterConfigOp.OpType.SET), new AlterConfigOp(new ConfigEntry(KafkaConfig.AutoCreateTopicsEnableProp(), "true"), AlterConfigOp.OpType.SET), new AlterConfigOp(new ConfigEntry("ssl.cipher.suites", "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384"), AlterConfigOp.OpType.SET)));
        Assertions.assertEquals(PolicyViolationException.class, ((ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
            adminClient.incrementalAlterConfigs(singletonMap2).all().get();
        })).getCause().getClass());
        Assertions.assertEquals(intValue, kafkaServer.config().messageMaxBytes().intValue());
        Assertions.assertEquals(false, kafkaServer.config().autoCreateTopicsEnable());
        Assertions.assertEquals(list, kafkaServer.config().get("ssl.cipher.suites"));
        adminClient2.alterConfigs(Collections.singletonMap(configResource, new Config(Arrays.asList(new ConfigEntry(KafkaConfig.MessageMaxBytesProp(), KafkaRestConfig.PRODUCE_MAX_REQUESTS_PER_SECOND_DEFAULT), new ConfigEntry(KafkaConfig.AutoCreateTopicsEnableProp(), "true"), new ConfigEntry(this.externalListenerPrefix + "ssl.cipher.suites", "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256"))))).all().get();
        TestUtils.waitForCondition(() -> {
            return kafkaServer.config().messageMaxBytes().intValue() == 10000;
        }, "Dynamic config not updated");
        TestUtils.waitForCondition(() -> {
            return kafkaServer.config().autoCreateTopicsEnable().booleanValue();
        }, "Dynamic config not updated");
        TestUtils.waitForCondition(() -> {
            return "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256".equals(sslCipherSuitesFromConfig(kafkaServer.config(), this.externalListenerName));
        }, "Dynamic config not updated");
        adminClient2.incrementalAlterConfigs(Collections.singletonMap(configResource, Arrays.asList(new AlterConfigOp(new ConfigEntry(KafkaConfig.MessageMaxBytesProp(), "15000"), AlterConfigOp.OpType.SET), new AlterConfigOp(new ConfigEntry(KafkaConfig.AutoCreateTopicsEnableProp(), "true"), AlterConfigOp.OpType.SET), new AlterConfigOp(new ConfigEntry(this.externalListenerPrefix + "ssl.cipher.suites", "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384"), AlterConfigOp.OpType.SET)))).all().get();
        TestUtils.waitForCondition(() -> {
            return kafkaServer.config().messageMaxBytes().intValue() == 15000;
        }, "Dynamic config not updated");
        TestUtils.waitForCondition(() -> {
            return kafkaServer.config().autoCreateTopicsEnable().booleanValue();
        }, "Dynamic config not updated");
        TestUtils.waitForCondition(() -> {
            return "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384".equals(sslCipherSuitesFromConfig(kafkaServer.config(), this.externalListenerName));
        }, "Dynamic config not updated");
        Map singletonMap3 = Collections.singletonMap(configResource, new Config(Collections.singleton(new ConfigEntry(KafkaConfig.BrokerIdProp(), "20"))));
        Assertions.assertEquals(InvalidRequestException.class, ((ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
            adminClient2.alterConfigs(singletonMap3).all().get();
        })).getCause().getClass());
        Assertions.assertEquals(0, kafkaServer.config().brokerId());
    }

    @Test
    public void testAlterBrokerConfigsWhenConfigEnabled() throws Exception {
        setUp();
        Properties brokerProps = brokerProps();
        brokerProps.put(AlterConfigPolicy.ClusterPolicyConfig.ALTER_ENABLE_CONFIG, "true");
        createPhysicalAndLogicalClusters(brokerProps);
        expectAlterBrokerConfigsViaExternalListenerRejected(this.testHarness.createAdminClient(this.logicalCluster1.adminUser()), this.physicalCluster.superAdminClient(), new ConfigResource(ConfigResource.Type.BROKER, "0"));
    }

    @Test
    public void testAlterClusterConfigsWhenConfigEnabled() throws Exception {
        setUp();
        Properties brokerProps = brokerProps();
        brokerProps.put(AlterConfigPolicy.ClusterPolicyConfig.ALTER_ENABLE_CONFIG, "true");
        createPhysicalAndLogicalClusters(brokerProps);
        AdminClient createAdminClient = this.testHarness.createAdminClient(this.logicalCluster1.adminUser());
        AdminClient superAdminClient = this.physicalCluster.superAdminClient();
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, "");
        String str = "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256";
        KafkaServer kafkaServer = this.physicalCluster.kafkaCluster().kafkas().get(0).kafkaServer();
        long j = 3600000;
        createAdminClient.alterConfigs(Collections.singletonMap(configResource, new Config(Arrays.asList(new ConfigEntry(KafkaConfig.LogRetentionTimeMillisProp(), String.valueOf(3600000L)), new ConfigEntry(KafkaConfig.AutoCreateTopicsEnableProp(), "true"), new ConfigEntry(KafkaConfig.SslCipherSuitesProp(), "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256"), new ConfigEntry(KafkaConfig.LogCleanerMaxCompactionLagMsProp(), String.valueOf(2147483646L)))))).all().get();
        TestUtils.waitForCondition(() -> {
            return kafkaServer.config().logRetentionTimeMillis() == j;
        }, "Dynamic config not updated");
        TestUtils.waitForCondition(() -> {
            return kafkaServer.config().autoCreateTopicsEnable().booleanValue();
        }, "Dynamic config not updated");
        TestUtils.waitForCondition(() -> {
            return str.equals(sslCipherSuitesFromConfig(kafkaServer.config(), this.externalListenerName));
        }, "Dynamic config for ssl-cipher-suites not updated.");
        String str2 = "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384";
        long j2 = 3600001;
        int i = 2;
        createAdminClient.incrementalAlterConfigs(Collections.singletonMap(configResource, Arrays.asList(new AlterConfigOp(new ConfigEntry(KafkaConfig.LogRetentionTimeMillisProp(), String.valueOf(3600001L)), AlterConfigOp.OpType.SET), new AlterConfigOp(new ConfigEntry(KafkaConfig.NumPartitionsProp(), String.valueOf(2)), AlterConfigOp.OpType.SET), new AlterConfigOp(new ConfigEntry(KafkaConfig.AutoCreateTopicsEnableProp(), "false"), AlterConfigOp.OpType.SET), new AlterConfigOp(new ConfigEntry(KafkaConfig.SslCipherSuitesProp(), "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384"), AlterConfigOp.OpType.SET), new AlterConfigOp(new ConfigEntry(KafkaConfig.LogCleanerMaxCompactionLagMsProp(), String.valueOf(2147483647L)), AlterConfigOp.OpType.SET)))).all().get();
        TestUtils.waitForCondition(() -> {
            return kafkaServer.config().logRetentionTimeMillis() == j2;
        }, "Dynamic config not updated");
        TestUtils.waitForCondition(() -> {
            return kafkaServer.config().numPartitions().intValue() == i;
        }, "Dynamic config not updated");
        TestUtils.waitForCondition(() -> {
            return !kafkaServer.config().autoCreateTopicsEnable().booleanValue();
        }, "Dynamic config not updated");
        TestUtils.waitForCondition(() -> {
            return str2.equals(sslCipherSuitesFromConfig(kafkaServer.config(), this.externalListenerName));
        }, "Dynamic config for ssl-cipher-suites not updated.");
        superAdminClient.alterConfigs(Collections.singletonMap(configResource, new Config(Arrays.asList(new ConfigEntry(KafkaConfig.LogRetentionTimeMillisProp(), String.valueOf(3600000L)), new ConfigEntry(KafkaConfig.AutoCreateTopicsEnableProp(), "true"), new ConfigEntry(this.externalListenerPrefix + KafkaConfig.SslCipherSuitesProp(), "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256"), new ConfigEntry(KafkaConfig.LogCleanerMaxCompactionLagMsProp(), String.valueOf(2147483646L)))))).all().get();
        TestUtils.waitForCondition(() -> {
            return kafkaServer.config().logRetentionTimeMillis() == j;
        }, "Dynamic config not updated");
        TestUtils.waitForCondition(() -> {
            return kafkaServer.config().autoCreateTopicsEnable().booleanValue();
        }, "Dynamic config not updated");
        TestUtils.waitForCondition(() -> {
            return str.equals(sslCipherSuitesFromConfig(kafkaServer.config(), this.externalListenerName));
        }, "Dynamic config for ssl-cipher-suites not updated.");
        superAdminClient.incrementalAlterConfigs(Collections.singletonMap(configResource, Arrays.asList(new AlterConfigOp(new ConfigEntry(KafkaConfig.LogRetentionTimeMillisProp(), String.valueOf(3600001L)), AlterConfigOp.OpType.SET), new AlterConfigOp(new ConfigEntry(KafkaConfig.AutoCreateTopicsEnableProp(), "false"), AlterConfigOp.OpType.SET), new AlterConfigOp(new ConfigEntry(this.externalListenerPrefix + KafkaConfig.SslCipherSuitesProp(), "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384"), AlterConfigOp.OpType.SET), new AlterConfigOp(new ConfigEntry(KafkaConfig.LogCleanerMaxCompactionLagMsProp(), String.valueOf(2147483647L)), AlterConfigOp.OpType.SET)))).all().get();
        TestUtils.waitForCondition(() -> {
            return kafkaServer.config().logRetentionTimeMillis() == j2;
        }, "Dynamic config not updated");
        TestUtils.waitForCondition(() -> {
            return !kafkaServer.config().autoCreateTopicsEnable().booleanValue();
        }, "Dynamic config not updated");
        TestUtils.waitForCondition(() -> {
            return str2.equals(sslCipherSuitesFromConfig(kafkaServer.config(), this.externalListenerName));
        }, "Dynamic config for ssl-cipher-suites not updated.");
        Map singletonMap = Collections.singletonMap(configResource, Collections.singletonList(new AlterConfigOp(new ConfigEntry(KafkaConfig.LogRetentionTimeMillisProp(), String.valueOf(2599999L)), AlterConfigOp.OpType.SET)));
        Assertions.assertEquals(PolicyViolationException.class, ((ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
            createAdminClient.incrementalAlterConfigs(singletonMap).all().get();
        })).getCause().getClass());
        Assertions.assertEquals(3600001L, kafkaServer.config().logRetentionTimeMillis());
        int intValue = kafkaServer.config().messageMaxBytes().intValue();
        Map singletonMap2 = Collections.singletonMap(configResource, Collections.singletonList(new AlterConfigOp(new ConfigEntry(KafkaConfig.MessageMaxBytesProp(), "500000"), AlterConfigOp.OpType.SET)));
        Assertions.assertEquals(PolicyViolationException.class, ((ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
            createAdminClient.incrementalAlterConfigs(singletonMap2).all().get();
        })).getCause().getClass());
        Assertions.assertEquals(intValue, kafkaServer.config().messageMaxBytes().intValue());
    }

    private String sslCipherSuitesFromConfig(KafkaConfig kafkaConfig, ListenerName listenerName) {
        return (String) kafkaConfig.originals().get(listenerName.configPrefix() + KafkaConfig.SslCipherSuitesProp());
    }

    @Test
    public void testAcls() throws Throwable {
        setUp();
        createPhysicalAndLogicalClusters();
        AdminClient createAdminClient = this.testHarness.createAdminClient(this.logicalCluster1.adminUser());
        AdminClient createAdminClient2 = this.testHarness.createAdminClient(this.logicalCluster2.adminUser());
        Assertions.assertEquals(Collections.emptySet(), describeAllAcls(createAdminClient));
        List asList = Arrays.asList(ResourceType.TOPIC, ResourceType.GROUP, ResourceType.TRANSACTIONAL_ID);
        HashSet hashSet = new HashSet();
        asList.forEach(resourceType -> {
            hashSet.add(new AclBinding(new ResourcePattern(resourceType, "test.resource", PatternType.LITERAL), new AccessControlEntry(this.logicalCluster1.user(11).unprefixedKafkaPrincipal().toString(), "*", AclOperation.WRITE, AclPermissionType.ALLOW)));
        });
        asList.forEach(resourceType2 -> {
            hashSet.add(new AclBinding(new ResourcePattern(resourceType2, "test.", PatternType.PREFIXED), new AccessControlEntry(this.logicalCluster1.user(12).unprefixedKafkaPrincipal().toString(), "*", AclOperation.READ, AclPermissionType.ALLOW)));
        });
        asList.forEach(resourceType3 -> {
            hashSet.add(new AclBinding(new ResourcePattern(resourceType3, "*", PatternType.LITERAL), new AccessControlEntry(this.logicalCluster1.user(11).unprefixedKafkaPrincipal().toString(), "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW)));
        });
        createAdminClient.createAcls(hashSet).all().get();
        Assertions.assertEquals(hashSet, describeAllAcls(createAdminClient));
        Assertions.assertEquals(Collections.emptySet(), describeAllAcls(createAdminClient2));
        createAdminClient2.createAcls(hashSet).all().get();
        Assertions.assertEquals(hashSet, describeAllAcls(createAdminClient2));
        createAdminClient2.deleteAcls(Collections.singletonList(new AclBindingFilter(new ResourcePatternFilter(ResourceType.ANY, Artifact.SCOPE_TEST, PatternType.PREFIXED), new AccessControlEntryFilter("User:*", "*", AclOperation.ANY, AclPermissionType.ANY)))).all().get();
    }

    @Test
    public void testBrokerLoadConfigIsDynamicallyUpdated() throws Exception {
        setUp();
        createPhysicalAndLogicalClusters();
        AdminClient superAdminClient = this.physicalCluster.superAdminClient();
        List list = (List) this.physicalCluster.kafkaCluster().brokers().stream().map(kafkaServer -> {
            return kafkaServer.socketServer().brokerLoad().get().brokerLoadConfig();
        }).collect(Collectors.toList());
        list.forEach(brokerLoadConfig -> {
            Assertions.assertEquals(0.1d, brokerLoadConfig.averageServiceRequestTime(), ConfluentConfigs.IP_CONNECTION_CREATION_RATE_THROTTLE_ENABLE_THRESHOLD_DEFAULT, "Expect that the broker load config is set to the default value");
        });
        list.forEach(brokerLoadConfig2 -> {
            Assertions.assertEquals(20.0d, brokerLoadConfig2.workloadCoefficient(), ConfluentConfigs.IP_CONNECTION_CREATION_RATE_THROTTLE_ENABLE_THRESHOLD_DEFAULT, "Expect that the broker load config is set to the the default value");
        });
        superAdminClient.incrementalAlterConfigs(Collections.singletonMap(new ConfigResource(ConfigResource.Type.BROKER, ""), Collections.singletonList(new AlterConfigOp(new ConfigEntry(ConfluentConfigs.BROKER_LOAD_AVERAGE_SERVICE_REQUEST_TIME_MS_CONFIG, String.valueOf(0.2d)), AlterConfigOp.OpType.SET)))).all().get();
        TestUtils.waitForCondition(() -> {
            return 0.2d == ((BrokerLoadConfig) ((List) this.physicalCluster.kafkaCluster().brokers().stream().map(kafkaServer2 -> {
                return kafkaServer2.socketServer().brokerLoad().get().brokerLoadConfig();
            }).collect(Collectors.toList())).get(0)).averageServiceRequestTime();
        }, "Expect that the broker load config is set to the configured value");
        superAdminClient.incrementalAlterConfigs(Collections.singletonMap(new ConfigResource(ConfigResource.Type.BROKER, ""), Collections.singletonList(new AlterConfigOp(new ConfigEntry(ConfluentConfigs.BROKER_LOAD_WORKLOAD_COEFFICIENT_CONFIG, String.valueOf(60.0d)), AlterConfigOp.OpType.SET)))).all().get();
        TestUtils.waitForCondition(() -> {
            return 60.0d == ((BrokerLoadConfig) ((List) this.physicalCluster.kafkaCluster().brokers().stream().map(kafkaServer2 -> {
                return kafkaServer2.socketServer().brokerLoad().get().brokerLoadConfig();
            }).collect(Collectors.toList())).get(0)).workloadCoefficient();
        }, "Expect that the broker load config is set to the configured value");
    }

    @Test
    public void testValidBrokerLoadMetricValue() throws Throwable {
        setUp();
        createPhysicalAndLogicalClusters();
        TestUtils.waitForCondition(() -> {
            this.physicalCluster.kafkaCluster().brokers().forEach(kafkaServer -> {
                kafkaServer.time().sleep(10L);
            });
            return this.physicalCluster.kafkaCluster().brokers().stream().map(kafkaServer2 -> {
                return kafkaServer2.socketServer().brokerLoad().get();
            }).allMatch(brokerLoad -> {
                return (Double.isNaN(((Double) brokerLoad.weightedAverageExternalNetworkThreadIdleRatioMetric().get().metricValue()).doubleValue()) || Double.isNaN(((Double) brokerLoad.weightedAverageReplicationNetworkThreadIdleRatioMetric().get().metricValue()).doubleValue()) || Double.isNaN(((Double) brokerLoad.weightedAverageRequestHandlerThreadIdleRatioMetric().get().metricValue()).doubleValue())) ? false : true;
            });
        }, "Broker load's thread idle ratio metrics should not be NaN");
    }
}
