summaryrefslogtreecommitdiff
path: root/bank/src/main/kotlin/tech/libeufin/bank/Database.kt
diff options
context:
space:
mode:
Diffstat (limited to 'bank/src/main/kotlin/tech/libeufin/bank/Database.kt')
-rw-r--r--bank/src/main/kotlin/tech/libeufin/bank/Database.kt116
1 files changed, 110 insertions, 6 deletions
diff --git a/bank/src/main/kotlin/tech/libeufin/bank/Database.kt b/bank/src/main/kotlin/tech/libeufin/bank/Database.kt
index 066d0cc2..32b925d8 100644
--- a/bank/src/main/kotlin/tech/libeufin/bank/Database.kt
+++ b/bank/src/main/kotlin/tech/libeufin/bank/Database.kt
@@ -130,7 +130,7 @@ fun resetDatabaseTables(dbConfig: String, sqlDir: String) {
dbConn.execSQLUpdate(sqlDrop)
}
-class Database(private val dbConfig: String, private val bankCurrency: String) {
+class Database(private val dbConfig: String, internal val bankCurrency: String) {
private var dbConn: PgConnection? = null
private var dbCtr: Int = 0
private val preparedStatements: MutableMap<String, PreparedStatement> = mutableMapOf()
@@ -139,6 +139,15 @@ class Database(private val dbConfig: String, private val bankCurrency: String) {
Class.forName("org.postgresql.Driver")
}
+ internal fun conn(): PgConnection? {
+ // Translate "normal" postgresql:// connection URI to something that JDBC likes.
+ val jdbcConnStr = getJdbcConnectionFromPg(dbConfig)
+ logger.info("connecting to database via JDBC string '$jdbcConnStr'")
+ val conn = DriverManager.getConnection(jdbcConnStr).unwrap(PgConnection::class.java)
+ conn?.execSQLUpdate("SET search_path TO libeufin_bank;")
+ return conn
+ }
+
private fun reconnect() {
dbCtr++
val myDbConn = dbConn
@@ -146,12 +155,8 @@ class Database(private val dbConfig: String, private val bankCurrency: String) {
return
dbConn?.close()
preparedStatements.clear()
- // Translate "normal" postgresql:// connection URI to something that JDBC likes.
- val jdbcConnStr = getJdbcConnectionFromPg(dbConfig)
- logger.info("connecting to database via JDBC string '$jdbcConnStr'")
- dbConn = DriverManager.getConnection(jdbcConnStr).unwrap(PgConnection::class.java)
+ dbConn = conn()
dbCtr = 0
- dbConn?.execSQLUpdate("SET search_path TO libeufin_bank;")
}
private fun prepare(sql: String): PreparedStatement {
@@ -1367,3 +1372,102 @@ class Database(private val dbConfig: String, private val bankCurrency: String) {
}
}
}
+
+// TODO perf, merge with Database to reuse connection and prepared statement
+class HistoryDatabaseCtx(private val db: Database, delta: Long, private val bankAccountId: Long, private val direction: TransactionDirection, private var poll_ms: Long): java.io.Closeable {
+ private val conn = db.conn() ?: throw internalServerError("DB connection down");
+ private val channel = "${direction.name}_$bankAccountId";
+ private var stmt: PreparedStatement;
+
+ init {
+ val (cmpOp, orderBy) = if (delta < 0) Pair("<", "DESC") else Pair(">", "ASC")
+ stmt = conn.prepareStatement("""
+ SELECT
+ creditor_payto_uri
+ ,creditor_name
+ ,debtor_payto_uri
+ ,debtor_name
+ ,subject
+ ,(amount).val AS amount_val
+ ,(amount).frac AS amount_frac
+ ,transaction_date
+ ,account_servicer_reference
+ ,payment_information_id
+ ,end_to_end_id
+ ,bank_account_id
+ ,bank_transaction_id
+ FROM bank_account_transactions
+ WHERE bank_transaction_id ${cmpOp} ?
+ AND bank_account_id=?
+ AND direction=?::direction_enum
+ ORDER BY bank_transaction_id ${orderBy}
+ LIMIT ?
+ """)
+
+ // Older notification cannot appear TODO ?
+ if (delta < 0) {
+ poll_ms = 0;
+ }
+ if (poll_ms > 0) {
+ conn.execSQLUpdate("LISTEN $channel");
+ }
+ }
+
+ fun bankTransactionGetHistory(start: Long, delta: Long): List<BankAccountTransaction> {
+ stmt.setLong(1, start)
+ stmt.setLong(2, bankAccountId)
+ stmt.setString(3, direction.name)
+ stmt.setLong(4, abs(delta))
+ val rs = stmt.executeQuery()
+ rs.use {
+ val ret = mutableListOf<BankAccountTransaction>()
+ if (!it.next()) return ret
+ do {
+ ret.add(
+ BankAccountTransaction(
+ creditorPaytoUri = it.getString("creditor_payto_uri"),
+ creditorName = it.getString("creditor_name"),
+ debtorPaytoUri = it.getString("debtor_payto_uri"),
+ debtorName = it.getString("debtor_name"),
+ amount = TalerAmount(
+ it.getLong("amount_val"),
+ it.getInt("amount_frac"),
+ db.bankCurrency
+ ),
+ accountServicerReference = it.getString("account_servicer_reference"),
+ endToEndId = it.getString("end_to_end_id"),
+ direction = direction,
+ bankAccountId = it.getLong("bank_account_id"),
+ paymentInformationId = it.getString("payment_information_id"),
+ subject = it.getString("subject"),
+ transactionDate = it.getLong("transaction_date").microsToJavaInstant() ?: throw faultyTimestampByBank(),
+ dbRowId = it.getLong("bank_transaction_id")
+ ))
+ } while (it.next())
+ return ret
+ }
+ }
+
+ /** Wait for more transactions, return true if new transactions have appeared */
+ suspend fun pool(start: Long, delta: Long): Boolean {
+ if (poll_ms <= 0 || delta == 0L)
+ return false;
+ var remaining = abs(delta);
+ do {
+ val pollStart = System.currentTimeMillis()
+ conn.getNotifications(poll_ms.toInt()).forEach {
+ val id = it.parameter.toLong()
+ val new = if (delta < 0) id < start else id > start
+ if (new) remaining -= 1
+ }
+ val pollEnd = System.currentTimeMillis()
+ poll_ms -= pollEnd - pollStart
+ } while (poll_ms > 0 && remaining > 0L)
+ return true
+ }
+
+ override fun close() {
+ conn.execSQLUpdate("UNLISTEN $channel");
+ conn.close()
+ }
+} \ No newline at end of file