helpers.kt (4885B)
1 /* 2 * This file is part of LibEuFin. 3 * Copyright (C) 2024-2025 Taler Systems S.A. 4 * 5 * LibEuFin is free software; you can redistribute it and/or modify 6 * it under the terms of the GNU Affero General Public License as 7 * published by the Free Software Foundation; either version 3, or 8 * (at your option) any later version. 9 * 10 * LibEuFin is distributed in the hope that it will be useful, but 11 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY 12 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General 13 * Public License for more details. 14 * 15 * You should have received a copy of the GNU Affero General Public 16 * License along with LibEuFin; see the file COPYING. If not, see 17 * <http://www.gnu.org/licenses/> 18 */ 19 20 package tech.libeufin.common.db 21 22 import kotlinx.coroutines.coroutineScope 23 import kotlinx.coroutines.flow.Flow 24 import kotlinx.coroutines.flow.first 25 import kotlinx.coroutines.launch 26 import kotlinx.coroutines.withTimeoutOrNull 27 import tech.libeufin.common.HistoryParams 28 import tech.libeufin.common.PageParams 29 import java.sql.PreparedStatement 30 import java.sql.ResultSet 31 import kotlin.math.abs 32 33 /** Apply paging logic to a sql query */ 34 suspend fun <T> DbPool.page( 35 params: PageParams, 36 idName: String, 37 query: String, 38 args: TalerStatement.() -> Unit = {}, 39 map: (ResultSet) -> T 40 ): List<T> { 41 val backward = params.limit < 0 42 val pageQuery = """ 43 $query 44 $idName ${if (backward) '<' else '>'} ? 45 ORDER BY $idName ${if (backward) "DESC" else "ASC"} 46 LIMIT ? 47 """ 48 return serializable(pageQuery) { 49 args() 50 bind(params.offset) 51 bind(abs(params.limit)) 52 all { map(it) } 53 } 54 } 55 56 /** 57 * The following function returns the list of transactions, according 58 * to the history parameters and perform long polling when necessary 59 */ 60 suspend fun <T> DbPool.poolHistory( 61 params: HistoryParams, 62 bankAccountId: Long, 63 listen: suspend (Long, suspend (Flow<Long>) -> List<T>) -> List<T>, 64 query: String, 65 accountColumn: String = "bank_account_id", 66 map: (ResultSet) -> T 67 ): List<T> { 68 69 suspend fun load(): List<T> = page( 70 params.page, 71 "bank_transaction_id", 72 "$query $accountColumn=? AND", 73 { 74 bind(bankAccountId) 75 }, 76 map 77 ) 78 79 // When going backward there is always at least one transaction or none 80 return if (params.page.limit >= 0 && params.polling.timeout_ms > 0) { 81 listen(bankAccountId) { flow -> 82 coroutineScope { 83 // Start buffering notification before loading transactions to not miss any 84 val polling = launch { 85 withTimeoutOrNull(params.polling.timeout_ms) { 86 flow.first { it > params.page.offset } // Always forward so > 87 } 88 } 89 // Initial loading 90 val init = load() 91 // Long polling if we found no transactions 92 if (init.isEmpty()) { 93 if (polling.join() != null) { 94 load() 95 } else { 96 init 97 } 98 } else { 99 polling.cancel() 100 init 101 } 102 } 103 } 104 } else { 105 load() 106 } 107 } 108 109 /** 110 * The following function returns the list of transactions, according 111 * to the history parameters and perform long polling when necessary 112 */ 113 suspend fun <T> DbPool.poolHistoryGlobal( 114 params: HistoryParams, 115 listen: suspend (suspend (Flow<Long>) -> List<T>) -> List<T>, 116 query: String, 117 idColumnValue: String, 118 map: (ResultSet) -> T 119 ): List<T> { 120 121 suspend fun load(): List<T> = page( 122 params.page, 123 idColumnValue, 124 query, 125 map=map 126 ) 127 128 // When going backward there is always at least one transaction or none 129 return if (params.page.limit >= 0 && params.polling.timeout_ms > 0) { 130 listen { flow -> 131 coroutineScope { 132 // Start buffering notification before loading transactions to not miss any 133 val polling = launch { 134 withTimeoutOrNull(params.polling.timeout_ms) { 135 flow.first { it > params.page.offset } // Always forward so > 136 } 137 } 138 // Initial loading 139 val init = load() 140 // Long polling if we found no transactions 141 if (init.isEmpty()) { 142 if (polling.join() != null) { 143 load() 144 } else { 145 init 146 } 147 } else { 148 polling.cancel() 149 init 150 } 151 } 152 } 153 } else { 154 load() 155 } 156 }