exchange

Base system with REST service to issue digital coins, run by the payment service provider
Log | Files | Refs | Submodules | README | LICENSE

exchange_statistics_helpers.sql (31641B)


      1 --
      2 -- This file is part of TALER
      3 -- Copyright (C) 2025 Taler Systems SA
      4 --
      5 -- TALER is free software; you can redistribute it and/or modify it under the
      6 -- terms of the GNU General Public License as published by the Free Software
      7 -- Foundation; either version 3, or (at your option) any later version.
      8 --
      9 -- TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10 -- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11 -- A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
     12 --
     13 -- You should have received a copy of the GNU General Public License along with
     14 -- TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 --
     16 
     17 SET search_path TO exchange;
     18 DROP FUNCTION IF EXISTS interval_to_start;
     19 CREATE OR REPLACE FUNCTION interval_to_start (
     20   IN in_timestamp TIMESTAMP,
     21   IN in_range statistic_range,
     22   OUT out_bucket_start INT8
     23 )
     24 LANGUAGE plpgsql
     25 AS $$
     26 BEGIN
     27   out_bucket_start = EXTRACT(EPOCH FROM DATE_TRUNC(in_range::text, in_timestamp));
     28 END $$;
     29 COMMENT ON FUNCTION interval_to_start
     30  IS 'computes the start time of the bucket for an event at the current time given the desired bucket range';
     31 
     32 
     33 DROP PROCEDURE IF EXISTS exchange_do_bump_number_bucket_stat;
     34 CREATE OR REPLACE PROCEDURE exchange_do_bump_number_bucket_stat(
     35   in_slug TEXT,
     36   in_h_payto BYTEA,
     37   in_timestamp TIMESTAMP,
     38   in_delta INT8
     39 )
     40 LANGUAGE plpgsql
     41 AS $$
     42 DECLARE
     43   my_meta INT8;
     44   my_range statistic_range;
     45   my_bucket_start INT8;
     46   my_curs CURSOR (arg_slug TEXT)
     47    FOR SELECT UNNEST(ranges)
     48          FROM exchange_statistic_bucket_meta
     49         WHERE slug=arg_slug;
     50 BEGIN
     51   SELECT bmeta_serial_id
     52     INTO my_meta
     53     FROM exchange_statistic_bucket_meta
     54    WHERE slug=in_slug
     55      AND stype='number';
     56   IF NOT FOUND
     57   THEN
     58     RETURN;
     59   END IF;
     60   OPEN my_curs (arg_slug:=in_slug);
     61   LOOP
     62     FETCH NEXT
     63       FROM my_curs
     64       INTO my_range;
     65     EXIT WHEN NOT FOUND;
     66     SELECT *
     67       INTO my_bucket_start
     68       FROM interval_to_start (in_timestamp, my_range);
     69 
     70     UPDATE exchange_statistic_bucket_counter
     71        SET cumulative_number = cumulative_number + in_delta
     72      WHERE bmeta_serial_id=my_meta
     73        AND h_payto=in_h_payto
     74        AND bucket_start=my_bucket_start
     75        AND bucket_range=my_range;
     76     IF NOT FOUND
     77     THEN
     78       INSERT INTO exchange_statistic_bucket_counter
     79         (bmeta_serial_id
     80         ,h_payto
     81         ,bucket_start
     82         ,bucket_range
     83         ,cumulative_number
     84         ) VALUES (
     85          my_meta
     86         ,in_h_payto
     87         ,my_bucket_start
     88         ,my_range
     89         ,in_delta);
     90     END IF;
     91   END LOOP;
     92   CLOSE my_curs;
     93 END $$;
     94 
     95 
     96 DROP PROCEDURE IF EXISTS exchange_do_bump_amount_bucket_stat;
     97 CREATE OR REPLACE PROCEDURE exchange_do_bump_amount_bucket_stat(
     98   in_slug TEXT,
     99   in_h_payto BYTEA,
    100   in_timestamp TIMESTAMP,
    101   in_delta taler_amount
    102 )
    103 LANGUAGE plpgsql
    104 AS $$
    105 DECLARE
    106   my_meta INT8;
    107   my_range statistic_range;
    108   my_bucket_start INT8;
    109   my_curs CURSOR (arg_slug TEXT)
    110    FOR SELECT UNNEST(ranges)
    111          FROM exchange_statistic_bucket_meta
    112         WHERE slug=arg_slug;
    113 BEGIN
    114   SELECT bmeta_serial_id
    115     INTO my_meta
    116     FROM exchange_statistic_bucket_meta
    117    WHERE slug=in_slug
    118      AND stype='amount';
    119   IF NOT FOUND
    120   THEN
    121     RETURN;
    122   END IF;
    123   OPEN my_curs (arg_slug:=in_slug);
    124   LOOP
    125     FETCH NEXT
    126       FROM my_curs
    127       INTO my_range;
    128     EXIT WHEN NOT FOUND;
    129     SELECT *
    130       INTO my_bucket_start
    131       FROM interval_to_start (in_timestamp, my_range);
    132 
    133     UPDATE exchange_statistic_bucket_amount
    134       SET
    135         cumulative_value.val = (cumulative_value).val + (in_delta).val
    136         + CASE
    137             WHEN (in_delta).frac + (cumulative_value).frac >= 100000000
    138             THEN 1
    139             ELSE 0
    140           END,
    141         cumulative_value.frac = (cumulative_value).frac + (in_delta).frac
    142         - CASE
    143             WHEN (in_delta).frac + (cumulative_value).frac >= 100000000
    144             THEN 100000000
    145             ELSE 0
    146           END
    147      WHERE bmeta_serial_id=my_meta
    148        AND h_payto=in_h_payto
    149        AND bucket_start=my_bucket_start
    150        AND bucket_range=my_range;
    151     IF NOT FOUND
    152     THEN
    153       INSERT INTO exchange_statistic_bucket_amount
    154         (bmeta_serial_id
    155         ,h_payto
    156         ,bucket_start
    157         ,bucket_range
    158         ,cumulative_value
    159         ) VALUES (
    160          my_meta
    161         ,in_h_payto
    162         ,my_bucket_start
    163         ,my_range
    164         ,in_delta);
    165     END IF;
    166   END LOOP;
    167   CLOSE my_curs;
    168 END $$;
    169 
    170 COMMENT ON PROCEDURE exchange_do_bump_amount_bucket_stat
    171   IS 'Updates an amount statistic tracked over buckets';
    172 
    173 
    174 DROP PROCEDURE IF EXISTS exchange_do_bump_number_interval_stat;
    175 CREATE OR REPLACE PROCEDURE exchange_do_bump_number_interval_stat(
    176   in_slug TEXT,
    177   in_h_payto BYTEA,
    178   in_timestamp TIMESTAMP,
    179   in_delta INT8
    180 )
    181 LANGUAGE plpgsql
    182 AS $$
    183 DECLARE
    184   my_now INT8;
    185   my_record RECORD;
    186   my_meta INT8;
    187   my_ranges INT8[];
    188   my_precisions INT8[];
    189   my_rangex INT8;
    190   my_precisionx INT8;
    191   my_start INT8;
    192   my_event INT8;
    193 BEGIN
    194   my_now = ROUND(EXTRACT(epoch FROM CURRENT_TIMESTAMP(0)::TIMESTAMP) * 1000000)::INT8 / 1000 / 1000;
    195   SELECT imeta_serial_id
    196         ,ranges AS ranges
    197         ,precisions AS precisions
    198     INTO my_record
    199     FROM exchange_statistic_interval_meta
    200    WHERE slug=in_slug
    201      AND stype='number';
    202   IF NOT FOUND
    203   THEN
    204     RETURN;
    205   END IF;
    206 
    207   my_start = ROUND(EXTRACT(epoch FROM in_timestamp) * 1000000)::INT8 / 1000 / 1000; -- convert to seconds
    208   my_precisions = my_record.precisions;
    209   my_ranges = my_record.ranges;
    210   my_rangex = NULL;
    211   FOR my_x IN 1..COALESCE(array_length(my_ranges,1),0)
    212   LOOP
    213     IF my_now - my_ranges[my_x] < my_start
    214     THEN
    215       my_rangex = my_ranges[my_x];
    216       my_precisionx = my_precisions[my_x];
    217       EXIT;
    218     END IF;
    219   END LOOP;
    220   IF my_rangex IS NULL
    221   THEN
    222     -- event is beyond the ranges we care about
    223     RETURN;
    224   END IF;
    225 
    226   my_meta = my_record.imeta_serial_id;
    227   my_start = my_start - my_start % my_precisionx; -- round down
    228 
    229   INSERT INTO exchange_statistic_counter_event AS msce
    230     (imeta_serial_id
    231     ,h_payto
    232     ,slot
    233     ,delta)
    234    VALUES
    235     (my_meta
    236     ,in_h_payto
    237     ,my_start
    238     ,in_delta)
    239    ON CONFLICT (imeta_serial_id, h_payto, slot)
    240    DO UPDATE SET
    241      delta = msce.delta + in_delta
    242    RETURNING nevent_serial_id
    243         INTO my_event;
    244 
    245   UPDATE exchange_statistic_interval_counter
    246      SET cumulative_number = cumulative_number + in_delta
    247    WHERE imeta_serial_id = my_meta
    248      AND h_payto = in_h_payto
    249      AND range=my_rangex;
    250   IF NOT FOUND
    251   THEN
    252     INSERT INTO exchange_statistic_interval_counter
    253       (imeta_serial_id
    254       ,h_payto
    255       ,range
    256       ,event_delimiter
    257       ,cumulative_number
    258      ) VALUES (
    259        my_meta
    260       ,in_h_payto
    261       ,my_rangex
    262       ,my_event
    263       ,in_delta);
    264   END IF;
    265 END $$;
    266 
    267 COMMENT ON PROCEDURE exchange_do_bump_number_interval_stat
    268   IS 'Updates a numeric statistic tracked over an interval';
    269 
    270 
    271 DROP PROCEDURE IF EXISTS exchange_do_bump_amount_interval_stat;
    272 CREATE OR REPLACE PROCEDURE exchange_do_bump_amount_interval_stat(
    273   in_slug TEXT,
    274   in_h_payto BYTEA,
    275   in_timestamp TIMESTAMP,
    276   in_delta taler_amount
    277 )
    278 LANGUAGE plpgsql
    279 AS $$
    280 DECLARE
    281   my_now INT8;
    282   my_record RECORD;
    283   my_meta INT8;
    284   my_ranges INT8[];
    285   my_precisions INT8[];
    286   my_x INT;
    287   my_rangex INT8;
    288   my_precisionx INT8;
    289   my_start INT8;
    290   my_event INT8;
    291 BEGIN
    292   my_now = ROUND(EXTRACT(epoch FROM CURRENT_TIMESTAMP(0)::TIMESTAMP) * 1000000)::INT8 / 1000 / 1000;
    293   SELECT imeta_serial_id
    294         ,ranges
    295         ,precisions
    296     INTO my_record
    297     FROM exchange_statistic_interval_meta
    298    WHERE slug=in_slug
    299      AND stype='amount';
    300   IF NOT FOUND
    301   THEN
    302     RETURN;
    303   END IF;
    304 
    305   my_start = ROUND(EXTRACT(epoch FROM in_timestamp) * 1000000)::INT8 / 1000 / 1000; -- convert to seconds since epoch
    306   my_precisions = my_record.precisions;
    307   my_ranges = my_record.ranges;
    308   my_rangex = NULL;
    309   FOR my_x IN 1..COALESCE(array_length(my_ranges,1),0)
    310   LOOP
    311     IF my_now - my_ranges[my_x] < my_start
    312     THEN
    313       my_rangex = my_ranges[my_x];
    314       my_precisionx = my_precisions[my_x];
    315       EXIT;
    316     END IF;
    317   END LOOP;
    318   IF my_rangex IS NULL
    319   THEN
    320     -- event is beyond the ranges we care about
    321     RETURN;
    322   END IF;
    323   my_start = my_start - my_start % my_precisionx; -- round down
    324   my_meta = my_record.imeta_serial_id;
    325 
    326   INSERT INTO exchange_statistic_amount_event AS msae
    327     (imeta_serial_id
    328     ,h_payto
    329     ,slot
    330     ,delta
    331     ) VALUES (
    332      my_meta
    333     ,in_h_payto
    334     ,my_start
    335     ,in_delta
    336     )
    337     ON CONFLICT (imeta_serial_id, h_payto, slot)
    338     DO UPDATE SET
    339       delta.val = (msae.delta).val + (in_delta).val
    340         + CASE
    341           WHEN (in_delta).frac + (msae.delta).frac >= 100000000
    342           THEN 1
    343           ELSE 0
    344         END,
    345       delta.frac = (msae.delta).frac + (in_delta).frac
    346         - CASE
    347           WHEN (in_delta).frac + (msae.delta).frac >= 100000000
    348           THEN 100000000
    349           ELSE 0
    350         END
    351     RETURNING aevent_serial_id
    352          INTO my_event;
    353 
    354   UPDATE exchange_statistic_interval_amount
    355     SET
    356       cumulative_value.val = (cumulative_value).val + (in_delta).val
    357       + CASE
    358           WHEN (in_delta).frac + (cumulative_value).frac >= 100000000
    359           THEN 1
    360           ELSE 0
    361         END,
    362       cumulative_value.frac = (cumulative_value).frac + (in_delta).frac
    363       - CASE
    364           WHEN (in_delta).frac + (cumulative_value).frac >= 100000000
    365           THEN 100000000
    366           ELSE 0
    367         END
    368    WHERE imeta_serial_id=my_meta
    369      AND h_payto=in_h_payto
    370      AND range=my_rangex;
    371   IF NOT FOUND
    372   THEN
    373     INSERT INTO exchange_statistic_interval_amount
    374       (imeta_serial_id
    375       ,h_payto
    376       ,range
    377       ,event_delimiter
    378       ,cumulative_value
    379       ) VALUES (
    380        my_meta
    381       ,in_h_payto
    382       ,my_rangex
    383       ,my_event
    384       ,in_delta);
    385   END IF;
    386 END $$;
    387 COMMENT ON PROCEDURE exchange_do_bump_amount_interval_stat
    388   IS 'Updates an amount statistic tracked over an interval';
    389 
    390 
    391 DROP PROCEDURE IF EXISTS exchange_do_bump_number_stat;
    392 CREATE OR REPLACE PROCEDURE exchange_do_bump_number_stat(
    393   in_slug TEXT,
    394   in_h_payto BYTEA,
    395   in_timestamp TIMESTAMP,
    396   in_delta INT8
    397 )
    398 LANGUAGE plpgsql
    399 AS $$
    400 BEGIN
    401   CALL exchange_do_bump_number_bucket_stat (in_slug, in_h_payto, in_timestamp, in_delta);
    402   CALL exchange_do_bump_number_interval_stat (in_slug, in_h_payto, in_timestamp, in_delta);
    403 END $$;
    404 COMMENT ON PROCEDURE exchange_do_bump_number_stat
    405   IS 'Updates a numeric statistic (bucket or interval)';
    406 
    407 
    408 DROP PROCEDURE IF EXISTS exchange_do_bump_amount_stat;
    409 CREATE OR REPLACE PROCEDURE exchange_do_bump_amount_stat(
    410   in_slug TEXT,
    411   in_h_payto BYTEA,
    412   in_timestamp TIMESTAMP,
    413   in_delta taler_amount
    414 )
    415 LANGUAGE plpgsql
    416 AS $$
    417 BEGIN
    418   CALL exchange_do_bump_amount_bucket_stat (in_slug, in_h_payto, in_timestamp, in_delta);
    419   CALL exchange_do_bump_amount_interval_stat (in_slug, in_h_payto, in_timestamp, in_delta);
    420 END $$;
    421 COMMENT ON PROCEDURE exchange_do_bump_amount_stat
    422   IS 'Updates an amount statistic (bucket or interval)';
    423 
    424 
    425 DROP FUNCTION IF EXISTS exchange_statistic_interval_number_get;
    426 CREATE OR REPLACE FUNCTION exchange_statistic_interval_number_get (
    427   IN in_slug TEXT,
    428   IN in_h_payto BYTEA
    429 )
    430 RETURNS SETOF exchange_statistic_interval_number_get_return_value
    431 LANGUAGE plpgsql
    432 AS $$
    433 DECLARE
    434   my_time INT8 DEFAULT ROUND(EXTRACT(epoch FROM CURRENT_TIMESTAMP(0)::TIMESTAMP) * 1000000)::INT8 / 1000 / 1000;
    435   my_ranges INT8[];
    436   my_range INT8;
    437   my_delta INT8;
    438   my_meta INT8;
    439   my_next_max_serial INT8;
    440   my_rec RECORD;
    441   my_irec RECORD;
    442   my_i INT;
    443   my_min_serial INT8 DEFAULT NULL;
    444   my_rval exchange_statistic_interval_number_get_return_value;
    445 BEGIN
    446   SELECT imeta_serial_id
    447         ,ranges
    448         ,precisions
    449     INTO my_rec
    450     FROM exchange_statistic_interval_meta
    451    WHERE slug=in_slug;
    452   IF NOT FOUND
    453   THEN
    454     RETURN;
    455   END IF;
    456   my_rval.rvalue = 0;
    457   my_ranges = my_rec.ranges;
    458   my_meta = my_rec.imeta_serial_id;
    459 
    460   FOR my_i IN 1..COALESCE(array_length(my_ranges,1),0)
    461   LOOP
    462     my_range = my_ranges[my_i];
    463     SELECT event_delimiter
    464           ,cumulative_number
    465       INTO my_irec
    466       FROM exchange_statistic_interval_counter
    467      WHERE imeta_serial_id = my_meta
    468        AND range = my_range
    469        AND h_payto = in_h_payto;
    470     IF FOUND
    471     THEN
    472       my_min_serial = my_irec.event_delimiter;
    473       my_rval.rvalue = my_rval.rvalue + my_irec.cumulative_number;
    474 
    475       -- Check if we have events that left the applicable range
    476       SELECT SUM(delta) AS delta_sum
    477         INTO my_irec
    478         FROM exchange_statistic_counter_event
    479        WHERE imeta_serial_id = my_meta
    480          AND h_payto = in_h_payto
    481          AND slot < my_time - my_range
    482          AND nevent_serial_id >= my_min_serial;
    483 
    484       IF FOUND AND my_irec.delta_sum IS NOT NULL
    485       THEN
    486         my_delta = my_irec.delta_sum;
    487         my_rval.rvalue = my_rval.rvalue - my_delta;
    488 
    489         -- First find out the next event delimiter value
    490         SELECT nevent_serial_id
    491           INTO my_next_max_serial
    492           FROM exchange_statistic_counter_event
    493          WHERE imeta_serial_id = my_meta
    494            AND h_payto = in_h_payto
    495            AND slot >= my_time - my_range
    496            AND nevent_serial_id >= my_min_serial
    497          ORDER BY slot ASC
    498          LIMIT 1;
    499 
    500         IF FOUND
    501         THEN
    502           -- remove expired events from the sum of the current slot
    503 
    504           UPDATE exchange_statistic_interval_counter
    505              SET cumulative_number = cumulative_number - my_delta,
    506                  event_delimiter = my_next_max_serial
    507            WHERE imeta_serial_id = my_meta
    508              AND h_payto = in_h_payto
    509              AND range = my_range;
    510         ELSE
    511           -- actually, slot is now empty, remove it entirely
    512           DELETE FROM exchange_statistic_interval_counter
    513            WHERE imeta_serial_id = my_meta
    514              AND h_payto = in_h_payto
    515              AND range = my_range;
    516         END IF;
    517         IF (my_i < array_length(my_ranges,1))
    518         THEN
    519           -- carry over all events into the next slot
    520           UPDATE exchange_statistic_interval_counter AS usic SET
    521             cumulative_number = cumulative_number + my_delta,
    522             event_delimiter = LEAST(usic.event_delimiter,my_min_serial)
    523            WHERE imeta_serial_id = my_meta
    524              AND h_payto = in_h_payto
    525              AND range=my_ranges[my_i+1];
    526           IF NOT FOUND
    527           THEN
    528             INSERT INTO exchange_statistic_interval_counter
    529               (imeta_serial_id
    530               ,h_payto
    531               ,range
    532               ,event_delimiter
    533               ,cumulative_number
    534               ) VALUES (
    535                my_meta
    536               ,in_h_payto
    537               ,my_ranges[my_i+1]
    538               ,my_min_serial
    539               ,my_delta);
    540           END IF;
    541         ELSE
    542           -- events are obsolete, delete them
    543           DELETE FROM exchange_statistic_counter_event
    544                 WHERE imeta_serial_id = my_meta
    545                   AND h_payto = in_h_payto
    546                   AND slot < my_time - my_range;
    547         END IF;
    548       END IF;
    549 
    550       my_rval.range = my_range;
    551       RETURN NEXT my_rval;
    552     END IF;
    553   END LOOP;
    554 END $$;
    555 
    556 COMMENT ON FUNCTION exchange_statistic_interval_number_get
    557   IS 'Returns deposit statistic tracking deposited amounts over certain time intervals; we first trim the stored data to only track what is still in-range, and then return the remaining value for each range';
    558 
    559 
    560 DROP FUNCTION IF EXISTS exchange_statistic_interval_amount_get;
    561 CREATE OR REPLACE FUNCTION exchange_statistic_interval_amount_get (
    562   IN in_slug TEXT,
    563   IN in_h_payto BYTEA
    564 )
    565 RETURNS SETOF exchange_statistic_interval_amount_get_return_value
    566 LANGUAGE plpgsql
    567 AS $$
    568 DECLARE
    569   my_time INT8 DEFAULT ROUND(EXTRACT(epoch FROM CURRENT_TIMESTAMP(0)::TIMESTAMP) * 1000000)::INT8 / 1000 / 1000;
    570   my_ranges INT8[];
    571   my_range INT8;
    572   my_delta_value INT8;
    573   my_delta_frac INT8;
    574   my_delta taler_amount;
    575   my_meta INT8;
    576   my_next_max_serial INT8;
    577   my_rec RECORD;
    578   my_irec RECORD;
    579   my_jrec RECORD;
    580   my_i INT;
    581   my_min_serial INT8 DEFAULT NULL;
    582   my_rval exchange_statistic_interval_amount_get_return_value;
    583 BEGIN
    584   SELECT imeta_serial_id
    585         ,ranges
    586         ,precisions
    587     INTO my_rec
    588     FROM exchange_statistic_interval_meta
    589    WHERE slug=in_slug;
    590   IF NOT FOUND
    591   THEN
    592     RETURN;
    593   END IF;
    594 
    595   my_meta = my_rec.imeta_serial_id;
    596   my_ranges = my_rec.ranges;
    597 
    598   my_rval.rvalue.val = 0;
    599   my_rval.rvalue.frac = 0;
    600 
    601   FOR my_i IN 1..COALESCE(array_length(my_ranges,1),0)
    602   LOOP
    603     my_range = my_ranges[my_i];
    604     SELECT event_delimiter
    605           ,cumulative_value
    606       INTO my_irec
    607       FROM exchange_statistic_interval_amount
    608      WHERE imeta_serial_id = my_meta
    609        AND h_payto = in_h_payto
    610        AND range = my_range;
    611 
    612     IF FOUND
    613     THEN
    614       my_min_serial = my_irec.event_delimiter;
    615       my_rval.rvalue.val = (my_rval.rvalue).val + (my_irec.cumulative_value).val + (my_irec.cumulative_value).frac / 100000000;
    616       my_rval.rvalue.frac = (my_rval.rvalue).frac + (my_irec.cumulative_value).frac % 100000000;
    617       IF (my_rval.rvalue).frac > 100000000
    618       THEN
    619         my_rval.rvalue.frac = (my_rval.rvalue).frac - 100000000;
    620         my_rval.rvalue.val = (my_rval.rvalue).val + 1;
    621       END IF;
    622 
    623       -- Check if we have events that left the applicable range
    624       SELECT SUM((esae.delta).val) AS value_sum
    625             ,SUM((esae.delta).frac) AS frac_sum
    626         INTO my_jrec
    627         FROM exchange_statistic_amount_event esae
    628        WHERE imeta_serial_id = my_meta
    629          AND h_payto = in_h_payto
    630          AND slot < my_time - my_range
    631          AND aevent_serial_id >= my_min_serial;
    632 
    633       IF FOUND AND my_jrec.value_sum IS NOT NULL
    634       THEN
    635         -- Normalize sum
    636         my_delta_value = my_jrec.value_sum + my_jrec.frac_sum / 100000000;
    637         my_delta_frac = my_jrec.frac_sum % 100000000;
    638         my_rval.rvalue.val = (my_rval.rvalue).val - my_delta_value;
    639         IF ((my_rval.rvalue).frac >= my_delta_frac)
    640         THEN
    641           my_rval.rvalue.frac = (my_rval.rvalue).frac - my_delta_frac;
    642         ELSE
    643           my_rval.rvalue.frac = 100000000 + (my_rval.rvalue).frac - my_delta_frac;
    644           my_rval.rvalue.val = (my_rval.rvalue).val - 1;
    645         END IF;
    646 
    647         -- First find out the next event delimiter value
    648         SELECT aevent_serial_id
    649           INTO my_next_max_serial
    650           FROM exchange_statistic_amount_event
    651          WHERE imeta_serial_id = my_meta
    652            AND h_payto = in_h_payto
    653            AND slot >= my_time - my_range
    654            AND aevent_serial_id >= my_min_serial
    655          ORDER BY slot ASC
    656          LIMIT 1;
    657         IF FOUND
    658         THEN
    659           -- remove expired events from the sum of the current slot
    660           UPDATE exchange_statistic_interval_amount SET
    661              cumulative_value.val = (cumulative_value).val - my_delta_value
    662               - CASE
    663                   WHEN (cumulative_value).frac < my_delta_frac
    664                   THEN 1
    665                   ELSE 0
    666                 END,
    667              cumulative_value.frac = (cumulative_value).frac - my_delta_frac
    668              + CASE
    669                  WHEN (cumulative_value).frac < my_delta_frac
    670                  THEN 100000000
    671                  ELSE 0
    672                END,
    673              event_delimiter = my_next_max_serial
    674            WHERE imeta_serial_id = my_meta
    675              AND h_payto = in_h_payto
    676              AND range = my_range;
    677         ELSE
    678           -- actually, slot is now empty, remove it entirely
    679           DELETE FROM exchange_statistic_interval_amount
    680            WHERE imeta_serial_id = my_meta
    681              AND h_payto = in_h_payto
    682              AND range = my_range;
    683         END IF;
    684         IF (my_i < array_length(my_ranges,1))
    685         THEN
    686           -- carry over all events into the next (larger) slot
    687           UPDATE exchange_statistic_interval_amount AS msia SET
    688             cumulative_value.val = (cumulative_value).val + my_delta_value
    689               + CASE
    690                  WHEN (cumulative_value).frac + my_delta_frac > 100000000
    691                  THEN 1
    692                  ELSE 0
    693                END,
    694             cumulative_value.frac = (cumulative_value).frac + my_delta_value
    695               - CASE
    696                  WHEN (cumulative_value).frac + my_delta_frac > 100000000
    697                  THEN 100000000
    698                  ELSE 0
    699                END,
    700             event_delimiter = LEAST (msia.event_delimiter,my_min_serial)
    701            WHERE imeta_serial_id = my_meta
    702              AND h_payto = in_h_payto
    703              AND range=my_ranges[my_i+1];
    704           IF NOT FOUND
    705           THEN
    706             my_delta.val = my_delta_value;
    707             my_delta.frac = my_delta_frac;
    708             INSERT INTO exchange_statistic_interval_amount
    709               (imeta_serial_id
    710               ,h_payto
    711               ,event_delimiter
    712               ,range
    713               ,cumulative_value
    714               ) VALUES (
    715                my_meta
    716               ,in_h_payto
    717               ,my_min_serial
    718               ,my_ranges[my_i+1]
    719               ,my_delta);
    720           END IF;
    721         ELSE
    722           -- events are obsolete, delete them
    723           DELETE FROM exchange_statistic_amount_event
    724                 WHERE imeta_serial_id = my_meta
    725                   AND h_payto = in_h_payto
    726                   AND slot < my_time - my_range;
    727         END IF;
    728       END IF;
    729 
    730       my_rval.range = my_range;
    731       RETURN NEXT my_rval;
    732     END IF;
    733   END LOOP; -- over my_ranges
    734 END $$;
    735 
    736 COMMENT ON FUNCTION exchange_statistic_interval_amount_get
    737   IS 'Returns deposit statistic tracking deposited amounts over certain time intervals; we first trim the stored data to only track what is still in-range, and then return the remaining value; multiple values are returned, one per range';
    738 
    739 
    740 
    741 
    742 
    743 DROP PROCEDURE IF EXISTS exchange_statistic_counter_gc;
    744 CREATE OR REPLACE PROCEDURE exchange_statistic_counter_gc ()
    745 LANGUAGE plpgsql
    746 AS $$
    747 DECLARE
    748   my_time INT8 DEFAULT ROUND(EXTRACT(epoch FROM CURRENT_TIMESTAMP(0)::TIMESTAMP) * 1000000)::INT8 / 1000 / 1000;
    749   my_h_payto BYTEA;
    750   my_rec RECORD;
    751   my_sum RECORD;
    752   my_meta INT8;
    753   my_ranges INT8[];
    754   my_precisions INT8[];
    755   my_precision INT4;
    756   my_i INT4;
    757   min_slot INT8;
    758   max_slot INT8;
    759   end_slot INT8;
    760   my_total INT8;
    761 BEGIN
    762   -- GC for all instances
    763   FOR my_h_payto IN
    764     SELECT DISTINCT h_payto
    765       FROM exchange_statistic_counter_event
    766   LOOP
    767   -- Do combination work for all numeric statistic events
    768   FOR my_rec IN
    769     SELECT imeta_serial_id
    770           ,ranges
    771           ,precisions
    772           ,slug
    773       FROM exchange_statistic_interval_meta
    774   LOOP
    775     -- First, we query the current interval statistic to update its counters
    776     PERFORM FROM exchange_statistic_interval_number_get (my_rec.slug, my_h_payto);
    777 
    778     my_meta = my_rec.imeta_serial_id;
    779     my_ranges = my_rec.ranges;
    780     my_precisions = my_rec.precisions;
    781 
    782     FOR my_i IN 1..COALESCE(array_length(my_ranges,1),0)
    783     LOOP
    784       my_precision = my_precisions[my_i];
    785       IF 1 >= my_precision
    786       THEN
    787         -- Cannot coarsen in this case
    788         CONTINUE;
    789       END IF;
    790 
    791       IF 1 = my_i
    792       THEN
    793         min_slot = 0;
    794       ELSE
    795         min_slot = my_ranges[my_i - 1];
    796       END IF;
    797       end_slot = my_ranges[my_i];
    798 --    RAISE NOTICE 'Coarsening from [%,%) at %', my_time - end_slot, my_time - min_slot, my_precision;
    799 
    800       LOOP
    801         EXIT WHEN min_slot >= end_slot;
    802         max_slot = min_slot + my_precision;
    803         SELECT SUM(delta) AS total,
    804                COUNT(*)   AS matches,
    805                MIN(nevent_serial_id) AS rep_serial_id
    806           INTO my_sum
    807           FROM exchange_statistic_counter_event
    808          WHERE h_payto=my_h_payto
    809            AND imeta_serial_id=my_meta
    810            AND slot >= my_time - max_slot
    811            AND slot  < my_time - min_slot;
    812 
    813 --      RAISE NOTICE 'Found % entries between [%,%)', my_sum.matches, my_time - max_slot, my_time - min_slot;
    814         -- we only proceed if we had more then one match (optimization)
    815         IF FOUND AND my_sum.matches > 1
    816         THEN
    817           my_total = my_sum.total;
    818 
    819 --        RAISE NOTICE 'combining % entries to representative % for slots [%-%)', my_sum.matches, my_sum.rep_serial_id, my_time - max_slot, my_time - min_slot;
    820 
    821           -- combine entries
    822           DELETE FROM exchange_statistic_counter_event
    823            WHERE h_payto=my_h_payto
    824              AND imeta_serial_id=my_meta
    825              AND slot >= my_time - max_slot
    826              AND slot  < my_time - min_slot
    827              AND nevent_serial_id > my_sum.rep_serial_id;
    828            -- Now update the representative to the sum
    829           UPDATE exchange_statistic_counter_event SET
    830             delta = my_total
    831            WHERE imeta_serial_id = my_meta
    832              AND h_payto = my_h_payto
    833              AND nevent_serial_id = my_sum.rep_serial_id;
    834         END IF;
    835         min_slot = min_slot + my_precision;
    836       END LOOP; -- min_slot to end_slot by precision loop
    837     END LOOP; -- my_i loop
    838     -- Finally, delete all events beyond the range we care about
    839 
    840 --  RAISE NOTICE 'deleting entries of %/% before % - % = %', my_h_payto, my_meta, my_time, my_ranges[array_length(my_ranges,1)], my_time - my_ranges[array_length(my_ranges,1)];
    841     DELETE FROM exchange_statistic_counter_event
    842      WHERE h_payto=my_h_payto
    843        AND imeta_serial_id=my_meta
    844        AND slot < my_time - my_ranges[array_length(my_ranges,1)];
    845   END LOOP; -- my_rec loop
    846   END LOOP; -- my_h_payto loop
    847 END $$;
    848 COMMENT ON PROCEDURE exchange_statistic_counter_gc
    849   IS 'Performs garbage collection and compaction of the exchange_statistic_counter_event table';
    850 
    851 
    852 
    853 DROP PROCEDURE IF EXISTS exchange_statistic_amount_gc;
    854 CREATE OR REPLACE PROCEDURE exchange_statistic_amount_gc ()
    855 LANGUAGE plpgsql
    856 AS $$
    857 DECLARE
    858   my_time INT8 DEFAULT ROUND(EXTRACT(epoch FROM CURRENT_TIMESTAMP(0)::TIMESTAMP) * 1000000)::INT8 / 1000 / 1000;
    859   my_h_payto BYTEA;
    860   my_rec RECORD;
    861   my_sum RECORD;
    862   my_meta INT8;
    863   my_ranges INT8[];
    864   my_precisions INT8[];
    865   my_precision INT4;
    866   my_i INT4;
    867   min_slot INT8;
    868   max_slot INT8;
    869   end_slot INT8;
    870   my_total_val INT8;
    871   my_total_frac INT8;
    872 BEGIN
    873   -- GC for all accounts
    874   FOR my_h_payto IN
    875     SELECT DISTINCT h_payto
    876       FROM exchange_statistic_counter_event
    877   LOOP
    878   -- Do combination work for all numeric statistic events
    879   FOR my_rec IN
    880     SELECT imeta_serial_id
    881           ,ranges
    882           ,precisions
    883           ,slug
    884       FROM exchange_statistic_interval_meta
    885   LOOP
    886 
    887     -- First, we query the current interval statistic to update its counters
    888     PERFORM FROM exchange_statistic_interval_amount_get (my_rec.slug, my_h_payto);
    889 
    890     my_meta = my_rec.imeta_serial_id;
    891     my_ranges = my_rec.ranges;
    892     my_precisions = my_rec.precisions;
    893 
    894     FOR my_i IN 1..COALESCE(array_length(my_ranges,1),0)
    895     LOOP
    896       my_precision = my_precisions[my_i];
    897       IF 1 >= my_precision
    898       THEN
    899         -- Cannot coarsen in this case
    900         CONTINUE;
    901       END IF;
    902 
    903       IF 1 = my_i
    904       THEN
    905         min_slot = 0;
    906       ELSE
    907         min_slot = my_ranges[my_i - 1];
    908       END IF;
    909       end_slot = my_ranges[my_i];
    910 
    911 --    RAISE NOTICE 'Coarsening from [%,%) at %', my_time - end_slot, my_time - min_slot, my_precision;
    912       LOOP
    913         EXIT WHEN min_slot >= end_slot;
    914         max_slot = min_slot + my_precision;
    915         SELECT SUM((delta).val)  AS total_val,
    916                SUM((delta).frac) AS total_frac,
    917                COUNT(*)          AS matches,
    918                MIN(aevent_serial_id) AS rep_serial_id
    919           INTO my_sum
    920           FROM exchange_statistic_amount_event
    921          WHERE imeta_serial_id=my_meta
    922            AND h_payto=my_h_payto
    923            AND slot >= my_time - max_slot
    924            AND slot  < my_time - max_slot;
    925         -- we only proceed if we had more then one match (optimization)
    926         IF FOUND AND my_sum.matches > 1
    927         THEN
    928           -- normalize new total
    929           my_total_frac = my_sum.total_frac % 100000000;
    930           my_total_val = my_sum.total_val + my_sum.total_frac / 100000000;
    931 
    932           -- combine entries
    933           DELETE FROM exchange_statistic_amount_event
    934            WHERE imeta_serial_id=my_meta
    935              AND h_payto=my_h_payto
    936              AND slot >= my_time - max_slot
    937              AND slot  < my_time - max_slot
    938              AND aevent_serial_id > my_sum.rep_serial_id;
    939           -- Now update the representative to the sum
    940           UPDATE exchange_statistic_amount_event SET
    941              delta.val = my_total_value
    942             ,delta.frac = my_total_frac
    943            WHERE imeta_serial_id = my_meta
    944              AND h_payto = my_h_payto
    945              AND aevent_serial_id = my_sum.rep_serial_id;
    946         END IF;
    947         min_slot = min_slot + my_precision;
    948       END LOOP; -- min_slot to end_slot by precision loop
    949     END LOOP; -- my_i loop
    950     -- Finally, delete all events beyond the range we care about
    951 
    952 --  RAISE NOTICE 'deleting entries of %/% before % - % = %', my_h_payto, my_meta, my_time, my_ranges[array_length(my_ranges,1)], my_time - my_ranges[array_length(my_ranges,1)];
    953     DELETE FROM exchange_statistic_amount_event
    954      WHERE h_payto=my_h_payto
    955        AND imeta_serial_id=my_meta
    956        AND slot < my_time - my_ranges[array_length(my_ranges,1)];
    957     END LOOP; -- my_rec loop
    958   END LOOP; -- my_h_payto loop
    959 END $$;
    960 COMMENT ON PROCEDURE exchange_statistic_amount_gc
    961   IS 'Performs garbage collection and compaction of the exchange_statistic_amount_event table';
    962 
    963 
    964 
    965 DROP PROCEDURE IF EXISTS exchange_statistic_bucket_gc;
    966 CREATE OR REPLACE PROCEDURE exchange_statistic_bucket_gc ()
    967 LANGUAGE plpgsql
    968 AS $$
    969 DECLARE
    970   my_rec RECORD;
    971   my_range TEXT;
    972   my_now INT8;
    973   my_end INT8;
    974 BEGIN
    975   my_now = EXTRACT(EPOCH FROM CURRENT_TIMESTAMP(0)::TIMESTAMP); -- seconds since epoch
    976   FOR my_rec IN
    977     SELECT bmeta_serial_id
    978           ,stype
    979           ,ranges[array_length(ranges,1)] AS range
    980           ,ages[array_length(ages,1)] AS age
    981       FROM exchange_statistic_bucket_meta
    982   LOOP
    983     my_range = '1 ' || my_rec.range::TEXT;
    984     my_end = my_now - my_rec.age * EXTRACT(SECONDS FROM (SELECT my_range::INTERVAL)); -- age is given in multiples of the range (in seconds)
    985     IF my_rec.stype = 'amount'
    986     THEN
    987       DELETE
    988         FROM exchange_statistic_bucket_amount
    989        WHERE bmeta_serial_id = my_rec.bmeta_serial_id
    990          AND bucket_start >= my_end;
    991     ELSE
    992       DELETE
    993         FROM exchange_statistic_bucket_counter
    994        WHERE bmeta_serial_id = my_rec.bmeta_serial_id
    995          AND bucket_start >= my_end;
    996     END IF;
    997   END LOOP;
    998 END $$;
    999 COMMENT ON PROCEDURE exchange_statistic_bucket_gc
   1000   IS 'Performs garbage collection of the exchange_statistic_bucket_counter and exchange_statistic_bucket_amount tables';
   1001 
   1002 
   1003 
   1004 DROP FUNCTION IF EXISTS exchange_drop_customization;
   1005 CREATE OR REPLACE FUNCTION exchange_drop_customization (
   1006   IN in_schema TEXT,
   1007   OUT out_found BOOLEAN
   1008 )
   1009 LANGUAGE plpgsql
   1010 AS $$
   1011 DECLARE
   1012   my_xpatches TEXT;
   1013 BEGIN
   1014   -- Update DB versioning table.
   1015   out_found = FALSE;
   1016   FOR my_xpatches IN
   1017     SELECT patch_name
   1018       FROM _v.patches
   1019      WHERE starts_with(patch_name, in_schema || '-')
   1020   LOOP
   1021     PERFORM _v.unregister_patch(my_xpatches);
   1022     out_found = TRUE;
   1023   END LOOP;
   1024 
   1025   IF out_found
   1026   THEN
   1027     -- Drop the schema with all stored procedures/functions.
   1028     -- This also removes all associated triggers, hence CASCADE.
   1029     EXECUTE FORMAT('DROP SCHEMA %s CASCADE'
   1030       ,in_schema);
   1031   END IF;
   1032 
   1033   -- Finally, need to also remove entries from the statistics meta-tables.
   1034   -- Doing so also DELETEs the associated statistics, hence CASCADE.
   1035   DELETE
   1036      FROM exchange_statistic_interval_meta
   1037     WHERE origin=in_schema;
   1038   DELETE
   1039      FROM exchange_statistic_bucket_meta
   1040     WHERE origin=in_schema;
   1041 END $$;
   1042 COMMENT ON FUNCTION exchange_drop_customization
   1043   IS 'Removes all entries related to a particular exchange customization schema';