summaryrefslogtreecommitdiff
path: root/src/exchange/taler-exchange-aggregator.c
diff options
context:
space:
mode:
authorChristian Grothoff <grothoff@gnunet.org>2022-03-24 17:33:29 +0100
committerChristian Grothoff <grothoff@gnunet.org>2022-03-24 17:33:29 +0100
commitb856d56d95f92eb9dedb0af49493350ea8ea2268 (patch)
tree3490ebf1e069fbe858a3f6cf97b18da8289840ae /src/exchange/taler-exchange-aggregator.c
parentc782dfe2aadfd06e47ed354c1fb389fecc715433 (diff)
downloadexchange-b856d56d95f92eb9dedb0af49493350ea8ea2268.tar.gz
exchange-b856d56d95f92eb9dedb0af49493350ea8ea2268.tar.bz2
exchange-b856d56d95f92eb9dedb0af49493350ea8ea2268.zip
rework deposits sharding, towards making aggregator faster (not necessarily done)
Diffstat (limited to 'src/exchange/taler-exchange-aggregator.c')
-rw-r--r--src/exchange/taler-exchange-aggregator.c83
1 files changed, 64 insertions, 19 deletions
diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c
index abab347fe..c34d47f9c 100644
--- a/src/exchange/taler-exchange-aggregator.c
+++ b/src/exchange/taler-exchange-aggregator.c
@@ -28,6 +28,18 @@
#include "taler_json_lib.h"
#include "taler_bank_service.h"
+struct AdditionalDeposit
+{
+ /**
+ * Public key of the coin.
+ */
+ struct TALER_CoinSpendPublicKeyP coin_pub;
+
+ /**
+ * Row of the deposit.
+ */
+ uint64_t row;
+};
/**
* Information about one aggregation process to be executed. There is
@@ -43,6 +55,11 @@ struct AggregationUnit
struct TALER_MerchantPublicKeyP merchant_pub;
/**
+ * Public key of the coin.
+ */
+ struct TALER_CoinSpendPublicKeyP coin_pub;
+
+ /**
* Total amount to be transferred, before subtraction of @e fees.wire and rounding down.
*/
struct TALER_Amount total_amount;
@@ -97,7 +114,8 @@ struct AggregationUnit
/**
* Array of row_ids from the aggregation.
*/
- uint64_t additional_rows[TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT];
+ struct AdditionalDeposit
+ additional_rows[TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT];
/**
* Offset specifying how many @e additional_rows are in use.
@@ -383,7 +401,8 @@ deposit_cb (void *cls,
enum GNUNET_DB_QueryStatus qs;
au->merchant_pub = *merchant_pub;
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ au->coin_pub = *coin_pub;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Aggregator processing payment %s with amount %s\n",
TALER_B2S (coin_pub),
TALER_amount2s (amount_with_fee));
@@ -405,7 +424,7 @@ deposit_cb (void *cls,
{
struct TALER_Amount ntotal;
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Non-refunded transaction, subtracting deposit fee %s\n",
TALER_amount2s (deposit_fee));
if (0 >
@@ -428,6 +447,9 @@ deposit_cb (void *cls,
au->total_amount = ntotal;
}
}
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Amount after fee is %s\n",
+ TALER_amount2s (&au->total_amount));
GNUNET_assert (NULL == au->payto_uri);
au->payto_uri = GNUNET_strdup (payto_uri);
@@ -437,7 +459,7 @@ deposit_cb (void *cls,
GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
&au->wtid,
sizeof (au->wtid));
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Starting aggregation under H(WTID)=%s, starting amount %s at %llu\n",
TALER_B2S (&au->wtid),
TALER_amount2s (amount_with_fee),
@@ -493,7 +515,7 @@ deposit_cb (void *cls,
"Aggregator marks deposit %llu as done\n",
(unsigned long long) row_id);
qs = db_plugin->mark_deposit_done (db_plugin->cls,
- merchant_pub,
+ coin_pub,
row_id);
if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)
{
@@ -528,6 +550,8 @@ aggregate_cb (void *cls,
struct TALER_Amount old;
enum GNUNET_DB_QueryStatus qs;
+ if (row_id == au->row_id)
+ return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
if (au->rows_offset >= TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT)
{
/* Bug: we asked for at most #TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT results! */
@@ -605,18 +629,29 @@ aggregate_cb (void *cls,
}
/* "append" to our list of rows */
- au->additional_rows[au->rows_offset++] = row_id;
+ au->additional_rows[au->rows_offset].coin_pub = *coin_pub;
+ au->additional_rows[au->rows_offset].row = row_id;
+ au->rows_offset++;
/* insert into aggregation tracking table */
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Adding %llu to aggregate %s\n",
+ (unsigned long long) row_id,
+ TALER_B2S (&au->wtid));
qs = db_plugin->insert_aggregation_tracking (db_plugin->cls,
&au->wtid,
row_id);
if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)
{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Failed to add %llu to aggregate %s: %d\n",
+ (unsigned long long) row_id,
+ TALER_B2S (&au->wtid),
+ qs);
GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
return qs;
}
qs = db_plugin->mark_deposit_done (db_plugin->cls,
- &au->merchant_pub,
+ coin_pub,
row_id);
if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)
{
@@ -775,7 +810,7 @@ run_aggregation (void *cls)
}
/* Now try to find other deposits to aggregate */
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Found ready deposit for %s, aggregating by target %llu\n",
TALER_B2S (&au_active.merchant_pub),
(unsigned long long) au_active.wire_target);
@@ -808,13 +843,17 @@ run_aggregation (void *cls)
s);
return;
}
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Found %d other deposits to combine into wire transfer.\n",
- qs);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Found %d other deposits to combine into wire transfer with fee %s.\n",
+ qs,
+ TALER_amount2s (&au_active.fees.wire));
/* Subtract wire transfer fee and round to the unit supported by the
wire transfer method; Check if after rounding down, we still have
an amount to transfer, and if not mark as 'tiny'. */
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Rounding aggregate of %s\n",
+ TALER_amount2s (&au_active.total_amount));
if ( (0 >=
TALER_amount_subtract (&au_active.final_amount,
&au_active.total_amount,
@@ -822,8 +861,7 @@ run_aggregation (void *cls)
(GNUNET_SYSERR ==
TALER_amount_round_down (&au_active.final_amount,
&currency_round_unit)) ||
- ( (0 == au_active.final_amount.value) &&
- (0 == au_active.final_amount.fraction) ) )
+ (TALER_amount_is_zero (&au_active.final_amount)) )
{
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Aggregate value too low for transfer (%d/%s)\n",
@@ -848,23 +886,29 @@ run_aggregation (void *cls)
return;
}
/* Mark transactions by row_id as minor */
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Marking %s (%llu) as tiny\n",
+ TALER_B2S (&au_active.coin_pub),
+ (unsigned long long) au_active.row_id);
qs = db_plugin->mark_deposit_tiny (db_plugin->cls,
- &au_active.merchant_pub,
+ &au_active.coin_pub,
au_active.row_id);
- if (0 <= qs)
+ if (0 < qs)
{
for (unsigned int i = 0; i<au_active.rows_offset; i++)
{
qs = db_plugin->mark_deposit_tiny (db_plugin->cls,
- &au_active.merchant_pub,
- au_active.additional_rows[i]);
- if (0 > qs)
+ &au_active.additional_rows[i].
+ coin_pub,
+ au_active.additional_rows[i].row);
+ if (0 >= qs)
break;
}
}
+ GNUNET_break (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != qs);
if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Serialization issue, trying again later!\n");
db_plugin->rollback (db_plugin->cls);
cleanup_au (&au_active);
@@ -876,6 +920,7 @@ run_aggregation (void *cls)
}
if (GNUNET_DB_STATUS_HARD_ERROR == qs)
{
+ GNUNET_break (0);
db_plugin->rollback (db_plugin->cls);
cleanup_au (&au_active);
global_ret = EXIT_FAILURE;