diff options
Diffstat (limited to 'src/exchange/taler-exchange-aggregator.c')
-rw-r--r-- | src/exchange/taler-exchange-aggregator.c | 338 |
1 files changed, 2 insertions, 336 deletions
diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c index 431abea4d..59db4daef 100644 --- a/src/exchange/taler-exchange-aggregator.c +++ b/src/exchange/taler-exchange-aggregator.c @@ -18,19 +18,6 @@ * @file taler-exchange-aggregator.c * @brief Process that aggregates outgoing transactions and executes them * @author Christian Grothoff - * - * Note: - * It might be simpler and theoretically more performant to split up - * this process into three: - * - one that runs the 'pending' wire transfers - * - one that performs aggregation - * - one that closes (expired) reserves - * - * They would have some (minor) code duplication to load the database and wire - * plugins and account data, and this would also slightly complicate - * operations by having to launch three processes. OTOH, those processes could - * then fail independently, which might also be a good thing. In any case, - * doing this is not expected to be complicated. */ #include "platform.h" #include <gnunet/gnunet_util_lib.h> @@ -43,38 +30,6 @@ /** - * Data we keep to #run_transfers(). There is at most - * one of these around at any given point in time. - * Note that this limits parallelism, and we might want - * to revise this decision at a later point. - */ -struct WirePrepareData -{ - - /** - * Database session for all of our transactions. - */ - struct TALER_EXCHANGEDB_Session *session; - - /** - * Wire execution handle. - */ - struct TALER_BANK_TransferHandle *eh; - - /** - * Wire account used for this preparation. - */ - struct TALER_EXCHANGEDB_WireAccount *wa; - - /** - * Row ID of the transfer. - */ - unsigned long long row_id; - -}; - - -/** * Information about one aggregation process to be executed. There is * at most one of these around at any given point in time. * Note that this limits parallelism, and we might want @@ -202,22 +157,6 @@ static struct TALER_EXCHANGEDB_Plugin *db_plugin; static struct GNUNET_SCHEDULER_Task *task; /** - * If we are currently executing a transfer, information about - * the active transfer is here. Otherwise, this variable is NULL. - */ -static struct WirePrepareData *wpd; - -/** - * Handle to the context for interacting with the bank / wire gateway. - */ -static struct GNUNET_CURL_Context *ctx; - -/** - * Scheduler context for running the @e ctx. - */ -static struct GNUNET_CURL_RescheduleContext *rc; - -/** * How long should we sleep when idle before trying to find more work? */ static struct GNUNET_TIME_Relative aggregator_idle_sleep_interval; @@ -245,16 +184,6 @@ run_aggregation (void *cls); /** - * Execute the wire transfers that we have committed to - * do. - * - * @param cls NULL - */ -static void -run_transfers (void *cls); - - -/** * Free data stored in @a au, but not @a au itself (stack allocated). * * @param au aggreation unit to clean up @@ -281,16 +210,6 @@ static void shutdown_task (void *cls) { (void) cls; - if (NULL != ctx) - { - GNUNET_CURL_fini (ctx); - ctx = NULL; - } - if (NULL != rc) - { - GNUNET_CURL_gnunet_rc_destroy (rc); - rc = NULL; - } GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Running shutdown\n"); if (NULL != task) @@ -298,18 +217,6 @@ shutdown_task (void *cls) GNUNET_SCHEDULER_cancel (task); task = NULL; } - if (NULL != wpd) - { - if (NULL != wpd->eh) - { - TALER_BANK_transfer_cancel (wpd->eh); - wpd->eh = NULL; - } - db_plugin->rollback (db_plugin->cls, - wpd->session); - GNUNET_free (wpd); - wpd = NULL; - } TALER_EXCHANGEDB_plugin_unload (db_plugin); db_plugin = NULL; TALER_EXCHANGEDB_unload_accounts (); @@ -1038,106 +945,11 @@ run_aggregation (void *cls) return; case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Preparation complete, switching to transfer mode\n"); - /* run alternative task: actually do wire transfer! */ - GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&run_transfers, - NULL); - return; - default: - GNUNET_break (0); - global_ret = GNUNET_SYSERR; - GNUNET_SCHEDULER_shutdown (); - return; - } -} - - -/** - * Function called with the result from the execute step. - * - * @param cls NULL - * @param http_status_code #MHD_HTTP_OK on success - * @param ec taler error code - * @param row_id unique ID of the wire transfer in the bank's records - * @param wire_timestamp when did the transfer happen - */ -static void -wire_confirm_cb (void *cls, - unsigned int http_status_code, - enum TALER_ErrorCode ec, - uint64_t row_id, - struct GNUNET_TIME_Absolute wire_timestamp) -{ - struct TALER_EXCHANGEDB_Session *session = wpd->session; - enum GNUNET_DB_QueryStatus qs; - - (void) cls; - (void) row_id; - (void) wire_timestamp; - wpd->eh = NULL; - if (MHD_HTTP_OK != http_status_code) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Wire transaction failed: %u/%d\n", - http_status_code, - ec); - db_plugin->rollback (db_plugin->cls, - session); - global_ret = GNUNET_SYSERR; - GNUNET_SCHEDULER_shutdown (); - GNUNET_free (wpd); - wpd = NULL; - return; - } - qs = db_plugin->wire_prepare_data_mark_finished (db_plugin->cls, - session, - wpd->row_id); - if (0 >= qs) - { - GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); - db_plugin->rollback (db_plugin->cls, - session); - if (GNUNET_DB_STATUS_SOFT_ERROR == qs) - { - /* try again */ - GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&run_aggregation, - NULL); - } - else - { - global_ret = GNUNET_SYSERR; - GNUNET_SCHEDULER_shutdown (); - } - GNUNET_free (wpd); - wpd = NULL; - return; - } - GNUNET_free (wpd); - wpd = NULL; - switch (commit_or_warn (session)) - { - case GNUNET_DB_STATUS_SOFT_ERROR: - /* try again */ + "Preparation complete, going again\n"); GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&run_aggregation, NULL); return; - case GNUNET_DB_STATUS_HARD_ERROR: - GNUNET_break (0); - global_ret = GNUNET_SYSERR; - GNUNET_SCHEDULER_shutdown (); - return; - case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Wire transfer complete\n"); - /* continue with #run_transfers(), just to guard - against the unlikely case that there are more. */ - GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&run_transfers, - NULL); - return; default: GNUNET_break (0); global_ret = GNUNET_SYSERR; @@ -1148,143 +960,6 @@ wire_confirm_cb (void *cls, /** - * Callback with data about a prepared transaction. - * - * @param cls NULL - * @param rowid row identifier used to mark prepared transaction as done - * @param wire_method wire method the preparation was done for - * @param buf transaction data that was persisted, NULL on error - * @param buf_size number of bytes in @a buf, 0 on error - */ -static void -wire_prepare_cb (void *cls, - uint64_t rowid, - const char *wire_method, - const char *buf, - size_t buf_size) -{ - struct TALER_EXCHANGEDB_WireAccount *wa; - - (void) cls; - wpd->row_id = rowid; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Starting wire transfer %llu\n", - (unsigned long long) rowid); - wpd->wa = TALER_EXCHANGEDB_find_account_by_method (wire_method); - if (NULL == wpd->wa) - { - /* Should really never happen here, as when we get - here the wire account should be in the cache. */ - GNUNET_break (0); - db_plugin->rollback (db_plugin->cls, - wpd->session); - global_ret = GNUNET_SYSERR; - GNUNET_SCHEDULER_shutdown (); - GNUNET_free (wpd); - wpd = NULL; - return; - } - wa = wpd->wa; - wpd->eh = TALER_BANK_transfer (ctx, - &wa->auth, - buf, - buf_size, - &wire_confirm_cb, - NULL); - if (NULL == wpd->eh) - { - GNUNET_break (0); /* Irrecoverable */ - db_plugin->rollback (db_plugin->cls, - wpd->session); - global_ret = GNUNET_SYSERR; - GNUNET_SCHEDULER_shutdown (); - GNUNET_free (wpd); - wpd = NULL; - return; - } -} - - -/** - * Execute the wire transfers that we have committed to - * do. - * - * @param cls NULL - */ -static void -run_transfers (void *cls) -{ - enum GNUNET_DB_QueryStatus qs; - struct TALER_EXCHANGEDB_Session *session; - const struct GNUNET_SCHEDULER_TaskContext *tc; - - (void) cls; - task = NULL; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Checking for pending wire transfers\n"); - tc = GNUNET_SCHEDULER_get_task_context (); - if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) - return; - if (NULL == (session = db_plugin->get_session (db_plugin->cls))) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Failed to obtain database session!\n"); - global_ret = GNUNET_SYSERR; - GNUNET_SCHEDULER_shutdown (); - return; - } - if (GNUNET_OK != - db_plugin->start (db_plugin->cls, - session, - "aggregator run transfer")) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Failed to start database transaction!\n"); - global_ret = GNUNET_SYSERR; - GNUNET_SCHEDULER_shutdown (); - return; - } - wpd = GNUNET_new (struct WirePrepareData); - wpd->session = session; - qs = db_plugin->wire_prepare_data_get (db_plugin->cls, - session, - &wire_prepare_cb, - NULL); - if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs) - return; /* continued via continuation set in #wire_prepare_cb() */ - db_plugin->rollback (db_plugin->cls, - session); - GNUNET_free (wpd); - wpd = NULL; - switch (qs) - { - case GNUNET_DB_STATUS_HARD_ERROR: - GNUNET_break (0); - global_ret = GNUNET_SYSERR; - GNUNET_SCHEDULER_shutdown (); - return; - case GNUNET_DB_STATUS_SOFT_ERROR: - /* try again */ - GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&run_transfers, - NULL); - return; - case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: - /* no more prepared wire transfers, go back to aggregation! */ - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "No more pending wire transfers, starting aggregation\n"); - GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&run_aggregation, - NULL); - return; - case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: - /* should be impossible */ - GNUNET_assert (0); - } -} - - -/** * First task. * * @param cls closure, NULL @@ -1309,17 +984,8 @@ run (void *cls, global_ret = 1; return; } - ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule, - &rc); - rc = GNUNET_CURL_gnunet_rc_create (ctx); - if (NULL == ctx) - { - GNUNET_break (0); - return; - } - GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&run_transfers, + task = GNUNET_SCHEDULER_add_now (&run_aggregation, NULL); GNUNET_SCHEDULER_add_shutdown (&shutdown_task, cls); |