/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.store.file.data;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.file.data.AppendOnlyCompactManager;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.data.DataFilePathFactory;
import org.apache.flink.table.store.file.mergetree.Increment;
import org.apache.flink.table.store.file.stats.BinaryTableStats;
import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
import org.apache.flink.table.store.file.utils.FileUtils;
import org.apache.flink.table.store.file.writer.BaseFileWriter;
import org.apache.flink.table.store.file.writer.FileWriter;
import org.apache.flink.table.store.file.writer.Metric;
import org.apache.flink.table.store.file.writer.MetricFileWriter;
import org.apache.flink.table.store.file.writer.RecordWriter;
import org.apache.flink.table.store.file.writer.RollingFileWriter;
import org.apache.flink.table.store.format.FileFormat;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Preconditions;

public class AppendOnlyWriter
implements RecordWriter<RowData> {
    private final long schemaId;
    private final FileFormat fileFormat;
    private final long targetFileSize;
    private final RowType writeSchema;
    private final DataFilePathFactory pathFactory;
    private final AppendOnlyCompactManager compactManager;
    private final boolean forceCompact;
    private final LinkedList<DataFileMeta> toCompact;
    private final List<DataFileMeta> compactBefore;
    private final List<DataFileMeta> compactAfter;
    private final LongCounter seqNumCounter;
    private RowRollingWriter writer;

    public AppendOnlyWriter(long schemaId, FileFormat fileFormat, long targetFileSize, RowType writeSchema, LinkedList<DataFileMeta> restoredFiles, AppendOnlyCompactManager compactManager, boolean forceCompact, DataFilePathFactory pathFactory) {
        this.schemaId = schemaId;
        this.fileFormat = fileFormat;
        this.targetFileSize = targetFileSize;
        this.writeSchema = writeSchema;
        this.pathFactory = pathFactory;
        this.compactManager = compactManager;
        this.forceCompact = forceCompact;
        this.toCompact = restoredFiles;
        this.compactBefore = new ArrayList<DataFileMeta>();
        this.compactAfter = new ArrayList<DataFileMeta>();
        this.seqNumCounter = new LongCounter(AppendOnlyWriter.getMaxSequenceNumber(restoredFiles) + 1L);
        this.writer = RowRollingWriter.createRollingRowWriter(schemaId, fileFormat, targetFileSize, writeSchema, pathFactory, this.seqNumCounter);
    }

    @Override
    public void write(RowData rowData) throws Exception {
        Preconditions.checkArgument((rowData.getRowKind() == RowKind.INSERT ? 1 : 0) != 0, (String)"Append-only writer can only accept insert row kind, but current row kind is: %s", (Object[])new Object[]{rowData.getRowKind()});
        this.writer.write(rowData);
    }

    @Override
    public Increment prepareCommit(boolean endOnfInput) throws Exception {
        ArrayList<DataFileMeta> newFiles = new ArrayList<DataFileMeta>();
        if (this.writer != null) {
            this.writer.close();
            newFiles.addAll((Collection<DataFileMeta>)this.writer.result());
            this.seqNumCounter.resetLocal();
            this.seqNumCounter.add(AppendOnlyWriter.getMaxSequenceNumber(newFiles) + 1L);
            this.writer = RowRollingWriter.createRollingRowWriter(this.schemaId, this.fileFormat, this.targetFileSize, this.writeSchema, this.pathFactory, this.seqNumCounter);
        }
        this.toCompact.addAll(newFiles);
        this.submitCompaction();
        boolean blocking = endOnfInput || this.forceCompact;
        this.finishCompaction(blocking);
        return this.drainIncrement(newFiles);
    }

    @Override
    public void sync() throws Exception {
        this.finishCompaction(true);
    }

    @Override
    public List<DataFileMeta> close() throws Exception {
        this.compactManager.cancelCompaction();
        this.sync();
        ArrayList<DataFileMeta> result = new ArrayList<DataFileMeta>();
        if (this.writer != null) {
            this.writer.abort();
            result.addAll((Collection<DataFileMeta>)this.writer.result());
            this.writer = null;
        }
        return result;
    }

    private static long getMaxSequenceNumber(List<DataFileMeta> fileMetas) {
        return fileMetas.stream().map(DataFileMeta::maxSequenceNumber).max(Long::compare).orElse(-1L);
    }

    private void submitCompaction() throws ExecutionException, InterruptedException {
        this.finishCompaction(false);
        if (this.compactManager.isCompactionFinished() && !this.toCompact.isEmpty()) {
            this.compactManager.submitCompaction();
        }
    }

    private void finishCompaction(boolean blocking) throws ExecutionException, InterruptedException {
        this.compactManager.finishCompaction(blocking).ifPresent(result -> {
            DataFileMeta lastFile;
            this.compactBefore.addAll(result.before());
            this.compactAfter.addAll(result.after());
            if (!result.after().isEmpty() && (lastFile = result.after().get(result.after().size() - 1)).fileSize() < this.targetFileSize) {
                this.toCompact.offerFirst(lastFile);
            }
        });
    }

    private Increment drainIncrement(List<DataFileMeta> newFiles) {
        Increment increment = new Increment(newFiles, new ArrayList<DataFileMeta>(this.compactBefore), new ArrayList<DataFileMeta>(this.compactAfter));
        this.compactBefore.clear();
        this.compactAfter.clear();
        return increment;
    }

    @VisibleForTesting
    List<DataFileMeta> getToCompact() {
        return this.toCompact;
    }

    public static class RowFileWriter
    extends BaseFileWriter<RowData, DataFileMeta> {
        private final FieldStatsArraySerializer statsArraySerializer;
        private final long schemaId;
        private final LongCounter seqNumCounter;

        public RowFileWriter(FileWriter.Factory<RowData, Metric> writerFactory, Path path, RowType writeSchema, long schemaId, LongCounter seqNumCounter) {
            super(writerFactory, path);
            this.statsArraySerializer = new FieldStatsArraySerializer(writeSchema);
            this.schemaId = schemaId;
            this.seqNumCounter = seqNumCounter;
        }

        @Override
        public void write(RowData row) throws IOException {
            super.write(row);
            this.seqNumCounter.add(1L);
        }

        @Override
        protected DataFileMeta createResult(Path path, Metric metric) throws IOException {
            BinaryTableStats stats = this.statsArraySerializer.toBinary(metric.fieldStats());
            return DataFileMeta.forAppend(path.getName(), FileUtils.getFileSize(path), this.recordCount(), stats, this.seqNumCounter.getLocalValue() - super.recordCount(), this.seqNumCounter.getLocalValue() - 1L, this.schemaId);
        }
    }

    public static class RowRollingWriter
    extends RollingFileWriter<RowData, DataFileMeta> {
        public RowRollingWriter(Supplier<RowFileWriter> writerFactory, long targetFileSize) {
            super(writerFactory, targetFileSize);
        }

        public static RowRollingWriter createRollingRowWriter(long schemaId, FileFormat fileFormat, long targetFileSize, RowType writeSchema, DataFilePathFactory pathFactory, LongCounter seqNumCounter) {
            return new RowRollingWriter(() -> new RowFileWriter(MetricFileWriter.createFactory(fileFormat.createWriterFactory(writeSchema), Function.identity(), writeSchema, fileFormat.createStatsExtractor(writeSchema).orElse(null)), pathFactory.newPath(), writeSchema, schemaId, seqNumCounter), targetFileSize);
        }

        public List<DataFileMeta> write(CloseableIterator<RowData> iterator) throws Exception {
            try {
                super.write(iterator);
                super.close();
                Object object = super.result();
                return object;
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
            finally {
                iterator.close();
            }
        }
    }
}

