libeufin

Integration and sandbox testing for FinTech APIs and data formats
Log | Files | Refs | Submodules | README | LICENSE

notifications.kt (3305B)


      1 /*
      2  * This file is part of LibEuFin.
      3  * Copyright (C) 2024 Taler Systems S.A.
      4  *
      5  * LibEuFin is free software; you can redistribute it and/or modify
      6  * it under the terms of the GNU Affero General Public License as
      7  * published by the Free Software Foundation; either version 3, or
      8  * (at your option) any later version.
      9  *
     10  * LibEuFin is distributed in the hope that it will be useful, but
     11  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
     12  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Affero General
     13  * Public License for more details.
     14  *
     15  * You should have received a copy of the GNU Affero General Public
     16  * License along with LibEuFin; see the file COPYING.  If not, see
     17  * <http://www.gnu.org/licenses/>
     18  */
     19 
     20 package tech.libeufin.common.db
     21 
     22 import kotlinx.coroutines.delay
     23 import kotlinx.coroutines.flow.Flow
     24 import kotlinx.coroutines.flow.MutableSharedFlow
     25 import kotlinx.coroutines.runBlocking
     26 import org.postgresql.ds.PGSimpleDataSource
     27 import org.slf4j.Logger
     28 import tech.libeufin.common.ExpoBackoffDecorr
     29 import tech.libeufin.common.fmtLog
     30 import java.util.concurrent.ConcurrentHashMap
     31 
     32 // SharedFlow that are manually counted for manual garbage collection
     33 class CountedSharedFlow<T> {
     34     val flow: MutableSharedFlow<T> = MutableSharedFlow()
     35     var count: Int = 0
     36 }
     37 
     38 fun watchNotifications(
     39     pgSource: PGSimpleDataSource, 
     40     schema: String,
     41     logger: Logger,
     42     listeners: Map<String, (suspend (String) -> Unit)>
     43 ) {
     44     val backoff = ExpoBackoffDecorr()
     45     // Run notification logic in a separated thread
     46     kotlin.concurrent.thread(isDaemon = true) {
     47         runBlocking {
     48             while (true) {
     49                 try {
     50                     val conn = pgSource.pgConnection(schema)
     51 
     52                     // Listen to all notifications channels
     53                     for (channel in listeners.keys) {
     54                         conn.execSQLUpdate("LISTEN $channel")
     55                     }
     56 
     57                     backoff.reset()
     58 
     59                     while (true) {
     60                         conn.getNotifications(0) // Block until we receive at least one notification
     61                             .forEach {
     62                             // Dispatch
     63                             try {
     64                                 listeners[it.name]!!(it.parameter)
     65                             } catch (e: Exception) {
     66                                 throw Exception("channel ${it.name} with input '${it.parameter}'", e)
     67                             }
     68                         }
     69                     }
     70                 } catch (e: Exception) {
     71                     e.fmtLog(logger)
     72                     delay(backoff.next())
     73                 }
     74             }
     75         }
     76     }
     77 }
     78 
     79 /** Listen to flow from [map] for [key] using [lambda]*/
     80 suspend fun <R, K, V> listen(map: ConcurrentHashMap<K, CountedSharedFlow<V>>, key: K, lambda: suspend (Flow<V>) -> R): R {
     81     // Register listener, create a new flow if missing
     82     val flow = map.compute(key) { _, v ->
     83         val tmp = v ?: CountedSharedFlow()
     84         tmp.count++
     85         tmp
     86     }!!.flow
     87 
     88     try {
     89         return lambda(flow)
     90     } finally {
     91         // Unregister listener, removing unused flow
     92         map.compute(key) { _, v ->
     93             v!!
     94             v.count--
     95             if (v.count > 0) v else null
     96         }
     97     }
     98 }