diff options
Diffstat (limited to 'src/exchange/taler-exchange-wirewatch.c')
-rw-r--r-- | src/exchange/taler-exchange-wirewatch.c | 85 |
1 files changed, 50 insertions, 35 deletions
diff --git a/src/exchange/taler-exchange-wirewatch.c b/src/exchange/taler-exchange-wirewatch.c index 40b962f8a..760dbe10b 100644 --- a/src/exchange/taler-exchange-wirewatch.c +++ b/src/exchange/taler-exchange-wirewatch.c | |||
@@ -1,6 +1,6 @@ | |||
1 | /* | 1 | /* |
2 | This file is part of TALER | 2 | This file is part of TALER |
3 | Copyright (C) 2016--2020 Taler Systems SA | 3 | Copyright (C) 2016--2021 Taler Systems SA |
4 | 4 | ||
5 | TALER is free software; you can redistribute it and/or modify it under the | 5 | TALER is free software; you can redistribute it and/or modify it under the |
6 | terms of the GNU Affero General Public License as published by the Free Software | 6 | terms of the GNU Affero General Public License as published by the Free Software |
@@ -90,6 +90,11 @@ struct WireAccount | |||
90 | uint64_t latest_row_off; | 90 | uint64_t latest_row_off; |
91 | 91 | ||
92 | /** | 92 | /** |
93 | * Offset where our current shard ends. | ||
94 | */ | ||
95 | uint64_t shard_end; | ||
96 | |||
97 | /** | ||
93 | * How many transactions do we retrieve per batch? | 98 | * How many transactions do we retrieve per batch? |
94 | */ | 99 | */ |
95 | unsigned int batch_size; | 100 | unsigned int batch_size; |
@@ -103,19 +108,14 @@ struct WireAccount | |||
103 | * Are we running from scratch and should re-process all transactions | 108 | * Are we running from scratch and should re-process all transactions |
104 | * for this account? | 109 | * for this account? |
105 | */ | 110 | */ |
106 | int reset_mode; | 111 | bool reset_mode; |
107 | 112 | ||
108 | /** | 113 | /** |
109 | * Should we delay the next request to the wire plugin a bit? Set to | 114 | * Should we delay the next request to the wire plugin a bit? Set to |
110 | * #GNUNET_NO if we actually did some work. | 115 | * false if we actually did some work. |
111 | */ | 116 | */ |
112 | int delay; | 117 | bool delay; |
113 | 118 | ||
114 | /** | ||
115 | * Did we experience a soft failure during the current | ||
116 | * transaction? | ||
117 | */ | ||
118 | bool soft_fail; | ||
119 | }; | 119 | }; |
120 | 120 | ||
121 | 121 | ||
@@ -161,6 +161,11 @@ static struct TALER_EXCHANGEDB_Plugin *db_plugin; | |||
161 | static struct GNUNET_TIME_Relative wirewatch_idle_sleep_interval; | 161 | static struct GNUNET_TIME_Relative wirewatch_idle_sleep_interval; |
162 | 162 | ||
163 | /** | 163 | /** |
164 | * Modulus to apply to group shards. | ||
165 | */ | ||
166 | static unsigned int shard_size = 1024; | ||
167 | |||
168 | /** | ||
164 | * Value to return from main(). 0 on success, non-zero on | 169 | * Value to return from main(). 0 on success, non-zero on |
165 | * on serious errors. | 170 | * on serious errors. |
166 | */ | 171 | */ |
@@ -363,20 +368,10 @@ history_cb (void *cls, | |||
363 | (unsigned int) ec, | 368 | (unsigned int) ec, |
364 | http_status); | 369 | http_status); |
365 | } | 370 | } |
366 | if (wa->soft_fail) | 371 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
367 | { | 372 | "End of list. Committing progress!\n"); |
368 | /* no point to commit, transaction was already rolled | 373 | qs = db_plugin->commit (db_plugin->cls, |
369 | back after we encountered a soft failure */ | 374 | session); |
370 | wa->soft_fail = false; | ||
371 | qs = GNUNET_DB_STATUS_SOFT_ERROR; | ||
372 | } | ||
373 | else | ||
374 | { | ||
375 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
376 | "End of list. Committing progress!\n"); | ||
377 | qs = db_plugin->commit (db_plugin->cls, | ||
378 | session); | ||
379 | } | ||
380 | if (GNUNET_DB_STATUS_HARD_ERROR == qs) | 375 | if (GNUNET_DB_STATUS_HARD_ERROR == qs) |
381 | { | 376 | { |
382 | GNUNET_SCHEDULER_shutdown (); | 377 | GNUNET_SCHEDULER_shutdown (); |
@@ -410,7 +405,7 @@ history_cb (void *cls, | |||
410 | "Increasing batch size to %llu\n", | 405 | "Increasing batch size to %llu\n", |
411 | (unsigned long long) wa->batch_size); | 406 | (unsigned long long) wa->batch_size); |
412 | } | 407 | } |
413 | if ( (GNUNET_YES == wa->delay) && | 408 | if ( (wa->delay) && |
414 | (test_mode) && | 409 | (test_mode) && |
415 | (NULL == wa->next) ) | 410 | (NULL == wa->next) ) |
416 | { | 411 | { |
@@ -419,7 +414,7 @@ history_cb (void *cls, | |||
419 | GNUNET_SCHEDULER_shutdown (); | 414 | GNUNET_SCHEDULER_shutdown (); |
420 | return GNUNET_OK; | 415 | return GNUNET_OK; |
421 | } | 416 | } |
422 | if (GNUNET_YES == wa->delay) | 417 | if (wa->delay) |
423 | { | 418 | { |
424 | wa->delayed_until | 419 | wa->delayed_until |
425 | = GNUNET_TIME_relative_to_absolute (wirewatch_idle_sleep_interval); | 420 | = GNUNET_TIME_relative_to_absolute (wirewatch_idle_sleep_interval); |
@@ -477,6 +472,7 @@ history_cb (void *cls, | |||
477 | db_plugin->rollback (db_plugin->cls, | 472 | db_plugin->rollback (db_plugin->cls, |
478 | session); | 473 | session); |
479 | GNUNET_SCHEDULER_shutdown (); | 474 | GNUNET_SCHEDULER_shutdown (); |
475 | wa->hh = NULL; | ||
480 | return GNUNET_SYSERR; | 476 | return GNUNET_SYSERR; |
481 | } | 477 | } |
482 | if (GNUNET_DB_STATUS_SOFT_ERROR == qs) | 478 | if (GNUNET_DB_STATUS_SOFT_ERROR == qs) |
@@ -485,10 +481,13 @@ history_cb (void *cls, | |||
485 | "Got DB soft error for reserves_in_insert. Rolling back.\n"); | 481 | "Got DB soft error for reserves_in_insert. Rolling back.\n"); |
486 | db_plugin->rollback (db_plugin->cls, | 482 | db_plugin->rollback (db_plugin->cls, |
487 | session); | 483 | session); |
488 | wa->soft_fail = true; | 484 | wa->hh = NULL; |
485 | GNUNET_assert (NULL == task); | ||
486 | task = GNUNET_SCHEDULER_add_now (&find_transfers, | ||
487 | NULL); | ||
489 | return GNUNET_SYSERR; | 488 | return GNUNET_SYSERR; |
490 | } | 489 | } |
491 | wa->delay = GNUNET_NO; | 490 | wa->delay = false; |
492 | wa->latest_row_off = serial_id; | 491 | wa->latest_row_off = serial_id; |
493 | return GNUNET_OK; | 492 | return GNUNET_OK; |
494 | } | 493 | } |
@@ -504,6 +503,7 @@ find_transfers (void *cls) | |||
504 | { | 503 | { |
505 | struct TALER_EXCHANGEDB_Session *session; | 504 | struct TALER_EXCHANGEDB_Session *session; |
506 | enum GNUNET_DB_QueryStatus qs; | 505 | enum GNUNET_DB_QueryStatus qs; |
506 | unsigned int limit; | ||
507 | 507 | ||
508 | (void) cls; | 508 | (void) cls; |
509 | task = NULL; | 509 | task = NULL; |
@@ -555,13 +555,21 @@ find_transfers (void *cls) | |||
555 | } | 555 | } |
556 | wa_pos->reset_mode = GNUNET_NO; | 556 | wa_pos->reset_mode = GNUNET_NO; |
557 | } | 557 | } |
558 | wa_pos->delay = GNUNET_YES; | 558 | wa_pos->delay = true; |
559 | wa_pos->current_batch_size = 0; /* reset counter */ | 559 | wa_pos->current_batch_size = 0; /* reset counter */ |
560 | wa_pos->session = session; | 560 | wa_pos->session = session; |
561 | if (wa_pos->shard_end == wa_pos->last_row_off) | ||
562 | { | ||
563 | /* advance to next shard */ | ||
564 | wa_pos->shard_end += shard_size; | ||
565 | } | ||
566 | limit = GNUNET_MIN (wa_pos->batch_size, | ||
567 | wa_pos->shard_end - wa_pos->last_row_off); | ||
568 | GNUNET_assert (NULL == wa_pos->hh); | ||
561 | wa_pos->hh = TALER_BANK_credit_history (ctx, | 569 | wa_pos->hh = TALER_BANK_credit_history (ctx, |
562 | &wa_pos->auth, | 570 | &wa_pos->auth, |
563 | wa_pos->last_row_off, | 571 | wa_pos->last_row_off, |
564 | wa_pos->batch_size, | 572 | limit, |
565 | &history_cb, | 573 | &history_cb, |
566 | wa_pos); | 574 | wa_pos); |
567 | if (NULL == wa_pos->hh) | 575 | if (NULL == wa_pos->hh) |
@@ -594,6 +602,7 @@ run (void *cls, | |||
594 | (void) cls; | 602 | (void) cls; |
595 | (void) args; | 603 | (void) args; |
596 | (void) cfgfile; | 604 | (void) cfgfile; |
605 | |||
597 | cfg = c; | 606 | cfg = c; |
598 | if (GNUNET_OK != | 607 | if (GNUNET_OK != |
599 | exchange_serve_process_config ()) | 608 | exchange_serve_process_config ()) |
@@ -603,8 +612,6 @@ run (void *cls, | |||
603 | } | 612 | } |
604 | wa_pos = wa_head; | 613 | wa_pos = wa_head; |
605 | GNUNET_assert (NULL != wa_pos); | 614 | GNUNET_assert (NULL != wa_pos); |
606 | task = GNUNET_SCHEDULER_add_now (&find_transfers, | ||
607 | NULL); | ||
608 | GNUNET_SCHEDULER_add_shutdown (&shutdown_task, | 615 | GNUNET_SCHEDULER_add_shutdown (&shutdown_task, |
609 | cls); | 616 | cls); |
610 | ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule, | 617 | ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule, |
@@ -615,6 +622,9 @@ run (void *cls, | |||
615 | GNUNET_break (0); | 622 | GNUNET_break (0); |
616 | return; | 623 | return; |
617 | } | 624 | } |
625 | |||
626 | task = GNUNET_SCHEDULER_add_now (&find_transfers, | ||
627 | NULL); | ||
618 | } | 628 | } |
619 | 629 | ||
620 | 630 | ||
@@ -630,16 +640,21 @@ main (int argc, | |||
630 | char *const *argv) | 640 | char *const *argv) |
631 | { | 641 | { |
632 | struct GNUNET_GETOPT_CommandLineOption options[] = { | 642 | struct GNUNET_GETOPT_CommandLineOption options[] = { |
643 | GNUNET_GETOPT_option_flag ('r', | ||
644 | "reset", | ||
645 | "start fresh with all transactions in the history", | ||
646 | &reset_mode), | ||
647 | GNUNET_GETOPT_option_uint ('S', | ||
648 | "size", | ||
649 | "SIZE", | ||
650 | "Size to process per shard (default: 1024)", | ||
651 | &shard_size), | ||
633 | GNUNET_GETOPT_option_timetravel ('T', | 652 | GNUNET_GETOPT_option_timetravel ('T', |
634 | "timetravel"), | 653 | "timetravel"), |
635 | GNUNET_GETOPT_option_flag ('t', | 654 | GNUNET_GETOPT_option_flag ('t', |
636 | "test", | 655 | "test", |
637 | "run in test mode and exit when idle", | 656 | "run in test mode and exit when idle", |
638 | &test_mode), | 657 | &test_mode), |
639 | GNUNET_GETOPT_option_flag ('r', | ||
640 | "reset", | ||
641 | "start fresh with all transactions in the history", | ||
642 | &reset_mode), | ||
643 | GNUNET_GETOPT_OPTION_END | 658 | GNUNET_GETOPT_OPTION_END |
644 | }; | 659 | }; |
645 | enum GNUNET_GenericReturnValue ret; | 660 | enum GNUNET_GenericReturnValue ret; |