summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2016-01-28 03:58:21 +0100
committerChristian Grothoff <christian@grothoff.org>2016-01-28 03:58:21 +0100
commit46d9cc367bdc9bf8cda7ae12e78ea0a2e0853d36 (patch)
treeff13f79a4c570c1fe63050665e3c3398dc6d9a11
parent4506b4878ffb4f827cd0be0ab7587ed3654edd0d (diff)
downloadexchange-46d9cc367bdc9bf8cda7ae12e78ea0a2e0853d36.tar.gz
exchange-46d9cc367bdc9bf8cda7ae12e78ea0a2e0853d36.tar.bz2
exchange-46d9cc367bdc9bf8cda7ae12e78ea0a2e0853d36.zip
finishing core logic for #4141, but untested
-rw-r--r--src/include/taler_mint_service.h5
-rw-r--r--src/include/taler_mintdb_plugin.h6
-rw-r--r--src/include/taler_signatures.h14
-rw-r--r--src/mint-lib/mint_api_deposit_wtid.c9
-rw-r--r--src/mint/taler-mint-aggregator.c661
-rw-r--r--src/mint/taler-mint-httpd_db.c3
-rw-r--r--src/mint/taler-mint-httpd_responses.c5
-rw-r--r--src/mint/taler-mint-httpd_responses.h2
-rw-r--r--src/mintdb/plugin_mintdb_postgres.c25
9 files changed, 640 insertions, 90 deletions
diff --git a/src/include/taler_mint_service.h b/src/include/taler_mint_service.h
index b151cb00..1502edfb 100644
--- a/src/include/taler_mint_service.h
+++ b/src/include/taler_mint_service.h
@@ -1173,8 +1173,6 @@ struct TALER_MINT_DepositWtidHandle;
* yet execute the transaction
* @param execution_time actual or planned execution time for the wire transfer
* @param coin_contribution contribution to the @a total_amount of the deposited coin (may be NULL)
- * @param total_amount total amount of the wire transfer, or NULL if the mint could
- * not provide any @a wtid (set only if @a http_status is #MHD_HTTP_OK)
*/
typedef void
(*TALER_MINT_DepositWtidCallback)(void *cls,
@@ -1182,8 +1180,7 @@ typedef void
json_t *json,
const struct TALER_WireTransferIdentifierRawP *wtid,
struct GNUNET_TIME_Absolute execution_time,
- const struct TALER_Amount *coin_contribution,
- const struct TALER_Amount *total_amount);
+ const struct TALER_Amount *coin_contribution);
/**
diff --git a/src/include/taler_mintdb_plugin.h b/src/include/taler_mintdb_plugin.h
index 7c48114b..d2cc3d76 100644
--- a/src/include/taler_mintdb_plugin.h
+++ b/src/include/taler_mintdb_plugin.h
@@ -583,7 +583,6 @@ typedef void
* @param coin_contribution how much did the coin we asked about
* contribute to the total transfer value? (deposit value including fee)
* @param coin_fee how much did the mint charge for the deposit fee
- * @param total_amount how much was the total wire transfer?
* @param execution_time when was the transaction done, or
* when we expect it to be done (if @a wtid was NULL)
*/
@@ -592,7 +591,6 @@ typedef void
const struct TALER_WireTransferIdentifierRawP *wtid,
const struct TALER_Amount *coin_contribution,
const struct TALER_Amount *coin_fee,
- const struct TALER_Amount *total_amount,
struct GNUNET_TIME_Absolute execution_time);
@@ -1360,7 +1358,6 @@ struct TALER_MINTDB_Plugin
* @param coin_pub which public key was this payment about
* @param coin_value amount contributed by this coin in total
* @param coin_fee deposit fee charged by mint for this coin
- * @param transfer_value total amount of the wire transfer
* @return #GNUNET_OK on success, #GNUNET_SYSERR on DB errors
*/
int
@@ -1374,8 +1371,7 @@ struct TALER_MINTDB_Plugin
struct GNUNET_TIME_Absolute execution_time,
const struct TALER_CoinSpendPublicKeyP *coin_pub,
const struct TALER_Amount *coin_value,
- const struct TALER_Amount *coin_fee,
- const struct TALER_Amount *transfer_value);
+ const struct TALER_Amount *coin_fee);
/**
diff --git a/src/include/taler_signatures.h b/src/include/taler_signatures.h
index 85c681da..2526597e 100644
--- a/src/include/taler_signatures.h
+++ b/src/include/taler_signatures.h
@@ -953,20 +953,6 @@ struct TALER_ConfirmWirePS
*/
struct TALER_AmountNBO coin_contribution;
- /**
- * The total amount the mint transferred in the transaction.
- * Note that we may be aggregating multiple coin's @e coin_contribution
- * values into a single wire transfer, so this value may be larger
- * than that of @e coin_contribution. It may also be smaller, as
- * @e coin_contribution may be say "1.123456" but the wire unit may
- * be rounded down, i.e. to "1.12" (depending on the transfer method).
- *
- * Note that the mint books the deltas from rounding down as profit,
- * so aggregating transfers is a good thing for the merchant (as it
- * reduces rounding down expenses).
- */
- struct TALER_AmountNBO total_amount;
-
};
GNUNET_NETWORK_STRUCT_END
diff --git a/src/mint-lib/mint_api_deposit_wtid.c b/src/mint-lib/mint_api_deposit_wtid.c
index 50f9c55d..d29f406e 100644
--- a/src/mint-lib/mint_api_deposit_wtid.c
+++ b/src/mint-lib/mint_api_deposit_wtid.c
@@ -148,9 +148,7 @@ handle_deposit_wtid_finished (void *cls,
const struct TALER_WireTransferIdentifierRawP *wtid = NULL;
struct GNUNET_TIME_Absolute execution_time = GNUNET_TIME_UNIT_FOREVER_ABS;
const struct TALER_Amount *coin_contribution = NULL;
- const struct TALER_Amount *total_amount = NULL;
struct TALER_Amount coin_contribution_s;
- struct TALER_Amount total_amount_s;
dwh->job = NULL;
json = MAC_download_get_result (&dwh->db,
@@ -166,7 +164,6 @@ handle_deposit_wtid_finished (void *cls,
MAJ_spec_fixed_auto ("wtid", &dwh->depconf.wtid),
MAJ_spec_absolute_time ("execution_time", &execution_time),
MAJ_spec_amount ("coin_contribution", &coin_contribution_s),
- MAJ_spec_amount ("total_amount", &total_amount_s),
MAJ_spec_end
};
@@ -183,9 +180,6 @@ handle_deposit_wtid_finished (void *cls,
TALER_amount_hton (&dwh->depconf.coin_contribution,
&coin_contribution_s);
coin_contribution = &coin_contribution_s;
- TALER_amount_hton (&dwh->depconf.total_amount,
- &total_amount_s);
- total_amount = &total_amount_s;
if (GNUNET_OK !=
verify_deposit_wtid_signature_ok (dwh,
json))
@@ -244,8 +238,7 @@ handle_deposit_wtid_finished (void *cls,
json,
wtid,
execution_time,
- coin_contribution,
- total_amount);
+ coin_contribution);
json_decref (json);
TALER_MINT_deposit_wtid_cancel (dwh);
}
diff --git a/src/mint/taler-mint-aggregator.c b/src/mint/taler-mint-aggregator.c
index ee0f6ab2..5e05c867 100644
--- a/src/mint/taler-mint-aggregator.c
+++ b/src/mint/taler-mint-aggregator.c
@@ -18,6 +18,10 @@
* @file taler-mint-aggregator.c
* @brief Process that aggregates outgoing transactions and executes them
* @author Christian Grothoff
+ *
+ * TODO:
+ * - simplify global_ret: make it a global!
+ * - handle shutdown more nicely (call 'cancel' method on wire transfers)
*/
#include "platform.h"
#include <gnunet/gnunet_util_lib.h>
@@ -62,6 +66,16 @@ static struct TALER_WIRE_Plugin *wire_plugin;
*/
static struct GNUNET_SCHEDULER_Task *task;
+/**
+ * Limit on the number of transactions we aggregate at once. Note
+ * that the limit must be big enough to ensure that when transactions
+ * of the smallest possible unit are aggregated, they do surpass the
+ * "tiny" threshold beyond which we never trigger a wire transaction!
+ *
+ * TODO: make configurable (via config file or command line option)
+ */
+static unsigned int aggregation_limit = 10000;
+
/**
* Load configuration parameters for the mint
@@ -144,10 +158,85 @@ mint_serve_process_config (const char *mint_directory)
/**
+ * Information about one aggregation process to
+ * be executed.
+ */
+struct AggregationUnit
+{
+ /**
+ * Public key of the merchant.
+ */
+ struct TALER_MerchantPublicKeyP merchant_pub;
+
+ /**
+ * Total amount to be transferred.
+ */
+ struct TALER_Amount total_amount;
+
+ /**
+ * Hash of @e wire.
+ */
+ struct GNUNET_HashCode h_wire;
+
+ /**
+ * Wire transfer identifier we use.
+ */
+ struct TALER_WireTransferIdentifierRawP wtid;
+
+ /**
+ * Row ID of the transaction that started it all.
+ */
+ unsigned long long row_id;
+
+ /**
+ * The current time.
+ */
+ struct GNUNET_TIME_Absolute execution_time;
+
+ /**
+ * Wire details of the merchant.
+ */
+ json_t *wire;
+
+ /**
+ * Database session for all of our transactions.
+ */
+ struct TALER_MINTDB_Session *session;
+
+ /**
+ * Wire preparation handle.
+ */
+ struct TALER_WIRE_PrepareHandle *ph;
+
+ /**
+ * Array of #aggregation_limit row_ids from the
+ * aggregation.
+ */
+ unsigned long long *additional_rows;
+
+ /**
+ * Pointer to global return value. Closure for #run().
+ */
+ int *global_ret;
+
+ /**
+ * Offset specifying how many #additional_rows are in use.
+ */
+ unsigned int rows_offset;
+
+ /**
+ * Set to #GNUNET_YES if we have to abort due to failure.
+ */
+ int failed;
+
+};
+
+
+/**
* Function called with details about deposits that have been made,
* with the goal of executing the corresponding wire transaction.
*
- * @param cls closure
+ * @param cls closure with the `struct AggregationUnit`
* @param row_id identifies database entry
* @param merchant_pub public key of the merchant
* @param coin_pub public key of the coin
@@ -172,20 +261,187 @@ deposit_cb (void *cls,
struct GNUNET_TIME_Absolute wire_deadline,
const json_t *wire)
{
- /* FIXME: compute aggregates, etc. */
+ struct AggregationUnit *au = cls;
+
+ au->merchant_pub = *merchant_pub;
+ if (GNUNET_OK !=
+ TALER_amount_subtract (&au->total_amount,
+ amount_with_fee,
+ deposit_fee))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Fatally malformed record at %llu\n",
+ row_id);
+ return GNUNET_SYSERR;
+ }
+ au->row_id = row_id;
+ au->wire = (json_t *) wire;
+ au->execution_time = GNUNET_TIME_absolute_get ();
+ TALER_hash_json (au->wire,
+ &au->h_wire);
+ json_incref (au->wire);
+ GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
+ &au->wtid,
+ sizeof (au->wtid));
+ if (GNUNET_OK !=
+ db_plugin->insert_aggregation_tracking (db_plugin->cls,
+ au->session,
+ &au->wtid,
+ merchant_pub,
+ &au->h_wire,
+ h_contract,
+ transaction_id,
+ au->execution_time,
+ coin_pub,
+ amount_with_fee,
+ deposit_fee))
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
+ if (GNUNET_OK !=
+ db_plugin->mark_deposit_done (db_plugin->cls,
+ au->session,
+ row_id))
+ {
+ GNUNET_break (0);
+ au->failed = GNUNET_YES;
+ return GNUNET_SYSERR;
+ }
+ return GNUNET_OK;
+}
+
+
+
+/**
+ * Function called with details about another deposit we
+ * can aggregate into an existing aggregation unit.
+ *
+ * @param cls closure with the `struct AggregationUnit`
+ * @param row_id identifies database entry
+ * @param merchant_pub public key of the merchant
+ * @param coin_pub public key of the coin
+ * @param amount_with_fee amount that was deposited including fee
+ * @param deposit_fee amount the mint gets to keep as transaction fees
+ * @param transaction_id unique transaction ID chosen by the merchant
+ * @param h_contract hash of the contract between merchant and customer
+ * @param wire_deadline by which the merchant adviced that he would like the
+ * wire transfer to be executed
+ * @param wire wire details for the merchant
+ * @return #GNUNET_OK to continue to iterate, #GNUNET_SYSERR to stop
+ */
+static int
+aggregate_cb (void *cls,
+ unsigned long long row_id,
+ const struct TALER_MerchantPublicKeyP *merchant_pub,
+ const struct TALER_CoinSpendPublicKeyP *coin_pub,
+ const struct TALER_Amount *amount_with_fee,
+ const struct TALER_Amount *deposit_fee,
+ uint64_t transaction_id,
+ const struct GNUNET_HashCode *h_contract,
+ struct GNUNET_TIME_Absolute wire_deadline,
+ const json_t *wire)
+{
+ struct AggregationUnit *au = cls;
+ struct TALER_Amount delta;
+
+ GNUNET_break (0 ==
+ memcmp (&au->merchant_pub,
+ merchant_pub,
+ sizeof (struct TALER_MerchantPublicKeyP)));
+ /* compute contribution of this coin after fees */
+ if (GNUNET_OK !=
+ TALER_amount_subtract (&delta,
+ amount_with_fee,
+ deposit_fee))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Fatally malformed record at %llu\n",
+ row_id);
+ return GNUNET_SYSERR;
+ }
+ /* add to total */
+ if (GNUNET_OK !=
+ TALER_amount_add (&au->total_amount,
+ &au->total_amount,
+ &delta))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Overflow or currency incompatibility during aggregation at %llu\n",
+ row_id);
+ /* Skip this one, but keep going! */
+ return GNUNET_OK;
+ }
+ if (au->rows_offset >= aggregation_limit)
+ {
+ /* Bug: we asked for at most #aggregation_limit results! */
+ GNUNET_break (0);
+ /* Skip this one, but keep going. */
+ return GNUNET_OK;
+ }
+ if (NULL == au->additional_rows)
+ au->additional_rows = GNUNET_new_array (aggregation_limit,
+ unsigned long long);
+ /* "append" to our list of rows */
+ au->additional_rows[au->rows_offset++] = row_id;
+ /* insert into aggregation tracking table */
+ if (GNUNET_OK !=
+ db_plugin->insert_aggregation_tracking (db_plugin->cls,
+ au->session,
+ &au->wtid,
+ merchant_pub,
+ &au->h_wire,
+ h_contract,
+ transaction_id,
+ au->execution_time,
+ coin_pub,
+ amount_with_fee,
+ deposit_fee))
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
+ if (GNUNET_OK !=
+ db_plugin->mark_deposit_done (db_plugin->cls,
+ au->session,
+ row_id))
+ {
+ GNUNET_break (0);
+ au->failed = GNUNET_YES;
+ return GNUNET_SYSERR;
+ }
return GNUNET_OK;
}
/**
- * Main work function that queries the DB and executes transactions.
+ * Function to be called with the prepared transfer data.
+ *
+ * @param cls closure with the `struct AggregationUnit`
+ * @param buf transaction data to persist, NULL on error
+ * @param buf_size number of bytes in @a buf, 0 on error
+ */
+static void
+prepare_cb (void *cls,
+ const char *buf,
+ size_t buf_size);
+
+
+/**
+ * Main work function that queries the DB and aggregates transactions
+ * into larger wire transfers.
+ *
+ * @param cls pointer to an `int` which we will return from main()
+ * @param tc scheduler context
*/
static void
-run (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
+run_aggregation (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
{
int *global_ret = cls;
struct TALER_MINTDB_Session *session;
+ struct AggregationUnit *au;
+ unsigned int i;
int ret;
if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
@@ -207,50 +463,399 @@ run (void *cls,
*global_ret = GNUNET_SYSERR;
return;
}
+ au = GNUNET_new (struct AggregationUnit);
+ au->session = session;
ret = db_plugin->get_ready_deposit (db_plugin->cls,
session,
&deposit_cb,
- NULL);
- // FIXME: handle 0 == ret...
-
+ au);
if (GNUNET_OK != ret)
{
+ GNUNET_free (au);
+ db_plugin->rollback (db_plugin->cls,
+ session);
+ if (0 != ret)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to execute deposit iteration!\n");
+ *global_ret = GNUNET_SYSERR;
+ return;
+ }
+ /* nothing to do, sleep for a minute and try again */
+ task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
+ &run_aggregation,
+ global_ret);
+ return;
+ }
+ /* Now try to find other deposits to aggregate */
+ ret = db_plugin->iterate_matching_deposits (db_plugin->cls,
+ session,
+ &au->h_wire,
+ &au->merchant_pub,
+ &aggregate_cb,
+ au,
+ aggregation_limit);
+ if ( (GNUNET_SYSERR == ret) ||
+ (GNUNET_YES == au->failed) )
+ {
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to execute deposit iteration!\n");
+ GNUNET_free_non_null (au->additional_rows);
+ GNUNET_free (au);
+ db_plugin->rollback (db_plugin->cls,
+ session);
*global_ret = GNUNET_SYSERR;
+ return;
+ }
+ /* Round to the unit supported by the wire transfer method */
+ GNUNET_assert (GNUNET_SYSERR !=
+ wire_plugin->amount_round (wire_plugin->cls,
+ &au->total_amount));
+ /* Check if after rounding down, we still have an amount to transfer */
+ if ( (0 == au->total_amount.value) &&
+ (0 == au->total_amount.fraction) )
+ {
+ /* Rollback ongoing transaction, as we will not use the respective
+ WTID and thus need to remove the tracking data */
+ db_plugin->rollback (db_plugin->cls,
+ session);
+ /* Start another transaction to mark all* of the selected deposits
+ *as minor! */
+ if (GNUNET_OK !=
+ db_plugin->start (db_plugin->cls,
+ session))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to start database transaction!\n");
+ *global_ret = GNUNET_SYSERR;
+ GNUNET_free_non_null (au->additional_rows);
+ GNUNET_free (au);
+ return;
+ }
+ /* Mark transactions by row_id as minor */
+ ret = GNUNET_OK;
+ if (GNUNET_OK !=
+ db_plugin->mark_deposit_tiny (db_plugin->cls,
+ session,
+ au->row_id))
+ ret = GNUNET_SYSERR;
+ else
+ for (i=0;i<au->rows_offset;i++)
+ if (GNUNET_OK !=
+ db_plugin->mark_deposit_tiny (db_plugin->cls,
+ session,
+ au->additional_rows[i]))
+ ret = GNUNET_SYSERR;
+ /* commit */
+ if (GNUNET_OK !=
+ db_plugin->commit (db_plugin->cls,
+ session))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Failed to commit database transaction!\n");
+ }
+ GNUNET_free_non_null (au->additional_rows);
+ GNUNET_free (au);
+ /* start again */
+ task = GNUNET_SCHEDULER_add_now (&run_aggregation,
+ global_ret);
+ return;
+ }
+ au->global_ret = global_ret;
+ au->ph = wire_plugin->prepare_wire_transfer (wire_plugin->cls,
+ au->wire,
+ &au->total_amount,
+ &au->wtid,
+ &prepare_cb,
+ au);
+ /* FIXME: currently we have no clean-up plan on
+ shutdown to call prepare_wire_transfer_cancel!
+ Maybe make 'au' global? */
+ if (NULL == au->ph)
+ {
+ GNUNET_break (0); /* why? how to best recover? */
db_plugin->rollback (db_plugin->cls,
session);
+ GNUNET_free_non_null (au->additional_rows);
+ GNUNET_free (au);
+ /* start again */
+ task = GNUNET_SCHEDULER_add_now (&run_aggregation,
+ global_ret);
return;
}
- /* FIXME: finish aggregate computation */
- /* wire_plugin->prepare_wire_transfer () -- ASYNC! */
- /* db_plugin->wire_prepare_data_insert () -- transactional! */
- /* db_plugin->XXX () -- mark transactions selected for aggregate as finished */
+ /* otherwise we continue with #prepare_cb(), see below */
+}
+
- /* then finally: commit! */
+/**
+ * Execute the wire transfers that we have committed to
+ * do.
+ *
+ * @param cls pointer to an `int` which we will return from main()
+ * @param tc scheduler context
+ */
+static void
+run_transfers (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
+/**
+ * Function to be called with the prepared transfer data.
+ *
+ * @param cls closure with the `struct AggregationUnit`
+ * @param buf transaction data to persist, NULL on error
+ * @param buf_size number of bytes in @a buf, 0 on error
+ */
+static void
+prepare_cb (void *cls,
+ const char *buf,
+ size_t buf_size)
+{
+ struct AggregationUnit *au = cls;
+ int *global_ret = au->global_ret;
+ struct TALER_MINTDB_Session *session = au->session;
+
+ GNUNET_free_non_null (au->additional_rows);
+ GNUNET_free (au);
+ if (NULL == buf)
+ {
+ GNUNET_break (0); /* why? how to best recover? */
+ db_plugin->rollback (db_plugin->cls,
+ session);
+ /* start again */
+ task = GNUNET_SCHEDULER_add_now (&run_aggregation,
+ global_ret);
+ return;
+ }
+
+ /* Commit our intention to execute the wire transfer! */
+ if (GNUNET_OK !=
+ db_plugin->wire_prepare_data_insert (db_plugin->cls,
+ session,
+ mint_wireformat,
+ buf,
+ buf_size))
+ {
+ GNUNET_break (0); /* why? how to best recover? */
+ db_plugin->rollback (db_plugin->cls,
+ session);
+ /* start again */
+ task = GNUNET_SCHEDULER_add_now (&run_aggregation,
+ global_ret);
+ return;
+ }
+
+ /* Now we can finally commit the overall transaction, as we are
+ again consistent if all of this passes. */
if (GNUNET_OK !=
db_plugin->commit (db_plugin->cls,
session))
{
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Failed to commit database transaction!\n");
+ /* try again */
+ task = GNUNET_SCHEDULER_add_now (&run_aggregation,
+ global_ret);
+ return;
}
- /* While possible, run 2nd type of transaction:
- db_plugin->start()
- - select pre-commit data from DB:
- db_plugin->wire_prepare_data_iterate ()
- - execute wire transfer (successfully!)
- wire_plugin->execute_wire_transfer() # ASYNC!
- db_plugin->wire_prepare_data_mark_finished ()
- db_plugin->insert_aggregation_tracking ()
- db_plugin->commit()
- */
+ /* run alternative task: actually do wire transfer! */
+ task = GNUNET_SCHEDULER_add_now (&run_transfers,
+ &global_ret);
+}
+
+
+/**
+ * Data we keep to #run_transfers().
+ */
+struct WirePrepareData
+{
+
+ /**
+ * Database session for all of our transactions.
+ */
+ struct TALER_MINTDB_Session *session;
+
+ /**
+ * Wire execution handle.
+ */
+ struct TALER_WIRE_ExecuteHandle *eh;
+
+ /**
+ * Pointer to global return value. Closure for #run().
+ */
+ int *global_ret;
+
+
+ /**
+ * Row ID of the transfer.
+ */
+ unsigned long long row_id;
+};
- task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS /* FIXME: adjust! */,
- &run,
- global_ret);
+
+/**
+ * Function called with the result from the execute step.
+ *
+ * @param cls closure with the `struct WirePrepareData`
+ * @param success #GNUNET_OK on success, #GNUNET_SYSERR on failure
+ * @param emsg NULL on success, otherwise an error message
+ */
+static void
+wire_confirm_cb (void *cls,
+ int success,
+ const char *emsg)
+{
+ struct WirePrepareData *wpd = cls;
+ int *global_ret = wpd->global_ret;
+ struct TALER_MINTDB_Session *session = wpd->session;
+
+ wpd->eh = NULL;
+ if (GNUNET_SYSERR == success)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Wire transaction failed: %s\n",
+ emsg);
+ db_plugin->rollback (db_plugin->cls,
+ session);
+ *global_ret = GNUNET_SYSERR;
+ GNUNET_free (wpd);
+ return;
+ }
+ if (GNUNET_OK !=
+ db_plugin->wire_prepare_data_mark_finished (db_plugin->cls,
+ session,
+ wpd->row_id))
+ {
+ GNUNET_break (0); /* why!? */
+ db_plugin->rollback (db_plugin->cls,
+ session);
+ *global_ret = GNUNET_SYSERR;
+ GNUNET_free (wpd);
+ return;
+ }
+ GNUNET_free (wpd);
+ if (GNUNET_OK !=
+ db_plugin->commit (db_plugin->cls,
+ session))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Failed to commit database transaction!\n");
+ /* try again */
+ task = GNUNET_SCHEDULER_add_now (&run_aggregation,
+ global_ret);
+ return;
+ }
+ /* continue with #run_transfers(), just to guard
+ against the unlikely case that there are more. */
+ task = GNUNET_SCHEDULER_add_now (&run_transfers,
+ &global_ret);
+
+}
+
+
+/**
+ * Callback with data about a prepared transaction.
+ *
+ * @param cls closure with the `struct WirePrepareData`
+ * @param rowid row identifier used to mark prepared transaction as done
+ * @param buf transaction data that was persisted, NULL on error
+ * @param buf_size number of bytes in @a buf, 0 on error
+ */
+static void
+wire_prepare_cb (void *cls,
+ unsigned long long rowid,
+ const char *buf,
+ size_t buf_size)
+{
+ struct WirePrepareData *wpd = cls;
+ int *global_ret = wpd->global_ret;
+
+ wpd->row_id = rowid;
+ wpd->eh = wire_plugin->execute_wire_transfer (wire_plugin->cls,
+ buf,
+ buf_size,
+ &wire_confirm_cb,
+ wpd);
+ /* FIXME: currently we have no clean-up plan on
+ shutdown to call execute_wire_transfer_cancel!
+ Maybe make 'wpd' global? */
+ if (NULL == wpd->eh)
+ {
+ GNUNET_break (0); /* why? how to best recover? */
+ db_plugin->rollback (db_plugin->cls,
+ wpd->session);
+ *global_ret = GNUNET_SYSERR;
+ GNUNET_free (wpd);
+ return;
+ }
+}
+
+
+/**
+ * Execute the wire transfers that we have committed to
+ * do.
+ *
+ * @param cls pointer to an `int` which we will return from main()
+ * @param tc scheduler context
+ */
+static void
+run_transfers (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ int *global_ret = cls;
+ int ret;
+ struct WirePrepareData *wpd;
+ struct TALER_MINTDB_Session *session;
+
+ if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
+ return;
+ if (NULL == (session = db_plugin->get_session (db_plugin->cls,
+ GNUNET_NO)))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to obtain database session!\n");
+ *global_ret = GNUNET_SYSERR;
+ return;
+ }
+ if (GNUNET_OK !=
+ db_plugin->start (db_plugin->cls,
+ session))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to start database transaction!\n");
+ *global_ret = GNUNET_SYSERR;
+ return;
+ }
+ wpd = GNUNET_new (struct WirePrepareData);
+ wpd->session = session;
+ wpd->global_ret = global_ret;
+ ret = db_plugin->wire_prepare_data_get (db_plugin->cls,
+ session,
+ mint_wireformat,
+ &wire_prepare_cb,
+ wpd);
+ if (GNUNET_SYSERR == ret)
+ {
+ GNUNET_break (0); /* why? how to best recover? */
+ db_plugin->rollback (db_plugin->cls,
+ session);
+ *global_ret = GNUNET_SYSERR;
+ GNUNET_free (wpd);
+ return;
+ }
+ if (GNUNET_NO == ret)
+ {
+ /* no more prepared wire transfers, go back to aggregation! */
+ db_plugin->rollback (db_plugin->cls,
+ session);
+ task = GNUNET_SCHEDULER_add_now (&run_aggregation,
+ global_ret);
+ GNUNET_free (wpd);
+ return;
+ }
+ /* otherwise, continues in #wire_prepare_cb() */
}
@@ -299,7 +904,7 @@ main (int argc,
return 1;
}
- GNUNET_SCHEDULER_run (&run, &ret);
+ GNUNET_SCHEDULER_run (&run_transfers, &ret);
TALER_MINTDB_plugin_unload (db_plugin);
TALER_WIRE_plugin_unload (wire_plugin);
diff --git a/src/mint/taler-mint-httpd_db.c b/src/mint/taler-mint-httpd_db.c
index c39cbbcf..b93ead3a 100644
--- a/src/mint/taler-mint-httpd_db.c
+++ b/src/mint/taler-mint-httpd_db.c
@@ -1804,7 +1804,6 @@ struct DepositWtidContext
* @param coin_contribution how much did the coin we asked about
* contribute to the total transfer value? (deposit value including fee)
* @param coin_fee how much did the mint charge for the deposit fee
- * @param total_amount how much was the total wire transfer?
* @param execution_time when was the transaction done, or
* when we expect it to be done (if @a wtid was NULL);
* #GNUNET_TIME_UNIT_FOREVER_ABS if the /deposit is unknown
@@ -1815,7 +1814,6 @@ handle_wtid_data (void *cls,
const struct TALER_WireTransferIdentifierRawP *wtid,
const struct TALER_Amount *coin_contribution,
const struct TALER_Amount *coin_fee,
- const struct TALER_Amount *total_amount,
struct GNUNET_TIME_Absolute execution_time)
{
struct DepositWtidContext *ctx = cls;
@@ -1843,7 +1841,6 @@ handle_wtid_data (void *cls,
&ctx->h_wire,
&ctx->coin_pub,
&coin_delta,
- total_amount,
ctx->transaction_id,
wtid,
execution_time);
diff --git a/src/mint/taler-mint-httpd_responses.c b/src/mint/taler-mint-httpd_responses.c
index 041f694b..2ebd0d33 100644
--- a/src/mint/taler-mint-httpd_responses.c
+++ b/src/mint/taler-mint-httpd_responses.c
@@ -1097,7 +1097,6 @@ TMH_RESPONSE_reply_deposit_pending (struct MHD_Connection *connection,
* @param coin_pub public key of the coin
* @param coin_contribution how much did the coin we asked about
* contribute to the total transfer value? (deposit value minus fee)
- * @param total_amount how much was the total wire transfer?
* @param transaction_id merchant transaction identifier
* @param wtid raw wire transfer identifier
* @param exec_time execution time of the wire transfer
@@ -1109,7 +1108,6 @@ TMH_RESPONSE_reply_deposit_wtid (struct MHD_Connection *connection,
const struct GNUNET_HashCode *h_wire,
const struct TALER_CoinSpendPublicKeyP *coin_pub,
const struct TALER_Amount *coin_contribution,
- const struct TALER_Amount *total_amount,
uint64_t transaction_id,
const struct TALER_WireTransferIdentifierRawP *wtid,
struct GNUNET_TIME_Absolute exec_time)
@@ -1128,8 +1126,6 @@ TMH_RESPONSE_reply_deposit_wtid (struct MHD_Connection *connection,
cw.execution_time = GNUNET_TIME_absolute_hton (exec_time);
TALER_amount_hton (&cw.coin_contribution,
coin_contribution);
- TALER_amount_hton (&cw.total_amount,
- total_amount);
TMH_KS_sign (&cw.purpose,
&pub,
&sig);
@@ -1140,7 +1136,6 @@ TMH_RESPONSE_reply_deposit_wtid (struct MHD_Connection *connection,
sizeof (*wtid)),
"execution_time", TALER_json_from_abs (exec_time),
"coin_contribution", TALER_json_from_amount (coin_contribution),
- "total_amount", TALER_json_from_amount (total_amount),
"mint_sig", TALER_json_from_data (&sig,
sizeof (sig)),
"mint_pub", TALER_json_from_data (&pub,
diff --git a/src/mint/taler-mint-httpd_responses.h b/src/mint/taler-mint-httpd_responses.h
index caad2904..a0396c8a 100644
--- a/src/mint/taler-mint-httpd_responses.h
+++ b/src/mint/taler-mint-httpd_responses.h
@@ -280,6 +280,7 @@ TMH_RESPONSE_reply_deposit_pending (struct MHD_Connection *connection,
* @param h_contract hash of the contract
* @param h_wire hash of wire account details
* @param coin_pub public key of the coin
+ * @param coin_contribution contribution of this coin to the total amount transferred
* @param transaction_id merchant transaction identifier
* @param wtid raw wire transfer identifier
* @param exec_time execution time of the wire transfer
@@ -291,7 +292,6 @@ TMH_RESPONSE_reply_deposit_wtid (struct MHD_Connection *connection,
const struct GNUNET_HashCode *h_wire,
const struct TALER_CoinSpendPublicKeyP *coin_pub,
const struct TALER_Amount *coin_contribution,
- const struct TALER_Amount *total_amount,
uint64_t transaction_id,
const struct TALER_WireTransferIdentifierRawP *wtid,
struct GNUNET_TIME_Absolute exec_time);
diff --git a/src/mintdb/plugin_mintdb_postgres.c b/src/mintdb/plugin_mintdb_postgres.c
index fc204f5e..2ab3e81a 100644
--- a/src/mintdb/plugin_mintdb_postgres.c
+++ b/src/mintdb/plugin_mintdb_postgres.c
@@ -466,9 +466,6 @@ postgres_create_tables (void *cls,
",coin_fee_val INT8 NOT NULL"
",coin_fee_frac INT4 NOT NULL"
",coin_fee_curr VARCHAR("TALER_CURRENCY_LEN_STR") NOT NULL"
- ",transfer_total_val INT8 NOT NULL"
- ",transfer_total_frac INT4 NOT NULL"
- ",transfer_total_curr VARCHAR("TALER_CURRENCY_LEN_STR") NOT NULL"
")");
/* Index for lookup_transactions statement on wtid */
SQLEXEC_INDEX("CREATE INDEX aggregation_tracking_wtid_index "
@@ -1090,9 +1087,6 @@ postgres_prepare (PGconn *db_conn)
",coin_fee_val"
",coin_fee_frac"
",coin_fee_curr"
- ",transfer_total_val"
- ",transfer_total_frac"
- ",transfer_total_curr"
" FROM aggregation_tracking"
" WHERE wtid_raw=$1",
1, NULL);
@@ -1108,9 +1102,6 @@ postgres_prepare (PGconn *db_conn)
",coin_fee_val"
",coin_fee_frac"
",coin_fee_curr"
- ",transfer_total_val"
- ",transfer_total_frac"
- ",transfer_total_curr"
" FROM aggregation_tracking"
" WHERE"
" coin_pub=$1 AND"
@@ -1136,12 +1127,9 @@ postgres_prepare (PGconn *db_conn)
",coin_fee_val"
",coin_fee_frac"
",coin_fee_curr"
- ",transfer_total_val"
- ",transfer_total_frac"
- ",transfer_total_curr"
") VALUES "
- "($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)",
- 16, NULL);
+ "($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)",
+ 13, NULL);
/* Used in #postgres_wire_prepare_data_insert() to store
@@ -3950,7 +3938,6 @@ postgres_wire_lookup_deposit_wtid (void *cls,
NULL,
&coin_amount,
&coin_fee,
- NULL,
exec_time);
PQclear (result);
return GNUNET_YES;
@@ -3967,13 +3954,11 @@ postgres_wire_lookup_deposit_wtid (void *cls,
struct GNUNET_TIME_Absolute exec_time;
struct TALER_Amount coin_amount;
struct TALER_Amount coin_fee;
- struct TALER_Amount transaction_amount;
struct TALER_PQ_ResultSpec rs[] = {
TALER_PQ_result_spec_auto_from_type ("wtid_raw", &wtid),
TALER_PQ_result_spec_absolute_time ("execution_time", &exec_time),
TALER_PQ_result_spec_amount ("coin_amount", &coin_amount),
TALER_PQ_result_spec_amount ("coin_fee", &coin_fee),
- TALER_PQ_result_spec_amount ("transfer_total", &transaction_amount),
TALER_PQ_result_spec_end
};
if (GNUNET_OK != TALER_PQ_extract_result (result, rs, 0))
@@ -3986,7 +3971,6 @@ postgres_wire_lookup_deposit_wtid (void *cls,
&wtid,
&coin_amount,
&coin_fee,
- &transaction_amount,
exec_time);
}
PQclear (result);
@@ -4007,7 +3991,6 @@ postgres_wire_lookup_deposit_wtid (void *cls,
* @param coin_pub which public key was this payment about
* @param coin_value amount contributed by this coin in total
* @param coin_fee deposit fee charged by mint for this coin
- * @param transfer_value total amount of the wire transfer
* @return #GNUNET_OK on success, #GNUNET_SYSERR on DB errors
*/
static int
@@ -4021,8 +4004,7 @@ postgres_insert_aggregation_tracking (void *cls,
struct GNUNET_TIME_Absolute execution_time,
const struct TALER_CoinSpendPublicKeyP *coin_pub,
const struct TALER_Amount *coin_value,
- const struct TALER_Amount *coin_fee,
- const struct TALER_Amount *transfer_value)
+ const struct TALER_Amount *coin_fee)
{
struct TALER_PQ_QueryParam params[] = {
TALER_PQ_query_param_auto_from_type (h_contract),
@@ -4034,7 +4016,6 @@ postgres_insert_aggregation_tracking (void *cls,
TALER_PQ_query_param_absolute_time (&execution_time),
TALER_PQ_query_param_amount (coin_value),
TALER_PQ_query_param_amount (coin_fee),
- TALER_PQ_query_param_amount (transfer_value),
TALER_PQ_query_param_end
};
PGresult *result;