/*
  This file is part of TALER
  Copyright (C) 2023 Taler Systems SA

  TALER is free software; you can redistribute it and/or modify it under the
  terms of the GNU Affero General Public License as published by the Free Software
  Foundation; either version 3, or (at your option) any later version.

  TALER is distributed in the hope that it will be useful, but WITHOUT ANY
  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
  A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.

  You should have received a copy of the GNU Affero General Public License along with
  TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
*/
/**
 * @file taler-merchant-wirewatch.c
 * @brief Process that imports information about incoming bank transfers into the merchant backend
 * @author Christian Grothoff
 */
#include "platform.h"
/* NOTE(review): the names of the four system headers below were missing
   (bare "#include" directives) and have been reconstructed from the symbols
   this file uses: GNUNET_* (gnunet_util_lib.h), json_t (jansson.h) and
   MHD_HTTP_* (microhttpd.h).  <pthread.h> is a best guess for the remaining
   directive -- confirm against the upstream file. */
#include <gnunet/gnunet_util_lib.h>
#include <jansson.h>
#include <pthread.h>
#include <microhttpd.h>
#include "taler_merchant_bank_lib.h"
#include "taler_merchantdb_lib.h"
#include "taler_merchantdb_plugin.h"


/**
 * Timeout for the bank interaction.  Rather long as we should do long-polling
 * and do not want to wake up too often.
 */
#define BANK_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, \
                                                    5)


/**
 * Information about a watch job.  One such job exists per
 * (instance, account) pair handed to start_watch().
 */
struct Watch
{
  /**
   * Kept in a DLL.
   */
  struct Watch *next;

  /**
   * Kept in a DLL.
   */
  struct Watch *prev;

  /**
   * Next task to run, if any.
   */
  struct GNUNET_SCHEDULER_Task *task;

  /**
   * Dynamically adjusted long polling time-out.  Starts at #BANK_TIMEOUT
   * and is reduced when the bank answers with a gateway timeout.
   */
  struct GNUNET_TIME_Relative bank_timeout;

  /**
   * For which instance are we importing bank transfers?
   */
  char *instance_id;

  /**
   * For which account are we importing bank transfers?
   */
  char *payto_uri;

  /**
   * Bank history request, NULL if none is active.
   */
  struct TALER_MERCHANT_BANK_CreditHistoryHandle *hh;

  /**
   * Start row for the bank interaction. Exclusive.
   */
  uint64_t start_row;

  /**
   * Artificial delay to use between API calls.  Used to
   * throttle on failures.
   */
  struct GNUNET_TIME_Relative delay;

  /**
   * When did we start our last HTTP request?
   */
  struct GNUNET_TIME_Absolute start_time;

  /**
   * How long should long-polling take at least?  If the bank reports
   * "no content" before this time, the remaining time is used as @e delay.
   */
  struct GNUNET_TIME_Absolute long_poll_timeout;

  /**
   * Login data for the bank.
   */
  struct TALER_MERCHANT_BANK_AuthenticationData ad;

  /**
   * Set to true if we found a transaction in the last iteration.
   */
  bool found;

};


/**
 * Head of active watches.
 */
static struct Watch *w_head;

/**
 * Tail of active watches.
 */
static struct Watch *w_tail;

/**
 * The merchant's configuration.
 */
static const struct GNUNET_CONFIGURATION_Handle *cfg;

/**
 * Our database plugin.
 */
static struct TALER_MERCHANTDB_Plugin *db_plugin;

/**
 * Handle to the context for interacting with the bank.
 */
static struct GNUNET_CURL_Context *ctx;

/**
 * Scheduler context for running the @e ctx.
 */
static struct GNUNET_CURL_RescheduleContext *rc;

/**
 * Event handler to learn that the configuration changed
 * and we should shutdown (to be restarted).
 */
static struct GNUNET_DB_EventHandler *eh;

/**
 * Value to return from main(). 0 on success, non-zero on errors.
 */
static int global_ret;

/**
 * How many transactions should we fetch at most per batch?
 */
static unsigned int batch_size = 32;

/**
 * #GNUNET_YES if we are in test mode and should exit when idle.
 */
static int test_mode;

/**
 * #GNUNET_YES if we are in persistent mode and do
 * not exit on #config_changed.
 */
static int persist_mode;

/**
 * Set to true if we are shutting down due to a
 * configuration change.
 */
static bool config_changed_flag;


/**
 * Save progress in DB.  Stores the current @e start_row of @a w for its
 * instance and account.  If the database reports an error, a warning is
 * logged, shutdown is triggered and #global_ret is set to EXIT_FAILURE.
 *
 * @param w watch job whose progress should be persisted
 */
