/*
  This file is part of TALER
  Copyright (C) 2014-2024 Taler Systems SA

  TALER is free software; you can redistribute it and/or modify it under the
  terms of the GNU General Public License as published by the Free Software
  Foundation; either version 3, or (at your option) any later version.

  TALER is distributed in the hope that it will be useful, but WITHOUT ANY
  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
  A PARTICULAR PURPOSE.  See the GNU General Public License for more details.

  You should have received a copy of the GNU General Public License along with
  TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
 */
/**
 * @file plugin_auditordb_postgres.c
 * @brief Low-level (statement-level) Postgres database access for the auditor
 * @author Christian Grothoff
 * @author Gabor X Toth
 */
#include "platform.h"
#include "taler_pq_lib.h"
#include <pthread.h>
#include <libpq-fe.h>
#include "pg_delete_deposit_confirmations.h"
#include "pg_delete_pending_deposit.h"
#include "pg_delete_purse_info.h"
#include "pg_del_denomination_balance.h"
#include "pg_del_reserve_info.h"
#include "pg_get_auditor_progress.h"
#include "pg_get_balance.h"
#include "pg_get_denomination_balance.h"
#include "pg_get_deposit_confirmations.h"
#include "pg_get_purse_info.h"
#include "pg_get_reserve_info.h"
#include "pg_get_wire_fee_summary.h"
#include "pg_helper.h"
#include "pg_insert_auditor_progress.h"
#include "pg_insert_balance.h"
#include "pg_insert_denomination_balance.h"
#include "pg_insert_deposit_confirmation.h"
#include "pg_insert_exchange_signkey.h"
#include "pg_insert_historic_denom_revenue.h"
#include "pg_insert_historic_reserve_revenue.h"
#include "pg_insert_pending_deposit.h"
#include "pg_insert_purse_info.h"
#include "pg_insert_reserve_info.h"
#include "pg_select_historic_denom_revenue.h"
#include "pg_select_historic_reserve_revenue.h"
#include "pg_select_pending_deposits.h"
#include "pg_select_purse_expired.h"
#include "pg_update_auditor_progress.h"
#include "pg_update_balance.h"
#include "pg_update_denomination_balance.h"
#include "pg_update_purse_info.h"
#include "pg_update_reserve_info.h"
#include "pg_update_wire_fee_summary.h"


/**
 * Log a message of the given @a kind under the
 * "taler-auditordb-postgres" component name.
 */
#define LOG(kind,...) GNUNET_log_from (kind, "taler-auditordb-postgres", \
                                       __VA_ARGS__)


/**
 * Drop all auditor tables OR deletes recoverable auditor state.
 * This should only be used by testcases or when restarting the
 * auditor from scratch.
 *
 * Opens a dedicated database connection, runs either the "drop" or
 * the "restart" SQL script on it and closes the connection again.
 *
 * @param cls the `struct PostgresClosure` with the plugin-specific state
 * @param drop_exchangelist drop all tables, including schema versioning
 *        and the exchange and deposit_confirmations table; NOT to be
 *        used when restarting the auditor
 * @return #GNUNET_OK upon success; #GNUNET_SYSERR upon failure
 */
static enum GNUNET_GenericReturnValue
postgres_drop_tables (void *cls,
                      bool drop_exchangelist)
{
  struct PostgresClosure *pc = cls;
  struct GNUNET_PQ_Context *conn;
  enum GNUNET_GenericReturnValue ret;

  conn = GNUNET_PQ_connect_with_cfg (pc->cfg,
                                     "auditordb-postgres",
                                     NULL,
                                     NULL,
                                     NULL);
  if (NULL == conn)
    return GNUNET_SYSERR;
  ret = GNUNET_PQ_exec_sql (conn,
                            (drop_exchangelist) ? "drop" : "restart");
  GNUNET_PQ_disconnect (conn);
  return ret;
}


/**
 * Create the necessary tables if they are not present
 *
 * Opens a dedicated database connection, invokes the
 * "auditor.do_create_tables" SQL function and, if that worked,
 * runs the "procedures" SQL script.
 *
 * @param cls the `struct PostgresClosure` with the plugin-specific state
 * @param support_partitions true to support partitioning
 * @param num_partitions number of partitions to use
 * @return #GNUNET_OK upon success; #GNUNET_SYSERR upon failure
 */
