libeufin

Integration and sandbox testing for FinTech APIs and data formats
Log | Files | Refs | Submodules | README | LICENSE

commit 400e444b4af2f0990df1d842087c38c351a60a82
parent d30f3a9fcd1c6f930214b649276ea283145141d8
Author: Antoine A <>
Date:   Tue, 10 Oct 2023 00:39:53 +0000

Use dedicated tables for Taler exchange transactions

Diffstat:
Mbank/src/main/kotlin/tech/libeufin/bank/Database.kt | 367++++++++++++++++++++++++++++++++++++++++++++++++++++---------------------------
Mbank/src/main/kotlin/tech/libeufin/bank/WireGatewayApiHandlers.kt | 48++++++++++--------------------------------------
Mbank/src/test/kotlin/TalerApiTest.kt | 5++---
Mdatabase-versioning/libeufin-bank-0001.sql | 21++++++++++++---------
Mdatabase-versioning/procedures.sql | 44+++++++++++++++++++-------------------------
5 files changed, 285 insertions(+), 200 deletions(-)

diff --git a/bank/src/main/kotlin/tech/libeufin/bank/Database.kt b/bank/src/main/kotlin/tech/libeufin/bank/Database.kt @@ -82,6 +82,20 @@ private fun PGSimpleDataSource.pgConnection(): PgConnection { return conn } +private fun <R> PgConnection.transaction(lambda: (PgConnection) -> R): R { + try { + setAutoCommit(false); + val result = lambda(this) + commit(); + setAutoCommit(true); + return result + } catch(e: Exception){ + rollback(); + setAutoCommit(true); + throw e; + } +} + fun initializeDatabaseTables(dbConfig: String, sqlDir: String) { logger.info("doing DB initialization, sqldir $sqlDir, dbConfig $dbConfig") pgDataSource(dbConfig).pgConnection().use { conn -> @@ -127,7 +141,11 @@ fun resetDatabaseTables(dbConfig: String, sqlDir: String) { } val sqlDrop = File("$sqlDir/libeufin-bank-drop.sql").readText() + try { conn.execSQLUpdate(sqlDrop) + } catch (e: Exception) { + + } } } @@ -194,9 +212,9 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos dbPool.close() } - private fun <R> conn(lambda: (Connection) -> R): R { + private fun <R> conn(lambda: (PgConnection) -> R): R { val conn = dbPool.getConnection() - return conn.use(lambda) + return conn.use{ it -> lambda(it.unwrap(PgConnection::class.java)) } } // CUSTOMERS @@ -687,38 +705,92 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos fun bankTransactionCreate( tx: BankInternalTransaction ): BankTransactionResult = conn { conn -> - // TODO register incoming transaction if creditor is taler exchange and subject is well formed else bounce - // TODO register outgoing transaction if debitor is taler exchange else ignore - val stmt = conn.prepareStatement(""" - SELECT out_nx_creditor, out_nx_debtor, out_balance_insufficient - FROM bank_wire_transfer(?,?,TEXT(?),(?,?)::taler_amount,?,TEXT(?),TEXT(?),TEXT(?)) - """ - ) - stmt.setLong(1, tx.creditorAccountId) - stmt.setLong(2, tx.debtorAccountId) - stmt.setString(3, tx.subject) - stmt.setLong(4, tx.amount.value) - stmt.setInt(5, tx.amount.frac) - stmt.setLong(6, tx.transactionDate.toDbMicros() ?: throw faultyTimestampByBank()) - stmt.setString(7, tx.accountServicerReference) - stmt.setString(8, tx.paymentInformationId) - stmt.setString(9, tx.endToEndId) - stmt.executeQuery().use { - when { - !it.next() -> throw internalServerError("Bank transaction didn't properly return") - it.getBoolean("out_nx_debtor") -> { - logger.error("No debtor account found") - BankTransactionResult.NO_DEBTOR - } - it.getBoolean("out_nx_creditor") -> { - logger.error("No creditor account found") - BankTransactionResult.NO_CREDITOR - } - it.getBoolean("out_balance_insufficient") -> { - logger.error("Balance insufficient") - BankTransactionResult.CONFLICT + conn.transaction { + val stmt = conn.prepareStatement(""" + SELECT + out_nx_creditor + ,out_nx_debtor + ,out_balance_insufficient + ,out_credit_row_id + ,out_debit_row_id + ,out_creditor_is_exchange + ,out_debtor_is_exchange + FROM bank_wire_transfer(?,?,TEXT(?),(?,?)::taler_amount,?,TEXT(?),TEXT(?),TEXT(?)) + """ + ) + stmt.setLong(1, tx.creditorAccountId) + stmt.setLong(2, tx.debtorAccountId) + stmt.setString(3, tx.subject) + stmt.setLong(4, tx.amount.value) + stmt.setInt(5, tx.amount.frac) + stmt.setLong(6, tx.transactionDate.toDbMicros() ?: throw faultyTimestampByBank()) + stmt.setString(7, tx.accountServicerReference) + stmt.setString(8, tx.paymentInformationId) + stmt.setString(9, tx.endToEndId) + stmt.executeQuery().use { + when { + !it.next() -> throw internalServerError("Bank transaction didn't properly return") + it.getBoolean("out_nx_debtor") -> { + logger.error("No debtor account found") + BankTransactionResult.NO_DEBTOR + } + it.getBoolean("out_nx_creditor") -> { + logger.error("No creditor account found") + BankTransactionResult.NO_CREDITOR + } + it.getBoolean("out_balance_insufficient") -> { + logger.error("Balance insufficient") + BankTransactionResult.CONFLICT + } + else -> { + if (it.getBoolean("out_creditor_is_exchange")) { + // Parse subject + val reservePub = try { + EddsaPublicKey(tx.subject) + } catch (e: Exception) { + null + } + if (reservePub != null) { + val rowId = it.getLong("out_credit_row_id") + val stmt = conn.prepareStatement(""" + INSERT INTO taler_exchange_incoming + (reserve_pub, bank_transaction) + VALUES (?, ?) + """) + stmt.setString(1, reservePub.encoded) + stmt.setLong(2, rowId) + stmt.executeUpdate() + conn.execSQLUpdate("NOTIFY incoming_tx, '${"${tx.creditorAccountId} $rowId"}'") + } else { + // TODO bounce + } + } else if (it.getBoolean("out_debtor_is_exchange")) { + // Parse subject + val metadata = try { + val split = tx.subject.split(" ", limit=2) ; + Pair(ShortHashCode(split[0]), split[2]) + } catch (e: Exception) { + null + } + if (metadata != null) { + val rowId = it.getLong("out_debit_row_id") + val stmt = conn.prepareStatement(""" + INSERT INTO taler_exchange_outgoing + (wtid, exchange_base_url, bank_transaction) + VALUES (?, ?, ?) + """) + stmt.setString(1, metadata.first.encoded) + stmt.setString(2, metadata.second) + stmt.setLong(3, rowId) + stmt.executeUpdate() + conn.execSQLUpdate("NOTIFY outgoing_tx, '${"${tx.debtorAccountId} $rowId"}'") + } else { + // TODO log ? + } + } + BankTransactionResult.SUCCESS + } } - else -> BankTransactionResult.SUCCESS } } } @@ -789,11 +861,12 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos } } - suspend fun <T> bankTransactionPoolHistory( + private suspend fun <T> poolHistory( params: HistoryParams, - bankAccountId: Long, - direction: TransactionDirection, - map: (BankAccountTransaction) -> T? + bankAccountId: Long, + listen: suspend NotificationWatcher.(Long, suspend (Flow<Notification>) -> Unit) -> Unit, + query: String, + map: (ResultSet) -> T ): List<T> { var start = params.start var delta = params.delta @@ -809,85 +882,33 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos // Prepare statement val (cmpOp, orderBy) = if (delta < 0) Pair("<", "DESC") else Pair(">", "ASC") val stmt = conn.prepareStatement(""" - SELECT - creditor_payto_uri - ,creditor_name - ,debtor_payto_uri - ,debtor_name - ,subject - ,(amount).val AS amount_val - ,(amount).frac AS amount_frac - ,transaction_date - ,account_servicer_reference - ,payment_information_id - ,end_to_end_id - ,bank_account_id - ,bank_transaction_id - FROM bank_account_transactions + $query WHERE bank_transaction_id ${cmpOp} ? AND bank_account_id=? - AND direction=?::direction_enum ORDER BY bank_transaction_id ${orderBy} LIMIT ? """) stmt.setLong(2, bankAccountId) - stmt.setString(3, direction.name) - fun bankTransactionGetHistory(): List<BankAccountTransaction> { + fun load() { stmt.setLong(1, start) - stmt.setLong(4, abs(delta)) - return stmt.all { - BankAccountTransaction( - creditorPaytoUri = it.getString("creditor_payto_uri"), - creditorName = it.getString("creditor_name"), - debtorPaytoUri = it.getString("debtor_payto_uri"), - debtorName = it.getString("debtor_name"), - amount = TalerAmount( - it.getLong("amount_val"), - it.getInt("amount_frac"), - getCurrency() - ), - accountServicerReference = it.getString("account_servicer_reference"), - endToEndId = it.getString("end_to_end_id"), - direction = direction, - bankAccountId = it.getLong("bank_account_id"), - paymentInformationId = it.getString("payment_information_id"), - subject = it.getString("subject"), - transactionDate = it.getLong("transaction_date").microsToJavaInstant() ?: throw faultyTimestampByBank(), - dbRowId = it.getLong("bank_transaction_id") - ) - } - } - - fun loadBankHistory() { - while (delta != 0L) { - val history = bankTransactionGetHistory() - if (history.isEmpty()) - break; - history.forEach { - val item = map(it); - // Advance cursor - start = it.expectRowId() - - if (item != null) { - items.add(item) - // Reduce delta - if (delta < 0) delta++ else delta--; - } - } - } + stmt.setLong(3, abs(delta)) + items.addAll(stmt.all { + start = it.getLong("bank_transaction_id") + if (delta < 0) delta ++ else delta -- + map(it) + }) } // Start expensive listening process only if we intend to poll if (poll_ms > 0) { - notifWatcher.listen(NotificationTopic(bankAccountId, direction)) { flow -> + notifWatcher.(listen)(bankAccountId) { flow -> // Start buffering notification to not miss any val buffered = flow.buffer() // Initial load - loadBankHistory() + load() // Long polling while necessary - while (delta != 0L && poll_ms > 0) { - val pollStart = System.currentTimeMillis() + if (delta != 0L) { withTimeoutOrNull(poll_ms) { buffered.filter { when { @@ -897,8 +918,6 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos } }.take(abs(delta).toInt()).count() } - val pollEnd = System.currentTimeMillis() - poll_ms -= pollEnd - pollStart // If going backward without a starting point, we reset loading progress if (params.start == Long.MAX_VALUE) { @@ -906,17 +925,83 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos delta = params.delta items.clear() } - loadBankHistory() + load() } } } else { - loadBankHistory() + load() } } return items.toList(); } + suspend fun exchangeIncomingPoolHistory( + params: HistoryParams, + bankAccountId: Long + ): List<IncomingReserveTransaction> { + return poolHistory(params, bankAccountId, NotificationWatcher::listenIncoming, """ + SELECT + bank_transaction_id + ,transaction_date + ,(amount).val AS amount_val + ,(amount).frac AS amount_frac + ,debtor_payto_uri + ,reserve_pub + FROM taler_exchange_incoming AS tfr + JOIN bank_account_transactions AS txs + ON bank_transaction=txs.bank_transaction_id + """) { + IncomingReserveTransaction( + row_id = it.getLong("bank_transaction_id"), + date = TalerProtocolTimestamp( + it.getLong("transaction_date").microsToJavaInstant() ?: throw faultyTimestampByBank() + ), + amount = TalerAmount( + it.getLong("amount_val"), + it.getInt("amount_frac"), + getCurrency() + ), + debit_account = it.getString("debtor_payto_uri"), + reserve_pub = EddsaPublicKey(it.getString("reserve_pub")), + ) + } + } + + suspend fun exchangeOutgoingPoolHistory( + params: HistoryParams, + bankAccountId: Long + ): List<OutgoingTransaction> { + return poolHistory(params, bankAccountId, NotificationWatcher::listenOutgoing, """ + SELECT + bank_transaction_id + ,transaction_date + ,(amount).val AS amount_val + ,(amount).frac AS amount_frac + ,creditor_payto_uri + ,wtid + ,exchange_base_url + FROM taler_exchange_outgoing AS tfr + JOIN bank_account_transactions AS txs + ON bank_transaction=txs.bank_transaction_id + """) { + OutgoingTransaction( + row_id = it.getLong("bank_transaction_id"), + date = TalerProtocolTimestamp( + it.getLong("transaction_date").microsToJavaInstant() ?: throw faultyTimestampByBank() + ), + amount = TalerAmount( + it.getLong("amount_val"), + it.getInt("amount_frac"), + getCurrency() + ), + credit_account = it.getString("creditor_payto_uri"), + wtid = ShortHashCode(it.getString("wtid")), + exchange_base_url = it.getString("exchange_base_url") + ) + } + } + /** * The following function returns the list of transactions, according * to the history parameters. The parameters take at least the 'start' @@ -1328,12 +1413,12 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos SELECT wtid ,exchange_base_url - ,(tfr.amount).val AS amount_value - ,(tfr.amount).frac AS amount_frac - ,tfr.credit_account_payto + ,(txs.amount).val AS amount_value + ,(txs.amount).frac AS amount_frac + ,txs.creditor_payto_uri ,tfr.bank_transaction ,txs.transaction_date AS timestamp - FROM taler_exchange_transfers AS tfr + FROM taler_exchange_outgoing AS tfr JOIN bank_account_transactions AS txs ON bank_transaction=txs.bank_transaction_id WHERE request_uid = ?; @@ -1347,7 +1432,7 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos frac = it.getInt("amount_frac"), getCurrency() ), - creditAccount = it.getString("credit_account_payto"), + creditAccount = it.getString("creditor_payto_uri"), exchangeBaseUrl = it.getString("exchange_base_url"), requestUid = requestUid, debitTxRowId = it.getLong("bank_transaction"), @@ -1441,12 +1526,13 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos } private data class Notification(val rowId: Long) -private data class NotificationTopic(val account: Long, val direction: TransactionDirection) private class NotificationWatcher(private val pgSource: PGSimpleDataSource) { private class CountedSharedFlow(val flow: MutableSharedFlow<Notification>, var count: Int) - private val bankTxFlows = ConcurrentHashMap<NotificationTopic, CountedSharedFlow>() + private val bankTxFlows = ConcurrentHashMap<Long, CountedSharedFlow>() + private val outgoingTxFlows = ConcurrentHashMap<Long, CountedSharedFlow>() + private val incomingTxFlows = ConcurrentHashMap<Long, CountedSharedFlow>() init { kotlin.concurrent.thread(isDaemon = true) { @@ -1455,20 +1541,39 @@ private class NotificationWatcher(private val pgSource: PGSimpleDataSource) { try { val conn = pgSource.pgConnection() conn.execSQLUpdate("LISTEN bank_tx") + conn.execSQLUpdate("LISTEN outgoing_tx") + conn.execSQLUpdate("LISTEN incoming_tx") while (true) { conn.getNotifications().forEach { - val info = it.parameter.split(' ', limit = 4).map { it.toLong() } - val debtorAccount = info[0]; - val creditorAccount = info[1]; - val debitRow = info[2]; - val creditRow = info[3]; - - bankTxFlows.get(NotificationTopic(debtorAccount, TransactionDirection.debit))?.run { - flow.emit(Notification(debitRow)) - } - bankTxFlows.get(NotificationTopic(creditorAccount, TransactionDirection.credit))?.run { - flow.emit(Notification(creditRow)) + if (it.name == "bank_tx") { + val info = it.parameter.split(' ', limit = 4).map { it.toLong() } + val debtorAccount = info[0]; + val creditorAccount = info[1]; + val debitRow = info[2]; + val creditRow = info[3]; + + bankTxFlows.get(debtorAccount)?.run { + flow.emit(Notification(debitRow)) + flow.emit(Notification(creditRow)) + } + bankTxFlows.get(creditorAccount)?.run { + flow.emit(Notification(debitRow)) + flow.emit(Notification(creditRow)) + } + } else { + val info = it.parameter.split(' ', limit = 2).map { it.toLong() } + val account = info[0]; + val row = info[1]; + if (it.name == "outgoing_tx") { + outgoingTxFlows.get(account)?.run { + flow.emit(Notification(row)) + } + } else { + incomingTxFlows.get(account)?.run { + flow.emit(Notification(row)) + } + } } } } @@ -1480,9 +1585,9 @@ private class NotificationWatcher(private val pgSource: PGSimpleDataSource) { } } - suspend fun listen(topic: NotificationTopic, lambda: suspend (Flow<Notification>) -> Unit) { + private suspend fun listen(map: ConcurrentHashMap<Long, CountedSharedFlow>, account: Long, lambda: suspend (Flow<Notification>) -> Unit) { // Register listener - val flow = bankTxFlows.compute(topic) { _, v -> + val flow = map.compute(account) { _, v -> val tmp = v ?: CountedSharedFlow(MutableSharedFlow(), 0); tmp.count++; tmp @@ -1492,11 +1597,23 @@ private class NotificationWatcher(private val pgSource: PGSimpleDataSource) { lambda(flow) } finally { // Unregister listener - bankTxFlows.compute(topic) { _, v -> + map.compute(account) { _, v -> v!!; v.count--; if (v.count > 0) v else null } } + } + + suspend fun listenBank(account: Long, lambda: suspend (Flow<Notification>) -> Unit) { + listen(bankTxFlows, account, lambda) + } + + suspend fun listenOutgoing(account: Long, lambda: suspend (Flow<Notification>) -> Unit) { + listen(outgoingTxFlows, account, lambda) + } + + suspend fun listenIncoming(account: Long, lambda: suspend (Flow<Notification>) -> Unit) { + listen(incomingTxFlows, account, lambda) } } \ No newline at end of file diff --git a/bank/src/main/kotlin/tech/libeufin/bank/WireGatewayApiHandlers.kt b/bank/src/main/kotlin/tech/libeufin/bank/WireGatewayApiHandlers.kt @@ -120,13 +120,17 @@ fun Routing.talerWireGatewayHandlers(db: Database, ctx: BankApplicationContext) return@post } - suspend fun <T> historyEndpoint(call: ApplicationCall, direction: TransactionDirection, reduce: (List<T>, String) -> Any, map: (BankAccountTransaction) -> T?) { + suspend fun <T> historyEndpoint( + call: ApplicationCall, + reduce: (List<T>, String) -> Any, + dbLambda: suspend Database.(HistoryParams, Long) -> List<T> + ) { call.authCheck(TokenScope.readonly, true) val params = getHistoryParams(call.request.queryParameters) val bankAccount = call.bankAccount() if (!bankAccount.isTalerExchange) throw forbidden("History is not related to a Taler exchange.") - val items = db.bankTransactionPoolHistory(params, bankAccount.expectRowId(), direction, map); + val items = db.dbLambda(params, bankAccount.expectRowId()); if (items.isEmpty()) { call.respond(HttpStatusCode.NoContent) @@ -136,45 +140,11 @@ fun Routing.talerWireGatewayHandlers(db: Database, ctx: BankApplicationContext) } get("/accounts/{USERNAME}/taler-wire-gateway/history/incoming") { - historyEndpoint(call, TransactionDirection.credit, ::IncomingHistory) { - try { - val reservePub = EddsaPublicKey(it.subject) - IncomingReserveTransaction( - row_id = it.expectRowId(), - amount = it.amount, - date = TalerProtocolTimestamp(it.transactionDate), - debit_account = it.debtorPaytoUri, - reserve_pub = reservePub - ) - } catch (e: Exception) { - // This should usually not happen in the first place, - // because transactions to the exchange without a valid - // reserve pub should be bounced. - logger.warn("Invalid incoming transaction ${it.expectRowId()}: ${it.subject}") - null - } - } + historyEndpoint(call, ::IncomingHistory, Database::exchangeIncomingPoolHistory) } get("/accounts/{USERNAME}/taler-wire-gateway/history/outgoing") { - historyEndpoint(call, TransactionDirection.debit, ::OutgoingHistory) { - try { - val split = it.subject.split(" ") - OutgoingTransaction( - row_id = it.expectRowId(), - date = TalerProtocolTimestamp(it.transactionDate), - amount = it.amount, - credit_account = it.creditorPaytoUri, - wtid = ShortHashCode(split[0]), - exchange_base_url = split[1] - ) - } catch (e: Exception) { - // This should usually not happen in the first place, - // because transactions from the exchange should be well formed - logger.warn("Invalid outgoing transaction ${it.expectRowId()}: ${it.subject}") - null - } - } + historyEndpoint(call, ::OutgoingHistory, Database::exchangeOutgoingPoolHistory) } post("/accounts/{USERNAME}/taler-wire-gateway/admin/add-incoming") { @@ -186,6 +156,8 @@ fun Routing.talerWireGatewayHandlers(db: Database, ctx: BankApplicationContext) "Currency mismatch", TalerErrorCode.TALER_EC_GENERIC_CURRENCY_MISMATCH ) + + // TODO check conflict in transaction if (db.bankTransactionCheckExists(req.reserve_pub.encoded) != null) throw conflict( "Reserve pub. already used", diff --git a/bank/src/test/kotlin/TalerApiTest.kt b/bank/src/test/kotlin/TalerApiTest.kt @@ -328,10 +328,9 @@ class TalerApiTest { ) } - // Testing ranges. - val mockReservePub = randShortHashCode().encoded + // Testing ranges. repeat(300) { - db.bankTransactionCreate(genTx(mockReservePub)).assertSuccess() + db.bankTransactionCreate(genTx(randShortHashCode().encoded)).assertSuccess() } // forward range: diff --git a/database-versioning/libeufin-bank-0001.sql b/database-versioning/libeufin-bank-0001.sql @@ -358,22 +358,25 @@ CREATE TABLE IF NOT EXISTS bank_account_statements -- end of: accounts activity report -- start of: Taler integration -CREATE TABLE IF NOT EXISTS taler_exchange_transfers - (exchange_transfer_id BIGINT GENERATED BY DEFAULT AS IDENTITY - ,request_uid TEXT NOT NULL UNIQUE +CREATE TABLE IF NOT EXISTS taler_exchange_outgoing + (exchange_outgoing_id BIGINT GENERATED BY DEFAULT AS IDENTITY + ,request_uid TEXT UNIQUE DEFAULT NULL ,wtid TEXT NOT NULL UNIQUE ,exchange_base_url TEXT NOT NULL - ,credit_account_payto TEXT NOT NULL - ,amount taler_amount NOT NULL ,bank_transaction BIGINT UNIQUE NOT NULL REFERENCES bank_account_transactions(bank_transaction_id) ON DELETE RESTRICT ON UPDATE RESTRICT ); -COMMENT ON TABLE taler_exchange_transfers - IS 'Tracks all the requests made by Taler exchanges to pay merchants'; -COMMENT ON COLUMN taler_exchange_transfers.bank_transaction - IS 'Reference to the (outgoing) bank transaction that finalizes the exchange transfer request.'; + +CREATE TABLE IF NOT EXISTS taler_exchange_incoming + (exchange_incoming_id BIGINT GENERATED BY DEFAULT AS IDENTITY + ,reserve_pub TEXT NOT NULL UNIQUE + ,bank_transaction BIGINT UNIQUE NOT NULL + REFERENCES bank_account_transactions(bank_transaction_id) + ON DELETE RESTRICT + ON UPDATE RESTRICT + ); CREATE TABLE IF NOT EXISTS taler_withdrawal_operations (taler_withdrawal_id BIGINT GENERATED BY DEFAULT AS IDENTITY diff --git a/database-versioning/procedures.sql b/database-versioning/procedures.sql @@ -28,7 +28,6 @@ BEGIN THEN RAISE EXCEPTION 'addition overflow'; END IF; - RETURN; END $$; COMMENT ON PROCEDURE amount_add IS 'Returns the normalized sum of two amounts. It raises an exception when the resulting .val is larger than 2^52'; @@ -64,7 +63,6 @@ ELSE ok = FALSE; END IF; END IF; -RETURN; END $$; COMMENT ON FUNCTION amount_left_minus_right IS 'Subtracts the right amount from the left and returns the difference and TRUE, if the left amount is larger than the right, or an invalid amount and FALSE otherwise.'; @@ -215,10 +213,8 @@ CREATE OR REPLACE FUNCTION taler_transfer( LANGUAGE plpgsql AS $$ DECLARE -maybe_balance_insufficient BOOLEAN; receiver_bank_account_id BIGINT; payment_subject TEXT; -exchange_debit_tx_id BIGINT; BEGIN -- First creating the bank transaction, then updating @@ -242,8 +238,8 @@ SELECT out_balance_insufficient, out_debit_row_id INTO - maybe_balance_insufficient, - exchange_debit_tx_id + out_exchange_balance_insufficient, + out_tx_row_id FROM bank_wire_transfer( receiver_bank_account_id, in_exchange_bank_account_id, @@ -254,29 +250,23 @@ SELECT in_payment_information_id, in_end_to_end_id ); -IF (maybe_balance_insufficient) -THEN - out_exchange_balance_insufficient=TRUE; +IF out_exchange_balance_insufficient THEN RETURN; END IF; -out_exchange_balance_insufficient=FALSE; INSERT - INTO taler_exchange_transfers ( + INTO taler_exchange_outgoing ( request_uid, wtid, exchange_base_url, - credit_account_payto, - amount, bank_transaction ) VALUES ( in_request_uid, in_wtid, in_exchange_base_url, - in_credit_account_payto, - in_amount, - exchange_debit_tx_id + out_tx_row_id ); -out_tx_row_id = exchange_debit_tx_id; +-- notify new transaction +PERFORM pg_notify('outgoing_tx', in_exchange_bank_account_id || ' ' || out_tx_row_id); END $$; COMMENT ON FUNCTION taler_transfer( text, @@ -397,7 +387,9 @@ CREATE OR REPLACE FUNCTION bank_wire_transfer( OUT out_nx_debtor BOOLEAN, OUT out_balance_insufficient BOOLEAN, OUT out_credit_row_id BIGINT, - OUT out_debit_row_id BIGINT + OUT out_debit_row_id BIGINT, + OUT out_creditor_is_exchange BOOLEAN, + OUT out_debtor_is_exchange BOOLEAN ) LANGUAGE plpgsql AS $$ @@ -428,12 +420,14 @@ SELECT has_debt, (balance).val, (balance).frac, (max_debt).val, (max_debt).frac, - internal_payto_uri, customers.name + internal_payto_uri, customers.name, + is_taler_exchange INTO debtor_has_debt, debtor_balance.val, debtor_balance.frac, debtor_max_debt.val, debtor_max_debt.frac, - debtor_payto_uri, debtor_name + debtor_payto_uri, debtor_name, + out_debtor_is_exchange FROM bank_accounts JOIN customers ON (bank_accounts.owning_customer_id = customers.customer_id) WHERE bank_account_id=in_debtor_account_id; @@ -448,11 +442,13 @@ out_nx_debtor=FALSE; SELECT has_debt, (balance).val, (balance).frac, - internal_payto_uri, customers.name + internal_payto_uri, customers.name, + is_taler_exchange INTO creditor_has_debt, creditor_balance.val, creditor_balance.frac, - creditor_payto_uri, creditor_name + creditor_payto_uri, creditor_name, + out_creditor_is_exchange FROM bank_accounts JOIN customers ON (bank_accounts.owning_customer_id = customers.customer_id) WHERE bank_account_id=in_creditor_account_id; @@ -625,9 +621,7 @@ SET WHERE bank_account_id=in_creditor_account_id; -- notify new transaction -PERFORM pg_notify('bank_tx', in_debtor_account_id || ' ' || in_creditor_account_id || ' ' || out_debit_row_id || ' ' || out_credit_row_id) - -RETURN; +PERFORM pg_notify('bank_tx', in_debtor_account_id || ' ' || in_creditor_account_id || ' ' || out_debit_row_id || ' ' || out_credit_row_id); END $$; CREATE OR REPLACE FUNCTION cashout_delete(