Interface RecoverableWriter
-
- All Known Implementing Classes:
LocalRecoverableWriter
@PublicEvolving public interface RecoverableWriter
The RecoverableWriter creates and recovers RecoverableFsDataOutputStream. It can be used to write data to a file system in a way that the writing can be resumed consistently after a failure and recovery without loss of data or possible duplication of bytes.
The streams do not make the files they write to immediately visible, but instead write to temp files or other temporary storage. To publish the data atomically in the end, the stream offers the RecoverableFsDataOutputStream.closeForCommit() method to create a committer that publishes the result.
These writers are useful in the context of checkpointing. The example below illustrates how to use them:
    // --------- initial run --------
    RecoverableWriter writer = fileSystem.createRecoverableWriter();
    RecoverableFsDataOutputStream out = writer.open(path);
    out.write(...);

    // persist intermediate state
    ResumeRecoverable intermediateState = out.persist();
    storeInCheckpoint(intermediateState);

    // --------- recovery --------
    ResumeRecoverable lastCheckpointState = ...; // get state from checkpoint
    RecoverableWriter writer = fileSystem.createRecoverableWriter();
    RecoverableFsDataOutputStream out = writer.recover(lastCheckpointState);

    out.write(...); // append more data

    out.closeForCommit().commit(); // close stream and publish all the data

    // --------- recovery without appending --------
    ResumeRecoverable lastCheckpointState = ...; // get state from checkpoint
    RecoverableWriter writer = fileSystem.createRecoverableWriter();
    Committer committer = writer.recoverForCommit(lastCheckpointState);

    committer.commit(); // publish the state as of the last checkpoint
Recovery
Recovery relies on data persistence in the target file system or object store. While the code itself works with the specific primitives that the target storage offers, recovery will fail if the data written so far was deleted by an external factor. For example, some implementations stage data in temp files or object parts. If these were deleted manually or by an automated cleanup policy, resuming may fail. This is expected behavior, but it is worth stating explicitly.
Specific care is needed for systems like S3, where the implementation uses Multipart Uploads to incrementally upload and persist parts of the result. Timeouts for Multipart Uploads and the lifetime of Parts in unfinished Multipart Uploads need to be set high enough in the bucket policy to accommodate the recovery. These values are typically on the order of days, so regular recovery is usually not a problem. Problems can arise when a Flink application is hard killed (all processes or containers removed) and someone then tries to manually recover the application from an externalized checkpoint some days later. In that case, systems like S3 may have removed uncommitted parts and recovery will not succeed.
Implementer's Note
From the perspective of the implementer, it would be desirable to make this class generic with respect to the concrete types of 'CommitRecoverable' and 'ResumeRecoverable'. However, we found that this makes the code more clumsy to use and we hence dropped the generics at the cost of doing some explicit casts in the implementation that would otherwise have been implicitly generated by the generics compiler.
-
-
Nested Class Summary
Nested Classes
Modifier and Type | Interface | Description
static interface | RecoverableWriter.CommitRecoverable | A handle to an in-progress stream with a defined and persistent amount of data.
static interface | RecoverableWriter.ResumeRecoverable | A handle to an in-progress stream with a defined and persistent amount of data.
-
Method Summary
Modifier and Type | Method | Description
boolean | cleanupRecoverableState(RecoverableWriter.ResumeRecoverable resumable) | Frees up any resources that were previously occupied in order to be able to recover from a (potential) failure.
SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> | getCommitRecoverableSerializer() | The serializer for the CommitRecoverable types created in this writer.
SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> | getResumeRecoverableSerializer() | The serializer for the ResumeRecoverable types created in this writer.
RecoverableFsDataOutputStream | open(Path path) | Opens a new recoverable stream to write to the given path.
RecoverableFsDataOutputStream | recover(RecoverableWriter.ResumeRecoverable resumable) | Resumes a recoverable stream consistently at the point indicated by the given ResumeRecoverable.
RecoverableFsDataOutputStream.Committer | recoverForCommit(RecoverableWriter.CommitRecoverable resumable) | Recovers a recoverable stream consistently at the point indicated by the given CommitRecoverable for finalizing and committing.
boolean | requiresCleanupOfRecoverableState() | Indicates whether the writer needs to clean up or free resources occupied as part of a RecoverableWriter.ResumeRecoverable, e.g. temporary files created or objects uploaded to external systems.
boolean | supportsResume() | Checks whether the writer and its streams support resuming (appending to) files after recovery (via the recover(ResumeRecoverable) method).
-
-
-
Method Detail
-
open
RecoverableFsDataOutputStream open(Path path) throws IOException
Opens a new recoverable stream to write to the given path. Whether existing files will be overwritten is implementation specific and should not be relied upon.
- Parameters:
path - The path of the file/object to write to.
- Returns:
A new RecoverableFsDataOutputStream writing a new file/object.
- Throws:
IOException - Thrown if the stream could not be opened/initialized.
-
recover
RecoverableFsDataOutputStream recover(RecoverableWriter.ResumeRecoverable resumable) throws IOException
Resumes a recoverable stream consistently at the point indicated by the given ResumeRecoverable. Future writes to the stream will continue / append the file as of that point.
This method is optional and whether it is supported is indicated through the supportsResume() method.
- Parameters:
resumable - The opaque handle with the recovery information.
- Returns:
A recoverable stream writing to the file/object as it was at the point when the ResumeRecoverable was created.
- Throws:
IOException - Thrown if resuming fails.
UnsupportedOperationException - Thrown if this optional method is not supported.
-
requiresCleanupOfRecoverableState
boolean requiresCleanupOfRecoverableState()
Indicates whether the writer needs to clean up or free resources occupied as part of a RecoverableWriter.ResumeRecoverable, e.g. temporary files created or objects uploaded to external systems.
If cleanup is required, then cleanupRecoverableState(ResumeRecoverable) should be called.
- Returns:
true if cleanup is required, false otherwise.
-
cleanupRecoverableState
boolean cleanupRecoverableState(RecoverableWriter.ResumeRecoverable resumable) throws IOException
Frees up any resources that were previously occupied in order to be able to recover from a (potential) failure. These can be temporary files that were written to the filesystem or objects that were uploaded to S3.
NOTE: This operation should not throw an exception, but return false if the cleanup did not happen for any reason.
- Parameters:
resumable - The RecoverableWriter.ResumeRecoverable whose state we want to clean up.
- Returns:
true if the resources were successfully freed, false otherwise (e.g. the file to be deleted was not there for any reason: already deleted or never created).
- Throws:
IOException
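The interplay of requiresCleanupOfRecoverableState() and cleanupRecoverableState(ResumeRecoverable) can be sketched as follows. This is a minimal illustration, not code from Flink: the nested interfaces are simplified stand-ins that mirror only the two signatures documented above so the sketch compiles without Flink on the classpath, and the class name CleanupSketch and the helper cleanupIfRequired are hypothetical.

```java
import java.io.IOException;

/** Sketch only: the nested interfaces are simplified stand-ins for the Flink types. */
final class CleanupSketch {

    /** Stand-in for RecoverableWriter.ResumeRecoverable (an opaque handle). */
    interface ResumeRecoverable {}

    /** Stand-in exposing only the two cleanup-related methods documented above. */
    interface RecoverableWriter {
        boolean requiresCleanupOfRecoverableState();

        boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException;
    }

    /**
     * Hypothetical helper: releases the state behind a handle that is no longer needed.
     * Returns true if no cleanup was required or the cleanup succeeded.
     */
    static boolean cleanupIfRequired(RecoverableWriter writer, ResumeRecoverable obsolete) {
        if (!writer.requiresCleanupOfRecoverableState()) {
            return true; // the writer holds no extra resources for this handle
        }
        try {
            return writer.cleanupRecoverableState(obsolete);
        } catch (IOException e) {
            // The documentation asks implementations to return false rather than throw,
            // but the signature still declares IOException, so it is handled defensively.
            return false;
        }
    }
}
```

Treating a failed cleanup as a boolean result rather than a fatal error matches the NOTE above; whether a caller should retry or only log the failure depends on the application.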
-
recoverForCommit
RecoverableFsDataOutputStream.Committer recoverForCommit(RecoverableWriter.CommitRecoverable resumable) throws IOException
Recovers a recoverable stream consistently at the point indicated by the given CommitRecoverable for finalizing and committing. This will publish the target file with exactly the data that was written up to the point when the CommitRecoverable was created.
- Parameters:
resumable - The opaque handle with the recovery information.
- Returns:
A committer that publishes the target file.
- Throws:
IOException - Thrown if recovery fails.
-
getCommitRecoverableSerializer
SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> getCommitRecoverableSerializer()
The serializer for the CommitRecoverable types created in this writer. This serializer should be used to store the CommitRecoverable in checkpoint state or other forms of persistent state.
-
getResumeRecoverableSerializer
SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> getResumeRecoverableSerializer()
The serializer for the ResumeRecoverable types created in this writer. This serializer should be used to store the ResumeRecoverable in checkpoint state or other forms of persistent state.
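As an illustration of how such a serializer might be used to persist a handle, the sketch below stores the serializer version next to the serialized bytes and reads both back on restore. The nested SimpleVersionedSerializer is a simplified stand-in assumed to mirror Flink's interface of the same name (getVersion, serialize, deserialize); the class SerializerSketch, the helper names, and the version/length/payload layout are hypothetical choices for this example, not a format Flink prescribes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/** Sketch only: a stand-in serializer interface plus a version-prefixed byte layout. */
final class SerializerSketch {

    /** Stand-in assumed to mirror Flink's SimpleVersionedSerializer. */
    interface SimpleVersionedSerializer<E> {
        int getVersion();

        byte[] serialize(E obj) throws IOException;

        E deserialize(int version, byte[] serialized) throws IOException;
    }

    /** Writes version, payload length, and payload so the handle can be decoded later. */
    static <E> byte[] toCheckpointBytes(SimpleVersionedSerializer<E> serializer, E recoverable)
            throws IOException {
        byte[] payload = serializer.serialize(recoverable);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(serializer.getVersion());
            out.writeInt(payload.length);
            out.write(payload);
        }
        return bytes.toByteArray();
    }

    /** Reads the layout written above and hands the stored version to the serializer. */
    static <E> E fromCheckpointBytes(SimpleVersionedSerializer<E> serializer, byte[] stored)
            throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(stored))) {
            int version = in.readInt();
            byte[] payload = new byte[in.readInt()];
            in.readFully(payload);
            return serializer.deserialize(version, payload);
        }
    }
}
```

Passing the stored version back into deserialize lets a newer serializer still decode handles written by an older one, which is the reason for keeping the version alongside the bytes.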
-
supportsResume
boolean supportsResume()
Checks whether the writer and its streams support resuming (appending to) files after recovery (via the recover(ResumeRecoverable) method).
If true, then this writer supports the recover(ResumeRecoverable) method. If false, then that method may not be supported and streams can only be recovered via recoverForCommit(CommitRecoverable).
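A caller that wants to handle both kinds of writers might branch on supportsResume() as sketched below. The nested interfaces are simplified stand-ins for the Flink types, reduced to the methods discussed here. The sketch assumes that ResumeRecoverable is a subtype of CommitRecoverable, as the class-level example (which passes a ResumeRecoverable to recoverForCommit) implies. The class ResumeOrCommitSketch and the helper resumeOrCommit are hypothetical names.

```java
import java.io.IOException;
import java.util.Optional;

/** Sketch only: the nested interfaces are simplified stand-ins for the Flink types. */
final class ResumeOrCommitSketch {

    interface CommitRecoverable {}

    /** Assumption: a resume handle can also be used for committing. */
    interface ResumeRecoverable extends CommitRecoverable {}

    interface Committer {
        void commit() throws IOException;
    }

    /** Stand-in for the recoverable stream; write methods omitted for brevity. */
    interface RecoverableFsDataOutputStream {}

    interface RecoverableWriter {
        boolean supportsResume();

        RecoverableFsDataOutputStream recover(ResumeRecoverable resumable) throws IOException;

        Committer recoverForCommit(CommitRecoverable resumable) throws IOException;
    }

    /**
     * Hypothetical helper: returns a stream to keep appending to if the writer supports
     * resuming; otherwise publishes the file as of the handle and returns empty.
     */
    static Optional<RecoverableFsDataOutputStream> resumeOrCommit(
            RecoverableWriter writer, ResumeRecoverable handle) throws IOException {
        if (writer.supportsResume()) {
            return Optional.of(writer.recover(handle));
        }
        // recover(...) may throw UnsupportedOperationException here, so commit instead.
        writer.recoverForCommit(handle).commit();
        return Optional.empty();
    }
}
```

When the result is empty, the data up to the handle has been published and any further output would have to go to a new file opened with open(Path).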
-
-