summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2021-06-21 00:17:16 +0200
committerChristian Grothoff <christian@grothoff.org>2021-06-21 00:17:16 +0200
commit9c51720cbfb86c89bc1f1872432c4f6a66fba5bd (patch)
tree56eede9e232962013b09afa2efc3c1cb37f41e0f /src
parent108bf57d048a135cb71f9453540c9d6579ae2028 (diff)
downloadexchange-9c51720cbfb86c89bc1f1872432c4f6a66fba5bd.tar.gz
exchange-9c51720cbfb86c89bc1f1872432c4f6a66fba5bd.tar.bz2
exchange-9c51720cbfb86c89bc1f1872432c4f6a66fba5bd.zip
fixing parallel fakebank to ensure transactions are ordered, fixing indices/constraint preservation after DB update to 0002
Diffstat (limited to 'src')
-rw-r--r--src/bank-lib/fakebank.c323
-rw-r--r--src/benchmark/bank-benchmark.conf2
-rw-r--r--src/benchmark/taler-bank-benchmark.c48
-rw-r--r--src/exchange/taler-exchange-wirewatch.c414
-rw-r--r--src/exchangedb/drop0002.sql1
-rw-r--r--src/exchangedb/exchange-0001.sql5
-rw-r--r--src/exchangedb/exchange-0002.sql71
-rw-r--r--src/exchangedb/plugin_exchangedb_postgres.c18
-rw-r--r--src/include/taler_testing_lib.h12
-rw-r--r--src/testing/testing_api_helpers_exchange.c51
10 files changed, 593 insertions, 352 deletions
diff --git a/src/bank-lib/fakebank.c b/src/bank-lib/fakebank.c
index 0365a651..f656c878 100644
--- a/src/bank-lib/fakebank.c
+++ b/src/bank-lib/fakebank.c
@@ -82,13 +82,6 @@ struct Account
char *account_name;
/**
- * Lock for modifying transaction list of this account.
- * Note that per-transaction locks MUST be acquired before
- * per-account locks (if both are acquired).
- */
- pthread_mutex_t lock;
-
- /**
* Current account balance.
*/
struct TALER_Amount balance;
@@ -158,11 +151,6 @@ struct Transaction
uint64_t row_id;
/**
- * Lock for accessing this transaction array entry.
- */
- pthread_mutex_t lock;
-
- /**
* What does the @e subject contain?
*/
enum
@@ -257,7 +245,7 @@ struct TALER_FAKEBANK_Handle
/**
* We store transactions in a revolving array.
*/
- struct Transaction *transactions;
+ struct Transaction **transactions;
/**
* HTTP server we run to pretend to be the "test" bank.
@@ -303,6 +291,12 @@ struct TALER_FAKEBANK_Handle
pthread_mutex_t uuid_map_lock;
/**
+ * Lock for accessing the internals of
+ * accounts and transaction array entries.
+ */
+ pthread_mutex_t big_lock;
+
+ /**
* Current transaction counter.
*/
uint64_t serial_counter;
@@ -380,9 +374,6 @@ lookup_account (struct TALER_FAKEBANK_Handle *h,
if (NULL == account)
{
account = GNUNET_new (struct Account);
- GNUNET_assert (0 ==
- pthread_mutex_init (&account->lock,
- NULL));
account->account_name = GNUNET_strdup (name);
GNUNET_assert (GNUNET_OK ==
TALER_amount_get_zero (h->currency,
@@ -409,7 +400,7 @@ check_log (struct TALER_FAKEBANK_Handle *h)
{
for (uint64_t i = 0; i<h->ram_limit; i++)
{
- struct Transaction *t = &h->transactions[i];
+ struct Transaction *t = h->transactions[i];
if (t->unchecked)
continue;
@@ -542,61 +533,9 @@ TALER_FAKEBANK_check_credit (struct TALER_FAKEBANK_Handle *h,
/**
- * Clean up space used by old transaction @a t.
- * The transaction @a t must already be locked
- * when calling this function!
- *
- * @param[in,out] h bank handle
- * @param[in] t transaction to clean up
- */
-static void
-clean_transaction (struct TALER_FAKEBANK_Handle *h,
- struct Transaction *t)
-{
- struct Account *da = t->debit_account;
- struct Account *ca = t->credit_account;
-
- if (NULL == da)
- return; /* nothing to be cleaned */
-
- /* slot was already in use, must clean out old
- entry first! */
- GNUNET_assert (0 ==
- pthread_mutex_lock (&da->lock));
- GNUNET_CONTAINER_MDLL_remove (out,
- da->out_head,
- da->out_tail,
- t);
- GNUNET_assert (0 ==
- pthread_mutex_unlock (&da->lock));
- GNUNET_assert (0 ==
- pthread_mutex_lock (&ca->lock));
- GNUNET_CONTAINER_MDLL_remove (in,
- ca->in_head,
- ca->in_tail,
- t);
- GNUNET_assert (0 ==
- pthread_mutex_unlock (&ca->lock));
- if (T_DEBIT == t->type)
- {
- GNUNET_assert (0 ==
- pthread_mutex_lock (&h->uuid_map_lock));
- GNUNET_assert (GNUNET_OK ==
- GNUNET_CONTAINER_multihashmap_remove (h->uuid_map,
- &t->request_uid,
- t));
- GNUNET_assert (0 ==
- pthread_mutex_unlock (&h->uuid_map_lock));
- }
- t->debit_account = NULL;
- t->credit_account = NULL;
-}
-
-
-/**
* Update @a account balance by @a amount.
*
- * The @a account must already be locked when calling
+ * The @a big_lock must already be locked when calling
* this function.
*
* @param[in,out] account account to update
@@ -642,16 +581,24 @@ update_balance (struct Account *account,
* The transaction @a t must already be locked
* when calling this function!
*
- * @param[in] t transaction to clean up
+ * @param[in,out] h bank handle
+ * @param[in,out] t transaction to add to account lists
*/
static void
-post_transaction (struct Transaction *t)
+post_transaction (struct TALER_FAKEBANK_Handle *h,
+ struct Transaction *t)
{
struct Account *debit_acc = t->debit_account;
struct Account *credit_acc = t->credit_account;
+ uint64_t row_id;
+ struct Transaction *old;
GNUNET_assert (0 ==
- pthread_mutex_lock (&debit_acc->lock));
+ pthread_mutex_lock (&h->big_lock));
+ row_id = ++h->serial_counter;
+ old = h->transactions[row_id % h->ram_limit];
+ h->transactions[row_id % h->ram_limit] = t;
+ t->row_id = row_id;
GNUNET_CONTAINER_MDLL_insert_tail (out,
debit_acc->out_head,
debit_acc->out_tail,
@@ -659,10 +606,6 @@ post_transaction (struct Transaction *t)
update_balance (debit_acc,
&t->amount,
true);
- GNUNET_assert (0 ==
- pthread_mutex_unlock (&debit_acc->lock));
- GNUNET_assert (0 ==
- pthread_mutex_lock (&credit_acc->lock));
GNUNET_CONTAINER_MDLL_insert_tail (in,
credit_acc->in_head,
credit_acc->in_tail,
@@ -670,8 +613,39 @@ post_transaction (struct Transaction *t)
update_balance (credit_acc,
&t->amount,
false);
+ if (NULL != old)
+ {
+ struct Account *da;
+ struct Account *ca;
+
+ da = old->debit_account;
+ ca = old->credit_account;
+ /* slot was already in use, must clean out old
+ entry first! */
+ GNUNET_CONTAINER_MDLL_remove (out,
+ da->out_head,
+ da->out_tail,
+ old);
+ GNUNET_CONTAINER_MDLL_remove (in,
+ ca->in_head,
+ ca->in_tail,
+ old);
+ }
GNUNET_assert (0 ==
- pthread_mutex_unlock (&credit_acc->lock));
+ pthread_mutex_unlock (&h->big_lock));
+ if ( (NULL != old) &&
+ (T_DEBIT == old->type) )
+ {
+ GNUNET_assert (0 ==
+ pthread_mutex_lock (&h->uuid_map_lock));
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_CONTAINER_multihashmap_remove (h->uuid_map,
+ &old->request_uid,
+ old));
+ GNUNET_assert (0 ==
+ pthread_mutex_unlock (&h->uuid_map_lock));
+ }
+ GNUNET_free (old);
}
@@ -725,12 +699,8 @@ make_transfer (
pthread_mutex_lock (&h->uuid_map_lock));
t = GNUNET_CONTAINER_multihashmap_get (h->uuid_map,
request_uid);
- GNUNET_assert (0 ==
- pthread_mutex_unlock (&h->uuid_map_lock));
if (NULL != t)
{
- GNUNET_assert (0 ==
- pthread_mutex_lock (&t->lock));
if ( (debit_acc != t->debit_account) ||
(credit_acc != t->credit_account) ||
(0 != TALER_amount_cmp (amount,
@@ -742,30 +712,18 @@ make_transfer (
/* Transaction exists, but with different details. */
GNUNET_break (0);
GNUNET_assert (0 ==
- pthread_mutex_unlock (&t->lock));
+ pthread_mutex_unlock (&h->uuid_map_lock));
return GNUNET_SYSERR;
}
*ret_row_id = t->row_id;
GNUNET_assert (0 ==
- pthread_mutex_unlock (&t->lock));
+ pthread_mutex_unlock (&h->uuid_map_lock));
return GNUNET_OK;
}
+ GNUNET_assert (0 ==
+ pthread_mutex_unlock (&h->uuid_map_lock));
}
- *ret_row_id = __sync_fetch_and_add (&h->serial_counter,
- 1);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Making transfer %llu from %s to %s over %s and subject %s; for exchange: %s\n",
- (unsigned long long) *ret_row_id,
- debit_account,
- credit_account,
- TALER_amount2s (amount),
- TALER_B2S (subject),
- exchange_base_url);
- t = &h->transactions[*ret_row_id % h->ram_limit];
- GNUNET_assert (0 ==
- pthread_mutex_lock (&t->lock));
- clean_transaction (h,
- t);
+ t = GNUNET_new (struct Transaction);
t->unchecked = true;
t->debit_account = debit_acc;
t->credit_account = credit_acc;
@@ -783,7 +741,8 @@ make_transfer (
&t->request_uid);
else
t->request_uid = *request_uid;
- post_transaction (t);
+ post_transaction (h,
+ t);
GNUNET_assert (0 ==
pthread_mutex_lock (&h->uuid_map_lock));
GNUNET_assert (GNUNET_OK ==
@@ -794,8 +753,15 @@ make_transfer (
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
GNUNET_assert (0 ==
pthread_mutex_unlock (&h->uuid_map_lock));
- GNUNET_assert (0 ==
- pthread_mutex_unlock (&t->lock));
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Making transfer %llu from %s to %s over %s and subject %s; for exchange: %s\n",
+ (unsigned long long) t->row_id,
+ debit_account,
+ credit_account,
+ TALER_amount2s (amount),
+ TALER_B2S (subject),
+ exchange_base_url);
+ *ret_row_id = t->row_id;
return GNUNET_OK;
}
@@ -826,7 +792,6 @@ make_admin_transfer (
const struct GNUNET_PeerIdentity *pid;
struct Account *debit_acc;
struct Account *credit_acc;
- uint64_t ret;
GNUNET_static_assert (sizeof (*pid) ==
sizeof (*reserve_pub));
@@ -859,34 +824,21 @@ make_admin_transfer (
return GNUNET_NO;
}
- ret = __sync_fetch_and_add (&h->serial_counter,
- 1);
- if (NULL != row_id)
- *row_id = ret;
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Making transfer from %s to %s over %s and subject %s at row %llu\n",
- debit_account,
- credit_account,
- TALER_amount2s (amount),
- TALER_B2S (reserve_pub),
- (unsigned long long) ret);
- t = &h->transactions[ret % h->ram_limit];
- GNUNET_assert (0 ==
- pthread_mutex_lock (&t->lock));
- clean_transaction (h,
- t);
+ t = GNUNET_new (struct Transaction);
t->unchecked = true;
t->debit_account = debit_acc;
t->credit_account = credit_acc;
t->amount = *amount;
- t->row_id = ret;
t->date = GNUNET_TIME_absolute_get ();
(void) GNUNET_TIME_round_abs (&t->date);
if (NULL != timestamp)
*timestamp = t->date;
t->type = T_CREDIT;
t->subject.credit.reserve_pub = *reserve_pub;
- post_transaction (t);
+ post_transaction (h,
+ t);
+ if (NULL != row_id)
+ *row_id = t->row_id;
GNUNET_assert (0 ==
pthread_mutex_lock (&h->rpubs_lock));
GNUNET_assert (GNUNET_OK ==
@@ -897,8 +849,13 @@ make_admin_transfer (
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
GNUNET_assert (0 ==
pthread_mutex_unlock (&h->rpubs_lock));
- GNUNET_assert (0 ==
- pthread_mutex_unlock (&t->lock));
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Making transfer from %s to %s over %s and subject %s at row %llu\n",
+ debit_account,
+ credit_account,
+ TALER_amount2s (amount),
+ TALER_B2S (reserve_pub),
+ (unsigned long long) t->row_id);
return GNUNET_OK;
}
@@ -908,7 +865,7 @@ TALER_FAKEBANK_check_empty (struct TALER_FAKEBANK_Handle *h)
{
for (uint64_t i = 0; i<h->ram_limit; i++)
{
- struct Transaction *t = &h->transactions[i];
+ struct Transaction *t = h->transactions[i];
if (t->unchecked)
{
@@ -929,8 +886,6 @@ free_account (void *cls,
{
struct Account *account = val;
- GNUNET_assert (0 ==
- pthread_mutex_destroy (&account->lock));
GNUNET_free (account->account_name);
GNUNET_free (account);
return GNUNET_OK;
@@ -966,14 +921,16 @@ TALER_FAKEBANK_stop (struct TALER_FAKEBANK_Handle *h)
}
GNUNET_CONTAINER_multihashmap_destroy (h->uuid_map);
GNUNET_CONTAINER_multipeermap_destroy (h->rpubs);
- for (uint64_t i = 0; i<h->ram_limit; i++)
- pthread_mutex_destroy (&h->transactions[i].lock);
+ GNUNET_assert (0 ==
+ pthread_mutex_destroy (&h->big_lock));
GNUNET_assert (0 ==
pthread_mutex_destroy (&h->uuid_map_lock));
GNUNET_assert (0 ==
pthread_mutex_destroy (&h->accounts_lock));
GNUNET_assert (0 ==
pthread_mutex_destroy (&h->rpubs_lock));
+ for (uint64_t i = 0; i<h->ram_limit; i++)
+ GNUNET_free (h->transactions[i]);
GNUNET_free (h->transactions);
GNUNET_free (h->my_baseurl);
GNUNET_free (h->currency);
@@ -1424,20 +1381,40 @@ handle_debit_history (struct TALER_FAKEBANK_Handle *h,
acc = lookup_account (h,
account);
+ GNUNET_asprintf (&debit_payto,
+ "payto://x-taler-bank/localhost/%s",
+ account);
+ history = json_array ();
+ if (NULL == history)
+ {
+ GNUNET_break (0);
+ GNUNET_free (debit_payto);
+ return MHD_NO;
+ }
+ GNUNET_assert (0 ==
+ pthread_mutex_lock (&h->big_lock));
if (! ha.have_start)
{
- GNUNET_assert (0 ==
- pthread_mutex_lock (&acc->lock));
pos = (0 > ha.delta)
? acc->out_tail
: acc->out_head;
}
else
{
- struct Transaction *t = &h->transactions[ha.start_idx % h->ram_limit];
+ struct Transaction *t = h->transactions[ha.start_idx % h->ram_limit];
- GNUNET_assert (0 ==
- pthread_mutex_lock (&t->lock));
+ if (NULL == t)
+ {
+ GNUNET_assert (0 ==
+ pthread_mutex_unlock (&h->big_lock));
+ GNUNET_free (debit_payto);
+ /* FIXME: suspend for long-polling instead */
+ return TALER_MHD_reply_json_pack (connection,
+ MHD_HTTP_OK,
+ "{s:o}",
+ "outgoing_transactions",
+ history);
+ }
if (t->debit_account != acc)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
@@ -1445,23 +1422,17 @@ handle_debit_history (struct TALER_FAKEBANK_Handle *h,
(unsigned long long) ha.start_idx,
account);
GNUNET_assert (0 ==
- pthread_mutex_unlock (&t->lock));
+ pthread_mutex_unlock (&h->big_lock));
+ GNUNET_free (debit_payto);
+ json_decref (history);
return MHD_NO;
}
- GNUNET_assert (0 ==
- pthread_mutex_lock (&acc->lock));
- GNUNET_assert (0 ==
- pthread_mutex_unlock (&t->lock));
/* range is exclusive, skip the matching entry */
if (0 > ha.delta)
pos = t->prev_out;
else
pos = t->next_out;
}
- GNUNET_asprintf (&debit_payto,
- "payto://x-taler-bank/localhost/%s",
- account);
- history = json_array ();
while ( (0 != ha.delta) &&
(NULL != pos) )
{
@@ -1472,10 +1443,6 @@ handle_debit_history (struct TALER_FAKEBANK_Handle *h,
GNUNET_asprintf (&credit_payto,
"payto://x-taler-bank/localhost/%s",
pos->credit_account->account_name);
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Appending credit_payto (%s) from credit_account (%s) within fakebank\n",
- credit_payto,
- pos->credit_account->account_name);
trans = json_pack (
"{s:I, s:o, s:o, s:s, s:s, s:s, s:o}",
"row_id", (json_int_t) pos->row_id,
@@ -1487,6 +1454,7 @@ handle_debit_history (struct TALER_FAKEBANK_Handle *h,
pos->subject.debit.exchange_base_url,
"wtid", GNUNET_JSON_from_data_auto (
&pos->subject.debit.wtid));
+ GNUNET_assert (NULL != trans);
GNUNET_free (credit_payto);
GNUNET_assert (0 ==
json_array_append_new (history,
@@ -1501,7 +1469,7 @@ handle_debit_history (struct TALER_FAKEBANK_Handle *h,
pos = pos->next_out;
}
GNUNET_assert (0 ==
- pthread_mutex_unlock (&acc->lock));
+ pthread_mutex_unlock (&h->big_lock));
GNUNET_free (debit_payto);
return TALER_MHD_reply_json_pack (connection,
MHD_HTTP_OK,
@@ -1539,20 +1507,35 @@ handle_credit_history (struct TALER_FAKEBANK_Handle *h,
}
acc = lookup_account (h,
account);
+ history = json_array ();
+ GNUNET_assert (NULL != history);
+ GNUNET_asprintf (&credit_payto,
+ "payto://x-taler-bank/localhost/%s",
+ account);
+ GNUNET_assert (0 ==
+ pthread_mutex_lock (&h->big_lock));
if (! ha.have_start)
{
- GNUNET_assert (0 ==
- pthread_mutex_lock (&acc->lock));
pos = (0 > ha.delta)
? acc->in_tail
: acc->in_head;
}
else
{
- struct Transaction *t = &h->transactions[ha.start_idx % h->ram_limit];
+ struct Transaction *t = h->transactions[ha.start_idx % h->ram_limit];
- GNUNET_assert (0 ==
- pthread_mutex_lock (&t->lock));
+ if (NULL == t)
+ {
+ GNUNET_assert (0 ==
+ pthread_mutex_unlock (&h->big_lock));
+ GNUNET_free (credit_payto);
+ /* FIXME: suspend for long-polling instead */
+ return TALER_MHD_reply_json_pack (connection,
+ MHD_HTTP_OK,
+ "{s:o}",
+ "incoming_transactions",
+ history);
+ }
if (t->credit_account != acc)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
@@ -1560,23 +1543,17 @@ handle_credit_history (struct TALER_FAKEBANK_Handle *h,
(unsigned long long) ha.start_idx,
account);
GNUNET_assert (0 ==
- pthread_mutex_unlock (&t->lock));
+ pthread_mutex_unlock (&h->big_lock));
+ json_decref (history);
+ GNUNET_free (credit_payto);
return MHD_NO;
}
- GNUNET_assert (0 ==
- pthread_mutex_lock (&acc->lock));
- GNUNET_assert (0 ==
- pthread_mutex_unlock (&t->lock));
/* range is exclusive, skip the matching entry */
if (0 > ha.delta)
pos = t->prev_in;
else
pos = t->next_in;
}
- GNUNET_asprintf (&credit_payto,
- "payto://x-taler-bank/localhost/%s",
- account);
- history = json_array ();
while ( (0 != ha.delta) &&
(NULL != pos) )
{
@@ -1587,12 +1564,6 @@ handle_credit_history (struct TALER_FAKEBANK_Handle *h,
GNUNET_asprintf (&debit_payto,
"payto://x-taler-bank/localhost/%s",
pos->debit_account->account_name);
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Returning transaction %s->%s (%s) at %llu\n",
- pos->debit_account->account_name,
- pos->credit_account->account_name,
- TALER_B2S (&pos->subject.credit.reserve_pub),
- (unsigned long long) pos->row_id);
trans = json_pack (
"{s:I, s:o, s:o, s:s, s:s, s:o}",
"row_id", (json_int_t) pos->row_id,
@@ -1602,6 +1573,7 @@ handle_credit_history (struct TALER_FAKEBANK_Handle *h,
"debit_account", debit_payto,
"reserve_pub", GNUNET_JSON_from_data_auto (
&pos->subject.credit.reserve_pub));
+ GNUNET_assert (NULL != trans);
GNUNET_free (debit_payto);
GNUNET_assert (0 ==
json_array_append_new (history,
@@ -1616,7 +1588,7 @@ handle_credit_history (struct TALER_FAKEBANK_Handle *h,
pos = pos->next_in;
}
GNUNET_assert (0 ==
- pthread_mutex_unlock (&acc->lock));
+ pthread_mutex_unlock (&h->big_lock));
GNUNET_free (credit_payto);
return TALER_MHD_reply_json_pack (connection,
MHD_HTTP_OK,
@@ -1909,7 +1881,7 @@ TALER_FAKEBANK_start2 (uint16_t port,
{
struct TALER_FAKEBANK_Handle *h;
- if (SIZE_MAX / sizeof (struct Transaction) < ram_limit)
+ if (SIZE_MAX / sizeof (struct Transaction *) < ram_limit)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"This CPU architecture does not support keeping %llu transactions in RAM\n",
@@ -1921,7 +1893,7 @@ TALER_FAKEBANK_start2 (uint16_t port,
h->port = port;
h->force_close = close_connections;
h->ram_limit = ram_limit;
- h->serial_counter = 1;
+ h->serial_counter = 0;
GNUNET_assert (0 ==
pthread_mutex_init (&h->accounts_lock,
NULL));
@@ -1931,8 +1903,11 @@ TALER_FAKEBANK_start2 (uint16_t port,
GNUNET_assert (0 ==
pthread_mutex_init (&h->uuid_map_lock,
NULL));
+ GNUNET_assert (0 ==
+ pthread_mutex_init (&h->big_lock,
+ NULL));
h->transactions
- = GNUNET_malloc_large (sizeof (struct Transaction)
+ = GNUNET_malloc_large (sizeof (struct Transaction *)
* ram_limit);
if (NULL == h->transactions)
{
@@ -1961,12 +1936,6 @@ TALER_FAKEBANK_start2 (uint16_t port,
TALER_FAKEBANK_stop (h);
return NULL;
}
- for (uint64_t i = 0; i<ram_limit; i++)
- {
- GNUNET_assert (0 ==
- pthread_mutex_init (&h->transactions[i].lock,
- NULL));
- }
h->currency = GNUNET_strdup (currency);
GNUNET_asprintf (&h->my_baseurl,
"http://localhost:%u/",
diff --git a/src/benchmark/bank-benchmark.conf b/src/benchmark/bank-benchmark.conf
index b36fa794..1b2eccaf 100644
--- a/src/benchmark/bank-benchmark.conf
+++ b/src/benchmark/bank-benchmark.conf
@@ -28,7 +28,7 @@ DB = postgres
# exchange (or the twister) is actually listening.
base_url = "http://localhost:8081/"
-WIREWATCH_IDLE_SLEEP_INTERVAL = 5 ms
+WIREWATCH_IDLE_SLEEP_INTERVAL = 1500 ms
[exchange-offline]
MASTER_PRIV_FILE = ${TALER_DATA_HOME}/exchange/offline-keys/master.priv
diff --git a/src/benchmark/taler-bank-benchmark.c b/src/benchmark/taler-bank-benchmark.c
index 81e91552..53ea1357 100644
--- a/src/benchmark/taler-bank-benchmark.c
+++ b/src/benchmark/taler-bank-benchmark.c
@@ -497,6 +497,20 @@ parallel_benchmark (void)
{
if (use_fakebank)
{
+ unsigned long long pnum;
+
+ if (GNUNET_OK !=
+ GNUNET_CONFIGURATION_get_value_number (cfg,
+ "bank",
+ "HTTP_PORT",
+ &pnum))
+ {
+ GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
+ "bank",
+ "HTTP_PORT",
+ "must be valid port number");
+ return GNUNET_SYSERR;
+ }
/* start fakebank */
fakebank = fork ();
if (0 == fakebank)
@@ -515,7 +529,33 @@ parallel_benchmark (void)
return GNUNET_SYSERR;
}
/* wait for fakebank to be ready */
- sleep (1 + history_size / 65536);
+ {
+ char *bank_url;
+ int ret;
+
+ GNUNET_asprintf (&bank_url,
+ "http://localhost:%u/",
+ (unsigned int) (uint16_t) pnum);
+ ret = TALER_TESTING_wait_httpd_ready (bank_url);
+ GNUNET_free (bank_url);
+ if (0 != ret)
+ {
+ int wstatus;
+
+ kill (fakebank,
+ SIGTERM);
+ if (fakebank !=
+ waitpid (fakebank,
+ &wstatus,
+ 0))
+ {
+ GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR,
+ "waitpid");
+ }
+ fakebank = -1;
+ exit (ret);
+ }
+ }
}
else
{
@@ -587,7 +627,11 @@ parallel_benchmark (void)
(MODE_BANK == mode) )
{
printf ("Press ENTER to stop!\n");
+ if (MODE_BANK != mode)
+ duration = GNUNET_TIME_absolute_get_duration (start_time);
(void) getchar ();
+ if (MODE_BANK == mode)
+ duration = GNUNET_TIME_absolute_get_duration (start_time);
}
if ( (MODE_BANK == mode) ||
@@ -817,7 +861,7 @@ main (int argc,
/* If we're the bank, we're done now. No need to print results. */
return (GNUNET_OK == result) ? 0 : result;
}
- duration = GNUNET_TIME_absolute_get_duration (start_time);
+
if (GNUNET_OK == result)
{
struct rusage usage;
diff --git a/src/exchange/taler-exchange-wirewatch.c b/src/exchange/taler-exchange-wirewatch.c
index 28fa81e7..5d35eba5 100644
--- a/src/exchange/taler-exchange-wirewatch.c
+++ b/src/exchange/taler-exchange-wirewatch.c
@@ -29,13 +29,12 @@
#include "taler_json_lib.h"
#include "taler_bank_service.h"
-#define DEBUG_LOGGING 0
/**
- * What is the initial batch size we use for credit history
+ * What is the maximum batch size we use for credit history
* requests with the bank. See `batch_size` below.
*/
-#define INITIAL_BATCH_SIZE 1024
+#define MAXIMUM_BATCH_SIZE 1024
/**
* Information we keep for each supported account.
@@ -81,34 +80,48 @@ struct WireAccount
* Encoded offset in the wire transfer list from where
* to start the next query with the bank.
*/
- uint64_t last_row_off;
+ uint64_t batch_start;
/**
* Latest row offset seen in this transaction, becomes
- * the new #last_row_off upon commit.
+ * the new #batch_start upon commit.
*/
uint64_t latest_row_off;
/**
- * Offset where our current shard ends.
+ * Offset where our current shard begins (inclusive).
+ */
+ uint64_t shard_start;
+
+ /**
+ * Offset where our current shard ends (exclusive).
*/
uint64_t shard_end;
/**
+ * When did we start with the shard?
+ */
+ struct GNUNET_TIME_Absolute shard_start_time;
+
+ /**
+ * Name of our job in the shard table.
+ */
+ char *job_name;
+
+ /**
* How many transactions do we retrieve per batch?
*/
unsigned int batch_size;
/**
- * How many transactions did we see in the current batch?
+ * How much do we incremnt @e batch_size on success?
*/
- unsigned int current_batch_size;
+ unsigned int batch_increment;
/**
- * Are we running from scratch and should re-process all transactions
- * for this account?
+ * How many transactions did we see in the current batch?
*/
- bool reset_mode;
+ unsigned int current_batch_size;
/**
* Should we delay the next request to the wire plugin a bit? Set to
@@ -157,13 +170,29 @@ static struct TALER_EXCHANGEDB_Plugin *db_plugin;
/**
* How long should we sleep when idle before trying to find more work?
+ * Also used for how long we wait to grab a shard before trying it again.
+ * The value should be set to a bit above the average time it takes to
+ * process a shard.
*/
static struct GNUNET_TIME_Relative wirewatch_idle_sleep_interval;
/**
- * Modulus to apply to group shards.
+ * How long did we take to finish the last shard?
*/
-static unsigned int shard_size = 1024;
+static struct GNUNET_TIME_Relative shard_delay;
+
+/**
+ * Modulus to apply to group shards. The shard size must ultimately be a
+ * multiple of the batch size. Thus, if this is not a multiple of the
+ * #MAXIMUM_BATCH_SIZE, the batch size will be set to the #shard_size.
+ */
+static unsigned int shard_size = MAXIMUM_BATCH_SIZE;
+
+/**
+ * How many workers should we plan our scheduling with?
+ */
+static unsigned int max_workers = 16;
+
/**
* Value to return from main(). 0 on success, non-zero on
@@ -187,11 +216,6 @@ static enum
static int test_mode;
/**
- * Are we running from scratch and should re-process all transactions?
- */
-static int reset_mode;
-
-/**
* Current task waiting for execution, if any.
*/
static struct GNUNET_SCHEDULER_Task *task;
@@ -221,6 +245,7 @@ shutdown_task (void *cls)
wa);
TALER_BANK_auth_free (&wa->auth);
GNUNET_free (wa->section_name);
+ GNUNET_free (wa->job_name);
GNUNET_free (wa);
}
}
@@ -263,7 +288,6 @@ add_account_cb (void *cls,
if (GNUNET_YES != ai->credit_enabled)
return; /* not enabled for us, skip */
wa = GNUNET_new (struct WireAccount);
- wa->reset_mode = reset_mode;
if (GNUNET_OK !=
TALER_BANK_auth_parse_cfg (cfg,
ai->section_name,
@@ -276,7 +300,12 @@ add_account_cb (void *cls,
return;
}
wa->section_name = GNUNET_strdup (ai->section_name);
- wa->batch_size = INITIAL_BATCH_SIZE;
+ GNUNET_asprintf (&wa->job_name,
+ "wirewatch-%s",
+ ai->section_name);
+ wa->batch_size = MAXIMUM_BATCH_SIZE;
+ if (0 != shard_size % wa->batch_size)
+ wa->batch_size = shard_size;
GNUNET_CONTAINER_DLL_insert (wa_head,
wa_tail,
wa);
@@ -334,6 +363,127 @@ find_transfers (void *cls);
/**
+ * We encountered a serialization error.
+ * Rollback the transaction and try again
+ *
+ * @param wa account we are transacting on
+ */
+static void
+handle_soft_error (struct WireAccount *wa)
+{
+ db_plugin->rollback (db_plugin->cls,
+ wa->session);
+ if (1 < wa->batch_size)
+ {
+ wa->batch_size /= 2;
+ wa->batch_increment = 0;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Reduced batch size to %llu due to serialization issue\n",
+ (unsigned long long) wa->batch_size);
+ }
+ GNUNET_assert (NULL == task);
+ task = GNUNET_SCHEDULER_add_now (&find_transfers,
+ NULL);
+}
+
+
+/**
+ * We are finished with the current transaction, try
+ * to commit and then schedule the next iteration.
+ *
+ * @param wa wire account to commit for
+ */
+static void
+do_commit (struct WireAccount *wa)
+{
+ enum GNUNET_DB_QueryStatus qs;
+
+ if (wa->shard_end <= wa->latest_row_off)
+ {
+ /* shard is complete, mark this as well */
+ qs = db_plugin->complete_shard (db_plugin->cls,
+ wa->session,
+ wa->job_name,
+ wa->shard_start,
+ wa->shard_end);
+ switch (qs)
+ {
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_break (0);
+ db_plugin->rollback (db_plugin->cls,
+ wa->session);
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Got DB soft error for complete_shard. Rolling back.\n");
+ handle_soft_error (wa);
+ return;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ /* already existed, ok, let's just continue */
+ break;
+ case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ /* normal case */
+ shard_delay = GNUNET_TIME_absolute_get_duration (wa->shard_start_time);
+
+ break;
+ }
+ }
+ qs = db_plugin->commit (db_plugin->cls,
+ wa->session);
+ switch (qs)
+ {
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_break (0);
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ /* reduce transaction size to reduce rollback probability */
+ handle_soft_error (wa);
+ return;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ /* normal case */
+ break;
+ }
+ /* transaction success, update #last_row_off */
+ wa->batch_start = wa->latest_row_off;
+ wa->session = NULL; /* should not be needed */
+ if (wa->batch_size < MAXIMUM_BATCH_SIZE)
+ {
+ wa->batch_increment++;
+ wa->batch_size = GNUNET_MIN (MAXIMUM_BATCH_SIZE,
+ wa->batch_size + wa->batch_increment);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Increasing batch size to %llu\n",
+ (unsigned long long) wa->batch_size);
+ }
+ if ( (wa->delay) &&
+ (test_mode) &&
+ (NULL == wa->next) )
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Shutdown due to test mode!\n");
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
+ if (wa->delay)
+ {
+ wa->delayed_until
+ = GNUNET_TIME_relative_to_absolute (wirewatch_idle_sleep_interval);
+ wa_pos = wa_pos->next;
+ if (NULL == wa_pos)
+ wa_pos = wa_head;
+ GNUNET_assert (NULL != wa_pos);
+ }
+ GNUNET_assert (NULL == task);
+ task = GNUNET_SCHEDULER_add_at (wa_pos->delayed_until,
+ &find_transfers,
+ NULL);
+}
+
+
+/**
* Callbacks of this type are used to serve the result of asking
* the bank for the transaction history.
*
@@ -370,89 +520,38 @@ history_cb (void *cls,
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"End of list. Committing progress!\n");
- qs = db_plugin->commit (db_plugin->cls,
- session);
- if (GNUNET_DB_STATUS_HARD_ERROR == qs)
- {
- GNUNET_SCHEDULER_shutdown ();
- return GNUNET_OK;
- }
- if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
- {
- /* reduce transaction size to reduce rollback probability */
- if (2 > wa->batch_size)
- {
- wa->batch_size /= 2;
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Reduced batch size to %llu due to serialization issue\n",
- (unsigned long long) wa->batch_size);
- }
- /* try again */
- GNUNET_assert (NULL == task);
- task = GNUNET_SCHEDULER_add_now (&find_transfers,
- NULL);
- return GNUNET_OK; /* will be ignored anyway */
- }
- GNUNET_break (0 <= qs);
- /* transaction success, update #last_row_off */
- wa->last_row_off = wa->latest_row_off;
- wa->latest_row_off = 0; /* should not be needed */
- wa->session = NULL; /* should not be needed */
- if (wa->batch_size < INITIAL_BATCH_SIZE)
- {
- wa->batch_size += 1;
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Increasing batch size to %llu\n",
- (unsigned long long) wa->batch_size);
- }
- if ( (wa->delay) &&
- (test_mode) &&
- (NULL == wa->next) )
- {
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Shutdown due to test mode!\n");
- GNUNET_SCHEDULER_shutdown ();
- return GNUNET_OK;
- }
- if (wa->delay)
- {
- wa->delayed_until
- = GNUNET_TIME_relative_to_absolute (wirewatch_idle_sleep_interval);
- wa_pos = wa_pos->next;
- if (NULL == wa_pos)
- wa_pos = wa_head;
- GNUNET_assert (NULL != wa_pos);
- }
- task = GNUNET_SCHEDULER_add_at (wa_pos->delayed_until,
- &find_transfers,
- NULL);
+ do_commit (wa);
return GNUNET_OK; /* will be ignored anyway */
}
+ if (serial_id < wa->latest_row_off)
+ {
+ /* we are done with the current shard, commit and stop this iteration! */
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Serial ID %llu not monotonic (got %llu before). Failing!\n",
+ (unsigned long long) serial_id,
+ (unsigned long long) wa->latest_row_off);
+ db_plugin->rollback (db_plugin->cls,
+ session);
+ GNUNET_SCHEDULER_shutdown ();
+ wa->hh = NULL;
+ return GNUNET_SYSERR;
+ }
+ if (serial_id > wa->shard_end)
+ {
+ /* we are done with the current shard, commit and stop this iteration! */
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Serial ID %llu past shard end at %llu, ending iteration early!\n",
+ (unsigned long long) serial_id,
+ (unsigned long long) wa->shard_end);
+ wa->latest_row_off = serial_id - 1;
+ do_commit (wa);
+ wa->hh = NULL;
+ return GNUNET_SYSERR;
+ }
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Adding wire transfer over %s with (hashed) subject `%s'\n",
TALER_amount2s (&details->amount),
TALER_B2S (&details->reserve_pub));
-
- /**
- * Debug block.
- */
-#if DEBUG_LOGGING
- {
- /** Should be 53, give 80 just to be extra conservative (and aligned). */
-#define PUBSIZE 80
- char wtid_s[PUBSIZE];
-
- GNUNET_break (NULL !=
- GNUNET_STRINGS_data_to_string (&details->reserve_pub,
- sizeof (details->reserve_pub),
- &wtid_s[0],
- PUBSIZE));
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Plain text subject (= reserve_pub): %s\n",
- wtid_s);
- }
-#endif
-
/* FIXME-PERFORMANCE: Consider using Postgres multi-valued insert here,
for up to 15x speed-up according to
https://dba.stackexchange.com/questions/224989/multi-row-insert-vs-transactional-single-row-inserts#225006
@@ -466,26 +565,27 @@ history_cb (void *cls,
details->debit_account_url,
wa->section_name,
serial_id);
- if (GNUNET_DB_STATUS_HARD_ERROR == qs)
+ switch (qs)
{
+ case GNUNET_DB_STATUS_HARD_ERROR:
GNUNET_break (0);
db_plugin->rollback (db_plugin->cls,
session);
GNUNET_SCHEDULER_shutdown ();
wa->hh = NULL;
return GNUNET_SYSERR;
- }
- if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Got DB soft error for reserves_in_insert. Rolling back.\n");
- db_plugin->rollback (db_plugin->cls,
- session);
+ handle_soft_error (wa);
wa->hh = NULL;
- GNUNET_assert (NULL == task);
- task = GNUNET_SCHEDULER_add_now (&find_transfers,
- NULL);
return GNUNET_SYSERR;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ /* already existed, ok, let's just continue */
+ break;
+ case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ /* normal case */
+ break;
}
wa->delay = false;
wa->latest_row_off = serial_id;
@@ -515,64 +615,77 @@ find_transfers (void *cls)
GNUNET_SCHEDULER_shutdown ();
return;
}
- db_plugin->preflight (db_plugin->cls,
- session);
- if (GNUNET_OK !=
- db_plugin->start (db_plugin->cls,
- session,
- "wirewatch check for incoming wire transfers"))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Failed to start database transaction!\n");
- global_ret = GR_DATABASE_TRANSACTION_BEGIN_FAIL;
- GNUNET_SCHEDULER_shutdown ();
- return;
- }
wa_pos->delay = true;
wa_pos->current_batch_size = 0; /* reset counter */
wa_pos->session = session;
- if (wa_pos->shard_end == wa_pos->last_row_off)
+ if (wa_pos->shard_end <= wa_pos->batch_start)
{
+ uint64_t start;
+ uint64_t end;
+ struct GNUNET_TIME_Relative delay;
/* advance to next shard */
- // FIXME: if other processes are running in parallel,
- // update 'last_row_off' to next free shard!
- wa_pos->shard_end = wa_pos->last_row_off + shard_size;
- }
- if (! wa_pos->reset_mode)
- {
- // FIXME: need good way to fetch
- // shard data here!
- qs = db_plugin->get_latest_reserve_in_reference (db_plugin->cls,
- session,
- wa_pos->section_name,
- &wa_pos->last_row_off);
- if (GNUNET_DB_STATUS_HARD_ERROR == qs)
+
+ delay.rel_value_us = GNUNET_CRYPTO_random_u64 (
+ GNUNET_CRYPTO_QUALITY_WEAK,
+ 4 * GNUNET_TIME_relative_max (
+ wirewatch_idle_sleep_interval,
+ GNUNET_TIME_relative_multiply (shard_delay,
+ max_workers)).rel_value_us);
+ qs = db_plugin->begin_shard (db_plugin->cls,
+ wa_pos->job_name,
+ delay,
+ shard_size,
+ &start,
+ &end);
+ switch (qs)
{
+ case GNUNET_DB_STATUS_HARD_ERROR:
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to obtain starting point for montoring from database!\n");
- db_plugin->rollback (db_plugin->cls,
- session);
global_ret = GR_DATABASE_SELECT_LATEST_HARD_FAIL;
GNUNET_SCHEDULER_shutdown ();
return;
- }
- if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
- {
+ case GNUNET_DB_STATUS_SOFT_ERROR:
/* try again */
- db_plugin->rollback (db_plugin->cls,
- session);
- task = GNUNET_SCHEDULER_add_now (&find_transfers,
- NULL);
+ task = GNUNET_SCHEDULER_add_delayed (wirewatch_idle_sleep_interval,
+ &find_transfers,
+ NULL);
return;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ GNUNET_break (0);
+ task = GNUNET_SCHEDULER_add_delayed (wirewatch_idle_sleep_interval,
+ &find_transfers,
+ NULL);
+ return;
+ case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ wa_pos->shard_start_time = GNUNET_TIME_absolute_get ();
+ wa_pos->shard_start = start;
+ wa_pos->shard_end = end;
+ wa_pos->batch_start = start;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Starting with shard at %llu\n",
+ (unsigned long long) start);
+ break;
}
}
- wa_pos->reset_mode = true;
+ if (GNUNET_OK !=
+ db_plugin->start (db_plugin->cls,
+ session,
+ "wirewatch check for incoming wire transfers"))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to start database transaction!\n");
+ global_ret = GR_DATABASE_TRANSACTION_BEGIN_FAIL;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
limit = GNUNET_MIN (wa_pos->batch_size,
- wa_pos->shard_end - wa_pos->last_row_off);
+ wa_pos->shard_end - wa_pos->batch_start);
GNUNET_assert (NULL == wa_pos->hh);
+ wa_pos->latest_row_off = wa_pos->batch_start;
wa_pos->hh = TALER_BANK_credit_history (ctx,
&wa_pos->auth,
- wa_pos->last_row_off,
+ wa_pos->batch_start,
limit,
&history_cb,
wa_pos);
@@ -644,10 +757,6 @@ main (int argc,
char *const *argv)
{
struct GNUNET_GETOPT_CommandLineOption options[] = {
- GNUNET_GETOPT_option_flag ('r',
- "reset",
- "start fresh with all transactions in the history",
- &reset_mode),
GNUNET_GETOPT_option_uint ('S',
"size",
"SIZE",
@@ -659,6 +768,11 @@ main (int argc,
"test",
"run in test mode and exit when idle",
&test_mode),
+ GNUNET_GETOPT_option_uint ('w',
+ "workers",
+ "COUNT",
+ "Plan work load with up to COUNT worker processes (default: 16)",
+ &max_workers),
GNUNET_GETOPT_OPTION_END
};
enum GNUNET_GenericReturnValue ret;
diff --git a/src/exchangedb/drop0002.sql b/src/exchangedb/drop0002.sql
index 03870e63..5bffab66 100644
--- a/src/exchangedb/drop0002.sql
+++ b/src/exchangedb/drop0002.sql
@@ -27,6 +27,7 @@ DROP TABLE IF EXISTS auditor_denom_sigs CASCADE;
DROP TABLE IF EXISTS exchange_sign_keys CASCADE;
DROP TABLE IF EXISTS wire_accounts CASCADE;
DROP TABLE IF EXISTS signkey_revocations CASCADE;
+DROP TABLE IF EXISTS work_shards CASCADE;
-- And we're out of here...
COMMIT;
diff --git a/src/exchangedb/exchange-0001.sql b/src/exchangedb/exchange-0001.sql
index 1f7e005e..55d3d07d 100644
--- a/src/exchangedb/exchange-0001.sql
+++ b/src/exchangedb/exchange-0001.sql
@@ -389,6 +389,9 @@ COMMENT ON TABLE recoup
COMMENT ON COLUMN recoup.coin_pub
IS 'Do not CASCADE ON DROP on the coin_pub, as we may keep the coin alive!';
+-- Note: this first index is redundant;
+-- It is implicitly removed by the exchange-0002.sql
+-- schema changes.
CREATE INDEX IF NOT EXISTS recoup_by_coin_index
ON recoup
(coin_pub);
@@ -415,6 +418,8 @@ CREATE TABLE IF NOT EXISTS recoup_refresh
COMMENT ON COLUMN recoup_refresh.coin_pub
IS 'Do not CASCADE ON DROP on the coin_pub, as we may keep the coin alive!';
+-- Note: this index is redundant; implicitly removed
+-- by the exchange-0002.sql update!
CREATE INDEX IF NOT EXISTS recoup_refresh_by_coin_index
ON recoup_refresh
(coin_pub);
diff --git a/src/exchangedb/exchange-0002.sql b/src/exchangedb/exchange-0002.sql
index 361b69b8..175ffb39 100644
--- a/src/exchangedb/exchange-0002.sql
+++ b/src/exchangedb/exchange-0002.sql
@@ -80,7 +80,9 @@ UPDATE reserves_in
ALTER TABLE reserves_in
ALTER COLUMN reserve_uuid SET NOT NULL;
ALTER TABLE reserves_in
- DROP COLUMN reserve_pub;
+ DROP COLUMN reserve_pub,
+ ADD CONSTRAINT unique_in PRIMARY KEY (reserve_uuid, wire_reference);
+
ALTER TABLE reserves_out
ADD COLUMN reserve_uuid INT8 REFERENCES reserves (reserve_uuid) ON DELETE CASCADE;
UPDATE reserves_out
@@ -93,6 +95,12 @@ ALTER TABLE reserves_out
DROP COLUMN reserve_pub;
ALTER TABLE reserves_close
ADD COLUMN reserve_uuid INT8 REFERENCES reserves (reserve_uuid) ON DELETE CASCADE;
+CREATE INDEX IF NOT EXISTS reserves_out_reserve_uuid_index
+ ON reserves_out
+ (reserve_uuid);
+COMMENT ON INDEX reserves_out_reserve_uuid_index
+ IS 'for get_reserves_out';
+
UPDATE reserves_close
SET reserve_uuid=r.reserve_uuid
FROM reserves_close rclose
@@ -101,6 +109,11 @@ ALTER TABLE reserves_close
ALTER COLUMN reserve_uuid SET NOT NULL;
ALTER TABLE reserves_close
DROP COLUMN reserve_pub;
+CREATE INDEX IF NOT EXISTS reserves_close_by_uuid
+ ON reserves_close
+ (reserve_uuid);
+
+
-- change all foreign keys using 'denom_pub_hash' to using 'denominations_serial' instead
ALTER TABLE reserves_out
@@ -113,6 +126,11 @@ ALTER TABLE reserves_out
ALTER COLUMN denominations_serial SET NOT NULL;
ALTER TABLE reserves_out
DROP COLUMN denom_pub_hash;
+CREATE INDEX IF NOT EXISTS reserves_out_for_get_withdraw_info
+ ON reserves_out
+ (denominations_serial
+ ,h_blind_ev
+ );
ALTER TABLE known_coins
ADD COLUMN denominations_serial INT8 REFERENCES denominations (denominations_serial) ON DELETE CASCADE;
@@ -124,6 +142,9 @@ ALTER TABLE known_coins
ALTER COLUMN denominations_serial SET NOT NULL;
ALTER TABLE known_coins
DROP COLUMN denom_pub_hash;
+CREATE INDEX IF NOT EXISTS known_coins_by_denomination
+ ON known_coins
+ (denominations_serial);
ALTER TABLE denomination_revocations
ADD COLUMN denominations_serial INT8 REFERENCES denominations (denominations_serial) ON DELETE CASCADE;
@@ -137,6 +158,9 @@ ALTER TABLE denomination_revocations
DROP COLUMN denom_pub_hash;
ALTER TABLE denomination_revocations
ADD CONSTRAINT denominations_serial_pk PRIMARY KEY (denominations_serial);
+CREATE INDEX IF NOT EXISTS denomination_revocations_by_denomination
+ ON denomination_revocations
+ (denominations_serial);
ALTER TABLE refresh_revealed_coins
ADD COLUMN denominations_serial INT8 REFERENCES denominations (denominations_serial) ON DELETE CASCADE;
@@ -148,6 +172,9 @@ ALTER TABLE refresh_revealed_coins
ALTER COLUMN denominations_serial SET NOT NULL;
ALTER TABLE refresh_revealed_coins
DROP COLUMN denom_pub_hash;
+CREATE INDEX IF NOT EXISTS refresh_revealed_coins_denominations_index
+ ON refresh_revealed_coins
+ (denominations_serial);
-- Change all foreign keys involving 'coin_pub' to use known_coin_id instead.
ALTER TABLE recoup_refresh
@@ -161,6 +188,7 @@ ALTER TABLE recoup_refresh
ALTER TABLE recoup_refresh
DROP COLUMN coin_pub;
+
ALTER TABLE recoup
ADD COLUMN known_coin_id INT8 REFERENCES known_coins (known_coin_id) ON DELETE CASCADE;
UPDATE recoup
@@ -172,6 +200,7 @@ ALTER TABLE recoup
ALTER TABLE recoup
DROP COLUMN coin_pub;
+
ALTER TABLE refresh_commitments
ADD COLUMN old_known_coin_id INT8 REFERENCES known_coins (known_coin_id) ON DELETE CASCADE;
UPDATE refresh_commitments
@@ -182,6 +211,10 @@ ALTER TABLE refresh_commitments
ALTER COLUMN old_known_coin_id SET NOT NULL;
ALTER TABLE refresh_commitments
DROP COLUMN old_coin_pub;
+CREATE INDEX IF NOT EXISTS refresh_commitments_old_coin_pub_index
+ ON refresh_commitments
+ (old_known_coin_id);
+
ALTER TABLE deposits
ADD COLUMN known_coin_id INT8 REFERENCES known_coins (known_coin_id) ON DELETE CASCADE;
@@ -190,7 +223,8 @@ UPDATE deposits
FROM deposits o
INNER JOIN known_coins d USING(coin_pub);
ALTER TABLE deposits
- ALTER COLUMN known_coin_id SET NOT NULL;
+ ALTER COLUMN known_coin_id SET NOT NULL,
+ ADD CONSTRAINT deposit_unique UNIQUE (known_coin_id, merchant_pub, h_contract_terms);
ALTER TABLE deposits
DROP COLUMN coin_pub;
@@ -216,6 +250,16 @@ ALTER TABLE recoup
ALTER COLUMN reserve_out_serial_id SET NOT NULL;
ALTER TABLE recoup
DROP COLUMN h_blind_ev;
+CREATE INDEX IF NOT EXISTS recoup_by_h_blind_ev
+ ON recoup
+ (reserve_out_serial_id);
+CREATE INDEX IF NOT EXISTS recoup_for_by_reserve
+ ON recoup
+ (known_coin_id
+ ,reserve_out_serial_id
+ );
+
+
COMMENT ON COLUMN recoup.reserve_out_serial_id
IS 'Identifies the h_blind_ev of the recouped coin.';
@@ -228,11 +272,20 @@ UPDATE recoup_refresh
FROM recoup_refresh o
INNER JOIN refresh_revealed_coins d ON (d.h_coin_ev = o.h_blind_ev);
ALTER TABLE recoup_refresh
- ALTER COLUMN rrc_serial SET NOT NULL;
+ ALTER COLUMN rrc_serial SET NOT NULL,
+ ADD CONSTRAINT recoup_unique UNIQUE (rrc_serial);
ALTER TABLE recoup_refresh
DROP COLUMN h_blind_ev;
COMMENT ON COLUMN recoup_refresh.rrc_serial
IS 'Identifies the h_blind_ev of the recouped coin (as h_coin_ev).';
+CREATE INDEX IF NOT EXISTS recoup_refresh_by_h_blind_ev
+ ON recoup_refresh
+ (rrc_serial);
+CREATE INDEX IF NOT EXISTS recoup_refresh_for_by_reserve
+ ON recoup_refresh
+ (known_coin_id
+ ,rrc_serial
+ );
-- Change 'rc' in refresh_transfer_keys and refresh_revealed_coins tables to 'melt_serial_id'
@@ -248,6 +301,14 @@ ALTER TABLE refresh_transfer_keys
DROP COLUMN rc;
COMMENT ON COLUMN refresh_transfer_keys.melt_serial_id
IS 'Identifies the refresh commitment (rc) of the operation.';
+CREATE INDEX IF NOT EXISTS refresh_transfer_keys_coin_tpub
+ ON refresh_transfer_keys
+ (melt_serial_id
+ ,transfer_pub
+ );
+COMMENT ON INDEX refresh_transfer_keys_coin_tpub
+ IS 'for get_link (unsure if this helps or hurts for performance as there should be very few transfer public keys per rc, but at least in theory this helps the ORDER BY clause)';
+
ALTER TABLE refresh_revealed_coins
ADD COLUMN melt_serial_id INT8 REFERENCES refresh_commitments (melt_serial_id) ON DELETE CASCADE;
@@ -280,6 +341,8 @@ ALTER TABLE refunds
DROP COLUMN merchant_pub,
DROP COLUMN h_contract_terms,
DROP COLUMN known_coin_id;
+ALTER TABLE refunds
+ ADD CONSTRAINT refunds_primary_key PRIMARY KEY (deposit_serial_id, rtransaction_id);
COMMENT ON COLUMN refunds.deposit_serial_id
IS 'Identifies ONLY the merchant_pub, h_contract_terms and known_coin_id. Multiple deposits may match a refund, this only identifies one of them.';
@@ -380,7 +443,7 @@ CREATE TABLE IF NOT EXISTS work_shards
,last_attempt INT8 NOT NULL
,start_row INT8 NOT NULL
,end_row INT8 NOT NULL
- ,completed BOOLEAN NOT NULL
+ ,completed BOOLEAN NOT NULL DEFAULT FALSE
,job_name VARCHAR NOT NULL
,PRIMARY KEY (job_name, start_row)
);
diff --git a/src/exchangedb/plugin_exchangedb_postgres.c b/src/exchangedb/plugin_exchangedb_postgres.c
index e61a1ac7..1ab5ff3e 100644
--- a/src/exchangedb/plugin_exchangedb_postgres.c
+++ b/src/exchangedb/plugin_exchangedb_postgres.c
@@ -424,7 +424,8 @@ postgres_get_session (void *cls)
",gc_date"
" FROM reserves"
" WHERE reserve_pub=$1"
- " LIMIT 1;",
+ " LIMIT 1"
+ ";", // FOR UPDATE;", // FIXME: helpful?
1),
/* Used in #postgres_reserves_in_insert() when the reserve is new */
GNUNET_PQ_make_prepare ("reserve_create",
@@ -2463,7 +2464,6 @@ postgres_get_session (void *cls)
" end_row"
" FROM work_shards"
" WHERE job_name=$1"
- " AND completed=FALSE"
" ORDER BY end_row DESC"
" LIMIT 1;",
1),
@@ -3529,7 +3529,8 @@ postgres_reserves_in_insert (void *cls,
balance; we do this after checking for duplication, as
otherwise we might have to actually pay the cost to roll this
back for duplicate transactions; like this, we should virtually
- never actually have to rollback anything. */struct TALER_EXCHANGEDB_Reserve updated_reserve;
+ never actually have to rollback anything. */
+ struct TALER_EXCHANGEDB_Reserve updated_reserve;
updated_reserve.pub = reserve.pub;
if (0 >
@@ -10356,6 +10357,10 @@ postgres_begin_shard (void *cls,
};
now = GNUNET_TIME_absolute_get ();
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Trying to claim shard %llu-%llu\n",
+ (unsigned long long) *start_row,
+ (unsigned long long) *end_row);
qs = GNUNET_PQ_eval_prepared_non_select (session->conn,
"claim_next_shard",
params);
@@ -10374,7 +10379,8 @@ postgres_begin_shard (void *cls,
/* continued below */
break;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
- GNUNET_break (0);
+ /* someone else got this shard already,
+ try again */
postgres_rollback (cls,
session);
continue;
@@ -10434,6 +10440,10 @@ postgres_complete_shard (void *cls,
};
(void) cls;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Completing shard %llu-%llu\n",
+ (unsigned long long) start_row,
+ (unsigned long long) end_row);
return GNUNET_PQ_eval_prepared_non_select (session->conn,
"complete_shard",
params);
diff --git a/src/include/taler_testing_lib.h b/src/include/taler_testing_lib.h
index b35e393d..57bf7406 100644
--- a/src/include/taler_testing_lib.h
+++ b/src/include/taler_testing_lib.h
@@ -173,6 +173,18 @@ TALER_TESTING_wait_exchange_ready (const char *base_url);
/**
+ * Wait for an HTTPD service to have started. Waits for at
+ * most 10s, after that returns 77 to indicate an error.
+ *
+ * @param base_url what URL should we expect the exchange
+ * to be running at
+ * @return 0 on success
+ */
+int
+TALER_TESTING_wait_httpd_ready (const char *base_url);
+
+
+/**
* Wait for the auditor to have started. Waits for at
* most 10s, after that returns 77 to indicate an error.
*
diff --git a/src/testing/testing_api_helpers_exchange.c b/src/testing/testing_api_helpers_exchange.c
index 735abcc1..3fa2d416 100644
--- a/src/testing/testing_api_helpers_exchange.c
+++ b/src/testing/testing_api_helpers_exchange.c
@@ -446,14 +446,6 @@ TALER_TESTING_find_pk (const struct TALER_EXCHANGE_Keys *keys,
}
-/**
- * Wait for the exchange to have started. Waits for at
- * most 10s, after that returns 77 to indicate an error.
- *
- * @param base_url what URL should we expect the exchange
- * to be running at
- * @return 0 on success
- */
int
TALER_TESTING_wait_exchange_ready (const char *base_url)
{
@@ -464,20 +456,51 @@ TALER_TESTING_wait_exchange_ready (const char *base_url)
"wget -q -t 1 -T 1 %sseed -o /dev/null -O /dev/null",
base_url); // make sure ends with '/'
/* give child time to start and bind against the socket */
- fprintf (stderr,
- "Waiting for `taler-exchange-httpd' to be ready (check with: %s)\n",
- wget_cmd);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Waiting for `taler-exchange-httpd` service to be ready (check with: %s)\n",
+ wget_cmd);
iter = 0;
do
{
if (10 == iter)
{
- fprintf (stderr,
- "Failed to launch `taler-exchange-httpd' (or `wget')\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Failed to launch `taler-exchange-httpd` service (or `wget')\n");
+ GNUNET_free (wget_cmd);
+ return 77;
+ }
+ sleep (1);
+ iter++;
+ }
+ while (0 != system (wget_cmd));
+ GNUNET_free (wget_cmd);
+ return 0;
+}
+
+
+int
+TALER_TESTING_wait_httpd_ready (const char *base_url)
+{
+ char *wget_cmd;
+ unsigned int iter;
+
+ GNUNET_asprintf (&wget_cmd,
+ "wget -q -t 1 -T 1 %s -o /dev/null -O /dev/null",
+ base_url); // make sure ends with '/'
+ /* give child time to start and bind against the socket */
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Waiting for HTTP service to be ready (check with: %s)\n",
+ wget_cmd);
+ iter = 0;
+ do
+ {
+ if (10 == iter)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Failed to launch HTTP service (or `wget')\n");
GNUNET_free (wget_cmd);
return 77;
}
- fprintf (stderr, ".\n");
sleep (1);
iter++;
}