summaryrefslogtreecommitdiff
path: root/nexus/src/main/kotlin
diff options
context:
space:
mode:
authorMS <ms@taler.net>2020-06-06 00:36:12 +0200
committerMS <ms@taler.net>2020-06-06 00:36:12 +0200
commit74c2a6c0e9b3ed17fdb8bd3abd1325edcd662d07 (patch)
tree46fb8beba729f65e731a4f23f30839274efce702 /nexus/src/main/kotlin
parent708661483b61f2d420a6a51c68220c3f5e141d38 (diff)
downloadlibeufin-74c2a6c0e9b3ed17fdb8bd3abd1325edcd662d07.tar.gz
libeufin-74c2a6c0e9b3ed17fdb8bd3abd1325edcd662d07.tar.bz2
libeufin-74c2a6c0e9b3ed17fdb8bd3abd1325edcd662d07.zip
deduplicating transactions and addressing #6266.
Diffstat (limited to 'nexus/src/main/kotlin')
-rw-r--r--nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt31
-rw-r--r--nexus/src/main/kotlin/tech/libeufin/nexus/Helpers.kt16
-rw-r--r--nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt36
-rw-r--r--nexus/src/main/kotlin/tech/libeufin/nexus/taler.kt85
4 files changed, 94 insertions, 74 deletions
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt
index 2a6c61a8..193c1ef4 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt
@@ -80,7 +80,6 @@ class TalerIncomingPaymentEntity(id: EntityID<Long>) : LongEntity(id) {
return newRow
}
}
-
var payment by RawBankTransactionEntity referencedOn TalerIncomingPayments.payment
var valid by TalerIncomingPayments.valid
var refunded by TalerIncomingPayments.refunded
@@ -120,12 +119,12 @@ object RawBankTransactionsTable : LongIdTable() {
val counterpartName = text("counterpartName")
val bookingDate = long("bookingDate")
val status = text("status") // BOOK or other.
+ val uid = text("uid") // AcctSvcrRef code, given by the bank.
val bankAccount = reference("bankAccount", NexusBankAccountsTable)
}
class RawBankTransactionEntity(id: EntityID<Long>) : LongEntity(id) {
companion object : LongEntityClass<RawBankTransactionEntity>(RawBankTransactionsTable)
-
var unstructuredRemittanceInformation by RawBankTransactionsTable.unstructuredRemittanceInformation
var transactionType by RawBankTransactionsTable.transactionType
var currency by RawBankTransactionsTable.currency
@@ -135,6 +134,7 @@ class RawBankTransactionEntity(id: EntityID<Long>) : LongEntity(id) {
var counterpartName by RawBankTransactionsTable.counterpartName
var bookingDate by RawBankTransactionsTable.bookingDate
var status by RawBankTransactionsTable.status
+ var uid by RawBankTransactionsTable.uid
var bankAccount by NexusBankAccountEntity referencedOn RawBankTransactionsTable.bankAccount
}
@@ -157,10 +157,8 @@ object PreparedPaymentsTable : IdTable<String>() {
val debitorIban = text("debitorIban")
val debitorBic = text("debitorBic")
val debitorName = text("debitorName").nullable()
-
/* Indicates whether the PAIN message was sent to the bank. */
val submitted = bool("submitted").default(false)
-
/* Indicates whether the bank didn't perform the payment: note that
* this state can be reached when the payment gets listed in a CRZ
* response OR when the payment doesn't show up in a C52/C53 response */
@@ -169,7 +167,6 @@ object PreparedPaymentsTable : IdTable<String>() {
class PreparedPaymentEntity(id: EntityID<String>) : Entity<String>(id) {
companion object : EntityClass<String, PreparedPaymentEntity>(PreparedPaymentsTable)
-
var paymentId by PreparedPaymentsTable.paymentId
var preparationDate by PreparedPaymentsTable.preparationDate
var submissionDate by PreparedPaymentsTable.submissionDate
@@ -227,7 +224,6 @@ object EbicsSubscribersTable : IntIdTable() {
class EbicsSubscriberEntity(id: EntityID<Int>) : IntEntity(id) {
companion object : IntEntityClass<EbicsSubscriberEntity>(EbicsSubscribersTable)
-
var ebicsURL by EbicsSubscribersTable.ebicsURL
var hostID by EbicsSubscribersTable.hostID
var partnerID by EbicsSubscribersTable.partnerID
@@ -273,7 +269,6 @@ object FacadesTable : IdTable<String>() {
override val primaryKey = PrimaryKey(id, name = "id")
val type = text("type")
val creator = reference("creator", NexusUsersTable)
- val config = reference("config", TalerFacadeConfigsTable) // see #6266
val highestSeenMsgID = long("highestSeenMessageID").default(0)
}
@@ -281,25 +276,27 @@ class FacadeEntity(id: EntityID<String>) : Entity<String>(id) {
companion object : EntityClass<String, FacadeEntity>(FacadesTable)
var type by FacadesTable.type
var creator by NexusUserEntity referencedOn FacadesTable.creator
- var config by TalerFacadeConfigEntity referencedOn FacadesTable.config
- var highestSeenMsgID by FacadesTable.highestSeenMsgID
}
-object TalerFacadeConfigsTable : IntIdTable() {
+object TalerFacadeStatesTable : IntIdTable() {
val bankAccount = text("bankAccount")
val bankConnection = text("bankConnection")
/* "statement", "report", "notification" */
val reserveTransferLevel = text("reserveTransferLevel")
val intervalIncrement = text("intervalIncrement")
+ val facade = reference("facade", FacadesTable)
+ val highestSeenMsgID = long("highestSeenMsgID").default(0)
}
-class TalerFacadeConfigEntity(id: EntityID<Int>) : IntEntity(id) {
- companion object : IntEntityClass<TalerFacadeConfigEntity>(TalerFacadeConfigsTable)
- var bankAccount by TalerFacadeConfigsTable.bankAccount
- var bankConnection by TalerFacadeConfigsTable.bankConnection
+class TalerFacadeStateEntity(id: EntityID<Int>) : IntEntity(id) {
+ companion object : IntEntityClass<TalerFacadeStateEntity>(TalerFacadeStatesTable)
+ var bankAccount by TalerFacadeStatesTable.bankAccount
+ var bankConnection by TalerFacadeStatesTable.bankConnection
/* "statement", "report", "notification" */
- var reserveTransferLevel by TalerFacadeConfigsTable.reserveTransferLevel
- var intervalIncrement by TalerFacadeConfigsTable.intervalIncrement
+ var reserveTransferLevel by TalerFacadeStatesTable.reserveTransferLevel
+ var intervalIncrement by TalerFacadeStatesTable.intervalIncrement
+ var facade by FacadeEntity referencedOn TalerFacadeStatesTable.facade
+ var highestSeenMsgID by TalerFacadeStatesTable.highestSeenMsgID
}
fun dbCreateTables(dbName: String) {
@@ -318,7 +315,7 @@ fun dbCreateTables(dbName: String) {
NexusBankConnectionsTable,
NexusBankMessagesTable,
FacadesTable,
- TalerFacadeConfigsTable
+ TalerFacadeStatesTable
)
}
} \ No newline at end of file
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Helpers.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Helpers.kt
index 1462ab97..450c1eec 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/Helpers.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/Helpers.kt
@@ -104,6 +104,16 @@ fun getEbicsSubscriberDetails(userId: String, transportId: String): EbicsClientS
return getEbicsSubscriberDetailsInternal(subscriber)
}
+// returns true if the payment is found in the database.
+fun isDuplicate(camt: Document, acctSvcrRef: String): Boolean {
+ val found = transaction {
+ RawBankTransactionEntity.find {
+ RawBankTransactionsTable.uid eq acctSvcrRef
+ }.firstOrNull()
+ }
+ return found != null
+}
+
fun processCamtMessage(
bankAccountId: String,
camt53doc: Document
@@ -116,8 +126,14 @@ fun processCamtMessage(
val bookingDate = parseDashedDate(
camt53doc.pickString("//*[local-name()='BookgDt']//*[local-name()='Dt']")
)
+ val acctSvcrRef = camt53doc.pickString("//*[local-name()='AcctSvcrRef']")
+ if (isDuplicate(camt53doc, acctSvcrRef)) {
+ logger.info("Processing a duplicate, not storing it.")
+ return@transaction
+ }
RawBankTransactionEntity.new {
bankAccount = acct
+ uid = acctSvcrRef
unstructuredRemittanceInformation =
camt53doc.pickString("//*[local-name()='Ntry']//*[local-name()='Ustrd']")
transactionType = camt53doc.pickString("//*[local-name()='Ntry']//*[local-name()='CdtDbtInd']")
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt
index 372f0e83..901db99c 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt
@@ -74,7 +74,6 @@ import java.util.*
import java.util.zip.InflaterInputStream
import javax.crypto.EncryptedPrivateKeyInfo
import java.time.LocalDateTime
-import kotlin.coroutines.CoroutineContext
data class NexusError(val statusCode: HttpStatusCode, val reason: String) :
Exception("${reason} (HTTP status $statusCode)")
@@ -265,7 +264,7 @@ fun schedulePeriodicWork() {
logger.debug("Outer background job")
try {
delay(Duration.ofSeconds(1))
- downloadFacadesTransactions(this)
+ downloadTalerFacadesTransactions(this)
ingestTalerTransactions()
} catch (e: Exception) {
logger.info("==== Background job exception ====\n${e.message}======")
@@ -275,13 +274,13 @@ fun schedulePeriodicWork() {
}
/** Crawls all the facades, and requests history for each of its creators. */
-suspend fun downloadFacadesTransactions(myScope: CoroutineScope) {
+suspend fun downloadTalerFacadesTransactions(myScope: CoroutineScope) {
val httpClient = HttpClient()
val work = mutableListOf<Pair<String, String>>()
transaction {
- FacadeEntity.all().forEach {
- logger.debug("Fetching history for facade: ${it.id.value}, bank account: ${it.config.bankAccount}")
- work.add(Pair(it.creator.id.value, it.config.bankAccount))
+ TalerFacadeStateEntity.all().forEach {
+ logger.debug("Fetching history for facade: ${it.id.value}, bank account: ${it.bankAccount}")
+ work.add(Pair(it.facade.creator.id.value, it.bankAccount))
}
}
work.forEach {
@@ -489,7 +488,6 @@ fun serverMain(dbName: String) {
call.respond(HttpStatusCode.OK, qr)
return@post
}
-
/**
* Shows the bank accounts belonging to the requesting user.
*/
@@ -522,7 +520,6 @@ fun serverMain(dbName: String) {
}
call.respond(res)
}
-
/**
* Submit one particular payment to the bank.
*/
@@ -1095,25 +1092,20 @@ fun serverMain(dbName: String) {
}
post("/facades") {
val body = call.receive<FacadeInfo>()
- val (user, talerConfig) = transaction {
+ val newFacade = transaction {
val user = authenticateRequest(call.request)
- val talerConfig = TalerFacadeConfigEntity.new {
+ FacadeEntity.new(body.name) {
+ type = body.type
+ creator = user
+ }
+ }
+ transaction {
+ TalerFacadeStateEntity.new {
bankAccount = body.config.bankAccount
bankConnection = body.config.bankConnection
intervalIncrement = body.config.intervalIncremental
reserveTransferLevel = body.config.reserveTransferLevel
- }
- Pair(user, talerConfig)
- }
- // Kotlin+Exposed did NOT like the referenced and referencing
- // tables to be created inside the same transfer block. This
- // problem must be further investigated.
- transaction {
- FacadeEntity.new(body.name) {
- type = body.type
- creator = user
- config = talerConfig
- highestSeenMsgID = 0
+ facade = newFacade
}
}
call.respondText("Facade created")
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/taler.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/taler.kt
index 499743e3..7a74d09b 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/taler.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/taler.kt
@@ -177,9 +177,13 @@ fun expectLong(param: String?): Long {
}
}
-
/** Helper handling 'start' being optional and its dependence on 'delta'. */
fun handleStartArgument(start: String?, delta: Int): Long {
+ if (start == null) {
+ if (delta >= 0)
+ return -1
+ return Long.MAX_VALUE
+ }
return expectLong(start)
}
@@ -218,13 +222,36 @@ fun paymentFailed(entry: RawBankTransactionEntity): Boolean {
return false
}
-fun getFacadeBankAccount(nexusUser: NexusUserEntity): NexusBankAccountEntity {
- val facade = FacadeEntity.find { FacadesTable.creator eq nexusUser.id.value }.firstOrNull() ?: throw NexusError(
- HttpStatusCode.NotFound, "Could not find any facade from ${nexusUser.id.value}"
+fun getTalerFacadeState(fcid: String): TalerFacadeStateEntity {
+ val facade = FacadeEntity.find { FacadesTable.id eq fcid }.firstOrNull() ?: throw NexusError(
+ HttpStatusCode.NotFound,
+ "Could not find facade '${fcid}'"
+ )
+ val facadeState = TalerFacadeStateEntity.find {
+ TalerFacadeStatesTable.facade eq facade.id.value
+ }.firstOrNull() ?: throw NexusError(
+ HttpStatusCode.NotFound,
+ "Could not find any state for facade: ${fcid}"
)
- val bankAccount = NexusBankAccountEntity.findById(facade.config.bankAccount) ?: throw NexusError(
- HttpStatusCode.NotFound, "Could not find any bank account named ${facade.config.bankAccount}"
+ return facadeState
+}
+
+fun getTalerFacadeBankAccount(fcid: String): NexusBankAccountEntity {
+ val facade = FacadeEntity.find { FacadesTable.id eq fcid }.firstOrNull() ?: throw NexusError(
+ HttpStatusCode.NotFound,
+ "Could not find facade '${fcid}'"
)
+ val facadeState = TalerFacadeStateEntity.find {
+ TalerFacadeStatesTable.facade eq facade.id.value
+ }.firstOrNull() ?: throw NexusError(
+ HttpStatusCode.NotFound,
+ "Could not find any state for facade: ${fcid}"
+ )
+ val bankAccount = NexusBankAccountEntity.findById(facadeState.bankAccount) ?: throw NexusError(
+ HttpStatusCode.NotFound,
+ "Could not find any bank account named ${facadeState.bankAccount}"
+ )
+
return bankAccount
}
@@ -251,7 +278,7 @@ suspend fun talerTransfer(call: ApplicationCall) {
)
}
}
- val exchangeBankAccount = getFacadeBankAccount(exchangeUser)
+ val exchangeBankAccount = getTalerFacadeBankAccount(expectNonNull(call.parameters["fcid"]))
val pain001 = addPreparedPayment(
Pain001Data(
creditorIban = creditorData.iban,
@@ -263,6 +290,7 @@ suspend fun talerTransfer(call: ApplicationCall) {
),
exchangeBankAccount
)
+ logger.debug("Taler requests payment: ${transferRequest.wtid}")
val row = TalerRequestedPaymentEntity.new {
preparedPayment = pain001 // not really used/needed, just here to silence warnings
exchangeBaseUrl = transferRequest.exchange_base_url
@@ -296,16 +324,12 @@ suspend fun talerAddIncoming(call: ApplicationCall): Unit {
val addIncomingData = call.receive<TalerAdminAddIncoming>()
val debtor = parsePayto(addIncomingData.debit_account)
val res = transaction {
+ val user = authenticateRequest(call.request)
val facadeID = expectNonNull(call.parameters["fcid"])
- val facade = FacadeEntity.findById(facadeID) ?: throw NexusError(
- HttpStatusCode.NotFound, "Could not find facade '$facadeID'"
- )
- val facadeBankAccount = NexusBankAccountEntity.findById(facade.config.bankAccount) ?: throw NexusError(
- HttpStatusCode.NotFound,
- "Such bank account '${facade.config.bankAccount}' wasn't found for facade '$facadeID'"
- )
+ val facadeState = getTalerFacadeState(facadeID)
+ val facadeBankAccount = getTalerFacadeBankAccount(facadeID)
return@transaction object {
- val facadeLastSeen = facade.highestSeenMsgID
+ val facadeLastSeen = facadeState.highestSeenMsgID
val facadeIban = facadeBankAccount.iban
val facadeBic = facadeBankAccount.bankCode
val facadeHolderName = facadeBankAccount.accountHolder
@@ -317,6 +341,7 @@ suspend fun talerAddIncoming(call: ApplicationCall): Unit {
urlString = "http://localhost:5000/admin/payments",
block = {
/** FIXME: ideally Jackson should define such request body. */
+ val parsedAmount = parseAmount(addIncomingData.amount)
this.body = """{
"creditorIban": "${res.facadeIban}",
"creditorBic": "${res.facadeBic}",
@@ -324,7 +349,8 @@ suspend fun talerAddIncoming(call: ApplicationCall): Unit {
"debitorIban": "${debtor.iban}",
"debitorBic": "${debtor.bic}",
"debitorName": "${debtor.name}",
- "amount": "${addIncomingData.amount}",
+ "amount": "${parsedAmount.amount}",
+ "currency": "${parsedAmount.currency}",
"subject": "${addIncomingData.reserve_pub}"
}""".trimIndent()
contentType(ContentType.Application.Json)
@@ -356,7 +382,8 @@ suspend fun talerAddIncoming(call: ApplicationCall): Unit {
fun ingestTalerTransactions() {
fun ingest(subscriberAccount: NexusBankAccountEntity, facade: FacadeEntity) {
logger.debug("Ingesting transactions for Taler facade: ${facade.id.value}")
- var lastId = facade.highestSeenMsgID
+ val facadeState = getTalerFacadeState(facade.id.value)
+ var lastId = facadeState.highestSeenMsgID
RawBankTransactionEntity.find {
/** Those with exchange bank account involved */
RawBankTransactionsTable.bankAccount eq subscriberAccount.id.value and
@@ -365,13 +392,6 @@ fun ingestTalerTransactions() {
/** Those that came later than the latest processed payment */
(RawBankTransactionsTable.id.greater(lastId))
}.orderBy(Pair(RawBankTransactionsTable.id, SortOrder.ASC)).forEach {
- if (duplicatePayment(it)) {
- logger.warn("Incoming payment already seen")
- throw NexusError(
- HttpStatusCode.InternalServerError,
- "Incoming payment already seen"
- )
- }
// Incoming payment.
if (it.transactionType == "CRDT") {
if (CryptoUtil.checkValidEddsaPublicKey(it.unstructuredRemittanceInformation)) {
@@ -388,29 +408,25 @@ fun ingestTalerTransactions() {
}
// Outgoing payment
if (it.transactionType == "DBIT") {
+ logger.debug("Ingesting outgoing payment: ${it.unstructuredRemittanceInformation}")
var talerRequested = TalerRequestedPaymentEntity.find {
TalerRequestedPayments.wtid eq it.unstructuredRemittanceInformation
}.firstOrNull() ?: throw NexusError(
HttpStatusCode.InternalServerError,
"Payment '${it.unstructuredRemittanceInformation}' shows in history, but was never requested!"
)
- if (talerRequested != null) {
- talerRequested.rawConfirmed = it
- }
+ talerRequested.rawConfirmed = it
}
- /** WARNING: it is not guaranteed that the last processed raw
- * payment is ALSO the one with highest ID. A more accurate management
- * is needed. */
lastId = it.id.value
}
- facade.highestSeenMsgID = lastId
+ facadeState.highestSeenMsgID = lastId
}
// invoke ingestion for all the facades
transaction {
FacadeEntity.find {
FacadesTable.type eq "taler-wire-gateway"
}.forEach {
- val subscriberAccount = getFacadeBankAccount(it.creator)
+ val subscriberAccount = getTalerFacadeBankAccount(it.id.value)
ingest(subscriberAccount, it)
}
}
@@ -429,9 +445,8 @@ suspend fun historyOutgoing(call: ApplicationCall): Unit {
val history = TalerOutgoingHistory()
transaction {
val user = authenticateRequest(call.request)
-
/** Retrieve all the outgoing payments from the _clean Taler outgoing table_ */
- val subscriberBankAccount = getFacadeBankAccount(user)
+ val subscriberBankAccount = getTalerFacadeBankAccount(expectNonNull(call.parameters["fcid"]))
val reqPayments = TalerRequestedPaymentEntity.find {
TalerRequestedPayments.rawConfirmed.isNotNull() and startCmpOp
}.orderTaler(delta)
@@ -523,4 +538,4 @@ fun talerFacadeRoutes(route: Route) {
call.respondText("Hello, this is Taler Facade")
return@get
}
-}
+} \ No newline at end of file