summaryrefslogtreecommitdiff
path: root/nexus/src/main
diff options
context:
space:
mode:
authorAntoine A <>2024-05-07 13:19:57 +0900
committerAntoine A <>2024-05-07 13:20:34 +0900
commit7243876ab11582fed2ae21eedb2a2807a2c19318 (patch)
tree4ca9694a2c0ffd295e92d8c973b49d349f75bb5e /nexus/src/main
parent5cc21948754598ab23519da826e47e6202696970 (diff)
parentef2124c9949991cd4a0fbb78356184d06564b5e0 (diff)
downloadlibeufin-7243876ab11582fed2ae21eedb2a2807a2c19318.tar.gz
libeufin-7243876ab11582fed2ae21eedb2a2807a2c19318.tar.bz2
libeufin-7243876ab11582fed2ae21eedb2a2807a2c19318.zip
Merge remote-tracking branch 'origin/v11-dev'
Diffstat (limited to 'nexus/src/main')
-rw-r--r--nexus/src/main/kotlin/tech/libeufin/nexus/Config.kt49
-rw-r--r--nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt46
-rw-r--r--nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSetup.kt6
-rw-r--r--nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSubmit.kt6
-rw-r--r--nexus/src/main/kotlin/tech/libeufin/nexus/Iso20022.kt8
-rw-r--r--nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt62
-rw-r--r--nexus/src/main/kotlin/tech/libeufin/nexus/api/RevenueApi.kt45
-rw-r--r--nexus/src/main/kotlin/tech/libeufin/nexus/api/WireGatewayApi.kt62
-rw-r--r--nexus/src/main/kotlin/tech/libeufin/nexus/api/helpers.kt65
-rw-r--r--nexus/src/main/kotlin/tech/libeufin/nexus/db/Database.kt41
-rw-r--r--nexus/src/main/kotlin/tech/libeufin/nexus/db/ExchangeDAO.kt128
-rw-r--r--nexus/src/main/kotlin/tech/libeufin/nexus/db/InitiatedDAO.kt15
-rw-r--r--nexus/src/main/kotlin/tech/libeufin/nexus/db/PaymentDAO.kt65
13 files changed, 506 insertions, 92 deletions
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Config.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Config.kt
index 59094204..823ed449 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/Config.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/Config.kt
@@ -31,9 +31,13 @@ class NexusFetchConfig(config: TalerConfig) {
val ignoreBefore = config.lookupDate("nexus-fetch", "ignore_transactions_before")
}
+class ApiConfig(config: TalerConfig, section: String) {
+ val authMethod = config.requireAuthMethod(section)
+}
+
/** Configuration for libeufin-nexus */
class NexusConfig(val config: TalerConfig) {
- private fun requireString(option: String): String = config.requireString("nexus-ebics", option)
+ private fun requireString(option: String, type: String? = null): String = config.requireString("nexus-ebics", option, type)
private fun requirePath(option: String): Path = config.requirePath("nexus-ebics", option)
/** The bank's currency */
@@ -52,17 +56,26 @@ class NexusConfig(val config: TalerConfig) {
bic = requireString("bic"),
name = requireString("name")
)
+ /** Bank account payto */
+ val payto = IbanPayto.build(account.iban, account.bic, account.name)
/** Path where we store the bank public keys */
val bankPublicKeysPath = requirePath("bank_public_keys_file")
/** Path where we store our private keys */
val clientPrivateKeysPath = requirePath("client_private_keys_file")
val fetch = NexusFetchConfig(config)
- val dialect = when (val type = requireString("bank_dialect")) {
+ val dialect = when (val type = requireString("bank_dialect", "dialect")) {
"postfinance" -> Dialect.postfinance
"gls" -> Dialect.gls
- else -> throw TalerConfigError.invalid("dialct", "libeufin-nexus", "bank_dialect", "expected 'postfinance' or 'gls' got '$type'")
+ else -> throw TalerConfigError.invalid("bank dialect", "libeufin-nexus", "bank_dialect", "expected 'postfinance' or 'gls' got '$type'")
}
+ val accountType = when (val type = requireString("account_type", "account type")) {
+ "normal" -> AccountType.normal
+ "exchange" -> AccountType.exchange
+ else -> throw TalerConfigError.invalid("account type", "libeufin-nexus", "account_type", "expected 'normal' or 'exchange' got '$type'")
+ }
+ val wireGatewayApiCfg = config.apiConf("nexus-httpd-wire-gateway-api")
+ val revenueApiCfg = config.apiConf("nexus-httpd-revenue-api")
}
fun NexusConfig.checkCurrency(amount: TalerAmount) {
@@ -70,4 +83,34 @@ fun NexusConfig.checkCurrency(amount: TalerAmount) {
"Wrong currency: expected regional $currency got ${amount.currency}",
TalerErrorCode.GENERIC_CURRENCY_MISMATCH
)
+}
+
+fun TalerConfig.requireAuthMethod(section: String): AuthMethod {
+ return when (val method = requireString(section, "auth_method", "auth method")) {
+ "none" -> AuthMethod.None
+ "bearer-token" -> {
+ val token = requireString(section, "auth_bearer_token")
+ AuthMethod.Bearer(token)
+ }
+ else -> throw TalerConfigError.invalid("auth method target type", section, "auth_method", "expected 'bearer-token' or 'none' got '$method'")
+ }
+}
+
+fun TalerConfig.apiConf(section: String): ApiConfig? {
+ val enabled = requireBoolean(section, "enabled")
+ return if (enabled) {
+ return ApiConfig(this, section)
+ } else {
+ null
+ }
+}
+
+sealed interface AuthMethod {
+ data object None: AuthMethod
+ data class Bearer(val token: String): AuthMethod
+}
+
+enum class AccountType {
+ normal,
+ exchange
} \ No newline at end of file
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt
index 9394e4e7..f1e85513 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt
@@ -95,7 +95,10 @@ suspend fun ingestOutgoingPayment(
db: Database,
payment: OutgoingPayment
) {
- val result = db.payment.registerOutgoing(payment)
+ val metadata: Pair<ShortHashCode, ExchangeUrl>? = payment.wireTransferSubject?.let {
+ runCatching { parseOutgoingTxMetadata(it) }.getOrNull()
+ }
+ val result = db.payment.registerOutgoing(payment, metadata?.first, metadata?.second)
if (result.new) {
if (result.initiated)
logger.info("$payment")
@@ -106,8 +109,6 @@ suspend fun ingestOutgoingPayment(
}
}
-private val PATTERN = Regex("[a-z0-9A-Z]{52}")
-
/**
* Ingests an incoming payment. Stores the payment into valid talerable ones
* or bounces it, according to the subject.
@@ -117,18 +118,31 @@ private val PATTERN = Regex("[a-z0-9A-Z]{52}")
*/
suspend fun ingestIncomingPayment(
db: Database,
- payment: IncomingPayment
+ payment: IncomingPayment,
+ accountType: AccountType
) {
suspend fun bounce(msg: String) {
- val result = db.payment.registerMalformedIncoming(
- payment,
- payment.amount,
- Instant.now()
- )
- if (result.new) {
- logger.info("$payment bounced in '${result.bounceId}': $msg")
- } else {
- logger.debug("$payment already seen and bounced in '${result.bounceId}': $msg")
+ when (accountType) {
+ AccountType.exchange -> {
+ val result = db.payment.registerMalformedIncoming(
+ payment,
+ payment.amount,
+ Instant.now()
+ )
+ if (result.new) {
+ logger.info("$payment bounced in '${result.bounceId}': $msg")
+ } else {
+ logger.debug("$payment already seen and bounced in '${result.bounceId}': $msg")
+ }
+ }
+ AccountType.normal -> {
+ val res = db.payment.registerIncoming(payment)
+ if (res.new) {
+ logger.info("$payment")
+ } else {
+ logger.debug("$payment already seen")
+ }
+ }
}
}
runCatching { parseIncomingTxMetadata(payment.wireTransferSubject) }.fold(
@@ -163,7 +177,7 @@ private suspend fun ingestDocument(
logger.debug("IGNORE $it")
} else {
when (it) {
- is IncomingPayment -> ingestIncomingPayment(db, it)
+ is IncomingPayment -> ingestIncomingPayment(db, it, cfg.accountType)
is OutgoingPayment -> ingestOutgoingPayment(db, it)
is TxNotification.Reversal -> {
logger.error("BOUNCE '${it.msgId}': ${it.reason}")
@@ -364,10 +378,10 @@ class EbicsFetch: CliktCommand("Fetches EBICS files") {
* mode when no flags are passed to the invocation.
*/
override fun run() = cliCmd(logger, common.log) {
- val cfg = extractEbicsConfig(common.config)
+ val cfg = loadNexusConfig(common.config)
val dbCfg = cfg.config.dbConfig()
- Database(dbCfg).use { db ->
+ Database(dbCfg, cfg.currency).use { db ->
val (clientKeys, bankKeys) = expectFullKeys(cfg)
val ctx = FetchContext(
cfg,
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSetup.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSetup.kt
index 1c9ea902..7da7da07 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSetup.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSetup.kt
@@ -155,7 +155,7 @@ suspend fun doKeysRequestAndUpdateState(
* @param configFile location of the configuration entry point.
* @return internal representation of the configuration.
*/
-fun extractEbicsConfig(configFile: Path?): NexusConfig {
+fun loadNexusConfig(configFile: Path?): NexusConfig {
val config = loadConfig(configFile)
return NexusConfig(config)
}
@@ -197,8 +197,8 @@ class EbicsSetup: CliktCommand("Set up the EBICS subscriber") {
* This function collects the main steps of setting up an EBICS access.
*/
override fun run() = cliCmd(logger, common.log) {
- val cfg = extractEbicsConfig(common.config)
- val client = HttpClient {
+ val cfg = loadNexusConfig(common.config)
+ val client = HttpClient {
install(HttpTimeout) {
// It can take a lot of time for the bank to generate documents
socketTimeoutMillis = 5 * 60 * 1000
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSubmit.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSubmit.kt
index 8bde6d60..c6a6ceef 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSubmit.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSubmit.kt
@@ -65,7 +65,7 @@ data class SubmissionContext(
private suspend fun submitInitiatedPayment(
ctx: SubmissionContext,
payment: InitiatedPayment
-): String {
+): String {
val creditAccount = try {
val payto = Payto.parse(payment.creditPaytoUri).expectIban()
IbanAccountMetadata(
@@ -147,7 +147,7 @@ class EbicsSubmit : CliktCommand("Submits any initiated payment found in the dat
* FIXME: reduce code duplication with the fetch subcommand.
*/
override fun run() = cliCmd(logger, common.log) {
- val cfg = extractEbicsConfig(common.config)
+ val cfg = loadNexusConfig(common.config)
val dbCfg = cfg.config.dbConfig()
val (clientKeys, bankKeys) = expectFullKeys(cfg)
val ctx = SubmissionContext(
@@ -157,7 +157,7 @@ class EbicsSubmit : CliktCommand("Submits any initiated payment found in the dat
httpClient = HttpClient(),
fileLogger = FileLogger(ebicsLog)
)
- Database(dbCfg).use { db ->
+ Database(dbCfg, cfg.currency).use { db ->
val frequency: Duration = if (transient) {
logger.info("Transient mode: submitting what found and returning.")
Duration.ZERO
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Iso20022.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Iso20022.kt
index e8cb5680..fce0b224 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/Iso20022.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/Iso20022.kt
@@ -300,13 +300,9 @@ data class OutgoingPayment(
private fun XmlDestructor.payto(prefix: String): String? {
val iban = opt("${prefix}Acct")?.one("Id")?.one("IBAN")?.text()
return if (iban != null) {
- val payto = StringBuilder("payto://iban/$iban")
val name = opt(prefix) { opt("Nm")?.text() ?: opt("Pty")?.one("Nm")?.text() }
- if (name != null) {
- val urlEncName = URLEncoder.encode(name, "utf-8")
- payto.append("?receiver-name=$urlEncName")
- }
- return payto.toString()
+ // Parse bic ?
+ IbanPayto.build(iban, null, name)
} else {
null
}
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt
index 0907ad7c..1ae3bdb3 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt
@@ -31,6 +31,7 @@ import com.github.ajalt.clikt.parameters.arguments.*
import com.github.ajalt.clikt.parameters.groups.*
import com.github.ajalt.clikt.parameters.options.*
import com.github.ajalt.clikt.parameters.types.*
+import com.github.ajalt.clikt.core.ProgramResult
import io.ktor.client.*
import io.ktor.client.plugins.*
import kotlinx.serialization.json.Json
@@ -75,6 +76,7 @@ fun Instant.fmtDateTime(): String =
fun Application.nexusApi(db: Database, cfg: NexusConfig) = talerApi(logger) {
wireGatewayApi(db, cfg)
+ revenueApi(db, cfg)
}
/**
@@ -132,7 +134,7 @@ class InitiatePayment: CliktCommand("Initiate an outgoing payment") {
Base32Crockford.encode(bytes)
}
- Database(dbCfg).use { db ->
+ Database(dbCfg, currency).use { db ->
db.initiated.create(
InitiatedPayment(
id = -1,
@@ -147,6 +149,44 @@ class InitiatePayment: CliktCommand("Initiate an outgoing payment") {
}
}
+class Serve : CliktCommand("Run libeufin-nexus HTTP server", name = "serve") {
+ private val common by CommonOption()
+ private val check by option().flag()
+
+ override fun run() = cliCmd(logger, common.log) {
+ val cfg = loadNexusConfig(common.config)
+
+ if (check) {
+ // Check if the server is to be started
+ val apis = listOf(
+ cfg.wireGatewayApiCfg to "Wire Gateway API",
+ cfg.revenueApiCfg to "Revenue API"
+ )
+ var startServer = false
+ for ((api, name) in apis) {
+ if (api != null) {
+ startServer = true
+ logger.info("$name is enabled: starting the server")
+ }
+ }
+ if (!startServer) {
+ logger.info("All APIs are disabled: not starting the server")
+ throw ProgramResult(1)
+ } else {
+ throw ProgramResult(0)
+ }
+ }
+
+ val dbCfg = cfg.config.dbConfig()
+ val serverCfg = cfg.config.loadServerConfig("nexus-httpd")
+ Database(dbCfg, cfg.currency).use { db ->
+ serve(serverCfg) {
+ nexusApi(db, cfg)
+ }
+ }
+ }
+}
+
class FakeIncoming: CliktCommand("Genere a fake incoming payment") {
private val common by CommonOption()
private val amount by option(
@@ -162,15 +202,14 @@ class FakeIncoming: CliktCommand("Genere a fake incoming payment") {
).convert { Payto.parse(it).expectIban() }
override fun run() = cliCmd(logger, common.log) {
- val cfg = loadConfig(common.config)
- val dbCfg = cfg.dbConfig()
- val currency = cfg.requireString("nexus-ebics", "currency")
+ val cfg = loadNexusConfig(common.config)
+ val dbCfg = cfg.config.dbConfig()
val subject = payto.message ?: subject ?: throw Exception("Missing subject")
val amount = payto.amount ?: amount ?: throw Exception("Missing amount")
- if (amount.currency != currency)
- throw Exception("Wrong currency: expected $currency got ${amount.currency}")
+ if (amount.currency != cfg.currency)
+ throw Exception("Wrong currency: expected ${cfg.currency} got ${amount.currency}")
val bankId = run {
val bytes = ByteArray(16)
@@ -178,7 +217,7 @@ class FakeIncoming: CliktCommand("Genere a fake incoming payment") {
Base32Crockford.encode(bytes)
}
- Database(dbCfg).use { db ->
+ Database(dbCfg, amount.currency).use { db ->
ingestIncomingPayment(db,
IncomingPayment(
amount = amount,
@@ -186,7 +225,8 @@ class FakeIncoming: CliktCommand("Genere a fake incoming payment") {
wireTransferSubject = subject,
executionTime = Instant.now(),
bankId = bankId
- )
+ ),
+ cfg.accountType
)
}
}
@@ -227,7 +267,7 @@ class EbicsDownload: CliktCommand("Perform EBICS requests", name = "ebics-btd")
class DryRun: Exception()
override fun run() = cliCmd(logger, common.log) {
- val cfg = extractEbicsConfig(common.config)
+ val cfg = loadNexusConfig(common.config)
val (clientKeys, bankKeys) = expectFullKeys(cfg)
val pinnedStartVal = pinnedStart
val pinnedStartArg = if (pinnedStartVal != null) {
@@ -282,7 +322,7 @@ class ListCmd: CliktCommand("List nexus transactions", name = "list") {
val dbCfg = cfg.dbConfig()
val currency = cfg.requireString("nexus-ebics", "currency")
- Database(dbCfg).use { db ->
+ Database(dbCfg, currency).use { db ->
fun fmtPayto(payto: String?): String {
if (payto == null) return ""
try {
@@ -405,7 +445,7 @@ class TestingCmd : CliktCommand("Testing helper commands", name = "testing") {
class LibeufinNexusCommand : CliktCommand() {
init {
versionOption(getVersion())
- subcommands(EbicsSetup(), DbInit(), EbicsSubmit(), EbicsFetch(), InitiatePayment(), CliConfigCmd(NEXUS_CONFIG_SOURCE), TestingCmd())
+ subcommands(EbicsSetup(), DbInit(), Serve(), EbicsSubmit(), EbicsFetch(), InitiatePayment(), CliConfigCmd(NEXUS_CONFIG_SOURCE), TestingCmd())
}
override fun run() = Unit
}
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/api/RevenueApi.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/api/RevenueApi.kt
new file mode 100644
index 00000000..e1435a44
--- /dev/null
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/api/RevenueApi.kt
@@ -0,0 +1,45 @@
+/*
+ * This file is part of LibEuFin.
+ * Copyright (C) 2024 Taler Systems S.A.
+
+ * LibEuFin is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation; either version 3, or
+ * (at your option) any later version.
+
+ * LibEuFin is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General
+ * Public License for more details.
+
+ * You should have received a copy of the GNU Affero General Public
+ * License along with LibEuFin; see the file COPYING. If not, see
+ * <http://www.gnu.org/licenses/>
+ */
+package tech.libeufin.nexus.api
+
+import io.ktor.http.*
+import io.ktor.server.application.*
+import io.ktor.server.response.*
+import io.ktor.server.routing.*
+import tech.libeufin.nexus.*
+import tech.libeufin.nexus.db.*
+import tech.libeufin.common.*
+
+fun Routing.revenueApi(db: Database, cfg: NexusConfig) = authApi(cfg.revenueApiCfg) {
+ get("/taler-revenue/config") {
+ call.respond(RevenueConfig(
+ currency = cfg.currency
+ ))
+ }
+ get("/taler-revenue/history") {
+ val params = HistoryParams.extract(context.request.queryParameters)
+ val items = db.payment.revenueHistory(params)
+
+ if (items.isEmpty()) {
+ call.respond(HttpStatusCode.NoContent)
+ } else {
+ call.respond(RevenueIncomingHistory(items, cfg.payto))
+ }
+ }
+} \ No newline at end of file
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/api/WireGatewayApi.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/api/WireGatewayApi.kt
index f7374204..d645b953 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/api/WireGatewayApi.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/api/WireGatewayApi.kt
@@ -29,10 +29,12 @@ import tech.libeufin.common.*
import tech.libeufin.nexus.*
import tech.libeufin.nexus.db.*
import tech.libeufin.nexus.db.PaymentDAO.*
+import tech.libeufin.nexus.db.InitiatedDAO.*
+import tech.libeufin.nexus.db.ExchangeDAO.*
import java.time.Instant
-fun Routing.wireGatewayApi(db: Database, cfg: NexusConfig) {
+fun Routing.wireGatewayApi(db: Database, cfg: NexusConfig) = authApi(cfg.wireGatewayApiCfg) {
get("/taler-wire-gateway/config") {
call.respond(WireGatewayConfig(
currency = cfg.currency
@@ -41,69 +43,52 @@ fun Routing.wireGatewayApi(db: Database, cfg: NexusConfig) {
post("/taler-wire-gateway/transfer") {
val req = call.receive<TransferRequest>()
cfg.checkCurrency(req.amount)
- // TODO
- /*val res = db.exchange.transfer(
- req = req,
- login = username,
- now = Instant.now()
+ req.credit_account.expectRequestIban()
+ val bankId = run {
+ val bytes = ByteArray(16)
+ kotlin.random.Random.nextBytes(bytes)
+ Base32Crockford.encode(bytes)
+ }
+ val res = db.exchange.transfer(
+ req,
+ bankId,
+ Instant.now()
)
when (res) {
- is TransferResult.UnknownExchange -> throw unknownAccount(username)
- is TransferResult.NotAnExchange -> throw conflict(
- "$username is not an exchange account.",
- TalerErrorCode.BANK_ACCOUNT_IS_NOT_EXCHANGE
- )
- is TransferResult.UnknownCreditor -> throw unknownCreditorAccount(req.credit_account.canonical)
- is TransferResult.BothPartyAreExchange -> throw conflict(
- "Wire transfer attempted with credit and debit party being both exchange account",
- TalerErrorCode.BANK_ACCOUNT_IS_EXCHANGE
- )
- is TransferResult.ReserveUidReuse -> throw conflict(
+ TransferResult.RequestUidReuse -> throw conflict(
"request_uid used already",
TalerErrorCode.BANK_TRANSFER_REQUEST_UID_REUSED
)
- is TransferResult.BalanceInsufficient -> throw conflict(
- "Insufficient balance for exchange",
- TalerErrorCode.BANK_UNALLOWED_DEBIT
- )
is TransferResult.Success -> call.respond(
TransferResponse(
timestamp = res.timestamp,
row_id = res.id
)
)
- }*/
+ }
}
- /*suspend fun <T> PipelineContext<Unit, ApplicationCall>.historyEndpoint(
+ suspend fun <T> PipelineContext<Unit, ApplicationCall>.historyEndpoint(
reduce: (List<T>, String) -> Any,
- dbLambda: suspend ExchangeDAO.(HistoryParams, Long, BankPaytoCtx) -> List<T>
+ dbLambda: suspend ExchangeDAO.(HistoryParams) -> List<T>
) {
val params = HistoryParams.extract(context.request.queryParameters)
- val bankAccount = call.bankInfo(db, ctx.payto)
-
- if (!bankAccount.isTalerExchange)
- throw conflict(
- "$username is not an exchange account.",
- TalerErrorCode.BANK_ACCOUNT_IS_NOT_EXCHANGE
- )
-
- val items = db.exchange.dbLambda(params, bankAccount.bankAccountId, ctx.payto)
-
+ val items = db.exchange.dbLambda(params)
if (items.isEmpty()) {
call.respond(HttpStatusCode.NoContent)
} else {
- call.respond(reduce(items, bankAccount.payto))
+ call.respond(reduce(items, cfg.payto))
}
- }*/
- /*get("/taler-wire-gateway/history/incoming") {
+ }
+ get("/taler-wire-gateway/history/incoming") {
historyEndpoint(::IncomingHistory, ExchangeDAO::incomingHistory)
}
get("/taler-wire-gateway/history/outgoing") {
historyEndpoint(::OutgoingHistory, ExchangeDAO::outgoingHistory)
- }*/
+ }
post("/taler-wire-gateway/admin/add-incoming") {
val req = call.receive<AddIncomingRequest>()
cfg.checkCurrency(req.amount)
+ req.debit_account.expectRequestIban()
val timestamp = Instant.now()
val bankId = run {
val bytes = ByteArray(16)
@@ -122,7 +107,6 @@ fun Routing.wireGatewayApi(db: Database, cfg: NexusConfig) {
"reserve_pub used already",
TalerErrorCode.BANK_DUPLICATE_RESERVE_PUB_SUBJECT
)
- // TODO timestamp when idempotent
is IncomingRegistrationResult.Success -> call.respond(
AddIncomingResponse(
timestamp = TalerProtocolTimestamp(timestamp),
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/api/helpers.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/api/helpers.kt
new file mode 100644
index 00000000..df5acb83
--- /dev/null
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/api/helpers.kt
@@ -0,0 +1,65 @@
+/*
+ * This file is part of LibEuFin.
+ * Copyright (C) 2024 Taler Systems S.A.
+
+ * LibEuFin is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation; either version 3, or
+ * (at your option) any later version.
+
+ * LibEuFin is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General
+ * Public License for more details.
+
+ * You should have received a copy of the GNU Affero General Public
+ * License along with LibEuFin; see the file COPYING. If not, see
+ * <http://www.gnu.org/licenses/>
+ */
+
+package tech.libeufin.nexus.api
+
+import tech.libeufin.nexus.*
+import tech.libeufin.common.*
+import tech.libeufin.common.api.*
+import io.ktor.http.*
+import io.ktor.server.application.*
+import io.ktor.server.response.*
+import io.ktor.server.routing.*
+import io.ktor.util.*
+import io.ktor.util.pipeline.*
+
+/** Apply api configuration for a route: conditional access and authentication */
+fun Route.authApi(cfg: ApiConfig?, callback: Route.() -> Unit): Route =
+ intercept(callback) {
+ if (cfg == null) {
+ throw apiError(HttpStatusCode.NotImplemented, "API not implemented", TalerErrorCode.END)
+ }
+ val header = context.request.headers["Authorization"]
+ // Basic auth challenge
+ when (cfg.authMethod) {
+ AuthMethod.None -> {}
+ is AuthMethod.Bearer -> {
+ if (header == null) {
+ context.response.header(HttpHeaders.WWWAuthenticate, "Bearer")
+ throw unauthorized(
+ "Authorization header not found",
+ TalerErrorCode.GENERIC_PARAMETER_MISSING
+ )
+ }
+ val (scheme, content) = header.splitOnce(" ") ?: throw badRequest(
+ "Authorization is invalid",
+ TalerErrorCode.GENERIC_HTTP_HEADERS_MALFORMED
+ )
+ when (scheme) {
+ "Bearer" -> {
+ // TODO choose between one of those
+ if (content != cfg.authMethod.token) {
+ throw unauthorized("Unknown token")
+ }
+ }
+ else -> throw unauthorized("Authorization method wrong or not supported")
+ }
+ }
+ }
+ } \ No newline at end of file
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/db/Database.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/db/Database.kt
index b6422612..25cfaa59 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/db/Database.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/db/Database.kt
@@ -18,9 +18,12 @@
*/
package tech.libeufin.nexus.db
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
import tech.libeufin.common.TalerAmount
-import tech.libeufin.common.db.DatabaseConfig
-import tech.libeufin.common.db.DbPool
+import tech.libeufin.common.db.*
import java.time.Instant
/**
@@ -39,7 +42,39 @@ data class InitiatedPayment(
/**
* Collects database connection steps and any operation on the Nexus tables.
*/
-class Database(dbConfig: DatabaseConfig): DbPool(dbConfig, "libeufin_nexus") {
+class Database(dbConfig: DatabaseConfig, val bankCurrency: String): DbPool(dbConfig, "libeufin_nexus") {
val payment = PaymentDAO(this)
val initiated = InitiatedDAO(this)
+ val exchange = ExchangeDAO(this)
+
+ private val outgoingTxFlows: MutableSharedFlow<Long> = MutableSharedFlow()
+ private val incomingTxFlows: MutableSharedFlow<Long> = MutableSharedFlow()
+ private val revenueTxFlows: MutableSharedFlow<Long> = MutableSharedFlow()
+
+ init {
+ watchNotifications(pgSource, "libeufin_nexus", LoggerFactory.getLogger("libeufin-nexus-db-watcher"), mapOf(
+ "revenue_tx" to {
+ val id = it.toLong()
+ revenueTxFlows.emit(id)
+ },
+ "outgoing_tx" to {
+ val id = it.toLong()
+ outgoingTxFlows.emit(id)
+ },
+ "incoming_tx" to {
+ val id = it.toLong()
+ incomingTxFlows.emit(id)
+ }
+ ))
+ }
+
+ /** Listen for new taler outgoing transactions */
+ suspend fun <R> listenOutgoing(lambda: suspend (Flow<Long>) -> R): R
+ = lambda(outgoingTxFlows)
+ /** Listen for new taler incoming transactions */
+ suspend fun <R> listenIncoming(lambda: suspend (Flow<Long>) -> R): R
+ = lambda(incomingTxFlows)
+ /** Listen for new incoming transactions */
+ suspend fun <R> listenRevenue(lambda: suspend (Flow<Long>) -> R): R
+ = lambda(revenueTxFlows)
} \ No newline at end of file
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/db/ExchangeDAO.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/db/ExchangeDAO.kt
new file mode 100644
index 00000000..6f3a3a3a
--- /dev/null
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/db/ExchangeDAO.kt
@@ -0,0 +1,128 @@
+/*
+ * This file is part of LibEuFin.
+ * Copyright (C) 2024 Taler Systems S.A.
+
+ * LibEuFin is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation; either version 3, or
+ * (at your option) any later version.
+
+ * LibEuFin is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General
+ * Public License for more details.
+
+ * You should have received a copy of the GNU Affero General Public
+ * License along with LibEuFin; see the file COPYING. If not, see
+ * <http://www.gnu.org/licenses/>
+ */
+
+package tech.libeufin.nexus.db
+
+import tech.libeufin.common.db.*
+import tech.libeufin.common.*
+import java.sql.ResultSet
+import java.time.Instant
+
+/** Data access logic for exchange specific logic */
+class ExchangeDAO(private val db: Database) {
+ /** Query history of taler incoming transactions */
+ suspend fun incomingHistory(
+ params: HistoryParams
+ ): List<IncomingReserveTransaction>
+ = db.poolHistoryGlobal(params, db::listenIncoming, """
+ SELECT
+ incoming_transaction_id
+ ,execution_time
+ ,(amount).val AS amount_val
+ ,(amount).frac AS amount_frac
+ ,debit_payto_uri
+ ,reserve_public_key
+ FROM talerable_incoming_transactions
+ JOIN incoming_transactions USING(incoming_transaction_id)
+ WHERE
+ """, "incoming_transaction_id") {
+ IncomingReserveTransaction(
+ row_id = it.getLong("incoming_transaction_id"),
+ date = it.getTalerTimestamp("execution_time"),
+ amount = it.getAmount("amount", db.bankCurrency),
+ debit_account = it.getString("debit_payto_uri"),
+ reserve_pub = EddsaPublicKey(it.getBytes("reserve_public_key")),
+ )
+ }
+
+ /** Query [exchangeId] history of taler outgoing transactions */
+ suspend fun outgoingHistory(
+ params: HistoryParams
+ ): List<OutgoingTransaction>
+ = db.poolHistoryGlobal(params, db::listenOutgoing, """
+ SELECT
+ outgoing_transaction_id
+ ,execution_time AS execution_time
+ ,(amount).val AS amount_val
+ ,(amount).frac AS amount_frac
+ ,credit_payto_uri AS credit_payto_uri
+ ,wtid
+ ,exchange_base_url
+ FROM talerable_outgoing_transactions
+ JOIN outgoing_transactions USING(outgoing_transaction_id)
+ WHERE
+ """, "outgoing_transaction_id") {
+ OutgoingTransaction(
+ row_id = it.getLong("outgoing_transaction_id"),
+ date = it.getTalerTimestamp("execution_time"),
+ amount = it.getAmount("amount", db.bankCurrency),
+ credit_account = it.getString("credit_payto_uri"),
+ wtid = ShortHashCode(it.getBytes("wtid")),
+ exchange_base_url = it.getString("exchange_base_url")
+ )
+ }
+
+ /** Result of taler transfer transaction creation */
+ sealed interface TransferResult {
+ /** Transaction [id] and wire transfer [timestamp] */
+ data class Success(val id: Long, val timestamp: TalerProtocolTimestamp): TransferResult
+ data object RequestUidReuse: TransferResult
+ }
+
+ /** Perform a Taler transfer */
+ suspend fun transfer(
+ req: TransferRequest,
+ bankId: String,
+ now: Instant
+ ): TransferResult = db.serializable { conn ->
+ val subject = "${req.wtid} ${req.exchange_base_url.url}"
+ val stmt = conn.prepareStatement("""
+ SELECT
+ out_request_uid_reuse
+ ,out_tx_row_id
+ ,out_timestamp
+ FROM
+ taler_transfer (
+ ?, ?, ?,
+ (?,?)::taler_amount,
+ ?, ?, ?, ?
+ );
+ """)
+
+ stmt.setBytes(1, req.request_uid.raw)
+ stmt.setBytes(2, req.wtid.raw)
+ stmt.setString(3, subject)
+ stmt.setLong(4, req.amount.value)
+ stmt.setInt(5, req.amount.frac)
+ stmt.setString(6, req.exchange_base_url.url)
+ stmt.setString(7, req.credit_account.canonical)
+ stmt.setString(8, bankId)
+ stmt.setLong(9, now.micros())
+
+ stmt.one {
+ when {
+ it.getBoolean("out_request_uid_reuse") -> TransferResult.RequestUidReuse
+ else -> TransferResult.Success(
+ id = it.getLong("out_tx_row_id"),
+ timestamp = it.getTalerTimestamp("out_timestamp")
+ )
+ }
+ }
+ }
+} \ No newline at end of file
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/db/InitiatedDAO.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/db/InitiatedDAO.kt
index 04fd3965..052b75f9 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/db/InitiatedDAO.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/db/InitiatedDAO.kt
@@ -22,6 +22,7 @@ package tech.libeufin.nexus.db
import tech.libeufin.common.asInstant
import tech.libeufin.common.db.all
import tech.libeufin.common.db.executeUpdateViolation
+import tech.libeufin.common.db.oneUniqueViolation
import tech.libeufin.common.db.getAmount
import tech.libeufin.common.db.oneOrNull
import tech.libeufin.common.micros
@@ -32,9 +33,9 @@ import java.time.Instant
class InitiatedDAO(private val db: Database) {
/** Outgoing payments initiation result */
- enum class PaymentInitiationResult {
- REQUEST_UID_REUSE,
- SUCCESS
+ sealed interface PaymentInitiationResult {
+ data class Success(val id: Long): PaymentInitiationResult
+ data object RequestUidReuse: PaymentInitiationResult
}
/** Register a new pending payment in the database */
@@ -47,16 +48,18 @@ class InitiatedDAO(private val db: Database) {
,initiation_time
,request_uid
) VALUES ((?,?)::taler_amount,?,?,?,?)
+ RETURNING initiated_outgoing_transaction_id
""")
+ // TODO check payto uri
stmt.setLong(1, paymentData.amount.value)
stmt.setInt(2, paymentData.amount.frac)
stmt.setString(3, paymentData.wireTransferSubject)
stmt.setString(4, paymentData.creditPaytoUri.toString())
stmt.setLong(5, paymentData.initiationTime.micros())
stmt.setString(6, paymentData.requestUid)
- if (stmt.executeUpdateViolation())
- return@conn PaymentInitiationResult.SUCCESS
- return@conn PaymentInitiationResult.REQUEST_UID_REUSE
+ stmt.oneUniqueViolation(PaymentInitiationResult.RequestUidReuse) {
+ PaymentInitiationResult.Success(it.getLong("initiated_outgoing_transaction_id"))
+ }
}
/** Register EBICS submission success */
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/db/PaymentDAO.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/db/PaymentDAO.kt
index 1253e084..e17184eb 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/db/PaymentDAO.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/db/PaymentDAO.kt
@@ -35,10 +35,14 @@ class PaymentDAO(private val db: Database) {
)
/** Register an outgoing payment reconciling it with its initiated payment counterpart if present */
- suspend fun registerOutgoing(paymentData: OutgoingPayment): OutgoingRegistrationResult = db.conn {
+ suspend fun registerOutgoing(
+ paymentData: OutgoingPayment,
+ wtid: ShortHashCode?,
+ baseUrl: ExchangeUrl?,
+ ): OutgoingRegistrationResult = db.conn {
val stmt = it.prepareStatement("""
SELECT out_tx_id, out_initiated, out_found
- FROM register_outgoing((?,?)::taler_amount,?,?,?,?)
+ FROM register_outgoing((?,?)::taler_amount,?,?,?,?,?,?)
""")
val executionTime = paymentData.executionTime.micros()
stmt.setLong(1, paymentData.amount.value)
@@ -47,6 +51,17 @@ class PaymentDAO(private val db: Database) {
stmt.setLong(4, executionTime)
stmt.setString(5, paymentData.creditPaytoUri)
stmt.setString(6, paymentData.messageId)
+ if (wtid != null) {
+ stmt.setBytes(7, wtid.raw)
+ } else {
+ stmt.setNull(7, java.sql.Types.NULL)
+ }
+ if (baseUrl != null) {
+ stmt.setString(8, baseUrl.url)
+ } else {
+ stmt.setNull(8, java.sql.Types.NULL)
+ }
+
stmt.one {
OutgoingRegistrationResult(
it.getLong("out_tx_id"),
@@ -127,6 +142,52 @@ class PaymentDAO(private val db: Database) {
}
}
+ /** Register an incoming payment */
+ suspend fun registerIncoming(
+ paymentData: IncomingPayment
+ ): IncomingRegistrationResult.Success = db.conn { conn ->
+ val stmt = conn.prepareStatement("""
+ SELECT out_found, out_tx_id
+ FROM register_incoming((?,?)::taler_amount,?,?,?,?)
+ """)
+ val executionTime = paymentData.executionTime.micros()
+ stmt.setLong(1, paymentData.amount.value)
+ stmt.setInt(2, paymentData.amount.frac)
+ stmt.setString(3, paymentData.wireTransferSubject)
+ stmt.setLong(4, executionTime)
+ stmt.setString(5, paymentData.debitPaytoUri)
+ stmt.setString(6, paymentData.bankId)
+ stmt.one {
+ IncomingRegistrationResult.Success(
+ it.getLong("out_tx_id"),
+ !it.getBoolean("out_found")
+ )
+ }
+ }
+
+ /** Query history of incoming transactions */
+ suspend fun revenueHistory(
+ params: HistoryParams
+ ): List<RevenueIncomingBankTransaction>
+ = db.poolHistoryGlobal(params, db::listenRevenue, """
+ SELECT
+ incoming_transaction_id
+ ,execution_time
+ ,(amount).val AS amount_val
+ ,(amount).frac AS amount_frac
+ ,debit_payto_uri
+ ,wire_transfer_subject
+ FROM incoming_transactions WHERE
+ """, "incoming_transaction_id") {
+ RevenueIncomingBankTransaction(
+ row_id = it.getLong("incoming_transaction_id"),
+ date = it.getTalerTimestamp("execution_time"),
+ amount = it.getAmount("amount", db.bankCurrency),
+ debit_account = it.getString("debit_payto_uri"),
+ subject = it.getString("wire_transfer_subject")
+ )
+ }
+
/** List incoming transaction metadata for debugging */
suspend fun metadataIncoming(): List<IncomingTxMetadata> = db.conn { conn ->
val stmt = conn.prepareStatement("""