/* This file is part of TALER Copyright (C) 2016-2021 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation; either version 3, or (at your option) any later version. TALER is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with TALER; see the file COPYING. If not, see */ /** * @file taler-exchange-transfer.c * @brief Process that actually finalizes outgoing transfers with the wire gateway / bank * @author Christian Grothoff */ #include "platform.h" #include #include #include #include "taler_exchangedb_lib.h" #include "taler_exchangedb_plugin.h" #include "taler_json_lib.h" #include "taler_bank_service.h" /** * What is the maximum batch size we use for credit history * requests with the bank. See `batch_size` below. */ #define MAXIMUM_BATCH_SIZE 1024 /** * How often will we retry a request (given certain * HTTP status codes) before giving up? */ #define MAX_RETRIES 16 /** * Information about our work shard. */ struct Shard { /** * Time when we started to work on this shard. */ struct GNUNET_TIME_Absolute shard_start_time; /** * Offset the shard begins at. */ uint64_t shard_start; /** * Exclusive offset where the shard ends. */ uint64_t shard_end; /** * Offset where our current batch begins. */ uint64_t batch_start; /** * Highest row processed in the current batch. */ uint64_t batch_end; }; /** * Data we keep to #run_transfers(). There is at most * one of these around at any given point in time. * Note that this limits parallelism, and we might want * to revise this decision at a later point. */ struct WirePrepareData { /** * All transfers done in the same transaction * are kept in a DLL. */ struct WirePrepareData *next; /** * All transfers done in the same transaction * are kept in a DLL. */ struct WirePrepareData *prev; /** * Wire execution handle. */ struct TALER_BANK_TransferHandle *eh; /** * Wire account used for this preparation. */ const struct TALER_EXCHANGEDB_AccountInfo *wa; /** * Row ID of the transfer. */ unsigned long long row_id; /** * Number of bytes allocated after this struct * with the prewire data. */ size_t buf_size; /** * How often did we retry so far? */ unsigned int retries; }; /** * The exchange's configuration. */ static const struct GNUNET_CONFIGURATION_Handle *cfg; /** * Our database plugin. */ static struct TALER_EXCHANGEDB_Plugin *db_plugin; /** * Next task to run, if any. */ static struct GNUNET_SCHEDULER_Task *task; /** * If we are currently executing transfers, information about * the active transfers is here. Otherwise, this variable is NULL. */ static struct WirePrepareData *wpd_head; /** * If we are currently executing transfers, information about * the active transfers is here. Otherwise, this variable is NULL. */ static struct WirePrepareData *wpd_tail; /** * Information about our work shard. */ static struct Shard *shard; /** * Handle to the context for interacting with the bank / wire gateway. */ static struct GNUNET_CURL_Context *ctx; /** * Scheduler context for running the @e ctx. */ static struct GNUNET_CURL_RescheduleContext *rc; /** * Value to return from main(). 0 on success, non-zero on errors. */ static int global_ret; /** * #GNUNET_YES if we are in test mode and should exit when idle. */ static int test_mode; /** * How long should we sleep when idle before trying to find more work? * Also used for how long we wait to grab a shard before trying it again. * The value should be set to a bit above the average time it takes to * process a shard. */ static struct GNUNET_TIME_Relative transfer_idle_sleep_interval; /** * How long did we take to finish the last shard? */ static struct GNUNET_TIME_Relative shard_delay; /** * Modulus to apply to group shards. The shard size must ultimately be a * multiple of the batch size. Thus, if this is not a multiple of the * #MAXIMUM_BATCH_SIZE, the batch size will be set to the #shard_size. */ static unsigned int shard_size = MAXIMUM_BATCH_SIZE; /** * How many workers should we plan our scheduling with? */ static unsigned int max_workers = 16; /** * Clean up all active bank interactions. */ static void cleanup_wpd (void) { struct WirePrepareData *wpd; while (NULL != (wpd = wpd_head)) { GNUNET_CONTAINER_DLL_remove (wpd_head, wpd_tail, wpd); if (NULL != wpd->eh) { TALER_BANK_transfer_cancel (wpd->eh); wpd->eh = NULL; } GNUNET_free (wpd); } } /** * We're being aborted with CTRL-C (or SIGTERM). Shut down. * * @param cls closure */ static void shutdown_task (void *cls) { (void) cls; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Running shutdown\n"); if (NULL != task) { GNUNET_SCHEDULER_cancel (task); task = NULL; } cleanup_wpd (); GNUNET_free (shard); db_plugin->rollback (db_plugin->cls); /* just in case */ TALER_EXCHANGEDB_plugin_unload (db_plugin); db_plugin = NULL; TALER_EXCHANGEDB_unload_accounts (); cfg = NULL; if (NULL != ctx) { GNUNET_CURL_fini (ctx); ctx = NULL; } if (NULL != rc) { GNUNET_CURL_gnunet_rc_destroy (rc); rc = NULL; } } /** * Parse the configuration for wirewatch. * * @return #GNUNET_OK on success */ static enum GNUNET_GenericReturnValue parse_transfer_config (void) { if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_time (cfg, "exchange", "TRANSFER_IDLE_SLEEP_INTERVAL", &transfer_idle_sleep_interval)) { GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, "exchange", "TRANSFER_IDLE_SLEEP_INTERVAL"); return GNUNET_SYSERR; } if (NULL == (db_plugin = TALER_EXCHANGEDB_plugin_load (cfg))) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to initialize DB subsystem\n"); return GNUNET_SYSERR; } if (GNUNET_OK != TALER_EXCHANGEDB_load_accounts (cfg, TALER_EXCHANGEDB_ALO_DEBIT | TALER_EXCHANGEDB_ALO_AUTHDATA)) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "No wire accounts configured for debit!\n"); TALER_EXCHANGEDB_plugin_unload (db_plugin); db_plugin = NULL; return GNUNET_SYSERR; } return GNUNET_OK; } /** * Perform a database commit. If it fails, print a warning. * * @return status of commit */ static enum GNUNET_DB_QueryStatus commit_or_warn (void) { enum GNUNET_DB_QueryStatus qs; qs = db_plugin->commit (db_plugin->cls); if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) return qs; GNUNET_log ((GNUNET_DB_STATUS_SOFT_ERROR == qs) ? GNUNET_ERROR_TYPE_INFO : GNUNET_ERROR_TYPE_ERROR, "Failed to commit database transaction!\n"); return qs; } /** * Execute the wire transfers that we have committed to * do. * * @param cls NULL */ static void run_transfers (void *cls); /** * Select shard to process. * * @param cls NULL */ static void select_shard (void *cls); /** * We are done with the current batch. Commit * and move on. */ static void batch_done (void) { /* batch done */ switch (commit_or_warn ()) { case GNUNET_DB_STATUS_SOFT_ERROR: /* try again */ GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&run_transfers, NULL); return; case GNUNET_DB_STATUS_HARD_ERROR: GNUNET_break (0); global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); return; case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: shard->batch_start = shard->batch_end + 1; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Batch complete\n"); /* continue with #run_transfers(), just to guard against the unlikely case that there are more. */ GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&run_transfers, NULL); return; default: GNUNET_break (0); global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); return; } } /** * Function called with the result from the execute step. * On success, we mark the respective wire transfer as finished, * and in general we afterwards continue to #run_transfers(), * except for irrecoverable errors. * * @param cls `struct WirePrepareData` we are working on * @param http_status_code #MHD_HTTP_OK on success * @param ec taler error code * @param row_id unique ID of the wire transfer in the bank's records * @param wire_timestamp when did the transfer happen */ static void wire_confirm_cb (void *cls, unsigned int http_status_code, enum TALER_ErrorCode ec, uint64_t row_id, struct GNUNET_TIME_Absolute wire_timestamp) { struct WirePrepareData *wpd = cls; enum GNUNET_DB_QueryStatus qs; (void) row_id; (void) wire_timestamp; wpd->eh = NULL; switch (http_status_code) { case MHD_HTTP_OK: GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Wire transfer %llu completed successfully\n", (unsigned long long) wpd->row_id); qs = db_plugin->wire_prepare_data_mark_finished (db_plugin->cls, wpd->row_id); /* continued below */ break; case MHD_HTTP_NOT_FOUND: GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Wire transaction %llu failed: %u/%d\n", (unsigned long long) wpd->row_id, http_status_code, ec); qs = db_plugin->wire_prepare_data_mark_failed (db_plugin->cls, wpd->row_id); /* continued below */ break; case 0: case MHD_HTTP_TOO_MANY_REQUESTS: case MHD_HTTP_INTERNAL_SERVER_ERROR: case MHD_HTTP_BAD_GATEWAY: case MHD_HTTP_SERVICE_UNAVAILABLE: case MHD_HTTP_GATEWAY_TIMEOUT: wpd->retries++; if (wpd->retries < MAX_RETRIES) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Wire transfer %llu failed (%u), trying again\n", (unsigned long long) wpd->row_id, http_status_code); wpd->eh = TALER_BANK_transfer (ctx, wpd->wa->auth, &wpd[1], wpd->buf_size, &wire_confirm_cb, wpd); return; } GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Wire transaction %llu failed: %u/%d\n", (unsigned long long) wpd->row_id, http_status_code, ec); db_plugin->rollback (db_plugin->cls); global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); return; default: GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Wire transfer %llu failed: %u/%d\n", (unsigned long long) wpd->row_id, http_status_code, ec); db_plugin->rollback (db_plugin->cls); global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); return; } shard->batch_end = GNUNET_MAX (wpd->row_id, shard->batch_end); switch (qs) { case GNUNET_DB_STATUS_SOFT_ERROR: db_plugin->rollback (db_plugin->cls); cleanup_wpd (); GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&run_transfers, NULL); return; case GNUNET_DB_STATUS_HARD_ERROR: db_plugin->rollback (db_plugin->cls); cleanup_wpd (); global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); return; case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: GNUNET_CONTAINER_DLL_remove (wpd_head, wpd_tail, wpd); GNUNET_free (wpd); break; } if (NULL != wpd_head) return; /* wait for other queries to complete */ batch_done (); } /** * Callback with data about a prepared transaction. Triggers the respective * wire transfer using the prepared transaction data. * * @param cls NULL * @param rowid row identifier used to mark prepared transaction as done * @param wire_method wire method the preparation was done for * @param buf transaction data that was persisted, NULL on error * @param buf_size number of bytes in @a buf, 0 on error */ static void wire_prepare_cb (void *cls, uint64_t rowid, const char *wire_method, const char *buf, size_t buf_size) { struct WirePrepareData *wpd; (void) cls; if (rowid >= shard->shard_end) { /* skip */ shard->batch_end = shard->shard_end - 1; if (NULL != wpd_head) return; batch_done (); return; } if ( (NULL == wire_method) || (NULL == buf) ) { GNUNET_break (0); db_plugin->rollback (db_plugin->cls); global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); return; } wpd = GNUNET_malloc (sizeof (struct WirePrepareData) + buf_size); memcpy (&wpd[1], buf, buf_size); wpd->buf_size = buf_size; wpd->row_id = rowid; GNUNET_CONTAINER_DLL_insert (wpd_head, wpd_tail, wpd); GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Starting wire transfer %llu\n", (unsigned long long) rowid); wpd->wa = TALER_EXCHANGEDB_find_account_by_method (wire_method); if (NULL == wpd->wa) { /* Should really never happen here, as when we get here the wire account should be in the cache. */ GNUNET_break (0); db_plugin->rollback (db_plugin->cls); global_ret = EXIT_NOTCONFIGURED; GNUNET_SCHEDULER_shutdown (); return; } wpd->eh = TALER_BANK_transfer (ctx, wpd->wa->auth, buf, buf_size, &wire_confirm_cb, wpd); if (NULL == wpd->eh) { GNUNET_break (0); /* Irrecoverable */ db_plugin->rollback (db_plugin->cls); global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); return; } } /** * Execute the wire transfers that we have committed to * do. * * @param cls NULL */ static void run_transfers (void *cls) { enum GNUNET_DB_QueryStatus qs; int64_t limit; (void) cls; task = NULL; limit = shard->shard_end - shard->batch_start; if (0 >= limit) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Shard [%llu,%llu) completed\n", (unsigned long long) shard->shard_start, (unsigned long long) shard->batch_end); qs = db_plugin->complete_shard (db_plugin->cls, "transfer", shard->shard_start, shard->batch_end + 1); switch (qs) { case GNUNET_DB_STATUS_HARD_ERROR: GNUNET_break (0); GNUNET_free (shard); GNUNET_SCHEDULER_shutdown (); return; case GNUNET_DB_STATUS_SOFT_ERROR: GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Got DB soft error for complete_shard. Rolling back.\n"); GNUNET_free (shard); task = GNUNET_SCHEDULER_add_now (&select_shard, NULL); return; case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: /* already existed, ok, let's just continue */ break; case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: /* normal case */ break; } shard_delay = GNUNET_TIME_absolute_get_duration (shard->shard_start_time); GNUNET_free (shard); task = GNUNET_SCHEDULER_add_now (&select_shard, NULL); return; } /* cap number of parallel connections to a reasonable limit for concurrent requests to the bank */ limit = GNUNET_MIN (limit, 256); GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Checking for %lld pending wire transfers [%llu-...)\n", (long long) limit, (unsigned long long) shard->batch_start); if (GNUNET_OK != db_plugin->start_read_committed (db_plugin->cls, "aggregator run transfer")) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to start database transaction!\n"); global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); return; } qs = db_plugin->wire_prepare_data_get (db_plugin->cls, shard->batch_start, limit, &wire_prepare_cb, NULL); switch (qs) { case GNUNET_DB_STATUS_HARD_ERROR: db_plugin->rollback (db_plugin->cls); GNUNET_break (0); global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); return; case GNUNET_DB_STATUS_SOFT_ERROR: /* try again */ db_plugin->rollback (db_plugin->cls); GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&run_transfers, NULL); return; case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: /* no more prepared wire transfers, go sleep a bit! */ db_plugin->rollback (db_plugin->cls); GNUNET_assert (NULL == task); if (GNUNET_YES == test_mode) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "No more pending wire transfers, shutting down (because we are in test mode)\n"); GNUNET_SCHEDULER_shutdown (); } else { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "No more pending wire transfers, going idle\n"); task = GNUNET_SCHEDULER_add_delayed (transfer_idle_sleep_interval, &run_transfers, NULL); } return; default: /* continued in wire_prepare_cb() */ return; } } /** * Select shard to process. * * @param cls NULL */ static void select_shard (void *cls) { enum GNUNET_DB_QueryStatus qs; struct GNUNET_TIME_Relative delay; uint64_t start; uint64_t end; (void) cls; task = NULL; if (GNUNET_SYSERR == db_plugin->preflight (db_plugin->cls)) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to obtain database connection!\n"); global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); return; } if (0 == max_workers) delay = GNUNET_TIME_UNIT_ZERO; else delay.rel_value_us = GNUNET_CRYPTO_random_u64 ( GNUNET_CRYPTO_QUALITY_WEAK, 4 * GNUNET_TIME_relative_max ( transfer_idle_sleep_interval, GNUNET_TIME_relative_multiply (shard_delay, max_workers)).rel_value_us); qs = db_plugin->begin_shard (db_plugin->cls, "transfer", delay, shard_size, &start, &end); switch (qs) { case GNUNET_DB_STATUS_HARD_ERROR: GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to obtain starting point for montoring from database!\n"); global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); return; case GNUNET_DB_STATUS_SOFT_ERROR: /* try again */ task = GNUNET_SCHEDULER_add_delayed (transfer_idle_sleep_interval, &select_shard, NULL); return; case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: GNUNET_break (0); task = GNUNET_SCHEDULER_add_delayed (transfer_idle_sleep_interval, &select_shard, NULL); return; case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: /* continued below */ break; } GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Starting with shard [%llu,%llu)\n", (unsigned long long) start, (unsigned long long) end); shard = GNUNET_new (struct Shard); shard->shard_start_time = GNUNET_TIME_absolute_get (); shard->shard_start = start; shard->shard_end = end; shard->batch_start = start; task = GNUNET_SCHEDULER_add_now (&run_transfers, NULL); } /** * First task. * * @param cls closure, NULL * @param args remaining command-line arguments * @param cfgfile name of the configuration file used (for saving, can be NULL!) * @param c configuration */ static void run (void *cls, char *const *args, const char *cfgfile, const struct GNUNET_CONFIGURATION_Handle *c) { (void) cls; (void) args; (void) cfgfile; cfg = c; if (GNUNET_OK != parse_transfer_config ()) { cfg = NULL; global_ret = EXIT_NOTCONFIGURED; return; } ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule, &rc); rc = GNUNET_CURL_gnunet_rc_create (ctx); if (NULL == ctx) { GNUNET_break (0); return; } if (GNUNET_SYSERR == db_plugin->preflight (db_plugin->cls)) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to obtain database connection!\n"); global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); return; } GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&select_shard, NULL); GNUNET_SCHEDULER_add_shutdown (&shutdown_task, cls); } /** * The main function of the taler-exchange-transfer. * * @param argc number of arguments from the command line * @param argv command line arguments * @return 0 ok, 1 on error */ int main (int argc, char *const *argv) { struct GNUNET_GETOPT_CommandLineOption options[] = { GNUNET_GETOPT_option_uint ('S', "size", "SIZE", "Size to process per shard (default: 1024)", &shard_size), GNUNET_GETOPT_option_timetravel ('T', "timetravel"), GNUNET_GETOPT_option_flag ('t', "test", "run in test mode and exit when idle", &test_mode), GNUNET_GETOPT_option_uint ('w', "workers", "COUNT", "Plan work load with up to COUNT worker processes (default: 16)", &max_workers), GNUNET_GETOPT_option_version (VERSION "-" VCS_VERSION), GNUNET_GETOPT_OPTION_END }; enum GNUNET_GenericReturnValue ret; if (GNUNET_OK != GNUNET_STRINGS_get_utf8_args (argc, argv, &argc, &argv)) return EXIT_INVALIDARGUMENT; TALER_OS_init (); ret = GNUNET_PROGRAM_run ( argc, argv, "taler-exchange-transfer", gettext_noop ( "background process that executes outgoing wire transfers"), options, &run, NULL); GNUNET_free_nz ((void *) argv); if (GNUNET_SYSERR == ret) return EXIT_INVALIDARGUMENT; if (GNUNET_NO == ret) return EXIT_SUCCESS; return global_ret; } /* end of taler-exchange-transfer.c */