/*
* This file is part of LibEuFin.
* Copyright (C) 2020 Taler Systems S.A.
*
* LibEuFin is free software; you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation; either version 3, or
* (at your option) any later version.
*
* LibEuFin is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General
* Public License for more details.
*
* You should have received a copy of the GNU Affero General Public
* License along with LibEuFin; see the file COPYING. If not, see
*
*/
package tech.libeufin.nexus
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import io.ktor.server.application.ApplicationCall
import io.ktor.server.application.call
import io.ktor.client.*
import io.ktor.client.plugins.*
import io.ktor.client.request.*
import io.ktor.content.TextContent
import io.ktor.http.*
import io.ktor.server.request.receive
import io.ktor.server.response.*
import io.ktor.server.routing.Route
import io.ktor.server.routing.get
import io.ktor.server.routing.post
import io.ktor.server.util.*
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.currentCoroutineContext
import org.jetbrains.exposed.dao.Entity
import org.jetbrains.exposed.dao.id.IdTable
import org.jetbrains.exposed.sql.*
import org.jetbrains.exposed.sql.transactions.TransactionManager
import org.jetbrains.exposed.sql.transactions.transaction
import tech.libeufin.nexus.bankaccount.addPaymentInitiation
import tech.libeufin.nexus.bankaccount.fetchBankAccountTransactions
import tech.libeufin.nexus.bankaccount.getBankAccount
import tech.libeufin.nexus.iso20022.*
import tech.libeufin.nexus.server.*
import tech.libeufin.util.*
import java.net.URL
import kotlin.math.abs
import kotlin.math.min
/**
* Request body for "$TWG_BASE_URL/transfer".
*/
data class TalerTransferRequest(
val request_uid: String,
val amount: String,
val exchange_base_url: String,
val wtid: String,
val credit_account: String // payto://-format
)
data class TalerTransferResponse(
/**
* Point in time when Nexus put the payment instruction into the database.
*/
val timestamp: GnunetTimestamp,
val row_id: Long
)
/**
* History accounting data structures, typically
* used to build JSON responses.
*/
data class TalerIncomingBankTransaction(
val row_id: Long,
val date: GnunetTimestamp, // timestamp
val amount: String,
val credit_account: String, // payto form,
val debit_account: String,
val reserve_pub: String
)
data class TalerIncomingHistory(
var incoming_transactions: MutableList = mutableListOf()
)
data class TalerOutgoingBankTransaction(
val row_id: Long,
val date: GnunetTimestamp, // timestamp
val amount: String,
val credit_account: String, // payto form,
val debit_account: String,
val wtid: String,
val exchange_base_url: String
)
data class TalerOutgoingHistory(
var outgoing_transactions: MutableList = mutableListOf()
)
data class GnunetTimestamp(val t_s: Long)
/**
* Sort query results in descending order for negative deltas, and ascending otherwise.
*/
fun > SizedIterable.orderTaler(delta: Int): List {
return if (delta < 0) {
this.sortedByDescending { it.id }
} else {
this.sortedBy { it.id }
}
}
/** Builds the comparison operator for history entries based on the sign of 'delta' */
fun getComparisonOperator(delta: Int, start: Long, table: IdTable): Op {
return if (delta < 0) {
Expression.build {
table.id less start
}
} else {
Expression.build {
table.id greater start
}
}
}
fun expectLong(param: String?, allowNegative: Boolean = false): Long {
if (param == null) throw badRequest("'$param' is not Long")
val maybeLong = try { param.toLong() } catch (e: Exception) {
throw badRequest("'$param' is not Long")
}
if (!allowNegative && maybeLong < 0)
throw badRequest("Not expecting a negative: $param")
return maybeLong
}
// Helper handling 'start' being optional and its dependence on 'delta'.
fun handleStartArgument(start: String?, delta: Int): Long {
if (start == null) {
if (delta >= 0) return -1
return Long.MAX_VALUE
}
return expectLong(start)
}
/**
* The Taler layer cannot rely on the ktor-internal JSON-converter/responder,
* because this one adds a "charset" extra information in the Content-Type header
* that makes the GNUnet JSON parser unhappy.
*
* The workaround is to explicitly convert the 'data class'-object into a JSON
* string (what this function does), and use the simpler respondText method.
*/
fun customConverter(body: Any): String {
return jacksonObjectMapper().writeValueAsString(body)
}
/**
* Tries to extract a valid reserve public key from the raw subject line
*/
fun extractReservePubFromSubject(rawSubject: String): String? {
val re = "\\b[a-z0-9A-Z]{52}\\b".toRegex()
val result = re.find(rawSubject.replace("[\n]+".toRegex(), "")) ?: return null
return result.value.uppercase()
}
// Handle a Taler Wire Gateway /transfer request.
private suspend fun talerTransfer(call: ApplicationCall) {
val transferRequest = call.receive()
val amountObj = parseAmount(transferRequest.amount)
// FIXME: Right now we only parse the credit_account, should we also validate that it matches our account info?
// FIXME, another parse happens below; is this really useful here?
parsePayto(transferRequest.credit_account)
val facadeId = expectNonNull(call.parameters["fcid"])
val opaqueRowId = transaction {
call.request.requirePermission(PermissionQuery("facade", facadeId, "facade.talerwiregateway.transfer"))
val facade = FacadeEntity.find { FacadesTable.facadeName eq facadeId }.firstOrNull() ?: throw NexusError(
HttpStatusCode.NotFound,
"Could not find facade '${facadeId}'"
)
val creditorData = parsePayto(transferRequest.credit_account)
/** Checking the UID has the desired characteristics */
TalerRequestedPaymentEntity.find {
TalerRequestedPaymentsTable.requestUid eq transferRequest.request_uid
}.forEach {
if (
(it.amount != transferRequest.amount) or
(it.creditAccount != transferRequest.exchange_base_url) or
(it.wtid != transferRequest.wtid)
) {
throw NexusError(
HttpStatusCode.Conflict,
"This uid (${transferRequest.request_uid}) belongs to a different payment already"
)
}
}
val exchangeBankAccount = getFacadeBankAccount(facadeId)
val paymentSubject = "${transferRequest.wtid} ${transferRequest.exchange_base_url}"
val pain001 = addPaymentInitiation(
Pain001Data(
creditorIban = creditorData.iban,
creditorBic = creditorData.bic,
creditorName = creditorData.receiverName ?: throw NexusError(
HttpStatusCode.BadRequest, "Payto did not mention account owner"
),
subject = paymentSubject,
sum = amountObj.amount,
currency = amountObj.currency
),
exchangeBankAccount
)
logger.debug("Taler requests payment: ${transferRequest.wtid}")
val row = TalerRequestedPaymentEntity.new {
this.facade = facade
preparedPayment = pain001
exchangeBaseUrl = transferRequest.exchange_base_url
requestUid = transferRequest.request_uid
amount = transferRequest.amount
wtid = transferRequest.wtid
creditAccount = transferRequest.credit_account
}
row.id.value
}
return call.respond(
TextContent(
customConverter(
TalerTransferResponse(
/**
* Normally should point to the next round where the background
* routine will send new PAIN.001 data to the bank; work in progress..
*/
timestamp = GnunetTimestamp(System.currentTimeMillis() / 1000L),
row_id = opaqueRowId
)
),
ContentType.Application.Json
)
)
}
// Processes new transactions and stores TWG-specific data in
fun talerFilter(
payment: NexusBankTransactionEntity,
txDtls: TransactionDetails
) {
val channelsToNotify = mutableListOf()
var isInvalid = false // True when pub is invalid or duplicate.
val subject = txDtls.unstructuredRemittanceInformation
val debtorName = txDtls.debtor?.name
if (debtorName == null) {
logger.warn("empty debtor name")
return
}
val debtorAcct = txDtls.debtorAccount
if (debtorAcct == null) {
// FIXME: Report payment, we can't even send it back
logger.warn("empty debtor account")
return
}
val debtorIban = debtorAcct.iban
if (debtorIban == null) {
// FIXME: Report payment, we can't even send it back
logger.warn("non-iban debtor account")
return
}
val debtorAgent = txDtls.debtorAgent
if (debtorAgent == null) {
// FIXME: Report payment, we can't even send it back
logger.warn("missing debtor agent")
return
}
if (debtorAgent.bic == null) {
logger.warn("Not allowing transactions missing the BIC. IBAN and name: ${debtorIban}, $debtorName")
return
}
val reservePub = extractReservePubFromSubject(subject)
if (reservePub == null) {
logger.warn("could not find reserve pub in remittance information")
TalerInvalidIncomingPaymentEntity.new {
this.payment = payment
timestampMs = System.currentTimeMillis()
}
// Will be paid back by the refund handler.
return
}
// Check if reserve_pub was used already
val maybeExist = TalerIncomingPaymentEntity.find {
TalerIncomingPaymentsTable.reservePublicKey eq reservePub
}.firstOrNull()
if (maybeExist != null) {
val msg = "Reserve pub '$reservePub' was used already"
logger.info(msg)
isInvalid = true
}
if (!CryptoUtil.checkValidEddsaPublicKey(reservePub)) {
logger.info("invalid public key detected")
isInvalid = true
}
if (isInvalid) {
TalerInvalidIncomingPaymentEntity.new {
this.payment = payment
timestampMs = System.currentTimeMillis()
}
// Will be paid back by the refund handler.
return
}
TalerIncomingPaymentEntity.new {
this.payment = payment
reservePublicKey = reservePub
timestampMs = System.currentTimeMillis()
debtorPaytoUri = buildIbanPaytoUri(
debtorIban,
debtorAgent.bic,
debtorName
)
}
val dbTx = TransactionManager.currentOrNull() ?: throw NexusError(
HttpStatusCode.InternalServerError,
"talerFilter(): unexpected execution out of a DB transaction"
)
// Only supporting Postgres' NOTIFY.
if (dbTx.isPostgres()) {
val channelName = buildChannelName(
NotificationsChannelDomains.LIBEUFIN_TALER_INCOMING,
payment.bankAccount.iban
)
logger.debug("NOTIFYing on domain" +
" ${NotificationsChannelDomains.LIBEUFIN_TALER_INCOMING}" +
" for IBAN: ${payment.bankAccount.iban}. Resulting channel" +
" name: $channelName.")
dbTx.postgresNotify(channelName)
}
}
fun maybeTalerRefunds(bankAccount: NexusBankAccountEntity, lastSeenId: Long) {
logger.debug(
"Searching refundable payments of account: ${bankAccount.bankAccountName}," +
" after last seen transaction id: $lastSeenId"
)
transaction {
TalerInvalidIncomingPaymentsTable.innerJoin(
NexusBankTransactionsTable,
{ NexusBankTransactionsTable.id },
{ TalerInvalidIncomingPaymentsTable.payment }
).select {
/**
* Finds Taler-invalid incoming payments that weren't refunded
* yet and are newer than those processed along the last round.
*/
TalerInvalidIncomingPaymentsTable.refunded eq false and
(NexusBankTransactionsTable.bankAccount eq bankAccount.id.value) and
(NexusBankTransactionsTable.id greater lastSeenId)
}.forEach {
// For each of them, extracts the wire details to reuse in the refund.
val paymentData = jacksonObjectMapper().readValue(
it[NexusBankTransactionsTable.transactionJson],
CamtBankAccountEntry::class.java
)
if (paymentData.batches == null) {
logger.error(
"Empty wire details encountered in transaction with" +
" AcctSvcrRef: ${paymentData.accountServicerRef}." +
" Taler can't refund."
)
throw NexusError(
HttpStatusCode.InternalServerError,
"Unexpected void payment, cannot refund"
)
}
val debtorAccount = paymentData.batches[0].batchTransactions[0].details.debtorAccount
if (debtorAccount?.iban == null) {
logger.error("Could not find a IBAN to refund in transaction (AcctSvcrRef): ${paymentData.accountServicerRef}, aborting refund")
throw NexusError(HttpStatusCode.InternalServerError, "IBAN to refund not found")
}
val debtorAgent = paymentData.batches[0].batchTransactions[0].details.debtorAgent
if (debtorAgent?.bic == null) {
logger.error("Could not find the BIC of refundable IBAN at transaction (AcctSvcrRef): ${paymentData.accountServicerRef}, aborting refund")
throw NexusError(HttpStatusCode.InternalServerError, "BIC to refund not found")
}
val debtorPerson = paymentData.batches[0].batchTransactions[0].details.debtor
if (debtorPerson?.name == null) {
logger.error("Could not find the owner's name of refundable IBAN at transaction (AcctSvcrRef): ${paymentData.accountServicerRef}, aborting refund")
throw NexusError(HttpStatusCode.InternalServerError, "Name to refund not found")
}
// FIXME: investigate this amount!
val amount = paymentData.batches[0].batchTransactions[0].amount
NexusAssert(
it[NexusBankTransactionsTable.creditDebitIndicator] == "CRDT" &&
it[NexusBankTransactionsTable.bankAccount] == bankAccount.id,
"Cannot refund a _outgoing_ payment!"
)
// FIXME #7116
addPaymentInitiation(
Pain001Data(
creditorIban = debtorAccount.iban,
creditorBic = debtorAgent.bic,
creditorName = debtorPerson.name,
subject = "Taler refund of: ${paymentData.batches[0].batchTransactions[0].details.unstructuredRemittanceInformation}",
sum = amount.value,
currency = amount.currency
),
bankAccount // the Exchange bank account.
)
logger.debug("Refund of transaction (AcctSvcrRef): ${paymentData.accountServicerRef} got prepared")
it[TalerInvalidIncomingPaymentsTable.refunded] = true
}
}
}
/**
* Handle a /taler/history/outgoing request.
*/
private suspend fun historyOutgoing(call: ApplicationCall) {
val facadeId = expectNonNull(call.parameters["fcid"])
call.request.requirePermission(PermissionQuery("facade", facadeId, "facade.talerwiregateway.history"))
val param = call.expectUrlParameter("delta")
val delta: Int = try {
param.toInt()
} catch (e: Exception) {
throw EbicsProtocolError(HttpStatusCode.BadRequest, "'${param}' is not Int")
}
val start: Long = handleStartArgument(call.request.queryParameters["start"], delta)
val startCmpOp = getComparisonOperator(delta, start, TalerRequestedPaymentsTable)
/* retrieve database elements */
val history = TalerOutgoingHistory()
transaction {
/** Retrieve all the outgoing payments from the _clean Taler outgoing table_ */
val subscriberBankAccount = getFacadeBankAccount(facadeId)
val reqPayments = mutableListOf()
val reqPaymentsWithUnconfirmed = TalerRequestedPaymentEntity.find {
startCmpOp
}.orderTaler(delta)
reqPaymentsWithUnconfirmed.forEach {
if (it.preparedPayment.confirmationTransaction != null) {
reqPayments.add(it)
}
}
if (reqPayments.isNotEmpty()) {
reqPayments.subList(0, min(abs(delta), reqPayments.size)).forEach {
history.outgoing_transactions.add(
TalerOutgoingBankTransaction(
row_id = it.id.value,
amount = it.amount,
wtid = it.wtid,
date = GnunetTimestamp(it.preparedPayment.preparationDate / 1000L),
credit_account = it.creditAccount,
debit_account = buildIbanPaytoUri(
subscriberBankAccount.iban,
subscriberBankAccount.bankCode,
subscriberBankAccount.accountHolder,
),
exchange_base_url = it.exchangeBaseUrl
)
)
}
}
}
if (history.outgoing_transactions.size == 0) {
call.respond(HttpStatusCode.NoContent)
return
}
call.respond(
status = HttpStatusCode.OK,
TextContent(customConverter(history), ContentType.Application.Json)
)
}
// Handle a /taler-wire-gateway/history/incoming request.
private suspend fun historyIncoming(call: ApplicationCall) {
val facadeId = expectNonNull(call.parameters["fcid"])
call.request.requirePermission(
PermissionQuery(
"facade",
facadeId,
"facade.talerwiregateway.history"
)
)
val longPollTimeoutPar = call.parameters["long_poll_ms"]
val longPollTimeout = if (longPollTimeoutPar != null) {
val longPollTimeoutValue = try { longPollTimeoutPar.toLong() }
catch (e: Exception) {
throw badRequest("long_poll_ms value is invalid")
}
longPollTimeoutValue
} else null
val param = call.expectUrlParameter("delta")
val delta: Int = try { param.toInt() } catch (e: Exception) {
throw EbicsProtocolError(HttpStatusCode.BadRequest, "'${param}' is not Int")
}
val start: Long = handleStartArgument(call.request.queryParameters["start"], delta)
val history = TalerIncomingHistory()
val startCmpOp = getComparisonOperator(delta, start, TalerIncomingPaymentsTable)
val listenHandle: PostgresListenHandle? = if (isPostgres() && longPollTimeout != null) {
val notificationChannelName = buildChannelName(
NotificationsChannelDomains.LIBEUFIN_TALER_INCOMING,
getFacadeBankAccount(facadeId).iban
)
val handle = PostgresListenHandle(channelName = notificationChannelName)
handle.postgresListen()
handle
} else null
/**
* NOTE: the LISTEN command MAY also go inside this transaction,
* but that uses a connection other than the one provided by the
* transaction block. More facts on the consequences are needed.
*/
var result: List = transaction {
TalerIncomingPaymentEntity.find { startCmpOp }.orderTaler(delta)
}
if (result.isNotEmpty() && listenHandle != null)
listenHandle.postgresUnlisten()
if (result.isEmpty() && listenHandle != null && longPollTimeout != null) {
logger.debug("Waiting for NOTIFY on channel ${listenHandle.channelName}," +
" with timeout: $longPollTimeoutPar ms")
val notificationArrived = coroutineScope {
async(Dispatchers.IO) {
listenHandle.postgresGetNotifications(longPollTimeout)
}.await()
}
if (notificationArrived) {
/**
* NOTE: the query can still have zero results despite the
* notification. That happens when the 'start' URI param is
* higher than the ID of the new row in the database. Not
* an error.
*/
result = transaction {
// addLogger(StdOutSqlLogger)
TalerIncomingPaymentEntity.find { startCmpOp }.orderTaler(delta)
}
}
}
/**
* Whether because of a timeout or a notification or of never slept, here it
* proceeds to the response (== resultOrWait.first IS EFFECTIVE).
*/
val maybeNewPayments = result
if (maybeNewPayments.isNotEmpty()) {
transaction {
maybeNewPayments.subList(
0,
min(abs(delta), maybeNewPayments.size)
).forEach {
history.incoming_transactions.add(
TalerIncomingBankTransaction(
// Rounded timestamp
date = GnunetTimestamp(it.timestampMs / 1000L),
row_id = it.id.value,
amount = "${it.payment.currency}:${it.payment.amount}",
reserve_pub = it.reservePublicKey,
credit_account = buildIbanPaytoUri(
it.payment.bankAccount.iban,
it.payment.bankAccount.bankCode,
it.payment.bankAccount.accountHolder,
),
debit_account = it.debtorPaytoUri
)
)
}
}
}
if (history.incoming_transactions.size == 0) {
call.respond(HttpStatusCode.NoContent)
return
}
return call.respond(
status = HttpStatusCode.OK,
TextContent(customConverter(history), ContentType.Application.Json)
)
}
/**
* This call proxies /admin/add/incoming to the Sandbox,
* which is the service keeping the transactions ledger.
* The credentials are ASSUMED to be exchange/x (user/pass).
*
* In the future, a dedicate "add-incoming" facade should
* be provided, offering the mean to store the credentials
* at configuration time.
*/
private suspend fun addIncoming(call: ApplicationCall) {
val facadeId = ensureNonNull(call.parameters["fcid"])
val currentBody = call.receive()
val fromDb = transaction {
val f = FacadeEntity.findByName(facadeId) ?: throw notFound("facade $facadeId not found")
val facadeState = FacadeStateEntity.find {
FacadeStateTable.facade eq f.id
}.firstOrNull() ?: throw internalServerError("facade $facadeId has no state!")
val conn = NexusBankConnectionEntity.findByName(facadeState.bankConnection) ?: throw internalServerError(
"state of facade $facadeId has no bank connection!"
)
val ebicsData = NexusEbicsSubscribersTable.select {
NexusEbicsSubscribersTable.nexusBankConnection eq conn.id
}.firstOrNull() ?: throw internalServerError(
"Connection '${conn.connectionId}' doesn't have EBICS"
)
// Resort Sandbox URL from EBICS endpoint.
val sandboxUrl = URL(ebicsData[NexusEbicsSubscribersTable.ebicsURL])
// NOTE: the exchange username must be 'exchange', at the Sandbox.
return@transaction Pair(
url {
protocol = URLProtocol(sandboxUrl.protocol, 80)
host = sandboxUrl.host
if (sandboxUrl.port != 80)
port = sandboxUrl.port
path(
"demobanks",
"default",
"taler-wire-gateway",
"exchange",
"admin",
"add-incoming"
)
},
facadeState.bankAccount
)
}
val client = HttpClient { followRedirects = true }
try {
client.post(fromDb.first) {
setBody(currentBody)
basicAuth("exchange", "x")
contentType(ContentType.Application.Json)
}
} catch (e: ClientRequestException) {
logger.error("Proxying /admin/add/incoming to the Sandbox failed: $e")
} catch (e: Exception) {
logger.error("Could not proxy /admin/add/incoming to the Sandbox: $e")
}
/**
* At this point, Sandbox booked the payment. Now the "row_id"
* value to put in the response needs to be resorted; that may
* be known by fetching a fresh C52 report, then let Nexus ingest
* the result, and finally _optimistically_ pick the latest entry
* in the received payments. */
fetchBankAccountTransactions(
client,
FetchSpecLatestJson(
FetchLevel.REPORT,
null
),
fromDb.second
)
/**
* The latest incoming payment should now be found among
* the ingested ones.
*/
val lastIncomingPayment = transaction {
val lastRecord = TalerIncomingPaymentEntity.all().last()
return@transaction Pair(lastRecord.id.value, lastRecord.timestampMs)
}
call.respond(object {
val row_id = lastIncomingPayment.first
val timestamp = GnunetTimestamp(lastIncomingPayment.second / 1000L)
})
}
private fun getCurrency(facadeName: String): String {
return transaction {
getFacadeState(facadeName).currency
}
}
fun talerFacadeRoutes(route: Route) {
route.get("/config") {
val facadeId = ensureNonNull(call.parameters["fcid"])
call.request.requirePermission(
PermissionQuery("facade", facadeId, "facade.talerwiregateway.transfer"),
PermissionQuery("facade", facadeId, "facade.talerwiregateway.history")
)
call.respond(object {
val version = "0.0.0"
val name = "taler-wire-gateway"
val currency = getCurrency(facadeId)
})
return@get
}
route.post("/transfer") {
talerTransfer(call)
return@post
}
route.get("/history/outgoing") {
historyOutgoing(call)
return@get
}
route.get("/history/incoming") {
historyIncoming(call)
return@get
}
route.post("/admin/add-incoming") {
addIncoming(call)
return@post
}
route.get("") {
call.respondText("Hello, this is a Taler Facade")
return@get
}
}