/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.image.loader;

import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.metadata.MetadataRecordType;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.image.loader.LogDeltaManifest;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.fault.FaultHandler;
import org.slf4j.Logger;

public class MetadataBatchLoader {
    private final Logger log;
    private final Time time;
    private final FaultHandler faultHandler;
    private final MetadataUpdater callback;
    private MetadataImage image;
    private MetadataDelta delta;
    private long lastOffset;
    private int lastEpoch;
    private long lastContainedLogTimeMs;
    private long numBytes;
    private int numBatches;
    private long totalBatchElapsedNs;
    private TransactionState transactionState;
    private boolean hasSeenRecord;

    public MetadataBatchLoader(LogContext logContext, Time time, FaultHandler faultHandler, MetadataUpdater callback) {
        this.log = logContext.logger(MetadataBatchLoader.class);
        this.time = time;
        this.faultHandler = faultHandler;
        this.callback = callback;
        this.resetToImage(MetadataImage.EMPTY);
        this.hasSeenRecord = false;
    }

    public boolean hasSeenRecord() {
        return this.hasSeenRecord;
    }

    public final void resetToImage(MetadataImage image) {
        this.image = image;
        this.hasSeenRecord = !image.isEmpty();
        this.delta = new MetadataDelta.Builder().setImage(image).build();
        this.transactionState = TransactionState.NO_TRANSACTION;
        this.lastOffset = image.provenance().lastContainedOffset();
        this.lastEpoch = image.provenance().lastContainedEpoch();
        this.lastContainedLogTimeMs = image.provenance().lastContainedLogTimeMs();
        this.numBytes = 0L;
        this.numBatches = 0;
        this.totalBatchElapsedNs = 0L;
    }

    public long loadBatch(Batch<ApiMessageAndVersion> batch, LeaderAndEpoch leaderAndEpoch) {
        long startNs = this.time.nanoseconds();
        int indexWithinBatch = 0;
        this.lastContainedLogTimeMs = batch.appendTimestamp();
        this.lastEpoch = batch.epoch();
        for (ApiMessageAndVersion record : batch.records()) {
            try {
                this.replay(record);
            }
            catch (Throwable e) {
                this.faultHandler.handleFault("Error loading metadata log record from offset " + (batch.baseOffset() + (long)indexWithinBatch), e);
            }
            if (this.transactionState == TransactionState.STARTED_TRANSACTION && (indexWithinBatch > 0 || this.numBatches > 0)) {
                MetadataProvenance provenance = new MetadataProvenance(this.lastOffset, this.lastEpoch, this.lastContainedLogTimeMs, indexWithinBatch == 0);
                LogDeltaManifest manifest = LogDeltaManifest.newBuilder().provenance(provenance).leaderAndEpoch(leaderAndEpoch).numBatches(this.numBatches).elapsedNs(this.totalBatchElapsedNs).numBytes(this.numBytes).build();
                if (this.log.isDebugEnabled()) {
                    this.log.debug("handleCommit: Generated a metadata delta between {} and {} from {} batch(es) in {} us. The delta is {}.", new Object[]{this.image.offset(), manifest.provenance().lastContainedOffset(), manifest.numBatches(), TimeUnit.NANOSECONDS.toMicros(manifest.elapsedNs()), provenance.isOffsetBatchAligned() ? "batch aligned" : "not batch aligned"});
                }
                this.applyDeltaAndUpdate(this.delta, manifest);
                this.transactionState = TransactionState.STARTED_TRANSACTION;
            }
            this.lastOffset = batch.baseOffset() + (long)indexWithinBatch;
            ++indexWithinBatch;
        }
        long elapsedNs = this.time.nanoseconds() - startNs;
        this.lastOffset = batch.lastOffset();
        this.numBytes += (long)batch.sizeInBytes();
        ++this.numBatches;
        this.totalBatchElapsedNs += elapsedNs;
        return this.totalBatchElapsedNs;
    }

