package io.confluent.ksql.util;

import io.confluent.ksql.config.ConfigItem;
import io.confluent.ksql.config.KsqlConfigResolver;
import io.confluent.ksql.configdef.ConfigValidators;
import io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler;
import io.confluent.ksql.errors.ProductionExceptionHandlerUtil;
import io.confluent.ksql.logging.processing.ProcessingLogConfig;
import io.confluent.ksql.logging.processing.ProcessingLogMessageSchema;
import io.confluent.ksql.metrics.MetricCollectors;
import io.confluent.ksql.model.SemanticVersion;
import io.confluent.ksql.query.QueryError;
import io.confluent.ksql.testing.EffectivelyImmutable;
import io.confluent.shaded.com.google.common.base.Splitter;
import io.confluent.shaded.com.google.common.collect.ImmutableList;
import io.confluent.shaded.com.google.common.collect.ImmutableMap;
import io.confluent.shaded.com.google.common.collect.ImmutableSet;
import io.confluent.shaded.io.netty.handler.traffic.AbstractTrafficShapingHandler;
import io.confluent.shaded.io.vertx.core.http.HttpClientOptions;
import io.confluent.shaded.org.slf4j.Logger;
import io.confluent.shaded.org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.streams.StreamsConfig;

@EffectivelyImmutable
/* loaded from: input_file:io/confluent/ksql/util/KsqlConfig.class */
public class KsqlConfig extends AbstractConfig {
    // Prefixes that namespace ksqlDB property names; the two function prefixes extend the base "ksql." prefix.
    public static final String KSQL_CONFIG_PROPERTY_PREFIX = "ksql.";
    public static final String KSQL_FUNCTIONS_PROPERTY_PREFIX = "ksql.functions.";
    // NOTE(review): the identifier is spelled "KSQ_" rather than "KSQL_"; it is left as-is because
    // package-level callers outside this view may reference it by this name.
    static final String KSQ_FUNCTIONS_GLOBAL_PROPERTY_PREFIX = "ksql.functions._global_.";
    // Metrics reporter list: property name followed by its human-readable documentation text.
    public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";
    public static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters. Implementing the <code>org.apache.kafka.common.metrics.MetricsReporter</code> interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.";
    // Property names for the replication settings of ksqlDB's internal topics.
    public static final String KSQL_INTERNAL_TOPIC_REPLICAS_PROPERTY = "ksql.internal.topic.replicas";
    public static final String KSQL_INTERNAL_TOPIC_MIN_INSYNC_REPLICAS_PROPERTY = "ksql.internal.topic.min.insync.replicas";
    // Property names under "ksql.internal."; presumably used to hand internal objects to components
    // through the config map rather than being user-facing -- TODO confirm against the usages.
    public static final String KSQL_INTERNAL_METRIC_COLLECTORS_CONFIG = "ksql.internal.metric.collectors";
    public static final String KSQL_INTERNAL_METRICS_CONFIG = "ksql.internal.metrics";
    public static final String KSQL_INTERNAL_STREAMS_ERROR_COLLECTOR_CONFIG = "ksql.internal.streams.error.collector";
    // Schema Registry: property prefix and the URL property built on it.
    public static final String KSQL_SCHEMA_REGISTRY_PREFIX = "ksql.schema.registry.";
    public static final String SCHEMA_REGISTRY_URL_PROPERTY = "ksql.schema.registry.url";
    // Kafka Connect integration: property prefix plus the individual property names built on it.
    public static final String KSQL_CONNECT_PREFIX = "ksql.connect.";
    public static final String CONNECT_URL_PROPERTY = "ksql.connect.url";
    public static final String CONNECT_WORKER_CONFIG_FILE_PROPERTY = "ksql.connect.worker.config";
    public static final String CONNECT_BASIC_AUTH_CREDENTIALS_SOURCE_PROPERTY = "ksql.connect.basic.auth.credentials.source";
    // Key names for basic-auth credentials; presumably the entries looked up in the credentials
    // file referenced further down -- TODO confirm against the reader of that file.
    public static final String BASIC_AUTH_CREDENTIALS_USERNAME = "username";
    public static final String BASIC_AUTH_CREDENTIALS_PASSWORD = "password";
    // SSL endpoint identification: the property name and the two literal values defined for it here.
    public static final String SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG = "ssl.endpoint.identification.algorithm";
    public static final String SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_HTTPS = "https";
    public static final String SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_NONE = "none";
    // Location and reload settings for the Connect basic-auth credentials file.
    public static final String CONNECT_BASIC_AUTH_CREDENTIALS_FILE_PROPERTY = "ksql.connect.basic.auth.credentials.file";
    public static final String CONNECT_BASIC_AUTH_CREDENTIALS_RELOAD_PROPERTY = "ksql.connect.basic.auth.credentials.reload";
    // Connector request timeout: property name and documentation text.
    public static final String CONNECT_REQUEST_TIMEOUT_MS = "ksql.connect.request.timeout.ms";
    private static final String CONNECT_REQUEST_TIMEOUT_MS_DOC = "Timeout, in milliseconds, used for each of the connection timeout and request timeout for connector requests issued by ksqlDB.";
    // Pluggable request-headers extension for connector requests: property name and documentation text.
    public static final String CONNECT_REQUEST_HEADERS_PLUGIN = "ksql.connect.request.headers.plugin";
    private static final String CONNECT_REQUEST_HEADERS_PLUGIN_DOC = "Custom extension to allow for more fine-grained control of connector requests made by ksqlDB. Extensions should implement the ConnectRequestHeadersExtension interface.";
    // Property names for enabling UDFs and for the extension directory.
    public static final String KSQL_ENABLE_UDFS = "ksql.udfs.enabled";
    public static final String KSQL_EXT_DIR = "ksql.extension.dir";
    // Property name for additional retention on windowed sink change logs (the constant name carries
    // an "_MS" suffix although the property string itself has no unit suffix).
    public static final String SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_MS_PROPERTY = "ksql.sink.window.change.log.additional.retention";
    // Property names of the fail-on-error toggles for deserialization and production errors.
    public static final String FAIL_ON_DESERIALIZATION_ERROR_CONFIG = "ksql.fail.on.deserialization.error";
    public static final String FAIL_ON_PRODUCTION_ERROR_CONFIG = "ksql.fail.on.production.error";
    // Service id: property name and default value.
    public static final String KSQL_SERVICE_ID_CONFIG = "ksql.service.id";
    public static final String KSQL_SERVICE_ID_DEFAULT = "default_";
    // Persistent query name prefix: property name, default value and documentation text.
    public static final String KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG = "ksql.persistent.prefix";
    public static final String KSQL_PERSISTENT_QUERY_NAME_PREFIX_DEFAULT = "query_";
    public static final String KSQL_PERSISTENT_QUERY_NAME_PREFIX_DOC = "Prefixes persistent queries with this value.";
    // Transient query name prefix: property name and default value (no documentation constant in this section).
    public static final String KSQL_TRANSIENT_QUERY_NAME_PREFIX_CONFIG = "ksql.transient.prefix";
    public static final String KSQL_TRANSIENT_QUERY_NAME_PREFIX_DEFAULT = "transient_";
    // Output topic name prefix: property name and documentation text.
    public static final String KSQL_OUTPUT_TOPIC_NAME_PREFIX_CONFIG = "ksql.output.topic.name.prefix";
    private static final String KSQL_OUTPUT_TOPIC_NAME_PREFIX_DOCS = "A prefix to add to any output topic names, where the statement does not include an explicit topic name. E.g. given 'ksql.output.topic.name.prefix = \"thing-\"', then statement 'CREATE STREAM S AS ...' will create a topic 'thing-S', where as the statement 'CREATE STREAM S WITH(KAFKA_TOPIC = 'foo') AS ...' will create a topic 'foo'.";
    // Limit on concurrently active persistent queries: property name, default (no practical limit)
    // and documentation text.
    public static final String KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_CONFIG = "ksql.query.persistent.active.limit";
    private static final int KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_DEFAULT = Integer.MAX_VALUE;
    private static final String KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_DOC = "An upper limit on the number of active, persistent queries that may be running at a time, in interactive mode. Once this limit is reached, any further persistent queries will not be accepted.";
    // Default key format: property name, default value and documentation text.
    public static final String KSQL_DEFAULT_KEY_FORMAT_CONFIG = "ksql.persistence.default.format.key";
    private static final String KSQL_DEFAULT_KEY_FORMAT_DEFAULT = "KAFKA";
    private static final String KSQL_DEFAULT_KEY_FORMAT_DOC = "Key format that will be used by default if none is specified in the WITH properties of CREATE STREAM/TABLE statements.";
    // Default value format: property name and documentation text (no default constant in this section).
    public static final String KSQL_DEFAULT_VALUE_FORMAT_CONFIG = "ksql.persistence.default.format.value";
    private static final String KSQL_DEFAULT_VALUE_FORMAT_DOC = "Value format that will be used by default if none is specified in the WITH properties of CREATE STREAM/TABLE statements.";
    // Property name controlling wrapping of single values in persisted records.
    public static final String KSQL_WRAP_SINGLE_VALUES = "ksql.persistence.wrap.single.values";
    // Query anonymizer: enable flag and cluster namespace, each with its documentation text.
    // The enable doc previously read "log themin plain text"; the missing space is restored.
    public static final String KSQL_QUERYANONYMIZER_ENABLED = "ksql.queryanonymizer.logs_enabled";
    private static final String KSQL_QUERYANONYMIZER_ENABLED_DOC = "This defines whether we log anonymized queries out of query logger or if we log them in plain text. Defaults to true";
    public static final String KSQL_QUERYANONYMIZER_CLUSTER_NAMESPACE = "ksql.queryanonymizer.cluster_namespace";
    private static final String KSQL_QUERYANONYMIZER_CLUSTER_NAMESPACE_DOC = "Namespace used in the query anonymization process, representing cluster id and organization id respectively. For example, 'clusterid.orgid'.";
    // Custom metrics: extra JMX tags and a pluggable metrics extension, each with its documentation text.
    public static final String KSQL_CUSTOM_METRICS_TAGS = "ksql.metrics.tags.custom";
    private static final String KSQL_CUSTOM_METRICS_TAGS_DOC = "A list of tags to be included with emitted JMX metrics, formatted as a string of key:value pairs separated by commas. For example, 'key1:value1,key2:value2'.";
    public static final String KSQL_CUSTOM_METRICS_EXTENSION = "ksql.metrics.extension";
    private static final String KSQL_CUSTOM_METRICS_EXTENSION_DOC = "Extension for supplying custom metrics to be emitted along with the engine's default JMX metrics";
    // Default URLs for Schema Registry and Kafka Connect (both point at localhost).
    public static final String DEFAULT_SCHEMA_REGISTRY_URL = "http://localhost:8081";
    public static final String DEFAULT_CONNECT_URL = "http://localhost:8083";
    // Prefix for properties that target Kafka Streams ("ksql.streams.").
    public static final String KSQL_STREAMS_PREFIX = "ksql.streams.";
    // Property names for UDF metric collection, the UDF security manager and INSERT INTO ... VALUES support.
    public static final String KSQL_COLLECT_UDF_METRICS = "ksql.udf.collect.metrics";
    public static final String KSQL_UDF_SECURITY_MANAGER_ENABLED = "ksql.udf.enable.security.manager";
    public static final String KSQL_INSERT_INTO_VALUES_ENABLED = "ksql.insert.into.values.enabled";
    // Default extension directory name; presumably the default for "ksql.extension.dir" -- TODO confirm
    // where the config definition wires it up.
    public static final String DEFAULT_EXT_DIR = "ext";
    // Security extension class: property name and documentation text.
    public static final String KSQL_SECURITY_EXTENSION_CLASS = "ksql.security.extension.class";
    public static final String KSQL_SECURITY_EXTENSION_DOC = "A KSQL security extension class that provides authorization to KSQL servers.";
    // Topic access validator: property name, the three accepted values, and documentation text.
    public static final String KSQL_ENABLE_ACCESS_VALIDATOR = "ksql.access.validator.enable";
    public static final String KSQL_ACCESS_VALIDATOR_ON = "on";
    public static final String KSQL_ACCESS_VALIDATOR_OFF = "off";
    public static final String KSQL_ACCESS_VALIDATOR_AUTO = "auto";
    public static final String KSQL_ACCESS_VALIDATOR_DOC = "Config to enable/disable the topic access validator, which checks that KSQL can access the involved topics before committing to execute a statement. Possible values are \"on\", \"off\", and \"auto\". Setting to \"on\" enables the validator. Setting to \"off\" disables the validator. If set to \"auto\", KSQL will attempt to discover whether the Kafka cluster supports the required API, and enables the validator if it does.";
    // Pull queries master switch: property name, documentation text and default (enabled).
    public static final String KSQL_PULL_QUERIES_ENABLE_CONFIG = "ksql.pull.queries.enable";
    public static final String KSQL_QUERY_PULL_ENABLE_DOC = "Config to enable or disable transient pull queries on a specific KSQL server.";
    public static final boolean KSQL_QUERY_PULL_ENABLE_DEFAULT = true;
    // Standby reads for pull queries: property name, documentation text and default (disabled).
    // The doc text previously contained the fused words "hostsreceive" and "whenthe" and the
    // phrase "configured with to"; those are corrected here.
    public static final String KSQL_QUERY_PULL_ENABLE_STANDBY_READS = "ksql.query.pull.enable.standby.reads";
    private static final String KSQL_QUERY_PULL_ENABLE_STANDBY_READS_DOC = "Config to enable/disable forwarding pull queries to standby hosts when the active is dead. This means that stale values may be returned for these queries since standby hosts receive updates from the changelog topic (to which the active writes to) asynchronously. Turning on this configuration, effectively sacrifices consistency for higher availability.  Possible values are \"true\", \"false\". Setting to \"true\" guarantees high availability for pull queries. If set to \"false\", pull queries will fail when the active is dead and until a new active is elected. Default value is \"false\". For using this functionality, the server must be configured with ksql.streams.num.standby.replicas >= 1";
    public static final boolean KSQL_QUERY_PULL_ENABLE_STANDBY_READS_DEFAULT = false;
    // Maximum tolerated offset lag for pull queries: property name and documentation text
    // (the duplicated "is is" in the doc text is removed).
    public static final String KSQL_QUERY_PULL_MAX_ALLOWED_OFFSET_LAG_CONFIG = "ksql.query.pull.max.allowed.offset.lag";
    private static final String KSQL_QUERY_PULL_MAX_ALLOWED_OFFSET_LAG_DOC = "Controls the maximum lag tolerated by a pull query against a table. This is applied to all hosts storing it, both active and standbys included. This can be overridden per query or set in the CLI. It's only enabled when lag.reporting.enable is true. By default, any amount of lag is allowed.";
    // JMX metrics collection for pull queries.
    public static final String KSQL_QUERY_PULL_METRICS_ENABLED = "ksql.query.pull.metrics.enabled";
    public static final String KSQL_QUERY_PULL_METRICS_ENABLED_DOC = "Config to enable/disable collecting JMX metrics for pull queries.";
    // Rate, concurrency and bandwidth limits for pull queries (and the v2 push query bandwidth limit).
    public static final String KSQL_QUERY_PULL_MAX_QPS_CONFIG = "ksql.query.pull.max.qps";
    public static final String KSQL_QUERY_PULL_MAX_QPS_DOC = "The maximum qps allowed for pull queries on this host. Once the limit is hit, queries will fail immediately";
    public static final String KSQL_QUERY_PULL_MAX_CONCURRENT_REQUESTS_CONFIG = "ksql.query.pull.max.concurrent.requests";
    public static final String KSQL_QUERY_PULL_MAX_CONCURRENT_REQUESTS_DOC = "The maximum number of concurrent requests allowed for pull queries on this host. Once the limit is hit, queries will fail immediately";
    public static final String KSQL_QUERY_PULL_MAX_HOURLY_BANDWIDTH_MEGABYTES_CONFIG = "ksql.query.pull.max.hourly.bandwidth.megabytes";
    public static final String KSQL_QUERY_PULL_MAX_HOURLY_BANDWIDTH_MEGABYTES_DOC = "The maximum amount of pull query bandwidth in megabytes allowed over a period of one hour. Once the limit is hit, queries will fail immediately";
    public static final String KSQL_QUERY_PUSH_V2_MAX_HOURLY_BANDWIDTH_MEGABYTES_CONFIG = "ksql.query.push.v2.max.hourly.bandwidth.megabytes";
    public static final String KSQL_QUERY_PUSH_V2_MAX_HOURLY_BANDWIDTH_MEGABYTES_DOC = "The maximum amount of v2 push query bandwidth in megabytes allowed over a period of one hour. Once the limit is hit, queries will fail immediately";
    // Thread pool sizes for coordinating and routing pull queries.
    public static final String KSQL_QUERY_PULL_THREAD_POOL_SIZE_CONFIG = "ksql.query.pull.thread.pool.size";
    public static final String KSQL_QUERY_PULL_THREAD_POOL_SIZE_DOC = "Size of thread pool used for coordinating pull queries";
    public static final String KSQL_QUERY_PULL_ROUTER_THREAD_POOL_SIZE_CONFIG = "ksql.query.pull.router.thread.pool.size";
    public static final String KSQL_QUERY_PULL_ROUTER_THREAD_POOL_SIZE_DOC = "Size of thread pool used for routing pull queries";
    // Pull query feature toggles; each has a property name, documentation text and default.
    public static final String KSQL_QUERY_PULL_TABLE_SCAN_ENABLED = "ksql.query.pull.table.scan.enabled";
    public static final String KSQL_QUERY_PULL_TABLE_SCAN_ENABLED_DOC = "Config to enable pull queries that scan over the data";
    public static final boolean KSQL_QUERY_PULL_TABLE_SCAN_ENABLED_DEFAULT = true;
    public static final String KSQL_QUERY_STREAM_PULL_QUERY_ENABLED = "ksql.query.pull.stream.enabled";
    public static final String KSQL_QUERY_STREAM_PULL_QUERY_ENABLED_DOC = "Config to enable pull queries on streams";
    public static final boolean KSQL_QUERY_STREAM_PULL_QUERY_ENABLED_DEFAULT = true;
    public static final String KSQL_QUERY_PULL_RANGE_SCAN_ENABLED = "ksql.query.pull.range.scan.enabled";
    public static final String KSQL_QUERY_PULL_RANGE_SCAN_ENABLED_DOC = "Config to enable range scans on table for pull queries";
    public static final boolean KSQL_QUERY_PULL_RANGE_SCAN_ENABLED_DEFAULT = true;
    // The interpreter doc text previously read "thedefault"; the missing space is restored.
    public static final String KSQL_QUERY_PULL_INTERPRETER_ENABLED = "ksql.query.pull.interpreter.enabled";
    public static final String KSQL_QUERY_PULL_INTERPRETER_ENABLED_DOC = "Enables whether we use the interpreter for expression evaluation for pull queries, or the default code generator. They should produce the same results, but the interpreter is much faster for short-lived queries.";
    public static final boolean KSQL_QUERY_PULL_INTERPRETER_ENABLED_DEFAULT = true;
    // Default for the consistency offset vector feature; its property name is not declared in this section.
    public static final boolean KSQL_QUERY_PULL_CONSISTENCY_OFFSET_VECTOR_ENABLED_DEFAULT = false;
    public static final String KSQL_QUERY_PULL_LIMIT_CLAUSE_ENABLED = "ksql.query.pull.limit.clause.enabled";
    public static final String KSQL_QUERY_PULL_LIMIT_CLAUSE_ENABLED_DOC = "Enables the use of LIMIT clause in pull queries";
    public static final boolean KSQL_QUERY_PULL_LIMIT_CLAUSE_ENABLED_DEFAULT = true;
    // Scalable ("v2") push queries. Each setting below is declared as a property name, its
    // documentation text and, where present, its default value.
    public static final String KSQL_QUERY_PUSH_V2_ENABLED = "ksql.query.push.v2.enabled";
    public static final String KSQL_QUERY_PUSH_V2_ENABLED_DOC = "Enables whether v2 push queries are enabled. The scalable form of push queries require no window functions, aggregations, or joins, but may include projections and filters. If they cannot be utilized, the streams application version will automatically be used instead.";
    public static final boolean KSQL_QUERY_PUSH_V2_ENABLED_DEFAULT = false;
    public static final String KSQL_QUERY_PUSH_V2_REGISTRY_INSTALLED = "ksql.query.push.v2.registry.installed";
    public static final String KSQL_QUERY_PUSH_V2_REGISTRY_INSTALLED_DOC = "Enables whether v2 push registry should be installed. This is a requirement of enabling scalable push queries using ksql.query.push.v2.enabled.";
    public static final boolean KSQL_QUERY_PUSH_V2_REGISTRY_INSTALLED_DEFAULT = false;
    public static final String KSQL_QUERY_PUSH_V2_ALOS_ENABLED = "ksql.query.push.v2.alos.enabled";
    public static final String KSQL_QUERY_PUSH_V2_ALOS_ENABLED_DOC = "Whether at-least-once semantics are enabled for the scalable form of push queries. This means that a query will replay data if necessary if network or other disruptions cause it to miss any data";
    public static final boolean KSQL_QUERY_PUSH_V2_ALOS_ENABLED_DEFAULT = true;
    public static final String KSQL_QUERY_PUSH_V2_INTERPRETER_ENABLED = "ksql.query.push.v2.interpreter.enabled";
    public static final String KSQL_QUERY_PUSH_V2_INTERPRETER_ENABLED_DOC = "Enables whether we use the interpreter for expression evaluation for scalable push queries, or the default code generator. They should produce the same results, but may have different performance characteristics.";
    public static final boolean KSQL_QUERY_PUSH_V2_INTERPRETER_ENABLED_DEFAULT = true;
    // Start-up delay for a new "latest" consumer, in milliseconds. The doc text previously read
    // "records.If" and "thatstragglers"; the missing spaces are restored.
    public static final String KSQL_QUERY_PUSH_V2_NEW_LATEST_DELAY_MS = "ksql.query.push.v2.new.latest.delay.ms";
    public static final String KSQL_QUERY_PUSH_V2_NEW_LATEST_DELAY_DOC = "The delay in ms for a new latest consumer to start processing records. If new nodes are added to the cluster, a longer delay makes it less likely that stragglers requests will miss a record and be forced to catchup.";
    public static final long KSQL_QUERY_PUSH_V2_NEW_LATEST_DELAY_DEFAULT = 5000;
    public static final String KSQL_QUERY_PUSH_V2_LATEST_RESET_AGE_MS = "ksql.query.push.v2.latest.reset.age.ms";
    public static final String KSQL_QUERY_PUSH_V2_LATEST_RESET_AGE_MS_DOC = "The maximum age in ms of existing committed offsets for latest consumer to adopt those offsets rather than seek to the end.";
    public static final long KSQL_QUERY_PUSH_V2_LATEST_RESET_AGE_MS_DEFAULT = 30000;
    public static final String KSQL_QUERY_PUSH_V2_CONTINUATION_TOKENS_ENABLED = "ksql.query.push.v2.continuation.tokens.enabled";
    public static final String KSQL_QUERY_PUSH_V2_CONTINUATION_TOKENS_ENABLED_DOC = "Whether to output continuation tokens to the user.";
    public static final boolean KSQL_QUERY_PUSH_V2_CONTINUATION_TOKENS_ENABLED_DEFAULT = false;
    // Catch-up consumer limits.
    public static final String KSQL_QUERY_PUSH_V2_MAX_CATCHUP_CONSUMERS = "ksql.query.push.v2.max.catchup.consumers";
    public static final String KSQL_QUERY_PUSH_V2_MAX_CATCHUP_CONSUMERS_DOC = "The maximum number of concurrent catchup consumers.";
    public static final int KSQL_QUERY_PUSH_V2_MAX_CATCHUP_CONSUMERS_DEFAULT = 5;
    public static final String KSQL_QUERY_PUSH_V2_CATCHUP_CONSUMER_MSG_WINDOW = "ksql.query.push.v2.catchup.consumer.msg.window";
    public static final String KSQL_QUERY_PUSH_V2_CATCHUP_CONSUMER_MSG_WINDOW_DOC = "How close the catchup consumer must be to the latest before it will stop the latest to join with it.";
    public static final int KSQL_QUERY_PUSH_V2_CATCHUP_CONSUMER_MSG_WINDOW_DEFAULT = 50;
    // JMX metrics collection for v2 push queries (no default constant in this section).
    public static final String KSQL_QUERY_PUSH_V2_METRICS_ENABLED = "ksql.query.push.v2.metrics.enabled";
    public static final String KSQL_QUERY_PUSH_V2_METRICS_ENABLED_DOC = "Config to enable/disable collecting JMX metrics for push queries v2.";
    // Streams shutdown timeout: property name and documentation text.
    public static final String KSQL_SHUTDOWN_TIMEOUT_MS_CONFIG = "ksql.streams.shutdown.timeout.ms";
    public static final String KSQL_SHUTDOWN_TIMEOUT_MS_DOC = "Timeout in milliseconds to block waiting for the underlying streams instance to exit";
    // Authorization response cache: expiry time and maximum size.
    public static final String KSQL_AUTH_CACHE_EXPIRY_TIME_SECS = "ksql.authorization.cache.expiry.time.secs";
    public static final String KSQL_AUTH_CACHE_EXPIRY_TIME_SECS_DOC = "Time in seconds to keep KSQL authorization responses in the cache. (The cache is disabled if 0 or a negative number is set).";
    public static final String KSQL_AUTH_CACHE_MAX_ENTRIES = "ksql.authorization.cache.max.entries";
    public static final String KSQL_AUTH_CACHE_MAX_ENTRIES_DOC = "Controls the size of the cache to a maximum number of KSQL authorization responses entries.";
    // Hidden topics: property name, default list and documentation text. The doc text previously
    // read "will not visible" and carried an unbalanced ")" after `_confluent`; both are corrected.
    public static final String KSQL_HIDDEN_TOPICS_CONFIG = "ksql.hidden.topics";
    public static final String KSQL_HIDDEN_TOPICS_DEFAULT = "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses";
    public static final String KSQL_HIDDEN_TOPICS_DOC = "Comma-separated list of topics that will be hidden. Entries in the list may be literal topic names or [Java regular expressions](https://docs.oracle.com/javase/8/docs/api/java/util/regex/Pattern.html). For example, `_confluent.*` will match any topic whose name starts with the `_confluent`.\nHidden topics will not be visible when running the `SHOW TOPICS` command unless `SHOW ALL TOPICS` is used.\nThe default value hides known system topics from Kafka and Confluent products.\nKSQL also marks its own internal topics as hidden. This is not controlled by this config.";
    // Read-only topics: property name, default list (same entries as the hidden-topics default) and
    // documentation text. The same unbalanced ")" is removed here.
    public static final String KSQL_READONLY_TOPICS_CONFIG = "ksql.readonly.topics";
    public static final String KSQL_READONLY_TOPICS_DEFAULT = "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses";
    public static final String KSQL_READONLY_TOPICS_DOC = "Comma-separated list of topics that should be marked as read-only. Entries in the list may be literal topic names or [Java regular expressions](https://docs.oracle.com/javase/8/docs/api/java/util/regex/Pattern.html). For example, `_confluent.*` will match any topic whose name starts with the `_confluent`.\nRead-only topics cannot be modified by any KSQL command.\nThe default value marks known system topics from Kafka and Confluent products as read-only.\nKSQL also marks its own internal topics as read-only. This is not controlled by this config.";
    // Behaviour on invalid record timestamps: property name and documentation text.
    public static final String KSQL_TIMESTAMP_THROW_ON_INVALID = "ksql.timestamp.throw.on.invalid";
    public static final String KSQL_TIMESTAMP_THROW_ON_INVALID_DOC = "If an incoming message contains an invalid timestamp, ksqlDB will log a warning and continue. To disable this behavior, and instead throw an exception to ensure that no data is missed, set ksql.timestamp.throw.on.invalid to true.";
    // Named as a prefix for error-classifier regex properties; how suffixes are appended is not
    // shown in this section.
    public static final String KSQL_ERROR_CLASSIFIER_REGEX_PREFIX = "ksql.error.classifier.regex";
    // CREATE OR REPLACE feature flag.
    public static final String KSQL_CREATE_OR_REPLACE_ENABLED = "ksql.create.or.replace.enabled";
    public static final String KSQL_CREATE_OR_REPLACE_ENABLED_DOC = "Feature flag for CREATE OR REPLACE";
    // Metastore backup directory: property name, default (empty string) and documentation text.
    public static final String KSQL_METASTORE_BACKUP_LOCATION = "ksql.metastore.backup.location";
    public static final String KSQL_METASTORE_BACKUP_LOCATION_DEFAULT = "";
    public static final String KSQL_METASTORE_BACKUP_LOCATION_DOC = "Specify the directory where KSQL metastore backup files are located.";
    // Feature flags. Each is declared as a property name followed by its documentation text;
    // the defaults are not declared in this section.
    public static final String KSQL_SUPPRESS_ENABLED = "ksql.suppress.enabled";
    public static final String KSQL_SUPPRESS_ENABLED_DOC = "Feature flag for suppression, specifically EMIT FINAL";
    public static final String KSQL_LAMBDAS_ENABLED = "ksql.lambdas.enabled";
    public static final String KSQL_LAMBDAS_ENABLED_DOC = "Feature flag for lambdas. Default is true. If true, lambdas are processed normally, if false, new lambda queries won't be processed but any existing lambda queries are unaffected.";
    public static final String KSQL_HEADERS_COLUMNS_ENABLED = "ksql.headers.columns.enabled";
    public static final String KSQL_HEADERS_COLUMNS_ENABLED_DOC = "Feature flag that allows the use of kafka headers columns on streams and tables. If false, the HEADERS and HEADER(<key>) columns constraints won't be allowed in CREATE statements. Current CREATE statements found in the KSQL command topic that contains headers columns will work with the headers functionality to prevent a degraded command topic situation when restarting ksqlDB.";
    public static final String KSQL_JSON_SR_CONVERTER_DESERIALIZER_ENABLED = "ksql.json_sr.converter.deserializer.enabled";
    private static final String KSQL_JSON_SR_CONVERTER_DESERIALIZER_ENABLED_DOC = "Feature flag that enables the use of the JsonSchemaConverter class for deserializing JSON_SR records. JsonSchemaConverter is required to support `anyOf` JSON_SR types. This flag should be used to disable this feature only when users experience deserialization issues caused by the JsonSchemaConverter. Otherwise, this flag should remain true to take advantage of the new `anyOf` types and other JSON_SR serde fixes.";
    public static final String KSQL_SOURCE_TABLE_MATERIALIZATION_ENABLED = "ksql.source.table.materialization.enabled";
    private static final String KSQL_SOURCE_TABLE_MATERIALIZATION_ENABLED_DOC = "Feature flag that enables table materialization on source tables. Default is true. If false, CREATE SOURCE [TABLE|STREAM] statements will be rejected. Current CREATE SOURCE TABLE statements found in the KSQL command topic will not be materialized and pull queries won't be allowed on them. However, current CREATE SOURCE [TABLE|STREAM] statements will continue being read-only.";
    public static final String KSQL_SHARED_RUNTIME_ENABLED = "ksql.runtime.feature.shared.enabled";
    public static final String KSQL_SHARED_RUNTIME_ENABLED_DOC = "Feature flag for sharing streams runtimes. Default is false. If false, persistent queries will use separate  runtimes, if true, new queries may share streams instances.";
    public static final String KSQL_NEW_QUERY_PLANNER_ENABLED = "ksql.new.query.planner.enabled";
    private static final String KSQL_NEW_QUERY_PLANNER_ENABLED_DOC = "Feature flag that enables the new planner for persistent queries. Default is false.";
    // Number of shared runtimes. The doc text previously read "initially.this"; the sentence
    // break is restored.
    public static final String KSQL_SHARED_RUNTIMES_COUNT = "ksql.shared.runtimes.count";
    public static final String KSQL_SHARED_RUNTIMES_COUNT_DOC = "Controls how many runtimes queries are allocated over initially. This is only used when ksql.runtime.feature.shared.enabled is true.";
    // Size bound, in bytes, for the suppression buffer.
    public static final String KSQL_SUPPRESS_BUFFER_SIZE_BYTES = "ksql.suppress.buffer.size.bytes";
    public static final String KSQL_SUPPRESS_BUFFER_SIZE_BYTES_DOC = "Bound the number of bytes that the buffer can use for suppression. Negative size means the buffer will be unbounded. If the maximum capacity is exceeded, the query will be terminated";
    // Retry backoff bounds for persistent queries in ERROR state: property names and documentation text.
    public static final String KSQL_QUERY_RETRY_BACKOFF_INITIAL_MS = "ksql.query.retry.backoff.initial.ms";
    public static final String KSQL_QUERY_RETRY_BACKOFF_INITIAL_MS_DOC = "The initial amount of time to wait before attempting to retry a persistent query in ERROR state.";
    public static final String KSQL_QUERY_RETRY_BACKOFF_MAX_MS = "ksql.query.retry.backoff.max.ms";
    public static final String KSQL_QUERY_RETRY_BACKOFF_MAX_MS_DOC = "The maximum amount of time to wait before attempting to retry a persistent query in ERROR state.";
    // Per-query error queue size. The doc text previously read "displayin"; the missing space
    // is restored.
    public static final String KSQL_QUERY_ERROR_MAX_QUEUE_SIZE = "ksql.query.error.max.queue.size";
    public static final String KSQL_QUERY_ERROR_MAX_QUEUE_SIZE_DOC = "The maximum number of error messages (per query) to hold in the internal query errors queue and display in the query description when executing the `EXPLAIN <query>` command.";
    // Time a restarted query must wait before being reported as running: property name and documentation text.
    public static final String KSQL_QUERY_STATUS_RUNNING_THRESHOLD_SECS = "ksql.query.status.running.threshold.seconds";
    private static final String KSQL_QUERY_STATUS_RUNNING_THRESHOLD_SECS_DOC = "Amount of time in seconds to wait before setting a restarted query status as healthy (or running).";
    // Deny list of properties that users may not override: property name and documentation text.
    public static final String KSQL_PROPERTIES_OVERRIDES_DENYLIST = "ksql.properties.overrides.denylist";
    private static final String KSQL_PROPERTIES_OVERRIDES_DENYLIST_DOC = "Comma-separated list of properties that KSQL users cannot override.";
    // Total Kafka Streams cache limits, declared separately for persistent and transient queries:
    // property name, default (-1) and documentation text. What -1 means is not stated in the doc
    // text; presumably "no limit" -- TODO confirm where the value is consumed.
    public static final String KSQL_TOTAL_CACHE_MAX_BYTES_BUFFERING = "ksql.query.persistent.max.bytes.buffering.total";
    public static final long KSQL_TOTAL_CACHE_MAX_BYTES_BUFFERING_DEFAULT = -1;
    public static final String KSQL_TOTAL_CACHE_MAX_BYTES_BUFFERING_DOC = "Limit on the total bytes used by Kafka Streams cache across all persistent queries";
    public static final String KSQL_TOTAL_CACHE_MAX_BYTES_BUFFERING_TRANSIENT = "ksql.query.transient.max.bytes.buffering.total";
    public static final long KSQL_TOTAL_CACHE_MAX_BYTES_BUFFERING_TRANSIENT_DEFAULT = -1;
    public static final String KSQL_TOTAL_CACHE_MAX_BYTES_BUFFERING_TRANSIENT_DOC = "Limit on the total bytes used by Kafka Streams cache across all transient queries";
    // Variable substitution in SQL statements: property name, default (enabled) and documentation text.
    public static final String KSQL_VARIABLE_SUBSTITUTION_ENABLE = "ksql.variable.substitution.enable";
    public static final boolean KSQL_VARIABLE_SUBSTITUTION_ENABLE_DEFAULT = true;
    public static final String KSQL_VARIABLE_SUBSTITUTION_ENABLE_DOC = "Enable variable substitution on SQL statements.";
    // Query cleanup on shutdown: property name, default (30000 ms) and documentation text.
    public static final String KSQL_QUERY_CLEANUP_SHUTDOWN_TIMEOUT_MS = "ksql.query.cleanup.shutdown.timeout.ms";
    public static final long KSQL_QUERY_CLEANUP_SHUTDOWN_TIMEOUT_MS_DEFAULT = 30000;
    public static final String KSQL_QUERY_CLEANUP_SHUTDOWN_TIMEOUT_MS_DOC = "The total time that the query cleanup spends trying to clean things up on shutdown.";
    // Transient query cleanup service: enable flag (default enabled), initial delay and period.
    public static final String KSQL_TRANSIENT_QUERY_CLEANUP_SERVICE_ENABLE = "ksql.transient.query.cleanup.service.enable";
    public static final boolean KSQL_TRANSIENT_QUERY_CLEANUP_SERVICE_ENABLE_DEFAULT = true;
    public static final String KSQL_TRANSIENT_QUERY_CLEANUP_SERVICE_ENABLE_DOC = "Enable transient query cleanup service.";
    public static final String KSQL_TRANSIENT_QUERY_CLEANUP_SERVICE_INITIAL_DELAY_SECONDS = "ksql.transient.query.cleanup.service.initial.delay.seconds";
    public static final String KSQL_TRANSIENT_QUERY_CLEANUP_SERVICE_INITIAL_DELAY_SECONDS_DOC = "The time to delay the first execution of the transient query cleanup service in seconds.";
    public static final String KSQL_TRANSIENT_QUERY_CLEANUP_SERVICE_PERIOD_SECONDS = "ksql.transient.query.cleanup.service.period.seconds";
    public static final String KSQL_TRANSIENT_QUERY_CLEANUP_SERVICE_PERIOD_SECONDS_DOC = "the period between successive executions of the transient query cleanup service.";
    // Migration of the /query endpoint handler: property name, default (enabled) and documentation text.
    public static final String KSQL_ENDPOINT_MIGRATE_QUERY_CONFIG = "ksql.endpoint.migrate.query";
    private static final boolean KSQL_ENDPOINT_MIGRATE_QUERY_DEFAULT = true;
    private static final String KSQL_ENDPOINT_MIGRATE_QUERY_DOC = "Migrates the /query endpoint to use the same handler as /query-stream.";
    // Default timeouts for ASSERT TOPIC and ASSERT SCHEMA statements (1000 ms each).
    public static final String KSQL_ASSERT_TOPIC_DEFAULT_TIMEOUT_MS = "ksql.assert.topic.default.timeout.ms";
    private static final int KSQL_ASSERT_TOPIC_DEFAULT_TIMEOUT_MS_DEFAULT = 1000;
    private static final String KSQL_ASSERT_TOPIC_DEFAULT_TIMEOUT_MS_DOC = "The default timeout for ASSERT TOPIC statements if no timeout is specified in the statement.";
    public static final String KSQL_ASSERT_SCHEMA_DEFAULT_TIMEOUT_MS = "ksql.assert.schema.default.timeout.ms";
    private static final int KSQL_ASSERT_SCHEMA_DEFAULT_TIMEOUT_MS_DEFAULT = 1000;
    private static final String KSQL_ASSERT_SCHEMA_DEFAULT_TIMEOUT_MS_DOC = "The default timeout for ASSERT SCHEMA statements if no timeout is specified in the statement.";
    // Maximum websocket connection lifetime: property name, default (3600000 ms, i.e. one hour)
    // and documentation text.
    public static final String KSQL_WEBSOCKET_CONNECTION_MAX_TIMEOUT_MS = "ksql.websocket.connection.max.timeout.ms";
    public static final long KSQL_WEBSOCKET_CONNECTION_MAX_TIMEOUT_MS_DEFAULT = 3600000;
    public static final String KSQL_WEBSOCKET_CONNECTION_MAX_TIMEOUT_MS_DOC = "If this config is set to a positive number, then ksqlDB will terminate websocket connections after a timeout. The timeout will be the lower of the auth token's lifespan (if present) and the value of this config. If this config is set to 0, then ksqlDB will not close websockets even if the token has an expiration time.";
    // Streams config values held by this instance; assigned exactly once by each constructor
    // (either built through buildStreamingConfig or handed in already resolved).
    private final Map<String, ConfigValue> ksqlStreamConfigProps;
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) KsqlConfig.class);
    // Key prefixes collected in REPORTER_CONFIGS_PREFIXES; their consumers are outside this section.
    private static final String TELEMETRY_PREFIX = "confluent.telemetry";
    private static final Set<String> REPORTER_CONFIGS_PREFIXES = ImmutableSet.of(TELEMETRY_PREFIX, "metrics.context.");
    // Allowed values for the Connect basic-auth credentials source, and the validator enforcing them.
    public static final String BASIC_AUTH_CREDENTIALS_SOURCE_NONE = "NONE";
    public static final String BASIC_AUTH_CREDENTIALS_SOURCE_FILE = "FILE";
    private static final ConfigDef.ValidString BASIC_AUTH_CREDENTIALS_SOURCE_VALIDATOR = ConfigDef.ValidString.in(new String[]{BASIC_AUTH_CREDENTIALS_SOURCE_NONE, BASIC_AUTH_CREDENTIALS_SOURCE_FILE});

    // Boxed defaults handed to ConfigDef.define(...) in buildConfigDef. Being boxed they are not
    // compile-time constants, so they must stay declared above CURRENT_DEF / LEGACY_DEF, whose
    // initializers read them.
    public static final Long CONNECT_REQUEST_TIMEOUT_DEFAULT = 5000L;
    public static final String KSQL_SECURITY_EXTENSION_DEFAULT = null;
    // MAX_VALUE defaults: presumably "no limit" — confirm against the code that enforces them.
    public static final Long KSQL_QUERY_PULL_MAX_ALLOWED_OFFSET_LAG_DEFAULT = Long.MAX_VALUE;
    public static final Integer KSQL_QUERY_PULL_MAX_QPS_DEFAULT = Integer.MAX_VALUE;
    public static final Integer KSQL_QUERY_PULL_MAX_CONCURRENT_REQUESTS_DEFAULT = Integer.MAX_VALUE;
    public static final Integer KSQL_QUERY_PULL_MAX_HOURLY_BANDWIDTH_MEGABYTES_DEFAULT = Integer.MAX_VALUE;
    public static final Integer KSQL_QUERY_PUSH_V2_MAX_HOURLY_BANDWIDTH_MEGABYTES_DEFAULT = Integer.MAX_VALUE;
    public static final Integer KSQL_QUERY_PULL_THREAD_POOL_SIZE_DEFAULT = 50;
    public static final Integer KSQL_QUERY_PULL_ROUTER_THREAD_POOL_SIZE_DEFAULT = 50;
    public static final Long KSQL_SHUTDOWN_TIMEOUT_MS_DEFAULT = 300000L;
    public static final Long KSQL_AUTH_CACHE_EXPIRY_TIME_SECS_DEFAULT = 30L;
    public static final Long KSQL_AUTH_CACHE_MAX_ENTRIES_DEFAULT = 10000L;
    public static final Boolean KSQL_TIMESTAMP_THROW_ON_INVALID_DEFAULT = false;
    // Built at class-initialisation time: the list of QueryError.Type values is joined into the text.
    public static final String KSQL_ERROR_CLASSIFIER_REGEX_PREFIX_DOC = "Any configuration with the regex prefix will create a new classifier that will be configured to classify anything that matches the content as the specified type. The value must match <TYPE><whitespace><REGEX> (for example ksql.error.classifier.regex.invalid=\"USER .*InvalidTopicException.*\"). The type can be one of " + GrammaticalJoiner.or().join(Arrays.stream(QueryError.Type.values())) + " and the regex pattern will be matched against the error class name and message of any uncaught error and subsequent error causes in the Kafka Streams applications.";
    // Feature-flag defaults.
    public static final Boolean KSQL_CREATE_OR_REPLACE_ENABLED_DEFAULT = true;
    public static final Boolean KSQL_SUPPRESS_ENABLED_DEFAULT = true;
    public static final Boolean KSQL_LAMBDAS_ENABLED_DEFAULT = true;
    public static final Boolean KSQL_HEADERS_COLUMNS_ENABLED_DEFAULT = true;
    private static final Boolean KSQL_JSON_SR_CONVERTER_DESERIALIZER_ENABLED_DEFAULT = true;
    private static final Boolean KSQL_SOURCE_TABLE_MATERIALIZATION_ENABLED_DEFAULT = true;
    public static final Boolean KSQL_SHARED_RUNTIME_ENABLED_DEFAULT = false;
    private static final Boolean KSQL_NEW_QUERY_PLANNER_ENABLED_DEFAULT = false;
    public static final Integer KSQL_SHARED_RUNTIMES_COUNT_DEFAULT = 2;
    // NOTE(review): -1 looks like a "no limit" sentinel — confirm where the buffer size is consumed.
    public static final Long KSQL_SUPPRESS_BUFFER_SIZE_BYTES_DEFAULT = -1L;
    /**
     * Default initial retry backoff: 15 seconds, expressed in milliseconds.
     *
     * <p>Written as a literal on purpose. The value used to be read from Netty's
     * {@code AbstractTrafficShapingHandler.DEFAULT_MAX_TIME}, which merely happens to hold the
     * same number and has nothing to do with query retries.
     */
    public static final Long KSQL_QUERY_RETRY_BACKOFF_INITIAL_MS_DEFAULT = 15000L;
    // Maximum retry backoff: 900000 ms is 15 minutes.
    public static final Long KSQL_QUERY_RETRY_BACKOFF_MAX_MS_DEFAULT = 900000L;
    public static final Integer KSQL_QUERY_ERROR_MAX_QUEUE_SIZE_DEFAULT = 10;
    private static final Integer KSQL_QUERY_STATUS_RUNNING_THRESHOLD_SECS_DEFAULT = 300;
    // Transient query cleanup service timing, in seconds (600 s is 10 minutes).
    public static final Integer KSQL_TRANSIENT_QUERY_CLEANUP_SERVICE_INITIAL_DELAY_SECONDS_DEFAULT = 600;
    public static final Integer KSQL_TRANSIENT_QUERY_CLEANUP_SERVICE_PERIOD_SECONDS_DEFAULT = 600;
    // Configs whose default differs between generations; both are registered through
    // COMPATIBLY_BREAKING_CONFIG_DEFS below rather than in the main define(...) chain.
    public static final String KSQL_STRING_CASE_CONFIG_TOGGLE = "ksql.cast.strings.preserve.nulls";
    public static final String KSQL_STRING_CASE_CONFIG_TOGGLE_DOC = "When casting a SQLType to string, if false, use String.valueof(), else if true useObjects.toString()";
    public static final String KSQL_NESTED_ERROR_HANDLING_CONFIG = "ksql.nested.error.set.null";
    public static final String KSQL_NESTED_ERROR_HANDLING_CONFIG_DOC = "If there is a processing error in an element of a map, struct or array, if true set only the failing element to null and preserve the rest of the value, else if false, set the the entire value to null.";
    // Both entries are BOOLEAN with legacy default false and current default true, and no "since" version.
    public static final Collection<CompatibilityBreakingConfigDef> COMPATIBLY_BREAKING_CONFIG_DEFS = ImmutableList.of(new CompatibilityBreakingConfigDef(KSQL_STRING_CASE_CONFIG_TOGGLE, ConfigDef.Type.BOOLEAN, false, true, ConfigDef.Importance.LOW, Optional.empty(), KSQL_STRING_CASE_CONFIG_TOGGLE_DOC), new CompatibilityBreakingConfigDef(KSQL_NESTED_ERROR_HANDLING_CONFIG, ConfigDef.Type.BOOLEAN, false, true, ConfigDef.Importance.LOW, Optional.empty(), KSQL_NESTED_ERROR_HANDLING_CONFIG_DOC));
    // Streams-level defaults applied per generation by the constructor; "topology.optimization"
    // is "all" for both the legacy and the current generation.
    private static final Collection<CompatibilityBreakingStreamsConfig> COMPATIBILITY_BREAKING_STREAMS_CONFIGS = ImmutableList.of(new CompatibilityBreakingStreamsConfig("topology.optimization", "all", "all"));
    // Static initializers run in textual order: buildConfigDef iterates
    // COMPATIBLY_BREAKING_CONFIG_DEFS and streamTopicConfigNames logs through LOG, so these
    // four fields must stay below the fields they depend on.
    public static final ConfigDef CURRENT_DEF = buildConfigDef(ConfigGeneration.CURRENT);
    public static final ConfigDef LEGACY_DEF = buildConfigDef(ConfigGeneration.LEGACY);
    public static final Set<String> SSL_CONFIG_NAMES = sslConfigNames();
    public static final Set<String> STREAM_TOPIC_CONFIG_NAMES = streamTopicConfigNames();

    /* loaded from: input_file:io/confluent/ksql/util/KsqlConfig$CompatibilityBreakingConfigDef.class */
    /**
     * Definition of a KSQL config whose default value differs between the legacy and the
     * current config generation.
     *
     * <p>The same name, type, validator, importance and documentation are registered for both
     * generations; only the default value passed to {@link ConfigDef#define} changes.
     */
    public static class CompatibilityBreakingConfigDef {
        private final String name;
        private final ConfigDef.Type type;
        private final Object defaultValueLegacy;
        private final Object defaultValueCurrent;
        private final ConfigDef.Importance importance;
        private final String documentation;
        private final Optional<SemanticVersion> since;
        // May be null: the shorter constructor registers the config without a validator.
        private final ConfigDef.Validator validator;

        /**
         * Creates a definition without a validator.
         *
         * @param name                config property name, must not be null
         * @param type                config value type, must not be null
         * @param defaultValueLegacy  default used by the legacy generation (may be null)
         * @param defaultValueCurrent default used by the current generation (may be null)
         * @param importance          config importance, must not be null
         * @param since               value exposed through {@link #since()}, must not be null
         * @param documentation       documentation text, must not be null
         */
        CompatibilityBreakingConfigDef(String name, ConfigDef.Type type, Object defaultValueLegacy, Object defaultValueCurrent, ConfigDef.Importance importance, Optional<SemanticVersion> since, String documentation) {
            this(name, type, defaultValueLegacy, defaultValueCurrent, importance, documentation, since, null);
        }

        /**
         * Creates a definition with an optional validator.
         *
         * <p>Note that {@code documentation} and {@code since} are in the opposite order to the
         * shorter constructor.
         *
         * @param validator validator passed to {@link ConfigDef#define}; may be null
         * @throws NullPointerException if name, type, importance, documentation or since is null
         */
        CompatibilityBreakingConfigDef(String name, ConfigDef.Type type, Object defaultValueLegacy, Object defaultValueCurrent, ConfigDef.Importance importance, String documentation, Optional<SemanticVersion> since, ConfigDef.Validator validator) {
            this.name = Objects.requireNonNull(name, "name");
            // The message is this parameter's own name, not a constant borrowed from another class.
            this.type = Objects.requireNonNull(type, "type");
            this.defaultValueLegacy = defaultValueLegacy;
            this.defaultValueCurrent = defaultValueCurrent;
            this.importance = Objects.requireNonNull(importance, "importance");
            this.documentation = Objects.requireNonNull(documentation, "documentation");
            this.since = Objects.requireNonNull(since, "since");
            this.validator = validator;
        }

        /** Returns the config property name. */
        public String getName() {
            return name;
        }

        /** Returns the {@code since} value supplied at construction. */
        public Optional<SemanticVersion> since() {
            return since;
        }

        /** Returns the default value used by the current generation. */
        public Object getCurrentDefaultValue() {
            return defaultValueCurrent;
        }

        /** Registers this config on {@code configDef} with the given default value. */
        private void define(ConfigDef configDef, Object defaultValue) {
            configDef.define(name, type, defaultValue, validator, importance, documentation);
        }

        /** Registers this config on {@code configDef} using the legacy default. */
        void defineLegacy(ConfigDef configDef) {
            define(configDef, defaultValueLegacy);
        }

        /** Registers this config on {@code configDef} using the current default. */
        void defineCurrent(ConfigDef configDef) {
            define(configDef, defaultValueCurrent);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/util/KsqlConfig$CompatibilityBreakingStreamsConfig.class */
    /**
     * A Kafka Streams config whose default value depends on the config generation.
     *
     * <p>The name is validated against {@code StreamsConfig.configDef()} on construction.
     */
    public static final class CompatibilityBreakingStreamsConfig {
        final String name;
        final Object defaultValueLegacy;
        final Object defaultValueCurrent;

        /**
         * @param str  streams config name; must be non-null and known to {@code StreamsConfig}
         * @param obj  default for the legacy generation
         * @param obj2 default for the current generation
         * @throws NullPointerException     if {@code str} is null
         * @throws IllegalArgumentException if {@code str} is not a streams config name
         */
        CompatibilityBreakingStreamsConfig(String str, Object obj, Object obj2) {
            Objects.requireNonNull(str);
            boolean knownToStreams = StreamsConfig.configDef().names().contains(str);
            if (!knownToStreams) {
                String message = String.format("%s is not a valid streams config", str);
                throw new IllegalArgumentException(message);
            }
            name = str;
            defaultValueLegacy = obj;
            defaultValueCurrent = obj2;
        }

        /** Returns the streams config name. */
        String getName() {
            return name;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/util/KsqlConfig$ConfigGeneration.class */
    /**
     * Selects which set of defaults a config is built with.
     *
     * <p>{@code configDef(...)} maps {@link #CURRENT} to {@code CURRENT_DEF} and anything else
     * to {@code LEGACY_DEF}; {@code buildConfigDef(...)} uses it to choose between
     * {@code defineCurrent} and {@code defineLegacy} for compatibility-breaking configs.
     */
    public enum ConfigGeneration {
        // Defaults as they were before the compatibility-breaking changes.
        LEGACY,
        // Present-day defaults; used by the public KsqlConfig(Map) constructor.
        CURRENT
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/util/KsqlConfig$ConfigValue.class */
    /**
     * Immutable triple of a resolved {@link ConfigItem}, the key it is stored under, and its
     * parsed value.
     */
    public static final class ConfigValue {
        final ConfigItem configItem;
        final String key;
        final Object value;

        /**
         * @param configItem the resolved config item that owns this value
         * @param str        the key the value is stored under
         * @param obj        the value itself
         */
        private ConfigValue(ConfigItem configItem, String str, Object obj) {
            this.key = str;
            this.value = obj;
            this.configItem = configItem;
        }

        /* JADX INFO: Access modifiers changed from: private */
        /** Returns whatever {@code configItem.isResolved()} reports. */
        public boolean isResolved() {
            ConfigItem item = this.configItem;
            return item.isResolved();
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Renders the value through {@code configItem.convertToString(value)}.
         * NOTE(review): presumably that call masks sensitive values — confirm in ConfigItem.
         */
        public String convertToObfuscatedString() {
            Object current = this.value;
            return this.configItem.convertToString(current);
        }
    }

    /**
     * Creates a config from an empty property map, i.e. every setting takes its
     * current-generation default.
     *
     * @return a new {@code KsqlConfig} with no user-supplied properties
     */
    public static KsqlConfig empty() {
        Map<?, ?> noProperties = ImmutableMap.of();
        return new KsqlConfig(noProperties);
    }

    /**
     * Picks the pre-built definition for a generation.
     *
     * @param configGeneration generation to look up
     * @return {@code CURRENT_DEF} for {@code CURRENT}, otherwise {@code LEGACY_DEF}
     */
    private static ConfigDef configDef(ConfigGeneration configGeneration) {
        if (configGeneration == ConfigGeneration.CURRENT) {
            return CURRENT_DEF;
        }
        return LEGACY_DEF;
    }

    private static ConfigDef buildConfigDef(ConfigGeneration configGeneration) {
        ConfigDef withClientSslSupport = new ConfigDef().define(KSQL_SERVICE_ID_CONFIG, ConfigDef.Type.STRING, KSQL_SERVICE_ID_DEFAULT, ConfigDef.Importance.MEDIUM, "Indicates the ID of the ksql service. It will be used as prefix for all implicitly named resources created by this instance in Kafka. By convention, the id should end in a seperator character of some form, e.g. a dash or underscore, as this makes identifiers easier to read.").define(KSQL_TRANSIENT_QUERY_NAME_PREFIX_CONFIG, ConfigDef.Type.STRING, KSQL_TRANSIENT_QUERY_NAME_PREFIX_DEFAULT, ConfigDef.Importance.MEDIUM, "Second part of the prefix for transient queries. For instance if the prefix is transient_ the query name would be ksql_transient_4120896722607083946_1509389010601 where 'ksql_' is the first prefix and '_transient' is the second part of the prefix for the query id the third and 4th parts are a random long value and the current timestamp. ").define(KSQL_OUTPUT_TOPIC_NAME_PREFIX_CONFIG, ConfigDef.Type.STRING, "", ConfigDef.Importance.LOW, KSQL_OUTPUT_TOPIC_NAME_PREFIX_DOCS).define(SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_MS_PROPERTY, ConfigDef.Type.LONG, Long.valueOf(KsqlConstants.defaultSinkWindowChangeLogAdditionalRetention), ConfigDef.Importance.MEDIUM, "The default window change log additional retention time. This is a streams config value which will be added to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift. Default is 1 day").define(SCHEMA_REGISTRY_URL_PROPERTY, ConfigDef.Type.STRING, "", new ConfigDef.NonNullValidator(), ConfigDef.Importance.MEDIUM, "The URL for the schema registry").define(CONNECT_URL_PROPERTY, ConfigDef.Type.STRING, DEFAULT_CONNECT_URL, ConfigDef.Importance.MEDIUM, "The URL for the connect deployment, defaults to http://localhost:8083").define(CONNECT_WORKER_CONFIG_FILE_PROPERTY, ConfigDef.Type.STRING, "", ConfigDef.Importance.LOW, "The path to a connect worker configuration file. 
An empty value for this configurationwill prevent connect from starting up embedded within KSQL. For more information on configuring connect, see https://docs.confluent.io/current/connect/userguide.html#configuring-workers.").define(CONNECT_BASIC_AUTH_CREDENTIALS_SOURCE_PROPERTY, ConfigDef.Type.STRING, BASIC_AUTH_CREDENTIALS_SOURCE_NONE, BASIC_AUTH_CREDENTIALS_SOURCE_VALIDATOR, ConfigDef.Importance.LOW, "If providing explicit basic auth credentials for ksqlDB to use when sending connector requests, this config specifies how credentials should be loaded. Valid options are 'FILE' in order to specify the username and password in a properties file, or 'NONE' to indicate that custom basic auth should not be used. If 'NONE', ksqlDB will forward the auth header, if present, on the incoming ksql request to Connect.").define(CONNECT_BASIC_AUTH_CREDENTIALS_FILE_PROPERTY, ConfigDef.Type.STRING, "", ConfigDef.Importance.LOW, "If 'ksql.connect.basic.auth.credentials.source' is set to 'FILE', then this config specifies the path to the credentials file.").define(CONNECT_BASIC_AUTH_CREDENTIALS_RELOAD_PROPERTY, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.LOW, "If true, basic auth credentials for connector auth will automatically reload on file change (creation or modification). File deletion is not monitored and old credentials will continue to be used in this case.").define(CONNECT_REQUEST_TIMEOUT_MS, ConfigDef.Type.LONG, CONNECT_REQUEST_TIMEOUT_DEFAULT, ConfigDef.Importance.LOW, CONNECT_REQUEST_TIMEOUT_MS_DOC).define(CONNECT_REQUEST_HEADERS_PLUGIN, ConfigDef.Type.CLASS, (Object) null, ConfigDef.Importance.LOW, CONNECT_REQUEST_HEADERS_PLUGIN_DOC).define(KSQL_ENABLE_UDFS, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.MEDIUM, "Whether or not custom UDF jars found in the ext dir should be loaded. Default is true ").define(KSQL_COLLECT_UDF_METRICS, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.LOW, "Whether or not metrics should be collected for custom udfs. 
Default is false. Note: this will add some overhead to udf invocation. It is recommended that this  be set to false in production.").define(KSQL_EXT_DIR, ConfigDef.Type.STRING, DEFAULT_EXT_DIR, ConfigDef.Importance.LOW, "The path to look for and load extensions such as UDFs from.").define(KSQL_INTERNAL_TOPIC_REPLICAS_PROPERTY, ConfigDef.Type.SHORT, (short) 1, ConfigDef.Importance.MEDIUM, "The replication factor for the internal topics of KSQL server.").define(KSQL_INTERNAL_TOPIC_MIN_INSYNC_REPLICAS_PROPERTY, ConfigDef.Type.SHORT, (short) 1, ConfigDef.Importance.MEDIUM, "The minimum number of insync replicas for the internal topics of KSQL server.").define(KSQL_UDF_SECURITY_MANAGER_ENABLED, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.LOW, "Enable the security manager for UDFs. Default is true and will stop UDFs from calling System.exit or executing processes").define(KSQL_INSERT_INTO_VALUES_ENABLED, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.LOW, "Enable the INSERT INTO ... VALUES functionality.").define(KSQL_SECURITY_EXTENSION_CLASS, ConfigDef.Type.CLASS, KSQL_SECURITY_EXTENSION_DEFAULT, ConfigDef.Importance.LOW, KSQL_SECURITY_EXTENSION_DOC).define(KSQL_DEFAULT_KEY_FORMAT_CONFIG, ConfigDef.Type.STRING, KSQL_DEFAULT_KEY_FORMAT_DEFAULT, ConfigDef.Importance.LOW, KSQL_DEFAULT_KEY_FORMAT_DOC).define(KSQL_DEFAULT_VALUE_FORMAT_CONFIG, ConfigDef.Type.STRING, (Object) null, ConfigDef.Importance.LOW, KSQL_DEFAULT_VALUE_FORMAT_DOC).define(KSQL_WRAP_SINGLE_VALUES, ConfigDef.Type.BOOLEAN, (Object) null, ConfigDef.Importance.LOW, "Controls how KSQL will serialize a value whose schema contains only a single column. The setting only sets the default for `CREATE STREAM`, `CREATE TABLE`, `CREATE STREAM AS SELECT`, `CREATE TABLE AS SELECT` and `INSERT INTO` statements, where `WRAP_SINGLE_VALUE` is not provided explicitly in the statement." 
+ System.lineSeparator() + "When set to true, KSQL will persist the single column nested with a STRUCT, for formats that support them. When set to false KSQL will persist the column as the anonymous values." + System.lineSeparator() + "For example, if the value contains only a single column 'FOO INT' and the format is JSON,  and this setting is `false`, then KSQL will persist the value as an unnamed JSON number, e.g. '10'. Where as, if this setting is `true`, KSQL will persist the value as a JSON document with a single numeric property, e.g. '{\"FOO\": 10}." + System.lineSeparator() + "Note: the DELIMITED format ignores this setting as it does not support the concept of a STRUCT, record or object.").define(KSQL_CUSTOM_METRICS_TAGS, ConfigDef.Type.STRING, "", ConfigDef.Importance.LOW, KSQL_CUSTOM_METRICS_TAGS_DOC).define(KSQL_QUERYANONYMIZER_CLUSTER_NAMESPACE, ConfigDef.Type.STRING, (Object) null, ConfigDef.Importance.LOW, KSQL_QUERYANONYMIZER_CLUSTER_NAMESPACE_DOC).define(KSQL_QUERYANONYMIZER_ENABLED, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.LOW, KSQL_QUERYANONYMIZER_ENABLED_DOC).define(KSQL_CUSTOM_METRICS_EXTENSION, ConfigDef.Type.CLASS, (Object) null, ConfigDef.Importance.LOW, KSQL_CUSTOM_METRICS_EXTENSION_DOC).define(KSQL_ENABLE_ACCESS_VALIDATOR, ConfigDef.Type.STRING, KSQL_ACCESS_VALIDATOR_AUTO, ConfigDef.ValidString.in(new String[]{KSQL_ACCESS_VALIDATOR_ON, KSQL_ACCESS_VALIDATOR_OFF, KSQL_ACCESS_VALIDATOR_AUTO}), ConfigDef.Importance.LOW, KSQL_ACCESS_VALIDATOR_DOC).define(METRIC_REPORTER_CLASSES_CONFIG, ConfigDef.Type.LIST, "", ConfigDef.Importance.LOW, METRIC_REPORTER_CLASSES_DOC).define(KSQL_PULL_QUERIES_ENABLE_CONFIG, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.LOW, KSQL_QUERY_PULL_ENABLE_DOC).define(KSQL_QUERY_PULL_ENABLE_STANDBY_READS, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.MEDIUM, KSQL_QUERY_PULL_ENABLE_STANDBY_READS_DOC).define(KSQL_QUERY_PULL_MAX_ALLOWED_OFFSET_LAG_CONFIG, ConfigDef.Type.LONG, 
KSQL_QUERY_PULL_MAX_ALLOWED_OFFSET_LAG_DEFAULT, ConfigValidators.zeroOrPositive(), ConfigDef.Importance.MEDIUM, KSQL_QUERY_PULL_MAX_ALLOWED_OFFSET_LAG_DOC).define(KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG, ConfigDef.Type.STRING, KSQL_PERSISTENT_QUERY_NAME_PREFIX_DEFAULT, ConfigDef.Importance.LOW, KSQL_PERSISTENT_QUERY_NAME_PREFIX_DOC).define(KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_CONFIG, ConfigDef.Type.INT, Integer.MAX_VALUE, ConfigDef.Importance.MEDIUM, KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_DOC).define(KSQL_SHUTDOWN_TIMEOUT_MS_CONFIG, ConfigDef.Type.LONG, KSQL_SHUTDOWN_TIMEOUT_MS_DEFAULT, ConfigDef.Importance.MEDIUM, KSQL_SHUTDOWN_TIMEOUT_MS_DOC).define(KSQL_AUTH_CACHE_EXPIRY_TIME_SECS, ConfigDef.Type.LONG, KSQL_AUTH_CACHE_EXPIRY_TIME_SECS_DEFAULT, ConfigDef.Importance.LOW, KSQL_AUTH_CACHE_EXPIRY_TIME_SECS_DOC).define(KSQL_AUTH_CACHE_MAX_ENTRIES, ConfigDef.Type.LONG, KSQL_AUTH_CACHE_MAX_ENTRIES_DEFAULT, ConfigDef.Importance.LOW, KSQL_AUTH_CACHE_MAX_ENTRIES_DOC).define(KSQL_HIDDEN_TOPICS_CONFIG, ConfigDef.Type.LIST, "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", ConfigValidators.validRegex(), ConfigDef.Importance.LOW, KSQL_HIDDEN_TOPICS_DOC).define(KSQL_READONLY_TOPICS_CONFIG, ConfigDef.Type.LIST, "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", ConfigValidators.validRegex(), ConfigDef.Importance.LOW, KSQL_READONLY_TOPICS_DOC).define(KSQL_TIMESTAMP_THROW_ON_INVALID, ConfigDef.Type.BOOLEAN, KSQL_TIMESTAMP_THROW_ON_INVALID_DEFAULT, ConfigDef.Importance.MEDIUM, KSQL_TIMESTAMP_THROW_ON_INVALID_DOC).define(KSQL_QUERY_PULL_METRICS_ENABLED, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.LOW, KSQL_QUERY_PULL_METRICS_ENABLED_DOC).define(KSQL_QUERY_PULL_MAX_QPS_CONFIG, ConfigDef.Type.INT, KSQL_QUERY_PULL_MAX_QPS_DEFAULT, ConfigDef.Importance.LOW, 
KSQL_QUERY_PULL_MAX_QPS_DOC).define(KSQL_QUERY_PULL_MAX_CONCURRENT_REQUESTS_CONFIG, ConfigDef.Type.INT, KSQL_QUERY_PULL_MAX_CONCURRENT_REQUESTS_DEFAULT, ConfigDef.Importance.LOW, KSQL_QUERY_PULL_MAX_CONCURRENT_REQUESTS_DOC).define(KSQL_QUERY_PULL_MAX_HOURLY_BANDWIDTH_MEGABYTES_CONFIG, ConfigDef.Type.INT, KSQL_QUERY_PULL_MAX_HOURLY_BANDWIDTH_MEGABYTES_DEFAULT, ConfigDef.Importance.HIGH, KSQL_QUERY_PULL_MAX_HOURLY_BANDWIDTH_MEGABYTES_DOC).define(KSQL_QUERY_PUSH_V2_MAX_HOURLY_BANDWIDTH_MEGABYTES_CONFIG, ConfigDef.Type.INT, KSQL_QUERY_PUSH_V2_MAX_HOURLY_BANDWIDTH_MEGABYTES_DEFAULT, ConfigDef.Importance.HIGH, KSQL_QUERY_PUSH_V2_MAX_HOURLY_BANDWIDTH_MEGABYTES_DOC).define(KSQL_QUERY_PULL_THREAD_POOL_SIZE_CONFIG, ConfigDef.Type.INT, KSQL_QUERY_PULL_THREAD_POOL_SIZE_DEFAULT, ConfigDef.Importance.LOW, KSQL_QUERY_PULL_THREAD_POOL_SIZE_DOC).define(KSQL_QUERY_PULL_ROUTER_THREAD_POOL_SIZE_CONFIG, ConfigDef.Type.INT, KSQL_QUERY_PULL_ROUTER_THREAD_POOL_SIZE_DEFAULT, ConfigDef.Importance.LOW, KSQL_QUERY_PULL_ROUTER_THREAD_POOL_SIZE_DOC).define(KSQL_QUERY_PULL_TABLE_SCAN_ENABLED, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.LOW, KSQL_QUERY_PULL_TABLE_SCAN_ENABLED_DOC).define(KSQL_QUERY_STREAM_PULL_QUERY_ENABLED, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.LOW, KSQL_QUERY_STREAM_PULL_QUERY_ENABLED_DOC).define(KSQL_QUERY_PULL_RANGE_SCAN_ENABLED, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.LOW, KSQL_QUERY_PULL_RANGE_SCAN_ENABLED_DOC).define(KSQL_QUERY_PULL_INTERPRETER_ENABLED, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.LOW, KSQL_QUERY_PULL_INTERPRETER_ENABLED_DOC).define(KSQL_QUERY_PULL_LIMIT_CLAUSE_ENABLED, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.LOW, KSQL_QUERY_PULL_LIMIT_CLAUSE_ENABLED_DOC).define(KSQL_QUERY_PUSH_V2_ENABLED, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.LOW, KSQL_QUERY_PUSH_V2_ENABLED_DOC).define(KSQL_QUERY_PUSH_V2_REGISTRY_INSTALLED, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.LOW, 
KSQL_QUERY_PUSH_V2_REGISTRY_INSTALLED_DOC).define(KSQL_QUERY_PUSH_V2_ALOS_ENABLED, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.LOW, KSQL_QUERY_PUSH_V2_ALOS_ENABLED_DOC).define(KSQL_QUERY_PUSH_V2_INTERPRETER_ENABLED, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.LOW, KSQL_QUERY_PUSH_V2_INTERPRETER_ENABLED_DOC).define(KSQL_QUERY_PUSH_V2_NEW_LATEST_DELAY_MS, ConfigDef.Type.LONG, 5000L, ConfigDef.Importance.LOW, KSQL_QUERY_PUSH_V2_NEW_LATEST_DELAY_DOC).define(KSQL_QUERY_PUSH_V2_LATEST_RESET_AGE_MS, ConfigDef.Type.LONG, 30000L, ConfigDef.Importance.LOW, KSQL_QUERY_PUSH_V2_LATEST_RESET_AGE_MS_DOC).define(KSQL_QUERY_PUSH_V2_CONTINUATION_TOKENS_ENABLED, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.LOW, KSQL_QUERY_PUSH_V2_CONTINUATION_TOKENS_ENABLED_DOC).define(KSQL_QUERY_PUSH_V2_MAX_CATCHUP_CONSUMERS, ConfigDef.Type.INT, 5, ConfigDef.Importance.LOW, KSQL_QUERY_PUSH_V2_MAX_CATCHUP_CONSUMERS_DOC).define(KSQL_QUERY_PUSH_V2_CATCHUP_CONSUMER_MSG_WINDOW, ConfigDef.Type.LONG, 50, ConfigDef.Importance.LOW, KSQL_QUERY_PUSH_V2_CATCHUP_CONSUMER_MSG_WINDOW_DOC).define(KSQL_QUERY_PUSH_V2_METRICS_ENABLED, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.LOW, KSQL_QUERY_PUSH_V2_METRICS_ENABLED_DOC).define(KSQL_ERROR_CLASSIFIER_REGEX_PREFIX, ConfigDef.Type.STRING, "", ConfigDef.Importance.LOW, KSQL_ERROR_CLASSIFIER_REGEX_PREFIX_DOC).define(KSQL_CREATE_OR_REPLACE_ENABLED, ConfigDef.Type.BOOLEAN, KSQL_CREATE_OR_REPLACE_ENABLED_DEFAULT, ConfigDef.Importance.LOW, KSQL_CREATE_OR_REPLACE_ENABLED_DOC).define(KSQL_METASTORE_BACKUP_LOCATION, ConfigDef.Type.STRING, "", ConfigDef.Importance.LOW, KSQL_METASTORE_BACKUP_LOCATION_DOC).define(KSQL_SUPPRESS_ENABLED, ConfigDef.Type.BOOLEAN, KSQL_SUPPRESS_ENABLED_DEFAULT, ConfigDef.Importance.LOW, KSQL_SUPPRESS_ENABLED_DOC).define(KSQL_QUERY_RETRY_BACKOFF_INITIAL_MS, ConfigDef.Type.LONG, KSQL_QUERY_RETRY_BACKOFF_INITIAL_MS_DEFAULT, ConfigDef.Importance.LOW, KSQL_QUERY_RETRY_BACKOFF_INITIAL_MS_DOC).define(KSQL_QUERY_RETRY_BACKOFF_MAX_MS, 
ConfigDef.Type.LONG, KSQL_QUERY_RETRY_BACKOFF_MAX_MS_DEFAULT, ConfigDef.Importance.LOW, KSQL_QUERY_RETRY_BACKOFF_MAX_MS_DOC).define(KSQL_QUERY_ERROR_MAX_QUEUE_SIZE, ConfigDef.Type.INT, KSQL_QUERY_ERROR_MAX_QUEUE_SIZE_DEFAULT, ConfigDef.Importance.LOW, KSQL_QUERY_ERROR_MAX_QUEUE_SIZE_DOC).define(KSQL_SUPPRESS_BUFFER_SIZE_BYTES, ConfigDef.Type.LONG, KSQL_SUPPRESS_BUFFER_SIZE_BYTES_DEFAULT, ConfigDef.Importance.LOW, KSQL_SUPPRESS_BUFFER_SIZE_BYTES_DOC).define(KSQL_PROPERTIES_OVERRIDES_DENYLIST, ConfigDef.Type.LIST, "", ConfigDef.Importance.LOW, KSQL_PROPERTIES_OVERRIDES_DENYLIST_DOC).define(KSQL_QUERY_STATUS_RUNNING_THRESHOLD_SECS, ConfigDef.Type.INT, KSQL_QUERY_STATUS_RUNNING_THRESHOLD_SECS_DEFAULT, ConfigDef.Importance.LOW, KSQL_QUERY_STATUS_RUNNING_THRESHOLD_SECS_DOC).define(KSQL_VARIABLE_SUBSTITUTION_ENABLE, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.LOW, KSQL_VARIABLE_SUBSTITUTION_ENABLE_DOC).define(KSQL_TOTAL_CACHE_MAX_BYTES_BUFFERING, ConfigDef.Type.LONG, -1L, ConfigDef.Importance.LOW, KSQL_TOTAL_CACHE_MAX_BYTES_BUFFERING_DOC).define(KSQL_TOTAL_CACHE_MAX_BYTES_BUFFERING_TRANSIENT, ConfigDef.Type.LONG, -1L, ConfigDef.Importance.LOW, KSQL_TOTAL_CACHE_MAX_BYTES_BUFFERING_TRANSIENT_DOC).define(KSQL_LAMBDAS_ENABLED, ConfigDef.Type.BOOLEAN, KSQL_LAMBDAS_ENABLED_DEFAULT, ConfigDef.Importance.LOW, KSQL_LAMBDAS_ENABLED_DOC).define(KSQL_SHARED_RUNTIME_ENABLED, ConfigDef.Type.BOOLEAN, KSQL_SHARED_RUNTIME_ENABLED_DEFAULT, ConfigDef.Importance.MEDIUM, KSQL_SHARED_RUNTIME_ENABLED_DOC).define(KSQL_SHARED_RUNTIMES_COUNT, ConfigDef.Type.INT, KSQL_SHARED_RUNTIMES_COUNT_DEFAULT, ConfigDef.Importance.MEDIUM, KSQL_SHARED_RUNTIMES_COUNT_DOC).define(KSQL_SOURCE_TABLE_MATERIALIZATION_ENABLED, ConfigDef.Type.BOOLEAN, KSQL_SOURCE_TABLE_MATERIALIZATION_ENABLED_DEFAULT, ConfigDef.Importance.LOW, KSQL_SOURCE_TABLE_MATERIALIZATION_ENABLED_DOC).define(KSQL_NEW_QUERY_PLANNER_ENABLED, ConfigDef.Type.BOOLEAN, KSQL_NEW_QUERY_PLANNER_ENABLED_DEFAULT, ConfigDef.Importance.LOW, 
KSQL_NEW_QUERY_PLANNER_ENABLED_DOC).define(KSQL_QUERY_CLEANUP_SHUTDOWN_TIMEOUT_MS, ConfigDef.Type.LONG, 30000L, ConfigDef.Importance.LOW, KSQL_QUERY_CLEANUP_SHUTDOWN_TIMEOUT_MS_DOC).define(KSQL_HEADERS_COLUMNS_ENABLED, ConfigDef.Type.BOOLEAN, KSQL_HEADERS_COLUMNS_ENABLED_DEFAULT, ConfigDef.Importance.LOW, KSQL_HEADERS_COLUMNS_ENABLED_DOC).define(KSQL_ENDPOINT_MIGRATE_QUERY_CONFIG, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.LOW, KSQL_ENDPOINT_MIGRATE_QUERY_DOC).define(KSQL_TRANSIENT_QUERY_CLEANUP_SERVICE_ENABLE, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.LOW, KSQL_TRANSIENT_QUERY_CLEANUP_SERVICE_ENABLE_DOC).define(KSQL_TRANSIENT_QUERY_CLEANUP_SERVICE_INITIAL_DELAY_SECONDS, ConfigDef.Type.INT, KSQL_TRANSIENT_QUERY_CLEANUP_SERVICE_INITIAL_DELAY_SECONDS_DEFAULT, ConfigDef.Importance.LOW, KSQL_TRANSIENT_QUERY_CLEANUP_SERVICE_INITIAL_DELAY_SECONDS_DOC).define(KSQL_TRANSIENT_QUERY_CLEANUP_SERVICE_PERIOD_SECONDS, ConfigDef.Type.INT, KSQL_TRANSIENT_QUERY_CLEANUP_SERVICE_PERIOD_SECONDS_DEFAULT, ConfigDef.Importance.LOW, KSQL_TRANSIENT_QUERY_CLEANUP_SERVICE_PERIOD_SECONDS_DOC).define(KSQL_ASSERT_TOPIC_DEFAULT_TIMEOUT_MS, ConfigDef.Type.INT, Integer.valueOf(HttpClientOptions.DEFAULT_POOL_CLEANER_PERIOD), ConfigDef.Importance.LOW, KSQL_ASSERT_TOPIC_DEFAULT_TIMEOUT_MS_DOC).define(KSQL_ASSERT_SCHEMA_DEFAULT_TIMEOUT_MS, ConfigDef.Type.INT, Integer.valueOf(HttpClientOptions.DEFAULT_POOL_CLEANER_PERIOD), ConfigDef.Importance.LOW, KSQL_ASSERT_SCHEMA_DEFAULT_TIMEOUT_MS_DOC).define(KSQL_WEBSOCKET_CONNECTION_MAX_TIMEOUT_MS, ConfigDef.Type.LONG, Long.valueOf(KSQL_WEBSOCKET_CONNECTION_MAX_TIMEOUT_MS_DEFAULT), ConfigDef.Importance.LOW, KSQL_WEBSOCKET_CONNECTION_MAX_TIMEOUT_MS_DOC).define(KSQL_JSON_SR_CONVERTER_DESERIALIZER_ENABLED, ConfigDef.Type.BOOLEAN, KSQL_JSON_SR_CONVERTER_DESERIALIZER_ENABLED_DEFAULT, ConfigDef.Importance.LOW, KSQL_JSON_SR_CONVERTER_DESERIALIZER_ENABLED_DOC).withClientSslSupport();
        for (CompatibilityBreakingConfigDef compatibilityBreakingConfigDef : COMPATIBLY_BREAKING_CONFIG_DEFS) {
            if (configGeneration == ConfigGeneration.CURRENT) {
                compatibilityBreakingConfigDef.defineCurrent(withClientSslSupport);
            } else {
                compatibilityBreakingConfigDef.defineLegacy(withClientSslSupport);
            }
        }
        return withClientSslSupport;
    }

    /**
     * Returns a copy of the original properties in which entries carrying {@code prefix}
     * override their un-prefixed counterparts.
     *
     * <p>Entries whose key is not prefixed are copied as-is. Entries whose key starts with
     * {@code prefix} (and is longer than it) are then copied with the prefix stripped, replacing
     * any un-prefixed entry of the same name. Two passes are used because the iteration order of
     * the originals is unspecified and the prefixed value must always win.
     *
     * @param prefix the key prefix to strip; must not be null when there is at least one entry
     * @return a new mutable map; changes to it do not affect this config
     * @throws NullPointerException if {@code prefix} or an original key is null
     */
    public Map<String, Object> originalsWithPrefixOverride(String prefix) {
        Map<String, Object> originals = originals();
        Map<String, Object> merged = new HashMap<>();
        // Pass 1: everything that does not carry the prefix.
        for (Map.Entry<String, Object> entry : originals.entrySet()) {
            if (!isKeyPrefixed(entry.getKey(), prefix)) {
                merged.put(entry.getKey(), entry.getValue());
            }
        }
        // Pass 2: prefixed entries, stored under the stripped key so they take precedence.
        for (Map.Entry<String, Object> entry : originals.entrySet()) {
            String key = entry.getKey();
            if (isKeyPrefixed(key, prefix)) {
                merged.put(key.substring(prefix.length()), entry.getValue());
            }
        }
        return merged;
    }

    /**
     * Tells whether {@code str} starts with {@code str2} and has at least one more character
     * after it.
     *
     * @param str  the key to test, must not be null
     * @param str2 the prefix, must not be null
     * @return {@code true} only for a strictly longer key that begins with the prefix
     * @throws NullPointerException if either argument is null
     */
    private boolean isKeyPrefixed(String str, String str2) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        if (!str.startsWith(str2)) {
            return false;
        }
        // A key equal to the bare prefix does not count as prefixed.
        return str.length() > str2.length();
    }

    /**
     * Resolves each entry of {@code props} as a streams config and stores every entry that
     * resolves into {@code resolved}.
     *
     * <p>Entries are stored under {@code ConfigValue.key}, which {@code resolveStreamsConfig}
     * sets to the property name with any streams prefix removed. Entries that do not resolve
     * are skipped. An existing entry with the same key is replaced, so a later call overrides
     * an earlier one.
     *
     * @param props    source properties to resolve
     * @param resolved mutable target map that receives the resolved values
     */
    private static void applyStreamsConfig(Map<String, ?> props, Map<String, ConfigValue> resolved) {
        for (Map.Entry<String, ?> entry : props.entrySet()) {
            resolveStreamsConfig(entry.getKey(), entry.getValue())
                    .ifPresent(configValue -> resolved.put(configValue.key, configValue));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Resolves one property into a {@link ConfigValue}.
     *
     * <p>The stored key is {@code str} with a leading {@code KSQL_STREAMS_PREFIX} removed. If
     * that key starts with {@code KSQL_CONFIG_PROPERTY_PREFIX} the property is ignored.
     * Otherwise the original (still prefixed) name is handed to a new
     * {@code KsqlConfigResolver}, and a resolved item is used to parse {@code obj}.
     *
     * @param str property name, optionally carrying the streams prefix
     * @param obj raw property value
     * @return the resolved value, or empty when the property is ignored or does not resolve
     */
    public static Optional<ConfigValue> resolveStreamsConfig(String str, Object obj) {
        final String key;
        if (str.startsWith(KSQL_STREAMS_PREFIX)) {
            key = str.substring(KSQL_STREAMS_PREFIX.length());
        } else {
            key = str;
        }
        if (key.startsWith(KSQL_CONFIG_PROPERTY_PREFIX)) {
            return Optional.empty();
        }
        return new KsqlConfigResolver()
                .resolve(str, false)
                .map(configItem -> new ConfigValue(configItem, key, configItem.parseValue(obj)));
    }

    /**
     * Runs {@code applyStreamsConfig} over {@code map} and then {@code map2} against one
     * shared mutable map, and returns an immutable copy of the result.
     *
     * @param map  properties applied first
     * @param map2 properties applied second
     * @return immutable snapshot of the accumulated map
     */
    private static Map<String, ConfigValue> buildStreamingConfig(Map<String, ?> map, Map<String, ?> map2) {
        Map<String, ConfigValue> accumulated = new HashMap<>();
        applyStreamsConfig(map, accumulated);
        applyStreamsConfig(map2, accumulated);
        return ImmutableMap.copyOf(accumulated);
    }

    /**
     * Creates a config from the given properties using the current-generation defaults.
     *
     * @param map user-supplied properties
     */
    public KsqlConfig(Map<?, ?> map) {
        // Delegates to the generation-aware constructor with ConfigGeneration.CURRENT.
        this(ConfigGeneration.CURRENT, map);
    }

    /**
     * Creates a config for one generation and derives the streams properties.
     *
     * <p>A set of streams defaults is assembled first (commit interval, cache size, thread
     * count, exception handlers, and the generation-specific entries of
     * {@code COMPATIBILITY_BREAKING_STREAMS_CONFIGS}). Those defaults and the user's originals,
     * with {@code KSQL_STREAMS_PREFIX}-prefixed keys overriding un-prefixed ones, are then
     * passed to {@code buildStreamingConfig}, defaults first.
     *
     * @param configGeneration generation whose defaults apply
     * @param map              user-supplied properties
     */
    private KsqlConfig(ConfigGeneration configGeneration, Map<?, ?> map) {
        super(configDef(configGeneration), map);
        Map<String, Object> streamsDefaults = new HashMap<>();
        streamsDefaults.put("commit.interval.ms", Long.valueOf(KsqlConstants.defaultCommitIntervalMsConfig));
        streamsDefaults.put("cache.max.bytes.buffering", Long.valueOf(KsqlConstants.defaultCacheMaxBytesBufferingConfig));
        streamsDefaults.put("num.stream.threads", 4);

        // Unless the user asked to fail on deserialization errors, log them and carry on.
        boolean failOnDeserializationError = getBooleanConfig(FAIL_ON_DESERIALIZATION_ERROR_CONFIG, false);
        if (!failOnDeserializationError) {
            streamsDefaults.put("default.deserialization.exception.handler", LogMetricAndContinueExceptionHandler.class);
        }

        boolean failOnProductionError = getBooleanConfig(FAIL_ON_PRODUCTION_ERROR_CONFIG, true);
        streamsDefaults.put("default.production.exception.handler", ProductionExceptionHandlerUtil.getHandler(failOnProductionError));

        boolean useCurrentDefaults = configGeneration == ConfigGeneration.CURRENT;
        for (CompatibilityBreakingStreamsConfig streamsConfig : COMPATIBILITY_BREAKING_STREAMS_CONFIGS) {
            Object generationDefault = useCurrentDefaults
                    ? streamsConfig.defaultValueCurrent
                    : streamsConfig.defaultValueLegacy;
            streamsDefaults.put(streamsConfig.name, generationDefault);
        }

        Map<String, Object> userOverrides = originalsWithPrefixOverride(KSQL_STREAMS_PREFIX);
        this.ksqlStreamConfigProps = buildStreamingConfig(streamsDefaults, userOverrides);
    }

    /**
     * Collects the topic config names declared on {@code TopicConfig}.
     *
     * <p>Reflects over the declared fields of {@code TopicConfig}, keeps those of type
     * {@code String} whose field name ends with {@code _CONFIG}, and reads each one
     * statically. A field that cannot be accessed is logged and skipped rather than
     * aborting the whole scan.
     *
     * @return an immutable set of the collected config names
     */
    private static Set<String> streamTopicConfigNames() {
        final ImmutableSet.Builder<String> names = ImmutableSet.builder();
        Arrays.stream(TopicConfig.class.getDeclaredFields())
                .filter(field -> field.getType() == String.class)
                .filter(field -> field.getName().endsWith("_CONFIG"))
                .forEach(field -> {
                    try {
                        // The field type was filtered to String above, so this cast is safe.
                        names.add((String) field.get(null));
                    } catch (IllegalAccessException e) {
                        LOG.warn("Could not get the '{}' config from TopicConfig.class. Setting and listing properties with 'ksql.streams.topics.*' from clients will be disabled.", field.getName(), e);
                    }
                });
        return names.build();
    }

    /**
     * Reads a boolean from the original (unparsed) properties.
     *
     * @param str the property name
     * @param z   the value to return when the property is absent
     * @return {@code z} if the property is not set, otherwise the result of
     *         {@code Boolean.parseBoolean} on the value's string form
     */
    private boolean getBooleanConfig(String str, boolean z) {
        final Object raw = originals().get(str);
        if (raw != null) {
            return Boolean.parseBoolean(raw.toString());
        }
        return z;
    }

    /**
     * Creates a config for the given generation from pre-computed streams config properties.
     *
     * @param configGeneration selects the config definition passed to the superclass
     * @param map              the raw properties passed to the superclass
     * @param map2             the streams config properties; stored as-is, without copying
     */
    private KsqlConfig(ConfigGeneration configGeneration, Map<String, ?> map, Map<String, ConfigValue> map2) {
        super(configDef(configGeneration), map);
        this.ksqlStreamConfigProps = map2;
    }

    /**
     * Returns the streams config properties augmented with metrics-context entries.
     *
     * <p>Starts from {@link #getKsqlStreamConfigProps()}, sets
     * {@code metrics.context.resource.application.id} to {@code str}, and then overlays the
     * entries produced by {@link #addConfluentMetricsContextConfigsKafka(Map)} for an empty
     * input map.
     *
     * @param str the value stored under the application id key
     * @return an unmodifiable view of the combined properties
     */
    public Map<String, Object> getKsqlStreamConfigProps(String str) {
        final Map<String, Object> props = new HashMap<>(getKsqlStreamConfigProps());
        props.put("metrics.context.resource.application.id", str);
        final Map<String, Object> metricsContext = addConfluentMetricsContextConfigsKafka(Collections.emptyMap());
        props.putAll(metricsContext);
        return Collections.unmodifiableMap(props);
    }

    /**
     * Returns the streams config properties as a plain key/value map.
     *
     * @return an unmodifiable view mapping each stored {@code ConfigValue}'s key to its value
     */
    public Map<String, Object> getKsqlStreamConfigProps() {
        final Map<String, Object> props = new HashMap<>();
        this.ksqlStreamConfigProps.values().forEach(entry -> props.put(entry.key, entry.value));
        return Collections.unmodifiableMap(props);
    }

    /**
     * Looks up a single streams config property.
     *
     * @param str the key to look up in the streams config properties
     * @return the stored value, or empty if there is no entry for {@code str} or the stored
     *         value is {@code null}
     */
    public Optional<Object> getKsqlStreamConfigProp(String str) {
        final ConfigValue found = this.ksqlStreamConfigProps.get(str);
        if (found == null) {
            return Optional.empty();
        }
        return Optional.ofNullable(found.value);
    }

    /**
     * Returns the properties to use for an admin client.
     *
     * <p>Contains the streams config entries whose keys are in
     * {@code AdminClientConfig.configNames()}, overlaid with the entries produced by
     * {@link #addConfluentMetricsContextConfigsKafka(Map)} for an empty input map.
     *
     * @return an unmodifiable view of the combined properties
     */
    public Map<String, Object> getKsqlAdminClientConfigProps() {
        final Map<String, Object> adminProps = new HashMap<>(getConfigsFor(AdminClientConfig.configNames()));
        adminProps.putAll(addConfluentMetricsContextConfigsKafka(Collections.emptyMap()));
        return Collections.unmodifiableMap(adminProps);
    }

    /**
     * Returns the properties to use for a producer client.
     *
     * <p>Contains the streams config entries whose keys are in
     * {@code ProducerConfig.configNames()}, overlaid with the entries produced by
     * {@link #addConfluentMetricsContextConfigsKafka(Map)} for an empty input map.
     *
     * @return an unmodifiable view of the combined properties
     */
    public Map<String, Object> getProducerClientConfigProps() {
        final Map<String, Object> producerProps = new HashMap<>(getConfigsFor(ProducerConfig.configNames()));
        producerProps.putAll(addConfluentMetricsContextConfigsKafka(Collections.emptyMap()));
        return Collections.unmodifiableMap(producerProps);
    }

    /**
     * Returns the properties to use for a consumer client.
     *
     * <p>Contains the streams config entries whose keys are in
     * {@code ConsumerConfig.configNames()}, overlaid with the entries produced by
     * {@link #addConfluentMetricsContextConfigsKafka(Map)} for an empty input map.
     *
     * @return an unmodifiable view of the combined properties
     */
    public Map<String, Object> getConsumerClientConfigProps() {
        final Map<String, Object> consumerProps = new HashMap<>(getConfigsFor(ConsumerConfig.configNames()));
        consumerProps.putAll(addConfluentMetricsContextConfigsKafka(Collections.emptyMap()));
        return Collections.unmodifiableMap(consumerProps);
    }

    /**
     * Returns a copy of {@code map} with metrics-context entries added.
     *
     * <p>On top of the copied entries this adds, in order: the streams config entries whose
     * keys start with one of {@code REPORTER_CONFIGS_PREFIXES}, then the version and commit
     * id reported by {@code AppInfoParser.AppInfo}. Later additions overwrite earlier
     * entries with the same key.
     *
     * @param map the base properties; not modified
     * @return a new, mutable map
     */
    public Map<String, Object> addConfluentMetricsContextConfigsKafka(Map<String, Object> map) {
        final Map<String, Object> enriched = new HashMap<>(map);
        final AppInfoParser.AppInfo buildInfo = new AppInfoParser.AppInfo(System.currentTimeMillis());
        final Map<String, Object> reporterConfigs = getConfigsForPrefix(REPORTER_CONFIGS_PREFIXES);
        enriched.putAll(reporterConfigs);
        enriched.put(MetricCollectors.RESOURCE_LABEL_VERSION, buildInfo.getVersion());
        enriched.put(MetricCollectors.RESOURCE_LABEL_COMMIT_ID, buildInfo.getCommitId());
        return enriched;
    }

    /**
     * Returns the streams config entries whose keys are in
     * {@code ProcessingLogConfig.configNames()}.
     *
     * @return an unmodifiable view of the matching properties
     */
    public Map<String, Object> getProcessingLogConfigProps() {
        return getConfigsFor(ProcessingLogConfig.configNames());
    }

    /**
     * Selects the streams config entries whose key is contained in {@code set}.
     *
     * @param set the config names to keep
     * @return an unmodifiable view mapping each matching key to its value
     */
    private Map<String, Object> getConfigsFor(Set<String> set) {
        final Map<String, Object> selected = new HashMap<>();
        for (ConfigValue candidate : this.ksqlStreamConfigProps.values()) {
            if (set.contains(candidate.key)) {
                selected.put(candidate.key, candidate.value);
            }
        }
        return Collections.unmodifiableMap(selected);
    }

    /**
     * Selects the streams config entries whose key starts with any prefix in {@code set}.
     *
     * @param set the accepted key prefixes
     * @return an unmodifiable view mapping each matching key to its value
     */
    private Map<String, Object> getConfigsForPrefix(Set<String> set) {
        final Map<String, Object> selected = new HashMap<>();
        for (ConfigValue candidate : this.ksqlStreamConfigProps.values()) {
            final String key = candidate.key;
            if (set.stream().anyMatch(key::startsWith)) {
                selected.put(candidate.key, candidate.value);
            }
        }
        return Collections.unmodifiableMap(selected);
    }

    /**
     * Returns the original properties that apply to the named function.
     *
     * <p>Collects the originals whose key starts with {@code KSQL_FUNCTIONS_PROPERTY_PREFIX}
     * followed by the lower-cased function name, then adds the originals whose key starts
     * with {@code KSQ_FUNCTIONS_GLOBAL_PROPERTY_PREFIX}. Prefixes are not stripped from the
     * returned keys.
     *
     * @param str the function name; matched case-insensitively by lower-casing it
     * @return the matching properties
     */
    public Map<String, Object> getKsqlFunctionsConfigProps(String str) {
        // Lower-case with a fixed locale: under e.g. a Turkish default locale "I" would map to a
        // dotless "ı" and the prefix would no longer match the configured property names.
        final String functionPrefix = KSQL_FUNCTIONS_PROPERTY_PREFIX + str.toLowerCase(java.util.Locale.ROOT);
        final Map<String, Object> functionProps = originalsWithPrefix(functionPrefix, false);
        functionProps.putAll(originalsWithPrefix(KSQ_FUNCTIONS_GLOBAL_PROPERTY_PREFIX, false));
        return functionProps;
    }

    /**
     * Intended to return the KSQL-level config properties in string form with secrets hidden.
     *
     * <p>Iterates the originals under {@code KSQL_FUNCTIONS_PROPERTY_PREFIX} and the current
     * config definition's names that are not in {@code SSL_CONFIG_NAMES}.
     *
     * <p>NOTE(review): both terminal lambdas below have empty bodies in this source, so as
     * written nothing is ever added to the result and an empty map is returned. The bodies
     * appear to have been lost (this file carries decompiler markers) - restore them from
     * the upstream source rather than guessing how values are obfuscated.
     *
     * @return an unmodifiable view of the collected properties
     */
    private Map<String, String> getKsqlConfigPropsWithSecretsObfuscated() {
        HashMap hashMap = new HashMap();
        originalsWithPrefix(KSQL_FUNCTIONS_PROPERTY_PREFIX, false).forEach((str, obj) -> {
            // NOTE(review): empty body - function properties are not recorded.
        });
        configDef(ConfigGeneration.CURRENT).names().stream().filter(str2 -> {
            // SSL-related config names are excluded from the output.
            return !SSL_CONFIG_NAMES.contains(str2);
        }).forEach(str3 -> {
            // NOTE(review): empty body - the remaining config names are not recorded.
        });
        return Collections.unmodifiableMap(hashMap);
    }

    /**
     * Intended to return the streams config properties in string form with secrets hidden,
     * plus the stream topic config properties.
     *
     * <p>NOTE(review): the terminal lambda over the resolved {@code ConfigValue}s has an
     * empty body in this source, so as written only the entries from
     * {@link #getKsqlStreamTopicConfigProps()} end up in the result. The body appears to
     * have been lost (this file carries decompiler markers) - restore it from the upstream
     * source rather than guessing how values are obfuscated.
     *
     * @return an unmodifiable view of the collected properties
     */
    private Map<String, String> getKsqlStreamConfigPropsWithSecretsObfuscated() {
        HashMap hashMap = new HashMap();
        this.ksqlStreamConfigProps.values().stream().filter(obj -> {
            // Only values that report themselves as resolved are considered.
            return ((ConfigValue) obj).isResolved();
        }).forEach(configValue -> {
            // NOTE(review): empty body - resolved values are not recorded.
        });
        hashMap.putAll(getKsqlStreamTopicConfigProps());
        return Collections.unmodifiableMap(hashMap);
    }

    /**
     * Intended to return the stream topic config properties found in the originals.
     *
     * <p>Takes the originals under {@code KSQL_STREAMS_PREFIX} with the prefix stripped,
     * keeps keys that start with {@code "topic."}, and of those keeps keys whose remainder
     * after {@code "topic."} is in {@code STREAM_TOPIC_CONFIG_NAMES}.
     *
     * <p>NOTE(review): the terminal lambda has an empty body in this source, so as written
     * the matching entries are discarded and an empty map is returned. The body appears to
     * have been lost (this file carries decompiler markers) - restore it from the upstream
     * source.
     *
     * @return an unmodifiable view of the collected properties
     */
    private Map<String, String> getKsqlStreamTopicConfigProps() {
        HashMap hashMap = new HashMap();
        originalsWithPrefix(KSQL_STREAMS_PREFIX, true).entrySet().stream().filter(entry -> {
            return ((String) entry.getKey()).startsWith("topic.");
        }).filter(entry2 -> {
            // Match on the name that follows the "topic." prefix.
            return STREAM_TOPIC_CONFIG_NAMES.contains(((String) entry2.getKey()).substring("topic.".length()));
        }).forEach(entry3 -> {
            // NOTE(review): empty body - matching topic configs are not recorded.
        });
        return Collections.unmodifiableMap(hashMap);
    }

    /**
     * Returns every config property in string form, as produced by the two
     * secrets-obfuscating helpers of this class.
     *
     * <p>The KSQL-level properties are added under their own names; the streams properties
     * are added with {@code KSQL_STREAMS_PREFIX} prepended to each key and overwrite any
     * KSQL-level entry with the same resulting key.
     *
     * @return an unmodifiable view of the combined properties
     */
    public Map<String, String> getAllConfigPropsWithSecretsObfuscated() {
        final Map<String, String> allProps = new HashMap<>(getKsqlConfigPropsWithSecretsObfuscated());
        final Map<String, String> prefixedStreamsProps = getKsqlStreamConfigPropsWithSecretsObfuscated()
                .entrySet()
                .stream()
                .collect(Collectors.toMap(
                        streamsEntry -> KSQL_STREAMS_PREFIX + streamsEntry.getKey(),
                        Map.Entry::getValue));
        allProps.putAll(prefixedStreamsProps);
        return Collections.unmodifiableMap(allProps);
    }

    /**
     * Creates a copy of this config with the given properties applied on top.
     *
     * <p>The new config's originals are this config's originals overlaid with {@code map}.
     * Its streams config properties are rebuilt from the current streams properties with
     * {@code map} applied afterwards. The copy always uses {@code ConfigGeneration.CURRENT}.
     *
     * @param map the overriding properties
     * @return the new config; this instance is not modified
     */
    public KsqlConfig cloneWithPropertyOverwrite(Map<String, ?> map) {
        final Map<String, Object> mergedOriginals = new HashMap<>(originals());
        mergedOriginals.putAll(map);
        final Map<String, ConfigValue> mergedStreamsProps = buildStreamingConfig(getKsqlStreamConfigProps(), map);
        return new KsqlConfig(ConfigGeneration.CURRENT, mergedOriginals, mergedStreamsProps);
    }

    /**
     * Creates a {@code ConfigGeneration.LEGACY} copy of this config in which the
     * compatibility-breaking configs take the values they have under {@code map}.
     *
     * <p>A legacy-generation config is built from {@code map}; for every name in
     * {@code COMPATIBLY_BREAKING_CONFIG_DEFS} the value from that legacy config replaces the
     * entry in a copy of this config's originals.
     *
     * <p>NOTE(review): the second loop, over {@code COMPATIBILITY_BREAKING_STREAMS_CONFIGS},
     * has an empty body in this source, so as written the streams config properties are
     * copied from this instance unchanged. The body appears to have been lost (this file
     * carries decompiler markers); presumably it should carry over the legacy config's
     * streams values the same way the first loop does - confirm against the upstream source.
     *
     * @param map the properties the legacy values are read from
     * @return the new legacy-generation config; this instance is not modified
     */
    public KsqlConfig overrideBreakingConfigsWithOriginalValues(Map<String, ?> map) {
        KsqlConfig ksqlConfig = new KsqlConfig(ConfigGeneration.LEGACY, map);
        HashMap hashMap = new HashMap(originals());
        COMPATIBLY_BREAKING_CONFIG_DEFS.stream().map((v0) -> {
            return v0.getName();
        }).forEach(str -> {
            // Take the value as resolved by the legacy-generation config.
            hashMap.put(str, ksqlConfig.get(str));
        });
        HashMap hashMap2 = new HashMap(this.ksqlStreamConfigProps);
        COMPATIBILITY_BREAKING_STREAMS_CONFIGS.stream().map((v0) -> {
            return v0.getName();
        }).forEach(str2 -> {
            // NOTE(review): empty body - hashMap2 is never updated for these names.
        });
        return new KsqlConfig(ConfigGeneration.LEGACY, hashMap, hashMap2);
    }

    /**
     * Reads a string-valued config and parses it as a map.
     *
     * <p>The configured value is trimmed and then handed to
     * {@link #parseStringAsMap(String, String)}.
     *
     * @param str the config name
     * @return the parsed key/value pairs
     */
    public Map<String, String> getStringAsMap(String str) {
        final String trimmedValue = getString(str).trim();
        return parseStringAsMap(str, trimmedValue);
    }

    /**
     * Reads a string-valued entry from {@code map} and parses it as a map.
     *
     * <p>Unlike the instance overload, the value is passed to
     * {@link #parseStringAsMap(String, String)} without trimming.
     *
     * @param str the config name to look up
     * @param map the properties to read from; the entry, if present, must be a {@code String}
     * @return the parsed key/value pairs, or an empty map if there is no entry for {@code str}
     */
    public static Map<String, String> getStringAsMap(String str, Map<String, ?> map) {
        final String rawValue = (String) map.get(str);
        if (rawValue == null) {
            return Collections.emptyMap();
        }
        return parseStringAsMap(str, rawValue);
    }

    /**
     * Parses a {@code key:value,key:value} string into a map.
     *
     * <p>Entries are separated by {@code ","} and trimmed; each entry is split into key and
     * value on {@code ":"}.
     *
     * @param str  the config name, used only in the error message
     * @param str2 the value to parse; must not be {@code null}
     * @return an empty map if {@code str2} is the empty string, otherwise the parsed pairs
     * @throws KsqlException if the splitter rejects the value with an
     *                       {@code IllegalArgumentException}
     */
    public static Map<String, String> parseStringAsMap(String str, String str2) {
        if (str2.isEmpty()) {
            return Collections.emptyMap();
        }
        try {
            return Splitter.on(",")
                    .trimResults()
                    .withKeyValueSeparator(":")
                    .split(str2);
        } catch (IllegalArgumentException e) {
            final String message =
                    String.format("Invalid config value for '%s'. value: %s. reason: %s", str, str2, e.getMessage());
            throw new KsqlException(message);
        }
    }

    /**
     * Returns the config names that {@code SslConfigs.addClientSslSupport} registers.
     *
     * <p>The names are obtained by applying it to a fresh, empty {@code ConfigDef}.
     *
     * @return the names defined on that {@code ConfigDef}
     */
    private static Set<String> sslConfigNames() {
        final ConfigDef sslOnlyDef = new ConfigDef();
        SslConfigs.addClientSslSupport(sslOnlyDef);
        return sslOnlyDef.names();
    }
}
