Class MessageAcknowledgingSourceBase<Type,UId>
- java.lang.Object
-
- org.apache.flink.api.common.functions.AbstractRichFunction
-
- org.apache.flink.streaming.api.functions.source.RichSourceFunction<Type>
-
- org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase<Type,UId>
-
- Type Parameters:
Type - The type of the messages created by the source.
UId - The type of unique IDs which may be used to acknowledge elements.
- All Implemented Interfaces:
Serializable, org.apache.flink.api.common.functions.Function, org.apache.flink.api.common.functions.RichFunction, org.apache.flink.api.common.state.CheckpointListener, CheckpointedFunction, SourceFunction<Type>
- Direct Known Subclasses:
MultipleIdsMessageAcknowledgingSourceBase
@Deprecated @PublicEvolving public abstract class MessageAcknowledgingSourceBase<Type,UId> extends RichSourceFunction<Type> implements CheckpointedFunction, org.apache.flink.api.common.state.CheckpointListener
Deprecated. This class is based on the SourceFunction API, which is due to be removed. Use the new Source API instead.
Abstract base class for data sources that receive elements from a message queue and acknowledge them back by IDs.
The mechanism for this source assumes that messages are identified by a unique ID. When a message is taken from the message queue, the queue must not drop it immediately, but must retain it until it is acknowledged. Messages that are not acknowledged within a certain time interval will be served again (to a different connection, established by the recovered source).
Note that this source can give no guarantees about message order in the case of failures, because messages that were retrieved but not yet acknowledged will be returned later again, after a set of messages that was not retrieved before the failure.
Internally, this source gathers the IDs of elements it emits. Per checkpoint, the IDs are stored and acknowledged when the checkpoint is complete. That way, no message is acknowledged unless it is certain that it has been successfully processed throughout the topology and the updates to any state caused by that message are persistent.
All messages that are emitted and successfully processed by the streaming program will eventually be acknowledged. In corner cases, the source may receive certain IDs multiple times, if a failure occurs while acknowledging. To cope with this situation, an additional Set stores all processed IDs. IDs are only removed after they have been acknowledged.
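The bookkeeping described above can be sketched in plain Java. This is a simplified, stand-alone model for illustration only, not the Flink implementation: the class name AckBookkeepingSketch, its method names, and the acknowledged list (which stands in for the real message queue) are all assumptions introduced here.

```java
import java.util.AbstractMap;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-alone model of the ID bookkeeping; not Flink code. */
class AckBookkeepingSketch<UId> {
    /** IDs emitted since the last snapshot. */
    private Set<UId> idsForCurrentCheckpoint = new HashSet<>();
    /** Snapshots that were taken but not yet confirmed, oldest first. */
    private final ArrayDeque<Map.Entry<Long, Set<UId>>> pendingCheckpoints = new ArrayDeque<>();
    /** Every ID emitted but not yet acknowledged; used for de-duplication. */
    private final Set<UId> processedButNotAcknowledged = new HashSet<>();
    /** Records what was acknowledged; stands in for the real message queue. */
    final List<UId> acknowledged = new ArrayList<>();

    /** Registers an ID; returns true only if it has not been seen since its last acknowledgement. */
    boolean addId(UId id) {
        idsForCurrentCheckpoint.add(id);
        return processedButNotAcknowledged.add(id);
    }

    /** Stores the IDs gathered so far under the given checkpoint and starts a fresh set. */
    void snapshot(long checkpointId) {
        pendingCheckpoints.addLast(
                new AbstractMap.SimpleEntry<>(checkpointId, idsForCurrentCheckpoint));
        idsForCurrentCheckpoint = new HashSet<>();
    }

    /** Acknowledges the IDs of every pending checkpoint up to and including checkpointId. */
    void checkpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, Set<UId>>> it = pendingCheckpoints.iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Set<UId>> entry = it.next();
            if (entry.getKey() > checkpointId) {
                break; // later checkpoints are not confirmed yet
            }
            acknowledged.addAll(entry.getValue());
            // IDs leave the de-duplication set only after they were acknowledged.
            processedButNotAcknowledged.removeAll(entry.getValue());
            it.remove();
        }
    }
}
```

In this model, an ID added before snapshot(1) is acknowledged only once checkpointComplete is called with an ID of 1 or higher, and a repeated addId for the same ID returns false until then.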
A typical way to use this base in a source function is by implementing a run() method as follows:

    public void run(SourceContext<Type> ctx) throws Exception {
        while (running) {
            Message msg = queue.retrieve();
            synchronized (ctx.getCheckpointLock()) {
                if (addId(msg.getMessageId())) {
                    ctx.collect(msg.getMessageData());
                }
            }
        }
    }

NOTE: This source has a parallelism of 1.
- See Also:
- Serialized Form
-
-
Nested Class Summary
-
Nested classes/interfaces inherited from interface org.apache.flink.streaming.api.functions.source.SourceFunction
SourceFunction.SourceContext<T>
-
-
Field Summary
Modifier and Type | Field | Description
protected ArrayDeque<org.apache.flink.api.java.tuple.Tuple2<Long,Set<UId>>> | pendingCheckpoints | Deprecated. The list with IDs from checkpoints that were triggered, but not yet completed or notified of completion.
-
Constructor Summary
Modifier | Constructor | Description
protected | MessageAcknowledgingSourceBase(Class<UId> idClass) | Deprecated. Creates a new MessageAcknowledgingSourceBase for IDs of the given type.
protected | MessageAcknowledgingSourceBase(org.apache.flink.api.common.typeinfo.TypeInformation<UId> idTypeInfo) | Deprecated. Creates a new MessageAcknowledgingSourceBase for IDs of the given type.
-
Method Summary
Modifier and Type | Method | Description
protected abstract void | acknowledgeIDs(long checkpointId, Set<UId> uIds) | Deprecated. This method must be implemented to acknowledge the given set of IDs back to the message queue.
protected boolean | addId(UId uid) | Deprecated. Adds an ID to be stored with the current checkpoint.
void | close() | Deprecated.
void | initializeState(org.apache.flink.runtime.state.FunctionInitializationContext context) | Deprecated. This method is called when the parallel function instance is created during distributed execution.
void | notifyCheckpointAborted(long checkpointId) | Deprecated.
void | notifyCheckpointComplete(long checkpointId) | Deprecated.
void | snapshotState(org.apache.flink.runtime.state.FunctionSnapshotContext context) | Deprecated. This method is called when a snapshot for a checkpoint is requested.
Methods inherited from class org.apache.flink.api.common.functions.AbstractRichFunction
getIterationRuntimeContext, getRuntimeContext, open, setRuntimeContext
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface org.apache.flink.streaming.api.functions.source.SourceFunction
cancel, run
-
-
-
-
Field Detail
-
pendingCheckpoints
protected transient ArrayDeque<org.apache.flink.api.java.tuple.Tuple2<Long,Set<UId>>> pendingCheckpoints
Deprecated. The list with IDs from checkpoints that were triggered, but not yet completed or notified of completion.
-
-
Constructor Detail
-
MessageAcknowledgingSourceBase
protected MessageAcknowledgingSourceBase(Class<UId> idClass)
Deprecated. Creates a new MessageAcknowledgingSourceBase for IDs of the given type.
- Parameters:
idClass - The class of the message ID type, used to create a serializer for the message IDs.
-
MessageAcknowledgingSourceBase
protected MessageAcknowledgingSourceBase(org.apache.flink.api.common.typeinfo.TypeInformation<UId> idTypeInfo)
Deprecated. Creates a new MessageAcknowledgingSourceBase for IDs of the given type.
- Parameters:
idTypeInfo - The type information of the message ID type, used to create a serializer for the message IDs.
-
-
Method Detail
-
initializeState
public void initializeState(org.apache.flink.runtime.state.FunctionInitializationContext context) throws Exception
Deprecated. Description copied from interface: CheckpointedFunction
This method is called when the parallel function instance is created during distributed execution. Functions typically set up their state storing data structures in this method.
- Specified by:
initializeState in interface CheckpointedFunction
- Parameters:
context - the context for initializing the operator
- Throws:
Exception - Thrown, if state could not be created or restored.
-
close
public void close() throws Exception
Deprecated.
- Specified by:
close in interface org.apache.flink.api.common.functions.RichFunction
- Overrides:
close in class org.apache.flink.api.common.functions.AbstractRichFunction
- Throws:
Exception
-
acknowledgeIDs
protected abstract void acknowledgeIDs(long checkpointId, Set<UId> uIds)
Deprecated. This method must be implemented to acknowledge the given set of IDs back to the message queue.
- Parameters:
uIds - The set of IDs to acknowledge.
-
addId
protected boolean addId(UId uid)
Deprecated. Adds an ID to be stored with the current checkpoint. In order to achieve exactly-once guarantees, implementing classes should only emit records with IDs for which this method returns true.
- Parameters:
uid - The ID to add.
- Returns:
True if the ID has not been processed previously.
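The emit rule above can be illustrated with a small stand-alone sketch. It does not use the Flink API: EmitOnceSketch, its Message type, and the local addId stand-in (backed by a plain HashSet) are hypothetical names introduced only for this example. It shows a delivery sequence in which the queue serves one message twice, and only the first delivery is emitted.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch of "emit only when addId returns true"; not Flink code. */
class EmitOnceSketch {

    /** Assumed message shape: a unique ID plus a payload. */
    static final class Message {
        final String id;
        final String data;

        Message(String id, String data) {
            this.id = id;
            this.data = data;
        }
    }

    /** IDs already handed downstream; stands in for the base class's ID tracking. */
    private final Set<String> seenIds = new HashSet<>();

    /** Stand-in for addId: true only the first time an ID is seen. */
    boolean addId(String id) {
        return seenIds.add(id);
    }

    /** Walks the deliveries in order and collects only payloads whose ID is new. */
    List<String> emitAll(List<Message> deliveries) {
        List<String> emitted = new ArrayList<>();
        for (Message msg : deliveries) {
            if (addId(msg.id)) {
                emitted.add(msg.data);
            }
            // A redelivered message fails the check and is skipped.
        }
        return emitted;
    }
}
```

Skipping the record when the check fails is what keeps a redelivered message from being processed twice downstream.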
-
snapshotState
public void snapshotState(org.apache.flink.runtime.state.FunctionSnapshotContext context) throws Exception
Deprecated. Description copied from interface: CheckpointedFunction
This method is called when a snapshot for a checkpoint is requested. This acts as a hook to the function to ensure that all state is exposed by means previously offered through FunctionInitializationContext when the Function was initialized, or offered now by FunctionSnapshotContext itself.
- Specified by:
snapshotState in interface CheckpointedFunction
- Parameters:
context - the context for drawing a snapshot of the operator
- Throws:
Exception - Thrown, if state could not be created or restored.
-
notifyCheckpointComplete
public void notifyCheckpointComplete(long checkpointId) throws Exception
Deprecated.
- Specified by:
notifyCheckpointComplete in interface org.apache.flink.api.common.state.CheckpointListener
- Throws:
Exception
-
notifyCheckpointAborted
public void notifyCheckpointAborted(long checkpointId)
Deprecated.
- Specified by:
notifyCheckpointAborted in interface org.apache.flink.api.common.state.CheckpointListener
-
-