package org.apache.kafka.streams.kstream.internals;

import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.internals.graph.GlobalStoreNode;
import org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.kstream.internals.graph.StateStoreNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamSourceNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.TableSourceNode;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.class */
/**
 * Collects the logical graph of {@link StreamsGraphNode}s created through the DSL entry points
 * below and, on {@link #buildAndOptimizeTopology(Properties)}, writes that graph into the wrapped
 * {@link InternalTopologyBuilder}.
 *
 * <p>Every node attached through {@code addGraphNode} receives a monotonically increasing build
 * priority; the graph is later written out in that priority order. When the supplied properties
 * request {@link StreamsConfig#OPTIMIZE}, the optimizable repartition nodes found below a
 * key-changing operation are collapsed into a single repartition node before the graph is written.
 *
 * <p>NOTE(review): raw uses of project types ({@code KStreamImpl}, {@code KTableSource}, ...) are
 * kept as they were because the generic declarations of those types are not visible from this
 * file; only {@code java.util} collections have been given type arguments.
 */
public class InternalStreamsBuilder implements InternalNameProvider {
    private static final String TOPOLOGY_ROOT = "root";
    private static final Logger LOG = LoggerFactory.getLogger(InternalStreamsBuilder.class);

    final InternalTopologyBuilder internalTopologyBuilder;

    /** Shared counter behind both {@link #newProcessorName(String)} and {@link #newStoreName(String)}. */
    private final AtomicInteger index = new AtomicInteger(0);

    /** Source of the build priority handed to each node as it is attached to the graph. */
    private final AtomicInteger buildPriorityIndex = new AtomicInteger(0);

    /** Key-changing nodes mapped to the optimizable repartition nodes registered beneath them. */
    private final LinkedHashMap<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode>> keyChangingOperationsToOptimizableRepartitionNodes = new LinkedHashMap<>();

    /** Nodes that reported {@code isMergeNode()} when they were attached. */
    private final LinkedHashSet<StreamsGraphNode> mergeNodes = new LinkedHashSet<>();

    /** Root of the graph; it is only an anchor for its children and writes nothing itself. */
    protected final StreamsGraphNode root = new StreamsGraphNode(TOPOLOGY_ROOT, false) {
        @Override
        public void writeToTopology(InternalTopologyBuilder internalTopologyBuilder) {
            // intentionally empty: the root has no processor of its own
        }
    };

    /**
     * @param internalTopologyBuilder the builder that graph nodes are written into by
     *                                {@link #buildAndOptimizeTopology(Properties)}
     */
    public InternalStreamsBuilder(InternalTopologyBuilder internalTopologyBuilder) {
        this.internalTopologyBuilder = internalTopologyBuilder;
    }

    /**
     * Adds a stream source node reading the given topics under the root and returns a stream
     * backed by it.
     *
     * @param collection       the topic names handed to the source node
     * @param consumedInternal supplies the key and value serdes of the returned stream
     * @return a new stream whose only source node is the freshly named source
     */
    public <K, V> KStream<K, V> stream(Collection<String> collection, ConsumedInternal<K, V> consumedInternal) {
        final String sourceName = newProcessorName("KSTREAM-SOURCE-");
        final StreamSourceNode sourceNode = new StreamSourceNode(sourceName, collection, consumedInternal);
        addGraphNode(this.root, sourceNode);
        return new KStreamImpl(
            sourceName,
            consumedInternal.keySerde(),
            consumedInternal.valueSerde(),
            Collections.singleton(sourceName),
            false,
            sourceNode,
            this);
    }

