summaryrefslogtreecommitdiff
path: root/src/backend/taler-merchant-wirewatch.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/taler-merchant-wirewatch.c')
-rw-r--r--src/backend/taler-merchant-wirewatch.c756
1 files changed, 756 insertions, 0 deletions
diff --git a/src/backend/taler-merchant-wirewatch.c b/src/backend/taler-merchant-wirewatch.c
new file mode 100644
index 00000000..17eb7a0a
--- /dev/null
+++ b/src/backend/taler-merchant-wirewatch.c
@@ -0,0 +1,756 @@
+/*
+ This file is part of TALER
+ Copyright (C) 2023 Taler Systems SA
+
+ TALER is free software; you can redistribute it and/or modify it under the
+ terms of the GNU Affero General Public License as published by the Free Software
+ Foundation; either version 3, or (at your option) any later version.
+
+ TALER is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License along with
+ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
+*/
+/**
+ * @file taler-merchant-wirewatch.c
+ * @brief Process that imports information about incoming bank transfers into the merchant backend
+ * @author Christian Grothoff
+ */
+#include "platform.h"
+#include <gnunet/gnunet_util_lib.h>
+#include <jansson.h>
+#include <pthread.h>
+#include <taler/taler_dbevents.h>
+#include "taler_merchant_bank_lib.h"
+#include "taler_merchantdb_lib.h"
+#include "taler_merchantdb_plugin.h"
+
+/**
+ * Timeout for the bank interaction. Rather long as we should do long-polling
+ * and do not want to wake up too often.
+ */
+#define BANK_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, \
+ 5)
+
+
+/**
+ * Information about a watch job.
+ */
+struct Watch
+{
+ /**
+ * Kept in a DLL.
+ */
+ struct Watch *next;
+
+ /**
+ * Kept in a DLL.
+ */
+ struct Watch *prev;
+
+ /**
+ * Next task to run, if any.
+ */
+ struct GNUNET_SCHEDULER_Task *task;
+
+ /**
+ * Dynamically adjusted long polling time-out.
+ */
+ struct GNUNET_TIME_Relative bank_timeout;
+
+ /**
+ * For which instance are we importing bank transfers?
+ */
+ char *instance_id;
+
+ /**
+ * For which account are we importing bank transfers?
+ */
+ char *payto_uri;
+
+ /**
+ * Bank history request.
+ */
+ struct TALER_MERCHANT_BANK_CreditHistoryHandle *hh;
+
+ /**
+ * Start row for the bank interaction. Exclusive.
+ */
+ uint64_t start_row;
+
+ /**
+ * Artificial delay to use between API calls. Used to
+ * throttle on failures.
+ */
+ struct GNUNET_TIME_Relative delay;
+
+ /**
+ * When did we start our last HTTP request?
+ */
+ struct GNUNET_TIME_Absolute start_time;
+
+ /**
+ * How long should long-polling take at least?
+ */
+ struct GNUNET_TIME_Absolute long_poll_timeout;
+
+ /**
+ * Login data for the bank.
+ */
+ struct TALER_MERCHANT_BANK_AuthenticationData ad;
+
+ /**
+ * Set to true if we found a transaction in the last iteration.
+ */
+ bool found;
+
+};
+
+
+/**
+ * Head of active watches.
+ */
+static struct Watch *w_head;
+
+/**
+ * Tail of active watches.
+ */
+static struct Watch *w_tail;
+
+/**
+ * The merchant's configuration.
+ */
+static const struct GNUNET_CONFIGURATION_Handle *cfg;
+
+/**
+ * Our database plugin.
+ */
+static struct TALER_MERCHANTDB_Plugin *db_plugin;
+
+/**
+ * Handle to the context for interacting with the bank.
+ */
+static struct GNUNET_CURL_Context *ctx;
+
+/**
+ * Scheduler context for running the @e ctx.
+ */
+static struct GNUNET_CURL_RescheduleContext *rc;
+
+/**
+ * Event handler to learn that the configuration changed
+ * and we should shutdown (to be restarted).
+ */
+static struct GNUNET_DB_EventHandler *eh;
+
+/**
+ * Value to return from main(). 0 on success, non-zero on errors.
+ */
+static int global_ret;
+
+/**
+ * How many transactions should we fetch at most per batch?
+ */
+static unsigned int batch_size = 32;
+
+/**
+ * #GNUNET_YES if we are in test mode and should exit when idle.
+ */
+static int test_mode;
+
+/**
+ * #GNUNET_YES if we are in persistent mode and do
+ * not exit on #config_changed.
+ */
+static int persist_mode;
+
+/**
+ * Set to true if we are shutting down due to a
+ * configuration change.
+ */
+static bool config_changed_flag;
+
+/**
+ * Save progress in DB.
+ */
+static void
+save (struct Watch *w)
+{
+ enum GNUNET_DB_QueryStatus qs;
+
+ qs = db_plugin->update_wirewatch_progress (db_plugin->cls,
+ w->instance_id,
+ w->payto_uri,
+ w->start_row);
+ if (qs < 0)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Failed to persist wirewatch progress for %s/%s (%d)\n",
+ w->instance_id,
+ w->payto_uri,
+ qs);
+ GNUNET_SCHEDULER_shutdown ();
+ global_ret = EXIT_FAILURE;
+ }
+}
+
+
+/**
+ * Free resources of @a w.
+ *
+ * @param w watch job to terminate
+ */
+static void
+end_watch (struct Watch *w)
+{
+ if (NULL != w->task)
+ {
+ GNUNET_SCHEDULER_cancel (w->task);
+ w->task = NULL;
+ }
+ if (NULL != w->hh)
+ {
+ TALER_MERCHANT_BANK_credit_history_cancel (w->hh);
+ w->hh = NULL;
+ }
+ GNUNET_free (w->instance_id);
+ GNUNET_free (w->payto_uri);
+ TALER_MERCHANT_BANK_auth_free (&w->ad);
+ GNUNET_CONTAINER_DLL_remove (w_head,
+ w_tail,
+ w);
+ GNUNET_free (w);
+}
+
+
+/**
+ * We're being aborted with CTRL-C (or SIGTERM). Shut down.
+ *
+ * @param cls closure
+ */
+static void
+shutdown_task (void *cls)
+{
+ (void) cls;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Running shutdown\n");
+ while (NULL != w_head)
+ {
+ struct Watch *w = w_head;
+
+ save (w);
+ end_watch (w);
+ }
+ if (NULL != eh)
+ {
+ db_plugin->event_listen_cancel (eh);
+ eh = NULL;
+ }
+ TALER_MERCHANTDB_plugin_unload (db_plugin);
+ db_plugin = NULL;
+ cfg = NULL;
+ if (NULL != ctx)
+ {
+ GNUNET_CURL_fini (ctx);
+ ctx = NULL;
+ }
+ if (NULL != rc)
+ {
+ GNUNET_CURL_gnunet_rc_destroy (rc);
+ rc = NULL;
+ }
+}
+
+
+/**
+ * Parse @a subject from wire transfer into @a wtid and @a exchange_url.
+ *
+ * @param subject wire transfer subject to parse;
+ * format is "$WTID $URL"
+ * @param[out] wtid wire transfer ID to extract
+ * @param[out] exchange_url set to exchange URL
+ * @return #GNUNET_OK on success
+ */
+static enum GNUNET_GenericReturnValue
+parse_subject (const char *subject,
+ struct TALER_WireTransferIdentifierRawP *wtid,
+ char **exchange_url)
+{
+ const char *space;
+
+ space = strchr (subject, ' ');
+ if (NULL == space)
+ return GNUNET_NO;
+ if (GNUNET_OK !=
+ GNUNET_STRINGS_string_to_data (subject,
+ space - subject,
+ wtid,
+ sizeof (*wtid)))
+ return GNUNET_NO;
+ space++;
+ if (! TALER_url_valid_charset (space))
+ return GNUNET_NO;
+ if ( (0 != strncasecmp ("http://",
+ space,
+ strlen ("http://"))) &&
+ (0 != strncasecmp ("https://",
+ space,
+ strlen ("https://"))) )
+ return GNUNET_NO;
+ *exchange_url = GNUNET_strdup (space);
+ return GNUNET_OK;
+}
+
+
+/**
+ * Run next iteration.
+ *
+ * @param cls a `struct Watch *`
+ */
+static void
+do_work (void *cls);
+
+
+/**
+ * Callbacks of this type are used to serve the result of asking
+ * the bank for the credit transaction history.
+ *
+ * @param cls a `struct Watch *`
+ * @param http_status HTTP response code, #MHD_HTTP_OK (200) for successful status request
+ * 0 if the bank's reply is bogus (fails to follow the protocol),
+ * #MHD_HTTP_NO_CONTENT if there are no more results; on success the
+ * last callback is always of this status (even if `abs(num_results)` were
+ * already returned).
+ * @param ec detailed error code
+ * @param serial_id monotonically increasing counter corresponding to the transaction
+ * @param details details about the wire transfer
+ * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration
+ */
+static enum GNUNET_GenericReturnValue
+credit_cb (
+ void *cls,
+ unsigned int http_status,
+ enum TALER_ErrorCode ec,
+ uint64_t serial_id,
+ const struct TALER_MERCHANT_BANK_CreditDetails *details)
+{
+ struct Watch *w = cls;
+
+ switch (http_status)
+ {
+ case 0:
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Invalid HTTP response (HTTP status: 0, %d) from bank\n",
+ ec);
+ w->delay = GNUNET_TIME_STD_BACKOFF (w->delay);
+ break;
+ case MHD_HTTP_OK:
+ {
+ enum GNUNET_DB_QueryStatus qs;
+ char *exchange_url;
+ struct TALER_WireTransferIdentifierRawP wtid;
+ char *credit_payto;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Received wire transfer `%s' over %s\n",
+ details->wire_subject,
+ TALER_amount2s (&details->amount));
+ w->found = true;
+ if (GNUNET_OK !=
+ parse_subject (details->wire_subject,
+ &wtid,
+ &exchange_url))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Skipping transfer %llu (%s): not from exchange\n",
+ (unsigned long long) serial_id,
+ details->wire_subject);
+ w->start_row = serial_id;
+ return GNUNET_OK;
+ }
+ /* FIXME-Performance-Optimization: consider grouping multiple inserts
+ into one bigger transaction with just one notify. */
+ credit_payto = TALER_payto_normalize (details->credit_account_uri);
+ qs = db_plugin->insert_transfer (db_plugin->cls,
+ w->instance_id,
+ exchange_url,
+ &wtid,
+ &details->amount,
+ credit_payto,
+ true /* confirmed */);
+ if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs)
+ {
+ struct TALER_Amount total;
+ struct TALER_Amount wfee;
+ struct TALER_Amount eamount;
+ struct GNUNET_TIME_Timestamp timestamp;
+ bool have_esig;
+ bool verified;
+
+ qs = db_plugin->lookup_transfer (db_plugin->cls,
+ w->instance_id,
+ exchange_url,
+ &wtid,
+ &total,
+ &wfee,
+ &eamount,
+ &timestamp,
+ &have_esig,
+ &verified);
+ if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Inserting transfer for %s into database failed. Is the credit account %s configured correctly?\n",
+ w->instance_id,
+ credit_payto);
+ }
+ if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs)
+ {
+ if (0 !=
+ TALER_amount_cmp (&total,
+ &details->amount))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Inserting transfer for %s into database failed. An entry exists for a different transfer amount (%s)!\n",
+ w->instance_id,
+ TALER_amount2s (&total));
+ }
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Inserting transfer for %s into database failed. An equivalent entry already exists.\n",
+ w->instance_id);
+ }
+ }
+ }
+ GNUNET_free (credit_payto);
+ GNUNET_free (exchange_url);
+ if (qs < 0)
+ {
+ GNUNET_break (0);
+ GNUNET_SCHEDULER_shutdown ();
+ w->hh = NULL;
+ return GNUNET_SYSERR;
+ }
+ /* Success => reset back-off timer! */
+ w->delay = GNUNET_TIME_UNIT_ZERO;
+ {
+ struct GNUNET_DB_EventHeaderP es = {
+ .size = htons (sizeof (es)),
+ .type = htons (TALER_DBEVENT_MERCHANT_WIRE_TRANSFER_CONFIRMED)
+ };
+
+ db_plugin->event_notify (db_plugin->cls,
+ &es,
+ NULL,
+ 0);
+ }
+ }
+ w->start_row = serial_id;
+ return GNUNET_OK;
+ case MHD_HTTP_NO_CONTENT:
+ save (w);
+ /* Delay artificially if server returned before long-poll timeout */
+ if (! w->found)
+ w->delay = GNUNET_TIME_absolute_get_remaining (w->long_poll_timeout);
+ break;
+ case MHD_HTTP_NOT_FOUND:
+ /* configuration likely wrong, wait at least 1 minute, backoff up to 15 minutes! */
+ w->delay = GNUNET_TIME_relative_max (GNUNET_TIME_UNIT_MINUTES,
+ GNUNET_TIME_STD_BACKOFF (w->delay));
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Bank claims account is unknown, waiting for %s before trying again\n",
+ GNUNET_TIME_relative2s (w->delay,
+ true));
+ break;
+ case MHD_HTTP_GATEWAY_TIMEOUT:
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Gateway timeout, adjusting long polling threshold\n");
+ /* Limit new timeout at request delay */
+ w->bank_timeout
+ = GNUNET_TIME_relative_min (GNUNET_TIME_absolute_get_duration (
+ w->start_time),
+ w->bank_timeout);
+ /* set the timeout a bit earlier */
+ w->bank_timeout
+ = GNUNET_TIME_relative_subtract (w->bank_timeout,
+ GNUNET_TIME_UNIT_SECONDS);
+ /* do not allow it to go to zero */
+ w->bank_timeout
+ = GNUNET_TIME_relative_max (w->bank_timeout,
+ GNUNET_TIME_UNIT_SECONDS);
+ w->delay = GNUNET_TIME_STD_BACKOFF (w->delay);
+ break;
+ default:
+ /* Something went wrong, try again, but with back-off */
+ w->delay = GNUNET_TIME_STD_BACKOFF (w->delay);
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Unexpected HTTP status code %u(%d) from bank\n",
+ http_status,
+ ec);
+ break;
+ }
+ w->hh = NULL;
+ if (test_mode && (! w->found))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "No transactions found and in test mode. Ending watch!\n");
+ end_watch (w);
+ if (NULL == w_head)
+ GNUNET_SCHEDULER_shutdown ();
+ return GNUNET_OK;
+ }
+ w->task = GNUNET_SCHEDULER_add_delayed (w->delay,
+ &do_work,
+ w);
+ return GNUNET_OK;
+}
+
+
+static void
+do_work (void *cls)
+{
+ struct Watch *w = cls;
+
+ w->task = NULL;
+ w->found = false;
+ w->long_poll_timeout
+ = GNUNET_TIME_relative_to_absolute (w->bank_timeout);
+ w->start_time
+ = GNUNET_TIME_absolute_get ();
+ w->hh = TALER_MERCHANT_BANK_credit_history (ctx,
+ &w->ad,
+ w->start_row,
+ batch_size,
+ test_mode
+ ? GNUNET_TIME_UNIT_ZERO
+ : w->bank_timeout,
+ &credit_cb,
+ w);
+ if (NULL == w->hh)
+ {
+ GNUNET_break (0);
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
+}
+
+
+/**
+ * Function called with information about a accounts
+ * the wirewatcher should monitor.
+ *
+ * @param cls closure (NULL)
+ * @param instance instance that owns the account
+ * @param payto_uri account URI
+ * @param credit_facade_url URL for the credit facade
+ * @param credit_facade_credentials account access credentials
+ * @param last_serial last transaction serial (inclusive) we have seen from this account
+ */
+static void
+start_watch (
+ void *cls,
+ const char *instance,
+ const char *payto_uri,
+ const char *credit_facade_url,
+ const json_t *credit_facade_credentials,
+ uint64_t last_serial)
+{
+ struct Watch *w = GNUNET_new (struct Watch);
+
+ (void) cls;
+ w->bank_timeout = BANK_TIMEOUT;
+ if (GNUNET_OK !=
+ TALER_MERCHANT_BANK_auth_parse_json (credit_facade_credentials,
+ credit_facade_url,
+ &w->ad))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to parse authentication data of `%s/%s'\n",
+ instance,
+ payto_uri);
+ GNUNET_free (w);
+ GNUNET_SCHEDULER_shutdown ();
+ global_ret = EXIT_NOTCONFIGURED;
+ return;
+ }
+
+ GNUNET_CONTAINER_DLL_insert (w_head,
+ w_tail,
+ w);
+ w->instance_id = GNUNET_strdup (instance);
+ w->payto_uri = TALER_payto_normalize (payto_uri);
+ w->start_row = last_serial;
+ w->task = GNUNET_SCHEDULER_add_now (&do_work,
+ w);
+}
+
+
+/**
+ * Function called on configuration change events received from Postgres. We
+ * shutdown (and systemd should restart us).
+ *
+ * @param cls closure (NULL)
+ * @param extra additional event data provided
+ * @param extra_size number of bytes in @a extra
+ */
+static void
+config_changed (void *cls,
+ const void *extra,
+ size_t extra_size)
+{
+ (void) cls;
+ (void) extra;
+ (void) extra_size;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Configuration changed, %s\n",
+ 0 == persist_mode
+ ? "restarting"
+ : "reinitializing");
+ config_changed_flag = true;
+ GNUNET_SCHEDULER_shutdown ();
+}
+
+
+/**
+ * First task.
+ *
+ * @param cls closure, NULL
+ * @param args remaining command-line arguments
+ * @param cfgfile name of the configuration file used (for saving, can be NULL!)
+ * @param c configuration
+ */
+static void
+run (void *cls,
+ char *const *args,
+ const char *cfgfile,
+ const struct GNUNET_CONFIGURATION_Handle *c)
+{
+ (void) args;
+ (void) cfgfile;
+
+ cfg = c;
+ GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
+ NULL);
+ ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule,
+ &rc);
+ rc = GNUNET_CURL_gnunet_rc_create (ctx);
+ if (NULL == ctx)
+ {
+ GNUNET_break (0);
+ GNUNET_SCHEDULER_shutdown ();
+ global_ret = EXIT_NO_RESTART;
+ return;
+ }
+ if (NULL ==
+ (db_plugin = TALER_MERCHANTDB_plugin_load (cfg)))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to initialize DB subsystem\n");
+ GNUNET_SCHEDULER_shutdown ();
+ global_ret = EXIT_NOTCONFIGURED;
+ return;
+ }
+ if (GNUNET_OK !=
+ db_plugin->connect (db_plugin->cls))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to connect to database\n");
+ GNUNET_SCHEDULER_shutdown ();
+ global_ret = EXIT_NO_RESTART;
+ return;
+ }
+ {
+ struct GNUNET_DB_EventHeaderP es = {
+ .size = htons (sizeof (es)),
+ .type = htons (TALER_DBEVENT_MERCHANT_ACCOUNTS_CHANGED)
+ };
+
+ eh = db_plugin->event_listen (db_plugin->cls,
+ &es,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &config_changed,
+ NULL);
+ }
+ {
+ enum GNUNET_DB_QueryStatus qs;
+
+ qs = db_plugin->select_wirewatch_accounts (db_plugin->cls,
+ &start_watch,
+ NULL);
+ if (qs < 0)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to obtain wirewatch accounts from database\n");
+ GNUNET_SCHEDULER_shutdown ();
+ global_ret = EXIT_NO_RESTART;
+ return;
+ }
+ if ( (NULL == w_head) &&
+ (GNUNET_YES == test_mode) )
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "No active wirewatch accounts in database and in test mode. Exiting.\n");
+ GNUNET_SCHEDULER_shutdown ();
+ global_ret = EXIT_SUCCESS;
+ return;
+ }
+ }
+}
+
+
+/**
+ * The main function of taler-merchant-wirewatch
+ *
+ * @param argc number of arguments from the command line
+ * @param argv command line arguments
+ * @return 0 ok, 1 on error
+ */
+int
+main (int argc,
+ char *const *argv)
+{
+ struct GNUNET_GETOPT_CommandLineOption options[] = {
+ GNUNET_GETOPT_option_flag ('p',
+ "persist",
+ "run in persist mode and do not exit on configuration changes",
+ &persist_mode),
+ GNUNET_GETOPT_option_timetravel ('T',
+ "timetravel"),
+ GNUNET_GETOPT_option_flag ('t',
+ "test",
+ "run in test mode and exit when idle",
+ &test_mode),
+ GNUNET_GETOPT_option_version (VERSION "-" VCS_VERSION),
+ GNUNET_GETOPT_OPTION_END
+ };
+ enum GNUNET_GenericReturnValue ret;
+
+ if (GNUNET_OK !=
+ GNUNET_STRINGS_get_utf8_args (argc, argv,
+ &argc, &argv))
+ return EXIT_INVALIDARGUMENT;
+ TALER_OS_init ();
+ do {
+ config_changed_flag = false;
+ ret = GNUNET_PROGRAM_run (
+ argc, argv,
+ "taler-merchant-wirewatch",
+ gettext_noop (
+ "background process that watches for incoming wire transfers to the merchant bank account"),
+ options,
+ &run, NULL);
+ } while ( (1 == persist_mode) &&
+ config_changed_flag);
+ GNUNET_free_nz ((void *) argv);
+ if (GNUNET_SYSERR == ret)
+ return EXIT_INVALIDARGUMENT;
+ if (GNUNET_NO == ret)
+ return EXIT_SUCCESS;
+ return global_ret;
+}
+
+
+/* end of taler-exchange-wirewatch.c */