/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.store.connector.sink;

import java.io.Serializable;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.connector.sink.Committable;
import org.apache.flink.table.store.connector.sink.CommittableTypeInfo;
import org.apache.flink.table.store.connector.sink.Committer;
import org.apache.flink.table.store.connector.sink.CommitterOperator;
import org.apache.flink.table.store.connector.sink.StoreCommitter;
import org.apache.flink.table.store.connector.sink.StoreCompactOperator;
import org.apache.flink.table.store.connector.sink.StoreWriteOperator;
import org.apache.flink.table.store.connector.utils.StreamExecutionEnvironmentUtils;
import org.apache.flink.table.store.file.catalog.CatalogLock;
import org.apache.flink.table.store.file.manifest.ManifestCommittable;
import org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
import org.apache.flink.table.store.file.operation.Lock;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.sink.LogSinkFunction;
import org.apache.flink.util.function.SerializableFunction;
import org.apache.flink.util.function.SerializableSupplier;

/**
 * Builds the sink side of a job for a {@link FileStoreTable}.
 *
 * <p>{@link #sinkTo(DataStream)} appends three stages to the input stream: a writer (or
 * compaction) operator, a committer operator pinned to parallelism 1, and a terminal
 * {@link DiscardingSink}.
 *
 * <p>The committer factory handed to {@link CommitterOperator} is a serializable lambda that
 * calls an instance method and therefore captures this object, so instances of this class must
 * remain serializable.
 */
public class StoreSink implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Operator name used for the write/compact stage. */
    private static final String WRITER_NAME = "Writer";

    /** Operator name used for the single-parallelism commit stage. */
    private static final String GLOBAL_COMMITTER_NAME = "Global Committer";

    /** Identifier of the target table; its object path is used when creating the commit lock. */
    private final ObjectIdentifier tableIdentifier;

    /** Table that the operators write to and commit against. */
    private final FileStoreTable table;

    /** When {@code true} a {@link StoreCompactOperator} is used instead of a {@link StoreWriteOperator}. */
    private final boolean compactionTask;

    /** Partition spec handed to the compaction operator; only read when {@link #compactionTask} is set. */
    @Nullable
    private final Map<String, String> compactPartitionSpec;

    /** Factory passed to {@link Lock#fromCatalog} when a committer is created. */
    @Nullable
    private final CatalogLock.Factory lockFactory;

    /** Partition to overwrite; forwarded to both the write operator and the table commit. */
    @Nullable
    private final Map<String, String> overwritePartition;

    /** Log sink function forwarded to the write operator. */
    @Nullable
    private final LogSinkFunction logSinkFunction;

    /**
     * Creates a sink description; no Flink operators are created until
     * {@link #sinkTo(DataStream)} is called.
     *
     * @param tableIdentifier identifier of the target table
     * @param table table to write to
     * @param compactionTask whether the write stage should be a compaction operator
     * @param compactPartitionSpec partition spec for the compaction operator, may be {@code null}
     * @param lockFactory catalog lock factory used for commits, may be {@code null}
     * @param overwritePartition partition to overwrite, may be {@code null}
     * @param logSinkFunction log sink function for the write operator, may be {@code null}
     */
    public StoreSink(
            ObjectIdentifier tableIdentifier,
            FileStoreTable table,
            boolean compactionTask,
            @Nullable Map<String, String> compactPartitionSpec,
            @Nullable CatalogLock.Factory lockFactory,
            @Nullable Map<String, String> overwritePartition,
            @Nullable LogSinkFunction logSinkFunction) {
        this.tableIdentifier = tableIdentifier;
        this.table = table;
        this.compactionTask = compactionTask;
        this.compactPartitionSpec = compactPartitionSpec;
        this.lockFactory = lockFactory;
        this.overwritePartition = overwritePartition;
        this.logSinkFunction = logSinkFunction;
    }

    /**
     * Creates the operator for the first stage: a compaction operator for compaction tasks,
     * otherwise a regular write operator.
     */
    private OneInputStreamOperator<RowData, Committable> createWriteOperator() {
        if (compactionTask) {
            return new StoreCompactOperator(table, compactPartitionSpec);
        }
        return new StoreWriteOperator(table, overwritePartition, logSinkFunction);
    }

    /**
     * Creates a committer around a new table commit for the given user.
     *
     * @param user commit user passed to {@code table.newCommit}
     * @param createEmptyCommit forwarded to {@code withCreateEmptyCommit} on the table commit
     * @return committer configured with the overwrite partition, empty-commit flag and lock
     */
    private StoreCommitter createCommitter(String user, boolean createEmptyCommit) {
        Lock lock = Lock.fromCatalog(lockFactory, tableIdentifier.toObjectPath());
        return new StoreCommitter(
                table.newCommit(user)
                        .withOverwritePartition(overwritePartition)
                        .withCreateEmptyCommit(createEmptyCommit)
                        .withLock(lock));
    }

    /**
     * Attaches the writer, committer and terminal discarding sink to {@code input}.
     *
     * @param input stream of rows to write
     * @return the terminal sink, named {@code "end"}, running with parallelism 1
     */
    public DataStreamSink<?> sinkTo(DataStream<RowData> input) {
        CommittableTypeInfo typeInfo = new CommittableTypeInfo();

        // The write stage keeps the parallelism of its input.
        SingleOutputStreamOperator<Committable> written =
                input.transform(WRITER_NAME, typeInfo, createWriteOperator())
                        .setParallelism(input.getParallelism());

        StreamExecutionEnvironment env = input.getExecutionEnvironment();
        boolean isStreaming =
                StreamExecutionEnvironmentUtils.getConfiguration(env)
                                .get(ExecutionOptions.RUNTIME_MODE)
                        == RuntimeExecutionMode.STREAMING;
        boolean streamingCheckpointEnabled =
                isStreaming && env.getCheckpointConfig().isCheckpointingEnabled();

        // The same flag also controls empty commits: they are requested exactly when the job
        // is a streaming job with checkpointing enabled.
        CommitterOperator committerOperator =
                new CommitterOperator(
                        streamingCheckpointEnabled,
                        user -> createCommitter(user, streamingCheckpointEnabled),
                        ManifestCommittableSerializer::new);

        // Commits are funnelled through a single task.
        SingleOutputStreamOperator<Committable> committed =
                written.transform(GLOBAL_COMMITTER_NAME, typeInfo, committerOperator)
                        .setParallelism(1)
                        .setMaxParallelism(1);

        return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
    }
}