    /**
     * Same as {@link #stream(Collection, ConsumedInternal)} but the source node is created from a
     * topic {@link Pattern} instead of an explicit topic collection.
     *
     * @param pattern          the topic pattern handed to the source node
     * @param consumedInternal supplies the key and value serdes of the returned stream
     * @return a new stream whose only source node is the freshly named source
     */
    public <K, V> KStream<K, V> stream(Pattern pattern, ConsumedInternal<K, V> consumedInternal) {
        final String sourceName = newProcessorName("KSTREAM-SOURCE-");
        final StreamSourceNode sourceNode = new StreamSourceNode(sourceName, pattern, consumedInternal);
        addGraphNode(this.root, sourceNode);
        return new KStreamImpl(
            sourceName,
            consumedInternal.keySerde(),
            consumedInternal.valueSerde(),
            Collections.singleton(sourceName),
            false,
            sourceNode,
            this);
    }

    /**
     * Adds a table source node for the given topic under the root and returns a table backed by it.
     *
     * @param str                  the topic name
     * @param consumedInternal     supplies the key and value serdes of the returned table
     * @param materializedInternal supplies the store name and queryable store name
     * @return a new table whose only source node is the freshly named source
     */
    public <K, V> KTable<K, V> table(String str, ConsumedInternal<K, V> consumedInternal, MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal) {
        // Two names are drawn from the shared counter, in this order: source first, then processor.
        final String sourceName = newProcessorName("KSTREAM-SOURCE-");
        final String tableName = newProcessorName("KTABLE-SOURCE-");
        KTableSource kTableSource = new KTableSource(materializedInternal.storeName(), materializedInternal.queryableStoreName());
        TableSourceNode<K, V> tableSourceNode = TableSourceNode.tableSourceNodeBuilder()
            .withTopic(str)
            .withSourceName(sourceName)
            .withNodeName(tableName)
            .withConsumedInternal(consumedInternal)
            .withMaterializedInternal(materializedInternal)
            .withProcessorParameters(new ProcessorParameters<>(kTableSource, tableName))
            .build();
        addGraphNode(this.root, tableSourceNode);
        return new KTableImpl(
            tableName,
            consumedInternal.keySerde(),
            consumedInternal.valueSerde(),
            Collections.singleton(sourceName),
            materializedInternal.queryableStoreName(),
            kTableSource,
            tableSourceNode,
            this);
    }

    /**
     * Adds a global table source node for the given topic under the root. Logging is disabled on
     * the supplied materialization before the node is built.
     *
     * @param str                  the topic name
     * @param consumedInternal     must not be null
     * @param materializedInternal must not be null; its store name is used both as store name and
     *                             as queryable name of the table source
     * @return a global table reading from the store named by {@code materializedInternal}
     * @throws NullPointerException if {@code consumedInternal} or {@code materializedInternal} is null
     */
    public <K, V> GlobalKTable<K, V> globalTable(String str, ConsumedInternal<K, V> consumedInternal, MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal) {
        Objects.requireNonNull(consumedInternal, "consumed can't be null");
        Objects.requireNonNull(materializedInternal, "materialized can't be null");
        materializedInternal.withLoggingDisabled();
        // NOTE(review): unlike table(), the source name here also uses the "KTABLE-SOURCE-" prefix.
        // Left as-is because generated names are externally visible; confirm this is intended.
        final String sourceName = newProcessorName("KTABLE-SOURCE-");
        final String processorName = newProcessorName("KTABLE-SOURCE-");
        final String storeName = materializedInternal.storeName();
        addGraphNode(this.root, TableSourceNode.tableSourceNodeBuilder()
            .withTopic(str)
            .isGlobalKTable(true)
            .withSourceName(sourceName)
            .withConsumedInternal(consumedInternal)
            .withMaterializedInternal(materializedInternal)
            .withProcessorParameters(new ProcessorParameters<>(new KTableSource(storeName, storeName), processorName))
            .build());
        return new GlobalKTableImpl(new KTableSourceValueGetterSupplier(storeName), materializedInternal.queryableStoreName());
    }

    /**
     * Returns {@code str} followed by the next value of the shared counter, zero-padded to ten digits.
     */
    @Override
    public String newProcessorName(String str) {
        return str + String.format("%010d", this.index.getAndIncrement());
    }

