From b91fcbb92f21db498214cba38ffd6e3fe886d95e Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Thu, 12 Mar 2020 10:23:26 +0100 Subject: finish separation of aggreator into aggregation, closing and transfer processes (test cases still need to be updated) --- src/exchange/.gitignore | 1 + src/exchange/Makefile.am | 13 + src/exchange/taler-exchange-aggregator.c | 338 +------------------ src/exchange/taler-exchange-transfer.c | 544 +++++++++++++++++++++++++++++++ 4 files changed, 560 insertions(+), 336 deletions(-) create mode 100644 src/exchange/taler-exchange-transfer.c diff --git a/src/exchange/.gitignore b/src/exchange/.gitignore index 09cf60a8a..5818f1717 100644 --- a/src/exchange/.gitignore +++ b/src/exchange/.gitignore @@ -8,3 +8,4 @@ taler-exchange-wirewatch test_taler_exchange_wirewatch-postgres test_taler_exchange_httpd_home/.config/taler/account-1.json taler-exchange-closer +taler-exchange-transfer diff --git a/src/exchange/Makefile.am b/src/exchange/Makefile.am index 227224d30..88753c5e4 100644 --- a/src/exchange/Makefile.am +++ b/src/exchange/Makefile.am @@ -20,6 +20,7 @@ bin_PROGRAMS = \ taler-exchange-aggregator \ taler-exchange-closer \ taler-exchange-httpd \ + taler-exchange-transfer \ taler-exchange-wirewatch taler_exchange_aggregator_SOURCES = \ @@ -59,6 +60,18 @@ taler_exchange_wirewatch_LDADD = \ -lgnunetcurl \ -lgnunetutil +taler_exchange_transfer_SOURCES = \ + taler-exchange-transfer.c +taler_exchange_transfer_LDADD = \ + $(LIBGCRYPT_LIBS) \ + $(top_builddir)/src/json/libtalerjson.la \ + $(top_builddir)/src/util/libtalerutil.la \ + $(top_builddir)/src/bank-lib/libtalerbank.la \ + $(top_builddir)/src/exchangedb/libtalerexchangedb.la \ + -ljansson \ + -lgnunetcurl \ + -lgnunetutil + taler_exchange_httpd_SOURCES = \ taler-exchange-httpd.c taler-exchange-httpd.h \ taler-exchange-httpd_db.c taler-exchange-httpd_db.h \ diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c index 431abea4d..59db4daef 100644 --- a/src/exchange/taler-exchange-aggregator.c +++ b/src/exchange/taler-exchange-aggregator.c @@ -18,19 +18,6 @@ * @file taler-exchange-aggregator.c * @brief Process that aggregates outgoing transactions and executes them * @author Christian Grothoff - * - * Note: - * It might be simpler and theoretically more performant to split up - * this process into three: - * - one that runs the 'pending' wire transfers - * - one that performs aggregation - * - one that closes (expired) reserves - * - * They would have some (minor) code duplication to load the database and wire - * plugins and account data, and this would also slightly complicate - * operations by having to launch three processes. OTOH, those processes could - * then fail independently, which might also be a good thing. In any case, - * doing this is not expected to be complicated. */ #include "platform.h" #include @@ -42,38 +29,6 @@ #include "taler_bank_service.h" -/** - * Data we keep to #run_transfers(). There is at most - * one of these around at any given point in time. - * Note that this limits parallelism, and we might want - * to revise this decision at a later point. - */ -struct WirePrepareData -{ - - /** - * Database session for all of our transactions. - */ - struct TALER_EXCHANGEDB_Session *session; - - /** - * Wire execution handle. - */ - struct TALER_BANK_TransferHandle *eh; - - /** - * Wire account used for this preparation. - */ - struct TALER_EXCHANGEDB_WireAccount *wa; - - /** - * Row ID of the transfer. - */ - unsigned long long row_id; - -}; - - /** * Information about one aggregation process to be executed. There is * at most one of these around at any given point in time. @@ -201,22 +156,6 @@ static struct TALER_EXCHANGEDB_Plugin *db_plugin; */ static struct GNUNET_SCHEDULER_Task *task; -/** - * If we are currently executing a transfer, information about - * the active transfer is here. Otherwise, this variable is NULL. - */ -static struct WirePrepareData *wpd; - -/** - * Handle to the context for interacting with the bank / wire gateway. - */ -static struct GNUNET_CURL_Context *ctx; - -/** - * Scheduler context for running the @e ctx. - */ -static struct GNUNET_CURL_RescheduleContext *rc; - /** * How long should we sleep when idle before trying to find more work? */ @@ -244,16 +183,6 @@ static void run_aggregation (void *cls); -/** - * Execute the wire transfers that we have committed to - * do. - * - * @param cls NULL - */ -static void -run_transfers (void *cls); - - /** * Free data stored in @a au, but not @a au itself (stack allocated). * @@ -281,16 +210,6 @@ static void shutdown_task (void *cls) { (void) cls; - if (NULL != ctx) - { - GNUNET_CURL_fini (ctx); - ctx = NULL; - } - if (NULL != rc) - { - GNUNET_CURL_gnunet_rc_destroy (rc); - rc = NULL; - } GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Running shutdown\n"); if (NULL != task) @@ -298,18 +217,6 @@ shutdown_task (void *cls) GNUNET_SCHEDULER_cancel (task); task = NULL; } - if (NULL != wpd) - { - if (NULL != wpd->eh) - { - TALER_BANK_transfer_cancel (wpd->eh); - wpd->eh = NULL; - } - db_plugin->rollback (db_plugin->cls, - wpd->session); - GNUNET_free (wpd); - wpd = NULL; - } TALER_EXCHANGEDB_plugin_unload (db_plugin); db_plugin = NULL; TALER_EXCHANGEDB_unload_accounts (); @@ -1038,106 +945,11 @@ run_aggregation (void *cls) return; case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Preparation complete, switching to transfer mode\n"); - /* run alternative task: actually do wire transfer! */ - GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&run_transfers, - NULL); - return; - default: - GNUNET_break (0); - global_ret = GNUNET_SYSERR; - GNUNET_SCHEDULER_shutdown (); - return; - } -} - - -/** - * Function called with the result from the execute step. - * - * @param cls NULL - * @param http_status_code #MHD_HTTP_OK on success - * @param ec taler error code - * @param row_id unique ID of the wire transfer in the bank's records - * @param wire_timestamp when did the transfer happen - */ -static void -wire_confirm_cb (void *cls, - unsigned int http_status_code, - enum TALER_ErrorCode ec, - uint64_t row_id, - struct GNUNET_TIME_Absolute wire_timestamp) -{ - struct TALER_EXCHANGEDB_Session *session = wpd->session; - enum GNUNET_DB_QueryStatus qs; - - (void) cls; - (void) row_id; - (void) wire_timestamp; - wpd->eh = NULL; - if (MHD_HTTP_OK != http_status_code) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Wire transaction failed: %u/%d\n", - http_status_code, - ec); - db_plugin->rollback (db_plugin->cls, - session); - global_ret = GNUNET_SYSERR; - GNUNET_SCHEDULER_shutdown (); - GNUNET_free (wpd); - wpd = NULL; - return; - } - qs = db_plugin->wire_prepare_data_mark_finished (db_plugin->cls, - session, - wpd->row_id); - if (0 >= qs) - { - GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); - db_plugin->rollback (db_plugin->cls, - session); - if (GNUNET_DB_STATUS_SOFT_ERROR == qs) - { - /* try again */ - GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&run_aggregation, - NULL); - } - else - { - global_ret = GNUNET_SYSERR; - GNUNET_SCHEDULER_shutdown (); - } - GNUNET_free (wpd); - wpd = NULL; - return; - } - GNUNET_free (wpd); - wpd = NULL; - switch (commit_or_warn (session)) - { - case GNUNET_DB_STATUS_SOFT_ERROR: - /* try again */ + "Preparation complete, going again\n"); GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&run_aggregation, NULL); return; - case GNUNET_DB_STATUS_HARD_ERROR: - GNUNET_break (0); - global_ret = GNUNET_SYSERR; - GNUNET_SCHEDULER_shutdown (); - return; - case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Wire transfer complete\n"); - /* continue with #run_transfers(), just to guard - against the unlikely case that there are more. */ - GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&run_transfers, - NULL); - return; default: GNUNET_break (0); global_ret = GNUNET_SYSERR; @@ -1147,143 +959,6 @@ wire_confirm_cb (void *cls, } -/** - * Callback with data about a prepared transaction. - * - * @param cls NULL - * @param rowid row identifier used to mark prepared transaction as done - * @param wire_method wire method the preparation was done for - * @param buf transaction data that was persisted, NULL on error - * @param buf_size number of bytes in @a buf, 0 on error - */ -static void -wire_prepare_cb (void *cls, - uint64_t rowid, - const char *wire_method, - const char *buf, - size_t buf_size) -{ - struct TALER_EXCHANGEDB_WireAccount *wa; - - (void) cls; - wpd->row_id = rowid; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Starting wire transfer %llu\n", - (unsigned long long) rowid); - wpd->wa = TALER_EXCHANGEDB_find_account_by_method (wire_method); - if (NULL == wpd->wa) - { - /* Should really never happen here, as when we get - here the wire account should be in the cache. */ - GNUNET_break (0); - db_plugin->rollback (db_plugin->cls, - wpd->session); - global_ret = GNUNET_SYSERR; - GNUNET_SCHEDULER_shutdown (); - GNUNET_free (wpd); - wpd = NULL; - return; - } - wa = wpd->wa; - wpd->eh = TALER_BANK_transfer (ctx, - &wa->auth, - buf, - buf_size, - &wire_confirm_cb, - NULL); - if (NULL == wpd->eh) - { - GNUNET_break (0); /* Irrecoverable */ - db_plugin->rollback (db_plugin->cls, - wpd->session); - global_ret = GNUNET_SYSERR; - GNUNET_SCHEDULER_shutdown (); - GNUNET_free (wpd); - wpd = NULL; - return; - } -} - - -/** - * Execute the wire transfers that we have committed to - * do. - * - * @param cls NULL - */ -static void -run_transfers (void *cls) -{ - enum GNUNET_DB_QueryStatus qs; - struct TALER_EXCHANGEDB_Session *session; - const struct GNUNET_SCHEDULER_TaskContext *tc; - - (void) cls; - task = NULL; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Checking for pending wire transfers\n"); - tc = GNUNET_SCHEDULER_get_task_context (); - if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) - return; - if (NULL == (session = db_plugin->get_session (db_plugin->cls))) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Failed to obtain database session!\n"); - global_ret = GNUNET_SYSERR; - GNUNET_SCHEDULER_shutdown (); - return; - } - if (GNUNET_OK != - db_plugin->start (db_plugin->cls, - session, - "aggregator run transfer")) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Failed to start database transaction!\n"); - global_ret = GNUNET_SYSERR; - GNUNET_SCHEDULER_shutdown (); - return; - } - wpd = GNUNET_new (struct WirePrepareData); - wpd->session = session; - qs = db_plugin->wire_prepare_data_get (db_plugin->cls, - session, - &wire_prepare_cb, - NULL); - if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs) - return; /* continued via continuation set in #wire_prepare_cb() */ - db_plugin->rollback (db_plugin->cls, - session); - GNUNET_free (wpd); - wpd = NULL; - switch (qs) - { - case GNUNET_DB_STATUS_HARD_ERROR: - GNUNET_break (0); - global_ret = GNUNET_SYSERR; - GNUNET_SCHEDULER_shutdown (); - return; - case GNUNET_DB_STATUS_SOFT_ERROR: - /* try again */ - GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&run_transfers, - NULL); - return; - case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: - /* no more prepared wire transfers, go back to aggregation! */ - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "No more pending wire transfers, starting aggregation\n"); - GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&run_aggregation, - NULL); - return; - case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: - /* should be impossible */ - GNUNET_assert (0); - } -} - - /** * First task. * @@ -1309,17 +984,8 @@ run (void *cls, global_ret = 1; return; } - ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule, - &rc); - rc = GNUNET_CURL_gnunet_rc_create (ctx); - if (NULL == ctx) - { - GNUNET_break (0); - return; - } - GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&run_transfers, + task = GNUNET_SCHEDULER_add_now (&run_aggregation, NULL); GNUNET_SCHEDULER_add_shutdown (&shutdown_task, cls); diff --git a/src/exchange/taler-exchange-transfer.c b/src/exchange/taler-exchange-transfer.c new file mode 100644 index 000000000..e8c0929be --- /dev/null +++ b/src/exchange/taler-exchange-transfer.c @@ -0,0 +1,544 @@ +/* + This file is part of TALER + Copyright (C) 2016-2020 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License along with + TALER; see the file COPYING. If not, see +*/ + +/** + * @file taler-exchange-transfer.c + * @brief Process that actually finalizes outgoing transfers with the wire gateway / bank + * @author Christian Grothoff + */ +#include "platform.h" +#include +#include +#include +#include "taler_exchangedb_lib.h" +#include "taler_exchangedb_plugin.h" +#include "taler_json_lib.h" +#include "taler_bank_service.h" + + +/** + * Data we keep to #run_transfers(). There is at most + * one of these around at any given point in time. + * Note that this limits parallelism, and we might want + * to revise this decision at a later point. + */ +struct WirePrepareData +{ + + /** + * Database session for all of our transactions. + */ + struct TALER_EXCHANGEDB_Session *session; + + /** + * Wire execution handle. + */ + struct TALER_BANK_TransferHandle *eh; + + /** + * Wire account used for this preparation. + */ + struct TALER_EXCHANGEDB_WireAccount *wa; + + /** + * Row ID of the transfer. + */ + unsigned long long row_id; + +}; + + +/** + * The exchange's configuration. + */ +static const struct GNUNET_CONFIGURATION_Handle *cfg; + +/** + * Our database plugin. + */ +static struct TALER_EXCHANGEDB_Plugin *db_plugin; + +/** + * Next task to run, if any. + */ +static struct GNUNET_SCHEDULER_Task *task; + +/** + * If we are currently executing a transfer, information about + * the active transfer is here. Otherwise, this variable is NULL. + */ +static struct WirePrepareData *wpd; + +/** + * Handle to the context for interacting with the bank / wire gateway. + */ +static struct GNUNET_CURL_Context *ctx; + +/** + * Scheduler context for running the @e ctx. + */ +static struct GNUNET_CURL_RescheduleContext *rc; + +/** + * How long should we sleep when idle before trying to find more work? + */ +static struct GNUNET_TIME_Relative aggregator_idle_sleep_interval; + +/** + * Value to return from main(). #GNUNET_OK on success, #GNUNET_SYSERR + * on serious errors. + */ +static int global_ret; + +/** + * #GNUNET_YES if we are in test mode and should exit when idle. + */ +static int test_mode; + + +/** + * Execute the wire transfers that we have committed to + * do. + * + * @param cls NULL + */ +static void +run_transfers (void *cls); + + +/** + * We're being aborted with CTRL-C (or SIGTERM). Shut down. + * + * @param cls closure + */ +static void +shutdown_task (void *cls) +{ + (void) cls; + if (NULL != ctx) + { + GNUNET_CURL_fini (ctx); + ctx = NULL; + } + if (NULL != rc) + { + GNUNET_CURL_gnunet_rc_destroy (rc); + rc = NULL; + } + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Running shutdown\n"); + if (NULL != task) + { + GNUNET_SCHEDULER_cancel (task); + task = NULL; + } + if (NULL != wpd) + { + if (NULL != wpd->eh) + { + TALER_BANK_transfer_cancel (wpd->eh); + wpd->eh = NULL; + } + db_plugin->rollback (db_plugin->cls, + wpd->session); + GNUNET_free (wpd); + wpd = NULL; + } + TALER_EXCHANGEDB_plugin_unload (db_plugin); + db_plugin = NULL; + TALER_EXCHANGEDB_unload_accounts (); + cfg = NULL; +} + + +/** + * Parse the configuration for wirewatch. + * + * @return #GNUNET_OK on success + */ +static int +parse_wirewatch_config () +{ + if (GNUNET_OK != + GNUNET_CONFIGURATION_get_value_time (cfg, + "exchange", + "AGGREGATOR_IDLE_SLEEP_INTERVAL", + &aggregator_idle_sleep_interval)) + { + GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, + "exchange", + "AGGREGATOR_IDLE_SLEEP_INTERVAL"); + return GNUNET_SYSERR; + } + if (NULL == + (db_plugin = TALER_EXCHANGEDB_plugin_load (cfg))) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to initialize DB subsystem\n"); + return GNUNET_SYSERR; + } + if (GNUNET_OK != + TALER_EXCHANGEDB_load_accounts (cfg)) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "No wire accounts configured for debit!\n"); + TALER_EXCHANGEDB_plugin_unload (db_plugin); + db_plugin = NULL; + return GNUNET_SYSERR; + } + return GNUNET_OK; +} + + +/** + * Perform a database commit. If it fails, print a warning. + * + * @param session session to perform the commit for. + * @return status of commit + */ +static enum GNUNET_DB_QueryStatus +commit_or_warn (struct TALER_EXCHANGEDB_Session *session) +{ + enum GNUNET_DB_QueryStatus qs; + + qs = db_plugin->commit (db_plugin->cls, + session); + if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) + return qs; + GNUNET_log ((GNUNET_DB_STATUS_SOFT_ERROR == qs) + ? GNUNET_ERROR_TYPE_INFO + : GNUNET_ERROR_TYPE_ERROR, + "Failed to commit database transaction!\n"); + return qs; +} + + +/** + * Function called with the result from the execute step. + * + * @param cls NULL + * @param http_status_code #MHD_HTTP_OK on success + * @param ec taler error code + * @param row_id unique ID of the wire transfer in the bank's records + * @param wire_timestamp when did the transfer happen + */ +static void +wire_confirm_cb (void *cls, + unsigned int http_status_code, + enum TALER_ErrorCode ec, + uint64_t row_id, + struct GNUNET_TIME_Absolute wire_timestamp) +{ + struct TALER_EXCHANGEDB_Session *session = wpd->session; + enum GNUNET_DB_QueryStatus qs; + + (void) cls; + (void) row_id; + (void) wire_timestamp; + wpd->eh = NULL; + if (MHD_HTTP_OK != http_status_code) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Wire transaction failed: %u/%d\n", + http_status_code, + ec); + db_plugin->rollback (db_plugin->cls, + session); + global_ret = GNUNET_SYSERR; + GNUNET_SCHEDULER_shutdown (); + GNUNET_free (wpd); + wpd = NULL; + return; + } + qs = db_plugin->wire_prepare_data_mark_finished (db_plugin->cls, + session, + wpd->row_id); + if (0 >= qs) + { + GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); + db_plugin->rollback (db_plugin->cls, + session); + if (GNUNET_DB_STATUS_SOFT_ERROR == qs) + { + /* try again */ + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&run_transfers, + NULL); + } + else + { + global_ret = GNUNET_SYSERR; + GNUNET_SCHEDULER_shutdown (); + } + GNUNET_free (wpd); + wpd = NULL; + return; + } + GNUNET_free (wpd); + wpd = NULL; + switch (commit_or_warn (session)) + { + case GNUNET_DB_STATUS_SOFT_ERROR: + /* try again */ + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&run_transfers, + NULL); + return; + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + global_ret = GNUNET_SYSERR; + GNUNET_SCHEDULER_shutdown (); + return; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Wire transfer complete\n"); + /* continue with #run_transfers(), just to guard + against the unlikely case that there are more. */ + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&run_transfers, + NULL); + return; + default: + GNUNET_break (0); + global_ret = GNUNET_SYSERR; + GNUNET_SCHEDULER_shutdown (); + return; + } +} + + +/** + * Callback with data about a prepared transaction. + * + * @param cls NULL + * @param rowid row identifier used to mark prepared transaction as done + * @param wire_method wire method the preparation was done for + * @param buf transaction data that was persisted, NULL on error + * @param buf_size number of bytes in @a buf, 0 on error + */ +static void +wire_prepare_cb (void *cls, + uint64_t rowid, + const char *wire_method, + const char *buf, + size_t buf_size) +{ + struct TALER_EXCHANGEDB_WireAccount *wa; + + (void) cls; + wpd->row_id = rowid; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Starting wire transfer %llu\n", + (unsigned long long) rowid); + wpd->wa = TALER_EXCHANGEDB_find_account_by_method (wire_method); + if (NULL == wpd->wa) + { + /* Should really never happen here, as when we get + here the wire account should be in the cache. */ + GNUNET_break (0); + db_plugin->rollback (db_plugin->cls, + wpd->session); + global_ret = GNUNET_SYSERR; + GNUNET_SCHEDULER_shutdown (); + GNUNET_free (wpd); + wpd = NULL; + return; + } + wa = wpd->wa; + wpd->eh = TALER_BANK_transfer (ctx, + &wa->auth, + buf, + buf_size, + &wire_confirm_cb, + NULL); + if (NULL == wpd->eh) + { + GNUNET_break (0); /* Irrecoverable */ + db_plugin->rollback (db_plugin->cls, + wpd->session); + global_ret = GNUNET_SYSERR; + GNUNET_SCHEDULER_shutdown (); + GNUNET_free (wpd); + wpd = NULL; + return; + } +} + + +/** + * Execute the wire transfers that we have committed to + * do. + * + * @param cls NULL + */ +static void +run_transfers (void *cls) +{ + enum GNUNET_DB_QueryStatus qs; + struct TALER_EXCHANGEDB_Session *session; + const struct GNUNET_SCHEDULER_TaskContext *tc; + + (void) cls; + task = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Checking for pending wire transfers\n"); + tc = GNUNET_SCHEDULER_get_task_context (); + if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) + return; + if (NULL == (session = db_plugin->get_session (db_plugin->cls))) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to obtain database session!\n"); + global_ret = GNUNET_SYSERR; + GNUNET_SCHEDULER_shutdown (); + return; + } + if (GNUNET_OK != + db_plugin->start (db_plugin->cls, + session, + "aggregator run transfer")) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to start database transaction!\n"); + global_ret = GNUNET_SYSERR; + GNUNET_SCHEDULER_shutdown (); + return; + } + wpd = GNUNET_new (struct WirePrepareData); + wpd->session = session; + qs = db_plugin->wire_prepare_data_get (db_plugin->cls, + session, + &wire_prepare_cb, + NULL); + if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs) + return; /* continued via continuation set in #wire_prepare_cb() */ + db_plugin->rollback (db_plugin->cls, + session); + GNUNET_free (wpd); + wpd = NULL; + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + global_ret = GNUNET_SYSERR; + GNUNET_SCHEDULER_shutdown (); + return; + case GNUNET_DB_STATUS_SOFT_ERROR: + /* try again */ + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&run_transfers, + NULL); + return; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + /* no more prepared wire transfers, go sleep a bit! */ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "No more pending wire transfers, going idle\n"); + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_delayed (aggregator_idle_sleep_interval, + &run_transfers, + NULL); + return; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + /* should be impossible */ + GNUNET_assert (0); + } +} + + +/** + * First task. + * + * @param cls closure, NULL + * @param args remaining command-line arguments + * @param cfgfile name of the configuration file used (for saving, can be NULL!) + * @param c configuration + */ +static void +run (void *cls, + char *const *args, + const char *cfgfile, + const struct GNUNET_CONFIGURATION_Handle *c) +{ + (void) cls; + (void) args; + (void) cfgfile; + + cfg = c; + if (GNUNET_OK != parse_wirewatch_config ()) + { + cfg = NULL; + global_ret = 1; + return; + } + ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule, + &rc); + rc = GNUNET_CURL_gnunet_rc_create (ctx); + if (NULL == ctx) + { + GNUNET_break (0); + return; + } + + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&run_transfers, + NULL); + GNUNET_SCHEDULER_add_shutdown (&shutdown_task, + cls); +} + + +/** + * The main function of the taler-exchange-transfer. + * + * @param argc number of arguments from the command line + * @param argv command line arguments + * @return 0 ok, 1 on error + */ +int +main (int argc, + char *const *argv) +{ + struct GNUNET_GETOPT_CommandLineOption options[] = { + GNUNET_GETOPT_option_timetravel ('T', + "timetravel"), + GNUNET_GETOPT_option_flag ('t', + "test", + "run in test mode and exit when idle", + &test_mode), + GNUNET_GETOPT_option_version (VERSION "-" VCS_VERSION), + GNUNET_GETOPT_OPTION_END + }; + + if (GNUNET_OK != GNUNET_STRINGS_get_utf8_args (argc, argv, + &argc, &argv)) + return 2; + if (GNUNET_OK != + GNUNET_PROGRAM_run (argc, argv, + "taler-exchange-transfers", + gettext_noop ( + "background process that executes outgoing wire transfers"), + options, + &run, NULL)) + { + GNUNET_free ((void *) argv); + return 1; + } + GNUNET_free ((void *) argv); + return global_ret; +} + + +/* end of taler-exchange-transfer.c */ -- cgit v1.2.3