/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.pulsar.function;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.functions.Utils;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.log.LogAccessor;
import org.springframework.pulsar.PulsarException;
import org.springframework.pulsar.core.PulsarAdministration;
import org.springframework.pulsar.function.PulsarFunction;
import org.springframework.pulsar.function.PulsarFunctionOperations;
import org.springframework.pulsar.function.PulsarSink;
import org.springframework.pulsar.function.PulsarSource;

/**
 * Creates or updates the user-defined Pulsar functions, sinks and sources when
 * started and enforces each one's stop policy when stopped.
 *
 * <p>The functions are obtained from the three {@link ObjectProvider}s passed to the
 * constructor. Every function that is created/updated without error is remembered
 * so that its stop policy can later be enforced in the reverse of the order in which
 * it was processed.
 */
public class PulsarFunctionAdministration
implements SmartLifecycle {
    private final LogAccessor logger = new LogAccessor(this.getClass());
    private final PulsarAdministration pulsarAdministration;
    private final ObjectProvider<PulsarFunction> pulsarFunctions;
    private final ObjectProvider<PulsarSink> pulsarSinks;
    private final ObjectProvider<PulsarSource> pulsarSources;
    /** Successfully created/updated functions, kept in processing order. */
    private final List<PulsarFunctionOperations<?>> processedFunctions;
    private final boolean failFast;
    private final boolean propagateFailures;
    private final boolean propagateStopFailures;
    private volatile boolean running;

    /**
     * Creates the administration component.
     *
     * @param pulsarAdministration used to create the {@link PulsarAdmin} client
     * @param pulsarFunctions provider of the user-defined functions
     * @param pulsarSinks provider of the user-defined sinks
     * @param pulsarSources provider of the user-defined sources
     * @param failFast whether to stop processing at the first create/update failure
     * @param propagateFailures whether create/update failures are thrown (otherwise logged)
     * @param propagateStopFailures whether stop policy failures are thrown (otherwise logged)
     */
    public PulsarFunctionAdministration(PulsarAdministration pulsarAdministration, ObjectProvider<PulsarFunction> pulsarFunctions, ObjectProvider<PulsarSink> pulsarSinks, ObjectProvider<PulsarSource> pulsarSources, boolean failFast, boolean propagateFailures, boolean propagateStopFailures) {
        this.pulsarAdministration = pulsarAdministration;
        this.pulsarFunctions = pulsarFunctions;
        this.pulsarSinks = pulsarSinks;
        this.pulsarSources = pulsarSources;
        this.failFast = failFast;
        this.propagateFailures = propagateFailures;
        this.propagateStopFailures = propagateStopFailures;
        this.processedFunctions = new ArrayList<>();
    }

    /**
     * Creates/updates the user-defined functions if not already running. The running
     * flag is only set once processing returns normally.
     */
    @Override
    public synchronized void start() {
        if (!this.running) {
            this.logger.debug(() -> "Processing Pulsar Functions");
            long start = System.currentTimeMillis();
            this.createOrUpdateUserDefinedFunctions();
            this.running = true;
            long duration = System.currentTimeMillis() - start;
            this.logger.debug(() -> "Processed Pulsar Functions in " + duration + " ms");
        }
    }

    /**
     * Enforces the stop policy on the processed functions if currently running. The
     * running flag is cleared before enforcement begins.
     */
    @Override
    public synchronized void stop() {
        if (this.running) {
            this.logger.debug(() -> "Enforcing stop policy on Pulsar Functions");
            this.running = false;
            long start = System.currentTimeMillis();
            this.enforceStopPolicyOnUserDefinedFunctions();
            long duration = System.currentTimeMillis() - start;
            this.logger.debug(() -> "Enforced stop policy on Pulsar Functions in " + duration + " ms");
        }
    }

    @Override
    public boolean isRunning() {
        return this.running;
    }

    /**
     * Creates or updates every user-defined function, sink and source (in that order).
     *
     * <p>Each successfully handled function is recorded for later stop policy
     * enforcement. When {@code failFast} is set, processing stops at the first failure.
     *
     * @throws PulsarFunctionException if any function failed and {@code propagateFailures} is set
     * @throws PulsarException if the admin client could not be created and
     * {@code propagateFailures} is set
     */
    public void createOrUpdateUserDefinedFunctions() {
        Stream<PulsarFunctionOperations<?>> allFunctions = Stream.concat(Stream.concat(this.pulsarFunctions.orderedStream(), this.pulsarSinks.orderedStream()), this.pulsarSources.orderedStream());
        List<PulsarFunctionOperations<?>> functionsToProcess = allFunctions.toList();
        if (functionsToProcess.isEmpty()) {
            this.logger.debug("No user defined functions to process.");
            return;
        }
        try (PulsarAdmin admin = this.pulsarAdministration.createAdminClient()) {
            Map<PulsarFunctionOperations<?>, Exception> failures = new LinkedHashMap<>();
            for (PulsarFunctionOperations<?> function : functionsToProcess) {
                Optional<Exception> failure = this.createOrUpdateFunction(function, admin);
                if (failure.isPresent()) {
                    failures.put(function, failure.get());
                    if (this.failFast) {
                        break;
                    }
                    continue;
                }
                // Guard against duplicates so that a restart (stop then start) does
                // not enforce the stop policy on the same function more than once.
                if (!this.processedFunctions.contains(function)) {
                    this.processedFunctions.add(function);
                }
            }
            if (!failures.isEmpty()) {
                String msg = "Encountered " + failures.size() + " error(s) creating/updating functions: " + failures;
                if (this.propagateFailures) {
                    throw new PulsarFunctionException(msg, failures);
                }
                this.logger.error(() -> msg);
            }
        }
        catch (PulsarClientException ex) {
            String msg = "Unable to create/update functions - could not create PulsarAdmin: " + ex.getMessage();
            if (this.propagateFailures) {
                throw new PulsarException(msg, ex);
            }
            this.logger.error(ex, () -> msg);
        }
    }

    /**
     * Creates the function if it does not exist, otherwise updates it, choosing the
     * url-based variant when the archive is a supported package url.
     *
     * @return the failure, or empty on success (an update rejected with status 400 and
     * "Update contains no change" is treated as success)
     */
    private Optional<Exception> createOrUpdateFunction(PulsarFunctionOperations<?> function, PulsarAdmin admin) {
        try {
            String archive = function.archive();
            boolean usePackageUrl = Utils.isFunctionPackageUrlSupported(archive);
            boolean exists = function.functionExists(admin);
            this.logger.info(() -> this.buildLogMsg(function, exists, usePackageUrl));
            if (exists) {
                if (usePackageUrl) {
                    function.updateWithUrl(admin);
                } else {
                    function.update(admin);
                }
            } else if (usePackageUrl) {
                function.createWithUrl(admin);
            } else {
                function.create(admin);
            }
            return Optional.empty();
        }
        catch (PulsarAdminException ex) {
            if (ex.getStatusCode() == 400 && "Update contains no change".equals(ex.getHttpError())) {
                this.logger.debug(() -> "Update contained no change for " + this.functionDesc(function));
                return Optional.empty();
            }
            return Optional.of(ex);
        }
        catch (Exception ex) {
            return Optional.of(ex);
        }
    }

    /** Builds the info message logged before a function is created or updated. */
    private String buildLogMsg(PulsarFunctionOperations<?> function, boolean isUpdate, boolean isUrlArchive) {
        return "%s %s (using %s archive: %s)".formatted(isUpdate ? "Updating" : "Creating", this.functionDesc(function), isUrlArchive ? "url" : "local", function.archive());
    }

    /**
     * Returns the live (mutable) list of functions recorded as successfully processed.
     */
    List<PulsarFunctionOperations<?>> getProcessedFunctions() {
        return this.processedFunctions;
    }

    /**
     * Enforces the stop policy on every processed function, in the reverse of the
     * order in which the functions were processed.
     *
     * @throws PulsarFunctionException if any enforcement failed and
     * {@code propagateStopFailures} is set
     * @throws PulsarException if the admin client could not be created and
     * {@code propagateStopFailures} is set
     */
    public void enforceStopPolicyOnUserDefinedFunctions() {
        if (this.processedFunctions.isEmpty()) {
            this.logger.debug("No processed functions to enforce stop policy on");
            return;
        }
        try (PulsarAdmin admin = this.pulsarAdministration.createAdminClient()) {
            Map<PulsarFunctionOperations<?>, Exception> failures = new LinkedHashMap<>();
            // Reverse a copy: reversing the shared list in place would flip the stored
            // order on every invocation, so a second stop would run in forward order.
            List<PulsarFunctionOperations<?>> functionsInStopOrder = new ArrayList<>(this.processedFunctions);
            Collections.reverse(functionsInStopOrder);
            for (PulsarFunctionOperations<?> function : functionsInStopOrder) {
                Optional<Exception> failure = this.enforceStopPolicyOnFunction(function, admin);
                failure.ifPresent(e -> failures.put(function, e));
            }
            if (!failures.isEmpty()) {
                String msg = "Encountered " + failures.size() + " error(s) enforcing stop policy on functions: " + failures;
                if (this.propagateStopFailures) {
                    throw new PulsarFunctionException(msg, failures);
                }
                this.logger.error(() -> msg);
            }
        }
        catch (PulsarClientException ex) {
            String msg = "Unable to enforce stop policy on functions - could not create PulsarAdmin: " + ex.getMessage();
            if (this.propagateStopFailures) {
                throw new PulsarException(msg, ex);
            }
            this.logger.error(ex, () -> msg);
        }
    }

    /**
     * Applies the function's stop policy: nothing for NONE, stop for STOP, delete for
     * DELETE.
     *
     * @return the failure raised while stopping/deleting, or empty if none
     */
    private Optional<Exception> enforceStopPolicyOnFunction(PulsarFunctionOperations<?> function, PulsarAdmin admin) {
        // Exhaustive over the stop policy enum, so no default branch is needed.
        return switch (function.stopPolicy()) {
            case NONE -> {
                this.logger.info(() -> "No stop policy for %s - leaving alone".formatted(this.functionDesc(function)));
                yield Optional.empty();
            }
            case STOP -> {
                this.logger.info(() -> "Stopping %s".formatted(this.functionDesc(function)));
                yield this.safeInvoke(() -> function.stop(admin));
            }
            case DELETE -> {
                this.logger.info(() -> "Deleting %s".formatted(this.functionDesc(function)));
                yield this.safeInvoke(() -> function.delete(admin));
            }
        };
    }

    /** Runs the invocation, returning any thrown exception instead of propagating it. */
    private Optional<Exception> safeInvoke(Runnable invocation) {
        try {
            invocation.run();
        }
        catch (Exception ex) {
            return Optional.of(ex);
        }
        return Optional.empty();
    }

    /** Describes the function as its quoted name followed by its lower-cased type. */
    private String functionDesc(PulsarFunctionOperations<?> function) {
        return "'%s' %s".formatted(function.name(), function.type().toString().toLowerCase(Locale.ROOT));
    }

    /**
     * Thrown when one or more functions could not be created/updated or have their
     * stop policy enforced; carries the individual failure for each function.
     */
    public static class PulsarFunctionException
    extends PulsarException {
        private final Map<PulsarFunctionOperations<?>, Exception> failures;

        /**
         * @param msg the summary message
         * @param failures the exception encountered for each failed function
         */
        public PulsarFunctionException(String msg, Map<PulsarFunctionOperations<?>, Exception> failures) {
            super(msg);
            this.failures = failures;
        }

        /**
         * @return the exception encountered for each failed function
         */
        public Map<PulsarFunctionOperations<?>, Exception> getFailures() {
            return this.failures;
        }
    }
}