    public void maybeFlushBatches(LeaderAndEpoch leaderAndEpoch, boolean isOffsetBatchAligned) {
        MetadataProvenance provenance = new MetadataProvenance(this.lastOffset, this.lastEpoch, this.lastContainedLogTimeMs, isOffsetBatchAligned);
        LogDeltaManifest manifest = LogDeltaManifest.newBuilder().provenance(provenance).leaderAndEpoch(leaderAndEpoch).numBatches(this.numBatches).elapsedNs(this.totalBatchElapsedNs).numBytes(this.numBytes).build();
        switch (this.transactionState) {
            case STARTED_TRANSACTION: 
            case CONTINUED_TRANSACTION: {
                this.log.debug("handleCommit: not publishing since a transaction starting at {} is still in progress. {} batch(es) processed so far.", (Object)this.image.offset(), (Object)this.numBatches);
                break;
            }
            case ABORTED_TRANSACTION: {
                this.log.debug("handleCommit: publishing empty delta between {} and {} from {} batch(es) since a transaction was aborted", new Object[]{this.image.offset(), manifest.provenance().lastContainedOffset(), manifest.numBatches()});
                this.applyDeltaAndUpdate(new MetadataDelta.Builder().setImage(this.image).build(), manifest);
                break;
            }
            case ENDED_TRANSACTION: 
            case NO_TRANSACTION: {
                if (this.log.isDebugEnabled()) {
                    this.log.debug("handleCommit: Generated a metadata delta between {} and {} from {} batch(es) in {} us.", new Object[]{this.image.offset(), manifest.provenance().lastContainedOffset(), manifest.numBatches(), TimeUnit.NANOSECONDS.toMicros(manifest.elapsedNs())});
                }
                this.applyDeltaAndUpdate(this.delta, manifest);
            }
        }
    }

    private void replay(ApiMessageAndVersion record) {
        MetadataRecordType type = MetadataRecordType.fromId(record.message().apiKey());
        switch (type) {
            case BEGIN_TRANSACTION_RECORD: {
                if (this.transactionState == TransactionState.STARTED_TRANSACTION || this.transactionState == TransactionState.CONTINUED_TRANSACTION) {
                    throw new RuntimeException("Encountered BeginTransactionRecord while already in a transaction");
                }
                this.transactionState = TransactionState.STARTED_TRANSACTION;
                break;
            }
            case END_TRANSACTION_RECORD: {
                if (this.transactionState == TransactionState.CONTINUED_TRANSACTION || this.transactionState == TransactionState.STARTED_TRANSACTION) {
                    this.transactionState = TransactionState.ENDED_TRANSACTION;
                    break;
                }
                throw new RuntimeException("Encountered EndTransactionRecord without having seen a BeginTransactionRecord");
            }
            case ABORT_TRANSACTION_RECORD: {
                if (this.transactionState == TransactionState.CONTINUED_TRANSACTION || this.transactionState == TransactionState.STARTED_TRANSACTION) {
                    this.transactionState = TransactionState.ABORTED_TRANSACTION;
                    break;
                }
                throw new RuntimeException("Encountered AbortTransactionRecord without having seen a BeginTransactionRecord");
            }
            default: {
                switch (this.transactionState) {
                    case STARTED_TRANSACTION: {
                        this.transactionState = TransactionState.CONTINUED_TRANSACTION;
                        break;
                    }
                    case ABORTED_TRANSACTION: 
                    case ENDED_TRANSACTION: {
                        this.transactionState = TransactionState.NO_TRANSACTION;
                        break;
                    }
                }
                this.hasSeenRecord = true;
                this.delta.replay(record.message());
            }
        }
    }

    private void applyDeltaAndUpdate(MetadataDelta delta, LogDeltaManifest manifest) {
        try {
            this.image = delta.apply(manifest.provenance());
        }
        catch (Throwable e) {
            this.faultHandler.handleFault("Error generating new metadata image from metadata delta between offset " + this.image.offset() + " and " + manifest.provenance().lastContainedOffset(), e);
        }
        this.callback.update(delta, this.image, manifest);
        this.resetToImage(this.image);
    }

    @FunctionalInterface
    public static interface MetadataUpdater {
        public void update(MetadataDelta var1, MetadataImage var2, LogDeltaManifest var3);
    }

    static enum TransactionState {
        NO_TRANSACTION,
        STARTED_TRANSACTION,
        CONTINUED_TRANSACTION,
        ENDED_TRANSACTION,
        ABORTED_TRANSACTION;

    }
}

