libeufin

Integration and sandbox testing for FinTech APIs and data formats
Log | Files | Refs | Submodules | README | LICENSE

helpers.kt (4885B)


      1 /*
      2  * This file is part of LibEuFin.
      3  * Copyright (C) 2024-2025 Taler Systems S.A.
      4  *
      5  * LibEuFin is free software; you can redistribute it and/or modify
      6  * it under the terms of the GNU Affero General Public License as
      7  * published by the Free Software Foundation; either version 3, or
      8  * (at your option) any later version.
      9  *
     10  * LibEuFin is distributed in the hope that it will be useful, but
     11  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
     12  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Affero General
     13  * Public License for more details.
     14  *
     15  * You should have received a copy of the GNU Affero General Public
     16  * License along with LibEuFin; see the file COPYING.  If not, see
     17  * <http://www.gnu.org/licenses/>
     18  */
     19 
     20 package tech.libeufin.common.db
     21 
     22 import kotlinx.coroutines.coroutineScope
     23 import kotlinx.coroutines.flow.Flow
     24 import kotlinx.coroutines.flow.first
     25 import kotlinx.coroutines.launch
     26 import kotlinx.coroutines.withTimeoutOrNull
     27 import tech.libeufin.common.HistoryParams
     28 import tech.libeufin.common.PageParams
     29 import java.sql.PreparedStatement
     30 import java.sql.ResultSet
     31 import kotlin.math.abs
     32 
     33 /** Apply paging logic to a sql query */
     34 suspend fun <T> DbPool.page(
     35     params: PageParams,
     36     idName: String,
     37     query: String,
     38     args: TalerStatement.() -> Unit = {},
     39     map: (ResultSet) -> T
     40 ): List<T> {
     41     val backward = params.limit < 0
     42     val pageQuery = """
     43         $query
     44         $idName ${if (backward) '<' else '>'} ?
     45         ORDER BY $idName ${if (backward) "DESC" else "ASC"}
     46         LIMIT ?
     47     """
     48     return serializable(pageQuery) {
     49         args()
     50         bind(params.offset)
     51         bind(abs(params.limit))
     52         all { map(it) }
     53     }
     54 }
     55 
     56 /**
     57 * The following function returns the list of transactions, according
     58 * to the history parameters and perform long polling when necessary
     59 */
     60 suspend fun <T> DbPool.poolHistory(
     61     params: HistoryParams, 
     62     bankAccountId: Long,
     63     listen: suspend (Long, suspend (Flow<Long>) -> List<T>) -> List<T>,
     64     query: String,
     65     accountColumn: String = "bank_account_id",
     66     map: (ResultSet) -> T
     67 ): List<T> {
     68 
     69     suspend fun load(): List<T> = page(
     70         params.page, 
     71         "bank_transaction_id", 
     72         "$query $accountColumn=? AND", 
     73         {
     74             bind(bankAccountId)
     75         },
     76         map
     77     )
     78     
     79     // When going backward there is always at least one transaction or none
     80     return if (params.page.limit >= 0 && params.polling.timeout_ms > 0) {
     81         listen(bankAccountId) { flow ->
     82             coroutineScope {
     83                 // Start buffering notification before loading transactions to not miss any
     84                 val polling = launch {
     85                     withTimeoutOrNull(params.polling.timeout_ms) {
     86                         flow.first { it > params.page.offset } // Always forward so >
     87                     }
     88                 }    
     89                 // Initial loading
     90                 val init = load()
     91                 // Long polling if we found no transactions
     92                 if (init.isEmpty()) {
     93                     if (polling.join() != null) {
     94                         load()
     95                     } else {
     96                         init
     97                     }
     98                 } else {
     99                     polling.cancel()
    100                     init
    101                 }
    102             }
    103         }
    104     } else {
    105         load()
    106     }
    107 }
    108 
    109 /**
    110 * The following function returns the list of transactions, according
    111 * to the history parameters and perform long polling when necessary
    112 */
    113 suspend fun <T> DbPool.poolHistoryGlobal(
    114     params: HistoryParams, 
    115     listen: suspend (suspend (Flow<Long>) -> List<T>) -> List<T>,
    116     query: String,
    117     idColumnValue: String,
    118     map: (ResultSet) -> T
    119 ): List<T> {
    120 
    121     suspend fun load(): List<T> = page(
    122         params.page, 
    123         idColumnValue,
    124         query,
    125         map=map
    126     )
    127 
    128     // When going backward there is always at least one transaction or none
    129     return if (params.page.limit >= 0 && params.polling.timeout_ms > 0) {
    130         listen { flow ->
    131             coroutineScope {
    132                 // Start buffering notification before loading transactions to not miss any
    133                 val polling = launch {
    134                     withTimeoutOrNull(params.polling.timeout_ms) {
    135                         flow.first { it > params.page.offset } // Always forward so >
    136                     }
    137                 }    
    138                 // Initial loading
    139                 val init = load()
    140                 // Long polling if we found no transactions
    141                 if (init.isEmpty()) {
    142                     if (polling.join() != null) {
    143                         load()
    144                     } else {
    145                         init
    146                     }
    147                 } else {
    148                     polling.cancel()
    149                     init
    150                 }
    151             }
    152         }
    153     } else {
    154         load()
    155     }
    156 }