package io.confluent.security.audit.provider;

import com.google.protobuf.Message;
import io.confluent.crn.ConfluentCloudCrnAuthority;
import io.confluent.crn.ConfluentServerCrnAuthority;
import io.confluent.crn.CrnAuthorityConfig;
import io.confluent.crn.CrnSyntaxException;
import io.confluent.kafka.multitenant.authorizer.MultiTenantAuditLogConfig;
import io.confluent.kafka.security.audit.event.ConfluentAuthenticationEvent;
import io.confluent.protobuf.events.auditlog.v2.AuditLog;
import io.confluent.security.audit.AuditLogConfig;
import io.confluent.security.audit.AuditLogEntry;
import io.confluent.security.audit.AuditLogUtils;
import io.confluent.security.audit.kafka.AuditExtractorOptions;
import io.confluent.security.audit.kafka.KafkaRequestToAuditEntry;
import io.confluent.security.audit.router.AuditLogRouter;
import io.confluent.security.audit.router.AuditLogRouterJsonConfig;
import io.confluent.security.authorizer.Scope;
import io.confluent.security.authorizer.provider.ConfluentAuthorizationEvent;
import io.confluent.security.authorizer.utils.ThreadUtils;
import io.confluent.telemetry.api.events.Event;
import io.confluent.telemetry.events.EventLogger;
import io.confluent.telemetry.events.EventUtils;
import io.confluent.telemetry.events.Extensions;
import java.time.Duration;
import java.time.ZoneOffset;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.UnaryOperator;
import org.apache.kafka.common.ClusterResource;
import org.apache.kafka.common.ClusterResourceListener;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.WindowedCount;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.DetailedRequestAuditLogFilter;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.audit.AuditEvent;
import org.apache.kafka.server.audit.AuditEventType;
import org.apache.kafka.server.audit.AuditLogProvider;
import org.apache.kafka.server.audit.AuthenticationEvent;
import org.apache.kafka.server.audit.KafkaRequestEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/security/audit/provider/ConfluentAuditLogProvider.class */
public class ConfluentAuditLogProvider implements AuditLogProvider, ClusterResourceListener {
    // Cloud event "type" values attached to the three kinds of audit events emitted by this provider.
    public static final String AUTHORIZATION_MESSAGE_TYPE = "io.confluent.kafka.server/authorization";
    public static final String AUTHENTICATION_MESSAGE_TYPE = "io.confluent.kafka.server/authentication";
    public static final String KAFKA_REQUEST_MESSAGE_TYPE = "io.confluent.kafka.server/request";
    // Name of the SLF4J logger that receives events which cannot be sent through the event logger.
    private static final String FALLBACK_LOGGER = "io.confluent.security.audit.log.fallback";
    // Optional hook installed via setSanitizer(); may be null.
    private UnaryOperator<AuditEvent> sanitizer;
    // Current logger/router/config triple; assigned in configure() only when audit logging is enabled.
    private ConfiguredState configuredState;
    // Single-thread executor created by start() to run the asynchronous initialization task.
    private ExecutorService initExecutor;
    private ConfluentServerCrnAuthority crnAuthority;
    // Set to true by the start() task once the event logger has been configured.
    private volatile boolean eventLoggerReady;
    // Cluster scope derived from the cluster id delivered to onUpdate().
    private Scope scope;
    private boolean omitClientAddress;
    private AuditExtractorOptions auditExtractorOptions;
    private AuditLogMetrics auditLogMetrics;
    protected static final Logger log = LoggerFactory.getLogger((Class<?>) ConfluentAuditLogProvider.class);
    // Upper bound on how long close() waits for the init executor to terminate.
    private static final Duration CLOSE_TIMEOUT = Duration.ofSeconds(30);
    protected final Logger fallbackLog = LoggerFactory.getLogger(FALLBACK_LOGGER);
    // Copy of the configs passed to start(); merged back in by reconfigure() when the router has no bootstrap servers.
    private volatile Map<String, Object> originalInterBrokerListenerConfigs = new HashMap();

