/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.sink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.sink.GlobalCommitter;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.runtime.operators.sink.AbstractStreamingCommitterOperator;
import org.apache.flink.util.Preconditions;

/**
 * Committer operator that delegates its work to a {@link GlobalCommitter}.
 *
 * <p>Incoming committables are merged into a single global committable via
 * {@link GlobalCommitter#combine}, and global committables are committed via
 * {@link GlobalCommitter#commit}. Global committables handed to
 * {@link #recoveredCommittables(List)} are filtered by the committer and buffered until the next
 * {@link #prepareCommit(List)} call, which returns them ahead of any newly combined committable.
 *
 * <p>NOTE(review): when and in which order the hooks below are invoked is decided by
 * {@link AbstractStreamingCommitterOperator}, which is not visible here.
 *
 * @param <CommT> type of the committables received as input
 * @param <GlobalCommT> type of the aggregated (global) committables
 */
@Internal
public final class StreamingGlobalCommitterOperator<CommT, GlobalCommT>
        extends AbstractStreamingCommitterOperator<CommT, GlobalCommT>
        implements BoundedOneInput {

    /** Combines committables into global committables and commits them. */
    private final GlobalCommitter<CommT, GlobalCommT> globalCommitter;

    /** Recovered global committables, buffered until the next {@link #prepareCommit(List)}. */
    private final List<GlobalCommT> recoveredGlobalCommittables;

    /** Set once {@link #endInput()} has been called. */
    private boolean endOfInput;

    /**
     * Creates the operator.
     *
     * @param globalCommitter committer to delegate to; must not be {@code null}
     * @param committableSerializer serializer for global committables, passed to the super class
     */
    StreamingGlobalCommitterOperator(
            GlobalCommitter<CommT, GlobalCommT> globalCommitter,
            SimpleVersionedSerializer<GlobalCommT> committableSerializer) {
        super(committableSerializer);
        this.globalCommitter = Preconditions.checkNotNull(globalCommitter);
        this.recoveredGlobalCommittables = new ArrayList<>();
        this.endOfInput = false;
    }

    /**
     * Passes the given global committables through
     * {@link GlobalCommitter#filterRecoveredCommittables} and buffers whatever it returns.
     *
     * @param committables recovered global committables; must not be {@code null}
     * @throws IOException declared by the overridden method and propagated from the committer
     */
    @Override
    void recoveredCommittables(List<GlobalCommT> committables) throws IOException {
        List<GlobalCommT> recovered =
                this.globalCommitter.filterRecoveredCommittables(
                        Preconditions.checkNotNull(committables));
        this.recoveredGlobalCommittables.addAll(recovered);
    }

    /**
     * Builds the list of global committables to commit next.
     *
     * <p>The result starts with all buffered recovered committables (the buffer is emptied) and,
     * if {@code input} is non-empty, ends with the single global committable produced by
     * combining {@code input}.
     *
     * @param input committables to combine; must not be {@code null}
     * @return a new list holding the drained recovered committables followed by the combined one
     * @throws IOException declared by the overridden method and propagated from the committer
     */
    @Override
    List<GlobalCommT> prepareCommit(List<CommT> input) throws IOException {
        Preconditions.checkNotNull(input);
        // Copy before clearing so the returned list is independent of the internal buffer.
        List<GlobalCommT> result = new ArrayList<>(this.recoveredGlobalCommittables);
        this.recoveredGlobalCommittables.clear();
        if (!input.isEmpty()) {
            result.add(this.globalCommitter.combine(input));
        }
        return result;
    }

    /**
     * Commits the given global committables through the {@link GlobalCommitter}.
     *
     * @param committables global committables to commit; must not be {@code null}
     * @return whatever {@link GlobalCommitter#commit} returns for {@code committables}
     * @throws Exception if the committer fails
     */
    @Override
    List<GlobalCommT> commit(List<GlobalCommT> committables) throws Exception {
        return this.globalCommitter.commit(Preconditions.checkNotNull(committables));
    }

    /** Records that the input has ended; acted upon in {@link #notifyCheckpointComplete(long)}. */
    @Override
    public void endInput() {
        this.endOfInput = true;
    }

    /**
     * Closes the super class and then the {@link GlobalCommitter}.
     *
     * <p>The committer is closed even if closing the super class throws, so that its resources
     * are not leaked. The first failure is rethrown; a second failure is attached to it as a
     * suppressed exception.
     *
     * @throws Exception the first exception raised while closing
     */
    @Override
    public void close() throws Exception {
        Exception failure = null;
        try {
            super.close();
        } catch (Exception e) {
            failure = e;
        }
        try {
            this.globalCommitter.close();
        } catch (Exception e) {
            if (failure == null) {
                failure = e;
            } else if (failure != e) {
                // Guard above: Throwable.addSuppressed rejects self-suppression.
                failure.addSuppressed(e);
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Forwards the checkpoint notification to the super class and, once {@link #endInput()} has
     * been called, additionally invokes {@link GlobalCommitter#endOfInput()}.
     *
     * @param checkpointId id of the completed checkpoint
     * @throws Exception if the super class or the committer fails
     */
    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        super.notifyCheckpointComplete(checkpointId);
        if (this.endOfInput) {
            this.globalCommitter.endOfInput();
        }
    }
}

