Class StreamTask<OUT, OP extends StreamOperator<OUT>>

  • Type Parameters:
    OUT -
    OP -
    All Implemented Interfaces:
    org.apache.flink.runtime.jobgraph.tasks.CheckpointableTask, org.apache.flink.runtime.jobgraph.tasks.CoordinatedTask, org.apache.flink.runtime.jobgraph.tasks.TaskInvokable, org.apache.flink.runtime.taskmanager.AsyncExceptionHandler, ContainingTaskDetails
    Direct Known Subclasses:
    AbstractTwoInputStreamTask, MultipleInputStreamTask, OneInputStreamTask, SourceOperatorStreamTask, SourceStreamTask

    @Internal
    public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
    extends Object
    implements org.apache.flink.runtime.jobgraph.tasks.TaskInvokable, org.apache.flink.runtime.jobgraph.tasks.CheckpointableTask, org.apache.flink.runtime.jobgraph.tasks.CoordinatedTask, org.apache.flink.runtime.taskmanager.AsyncExceptionHandler, ContainingTaskDetails
    Base class for all streaming tasks. A task is the unit of local processing that is deployed and executed by the TaskManagers. Each task runs one or more StreamOperators which form the Task's operator chain. Operators that are chained together execute synchronously in the same thread and hence on the same stream partition. A common case for these chains is a sequence of successive map/flatmap/filter operations.

    The task chain contains one "head" operator and multiple chained operators. The StreamTask is specialized for the type of the head operator: one-input and two-input tasks, as well as for sources, iteration heads and iteration tails.

    The Task class deals with the setup of the streams read by the head operator, and the streams produced by the operators at the ends of the operator chain. Note that the chain may fork and thus have multiple ends.

    The life cycle of the task is set up as follows:

    
     -- setInitialState -> provides state of all operators in the chain
    
     -- invoke()
           |
           +----> Create basic utils (config, etc) and load the chain of operators
           +----> operators.setup()
           +----> task specific init()
           +----> initialize-operator-states()
           +----> open-operators()
           +----> run()
           +----> finish-operators()
           +----> close-operators()
           +----> common cleanup
           +----> task specific cleanup()
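
    The sequence above can be pictured as a template method. The following is only a toy sketch under stated assumptions: LifecycleSketchTask, RecordingTask, and the step names are hypothetical and are not part of Flink. It records the order of the steps and does not model error handling or cancellation.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the invoke() sequence from the diagram; NOT the Flink class. */
abstract class LifecycleSketchTask {
    /** Records each life-cycle step in the order it happens. */
    final List<String> steps = new ArrayList<>();

    // Task-specific hooks, analogous to init(), run() and cleanup() in the diagram.
    abstract void init();
    abstract void run();
    abstract void cleanup();

    /** Fixed skeleton: subclasses only fill in the hooks. */
    final void invoke() {
        steps.add("load-chain");
        steps.add("operators.setup");
        init();
        steps.add("initialize-operator-states");
        steps.add("open-operators");
        run();
        steps.add("finish-operators");
        steps.add("close-operators");
        steps.add("common-cleanup");
        cleanup();
    }
}

/** Hypothetical subclass that only logs when its hooks are called. */
class RecordingTask extends LifecycleSketchTask {
    @Override void init() { steps.add("init"); }
    @Override void run() { steps.add("run"); }
    @Override void cleanup() { steps.add("cleanup"); }
}
```

    Calling new RecordingTask().invoke() and then reading steps yields the hooks interleaved with the common steps in the order shown in the diagram.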
     

    The StreamTask has a lock object called lock. All calls to methods on a StreamOperator must be synchronized on this lock object to ensure that no methods are called concurrently.
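
    As an illustration of the locking rule, the sketch below guards every access to some operator-like state with one shared lock object. LockSketch and its methods are hypothetical names chosen for this example; the real task and operator classes are not used here.

```java
/** Toy illustration of "synchronize every operator call on one lock"; NOT Flink code. */
class LockSketch {
    private final Object lock = new Object();
    private long processed; // stands in for operator state

    /** Called by the task thread for each record. */
    void processElement(long value) {
        synchronized (lock) {
            processed += value;
        }
    }

    /** Called by another thread, for example a timer thread. */
    void onTimer(long value) {
        synchronized (lock) {
            processed += value;
        }
    }

    long processed() {
        synchronized (lock) {
            return processed;
        }
    }

    /** Runs both callers concurrently and returns the final counter value. */
    static long runConcurrently(int callsPerThread) {
        LockSketch task = new LockSketch();
        Thread taskThread = new Thread(() -> {
            for (int i = 0; i < callsPerThread; i++) task.processElement(1);
        });
        Thread timerThread = new Thread(() -> {
            for (int i = 0; i < callsPerThread; i++) task.onTimer(1);
        });
        taskThread.start();
        timerThread.start();
        try {
            taskThread.join();
            timerThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return task.processed();
    }
}
```

    Because both threads take the same lock, no update is lost: runConcurrently(n) returns 2 * n.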

    • Field Detail

      • TRIGGER_THREAD_GROUP

        public static final ThreadGroup TRIGGER_THREAD_GROUP
        The thread group that holds all trigger timer threads.
      • LOG

        protected static final org.slf4j.Logger LOG
        The logger used by the StreamTask and its subclasses.
      • mainOperator

        protected OP mainOperator
        The main operator that consumes the input streams of this task.
      • configuration

        protected final StreamConfig configuration
        The configuration of this streaming task.
      • stateBackend

        protected final org.apache.flink.runtime.state.StateBackend stateBackend
        Our state backend. We use this to create a keyed state backend.
      • checkpointStorage

        protected final org.apache.flink.runtime.state.CheckpointStorage checkpointStorage
        Our checkpoint storage. We use this to create checkpoint streams.
      • timerService

        protected final TimerService timerService
        The internal TimerService used to define the current processing time (default = System.currentTimeMillis()) and register timers for tasks to be executed in the future.
      • systemTimerService

        protected final TimerService systemTimerService
        In contrast to timerService, no user timers should be registered here. It should be used only for system-level timers.
      • recordWriter

        protected final org.apache.flink.runtime.io.network.api.writer.RecordWriterDelegate<org.apache.flink.runtime.plugable.SerializationDelegate<StreamRecord<OUT>>> recordWriter
    • Constructor Detail

      • StreamTask

        protected StreamTask(org.apache.flink.runtime.execution.Environment env)
                      throws Exception
        Constructor for initialization, possibly with initial state (recovery / savepoint / etc).
        Parameters:
        env - The task environment for this task.
        Throws:
        Exception
      • StreamTask

        protected StreamTask(org.apache.flink.runtime.execution.Environment env,
                             @Nullable
                             TimerService timerService)
                      throws Exception
        Constructor for initialization, possibly with initial state (recovery / savepoint / etc).
        Parameters:
        env - The task environment for this task.
        timerService - Optionally, a specific timer service to use.
        Throws:
        Exception
      • StreamTask

        protected StreamTask(org.apache.flink.runtime.execution.Environment environment,
                             @Nullable
                             TimerService timerService,
                             Thread.UncaughtExceptionHandler uncaughtExceptionHandler,
                             StreamTaskActionExecutor actionExecutor)
                      throws Exception
        Constructor for initialization, possibly with initial state (recovery / savepoint / etc).

        This constructor accepts a special TimerService. By default (and if null is passed for the timer service) a DefaultTimerService will be used.

        Parameters:
        environment - The task environment for this task.
        timerService - Optionally, a specific timer service to use.
        uncaughtExceptionHandler - to handle uncaught exceptions in the async operations thread pool
        actionExecutor - a means to wrap all actions performed by this task thread. Currently, only SynchronizedActionExecutor can be used to preserve locking semantics.
        Throws:
        Exception
    • Method Detail

      • processInput

        protected void processInput(MailboxDefaultAction.Controller controller)
                             throws Exception
        This method implements the default action of the task (e.g. processing one event from the input). Implementations should (in general) be non-blocking.
        Parameters:
        controller - controller object for collaborative interaction between the action and the stream task.
        Throws:
        Exception - on any problems in the action.
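
        To show why the default action should be small and non-blocking, here is a single-threaded toy loop that alternates between pending mail and the default action. MailboxSketch, its nested Controller and DefaultAction interfaces, and demo() are hypothetical simplifications for this example. They are not the Flink mailbox classes, and the real mailbox also accepts mail from other threads.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

/** Single-threaded toy mailbox loop; NOT the Flink implementation. */
class MailboxSketch {
    /** Simplified stand-in for the controller handed to the default action. */
    interface Controller {
        void allActionsCompleted();
    }

    /** Simplified stand-in for the default action (one small unit of work). */
    interface DefaultAction {
        void run(Controller controller);
    }

    private final Queue<Runnable> mailbox = new ArrayDeque<>();
    private boolean running = true;

    void sendMail(Runnable mail) {
        mailbox.add(mail);
    }

    void runLoop(DefaultAction action) {
        Controller controller = () -> running = false;
        while (running) {
            // Handle all pending mail first so control actions are not starved.
            Runnable mail;
            while ((mail = mailbox.poll()) != null) {
                mail.run();
            }
            // Then do one unit of work; a blocking action would delay the next mail.
            action.run(controller);
        }
    }

    /** Processes two records and one mail, returning the order of events. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Queue<String> input = new ArrayDeque<>(Arrays.asList("a", "b"));
        MailboxSketch task = new MailboxSketch();
        task.sendMail(() -> log.add("mail"));
        task.runLoop(controller -> {
            String next = input.poll();
            if (next == null) {
                controller.allActionsCompleted(); // input exhausted: stop the loop
            } else {
                log.add("record:" + next);
            }
        });
        return log;
    }
}
```

        In this sketch the default action handles one record per call and signals completion through the controller once the input is exhausted.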
      • endData

        protected void endData(org.apache.flink.runtime.io.network.api.StopMode mode)
                        throws Exception
        Throws:
        Exception
      • notifyEndOfData

        protected void notifyEndOfData()
      • setSynchronousSavepoint

        protected void setSynchronousSavepoint(long checkpointId)
      • advanceToEndOfEventTime

        protected void advanceToEndOfEventTime()
                                        throws Exception
        Emits the MAX_WATERMARK so that all registered timers are fired.

        This is used by the source task when the job is TERMINATED. In that case, we want all the timers registered throughout the pipeline to fire and the related state (e.g. windows) to be flushed.

        For tasks other than the source task, this method does nothing.

        Throws:
        Exception
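
        The effect of a maximal watermark on registered timers can be sketched with a small priority queue. EventTimeTimersSketch and its methods are hypothetical names for this example and do not correspond to Flink's timer services; the sketch assumes the maximal watermark carries the timestamp Long.MAX_VALUE.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Toy event-time timer queue; NOT Flink's timer service. */
class EventTimeTimersSketch {
    /** Assumption for this sketch: the maximal watermark is Long.MAX_VALUE. */
    static final long MAX_WATERMARK = Long.MAX_VALUE;

    private final PriorityQueue<Long> timers = new PriorityQueue<>();

    void registerTimer(long timestamp) {
        timers.add(timestamp);
    }

    /** Fires, in timestamp order, every timer that is due at the given watermark. */
    List<Long> advanceWatermark(long watermark) {
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.peek() <= watermark) {
            fired.add(timers.poll());
        }
        return fired;
    }

    int pendingTimers() {
        return timers.size();
    }
}
```

        Any timer timestamp is less than or equal to Long.MAX_VALUE, so advancing to MAX_WATERMARK leaves no timer pending, which is what allows windowed state to be flushed at termination.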
      • createStreamTaskStateInitializer

        public StreamTaskStateInitializer createStreamTaskStateInitializer(org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder initializationMetrics)
      • setupNumRecordsInCounter

        protected org.apache.flink.metrics.Counter setupNumRecordsInCounter(StreamOperator streamOperator)
      • restore

        public final void restore()
                           throws Exception
        Specified by:
        restore in interface org.apache.flink.runtime.jobgraph.tasks.TaskInvokable
        Throws:
        Exception
      • invoke

        public final void invoke()
                          throws Exception
        Specified by:
        invoke in interface org.apache.flink.runtime.jobgraph.tasks.TaskInvokable
        Throws:
        Exception
      • runSingleMailboxLoop

        @VisibleForTesting
        public boolean runSingleMailboxLoop()
                                     throws Exception
        Throws:
        Exception
      • runMailboxStep

        @VisibleForTesting
        public boolean runMailboxStep()
                               throws Exception
        Throws:
        Exception
      • isMailboxLoopRunning

        @VisibleForTesting
        public boolean isMailboxLoopRunning()
      • cleanUp

        public final void cleanUp(Throwable throwable)
                           throws Exception
        Specified by:
        cleanUp in interface org.apache.flink.runtime.jobgraph.tasks.TaskInvokable
        Throws:
        Exception
      • cancel

        public final void cancel()
                          throws Exception
        Specified by:
        cancel in interface org.apache.flink.runtime.jobgraph.tasks.TaskInvokable
        Throws:
        Exception
      • hasMail

        public boolean hasMail()
      • isRunning

        public final boolean isRunning()
      • isCanceled

        public final boolean isCanceled()
      • isFailing

        public final boolean isFailing()
      • finalize

        protected void finalize()
                         throws Throwable
        The finalize method shuts down the timer. This is a fail-safe shutdown, in case the original shutdown method was never called.

        This should not be relied upon! It will cause shutdown to happen much later than if manual shutdown is attempted, and cause threads to linger for longer than needed.

        Overrides:
        finalize in class Object
        Throws:
        Throwable
      • getName

        public final String getName()
        Gets the name of the task, in the form "taskname (2/5)".
        Returns:
        The name of the task.
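
        The "taskname (2/5)" form can be reproduced with a one-line formatter. This sketch assumes that the first number is the one-based index of the subtask and the second is the parallelism; TaskNameSketch and nameWithSubtask are hypothetical names, not Flink API.

```java
/** Toy formatter for the "taskname (2/5)" form; NOT Flink code. */
class TaskNameSketch {
    /**
     * Assumption: subtaskIndex is zero-based, so it is shifted by one for display,
     * and parallelism is the total number of parallel subtasks.
     */
    static String nameWithSubtask(String taskName, int subtaskIndex, int parallelism) {
        return String.format("%s (%d/%d)", taskName, subtaskIndex + 1, parallelism);
    }
}
```

        Under these assumptions, nameWithSubtask("taskname", 1, 5) produces "taskname (2/5)".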
      • getCheckpointStorage

        public org.apache.flink.runtime.state.CheckpointStorageWorkerView getCheckpointStorage()
      • triggerCheckpointAsync

        public CompletableFuture<Boolean> triggerCheckpointAsync(org.apache.flink.runtime.checkpoint.CheckpointMetaData checkpointMetaData,
                                                                 org.apache.flink.runtime.checkpoint.CheckpointOptions checkpointOptions)
        Specified by:
        triggerCheckpointAsync in interface org.apache.flink.runtime.jobgraph.tasks.CheckpointableTask
      • getCheckpointBarrierHandler

        protected Optional<CheckpointBarrierHandler> getCheckpointBarrierHandler()
        Acquires the optional CheckpointBarrierHandler associated with this stream task. The CheckpointBarrierHandler should exist if the task has data inputs and needs to align the barriers.
      • triggerCheckpointOnBarrier

        public void triggerCheckpointOnBarrier(org.apache.flink.runtime.checkpoint.CheckpointMetaData checkpointMetaData,
                                               org.apache.flink.runtime.checkpoint.CheckpointOptions checkpointOptions,
                                               org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder checkpointMetrics)
                                        throws IOException
        Specified by:
        triggerCheckpointOnBarrier in interface org.apache.flink.runtime.jobgraph.tasks.CheckpointableTask
        Throws:
        IOException
      • abortCheckpointOnBarrier

        public void abortCheckpointOnBarrier(long checkpointId,
                                             org.apache.flink.runtime.checkpoint.CheckpointException cause)
                                      throws IOException
        Specified by:
        abortCheckpointOnBarrier in interface org.apache.flink.runtime.jobgraph.tasks.CheckpointableTask
        Throws:
        IOException
      • declineCheckpoint

        protected void declineCheckpoint(long checkpointId)
      • getAsyncOperationsThreadPool

        public final ExecutorService getAsyncOperationsThreadPool()
      • notifyCheckpointCompleteAsync

        public Future<Void> notifyCheckpointCompleteAsync(long checkpointId)
        Specified by:
        notifyCheckpointCompleteAsync in interface org.apache.flink.runtime.jobgraph.tasks.CheckpointableTask
      • notifyCheckpointAbortAsync

        public Future<Void> notifyCheckpointAbortAsync(long checkpointId,
                                                       long latestCompletedCheckpointId)
        Specified by:
        notifyCheckpointAbortAsync in interface org.apache.flink.runtime.jobgraph.tasks.CheckpointableTask
      • notifyCheckpointSubsumedAsync

        public Future<Void> notifyCheckpointSubsumedAsync(long checkpointId)
        Specified by:
        notifyCheckpointSubsumedAsync in interface org.apache.flink.runtime.jobgraph.tasks.CheckpointableTask
      • dispatchOperatorEvent

        public void dispatchOperatorEvent(org.apache.flink.runtime.jobgraph.OperatorID operator,
                                          org.apache.flink.util.SerializedValue<org.apache.flink.runtime.operators.coordination.OperatorEvent> event)
                                   throws org.apache.flink.util.FlinkException
        Specified by:
        dispatchOperatorEvent in interface org.apache.flink.runtime.jobgraph.tasks.CoordinatedTask
        Throws:
        org.apache.flink.util.FlinkException
      • handleAsyncException

        public void handleAsyncException(String message,
                                         Throwable exception)
        Handles an exception thrown by a thread other than the one executing the main task (e.g. a TriggerTask) by failing the task entirely.

        In more detail, it marks task execution as failed for an external reason (a reason other than the task code itself throwing an exception). If the task is already in a terminal state (such as FINISHED, CANCELED, FAILED), or if the task is already canceling, this does nothing. Otherwise it sets the state to FAILED and, if the invokable code is running, starts an asynchronous thread that aborts that code.

        This method never blocks.

        Specified by:
        handleAsyncException in interface org.apache.flink.runtime.taskmanager.AsyncExceptionHandler
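
        The state rule described above (do nothing when terminal or canceling, otherwise move to FAILED) can be sketched as a compare-and-set on a state field. AsyncFailureSketch, its State enum, and its methods are hypothetical names for this example; the sketch omits the asynchronous thread that aborts running code.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Toy state machine for externally triggered failure; NOT Flink code. */
class AsyncFailureSketch {
    enum State { RUNNING, CANCELING, FINISHED, CANCELED, FAILED }

    private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
    private volatile Throwable failureCause;

    /** Moves RUNNING to CANCELING; has no effect in any other state. */
    boolean cancel() {
        return state.compareAndSet(State.RUNNING, State.CANCELING);
    }

    /**
     * Marks the task FAILED for an external reason. Returns false, without blocking,
     * if the task is already terminal or canceling.
     */
    boolean failExternally(Throwable cause) {
        // Only a RUNNING task can be failed; every other state is left untouched.
        if (state.compareAndSet(State.RUNNING, State.FAILED)) {
            failureCause = cause;
            return true;
        }
        return false;
    }

    State state() {
        return state.get();
    }

    Throwable failureCause() {
        return failureCause;
    }
}
```

        A second call to failExternally, or a call after cancel(), leaves the state unchanged, mirroring the "does nothing" cases listed above.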
      • getCancelables

        public final org.apache.flink.core.fs.CloseableRegistry getCancelables()
      • createRecordWriterDelegate

        @VisibleForTesting
        public static <OUT> org.apache.flink.runtime.io.network.api.writer.RecordWriterDelegate<org.apache.flink.runtime.plugable.SerializationDelegate<StreamRecord<OUT>>> createRecordWriterDelegate(StreamConfig configuration,
                                                                                                                                                                                                       org.apache.flink.runtime.execution.Environment environment)
      • getAsyncCheckpointStartDelayNanos

        protected long getAsyncCheckpointStartDelayNanos()
      • isUsingNonBlockingInput

        public boolean isUsingNonBlockingInput()
        Specified by:
        isUsingNonBlockingInput in interface org.apache.flink.runtime.jobgraph.tasks.TaskInvokable
      • maybeInterruptOnCancel

        public void maybeInterruptOnCancel(Thread toInterrupt,
                                           @Nullable
                                           String taskName,
                                           @Nullable
                                           Long timeout)
        Specified by:
        maybeInterruptOnCancel in interface org.apache.flink.runtime.jobgraph.tasks.TaskInvokable
      • getEnvironment

        public final org.apache.flink.runtime.execution.Environment getEnvironment()