summaryrefslogtreecommitdiff
path: root/src/mint/taler-mint-aggregator.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2016-01-28 03:58:21 +0100
committerChristian Grothoff <christian@grothoff.org>2016-01-28 03:58:21 +0100
commit46d9cc367bdc9bf8cda7ae12e78ea0a2e0853d36 (patch)
treeff13f79a4c570c1fe63050665e3c3398dc6d9a11 /src/mint/taler-mint-aggregator.c
parent4506b4878ffb4f827cd0be0ab7587ed3654edd0d (diff)
downloadexchange-46d9cc367bdc9bf8cda7ae12e78ea0a2e0853d36.tar.gz
exchange-46d9cc367bdc9bf8cda7ae12e78ea0a2e0853d36.tar.bz2
exchange-46d9cc367bdc9bf8cda7ae12e78ea0a2e0853d36.zip
finishing core logic for #4141, but untested
Diffstat (limited to 'src/mint/taler-mint-aggregator.c')
-rw-r--r--src/mint/taler-mint-aggregator.c661
1 files changed, 633 insertions, 28 deletions
diff --git a/src/mint/taler-mint-aggregator.c b/src/mint/taler-mint-aggregator.c
index ee0f6ab22..5e05c8673 100644
--- a/src/mint/taler-mint-aggregator.c
+++ b/src/mint/taler-mint-aggregator.c
@@ -18,6 +18,10 @@
* @file taler-mint-aggregator.c
* @brief Process that aggregates outgoing transactions and executes them
* @author Christian Grothoff
+ *
+ * TODO:
+ * - simplify global_ret: make it a global!
+ * - handle shutdown more nicely (call 'cancel' method on wire transfers)
*/
#include "platform.h"
#include <gnunet/gnunet_util_lib.h>
@@ -62,6 +66,16 @@ static struct TALER_WIRE_Plugin *wire_plugin;
*/
static struct GNUNET_SCHEDULER_Task *task;
+/**
+ * Limit on the number of transactions we aggregate at once. Note
+ * that the limit must be big enough to ensure that when transactions
+ * of the smallest possible unit are aggregated, they do surpass the
+ * "tiny" threshold beyond which we never trigger a wire transaction!
+ *
+ * TODO: make configurable (via config file or command line option)
+ */
+static unsigned int aggregation_limit = 10000;
+
/**
* Load configuration parameters for the mint
@@ -144,10 +158,85 @@ mint_serve_process_config (const char *mint_directory)
/**
+ * Information about one aggregation process to
+ * be executed.
+ */
+struct AggregationUnit
+{
+ /**
+ * Public key of the merchant.
+ */
+ struct TALER_MerchantPublicKeyP merchant_pub;
+
+ /**
+ * Total amount to be transferred.
+ */
+ struct TALER_Amount total_amount;
+
+ /**
+ * Hash of @e wire.
+ */
+ struct GNUNET_HashCode h_wire;
+
+ /**
+ * Wire transfer identifier we use.
+ */
+ struct TALER_WireTransferIdentifierRawP wtid;
+
+ /**
+ * Row ID of the transaction that started it all.
+ */
+ unsigned long long row_id;
+
+ /**
+ * The current time.
+ */
+ struct GNUNET_TIME_Absolute execution_time;
+
+ /**
+ * Wire details of the merchant.
+ */
+ json_t *wire;
+
+ /**
+ * Database session for all of our transactions.
+ */
+ struct TALER_MINTDB_Session *session;
+
+ /**
+ * Wire preparation handle.
+ */
+ struct TALER_WIRE_PrepareHandle *ph;
+
+ /**
+ * Array of #aggregation_limit row_ids from the
+ * aggregation.
+ */
+ unsigned long long *additional_rows;
+
+ /**
+ * Pointer to global return value. Closure for #run().
+ */
+ int *global_ret;
+
+ /**
+ * Offset specifying how many #additional_rows are in use.
+ */
+ unsigned int rows_offset;
+
+ /**
+ * Set to #GNUNET_YES if we have to abort due to failure.
+ */
+ int failed;
+
+};
+
+
+/**
* Function called with details about deposits that have been made,
* with the goal of executing the corresponding wire transaction.
*
- * @param cls closure
+ * @param cls closure with the `struct AggregationUnit`
* @param row_id identifies database entry
* @param merchant_pub public key of the merchant
* @param coin_pub public key of the coin
@@ -172,20 +261,187 @@ deposit_cb (void *cls,
struct GNUNET_TIME_Absolute wire_deadline,
const json_t *wire)
{
- /* FIXME: compute aggregates, etc. */
+ struct AggregationUnit *au = cls;
+
+ au->merchant_pub = *merchant_pub;
+ if (GNUNET_OK !=
+ TALER_amount_subtract (&au->total_amount,
+ amount_with_fee,
+ deposit_fee))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Fatally malformed record at %llu\n",
+ row_id);
+ return GNUNET_SYSERR;
+ }
+ au->row_id = row_id;
+ au->wire = (json_t *) wire;
+ au->execution_time = GNUNET_TIME_absolute_get ();
+ TALER_hash_json (au->wire,
+ &au->h_wire);
+ json_incref (au->wire);
+ GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
+ &au->wtid,
+ sizeof (au->wtid));
+ if (GNUNET_OK !=
+ db_plugin->insert_aggregation_tracking (db_plugin->cls,
+ au->session,
+ &au->wtid,
+ merchant_pub,
+ &au->h_wire,
+ h_contract,
+ transaction_id,
+ au->execution_time,
+ coin_pub,
+ amount_with_fee,
+ deposit_fee))
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
+ if (GNUNET_OK !=
+ db_plugin->mark_deposit_done (db_plugin->cls,
+ au->session,
+ row_id))
+ {
+ GNUNET_break (0);
+ au->failed = GNUNET_YES;
+ return GNUNET_SYSERR;
+ }
+ return GNUNET_OK;
+}
+
+
+
+/**
+ * Function called with details about another deposit we
+ * can aggregate into an existing aggregation unit.
+ *
+ * @param cls closure with the `struct AggregationUnit`
+ * @param row_id identifies database entry
+ * @param merchant_pub public key of the merchant
+ * @param coin_pub public key of the coin
+ * @param amount_with_fee amount that was deposited including fee
+ * @param deposit_fee amount the mint gets to keep as transaction fees
+ * @param transaction_id unique transaction ID chosen by the merchant
+ * @param h_contract hash of the contract between merchant and customer
+ * @param wire_deadline by which the merchant adviced that he would like the
+ * wire transfer to be executed
+ * @param wire wire details for the merchant
+ * @return #GNUNET_OK to continue to iterate, #GNUNET_SYSERR to stop
+ */
+static int
+aggregate_cb (void *cls,
+ unsigned long long row_id,
+ const struct TALER_MerchantPublicKeyP *merchant_pub,
+ const struct TALER_CoinSpendPublicKeyP *coin_pub,
+ const struct TALER_Amount *amount_with_fee,
+ const struct TALER_Amount *deposit_fee,
+ uint64_t transaction_id,
+ const struct GNUNET_HashCode *h_contract,
+ struct GNUNET_TIME_Absolute wire_deadline,
+ const json_t *wire)
+{
+ struct AggregationUnit *au = cls;
+ struct TALER_Amount delta;
+
+ GNUNET_break (0 ==
+ memcmp (&au->merchant_pub,
+ merchant_pub,
+ sizeof (struct TALER_MerchantPublicKeyP)));
+ /* compute contribution of this coin after fees */
+ if (GNUNET_OK !=
+ TALER_amount_subtract (&delta,
+ amount_with_fee,
+ deposit_fee))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Fatally malformed record at %llu\n",
+ row_id);
+ return GNUNET_SYSERR;
+ }
+ /* add to total */
+ if (GNUNET_OK !=
+ TALER_amount_add (&au->total_amount,
+ &au->total_amount,
+ &delta))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Overflow or currency incompatibility during aggregation at %llu\n",
+ row_id);
+ /* Skip this one, but keep going! */
+ return GNUNET_OK;
+ }
+ if (au->rows_offset >= aggregation_limit)
+ {
+ /* Bug: we asked for at most #aggregation_limit results! */
+ GNUNET_break (0);
+ /* Skip this one, but keep going. */
+ return GNUNET_OK;
+ }
+ if (NULL == au->additional_rows)
+ au->additional_rows = GNUNET_new_array (aggregation_limit,
+ unsigned long long);
+ /* "append" to our list of rows */
+ au->additional_rows[au->rows_offset++] = row_id;
+ /* insert into aggregation tracking table */
+ if (GNUNET_OK !=
+ db_plugin->insert_aggregation_tracking (db_plugin->cls,
+ au->session,
+ &au->wtid,
+ merchant_pub,
+ &au->h_wire,
+ h_contract,
+ transaction_id,
+ au->execution_time,
+ coin_pub,
+ amount_with_fee,
+ deposit_fee))
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
+ if (GNUNET_OK !=
+ db_plugin->mark_deposit_done (db_plugin->cls,
+ au->session,
+ row_id))
+ {
+ GNUNET_break (0);
+ au->failed = GNUNET_YES;
+ return GNUNET_SYSERR;
+ }
return GNUNET_OK;
}
/**
- * Main work function that queries the DB and executes transactions.
+ * Function to be called with the prepared transfer data.
+ *
+ * @param cls closure with the `struct AggregationUnit`
+ * @param buf transaction data to persist, NULL on error
+ * @param buf_size number of bytes in @a buf, 0 on error
+ */
+static void
+prepare_cb (void *cls,
+ const char *buf,
+ size_t buf_size);
+
+
+/**
+ * Main work function that queries the DB and aggregates transactions
+ * into larger wire transfers.
+ *
+ * @param cls pointer to an `int` which we will return from main()
+ * @param tc scheduler context
*/
static void
-run (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
+run_aggregation (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
{
int *global_ret = cls;
struct TALER_MINTDB_Session *session;
+ struct AggregationUnit *au;
+ unsigned int i;
int ret;
if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
@@ -207,50 +463,399 @@ run (void *cls,
*global_ret = GNUNET_SYSERR;
return;
}
+ au = GNUNET_new (struct AggregationUnit);
+ au->session = session;
ret = db_plugin->get_ready_deposit (db_plugin->cls,
session,
&deposit_cb,
- NULL);
- // FIXME: handle 0 == ret...
-
+ au);
if (GNUNET_OK != ret)
{
+ GNUNET_free (au);
+ db_plugin->rollback (db_plugin->cls,
+ session);
+ if (0 != ret)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to execute deposit iteration!\n");
+ *global_ret = GNUNET_SYSERR;
+ return;
+ }
+ /* nothing to do, sleep for a minute and try again */
+ task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
+ &run_aggregation,
+ global_ret);
+ return;
+ }
+ /* Now try to find other deposits to aggregate */
+ ret = db_plugin->iterate_matching_deposits (db_plugin->cls,
+ session,
+ &au->h_wire,
+ &au->merchant_pub,
+ &aggregate_cb,
+ au,
+ aggregation_limit);
+ if ( (GNUNET_SYSERR == ret) ||
+ (GNUNET_YES == au->failed) )
+ {
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to execute deposit iteration!\n");
+ GNUNET_free_non_null (au->additional_rows);
+ GNUNET_free (au);
+ db_plugin->rollback (db_plugin->cls,
+ session);
*global_ret = GNUNET_SYSERR;
+ return;
+ }
+ /* Round to the unit supported by the wire transfer method */
+ GNUNET_assert (GNUNET_SYSERR !=
+ wire_plugin->amount_round (wire_plugin->cls,
+ &au->total_amount));
+ /* Check if after rounding down, we still have an amount to transfer */
+ if ( (0 == au->total_amount.value) &&
+ (0 == au->total_amount.fraction) )
+ {
+ /* Rollback ongoing transaction, as we will not use the respective
+ WTID and thus need to remove the tracking data */
+ db_plugin->rollback (db_plugin->cls,
+ session);
+ /* Start another transaction to mark all* of the selected deposits
+ *as minor! */
+ if (GNUNET_OK !=
+ db_plugin->start (db_plugin->cls,
+ session))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to start database transaction!\n");
+ *global_ret = GNUNET_SYSERR;
+ GNUNET_free_non_null (au->additional_rows);
+ GNUNET_free (au);
+ return;
+ }
+ /* Mark transactions by row_id as minor */
+ ret = GNUNET_OK;
+ if (GNUNET_OK !=
+ db_plugin->mark_deposit_tiny (db_plugin->cls,
+ session,
+ au->row_id))
+ ret = GNUNET_SYSERR;
+ else
+ for (i=0;i<au->rows_offset;i++)
+ if (GNUNET_OK !=
+ db_plugin->mark_deposit_tiny (db_plugin->cls,
+ session,
+ au->additional_rows[i]))
+ ret = GNUNET_SYSERR;
+ /* commit */
+ if (GNUNET_OK !=
+ db_plugin->commit (db_plugin->cls,
+ session))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Failed to commit database transaction!\n");
+ }
+ GNUNET_free_non_null (au->additional_rows);
+ GNUNET_free (au);
+ /* start again */
+ task = GNUNET_SCHEDULER_add_now (&run_aggregation,
+ global_ret);
+ return;
+ }
+ au->global_ret = global_ret;
+ au->ph = wire_plugin->prepare_wire_transfer (wire_plugin->cls,
+ au->wire,
+ &au->total_amount,
+ &au->wtid,
+ &prepare_cb,
+ au);
+ /* FIXME: currently we have no clean-up plan on
+ shutdown to call prepare_wire_transfer_cancel!
+ Maybe make 'au' global? */
+ if (NULL == au->ph)
+ {
+ GNUNET_break (0); /* why? how to best recover? */
db_plugin->rollback (db_plugin->cls,
session);
+ GNUNET_free_non_null (au->additional_rows);
+ GNUNET_free (au);
+ /* start again */
+ task = GNUNET_SCHEDULER_add_now (&run_aggregation,
+ global_ret);
return;
}
- /* FIXME: finish aggregate computation */
- /* wire_plugin->prepare_wire_transfer () -- ASYNC! */
- /* db_plugin->wire_prepare_data_insert () -- transactional! */
- /* db_plugin->XXX () -- mark transactions selected for aggregate as finished */
+ /* otherwise we continue with #prepare_cb(), see below */
+}
+
- /* then finally: commit! */
+/**
+ * Execute the wire transfers that we have committed to
+ * do.
+ *
+ * @param cls pointer to an `int` which we will return from main()
+ * @param tc scheduler context
+ */
+static void
+run_transfers (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
+/**
+ * Function to be called with the prepared transfer data.
+ *
+ * @param cls closure with the `struct AggregationUnit`
+ * @param buf transaction data to persist, NULL on error
+ * @param buf_size number of bytes in @a buf, 0 on error
+ */
+static void
+prepare_cb (void *cls,
+ const char *buf,
+ size_t buf_size)
+{
+ struct AggregationUnit *au = cls;
+ int *global_ret = au->global_ret;
+ struct TALER_MINTDB_Session *session = au->session;
+
+ GNUNET_free_non_null (au->additional_rows);
+ GNUNET_free (au);
+ if (NULL == buf)
+ {
+ GNUNET_break (0); /* why? how to best recover? */
+ db_plugin->rollback (db_plugin->cls,
+ session);
+ /* start again */
+ task = GNUNET_SCHEDULER_add_now (&run_aggregation,
+ global_ret);
+ return;
+ }
+
+ /* Commit our intention to execute the wire transfer! */
+ if (GNUNET_OK !=
+ db_plugin->wire_prepare_data_insert (db_plugin->cls,
+ session,
+ mint_wireformat,
+ buf,
+ buf_size))
+ {
+ GNUNET_break (0); /* why? how to best recover? */
+ db_plugin->rollback (db_plugin->cls,
+ session);
+ /* start again */
+ task = GNUNET_SCHEDULER_add_now (&run_aggregation,
+ global_ret);
+ return;
+ }
+
+ /* Now we can finally commit the overall transaction, as we are
+ again consistent if all of this passes. */
if (GNUNET_OK !=
db_plugin->commit (db_plugin->cls,
session))
{
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Failed to commit database transaction!\n");
+ /* try again */
+ task = GNUNET_SCHEDULER_add_now (&run_aggregation,
+ global_ret);
+ return;
}
- /* While possible, run 2nd type of transaction:
- db_plugin->start()
- - select pre-commit data from DB:
- db_plugin->wire_prepare_data_iterate ()
- - execute wire transfer (successfully!)
- wire_plugin->execute_wire_transfer() # ASYNC!
- db_plugin->wire_prepare_data_mark_finished ()
- db_plugin->insert_aggregation_tracking ()
- db_plugin->commit()
- */
+ /* run alternative task: actually do wire transfer! */
+ task = GNUNET_SCHEDULER_add_now (&run_transfers,
+ &global_ret);
+}
+
+
+/**
+ * Data we keep to #run_transfers().
+ */
+struct WirePrepareData
+{
+
+ /**
+ * Database session for all of our transactions.
+ */
+ struct TALER_MINTDB_Session *session;
+
+ /**
+ * Wire execution handle.
+ */
+ struct TALER_WIRE_ExecuteHandle *eh;
+
+ /**
+ * Pointer to global return value. Closure for #run().
+ */
+ int *global_ret;
+
+
+ /**
+ * Row ID of the transfer.
+ */
+ unsigned long long row_id;
+};
- task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS /* FIXME: adjust! */,
- &run,
- global_ret);
+
+/**
+ * Function called with the result from the execute step.
+ *
+ * @param cls closure with the `struct WirePrepareData`
+ * @param success #GNUNET_OK on success, #GNUNET_SYSERR on failure
+ * @param emsg NULL on success, otherwise an error message
+ */
+static void
+wire_confirm_cb (void *cls,
+ int success,
+ const char *emsg)
+{
+ struct WirePrepareData *wpd = cls;
+ int *global_ret = wpd->global_ret;
+ struct TALER_MINTDB_Session *session = wpd->session;
+
+ wpd->eh = NULL;
+ if (GNUNET_SYSERR == success)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Wire transaction failed: %s\n",
+ emsg);
+ db_plugin->rollback (db_plugin->cls,
+ session);
+ *global_ret = GNUNET_SYSERR;
+ GNUNET_free (wpd);
+ return;
+ }
+ if (GNUNET_OK !=
+ db_plugin->wire_prepare_data_mark_finished (db_plugin->cls,
+ session,
+ wpd->row_id))
+ {
+ GNUNET_break (0); /* why!? */
+ db_plugin->rollback (db_plugin->cls,
+ session);
+ *global_ret = GNUNET_SYSERR;
+ GNUNET_free (wpd);
+ return;
+ }
+ GNUNET_free (wpd);
+ if (GNUNET_OK !=
+ db_plugin->commit (db_plugin->cls,
+ session))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Failed to commit database transaction!\n");
+ /* try again */
+ task = GNUNET_SCHEDULER_add_now (&run_aggregation,
+ global_ret);
+ return;
+ }
+ /* continue with #run_transfers(), just to guard
+ against the unlikely case that there are more. */
+ task = GNUNET_SCHEDULER_add_now (&run_transfers,
+ &global_ret);
+
+}
+
+
+/**
+ * Callback with data about a prepared transaction.
+ *
+ * @param cls closure with the `struct WirePrepareData`
+ * @param rowid row identifier used to mark prepared transaction as done
+ * @param buf transaction data that was persisted, NULL on error
+ * @param buf_size number of bytes in @a buf, 0 on error
+ */
+static void
+wire_prepare_cb (void *cls,
+ unsigned long long rowid,
+ const char *buf,
+ size_t buf_size)
+{
+ struct WirePrepareData *wpd = cls;
+ int *global_ret = wpd->global_ret;
+
+ wpd->row_id = rowid;
+ wpd->eh = wire_plugin->execute_wire_transfer (wire_plugin->cls,
+ buf,
+ buf_size,
+ &wire_confirm_cb,
+ wpd);
+ /* FIXME: currently we have no clean-up plan on
+ shutdown to call execute_wire_transfer_cancel!
+ Maybe make 'wpd' global? */
+ if (NULL == wpd->eh)
+ {
+ GNUNET_break (0); /* why? how to best recover? */
+ db_plugin->rollback (db_plugin->cls,
+ wpd->session);
+ *global_ret = GNUNET_SYSERR;
+ GNUNET_free (wpd);
+ return;
+ }
+}
+
+
+/**
+ * Execute the wire transfers that we have committed to
+ * do.
+ *
+ * @param cls pointer to an `int` which we will return from main()
+ * @param tc scheduler context
+ */
+static void
+run_transfers (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ int *global_ret = cls;
+ int ret;
+ struct WirePrepareData *wpd;
+ struct TALER_MINTDB_Session *session;
+
+ if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
+ return;
+ if (NULL == (session = db_plugin->get_session (db_plugin->cls,
+ GNUNET_NO)))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to obtain database session!\n");
+ *global_ret = GNUNET_SYSERR;
+ return;
+ }
+ if (GNUNET_OK !=
+ db_plugin->start (db_plugin->cls,
+ session))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to start database transaction!\n");
+ *global_ret = GNUNET_SYSERR;
+ return;
+ }
+ wpd = GNUNET_new (struct WirePrepareData);
+ wpd->session = session;
+ wpd->global_ret = global_ret;
+ ret = db_plugin->wire_prepare_data_get (db_plugin->cls,
+ session,
+ mint_wireformat,
+ &wire_prepare_cb,
+ wpd);
+ if (GNUNET_SYSERR == ret)
+ {
+ GNUNET_break (0); /* why? how to best recover? */
+ db_plugin->rollback (db_plugin->cls,
+ session);
+ *global_ret = GNUNET_SYSERR;
+ GNUNET_free (wpd);
+ return;
+ }
+ if (GNUNET_NO == ret)
+ {
+ /* no more prepared wire transfers, go back to aggregation! */
+ db_plugin->rollback (db_plugin->cls,
+ session);
+ task = GNUNET_SCHEDULER_add_now (&run_aggregation,
+ global_ret);
+ GNUNET_free (wpd);
+ return;
+ }
+ /* otherwise, continues in #wire_prepare_cb() */
}
@@ -299,7 +904,7 @@ main (int argc,
return 1;
}
- GNUNET_SCHEDULER_run (&run, &ret);
+ GNUNET_SCHEDULER_run (&run_transfers, &ret);
TALER_MINTDB_plugin_unload (db_plugin);
TALER_WIRE_plugin_unload (wire_plugin);