static enum GNUNET_GenericReturnValue
postgres_create_tables (void *cls,
                        bool support_partitions,
                        uint32_t num_partitions)
{
  struct PostgresClosure *pc = cls;
  enum GNUNET_GenericReturnValue ret = GNUNET_OK;
  struct GNUNET_PQ_Context *conn;
  struct GNUNET_PQ_QueryParam params[] = {
    /* SQL NULL is passed when partitioning is not requested */
    support_partitions
    ? GNUNET_PQ_query_param_uint32 (&num_partitions)
    : GNUNET_PQ_query_param_null (),
    GNUNET_PQ_query_param_end
  };
  struct GNUNET_PQ_PreparedStatement ps[] = {
    GNUNET_PQ_make_prepare ("create_tables",
                            "SELECT"
                            " auditor.do_create_tables"
                            " ($1);"),
    GNUNET_PQ_PREPARED_STATEMENT_END
  };
  struct GNUNET_PQ_ExecuteStatement es[] = {
    GNUNET_PQ_make_try_execute ("SET search_path TO auditor;"),
    GNUNET_PQ_EXECUTE_STATEMENT_END
  };

  conn = GNUNET_PQ_connect_with_cfg (pc->cfg,
                                     "auditordb-postgres",
                                     "auditor-",
                                     es,
                                     ps);
  if (NULL == conn)
  {
    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                "Failed to connect to database\n");
    return GNUNET_SYSERR;
  }
  if (0 >
      GNUNET_PQ_eval_prepared_non_select (conn,
                                          "create_tables",
                                          params))
  {
    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                "Failed to run 'create_tables' prepared statement\n");
    ret = GNUNET_SYSERR;
  }
  if (GNUNET_OK == ret)
  {
    ret = GNUNET_PQ_exec_sql (conn,
                              "procedures");
    if (GNUNET_OK != ret)
      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                  "Failed to load stored procedures\n");
  }
  GNUNET_PQ_disconnect (conn);
  return ret;
}


/**
 * Register callback to be invoked on events of type @a es.
 *
 * @param cls database context to use
 * @param es specification of the event to listen for
 * @param timeout how long to wait for the event
 * @param cb function to call when the event happens, possibly
 *         multiple times (until cancel is invoked)
 * @param cb_cls closure for @a cb
 * @return handle useful to cancel the listener
 */
static struct GNUNET_DB_EventHandler *
postgres_event_listen (void *cls,
                       const struct GNUNET_DB_EventHeaderP *es,
                       struct GNUNET_TIME_Relative timeout,
                       GNUNET_DB_EventCallback cb,
                       void *cb_cls)
{
  struct PostgresClosure *pg = cls;

  return GNUNET_PQ_event_listen (pg->conn,
                                 es,
                                 timeout,
                                 cb,
                                 cb_cls);
}


/**
 * Stop notifications.
 *
 * @param eh handle to unregister.
 */
static void
postgres_event_listen_cancel (struct GNUNET_DB_EventHandler *eh)
{
  GNUNET_PQ_event_listen_cancel (eh);
}


/**
 * Notify all that listen on @a es of an event.
 *
 * @param cls database context to use
 * @param es specification of the event to generate
 * @param extra additional event data provided
 * @param extra_size number of bytes in @a extra
 */
static void
postgres_event_notify (void *cls,
                       const struct GNUNET_DB_EventHeaderP *es,
                       const void *extra,
                       size_t extra_size)
{
  struct PostgresClosure *pg = cls;

  /* Plain call: a `return` with an expression is not permitted
     in a function returning void (ISO C constraint). */
  GNUNET_PQ_event_notify (pg->conn,
                          es,
                          extra,
                          extra_size);
}


/**
 * Connect to the db if the connection does not exist yet.
 * If a connection already exists, it is re-established in
 * case it went down.
 *
 * @param[in,out] pg the plugin-specific state
 * @return #GNUNET_OK on success
 */
