package com.linkedin.kafka.cruisecontrol.config;

import com.linkedin.cruisecontrol.common.CruiseControlConfigurable;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.ConsumerOutboundCapacityGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuUsageDistributionGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskUsageDistributionGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.IncrementalCPUResourceDistributionGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.IntraBrokerDiskCapacityGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.IntraBrokerDiskUsageDistributionGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderBytesInDistributionGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderReplicaDistributionGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.MirrorInboundCapacityGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.MovementExclusionGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundUsageDistributionGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundUsageDistributionGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.PotentialNwOutGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.ProducerInboundCapacityGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaDistributionGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.SystemTopicEvenDistributionGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.TopicReplicaDistributionGoal;
import com.linkedin.kafka.cruisecontrol.detector.notifier.SelfHealingNotifier;
import com.linkedin.kafka.cruisecontrol.executor.ExecutorNoopNotifier;
import com.linkedin.kafka.cruisecontrol.executor.strategy.BaseReplicaMovementStrategy;
import com.linkedin.kafka.cruisecontrol.executor.strategy.PostponeUrpReplicaMovementStrategy;
import com.linkedin.kafka.cruisecontrol.executor.strategy.PrioritizeLargeReplicaMovementStrategy;
import com.linkedin.kafka.cruisecontrol.executor.strategy.PrioritizeSmallReplicaMovementStrategy;
import io.confluent.cruisecontrol.analyzer.goals.CellAwareGoal;
import io.confluent.cruisecontrol.analyzer.goals.CrossRackMovementGoal;
import io.confluent.cruisecontrol.analyzer.goals.MaxReplicaMovementParallelismGoal;
import io.confluent.cruisecontrol.analyzer.goals.ReplicaPlacementGoal;
import io.confluent.cruisecontrol.analyzer.goals.TenantAwareGoal;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.internals.BalancerConfigs;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.eclipse.jetty.server.session.HouseKeeper;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/config/KafkaCruiseControlConfig.class */
public class KafkaCruiseControlConfig extends AbstractConfig {
    public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
    public static final String CLIENT_ID_CONFIG = "client.id";
    public static final String SEND_BUFFER_CONFIG = "send.buffer.bytes";
    public static final String RECEIVE_BUFFER_CONFIG = "receive.buffer.bytes";
    public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = "connections.max.idle.ms";
    public static final String RECONNECT_BACKOFF_MS_CONFIG = "reconnect.backoff.ms";
    public static final String REQUEST_TIMEOUT_MS_CONFIG = "request.timeout.ms";
    private static final String REQUEST_TIMEOUT_MS_DOC = "The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted.";
    public static final String DEFAULT_API_TIMEOUT_MS_CONFIG = "default.api.timeout.ms";
    private static final String DEFAULT_API_TIMEOUT_MS_DOC = "Specifies the timeout (in milliseconds) for client APIs. This configuration is used as the default timeout for all client operations that do not specify a <code>timeout</code> parameter.";
    public static final int DEFAULT_API_TIMEOUT_MS_DEFAULT = 60000;
    private static final String PARTITION_METRIC_SAMPLE_AGGREGATOR_COMPLETENESS_CACHE_SIZE_DOC = "The metric sample aggregator caches the completeness metadata for fast query. The completeness describes the confidence level of the data in the metric sample aggregator. It is primarily measured by the validity of the metricssamples in different windows. This configuration configures The number of completeness cache slots to maintain.";
    private static final String BROKER_METRIC_SAMPLE_AGGREGATOR_COMPLETENESS_CACHE_SIZE_DOC = "The metric sample aggregator caches the completeness metadata for fast query. The completeness describes the confidence level of the data in the metric sample aggregator. It is primarily measured by the validity of the metricssamples in different windows. This configuration configures The number of completeness cache slots to maintain.";
    public static final String NETWORK_IN_CAPACITY_BYTES_CONFIG = "network.in.max.bytes.per.second";
    public static final String PRODUCER_IN_CAPACITY_BYTES_CONFIG = "producer.in.max.bytes.per.second";
    private static final String NETWORK_IN_CAPACITY_BYTES_DOC = "This config specifies the upper capacity limit for network incoming bytes per second per broker. The Confluent DataBalancer will attempt to keep incoming data throughput below this limit.";
    public static final String NETWORK_OUT_CAPACITY_BYTES_CONFIG = "network.out.max.bytes.per.second";
    public static final String CONSUMER_OUT_CAPACITY_BYTES_CONFIG = "consumer.out.max.bytes.per.second";
    private static final String NETWORK_OUT_CAPACITY_BYTES_DOC = "This config specifies the upper capacity limit for network outgoing bytes per second per broker. The Confluent DataBalancer will attempt to keep outgoing data throughput below this limit.";
    public static final String CPU_BALANCE_THRESHOLD_CONFIG = "cpu.balance.threshold";
    private static final String CPU_BALANCE_THRESHOLD_DOC = "The maximum allowed extent of unbalance for CPU utilization. Under relative threshold calculation, 1.10 means the highest CPU usage of a broker should not be above 1.10x of average CPU utilization of all the brokers. Under absolute threshold calculation, 1.10 means the highest CPU usage of a broker should not be 10% more than the average CPU utilization of the all the brokers. The lower threshold is symmetric to the upper threshold.";
    public static final double DEFAULT_TOPIC_BALANCING_BALANCE_THRESHOLD_MULTIPLIER = 1.0d;
    public static final double DEFAULT_TOPIC_BALANCING_TRIGGER_THRESHOLD_MULTIPLIER = 3.0d;
    public static final String GOAL_VIOLATION_DISTRIBUTION_THRESHOLD_MULTIPLIER_CONFIG = "goal.violation.distribution.threshold.multiplier";
    public static final double DEFAULT_GOAL_VIOLATION_DISTRIBUTION_THRESHOLD_MULTIPLIER = 1.1d;
    private static final String GOAL_VIOLATION_DISTRIBUTION_THRESHOLD_MULTIPLIER_DOC = "The multiplier applied to the threshold of distribution goals used for detecting and fixing anomalies. For example, 2.50 means the threshold for each distribution goal (i.e. Replica Distribution, Leader Replica Distribution, Resource Distribution, and Topic Replica Distribution Goals) will be 2.50x of the value used in manual goal optimization requests (e.g. rebalance).";
    public static final String DISK_CAPACITY_THRESHOLD_CONFIG = "disk.max.load";
    public static final String DISK_CAPACITY_MIN_FREE_SPACE_CONFIG = "disk.min.free.space.gb";
    public static final String MAX_REPLICAS_PER_BROKER_CONFIG = "max.replicas";
    public static final String REPLICATION_THROTTLE_CONFIG = "throttle.bytes.per.second";
    private static final int DEFAULT_SHUTDOWN_TIMEOUT_MS = 15000;
    private static final int DEFAULT_CC_STARTUP_RETRY_DURATION_HOURS = 2;
    private static final int DEFAULT_CC_STARTUP_RETRY_DELAY_MINUTES = 5;
    private static final boolean POPULATE_DEFAULT_DISK_CAPACITY_FROM_LOCAL_DEFAULT = true;
    public static final String GOALS_CONFIG = "goals";
    public static final String ANOMALY_DETECTION_GOALS_CONFIG = "anomaly.detection.goals";
    public static final String GOAL_VIOLATION_DELAY_ON_NEW_BROKER_MS_CONFIG = "goal.violation.delay.on.new.brokers.ms";
    public static final String GOAL_VIOLATION_DELAY_ON_NEW_BROKER_MS_DOC = "Amount of time (in ms) to delay self-healing when new brokers join the cluster. Allows the cluster to stabilize.";
    public static final long GOAL_VIOLATION_DELAY_ON_NEW_BROKERS_MS_DEFAULT = 1800000;
    public static final String PLAN_COMPUTATION_RETRY_TIMEOUT_MS_CONFIG = "confluent.balancer.plan.computation.retry.timeout.ms";
    public static final String PLAN_COMPUTATION_RETRY_INTERVAL_MS_DOC = "The time, in milliseconds, after which plan computation will give up on retrying failures due to insufficient metrics or ongoing reassignments. Note that this only applies to the plan computation used for broker removal and broker addition operations.";
    public static final double DEFAULT_REQUEST_CONTRIBUTION_WEIGHT = 0.8d;
    public static final double DEFAULT_BYTES_CONTRIBUTION_WEIGHT = 0.2d;
    public static final String CAPACITY_BALANCING_DELTA_PERCENTAGE_CONFIG = "max.capacity.balancing.delta.percentage";
    public static final String CAPACITY_BALANCING_DELTA_PERCENTAGE_DOC = "The extra percentage of total capacity that SBC will attempt to balance below the allowed capacity once it gets triggered to fix a broker hitting its maximum allowed capacity for a given resource.";
    public static final String INCREMENTAL_BALANCING_ENABLED_CONFIG = "incremental.balancing.enabled";
    public static final String INCREMENTAL_BALANCING_ENABLED_DOC = "A boolean value controlling whether to use incremental balancing strategy or not.";
    public static final String INCREMENTAL_BALANCING_GOALS_CONFIG = "incremental.balancing.goals";
    public static final String INCREMENTAL_BALANCING_GOALS_DOC = "A list of case insensitive goals in the order of priority. The high priority goals will be executed first. This list of goals will be used for anomaly detection and balancing if incremental.balancing.enabled is set to true.";
    public static final String INCREMENTAL_BALANCING_STEP_RATIO_CONFIG = "incremental.balancing.step.ratio";
    public static final String INCREMENTAL_BALANCING_STEP_RATIO_DOC = "A value in the interval [0, 1] representing the step size of each incremental balancing round. For example, if an overloaded broker has 70% CPU usage and the mean CPU usage across the cluster is 50%, setting incremental.balancing.step.ratio to 0.2 will generate incremental balancing proposals that moves enough replicas off from the overloaded broker so that its CPU usage drops by 20% of the delta: (70% - 50%) * 0.2 = 4%.";
    public static final String INCREMENTAL_BALANCING_LOWER_BOUND_CONFIG = "incremental.balancing.lower.bound";
    public static final String INCREMENTAL_BALANCING_LOWER_BOUND_DOC = "A value in the interval [0, 1] representing the lower bound of replica movements from each incremental balancing round to prevent the long tail effect of incremental balancing. Take CPU usage for example, setting incremental.balancing.lower.bound to 0.02 will cause each incremental balancing round generates enough movements so that the change in CPU usage is at least 0.02 * 100% (Capacity of CPU usage) = 2% of the capacity. Combining with incremental.balancing.step.ratio, these two configurations facilitate the success criteria of an incremental balancing round.";
    public static final String INCREMENTAL_BALANCING_MIN_VALID_WINDOWS_CONFIG = "incremental.balancing.min.valid.windows";
    public static final String INCREMENTAL_BALANCING_MIN_VALID_WINDOWS_DOC = "An positive integer denoting the minimum number of the metric windows required by the incremental balancing algorithm to compute a cluster balancing plan. Setting this to a higher value will cause incremental algorithm to wait for more metric windows to be available before computing a balancing plan. This can help reducing the metric spikes by smoothing the metric values at the expense of longer time interval between each balancing round. The value cannot be greater than the total number of metric windows kept by SBC.";
    public static final int INCREMENTAL_BALANCING_MIN_VALID_WINDOWS_DEFAULT = 5;
    public static final String SBC_METRICS_PARSER_ENABLED_CONFIG = "sbc.metrics.parser.enabled";
    public static final String SBC_METRICS_PARSER_ENABLED_DOC = "A boolean value controlling whether to use the sbc metric parser or the default one.";
    public static final String SELF_HEALING_MAXIMUM_ROUNDS_CONFIG = "self.healing.maximum.rounds";
    public static final String SELF_HEALING_MAXIMUM_ROUNDS_DOC = "A positive integer denoting the maximum number of balancing rounds that one self-healing operation can contain. One balancing round is defined as one execution of the self-healing goal. We consider a self-healing operation as successful if every broker is balanced within the rebalancing resource utilization threshold OR maximum number of self-healing goal execution iteration has been reached.";
    public static final int SELF_HEALING_MAXIMUM_ROUNDS_DEFAULT = 1;
    public static final String CPU_UTILIZATION_DETECTOR_ENABLED_CONFIG = "cpu.utilization.detector.enabled";
    private static final boolean DEFAULT_CPU_UTILIZATION_DETECTOR_ENABLED = false;
    private static final String CPU_UTILIZATION_DETECTOR_ENABLED_DOC = "Specify if cpu optimization detector is enabled";
    public static final String DISK_UTILIZATION_DETECTOR_ENABLED_CONFIG = "disk.utilization.detector.enabled";
    private static final boolean DEFAULT_DISK_UTILIZATION_DETECTOR_ENABLED = false;
    private static final String DISK_UTILIZATION_DETECTOR_ENABLED_DOC = "Specify if disk optimization detector is enabled";
    public static final String V2_ADDITION_ENABLED_CONFIG = "v2.addition.enabled";
    private static final boolean DEFAULT_V2_ADDITION_ENABLED = false;
    private static final String BALANCER_V2_ADDITION_ENABLED_DOC = "A boolean denoting whether the version 2 of the broker addition operation is enabled. Because V2 broker addition relies exclusively on self healing to add the broker to the cluster, its enablement also depends on incremental balancing being enabled ('confluent.balancer.incremental.balancing.enabled'). To enable V2 broker addition, configure both this config and 'confluent.balancer.incremental.balancing.enabled' to true.";
    public static final String RESOURCE_UTILIZATION_DETECTOR_INTERVAL_MS_CONFIG = "resource.utilization.detector.interval.ms";
    public static final int DEFAULT_RESOURCE_UTILIZATION_DETECTOR_INTERVAL_MS = 60000;
    private static final String RESOURCE_UTILIZATION_DETECTOR_INTERVAL_MS_DOC = "The interval in milliseconds that the resource optimization detector will run.";
    public static final String CPU_UTILIZATION_DETECTOR_DURATION_MS_CONFIG = "cpu.utilization.detector.duration.ms";
    private static final int DEFAULT_CPU_UTILIZATION_DETECTOR_DURATION_MS = 600000;
    private static final String CPU_UTILIZATION_DETECTOR_DURATION_MS_DOC = "The duration in milliseconds for which resource utilization values will be accumulated before the decision to raise an alert is made.";
    public static final String CPU_UTILIZATION_DETECTOR_OVERUTILIZATION_THRESHOLD_CONFIG = "cpu.utilization.detector.overutilization.threshold";
    private static final double DEFAULT_CPU_UTILIZATION_DETECTOR_OVERUTILIZATION_THRESHOLD = 80.0d;
    private static final String CPU_UTILIZATION_DETECTOR_OVERUTILIZATION_THRESHOLD_DOC = "CPU usage percentage representing overutilization.";
    public static final String CPU_UTILIZATION_DETECTOR_UNDERUTILIZATION_THRESHOLD_CONFIG = "cpu.utilization.detector.underutilization.threshold";
    private static final double DEFAULT_CPU_UTILIZATION_DETECTOR_UNDERUTILIZATION_THRESHOLD = 50.0d;
    private static final String CPU_UTILIZATION_DETECTOR_UNDERUTILIZATION_THRESHOLD_DOC = "CPU usage percentage representing underutilization.";
    public static final String BROKER_ADDITION_DETECTOR_CPU_PERCENT_COMPLETION_THRESHOLD_CONFIG = "broker.addition.mean.cpu.percent.completion.threshold";
    private static final double DEFAULT_BROKER_ADDITION_DETECTOR_CPU_PERCENT_COMPLETION_THRESHOLD = 0.5d;
    private static final String BROKER_ADDITION_DETECTOR_CPU_PERCENT_COMPLETION_THRESHOLD_DOC = "A double denoting what percentage of the cluster's mean CPU utilization a single broker needs to reach in order to be considered added to the cluster.Once a broker reaches this portion of the cluster's mean cpu utilization threshold, we will consider the broker addition operation for that particular broker complete. For example,If a cluster has mean CPU Utilization of 50%, a new broker is added and threshold is .5, then we would consider the broker addition complete once the broker reaches 25%(50% * .5) CPU Utilization.";
    public static final String BROKER_ADDITION_DETECTOR_COMPLETION_DURATION_THRESHOLD_CONFIG = "broker.addition.elapsed.time.ms.completion.threshold";
    private static final int DEFAULT_BROKER_ADDITION_DETECTOR_COMPLETION_DURATION_THRESHOLD = 57600000;
    private static final String BROKER_ADDITION_DETECTOR_COMPLETION_DURATION_THRESHOLD_DOC = "An integer denoting the milliseconds after which a broker addition operation should be considered complete.This is a safety measure to fall back to completion if the target 'broker.addition.mean.cpu.percent.completion.threshold' CPU threshold cannot be reached in due time.";
    public static final String DISK_UTILIZATION_DETECTOR_DURATION_MS_CONFIG = "disk.utilization.detector.duration.ms";
    private static final int DEFAULT_DISK_UTILIZATION_DETECTOR_DURATION_MS = 600000;
    private static final String DISK_UTILIZATION_DETECTOR_DURATION_MS_DOC = "The duration in milliseconds for which disk utilization values will be accumulated before the decision to raise an alert is made.";
    public static final String DISK_UTILIZATION_DETECTOR_OVERUTILIZATION_THRESHOLD_CONFIG = "disk.utilization.detector.overutilization.threshold";
    private static final double DEFAULT_DISK_UTILIZATION_DETECTOR_OVERUTILIZATION_THRESHOLD = 80.0d;
    private static final String DISK_UTILIZATION_DETECTOR_OVERUTILIZATION_THRESHOLD_DOC = "Disk usage percentage representing overutilization.";
    public static final String DISK_UTILIZATION_DETECTOR_UNDERUTILIZATION_THRESHOLD_CONFIG = "disk.utilization.detector.underutilization.threshold";
    private static final double DEFAULT_DISK_UTILIZATION_DETECTOR_UNDERUTILIZATION_THRESHOLD = 35.0d;
    private static final String DISK_UTILIZATION_DETECTOR_UNDERUTILIZATION_THRESHOLD_DOC = "Disk usage percentage representing underutilization.";
    public static final String DISK_UTILIZATION_DETECTOR_RESERVED_CAPACITY_CONFIG = "disk.utilization.detector.reserved.capacity";
    private static final double DEFAULT_DISK_UTILIZATION_DETECTOR_RESERVED_CAPACITY = 150000.0d;
    private static final String DISK_UTILIZATION_DETECTOR_RESERVED_CAPACITY_DOC = "Minimum disk space beyond which we don't shrink the cluster even if it is underutilized.";
    public static final String TOPIC_PARTITION_MOVEMENT_EXPIRATION_MS_CONFIG = "topic.partition.movement.expiration.ms";
    public static final long DEFAULT_TOPIC_PARTITION_MOVEMENT_EXPIRATION_MS = 10800000;
    public static final String TOPIC_PARTITION_MOVEMENT_EXPIRATION_MS_DOC = "The duration in milliseconds for which a topic partition movement will be kept for observability and oscillation prevention. Effectively, SBC maintains a rolling window of topic partition movements whose length is defined by this configuration.";
    public static final String TOPIC_PARTITION_MAXIMUM_MOVEMENTS_CONFIG = "topic.partition.maximum.movements";
    public static final int DEFAULT_TOPIC_PARTITION_MAXIMUM_MOVEMENTS = 5;
    public static final String TOPIC_PARTITION_MAXIMUM_MOVEMENTS_DOC = "Maximum number of repeated movements allowed for one TopicPartition in the rolling window whose length is defined by topic.partition.movement.expiration.ms";
    public static final String TOPIC_PARTITION_SUSPENSION_MS_CONFIG = "topic.partition.suspension.ms";
    public static final long DEFAULT_TOPIC_PARTITION_SUSPENSION_MS = 18000000;
    public static final String TOPIC_PARTITION_SUSPENSION_MS_DOC = "The duration in milliseconds for which a topic partition will be suspended for goal optimization if number of repeated movements within the rolling window defined by topic.partition.movement.expiration.ms exceeds the maximum allowed number defined by topic.partition.maximum.movements.";
    public static final String ENABLE_CELLS_CONFIG = "confluent.cells.enable";
    public static final String ENABLE_CELLS_DOC = "Whether to enable the cells feature (sub-flags also need to be enabled to turn on cells)";
    private UpdatableSbcGoalsConfig updatableSbcGoalsConfig;
    private static final Long DEFAULT_LEADER_ACTION_TIMEOUT_MS = 180000L;
    public static final Long DEFAULT_METADATA_TTL = 10000L;
    public static final Integer DEFAULT_METADATA_CLIENT_TIMEOUT_MS = 180000;
    public static final Long DEFAULT_PARTITION_METRICS_WINDOW_MS = 180000L;
    public static final Integer DEFAULT_NUM_PARTITION_METRICS_WINDOWS = 12;
    public static final Integer DEFAULT_MIN_SAMPLES_PER_PARTITION_METRICS_WINDOW = 1;
    public static final Double DEFAULT_MIN_VALID_PARTITION_RATIO = Double.valueOf(0.95d);
    public static final Double HOT_PARTITION_CAPACITY_UTILIZATION_THRESHOLD_DEFAULT = Double.valueOf(0.2d);
    public static final Double DEFAULT_CPU_BALANCE_THRESHOLD = Double.valueOf(1.1d);
    public static final Double DEFAULT_DISK_BALANCE_THRESHOLD = Double.valueOf(1.1d);
    public static final Double DEFAULT_NETWORK_INBOUND_BALANCE_THRESHOLD = Double.valueOf(1.1d);
    public static final Double DEFAULT_NETWORK_OUTBOUND_BALANCE_THRESHOLD = Double.valueOf(1.1d);
    public static final Double DEFAULT_CPU_CAPACITY_THRESHOLD = Double.valueOf(1.0d);
    public static final Double DEFAULT_DISK_CAPACITY_THRESHOLD = ConfluentConfigs.BALANCER_DISK_CAPACITY_THRESHOLD_DEFAULT;
    public static final Integer DEFAULT_DISK_CAPACITY_MIN_FREE_SPACE = ConfluentConfigs.BALANCER_DISK_MIN_FREE_SPACE_DEFAULT;
    public static final Double DEFAULT_NETWORK_INBOUND_CAPACITY_THRESHOLD = Double.valueOf(0.8d);
    public static final Double DEFAULT_PRODUCER_INBOUND_CAPACITY_THRESHOLD = Double.valueOf(0.9d);
    public static final Double DEFAULT_CONSUMER_OUTBOUND_CAPACITY_THRESHOLD = Double.valueOf(0.9d);
    public static final Double DEFAULT_NETWORK_OUTBOUND_CAPACITY_THRESHOLD = Double.valueOf(0.8d);
    public static final Double DEFAULT_CPU_LOW_UTILIZATION_THRESHOLD = Double.valueOf(0.2d);
    public static final Double DEFAULT_DISK_LOW_UTILIZATION_THRESHOLD = Double.valueOf(0.2d);
    public static final Double DEFAULT_NETWORK_INBOUND_LOW_UTILIZATION_THRESHOLD = Double.valueOf(0.2d);
    public static final Double DEFAULT_NETWORK_OUTBOUND_LOW_UTILIZATION_THRESHOLD = Double.valueOf(0.2d);
    public static final Long DISABLED_THROTTLE = ConfluentConfigs.BALANCER_THROTTLE_NO_THROTTLE;
    public static final long AUTO_THROTTLE = ConfluentConfigs.BALANCER_THROTTLE_AUTO_THROTTLE.longValue();
    private static final Long DEFAULT_THROTTLE_VALUE = ConfluentConfigs.BALANCER_THROTTLE_DEFAULT;
    static final List<String> SBC_HARD_GOALS = Arrays.asList(MovementExclusionGoal.class.getName(), ReplicaPlacementGoal.class.getName(), RackAwareGoal.class.getName(), CellAwareGoal.class.getName(), TenantAwareGoal.class.getName(), MaxReplicaMovementParallelismGoal.class.getName(), ReplicaCapacityGoal.class.getName(), DiskCapacityGoal.class.getName(), NetworkInboundCapacityGoal.class.getName(), NetworkOutboundCapacityGoal.class.getName(), ProducerInboundCapacityGoal.class.getName(), MirrorInboundCapacityGoal.class.getName(), ConsumerOutboundCapacityGoal.class.getName());
    static final List<String> SBC_V1_SOFT_GOALS = Arrays.asList(SystemTopicEvenDistributionGoal.class.getName(), ReplicaDistributionGoal.class.getName(), DiskUsageDistributionGoal.class.getName(), LeaderReplicaDistributionGoal.class.getName(), NetworkInboundUsageDistributionGoal.class.getName(), NetworkOutboundUsageDistributionGoal.class.getName(), TopicReplicaDistributionGoal.class.getName(), LeaderBytesInDistributionGoal.class.getName());
    public static final List<String> DEFAULT_GOALS_LIST = (List) Stream.concat(SBC_HARD_GOALS.stream(), SBC_V1_SOFT_GOALS.stream()).collect(Collectors.toList());
    public static final List<String> MANUAL_REBALANCE_GOALS_LIST = Arrays.asList(ConsumerOutboundCapacityGoal.class.getName(), CpuCapacityGoal.class.getName(), CpuUsageDistributionGoal.class.getName(), DiskCapacityGoal.class.getName(), DiskUsageDistributionGoal.class.getName(), IncrementalCPUResourceDistributionGoal.class.getName(), IntraBrokerDiskCapacityGoal.class.getName(), IntraBrokerDiskUsageDistributionGoal.class.getName(), LeaderBytesInDistributionGoal.class.getName(), LeaderReplicaDistributionGoal.class.getName(), MovementExclusionGoal.class.getName(), NetworkInboundCapacityGoal.class.getName(), NetworkInboundUsageDistributionGoal.class.getName(), NetworkOutboundCapacityGoal.class.getName(), NetworkOutboundUsageDistributionGoal.class.getName(), PotentialNwOutGoal.class.getName(), ProducerInboundCapacityGoal.class.getName(), RackAwareGoal.class.getName(), ReplicaCapacityGoal.class.getName(), ReplicaDistributionGoal.class.getName(), SystemTopicEvenDistributionGoal.class.getName(), TopicReplicaDistributionGoal.class.getName(), CellAwareGoal.class.getName(), CrossRackMovementGoal.class.getName(), MaxReplicaMovementParallelismGoal.class.getName(), ReplicaPlacementGoal.class.getName(), TenantAwareGoal.class.getName());
    private static final String DEFAULT_ANOMALY_NOTIFIER_CLASS = SelfHealingNotifier.class.getName();
    private static final String DEFAULT_EXECUTOR_NOTIFIER_CLASS = ExecutorNoopNotifier.class.getName();
    public static final Integer DEFAULT_ANOMALY_DETECTION_INTERVAL_MS = 60000;
    public static final List<String> DEFAULT_ANOMALY_DETECTION_GOALS_LIST = Arrays.asList(ReplicaPlacementGoal.class.getName(), RackAwareGoal.class.getName(), CellAwareGoal.class.getName(), TenantAwareGoal.class.getName(), MaxReplicaMovementParallelismGoal.class.getName(), ReplicaCapacityGoal.class.getName(), DiskCapacityGoal.class.getName(), NetworkInboundCapacityGoal.class.getName(), NetworkOutboundCapacityGoal.class.getName(), ProducerInboundCapacityGoal.class.getName(), ReplicaDistributionGoal.class.getName(), DiskUsageDistributionGoal.class.getName());
    public static final Long DEFAULT_BROKER_FAILURE_ALERT_THRESHOLD_MS = 0L;
    public static final Long DEFAULT_BROKER_FAILURE_SELF_HEALING_THRESHOLD_MS = 1800000L;
    private static final Integer LOGDIR_REPSONSE_TIMEOUT_MS_DEFAULT = 30000;
    public static final Integer DEFAULT_DESCRIBE_TOPICS_RESPONSE_TIMEOUT_MS = 30000;
    public static final Integer DEFAULT_DESCRIBE_CLUSTER_RESPONSE_TIMEOUT_MS = 30000;
    public static final Integer DEFAULT_DESCRIBE_CONFIGS_RESPONSE_TIMEOUT_MS = 30000;
    public static final Integer DEFAULT_ALTER_CONFIGS_RESPONSE_TIMEOUT_MS = 30000;
    public static final Integer DEFAULT_DESCRIBE_CONFIGS_BATCH_SIZE = 1000;
    public static final Long DEFAULT_BROKER_REMOVAL_SHUTDOWN_MS = Long.valueOf(HouseKeeper.DEFAULT_PERIOD_MS);
    public static final Integer DEFAULT_BROKER_REPLICA_EXCLUSION_TIMEOUT_MS = 120000;
    public static final Integer DEFAULT_DESCRIBE_BROKER_REPLICA_EXCLUSION_TIMEOUT_MS = 60000;
    private static final Boolean DYNAMIC_THROTTLING_ENABLED_DEFAULT = true;
    public static final Double DEFAULT_WRITE_THROUGHPUT_MULTIPLIER = Double.valueOf(1.0d);
    public static final Double DEFAULT_READ_THROUGHPUT_MULTIPLIER = Double.valueOf(1.0d);
    public static final Long PLAN_COMPUTATION_RETRY_TIMEOUT_MS_DEFAULT = Long.valueOf(BalancerConfigs.BALANCER_PLAN_COMPUTATION_RETRY_TIMEOUT_MS_DEFAULT);
    public static final Long INVALID_REPLICA_ASSIGNMENT_RETRY_TIMEOUT_MS_DEFAULT = Long.valueOf(TimeUnit.MINUTES.toMillis(5));
    public static final Long EXECUTOR_REFRESH_TIME_MS_DEFAULT = Long.valueOf(TimeUnit.MINUTES.toMillis(1));
    public static final Double CAPACITY_BALANCING_DELTA_PERCENTAGE_DEFAULT = BalancerConfigs.BALANCER_CAPACITY_BALANCING_DELTA_PERCENTAGE_CONFIG_DEFAULT;
    public static final Boolean INCREMENTAL_BALANCING_ENABLED_DEFAULT = BalancerConfigs.BALANCER_INCREMENTAL_BALANCING_ENABLED_DEFAULT;
    static final List<String> SBC_V2_SOFT_GOALS = Collections.singletonList(IncrementalCPUResourceDistributionGoal.class.getName());
    public static final List<String> INCREMENTAL_BALANCING_DEFAULT_GOALS_LIST = (List) Stream.concat(SBC_HARD_GOALS.stream(), SBC_V2_SOFT_GOALS.stream()).collect(Collectors.toList());
    public static final Double INCREMENTAL_BALANCING_STEP_RATIO_DEFAULT = BalancerConfigs.BALANCER_INCREMENTAL_BALANCING_STEP_RATIO_DEFAULT;
    public static final Double INCREMENTAL_BALANCING_LOWER_BOUND_DEFAULT = BalancerConfigs.BALANCER_INCREMENTAL_BALANCING_LOWER_BOUND_DEFAULT;
    public static final Boolean SBC_METRICS_PARSER_ENABLED_DEFAULT = BalancerConfigs.SBC_METRICS_PARSER_ENABLED_DEFAULT;
    public static final Boolean ENABLE_CELLS_DEFAULT = false;
    public static final String METADATA_TTL_CONFIG = "metadata.ttl";
    private static final String METADATA_TTL_DOC = "The amount of time the MetadataClient will cache cluster metadata before refreshing";
    public static final String METADATA_CLIENT_TIMEOUT_MS_CONFIG = "metadata.client.timeout.ms";
    private static final String METADATA_CLIENT_TIMEOUT_MS_DOC = "The timeout, in milliseconds, after which the metadata client should give up trying to fetch the latest metadata";
    public static final String LEADER_ACTION_TIMEOUT_MS_CONFIG = "executor.leader.action.timeout.ms";
    private static final String LEADER_ACTION_TIMEOUT_MS_DOC = "The maximum time that the Executor will wait for a leader movement to finish. A leader movement will be marked as failed if it takes longer than this time to finish.";
    public static final String PARTITION_METRICS_WINDOW_MS_CONFIG = "partition.metrics.window.ms";
    private static final String PARTITION_METRICS_WINDOW_MS_DOC = "The size of the window in milliseconds to aggregate the Kafka partition metrics.";
    public static final String NUM_PARTITION_METRICS_WINDOWS_CONFIG = "num.partition.metrics.windows";
    private static final String NUM_PARTITION_METRICS_WINDOWS_DOC = "The total number of windows to keep for partition metric samples";
    public static final String MIN_SAMPLES_PER_PARTITION_METRICS_WINDOW_CONFIG = "min.samples.per.partition.metrics.window";
    private static final String MIN_SAMPLES_PER_PARTITION_METRICS_WINDOW_DOC = "The minimum number of PartitionMetricSamples needed to make a partition metrics window valid without extrapolation.";
    public static final String MAX_ALLOWED_EXTRAPOLATIONS_PER_PARTITION_CONFIG = "max.allowed.extrapolations.per.partition";
    private static final String MAX_ALLOWED_EXTRAPOLATIONS_PER_PARTITION_DOC = "The maximum allowed number of extrapolations for each partition. A partition will be considered as invalid if the total number extrapolations in all the windows goes above this number.";
    public static final String PARTITION_METRIC_SAMPLE_AGGREGATOR_COMPLETENESS_CACHE_SIZE_CONFIG = "partition.metric.sample.aggregator.completeness.cache.size";
    public static final String REMOVAL_HISTORY_RETENTION_TIME_MS_CONFIG = "removal.history.retention.time.ms";
    private static final String REMOVAL_HISTORY_RETENTION_TIME_MS_DOC = "The maximum time in milliseconds to retain the removal history of brokers.";
    public static final String MAX_ALLOWED_EXTRAPOLATIONS_PER_BROKER_CONFIG = "max.allowed.extrapolations.per.broker";
    private static final String MAX_ALLOWED_EXTRAPOLATIONS_PER_BROKER_DOC = "The maximum allowed number of extrapolations for each broker. A broker will be considered as invalid if the total number extrapolations in all the windows goes above this number.";
    public static final String BROKER_METRIC_SAMPLE_AGGREGATOR_COMPLETENESS_CACHE_SIZE_CONFIG = "broker.metric.sample.aggregator.completeness.cache.size";

