summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2016-04-06 10:34:42 +0200
committerChristian Grothoff <christian@grothoff.org>2016-04-06 10:34:42 +0200
commit4977a3eb8fe403eca5d30ce7ec397213f4a4eadd (patch)
tree09ea839a4a417f4c7248e3875ed926decd957ff4
parentad8351c912995a9ef0524822f0fdeef55e9d27a9 (diff)
downloadexchange-4977a3eb8fe403eca5d30ce7ec397213f4a4eadd.tar.gz
exchange-4977a3eb8fe403eca5d30ce7ec397213f4a4eadd.tar.bz2
exchange-4977a3eb8fe403eca5d30ce7ec397213f4a4eadd.zip
address 'global_ret' simplification TODO
-rw-r--r--src/exchange/taler-exchange-aggregator.c116
1 files changed, 69 insertions, 47 deletions
diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c
index cfc11a5f9..96922eff2 100644
--- a/src/exchange/taler-exchange-aggregator.c
+++ b/src/exchange/taler-exchange-aggregator.c
@@ -20,7 +20,6 @@
* @author Christian Grothoff
*
* TODO:
- * - simplify global_ret: make it a global!
* - handle shutdown more nicely (call 'cancel' method on wire transfers)
*/
#include "platform.h"
@@ -68,6 +67,12 @@ static struct TALER_WIRE_Plugin *wire_plugin;
static struct GNUNET_SCHEDULER_Task *task;
/**
+ * Value to return from main(). #GNUNET_OK on success, #GNUNET_SYSERR
+ * on serious errors.
+ */
+static int global_ret;
+
+/**
* #GNUNET_YES if we are in test mode and are using temporary tables.
*/
static int test_mode;
@@ -85,6 +90,25 @@ static unsigned int aggregation_limit = TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT
/**
+ * We're being aborted with CTRL-C (or SIGTERM). Shut down.
+ *
+ * @param cls closure
+ * @param tc scheduler context
+ */
+static void
+shutdown_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ if (NULL != task)
+ {
+ GNUNET_SCHEDULER_cancel (task);
+ task = NULL;
+ }
+ /* FIXME: other shutdown stuff here! */
+}
+
+
+/**
* Load configuration parameters for the exchange
* server into the corresponding global variables.
*
@@ -217,11 +241,6 @@ struct AggregationUnit
unsigned long long *additional_rows;
/**
- * Pointer to global return value. Closure for #run().
- */
- int *global_ret;
-
- /**
* Offset specifying how many #additional_rows are in use.
*/
unsigned int rows_offset;
@@ -314,7 +333,6 @@ deposit_cb (void *cls,
}
-
/**
* Function called with details about another deposit we
* can aggregate into an existing aggregation unit.
@@ -433,14 +451,13 @@ prepare_cb (void *cls,
* Main work function that queries the DB and aggregates transactions
* into larger wire transfers.
*
- * @param cls pointer to an `int` which we will return from main()
+ * @param cls NULL
* @param tc scheduler context
*/
static void
run_aggregation (void *cls,
const struct GNUNET_SCHEDULER_TaskContext *tc)
{
- int *global_ret = cls;
struct TALER_EXCHANGEDB_Session *session;
struct AggregationUnit *au;
unsigned int i;
@@ -453,7 +470,7 @@ run_aggregation (void *cls,
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to obtain database session!\n");
- *global_ret = GNUNET_SYSERR;
+ global_ret = GNUNET_SYSERR;
return;
}
if (GNUNET_OK !=
@@ -462,7 +479,7 @@ run_aggregation (void *cls,
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to start database transaction!\n");
- *global_ret = GNUNET_SYSERR;
+ global_ret = GNUNET_SYSERR;
return;
}
au = GNUNET_new (struct AggregationUnit);
@@ -482,7 +499,7 @@ run_aggregation (void *cls,
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to execute deposit iteration!\n");
- *global_ret = GNUNET_SYSERR;
+ global_ret = GNUNET_SYSERR;
return;
}
if (GNUNET_YES == test_mode)
@@ -495,7 +512,7 @@ run_aggregation (void *cls,
/* nothing to do, sleep for a minute and try again */
task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
&run_aggregation,
- global_ret);
+ NULL);
}
return;
}
@@ -518,7 +535,7 @@ run_aggregation (void *cls,
GNUNET_free (au);
db_plugin->rollback (db_plugin->cls,
session);
- *global_ret = GNUNET_SYSERR;
+ global_ret = GNUNET_SYSERR;
return;
}
/* Round to the unit supported by the wire transfer method */
@@ -541,7 +558,7 @@ run_aggregation (void *cls,
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to start database transaction!\n");
- *global_ret = GNUNET_SYSERR;
+ global_ret = GNUNET_SYSERR;
GNUNET_free_non_null (au->additional_rows);
if (NULL != au->wire)
json_decref (au->wire);
@@ -576,10 +593,9 @@ run_aggregation (void *cls,
GNUNET_free (au);
/* start again */
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
- global_ret);
+ NULL);
return;
}
- au->global_ret = global_ret;
au->ph = wire_plugin->prepare_wire_transfer (wire_plugin->cls,
au->wire,
&au->total_amount,
@@ -600,7 +616,7 @@ run_aggregation (void *cls,
GNUNET_free (au);
/* start again */
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
- global_ret);
+ NULL);
return;
}
/* otherwise we continue with #prepare_cb(), see below */
@@ -632,7 +648,6 @@ prepare_cb (void *cls,
size_t buf_size)
{
struct AggregationUnit *au = cls;
- int *global_ret = au->global_ret;
struct TALER_EXCHANGEDB_Session *session = au->session;
GNUNET_free_non_null (au->additional_rows);
@@ -646,7 +661,7 @@ prepare_cb (void *cls,
session);
/* start again */
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
- global_ret);
+ NULL);
return;
}
@@ -663,7 +678,7 @@ prepare_cb (void *cls,
session);
/* start again */
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
- global_ret);
+ NULL);
return;
}
@@ -677,13 +692,13 @@ prepare_cb (void *cls,
"Failed to commit database transaction!\n");
/* try again */
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
- global_ret);
+ NULL);
return;
}
/* run alternative task: actually do wire transfer! */
task = GNUNET_SCHEDULER_add_now (&run_transfers,
- &global_ret);
+ NULL);
}
@@ -704,12 +719,6 @@ struct WirePrepareData
struct TALER_WIRE_ExecuteHandle *eh;
/**
- * Pointer to global return value. Closure for #run().
- */
- int *global_ret;
-
-
- /**
* Row ID of the transfer.
*/
unsigned long long row_id;
@@ -730,7 +739,6 @@ wire_confirm_cb (void *cls,
const char *emsg)
{
struct WirePrepareData *wpd = cls;
- int *global_ret = wpd->global_ret;
struct TALER_EXCHANGEDB_Session *session = wpd->session;
wpd->eh = NULL;
@@ -741,7 +749,7 @@ wire_confirm_cb (void *cls,
emsg);
db_plugin->rollback (db_plugin->cls,
session);
- *global_ret = GNUNET_SYSERR;
+ global_ret = GNUNET_SYSERR;
GNUNET_free (wpd);
return;
}
@@ -753,7 +761,7 @@ wire_confirm_cb (void *cls,
GNUNET_break (0); /* why!? */
db_plugin->rollback (db_plugin->cls,
session);
- *global_ret = GNUNET_SYSERR;
+ global_ret = GNUNET_SYSERR;
GNUNET_free (wpd);
return;
}
@@ -766,13 +774,13 @@ wire_confirm_cb (void *cls,
"Failed to commit database transaction!\n");
/* try again */
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
- global_ret);
+ NULL);
return;
}
/* continue with #run_transfers(), just to guard
against the unlikely case that there are more. */
task = GNUNET_SCHEDULER_add_now (&run_transfers,
- &global_ret);
+ NULL);
}
@@ -792,7 +800,6 @@ wire_prepare_cb (void *cls,
size_t buf_size)
{
struct WirePrepareData *wpd = cls;
- int *global_ret = wpd->global_ret;
wpd->row_id = rowid;
wpd->eh = wire_plugin->execute_wire_transfer (wire_plugin->cls,
@@ -808,7 +815,7 @@ wire_prepare_cb (void *cls,
GNUNET_break (0); /* why? how to best recover? */
db_plugin->rollback (db_plugin->cls,
wpd->session);
- *global_ret = GNUNET_SYSERR;
+ global_ret = GNUNET_SYSERR;
GNUNET_free (wpd);
return;
}
@@ -819,14 +826,13 @@ wire_prepare_cb (void *cls,
* Execute the wire transfers that we have committed to
* do.
*
- * @param cls pointer to an `int` which we will return from main()
+ * @param cls NULL
* @param tc scheduler context
*/
static void
run_transfers (void *cls,
const struct GNUNET_SCHEDULER_TaskContext *tc)
{
- int *global_ret = cls;
int ret;
struct WirePrepareData *wpd;
struct TALER_EXCHANGEDB_Session *session;
@@ -838,7 +844,7 @@ run_transfers (void *cls,
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to obtain database session!\n");
- *global_ret = GNUNET_SYSERR;
+ global_ret = GNUNET_SYSERR;
return;
}
if (GNUNET_OK !=
@@ -847,12 +853,11 @@ run_transfers (void *cls,
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to start database transaction!\n");
- *global_ret = GNUNET_SYSERR;
+ global_ret = GNUNET_SYSERR;
return;
}
wpd = GNUNET_new (struct WirePrepareData);
wpd->session = session;
- wpd->global_ret = global_ret;
ret = db_plugin->wire_prepare_data_get (db_plugin->cls,
session,
exchange_wireformat,
@@ -863,7 +868,7 @@ run_transfers (void *cls,
GNUNET_break (0); /* why? how to best recover? */
db_plugin->rollback (db_plugin->cls,
session);
- *global_ret = GNUNET_SYSERR;
+ global_ret = GNUNET_SYSERR;
GNUNET_free (wpd);
return;
}
@@ -873,7 +878,7 @@ run_transfers (void *cls,
db_plugin->rollback (db_plugin->cls,
session);
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
- global_ret);
+ NULL);
GNUNET_free (wpd);
return;
}
@@ -882,6 +887,24 @@ run_transfers (void *cls,
/**
+ * First task.
+ *
+ * @param cls closure, NULL
+ * @param tc scheduler context
+ */
+static void
+run (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ task = GNUNET_SCHEDULER_add_now (&run_transfers,
+ cls);
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
+ &shutdown_task,
+ cls);
+}
+
+
+/**
* The main function of the taler-exchange-httpd server ("the exchange").
*
* @param argc number of arguments from the command line
@@ -907,7 +930,6 @@ main (int argc,
GNUNET_GETOPT_OPTION_VERSION (VERSION "-" VCS_VERSION),
GNUNET_GETOPT_OPTION_END
};
- int ret = GNUNET_OK;
GNUNET_assert (GNUNET_OK ==
GNUNET_log_setup ("taler-exchange-aggregator",
@@ -929,12 +951,12 @@ main (int argc,
{
return 1;
}
-
- GNUNET_SCHEDULER_run (&run_transfers, &ret);
+ global_ret = GNUNET_OK;
+ GNUNET_SCHEDULER_run (&run, NULL);
TALER_EXCHANGEDB_plugin_unload (db_plugin);
TALER_WIRE_plugin_unload (wire_plugin);
- return (GNUNET_SYSERR == ret) ? 1 : 0;
+ return (GNUNET_SYSERR == global_ret) ? 1 : 0;
}
/* end of taler-exchange-aggregator.c */