    /* loaded from: input_file:io/confluent/security/audit/provider/ConfluentAuditLogProvider$AuditLogMetrics.class */
    /**
     * Holds the sensors used to count audit log activity and exposes one
     * {@code record...} method per sensor. All metrics are registered in the
     * {@link #GROUP_NAME} group of the supplied {@link Metrics} registry.
     */
    public class AuditLogMetrics {
        public static final String GROUP_NAME = "confluent-audit-metrics";
        public static final String AUDIT_LOG_RATE_MINUTE = "audit-log-rate-per-minute";
        public static final String AUDIT_LOG_FALLBACK_RATE_MINUTE = "audit-log-fallback-rate-per-minute";
        public static final String AUTHENTICATION_AUDIT_LOG_RATE = "authentication-audit-log-rate";
        public static final String AUTHENTICATION_AUDIT_LOG_FAILURE_RATE = "authentication-audit-log-failure-rate";
        public static final String AUTHORIZATION_AUDIT_LOG_RATE = "authorization-audit-log-rate";
        public static final String AUTHORIZATION_AUDIT_LOG_FAILURE_RATE = "authorization-audit-log-failure-rate";
        public static final String KAFKA_REQUEST_EVENT_AUDIT_LOG_RATE = "kafka-request-event-audit-log-rate";
        public static final String KAFKA_REQUEST_EVENT_AUDIT_LOG_FAILURE_RATE = "kafka-request-event-audit-log-failure-rate";
        private static final String AUDIT_LOG_NORMAL_SENSOR = "audit-log-normal";
        private static final String AUDIT_LOG_FALLBACK_SENSOR = "audit-log-fallback";
        private static final String AUTHENTICATION_AUDIT_LOG_SENSOR = "authentication-audit-log";
        private static final String AUTHENTICATION_AUDIT_LOG_FAILURE_SENSOR = "authentication-audit-log-failure";
        private static final String AUTHORIZATION_AUDIT_LOG_SENSOR = "authorization-audit-log";
        private static final String AUTHORIZATION_AUDIT_LOG_FAILURE_SENSOR = "authorization-audit-log-failure";
        private static final String KAFKA_REQUEST_EVENT_AUDIT_LOG_SENSOR = "kafka-request-event-audit-log";
        private static final String KAFKA_REQUEST_EVENT_AUDIT_LOG_FAILURE_SENSOR = "kafka-request-event-audit-log-failure";

        private Sensor normalAuditSensor;
        private Sensor fallbackAuditSensor;
        private Sensor authenticationAuditLogSensor;
        private Sensor authenticationAuditLogFailureSensor;
        private Sensor authorizationAuditLogSensor;
        private Sensor authorizationAuditLogFailureSensor;
        // One sensor per API key registered by addKafkaRequestEventMetrics().
        private Map<ApiKeys, Sensor> kafkaRequestEventAuditLogSensor = new EnumMap<>(ApiKeys.class);
        private Sensor kafkaRequestEventAuditLogFailureSensor;
        // Only populated by the Time-based constructor; stays null otherwise.
        private Time time;
        private Metrics metrics;

        /** Registers the audit sensors on an existing metrics registry. */
        AuditLogMetrics(Metrics metrics) {
            this.metrics = metrics;
            setupMetrics();
        }

        /** Creates a fresh metrics registry driven by {@code time} and registers the audit sensors on it. */
        AuditLogMetrics(Time time) {
            this.time = time;
            this.metrics = new Metrics(time);
            setupMetrics();
        }

        void recordNormalAuditLogMetrics() {
            this.normalAuditSensor.record();
        }

        void recordFallbackAuditLogMetrics() {
            this.fallbackAuditSensor.record();
        }

        void recordAuthenticationAuditLogMetrics() {
            this.authenticationAuditLogSensor.record();
        }

        void recordAuthenticationAuditLogFailureMetrics() {
            this.authenticationAuditLogFailureSensor.record();
        }

        void recordAuthorizationAuditLogMetrics() {
            this.authorizationAuditLogSensor.record();
        }

        void recordAuthorizationAuditLogFailureMetrics() {
            this.authorizationAuditLogFailureSensor.record();
        }

        /** Records {@code d} on the sensor registered for {@code apiKeys}. */
        void recordKafkaRequestEventAuditLogMetrics(ApiKeys apiKeys, double d) {
            Sensor perApiSensor = this.kafkaRequestEventAuditLogSensor.get(apiKeys);
            perApiSensor.record(d);
        }

        void recordKafkaRequestAuditLogFailureMetrics(double d) {
            this.kafkaRequestEventAuditLogFailureSensor.record(d);
        }

        Metrics metrics() {
            return this.metrics;
        }

        Time metricsTime() {
            return this.time;
        }

