From 535b53cfc287e197ef5bfe3652ee123e847215fc Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sat, 6 Jan 2024 14:11:59 +0100 Subject: add taler-merchant-depositcheck to build --- src/backend/Makefile.am | 17 + src/backend/taler-merchant-depositcheck.c | 572 +++++++++++++++++++++--------- 2 files changed, 425 insertions(+), 164 deletions(-) (limited to 'src/backend') diff --git a/src/backend/Makefile.am b/src/backend/Makefile.am index 7978258d..2fe09eec 100644 --- a/src/backend/Makefile.am +++ b/src/backend/Makefile.am @@ -16,6 +16,7 @@ EXTRA_DIST = \ $(pkgcfg_DATA) bin_PROGRAMS = \ + taler-merchant-depositcheck \ taler-merchant-exchange \ taler-merchant-httpd \ taler-merchant-webhook \ @@ -226,6 +227,22 @@ taler_merchant_webhook_LDADD = \ taler_merchant_webhook_CFLAGS = \ $(AM_CFLAGS) +taler_merchant_depositcheck_SOURCES = \ + taler-merchant-depositcheck.c +taler_merchant_depositcheck_LDADD = \ + $(top_builddir)/src/backenddb/libtalermerchantdb.la \ + -ltalerexchange \ + -ltalerjson \ + -ltalerutil \ + -ltalerpq \ + -ljansson \ + -lgnunetcurl \ + -lgnunetjson \ + -lgnunetutil \ + -lcurl \ + $(XLIB) +taler_merchant_depositcheck_CFLAGS = \ + $(AM_CFLAGS) taler_merchant_wirewatch_SOURCES = \ taler-merchant-wirewatch.c diff --git a/src/backend/taler-merchant-depositcheck.c b/src/backend/taler-merchant-depositcheck.c index 6982d891..4364c9f5 100644 --- a/src/backend/taler-merchant-depositcheck.c +++ b/src/backend/taler-merchant-depositcheck.c @@ -26,37 +26,59 @@ #include "taler_merchantdb_plugin.h" #include +/** + * How many requests do we make at most in parallel to the same exchange? + */ +#define CONCURRENCY_LIMIT 32 + /** * Information we keep per exchange. */ -struct ExchangeState +struct Child { /** * Kept in a DLL. */ - struct ExchangeState *next; + struct Child *next; /** * Kept in a DLL. */ - struct ExchangeState *prev; + struct Child *prev; + + /** + * The child process. + */ + struct GNUNET_OS_Process *process; + + /** + * Wait handle. + */ + struct GNUNET_ChildWaitHandle *cwh; /** * Which exchange is this state for? */ - const char *base_url; + char *base_url; /** - * Key material of the exchange. + * Task to restart the child. */ - struct TALER_EXCHANGE_Keys *keys; + struct GNUNET_SCHEDULER_Task *rt; /** - * Handle for active /keys request. + * When should the child be restarted at the earliest? */ - struct TALER_EXCHANGE_GetKeysHandle *gkh; + struct GNUNET_TIME_Absolute next_start; + + /** + * Current minimum delay between restarts, grows + * exponentially if child exits befor this time. + */ + struct GNUNET_TIME_Relative rd; + }; @@ -76,9 +98,9 @@ struct ExchangeInteraction struct ExchangeInteraction *prev; /** - * Exchange we are interacting with. + * Handle for exchange interaction. */ - struct ExchangeState *es; + struct TALER_EXCHANGE_DepositGetHandle *dgh; /** * Wire deadline for the deposit. @@ -115,18 +137,39 @@ struct ExchangeInteraction */ struct TALER_MerchantPrivateKeyP merchant_priv; + /** + * Serial number of the row in the deposits table + * that we are processing. + */ + uint64_t deposit_serial; + + /** + * The instance the deposit belongs to. + */ + char *instance_id; + }; /** - * Head of list of exchanges we interact with. + * Head of list of children we forked. + */ +static struct Child *c_head; + +/** + * Tail of list of children we forked. + */ +static struct Child *c_tail; + +/** + * Key material of the exchange. */ -static struct ExchangeState *e_head; +static struct TALER_EXCHANGE_Keys *keys; /** - * Tail of list of exchanges we interact with. + * Handle for active /keys request. */ -static struct ExchangeState *e_tail; +static struct TALER_EXCHANGE_GetKeysHandle *gkh; /** * Head of list of active exchange interactions. @@ -138,6 +181,11 @@ static struct ExchangeInteraction *w_head; */ static struct ExchangeInteraction *w_tail; +/** + * Number of active entries in the @e w_head list. + */ +static uint64_t w_count; + /** * Notification handler from database on new work. */ @@ -148,6 +196,11 @@ static struct GNUNET_DB_EventHandler *eh; */ static const struct GNUNET_CONFIGURATION_Handle *cfg; +/** + * Name of the configuration file we use. + */ +static const char *cfg_filename; + /** * Our database plugin. */ @@ -173,6 +226,12 @@ static struct GNUNET_CURL_Context *ctx; */ static struct GNUNET_CURL_RescheduleContext *rc; +/** + * Which exchange are we monitoring? NULL if we + * are the parent of the workers. + */ +static char *exchange_url; + /** * Value to return from main(). 0 on success, non-zero on errors. */ @@ -192,7 +251,7 @@ static int test_mode; static void shutdown_task (void *cls) { - struct ExchangeState *es; + struct Child *c; struct ExchangeInteraction *w; (void) cls; @@ -208,24 +267,70 @@ shutdown_task (void *cls) GNUNET_SCHEDULER_cancel (task); task = NULL; } + if (NULL != gkh) + { + TALER_EXCHANGE_get_keys_cancel (gkh); + gkh = NULL; + } while (NULL != (w = w_head)) { GNUNET_CONTAINER_DLL_remove (w_head, w_tail, w); + if (NULL != w->dgh) + { + TALER_EXCHANGE_deposits_get_cancel (w->dgh); + w->dgh = NULL; + } + w_count--; + GNUNET_free (w->instance_id); GNUNET_free (w); } - while (NULL != (es = e_head)) + while (NULL != (c = c_head)) + { + enum GNUNET_OS_ProcessStatusType type; + unsigned long code; + + GNUNET_CONTAINER_DLL_remove (c_head, + c_tail, + c); + if (NULL != c->rt) + { + GNUNET_SCHEDULER_cancel (c->rt); + c->rt = NULL; + } + if (NULL != c->cwh) + { + GNUNET_wait_child_cancel (c->cwh); + c->cwh = NULL; + } + if (NULL != c->process) + { + GNUNET_break (0 == + GNUNET_OS_process_kill (c->process, + SIGTERM)); + GNUNET_break (GNUNET_OK == + GNUNET_OS_process_wait_status (c->process, + &type, + &code)); + if ( (GNUNET_OS_PROCESS_EXITED != type) || + (0 != code) ) + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Process for exchange %s had trouble (%d/%d)\n", + c->base_url, + (int) type, + (int) code); + GNUNET_OS_process_destroy (c->process); + } + GNUNET_free (c->base_url); + GNUNET_free (c); + } + if (NULL != db_plugin) { - GNUNET_CONTAINER_DLL_remove (e_head, - e_tail, - es); - GNUNET_free (es->base_url); - GNUNET_free (es); + db_plugin->rollback (db_plugin->cls); /* just in case */ + TALER_MERCHANTDB_plugin_unload (db_plugin); + db_plugin = NULL; } - db_plugin->rollback (db_plugin->cls); /* just in case */ - TALER_MERCHANTDB_plugin_unload (db_plugin); - db_plugin = NULL; cfg = NULL; if (NULL != ctx) { @@ -253,22 +358,21 @@ select_work (void *cls); * Make sure to run the select_work() task at * the @a next_deadline. * - * @param next_deadline deadline when work becomes ready + * @param deadline time when work becomes ready */ static void -run_at (struct GNUNET_TIME_Absolute next_deadline) +run_at (struct GNUNET_TIME_Absolute deadline) { if (GNUNET_TIME_absolute_cmp (deadline, - <, + >, next_deadline)) - { - if (NULL != task) - GNUNET_SCHEDULER_cancel (task); - next_deadline = deadline; - task = GNUNET_SCHEDULER_add_at (deadline, - &select_work, - NULL); - } + return; + if (NULL != task) + GNUNET_SCHEDULER_cancel (task); + next_deadline = deadline; + task = GNUNET_SCHEDULER_add_at (deadline, + &select_work, + NULL); } @@ -282,7 +386,7 @@ static void deposit_get_cb (void *cls, const struct TALER_EXCHANGE_GetDepositResponse *dr) { - struct ExchangeState *es = cls; + struct ExchangeInteraction *w = cls; switch (dr->hr.http_status) { @@ -293,10 +397,12 @@ deposit_get_cb (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Exchange returned wire transfer over %s for deposited coin %s\n", TALER_amount2s (&dr->details.ok.coin_contribution), - TALER_B2S (&tq->coin_pub)); - qs = TMH_db->insert_deposit_to_transfer (TMH_db->cls, - tq->deposit_serial, - &dr->details.ok); + TALER_B2S (&w->coin_pub)); + // FIXME: this must filter this particular entry from the + // select below for good! + qs = db_plugin->insert_deposit_to_transfer (db_plugin->cls, + w->deposit_serial, + &dr->details.ok); if (qs < 0) { GNUNET_break (0); @@ -316,13 +422,16 @@ deposit_get_cb (void *cls, "Exchange returned KYC requirement (%d/%d) for deposited coin %s\n", dr->details.accepted.kyc_ok, dr->details.accepted.aml_decision, - TALER_B2S (&tq->coin_pub)); + TALER_B2S (&w->coin_pub)); now = GNUNET_TIME_timestamp_get (); - qs = TMH_db->account_kyc_set_status ( - TMH_db->cls, - gorc->hc->instance->settings.id, - &tq->h_wire, - tq->exchange_url, + // FIXME: this must filter this particular entry from the + // select below, at least until the KYC/AML request is + // satisfied; how will we learn that? + qs = db_plugin->account_kyc_set_status ( + db_plugin->cls, + w->instance_id, + &w->h_wire, + exchange_url, dr->details.accepted.requirement_row, NULL, NULL, @@ -340,8 +449,9 @@ deposit_get_cb (void *cls, default: { GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Exchange returned tracking failure for deposited coin %s\n", - TALER_B2S (&tq->coin_pub)); + "Exchange %s returned tracking failure for deposited coin %s\n", + exchange_url, + TALER_B2S (&w->coin_pub)); /* FIXME: how to handle? */ return; } @@ -350,107 +460,26 @@ deposit_get_cb (void *cls, GNUNET_CONTAINER_DLL_remove (w_head, w_tail, w); + w_count--; GNUNET_free (w); - // FIXME: not the best condition to go for more work, - // one down exchange would halt us entirely! - if (NULL == w_head) + if ( (w_count < CONCURRENCY_LIMIT / 2) || + (0 == w_count) ) task = GNUNET_SCHEDULER_add_now (&select_work, NULL); } -/** - * Initiate request with the exchange about deposits. - * - * @param[in,out] exchange interaction handle - */ -static void -inquire_at_exchange (struct ExchangeInteraction *w) -{ - struct ExchangeState *es = w->es; - - GNUNET_assert (NULL == w->dgh); - w->dgh = TALER_EXCHANGE_deposits_get ( - ctx, - es->exchange_url, - es->keys, - &w->merchant_priv, - &w->h_wire, - &w->h_contract_terms, - &w->coin_pub, - GNUNET_TIME_UNIT_ZERO, - &deposit_get_cb, - w); -} - - -/** - * Function called with information about who is auditing - * a particular exchange and what keys the exchange is using. - * The ownership over the @a keys object is passed to - * the callee, thus it is given explicitly and not - * (only) via @a kr. - * - * @param cls closure - * @param kr response from /keys - * @param[in] keys keys object passed to callback with - * reference counter of 1. Must be freed by callee - * using #TALER_EXCHANGE_keys_decref(). NULL on failure. - */ -static void -keys_cb ( - void *cls, - const struct TALER_EXCHANGE_KeysResponse *kr, - struct TALER_EXCHANGE_Keys *keys) -{ - struct ExchangeState *es = cls; - - es->gkh = NULL; - if (NULL == keys) - return; - if (NULL != es->keys) - TALER_EXCHANGE_keys_decref (keys); - es->keys = TALER_EXCHANGE_keys_incref (keys); - /* Trigger all deposits blocked on fetching /keys */ - for (struct ExchangeInteraction *w = w_head; - NULL != w; - w = w->next) - { - if (w->es != es) - continue; - if (NULL != w->dgh) - continue; - inquire_at_exchange (w); - } -} - - -/** - * Download /keys from an exchange. - * - * @param[in,out] es exchange state - */ -static void -fetch_keys (struct ExchangeState *es) -{ - GNUNET_assert (NULL == es->gkh); - es->gkh = TALER_EXCHANGE_get_keys (ctx, - es->base_url, - es->keys, - &keys_cb, - es); -} - - /** * Typically called by `select_work`. * * @param cls NULL * @param deposit_serial identifies the deposit operation - * @param exchange_url URL of the exchange that issued @a coin_pub - * @param amount_with_fee amount the exchange will deposit for this coin - * @param deposit_fee fee the exchange will charge for this coin - * @param h_wire hash of the merchant's wire account into which the deposit was made + * @param wire_deadline when is the wire due + * @param h_contract_terms hash of the contract terms + * @param merchant_priv private key of the merchant + * @param instance_id row ID of the instance + * @param h_wire hash of the merchant's wire account into * @param amount_with_fee amount the exchange will deposit for this coin + * @param deposit_fee fee the exchange will charge for this coin which the deposit was made * @param coin_pub public key of the deposited coin */ static void @@ -458,16 +487,15 @@ pending_deposits_cb ( void *cls, uint64_t deposit_serial, struct GNUNET_TIME_Absolute wire_deadline, /* missing in DB! Funky migration needed! */ - const char *exchange_url, const struct TALER_PrivateContractHashP *h_contract_terms, const struct TALER_MerchantPrivateKeyP *merchant_priv, + const char *instance_id, const struct TALER_MerchantWireHashP *h_wire, const struct TALER_Amount *amount_with_fee, const struct TALER_Amount *deposit_fee, const struct TALER_CoinSpendPublicKeyP *coin_pub) { struct ExchangeInteraction *w = GNUNET_new (struct ExchangeInteraction); - struct ExchangeState *es = NULL; (void) cls; if (GNUNET_TIME_absolute_is_future (wire_deadline)) @@ -475,6 +503,7 @@ pending_deposits_cb ( run_at (wire_deadline); return; } + w->deposit_serial = deposit_serial; w->wire_deadline = wire_deadline; w->h_contract_terms = *h_contract_terms; w->merchant_priv = *merchant_priv; @@ -482,29 +511,30 @@ pending_deposits_cb ( w->amount_with_fee = *amount_with_fee; w->deposit_fee = *deposit_fee; w->coin_pub = *coin_pub; + w->instance_id = GNUNET_strdup (instance_id); GNUNET_CONTAINER_DLL_insert (w_head, w_tail, w); - for (es = e_head; NULL != es; es = es->next) - if (0 == strcmp (exchange_url, - es->base_url)) - break; - if (NULL == es) + w_count++; + GNUNET_assert (NULL != keys); + if (GNUNET_TIME_absolute_is_past (keys->key_data_expiration.abs_time)) { - es = GNUNET_new (struct ExchangeState); - es->base_url = GNUNET_strdup (exchange_url); - GNUNET_CONTAINER_DLL_insert (e_head, - e_tail, - es); - } - w->es = es; - if ( (NULL == es->keys) || - (GNUNET_TIME_absolute_is_past (es->keys->key_data_expiration)) ) - { - fetch_keys (es); + /* Parent should re-start us, then we will re-fetch /keys */ + GNUNET_SCHEDULER_shutdown (); return; } - inquire_at_exchange (w); + GNUNET_assert (NULL == w->dgh); + w->dgh = TALER_EXCHANGE_deposits_get ( + ctx, + exchange_url, + keys, + &w->merchant_priv, + &w->h_wire, + &w->h_contract_terms, + &w->coin_pub, + GNUNET_TIME_UNIT_ZERO, + &deposit_get_cb, + w); } @@ -529,6 +559,8 @@ db_notify (void *cls, GNUNET_break (0); return; } + if (0 != w_count) + return; /* already at work! */ memcpy (&nbo_deadline, extra, extra_size); @@ -541,18 +573,35 @@ static void select_work (void *cls) { bool retry = false; + uint32_t limit = CONCURRENCY_LIMIT - w_count; (void) cls; task = NULL; + GNUNET_assert (NULL != keys); + if (GNUNET_TIME_absolute_is_past (keys->key_data_expiration.abs_time)) + { + /* Parent should re-start us, then we will re-fetch /keys */ + GNUNET_SCHEDULER_shutdown (); + return; + } while (1) { - struct GNUNET_TIME_Absolute now; enum GNUNET_DB_QueryStatus qs; - now = GNUNET_TIME_absolute_get (); db_plugin->preflight (db_plugin->cls); + // NOTE: + // SQL must filter all deposits for accounts + // that are already KYC/AML-blocked! + // (FIXME: How do we then learn about KYC unblocking?) + // Usually, select up to limit + // deposits with wire deadlines < now, + // or *1* deposit with smallest wire deadline if + // retry is 'true'. + if (retry) + limit = 1; qs = db_plugin->lookup_pending_deposits (db_plugin->cls, - now, + exchange_url, + limit, retry, &pending_deposits_cb, NULL); @@ -581,6 +630,178 @@ select_work (void *cls) } +/** + * Function called with information about who is auditing + * a particular exchange and what keys the exchange is using. + * The ownership over the @a keys object is passed to + * the callee, thus it is given explicitly and not + * (only) via @a kr. + * + * @param cls closure, NULL + * @param kr response from /keys + * @param[in] keys keys object passed to callback with + * reference counter of 1. Must be freed by callee + * using #TALER_EXCHANGE_keys_decref(). NULL on failure. + */ +static void +keys_cb ( + void *cls, + const struct TALER_EXCHANGE_KeysResponse *kr, + struct TALER_EXCHANGE_Keys *keys) +{ + gkh = NULL; + if (NULL == keys) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Failed to download %skeys\n", + exchange_url); + GNUNET_SCHEDULER_shutdown (); + return; + } + keys = TALER_EXCHANGE_keys_incref (keys); + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&select_work, + NULL); +} + + +/** + * Start a copy of this process with the exchange URL + * set to the given @a base_url + * + * @param base_url base URL to run with + */ +static struct GNUNET_OS_Process * +start_worker (const char *base_url) +{ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Launching worker for exchange `%s'\n", + base_url); + return GNUNET_OS_start_process (GNUNET_OS_INHERIT_STD_ALL, + NULL, + NULL, + NULL, + "taler-merchant-depositcheck", + "-c", cfg_filename, + "-e", base_url, + NULL); +} + + +/** + * Restart worker process for the given child. + * + * @param cls a `struct Child *` that needs a worker. + */ +static void +restart_child (void *cls); + + +/** + * Function called upon death or completion of a child process. + * + * @param cls a `struct Child *` + * @param type type of the process + * @param exit_code status code of the process + */ +static void +child_done_cb (void *cls, + enum GNUNET_OS_ProcessStatusType type, + long unsigned int exit_code) +{ + struct Child *c = cls; + + c->cwh = NULL; + if ( (GNUNET_OS_PROCESS_EXITED != type) || + (0 != exit_code) ) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Process for exchange %s had trouble (%d/%d)\n", + c->base_url, + (int) type, + (int) exit_code); + GNUNET_SCHEDULER_shutdown (); + global_ret = 1; + return; + } + GNUNET_OS_process_destroy (c->process); + if (GNUNET_TIME_absolute_is_future (c->next_start)) + c->rd = GNUNET_TIME_STD_BACKOFF (c->rd); + else + c->rd = GNUNET_TIME_UNIT_SECONDS; + c->rt = GNUNET_SCHEDULER_add_at (c->next_start, + &restart_child, + c); +} + + +static void +restart_child (void *cls) +{ + struct Child *c = cls; + + c->rt = NULL; + c->next_start = GNUNET_TIME_relative_to_absolute (c->rd); + c->process = start_worker (c->base_url); + if (NULL == c->process) + { + GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, + "exec"); + global_ret = 1; + GNUNET_SCHEDULER_shutdown (); + return; + } + c->cwh = GNUNET_wait_child (c->process, + &child_done_cb, + c); +} + + +/** + * Function to iterate over section. + * + * @param cls closure + * @param section name of the section + */ +static void +cfg_iter_cb (void *cls, + const char *section) +{ + char *base_url; + struct Child *c; + + if (0 != + strncasecmp (section, + "merchant-exchange-", + strlen ("merchant-exchange-"))) + return; + if (GNUNET_YES == + GNUNET_CONFIGURATION_get_value_yesno (cfg, + section, + "DISABLED")) + return; + if (GNUNET_OK != + GNUNET_CONFIGURATION_get_value_string (cfg, + section, + "EXCHANGE_BASE_URL", + &base_url)) + { + GNUNET_log_config_missing (GNUNET_ERROR_TYPE_WARNING, + section, + "EXCHANGE_BASE_URL"); + return; + } + c = GNUNET_new (struct Child); + c->rd = GNUNET_TIME_UNIT_SECONDS; + c->base_url = base_url; + GNUNET_CONTAINER_DLL_insert (c_head, + c_tail, + c); + c->rt = GNUNET_SCHEDULER_add_now (&restart_child, + c); +} + + /** * First task. * @@ -596,11 +817,25 @@ run (void *cls, const struct GNUNET_CONFIGURATION_Handle *c) { (void) args; - (void) cfgfile; cfg = c; + cfg_filename = cfgfile; GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL); + if (NULL == exchange_url) + { + GNUNET_CONFIGURATION_iterate_sections (c, + &cfg_iter_cb, + NULL); + if (NULL == c_head) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "No exchanges found in configuration\n"); + return; + } + return; + } + ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule, &rc); rc = GNUNET_CURL_gnunet_rc_create (ctx); @@ -616,6 +851,7 @@ run (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to initialize DB subsystem\n"); GNUNET_SCHEDULER_shutdown (); + global_ret = 1; return; } if (GNUNET_OK != @@ -624,6 +860,7 @@ run (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to connect to database\n"); GNUNET_SCHEDULER_shutdown (); + global_ret = 1; return; } { @@ -638,9 +875,11 @@ run (void *cls, &db_notify, NULL); } - GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&select_work, - NULL); + gkh = TALER_EXCHANGE_get_keys (ctx, + exchange_url, + NULL, + &keys_cb, + NULL); } @@ -656,6 +895,11 @@ main (int argc, char *const *argv) { struct GNUNET_GETOPT_CommandLineOption options[] = { + GNUNET_GETOPT_option_string ('e', + "exchange", + "BASE_URL", + "limit us to checking deposits of this exchange", + &exchange_url), GNUNET_GETOPT_option_flag ('t', "test", "run in test mode and exit when idle", -- cgit v1.2.3