static void
save (struct Watch *w)
{
  enum GNUNET_DB_QueryStatus qs;

  qs = db_plugin->update_wirewatch_progress (db_plugin->cls,
                                             w->instance_id,
                                             w->payto_uri,
                                             w->start_row);
  if (qs < 0)
  {
    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                "Failed to persist wirewatch progress for %s/%s (%d)\n",
                w->instance_id,
                w->payto_uri,
                (int) qs);
    GNUNET_SCHEDULER_shutdown ();
    global_ret = EXIT_FAILURE;
  }
}


/**
 * Free resources of @a w.
* * @param w watch job to terminate */ static void end_watch (struct Watch *w) { if (NULL != w->task) { GNUNET_SCHEDULER_cancel (w->task); w->task = NULL; } if (NULL != w->hh) { TALER_MERCHANT_BANK_credit_history_cancel (w->hh); w->hh = NULL; } GNUNET_free (w->instance_id); GNUNET_free (w->payto_uri); TALER_MERCHANT_BANK_auth_free (&w->ad); GNUNET_CONTAINER_DLL_remove (w_head, w_tail, w); GNUNET_free (w); } /** * We're being aborted with CTRL-C (or SIGTERM). Shut down. * * @param cls closure */ static void shutdown_task (void *cls) { (void) cls; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Running shutdown\n"); while (NULL != w_head) { struct Watch *w = w_head; save (w); end_watch (w); } if (NULL != eh) { db_plugin->event_listen_cancel (eh); eh = NULL; } TALER_MERCHANTDB_plugin_unload (db_plugin); db_plugin = NULL; cfg = NULL; if (NULL != ctx) { GNUNET_CURL_fini (ctx); ctx = NULL; } if (NULL != rc) { GNUNET_CURL_gnunet_rc_destroy (rc); rc = NULL; } } /** * Parse @a subject from wire transfer into @a wtid and @a exchange_url. * * @param subject wire transfer subject to parse; * format is "$WTID $URL" * @param[out] wtid wire transfer ID to extract * @param[out] exchange_url set to exchange URL * @return #GNUNET_OK on success */ static enum GNUNET_GenericReturnValue parse_subject (const char *subject, struct TALER_WireTransferIdentifierRawP *wtid, char **exchange_url) { const char *space; space = strchr (subject, ' '); if (NULL == space) return GNUNET_NO; if (GNUNET_OK != GNUNET_STRINGS_string_to_data (subject, space - subject, wtid, sizeof (*wtid))) return GNUNET_NO; space++; if (! TALER_url_valid_charset (space)) return GNUNET_NO; if ( (0 != strncasecmp ("http://", space, strlen ("http://"))) && (0 != strncasecmp ("https://", space, strlen ("https://"))) ) return GNUNET_NO; *exchange_url = GNUNET_strdup (space); return GNUNET_OK; } /** * Run next iteration. 
* * @param cls a `struct Watch *` */ static void do_work (void *cls); /** * Callbacks of this type are used to serve the result of asking * the bank for the credit transaction history. * * @param cls a `struct Watch *` * @param http_status HTTP response code, #MHD_HTTP_OK (200) for successful status request * 0 if the bank's reply is bogus (fails to follow the protocol), * #MHD_HTTP_NO_CONTENT if there are no more results; on success the * last callback is always of this status (even if `abs(num_results)` were * already returned). * @param ec detailed error code * @param serial_id monotonically increasing counter corresponding to the transaction * @param details details about the wire transfer * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration */ static enum GNUNET_GenericReturnValue credit_cb ( void *cls, unsigned int http_status, enum TALER_ErrorCode ec, uint64_t serial_id, const struct TALER_MERCHANT_BANK_CreditDetails *details) { struct Watch *w = cls; switch (http_status) { case 0: GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Invalid HTTP response (HTTP status: 0, %d) from bank\n", ec); w->delay = GNUNET_TIME_STD_BACKOFF (w->delay); break; case MHD_HTTP_OK: { enum GNUNET_DB_QueryStatus qs; char *exchange_url; struct TALER_WireTransferIdentifierRawP wtid; char *credit_payto; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Received wire transfer `%s' over %s\n", details->wire_subject, TALER_amount2s (&details->amount)); w->found = true; if (GNUNET_OK != parse_subject (details->wire_subject, &wtid, &exchange_url)) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Skipping transfer %llu (%s): not from exchange\n", (unsigned long long) serial_id, details->wire_subject); w->start_row = serial_id; return GNUNET_OK; } /* FIXME-Performance-Optimization: consider grouping multiple inserts into one bigger transaction with just one notify. 
*/ credit_payto = TALER_payto_normalize (details->credit_account_uri); qs = db_plugin->insert_transfer (db_plugin->cls, w->instance_id, exchange_url, &wtid, &details->amount, credit_payto, true /* confirmed */); if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) { struct TALER_Amount total; struct TALER_Amount wfee; struct TALER_Amount eamount; struct GNUNET_TIME_Timestamp timestamp; bool have_esig; bool verified; qs = db_plugin->lookup_transfer (db_plugin->cls, w->instance_id, exchange_url, &wtid, &total, &wfee, &eamount, ×tamp, &have_esig, &verified); if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Inserting transfer for %s into database failed. Is the credit account %s configured correctly?\n", w->instance_id, credit_payto); } if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs) { if (0 != TALER_amount_cmp (&total, &details->amount)) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Inserting transfer for %s into database failed. An entry exists for a different transfer amount (%s)!\n", w->instance_id, TALER_amount2s (&total)); } else { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Inserting transfer for %s into database failed. An equivalent entry already exists.\n", w->instance_id); } } } GNUNET_free (credit_payto); GNUNET_free (exchange_url); if (qs < 0) { GNUNET_break (0); GNUNET_SCHEDULER_shutdown (); w->hh = NULL; return GNUNET_SYSERR; } /* Success => reset back-off timer! */ w->delay = GNUNET_TIME_UNIT_ZERO; { struct GNUNET_DB_EventHeaderP es = { .size = htons (sizeof (es)), .type = htons (TALER_DBEVENT_MERCHANT_WIRE_TRANSFER_CONFIRMED) }; db_plugin->event_notify (db_plugin->cls, &es, NULL, 0); } } w->start_row = serial_id; return GNUNET_OK; case MHD_HTTP_NO_CONTENT: save (w); /* Delay artificially if server returned before long-poll timeout */ if (! w->found) w->delay = GNUNET_TIME_absolute_get_remaining (w->long_poll_timeout); break; case MHD_HTTP_NOT_FOUND: /* configuration likely wrong, wait at least 1 minute, backoff up to 15 minutes! 
*/ w->delay = GNUNET_TIME_relative_max (GNUNET_TIME_UNIT_MINUTES, GNUNET_TIME_STD_BACKOFF (w->delay)); GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Bank claims account is unknown, waiting for %s before trying again\n", GNUNET_TIME_relative2s (w->delay, true)); break; case MHD_HTTP_GATEWAY_TIMEOUT: GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Gateway timeout, adjusting long polling threshold\n"); /* Limit new timeout at request delay */ w->bank_timeout = GNUNET_TIME_relative_min (GNUNET_TIME_absolute_get_duration ( w->start_time), w->bank_timeout); /* set the timeout a bit earlier */ w->bank_timeout = GNUNET_TIME_relative_subtract (w->bank_timeout, GNUNET_TIME_UNIT_SECONDS); /* do not allow it to go to zero */ w->bank_timeout = GNUNET_TIME_relative_max (w->bank_timeout, GNUNET_TIME_UNIT_SECONDS); w->delay = GNUNET_TIME_STD_BACKOFF (w->delay); break; default: /* Something went wrong, try again, but with back-off */ w->delay = GNUNET_TIME_STD_BACKOFF (w->delay); GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Unexpected HTTP status code %u(%d) from bank\n", http_status, ec); break; } w->hh = NULL; if (test_mode && (! w->found)) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "No transactions found and in test mode. Ending watch!\n"); end_watch (w); if (NULL == w_head) GNUNET_SCHEDULER_shutdown (); return GNUNET_OK; } w->task = GNUNET_SCHEDULER_add_delayed (w->delay, &do_work, w); return GNUNET_OK; } static void do_work (void *cls) { struct Watch *w = cls; w->task = NULL; w->found = false; w->long_poll_timeout = GNUNET_TIME_relative_to_absolute (w->bank_timeout); w->start_time = GNUNET_TIME_absolute_get (); w->hh = TALER_MERCHANT_BANK_credit_history (ctx, &w->ad, w->start_row, batch_size, test_mode ? GNUNET_TIME_UNIT_ZERO : w->bank_timeout, &credit_cb, w); if (NULL == w->hh) { GNUNET_break (0); GNUNET_SCHEDULER_shutdown (); return; } } /** * Function called with information about a accounts * the wirewatcher should monitor. 
* * @param cls closure (NULL) * @param instance instance that owns the account * @param payto_uri account URI * @param credit_facade_url URL for the credit facade * @param credit_facade_credentials account access credentials * @param last_serial last transaction serial (inclusive) we have seen from this account */ static void start_watch ( void *cls, const char *instance, const char *payto_uri, const char *credit_facade_url, const json_t *credit_facade_credentials, uint64_t last_serial) { struct Watch *w = GNUNET_new (struct Watch); (void) cls; w->bank_timeout = BANK_TIMEOUT; if (GNUNET_OK != TALER_MERCHANT_BANK_auth_parse_json (credit_facade_credentials, credit_facade_url, &w->ad)) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to parse authentication data of `%s/%s'\n", instance, payto_uri); GNUNET_free (w); GNUNET_SCHEDULER_shutdown (); global_ret = EXIT_NOTCONFIGURED; return; } GNUNET_CONTAINER_DLL_insert (w_head, w_tail, w); w->instance_id = GNUNET_strdup (instance); w->payto_uri = TALER_payto_normalize (payto_uri); w->start_row = last_serial; w->task = GNUNET_SCHEDULER_add_now (&do_work, w); } /** * Function called on configuration change events received from Postgres. We * shutdown (and systemd should restart us). * * @param cls closure (NULL) * @param extra additional event data provided * @param extra_size number of bytes in @a extra */ static void config_changed (void *cls, const void *extra, size_t extra_size) { (void) cls; (void) extra; (void) extra_size; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Configuration changed, %s\n", 0 == persist_mode ? "restarting" : "reinitializing"); config_changed_flag = true; GNUNET_SCHEDULER_shutdown (); } /** * First task. * * @param cls closure, NULL * @param args remaining command-line arguments * @param cfgfile name of the configuration file used (for saving, can be NULL!) 
* @param c configuration */ static void run (void *cls, char *const *args, const char *cfgfile, const struct GNUNET_CONFIGURATION_Handle *c) { (void) args; (void) cfgfile; cfg = c; GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL); ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule, &rc); rc = GNUNET_CURL_gnunet_rc_create (ctx); if (NULL == ctx) { GNUNET_break (0); GNUNET_SCHEDULER_shutdown (); global_ret = EXIT_NO_RESTART; return; } if (NULL == (db_plugin = TALER_MERCHANTDB_plugin_load (cfg))) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to initialize DB subsystem\n"); GNUNET_SCHEDULER_shutdown (); global_ret = EXIT_NOTCONFIGURED; return; } if (GNUNET_OK != db_plugin->connect (db_plugin->cls)) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to connect to database\n"); GNUNET_SCHEDULER_shutdown (); global_ret = EXIT_NO_RESTART; return; } { struct GNUNET_DB_EventHeaderP es = { .size = htons (sizeof (es)), .type = htons (TALER_DBEVENT_MERCHANT_ACCOUNTS_CHANGED) }; eh = db_plugin->event_listen (db_plugin->cls, &es, GNUNET_TIME_UNIT_FOREVER_REL, &config_changed, NULL); } { enum GNUNET_DB_QueryStatus qs; qs = db_plugin->select_wirewatch_accounts (db_plugin->cls, &start_watch, NULL); if (qs < 0) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to obtain wirewatch accounts from database\n"); GNUNET_SCHEDULER_shutdown (); global_ret = EXIT_NO_RESTART; return; } if ( (NULL == w_head) && (GNUNET_YES == test_mode) ) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "No active wirewatch accounts in database and in test mode. 
Exiting.\n"); GNUNET_SCHEDULER_shutdown (); global_ret = EXIT_SUCCESS; return; } } } /** * The main function of taler-merchant-wirewatch * * @param argc number of arguments from the command line * @param argv command line arguments * @return 0 ok, 1 on error */ int main (int argc, char *const *argv) { struct GNUNET_GETOPT_CommandLineOption options[] = { GNUNET_GETOPT_option_flag ('p', "persist", "run in persist mode and do not exit on configuration changes", &persist_mode), GNUNET_GETOPT_option_timetravel ('T', "timetravel"), GNUNET_GETOPT_option_flag ('t', "test", "run in test mode and exit when idle", &test_mode), GNUNET_GETOPT_option_version (VERSION "-" VCS_VERSION), GNUNET_GETOPT_OPTION_END }; enum GNUNET_GenericReturnValue ret; if (GNUNET_OK != GNUNET_STRINGS_get_utf8_args (argc, argv, &argc, &argv)) return EXIT_INVALIDARGUMENT; TALER_OS_init (); do { config_changed_flag = false; ret = GNUNET_PROGRAM_run ( argc, argv, "taler-merchant-wirewatch", gettext_noop ( "background process that watches for incoming wire transfers to the merchant bank account"), options, &run, NULL); } while ( (1 == persist_mode) && config_changed_flag); GNUNET_free_nz ((void *) argv); if (GNUNET_SYSERR == ret) return EXIT_INVALIDARGUMENT; if (GNUNET_NO == ret) return EXIT_SUCCESS; return global_ret; } /* end of taler-exchange-wirewatch.c */