merchant

Merchant backend to process payments, run by merchants
Log | Files | Refs | Submodules | README | LICENSE

pg_statistics_helpers.sql (33453B)


      1 --
      2 -- This file is part of TALER
      3 -- Copyright (C) 2025 Taler Systems SA
      4 --
      5 -- TALER is free software; you can redistribute it and/or modify it under the
      6 -- terms of the GNU General Public License as published by the Free Software
      7 -- Foundation; either version 3, or (at your option) any later version.
      8 --
      9 -- TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10 -- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11 -- A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
     12 --
     13 -- You should have received a copy of the GNU General Public License along with
     14 -- TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 --
     16 
     17 SET search_path TO merchant;
     18 DROP FUNCTION IF EXISTS interval_to_start;
     19 CREATE OR REPLACE FUNCTION interval_to_start (
     20   IN in_timestamp TIMESTAMP,
     21   IN in_range statistic_range,
     22   OUT out_bucket_start INT8
     23 )
     24 LANGUAGE plpgsql
     25 AS $$
     26 BEGIN
     27   out_bucket_start = EXTRACT(EPOCH FROM DATE_TRUNC(in_range::text, in_timestamp));
     28 END $$;
     29 COMMENT ON FUNCTION interval_to_start
     30  IS 'computes the start time of the bucket for an event at the current time given the desired bucket range';
     31 
     32 
     33 DROP PROCEDURE IF EXISTS merchant_do_bump_number_bucket_stat;
     34 CREATE OR REPLACE PROCEDURE merchant_do_bump_number_bucket_stat(
     35   in_slug TEXT,
     36   in_merchant_serial BIGINT,
     37   in_timestamp TIMESTAMP,
     38   in_delta INT8
     39 )
     40 LANGUAGE plpgsql
     41 AS $$
     42 DECLARE
     43   my_meta INT8;
     44   my_range statistic_range;
     45   my_bucket_start INT8;
     46   my_curs CURSOR (arg_slug TEXT)
     47    FOR SELECT UNNEST(ranges)
     48          FROM merchant_statistic_bucket_meta
     49         WHERE slug=arg_slug;
     50 BEGIN
     51   SELECT bmeta_serial_id
     52     INTO my_meta
     53     FROM merchant_statistic_bucket_meta
     54    WHERE slug=in_slug
     55      AND stype='number';
     56   IF NOT FOUND
     57   THEN
     58     RETURN;
     59   END IF;
     60   OPEN my_curs (arg_slug:=in_slug);
     61   LOOP
     62     FETCH NEXT
     63       FROM my_curs
     64       INTO my_range;
     65     EXIT WHEN NOT FOUND;
     66     SELECT *
     67       INTO my_bucket_start
     68       FROM interval_to_start (in_timestamp, my_range);
     69 
     70     UPDATE merchant_statistic_bucket_counter
     71        SET cumulative_number = cumulative_number + in_delta
     72      WHERE bmeta_serial_id=my_meta
     73        AND merchant_serial=in_merchant_serial
     74        AND bucket_start=my_bucket_start
     75        AND bucket_range=my_range;
     76     IF NOT FOUND
     77     THEN
     78       INSERT INTO merchant_statistic_bucket_counter
     79         (bmeta_serial_id
     80         ,merchant_serial
     81         ,bucket_start
     82         ,bucket_range
     83         ,cumulative_number
     84         ) VALUES (
     85          my_meta
     86         ,in_merchant_serial
     87         ,my_bucket_start
     88         ,my_range
     89         ,in_delta);
     90     END IF;
     91   END LOOP;
     92   CLOSE my_curs;
     93 END $$;
     94 
     95 
     96 DROP PROCEDURE IF EXISTS merchant_do_bump_amount_bucket_stat;
     97 CREATE OR REPLACE PROCEDURE merchant_do_bump_amount_bucket_stat(
     98   in_slug TEXT,
     99   in_merchant_serial BIGINT,
    100   in_timestamp TIMESTAMP,
    101   in_delta taler_amount_currency
    102 )
    103 LANGUAGE plpgsql
    104 AS $$
    105 DECLARE
    106   my_meta INT8;
    107   my_range statistic_range;
    108   my_bucket_start INT8;
    109   my_curs CURSOR (arg_slug TEXT)
    110    FOR SELECT UNNEST(ranges)
    111          FROM merchant_statistic_bucket_meta
    112         WHERE slug=arg_slug;
    113 BEGIN
    114   SELECT bmeta_serial_id
    115     INTO my_meta
    116     FROM merchant_statistic_bucket_meta
    117    WHERE slug=in_slug
    118      AND stype='amount';
    119   IF NOT FOUND
    120   THEN
    121     RETURN;
    122   END IF;
    123   OPEN my_curs (arg_slug:=in_slug);
    124   LOOP
    125     FETCH NEXT
    126       FROM my_curs
    127       INTO my_range;
    128     EXIT WHEN NOT FOUND;
    129     SELECT *
    130       INTO my_bucket_start
    131       FROM interval_to_start (in_timestamp, my_range);
    132 
    133     UPDATE merchant_statistic_bucket_amount
    134       SET
    135         cumulative_value = cumulative_value + (in_delta).val
    136         + CASE
    137             WHEN (in_delta).frac + cumulative_frac >= 100000000
    138             THEN 1
    139             ELSE 0
    140           END,
    141         cumulative_frac = cumulative_frac + (in_delta).frac
    142         - CASE
    143             WHEN (in_delta).frac + cumulative_frac >= 100000000
    144             THEN 100000000
    145             ELSE 0
    146           END
    147      WHERE bmeta_serial_id=my_meta
    148        AND merchant_serial=in_merchant_serial
    149        AND curr=(in_delta).curr
    150        AND bucket_start=my_bucket_start
    151        AND bucket_range=my_range;
    152     IF NOT FOUND
    153     THEN
    154       INSERT INTO merchant_statistic_bucket_amount
    155         (bmeta_serial_id
    156         ,merchant_serial
    157         ,bucket_start
    158         ,bucket_range
    159         ,curr
    160         ,cumulative_value
    161         ,cumulative_frac
    162         ) VALUES (
    163          my_meta
    164         ,in_merchant_serial
    165         ,my_bucket_start
    166         ,my_range
    167         ,(in_delta).curr
    168         ,(in_delta).val
    169         ,(in_delta).frac);
    170     END IF;
    171   END LOOP;
    172   CLOSE my_curs;
    173 END $$;
    174 
    175 COMMENT ON PROCEDURE merchant_do_bump_amount_bucket_stat
    176   IS 'Updates an amount statistic tracked over buckets';
    177 
    178 
    179 DROP PROCEDURE IF EXISTS merchant_do_bump_number_interval_stat;
    180 CREATE OR REPLACE PROCEDURE merchant_do_bump_number_interval_stat(
    181   in_slug TEXT,
    182   in_merchant_serial BIGINT,
    183   in_timestamp TIMESTAMP,
    184   in_delta INT8
    185 )
    186 LANGUAGE plpgsql
    187 AS $$
    188 DECLARE
    189   my_now INT8;
    190   my_record RECORD;
    191   my_meta INT8;
    192   my_ranges INT8[];
    193   my_precisions INT8[];
    194   my_rangex INT8;
    195   my_precisionx INT8;
    196   my_start INT8;
    197   my_event INT8;
    198 BEGIN
    199   my_now = ROUND(EXTRACT(epoch FROM CURRENT_TIMESTAMP(0)::TIMESTAMP) * 1000000)::INT8 / 1000 / 1000;
    200   SELECT imeta_serial_id
    201         ,ranges AS ranges
    202         ,precisions AS precisions
    203     INTO my_record
    204     FROM merchant_statistic_interval_meta
    205    WHERE slug=in_slug
    206      AND stype='number';
    207   IF NOT FOUND
    208   THEN
    209     RETURN;
    210   END IF;
    211 
    212   my_start = ROUND(EXTRACT(epoch FROM in_timestamp) * 1000000)::INT8 / 1000 / 1000; -- convert to seconds
    213   my_precisions = my_record.precisions;
    214   my_ranges = my_record.ranges;
    215   my_rangex = NULL;
    216   FOR my_x IN 1..COALESCE(array_length(my_ranges,1),0)
    217   LOOP
    218     IF my_now - my_ranges[my_x] < my_start
    219     THEN
    220       my_rangex = my_ranges[my_x];
    221       my_precisionx = my_precisions[my_x];
    222       EXIT;
    223     END IF;
    224   END LOOP;
    225   IF my_rangex IS NULL
    226   THEN
    227     -- event is beyond the ranges we care about
    228     RETURN;
    229   END IF;
    230 
    231   my_meta = my_record.imeta_serial_id;
    232   my_start = my_start - my_start % my_precisionx; -- round down
    233 
    234   INSERT INTO merchant_statistic_counter_event AS msce
    235     (imeta_serial_id
    236     ,merchant_serial
    237     ,slot
    238     ,delta)
    239    VALUES
    240     (my_meta
    241     ,in_merchant_serial
    242     ,my_start
    243     ,in_delta)
    244    ON CONFLICT (imeta_serial_id, merchant_serial, slot)
    245    DO UPDATE SET
    246      delta = msce.delta + in_delta
    247    RETURNING nevent_serial_id
    248         INTO my_event;
    249 
    250   UPDATE merchant_statistic_interval_counter
    251      SET cumulative_number = cumulative_number + in_delta
    252    WHERE imeta_serial_id = my_meta
    253      AND merchant_serial = in_merchant_serial
    254      AND range=my_rangex;
    255   IF NOT FOUND
    256   THEN
    257     INSERT INTO merchant_statistic_interval_counter
    258       (imeta_serial_id
    259       ,merchant_serial
    260       ,range
    261       ,event_delimiter
    262       ,cumulative_number
    263      ) VALUES (
    264        my_meta
    265       ,in_merchant_serial
    266       ,my_rangex
    267       ,my_event
    268       ,in_delta);
    269   END IF;
    270 END $$;
    271 
    272 COMMENT ON PROCEDURE merchant_do_bump_number_interval_stat
    273   IS 'Updates a numeric statistic tracked over an interval';
    274 
    275 
    276 DROP PROCEDURE IF EXISTS merchant_do_bump_amount_interval_stat;
    277 CREATE OR REPLACE PROCEDURE merchant_do_bump_amount_interval_stat(
    278   in_slug TEXT,
    279   in_merchant_serial BIGINT,
    280   in_timestamp TIMESTAMP,
    281   in_delta taler_amount_currency -- new amount in table that we should add to the tracker
    282 )
    283 LANGUAGE plpgsql
    284 AS $$
    285 DECLARE
    286   my_now INT8;
    287   my_record RECORD;
    288   my_meta INT8;
    289   my_ranges INT8[];
    290   my_precisions INT8[];
    291   my_x INT;
    292   my_rangex INT8;
    293   my_precisionx INT8;
    294   my_start INT8;
    295   my_event INT8;
    296 BEGIN
    297   my_now = ROUND(EXTRACT(epoch FROM CURRENT_TIMESTAMP(0)::TIMESTAMP) * 1000000)::INT8 / 1000 / 1000;
    298   SELECT imeta_serial_id
    299         ,ranges
    300         ,precisions
    301     INTO my_record
    302     FROM merchant_statistic_interval_meta
    303    WHERE slug=in_slug
    304      AND stype='amount';
    305   IF NOT FOUND
    306   THEN
    307     RETURN;
    308   END IF;
    309 
    310   my_start = ROUND(EXTRACT(epoch FROM in_timestamp) * 1000000)::INT8 / 1000 / 1000; -- convert to seconds since epoch
    311   my_precisions = my_record.precisions;
    312   my_ranges = my_record.ranges;
    313   my_rangex = NULL;
    314   FOR my_x IN 1..COALESCE(array_length(my_ranges,1),0)
    315   LOOP
    316     IF my_now - my_ranges[my_x] < my_start
    317     THEN
    318       my_rangex = my_ranges[my_x];
    319       my_precisionx = my_precisions[my_x];
    320       EXIT;
    321     END IF;
    322   END LOOP;
    323   IF my_rangex IS NULL
    324   THEN
    325     -- event is beyond the ranges we care about
    326     RETURN;
    327   END IF;
    328   my_start = my_start - my_start % my_precisionx; -- round down
    329   my_meta = my_record.imeta_serial_id;
    330 
    331   INSERT INTO merchant_statistic_amount_event AS msae
    332     (imeta_serial_id
    333     ,merchant_serial
    334     ,slot
    335     ,delta_curr
    336     ,delta_value
    337     ,delta_frac
    338     ) VALUES (
    339      my_meta
    340     ,in_merchant_serial
    341     ,my_start
    342     ,(in_delta).curr
    343     ,(in_delta).val
    344     ,(in_delta).frac
    345     )
    346     ON CONFLICT (imeta_serial_id, merchant_serial, slot, delta_curr)
    347     DO UPDATE SET
    348       delta_value = msae.delta_value + (in_delta).val
    349         + CASE
    350           WHEN (in_delta).frac + msae.delta_frac >= 100000000
    351           THEN 1
    352           ELSE 0
    353         END,
    354       delta_frac = msae.delta_frac + (in_delta).frac
    355         - CASE
    356           WHEN (in_delta).frac + msae.delta_frac >= 100000000
    357           THEN 100000000
    358           ELSE 0
    359         END
    360     RETURNING aevent_serial_id
    361          INTO my_event;
    362 
    363   UPDATE merchant_statistic_interval_amount
    364     SET
    365       cumulative_value = cumulative_value + (in_delta).val
    366       + CASE
    367           WHEN (in_delta).frac + cumulative_frac >= 100000000
    368           THEN 1
    369           ELSE 0
    370         END,
    371       cumulative_frac = cumulative_frac + (in_delta).frac
    372       - CASE
    373           WHEN (in_delta).frac + cumulative_frac >= 100000000
    374           THEN 100000000
    375           ELSE 0
    376         END
    377    WHERE imeta_serial_id=my_meta
    378      AND merchant_serial=in_merchant_serial
    379      AND range=my_rangex
    380      AND curr=(in_delta).curr;
    381   IF NOT FOUND
    382   THEN
    383     INSERT INTO merchant_statistic_interval_amount
    384       (imeta_serial_id
    385       ,merchant_serial
    386       ,range
    387       ,event_delimiter
    388       ,curr
    389       ,cumulative_value
    390       ,cumulative_frac
    391       ) VALUES (
    392        my_meta
    393       ,in_merchant_serial
    394       ,my_rangex
    395       ,my_event
    396       ,(in_delta).curr
    397       ,(in_delta).val
    398       ,(in_delta).frac);
    399   END IF;
    400 END $$;
    401 COMMENT ON PROCEDURE merchant_do_bump_amount_interval_stat
    402   IS 'Updates an amount statistic tracked over an interval';
    403 
    404 
    405 DROP PROCEDURE IF EXISTS merchant_do_bump_number_stat;
    406 CREATE OR REPLACE PROCEDURE merchant_do_bump_number_stat(
    407   in_slug TEXT,
    408   in_merchant_serial BIGINT,
    409   in_timestamp TIMESTAMP,
    410   in_delta INT8
    411 )
    412 LANGUAGE plpgsql
    413 AS $$
    414 BEGIN
    415   CALL merchant_do_bump_number_bucket_stat (in_slug, in_merchant_serial, in_timestamp, in_delta);
    416   CALL merchant_do_bump_number_interval_stat (in_slug, in_merchant_serial, in_timestamp, in_delta);
    417 END $$;
    418 COMMENT ON PROCEDURE merchant_do_bump_number_stat
    419   IS 'Updates a numeric statistic (bucket or interval)';
    420 
    421 
    422 DROP PROCEDURE IF EXISTS merchant_do_bump_amount_stat;
    423 CREATE OR REPLACE PROCEDURE merchant_do_bump_amount_stat(
    424   in_slug TEXT,
    425   in_merchant_serial BIGINT,
    426   in_timestamp TIMESTAMP,
    427   in_delta taler_amount_currency
    428 )
    429 LANGUAGE plpgsql
    430 AS $$
    431 BEGIN
    432   CALL merchant_do_bump_amount_bucket_stat (in_slug, in_merchant_serial, in_timestamp, in_delta);
    433   CALL merchant_do_bump_amount_interval_stat (in_slug, in_merchant_serial, in_timestamp, in_delta);
    434 END $$;
    435 COMMENT ON PROCEDURE merchant_do_bump_amount_stat
    436   IS 'Updates an amount statistic (bucket or interval)';
    437 
    438 
    439 DROP FUNCTION IF EXISTS merchant_statistic_interval_number_get;
    440 CREATE FUNCTION merchant_statistic_interval_number_get (
    441   IN in_slug TEXT,
    442   IN in_instance_id TEXT
    443 )
    444 RETURNS SETOF merchant_statistic_interval_number_get_return_value
    445 LANGUAGE plpgsql
    446 AS $$
    447 DECLARE
    448   my_time INT8 DEFAULT ROUND(EXTRACT(epoch FROM CURRENT_TIMESTAMP(0)::TIMESTAMP) * 1000000)::INT8 / 1000 / 1000;
    449   my_ranges INT8[];
    450   my_range INT8;
    451   my_delta INT8;
    452   my_meta INT8;
    453   my_next_max_serial INT8;
    454   my_instance_id INT8;
    455   my_rec RECORD;
    456   my_irec RECORD;
    457   my_i INT;
    458   my_min_serial INT8 DEFAULT NULL;
    459   my_rval merchant_statistic_interval_number_get_return_value;
    460 BEGIN
    461   SELECT merchant_serial
    462     INTO my_instance_id
    463     FROM merchant_instances
    464    WHERE merchant_id=in_instance_id;
    465   IF NOT FOUND
    466   THEN
    467     RETURN;
    468   END IF;
    469 
    470   SELECT imeta_serial_id
    471         ,ranges
    472         ,precisions
    473     INTO my_rec
    474     FROM merchant_statistic_interval_meta
    475    WHERE slug=in_slug;
    476   IF NOT FOUND
    477   THEN
    478     RETURN;
    479   END IF;
    480   my_rval.rvalue = 0;
    481   my_ranges = my_rec.ranges;
    482   my_meta = my_rec.imeta_serial_id;
    483 
    484   FOR my_i IN 1..COALESCE(array_length(my_ranges,1),0)
    485   LOOP
    486     my_range = my_ranges[my_i];
    487     SELECT event_delimiter
    488           ,cumulative_number
    489       INTO my_irec
    490       FROM merchant_statistic_interval_counter
    491      WHERE imeta_serial_id = my_meta
    492        AND range = my_range
    493        AND merchant_serial = my_instance_id;
    494     IF FOUND
    495     THEN
    496       my_min_serial = my_irec.event_delimiter;
    497       my_rval.rvalue = my_rval.rvalue + my_irec.cumulative_number;
    498 
    499       -- Check if we have events that left the applicable range
    500       SELECT SUM(delta) AS delta_sum
    501         INTO my_irec
    502         FROM merchant_statistic_counter_event
    503        WHERE imeta_serial_id = my_meta
    504          AND merchant_serial = my_instance_id
    505          AND slot < my_time - my_range
    506          AND nevent_serial_id >= my_min_serial;
    507 
    508       IF FOUND AND my_irec.delta_sum IS NOT NULL
    509       THEN
    510         my_delta = my_irec.delta_sum;
    511         my_rval.rvalue = my_rval.rvalue - my_delta;
    512 
    513         -- First find out the next event delimiter value
    514         SELECT nevent_serial_id
    515           INTO my_next_max_serial
    516           FROM merchant_statistic_counter_event
    517          WHERE imeta_serial_id = my_meta
    518            AND merchant_serial = my_instance_id
    519            AND slot >= my_time - my_range
    520            AND nevent_serial_id >= my_min_serial
    521          ORDER BY slot ASC
    522          LIMIT 1;
    523 
    524         IF FOUND
    525         THEN
    526           -- remove expired events from the sum of the current slot
    527 
    528           UPDATE merchant_statistic_interval_counter
    529              SET cumulative_number = cumulative_number - my_delta,
    530                  event_delimiter = my_next_max_serial
    531            WHERE imeta_serial_id = my_meta
    532              AND merchant_serial = my_instance_id
    533              AND range = my_range;
    534         ELSE
    535           -- actually, slot is now empty, remove it entirely
    536           DELETE FROM merchant_statistic_interval_counter
    537            WHERE imeta_serial_id = my_meta
    538              AND merchant_serial = my_instance_id
    539              AND range = my_range;
    540         END IF;
    541         IF (my_i < array_length(my_ranges,1))
    542         THEN
    543           -- carry over all events into the next slot
    544           UPDATE merchant_statistic_interval_counter AS usic SET
    545             cumulative_number = cumulative_number + my_delta,
    546             event_delimiter = LEAST(usic.event_delimiter,my_min_serial)
    547            WHERE imeta_serial_id = my_meta
    548              AND merchant_serial = my_instance_id
    549              AND range=my_ranges[my_i+1];
    550           IF NOT FOUND
    551           THEN
    552             INSERT INTO merchant_statistic_interval_counter
    553               (imeta_serial_id
    554               ,merchant_serial
    555               ,range
    556               ,event_delimiter
    557               ,cumulative_number
    558               ) VALUES (
    559                my_meta
    560               ,my_instance_id
    561               ,my_ranges[my_i+1]
    562               ,my_min_serial
    563               ,my_delta);
    564           END IF;
    565         ELSE
    566           -- events are obsolete, delete them
    567           DELETE FROM merchant_statistic_counter_event
    568                 WHERE imeta_serial_id = my_meta
    569                   AND merchant_serial = my_instance_id
    570                   AND slot < my_time - my_range;
    571         END IF;
    572       END IF;
    573 
    574       my_rval.range = my_range;
    575       RETURN NEXT my_rval;
    576     END IF;
    577   END LOOP;
    578 END $$;
    579 
    580 COMMENT ON FUNCTION merchant_statistic_interval_number_get
    581   IS 'Returns deposit statistic tracking deposited amounts over certain time intervals; we first trim the stored data to only track what is still in-range, and then return the remaining value for each range';
    582 
    583 
    584 DROP FUNCTION IF EXISTS merchant_statistic_interval_amount_get;
    585 CREATE FUNCTION merchant_statistic_interval_amount_get (
    586   IN in_slug TEXT,
    587   IN in_instance_id TEXT
    588 )
    589 RETURNS SETOF merchant_statistic_interval_amount_get_return_value
    590 LANGUAGE plpgsql
    591 AS $$
    592 DECLARE
    593   my_time INT8 DEFAULT ROUND(EXTRACT(epoch FROM CURRENT_TIMESTAMP(0)::TIMESTAMP) * 1000000)::INT8 / 1000 / 1000;
    594   my_ranges INT8[];
    595   my_range INT8;
    596   my_delta_value INT8;
    597   my_delta_frac INT8;
    598   my_meta INT8;
    599   my_instance_id INT8;
    600   my_next_max_serial INT8;
    601   my_currency TEXT;
    602   my_rec RECORD;
    603   my_irec RECORD;
    604   my_jrec RECORD;
    605   my_i INT;
    606   my_min_serial INT8 DEFAULT NULL;
    607   my_rval merchant_statistic_interval_amount_get_return_value;
    608 BEGIN
    609   SELECT merchant_serial
    610     INTO my_instance_id
    611     FROM merchant_instances
    612    WHERE merchant_id=in_instance_id;
    613   IF NOT FOUND
    614   THEN
    615     RETURN;
    616   END IF;
    617 
    618   SELECT imeta_serial_id
    619         ,ranges
    620         ,precisions
    621     INTO my_rec
    622     FROM merchant_statistic_interval_meta
    623    WHERE slug=in_slug;
    624   IF NOT FOUND
    625   THEN
    626     RETURN;
    627   END IF;
    628 
    629   my_meta = my_rec.imeta_serial_id;
    630   my_ranges = my_rec.ranges;
    631 
    632   FOR my_currency IN
    633     SELECT DISTINCT delta_curr
    634       FROM merchant_statistic_amount_event
    635      WHERE imeta_serial_id = my_meta
    636   LOOP
    637 
    638   my_rval.rvalue.val = 0;
    639   my_rval.rvalue.frac = 0;
    640   my_rval.rvalue.curr = my_currency;
    641 
    642   FOR my_i IN 1..COALESCE(array_length(my_ranges,1),0)
    643   LOOP
    644     my_range = my_ranges[my_i];
    645     SELECT event_delimiter
    646           ,cumulative_value
    647           ,cumulative_frac
    648       INTO my_irec
    649       FROM merchant_statistic_interval_amount
    650      WHERE imeta_serial_id = my_meta
    651        AND merchant_serial = my_instance_id
    652        AND curr = my_currency
    653        AND range = my_range;
    654 
    655     IF FOUND
    656     THEN
    657       my_min_serial = my_irec.event_delimiter;
    658       my_rval.rvalue.val = (my_rval.rvalue).val + my_irec.cumulative_value + my_irec.cumulative_frac / 100000000;
    659       my_rval.rvalue.frac = (my_rval.rvalue).frac + my_irec.cumulative_frac % 100000000;
    660       IF (my_rval.rvalue).frac > 100000000
    661       THEN
    662         my_rval.rvalue.frac = (my_rval.rvalue).frac - 100000000;
    663         my_rval.rvalue.val = (my_rval.rvalue).val + 1;
    664       END IF;
    665 
    666       -- Check if we have events that left the applicable range
    667       SELECT SUM(delta_value) AS value_sum
    668             ,SUM(delta_frac) AS frac_sum
    669         INTO my_jrec
    670         FROM merchant_statistic_amount_event
    671        WHERE imeta_serial_id = my_meta
    672          AND merchant_serial = my_instance_id
    673          AND delta_curr = my_currency
    674          AND slot < my_time - my_range
    675          AND aevent_serial_id >= my_min_serial;
    676 
    677       IF FOUND AND my_jrec.value_sum IS NOT NULL
    678       THEN
    679         -- Normalize sum
    680         my_delta_value = my_jrec.value_sum + my_jrec.frac_sum / 100000000;
    681         my_delta_frac = my_jrec.frac_sum % 100000000;
    682         my_rval.rvalue.val = (my_rval.rvalue).val - my_delta_value;
    683         IF ((my_rval.rvalue).frac >= my_delta_frac)
    684         THEN
    685           my_rval.rvalue.frac = (my_rval.rvalue).frac - my_delta_frac;
    686         ELSE
    687           my_rval.rvalue.frac = 100000000 + (my_rval.rvalue).frac - my_delta_frac;
    688           my_rval.rvalue.val = (my_rval.rvalue).val - 1;
    689         END IF;
    690 
    691         -- First find out the next event delimiter value
    692         SELECT aevent_serial_id
    693           INTO my_next_max_serial
    694           FROM merchant_statistic_amount_event
    695          WHERE imeta_serial_id = my_meta
    696            AND merchant_serial = my_instance_id
    697            AND delta_curr = my_currency
    698            AND slot >= my_time - my_range
    699            AND aevent_serial_id >= my_min_serial
    700          ORDER BY slot ASC
    701          LIMIT 1;
    702         IF FOUND
    703         THEN
    704           -- remove expired events from the sum of the current slot
    705           UPDATE merchant_statistic_interval_amount SET
    706              cumulative_value = cumulative_value - my_delta_value
    707               - CASE
    708                   WHEN cumulative_frac < my_delta_frac
    709                   THEN 1
    710                   ELSE 0
    711                 END,
    712              cumulative_frac = cumulative_frac - my_delta_frac
    713              + CASE
    714                  WHEN cumulative_frac < my_delta_frac
    715                  THEN 100000000
    716                  ELSE 0
    717                END,
    718              event_delimiter = my_next_max_serial
    719            WHERE imeta_serial_id = my_meta
    720              AND merchant_serial = my_instance_id
    721              AND curr = my_currency
    722              AND range = my_range;
    723         ELSE
    724           -- actually, slot is now empty, remove it entirely
    725           DELETE FROM merchant_statistic_interval_amount
    726            WHERE imeta_serial_id = my_meta
    727              AND merchant_serial = my_instance_id
    728              AND curr = my_currency
    729              AND range = my_range;
    730         END IF;
    731         IF (my_i < array_length(my_ranges,1))
    732         THEN
    733           -- carry over all events into the next (larger) slot
    734           UPDATE merchant_statistic_interval_amount AS msia SET
    735             cumulative_value = cumulative_value + my_delta_value
    736               + CASE
    737                  WHEN cumulative_frac + my_delta_frac > 100000000
    738                  THEN 1
    739                  ELSE 0
    740                END,
    741             cumulative_frac = cumulative_frac + my_delta_value
    742               - CASE
    743                  WHEN cumulative_frac + my_delta_frac > 100000000
    744                  THEN 100000000
    745                  ELSE 0
    746                END,
    747             event_delimiter = LEAST (msia.event_delimiter,my_min_serial)
    748            WHERE imeta_serial_id = my_meta
    749              AND merchant_serial = my_instance_id
    750              AND range=my_ranges[my_i+1];
    751           IF NOT FOUND
    752           THEN
    753             INSERT INTO merchant_statistic_interval_amount
    754               (imeta_serial_id
    755               ,merchant_serial
    756               ,event_delimiter
    757               ,range
    758               ,curr
    759               ,cumulative_value
    760               ,cumulative_frac
    761               ) VALUES (
    762                my_meta
    763               ,my_instance_id
    764               ,my_min_serial
    765               ,my_ranges[my_i+1]
    766               ,my_currency
    767               ,my_delta_value
    768               ,my_delta_frac);
    769           END IF;
    770         ELSE
    771           -- events are obsolete, delete them
    772           DELETE FROM merchant_statistic_amount_event
    773                 WHERE imeta_serial_id = my_meta
    774                   AND merchant_serial = my_instance_id
    775                   AND slot < my_time - my_range;
    776         END IF;
    777       END IF;
    778 
    779       my_rval.range = my_range;
    780       RETURN NEXT my_rval;
    781     END IF;
    782   END LOOP; -- over my_ranges
    783   END LOOP; -- over my_currency
    784 END $$;
    785 
    786 COMMENT ON FUNCTION merchant_statistic_interval_amount_get
    787   IS 'Returns deposit statistic tracking deposited amounts over certain time intervals; we first trim the stored data to only track what is still in-range, and then return the remaining value; multiple values are returned, one per currency and range';
    788 
    789 
    790 
    791 
    792 
    793 DROP PROCEDURE IF EXISTS merchant_statistic_counter_gc;
    794 CREATE OR REPLACE PROCEDURE merchant_statistic_counter_gc ()
    795 LANGUAGE plpgsql
    796 AS $$
    797 DECLARE
    798   my_time INT8 DEFAULT ROUND(EXTRACT(epoch FROM CURRENT_TIMESTAMP(0)::TIMESTAMP) * 1000000)::INT8 / 1000 / 1000;
    799   my_instance INT8;
    800   my_instance_name TEXT;
    801   my_rec RECORD;
    802   my_sum RECORD;
    803   my_meta INT8;
    804   my_ranges INT8[];
    805   my_precisions INT8[];
    806   my_precision INT4;
    807   my_i INT4;
    808   min_slot INT8;
    809   max_slot INT8;
    810   end_slot INT8;
    811   my_total INT8;
    812 BEGIN
    813   -- GC for all instances
    814   FOR my_instance IN
    815     SELECT DISTINCT merchant_serial
    816       FROM merchant_statistic_counter_event
    817   LOOP
    818   -- Do combination work for all numeric statistic events
    819   FOR my_rec IN
    820     SELECT imeta_serial_id
    821           ,ranges
    822           ,precisions
    823           ,slug
    824       FROM merchant_statistic_interval_meta
    825   LOOP
    826     -- First, we query the current interval statistic to update its counters
    827     SELECT merchant_id
    828       INTO my_instance_name
    829        FROM merchant_instances
    830       WHERE merchant_serial = my_instance;
    831     PERFORM FROM merchant_statistic_interval_number_get (my_rec.slug, my_instance_name);
    832 
    833     my_meta = my_rec.imeta_serial_id;
    834     my_ranges = my_rec.ranges;
    835     my_precisions = my_rec.precisions;
    836 
    837     FOR my_i IN 1..COALESCE(array_length(my_ranges,1),0)
    838     LOOP
    839       my_precision = my_precisions[my_i];
    840       IF 1 >= my_precision
    841       THEN
    842         -- Cannot coarsen in this case
    843         CONTINUE;
    844       END IF;
    845 
    846       IF 1 = my_i
    847       THEN
    848         min_slot = 0;
    849       ELSE
    850         min_slot = my_ranges[my_i - 1];
    851       END IF;
    852       end_slot = my_ranges[my_i];
    853       RAISE NOTICE 'Coarsening from [%,%) at %', my_time - end_slot, my_time - min_slot, my_precision;
    854 
    855       LOOP
    856         EXIT WHEN min_slot >= end_slot;
    857         max_slot = min_slot + my_precision;
    858         SELECT SUM(delta) AS total,
    859                COUNT(*)   AS matches,
    860                MIN(nevent_serial_id) AS rep_serial_id
    861           INTO my_sum
    862           FROM merchant_statistic_counter_event
    863          WHERE merchant_serial=my_instance
    864            AND imeta_serial_id=my_meta
    865            AND slot >= my_time - max_slot
    866            AND slot  < my_time - min_slot;
    867 
    868         RAISE NOTICE 'Found % entries between [%,%)', my_sum.matches, my_time - max_slot, my_time - min_slot;
    869         -- we only proceed if we had more then one match (optimization)
    870         IF FOUND AND my_sum.matches > 1
    871         THEN
    872           my_total = my_sum.total;
    873 
    874           RAISE NOTICE 'combining % entries to representative % for slots [%-%)', my_sum.matches, my_sum.rep_serial_id, my_time - max_slot, my_time - min_slot;
    875 
    876           -- combine entries
    877           DELETE FROM merchant_statistic_counter_event
    878            WHERE merchant_serial=my_instance
    879              AND imeta_serial_id=my_meta
    880              AND slot >= my_time - max_slot
    881              AND slot  < my_time - min_slot
    882              AND nevent_serial_id > my_sum.rep_serial_id;
    883            -- Now update the representative to the sum
    884           UPDATE merchant_statistic_counter_event SET
    885             delta = my_total
    886            WHERE imeta_serial_id = my_meta
    887              AND merchant_serial = my_instance
    888              AND nevent_serial_id = my_sum.rep_serial_id;
    889         END IF;
    890         min_slot = min_slot + my_precision;
    891       END LOOP; -- min_slot to end_slot by precision loop
    892     END LOOP; -- my_i loop
    893     -- Finally, delete all events beyond the range we care about
    894 
    895     RAISE NOTICE 'deleting entries of %/% before % - % = %', my_instance, my_meta, my_time, my_ranges[array_length(my_ranges,1)], my_time - my_ranges[array_length(my_ranges,1)];
    896     DELETE FROM merchant_statistic_counter_event
    897      WHERE merchant_serial=my_instance
    898        AND imeta_serial_id=my_meta
    899        AND slot < my_time - my_ranges[array_length(my_ranges,1)];
    900   END LOOP; -- my_rec loop
    901   END LOOP; -- my_instance loop
    902 END $$;
    903 COMMENT ON PROCEDURE merchant_statistic_counter_gc
    904   IS 'Performs garbage collection and compaction of the merchant_statistic_counter_event table';
    905 
    906 
    907 
    908 DROP PROCEDURE IF EXISTS merchant_statistic_amount_gc;
    909 CREATE OR REPLACE PROCEDURE merchant_statistic_amount_gc ()
    910 LANGUAGE plpgsql
    911 AS $$
    912 DECLARE
    913   my_time INT8 DEFAULT ROUND(EXTRACT(epoch FROM CURRENT_TIMESTAMP(0)::TIMESTAMP) * 1000000)::INT8 / 1000 / 1000;
    914   my_instance INT8;
    915   my_instance_name TEXT;
    916   my_rec RECORD;
    917   my_sum RECORD;
    918   my_meta INT8;
    919   my_ranges INT8[];
    920   my_precisions INT8[];
    921   my_precision INT4;
    922   my_currency TEXT;
    923   my_i INT4;
    924   min_slot INT8;
    925   max_slot INT8;
    926   end_slot INT8;
    927   my_total_val INT8;
    928   my_total_frac INT8;
    929 BEGIN
    930   -- GC for all instances
    931   FOR my_instance IN
    932     SELECT DISTINCT merchant_serial
    933       FROM merchant_statistic_counter_event
    934   LOOP
    935   -- Do combination work for all numeric statistic events
    936   FOR my_rec IN
    937     SELECT imeta_serial_id
    938           ,ranges
    939           ,precisions
    940           ,slug
    941       FROM merchant_statistic_interval_meta
    942   LOOP
    943 
    944   -- First, we query the current interval statistic to update its counters
    945   SELECT merchant_id
    946     INTO my_instance_name
    947      FROM merchant_instances
    948     WHERE merchant_serial = my_instance;
    949   PERFORM FROM merchant_statistic_interval_amount_get (my_rec.slug, my_instance_name);
    950 
    951   my_meta = my_rec.imeta_serial_id;
    952   my_ranges = my_rec.ranges;
    953   my_precisions = my_rec.precisions;
    954   FOR my_currency IN
    955     SELECT DISTINCT delta_curr
    956       FROM merchant_statistic_amount_event
    957      WHERE imeta_serial_id = my_meta
    958   LOOP
    959 
    960     FOR my_i IN 1..COALESCE(array_length(my_ranges,1),0)
    961     LOOP
    962       my_precision = my_precisions[my_i];
    963       IF 1 >= my_precision
    964       THEN
    965         -- Cannot coarsen in this case
    966         CONTINUE;
    967       END IF;
    968 
    969       IF 1 = my_i
    970       THEN
    971         min_slot = 0;
    972       ELSE
    973         min_slot = my_ranges[my_i - 1];
    974       END IF;
    975       end_slot = my_ranges[my_i];
    976 
    977       RAISE NOTICE 'Coarsening from [%,%) at %', my_time - end_slot, my_time - min_slot, my_precision;
    978       LOOP
    979         EXIT WHEN min_slot >= end_slot;
    980         max_slot = min_slot + my_precision;
    981         SELECT SUM(delta_value) AS total_val,
    982                SUM(delta_frac)  AS total_frac,
    983                COUNT(*)         AS matches,
    984                MIN(aevent_serial_id) AS rep_serial_id
    985           INTO my_sum
    986           FROM merchant_statistic_amount_event
    987          WHERE imeta_serial_id=my_meta
    988            AND merchant_serial=my_instance
    989            AND delta_curr = my_currency
    990            AND slot >= my_time - max_slot
    991            AND slot  < my_time - max_slot;
    992         -- we only proceed if we had more then one match (optimization)
    993         IF FOUND AND my_sum.matches > 1
    994         THEN
    995           -- normalize new total
    996           my_total_frac = my_sum.total_frac % 100000000;
    997           my_total_val = my_sum.total_val + my_sum.total_frac / 100000000;
    998 
    999           -- combine entries
   1000           DELETE FROM merchant_statistic_amount_event
   1001            WHERE imeta_serial_id=my_meta
   1002              AND merchant_serial=my_instance
   1003              AND delta_curr = my_currency
   1004              AND slot >= my_time - max_slot
   1005              AND slot  < my_time - max_slot
   1006              AND aevent_serial_id > my_sum.rep_serial_id;
   1007           -- Now update the representative to the sum
   1008           UPDATE merchant_statistic_amount_event SET
   1009              delta_value = my_total_value
   1010             ,delta_frac = my_total_frac
   1011            WHERE imeta_serial_id = my_meta
   1012              AND merchant_serial = my_instance
   1013              AND delta_curr = my_currency
   1014              AND aevent_serial_id = my_sum.rep_serial_id;
   1015         END IF;
   1016         min_slot = min_slot + my_precision;
   1017       END LOOP; -- min_slot to end_slot by precision loop
   1018     END LOOP; -- my_i loop
   1019   END LOOP; -- my_currency loop
   1020   -- Finally, delete all events beyond the range we care about
   1021 
   1022   RAISE NOTICE 'deleting entries of %/% before % - % = %', my_instance, my_meta, my_time, my_ranges[array_length(my_ranges,1)], my_time - my_ranges[array_length(my_ranges,1)];
   1023   DELETE FROM merchant_statistic_amount_event
   1024    WHERE merchant_serial=my_instance
   1025      AND imeta_serial_id=my_meta
   1026      AND slot < my_time - my_ranges[array_length(my_ranges,1)];
   1027   END LOOP; -- my_rec loop
   1028   END LOOP; -- my_instance loop
   1029 END $$;
   1030 COMMENT ON PROCEDURE merchant_statistic_amount_gc
   1031   IS 'Performs garbage collection and compaction of the merchant_statistic_amount_event table';
   1032 
   1033 
   1034 
   1035 DROP PROCEDURE IF EXISTS merchant_statistic_bucket_gc;
   1036 CREATE OR REPLACE PROCEDURE merchant_statistic_bucket_gc ()
   1037 LANGUAGE plpgsql
   1038 AS $$
   1039 DECLARE
   1040   my_rec RECORD;
   1041   my_range TEXT;
   1042   my_now INT8;
   1043   my_end INT8;
   1044 BEGIN
   1045   my_now = EXTRACT(EPOCH FROM CURRENT_TIMESTAMP(0)::TIMESTAMP); -- seconds since epoch
   1046   FOR my_rec IN
   1047     SELECT bmeta_serial_id
   1048           ,stype
   1049           ,ranges[array_length(ranges,1)] AS range
   1050           ,ages[array_length(ages,1)] AS age
   1051       FROM merchant_statistic_bucket_meta
   1052   LOOP
   1053     my_range = '1 ' || my_rec.range::TEXT;
   1054     my_end = my_now - my_rec.age * EXTRACT(SECONDS FROM (SELECT my_range::INTERVAL)); -- age is given in multiples of the range (in seconds)
   1055     IF my_rec.stype = 'amount'
   1056     THEN
   1057       DELETE
   1058         FROM merchant_statistic_bucket_amount
   1059        WHERE bmeta_serial_id = my_rec.bmeta_serial_id
   1060          AND bucket_start >= my_end;
   1061     ELSE
   1062       DELETE
   1063         FROM merchant_statistic_bucket_counter
   1064        WHERE bmeta_serial_id = my_rec.bmeta_serial_id
   1065          AND bucket_start >= my_end;
   1066     END IF;
   1067   END LOOP;
   1068 END $$;
   1069 COMMENT ON PROCEDURE merchant_statistic_bucket_gc
   1070   IS 'Performs garbage collection of the merchant_statistic_bucket_counter and merchant_statistic_bucket_amount tables';
   1071 
   1072 
   1073 
   1074 -- The date_trunc may not be necessary if we assume it is already truncated
   1075 DROP FUNCTION IF EXISTS merchant_statistics_bucket_end;
   1076 CREATE FUNCTION merchant_statistics_bucket_end (
   1077   IN in_bucket_start INT8,
   1078   IN in_range statistic_range,
   1079   OUT out_bucket_end INT8
   1080 )
   1081 LANGUAGE plpgsql
   1082 AS $$
   1083 BEGIN
   1084     IF in_range='quarter'
   1085     THEN
   1086       out_bucket_end = EXTRACT(EPOCH FROM CAST(date_trunc('quarter', to_timestamp(in_bucket_start)::date)  + interval '3 months' AS date));
   1087     ELSE
   1088       out_bucket_end = EXTRACT(EPOCH FROM CAST(to_timestamp(in_bucket_start)::date  + ('1 ' || in_range)::interval AS date));
   1089     END IF;
   1090 END $$;
   1091 COMMENT ON FUNCTION merchant_statistics_bucket_end
   1092 IS 'computes the end time of the bucket for an event at the current time given the desired bucket range';