Class OptimizerNode

  • All Implemented Interfaces:
    EstimateProvider, DumpableNode<OptimizerNode>, org.apache.flink.util.Visitable<OptimizerNode>
    Direct Known Subclasses:
    AbstractPartialSolutionNode, DataSinkNode, DataSourceNode, SingleInputNode, TwoInputNode

    public abstract class OptimizerNode
    extends Object
    implements org.apache.flink.util.Visitable<OptimizerNode>, EstimateProvider, DumpableNode<OptimizerNode>
    The OptimizerNode is the base class of all nodes in the optimizer DAG. The optimizer DAG is the optimizer's representation of a program, created before the actual optimization (which creates different candidate plans and computes their cost).

    Nodes in the DAG correspond (almost) one-to-one to the operators in a program. The optimizer DAG is constructed to hold the additional information that the optimizer needs:

    • Estimates of the data size processed by each operator
    • Helper structures to track where the data flow "splits" and "joins", to support flows that are DAGs but not trees.
    • Tags and weights to differentiate between loop-variant and -invariant parts of an iteration
    • Interesting properties to be used during the enumeration of candidate plans
    • Field Detail

      • MAX_DYNAMIC_PATH_COST_WEIGHT

        public static final int MAX_DYNAMIC_PATH_COST_WEIGHT
        See Also:
        Constant Field Values
      • estimatedOutputSize

        protected long estimatedOutputSize
      • estimatedNumRecords

        protected long estimatedNumRecords
      • uniqueFields

        protected Set<org.apache.flink.api.common.operators.util.FieldSet> uniqueFields
      • id

        protected int id
      • costWeight

        protected int costWeight
      • onDynamicPath

        protected boolean onDynamicPath
    • Constructor Detail

      • OptimizerNode

        public OptimizerNode​(org.apache.flink.api.common.operators.Operator<?> op)
        Creates a new optimizer node that represents the given program operator.
        Parameters:
        op - The operator that the node represents.
      • OptimizerNode

        protected OptimizerNode​(OptimizerNode toCopy)
    • Method Detail

      • getOperatorName

        public abstract String getOperatorName()
        Gets the name of this node, which is the name of the function/operator, or data source / data sink.
        Returns:
        The node name.
      • setInput

        public abstract void setInput​(Map<org.apache.flink.api.common.operators.Operator<?>,​OptimizerNode> operatorToNode,
                                      org.apache.flink.api.common.ExecutionMode defaultExchangeMode)
        This function connects the predecessors to this operator.
        Parameters:
        operatorToNode - The map from program operators to optimizer nodes.
        defaultExchangeMode - The data exchange mode to use, if the operator does not specify one.
      • setBroadcastInputs

        public void setBroadcastInputs​(Map<org.apache.flink.api.common.operators.Operator<?>,​OptimizerNode> operatorToNode,
                                       org.apache.flink.api.common.ExecutionMode defaultExchangeMode)
        This function connects the operators that produce the broadcast inputs to this operator.
        Parameters:
        operatorToNode - The map from program operators to optimizer nodes.
        defaultExchangeMode - The data exchange mode to use, if the operator does not specify one.
        Throws:
        CompilerException
      • getIncomingConnections

        public abstract List<DagConnection> getIncomingConnections()
        Gets all incoming connections of this node. This method needs to be overridden by subclasses to return the children.
        Returns:
        The list of incoming connections.
      • computeInterestingPropertiesForInputs

        public abstract void computeInterestingPropertiesForInputs​(CostEstimator estimator)
        Tells the node to compute the interesting properties for its inputs. The interesting properties for the node itself must have been computed before. The node must then see how many of the interesting properties it preserves and add its own.
        Parameters:
        estimator - The CostEstimator instance to use for plan cost estimation.
      • computeUnclosedBranchStack

        public abstract void computeUnclosedBranchStack()
        This method causes the node to compute the description of open branches in its sub-plan. An open branch indicates that a (transitive) child node has multiple outputs that have not all been re-joined in the sub-plan. This method needs to set the openBranches field to a stack of unclosed branches, with the latest one on top. A branch is considered closed if some later node sees all of the branching node's outputs, regardless of whether more branches to different paths have occurred in the meantime.
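        The notion of an open branch described above can be illustrated with a simplified sketch. The classes BranchNodeSketch and OpenBranchSketch are hypothetical and are not part of the optimizer. The sketch treats a branching node as open when the sub-plan does not contain all of its outputs, and it returns a plain list rather than the stack kept in the openBranches field.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical toy node that knows its inputs and outputs; not an OptimizerNode.
class BranchNodeSketch {
    final String name;
    final List<BranchNodeSketch> inputs = new ArrayList<>();
    final List<BranchNodeSketch> outputs = new ArrayList<>();

    BranchNodeSketch(String name, BranchNodeSketch... ins) {
        this.name = name;
        for (BranchNodeSketch in : ins) {
            inputs.add(in);
            in.outputs.add(this); // register this node as a consumer of its input
        }
    }
}

class OpenBranchSketch {
    // Collects the node and all of its transitive inputs (its sub-plan).
    static void collect(BranchNodeSketch node, Set<BranchNodeSketch> seen) {
        if (seen.add(node)) {
            for (BranchNodeSketch in : node.inputs) {
                collect(in, seen);
            }
        }
    }

    // Names of branching nodes whose outputs are not all part of the sub-plan.
    static List<String> openBranches(BranchNodeSketch root) {
        Set<BranchNodeSketch> subPlan = new LinkedHashSet<>();
        collect(root, subPlan);
        List<String> open = new ArrayList<>();
        for (BranchNodeSketch n : subPlan) {
            if (n.outputs.size() > 1 && !subPlan.containsAll(n.outputs)) {
                open.add(n.name);
            }
        }
        return open;
    }
}
```

        In a flow where a source feeds two mappers that are later joined, the source is an open branch from the point of view of either mapper and is closed at the join.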
      • getAlternativePlans

        public abstract List<PlanNode> getAlternativePlans​(CostEstimator estimator)
        Computes the plan alternatives for this node, and implicitly for all nodes that are children of this node. For each alternative, this method must determine the global and local properties and the costs. This method may recursively call getAlternativePlans() on its children to get their plan alternatives, and build its own alternatives on top of those.
        Parameters:
        estimator - The cost estimator used to estimate the costs of each plan alternative.
        Returns:
        A list containing all plan alternatives.
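        The recursive enumeration described above can be sketched as follows. CandidateSketch and EnumNodeSketch are hypothetical stand-ins for PlanNode and a single-input node. The sketch only combines a fixed menu of strategies with accumulated costs and leaves out global and local properties, pruning, and the CostEstimator.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical candidate plan: a readable description plus its cumulative cost.
final class CandidateSketch {
    final String description;
    final double cost;

    CandidateSketch(String description, double cost) {
        this.description = description;
        this.cost = cost;
    }
}

// Hypothetical single-input node with a fixed set of execution strategies.
class EnumNodeSketch {
    final EnumNodeSketch input; // null for a source node
    final String[] strategies;
    final double[] strategyCosts;

    EnumNodeSketch(EnumNodeSketch input, String[] strategies, double[] strategyCosts) {
        this.input = input;
        this.strategies = strategies;
        this.strategyCosts = strategyCosts;
    }

    // Builds this node's alternatives on top of every alternative of its input.
    List<CandidateSketch> getAlternativePlans() {
        List<CandidateSketch> inputPlans = new ArrayList<>();
        if (input == null) {
            inputPlans.add(new CandidateSketch("", 0.0)); // empty plan below a source
        } else {
            inputPlans.addAll(input.getAlternativePlans());
        }
        List<CandidateSketch> alternatives = new ArrayList<>();
        for (CandidateSketch in : inputPlans) {
            for (int i = 0; i < strategies.length; i++) {
                String prefix = in.description.isEmpty() ? "" : in.description + " -> ";
                alternatives.add(
                        new CandidateSketch(prefix + strategies[i], in.cost + strategyCosts[i]));
            }
        }
        return alternatives;
    }
}
```

        Without pruning, the number of alternatives grows with the product of the strategy counts along a path.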
      • accept

        public abstract void accept​(org.apache.flink.util.Visitor<OptimizerNode> visitor)
        This method implements the visit of a depth-first graph traversing visitor. Implementers must first call the preVisit() method, then hand the visitor to their children, and finally call the postVisit() method.
        Specified by:
        accept in interface org.apache.flink.util.Visitable<OptimizerNode>
        Parameters:
        visitor - The graph traversing visitor.
        See Also:
        Visitable.accept(org.apache.flink.util.Visitor)
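        The call order required of implementers can be sketched with a toy graph. SketchVisitor and SketchNode are hypothetical stand-ins, not the Flink types. The boolean return value of preVisit, used here to decide whether to descend into the children, is an assumption modeled on the shape of org.apache.flink.util.Visitor.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a depth-first graph visitor.
interface SketchVisitor<T> {
    boolean preVisit(T node); // assumption: false means "do not descend further"
    void postVisit(T node);
}

// Hypothetical toy node; not a real OptimizerNode subclass.
class SketchNode {
    final String name;
    final List<SketchNode> inputs = new ArrayList<>();

    SketchNode(String name, SketchNode... inputs) {
        this.name = name;
        for (SketchNode in : inputs) {
            this.inputs.add(in);
        }
    }

    // Depth-first traversal: preVisit, then the children, then postVisit.
    void accept(SketchVisitor<SketchNode> visitor) {
        if (visitor.preVisit(this)) {
            for (SketchNode in : inputs) {
                in.accept(visitor);
            }
            visitor.postVisit(this);
        }
    }
}

class AcceptSketch {
    // Records the order in which postVisit is invoked.
    static List<String> postOrder(SketchNode root) {
        final List<String> order = new ArrayList<>();
        root.accept(new SketchVisitor<SketchNode>() {
            @Override
            public boolean preVisit(SketchNode node) {
                return true;
            }

            @Override
            public void postVisit(SketchNode node) {
                order.add(node.name);
            }
        });
        return order;
    }

    public static void main(String[] args) {
        SketchNode source = new SketchNode("source");
        SketchNode map = new SketchNode("map", source);
        SketchNode sink = new SketchNode("sink", map);
        System.out.println(postOrder(sink)); // prints [source, map, sink]
    }
}
```

        Because postVisit runs only after all children have been handled, a visitor started at a sink sees the inputs of a node before the node itself.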
      • getSemanticProperties

        public abstract org.apache.flink.api.common.operators.SemanticProperties getSemanticProperties()
      • getId

        public int getId()
        Gets the ID of this node. If the ID has not yet been set, this method returns -1.
        Returns:
        This node's id, or -1, if not yet set.
      • initId

        public void initId​(int id)
        Sets the ID of this node.
        Parameters:
        id - The id for this node.
      • addBroadcastConnection

        public void addBroadcastConnection​(String name,
                                           DagConnection broadcastConnection)
        Adds the broadcast connection identified by the given name to this node.
        Parameters:
        name - The name that identifies the broadcast connection.
        broadcastConnection - The connection to add.
      • getBroadcastConnectionNames

        public List<String> getBroadcastConnectionNames()
        Returns the list of names associated with broadcast inputs for this node.
      • getBroadcastConnections

        public List<DagConnection> getBroadcastConnections()
        Returns the list of inputs associated with broadcast variables for this node.
      • addOutgoingConnection

        public void addOutgoingConnection​(DagConnection connection)
        Adds a new outgoing connection to this node.
        Parameters:
        connection - The connection to add.
      • getOutgoingConnections

        public List<DagConnection> getOutgoingConnections()
        Gets the list of outgoing connections from this node to succeeding tasks.
        Returns:
        The list of outgoing connections.
      • getOperator

        public org.apache.flink.api.common.operators.Operator<?> getOperator()
        Gets the operator represented by this optimizer node.
        Returns:
        This node's operator.
      • getParallelism

        public int getParallelism()
        Gets the parallelism for the operator represented by this optimizer node. The parallelism denotes how many parallel instances of the operator will be spawned during the execution. If this value is ExecutionConfig.PARALLELISM_DEFAULT then the system will take the default number of parallel instances.
        Returns:
        The parallelism of the operator.
      • setParallelism

        public void setParallelism​(int parallelism)
        Sets the parallelism for this optimizer node. The parallelism denotes how many parallel instances of the operator will be spawned during the execution.
        Parameters:
        parallelism - The parallelism to set. If this value is ExecutionConfig.PARALLELISM_DEFAULT then the system will take the default number of parallel instances.
        Throws:
        IllegalArgumentException - If the parallelism is smaller than one and is not ExecutionConfig.PARALLELISM_DEFAULT.
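        The parameter description accepts ExecutionConfig.PARALLELISM_DEFAULT, while the exception is documented for values smaller than one. One consistent reading is sketched below. ParallelismSketch is a hypothetical class, and the value -1 for its PARALLELISM_DEFAULT constant is an assumption about the Flink constant that should be checked against the Flink version in use.

```java
// Hypothetical holder for a parallelism setting; not the real OptimizerNode.
class ParallelismSketch {
    // Assumption: stands in for ExecutionConfig.PARALLELISM_DEFAULT.
    static final int PARALLELISM_DEFAULT = -1;

    private int parallelism = PARALLELISM_DEFAULT;

    // Accepts any value >= 1, or the default marker; rejects everything else.
    void setParallelism(int parallelism) {
        if (parallelism < 1 && parallelism != PARALLELISM_DEFAULT) {
            throw new IllegalArgumentException(
                    "Parallelism of " + parallelism + " is invalid.");
        }
        this.parallelism = parallelism;
    }

    int getParallelism() {
        return parallelism;
    }
}
```

        Under this reading, a value of 0 is rejected while the default marker passes through unchanged, leaving the system to choose the number of parallel instances later.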
      • getMinimalMemoryAcrossAllSubTasks

        public long getMinimalMemoryAcrossAllSubTasks()
        Gets the amount of memory that all subtasks of this task have jointly available.
        Returns:
        The total amount of memory across all subtasks.
      • isOnDynamicPath

        public boolean isOnDynamicPath()
      • identifyDynamicPath

        public void identifyDynamicPath​(int costWeight)
      • getCostWeight

        public int getCostWeight()
      • getMaxDepth

        public int getMaxDepth()
      • getInterestingProperties

        public InterestingProperties getInterestingProperties()
        Gets the properties that are interesting for this node to produce.
        Returns:
        The interesting properties for this node, or null, if not yet computed.
      • getEstimatedNumRecords

        public long getEstimatedNumRecords()
        Description copied from interface: EstimateProvider
        Gets the estimated number of records in the output of this node.
        Specified by:
        getEstimatedNumRecords in interface EstimateProvider
        Returns:
        The estimated number of records.
      • setEstimatedOutputSize

        public void setEstimatedOutputSize​(long estimatedOutputSize)
      • setEstimatedNumRecords

        public void setEstimatedNumRecords​(long estimatedNumRecords)
      • isBranching

        public boolean isBranching()
        Checks whether this node has branching output. A node's output is branched, if it has more than one output connection.
        Returns:
        True, if the node's output branches. False otherwise.
      • markAllOutgoingConnectionsAsPipelineBreaking

        public void markAllOutgoingConnectionsAsPipelineBreaking()
      • haveAllOutputConnectionInterestingProperties

        public boolean haveAllOutputConnectionInterestingProperties()
        Checks whether all outgoing connections have their interesting properties set from their target nodes.
        Returns:
        True, if on all outgoing connections, the interesting properties are set. False otherwise.
      • computeUnionOfInterestingPropertiesFromSuccessors

        public void computeUnionOfInterestingPropertiesFromSuccessors()
        Computes all the interesting properties that are relevant to this node. The interesting properties are a union of the interesting properties on each outgoing connection. However, if two interesting properties on the outgoing connections overlap, the interesting properties will occur only once in this set. For that, this method deduplicates and merges the interesting properties. This method returns copies of the original interesting properties objects and leaves the original objects, contained by the connections, unchanged.
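        The deduplicating union with defensive copies can be sketched as follows. PropSketch and UnionSketch are hypothetical and reduce an interesting property to a single description string. The actual InterestingProperties class distinguishes global and local properties and is not modeled here.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for one interesting property, compared by value.
final class PropSketch {
    final String description;

    PropSketch(String description) {
        this.description = description;
    }

    PropSketch copy() {
        return new PropSketch(description);
    }

    @Override
    public boolean equals(Object other) {
        return other instanceof PropSketch
                && ((PropSketch) other).description.equals(description);
    }

    @Override
    public int hashCode() {
        return description.hashCode();
    }
}

class UnionSketch {
    // Each inner list holds the properties attached to one outgoing connection.
    static Set<PropSketch> unionOf(List<List<PropSketch>> perConnection) {
        Set<PropSketch> union = new LinkedHashSet<>();
        for (List<PropSketch> props : perConnection) {
            for (PropSketch p : props) {
                if (!union.contains(p)) {
                    union.add(p.copy()); // store a copy; the original stays untouched
                }
            }
        }
        return union;
    }
}
```

        Two connections that both request the same property contribute a single entry to the result, and that entry is a different object from either original.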
      • clearInterestingProperties

        public void clearInterestingProperties()
      • computeOutputEstimates

        public void computeOutputEstimates​(DataStatistics statistics)
        Causes this node to compute its output estimates (such as number of rows, size in bytes) based on the inputs and the compiler hints. The compiler hints are instantiated with conservative default values which are used if no other values are provided. Nodes may access the statistics to determine relevant information.
        Parameters:
        statistics - The statistics object which may be accessed to get statistical information. The parameter may be null, if no statistics are available.
      • computeOperatorSpecificDefaultEstimates

        protected abstract void computeOperatorSpecificDefaultEstimates​(DataStatistics statistics)
      • readStubAnnotations

        protected void readStubAnnotations()
        Reads all stub annotations, i.e. which fields remain constant, what cardinality bounds the functions have, which fields remain unique.
      • readUniqueFieldsAnnotation

        protected void readUniqueFieldsAnnotation()
      • getUniqueFields

        public Set<org.apache.flink.api.common.operators.util.FieldSet> getUniqueFields()
        Gets the FieldSets which are unique in the output of the node.
      • prunePlanAlternatives

        protected void prunePlanAlternatives​(List<PlanNode> plans)
      • prunePlanAlternativesWithCommonBranching

        protected void prunePlanAlternativesWithCommonBranching​(List<PlanNode> plans)
      • hasUnclosedBranches

        public boolean hasUnclosedBranches()
      • addClosedBranches

        protected void addClosedBranches​(Set<OptimizerNode> alreadyClosed)
      • addClosedBranch

        protected void addClosedBranch​(OptimizerNode alreadyClosed)
      • areBranchCompatible

        protected boolean areBranchCompatible​(PlanNode plan1,
                                              PlanNode plan2)
        Checks whether two candidate plans for the sub-plan of this node are comparable. The two alternative plans are comparable if either

        a) there is no branch in the sub-plan of this node, or
        b) both candidates have the same candidate as the child at the last open branch.

        Parameters:
        plan1 - The root node of the first candidate plan.
        plan2 - The root node of the second candidate plan.
        Returns:
        True if the nodes are branch compatible in the inputs.
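        The two conditions can be sketched with a simplified model. BranchCompatSketch is hypothetical. Each plan is reduced to a map from the name of a branching node to the label of the candidate chosen at that node, and the last open branch is passed in by name, with null meaning that the sub-plan has no open branch.

```java
import java.util.Map;

// Hypothetical simplification of the branch compatibility check.
class BranchCompatSketch {
    static boolean areBranchCompatible(
            String lastOpenBranch,
            Map<String, String> plan1Choices,
            Map<String, String> plan2Choices) {
        // a) no branch in the sub-plan: the plans are always comparable
        if (lastOpenBranch == null) {
            return true;
        }
        // b) both plans must build on the same candidate at the last open branch
        String choice1 = plan1Choices.get(lastOpenBranch);
        String choice2 = plan2Choices.get(lastOpenBranch);
        return choice1 != null && choice1.equals(choice2);
    }
}
```

        Plans that differ at the last open branch are kept apart because a cheaper plan on one path may be incompatible with the candidate chosen on the other path once the branch is re-joined.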