package com.linkedin.kafka.cruisecontrol.config;

import com.linkedin.kafka.cruisecontrol.common.Resource;
import io.confluent.metrics.reporter.VolumeMetricsProvider;
import io.confluent.shaded.org.slf4j.Logger;
import io.confluent.shaded.org.slf4j.LoggerFactory;
import java.util.AbstractMap;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.DoubleUnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import kafka.server.KafkaConfig$;
import org.apache.kafka.common.config.ConfigException;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/config/BrokerCapacityResolver.class */
public class BrokerCapacityResolver implements BrokerCapacityConfigResolver {
    public static final int DEFAULT_CAPACITY_BROKER_ID = -1;
    static final double BYTES_PER_KB = 1024.0d;
    private static final double DISK_CAPACITY_UPDATE_THRESHOLD = 1.0d;
    private static final double DISK_CAPACITY_WARN_THRESHOLD = 1.0d;
    private final Map<Integer, BrokerCapacityInfo> capacitiesForBrokers = new ConcurrentHashMap();
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) BrokerCapacityResolver.class);
    public static final String LOG_DIRS_CONFIG = KafkaConfig$.MODULE$.LogDirsProp();
    private static final Double DEFAULT_CPU_CAPACITY = Double.valueOf(100.0d);
    private static final Map<Resource, ResourceConfig> BROKER_RESOURCE_CONFIGS = (Map) Stream.of((Object[]) new AbstractMap.SimpleEntry[]{new AbstractMap.SimpleEntry(Resource.NW_IN, new ResourceConfig("network.in.max.bytes.per.second", d -> {
        return d / BYTES_PER_KB;
    })), new AbstractMap.SimpleEntry(Resource.PRODUCE_IN, new ResourceConfig("producer.in.max.bytes.per.second", d2 -> {
        return d2 / BYTES_PER_KB;
    })), new AbstractMap.SimpleEntry(Resource.NW_OUT, new ResourceConfig("network.out.max.bytes.per.second", d3 -> {
        return d3 / BYTES_PER_KB;
    })), new AbstractMap.SimpleEntry(Resource.CONSUME_OUT, new ResourceConfig("consumer.out.max.bytes.per.second", d4 -> {
        return d4 / BYTES_PER_KB;
    }))}).collect(Collectors.toMap((v0) -> {
        return v0.getKey();
    }, (v0) -> {
        return v0.getValue();
    }));

    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/config/BrokerCapacityResolver$ResourceConfig.class */
    private static class ResourceConfig {
        String configName;
        DoubleUnaryOperator conversionFunc;

        ResourceConfig(String str, DoubleUnaryOperator doubleUnaryOperator) {
            this.configName = (String) Objects.requireNonNull(str);
            this.conversionFunc = (DoubleUnaryOperator) Objects.requireNonNull(doubleUnaryOperator);
        }

        Double convertConfigValue(Double d) {
            return Double.valueOf(this.conversionFunc.applyAsDouble(d.doubleValue()));
        }
    }

    @Override // com.linkedin.kafka.cruisecontrol.config.BrokerCapacityConfigResolver
    public BrokerCapacityInfo capacityForBroker(String str, String str2, int i) {
        if (this.capacitiesForBrokers.isEmpty()) {
            throw new ConfigException("not configured");
        }
        if (i < 0) {
            throw new IllegalArgumentException("The broker id(" + i + ") should be non-negative.");
        }
        BrokerCapacityInfo brokerCapacityInfo = this.capacitiesForBrokers.get(Integer.valueOf(i));
        if (brokerCapacityInfo != null) {
            return brokerCapacityInfo;
        }
        LOG.debug("Returning default broker capacity for broker {}", Integer.valueOf(i));
        return this.capacitiesForBrokers.get(-1);
    }

    @Override // com.linkedin.cruisecontrol.common.CruiseControlConfigurable
    public void configure(Map<String, ?> map) {
        LOG.info("CruiseControl: Attempting to configure Broker Capacity from config properties");
        HashMap hashMap = new HashMap();
        hashMap.put(Resource.CPU, DEFAULT_CPU_CAPACITY);
        for (Map.Entry<Resource, ResourceConfig> entry : BROKER_RESOURCE_CONFIGS.entrySet()) {
            ResourceConfig value = entry.getValue();
            Long l = (Long) map.get(value.configName);
            try {
                Double convertConfigValue = value.convertConfigValue(Double.valueOf(l.doubleValue()));
                if (convertConfigValue.doubleValue() < 0.0d || convertConfigValue == null) {
                    throw new ConfigException("Negative capacity " + convertConfigValue + " (from " + l + ") is not allowed for " + value);
                }
                hashMap.put(entry.getKey(), convertConfigValue);
            } catch (NumberFormatException e) {
                throw new ConfigException("Invalid capacity (unparseable) " + l + " for capacity measure " + value, e);
            }
        }
        Boolean bool = (Boolean) map.get(KafkaCruiseControlConfig.POPULATE_DEFAULT_DISK_CAPACITY_FROM_LOCAL_CONFIG);
        if (bool != null && bool.booleanValue()) {
            double diskCapacityForCurrentBrokerInMiB = diskCapacityForCurrentBrokerInMiB(map);
            if (diskCapacityForCurrentBrokerInMiB < 1.0d) {
                LOG.warn("Default broker disk capacity configured to {} MB", Double.valueOf(diskCapacityForCurrentBrokerInMiB));
            }
            hashMap.put(Resource.DISK, Double.valueOf(diskCapacityForCurrentBrokerInMiB));
        }
        this.capacitiesForBrokers.put(-1, BrokerCapacityInfo.builder().capacity(hashMap).estimationInfo("default broker capacity").build());
    }

    @Override // com.linkedin.kafka.cruisecontrol.config.BrokerCapacityConfigResolver
    public void updateDiskCapacityForBroker(String str, String str2, int i, double d) {
        if (i < 0) {
            LOG.warn("Received disk capacity update for broker with negative id: {}, skipping update");
            return;
        }
        BrokerCapacityInfo capacityForBroker = capacityForBroker(str, str2, i);
        double doubleValue = capacityForBroker.capacity().getOrDefault(Resource.DISK, Double.valueOf(0.0d)).doubleValue();
        if (Math.abs(doubleValue - d) <= 1.0d) {
            LOG.debug("Skipping request to update disk capacity for broker {} from {} to {} mebibytes", Integer.valueOf(i), Double.valueOf(doubleValue), Double.valueOf(d));
            return;
        }
        LOG.info("Updating disk capacity for broker {} from {} to {} mebibytes", Integer.valueOf(i), Double.valueOf(doubleValue), Double.valueOf(d));
        HashMap hashMap = new HashMap(capacityForBroker.capacity());
        hashMap.put(Resource.DISK, Double.valueOf(d));
        this.capacitiesForBrokers.put(Integer.valueOf(i), BrokerCapacityInfo.builder().capacity(hashMap).build());
    }

    private double diskCapacityForCurrentBrokerInMiB(Map<String, ?> map) {
        String str = map.get(LOG_DIRS_CONFIG) instanceof String ? (String) map.get(LOG_DIRS_CONFIG) : null;
        if (str == null || str.length() == 0) {
            throw new ConfigException("Error calculating disk capacity, invalid log dir " + str);
        }
        Collection values = new VolumeMetricsProvider(0L, new String[]{str}).getMetrics().values();
        if (values.size() > 1) {
            throw new IllegalStateException("Static disk size estimation not supported for multiple volumes");
        }
        if (values.stream().findFirst().isPresent()) {
            return ((VolumeMetricsProvider.VolumeInfo) r0.get()).totalBytes() / 1048576.0d;
        }
        throw new ConfigException("Error calculating disk capacity, couldn't find volumes for log dir" + str);
    }
}
