pg_aggregate.c (14718B)
1 #include "postgres.h" 2 #include "fmgr.h" 3 #include "utils/numeric.h" 4 #include "utils/builtins.h" 5 #include "executor/spi.h" 6 7 PG_MODULE_MAGIC; 8 9 PG_FUNCTION_INFO_V1 (get_deposit_summary); 10 11 Datum 12 get_deposit_summary (PG_FUNCTION_ARGS) 13 { 14 15 static SPIPlanPtr deposit_plan; 16 static SPIPlanPtr refund_plan; 17 static SPIPlanPtr refund_by_coin_plan; 18 static SPIPlanPtr norm_refund_by_coin_plan; 19 static SPIPlanPtr fully_refunded_by_coins_plan; 20 static SPIPlanPtr fees_plan; 21 22 int shard = PG_GETARG_INT32 (0); 23 char *sql; 24 char *merchant_pub = text_to_cstring (PG_GETARG_TEXT_P (1)); 25 char *wire_target_h_payto = text_to_cstring (PG_GETARG_TEXT_P (2)); 26 char *wtid_raw = text_to_cstring (PG_GETARG_TEXT_P (3)); 27 int refund_deadline = PG_GETARG_INT32 (4); 28 int conn = SPI_connect (); 29 if (conn != SPI_OK_CONNECT) 30 { 31 elog (ERROR, "DB connection failed ! \n"); 32 } 33 34 if (deposit_plan == NULL 35 || refund_plan == NULL 36 || refund_by_coin_plan == NULL 37 || norm_refund_by_coin_plan = NULL 38 || fully_refunded_coins_plan = NULL 39 || fees_plan 40 == NULL) 41 { 42 if (deposit_plan == NULL) 43 { 44 int nargs = 3; 45 Oid argtypes[3]; 46 argtypes[0] = INT8OID; 47 argtypes[1] = BYTEAOID; 48 argtypes[2] = BYTEAOID; 49 const char *dep_sql = 50 " UPDATE deposits" 51 " SET done=TRUE" 52 " WHERE NOT (done OR policy_blocked)" 53 " AND refund_deadline < $1" 54 " AND merchant_pub = $2" 55 " AND wire_target_h_payto = $3" 56 " RETURNING" 57 " deposit_serial_id" 58 " ,coin_pub" 59 " ,amount_with_fee_val AS amount_val" 60 " ,amount_with_fee_frac AS amount_frac"; 61 SPIPlanPtr new_plan = 62 SPI_prepare (dep_sql, 4, argtypes); 63 if (new_plan == NULL) 64 { 65 elog (ERROR, "SPI_prepare for deposit failed ! \n"); 66 } 67 deposit_plan = SPI_saveplan (new_plan); 68 if (deposit_plan == NULL) 69 { 70 elog (ERROR, "SPI_saveplan for deposit failed ! \n"); 71 } 72 } 73 74 Datum values[4]; 75 values[0] = Int64GetDatum (refund_deadline); 76 values[1] = CStringGetDatum (merchant_pub); 77 values[2] = CStringGetDatum (wire_target_h_payto); 78 int ret = SPI_execute_plan (deposit_plan, 79 values, 80 NULL, 81 true, 82 0); 83 if (ret != SPI_OK_UPDATE) 84 { 85 elog (ERROR, "Failed to execute subquery 1\n"); 86 } 87 uint64_t *dep_deposit_serial_ids = palloc (sizeof(uint64_t) 88 * SPI_processed); 89 BYTEA **dep_coin_pubs = palloc (sizeof(BYTEA *) * SPI_processed); 90 uint64_t *dep_amount_vals = palloc (sizeof(uint64_t) * SPI_processed); 91 uint32_t *dep_amount_fracs = palloc (sizeof(uint32_t) * SPI_processed); 92 for (unsigned int i = 0; i < SPI_processed; i++) 93 { 94 HeapTuple tuple = SPI_tuptable->vals[i]; 95 dep_deposit_serial_ids[i] = 96 DatumGetInt64 (SPI_getbinval (tuple, SPI_tuptable->tupdesc, 1, &ret)); 97 dep_coin_pubs[i] = 98 DatumGetByteaP (SPI_getbinval (tuple, SPI_tuptable->tupdesc, 2, &ret)); 99 dep_amount_vals[i] = 100 DatumGetInt64 (SPI_getbinval (tuple, SPI_tuptable->tupdesc, 3, &ret)); 101 dep_amount_fracs[i] = 102 DatumGetInt32 (SPI_getbinval (tuple, SPI_tuptable->tupdesc, 4, &ret)); 103 } 104 105 106 if (refund_plan == NULL) 107 { 108 const char *ref_sql = 109 "ref AS (" 110 " SELECT" 111 " amount_with_fee_val AS refund_val" 112 " ,amount_with_fee_frac AS refund_frac" 113 " ,coin_pub" 114 " ,deposit_serial_id" 115 " FROM refunds" 116 " WHERE coin_pub IN (SELECT coin_pub FROM dep)" 117 " AND deposit_serial_id IN (SELECT deposit_serial_id FROM dep)) "; 118 SPIPlanPtr new_plan = SPI_prepare (ref_sql, 0, NULL); 119 if (new_plan == NULL) 120 elog (ERROR, "SPI_prepare for refund failed ! \n"); 121 refund_plan = SPI_saveplan (new_plan); 122 if (refund_plan == NULL) 123 { 124 elog (ERROR, "SPI_saveplan for refund failed ! \n"); 125 } 126 } 127 128 int64t_t *ref_deposit_serial_ids = palloc (sizeof(int64_t) * SPI_processed); 129 130 int res = SPI_execute_plan (refund_plan, NULL, NULL, false, 0); 131 if (res != SPI_OK_SELECT) 132 { 133 elog (ERROR, "Failed to execute subquery 2\n"); 134 } 135 SPITupleTable *tuptable = SPI_tuptable; 136 TupleDesc tupdesc = tuptable->tupdesc; 137 for (unsigned int i = 0; i < SPI_processed; i++) 138 { 139 HeapTuple tuple = tuptable->vals[i]; 140 Datum refund_val = SPI_getbinval (tuple, tupdesc, 1, &refund_val_isnull); 141 Datum refund_frac = SPI_getbinval (tuple, tupdesc, 2, 142 &refund_frac_isnull); 143 Datum coin_pub = SPI_getbinval (tuple, tupdesc, 3, &coin_pub_isnull); 144 Datum deposit_serial_id = SPI_getbinval (tuple, tupdesc, 4, 145 &deposit_serial_id_isnull); 146 if (refund_val_isnull 147 || refund_frac_isnull 148 || coin_pub_isnull 149 || deposit_serial_id_isnull) 150 { 151 elog (ERROR, "Failed to retrieve data from subquery 2"); 152 } 153 uint64_t refund_val_int = DatumGetUInt64 (refund_val); 154 uint32_t refund_frac_int = DatumGetUInt32 (refund_frac); 155 BYTEA coin_pub = DatumGetByteaP (coin_pub); 156 ref_deposit_serial_ids = DatumGetInt64 (deposit_serial_id); 157 158 refund *new_refund = (refund*) palloc (sizeof(refund)); 159 new_refund->coin_pub = coin_pub_str; 160 new_refund->deposit_serial_id = deposit_serial_id_int; 161 new_refund->amount_with_fee_val = refund_val_int; 162 new_refund->amount_with_fee_frac = refund_frac_int; 163 } 164 165 166 if (refund_by_coin_plan == NULL) 167 { 168 const char *ref_by_coin_sql = 169 "ref_by_coin AS (" 170 " SELECT" 171 " SUM(refund_val) AS sum_refund_val" 172 " ,SUM(refund_frac) AS sum_refund_frac" 173 " ,coin_pub" 174 " ,deposit_serial_id" 175 " FROM ref" 176 " GROUP BY coin_pub, deposit_serial_id) "; 177 SPIPlanPtr new_plan = SPI_prepare (ref_by_coin_sql, 0, NULL); 178 if (new_plan == NULL) 179 elog (ERROR, "SPI_prepare for refund by coin failed ! \n"); 180 refund_by_coin_plan = SPI_saveplan (new_plan); 181 if (refund_by_coin_plan == NULL) 182 elog (ERROR, "SPI_saveplan for refund failed"); 183 } 184 185 186 int res = SPI_execute_plan (refund_by_coin_plan, NULL, NULL, false, 0); 187 if (res != SPI_OK_SELECT) 188 { 189 elog (ERROR, "Failed to execute subquery 2\n"); 190 } 191 192 SPITupleTable *tuptable = SPI_tuptable; 193 TupleDesc tupdesc = tuptable->tupdesc; 194 for (unsigned int i = 0; i < SPI_processed; i++) 195 { 196 HeapTuple tuple = tuptable->vals[i]; 197 Datum sum_refund_val = SPI_getbinval (tuple, tupdesc, 1, 198 &refund_val_isnull); 199 Datum sum_refund_frac = SPI_getbinval (tuple, tupdesc, 2, 200 &refund_frac_isnull); 201 Datum coin_pub = SPI_getbinval (tuple, tupdesc, 3, &coin_pub_isnull); 202 Datum deposit_serial_id_int = SPI_getbinval (tuple, tupdesc, 4, 203 &deposit_serial_id_isnull); 204 if (refund_val_isnull 205 || refund_frac_isnull 206 || coin_pub_isnull 207 || deposit_serial_id_isnull) 208 { 209 elog (ERROR, "Failed to retrieve data from subquery 2"); 210 } 211 uint64_t s_refund_val_int = DatumGetUInt64 (sum_refund_val); 212 uint32_t s_refund_frac_int = DatumGetUInt32 (sum_refund_frac); 213 BYTEA coin_pub = DatumGetByteaP (coin_pub); 214 uint64_t deposit_serial_id_int = DatumGetInt64 (deposit_serial_id_int); 215 refund *new_refund_by_coin = (refund*) palloc (sizeof(refund)); 216 new_refund_by_coin->coin_pub = coin_pub; 217 new_refund_by_coin->deposit_serial_id = deposit_serial_id_int; 218 new_refund_by_coin->refund_amount_with_fee_val = s_refund_val_int; 219 new_refund_by_coin->refund_amount_with_fee_frac = s_refund_frac_int; 220 } 221 222 223 if (norm_refund_by_coin_plan == NULL) 224 { 225 const char *norm_ref_by_coin_sql = 226 "norm_ref_by_coin AS (" 227 " SELECT" 228 " coin_pub" 229 " ,deposit_serial_id" 230 " FROM ref_by_coin) "; 231 SPIPlanPtr new_plan = SPI_prepare (norm_ref_by_coin_sql, 0, NULL); 232 if (new_plan == NULL) 233 elog (ERROR, "SPI_prepare for norm refund by coin failed ! \n"); 234 norm_refund_by_coin_plan = SPI_saveplan (new_plan); 235 if (norm_refund_by_coin_plan == NULL) 236 elog (ERROR, "SPI_saveplan for norm refund by coin failed ! \n"); 237 } 238 239 double norm_refund_val = 240 ((double) new_refund_by_coin->refund_amount_with_fee_val 241 + (double) new_refund_by_coin->refund_amount_with_fee_frac) / 100000000; 242 double norm_refund_frac = 243 (double) new_refund_by_coin->refund_amount_with_fee_frac % 100000000; 244 245 if (fully_refunded_coins_plan == NULL) 246 { 247 const char *fully_refunded_coins_sql = 248 "fully_refunded_coins AS (" 249 " SELECT" 250 " dep.coin_pub" 251 " FROM norm_ref_by_coin norm" 252 " JOIN dep" 253 " ON (norm.coin_pub = dep.coin_pub" 254 " AND norm.deposit_serial_id = dep.deposit_serial_id" 255 " AND norm.norm_refund_val = dep.amount_val" 256 " AND norm.norm_refund_frac = dep.amount_frac)) "; 257 SPIPlanPtr new_plan = 258 SPI_prepare (fully_refunded_coins_sql, 0, NULL); 259 if (new_plan == NULL) 260 elog (ERROR, "SPI_prepare for fully refunded coins failed ! \n"); 261 fully_refunded_coins_plan = SPI_saveplan (new_plan); 262 if (fully_refunded_coins_plan == NULL) 263 elog (ERROR, "SPI_saveplan for fully refunded coins failed ! \n"); 264 } 265 266 int res = SPI_execute_plan (fully_refunded_coins_sql); 267 if (res != SPI_OK_SELECT) 268 elog (ERROR, "Failed to execute subquery 4\n"); 269 SPITupleTable *tuptable = SPI_tuptable; 270 TupleDesc tupdesc = tuptable->tupdesc; 271 272 BYTEA coin_pub = SPI_getbinval (tuple, tupdesc, 1, &coin_pub_isnull); 273 if (fees_plan == NULL) 274 { 275 const char *fees_sql = 276 "SELECT " 277 " denom.fee_deposit_val AS fee_val, " 278 " denom.fee_deposit_frac AS fee_frac, " 279 " cs.deposit_serial_id " 280 "FROM dep cs " 281 "JOIN known_coins kc USING (coin_pub) " 282 "JOIN denominations denom USING (denominations_serial) " 283 "WHERE coin_pub NOT IN (SELECT coin_pub FROM fully_refunded_coins)"; 284 SPIPlanPtr new_plan = 285 SPI_prepare (fees_sql, 0, NULL); 286 if (new_plan == NULL) 287 { 288 elog (ERROR, "SPI_prepare for fees failed ! \n"); 289 } 290 fees_plan = SPI_saveplan (new_plan); 291 if (fees_plan == NULL) 292 { 293 elog (ERROR, "SPI_saveplan for fees failed ! \n"); 294 } 295 } 296 } 297 int fees_ntuples; 298 SPI_execute (fees_sql, true, 0); 299 if (SPI_result_code () != SPI_OK_SELECT) 300 { 301 ereport ( 302 ERROR, 303 (errcode (ERRCODE_INTERNAL_ERROR), 304 errmsg ("deposit fee query failed: error code %d \n", 305 SPI_result_code ()))); 306 } 307 fees_ntuples = SPI_processed; 308 309 if (fees_ntuples > 0) 310 { 311 for (i = 0; i < fees_ntuples; i++) 312 { 313 Datum fee_val_datum = 314 SPI_getbinval (SPI_tuptable->vals[i], SPI_tuptable->tupdesc, 1, 315 &fee_null); 316 Datum fee_frac_datum = 317 SPI_getbinval (SPI_tuptable->vals[i], SPI_tuptable->tupdesc, 2, 318 &fee_null); 319 Datum deposit_id_datum = 320 SPI_getbinval (SPI_tuptable->vals[i], SPI_tuptable->tupdesc, 3, 321 &deposit_null); 322 if (! fee_null && ! deposit_null) 323 { 324 int64 fee_val = DatumGetInt64 (fee_val_datum); 325 int32 fee_frac = DatumGetInt32 (fee_frac_datum); 326 int64 deposit_id = DatumGetInt64 (deposit_id_datum); 327 sum_fee_value += fee_val; 328 sum_fee_fraction += fee_frac; 329 char *insert_agg_sql = 330 psprintf ( 331 "INSERT INTO " 332 "aggregation_tracking(deposit_serial_id, wtid_raw)" 333 " VALUES (%lld, '%s')", 334 deposit_id, wtid_raw); 335 SPI_execute (insert_agg_sql, false, 0); 336 } 337 } 338 } 339 340 TupleDesc tupdesc; 341 SPITupleTable *tuptable = SPI_tuptable; 342 HeapTuple tuple; 343 Datum result; 344 345 if (tuptable == NULL || SPI_processed != 1) 346 { 347 ereport ( 348 ERROR, 349 (errcode (ERRCODE_INTERNAL_ERROR), 350 errmsg ("Unexpected result \n"))); 351 } 352 tupdesc = SPI_tuptable->tupdesc; 353 tuple = SPI_tuptable->vals[0]; 354 result = HeapTupleGetDatum (tuple); 355 356 TupleDesc result_desc = CreateTemplateTupleDesc (6, false); 357 TupleDescInitEntry (result_desc, (AttrNumber) 1, "sum_deposit_value", INT8OID, 358 -1, 0); 359 TupleDescInitEntry (result_desc, (AttrNumber) 2, "sum_deposit_fraction", 360 INT4OID, -1, 0); 361 TupleDescInitEntry (result_desc, (AttrNumber) 3, "sum_refund_value", INT8OID, 362 -1, 0); 363 TupleDescInitEntry (result_desc, (AttrNumber) 4, "sum_refund_fraction", 364 INT4OID, -1, 0); 365 TupleDescInitEntry (result_desc, (AttrNumber) 5, "sum_fee_value", INT8OID, -1, 366 0); 367 TupleDescInitEntry (result_desc, (AttrNumber) 6, "sum_fee_fraction", INT4OID, 368 -1, 0); 369 370 int ret = SPI_prepare (sql, 4, argtypes); 371 if (ret != SPI_OK_PREPARE) 372 { 373 elog (ERROR, "Failed to prepare statement: %s \n", sql); 374 } 375 376 ret = SPI_execute_plan (plan, args, nulls, true, 0); 377 if (ret != SPI_OK_SELECT) 378 { 379 elog (ERROR, "Failed to execute statement: %s \n", sql); 380 } 381 382 if (SPI_processed > 0) 383 { 384 HeapTuple tuple; 385 Datum values[6]; 386 bool nulls[6] = {false}; 387 values[0] = 388 SPI_getbinval (SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1, 389 &nulls[0]); 390 values[1] = 391 SPI_getbinval (SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 2, 392 &nulls[1]); 393 values[2] = 394 SPI_getbinval (SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 3, 395 &nulls[2]); 396 values[3] = 397 SPI_getbinval (SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 4, 398 &nulls[3]); 399 values[4] = 400 SPI_getbinval (SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 5, 401 &nulls[4]); 402 values[5] = 403 SPI_getbinval (SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 6, 404 &nulls[5]); 405 tuple = heap_form_tuple (result_desc, values, nulls); 406 PG_RETURN_DATUM (HeapTupleGetDatum (tuple)); 407 } 408 SPI_finish (); 409 410 PG_RETURN_NULL (); 411 }