exchange_statistics_helpers.sql (31641B)
1 -- 2 -- This file is part of TALER 3 -- Copyright (C) 2025 Taler Systems SA 4 -- 5 -- TALER is free software; you can redistribute it and/or modify it under the 6 -- terms of the GNU General Public License as published by the Free Software 7 -- Foundation; either version 3, or (at your option) any later version. 8 -- 9 -- TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 -- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 -- A PARTICULAR PURPOSE. See the GNU General Public License for more details. 12 -- 13 -- You should have received a copy of the GNU General Public License along with 14 -- TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 -- 16 17 SET search_path TO exchange; 18 DROP FUNCTION IF EXISTS interval_to_start; 19 CREATE OR REPLACE FUNCTION interval_to_start ( 20 IN in_timestamp TIMESTAMP, 21 IN in_range statistic_range, 22 OUT out_bucket_start INT8 23 ) 24 LANGUAGE plpgsql 25 AS $$ 26 BEGIN 27 out_bucket_start = EXTRACT(EPOCH FROM DATE_TRUNC(in_range::text, in_timestamp)); 28 END $$; 29 COMMENT ON FUNCTION interval_to_start 30 IS 'computes the start time of the bucket for an event at the current time given the desired bucket range'; 31 32 33 DROP PROCEDURE IF EXISTS exchange_do_bump_number_bucket_stat; 34 CREATE OR REPLACE PROCEDURE exchange_do_bump_number_bucket_stat( 35 in_slug TEXT, 36 in_h_payto BYTEA, 37 in_timestamp TIMESTAMP, 38 in_delta INT8 39 ) 40 LANGUAGE plpgsql 41 AS $$ 42 DECLARE 43 my_meta INT8; 44 my_range statistic_range; 45 my_bucket_start INT8; 46 my_curs CURSOR (arg_slug TEXT) 47 FOR SELECT UNNEST(ranges) 48 FROM exchange_statistic_bucket_meta 49 WHERE slug=arg_slug; 50 BEGIN 51 SELECT bmeta_serial_id 52 INTO my_meta 53 FROM exchange_statistic_bucket_meta 54 WHERE slug=in_slug 55 AND stype='number'; 56 IF NOT FOUND 57 THEN 58 RETURN; 59 END IF; 60 OPEN my_curs (arg_slug:=in_slug); 61 LOOP 62 FETCH NEXT 63 FROM my_curs 64 INTO my_range; 65 EXIT WHEN NOT FOUND; 66 SELECT * 67 INTO my_bucket_start 68 FROM interval_to_start (in_timestamp, my_range); 69 70 UPDATE exchange_statistic_bucket_counter 71 SET cumulative_number = cumulative_number + in_delta 72 WHERE bmeta_serial_id=my_meta 73 AND h_payto=in_h_payto 74 AND bucket_start=my_bucket_start 75 AND bucket_range=my_range; 76 IF NOT FOUND 77 THEN 78 INSERT INTO exchange_statistic_bucket_counter 79 (bmeta_serial_id 80 ,h_payto 81 ,bucket_start 82 ,bucket_range 83 ,cumulative_number 84 ) VALUES ( 85 my_meta 86 ,in_h_payto 87 ,my_bucket_start 88 ,my_range 89 ,in_delta); 90 END IF; 91 END LOOP; 92 CLOSE my_curs; 93 END $$; 94 95 96 DROP PROCEDURE IF EXISTS exchange_do_bump_amount_bucket_stat; 97 CREATE OR REPLACE PROCEDURE exchange_do_bump_amount_bucket_stat( 98 in_slug TEXT, 99 in_h_payto BYTEA, 100 in_timestamp TIMESTAMP, 101 in_delta taler_amount 102 ) 103 LANGUAGE plpgsql 104 AS $$ 105 DECLARE 106 my_meta INT8; 107 my_range statistic_range; 108 my_bucket_start INT8; 109 my_curs CURSOR (arg_slug TEXT) 110 FOR SELECT UNNEST(ranges) 111 FROM exchange_statistic_bucket_meta 112 WHERE slug=arg_slug; 113 BEGIN 114 SELECT bmeta_serial_id 115 INTO my_meta 116 FROM exchange_statistic_bucket_meta 117 WHERE slug=in_slug 118 AND stype='amount'; 119 IF NOT FOUND 120 THEN 121 RETURN; 122 END IF; 123 OPEN my_curs (arg_slug:=in_slug); 124 LOOP 125 FETCH NEXT 126 FROM my_curs 127 INTO my_range; 128 EXIT WHEN NOT FOUND; 129 SELECT * 130 INTO my_bucket_start 131 FROM interval_to_start (in_timestamp, my_range); 132 133 UPDATE exchange_statistic_bucket_amount 134 SET 135 cumulative_value.val = (cumulative_value).val + (in_delta).val 136 + CASE 137 WHEN (in_delta).frac + (cumulative_value).frac >= 100000000 138 THEN 1 139 ELSE 0 140 END, 141 cumulative_value.frac = (cumulative_value).frac + (in_delta).frac 142 - CASE 143 WHEN (in_delta).frac + (cumulative_value).frac >= 100000000 144 THEN 100000000 145 ELSE 0 146 END 147 WHERE bmeta_serial_id=my_meta 148 AND h_payto=in_h_payto 149 AND bucket_start=my_bucket_start 150 AND bucket_range=my_range; 151 IF NOT FOUND 152 THEN 153 INSERT INTO exchange_statistic_bucket_amount 154 (bmeta_serial_id 155 ,h_payto 156 ,bucket_start 157 ,bucket_range 158 ,cumulative_value 159 ) VALUES ( 160 my_meta 161 ,in_h_payto 162 ,my_bucket_start 163 ,my_range 164 ,in_delta); 165 END IF; 166 END LOOP; 167 CLOSE my_curs; 168 END $$; 169 170 COMMENT ON PROCEDURE exchange_do_bump_amount_bucket_stat 171 IS 'Updates an amount statistic tracked over buckets'; 172 173 174 DROP PROCEDURE IF EXISTS exchange_do_bump_number_interval_stat; 175 CREATE OR REPLACE PROCEDURE exchange_do_bump_number_interval_stat( 176 in_slug TEXT, 177 in_h_payto BYTEA, 178 in_timestamp TIMESTAMP, 179 in_delta INT8 180 ) 181 LANGUAGE plpgsql 182 AS $$ 183 DECLARE 184 my_now INT8; 185 my_record RECORD; 186 my_meta INT8; 187 my_ranges INT8[]; 188 my_precisions INT8[]; 189 my_rangex INT8; 190 my_precisionx INT8; 191 my_start INT8; 192 my_event INT8; 193 BEGIN 194 my_now = ROUND(EXTRACT(epoch FROM CURRENT_TIMESTAMP(0)::TIMESTAMP) * 1000000)::INT8 / 1000 / 1000; 195 SELECT imeta_serial_id 196 ,ranges AS ranges 197 ,precisions AS precisions 198 INTO my_record 199 FROM exchange_statistic_interval_meta 200 WHERE slug=in_slug 201 AND stype='number'; 202 IF NOT FOUND 203 THEN 204 RETURN; 205 END IF; 206 207 my_start = ROUND(EXTRACT(epoch FROM in_timestamp) * 1000000)::INT8 / 1000 / 1000; -- convert to seconds 208 my_precisions = my_record.precisions; 209 my_ranges = my_record.ranges; 210 my_rangex = NULL; 211 FOR my_x IN 1..COALESCE(array_length(my_ranges,1),0) 212 LOOP 213 IF my_now - my_ranges[my_x] < my_start 214 THEN 215 my_rangex = my_ranges[my_x]; 216 my_precisionx = my_precisions[my_x]; 217 EXIT; 218 END IF; 219 END LOOP; 220 IF my_rangex IS NULL 221 THEN 222 -- event is beyond the ranges we care about 223 RETURN; 224 END IF; 225 226 my_meta = my_record.imeta_serial_id; 227 my_start = my_start - my_start % my_precisionx; -- round down 228 229 INSERT INTO exchange_statistic_counter_event AS msce 230 (imeta_serial_id 231 ,h_payto 232 ,slot 233 ,delta) 234 VALUES 235 (my_meta 236 ,in_h_payto 237 ,my_start 238 ,in_delta) 239 ON CONFLICT (imeta_serial_id, h_payto, slot) 240 DO UPDATE SET 241 delta = msce.delta + in_delta 242 RETURNING nevent_serial_id 243 INTO my_event; 244 245 UPDATE exchange_statistic_interval_counter 246 SET cumulative_number = cumulative_number + in_delta 247 WHERE imeta_serial_id = my_meta 248 AND h_payto = in_h_payto 249 AND range=my_rangex; 250 IF NOT FOUND 251 THEN 252 INSERT INTO exchange_statistic_interval_counter 253 (imeta_serial_id 254 ,h_payto 255 ,range 256 ,event_delimiter 257 ,cumulative_number 258 ) VALUES ( 259 my_meta 260 ,in_h_payto 261 ,my_rangex 262 ,my_event 263 ,in_delta); 264 END IF; 265 END $$; 266 267 COMMENT ON PROCEDURE exchange_do_bump_number_interval_stat 268 IS 'Updates a numeric statistic tracked over an interval'; 269 270 271 DROP PROCEDURE IF EXISTS exchange_do_bump_amount_interval_stat; 272 CREATE OR REPLACE PROCEDURE exchange_do_bump_amount_interval_stat( 273 in_slug TEXT, 274 in_h_payto BYTEA, 275 in_timestamp TIMESTAMP, 276 in_delta taler_amount 277 ) 278 LANGUAGE plpgsql 279 AS $$ 280 DECLARE 281 my_now INT8; 282 my_record RECORD; 283 my_meta INT8; 284 my_ranges INT8[]; 285 my_precisions INT8[]; 286 my_x INT; 287 my_rangex INT8; 288 my_precisionx INT8; 289 my_start INT8; 290 my_event INT8; 291 BEGIN 292 my_now = ROUND(EXTRACT(epoch FROM CURRENT_TIMESTAMP(0)::TIMESTAMP) * 1000000)::INT8 / 1000 / 1000; 293 SELECT imeta_serial_id 294 ,ranges 295 ,precisions 296 INTO my_record 297 FROM exchange_statistic_interval_meta 298 WHERE slug=in_slug 299 AND stype='amount'; 300 IF NOT FOUND 301 THEN 302 RETURN; 303 END IF; 304 305 my_start = ROUND(EXTRACT(epoch FROM in_timestamp) * 1000000)::INT8 / 1000 / 1000; -- convert to seconds since epoch 306 my_precisions = my_record.precisions; 307 my_ranges = my_record.ranges; 308 my_rangex = NULL; 309 FOR my_x IN 1..COALESCE(array_length(my_ranges,1),0) 310 LOOP 311 IF my_now - my_ranges[my_x] < my_start 312 THEN 313 my_rangex = my_ranges[my_x]; 314 my_precisionx = my_precisions[my_x]; 315 EXIT; 316 END IF; 317 END LOOP; 318 IF my_rangex IS NULL 319 THEN 320 -- event is beyond the ranges we care about 321 RETURN; 322 END IF; 323 my_start = my_start - my_start % my_precisionx; -- round down 324 my_meta = my_record.imeta_serial_id; 325 326 INSERT INTO exchange_statistic_amount_event AS msae 327 (imeta_serial_id 328 ,h_payto 329 ,slot 330 ,delta 331 ) VALUES ( 332 my_meta 333 ,in_h_payto 334 ,my_start 335 ,in_delta 336 ) 337 ON CONFLICT (imeta_serial_id, h_payto, slot) 338 DO UPDATE SET 339 delta.val = (msae.delta).val + (in_delta).val 340 + CASE 341 WHEN (in_delta).frac + (msae.delta).frac >= 100000000 342 THEN 1 343 ELSE 0 344 END, 345 delta.frac = (msae.delta).frac + (in_delta).frac 346 - CASE 347 WHEN (in_delta).frac + (msae.delta).frac >= 100000000 348 THEN 100000000 349 ELSE 0 350 END 351 RETURNING aevent_serial_id 352 INTO my_event; 353 354 UPDATE exchange_statistic_interval_amount 355 SET 356 cumulative_value.val = (cumulative_value).val + (in_delta).val 357 + CASE 358 WHEN (in_delta).frac + (cumulative_value).frac >= 100000000 359 THEN 1 360 ELSE 0 361 END, 362 cumulative_value.frac = (cumulative_value).frac + (in_delta).frac 363 - CASE 364 WHEN (in_delta).frac + (cumulative_value).frac >= 100000000 365 THEN 100000000 366 ELSE 0 367 END 368 WHERE imeta_serial_id=my_meta 369 AND h_payto=in_h_payto 370 AND range=my_rangex; 371 IF NOT FOUND 372 THEN 373 INSERT INTO exchange_statistic_interval_amount 374 (imeta_serial_id 375 ,h_payto 376 ,range 377 ,event_delimiter 378 ,cumulative_value 379 ) VALUES ( 380 my_meta 381 ,in_h_payto 382 ,my_rangex 383 ,my_event 384 ,in_delta); 385 END IF; 386 END $$; 387 COMMENT ON PROCEDURE exchange_do_bump_amount_interval_stat 388 IS 'Updates an amount statistic tracked over an interval'; 389 390 391 DROP PROCEDURE IF EXISTS exchange_do_bump_number_stat; 392 CREATE OR REPLACE PROCEDURE exchange_do_bump_number_stat( 393 in_slug TEXT, 394 in_h_payto BYTEA, 395 in_timestamp TIMESTAMP, 396 in_delta INT8 397 ) 398 LANGUAGE plpgsql 399 AS $$ 400 BEGIN 401 CALL exchange_do_bump_number_bucket_stat (in_slug, in_h_payto, in_timestamp, in_delta); 402 CALL exchange_do_bump_number_interval_stat (in_slug, in_h_payto, in_timestamp, in_delta); 403 END $$; 404 COMMENT ON PROCEDURE exchange_do_bump_number_stat 405 IS 'Updates a numeric statistic (bucket or interval)'; 406 407 408 DROP PROCEDURE IF EXISTS exchange_do_bump_amount_stat; 409 CREATE OR REPLACE PROCEDURE exchange_do_bump_amount_stat( 410 in_slug TEXT, 411 in_h_payto BYTEA, 412 in_timestamp TIMESTAMP, 413 in_delta taler_amount 414 ) 415 LANGUAGE plpgsql 416 AS $$ 417 BEGIN 418 CALL exchange_do_bump_amount_bucket_stat (in_slug, in_h_payto, in_timestamp, in_delta); 419 CALL exchange_do_bump_amount_interval_stat (in_slug, in_h_payto, in_timestamp, in_delta); 420 END $$; 421 COMMENT ON PROCEDURE exchange_do_bump_amount_stat 422 IS 'Updates an amount statistic (bucket or interval)'; 423 424 425 DROP FUNCTION IF EXISTS exchange_statistic_interval_number_get; 426 CREATE OR REPLACE FUNCTION exchange_statistic_interval_number_get ( 427 IN in_slug TEXT, 428 IN in_h_payto BYTEA 429 ) 430 RETURNS SETOF exchange_statistic_interval_number_get_return_value 431 LANGUAGE plpgsql 432 AS $$ 433 DECLARE 434 my_time INT8 DEFAULT ROUND(EXTRACT(epoch FROM CURRENT_TIMESTAMP(0)::TIMESTAMP) * 1000000)::INT8 / 1000 / 1000; 435 my_ranges INT8[]; 436 my_range INT8; 437 my_delta INT8; 438 my_meta INT8; 439 my_next_max_serial INT8; 440 my_rec RECORD; 441 my_irec RECORD; 442 my_i INT; 443 my_min_serial INT8 DEFAULT NULL; 444 my_rval exchange_statistic_interval_number_get_return_value; 445 BEGIN 446 SELECT imeta_serial_id 447 ,ranges 448 ,precisions 449 INTO my_rec 450 FROM exchange_statistic_interval_meta 451 WHERE slug=in_slug; 452 IF NOT FOUND 453 THEN 454 RETURN; 455 END IF; 456 my_rval.rvalue = 0; 457 my_ranges = my_rec.ranges; 458 my_meta = my_rec.imeta_serial_id; 459 460 FOR my_i IN 1..COALESCE(array_length(my_ranges,1),0) 461 LOOP 462 my_range = my_ranges[my_i]; 463 SELECT event_delimiter 464 ,cumulative_number 465 INTO my_irec 466 FROM exchange_statistic_interval_counter 467 WHERE imeta_serial_id = my_meta 468 AND range = my_range 469 AND h_payto = in_h_payto; 470 IF FOUND 471 THEN 472 my_min_serial = my_irec.event_delimiter; 473 my_rval.rvalue = my_rval.rvalue + my_irec.cumulative_number; 474 475 -- Check if we have events that left the applicable range 476 SELECT SUM(delta) AS delta_sum 477 INTO my_irec 478 FROM exchange_statistic_counter_event 479 WHERE imeta_serial_id = my_meta 480 AND h_payto = in_h_payto 481 AND slot < my_time - my_range 482 AND nevent_serial_id >= my_min_serial; 483 484 IF FOUND AND my_irec.delta_sum IS NOT NULL 485 THEN 486 my_delta = my_irec.delta_sum; 487 my_rval.rvalue = my_rval.rvalue - my_delta; 488 489 -- First find out the next event delimiter value 490 SELECT nevent_serial_id 491 INTO my_next_max_serial 492 FROM exchange_statistic_counter_event 493 WHERE imeta_serial_id = my_meta 494 AND h_payto = in_h_payto 495 AND slot >= my_time - my_range 496 AND nevent_serial_id >= my_min_serial 497 ORDER BY slot ASC 498 LIMIT 1; 499 500 IF FOUND 501 THEN 502 -- remove expired events from the sum of the current slot 503 504 UPDATE exchange_statistic_interval_counter 505 SET cumulative_number = cumulative_number - my_delta, 506 event_delimiter = my_next_max_serial 507 WHERE imeta_serial_id = my_meta 508 AND h_payto = in_h_payto 509 AND range = my_range; 510 ELSE 511 -- actually, slot is now empty, remove it entirely 512 DELETE FROM exchange_statistic_interval_counter 513 WHERE imeta_serial_id = my_meta 514 AND h_payto = in_h_payto 515 AND range = my_range; 516 END IF; 517 IF (my_i < array_length(my_ranges,1)) 518 THEN 519 -- carry over all events into the next slot 520 UPDATE exchange_statistic_interval_counter AS usic SET 521 cumulative_number = cumulative_number + my_delta, 522 event_delimiter = LEAST(usic.event_delimiter,my_min_serial) 523 WHERE imeta_serial_id = my_meta 524 AND h_payto = in_h_payto 525 AND range=my_ranges[my_i+1]; 526 IF NOT FOUND 527 THEN 528 INSERT INTO exchange_statistic_interval_counter 529 (imeta_serial_id 530 ,h_payto 531 ,range 532 ,event_delimiter 533 ,cumulative_number 534 ) VALUES ( 535 my_meta 536 ,in_h_payto 537 ,my_ranges[my_i+1] 538 ,my_min_serial 539 ,my_delta); 540 END IF; 541 ELSE 542 -- events are obsolete, delete them 543 DELETE FROM exchange_statistic_counter_event 544 WHERE imeta_serial_id = my_meta 545 AND h_payto = in_h_payto 546 AND slot < my_time - my_range; 547 END IF; 548 END IF; 549 550 my_rval.range = my_range; 551 RETURN NEXT my_rval; 552 END IF; 553 END LOOP; 554 END $$; 555 556 COMMENT ON FUNCTION exchange_statistic_interval_number_get 557 IS 'Returns deposit statistic tracking deposited amounts over certain time intervals; we first trim the stored data to only track what is still in-range, and then return the remaining value for each range'; 558 559 560 DROP FUNCTION IF EXISTS exchange_statistic_interval_amount_get; 561 CREATE OR REPLACE FUNCTION exchange_statistic_interval_amount_get ( 562 IN in_slug TEXT, 563 IN in_h_payto BYTEA 564 ) 565 RETURNS SETOF exchange_statistic_interval_amount_get_return_value 566 LANGUAGE plpgsql 567 AS $$ 568 DECLARE 569 my_time INT8 DEFAULT ROUND(EXTRACT(epoch FROM CURRENT_TIMESTAMP(0)::TIMESTAMP) * 1000000)::INT8 / 1000 / 1000; 570 my_ranges INT8[]; 571 my_range INT8; 572 my_delta_value INT8; 573 my_delta_frac INT8; 574 my_delta taler_amount; 575 my_meta INT8; 576 my_next_max_serial INT8; 577 my_rec RECORD; 578 my_irec RECORD; 579 my_jrec RECORD; 580 my_i INT; 581 my_min_serial INT8 DEFAULT NULL; 582 my_rval exchange_statistic_interval_amount_get_return_value; 583 BEGIN 584 SELECT imeta_serial_id 585 ,ranges 586 ,precisions 587 INTO my_rec 588 FROM exchange_statistic_interval_meta 589 WHERE slug=in_slug; 590 IF NOT FOUND 591 THEN 592 RETURN; 593 END IF; 594 595 my_meta = my_rec.imeta_serial_id; 596 my_ranges = my_rec.ranges; 597 598 my_rval.rvalue.val = 0; 599 my_rval.rvalue.frac = 0; 600 601 FOR my_i IN 1..COALESCE(array_length(my_ranges,1),0) 602 LOOP 603 my_range = my_ranges[my_i]; 604 SELECT event_delimiter 605 ,cumulative_value 606 INTO my_irec 607 FROM exchange_statistic_interval_amount 608 WHERE imeta_serial_id = my_meta 609 AND h_payto = in_h_payto 610 AND range = my_range; 611 612 IF FOUND 613 THEN 614 my_min_serial = my_irec.event_delimiter; 615 my_rval.rvalue.val = (my_rval.rvalue).val + (my_irec.cumulative_value).val + (my_irec.cumulative_value).frac / 100000000; 616 my_rval.rvalue.frac = (my_rval.rvalue).frac + (my_irec.cumulative_value).frac % 100000000; 617 IF (my_rval.rvalue).frac > 100000000 618 THEN 619 my_rval.rvalue.frac = (my_rval.rvalue).frac - 100000000; 620 my_rval.rvalue.val = (my_rval.rvalue).val + 1; 621 END IF; 622 623 -- Check if we have events that left the applicable range 624 SELECT SUM((esae.delta).val) AS value_sum 625 ,SUM((esae.delta).frac) AS frac_sum 626 INTO my_jrec 627 FROM exchange_statistic_amount_event esae 628 WHERE imeta_serial_id = my_meta 629 AND h_payto = in_h_payto 630 AND slot < my_time - my_range 631 AND aevent_serial_id >= my_min_serial; 632 633 IF FOUND AND my_jrec.value_sum IS NOT NULL 634 THEN 635 -- Normalize sum 636 my_delta_value = my_jrec.value_sum + my_jrec.frac_sum / 100000000; 637 my_delta_frac = my_jrec.frac_sum % 100000000; 638 my_rval.rvalue.val = (my_rval.rvalue).val - my_delta_value; 639 IF ((my_rval.rvalue).frac >= my_delta_frac) 640 THEN 641 my_rval.rvalue.frac = (my_rval.rvalue).frac - my_delta_frac; 642 ELSE 643 my_rval.rvalue.frac = 100000000 + (my_rval.rvalue).frac - my_delta_frac; 644 my_rval.rvalue.val = (my_rval.rvalue).val - 1; 645 END IF; 646 647 -- First find out the next event delimiter value 648 SELECT aevent_serial_id 649 INTO my_next_max_serial 650 FROM exchange_statistic_amount_event 651 WHERE imeta_serial_id = my_meta 652 AND h_payto = in_h_payto 653 AND slot >= my_time - my_range 654 AND aevent_serial_id >= my_min_serial 655 ORDER BY slot ASC 656 LIMIT 1; 657 IF FOUND 658 THEN 659 -- remove expired events from the sum of the current slot 660 UPDATE exchange_statistic_interval_amount SET 661 cumulative_value.val = (cumulative_value).val - my_delta_value 662 - CASE 663 WHEN (cumulative_value).frac < my_delta_frac 664 THEN 1 665 ELSE 0 666 END, 667 cumulative_value.frac = (cumulative_value).frac - my_delta_frac 668 + CASE 669 WHEN (cumulative_value).frac < my_delta_frac 670 THEN 100000000 671 ELSE 0 672 END, 673 event_delimiter = my_next_max_serial 674 WHERE imeta_serial_id = my_meta 675 AND h_payto = in_h_payto 676 AND range = my_range; 677 ELSE 678 -- actually, slot is now empty, remove it entirely 679 DELETE FROM exchange_statistic_interval_amount 680 WHERE imeta_serial_id = my_meta 681 AND h_payto = in_h_payto 682 AND range = my_range; 683 END IF; 684 IF (my_i < array_length(my_ranges,1)) 685 THEN 686 -- carry over all events into the next (larger) slot 687 UPDATE exchange_statistic_interval_amount AS msia SET 688 cumulative_value.val = (cumulative_value).val + my_delta_value 689 + CASE 690 WHEN (cumulative_value).frac + my_delta_frac > 100000000 691 THEN 1 692 ELSE 0 693 END, 694 cumulative_value.frac = (cumulative_value).frac + my_delta_value 695 - CASE 696 WHEN (cumulative_value).frac + my_delta_frac > 100000000 697 THEN 100000000 698 ELSE 0 699 END, 700 event_delimiter = LEAST (msia.event_delimiter,my_min_serial) 701 WHERE imeta_serial_id = my_meta 702 AND h_payto = in_h_payto 703 AND range=my_ranges[my_i+1]; 704 IF NOT FOUND 705 THEN 706 my_delta.val = my_delta_value; 707 my_delta.frac = my_delta_frac; 708 INSERT INTO exchange_statistic_interval_amount 709 (imeta_serial_id 710 ,h_payto 711 ,event_delimiter 712 ,range 713 ,cumulative_value 714 ) VALUES ( 715 my_meta 716 ,in_h_payto 717 ,my_min_serial 718 ,my_ranges[my_i+1] 719 ,my_delta); 720 END IF; 721 ELSE 722 -- events are obsolete, delete them 723 DELETE FROM exchange_statistic_amount_event 724 WHERE imeta_serial_id = my_meta 725 AND h_payto = in_h_payto 726 AND slot < my_time - my_range; 727 END IF; 728 END IF; 729 730 my_rval.range = my_range; 731 RETURN NEXT my_rval; 732 END IF; 733 END LOOP; -- over my_ranges 734 END $$; 735 736 COMMENT ON FUNCTION exchange_statistic_interval_amount_get 737 IS 'Returns deposit statistic tracking deposited amounts over certain time intervals; we first trim the stored data to only track what is still in-range, and then return the remaining value; multiple values are returned, one per range'; 738 739 740 741 742 743 DROP PROCEDURE IF EXISTS exchange_statistic_counter_gc; 744 CREATE OR REPLACE PROCEDURE exchange_statistic_counter_gc () 745 LANGUAGE plpgsql 746 AS $$ 747 DECLARE 748 my_time INT8 DEFAULT ROUND(EXTRACT(epoch FROM CURRENT_TIMESTAMP(0)::TIMESTAMP) * 1000000)::INT8 / 1000 / 1000; 749 my_h_payto BYTEA; 750 my_rec RECORD; 751 my_sum RECORD; 752 my_meta INT8; 753 my_ranges INT8[]; 754 my_precisions INT8[]; 755 my_precision INT4; 756 my_i INT4; 757 min_slot INT8; 758 max_slot INT8; 759 end_slot INT8; 760 my_total INT8; 761 BEGIN 762 -- GC for all instances 763 FOR my_h_payto IN 764 SELECT DISTINCT h_payto 765 FROM exchange_statistic_counter_event 766 LOOP 767 -- Do combination work for all numeric statistic events 768 FOR my_rec IN 769 SELECT imeta_serial_id 770 ,ranges 771 ,precisions 772 ,slug 773 FROM exchange_statistic_interval_meta 774 LOOP 775 -- First, we query the current interval statistic to update its counters 776 PERFORM FROM exchange_statistic_interval_number_get (my_rec.slug, my_h_payto); 777 778 my_meta = my_rec.imeta_serial_id; 779 my_ranges = my_rec.ranges; 780 my_precisions = my_rec.precisions; 781 782 FOR my_i IN 1..COALESCE(array_length(my_ranges,1),0) 783 LOOP 784 my_precision = my_precisions[my_i]; 785 IF 1 >= my_precision 786 THEN 787 -- Cannot coarsen in this case 788 CONTINUE; 789 END IF; 790 791 IF 1 = my_i 792 THEN 793 min_slot = 0; 794 ELSE 795 min_slot = my_ranges[my_i - 1]; 796 END IF; 797 end_slot = my_ranges[my_i]; 798 -- RAISE NOTICE 'Coarsening from [%,%) at %', my_time - end_slot, my_time - min_slot, my_precision; 799 800 LOOP 801 EXIT WHEN min_slot >= end_slot; 802 max_slot = min_slot + my_precision; 803 SELECT SUM(delta) AS total, 804 COUNT(*) AS matches, 805 MIN(nevent_serial_id) AS rep_serial_id 806 INTO my_sum 807 FROM exchange_statistic_counter_event 808 WHERE h_payto=my_h_payto 809 AND imeta_serial_id=my_meta 810 AND slot >= my_time - max_slot 811 AND slot < my_time - min_slot; 812 813 -- RAISE NOTICE 'Found % entries between [%,%)', my_sum.matches, my_time - max_slot, my_time - min_slot; 814 -- we only proceed if we had more then one match (optimization) 815 IF FOUND AND my_sum.matches > 1 816 THEN 817 my_total = my_sum.total; 818 819 -- RAISE NOTICE 'combining % entries to representative % for slots [%-%)', my_sum.matches, my_sum.rep_serial_id, my_time - max_slot, my_time - min_slot; 820 821 -- combine entries 822 DELETE FROM exchange_statistic_counter_event 823 WHERE h_payto=my_h_payto 824 AND imeta_serial_id=my_meta 825 AND slot >= my_time - max_slot 826 AND slot < my_time - min_slot 827 AND nevent_serial_id > my_sum.rep_serial_id; 828 -- Now update the representative to the sum 829 UPDATE exchange_statistic_counter_event SET 830 delta = my_total 831 WHERE imeta_serial_id = my_meta 832 AND h_payto = my_h_payto 833 AND nevent_serial_id = my_sum.rep_serial_id; 834 END IF; 835 min_slot = min_slot + my_precision; 836 END LOOP; -- min_slot to end_slot by precision loop 837 END LOOP; -- my_i loop 838 -- Finally, delete all events beyond the range we care about 839 840 -- RAISE NOTICE 'deleting entries of %/% before % - % = %', my_h_payto, my_meta, my_time, my_ranges[array_length(my_ranges,1)], my_time - my_ranges[array_length(my_ranges,1)]; 841 DELETE FROM exchange_statistic_counter_event 842 WHERE h_payto=my_h_payto 843 AND imeta_serial_id=my_meta 844 AND slot < my_time - my_ranges[array_length(my_ranges,1)]; 845 END LOOP; -- my_rec loop 846 END LOOP; -- my_h_payto loop 847 END $$; 848 COMMENT ON PROCEDURE exchange_statistic_counter_gc 849 IS 'Performs garbage collection and compaction of the exchange_statistic_counter_event table'; 850 851 852 853 DROP PROCEDURE IF EXISTS exchange_statistic_amount_gc; 854 CREATE OR REPLACE PROCEDURE exchange_statistic_amount_gc () 855 LANGUAGE plpgsql 856 AS $$ 857 DECLARE 858 my_time INT8 DEFAULT ROUND(EXTRACT(epoch FROM CURRENT_TIMESTAMP(0)::TIMESTAMP) * 1000000)::INT8 / 1000 / 1000; 859 my_h_payto BYTEA; 860 my_rec RECORD; 861 my_sum RECORD; 862 my_meta INT8; 863 my_ranges INT8[]; 864 my_precisions INT8[]; 865 my_precision INT4; 866 my_i INT4; 867 min_slot INT8; 868 max_slot INT8; 869 end_slot INT8; 870 my_total_val INT8; 871 my_total_frac INT8; 872 BEGIN 873 -- GC for all accounts 874 FOR my_h_payto IN 875 SELECT DISTINCT h_payto 876 FROM exchange_statistic_counter_event 877 LOOP 878 -- Do combination work for all numeric statistic events 879 FOR my_rec IN 880 SELECT imeta_serial_id 881 ,ranges 882 ,precisions 883 ,slug 884 FROM exchange_statistic_interval_meta 885 LOOP 886 887 -- First, we query the current interval statistic to update its counters 888 PERFORM FROM exchange_statistic_interval_amount_get (my_rec.slug, my_h_payto); 889 890 my_meta = my_rec.imeta_serial_id; 891 my_ranges = my_rec.ranges; 892 my_precisions = my_rec.precisions; 893 894 FOR my_i IN 1..COALESCE(array_length(my_ranges,1),0) 895 LOOP 896 my_precision = my_precisions[my_i]; 897 IF 1 >= my_precision 898 THEN 899 -- Cannot coarsen in this case 900 CONTINUE; 901 END IF; 902 903 IF 1 = my_i 904 THEN 905 min_slot = 0; 906 ELSE 907 min_slot = my_ranges[my_i - 1]; 908 END IF; 909 end_slot = my_ranges[my_i]; 910 911 -- RAISE NOTICE 'Coarsening from [%,%) at %', my_time - end_slot, my_time - min_slot, my_precision; 912 LOOP 913 EXIT WHEN min_slot >= end_slot; 914 max_slot = min_slot + my_precision; 915 SELECT SUM((delta).val) AS total_val, 916 SUM((delta).frac) AS total_frac, 917 COUNT(*) AS matches, 918 MIN(aevent_serial_id) AS rep_serial_id 919 INTO my_sum 920 FROM exchange_statistic_amount_event 921 WHERE imeta_serial_id=my_meta 922 AND h_payto=my_h_payto 923 AND slot >= my_time - max_slot 924 AND slot < my_time - max_slot; 925 -- we only proceed if we had more then one match (optimization) 926 IF FOUND AND my_sum.matches > 1 927 THEN 928 -- normalize new total 929 my_total_frac = my_sum.total_frac % 100000000; 930 my_total_val = my_sum.total_val + my_sum.total_frac / 100000000; 931 932 -- combine entries 933 DELETE FROM exchange_statistic_amount_event 934 WHERE imeta_serial_id=my_meta 935 AND h_payto=my_h_payto 936 AND slot >= my_time - max_slot 937 AND slot < my_time - max_slot 938 AND aevent_serial_id > my_sum.rep_serial_id; 939 -- Now update the representative to the sum 940 UPDATE exchange_statistic_amount_event SET 941 delta.val = my_total_value 942 ,delta.frac = my_total_frac 943 WHERE imeta_serial_id = my_meta 944 AND h_payto = my_h_payto 945 AND aevent_serial_id = my_sum.rep_serial_id; 946 END IF; 947 min_slot = min_slot + my_precision; 948 END LOOP; -- min_slot to end_slot by precision loop 949 END LOOP; -- my_i loop 950 -- Finally, delete all events beyond the range we care about 951 952 -- RAISE NOTICE 'deleting entries of %/% before % - % = %', my_h_payto, my_meta, my_time, my_ranges[array_length(my_ranges,1)], my_time - my_ranges[array_length(my_ranges,1)]; 953 DELETE FROM exchange_statistic_amount_event 954 WHERE h_payto=my_h_payto 955 AND imeta_serial_id=my_meta 956 AND slot < my_time - my_ranges[array_length(my_ranges,1)]; 957 END LOOP; -- my_rec loop 958 END LOOP; -- my_h_payto loop 959 END $$; 960 COMMENT ON PROCEDURE exchange_statistic_amount_gc 961 IS 'Performs garbage collection and compaction of the exchange_statistic_amount_event table'; 962 963 964 965 DROP PROCEDURE IF EXISTS exchange_statistic_bucket_gc; 966 CREATE OR REPLACE PROCEDURE exchange_statistic_bucket_gc () 967 LANGUAGE plpgsql 968 AS $$ 969 DECLARE 970 my_rec RECORD; 971 my_range TEXT; 972 my_now INT8; 973 my_end INT8; 974 BEGIN 975 my_now = EXTRACT(EPOCH FROM CURRENT_TIMESTAMP(0)::TIMESTAMP); -- seconds since epoch 976 FOR my_rec IN 977 SELECT bmeta_serial_id 978 ,stype 979 ,ranges[array_length(ranges,1)] AS range 980 ,ages[array_length(ages,1)] AS age 981 FROM exchange_statistic_bucket_meta 982 LOOP 983 my_range = '1 ' || my_rec.range::TEXT; 984 my_end = my_now - my_rec.age * EXTRACT(SECONDS FROM (SELECT my_range::INTERVAL)); -- age is given in multiples of the range (in seconds) 985 IF my_rec.stype = 'amount' 986 THEN 987 DELETE 988 FROM exchange_statistic_bucket_amount 989 WHERE bmeta_serial_id = my_rec.bmeta_serial_id 990 AND bucket_start >= my_end; 991 ELSE 992 DELETE 993 FROM exchange_statistic_bucket_counter 994 WHERE bmeta_serial_id = my_rec.bmeta_serial_id 995 AND bucket_start >= my_end; 996 END IF; 997 END LOOP; 998 END $$; 999 COMMENT ON PROCEDURE exchange_statistic_bucket_gc 1000 IS 'Performs garbage collection of the exchange_statistic_bucket_counter and exchange_statistic_bucket_amount tables'; 1001 1002 1003 1004 DROP FUNCTION IF EXISTS exchange_drop_customization; 1005 CREATE OR REPLACE FUNCTION exchange_drop_customization ( 1006 IN in_schema TEXT, 1007 OUT out_found BOOLEAN 1008 ) 1009 LANGUAGE plpgsql 1010 AS $$ 1011 DECLARE 1012 my_xpatches TEXT; 1013 BEGIN 1014 -- Update DB versioning table. 1015 out_found = FALSE; 1016 FOR my_xpatches IN 1017 SELECT patch_name 1018 FROM _v.patches 1019 WHERE starts_with(patch_name, in_schema || '-') 1020 LOOP 1021 PERFORM _v.unregister_patch(my_xpatches); 1022 out_found = TRUE; 1023 END LOOP; 1024 1025 IF out_found 1026 THEN 1027 -- Drop the schema with all stored procedures/functions. 1028 -- This also removes all associated triggers, hence CASCADE. 1029 EXECUTE FORMAT('DROP SCHEMA %s CASCADE' 1030 ,in_schema); 1031 END IF; 1032 1033 -- Finally, need to also remove entries from the statistics meta-tables. 1034 -- Doing so also DELETEs the associated statistics, hence CASCADE. 1035 DELETE 1036 FROM exchange_statistic_interval_meta 1037 WHERE origin=in_schema; 1038 DELETE 1039 FROM exchange_statistic_bucket_meta 1040 WHERE origin=in_schema; 1041 END $$; 1042 COMMENT ON FUNCTION exchange_drop_customization 1043 IS 'Removes all entries related to a particular exchange customization schema';