        /** Creates every fixed sensor, then the per-API-key Kafka request sensors. */
        void setupMetrics() {
            this.normalAuditSensor = createRateSensor(
                    AUDIT_LOG_NORMAL_SENSOR,
                    AUDIT_LOG_RATE_MINUTE,
                    "The number of audit log per minute",
                    new Rate(TimeUnit.MINUTES, new WindowedCount()));
            this.fallbackAuditSensor = createRateSensor(
                    AUDIT_LOG_FALLBACK_SENSOR,
                    AUDIT_LOG_FALLBACK_RATE_MINUTE,
                    "The number of audit log fallback per minute",
                    new Rate(TimeUnit.MINUTES, new WindowedCount()));
            this.authenticationAuditLogSensor = createRateSensor(
                    AUTHENTICATION_AUDIT_LOG_SENSOR,
                    AUTHENTICATION_AUDIT_LOG_RATE,
                    "The number of authentication audit log per second",
                    new Rate());
            this.authenticationAuditLogFailureSensor = createRateSensor(
                    AUTHENTICATION_AUDIT_LOG_FAILURE_SENSOR,
                    AUTHENTICATION_AUDIT_LOG_FAILURE_RATE,
                    "The number of authentication audit log failure per second",
                    new Rate());
            this.authorizationAuditLogSensor = createRateSensor(
                    AUTHORIZATION_AUDIT_LOG_SENSOR,
                    AUTHORIZATION_AUDIT_LOG_RATE,
                    "The number of authorization audit log per second",
                    new Rate());
            this.authorizationAuditLogFailureSensor = createRateSensor(
                    AUTHORIZATION_AUDIT_LOG_FAILURE_SENSOR,
                    AUTHORIZATION_AUDIT_LOG_FAILURE_RATE,
                    "The number of authorization audit log failure per second",
                    new Rate());
            this.kafkaRequestEventAuditLogFailureSensor = createRateSensor(
                    KAFKA_REQUEST_EVENT_AUDIT_LOG_FAILURE_SENSOR,
                    KAFKA_REQUEST_EVENT_AUDIT_LOG_FAILURE_RATE,
                    "The number of kafka request event audit log failure per second",
                    new Rate());
            addKafkaRequestEventMetrics();
        }

        /** Registers one sensor per API key found in the two supported-API sets of the request filter. */
        void addKafkaRequestEventMetrics() {
            EnumSet<ApiKeys> supportedApis = EnumSet.copyOf((EnumSet<ApiKeys>) DetailedRequestAuditLogFilter.SUPPORTED_APIS_MGMT_OPERATIONS);
            supportedApis.addAll(DetailedRequestAuditLogFilter.SUPPORTED_APIS_PRODUCE_CONSUME);
            for (ApiKeys apiKey : supportedApis) {
                Sensor perApiSensor = this.metrics.sensor(KAFKA_REQUEST_EVENT_AUDIT_LOG_SENSOR + "-" + apiKey.name());
                this.kafkaRequestEventAuditLogSensor.put(apiKey, perApiSensor);
                perApiSensor.add(
                        this.metrics.metricName(
                                KAFKA_REQUEST_EVENT_AUDIT_LOG_RATE,
                                GROUP_NAME,
                                "The number of kafka request event audit log per second",
                                "api-key",
                                apiKey.name()),
                        new Rate());
            }
        }

