Package org.apache.flink.optimizer.dag
Class OptimizerNode
- java.lang.Object
  - org.apache.flink.optimizer.dag.OptimizerNode
- All Implemented Interfaces:
EstimateProvider, DumpableNode<OptimizerNode>, org.apache.flink.util.Visitable<OptimizerNode>
- Direct Known Subclasses:
AbstractPartialSolutionNode, DataSinkNode, DataSourceNode, SingleInputNode, TwoInputNode
public abstract class OptimizerNode extends Object implements org.apache.flink.util.Visitable<OptimizerNode>, EstimateProvider, DumpableNode<OptimizerNode>
The OptimizerNode is the base class of all nodes in the optimizer DAG. The optimizer DAG is the optimizer's representation of a program, created before the actual optimization (which creates different candidate plans and computes their cost). Nodes in the DAG correspond (almost) one-to-one to the operators in a program. The optimizer DAG is constructed to hold the additional information that the optimizer needs:
- Estimates of the data size processed by each operator
- Helper structures to track where the data flow "splits" and "joins", to support flows that are DAGs but not trees.
- Tags and weights to differentiate between loop-variant and -invariant parts of an iteration
- Interesting properties to be used during the enumeration of candidate plans
-
-
Nested Class Summary
Nested Classes
Modifier and Type | Class | Description
static class | OptimizerNode.UnclosedBranchDescriptor | Description of an unclosed branch.
-
Field Summary
Fields
Modifier and Type | Field
protected List<PlanNode> | cachedPlans
protected Set<OptimizerNode> | closedBranchingNodes
protected int | costWeight
protected long | estimatedNumRecords
protected long | estimatedOutputSize
protected List<OptimizerNode> | hereJoinedBranches
protected int | id
static int | MAX_DYNAMIC_PATH_COST_WEIGHT
protected boolean | onDynamicPath
protected List<OptimizerNode.UnclosedBranchDescriptor> | openBranches
protected Set<org.apache.flink.api.common.operators.util.FieldSet> | uniqueFields
-
Constructor Summary
Constructors
Modifier | Constructor | Description
public | OptimizerNode(org.apache.flink.api.common.operators.Operator<?> op) | Creates a new optimizer node that represents the given program operator.
protected | OptimizerNode(OptimizerNode toCopy)
-
Method Summary
Methods
Modifier and Type | Method | Description
abstract void | accept(org.apache.flink.util.Visitor<OptimizerNode> visitor) | This method implements the visit of a depth-first graph traversing visitor.
void | addBroadcastConnection(String name, DagConnection broadcastConnection) | Adds the broadcast connection identified by the given name to this node.
protected void | addClosedBranch(OptimizerNode alreadyClosed)
protected void | addClosedBranches(Set<OptimizerNode> alreadyClosed)
void | addOutgoingConnection(DagConnection connection) | Adds a new outgoing connection to this node.
protected boolean | areBranchCompatible(PlanNode plan1, PlanNode plan2) | Checks whether two candidate plans for the sub-plan of this node are comparable.
void | clearInterestingProperties()
abstract void | computeInterestingPropertiesForInputs(CostEstimator estimator) | Tells the node to compute the interesting properties for its inputs.
protected abstract void | computeOperatorSpecificDefaultEstimates(DataStatistics statistics)
void | computeOutputEstimates(DataStatistics statistics) | Causes this node to compute its output estimates (such as number of rows and size in bytes) based on the inputs and the compiler hints.
abstract void | computeUnclosedBranchStack() | This method causes the node to compute the description of open branches in its sub-plan.
protected List<OptimizerNode.UnclosedBranchDescriptor> | computeUnclosedBranchStackForBroadcastInputs(List<OptimizerNode.UnclosedBranchDescriptor> branchesSoFar)
void | computeUnionOfInterestingPropertiesFromSuccessors() | Computes all the interesting properties that are relevant to this node.
abstract List<PlanNode> | getAlternativePlans(CostEstimator estimator) | Computes the plan alternatives for this node, and implicitly for all nodes that are children of this node.
protected List<OptimizerNode.UnclosedBranchDescriptor> | getBranchesForParent(DagConnection toParent)
List<String> | getBroadcastConnectionNames() | Returns the list of names associated with broadcast inputs for this node.
List<DagConnection> | getBroadcastConnections() | Returns the list of inputs associated with broadcast variables for this node.
Set<OptimizerNode> | getClosedBranchingNodes()
int | getCostWeight()
Iterable<DumpableConnection<OptimizerNode>> | getDumpableInputs()
float | getEstimatedAvgWidthPerOutputRecord() | Gets the estimated number of bytes per record.
long | getEstimatedNumRecords() | Gets the estimated number of records in the output of this node.
long | getEstimatedOutputSize() | Gets the estimated output size from this node.
int | getId() | Gets the ID of this node.
abstract List<DagConnection> | getIncomingConnections() | Gets all incoming connections of this node.
InterestingProperties | getInterestingProperties() | Gets the properties that are interesting for this node to produce.
int | getMaxDepth()
long | getMinimalMemoryAcrossAllSubTasks() | Gets the amount of memory that all subtasks of this task have jointly available.
List<OptimizerNode.UnclosedBranchDescriptor> | getOpenBranches()
org.apache.flink.api.common.operators.Operator<?> | getOperator() | Gets the operator represented by this optimizer node.
abstract String | getOperatorName() | Gets the name of this node, which is the name of the function/operator, or data source / data sink.
OptimizerNode | getOptimizerNode()
List<DagConnection> | getOutgoingConnections() | Gets the list of outgoing connections from this node to succeeding tasks.
int | getParallelism() | Gets the parallelism for the operator represented by this optimizer node.
PlanNode | getPlanNode()
Iterable<OptimizerNode> | getPredecessors() | Gets an iterator over the predecessors.
abstract org.apache.flink.api.common.operators.SemanticProperties | getSemanticProperties()
Set<org.apache.flink.api.common.operators.util.FieldSet> | getUniqueFields() | Gets the FieldSets which are unique in the output of the node.
boolean | hasUnclosedBranches()
boolean | haveAllOutputConnectionInterestingProperties() | Checks if all outgoing connections have their interesting properties set by their target nodes.
void | identifyDynamicPath(int costWeight)
void | initId(int id) | Sets the ID of this node.
boolean | isBranching() | Checks whether this node has branching output.
boolean | isOnDynamicPath()
void | markAllOutgoingConnectionsAsPipelineBreaking()
protected boolean | mergeLists(List<OptimizerNode.UnclosedBranchDescriptor> child1open, List<OptimizerNode.UnclosedBranchDescriptor> child2open, List<OptimizerNode.UnclosedBranchDescriptor> result, boolean markJoinedBranchesAsPipelineBreaking) | The node IDs are assigned in graph-traversal order (pre-order); hence, each list is sorted by ID in ascending order, and all consecutive lists start with IDs in ascending order.
protected void | prunePlanAlternatives(List<PlanNode> plans)
protected void | prunePlanAlternativesWithCommonBranching(List<PlanNode> plans)
protected void | readStubAnnotations() | Reads all stub annotations, i.e., which fields remain constant, what cardinality bounds the functions have, and which fields remain unique.
protected void | readUniqueFieldsAnnotation()
protected void | removeClosedBranches(List<OptimizerNode.UnclosedBranchDescriptor> openList)
void | setBroadcastInputs(Map<org.apache.flink.api.common.operators.Operator<?>,OptimizerNode> operatorToNode, org.apache.flink.api.common.ExecutionMode defaultExchangeMode) | This function connects the operators that produce the broadcast inputs to this operator.
void | setEstimatedNumRecords(long estimatedNumRecords)
void | setEstimatedOutputSize(long estimatedOutputSize)
abstract void | setInput(Map<org.apache.flink.api.common.operators.Operator<?>,OptimizerNode> operatorToNode, org.apache.flink.api.common.ExecutionMode defaultExchangeMode) | This function connects the predecessors to this operator.
void | setParallelism(int parallelism) | Sets the parallelism for this optimizer node.
String | toString()
-
-
-
Field Detail
-
MAX_DYNAMIC_PATH_COST_WEIGHT
public static final int MAX_DYNAMIC_PATH_COST_WEIGHT
- See Also:
- Constant Field Values
-
openBranches
protected List<OptimizerNode.UnclosedBranchDescriptor> openBranches
-
closedBranchingNodes
protected Set<OptimizerNode> closedBranchingNodes
-
hereJoinedBranches
protected List<OptimizerNode> hereJoinedBranches
-
estimatedOutputSize
protected long estimatedOutputSize
-
estimatedNumRecords
protected long estimatedNumRecords
-
uniqueFields
protected Set<org.apache.flink.api.common.operators.util.FieldSet> uniqueFields
-
id
protected int id
-
costWeight
protected int costWeight
-
onDynamicPath
protected boolean onDynamicPath
-
-
Constructor Detail
-
OptimizerNode
public OptimizerNode(org.apache.flink.api.common.operators.Operator<?> op)
Creates a new optimizer node that represents the given program operator.
- Parameters:
op - The operator that the node represents.
-
OptimizerNode
protected OptimizerNode(OptimizerNode toCopy)
-
-
Method Detail
-
getOperatorName
public abstract String getOperatorName()
Gets the name of this node, which is the name of the function/operator, or data source / data sink.
- Returns:
- The node name.
-
setInput
public abstract void setInput(Map<org.apache.flink.api.common.operators.Operator<?>,OptimizerNode> operatorToNode, org.apache.flink.api.common.ExecutionMode defaultExchangeMode)
This function connects the predecessors to this operator.
- Parameters:
operatorToNode - The map from program operators to optimizer nodes.
defaultExchangeMode - The data exchange mode to use, if the operator does not specify one.
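To make the wiring concrete, here is a minimal sketch of what connecting predecessors can look like. It is written against hypothetical stand-in classes (Op, Connection, Node) instead of Flink's Operator, DagConnection, and OptimizerNode, and the exchange mode is reduced to a string that an operator may leave unspecified (null). It is not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class SetInputSketch {
    // Hypothetical stand-in for a program operator and its input operators.
    static class Op {
        final String name;
        final List<Op> inputs = new ArrayList<>();
        String exchangeMode; // null: the operator does not specify a mode

        Op(String name) { this.name = name; }
    }

    // Hypothetical stand-in for a DagConnection between two nodes.
    static class Connection {
        final Node source;
        final Node target;
        final String exchangeMode;

        Connection(Node source, Node target, String exchangeMode) {
            this.source = source;
            this.target = target;
            this.exchangeMode = exchangeMode;
        }
    }

    // Hypothetical stand-in for an optimizer node.
    static class Node {
        final Op operator;
        final List<Connection> incoming = new ArrayList<>();
        final List<Connection> outgoing = new ArrayList<>();

        Node(Op operator) { this.operator = operator; }

        // Looks up the node of every input operator and wires a connection to this node.
        void setInput(Map<Op, Node> operatorToNode, String defaultExchangeMode) {
            for (Op input : operator.inputs) {
                Node pred = operatorToNode.get(input);
                if (pred == null) {
                    throw new IllegalStateException("No optimizer node for operator " + input.name);
                }
                // Use the default mode only if the operator does not specify one.
                String mode = operator.exchangeMode != null ? operator.exchangeMode : defaultExchangeMode;
                Connection conn = new Connection(pred, this, mode);
                incoming.add(conn);      // this node's input side
                pred.outgoing.add(conn); // the predecessor's output side
            }
        }
    }
}
```

Registering the same connection object on both ends keeps the incoming and outgoing views of the DAG consistent.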
-
setBroadcastInputs
public void setBroadcastInputs(Map<org.apache.flink.api.common.operators.Operator<?>,OptimizerNode> operatorToNode, org.apache.flink.api.common.ExecutionMode defaultExchangeMode)
This function connects the operators that produce the broadcast inputs to this operator.
- Parameters:
operatorToNode - The map from program operators to optimizer nodes.
defaultExchangeMode - The data exchange mode to use, if the operator does not specify one.
- Throws:
CompilerException
-
getIncomingConnections
public abstract List<DagConnection> getIncomingConnections()
Gets all incoming connections of this node. This method needs to be overridden by subclasses to return the children.
- Returns:
- The list of incoming connections.
-
computeInterestingPropertiesForInputs
public abstract void computeInterestingPropertiesForInputs(CostEstimator estimator)
Tells the node to compute the interesting properties for its inputs. The interesting properties for the node itself must have been computed before. The node must then determine which of these interesting properties it preserves and add its own.
- Parameters:
estimator - The CostEstimator instance to use for plan cost estimation.
-
computeUnclosedBranchStack
public abstract void computeUnclosedBranchStack()
This method causes the node to compute the description of open branches in its sub-plan. An open branch indicates that a (transitive) child node has multiple outputs that have not all been re-joined in the sub-plan. This method needs to set the openBranches field to a stack of unclosed branches, with the latest one on top. A branch is considered closed if some later node sees all of the branching node's outputs, regardless of whether there have been further branches to different paths in the meantime.
-
computeUnclosedBranchStackForBroadcastInputs
protected List<OptimizerNode.UnclosedBranchDescriptor> computeUnclosedBranchStackForBroadcastInputs(List<OptimizerNode.UnclosedBranchDescriptor> branchesSoFar)
-
getAlternativePlans
public abstract List<PlanNode> getAlternativePlans(CostEstimator estimator)
Computes the plan alternatives for this node, and implicitly for all nodes that are children of this node. This method must determine the global and local properties and the costs of each alternative. It may recursively call getAlternativePlans() on its children to get their plan alternatives and build its own alternatives on top of those.
- Parameters:
estimator - The cost estimator used to estimate the costs of each plan alternative.
- Returns:
- A list containing all plan alternatives.
-
accept
public abstract void accept(org.apache.flink.util.Visitor<OptimizerNode> visitor)
This method implements the visit of a depth-first graph traversing visitor. Implementers must first call the preVisit() method, then hand the visitor to their children, and finally call the postVisit() method.
- Specified by:
accept in interface org.apache.flink.util.Visitable<OptimizerNode>
- Parameters:
visitor - The graph traversing visitor.
- See Also:
Visitable.accept(org.apache.flink.util.Visitor)
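The preVisit / children / postVisit contract can be sketched as follows. The Visitor interface and Node class here are hypothetical stand-ins that mirror the shape of org.apache.flink.util.Visitor (a boolean preVisit and a void postVisit); in this sketch, returning false from preVisit skips both the children and postVisit.

```java
import java.util.ArrayList;
import java.util.List;

class VisitorSketch {
    // Hypothetical visitor shape: preVisit decides whether to descend.
    interface Visitor<T> {
        boolean preVisit(T node);  // return false to skip this node's children and postVisit
        void postVisit(T node);
    }

    static class Node {
        final String name;
        final List<Node> inputs = new ArrayList<>();

        Node(String name, Node... inputs) {
            this.name = name;
            for (Node input : inputs) {
                this.inputs.add(input);
            }
        }

        // Depth-first traversal: preVisit, then all children, then postVisit.
        void accept(Visitor<Node> visitor) {
            if (visitor.preVisit(this)) {
                for (Node input : inputs) {
                    input.accept(visitor);
                }
                visitor.postVisit(this);
            }
        }
    }
}
```

Because the optimizer graph is a DAG, a node with several successors is reached once per successor; a visitor that should see each node only once can remember visited nodes and return false from preVisit for repeats.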
-
getSemanticProperties
public abstract org.apache.flink.api.common.operators.SemanticProperties getSemanticProperties()
-
getPredecessors
public Iterable<OptimizerNode> getPredecessors()
Description copied from interface: DumpableNode
Gets an iterator over the predecessors.
- Specified by:
getPredecessors in interface DumpableNode<OptimizerNode>
- Returns:
- An iterator over the predecessors.
-
getId
public int getId()
Gets the ID of this node. If the ID has not yet been set, this method returns -1.
- Returns:
- This node's ID, or -1 if not yet set.
-
initId
public void initId(int id)
Sets the ID of this node.
- Parameters:
id - The ID for this node.
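A minimal sketch of the -1 sentinel used by getId and initId. The class name is hypothetical, and the checks that reject non-positive IDs and a second initialization are assumptions made for illustration, not documented behavior.

```java
class NodeIdSketch {
    private int id = -1; // -1 means "not yet assigned"

    int getId() {
        return id;
    }

    // Assigns the ID once; the two checks below are assumptions of this sketch.
    void initId(int id) {
        if (id <= 0) {
            throw new IllegalArgumentException("ID must be positive: " + id);
        }
        if (this.id != -1) {
            throw new IllegalStateException("ID has already been initialized.");
        }
        this.id = id;
    }
}
```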
-
addBroadcastConnection
public void addBroadcastConnection(String name, DagConnection broadcastConnection)
Adds the broadcast connection identified by the given name to this node.
- Parameters:
name - The name that identifies the broadcast connection.
broadcastConnection - The connection to add.
-
getBroadcastConnectionNames
public List<String> getBroadcastConnectionNames()
Returns the list of names associated with broadcast inputs for this node.
-
getBroadcastConnections
public List<DagConnection> getBroadcastConnections()
Returns the list of inputs associated with broadcast variables for this node.
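One simple way to keep each broadcast connection associated with its name is a pair of parallel lists, sketched below with Object standing in for DagConnection. The class name and the unmodifiable views returned by the getters are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class BroadcastInputsSketch {
    // Parallel lists: names.get(i) identifies connections.get(i).
    private final List<String> names = new ArrayList<>();
    private final List<Object> connections = new ArrayList<>(); // stand-in for DagConnection

    void addBroadcastConnection(String name, Object connection) {
        if (name == null || connection == null) {
            throw new NullPointerException("name and connection must be non-null");
        }
        names.add(name);
        connections.add(connection);
    }

    List<String> getBroadcastConnectionNames() {
        return Collections.unmodifiableList(names);
    }

    List<Object> getBroadcastConnections() {
        return Collections.unmodifiableList(connections);
    }
}
```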
-
addOutgoingConnection
public void addOutgoingConnection(DagConnection connection)
Adds a new outgoing connection to this node.
- Parameters:
connection - The connection to add.
-
getOutgoingConnections
public List<DagConnection> getOutgoingConnections()
Gets the list of outgoing connections from this node to succeeding tasks.
- Returns:
- The list of outgoing connections.
-
getOperator
public org.apache.flink.api.common.operators.Operator<?> getOperator()
Gets the operator represented by this optimizer node.
- Returns:
- This node's operator.
-
getParallelism
public int getParallelism()
Gets the parallelism for the operator represented by this optimizer node. The parallelism denotes how many parallel instances of the operator will be spawned during the execution. If this value is ExecutionConfig.PARALLELISM_DEFAULT, the system uses the default number of parallel instances.
- Returns:
- The parallelism of the operator.
-
setParallelism
public void setParallelism(int parallelism)
Sets the parallelism for this optimizer node. The parallelism denotes how many parallel instances of the operator will be spawned during the execution.
- Parameters:
parallelism - The parallelism to set. If this value is ExecutionConfig.PARALLELISM_DEFAULT, the system uses the default number of parallel instances.
- Throws:
IllegalArgumentException - If the parallelism is smaller than one and is not ExecutionConfig.PARALLELISM_DEFAULT.
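A sketch of the validation described for setParallelism: values below one are rejected unless they equal the "use the default" marker. The class is hypothetical, and the constant only mirrors ExecutionConfig.PARALLELISM_DEFAULT; its value of -1 is an assumption of this sketch.

```java
class ParallelismSketch {
    // Assumption: mirrors ExecutionConfig.PARALLELISM_DEFAULT.
    static final int PARALLELISM_DEFAULT = -1;

    private int parallelism = PARALLELISM_DEFAULT;

    int getParallelism() {
        return parallelism;
    }

    void setParallelism(int parallelism) {
        // Reject values below one, except the marker for "system default".
        if (parallelism < 1 && parallelism != PARALLELISM_DEFAULT) {
            throw new IllegalArgumentException("Parallelism of " + parallelism + " is invalid.");
        }
        this.parallelism = parallelism;
    }
}
```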
-
getMinimalMemoryAcrossAllSubTasks
public long getMinimalMemoryAcrossAllSubTasks()
Gets the amount of memory that all subtasks of this task have jointly available.
- Returns:
- The total amount of memory across all subtasks.
-
isOnDynamicPath
public boolean isOnDynamicPath()
-
identifyDynamicPath
public void identifyDynamicPath(int costWeight)
-
getCostWeight
public int getCostWeight()
-
getMaxDepth
public int getMaxDepth()
-
getInterestingProperties
public InterestingProperties getInterestingProperties()
Gets the properties that are interesting for this node to produce.
- Returns:
- The interesting properties for this node, or null, if not yet computed.
-
getEstimatedOutputSize
public long getEstimatedOutputSize()
Description copied from interface: EstimateProvider
Gets the estimated output size from this node.
- Specified by:
getEstimatedOutputSize in interface EstimateProvider
- Returns:
- The estimated output size.
-
getEstimatedNumRecords
public long getEstimatedNumRecords()
Description copied from interface: EstimateProvider
Gets the estimated number of records in the output of this node.
- Specified by:
getEstimatedNumRecords in interface EstimateProvider
- Returns:
- The estimated number of records.
-
setEstimatedOutputSize
public void setEstimatedOutputSize(long estimatedOutputSize)
-
setEstimatedNumRecords
public void setEstimatedNumRecords(long estimatedNumRecords)
-
getEstimatedAvgWidthPerOutputRecord
public float getEstimatedAvgWidthPerOutputRecord()
Description copied from interface: EstimateProvider
Gets the estimated number of bytes per record.
- Specified by:
getEstimatedAvgWidthPerOutputRecord in interface EstimateProvider
- Returns:
- The estimated number of bytes per record.
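The bytes-per-record estimate can be derived from the two other estimates. This sketch assumes that a non-positive value marks an unknown estimate and returns -1 when the width cannot be derived; the class name is hypothetical.

```java
class EstimateSketch {
    // Non-positive values mean "unknown" in this sketch.
    long estimatedOutputSize = -1;
    long estimatedNumRecords = -1;

    // Average bytes per record, or -1 if either estimate is unknown.
    float getEstimatedAvgWidthPerOutputRecord() {
        if (estimatedOutputSize > 0 && estimatedNumRecords > 0) {
            return (float) estimatedOutputSize / estimatedNumRecords;
        }
        return -1.0f;
    }
}
```

For example, an estimated output of 1000 bytes over 250 records yields a width of 4 bytes per record.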
-
isBranching
public boolean isBranching()
Checks whether this node has branching output. A node's output is branched if it has more than one outgoing connection.
- Returns:
- True if the node's output branches, false otherwise.
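The branching rule amounts to a size check on the outgoing connections. In this hypothetical sketch, Object stands in for DagConnection.

```java
import java.util.ArrayList;
import java.util.List;

class BranchingSketch {
    final List<Object> outgoingConnections = new ArrayList<>(); // stand-in for DagConnection

    // Branched output means more than one outgoing connection.
    boolean isBranching() {
        return outgoingConnections.size() > 1;
    }
}
```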
-
markAllOutgoingConnectionsAsPipelineBreaking
public void markAllOutgoingConnectionsAsPipelineBreaking()
-
haveAllOutputConnectionInterestingProperties
public boolean haveAllOutputConnectionInterestingProperties()
Checks if all outgoing connections have their interesting properties set by their target nodes.
- Returns:
- True if the interesting properties are set on all outgoing connections, false otherwise.
-
computeUnionOfInterestingPropertiesFromSuccessors
public void computeUnionOfInterestingPropertiesFromSuccessors()
Computes all the interesting properties that are relevant to this node. The interesting properties are the union of the interesting properties on each outgoing connection. If interesting properties on different outgoing connections overlap, they occur only once in this set; to achieve this, the method deduplicates and merges them. The method works on copies of the original interesting-properties objects and leaves the originals, held by the connections, unchanged.
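A sketch of the union step, together with the "all connections set" check from haveAllOutputConnectionInterestingProperties. It assumes a hypothetical value-equal Property class and connections that hold null until their target node has set the interesting properties. Only exact duplicates are collapsed here; merging of overlapping but unequal properties is not modeled.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

class InterestingPropsSketch {
    // Hypothetical mutable property; value equality lets a Set collapse duplicates.
    static class Property {
        String partitioningKey;

        Property(String partitioningKey) { this.partitioningKey = partitioningKey; }

        Property copy() { return new Property(partitioningKey); }

        @Override
        public boolean equals(Object o) {
            return o instanceof Property && Objects.equals(((Property) o).partitioningKey, partitioningKey);
        }

        @Override
        public int hashCode() { return Objects.hashCode(partitioningKey); }
    }

    // Hypothetical outgoing connection; null means "not yet set by the target node".
    static class Connection {
        Set<Property> interestingProperties;

        Connection(Set<Property> props) { this.interestingProperties = props; }
    }

    static boolean haveAllOutputConnectionInterestingProperties(List<Connection> outgoing) {
        for (Connection c : outgoing) {
            if (c.interestingProperties == null) {
                return false;
            }
        }
        return true;
    }

    // Union over all outgoing connections, built from copies so the originals stay unchanged.
    static Set<Property> unionFromSuccessors(List<Connection> outgoing) {
        Set<Property> union = new LinkedHashSet<>();
        for (Connection c : outgoing) {
            for (Property p : c.interestingProperties) {
                union.add(p.copy()); // an equal property already in the set is not added again
            }
        }
        return union;
    }
}
```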
-
clearInterestingProperties
public void clearInterestingProperties()
-
computeOutputEstimates
public void computeOutputEstimates(DataStatistics statistics)
Causes this node to compute its output estimates (such as number of rows and size in bytes) based on the inputs and the compiler hints. The compiler hints are instantiated with conservative default values, which are used if no other values are provided. Nodes may access the statistics to determine relevant information.
- Parameters:
statistics - The statistics object which may be accessed to get statistical information. The parameter may be null if no statistics are available.
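A rough sketch of the "defaults first, explicit hints override" order described above. The Hints fields, the pass-through default estimate, and the use of plain input counts in place of a DataStatistics object are all hypothetical simplifications, not Flink's compiler-hint API.

```java
class OutputEstimateSketch {
    // Hypothetical compiler hints; a negative value means "no hint given".
    static class Hints {
        long outputCardinality = -1;
        float avgRecordWidth = -1f;
    }

    long estimatedNumRecords = -1; // -1 = unknown
    long estimatedOutputSize = -1; // -1 = unknown

    // Operator-specific default (assumption): output as much as the input.
    void computeDefaultEstimates(long inputNumRecords, long inputSize) {
        estimatedNumRecords = inputNumRecords;
        estimatedOutputSize = inputSize;
    }

    // Defaults first, then hints override them where present.
    void computeOutputEstimates(long inputNumRecords, long inputSize, Hints hints) {
        computeDefaultEstimates(inputNumRecords, inputSize);
        if (hints.outputCardinality >= 0) {
            estimatedNumRecords = hints.outputCardinality;
        }
        if (hints.avgRecordWidth > 0 && estimatedNumRecords >= 0) {
            estimatedOutputSize = (long) (estimatedNumRecords * hints.avgRecordWidth);
        }
    }
}
```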
-
computeOperatorSpecificDefaultEstimates
protected abstract void computeOperatorSpecificDefaultEstimates(DataStatistics statistics)
-
readStubAnnotations
protected void readStubAnnotations()
Reads all stub annotations, i.e., which fields remain constant, what cardinality bounds the functions have, and which fields remain unique.
-
readUniqueFieldsAnnotation
protected void readUniqueFieldsAnnotation()
-
getUniqueFields
public Set<org.apache.flink.api.common.operators.util.FieldSet> getUniqueFields()
Gets the FieldSets which are unique in the output of the node.
-
prunePlanAlternativesWithCommonBranching
protected void prunePlanAlternativesWithCommonBranching(List<PlanNode> plans)
-
hasUnclosedBranches
public boolean hasUnclosedBranches()
-
getClosedBranchingNodes
public Set<OptimizerNode> getClosedBranchingNodes()
-
getOpenBranches
public List<OptimizerNode.UnclosedBranchDescriptor> getOpenBranches()
-
getBranchesForParent
protected List<OptimizerNode.UnclosedBranchDescriptor> getBranchesForParent(DagConnection toParent)
-
removeClosedBranches
protected void removeClosedBranches(List<OptimizerNode.UnclosedBranchDescriptor> openList)
-
addClosedBranches
protected void addClosedBranches(Set<OptimizerNode> alreadyClosed)
-
addClosedBranch
protected void addClosedBranch(OptimizerNode alreadyClosed)
-
areBranchCompatible
protected boolean areBranchCompatible(PlanNode plan1, PlanNode plan2)
Checks whether two candidate plans for the sub-plan of this node are comparable. The two alternative plans are comparable if
a) there is no branch in the sub-plan of this node, or
b) both candidates have the same candidate as the child at the last open branch.
- Parameters:
plan1 - The root node of the first candidate plan.
plan2 - The root node of the second candidate plan.
- Returns:
- True if the nodes are branch compatible in the inputs.
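A sketch of the two comparability conditions. It assumes each hypothetical Candidate records which sub-plan candidate it chose at every branching node (keyed by node ID), and that the open branches are given as an ascending list of IDs, so the last entry is the latest open branch.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;

class BranchCompatSketch {
    // Hypothetical candidate plan: chosen sub-plan candidate per branching node ID.
    static class Candidate {
        final Map<Integer, String> choiceAtBranch;

        Candidate(Map<Integer, String> choiceAtBranch) { this.choiceAtBranch = choiceAtBranch; }
    }

    // openBranchIds is sorted ascending; its last entry is the latest open branch.
    static boolean areBranchCompatible(List<Integer> openBranchIds, Candidate p1, Candidate p2) {
        if (openBranchIds.isEmpty()) {
            return true; // case a): no branch in the sub-plan
        }
        int last = openBranchIds.get(openBranchIds.size() - 1);
        // case b): both plans chose the same candidate at the last open branch
        return Objects.equals(p1.choiceAtBranch.get(last), p2.choiceAtBranch.get(last));
    }
}
```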
-
mergeLists
protected final boolean mergeLists(List<OptimizerNode.UnclosedBranchDescriptor> child1open, List<OptimizerNode.UnclosedBranchDescriptor> child2open, List<OptimizerNode.UnclosedBranchDescriptor> result, boolean markJoinedBranchesAsPipelineBreaking)
The node IDs are assigned in graph-traversal order (pre-order); hence, each list is sorted by ID in ascending order, and all consecutive lists start with IDs in ascending order.
- Parameters:
markJoinedBranchesAsPipelineBreaking - True, if the
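A simplified sketch of merging two ID-sorted open-branch lists. It assumes a hypothetical OpenBranch that holds the branching node's ID, its number of outputs, and a bitmask of the outputs that reach the current path. When both inputs carry the same branching node, the masks are combined; if every output is covered, the branch is closed and dropped from the result. Marking joined connections as pipeline breaking is omitted.

```java
import java.util.List;

class MergeListsSketch {
    // Hypothetical open-branch descriptor.
    static class OpenBranch {
        final int nodeId;
        final int numOutputs;
        final long joinedPaths; // bit i set: output i of the branching node reaches this path

        OpenBranch(int nodeId, int numOutputs, long joinedPaths) {
            this.nodeId = nodeId;
            this.numOutputs = numOutputs;
            this.joinedPaths = joinedPaths;
        }
    }

    // Merges two ID-sorted lists into result (also ID-sorted); returns true if a branch was closed.
    static boolean mergeLists(List<OpenBranch> a, List<OpenBranch> b, List<OpenBranch> result) {
        boolean closedSome = false;
        int i = 0;
        int j = 0;
        while (i < a.size() || j < b.size()) {
            if (j >= b.size() || (i < a.size() && a.get(i).nodeId < b.get(j).nodeId)) {
                result.add(a.get(i++)); // only in the first list
            } else if (i >= a.size() || b.get(j).nodeId < a.get(i).nodeId) {
                result.add(b.get(j++)); // only in the second list
            } else {
                // Same branching node seen from both inputs: combine the joined paths.
                OpenBranch x = a.get(i++);
                OpenBranch y = b.get(j++);
                long combined = x.joinedPaths | y.joinedPaths;
                long allOutputs = (1L << x.numOutputs) - 1;
                if (combined == allOutputs) {
                    closedSome = true; // every output re-joined: the branch is closed
                } else {
                    result.add(new OpenBranch(x.nodeId, x.numOutputs, combined));
                }
            }
        }
        return closedSome;
    }
}
```

The single forward pass relies on the ascending-ID ordering stated above; without it, matching entries for the same branching node could be missed.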
-
getOptimizerNode
public OptimizerNode getOptimizerNode()
- Specified by:
getOptimizerNode in interface DumpableNode<OptimizerNode>
-
getPlanNode
public PlanNode getPlanNode()
- Specified by:
getPlanNode in interface DumpableNode<OptimizerNode>
-
getDumpableInputs
public Iterable<DumpableConnection<OptimizerNode>> getDumpableInputs()
- Specified by:
getDumpableInputs in interface DumpableNode<OptimizerNode>
-
-