commit 2c5447ad30978dae029396722fb343916a1b6b9d
parent 7488381babb7ceee14a89b01fee748e9c4ab4a90
Author: Christian Grothoff <christian@grothoff.org>
Date: Fri, 26 Dec 2025 15:13:08 +0100
implement lookup_reports_pending
Diffstat:
3 files changed, 273 insertions(+), 0 deletions(-)
diff --git a/src/backenddb/pg_lookup_reports_pending.c b/src/backenddb/pg_lookup_reports_pending.c
@@ -25,9 +25,151 @@
#include "pg_lookup_reports_pending.h"
#include "pg_helper.h"
+
+/**
+ * Context used for TMH_PG_lookup_reports_pending().
+ */
+struct SelectReportsContext
+{
+ /**
+ * Function to call with the results.
+ */
+ TALER_MERCHANTDB_ReportsPendingCallback cb;
+
+ /**
+ * Closure for @a cb.
+ */
+ void *cb_cls;
+
+ /**
+ * Did database result extraction fail?
+ */
+ bool extract_failed;
+};
+
+
+/**
+ * Function to be called with the results of a SELECT statement
+ * that has returned @a num_results results about reports.
+ *
+ * @param[in,out] cls of type `struct SelectReportsContext *`
+ * @param result the postgres result
+ * @param num_results the number of results in @a result
+ */
+static void
+select_pending_reports_cb (void *cls,
+ PGresult *result,
+ unsigned int num_results)
+{
+ struct SelectReportsContext *plc = cls;
+
+ for (unsigned int i = 0; i < num_results; i++)
+ {
+ char *instance_id;
+ uint64_t report_serial;
+ char *report_program_section;
+ char *report_description;
+ char *mime_type;
+ char *data_source;
+ char *target_address;
+ struct GNUNET_TIME_Relative frequency;
+ struct GNUNET_TIME_Relative frequency_shift;
+ struct GNUNET_PQ_ResultSpec rs[] = {
+ GNUNET_PQ_result_spec_string ("merchant_id",
+ &instance_id),
+ GNUNET_PQ_result_spec_uint64 ("report_serial",
+ &report_serial),
+ GNUNET_PQ_result_spec_string ("report_program_section",
+ &report_program_section),
+ GNUNET_PQ_result_spec_string ("report_description",
+ &report_description),
+ GNUNET_PQ_result_spec_string ("mime_type",
+ &mime_type),
+ GNUNET_PQ_result_spec_string ("data_source",
+ &data_source),
+ GNUNET_PQ_result_spec_string ("target_address",
+ &target_address),
+ GNUNET_PQ_result_spec_relative_time ("frequency",
+ &frequency),
+ GNUNET_PQ_result_spec_relative_time ("frequency_shift",
+ &frequency_shift),
+ GNUNET_PQ_result_spec_end
+ };
+
+ if (GNUNET_OK !=
+ GNUNET_PQ_extract_result (result,
+ rs,
+ i))
+ {
+ GNUNET_break (0);
+ plc->extract_failed = true;
+ return;
+ }
+ plc->cb (plc->cb_cls,
+ instance_id,
+ report_serial,
+ report_program_section,
+ report_description,
+ mime_type,
+ data_source,
+ target_address,
+ frequency,
+ frequency_shift);
+ GNUNET_PQ_cleanup_result (rs);
+ }
+}
+
+
enum GNUNET_DB_QueryStatus
TMH_PG_lookup_reports_pending (void *cls,
TALER_MERCHANTDB_ReportsPendingCallback cb,
void *cb_cls)
{
+ struct PostgresClosure *pg = cls;
+ struct SelectReportsContext plc = {
+ .cb = cb,
+ .cb_cls = cb_cls,
+ /* Can be overwritten by the lookup_reports_cb */
+ .extract_failed = false,
+ };
+ struct GNUNET_TIME_Absolute now
+ = GNUNET_TIME_absolute_get ();
+ struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_absolute_time (&now),
+ GNUNET_PQ_query_param_end
+ };
+ enum GNUNET_DB_QueryStatus qs;
+
+ check_connection (pg);
+ PREPARE (pg,
+ "lookup_reports_pending",
+ "SELECT"
+ " mi.merchant_id"
+ " ,mr.report_serial"
+ " ,mr.report_program_section"
+ " ,mr.report_description"
+ " ,mr.mime_type"
+ " ,mr.data_source"
+ " ,mr.target_address"
+ " ,mr.frequency"
+ " ,mr.frequency_shift"
+ " FROM merchant_reports mr"
+ " JOIN merchant_instances mi"
+ " USING (merchant_serial)"
+ " WHERE next_transmission <= $1"
+ " ORDER BY next_transmission ASC"
+ " LIMIT 1000");
+ qs = GNUNET_PQ_eval_prepared_multi_select (
+ pg->conn,
+ "lookup_reports_pending",
+ params,
+ &select_pending_reports_cb,
+ &plc);
+ /* If there was an error inside select_pending_reports_cb, return a hard error. */
+ if (plc.extract_failed)
+ {
+ GNUNET_break (0);
+ return GNUNET_DB_STATUS_HARD_ERROR;
+ }
+ return qs;
}
diff --git a/src/backenddb/pg_select_reports.c b/src/backenddb/pg_select_reports.c
@@ -25,6 +25,77 @@
#include "pg_select_reports.h"
#include "pg_helper.h"
+
+/**
+ * Context used for TMH_PG_select_reports().
+ */
+struct SelectReportsContext
+{
+ /**
+ * Function to call with the results.
+ */
+ TALER_MERCHANTDB_ReportsCallback cb;
+
+ /**
+ * Closure for @a cb.
+ */
+ void *cb_cls;
+
+ /**
+ * Did database result extraction fail?
+ */
+ bool extract_failed;
+};
+
+
+/**
+ * Function to be called with the results of a SELECT statement
+ * that has returned @a num_results results about reports.
+ *
+ * @param[in,out] cls of type `struct SelectReportsContext *`
+ * @param result the postgres result
+ * @param num_results the number of results in @a result
+ */
+static void
+select_reports_cb (void *cls,
+ PGresult *result,
+ unsigned int num_results)
+{
+ struct SelectReportsContext *plc = cls;
+
+ for (unsigned int i = 0; i < num_results; i++)
+ {
+ uint64_t report_serial;
+ char *report_description;
+ struct GNUNET_TIME_Relative frequency;
+ struct GNUNET_PQ_ResultSpec rs[] = {
+ GNUNET_PQ_result_spec_uint64 ("report_serial",
+ &report_serial),
+ GNUNET_PQ_result_spec_string ("report_description",
+ &report_description),
+ GNUNET_PQ_result_spec_relative_time ("frequency",
+ &frequency),
+ GNUNET_PQ_result_spec_end
+ };
+
+ if (GNUNET_OK !=
+ GNUNET_PQ_extract_result (result,
+ rs,
+ i))
+ {
+ GNUNET_break (0);
+ plc->extract_failed = true;
+ return;
+ }
+ plc->cb (plc->cb_cls,
+ report_serial,
+ report_description,
+ frequency);
+ GNUNET_PQ_cleanup_result (rs);
+ }
+}
+
+
enum GNUNET_DB_QueryStatus
TMH_PG_select_reports (void *cls,
const char *instance_id,
@@ -33,4 +104,62 @@ TMH_PG_select_reports (void *cls,
TALER_MERCHANTDB_ReportsCallback cb,
void *cb_cls)
{
+ struct PostgresClosure *pg = cls;
+ uint64_t plimit = (uint64_t) ((limit < 0) ? -limit : limit);
+ struct SelectReportsContext plc = {
+ .cb = cb,
+ .cb_cls = cb_cls,
+ /* Can be overwritten by the select_reports_cb */
+ .extract_failed = false,
+ };
+ struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_string (instance_id),
+ GNUNET_PQ_query_param_uint64 (&offset),
+ GNUNET_PQ_query_param_uint64 (&plimit),
+ GNUNET_PQ_query_param_end
+ };
+ enum GNUNET_DB_QueryStatus qs;
+
+ check_connection (pg);
+ PREPARE (pg,
+ "select_reports_asc",
+ "SELECT"
+ " report_serial"
+ " ,report_description"
+ " ,frequency"
+ " FROM merchant_reports"
+ " JOIN merchant_instances"
+ " USING (merchant_serial)"
+ " WHERE merchant_instances.merchant_id=$1"
+ " AND report_serial > $2"
+ " ORDER BY report_serial ASC"
+ " LIMIT $3");
+ PREPARE (pg,
+ "select_reports_desc",
+ "SELECT"
+ " report_serial"
+ " ,report_description"
+ " ,frequency"
+ " FROM merchant_reports"
+ " JOIN merchant_instances"
+ " USING (merchant_serial)"
+ " WHERE merchant_instances.merchant_id=$1"
+ " AND report_serial < $2"
+ " ORDER BY report_serial DESC"
+ " LIMIT $3");
+ qs = GNUNET_PQ_eval_prepared_multi_select (
+ pg->conn,
+ (limit > 0)
+ ? "select_reports_asc"
+ : "select_reports_desc",
+ params,
+ &select_reports_cb,
+ &plc);
+ /* If there was an error inside select_reports_cb, return a hard error. */
+ if (plc.extract_failed)
+ {
+ GNUNET_break (0);
+ return GNUNET_DB_STATUS_HARD_ERROR;
+ }
+ return qs;
}
diff --git a/src/include/taler_merchantdb_plugin.h b/src/include/taler_merchantdb_plugin.h
@@ -1369,6 +1369,7 @@ typedef void
* Typically called by `lookup_reports_pending`.
*
* @param cls closure
+ * @param instance_id name of the instance
* @param report_id serial number of the report
* @param report_program_section configuration section of program
* for report generation
@@ -1384,6 +1385,7 @@ typedef void
typedef void
(*TALER_MERCHANTDB_ReportsPendingCallback)(
void *cls,
+ const char *instance_id,
uint64_t report_id,
const char *report_program_section,
const char *report_description,