summaryrefslogtreecommitdiff
path: root/src/exchangedb/pg_batch_reserves_in_insert.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/exchangedb/pg_batch_reserves_in_insert.c')
-rw-r--r--src/exchangedb/pg_batch_reserves_in_insert.c157
1 files changed, 123 insertions, 34 deletions
diff --git a/src/exchangedb/pg_batch_reserves_in_insert.c b/src/exchangedb/pg_batch_reserves_in_insert.c
index 455f080dd..f40641edd 100644
--- a/src/exchangedb/pg_batch_reserves_in_insert.c
+++ b/src/exchangedb/pg_batch_reserves_in_insert.c
@@ -16,7 +16,7 @@
/**
* @file exchangedb/pg_batch_reserves_in_insert.c
* @brief Implementation of the reserves_in_insert function for Postgres
- * @author JOSEPHxu
+ * @author Joseph XU
*/
#include "platform.h"
#include "taler_error_codes.h"
@@ -35,15 +35,12 @@
/**
- * Generate event notification for the reserve
- * change.
+ * Generate event notification for the reserve change.
*
- * @param pg plugin state
* @param reserve_pub reserve to notfiy on
*/
-static void
-notify_on_reserve (struct PostgresClosure *pg,
- const struct TALER_ReservePublicKeyP *reserve_pub)
+static char *
+compute_notify_on_reserve (const struct TALER_ReservePublicKeyP *reserve_pub)
{
struct TALER_ReserveEventP rep = {
.header.size = htons (sizeof (rep)),
@@ -51,12 +48,7 @@ notify_on_reserve (struct PostgresClosure *pg,
.reserve_pub = *reserve_pub
};
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Notifying on reserve!\n");
- TEH_PG_event_notify (pg,
- &rep.header,
- NULL,
- 0);
+ return GNUNET_PG_get_event_notify_channel (&rep.header);
}
@@ -75,8 +67,11 @@ TEH_PG_batch_reserves_in_insert (void *cls,
uint64_t reserve_uuid;
bool conflicted;
bool transaction_duplicate;
+ bool need_update = false;
struct GNUNET_TIME_Timestamp reserve_expiration
= GNUNET_TIME_relative_to_timestamp (pg->idle_reserve_expiration_time);
+ bool conflicts[reserves_length];
+ char *notify_s[reserves_length];
PREPARE (pg,
"reserve_create",
@@ -84,8 +79,8 @@ TEH_PG_batch_reserves_in_insert (void *cls,
"out_reserve_found AS conflicted"
",transaction_duplicate"
",ruuid AS reserve_uuid"
- " FROM exchange_do_batch_reserves_in"
- " ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11);");
+ " FROM batch_reserves_insert"
+ " ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12);");
expiry = GNUNET_TIME_absolute_to_timestamp (
GNUNET_TIME_absolute_add (reserves->execution_time.abs_time,
pg->idle_reserve_expiration_time));
@@ -98,6 +93,16 @@ TEH_PG_batch_reserves_in_insert (void *cls,
GNUNET_STRINGS_relative_time_to_string (
pg->idle_reserve_expiration_time,
GNUNET_NO));
+
+ {
+ if (GNUNET_OK !=
+ TEH_PG_start_read_committed(pg,
+ "READ_COMMITED"))
+ {
+ GNUNET_break (0);
+ return GNUNET_DB_STATUS_HARD_ERROR;
+ }
+ }
/* Optimistically assume this is a new reserve, create balance for the first
time; we do this before adding the actual transaction to "reserves_in",
as for a new reserve it can't be a duplicate 'add' operation, and as
@@ -105,27 +110,34 @@ TEH_PG_batch_reserves_in_insert (void *cls,
for (unsigned int i = 0; i<reserves_length; i++)
{
const struct TALER_EXCHANGEDB_ReserveInInfo *reserve = &reserves[i];
+ notify_s[i] = compute_notify_on_reserve (&reserve->reserve_pub);
+ }
+
+ for (unsigned int i=0;i<reserves_length;i++)
+ {
+ const struct TALER_EXCHANGEDB_ReserveInInfo *reserve = &reserves[i];
struct GNUNET_PQ_QueryParam params[] = {
- GNUNET_PQ_query_param_auto_from_type (&reserve->reserve_pub), /*$1*/
- GNUNET_PQ_query_param_timestamp (&expiry), /*$4*/
- GNUNET_PQ_query_param_timestamp (&gc), /*$5*/
- GNUNET_PQ_query_param_uint64 (&reserve->wire_reference), /*6*/
- TALER_PQ_query_param_amount (&reserve->balance), /*7+8*/
- GNUNET_PQ_query_param_string (reserve->exchange_account_name), /*9*/
- GNUNET_PQ_query_param_timestamp (&reserve->execution_time), /*10*/
- GNUNET_PQ_query_param_auto_from_type (&h_payto), /*11*/
- GNUNET_PQ_query_param_string (reserve->sender_account_details),/*12*/
- GNUNET_PQ_query_param_timestamp (&reserve_expiration),/*13*/
+ GNUNET_PQ_query_param_auto_from_type (&reserve->reserve_pub),
+ GNUNET_PQ_query_param_timestamp (&expiry),
+ GNUNET_PQ_query_param_timestamp (&gc),
+ GNUNET_PQ_query_param_uint64 (&reserve->wire_reference),
+ TALER_PQ_query_param_amount (&reserve->balance),
+ GNUNET_PQ_query_param_string (reserve->exchange_account_name),
+ GNUNET_PQ_query_param_timestamp (&reserve->execution_time),
+ GNUNET_PQ_query_param_auto_from_type (&h_payto),
+ GNUNET_PQ_query_param_string (reserve->sender_account_details),
+ GNUNET_PQ_query_param_timestamp (&reserve_expiration),
+ GNUNET_PQ_query_param_string (notify_s[i]),
GNUNET_PQ_query_param_end
};
- /* We should get all our results into results[]*/
+
struct GNUNET_PQ_ResultSpec rs[] = {
- GNUNET_PQ_result_spec_uint64 ("reserve_uuid",
- &reserve_uuid),
GNUNET_PQ_result_spec_bool ("conflicted",
&conflicted),
GNUNET_PQ_result_spec_bool ("transaction_duplicate",
&transaction_duplicate),
+ GNUNET_PQ_result_spec_uint64 ("reserve_uuid",
+ &reserve_uuid),
GNUNET_PQ_result_spec_end
};
@@ -137,15 +149,92 @@ TEH_PG_batch_reserves_in_insert (void *cls,
params,
rs);
if (qs1 < 0)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Failed to create reserves (%d)\n",
+ qs1);
return qs1;
- notify_on_reserve (pg,
- &reserve->reserve_pub);
- GNUNET_assert (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != qs1);
- results[i] = (transaction_duplicate)
+ }
+ GNUNET_assert (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != qs1);
+ results[i] = (transaction_duplicate)
? GNUNET_DB_STATUS_SUCCESS_NO_RESULTS
: GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
- if ( (! conflicted) && transaction_duplicate)
- TEH_PG_rollback (pg);
+ conflicts[i] = conflicted;
+ if (!conflicts[i] && transaction_duplicate)
+ {
+ GNUNET_break (0);
+ TEH_PG_rollback (pg);
+ return GNUNET_DB_STATUS_HARD_ERROR;
+ }
+ need_update |= conflicted;
}
+ // commit
+ {
+ enum GNUNET_DB_QueryStatus cs;
+
+ cs = TEH_PG_commit (pg);
+ if (cs < 0)
+ return cs;
+ }
+
+ if (!need_update)
+ goto exit;
+ // begin serializable
+ {
+ if (GNUNET_OK !=
+ TEH_PG_start(pg,
+ "reserve-insert-continued"))
+ {
+ GNUNET_break (0);
+ return GNUNET_DB_STATUS_HARD_ERROR;
+ }
+ }
+
+ enum GNUNET_DB_QueryStatus qs2;
+ PREPARE (pg,
+ "reserves_in_add_transaction",
+ "SELECT batch_reserves_update"
+ " ($1,$2,$3,$4,$5,$6,$7,$8,$9);");
+ for (unsigned int i=0;i<reserves_length;i++)
+ {
+ if (! conflicts[i])
+ continue;
+ {
+ const struct TALER_EXCHANGEDB_ReserveInInfo *reserve = &reserves[i];
+ struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_auto_from_type (&reserve->reserve_pub),
+ GNUNET_PQ_query_param_timestamp (&expiry),
+ GNUNET_PQ_query_param_uint64 (&reserve->wire_reference),
+ TALER_PQ_query_param_amount (&reserve->balance),
+ GNUNET_PQ_query_param_string (reserve->exchange_account_name),
+ GNUNET_PQ_query_param_bool (conflicted),
+ GNUNET_PQ_query_param_auto_from_type (&h_payto),
+ GNUNET_PQ_query_param_string (notify_s[i]),
+ GNUNET_PQ_query_param_end
+ };
+ qs2 = GNUNET_PQ_eval_prepared_non_select (pg->conn,
+ "reserves_in_add_transaction",
+ params);
+ if (qs2<0)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Failed to update reserves (%d)\n",
+ qs2);
+ return qs2;
+ }
+ }
+ }
+ {
+ enum GNUNET_DB_QueryStatus cs;
+
+ cs = TEH_PG_commit (pg);
+ if (cs < 0)
+ return cs;
+ }
+
+ exit:
+ for (unsigned int i=0;i<reserves_length;i++)
+ GNUNET_free (notify_s[i]);
+
return reserves_length;
}