    @Deprecated
    public static final String NUM_METRIC_FETCHERS_CONFIG = "num.metric.fetchers";
    private static final String NUM_METRIC_FETCHERS_DOC = "The number of metric fetchers to fetch from the Kafka cluster.";
    public static final String NUM_CACHED_RECENT_ANOMALY_STATES_CONFIG = "num.cached.recent.anomaly.states";
    public static final String NUM_CACHED_RECENT_ANOMALY_STATES_DOC = "The number of recent anomaly states cached for different anomaly types presented via the anomaly substate response of the state endpoint.";
    public static final String METRIC_SAMPLER_CLASS_CONFIG = "metric.sampler.class";
    public static final String METRIC_SAMPLER_CLASS_DEFAULT = "io.confluent.cruisecontrol.metricsreporter.ConfluentTelemetryReporterSampler";
    private static final String METRIC_SAMPLER_CLASS_DOC = "The class name of the metric sampler";
    public static final String BROKER_CAPACITY_CONFIG_RESOLVER_CLASS_CONFIG = "broker.capacity.config.resolver.class";
    private static final String BROKER_CAPACITY_CONFIG_RESOLVER_CLASS_DOC = "The broker capacity configuration resolver class name. The broker capacity configuration resolver is responsible for getting the broker capacity. The default implementation is a file based solution.";
    public static final String GOAL_BALANCEDNESS_PRIORITY_WEIGHT_CONFIG = "goal.balancedness.priority.weight";
    private static final String GOAL_BALANCEDNESS_PRIORITY_WEIGHT_DOC = "The impact of having one level higher goal priority on the relative balancedness score. For example, 1.1 means that a goal with higher priority will have the 1.1x balancedness weight of the lower priority goal (assuming the same goal.balancedness.strictness.weight values for both goals).";
    public static final String GOAL_BALANCEDNESS_STRICTNESS_WEIGHT_CONFIG = "goal.balancedness.strictness.weight";
    private static final String GOAL_BALANCEDNESS_STRICTNESS_WEIGHT_DOC = "The impact of strictness (i.e. hard or soft goal) on the relative balancedness score. For example, 1.5 means that a hard goal will have the 1.5x balancedness weight of a soft goal (assuming goal.balancedness.priority.weight is 1).";
    public static final String MIN_VALID_PARTITION_RATIO_CONFIG = "min.valid.partition.ratio";
    private static final String MIN_VALID_PARTITION_RATIO_DOC = "The minimum percentage of the total partitions required to be monitored in order to generate a valid load model. Because the topic and partitions in a Kafka cluster are dynamically changing. The load monitor will exclude some of the topics that does not have sufficient metric samples. This configuration defines the minimum required percentage of the partitions that must be included in the load model.";
    public static final String LEADER_NETWORK_INBOUND_WEIGHT_FOR_CPU_UTIL_CONFIG = "leader.network.inbound.weight.for.cpu.util";
    private static final String LEADER_NETWORK_INBOUND_WEIGHT_FOR_CPU_UTIL_DOC = "Kafka Cruise Control uses the following model to derive replica level CPU utilization: REPLICA_CPU_UTIL = a * LEADER_BYTES_IN_RATE + b * LEADER_BYTES_OUT_RATE + c * FOLLOWER_BYTES_IN_RATE.This configuration will be used as the weight for LEADER_BYTES_IN_RATE.";
    public static final String LEADER_NETWORK_OUTBOUND_WEIGHT_FOR_CPU_UTIL_CONFIG = "leader.network.outbound.weight.for.cpu.util";
    private static final String LEADER_NETWORK_OUTBOUND_WEIGHT_FOR_CPU_UTIL_DOC = "Kafka Cruise Control uses the following model to derive replica level CPU utilization: REPLICA_CPU_UTIL = a * LEADER_BYTES_IN_RATE + b * LEADER_BYTES_OUT_RATE + c * FOLLOWER_BYTES_IN_RATE.This configuration will be used as the weight for LEADER_BYTES_OUT_RATE.";
    public static final String FOLLOWER_NETWORK_INBOUND_WEIGHT_FOR_CPU_UTIL_CONFIG = "follower.network.inbound.weight.for.cpu.util";
    private static final String FOLLOWER_NETWORK_INBOUND_WEIGHT_FOR_CPU_UTIL_DOC = "Kafka Cruise Control uses the following model to derive replica level CPU utilization: REPLICA_CPU_UTIL = a * LEADER_BYTES_IN_RATE + b * LEADER_BYTES_OUT_RATE + c * FOLLOWER_BYTES_IN_RATE.This configuration will be used as the weight for FOLLOWER_BYTES_IN_RATE.";
    public static final String HOT_PARTITION_CAPACITY_UTILIZATION_THRESHOLD_CONFIG = "hot.partition.capacity.utilization.threshold";
    private static final String HOT_PARTITION_CAPACITY_UTILIZATION_THRESHOLD_DOC = "The maximum allowed percentage threshold of resource utilization that a partition can take from a single broker, before said partition is categorized as hot. A hot partition is simply a partition with heavy resource usage and is solely marked as such for observability purposes - it is not handled in any special way. For example, 0.2 means that if a partition takes more than 20% of a broker's resource capacity, it is considered hot.";
    public static final String DISK_BALANCE_THRESHOLD_CONFIG = "disk.balance.threshold";
    private static final String DISK_BALANCE_THRESHOLD_DOC = "The maximum allowed extent of unbalance for disk utilization. For example, 1.10 means the highest disk usage of a broker should not be above 1.10x of average disk utilization of all the brokers.";
    public static final String NETWORK_INBOUND_BALANCE_THRESHOLD_CONFIG = "network.inbound.balance.threshold";
    private static final String NETWORK_INBOUND_BALANCE_THRESHOLD_DOC = "The maximum allowed extent of unbalance for network inbound usage. For example, 1.10 means the highest network inbound usage of a broker should not be above 1.10x of average network inbound usage of all the brokers.";
    public static final String NETWORK_OUTBOUND_BALANCE_THRESHOLD_CONFIG = "network.outbound.balance.threshold";
    private static final String NETWORK_OUTBOUND_BALANCE_THRESHOLD_DOC = "The maximum allowed extent of unbalance for network outbound usage. For example, 1.10 means the highest network outbound usage of a broker should not be above 1.10x of average network outbound usage of all the brokers.";
    public static final String REPLICA_COUNT_BALANCE_THRESHOLD_CONFIG = "replica.count.balance.threshold";
    private static final String REPLICA_COUNT_BALANCE_THRESHOLD_DOC = "The maximum allowed extent of unbalance for replica distribution. For example, 1.10 means the highest replica count of a broker should not be above 1.10x of average replica count of all brokers.";
    public static final String LEADER_REPLICA_COUNT_BALANCE_THRESHOLD_CONFIG = "leader.replica.count.balance.threshold";
    private static final String LEADER_REPLICA_COUNT_BALANCE_THRESHOLD_DOC = "The maximum allowed extent of unbalance for leader replica distribution. For example, 1.10 means the highest leader replica count of a broker should not be above 1.10x of average leader replica count of all alive brokers.";
    public static final String TOPIC_BALANCING_BALANCE_THRESHOLD_MULTIPLIER_CONFIG = "topic.balancing.balance.threshold.multiplier";
    private static final String TOPIC_BALANCING_BALANCE_THRESHOLD_MULTIPLIER_DOC = "The maximum allowed extent of unbalance for replica distribution from each topic,also called the 'balancing threshold'. For example, 1.0x means the highest topic leader or follower replica count for a broker should not be above 1.0x of the average leader or follower replica count respectively for all brokers for that topic.The same applies for the lowest topic leader or follower replica count of a broker - it should not surpassthe symmetrical with the upper bound lower bound threshold.Short example, for a cluster with 6 brokers and given topic 'A' with 10 leader and 20 follower replicas, brokers should host between [floor(1.6-(1.6*1.0-1.6)), ceil(1.6*1.0)], i.e. [1,2], leader replicas where 1.6 is the average leader replicas per broker for that topic, whereas for follower replicas the brokersshould host between [floor(3.3-(3.3*1.0-3.3) - ceil(3.3*1.0)], i.e. [3,4], where 3.3 is the average follower replicasper broker for that topic. Config values should be floating point numbers greater than or equal to 1.";
    public static final String TOPIC_BALANCING_TRIGGER_THRESHOLD_MULTIPLIER_CONFIG = "topic.balancing.trigger.threshold.multiplier";
    private static final String TOPIC_BALANCING_TRIGGER_THRESHOLD_MULTIPLIER_DOC = "How tolerable to replica distribution imbalances should the topic replicadistribution goal be during goal violation detections. A value of 3.0 means that it will consider a replica distribution violation if a broker hosts 3.0x{topic.replica.balancing.balance.threshold} replicas more or less than the average for a given topic. Config values should be floating point numbers greater than or equal to 1.";
    private static final String DISK_CAPACITY_MIN_FREE_SPACE_DOC = "The minimum amount of disk space, in gigabytes, that needs to remain unused on a broker. The analyzer will enforce a hard goal that the disk usage of a broker cannot be higher than (broker.disk.capacity - disk.min.free.space.gb).";
    public static final String CPU_CAPACITY_THRESHOLD_CONFIG = "cpu.capacity.threshold";
    private static final String CPU_CAPACITY_THRESHOLD_DOC = "The maximum percentage of the total broker.cpu.capacity that is allowed to be used on a broker. The analyzer will enforce a hard goal that the cpu utilization of a broker cannot be higher than (broker.cpu.capacity * cpu.capacity.threshold).";
    private static final String DISK_CAPACITY_THRESHOLD_DOC = "The maximum percentage of the total broker.disk.capacity that is allowed to be used on a broker. The analyzer will enforce a hard goal that the disk usage of a broker cannot be higher than (broker.disk.capacity * disk.capacity.threshold).";
    public static final String NETWORK_INBOUND_CAPACITY_THRESHOLD_CONFIG = "network.inbound.capacity.threshold";
    private static final String NETWORK_INBOUND_CAPACITY_THRESHOLD_DOC = "The maximum percentage of the total confluent.balancer.network.in.max.bytes.per.second that is allowed to be used on a broker. The analyzer will enforce a hard goal that the network usage of a broker cannot be higher than (confluent.balancer.network.in.max.bytes.per.second * network.inbound.capacity.threshold).";
    public static final String PRODUCER_INBOUND_CAPACITY_THRESHOLD_CONFIG = "producer.inbound.capacity.threshold";
    private static final String PRODUCER_INBOUND_CAPACITY_THRESHOLD_DOC = "The maximum percentage of the total confluent.balancer.producer.in.max.bytes.per.second that is allowed to be used on a broker. The analyzer will enforce a hard goal that the producer inbound traffic of a broker cannot be higher than (confluent.balancer.producer.in.max.bytes.per.second * producer.inbound.capacity.threshold).";
    public static final String NETWORK_OUTBOUND_CAPACITY_THRESHOLD_CONFIG = "network.outbound.capacity.threshold";
    private static final String NETWORK_OUTBOUND_CAPACITY_THRESHOLD_DOC = "The maximum percentage of the total confluent.balancer.network.out.max.bytes.per.second that is allowed to be used on a broker. The analyzer will enforce a hard goal that the network usage of a broker cannot be higher than (confluent.balancer.network.out.max.bytes.per.second * network.outbound.capacity.threshold).";
    public static final String CONSUMER_OUTBOUND_CAPACITY_THRESHOLD_CONFIG = "consumer.outbound.capacity.threshold";
    private static final String CONSUMER_OUTBOUND_CAPACITY_THRESHOLD_DOC = "The maximum percentage of the total confluent.balancer.consumer.out.max.bytes.per.second that is allowed to be used on a broker. The analyzer will enforce a hard goal that the consumer outbound traffic of a broker cannot be higher than (confluent.balancer.consumer.out.max.bytes.per.second * consumer.outbound.capacity.threshold). Note that fetch from follower traffic is not accounted for in this first release.";
    public static final String CPU_LOW_UTILIZATION_THRESHOLD_CONFIG = "cpu.low.utilization.threshold";
    private static final String CPU_LOW_UTILIZATION_THRESHOLD_DOC = "The threshold for Kafka Cruise Control to define the utilization of CPU is low enough that rebalance is not worthwhile. The cluster will only be in a low utilization state when all the brokers are below the low utilization threshold. The threshold is in percentage.";
    public static final String DISK_LOW_UTILIZATION_THRESHOLD_CONFIG = "disk.low.utilization.threshold";
    private static final String DISK_LOW_UTILIZATION_THRESHOLD_DOC = "The threshold for Kafka Cruise Control to define the utilization of DISK is low enough that rebalance is not worthwhile. The cluster will only be in a low utilization state when all the brokers are below the low utilization threshold. The threshold is in percentage.";
    public static final String NETWORK_INBOUND_LOW_UTILIZATION_THRESHOLD_CONFIG = "network.inbound.low.utilization.threshold";
    private static final String NETWORK_INBOUND_LOW_UTILIZATION_THRESHOLD_DOC = "The threshold for Kafka Cruise Control to define the utilization of network inbound rate is low enough that rebalance is not worthwhile. The cluster will only be in a low utilization state when all the brokers are below the low utilization threshold. The threshold is in percentage.";
    public static final String NETWORK_OUTBOUND_LOW_UTILIZATION_THRESHOLD_CONFIG = "network.outbound.low.utilization.threshold";
    private static final String NETWORK_OUTBOUND_LOW_UTILIZATION_THRESHOLD_DOC = "The threshold for Kafka Cruise Control to define the utilization of network outbound rate is low enough that rebalance is not worthwhile. The cluster will only be in a low utilization state when all the brokers are below the low utilization threshold. The threshold is in percentage.";
    private static final String MAX_REPLICAS_PER_BROKER_DOC = "The maximum number of replicas allowed to reside on a broker. The analyzer will enforce a hard goal that the number of replica on a broker cannot be higher than this config.";
    public static final String ZOOKEEPER_CONNECT_CONFIG = "zookeeper.connect";
    private static final String ZOOKEEPER_CONNECT_DOC = "The zookeeper path used by the Kafka cluster.";
    public static final String ZOOKEEPER_SECURITY_ENABLED_CONFIG = "zookeeper.security.enabled";
    private static final String ZOOKEEPER_SECURITY_ENABLED_DOC = "Specify if zookeeper is secured, true or false";
    public static final String NUM_CONCURRENT_PARTITION_MOVEMENTS_PER_BROKER_CONFIG = "num.concurrent.partition.movements.per.broker";
    private static final String NUM_CONCURRENT_PARTITION_MOVEMENTS_PER_BROKER_DOC = "The maximum number of partitions the executor will move to or out of a broker at the same time. e.g. setting the value to 10 means that the executor will at most allow 10 partitions moves with relation to a broker at any given point, regardless if they are out of the broker or into it. This is useful to avoid overwhelming the cluster by inter-broker partition movements.";
    public static final String NUM_CONCURRENT_INTRA_BROKER_PARTITION_MOVEMENTS_CONFIG = "num.concurrent.intra.broker.partition.movements";
    private static final String NUM_CONCURRENT_INTRA_BROKER_PARTITION_MOVEMENTS_DOC = "The maximum number of partitions the executor will move across disks within a broker at the same time. e.g. setting the value to 10 means that the executor will at most allow 10 partitions to move across disks within a broker at any given point. This is to avoid overwhelming the cluster by intra-broker partition movements.";
    public static final String NUM_CONCURRENT_LEADER_MOVEMENTS_CONFIG = "num.concurrent.leader.movements";
    private static final String NUM_CONCURRENT_LEADER_MOVEMENTS_DOC = "The maximum number of leader movements the executor will take as one batch. This is mainly because the ZNode has a 1 MB size upper limit. And it will also reduce the controller burden.";
    private static final String REPLICATION_THROTTLE_DOC = "The replication throttle applied to replicas being moved, in bytes per second. A value of null or -1 means no throttle isapplied. A value of -2 means the throttle is automatically calculated based on the cluster metrics.";
    public static final String REPLICA_MOVEMENT_STRATEGIES_CONFIG = "replica.movement.strategies";
    private static final String REPLICA_MOVEMENT_STRATEGIES_DOC = "A list of supported strategies used to determine execution order for generated partition movement tasks.";
    public static final String DEFAULT_REPLICA_MOVEMENT_STRATEGIES_CONFIG = "default.replica.movement.strategies";
    private static final String DEFAULT_REPLICA_MOVEMENT_STRATEGIES_DOC = "The list of replica movement strategies that will be used by default if no replica movement strategy list is provided.";
    public static final String EXECUTION_PROGRESS_CHECK_INTERVAL_MS_CONFIG = "execution.progress.check.interval.ms";
    private static final long DEFAULT_EXECUTION_PROGRESS_CHECK_INTERVAL_MS = 7000;
    private static final String EXECUTION_PROGRESS_CHECK_INTERVAL_MS_DOC = "The interval in milliseconds that the executor will check on the execution progress.";
    public static final String POPULATE_DEFAULT_DISK_CAPACITY_FROM_LOCAL_CONFIG = "populate.default.disk.capacity.from.local";
    private static final String POPULATE_DEFAULT_DISK_CAPACITY_FROM_LOCAL_DOC = "If this configuration is set to true, SBC will configure the default broker disk capacity with the local disk size. The local disk is the disk mounted to the node where SBC runs. If the configuration is set to false, SBC won't set up the default disk capacity but would rather exclusively rely on the metrics for denoting what capacity a broker has. In the edge cases where new brokers are added and metrics still haven't arrived, SBC will assume its disk capacity is the same as other peer brokers.";
    private static final String GOALS_DOC = "A list of case insensitive goals in the order of priority. The high priority goals will be executed first.";
    public static final String INTRA_BROKER_GOALS_CONFIG = "intra.broker.goals";
    private static final String INTRA_BROKER_GOALS_DOC = "A list of case insensitive intra-broker goals in the order of priority. The high priority goals will be executed first. The intra-broker goals are only relevant if intra-broker operation is supported(i.e. in  Cruise Control versions above 2.*), otherwise this list should be empty.";
    public static final String ANOMALY_NOTIFIER_CLASS_CONFIG = "anomaly.notifier.class";
    private static final String ANOMALY_NOTIFIER_CLASS_DOC = "The notifier class to trigger an alert when an anomaly is violated. The anomaly could be either a goal violation, broker failure.";
    public static final String EXECUTOR_NOTIFIER_CLASS_CONFIG = "executor.notifier.class";
    private static final String EXECUTOR_NOTIFIER_CLASS_DOC = "The executor notifier class to trigger an alert when an execution finishes or is stopped (by a user or by Cruise Control).";
    public static final String ANOMALY_DETECTION_INTERVAL_MS_CONFIG = "anomaly.detection.interval.ms";
    private static final String ANOMALY_DETECTION_INTERVAL_MS_DOC = "The interval in millisecond that the detectors will run to detect the anomalies.";
    public static final String ANOMALY_DETECTION_ALLOW_CAPACITY_ESTIMATION_CONFIG = "anomaly.detection.allow.capacity.estimation";
    private static final String ANOMALY_DETECTION_ALLOW_CAPACITY_ESTIMATION_DOC = "The flag to indicate whether anomaly detection threads allow capacity estimation in the generated cluster model they use.";
    public static final String SAMPLING_ALLOW_CPU_CAPACITY_ESTIMATION_CONFIG = "sampling.allow.cpu.capacity.estimation";
    private static final String SAMPLING_ALLOW_CPU_CAPACITY_ESTIMATION_DOC = "The flag to indicate whether sampling process allows CPU capacity estimation of brokers used for CPU utilization estimation.";
    private static final String ANOMALY_DETECTION_GOALS_DOC = "The goals that anomaly detector should detect if they areviolated.";
    public static final String BROKER_FAILURE_EXCLUDE_RECENTLY_REMOVED_BROKERS_CONFIG = "broker.failure.exclude.recently.removed.brokers";
    private static final String BROKER_FAILURE_EXCLUDE_RECENTLY_REMOVED_BROKERS_DOC = "True if recently removed brokers are excluded from optimizations during broker failure self healing, false otherwise.";
    public static final String GOAL_VIOLATION_EXCLUDE_RECENTLY_REMOVED_BROKERS_CONFIG = "goal.violation.exclude.recently.removed.brokers";
    private static final String GOAL_VIOLATION_EXCLUDE_RECENTLY_REMOVED_BROKERS_DOC = "True if recently removed brokers are excluded from optimizations during goal violation self healing, false otherwise.";
    public static final String TOPICS_EXCLUDED_FROM_PARTITION_MOVEMENT_CONFIG = "topics.excluded.from.partition.movement";
    private static final String TOPICS_EXCLUDED_FROM_PARTITION_MOVEMENT_DOC = "The topics that should be excluded from the partition movement. It is a regex. Notice that this regex will be ignored when decommission a broker is invoked.";
    public static final String PARTITION_METRIC_SAMPLE_STORE_TOPIC_CONFIG = "partition.metric.sample.store.topic";
    public static final String DEFAULT_PARTITION_SAMPLE_STORE_TOPIC = "_confluent_balancer_partition_samples";
    public static final String PARTITION_METRIC_SAMPLE_STORE_TOPIC_DOC = "Topic where partition metric samples are stored.";
    public static final String BROKER_METRIC_SAMPLE_STORE_TOPIC_CONFIG = "broker.metric.sample.store.topic";
    public static final String DEFAULT_BROKER_SAMPLE_STORE_TOPIC = "_confluent_balancer_broker_samples";
    public static final String BROKER_METRIC_SAMPLE_STORE_TOPIC_DOC = "Topic where broker metric samples are stored.";
    public static final String LOGDIR_RESPONSE_TIMEOUT_MS_CONFIG = "logdir.response.timeout.ms";
    private static final String LOGDIR_RESPONSE_TIMEOUT_MS_DOC = "Timeout in ms for broker logdir to respond";
    public static final String DESCRIBE_TOPICS_RESPONSE_TIMEOUT_MS_CONFIG = "describe.topics.response.timeout.ms";
    private static final String DESCRIBE_TOPICS_RESPONSE_TIMEOUT_MS_DOC = "Timeout in ms for the broker to respond to a describe topics request";
    public static final String DESCRIBE_CLUSTER_RESPONSE_TIMEOUT_MS_CONFIG = "describe.cluster.response.timeout.ms";
    private static final String DESCRIBE_CLUSTER_RESPONSE_TIMEOUT_MS_DOC = "Timeout in ms for the broker to respond to a describe cluster request";
    public static final String DESCRIBE_CONFIGS_RESPONSE_TIMEOUT_MS_CONFIG = "describe.configs.response.timeout.ms";
    private static final String DESCRIBE_CONFIGS_RESPONSE_TIMEOUT_MS_DOC = "Timeout in ms for the broker to respond to a describe configs request";
    public static final String ALTER_CONFIGS_RESPONSE_TIMEOUT_MS_CONFIG = "alter.configs.response.timeout.ms";
    private static final String ALTER_CONFIGS_RESPONSE_TIMEOUT_MS_DOC = "Timeout in ms for the broker to respond to an incremental alter configs request";
    public static final String DESCRIBE_CONFIGS_BATCH_SIZE_CONFIG = "describe.configs.batch.size";
    private static final String DESCRIBE_CONFIGS_BATCH_SIZE_DOC = "Maximum size of entities to fetch in one call of describe configs. -1 to ignore this setting and fetch all entities in one ";
    public static final String BROKER_REMOVAL_SHUTDOWN_MS_CONFIG = "broker.removal.shutdown.timeout.ms";
    private static final String BROKER_REMOVAL_SHUTDOWN_MS_DOC = "Timeout in ms for for all brokers with shutdown requests to shut down and leave the cluster.";
    public static final String BROKER_REPLICA_EXCLUSION_TIMEOUT_MS_CONFIG = "broker.replica.exclusion.timeout.ms";
    public static final String BROKER_REPLICA_EXCLUSION_TIMEOUT_MS_DOC = "Timeout in ms for the controller to respond to a broker exclusion request";
    public static final String DESCRIBE_BROKER_REPLICA_EXCLUSION_TIMEOUT_MS_CONFIG = "describe.broker.exclusion.timeout.ms";
    public static final String DESCRIBE_BROKER_REPLICA_EXCLUSION_TIMEOUT_MS_DOC = "Timeout in ms for the controller to respond to a describe broker exclusions request";
    public static final String MAX_VOLUME_THROUGHPUT_MB_CONFIG = "max.volume.throughput.mb";
    private static final String MAX_VOLUME_THROUGHPUT_MB_DOC = "The maximum throughput of the storage volume. This should be the minimum of the maximum disk throughput on the volume and the maximum network throughput to the volume";
    public static final String WRITE_THROUGHPUT_MULTIPLIER_CONFIG = "write.throughput.multiplier";
    private static final String WRITE_THROUGHPUT_MULTIPLIER_DOC = "The factor that write throughput is multiplied by when computing network usage. GCP counts communication with the storage volumes and replication of the written data against the overall network usage";
    public static final String READ_THROUGHPUT_MULTIPLIER_CONFIG = "read.throughput.multiplier";
    private static final String READ_THROUGHPUT_MULTIPLIER_DOC = "The factor that read throughput is multiplied by when computing network usage. GCP counts communication with the storage volumes against the overall network usage";
    public static final String CALCULATED_THROTTLE_RATIO_CONFIG = "calculated.throttle.ratio";
    private static final String CALCULATED_THROTTLE_RATIO_DOC = "When the throttling rate is calculated, this is the amount of available network throughput (max throughput - currently used throughput) that is given to reassignment traffic";
    public static final String DISK_READ_RATIO_CONFIG = "disk.read.ratio";
    private static final String DISK_READ_RATIO_DOC = "When calculating a replication throttle, this is the ratio of reads that are assumed to read from the actual disk, rather than the page cache";
    public static final String OVERRIDE_STATIC_THROTTLES_CONFIG = "static.throttle.rate.override.enabled";
    private static final String OVERRIDE_STATIC_THROTTLES_DOC = "Whether Cruise Control will override statically-set throttles on the broker when executing proposals";
    public static final String DYNAMIC_THROTTLING_ENABLED_CONFIG = "dynamic.throttling.enabled";
    private static final String DYNAMIC_THROTTLING_ENABLED_DOC = "Whether SBC will work on setting throttle rates dynamically. Not necessary when static throttles are set on the cluster.";
    public static final String SELF_HEALING_BROKER_FAILURE_ENABLED_CONFIG = "self.healing.broker.failure.enabled";
    public static final String SELF_HEALING_BROKER_FAILURE_ENABLED_DOC = "Enable self healing for broker failure detector.";
    public static final String SELF_HEALING_GOAL_VIOLATION_ENABLED_CONFIG = "self.healing.goal.violation.enabled";
    public static final String SELF_HEALING_GOAL_VIOLATION_ENABLED_DOC = "Enable self healing for goal violation detector.";
    public static final String BROKER_FAILURE_ALERT_THRESHOLD_MS_CONFIG = "broker.failure.alert.threshold.ms";
    public static final String BROKER_FAILURE_ALERT_THRESHOLD_MS_DOC = "Grace period in milliseconds for a failed broker to rejoin the cluster before firing an alert";
    public static final String BROKER_FAILURE_SELF_HEALING_THRESHOLD_MS_CONFIG = "broker.failure.self.healing.threshold.ms";
    public static final String BROKER_FAILURE_SELF_HEALING_THRESHOLD_MS_DOC = "Grace period in milliseconds for a failed broker to rejoin the cluster before triggering a self-healing action. Must not be less than 'broker.failure.alert.threshold.ms'";
    public static final String INVALID_REPLICA_ASSIGNMENT_RETRY_TIMEOUT_MS_CONFIG = "invalid.replica.assignment.retry.timeout.ms";
    public static final String INVALID_REPLICA_ASSIGNMENT_RETRY_TIMEOUT_MS_DOC = "The time, in milliseconds, after which proposal execution will retry the failed proposal due to an invalid replica assignment on the same proposal. ";
    public static final String EXECUTOR_REFRESH_TIME_MS_CONFIG = "executor.reservation.refresh.time.ms";
    public static final String EXECUTOR_REFRESH_TIME_MS_DOC = "The time, in milliseconds, after which a newly-acquired executor reservation which aborted an executor run should wait before being returned. Useful for edge cases where metadata in the underlying Kafka cluster isn't updated quickly enough.";
    public static final String SHUTDOWN_TIMEOUT_MS_CONFIG = "cdbe.shutdown.wait.ms";
    private static final String SHUTDOWN_TIMEOUT_MS_DOC = "Maximum amount of time for CDBE to wait for its ExecutorService to shutdown.";
    public static final String CC_STARTUP_RETRY_DURATION_HOURS_CONFIG = "startup.retry.max.hours";
    private static final String CC_STARTUP_RETRY_DURATION_HOURS_DOC = "Maximum hours to keep on retrying to start CruiseControl before giving up.";
    public static final String CC_STARTUP_RETRY_DELAY_MINUTES_CONFIG = "startup.retry.delay.minutes";
    private static final String CC_STARTUP_RETRY_DELAY_MINUTES_DOC = "Minutes to wait between two tries to start CruiseControl";
    public static final String REQUEST_CONTRIBUTION_WEIGHT_CONFIG = "request.cpu.contribution.weight";
    private static final String REQUEST_CONTRIBUTION_WEIGHT_DOC = "Fraction of CPU that is caused by number of requests on the broker. This includes produce, consume and replication fetch requests.";
    public static final String BYTES_CONTRIBUTION_WEIGHT_CONFIG = "bytes.cpu.contribution.weight";
    private static final String BYTES_CONTRIBUTION_WEIGHT_DOC = "Fraction of CPU that is caused by bytes in/out on the broker. Includes produce bytes-in and consume bytes-out (FFL and FFF) as well as replication bytes.";
    private static final ConfigDef CONFIG = new ConfigDef().define("bootstrap.servers", ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, "A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping&mdash;this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form <code>host1:port1,host2:port2,...</code>. Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down).").define("client.id", ConfigDef.Type.STRING, "kafka-cruise-control", ConfigDef.Importance.MEDIUM, CommonClientConfigs.CLIENT_ID_DOC).define("send.buffer.bytes", ConfigDef.Type.INT, 131072, ConfigDef.Range.atLeast(0), ConfigDef.Importance.MEDIUM, CommonClientConfigs.SEND_BUFFER_DOC).define("receive.buffer.bytes", ConfigDef.Type.INT, 32768, ConfigDef.Range.atLeast(0), ConfigDef.Importance.MEDIUM, CommonClientConfigs.RECEIVE_BUFFER_DOC).define("reconnect.backoff.ms", ConfigDef.Type.LONG, 50L, ConfigDef.Range.atLeast(0L), ConfigDef.Importance.LOW, CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC).define("connections.max.idle.ms", ConfigDef.Type.LONG, 540000, ConfigDef.Importance.MEDIUM, CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC).define("sbc.metrics.parser.enabled", ConfigDef.Type.BOOLEAN, SBC_METRICS_PARSER_ENABLED_DEFAULT, ConfigDef.Importance.MEDIUM, "A boolean value controlling whether to use the sbc metric parser or the default one.").define("request.timeout.ms", ConfigDef.Type.INT, 30000, ConfigDef.Range.atLeast(0), ConfigDef.Importance.MEDIUM, "The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted.").define("default.api.timeout.ms", ConfigDef.Type.INT, 60000, ConfigDef.Range.atLeast(0), ConfigDef.Importance.MEDIUM, "Specifies the timeout (in milliseconds) for client APIs. This configuration is used as the default timeout for all client operations that do not specify a <code>timeout</code> parameter.").define(METADATA_TTL_CONFIG, ConfigDef.Type.LONG, DEFAULT_METADATA_TTL, ConfigDef.Range.atLeast(0), ConfigDef.Importance.HIGH, METADATA_TTL_DOC).define(METADATA_CLIENT_TIMEOUT_MS_CONFIG, ConfigDef.Type.INT, DEFAULT_METADATA_CLIENT_TIMEOUT_MS, ConfigDef.Range.atLeast(0), ConfigDef.Importance.HIGH, METADATA_CLIENT_TIMEOUT_MS_DOC).define(LEADER_ACTION_TIMEOUT_MS_CONFIG, ConfigDef.Type.LONG, DEFAULT_LEADER_ACTION_TIMEOUT_MS, ConfigDef.Range.atLeast(0), ConfigDef.Importance.LOW, LEADER_ACTION_TIMEOUT_MS_DOC).define(PARTITION_METRICS_WINDOW_MS_CONFIG, ConfigDef.Type.LONG, DEFAULT_PARTITION_METRICS_WINDOW_MS, ConfigDef.Range.atLeast(1), ConfigDef.Importance.HIGH, PARTITION_METRICS_WINDOW_MS_DOC).define(NUM_PARTITION_METRICS_WINDOWS_CONFIG, ConfigDef.Type.INT, DEFAULT_NUM_PARTITION_METRICS_WINDOWS, ConfigDef.Range.atLeast(1), ConfigDef.Importance.HIGH, NUM_PARTITION_METRICS_WINDOWS_DOC).define(MIN_SAMPLES_PER_PARTITION_METRICS_WINDOW_CONFIG, ConfigDef.Type.INT, DEFAULT_MIN_SAMPLES_PER_PARTITION_METRICS_WINDOW, ConfigDef.Range.atLeast(1), ConfigDef.Importance.HIGH, MIN_SAMPLES_PER_PARTITION_METRICS_WINDOW_DOC).define(MAX_ALLOWED_EXTRAPOLATIONS_PER_PARTITION_CONFIG, ConfigDef.Type.INT, 5, ConfigDef.Range.atLeast(0), ConfigDef.Importance.MEDIUM, MAX_ALLOWED_EXTRAPOLATIONS_PER_PARTITION_DOC).define(PARTITION_METRIC_SAMPLE_AGGREGATOR_COMPLETENESS_CACHE_SIZE_CONFIG, ConfigDef.Type.INT, 5, ConfigDef.Range.atLeast(0), ConfigDef.Importance.LOW, "The metric sample aggregator caches the completeness metadata for fast query. The completeness describes the confidence level of the data in the metric sample aggregator. It is primarily measured by the validity of the metricssamples in different windows. This configuration configures The number of completeness cache slots to maintain.").define(REMOVAL_HISTORY_RETENTION_TIME_MS_CONFIG, ConfigDef.Type.LONG, Long.valueOf(TimeUnit.HOURS.toMillis(24)), ConfigDef.Range.atLeast(0), ConfigDef.Importance.MEDIUM, REMOVAL_HISTORY_RETENTION_TIME_MS_DOC).define(MAX_ALLOWED_EXTRAPOLATIONS_PER_BROKER_CONFIG, ConfigDef.Type.INT, 5, ConfigDef.Range.atLeast(0), ConfigDef.Importance.MEDIUM, MAX_ALLOWED_EXTRAPOLATIONS_PER_BROKER_DOC).define(BROKER_METRIC_SAMPLE_AGGREGATOR_COMPLETENESS_CACHE_SIZE_CONFIG, ConfigDef.Type.INT, 5, ConfigDef.Range.atLeast(0), ConfigDef.Importance.LOW, "The metric sample aggregator caches the completeness metadata for fast query. The completeness describes the confidence level of the data in the metric sample aggregator. It is primarily measured by the validity of the metricssamples in different windows. This configuration configures The number of completeness cache slots to maintain.").define("security.protocol", ConfigDef.Type.STRING, "PLAINTEXT", ConfigDef.Importance.MEDIUM, CommonClientConfigs.SECURITY_PROTOCOL_DOC).define(NUM_METRIC_FETCHERS_CONFIG, ConfigDef.Type.INT, 1, ConfigDef.Importance.HIGH, NUM_METRIC_FETCHERS_DOC).define(NUM_CACHED_RECENT_ANOMALY_STATES_CONFIG, ConfigDef.Type.INT, 10, ConfigDef.Range.between(1, 100), ConfigDef.Importance.LOW, NUM_CACHED_RECENT_ANOMALY_STATES_DOC).define(METRIC_SAMPLER_CLASS_CONFIG, ConfigDef.Type.CLASS, METRIC_SAMPLER_CLASS_DEFAULT, ConfigDef.Importance.HIGH, METRIC_SAMPLER_CLASS_DOC).define(BROKER_CAPACITY_CONFIG_RESOLVER_CLASS_CONFIG, ConfigDef.Type.CLASS, BrokerCapacityResolver.class.getName(), ConfigDef.Importance.MEDIUM, BROKER_CAPACITY_CONFIG_RESOLVER_CLASS_DOC).define("network.in.max.bytes.per.second", ConfigDef.Type.LONG, ConfluentConfigs.BALANCER_NETWORK_IN_CAPACITY_DEFAULT, ConfigDef.Importance.HIGH, "This config specifies the upper capacity limit for network incoming bytes per second per broker. The Confluent DataBalancer will attempt to keep incoming data throughput below this limit.").define("producer.in.max.bytes.per.second", ConfigDef.Type.LONG, BalancerConfigs.BALANCER_PRODUCER_IN_CAPACITY_DEFAULT, ConfigDef.Importance.HIGH, BalancerConfigs.BALANCER_PRODUCER_IN_CAPACITY_DOC).define("network.out.max.bytes.per.second", ConfigDef.Type.LONG, ConfluentConfigs.BALANCER_NETWORK_OUT_CAPACITY_DEFAULT, ConfigDef.Importance.HIGH, "This config specifies the upper capacity limit for network outgoing bytes per second per broker. The Confluent DataBalancer will attempt to keep outgoing data throughput below this limit.").define("consumer.out.max.bytes.per.second", ConfigDef.Type.LONG, BalancerConfigs.BALANCER_CONSUMER_OUT_CAPACITY_DEFAULT, ConfigDef.Importance.HIGH, BalancerConfigs.BALANCER_CONSUMER_OUT_CAPACITY_DOC).define(GOAL_BALANCEDNESS_PRIORITY_WEIGHT_CONFIG, ConfigDef.Type.DOUBLE, Double.valueOf(1.1d), ConfigDef.Range.between(1, 2), ConfigDef.Importance.LOW, GOAL_BALANCEDNESS_PRIORITY_WEIGHT_DOC).define(GOAL_BALANCEDNESS_STRICTNESS_WEIGHT_CONFIG, ConfigDef.Type.DOUBLE, Double.valueOf(1.5d), ConfigDef.Range.between(1, 2), ConfigDef.Importance.LOW, GOAL_BALANCEDNESS_STRICTNESS_WEIGHT_DOC).define(MIN_VALID_PARTITION_RATIO_CONFIG, ConfigDef.Type.DOUBLE, DEFAULT_MIN_VALID_PARTITION_RATIO, ConfigDef.Range.between(0, 1), ConfigDef.Importance.HIGH, MIN_VALID_PARTITION_RATIO_DOC).define(LEADER_NETWORK_INBOUND_WEIGHT_FOR_CPU_UTIL_CONFIG, ConfigDef.Type.DOUBLE, Double.valueOf(0.7d), ConfigDef.Range.between(0, 1), ConfigDef.Importance.MEDIUM, LEADER_NETWORK_INBOUND_WEIGHT_FOR_CPU_UTIL_DOC).define(LEADER_NETWORK_OUTBOUND_WEIGHT_FOR_CPU_UTIL_CONFIG, ConfigDef.Type.DOUBLE, Double.valueOf(0.15d), ConfigDef.Range.between(0, 1), ConfigDef.Importance.MEDIUM, LEADER_NETWORK_OUTBOUND_WEIGHT_FOR_CPU_UTIL_DOC).define(FOLLOWER_NETWORK_INBOUND_WEIGHT_FOR_CPU_UTIL_CONFIG, ConfigDef.Type.DOUBLE, Double.valueOf(0.15d), ConfigDef.Range.between(0, 1), ConfigDef.Importance.MEDIUM, FOLLOWER_NETWORK_INBOUND_WEIGHT_FOR_CPU_UTIL_DOC).define(HOT_PARTITION_CAPACITY_UTILIZATION_THRESHOLD_CONFIG, ConfigDef.Type.DOUBLE, HOT_PARTITION_CAPACITY_UTILIZATION_THRESHOLD_DEFAULT, ConfigDef.Range.between(0, 1), ConfigDef.Importance.LOW, HOT_PARTITION_CAPACITY_UTILIZATION_THRESHOLD_DOC).define("cpu.balance.threshold", ConfigDef.Type.DOUBLE, DEFAULT_CPU_BALANCE_THRESHOLD, ConfigDef.Range.atLeast(1), ConfigDef.Importance.HIGH, "The maximum allowed extent of unbalance for CPU utilization. Under relative threshold calculation, 1.10 means the highest CPU usage of a broker should not be above 1.10x of average CPU utilization of all the brokers. Under absolute threshold calculation, 1.10 means the highest CPU usage of a broker should not be 10% more than the average CPU utilization of the all the brokers. The lower threshold is symmetric to the upper threshold.").define(DISK_BALANCE_THRESHOLD_CONFIG, ConfigDef.Type.DOUBLE, DEFAULT_DISK_BALANCE_THRESHOLD, ConfigDef.Range.atLeast(1), ConfigDef.Importance.HIGH, DISK_BALANCE_THRESHOLD_DOC).define(NETWORK_INBOUND_BALANCE_THRESHOLD_CONFIG, ConfigDef.Type.DOUBLE, DEFAULT_NETWORK_INBOUND_BALANCE_THRESHOLD, ConfigDef.Range.atLeast(1), ConfigDef.Importance.HIGH, NETWORK_INBOUND_BALANCE_THRESHOLD_DOC).define(NETWORK_OUTBOUND_BALANCE_THRESHOLD_CONFIG, ConfigDef.Type.DOUBLE, DEFAULT_NETWORK_OUTBOUND_BALANCE_THRESHOLD, ConfigDef.Range.atLeast(1), ConfigDef.Importance.HIGH, NETWORK_OUTBOUND_BALANCE_THRESHOLD_DOC).define(REPLICA_COUNT_BALANCE_THRESHOLD_CONFIG, ConfigDef.Type.DOUBLE, Double.valueOf(1.1d), ConfigDef.Range.atLeast(1), ConfigDef.Importance.HIGH, REPLICA_COUNT_BALANCE_THRESHOLD_DOC).define(LEADER_REPLICA_COUNT_BALANCE_THRESHOLD_CONFIG, ConfigDef.Type.DOUBLE, Double.valueOf(1.1d), ConfigDef.Range.atLeast(1), ConfigDef.Importance.HIGH, LEADER_REPLICA_COUNT_BALANCE_THRESHOLD_DOC).define(TOPIC_BALANCING_BALANCE_THRESHOLD_MULTIPLIER_CONFIG, ConfigDef.Type.DOUBLE, Double.valueOf(1.0d), ConfigDef.Range.atLeast(1), ConfigDef.Importance.HIGH, TOPIC_BALANCING_BALANCE_THRESHOLD_MULTIPLIER_DOC).define(TOPIC_BALANCING_TRIGGER_THRESHOLD_MULTIPLIER_CONFIG, ConfigDef.Type.DOUBLE, Double.valueOf(3.0d), ConfigDef.Range.atLeast(1), ConfigDef.Importance.HIGH, TOPIC_BALANCING_TRIGGER_THRESHOLD_MULTIPLIER_DOC).define("disk.min.free.space.gb", ConfigDef.Type.INT, DEFAULT_DISK_CAPACITY_MIN_FREE_SPACE, ConfigDef.Range.atLeast(0), ConfigDef.Importance.HIGH, DISK_CAPACITY_MIN_FREE_SPACE_DOC).define("goal.violation.distribution.threshold.multiplier", ConfigDef.Type.DOUBLE, Double.valueOf(1.1d), ConfigDef.Range.atLeast(1), ConfigDef.Importance.MEDIUM, "The multiplier applied to the threshold of distribution goals used for detecting and fixing anomalies. For example, 2.50 means the threshold for each distribution goal (i.e. Replica Distribution, Leader Replica Distribution, Resource Distribution, and Topic Replica Distribution Goals) will be 2.50x of the value used in manual goal optimization requests (e.g. rebalance).").define(CPU_CAPACITY_THRESHOLD_CONFIG, ConfigDef.Type.DOUBLE, DEFAULT_CPU_CAPACITY_THRESHOLD, ConfigDef.Range.between(0, 1), ConfigDef.Importance.HIGH, CPU_CAPACITY_THRESHOLD_DOC).define("disk.max.load", ConfigDef.Type.DOUBLE, DEFAULT_DISK_CAPACITY_THRESHOLD, ConfigDef.Range.between(0, 1), ConfigDef.Importance.HIGH, DISK_CAPACITY_THRESHOLD_DOC).define(NETWORK_INBOUND_CAPACITY_THRESHOLD_CONFIG, ConfigDef.Type.DOUBLE, DEFAULT_NETWORK_INBOUND_CAPACITY_THRESHOLD, ConfigDef.Range.between(0, 1), ConfigDef.Importance.HIGH, NETWORK_INBOUND_CAPACITY_THRESHOLD_DOC).define(PRODUCER_INBOUND_CAPACITY_THRESHOLD_CONFIG, ConfigDef.Type.DOUBLE, DEFAULT_PRODUCER_INBOUND_CAPACITY_THRESHOLD, ConfigDef.Range.between(0, 1), ConfigDef.Importance.HIGH, PRODUCER_INBOUND_CAPACITY_THRESHOLD_DOC).define(NETWORK_OUTBOUND_CAPACITY_THRESHOLD_CONFIG, ConfigDef.Type.DOUBLE, DEFAULT_NETWORK_OUTBOUND_CAPACITY_THRESHOLD, ConfigDef.Range.between(0, 1), ConfigDef.Importance.HIGH, NETWORK_OUTBOUND_CAPACITY_THRESHOLD_DOC).define(CONSUMER_OUTBOUND_CAPACITY_THRESHOLD_CONFIG, ConfigDef.Type.DOUBLE, DEFAULT_CONSUMER_OUTBOUND_CAPACITY_THRESHOLD, ConfigDef.Range.between(0, 1), ConfigDef.Importance.HIGH, CONSUMER_OUTBOUND_CAPACITY_THRESHOLD_DOC).define(CPU_LOW_UTILIZATION_THRESHOLD_CONFIG, ConfigDef.Type.DOUBLE, DEFAULT_CPU_LOW_UTILIZATION_THRESHOLD, ConfigDef.Range.between(0, 1), ConfigDef.Importance.MEDIUM, CPU_LOW_UTILIZATION_THRESHOLD_DOC).define(DISK_LOW_UTILIZATION_THRESHOLD_CONFIG, ConfigDef.Type.DOUBLE, DEFAULT_DISK_LOW_UTILIZATION_THRESHOLD, ConfigDef.Range.between(0, 1), ConfigDef.Importance.MEDIUM, DISK_LOW_UTILIZATION_THRESHOLD_DOC).define(NETWORK_INBOUND_LOW_UTILIZATION_THRESHOLD_CONFIG, ConfigDef.Type.DOUBLE, DEFAULT_NETWORK_INBOUND_LOW_UTILIZATION_THRESHOLD, ConfigDef.Range.between(0, 1), ConfigDef.Importance.MEDIUM, NETWORK_INBOUND_LOW_UTILIZATION_THRESHOLD_DOC).define(NETWORK_OUTBOUND_LOW_UTILIZATION_THRESHOLD_CONFIG, ConfigDef.Type.DOUBLE, DEFAULT_NETWORK_OUTBOUND_LOW_UTILIZATION_THRESHOLD, ConfigDef.Range.between(0, 1), ConfigDef.Importance.MEDIUM, NETWORK_OUTBOUND_LOW_UTILIZATION_THRESHOLD_DOC).define("max.replicas", ConfigDef.Type.LONG, ConfluentConfigs.BALANCER_REPLICA_CAPACITY_DEFAULT, ConfigDef.Range.atLeast(0), ConfigDef.Importance.MEDIUM, MAX_REPLICAS_PER_BROKER_DOC).define(ZOOKEEPER_CONNECT_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, ZOOKEEPER_CONNECT_DOC).define(ZOOKEEPER_SECURITY_ENABLED_CONFIG, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.HIGH, ZOOKEEPER_SECURITY_ENABLED_DOC).define(NUM_CONCURRENT_PARTITION_MOVEMENTS_PER_BROKER_CONFIG, ConfigDef.Type.INT, 5, ConfigDef.Range.atLeast(1), ConfigDef.Importance.MEDIUM, NUM_CONCURRENT_PARTITION_MOVEMENTS_PER_BROKER_DOC).define(NUM_CONCURRENT_INTRA_BROKER_PARTITION_MOVEMENTS_CONFIG, ConfigDef.Type.INT, 2, ConfigDef.Range.atLeast(1), ConfigDef.Importance.MEDIUM, NUM_CONCURRENT_INTRA_BROKER_PARTITION_MOVEMENTS_DOC).define(NUM_CONCURRENT_LEADER_MOVEMENTS_CONFIG, ConfigDef.Type.INT, 1000, ConfigDef.Range.atLeast(1), ConfigDef.Importance.MEDIUM, NUM_CONCURRENT_LEADER_MOVEMENTS_DOC).define("throttle.bytes.per.second", ConfigDef.Type.LONG, DEFAULT_THROTTLE_VALUE, ConfigDef.Importance.MEDIUM, REPLICATION_THROTTLE_DOC).define(REPLICA_MOVEMENT_STRATEGIES_CONFIG, ConfigDef.Type.LIST, String.join(",", PostponeUrpReplicaMovementStrategy.class.getName(), PrioritizeLargeReplicaMovementStrategy.class.getName(), PrioritizeSmallReplicaMovementStrategy.class.getName(), BaseReplicaMovementStrategy.class.getName()), ConfigDef.Importance.MEDIUM, REPLICA_MOVEMENT_STRATEGIES_DOC).define(DEFAULT_REPLICA_MOVEMENT_STRATEGIES_CONFIG, ConfigDef.Type.LIST, BaseReplicaMovementStrategy.class.getName(), ConfigDef.Importance.MEDIUM, DEFAULT_REPLICA_MOVEMENT_STRATEGIES_DOC).define(EXECUTION_PROGRESS_CHECK_INTERVAL_MS_CONFIG, ConfigDef.Type.LONG, Long.valueOf(DEFAULT_EXECUTION_PROGRESS_CHECK_INTERVAL_MS), ConfigDef.Range.atLeast(0), ConfigDef.Importance.LOW, EXECUTION_PROGRESS_CHECK_INTERVAL_MS_DOC).define(POPULATE_DEFAULT_DISK_CAPACITY_FROM_LOCAL_CONFIG, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.MEDIUM, POPULATE_DEFAULT_DISK_CAPACITY_FROM_LOCAL_DOC).define("goals", ConfigDef.Type.LIST, String.join(",", DEFAULT_GOALS_LIST), ConfigDef.Importance.HIGH, GOALS_DOC).define(INTRA_BROKER_GOALS_CONFIG, ConfigDef.Type.LIST, String.join(",", IntraBrokerDiskCapacityGoal.class.getName(), IntraBrokerDiskUsageDistributionGoal.class.getName()), ConfigDef.Importance.HIGH, INTRA_BROKER_GOALS_DOC).define(ANOMALY_NOTIFIER_CLASS_CONFIG, ConfigDef.Type.CLASS, DEFAULT_ANOMALY_NOTIFIER_CLASS, ConfigDef.Importance.LOW, ANOMALY_NOTIFIER_CLASS_DOC).define(EXECUTOR_NOTIFIER_CLASS_CONFIG, ConfigDef.Type.CLASS, DEFAULT_EXECUTOR_NOTIFIER_CLASS, ConfigDef.Importance.LOW, EXECUTOR_NOTIFIER_CLASS_DOC).define(ANOMALY_DETECTION_INTERVAL_MS_CONFIG, ConfigDef.Type.LONG, DEFAULT_ANOMALY_DETECTION_INTERVAL_MS, ConfigDef.Importance.LOW, ANOMALY_DETECTION_INTERVAL_MS_DOC).define(ANOMALY_DETECTION_ALLOW_CAPACITY_ESTIMATION_CONFIG, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.LOW, ANOMALY_DETECTION_ALLOW_CAPACITY_ESTIMATION_DOC).define(SAMPLING_ALLOW_CPU_CAPACITY_ESTIMATION_CONFIG, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.LOW, SAMPLING_ALLOW_CPU_CAPACITY_ESTIMATION_DOC).define("anomaly.detection.goals", ConfigDef.Type.LIST, String.join(",", DEFAULT_ANOMALY_DETECTION_GOALS_LIST), ConfigDef.Importance.MEDIUM, ANOMALY_DETECTION_GOALS_DOC).define(BROKER_FAILURE_EXCLUDE_RECENTLY_REMOVED_BROKERS_CONFIG, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.MEDIUM, BROKER_FAILURE_EXCLUDE_RECENTLY_REMOVED_BROKERS_DOC).define(GOAL_VIOLATION_EXCLUDE_RECENTLY_REMOVED_BROKERS_CONFIG, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.MEDIUM, GOAL_VIOLATION_EXCLUDE_RECENTLY_REMOVED_BROKERS_DOC).define("goal.violation.delay.on.new.brokers.ms", ConfigDef.Type.LONG, 1800000L, ConfigDef.Importance.MEDIUM, "Amount of time (in ms) to delay self-healing when new brokers join the cluster. Allows the cluster to stabilize.").define(TOPICS_EXCLUDED_FROM_PARTITION_MOVEMENT_CONFIG, ConfigDef.Type.STRING, "", ConfigDef.Importance.LOW, TOPICS_EXCLUDED_FROM_PARTITION_MOVEMENT_DOC).define(PARTITION_METRIC_SAMPLE_STORE_TOPIC_CONFIG, ConfigDef.Type.STRING, DEFAULT_PARTITION_SAMPLE_STORE_TOPIC, ConfigDef.Importance.MEDIUM, PARTITION_METRIC_SAMPLE_STORE_TOPIC_DOC).define(BROKER_METRIC_SAMPLE_STORE_TOPIC_CONFIG, ConfigDef.Type.STRING, DEFAULT_BROKER_SAMPLE_STORE_TOPIC, ConfigDef.Importance.MEDIUM, BROKER_METRIC_SAMPLE_STORE_TOPIC_DOC).define(LOGDIR_RESPONSE_TIMEOUT_MS_CONFIG, ConfigDef.Type.INT, LOGDIR_REPSONSE_TIMEOUT_MS_DEFAULT, ConfigDef.Importance.LOW, LOGDIR_RESPONSE_TIMEOUT_MS_DOC).define(DESCRIBE_TOPICS_RESPONSE_TIMEOUT_MS_CONFIG, ConfigDef.Type.INT, DEFAULT_DESCRIBE_TOPICS_RESPONSE_TIMEOUT_MS, ConfigDef.Importance.LOW, DESCRIBE_TOPICS_RESPONSE_TIMEOUT_MS_DOC).define(DESCRIBE_CLUSTER_RESPONSE_TIMEOUT_MS_CONFIG, ConfigDef.Type.INT, DEFAULT_DESCRIBE_CLUSTER_RESPONSE_TIMEOUT_MS, ConfigDef.Importance.LOW, DESCRIBE_CLUSTER_RESPONSE_TIMEOUT_MS_DOC).define(DESCRIBE_CONFIGS_RESPONSE_TIMEOUT_MS_CONFIG, ConfigDef.Type.INT, DEFAULT_DESCRIBE_CONFIGS_RESPONSE_TIMEOUT_MS, ConfigDef.Importance.LOW, DESCRIBE_CONFIGS_RESPONSE_TIMEOUT_MS_DOC).define(ALTER_CONFIGS_RESPONSE_TIMEOUT_MS_CONFIG, ConfigDef.Type.LONG, DEFAULT_ALTER_CONFIGS_RESPONSE_TIMEOUT_MS, ConfigDef.Importance.LOW, ALTER_CONFIGS_RESPONSE_TIMEOUT_MS_DOC).define(DESCRIBE_CONFIGS_BATCH_SIZE_CONFIG, ConfigDef.Type.INT, DEFAULT_DESCRIBE_CONFIGS_BATCH_SIZE, ConfigDef.Importance.LOW, DESCRIBE_CONFIGS_BATCH_SIZE_DOC).define(BROKER_REMOVAL_SHUTDOWN_MS_CONFIG, ConfigDef.Type.LONG, DEFAULT_BROKER_REMOVAL_SHUTDOWN_MS, ConfigDef.Importance.LOW, BROKER_REMOVAL_SHUTDOWN_MS_DOC).define(BROKER_REPLICA_EXCLUSION_TIMEOUT_MS_CONFIG, ConfigDef.Type.INT, DEFAULT_BROKER_REPLICA_EXCLUSION_TIMEOUT_MS, ConfigDef.Importance.LOW, BROKER_REPLICA_EXCLUSION_TIMEOUT_MS_DOC).define(DESCRIBE_BROKER_REPLICA_EXCLUSION_TIMEOUT_MS_CONFIG, ConfigDef.Type.INT, DEFAULT_DESCRIBE_BROKER_REPLICA_EXCLUSION_TIMEOUT_MS, ConfigDef.Importance.LOW, DESCRIBE_BROKER_REPLICA_EXCLUSION_TIMEOUT_MS_DOC).define(MAX_VOLUME_THROUGHPUT_MB_CONFIG, ConfigDef.Type.INT, 0, ConfigDef.Importance.MEDIUM, MAX_VOLUME_THROUGHPUT_MB_DOC).define(WRITE_THROUGHPUT_MULTIPLIER_CONFIG, ConfigDef.Type.DOUBLE, DEFAULT_WRITE_THROUGHPUT_MULTIPLIER, ConfigDef.Importance.MEDIUM, WRITE_THROUGHPUT_MULTIPLIER_DOC).define(READ_THROUGHPUT_MULTIPLIER_CONFIG, ConfigDef.Type.DOUBLE, DEFAULT_READ_THROUGHPUT_MULTIPLIER, ConfigDef.Importance.MEDIUM, READ_THROUGHPUT_MULTIPLIER_DOC).define(CALCULATED_THROTTLE_RATIO_CONFIG, ConfigDef.Type.DOUBLE, Double.valueOf(0.8d), ConfigDef.Importance.MEDIUM, CALCULATED_THROTTLE_RATIO_DOC).define(DISK_READ_RATIO_CONFIG, ConfigDef.Type.DOUBLE, Double.valueOf(0.2d), ConfigDef.Importance.MEDIUM, DISK_READ_RATIO_DOC).define(OVERRIDE_STATIC_THROTTLES_CONFIG, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.HIGH, OVERRIDE_STATIC_THROTTLES_DOC).define(DYNAMIC_THROTTLING_ENABLED_CONFIG, ConfigDef.Type.BOOLEAN, DYNAMIC_THROTTLING_ENABLED_DEFAULT, ConfigDef.Importance.HIGH, DYNAMIC_THROTTLING_ENABLED_DOC).define(SELF_HEALING_BROKER_FAILURE_ENABLED_CONFIG, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.HIGH, SELF_HEALING_BROKER_FAILURE_ENABLED_DOC).define(SELF_HEALING_GOAL_VIOLATION_ENABLED_CONFIG, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.HIGH, SELF_HEALING_GOAL_VIOLATION_ENABLED_DOC).define(BROKER_FAILURE_ALERT_THRESHOLD_MS_CONFIG, ConfigDef.Type.LONG, DEFAULT_BROKER_FAILURE_ALERT_THRESHOLD_MS, ConfigDef.Importance.LOW, BROKER_FAILURE_ALERT_THRESHOLD_MS_DOC).define(BROKER_FAILURE_SELF_HEALING_THRESHOLD_MS_CONFIG, ConfigDef.Type.LONG, DEFAULT_BROKER_FAILURE_SELF_HEALING_THRESHOLD_MS, ConfigDef.Importance.HIGH, BROKER_FAILURE_SELF_HEALING_THRESHOLD_MS_DOC).define("confluent.balancer.plan.computation.retry.timeout.ms", ConfigDef.Type.LONG, PLAN_COMPUTATION_RETRY_TIMEOUT_MS_DEFAULT, ConfigDef.Importance.LOW, "The time, in milliseconds, after which plan computation will give up on retrying failures due to insufficient metrics or ongoing reassignments. Note that this only applies to the plan computation used for broker removal and broker addition operations.").define(INVALID_REPLICA_ASSIGNMENT_RETRY_TIMEOUT_MS_CONFIG, ConfigDef.Type.LONG, INVALID_REPLICA_ASSIGNMENT_RETRY_TIMEOUT_MS_DEFAULT, ConfigDef.Importance.LOW, INVALID_REPLICA_ASSIGNMENT_RETRY_TIMEOUT_MS_DOC).define(EXECUTOR_REFRESH_TIME_MS_CONFIG, ConfigDef.Type.LONG, EXECUTOR_REFRESH_TIME_MS_DEFAULT, ConfigDef.Importance.LOW, EXECUTOR_REFRESH_TIME_MS_DOC).define(SHUTDOWN_TIMEOUT_MS_CONFIG, ConfigDef.Type.INT, 15000, ConfigDef.Importance.LOW, SHUTDOWN_TIMEOUT_MS_DOC).define(CC_STARTUP_RETRY_DURATION_HOURS_CONFIG, ConfigDef.Type.INT, 2, ConfigDef.Importance.LOW, CC_STARTUP_RETRY_DURATION_HOURS_DOC).define(CC_STARTUP_RETRY_DELAY_MINUTES_CONFIG, ConfigDef.Type.INT, 5, ConfigDef.Importance.LOW, CC_STARTUP_RETRY_DELAY_MINUTES_DOC).define("incremental.balancing.enabled", ConfigDef.Type.BOOLEAN, INCREMENTAL_BALANCING_ENABLED_DEFAULT, ConfigDef.Importance.HIGH, "A boolean value controlling whether to use incremental balancing strategy or not.").define("max.capacity.balancing.delta.percentage", ConfigDef.Type.DOUBLE, CAPACITY_BALANCING_DELTA_PERCENTAGE_DEFAULT, ConfigDef.Importance.MEDIUM, "The extra percentage of total capacity that SBC will attempt to balance below the allowed capacity once it gets triggered to fix a broker hitting its maximum allowed capacity for a given resource.").define("incremental.balancing.goals", ConfigDef.Type.LIST, INCREMENTAL_BALANCING_DEFAULT_GOALS_LIST, ConfigDef.Importance.HIGH, "A list of case insensitive goals in the order of priority. The high priority goals will be executed first. This list of goals will be used for anomaly detection and balancing if incremental.balancing.enabled is set to true.").define("incremental.balancing.step.ratio", ConfigDef.Type.DOUBLE, INCREMENTAL_BALANCING_STEP_RATIO_DEFAULT, ConfigDef.Importance.HIGH, "A value in the interval [0, 1] representing the step size of each incremental balancing round. For example, if an overloaded broker has 70% CPU usage and the mean CPU usage across the cluster is 50%, setting incremental.balancing.step.ratio to 0.2 will generate incremental balancing proposals that moves enough replicas off from the overloaded broker so that its CPU usage drops by 20% of the delta: (70% - 50%) * 0.2 = 4%.").define("incremental.balancing.lower.bound", ConfigDef.Type.DOUBLE, INCREMENTAL_BALANCING_LOWER_BOUND_DEFAULT, ConfigDef.Importance.HIGH, "A value in the interval [0, 1] representing the lower bound of replica movements from each incremental balancing round to prevent the long tail effect of incremental balancing. Take CPU usage for example, setting incremental.balancing.lower.bound to 0.02 will cause each incremental balancing round generates enough movements so that the change in CPU usage is at least 0.02 * 100% (Capacity of CPU usage) = 2% of the capacity. Combining with incremental.balancing.step.ratio, these two configurations facilitate the success criteria of an incremental balancing round.").define("incremental.balancing.min.valid.windows", ConfigDef.Type.INT, 5, ConfigDef.Importance.MEDIUM, "An positive integer denoting the minimum number of the metric windows required by the incremental balancing algorithm to compute a cluster balancing plan. Setting this to a higher value will cause incremental algorithm to wait for more metric windows to be available before computing a balancing plan. This can help reducing the metric spikes by smoothing the metric values at the expense of longer time interval between each balancing round. The value cannot be greater than the total number of metric windows kept by SBC.").define("self.healing.maximum.rounds", ConfigDef.Type.INT, 1, ConfigDef.Importance.MEDIUM, "A positive integer denoting the maximum number of balancing rounds that one self-healing operation can contain. One balancing round is defined as one execution of the self-healing goal. We consider a self-healing operation as successful if every broker is balanced within the rebalancing resource utilization threshold OR maximum number of self-healing goal execution iteration has been reached.").define(BalancerConfigs.BALANCER_INCREMENTAL_CPU_BALANCING_TOP_PROPOSAL_TRACKING_NUM_PROPOSALS_SBC_CONFIG, ConfigDef.Type.INT, BalancerConfigs.BALANCER_INCREMENTAL_CPU_BALANCING_TOP_PROPOSAL_TRACKING_NUM_PROPOSALS_DEFAULT, ConfigDef.Importance.LOW, BalancerConfigs.BALANCER_INCREMENTAL_CPU_BALANCING_TOP_PROPOSAL_TRACKING_NUM_PROPOSALS_DOC).define(BalancerConfigs.BALANCER_INCREMENTAL_CPU_BALANCING_TOP_PROPOSAL_TRACKING_ENABLED_SBC_CONFIG, ConfigDef.Type.BOOLEAN, BalancerConfigs.BALANCER_INCREMENTAL_CPU_BALANCING_TOP_PROPOSAL_TRACKING_ENABLED_DEFAULT, ConfigDef.Importance.LOW, BalancerConfigs.BALANCER_INCREMENTAL_CPU_BALANCING_TOP_PROPOSAL_TRACKING_ENABLED_DOC).define("cpu.utilization.detector.enabled", ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.LOW, "Specify if cpu optimization detector is enabled").define("disk.utilization.detector.enabled", ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.LOW, "Specify if disk optimization detector is enabled").define("v2.addition.enabled", ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.LOW, "A boolean denoting whether the version 2 of the broker addition operation is enabled. Because V2 broker addition relies exclusively on self healing to add the broker to the cluster, its enablement also depends on incremental balancing being enabled ('confluent.balancer.incremental.balancing.enabled'). To enable V2 broker addition, configure both this config and 'confluent.balancer.incremental.balancing.enabled' to true.").define("resource.utilization.detector.interval.ms", ConfigDef.Type.INT, 60000, ConfigDef.Importance.LOW, "The interval in milliseconds that the resource optimization detector will run.").define("cpu.utilization.detector.duration.ms", ConfigDef.Type.INT, 600000, ConfigDef.Importance.LOW, "The duration in milliseconds for which resource utilization values will be accumulated before the decision to raise an alert is made.").define("cpu.utilization.detector.overutilization.threshold", ConfigDef.Type.DOUBLE, Double.valueOf(80.0d), ConfigDef.Importance.LOW, "CPU usage percentage representing overutilization.").define("cpu.utilization.detector.underutilization.threshold", ConfigDef.Type.DOUBLE, Double.valueOf(50.0d), ConfigDef.Importance.LOW, "CPU usage percentage representing underutilization.").define("broker.addition.mean.cpu.percent.completion.threshold", ConfigDef.Type.DOUBLE, Double.valueOf(0.5d), ConfigDef.Importance.LOW, "A double denoting what percentage of the cluster's mean CPU utilization a single broker needs to reach in order to be considered added to the cluster.Once a broker reaches this portion of the cluster's mean cpu utilization threshold, we will consider the broker addition operation for that particular broker complete. For example,If a cluster has mean CPU Utilization of 50%, a new broker is added and threshold is .5, then we would consider the broker addition complete once the broker reaches 25%(50% * .5) CPU Utilization.").define("broker.addition.elapsed.time.ms.completion.threshold", ConfigDef.Type.INT, 57600000, ConfigDef.Importance.LOW, "An integer denoting the milliseconds after which a broker addition operation should be considered complete.This is a safety measure to fall back to completion if the target 'broker.addition.mean.cpu.percent.completion.threshold' CPU threshold cannot be reached in due time.").define("disk.utilization.detector.duration.ms", ConfigDef.Type.INT, 600000, ConfigDef.Importance.LOW, "The duration in milliseconds for which disk utilization values will be accumulated before the decision to raise an alert is made.").define("disk.utilization.detector.overutilization.threshold", ConfigDef.Type.DOUBLE, Double.valueOf(80.0d), ConfigDef.Importance.LOW, "Disk usage percentage representing overutilization.").define("disk.utilization.detector.underutilization.threshold", ConfigDef.Type.DOUBLE, Double.valueOf(35.0d), ConfigDef.Importance.LOW, "Disk usage percentage representing underutilization.").define("disk.utilization.detector.reserved.capacity", ConfigDef.Type.DOUBLE, Double.valueOf(150000.0d), ConfigDef.Importance.LOW, "Minimum disk space beyond which we don't shrink the cluster even if it is underutilized.").define("topic.partition.movement.expiration.ms", ConfigDef.Type.LONG, 10800000L, ConfigDef.Importance.MEDIUM, "The duration in milliseconds for which a topic partition movement will be kept for observability and oscillation prevention. Effectively, SBC maintains a rolling window of topic partition movements whose length is defined by this configuration.").define("topic.partition.maximum.movements", ConfigDef.Type.INT, 5, ConfigDef.Importance.MEDIUM, "Maximum number of repeated movements allowed for one TopicPartition in the rolling window whose length is defined by topic.partition.movement.expiration.ms").define("topic.partition.suspension.ms", ConfigDef.Type.LONG, 18000000L, ConfigDef.Importance.MEDIUM, "The duration in milliseconds for which a topic partition will be suspended for goal optimization if number of repeated movements within the rolling window defined by topic.partition.movement.expiration.ms exceeds the maximum allowed number defined by topic.partition.maximum.movements.").define(REQUEST_CONTRIBUTION_WEIGHT_CONFIG, ConfigDef.Type.DOUBLE, Double.valueOf(0.8d), ConfigDef.Range.between(Double.valueOf(0.1d), Double.valueOf(1.0d)), ConfigDef.Importance.LOW, REQUEST_CONTRIBUTION_WEIGHT_DOC).define(BYTES_CONTRIBUTION_WEIGHT_CONFIG, ConfigDef.Type.DOUBLE, Double.valueOf(0.2d), ConfigDef.Range.between(Double.valueOf(0.1d), Double.valueOf(1.0d)), ConfigDef.Importance.LOW, BYTES_CONTRIBUTION_WEIGHT_DOC).define("confluent.cells.enable", ConfigDef.Type.BOOLEAN, ENABLE_CELLS_DEFAULT, ConfigDef.Importance.HIGH, "Whether to enable the cells feature (sub-flags also need to be enabled to turn on cells)").withClientSslSupport().withClientSaslSupport();

