/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.coordinator.common.runtime;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
import org.apache.kafka.coordinator.common.runtime.CoordinatorPlayback;
import org.apache.kafka.coordinator.common.runtime.Deserializer;
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.server.util.KafkaScheduler;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.UnifiedLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CoordinatorLoaderImpl<T>
implements CoordinatorLoader<T> {
    public static final long DEFAULT_COMMIT_INTERVAL_OFFSETS = 16384L;
    private static final Logger LOG = LoggerFactory.getLogger(CoordinatorLoaderImpl.class);
    private final Time time;
    private final Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier;
    private final Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier;
    private final Deserializer<T> deserializer;
    private final int loadBufferSize;
    private final long commitIntervalOffsets;
    private final AtomicBoolean isRunning = new AtomicBoolean(true);
    private final KafkaScheduler scheduler = new KafkaScheduler(1);

    public CoordinatorLoaderImpl(Time time, Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier, Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier, Deserializer<T> deserializer, int loadBufferSize, long commitIntervalOffsets) {
        this.time = time;
        this.partitionLogSupplier = partitionLogSupplier;
        this.partitionLogEndOffsetSupplier = partitionLogEndOffsetSupplier;
        this.deserializer = deserializer;
        this.loadBufferSize = loadBufferSize;
        this.commitIntervalOffsets = commitIntervalOffsets;
        this.scheduler.startup();
    }

    @Override
    public CompletableFuture<CoordinatorLoader.LoadSummary> load(TopicPartition tp, CoordinatorPlayback<T> coordinator) {
        CompletableFuture<CoordinatorLoader.LoadSummary> future = new CompletableFuture<CoordinatorLoader.LoadSummary>();
        long startTimeMs = this.time.milliseconds();
        try {
            ScheduledFuture result = this.scheduler.scheduleOnce(String.format("Load coordinator from %s", tp), () -> this.doLoad(tp, coordinator, future, startTimeMs));
            if (result.isCancelled()) {
                future.completeExceptionally(new RuntimeException("Coordinator loader is closed."));
            }
        }
        catch (Exception e) {
            future.completeExceptionally(e);
        }
        return future;
    }

    private void doLoad(TopicPartition tp, CoordinatorPlayback<T> coordinator, CompletableFuture<CoordinatorLoader.LoadSummary> future, long startTimeMs) {
        long schedulerQueueTimeMs = this.time.milliseconds() - startTimeMs;
        try {
            Optional<UnifiedLog> logOpt = this.partitionLogSupplier.apply(tp);
            if (logOpt.isEmpty()) {
                future.completeExceptionally((Throwable)new NotLeaderOrFollowerException("Could not load records from " + String.valueOf(tp) + " because the log does not exist."));
                return;
            }
            UnifiedLog log = logOpt.get();
            ByteBuffer buffer = ByteBuffer.allocate(0);
            long currentOffset = log.logStartOffset();
            LoadStats stats = new LoadStats();
            long lastCommittedOffset = -1L;
            while (this.shouldFetchNextBatch(currentOffset, this.logEndOffset(tp), stats.readAtLeastOneRecord)) {
                FetchDataInfo fetchDataInfo = log.read(currentOffset, this.loadBufferSize, FetchIsolation.LOG_END, true);
                stats.readAtLeastOneRecord = fetchDataInfo.records.sizeInBytes() > 0;
                MemoryRecords memoryRecords = this.toReadableMemoryRecords(tp, fetchDataInfo.records, buffer);
                if (fetchDataInfo.records instanceof FileRecords) {
                    buffer = memoryRecords.buffer();
                }
                ReplayResult replayResult = this.processMemoryRecords(tp, log, memoryRecords, coordinator, stats, currentOffset, lastCommittedOffset);
                currentOffset = replayResult.nextOffset;
                lastCommittedOffset = replayResult.lastCommittedOffset;
            }
            long endTimeMs = this.time.milliseconds();
            if (this.logEndOffset(tp) == -1L) {
                future.completeExceptionally((Throwable)new NotLeaderOrFollowerException(String.format("Stopped loading records from %s because the partition is not online or is no longer the leader.", tp)));
            } else if (this.isRunning.get()) {
                future.complete(new CoordinatorLoader.LoadSummary(startTimeMs, endTimeMs, schedulerQueueTimeMs, stats.numRecords, stats.numBytes));
            } else {
                future.completeExceptionally(new RuntimeException("Coordinator loader is closed."));
            }
        }
        catch (Throwable ex) {
            future.completeExceptionally(ex);
        }
    }

    private long logEndOffset(TopicPartition tp) {
        return this.partitionLogEndOffsetSupplier.apply(tp).orElse(-1L);
    }

    private boolean shouldFetchNextBatch(long currentOffset, long logEndOffset, boolean readAtLeastOneRecord) {
        return currentOffset < logEndOffset && readAtLeastOneRecord && this.isRunning.get();
    }

    private MemoryRecords toReadableMemoryRecords(TopicPartition tp, Records records, ByteBuffer buffer) throws IOException {
        if (records instanceof MemoryRecords) {
            MemoryRecords memoryRecords = (MemoryRecords)records;
            return memoryRecords;
        }
        if (records instanceof FileRecords) {
            FileRecords fileRecords = (FileRecords)records;
            int sizeInBytes = fileRecords.sizeInBytes();
            int bytesNeeded = Math.max(this.loadBufferSize, sizeInBytes);
            if (buffer.capacity() < bytesNeeded) {
                if (this.loadBufferSize < bytesNeeded) {
                    LOG.warn("Loaded metadata from {} with buffer larger ({} bytes) than configured buffer size ({} bytes).", new Object[]{tp, bytesNeeded, this.loadBufferSize});
                }
                buffer = ByteBuffer.allocate(bytesNeeded);
            } else {
                buffer.clear();
            }
            fileRecords.readInto(buffer, 0);
            return MemoryRecords.readableRecords((ByteBuffer)buffer);
        }
        throw new IllegalArgumentException("Unsupported record type: " + String.valueOf(records.getClass()));
    }

    private ReplayResult processMemoryRecords(TopicPartition tp, UnifiedLog log, MemoryRecords memoryRecords, CoordinatorPlayback<T> coordinator, LoadStats loadStats, long currentOffset, long lastCommittedOffset) {
        for (MutableRecordBatch batch : memoryRecords.batches()) {
            long currentHighWatermark;
            if (batch.isControlBatch()) {
                for (Record record : batch) {
                    ++loadStats.numRecords;
                    ControlRecordType controlRecord = ControlRecordType.parse((ByteBuffer)record.key());
                    if (controlRecord == ControlRecordType.COMMIT) {
                        if (LOG.isTraceEnabled()) {
                            LOG.trace("Replaying end transaction marker from {} at offset {} to commit transaction with producer id {} and producer epoch {}.", new Object[]{tp, record.offset(), batch.producerId(), batch.producerEpoch()});
                        }
                        coordinator.replayEndTransactionMarker(batch.producerId(), batch.producerEpoch(), TransactionResult.COMMIT);
                        continue;
                    }
                    if (controlRecord != ControlRecordType.ABORT) continue;
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("Replaying end transaction marker from {} at offset {} to abort transaction with producer id {} and producer epoch {}.", new Object[]{tp, record.offset(), batch.producerId(), batch.producerEpoch()});
                    }
                    coordinator.replayEndTransactionMarker(batch.producerId(), batch.producerEpoch(), TransactionResult.ABORT);
                }
            } else {
                for (Record record : batch) {
                    ++loadStats.numRecords;
                    Optional<Object> coordinatorRecordOpt = Optional.empty();
                    try {
                        coordinatorRecordOpt = Optional.ofNullable(this.deserializer.deserialize(record.key(), record.value()));
                    }
                    catch (Deserializer.UnknownRecordTypeException ex) {
                        LOG.warn("Unknown record type {} while loading offsets and group metadata from {}. Ignoring it. It could be a left over from an aborted upgrade.", (Object)ex.unknownType(), (Object)tp);
                    }
                    catch (RuntimeException ex) {
                        String msg = String.format("Deserializing record %s from %s failed.", record, tp);
                        LOG.error(msg, (Throwable)ex);
                        throw new RuntimeException(msg, ex);
                    }
                    coordinatorRecordOpt.ifPresent(coordinatorRecord -> {
                        try {
                            if (LOG.isTraceEnabled()) {
                                LOG.trace("Replaying record {} from {} at offset {} with producer id {} and producer epoch {}.", new Object[]{coordinatorRecord, tp, record.offset(), batch.producerId(), batch.producerEpoch()});
                            }
                            coordinator.replay(record.offset(), batch.producerId(), batch.producerEpoch(), coordinatorRecord);
                        }
                        catch (RuntimeException ex) {
                            String msg = String.format("Replaying record %s from %s at offset %d with producer id %d and producer epoch %d failed.", coordinatorRecord, tp, record.offset(), batch.producerId(), batch.producerEpoch());
                            LOG.error(msg, (Throwable)ex);
                            throw new RuntimeException(msg, ex);
                        }
                    });
                }
            }
            if ((currentOffset = batch.nextOffset()) >= (currentHighWatermark = log.highWatermark())) {
                coordinator.updateLastWrittenOffset(currentOffset);
                if (currentHighWatermark <= lastCommittedOffset) continue;
                coordinator.updateLastCommittedOffset(currentHighWatermark);
                lastCommittedOffset = currentHighWatermark;
                continue;
            }
            if (currentOffset - lastCommittedOffset < this.commitIntervalOffsets) continue;
            coordinator.updateLastWrittenOffset(currentOffset);
            coordinator.updateLastCommittedOffset(currentOffset);
            lastCommittedOffset = currentOffset;
        }
        loadStats.numBytes += (long)memoryRecords.sizeInBytes();
        return new ReplayResult(currentOffset, lastCommittedOffset);
    }

    @Override
    public void close() throws Exception {
        if (!this.isRunning.compareAndSet(true, false)) {
            LOG.warn("Coordinator loader is already shutting down.");
            return;
        }
        this.scheduler.shutdown();
    }

    private static class LoadStats {
        private long numRecords = 0L;
        private long numBytes = 0L;
        private boolean readAtLeastOneRecord = true;

        private LoadStats() {
        }

        public String toString() {
            return "LoadStats(numRecords=" + this.numRecords + ", numBytes=" + this.numBytes + ", readAtLeastOneRecord=" + this.readAtLeastOneRecord + ")";
        }
    }

    private record ReplayResult(long nextOffset, long lastCommittedOffset) {
    }
}

