1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
|
package tech.libeufin.nexus
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import io.ktor.http.*
import org.jetbrains.exposed.dao.flushCache
import org.jetbrains.exposed.sql.SortOrder
import org.jetbrains.exposed.sql.and
import org.jetbrains.exposed.sql.transactions.TransactionManager
import org.jetbrains.exposed.sql.transactions.transaction
import tech.libeufin.nexus.iso20022.CamtBankAccountEntry
import tech.libeufin.nexus.iso20022.CreditDebitIndicator
import tech.libeufin.nexus.iso20022.TransactionDetails
import tech.libeufin.nexus.server.NexusFacadeType
// Mainly used to resort the last processed transaction ID.
fun getFacadeState(fcid: String): FacadeStateEntity {
return transaction {
val facade = FacadeEntity.find {
FacadesTable.facadeName eq fcid
}.firstOrNull() ?: throw NexusError(
HttpStatusCode.NotFound,
"Could not find facade '${fcid}'"
)
FacadeStateEntity.find {
FacadeStateTable.facade eq facade.id.value
}.firstOrNull() ?: throw NexusError(
HttpStatusCode.NotFound,
"Could not find any state for facade: $fcid"
)
}
}
fun getFacadeBankAccount(fcid: String): NexusBankAccountEntity {
return transaction {
val facadeState = getFacadeState(fcid)
NexusBankAccountEntity.findByName(facadeState.bankAccount) ?: throw NexusError(
HttpStatusCode.NotFound,
"The facade: $fcid doesn't manage bank account: ${facadeState.bankAccount}"
)
}
}
/**
* Ingests transactions for those facades accounting for bankAccountId.
* 'incomingFilterCb' decides whether the facade accepts the payment;
* if not, refundCb prepares a refund. The 'txStatus' parameter decides
* at which state one transaction deserve to fuel Taler transactions. BOOK
* is conservative, and with some banks the delay can be significant. PNDG
* instead reacts faster, but risks that one transaction gets undone by the
* bank and never reach the BOOK state; this would mean a loss and/or admin
* burden.
*/
fun ingestFacadeTransactions(
bankAccountId: String,
facadeType: NexusFacadeType,
incomingFilterCb: ((NexusBankTransactionEntity, TransactionDetails) -> Unit)?,
refundCb: ((NexusBankAccountEntity, Long) -> Unit)?,
txStatus: EntryStatus = EntryStatus.BOOK
) {
fun ingest(bankAccount: NexusBankAccountEntity, facade: FacadeEntity) {
logger.debug(
"Ingesting transactions for Taler facade ${facade.id.value}," +
" and bank account: ${bankAccount.bankAccountName}"
)
val facadeState = getFacadeState(facade.facadeName)
var lastId = facadeState.highestSeenMessageSerialId
NexusBankTransactionEntity.find {
/** Those with "our" bank account involved */
NexusBankTransactionsTable.bankAccount eq bankAccount.id.value and
/** Those that are booked */
(NexusBankTransactionsTable.status eq txStatus) and
/** Those that came later than the latest processed payment */
(NexusBankTransactionsTable.id.greater(lastId))
}.orderBy(Pair(NexusBankTransactionsTable.id, SortOrder.ASC)).forEach {
// Incoming payment.
val tx = jacksonObjectMapper().readValue(
it.transactionJson,
CamtBankAccountEntry::class.java
)
/**
* Need transformer from "JSON tx" to TransactionDetails?.
*/
val details: TransactionDetails? = tx.batches?.get(0)?.batchTransactions?.get(0)?.details
if (details == null) {
logger.warn("A void money movement (${tx.accountServicerRef}) made it through the ingestion: VERY strange")
return@forEach
}
when (tx.creditDebitIndicator) {
CreditDebitIndicator.CRDT -> {
if (incomingFilterCb != null) {
incomingFilterCb(
it, // payment DB object
details // wire transfer details
)
}
}
else -> Unit
}
lastId = it.id.value
}
try {
if (refundCb != null) {
refundCb(
bankAccount,
facadeState.highestSeenMessageSerialId
)
}
} catch (e: Exception) {
logger.warn("Sending refund payment failed: ${e.message}")
}
facadeState.highestSeenMessageSerialId = lastId
}
// invoke ingestion for all the facades
transaction {
FacadeEntity.find { FacadesTable.type eq facadeType.facadeType }.forEach {
val facadeBankAccount = getFacadeBankAccount(it.facadeName)
if (facadeBankAccount.bankAccountName == bankAccountId)
ingest(facadeBankAccount, it)
flushCache()
}
}
}
|