package io.confluent.connect.replicator;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TopicExistsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/connect/replicator/ConsumerOffsetCommitCheck.class */
/**
 * Configuration check that verifies an offset commit can be performed with the supplied
 * client configuration.
 *
 * <p>The check makes sure {@link #setCheckTopic(String) the check topic} exists (creating it with
 * one partition and replication factor 1 if needed) and then synchronously commits offset 0 for
 * partition 0 of that topic using the configured consumer.
 *
 * <p>Clients may be injected through {@link #setAdminClient(AdminClient)} and
 * {@link #setConsumer(Consumer)}; otherwise they are created from the client configuration.
 * Either way they are closed and released at the end of {@link #performCheck()}.
 */
public class ConsumerOffsetCommitCheck extends ConfigurationCheck {
    private static final Logger log = LoggerFactory.getLogger(ConsumerOffsetCommitCheck.class);

    /** Maximum number of attempts made to create the check topic and see it listed. */
    public static final int CREATE_RETRIES = 5;

    private Map<String, Object> clientConfig = new HashMap<>();
    private String name = "";
    private String checkTopic = "";
    private AdminClient adminClient = null;
    private Consumer<byte[], byte[]> consumer = null;
    // Set when this instance's createTopics call succeeded; not read anywhere in this class.
    private boolean topicCreated = false;
    private String tag;

