/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.partition.consumer.CheckpointableInput;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.streaming.runtime.io.CheckpointBarrierBehaviourController;
import org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler;
import org.apache.flink.streaming.runtime.io.UnalignedController;
import org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
@NotThreadSafe
public class SingleCheckpointBarrierHandler
extends CheckpointBarrierHandler {
    private static final Logger LOG = LoggerFactory.getLogger(SingleCheckpointBarrierHandler.class);
    private final String taskName;
    private final CheckpointBarrierBehaviourController controller;
    private int numBarriersReceived;
    private long currentCheckpointId = -1L;
    private int numOpenChannels;
    private CompletableFuture<Void> allBarriersReceivedFuture = FutureUtils.completedVoidFuture();

    @VisibleForTesting
    static SingleCheckpointBarrierHandler createUnalignedCheckpointBarrierHandler(SubtaskCheckpointCoordinator checkpointCoordinator, String taskName, AbstractInvokable toNotifyOnCheckpoint, CheckpointableInput ... inputs) {
        return new SingleCheckpointBarrierHandler(taskName, toNotifyOnCheckpoint, (int)Arrays.stream(inputs).flatMap(gate -> gate.getChannelInfos().stream()).count(), new UnalignedController(checkpointCoordinator, inputs));
    }

    SingleCheckpointBarrierHandler(String taskName, AbstractInvokable toNotifyOnCheckpoint, int numOpenChannels, CheckpointBarrierBehaviourController controller) {
        super(toNotifyOnCheckpoint);
        this.taskName = taskName;
        this.numOpenChannels = numOpenChannels;
        this.controller = controller;
    }

    @Override
    public void processBarrier(CheckpointBarrier barrier, InputChannelInfo channelInfo) throws IOException {
        long barrierId = barrier.getId();
        LOG.debug("{}: Received barrier from channel {} @ {}.", new Object[]{this.taskName, channelInfo, barrierId});
        if (this.currentCheckpointId > barrierId || this.currentCheckpointId == barrierId && !this.isCheckpointPending()) {
            this.controller.obsoleteBarrierReceived(channelInfo, barrier);
            return;
        }
        if (this.currentCheckpointId < barrierId) {
            if (this.isCheckpointPending()) {
                this.cancelSubsumedCheckpoint(barrierId);
            }
            if (this.getNumOpenChannels() == 1) {
                this.markAlignmentStartAndEnd(barrier.getTimestamp());
            } else {
                this.markAlignmentStart(barrier.getTimestamp());
            }
            this.currentCheckpointId = barrierId;
            this.numBarriersReceived = 0;
            this.allBarriersReceivedFuture = new CompletableFuture();
            try {
                if (this.controller.preProcessFirstBarrier(channelInfo, barrier)) {
                    LOG.debug("{}: Triggering checkpoint {} on the first barrier at {}.", new Object[]{this.taskName, barrier.getId(), barrier.getTimestamp()});
                    this.notifyCheckpoint(barrier);
                }
            }
            catch (CheckpointException e) {
                this.abortInternal(barrier.getId(), e);
                return;
            }
        }
        this.controller.barrierReceived(channelInfo, barrier);
        if (this.currentCheckpointId == barrierId && ++this.numBarriersReceived == this.numOpenChannels) {
            if (this.getNumOpenChannels() > 1) {
                this.markAlignmentEnd();
            }
            this.numBarriersReceived = 0;
            if (this.controller.postProcessLastBarrier(channelInfo, barrier)) {
                LOG.debug("{}: Triggering checkpoint {} on the last barrier at {}.", new Object[]{this.taskName, barrier.getId(), barrier.getTimestamp()});
                this.notifyCheckpoint(barrier);
            }
            this.allBarriersReceivedFuture.complete(null);
        }
    }

    @Override
    public void processBarrierAnnouncement(CheckpointBarrier announcedBarrier, int sequenceNumber, InputChannelInfo channelInfo) throws IOException {
    }

    @Override
    public void processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws IOException {
        long cancelledId = cancelBarrier.getCheckpointId();
        if (cancelledId > this.currentCheckpointId || cancelledId == this.currentCheckpointId && this.numBarriersReceived > 0) {
            this.abortInternal(cancelledId, new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER));
        }
    }

    private void abortInternal(long cancelledId, CheckpointException exception) throws IOException {
        this.currentCheckpointId = Math.max(cancelledId, this.currentCheckpointId);
        this.numBarriersReceived = 0;
        this.controller.abortPendingCheckpoint(cancelledId, exception);
        this.allBarriersReceivedFuture.completeExceptionally(exception);
        this.notifyAbort(cancelledId, exception);
    }

    @Override
    public void processEndOfPartition() throws IOException {
        --this.numOpenChannels;
        if (this.isCheckpointPending()) {
            LOG.warn("{}: Received EndOfPartition(-1) before completing current checkpoint {}. Skipping current checkpoint.", (Object)this.taskName, (Object)this.currentCheckpointId);
            this.numBarriersReceived = 0;
            CheckpointException exception = new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM);
            this.controller.abortPendingCheckpoint(this.currentCheckpointId, exception);
            this.allBarriersReceivedFuture.completeExceptionally(exception);
            this.notifyAbort(this.currentCheckpointId, exception);
        }
    }

    @Override
    public long getLatestCheckpointId() {
        return this.currentCheckpointId;
    }

    @Override
    public void close() throws IOException {
        this.allBarriersReceivedFuture.cancel(false);
        super.close();
    }

    @Override
    protected boolean isCheckpointPending() {
        return this.numBarriersReceived > 0;
    }

    private void cancelSubsumedCheckpoint(long barrierId) throws IOException {
        CheckpointException exception = new CheckpointException("Barrier id: " + barrierId, CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED);
        LOG.warn("{}: Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. Skipping current checkpoint.", new Object[]{this.taskName, barrierId, this.currentCheckpointId});
        this.controller.abortPendingCheckpoint(this.currentCheckpointId, exception);
        this.allBarriersReceivedFuture.completeExceptionally(exception);
        this.notifyAbort(this.currentCheckpointId, exception);
    }

    @Override
    public CompletableFuture<Void> getAllBarriersReceivedFuture(long checkpointId) {
        if (checkpointId < this.currentCheckpointId) {
            return FutureUtils.completedVoidFuture();
        }
        if (checkpointId > this.currentCheckpointId) {
            throw new IllegalStateException("Checkpoint " + checkpointId + " has not been started at all");
        }
        return this.allBarriersReceivedFuture;
    }

    @VisibleForTesting
    int getNumOpenChannels() {
        return this.numOpenChannels;
    }

    public String toString() {
        return String.format("%s: current checkpoint: %d, current barriers: %d, open channels: %d", this.taskName, this.currentCheckpointId, this.numBarriersReceived, this.numOpenChannels);
    }
}

