package org.apache.kafka.connect.storage;

import io.confluent.shaded.org.slf4j.Logger;
import io.confluent.shaded.org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.util.Callback;

/* loaded from: input_file:org/apache/kafka/connect/storage/OffsetStorageWriter.class */
/**
 * Buffers offset data in memory and writes it to an {@link OffsetBackingStore} in discrete
 * flushes.
 *
 * <p>A flush is a three step protocol driven by the caller:
 * <ol>
 *   <li>{@link #beginFlush()} / {@link #beginFlush(long, TimeUnit)} takes the single flush permit
 *       and snapshots the buffered offsets;</li>
 *   <li>{@link #doFlush(Callback)} serializes the snapshot and hands it to the backing store;</li>
 *   <li>the flush ends either when the backing store's completion callback fires, or when
 *       {@link #cancelFlush()} is called.</li>
 * </ol>
 *
 * <p>Mutable state ({@code data}, {@code toFlush}, {@code currentFlushId}) is only modified while
 * holding this object's monitor. {@code flushInProgress} is a one-permit semaphore that allows at
 * most one snapshot to be outstanding at a time.
 */
public class OffsetStorageWriter {
    private static final Logger log = LoggerFactory.getLogger(OffsetStorageWriter.class);

    private final OffsetBackingStore backingStore;
    private final Converter keyConverter;
    private final Converter valueConverter;
    private final String namespace;
    // Offsets recorded since the last snapshot was taken.
    private Map<Map<String, Object>, Map<String, Object>> data = new HashMap<>();
    // Snapshot currently being flushed; null when no flush is in progress.
    private Map<Map<String, Object>, Map<String, Object>> toFlush = null;
    // Single permit: held from a successful beginFlush until the flush finishes or is cancelled.
    private final Semaphore flushInProgress = new Semaphore(1);
    // Incremented whenever a flush finishes or is cancelled, so that completion callbacks
    // belonging to an earlier flush can be recognised and ignored.
    private long currentFlushId = 0;

    /**
     * @param backingStore   store that receives the serialized offsets
     * @param namespace      value passed to both converters and prepended to every serialized key
     * @param keyConverter   converter used to serialize {@code [namespace, key]}
     * @param valueConverter converter used to serialize the offset value
     */
    public OffsetStorageWriter(OffsetBackingStore backingStore, String namespace,
                               Converter keyConverter, Converter valueConverter) {
        this.backingStore = backingStore;
        this.namespace = namespace;
        this.keyConverter = keyConverter;
        this.valueConverter = valueConverter;
    }

    /**
     * Records an offset in the in-memory buffer, replacing any value previously buffered for an
     * equal key. Nothing is written to the backing store until a flush is performed.
     *
     * @param partition map used as the key of the buffered entry
     * @param offset    map stored as the value of the buffered entry
     */
    @SuppressWarnings("unchecked")
    public synchronized void offset(Map<String, ?> partition, Map<String, ?> offset) {
        // Unchecked but safe for storage: the maps are only read back as Map<String, Object>.
        this.data.put((Map<String, Object>) partition, (Map<String, Object>) offset);
    }

    /** Returns true while a snapshot taken by beginFlush has not yet been finished or cancelled. */
    private boolean flushing() {
        return this.toFlush != null;
    }

    /**
     * Begins a flush without waiting for a previous flush to finish.
     *
     * @return {@code true} if a snapshot was taken and {@link #doFlush(Callback)} should be
     *         called; {@code false} if there was nothing buffered
     * @throws ConnectException if the flush permit is not immediately available, or if the
     *                          calling thread is interrupted (its interrupt status is restored
     *                          before this exception is thrown)
     */
    public boolean beginFlush() {
        try {
            return beginFlush(0L, TimeUnit.NANOSECONDS);
        } catch (InterruptedException | TimeoutException e) {
            if (e instanceof InterruptedException) {
                // Do not swallow the interrupt: callers further up may rely on the flag.
                Thread.currentThread().interrupt();
            }
            log.error("Invalid call to OffsetStorageWriter beginFlush() while already flushing, the framework should not allow this");
            throw new ConnectException("OffsetStorageWriter is already flushing", e);
        }
    }

