From 9aa2e94812278f1bd392483a6a9b36f138c20aef Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sat, 24 Jun 2017 00:41:41 +0200 Subject: working on #5010 for aggregator --- src/exchange/taler-exchange-aggregator.c | 150 +++++++++++++--------- src/exchangedb/plugin_exchangedb_postgres.c | 188 ++++++++++++++++------------ src/exchangedb/test_exchangedb.c | 4 +- src/include/taler_exchangedb_plugin.h | 19 ++- 4 files changed, 210 insertions(+), 151 deletions(-) diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c index 10240190b..8dd46f7f1 100644 --- a/src/exchange/taler-exchange-aggregator.c +++ b/src/exchange/taler-exchange-aggregator.c @@ -812,6 +812,8 @@ prepare_close_cb (void *cls, const char *buf, size_t buf_size) { + enum GNUNET_DB_QueryStatus qs; + GNUNET_assert (cls == ctc); GNUNET_log (GNUNET_ERROR_TYPE_INFO, @@ -832,14 +834,25 @@ prepare_close_cb (void *cls, } /* Commit our intention to execute the wire transfer! */ - if (GNUNET_OK != - db_plugin->wire_prepare_data_insert (db_plugin->cls, - ctc->session, - ctc->type, - buf, - buf_size)) + qs = db_plugin->wire_prepare_data_insert (db_plugin->cls, + ctc->session, + ctc->type, + buf, + buf_size); + if (GNUNET_DB_STATUS_HARD_ERROR == qs) + { + GNUNET_break (0); + db_plugin->rollback (db_plugin->cls, + ctc->session); + global_ret = GNUNET_SYSERR; + GNUNET_SCHEDULER_shutdown (); + GNUNET_free (ctc->type); + GNUNET_free (ctc); + ctc = NULL; + return; + } + if (GNUNET_DB_STATUS_SOFT_ERROR == qs) { - GNUNET_break (0); /* why? how to best recover? */ db_plugin->rollback (db_plugin->cls, ctc->session); /* start again */ @@ -863,33 +876,53 @@ prepare_close_cb (void *cls, } +/** + * Closure for #expired_reserve_cb(). + */ +struct ExpiredReserveContext +{ + + /** + * Database session we are using. + */ + struct TALER_EXCHANGEDB_Session *session; + + /** + * Set to #GNUNET_YES if the transaction continues + * asynchronously. + */ + int async_cont; +}; + + /** * Function called with details about expired reserves. * We trigger the reserve closure by inserting the respective * closing record and prewire instructions into the respective * tables. * - * @param cls a `struct TALER_EXCHANGEDB_Session *` + * @param cls a `struct ExpiredReserveContext *` * @param reserve_pub public key of the reserve * @param left amount left in the reserve * @param account_details information about the reserve's bank account * @param expiration_date when did the reserve expire - * @return #GNUNET_OK to continue to iterate, #GNUNET_SYSERR to stop + * @return transaction status code */ -static int +static enum GNUNET_DB_QueryStatus expired_reserve_cb (void *cls, const struct TALER_ReservePublicKeyP *reserve_pub, const struct TALER_Amount *left, const json_t *account_details, struct GNUNET_TIME_Absolute expiration_date) { - struct TALER_EXCHANGEDB_Session *session = cls; + struct ExpiredReserveContext *erc = cls; + struct TALER_EXCHANGEDB_Session *session = erc->session; struct GNUNET_TIME_Absolute now; struct TALER_WireTransferIdentifierRawP wtid; struct TALER_Amount amount_without_fee; const struct TALER_Amount *closing_fee; int ret; - int iret; + enum GNUNET_DB_QueryStatus qs; const char *type; struct WirePlugin *wp; @@ -901,21 +934,17 @@ expired_reserve_cb (void *cls, if (NULL == type) { GNUNET_break (0); - db_plugin->rollback (db_plugin->cls, - session); global_ret = GNUNET_SYSERR; GNUNET_SCHEDULER_shutdown (); - return GNUNET_SYSERR; + return GNUNET_DB_STATUS_HARD_ERROR; } wp = find_plugin (type); if (NULL == wp) { GNUNET_break (0); - db_plugin->rollback (db_plugin->cls, - session); global_ret = GNUNET_SYSERR; GNUNET_SCHEDULER_shutdown (); - return GNUNET_SYSERR; + return GNUNET_DB_STATUS_HARD_ERROR; } /* lookup `closing_fee` */ @@ -925,11 +954,9 @@ expired_reserve_cb (void *cls, session)) { GNUNET_break (0); - db_plugin->rollback (db_plugin->cls, - session); global_ret = GNUNET_SYSERR; GNUNET_SCHEDULER_shutdown (); - return GNUNET_SYSERR; + return GNUNET_DB_STATUS_HARD_ERROR; } closing_fee = &wp->af->closing_fee; @@ -956,22 +983,22 @@ expired_reserve_cb (void *cls, reserve_pub, GNUNET_MIN (sizeof (wtid), sizeof (*reserve_pub))); - iret = db_plugin->insert_reserve_closed (db_plugin->cls, - session, - reserve_pub, - now, - account_details, - &wtid, - left, - closing_fee); + qs = db_plugin->insert_reserve_closed (db_plugin->cls, + session, + reserve_pub, + now, + account_details, + &wtid, + left, + closing_fee); GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Closing reserve %s over %s (%d, %d)\n", TALER_B2S (reserve_pub), TALER_amount2s (left), ret, - iret); + qs); if ( (GNUNET_OK == ret) && - (GNUNET_OK == iret) ) + (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs) ) { /* success, perform wire transfer */ if (GNUNET_SYSERR == @@ -979,11 +1006,9 @@ expired_reserve_cb (void *cls, &amount_without_fee)) { GNUNET_break (0); - db_plugin->rollback (db_plugin->cls, - session); global_ret = GNUNET_SYSERR; GNUNET_SCHEDULER_shutdown (); - return GNUNET_SYSERR; + return GNUNET_DB_STATUS_HARD_ERROR; } ctc = GNUNET_new (struct CloseTransferContext); ctc->wp = wp; @@ -1000,33 +1025,29 @@ expired_reserve_cb (void *cls, if (NULL == ctc->ph) { GNUNET_break (0); - db_plugin->rollback (db_plugin->cls, - session); global_ret = GNUNET_SYSERR; GNUNET_SCHEDULER_shutdown (); GNUNET_free (ctc->type); GNUNET_free (ctc); ctc = NULL; + return GNUNET_DB_STATUS_HARD_ERROR; } - return GNUNET_SYSERR; + erc->async_cont = GNUNET_YES; + return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; } /* Check for hard failure */ - if (GNUNET_SYSERR == iret) + if ( (GNUNET_SYSERR == ret) || + (GNUNET_DB_STATUS_HARD_ERROR == qs) ) { GNUNET_break (0); - db_plugin->rollback (db_plugin->cls, - session); global_ret = GNUNET_SYSERR; GNUNET_SCHEDULER_shutdown (); - return GNUNET_SYSERR; + return GNUNET_DB_STATUS_HARD_ERROR; } - /* Reserve balance was almost zero; just commit */ + /* Reserve balance was almost zero OR soft error */ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Reserve was virtually empty, moving on\n"); - (void) commit_or_warn (session); - task = GNUNET_SCHEDULER_add_now (&run_reserve_closures, - NULL); - return GNUNET_SYSERR; + return qs; } @@ -1040,9 +1061,10 @@ static void run_reserve_closures (void *cls) { struct TALER_EXCHANGEDB_Session *session; - int ret; + enum GNUNET_DB_QueryStatus qs; const struct GNUNET_SCHEDULER_TaskContext *tc; - + struct ExpiredReserveContext erc; + task = NULL; reserves_idle = GNUNET_NO; tc = GNUNET_SCHEDULER_get_task_context (); @@ -1068,22 +1090,29 @@ run_reserve_closures (void *cls) GNUNET_SCHEDULER_shutdown (); return; } - ret = db_plugin->get_expired_reserves (db_plugin->cls, - session, - GNUNET_TIME_absolute_get (), - &expired_reserve_cb, - session); - if (GNUNET_SYSERR == ret) + erc.session = session; + erc.async_cont = GNUNET_NO; + qs = db_plugin->get_expired_reserves (db_plugin->cls, + session, + GNUNET_TIME_absolute_get (), + &expired_reserve_cb, + &erc); + switch (qs) { + case GNUNET_DB_STATUS_HARD_ERROR: GNUNET_break (0); db_plugin->rollback (db_plugin->cls, session); global_ret = GNUNET_SYSERR; GNUNET_SCHEDULER_shutdown (); return; - } - if (GNUNET_NO == ret) - { + case GNUNET_DB_STATUS_SOFT_ERROR: + db_plugin->rollback (db_plugin->cls, + session); + task = GNUNET_SCHEDULER_add_now (&run_reserve_closures, + NULL); + return; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: GNUNET_log (GNUNET_ERROR_TYPE_INFO, "No more idle reserves, going back to aggregation\n"); reserves_idle = GNUNET_YES; @@ -1092,6 +1121,13 @@ run_reserve_closures (void *cls) task = GNUNET_SCHEDULER_add_now (&run_aggregation, NULL); return; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + if (GNUNET_YES == erc.async_cont) + break; + (void) commit_or_warn (session); + task = GNUNET_SCHEDULER_add_now (&run_reserve_closures, + NULL); + return; } } diff --git a/src/exchangedb/plugin_exchangedb_postgres.c b/src/exchangedb/plugin_exchangedb_postgres.c index a41b19662..98ac11773 100644 --- a/src/exchangedb/plugin_exchangedb_postgres.c +++ b/src/exchangedb/plugin_exchangedb_postgres.c @@ -1454,7 +1454,9 @@ postgres_prepare (PGconn *db_conn) " FROM reserves" " WHERE expiration_date<=$1" " AND (current_balance_val != 0 " - " OR current_balance_frac != 0);", + " OR current_balance_frac != 0)" + " ORDER BY expiration_date ASC" + " LIMIT 1;", 1), /* Used in #postgres_get_coin_transactions() to obtain payback transactions for a coin */ @@ -2884,8 +2886,8 @@ postgres_get_ready_deposit (void *cls, * @param deposit_cb function to call for each deposit * @param deposit_cb_cls closure for @a deposit_cb * @param limit maximum number of matching deposits to return - * @return number of rows processed, 0 if none exist, - * #GNUNET_SYSERR on error + * @return transaction status code, if positive: + * number of rows processed, 0 if none exist */ static int postgres_iterate_matching_deposits (void *cls, @@ -4633,57 +4635,50 @@ postgres_insert_wire_fee (void *cls, /** - * Obtain information about expired reserves and their - * remaining balances. - * - * @param cls closure of the plugin - * @param session database connection - * @param now timestamp based on which we decide expiration - * @param rec function to call on expired reserves - * @param rec_cls closure for @a rec - * @return #GNUNET_SYSERR on database error - * #GNUNET_NO if there are no expired non-empty reserves - * #GNUNET_OK on success + * Closure for #reserve_expired_cb(). */ -static int -postgres_get_expired_reserves (void *cls, - struct TALER_EXCHANGEDB_Session *session, - struct GNUNET_TIME_Absolute now, - TALER_EXCHANGEDB_ReserveExpiredCallback rec, - void *rec_cls) +struct ExpiredReserveContext { - struct GNUNET_PQ_QueryParam params[] = { - GNUNET_PQ_query_param_absolute_time (&now), - GNUNET_PQ_query_param_end - }; - PGresult *result; - int nrows; + /** + * Function to call for each expired reserve. + */ + TALER_EXCHANGEDB_ReserveExpiredCallback rec; - result = GNUNET_PQ_exec_prepared (session->conn, - "get_expired_reserves", - params); - if (PGRES_TUPLES_OK != - PQresultStatus (result)) - { - BREAK_DB_ERR (result, session->conn); - PQclear (result); - return GNUNET_SYSERR; - } - nrows = PQntuples (result); - if (0 == nrows) - { - /* no matches found */ - PQclear (result); - return GNUNET_NO; - } + /** + * Closure to give to @e rec. + */ + void *rec_cls; - for (int i=0;irec (erc->rec_cls, + &reserve_pub, + &remaining_balance, + account_details, + exp_date); GNUNET_PQ_cleanup_result (rs); if (GNUNET_OK != ret) break; } - PQclear (result); - return GNUNET_OK; + erc->status = ret; +} + + +/** + * Obtain information about expired reserves and their + * remaining balances. + * + * @param cls closure of the plugin + * @param session database connection + * @param now timestamp based on which we decide expiration + * @param rec function to call on expired reserves + * @param rec_cls closure for @a rec + * @return transaction status + */ +static enum GNUNET_DB_QueryStatus +postgres_get_expired_reserves (void *cls, + struct TALER_EXCHANGEDB_Session *session, + struct GNUNET_TIME_Absolute now, + TALER_EXCHANGEDB_ReserveExpiredCallback rec, + void *rec_cls) +{ + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_absolute_time (&now), + GNUNET_PQ_query_param_end + }; + struct ExpiredReserveContext ectx; + enum GNUNET_DB_QueryStatus qs; + + ectx.rec = rec; + ectx.rec_cls = rec_cls; + ectx.status = GNUNET_OK; + qs = GNUNET_PQ_eval_prepared_multi_select (session->conn, + "get_expired_reserves", + params, + &reserve_expired_cb, + &ectx); + if (GNUNET_OK != ectx.status) + return GNUNET_DB_STATUS_HARD_ERROR; + return qs; } @@ -4730,11 +4763,9 @@ postgres_get_expired_reserves (void *cls, * @param wtid wire transfer details * @param amount_with_fee amount we charged to the reserve * @param closing_fee how high is the closing fee - * @return #GNUNET_OK on success, - * #GNUNET_NO if the record exists or on transient errors - * #GNUNET_SYSERR on failure + * @return transaction status code */ -static int +static enum GNUNET_DB_QueryStatus postgres_insert_reserve_closed (void *cls, struct TALER_EXCHANGEDB_Session *session, const struct TALER_ReservePublicKeyP *reserve_pub, @@ -4757,11 +4788,11 @@ postgres_insert_reserve_closed (void *cls, int ret; enum GNUNET_DB_QueryStatus qs; - ret = execute_prepared_non_select (session, - "reserves_close_insert", - params); - if (GNUNET_OK != ret) - return ret; + qs = GNUNET_PQ_eval_prepared_non_select (session->conn, + "reserves_close_insert", + params); + if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs) + return qs; /* update reserve balance */ reserve.pub = *reserve_pub; @@ -4770,10 +4801,11 @@ postgres_insert_reserve_closed (void *cls, session, &reserve))) { - /* FIXME: #5010 */ /* Existence should have been checked before we got here... */ - GNUNET_break (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs); - return (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) ? GNUNET_NO : GNUNET_SYSERR; + GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); + if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) + qs = GNUNET_DB_STATUS_HARD_ERROR; + return qs; } ret = TALER_amount_subtract (&reserve.balance, &reserve.balance, @@ -4786,18 +4818,12 @@ postgres_insert_reserve_closed (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Closing of reserve `%s' refused due to balance missmatch. Retrying.\n", TALER_B2S (reserve_pub)); - return GNUNET_NO; + return GNUNET_DB_STATUS_HARD_ERROR; } GNUNET_break (GNUNET_NO == ret); - qs = reserves_update (cls, - session, - &reserve); - if (0 >= qs) - { - GNUNET_break (0); - return GNUNET_SYSERR; - } - return GNUNET_OK; + return reserves_update (cls, + session, + &reserve); } @@ -4809,9 +4835,9 @@ postgres_insert_reserve_closed (void *cls, * @param type type of the wire transfer (i.e. "sepa") * @param buf buffer with wire transfer preparation data * @param buf_size number of bytes in @a buf - * @return #GNUNET_OK on success, #GNUNET_SYSERR on DB errors + * @return query status code */ -static int +static enum GNUNET_DB_QueryStatus postgres_wire_prepare_data_insert (void *cls, struct TALER_EXCHANGEDB_Session *session, const char *type, @@ -4824,9 +4850,9 @@ postgres_wire_prepare_data_insert (void *cls, GNUNET_PQ_query_param_end }; - return execute_prepared_non_select (session, - "wire_prepare_data_insert", - params); + return GNUNET_PQ_eval_prepared_non_select (session->conn, + "wire_prepare_data_insert", + params); } diff --git a/src/exchangedb/test_exchangedb.c b/src/exchangedb/test_exchangedb.c index ae0c82130..4ca7c39c9 100644 --- a/src/exchangedb/test_exchangedb.c +++ b/src/exchangedb/test_exchangedb.c @@ -119,7 +119,7 @@ test_wire_prepare (struct TALER_EXCHANGEDB_Session *session) session, &dead_prepare_cb, NULL)); - FAILIF (GNUNET_OK != + FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != plugin->wire_prepare_data_insert (plugin->cls, session, "testcase", @@ -1666,7 +1666,7 @@ run (void *cls) GNUNET_assert (GNUNET_OK == TALER_string_to_amount (CURRENCY ":0.000010", &fee_closing)); - FAILIF (GNUNET_OK != + FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != plugin->insert_reserve_closed (plugin->cls, session, &reserve_pub, diff --git a/src/include/taler_exchangedb_plugin.h b/src/include/taler_exchangedb_plugin.h index 9a97b5f37..9f80fda42 100644 --- a/src/include/taler_exchangedb_plugin.h +++ b/src/include/taler_exchangedb_plugin.h @@ -1009,9 +1009,9 @@ typedef int * @param left amount left in the reserve * @param account_details information about the reserve's bank account * @param expiration_date when did the reserve expire - * @return #GNUNET_OK to continue to iterate, #GNUNET_SYSERR to stop + * @return transaction status code to pass on */ -typedef int +typedef enum GNUNET_DB_QueryStatus (*TALER_EXCHANGEDB_ReserveExpiredCallback)(void *cls, const struct TALER_ReservePublicKeyP *reserve_pub, const struct TALER_Amount *left, @@ -1821,11 +1821,9 @@ struct TALER_EXCHANGEDB_Plugin * @param now timestamp based on which we decide expiration * @param rec function to call on expired reserves * @param rec_cls closure for @a rec - * @return #GNUNET_SYSERR on database error - * #GNUNET_NO if there are no expired non-empty reserves - * #GNUNET_OK on success + * @return transaction status */ - int + enum GNUNET_DB_QueryStatus (*get_expired_reserves)(void *cls, struct TALER_EXCHANGEDB_Session *session, struct GNUNET_TIME_Absolute now, @@ -1844,10 +1842,9 @@ struct TALER_EXCHANGEDB_Plugin * @param wtid identifier for the wire transfer * @param amount_with_fee amount we charged to the reserve * @param closing_fee how high is the closing fee - * @return #GNUNET_OK on success, #GNUNET_NO if the record exists, - * #GNUNET_SYSERR on failure + * @return transaction status code */ - int + enum GNUNET_DB_QueryStatus (*insert_reserve_closed)(void *cls, struct TALER_EXCHANGEDB_Session *session, const struct TALER_ReservePublicKeyP *reserve_pub, @@ -1866,9 +1863,9 @@ struct TALER_EXCHANGEDB_Plugin * @param type type of the wire transfer (i.e. "sepa") * @param buf buffer with wire transfer preparation data * @param buf_size number of bytes in @a buf - * @return #GNUNET_OK on success, #GNUNET_SYSERR on DB errors + * @return query status code */ - int + enum GNUNET_DB_QueryStatus (*wire_prepare_data_insert)(void *cls, struct TALER_EXCHANGEDB_Session *session, const char *type, -- cgit v1.2.3