public class LifecycleManager
extends kafka.utils.ShutdownableThread
implements kafka.server.BrokerReconfigurable
Modifier and Type | Class and Description |
---|---|
static class |
LifecycleManager.BlobMetadata |
static class |
LifecycleManager.ReductionInRetention
Holds information about a topic whose retention.ms config has been changed to a lower value.
|
Thread.State, Thread.UncaughtExceptionHandler
Modifier and Type | Field and Description |
---|---|
static String |
DATE_PATTERN |
static int |
DEFAULT_CLM_RUN_FREQUENCY_IN_HOURS |
static scala.collection.Set<String> |
reconfigurableConfigs |
MAX_PRIORITY, MIN_PRIORITY, NORM_PRIORITY
Constructor and Description |
---|
LifecycleManager(TierObjectStore tierObjectStore,
LifecycleManagerConfig config,
Supplier<Boolean> isTierTopicAvailable,
Supplier<Boolean> canCLMRun,
org.apache.kafka.common.utils.Time time,
org.apache.kafka.common.metrics.Metrics metrics) |
Modifier and Type | Method and Description |
---|---|
TierTopicReader |
createTierTopicReader(List<Long> tierOffsets) |
void |
doOneCLMRun()
Main workflow for the backed up object lifecycle management.
|
void |
doWork() |
Map<String,LifecycleManager.ReductionInRetention> |
getAnyRetentionPeriodReductions(Long currentTime,
LifecycleManagerState state,
Map<String,Integer> backupRetentionInDays)
This method determines reduction to the backup objects' retention for a topic.
|
static Map<String,Integer> |
getBackupRetentionInDaysForAllTopics(Map<String,Long> latestRetentionConfigs)
Determines the backup object retention in days for a topic, given the topic's retention.ms configuration value
|
Map<String,Long> |
getKafkaTopicRetentionMs()
Gets retention ms config for all the non-internal topics in the Kafka cluster
|
Optional<LifecycleManagerState> |
getLifecycleManagerState(Long currTimeMs)
Gets the lifecycle Manager state.
|
boolean |
isLifecycleManagerActive() |
scala.collection.Set<String> |
reconfigurableConfigs() |
void |
reconfigure(kafka.server.KafkaConfig oldConfig,
kafka.server.KafkaConfig newConfig) |
List<TierObjectStore.KeyAndVersion> |
retrieveObjectsEligibleForDeletion(Long currTimeMs,
Map<String,Map<TopicIdPartition,List<LifecycleManager.BlobMetadata>>> dateToDeletableBlobs)
Iterate over the lists of blobs eligible for deletion and return the ones past their deletion time.
|
void |
run() |
void |
shutdown() |
void |
validateReconfiguration(kafka.server.KafkaConfig newConfig) |
void |
waitForTierTopicToBeAvailable()
Waits for the tier topic manager to start.
|
$lessinit$greater$default$2, awaitShutdown, debug, debug, error, error, fatal, fatal, heartbeat, info, info, initiateShutdown, isDebugEnabled, isInterruptible, isRunning, isShutdownComplete, isShutdownInitiated, isThreadFailed, isTraceEnabled, logger, loggerName, logIdent_$eq, logIdent, msgWithLogIdent, name, pause, trace, trace, warn, warn
activeCount, checkAccess, clone, countStackFrames, currentThread, destroy, dumpStack, enumerate, getAllStackTraces, getContextClassLoader, getDefaultUncaughtExceptionHandler, getId, getName, getPriority, getStackTrace, getState, getThreadGroup, getUncaughtExceptionHandler, holdsLock, interrupt, interrupted, isAlive, isDaemon, isInterrupted, join, join, join, resume, setContextClassLoader, setDaemon, setDefaultUncaughtExceptionHandler, setName, setPriority, setUncaughtExceptionHandler, sleep, sleep, start, stop, stop, suspend, toString, yield
public static final int DEFAULT_CLM_RUN_FREQUENCY_IN_HOURS
public static final String DATE_PATTERN
public static scala.collection.Set<String> reconfigurableConfigs
public LifecycleManager(TierObjectStore tierObjectStore, LifecycleManagerConfig config, Supplier<Boolean> isTierTopicAvailable, Supplier<Boolean> canCLMRun, org.apache.kafka.common.utils.Time time, org.apache.kafka.common.metrics.Metrics metrics)
public void run()
public void shutdown()
shutdown
in class kafka.utils.ShutdownableThread
public void doWork()
doWork
in class kafka.utils.ShutdownableThread
public void doOneCLMRun()
public Optional<LifecycleManagerState> getLifecycleManagerState(Long currTimeMs) throws ParseException, StateManager.LifecycleManagerVersionException
currTimeMs
- current timestamp to compare with the lastRunTimestamp from the buffer
ParseException
StateManager.LifecycleManagerVersionException
public List<TierObjectStore.KeyAndVersion> retrieveObjectsEligibleForDeletion(Long currTimeMs, Map<String,Map<TopicIdPartition,List<LifecycleManager.BlobMetadata>>> dateToDeletableBlobs) throws ParseException, InterruptedException
currTimeMs
- current timestamp in milliseconds
dateToDeletableBlobs
- date and topicIdPartition sorted lists of blobs to be deleted
ParseException
InterruptedException
public Map<String,LifecycleManager.ReductionInRetention> getAnyRetentionPeriodReductions(Long currentTime, LifecycleManagerState state, Map<String,Integer> backupRetentionInDays) throws ParseException
currentTime
- timestamp used as the time when retention reduction was processed
state
- LifecycleManagerState
backupRetentionInDays
- mapping of topic name to current retention (days) for its backed up blobs
ParseException
public void waitForTierTopicToBeAvailable() throws InterruptedException
InterruptedException
public Map<String,Long> getKafkaTopicRetentionMs() throws ExecutionException, InterruptedException
ExecutionException
InterruptedException
public boolean isLifecycleManagerActive()
public static Map<String,Integer> getBackupRetentionInDaysForAllTopics(Map<String,Long> latestRetentionConfigs)
latestRetentionConfigs
- mapping of topic name to the retention.ms configuration value
public TierTopicReader createTierTopicReader(List<Long> tierOffsets)
public scala.collection.Set<String> reconfigurableConfigs()
reconfigurableConfigs
in interface kafka.server.BrokerReconfigurable
public void validateReconfiguration(kafka.server.KafkaConfig newConfig)
validateReconfiguration
in interface kafka.server.BrokerReconfigurable
public void reconfigure(kafka.server.KafkaConfig oldConfig, kafka.server.KafkaConfig newConfig)
reconfigure
in interface kafka.server.BrokerReconfigurable