exchange

Base system with REST service to issue digital coins, run by the payment service provider
Log | Files | Refs | Submodules | README | LICENSE

commit cd3764199ee5880cc2c6c2064dc83cdec21f82c5
parent 5493c3224bfff283e37525cef2f938a5f584e7ce
Author: Christian Grothoff <grothoff@gnunet.org>
Date:   Sun, 23 Mar 2025 16:23:16 +0100

starting point for exchange statistics, import from merchant

Diffstat:
Msrc/exchange-tools/taler-exchange-dbinit.c | 2+-
Asrc/exchangedb/exchange-xx10.sql | 263+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Asrc/exchangedb/exchange_statistics_helpers.sql | 1079+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
3 files changed, 1343 insertions(+), 1 deletion(-)

diff --git a/src/exchange-tools/taler-exchange-dbinit.c b/src/exchange-tools/taler-exchange-dbinit.c @@ -45,7 +45,7 @@ static int reset_db; static int clear_shards; /** - * -g option: garbage collect DB reset + * -g option: garbage collect DB */ static int gc_db; diff --git a/src/exchangedb/exchange-xx10.sql b/src/exchangedb/exchange-xx10.sql @@ -0,0 +1,263 @@ +-- +-- This file is part of TALER +-- Copyright (C) 2025 Taler Systems SA +-- +-- TALER is free software; you can redistribute it and/or modify it under the +-- terms of the GNU General Public License as published by the Free Software +-- Foundation; either version 3, or (at your option) any later version. +-- +-- TALER is distributed in the hope that it will be useful, but WITHOUT ANY +-- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +-- A PARTICULAR PURPOSE. See the GNU General Public License for more details. +-- +-- You should have received a copy of the GNU General Public License along with +-- TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +-- + +-- @file exchange-xx10.sql +-- @brief Tables for statistics +-- @author Christian Grothoff + +-- FIXME: add partitions and figure out if we do need NULL fo h_payto! + +BEGIN; + +-- Check patch versioning is in place. +SELECT _v.register_patch('exchange-xx10', NULL, NULL); + +SET search_path TO exchange; + +-- Ranges given here must be supported by the date_trunc function of Postgresql! +CREATE TYPE statistic_range AS + ENUM('century', 'decade', 'year', 'quarter', 'month', 'week', 'day', 'hour', 'minute', 'second'); + +CREATE TYPE statistic_type AS + ENUM('amount', 'number'); + +-- -------------- Bucket statistics --------------------- + +CREATE TABLE exchange_statistic_bucket_meta + (bmeta_serial_id INT8 GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY + ,origin TEXT NOT NULL + ,slug TEXT NOT NULL + ,description TEXT NOT NULL + ,stype statistic_type NOT NULL + ,ranges statistic_range[] NOT NULL + ,ages INT4[] NOT NULL + ,UNIQUE(slug,stype) + ,CONSTRAINT equal_array_length + CHECK (array_length(ranges,1) = + array_length(ages,1)) + ); +COMMENT ON TABLE exchange_statistic_bucket_meta + IS 'meta data about a statistic with events falling into buckets we are tracking'; +COMMENT ON COLUMN exchange_statistic_bucket_meta.bmeta_serial_id + IS 'unique identifier for this type of bucket statistic we are tracking'; +COMMENT ON COLUMN exchange_statistic_bucket_meta.origin + IS 'which customization schema does this statistic originate from (used for easy deletion)'; +COMMENT ON COLUMN exchange_statistic_bucket_meta.slug + IS 'keyword (or name) of the statistic; identifies what the statistic is about; should be a slug suitable for a URI path'; +COMMENT ON COLUMN exchange_statistic_bucket_meta.description + IS 'description of the statistic being tracked'; +COMMENT ON COLUMN exchange_statistic_bucket_meta.stype + IS 'statistic type, what kind of data is being tracked, amount or number'; +COMMENT ON COLUMN exchange_statistic_bucket_meta.ranges + IS 'size of the buckets that are being kept for this statistic'; +COMMENT ON COLUMN exchange_statistic_bucket_meta.ages + IS 'determines how long into the past we keep buckets for the range at the given index around (in generations)'; +CREATE INDEX exchange_statistic_bucket_meta_by_origin + ON exchange_statistic_bucket_meta + (origin); + + +CREATE TABLE exchange_statistic_bucket_counter + (bmeta_serial_id INT8 NOT NULL + REFERENCES exchange_statistic_bucket_meta (bmeta_serial_id) ON DELETE CASCADE + ,h_payto BYTEA CHECK (LENGTH(h_payto)=32) + ,bucket_start INT8 NOT NULL + ,bucket_range statistic_range NOT NULL + ,cumulative_number INT8 NOT NULL + ,UNIQUE (bmeta_serial_id,h_payto,bucket_start,bucket_range) + ); +COMMENT ON TABLE exchange_statistic_bucket_counter + IS 'various numeric statistics (cumulative counters) being tracked by bucket into which they fall'; +COMMENT ON COLUMN exchange_statistic_bucket_counter.bmeta_serial_id + IS 'identifies what the statistic is about'; +COMMENT ON COLUMN exchange_statistic_bucket_counter.h_payto + IS 'identifies an account (hash of normalized payto) for which the statistic is kept, NULL for global statistics'; +COMMENT ON COLUMN exchange_statistic_bucket_counter.bucket_start + IS 'start date for the bucket in seconds since the epoch'; +COMMENT ON COLUMN exchange_statistic_bucket_counter.bucket_range + IS 'range of the bucket'; +COMMENT ON COLUMN exchange_statistic_bucket_counter.cumulative_number + IS 'aggregate (sum) of tracked by the statistic; what exactly is tracked is determined by the keyword'; + + +CREATE TABLE exchange_statistic_bucket_amount + (bmeta_serial_id INT8 NOT NULL + REFERENCES exchange_statistic_bucket_meta (bmeta_serial_id) ON DELETE CASCADE + ,h_payto BYTEA CHECK (LENGTH(h_payto)=32) + ,bucket_start INT8 NOT NULL + ,bucket_range statistic_range NOT NULL + ,cumulative_value taler_amount NOT NULL + ,UNIQUE (bmeta_serial_id,h_payto,bucket_start,bucket_range) + ); +COMMENT ON TABLE exchange_statistic_bucket_amount + IS 'various amount statistics (in various currencies) being tracked'; +COMMENT ON COLUMN exchange_statistic_bucket_amount.bmeta_serial_id + IS 'identifies what the statistic is about'; +COMMENT ON COLUMN exchange_statistic_bucket_amount.h_payto + IS 'identifies an account (hash of normalized payto) for which the statistic is kept, NULL for global statistics'; +COMMENT ON COLUMN exchange_statistic_bucket_amount.bucket_start + IS 'start date for the bucket in seconds since the epoch'; +COMMENT ON COLUMN exchange_statistic_bucket_amount.bucket_range + IS 'range of the bucket'; +COMMENT ON COLUMN exchange_statistic_bucket_amount.cumulative_value + IS 'amount being tracked'; + + +-- -------------- Interval statistics --------------------- + + +CREATE TABLE exchange_statistic_interval_meta + (imeta_serial_id INT8 GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY + ,origin TEXT NOT NULL + ,slug TEXT NOT NULL + ,description TEXT NOT NULL + ,stype statistic_type NOT NULL + ,ranges INT8[] NOT NULL CHECK (array_length(ranges,1) > 0) + ,precisions INT8[] NOT NULL CHECK (array_length(precisions,1) > 0) + ,UNIQUE(slug,stype) + ,CONSTRAINT equal_array_length + CHECK (array_length(ranges,1) = + array_length(precisions,1)) + ); +COMMENT ON TABLE exchange_statistic_interval_meta + IS 'meta data about an interval statistic we are tracking'; +COMMENT ON COLUMN exchange_statistic_interval_meta.imeta_serial_id + IS 'unique identifier for this type of interval statistic we are tracking'; +COMMENT ON COLUMN exchange_statistic_interval_meta.origin + IS 'which customization schema does this statistic originate from (used for easy deletion)'; +COMMENT ON COLUMN exchange_statistic_interval_meta.slug + IS 'keyword (or name) of the statistic; identifies what the statistic is about; should be a slug suitable for a URI path'; +COMMENT ON COLUMN exchange_statistic_interval_meta.description + IS 'description of the statistic being tracked'; +COMMENT ON COLUMN exchange_statistic_interval_meta.stype + IS 'statistic type, what kind of data is being tracked, amount or number'; +COMMENT ON COLUMN exchange_statistic_interval_meta.ranges + IS 'range of values that is being kept for this statistic, in seconds, must be monotonically increasing'; +COMMENT ON COLUMN exchange_statistic_interval_meta.precisions + IS 'determines how precisely we track which events fall into the range at the same index (allowing us to coalesce events with timestamps in proximity close to the given precision), in seconds, 0 is not allowed'; +CREATE INDEX exchange_statistic_interval_meta_by_origin + ON exchange_statistic_interval_meta + (origin); + +CREATE TABLE exchange_statistic_counter_event + (nevent_serial_id INT8 GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY + ,imeta_serial_id INT8 + REFERENCES exchange_statistic_interval_meta (imeta_serial_id) ON DELETE CASCADE + ,h_payto BYTEA CHECK (LENGTH(h_payto)=32) + ,slot INT8 NOT NULL + ,delta INT8 NOT NULL + ,UNIQUE (imeta_serial_id, h_payto, slot) + ); +COMMENT ON TABLE exchange_statistic_counter_event + IS 'number to decrement an interval statistic by when a certain time value is reached'; +COMMENT ON COLUMN exchange_statistic_counter_event.nevent_serial_id + IS 'unique identifier for this number event'; +COMMENT ON COLUMN exchange_statistic_counter_event.imeta_serial_id + IS 'identifies what the statistic is about; must be of stype number'; +COMMENT ON COLUMN exchange_statistic_counter_event.h_payto + IS 'identifies an account (hash of normalized payto) for which the statistic is kept, NULL for global statistics'; +COMMENT ON COLUMN exchange_statistic_counter_event.slot + IS 'identifies the time slot at which the given event(s) happened, rounded down by the respective precisions value'; +COMMENT ON COLUMN exchange_statistic_counter_event.delta + IS 'total cumulative number that was added at the time identified by slot'; + +CREATE TABLE exchange_statistic_interval_counter + (imeta_serial_id INT8 NOT NULL + REFERENCES exchange_statistic_interval_meta (imeta_serial_id) ON DELETE CASCADE + ,h_payto BYTEA CHECK (LENGTH(h_payto)=32) + ,range INT8 NOT NULL + ,event_delimiter INT8 NOT NULL + REFERENCES exchange_statistic_counter_event (nevent_serial_id) ON DELETE RESTRICT + ,cumulative_number INT8 NOT NULL + ,UNIQUE (imeta_serial_id,h_payto,range) + ); +COMMENT ON TABLE exchange_statistic_interval_counter + IS 'various numeric statistics (cumulative counters) being tracked'; +COMMENT ON COLUMN exchange_statistic_interval_counter.imeta_serial_id + IS 'identifies what the statistic is about'; +COMMENT ON COLUMN exchange_statistic_interval_counter.h_payto + IS 'identifies an account (hash of normalized payto) for which the statistic is kept, NULL for global statistics'; +COMMENT ON COLUMN exchange_statistic_interval_counter.range + IS 'for which range is this the counter; note that the cumulative_number excludes the values already stored in smaller ranges'; +COMMENT ON COLUMN exchange_statistic_interval_counter.event_delimiter + IS 'determines the last event currently included in the interval'; +COMMENT ON COLUMN exchange_statistic_interval_counter.cumulative_number + IS 'aggregate (sum) of tracked by the statistic; what exactly is tracked is determined by the keyword'; + + +CREATE TABLE exchange_statistic_amount_event + (aevent_serial_id INT8 GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY + ,imeta_serial_id INT8 + REFERENCES exchange_statistic_interval_meta (imeta_serial_id) ON DELETE CASCADE + ,h_payto BYTEA CHECK (LENGTH(h_payto)=32) + ,slot INT8 NOT NULL + ,delta taler_amount NOT NULL + ,CONSTRAINT event_key UNIQUE (imeta_serial_id, h_payto, slot) + ); +COMMENT ON TABLE exchange_statistic_amount_event + IS 'amount to decrement an interval statistic by when a certain time value is reached'; +COMMENT ON COLUMN exchange_statistic_amount_event.aevent_serial_id + IS 'unique identifier for this amount event'; +COMMENT ON COLUMN exchange_statistic_amount_event.imeta_serial_id + IS 'identifies what the statistic is about; must be of clazz interval and of stype amount'; +COMMENT ON COLUMN exchange_statistic_amount_event.h_payto + IS 'identifies an account (hash of normalized payto) for which the statistic is kept, NULL for global statistics'; +COMMENT ON COLUMN exchange_statistic_amount_event.slot + IS 'identifies the time slot at which the given event(s) happened'; +COMMENT ON COLUMN exchange_statistic_amount_event.delta + IS 'total cumulative amount that was added at the time identified by slot'; + + +CREATE TABLE exchange_statistic_interval_amount + (imeta_serial_id INT8 NOT NULL + REFERENCES exchange_statistic_interval_meta (imeta_serial_id) ON DELETE CASCADE + ,h_payto BYTEA CHECK (LENGTH(h_payto)=32) + ,event_delimiter INT8 NOT NULL + REFERENCES exchange_statistic_amount_event (aevent_serial_id) ON DELETE RESTRICT + ,range INT8 NOT NULL + ,cumulative_value taler_amount NOT NULL + ,UNIQUE (imeta_serial_id,h_payto,range) + ); +COMMENT ON TABLE exchange_statistic_interval_amount + IS 'various amount statistics being tracked'; +COMMENT ON COLUMN exchange_statistic_interval_amount.imeta_serial_id + IS 'identifies what the statistic is about'; +COMMENT ON COLUMN exchange_statistic_interval_amount.h_payto + IS 'identifies an account (hash of normalized payto) for which the statistic is kept, NULL for global statistics'; +COMMENT ON COLUMN exchange_statistic_interval_amount.range + IS 'for which range is this the counter; note that the cumulative_number excludes the values already stored in smaller ranges'; +COMMENT ON COLUMN exchange_statistic_interval_amount.cumulative_value + IS 'amount affected by the event'; + +CREATE TYPE exchange_statistic_interval_number_get_return_value + AS + (range INT8 + ,rvalue INT8 + ); +COMMENT ON TYPE exchange_statistic_interval_number_get_return_value + IS 'Return type for exchange_statistic_interval_number_get stored procedure'; + +CREATE TYPE exchange_statistic_interval_amount_get_return_value + AS + (range INT8 + ,rvalue taler_amount + ); +COMMENT ON TYPE exchange_statistic_interval_amount_get_return_value + IS 'Return type for exchange_statistic_interval_amount_get stored procedure'; + + + +COMMIT; diff --git a/src/exchangedb/exchange_statistics_helpers.sql b/src/exchangedb/exchange_statistics_helpers.sql @@ -0,0 +1,1079 @@ +-- +-- This file is part of TALER +-- Copyright (C) 2025 Taler Systems SA +-- +-- TALER is free software; you can redistribute it and/or modify it under the +-- terms of the GNU General Public License as published by the Free Software +-- Foundation; either version 3, or (at your option) any later version. +-- +-- TALER is distributed in the hope that it will be useful, but WITHOUT ANY +-- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +-- A PARTICULAR PURPOSE. See the GNU General Public License for more details. +-- +-- You should have received a copy of the GNU General Public License along with +-- TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +-- + +-- FIXME: adapt procedures to changes in table structure for the exchange: +-- - remove multi-currency support, consistent use of exchange amount type +-- - replace instances by h_payto (or NULL!) +-- - new 'origin' field in _meta + + + +SET search_path TO exchange; +DROP FUNCTION IF EXISTS interval_to_start; +CREATE OR REPLACE FUNCTION interval_to_start ( + IN in_timestamp TIMESTAMP, + IN in_range statistic_range, + OUT out_bucket_start INT8 +) +LANGUAGE plpgsql +AS $$ +BEGIN + out_bucket_start = EXTRACT(EPOCH FROM DATE_TRUNC(in_range::text, in_timestamp)); +END $$; +COMMENT ON FUNCTION interval_to_start + IS 'computes the start time of the bucket for an event at the current time given the desired bucket range'; + + +DROP PROCEDURE IF EXISTS exchange_do_bump_number_bucket_stat; +CREATE OR REPLACE PROCEDURE exchange_do_bump_number_bucket_stat( + in_slug TEXT, + in_exchange_serial BIGINT, + in_timestamp TIMESTAMP, + in_delta INT8 +) +LANGUAGE plpgsql +AS $$ +DECLARE + my_meta INT8; + my_range statistic_range; + my_bucket_start INT8; + my_curs CURSOR (arg_slug TEXT) + FOR SELECT UNNEST(ranges) + FROM exchange_statistic_bucket_meta + WHERE slug=arg_slug; +BEGIN + SELECT bmeta_serial_id + INTO my_meta + FROM exchange_statistic_bucket_meta + WHERE slug=in_slug + AND stype='number'; + IF NOT FOUND + THEN + RETURN; + END IF; + OPEN my_curs (arg_slug:=in_slug); + LOOP + FETCH NEXT + FROM my_curs + INTO my_range; + EXIT WHEN NOT FOUND; + SELECT * + INTO my_bucket_start + FROM interval_to_start (in_timestamp, my_range); + + UPDATE exchange_statistic_bucket_counter + SET cumulative_number = cumulative_number + in_delta + WHERE bmeta_serial_id=my_meta + AND exchange_serial=in_exchange_serial + AND bucket_start=my_bucket_start + AND bucket_range=my_range; + IF NOT FOUND + THEN + INSERT INTO exchange_statistic_bucket_counter + (bmeta_serial_id + ,exchange_serial + ,bucket_start + ,bucket_range + ,cumulative_number + ) VALUES ( + my_meta + ,in_exchange_serial + ,my_bucket_start + ,my_range + ,in_delta); + END IF; + END LOOP; + CLOSE my_curs; +END $$; + + +DROP PROCEDURE IF EXISTS exchange_do_bump_amount_bucket_stat; +CREATE OR REPLACE PROCEDURE exchange_do_bump_amount_bucket_stat( + in_slug TEXT, + in_exchange_serial BIGINT, + in_timestamp TIMESTAMP, + in_delta taler_amount_currency +) +LANGUAGE plpgsql +AS $$ +DECLARE + my_meta INT8; + my_range statistic_range; + my_bucket_start INT8; + my_curs CURSOR (arg_slug TEXT) + FOR SELECT UNNEST(ranges) + FROM exchange_statistic_bucket_meta + WHERE slug=arg_slug; +BEGIN + SELECT bmeta_serial_id + INTO my_meta + FROM exchange_statistic_bucket_meta + WHERE slug=in_slug + AND stype='amount'; + IF NOT FOUND + THEN + RETURN; + END IF; + OPEN my_curs (arg_slug:=in_slug); + LOOP + FETCH NEXT + FROM my_curs + INTO my_range; + EXIT WHEN NOT FOUND; + SELECT * + INTO my_bucket_start + FROM interval_to_start (in_timestamp, my_range); + + UPDATE exchange_statistic_bucket_amount + SET + cumulative_value = cumulative_value + (in_delta).val + + CASE + WHEN (in_delta).frac + cumulative_frac >= 100000000 + THEN 1 + ELSE 0 + END, + cumulative_frac = cumulative_frac + (in_delta).frac + - CASE + WHEN (in_delta).frac + cumulative_frac >= 100000000 + THEN 100000000 + ELSE 0 + END + WHERE bmeta_serial_id=my_meta + AND exchange_serial=in_exchange_serial + AND curr=(in_delta).curr + AND bucket_start=my_bucket_start + AND bucket_range=my_range; + IF NOT FOUND + THEN + INSERT INTO exchange_statistic_bucket_amount + (bmeta_serial_id + ,exchange_serial + ,bucket_start + ,bucket_range + ,curr + ,cumulative_value + ,cumulative_frac + ) VALUES ( + my_meta + ,in_exchange_serial + ,my_bucket_start + ,my_range + ,(in_delta).curr + ,(in_delta).val + ,(in_delta).frac); + END IF; + END LOOP; + CLOSE my_curs; +END $$; + +COMMENT ON PROCEDURE exchange_do_bump_amount_bucket_stat + IS 'Updates an amount statistic tracked over buckets'; + + +DROP PROCEDURE IF EXISTS exchange_do_bump_number_interval_stat; +CREATE OR REPLACE PROCEDURE exchange_do_bump_number_interval_stat( + in_slug TEXT, + in_exchange_serial BIGINT, + in_timestamp TIMESTAMP, + in_delta INT8 +) +LANGUAGE plpgsql +AS $$ +DECLARE + my_now INT8; + my_record RECORD; + my_meta INT8; + my_ranges INT8[]; + my_precisions INT8[]; + my_rangex INT8; + my_precisionx INT8; + my_start INT8; + my_event INT8; +BEGIN + my_now = ROUND(EXTRACT(epoch FROM CURRENT_TIMESTAMP(0)::TIMESTAMP) * 1000000)::INT8 / 1000 / 1000; + SELECT imeta_serial_id + ,ranges AS ranges + ,precisions AS precisions + INTO my_record + FROM exchange_statistic_interval_meta + WHERE slug=in_slug + AND stype='number'; + IF NOT FOUND + THEN + RETURN; + END IF; + + my_start = ROUND(EXTRACT(epoch FROM in_timestamp) * 1000000)::INT8 / 1000 / 1000; -- convert to seconds + my_precisions = my_record.precisions; + my_ranges = my_record.ranges; + my_rangex = NULL; + FOR my_x IN 1..COALESCE(array_length(my_ranges,1),0) + LOOP + IF my_now - my_ranges[my_x] < my_start + THEN + my_rangex = my_ranges[my_x]; + my_precisionx = my_precisions[my_x]; + EXIT; + END IF; + END LOOP; + IF my_rangex IS NULL + THEN + -- event is beyond the ranges we care about + RETURN; + END IF; + + my_meta = my_record.imeta_serial_id; + my_start = my_start - my_start % my_precisionx; -- round down + + INSERT INTO exchange_statistic_counter_event AS msce + (imeta_serial_id + ,exchange_serial + ,slot + ,delta) + VALUES + (my_meta + ,in_exchange_serial + ,my_start + ,in_delta) + ON CONFLICT (imeta_serial_id, exchange_serial, slot) + DO UPDATE SET + delta = msce.delta + in_delta + RETURNING nevent_serial_id + INTO my_event; + + UPDATE exchange_statistic_interval_counter + SET cumulative_number = cumulative_number + in_delta + WHERE imeta_serial_id = my_meta + AND exchange_serial = in_exchange_serial + AND range=my_rangex; + IF NOT FOUND + THEN + INSERT INTO exchange_statistic_interval_counter + (imeta_serial_id + ,exchange_serial + ,range + ,event_delimiter + ,cumulative_number + ) VALUES ( + my_meta + ,in_exchange_serial + ,my_rangex + ,my_event + ,in_delta); + END IF; +END $$; + +COMMENT ON PROCEDURE exchange_do_bump_number_interval_stat + IS 'Updates a numeric statistic tracked over an interval'; + + +DROP PROCEDURE IF EXISTS exchange_do_bump_amount_interval_stat; +CREATE OR REPLACE PROCEDURE exchange_do_bump_amount_interval_stat( + in_slug TEXT, + in_exchange_serial BIGINT, + in_timestamp TIMESTAMP, + in_delta taler_amount_currency -- new amount in table that we should add to the tracker +) +LANGUAGE plpgsql +AS $$ +DECLARE + my_now INT8; + my_record RECORD; + my_meta INT8; + my_ranges INT8[]; + my_precisions INT8[]; + my_x INT; + my_rangex INT8; + my_precisionx INT8; + my_start INT8; + my_event INT8; +BEGIN + my_now = ROUND(EXTRACT(epoch FROM CURRENT_TIMESTAMP(0)::TIMESTAMP) * 1000000)::INT8 / 1000 / 1000; + SELECT imeta_serial_id + ,ranges + ,precisions + INTO my_record + FROM exchange_statistic_interval_meta + WHERE slug=in_slug + AND stype='amount'; + IF NOT FOUND + THEN + RETURN; + END IF; + + my_start = ROUND(EXTRACT(epoch FROM in_timestamp) * 1000000)::INT8 / 1000 / 1000; -- convert to seconds since epoch + my_precisions = my_record.precisions; + my_ranges = my_record.ranges; + my_rangex = NULL; + FOR my_x IN 1..COALESCE(array_length(my_ranges,1),0) + LOOP + IF my_now - my_ranges[my_x] < my_start + THEN + my_rangex = my_ranges[my_x]; + my_precisionx = my_precisions[my_x]; + EXIT; + END IF; + END LOOP; + IF my_rangex IS NULL + THEN + -- event is beyond the ranges we care about + RETURN; + END IF; + my_start = my_start - my_start % my_precisionx; -- round down + my_meta = my_record.imeta_serial_id; + + INSERT INTO exchange_statistic_amount_event AS msae + (imeta_serial_id + ,exchange_serial + ,slot + ,delta_curr + ,delta_value + ,delta_frac + ) VALUES ( + my_meta + ,in_exchange_serial + ,my_start + ,(in_delta).curr + ,(in_delta).val + ,(in_delta).frac + ) + ON CONFLICT (imeta_serial_id, exchange_serial, slot, delta_curr) + DO UPDATE SET + delta_value = msae.delta_value + (in_delta).val + + CASE + WHEN (in_delta).frac + msae.delta_frac >= 100000000 + THEN 1 + ELSE 0 + END, + delta_frac = msae.delta_frac + (in_delta).frac + - CASE + WHEN (in_delta).frac + msae.delta_frac >= 100000000 + THEN 100000000 + ELSE 0 + END + RETURNING aevent_serial_id + INTO my_event; + + UPDATE exchange_statistic_interval_amount + SET + cumulative_value = cumulative_value + (in_delta).val + + CASE + WHEN (in_delta).frac + cumulative_frac >= 100000000 + THEN 1 + ELSE 0 + END, + cumulative_frac = cumulative_frac + (in_delta).frac + - CASE + WHEN (in_delta).frac + cumulative_frac >= 100000000 + THEN 100000000 + ELSE 0 + END + WHERE imeta_serial_id=my_meta + AND exchange_serial=in_exchange_serial + AND range=my_rangex + AND curr=(in_delta).curr; + IF NOT FOUND + THEN + INSERT INTO exchange_statistic_interval_amount + (imeta_serial_id + ,exchange_serial + ,range + ,event_delimiter + ,curr + ,cumulative_value + ,cumulative_frac + ) VALUES ( + my_meta + ,in_exchange_serial + ,my_rangex + ,my_event + ,(in_delta).curr + ,(in_delta).val + ,(in_delta).frac); + END IF; +END $$; +COMMENT ON PROCEDURE exchange_do_bump_amount_interval_stat + IS 'Updates an amount statistic tracked over an interval'; + + +DROP PROCEDURE IF EXISTS exchange_do_bump_number_stat; +CREATE OR REPLACE PROCEDURE exchange_do_bump_number_stat( + in_slug TEXT, + in_exchange_serial BIGINT, + in_timestamp TIMESTAMP, + in_delta INT8 +) +LANGUAGE plpgsql +AS $$ +BEGIN + CALL exchange_do_bump_number_bucket_stat (in_slug, in_exchange_serial, in_timestamp, in_delta); + CALL exchange_do_bump_number_interval_stat (in_slug, in_exchange_serial, in_timestamp, in_delta); +END $$; +COMMENT ON PROCEDURE exchange_do_bump_number_stat + IS 'Updates a numeric statistic (bucket or interval)'; + + +DROP PROCEDURE IF EXISTS exchange_do_bump_amount_stat; +CREATE OR REPLACE PROCEDURE exchange_do_bump_amount_stat( + in_slug TEXT, + in_exchange_serial BIGINT, + in_timestamp TIMESTAMP, + in_delta taler_amount_currency +) +LANGUAGE plpgsql +AS $$ +BEGIN + CALL exchange_do_bump_amount_bucket_stat (in_slug, in_exchange_serial, in_timestamp, in_delta); + CALL exchange_do_bump_amount_interval_stat (in_slug, in_exchange_serial, in_timestamp, in_delta); +END $$; +COMMENT ON PROCEDURE exchange_do_bump_amount_stat + IS 'Updates an amount statistic (bucket or interval)'; + + +DROP FUNCTION IF EXISTS exchange_statistic_interval_number_get; +CREATE OR REPLACE FUNCTION exchange_statistic_interval_number_get ( + IN in_slug TEXT, + IN in_instance_id TEXT +) +RETURNS SETOF exchange_statistic_interval_number_get_return_value +LANGUAGE plpgsql +AS $$ +DECLARE + my_time INT8 DEFAULT ROUND(EXTRACT(epoch FROM CURRENT_TIMESTAMP(0)::TIMESTAMP) * 1000000)::INT8 / 1000 / 1000; + my_ranges INT8[]; + my_range INT8; + my_delta INT8; + my_meta INT8; + my_next_max_serial INT8; + my_instance_id INT8; + my_rec RECORD; + my_irec RECORD; + my_i INT; + my_min_serial INT8 DEFAULT NULL; + my_rval exchange_statistic_interval_number_get_return_value; +BEGIN + SELECT exchange_serial + INTO my_instance_id + FROM exchange_instances + WHERE exchange_id=in_instance_id; + IF NOT FOUND + THEN + RETURN; + END IF; + + SELECT imeta_serial_id + ,ranges + ,precisions + INTO my_rec + FROM exchange_statistic_interval_meta + WHERE slug=in_slug; + IF NOT FOUND + THEN + RETURN; + END IF; + my_rval.rvalue = 0; + my_ranges = my_rec.ranges; + my_meta = my_rec.imeta_serial_id; + + FOR my_i IN 1..COALESCE(array_length(my_ranges,1),0) + LOOP + my_range = my_ranges[my_i]; + SELECT event_delimiter + ,cumulative_number + INTO my_irec + FROM exchange_statistic_interval_counter + WHERE imeta_serial_id = my_meta + AND range = my_range + AND exchange_serial = my_instance_id; + IF FOUND + THEN + my_min_serial = my_irec.event_delimiter; + my_rval.rvalue = my_rval.rvalue + my_irec.cumulative_number; + + -- Check if we have events that left the applicable range + SELECT SUM(delta) AS delta_sum + INTO my_irec + FROM exchange_statistic_counter_event + WHERE imeta_serial_id = my_meta + AND exchange_serial = my_instance_id + AND slot < my_time - my_range + AND nevent_serial_id >= my_min_serial; + + IF FOUND AND my_irec.delta_sum IS NOT NULL + THEN + my_delta = my_irec.delta_sum; + my_rval.rvalue = my_rval.rvalue - my_delta; + + -- First find out the next event delimiter value + SELECT nevent_serial_id + INTO my_next_max_serial + FROM exchange_statistic_counter_event + WHERE imeta_serial_id = my_meta + AND exchange_serial = my_instance_id + AND slot >= my_time - my_range + AND nevent_serial_id >= my_min_serial + ORDER BY slot ASC + LIMIT 1; + + IF FOUND + THEN + -- remove expired events from the sum of the current slot + + UPDATE exchange_statistic_interval_counter + SET cumulative_number = cumulative_number - my_delta, + event_delimiter = my_next_max_serial + WHERE imeta_serial_id = my_meta + AND exchange_serial = my_instance_id + AND range = my_range; + ELSE + -- actually, slot is now empty, remove it entirely + DELETE FROM exchange_statistic_interval_counter + WHERE imeta_serial_id = my_meta + AND exchange_serial = my_instance_id + AND range = my_range; + END IF; + IF (my_i < array_length(my_ranges,1)) + THEN + -- carry over all events into the next slot + UPDATE exchange_statistic_interval_counter AS usic SET + cumulative_number = cumulative_number + my_delta, + event_delimiter = LEAST(usic.event_delimiter,my_min_serial) + WHERE imeta_serial_id = my_meta + AND exchange_serial = my_instance_id + AND range=my_ranges[my_i+1]; + IF NOT FOUND + THEN + INSERT INTO exchange_statistic_interval_counter + (imeta_serial_id + ,exchange_serial + ,range + ,event_delimiter + ,cumulative_number + ) VALUES ( + my_meta + ,my_instance_id + ,my_ranges[my_i+1] + ,my_min_serial + ,my_delta); + END IF; + ELSE + -- events are obsolete, delete them + DELETE FROM exchange_statistic_counter_event + WHERE imeta_serial_id = my_meta + AND exchange_serial = my_instance_id + AND slot < my_time - my_range; + END IF; + END IF; + + my_rval.range = my_range; + RETURN NEXT my_rval; + END IF; + END LOOP; +END $$; + +COMMENT ON FUNCTION exchange_statistic_interval_number_get + IS 'Returns deposit statistic tracking deposited amounts over certain time intervals; we first trim the stored data to only track what is still in-range, and then return the remaining value for each range'; + + +DROP FUNCTION IF EXISTS exchange_statistic_interval_amount_get; +CREATE OR REPLACE FUNCTION exchange_statistic_interval_amount_get ( + IN in_slug TEXT, + IN in_instance_id TEXT +) +RETURNS SETOF exchange_statistic_interval_amount_get_return_value +LANGUAGE plpgsql +AS $$ +DECLARE + my_time INT8 DEFAULT ROUND(EXTRACT(epoch FROM CURRENT_TIMESTAMP(0)::TIMESTAMP) * 1000000)::INT8 / 1000 / 1000; + my_ranges INT8[]; + my_range INT8; + my_delta_value INT8; + my_delta_frac INT8; + my_meta INT8; + my_instance_id INT8; + my_next_max_serial INT8; + my_currency TEXT; + my_rec RECORD; + my_irec RECORD; + my_jrec RECORD; + my_i INT; + my_min_serial INT8 DEFAULT NULL; + my_rval exchange_statistic_interval_amount_get_return_value; +BEGIN + SELECT exchange_serial + INTO my_instance_id + FROM exchange_instances + WHERE exchange_id=in_instance_id; + IF NOT FOUND + THEN + RETURN; + END IF; + + SELECT imeta_serial_id + ,ranges + ,precisions + INTO my_rec + FROM exchange_statistic_interval_meta + WHERE slug=in_slug; + IF NOT FOUND + THEN + RETURN; + END IF; + + my_meta = my_rec.imeta_serial_id; + my_ranges = my_rec.ranges; + + FOR my_currency IN + SELECT DISTINCT delta_curr + FROM exchange_statistic_amount_event + WHERE imeta_serial_id = my_meta + LOOP + + my_rval.rvalue.val = 0; + my_rval.rvalue.frac = 0; + my_rval.rvalue.curr = my_currency; + + FOR my_i IN 1..COALESCE(array_length(my_ranges,1),0) + LOOP + my_range = my_ranges[my_i]; + SELECT event_delimiter + ,cumulative_value + ,cumulative_frac + INTO my_irec + FROM exchange_statistic_interval_amount + WHERE imeta_serial_id = my_meta + AND exchange_serial = my_instance_id + AND curr = my_currency + AND range = my_range; + + IF FOUND + THEN + my_min_serial = my_irec.event_delimiter; + my_rval.rvalue.val = (my_rval.rvalue).val + my_irec.cumulative_value + my_irec.cumulative_frac / 100000000; + my_rval.rvalue.frac = (my_rval.rvalue).frac + my_irec.cumulative_frac % 100000000; + IF (my_rval.rvalue).frac > 100000000 + THEN + my_rval.rvalue.frac = (my_rval.rvalue).frac - 100000000; + my_rval.rvalue.val = (my_rval.rvalue).val + 1; + END IF; + + -- Check if we have events that left the applicable range + SELECT SUM(delta_value) AS value_sum + ,SUM(delta_frac) AS frac_sum + INTO my_jrec + FROM exchange_statistic_amount_event + WHERE imeta_serial_id = my_meta + AND exchange_serial = my_instance_id + AND delta_curr = my_currency + AND slot < my_time - my_range + AND aevent_serial_id >= my_min_serial; + + IF FOUND AND my_jrec.value_sum IS NOT NULL + THEN + -- Normalize sum + my_delta_value = my_jrec.value_sum + my_jrec.frac_sum / 100000000; + my_delta_frac = my_jrec.frac_sum % 100000000; + my_rval.rvalue.val = (my_rval.rvalue).val - my_delta_value; + IF ((my_rval.rvalue).frac >= my_delta_frac) + THEN + my_rval.rvalue.frac = (my_rval.rvalue).frac - my_delta_frac; + ELSE + my_rval.rvalue.frac = 100000000 + (my_rval.rvalue).frac - my_delta_frac; + my_rval.rvalue.val = (my_rval.rvalue).val - 1; + END IF; + + -- First find out the next event delimiter value + SELECT aevent_serial_id + INTO my_next_max_serial + FROM exchange_statistic_amount_event + WHERE imeta_serial_id = my_meta + AND exchange_serial = my_instance_id + AND delta_curr = my_currency + AND slot >= my_time - my_range + AND aevent_serial_id >= my_min_serial + ORDER BY slot ASC + LIMIT 1; + IF FOUND + THEN + -- remove expired events from the sum of the current slot + UPDATE exchange_statistic_interval_amount SET + cumulative_value = cumulative_value - my_delta_value + - CASE + WHEN cumulative_frac < my_delta_frac + THEN 1 + ELSE 0 + END, + cumulative_frac = cumulative_frac - my_delta_frac + + CASE + WHEN cumulative_frac < my_delta_frac + THEN 100000000 + ELSE 0 + END, + event_delimiter = my_next_max_serial + WHERE imeta_serial_id = my_meta + AND exchange_serial = my_instance_id + AND curr = my_currency + AND range = my_range; + ELSE + -- actually, slot is now empty, remove it entirely + DELETE FROM exchange_statistic_interval_amount + WHERE imeta_serial_id = my_meta + AND exchange_serial = my_instance_id + AND curr = my_currency + AND range = my_range; + END IF; + IF (my_i < array_length(my_ranges,1)) + THEN + -- carry over all events into the next (larger) slot + UPDATE exchange_statistic_interval_amount AS msia SET + cumulative_value = cumulative_value + my_delta_value + + CASE + WHEN cumulative_frac + my_delta_frac > 100000000 + THEN 1 + ELSE 0 + END, + cumulative_frac = cumulative_frac + my_delta_value + - CASE + WHEN cumulative_frac + my_delta_frac > 100000000 + THEN 100000000 + ELSE 0 + END, + event_delimiter = LEAST (msia.event_delimiter,my_min_serial) + WHERE imeta_serial_id = my_meta + AND exchange_serial = my_instance_id + AND range=my_ranges[my_i+1]; + IF NOT FOUND + THEN + INSERT INTO exchange_statistic_interval_amount + (imeta_serial_id + ,exchange_serial + ,event_delimiter + ,range + ,curr + ,cumulative_value + ,cumulative_frac + ) VALUES ( + my_meta + ,my_instance_id + ,my_min_serial + ,my_ranges[my_i+1] + ,my_currency + ,my_delta_value + ,my_delta_frac); + END IF; + ELSE + -- events are obsolete, delete them + DELETE FROM exchange_statistic_amount_event + WHERE imeta_serial_id = my_meta + AND exchange_serial = my_instance_id + AND slot < my_time - my_range; + END IF; + END IF; + + my_rval.range = my_range; + RETURN NEXT my_rval; + END IF; + END LOOP; -- over my_ranges + END LOOP; -- over my_currency +END $$; + +COMMENT ON FUNCTION exchange_statistic_interval_amount_get + IS 'Returns deposit statistic tracking deposited amounts over certain time intervals; we first trim the stored data to only track what is still in-range, and then return the remaining value; multiple values are returned, one per currency and range'; + + + + + +DROP PROCEDURE IF EXISTS exchange_statistic_counter_gc; +CREATE OR REPLACE PROCEDURE exchange_statistic_counter_gc () +LANGUAGE plpgsql +AS $$ +DECLARE + my_time INT8 DEFAULT ROUND(EXTRACT(epoch FROM CURRENT_TIMESTAMP(0)::TIMESTAMP) * 1000000)::INT8 / 1000 / 1000; + my_instance INT8; + my_instance_name TEXT; + my_rec RECORD; + my_sum RECORD; + my_meta INT8; + my_ranges INT8[]; + my_precisions INT8[]; + my_precision INT4; + my_i INT4; + min_slot INT8; + max_slot INT8; + end_slot INT8; + my_total INT8; +BEGIN + -- GC for all instances + FOR my_instance IN + SELECT DISTINCT exchange_serial + FROM exchange_statistic_counter_event + LOOP + -- Do combination work for all numeric statistic events + FOR my_rec IN + SELECT imeta_serial_id + ,ranges + ,precisions + ,slug + FROM exchange_statistic_interval_meta + LOOP + -- First, we query the current interval statistic to update its counters + SELECT exchange_id + INTO my_instance_name + FROM exchange_instances + WHERE exchange_serial = my_instance; + PERFORM FROM exchange_statistic_interval_number_get (my_rec.slug, my_instance_name); + + my_meta = my_rec.imeta_serial_id; + my_ranges = my_rec.ranges; + my_precisions = my_rec.precisions; + + FOR my_i IN 1..COALESCE(array_length(my_ranges,1),0) + LOOP + my_precision = my_precisions[my_i]; + IF 1 >= my_precision + THEN + -- Cannot coarsen in this case + CONTINUE; + END IF; + + IF 1 = my_i + THEN + min_slot = 0; + ELSE + min_slot = my_ranges[my_i - 1]; + END IF; + end_slot = my_ranges[my_i]; + RAISE NOTICE 'Coarsening from [%,%) at %', my_time - end_slot, my_time - min_slot, my_precision; + + LOOP + EXIT WHEN min_slot >= end_slot; + max_slot = min_slot + my_precision; + SELECT SUM(delta) AS total, + COUNT(*) AS matches, + MIN(nevent_serial_id) AS rep_serial_id + INTO my_sum + FROM exchange_statistic_counter_event + WHERE exchange_serial=my_instance + AND imeta_serial_id=my_meta + AND slot >= my_time - max_slot + AND slot < my_time - min_slot; + + RAISE NOTICE 'Found % entries between [%,%)', my_sum.matches, my_time - max_slot, my_time - min_slot; + -- we only proceed if we had more then one match (optimization) + IF FOUND AND my_sum.matches > 1 + THEN + my_total = my_sum.total; + + RAISE NOTICE 'combining % entries to representative % for slots [%-%)', my_sum.matches, my_sum.rep_serial_id, my_time - max_slot, my_time - min_slot; + + -- combine entries + DELETE FROM exchange_statistic_counter_event + WHERE exchange_serial=my_instance + AND imeta_serial_id=my_meta + AND slot >= my_time - max_slot + AND slot < my_time - min_slot + AND nevent_serial_id > my_sum.rep_serial_id; + -- Now update the representative to the sum + UPDATE exchange_statistic_counter_event SET + delta = my_total + WHERE imeta_serial_id = my_meta + AND exchange_serial = my_instance + AND nevent_serial_id = my_sum.rep_serial_id; + END IF; + min_slot = min_slot + my_precision; + END LOOP; -- min_slot to end_slot by precision loop + END LOOP; -- my_i loop + -- Finally, delete all events beyond the range we care about + + RAISE NOTICE 'deleting entries of %/% before % - % = %', my_instance, my_meta, my_time, my_ranges[array_length(my_ranges,1)], my_time - my_ranges[array_length(my_ranges,1)]; + DELETE FROM exchange_statistic_counter_event + WHERE exchange_serial=my_instance + AND imeta_serial_id=my_meta + AND slot < my_time - my_ranges[array_length(my_ranges,1)]; + END LOOP; -- my_rec loop + END LOOP; -- my_instance loop +END $$; +COMMENT ON PROCEDURE exchange_statistic_counter_gc + IS 'Performs garbage collection and compaction of the exchange_statistic_counter_event table'; + + + +DROP PROCEDURE IF EXISTS exchange_statistic_amount_gc; +CREATE OR REPLACE PROCEDURE exchange_statistic_amount_gc () +LANGUAGE plpgsql +AS $$ +DECLARE + my_time INT8 DEFAULT ROUND(EXTRACT(epoch FROM CURRENT_TIMESTAMP(0)::TIMESTAMP) * 1000000)::INT8 / 1000 / 1000; + my_instance INT8; + my_instance_name TEXT; + my_rec RECORD; + my_sum RECORD; + my_meta INT8; + my_ranges INT8[]; + my_precisions INT8[]; + my_precision INT4; + my_currency TEXT; + my_i INT4; + min_slot INT8; + max_slot INT8; + end_slot INT8; + my_total_val INT8; + my_total_frac INT8; +BEGIN + -- GC for all instances + FOR my_instance IN + SELECT DISTINCT exchange_serial + FROM exchange_statistic_counter_event + LOOP + -- Do combination work for all numeric statistic events + FOR my_rec IN + SELECT imeta_serial_id + ,ranges + ,precisions + ,slug + FROM exchange_statistic_interval_meta + LOOP + + -- First, we query the current interval statistic to update its counters + SELECT exchange_id + INTO my_instance_name + FROM exchange_instances + WHERE exchange_serial = my_instance; + PERFORM FROM exchange_statistic_interval_amount_get (my_rec.slug, my_instance_name); + + my_meta = my_rec.imeta_serial_id; + my_ranges = my_rec.ranges; + my_precisions = my_rec.precisions; + FOR my_currency IN + SELECT DISTINCT delta_curr + FROM exchange_statistic_amount_event + WHERE imeta_serial_id = my_meta + LOOP + + FOR my_i IN 1..COALESCE(array_length(my_ranges,1),0) + LOOP + my_precision = my_precisions[my_i]; + IF 1 >= my_precision + THEN + -- Cannot coarsen in this case + CONTINUE; + END IF; + + IF 1 = my_i + THEN + min_slot = 0; + ELSE + min_slot = my_ranges[my_i - 1]; + END IF; + end_slot = my_ranges[my_i]; + + RAISE NOTICE 'Coarsening from [%,%) at %', my_time - end_slot, my_time - min_slot, my_precision; + LOOP + EXIT WHEN min_slot >= end_slot; + max_slot = min_slot + my_precision; + SELECT SUM(delta_value) AS total_val, + SUM(delta_frac) AS total_frac, + COUNT(*) AS matches, + MIN(aevent_serial_id) AS rep_serial_id + INTO my_sum + FROM exchange_statistic_amount_event + WHERE imeta_serial_id=my_meta + AND exchange_serial=my_instance + AND delta_curr = my_currency + AND slot >= my_time - max_slot + AND slot < my_time - max_slot; + -- we only proceed if we had more then one match (optimization) + IF FOUND AND my_sum.matches > 1 + THEN + -- normalize new total + my_total_frac = my_sum.total_frac % 100000000; + my_total_val = my_sum.total_val + my_sum.total_frac / 100000000; + + -- combine entries + DELETE FROM exchange_statistic_amount_event + WHERE imeta_serial_id=my_meta + AND exchange_serial=my_instance + AND delta_curr = my_currency + AND slot >= my_time - max_slot + AND slot < my_time - max_slot + AND aevent_serial_id > my_sum.rep_serial_id; + -- Now update the representative to the sum + UPDATE exchange_statistic_amount_event SET + delta_value = my_total_value + ,delta_frac = my_total_frac + WHERE imeta_serial_id = my_meta + AND exchange_serial = my_instance + AND delta_curr = my_currency + AND aevent_serial_id = my_sum.rep_serial_id; + END IF; + min_slot = min_slot + my_precision; + END LOOP; -- min_slot to end_slot by precision loop + END LOOP; -- my_i loop + END LOOP; -- my_currency loop + -- Finally, delete all events beyond the range we care about + + RAISE NOTICE 'deleting entries of %/% before % - % = %', my_instance, my_meta, my_time, my_ranges[array_length(my_ranges,1)], my_time - my_ranges[array_length(my_ranges,1)]; + DELETE FROM exchange_statistic_amount_event + WHERE exchange_serial=my_instance + AND imeta_serial_id=my_meta + AND slot < my_time - my_ranges[array_length(my_ranges,1)]; + END LOOP; -- my_rec loop + END LOOP; -- my_instance loop +END $$; +COMMENT ON PROCEDURE exchange_statistic_amount_gc + IS 'Performs garbage collection and compaction of the exchange_statistic_amount_event table'; + + + +DROP PROCEDURE IF EXISTS exchange_statistic_bucket_gc; +CREATE OR REPLACE PROCEDURE exchange_statistic_bucket_gc () +LANGUAGE plpgsql +AS $$ +DECLARE + my_rec RECORD; + my_range TEXT; + my_now INT8; + my_end INT8; +BEGIN + my_now = EXTRACT(EPOCH FROM CURRENT_TIMESTAMP(0)::TIMESTAMP); -- seconds since epoch + FOR my_rec IN + SELECT bmeta_serial_id + ,stype + ,ranges[array_length(ranges,1)] AS range + ,ages[array_length(ages,1)] AS age + FROM exchange_statistic_bucket_meta + LOOP + my_range = '1 ' || my_rec.range::TEXT; + my_end = my_now - my_rec.age * EXTRACT(SECONDS FROM (SELECT my_range::INTERVAL)); -- age is given in multiples of the range (in seconds) + IF my_rec.stype = 'amount' + THEN + DELETE + FROM exchange_statistic_bucket_amount + WHERE bmeta_serial_id = my_rec.bmeta_serial_id + AND bucket_start >= my_end; + ELSE + DELETE + FROM exchange_statistic_bucket_counter + WHERE bmeta_serial_id = my_rec.bmeta_serial_id + AND bucket_start >= my_end; + END IF; + END LOOP; +END $$; +COMMENT ON PROCEDURE exchange_statistic_bucket_gc + IS 'Performs garbage collection of the exchange_statistic_bucket_counter and exchange_statistic_bucket_amount tables'; + +