static enum GNUNET_GenericReturnValue
setup_connection (struct PostgresClosure *pg)
{
  struct GNUNET_PQ_ExecuteStatement es[] = {
    GNUNET_PQ_make_try_execute ("SET search_path TO auditor;"),
    GNUNET_PQ_EXECUTE_STATEMENT_END
  };
  struct GNUNET_PQ_Context *db_conn;

  if (NULL != pg->conn)
  {
    GNUNET_PQ_reconnect_if_down (pg->conn);
    return GNUNET_OK;
  }
  db_conn = GNUNET_PQ_connect_with_cfg (pg->cfg,
                                        "auditordb-postgres",
                                        NULL,
                                        es,
                                        NULL);
  if (NULL == db_conn)
    return GNUNET_SYSERR;
  pg->conn = db_conn;
  /* new connection: bump the generation counter
     (presumably consumed by the PREPARE logic in pg_helper.h
     to re-prepare statements -- TODO confirm) */
  pg->prep_gen++;
  return GNUNET_OK;
}


/**
 * Do a pre-flight check that we are not in an uncommitted transaction.
 * If we are, rollback the previous transaction and output a warning.
 * Also establishes the database connection if there is none yet.
 *
 * @param cls the `struct PostgresClosure` with the plugin-specific state
 * @return #GNUNET_OK on success,
 *         #GNUNET_NO if we rolled back an earlier transaction
 *         #GNUNET_SYSERR if we have no DB connection
 */
static enum GNUNET_GenericReturnValue
postgres_preflight (void *cls)
{
  struct PostgresClosure *pg = cls;
  struct GNUNET_PQ_ExecuteStatement es[] = {
    GNUNET_PQ_make_execute ("ROLLBACK"),
    GNUNET_PQ_EXECUTE_STATEMENT_END
  };

  if (NULL == pg->conn)
  {
    if (GNUNET_OK !=
        setup_connection (pg))
    {
      GNUNET_break (0);
      return GNUNET_SYSERR;
    }
  }
  if (NULL == pg->transaction_name)
    return GNUNET_OK; /* all good */
  if (GNUNET_OK ==
      GNUNET_PQ_exec_statements (pg->conn,
                                 es))
  {
    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                "BUG: Preflight check rolled back transaction `%s'!\n",
                pg->transaction_name);
  }
  else
  {
    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                "BUG: Preflight check failed to rollback transaction `%s'!\n",
                pg->transaction_name);
  }
  /* either way, forget about the stale transaction */
  pg->transaction_name = NULL;
  return GNUNET_NO;
}


/**
 * Start a transaction.
 *
 * Runs the pre-flight check first (which also sets up the database
 * connection if needed) and then opens a SERIALIZABLE transaction.
 *
 * @param cls the `struct PostgresClosure` with the plugin-specific state
 * @return #GNUNET_OK on success,
 *         #GNUNET_SYSERR if there is no database connection or
 *         the transaction could not be started
 */
static enum GNUNET_GenericReturnValue
postgres_start (void *cls)
{
  struct PostgresClosure *pg = cls;
  struct GNUNET_PQ_ExecuteStatement es[] = {
    GNUNET_PQ_make_execute ("START TRANSACTION ISOLATION LEVEL SERIALIZABLE"),
    GNUNET_PQ_EXECUTE_STATEMENT_END
  };

  /* #GNUNET_NO (stale transaction rolled back) is fine to continue;
     #GNUNET_SYSERR means pg->conn is still NULL and must not be used. */
  if (GNUNET_SYSERR ==
      postgres_preflight (cls))
    return GNUNET_SYSERR;
  if (GNUNET_OK !=
      GNUNET_PQ_exec_statements (pg->conn,
                                 es))
  {
    TALER_LOG_ERROR ("Failed to start transaction\n");
    GNUNET_break (0);
    return GNUNET_SYSERR;
  }
  return GNUNET_OK;
}


/**
 * Roll back the current transaction of a database connection.
 * A failure to roll back is reported via #GNUNET_break() only.
 *
 * @param cls the `struct PostgresClosure` with the plugin-specific state
 */
