diff options
author | Antoine A <> | 2024-04-26 15:17:14 +0900 |
---|---|---|
committer | Antoine A <> | 2024-04-26 15:17:14 +0900 |
commit | 71050ab44fbccb970ec530383cdeef42ca0cf928 (patch) | |
tree | 43236da27bd1f2c21c62f48c4105cf4db7e59345 /nexus/src/main | |
parent | c73f750472139b7ec872c1bf16a284de143ef998 (diff) | |
download | libeufin-71050ab44fbccb970ec530383cdeef42ca0cf928.tar.gz libeufin-71050ab44fbccb970ec530383cdeef42ca0cf928.tar.bz2 libeufin-71050ab44fbccb970ec530383cdeef42ca0cf928.zip |
nexus: wire gateway /history/incoming
Diffstat (limited to 'nexus/src/main')
7 files changed, 67 insertions, 21 deletions
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Config.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Config.kt index 59094204..eb1bac91 100644 --- a/nexus/src/main/kotlin/tech/libeufin/nexus/Config.kt +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/Config.kt @@ -52,6 +52,8 @@ class NexusConfig(val config: TalerConfig) { bic = requireString("bic"), name = requireString("name") ) + /** Bank account payto */ + val payto = IbanPayto.build(account.iban, account.bic, account.name) /** Path where we store the bank public keys */ val bankPublicKeysPath = requirePath("bank_public_keys_file") /** Path where we store our private keys */ diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt index 96710648..e5f18595 100644 --- a/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt @@ -366,7 +366,7 @@ class EbicsFetch: CliktCommand("Fetches EBICS files") { val cfg = extractEbicsConfig(common.config) val dbCfg = cfg.config.dbConfig() - Database(dbCfg).use { db -> + Database(dbCfg, cfg.currency).use { db -> val (clientKeys, bankKeys) = expectFullKeys(cfg) val ctx = FetchContext( cfg, diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSubmit.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSubmit.kt index 8bde6d60..947ecab6 100644 --- a/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSubmit.kt +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSubmit.kt @@ -65,7 +65,7 @@ data class SubmissionContext( private suspend fun submitInitiatedPayment( ctx: SubmissionContext, payment: InitiatedPayment -): String { +): String { val creditAccount = try { val payto = Payto.parse(payment.creditPaytoUri).expectIban() IbanAccountMetadata( @@ -157,7 +157,7 @@ class EbicsSubmit : CliktCommand("Submits any initiated payment found in the dat httpClient = HttpClient(), fileLogger = FileLogger(ebicsLog) ) - Database(dbCfg).use { db -> + Database(dbCfg, cfg.currency).use { db -> val frequency: Duration = if (transient) { logger.info("Transient mode: submitting what found and returning.") Duration.ZERO diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt index b3153a8e..387bf68b 100644 --- a/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt @@ -130,7 +130,7 @@ class InitiatePayment: CliktCommand("Initiate an outgoing payment") { Base32Crockford.encode(bytes) } - Database(dbCfg).use { db -> + Database(dbCfg, currency).use { db -> db.initiated.create( InitiatedPayment( id = -1, @@ -273,7 +273,7 @@ class FakeIncoming: CliktCommand("Genere a fake incoming payment") { Base32Crockford.encode(bytes) } - Database(dbCfg).use { db -> + Database(dbCfg, currency).use { db -> ingestIncomingPayment(db, IncomingPayment( amount = amount, diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/api/WireGatewayApi.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/api/WireGatewayApi.kt index 54dae351..7879522c 100644 --- a/nexus/src/main/kotlin/tech/libeufin/nexus/api/WireGatewayApi.kt +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/api/WireGatewayApi.kt @@ -66,26 +66,22 @@ fun Routing.wireGatewayApi(db: Database, cfg: NexusConfig) { ) } } - /* suspend fun <T> PipelineContext<Unit, ApplicationCall>.historyEndpoint( reduce: (List<T>, String) -> Any, - dbLambda: suspend ExchangeDAO.(HistoryParams, Long, BankPaytoCtx) -> List<T> + dbLambda: suspend ExchangeDAO.(HistoryParams) -> List<T> ) { val params = HistoryParams.extract(context.request.queryParameters) - val bankAccount = call.bankInfo(db, ctx.payto) - - val items = db.exchange.dbLambda(params, bankAccount.bankAccountId, ctx.payto) - val + val items = db.exchange.dbLambda(params) if (items.isEmpty()) { call.respond(HttpStatusCode.NoContent) } else { - call.respond(reduce(items, bankAccount.payto)) + call.respond(reduce(items, cfg.payto)) } } get("/taler-wire-gateway/history/incoming") { historyEndpoint(::IncomingHistory, ExchangeDAO::incomingHistory) } - get("/taler-wire-gateway/history/outgoing") { + /*get("/taler-wire-gateway/history/outgoing") { historyEndpoint(::OutgoingHistory, ExchangeDAO::outgoingHistory) }*/ post("/taler-wire-gateway/admin/add-incoming") { diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/db/Database.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/db/Database.kt index 4cc70452..01c512ef 100644 --- a/nexus/src/main/kotlin/tech/libeufin/nexus/db/Database.kt +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/db/Database.kt @@ -18,9 +18,12 @@ */ package tech.libeufin.nexus.db +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import org.slf4j.Logger +import org.slf4j.LoggerFactory import tech.libeufin.common.TalerAmount -import tech.libeufin.common.db.DatabaseConfig -import tech.libeufin.common.db.DbPool +import tech.libeufin.common.db.* import java.time.Instant /** @@ -39,8 +42,31 @@ data class InitiatedPayment( /** * Collects database connection steps and any operation on the Nexus tables. */ -class Database(dbConfig: DatabaseConfig): DbPool(dbConfig, "libeufin_nexus") { +class Database(dbConfig: DatabaseConfig, val bankCurrency: String): DbPool(dbConfig, "libeufin_nexus") { val payment = PaymentDAO(this) val initiated = InitiatedDAO(this) val exchange = ExchangeDAO(this) + + private val outgoingTxFlows: MutableSharedFlow<Long> = MutableSharedFlow() + private val incomingTxFlows: MutableSharedFlow<Long> = MutableSharedFlow() + + init { + watchNotifications(pgSource, "libeufin_nexus", LoggerFactory.getLogger("libeufin-nexus-db-watcher"), mapOf( + "outgoing_tx" to { + val id = it.toLong() + outgoingTxFlows.emit(id) + }, + "incoming_tx" to { + val id = it.toLong() + incomingTxFlows.emit(id) + } + )) + } + + /** Listen for new taler outgoing transactions */ + suspend fun <R> listenOutgoing(lambda: suspend (Flow<Long>) -> R): R + = lambda(outgoingTxFlows as Flow<Long>) + /** Listen for new taler incoming transactions */ + suspend fun <R> listenIncoming(lambda: suspend (Flow<Long>) -> R): R + = lambda(incomingTxFlows as Flow<Long>) }
\ No newline at end of file diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/db/ExchangeDAO.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/db/ExchangeDAO.kt index d3844167..6d65e444 100644 --- a/nexus/src/main/kotlin/tech/libeufin/nexus/db/ExchangeDAO.kt +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/db/ExchangeDAO.kt @@ -19,16 +19,38 @@ package tech.libeufin.nexus.db -import tech.libeufin.common.db.one -import tech.libeufin.common.db.getTalerTimestamp -import tech.libeufin.common.micros -import tech.libeufin.common.TalerProtocolTimestamp -import tech.libeufin.common.TransferRequest +import tech.libeufin.common.db.* +import tech.libeufin.common.* import java.sql.ResultSet import java.time.Instant /** Data access logic for exchange specific logic */ class ExchangeDAO(private val db: Database) { + /** Query history of taler incoming transactions */ + suspend fun incomingHistory( + params: HistoryParams + ): List<IncomingReserveTransaction> + = db.poolHistoryGlobal(params, db::listenIncoming, """ + SELECT + tit.incoming_transaction_id + ,execution_time + ,(amount).val AS amount_val + ,(amount).frac AS amount_frac + ,debit_payto_uri + ,reserve_public_key + FROM talerable_incoming_transactions AS tit + JOIN incoming_transactions AS it + ON tit.incoming_transaction_id=it.incoming_transaction_id + WHERE + """, "tit.incoming_transaction_id") { + IncomingReserveTransaction( + row_id = it.getLong("incoming_transaction_id"), + date = it.getTalerTimestamp("execution_time"), + amount = it.getAmount("amount", db.bankCurrency), + debit_account = it.getString("debit_payto_uri"), + reserve_pub = EddsaPublicKey(it.getBytes("reserve_public_key")), + ) + } /** Result of taler transfer transaction creation */ sealed interface TransferResult { |