/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.jdbc.internal.executor;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;

public class BufferReduceStatementExecutor
implements JdbcBatchStatementExecutor<RowData> {
    private final JdbcBatchStatementExecutor<RowData> upsertExecutor;
    private final JdbcBatchStatementExecutor<RowData> deleteExecutor;
    private final Function<RowData, RowData> keyExtractor;
    private final Function<RowData, RowData> valueTransform;
    private final Map<RowData, Tuple2<Boolean, RowData>> reduceBuffer = new HashMap<RowData, Tuple2<Boolean, RowData>>();

    public BufferReduceStatementExecutor(JdbcBatchStatementExecutor<RowData> upsertExecutor, JdbcBatchStatementExecutor<RowData> deleteExecutor, Function<RowData, RowData> keyExtractor, Function<RowData, RowData> valueTransform) {
        this.upsertExecutor = upsertExecutor;
        this.deleteExecutor = deleteExecutor;
        this.keyExtractor = keyExtractor;
        this.valueTransform = valueTransform;
    }

    @Override
    public void prepareStatements(Connection connection) throws SQLException {
        this.upsertExecutor.prepareStatements(connection);
        this.deleteExecutor.prepareStatements(connection);
    }

    @Override
    public void addToBatch(RowData record) throws SQLException {
        RowData key = this.keyExtractor.apply(record);
        boolean flag = this.changeFlag(record.getRowKind());
        RowData value = this.valueTransform.apply(record);
        this.reduceBuffer.put(key, (Tuple2<Boolean, RowData>)Tuple2.of((Object)flag, (Object)value));
    }

    private boolean changeFlag(RowKind rowKind) {
        switch (rowKind) {
            case INSERT: 
            case UPDATE_AFTER: {
                return true;
            }
            case DELETE: 
            case UPDATE_BEFORE: {
                return false;
            }
        }
        throw new UnsupportedOperationException(String.format("Unknown row kind, the supported row kinds is: INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE, but get: %s.", rowKind));
    }

    @Override
    public void executeBatch() throws SQLException {
        for (Tuple2<Boolean, RowData> tuple : this.reduceBuffer.values()) {
            if (((Boolean)tuple.f0).booleanValue()) {
                this.upsertExecutor.addToBatch((RowData)tuple.f1);
                continue;
            }
            this.deleteExecutor.addToBatch((RowData)tuple.f1);
        }
        this.upsertExecutor.executeBatch();
        this.deleteExecutor.executeBatch();
        this.reduceBuffer.clear();
    }

    @Override
    public void closeStatements() throws SQLException {
        this.upsertExecutor.closeStatements();
        this.deleteExecutor.closeStatements();
    }
}