    public Map<String, Object> mergedConfigValues() {
        Map<String, Object> originals = originals();
        values().forEach((str, obj) -> {
            if (obj != null) {
                originals.put(str, obj);
            }
        });
        return originals;
    }

    @Override // org.apache.kafka.common.config.AbstractConfig
    public <T> T getConfiguredInstance(String str, Class<T> cls) {
        T t = (T) super.getConfiguredInstance(str, cls);
        if (t instanceof CruiseControlConfigurable) {
            ((CruiseControlConfigurable) t).configure(mergedConfigValues());
        }
        return t;
    }

    @Override // org.apache.kafka.common.config.AbstractConfig
    public <T> List<T> getConfiguredInstances(String str, Class<T> cls) {
        List<T> configuredInstances = super.getConfiguredInstances(str, cls);
        for (T t : configuredInstances) {
            if (t instanceof CruiseControlConfigurable) {
                ((CruiseControlConfigurable) t).configure(mergedConfigValues());
            }
        }
        return configuredInstances;
    }

    @Override // org.apache.kafka.common.config.AbstractConfig
    public <T> List<T> getConfiguredInstances(String str, Class<T> cls, Map<String, Object> map) {
        List<T> configuredInstances = super.getConfiguredInstances(str, cls, map);
        Map<String, ?> mergedConfigValues = mergedConfigValues();
        mergedConfigValues.putAll(map);
        for (T t : configuredInstances) {
            if (t instanceof CruiseControlConfigurable) {
                ((CruiseControlConfigurable) t).configure(mergedConfigValues);
            }
        }
        return configuredInstances;
    }

