summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2022-05-22 13:48:56 +0200
committerChristian Grothoff <christian@grothoff.org>2022-05-22 13:48:56 +0200
commit21bcc5fa0bb4e2c101fc71d5740934d5914eb480 (patch)
tree73c83681219894b607b97b8133331ecd0462e04f
parent3233195d2d6c4733e6c98e754c54902f9c6d657c (diff)
downloadexchange-21bcc5fa0bb4e2c101fc71d5740934d5914eb480.tar.gz
exchange-21bcc5fa0bb4e2c101fc71d5740934d5914eb480.tar.bz2
exchange-21bcc5fa0bb4e2c101fc71d5740934d5914eb480.zip
-fix wirewatch assertion
-rw-r--r--src/benchmark/taler-bank-benchmark.c125
-rw-r--r--src/exchange/taler-exchange-wirewatch.c40
-rw-r--r--src/testing/testing_api_loop.c5
3 files changed, 124 insertions, 46 deletions
diff --git a/src/benchmark/taler-bank-benchmark.c b/src/benchmark/taler-bank-benchmark.c
index 4d7dbe35e..75a7434d2 100644
--- a/src/benchmark/taler-bank-benchmark.c
+++ b/src/benchmark/taler-bank-benchmark.c
@@ -111,9 +111,9 @@ static char *cfg_filename;
static int use_fakebank = 1;
/**
- * Launch taler-exchange-wirewatch.
+ * Number of taler-exchange-wirewatchers to launch.
*/
-static int start_wirewatch;
+static unsigned int start_wirewatch;
/**
* Verbosity level.
@@ -265,8 +265,9 @@ run (void *cls,
(void) cls;
len = howmany_reserves + 2;
- all_commands = GNUNET_new_array (len,
- struct TALER_TESTING_Command);
+ all_commands = GNUNET_malloc_large (len
+ * sizeof (struct TALER_TESTING_Command));
+ GNUNET_assert (NULL != all_commands);
GNUNET_asprintf (&total_reserve_amount,
"%s:5",
currency);
@@ -465,14 +466,17 @@ launch_fakebank (void *cls)
*
* @return #GNUNET_OK on success
*/
-static int
+static enum GNUNET_GenericReturnValue
parallel_benchmark (void)
{
enum GNUNET_GenericReturnValue result = GNUNET_OK;
pid_t fakebank = -1;
struct GNUNET_OS_Process *bankd = NULL;
- struct GNUNET_OS_Process *wirewatch = NULL;
+ struct GNUNET_OS_Process *wirewatch[GNUNET_NZL (start_wirewatch)];
+ memset (wirewatch,
+ 0,
+ sizeof (wirewatch));
if ( (MODE_BANK == mode) ||
(MODE_BOTH == mode) )
{
@@ -560,19 +564,30 @@ parallel_benchmark (void)
GNUNET_OS_process_wait (dbinit));
GNUNET_OS_process_destroy (dbinit);
}
- if (start_wirewatch)
+ /* start exchange wirewatch */
+ for (unsigned int w = 0; w<start_wirewatch; w++)
{
- /* start exchange wirewatch */
- wirewatch = GNUNET_OS_start_process (GNUNET_OS_INHERIT_STD_ALL,
- NULL, NULL, NULL,
- "taler-exchange-wirewatch",
- "taler-exchange-wirewatch",
- "-c", cfg_filename,
- NULL);
- if (NULL == wirewatch)
+ wirewatch[w] = GNUNET_OS_start_process (GNUNET_OS_INHERIT_STD_ALL,
+ NULL, NULL, NULL,
+ "taler-exchange-wirewatch",
+ "taler-exchange-wirewatch",
+ "-c", cfg_filename,
+ "-L", loglev,
+ NULL);
+ if (NULL == wirewatch[w])
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to launch wirewatch, aborting benchmark\n");
+ for (unsigned int x = 0; x<w; x++)
+ {
+ GNUNET_break (0 ==
+ GNUNET_OS_process_kill (wirewatch[x],
+ SIGTERM));
+ GNUNET_break (GNUNET_OK ==
+ GNUNET_OS_process_wait (wirewatch[x]));
+ GNUNET_OS_process_destroy (wirewatch[x]);
+ wirewatch[x] = NULL;
+ }
if (-1 != fakebank)
{
int wstatus;
@@ -618,17 +633,61 @@ parallel_benchmark (void)
if ( (MODE_BANK == mode) ||
(MODE_BOTH == mode) )
{
- if (NULL != wirewatch)
+ /* Ensure wirewatch runs to completion! */
+ if (0 != start_wirewatch)
{
- /* stop wirewatch */
+ /* replace ONE of the wirewatchers with one that is in test-mode */
GNUNET_break (0 ==
- GNUNET_OS_process_kill (wirewatch,
+ GNUNET_OS_process_kill (wirewatch[0],
SIGTERM));
GNUNET_break (GNUNET_OK ==
- GNUNET_OS_process_wait (wirewatch));
- GNUNET_OS_process_destroy (wirewatch);
- wirewatch = NULL;
+ GNUNET_OS_process_wait (wirewatch[0]));
+ GNUNET_OS_process_destroy (wirewatch[0]);
+ wirewatch[0] = GNUNET_OS_start_process (GNUNET_OS_INHERIT_STD_ALL,
+ NULL, NULL, NULL,
+ "taler-exchange-wirewatch",
+ "taler-exchange-wirewatch",
+ "-c", cfg_filename,
+ "-L", loglev,
+ "-t",
+ NULL);
+ /* wait for it to finish! */
+ GNUNET_break (GNUNET_OK ==
+ GNUNET_OS_process_wait (wirewatch[0]));
+ GNUNET_OS_process_destroy (wirewatch[0]);
+ wirewatch[0] = NULL;
+ /* Then stop the rest, which should basically also be finished */
+ for (unsigned int w = 1; w<start_wirewatch; w++)
+ {
+ GNUNET_break (0 ==
+ GNUNET_OS_process_kill (wirewatch[w],
+ SIGTERM));
+ GNUNET_break (GNUNET_OK ==
+ GNUNET_OS_process_wait (wirewatch[w]));
+ GNUNET_OS_process_destroy (wirewatch[w]);
+ }
+
+ /* But be extra sure we did finish all shards by doing one more */
+ wirewatch[0] = GNUNET_OS_start_process (GNUNET_OS_INHERIT_STD_ALL,
+ NULL, NULL, NULL,
+ "taler-exchange-wirewatch",
+ "taler-exchange-wirewatch",
+ "-c", cfg_filename,
+ "-L", loglev,
+ "-t",
+ NULL);
+ /* wait for it to finish! */
+ GNUNET_break (GNUNET_OK ==
+ GNUNET_OS_process_wait (wirewatch[0]));
+ GNUNET_OS_process_destroy (wirewatch[0]);
+ wirewatch[0] = NULL;
}
+
+ /* Now stop the time, if this was the right mode */
+ if ( (GNUNET_YES != linger) &&
+ (MODE_BANK != mode) )
+ duration = GNUNET_TIME_absolute_get_duration (start_time);
+
/* stop fakebank */
if (-1 != fakebank)
{
@@ -727,9 +786,10 @@ main (int argc,
&history_size),
GNUNET_GETOPT_option_version (PACKAGE_VERSION " " VCS_VERSION),
GNUNET_GETOPT_option_verbose (&verbose),
- GNUNET_GETOPT_option_flag ('w',
+ GNUNET_GETOPT_option_uint ('w',
"wirewatch",
- "run taler-exchange-wirewatch",
+ "NPROC",
+ "run NPROC taler-exchange-wirewatch processes",
&start_wirewatch),
GNUNET_GETOPT_OPTION_END
};
@@ -858,14 +918,17 @@ main (int argc,
howmany_clients,
GNUNET_STRINGS_relative_time_to_string (duration,
GNUNET_YES));
- tps = ((unsigned long long) howmany_reserves) * howmany_clients * 1000LLU
- / (duration.rel_value_us / 1000LL);
- fprintf (stdout,
- "RAW: %04u %04u %16llu (%llu TPS)\n",
- howmany_reserves,
- howmany_clients,
- (unsigned long long) duration.rel_value_us,
- tps);
+ if (! GNUNET_TIME_relative_is_zero (duration))
+ {
+ tps = ((unsigned long long) howmany_reserves) * howmany_clients * 1000LLU
+ / (duration.rel_value_us / 1000LL);
+ fprintf (stdout,
+ "RAW: %04u %04u %16llu (%llu TPS)\n",
+ howmany_reserves,
+ howmany_clients,
+ (unsigned long long) duration.rel_value_us,
+ tps);
+ }
fprintf (stdout,
"CPU time: sys %llu user %llu\n", \
(unsigned long long) (usage.ru_stime.tv_sec * 1000 * 1000
diff --git a/src/exchange/taler-exchange-wirewatch.c b/src/exchange/taler-exchange-wirewatch.c
index 21d2df150..7cc4ac382 100644
--- a/src/exchange/taler-exchange-wirewatch.c
+++ b/src/exchange/taler-exchange-wirewatch.c
@@ -384,10 +384,10 @@ handle_soft_error (struct WireAccount *wa)
"Reduced batch size to %llu due to serialization issue\n",
(unsigned long long) wa->batch_size);
}
- GNUNET_assert (NULL == task);
/* Reset to beginning of transaction, and go again
from there. */
wa->latest_row_off = wa->batch_start;
+ GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&continue_with_shard,
wa);
}
@@ -458,6 +458,7 @@ account_completed (struct WireAccount *wa)
= GNUNET_TIME_relative_to_absolute (wirewatch_idle_sleep_interval);
wa = wa->next;
}
+ GNUNET_assert (NULL == task);
schedule_transfers (wa);
}
@@ -533,6 +534,7 @@ do_commit (struct WireAccount *wa)
enum GNUNET_DB_QueryStatus qs;
bool shard_done;
+ GNUNET_assert (NULL == task);
shard_done = check_shard_done (wa);
wa->started_transaction = false;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
@@ -563,7 +565,8 @@ do_commit (struct WireAccount *wa)
if (shard_done)
account_completed (wa);
else
- continue_with_shard (wa);
+ task = GNUNET_SCHEDULER_add_now (&continue_with_shard,
+ wa);
}
@@ -591,6 +594,7 @@ history_cb (void *cls,
enum GNUNET_DB_QueryStatus qs;
(void) json;
+ GNUNET_assert (NULL == task);
if (NULL == details)
{
wa->hh = NULL;
@@ -660,14 +664,17 @@ history_cb (void *cls,
wa->hh = NULL;
if (wa->started_transaction)
{
+ GNUNET_assert (NULL == task);
do_commit (wa);
}
else
{
+ GNUNET_assert (NULL == task);
if (check_shard_done (wa))
account_completed (wa);
else
- continue_with_shard (wa);
+ task = GNUNET_SCHEDULER_add_now (&continue_with_shard,
+ wa);
}
return GNUNET_SYSERR;
}
@@ -746,6 +753,7 @@ continue_with_shard (void *cls)
struct WireAccount *wa = cls;
unsigned int limit;
+ task = NULL;
limit = GNUNET_MIN (wa->batch_size,
wa->shard_end - wa->latest_row_off);
wa->max_row_off = wa->latest_row_off + limit;
@@ -816,15 +824,18 @@ lock_shard (void *cls)
return;
case GNUNET_DB_STATUS_SOFT_ERROR:
/* try again */
- GNUNET_break (0);
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Serialization error tying to obtain shard %s, will try again in %s!\n",
- wa->job_name,
- GNUNET_STRINGS_relative_time_to_string (
- wirewatch_idle_sleep_interval,
- GNUNET_YES));
- wa->delayed_until = GNUNET_TIME_relative_to_absolute (
- wirewatch_idle_sleep_interval);
+ {
+ struct GNUNET_TIME_Relative rdelay;
+
+ rdelay = GNUNET_TIME_randomize (wirewatch_idle_sleep_interval);
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Serialization error tying to obtain shard %s, will try again in %s!\n",
+ wa->job_name,
+ GNUNET_STRINGS_relative_time_to_string (rdelay,
+ GNUNET_YES));
+ wa->delayed_until = GNUNET_TIME_relative_to_absolute (rdelay);
+ }
+ GNUNET_assert (NULL == task);
schedule_transfers (wa->next);
return;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
@@ -837,6 +848,7 @@ lock_shard (void *cls)
GNUNET_YES));
wa->delayed_until = GNUNET_TIME_relative_to_absolute (
wirewatch_idle_sleep_interval);
+ GNUNET_assert (NULL == task);
schedule_transfers (wa->next);
return;
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
@@ -854,7 +866,8 @@ lock_shard (void *cls)
we find out that we're really busy */
wa->batch_start = wa->shard_start;
wa->latest_row_off = wa->batch_start;
- continue_with_shard (wa);
+ task = GNUNET_SCHEDULER_add_now (&continue_with_shard,
+ wa);
}
@@ -894,6 +907,7 @@ run (void *cls,
return;
}
rc = GNUNET_CURL_gnunet_rc_create (ctx);
+ GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&lock_shard,
wa_head);
}
diff --git a/src/testing/testing_api_loop.c b/src/testing/testing_api_loop.c
index 1ea1d5a26..190e20928 100644
--- a/src/testing/testing_api_loop.c
+++ b/src/testing/testing_api_loop.c
@@ -449,8 +449,9 @@ TALER_TESTING_run2 (struct TALER_TESTING_Interpreter *is,
/* get the number of commands */
for (i = 0; NULL != commands[i].label; i++)
;
- is->commands = GNUNET_new_array (i + 1,
- struct TALER_TESTING_Command);
+ is->commands = GNUNET_malloc_large ( (i + 1)
+ * sizeof (struct TALER_TESTING_Command));
+ GNUNET_assert (NULL != is->commands);
memcpy (is->commands,
commands,
sizeof (struct TALER_TESTING_Command) * i);