summaryrefslogtreecommitdiff
path: root/src/exchangedb/spi/pg_aggregate.c
blob: 721f247c7907490ea01b3f87b65ebdf14541eae4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
#include "postgres.h"
#include "fmgr.h"
#include "utils/numeric.h"
#include "utils/builtins.h"
#include "executor/spi.h"

PG_MODULE_MAGIC;

PG_FUNCTION_INFO_V1 (get_deposit_summary);

Datum
get_deposit_summary (PG_FUNCTION_ARGS)
{

  static SPIPlanPtr deposit_plan;
  static SPIPlanPtr refund_plan;
  static SPIPlanPtr refund_by_coin_plan;
  static SPIPlanPtr norm_refund_by_coin_plan;
  static SPIPlanPtr fully_refunded_by_coins_plan;
  static SPIPlanPtr fees_plan;

  int shard = PG_GETARG_INT32 (0);
  char *sql;
  char *merchant_pub = text_to_cstring (PG_GETARG_TEXT_P (1));
  char *wire_target_h_payto = text_to_cstring (PG_GETARG_TEXT_P (2));
  char *wtid_raw = text_to_cstring (PG_GETARG_TEXT_P (3));
  int refund_deadline = PG_GETARG_INT32 (4);
  int conn = SPI_connect ();
  if (conn != SPI_OK_CONNECT)
  {
    elog (ERROR, "DB connection failed ! \n");
  }

  if (deposit_plan == NULL
      || refund_plan == NULL
      || refund_by_coin_plan == NULL
      || norm_refund_by_coin_plan = NULL
                                    || fully_refunded_coins_plan = NULL
                                                                   || fees_plan
                                                                   == NULL)
  {
    if (deposit_plan == NULL)
    {
      int nargs = 3;
      Oid argtypes[3];
      argtypes[0] = INT8OID;
      argtypes[1] = BYTEAOID;
      argtypes[2] = BYTEAOID;
      const char *dep_sql =
        "    UPDATE deposits"
        "    SET done=TRUE"
        "    WHERE NOT (done OR policy_blocked)"
        "        AND refund_deadline < $1"
        "        AND merchant_pub = $2"
        "        AND wire_target_h_payto = $3"
        "    RETURNING"
        "        deposit_serial_id"
        "        ,coin_pub"
        "        ,amount_with_fee_val AS amount_val"
        "        ,amount_with_fee_frac AS amount_frac";
      SPIPlanPtr new_plan =
        SPI_prepare (dep_sql, 4, argtypes);
      if (new_plan == NULL)
      {
        elog (ERROR, "SPI_prepare for deposit failed ! \n");
      }
      deposit_plan = SPI_saveplan (new_plan);
      if (deposit_plan == NULL)
      {
        elog (ERROR, "SPI_saveplan for deposit failed ! \n");
      }
    }

    Datum values[4];
    values[0] = Int64GetDatum (refund_deadline);
    values[1] = CStringGetDatum (merchant_pub);
    values[2] = CStringGetDatum (wire_target_h_payto);
    int ret = SPI_execute_plan (deposit_plan,
                                values,
                                NULL,
                                true,
                                0);
    if (ret != SPI_OK_UPDATE)
    {
      elog (ERROR, "Failed to execute subquery 1\n");
    }
    uint64_t *dep_deposit_serial_ids = palloc (sizeof(uint64_t)
                                               * SPI_processed);
    BYTEA **dep_coin_pubs = palloc (sizeof(BYTEA *) * SPI_processed);
    uint64_t *dep_amount_vals = palloc (sizeof(uint64_t) * SPI_processed);
    uint32_t *dep_amount_fracs = palloc (sizeof(uint32_t) * SPI_processed);
    for (unsigned int i = 0; i < SPI_processed; i++)
    {
      HeapTuple tuple = SPI_tuptable->vals[i];
      dep_deposit_serial_ids[i] =
        DatumGetInt64 (SPI_getbinval (tuple, SPI_tuptable->tupdesc, 1, &ret));
      dep_coin_pubs[i] =
        DatumGetByteaP (SPI_getbinval (tuple, SPI_tuptable->tupdesc, 2, &ret));
      dep_amount_vals[i] =
        DatumGetInt64 (SPI_getbinval (tuple, SPI_tuptable->tupdesc, 3, &ret));
      dep_amount_fracs[i] =
        DatumGetInt32 (SPI_getbinval (tuple, SPI_tuptable->tupdesc, 4, &ret));
    }


    if (refund_plan == NULL)
    {
      const char *ref_sql =
        "ref AS ("
        "  SELECT"
        "    amount_with_fee_val AS refund_val"
        "   ,amount_with_fee_frac AS refund_frac"
        "   ,coin_pub"
        "   ,deposit_serial_id"
        "    FROM refunds"
        "   WHERE coin_pub IN (SELECT coin_pub FROM dep)"
        "     AND deposit_serial_id IN (SELECT deposit_serial_id FROM dep)) ";
      SPIPlanPtr new_plan = SPI_prepare (ref_sql, 0, NULL);
      if (new_plan == NULL)
        elog (ERROR, "SPI_prepare for refund failed ! \n");
      refund_plan = SPI_saveplan (new_plan);
      if (refund_plan == NULL)
      {
        elog (ERROR, "SPI_saveplan for refund failed ! \n");
      }
    }

    int64t_t *ref_deposit_serial_ids = palloc (sizeof(int64_t) * SPI_processed);

    int res = SPI_execute_plan (refund_plan, NULL, NULL, false, 0);
    if (res != SPI_OK_SELECT)
    {
      elog (ERROR, "Failed to execute subquery 2\n");
    }
    SPITupleTable *tuptable = SPI_tuptable;
    TupleDesc tupdesc = tuptable->tupdesc;
    for (unsigned int i = 0; i < SPI_processed; i++)
    {
      HeapTuple tuple = tuptable->vals[i];
      Datum refund_val = SPI_getbinval (tuple, tupdesc, 1, &refund_val_isnull);
      Datum refund_frac = SPI_getbinval (tuple, tupdesc, 2,
                                         &refund_frac_isnull);
      Datum coin_pub = SPI_getbinval (tuple, tupdesc, 3, &coin_pub_isnull);
      Datum deposit_serial_id = SPI_getbinval (tuple, tupdesc, 4,
                                               &deposit_serial_id_isnull);
      if (refund_val_isnull
          || refund_frac_isnull
          || coin_pub_isnull
          || deposit_serial_id_isnull)
      {
        elog (ERROR, "Failed to retrieve data from subquery 2");
      }
      uint64_t refund_val_int = DatumGetUInt64 (refund_val);
      uint32_t refund_frac_int = DatumGetUInt32 (refund_frac);
      BYTEA coin_pub = DatumGetByteaP (coin_pub);
      ref_deposit_serial_ids = DatumGetInt64 (deposit_serial_id);

      refund *new_refund = (refund*) palloc (sizeof(refund));
      new_refund->coin_pub = coin_pub_str;
      new_refund->deposit_serial_id = deposit_serial_id_int;
      new_refund->amount_with_fee_val = refund_val_int;
      new_refund->amount_with_fee_frac = refund_frac_int;
    }


    if (refund_by_coin_plan == NULL)
    {
      const char *ref_by_coin_sql =
        "ref_by_coin AS ("
        "  SELECT"
        "    SUM(refund_val) AS sum_refund_val"
        "   ,SUM(refund_frac) AS sum_refund_frac"
        "   ,coin_pub"
        "   ,deposit_serial_id"
        "    FROM ref"
        "   GROUP BY coin_pub, deposit_serial_id) ";
      SPIPlanPtr new_plan = SPI_prepare (ref_by_coin_sql, 0, NULL);
      if (new_plan == NULL)
        elog (ERROR, "SPI_prepare for refund by coin failed ! \n");
      refund_by_coin_plan = SPI_saveplan (new_plan);
      if (refund_by_coin_plan == NULL)
        elog (ERROR, "SPI_saveplan for refund failed");
    }


    int res = SPI_execute_plan (refund_by_coin_plan, NULL, NULL, false, 0);
    if (res != SPI_OK_SELECT)
    {
      elog (ERROR, "Failed to execute subquery 2\n");
    }

    SPITupleTable *tuptable = SPI_tuptable;
    TupleDesc tupdesc = tuptable->tupdesc;
    for (unsigned int i = 0; i < SPI_processed; i++)
    {
      HeapTuple tuple = tuptable->vals[i];
      Datum sum_refund_val = SPI_getbinval (tuple, tupdesc, 1,
                                            &refund_val_isnull);
      Datum sum_refund_frac = SPI_getbinval (tuple, tupdesc, 2,
                                             &refund_frac_isnull);
      Datum coin_pub = SPI_getbinval (tuple, tupdesc, 3, &coin_pub_isnull);
      Datum deposit_serial_id_int = SPI_getbinval (tuple, tupdesc, 4,
                                                   &deposit_serial_id_isnull);
      if (refund_val_isnull
          || refund_frac_isnull
          || coin_pub_isnull
          || deposit_serial_id_isnull)
      {
        elog (ERROR, "Failed to retrieve data from subquery 2");
      }
      uint64_t s_refund_val_int = DatumGetUInt64 (sum_refund_val);
      uint32_t s_refund_frac_int = DatumGetUInt32 (sum_refund_frac);
      BYTEA coin_pub = DatumGetByteaP (coin_pub);
      uint64_t deposit_serial_id_int = DatumGetInt64 (deposit_serial_id_int);
      refund *new_refund_by_coin = (refund*) palloc (sizeof(refund));
      new_refund_by_coin->coin_pub = coin_pub;
      new_refund_by_coin->deposit_serial_id = deposit_serial_id_int;
      new_refund_by_coin->refund_amount_with_fee_val = s_refund_val_int;
      new_refund_by_coin->refund_amount_with_fee_frac = s_refund_frac_int;
    }


    if (norm_refund_by_coin_plan == NULL)
    {
      const char *norm_ref_by_coin_sql =
        "norm_ref_by_coin AS ("
        "  SELECT"
        "   coin_pub"
        "   ,deposit_serial_id"
        "    FROM ref_by_coin) ";
      SPIPlanPtr new_plan = SPI_prepare (norm_ref_by_coin_sql, 0, NULL);
      if (new_plan == NULL)
        elog (ERROR, "SPI_prepare for norm refund by coin failed ! \n");
      norm_refund_by_coin_plan = SPI_saveplan (new_plan);
      if (norm_refund_by_coin_plan == NULL)
        elog (ERROR, "SPI_saveplan for norm refund by coin failed ! \n");
    }

    double norm_refund_val =
      ((double) new_refund_by_coin->refund_amount_with_fee_val
       + (double) new_refund_by_coin->refund_amount_with_fee_frac) / 100000000;
    double norm_refund_frac =
      (double) new_refund_by_coin->refund_amount_with_fee_frac % 100000000;

    if (fully_refunded_coins_plan == NULL)
    {
      const char *fully_refunded_coins_sql =
        "fully_refunded_coins AS ("
        "  SELECT"
        "    dep.coin_pub"
        "    FROM norm_ref_by_coin norm"
        "    JOIN dep"
        "      ON (norm.coin_pub = dep.coin_pub"
        "      AND norm.deposit_serial_id = dep.deposit_serial_id"
        "      AND norm.norm_refund_val = dep.amount_val"
        "      AND norm.norm_refund_frac = dep.amount_frac)) ";
      SPIPlanPtr new_plan =
        SPI_prepare (fully_refunded_coins_sql, 0, NULL);
      if (new_plan == NULL)
        elog (ERROR, "SPI_prepare for fully refunded coins failed ! \n");
      fully_refunded_coins_plan = SPI_saveplan (new_plan);
      if (fully_refunded_coins_plan == NULL)
        elog (ERROR, "SPI_saveplan for fully refunded coins failed ! \n");
    }

    int res = SPI_execute_plan (fully_refunded_coins_sql);
    if (res != SPI_OK_SELECT)
      elog (ERROR, "Failed to execute subquery 4\n");
    SPITupleTable *tuptable = SPI_tuptable;
    TupleDesc tupdesc = tuptable->tupdesc;

    BYTEA coin_pub = SPI_getbinval (tuple, tupdesc, 1, &coin_pub_isnull);
    if (fees_plan == NULL)
    {
      const char *fees_sql =
        "SELECT "
        "  denom.fee_deposit_val AS fee_val, "
        "  denom.fee_deposit_frac AS fee_frac, "
        "  cs.deposit_serial_id "
        "FROM dep cs "
        "JOIN known_coins kc USING (coin_pub) "
        "JOIN denominations denom USING (denominations_serial) "
        "WHERE coin_pub NOT IN (SELECT coin_pub FROM fully_refunded_coins)";
      SPIPlanPtr new_plan =
        SPI_prepare (fees_sql, 0, NULL);
      if (new_plan == NULL)
      {
        elog (ERROR, "SPI_prepare for fees failed ! \n");
      }
      fees_plan = SPI_saveplan (new_plan);
      if (fees_plan == NULL)
      {
        elog (ERROR, "SPI_saveplan for fees failed ! \n");
      }
    }
  }
  int fees_ntuples;
  SPI_execute (fees_sql, true, 0);
  if (SPI_result_code () != SPI_OK_SELECT)
  {
    ereport (
      ERROR,
      (errcode (ERRCODE_INTERNAL_ERROR),
       errmsg ("deposit fee query failed: error code %d \n",
               SPI_result_code ())));
  }
  fees_ntuples = SPI_processed;

  if (fees_ntuples > 0)
  {
    for (i = 0; i < fees_ntuples; i++)
    {
      Datum fee_val_datum =
        SPI_getbinval (SPI_tuptable->vals[i], SPI_tuptable->tupdesc, 1,
                       &fee_null);
      Datum fee_frac_datum =
        SPI_getbinval (SPI_tuptable->vals[i], SPI_tuptable->tupdesc, 2,
                       &fee_null);
      Datum deposit_id_datum =
        SPI_getbinval (SPI_tuptable->vals[i], SPI_tuptable->tupdesc, 3,
                       &deposit_null);
      if (! fee_null && ! deposit_null)
      {
        int64 fee_val = DatumGetInt64 (fee_val_datum);
        int32 fee_frac = DatumGetInt32 (fee_frac_datum);
        int64 deposit_id = DatumGetInt64 (deposit_id_datum);
        sum_fee_value += fee_val;
        sum_fee_fraction += fee_frac;
        char *insert_agg_sql =
          psprintf (
            "INSERT INTO "
            "aggregation_tracking(deposit_serial_id, wtid_raw)"
            " VALUES (%lld, '%s')",
            deposit_id, wtid_raw);
        SPI_execute (insert_agg_sql, false, 0);
      }
    }
  }

  TupleDesc tupdesc;
  SPITupleTable *tuptable = SPI_tuptable;
  HeapTuple tuple;
  Datum result;

  if (tuptable == NULL || SPI_processed != 1)
  {
    ereport (
      ERROR,
      (errcode (ERRCODE_INTERNAL_ERROR),
       errmsg ("Unexpected result \n")));
  }
  tupdesc = SPI_tuptable->tupdesc;
  tuple = SPI_tuptable->vals[0];
  result = HeapTupleGetDatum (tuple);

  TupleDesc result_desc = CreateTemplateTupleDesc (6, false);
  TupleDescInitEntry (result_desc, (AttrNumber) 1, "sum_deposit_value", INT8OID,
                      -1, 0);
  TupleDescInitEntry (result_desc, (AttrNumber) 2, "sum_deposit_fraction",
                      INT4OID, -1, 0);
  TupleDescInitEntry (result_desc, (AttrNumber) 3, "sum_refund_value", INT8OID,
                      -1, 0);
  TupleDescInitEntry (result_desc, (AttrNumber) 4, "sum_refund_fraction",
                      INT4OID, -1, 0);
  TupleDescInitEntry (result_desc, (AttrNumber) 5, "sum_fee_value", INT8OID, -1,
                      0);
  TupleDescInitEntry (result_desc, (AttrNumber) 6, "sum_fee_fraction", INT4OID,
                      -1, 0);

  int ret = SPI_prepare (sql, 4, argtypes);
  if (ret != SPI_OK_PREPARE)
  {
    elog (ERROR, "Failed to prepare statement: %s \n", sql);
  }

  ret = SPI_execute_plan (plan, args, nulls, true, 0);
  if (ret != SPI_OK_SELECT)
  {
    elog (ERROR, "Failed to execute statement: %s \n", sql);
  }

  if (SPI_processed > 0)
  {
    HeapTuple tuple;
    Datum values[6];
    bool nulls[6] = {false};
    values[0] =
      SPI_getbinval (SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1,
                     &nulls[0]);
    values[1] =
      SPI_getbinval (SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 2,
                     &nulls[1]);
    values[2] =
      SPI_getbinval (SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 3,
                     &nulls[2]);
    values[3] =
      SPI_getbinval (SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 4,
                     &nulls[3]);
    values[4] =
      SPI_getbinval (SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 5,
                     &nulls[4]);
    values[5] =
      SPI_getbinval (SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 6,
                     &nulls[5]);
    tuple = heap_form_tuple (result_desc, values, nulls);
    PG_RETURN_DATUM (HeapTupleGetDatum (tuple));
  }
  SPI_finish ();

  PG_RETURN_NULL ();
}