pg_statistics_helpers.sql (33453B)
1 -- 2 -- This file is part of TALER 3 -- Copyright (C) 2025 Taler Systems SA 4 -- 5 -- TALER is free software; you can redistribute it and/or modify it under the 6 -- terms of the GNU General Public License as published by the Free Software 7 -- Foundation; either version 3, or (at your option) any later version. 8 -- 9 -- TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 -- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 -- A PARTICULAR PURPOSE. See the GNU General Public License for more details. 12 -- 13 -- You should have received a copy of the GNU General Public License along with 14 -- TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 -- 16 17 SET search_path TO merchant; 18 DROP FUNCTION IF EXISTS interval_to_start; 19 CREATE OR REPLACE FUNCTION interval_to_start ( 20 IN in_timestamp TIMESTAMP, 21 IN in_range statistic_range, 22 OUT out_bucket_start INT8 23 ) 24 LANGUAGE plpgsql 25 AS $$ 26 BEGIN 27 out_bucket_start = EXTRACT(EPOCH FROM DATE_TRUNC(in_range::text, in_timestamp)); 28 END $$; 29 COMMENT ON FUNCTION interval_to_start 30 IS 'computes the start time of the bucket for an event at the current time given the desired bucket range'; 31 32 33 DROP PROCEDURE IF EXISTS merchant_do_bump_number_bucket_stat; 34 CREATE OR REPLACE PROCEDURE merchant_do_bump_number_bucket_stat( 35 in_slug TEXT, 36 in_merchant_serial BIGINT, 37 in_timestamp TIMESTAMP, 38 in_delta INT8 39 ) 40 LANGUAGE plpgsql 41 AS $$ 42 DECLARE 43 my_meta INT8; 44 my_range statistic_range; 45 my_bucket_start INT8; 46 my_curs CURSOR (arg_slug TEXT) 47 FOR SELECT UNNEST(ranges) 48 FROM merchant_statistic_bucket_meta 49 WHERE slug=arg_slug; 50 BEGIN 51 SELECT bmeta_serial_id 52 INTO my_meta 53 FROM merchant_statistic_bucket_meta 54 WHERE slug=in_slug 55 AND stype='number'; 56 IF NOT FOUND 57 THEN 58 RETURN; 59 END IF; 60 OPEN my_curs (arg_slug:=in_slug); 61 LOOP 62 FETCH NEXT 63 FROM my_curs 64 INTO my_range; 65 EXIT WHEN NOT FOUND; 66 SELECT * 67 INTO my_bucket_start 68 FROM interval_to_start (in_timestamp, my_range); 69 70 UPDATE merchant_statistic_bucket_counter 71 SET cumulative_number = cumulative_number + in_delta 72 WHERE bmeta_serial_id=my_meta 73 AND merchant_serial=in_merchant_serial 74 AND bucket_start=my_bucket_start 75 AND bucket_range=my_range; 76 IF NOT FOUND 77 THEN 78 INSERT INTO merchant_statistic_bucket_counter 79 (bmeta_serial_id 80 ,merchant_serial 81 ,bucket_start 82 ,bucket_range 83 ,cumulative_number 84 ) VALUES ( 85 my_meta 86 ,in_merchant_serial 87 ,my_bucket_start 88 ,my_range 89 ,in_delta); 90 END IF; 91 END LOOP; 92 CLOSE my_curs; 93 END $$; 94 95 96 DROP PROCEDURE IF EXISTS merchant_do_bump_amount_bucket_stat; 97 CREATE OR REPLACE PROCEDURE merchant_do_bump_amount_bucket_stat( 98 in_slug TEXT, 99 in_merchant_serial BIGINT, 100 in_timestamp TIMESTAMP, 101 in_delta taler_amount_currency 102 ) 103 LANGUAGE plpgsql 104 AS $$ 105 DECLARE 106 my_meta INT8; 107 my_range statistic_range; 108 my_bucket_start INT8; 109 my_curs CURSOR (arg_slug TEXT) 110 FOR SELECT UNNEST(ranges) 111 FROM merchant_statistic_bucket_meta 112 WHERE slug=arg_slug; 113 BEGIN 114 SELECT bmeta_serial_id 115 INTO my_meta 116 FROM merchant_statistic_bucket_meta 117 WHERE slug=in_slug 118 AND stype='amount'; 119 IF NOT FOUND 120 THEN 121 RETURN; 122 END IF; 123 OPEN my_curs (arg_slug:=in_slug); 124 LOOP 125 FETCH NEXT 126 FROM my_curs 127 INTO my_range; 128 EXIT WHEN NOT FOUND; 129 SELECT * 130 INTO my_bucket_start 131 FROM interval_to_start (in_timestamp, my_range); 132 133 UPDATE merchant_statistic_bucket_amount 134 SET 135 cumulative_value = cumulative_value + (in_delta).val 136 + CASE 137 WHEN (in_delta).frac + cumulative_frac >= 100000000 138 THEN 1 139 ELSE 0 140 END, 141 cumulative_frac = cumulative_frac + (in_delta).frac 142 - CASE 143 WHEN (in_delta).frac + cumulative_frac >= 100000000 144 THEN 100000000 145 ELSE 0 146 END 147 WHERE bmeta_serial_id=my_meta 148 AND merchant_serial=in_merchant_serial 149 AND curr=(in_delta).curr 150 AND bucket_start=my_bucket_start 151 AND bucket_range=my_range; 152 IF NOT FOUND 153 THEN 154 INSERT INTO merchant_statistic_bucket_amount 155 (bmeta_serial_id 156 ,merchant_serial 157 ,bucket_start 158 ,bucket_range 159 ,curr 160 ,cumulative_value 161 ,cumulative_frac 162 ) VALUES ( 163 my_meta 164 ,in_merchant_serial 165 ,my_bucket_start 166 ,my_range 167 ,(in_delta).curr 168 ,(in_delta).val 169 ,(in_delta).frac); 170 END IF; 171 END LOOP; 172 CLOSE my_curs; 173 END $$; 174 175 COMMENT ON PROCEDURE merchant_do_bump_amount_bucket_stat 176 IS 'Updates an amount statistic tracked over buckets'; 177 178 179 DROP PROCEDURE IF EXISTS merchant_do_bump_number_interval_stat; 180 CREATE OR REPLACE PROCEDURE merchant_do_bump_number_interval_stat( 181 in_slug TEXT, 182 in_merchant_serial BIGINT, 183 in_timestamp TIMESTAMP, 184 in_delta INT8 185 ) 186 LANGUAGE plpgsql 187 AS $$ 188 DECLARE 189 my_now INT8; 190 my_record RECORD; 191 my_meta INT8; 192 my_ranges INT8[]; 193 my_precisions INT8[]; 194 my_rangex INT8; 195 my_precisionx INT8; 196 my_start INT8; 197 my_event INT8; 198 BEGIN 199 my_now = ROUND(EXTRACT(epoch FROM CURRENT_TIMESTAMP(0)::TIMESTAMP) * 1000000)::INT8 / 1000 / 1000; 200 SELECT imeta_serial_id 201 ,ranges AS ranges 202 ,precisions AS precisions 203 INTO my_record 204 FROM merchant_statistic_interval_meta 205 WHERE slug=in_slug 206 AND stype='number'; 207 IF NOT FOUND 208 THEN 209 RETURN; 210 END IF; 211 212 my_start = ROUND(EXTRACT(epoch FROM in_timestamp) * 1000000)::INT8 / 1000 / 1000; -- convert to seconds 213 my_precisions = my_record.precisions; 214 my_ranges = my_record.ranges; 215 my_rangex = NULL; 216 FOR my_x IN 1..COALESCE(array_length(my_ranges,1),0) 217 LOOP 218 IF my_now - my_ranges[my_x] < my_start 219 THEN 220 my_rangex = my_ranges[my_x]; 221 my_precisionx = my_precisions[my_x]; 222 EXIT; 223 END IF; 224 END LOOP; 225 IF my_rangex IS NULL 226 THEN 227 -- event is beyond the ranges we care about 228 RETURN; 229 END IF; 230 231 my_meta = my_record.imeta_serial_id; 232 my_start = my_start - my_start % my_precisionx; -- round down 233 234 INSERT INTO merchant_statistic_counter_event AS msce 235 (imeta_serial_id 236 ,merchant_serial 237 ,slot 238 ,delta) 239 VALUES 240 (my_meta 241 ,in_merchant_serial 242 ,my_start 243 ,in_delta) 244 ON CONFLICT (imeta_serial_id, merchant_serial, slot) 245 DO UPDATE SET 246 delta = msce.delta + in_delta 247 RETURNING nevent_serial_id 248 INTO my_event; 249 250 UPDATE merchant_statistic_interval_counter 251 SET cumulative_number = cumulative_number + in_delta 252 WHERE imeta_serial_id = my_meta 253 AND merchant_serial = in_merchant_serial 254 AND range=my_rangex; 255 IF NOT FOUND 256 THEN 257 INSERT INTO merchant_statistic_interval_counter 258 (imeta_serial_id 259 ,merchant_serial 260 ,range 261 ,event_delimiter 262 ,cumulative_number 263 ) VALUES ( 264 my_meta 265 ,in_merchant_serial 266 ,my_rangex 267 ,my_event 268 ,in_delta); 269 END IF; 270 END $$; 271 272 COMMENT ON PROCEDURE merchant_do_bump_number_interval_stat 273 IS 'Updates a numeric statistic tracked over an interval'; 274 275 276 DROP PROCEDURE IF EXISTS merchant_do_bump_amount_interval_stat; 277 CREATE OR REPLACE PROCEDURE merchant_do_bump_amount_interval_stat( 278 in_slug TEXT, 279 in_merchant_serial BIGINT, 280 in_timestamp TIMESTAMP, 281 in_delta taler_amount_currency -- new amount in table that we should add to the tracker 282 ) 283 LANGUAGE plpgsql 284 AS $$ 285 DECLARE 286 my_now INT8; 287 my_record RECORD; 288 my_meta INT8; 289 my_ranges INT8[]; 290 my_precisions INT8[]; 291 my_x INT; 292 my_rangex INT8; 293 my_precisionx INT8; 294 my_start INT8; 295 my_event INT8; 296 BEGIN 297 my_now = ROUND(EXTRACT(epoch FROM CURRENT_TIMESTAMP(0)::TIMESTAMP) * 1000000)::INT8 / 1000 / 1000; 298 SELECT imeta_serial_id 299 ,ranges 300 ,precisions 301 INTO my_record 302 FROM merchant_statistic_interval_meta 303 WHERE slug=in_slug 304 AND stype='amount'; 305 IF NOT FOUND 306 THEN 307 RETURN; 308 END IF; 309 310 my_start = ROUND(EXTRACT(epoch FROM in_timestamp) * 1000000)::INT8 / 1000 / 1000; -- convert to seconds since epoch 311 my_precisions = my_record.precisions; 312 my_ranges = my_record.ranges; 313 my_rangex = NULL; 314 FOR my_x IN 1..COALESCE(array_length(my_ranges,1),0) 315 LOOP 316 IF my_now - my_ranges[my_x] < my_start 317 THEN 318 my_rangex = my_ranges[my_x]; 319 my_precisionx = my_precisions[my_x]; 320 EXIT; 321 END IF; 322 END LOOP; 323 IF my_rangex IS NULL 324 THEN 325 -- event is beyond the ranges we care about 326 RETURN; 327 END IF; 328 my_start = my_start - my_start % my_precisionx; -- round down 329 my_meta = my_record.imeta_serial_id; 330 331 INSERT INTO merchant_statistic_amount_event AS msae 332 (imeta_serial_id 333 ,merchant_serial 334 ,slot 335 ,delta_curr 336 ,delta_value 337 ,delta_frac 338 ) VALUES ( 339 my_meta 340 ,in_merchant_serial 341 ,my_start 342 ,(in_delta).curr 343 ,(in_delta).val 344 ,(in_delta).frac 345 ) 346 ON CONFLICT (imeta_serial_id, merchant_serial, slot, delta_curr) 347 DO UPDATE SET 348 delta_value = msae.delta_value + (in_delta).val 349 + CASE 350 WHEN (in_delta).frac + msae.delta_frac >= 100000000 351 THEN 1 352 ELSE 0 353 END, 354 delta_frac = msae.delta_frac + (in_delta).frac 355 - CASE 356 WHEN (in_delta).frac + msae.delta_frac >= 100000000 357 THEN 100000000 358 ELSE 0 359 END 360 RETURNING aevent_serial_id 361 INTO my_event; 362 363 UPDATE merchant_statistic_interval_amount 364 SET 365 cumulative_value = cumulative_value + (in_delta).val 366 + CASE 367 WHEN (in_delta).frac + cumulative_frac >= 100000000 368 THEN 1 369 ELSE 0 370 END, 371 cumulative_frac = cumulative_frac + (in_delta).frac 372 - CASE 373 WHEN (in_delta).frac + cumulative_frac >= 100000000 374 THEN 100000000 375 ELSE 0 376 END 377 WHERE imeta_serial_id=my_meta 378 AND merchant_serial=in_merchant_serial 379 AND range=my_rangex 380 AND curr=(in_delta).curr; 381 IF NOT FOUND 382 THEN 383 INSERT INTO merchant_statistic_interval_amount 384 (imeta_serial_id 385 ,merchant_serial 386 ,range 387 ,event_delimiter 388 ,curr 389 ,cumulative_value 390 ,cumulative_frac 391 ) VALUES ( 392 my_meta 393 ,in_merchant_serial 394 ,my_rangex 395 ,my_event 396 ,(in_delta).curr 397 ,(in_delta).val 398 ,(in_delta).frac); 399 END IF; 400 END $$; 401 COMMENT ON PROCEDURE merchant_do_bump_amount_interval_stat 402 IS 'Updates an amount statistic tracked over an interval'; 403 404 405 DROP PROCEDURE IF EXISTS merchant_do_bump_number_stat; 406 CREATE OR REPLACE PROCEDURE merchant_do_bump_number_stat( 407 in_slug TEXT, 408 in_merchant_serial BIGINT, 409 in_timestamp TIMESTAMP, 410 in_delta INT8 411 ) 412 LANGUAGE plpgsql 413 AS $$ 414 BEGIN 415 CALL merchant_do_bump_number_bucket_stat (in_slug, in_merchant_serial, in_timestamp, in_delta); 416 CALL merchant_do_bump_number_interval_stat (in_slug, in_merchant_serial, in_timestamp, in_delta); 417 END $$; 418 COMMENT ON PROCEDURE merchant_do_bump_number_stat 419 IS 'Updates a numeric statistic (bucket or interval)'; 420 421 422 DROP PROCEDURE IF EXISTS merchant_do_bump_amount_stat; 423 CREATE OR REPLACE PROCEDURE merchant_do_bump_amount_stat( 424 in_slug TEXT, 425 in_merchant_serial BIGINT, 426 in_timestamp TIMESTAMP, 427 in_delta taler_amount_currency 428 ) 429 LANGUAGE plpgsql 430 AS $$ 431 BEGIN 432 CALL merchant_do_bump_amount_bucket_stat (in_slug, in_merchant_serial, in_timestamp, in_delta); 433 CALL merchant_do_bump_amount_interval_stat (in_slug, in_merchant_serial, in_timestamp, in_delta); 434 END $$; 435 COMMENT ON PROCEDURE merchant_do_bump_amount_stat 436 IS 'Updates an amount statistic (bucket or interval)'; 437 438 439 DROP FUNCTION IF EXISTS merchant_statistic_interval_number_get; 440 CREATE FUNCTION merchant_statistic_interval_number_get ( 441 IN in_slug TEXT, 442 IN in_instance_id TEXT 443 ) 444 RETURNS SETOF merchant_statistic_interval_number_get_return_value 445 LANGUAGE plpgsql 446 AS $$ 447 DECLARE 448 my_time INT8 DEFAULT ROUND(EXTRACT(epoch FROM CURRENT_TIMESTAMP(0)::TIMESTAMP) * 1000000)::INT8 / 1000 / 1000; 449 my_ranges INT8[]; 450 my_range INT8; 451 my_delta INT8; 452 my_meta INT8; 453 my_next_max_serial INT8; 454 my_instance_id INT8; 455 my_rec RECORD; 456 my_irec RECORD; 457 my_i INT; 458 my_min_serial INT8 DEFAULT NULL; 459 my_rval merchant_statistic_interval_number_get_return_value; 460 BEGIN 461 SELECT merchant_serial 462 INTO my_instance_id 463 FROM merchant_instances 464 WHERE merchant_id=in_instance_id; 465 IF NOT FOUND 466 THEN 467 RETURN; 468 END IF; 469 470 SELECT imeta_serial_id 471 ,ranges 472 ,precisions 473 INTO my_rec 474 FROM merchant_statistic_interval_meta 475 WHERE slug=in_slug; 476 IF NOT FOUND 477 THEN 478 RETURN; 479 END IF; 480 my_rval.rvalue = 0; 481 my_ranges = my_rec.ranges; 482 my_meta = my_rec.imeta_serial_id; 483 484 FOR my_i IN 1..COALESCE(array_length(my_ranges,1),0) 485 LOOP 486 my_range = my_ranges[my_i]; 487 SELECT event_delimiter 488 ,cumulative_number 489 INTO my_irec 490 FROM merchant_statistic_interval_counter 491 WHERE imeta_serial_id = my_meta 492 AND range = my_range 493 AND merchant_serial = my_instance_id; 494 IF FOUND 495 THEN 496 my_min_serial = my_irec.event_delimiter; 497 my_rval.rvalue = my_rval.rvalue + my_irec.cumulative_number; 498 499 -- Check if we have events that left the applicable range 500 SELECT SUM(delta) AS delta_sum 501 INTO my_irec 502 FROM merchant_statistic_counter_event 503 WHERE imeta_serial_id = my_meta 504 AND merchant_serial = my_instance_id 505 AND slot < my_time - my_range 506 AND nevent_serial_id >= my_min_serial; 507 508 IF FOUND AND my_irec.delta_sum IS NOT NULL 509 THEN 510 my_delta = my_irec.delta_sum; 511 my_rval.rvalue = my_rval.rvalue - my_delta; 512 513 -- First find out the next event delimiter value 514 SELECT nevent_serial_id 515 INTO my_next_max_serial 516 FROM merchant_statistic_counter_event 517 WHERE imeta_serial_id = my_meta 518 AND merchant_serial = my_instance_id 519 AND slot >= my_time - my_range 520 AND nevent_serial_id >= my_min_serial 521 ORDER BY slot ASC 522 LIMIT 1; 523 524 IF FOUND 525 THEN 526 -- remove expired events from the sum of the current slot 527 528 UPDATE merchant_statistic_interval_counter 529 SET cumulative_number = cumulative_number - my_delta, 530 event_delimiter = my_next_max_serial 531 WHERE imeta_serial_id = my_meta 532 AND merchant_serial = my_instance_id 533 AND range = my_range; 534 ELSE 535 -- actually, slot is now empty, remove it entirely 536 DELETE FROM merchant_statistic_interval_counter 537 WHERE imeta_serial_id = my_meta 538 AND merchant_serial = my_instance_id 539 AND range = my_range; 540 END IF; 541 IF (my_i < array_length(my_ranges,1)) 542 THEN 543 -- carry over all events into the next slot 544 UPDATE merchant_statistic_interval_counter AS usic SET 545 cumulative_number = cumulative_number + my_delta, 546 event_delimiter = LEAST(usic.event_delimiter,my_min_serial) 547 WHERE imeta_serial_id = my_meta 548 AND merchant_serial = my_instance_id 549 AND range=my_ranges[my_i+1]; 550 IF NOT FOUND 551 THEN 552 INSERT INTO merchant_statistic_interval_counter 553 (imeta_serial_id 554 ,merchant_serial 555 ,range 556 ,event_delimiter 557 ,cumulative_number 558 ) VALUES ( 559 my_meta 560 ,my_instance_id 561 ,my_ranges[my_i+1] 562 ,my_min_serial 563 ,my_delta); 564 END IF; 565 ELSE 566 -- events are obsolete, delete them 567 DELETE FROM merchant_statistic_counter_event 568 WHERE imeta_serial_id = my_meta 569 AND merchant_serial = my_instance_id 570 AND slot < my_time - my_range; 571 END IF; 572 END IF; 573 574 my_rval.range = my_range; 575 RETURN NEXT my_rval; 576 END IF; 577 END LOOP; 578 END $$; 579 580 COMMENT ON FUNCTION merchant_statistic_interval_number_get 581 IS 'Returns deposit statistic tracking deposited amounts over certain time intervals; we first trim the stored data to only track what is still in-range, and then return the remaining value for each range'; 582 583 584 DROP FUNCTION IF EXISTS merchant_statistic_interval_amount_get; 585 CREATE FUNCTION merchant_statistic_interval_amount_get ( 586 IN in_slug TEXT, 587 IN in_instance_id TEXT 588 ) 589 RETURNS SETOF merchant_statistic_interval_amount_get_return_value 590 LANGUAGE plpgsql 591 AS $$ 592 DECLARE 593 my_time INT8 DEFAULT ROUND(EXTRACT(epoch FROM CURRENT_TIMESTAMP(0)::TIMESTAMP) * 1000000)::INT8 / 1000 / 1000; 594 my_ranges INT8[]; 595 my_range INT8; 596 my_delta_value INT8; 597 my_delta_frac INT8; 598 my_meta INT8; 599 my_instance_id INT8; 600 my_next_max_serial INT8; 601 my_currency TEXT; 602 my_rec RECORD; 603 my_irec RECORD; 604 my_jrec RECORD; 605 my_i INT; 606 my_min_serial INT8 DEFAULT NULL; 607 my_rval merchant_statistic_interval_amount_get_return_value; 608 BEGIN 609 SELECT merchant_serial 610 INTO my_instance_id 611 FROM merchant_instances 612 WHERE merchant_id=in_instance_id; 613 IF NOT FOUND 614 THEN 615 RETURN; 616 END IF; 617 618 SELECT imeta_serial_id 619 ,ranges 620 ,precisions 621 INTO my_rec 622 FROM merchant_statistic_interval_meta 623 WHERE slug=in_slug; 624 IF NOT FOUND 625 THEN 626 RETURN; 627 END IF; 628 629 my_meta = my_rec.imeta_serial_id; 630 my_ranges = my_rec.ranges; 631 632 FOR my_currency IN 633 SELECT DISTINCT delta_curr 634 FROM merchant_statistic_amount_event 635 WHERE imeta_serial_id = my_meta 636 LOOP 637 638 my_rval.rvalue.val = 0; 639 my_rval.rvalue.frac = 0; 640 my_rval.rvalue.curr = my_currency; 641 642 FOR my_i IN 1..COALESCE(array_length(my_ranges,1),0) 643 LOOP 644 my_range = my_ranges[my_i]; 645 SELECT event_delimiter 646 ,cumulative_value 647 ,cumulative_frac 648 INTO my_irec 649 FROM merchant_statistic_interval_amount 650 WHERE imeta_serial_id = my_meta 651 AND merchant_serial = my_instance_id 652 AND curr = my_currency 653 AND range = my_range; 654 655 IF FOUND 656 THEN 657 my_min_serial = my_irec.event_delimiter; 658 my_rval.rvalue.val = (my_rval.rvalue).val + my_irec.cumulative_value + my_irec.cumulative_frac / 100000000; 659 my_rval.rvalue.frac = (my_rval.rvalue).frac + my_irec.cumulative_frac % 100000000; 660 IF (my_rval.rvalue).frac > 100000000 661 THEN 662 my_rval.rvalue.frac = (my_rval.rvalue).frac - 100000000; 663 my_rval.rvalue.val = (my_rval.rvalue).val + 1; 664 END IF; 665 666 -- Check if we have events that left the applicable range 667 SELECT SUM(delta_value) AS value_sum 668 ,SUM(delta_frac) AS frac_sum 669 INTO my_jrec 670 FROM merchant_statistic_amount_event 671 WHERE imeta_serial_id = my_meta 672 AND merchant_serial = my_instance_id 673 AND delta_curr = my_currency 674 AND slot < my_time - my_range 675 AND aevent_serial_id >= my_min_serial; 676 677 IF FOUND AND my_jrec.value_sum IS NOT NULL 678 THEN 679 -- Normalize sum 680 my_delta_value = my_jrec.value_sum + my_jrec.frac_sum / 100000000; 681 my_delta_frac = my_jrec.frac_sum % 100000000; 682 my_rval.rvalue.val = (my_rval.rvalue).val - my_delta_value; 683 IF ((my_rval.rvalue).frac >= my_delta_frac) 684 THEN 685 my_rval.rvalue.frac = (my_rval.rvalue).frac - my_delta_frac; 686 ELSE 687 my_rval.rvalue.frac = 100000000 + (my_rval.rvalue).frac - my_delta_frac; 688 my_rval.rvalue.val = (my_rval.rvalue).val - 1; 689 END IF; 690 691 -- First find out the next event delimiter value 692 SELECT aevent_serial_id 693 INTO my_next_max_serial 694 FROM merchant_statistic_amount_event 695 WHERE imeta_serial_id = my_meta 696 AND merchant_serial = my_instance_id 697 AND delta_curr = my_currency 698 AND slot >= my_time - my_range 699 AND aevent_serial_id >= my_min_serial 700 ORDER BY slot ASC 701 LIMIT 1; 702 IF FOUND 703 THEN 704 -- remove expired events from the sum of the current slot 705 UPDATE merchant_statistic_interval_amount SET 706 cumulative_value = cumulative_value - my_delta_value 707 - CASE 708 WHEN cumulative_frac < my_delta_frac 709 THEN 1 710 ELSE 0 711 END, 712 cumulative_frac = cumulative_frac - my_delta_frac 713 + CASE 714 WHEN cumulative_frac < my_delta_frac 715 THEN 100000000 716 ELSE 0 717 END, 718 event_delimiter = my_next_max_serial 719 WHERE imeta_serial_id = my_meta 720 AND merchant_serial = my_instance_id 721 AND curr = my_currency 722 AND range = my_range; 723 ELSE 724 -- actually, slot is now empty, remove it entirely 725 DELETE FROM merchant_statistic_interval_amount 726 WHERE imeta_serial_id = my_meta 727 AND merchant_serial = my_instance_id 728 AND curr = my_currency 729 AND range = my_range; 730 END IF; 731 IF (my_i < array_length(my_ranges,1)) 732 THEN 733 -- carry over all events into the next (larger) slot 734 UPDATE merchant_statistic_interval_amount AS msia SET 735 cumulative_value = cumulative_value + my_delta_value 736 + CASE 737 WHEN cumulative_frac + my_delta_frac > 100000000 738 THEN 1 739 ELSE 0 740 END, 741 cumulative_frac = cumulative_frac + my_delta_value 742 - CASE 743 WHEN cumulative_frac + my_delta_frac > 100000000 744 THEN 100000000 745 ELSE 0 746 END, 747 event_delimiter = LEAST (msia.event_delimiter,my_min_serial) 748 WHERE imeta_serial_id = my_meta 749 AND merchant_serial = my_instance_id 750 AND range=my_ranges[my_i+1]; 751 IF NOT FOUND 752 THEN 753 INSERT INTO merchant_statistic_interval_amount 754 (imeta_serial_id 755 ,merchant_serial 756 ,event_delimiter 757 ,range 758 ,curr 759 ,cumulative_value 760 ,cumulative_frac 761 ) VALUES ( 762 my_meta 763 ,my_instance_id 764 ,my_min_serial 765 ,my_ranges[my_i+1] 766 ,my_currency 767 ,my_delta_value 768 ,my_delta_frac); 769 END IF; 770 ELSE 771 -- events are obsolete, delete them 772 DELETE FROM merchant_statistic_amount_event 773 WHERE imeta_serial_id = my_meta 774 AND merchant_serial = my_instance_id 775 AND slot < my_time - my_range; 776 END IF; 777 END IF; 778 779 my_rval.range = my_range; 780 RETURN NEXT my_rval; 781 END IF; 782 END LOOP; -- over my_ranges 783 END LOOP; -- over my_currency 784 END $$; 785 786 COMMENT ON FUNCTION merchant_statistic_interval_amount_get 787 IS 'Returns deposit statistic tracking deposited amounts over certain time intervals; we first trim the stored data to only track what is still in-range, and then return the remaining value; multiple values are returned, one per currency and range'; 788 789 790 791 792 793 DROP PROCEDURE IF EXISTS merchant_statistic_counter_gc; 794 CREATE OR REPLACE PROCEDURE merchant_statistic_counter_gc () 795 LANGUAGE plpgsql 796 AS $$ 797 DECLARE 798 my_time INT8 DEFAULT ROUND(EXTRACT(epoch FROM CURRENT_TIMESTAMP(0)::TIMESTAMP) * 1000000)::INT8 / 1000 / 1000; 799 my_instance INT8; 800 my_instance_name TEXT; 801 my_rec RECORD; 802 my_sum RECORD; 803 my_meta INT8; 804 my_ranges INT8[]; 805 my_precisions INT8[]; 806 my_precision INT4; 807 my_i INT4; 808 min_slot INT8; 809 max_slot INT8; 810 end_slot INT8; 811 my_total INT8; 812 BEGIN 813 -- GC for all instances 814 FOR my_instance IN 815 SELECT DISTINCT merchant_serial 816 FROM merchant_statistic_counter_event 817 LOOP 818 -- Do combination work for all numeric statistic events 819 FOR my_rec IN 820 SELECT imeta_serial_id 821 ,ranges 822 ,precisions 823 ,slug 824 FROM merchant_statistic_interval_meta 825 LOOP 826 -- First, we query the current interval statistic to update its counters 827 SELECT merchant_id 828 INTO my_instance_name 829 FROM merchant_instances 830 WHERE merchant_serial = my_instance; 831 PERFORM FROM merchant_statistic_interval_number_get (my_rec.slug, my_instance_name); 832 833 my_meta = my_rec.imeta_serial_id; 834 my_ranges = my_rec.ranges; 835 my_precisions = my_rec.precisions; 836 837 FOR my_i IN 1..COALESCE(array_length(my_ranges,1),0) 838 LOOP 839 my_precision = my_precisions[my_i]; 840 IF 1 >= my_precision 841 THEN 842 -- Cannot coarsen in this case 843 CONTINUE; 844 END IF; 845 846 IF 1 = my_i 847 THEN 848 min_slot = 0; 849 ELSE 850 min_slot = my_ranges[my_i - 1]; 851 END IF; 852 end_slot = my_ranges[my_i]; 853 RAISE NOTICE 'Coarsening from [%,%) at %', my_time - end_slot, my_time - min_slot, my_precision; 854 855 LOOP 856 EXIT WHEN min_slot >= end_slot; 857 max_slot = min_slot + my_precision; 858 SELECT SUM(delta) AS total, 859 COUNT(*) AS matches, 860 MIN(nevent_serial_id) AS rep_serial_id 861 INTO my_sum 862 FROM merchant_statistic_counter_event 863 WHERE merchant_serial=my_instance 864 AND imeta_serial_id=my_meta 865 AND slot >= my_time - max_slot 866 AND slot < my_time - min_slot; 867 868 RAISE NOTICE 'Found % entries between [%,%)', my_sum.matches, my_time - max_slot, my_time - min_slot; 869 -- we only proceed if we had more then one match (optimization) 870 IF FOUND AND my_sum.matches > 1 871 THEN 872 my_total = my_sum.total; 873 874 RAISE NOTICE 'combining % entries to representative % for slots [%-%)', my_sum.matches, my_sum.rep_serial_id, my_time - max_slot, my_time - min_slot; 875 876 -- combine entries 877 DELETE FROM merchant_statistic_counter_event 878 WHERE merchant_serial=my_instance 879 AND imeta_serial_id=my_meta 880 AND slot >= my_time - max_slot 881 AND slot < my_time - min_slot 882 AND nevent_serial_id > my_sum.rep_serial_id; 883 -- Now update the representative to the sum 884 UPDATE merchant_statistic_counter_event SET 885 delta = my_total 886 WHERE imeta_serial_id = my_meta 887 AND merchant_serial = my_instance 888 AND nevent_serial_id = my_sum.rep_serial_id; 889 END IF; 890 min_slot = min_slot + my_precision; 891 END LOOP; -- min_slot to end_slot by precision loop 892 END LOOP; -- my_i loop 893 -- Finally, delete all events beyond the range we care about 894 895 RAISE NOTICE 'deleting entries of %/% before % - % = %', my_instance, my_meta, my_time, my_ranges[array_length(my_ranges,1)], my_time - my_ranges[array_length(my_ranges,1)]; 896 DELETE FROM merchant_statistic_counter_event 897 WHERE merchant_serial=my_instance 898 AND imeta_serial_id=my_meta 899 AND slot < my_time - my_ranges[array_length(my_ranges,1)]; 900 END LOOP; -- my_rec loop 901 END LOOP; -- my_instance loop 902 END $$; 903 COMMENT ON PROCEDURE merchant_statistic_counter_gc 904 IS 'Performs garbage collection and compaction of the merchant_statistic_counter_event table'; 905 906 907 908 DROP PROCEDURE IF EXISTS merchant_statistic_amount_gc; 909 CREATE OR REPLACE PROCEDURE merchant_statistic_amount_gc () 910 LANGUAGE plpgsql 911 AS $$ 912 DECLARE 913 my_time INT8 DEFAULT ROUND(EXTRACT(epoch FROM CURRENT_TIMESTAMP(0)::TIMESTAMP) * 1000000)::INT8 / 1000 / 1000; 914 my_instance INT8; 915 my_instance_name TEXT; 916 my_rec RECORD; 917 my_sum RECORD; 918 my_meta INT8; 919 my_ranges INT8[]; 920 my_precisions INT8[]; 921 my_precision INT4; 922 my_currency TEXT; 923 my_i INT4; 924 min_slot INT8; 925 max_slot INT8; 926 end_slot INT8; 927 my_total_val INT8; 928 my_total_frac INT8; 929 BEGIN 930 -- GC for all instances 931 FOR my_instance IN 932 SELECT DISTINCT merchant_serial 933 FROM merchant_statistic_counter_event 934 LOOP 935 -- Do combination work for all numeric statistic events 936 FOR my_rec IN 937 SELECT imeta_serial_id 938 ,ranges 939 ,precisions 940 ,slug 941 FROM merchant_statistic_interval_meta 942 LOOP 943 944 -- First, we query the current interval statistic to update its counters 945 SELECT merchant_id 946 INTO my_instance_name 947 FROM merchant_instances 948 WHERE merchant_serial = my_instance; 949 PERFORM FROM merchant_statistic_interval_amount_get (my_rec.slug, my_instance_name); 950 951 my_meta = my_rec.imeta_serial_id; 952 my_ranges = my_rec.ranges; 953 my_precisions = my_rec.precisions; 954 FOR my_currency IN 955 SELECT DISTINCT delta_curr 956 FROM merchant_statistic_amount_event 957 WHERE imeta_serial_id = my_meta 958 LOOP 959 960 FOR my_i IN 1..COALESCE(array_length(my_ranges,1),0) 961 LOOP 962 my_precision = my_precisions[my_i]; 963 IF 1 >= my_precision 964 THEN 965 -- Cannot coarsen in this case 966 CONTINUE; 967 END IF; 968 969 IF 1 = my_i 970 THEN 971 min_slot = 0; 972 ELSE 973 min_slot = my_ranges[my_i - 1]; 974 END IF; 975 end_slot = my_ranges[my_i]; 976 977 RAISE NOTICE 'Coarsening from [%,%) at %', my_time - end_slot, my_time - min_slot, my_precision; 978 LOOP 979 EXIT WHEN min_slot >= end_slot; 980 max_slot = min_slot + my_precision; 981 SELECT SUM(delta_value) AS total_val, 982 SUM(delta_frac) AS total_frac, 983 COUNT(*) AS matches, 984 MIN(aevent_serial_id) AS rep_serial_id 985 INTO my_sum 986 FROM merchant_statistic_amount_event 987 WHERE imeta_serial_id=my_meta 988 AND merchant_serial=my_instance 989 AND delta_curr = my_currency 990 AND slot >= my_time - max_slot 991 AND slot < my_time - max_slot; 992 -- we only proceed if we had more then one match (optimization) 993 IF FOUND AND my_sum.matches > 1 994 THEN 995 -- normalize new total 996 my_total_frac = my_sum.total_frac % 100000000; 997 my_total_val = my_sum.total_val + my_sum.total_frac / 100000000; 998 999 -- combine entries 1000 DELETE FROM merchant_statistic_amount_event 1001 WHERE imeta_serial_id=my_meta 1002 AND merchant_serial=my_instance 1003 AND delta_curr = my_currency 1004 AND slot >= my_time - max_slot 1005 AND slot < my_time - max_slot 1006 AND aevent_serial_id > my_sum.rep_serial_id; 1007 -- Now update the representative to the sum 1008 UPDATE merchant_statistic_amount_event SET 1009 delta_value = my_total_value 1010 ,delta_frac = my_total_frac 1011 WHERE imeta_serial_id = my_meta 1012 AND merchant_serial = my_instance 1013 AND delta_curr = my_currency 1014 AND aevent_serial_id = my_sum.rep_serial_id; 1015 END IF; 1016 min_slot = min_slot + my_precision; 1017 END LOOP; -- min_slot to end_slot by precision loop 1018 END LOOP; -- my_i loop 1019 END LOOP; -- my_currency loop 1020 -- Finally, delete all events beyond the range we care about 1021 1022 RAISE NOTICE 'deleting entries of %/% before % - % = %', my_instance, my_meta, my_time, my_ranges[array_length(my_ranges,1)], my_time - my_ranges[array_length(my_ranges,1)]; 1023 DELETE FROM merchant_statistic_amount_event 1024 WHERE merchant_serial=my_instance 1025 AND imeta_serial_id=my_meta 1026 AND slot < my_time - my_ranges[array_length(my_ranges,1)]; 1027 END LOOP; -- my_rec loop 1028 END LOOP; -- my_instance loop 1029 END $$; 1030 COMMENT ON PROCEDURE merchant_statistic_amount_gc 1031 IS 'Performs garbage collection and compaction of the merchant_statistic_amount_event table'; 1032 1033 1034 1035 DROP PROCEDURE IF EXISTS merchant_statistic_bucket_gc; 1036 CREATE OR REPLACE PROCEDURE merchant_statistic_bucket_gc () 1037 LANGUAGE plpgsql 1038 AS $$ 1039 DECLARE 1040 my_rec RECORD; 1041 my_range TEXT; 1042 my_now INT8; 1043 my_end INT8; 1044 BEGIN 1045 my_now = EXTRACT(EPOCH FROM CURRENT_TIMESTAMP(0)::TIMESTAMP); -- seconds since epoch 1046 FOR my_rec IN 1047 SELECT bmeta_serial_id 1048 ,stype 1049 ,ranges[array_length(ranges,1)] AS range 1050 ,ages[array_length(ages,1)] AS age 1051 FROM merchant_statistic_bucket_meta 1052 LOOP 1053 my_range = '1 ' || my_rec.range::TEXT; 1054 my_end = my_now - my_rec.age * EXTRACT(SECONDS FROM (SELECT my_range::INTERVAL)); -- age is given in multiples of the range (in seconds) 1055 IF my_rec.stype = 'amount' 1056 THEN 1057 DELETE 1058 FROM merchant_statistic_bucket_amount 1059 WHERE bmeta_serial_id = my_rec.bmeta_serial_id 1060 AND bucket_start >= my_end; 1061 ELSE 1062 DELETE 1063 FROM merchant_statistic_bucket_counter 1064 WHERE bmeta_serial_id = my_rec.bmeta_serial_id 1065 AND bucket_start >= my_end; 1066 END IF; 1067 END LOOP; 1068 END $$; 1069 COMMENT ON PROCEDURE merchant_statistic_bucket_gc 1070 IS 'Performs garbage collection of the merchant_statistic_bucket_counter and merchant_statistic_bucket_amount tables'; 1071 1072 1073 1074 -- The date_trunc may not be necessary if we assume it is already truncated 1075 DROP FUNCTION IF EXISTS merchant_statistics_bucket_end; 1076 CREATE FUNCTION merchant_statistics_bucket_end ( 1077 IN in_bucket_start INT8, 1078 IN in_range statistic_range, 1079 OUT out_bucket_end INT8 1080 ) 1081 LANGUAGE plpgsql 1082 AS $$ 1083 BEGIN 1084 IF in_range='quarter' 1085 THEN 1086 out_bucket_end = EXTRACT(EPOCH FROM CAST(date_trunc('quarter', to_timestamp(in_bucket_start)::date) + interval '3 months' AS date)); 1087 ELSE 1088 out_bucket_end = EXTRACT(EPOCH FROM CAST(to_timestamp(in_bucket_start)::date + ('1 ' || in_range)::interval AS date)); 1089 END IF; 1090 END $$; 1091 COMMENT ON FUNCTION merchant_statistics_bucket_end 1092 IS 'computes the end time of the bucket for an event at the current time given the desired bucket range';