        // Obtains the named sensor and attaches a single rate metric in GROUP_NAME to it.
        private Sensor createRateSensor(String sensorName, String metricName, String description, Rate rate) {
            Sensor sensor = this.metrics.sensor(sensorName);
            sensor.add(this.metrics.metricName(metricName, GROUP_NAME, description), rate);
            return sensor;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/security/audit/provider/ConfluentAuditLogProvider$ConfiguredState.class */
    /**
     * Immutable bundle of the event logger, the router and the audit log
     * configuration that are swapped together as one unit.
     */
    public class ConfiguredState {
        final EventLogger logger;
        final AuditLogRouter router;
        final AuditLogConfig config;

        private ConfiguredState(EventLogger logger, AuditLogRouter router, AuditLogConfig config) {
            this.config = config;
            this.router = router;
            this.logger = logger;
        }
    }

    /**
     * Stores a Kafka cluster scope built from the id of the supplied cluster resource.
     *
     * @param clusterResource cluster metadata whose cluster id is used
     */
    @Override // org.apache.kafka.common.ClusterResourceListener
    public void onUpdate(ClusterResource clusterResource) {
        String clusterId = clusterResource.clusterId();
        this.scope = Scope.kafkaClusterScope(clusterId);
    }

    /**
     * Builds the initial provider state from {@code map}. Nothing is set up
     * when the audit logger is not enabled in the configuration. The event
     * logger created here is not configured yet, so the provider is marked
     * as not ready.
     *
     * @param map raw configuration values
     */
    @Override // org.apache.kafka.common.Configurable
    public void configure(Map<String, ?> map) {
        AuditLogConfig auditLogConfig = new AuditLogConfig(map);
        boolean auditEnabled = auditLogConfig.getBoolean(ConfluentConfigs.AUDIT_LOGGER_ENABLE_CONFIG).booleanValue();
        if (!auditEnabled) {
            return;
        }

        EventLogger eventLogger = new EventLogger();
        AuditLogRouter router = new AuditLogRouter(
                auditLogConfig.routerJsonConfig(),
                auditLogConfig.getInt(AuditLogConfig.ROUTER_CACHE_ENTRIES_CONFIG).intValue());
        this.configuredState = new ConfiguredState(eventLogger, router, auditLogConfig);

        // "CP" selects the server authority; any other authority type uses the cloud one.
        CrnAuthorityConfig crnAuthorityConfig = new CrnAuthorityConfig(map);
        boolean serverAuthority = crnAuthorityConfig.getString(CrnAuthorityConfig.CRN_AUTHORITY_TYPE_CONFIG).equals("CP");
        this.crnAuthority = serverAuthority ? new ConfluentServerCrnAuthority() : new ConfluentCloudCrnAuthority();
        this.crnAuthority.configure(crnAuthorityConfig.values());

        // Client addresses are only omitted in multi-tenant mode, and only when IP logging is switched off.
        MultiTenantAuditLogConfig multiTenantAuditLogConfig = new MultiTenantAuditLogConfig(map);
        this.omitClientAddress =
                multiTenantAuditLogConfig.getBoolean("confluent.security.event.logger.multitenant.enable").booleanValue()
                        && !multiTenantAuditLogConfig.getBoolean(MultiTenantAuditLogConfig.LOG_CLIENT_IP_ADDRESS).booleanValue();

        this.auditExtractorOptions = new AuditExtractorOptions(this.crnAuthority, this.omitClientAddress);
        this.eventLoggerReady = false;
    }

    /**
     * @return a new mutable set holding the single dynamically reconfigurable key,
     *         the audit event router config
     */
    @Override // org.apache.kafka.common.Reconfigurable
    public Set<String> reconfigurableConfigs() {
        Set<String> configNames = new HashSet<>();
        configNames.add(ConfluentConfigs.AUDIT_EVENT_ROUTER_CONFIG);
        return configNames;
    }

    /**
     * Validates a proposed configuration by constructing an {@link AuditLogConfig}
     * from it and resolving its router JSON config; the result is discarded.
     *
     * @param map proposed configuration values
     * @throws ConfigException declared by the reconfiguration contract
     */
    @Override // org.apache.kafka.common.Reconfigurable
    public void validateReconfiguration(Map<String, ?> map) throws ConfigException {
        AuditLogConfig candidate = new AuditLogConfig(map);
        candidate.routerJsonConfig();
    }

    /**
     * Replaces the current state with one backed by a newly created and
     * configured event logger, then quietly closes the logger of the previous
     * state, if there was one.
     *
     * @param map configuration passed to the new event logger
     * @param auditLogRouter router stored in the new state
     * @param auditLogConfig config stored in the new state
     */
    private void updateConfiguredState(Map<String, Object> map, AuditLogRouter auditLogRouter, AuditLogConfig auditLogConfig) {
        ConfiguredState previousState = this.configuredState;
        EventLogger previousLogger = null;
        if (previousState != null) {
            previousLogger = previousState.logger;
        }

        // The replacement is fully configured before it becomes visible through the field.
        EventLogger replacementLogger = new EventLogger();
        replacementLogger.configure(map);
        this.configuredState = new ConfiguredState(replacementLogger, auditLogRouter, auditLogConfig);

        if (previousLogger != null) {
            Utils.closeQuietly(previousLogger, "eventLogger");
        }
    }

    /**
     * Applies a new configuration by building a fresh router and event logger
     * and swapping them in.
     *
     * <p>When the router config names no bootstrap servers, the configs
     * captured by {@code start} are layered on top of the new values before
     * the event logger config is derived.
     *
     * @param map new configuration values
     */
    @Override // org.apache.kafka.common.Reconfigurable
    public void reconfigure(Map<String, ?> map) {
        AuditLogConfig auditLogConfig = new AuditLogConfig(map);
        // Resolve the router config once and reuse it for both the check and the router.
        AuditLogRouterJsonConfig routerJsonConfig = auditLogConfig.routerJsonConfig();
        Map<String, Object> mergedConfigs = new HashMap<>(map);
        if (routerJsonConfig.bootstrapServers() == null) {
            mergedConfigs.putAll(this.originalInterBrokerListenerConfigs);
        }
        updateConfiguredState(
                AuditLogConfig.toEventLoggerConfig(mergedConfigs),
                new AuditLogRouter(routerJsonConfig, auditLogConfig.getInt(AuditLogConfig.ROUTER_CACHE_ENTRIES_CONFIG).intValue()),
                auditLogConfig);
    }

    /**
     * Starts the provider asynchronously on a dedicated single-thread executor.
     * The task merges {@code map} with the current audit log config values,
     * installs a configured event logger and marks the provider as ready.
     *
     * @param map configs to use for the event logger; a copy is kept for later reconfiguration
     * @return a stage that completes normally once initialization succeeded, or
     *         exceptionally with whatever the initialization task threw; the
     *         executor is shut down when the stage completes either way
     */
    @Override // org.apache.kafka.server.audit.AuditLogProvider
    public CompletionStage<Void> start(Map<String, ?> map) {
        this.initExecutor = Executors.newSingleThreadScheduledExecutor(ThreadUtils.createThreadFactory("audit-init-%d", true));
        this.originalInterBrokerListenerConfigs = new HashMap<>(map);
        CompletableFuture<Void> startupFuture = new CompletableFuture<>();

        Runnable initTask = () -> {
            try {
                Map<String, Object> mergedConfigs = new HashMap<>(map);
                mergedConfigs.putAll(this.configuredState.config.values());
                updateConfiguredState(
                        AuditLogConfig.toEventLoggerConfig(mergedConfigs),
                        this.configuredState.router,
                        this.configuredState.config);
                this.eventLoggerReady = true;
                startupFuture.complete(null);
                log.info("ConfluentAuditLogProvider startup is completed");
            } catch (Throwable failure) {
                log.error("Audit log provider could not be started", failure);
                startupFuture.completeExceptionally(failure);
            }
        };
        this.initExecutor.submit(initTask);

        return startupFuture.whenComplete((ignoredResult, ignoredFailure) -> this.initExecutor.shutdownNow());
    }

    /**
     * Dispatches an audit event to the handler matching its type. Events of
     * any other type are reported through the error log and dropped.
     *
     * @param auditEvent the event to log
     */
    @Override // org.apache.kafka.server.audit.AuditLogProvider
    public void logEvent(AuditEvent auditEvent) {
        if (auditEvent.type() == AuditEventType.AUTHORIZATION) {
            logAuthorization((ConfluentAuthorizationEvent) auditEvent);
        } else if (auditEvent.type() == AuditEventType.AUTHENTICATION) {
            logAuthentication((AuthenticationEvent) auditEvent);
        } else if (auditEvent.type() == AuditEventType.KAFKA_REQUEST) {
            logKafkaRequestEvent((KafkaRequestEvent) auditEvent);
        } else {
            log.error("Unknown event received {}", auditEvent);
        }
    }

    /**
     * @return always {@code true}
     */
    @Override // org.apache.kafka.server.audit.AuditLogProvider
    public boolean usesMetadataFromThisKafkaCluster() {
        return true;
    }

    /**
     * Reports whether the audit logger is enabled in the given configuration.
     *
     * @param map raw configuration values
     * @return the value of the audit logger enable setting
     */
    @Override // org.apache.kafka.server.audit.AuditLogProvider
    public boolean providerConfigured(Map<String, ?> map) {
        AuditLogConfig auditLogConfig = new AuditLogConfig(map);
        Boolean enabled = auditLogConfig.getBoolean(ConfluentConfigs.AUDIT_LOGGER_ENABLE_CONFIG);
        return enabled.booleanValue();
    }

    /**
     * Installs the operator applied to authorization and authentication events
     * before they are logged.
     *
     * @param unaryOperator the sanitizer to use; stored as given
     */
    @Override // org.apache.kafka.server.audit.AuditLogProvider
    public void setSanitizer(UnaryOperator<AuditEvent> unaryOperator) {
        this.sanitizer = unaryOperator;
    }

    /**
     * Creates the audit log sensors on the supplied metrics registry.
     *
     * @param metrics registry that will hold the audit metrics
     */
    @Override // org.apache.kafka.server.audit.AuditLogProvider
    public void setMetrics(Metrics metrics) {
        this.auditLogMetrics = new AuditLogMetrics(metrics);
    }

    /**
     * Logs an authorization event if the router and the event's action say it
     * should be logged. The route is resolved from the unsanitized event; the
     * logged payload is rebuilt from the sanitized event when a sanitizer is
     * installed. Failures are logged and counted instead of being propagated.
     *
     * @param confluentAuthorizationEvent the authorization event to log
     */
    private void logAuthorization(ConfluentAuthorizationEvent confluentAuthorizationEvent) {
        final ConfiguredState state = this.configuredState;
        if (!shouldLog(confluentAuthorizationEvent, state)) {
            return;
        }
        try {
            KafkaPrincipal principal = confluentAuthorizationEvent.requestContext().principal();
            AuditLogEntry entry = AuditLogUtils.authorizationEvent(confluentAuthorizationEvent, principal, this.crnAuthority, this.omitClientAddress);
            Optional<String> route = state.router.topic(entry);
            if (isSuppressed(route)) {
                return;
            }

            ConfluentAuthorizationEvent eventToLog = confluentAuthorizationEvent;
            if (this.sanitizer != null) {
                eventToLog = (ConfluentAuthorizationEvent) this.sanitizer.apply(confluentAuthorizationEvent);
                if (eventToLog == null) {
                    // The sanitizer rejected the event entirely.
                    return;
                }
                entry = AuditLogUtils.authorizationEvent(eventToLog, principal, this.crnAuthority, this.omitClientAddress);
            }

            Event cloudEvent = event(entry.getServiceName(), entry.getResourceName(), entry, state, eventToLog, AUTHORIZATION_MESSAGE_TYPE);
            logEventToRoute(state, route, cloudEvent, !this.eventLoggerReady);
            this.auditLogMetrics.recordAuthorizationAuditLogMetrics();
        } catch (CrnSyntaxException crnFailure) {
            log.error("Couldn't create cloud event due to internally generated CRN syntax problem", crnFailure);
            this.auditLogMetrics.recordAuthorizationAuditLogFailureMetrics();
        } catch (Exception failure) {
            log.error("Error occurred while handling authorization event", failure);
            this.auditLogMetrics.recordAuthorizationAuditLogFailureMetrics();
        }
    }

    /**
     * Tells whether a resolved route means "do not log".
     *
     * @param optional the route returned by the router
     * @return {@code true} only when a route is present and is the empty string
     */
    private boolean isSuppressed(Optional<String> optional) {
        if (!optional.isPresent()) {
            return false;
        }
        return optional.get().isEmpty();
    }

    /**
     * Sends an event to its route, or writes it to the fallback log when that is not possible.
     *
     * @param configuredState state providing the event logger
     * @param optional the route for the event
     * @param event the event to send
     * @param z {@code true} when the event logger is not ready yet, which forces the fallback log
     */
    private void logEventToRoute(ConfiguredState configuredState, Optional<String> optional, Event event, boolean z) {
        if (z) {
            this.fallbackLog.info("Event logger is not ready for audit event {}", EventUtils.toJson(event));
            this.auditLogMetrics.recordFallbackAuditLogMetrics();
            return;
        }
        if (!optional.isPresent()) {
            // No metric is recorded on this path.
            this.fallbackLog.error("Route is not present for audit event {}", EventUtils.toJson(event));
            return;
        }

        event.setExtension(Extensions.ROUTE, optional.get());
        if (!configuredState.logger.ready(event)) {
            this.fallbackLog.info("Route is not ready for audit event {}", EventUtils.toJson(event));
            this.auditLogMetrics.recordFallbackAuditLogMetrics();
            return;
        }
        configuredState.logger.log(event);
        this.auditLogMetrics.recordNormalAuditLogMetrics();
    }

    /**
     * Builds the event wrapping an audit message.
     *
     * @param str value used as the event source
     * @param str2 value used as the event subject
     * @param message payload, serialized according to the configured content type
     * @param configuredState state whose config selects the content type
     * @param auditEvent supplies the event id (its uuid) and time (its timestamp, at UTC)
     * @param str3 value used as the event type
     * @return the populated event
     */
    private Event event(String str, String str2, Message message, ConfiguredState configuredState, AuditEvent auditEvent, String str3) {
        final String contentType = dataContentType(configuredState);
        return new Event()
                .setId(auditEvent.uuid().toString())
                .setTime(auditEvent.timestamp().atOffset(ZoneOffset.UTC))
                .setSource(str)
                .setSubject(str2)
                .setType(str3)
                .setData(contentType, EventUtils.protoToBytes(message, contentType));
    }

    /**
     * Maps the configured cloud event codec to the content type used for event payloads.
     *
     * @param configuredState state whose config holds the codec setting
     * @return {@code "application/json"} for the {@code "structured"} codec,
     *         {@code "application/protobuf"} for the {@code "binary"} codec
     * @throws RuntimeException if the configured codec is neither of the two
     */
    private String dataContentType(ConfiguredState configuredState) {
        String codec = configuredState.config.getString("confluent.security.event.logger.cloudevent.codec");
        // Direct string switch replaces the decompiled hashCode dispatch, which was not valid Java.
        switch (codec) {
            case "structured":
                return "application/json";
            case "binary":
                return "application/protobuf";
            default:
                throw new RuntimeException("unknown encoding " + codec);
        }
    }

    /**
     * Decides whether an authorization event is to be logged.
     *
     * @param confluentAuthorizationEvent the event under consideration
     * @param configuredState state providing the router
     * @return {@code false} when the router cannot route the event; otherwise the
     *         action's allow/deny logging flag matching the result, or
     *         {@code true} for any other result
     */
    private boolean shouldLog(ConfluentAuthorizationEvent confluentAuthorizationEvent, ConfiguredState configuredState) {
        boolean routable = configuredState.router.isEventRoutable(confluentAuthorizationEvent);
        if (!routable) {
            return false;
        }
        boolean decision;
        switch (confluentAuthorizationEvent.authorizeResult()) {
            case ALLOWED:
                decision = confluentAuthorizationEvent.action().logIfAllowed();
                break;
            case DENIED:
                decision = confluentAuthorizationEvent.action().logIfDenied();
                break;
            default:
                decision = true;
                break;
        }
        return decision;
    }

    /**
     * Logs an authentication event. The route is resolved from the unsanitized
     * event; the logged payload is rebuilt from the sanitized event when a
     * sanitizer is installed. The event id and time are taken from the original
     * {@code authenticationEvent}. Failures are counted and logged instead of
     * being propagated.
     *
     * @param authenticationEvent the authentication event to log
     */
    private void logAuthentication(AuthenticationEvent authenticationEvent) {
        try {
            ConfluentAuthenticationEvent wrappedEvent = new ConfluentAuthenticationEvent(authenticationEvent, this.scope);
            // The principal may be absent; null is passed on in that case.
            KafkaPrincipal principal = wrappedEvent.principal().orElse(null);
            AuditLogEntry entry = AuditLogUtils.authenticationEvent(wrappedEvent, principal, this.crnAuthority, this.omitClientAddress);

            final ConfiguredState state = this.configuredState;
            Optional<String> route = state.router.topic(entry);
            if (isSuppressed(route)) {
                return;
            }

            if (this.sanitizer != null) {
                ConfluentAuthenticationEvent sanitizedEvent = (ConfluentAuthenticationEvent) this.sanitizer.apply(wrappedEvent);
                if (sanitizedEvent == null) {
                    // The sanitizer rejected the event entirely.
                    return;
                }
                entry = AuditLogUtils.authenticationEvent(sanitizedEvent, principal, this.crnAuthority, this.omitClientAddress);
            }

            Event cloudEvent = event(entry.getServiceName(), entry.getResourceName(), entry, state, authenticationEvent, AUTHENTICATION_MESSAGE_TYPE);
            logEventToRoute(state, route, cloudEvent, !this.eventLoggerReady);
            this.auditLogMetrics.recordAuthenticationAuditLogMetrics();
        } catch (Exception failure) {
            this.auditLogMetrics.recordAuthenticationAuditLogFailureMetrics();
            log.error("Error occurred while handling authentication event : {}", authenticationEvent, failure);
        }
    }

    /**
     * Extracts the audit log entries of a Kafka request event and sends each one
     * to the default v2 topic route. Successes are counted per API key and
     * failures are counted separately; exceptions are logged, not propagated.
     *
     * @param kafkaRequestEvent the request event to log
     */
    private void logKafkaRequestEvent(KafkaRequestEvent kafkaRequestEvent) {
        try {
            // NOTE(review): nothing is logged unless a sanitizer is set, yet the sanitizer
            // is never applied in this method - confirm this gating is intentional.
            if (this.sanitizer == null) {
                return;
            }
            List<AuditLog> auditLogs = KafkaRequestToAuditEntry.extractAuditLog(kafkaRequestEvent, this.auditExtractorOptions);
            final ConfiguredState state = this.configuredState;
            int loggedCount = 0;
            int failedCount = 0;
            for (AuditLog auditLog : auditLogs) {
                try {
                    Event cloudEvent = event(auditLog.getServiceName(), auditLog.getResourceName(), auditLog, state, kafkaRequestEvent, KAFKA_REQUEST_MESSAGE_TYPE);
                    logEventToRoute(state, AuditLogRouterJsonConfig.DEFAULT_V2_TOPIC_ROUTE, cloudEvent, !this.eventLoggerReady);
                    loggedCount++;
                } catch (Exception entryFailure) {
                    failedCount++;
                    log.error("Error occurred while handling Kafka management event : {}", auditLog, entryFailure);
                }
            }
            if (loggedCount != 0) {
                ApiKeys apiKey = ApiKeys.forId(kafkaRequestEvent.requestContext().requestType());
                this.auditLogMetrics.recordKafkaRequestEventAuditLogMetrics(apiKey, loggedCount);
            }
            if (failedCount != 0) {
                this.auditLogMetrics.recordKafkaRequestAuditLogFailureMetrics(failedCount);
            }
        } catch (Exception failure) {
            this.auditLogMetrics.recordKafkaRequestAuditLogFailureMetrics(1.0d);
            log.error("Error occurred while handling Kafka management events : {}", kafkaRequestEvent, failure);
        }
    }

    /**
     * Closes the provider; the argument is not used.
     *
     * @param str ignored
     * @throws Exception declared by the provider contract
     */
    @Override // org.apache.kafka.server.audit.AuditLogProvider
    public void close(String str) throws Exception {
        this.close();
    }

    /**
     * Shuts down the initialization executor, waiting up to
     * {@code CLOSE_TIMEOUT} for it to terminate, and quietly closes the event
     * logger.
     *
     * <p>The event logger is closed even when the wait is interrupted, and the
     * method is safe to call on a provider that was never configured.
     *
     * @throws InterruptException if the calling thread is interrupted while waiting for the executor
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        try {
            if (this.initExecutor != null) {
                this.initExecutor.shutdownNow();
                try {
                    this.initExecutor.awaitTermination(CLOSE_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    log.debug("ConfluentAuditLogProvider was interrupted while waiting to close");
                    throw new InterruptException(e);
                }
            }
        } finally {
            // Runs on the interrupt path too, so the logger is never leaked.
            ConfiguredState state = this.configuredState;
            if (state != null) {
                Utils.closeQuietly(state.logger, "eventLogger");
            }
        }
    }

    /**
     * @return the executor created by {@code start}, or {@code null} if {@code start} has not been called
     */
    public ExecutorService initExecutor() {
        return this.initExecutor;
    }

    /**
     * @return the event logger of the current configured state
     */
    public EventLogger getEventLogger() {
        ConfiguredState state = this.configuredState;
        return state.logger;
    }

    /**
     * @return {@code true} once the initialization task of {@code start} has installed the event logger
     */
    public boolean isEventLoggerReady() {
        return this.eventLoggerReady;
    }

    /**
     * @return the metrics registry backing the audit log sensors
     */
    public Metrics metrics() {
        AuditLogMetrics holder = this.auditLogMetrics;
        return holder.metrics();
    }

    /**
     * Creates the audit log sensors on a new metrics registry driven by the given clock.
     *
     * @param time clock handed to the new metrics registry
     */
    protected void setupMetrics(Time time) {
        this.auditLogMetrics = new AuditLogMetrics(time);
    }

    /**
     * @return the clock held by the audit log metrics; {@code null} when they were created from an existing registry
     */
    protected Time metricsTime() {
        AuditLogMetrics holder = this.auditLogMetrics;
        return holder.metricsTime();
    }
}
