package kafka.tier.store;

import com.google.api.services.storage.StorageScopes;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.ReadChannel;
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
import com.google.cloud.storage.StorageOptions;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Map;
import java.util.Optional;
import kafka.tier.exceptions.TierObjectStoreFatalException;
import kafka.tier.exceptions.TierObjectStoreRetriableException;
import kafka.tier.store.TierObjectStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:kafka/tier/store/GcsTierObjectStore.class */
public class GcsTierObjectStore implements TierObjectStore {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) GcsTierObjectStore.class);
    private final String clusterId;
    private final int brokerId;
    private final String bucket;
    private final int writeChunkSize;
    private final int readChunkSize;
    private final Storage storage;

    /* loaded from: input_file:kafka/tier/store/GcsTierObjectStore$GcsTierObjectStoreResponse.class */
    private static class GcsTierObjectStoreResponse implements TierObjectStoreResponse {
        private final InputStream inputStream;

        GcsTierObjectStoreResponse(ReadChannel readChannel, long j, int i) throws IOException {
            if (i > 0) {
                readChannel.setChunkSize(i);
            }
            readChannel.seek(j);
            this.inputStream = Channels.newInputStream(readChannel);
        }

        @Override // java.lang.AutoCloseable
        public void close() throws IOException {
            this.inputStream.close();
        }

        @Override // kafka.tier.store.TierObjectStoreResponse
        public InputStream getInputStream() {
            return this.inputStream;
        }
    }

    public GcsTierObjectStore(GcsTierObjectStoreConfig gcsTierObjectStoreConfig) {
        this(storage(gcsTierObjectStoreConfig), gcsTierObjectStoreConfig);
    }

    GcsTierObjectStore(Storage storage, GcsTierObjectStoreConfig gcsTierObjectStoreConfig) {
        this.clusterId = gcsTierObjectStoreConfig.clusterId;
        this.brokerId = gcsTierObjectStoreConfig.brokerId.intValue();
        this.storage = storage;
        this.bucket = gcsTierObjectStoreConfig.gcsBucket;
        this.writeChunkSize = gcsTierObjectStoreConfig.gcsWriteChunkSize.intValue();
        this.readChunkSize = gcsTierObjectStoreConfig.gcsReadChunkSize.intValue();
        expectBucket(this.bucket, gcsTierObjectStoreConfig.gcsRegion);
    }

    @Override // kafka.tier.store.TierObjectStore
    public TierObjectStoreResponse getObject(TierObjectStore.ObjectMetadata objectMetadata, TierObjectStore.FileType fileType, Integer num, Integer num2) {
        String keyPath = TierObjectStoreUtils.keyPath(objectMetadata, fileType);
        BlobId of = BlobId.of(this.bucket, keyPath);
        if (num != null && num2 != null && num.intValue() > num2.intValue()) {
            throw new IllegalStateException("Invalid range of byteOffsetStart and byteOffsetEnd");
        }
        if (num == null && num2 != null) {
            throw new IllegalStateException("Cannot specify a byteOffsetEnd without specifying a byteOffsetStart");
        }
        log.debug("Fetching object from gcs://{}/{}, with range of {} to {}", this.bucket, keyPath, num, num2);
        try {
            return new GcsTierObjectStoreResponse(this.storage.reader(of, new Storage.BlobSourceOption[0]), num == null ? 0L : num.longValue(), this.readChunkSize);
        } catch (StorageException e) {
            throw new TierObjectStoreRetriableException("Failed to fetch segment " + objectMetadata, e);
        } catch (Exception e2) {
            throw new TierObjectStoreFatalException("Unknown exception when fetching segment " + objectMetadata, e2);
        }
    }

    @Override // kafka.tier.store.TierObjectStore
    public void putSegment(TierObjectStore.ObjectMetadata objectMetadata, File file, File file2, File file3, Optional<File> optional, Optional<ByteBuffer> optional2, Optional<File> optional3) {
        Map<String, String> createSegmentMetadata = TierObjectStoreUtils.createSegmentMetadata(objectMetadata, this.clusterId, this.brokerId);
        try {
            putFile(TierObjectStoreUtils.keyPath(objectMetadata, TierObjectStore.FileType.SEGMENT), createSegmentMetadata, file);
            putFile(TierObjectStoreUtils.keyPath(objectMetadata, TierObjectStore.FileType.OFFSET_INDEX), createSegmentMetadata, file2);
            putFile(TierObjectStoreUtils.keyPath(objectMetadata, TierObjectStore.FileType.TIMESTAMP_INDEX), createSegmentMetadata, file3);
            if (optional.isPresent()) {
                putFile(TierObjectStoreUtils.keyPath(objectMetadata, TierObjectStore.FileType.PRODUCER_STATE), createSegmentMetadata, optional.get());
            }
            if (optional2.isPresent()) {
                putBuf(TierObjectStoreUtils.keyPath(objectMetadata, TierObjectStore.FileType.TRANSACTION_INDEX), createSegmentMetadata, optional2.get());
            }
            if (optional3.isPresent()) {
                putFile(TierObjectStoreUtils.keyPath(objectMetadata, TierObjectStore.FileType.EPOCH_STATE), createSegmentMetadata, optional3.get());
            }
        } catch (StorageException e) {
            throw new TierObjectStoreRetriableException("Failed to upload segment " + objectMetadata, e);
        } catch (Exception e2) {
            throw new TierObjectStoreFatalException("Unknown exception when uploading segment " + objectMetadata, e2);
        }
    }

    @Override // kafka.tier.store.TierObjectStore
    public void deleteSegment(TierObjectStore.ObjectMetadata objectMetadata) {
        ArrayList arrayList = new ArrayList();
        for (TierObjectStore.FileType fileType : TierObjectStore.FileType.values()) {
            arrayList.add(BlobId.of(this.bucket, TierObjectStoreUtils.keyPath(objectMetadata, fileType)));
        }
        log.debug("Deleting " + arrayList);
        try {
            this.storage.delete(arrayList);
        } catch (StorageException e) {
            throw new TierObjectStoreRetriableException("Failed to delete segment " + objectMetadata, e);
        } catch (Exception e2) {
            throw new TierObjectStoreFatalException("Unknown exception when deleting segment " + objectMetadata, e2);
        }
    }

    @Override // kafka.tier.store.TierObjectStore
    public void close() {
    }

    private void putFile(String str, Map<String, String> map, File file) throws IOException {
        BlobInfo build = BlobInfo.newBuilder(BlobId.of(this.bucket, str)).setMetadata(map).build();
        log.debug("Uploading object to gcs://{}/{}", this.bucket, str);
        WriteChannel writer = this.storage.writer(build, new Storage.BlobWriteOption[0]);
        Throwable th = null;
        try {
            FileChannel open = FileChannel.open(file.toPath(), StandardOpenOption.READ);
            Throwable th2 = null;
            try {
                try {
                    if (this.writeChunkSize > 0) {
                        writer.setChunkSize(this.writeChunkSize);
                    }
                    long length = file.length();
                    for (long j = 0; j < length; j += open.transferTo(j, length, writer)) {
                    }
                    if (open != null) {
                        if (0 != 0) {
                            try {
                                open.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            open.close();
                        }
                    }
                    if (writer != null) {
                        if (0 == 0) {
                            writer.close();
                            return;
                        }
                        try {
                            writer.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    }
                } catch (Throwable th5) {
                    th2 = th5;
                    throw th5;
                }
            } catch (Throwable th6) {
                if (open != null) {
                    if (th2 != null) {
                        try {
                            open.close();
                        } catch (Throwable th7) {
                            th2.addSuppressed(th7);
                        }
                    } else {
                        open.close();
                    }
                }
                throw th6;
            }
        } catch (Throwable th8) {
            if (writer != null) {
                if (0 != 0) {
                    try {
                        writer.close();
                    } catch (Throwable th9) {
                        th.addSuppressed(th9);
                    }
                } else {
                    writer.close();
                }
            }
            throw th8;
        }
    }

    private void putBuf(String str, Map<String, String> map, ByteBuffer byteBuffer) throws IOException {
        BlobInfo build = BlobInfo.newBuilder(BlobId.of(this.bucket, str)).setMetadata(map).build();
        log.debug("Uploading object gcs://{}/{}", this.bucket, str);
        WriteChannel writer = this.storage.writer(build, new Storage.BlobWriteOption[0]);
        Throwable th = null;
        try {
            try {
                if (this.writeChunkSize > 0) {
                    writer.setChunkSize(this.writeChunkSize);
                }
                while (byteBuffer.hasRemaining()) {
                    writer.write(byteBuffer);
                }
                if (writer != null) {
                    if (0 == 0) {
                        writer.close();
                        return;
                    }
                    try {
                        writer.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (writer != null) {
                if (th != null) {
                    try {
                        writer.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    writer.close();
                }
            }
            throw th4;
        }
    }

    private static Storage storage(GcsTierObjectStoreConfig gcsTierObjectStoreConfig) {
        if (!gcsTierObjectStoreConfig.gcsCredFilePath.isPresent()) {
            return StorageOptions.getDefaultInstance().getService();
        }
        try {
            return StorageOptions.newBuilder().setCredentials(GoogleCredentials.fromStream(new FileInputStream(gcsTierObjectStoreConfig.gcsCredFilePath.get())).createScoped(Lists.newArrayList(StorageScopes.CLOUD_PLATFORM))).build2().getService();
        } catch (IOException e) {
            throw new TierObjectStoreFatalException("Error in opening GCS credentials file", e);
        }
    }

    private void expectBucket(String str, String str2) throws TierObjectStoreFatalException {
        try {
            Bucket bucket = this.storage.get(str, Storage.BucketGetOption.fields(Storage.BucketField.LOCATION));
            if (bucket == null) {
                throw new TierObjectStoreFatalException("Configured bucket " + str + " does not exist or could not be found");
            }
            String location = bucket.getLocation();
            if (str2.equalsIgnoreCase(location)) {
                return;
            }
            log.warn("Bucket region {} does not match expected region {}", location, str2);
        } catch (StorageException e) {
            throw new TierObjectStoreFatalException("Unable to access bucket " + str, e);
        }
    }
}
