summaryrefslogtreecommitdiff
path: root/src/exchangedb
diff options
context:
space:
mode:
authorJoseph <Joseph.xu@efrei.net>2022-11-23 10:41:57 -0500
committerJoseph <Joseph.xu@efrei.net>2022-12-06 08:07:14 -0500
commitb6476ac881cfd3bde41c88b94f6a7538acf76f9c (patch)
tree08019207d8eb58f0c3d592c69121a3be6c40b5db /src/exchangedb
parent87198f124c989d014adc9a2bae5098cf80555d62 (diff)
downloadexchange-b6476ac881cfd3bde41c88b94f6a7538acf76f9c.tar.gz
exchange-b6476ac881cfd3bde41c88b94f6a7538acf76f9c.tar.bz2
exchange-b6476ac881cfd3bde41c88b94f6a7538acf76f9c.zip
batch modifications
Diffstat (limited to 'src/exchangedb')
-rw-r--r--src/exchangedb/pg_batch_reserves_in_insert.c157
-rw-r--r--src/exchangedb/test_exchangedb_by_j.c53
2 files changed, 174 insertions, 36 deletions
diff --git a/src/exchangedb/pg_batch_reserves_in_insert.c b/src/exchangedb/pg_batch_reserves_in_insert.c
index 455f080dd..f40641edd 100644
--- a/src/exchangedb/pg_batch_reserves_in_insert.c
+++ b/src/exchangedb/pg_batch_reserves_in_insert.c
@@ -16,7 +16,7 @@
/**
* @file exchangedb/pg_batch_reserves_in_insert.c
* @brief Implementation of the reserves_in_insert function for Postgres
- * @author JOSEPHxu
+ * @author Joseph XU
*/
#include "platform.h"
#include "taler_error_codes.h"
@@ -35,15 +35,12 @@
/**
- * Generate event notification for the reserve
- * change.
+ * Generate event notification for the reserve change.
*
- * @param pg plugin state
* @param reserve_pub reserve to notfiy on
*/
-static void
-notify_on_reserve (struct PostgresClosure *pg,
- const struct TALER_ReservePublicKeyP *reserve_pub)
+static char *
+compute_notify_on_reserve (const struct TALER_ReservePublicKeyP *reserve_pub)
{
struct TALER_ReserveEventP rep = {
.header.size = htons (sizeof (rep)),
@@ -51,12 +48,7 @@ notify_on_reserve (struct PostgresClosure *pg,
.reserve_pub = *reserve_pub
};
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Notifying on reserve!\n");
- TEH_PG_event_notify (pg,
- &rep.header,
- NULL,
- 0);
+ return GNUNET_PG_get_event_notify_channel (&rep.header);
}
@@ -75,8 +67,11 @@ TEH_PG_batch_reserves_in_insert (void *cls,
uint64_t reserve_uuid;
bool conflicted;
bool transaction_duplicate;
+ bool need_update = false;
struct GNUNET_TIME_Timestamp reserve_expiration
= GNUNET_TIME_relative_to_timestamp (pg->idle_reserve_expiration_time);
+ bool conflicts[reserves_length];
+ char *notify_s[reserves_length];
PREPARE (pg,
"reserve_create",
@@ -84,8 +79,8 @@ TEH_PG_batch_reserves_in_insert (void *cls,
"out_reserve_found AS conflicted"
",transaction_duplicate"
",ruuid AS reserve_uuid"
- " FROM exchange_do_batch_reserves_in"
- " ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11);");
+ " FROM batch_reserves_insert"
+ " ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12);");
expiry = GNUNET_TIME_absolute_to_timestamp (
GNUNET_TIME_absolute_add (reserves->execution_time.abs_time,
pg->idle_reserve_expiration_time));
@@ -98,6 +93,16 @@ TEH_PG_batch_reserves_in_insert (void *cls,
GNUNET_STRINGS_relative_time_to_string (
pg->idle_reserve_expiration_time,
GNUNET_NO));
+
+ {
+ if (GNUNET_OK !=
+ TEH_PG_start_read_committed(pg,
+ "READ_COMMITED"))
+ {
+ GNUNET_break (0);
+ return GNUNET_DB_STATUS_HARD_ERROR;
+ }
+ }
/* Optimistically assume this is a new reserve, create balance for the first
time; we do this before adding the actual transaction to "reserves_in",
as for a new reserve it can't be a duplicate 'add' operation, and as
@@ -105,27 +110,34 @@ TEH_PG_batch_reserves_in_insert (void *cls,
for (unsigned int i = 0; i<reserves_length; i++)
{
const struct TALER_EXCHANGEDB_ReserveInInfo *reserve = &reserves[i];
+ notify_s[i] = compute_notify_on_reserve (&reserve->reserve_pub);
+ }
+
+ for (unsigned int i=0;i<reserves_length;i++)
+ {
+ const struct TALER_EXCHANGEDB_ReserveInInfo *reserve = &reserves[i];
struct GNUNET_PQ_QueryParam params[] = {
- GNUNET_PQ_query_param_auto_from_type (&reserve->reserve_pub), /*$1*/
- GNUNET_PQ_query_param_timestamp (&expiry), /*$4*/
- GNUNET_PQ_query_param_timestamp (&gc), /*$5*/
- GNUNET_PQ_query_param_uint64 (&reserve->wire_reference), /*6*/
- TALER_PQ_query_param_amount (&reserve->balance), /*7+8*/
- GNUNET_PQ_query_param_string (reserve->exchange_account_name), /*9*/
- GNUNET_PQ_query_param_timestamp (&reserve->execution_time), /*10*/
- GNUNET_PQ_query_param_auto_from_type (&h_payto), /*11*/
- GNUNET_PQ_query_param_string (reserve->sender_account_details),/*12*/
- GNUNET_PQ_query_param_timestamp (&reserve_expiration),/*13*/
+ GNUNET_PQ_query_param_auto_from_type (&reserve->reserve_pub),
+ GNUNET_PQ_query_param_timestamp (&expiry),
+ GNUNET_PQ_query_param_timestamp (&gc),
+ GNUNET_PQ_query_param_uint64 (&reserve->wire_reference),
+ TALER_PQ_query_param_amount (&reserve->balance),
+ GNUNET_PQ_query_param_string (reserve->exchange_account_name),
+ GNUNET_PQ_query_param_timestamp (&reserve->execution_time),
+ GNUNET_PQ_query_param_auto_from_type (&h_payto),
+ GNUNET_PQ_query_param_string (reserve->sender_account_details),
+ GNUNET_PQ_query_param_timestamp (&reserve_expiration),
+ GNUNET_PQ_query_param_string (notify_s[i]),
GNUNET_PQ_query_param_end
};
- /* We should get all our results into results[]*/
+
struct GNUNET_PQ_ResultSpec rs[] = {
- GNUNET_PQ_result_spec_uint64 ("reserve_uuid",
- &reserve_uuid),
GNUNET_PQ_result_spec_bool ("conflicted",
&conflicted),
GNUNET_PQ_result_spec_bool ("transaction_duplicate",
&transaction_duplicate),
+ GNUNET_PQ_result_spec_uint64 ("reserve_uuid",
+ &reserve_uuid),
GNUNET_PQ_result_spec_end
};
@@ -137,15 +149,92 @@ TEH_PG_batch_reserves_in_insert (void *cls,
params,
rs);
if (qs1 < 0)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Failed to create reserves (%d)\n",
+ qs1);
return qs1;
- notify_on_reserve (pg,
- &reserve->reserve_pub);
- GNUNET_assert (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != qs1);
- results[i] = (transaction_duplicate)
+ }
+ GNUNET_assert (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != qs1);
+ results[i] = (transaction_duplicate)
? GNUNET_DB_STATUS_SUCCESS_NO_RESULTS
: GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
- if ( (! conflicted) && transaction_duplicate)
- TEH_PG_rollback (pg);
+ conflicts[i] = conflicted;
+ if (!conflicts[i] && transaction_duplicate)
+ {
+ GNUNET_break (0);
+ TEH_PG_rollback (pg);
+ return GNUNET_DB_STATUS_HARD_ERROR;
+ }
+ need_update |= conflicted;
}
+ // commit
+ {
+ enum GNUNET_DB_QueryStatus cs;
+
+ cs = TEH_PG_commit (pg);
+ if (cs < 0)
+ return cs;
+ }
+
+ if (!need_update)
+ goto exit;
+ // begin serializable
+ {
+ if (GNUNET_OK !=
+ TEH_PG_start(pg,
+ "reserve-insert-continued"))
+ {
+ GNUNET_break (0);
+ return GNUNET_DB_STATUS_HARD_ERROR;
+ }
+ }
+
+ enum GNUNET_DB_QueryStatus qs2;
+ PREPARE (pg,
+ "reserves_in_add_transaction",
+ "SELECT batch_reserves_update"
+ " ($1,$2,$3,$4,$5,$6,$7,$8,$9);");
+ for (unsigned int i=0;i<reserves_length;i++)
+ {
+ if (! conflicts[i])
+ continue;
+ {
+ const struct TALER_EXCHANGEDB_ReserveInInfo *reserve = &reserves[i];
+ struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_auto_from_type (&reserve->reserve_pub),
+ GNUNET_PQ_query_param_timestamp (&expiry),
+ GNUNET_PQ_query_param_uint64 (&reserve->wire_reference),
+ TALER_PQ_query_param_amount (&reserve->balance),
+ GNUNET_PQ_query_param_string (reserve->exchange_account_name),
+ GNUNET_PQ_query_param_bool (conflicted),
+ GNUNET_PQ_query_param_auto_from_type (&h_payto),
+ GNUNET_PQ_query_param_string (notify_s[i]),
+ GNUNET_PQ_query_param_end
+ };
+ qs2 = GNUNET_PQ_eval_prepared_non_select (pg->conn,
+ "reserves_in_add_transaction",
+ params);
+ if (qs2<0)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Failed to update reserves (%d)\n",
+ qs2);
+ return qs2;
+ }
+ }
+ }
+ {
+ enum GNUNET_DB_QueryStatus cs;
+
+ cs = TEH_PG_commit (pg);
+ if (cs < 0)
+ return cs;
+ }
+
+ exit:
+ for (unsigned int i=0;i<reserves_length;i++)
+ GNUNET_free (notify_s[i]);
+
return reserves_length;
}
diff --git a/src/exchangedb/test_exchangedb_by_j.c b/src/exchangedb/test_exchangedb_by_j.c
index 43f471671..0f252a097 100644
--- a/src/exchangedb/test_exchangedb_by_j.c
+++ b/src/exchangedb/test_exchangedb_by_j.c
@@ -91,9 +91,22 @@ run (void *cls)
result = 77;
goto cleanup;
}
+<<<<<<< HEAD
for (unsigned int i = 0; i< 7; i++)
+=======
+ if (GNUNET_OK !=
+ plugin->setup_partitions (plugin->cls,
+ num_partitions))
{
- static unsigned int batches[] = {1, 1, 2, 4, 16, 64, 256};
+ GNUNET_break (0);
+ result = 77;
+ goto cleanup;
+ }
+
+ for (unsigned int i = 0; i< 8; i++)
+>>>>>>> 26922c6d (batch modifications)
+ {
+ static unsigned int batches[] = {1, 1,0, 2, 4, 16, 64, 256};
const char *sndr = "payto://x-taler-bank/localhost:8080/1";
struct TALER_Amount value;
unsigned int batch_size = batches[i];
@@ -101,6 +114,7 @@ run (void *cls)
struct GNUNET_TIME_Timestamp ts;
struct GNUNET_TIME_Relative duration;
struct TALER_EXCHANGEDB_ReserveInInfo reserves[batch_size];
+ /* struct TALER_EXCHANGEDB_ReserveInInfo reserves2[batch_size];*/
enum GNUNET_DB_QueryStatus results[batch_size];
GNUNET_assert (GNUNET_OK ==
TALER_string_to_amount (CURRENCY ":1.000010",
@@ -109,9 +123,14 @@ run (void *cls)
ts = GNUNET_TIME_timestamp_get ();
for (unsigned int r = 0; r<10; r++)
{
+<<<<<<< HEAD
plugin->start_read_committed (plugin->cls,
"test_by_j");
+=======
+ plugin->start (plugin->cls,
+ "test_by_exchange_j");
+>>>>>>> 26922c6d (batch modifications)
for (unsigned int k = 0; k<batch_size; k++)
{
RND_BLK (&reserves[k].reserve_pub);
@@ -120,6 +139,7 @@ run (void *cls)
reserves[k].sender_account_details = sndr;
reserves[k].exchange_account_name = "name";
reserves[k].wire_reference = k;
+<<<<<<< HEAD
}
FAILIF (batch_size !=
@@ -129,13 +149,42 @@ run (void *cls)
results));
plugin->commit (plugin->cls);
+=======
+ }
+ FAILIF (batch_size !=
+ plugin->batch_reserves_in_insert (plugin->cls,
+ reserves,
+ batch_size,
+ results));
+ /*plugin->commit (plugin->cls);*/
+>>>>>>> 26922c6d (batch modifications)
}
+ /*
+ for (unsigned int s=0;s<10;s++)
+ {
+ for (unsigned int k = 0; k<batch_size; k++)
+ {
+ RND_BLK (&reserves2[k].reserve_pub);
+ reserves2[k].balance = value;
+ reserves2[k].execution_time = ts;
+ reserves2[k].sender_account_details = sndr;
+ reserves2[k].exchange_account_name = "name";
+ reserves2[k].wire_reference = k;
+ }
+ FAILIF (batch_size !=
+ plugin->batch_reserves_in_insert (plugin->cls,
+ reserves2,
+ batch_size,
+ results));
+ }*/
+
duration = GNUNET_TIME_absolute_get_duration (now);
fprintf (stdout,
"for a batchsize equal to %d it took %s\n",
batch_size,
GNUNET_STRINGS_relative_time_to_string (duration,
GNUNET_NO) );
+
}
result = 0;
drop:
@@ -155,7 +204,6 @@ main (int argc,
char *config_filename;
char *testname;
struct GNUNET_CONFIGURATION_Handle *cfg;
-
(void) argc;
result = -1;
if (NULL == (plugin_name = strrchr (argv[0], (int) '-')))
@@ -163,6 +211,7 @@ main (int argc,
GNUNET_break (0);
return -1;
}
+
GNUNET_log_setup (argv[0],
"WARNING",
NULL);