    @Override // org.apache.kafka.common.config.AbstractConfig
    public <T> T getConfiguredInstance(String str, Class<T> cls, Map<String, Object> map) {
        Class<?> cls2 = getClass(str);
        try {
            Object newInstance = cls2.newInstance();
            if (!cls.isInstance(newInstance)) {
                throw new IllegalArgumentException(cls2.getName() + " is not an instance of " + cls.getName());
            }
            T cast = cls.cast(newInstance);
            if (cast instanceof CruiseControlConfigurable) {
                Map<String, ?> mergedConfigValues = mergedConfigValues();
                mergedConfigValues.putAll(map);
                ((CruiseControlConfigurable) cast).configure(mergedConfigValues);
            }
            return cast;
        } catch (IllegalAccessException e) {
            throw new IllegalArgumentException("Could not instantiate class " + cls2.getName(), e);
        } catch (InstantiationException e2) {
            throw new IllegalArgumentException("Could not instantiate class " + cls2.getName() + " Does it have a public no-argument constructor?", e2);
        } catch (NullPointerException e3) {
            throw new IllegalArgumentException("Attempt to get configured instance of null configuration " + cls.getName(), e3);
        }
    }

    private void sanityCheckGoalNames() {
        List<String> list = getList("goals");
        if (list.isEmpty()) {
            throw new ConfigException("Attempt to configure goals configuration with an empty list of goals.");
        }
        List<String> list2 = getList(INTRA_BROKER_GOALS_CONFIG);
        if (list2.isEmpty()) {
            throw new ConfigException("Attempt to configure intra-broker goals configuration with an empty list of goals.");
        }
        TreeSet treeSet = new TreeSet(String.CASE_INSENSITIVE_ORDER);
        Iterator<String> it = list.iterator();
        while (it.hasNext()) {
            if (!treeSet.add(it.next().replaceAll(".*\\.", ""))) {
                throw new ConfigException("Attempt to configure goals with case sensitive names.");
            }
        }
        Iterator<String> it2 = list2.iterator();
        while (it2.hasNext()) {
            if (!treeSet.add(it2.next().replaceAll(".*\\.", ""))) {
                throw new ConfigException("Attempt to configure intra-broker goals with case sensitive names.");
            }
        }
        List<String> list3 = getList("anomaly.detection.goals");
        if (list3.stream().anyMatch(str -> {
            return !list.contains(str);
        })) {
            throw new ConfigException("Attempt to configure anomaly detection goals as a superset of self healing goals {}.", list3.toString());
        }
        boolean booleanValue = getBoolean("incremental.balancing.enabled").booleanValue();
        List<String> list4 = getList("incremental.balancing.goals");
        if (booleanValue && list4.isEmpty()) {
            throw new ConfigException("Attempt to enable incremental balancing with an empty list of incremental balancing goals.");
        }
    }

