aboutsummaryrefslogtreecommitdiff
path: root/nexus/src/main
diff options
context:
space:
mode:
authorAntoine A <>2024-04-26 15:17:14 +0900
committerAntoine A <>2024-04-26 15:17:14 +0900
commit71050ab44fbccb970ec530383cdeef42ca0cf928 (patch)
tree43236da27bd1f2c21c62f48c4105cf4db7e59345 /nexus/src/main
parentc73f750472139b7ec872c1bf16a284de143ef998 (diff)
downloadlibeufin-71050ab44fbccb970ec530383cdeef42ca0cf928.tar.gz
libeufin-71050ab44fbccb970ec530383cdeef42ca0cf928.tar.bz2
libeufin-71050ab44fbccb970ec530383cdeef42ca0cf928.zip
nexus: wire gateway /history/incoming
Diffstat (limited to 'nexus/src/main')
-rw-r--r--nexus/src/main/kotlin/tech/libeufin/nexus/Config.kt2
-rw-r--r--nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt2
-rw-r--r--nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSubmit.kt4
-rw-r--r--nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt4
-rw-r--r--nexus/src/main/kotlin/tech/libeufin/nexus/api/WireGatewayApi.kt12
-rw-r--r--nexus/src/main/kotlin/tech/libeufin/nexus/db/Database.kt32
-rw-r--r--nexus/src/main/kotlin/tech/libeufin/nexus/db/ExchangeDAO.kt32
7 files changed, 67 insertions, 21 deletions
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Config.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Config.kt
index 59094204..eb1bac91 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/Config.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/Config.kt
@@ -52,6 +52,8 @@ class NexusConfig(val config: TalerConfig) {
bic = requireString("bic"),
name = requireString("name")
)
+ /** Bank account payto */
+ val payto = IbanPayto.build(account.iban, account.bic, account.name)
/** Path where we store the bank public keys */
val bankPublicKeysPath = requirePath("bank_public_keys_file")
/** Path where we store our private keys */
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt
index 96710648..e5f18595 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt
@@ -366,7 +366,7 @@ class EbicsFetch: CliktCommand("Fetches EBICS files") {
val cfg = extractEbicsConfig(common.config)
val dbCfg = cfg.config.dbConfig()
- Database(dbCfg).use { db ->
+ Database(dbCfg, cfg.currency).use { db ->
val (clientKeys, bankKeys) = expectFullKeys(cfg)
val ctx = FetchContext(
cfg,
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSubmit.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSubmit.kt
index 8bde6d60..947ecab6 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSubmit.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSubmit.kt
@@ -65,7 +65,7 @@ data class SubmissionContext(
private suspend fun submitInitiatedPayment(
ctx: SubmissionContext,
payment: InitiatedPayment
-): String {
+): String {
val creditAccount = try {
val payto = Payto.parse(payment.creditPaytoUri).expectIban()
IbanAccountMetadata(
@@ -157,7 +157,7 @@ class EbicsSubmit : CliktCommand("Submits any initiated payment found in the dat
httpClient = HttpClient(),
fileLogger = FileLogger(ebicsLog)
)
- Database(dbCfg).use { db ->
+ Database(dbCfg, cfg.currency).use { db ->
val frequency: Duration = if (transient) {
logger.info("Transient mode: submitting what found and returning.")
Duration.ZERO
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt
index b3153a8e..387bf68b 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt
@@ -130,7 +130,7 @@ class InitiatePayment: CliktCommand("Initiate an outgoing payment") {
Base32Crockford.encode(bytes)
}
- Database(dbCfg).use { db ->
+ Database(dbCfg, currency).use { db ->
db.initiated.create(
InitiatedPayment(
id = -1,
@@ -273,7 +273,7 @@ class FakeIncoming: CliktCommand("Genere a fake incoming payment") {
Base32Crockford.encode(bytes)
}
- Database(dbCfg).use { db ->
+ Database(dbCfg, currency).use { db ->
ingestIncomingPayment(db,
IncomingPayment(
amount = amount,
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/api/WireGatewayApi.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/api/WireGatewayApi.kt
index 54dae351..7879522c 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/api/WireGatewayApi.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/api/WireGatewayApi.kt
@@ -66,26 +66,22 @@ fun Routing.wireGatewayApi(db: Database, cfg: NexusConfig) {
)
}
}
- /*
suspend fun <T> PipelineContext<Unit, ApplicationCall>.historyEndpoint(
reduce: (List<T>, String) -> Any,
- dbLambda: suspend ExchangeDAO.(HistoryParams, Long, BankPaytoCtx) -> List<T>
+ dbLambda: suspend ExchangeDAO.(HistoryParams) -> List<T>
) {
val params = HistoryParams.extract(context.request.queryParameters)
- val bankAccount = call.bankInfo(db, ctx.payto)
-
- val items = db.exchange.dbLambda(params, bankAccount.bankAccountId, ctx.payto)
- val
+ val items = db.exchange.dbLambda(params)
if (items.isEmpty()) {
call.respond(HttpStatusCode.NoContent)
} else {
- call.respond(reduce(items, bankAccount.payto))
+ call.respond(reduce(items, cfg.payto))
}
}
get("/taler-wire-gateway/history/incoming") {
historyEndpoint(::IncomingHistory, ExchangeDAO::incomingHistory)
}
- get("/taler-wire-gateway/history/outgoing") {
+ /*get("/taler-wire-gateway/history/outgoing") {
historyEndpoint(::OutgoingHistory, ExchangeDAO::outgoingHistory)
}*/
post("/taler-wire-gateway/admin/add-incoming") {
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/db/Database.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/db/Database.kt
index 4cc70452..01c512ef 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/db/Database.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/db/Database.kt
@@ -18,9 +18,12 @@
*/
package tech.libeufin.nexus.db
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
import tech.libeufin.common.TalerAmount
-import tech.libeufin.common.db.DatabaseConfig
-import tech.libeufin.common.db.DbPool
+import tech.libeufin.common.db.*
import java.time.Instant
/**
@@ -39,8 +42,31 @@ data class InitiatedPayment(
/**
* Collects database connection steps and any operation on the Nexus tables.
*/
-class Database(dbConfig: DatabaseConfig): DbPool(dbConfig, "libeufin_nexus") {
+class Database(dbConfig: DatabaseConfig, val bankCurrency: String): DbPool(dbConfig, "libeufin_nexus") {
val payment = PaymentDAO(this)
val initiated = InitiatedDAO(this)
val exchange = ExchangeDAO(this)
+
+ private val outgoingTxFlows: MutableSharedFlow<Long> = MutableSharedFlow()
+ private val incomingTxFlows: MutableSharedFlow<Long> = MutableSharedFlow()
+
+ init {
+ watchNotifications(pgSource, "libeufin_nexus", LoggerFactory.getLogger("libeufin-nexus-db-watcher"), mapOf(
+ "outgoing_tx" to {
+ val id = it.toLong()
+ outgoingTxFlows.emit(id)
+ },
+ "incoming_tx" to {
+ val id = it.toLong()
+ incomingTxFlows.emit(id)
+ }
+ ))
+ }
+
+ /** Listen for new taler outgoing transactions */
+ suspend fun <R> listenOutgoing(lambda: suspend (Flow<Long>) -> R): R
+ = lambda(outgoingTxFlows as Flow<Long>)
+ /** Listen for new taler incoming transactions */
+ suspend fun <R> listenIncoming(lambda: suspend (Flow<Long>) -> R): R
+ = lambda(incomingTxFlows as Flow<Long>)
} \ No newline at end of file
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/db/ExchangeDAO.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/db/ExchangeDAO.kt
index d3844167..6d65e444 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/db/ExchangeDAO.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/db/ExchangeDAO.kt
@@ -19,16 +19,38 @@
package tech.libeufin.nexus.db
-import tech.libeufin.common.db.one
-import tech.libeufin.common.db.getTalerTimestamp
-import tech.libeufin.common.micros
-import tech.libeufin.common.TalerProtocolTimestamp
-import tech.libeufin.common.TransferRequest
+import tech.libeufin.common.db.*
+import tech.libeufin.common.*
import java.sql.ResultSet
import java.time.Instant
/** Data access logic for exchange specific logic */
class ExchangeDAO(private val db: Database) {
+ /** Query history of taler incoming transactions */
+ suspend fun incomingHistory(
+ params: HistoryParams
+ ): List<IncomingReserveTransaction>
+ = db.poolHistoryGlobal(params, db::listenIncoming, """
+ SELECT
+ tit.incoming_transaction_id
+ ,execution_time
+ ,(amount).val AS amount_val
+ ,(amount).frac AS amount_frac
+ ,debit_payto_uri
+ ,reserve_public_key
+ FROM talerable_incoming_transactions AS tit
+ JOIN incoming_transactions AS it
+ ON tit.incoming_transaction_id=it.incoming_transaction_id
+ WHERE
+ """, "tit.incoming_transaction_id") {
+ IncomingReserveTransaction(
+ row_id = it.getLong("incoming_transaction_id"),
+ date = it.getTalerTimestamp("execution_time"),
+ amount = it.getAmount("amount", db.bankCurrency),
+ debit_account = it.getString("debit_payto_uri"),
+ reserve_pub = EddsaPublicKey(it.getBytes("reserve_public_key")),
+ )
+ }
/** Result of taler transfer transaction creation */
sealed interface TransferResult {