/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.collect;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.operators.collect.CollectCoordinationRequest;
import org.apache.flink.streaming.api.operators.collect.CollectCoordinationResponse;
import org.apache.flink.streaming.api.operators.collect.CollectSinkAddressEvent;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class CollectSinkFunction<IN>
extends RichSinkFunction<IN>
implements CheckpointedFunction,
CheckpointListener {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(CollectSinkFunction.class);
    // Serializer for the collected records; used for the buffered-results state, the final
    // accumulator payload, and (as a duplicate) by the server thread.
    private final TypeSerializer<IN> serializer;
    // Upper bound on the number of results the server thread puts into one response.
    private final int maxResultsPerBatch;
    // Buffer size at which invoke() starts waiting; set to twice maxResultsPerBatch in the constructor.
    private final int maxResultsBuffered;
    // Name under which accumulateFinalResults() registers the final accumulator.
    private final String accumulatorName;
    // Injected through setOperatorEventGateway(); open() uses it to send the server address.
    private transient OperatorEventGateway eventGateway;
    // Results received by invoke() that have not yet been acknowledged by a request offset.
    // Accessed under bufferedResultsLock by invoke(), snapshotState(), accumulateFinalResults()
    // and the server thread.
    private transient LinkedList<IN> bufferedResults;
    private transient ReentrantLock bufferedResultsLock;
    // Awaited by invoke() when the buffer is full; signalled by the server thread once
    // acknowledged results have been removed.
    private transient Condition bufferNotFullCondition;
    // Random UUID string generated on every open(); requests carrying a different version are rejected.
    private transient String version;
    // Number of results removed from the head of the buffer so far (restored from offsetState).
    private transient long offset;
    // Offset recorded for the most recently completed checkpoint, or the restored offset.
    private transient long lastCheckpointedOffset;
    private transient ServerThread serverThread;
    // Operator list state holding a copy of bufferedResults at snapshot time.
    private transient ListState<IN> bufferedResultsState;
    // Operator list state holding the offset at snapshot time.
    private transient ListState<Long> offsetState;
    // Checkpoint id -> offset at snapshot time, for checkpoints that have been snapshotted but
    // not yet confirmed; pruned in notifyCheckpointComplete().
    private transient SortedMap<Long, Long> uncompletedCheckpointMap;

    /**
     * Creates a collect sink function.
     *
     * @param serializer serializer for the collected records
     * @param maxResultsPerBatch maximum number of results returned in a single response
     * @param accumulatorName name under which the final results accumulator is registered
     */
    public CollectSinkFunction(TypeSerializer<IN> serializer, int maxResultsPerBatch, String accumulatorName) {
        this.accumulatorName = accumulatorName;
        this.serializer = serializer;
        // The buffer is allowed to hold two full batches before invoke() starts waiting.
        this.maxResultsBuffered = 2 * maxResultsPerBatch;
        this.maxResultsPerBatch = maxResultsPerBatch;
    }

    /**
     * Lazily creates the result buffer, its lock and condition, and resets both offsets to zero.
     * Does nothing when the buffer already exists, so it is safe to call from both
     * {@code initializeState} and {@code open}.
     */
    private void initBuffer() {
        if (this.bufferedResults == null) {
            this.bufferedResultsLock = new ReentrantLock();
            this.bufferNotFullCondition = this.bufferedResultsLock.newCondition();
            this.bufferedResults = new LinkedList<>();
            this.offset = 0L;
            this.lastCheckpointedOffset = 0L;
        }
    }

    /**
     * Restores the buffered results and the offset from operator list state.
     *
     * <p>The buffer is cleared and refilled from {@code bufferedResultsState}. The offset is
     * reset to zero and then overwritten by each entry of {@code offsetState}, so the last
     * entry wins. {@code lastCheckpointedOffset} starts out equal to the restored offset, and
     * the map of unconfirmed checkpoints starts out empty.
     *
     * @param context gives access to the operator state store
     * @throws Exception if the state cannot be obtained or read
     */
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        this.initBuffer();

        this.bufferedResultsState = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<IN>("bufferedResultsState", this.serializer));
        this.bufferedResults.clear();
        for (IN result : this.bufferedResultsState.get()) {
            this.bufferedResults.add(result);
        }

        this.offsetState = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<Long>("offsetState", Long.class));
        this.offset = 0L;
        for (Long value : this.offsetState.get()) {
            // Keep the last stored value if the state holds more than one entry.
            this.offset = value;
        }
        this.lastCheckpointedOffset = this.offset;

        LOG.info("Initializing collect sink state with offset = " + this.lastCheckpointedOffset + ", num buffered results = " + this.bufferedResults.size());

        this.uncompletedCheckpointMap = new TreeMap<>();
    }

    /**
     * Writes the current buffer and offset into operator state and remembers the offset for
     * this checkpoint id until the checkpoint is confirmed.
     *
     * @param context supplies the id of the checkpoint being taken
     * @throws Exception if writing the state fails
     */
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        final ReentrantLock lock = this.bufferedResultsLock;
        // Hold the buffer lock so the snapshot sees a consistent buffer/offset pair.
        lock.lock();
        try {
            this.bufferedResultsState.clear();
            this.bufferedResultsState.addAll(this.bufferedResults);

            this.offsetState.clear();
            this.offsetState.add(this.offset);

            this.uncompletedCheckpointMap.put(context.getCheckpointId(), this.offset);

            if (LOG.isDebugEnabled()) {
                LOG.debug("Checkpoint begin with checkpointId = " + context.getCheckpointId() + ", lastCheckpointedOffset = " + this.lastCheckpointedOffset + ", num buffered results = " + this.bufferedResults.size());
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Starts the result server and announces its address to the coordinator.
     *
     * <p>Requires a parallelism of exactly 1 and a previously injected operator event gateway.
     * A fresh random version string is generated on every call.
     *
     * @param parameters not used by this method
     * @throws Exception if the server thread cannot be created
     */
    public void open(Configuration parameters) throws Exception {
        boolean singleSubtask = this.getRuntimeContext().getNumberOfParallelSubtasks() == 1;
        Preconditions.checkState(singleSubtask, (Object) "The parallelism of CollectSinkFunction must be 1");

        this.initBuffer();
        // New version per open(): requests carrying an older version are rejected by the server thread.
        this.version = UUID.randomUUID().toString();

        ServerThread server = new ServerThread(this.serializer);
        this.serverThread = server;
        server.start();

        Preconditions.checkNotNull(this.eventGateway, "Operator event gateway hasn't been set");
        InetSocketAddress address = server.getServerSocketAddress();
        LOG.info("Collect sink server established, address = " + address);
        this.eventGateway.sendEventToCoordinator(new CollectSinkAddressEvent(address));
    }

    /**
     * Appends a record to the result buffer, waiting while the buffer is full.
     *
     * <p>The buffer counts as full when it holds {@code maxResultsBuffered} or more results.
     * The server thread signals {@code bufferNotFullCondition} after removing acknowledged
     * results.
     *
     * @param value the record to buffer
     * @param context sink context; not used
     * @throws Exception if the wait is interrupted
     */
    @Override
    public void invoke(IN value, SinkFunction.Context context) throws Exception {
        this.bufferedResultsLock.lock();
        try {
            // Re-check the predicate after every wakeup: Condition.await() may return
            // spuriously, and a plain `if` would then let the buffer grow past its limit.
            while (this.bufferedResults.size() >= this.maxResultsBuffered) {
                this.bufferNotFullCondition.await();
            }
            this.bufferedResults.add(value);
        } finally {
            this.bufferedResultsLock.unlock();
        }
    }

    /**
     * Stops the server thread, waits for it to finish, and drops the result buffer.
     *
     * @throws Exception if waiting for the server thread is interrupted
     */
    public void close() throws Exception {
        // open() can fail before the server thread is created (e.g. the parallelism check);
        // guard against that so close() does not throw a NullPointerException on top of it.
        if (this.serverThread != null) {
            this.serverThread.close();
            this.serverThread.join();
        }
        // Clearing the reference lets a later initBuffer() build a fresh buffer.
        this.bufferedResults = null;
    }

    /**
     * Publishes the current offset, version, last checkpointed offset and all still-buffered
     * results as a single serialized entry in an accumulator registered under
     * {@code accumulatorName}.
     *
     * @throws Exception if serialization or accumulator registration fails
     */
    public void accumulateFinalResults() throws Exception {
        final ReentrantLock lock = this.bufferedResultsLock;
        lock.lock();
        try {
            SerializedListAccumulator<byte[]> finalResults = new SerializedListAccumulator<>();
            byte[] payload = CollectSinkFunction.serializeAccumulatorResult(
                    this.offset,
                    this.version,
                    this.lastCheckpointedOffset,
                    this.bufferedResults,
                    this.serializer);
            finalResults.add(payload, BytePrimitiveArraySerializer.INSTANCE);
            this.getRuntimeContext().addAccumulator(this.accumulatorName, finalResults);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Records the offset that was snapshotted for the completed checkpoint and forgets that
     * checkpoint together with all older pending ones.
     *
     * <p>If no offset was recorded for {@code checkpointId}, {@code lastCheckpointedOffset}
     * is left unchanged instead of failing on a null unboxing.
     *
     * @param checkpointId id of the checkpoint that completed
     */
    public void notifyCheckpointComplete(long checkpointId) {
        Long completedOffset = this.uncompletedCheckpointMap.get(checkpointId);
        if (completedOffset != null) {
            this.lastCheckpointedOffset = completedOffset;
        }
        // headMap is exclusive of its bound, so use id + 1 to drop this checkpoint as well.
        this.uncompletedCheckpointMap.headMap(checkpointId + 1L).clear();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Checkpoint complete with checkpointId = " + checkpointId + ", lastCheckpointedOffset = " + this.lastCheckpointedOffset);
        }
    }

    /**
     * Intentionally a no-op: nothing is done when a checkpoint is aborted. The entry recorded
     * for it in {@code uncompletedCheckpointMap} stays until a later checkpoint completes.
     *
     * @param checkpointId id of the aborted checkpoint; not used
     */
    public void notifyCheckpointAborted(long checkpointId) {
    }

    /**
     * Injects the gateway that {@code open} uses to send the server address to the coordinator.
     * Must be called before {@code open}, which fails its null check otherwise.
     *
     * @param eventGateway gateway for sending operator events
     */
    public void setOperatorEventGateway(OperatorEventGateway eventGateway) {
        this.eventGateway = eventGateway;
    }

    /**
     * Serializes the final accumulator payload: the offset as a long, followed by a
     * {@link CollectCoordinationResponse} built from the remaining arguments.
     *
     * @param offset current offset, written first
     * @param version version string placed in the response
     * @param lastCheckpointedOffset last checkpointed offset placed in the response
     * @param bufferedResults results placed in the response
     * @param serializer serializer handed to the response
     * @return the serialized bytes
     * @throws IOException if serialization fails
     */
    @VisibleForTesting
    public static <T> byte[] serializeAccumulatorResult(long offset, String version, long lastCheckpointedOffset, List<T> bufferedResults, TypeSerializer<T> serializer) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(bytes);

        out.writeLong(offset);
        new CollectCoordinationResponse<T>(version, lastCheckpointedOffset, bufferedResults, serializer)
                .serialize(out);

        return bytes.toByteArray();
    }

    /**
     * Reads back a payload written by {@code serializeAccumulatorResult}: a leading long
     * followed by a {@link CollectCoordinationResponse}.
     *
     * @param serializedAccResults the serialized payload
     * @return the leading long paired with the deserialized response
     * @throws IOException if the bytes cannot be read
     */
    public static <T> Tuple2<Long, CollectCoordinationResponse<T>> deserializeAccumulatorResult(byte[] serializedAccResults) throws IOException {
        DataInputViewStreamWrapper in =
                new DataInputViewStreamWrapper(new ByteArrayInputStream(serializedAccResults));
        long leadingOffset = in.readLong();
        CollectCoordinationResponse<T> response = new CollectCoordinationResponse<>(in);
        return Tuple2.of(leadingOffset, response);
    }

    /**
     * Background thread that serves buffered results over a TCP socket.
     *
     * <p>It accepts one connection at a time and answers each
     * {@link CollectCoordinationRequest} with a {@link CollectCoordinationResponse}. A request
     * carries a version and an offset: results before the requested offset are treated as
     * acknowledged and removed from the buffer, and up to {@code maxResultsPerBatch} of the
     * following results are sent back. Requests with a different version or an offset
     * behind the current one get an empty result list.
     */
    private class ServerThread
    extends Thread {
        private final TypeSerializer<IN> serializer;
        private final ServerSocket serverSocket;
        // Written by close() from another thread and polled by run(); volatile makes the
        // stop request visible to the serving loop.
        private volatile boolean running;
        private Socket connection;
        private DataInputViewStreamWrapper inStream;
        private DataOutputViewStreamWrapper outStream;

        /**
         * Duplicates the serializer for use on this thread and opens the server socket on an
         * automatically chosen port (port 0) at the task manager's bind address.
         *
         * @param serializer serializer of the collected records
         * @throws Exception if the server socket cannot be opened
         */
        private ServerThread(TypeSerializer<IN> serializer) throws Exception {
            this.serializer = serializer.duplicate();
            this.serverSocket = new ServerSocket(0, 0, this.getBindAddress());
            this.running = true;
        }

        /**
         * Serving loop: accepts a connection if none is open, reads one request, and replies.
         * Any exception drops the current connection and the loop continues until
         * {@link #close()} clears {@code running}.
         */
        @Override
        public void run() {
            while (this.running) {
                try {
                    if (this.connection == null) {
                        this.connection = this.serverSocket.accept();
                        this.inStream = new DataInputViewStreamWrapper(this.connection.getInputStream());
                        this.outStream = new DataOutputViewStreamWrapper(this.connection.getOutputStream());
                        LOG.info("Coordinator connection received");
                    }

                    CollectCoordinationRequest request = new CollectCoordinationRequest(this.inStream);
                    String requestVersion = request.getVersion();
                    long requestOffset = request.getOffset();
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Request received, version = " + requestVersion + ", offset = " + requestOffset);
                        LOG.debug("Expecting version = " + CollectSinkFunction.this.version + ", offset = " + CollectSinkFunction.this.offset);
                    }

                    if (!CollectSinkFunction.this.version.equals(requestVersion) || requestOffset < CollectSinkFunction.this.offset) {
                        LOG.warn("Invalid request. Received version = " + requestVersion + ", offset = " + requestOffset + ", while expected version = " + CollectSinkFunction.this.version + ", offset = " + CollectSinkFunction.this.offset);
                        this.sendBackResults(Collections.emptyList());
                        continue;
                    }

                    List<IN> results;
                    CollectSinkFunction.this.bufferedResultsLock.lock();
                    try {
                        int oldSize = CollectSinkFunction.this.bufferedResults.size();
                        // Clamp in long arithmetic before narrowing: casting a difference larger
                        // than Integer.MAX_VALUE to int first could wrap to a negative count.
                        int ackedNum = (int) Math.min(requestOffset - CollectSinkFunction.this.offset, (long) oldSize);
                        int nextBatchSize = Math.min(ackedNum + CollectSinkFunction.this.maxResultsPerBatch, oldSize) - ackedNum;
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Preparing " + nextBatchSize + " results");
                        }

                        // Drop the acknowledged results and advance the offset accordingly.
                        for (int i = 0; i < ackedNum; ++i) {
                            CollectSinkFunction.this.bufferedResults.removeFirst();
                            CollectSinkFunction.this.offset++;
                        }

                        // Copy the batch so it can be serialized after the lock is released.
                        results = new ArrayList<>(CollectSinkFunction.this.bufferedResults.subList(0, nextBatchSize));

                        // Wake the producer only when the buffer went from full to not full.
                        if (oldSize >= CollectSinkFunction.this.maxResultsBuffered && CollectSinkFunction.this.bufferedResults.size() < CollectSinkFunction.this.maxResultsBuffered) {
                            CollectSinkFunction.this.bufferNotFullCondition.signal();
                        }
                    } finally {
                        CollectSinkFunction.this.bufferedResultsLock.unlock();
                    }
                    this.sendBackResults(results);
                } catch (Exception e) {
                    LOG.warn("Collect sink server encounters an exception", (Throwable) e);
                    this.closeCurrentConnection();
                }
            }
        }

        /** Requests the serving loop to stop and closes the server socket and any open connection. */
        private void close() {
            this.running = false;
            this.closeServerSocket();
            this.closeCurrentConnection();
        }

        /**
         * Builds the address announced to the coordinator: the task manager's external
         * address combined with the local port of the server socket.
         *
         * @return the socket address of this server
         */
        private InetSocketAddress getServerSocketAddress() {
            RuntimeContext context = CollectSinkFunction.this.getRuntimeContext();
            Preconditions.checkState(context instanceof StreamingRuntimeContext, (Object) "CollectSinkFunction can only be used in StreamTask");
            StreamingRuntimeContext streamingContext = (StreamingRuntimeContext) context;
            String taskManagerAddress = streamingContext.getTaskManagerRuntimeInfo().getTaskManagerExternalAddress();
            return new InetSocketAddress(taskManagerAddress, this.serverSocket.getLocalPort());
        }

        /**
         * Resolves the task manager's configured bind address.
         *
         * @return the resolved address, or {@code null} when none is configured or it cannot
         *     be resolved; a {@code null} bind address makes {@link ServerSocket} accept
         *     connections on all local addresses
         */
        private InetAddress getBindAddress() {
            RuntimeContext context = CollectSinkFunction.this.getRuntimeContext();
            Preconditions.checkState(context instanceof StreamingRuntimeContext, (Object) "CollectSinkFunction can only be used in StreamTask");
            StreamingRuntimeContext streamingContext = (StreamingRuntimeContext) context;
            String bindAddress = streamingContext.getTaskManagerRuntimeInfo().getTaskManagerBindAddress();
            if (bindAddress == null) {
                return null;
            }
            try {
                return InetAddress.getByName(bindAddress);
            } catch (UnknownHostException e) {
                // Keep the best-effort fallback, but leave a trace instead of failing silently.
                LOG.warn("Cannot resolve bind address " + bindAddress + " for the collect sink server, binding to all local addresses instead", (Throwable) e);
                return null;
            }
        }

        /**
         * Sends a response with the current version, the last checkpointed offset and the
         * given results over the open connection.
         *
         * @param results results to include in the response
         * @throws IOException if writing to the connection fails
         */
        private void sendBackResults(List<IN> results) throws IOException {
            CollectCoordinationResponse<IN> response = new CollectCoordinationResponse<>(CollectSinkFunction.this.version, CollectSinkFunction.this.lastCheckpointedOffset, results, this.serializer);
            response.serialize(this.outStream);
        }

        /**
         * Closes the current client connection, if any. The reference is cleared even when
         * closing fails, so the serving loop goes back to accepting a new connection instead
         * of reusing a broken socket.
         */
        private void closeCurrentConnection() {
            Socket current = this.connection;
            if (current == null) {
                return;
            }
            try {
                current.close();
            } catch (Exception e) {
                LOG.warn("Error occurs when closing client connections in CollectSinkFunction", (Throwable) e);
            } finally {
                this.connection = null;
            }
        }

        /** Closes the server socket, logging (not propagating) any failure. */
        private void closeServerSocket() {
            try {
                this.serverSocket.close();
            } catch (Exception e) {
                LOG.warn("Error occurs when closing server in CollectSinkFunction", (Throwable) e);
            }
        }
    }
}

