Class StreamTask<OUT,OP extends StreamOperator<OUT>>
- java.lang.Object
  - org.apache.flink.streaming.runtime.tasks.StreamTask<OUT,OP>
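- Type Parameters:
OUT - the type of the output elements of the main operator
OP - the type of the main (head) operator of this task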
- All Implemented Interfaces:
org.apache.flink.runtime.jobgraph.tasks.CheckpointableTask, org.apache.flink.runtime.jobgraph.tasks.CoordinatedTask, org.apache.flink.runtime.jobgraph.tasks.TaskInvokable, org.apache.flink.runtime.taskmanager.AsyncExceptionHandler, ContainingTaskDetails
- Direct Known Subclasses:
AbstractTwoInputStreamTask, MultipleInputStreamTask, OneInputStreamTask, SourceOperatorStreamTask, SourceStreamTask
@Internal public abstract class StreamTask<OUT,OP extends StreamOperator<OUT>> extends Object implements org.apache.flink.runtime.jobgraph.tasks.TaskInvokable, org.apache.flink.runtime.jobgraph.tasks.CheckpointableTask, org.apache.flink.runtime.jobgraph.tasks.CoordinatedTask, org.apache.flink.runtime.taskmanager.AsyncExceptionHandler, ContainingTaskDetails
Base class for all streaming tasks. A task is the unit of local processing that is deployed and executed by the TaskManagers. Each task runs one or more StreamOperators, which form the task's operator chain. Operators that are chained together execute synchronously in the same thread and hence on the same stream partition. A common case for these chains is successive map/flatmap/filter tasks.
The task chain contains one "head" operator and multiple chained operators. The StreamTask is specialized for the type of the head operator: one-input and two-input tasks, as well as sources, iteration heads and iteration tails.
The Task class deals with the setup of the streams read by the head operator, and the streams produced by the operators at the ends of the operator chain. Note that the chain may fork and thus have multiple ends.
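Why chained operators share one thread can be illustrated with a minimal, self-contained sketch. It deliberately does not use Flink's OperatorChain or StreamOperator API: the class ChainSketch and its map/filter helpers are hypothetical. Each chained step calls the next one directly, so a record travels through the whole chain on the calling thread, with no queue or thread hand-off in between.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical sketch of operator chaining; NOT Flink's OperatorChain API.
// Each chained "operator" is a Consumer that forwards directly to the next one.
class ChainSketch {

    // Puts a map function in front of a downstream consumer.
    static <A, B> Consumer<A> map(Function<A, B> fn, Consumer<B> next) {
        return record -> next.accept(fn.apply(record));
    }

    // Puts a filter in front of a downstream consumer.
    static <A> Consumer<A> filter(Predicate<A> pred, Consumer<A> next) {
        return record -> {
            if (pred.test(record)) {
                next.accept(record);
            }
        };
    }

    // Builds head (map) -> filter -> sink and pushes every input through it.
    static List<Integer> run(List<String> inputs) {
        List<Integer> sink = new ArrayList<>();
        Consumer<Integer> filtered = filter(len -> len > 2, sink::add);
        Consumer<String> head = map(String::length, filtered);
        for (String in : inputs) {
            head.accept(in); // synchronous call through the entire chain
        }
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a", "flink", "ok", "task")));
    }
}
```

Because `head.accept` only returns after the sink has (or has not) received the record, the whole chain behaves like one function call per record.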
The life cycle of the task is set up as follows:
-- setInitialState -> provides state of all operators in the chain

-- invoke()
      |
      +----> Create basic utils (config, etc) and load the chain of operators
      +----> operators.setup()
      +----> task specific init()
      +----> initialize-operator-states()
      +----> open-operators()
      +----> run()
      +----> finish-operators()
      +----> close-operators()
      +----> common cleanup
      +----> task specific cleanup()

The StreamTask has a lock object called lock. All calls to methods on a StreamOperator must be synchronized on this lock object to ensure that no methods are called concurrently.
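The life cycle above follows the template-method pattern: invoke() fixes the order of the steps, and subclasses supply the task-specific ones. The sketch below is an illustration only, not StreamTask's implementation; the names LifecycleTaskSketch and NoOpTask are hypothetical, and the steps merely record their names. Every step that stands in for an operator call is executed while holding lock, mirroring the locking rule.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the documented invoke() ordering; not Flink code.
abstract class LifecycleTaskSketch {
    // Every call that stands in for an operator method holds this lock.
    protected final Object lock = new Object();
    protected final List<String> trace = new ArrayList<>();

    protected abstract void init();    // task-specific init
    protected abstract void run();     // task-specific processing
    protected abstract void cleanup(); // task-specific cleanup

    private void operatorCall(String step) {
        synchronized (lock) {
            trace.add(step);
        }
    }

    // Template method: the order is fixed, subclasses fill in the hooks.
    public final List<String> invoke() {
        trace.add("create utils and load operator chain");
        operatorCall("operators.setup");
        init();
        operatorCall("initialize operator states");
        operatorCall("open operators");
        try {
            run();
            operatorCall("finish operators");
        } finally {
            // Closing and cleanup also run if run() throws (a choice of this sketch).
            operatorCall("close operators");
            trace.add("common cleanup");
            cleanup();
        }
        return trace;
    }
}

class NoOpTask extends LifecycleTaskSketch {
    @Override protected void init() { trace.add("init"); }
    @Override protected void run() { trace.add("run"); }
    @Override protected void cleanup() { trace.add("task cleanup"); }

    public static void main(String[] args) {
        new NoOpTask().invoke().forEach(System.out::println);
    }
}
```

Making invoke() final keeps subclasses from reordering the phases; they can only plug into init(), run() and cleanup().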
-
Nested Class Summary
- static interface StreamTask.CanEmitBatchOfRecordsChecker: Check whether records can be emitted in batch.
-
Field Summary
- protected org.apache.flink.runtime.state.CheckpointStorage checkpointStorage: Our checkpoint storage.
- protected StreamConfig configuration: The configuration of this streaming task.
- protected StreamInputProcessor inputProcessor: The input processor.
- protected static org.slf4j.Logger LOG: The logger used by the StreamTask and its subclasses.
- protected MailboxProcessor mailboxProcessor
- protected OP mainOperator: The main operator that consumes the input streams of this task.
- protected OperatorChain<OUT,OP> operatorChain: The chain of operators executed by this task.
- protected org.apache.flink.runtime.io.network.api.writer.RecordWriterDelegate<org.apache.flink.runtime.plugable.SerializationDelegate<StreamRecord<OUT>>> recordWriter
- protected org.apache.flink.runtime.state.StateBackend stateBackend: Our state backend.
- protected TimerService systemTimerService: In contrast to timerService, we should not register any user timers here.
- protected TimerService timerService: The internal TimerService used to define the current processing time (default = System.currentTimeMillis()) and to register timers for tasks to be executed in the future.
- static ThreadGroup TRIGGER_THREAD_GROUP: The thread group that holds all trigger timer threads.
-
Constructor Summary
- protected StreamTask(org.apache.flink.runtime.execution.Environment env): Constructor for initialization, possibly with initial state (recovery / savepoint / etc).
- protected StreamTask(org.apache.flink.runtime.execution.Environment env, TimerService timerService): Constructor for initialization, possibly with initial state (recovery / savepoint / etc).
- protected StreamTask(org.apache.flink.runtime.execution.Environment environment, TimerService timerService, Thread.UncaughtExceptionHandler uncaughtExceptionHandler)
- protected StreamTask(org.apache.flink.runtime.execution.Environment environment, TimerService timerService, Thread.UncaughtExceptionHandler uncaughtExceptionHandler, StreamTaskActionExecutor actionExecutor): Constructor for initialization, possibly with initial state (recovery / savepoint / etc).
- protected StreamTask(org.apache.flink.runtime.execution.Environment environment, TimerService timerService, Thread.UncaughtExceptionHandler uncaughtExceptionHandler, StreamTaskActionExecutor actionExecutor, TaskMailbox mailbox)
-
Method Summary
- void abortCheckpointOnBarrier(long checkpointId, org.apache.flink.runtime.checkpoint.CheckpointException cause)
- protected void advanceToEndOfEventTime(): Emits the MAX_WATERMARK so that all registered timers are fired.
- protected void afterInvoke()
- void cancel()
- protected void cancelTask()
- void cleanUp(Throwable throwable)
- protected void cleanUpInternal()
- static <OUT> org.apache.flink.runtime.io.network.api.writer.RecordWriterDelegate<org.apache.flink.runtime.plugable.SerializationDelegate<StreamRecord<OUT>>> createRecordWriterDelegate(StreamConfig configuration, org.apache.flink.runtime.execution.Environment environment)
- StreamTaskStateInitializer createStreamTaskStateInitializer(org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder initializationMetrics)
- protected void declineCheckpoint(long checkpointId)
- void dispatchOperatorEvent(org.apache.flink.runtime.jobgraph.OperatorID operator, org.apache.flink.util.SerializedValue<org.apache.flink.runtime.operators.coordination.OperatorEvent> event)
- protected void endData(org.apache.flink.runtime.io.network.api.StopMode mode)
- protected void finalize(): The finalize method shuts down the timer.
- protected long getAsyncCheckpointStartDelayNanos()
- ExecutorService getAsyncOperationsThreadPool()
- org.apache.flink.core.fs.CloseableRegistry getCancelables()
- StreamTask.CanEmitBatchOfRecordsChecker getCanEmitBatchOfRecords()
- protected Optional<CheckpointBarrierHandler> getCheckpointBarrierHandler(): Acquires the optional CheckpointBarrierHandler associated with this stream task.
- org.apache.flink.runtime.state.CheckpointStorageWorkerView getCheckpointStorage()
- protected CompletableFuture<Void> getCompletionFuture()
- StreamConfig getConfiguration()
- org.apache.flink.runtime.execution.Environment getEnvironment()
- MailboxExecutorFactory getMailboxExecutorFactory()
- String getName(): Gets the name of the task, in the form "taskname (2/5)".
- ProcessingTimeServiceFactory getProcessingTimeServiceFactory()
- void handleAsyncException(String message, Throwable exception): Handles an exception thrown by a thread other than the one executing the main task (e.g. a TriggerTask) by failing the task entirely.
- protected abstract void init()
- void invoke()
- boolean isCanceled()
- boolean isFailing()
- boolean isMailboxLoopRunning()
- boolean isRunning()
- boolean isUsingNonBlockingInput()
- void maybeInterruptOnCancel(Thread toInterrupt, String taskName, Long timeout)
- Future<Void> notifyCheckpointAbortAsync(long checkpointId, long latestCompletedCheckpointId)
- Future<Void> notifyCheckpointCompleteAsync(long checkpointId)
- Future<Void> notifyCheckpointSubsumedAsync(long checkpointId)
- protected void notifyEndOfData()
- protected void processInput(MailboxDefaultAction.Controller controller): This method implements the default action of the task (e.g. processing one event from the input).
- void restore()
- void runMailboxLoop()
- boolean runMailboxStep()
- protected void setSynchronousSavepoint(long checkpointId)
- protected org.apache.flink.metrics.Counter setupNumRecordsInCounter(StreamOperator streamOperator)
- String toString()
- CompletableFuture<Boolean> triggerCheckpointAsync(org.apache.flink.runtime.checkpoint.CheckpointMetaData checkpointMetaData, org.apache.flink.runtime.checkpoint.CheckpointOptions checkpointOptions)
- void triggerCheckpointOnBarrier(org.apache.flink.runtime.checkpoint.CheckpointMetaData checkpointMetaData, org.apache.flink.runtime.checkpoint.CheckpointOptions checkpointOptions, org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder checkpointMetrics)
Methods inherited from class java.lang.Object
clone, equals, getClass, hashCode, notify, notifyAll, wait, wait, wait
-
Methods inherited from interface org.apache.flink.streaming.runtime.tasks.ContainingTaskDetails
getExecutionConfig, getIndexInSubtaskGroup, getJobConfiguration, getUserCodeClassLoader
-
Field Detail
-
TRIGGER_THREAD_GROUP
public static final ThreadGroup TRIGGER_THREAD_GROUP
The thread group that holds all trigger timer threads.
-
LOG
protected static final org.slf4j.Logger LOG
The logger used by the StreamTask and its subclasses.
-
inputProcessor
@Nullable protected StreamInputProcessor inputProcessor
The input processor. Initialized in the init() method.
-
mainOperator
protected OP mainOperator
The main operator that consumes the input streams of this task.
-
operatorChain
protected OperatorChain<OUT,OP> operatorChain
The chain of operators executed by this task.
-
configuration
protected final StreamConfig configuration
The configuration of this streaming task.
-
stateBackend
protected final org.apache.flink.runtime.state.StateBackend stateBackend
Our state backend. We use this to create a keyed state backend.
-
checkpointStorage
protected final org.apache.flink.runtime.state.CheckpointStorage checkpointStorage
Our checkpoint storage. We use this to create checkpoint streams.
-
timerService
protected final TimerService timerService
The internal TimerService used to define the current processing time (default = System.currentTimeMillis()) and to register timers for tasks to be executed in the future.
-
systemTimerService
protected final TimerService systemTimerService
In contrast to timerService, we should not register any user timers here. It should be used only for system-level timers.
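The role of a processing-time timer service (current time from System.currentTimeMillis(), plus timers that fire once that time is reached) can be sketched with a ScheduledExecutorService. SimpleTimerService and its methods are hypothetical and are not Flink's TimerService interface; the demo keeps user timers and system timers on two separate instances, in the spirit of the timerService / systemTimerService split.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;

// Hypothetical processing-time timer service; not Flink's TimerService.
class SimpleTimerService implements AutoCloseable {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    // Default notion of processing time, as described for timerService.
    long getCurrentProcessingTime() {
        return System.currentTimeMillis();
    }

    // Calls back with the timer's timestamp once wall-clock time reaches it;
    // timestamps in the past fire as soon as possible.
    ScheduledFuture<?> registerTimer(long timestamp, LongConsumer callback) {
        long delay = Math.max(0, timestamp - getCurrentProcessingTime());
        return scheduler.schedule(() -> callback.accept(timestamp), delay, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }

    // One instance for user timers, a separate one for system-level timers.
    static boolean demo() {
        try (SimpleTimerService userTimers = new SimpleTimerService();
             SimpleTimerService systemTimers = new SimpleTimerService()) {
            CountDownLatch fired = new CountDownLatch(2);
            long now = userTimers.getCurrentProcessingTime();
            userTimers.registerTimer(now + 20, timestamp -> fired.countDown());
            systemTimers.registerTimer(now + 20, timestamp -> fired.countDown());
            return fired.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("both timers fired: " + demo());
    }
}
```

Keeping system timers on their own service means user code can never cancel, delay or flood them.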
-
recordWriter
protected final org.apache.flink.runtime.io.network.api.writer.RecordWriterDelegate<org.apache.flink.runtime.plugable.SerializationDelegate<StreamRecord<OUT>>> recordWriter
-
mailboxProcessor
protected final MailboxProcessor mailboxProcessor
-
Constructor Detail
-
StreamTask
protected StreamTask(org.apache.flink.runtime.execution.Environment env) throws Exception
Constructor for initialization, possibly with initial state (recovery / savepoint / etc).
- Parameters:
env - The task environment for this task.
- Throws:
Exception
-
StreamTask
protected StreamTask(org.apache.flink.runtime.execution.Environment env, @Nullable TimerService timerService) throws Exception
Constructor for initialization, possibly with initial state (recovery / savepoint / etc).
- Parameters:
env - The task environment for this task.
timerService - Optionally, a specific timer service to use.
- Throws:
Exception
-
StreamTask
protected StreamTask(org.apache.flink.runtime.execution.Environment environment, @Nullable TimerService timerService, Thread.UncaughtExceptionHandler uncaughtExceptionHandler) throws Exception
- Throws:
Exception
-
StreamTask
protected StreamTask(org.apache.flink.runtime.execution.Environment environment, @Nullable TimerService timerService, Thread.UncaughtExceptionHandler uncaughtExceptionHandler, StreamTaskActionExecutor actionExecutor) throws Exception
Constructor for initialization, possibly with initial state (recovery / savepoint / etc).
This constructor accepts a special TimerService. By default (and if null is passed for the timer service), a DefaultTimerService will be used.
- Parameters:
environment - The task environment for this task.
timerService - Optionally, a specific timer service to use.
uncaughtExceptionHandler - Handles uncaught exceptions in the async operations thread pool.
actionExecutor - A means to wrap all actions performed by this task thread. Currently, only SynchronizedActionExecutor can be used to preserve locking semantics.
- Throws:
Exception
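The actionExecutor parameter can be pictured as a wrapper around every action the task thread performs. The sketch below uses hypothetical types (ActionExecutorSketch, ActionExecutorDemo) that only mirror the idea of StreamTaskActionExecutor and SynchronizedActionExecutor; it is not their real API. A synchronized executor runs each action while holding one lock, so actions submitted from different threads never interleave.

```java
import java.util.Objects;

// Hypothetical sketch; mirrors the idea of StreamTaskActionExecutor only.
interface ActionExecutorSketch {
    void run(Runnable action);

    // Runs the action directly on the calling thread, without any locking.
    static ActionExecutorSketch immediate() {
        return Runnable::run;
    }

    // Runs every action while holding the given lock, preserving the
    // "one operator call at a time" locking semantics.
    static ActionExecutorSketch synchronizedOn(Object lock) {
        Objects.requireNonNull(lock, "lock");
        return action -> {
            synchronized (lock) {
                action.run();
            }
        };
    }
}

class ActionExecutorDemo {
    private int counter; // only mutated inside actions run by the executor

    // Two threads each submit 10,000 increments through the same executor.
    static int demo(ActionExecutorSketch executor) {
        ActionExecutorDemo task = new ActionExecutorDemo();
        Runnable worker = () -> {
            for (int i = 0; i < 10_000; i++) {
                executor.run(() -> task.counter++);
            }
        };
        Thread first = new Thread(worker);
        Thread second = new Thread(worker);
        first.start();
        second.start();
        try {
            first.join();
            second.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return task.counter;
    }

    public static void main(String[] args) {
        System.out.println(demo(ActionExecutorSketch.synchronizedOn(new Object())));
    }
}
```

With synchronizedOn the result is always 20000; with immediate() the two threads race on the counter and the total can come out lower.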
-
StreamTask
protected StreamTask(org.apache.flink.runtime.execution.Environment environment, @Nullable TimerService timerService, Thread.UncaughtExceptionHandler uncaughtExceptionHandler, StreamTaskActionExecutor actionExecutor, TaskMailbox mailbox) throws Exception
- Throws:
Exception
-
Method Detail
-
processInput
protected void processInput(MailboxDefaultAction.Controller controller) throws Exception
This method implements the default action of the task (e.g. processing one event from the input). Implementations should (in general) be non-blocking.
- Parameters:
controller - Controller object for collaborative interaction between the action and the stream task.
- Throws:
Exception - on any problems in the action.
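The default-action model can be sketched as a single-threaded loop that alternates between draining queued "mails" (for example, checkpoint or timer actions) and running one step of the default action. This is a simplification, not Flink's MailboxProcessor: MailboxLoopSketch, its Controller and its DefaultAction are hypothetical, and this Controller only offers allActionsCompleted() to end the loop.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical single-threaded mailbox loop; not Flink's MailboxProcessor.
class MailboxLoopSketch {
    // Lets the default action tell the loop that all input has been processed.
    interface Controller {
        void allActionsCompleted();
    }

    // One short, non-blocking step of work per invocation.
    interface DefaultAction {
        void runDefaultAction(Controller controller);
    }

    private final Deque<Runnable> mailbox = new ArrayDeque<>();
    private boolean running = true;

    void putMail(Runnable mail) {
        mailbox.add(mail);
    }

    // Mails run between default-action steps, which is why each step
    // should return quickly instead of blocking on input.
    void runMailboxLoop(DefaultAction action) {
        Controller controller = () -> running = false;
        while (running) {
            while (!mailbox.isEmpty()) {
                mailbox.poll().run();
            }
            action.runDefaultAction(controller);
        }
    }

    static List<String> demo() {
        MailboxLoopSketch loop = new MailboxLoopSketch();
        List<String> log = new ArrayList<>();
        Deque<String> input = new ArrayDeque<>(List.of("r1", "r2", "r3"));
        loop.runMailboxLoop(controller -> {
            String record = input.poll();
            if (record == null) {
                controller.allActionsCompleted(); // input exhausted: stop looping
                return;
            }
            log.add("process " + record);
            if (record.equals("r1")) {
                loop.putMail(() -> log.add("mail: checkpoint"));
            }
        });
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

The mail enqueued while processing r1 runs before r2 is processed, showing how other actions get a turn between default-action steps.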
-
endData
protected void endData(org.apache.flink.runtime.io.network.api.StopMode mode) throws Exception
- Throws:
Exception
-
notifyEndOfData
protected void notifyEndOfData()
-
setSynchronousSavepoint
protected void setSynchronousSavepoint(long checkpointId)
-
advanceToEndOfEventTime
protected void advanceToEndOfEventTime() throws Exception
Emits the MAX_WATERMARK so that all registered timers are fired.
This is used by the source task when the job is TERMINATED. In this case, we want all the timers registered throughout the pipeline to fire and the related state (e.g. windows) to be flushed.
For tasks other than the source task, this method does nothing.
- Throws:
Exception
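Emitting MAX_WATERMARK fires every pending event-time timer because Flink's MAX_WATERMARK carries the timestamp Long.MAX_VALUE, and an event-time timer fires once the watermark reaches its timestamp. The hypothetical EventTimeTimersSketch below (not a Flink class) keeps timers in a priority queue and fires, in timestamp order, all those at or below the current watermark.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical event-time timer queue; not Flink's timer service.
class EventTimeTimersSketch {
    // Same timestamp as Flink's Watermark.MAX_WATERMARK.
    static final long MAX_WATERMARK = Long.MAX_VALUE;

    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    private long currentWatermark = Long.MIN_VALUE;

    void registerEventTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    // Advances the (monotonic) watermark and returns the timers that fire,
    // i.e. every timer whose timestamp is <= the watermark, in order.
    List<Long> advanceWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.peek() <= currentWatermark) {
            fired.add(timers.poll());
        }
        return fired;
    }

    public static void main(String[] args) {
        EventTimeTimersSketch timers = new EventTimeTimersSketch();
        timers.registerEventTimeTimer(1_000_000L);
        timers.registerEventTimeTimer(5L);
        timers.registerEventTimeTimer(100L);
        System.out.println(timers.advanceWatermark(50L));          // only the timer at 5
        System.out.println(timers.advanceWatermark(MAX_WATERMARK)); // everything left
    }
}
```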
-
createStreamTaskStateInitializer
public StreamTaskStateInitializer createStreamTaskStateInitializer(org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder initializationMetrics)
-
setupNumRecordsInCounter
protected org.apache.flink.metrics.Counter setupNumRecordsInCounter(StreamOperator streamOperator)
-
restore
public final void restore() throws Exception
- Specified by:
restore in interface org.apache.flink.runtime.jobgraph.tasks.TaskInvokable
- Throws:
Exception
-
invoke
public final void invoke() throws Exception
- Specified by:
invoke in interface org.apache.flink.runtime.jobgraph.tasks.TaskInvokable
- Throws:
Exception
-
runMailboxStep
@VisibleForTesting public boolean runMailboxStep() throws Exception
- Throws:
Exception
-
isMailboxLoopRunning
@VisibleForTesting public boolean isMailboxLoopRunning()
-
cleanUp
public final void cleanUp(Throwable throwable) throws Exception
- Specified by:
cleanUp in interface org.apache.flink.runtime.jobgraph.tasks.TaskInvokable
- Throws:
Exception
-
getCompletionFuture
protected CompletableFuture<Void> getCompletionFuture()
-
cancel
public final void cancel() throws Exception
- Specified by:
cancel in interface org.apache.flink.runtime.jobgraph.tasks.TaskInvokable
- Throws:
Exception
-
getMailboxExecutorFactory
public MailboxExecutorFactory getMailboxExecutorFactory()
-
getCanEmitBatchOfRecords
public StreamTask.CanEmitBatchOfRecordsChecker getCanEmitBatchOfRecords()
-
isRunning
public final boolean isRunning()
-
isCanceled
public final boolean isCanceled()
-
isFailing
public final boolean isFailing()
-
finalize
protected void finalize() throws Throwable
The finalize method shuts down the timer. This is a fail-safe shutdown, in case the original shutdown method was never called.
This should not be relied upon! It will cause shutdown to happen much later than if manual shutdown is attempted, and cause threads to linger for longer than needed.
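A fail-safe like this is only harmless if the shutdown is idempotent: whichever path runs second (the regular cleanup or the late fail-safe) must become a no-op. The hypothetical TimerOwnerSketch below (not a Flink class) shows that property with an AtomicBoolean guard; it deliberately does not override finalize(), which has been deprecated since Java 9.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of an idempotent timer shutdown; not Flink code.
class TimerOwnerSketch {
    private final ExecutorService timerThread = Executors.newSingleThreadExecutor();
    private final AtomicBoolean shutDown = new AtomicBoolean(false);

    // Returns true only for the single call that actually shuts the timer down;
    // later calls (e.g. from a fail-safe path) return false and do nothing.
    boolean shutdownTimer() {
        if (!shutDown.compareAndSet(false, true)) {
            return false;
        }
        timerThread.shutdownNow();
        return true;
    }

    boolean isTimerShutDown() {
        return timerThread.isShutdown();
    }

    public static void main(String[] args) {
        TimerOwnerSketch owner = new TimerOwnerSketch();
        System.out.println("explicit shutdown: " + owner.shutdownTimer());
        System.out.println("fail-safe shutdown: " + owner.shutdownTimer());
    }
}
```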
-
getName
public final String getName()
Gets the name of the task, in the form "taskname (2/5)".
- Returns:
The name of the task.
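Assuming the first number is the zero-based subtask index plus one and the second is the parallelism (an assumption of this sketch, read off the "taskname (2/5)" example), such a name could be built as follows. TaskNameSketch and taskNameWithSubtasks are hypothetical names, not part of this class.

```java
// Hypothetical helper; illustrates the "taskname (2/5)" format only.
class TaskNameSketch {
    // ("taskname", 1, 5) -> "taskname (2/5)": 1-based index, then parallelism.
    static String taskNameWithSubtasks(String taskName, int subtaskIndex, int parallelism) {
        if (subtaskIndex < 0 || subtaskIndex >= parallelism) {
            throw new IllegalArgumentException("subtask index out of range: " + subtaskIndex);
        }
        return taskName + " (" + (subtaskIndex + 1) + "/" + parallelism + ")";
    }

    public static void main(String[] args) {
        System.out.println(taskNameWithSubtasks("taskname", 1, 5));
    }
}
```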
-
getCheckpointStorage
public org.apache.flink.runtime.state.CheckpointStorageWorkerView getCheckpointStorage()
-
getConfiguration
public StreamConfig getConfiguration()
-
triggerCheckpointAsync
public CompletableFuture<Boolean> triggerCheckpointAsync(org.apache.flink.runtime.checkpoint.CheckpointMetaData checkpointMetaData, org.apache.flink.runtime.checkpoint.CheckpointOptions checkpointOptions)
- Specified by:
triggerCheckpointAsync in interface org.apache.flink.runtime.jobgraph.tasks.CheckpointableTask
-
getCheckpointBarrierHandler
protected Optional<CheckpointBarrierHandler> getCheckpointBarrierHandler()
Acquires the optional CheckpointBarrierHandler associated with this stream task. The CheckpointBarrierHandler should exist if the task has data inputs and needs to align checkpoint barriers.
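Barrier alignment across input channels can be sketched as follows: a channel that has delivered the barrier of checkpoint n is blocked until the same barrier has arrived on every channel, at which point the checkpoint can be triggered and all channels are released. BarrierAlignerSketch is hypothetical, not Flink's CheckpointBarrierHandler, and it ignores cancellation, timeouts and unaligned checkpoints.

```java
import java.util.BitSet;

// Hypothetical sketch of aligned checkpoint barriers; not Flink code.
class BarrierAlignerSketch {
    private final int numChannels;
    private final BitSet blocked = new BitSet(); // channels that already sent the barrier
    private long pendingCheckpointId = -1;

    BarrierAlignerSketch(int numChannels) {
        this.numChannels = numChannels;
    }

    // Returns true when this barrier completes the alignment, i.e. the
    // checkpoint can now be triggered.
    boolean onBarrier(int channel, long checkpointId) {
        if (checkpointId < pendingCheckpointId) {
            return false; // stale barrier of an older checkpoint: ignored here
        }
        if (checkpointId > pendingCheckpointId) {
            blocked.clear(); // newer checkpoint: start a fresh alignment
            pendingCheckpointId = checkpointId;
        }
        blocked.set(channel);
        if (blocked.cardinality() == numChannels) {
            blocked.clear(); // aligned: unblock every channel
            return true;
        }
        return false;
    }

    // Records arriving on a blocked channel would have to be buffered.
    boolean isBlocked(int channel) {
        return blocked.get(channel);
    }

    public static void main(String[] args) {
        BarrierAlignerSketch aligner = new BarrierAlignerSketch(2);
        System.out.println(aligner.onBarrier(0, 1L)); // waiting for channel 1
        System.out.println(aligner.onBarrier(1, 1L)); // aligned, trigger checkpoint 1
    }
}
```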
-
triggerCheckpointOnBarrier
public void triggerCheckpointOnBarrier(org.apache.flink.runtime.checkpoint.CheckpointMetaData checkpointMetaData, org.apache.flink.runtime.checkpoint.CheckpointOptions checkpointOptions, org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder checkpointMetrics) throws IOException
- Specified by:
triggerCheckpointOnBarrier in interface org.apache.flink.runtime.jobgraph.tasks.CheckpointableTask
- Throws:
IOException
-
abortCheckpointOnBarrier
public void abortCheckpointOnBarrier(long checkpointId, org.apache.flink.runtime.checkpoint.CheckpointException cause) throws IOException
- Specified by:
abortCheckpointOnBarrier in interface org.apache.flink.runtime.jobgraph.tasks.CheckpointableTask
- Throws:
IOException
-
declineCheckpoint
protected void declineCheckpoint(long checkpointId)
-
getAsyncOperationsThreadPool
public final ExecutorService getAsyncOperationsThreadPool()
-
notifyCheckpointCompleteAsync
public Future<Void> notifyCheckpointCompleteAsync(long checkpointId)
- Specified by:
notifyCheckpointCompleteAsync in interface org.apache.flink.runtime.jobgraph.tasks.CheckpointableTask
-
notifyCheckpointAbortAsync
public Future<Void> notifyCheckpointAbortAsync(long checkpointId, long latestCompletedCheckpointId)
- Specified by:
notifyCheckpointAbortAsync in interface org.apache.flink.runtime.jobgraph.tasks.CheckpointableTask
-
notifyCheckpointSubsumedAsync
public Future<Void> notifyCheckpointSubsumedAsync(long checkpointId)
- Specified by:
notifyCheckpointSubsumedAsync in interface org.apache.flink.runtime.jobgraph.tasks.CheckpointableTask
-
dispatchOperatorEvent
public void dispatchOperatorEvent(org.apache.flink.runtime.jobgraph.OperatorID operator, org.apache.flink.util.SerializedValue<org.apache.flink.runtime.operators.coordination.OperatorEvent> event) throws org.apache.flink.util.FlinkException
- Specified by:
dispatchOperatorEvent in interface org.apache.flink.runtime.jobgraph.tasks.CoordinatedTask
- Throws:
org.apache.flink.util.FlinkException
-
getProcessingTimeServiceFactory
public ProcessingTimeServiceFactory getProcessingTimeServiceFactory()
-
handleAsyncException
public void handleAsyncException(String message, Throwable exception)
Handles an exception thrown by a thread other than the one executing the main task (e.g. a TriggerTask) by failing the task entirely.
In more detail, it marks the task execution as failed for an external reason (a reason other than the task code itself throwing an exception). If the task is already in a terminal state (such as FINISHED, CANCELED, FAILED), or if the task is already canceling, this does nothing. Otherwise, it sets the state to FAILED and, if the invokable code is running, starts an asynchronous thread that aborts that code.
This method never blocks.
- Specified by:
handleAsyncException in interface org.apache.flink.runtime.taskmanager.AsyncExceptionHandler
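The documented semantics (ignore the call when the task is terminal or canceling, otherwise switch to FAILED and abort the running code on another thread, never blocking the caller) map naturally onto a compare-and-set state transition. AsyncFailureSketch and its State enum are hypothetical, not Flink's Task or ExecutionState; the sketch also simplifies by treating RUNNING as the only state from which a failure can be recorded.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of handleAsyncException's documented semantics.
class AsyncFailureSketch {
    enum State { RUNNING, CANCELING, FINISHED, CANCELED, FAILED }

    private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
    private final Runnable abortInvokable; // stands in for "abort the running code"
    private volatile Throwable failureCause;

    AsyncFailureSketch(Runnable abortInvokable) {
        this.abortInvokable = abortInvokable;
    }

    // Returns true if this call moved the task to FAILED. Never blocks.
    boolean handleAsyncException(String message, Throwable exception) {
        while (true) {
            State current = state.get();
            if (current != State.RUNNING) {
                return false; // terminal or already canceling: do nothing
            }
            if (state.compareAndSet(current, State.FAILED)) {
                failureCause = new RuntimeException(message, exception);
                // Abort on a separate thread so the calling thread returns at once.
                Thread aborter = new Thread(abortInvokable, "task-aborter");
                aborter.setDaemon(true);
                aborter.start();
                return true;
            }
            // Lost a race with a concurrent state change: re-read and retry.
        }
    }

    State state() {
        return state.get();
    }

    Throwable failureCause() {
        return failureCause;
    }

    public static void main(String[] args) {
        AsyncFailureSketch task = new AsyncFailureSketch(() -> System.out.println("aborting"));
        System.out.println(task.handleAsyncException("timer failed", new RuntimeException("boom")));
        System.out.println(task.state());
    }
}
```

The CAS loop guarantees that exactly one of several concurrently failing threads records the failure and starts the abort.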
-
getCancelables
public final org.apache.flink.core.fs.CloseableRegistry getCancelables()
-
createRecordWriterDelegate
@VisibleForTesting public static <OUT> org.apache.flink.runtime.io.network.api.writer.RecordWriterDelegate<org.apache.flink.runtime.plugable.SerializationDelegate<StreamRecord<OUT>>> createRecordWriterDelegate(StreamConfig configuration, org.apache.flink.runtime.execution.Environment environment)
-
getAsyncCheckpointStartDelayNanos
protected long getAsyncCheckpointStartDelayNanos()
-
isUsingNonBlockingInput
public boolean isUsingNonBlockingInput()
- Specified by:
isUsingNonBlockingInput in interface org.apache.flink.runtime.jobgraph.tasks.TaskInvokable
-
maybeInterruptOnCancel
public void maybeInterruptOnCancel(Thread toInterrupt, @Nullable String taskName, @Nullable Long timeout)
- Specified by:
maybeInterruptOnCancel in interface org.apache.flink.runtime.jobgraph.tasks.TaskInvokable
-
getEnvironment
public final org.apache.flink.runtime.execution.Environment getEnvironment()