package io.confluent.kafka.link;

import io.confluent.kafka.link.ClusterLinkConfig;
import io.confluent.kafka.multitenant.MultiTenantPrincipal;
import io.confluent.kafka.multitenant.TenantMetadata;
import io.confluent.kafka.multitenant.metrics.TenantMetrics;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import kafka.server.link.ClusterLinkConfigDefaults;
import kafka.server.link.ClusterLinkManager;
import org.apache.kafka.clients.ClientInterceptor;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.CorrelationIdMismatchException;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.ResponseHeader;
import org.apache.kafka.server.link.ClusterLinkMetricsUtils;

/* loaded from: input_file:io/confluent/kafka/link/ClusterLinkInterceptor.class */
/**
 * {@link ClientInterceptor} for cluster-link client connections.
 *
 * <p>The interceptor:
 * <ul>
 *   <li>rejects any request/response whose API key is not accepted by
 *       {@code ClusterLinkApis.isApiAllowed};</li>
 *   <li>serializes requests and parses responses using the {@link LinkContext} built in
 *       {@link #configure(Map)};</li>
 *   <li>when configured as multi-tenant, refuses connections that
 *       {@code ClusterLinkUtils.isInternalNetworkOrPort} flags, and records request/response
 *       metrics through {@link TenantMetrics}.</li>
 * </ul>
 *
 * <p>{@link #configure(Map)} must be called before requests are sent; {@link #configureMetrics(Metrics)}
 * supplies the registry handed to {@link TenantMetrics} in multi-tenant mode.
 */
public class ClusterLinkInterceptor implements ClientInterceptor, Configurable {
    public static final String DEST_METRICS_GROUP = "cluster-link-dest-tenant-metrics";
    public static final String SOURCE_METRICS_GROUP = "cluster-link-source-tenant-metrics";
    private final TenantMetrics tenantMetrics = new TenantMetrics();
    private String metricsGroup;
    private Metrics metrics;
    private LinkContext linkContext;
    /** Only assigned when the interceptor is configured as multi-tenant. */
    private MultiTenantPrincipal principal;
    private boolean isMultiTenant;

    /**
     * Metrics request context that reports under a caller-supplied metrics group and tags every
     * metric with the tenant name and the cluster link name.
     */
    public static class ClusterLinkClientMetricsContext extends TenantMetrics.MetricsRequestContext {
        private static final String TENANT_TAG = "tenant";

        private final String linkName;
        private final String metricsGroup;

        /**
         * @param linkName     cluster link name, used as the link-name tag value
         * @param metricsGroup metrics group returned by {@link #metricsGroup()}
         * @param principal    tenant principal, forwarded to the superclass
         * @param clientId     client id, forwarded to the superclass
         * @param apiKey       API key of the request, forwarded to the superclass
         */
        public ClusterLinkClientMetricsContext(String linkName, String metricsGroup, MultiTenantPrincipal principal, String clientId, ApiKeys apiKey) {
            super(principal, clientId, apiKey);
            this.linkName = linkName;
            this.metricsGroup = metricsGroup;
        }

        /** Returns the metrics group supplied at construction. */
        @Override
        public String metricsGroup() {
            return this.metricsGroup;
        }

        /**
         * Returns a fresh, mutable map holding the tenant-name tag and the link-name tag.
         */
        @Override
        public Map<String, String> metricTags() {
            Map<String, String> tags = new HashMap<>();
            tags.put(TENANT_TAG, principal().tenantMetadata().tenantName);
            tags.put(ClusterLinkMetricsUtils.LINK_NAME_TAG, this.linkName);
            return tags;
        }

        /**
         * Returns the sensor suffix in the form {@code :tenant-<tenantName>:<linkTag>-<linkName>}.
         */
        @Override
        public String sensorSuffix() {
            return String.format(":%s-%s:%s-%s", TENANT_TAG, principal().tenantMetadata().tenantName, ClusterLinkMetricsUtils.LINK_NAME_TAG, this.linkName);
        }
    }

