public class KafkaConnectionPoolImpl extends Object implements KafkaConnectionPool
Constructor and Description |
---|
KafkaConnectionPoolImpl(AsyncServiceSchedulerResultsReceiver resultsReceiver,
int poolSize,
String ftpsDirPath,
RestoreMetricsManager metrics,
org.apache.kafka.common.utils.Time time)
Constructs new KafkaConnectionPool with given resultsReceiver.
|
Modifier and Type | Method and Description |
---|---|
String |
getFtpsFileName(String topic,
int partition,
org.apache.http.client.methods.CloseableHttpResponse response)
Returns the filename that should be used to store the FTPS file for the given topic/partition.
|
protected void |
reportFetchFtpsResponse(KafkaFetchFtpsRequest request,
MessageStatusCode statusCode,
MessageResult result)
Constructs a FetchFtpsResponse with the given params and null ftpsFileName and reports it to the resultsReceiver.
|
protected void |
reportFetchFtpsResponse(KafkaFetchFtpsRequest request,
MessageStatusCode statusCode,
MessageResult result,
String ftpsFileName)
Constructs a FetchFtpsResponse with the given params and reports it to the resultsReceiver.
|
protected void |
reportKafkaResponse(KafkaRequest request,
MessageStatusCode statusCode,
MessageResult result) |
protected void |
reportKafkaValidateLogRangeResponse(KafkaRequest request,
MessageStatusCode statusCode,
MessageResult result) |
protected void |
reportTierPartitionStatusResponse(KafkaRequest request,
MessageStatusCode statusCode,
MessageResult result) |
void |
setHttpClient(org.apache.http.impl.client.CloseableHttpClient httpClient)
For test.
|
void |
setStatusQueryRetryWaitInMs(long statusQueryRetryWaitInMs) |
void |
shutdown()
Shuts down KafkaConnectionPool resources.
|
void |
startUp()
Provisions necessary resources for running KafkaConnectionPool.
|
protected void |
submitFetchFtpsRequest(KafkaFetchFtpsRequest request) |
protected void |
submitKafkaEventRequest(KafkaRequest request) |
void |
submitKafkaRequest(KafkaRequest kafkaRequest)
Submits KafkaRequest to be scheduled.
|
protected void |
submitKafkaValidateLogRangeRequest(KafkaValidateLogRangeRequest request) |
protected void |
submitTierPartitionStatusRequest(KafkaTierPartitionStatusRequest request)
Submits a KafkaTierPartitionStatusRequest to be scheduled.
|
public KafkaConnectionPoolImpl(AsyncServiceSchedulerResultsReceiver resultsReceiver, int poolSize, String ftpsDirPath, RestoreMetricsManager metrics, org.apache.kafka.common.utils.Time time)
resultsReceiver
- results receiver to forward responses topoolSize
- maximum number of threads/concurrent connections in the KafkaConnectionPoolftpsDirPath
- directory in which fetched FTPS files should be storedpublic void setStatusQueryRetryWaitInMs(long statusQueryRetryWaitInMs)
public void setHttpClient(org.apache.http.impl.client.CloseableHttpClient httpClient)
public void startUp()
startUp
in interface KafkaConnectionPool
public void shutdown()
shutdown
in interface KafkaConnectionPool
public void submitKafkaRequest(KafkaRequest kafkaRequest)
submitKafkaRequest
in interface KafkaConnectionPool
kafkaRequest
- request to schedule.protected void submitFetchFtpsRequest(KafkaFetchFtpsRequest request)
protected void submitKafkaEventRequest(KafkaRequest request)
protected void submitTierPartitionStatusRequest(KafkaTierPartitionStatusRequest request)
protected void submitKafkaValidateLogRangeRequest(KafkaValidateLogRangeRequest request)
protected void reportFetchFtpsResponse(KafkaFetchFtpsRequest request, MessageStatusCode statusCode, MessageResult result, String ftpsFileName)
request
- request we are responding tostatusCode
- status code of responseresult
- MessageResult of responseftpsFileName
- path to the local FTPS fileprotected void reportFetchFtpsResponse(KafkaFetchFtpsRequest request, MessageStatusCode statusCode, MessageResult result)
request
- request we are responding tostatusCode
- status code of responseresult
- MessageResult of responseprotected void reportKafkaResponse(KafkaRequest request, MessageStatusCode statusCode, MessageResult result)
protected void reportKafkaValidateLogRangeResponse(KafkaRequest request, MessageStatusCode statusCode, MessageResult result)
protected void reportTierPartitionStatusResponse(KafkaRequest request, MessageStatusCode statusCode, MessageResult result)
public String getFtpsFileName(String topic, int partition, org.apache.http.client.methods.CloseableHttpResponse response)
topic
- topic associated with FTPS filepartition
- partition number associated with FTPS file