package io.confluent.connect.replicator;

import io.confluent.connect.replicator.offsets.ConsumerTimestampsWriter;
import io.confluent.connect.replicator.offsets.GroupTopicPartition;
import io.confluent.connect.replicator.offsets.TimestampAndDelta;
import io.confluent.connect.replicator.util.NewReplicatorAdminClient;
import java.time.Duration;
import java.util.Map;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.utils.SystemTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/connect/replicator/ConsumerTimestampsCheck.class */
public class ConsumerTimestampsCheck extends ConfigurationCheck {
    private Map<String, Object> adminClientConfig;
    private ConsumerProvider consumerProvider;
    private ConsumerTimestampsWriter timestampsWriter;
    public static final String TIMESTAMPS_TOPIC_NAME = "__consumer_timestamps";
    public static final String VERIFIER_GROUP_NAME = "__replicator_verifier_timestamps_committer_group";
    public static final String VERIFIER_TASK_ID = "__replicator_verifier";
    public static final String VERIFIER_TOPIC = "__replicator_verifier";
    private static Logger log = LoggerFactory.getLogger((Class<?>) ConsumerTimestampsCheck.class);
    public static final Long VERIFIER_TIMESTAMP = 1234567890L;

    @Override // io.confluent.connect.replicator.ConfigurationCheck
    public boolean performCheck() {
        boolean z = false;
        boolean z2 = false;
        try {
            try {
                if (this.timestampsWriter == null) {
                    this.timestampsWriter = new ConsumerTimestampsWriter(new NewReplicatorAdminClient(AdminClient.create(this.adminClientConfig), new SystemTime(), 0L, "__replicator_verifier"));
                    this.timestampsWriter.configure(this.adminClientConfig);
                }
                GroupTopicPartition groupTopicPartition = new GroupTopicPartition(VERIFIER_GROUP_NAME, new TopicPartition("__replicator_verifier", 0));
                TimestampAndDelta timestampAndDelta = new TimestampAndDelta(VERIFIER_TIMESTAMP.longValue());
                log.debug("About to try producing to timestamp topic.");
                this.timestampsWriter.send(groupTopicPartition, timestampAndDelta).get();
                z = true;
                log.info("Successfully produced to timestamp topic.");
                closeTimestampsWriter();
            } catch (Throwable th) {
                closeTimestampsWriter();
                throw th;
            }
        } catch (Exception e) {
            if (e.getCause() instanceof TopicAuthorizationException) {
                log.error("Insufficient permissions to access {}", "__consumer_timestamps", e);
            } else {
                log.error("Encountered an error while trying to produce", (Throwable) e);
            }
            closeTimestampsWriter();
        }
        Consumer<byte[], byte[]> consumer = null;
        try {
            try {
                try {
                    consumer = this.consumerProvider.getConsumer();
                    consumer.assign(Utils.getAssignment("__consumer_timestamps"));
                    consumer.seekToEnd(Utils.getAssignment("__consumer_timestamps"));
                    log.debug("Polling timestamp topic from consumer.");
                    consumer.poll(Duration.ofMillis(100L));
                    z2 = true;
                    log.info("Successfully consumed from timestamp topic.");
                    if (consumer != null) {
                        consumer.close();
                    }
                } catch (Throwable th2) {
                    if (consumer != null) {
                        consumer.close();
                    }
                    throw th2;
                }
            } catch (Exception e2) {
                log.error("Encountered error while trying to consume: ", (Throwable) e2);
                if (consumer != null) {
                    consumer.close();
                }
            }
        } catch (TopicAuthorizationException e3) {
            log.error("Insufficient permissions to access {}", "__consumer_timestamps");
            if (consumer != null) {
                consumer.close();
            }
        }
        return z && z2;
    }

    private void closeTimestampsWriter() {
        if (this.timestampsWriter != null) {
            try {
                this.timestampsWriter.close();
            } catch (Exception e) {
            }
        }
    }

    @Override // io.confluent.connect.replicator.ConfigurationCheck
    public String helpText() {
        return "If the `__consumer_timestamps` topic doesn't exist, no checks are performed.\nIf the topic does exist, we will\n1. Produce a message to it\n2. Attempt to poll this topic from a consumer\n(this produce/consume will happen with a random consumer group)";
    }

    @Override // io.confluent.connect.replicator.ConfigurationCheck
    public String getName() {
        return "Source cluster Consumer Timestamps topic check";
    }

    public ConsumerTimestampsCheck setTimestampWriter(ConsumerTimestampsWriter consumerTimestampsWriter) {
        this.timestampsWriter = consumerTimestampsWriter;
        return this;
    }

    public ConsumerTimestampsCheck setConsumerProvider(ConsumerProvider consumerProvider) {
        this.consumerProvider = consumerProvider;
        return this;
    }

    public ConsumerTimestampsCheck setAdminClientConfig(Map<String, Object> map) {
        this.adminClientConfig = map;
        return this;
    }
}