    /**
     * Reads the tenant prefix, multi-tenant flag, link name, metrics mode, cluster-link prefix and
     * consumer-group-prefix flag from {@code configs} and builds the {@link LinkContext}.
     *
     * <p>In multi-tenant mode the tenant prefix must be non-empty and end with {@code "_"}; the
     * tenant name is the prefix with that trailing delimiter removed. In single-tenant mode a
     * missing tenant prefix is treated as the empty string. The metrics group is
     * {@link #SOURCE_METRICS_GROUP} only when the metrics mode is {@code LinkMode.SOURCE};
     * any other value (including absent) selects {@link #DEST_METRICS_GROUP}.
     *
     * @param configs configuration values keyed by property name
     * @throws ConfigException if the multi-tenant tenant prefix is missing/empty or lacks the
     *                         delimiter, or if the link name is missing/empty
     * @throws ClassCastException if a value has an unexpected type (values are cast directly)
     */
    @Override
    public void configure(Map<String, ?> configs) {
        String tenantPrefix = (String) configs.get(ClusterLinkManager.LocalTenantPrefixProp());
        Boolean multiTenantFlag = (Boolean) configs.get(ClusterLinkManager.IsMultiTenantProp());
        this.isMultiTenant = multiTenantFlag != null && multiTenantFlag;

        if (this.isMultiTenant) {
            if (tenantPrefix == null || tenantPrefix.isEmpty()) {
                throw new ConfigException("ClusterLinkInterceptor is multi-tenant and should be configured with a valid tenant prefix");
            }
            if (!tenantPrefix.endsWith("_")) {
                throw new ConfigException("Tenant prefix does not contain delimiter");
            }
            // Tenant name is the prefix without its trailing "_" delimiter.
            String tenantName = tenantPrefix.substring(0, tenantPrefix.length() - 1);
            this.principal = new MultiTenantPrincipal("unused", new TenantMetadata.Builder(tenantName, "unused").build());
        } else if (tenantPrefix == null) {
            tenantPrefix = "";
        }

        String linkName = (String) configs.get(ClusterLinkManager.LinkNameProp());
        if (linkName == null || linkName.isEmpty()) {
            throw new ConfigException("ClusterLinkInterceptor is not configured with a valid link name");
        }

        ClusterLinkConfig.LinkMode metricsMode = (ClusterLinkConfig.LinkMode) configs.get(ClusterLinkManager.LinkMetricsModeProp());
        this.metricsGroup = metricsMode == ClusterLinkConfig.LinkMode.SOURCE ? SOURCE_METRICS_GROUP : DEST_METRICS_GROUP;

        String clusterLinkPrefix = (String) configs.get(kafka.server.link.ClusterLinkConfig.ClusterLinkPrefixProp());
        this.linkContext = new LinkContext(
                tenantPrefix,
                clusterLinkPrefix == null ? "" : clusterLinkPrefix,
                linkName,
                prefixConsumerGroupWithClusterLinkPrefix(configs));
    }

    /**
     * Returns the configured consumer-group-prefix-enable flag, or the value from
     * {@link ClusterLinkConfigDefaults} when the property is absent.
     */
    private Boolean prefixConsumerGroupWithClusterLinkPrefix(Map<String, ?> configs) {
        Boolean configured = (Boolean) configs.get(kafka.server.link.ClusterLinkConfig.ConsumerGroupPrefixEnableProp());
        if (configured != null) {
            return configured;
        }
        return ClusterLinkConfigDefaults.ConsumerGroupPrefixEnableDefault();
    }

    /** Stores the metrics registry that is later passed to {@link TenantMetrics}. */
    @Override
    public void configureMetrics(Metrics metrics) {
        this.metrics = metrics;
    }

    /**
     * Rejects the connection when this interceptor is multi-tenant and the address is flagged by
     * {@code ClusterLinkUtils.isInternalNetworkOrPort}. Single-tenant interceptors allow every
     * address.
     *
     * @param inetSocketAddress remote address about to be connected to
     * @throws IOException if the connection is not allowed
     */
    @Override
    public void ensureConnectionAllowed(InetSocketAddress inetSocketAddress) throws IOException {
        if (this.isMultiTenant && kafka.server.link.ClusterLinkUtils.isInternalNetworkOrPort(inetSocketAddress)) {
            // getAddress() is null for unresolved addresses; fall back to the host string so the
            // caller gets the intended IOException rather than a NullPointerException.
            String host = inetSocketAddress.isUnresolved()
                    ? inetSocketAddress.getHostString()
                    : inetSocketAddress.getAddress().getHostAddress();
            throw new IOException(String.format("Connection to %s:%d not allowed from Confluent Cloud brokers", host, inetSocketAddress.getPort()));
        }
    }

