diff options
Diffstat (limited to 'src/backend/taler-merchant-wirewatch.c')
-rw-r--r-- | src/backend/taler-merchant-wirewatch.c | 756 |
1 files changed, 756 insertions, 0 deletions
diff --git a/src/backend/taler-merchant-wirewatch.c b/src/backend/taler-merchant-wirewatch.c new file mode 100644 index 00000000..17eb7a0a --- /dev/null +++ b/src/backend/taler-merchant-wirewatch.c @@ -0,0 +1,756 @@ +/* + This file is part of TALER + Copyright (C) 2023 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License along with + TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +*/ +/** + * @file taler-merchant-wirewatch.c + * @brief Process that imports information about incoming bank transfers into the merchant backend + * @author Christian Grothoff + */ +#include "platform.h" +#include <gnunet/gnunet_util_lib.h> +#include <jansson.h> +#include <pthread.h> +#include <taler/taler_dbevents.h> +#include "taler_merchant_bank_lib.h" +#include "taler_merchantdb_lib.h" +#include "taler_merchantdb_plugin.h" + +/** + * Timeout for the bank interaction. Rather long as we should do long-polling + * and do not want to wake up too often. + */ +#define BANK_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, \ + 5) + + +/** + * Information about a watch job. + */ +struct Watch +{ + /** + * Kept in a DLL. + */ + struct Watch *next; + + /** + * Kept in a DLL. + */ + struct Watch *prev; + + /** + * Next task to run, if any. + */ + struct GNUNET_SCHEDULER_Task *task; + + /** + * Dynamically adjusted long polling time-out. + */ + struct GNUNET_TIME_Relative bank_timeout; + + /** + * For which instance are we importing bank transfers? + */ + char *instance_id; + + /** + * For which account are we importing bank transfers? + */ + char *payto_uri; + + /** + * Bank history request. + */ + struct TALER_MERCHANT_BANK_CreditHistoryHandle *hh; + + /** + * Start row for the bank interaction. Exclusive. + */ + uint64_t start_row; + + /** + * Artificial delay to use between API calls. Used to + * throttle on failures. + */ + struct GNUNET_TIME_Relative delay; + + /** + * When did we start our last HTTP request? + */ + struct GNUNET_TIME_Absolute start_time; + + /** + * How long should long-polling take at least? + */ + struct GNUNET_TIME_Absolute long_poll_timeout; + + /** + * Login data for the bank. + */ + struct TALER_MERCHANT_BANK_AuthenticationData ad; + + /** + * Set to true if we found a transaction in the last iteration. + */ + bool found; + +}; + + +/** + * Head of active watches. + */ +static struct Watch *w_head; + +/** + * Tail of active watches. + */ +static struct Watch *w_tail; + +/** + * The merchant's configuration. + */ +static const struct GNUNET_CONFIGURATION_Handle *cfg; + +/** + * Our database plugin. + */ +static struct TALER_MERCHANTDB_Plugin *db_plugin; + +/** + * Handle to the context for interacting with the bank. + */ +static struct GNUNET_CURL_Context *ctx; + +/** + * Scheduler context for running the @e ctx. + */ +static struct GNUNET_CURL_RescheduleContext *rc; + +/** + * Event handler to learn that the configuration changed + * and we should shutdown (to be restarted). + */ +static struct GNUNET_DB_EventHandler *eh; + +/** + * Value to return from main(). 0 on success, non-zero on errors. + */ +static int global_ret; + +/** + * How many transactions should we fetch at most per batch? + */ +static unsigned int batch_size = 32; + +/** + * #GNUNET_YES if we are in test mode and should exit when idle. + */ +static int test_mode; + +/** + * #GNUNET_YES if we are in persistent mode and do + * not exit on #config_changed. + */ +static int persist_mode; + +/** + * Set to true if we are shutting down due to a + * configuration change. + */ +static bool config_changed_flag; + +/** + * Save progress in DB. + */ +static void +save (struct Watch *w) +{ + enum GNUNET_DB_QueryStatus qs; + + qs = db_plugin->update_wirewatch_progress (db_plugin->cls, + w->instance_id, + w->payto_uri, + w->start_row); + if (qs < 0) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Failed to persist wirewatch progress for %s/%s (%d)\n", + w->instance_id, + w->payto_uri, + qs); + GNUNET_SCHEDULER_shutdown (); + global_ret = EXIT_FAILURE; + } +} + + +/** + * Free resources of @a w. + * + * @param w watch job to terminate + */ +static void +end_watch (struct Watch *w) +{ + if (NULL != w->task) + { + GNUNET_SCHEDULER_cancel (w->task); + w->task = NULL; + } + if (NULL != w->hh) + { + TALER_MERCHANT_BANK_credit_history_cancel (w->hh); + w->hh = NULL; + } + GNUNET_free (w->instance_id); + GNUNET_free (w->payto_uri); + TALER_MERCHANT_BANK_auth_free (&w->ad); + GNUNET_CONTAINER_DLL_remove (w_head, + w_tail, + w); + GNUNET_free (w); +} + + +/** + * We're being aborted with CTRL-C (or SIGTERM). Shut down. + * + * @param cls closure + */ +static void +shutdown_task (void *cls) +{ + (void) cls; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Running shutdown\n"); + while (NULL != w_head) + { + struct Watch *w = w_head; + + save (w); + end_watch (w); + } + if (NULL != eh) + { + db_plugin->event_listen_cancel (eh); + eh = NULL; + } + TALER_MERCHANTDB_plugin_unload (db_plugin); + db_plugin = NULL; + cfg = NULL; + if (NULL != ctx) + { + GNUNET_CURL_fini (ctx); + ctx = NULL; + } + if (NULL != rc) + { + GNUNET_CURL_gnunet_rc_destroy (rc); + rc = NULL; + } +} + + +/** + * Parse @a subject from wire transfer into @a wtid and @a exchange_url. + * + * @param subject wire transfer subject to parse; + * format is "$WTID $URL" + * @param[out] wtid wire transfer ID to extract + * @param[out] exchange_url set to exchange URL + * @return #GNUNET_OK on success + */ +static enum GNUNET_GenericReturnValue +parse_subject (const char *subject, + struct TALER_WireTransferIdentifierRawP *wtid, + char **exchange_url) +{ + const char *space; + + space = strchr (subject, ' '); + if (NULL == space) + return GNUNET_NO; + if (GNUNET_OK != + GNUNET_STRINGS_string_to_data (subject, + space - subject, + wtid, + sizeof (*wtid))) + return GNUNET_NO; + space++; + if (! TALER_url_valid_charset (space)) + return GNUNET_NO; + if ( (0 != strncasecmp ("http://", + space, + strlen ("http://"))) && + (0 != strncasecmp ("https://", + space, + strlen ("https://"))) ) + return GNUNET_NO; + *exchange_url = GNUNET_strdup (space); + return GNUNET_OK; +} + + +/** + * Run next iteration. + * + * @param cls a `struct Watch *` + */ +static void +do_work (void *cls); + + +/** + * Callbacks of this type are used to serve the result of asking + * the bank for the credit transaction history. + * + * @param cls a `struct Watch *` + * @param http_status HTTP response code, #MHD_HTTP_OK (200) for successful status request + * 0 if the bank's reply is bogus (fails to follow the protocol), + * #MHD_HTTP_NO_CONTENT if there are no more results; on success the + * last callback is always of this status (even if `abs(num_results)` were + * already returned). + * @param ec detailed error code + * @param serial_id monotonically increasing counter corresponding to the transaction + * @param details details about the wire transfer + * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration + */ +static enum GNUNET_GenericReturnValue +credit_cb ( + void *cls, + unsigned int http_status, + enum TALER_ErrorCode ec, + uint64_t serial_id, + const struct TALER_MERCHANT_BANK_CreditDetails *details) +{ + struct Watch *w = cls; + + switch (http_status) + { + case 0: + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Invalid HTTP response (HTTP status: 0, %d) from bank\n", + ec); + w->delay = GNUNET_TIME_STD_BACKOFF (w->delay); + break; + case MHD_HTTP_OK: + { + enum GNUNET_DB_QueryStatus qs; + char *exchange_url; + struct TALER_WireTransferIdentifierRawP wtid; + char *credit_payto; + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Received wire transfer `%s' over %s\n", + details->wire_subject, + TALER_amount2s (&details->amount)); + w->found = true; + if (GNUNET_OK != + parse_subject (details->wire_subject, + &wtid, + &exchange_url)) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Skipping transfer %llu (%s): not from exchange\n", + (unsigned long long) serial_id, + details->wire_subject); + w->start_row = serial_id; + return GNUNET_OK; + } + /* FIXME-Performance-Optimization: consider grouping multiple inserts + into one bigger transaction with just one notify. */ + credit_payto = TALER_payto_normalize (details->credit_account_uri); + qs = db_plugin->insert_transfer (db_plugin->cls, + w->instance_id, + exchange_url, + &wtid, + &details->amount, + credit_payto, + true /* confirmed */); + if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) + { + struct TALER_Amount total; + struct TALER_Amount wfee; + struct TALER_Amount eamount; + struct GNUNET_TIME_Timestamp timestamp; + bool have_esig; + bool verified; + + qs = db_plugin->lookup_transfer (db_plugin->cls, + w->instance_id, + exchange_url, + &wtid, + &total, + &wfee, + &eamount, + ×tamp, + &have_esig, + &verified); + if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Inserting transfer for %s into database failed. Is the credit account %s configured correctly?\n", + w->instance_id, + credit_payto); + } + if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs) + { + if (0 != + TALER_amount_cmp (&total, + &details->amount)) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Inserting transfer for %s into database failed. An entry exists for a different transfer amount (%s)!\n", + w->instance_id, + TALER_amount2s (&total)); + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Inserting transfer for %s into database failed. An equivalent entry already exists.\n", + w->instance_id); + } + } + } + GNUNET_free (credit_payto); + GNUNET_free (exchange_url); + if (qs < 0) + { + GNUNET_break (0); + GNUNET_SCHEDULER_shutdown (); + w->hh = NULL; + return GNUNET_SYSERR; + } + /* Success => reset back-off timer! */ + w->delay = GNUNET_TIME_UNIT_ZERO; + { + struct GNUNET_DB_EventHeaderP es = { + .size = htons (sizeof (es)), + .type = htons (TALER_DBEVENT_MERCHANT_WIRE_TRANSFER_CONFIRMED) + }; + + db_plugin->event_notify (db_plugin->cls, + &es, + NULL, + 0); + } + } + w->start_row = serial_id; + return GNUNET_OK; + case MHD_HTTP_NO_CONTENT: + save (w); + /* Delay artificially if server returned before long-poll timeout */ + if (! w->found) + w->delay = GNUNET_TIME_absolute_get_remaining (w->long_poll_timeout); + break; + case MHD_HTTP_NOT_FOUND: + /* configuration likely wrong, wait at least 1 minute, backoff up to 15 minutes! */ + w->delay = GNUNET_TIME_relative_max (GNUNET_TIME_UNIT_MINUTES, + GNUNET_TIME_STD_BACKOFF (w->delay)); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Bank claims account is unknown, waiting for %s before trying again\n", + GNUNET_TIME_relative2s (w->delay, + true)); + break; + case MHD_HTTP_GATEWAY_TIMEOUT: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Gateway timeout, adjusting long polling threshold\n"); + /* Limit new timeout at request delay */ + w->bank_timeout + = GNUNET_TIME_relative_min (GNUNET_TIME_absolute_get_duration ( + w->start_time), + w->bank_timeout); + /* set the timeout a bit earlier */ + w->bank_timeout + = GNUNET_TIME_relative_subtract (w->bank_timeout, + GNUNET_TIME_UNIT_SECONDS); + /* do not allow it to go to zero */ + w->bank_timeout + = GNUNET_TIME_relative_max (w->bank_timeout, + GNUNET_TIME_UNIT_SECONDS); + w->delay = GNUNET_TIME_STD_BACKOFF (w->delay); + break; + default: + /* Something went wrong, try again, but with back-off */ + w->delay = GNUNET_TIME_STD_BACKOFF (w->delay); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Unexpected HTTP status code %u(%d) from bank\n", + http_status, + ec); + break; + } + w->hh = NULL; + if (test_mode && (! w->found)) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "No transactions found and in test mode. Ending watch!\n"); + end_watch (w); + if (NULL == w_head) + GNUNET_SCHEDULER_shutdown (); + return GNUNET_OK; + } + w->task = GNUNET_SCHEDULER_add_delayed (w->delay, + &do_work, + w); + return GNUNET_OK; +} + + +static void +do_work (void *cls) +{ + struct Watch *w = cls; + + w->task = NULL; + w->found = false; + w->long_poll_timeout + = GNUNET_TIME_relative_to_absolute (w->bank_timeout); + w->start_time + = GNUNET_TIME_absolute_get (); + w->hh = TALER_MERCHANT_BANK_credit_history (ctx, + &w->ad, + w->start_row, + batch_size, + test_mode + ? GNUNET_TIME_UNIT_ZERO + : w->bank_timeout, + &credit_cb, + w); + if (NULL == w->hh) + { + GNUNET_break (0); + GNUNET_SCHEDULER_shutdown (); + return; + } +} + + +/** + * Function called with information about a accounts + * the wirewatcher should monitor. + * + * @param cls closure (NULL) + * @param instance instance that owns the account + * @param payto_uri account URI + * @param credit_facade_url URL for the credit facade + * @param credit_facade_credentials account access credentials + * @param last_serial last transaction serial (inclusive) we have seen from this account + */ +static void +start_watch ( + void *cls, + const char *instance, + const char *payto_uri, + const char *credit_facade_url, + const json_t *credit_facade_credentials, + uint64_t last_serial) +{ + struct Watch *w = GNUNET_new (struct Watch); + + (void) cls; + w->bank_timeout = BANK_TIMEOUT; + if (GNUNET_OK != + TALER_MERCHANT_BANK_auth_parse_json (credit_facade_credentials, + credit_facade_url, + &w->ad)) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to parse authentication data of `%s/%s'\n", + instance, + payto_uri); + GNUNET_free (w); + GNUNET_SCHEDULER_shutdown (); + global_ret = EXIT_NOTCONFIGURED; + return; + } + + GNUNET_CONTAINER_DLL_insert (w_head, + w_tail, + w); + w->instance_id = GNUNET_strdup (instance); + w->payto_uri = TALER_payto_normalize (payto_uri); + w->start_row = last_serial; + w->task = GNUNET_SCHEDULER_add_now (&do_work, + w); +} + + +/** + * Function called on configuration change events received from Postgres. We + * shutdown (and systemd should restart us). + * + * @param cls closure (NULL) + * @param extra additional event data provided + * @param extra_size number of bytes in @a extra + */ +static void +config_changed (void *cls, + const void *extra, + size_t extra_size) +{ + (void) cls; + (void) extra; + (void) extra_size; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Configuration changed, %s\n", + 0 == persist_mode + ? "restarting" + : "reinitializing"); + config_changed_flag = true; + GNUNET_SCHEDULER_shutdown (); +} + + +/** + * First task. + * + * @param cls closure, NULL + * @param args remaining command-line arguments + * @param cfgfile name of the configuration file used (for saving, can be NULL!) + * @param c configuration + */ +static void +run (void *cls, + char *const *args, + const char *cfgfile, + const struct GNUNET_CONFIGURATION_Handle *c) +{ + (void) args; + (void) cfgfile; + + cfg = c; + GNUNET_SCHEDULER_add_shutdown (&shutdown_task, + NULL); + ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule, + &rc); + rc = GNUNET_CURL_gnunet_rc_create (ctx); + if (NULL == ctx) + { + GNUNET_break (0); + GNUNET_SCHEDULER_shutdown (); + global_ret = EXIT_NO_RESTART; + return; + } + if (NULL == + (db_plugin = TALER_MERCHANTDB_plugin_load (cfg))) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to initialize DB subsystem\n"); + GNUNET_SCHEDULER_shutdown (); + global_ret = EXIT_NOTCONFIGURED; + return; + } + if (GNUNET_OK != + db_plugin->connect (db_plugin->cls)) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to connect to database\n"); + GNUNET_SCHEDULER_shutdown (); + global_ret = EXIT_NO_RESTART; + return; + } + { + struct GNUNET_DB_EventHeaderP es = { + .size = htons (sizeof (es)), + .type = htons (TALER_DBEVENT_MERCHANT_ACCOUNTS_CHANGED) + }; + + eh = db_plugin->event_listen (db_plugin->cls, + &es, + GNUNET_TIME_UNIT_FOREVER_REL, + &config_changed, + NULL); + } + { + enum GNUNET_DB_QueryStatus qs; + + qs = db_plugin->select_wirewatch_accounts (db_plugin->cls, + &start_watch, + NULL); + if (qs < 0) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to obtain wirewatch accounts from database\n"); + GNUNET_SCHEDULER_shutdown (); + global_ret = EXIT_NO_RESTART; + return; + } + if ( (NULL == w_head) && + (GNUNET_YES == test_mode) ) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "No active wirewatch accounts in database and in test mode. Exiting.\n"); + GNUNET_SCHEDULER_shutdown (); + global_ret = EXIT_SUCCESS; + return; + } + } +} + + +/** + * The main function of taler-merchant-wirewatch + * + * @param argc number of arguments from the command line + * @param argv command line arguments + * @return 0 ok, 1 on error + */ +int +main (int argc, + char *const *argv) +{ + struct GNUNET_GETOPT_CommandLineOption options[] = { + GNUNET_GETOPT_option_flag ('p', + "persist", + "run in persist mode and do not exit on configuration changes", + &persist_mode), + GNUNET_GETOPT_option_timetravel ('T', + "timetravel"), + GNUNET_GETOPT_option_flag ('t', + "test", + "run in test mode and exit when idle", + &test_mode), + GNUNET_GETOPT_option_version (VERSION "-" VCS_VERSION), + GNUNET_GETOPT_OPTION_END + }; + enum GNUNET_GenericReturnValue ret; + + if (GNUNET_OK != + GNUNET_STRINGS_get_utf8_args (argc, argv, + &argc, &argv)) + return EXIT_INVALIDARGUMENT; + TALER_OS_init (); + do { + config_changed_flag = false; + ret = GNUNET_PROGRAM_run ( + argc, argv, + "taler-merchant-wirewatch", + gettext_noop ( + "background process that watches for incoming wire transfers to the merchant bank account"), + options, + &run, NULL); + } while ( (1 == persist_mode) && + config_changed_flag); + GNUNET_free_nz ((void *) argv); + if (GNUNET_SYSERR == ret) + return EXIT_INVALIDARGUMENT; + if (GNUNET_NO == ret) + return EXIT_SUCCESS; + return global_ret; +} + + +/* end of taler-exchange-wirewatch.c */ |