summaryrefslogtreecommitdiff
path: root/src/exchange/taler-exchange-aggregator.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2020-03-12 10:23:26 +0100
committerChristian Grothoff <christian@grothoff.org>2020-03-12 10:23:26 +0100
commitb91fcbb92f21db498214cba38ffd6e3fe886d95e (patch)
tree18ee0a5b8b28feef17e3ff5aedff490ddc7aebad /src/exchange/taler-exchange-aggregator.c
parent83631bc98fe70dd73f212581fb54ab3a82560686 (diff)
downloadexchange-b91fcbb92f21db498214cba38ffd6e3fe886d95e.tar.gz
exchange-b91fcbb92f21db498214cba38ffd6e3fe886d95e.tar.bz2
exchange-b91fcbb92f21db498214cba38ffd6e3fe886d95e.zip
finish separation of aggreator into aggregation, closing and transfer processes (test cases still need to be updated)
Diffstat (limited to 'src/exchange/taler-exchange-aggregator.c')
-rw-r--r--src/exchange/taler-exchange-aggregator.c338
1 files changed, 2 insertions, 336 deletions
diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c
index 431abea4..59db4dae 100644
--- a/src/exchange/taler-exchange-aggregator.c
+++ b/src/exchange/taler-exchange-aggregator.c
@@ -18,19 +18,6 @@
* @file taler-exchange-aggregator.c
* @brief Process that aggregates outgoing transactions and executes them
* @author Christian Grothoff
- *
- * Note:
- * It might be simpler and theoretically more performant to split up
- * this process into three:
- * - one that runs the 'pending' wire transfers
- * - one that performs aggregation
- * - one that closes (expired) reserves
- *
- * They would have some (minor) code duplication to load the database and wire
- * plugins and account data, and this would also slightly complicate
- * operations by having to launch three processes. OTOH, those processes could
- * then fail independently, which might also be a good thing. In any case,
- * doing this is not expected to be complicated.
*/
#include "platform.h"
#include <gnunet/gnunet_util_lib.h>
@@ -43,38 +30,6 @@
/**
- * Data we keep to #run_transfers(). There is at most
- * one of these around at any given point in time.
- * Note that this limits parallelism, and we might want
- * to revise this decision at a later point.
- */
-struct WirePrepareData
-{
-
- /**
- * Database session for all of our transactions.
- */
- struct TALER_EXCHANGEDB_Session *session;
-
- /**
- * Wire execution handle.
- */
- struct TALER_BANK_TransferHandle *eh;
-
- /**
- * Wire account used for this preparation.
- */
- struct TALER_EXCHANGEDB_WireAccount *wa;
-
- /**
- * Row ID of the transfer.
- */
- unsigned long long row_id;
-
-};
-
-
-/**
* Information about one aggregation process to be executed. There is
* at most one of these around at any given point in time.
* Note that this limits parallelism, and we might want
@@ -202,22 +157,6 @@ static struct TALER_EXCHANGEDB_Plugin *db_plugin;
static struct GNUNET_SCHEDULER_Task *task;
/**
- * If we are currently executing a transfer, information about
- * the active transfer is here. Otherwise, this variable is NULL.
- */
-static struct WirePrepareData *wpd;
-
-/**
- * Handle to the context for interacting with the bank / wire gateway.
- */
-static struct GNUNET_CURL_Context *ctx;
-
-/**
- * Scheduler context for running the @e ctx.
- */
-static struct GNUNET_CURL_RescheduleContext *rc;
-
-/**
* How long should we sleep when idle before trying to find more work?
*/
static struct GNUNET_TIME_Relative aggregator_idle_sleep_interval;
@@ -245,16 +184,6 @@ run_aggregation (void *cls);
/**
- * Execute the wire transfers that we have committed to
- * do.
- *
- * @param cls NULL
- */
-static void
-run_transfers (void *cls);
-
-
-/**
* Free data stored in @a au, but not @a au itself (stack allocated).
*
* @param au aggreation unit to clean up
@@ -281,16 +210,6 @@ static void
shutdown_task (void *cls)
{
(void) cls;
- if (NULL != ctx)
- {
- GNUNET_CURL_fini (ctx);
- ctx = NULL;
- }
- if (NULL != rc)
- {
- GNUNET_CURL_gnunet_rc_destroy (rc);
- rc = NULL;
- }
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Running shutdown\n");
if (NULL != task)
@@ -298,18 +217,6 @@ shutdown_task (void *cls)
GNUNET_SCHEDULER_cancel (task);
task = NULL;
}
- if (NULL != wpd)
- {
- if (NULL != wpd->eh)
- {
- TALER_BANK_transfer_cancel (wpd->eh);
- wpd->eh = NULL;
- }
- db_plugin->rollback (db_plugin->cls,
- wpd->session);
- GNUNET_free (wpd);
- wpd = NULL;
- }
TALER_EXCHANGEDB_plugin_unload (db_plugin);
db_plugin = NULL;
TALER_EXCHANGEDB_unload_accounts ();
@@ -1038,106 +945,11 @@ run_aggregation (void *cls)
return;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Preparation complete, switching to transfer mode\n");
- /* run alternative task: actually do wire transfer! */
- GNUNET_assert (NULL == task);
- task = GNUNET_SCHEDULER_add_now (&run_transfers,
- NULL);
- return;
- default:
- GNUNET_break (0);
- global_ret = GNUNET_SYSERR;
- GNUNET_SCHEDULER_shutdown ();
- return;
- }
-}
-
-
-/**
- * Function called with the result from the execute step.
- *
- * @param cls NULL
- * @param http_status_code #MHD_HTTP_OK on success
- * @param ec taler error code
- * @param row_id unique ID of the wire transfer in the bank's records
- * @param wire_timestamp when did the transfer happen
- */
-static void
-wire_confirm_cb (void *cls,
- unsigned int http_status_code,
- enum TALER_ErrorCode ec,
- uint64_t row_id,
- struct GNUNET_TIME_Absolute wire_timestamp)
-{
- struct TALER_EXCHANGEDB_Session *session = wpd->session;
- enum GNUNET_DB_QueryStatus qs;
-
- (void) cls;
- (void) row_id;
- (void) wire_timestamp;
- wpd->eh = NULL;
- if (MHD_HTTP_OK != http_status_code)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Wire transaction failed: %u/%d\n",
- http_status_code,
- ec);
- db_plugin->rollback (db_plugin->cls,
- session);
- global_ret = GNUNET_SYSERR;
- GNUNET_SCHEDULER_shutdown ();
- GNUNET_free (wpd);
- wpd = NULL;
- return;
- }
- qs = db_plugin->wire_prepare_data_mark_finished (db_plugin->cls,
- session,
- wpd->row_id);
- if (0 >= qs)
- {
- GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
- db_plugin->rollback (db_plugin->cls,
- session);
- if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
- {
- /* try again */
- GNUNET_assert (NULL == task);
- task = GNUNET_SCHEDULER_add_now (&run_aggregation,
- NULL);
- }
- else
- {
- global_ret = GNUNET_SYSERR;
- GNUNET_SCHEDULER_shutdown ();
- }
- GNUNET_free (wpd);
- wpd = NULL;
- return;
- }
- GNUNET_free (wpd);
- wpd = NULL;
- switch (commit_or_warn (session))
- {
- case GNUNET_DB_STATUS_SOFT_ERROR:
- /* try again */
+ "Preparation complete, going again\n");
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
NULL);
return;
- case GNUNET_DB_STATUS_HARD_ERROR:
- GNUNET_break (0);
- global_ret = GNUNET_SYSERR;
- GNUNET_SCHEDULER_shutdown ();
- return;
- case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Wire transfer complete\n");
- /* continue with #run_transfers(), just to guard
- against the unlikely case that there are more. */
- GNUNET_assert (NULL == task);
- task = GNUNET_SCHEDULER_add_now (&run_transfers,
- NULL);
- return;
default:
GNUNET_break (0);
global_ret = GNUNET_SYSERR;
@@ -1148,143 +960,6 @@ wire_confirm_cb (void *cls,
/**
- * Callback with data about a prepared transaction.
- *
- * @param cls NULL
- * @param rowid row identifier used to mark prepared transaction as done
- * @param wire_method wire method the preparation was done for
- * @param buf transaction data that was persisted, NULL on error
- * @param buf_size number of bytes in @a buf, 0 on error
- */
-static void
-wire_prepare_cb (void *cls,
- uint64_t rowid,
- const char *wire_method,
- const char *buf,
- size_t buf_size)
-{
- struct TALER_EXCHANGEDB_WireAccount *wa;
-
- (void) cls;
- wpd->row_id = rowid;
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Starting wire transfer %llu\n",
- (unsigned long long) rowid);
- wpd->wa = TALER_EXCHANGEDB_find_account_by_method (wire_method);
- if (NULL == wpd->wa)
- {
- /* Should really never happen here, as when we get
- here the wire account should be in the cache. */
- GNUNET_break (0);
- db_plugin->rollback (db_plugin->cls,
- wpd->session);
- global_ret = GNUNET_SYSERR;
- GNUNET_SCHEDULER_shutdown ();
- GNUNET_free (wpd);
- wpd = NULL;
- return;
- }
- wa = wpd->wa;
- wpd->eh = TALER_BANK_transfer (ctx,
- &wa->auth,
- buf,
- buf_size,
- &wire_confirm_cb,
- NULL);
- if (NULL == wpd->eh)
- {
- GNUNET_break (0); /* Irrecoverable */
- db_plugin->rollback (db_plugin->cls,
- wpd->session);
- global_ret = GNUNET_SYSERR;
- GNUNET_SCHEDULER_shutdown ();
- GNUNET_free (wpd);
- wpd = NULL;
- return;
- }
-}
-
-
-/**
- * Execute the wire transfers that we have committed to
- * do.
- *
- * @param cls NULL
- */
-static void
-run_transfers (void *cls)
-{
- enum GNUNET_DB_QueryStatus qs;
- struct TALER_EXCHANGEDB_Session *session;
- const struct GNUNET_SCHEDULER_TaskContext *tc;
-
- (void) cls;
- task = NULL;
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Checking for pending wire transfers\n");
- tc = GNUNET_SCHEDULER_get_task_context ();
- if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
- return;
- if (NULL == (session = db_plugin->get_session (db_plugin->cls)))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Failed to obtain database session!\n");
- global_ret = GNUNET_SYSERR;
- GNUNET_SCHEDULER_shutdown ();
- return;
- }
- if (GNUNET_OK !=
- db_plugin->start (db_plugin->cls,
- session,
- "aggregator run transfer"))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Failed to start database transaction!\n");
- global_ret = GNUNET_SYSERR;
- GNUNET_SCHEDULER_shutdown ();
- return;
- }
- wpd = GNUNET_new (struct WirePrepareData);
- wpd->session = session;
- qs = db_plugin->wire_prepare_data_get (db_plugin->cls,
- session,
- &wire_prepare_cb,
- NULL);
- if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs)
- return; /* continued via continuation set in #wire_prepare_cb() */
- db_plugin->rollback (db_plugin->cls,
- session);
- GNUNET_free (wpd);
- wpd = NULL;
- switch (qs)
- {
- case GNUNET_DB_STATUS_HARD_ERROR:
- GNUNET_break (0);
- global_ret = GNUNET_SYSERR;
- GNUNET_SCHEDULER_shutdown ();
- return;
- case GNUNET_DB_STATUS_SOFT_ERROR:
- /* try again */
- GNUNET_assert (NULL == task);
- task = GNUNET_SCHEDULER_add_now (&run_transfers,
- NULL);
- return;
- case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
- /* no more prepared wire transfers, go back to aggregation! */
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "No more pending wire transfers, starting aggregation\n");
- GNUNET_assert (NULL == task);
- task = GNUNET_SCHEDULER_add_now (&run_aggregation,
- NULL);
- return;
- case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
- /* should be impossible */
- GNUNET_assert (0);
- }
-}
-
-
-/**
* First task.
*
* @param cls closure, NULL
@@ -1309,17 +984,8 @@ run (void *cls,
global_ret = 1;
return;
}
- ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule,
- &rc);
- rc = GNUNET_CURL_gnunet_rc_create (ctx);
- if (NULL == ctx)
- {
- GNUNET_break (0);
- return;
- }
-
GNUNET_assert (NULL == task);
- task = GNUNET_SCHEDULER_add_now (&run_transfers,
+ task = GNUNET_SCHEDULER_add_now (&run_aggregation,
NULL);
GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
cls);