From 9aa2e94812278f1bd392483a6a9b36f138c20aef Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sat, 24 Jun 2017 00:41:41 +0200 Subject: working on #5010 for aggregator --- src/exchange/taler-exchange-aggregator.c | 150 +++++++++++++++++++------------ 1 file changed, 93 insertions(+), 57 deletions(-) (limited to 'src/exchange') diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c index 10240190b..8dd46f7f1 100644 --- a/src/exchange/taler-exchange-aggregator.c +++ b/src/exchange/taler-exchange-aggregator.c @@ -812,6 +812,8 @@ prepare_close_cb (void *cls, const char *buf, size_t buf_size) { + enum GNUNET_DB_QueryStatus qs; + GNUNET_assert (cls == ctc); GNUNET_log (GNUNET_ERROR_TYPE_INFO, @@ -832,14 +834,25 @@ prepare_close_cb (void *cls, } /* Commit our intention to execute the wire transfer! */ - if (GNUNET_OK != - db_plugin->wire_prepare_data_insert (db_plugin->cls, - ctc->session, - ctc->type, - buf, - buf_size)) + qs = db_plugin->wire_prepare_data_insert (db_plugin->cls, + ctc->session, + ctc->type, + buf, + buf_size); + if (GNUNET_DB_STATUS_HARD_ERROR == qs) + { + GNUNET_break (0); + db_plugin->rollback (db_plugin->cls, + ctc->session); + global_ret = GNUNET_SYSERR; + GNUNET_SCHEDULER_shutdown (); + GNUNET_free (ctc->type); + GNUNET_free (ctc); + ctc = NULL; + return; + } + if (GNUNET_DB_STATUS_SOFT_ERROR == qs) { - GNUNET_break (0); /* why? how to best recover? */ db_plugin->rollback (db_plugin->cls, ctc->session); /* start again */ @@ -863,33 +876,53 @@ prepare_close_cb (void *cls, } +/** + * Closure for #expired_reserve_cb(). + */ +struct ExpiredReserveContext +{ + + /** + * Database session we are using. + */ + struct TALER_EXCHANGEDB_Session *session; + + /** + * Set to #GNUNET_YES if the transaction continues + * asynchronously. + */ + int async_cont; +}; + + /** * Function called with details about expired reserves. * We trigger the reserve closure by inserting the respective * closing record and prewire instructions into the respective * tables. * - * @param cls a `struct TALER_EXCHANGEDB_Session *` + * @param cls a `struct ExpiredReserveContext *` * @param reserve_pub public key of the reserve * @param left amount left in the reserve * @param account_details information about the reserve's bank account * @param expiration_date when did the reserve expire - * @return #GNUNET_OK to continue to iterate, #GNUNET_SYSERR to stop + * @return transaction status code */ -static int +static enum GNUNET_DB_QueryStatus expired_reserve_cb (void *cls, const struct TALER_ReservePublicKeyP *reserve_pub, const struct TALER_Amount *left, const json_t *account_details, struct GNUNET_TIME_Absolute expiration_date) { - struct TALER_EXCHANGEDB_Session *session = cls; + struct ExpiredReserveContext *erc = cls; + struct TALER_EXCHANGEDB_Session *session = erc->session; struct GNUNET_TIME_Absolute now; struct TALER_WireTransferIdentifierRawP wtid; struct TALER_Amount amount_without_fee; const struct TALER_Amount *closing_fee; int ret; - int iret; + enum GNUNET_DB_QueryStatus qs; const char *type; struct WirePlugin *wp; @@ -901,21 +934,17 @@ expired_reserve_cb (void *cls, if (NULL == type) { GNUNET_break (0); - db_plugin->rollback (db_plugin->cls, - session); global_ret = GNUNET_SYSERR; GNUNET_SCHEDULER_shutdown (); - return GNUNET_SYSERR; + return GNUNET_DB_STATUS_HARD_ERROR; } wp = find_plugin (type); if (NULL == wp) { GNUNET_break (0); - db_plugin->rollback (db_plugin->cls, - session); global_ret = GNUNET_SYSERR; GNUNET_SCHEDULER_shutdown (); - return GNUNET_SYSERR; + return GNUNET_DB_STATUS_HARD_ERROR; } /* lookup `closing_fee` */ @@ -925,11 +954,9 @@ expired_reserve_cb (void *cls, session)) { GNUNET_break (0); - db_plugin->rollback (db_plugin->cls, - session); global_ret = GNUNET_SYSERR; GNUNET_SCHEDULER_shutdown (); - return GNUNET_SYSERR; + return GNUNET_DB_STATUS_HARD_ERROR; } closing_fee = &wp->af->closing_fee; @@ -956,22 +983,22 @@ expired_reserve_cb (void *cls, reserve_pub, GNUNET_MIN (sizeof (wtid), sizeof (*reserve_pub))); - iret = db_plugin->insert_reserve_closed (db_plugin->cls, - session, - reserve_pub, - now, - account_details, - &wtid, - left, - closing_fee); + qs = db_plugin->insert_reserve_closed (db_plugin->cls, + session, + reserve_pub, + now, + account_details, + &wtid, + left, + closing_fee); GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Closing reserve %s over %s (%d, %d)\n", TALER_B2S (reserve_pub), TALER_amount2s (left), ret, - iret); + qs); if ( (GNUNET_OK == ret) && - (GNUNET_OK == iret) ) + (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs) ) { /* success, perform wire transfer */ if (GNUNET_SYSERR == @@ -979,11 +1006,9 @@ expired_reserve_cb (void *cls, &amount_without_fee)) { GNUNET_break (0); - db_plugin->rollback (db_plugin->cls, - session); global_ret = GNUNET_SYSERR; GNUNET_SCHEDULER_shutdown (); - return GNUNET_SYSERR; + return GNUNET_DB_STATUS_HARD_ERROR; } ctc = GNUNET_new (struct CloseTransferContext); ctc->wp = wp; @@ -1000,33 +1025,29 @@ expired_reserve_cb (void *cls, if (NULL == ctc->ph) { GNUNET_break (0); - db_plugin->rollback (db_plugin->cls, - session); global_ret = GNUNET_SYSERR; GNUNET_SCHEDULER_shutdown (); GNUNET_free (ctc->type); GNUNET_free (ctc); ctc = NULL; + return GNUNET_DB_STATUS_HARD_ERROR; } - return GNUNET_SYSERR; + erc->async_cont = GNUNET_YES; + return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; } /* Check for hard failure */ - if (GNUNET_SYSERR == iret) + if ( (GNUNET_SYSERR == ret) || + (GNUNET_DB_STATUS_HARD_ERROR == qs) ) { GNUNET_break (0); - db_plugin->rollback (db_plugin->cls, - session); global_ret = GNUNET_SYSERR; GNUNET_SCHEDULER_shutdown (); - return GNUNET_SYSERR; + return GNUNET_DB_STATUS_HARD_ERROR; } - /* Reserve balance was almost zero; just commit */ + /* Reserve balance was almost zero OR soft error */ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Reserve was virtually empty, moving on\n"); - (void) commit_or_warn (session); - task = GNUNET_SCHEDULER_add_now (&run_reserve_closures, - NULL); - return GNUNET_SYSERR; + return qs; } @@ -1040,9 +1061,10 @@ static void run_reserve_closures (void *cls) { struct TALER_EXCHANGEDB_Session *session; - int ret; + enum GNUNET_DB_QueryStatus qs; const struct GNUNET_SCHEDULER_TaskContext *tc; - + struct ExpiredReserveContext erc; + task = NULL; reserves_idle = GNUNET_NO; tc = GNUNET_SCHEDULER_get_task_context (); @@ -1068,22 +1090,29 @@ run_reserve_closures (void *cls) GNUNET_SCHEDULER_shutdown (); return; } - ret = db_plugin->get_expired_reserves (db_plugin->cls, - session, - GNUNET_TIME_absolute_get (), - &expired_reserve_cb, - session); - if (GNUNET_SYSERR == ret) + erc.session = session; + erc.async_cont = GNUNET_NO; + qs = db_plugin->get_expired_reserves (db_plugin->cls, + session, + GNUNET_TIME_absolute_get (), + &expired_reserve_cb, + &erc); + switch (qs) { + case GNUNET_DB_STATUS_HARD_ERROR: GNUNET_break (0); db_plugin->rollback (db_plugin->cls, session); global_ret = GNUNET_SYSERR; GNUNET_SCHEDULER_shutdown (); return; - } - if (GNUNET_NO == ret) - { + case GNUNET_DB_STATUS_SOFT_ERROR: + db_plugin->rollback (db_plugin->cls, + session); + task = GNUNET_SCHEDULER_add_now (&run_reserve_closures, + NULL); + return; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: GNUNET_log (GNUNET_ERROR_TYPE_INFO, "No more idle reserves, going back to aggregation\n"); reserves_idle = GNUNET_YES; @@ -1092,6 +1121,13 @@ run_reserve_closures (void *cls) task = GNUNET_SCHEDULER_add_now (&run_aggregation, NULL); return; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + if (GNUNET_YES == erc.async_cont) + break; + (void) commit_or_warn (session); + task = GNUNET_SCHEDULER_add_now (&run_reserve_closures, + NULL); + return; } } -- cgit v1.2.3