From 2d662e3f8e62e750cf2dcf2030cc69e8ae176960 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sat, 24 Jun 2017 12:15:11 +0200 Subject: fix #5010 for taler-exchange-aggregator --- src/exchange/taler-exchange-aggregator.c | 411 ++++++++++++++++--------------- 1 file changed, 216 insertions(+), 195 deletions(-) (limited to 'src/exchange') diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c index 8dd46f7f1..7bd437b5d 100644 --- a/src/exchange/taler-exchange-aggregator.c +++ b/src/exchange/taler-exchange-aggregator.c @@ -346,18 +346,19 @@ advance_fees (struct WirePlugin *wp, * @param wp wire transfer fee data structure to update * @param now timestamp to update fees to * @param session DB session to use - * @return #GNUNET_OK on success, #GNUNET_SYSERR if we - * lack current fee information (and need to exit) + * @return transaction status */ -static int +static enum GNUNET_DB_QueryStatus update_fees (struct WirePlugin *wp, struct GNUNET_TIME_Absolute now, struct TALER_EXCHANGEDB_Session *session) { + enum GNUNET_DB_QueryStatus qs; + advance_fees (wp, now); if (NULL != wp->af) - return GNUNET_OK; + return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; /* Let's try to load it from disk... */ wp->af = TALER_EXCHANGEDB_fees_read (cfg, wp->type); @@ -367,26 +368,26 @@ update_fees (struct WirePlugin *wp, NULL != p; p = p->next) { - if (GNUNET_SYSERR == - db_plugin->insert_wire_fee (db_plugin->cls, - session, - wp->type, - p->start_date, - p->end_date, - &p->wire_fee, - &p->master_sig)) + qs = db_plugin->insert_wire_fee (db_plugin->cls, + session, + wp->type, + p->start_date, + p->end_date, + &p->wire_fee, + &p->master_sig); + if (qs < 0) { TALER_EXCHANGEDB_fees_free (wp->af); wp->af = NULL; - return GNUNET_SYSERR; + return qs; } } if (NULL != wp->af) - return GNUNET_OK; + return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to find current wire transfer fees for `%s'\n", wp->type); - return GNUNET_SYSERR; + return GNUNET_DB_STATUS_SUCCESS_NO_RESULTS; } @@ -425,6 +426,26 @@ find_plugin (const char *type) return wp; } + +/** + * Free data stored in #au. + */ +static void +cleanup_au (void) +{ + if (NULL == au) + return; + GNUNET_free_non_null (au->additional_rows); + if (NULL != au->wire) + { + json_decref (au->wire); + au->wire = NULL; + } + GNUNET_free (au); + au = NULL; +} + + /** * We're being aborted with CTRL-C (or SIGTERM). Shut down. * @@ -463,11 +484,7 @@ shutdown_task (void *cls) } db_plugin->rollback (db_plugin->cls, au->session); - GNUNET_free_non_null (au->additional_rows); - if (NULL != au->wire) - json_decref (au->wire); - au = NULL; - GNUNET_free (au); + cleanup_au (); } if (NULL != ctc) { @@ -564,9 +581,9 @@ exchange_serve_process_config () * @param wire_deadline by which the merchant adviced that he would like the * wire transfer to be executed * @param wire wire details for the merchant - * @return #GNUNET_OK to continue to iterate, #GNUNET_SYSERR to stop + * @return transaction status code, #GNUNET_DB_STATUS_SUCCESS_ONE_RESULT to continue to iterate */ -static int +static enum GNUNET_DB_QueryStatus deposit_cb (void *cls, uint64_t row_id, const struct TALER_MerchantPublicKeyP *merchant_pub, @@ -588,7 +605,7 @@ deposit_cb (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Fatally malformed record at row %llu\n", (unsigned long long) row_id); - return GNUNET_SYSERR; + return GNUNET_DB_STATUS_HARD_ERROR; } au->row_id = row_id; GNUNET_assert (NULL == au->wire); @@ -604,38 +621,41 @@ deposit_cb (void *cls, au->wp = find_plugin (extract_type (au->wire)); if (NULL == au->wp) - return GNUNET_SYSERR; + return GNUNET_DB_STATUS_HARD_ERROR; /* make sure we have current fees */ au->execution_time = GNUNET_TIME_absolute_get (); (void) GNUNET_TIME_round_abs (&au->execution_time); - if (GNUNET_OK != - update_fees (au->wp, - au->execution_time, - au->session)) - return GNUNET_SYSERR; + qs = update_fees (au->wp, + au->execution_time, + au->session); + if (qs <= 0) + { + if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) + qs = GNUNET_DB_STATUS_HARD_ERROR; + GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); + return qs; + } au->wire_fee = au->wp->af->wire_fee; - if (GNUNET_OK != - db_plugin->insert_aggregation_tracking (db_plugin->cls, - au->session, - &au->wtid, - row_id)) + qs = db_plugin->insert_aggregation_tracking (db_plugin->cls, + au->session, + &au->wtid, + row_id); + if (qs <= 0) { - GNUNET_break (0); - return GNUNET_SYSERR; + GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); + return qs; } qs = db_plugin->mark_deposit_done (db_plugin->cls, au->session, row_id); if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs) { - /* FIXME #5010 */ - GNUNET_break (0); - au->failed = GNUNET_YES; - return GNUNET_SYSERR; + GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); + return qs; } - return GNUNET_OK; + return qs; } @@ -653,9 +673,9 @@ deposit_cb (void *cls, * @param wire_deadline by which the merchant adviced that he would like the * wire transfer to be executed * @param wire wire details for the merchant - * @return #GNUNET_OK to continue to iterate, #GNUNET_SYSERR to stop + * @return transaction status code */ -static int +static enum GNUNET_DB_QueryStatus aggregate_cb (void *cls, uint64_t row_id, const struct TALER_MerchantPublicKeyP *merchant_pub, @@ -682,9 +702,12 @@ aggregate_cb (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Fatally malformed record at %llu\n", (unsigned long long) row_id); - return GNUNET_SYSERR; + return GNUNET_DB_STATUS_HARD_ERROR; } /* add to total */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Adding transaction amount %s to aggregation\n", + TALER_amount2s (&delta)); if (GNUNET_OK != TALER_amount_add (&au->total_amount, &au->total_amount, @@ -694,14 +717,14 @@ aggregate_cb (void *cls, "Overflow or currency incompatibility during aggregation at %llu\n", (unsigned long long) row_id); /* Skip this one, but keep going! */ - return GNUNET_OK; + return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; } if (au->rows_offset >= aggregation_limit) { /* Bug: we asked for at most #aggregation_limit results! */ GNUNET_break (0); /* Skip this one, but keep going. */ - return GNUNET_OK; + return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; } if (NULL == au->additional_rows) au->additional_rows = GNUNET_new_array (aggregation_limit, @@ -709,26 +732,27 @@ aggregate_cb (void *cls, /* "append" to our list of rows */ au->additional_rows[au->rows_offset++] = row_id; /* insert into aggregation tracking table */ - if (GNUNET_OK != - db_plugin->insert_aggregation_tracking (db_plugin->cls, - au->session, - &au->wtid, - row_id)) + qs = db_plugin->insert_aggregation_tracking (db_plugin->cls, + au->session, + &au->wtid, + row_id); + if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs) { - GNUNET_break (0); - return GNUNET_SYSERR; + GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); + return qs; } qs = db_plugin->mark_deposit_done (db_plugin->cls, au->session, row_id); if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs) { - /* FIXME: #5010 */ - GNUNET_break (0); - au->failed = GNUNET_YES; - return GNUNET_SYSERR; + GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); + return qs; } - return GNUNET_OK; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Added row %llu to aggregation\n", + (unsigned long long) row_id); + return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; } @@ -948,15 +972,18 @@ expired_reserve_cb (void *cls, } /* lookup `closing_fee` */ - if (GNUNET_OK != - update_fees (wp, - now, - session)) + qs = update_fees (wp, + now, + session); + if (qs <= 0) { - GNUNET_break (0); + if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) + qs = GNUNET_DB_STATUS_HARD_ERROR; + GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); global_ret = GNUNET_SYSERR; - GNUNET_SCHEDULER_shutdown (); - return GNUNET_DB_STATUS_HARD_ERROR; + if (GNUNET_DB_STATUS_HARD_ERROR == qs) + GNUNET_SCHEDULER_shutdown (); + return qs; } closing_fee = &wp->af->closing_fee; @@ -1144,7 +1171,6 @@ run_aggregation (void *cls) static int swap; struct TALER_EXCHANGEDB_Session *session; enum GNUNET_DB_QueryStatus qs; - int ret; const struct GNUNET_SCHEDULER_TaskContext *tc; task = NULL; @@ -1179,19 +1205,16 @@ run_aggregation (void *cls) } au = GNUNET_new (struct AggregationUnit); au->session = session; - ret = db_plugin->get_ready_deposit (db_plugin->cls, - session, - &deposit_cb, - au); - if (0 >= ret) + qs = db_plugin->get_ready_deposit (db_plugin->cls, + session, + &deposit_cb, + au); + if (0 >= qs) { - if (NULL != au->wire) - json_decref (au->wire); - GNUNET_free (au); - au = NULL; + cleanup_au (); db_plugin->rollback (db_plugin->cls, session); - if (GNUNET_SYSERR == ret) + if (GNUNET_DB_STATUS_HARD_ERROR == qs) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to execute deposit iteration!\n"); @@ -1199,6 +1222,14 @@ run_aggregation (void *cls) GNUNET_SCHEDULER_shutdown (); return; } + if (GNUNET_DB_STATUS_SOFT_ERROR == qs) + { + /* should re-try immediately */ + swap--; /* do not count failed attempts */ + task = GNUNET_SCHEDULER_add_now (&run_aggregation, + NULL); + return; + } GNUNET_log (GNUNET_ERROR_TYPE_INFO, "No more ready deposits, going to sleep\n"); if ( (GNUNET_YES == test_mode) && @@ -1209,12 +1240,13 @@ run_aggregation (void *cls) } else { - /* nothing to do, sleep for a minute and try again */ if ( (GNUNET_NO == reserves_idle) || - (GNUNET_YES == test_mode) ) + (GNUNET_YES == test_mode) ) + /* Possibly more to on reserves, go for it immediately */ task = GNUNET_SCHEDULER_add_now (&run_reserve_closures, NULL); else + /* nothing to do, sleep for a minute and try again */ task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, &run_aggregation, NULL); @@ -1226,29 +1258,37 @@ run_aggregation (void *cls) GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Found ready deposit for %s, aggregating\n", TALER_B2S (&au->merchant_pub)); - ret = db_plugin->iterate_matching_deposits (db_plugin->cls, - session, - &au->h_wire, - &au->merchant_pub, - &aggregate_cb, - au, - aggregation_limit); - if ( (GNUNET_SYSERR == ret) || + qs = db_plugin->iterate_matching_deposits (db_plugin->cls, + session, + &au->h_wire, + &au->merchant_pub, + &aggregate_cb, + au, + aggregation_limit); + if ( (GNUNET_DB_STATUS_HARD_ERROR == qs) || (GNUNET_YES == au->failed) ) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to execute deposit iteration!\n"); - GNUNET_free_non_null (au->additional_rows); - json_decref (au->wire); - GNUNET_free (au); - au = NULL; + cleanup_au (); db_plugin->rollback (db_plugin->cls, session); global_ret = GNUNET_SYSERR; GNUNET_SCHEDULER_shutdown (); return; } - + if (GNUNET_DB_STATUS_SOFT_ERROR == qs) + { + /* serializiability issue, try again */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Serialization issue, trying again later!\n"); + db_plugin->rollback (db_plugin->cls, + session); + task = GNUNET_SCHEDULER_add_now (&run_aggregation, + NULL); + return; + } + /* Subtract wire transfer fee and round to the unit supported by the wire transfer method; Check if after rounding down, we still have an amount to transfer, and if not mark as 'tiny'. */ @@ -1263,13 +1303,16 @@ run_aggregation (void *cls) (0 == au->final_amount.fraction) ) ) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Aggregate value too low for transfer\n"); + "Aggregate value too low for transfer (%d/%s)\n", + qs, + TALER_amount2s (&au->final_amount)); /* Rollback ongoing transaction, as we will not use the respective WTID and thus need to remove the tracking data */ db_plugin->rollback (db_plugin->cls, session); - /* Start another transaction to mark all* of the selected deposits - *as minor! */ + + /* There were results, just the value was too low. Start another + transaction to mark all* of the selected deposits as minor! */ if (GNUNET_OK != db_plugin->start (db_plugin->cls, session)) @@ -1277,16 +1320,11 @@ run_aggregation (void *cls) GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to start database transaction!\n"); global_ret = GNUNET_SYSERR; + cleanup_au (); GNUNET_SCHEDULER_shutdown (); - GNUNET_free_non_null (au->additional_rows); - if (NULL != au->wire) - json_decref (au->wire); - GNUNET_free (au); - au = NULL; return; } /* Mark transactions by row_id as minor */ - ret = GNUNET_OK; qs = db_plugin->mark_deposit_tiny (db_plugin->cls, session, au->row_id); @@ -1303,13 +1341,11 @@ run_aggregation (void *cls) } if (GNUNET_DB_STATUS_SOFT_ERROR == qs) { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Serialization issue, trying again later!\n"); db_plugin->rollback (db_plugin->cls, session); - GNUNET_free_non_null (au->additional_rows); - if (NULL != au->wire) - json_decref (au->wire); - GNUNET_free (au); - au = NULL; + cleanup_au (); /* start again */ task = GNUNET_SCHEDULER_add_now (&run_aggregation, NULL); @@ -1319,21 +1355,13 @@ run_aggregation (void *cls) { db_plugin->rollback (db_plugin->cls, session); - GNUNET_free_non_null (au->additional_rows); - if (NULL != au->wire) - json_decref (au->wire); - GNUNET_free (au); - au = NULL; + cleanup_au (); GNUNET_SCHEDULER_shutdown (); return; } /* commit */ (void) commit_or_warn (session); - GNUNET_free_non_null (au->additional_rows); - if (NULL != au->wire) - json_decref (au->wire); - GNUNET_free (au); - au = NULL; + cleanup_au (); /* start again */ task = GNUNET_SCHEDULER_add_now (&run_aggregation, NULL); @@ -1361,11 +1389,7 @@ run_aggregation (void *cls) GNUNET_break (0); /* why? how to best recover? */ db_plugin->rollback (db_plugin->cls, session); - GNUNET_free_non_null (au->additional_rows); - if (NULL != au->wire) - json_decref (au->wire); - GNUNET_free (au); - au = NULL; + cleanup_au (); /* start again */ task = GNUNET_SCHEDULER_add_now (&run_aggregation, NULL); @@ -1388,8 +1412,10 @@ prepare_cb (void *cls, size_t buf_size) { struct TALER_EXCHANGEDB_Session *session = au->session; + enum GNUNET_DB_QueryStatus qs; GNUNET_free_non_null (au->additional_rows); + au->additional_rows = NULL; if (NULL == buf) { GNUNET_break (0); /* why? how to best recover? */ @@ -1398,74 +1424,53 @@ prepare_cb (void *cls, /* start again */ task = GNUNET_SCHEDULER_add_now (&run_aggregation, NULL); - if (NULL != au->wire) - { - json_decref (au->wire); - au->wire = NULL; - } - GNUNET_free (au); - au = NULL; + cleanup_au (); return; } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Storing %u bytes of wire prepare data\n", + (unsigned int) buf_size); /* Commit our intention to execute the wire transfer! */ - if (GNUNET_OK != - db_plugin->wire_prepare_data_insert (db_plugin->cls, - session, - au->wp->type, - buf, - buf_size)) + qs = db_plugin->wire_prepare_data_insert (db_plugin->cls, + session, + au->wp->type, + buf, + buf_size); + /* Commit the WTID data to 'wire_out' to finally satisfy aggregation + table constraints */ + if (qs >= 0) + qs = db_plugin->store_wire_transfer_out (db_plugin->cls, + session, + au->execution_time, + &au->wtid, + au->wire, + &au->final_amount); + cleanup_au (); + if (GNUNET_DB_STATUS_SOFT_ERROR == qs) { - GNUNET_break (0); /* why? how to best recover? */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Serialization issue for prepared wire data; trying again later!\n"); db_plugin->rollback (db_plugin->cls, session); /* start again */ task = GNUNET_SCHEDULER_add_now (&run_aggregation, NULL); - if (NULL != au->wire) - { - json_decref (au->wire); - au->wire = NULL; - } - GNUNET_free (au); - au = NULL; return; } - - /* Commit the WTID data to 'wire_out' to finally satisfy aggregation - table constraints */ - if (GNUNET_OK != - db_plugin->store_wire_transfer_out (db_plugin->cls, - session, - au->execution_time, - &au->wtid, - au->wire, - &au->final_amount)) + if (GNUNET_DB_STATUS_HARD_ERROR == qs) { - GNUNET_break (0); /* why? how to best recover? */ + GNUNET_break (0); db_plugin->rollback (db_plugin->cls, session); - /* start again */ - task = GNUNET_SCHEDULER_add_now (&run_aggregation, - NULL); - if (NULL != au->wire) - { - json_decref (au->wire); - au->wire = NULL; - } - GNUNET_free (au); - au = NULL; + /* die hard */ + global_ret = GNUNET_SYSERR; + GNUNET_SCHEDULER_shutdown (); return; } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Stored wire transfer out instructions\n"); - if (NULL != au->wire) - { - json_decref (au->wire); - au->wire = NULL; - } - GNUNET_free (au); - au = NULL; /* Now we can finally commit the overall transaction, as we are again consistent if all of this passes. */ @@ -1473,6 +1478,8 @@ prepare_cb (void *cls, { case GNUNET_DB_STATUS_SOFT_ERROR: /* try again */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Commit issue for prepared wire data; trying again later!\n"); task = GNUNET_SCHEDULER_add_now (&run_aggregation, NULL); return; @@ -1512,6 +1519,7 @@ wire_confirm_cb (void *cls, const char *emsg) { struct TALER_EXCHANGEDB_Session *session = wpd->session; + enum GNUNET_DB_QueryStatus qs; wpd->eh = NULL; if (GNUNET_SYSERR == success) @@ -1527,16 +1535,25 @@ wire_confirm_cb (void *cls, wpd = NULL; return; } - if (GNUNET_OK != - db_plugin->wire_prepare_data_mark_finished (db_plugin->cls, - session, - wpd->row_id)) + qs = db_plugin->wire_prepare_data_mark_finished (db_plugin->cls, + session, + wpd->row_id); + if (0 >= qs) { - GNUNET_break (0); /* why!? */ + GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); db_plugin->rollback (db_plugin->cls, session); - global_ret = GNUNET_SYSERR; - GNUNET_SCHEDULER_shutdown (); + if (GNUNET_DB_STATUS_SOFT_ERROR == qs) + { + /* try again */ + task = GNUNET_SCHEDULER_add_now (&run_aggregation, + NULL); + } + else + { + global_ret = GNUNET_SYSERR; + GNUNET_SCHEDULER_shutdown (); + } GNUNET_free (wpd); wpd = NULL; return; @@ -1621,7 +1638,7 @@ wire_prepare_cb (void *cls, static void run_transfers (void *cls) { - int ret; + enum GNUNET_DB_QueryStatus qs; struct TALER_EXCHANGEDB_Session *session; const struct GNUNET_SCHEDULER_TaskContext *tc; @@ -1651,35 +1668,39 @@ run_transfers (void *cls) } wpd = GNUNET_new (struct WirePrepareData); wpd->session = session; - ret = db_plugin->wire_prepare_data_get (db_plugin->cls, - session, - &wire_prepare_cb, - NULL); - if (GNUNET_SYSERR == ret) + qs = db_plugin->wire_prepare_data_get (db_plugin->cls, + session, + &wire_prepare_cb, + NULL); + if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs) + return; /* continues in #wire_prepare_cb() */ + db_plugin->rollback (db_plugin->cls, + session); + GNUNET_free (wpd); + wpd = NULL; + switch (qs) { - GNUNET_break (0); /* why? how to best recover? */ - db_plugin->rollback (db_plugin->cls, - session); + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); global_ret = GNUNET_SYSERR; GNUNET_SCHEDULER_shutdown (); - GNUNET_free (wpd); - wpd = NULL; return; - } - if (GNUNET_NO == ret) - { + case GNUNET_DB_STATUS_SOFT_ERROR: + /* try again */ + task = GNUNET_SCHEDULER_add_now (&run_transfers, + NULL); + return; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: /* no more prepared wire transfers, go back to aggregation! */ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "No more pending wire transfers, starting aggregation\n"); - db_plugin->rollback (db_plugin->cls, - session); task = GNUNET_SCHEDULER_add_now (&run_aggregation, NULL); - GNUNET_free (wpd); - wpd = NULL; return; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + /* should be impossible */ + GNUNET_assert (0); } - /* otherwise, continues in #wire_prepare_cb() */ } -- cgit v1.2.3