    private void sanityCheckSamplingPeriod() {
        long longValue = samplingIntervalWindowMs().longValue();
        int intValue = getInt(METADATA_CLIENT_TIMEOUT_MS_CONFIG).intValue();
        if (intValue > longValue) {
            throw new ConfigException("Attempt to set metadata refresh timeout [" + intValue + "] to be longer than sampling interval window which is simply the maximum window size [" + longValue + "].");
        }
    }

    public KafkaCruiseControlConfig(Map<?, ?> map) {
        super(CONFIG, map);
        sanityCheckGoalNames();
        sanityCheckSamplingPeriod();
    }

    public KafkaCruiseControlConfig(Map<?, ?> map, boolean z) {
        super(CONFIG, map, z);
        sanityCheckGoalNames();
        sanityCheckSamplingPeriod();
    }

    public KafkaCruiseControlConfig clone(String str, Object obj) {
        return clone(Collections.singletonMap(str, obj));
    }

    public KafkaCruiseControlConfig clone(Map<String, Object> map) {
        Map<String, Object> originals = originals();
        for (Map.Entry<String, Object> entry : map.entrySet()) {
            originals.put(entry.getKey(), entry.getValue());
        }
        return new KafkaCruiseControlConfig(originals);
    }

    public UpdatableSbcGoalsConfig updatableSbcGoalsConfig() {
        if (this.updatableSbcGoalsConfig == null) {
            List<String> list = getList("goals");
            List<String> list2 = getList("anomaly.detection.goals");
            boolean booleanValue = getBoolean("incremental.balancing.enabled").booleanValue();
            this.updatableSbcGoalsConfig = new UpdatableSbcGoalsConfig(SbcGoalsConfig.builder().rebalancingGoals(list).triggeringGoals(list2).incrementalBalancingEnabled(booleanValue).incrementalBalancingGoals(getList("incremental.balancing.goals")).build(this));
        }
        return this.updatableSbcGoalsConfig;
    }