static void
postgres_rollback (void *cls)
{
  struct PostgresClosure *pg = cls;
  struct GNUNET_PQ_ExecuteStatement es[] = {
    GNUNET_PQ_make_execute ("ROLLBACK"),
    GNUNET_PQ_EXECUTE_STATEMENT_END
  };

  GNUNET_break (GNUNET_OK ==
                GNUNET_PQ_exec_statements (pg->conn,
                                           es));
}


/**
 * Commit the current transaction of a database connection.
 *
 * NOTE(review): unlike the other plugin callbacks in this file this
 * function is not declared `static`; linkage is left unchanged in
 * case it is referenced from another translation unit -- confirm.
 *
 * @param cls the `struct PostgresClosure` with the plugin-specific state
 * @return transaction status code
 */
enum GNUNET_DB_QueryStatus
postgres_commit (void *cls)
{
  struct PostgresClosure *pg = cls;
  struct GNUNET_PQ_QueryParam params[] = {
    GNUNET_PQ_query_param_end
  };

  PREPARE (pg,
           "do_commit",
           "COMMIT");
  return GNUNET_PQ_eval_prepared_non_select (pg->conn,
                                             "do_commit",
                                             params);
}


/**
 * Function called to perform "garbage collection" on the
 * database, expiring records we no longer require.
 *
 * NOTE(review): garbage collection is not implemented (#4960).
 * The list of prepared statements passed to the fresh connection
 * is empty, so "gc_auditor" is never prepared here; the evaluation
 * below is therefore expected to fail -- confirm before relying on
 * the return value.
 *
 * @param cls closure
 * @return #GNUNET_OK on success,
 *         #GNUNET_SYSERR on DB errors
 */
static enum GNUNET_GenericReturnValue
postgres_gc (void *cls)
{
  struct PostgresClosure *pg = cls;
  struct GNUNET_TIME_Absolute now = {0};
  struct GNUNET_PQ_QueryParam params_time[] = {
    GNUNET_PQ_query_param_absolute_time (&now),
    GNUNET_PQ_query_param_end
  };
  struct GNUNET_PQ_Context *conn;
  enum GNUNET_DB_QueryStatus qs;
  struct GNUNET_PQ_PreparedStatement ps[] = {
    GNUNET_PQ_PREPARED_STATEMENT_END
  };
  struct GNUNET_PQ_ExecuteStatement es[] = {
    GNUNET_PQ_make_try_execute ("SET search_path TO auditor;"),
    GNUNET_PQ_EXECUTE_STATEMENT_END
  };

  /* params_time refers to 'now' by address, so set it before use */
  now = GNUNET_TIME_absolute_get ();
  conn = GNUNET_PQ_connect_with_cfg (pg->cfg,
                                     "auditordb-postgres",
                                     NULL,
                                     es,
                                     ps);
  if (NULL == conn)
    return GNUNET_SYSERR;
  GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
              "TODO: Auditor GC not implemented (#4960)\n");
  qs = GNUNET_PQ_eval_prepared_non_select (conn,
                                           "gc_auditor",
                                           params_time);
  GNUNET_PQ_disconnect (conn);
  if (0 > qs)
  {
    GNUNET_break (0);
    return GNUNET_SYSERR;
  }
  return GNUNET_OK;
}


/**
 * Initialize Postgres database subsystem.
 *
 * Allocates the plugin-specific state, reads the currency from the
 * configuration and fills in the plugin's function table.  No
 * database connection is opened here.
 *
 * @param cls a configuration instance
 * @return NULL on error, otherwise a `struct TALER_AUDITORDB_Plugin`
 */
