/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.knn.index.codec.nativeindex.remote;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import lombok.Generated;
import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.common.CheckedTriFunction;
import org.opensearch.common.StreamContext;
import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.stream.write.WriteContext;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.io.InputStreamContainer;
import org.opensearch.core.action.ActionListener;
import org.opensearch.knn.index.VectorDataType;
import org.opensearch.knn.index.codec.nativeindex.remote.DocIdInputStream;
import org.opensearch.knn.index.codec.nativeindex.remote.VectorRepositoryAccessor;
import org.opensearch.knn.index.codec.nativeindex.remote.VectorValuesInputStream;
import org.opensearch.knn.index.codec.util.KNNCodecUtil;
import org.opensearch.knn.index.engine.KNNEngine;
import org.opensearch.knn.index.store.IndexOutputWithBuffer;
import org.opensearch.knn.index.vectorvalues.KNNVectorValues;

public class DefaultVectorRepositoryAccessor
implements VectorRepositoryAccessor {
    @Generated
    private static final Logger log = LogManager.getLogger(DefaultVectorRepositoryAccessor.class);
    private final BlobContainer blobContainer;

    @Override
    public void writeToRepository(final String blobName, int totalLiveDocs, VectorDataType vectorDataType, Supplier<KNNVectorValues<?>> knnVectorValuesSupplier) throws IOException, InterruptedException {
        assert (this.blobContainer != null);
        KNNVectorValues<?> knnVectorValues = knnVectorValuesSupplier.get();
        KNNCodecUtil.initializeVectorValues(knnVectorValues);
        final long vectorBlobLength = (long)knnVectorValues.bytesPerVector() * (long)totalLiveDocs;
        BlobContainer blobContainer = this.blobContainer;
        if (blobContainer instanceof AsyncMultiStreamBlobContainer) {
            AsyncMultiStreamBlobContainer asyncBlobContainer = (AsyncMultiStreamBlobContainer)blobContainer;
            log.debug("Container {} Supports Parallel Blob Upload", (Object)this.blobContainer);
            WriteContext writeContext = this.createWriteContext(blobName, vectorBlobLength, knnVectorValuesSupplier, vectorDataType);
            final AtomicReference exception = new AtomicReference();
            CountDownLatch latch = new CountDownLatch(1);
            asyncBlobContainer.asyncBlobUpload(writeContext, (ActionListener)new LatchedActionListener((ActionListener)new ActionListener<Void>(this){

                public void onResponse(Void unused) {
                    log.debug("Parallel vector upload succeeded for blob {} with size {}", (Object)(blobName + ".knnvec"), (Object)vectorBlobLength);
                }

                public void onFailure(Exception e) {
                    log.error("Parallel vector upload failed for blob {} with size {}", (Object)(blobName + ".knnvec"), (Object)vectorBlobLength, (Object)e);
                    exception.set(e);
                }
            }, latch));
            this.writeDocIds(knnVectorValuesSupplier.get(), vectorBlobLength, totalLiveDocs, blobName, this.blobContainer);
            latch.await();
            if (exception.get() != null) {
                throw new IOException((Throwable)exception.get());
            }
        } else {
            log.debug("Container {} Does Not Support Parallel Blob Upload", (Object)this.blobContainer);
            try (BufferedInputStream vectorStream = new BufferedInputStream(new VectorValuesInputStream(knnVectorValuesSupplier.get(), vectorDataType));){
                log.debug("Writing {} bytes for {} docs to {}", (Object)vectorBlobLength, (Object)totalLiveDocs, (Object)(blobName + ".knnvec"));
                this.blobContainer.writeBlob(blobName + ".knnvec", (InputStream)vectorStream, vectorBlobLength, true);
            }
            this.writeDocIds(knnVectorValuesSupplier.get(), vectorBlobLength, totalLiveDocs, blobName, this.blobContainer);
        }
    }

    private void writeDocIds(KNNVectorValues<?> knnVectorValues, long vectorBlobLength, long totalLiveDocs, String blobName, BlobContainer blobContainer) throws IOException {
        try (BufferedInputStream docStream = new BufferedInputStream(new DocIdInputStream(knnVectorValues));){
            log.debug("Writing {} bytes for {} docs ids to {}", (Object)vectorBlobLength, (Object)(totalLiveDocs * 4L), (Object)(blobName + ".knndid"));
            blobContainer.writeBlob(blobName + ".knndid", (InputStream)docStream, totalLiveDocs * 4L, true);
        }
    }

    private StreamContext getStreamContext(long partSize, long vectorBlobLength, Supplier<KNNVectorValues<?>> knnVectorValuesSupplier, VectorDataType vectorDataType) {
        long lastPartSize = vectorBlobLength % partSize != 0L ? vectorBlobLength % partSize : partSize;
        int numberOfParts = (int)(vectorBlobLength % partSize == 0L ? vectorBlobLength / partSize : vectorBlobLength / partSize + 1L);
        return new StreamContext(this.getTransferPartStreamSupplier(knnVectorValuesSupplier, vectorDataType), partSize, lastPartSize, numberOfParts);
    }

    private CheckedTriFunction<Integer, Long, Long, InputStreamContainer, IOException> getTransferPartStreamSupplier(Supplier<KNNVectorValues<?>> knnVectorValuesSupplier, VectorDataType vectorDataType) {
        return (partNo, size, position) -> {
            log.info("Creating InputStream for partNo: {}, size: {}, position: {}", partNo, size, position);
            VectorValuesInputStream vectorValuesInputStream = new VectorValuesInputStream((KNNVectorValues)knnVectorValuesSupplier.get(), vectorDataType, (long)position, (long)size);
            return new InputStreamContainer((InputStream)vectorValuesInputStream, size.longValue(), position.longValue());
        };
    }

    private WriteContext createWriteContext(String blobName, long vectorBlobLength, Supplier<KNNVectorValues<?>> knnVectorValuesSupplier, VectorDataType vectorDataType) {
        return new WriteContext.Builder().fileName(blobName + ".knnvec").streamContextSupplier(partSize -> this.getStreamContext(partSize, vectorBlobLength, knnVectorValuesSupplier, vectorDataType)).fileSize(vectorBlobLength).failIfAlreadyExists(true).writePriority(WritePriority.NORMAL).uploadFinalizer(bool -> {}).build();
    }

    @Override
    public void readFromRepository(String fileName, IndexOutputWithBuffer indexOutputWithBuffer) throws IOException {
        if (StringUtils.isBlank((String)fileName)) {
            throw new IllegalArgumentException("download path is null or empty");
        }
        if (!fileName.endsWith(KNNEngine.FAISS.getExtension())) {
            log.error("file name [{}] does not end with extension [{}}", (Object)fileName, (Object)KNNEngine.FAISS.getExtension());
            throw new IllegalArgumentException("download path has incorrect file extension");
        }
        InputStream graphStream = this.blobContainer.readBlob(fileName);
        indexOutputWithBuffer.writeFromStreamWithBuffer(graphStream);
    }

    @Generated
    public DefaultVectorRepositoryAccessor(BlobContainer blobContainer) {
        this.blobContainer = blobContainer;
    }
}

