/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.connect.jdbc.source;

import io.confluent.connect.jdbc.dialect.DatabaseDialect;
import io.confluent.connect.jdbc.dialect.DatabaseDialects;
import io.confluent.connect.jdbc.source.BulkTableQuerier;
import io.confluent.connect.jdbc.source.JdbcSourceConnectorConfig;
import io.confluent.connect.jdbc.source.JdbcSourceTaskConfig;
import io.confluent.connect.jdbc.source.OffsetProtocols;
import io.confluent.connect.jdbc.source.TableQuerier;
import io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier;
import io.confluent.connect.jdbc.source.TimestampTableQuerier;
import io.confluent.connect.jdbc.util.CachedConnectionProvider;
import io.confluent.connect.jdbc.util.ColumnDefinition;
import io.confluent.connect.jdbc.util.ColumnId;
import io.confluent.connect.jdbc.util.TableId;
import io.confluent.connect.jdbc.util.Version;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.SQLNonTransientException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JdbcSourceTask
extends SourceTask {
    private static final int CONSECUTIVE_EMPTY_RESULTS_BEFORE_RETURN = 3;
    private static final Logger log = LoggerFactory.getLogger(JdbcSourceTask.class);
    private Time time;
    private JdbcSourceTaskConfig config;
    private DatabaseDialect dialect;
    CachedConnectionProvider cachedConnectionProvider;
    PriorityQueue<TableQuerier> tableQueue = new PriorityQueue();
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final AtomicLong taskThreadId = new AtomicLong(0L);
    int maxRetriesPerQuerier;

    public JdbcSourceTask() {
        this.time = Time.SYSTEM;
    }

    public JdbcSourceTask(Time time) {
        this.time = time;
    }

    public String version() {
        return Version.getVersion();
    }

    public void start(Map<String, String> properties) {
        log.info("Starting JDBC source task");
        try {
            this.config = new JdbcSourceTaskConfig(properties);
        }
        catch (ConfigException e) {
            throw new ConfigException("Couldn't start JdbcSourceTask due to configuration error", (Object)e);
        }
        List<String> tables = this.config.getList("tables");
        Boolean tablesFetched = this.config.getBoolean("tables.fetched");
        String query = this.config.getString("query");
        if (tables.isEmpty() && query.isEmpty()) {
            if (!tablesFetched.booleanValue()) {
                this.taskThreadId.set(Thread.currentThread().getId());
                log.info("Started JDBC source task. Waiting for DB tables to be fetched.");
                return;
            }
            throw new ConfigException("Task is being killed because it was not assigned a table nor a query to execute. If run in table mode please make sure that the tables exist on the database. If the table does exist on the database, we recommend using the fully qualified table name.");
        }
        if (!tables.isEmpty() && !query.isEmpty()) {
            throw new ConfigException("Invalid configuration: a JdbcSourceTask cannot have both a table and a query assigned to it");
        }
        String url = this.config.getString("connection.url");
        int maxConnAttempts = this.config.getInt("connection.attempts");
        long retryBackoff = this.config.getLong("connection.backoff.ms");
        String dialectName = this.config.getString("dialect.name");
        if (dialectName != null && !dialectName.trim().isEmpty()) {
            this.dialect = DatabaseDialects.create(dialectName, this.config);
        } else {
            log.info("Finding the database dialect that is best fit for the provided JDBC URL.");
            this.dialect = DatabaseDialects.findBestFor(url, this.config);
        }
        log.info("Using JDBC dialect {}", (Object)this.dialect.name());
        this.cachedConnectionProvider = this.connectionProvider(maxConnAttempts, retryBackoff);
        this.dialect.setConnectionIsolationMode(this.cachedConnectionProvider.getConnection(), JdbcSourceConnectorConfig.TransactionIsolationMode.valueOf(this.config.getString("transaction.isolation.mode")));
        TableQuerier.QueryMode queryMode = !query.isEmpty() ? TableQuerier.QueryMode.QUERY : TableQuerier.QueryMode.TABLE;
        List<String> tablesOrQuery = queryMode == TableQuerier.QueryMode.QUERY ? Collections.singletonList(query) : tables;
        String mode = this.config.getString("mode");
        HashMap<String, List<Map<String, String>>> partitionsByTableFqn = new HashMap<String, List<Map<String, String>>>();
        Map offsets = null;
        if (mode.equals("incrementing") || mode.equals("timestamp") || mode.equals("timestamp+incrementing")) {
            ArrayList<Map<String, String>> partitions = new ArrayList<Map<String, String>>(tables.size());
            switch (queryMode) {
                case TABLE: {
                    log.trace("Starting in TABLE mode");
                    for (String table : tables) {
                        List<Map<String, String>> tablePartitions = this.possibleTablePartitions(table);
                        partitions.addAll(tablePartitions);
                        partitionsByTableFqn.put(table, tablePartitions);
                    }
                    break;
                }
                case QUERY: {
                    log.trace("Starting in QUERY mode");
                    partitions.add(Collections.singletonMap("query", "query"));
                    break;
                }
                default: {
                    throw new ConfigException("Unknown query mode: " + (Object)((Object)queryMode));
                }
            }
            offsets = this.context.offsetStorageReader().offsets(partitions);
            log.trace("The partition offsets are {}", (Object)offsets);
        }
        String incrementingColumn = this.config.getString("incrementing.column.name");
        List timestampColumns = this.config.getList("timestamp.column.name");
        Long timestampDelayInterval = this.config.getLong("timestamp.delay.interval.ms");
        boolean validateNonNulls = this.config.getBoolean("validate.non.null");
        TimeZone timeZone = this.config.timeZone();
        String suffix = this.config.getString("query.suffix").trim();
        if (queryMode.equals((Object)TableQuerier.QueryMode.TABLE)) {
            this.validateColumnsExist(mode, incrementingColumn, timestampColumns, tables.get(0));
        }
        for (String tableOrQuery : tablesOrQuery) {
            List<Map<String, String>> tablePartitionsToCheck;
            log.trace("Task executing in {} mode", (Object)queryMode);
            switch (queryMode) {
                case TABLE: {
                    if (validateNonNulls) {
                        this.validateNonNullable(mode, tableOrQuery, incrementingColumn, timestampColumns);
                    }
                    tablePartitionsToCheck = (List<Map<String, String>>)partitionsByTableFqn.get(tableOrQuery);
                    break;
                }
                case QUERY: {
                    Map<String, String> partition = Collections.singletonMap("query", "query");
                    tablePartitionsToCheck = Collections.singletonList(partition);
                    break;
                }
                default: {
                    throw new ConfigException("Unexpected query mode: " + (Object)((Object)queryMode));
                }
            }
            Map offset = null;
            if (offsets != null) {
                for (Map<String, String> toCheckPartition : tablePartitionsToCheck) {
                    offset = (Map)offsets.get(toCheckPartition);
                    if (offset == null) continue;
                    log.info("Found offset {} for partition {}", (Object)offsets, toCheckPartition);
                    break;
                }
            }
            offset = this.computeInitialOffset(tableOrQuery, offset, timeZone);
            String topicPrefix = this.config.topicPrefix();
            JdbcSourceConnectorConfig.TimestampGranularity timestampGranularity = JdbcSourceConnectorConfig.TimestampGranularity.get(this.config);
            if (mode.equals("bulk")) {
                this.tableQueue.add(new BulkTableQuerier(this.dialect, queryMode, tableOrQuery, topicPrefix, suffix));
                continue;
            }
            if (mode.equals("incrementing")) {
                this.tableQueue.add(new TimestampIncrementingTableQuerier(this.dialect, queryMode, tableOrQuery, topicPrefix, null, incrementingColumn, offset, timestampDelayInterval, timeZone, suffix, timestampGranularity));
                continue;
            }
            if (mode.equals("timestamp")) {
                this.tableQueue.add(new TimestampTableQuerier(this.dialect, queryMode, tableOrQuery, topicPrefix, timestampColumns, offset, timestampDelayInterval, timeZone, suffix, timestampGranularity));
                continue;
            }
            if (!mode.endsWith("timestamp+incrementing")) continue;
            this.tableQueue.add(new TimestampIncrementingTableQuerier(this.dialect, queryMode, tableOrQuery, topicPrefix, timestampColumns, incrementingColumn, offset, timestampDelayInterval, timeZone, suffix, timestampGranularity));
        }
        this.running.set(true);
        this.taskThreadId.set(Thread.currentThread().getId());
        log.info("Started JDBC source task");
        this.maxRetriesPerQuerier = this.config.getInt("query.retry.attempts");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void validateColumnsExist(String mode, String incrementingColumn, List<String> timestampColumns, String table) {
        try {
            Connection conn = this.cachedConnectionProvider.getConnection();
            boolean autoCommit = conn.getAutoCommit();
            try {
                Set missingTsColumns;
                log.info("Validating columns exist for table");
                conn.setAutoCommit(true);
                Map<ColumnId, ColumnDefinition> defnsById = this.dialect.describeColumns(conn, table, null);
                Set columnNames = defnsById.keySet().stream().map(ColumnId::name).map(String::toLowerCase).collect(Collectors.toSet());
                if ((mode.equals("incrementing") || mode.equals("timestamp+incrementing")) && !incrementingColumn.isEmpty() && !columnNames.contains(incrementingColumn.toLowerCase(Locale.getDefault()))) {
                    throw new ConfigException("Incrementing column: " + incrementingColumn + " does not exist.");
                }
                if ((mode.equals("timestamp") || mode.equals("timestamp+incrementing")) && !timestampColumns.isEmpty() && !(missingTsColumns = timestampColumns.stream().filter(tsColumn -> !columnNames.contains(tsColumn.toLowerCase(Locale.getDefault()))).collect(Collectors.toSet())).isEmpty()) {
                    throw new ConfigException("Timestamp columns: " + String.join((CharSequence)", ", missingTsColumns) + " do not exist.");
                }
            }
            finally {
                conn.setAutoCommit(autoCommit);
            }
        }
        catch (SQLException e) {
            throw new ConnectException("Failed trying to validate that columns used for offsets exist", (Throwable)e);
        }
    }

    protected CachedConnectionProvider connectionProvider(int maxConnAttempts, long retryBackoff) {
        return new CachedConnectionProvider(this.dialect, maxConnAttempts, retryBackoff){

            @Override
            protected void onConnect(Connection connection) throws SQLException {
                super.onConnect(connection);
                connection.setAutoCommit(false);
            }
        };
    }

    private List<Map<String, String>> possibleTablePartitions(String table) {
        TableId tableId = this.dialect.parseTableIdentifier(table);
        return Arrays.asList(OffsetProtocols.sourcePartitionForProtocolV1(tableId), OffsetProtocols.sourcePartitionForProtocolV0(tableId));
    }

    protected Map<String, Object> computeInitialOffset(String tableOrQuery, Map<String, Object> partitionOffset, TimeZone timezone) {
        if (partitionOffset != null) {
            log.info("Partition offset for '{}' is not null. Using existing offset.", (Object)tableOrQuery);
            return partitionOffset;
        }
        log.info("Partition offset for '{}' is null. Computing initial offset.", (Object)tableOrQuery);
        HashMap<String, Long> initialPartitionOffset = null;
        Long timestampInitial = this.config.getLong("timestamp.initial");
        if (timestampInitial != null) {
            if (timestampInitial == JdbcSourceConnectorConfig.TIMESTAMP_INITIAL_CURRENT) {
                try {
                    Connection con = this.cachedConnectionProvider.getConnection();
                    Calendar cal = Calendar.getInstance(timezone);
                    timestampInitial = this.dialect.currentTimeOnDB(con, cal).getTime();
                }
                catch (SQLException e) {
                    throw new ConnectException("Error while getting initial timestamp from database", (Throwable)e);
                }
            }
            initialPartitionOffset = new HashMap<String, Long>();
            initialPartitionOffset.put("timestamp", timestampInitial);
            log.info("No offsets found for '{}', so using configured timestamp {}", (Object)tableOrQuery, (Object)timestampInitial);
        }
        return initialPartitionOffset;
    }

    public void stop() throws ConnectException {
        log.info("Stopping JDBC source task");
        this.running.set(false);
        if (this.taskThreadId.longValue() == Thread.currentThread().getId()) {
            this.shutdown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void closeResources() {
        log.info("Closing resources for JDBC source task");
        try {
            if (this.cachedConnectionProvider != null) {
                this.cachedConnectionProvider.close(true);
            }
        }
        catch (Throwable t) {
            log.warn("Error while closing the connections", t);
        }
        finally {
            this.cachedConnectionProvider = null;
            try {
                if (this.dialect != null) {
                    this.dialect.close();
                }
            }
            catch (Throwable t) {
                log.warn("Error while closing the {} dialect: ", (Object)this.dialect.name(), (Object)t);
            }
            finally {
                this.dialect = null;
            }
        }
    }

    public List<SourceRecord> poll() throws InterruptedException {
        log.trace("Polling for new data");
        Boolean tablesFetched = this.config.getBoolean("tables.fetched");
        String query = this.config.getString("query");
        if (query.isEmpty() && !tablesFetched.booleanValue()) {
            long sleepMs = this.config.getInt("poll.interval.ms").intValue();
            log.trace("Waiting for tables to be fetched from the database. No records will be polled. Waiting {} ms to poll", (Object)sleepMs);
            this.time.sleep(sleepMs);
            return null;
        }
        Map consecutiveEmptyResults = this.tableQueue.stream().collect(Collectors.toMap(Function.identity(), q -> 0));
        while (this.running.get()) {
            long now;
            long nextUpdate;
            long sleepMs;
            TableQuerier querier = this.tableQueue.peek();
            if (!querier.querying() && (sleepMs = Math.min((nextUpdate = querier.getLastUpdate() + (long)this.config.getInt("poll.interval.ms").intValue()) - (now = this.time.milliseconds()), 100L)) > 0L) {
                log.trace("Waiting {} ms to poll {} next", (Object)(nextUpdate - now), (Object)querier.toString());
                this.time.sleep(sleepMs);
                continue;
            }
            ArrayList<SourceRecord> results = new ArrayList<SourceRecord>();
            try {
                log.debug("Checking for next block of results from {}", (Object)querier.toString());
                querier.maybeStartQuery(this.cachedConnectionProvider.getConnection());
                int batchMaxRows = this.config.getInt("batch.max.rows");
                boolean hadNext = true;
                while (results.size() < batchMaxRows && (hadNext = querier.next())) {
                    results.add(querier.extractRecord());
                }
                querier.resetRetryCount();
                if (!hadNext) {
                    this.resetAndRequeueHead(querier, false);
                }
                if (results.isEmpty()) {
                    consecutiveEmptyResults.compute(querier, (k, v) -> v + 1);
                    log.trace("No updates for {}", (Object)querier.toString());
                    if (Collections.min(consecutiveEmptyResults.values()) < 3) continue;
                    log.trace("More than 3 consecutive empty results for all queriers, returning");
                    return null;
                }
                consecutiveEmptyResults.put(querier, 0);
                log.debug("Returning {} records for {}", (Object)results.size(), (Object)querier);
                return results;
            }
            catch (SQLNonTransientException sqle) {
                log.error("Non-transient SQL exception while running query for table: {}", (Object)querier, (Object)sqle);
                this.resetAndRequeueHead(querier, true);
                this.closeResources();
                throw new ConnectException((Throwable)sqle);
            }
            catch (SQLException sqle) {
                log.error("SQL exception while running query for table: {}, {}. Attempting retry {} of {} attempts.", new Object[]{querier, sqle, querier.getAttemptedRetryCount() + 1, this.maxRetriesPerQuerier});
                this.resetAndRequeueHead(querier, true);
                if (this.maxRetriesPerQuerier > 0 && querier.getAttemptedRetryCount() >= this.maxRetriesPerQuerier) {
                    this.closeResources();
                    throw new ConnectException("Failed to Query table after retries", (Throwable)sqle);
                }
                querier.incrementRetryCount();
                return null;
            }
            catch (Throwable t) {
                log.error("Failed to run query for table: {}", (Object)querier, (Object)t);
                this.resetAndRequeueHead(querier, true);
                this.closeResources();
                throw t;
            }
        }
        this.shutdown();
        return null;
    }

    private void shutdown() {
        TableQuerier querier = this.tableQueue.peek();
        if (querier != null) {
            this.resetAndRequeueHead(querier, true);
        }
        this.closeResources();
    }

    private void resetAndRequeueHead(TableQuerier expectedHead, boolean resetOffset) {
        log.debug("Resetting querier {}", (Object)expectedHead.toString());
        TableQuerier removedQuerier = this.tableQueue.poll();
        assert (removedQuerier == expectedHead);
        expectedHead.reset(this.time.milliseconds(), resetOffset);
        this.tableQueue.add(expectedHead);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void validateNonNullable(String incrementalMode, String table, String incrementingColumn, List<String> timestampColumns) {
        log.info("Validating non-nullable fields for table: {}", (Object)table);
        try {
            HashSet<String> lowercaseTsColumns = new HashSet<String>();
            for (String timestampColumn : timestampColumns) {
                lowercaseTsColumns.add(timestampColumn.toLowerCase(Locale.getDefault()));
            }
            boolean incrementingOptional = false;
            boolean atLeastOneTimestampNotOptional = false;
            Connection conn = this.cachedConnectionProvider.getConnection();
            boolean autoCommit = conn.getAutoCommit();
            try {
                conn.setAutoCommit(true);
                Map<ColumnId, ColumnDefinition> defnsById = this.dialect.describeColumns(conn, table, null);
                for (ColumnDefinition defn : defnsById.values()) {
                    String columnName = defn.id().name();
                    if (columnName.equalsIgnoreCase(incrementingColumn)) {
                        incrementingOptional = defn.isOptional();
                        continue;
                    }
                    if (!lowercaseTsColumns.contains(columnName.toLowerCase(Locale.getDefault())) || defn.isOptional()) continue;
                    atLeastOneTimestampNotOptional = true;
                }
            }
            finally {
                conn.setAutoCommit(autoCommit);
            }
            if ((incrementalMode.equals("incrementing") || incrementalMode.equals("timestamp+incrementing")) && incrementingOptional) {
                throw new ConnectException("Cannot make incremental queries using incrementing column " + incrementingColumn + " on " + table + " because this column is nullable.");
            }
            if ((incrementalMode.equals("timestamp") || incrementalMode.equals("timestamp+incrementing")) && !atLeastOneTimestampNotOptional) {
                throw new ConnectException("Cannot make incremental queries using timestamp columns " + String.join((CharSequence)",", timestampColumns) + " on " + table + " because all of these columns are nullable.");
            }
        }
        catch (SQLException e) {
            throw new ConnectException("Failed trying to validate that columns used for offsets are NOT NULL", (Throwable)e);
        }
    }
}