    /**
     * Runs the check: ensures the check topic exists, then commits offset 0 for its partition 0.
     *
     * <p>Missing clients are created from the client configuration before the check runs. Both
     * clients are closed and released before this method returns, so a later invocation creates
     * fresh clients instead of reusing closed ones.
     *
     * @return {@code true} if the offset commit succeeded; {@code false} if a client was
     *     unavailable, the calling thread was interrupted, or any exception was raised
     */
    @Override // io.confluent.connect.replicator.ConfigurationCheck
    public boolean performCheck() {
        logConfig(this.clientConfig, log);
        boolean interrupted = false;
        try {
            if (this.adminClient == null) {
                this.adminClient = AdminClient.create(this.clientConfig);
            }
            if (this.consumer == null) {
                this.consumer = ReplicatorSourceTask.createConsumerHelper(this.clientConfig, this.name, "1");
            }
            if (this.adminClient == null || this.consumer == null) {
                // No explicit cleanup here: the finally block below releases whatever was created.
                return false;
            }
            createTopic();
            Map<TopicPartition, OffsetAndMetadata> offsets =
                    Collections.singletonMap(new TopicPartition(this.checkTopic, 0), new OffsetAndMetadata(0L));
            log.debug("committing offsets for topic: {} partitions: 0 in group: {}",
                    this.checkTopic, this.consumer.groupMetadata().groupId());
            this.consumer.commitSync(offsets);
            return true;
        } catch (InterruptedException e) {
            interrupted = true;
            log.error("failed to commit offsets for group: {}", this.name, e);
            return false;
        } catch (Exception e) {
            log.error("failed to commit offsets for group: {}", this.name, e);
            return false;
        } finally {
            try {
                cleanup();
            } finally {
                // Restore the interrupt status only after the clients are closed, so the close
                // calls run with the same (cleared) interrupt status they saw before this change.
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Closes the admin client and the consumer and drops the references to them.
     *
     * <p>The consumer is closed even if closing the admin client throws; any exception raised by
     * a close call still propagates to the caller.
     */
    private void cleanup() {
        try {
            if (this.adminClient != null) {
                this.adminClient.close();
            }
        } finally {
            this.adminClient = null;
            try {
                if (this.consumer != null) {
                    this.consumer.close();
                }
            } finally {
                this.consumer = null;
            }
        }
    }

    /**
     * Creates the check topic (1 partition, replication factor 1) and waits until it is listed.
     *
     * <p>Makes up to {@link #CREATE_RETRIES} attempts, sleeping one second between them. A
     * {@link TopicExistsException} from the create call is treated as success. If the topic is
     * still not listed after the last attempt, a warning is logged and the method returns
     * normally.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     * @throws ExecutionException if topic creation fails for a reason other than the topic
     *     already existing, or if listing topics fails
     */
    private void createTopic() throws InterruptedException, ExecutionException {
        for (int attempt = 0; attempt < CREATE_RETRIES; attempt++) {
            try {
                NewTopic newTopic = new NewTopic(this.checkTopic, 1, (short) 1);
                this.adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
                this.topicCreated = true;
            } catch (ExecutionException e) {
                // instanceof is null-safe, unlike comparing e.getCause().getClass().
                if (!(e.getCause() instanceof TopicExistsException)) {
                    throw e;
                }
            }
            if (this.adminClient.listTopics().names().get().contains(this.checkTopic)) {
                return;
            }
            Thread.sleep(1000L);
        }
        log.warn("topic {} was not listed after {} attempts; continuing with the offset commit",
                this.checkTopic, CREATE_RETRIES);
    }

    /**
     * Returns guidance text describing the consumer and offset-commit configuration, ending with
     * the name set through {@link #setName(String)}.
     */
    @Override // io.confluent.connect.replicator.ConfigurationCheck
    public String helpText() {
        return "Replicator commits offsets to both the source and destination clusters. Source cluster offsets are committed for monitoring purposes if the configuration property \"offset.topic.commit\" is true (default is true). Consumer configurations for the source cluster consumer are determined by the prefixes\"src.consumer.\" and \"src.kafka.\" (e.g. src.kafka.bootstrap.servers). Offsets are committed to the destination cluster when offset translation is enabled. This is configured using the property \"offset.translator.tasks.max\" (to disable translation set this to 0). Configurations for the destination consumer are configured using the prefixes \"dest.consumer.\" and \"dest.kafka.\" (e.g. dest.kafka.bootstrap.servers). If security is enabled this replicator will require suitable ACLs to allow it to join the consumer group: " + this.name;
    }

    /** Returns the display name of this check: a fixed label followed by the configured tag. */
    @Override // io.confluent.connect.replicator.ConfigurationCheck
    public String getName() {
        return "Consumer offset commit check " + this.tag;
    }

    /**
     * Sets the configuration used to create the admin client and consumer when none is injected.
     *
     * @param map client configuration
     * @return this check, for chaining
     */
    public ConsumerOffsetCommitCheck setClientConfig(Map<String, Object> map) {
        this.clientConfig = map;
        return this;
    }

    /**
     * Sets the name passed to the consumer factory and reported in error and help messages.
     *
     * @param str the name (the help text refers to it as the consumer group)
     * @return this check, for chaining
     */
    public ConsumerOffsetCommitCheck setName(String str) {
        this.name = str;
        return this;
    }

    /**
     * Sets the topic that is created (if needed) and used for the test commit.
     *
     * @param str topic name
     * @return this check, for chaining
     */
    public ConsumerOffsetCommitCheck setCheckTopic(String str) {
        this.checkTopic = str;
        return this;
    }

    /**
     * Injects the consumer to use; it is closed at the end of {@link #performCheck()}.
     *
     * @param consumer consumer used for the commit
     * @return this check, for chaining
     */
    public ConsumerOffsetCommitCheck setConsumer(Consumer<byte[], byte[]> consumer) {
        this.consumer = consumer;
        return this;
    }

    /**
     * Sets the tag appended to the check's display name.
     *
     * @param str tag text
     * @return this check, for chaining
     */
    public ConsumerOffsetCommitCheck setTag(String str) {
        this.tag = str;
        return this;
    }

    /**
     * Injects the admin client to use; it is closed at the end of {@link #performCheck()}.
     *
     * <p>Unlike the other setters this one returns nothing, so it cannot be chained.
     *
     * @param adminClient admin client used to create and list topics
     */
    public void setAdminClient(AdminClient adminClient) {
        this.adminClient = adminClient;
    }
}
