package kafka.restore.schedulers;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import kafka.restore.RestoreMetricsManager;
import kafka.restore.configmap.NodeConfig;
import kafka.restore.messages.KafkaTierPartitionStatusRequest;
import kafka.restore.messages.MessageResponse;
import kafka.restore.messages.MessageResult;
import kafka.tier.state.TierPartitionStatus;
import org.apache.http.Header;
import org.apache.http.HeaderElement;
import org.apache.http.HttpEntity;
import org.apache.http.ParseException;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

/* loaded from: input_file:kafka/restore/schedulers/KafkaConnectionPoolTest.class */
/**
 * Tests for {@link KafkaConnectionPoolImpl}: tier-partition status requests answered by a
 * Mockito-mocked HTTP client, and derivation of the FTPS file name from a response header.
 */
public class KafkaConnectionPoolTest {
    /** HTTP status code carried by every mocked response. */
    private static final int HTTP_OK = 200;

    /** Time each status test waits before reading the reported result, in milliseconds. */
    private static final long RESPONSE_WAIT_MS = 1000L;

    private MockResultReceiver resultsReceiver;
    private CloseableHttpClient httpClient;
    KafkaConnectionPoolImpl kafkaConnectionPool;
    RestoreMetricsManager restoreMetricsManager;
    Metrics metrics;
    Time time;
    NodeConfig broker0 = new NodeConfig(0, "localhost", 9080);
    NodeConfig broker1 = new NodeConfig(1, "localhost", 9080);
    List<NodeConfig> replicas = Arrays.asList(this.broker0, this.broker1);

    /** Results receiver that simply remembers the most recent response handed to it. */
    private static class MockResultReceiver implements AsyncServiceSchedulerResultsReceiver {
        // NOTE(review): volatile because the tests sleep before reading this field, which
        // suggests it is written from another thread - confirm against KafkaConnectionPoolImpl.
        volatile MessageResponse messageResponse;

        /** Stores the response, replacing any previously stored one. */
        public void reportServiceSchedulerResponse(MessageResponse messageResponse) {
            this.messageResponse = messageResponse;
        }

        /** Returns the last stored response, or {@code null} if none has been reported. */
        public MessageResponse getMessageResponse() {
            return this.messageResponse;
        }
    }

    /**
     * Builds a mocked HTTP 200 response whose entity is a JSON document carrying the given
     * tier partition state.
     *
     * @param state numeric state embedded in the JSON body
     * @return a mocked response exposing the status line and the entity
     * @throws IOException declared because {@code HttpEntity.getContent()} declares it
     */
    private CloseableHttpResponse mockTierPartitionStatusResponse(int state) throws IOException {
        // One byte array backs both the stream and the reported content length, so they agree.
        byte[] payload = ("{\"data\": {\"attributes\": {\"state\":" + state + "}}}").getBytes(StandardCharsets.UTF_8);

        StatusLine statusLine = Mockito.mock(StatusLine.class);
        Mockito.when(statusLine.getStatusCode()).thenReturn(HTTP_OK);

        HttpEntity httpEntity = Mockito.mock(HttpEntity.class);
        Mockito.when(httpEntity.getContent()).thenReturn(new ByteArrayInputStream(payload));
        Mockito.when(httpEntity.getContentLength()).thenReturn((long) payload.length);

        CloseableHttpResponse closeableHttpResponse = Mockito.mock(CloseableHttpResponse.class);
        Mockito.when(closeableHttpResponse.getEntity()).thenReturn(httpEntity);
        Mockito.when(closeableHttpResponse.getStatusLine()).thenReturn(statusLine);
        return closeableHttpResponse;
    }

    /** Mocked response reporting the {@code READ_ONLY} state. */
    private CloseableHttpResponse readOnlyResponse() throws IOException {
        return mockTierPartitionStatusResponse(TierPartitionStatus.READ_ONLY.ordinal());
    }

    /** Mocked response reporting the {@code ONLINE} state. */
    private CloseableHttpResponse onlineResponse() throws IOException {
        return mockTierPartitionStatusResponse(TierPartitionStatus.ONLINE.ordinal());
    }

    /**
     * Makes the mocked HTTP client return the given responses on consecutive calls and installs
     * the client on the pool. All responses are built by the caller beforehand so that no mock
     * is stubbed while this stubbing is still being assembled.
     *
     * @param first response for the first {@code execute} call
     * @param rest responses for the following calls, in order
     */
    private void stubHttpResponses(CloseableHttpResponse first, CloseableHttpResponse... rest) throws IOException {
        Mockito.when(this.httpClient.execute(Mockito.any(HttpUriRequest.class))).thenReturn(first, rest);
        this.kafkaConnectionPool.setHttpClient(this.httpClient);
    }

    /**
     * Submits a status request expecting {@code READ_ONLY} on broker 0 for the given replicas,
     * waits {@link #RESPONSE_WAIT_MS}, then checks the result stored by the receiver.
     *
     * @param requestReplicas replica list passed to the request
     * @param expected result the receiver must hold after the wait
     */
    private void submitAndAssertResult(List<NodeConfig> requestReplicas, MessageResult expected)
            throws IOException, InterruptedException {
        this.kafkaConnectionPool.submitKafkaRequest(new KafkaTierPartitionStatusRequest(
                0, "test", 0, this.broker0, requestReplicas, TierPartitionStatus.READ_ONLY.ordinal()));
        Thread.sleep(RESPONSE_WAIT_MS);
        MessageResponse response = this.resultsReceiver.getMessageResponse();
        // Fail with a readable message instead of a NullPointerException when nothing arrived.
        Assertions.assertNotNull(response, "no response was reported within " + RESPONSE_WAIT_MS + " ms");
        Assertions.assertEquals(expected, response.getResult());
    }