    public static String internalGoalsConfigName(String str) {
        boolean z = -1;
        switch (str.hashCode()) {
            case -2085056516:
                if (str.equals(BalancerConfigs.BALANCER_TRIGGERING_GOALS_SBC_CONFIG)) {
                    z = 3;
                    break;
                }
                break;
            case -2007929200:
                if (str.equals(BalancerConfigs.BALANCER_REBALANCING_GOALS_CONFIG)) {
                    z = false;
                    break;
                }
                break;
            case -617855484:
                if (str.equals(BalancerConfigs.BALANCER_TRIGGERING_GOALS_CONFIG)) {
                    z = 2;
                    break;
                }
                break;
            case -246520936:
                if (str.equals(BalancerConfigs.BALANCER_REBALANCING_GOALS_SBC_CONFIG)) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
            case true:
                return "goals";
            case true:
            case true:
                return "anomaly.detection.goals";
            default:
                throw new IllegalArgumentException(String.format("The provided config name %s was not recognized.", str));
        }
    }

    public static Integer shutdownTimeoutMs(Map<String, Object> map) {
        return (Integer) map.getOrDefault(SHUTDOWN_TIMEOUT_MS_CONFIG, 15000);
    }

    public Long samplingIntervalWindowMs() {
        return getLong(PARTITION_METRICS_WINDOW_MS_CONFIG);
    }
}