    /**
     * Returns {@code str} followed by {@code STATE-STORE-} and the next value of the shared
     * counter, zero-padded to ten digits. The counter is the same one used for processor names.
     */
    @Override
    public String newStoreName(String str) {
        return str + String.format("STATE-STORE-%010d", this.index.getAndIncrement());
    }

    /** Wraps the store builder in a {@link StateStoreNode} and attaches it under the root. */
    public synchronized void addStateStore(StoreBuilder storeBuilder) {
        addGraphNode(this.root, new StateStoreNode(storeBuilder));
    }

    /**
     * Wraps the arguments in a {@link GlobalStoreNode} and attaches it under the root. The
     * arguments are passed to the node constructor in the order given, without interpretation here.
     */
    public synchronized void addGlobalStore(StoreBuilder<KeyValueStore> storeBuilder, String str, String str2, ConsumedInternal consumedInternal, String str3, ProcessorSupplier processorSupplier) {
        addGraphNode(this.root, new GlobalStoreNode(storeBuilder, str, str2, consumedInternal, str3, processorSupplier));
    }

    /**
     * Disables logging on the store builder, generates a {@code KSTREAM-SOURCE-} name and a
     * {@code KTABLE-SOURCE-} name (in that order), and delegates to the six-argument overload.
     */
    public synchronized void addGlobalStore(StoreBuilder<KeyValueStore> storeBuilder, String str, ConsumedInternal consumedInternal, ProcessorSupplier processorSupplier) {
        storeBuilder.withLoggingDisabled();
        final String sourceName = newProcessorName("KSTREAM-SOURCE-");
        final String processorName = newProcessorName("KTABLE-SOURCE-");
        addGlobalStore(storeBuilder, sourceName, str, consumedInternal, processorName, processorSupplier);
    }

    /**
     * Attaches {@code streamsGraphNode2} as a child of {@code streamsGraphNode} and records any
     * optimization metadata for the child.
     *
     * @param streamsGraphNode  the parent node; must not be null
     * @param streamsGraphNode2 the child node; must not be null
     * @throws NullPointerException if either argument is null
     */
    public void addGraphNode(StreamsGraphNode streamsGraphNode, StreamsGraphNode streamsGraphNode2) {
        Objects.requireNonNull(streamsGraphNode, "parent node can't be null");
        Objects.requireNonNull(streamsGraphNode2, "child node can't be null");
        streamsGraphNode.addChild(streamsGraphNode2);
        maybeAddNodeForOptimizationMetadata(streamsGraphNode2);
    }

    /**
     * Attaches {@code streamsGraphNode} as a child of every node in {@code collection}. Because
     * this delegates to the single-parent overload once per parent, the child's optimization
     * metadata (including its build priority) is recorded once per parent.
     *
     * @param collection       the parent nodes; must be non-null and non-empty
     * @param streamsGraphNode the child node; must not be null
     * @throws NullPointerException if either argument is null
     * @throws StreamsException     if {@code collection} is empty
     */
    public void addGraphNode(Collection<StreamsGraphNode> collection, StreamsGraphNode streamsGraphNode) {
        Objects.requireNonNull(collection, "parent node can't be null");
        Objects.requireNonNull(streamsGraphNode, "child node can't be null");
        if (collection.isEmpty()) {
            throw new StreamsException("Parent node collection can't be empty");
        }
        for (final StreamsGraphNode parent : collection) {
            addGraphNode(parent, streamsGraphNode);
        }
    }

