diff options
Diffstat (limited to 'bank/src/main/kotlin/tech/libeufin/bank/Database.kt')
-rw-r--r-- | bank/src/main/kotlin/tech/libeufin/bank/Database.kt | 116 |
1 files changed, 110 insertions, 6 deletions
diff --git a/bank/src/main/kotlin/tech/libeufin/bank/Database.kt b/bank/src/main/kotlin/tech/libeufin/bank/Database.kt index 066d0cc2..32b925d8 100644 --- a/bank/src/main/kotlin/tech/libeufin/bank/Database.kt +++ b/bank/src/main/kotlin/tech/libeufin/bank/Database.kt @@ -130,7 +130,7 @@ fun resetDatabaseTables(dbConfig: String, sqlDir: String) { dbConn.execSQLUpdate(sqlDrop) } -class Database(private val dbConfig: String, private val bankCurrency: String) { +class Database(private val dbConfig: String, internal val bankCurrency: String) { private var dbConn: PgConnection? = null private var dbCtr: Int = 0 private val preparedStatements: MutableMap<String, PreparedStatement> = mutableMapOf() @@ -139,6 +139,15 @@ class Database(private val dbConfig: String, private val bankCurrency: String) { Class.forName("org.postgresql.Driver") } + internal fun conn(): PgConnection? { + // Translate "normal" postgresql:// connection URI to something that JDBC likes. + val jdbcConnStr = getJdbcConnectionFromPg(dbConfig) + logger.info("connecting to database via JDBC string '$jdbcConnStr'") + val conn = DriverManager.getConnection(jdbcConnStr).unwrap(PgConnection::class.java) + conn?.execSQLUpdate("SET search_path TO libeufin_bank;") + return conn + } + private fun reconnect() { dbCtr++ val myDbConn = dbConn @@ -146,12 +155,8 @@ class Database(private val dbConfig: String, private val bankCurrency: String) { return dbConn?.close() preparedStatements.clear() - // Translate "normal" postgresql:// connection URI to something that JDBC likes. - val jdbcConnStr = getJdbcConnectionFromPg(dbConfig) - logger.info("connecting to database via JDBC string '$jdbcConnStr'") - dbConn = DriverManager.getConnection(jdbcConnStr).unwrap(PgConnection::class.java) + dbConn = conn() dbCtr = 0 - dbConn?.execSQLUpdate("SET search_path TO libeufin_bank;") } private fun prepare(sql: String): PreparedStatement { @@ -1367,3 +1372,102 @@ class Database(private val dbConfig: String, private val bankCurrency: String) { } } } + +// TODO perf, merge with Database to reuse connection and prepared statement +class HistoryDatabaseCtx(private val db: Database, delta: Long, private val bankAccountId: Long, private val direction: TransactionDirection, private var poll_ms: Long): java.io.Closeable { + private val conn = db.conn() ?: throw internalServerError("DB connection down"); + private val channel = "${direction.name}_$bankAccountId"; + private var stmt: PreparedStatement; + + init { + val (cmpOp, orderBy) = if (delta < 0) Pair("<", "DESC") else Pair(">", "ASC") + stmt = conn.prepareStatement(""" + SELECT + creditor_payto_uri + ,creditor_name + ,debtor_payto_uri + ,debtor_name + ,subject + ,(amount).val AS amount_val + ,(amount).frac AS amount_frac + ,transaction_date + ,account_servicer_reference + ,payment_information_id + ,end_to_end_id + ,bank_account_id + ,bank_transaction_id + FROM bank_account_transactions + WHERE bank_transaction_id ${cmpOp} ? + AND bank_account_id=? + AND direction=?::direction_enum + ORDER BY bank_transaction_id ${orderBy} + LIMIT ? + """) + + // Older notification cannot appear TODO ? + if (delta < 0) { + poll_ms = 0; + } + if (poll_ms > 0) { + conn.execSQLUpdate("LISTEN $channel"); + } + } + + fun bankTransactionGetHistory(start: Long, delta: Long): List<BankAccountTransaction> { + stmt.setLong(1, start) + stmt.setLong(2, bankAccountId) + stmt.setString(3, direction.name) + stmt.setLong(4, abs(delta)) + val rs = stmt.executeQuery() + rs.use { + val ret = mutableListOf<BankAccountTransaction>() + if (!it.next()) return ret + do { + ret.add( + BankAccountTransaction( + creditorPaytoUri = it.getString("creditor_payto_uri"), + creditorName = it.getString("creditor_name"), + debtorPaytoUri = it.getString("debtor_payto_uri"), + debtorName = it.getString("debtor_name"), + amount = TalerAmount( + it.getLong("amount_val"), + it.getInt("amount_frac"), + db.bankCurrency + ), + accountServicerReference = it.getString("account_servicer_reference"), + endToEndId = it.getString("end_to_end_id"), + direction = direction, + bankAccountId = it.getLong("bank_account_id"), + paymentInformationId = it.getString("payment_information_id"), + subject = it.getString("subject"), + transactionDate = it.getLong("transaction_date").microsToJavaInstant() ?: throw faultyTimestampByBank(), + dbRowId = it.getLong("bank_transaction_id") + )) + } while (it.next()) + return ret + } + } + + /** Wait for more transactions, return true if new transactions have appeared */ + suspend fun pool(start: Long, delta: Long): Boolean { + if (poll_ms <= 0 || delta == 0L) + return false; + var remaining = abs(delta); + do { + val pollStart = System.currentTimeMillis() + conn.getNotifications(poll_ms.toInt()).forEach { + val id = it.parameter.toLong() + val new = if (delta < 0) id < start else id > start + if (new) remaining -= 1 + } + val pollEnd = System.currentTimeMillis() + poll_ms -= pollEnd - pollStart + } while (poll_ms > 0 && remaining > 0L) + return true + } + + override fun close() { + conn.execSQLUpdate("UNLISTEN $channel"); + conn.close() + } +}
\ No newline at end of file |