void *
libtaler_plugin_auditordb_postgres_init (void *cls)
{
  const struct GNUNET_CONFIGURATION_Handle *cfg = cls;
  struct PostgresClosure *pg;
  struct TALER_AUDITORDB_Plugin *plugin;

  pg = GNUNET_new (struct PostgresClosure);
  pg->cfg = cfg;
  if (GNUNET_OK !=
      TALER_config_get_currency (cfg,
                                 &pg->currency))
  {
    GNUNET_free (pg);
    return NULL;
  }
  plugin = GNUNET_new (struct TALER_AUDITORDB_Plugin);
  plugin->cls = pg;

  /* callbacks implemented in this file */
  plugin->preflight = &postgres_preflight;
  plugin->drop_tables = &postgres_drop_tables;
  plugin->create_tables = &postgres_create_tables;
  plugin->event_listen = &postgres_event_listen;
  plugin->event_listen_cancel = &postgres_event_listen_cancel;
  plugin->event_notify = &postgres_event_notify;
  plugin->start = &postgres_start;
  plugin->commit = &postgres_commit;
  plugin->rollback = &postgres_rollback;
  plugin->gc = &postgres_gc;

  /* callbacks implemented in the per-statement pg_*.c files */
  plugin->get_auditor_progress
    = &TAH_PG_get_auditor_progress;
  plugin->get_balance
    = &TAH_PG_get_balance;
  plugin->insert_auditor_progress
    = &TAH_PG_insert_auditor_progress;
  plugin->insert_balance
    = &TAH_PG_insert_balance;
  plugin->update_auditor_progress
    = &TAH_PG_update_auditor_progress;
  plugin->update_balance
    = &TAH_PG_update_balance;

  plugin->insert_exchange_signkey
    = &TAH_PG_insert_exchange_signkey;
  plugin->insert_deposit_confirmation
    = &TAH_PG_insert_deposit_confirmation;
  plugin->get_deposit_confirmations
    = &TAH_PG_get_deposit_confirmations;
  plugin->delete_deposit_confirmation
    = &TAH_PG_delete_deposit_confirmation;

  plugin->insert_reserve_info
    = &TAH_PG_insert_reserve_info;
  plugin->update_reserve_info
    = &TAH_PG_update_reserve_info;
  plugin->get_reserve_info
    = &TAH_PG_get_reserve_info;
  plugin->del_reserve_info
    = &TAH_PG_del_reserve_info;

  plugin->insert_pending_deposit
    = &TAH_PG_insert_pending_deposit;
  plugin->select_pending_deposits
    = &TAH_PG_select_pending_deposits;
  plugin->delete_pending_deposit
    = &TAH_PG_delete_pending_deposit;

  plugin->insert_purse_info
    = &TAH_PG_insert_purse_info;
  plugin->update_purse_info
    = &TAH_PG_update_purse_info;
  plugin->get_purse_info
    = &TAH_PG_get_purse_info;
  plugin->delete_purse_info
    = &TAH_PG_delete_purse_info;
  plugin->select_purse_expired
    = &TAH_PG_select_purse_expired;

  plugin->insert_denomination_balance
    = &TAH_PG_insert_denomination_balance;
  plugin->update_denomination_balance
    = &TAH_PG_update_denomination_balance;
  plugin->del_denomination_balance
    = &TAH_PG_del_denomination_balance;
  plugin->get_denomination_balance
    = &TAH_PG_get_denomination_balance;

  plugin->insert_historic_denom_revenue
    = &TAH_PG_insert_historic_denom_revenue;
  plugin->select_historic_denom_revenue
    = &TAH_PG_select_historic_denom_revenue;
  plugin->insert_historic_reserve_revenue
    = &TAH_PG_insert_historic_reserve_revenue;
  plugin->select_historic_reserve_revenue
    = &TAH_PG_select_historic_reserve_revenue;
  return plugin;
}


/**
 * Shutdown Postgres database subsystem.
 *
 * Closes the database connection (if any) and releases the
 * plugin-specific state as well as the plugin itself.
 *
 * @param cls a `struct TALER_AUDITORDB_Plugin`
 * @return NULL (always)
 */
void *
libtaler_plugin_auditordb_postgres_done (void *cls)
{
  struct TALER_AUDITORDB_Plugin *plugin = cls;
  struct PostgresClosure *pg = plugin->cls;

  if (NULL != pg->conn)
    GNUNET_PQ_disconnect (pg->conn);
  GNUNET_free (pg->currency);
  GNUNET_free (pg);
  GNUNET_free (plugin);
  return NULL;
}


/* end of plugin_auditordb_postgres.c */