package org.apache.kafka.connect.storage;

import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.LoggingContext;
import org.apache.kafka.connect.util.TopicAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.class */
public class ConnectorOffsetBackingStore implements OffsetBackingStore {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ConnectorOffsetBackingStore.class);
    private final Time time;
    private final Supplier<LoggingContext> loggingContext;
    private final String primaryOffsetsTopic;
    private final Optional<OffsetBackingStore> workerStore;
    private final Optional<KafkaOffsetBackingStore> connectorStore;
    private final Optional<TopicAdmin> connectorStoreAdmin;

    public static ConnectorOffsetBackingStore withConnectorAndWorkerStores(Supplier<LoggingContext> supplier, OffsetBackingStore offsetBackingStore, KafkaOffsetBackingStore kafkaOffsetBackingStore, String str, TopicAdmin topicAdmin) {
        Objects.requireNonNull(supplier);
        Objects.requireNonNull(offsetBackingStore);
        Objects.requireNonNull(kafkaOffsetBackingStore);
        Objects.requireNonNull(str);
        Objects.requireNonNull(topicAdmin);
        return new ConnectorOffsetBackingStore(Time.SYSTEM, supplier, str, offsetBackingStore, kafkaOffsetBackingStore, topicAdmin);
    }

    public static ConnectorOffsetBackingStore withOnlyWorkerStore(Supplier<LoggingContext> supplier, OffsetBackingStore offsetBackingStore, String str) {
        Objects.requireNonNull(supplier);
        Objects.requireNonNull(offsetBackingStore);
        return new ConnectorOffsetBackingStore(Time.SYSTEM, supplier, str, offsetBackingStore, null, null);
    }

    public static ConnectorOffsetBackingStore withOnlyConnectorStore(Supplier<LoggingContext> supplier, KafkaOffsetBackingStore kafkaOffsetBackingStore, String str, TopicAdmin topicAdmin) {
        Objects.requireNonNull(supplier);
        Objects.requireNonNull(str);
        Objects.requireNonNull(topicAdmin);
        return new ConnectorOffsetBackingStore(Time.SYSTEM, supplier, str, null, kafkaOffsetBackingStore, topicAdmin);
    }

    ConnectorOffsetBackingStore(Time time, Supplier<LoggingContext> supplier, String str, OffsetBackingStore offsetBackingStore, KafkaOffsetBackingStore kafkaOffsetBackingStore, TopicAdmin topicAdmin) {
        if (offsetBackingStore == null && kafkaOffsetBackingStore == null) {
            throw new IllegalArgumentException("At least one non-null offset store must be provided");
        }
        this.time = time;
        this.loggingContext = supplier;
        this.primaryOffsetsTopic = str;
        this.workerStore = Optional.ofNullable(offsetBackingStore);
        this.connectorStore = Optional.ofNullable(kafkaOffsetBackingStore);
        this.connectorStoreAdmin = Optional.ofNullable(topicAdmin);
    }

    public String primaryOffsetsTopic() {
        return this.primaryOffsetsTopic;
    }

    @Override // org.apache.kafka.connect.storage.OffsetBackingStore
    public void start() {
        this.connectorStore.ifPresent((v0) -> {
            v0.start();
        });
    }

    @Override // org.apache.kafka.connect.storage.OffsetBackingStore
    public void stop() {
        this.connectorStore.ifPresent((v0) -> {
            v0.stop();
        });
        this.connectorStoreAdmin.ifPresent((v0) -> {
            v0.close();
        });
    }

    @Override // org.apache.kafka.connect.storage.OffsetBackingStore
    public Future<Map<ByteBuffer, ByteBuffer>> get(Collection<ByteBuffer> collection) {
        final Future<Map<ByteBuffer, ByteBuffer>> fromStore = getFromStore(this.workerStore, collection);
        final Future<Map<ByteBuffer, ByteBuffer>> fromStore2 = getFromStore(this.connectorStore, collection);
        return new Future<Map<ByteBuffer, ByteBuffer>>() { // from class: org.apache.kafka.connect.storage.ConnectorOffsetBackingStore.1
            @Override // java.util.concurrent.Future
            public boolean cancel(boolean z) {
                return fromStore.cancel(z) | fromStore2.cancel(z);
            }

            @Override // java.util.concurrent.Future
            public boolean isCancelled() {
                return fromStore.isCancelled() || fromStore2.isCancelled();
            }

            @Override // java.util.concurrent.Future
            public boolean isDone() {
                return fromStore.isDone() && fromStore2.isDone();
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Future
            public Map<ByteBuffer, ByteBuffer> get() throws InterruptedException, ExecutionException {
                HashMap hashMap = new HashMap((Map) fromStore.get());
                hashMap.putAll((Map) fromStore2.get());
                return hashMap;
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Future
            public Map<ByteBuffer, ByteBuffer> get(long j, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
                long millis = timeUnit.toMillis(j);
                long milliseconds = ConnectorOffsetBackingStore.this.time.milliseconds() + millis;
                HashMap hashMap = new HashMap((Map) fromStore.get(millis, timeUnit));
                hashMap.putAll((Map) fromStore2.get(Math.max(1L, milliseconds - ConnectorOffsetBackingStore.this.time.milliseconds()), TimeUnit.MILLISECONDS));
                return hashMap;
            }
        };
    }

    @Override // org.apache.kafka.connect.storage.OffsetBackingStore
    public Future<Void> set(Map<ByteBuffer, ByteBuffer> map, Callback<Void> callback) {
        OffsetBackingStore offsetBackingStore;
        OffsetBackingStore offsetBackingStore2;
        if (this.connectorStore.isPresent()) {
            offsetBackingStore = this.connectorStore.get();
            offsetBackingStore2 = this.workerStore.orElse(null);
        } else {
            if (!this.workerStore.isPresent()) {
                throw new IllegalStateException("At least one non-null offset store must be provided");
            }
            offsetBackingStore = this.workerStore.get();
            offsetBackingStore2 = null;
        }
        OffsetBackingStore offsetBackingStore3 = offsetBackingStore2;
        return offsetBackingStore.set(map, (th, r9) -> {
            if (offsetBackingStore3 != null) {
                if (th != null) {
                    log.trace("Skipping offsets write to secondary store because primary write has failed", th);
                } else {
                    try {
                        offsetBackingStore3.set(map, (th, r6) -> {
                            LoggingContext loggingContext = loggingContext();
                            Throwable th = null;
                            try {
                                if (th != null) {
                                    log.warn("Failed to write offsets to secondary backing store", th);
                                } else {
                                    log.debug("Successfully flushed offsets to secondary backing store");
                                }
                                if (loggingContext != null) {
                                    if (0 == 0) {
                                        loggingContext.close();
                                        return;
                                    }
                                    try {
                                        loggingContext.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                }
                            } catch (Throwable th3) {
                                if (loggingContext != null) {
                                    if (0 != 0) {
                                        try {
                                            loggingContext.close();
                                        } catch (Throwable th4) {
                                            th.addSuppressed(th4);
                                        }
                                    } else {
                                        loggingContext.close();
                                    }
                                }
                                throw th3;
                            }
                        });
                    } catch (Exception e) {
                        log.warn("Failed to write offsets to secondary backing store", (Throwable) e);
                    }
                }
            }
            LoggingContext loggingContext = loggingContext();
            Throwable th2 = null;
            try {
                try {
                    callback.onCompletion(th, r9);
                    if (loggingContext != null) {
                        if (0 == 0) {
                            loggingContext.close();
                            return;
                        }
                        try {
                            loggingContext.close();
                        } catch (Throwable th3) {
                            th2.addSuppressed(th3);
                        }
                    }
                } catch (Throwable th4) {
                    th2 = th4;
                    throw th4;
                }
            } catch (Throwable th5) {
                if (loggingContext != null) {
                    if (th2 != null) {
                        try {
                            loggingContext.close();
                        } catch (Throwable th6) {
                            th2.addSuppressed(th6);
                        }
                    } else {
                        loggingContext.close();
                    }
                }
                throw th5;
            }
        });
    }

    @Override // org.apache.kafka.connect.storage.OffsetBackingStore
    public void configure(WorkerConfig workerConfig) {
        this.connectorStore.ifPresent(kafkaOffsetBackingStore -> {
            kafkaOffsetBackingStore.configure(workerConfig);
        });
    }

    public boolean hasConnectorSpecificStore() {
        return this.connectorStore.isPresent();
    }

    public boolean hasWorkerGlobalStore() {
        return this.workerStore.isPresent();
    }

    private LoggingContext loggingContext() {
        LoggingContext loggingContext = this.loggingContext.get();
        Objects.requireNonNull(loggingContext);
        return loggingContext;
    }

    private static Future<Map<ByteBuffer, ByteBuffer>> getFromStore(Optional<? extends OffsetBackingStore> optional, Collection<ByteBuffer> collection) {
        return (Future) optional.map(offsetBackingStore -> {
            return offsetBackingStore.get(collection);
        }).orElseGet(() -> {
            return CompletableFuture.completedFuture(Collections.emptyMap());
        });
    }
}
