/* This file is part of TALER Copyright (C) 2024 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation; either version 3, or (at your option) any later version. TALER is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with TALER; see the file COPYING. If not, see */ /** * @file taler-merchant-depositcheck.c * @brief Process that inquires with the exchange for deposits that should have been wired * @author Christian Grothoff */ #include "platform.h" #include #include #include #include "taler_merchantdb_lib.h" #include "taler_merchantdb_plugin.h" #include /** * How many requests do we make at most in parallel to the same exchange? */ #define CONCURRENCY_LIMIT 32 /** * Information we keep per exchange. */ struct Child { /** * Kept in a DLL. */ struct Child *next; /** * Kept in a DLL. */ struct Child *prev; /** * The child process. */ struct GNUNET_OS_Process *process; /** * Wait handle. */ struct GNUNET_ChildWaitHandle *cwh; /** * Which exchange is this state for? */ char *base_url; /** * Task to restart the child. */ struct GNUNET_SCHEDULER_Task *rt; /** * When should the child be restarted at the earliest? */ struct GNUNET_TIME_Absolute next_start; /** * Current minimum delay between restarts, grows * exponentially if child exits before this time. */ struct GNUNET_TIME_Relative rd; }; /** * Information we keep per exchange interaction. */ struct ExchangeInteraction { /** * Kept in a DLL. */ struct ExchangeInteraction *next; /** * Kept in a DLL. */ struct ExchangeInteraction *prev; /** * Handle for exchange interaction. */ struct TALER_EXCHANGE_DepositGetHandle *dgh; /** * Wire deadline for the deposit. */ struct GNUNET_TIME_Absolute wire_deadline; /** * Target account hash of the deposit. */ struct TALER_MerchantWireHashP h_wire; /** * Deposited amount. */ struct TALER_Amount amount_with_fee; /** * Deposit fee paid. */ struct TALER_Amount deposit_fee; /** * Public key of the deposited coin. */ struct TALER_CoinSpendPublicKeyP coin_pub; /** * Hash over the @e contract_terms. */ struct TALER_PrivateContractHashP h_contract_terms; /** * Merchant instance's private key. */ struct TALER_MerchantPrivateKeyP merchant_priv; /** * Serial number of the row in the deposits table * that we are processing. */ uint64_t deposit_serial; /** * The instance the deposit belongs to. */ char *instance_id; }; /** * Head of list of children we forked. */ static struct Child *c_head; /** * Tail of list of children we forked. */ static struct Child *c_tail; /** * Key material of the exchange. */ static struct TALER_EXCHANGE_Keys *keys; /** * Handle for active /keys request. */ static struct TALER_EXCHANGE_GetKeysHandle *gkh; /** * Head of list of active exchange interactions. */ static struct ExchangeInteraction *w_head; /** * Tail of list of active exchange interactions. */ static struct ExchangeInteraction *w_tail; /** * Number of active entries in the @e w_head list. */ static uint64_t w_count; /** * Notification handler from database on new work. */ static struct GNUNET_DB_EventHandler *eh; /** * The merchant's configuration. */ static const struct GNUNET_CONFIGURATION_Handle *cfg; /** * Name of the configuration file we use. */ static char *cfg_filename; /** * Our database plugin. */ static struct TALER_MERCHANTDB_Plugin *db_plugin; /** * Next wire deadline that @e task is scheduled for. */ static struct GNUNET_TIME_Absolute next_deadline; /** * Next task to run, if any. */ static struct GNUNET_SCHEDULER_Task *task; /** * Handle to the context for interacting with the exchange. */ static struct GNUNET_CURL_Context *ctx; /** * Scheduler context for running the @e ctx. */ static struct GNUNET_CURL_RescheduleContext *rc; /** * Which exchange are we monitoring? NULL if we * are the parent of the workers. */ static char *exchange_url; /** * Value to return from main(). 0 on success, non-zero on errors. */ static int global_ret; /** * #GNUNET_YES if we are in test mode and should exit when idle. */ static int test_mode; /** * We're being aborted with CTRL-C (or SIGTERM). Shut down. * * @param cls closure */ static void shutdown_task (void *cls) { struct Child *c; struct ExchangeInteraction *w; (void) cls; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Running shutdown\n"); if (NULL != eh) { db_plugin->event_listen_cancel (eh); eh = NULL; } if (NULL != task) { GNUNET_SCHEDULER_cancel (task); task = NULL; } if (NULL != gkh) { TALER_EXCHANGE_get_keys_cancel (gkh); gkh = NULL; } while (NULL != (w = w_head)) { GNUNET_CONTAINER_DLL_remove (w_head, w_tail, w); if (NULL != w->dgh) { TALER_EXCHANGE_deposits_get_cancel (w->dgh); w->dgh = NULL; } w_count--; GNUNET_free (w->instance_id); GNUNET_free (w); } while (NULL != (c = c_head)) { GNUNET_CONTAINER_DLL_remove (c_head, c_tail, c); if (NULL != c->rt) { GNUNET_SCHEDULER_cancel (c->rt); c->rt = NULL; } if (NULL != c->cwh) { GNUNET_wait_child_cancel (c->cwh); c->cwh = NULL; } if (NULL != c->process) { enum GNUNET_OS_ProcessStatusType type = GNUNET_OS_PROCESS_UNKNOWN; unsigned long code = 0; GNUNET_break (0 == GNUNET_OS_process_kill (c->process, SIGTERM)); GNUNET_break (GNUNET_OK == GNUNET_OS_process_wait_status (c->process, &type, &code)); if ( (GNUNET_OS_PROCESS_EXITED != type) || (0 != code) ) GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Process for exchange %s had trouble (%d/%d)\n", c->base_url, (int) type, (int) code); GNUNET_OS_process_destroy (c->process); } GNUNET_free (c->base_url); GNUNET_free (c); } if (NULL != db_plugin) { db_plugin->rollback (db_plugin->cls); /* just in case */ TALER_MERCHANTDB_plugin_unload (db_plugin); db_plugin = NULL; } cfg = NULL; if (NULL != ctx) { GNUNET_CURL_fini (ctx); ctx = NULL; } if (NULL != rc) { GNUNET_CURL_gnunet_rc_destroy (rc); rc = NULL; } } /** * Task to get more deposits to work on from the database. * * @param cls NULL */ static void select_work (void *cls); /** * Make sure to run the select_work() task at * the @a next_deadline. * * @param deadline time when work becomes ready */ static void run_at (struct GNUNET_TIME_Absolute deadline) { if ( (NULL != task) && (GNUNET_TIME_absolute_cmp (deadline, >, next_deadline)) ) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Not scheduling for %s yet, already have earlier task pending\n", GNUNET_TIME_absolute2s (deadline)); return; } if (NULL == keys) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Not scheduling for %s yet, no /keys available\n", GNUNET_TIME_absolute2s (deadline)); return; /* too early */ } if (NULL != task) GNUNET_SCHEDULER_cancel (task); next_deadline = deadline; task = GNUNET_SCHEDULER_add_at (deadline, &select_work, NULL); } /** * Function called with detailed wire transfer data. * * @param cls closure with a `struct ExchangeInteraction *` * @param dr HTTP response data */ static void deposit_get_cb (void *cls, const struct TALER_EXCHANGE_GetDepositResponse *dr) { struct ExchangeInteraction *w = cls; switch (dr->hr.http_status) { case MHD_HTTP_OK: { enum GNUNET_DB_QueryStatus qs; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Exchange returned wire transfer over %s for deposited coin %s\n", TALER_amount2s (&dr->details.ok.coin_contribution), TALER_B2S (&w->coin_pub)); qs = db_plugin->insert_deposit_to_transfer (db_plugin->cls, w->deposit_serial, &dr->details.ok); if (qs < 0) { GNUNET_break (0); GNUNET_SCHEDULER_shutdown (); return; } break; } case MHD_HTTP_ACCEPTED: { /* got a 'preliminary' reply from the exchange, remember our target UUID */ enum GNUNET_DB_QueryStatus qs; struct GNUNET_TIME_Timestamp now; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Exchange returned KYC requirement (%d/%d) for deposited coin %s\n", dr->details.accepted.kyc_ok, dr->details.accepted.aml_decision, TALER_B2S (&w->coin_pub)); now = GNUNET_TIME_timestamp_get (); qs = db_plugin->account_kyc_set_status ( db_plugin->cls, w->instance_id, &w->h_wire, exchange_url, dr->details.accepted.requirement_row, NULL, NULL, now, dr->details.accepted.kyc_ok, dr->details.accepted.aml_decision); if (qs < 0) { GNUNET_break (0); GNUNET_SCHEDULER_shutdown (); return; } if (dr->details.accepted.kyc_ok && (TALER_AML_NORMAL == dr->details.accepted.aml_decision)) { struct GNUNET_TIME_Absolute future_retry; future_retry = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_HOURS); qs = db_plugin->update_deposit_confirmation_status ( db_plugin->cls, w->deposit_serial, future_retry, "Exchange reported 202 Accepted but no KYC block"); if (qs < 0) { GNUNET_break (0); GNUNET_SCHEDULER_shutdown (); return; } } break; } default: { enum GNUNET_DB_QueryStatus qs; struct GNUNET_TIME_Absolute future_retry; char *msg; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Exchange %s returned tracking failure for deposited coin %s\n", exchange_url, TALER_B2S (&w->coin_pub)); future_retry = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_HOURS); GNUNET_asprintf (&msg, "Unexpected exchange status %u (#%d, %s)\n", dr->hr.http_status, (int) dr->hr.ec, dr->hr.hint); qs = db_plugin->update_deposit_confirmation_status ( db_plugin->cls, w->deposit_serial, future_retry, msg); GNUNET_free (msg); if (qs < 0) { GNUNET_break (0); GNUNET_SCHEDULER_shutdown (); return; } return; } } /* end switch */ GNUNET_CONTAINER_DLL_remove (w_head, w_tail, w); w_count--; GNUNET_free (w); GNUNET_assert (NULL != keys); if ( (w_count < CONCURRENCY_LIMIT / 2) || (0 == w_count) ) task = GNUNET_SCHEDULER_add_now (&select_work, NULL); } /** * Typically called by `select_work`. * * @param cls NULL * @param deposit_serial identifies the deposit operation * @param wire_deadline when is the wire due * @param h_contract_terms hash of the contract terms * @param merchant_priv private key of the merchant * @param instance_id row ID of the instance * @param h_wire hash of the merchant's wire account into * @param amount_with_fee amount the exchange will deposit for this coin * @param deposit_fee fee the exchange will charge for this coin which the deposit was made * @param coin_pub public key of the deposited coin */ static void pending_deposits_cb ( void *cls, uint64_t deposit_serial, struct GNUNET_TIME_Absolute wire_deadline, const struct TALER_PrivateContractHashP *h_contract_terms, const struct TALER_MerchantPrivateKeyP *merchant_priv, const char *instance_id, const struct TALER_MerchantWireHashP *h_wire, const struct TALER_Amount *amount_with_fee, const struct TALER_Amount *deposit_fee, const struct TALER_CoinSpendPublicKeyP *coin_pub) { struct ExchangeInteraction *w = GNUNET_new (struct ExchangeInteraction); (void) cls; if (GNUNET_TIME_absolute_is_future (wire_deadline)) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Pending deposit has deadline in the future at %s\n", GNUNET_TIME_absolute2s (wire_deadline)); run_at (wire_deadline); return; } w->deposit_serial = deposit_serial; w->wire_deadline = wire_deadline; w->h_contract_terms = *h_contract_terms; w->merchant_priv = *merchant_priv; w->h_wire = *h_wire; w->amount_with_fee = *amount_with_fee; w->deposit_fee = *deposit_fee; w->coin_pub = *coin_pub; w->instance_id = GNUNET_strdup (instance_id); GNUNET_CONTAINER_DLL_insert (w_head, w_tail, w); w_count++; GNUNET_assert (NULL != keys); if (GNUNET_TIME_absolute_is_past (keys->key_data_expiration.abs_time)) { /* Parent should re-start us, then we will re-fetch /keys */ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "/keys expired, shutting down\n"); GNUNET_SCHEDULER_shutdown (); return; } GNUNET_assert (NULL == w->dgh); w->dgh = TALER_EXCHANGE_deposits_get ( ctx, exchange_url, keys, &w->merchant_priv, &w->h_wire, &w->h_contract_terms, &w->coin_pub, GNUNET_TIME_UNIT_ZERO, &deposit_get_cb, w); } /** * Function called on events received from Postgres. * * @param cls closure, NULL * @param extra additional event data provided, timestamp with wire deadline * @param extra_size number of bytes in @a extra */ static void db_notify (void *cls, const void *extra, size_t extra_size) { struct GNUNET_TIME_Absolute deadline; struct GNUNET_TIME_AbsoluteNBO nbo_deadline; (void) cls; if (sizeof (nbo_deadline) != extra_size) { GNUNET_break (0); return; } if (0 != w_count) return; /* already at work! */ memcpy (&nbo_deadline, extra, extra_size); deadline = GNUNET_TIME_absolute_ntoh (nbo_deadline); run_at (deadline); } static void select_work (void *cls) { bool retry = false; uint64_t limit = CONCURRENCY_LIMIT - w_count; (void) cls; task = NULL; GNUNET_assert (w_count <= CONCURRENCY_LIMIT); GNUNET_assert (NULL != keys); if (0 == limit) { GNUNET_break (0); return; } if (GNUNET_TIME_absolute_is_past (keys->key_data_expiration.abs_time)) { /* Parent should re-start us, then we will re-fetch /keys */ GNUNET_SCHEDULER_shutdown (); return; } while (1) { enum GNUNET_DB_QueryStatus qs; db_plugin->preflight (db_plugin->cls); if (retry) limit = 1; qs = db_plugin->lookup_pending_deposits (db_plugin->cls, exchange_url, limit, retry, &pending_deposits_cb, NULL); GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Looking up pending deposits query status was %d\n", (int) qs); switch (qs) { case GNUNET_DB_STATUS_HARD_ERROR: case GNUNET_DB_STATUS_SOFT_ERROR: GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Transaction failed!\n"); global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); return; case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: if (test_mode) { GNUNET_SCHEDULER_shutdown (); return; } if (retry) return; /* nothing left */ retry = true; continue; case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: default: /* wait for async completion, then select more work. */ return; } } } /** * Function called with information about who is auditing * a particular exchange and what keys the exchange is using. * The ownership over the @a keys object is passed to * the callee, thus it is given explicitly and not * (only) via @a kr. * * @param cls closure, NULL * @param kr response from /keys * @param[in] in_keys keys object passed to callback with * reference counter of 1. Must be freed by callee * using #TALER_EXCHANGE_keys_decref(). NULL on failure. */ static void keys_cb ( void *cls, const struct TALER_EXCHANGE_KeysResponse *kr, struct TALER_EXCHANGE_Keys *in_keys) { gkh = NULL; if (NULL == in_keys) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Failed to download %skeys\n", exchange_url); GNUNET_SCHEDULER_shutdown (); return; } keys = TALER_EXCHANGE_keys_incref (in_keys); task = GNUNET_SCHEDULER_add_now (&select_work, NULL); } /** * Start a copy of this process with the exchange URL * set to the given @a base_url * * @param base_url base URL to run with */ static struct GNUNET_OS_Process * start_worker (const char *base_url) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Launching worker for exchange `%s' using `%s`\n", base_url, cfg_filename); return GNUNET_OS_start_process (GNUNET_OS_INHERIT_STD_ALL, NULL, NULL, NULL, "taler-merchant-depositcheck", "taler-merchant-depositcheck", "-c", cfg_filename, "-e", base_url, "-L", "INFO", test_mode ? "-t" : NULL, NULL); } /** * Restart worker process for the given child. * * @param cls a `struct Child *` that needs a worker. */ static void restart_child (void *cls); /** * Function called upon death or completion of a child process. * * @param cls a `struct Child *` * @param type type of the process * @param exit_code status code of the process */ static void child_done_cb (void *cls, enum GNUNET_OS_ProcessStatusType type, long unsigned int exit_code) { struct Child *c = cls; c->cwh = NULL; if ( (GNUNET_OS_PROCESS_EXITED != type) || (0 != exit_code) ) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Process for exchange %s had trouble (%d/%d)\n", c->base_url, (int) type, (int) exit_code); GNUNET_SCHEDULER_shutdown (); global_ret = 1; return; } GNUNET_OS_process_destroy (c->process); c->process = NULL; if (test_mode && (! GNUNET_TIME_relative_is_zero (c->rd)) ) { return; } if (GNUNET_TIME_absolute_is_future (c->next_start)) c->rd = GNUNET_TIME_STD_BACKOFF (c->rd); else c->rd = GNUNET_TIME_UNIT_SECONDS; c->rt = GNUNET_SCHEDULER_add_at (c->next_start, &restart_child, c); } static void restart_child (void *cls) { struct Child *c = cls; c->rt = NULL; c->next_start = GNUNET_TIME_relative_to_absolute (c->rd); c->process = start_worker (c->base_url); if (NULL == c->process) { GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "exec"); global_ret = 1; GNUNET_SCHEDULER_shutdown (); return; } c->cwh = GNUNET_wait_child (c->process, &child_done_cb, c); } /** * Function to iterate over section. * * @param cls closure * @param section name of the section */ static void cfg_iter_cb (void *cls, const char *section) { char *base_url; struct Child *c; if (0 != strncasecmp (section, "merchant-exchange-", strlen ("merchant-exchange-"))) return; if (GNUNET_YES == GNUNET_CONFIGURATION_get_value_yesno (cfg, section, "DISABLED")) return; if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg, section, "EXCHANGE_BASE_URL", &base_url)) { GNUNET_log_config_missing (GNUNET_ERROR_TYPE_WARNING, section, "EXCHANGE_BASE_URL"); return; } c = GNUNET_new (struct Child); c->rd = GNUNET_TIME_UNIT_SECONDS; c->base_url = base_url; GNUNET_CONTAINER_DLL_insert (c_head, c_tail, c); c->rt = GNUNET_SCHEDULER_add_now (&restart_child, c); } /** * First task. * * @param cls closure, NULL * @param args remaining command-line arguments * @param cfgfile name of the configuration file used (for saving, can be NULL!) * @param c configuration */ static void run (void *cls, char *const *args, const char *cfgfile, const struct GNUNET_CONFIGURATION_Handle *c) { (void) args; cfg = c; cfg_filename = GNUNET_strdup (cfgfile); GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Running with configuration %s\n", cfgfile); GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL); if (NULL == exchange_url) { GNUNET_CONFIGURATION_iterate_sections (c, &cfg_iter_cb, NULL); if (NULL == c_head) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "No exchanges found in configuration\n"); return; } return; } ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule, &rc); rc = GNUNET_CURL_gnunet_rc_create (ctx); if (NULL == ctx) { GNUNET_break (0); GNUNET_SCHEDULER_shutdown (); return; } if (NULL == (db_plugin = TALER_MERCHANTDB_plugin_load (cfg))) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to initialize DB subsystem\n"); GNUNET_SCHEDULER_shutdown (); global_ret = 1; return; } if (GNUNET_OK != db_plugin->connect (db_plugin->cls)) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to connect to database\n"); GNUNET_SCHEDULER_shutdown (); global_ret = 1; return; } { struct GNUNET_DB_EventHeaderP es = { .size = htons (sizeof (es)), .type = htons (TALER_DBEVENT_MERCHANT_NEW_WIRE_DEADLINE) }; eh = db_plugin->event_listen (db_plugin->cls, &es, GNUNET_TIME_UNIT_FOREVER_REL, &db_notify, NULL); } gkh = TALER_EXCHANGE_get_keys (ctx, exchange_url, NULL, &keys_cb, NULL); } /** * The main function of the taler-merchant-depositcheck * * @param argc number of arguments from the command line * @param argv command line arguments * @return 0 ok, 1 on error */ int main (int argc, char *const *argv) { struct GNUNET_GETOPT_CommandLineOption options[] = { GNUNET_GETOPT_option_string ('e', "exchange", "BASE_URL", "limit us to checking deposits of this exchange", &exchange_url), GNUNET_GETOPT_option_flag ('t', "test", "run in test mode and exit when idle", &test_mode), GNUNET_GETOPT_option_version (VERSION "-" VCS_VERSION), GNUNET_GETOPT_OPTION_END }; enum GNUNET_GenericReturnValue ret; if (GNUNET_OK != GNUNET_STRINGS_get_utf8_args (argc, argv, &argc, &argv)) return EXIT_INVALIDARGUMENT; TALER_OS_init (); ret = GNUNET_PROGRAM_run ( argc, argv, "taler-merchant-depositcheck", gettext_noop ( "background process that checks with the exchange on deposits that are past the wire deadline"), options, &run, NULL); GNUNET_free_nz ((void *) argv); if (GNUNET_SYSERR == ret) return EXIT_INVALIDARGUMENT; if (GNUNET_NO == ret) return EXIT_SUCCESS; return global_ret; } /* end of taler-merchant-depositcheck.c */