aboutsummaryrefslogtreecommitdiff
path: root/src/reducer/anastasis_api_redux.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/reducer/anastasis_api_redux.c')
-rw-r--r--src/reducer/anastasis_api_redux.c354
1 files changed, 231 insertions, 123 deletions
diff --git a/src/reducer/anastasis_api_redux.c b/src/reducer/anastasis_api_redux.c
index 92801f4..59770f3 100644
--- a/src/reducer/anastasis_api_redux.c
+++ b/src/reducer/anastasis_api_redux.c
@@ -234,6 +234,26 @@ static json_t *redux_countries;
234 */ 234 */
235static json_t *provider_list; 235static json_t *provider_list;
236 236
237/**
238 * External reducer binary or NULL
239 * to use internal reducer.
240 */
241static char *external_reducer_binary;
242
243
244const char *
245ANASTASIS_REDUX_probe_external_reducer (void)
246{
247 if (NULL != external_reducer_binary)
248 return external_reducer_binary;
249 external_reducer_binary = getenv ("ANASTASIS_EXTERNAL_REDUCER");
250 if (NULL != external_reducer_binary)
251 unsetenv ("ANASTASIS_EXTERNAL_REDUCER");
252
253 return external_reducer_binary;
254
255}
256
237 257
238/** 258/**
239 * Extract the mode of a state from json 259 * Extract the mode of a state from json
@@ -1480,38 +1500,145 @@ typedef struct ANASTASIS_ReduxAction *
1480 ANASTASIS_ActionCallback cb, 1500 ANASTASIS_ActionCallback cb,
1481 void *cb_cls); 1501 void *cb_cls);
1482 1502
1503
1483/** 1504/**
1484 * Dummy cleanup function. 1505 * Closure for read operations on the external reducer.
1485 */ 1506 */
1486static void 1507struct ExternalReducerCls
1487dummy_cleanup (void *cls)
1488{ 1508{
1489 GNUNET_assert (0); 1509 struct GNUNET_Buffer read_buffer;
1490} 1510 struct GNUNET_SCHEDULER_Task *read_task;
1491 1511 struct GNUNET_DISK_PipeHandle *reducer_stdin;
1512 struct GNUNET_DISK_PipeHandle *reducer_stdout;
1513 struct GNUNET_OS_Process *reducer_process;
1514 ANASTASIS_ActionCallback action_cb;
1515 void *action_cb_cls;
1516};
1492 1517
1493/** 1518/**
1494 * Closure for external_redux_done. 1519 * Clean up and destroy the external reducer state.
1520 *
1521 * @param cls closure, a 'struct ExternalReducerCls *'
1495 */ 1522 */
1496struct ExternalReduxCls 1523static void
1524cleanup_external_reducer (void *cls)
1497{ 1525{
1498 ANASTASIS_ActionCallback cb; 1526 struct ExternalReducerCls *red_cls = cls;
1499 void *cb_cls; 1527
1500 json_t *new_state; 1528 if (NULL != red_cls->read_task)
1501}; 1529 {
1530 GNUNET_SCHEDULER_cancel (red_cls->read_task);
1531 red_cls->read_task = NULL;
1532 }
1533
1534 GNUNET_buffer_clear (&red_cls->read_buffer);
1535 if (NULL != red_cls->reducer_stdin)
1536 {
1537 GNUNET_DISK_pipe_close (red_cls->reducer_stdin);
1538 red_cls->reducer_stdin = NULL;
1539 }
1540 if (NULL != red_cls->reducer_stdout)
1541 {
1542 GNUNET_DISK_pipe_close (red_cls->reducer_stdout);
1543 red_cls->reducer_stdout = NULL;
1544 }
1545
1546 if (NULL != red_cls->reducer_process)
1547 {
1548 enum GNUNET_OS_ProcessStatusType type;
1549 unsigned long code;
1550 enum GNUNET_GenericReturnValue pwret;
1551
1552 pwret = GNUNET_OS_process_wait_status (red_cls->reducer_process,
1553 &type,
1554 &code);
1555
1556 GNUNET_assert (GNUNET_SYSERR != pwret);
1557 if (GNUNET_NO == pwret)
1558 {
1559 GNUNET_OS_process_kill (red_cls->reducer_process,
1560 SIGTERM);
1561 GNUNET_assert (GNUNET_SYSERR != GNUNET_OS_process_wait (
1562 red_cls->reducer_process));
1563 }
1564
1565 GNUNET_OS_process_destroy (red_cls->reducer_process);
1566 red_cls->reducer_process = NULL;
1567 }
1568
1569 GNUNET_free (red_cls);
1570}
1502 1571
1503 1572
1504/** 1573/**
1505 * Callback called when the redux action has been processed by 1574 * Task called when
1506 * the external reducer. 1575 *
1576 * @param cls closure, a 'struct ExternalReducerCls *'
1507 */ 1577 */
1508static void 1578static void
1509external_redux_done (void *cls) 1579external_reducer_read_cb (void *cls)
1510{ 1580{
1511 struct ExternalReduxCls *erc = cls; 1581 struct ExternalReducerCls *red_cls = cls;
1512 erc->cb (erc->cb_cls, 1582 ssize_t sret;
1513 TALER_EC_NONE, 1583 char buf[256];
1514 erc->new_state); 1584
1585 red_cls->read_task = NULL;
1586
1587 sret = GNUNET_DISK_file_read (GNUNET_DISK_pipe_handle (
1588 red_cls->reducer_stdout,
1589 GNUNET_DISK_PIPE_END_READ),
1590 buf,
1591 256);
1592 if (sret < 0)
1593 {
1594 GNUNET_break (0);
1595 red_cls->action_cb (red_cls->action_cb_cls,
1596 TALER_EC_ANASTASIS_REDUCER_INTERNAL_ERROR,
1597 NULL);
1598 cleanup_external_reducer (red_cls);
1599 return;
1600 }
1601 else if (0 == sret)
1602 {
1603 char *str = GNUNET_buffer_reap_str (&red_cls->read_buffer);
1604 json_t *json;
1605
1606 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1607 "Got external reducer response: '%s'\n",
1608 str);
1609
1610 json = json_loads (str, 0, NULL);
1611
1612 if (NULL == json)
1613 {
1614 GNUNET_break (0);
1615 red_cls->action_cb (red_cls->action_cb_cls,
1616 TALER_EC_ANASTASIS_REDUCER_INTERNAL_ERROR,
1617 NULL);
1618 cleanup_external_reducer (red_cls);
1619 return;
1620 }
1621
1622 red_cls->action_cb (red_cls->action_cb_cls,
1623 TALER_EC_NONE,
1624 json);
1625 cleanup_external_reducer (red_cls);
1626 return;
1627 }
1628 else
1629 {
1630 GNUNET_buffer_write (&red_cls->read_buffer,
1631 buf,
1632 sret);
1633
1634 red_cls->read_task = GNUNET_SCHEDULER_add_read_file (
1635 GNUNET_TIME_UNIT_FOREVER_REL,
1636 GNUNET_DISK_pipe_handle (
1637 red_cls->reducer_stdout,
1638 GNUNET_DISK_PIPE_END_READ),
1639 external_reducer_read_cb,
1640 red_cls);
1641 }
1515} 1642}
1516 1643
1517 1644
@@ -1527,116 +1654,95 @@ redux_action_external (const char *ext_reducer,
1527 ANASTASIS_ActionCallback cb, 1654 ANASTASIS_ActionCallback cb,
1528 void *cb_cls) 1655 void *cb_cls)
1529{ 1656{
1530 struct ANASTASIS_ReduxAction *act = NULL; 1657 char *arg_str;
1531 int pipefd_stdout[2]; 1658 char *state_str = json_dumps (state, JSON_COMPACT);
1532 int pipefd_stdin[2]; 1659 ssize_t sret;
1533 pid_t pid = 0; 1660 struct ExternalReducerCls *red_cls = GNUNET_new (struct ExternalReducerCls);
1534 int status;
1535 FILE *reducer_stdout;
1536 FILE *reducer_stdin;
1537 json_t *next_state;
1538 1661
1539 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 1662 if (NULL == arguments)
1540 "Using external reducer '%s'\n", 1663 arg_str = GNUNET_strdup ("{}");
1541 ext_reducer); 1664 else
1542 1665 arg_str = json_dumps (arguments, JSON_COMPACT);
1543 GNUNET_assert (0 == pipe (pipefd_stdout));
1544 GNUNET_assert (0 == pipe (pipefd_stdin));
1545 pid = fork ();
1546 if (pid == 0)
1547 {
1548 /* Child */
1549
1550 char *arg_str = json_dumps (arguments, JSON_COMPACT);
1551
1552 close (pipefd_stdout[0]);
1553 dup2 (pipefd_stdout[1], STDOUT_FILENO);
1554
1555 close (pipefd_stdin[1]);
1556 dup2 (pipefd_stdin[0], STDIN_FILENO);
1557
1558 /* Unset environment variable, otherwise anastasis-reducer
1559 would recursively shell out to itself. */
1560 unsetenv ("ANASTASIS_EXTERNAL_REDUCER");
1561
1562 execlp (ext_reducer,
1563 ext_reducer,
1564 "-a",
1565 arg_str,
1566 action,
1567 NULL);
1568 GNUNET_assert (0);
1569 }
1570
1571 /* Only parent reaches here */
1572
1573 close (pipefd_stdout[1]);
1574 close (pipefd_stdin[0]);
1575 1666
1576 reducer_stdout = fdopen (pipefd_stdout[0], 1667 red_cls->action_cb = cb;
1577 "r"); 1668 red_cls->action_cb_cls = cb_cls;
1578 reducer_stdin = fdopen (pipefd_stdin[1],
1579 "w");
1580 1669
1581 GNUNET_assert (0 == json_dumpf (state, 1670 GNUNET_assert (NULL != (red_cls->reducer_stdin = GNUNET_DISK_pipe (
1582 reducer_stdin, 1671 GNUNET_DISK_PF_NONE)));
1583 JSON_COMPACT)); 1672 GNUNET_assert (NULL != (red_cls->reducer_stdout = GNUNET_DISK_pipe (
1673 GNUNET_DISK_PF_NONE)));
1584 1674
1585 GNUNET_assert (0 == fclose (reducer_stdin)); 1675 /* By the time we're here, this variable should be unset, because
1586 reducer_stdin = NULL; 1676 otherwise using anastasis-reducer as the external reducer
1677 will lead to infinite recursion. */
1678 GNUNET_assert (NULL == getenv ("ANASTASIS_EXTERNAL_REDUCER"));
1587 1679
1588 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 1680 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1589 "Wrote old state to reducer stdin.\n"); 1681 "Starting external reducer with action '%s' and argument '%s'\n",
1590 1682 action,
1683 arg_str);
1684
1685 red_cls->reducer_process = GNUNET_OS_start_process (GNUNET_OS_INHERIT_STD_ERR,
1686 red_cls->reducer_stdin,
1687 red_cls->reducer_stdout,
1688 NULL,
1689 ext_reducer,
1690 ext_reducer,
1691 "-a",
1692 arg_str,
1693 action,
1694 NULL);
1695
1696 GNUNET_free (arg_str);
1697
1698 if (NULL == red_cls->reducer_process)
1591 { 1699 {
1592 json_error_t err; 1700 GNUNET_break (0);
1593 1701 GNUNET_free (state_str);
1594 next_state = json_loadf (reducer_stdout, 1702 cleanup_external_reducer (red_cls);
1595 0, 1703 return NULL;
1596 &err);
1597
1598 if (NULL == next_state)
1599 {
1600 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1601 "External reducer did not output valid JSON: %s:%d:%d %s\n",
1602 err.source,
1603 err.line,
1604 err.column,
1605 err.text);
1606 GNUNET_assert (0 == fclose (reducer_stdout));
1607 waitpid (pid, &status, 0);
1608 return NULL;
1609 }
1610 } 1704 }
1611 1705
1612 /* FIXME: report error instead! */ 1706 /* Close pipe ends we don't use. */
1613 GNUNET_assert (NULL != next_state); 1707 GNUNET_assert (GNUNET_OK ==
1614 1708 GNUNET_DISK_pipe_close_end (red_cls->reducer_stdin,
1615 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 1709 GNUNET_DISK_PIPE_END_READ));
1616 "Waiting for external reducer to terminate.\n"); 1710 GNUNET_assert (GNUNET_OK ==
1617 GNUNET_assert (0 == fclose (reducer_stdout)); 1711 GNUNET_DISK_pipe_close_end (red_cls->reducer_stdout,
1618 reducer_stdout = NULL; 1712 GNUNET_DISK_PIPE_END_WRITE));
1619 waitpid (pid, &status, 0); 1713
1714 sret = GNUNET_DISK_file_write_blocking (GNUNET_DISK_pipe_handle (
1715 red_cls->reducer_stdin,
1716 GNUNET_DISK_PIPE_END_WRITE),
1717 state_str,
1718 strlen (state_str));
1719 GNUNET_free (state_str);
1720 if (sret <= 0)
1721 {
1722 GNUNET_break (0);
1723 cleanup_external_reducer (red_cls);
1724 return NULL;
1725 }
1620 1726
1621 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 1727 GNUNET_assert (GNUNET_OK ==
1622 "External reducer finished with exit status '%d'\n", 1728 GNUNET_DISK_pipe_close_end (red_cls->reducer_stdin,
1623 status); 1729 GNUNET_DISK_PIPE_END_WRITE));
1624 1730
1625 act = GNUNET_new (struct ANASTASIS_ReduxAction); 1731 red_cls->read_task = GNUNET_SCHEDULER_add_read_file (
1626 /* Callback is called immediately, cleanup must never be called */ 1732 GNUNET_TIME_UNIT_FOREVER_REL,
1627 act->cleanup = &dummy_cleanup; 1733 GNUNET_DISK_pipe_handle (
1734 red_cls->reducer_stdout,
1735 GNUNET_DISK_PIPE_END_READ),
1736 external_reducer_read_cb,
1737 red_cls);
1628 1738
1629 { 1739 {
1630 struct ExternalReduxCls *sched_cls = GNUNET_new (struct ExternalReduxCls); 1740 struct ANASTASIS_ReduxAction *ra = GNUNET_new (struct
1631 1741 ANASTASIS_ReduxAction);
1632 sched_cls->cb = cb; 1742 ra->cleanup_cls = red_cls;
1633 sched_cls->cb_cls = cb_cls; 1743 ra->cleanup = cleanup_external_reducer;
1634 sched_cls->new_state = next_state; 1744 return ra;
1635 GNUNET_SCHEDULER_add_now (external_redux_done,
1636 sched_cls);
1637 } 1745 }
1638
1639 return act;
1640} 1746}
1641 1747
1642 1748
@@ -1699,16 +1805,18 @@ ANASTASIS_redux_action (const json_t *state,
1699 const char *s = json_string_value (json_object_get (state, 1805 const char *s = json_string_value (json_object_get (state,
1700 "backup_state")); 1806 "backup_state"));
1701 enum ANASTASIS_GenericState gs; 1807 enum ANASTASIS_GenericState gs;
1702 const char *ext_reducer = getenv ("ANASTASIS_EXTERNAL_REDUCER");
1703 1808
1704 /* If requested, handle action with external reducer, used for testing. */ 1809 /* If requested, handle action with external reducer, used for testing. */
1705 if (NULL != ext_reducer) 1810 {
1706 return redux_action_external (ext_reducer, 1811 const char *ext_reducer = ANASTASIS_REDUX_probe_external_reducer ();
1707 state, 1812 if (NULL != ext_reducer)
1708 action, 1813 return redux_action_external (ext_reducer,
1709 arguments, 1814 state,
1710 cb, 1815 action,
1711 cb_cls); 1816 arguments,
1817 cb,
1818 cb_cls);
1819 }
1712 1820
1713 if (NULL == s) 1821 if (NULL == s)
1714 { 1822 {