summaryrefslogtreecommitdiff
path: root/src/exchange
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2016-04-07 09:33:04 +0200
committerChristian Grothoff <christian@grothoff.org>2016-04-07 09:33:04 +0200
commit148dda09d4218d40e2e0cc9d8fe9212aede969f5 (patch)
treef06e626d5171235923a205654296d459fec509f8 /src/exchange
parent8df75214f459fd39ce43540dc604733c1a47515e (diff)
parenta96c7177aae60c37041406ff0879992e58ef2f50 (diff)
downloadexchange-148dda09d4218d40e2e0cc9d8fe9212aede969f5.tar.gz
exchange-148dda09d4218d40e2e0cc9d8fe9212aede969f5.tar.bz2
exchange-148dda09d4218d40e2e0cc9d8fe9212aede969f5.zip
Merge branch 'master' of git+ssh://taler.net/var/git/exchange
Diffstat (limited to 'src/exchange')
-rw-r--r--src/exchange/taler-exchange-aggregator.c436
-rw-r--r--src/exchange/test_taler_exchange_aggregator.c22
2 files changed, 276 insertions, 182 deletions
diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c
index a6b7de211..def6d0be4 100644
--- a/src/exchange/taler-exchange-aggregator.c
+++ b/src/exchange/taler-exchange-aggregator.c
@@ -18,10 +18,6 @@
* @file taler-exchange-aggregator.c
* @brief Process that aggregates outgoing transactions and executes them
* @author Christian Grothoff
- *
- * TODO:
- * - simplify global_ret: make it a global!
- * - handle shutdown more nicely (call 'cancel' method on wire transfers)
*/
#include "platform.h"
#include <gnunet/gnunet_util_lib.h>
@@ -32,6 +28,102 @@
#include "taler_json_lib.h"
#include "taler_wire_lib.h"
+
+/**
+ * Data we keep to #run_transfers(). There is at most
+ * one of these around at any given point in time.
+ */
+struct WirePrepareData
+{
+
+ /**
+ * Database session for all of our transactions.
+ */
+ struct TALER_EXCHANGEDB_Session *session;
+
+ /**
+ * Wire execution handle.
+ */
+ struct TALER_WIRE_ExecuteHandle *eh;
+
+ /**
+ * Row ID of the transfer.
+ */
+ unsigned long long row_id;
+
+};
+
+
+/**
+ * Information about one aggregation process to be executed. There is
+ * at most one of these around at any given point in time.
+ */
+struct AggregationUnit
+{
+ /**
+ * Public key of the merchant.
+ */
+ struct TALER_MerchantPublicKeyP merchant_pub;
+
+ /**
+ * Total amount to be transferred.
+ */
+ struct TALER_Amount total_amount;
+
+ /**
+ * Hash of @e wire.
+ */
+ struct GNUNET_HashCode h_wire;
+
+ /**
+ * Wire transfer identifier we use.
+ */
+ struct TALER_WireTransferIdentifierRawP wtid;
+
+ /**
+ * Row ID of the transaction that started it all.
+ */
+ unsigned long long row_id;
+
+ /**
+ * The current time.
+ */
+ struct GNUNET_TIME_Absolute execution_time;
+
+ /**
+ * Wire details of the merchant.
+ */
+ json_t *wire;
+
+ /**
+ * Database session for all of our transactions.
+ */
+ struct TALER_EXCHANGEDB_Session *session;
+
+ /**
+ * Wire preparation handle.
+ */
+ struct TALER_WIRE_PrepareHandle *ph;
+
+ /**
+ * Array of #aggregation_limit row_ids from the
+ * aggregation.
+ */
+ unsigned long long *additional_rows;
+
+ /**
+ * Offset specifying how many #additional_rows are in use.
+ */
+ unsigned int rows_offset;
+
+ /**
+ * Set to #GNUNET_YES if we have to abort due to failure.
+ */
+ int failed;
+
+};
+
+
/**
* Which currency is used by this exchange?
*/
@@ -63,11 +155,29 @@ static struct TALER_EXCHANGEDB_Plugin *db_plugin;
static struct TALER_WIRE_Plugin *wire_plugin;
/**
- * Task for the main #run() function.
+ * Next task to run, if any.
*/
static struct GNUNET_SCHEDULER_Task *task;
/**
+ * If we are currently executing a transfer, information about
+ * the active transfer is here. Otherwise, this variable is NULL.
+ */
+static struct WirePrepareData *wpd;
+
+/**
+ * If we are currently aggregating transactions, information about the
+ * active aggregation is here. Otherwise, this variable is NULL.
+ */
+static struct AggregationUnit *au;
+
+/**
+ * Value to return from main(). #GNUNET_OK on success, #GNUNET_SYSERR
+ * on serious errors.
+ */
+static int global_ret;
+
+/**
* #GNUNET_YES if we are in test mode and are using temporary tables.
*/
static int test_mode;
@@ -78,9 +188,59 @@ static int test_mode;
* of the smallest possible unit are aggregated, they do surpass the
* "tiny" threshold beyond which we never trigger a wire transaction!
*
- * TODO: make configurable (via config file or command line option)
+ * Note: do not change here, Postgres requires us to hard-code the
+ * LIMIT in the prepared statement.
*/
-static unsigned int aggregation_limit = 10000;
+static unsigned int aggregation_limit = TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT;
+
+
+/**
+ * We're being aborted with CTRL-C (or SIGTERM). Shut down.
+ *
+ * @param cls closure
+ * @param tc scheduler context
+ */
+static void
+shutdown_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ if (NULL != task)
+ {
+ GNUNET_SCHEDULER_cancel (task);
+ task = NULL;
+ }
+ if (NULL != wpd)
+ {
+ if (NULL != wpd->eh)
+ {
+ wire_plugin->execute_wire_transfer_cancel (wire_plugin->cls,
+ wpd->eh);
+ wpd->eh = NULL;
+ }
+ db_plugin->rollback (db_plugin->cls,
+ wpd->session);
+ GNUNET_free (wpd);
+ wpd = NULL;
+ }
+ if (NULL != au)
+ {
+ if (NULL != au->ph)
+ {
+ wire_plugin->prepare_wire_transfer_cancel (wire_plugin->cls,
+ au->ph);
+ au->ph = NULL;
+ }
+ db_plugin->rollback (db_plugin->cls,
+ au->session);
+ GNUNET_free_non_null (au->additional_rows);
+ if (NULL != au->wire)
+ json_decref (au->wire);
+ au = NULL;
+ GNUNET_free (au);
+ }
+ TALER_EXCHANGEDB_plugin_unload (db_plugin);
+ TALER_WIRE_plugin_unload (wire_plugin);
+}
/**
@@ -159,85 +319,10 @@ exchange_serve_process_config (const char *exchange_directory)
/**
- * Information about one aggregation process to
- * be executed.
- */
-struct AggregationUnit
-{
- /**
- * Public key of the merchant.
- */
- struct TALER_MerchantPublicKeyP merchant_pub;
-
- /**
- * Total amount to be transferred.
- */
- struct TALER_Amount total_amount;
-
- /**
- * Hash of @e wire.
- */
- struct GNUNET_HashCode h_wire;
-
- /**
- * Wire transfer identifier we use.
- */
- struct TALER_WireTransferIdentifierRawP wtid;
-
- /**
- * Row ID of the transaction that started it all.
- */
- unsigned long long row_id;
-
- /**
- * The current time.
- */
- struct GNUNET_TIME_Absolute execution_time;
-
- /**
- * Wire details of the merchant.
- */
- json_t *wire;
-
- /**
- * Database session for all of our transactions.
- */
- struct TALER_EXCHANGEDB_Session *session;
-
- /**
- * Wire preparation handle.
- */
- struct TALER_WIRE_PrepareHandle *ph;
-
- /**
- * Array of #aggregation_limit row_ids from the
- * aggregation.
- */
- unsigned long long *additional_rows;
-
- /**
- * Pointer to global return value. Closure for #run().
- */
- int *global_ret;
-
- /**
- * Offset specifying how many #additional_rows are in use.
- */
- unsigned int rows_offset;
-
- /**
- * Set to #GNUNET_YES if we have to abort due to failure.
- */
- int failed;
-
-};
-
-
-/**
* Function called with details about deposits that have been made,
* with the goal of executing the corresponding wire transaction.
*
- * @param cls closure with the `struct AggregationUnit`
+ * @param cls NULL
* @param row_id identifies database entry
* @param merchant_pub public key of the merchant
* @param coin_pub public key of the coin
@@ -262,8 +347,6 @@ deposit_cb (void *cls,
struct GNUNET_TIME_Absolute wire_deadline,
const json_t *wire)
{
- struct AggregationUnit *au = cls;
-
au->merchant_pub = *merchant_pub;
if (GNUNET_OK !=
TALER_amount_subtract (&au->total_amount,
@@ -284,6 +367,9 @@ deposit_cb (void *cls,
GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
&au->wtid,
sizeof (au->wtid));
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Starting aggregation under WTID %s\n",
+ TALER_B2S (&au->wtid));
if (GNUNET_OK !=
db_plugin->insert_aggregation_tracking (db_plugin->cls,
au->session,
@@ -313,12 +399,11 @@ deposit_cb (void *cls,
}
-
/**
* Function called with details about another deposit we
* can aggregate into an existing aggregation unit.
*
- * @param cls closure with the `struct AggregationUnit`
+ * @param cls NULL
* @param row_id identifies database entry
* @param merchant_pub public key of the merchant
* @param coin_pub public key of the coin
@@ -343,7 +428,6 @@ aggregate_cb (void *cls,
struct GNUNET_TIME_Absolute wire_deadline,
const json_t *wire)
{
- struct AggregationUnit *au = cls;
struct TALER_Amount delta;
GNUNET_break (0 ==
@@ -432,27 +516,28 @@ prepare_cb (void *cls,
* Main work function that queries the DB and aggregates transactions
* into larger wire transfers.
*
- * @param cls pointer to an `int` which we will return from main()
+ * @param cls NULL
* @param tc scheduler context
*/
static void
run_aggregation (void *cls,
const struct GNUNET_SCHEDULER_TaskContext *tc)
{
- int *global_ret = cls;
struct TALER_EXCHANGEDB_Session *session;
- struct AggregationUnit *au;
unsigned int i;
int ret;
+ task = NULL;
if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
return;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Checking for ready deposits to aggregate\n");
if (NULL == (session = db_plugin->get_session (db_plugin->cls,
test_mode)))
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to obtain database session!\n");
- *global_ret = GNUNET_SYSERR;
+ global_ret = GNUNET_SYSERR;
return;
}
if (GNUNET_OK !=
@@ -461,7 +546,7 @@ run_aggregation (void *cls,
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to start database transaction!\n");
- *global_ret = GNUNET_SYSERR;
+ global_ret = GNUNET_SYSERR;
return;
}
au = GNUNET_new (struct AggregationUnit);
@@ -475,15 +560,18 @@ run_aggregation (void *cls,
if (NULL != au->wire)
json_decref (au->wire);
GNUNET_free (au);
+ au = NULL;
db_plugin->rollback (db_plugin->cls,
session);
if (0 != ret)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to execute deposit iteration!\n");
- *global_ret = GNUNET_SYSERR;
+ global_ret = GNUNET_SYSERR;
return;
}
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "No more ready deposits, going to sleep\n");
if (GNUNET_YES == test_mode)
{
/* in test mode, shutdown if we end up being idle */
@@ -494,11 +582,14 @@ run_aggregation (void *cls,
/* nothing to do, sleep for a minute and try again */
task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
&run_aggregation,
- global_ret);
+ NULL);
}
return;
}
/* Now try to find other deposits to aggregate */
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Found ready deposit for %s, aggregating\n",
+ TALER_B2S (&au->merchant_pub));
ret = db_plugin->iterate_matching_deposits (db_plugin->cls,
session,
&au->h_wire,
@@ -515,9 +606,10 @@ run_aggregation (void *cls,
if (NULL != au->wire)
json_decref (au->wire);
GNUNET_free (au);
+ au = NULL;
db_plugin->rollback (db_plugin->cls,
session);
- *global_ret = GNUNET_SYSERR;
+ global_ret = GNUNET_SYSERR;
return;
}
/* Round to the unit supported by the wire transfer method */
@@ -528,6 +620,8 @@ run_aggregation (void *cls,
if ( (0 == au->total_amount.value) &&
(0 == au->total_amount.fraction) )
{
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Aggregate value too low for transfer\n");
/* Rollback ongoing transaction, as we will not use the respective
WTID and thus need to remove the tracking data */
db_plugin->rollback (db_plugin->cls,
@@ -540,11 +634,12 @@ run_aggregation (void *cls,
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to start database transaction!\n");
- *global_ret = GNUNET_SYSERR;
+ global_ret = GNUNET_SYSERR;
GNUNET_free_non_null (au->additional_rows);
if (NULL != au->wire)
json_decref (au->wire);
GNUNET_free (au);
+ au = NULL;
return;
}
/* Mark transactions by row_id as minor */
@@ -573,21 +668,27 @@ run_aggregation (void *cls,
if (NULL != au->wire)
json_decref (au->wire);
GNUNET_free (au);
+ au = NULL;
/* start again */
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
- global_ret);
+ NULL);
return;
}
- au->global_ret = global_ret;
+ {
+ char *amount_s;
+
+ amount_s = TALER_amount_to_string (&au->total_amount);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Preparing wire transfer of %s to %s\n",
+ amount_s,
+ TALER_B2S (&au->merchant_pub));
+ }
au->ph = wire_plugin->prepare_wire_transfer (wire_plugin->cls,
au->wire,
&au->total_amount,
&au->wtid,
&prepare_cb,
au);
- /* FIXME: currently we have no clean-up plan on
- shutdown to call prepare_wire_transfer_cancel!
- Maybe make 'au' global? */
if (NULL == au->ph)
{
GNUNET_break (0); /* why? how to best recover? */
@@ -596,10 +697,11 @@ run_aggregation (void *cls,
GNUNET_free_non_null (au->additional_rows);
if (NULL != au->wire)
json_decref (au->wire);
+ au = NULL;
GNUNET_free (au);
/* start again */
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
- global_ret);
+ NULL);
return;
}
/* otherwise we continue with #prepare_cb(), see below */
@@ -621,7 +723,7 @@ run_transfers (void *cls,
/**
* Function to be called with the prepared transfer data.
*
- * @param cls closure with the `struct AggregationUnit`
+ * @param cls NULL
* @param buf transaction data to persist, NULL on error
* @param buf_size number of bytes in @a buf, 0 on error
*/
@@ -630,14 +732,13 @@ prepare_cb (void *cls,
const char *buf,
size_t buf_size)
{
- struct AggregationUnit *au = cls;
- int *global_ret = au->global_ret;
struct TALER_EXCHANGEDB_Session *session = au->session;
GNUNET_free_non_null (au->additional_rows);
if (NULL != au->wire)
json_decref (au->wire);
GNUNET_free (au);
+ au = NULL;
if (NULL == buf)
{
GNUNET_break (0); /* why? how to best recover? */
@@ -645,7 +746,7 @@ prepare_cb (void *cls,
session);
/* start again */
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
- global_ret);
+ NULL);
return;
}
@@ -662,7 +763,7 @@ prepare_cb (void *cls,
session);
/* start again */
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
- global_ret);
+ NULL);
return;
}
@@ -676,50 +777,21 @@ prepare_cb (void *cls,
"Failed to commit database transaction!\n");
/* try again */
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
- global_ret);
+ NULL);
return;
}
-
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Preparation complete, switching to transfer mode\n");
/* run alternative task: actually do wire transfer! */
task = GNUNET_SCHEDULER_add_now (&run_transfers,
- &global_ret);
+ NULL);
}
/**
- * Data we keep to #run_transfers().
- */
-struct WirePrepareData
-{
-
- /**
- * Database session for all of our transactions.
- */
- struct TALER_EXCHANGEDB_Session *session;
-
- /**
- * Wire execution handle.
- */
- struct TALER_WIRE_ExecuteHandle *eh;
-
- /**
- * Pointer to global return value. Closure for #run().
- */
- int *global_ret;
-
-
- /**
- * Row ID of the transfer.
- */
- unsigned long long row_id;
-
-};
-
-
-/**
* Function called with the result from the execute step.
*
- * @param cls closure with the `struct WirePrepareData`
+ * @param cls NULL
* @param success #GNUNET_OK on success, #GNUNET_SYSERR on failure
* @param emsg NULL on success, otherwise an error message
*/
@@ -728,8 +800,6 @@ wire_confirm_cb (void *cls,
int success,
const char *emsg)
{
- struct WirePrepareData *wpd = cls;
- int *global_ret = wpd->global_ret;
struct TALER_EXCHANGEDB_Session *session = wpd->session;
wpd->eh = NULL;
@@ -740,8 +810,9 @@ wire_confirm_cb (void *cls,
emsg);
db_plugin->rollback (db_plugin->cls,
session);
- *global_ret = GNUNET_SYSERR;
+ global_ret = GNUNET_SYSERR;
GNUNET_free (wpd);
+ wpd = NULL;
return;
}
if (GNUNET_OK !=
@@ -752,11 +823,13 @@ wire_confirm_cb (void *cls,
GNUNET_break (0); /* why!? */
db_plugin->rollback (db_plugin->cls,
session);
- *global_ret = GNUNET_SYSERR;
+ global_ret = GNUNET_SYSERR;
GNUNET_free (wpd);
+ wpd = NULL;
return;
}
GNUNET_free (wpd);
+ wpd = NULL;
if (GNUNET_OK !=
db_plugin->commit (db_plugin->cls,
session))
@@ -765,13 +838,15 @@ wire_confirm_cb (void *cls,
"Failed to commit database transaction!\n");
/* try again */
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
- global_ret);
+ NULL);
return;
}
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Wire transfer complete\n");
/* continue with #run_transfers(), just to guard
against the unlikely case that there are more. */
task = GNUNET_SCHEDULER_add_now (&run_transfers,
- &global_ret);
+ NULL);
}
@@ -779,7 +854,7 @@ wire_confirm_cb (void *cls,
/**
* Callback with data about a prepared transaction.
*
- * @param cls closure with the `struct WirePrepareData`
+ * @param cls NULL
* @param rowid row identifier used to mark prepared transaction as done
* @param buf transaction data that was persisted, NULL on error
* @param buf_size number of bytes in @a buf, 0 on error
@@ -790,25 +865,23 @@ wire_prepare_cb (void *cls,
const char *buf,
size_t buf_size)
{
- struct WirePrepareData *wpd = cls;
- int *global_ret = wpd->global_ret;
-
wpd->row_id = rowid;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Starting wire transfer %llu\n",
+ rowid);
wpd->eh = wire_plugin->execute_wire_transfer (wire_plugin->cls,
buf,
buf_size,
&wire_confirm_cb,
- wpd);
- /* FIXME: currently we have no clean-up plan on
- shutdown to call execute_wire_transfer_cancel!
- Maybe make 'wpd' global? */
+ NULL);
if (NULL == wpd->eh)
{
GNUNET_break (0); /* why? how to best recover? */
db_plugin->rollback (db_plugin->cls,
wpd->session);
- *global_ret = GNUNET_SYSERR;
+ global_ret = GNUNET_SYSERR;
GNUNET_free (wpd);
+ wpd = NULL;
return;
}
}
@@ -818,18 +891,19 @@ wire_prepare_cb (void *cls,
* Execute the wire transfers that we have committed to
* do.
*
- * @param cls pointer to an `int` which we will return from main()
+ * @param cls NULL
* @param tc scheduler context
*/
static void
run_transfers (void *cls,
const struct GNUNET_SCHEDULER_TaskContext *tc)
{
- int *global_ret = cls;
int ret;
- struct WirePrepareData *wpd;
struct TALER_EXCHANGEDB_Session *session;
+ task = NULL;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Checking for pending wire transfers\n");
if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
return;
if (NULL == (session = db_plugin->get_session (db_plugin->cls,
@@ -837,7 +911,7 @@ run_transfers (void *cls,
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to obtain database session!\n");
- *global_ret = GNUNET_SYSERR;
+ global_ret = GNUNET_SYSERR;
return;
}
if (GNUNET_OK !=
@@ -846,34 +920,37 @@ run_transfers (void *cls,
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to start database transaction!\n");
- *global_ret = GNUNET_SYSERR;
+ global_ret = GNUNET_SYSERR;
return;
}
wpd = GNUNET_new (struct WirePrepareData);
wpd->session = session;
- wpd->global_ret = global_ret;
ret = db_plugin->wire_prepare_data_get (db_plugin->cls,
session,
exchange_wireformat,
&wire_prepare_cb,
- wpd);
+ NULL);
if (GNUNET_SYSERR == ret)
{
GNUNET_break (0); /* why? how to best recover? */
db_plugin->rollback (db_plugin->cls,
session);
- *global_ret = GNUNET_SYSERR;
+ global_ret = GNUNET_SYSERR;
GNUNET_free (wpd);
+ wpd = NULL;
return;
}
if (GNUNET_NO == ret)
{
/* no more prepared wire transfers, go back to aggregation! */
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "No more pending wire transfers, starting aggregation\n");
db_plugin->rollback (db_plugin->cls,
session);
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
- global_ret);
+ NULL);
GNUNET_free (wpd);
+ wpd = NULL;
return;
}
/* otherwise, continues in #wire_prepare_cb() */
@@ -881,6 +958,24 @@ run_transfers (void *cls,
/**
+ * First task.
+ *
+ * @param cls closure, NULL
+ * @param tc scheduler context
+ */
+static void
+run (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ task = GNUNET_SCHEDULER_add_now (&run_transfers,
+ NULL);
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
+ &shutdown_task,
+ cls);
+}
+
+
+/**
* The main function of the taler-exchange-httpd server ("the exchange").
*
* @param argc number of arguments from the command line
@@ -906,7 +1001,6 @@ main (int argc,
GNUNET_GETOPT_OPTION_VERSION (VERSION "-" VCS_VERSION),
GNUNET_GETOPT_OPTION_END
};
- int ret = GNUNET_OK;
GNUNET_assert (GNUNET_OK ==
GNUNET_log_setup ("taler-exchange-aggregator",
@@ -928,12 +1022,10 @@ main (int argc,
{
return 1;
}
+ global_ret = GNUNET_OK;
+ GNUNET_SCHEDULER_run (&run, NULL);
- GNUNET_SCHEDULER_run (&run_transfers, &ret);
-
- TALER_EXCHANGEDB_plugin_unload (db_plugin);
- TALER_WIRE_plugin_unload (wire_plugin);
- return (GNUNET_SYSERR == ret) ? 1 : 0;
+ return (GNUNET_SYSERR == global_ret) ? 1 : 0;
}
/* end of taler-exchange-aggregator.c */
diff --git a/src/exchange/test_taler_exchange_aggregator.c b/src/exchange/test_taler_exchange_aggregator.c
index 22bc8e277..f34dea837 100644
--- a/src/exchange/test_taler_exchange_aggregator.c
+++ b/src/exchange/test_taler_exchange_aggregator.c
@@ -668,25 +668,19 @@ run_test ()
.label = "run-aggregator-deposit-1"
},
- /* The above step is already known to fail (with an error message)
- right now, so we skip the rest of the test. */
- {
- .opcode = OPCODE_TERMINATE_SKIP,
- .label = "testcase-incomplete-terminating-with-skip"
- },
-
-
{
.opcode = OPCODE_EXPECT_TRANSACTION,
.label = "expect-deposit-1",
- .details.expect_transaction.debit_account = 1,
+ .details.expect_transaction.debit_account = 3,
.details.expect_transaction.credit_account = 4,
.details.expect_transaction.amount = "EUR:1"
},
+
{
.opcode = OPCODE_EXPECT_TRANSACTIONS_EMPTY,
.label = "expect-empty-transactions-on-start"
},
+
/* test idempotency: run again on transactions already done */
{
.opcode = OPCODE_DATABASE_DEPOSIT,
@@ -704,6 +698,11 @@ run_test ()
},
{
+ .opcode = OPCODE_TERMINATE_SUCCESS,
+ .label = "testcase-incomplete-terminating-with-skip"
+ },
+
+ {
.opcode = OPCODE_TERMINATE_SKIP,
.label = "testcase-incomplete-terminating-with-skip"
},
@@ -804,7 +803,6 @@ handle_mhd_request (void *cls,
GNUNET_break_op (0);
return MHD_NO;
}
- /* FIXME: to be implemented! */
pr = GNUNET_JSON_post_parser (REQUEST_BUFFER_MAX,
con_cls,
upload_data,
@@ -848,6 +846,10 @@ handle_mhd_request (void *cls,
transactions_tail,
t);
}
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Receiving incoming wire transfer: %llu->%llu\n",
+ (unsigned long long) t->debit_account,
+ (unsigned long long) t->credit_account);
json_decref (json);
resp = MHD_create_response_from_buffer (0, "", MHD_RESPMEM_PERSISTENT);
ret = MHD_queue_response (connection,