exchange

Base system with REST service to issue digital coins, run by the payment service provider
Log | Files | Refs | Submodules | README | LICENSE

pg_aggregate.c (14718B)


      1 #include "postgres.h"
      2 #include "fmgr.h"
      3 #include "utils/numeric.h"
      4 #include "utils/builtins.h"
      5 #include "executor/spi.h"
      6 
      /* Magic block: lets the server check that this loadable module was built
         for a compatible PostgreSQL version. */
      7 PG_MODULE_MAGIC;
      8 
      /* Declare get_deposit_summary as a version-1 calling-convention function
         so that it can be bound from SQL. */
      9 PG_FUNCTION_INFO_V1 (get_deposit_summary);
     10 
     /**
      * SQL-callable function (version-1 calling convention) that runs a series
      * of SPI queries meant to total up deposits, refunds and deposit fees for
      * one merchant account.
      *
      * Arguments, as fetched from the call frame below:
      *   0  shard                (int32; fetched but not used afterwards)
      *   1  merchant_pub         (text, converted to a C string)
      *   2  wire_target_h_payto  (text, converted to a C string)
      *   3  wtid_raw             (text, converted to a C string)
      *   4  refund_deadline      (int32)
      *
      * Result: judging by the descriptor built near the end, a single row with
      * the columns sum_deposit_value, sum_deposit_fraction, sum_refund_value,
      * sum_refund_fraction, sum_fee_value and sum_fee_fraction; SQL NULL when
      * the last query produces no row.  Failures are reported through
      * elog/ereport at ERROR level.
      *
      * NOTE(review): this function cannot compile in its present form.  The
      * individual problems are marked with NOTE(review) comments next to the
      * lines concerned.
      */
     11 Datum
     12 get_deposit_summary (PG_FUNCTION_ARGS)
     13 {
     14 
          /* Saved SPI plans; being static, they persist across calls.
             NOTE(review): fully_refunded_by_coins_plan is declared here, but
             every later use is spelled fully_refunded_coins_plan, which is not
             declared. */
     15   static SPIPlanPtr deposit_plan;
     16   static SPIPlanPtr refund_plan;
     17   static SPIPlanPtr refund_by_coin_plan;
     18   static SPIPlanPtr norm_refund_by_coin_plan;
     19   static SPIPlanPtr fully_refunded_by_coins_plan;
     20   static SPIPlanPtr fees_plan;
     21 
          /* NOTE(review): shard is fetched but never used in this function. */
     22   int shard = PG_GETARG_INT32 (0);
          /* NOTE(review): sql is never assigned, yet it is handed to SPI_prepare
             and to elog near the end of the function. */
     23   char *sql;
     24   char *merchant_pub = text_to_cstring (PG_GETARG_TEXT_P (1));
     25   char *wire_target_h_payto = text_to_cstring (PG_GETARG_TEXT_P (2));
     26   char *wtid_raw = text_to_cstring (PG_GETARG_TEXT_P (3));
          /* NOTE(review): fetched as a 32-bit integer but later bound to a
             parameter declared INT8OID -- confirm the SQL-level argument type. */
     27   int refund_deadline = PG_GETARG_INT32 (4);
          /* Open the SPI connection that all queries below rely on. */
     28   int conn = SPI_connect ();
     29   if (conn != SPI_OK_CONNECT)
     30   {
     31     elog (ERROR, "DB connection failed ! \n");
     32   }
     33 
          /* Everything up to the matching closing brace runs only while at least
             one plan is still missing.
             NOTE(review): two of the comparisons use '=' (assignment) instead of
             '==', and fully_refunded_coins_plan is not a declared variable.
             NOTE(review): presumably the queries were meant to run on every
             call; as structured they would be skipped once all plans are
             cached -- confirm the intent. */
     34   if (deposit_plan == NULL
     35       || refund_plan == NULL
     36       || refund_by_coin_plan == NULL
     37       || norm_refund_by_coin_plan = NULL
     38                                     || fully_refunded_coins_plan = NULL
     39                                                                    || fees_plan
     40                                                                    == NULL)
     41   {
            /* Prepare and save the statement that marks matching deposits as
               done and returns their serial id, coin and amount. */
     42     if (deposit_plan == NULL)
     43     {
              /* NOTE(review): nargs is unused; SPI_prepare below is told there
                 are 4 parameters although argtypes holds 3 entries and the SQL
                 text uses $1..$3 only. */
     44       int nargs = 3;
     45       Oid argtypes[3];
     46       argtypes[0] = INT8OID;
     47       argtypes[1] = BYTEAOID;
     48       argtypes[2] = BYTEAOID;
     49       const char *dep_sql =
     50         "    UPDATE deposits"
     51         "    SET done=TRUE"
     52         "    WHERE NOT (done OR policy_blocked)"
     53         "        AND refund_deadline < $1"
     54         "        AND merchant_pub = $2"
     55         "        AND wire_target_h_payto = $3"
     56         "    RETURNING"
     57         "        deposit_serial_id"
     58         "        ,coin_pub"
     59         "        ,amount_with_fee_val AS amount_val"
     60         "        ,amount_with_fee_frac AS amount_frac";
     61       SPIPlanPtr new_plan =
     62         SPI_prepare (dep_sql, 4, argtypes);
     63       if (new_plan == NULL)
     64       {
     65         elog (ERROR, "SPI_prepare for deposit failed ! \n");
     66       }
     67       deposit_plan = SPI_saveplan (new_plan);
     68       if (deposit_plan == NULL)
     69       {
     70         elog (ERROR, "SPI_saveplan for deposit failed ! \n");
     71       }
     72     }
     73 
            /* Bind the parameters and run the UPDATE ... RETURNING statement.
               NOTE(review): values has room for 4 entries but only 3 are set.
               NOTE(review): $2/$3 are declared BYTEAOID, but C-string datums are
               passed; a bytea parameter is expected to be a varlena value.
               NOTE(review): read_only is passed as true for a statement that
               modifies data, and an UPDATE with RETURNING is reported by SPI as
               SPI_OK_UPDATE_RETURNING rather than SPI_OK_UPDATE -- verify
               against the SPI documentation. */
     74     Datum values[4];
     75     values[0] = Int64GetDatum (refund_deadline);
     76     values[1] = CStringGetDatum (merchant_pub);
     77     values[2] = CStringGetDatum (wire_target_h_payto);
     78     int ret = SPI_execute_plan (deposit_plan,
     79                                 values,
     80                                 NULL,
     81                                 true,
     82                                 0);
     83     if (ret != SPI_OK_UPDATE)
     84     {
     85       elog (ERROR, "Failed to execute subquery 1\n");
     86     }
            /* Copy the returned columns into arrays sized by the row count.
               NOTE(review): SPI_getbinval takes a bool * for the null flag; &ret
               is an int *.  BYTEA is not declared in the visible source
               (PostgreSQL's own type is spelled bytea).  None of the dep_*
               arrays is read again later in this function. */
     87     uint64_t *dep_deposit_serial_ids = palloc (sizeof(uint64_t)
     88                                                * SPI_processed);
     89     BYTEA **dep_coin_pubs = palloc (sizeof(BYTEA *) * SPI_processed);
     90     uint64_t *dep_amount_vals = palloc (sizeof(uint64_t) * SPI_processed);
     91     uint32_t *dep_amount_fracs = palloc (sizeof(uint32_t) * SPI_processed);
     92     for (unsigned int i = 0; i < SPI_processed; i++)
     93     {
     94       HeapTuple tuple = SPI_tuptable->vals[i];
     95       dep_deposit_serial_ids[i] =
     96         DatumGetInt64 (SPI_getbinval (tuple, SPI_tuptable->tupdesc, 1, &ret));
     97       dep_coin_pubs[i] =
     98         DatumGetByteaP (SPI_getbinval (tuple, SPI_tuptable->tupdesc, 2, &ret));
     99       dep_amount_vals[i] =
    100         DatumGetInt64 (SPI_getbinval (tuple, SPI_tuptable->tupdesc, 3, &ret));
    101       dep_amount_fracs[i] =
    102         DatumGetInt32 (SPI_getbinval (tuple, SPI_tuptable->tupdesc, 4, &ret));
    103     }
    104 
    105 
            /* Prepare and save the refund lookup.
               NOTE(review): this text starts with "ref AS (", i.e. it looks like
               one member of a WITH clause rather than a complete statement, and
               it reads from dep, which no statement visible here defines as a
               relation.  Presumably all these fragments were meant to form a
               single WITH query -- confirm. */
    106     if (refund_plan == NULL)
    107     {
    108       const char *ref_sql =
    109         "ref AS ("
    110         "  SELECT"
    111         "    amount_with_fee_val AS refund_val"
    112         "   ,amount_with_fee_frac AS refund_frac"
    113         "   ,coin_pub"
    114         "   ,deposit_serial_id"
    115         "    FROM refunds"
    116         "   WHERE coin_pub IN (SELECT coin_pub FROM dep)"
    117         "     AND deposit_serial_id IN (SELECT deposit_serial_id FROM dep)) ";
    118       SPIPlanPtr new_plan = SPI_prepare (ref_sql, 0, NULL);
    119       if (new_plan == NULL)
    120         elog (ERROR, "SPI_prepare for refund failed ! \n");
    121       refund_plan = SPI_saveplan (new_plan);
    122       if (refund_plan == NULL)
    123       {
    124         elog (ERROR, "SPI_saveplan for refund failed ! \n");
    125       }
    126     }
    127 
            /* NOTE(review): int64t_t looks like a typo for int64_t.
               SPI_processed still holds the row count of the deposit statement
               at this point. */
    128     int64t_t *ref_deposit_serial_ids = palloc (sizeof(int64_t) * SPI_processed);
    129 
            /* Run the refund lookup and walk its rows.
               NOTE(review): res, tuptable and tupdesc are declared again further
               down in this same block, which C does not allow. */
    130     int res = SPI_execute_plan (refund_plan, NULL, NULL, false, 0);
    131     if (res != SPI_OK_SELECT)
    132     {
    133       elog (ERROR, "Failed to execute subquery 2\n");
    134     }
    135     SPITupleTable *tuptable = SPI_tuptable;
    136     TupleDesc tupdesc = tuptable->tupdesc;
    137     for (unsigned int i = 0; i < SPI_processed; i++)
    138     {
    139       HeapTuple tuple = tuptable->vals[i];
              /* NOTE(review): none of the *_isnull flags used below is
                 declared. */
    140       Datum refund_val = SPI_getbinval (tuple, tupdesc, 1, &refund_val_isnull);
    141       Datum refund_frac = SPI_getbinval (tuple, tupdesc, 2,
    142                                          &refund_frac_isnull);
    143       Datum coin_pub = SPI_getbinval (tuple, tupdesc, 3, &coin_pub_isnull);
    144       Datum deposit_serial_id = SPI_getbinval (tuple, tupdesc, 4,
    145                                                &deposit_serial_id_isnull);
    146       if (refund_val_isnull
    147           || refund_frac_isnull
    148           || coin_pub_isnull
    149           || deposit_serial_id_isnull)
    150       {
    151         elog (ERROR, "Failed to retrieve data from subquery 2");
    152       }
    153       uint64_t refund_val_int = DatumGetUInt64 (refund_val);
    154       uint32_t refund_frac_int = DatumGetUInt32 (refund_frac);
              /* NOTE(review): coin_pub is redeclared with a different type in
                 the same scope, and ref_deposit_serial_ids (a pointer) is
                 assigned an integer without an index. */
    155       BYTEA coin_pub = DatumGetByteaP (coin_pub);
    156       ref_deposit_serial_ids = DatumGetInt64 (deposit_serial_id);
    157 
              /* NOTE(review): the refund type, coin_pub_str and
                 deposit_serial_id_int are not declared in the visible source,
                 and new_refund is not stored anywhere after being filled in. */
    158       refund *new_refund = (refund*) palloc (sizeof(refund));
    159       new_refund->coin_pub = coin_pub_str;
    160       new_refund->deposit_serial_id = deposit_serial_id_int;
    161       new_refund->amount_with_fee_val = refund_val_int;
    162       new_refund->amount_with_fee_frac = refund_frac_int;
    163     }
    164 
    165 
            /* Prepare and save the per-coin refund totals.
               NOTE(review): again a lone WITH member; it reads from ref, which
               this statement does not define. */
    166     if (refund_by_coin_plan == NULL)
    167     {
    168       const char *ref_by_coin_sql =
    169         "ref_by_coin AS ("
    170         "  SELECT"
    171         "    SUM(refund_val) AS sum_refund_val"
    172         "   ,SUM(refund_frac) AS sum_refund_frac"
    173         "   ,coin_pub"
    174         "   ,deposit_serial_id"
    175         "    FROM ref"
    176         "   GROUP BY coin_pub, deposit_serial_id) ";
    177       SPIPlanPtr new_plan = SPI_prepare (ref_by_coin_sql, 0, NULL);
    178       if (new_plan == NULL)
    179         elog (ERROR, "SPI_prepare for refund by coin failed ! \n");
    180       refund_by_coin_plan = SPI_saveplan (new_plan);
    181       if (refund_by_coin_plan == NULL)
    182         elog (ERROR, "SPI_saveplan for refund failed");
    183     }
    184 
    185 
            /* Run the per-coin totals and walk the rows.
               NOTE(review): the message below repeats "subquery 2" although this
               is the third statement executed. */
    186     int res = SPI_execute_plan (refund_by_coin_plan, NULL, NULL, false, 0);
    187     if (res != SPI_OK_SELECT)
    188     {
    189       elog (ERROR, "Failed to execute subquery 2\n");
    190     }
    191 
    192     SPITupleTable *tuptable = SPI_tuptable;
    193     TupleDesc tupdesc = tuptable->tupdesc;
    194     for (unsigned int i = 0; i < SPI_processed; i++)
    195     {
    196       HeapTuple tuple = tuptable->vals[i];
    197       Datum sum_refund_val = SPI_getbinval (tuple, tupdesc, 1,
    198                                             &refund_val_isnull);
    199       Datum sum_refund_frac = SPI_getbinval (tuple, tupdesc, 2,
    200                                              &refund_frac_isnull);
    201       Datum coin_pub = SPI_getbinval (tuple, tupdesc, 3, &coin_pub_isnull);
    202       Datum deposit_serial_id_int = SPI_getbinval (tuple, tupdesc, 4,
    203                                                    &deposit_serial_id_isnull);
    204       if (refund_val_isnull
    205           || refund_frac_isnull
    206           || coin_pub_isnull
    207           || deposit_serial_id_isnull)
    208       {
    209         elog (ERROR, "Failed to retrieve data from subquery 2");
    210       }
    211       uint64_t s_refund_val_int = DatumGetUInt64 (sum_refund_val);
    212       uint32_t s_refund_frac_int = DatumGetUInt32 (sum_refund_frac);
              /* NOTE(review): coin_pub and deposit_serial_id_int are each
                 declared twice in this scope.  The field names
                 refund_amount_with_fee_val/_frac differ from the
                 amount_with_fee_val/_frac used with the same struct type in the
                 previous loop. */
    213       BYTEA coin_pub = DatumGetByteaP (coin_pub);
    214       uint64_t deposit_serial_id_int = DatumGetInt64 (deposit_serial_id_int);
    215       refund *new_refund_by_coin = (refund*) palloc (sizeof(refund));
    216       new_refund_by_coin->coin_pub = coin_pub;
    217       new_refund_by_coin->deposit_serial_id = deposit_serial_id_int;
    218       new_refund_by_coin->refund_amount_with_fee_val = s_refund_val_int;
    219       new_refund_by_coin->refund_amount_with_fee_frac = s_refund_frac_int;
    220     }
    221 
    222 
            /* NOTE(review): this plan is prepared and saved but never executed
               in this function.  Its column list lacks the norm_refund_val and
               norm_refund_frac columns that the fully_refunded_coins text
               refers to. */
    223     if (norm_refund_by_coin_plan == NULL)
    224     {
    225       const char *norm_ref_by_coin_sql =
    226         "norm_ref_by_coin AS ("
    227         "  SELECT"
    228         "   coin_pub"
    229         "   ,deposit_serial_id"
    230         "    FROM ref_by_coin) ";
    231       SPIPlanPtr new_plan = SPI_prepare (norm_ref_by_coin_sql, 0, NULL);
    232       if (new_plan == NULL)
    233         elog (ERROR, "SPI_prepare for norm refund by coin failed ! \n");
    234       norm_refund_by_coin_plan = SPI_saveplan (new_plan);
    235       if (norm_refund_by_coin_plan == NULL)
    236         elog (ERROR, "SPI_saveplan for norm refund by coin failed ! \n");
    237     }
    238 
            /* NOTE(review): new_refund_by_coin was declared inside the loop
               above and is out of scope here.  '%' cannot be applied to a
               double.  The first expression divides the sum of value and
               fraction by 100000000; presumably value + fraction / 100000000
               was intended.  Neither result is used afterwards. */
    239     double norm_refund_val =
    240       ((double) new_refund_by_coin->refund_amount_with_fee_val
    241        + (double) new_refund_by_coin->refund_amount_with_fee_frac) / 100000000;
    242     double norm_refund_frac =
    243       (double) new_refund_by_coin->refund_amount_with_fee_frac % 100000000;
    244 
            /* NOTE(review): fully_refunded_coins_plan is not declared; the
               static declared at the top is fully_refunded_by_coins_plan. */
    245     if (fully_refunded_coins_plan == NULL)
    246     {
    247       const char *fully_refunded_coins_sql =
    248         "fully_refunded_coins AS ("
    249         "  SELECT"
    250         "    dep.coin_pub"
    251         "    FROM norm_ref_by_coin norm"
    252         "    JOIN dep"
    253         "      ON (norm.coin_pub = dep.coin_pub"
    254         "      AND norm.deposit_serial_id = dep.deposit_serial_id"
    255         "      AND norm.norm_refund_val = dep.amount_val"
    256         "      AND norm.norm_refund_frac = dep.amount_frac)) ";
    257       SPIPlanPtr new_plan =
    258         SPI_prepare (fully_refunded_coins_sql, 0, NULL);
    259       if (new_plan == NULL)
    260         elog (ERROR, "SPI_prepare for fully refunded coins failed ! \n");
    261       fully_refunded_coins_plan = SPI_saveplan (new_plan);
    262       if (fully_refunded_coins_plan == NULL)
    263         elog (ERROR, "SPI_saveplan for fully refunded coins failed ! \n");
    264     }
    265 
            /* NOTE(review): SPI_execute_plan is called with a single argument,
               and that argument is the SQL text (out of scope here) rather than
               the saved plan. */
    266     int res = SPI_execute_plan (fully_refunded_coins_sql);
    267     if (res != SPI_OK_SELECT)
    268       elog (ERROR, "Failed to execute subquery 4\n");
    269     SPITupleTable *tuptable = SPI_tuptable;
    270     TupleDesc tupdesc = tuptable->tupdesc;
    271 
            /* NOTE(review): tuple and coin_pub_isnull are not declared in this
               scope, and the Datum result is assigned to a BYTEA object. */
    272     BYTEA coin_pub = SPI_getbinval (tuple, tupdesc, 1, &coin_pub_isnull);
            /* NOTE(review): fees_plan is prepared and saved, but the code after
               this block runs the SQL text through SPI_execute instead.  The
               text reads from dep and fully_refunded_coins, which this
               statement does not define. */
    273     if (fees_plan == NULL)
    274     {
    275       const char *fees_sql =
    276         "SELECT "
    277         "  denom.fee_deposit_val AS fee_val, "
    278         "  denom.fee_deposit_frac AS fee_frac, "
    279         "  cs.deposit_serial_id "
    280         "FROM dep cs "
    281         "JOIN known_coins kc USING (coin_pub) "
    282         "JOIN denominations denom USING (denominations_serial) "
    283         "WHERE coin_pub NOT IN (SELECT coin_pub FROM fully_refunded_coins)";
    284       SPIPlanPtr new_plan =
    285         SPI_prepare (fees_sql, 0, NULL);
    286       if (new_plan == NULL)
    287       {
    288         elog (ERROR, "SPI_prepare for fees failed ! \n");
    289       }
    290       fees_plan = SPI_saveplan (new_plan);
    291       if (fees_plan == NULL)
    292       {
    293         elog (ERROR, "SPI_saveplan for fees failed ! \n");
    294       }
    295     }
    296   }
          /* Run the fee query and record its row count.
             NOTE(review): fees_sql was declared inside the block above and is
             out of scope here.  The status returned by SPI_execute is
             discarded, and SPI_result_code() does not appear to be part of the
             SPI interface (the documented items are the SPI_result variable
             and SPI_result_code_string) -- verify. */
    297   int fees_ntuples;
    298   SPI_execute (fees_sql, true, 0);
    299   if (SPI_result_code () != SPI_OK_SELECT)
    300   {
    301     ereport (
    302       ERROR,
    303       (errcode (ERRCODE_INTERNAL_ERROR),
    304        errmsg ("deposit fee query failed: error code %d \n",
    305                SPI_result_code ())));
    306   }
    307   fees_ntuples = SPI_processed;
    308 
          /* For each fee row: add the fee to the running totals and insert a
             row into aggregation_tracking for the deposit.
             NOTE(review): i, fee_null, deposit_null, sum_fee_value and
             sum_fee_fraction are not declared.  fee_null is written by both of
             the first two SPI_getbinval calls, so the null state of the first
             column is lost. */
    309   if (fees_ntuples > 0)
    310   {
    311     for (i = 0; i < fees_ntuples; i++)
    312     {
    313       Datum fee_val_datum =
    314         SPI_getbinval (SPI_tuptable->vals[i], SPI_tuptable->tupdesc, 1,
    315                        &fee_null);
    316       Datum fee_frac_datum =
    317         SPI_getbinval (SPI_tuptable->vals[i], SPI_tuptable->tupdesc, 2,
    318                        &fee_null);
    319       Datum deposit_id_datum =
    320         SPI_getbinval (SPI_tuptable->vals[i], SPI_tuptable->tupdesc, 3,
    321                        &deposit_null);
    322       if (! fee_null && ! deposit_null)
    323       {
    324         int64 fee_val = DatumGetInt64 (fee_val_datum);
    325         int32 fee_frac = DatumGetInt32 (fee_frac_datum);
    326         int64 deposit_id = DatumGetInt64 (deposit_id_datum);
    327         sum_fee_value += fee_val;
    328         sum_fee_fraction += fee_frac;
                /* NOTE(review): wtid_raw is pasted into the SQL text unescaped;
                   a parameterised statement would avoid injection.  %lld needs
                   a long long argument (deposit_id is int64).  The status
                   returned by SPI_execute is ignored, and the call replaces
                   SPI_tuptable / SPI_processed while the enclosing loop is
                   still reading them. */
    329         char *insert_agg_sql =
    330           psprintf (
    331             "INSERT INTO "
    332             "aggregation_tracking(deposit_serial_id, wtid_raw)"
    333             " VALUES (%lld, '%s')",
    334             deposit_id, wtid_raw);
    335         SPI_execute (insert_agg_sql, false, 0);
    336       }
    337     }
    338   }
    339 
          /* Require exactly one row from the most recent statement.
             NOTE(review): SPI_tuptable and SPI_processed now describe whichever
             statement ran last.  result, tupdesc and tuple are set here but not
             used afterwards (an inner tuple is declared in the final block). */
    340   TupleDesc tupdesc;
    341   SPITupleTable *tuptable = SPI_tuptable;
    342   HeapTuple tuple;
    343   Datum result;
    344 
    345   if (tuptable == NULL || SPI_processed != 1)
    346   {
    347     ereport (
    348       ERROR,
    349       (errcode (ERRCODE_INTERNAL_ERROR),
    350        errmsg ("Unexpected result \n")));
    351   }
    352   tupdesc = SPI_tuptable->tupdesc;
    353   tuple = SPI_tuptable->vals[0];
    354   result = HeapTupleGetDatum (tuple);
    355 
          /* Describe the six-column result row.
             NOTE(review): the two-argument CreateTemplateTupleDesc matches
             PostgreSQL releases before 12; later releases take only the
             attribute count -- confirm the target version. */
    356   TupleDesc result_desc = CreateTemplateTupleDesc (6, false);
    357   TupleDescInitEntry (result_desc, (AttrNumber) 1, "sum_deposit_value", INT8OID,
    358                       -1, 0);
    359   TupleDescInitEntry (result_desc, (AttrNumber) 2, "sum_deposit_fraction",
    360                       INT4OID, -1, 0);
    361   TupleDescInitEntry (result_desc, (AttrNumber) 3, "sum_refund_value", INT8OID,
    362                       -1, 0);
    363   TupleDescInitEntry (result_desc, (AttrNumber) 4, "sum_refund_fraction",
    364                       INT4OID, -1, 0);
    365   TupleDescInitEntry (result_desc, (AttrNumber) 5, "sum_fee_value", INT8OID, -1,
    366                       0);
    367   TupleDescInitEntry (result_desc, (AttrNumber) 6, "sum_fee_fraction", INT4OID,
    368                       -1, 0);
    369 
          /* NOTE(review): SPI_prepare returns a plan pointer (NULL on failure),
             not an int status, and SPI_OK_PREPARE does not appear to be a
             defined SPI code -- verify.  sql is uninitialised, argtypes is out
             of scope here, and plan, args and nulls are not declared at this
             point. */
    370   int ret = SPI_prepare (sql, 4, argtypes);
    371   if (ret != SPI_OK_PREPARE)
    372   {
    373     elog (ERROR, "Failed to prepare statement: %s \n", sql);
    374   }
    375 
    376   ret = SPI_execute_plan (plan, args, nulls, true, 0);
    377   if (ret != SPI_OK_SELECT)
    378   {
    379     elog (ERROR, "Failed to execute statement: %s \n", sql);
    380   }
    381 
          /* Copy the six columns of the first result row into a new tuple built
             on result_desc and return it.
             NOTE(review): this path returns without calling SPI_finish. */
    382   if (SPI_processed > 0)
    383   {
    384     HeapTuple tuple;
    385     Datum values[6];
    386     bool nulls[6] = {false};
    387     values[0] =
    388       SPI_getbinval (SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1,
    389                      &nulls[0]);
    390     values[1] =
    391       SPI_getbinval (SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 2,
    392                      &nulls[1]);
    393     values[2] =
    394       SPI_getbinval (SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 3,
    395                      &nulls[2]);
    396     values[3] =
    397       SPI_getbinval (SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 4,
    398                      &nulls[3]);
    399     values[4] =
    400       SPI_getbinval (SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 5,
    401                      &nulls[4]);
    402     values[5] =
    403       SPI_getbinval (SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 6,
    404                      &nulls[5]);
    405     tuple = heap_form_tuple (result_desc, values, nulls);
    406     PG_RETURN_DATUM (HeapTupleGetDatum (tuple));
    407   }
          /* No row: close the SPI connection and return SQL NULL. */
    408   SPI_finish ();
    409 
    410   PG_RETURN_NULL ();
    411 }