    /**
     * Assigns the next build priority to the node and files it into the bookkeeping used by the
     * repartition optimization: key-changing nodes become map keys, optimizable repartition nodes
     * are registered under their key-changing ancestor (if one qualifies), and merge nodes are
     * remembered separately.
     *
     * @throws IllegalStateException if the node has no parents and is not named like the root
     */
    private void maybeAddNodeForOptimizationMetadata(StreamsGraphNode streamsGraphNode) {
        streamsGraphNode.setBuildPriority(this.buildPriorityIndex.getAndIncrement());
        if (streamsGraphNode.parentNodes().isEmpty() && !streamsGraphNode.nodeName().equals(TOPOLOGY_ROOT)) {
            throw new IllegalStateException("Nodes should not have a null parent node.  Name: " + streamsGraphNode.nodeName() + " Type: " + streamsGraphNode.getClass().getSimpleName());
        }

        if (streamsGraphNode.isKeyChangingOperation()) {
            this.keyChangingOperationsToOptimizableRepartitionNodes.put(streamsGraphNode, new LinkedHashSet<>());
        } else if (streamsGraphNode instanceof OptimizableRepartitionNode) {
            final StreamsGraphNode keyChangingParent = getKeyChangingParentNode(streamsGraphNode);
            if (keyChangingParent != null) {
                this.keyChangingOperationsToOptimizableRepartitionNodes.get(keyChangingParent).add((OptimizableRepartitionNode) streamsGraphNode);
            }
        } else if (streamsGraphNode.isMergeNode()) {
            this.mergeNodes.add(streamsGraphNode);
        }
    }

    /** Writes the graph to the topology builder without applying any optimization. */
    public void buildAndOptimizeTopology() {
        buildAndOptimizeTopology(null);
    }

