diff options
Diffstat (limited to 'src/exchange/taler-exchange-aggregator.c')
-rw-r--r-- | src/exchange/taler-exchange-aggregator.c | 63 |
1 files changed, 16 insertions, 47 deletions
diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c index 5da0a809..c82b6666 100644 --- a/src/exchange/taler-exchange-aggregator.c +++ b/src/exchange/taler-exchange-aggregator.c @@ -90,11 +90,6 @@ struct AggregationUnit const struct TALER_EXCHANGEDB_AccountInfo *wa; /** - * Database session for all of our transactions. - */ - struct TALER_EXCHANGEDB_Session *session; - - /** * Array of row_ids from the aggregation. */ uint64_t additional_rows[TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT]; @@ -355,7 +350,6 @@ deposit_cb (void *cls, au->total_amount = *amount_with_fee; au->have_refund = GNUNET_NO; qs = db_plugin->select_refunds_by_coin (db_plugin->cls, - au->session, coin_pub, &au->merchant_pub, h_contract_terms, @@ -444,7 +438,6 @@ deposit_cb (void *cls, enum GNUNET_DB_QueryStatus qs; qs = db_plugin->get_wire_fee (db_plugin->cls, - au->session, au->wa->method, au->execution_time, &start_date, @@ -468,7 +461,6 @@ deposit_cb (void *cls, TALER_B2S (&au->wtid), TALER_amount2s (&au->wire_fee)); qs = db_plugin->insert_aggregation_tracking (db_plugin->cls, - au->session, &au->wtid, row_id); if (qs <= 0) @@ -480,7 +472,6 @@ deposit_cb (void *cls, "Aggregator marks deposit %llu as done\n", (unsigned long long) row_id); qs = db_plugin->mark_deposit_done (db_plugin->cls, - au->session, row_id); if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs) { @@ -535,7 +526,6 @@ aggregate_cb (void *cls, /* compute contribution of this coin (after fees) */ au->have_refund = GNUNET_NO; qs = db_plugin->select_refunds_by_coin (db_plugin->cls, - au->session, coin_pub, &au->merchant_pub, h_contract_terms, @@ -596,7 +586,6 @@ aggregate_cb (void *cls, au->additional_rows[au->rows_offset++] = row_id; /* insert into aggregation tracking table */ qs = db_plugin->insert_aggregation_tracking (db_plugin->cls, - au->session, &au->wtid, row_id); if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs) @@ -605,7 +594,6 @@ aggregate_cb (void *cls, return qs; } qs = db_plugin->mark_deposit_done (db_plugin->cls, - au->session, row_id); if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs) { @@ -622,16 +610,14 @@ aggregate_cb (void *cls, /** * Perform a database commit. If it fails, print a warning. * - * @param session session to perform the commit for. * @return status of commit */ static enum GNUNET_DB_QueryStatus -commit_or_warn (struct TALER_EXCHANGEDB_Session *session) +commit_or_warn (void) { enum GNUNET_DB_QueryStatus qs; - qs = db_plugin->commit (db_plugin->cls, - session); + qs = db_plugin->commit (db_plugin->cls); if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) return qs; GNUNET_log ((GNUNET_DB_STATUS_SOFT_ERROR == qs) @@ -652,24 +638,23 @@ static void run_aggregation (void *cls) { struct AggregationUnit au_active; - struct TALER_EXCHANGEDB_Session *session; enum GNUNET_DB_QueryStatus qs; (void) cls; task = NULL; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Checking for ready deposits to aggregate\n"); - if (NULL == (session = db_plugin->get_session (db_plugin->cls))) + if (GNUNET_SYSERR == + db_plugin->preflight (db_plugin->cls)) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Failed to obtain database session!\n"); + "Failed to obtain database connection!\n"); global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); return; } if (GNUNET_OK != - db_plugin->start_deferred_wire_out (db_plugin->cls, - session)) + db_plugin->start_deferred_wire_out (db_plugin->cls)) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to start database transaction!\n"); @@ -680,16 +665,13 @@ run_aggregation (void *cls) memset (&au_active, 0, sizeof (au_active)); - au_active.session = session; qs = db_plugin->get_ready_deposit (db_plugin->cls, - session, &deposit_cb, &au_active); if (0 >= qs) { cleanup_au (&au_active); - db_plugin->rollback (db_plugin->cls, - session); + db_plugin->rollback (db_plugin->cls); if (GNUNET_DB_STATUS_HARD_ERROR == qs) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, @@ -729,7 +711,6 @@ run_aggregation (void *cls) "Found ready deposit for %s, aggregating\n", TALER_B2S (&au_active.merchant_pub)); qs = db_plugin->iterate_matching_deposits (db_plugin->cls, - session, &au_active.h_wire, &au_active.merchant_pub, &aggregate_cb, @@ -741,8 +722,7 @@ run_aggregation (void *cls) GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to execute deposit iteration!\n"); cleanup_au (&au_active); - db_plugin->rollback (db_plugin->cls, - session); + db_plugin->rollback (db_plugin->cls); global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); return; @@ -752,8 +732,7 @@ run_aggregation (void *cls) /* serializiability issue, try again */ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Serialization issue, trying again later!\n"); - db_plugin->rollback (db_plugin->cls, - session); + db_plugin->rollback (db_plugin->cls); cleanup_au (&au_active); GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&run_aggregation, @@ -780,14 +759,12 @@ run_aggregation (void *cls) TALER_amount2s (&au_active.final_amount)); /* Rollback ongoing transaction, as we will not use the respective WTID and thus need to remove the tracking data */ - db_plugin->rollback (db_plugin->cls, - session); + db_plugin->rollback (db_plugin->cls); /* There were results, just the value was too low. Start another transaction to mark all* of the selected deposits as minor! */ if (GNUNET_OK != db_plugin->start (db_plugin->cls, - session, "aggregator mark tiny transactions")) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, @@ -799,14 +776,12 @@ run_aggregation (void *cls) } /* Mark transactions by row_id as minor */ qs = db_plugin->mark_deposit_tiny (db_plugin->cls, - session, au_active.row_id); if (0 <= qs) { for (unsigned int i = 0; i<au_active.rows_offset; i++) { qs = db_plugin->mark_deposit_tiny (db_plugin->cls, - session, au_active.additional_rows[i]); if (0 > qs) break; @@ -816,8 +791,7 @@ run_aggregation (void *cls) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Serialization issue, trying again later!\n"); - db_plugin->rollback (db_plugin->cls, - session); + db_plugin->rollback (db_plugin->cls); cleanup_au (&au_active); /* start again */ GNUNET_assert (NULL == task); @@ -827,15 +801,14 @@ run_aggregation (void *cls) } if (GNUNET_DB_STATUS_HARD_ERROR == qs) { - db_plugin->rollback (db_plugin->cls, - session); + db_plugin->rollback (db_plugin->cls); cleanup_au (&au_active); global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); return; } /* commit */ - (void) commit_or_warn (session); + (void) commit_or_warn (); cleanup_au (&au_active); /* start again */ @@ -877,7 +850,6 @@ run_aggregation (void *cls) (unsigned int) buf_size); /* Commit our intention to execute the wire transfer! */ qs = db_plugin->wire_prepare_data_insert (db_plugin->cls, - session, au_active.wa->method, buf, buf_size); @@ -887,7 +859,6 @@ run_aggregation (void *cls) table constraints */ if (qs >= 0) qs = db_plugin->store_wire_transfer_out (db_plugin->cls, - session, au_active.execution_time, &au_active.wtid, au_active.wire, @@ -899,8 +870,7 @@ run_aggregation (void *cls) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Serialization issue for prepared wire data; trying again later!\n"); - db_plugin->rollback (db_plugin->cls, - session); + db_plugin->rollback (db_plugin->cls); /* start again */ GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&run_aggregation, @@ -910,8 +880,7 @@ run_aggregation (void *cls) if (GNUNET_DB_STATUS_HARD_ERROR == qs) { GNUNET_break (0); - db_plugin->rollback (db_plugin->cls, - session); + db_plugin->rollback (db_plugin->cls); /* die hard */ global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); @@ -923,7 +892,7 @@ run_aggregation (void *cls) /* Now we can finally commit the overall transaction, as we are again consistent if all of this passes. */ - switch (commit_or_warn (session)) + switch (commit_or_warn ()) { case GNUNET_DB_STATUS_SOFT_ERROR: /* try again */ |