summaryrefslogtreecommitdiff
path: root/src/exchange/taler-exchange-aggregator.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/exchange/taler-exchange-aggregator.c')
-rw-r--r--src/exchange/taler-exchange-aggregator.c63
1 files changed, 16 insertions, 47 deletions
diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c
index 5da0a8094..c82b66669 100644
--- a/src/exchange/taler-exchange-aggregator.c
+++ b/src/exchange/taler-exchange-aggregator.c
@@ -90,11 +90,6 @@ struct AggregationUnit
const struct TALER_EXCHANGEDB_AccountInfo *wa;
/**
- * Database session for all of our transactions.
- */
- struct TALER_EXCHANGEDB_Session *session;
-
- /**
* Array of row_ids from the aggregation.
*/
uint64_t additional_rows[TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT];
@@ -355,7 +350,6 @@ deposit_cb (void *cls,
au->total_amount = *amount_with_fee;
au->have_refund = GNUNET_NO;
qs = db_plugin->select_refunds_by_coin (db_plugin->cls,
- au->session,
coin_pub,
&au->merchant_pub,
h_contract_terms,
@@ -444,7 +438,6 @@ deposit_cb (void *cls,
enum GNUNET_DB_QueryStatus qs;
qs = db_plugin->get_wire_fee (db_plugin->cls,
- au->session,
au->wa->method,
au->execution_time,
&start_date,
@@ -468,7 +461,6 @@ deposit_cb (void *cls,
TALER_B2S (&au->wtid),
TALER_amount2s (&au->wire_fee));
qs = db_plugin->insert_aggregation_tracking (db_plugin->cls,
- au->session,
&au->wtid,
row_id);
if (qs <= 0)
@@ -480,7 +472,6 @@ deposit_cb (void *cls,
"Aggregator marks deposit %llu as done\n",
(unsigned long long) row_id);
qs = db_plugin->mark_deposit_done (db_plugin->cls,
- au->session,
row_id);
if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)
{
@@ -535,7 +526,6 @@ aggregate_cb (void *cls,
/* compute contribution of this coin (after fees) */
au->have_refund = GNUNET_NO;
qs = db_plugin->select_refunds_by_coin (db_plugin->cls,
- au->session,
coin_pub,
&au->merchant_pub,
h_contract_terms,
@@ -596,7 +586,6 @@ aggregate_cb (void *cls,
au->additional_rows[au->rows_offset++] = row_id;
/* insert into aggregation tracking table */
qs = db_plugin->insert_aggregation_tracking (db_plugin->cls,
- au->session,
&au->wtid,
row_id);
if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)
@@ -605,7 +594,6 @@ aggregate_cb (void *cls,
return qs;
}
qs = db_plugin->mark_deposit_done (db_plugin->cls,
- au->session,
row_id);
if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)
{
@@ -622,16 +610,14 @@ aggregate_cb (void *cls,
/**
* Perform a database commit. If it fails, print a warning.
*
- * @param session session to perform the commit for.
* @return status of commit
*/
static enum GNUNET_DB_QueryStatus
-commit_or_warn (struct TALER_EXCHANGEDB_Session *session)
+commit_or_warn (void)
{
enum GNUNET_DB_QueryStatus qs;
- qs = db_plugin->commit (db_plugin->cls,
- session);
+ qs = db_plugin->commit (db_plugin->cls);
if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs)
return qs;
GNUNET_log ((GNUNET_DB_STATUS_SOFT_ERROR == qs)
@@ -652,24 +638,23 @@ static void
run_aggregation (void *cls)
{
struct AggregationUnit au_active;
- struct TALER_EXCHANGEDB_Session *session;
enum GNUNET_DB_QueryStatus qs;
(void) cls;
task = NULL;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Checking for ready deposits to aggregate\n");
- if (NULL == (session = db_plugin->get_session (db_plugin->cls)))
+ if (GNUNET_SYSERR ==
+ db_plugin->preflight (db_plugin->cls))
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Failed to obtain database session!\n");
+ "Failed to obtain database connection!\n");
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
return;
}
if (GNUNET_OK !=
- db_plugin->start_deferred_wire_out (db_plugin->cls,
- session))
+ db_plugin->start_deferred_wire_out (db_plugin->cls))
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to start database transaction!\n");
@@ -680,16 +665,13 @@ run_aggregation (void *cls)
memset (&au_active,
0,
sizeof (au_active));
- au_active.session = session;
qs = db_plugin->get_ready_deposit (db_plugin->cls,
- session,
&deposit_cb,
&au_active);
if (0 >= qs)
{
cleanup_au (&au_active);
- db_plugin->rollback (db_plugin->cls,
- session);
+ db_plugin->rollback (db_plugin->cls);
if (GNUNET_DB_STATUS_HARD_ERROR == qs)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
@@ -729,7 +711,6 @@ run_aggregation (void *cls)
"Found ready deposit for %s, aggregating\n",
TALER_B2S (&au_active.merchant_pub));
qs = db_plugin->iterate_matching_deposits (db_plugin->cls,
- session,
&au_active.h_wire,
&au_active.merchant_pub,
&aggregate_cb,
@@ -741,8 +722,7 @@ run_aggregation (void *cls)
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to execute deposit iteration!\n");
cleanup_au (&au_active);
- db_plugin->rollback (db_plugin->cls,
- session);
+ db_plugin->rollback (db_plugin->cls);
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
return;
@@ -752,8 +732,7 @@ run_aggregation (void *cls)
/* serializiability issue, try again */
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Serialization issue, trying again later!\n");
- db_plugin->rollback (db_plugin->cls,
- session);
+ db_plugin->rollback (db_plugin->cls);
cleanup_au (&au_active);
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
@@ -780,14 +759,12 @@ run_aggregation (void *cls)
TALER_amount2s (&au_active.final_amount));
/* Rollback ongoing transaction, as we will not use the respective
WTID and thus need to remove the tracking data */
- db_plugin->rollback (db_plugin->cls,
- session);
+ db_plugin->rollback (db_plugin->cls);
/* There were results, just the value was too low. Start another
transaction to mark all* of the selected deposits as minor! */
if (GNUNET_OK !=
db_plugin->start (db_plugin->cls,
- session,
"aggregator mark tiny transactions"))
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
@@ -799,14 +776,12 @@ run_aggregation (void *cls)
}
/* Mark transactions by row_id as minor */
qs = db_plugin->mark_deposit_tiny (db_plugin->cls,
- session,
au_active.row_id);
if (0 <= qs)
{
for (unsigned int i = 0; i<au_active.rows_offset; i++)
{
qs = db_plugin->mark_deposit_tiny (db_plugin->cls,
- session,
au_active.additional_rows[i]);
if (0 > qs)
break;
@@ -816,8 +791,7 @@ run_aggregation (void *cls)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Serialization issue, trying again later!\n");
- db_plugin->rollback (db_plugin->cls,
- session);
+ db_plugin->rollback (db_plugin->cls);
cleanup_au (&au_active);
/* start again */
GNUNET_assert (NULL == task);
@@ -827,15 +801,14 @@ run_aggregation (void *cls)
}
if (GNUNET_DB_STATUS_HARD_ERROR == qs)
{
- db_plugin->rollback (db_plugin->cls,
- session);
+ db_plugin->rollback (db_plugin->cls);
cleanup_au (&au_active);
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
return;
}
/* commit */
- (void) commit_or_warn (session);
+ (void) commit_or_warn ();
cleanup_au (&au_active);
/* start again */
@@ -877,7 +850,6 @@ run_aggregation (void *cls)
(unsigned int) buf_size);
/* Commit our intention to execute the wire transfer! */
qs = db_plugin->wire_prepare_data_insert (db_plugin->cls,
- session,
au_active.wa->method,
buf,
buf_size);
@@ -887,7 +859,6 @@ run_aggregation (void *cls)
table constraints */
if (qs >= 0)
qs = db_plugin->store_wire_transfer_out (db_plugin->cls,
- session,
au_active.execution_time,
&au_active.wtid,
au_active.wire,
@@ -899,8 +870,7 @@ run_aggregation (void *cls)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Serialization issue for prepared wire data; trying again later!\n");
- db_plugin->rollback (db_plugin->cls,
- session);
+ db_plugin->rollback (db_plugin->cls);
/* start again */
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
@@ -910,8 +880,7 @@ run_aggregation (void *cls)
if (GNUNET_DB_STATUS_HARD_ERROR == qs)
{
GNUNET_break (0);
- db_plugin->rollback (db_plugin->cls,
- session);
+ db_plugin->rollback (db_plugin->cls);
/* die hard */
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
@@ -923,7 +892,7 @@ run_aggregation (void *cls)
/* Now we can finally commit the overall transaction, as we are
again consistent if all of this passes. */
- switch (commit_or_warn (session))
+ switch (commit_or_warn ())
{
case GNUNET_DB_STATUS_SOFT_ERROR:
/* try again */