Class OdysseyIntegrationManager

java.lang.Object
io.confluent.kafka.odyssey.OdysseyIntegrationManager

public final class OdysseyIntegrationManager extends Object
Manages the integration between Kafka topic creation and Odyssey (Schema Registry) associations for K1.

This class orchestrates the 4-step topic creation flow:

  1. Resolve Schema Registry endpoint via Resource Manager
  2. Validate schemas with Odyssey (async, non-blocking)
  3. Create topics in Kafka (controller thread pool)
  4. Create associations with Odyssey (async, non-blocking)

Schema Registry endpoint resolution is performed dynamically via LsrcLookupService, which queries Resource Manager's RRI (Resource Relation Index) for the LKC→LSRC endpoint mapping. If the lookup fails or returns no mapping, the operation fails; there is no fallback mode.
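The four steps above can be pictured as a chain of CompletableFuture stages. The following is only a minimal sketch under stated assumptions: the class name, the helper methods (resolveEndpoint, validateSchemas, createTopics, createAssociations), the in-memory endpoint map, and the example endpoint URL are hypothetical stand-ins, not the actual implementation or the LsrcLookupService API.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class TopicCreationFlowSketch {

    // Hypothetical stand-in for the LKC -> LSRC mapping that Resource Manager would hold.
    static final Map<String, String> ENDPOINTS =
        Map.of("lkc-abc", "https://lsrc-123.example.invalid");

    // Step 1: resolve the endpoint. A missing mapping fails the flow; there is no fallback.
    static CompletableFuture<String> resolveEndpoint(String namespace) {
        String endpoint = ENDPOINTS.get(namespace);
        if (endpoint == null) {
            return CompletableFuture.failedFuture(
                new IllegalStateException("No SR endpoint mapping for " + namespace));
        }
        return CompletableFuture.completedFuture(endpoint);
    }

    // Step 2: validate schemas asynchronously (this sketch accepts every topic).
    static CompletableFuture<List<String>> validateSchemas(String endpoint, List<String> topics) {
        return CompletableFuture.supplyAsync(() -> topics);
    }

    // Step 3: create the topics (this sketch reports every topic as created).
    static CompletableFuture<List<String>> createTopics(List<String> topics) {
        return CompletableFuture.completedFuture(topics);
    }

    // Step 4: create associations asynchronously for the created topics.
    static CompletableFuture<List<String>> createAssociations(String endpoint, List<String> created) {
        return CompletableFuture.supplyAsync(() -> created);
    }

    // Chains the four steps; a failure in any stage propagates to the returned future.
    static CompletableFuture<List<String>> run(String namespace, List<String> topics) {
        return resolveEndpoint(namespace).thenCompose(endpoint ->
            validateSchemas(endpoint, topics)
                .thenCompose(TopicCreationFlowSketch::createTopics)
                .thenCompose(created -> createAssociations(endpoint, created)));
    }
}
```

In this sketch, a namespace with no mapping yields a future that completes exceptionally before any validation or topic creation is attempted, which mirrors the "no fallback mode" behavior described above.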

  • Constructor Details

    • OdysseyIntegrationManager

      public OdysseyIntegrationManager(OdysseyRequestManager odysseyRequestManager, TopicIdLookup topicIdLookup, String clusterId, org.apache.kafka.common.utils.LogContext logContext, LsrcLookupService lsrcLookupService)
      Creates a new OdysseyIntegrationManager with dynamic SR endpoint resolution.

      This manager will query Resource Manager's RRI (Resource Relation Index) to dynamically resolve Schema Registry endpoints for each namespace (LKC). If the lookup fails or returns no mapping, the operation fails.

      Parameters:
      odysseyRequestManager - The manager for Odyssey/Schema Registry requests
      topicIdLookup - Function to look up topic IDs (typically from metadata cache)
      clusterId - The controller cluster ID, used as the namespace when none can be taken from the request principal
      logContext - The log context for creating loggers
      lsrcLookupService - Service for dynamic SR endpoint resolution (required)
      Throws:
      NullPointerException - if any required argument is null
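A fail-fast null check of this kind is commonly written with Objects.requireNonNull. The sketch below is an assumption about how such a constructor might look, not the actual source: the class name is hypothetical, the collaborator types are replaced by Object so the example stands alone, the logContext parameter is omitted, and treating every argument as required is also an assumption (the page marks only lsrcLookupService as required).

```java
import java.util.Objects;

public class IntegrationManagerCtorSketch {
    private final Object odysseyRequestManager; // stand-in for OdysseyRequestManager
    private final Object topicIdLookup;         // stand-in for TopicIdLookup
    private final String clusterId;
    private final Object lsrcLookupService;     // stand-in for LsrcLookupService

    public IntegrationManagerCtorSketch(Object odysseyRequestManager,
                                        Object topicIdLookup,
                                        String clusterId,
                                        Object lsrcLookupService) {
        // Each check throws NullPointerException naming the offending argument.
        this.odysseyRequestManager = Objects.requireNonNull(odysseyRequestManager, "odysseyRequestManager");
        this.topicIdLookup = Objects.requireNonNull(topicIdLookup, "topicIdLookup");
        this.clusterId = Objects.requireNonNull(clusterId, "clusterId");
        this.lsrcLookupService = Objects.requireNonNull(lsrcLookupService, "lsrcLookupService");
    }

    public String clusterId() {
        return clusterId;
    }
}
```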
  • Method Details

    • handleCreateTopicsWithOdyssey

      public CompletableFuture<org.apache.kafka.common.message.CreateTopicsResponseData> handleCreateTopicsWithOdyssey(org.apache.kafka.common.requests.CreateTopicsRequest request, String namespace, Function<org.apache.kafka.common.message.CreateTopicsRequestData, CompletableFuture<org.apache.kafka.common.message.CreateTopicsResponseData>> topicCreator)
      Handles CreateTopics requests with Odyssey integration for K1.

      Flow:

      1. Resolve Schema Registry endpoint (via RM)
      2. Classify topics: with vs without Odyssey configs
      3. Validate only topics with Odyssey configs
      4. Create topics: those without Odyssey configs plus those that passed validation (topics that failed validation are skipped)
      5. Create associations for created topics with Odyssey configs
      6. Merge all results into final response
      Parameters:
      request - The CreateTopics request (K1 type: CreateTopicsRequest)
      namespace - The namespace (LKC cluster ID) for Odyssey
      topicCreator - Function to create topics in Kafka
      Returns:
      A CompletableFuture that completes with the merged CreateTopics response
      Throws:
      NullPointerException - if request or topicCreator is null
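The classify, validate, create, and merge steps in the flow above can be illustrated with a small synchronous sketch. Everything here is hypothetical: the class name, the two predicates standing in for "has Odyssey configs" and "passed validation", and the outcome strings are illustrative only, and the real method works asynchronously on CreateTopicsRequestData rather than on topic names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class CreateTopicsClassificationSketch {

    /** Returns a map of topic name to outcome, merging skipped and created topics. */
    static Map<String, String> process(List<String> topics,
                                       Predicate<String> hasOdysseyConfig,
                                       Predicate<String> passesValidation) {
        Map<String, String> results = new LinkedHashMap<>();
        List<String> toCreate = new ArrayList<>();

        for (String topic : topics) {
            if (!hasOdysseyConfig.test(topic)) {
                toCreate.add(topic);                      // no Odyssey configs: no validation needed
            } else if (passesValidation.test(topic)) {
                toCreate.add(topic);                      // Odyssey configs and validation passed
            } else {
                results.put(topic, "VALIDATION_FAILED");  // skipped: never sent for creation
            }
        }

        for (String topic : toCreate) {
            // Associations are created only for created topics that carry Odyssey configs.
            results.put(topic, hasOdysseyConfig.test(topic) ? "CREATED_WITH_ASSOCIATION" : "CREATED");
        }
        return results;
    }
}
```

The point of the sketch is that a topic failing validation still appears in the merged result, but it is never passed to the topic creator.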
    • extractNamespace

      public String extractNamespace(org.apache.kafka.common.security.auth.KafkaPrincipal principal)
      Extracts the namespace from the request principal (K1-specific). If the principal is a MultiTenantPrincipal, its LKC cluster ID is used; otherwise the method falls back to clusterId.
      Parameters:
      principal - The request principal (typically KafkaPrincipal or MultiTenantPrincipal)
      Returns:
      The namespace string (LKC cluster ID for multi-tenant, controller clusterId otherwise)
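The fallback described for extractNamespace amounts to a type check on the principal. The sketch below uses hypothetical stand-in types (SimplePrincipal and TenantPrincipal) in place of KafkaPrincipal and MultiTenantPrincipal so that it stands alone; the field names are assumptions and do not reflect the real classes' accessors.

```java
public class ExtractNamespaceSketch {

    // Hypothetical stand-in for KafkaPrincipal.
    static class SimplePrincipal {
        final String name;

        SimplePrincipal(String name) {
            this.name = name;
        }
    }

    // Hypothetical stand-in for MultiTenantPrincipal, carrying the tenant's LKC cluster ID.
    static class TenantPrincipal extends SimplePrincipal {
        final String tenantClusterId;

        TenantPrincipal(String name, String tenantClusterId) {
            super(name);
            this.tenantClusterId = tenantClusterId;
        }
    }

    static String extractNamespace(SimplePrincipal principal, String clusterId) {
        if (principal instanceof TenantPrincipal) {
            return ((TenantPrincipal) principal).tenantClusterId; // multi-tenant: use the LKC cluster ID
        }
        // Any other principal (including null, in this sketch) falls back to the controller clusterId.
        return clusterId;
    }
}
```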