aboutsummaryrefslogtreecommitdiff
path: root/src/exchange/taler-exchange-wirewatch.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/exchange/taler-exchange-wirewatch.c')
-rw-r--r--src/exchange/taler-exchange-wirewatch.c85
1 files changed, 50 insertions, 35 deletions
diff --git a/src/exchange/taler-exchange-wirewatch.c b/src/exchange/taler-exchange-wirewatch.c
index 40b962f8a..760dbe10b 100644
--- a/src/exchange/taler-exchange-wirewatch.c
+++ b/src/exchange/taler-exchange-wirewatch.c
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of TALER 2 This file is part of TALER
3 Copyright (C) 2016--2020 Taler Systems SA 3 Copyright (C) 2016--2021 Taler Systems SA
4 4
5 TALER is free software; you can redistribute it and/or modify it under the 5 TALER is free software; you can redistribute it and/or modify it under the
6 terms of the GNU Affero General Public License as published by the Free Software 6 terms of the GNU Affero General Public License as published by the Free Software
@@ -90,6 +90,11 @@ struct WireAccount
90 uint64_t latest_row_off; 90 uint64_t latest_row_off;
91 91
92 /** 92 /**
93 * Offset where our current shard ends.
94 */
95 uint64_t shard_end;
96
97 /**
93 * How many transactions do we retrieve per batch? 98 * How many transactions do we retrieve per batch?
94 */ 99 */
95 unsigned int batch_size; 100 unsigned int batch_size;
@@ -103,19 +108,14 @@ struct WireAccount
103 * Are we running from scratch and should re-process all transactions 108 * Are we running from scratch and should re-process all transactions
104 * for this account? 109 * for this account?
105 */ 110 */
106 int reset_mode; 111 bool reset_mode;
107 112
108 /** 113 /**
109 * Should we delay the next request to the wire plugin a bit? Set to 114 * Should we delay the next request to the wire plugin a bit? Set to
110 * #GNUNET_NO if we actually did some work. 115 * false if we actually did some work.
111 */ 116 */
112 int delay; 117 bool delay;
113 118
114 /**
115 * Did we experience a soft failure during the current
116 * transaction?
117 */
118 bool soft_fail;
119}; 119};
120 120
121 121
@@ -161,6 +161,11 @@ static struct TALER_EXCHANGEDB_Plugin *db_plugin;
161static struct GNUNET_TIME_Relative wirewatch_idle_sleep_interval; 161static struct GNUNET_TIME_Relative wirewatch_idle_sleep_interval;
162 162
163/** 163/**
164 * Modulus to apply to group shards.
165 */
166static unsigned int shard_size = 1024;
167
168/**
164 * Value to return from main(). 0 on success, non-zero on 169 * Value to return from main(). 0 on success, non-zero on
165 * on serious errors. 170 * on serious errors.
166 */ 171 */
@@ -363,20 +368,10 @@ history_cb (void *cls,
363 (unsigned int) ec, 368 (unsigned int) ec,
364 http_status); 369 http_status);
365 } 370 }
366 if (wa->soft_fail) 371 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
367 { 372 "End of list. Committing progress!\n");
368 /* no point to commit, transaction was already rolled 373 qs = db_plugin->commit (db_plugin->cls,
369 back after we encountered a soft failure */ 374 session);
370 wa->soft_fail = false;
371 qs = GNUNET_DB_STATUS_SOFT_ERROR;
372 }
373 else
374 {
375 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
376 "End of list. Committing progress!\n");
377 qs = db_plugin->commit (db_plugin->cls,
378 session);
379 }
380 if (GNUNET_DB_STATUS_HARD_ERROR == qs) 375 if (GNUNET_DB_STATUS_HARD_ERROR == qs)
381 { 376 {
382 GNUNET_SCHEDULER_shutdown (); 377 GNUNET_SCHEDULER_shutdown ();
@@ -410,7 +405,7 @@ history_cb (void *cls,
410 "Increasing batch size to %llu\n", 405 "Increasing batch size to %llu\n",
411 (unsigned long long) wa->batch_size); 406 (unsigned long long) wa->batch_size);
412 } 407 }
413 if ( (GNUNET_YES == wa->delay) && 408 if ( (wa->delay) &&
414 (test_mode) && 409 (test_mode) &&
415 (NULL == wa->next) ) 410 (NULL == wa->next) )
416 { 411 {
@@ -419,7 +414,7 @@ history_cb (void *cls,
419 GNUNET_SCHEDULER_shutdown (); 414 GNUNET_SCHEDULER_shutdown ();
420 return GNUNET_OK; 415 return GNUNET_OK;
421 } 416 }
422 if (GNUNET_YES == wa->delay) 417 if (wa->delay)
423 { 418 {
424 wa->delayed_until 419 wa->delayed_until
425 = GNUNET_TIME_relative_to_absolute (wirewatch_idle_sleep_interval); 420 = GNUNET_TIME_relative_to_absolute (wirewatch_idle_sleep_interval);
@@ -477,6 +472,7 @@ history_cb (void *cls,
477 db_plugin->rollback (db_plugin->cls, 472 db_plugin->rollback (db_plugin->cls,
478 session); 473 session);
479 GNUNET_SCHEDULER_shutdown (); 474 GNUNET_SCHEDULER_shutdown ();
475 wa->hh = NULL;
480 return GNUNET_SYSERR; 476 return GNUNET_SYSERR;
481 } 477 }
482 if (GNUNET_DB_STATUS_SOFT_ERROR == qs) 478 if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
@@ -485,10 +481,13 @@ history_cb (void *cls,
485 "Got DB soft error for reserves_in_insert. Rolling back.\n"); 481 "Got DB soft error for reserves_in_insert. Rolling back.\n");
486 db_plugin->rollback (db_plugin->cls, 482 db_plugin->rollback (db_plugin->cls,
487 session); 483 session);
488 wa->soft_fail = true; 484 wa->hh = NULL;
485 GNUNET_assert (NULL == task);
486 task = GNUNET_SCHEDULER_add_now (&find_transfers,
487 NULL);
489 return GNUNET_SYSERR; 488 return GNUNET_SYSERR;
490 } 489 }
491 wa->delay = GNUNET_NO; 490 wa->delay = false;
492 wa->latest_row_off = serial_id; 491 wa->latest_row_off = serial_id;
493 return GNUNET_OK; 492 return GNUNET_OK;
494} 493}
@@ -504,6 +503,7 @@ find_transfers (void *cls)
504{ 503{
505 struct TALER_EXCHANGEDB_Session *session; 504 struct TALER_EXCHANGEDB_Session *session;
506 enum GNUNET_DB_QueryStatus qs; 505 enum GNUNET_DB_QueryStatus qs;
506 unsigned int limit;
507 507
508 (void) cls; 508 (void) cls;
509 task = NULL; 509 task = NULL;
@@ -555,13 +555,21 @@ find_transfers (void *cls)
555 } 555 }
556 wa_pos->reset_mode = GNUNET_NO; 556 wa_pos->reset_mode = GNUNET_NO;
557 } 557 }
558 wa_pos->delay = GNUNET_YES; 558 wa_pos->delay = true;
559 wa_pos->current_batch_size = 0; /* reset counter */ 559 wa_pos->current_batch_size = 0; /* reset counter */
560 wa_pos->session = session; 560 wa_pos->session = session;
561 if (wa_pos->shard_end == wa_pos->last_row_off)
562 {
563 /* advance to next shard */
564 wa_pos->shard_end += shard_size;
565 }
566 limit = GNUNET_MIN (wa_pos->batch_size,
567 wa_pos->shard_end - wa_pos->last_row_off);
568 GNUNET_assert (NULL == wa_pos->hh);
561 wa_pos->hh = TALER_BANK_credit_history (ctx, 569 wa_pos->hh = TALER_BANK_credit_history (ctx,
562 &wa_pos->auth, 570 &wa_pos->auth,
563 wa_pos->last_row_off, 571 wa_pos->last_row_off,
564 wa_pos->batch_size, 572 limit,
565 &history_cb, 573 &history_cb,
566 wa_pos); 574 wa_pos);
567 if (NULL == wa_pos->hh) 575 if (NULL == wa_pos->hh)
@@ -594,6 +602,7 @@ run (void *cls,
594 (void) cls; 602 (void) cls;
595 (void) args; 603 (void) args;
596 (void) cfgfile; 604 (void) cfgfile;
605
597 cfg = c; 606 cfg = c;
598 if (GNUNET_OK != 607 if (GNUNET_OK !=
599 exchange_serve_process_config ()) 608 exchange_serve_process_config ())
@@ -603,8 +612,6 @@ run (void *cls,
603 } 612 }
604 wa_pos = wa_head; 613 wa_pos = wa_head;
605 GNUNET_assert (NULL != wa_pos); 614 GNUNET_assert (NULL != wa_pos);
606 task = GNUNET_SCHEDULER_add_now (&find_transfers,
607 NULL);
608 GNUNET_SCHEDULER_add_shutdown (&shutdown_task, 615 GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
609 cls); 616 cls);
610 ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule, 617 ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule,
@@ -615,6 +622,9 @@ run (void *cls,
615 GNUNET_break (0); 622 GNUNET_break (0);
616 return; 623 return;
617 } 624 }
625
626 task = GNUNET_SCHEDULER_add_now (&find_transfers,
627 NULL);
618} 628}
619 629
620 630
@@ -630,16 +640,21 @@ main (int argc,
630 char *const *argv) 640 char *const *argv)
631{ 641{
632 struct GNUNET_GETOPT_CommandLineOption options[] = { 642 struct GNUNET_GETOPT_CommandLineOption options[] = {
643 GNUNET_GETOPT_option_flag ('r',
644 "reset",
645 "start fresh with all transactions in the history",
646 &reset_mode),
647 GNUNET_GETOPT_option_uint ('S',
648 "size",
649 "SIZE",
650 "Size to process per shard (default: 1024)",
651 &shard_size),
633 GNUNET_GETOPT_option_timetravel ('T', 652 GNUNET_GETOPT_option_timetravel ('T',
634 "timetravel"), 653 "timetravel"),
635 GNUNET_GETOPT_option_flag ('t', 654 GNUNET_GETOPT_option_flag ('t',
636 "test", 655 "test",
637 "run in test mode and exit when idle", 656 "run in test mode and exit when idle",
638 &test_mode), 657 &test_mode),
639 GNUNET_GETOPT_option_flag ('r',
640 "reset",
641 "start fresh with all transactions in the history",
642 &reset_mode),
643 GNUNET_GETOPT_OPTION_END 658 GNUNET_GETOPT_OPTION_END
644 }; 659 };
645 enum GNUNET_GenericReturnValue ret; 660 enum GNUNET_GenericReturnValue ret;