    /**
     * Serializes {@code abstractRequest} with the link context and, in multi-tenant mode, records
     * request metrics.
     *
     * @param requestHeader   header of the outgoing request
     * @param abstractRequest request to serialize
     * @param timestamp       value passed through unchanged to {@code TenantMetrics.recordRequest};
     *                        presumably the current time — units are not visible here, confirm
     *                        against {@code ClientInterceptor}
     * @return the serialized request
     * @throws IllegalStateException if the request's API is not allowed on cluster links
     */
    @Override
    public Send toSend(RequestHeader requestHeader, AbstractRequest abstractRequest, long timestamp) {
        ensureApiAllowed(requestHeader.apiKey());
        Send send = abstractRequest.toSend(requestHeader, this.linkContext);
        if (this.isMultiTenant) {
            // NOTE(review): the extra 4 bytes presumably account for the size prefix — confirm.
            this.tenantMetrics.recordRequest(this.metrics, metricsRequestContext(requestHeader), send.size() + 4, timestamp);
        }
        return send;
    }

    /**
     * Parses a response from {@code byteBuffer}, verifying its correlation id against the request
     * header. In multi-tenant mode, response metrics are recorded before the response header is
     * parsed, so they are recorded even if the correlation id check subsequently fails.
     *
     * @param byteBuffer    buffer positioned at the response header
     * @param requestHeader header of the request this response answers
     * @param startTimeMs   start of the interval, in milliseconds
     * @param endTimeMs     end of the interval, in milliseconds; the recorded latency is
     *                      {@code max(0, endTimeMs - startTimeMs)} converted to nanoseconds
     * @return the parsed response
     * @throws IllegalStateException          if the request's API is not allowed on cluster links
     * @throws CorrelationIdMismatchException if the response correlation id differs from the request's
     */
    @Override
    public AbstractResponse parseResponse(ByteBuffer byteBuffer, RequestHeader requestHeader, long startTimeMs, long endTimeMs) {
        ApiKeys apiKey = requestHeader.apiKey();
        ensureApiAllowed(apiKey);
        if (this.isMultiTenant) {
            long latencyNanos = TimeUnit.MILLISECONDS.toNanos(Math.max(0L, endTimeMs - startTimeMs));
            // remaining() is sampled before the header is parsed from this buffer.
            // NOTE(review): the extra 4 bytes presumably account for the size prefix — confirm.
            this.tenantMetrics.recordResponse(this.metrics, metricsRequestContext(requestHeader), 4 + byteBuffer.remaining(), latencyNanos, Collections.emptyMap(), endTimeMs);
        }
        short apiVersion = requestHeader.apiVersion();
        ResponseHeader responseHeader = ResponseHeader.parse(byteBuffer, apiKey.responseHeaderVersion(apiVersion));
        if (requestHeader.correlationId() != responseHeader.correlationId()) {
            throw new CorrelationIdMismatchException("Correlation id for response (" + responseHeader.correlationId() + ") does not match request (" + requestHeader.correlationId() + "), request requestHeader: " + requestHeader, requestHeader.correlationId(), responseHeader.correlationId());
        }
        return AbstractResponse.parseResponse(apiKey, byteBuffer, apiVersion, this.linkContext);
    }

    /** Throws {@link IllegalStateException} unless {@code apiKey} is allowed on cluster links. */
    private static void ensureApiAllowed(ApiKeys apiKey) {
        if (!ClusterLinkApis.isApiAllowed(apiKey)) {
            throw new IllegalStateException("Request " + apiKey + " is not allowed on cluster links");
        }
    }

    /**
     * Builds the per-request metrics context. Only called in multi-tenant mode, where
     * {@link #principal} has been assigned by {@link #configure(Map)}.
     */
    private TenantMetrics.MetricsRequestContext metricsRequestContext(RequestHeader requestHeader) {
        return new ClusterLinkClientMetricsContext(this.linkContext.linkName(), this.metricsGroup, this.principal, requestHeader.clientId(), requestHeader.apiKey());
    }
}