    /**
     * Optionally optimizes the graph and then writes every node into the topology builder,
     * visiting nodes in ascending build-priority order starting from the root.
     *
     * @param properties may be null; optimization runs only when it maps
     *                   {@link StreamsConfig#TOPOLOGY_OPTIMIZATION} to {@link StreamsConfig#OPTIMIZE}
     */
    public void buildAndOptimizeTopology(Properties properties) {
        maybePerformOptimizations(properties);

        final Comparator<StreamsGraphNode> byBuildPriority = Comparator.comparing(StreamsGraphNode::buildPriority);
        final PriorityQueue<StreamsGraphNode> pending = new PriorityQueue<>(5, byBuildPriority);
        pending.offer(this.root);

        while (!pending.isEmpty()) {
            final StreamsGraphNode node = pending.remove();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Adding nodes to topology {} child nodes {}", node, node.children());
            }
            // A node with several parents is queued once per parent; the written flag keeps it
            // from being written twice, and the parent check defers it until all parents are done.
            if (node.allParentsWrittenToTopology() && !node.hasWrittenToTopology()) {
                node.writeToTopology(this.internalTopologyBuilder);
                node.setHasWrittenToTopology(true);
            }
            for (final StreamsGraphNode child : node.children()) {
                pending.offer(child);
            }
        }
    }

    /** Runs the repartition optimization only when the properties explicitly ask for it. */
    private void maybePerformOptimizations(Properties properties) {
        if (properties == null || !StreamsConfig.OPTIMIZE.equals(properties.getProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION))) {
            return;
        }
        LOG.debug("Optimizing the Kafka Streams graph for repartition nodes");
        maybeOptimizeRepartitionOperations();
    }

    /**
     * For every key-changing node that has registered repartition nodes, replaces those
     * repartition nodes with one new repartition node placed directly under the key-changing node.
     * Entries that were processed are removed from the bookkeeping map.
     *
     * @throws StreamsException if no node between a repartition node and the key-changing node
     *                          can be found that has the key-changing node as a parent
     */
    private void maybeOptimizeRepartitionOperations() {
        maybeUpdateKeyChangingRepartitionNodeMap();

        // An explicit iterator is required because processed entries are removed while iterating.
        final Iterator<Map.Entry<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode>>> entries =
            this.keyChangingOperationsToOptimizableRepartitionNodes.entrySet().iterator();

        while (entries.hasNext()) {
            final Map.Entry<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode>> entry = entries.next();
            final StreamsGraphNode keyChangingNode = entry.getKey();
            final LinkedHashSet<OptimizableRepartitionNode> repartitionNodes = entry.getValue();
            if (repartitionNodes.isEmpty()) {
                continue;
            }

            GroupedInternal groupedInternal = new GroupedInternal(getRepartitionSerdes(repartitionNodes));
            // The topic name of the first registered repartition node is reused for the merged one.
            OptimizableRepartitionNode optimizedRepartition = createRepartitionNode(
                getFirstRepartitionTopicName(repartitionNodes),
                groupedInternal.keySerde(),
                groupedInternal.valueSerde());
            // Reuse the key-changing node's priority for the merged repartition node.
            optimizedRepartition.setBuildPriority(keyChangingNode.buildPriority().intValue());

            for (final OptimizableRepartitionNode replaced : repartitionNodes) {
                final StreamsGraphNode keyChangingChild =
                    findParentNodeMatching(replaced, candidate -> candidate.parentNodes().contains(keyChangingNode));
                if (keyChangingChild == null) {
                    throw new StreamsException(String.format("Found a null keyChangingChild node for %s", replaced));
                }
                LOG.debug("Found the child node of the key changer {} from the repartition {}.", keyChangingChild, replaced);

                // Move the key changer's child underneath the merged repartition node.
                optimizedRepartition.addChild(keyChangingChild);
                LOG.debug("Removing {} from {}  children {}", keyChangingChild, keyChangingNode, keyChangingNode.children());
                keyChangingNode.removeChild(keyChangingChild);

                // Splice the replaced repartition node out: its children are re-attached to its parents.
                final Collection<StreamsGraphNode> replacedChildren = replaced.children();
                final Collection<StreamsGraphNode> replacedParents = replaced.parentNodes();
                for (final StreamsGraphNode child : replacedChildren) {
                    for (final StreamsGraphNode parent : replacedParents) {
                        parent.addChild(child);
                    }
                }
                for (final StreamsGraphNode parent : replacedParents) {
                    parent.removeChild(replaced);
                }
                replaced.clearChildren();
                LOG.debug("Updated node {} children {}", optimizedRepartition, optimizedRepartition.children());
            }

            keyChangingNode.addChild(optimizedRepartition);
            entries.remove();
        }
    }

    /**
     * Re-keys the bookkeeping map for merge nodes: each merge node is registered with the union of
     * the repartition nodes of every key-changing node found above it, and those key-changing
     * nodes are then dropped from the map.
     */
    private void maybeUpdateKeyChangingRepartitionNodeMap() {
        final Map<StreamsGraphNode, Set<StreamsGraphNode>> mergeNodeToKeyChangers = new HashMap<>();
        final Set<StreamsGraphNode> keyChangersToRemove = new HashSet<>();

        for (final StreamsGraphNode mergeNode : this.mergeNodes) {
            final Set<StreamsGraphNode> keyChangers = new LinkedHashSet<>();
            mergeNodeToKeyChangers.put(mergeNode, keyChangers);
            for (final StreamsGraphNode keyChangingNode : this.keyChangingOperationsToOptimizableRepartitionNodes.keySet()) {
                final StreamsGraphNode match =
                    findParentNodeMatching(mergeNode, candidate -> candidate.parentNodes().contains(keyChangingNode));
                if (match != null) {
                    keyChangers.add(keyChangingNode);
                }
            }
        }

        for (final Map.Entry<StreamsGraphNode, Set<StreamsGraphNode>> entry : mergeNodeToKeyChangers.entrySet()) {
            final StreamsGraphNode mergeNode = entry.getKey();
            final LinkedHashSet<OptimizableRepartitionNode> combined = new LinkedHashSet<>();
            for (final StreamsGraphNode keyChangingNode : entry.getValue()) {
                combined.addAll(this.keyChangingOperationsToOptimizableRepartitionNodes.get(keyChangingNode));
                keyChangersToRemove.add(keyChangingNode);
            }
            this.keyChangingOperationsToOptimizableRepartitionNodes.put(mergeNode, combined);
        }

        // Removal is deferred until here so that lookups in the loop above still see every key changer.
        for (final StreamsGraphNode keyChangingNode : keyChangersToRemove) {
            this.keyChangingOperationsToOptimizableRepartitionNodes.remove(keyChangingNode);
        }
    }

    /**
     * Builds a repartition node for the given topic name and serdes via
     * {@code KStreamImpl.createRepartitionedSource} and the node builder.
     */
    private OptimizableRepartitionNode createRepartitionNode(String str, Serde serde, Serde serde2) {
        OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder nodeBuilder = OptimizableRepartitionNode.optimizableRepartitionNodeBuilder();
        KStreamImpl.createRepartitionedSource(this, serde, serde2, str, nodeBuilder);
        nodeBuilder.withRepartitionTopic(str);
        return nodeBuilder.build();
    }

    /**
     * Returns the key-changing ancestor found for the node, but only when the search for a
     * "key-changing or value-changing" ancestor lands on that very same node; otherwise null.
     * In effect a value-changing operation found first disqualifies the key changer.
     */
    private StreamsGraphNode getKeyChangingParentNode(StreamsGraphNode streamsGraphNode) {
        final StreamsGraphNode firstKeyOrValueChanger = findParentNodeMatching(
            streamsGraphNode,
            candidate -> candidate.isKeyChangingOperation() || candidate.isValueChangingOperation());
        final StreamsGraphNode firstKeyChanger = findParentNodeMatching(streamsGraphNode, StreamsGraphNode::isKeyChangingOperation);
        if (firstKeyOrValueChanger != null && firstKeyOrValueChanger.equals(firstKeyChanger)) {
            return firstKeyChanger;
        }
        return null;
    }

    /** Returns the repartition topic of the first node in iteration order; the collection must be non-empty. */
    private String getFirstRepartitionTopicName(Collection<OptimizableRepartitionNode> collection) {
        return collection.iterator().next().repartitionTopic();
    }

    /**
     * Picks the first non-null key serde and the first non-null value serde among the given nodes
     * (they may come from different nodes); either may remain null if no node provides one.
     */
    private GroupedInternal getRepartitionSerdes(Collection<OptimizableRepartitionNode> collection) {
        Serde keySerde = null;
        Serde valueSerde = null;
        for (final OptimizableRepartitionNode repartitionNode : collection) {
            if (keySerde == null && repartitionNode.keySerde() != null) {
                keySerde = repartitionNode.keySerde();
            }
            if (valueSerde == null && repartitionNode.valueSerde() != null) {
                valueSerde = repartitionNode.valueSerde();
            }
            if (keySerde != null && valueSerde != null) {
                break;
            }
        }
        return new GroupedInternal(Grouped.with(keySerde, valueSerde));
    }

    /**
     * Searches the node itself and then, depth-first, its ancestors for a node accepted by the
     * predicate.
     *
     * <p>NOTE(review): when a node has several parents, the result of searching an earlier parent
     * is overwritten by the result of searching a later one, so a match found through an earlier
     * parent is lost if a later parent's search returns null. The behaviour is kept unchanged here
     * because it determines which repartition nodes get optimized; confirm whether it is intended.
     *
     * @return a matching node, or null if none was found (subject to the note above)
     */
    private StreamsGraphNode findParentNodeMatching(StreamsGraphNode streamsGraphNode, Predicate<StreamsGraphNode> predicate) {
        if (predicate.test(streamsGraphNode)) {
            return streamsGraphNode;
        }
        StreamsGraphNode found = null;
        for (final StreamsGraphNode parent : streamsGraphNode.parentNodes()) {
            if (predicate.test(parent)) {
                return parent;
            }
            found = findParentNodeMatching(parent, predicate);
        }
        return found;
    }

    /** Returns the root node of the graph. */
    public StreamsGraphNode root() {
        return this.root;
    }
}
