public class LifecycleManager
extends kafka.utils.ShutdownableThread
Modifier and Type | Class and Description |
---|---|
static class | LifecycleManager.BlobMetadata |
static class | LifecycleManager.DeletionCounters |
static class | LifecycleManager.ReductionInRetention: Holds information about a topic whose retention.ms config has been changed to a lower value. |
Nested classes/interfaces inherited from class java.lang.Thread: Thread.State, Thread.UncaughtExceptionHandler
Modifier and Type | Field and Description |
---|---|
static String | DATE_PATTERN |
static int | DEFAULT_CLM_RUN_FREQUENCY_IN_HOURS |
Fields inherited from class java.lang.Thread: MAX_PRIORITY, MIN_PRIORITY, NORM_PRIORITY
Constructor and Description |
---|
LifecycleManager(TierObjectStore tierObjectStore, LifecycleManagerConfig config, Supplier<Boolean> isTierTopicAvailable, Supplier<Boolean> canCLMRun, org.apache.kafka.common.utils.Time time, org.apache.kafka.common.metrics.Metrics metrics) |
Modifier and Type | Method and Description |
---|---|
LifecycleManagerState | cleanupAndCheckpoint(Long currTimeMs, LifecycleManagerState state, Map<NameAndId,Integer> topicToBackupRetentionInDays, Map<NameAndId,LifecycleManager.ReductionInRetention> retentionChanges, Map<String,List<String>> deletionListsCurrentOrPastDays, Map<String,Map<TopicIdPartition,List<LifecycleManager.BlobMetadata>>> dateToDeletableBlobs, Boolean currentDayBlobsAlreadyLoaded): Saves the state of the current run to the object store and cleans up older blob list files that are no longer needed. |
TierTopicReader | createTierTopicReader(List<Long> tierOffsets) |
Map<NameAndId,LifecycleManager.ReductionInRetention> | determineRetentionPeriodReductions(Long currentTime, LifecycleManagerState state, Map<NameAndId,Integer> backupRetentionInDays): Determines reductions to the retention of a topic's backup objects. |
void | doWork() |
Map<NameAndId,Integer> | getBackupRetentionInDaysForAllTopics(Map<NameAndId,Long> latestRetentionConfigs): Determines the backup object retention in days for each topic, given the topic's retention.ms configuration value. |
Map<NameAndId,Long> | getKafkaTopicRetentionMs(): Gets the retention.ms config for all non-internal topics in the Kafka cluster. |
void | initializeMetrics(): Initializes all metrics after every run. |
Boolean | isShuttingDownOrInterrupted() |
Optional<LifecycleManagerState> | lifecycleManagerState(Long currTimeMs): Gets the lifecycle manager state. |
void | manageLifecycleForBackedUpSegments(): Main workflow for lifecycle management of backed-up objects. |
void | manageLifecycleForTierTopicSnapshots(): Workflow for deleting tier topic snapshots based on the configured retention duration. |
Map<NameAndId,Long> | requestConfigsWithRetry(List<String> topics): Helper function to get the retention configuration for a list of topics. |
List<TierObjectStore.KeyAndVersion> | retrieveObjectsEligibleForDeletion(Long currTimeMs, Map<String,Map<TopicIdPartition,List<LifecycleManager.BlobMetadata>>> dateToDeletableBlobs): Iterates over the lists of blobs eligible for deletion and returns the ones past their deletion time. |
void | run() |
void | shutdown() |
void | waitForTierTopicToBeAvailable(): Waits for the tier topic manager to start. |
Methods inherited from class kafka.utils.ShutdownableThread: $lessinit$greater$default$2, awaitShutdown, debug, debug, error, error, fatal, fatal, heartbeat, info, info, initiateShutdown, isDebugEnabled, isInterruptible, isRunning, isShutdownComplete, isShutdownInitiated, isThreadFailed, isTraceEnabled, logger, loggerName, logIdent_$eq, logIdent, msgWithLogIdent, name, pause, trace, trace, warn, warn
Methods inherited from class java.lang.Thread: activeCount, checkAccess, clone, countStackFrames, currentThread, destroy, dumpStack, enumerate, getAllStackTraces, getContextClassLoader, getDefaultUncaughtExceptionHandler, getId, getName, getPriority, getStackTrace, getState, getThreadGroup, getUncaughtExceptionHandler, holdsLock, interrupt, interrupted, isAlive, isDaemon, isInterrupted, join, join, join, resume, setContextClassLoader, setDaemon, setDefaultUncaughtExceptionHandler, setName, setPriority, setUncaughtExceptionHandler, sleep, sleep, start, stop, stop, suspend, toString, yield
public static final int DEFAULT_CLM_RUN_FREQUENCY_IN_HOURS
public static final String DATE_PATTERN
public LifecycleManager(TierObjectStore tierObjectStore, LifecycleManagerConfig config, Supplier<Boolean> isTierTopicAvailable, Supplier<Boolean> canCLMRun, org.apache.kafka.common.utils.Time time, org.apache.kafka.common.metrics.Metrics metrics)
public void run()
public void shutdown()
Overrides:
shutdown in class kafka.utils.ShutdownableThread
public void doWork()
Specified by:
doWork in class kafka.utils.ShutdownableThread
public void manageLifecycleForTierTopicSnapshots()
public void manageLifecycleForBackedUpSegments()
public Optional<LifecycleManagerState> lifecycleManagerState(Long currTimeMs) throws InterruptedException, ParseException
Parameters:
currTimeMs - current timestamp to compare with the lastRunTimestamp from the buffer
Throws:
InterruptedException
ParseException
public LifecycleManagerState cleanupAndCheckpoint(Long currTimeMs, LifecycleManagerState state, Map<NameAndId,Integer> topicToBackupRetentionInDays, Map<NameAndId,LifecycleManager.ReductionInRetention> retentionChanges, Map<String,List<String>> deletionListsCurrentOrPastDays, Map<String,Map<TopicIdPartition,List<LifecycleManager.BlobMetadata>>> dateToDeletableBlobs, Boolean currentDayBlobsAlreadyLoaded) throws IOException, ParseException, InterruptedException
Parameters:
currTimeMs - timestamp in milliseconds representing the current time
state - LifecycleManagerState that was downloaded at the start of this run of LifecycleManager
topicToBackupRetentionInDays - mapping of topic name / id to the retention of the corresponding backup objects, in days
retentionChanges - mapping of topic name / id to any incremental change in its backup retention
deletionListsCurrentOrPastDays - mapping of date to the names of all deletion lists for that date. This map contains deletion lists for current or past dates only
dateToDeletableBlobs - lists of blobs to be deleted later today or in the future, sorted by date and topicIdPartition
currentDayBlobsAlreadyLoaded - boolean indicating whether the deletion lists for the current day have already been loaded
Throws:
IOException
ParseException
InterruptedException
public Boolean isShuttingDownOrInterrupted()
public List<TierObjectStore.KeyAndVersion> retrieveObjectsEligibleForDeletion(Long currTimeMs, Map<String,Map<TopicIdPartition,List<LifecycleManager.BlobMetadata>>> dateToDeletableBlobs) throws ParseException, InterruptedException
Parameters:
currTimeMs - current timestamp in milliseconds
dateToDeletableBlobs - lists of blobs to be deleted, sorted by date and topicIdPartition
Throws:
ParseException
InterruptedException
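The selection that retrieveObjectsEligibleForDeletion describes can be sketched roughly as below. This is an illustration under stated assumptions, not the class's implementation: the Blob class and its deletionTimeMs field are hypothetical stand-ins for LifecycleManager.BlobMetadata, partitions are plain strings rather than TopicIdPartition, and the date keys are assumed to be ISO dates (yyyy-MM-dd) in UTC because the actual value of DATE_PATTERN is not shown in this reference. The sketch also parses dates with java.time, so it does not throw ParseException as the real method does.

```java
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class EligibleBlobsSketch {

    /** Hypothetical stand-in for LifecycleManager.BlobMetadata. */
    public static final class Blob {
        final String key;
        final long deletionTimeMs; // assumed field: earliest time the blob may be deleted

        public Blob(String key, long deletionTimeMs) {
            this.key = key;
            this.deletionTimeMs = deletionTimeMs;
        }
    }

    /** Returns the keys of all blobs whose deletion time is at or before currTimeMs. */
    public static List<String> eligibleKeys(long currTimeMs,
                                            Map<String, Map<String, List<Blob>>> dateToDeletableBlobs) {
        List<String> eligible = new ArrayList<>();
        for (Map.Entry<String, Map<String, List<Blob>>> day : dateToDeletableBlobs.entrySet()) {
            // Assumed key format: ISO date, interpreted as the start of that day in UTC.
            long dayStartMs = LocalDate.parse(day.getKey())
                    .atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli();
            if (dayStartMs > currTimeMs) {
                continue; // the whole day lies in the future
            }
            for (List<Blob> blobs : day.getValue().values()) {
                for (Blob blob : blobs) {
                    if (blob.deletionTimeMs <= currTimeMs) {
                        eligible.add(blob.key);
                    }
                }
            }
        }
        return eligible;
    }

    /** Small demo: one past day with two blobs, one future day with one blob. */
    public static List<String> demo() {
        Map<String, Map<String, List<Blob>>> byDate = new TreeMap<>();
        Map<String, List<Blob>> past = new TreeMap<>();
        past.put("orders-0", Arrays.asList(new Blob("a", 1_000L), new Blob("b", 100_000_000L)));
        byDate.put("1970-01-01", past);
        Map<String, List<Blob>> future = new TreeMap<>();
        future.put("orders-0", Arrays.asList(new Blob("c", 0L)));
        byDate.put("1970-01-03", future);
        // Current time: 1970-01-02T01:00:00Z
        return eligibleKeys(90_000_000L, byDate);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In the demo, only blob a is returned: blob b has a deletion time later than the current time, and blob c sits under a date that has not started yet.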
public void initializeMetrics()
public Map<NameAndId,LifecycleManager.ReductionInRetention> determineRetentionPeriodReductions(Long currentTime, LifecycleManagerState state, Map<NameAndId,Integer> backupRetentionInDays) throws ParseException
Parameters:
currentTime - timestamp used as the time when the retention reduction was processed
state - LifecycleManagerState
backupRetentionInDays - mapping of topic name / id to the current retention (in days) for its backed-up blobs
Throws:
ParseException
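One plausible reading of determineRetentionPeriodReductions is a comparison between the retention recorded in the previous state and the current retention, keeping only topics whose value dropped. The sketch below illustrates that reading only. The Reduction class is a hypothetical stand-in for LifecycleManager.ReductionInRetention, topics are keyed by plain strings instead of NameAndId, and the previous retention is passed in as a map because the layout of LifecycleManagerState is not documented here.

```java
import java.util.Map;
import java.util.TreeMap;

public class RetentionReductionSketch {

    /** Hypothetical stand-in for LifecycleManager.ReductionInRetention. */
    public static final class Reduction {
        public final int previousDays;
        public final int currentDays;
        public final long processedAtMs; // time at which the reduction was detected

        public Reduction(int previousDays, int currentDays, long processedAtMs) {
            this.previousDays = previousDays;
            this.currentDays = currentDays;
            this.processedAtMs = processedAtMs;
        }
    }

    /** Returns an entry for every topic whose retention in days is now lower than before. */
    public static Map<String, Reduction> reductions(long currentTimeMs,
                                                    Map<String, Integer> previousDays,
                                                    Map<String, Integer> currentDays) {
        Map<String, Reduction> result = new TreeMap<>();
        for (Map.Entry<String, Integer> entry : currentDays.entrySet()) {
            Integer before = previousDays.get(entry.getKey());
            // Topics with no previous value (new topics) cannot have a reduction.
            if (before != null && entry.getValue() < before) {
                result.put(entry.getKey(), new Reduction(before, entry.getValue(), currentTimeMs));
            }
        }
        return result;
    }

    /** Small demo: only "orders" has its retention lowered (7 days to 3 days). */
    public static Map<String, Reduction> demo() {
        Map<String, Integer> previous = new TreeMap<>();
        previous.put("orders", 7);
        previous.put("logs", 3);
        previous.put("metrics", 5);
        Map<String, Integer> current = new TreeMap<>();
        current.put("orders", 3);
        current.put("logs", 3);
        current.put("metrics", 10);
        current.put("new-topic", 1);
        return reductions(1_000L, previous, current);
    }

    public static void main(String[] args) {
        System.out.println(demo().keySet());
    }
}
```

Unchanged and increased retention values are ignored in this sketch, on the assumption that only a reduction can make already backed-up blobs eligible for earlier deletion.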
public void waitForTierTopicToBeAvailable() throws InterruptedException
Throws:
InterruptedException
public Map<NameAndId,Long> getKafkaTopicRetentionMs() throws InterruptedException
Throws:
InterruptedException
public Map<NameAndId,Long> requestConfigsWithRetry(List<String> topics)
Parameters:
topics - list of topics whose configuration needs to be fetched
public Map<NameAndId,Integer> getBackupRetentionInDaysForAllTopics(Map<NameAndId,Long> latestRetentionConfigs)
Parameters:
latestRetentionConfigs - mapping of topic name / id to the retention.ms configuration value
public TierTopicReader createTierTopicReader(List<Long> tierOffsets) throws InterruptedException
Throws:
InterruptedException
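For getBackupRetentionInDaysForAllTopics, the conversion from a retention.ms value to a number of days might look like the sketch below. The rounding rule is an assumption: this reference does not say whether partial days are rounded up, rounded down, or padded. The sketch rounds up so that a blob is never retained for less time than retention.ms asks for, and it passes a negative retention.ms (which Kafka uses to mean no time limit) through as -1. Topic keys are plain strings here rather than NameAndId.

```java
import java.util.Map;
import java.util.TreeMap;

public class BackupRetentionDaysSketch {

    static final long MS_PER_DAY = 24L * 60 * 60 * 1000;

    /** Converts retention.ms to whole days, rounding partial days up (assumed rule). */
    public static int toDays(long retentionMs) {
        if (retentionMs < 0) {
            return -1; // retention.ms = -1 means no time limit; kept as -1 in this sketch
        }
        long days = retentionMs / MS_PER_DAY + (retentionMs % MS_PER_DAY == 0 ? 0 : 1);
        return (int) Math.min(Integer.MAX_VALUE, days);
    }

    /** Applies toDays to every topic in the map. */
    public static Map<String, Integer> toDaysForAll(Map<String, Long> latestRetentionConfigs) {
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, Long> entry : latestRetentionConfigs.entrySet()) {
            result.put(entry.getKey(), toDays(entry.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> configs = new TreeMap<>();
        configs.put("orders", 604_800_000L); // 7 days
        configs.put("logs", 86_400_001L);    // just over 1 day
        System.out.println(toDaysForAll(configs));
    }
}
```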