/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ambari.metrics.sink.relocated.zookeeper.server.quorum;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ambari.metrics.sink.relocated.slf4j.Logger;
import org.apache.ambari.metrics.sink.relocated.slf4j.LoggerFactory;
import org.apache.ambari.metrics.sink.relocated.zookeeper.server.Request;
import org.apache.ambari.metrics.sink.relocated.zookeeper.server.RequestProcessor;
import org.apache.ambari.metrics.sink.relocated.zookeeper.server.WorkerService;
import org.apache.ambari.metrics.sink.relocated.zookeeper.server.ZooKeeperCriticalThread;
import org.apache.ambari.metrics.sink.relocated.zookeeper.server.ZooKeeperServerListener;

public class CommitProcessor
extends ZooKeeperCriticalThread
implements RequestProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(CommitProcessor.class);
    public static final String ZOOKEEPER_COMMIT_PROC_NUM_WORKER_THREADS = "zookeeper.commitProcessor.numWorkerThreads";
    public static final String ZOOKEEPER_COMMIT_PROC_SHUTDOWN_TIMEOUT = "zookeeper.commitProcessor.shutdownTimeout";
    protected final LinkedBlockingQueue<Request> queuedRequests = new LinkedBlockingQueue();
    protected final LinkedBlockingQueue<Request> committedRequests = new LinkedBlockingQueue();
    protected final AtomicReference<Request> nextPending = new AtomicReference();
    private final AtomicReference<Request> currentlyCommitting = new AtomicReference();
    protected AtomicInteger numRequestsProcessing = new AtomicInteger(0);
    RequestProcessor nextProcessor;
    protected volatile boolean stopped = true;
    private long workerShutdownTimeoutMS;
    protected WorkerService workerPool;
    boolean matchSyncs;

    public CommitProcessor(RequestProcessor nextProcessor, String id, boolean matchSyncs, ZooKeeperServerListener listener) {
        super("CommitProcessor:" + id, listener);
        this.nextProcessor = nextProcessor;
        this.matchSyncs = matchSyncs;
    }

    private boolean isProcessingRequest() {
        return this.numRequestsProcessing.get() != 0;
    }

    private boolean isWaitingForCommit() {
        return this.nextPending.get() != null;
    }

    private boolean isProcessingCommit() {
        return this.currentlyCommitting.get() != null;
    }

    protected boolean needCommit(Request request) {
        switch (request.type) {
            case 1: 
            case 2: 
            case 5: 
            case 7: 
            case 14: 
            case 15: 
            case 16: 
            case 19: 
            case 20: 
            case 21: {
                return true;
            }
            case 9: {
                return this.matchSyncs;
            }
            case -11: 
            case -10: {
                return !request.isLocalSession();
            }
        }
        return false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        try {
            while (!this.stopped) {
                Request request;
                CommitProcessor commitProcessor = this;
                synchronized (commitProcessor) {
                    while (!this.stopped && (this.queuedRequests.isEmpty() || this.isWaitingForCommit() || this.isProcessingCommit()) && (this.committedRequests.isEmpty() || this.isProcessingRequest())) {
                        this.wait();
                    }
                }
                while (!(this.stopped || this.isWaitingForCommit() || this.isProcessingCommit() || (request = this.queuedRequests.poll()) == null)) {
                    if (this.needCommit(request)) {
                        this.nextPending.set(request);
                        continue;
                    }
                    this.sendToNextProcessor(request);
                }
                this.processCommitted();
            }
        }
        catch (Throwable e) {
            this.handleException(this.getName(), e);
        }
        LOG.info("CommitProcessor exited loop!");
    }

    protected void processCommitted() {
        if (!this.stopped && !this.isProcessingRequest() && this.committedRequests.peek() != null) {
            if (!this.isWaitingForCommit() && !this.queuedRequests.isEmpty()) {
                return;
            }
            Request request = this.committedRequests.poll();
            Request pending = this.nextPending.get();
            if (pending != null && pending.sessionId == request.sessionId && pending.cxid == request.cxid) {
                pending.setHdr(request.getHdr());
                pending.setTxn(request.getTxn());
                pending.zxid = request.zxid;
                this.currentlyCommitting.set(pending);
                this.nextPending.set(null);
                this.sendToNextProcessor(pending);
            } else {
                this.currentlyCommitting.set(request);
                this.sendToNextProcessor(request);
            }
        }
    }

    @Override
    public void start() {
        int numCores = Runtime.getRuntime().availableProcessors();
        int numWorkerThreads = Integer.getInteger(ZOOKEEPER_COMMIT_PROC_NUM_WORKER_THREADS, numCores);
        this.workerShutdownTimeoutMS = Long.getLong(ZOOKEEPER_COMMIT_PROC_SHUTDOWN_TIMEOUT, 5000L);
        LOG.info("Configuring CommitProcessor with " + (numWorkerThreads > 0 ? Integer.valueOf(numWorkerThreads) : "no") + " worker threads.");
        if (this.workerPool == null) {
            this.workerPool = new WorkerService("CommitProcWork", numWorkerThreads, true);
        }
        this.stopped = false;
        super.start();
    }

    private void sendToNextProcessor(Request request) {
        this.numRequestsProcessing.incrementAndGet();
        this.workerPool.schedule(new CommitWorkRequest(request), request.sessionId);
    }

    private synchronized void wakeup() {
        this.notifyAll();
    }

    public void commit(Request request) {
        if (this.stopped || request == null) {
            return;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Committing request:: " + request);
        }
        this.committedRequests.add(request);
        if (!this.isProcessingCommit()) {
            this.wakeup();
        }
    }

    @Override
    public void processRequest(Request request) {
        if (this.stopped) {
            return;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Processing request:: " + request);
        }
        this.queuedRequests.add(request);
        if (!this.isWaitingForCommit()) {
            this.wakeup();
        }
    }

    private void halt() {
        this.stopped = true;
        this.wakeup();
        this.queuedRequests.clear();
        if (this.workerPool != null) {
            this.workerPool.stop();
        }
    }

    @Override
    public void shutdown() {
        LOG.info("Shutting down");
        this.halt();
        if (this.workerPool != null) {
            this.workerPool.join(this.workerShutdownTimeoutMS);
        }
        if (this.nextProcessor != null) {
            this.nextProcessor.shutdown();
        }
    }

    private class CommitWorkRequest
    extends WorkerService.WorkRequest {
        private final Request request;

        CommitWorkRequest(Request request) {
            this.request = request;
        }

        @Override
        public void cleanup() {
            if (!CommitProcessor.this.stopped) {
                LOG.error("Exception thrown by downstream processor, unable to continue.");
                CommitProcessor.this.halt();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void doWork() throws RequestProcessor.RequestProcessorException {
            try {
                CommitProcessor.this.nextProcessor.processRequest(this.request);
            }
            finally {
                CommitProcessor.this.currentlyCommitting.compareAndSet(this.request, null);
                if (!(CommitProcessor.this.numRequestsProcessing.decrementAndGet() != 0 || CommitProcessor.this.queuedRequests.isEmpty() && CommitProcessor.this.committedRequests.isEmpty())) {
                    CommitProcessor.this.wakeup();
                }
            }
        }
    }
}

