Class TwoPhaseCommitSinkFunction<IN,TXN,CONTEXT>
- java.lang.Object
-
- org.apache.flink.api.common.functions.AbstractRichFunction
-
- org.apache.flink.streaming.api.functions.sink.RichSinkFunction<IN>
-
- org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction<IN,TXN,CONTEXT>
-
- Type Parameters:
IN - Input type for SinkFunction.
TXN - Transaction to store all of the information required to handle a transaction.
CONTEXT - Context that will be shared across all invocations for the given TwoPhaseCommitSinkFunction instance. Context is created once
- All Implemented Interfaces:
Serializable, org.apache.flink.api.common.functions.Function, org.apache.flink.api.common.functions.RichFunction, org.apache.flink.api.common.state.CheckpointListener, CheckpointedFunction, SinkFunction<IN>
@PublicEvolving public abstract class TwoPhaseCommitSinkFunction<IN,TXN,CONTEXT> extends RichSinkFunction<IN> implements CheckpointedFunction, org.apache.flink.api.common.state.CheckpointListener
This is the recommended base class for all SinkFunction implementations that intend to provide exactly-once semantics. It does so by implementing the two-phase commit algorithm on top of CheckpointedFunction and CheckpointListener. Users should provide a custom TXN (transaction handle) and implement the abstract methods that handle this transaction handle.
- See Also:
- Serialized Form
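The interplay between checkpoints and transactions can be illustrated with a toy model. The sketch below is plain Java, not Flink code: the class ToyTwoPhaseCommitSink, its Txn handle, and the in-memory committed list are all hypothetical names introduced for illustration. It assumes that a snapshot pre-commits the open transaction and starts a new one, and that a completed checkpoint commits every pending transaction up to that checkpoint id.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of the two-phase commit lifecycle; hypothetical, not Flink code. */
class ToyTwoPhaseCommitSink {
    /** Hypothetical transaction handle: records buffered but not yet visible. */
    static final class Txn {
        final List<String> buffer = new ArrayList<>();
        boolean preCommitted;
    }

    /** Stands in for the external system; only committed records appear here. */
    final List<String> committed = new ArrayList<>();

    /** Pre-committed transactions keyed by checkpoint id, in creation order. */
    final Map<Long, Txn> pendingCommit = new LinkedHashMap<>();

    /** The currently open transaction (the role of beginTransaction()). */
    Txn current = new Txn();

    /** Role of invoke(TXN, IN, Context): write into the open transaction. */
    void write(String value) {
        current.buffer.add(value);
    }

    /** Role of snapshotState: pre-commit the open transaction, then begin a new one. */
    void snapshot(long checkpointId) {
        current.preCommitted = true; // role of preCommit(), e.g. flushing
        pendingCommit.put(checkpointId, current);
        current = new Txn();
    }

    /** Role of notifyCheckpointComplete: commit all transactions up to this checkpoint. */
    void checkpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, Txn>> it = pendingCommit.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Txn> entry = it.next();
            if (entry.getKey() > checkpointId) {
                continue; // belongs to a later checkpoint
            }
            committed.addAll(entry.getValue().buffer); // role of commit()
            it.remove();
        }
    }
}
```

In this model, records written after a snapshot stay invisible until a later checkpoint completes, which is the property that exactly-once delivery relies on.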
-
-
Nested Class Summary
Nested Classes (Modifier and Type, Class, Description)
static class TwoPhaseCommitSinkFunction.State<TXN,CONTEXT> - State POJO class coupling pendingTransaction, context and pendingCommitTransactions.
static class TwoPhaseCommitSinkFunction.StateSerializer<TXN,CONTEXT> - Custom TypeSerializer for the sink state.
static class TwoPhaseCommitSinkFunction.StateSerializerSnapshot<TXN,CONTEXT> - Snapshot for the TwoPhaseCommitSinkFunction.StateSerializer.
static class TwoPhaseCommitSinkFunction.TransactionHolder<TXN> - Adds metadata (currently only the start time of the transaction) to the transaction object.
-
Nested classes/interfaces inherited from interface org.apache.flink.streaming.api.functions.sink.SinkFunction
SinkFunction.Context
-
-
Field Summary
Fields (Modifier and Type, Field)
protected LinkedHashMap<Long,TwoPhaseCommitSinkFunction.TransactionHolder<TXN>> pendingCommitTransactions
protected org.apache.flink.api.common.state.ListState<TwoPhaseCommitSinkFunction.State<TXN,CONTEXT>> state
protected Optional<CONTEXT> userContext
-
Constructor Summary
Constructors (Constructor, Description)
TwoPhaseCommitSinkFunction(org.apache.flink.api.common.typeutils.TypeSerializer<TXN> transactionSerializer, org.apache.flink.api.common.typeutils.TypeSerializer<CONTEXT> contextSerializer) - Use default ListStateDescriptor for internal state serialization.
-
Method Summary
Methods (Modifier and Type, Method, Description)
protected abstract void abort(TXN transaction) - Abort a transaction.
protected abstract TXN beginTransaction() - Method that starts a new transaction.
void close()
protected abstract void commit(TXN transaction) - Commit a pre-committed transaction.
protected TXN currentTransaction()
protected TwoPhaseCommitSinkFunction<IN,TXN,CONTEXT> enableTransactionTimeoutWarnings(double warningRatio) - Enables logging of warnings if a transaction's elapsed time reaches a specified ratio of the transactionTimeout.
void finish() - This method is called at the end of data processing.
protected void finishProcessing(TXN transaction) - This method is called at the end of data processing.
protected void finishRecoveringContext(Collection<TXN> handledTransactions) - Callback for subclasses which is called after restoring (each) user context.
protected Optional<CONTEXT> getUserContext()
protected TwoPhaseCommitSinkFunction<IN,TXN,CONTEXT> ignoreFailuresAfterTransactionTimeout() - If called, the sink will only log but not propagate exceptions thrown in recoverAndCommit(Object) if the transaction is older than a specified transaction timeout.
void initializeState(org.apache.flink.runtime.state.FunctionInitializationContext context) - This method is called when the parallel function instance is created during distributed execution.
protected Optional<CONTEXT> initializeUserContext()
void invoke(IN value) - This should not be implemented by subclasses.
void invoke(IN value, SinkFunction.Context context) - Writes the given value to the sink.
protected abstract void invoke(TXN transaction, IN value, SinkFunction.Context context) - Write value within a transaction.
void notifyCheckpointAborted(long checkpointId)
void notifyCheckpointComplete(long checkpointId)
protected java.util.stream.Stream<Map.Entry<Long,TXN>> pendingTransactions()
protected abstract void preCommit(TXN transaction) - Pre commit previously created transaction.
protected void recoverAndAbort(TXN transaction) - Abort a transaction that was rejected by a coordinator after a failure.
protected void recoverAndCommit(TXN transaction) - Invoked on recovered transactions after a failure.
protected TwoPhaseCommitSinkFunction<IN,TXN,CONTEXT> setTransactionTimeout(long transactionTimeout) - Sets the transaction timeout.
void snapshotState(org.apache.flink.runtime.state.FunctionSnapshotContext context) - This method is called when a snapshot for a checkpoint is requested.
-
Methods inherited from class org.apache.flink.api.common.functions.AbstractRichFunction
getIterationRuntimeContext, getRuntimeContext, open, setRuntimeContext
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface org.apache.flink.streaming.api.functions.sink.SinkFunction
writeWatermark
-
-
-
-
Field Detail
-
pendingCommitTransactions
protected final LinkedHashMap<Long,TwoPhaseCommitSinkFunction.TransactionHolder<TXN>> pendingCommitTransactions
-
state
protected transient org.apache.flink.api.common.state.ListState<TwoPhaseCommitSinkFunction.State<TXN,CONTEXT>> state
-
-
Constructor Detail
-
TwoPhaseCommitSinkFunction
public TwoPhaseCommitSinkFunction(org.apache.flink.api.common.typeutils.TypeSerializer<TXN> transactionSerializer, org.apache.flink.api.common.typeutils.TypeSerializer<CONTEXT> contextSerializer)
Use default ListStateDescriptor for internal state serialization. Helpful utilities for using this constructor are TypeInformation.of(Class), TypeHint and TypeInformation.of(TypeHint). Example: TwoPhaseCommitSinkFunction(TypeInformation.of(new TypeHint<State<TXN, CONTEXT>>() {}));
- Parameters:
transactionSerializer - TypeSerializer for the transaction type of this sink
contextSerializer - TypeSerializer for the context type of this sink
-
-
Method Detail
-
pendingTransactions
@Nonnull protected java.util.stream.Stream<Map.Entry<Long,TXN>> pendingTransactions()
-
invoke
protected abstract void invoke(TXN transaction, IN value, SinkFunction.Context context) throws Exception
Write value within a transaction.
- Throws:
Exception
-
beginTransaction
protected abstract TXN beginTransaction() throws Exception
Method that starts a new transaction.
- Returns:
- newly created transaction.
- Throws:
Exception
-
preCommit
protected abstract void preCommit(TXN transaction) throws Exception
Pre-commits a previously created transaction. Pre-commit must take all of the steps necessary to prepare the transaction for a commit that might happen in the future. After this point the transaction might still be aborted, but the underlying implementation must ensure that commit calls on already pre-committed transactions will always succeed. Usually the implementation involves flushing the data.
- Throws:
Exception
-
commit
protected abstract void commit(TXN transaction)
Commit a pre-committed transaction. If this method fails, the Flink application will be restarted and recoverAndCommit(Object) will be called again for the same transaction.
-
recoverAndCommit
protected void recoverAndCommit(TXN transaction)
Invoked on recovered transactions after a failure. The user implementation must ensure that this call will eventually succeed. If it fails, the Flink application will be restarted and the method will be invoked again. If it never succeeds, data loss will occur. Transactions are recovered in the order in which they were created.
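Because a commit may be replayed after a restart, it helps if committing the same transaction twice is harmless. The following is a minimal sketch of one way to get that property, assuming the external system can remember which transaction ids it has already applied. IdempotentStore and its string transaction ids are hypothetical and not part of Flink.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical external store whose commit is safe to replay. */
class IdempotentStore {
    /** Ids of transactions that have already been applied. */
    private final Set<String> committedIds = new HashSet<>();

    /** Records visible to readers of the store. */
    final List<String> visible = new ArrayList<>();

    /** Publishes the records once per transaction id; a replayed commit is a no-op. */
    void commit(String txnId, List<String> records) {
        if (!committedIds.add(txnId)) {
            return; // an earlier attempt already committed this transaction
        }
        visible.addAll(records);
    }
}
```

A recoverAndCommit implementation built on such a store can simply call commit again for every recovered transaction without duplicating output.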
-
abort
protected abstract void abort(TXN transaction)
Abort a transaction.
-
recoverAndAbort
protected void recoverAndAbort(TXN transaction)
Abort a transaction that was rejected by a coordinator after a failure.
-
finishRecoveringContext
protected void finishRecoveringContext(Collection<TXN> handledTransactions)
Callback for subclasses which is called after restoring (each) user context.
- Parameters:
handledTransactions - transactions which were already committed or aborted and do not need further handling
-
finishProcessing
protected void finishProcessing(@Nullable TXN transaction)
This method is called at the end of data processing.
The method is expected to flush all remaining buffered data. Exceptions will cause the pipeline to be recognized as failed, because the last data items are not processed properly. You may use this method to flush remaining buffered elements in the state into the current transaction, which will be committed in the last checkpoint.
-
invoke
public final void invoke(IN value) throws Exception
This should not be implemented by subclasses.
- Specified by:
invoke in interface SinkFunction<IN>
- Throws:
Exception
-
invoke
public final void invoke(IN value, SinkFunction.Context context) throws Exception
Description copied from interface: SinkFunction
Writes the given value to the sink. This function is called for every record.
You have to override this method when implementing a SinkFunction; this is a default method only for backward compatibility with the old-style method.
- Specified by:
invoke in interface SinkFunction<IN>
- Parameters:
value - The input record.
context - Additional context about the input record.
- Throws:
Exception - This method may throw exceptions. Throwing an exception will cause the operation to fail and may trigger recovery.
-
finish
public final void finish() throws Exception
Description copied from interface: SinkFunction
This method is called at the end of data processing.
The method is expected to flush all remaining buffered data. Exceptions will cause the pipeline to be recognized as failed, because the last data items are not processed properly. You may use this method to flush remaining buffered elements in the state into transactions which you can commit in the last checkpoint.
NOTE: This method does not need to close any resources. You should release external resources in the AbstractRichFunction.close() method.
- Specified by:
finish in interface SinkFunction<IN>
- Throws:
Exception - This method may throw exceptions. Throwing an exception will cause the operation to fail and may trigger recovery.
-
notifyCheckpointComplete
public final void notifyCheckpointComplete(long checkpointId) throws Exception
- Specified by:
notifyCheckpointComplete in interface org.apache.flink.api.common.state.CheckpointListener
- Throws:
Exception
-
notifyCheckpointAborted
public void notifyCheckpointAborted(long checkpointId)
- Specified by:
notifyCheckpointAborted in interface org.apache.flink.api.common.state.CheckpointListener
-
snapshotState
public void snapshotState(org.apache.flink.runtime.state.FunctionSnapshotContext context) throws Exception
Description copied from interface: CheckpointedFunction
This method is called when a snapshot for a checkpoint is requested. This acts as a hook to the function to ensure that all state is exposed by means previously offered through FunctionInitializationContext when the Function was initialized, or offered now by FunctionSnapshotContext itself.
- Specified by:
snapshotState in interface CheckpointedFunction
- Parameters:
context - the context for drawing a snapshot of the operator
- Throws:
Exception - Thrown if state could not be created or restored.
-
initializeState
public void initializeState(org.apache.flink.runtime.state.FunctionInitializationContext context) throws Exception
Description copied from interface: CheckpointedFunction
This method is called when the parallel function instance is created during distributed execution. Functions typically set up their state storing data structures in this method.
- Specified by:
initializeState in interface CheckpointedFunction
- Parameters:
context - the context for initializing the operator
- Throws:
Exception - Thrown if state could not be created or restored.
-
close
public void close() throws Exception
- Specified by:
close in interface org.apache.flink.api.common.functions.RichFunction
- Overrides:
close in class org.apache.flink.api.common.functions.AbstractRichFunction
- Throws:
Exception
-
setTransactionTimeout
protected TwoPhaseCommitSinkFunction<IN,TXN,CONTEXT> setTransactionTimeout(long transactionTimeout)
Sets the transaction timeout. Setting only the transaction timeout has no effect in itself.
- Parameters:
transactionTimeout - The transaction timeout in ms.
- See Also:
ignoreFailuresAfterTransactionTimeout(), enableTransactionTimeoutWarnings(double)
-
ignoreFailuresAfterTransactionTimeout
protected TwoPhaseCommitSinkFunction<IN,TXN,CONTEXT> ignoreFailuresAfterTransactionTimeout()
If called, the sink will only log but not propagate exceptions thrown in recoverAndCommit(Object) if the transaction is older than a specified transaction timeout. The start time of a transaction is determined by System.currentTimeMillis(). By default, failures are propagated.
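The decision described above can be pictured as a small predicate. This is only a sketch of the documented behavior, not Flink's implementation: RecoveryFailurePolicy and shouldOnlyLog are hypothetical names, and "older than" is assumed here to mean that the elapsed time strictly exceeds the timeout.

```java
/** Hypothetical helper illustrating when a recovery failure would only be logged. */
class RecoveryFailurePolicy {
    /**
     * Returns true if a failure during recovery should be logged instead of rethrown.
     * Assumption: a transaction counts as timed out once its age exceeds timeoutMs.
     */
    static boolean shouldOnlyLog(
            boolean ignoreFailuresAfterTimeout, long txnStartMs, long nowMs, long timeoutMs) {
        long elapsedMs = nowMs - txnStartMs;
        return ignoreFailuresAfterTimeout && elapsedMs > timeoutMs;
    }
}
```

With the flag unset the predicate is always false, which matches the default of propagating failures.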
-
enableTransactionTimeoutWarnings
protected TwoPhaseCommitSinkFunction<IN,TXN,CONTEXT> enableTransactionTimeoutWarnings(double warningRatio)
Enables logging of warnings if a transaction's elapsed time reaches a specified ratio of the transactionTimeout. If warningRatio is 0, a warning will always be logged when committing the transaction.
- Parameters:
warningRatio - A value in the range [0,1].
- Returns:
-
-
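The warning ratio for enableTransactionTimeoutWarnings can be read as a threshold on elapsed time: with a timeout of 1000 ms and a ratio of 0.5, a warning would be expected once a transaction has been open for about 500 ms. The sketch below shows that arithmetic only. TimeoutWarning and shouldWarn are hypothetical names, and the inclusive comparison is an assumption; the exact boundary behavior is not stated in this page.

```java
/** Hypothetical helper showing the warning-threshold arithmetic. */
class TimeoutWarning {
    /**
     * Returns true once elapsedMs has reached warningRatio * transactionTimeoutMs.
     * Assumption: the comparison is inclusive (>=).
     */
    static boolean shouldWarn(long elapsedMs, long transactionTimeoutMs, double warningRatio) {
        if (warningRatio < 0.0 || warningRatio > 1.0) {
            throw new IllegalArgumentException("warningRatio must be in the range [0,1]");
        }
        return elapsedMs >= transactionTimeoutMs * warningRatio;
    }
}
```

A ratio of 0 makes the threshold zero, so every commit would trigger the warning, in line with the description above.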