From ad8351c912995a9ef0524822f0fdeef55e9d27a9 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Wed, 6 Apr 2016 10:22:09 +0200 Subject: fix iterate_matching_deposits(), LIMIT does not work with variables in Postgres (#4360) --- src/exchange/taler-exchange-aggregator.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'src/exchange') diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c index a6b7de211..cfc11a5f9 100644 --- a/src/exchange/taler-exchange-aggregator.c +++ b/src/exchange/taler-exchange-aggregator.c @@ -78,9 +78,10 @@ static int test_mode; * of the smallest possible unit are aggregated, they do surpass the * "tiny" threshold beyond which we never trigger a wire transaction! * - * TODO: make configurable (via config file or command line option) + * Note: do not change here, Postgres requires us to hard-code the + * LIMIT in the prepared statement. */ -static unsigned int aggregation_limit = 10000; +static unsigned int aggregation_limit = TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT; /** -- cgit v1.2.3 From 4977a3eb8fe403eca5d30ce7ec397213f4a4eadd Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Wed, 6 Apr 2016 10:34:42 +0200 Subject: address 'global_ret' simplification TODO --- src/exchange/taler-exchange-aggregator.c | 116 ++++++++++++++++++------------- 1 file changed, 69 insertions(+), 47 deletions(-) (limited to 'src/exchange') diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c index cfc11a5f9..96922eff2 100644 --- a/src/exchange/taler-exchange-aggregator.c +++ b/src/exchange/taler-exchange-aggregator.c @@ -20,7 +20,6 @@ * @author Christian Grothoff * * TODO: - * - simplify global_ret: make it a global! * - handle shutdown more nicely (call 'cancel' method on wire transfers) */ #include "platform.h" @@ -67,6 +66,12 @@ static struct TALER_WIRE_Plugin *wire_plugin; */ static struct GNUNET_SCHEDULER_Task *task; +/** + * Value to return from main(). #GNUNET_OK on success, #GNUNET_SYSERR + * on serious errors. + */ +static int global_ret; + /** * #GNUNET_YES if we are in test mode and are using temporary tables. */ @@ -84,6 +89,25 @@ static int test_mode; static unsigned int aggregation_limit = TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT; +/** + * We're being aborted with CTRL-C (or SIGTERM). Shut down. + * + * @param cls closure + * @param tc scheduler context + */ +static void +shutdown_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + if (NULL != task) + { + GNUNET_SCHEDULER_cancel (task); + task = NULL; + } + /* FIXME: other shutdown stuff here! */ +} + + /** * Load configuration parameters for the exchange * server into the corresponding global variables. @@ -216,11 +240,6 @@ struct AggregationUnit */ unsigned long long *additional_rows; - /** - * Pointer to global return value. Closure for #run(). - */ - int *global_ret; - /** * Offset specifying how many #additional_rows are in use. */ @@ -314,7 +333,6 @@ deposit_cb (void *cls, } - /** * Function called with details about another deposit we * can aggregate into an existing aggregation unit. @@ -433,14 +451,13 @@ prepare_cb (void *cls, * Main work function that queries the DB and aggregates transactions * into larger wire transfers. * - * @param cls pointer to an `int` which we will return from main() + * @param cls NULL * @param tc scheduler context */ static void run_aggregation (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { - int *global_ret = cls; struct TALER_EXCHANGEDB_Session *session; struct AggregationUnit *au; unsigned int i; @@ -453,7 +470,7 @@ run_aggregation (void *cls, { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to obtain database session!\n"); - *global_ret = GNUNET_SYSERR; + global_ret = GNUNET_SYSERR; return; } if (GNUNET_OK != @@ -462,7 +479,7 @@ run_aggregation (void *cls, { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to start database transaction!\n"); - *global_ret = GNUNET_SYSERR; + global_ret = GNUNET_SYSERR; return; } au = GNUNET_new (struct AggregationUnit); @@ -482,7 +499,7 @@ run_aggregation (void *cls, { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to execute deposit iteration!\n"); - *global_ret = GNUNET_SYSERR; + global_ret = GNUNET_SYSERR; return; } if (GNUNET_YES == test_mode) @@ -495,7 +512,7 @@ run_aggregation (void *cls, /* nothing to do, sleep for a minute and try again */ task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, &run_aggregation, - global_ret); + NULL); } return; } @@ -518,7 +535,7 @@ run_aggregation (void *cls, GNUNET_free (au); db_plugin->rollback (db_plugin->cls, session); - *global_ret = GNUNET_SYSERR; + global_ret = GNUNET_SYSERR; return; } /* Round to the unit supported by the wire transfer method */ @@ -541,7 +558,7 @@ run_aggregation (void *cls, { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to start database transaction!\n"); - *global_ret = GNUNET_SYSERR; + global_ret = GNUNET_SYSERR; GNUNET_free_non_null (au->additional_rows); if (NULL != au->wire) json_decref (au->wire); @@ -576,10 +593,9 @@ run_aggregation (void *cls, GNUNET_free (au); /* start again */ task = GNUNET_SCHEDULER_add_now (&run_aggregation, - global_ret); + NULL); return; } - au->global_ret = global_ret; au->ph = wire_plugin->prepare_wire_transfer (wire_plugin->cls, au->wire, &au->total_amount, @@ -600,7 +616,7 @@ run_aggregation (void *cls, GNUNET_free (au); /* start again */ task = GNUNET_SCHEDULER_add_now (&run_aggregation, - global_ret); + NULL); return; } /* otherwise we continue with #prepare_cb(), see below */ @@ -632,7 +648,6 @@ prepare_cb (void *cls, size_t buf_size) { struct AggregationUnit *au = cls; - int *global_ret = au->global_ret; struct TALER_EXCHANGEDB_Session *session = au->session; GNUNET_free_non_null (au->additional_rows); @@ -646,7 +661,7 @@ prepare_cb (void *cls, session); /* start again */ task = GNUNET_SCHEDULER_add_now (&run_aggregation, - global_ret); + NULL); return; } @@ -663,7 +678,7 @@ prepare_cb (void *cls, session); /* start again */ task = GNUNET_SCHEDULER_add_now (&run_aggregation, - global_ret); + NULL); return; } @@ -677,13 +692,13 @@ prepare_cb (void *cls, "Failed to commit database transaction!\n"); /* try again */ task = GNUNET_SCHEDULER_add_now (&run_aggregation, - global_ret); + NULL); return; } /* run alternative task: actually do wire transfer! */ task = GNUNET_SCHEDULER_add_now (&run_transfers, - &global_ret); + NULL); } @@ -703,12 +718,6 @@ struct WirePrepareData */ struct TALER_WIRE_ExecuteHandle *eh; - /** - * Pointer to global return value. Closure for #run(). - */ - int *global_ret; - - /** * Row ID of the transfer. */ @@ -730,7 +739,6 @@ wire_confirm_cb (void *cls, const char *emsg) { struct WirePrepareData *wpd = cls; - int *global_ret = wpd->global_ret; struct TALER_EXCHANGEDB_Session *session = wpd->session; wpd->eh = NULL; @@ -741,7 +749,7 @@ wire_confirm_cb (void *cls, emsg); db_plugin->rollback (db_plugin->cls, session); - *global_ret = GNUNET_SYSERR; + global_ret = GNUNET_SYSERR; GNUNET_free (wpd); return; } @@ -753,7 +761,7 @@ wire_confirm_cb (void *cls, GNUNET_break (0); /* why!? */ db_plugin->rollback (db_plugin->cls, session); - *global_ret = GNUNET_SYSERR; + global_ret = GNUNET_SYSERR; GNUNET_free (wpd); return; } @@ -766,13 +774,13 @@ wire_confirm_cb (void *cls, "Failed to commit database transaction!\n"); /* try again */ task = GNUNET_SCHEDULER_add_now (&run_aggregation, - global_ret); + NULL); return; } /* continue with #run_transfers(), just to guard against the unlikely case that there are more. */ task = GNUNET_SCHEDULER_add_now (&run_transfers, - &global_ret); + NULL); } @@ -792,7 +800,6 @@ wire_prepare_cb (void *cls, size_t buf_size) { struct WirePrepareData *wpd = cls; - int *global_ret = wpd->global_ret; wpd->row_id = rowid; wpd->eh = wire_plugin->execute_wire_transfer (wire_plugin->cls, @@ -808,7 +815,7 @@ wire_prepare_cb (void *cls, GNUNET_break (0); /* why? how to best recover? */ db_plugin->rollback (db_plugin->cls, wpd->session); - *global_ret = GNUNET_SYSERR; + global_ret = GNUNET_SYSERR; GNUNET_free (wpd); return; } @@ -819,14 +826,13 @@ wire_prepare_cb (void *cls, * Execute the wire transfers that we have committed to * do. * - * @param cls pointer to an `int` which we will return from main() + * @param cls NULL * @param tc scheduler context */ static void run_transfers (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { - int *global_ret = cls; int ret; struct WirePrepareData *wpd; struct TALER_EXCHANGEDB_Session *session; @@ -838,7 +844,7 @@ run_transfers (void *cls, { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to obtain database session!\n"); - *global_ret = GNUNET_SYSERR; + global_ret = GNUNET_SYSERR; return; } if (GNUNET_OK != @@ -847,12 +853,11 @@ run_transfers (void *cls, { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to start database transaction!\n"); - *global_ret = GNUNET_SYSERR; + global_ret = GNUNET_SYSERR; return; } wpd = GNUNET_new (struct WirePrepareData); wpd->session = session; - wpd->global_ret = global_ret; ret = db_plugin->wire_prepare_data_get (db_plugin->cls, session, exchange_wireformat, @@ -863,7 +868,7 @@ run_transfers (void *cls, GNUNET_break (0); /* why? how to best recover? */ db_plugin->rollback (db_plugin->cls, session); - *global_ret = GNUNET_SYSERR; + global_ret = GNUNET_SYSERR; GNUNET_free (wpd); return; } @@ -873,7 +878,7 @@ run_transfers (void *cls, db_plugin->rollback (db_plugin->cls, session); task = GNUNET_SCHEDULER_add_now (&run_aggregation, - global_ret); + NULL); GNUNET_free (wpd); return; } @@ -881,6 +886,24 @@ run_transfers (void *cls, } +/** + * First task. + * + * @param cls closure, NULL + * @param tc scheduler context + */ +static void +run (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + task = GNUNET_SCHEDULER_add_now (&run_transfers, + cls); + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, + &shutdown_task, + cls); +} + + /** * The main function of the taler-exchange-httpd server ("the exchange"). * @@ -907,7 +930,6 @@ main (int argc, GNUNET_GETOPT_OPTION_VERSION (VERSION "-" VCS_VERSION), GNUNET_GETOPT_OPTION_END }; - int ret = GNUNET_OK; GNUNET_assert (GNUNET_OK == GNUNET_log_setup ("taler-exchange-aggregator", @@ -929,12 +951,12 @@ main (int argc, { return 1; } - - GNUNET_SCHEDULER_run (&run_transfers, &ret); + global_ret = GNUNET_OK; + GNUNET_SCHEDULER_run (&run, NULL); TALER_EXCHANGEDB_plugin_unload (db_plugin); TALER_WIRE_plugin_unload (wire_plugin); - return (GNUNET_SYSERR == ret) ? 1 : 0; + return (GNUNET_SYSERR == global_ret) ? 1 : 0; } /* end of taler-exchange-aggregator.c */ -- cgit v1.2.3 From 07541d319c16e8dd78e813989223a654cb306c87 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Wed, 6 Apr 2016 11:24:33 +0200 Subject: address 'shutdown' TODO in taler-exchange-aggregator --- src/exchange/taler-exchange-aggregator.c | 279 +++++++++++++++++-------------- 1 file changed, 158 insertions(+), 121 deletions(-) (limited to 'src/exchange') diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c index 96922eff2..3726d32fa 100644 --- a/src/exchange/taler-exchange-aggregator.c +++ b/src/exchange/taler-exchange-aggregator.c @@ -18,9 +18,6 @@ * @file taler-exchange-aggregator.c * @brief Process that aggregates outgoing transactions and executes them * @author Christian Grothoff - * - * TODO: - * - handle shutdown more nicely (call 'cancel' method on wire transfers) */ #include "platform.h" #include @@ -31,6 +28,102 @@ #include "taler_json_lib.h" #include "taler_wire_lib.h" + +/** + * Data we keep to #run_transfers(). There is at most + * one of these around at any given point in time. + */ +struct WirePrepareData +{ + + /** + * Database session for all of our transactions. + */ + struct TALER_EXCHANGEDB_Session *session; + + /** + * Wire execution handle. + */ + struct TALER_WIRE_ExecuteHandle *eh; + + /** + * Row ID of the transfer. + */ + unsigned long long row_id; + +}; + + +/** + * Information about one aggregation process to be executed. There is + * at most one of these around at any given point in time. + */ +struct AggregationUnit +{ + /** + * Public key of the merchant. + */ + struct TALER_MerchantPublicKeyP merchant_pub; + + /** + * Total amount to be transferred. + */ + struct TALER_Amount total_amount; + + /** + * Hash of @e wire. + */ + struct GNUNET_HashCode h_wire; + + /** + * Wire transfer identifier we use. + */ + struct TALER_WireTransferIdentifierRawP wtid; + + /** + * Row ID of the transaction that started it all. + */ + unsigned long long row_id; + + /** + * The current time. + */ + struct GNUNET_TIME_Absolute execution_time; + + /** + * Wire details of the merchant. + */ + json_t *wire; + + /** + * Database session for all of our transactions. + */ + struct TALER_EXCHANGEDB_Session *session; + + /** + * Wire preparation handle. + */ + struct TALER_WIRE_PrepareHandle *ph; + + /** + * Array of #aggregation_limit row_ids from the + * aggregation. + */ + unsigned long long *additional_rows; + + /** + * Offset specifying how many #additional_rows are in use. + */ + unsigned int rows_offset; + + /** + * Set to #GNUNET_YES if we have to abort due to failure. + */ + int failed; + +}; + + /** * Which currency is used by this exchange? */ @@ -62,10 +155,22 @@ static struct TALER_EXCHANGEDB_Plugin *db_plugin; static struct TALER_WIRE_Plugin *wire_plugin; /** - * Task for the main #run() function. + * Next task to run, if any. */ static struct GNUNET_SCHEDULER_Task *task; +/** + * If we are currently executing a transfer, information about + * the active transfer is here. Otherwise, this variable is NULL. + */ +static struct WirePrepareData *wpd; + +/** + * If we are currently aggregating transactions, information about the + * active aggregation is here. Otherwise, this variable is NULL. + */ +static struct AggregationUnit *au; + /** * Value to return from main(). #GNUNET_OK on success, #GNUNET_SYSERR * on serious errors. @@ -104,7 +209,36 @@ shutdown_task (void *cls, GNUNET_SCHEDULER_cancel (task); task = NULL; } - /* FIXME: other shutdown stuff here! */ + if (NULL != wpd) + { + if (NULL != wpd->eh) + { + wire_plugin->execute_wire_transfer_cancel (wire_plugin->cls, + wpd->eh); + wpd->eh = NULL; + } + db_plugin->rollback (db_plugin->cls, + wpd->session); + GNUNET_free (wpd); + wpd = NULL; + } + if (NULL != au) + { + if (NULL != au->ph) + { + wire_plugin->prepare_wire_transfer_cancel (wire_plugin->cls, + au->ph); + au->ph = NULL; + } + db_plugin->rollback (db_plugin->cls, + au->session); + GNUNET_free_non_null (au->additional_rows); + if (NULL != au->wire) + json_decref (au->wire); + au = NULL; + GNUNET_free (au); + + } } @@ -183,81 +317,11 @@ exchange_serve_process_config (const char *exchange_directory) } -/** - * Information about one aggregation process to - * be executed. - */ -struct AggregationUnit -{ - /** - * Public key of the merchant. - */ - struct TALER_MerchantPublicKeyP merchant_pub; - - /** - * Total amount to be transferred. - */ - struct TALER_Amount total_amount; - - /** - * Hash of @e wire. - */ - struct GNUNET_HashCode h_wire; - - /** - * Wire transfer identifier we use. - */ - struct TALER_WireTransferIdentifierRawP wtid; - - /** - * Row ID of the transaction that started it all. - */ - unsigned long long row_id; - - /** - * The current time. - */ - struct GNUNET_TIME_Absolute execution_time; - - /** - * Wire details of the merchant. - */ - json_t *wire; - - /** - * Database session for all of our transactions. - */ - struct TALER_EXCHANGEDB_Session *session; - - /** - * Wire preparation handle. - */ - struct TALER_WIRE_PrepareHandle *ph; - - /** - * Array of #aggregation_limit row_ids from the - * aggregation. - */ - unsigned long long *additional_rows; - - /** - * Offset specifying how many #additional_rows are in use. - */ - unsigned int rows_offset; - - /** - * Set to #GNUNET_YES if we have to abort due to failure. - */ - int failed; - -}; - - /** * Function called with details about deposits that have been made, * with the goal of executing the corresponding wire transaction. * - * @param cls closure with the `struct AggregationUnit` + * @param cls NULL * @param row_id identifies database entry * @param merchant_pub public key of the merchant * @param coin_pub public key of the coin @@ -282,8 +346,6 @@ deposit_cb (void *cls, struct GNUNET_TIME_Absolute wire_deadline, const json_t *wire) { - struct AggregationUnit *au = cls; - au->merchant_pub = *merchant_pub; if (GNUNET_OK != TALER_amount_subtract (&au->total_amount, @@ -337,7 +399,7 @@ deposit_cb (void *cls, * Function called with details about another deposit we * can aggregate into an existing aggregation unit. * - * @param cls closure with the `struct AggregationUnit` + * @param cls NULL * @param row_id identifies database entry * @param merchant_pub public key of the merchant * @param coin_pub public key of the coin @@ -362,7 +424,6 @@ aggregate_cb (void *cls, struct GNUNET_TIME_Absolute wire_deadline, const json_t *wire) { - struct AggregationUnit *au = cls; struct TALER_Amount delta; GNUNET_break (0 == @@ -459,7 +520,6 @@ run_aggregation (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { struct TALER_EXCHANGEDB_Session *session; - struct AggregationUnit *au; unsigned int i; int ret; @@ -493,6 +553,7 @@ run_aggregation (void *cls, if (NULL != au->wire) json_decref (au->wire); GNUNET_free (au); + au = NULL; db_plugin->rollback (db_plugin->cls, session); if (0 != ret) @@ -533,6 +594,7 @@ run_aggregation (void *cls, if (NULL != au->wire) json_decref (au->wire); GNUNET_free (au); + au = NULL; db_plugin->rollback (db_plugin->cls, session); global_ret = GNUNET_SYSERR; @@ -563,6 +625,7 @@ run_aggregation (void *cls, if (NULL != au->wire) json_decref (au->wire); GNUNET_free (au); + au = NULL; return; } /* Mark transactions by row_id as minor */ @@ -591,6 +654,7 @@ run_aggregation (void *cls, if (NULL != au->wire) json_decref (au->wire); GNUNET_free (au); + au = NULL; /* start again */ task = GNUNET_SCHEDULER_add_now (&run_aggregation, NULL); @@ -602,9 +666,6 @@ run_aggregation (void *cls, &au->wtid, &prepare_cb, au); - /* FIXME: currently we have no clean-up plan on - shutdown to call prepare_wire_transfer_cancel! - Maybe make 'au' global? */ if (NULL == au->ph) { GNUNET_break (0); /* why? how to best recover? */ @@ -613,6 +674,7 @@ run_aggregation (void *cls, GNUNET_free_non_null (au->additional_rows); if (NULL != au->wire) json_decref (au->wire); + au = NULL; GNUNET_free (au); /* start again */ task = GNUNET_SCHEDULER_add_now (&run_aggregation, @@ -638,7 +700,7 @@ run_transfers (void *cls, /** * Function to be called with the prepared transfer data. * - * @param cls closure with the `struct AggregationUnit` + * @param cls NULL * @param buf transaction data to persist, NULL on error * @param buf_size number of bytes in @a buf, 0 on error */ @@ -647,13 +709,13 @@ prepare_cb (void *cls, const char *buf, size_t buf_size) { - struct AggregationUnit *au = cls; struct TALER_EXCHANGEDB_Session *session = au->session; GNUNET_free_non_null (au->additional_rows); if (NULL != au->wire) json_decref (au->wire); GNUNET_free (au); + au = NULL; if (NULL == buf) { GNUNET_break (0); /* why? how to best recover? */ @@ -702,34 +764,10 @@ prepare_cb (void *cls, } -/** - * Data we keep to #run_transfers(). - */ -struct WirePrepareData -{ - - /** - * Database session for all of our transactions. - */ - struct TALER_EXCHANGEDB_Session *session; - - /** - * Wire execution handle. - */ - struct TALER_WIRE_ExecuteHandle *eh; - - /** - * Row ID of the transfer. - */ - unsigned long long row_id; - -}; - - /** * Function called with the result from the execute step. * - * @param cls closure with the `struct WirePrepareData` + * @param cls NULL * @param success #GNUNET_OK on success, #GNUNET_SYSERR on failure * @param emsg NULL on success, otherwise an error message */ @@ -738,7 +776,6 @@ wire_confirm_cb (void *cls, int success, const char *emsg) { - struct WirePrepareData *wpd = cls; struct TALER_EXCHANGEDB_Session *session = wpd->session; wpd->eh = NULL; @@ -751,6 +788,7 @@ wire_confirm_cb (void *cls, session); global_ret = GNUNET_SYSERR; GNUNET_free (wpd); + wpd = NULL; return; } if (GNUNET_OK != @@ -763,9 +801,11 @@ wire_confirm_cb (void *cls, session); global_ret = GNUNET_SYSERR; GNUNET_free (wpd); + wpd = NULL; return; } GNUNET_free (wpd); + wpd = NULL; if (GNUNET_OK != db_plugin->commit (db_plugin->cls, session)) @@ -788,7 +828,7 @@ wire_confirm_cb (void *cls, /** * Callback with data about a prepared transaction. * - * @param cls closure with the `struct WirePrepareData` + * @param cls NULL * @param rowid row identifier used to mark prepared transaction as done * @param buf transaction data that was persisted, NULL on error * @param buf_size number of bytes in @a buf, 0 on error @@ -799,17 +839,12 @@ wire_prepare_cb (void *cls, const char *buf, size_t buf_size) { - struct WirePrepareData *wpd = cls; - wpd->row_id = rowid; wpd->eh = wire_plugin->execute_wire_transfer (wire_plugin->cls, buf, buf_size, &wire_confirm_cb, - wpd); - /* FIXME: currently we have no clean-up plan on - shutdown to call execute_wire_transfer_cancel! - Maybe make 'wpd' global? */ + NULL); if (NULL == wpd->eh) { GNUNET_break (0); /* why? how to best recover? */ @@ -817,6 +852,7 @@ wire_prepare_cb (void *cls, wpd->session); global_ret = GNUNET_SYSERR; GNUNET_free (wpd); + wpd = NULL; return; } } @@ -834,7 +870,6 @@ run_transfers (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { int ret; - struct WirePrepareData *wpd; struct TALER_EXCHANGEDB_Session *session; if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) @@ -862,7 +897,7 @@ run_transfers (void *cls, session, exchange_wireformat, &wire_prepare_cb, - wpd); + NULL); if (GNUNET_SYSERR == ret) { GNUNET_break (0); /* why? how to best recover? */ @@ -870,6 +905,7 @@ run_transfers (void *cls, session); global_ret = GNUNET_SYSERR; GNUNET_free (wpd); + wpd = NULL; return; } if (GNUNET_NO == ret) @@ -880,6 +916,7 @@ run_transfers (void *cls, task = GNUNET_SCHEDULER_add_now (&run_aggregation, NULL); GNUNET_free (wpd); + wpd = NULL; return; } /* otherwise, continues in #wire_prepare_cb() */ -- cgit v1.2.3 From a96c7177aae60c37041406ff0879992e58ef2f50 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Wed, 6 Apr 2016 12:04:51 +0200 Subject: get simple aggregation test to pass --- src/exchange/taler-exchange-aggregator.c | 42 +++++++++++++++++++++++---- src/exchange/test_taler_exchange_aggregator.c | 22 +++++++------- src/wire/plugin_wire_test.c | 2 +- 3 files changed, 50 insertions(+), 16 deletions(-) (limited to 'src/exchange') diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c index 3726d32fa..def6d0be4 100644 --- a/src/exchange/taler-exchange-aggregator.c +++ b/src/exchange/taler-exchange-aggregator.c @@ -237,8 +237,9 @@ shutdown_task (void *cls, json_decref (au->wire); au = NULL; GNUNET_free (au); - } + TALER_EXCHANGEDB_plugin_unload (db_plugin); + TALER_WIRE_plugin_unload (wire_plugin); } @@ -366,6 +367,9 @@ deposit_cb (void *cls, GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE, &au->wtid, sizeof (au->wtid)); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Starting aggregation under WTID %s\n", + TALER_B2S (&au->wtid)); if (GNUNET_OK != db_plugin->insert_aggregation_tracking (db_plugin->cls, au->session, @@ -523,8 +527,11 @@ run_aggregation (void *cls, unsigned int i; int ret; + task = NULL; if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) return; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Checking for ready deposits to aggregate\n"); if (NULL == (session = db_plugin->get_session (db_plugin->cls, test_mode))) { @@ -563,6 +570,8 @@ run_aggregation (void *cls, global_ret = GNUNET_SYSERR; return; } + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "No more ready deposits, going to sleep\n"); if (GNUNET_YES == test_mode) { /* in test mode, shutdown if we end up being idle */ @@ -578,6 +587,9 @@ run_aggregation (void *cls, return; } /* Now try to find other deposits to aggregate */ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Found ready deposit for %s, aggregating\n", + TALER_B2S (&au->merchant_pub)); ret = db_plugin->iterate_matching_deposits (db_plugin->cls, session, &au->h_wire, @@ -608,6 +620,8 @@ run_aggregation (void *cls, if ( (0 == au->total_amount.value) && (0 == au->total_amount.fraction) ) { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Aggregate value too low for transfer\n"); /* Rollback ongoing transaction, as we will not use the respective WTID and thus need to remove the tracking data */ db_plugin->rollback (db_plugin->cls, @@ -660,6 +674,15 @@ run_aggregation (void *cls, NULL); return; } + { + char *amount_s; + + amount_s = TALER_amount_to_string (&au->total_amount); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Preparing wire transfer of %s to %s\n", + amount_s, + TALER_B2S (&au->merchant_pub)); + } au->ph = wire_plugin->prepare_wire_transfer (wire_plugin->cls, au->wire, &au->total_amount, @@ -757,7 +780,8 @@ prepare_cb (void *cls, NULL); return; } - + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Preparation complete, switching to transfer mode\n"); /* run alternative task: actually do wire transfer! */ task = GNUNET_SCHEDULER_add_now (&run_transfers, NULL); @@ -817,6 +841,8 @@ wire_confirm_cb (void *cls, NULL); return; } + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Wire transfer complete\n"); /* continue with #run_transfers(), just to guard against the unlikely case that there are more. */ task = GNUNET_SCHEDULER_add_now (&run_transfers, @@ -840,6 +866,9 @@ wire_prepare_cb (void *cls, size_t buf_size) { wpd->row_id = rowid; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Starting wire transfer %llu\n", + rowid); wpd->eh = wire_plugin->execute_wire_transfer (wire_plugin->cls, buf, buf_size, @@ -872,6 +901,9 @@ run_transfers (void *cls, int ret; struct TALER_EXCHANGEDB_Session *session; + task = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Checking for pending wire transfers\n"); if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) return; if (NULL == (session = db_plugin->get_session (db_plugin->cls, @@ -911,6 +943,8 @@ run_transfers (void *cls, if (GNUNET_NO == ret) { /* no more prepared wire transfers, go back to aggregation! */ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "No more pending wire transfers, starting aggregation\n"); db_plugin->rollback (db_plugin->cls, session); task = GNUNET_SCHEDULER_add_now (&run_aggregation, @@ -934,7 +968,7 @@ run (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { task = GNUNET_SCHEDULER_add_now (&run_transfers, - cls); + NULL); GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, cls); @@ -991,8 +1025,6 @@ main (int argc, global_ret = GNUNET_OK; GNUNET_SCHEDULER_run (&run, NULL); - TALER_EXCHANGEDB_plugin_unload (db_plugin); - TALER_WIRE_plugin_unload (wire_plugin); return (GNUNET_SYSERR == global_ret) ? 1 : 0; } diff --git a/src/exchange/test_taler_exchange_aggregator.c b/src/exchange/test_taler_exchange_aggregator.c index 22bc8e277..f34dea837 100644 --- a/src/exchange/test_taler_exchange_aggregator.c +++ b/src/exchange/test_taler_exchange_aggregator.c @@ -668,25 +668,19 @@ run_test () .label = "run-aggregator-deposit-1" }, - /* The above step is already known to fail (with an error message) - right now, so we skip the rest of the test. */ - { - .opcode = OPCODE_TERMINATE_SKIP, - .label = "testcase-incomplete-terminating-with-skip" - }, - - { .opcode = OPCODE_EXPECT_TRANSACTION, .label = "expect-deposit-1", - .details.expect_transaction.debit_account = 1, + .details.expect_transaction.debit_account = 3, .details.expect_transaction.credit_account = 4, .details.expect_transaction.amount = "EUR:1" }, + { .opcode = OPCODE_EXPECT_TRANSACTIONS_EMPTY, .label = "expect-empty-transactions-on-start" }, + /* test idempotency: run again on transactions already done */ { .opcode = OPCODE_DATABASE_DEPOSIT, @@ -703,6 +697,11 @@ run_test () .label = "expect-empty-transactions-on-start" }, + { + .opcode = OPCODE_TERMINATE_SUCCESS, + .label = "testcase-incomplete-terminating-with-skip" + }, + { .opcode = OPCODE_TERMINATE_SKIP, .label = "testcase-incomplete-terminating-with-skip" @@ -804,7 +803,6 @@ handle_mhd_request (void *cls, GNUNET_break_op (0); return MHD_NO; } - /* FIXME: to be implemented! */ pr = GNUNET_JSON_post_parser (REQUEST_BUFFER_MAX, con_cls, upload_data, @@ -848,6 +846,10 @@ handle_mhd_request (void *cls, transactions_tail, t); } + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Receiving incoming wire transfer: %llu->%llu\n", + (unsigned long long) t->debit_account, + (unsigned long long) t->credit_account); json_decref (json); resp = MHD_create_response_from_buffer (0, "", MHD_RESPMEM_PERSISTENT); ret = MHD_queue_response (connection, diff --git a/src/wire/plugin_wire_test.c b/src/wire/plugin_wire_test.c index 357449af6..9d0667680 100644 --- a/src/wire/plugin_wire_test.c +++ b/src/wire/plugin_wire_test.c @@ -178,7 +178,7 @@ context_task (void *cls, rs, ws, &context_task, - cls); + tc); GNUNET_NETWORK_fdset_destroy (rs); GNUNET_NETWORK_fdset_destroy (ws); } -- cgit v1.2.3