    /** Creates a fresh receiver, mocked HTTP client and started pool before each test. */
    @BeforeEach
    public void setup() throws Exception {
        this.resultsReceiver = new MockResultReceiver();
        this.httpClient = Mockito.mock(CloseableHttpClient.class);
        this.metrics = new Metrics();
        this.restoreMetricsManager = new RestoreMetricsManager(this.metrics, "test_cluster");
        this.time = Time.SYSTEM;
        this.kafkaConnectionPool = new KafkaConnectionPoolImpl(this.resultsReceiver, 3, "", this.restoreMetricsManager, this.time);
        // Keep the retry wait minimal so retry tests finish inside the response wait.
        this.kafkaConnectionPool.setStatusQueryRetryWaitInMs(1L);
        this.kafkaConnectionPool.startUp();
    }

    /** Shuts the pool down after each test. */
    @AfterEach
    public void teardown() throws Exception {
        // The pool is null when setup() failed early; avoid masking that failure with an NPE.
        if (this.kafkaConnectionPool != null) {
            this.kafkaConnectionPool.shutdown();
        }
    }

    /** Two READ_ONLY responses for a two-replica request are expected to yield SUCCESS. */
    @Test
    public void testSubmitTierPartitionStatusRequestSuccess() throws IOException, InterruptedException {
        CloseableHttpResponse first = readOnlyResponse();
        CloseableHttpResponse second = readOnlyResponse();
        stubHttpResponses(first, second);
        submitAndAssertResult(this.replicas, MessageResult.SUCCESS);
    }

    /** One READ_ONLY response followed by three ONLINE responses is expected to yield FAILURE. */
    @Test
    public void testSubmitTierPartitionStatusRequestOneReplicaFail() throws IOException, InterruptedException {
        CloseableHttpResponse first = readOnlyResponse();
        CloseableHttpResponse second = onlineResponse();
        CloseableHttpResponse third = onlineResponse();
        CloseableHttpResponse fourth = onlineResponse();
        stubHttpResponses(first, second, third, fourth);
        submitAndAssertResult(this.replicas, MessageResult.FAILURE);
    }

    /** Two ONLINE responses then a READ_ONLY one, for a single replica, is expected to yield SUCCESS. */
    @Test
    public void testSubmitTierPartitionStatusRequestRetrySuccess() throws IOException, InterruptedException {
        CloseableHttpResponse first = onlineResponse();
        CloseableHttpResponse second = onlineResponse();
        CloseableHttpResponse third = readOnlyResponse();
        stubHttpResponses(first, second, third);
        submitAndAssertResult(Arrays.asList(this.broker0), MessageResult.FAILURE == null ? null : MessageResult.SUCCESS);
    }

    /** Three ONLINE responses for a single replica are expected to yield FAILURE. */
    @Test
    public void testSubmitTierPartitionStatusRequestRetryFail() throws IOException, InterruptedException {
        CloseableHttpResponse first = onlineResponse();
        CloseableHttpResponse second = onlineResponse();
        CloseableHttpResponse third = onlineResponse();
        stubHttpResponses(first, second, third);
        submitAndAssertResult(Arrays.asList(this.broker0), MessageResult.FAILURE);
    }

    /**
     * Checks the FTPS file name produced for topic "test", partition 0, for two
     * Content-Disposition file names: one ending in ".tierstate.adler" and one in ".tierstate".
     */
    @Test
    public void testGetFtpsFileName() throws Exception {
        CloseableHttpResponse adlerResponse = onlineResponse();
        Mockito.when(adlerResponse.getLastHeader("Content-Disposition"))
                .thenReturn(newHeader("Content-Disposition", "attachment; filename=\"00000000000000000000.tierstate.adler\""));
        Assertions.assertEquals("/ftps-test-0.tierstate.adler", this.kafkaConnectionPool.getFtpsFileName("test", 0, adlerResponse));

        CloseableHttpResponse plainResponse = onlineResponse();
        Mockito.when(plainResponse.getLastHeader("Content-Disposition"))
                .thenReturn(newHeader("Content-Disposition", "attachment; filename=\"00000000000000000000.tierstate\""));
        Assertions.assertEquals("/ftps-test-0.tierstate", this.kafkaConnectionPool.getFtpsFileName("test", 0, plainResponse));
    }

    /**
     * Creates a minimal {@link Header} with a fixed name and value.
     *
     * @param name value returned by {@code getName()}
     * @param value value returned by {@code getValue()}
     * @return a header whose {@code getElements()} returns {@code null}
     */
    private Header newHeader(final String name, final String value) {
        return new Header() {
            @Override
            public String getName() {
                return name;
            }

            @Override
            public String getValue() {
                return value;
            }

            // NOTE(review): returns null instead of parsed elements - assumes the code under
            // test reads only the raw header value; confirm against getFtpsFileName.
            @Override
            public HeaderElement[] getElements() throws ParseException {
                return null;
            }
        };
    }
}
