/*
 * Decompiled with CFR 0.152.
 */
package kafka.server.share;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Optional;
import java.util.SortedMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import kafka.server.ReplicaManager;
import kafka.server.share.ShareFetchUtils;
import kafka.server.share.SharePartitionManager;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.errors.FencedStateEpochException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.InvalidRecordStateException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.GroupConfig;
import org.apache.kafka.coordinator.group.GroupConfigManager;
import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
import org.apache.kafka.server.share.fetch.DelayedShareFetchKey;
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
import org.apache.kafka.server.share.persister.GroupTopicPartitionData;
import org.apache.kafka.server.share.persister.PartitionErrorData;
import org.apache.kafka.server.share.persister.PartitionFactory;
import org.apache.kafka.server.share.persister.Persister;
import org.apache.kafka.server.share.persister.PersisterStateBatch;
import org.apache.kafka.server.share.persister.ReadShareGroupStateParameters;
import org.apache.kafka.server.share.persister.TopicData;
import org.apache.kafka.server.share.persister.WriteShareGroupStateParameters;
import org.apache.kafka.server.storage.log.FetchPartitionData;
import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.server.util.timer.TimerTask;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SharePartition {
    private static final Logger log = LoggerFactory.getLogger(SharePartition.class);
    static final String EMPTY_MEMBER_ID = Uuid.ZERO_UUID.toString();
    private final String groupId;
    private final TopicIdPartition topicIdPartition;
    private final int leaderEpoch;
    private final NavigableMap<Long, InFlightBatch> cachedState;
    private final ReadWriteLock lock;
    private final AtomicBoolean findNextFetchOffset;
    private final AtomicBoolean fetchLock;
    private final int maxInFlightMessages;
    private final int maxDeliveryCount;
    private final GroupConfigManager groupConfigManager;
    private final int defaultRecordLockDurationMs;
    private final Timer timer;
    private final Time time;
    private final Persister persister;
    private final SharePartitionManager.SharePartitionListener listener;
    private long startOffset;
    private long endOffset;
    private final OffsetMetadata fetchOffsetMetadata;
    private int stateEpoch;
    private SharePartitionState partitionState;
    private final ReplicaManager replicaManager;

    SharePartition(String groupId, TopicIdPartition topicIdPartition, int leaderEpoch, int maxInFlightMessages, int maxDeliveryCount, int defaultRecordLockDurationMs, Timer timer, Time time, Persister persister, ReplicaManager replicaManager, GroupConfigManager groupConfigManager, SharePartitionManager.SharePartitionListener listener) {
        this(groupId, topicIdPartition, leaderEpoch, maxInFlightMessages, maxDeliveryCount, defaultRecordLockDurationMs, timer, time, persister, replicaManager, groupConfigManager, SharePartitionState.EMPTY, listener);
    }

    SharePartition(String groupId, TopicIdPartition topicIdPartition, int leaderEpoch, int maxInFlightMessages, int maxDeliveryCount, int defaultRecordLockDurationMs, Timer timer, Time time, Persister persister, ReplicaManager replicaManager, GroupConfigManager groupConfigManager, SharePartitionState sharePartitionState, SharePartitionManager.SharePartitionListener listener) {
        this.groupId = groupId;
        this.topicIdPartition = topicIdPartition;
        this.leaderEpoch = leaderEpoch;
        this.maxInFlightMessages = maxInFlightMessages;
        this.maxDeliveryCount = maxDeliveryCount;
        this.cachedState = new ConcurrentSkipListMap<Long, InFlightBatch>();
        this.lock = new ReentrantReadWriteLock();
        this.findNextFetchOffset = new AtomicBoolean(false);
        this.fetchLock = new AtomicBoolean(false);
        this.defaultRecordLockDurationMs = defaultRecordLockDurationMs;
        this.timer = timer;
        this.time = time;
        this.persister = persister;
        this.partitionState = sharePartitionState;
        this.replicaManager = replicaManager;
        this.groupConfigManager = groupConfigManager;
        this.fetchOffsetMetadata = new OffsetMetadata();
        this.listener = listener;
    }

    public CompletableFuture<Void> maybeInitialize() {
        log.debug("Maybe initialize share partition: {}-{}", (Object)this.groupId, (Object)this.topicIdPartition);
        try {
            if (this.initializedOrThrowException()) {
                return CompletableFuture.completedFuture(null);
            }
        }
        catch (Exception e) {
            return CompletableFuture.failedFuture(e);
        }
        try {
            if (!this.emptyToInitialState()) {
                return CompletableFuture.completedFuture(null);
            }
        }
        catch (Exception e) {
            return CompletableFuture.failedFuture(e);
        }
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        this.persister.readState(new ReadShareGroupStateParameters.Builder().setGroupTopicPartitionData(new GroupTopicPartitionData.Builder().setGroupId(this.groupId).setTopicsData(Collections.singletonList(new TopicData(this.topicIdPartition.topicId(), Collections.singletonList(PartitionFactory.newPartitionIdLeaderEpochData((int)this.topicIdPartition.partition(), (int)this.leaderEpoch))))).build()).build()).whenComplete((result, exception) -> {
            /*
             * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
             * 
             * org.benf.cfr.reader.util.ConfusedCFRException: Tried to end blocks [5[TRYBLOCK]], but top level block is 10[WHILELOOP]
             *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.processEndingBlocks(Op04StructuredStatement.java:435)
             *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:484)
             *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
             *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
             *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
             *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
             *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
             *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1050)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
             *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
             *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
             *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
             *     at org.benf.cfr.reader.Main.main(Main.java:54)
             */
            throw new IllegalStateException("Decompilation failed");
        });
        return future;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public long nextFetchOffset() {
        this.lock.writeLock().lock();
        try {
            if (!this.findNextFetchOffset.get()) {
                if (this.cachedState.isEmpty() || this.startOffset > this.cachedState.lastEntry().getValue().lastOffset()) {
                    long l = this.endOffset;
                    return l;
                }
                long l = this.endOffset + 1L;
                return l;
            }
            if (this.cachedState.isEmpty() || this.startOffset > this.cachedState.lastEntry().getValue().lastOffset()) {
                this.findNextFetchOffset.set(false);
                long l = this.endOffset;
                return l;
            }
            long nextFetchOffset = -1L;
            for (Map.Entry entry : this.cachedState.entrySet()) {
                if (((InFlightBatch)entry.getValue()).offsetState() == null) {
                    if (((InFlightBatch)entry.getValue()).batchState() != RecordState.AVAILABLE) continue;
                    nextFetchOffset = ((InFlightBatch)entry.getValue()).firstOffset();
                    break;
                }
                for (Map.Entry offsetState : ((InFlightBatch)entry.getValue()).offsetState().entrySet()) {
                    if (((InFlightState)offsetState.getValue()).state != RecordState.AVAILABLE) continue;
                    nextFetchOffset = (Long)offsetState.getKey();
                    break;
                }
                if (nextFetchOffset == -1L) continue;
                break;
            }
            if (nextFetchOffset == -1L) {
                this.findNextFetchOffset.set(false);
                nextFetchOffset = this.endOffset + 1L;
            }
            long l = nextFetchOffset;
            return l;
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public ShareAcquiredRecords acquire(String memberId, int maxFetchRecords, FetchPartitionData fetchPartitionData) {
        log.trace("Received acquire request for share partition: {}-{} memberId: {}", new Object[]{this.groupId, this.topicIdPartition, memberId});
        if (this.stateNotActive() || maxFetchRecords <= 0) {
            return ShareAcquiredRecords.empty();
        }
        RecordBatch lastBatch = fetchPartitionData.records.lastBatch().orElse(null);
        if (lastBatch == null) {
            return ShareAcquiredRecords.empty();
        }
        RecordBatch firstBatch = (RecordBatch)fetchPartitionData.records.batches().iterator().next();
        this.lock.writeLock().lock();
        try {
            NavigableMap<Long, InFlightBatch> subMap;
            long baseOffset = firstBatch.baseOffset();
            Map.Entry<Long, InFlightBatch> floorOffset = this.cachedState.floorEntry(baseOffset);
            if (floorOffset != null && floorOffset.getValue().lastOffset() >= baseOffset) {
                baseOffset = floorOffset.getKey();
            }
            if ((subMap = this.cachedState.subMap(baseOffset, true, lastBatch.lastOffset(), true)).isEmpty()) {
                log.trace("No cached data exists for the share partition for requested fetch batch: {}-{}", (Object)this.groupId, (Object)this.topicIdPartition);
                ShareFetchResponseData.AcquiredRecords acquiredRecords = this.acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(), firstBatch.baseOffset(), lastBatch.lastOffset(), maxFetchRecords);
                ShareAcquiredRecords shareAcquiredRecords = ShareAcquiredRecords.fromAcquiredRecords((ShareFetchResponseData.AcquiredRecords)acquiredRecords);
                return shareAcquiredRecords;
            }
            log.trace("Overlap exists with in-flight records. Acquire the records if available for the share partition: {}-{}", (Object)this.groupId, (Object)this.topicIdPartition);
            ArrayList<ShareFetchResponseData.AcquiredRecords> result = new ArrayList<ShareFetchResponseData.AcquiredRecords>();
            int acquiredCount = 0;
            for (Map.Entry entry : subMap.entrySet()) {
                if (acquiredCount >= maxFetchRecords) break;
                InFlightBatch inFlightBatch = (InFlightBatch)entry.getValue();
                boolean fullMatch = this.checkForFullMatch(inFlightBatch, firstBatch.baseOffset(), lastBatch.lastOffset());
                if (!fullMatch || inFlightBatch.offsetState() != null) {
                    log.trace("Subset or offset tracked batch record found for share partition, batch: {} request offsets - first: {}, last: {} for the share partition: {}-{}", new Object[]{inFlightBatch, firstBatch.baseOffset(), lastBatch.lastOffset(), this.groupId, this.topicIdPartition});
                    if (inFlightBatch.offsetState() == null) {
                        if (inFlightBatch.batchState() != RecordState.AVAILABLE || inFlightBatch.batchHasOngoingStateTransition()) {
                            log.trace("The batch is not available to acquire in share partition: {}-{}, skipping: {} skipping offset tracking for batch as well.", new Object[]{this.groupId, this.topicIdPartition, inFlightBatch});
                            continue;
                        }
                        inFlightBatch.maybeInitializeOffsetStateUpdate();
                    }
                    int acquiredSubsetCount = this.acquireSubsetBatchRecords(memberId, firstBatch.baseOffset(), lastBatch.lastOffset(), inFlightBatch, result);
                    acquiredCount += acquiredSubsetCount;
                    continue;
                }
                if (inFlightBatch.batchState() != RecordState.AVAILABLE || inFlightBatch.batchHasOngoingStateTransition()) {
                    log.trace("The batch is not available to acquire in share partition: {}-{}, skipping: {}", new Object[]{this.groupId, this.topicIdPartition, inFlightBatch});
                    continue;
                }
                InFlightState updateResult = inFlightBatch.tryUpdateBatchState(RecordState.ACQUIRED, true, this.maxDeliveryCount, memberId);
                if (updateResult == null) {
                    log.info("Unable to acquire records for the batch: {} in share partition: {}-{}", new Object[]{inFlightBatch, this.groupId, this.topicIdPartition});
                    continue;
                }
                AcquisitionLockTimerTask acquisitionLockTimeoutTask = this.scheduleAcquisitionLockTimeout(memberId, inFlightBatch.firstOffset(), inFlightBatch.lastOffset());
                inFlightBatch.updateAcquisitionLockTimeout(acquisitionLockTimeoutTask);
                result.add(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(inFlightBatch.firstOffset()).setLastOffset(inFlightBatch.lastOffset()).setDeliveryCount((short)inFlightBatch.batchDeliveryCount()));
                acquiredCount += (int)(inFlightBatch.lastOffset() - inFlightBatch.firstOffset() + 1L);
            }
            if (acquiredCount < maxFetchRecords && subMap.lastEntry().getValue().lastOffset() < lastBatch.lastOffset()) {
                log.trace("There exists another batch which needs to be acquired as well");
                ShareFetchResponseData.AcquiredRecords acquiredRecords = this.acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(), subMap.lastEntry().getValue().lastOffset() + 1L, lastBatch.lastOffset(), maxFetchRecords - acquiredCount);
                result.add(acquiredRecords);
                acquiredCount += (int)(acquiredRecords.lastOffset() - acquiredRecords.firstOffset() + 1L);
            }
            ShareAcquiredRecords shareAcquiredRecords = new ShareAcquiredRecords(result, acquiredCount);
            return shareAcquiredRecords;
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public CompletableFuture<Void> acknowledge(String memberId, List<ShareAcknowledgementBatch> acknowledgementBatches) {
        log.trace("Acknowledgement batch request for share partition: {}-{}", (Object)this.groupId, (Object)this.topicIdPartition);
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        Throwable throwable = null;
        ArrayList<InFlightState> updatedStates = new ArrayList<InFlightState>();
        ArrayList<PersisterStateBatch> stateBatches = new ArrayList<PersisterStateBatch>();
        this.lock.writeLock().lock();
        try {
            for (ShareAcknowledgementBatch batch : acknowledgementBatches) {
                NavigableMap<Long, InFlightBatch> subMap;
                Map<Long, RecordState> recordStateMap;
                try {
                    recordStateMap = this.fetchRecordStateMapForAcknowledgementBatch(batch);
                }
                catch (IllegalArgumentException e) {
                    log.debug("Invalid acknowledge type: {} for share partition: {}-{}", new Object[]{batch.acknowledgeTypes(), this.groupId, this.topicIdPartition});
                    throwable = new InvalidRequestException("Invalid acknowledge type: " + String.valueOf(batch.acknowledgeTypes()));
                    break;
                }
                if (batch.lastOffset() < this.startOffset) {
                    log.trace("All offsets in the acknowledgement batch {} are already archived: {}-{}", new Object[]{batch, this.groupId, this.topicIdPartition});
                    continue;
                }
                try {
                    subMap = this.fetchSubMapForAcknowledgementBatch(batch);
                }
                catch (InvalidRecordStateException | InvalidRequestException e) {
                    throwable = e;
                    break;
                }
                Optional<Throwable> ackThrowable = this.acknowledgeBatchRecords(memberId, batch, recordStateMap, subMap, updatedStates, stateBatches);
                if (!ackThrowable.isPresent()) continue;
                throwable = ackThrowable.get();
                break;
            }
            this.rollbackOrProcessStateUpdates(future, throwable, updatedStates, stateBatches);
        }
        finally {
            this.lock.writeLock().unlock();
        }
        return future;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public CompletableFuture<Void> releaseAcquiredRecords(String memberId) {
        log.trace("Release acquired records request for share partition: {}-{} memberId: {}", new Object[]{this.groupId, this.topicIdPartition, memberId});
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        Throwable throwable = null;
        ArrayList<InFlightState> updatedStates = new ArrayList<InFlightState>();
        ArrayList<PersisterStateBatch> stateBatches = new ArrayList<PersisterStateBatch>();
        this.lock.writeLock().lock();
        try {
            RecordState recordState = RecordState.AVAILABLE;
            for (Map.Entry entry : this.cachedState.entrySet()) {
                Optional<Throwable> releaseAcquiredRecordsThrowable;
                InFlightBatch inFlightBatch = (InFlightBatch)entry.getValue();
                if (inFlightBatch.offsetState() == null && inFlightBatch.batchState() == RecordState.ACQUIRED && inFlightBatch.batchMemberId().equals(memberId) && this.checkForStartOffsetWithinBatch(inFlightBatch.firstOffset(), inFlightBatch.lastOffset())) {
                    inFlightBatch.maybeInitializeOffsetStateUpdate();
                }
                if (inFlightBatch.offsetState() != null) {
                    releaseAcquiredRecordsThrowable = this.releaseAcquiredRecordsForPerOffsetBatch(memberId, inFlightBatch, recordState, updatedStates, stateBatches);
                    if (!releaseAcquiredRecordsThrowable.isPresent()) continue;
                    throwable = releaseAcquiredRecordsThrowable.get();
                    break;
                }
                releaseAcquiredRecordsThrowable = this.releaseAcquiredRecordsForCompleteBatch(memberId, inFlightBatch, recordState, updatedStates, stateBatches);
                if (!releaseAcquiredRecordsThrowable.isPresent()) continue;
                throwable = releaseAcquiredRecordsThrowable.get();
                break;
            }
            this.rollbackOrProcessStateUpdates(future, throwable, updatedStates, stateBatches);
        }
        finally {
            this.lock.writeLock().unlock();
        }
        return future;
    }

    private Optional<Throwable> releaseAcquiredRecordsForPerOffsetBatch(String memberId, InFlightBatch inFlightBatch, RecordState recordState, List<InFlightState> updatedStates, List<PersisterStateBatch> stateBatches) {
        log.trace("Offset tracked batch record found, batch: {} for the share partition: {}-{}", new Object[]{inFlightBatch, this.groupId, this.topicIdPartition});
        for (Map.Entry offsetState : inFlightBatch.offsetState.entrySet()) {
            if (!((InFlightState)offsetState.getValue()).memberId().equals(memberId) && !((InFlightState)offsetState.getValue()).memberId().equals(EMPTY_MEMBER_ID)) {
                log.debug("Member {} is not the owner of offset: {} in batch: {} for the share partition: {}-{}. Skipping offset.", new Object[]{memberId, offsetState.getKey(), inFlightBatch, this.groupId, this.topicIdPartition});
                return Optional.empty();
            }
            if (((InFlightState)offsetState.getValue()).state != RecordState.ACQUIRED) continue;
            InFlightState updateResult = ((InFlightState)offsetState.getValue()).startStateTransition((Long)offsetState.getKey() < this.startOffset ? RecordState.ARCHIVED : recordState, false, this.maxDeliveryCount, EMPTY_MEMBER_ID);
            if (updateResult == null) {
                log.debug("Unable to release records from acquired state for the offset: {} in batch: {} for the share partition: {}-{}", new Object[]{offsetState.getKey(), inFlightBatch, this.groupId, this.topicIdPartition});
                return Optional.of(new InvalidRecordStateException("Unable to release acquired records for the offset"));
            }
            updatedStates.add(updateResult);
            stateBatches.add(new PersisterStateBatch(((Long)offsetState.getKey()).longValue(), ((Long)offsetState.getKey()).longValue(), updateResult.state.id, (short)updateResult.deliveryCount));
            if (updateResult.state == RecordState.ARCHIVED) continue;
            this.findNextFetchOffset.set(true);
        }
        return Optional.empty();
    }

    private Optional<Throwable> releaseAcquiredRecordsForCompleteBatch(String memberId, InFlightBatch inFlightBatch, RecordState recordState, List<InFlightState> updatedStates, List<PersisterStateBatch> stateBatches) {
        if (!inFlightBatch.batchMemberId().equals(memberId) && !inFlightBatch.batchMemberId().equals(EMPTY_MEMBER_ID)) {
            log.debug("Member {} is not the owner of batch record {} for share partition: {}-{}. Skipping batch.", new Object[]{memberId, inFlightBatch, this.groupId, this.topicIdPartition});
            return Optional.empty();
        }
        log.trace("Releasing acquired records for complete batch {} for the share partition: {}-{}", new Object[]{inFlightBatch, this.groupId, this.topicIdPartition});
        if (inFlightBatch.batchState() == RecordState.ACQUIRED) {
            InFlightState updateResult = inFlightBatch.startBatchStateTransition(inFlightBatch.lastOffset() < this.startOffset ? RecordState.ARCHIVED : recordState, false, this.maxDeliveryCount, EMPTY_MEMBER_ID);
            if (updateResult == null) {
                log.debug("Unable to release records from acquired state for the batch: {} for the share partition: {}-{}", new Object[]{inFlightBatch, this.groupId, this.topicIdPartition});
                return Optional.of(new InvalidRecordStateException("Unable to release acquired records for the batch"));
            }
            updatedStates.add(updateResult);
            stateBatches.add(new PersisterStateBatch(inFlightBatch.firstOffset(), inFlightBatch.lastOffset(), updateResult.state.id, (short)updateResult.deliveryCount));
            if (updateResult.state != RecordState.ARCHIVED) {
                this.findNextFetchOffset.set(true);
            }
        }
        return Optional.empty();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void updateCacheAndOffsets(long logStartOffset) {
        this.lock.writeLock().lock();
        try {
            if (logStartOffset <= this.startOffset) {
                log.error("The log start offset: {} is not greater than the start offset: {} for the share partition: {}-{}", new Object[]{logStartOffset, this.startOffset, this.groupId, this.topicIdPartition});
                return;
            }
            log.debug("Updating start offset for share partition: {}-{} from: {} to: {} since LSO has moved to: {}", new Object[]{this.groupId, this.topicIdPartition, this.startOffset, logStartOffset, logStartOffset});
            if (this.cachedState.isEmpty()) {
                this.startOffset = logStartOffset;
                this.endOffset = logStartOffset;
                return;
            }
            boolean anyRecordArchived = this.archiveAvailableRecordsOnLsoMovement(logStartOffset);
            if (anyRecordArchived) {
                this.findNextFetchOffset.set(true);
            }
            this.startOffset = logStartOffset;
            if (this.endOffset < this.startOffset) {
                this.endOffset = this.startOffset;
            }
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean archiveAvailableRecordsOnLsoMovement(long logStartOffset) {
        this.lock.writeLock().lock();
        try {
            Map.Entry entry;
            long batchStartOffset;
            boolean isAnyOffsetArchived = false;
            boolean isAnyBatchArchived = false;
            Iterator iterator = this.cachedState.entrySet().iterator();
            while (iterator.hasNext() && (batchStartOffset = ((Long)(entry = iterator.next()).getKey()).longValue()) < logStartOffset) {
                InFlightBatch inFlightBatch = (InFlightBatch)entry.getValue();
                boolean fullMatch = this.checkForFullMatch(inFlightBatch, this.startOffset, logStartOffset - 1L);
                if (!fullMatch || inFlightBatch.offsetState() != null) {
                    log.debug("Subset or offset tracked batch record found while trying to update offsets and cached state map due to LSO movement, batch: {}, offsets to update - first: {}, last: {} for the share partition: {}-{}", new Object[]{inFlightBatch, this.startOffset, logStartOffset - 1L, this.groupId, this.topicIdPartition});
                    if (inFlightBatch.offsetState() == null) {
                        if (inFlightBatch.batchState() != RecordState.AVAILABLE) continue;
                        inFlightBatch.maybeInitializeOffsetStateUpdate();
                    }
                    isAnyOffsetArchived = isAnyOffsetArchived || this.archivePerOffsetBatchRecords(inFlightBatch, this.startOffset, logStartOffset - 1L);
                    continue;
                }
                isAnyBatchArchived = isAnyBatchArchived || this.archiveCompleteBatch(inFlightBatch);
            }
            boolean bl = isAnyOffsetArchived || isAnyBatchArchived;
            return bl;
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean archivePerOffsetBatchRecords(InFlightBatch inFlightBatch, long startOffsetToArchive, long endOffsetToArchive) {
        this.lock.writeLock().lock();
        try {
            boolean isAnyOffsetArchived = false;
            log.trace("Archiving offset tracked batch: {} for the share partition: {}-{}", new Object[]{inFlightBatch, this.groupId, this.topicIdPartition});
            for (Map.Entry offsetState : inFlightBatch.offsetState().entrySet()) {
                if ((Long)offsetState.getKey() < startOffsetToArchive) continue;
                if ((Long)offsetState.getKey() > endOffsetToArchive) break;
                if (((InFlightState)offsetState.getValue()).state != RecordState.AVAILABLE) continue;
                ((InFlightState)offsetState.getValue()).archive(EMPTY_MEMBER_ID);
                isAnyOffsetArchived = true;
            }
            boolean bl = isAnyOffsetArchived;
            return bl;
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    private boolean archiveCompleteBatch(InFlightBatch inFlightBatch) {
        this.lock.writeLock().lock();
        try {
            log.trace("Archiving complete batch: {} for the share partition: {}-{}", new Object[]{inFlightBatch, this.groupId, this.topicIdPartition});
            if (inFlightBatch.batchState() == RecordState.AVAILABLE) {
                inFlightBatch.archiveBatch(EMPTY_MEMBER_ID);
                boolean bl = true;
                return bl;
            }
        }
        finally {
            this.lock.writeLock().unlock();
        }
        return false;
    }

    boolean canAcquireRecords() {
        long numRecords;
        if (this.nextFetchOffset() != this.endOffset() + 1L) {
            return true;
        }
        this.lock.readLock().lock();
        try {
            numRecords = this.cachedState.isEmpty() ? 0L : this.endOffset - this.startOffset + 1L;
        }
        finally {
            this.lock.readLock().unlock();
        }
        return numRecords < (long)this.maxInFlightMessages;
    }

    boolean maybeAcquireFetchLock() {
        if (this.stateNotActive()) {
            return false;
        }
        return this.fetchLock.compareAndSet(false, true);
    }

    void releaseFetchLock() {
        this.fetchLock.set(false);
    }

    void markFenced() {
        this.lock.writeLock().lock();
        try {
            this.partitionState = SharePartitionState.FENCED;
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    SharePartitionManager.SharePartitionListener listener() {
        return this.listener;
    }

    int leaderEpoch() {
        return this.leaderEpoch;
    }

    private boolean stateNotActive() {
        return this.partitionState() != SharePartitionState.ACTIVE;
    }

    private boolean emptyToInitialState() {
        this.lock.writeLock().lock();
        try {
            if (this.initializedOrThrowException()) {
                boolean bl = false;
                return bl;
            }
            this.partitionState = SharePartitionState.INITIALIZING;
            boolean bl = true;
            return bl;
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    private boolean initializedOrThrowException() {
        SharePartitionState currentState = this.partitionState();
        return switch (currentState) {
            default -> throw new IncompatibleClassChangeError();
            case SharePartitionState.ACTIVE -> true;
            case SharePartitionState.FAILED -> throw new IllegalStateException(String.format("Share partition failed to load %s-%s", this.groupId, this.topicIdPartition));
            case SharePartitionState.INITIALIZING -> throw new LeaderNotAvailableException(String.format("Share partition is already initializing %s-%s", this.groupId, this.topicIdPartition));
            case SharePartitionState.FENCED -> throw new FencedStateEpochException(String.format("Share partition is fenced %s-%s", this.groupId, this.topicIdPartition));
            case SharePartitionState.EMPTY -> false;
        };
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private ShareFetchResponseData.AcquiredRecords acquireNewBatchRecords(String memberId, Iterable<? extends RecordBatch> batches, long firstOffset, long lastOffset, int maxFetchRecords) {
        this.lock.writeLock().lock();
        try {
            long lastAcquiredOffset;
            long firstAcquiredOffset = firstOffset;
            if (this.cachedState.isEmpty() && this.endOffset > firstAcquiredOffset) {
                firstAcquiredOffset = this.endOffset;
            }
            if ((long)maxFetchRecords < (lastAcquiredOffset = lastOffset) - firstAcquiredOffset + 1L) {
                lastAcquiredOffset = this.lastOffsetFromBatchWithRequestOffset(batches, firstAcquiredOffset + (long)maxFetchRecords - 1L);
            }
            AcquisitionLockTimerTask timerTask = this.scheduleAcquisitionLockTimeout(memberId, firstAcquiredOffset, lastAcquiredOffset);
            this.cachedState.put(firstAcquiredOffset, new InFlightBatch(memberId, firstAcquiredOffset, lastAcquiredOffset, RecordState.ACQUIRED, 1, timerTask));
            if ((Long)this.cachedState.firstKey() == firstAcquiredOffset) {
                this.startOffset = firstAcquiredOffset;
            }
            this.endOffset = lastAcquiredOffset;
            ShareFetchResponseData.AcquiredRecords acquiredRecords = new ShareFetchResponseData.AcquiredRecords().setFirstOffset(firstAcquiredOffset).setLastOffset(lastAcquiredOffset).setDeliveryCount((short)1);
            return acquiredRecords;
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private int acquireSubsetBatchRecords(String memberId, long requestFirstOffset, long requestLastOffset, InFlightBatch inFlightBatch, List<ShareFetchResponseData.AcquiredRecords> result) {
        this.lock.writeLock().lock();
        int acquiredCount = 0;
        try {
            for (Map.Entry offsetState : inFlightBatch.offsetState.entrySet()) {
                if ((Long)offsetState.getKey() < requestFirstOffset) continue;
                if ((Long)offsetState.getKey() > requestLastOffset) {
                    break;
                }
                if (((InFlightState)offsetState.getValue()).state != RecordState.AVAILABLE || ((InFlightState)offsetState.getValue()).hasOngoingStateTransition()) {
                    log.trace("The offset {} is not available in share partition: {}-{}, skipping: {}", new Object[]{offsetState.getKey(), this.groupId, this.topicIdPartition, inFlightBatch});
                    continue;
                }
                InFlightState updateResult = ((InFlightState)offsetState.getValue()).tryUpdateState(RecordState.ACQUIRED, true, this.maxDeliveryCount, memberId);
                if (updateResult == null) {
                    log.trace("Unable to acquire records for the offset: {} in batch: {} for the share partition: {}-{}", new Object[]{offsetState.getKey(), inFlightBatch, this.groupId, this.topicIdPartition});
                    continue;
                }
                AcquisitionLockTimerTask acquisitionLockTimeoutTask = this.scheduleAcquisitionLockTimeout(memberId, (Long)offsetState.getKey(), (Long)offsetState.getKey());
                ((InFlightState)offsetState.getValue()).updateAcquisitionLockTimeoutTask(acquisitionLockTimeoutTask);
                result.add(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(((Long)offsetState.getKey()).longValue()).setLastOffset(((Long)offsetState.getKey()).longValue()).setDeliveryCount((short)((InFlightState)offsetState.getValue()).deliveryCount));
                ++acquiredCount;
            }
        }
        finally {
            this.lock.writeLock().unlock();
        }
        return acquiredCount;
    }

    private boolean checkForFullMatch(InFlightBatch inFlightBatch, long firstOffsetToCompare, long lastOffsetToCompare) {
        return inFlightBatch.firstOffset() >= firstOffsetToCompare && inFlightBatch.lastOffset() <= lastOffsetToCompare;
    }

    private boolean checkForStartOffsetWithinBatch(long batchFirstOffset, long batchLastOffset) {
        long localStartOffset = this.startOffset();
        return batchFirstOffset < localStartOffset && batchLastOffset >= localStartOffset;
    }

    private Map<Long, RecordState> fetchRecordStateMapForAcknowledgementBatch(ShareAcknowledgementBatch batch) {
        HashMap<Long, RecordState> recordStateMap = new HashMap<Long, RecordState>();
        for (int index = 0; index < batch.acknowledgeTypes().size(); ++index) {
            recordStateMap.put(batch.firstOffset() + (long)index, SharePartition.fetchRecordState((Byte)batch.acknowledgeTypes().get(index)));
        }
        return recordStateMap;
    }

    private static RecordState fetchRecordState(byte acknowledgeType) {
        switch (acknowledgeType) {
            case 1: {
                return RecordState.ACKNOWLEDGED;
            }
            case 2: {
                return RecordState.AVAILABLE;
            }
            case 0: 
            case 3: {
                return RecordState.ARCHIVED;
            }
        }
        throw new IllegalArgumentException("Invalid acknowledge type: " + acknowledgeType);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private NavigableMap<Long, InFlightBatch> fetchSubMapForAcknowledgementBatch(ShareAcknowledgementBatch batch) {
        this.lock.writeLock().lock();
        try {
            Map.Entry<Long, InFlightBatch> floorOffset = this.cachedState.floorEntry(batch.firstOffset());
            if (floorOffset == null) {
                boolean hasStartOffsetMoved = this.checkForStartOffsetWithinBatch(batch.firstOffset(), batch.lastOffset());
                if (hasStartOffsetMoved) {
                    floorOffset = this.cachedState.floorEntry(this.startOffset);
                } else {
                    log.debug("Batch record {} not found for share partition: {}-{}", new Object[]{batch, this.groupId, this.topicIdPartition});
                    throw new InvalidRecordStateException("Batch record not found. The request batch offsets are not found in the cache.");
                }
            }
            NavigableMap<Long, InFlightBatch> subMap = this.cachedState.subMap(floorOffset.getKey(), true, batch.lastOffset(), true);
            if (subMap.lastEntry().getValue().lastOffset < batch.firstOffset()) {
                log.debug("Request batch: {} has offsets which are not found for share partition: {}-{}", new Object[]{batch, this.groupId, this.topicIdPartition});
                throw new InvalidRequestException("Batch record not found. The first offset in request is past acquired records.");
            }
            if (batch.lastOffset() > subMap.lastEntry().getValue().lastOffset) {
                log.debug("Request batch: {} has offsets which are not found for share partition: {}-{}", new Object[]{batch, this.groupId, this.topicIdPartition});
                throw new InvalidRequestException("Batch record not found. The last offset in request is past acquired records.");
            }
            NavigableMap<Long, InFlightBatch> navigableMap = subMap;
            return navigableMap;
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Optional<Throwable> acknowledgeBatchRecords(String memberId, ShareAcknowledgementBatch batch, Map<Long, RecordState> recordStateMap, NavigableMap<Long, InFlightBatch> subMap, List<InFlightState> updatedStates, List<PersisterStateBatch> stateBatches) {
        this.lock.writeLock().lock();
        try {
            for (Map.Entry entry : subMap.entrySet()) {
                Optional<Throwable> throwable;
                InFlightBatch inFlightBatch = (InFlightBatch)entry.getValue();
                if (inFlightBatch.lastOffset() < this.startOffset) {
                    log.trace("All offsets in the inflight batch {} are already archived: {}-{}", new Object[]{inFlightBatch, this.groupId, this.topicIdPartition});
                    continue;
                }
                if (inFlightBatch.offsetState() == null && (throwable = this.validateAcknowledgementBatchMemberId(memberId, inFlightBatch)).isPresent()) {
                    Optional<Throwable> optional = throwable;
                    return optional;
                }
                boolean fullMatch = this.checkForFullMatch(inFlightBatch, batch.firstOffset(), batch.lastOffset());
                boolean isPerOffsetClientAck = batch.acknowledgeTypes().size() > 1;
                boolean hasStartOffsetMoved = this.checkForStartOffsetWithinBatch(inFlightBatch.firstOffset(), inFlightBatch.lastOffset());
                if (!fullMatch || inFlightBatch.offsetState() != null || isPerOffsetClientAck || hasStartOffsetMoved) {
                    log.debug("Subset or offset tracked batch record found for acknowledgement, batch: {}, request offsets - first: {}, last: {}, client per offsetstate {} for the share partition: {}-{}", new Object[]{inFlightBatch, batch.firstOffset(), batch.lastOffset(), isPerOffsetClientAck, this.groupId, this.topicIdPartition});
                    if (inFlightBatch.offsetState() == null) {
                        if (inFlightBatch.batchState() != RecordState.ACQUIRED) {
                            log.debug("The batch is not in the acquired state: {} for share partition: {}-{}", new Object[]{inFlightBatch, this.groupId, this.topicIdPartition});
                            Optional<Throwable> optional = Optional.of(new InvalidRecordStateException("The batch cannot be acknowledged. The subset batch is not in the acquired state."));
                            return optional;
                        }
                        inFlightBatch.maybeInitializeOffsetStateUpdate();
                    }
                    throwable = this.acknowledgePerOffsetBatchRecords(memberId, batch, inFlightBatch, recordStateMap, updatedStates, stateBatches);
                } else {
                    throwable = this.acknowledgeCompleteBatch(batch, inFlightBatch, recordStateMap.get(batch.firstOffset()), updatedStates, stateBatches);
                }
                if (!throwable.isPresent()) continue;
                Optional<Throwable> optional = throwable;
                return optional;
            }
        }
        finally {
            this.lock.writeLock().unlock();
        }
        return Optional.empty();
    }

    private Optional<Throwable> validateAcknowledgementBatchMemberId(String memberId, InFlightBatch inFlightBatch) {
        if (inFlightBatch.batchMemberId().equals(EMPTY_MEMBER_ID)) {
            log.debug("The batch is not in the acquired state: {} for share partition: {}-{}. Empty member id for batch.", new Object[]{inFlightBatch, this.groupId, this.topicIdPartition});
            return Optional.of(new InvalidRecordStateException("The batch cannot be acknowledged. The batch is not in the acquired state."));
        }
        if (!inFlightBatch.batchMemberId().equals(memberId)) {
            log.debug("Member {} is not the owner of batch record {} for share partition: {}-{}", new Object[]{memberId, inFlightBatch, this.groupId, this.topicIdPartition});
            return Optional.of(new InvalidRecordStateException("Member is not the owner of batch record"));
        }
        return Optional.empty();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Optional<Throwable> acknowledgePerOffsetBatchRecords(String memberId, ShareAcknowledgementBatch batch, InFlightBatch inFlightBatch, Map<Long, RecordState> recordStateMap, List<InFlightState> updatedStates, List<PersisterStateBatch> stateBatches) {
        this.lock.writeLock().lock();
        try {
            RecordState recordStateDefault = recordStateMap.get(batch.firstOffset());
            for (Map.Entry offsetState : inFlightBatch.offsetState.entrySet()) {
                Optional<Throwable> optional;
                if ((Long)offsetState.getKey() < batch.firstOffset() || (Long)offsetState.getKey() < this.startOffset) continue;
                if ((Long)offsetState.getKey() > batch.lastOffset()) {
                    break;
                }
                if (((InFlightState)offsetState.getValue()).state != RecordState.ACQUIRED) {
                    log.debug("The offset is not acquired, offset: {} batch: {} for the share partition: {}-{}", new Object[]{offsetState.getKey(), inFlightBatch, this.groupId, this.topicIdPartition});
                    optional = Optional.of(new InvalidRecordStateException("The batch cannot be acknowledged. The offset is not acquired."));
                    return optional;
                }
                if (!((InFlightState)offsetState.getValue()).memberId.equals(memberId)) {
                    log.debug("Member {} is not the owner of offset: {} in batch: {} for the share partition: {}-{}", new Object[]{memberId, offsetState.getKey(), inFlightBatch, this.groupId, this.topicIdPartition});
                    optional = Optional.of(new InvalidRecordStateException("Member is not the owner of offset"));
                    return optional;
                }
                RecordState recordState = recordStateMap.size() > 1 ? recordStateMap.get(offsetState.getKey()) : recordStateDefault;
                InFlightState updateResult = ((InFlightState)offsetState.getValue()).startStateTransition(recordState, false, this.maxDeliveryCount, EMPTY_MEMBER_ID);
                if (updateResult == null) {
                    log.debug("Unable to acknowledge records for the offset: {} in batch: {} for the share partition: {}-{}", new Object[]{offsetState.getKey(), inFlightBatch, this.groupId, this.topicIdPartition});
                    Optional<Throwable> optional2 = Optional.of(new InvalidRecordStateException("Unable to acknowledge records for the batch"));
                    return optional2;
                }
                updatedStates.add(updateResult);
                stateBatches.add(new PersisterStateBatch(((Long)offsetState.getKey()).longValue(), ((Long)offsetState.getKey()).longValue(), updateResult.state.id, (short)updateResult.deliveryCount));
                if (recordState != RecordState.AVAILABLE || updateResult.state == RecordState.ARCHIVED) continue;
                this.findNextFetchOffset.set(true);
            }
        }
        finally {
            this.lock.writeLock().unlock();
        }
        return Optional.empty();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Optional<Throwable> acknowledgeCompleteBatch(ShareAcknowledgementBatch batch, InFlightBatch inFlightBatch, RecordState recordState, List<InFlightState> updatedStates, List<PersisterStateBatch> stateBatches) {
        this.lock.writeLock().lock();
        try {
            log.trace("Acknowledging complete batch record {} for the share partition: {}-{}", new Object[]{batch, this.groupId, this.topicIdPartition});
            if (inFlightBatch.batchState() != RecordState.ACQUIRED) {
                log.debug("The batch is not in the acquired state: {} for share partition: {}-{}", new Object[]{inFlightBatch, this.groupId, this.topicIdPartition});
                Optional<Throwable> optional = Optional.of(new InvalidRecordStateException("The batch cannot be acknowledged. The batch is not in the acquired state."));
                return optional;
            }
            InFlightState updateResult = inFlightBatch.startBatchStateTransition(recordState, false, this.maxDeliveryCount, EMPTY_MEMBER_ID);
            if (updateResult == null) {
                log.debug("Unable to acknowledge records for the batch: {} with state: {} for the share partition: {}-{}", new Object[]{inFlightBatch, recordState, this.groupId, this.topicIdPartition});
                Optional<Throwable> optional = Optional.of(new InvalidRecordStateException("Unable to acknowledge records for the batch"));
                return optional;
            }
            updatedStates.add(updateResult);
            stateBatches.add(new PersisterStateBatch(inFlightBatch.firstOffset, inFlightBatch.lastOffset, updateResult.state.id, (short)updateResult.deliveryCount));
            if (recordState == RecordState.AVAILABLE && updateResult.state != RecordState.ARCHIVED) {
                this.findNextFetchOffset.set(true);
            }
        }
        finally {
            this.lock.writeLock().unlock();
        }
        return Optional.empty();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void updateFetchOffsetMetadata(long nextFetchOffset, LogOffsetMetadata logOffsetMetadata) {
        this.lock.writeLock().lock();
        try {
            this.fetchOffsetMetadata.updateOffsetMetadata(nextFetchOffset, logOffsetMetadata);
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected Optional<LogOffsetMetadata> fetchOffsetMetadata(long nextFetchOffset) {
        this.lock.readLock().lock();
        try {
            if (this.fetchOffsetMetadata.offsetMetadata() == null || this.fetchOffsetMetadata.offset() != nextFetchOffset) {
                Optional<LogOffsetMetadata> optional = Optional.empty();
                return optional;
            }
            Optional<LogOffsetMetadata> optional = Optional.of(this.fetchOffsetMetadata.offsetMetadata());
            return optional;
        }
        finally {
            this.lock.readLock().unlock();
        }
    }

    SharePartitionState partitionState() {
        this.lock.readLock().lock();
        try {
            SharePartitionState sharePartitionState = this.partitionState;
            return sharePartitionState;
        }
        finally {
            this.lock.readLock().unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void rollbackOrProcessStateUpdates(CompletableFuture<Void> future, Throwable throwable, List<InFlightState> updatedStates, List<PersisterStateBatch> stateBatches) {
        this.lock.writeLock().lock();
        try {
            if (throwable != null) {
                log.debug("Request failed for updating state, rollback any changed state for the share partition: {}-{}", (Object)this.groupId, (Object)this.topicIdPartition);
                updatedStates.forEach(state -> state.completeStateTransition(false));
                future.completeExceptionally(throwable);
                return;
            }
            if (stateBatches.isEmpty() && updatedStates.isEmpty()) {
                future.complete(null);
                return;
            }
        }
        finally {
            this.lock.writeLock().unlock();
        }
        this.writeShareGroupState(stateBatches).whenComplete((result, exception) -> {
            this.lock.writeLock().lock();
            try {
                if (exception != null) {
                    log.error("Failed to write state to persister for the share partition: {}-{}", new Object[]{this.groupId, this.topicIdPartition, exception});
                    updatedStates.forEach(state -> state.completeStateTransition(false));
                    future.completeExceptionally((Throwable)exception);
                    return;
                }
                log.trace("State change request successful for share partition: {}-{}", (Object)this.groupId, (Object)this.topicIdPartition);
                updatedStates.forEach(state -> {
                    state.completeStateTransition(true);
                    state.cancelAndClearAcquisitionLockTimeoutTask();
                });
                this.maybeUpdateCachedStateAndOffsets();
                future.complete(null);
            }
            finally {
                this.lock.writeLock().unlock();
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void maybeUpdateCachedStateAndOffsets() {
        this.lock.writeLock().lock();
        try {
            long lastKeyToRemove;
            if (!this.canMoveStartOffset()) {
                return;
            }
            long lastOffsetAcknowledged = this.findLastOffsetAcknowledged();
            if (lastOffsetAcknowledged == -1L) {
                return;
            }
            long lastCachedOffset = this.cachedState.lastEntry().getValue().lastOffset();
            if (lastOffsetAcknowledged == lastCachedOffset) {
                this.startOffset = lastCachedOffset + 1L;
                this.endOffset = lastCachedOffset + 1L;
                this.cachedState.clear();
                return;
            }
            long firstKeyToRemove = (Long)this.cachedState.firstKey();
            Map.Entry<Long, InFlightBatch> entry = this.cachedState.floorEntry(lastOffsetAcknowledged);
            if (lastOffsetAcknowledged == entry.getValue().lastOffset()) {
                this.startOffset = this.cachedState.higherKey(lastOffsetAcknowledged);
                lastKeyToRemove = entry.getKey();
            } else {
                this.startOffset = lastOffsetAcknowledged + 1L;
                lastKeyToRemove = entry.getKey().equals(this.cachedState.firstKey()) ? -1L : this.cachedState.lowerKey(entry.getKey());
            }
            if (lastKeyToRemove != -1L) {
                NavigableMap<Long, InFlightBatch> subMap = this.cachedState.subMap(firstKeyToRemove, true, lastKeyToRemove, true);
                for (Long key : subMap.keySet()) {
                    this.cachedState.remove(key);
                }
            }
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    private boolean canMoveStartOffset() {
        if (this.cachedState.isEmpty()) {
            return false;
        }
        Map.Entry<Long, InFlightBatch> entry = this.cachedState.floorEntry(this.startOffset);
        if (entry == null) {
            log.error("The start offset: {} is not found in the cached state for share partition: {}-{}. Cannot move the start offset.", new Object[]{this.startOffset, this.groupId, this.topicIdPartition});
            return false;
        }
        RecordState startOffsetState = entry.getValue().offsetState == null ? entry.getValue().batchState() : ((InFlightState)entry.getValue().offsetState().get(this.startOffset)).state();
        return this.isRecordStateAcknowledged(startOffsetState);
    }

    private boolean isRecordStateAcknowledged(RecordState recordState) {
        return recordState == RecordState.ACKNOWLEDGED || recordState == RecordState.ARCHIVED;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private long findLastOffsetAcknowledged() {
        this.lock.readLock().lock();
        long lastOffsetAcknowledged = -1L;
        try {
            for (Map.Entry entry : this.cachedState.entrySet()) {
                InFlightBatch inFlightBatch = (InFlightBatch)entry.getValue();
                if (inFlightBatch.offsetState() == null) {
                    if (!this.isRecordStateAcknowledged(inFlightBatch.batchState())) {
                        long l = lastOffsetAcknowledged;
                        return l;
                    }
                    lastOffsetAcknowledged = inFlightBatch.lastOffset();
                    continue;
                }
                for (Map.Entry offsetState : inFlightBatch.offsetState.entrySet()) {
                    if (!this.isRecordStateAcknowledged(((InFlightState)offsetState.getValue()).state())) {
                        long l = lastOffsetAcknowledged;
                        return l;
                    }
                    lastOffsetAcknowledged = (Long)offsetState.getKey();
                }
            }
        }
        finally {
            this.lock.readLock().unlock();
        }
        return lastOffsetAcknowledged;
    }

    private long lastOffsetFromBatchWithRequestOffset(Iterable<? extends RecordBatch> batches, long offset) {
        RecordBatch previousBatch = null;
        for (RecordBatch recordBatch : batches) {
            if (offset < recordBatch.baseOffset()) break;
            previousBatch = recordBatch;
        }
        if (previousBatch != null && offset <= previousBatch.lastOffset()) {
            return previousBatch.lastOffset();
        }
        return offset;
    }

    CompletableFuture<Void> writeShareGroupState(List<PersisterStateBatch> stateBatches) {
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        this.persister.writeState(new WriteShareGroupStateParameters.Builder().setGroupTopicPartitionData(new GroupTopicPartitionData.Builder().setGroupId(this.groupId).setTopicsData(Collections.singletonList(new TopicData(this.topicIdPartition.topicId(), Collections.singletonList(PartitionFactory.newPartitionStateBatchData((int)this.topicIdPartition.partition(), (int)this.stateEpoch, (long)this.startOffset, (int)this.leaderEpoch, stateBatches))))).build()).build()).whenComplete((result, exception) -> {
            if (exception != null) {
                log.error("Failed to write the share group state for share partition: {}-{}", new Object[]{this.groupId, this.topicIdPartition, exception});
                future.completeExceptionally(new IllegalStateException(String.format("Failed to write the share group state for share partition %s-%s", this.groupId, this.topicIdPartition), (Throwable)exception));
                return;
            }
            if (result == null || result.topicsData() == null || result.topicsData().size() != 1) {
                log.error("Failed to write the share group state for share partition: {}-{}. Invalid state found: {}", new Object[]{this.groupId, this.topicIdPartition, result});
                future.completeExceptionally(new IllegalStateException(String.format("Failed to write the share group state for share partition %s-%s", this.groupId, this.topicIdPartition)));
                return;
            }
            TopicData state = (TopicData)result.topicsData().get(0);
            if (state.topicId() != this.topicIdPartition.topicId() || state.partitions().size() != 1 || ((PartitionErrorData)state.partitions().get(0)).partition() != this.topicIdPartition.partition()) {
                log.error("Failed to write the share group state for share partition: {}-{}. Invalid topic partition response: {}", new Object[]{this.groupId, this.topicIdPartition, result});
                future.completeExceptionally(new IllegalStateException(String.format("Failed to write the share group state for share partition %s-%s", this.groupId, this.topicIdPartition)));
                return;
            }
            PartitionErrorData partitionData = (PartitionErrorData)state.partitions().get(0);
            if (partitionData.errorCode() != Errors.NONE.code()) {
                KafkaException ex = this.fetchPersisterError(partitionData.errorCode(), partitionData.errorMessage());
                log.error("Failed to write the share group state for share partition: {}-{} due to exception", new Object[]{this.groupId, this.topicIdPartition, ex});
                future.completeExceptionally((Throwable)ex);
                return;
            }
            future.complete(null);
        });
        return future;
    }

    private KafkaException fetchPersisterError(short errorCode, String errorMessage) {
        Errors error = Errors.forCode((short)errorCode);
        switch (error) {
            case NOT_COORDINATOR: 
            case COORDINATOR_NOT_AVAILABLE: 
            case COORDINATOR_LOAD_IN_PROGRESS: {
                return new CoordinatorNotAvailableException(errorMessage);
            }
            case GROUP_ID_NOT_FOUND: {
                return new GroupIdNotFoundException(errorMessage);
            }
            case UNKNOWN_TOPIC_OR_PARTITION: {
                return new UnknownTopicOrPartitionException(errorMessage);
            }
            case FENCED_STATE_EPOCH: {
                return new FencedStateEpochException(errorMessage);
            }
            case FENCED_LEADER_EPOCH: {
                return new NotLeaderOrFollowerException(errorMessage);
            }
        }
        return new UnknownServerException(errorMessage);
    }

    AcquisitionLockTimerTask scheduleAcquisitionLockTimeout(String memberId, long firstOffset, long lastOffset) {
        int recordLockDurationMs = this.groupConfigManager.groupConfig(this.groupId).isPresent() ? ((GroupConfig)this.groupConfigManager.groupConfig(this.groupId).get()).shareRecordLockDurationMs() : this.defaultRecordLockDurationMs;
        return this.scheduleAcquisitionLockTimeout(memberId, firstOffset, lastOffset, recordLockDurationMs);
    }

    private AcquisitionLockTimerTask scheduleAcquisitionLockTimeout(String memberId, long firstOffset, long lastOffset, long delayMs) {
        AcquisitionLockTimerTask acquisitionLockTimerTask = this.acquisitionLockTimerTask(memberId, firstOffset, lastOffset, delayMs);
        this.timer.add((TimerTask)acquisitionLockTimerTask);
        return acquisitionLockTimerTask;
    }

    private AcquisitionLockTimerTask acquisitionLockTimerTask(String memberId, long firstOffset, long lastOffset, long delayMs) {
        return new AcquisitionLockTimerTask(delayMs, memberId, firstOffset, lastOffset);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void releaseAcquisitionLockOnTimeout(String memberId, long firstOffset, long lastOffset) {
        ArrayList<PersisterStateBatch> stateBatches;
        this.lock.writeLock().lock();
        try {
            Map.Entry<Long, InFlightBatch> floorOffset = this.cachedState.floorEntry(firstOffset);
            if (floorOffset == null) {
                log.error("Base offset {} not found for share partition: {}-{}", new Object[]{firstOffset, this.groupId, this.topicIdPartition});
                return;
            }
            stateBatches = new ArrayList<PersisterStateBatch>();
            NavigableMap<Long, InFlightBatch> subMap = this.cachedState.subMap(floorOffset.getKey(), true, lastOffset, true);
            for (Map.Entry entry : subMap.entrySet()) {
                InFlightBatch inFlightBatch = (InFlightBatch)entry.getValue();
                if (inFlightBatch.offsetState() == null && inFlightBatch.batchState() == RecordState.ACQUIRED && this.checkForStartOffsetWithinBatch(inFlightBatch.firstOffset(), inFlightBatch.lastOffset())) {
                    inFlightBatch.maybeInitializeOffsetStateUpdate();
                }
                if (inFlightBatch.offsetState() == null) {
                    this.releaseAcquisitionLockOnTimeoutForCompleteBatch(inFlightBatch, stateBatches, memberId);
                    continue;
                }
                this.releaseAcquisitionLockOnTimeoutForPerOffsetBatch(inFlightBatch, stateBatches, memberId, firstOffset, lastOffset);
            }
            if (!stateBatches.isEmpty()) {
                this.writeShareGroupState(stateBatches).whenComplete((result, exception) -> {
                    if (exception != null) {
                        log.error("Failed to write the share group state on acquisition lock timeout for share partition: {}-{} memberId: {}", new Object[]{this.groupId, this.topicIdPartition, memberId, exception});
                    }
                    this.maybeUpdateCachedStateAndOffsets();
                });
            }
        }
        finally {
            this.lock.writeLock().unlock();
        }
        if (!stateBatches.isEmpty()) {
            DelayedShareFetchGroupKey delayedShareFetchKey = new DelayedShareFetchGroupKey(this.groupId, this.topicIdPartition.topicId(), this.topicIdPartition.partition());
            this.replicaManager.completeDelayedShareFetchRequest((DelayedShareFetchKey)delayedShareFetchKey);
        }
    }

    private void releaseAcquisitionLockOnTimeoutForCompleteBatch(InFlightBatch inFlightBatch, List<PersisterStateBatch> stateBatches, String memberId) {
        if (inFlightBatch.batchState() == RecordState.ACQUIRED) {
            InFlightState updateResult = inFlightBatch.tryUpdateBatchState(inFlightBatch.lastOffset() < this.startOffset ? RecordState.ARCHIVED : RecordState.AVAILABLE, false, this.maxDeliveryCount, EMPTY_MEMBER_ID);
            if (updateResult == null) {
                log.error("Unable to release acquisition lock on timeout for the batch: {} for the share partition: {}-{} memberId: {}", new Object[]{inFlightBatch, this.groupId, this.topicIdPartition, memberId});
                return;
            }
            stateBatches.add(new PersisterStateBatch(inFlightBatch.firstOffset(), inFlightBatch.lastOffset(), updateResult.state.id, (short)updateResult.deliveryCount));
            updateResult.cancelAndClearAcquisitionLockTimeoutTask();
            if (updateResult.state != RecordState.ARCHIVED) {
                this.findNextFetchOffset.set(true);
            }
            return;
        }
        log.debug("The batch is not in acquired state while release of acquisition lock on timeout, skipping, batch: {} for the share partition: {}-{} memberId: {}", new Object[]{inFlightBatch, this.groupId, this.topicIdPartition, memberId});
    }

    private void releaseAcquisitionLockOnTimeoutForPerOffsetBatch(InFlightBatch inFlightBatch, List<PersisterStateBatch> stateBatches, String memberId, long firstOffset, long lastOffset) {
        for (Map.Entry offsetState : inFlightBatch.offsetState().entrySet()) {
            if ((Long)offsetState.getKey() < firstOffset) continue;
            if ((Long)offsetState.getKey() > lastOffset) break;
            if (((InFlightState)offsetState.getValue()).state != RecordState.ACQUIRED) {
                log.debug("The offset is not in acquired state while release of acquisition lock on timeout, skipping, offset: {} batch: {} for the share partition: {}-{} memberId: {}", new Object[]{offsetState.getKey(), inFlightBatch, this.groupId, this.topicIdPartition, memberId});
                continue;
            }
            InFlightState updateResult = ((InFlightState)offsetState.getValue()).tryUpdateState((Long)offsetState.getKey() < this.startOffset ? RecordState.ARCHIVED : RecordState.AVAILABLE, false, this.maxDeliveryCount, EMPTY_MEMBER_ID);
            if (updateResult == null) {
                log.error("Unable to release acquisition lock on timeout for the offset: {} in batch: {} for the share partition: {}-{} memberId: {}", new Object[]{offsetState.getKey(), inFlightBatch, this.groupId, this.topicIdPartition, memberId});
                continue;
            }
            stateBatches.add(new PersisterStateBatch(((Long)offsetState.getKey()).longValue(), ((Long)offsetState.getKey()).longValue(), updateResult.state.id, (short)updateResult.deliveryCount));
            updateResult.cancelAndClearAcquisitionLockTimeoutTask();
            if (updateResult.state == RecordState.ARCHIVED) continue;
            this.findNextFetchOffset.set(true);
        }
    }

    private long startOffsetDuringInitialization(long partitionDataStartOffset) throws Exception {
        if (partitionDataStartOffset != -1L) {
            return partitionDataStartOffset;
        }
        ShareGroupAutoOffsetResetStrategy offsetResetStrategy = this.groupConfigManager.groupConfig(this.groupId).isPresent() ? ((GroupConfig)this.groupConfigManager.groupConfig(this.groupId).get()).shareAutoOffsetReset() : GroupConfig.defaultShareAutoOffsetReset();
        if (offsetResetStrategy.type() == ShareGroupAutoOffsetResetStrategy.StrategyType.LATEST) {
            return ShareFetchUtils.offsetForLatestTimestamp(this.topicIdPartition, this.replicaManager, this.leaderEpoch);
        }
        if (offsetResetStrategy.type() == ShareGroupAutoOffsetResetStrategy.StrategyType.EARLIEST) {
            return ShareFetchUtils.offsetForEarliestTimestamp(this.topicIdPartition, this.replicaManager, this.leaderEpoch);
        }
        return ShareFetchUtils.offsetForTimestamp(this.topicIdPartition, this.replicaManager, offsetResetStrategy.timestamp(), this.leaderEpoch);
    }

    NavigableMap<Long, InFlightBatch> cachedState() {
        return new ConcurrentSkipListMap<Long, InFlightBatch>((SortedMap<Long, InFlightBatch>)this.cachedState);
    }

    boolean findNextFetchOffset() {
        return this.findNextFetchOffset.get();
    }

    void findNextFetchOffset(boolean findNextOffset) {
        this.findNextFetchOffset.getAndSet(findNextOffset);
    }

    long startOffset() {
        this.lock.readLock().lock();
        try {
            long l = this.startOffset;
            return l;
        }
        finally {
            this.lock.readLock().unlock();
        }
    }

    long endOffset() {
        this.lock.readLock().lock();
        try {
            long l = this.endOffset;
            return l;
        }
        finally {
            this.lock.readLock().unlock();
        }
    }

    int stateEpoch() {
        return this.stateEpoch;
    }

    Timer timer() {
        return this.timer;
    }

    static enum SharePartitionState {
        EMPTY,
        INITIALIZING,
        ACTIVE,
        FAILED,
        FENCED;

    }

    static final class OffsetMetadata {
        private long offset = -1L;
        private LogOffsetMetadata offsetMetadata;

        OffsetMetadata() {
        }

        long offset() {
            return this.offset;
        }

        LogOffsetMetadata offsetMetadata() {
            return this.offsetMetadata;
        }

        void updateOffsetMetadata(long offset, LogOffsetMetadata offsetMetadata) {
            this.offset = offset;
            this.offsetMetadata = offsetMetadata;
        }
    }

    final class InFlightBatch {
        private final long firstOffset;
        private final long lastOffset;
        private InFlightState batchState;
        private NavigableMap<Long, InFlightState> offsetState;

        InFlightBatch(String memberId, long firstOffset, long lastOffset, RecordState state, int deliveryCount, AcquisitionLockTimerTask acquisitionLockTimeoutTask) {
            this.firstOffset = firstOffset;
            this.lastOffset = lastOffset;
            this.batchState = new InFlightState(state, deliveryCount, memberId, acquisitionLockTimeoutTask);
        }

        long firstOffset() {
            return this.firstOffset;
        }

        long lastOffset() {
            return this.lastOffset;
        }

        RecordState batchState() {
            return this.inFlightState().state;
        }

        String batchMemberId() {
            if (this.batchState == null) {
                throw new IllegalStateException("The batch member id is not available as the offset state is maintained");
            }
            return this.batchState.memberId;
        }

        int batchDeliveryCount() {
            if (this.batchState == null) {
                throw new IllegalStateException("The batch delivery count is not available as the offset state is maintained");
            }
            return this.batchState.deliveryCount;
        }

        AcquisitionLockTimerTask batchAcquisitionLockTimeoutTask() {
            return this.inFlightState().acquisitionLockTimeoutTask;
        }

        NavigableMap<Long, InFlightState> offsetState() {
            return this.offsetState;
        }

        private InFlightState inFlightState() {
            if (this.batchState == null) {
                throw new IllegalStateException("The batch state is not available as the offset state is maintained");
            }
            return this.batchState;
        }

        private boolean batchHasOngoingStateTransition() {
            return this.inFlightState().hasOngoingStateTransition();
        }

        private void archiveBatch(String newMemberId) {
            this.inFlightState().archive(newMemberId);
        }

        private InFlightState tryUpdateBatchState(RecordState newState, boolean incrementDeliveryCount, int maxDeliveryCount, String newMemberId) {
            if (this.batchState == null) {
                throw new IllegalStateException("The batch state update is not available as the offset state is maintained");
            }
            return this.batchState.tryUpdateState(newState, incrementDeliveryCount, maxDeliveryCount, newMemberId);
        }

        private InFlightState startBatchStateTransition(RecordState newState, boolean incrementDeliveryCount, int maxDeliveryCount, String newMemberId) {
            if (this.batchState == null) {
                throw new IllegalStateException("The batch state update is not available as the offset state is maintained");
            }
            return this.batchState.startStateTransition(newState, incrementDeliveryCount, maxDeliveryCount, newMemberId);
        }

        private void maybeInitializeOffsetStateUpdate() {
            if (this.offsetState == null) {
                this.offsetState = new ConcurrentSkipListMap<Long, InFlightState>();
                for (long offset = this.firstOffset; offset <= this.lastOffset; ++offset) {
                    if (this.batchState.acquisitionLockTimeoutTask != null) {
                        long delayMs = this.batchState.acquisitionLockTimeoutTask.expirationMs() - SharePartition.this.time.hiResClockMs();
                        AcquisitionLockTimerTask timerTask = SharePartition.this.acquisitionLockTimerTask(this.batchState.memberId, offset, offset, delayMs);
                        this.offsetState.put(offset, new InFlightState(this.batchState.state, this.batchState.deliveryCount, this.batchState.memberId, timerTask));
                        SharePartition.this.timer.add((TimerTask)timerTask);
                        continue;
                    }
                    this.offsetState.put(offset, new InFlightState(this.batchState.state, this.batchState.deliveryCount, this.batchState.memberId));
                }
                if (this.batchState.acquisitionLockTimeoutTask != null) {
                    this.batchState.cancelAndClearAcquisitionLockTimeoutTask();
                }
                this.batchState = null;
            }
        }

        private void updateAcquisitionLockTimeout(AcquisitionLockTimerTask acquisitionLockTimeoutTask) {
            this.inFlightState().acquisitionLockTimeoutTask = acquisitionLockTimeoutTask;
        }

        public String toString() {
            return "InFlightBatch(firstOffset=" + this.firstOffset + ", lastOffset=" + this.lastOffset + ", inFlightState=" + String.valueOf(this.batchState) + ", offsetState=" + String.valueOf(this.offsetState == null ? "null" : this.offsetState) + ")";
        }
    }

    static enum RecordState {
        AVAILABLE(0),
        ACQUIRED(1),
        ACKNOWLEDGED(2),
        ARCHIVED(4);

        public final byte id;

        private RecordState(byte id) {
            this.id = id;
        }

        public RecordState validateTransition(RecordState newState) throws IllegalStateException {
            Objects.requireNonNull(newState, "newState cannot be null");
            if (this == newState) {
                throw new IllegalStateException("The state transition is invalid as the new state isthe same as the current state");
            }
            if (this == ACKNOWLEDGED || this == ARCHIVED) {
                throw new IllegalStateException("The state transition is invalid from the current state: " + String.valueOf((Object)this));
            }
            if (this == AVAILABLE && newState != ACQUIRED) {
                throw new IllegalStateException("The state can only be transitioned to ACQUIRED from AVAILABLE");
            }
            return newState;
        }

        public static RecordState forId(byte id) {
            switch (id) {
                case 0: {
                    return AVAILABLE;
                }
                case 1: {
                    return ACQUIRED;
                }
                case 2: {
                    return ACKNOWLEDGED;
                }
                case 4: {
                    return ARCHIVED;
                }
            }
            throw new IllegalArgumentException("Unknown record state id: " + id);
        }
    }

    static final class InFlightState {
        private RecordState state;
        private int deliveryCount;
        private String memberId;
        private InFlightState rollbackState;
        private AcquisitionLockTimerTask acquisitionLockTimeoutTask;

        InFlightState(RecordState state, int deliveryCount, String memberId) {
            this(state, deliveryCount, memberId, null);
        }

        InFlightState(RecordState state, int deliveryCount, String memberId, AcquisitionLockTimerTask acquisitionLockTimeoutTask) {
            this.state = state;
            this.deliveryCount = deliveryCount;
            this.memberId = memberId;
            this.acquisitionLockTimeoutTask = acquisitionLockTimeoutTask;
        }

        RecordState state() {
            return this.state;
        }

        String memberId() {
            return this.memberId;
        }

        TimerTask acquisitionLockTimeoutTask() {
            return this.acquisitionLockTimeoutTask;
        }

        void updateAcquisitionLockTimeoutTask(AcquisitionLockTimerTask acquisitionLockTimeoutTask) throws IllegalArgumentException {
            if (this.acquisitionLockTimeoutTask != null) {
                throw new IllegalArgumentException("Existing acquisition lock timeout exists, cannot override.");
            }
            this.acquisitionLockTimeoutTask = acquisitionLockTimeoutTask;
        }

        void cancelAndClearAcquisitionLockTimeoutTask() {
            this.acquisitionLockTimeoutTask.cancel();
            this.acquisitionLockTimeoutTask = null;
        }

        private boolean hasOngoingStateTransition() {
            if (this.rollbackState == null) {
                return false;
            }
            return this.rollbackState.state != null;
        }

        private InFlightState tryUpdateState(RecordState newState, boolean incrementDeliveryCount, int maxDeliveryCount, String newMemberId) {
            try {
                if (newState == RecordState.AVAILABLE && this.deliveryCount >= maxDeliveryCount) {
                    newState = RecordState.ARCHIVED;
                }
                this.state = this.state.validateTransition(newState);
                if (incrementDeliveryCount && newState != RecordState.ARCHIVED) {
                    ++this.deliveryCount;
                }
                this.memberId = newMemberId;
                return this;
            }
            catch (IllegalStateException e) {
                log.error("Failed to update state of the records", (Throwable)e);
                return null;
            }
        }

        private void archive(String newMemberId) {
            this.state = RecordState.ARCHIVED;
            this.memberId = newMemberId;
        }

        private InFlightState startStateTransition(RecordState newState, boolean incrementDeliveryCount, int maxDeliveryCount, String newMemberId) {
            this.rollbackState = new InFlightState(this.state, this.deliveryCount, this.memberId, this.acquisitionLockTimeoutTask);
            return this.tryUpdateState(newState, incrementDeliveryCount, maxDeliveryCount, newMemberId);
        }

        private void completeStateTransition(boolean commit) {
            if (commit) {
                this.rollbackState = null;
                return;
            }
            this.state = this.rollbackState.state;
            this.deliveryCount = this.rollbackState.deliveryCount;
            this.memberId = this.rollbackState.memberId;
            this.rollbackState = null;
        }

        public int hashCode() {
            return Objects.hash(new Object[]{this.state, this.deliveryCount, this.memberId});
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            InFlightState that = (InFlightState)o;
            return this.state == that.state && this.deliveryCount == that.deliveryCount && this.memberId.equals(that.memberId);
        }

        public String toString() {
            return "InFlightState(state=" + this.state.toString() + ", deliveryCount=" + this.deliveryCount + ", memberId=" + this.memberId + ")";
        }
    }

    final class AcquisitionLockTimerTask
    extends TimerTask {
        private final long expirationMs;
        private final String memberId;
        private final long firstOffset;
        private final long lastOffset;

        AcquisitionLockTimerTask(long delayMs, String memberId, long firstOffset, long lastOffset) {
            super(delayMs);
            this.expirationMs = SharePartition.this.time.hiResClockMs() + delayMs;
            this.memberId = memberId;
            this.firstOffset = firstOffset;
            this.lastOffset = lastOffset;
        }

        long expirationMs() {
            return this.expirationMs;
        }

        public void run() {
            SharePartition.this.releaseAcquisitionLockOnTimeout(this.memberId, this.firstOffset, this.lastOffset);
        }
    }
}