    /**
     * Begins a flush, waiting up to the given timeout for a previous flush to finish.
     *
     * @param timeout  maximum time to wait for the flush permit; negative values are treated as 0
     * @param timeUnit unit of {@code timeout}
     * @return {@code true} if a snapshot was taken and {@link #doFlush(Callback)} should be
     *         called; {@code false} if there was nothing buffered (the permit is released again)
     * @throws InterruptedException if the thread is interrupted while acquiring the permit
     * @throws TimeoutException     if the permit could not be acquired within the timeout
     */
    public boolean beginFlush(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {
        if (!this.flushInProgress.tryAcquire(Math.max(0L, timeout), timeUnit)) {
            throw new TimeoutException("Timed out waiting for previous flush to finish");
        }
        synchronized (this) {
            if (this.data.isEmpty()) {
                // Nothing to flush: give the permit back so the next beginFlush can proceed.
                this.flushInProgress.release();
                return false;
            }
            // Swap buffers so that offsets recorded during the flush land in a fresh map.
            this.toFlush = this.data;
            this.data = new HashMap<>();
            return true;
        }
    }

    /**
     * Serializes the current snapshot and submits it to the backing store.
     *
     * <p>If serialization fails, the error is logged, {@code callback} (when non-null) is invoked
     * with the error, and {@code null} is returned. On that path this method neither releases the
     * flush permit nor clears the snapshot; {@link #cancelFlush()} does both.
     *
     * @param callback invoked when the write completes, unless the flush was finished or cancelled
     *                 in the meantime; may be {@code null}
     * @return the future returned by the backing store, or {@code null} if serialization failed
     */
    public Future<Void> doFlush(Callback<Void> callback) {
        final long flushId;
        final Map<ByteBuffer, ByteBuffer> serialized;
        synchronized (this) {
            flushId = this.currentFlushId;
            try {
                serialized = new HashMap<>(this.toFlush.size());
                for (Map.Entry<Map<String, Object>, Map<String, Object>> entry : this.toFlush.entrySet()) {
                    OffsetUtils.validateFormat(entry.getKey());
                    OffsetUtils.validateFormat(entry.getValue());
                    // The serialized key is the pair [namespace, key].
                    byte[] keyBytes = this.keyConverter.fromConnectData(
                            this.namespace, null, Arrays.asList(this.namespace, entry.getKey()));
                    ByteBuffer keyBuffer = keyBytes != null ? ByteBuffer.wrap(keyBytes) : null;
                    byte[] valueBytes = this.valueConverter.fromConnectData(this.namespace, null, entry.getValue());
                    ByteBuffer valueBuffer = valueBytes != null ? ByteBuffer.wrap(valueBytes) : null;
                    serialized.put(keyBuffer, valueBuffer);
                }
                log.debug("Submitting {} entries to backing store. The offsets are: {}", serialized.size(), this.toFlush);
            } catch (Throwable t) {
                log.error("CRITICAL: Failed to serialize offset data, making it impossible to commit offsets under namespace {}. This likely won't recover unless the unserializable partition or offset information is overwritten.", this.namespace);
                log.error("Cause of serialization failure:", t);
                // The callback is optional on the success path below; honour that here as well
                // instead of failing with a NullPointerException.
                if (callback != null) {
                    callback.onCompletion(t, null);
                }
                return null;
            }
        }
        return this.backingStore.set(serialized, (error, result) -> {
            boolean isCurrent = handleFinishWrite(flushId, error, result);
            if (isCurrent && callback != null) {
                callback.onCompletion(error, result);
            }
        });
    }

    /**
     * Cancels the flush in progress, if any. The snapshot is merged back into the buffer, with
     * entries recorded since the snapshot taking precedence; the flush id is advanced so that a
     * late completion for the cancelled flush is ignored; and the flush permit is released.
     * Does nothing when no flush is in progress.
     */
    public synchronized void cancelFlush() {
        if (flushing()) {
            // putAll lets entries recorded after the snapshot overwrite the snapshot's entries.
            this.toFlush.putAll(this.data);
            this.data = this.toFlush;
            this.currentFlushId++;
            this.flushInProgress.release();
            this.toFlush = null;
        }
    }

    /**
     * Handles completion of a backing store write.
     *
     * @param flushId id captured when the write was submitted
     * @param error   failure reported by the backing store, or {@code null} on success
     * @param result  unused
     * @return {@code false} if the completion belongs to a flush that is no longer current and
     *         must be ignored; {@code true} otherwise
     */
    private synchronized boolean handleFinishWrite(long flushId, Throwable error, Void result) {
        if (flushId != this.currentFlushId) {
            return false;
        }
        if (error != null) {
            // Failed write: put the snapshot back so its offsets are retried by a later flush.
            cancelFlush();
            return true;
        }
        this.currentFlushId++;
        this.flushInProgress.release();
        this.toFlush = null;
        return true;
    }
}
