diff options
Diffstat (limited to 'src/reducer/anastasis_api_redux.c')
-rw-r--r-- | src/reducer/anastasis_api_redux.c | 354 |
1 files changed, 231 insertions, 123 deletions
diff --git a/src/reducer/anastasis_api_redux.c b/src/reducer/anastasis_api_redux.c index 92801f4..59770f3 100644 --- a/src/reducer/anastasis_api_redux.c +++ b/src/reducer/anastasis_api_redux.c | |||
@@ -234,6 +234,26 @@ static json_t *redux_countries; | |||
234 | */ | 234 | */ |
235 | static json_t *provider_list; | 235 | static json_t *provider_list; |
236 | 236 | ||
237 | /** | ||
238 | * External reducer binary or NULL | ||
239 | * to use internal reducer. | ||
240 | */ | ||
241 | static char *external_reducer_binary; | ||
242 | |||
243 | |||
244 | const char * | ||
245 | ANASTASIS_REDUX_probe_external_reducer (void) | ||
246 | { | ||
247 | if (NULL != external_reducer_binary) | ||
248 | return external_reducer_binary; | ||
249 | external_reducer_binary = getenv ("ANASTASIS_EXTERNAL_REDUCER"); | ||
250 | if (NULL != external_reducer_binary) | ||
251 | unsetenv ("ANASTASIS_EXTERNAL_REDUCER"); | ||
252 | |||
253 | return external_reducer_binary; | ||
254 | |||
255 | } | ||
256 | |||
237 | 257 | ||
238 | /** | 258 | /** |
239 | * Extract the mode of a state from json | 259 | * Extract the mode of a state from json |
@@ -1480,38 +1500,145 @@ typedef struct ANASTASIS_ReduxAction * | |||
1480 | ANASTASIS_ActionCallback cb, | 1500 | ANASTASIS_ActionCallback cb, |
1481 | void *cb_cls); | 1501 | void *cb_cls); |
1482 | 1502 | ||
1503 | |||
1483 | /** | 1504 | /** |
1484 | * Dummy cleanup function. | 1505 | * Closure for read operations on the external reducer. |
1485 | */ | 1506 | */ |
1486 | static void | 1507 | struct ExternalReducerCls |
1487 | dummy_cleanup (void *cls) | ||
1488 | { | 1508 | { |
1489 | GNUNET_assert (0); | 1509 | struct GNUNET_Buffer read_buffer; |
1490 | } | 1510 | struct GNUNET_SCHEDULER_Task *read_task; |
1491 | 1511 | struct GNUNET_DISK_PipeHandle *reducer_stdin; | |
1512 | struct GNUNET_DISK_PipeHandle *reducer_stdout; | ||
1513 | struct GNUNET_OS_Process *reducer_process; | ||
1514 | ANASTASIS_ActionCallback action_cb; | ||
1515 | void *action_cb_cls; | ||
1516 | }; | ||
1492 | 1517 | ||
1493 | /** | 1518 | /** |
1494 | * Closure for external_redux_done. | 1519 | * Clean up and destroy the external reducer state. |
1520 | * | ||
1521 | * @param cls closure, a 'struct ExternalReducerCls *' | ||
1495 | */ | 1522 | */ |
1496 | struct ExternalReduxCls | 1523 | static void |
1524 | cleanup_external_reducer (void *cls) | ||
1497 | { | 1525 | { |
1498 | ANASTASIS_ActionCallback cb; | 1526 | struct ExternalReducerCls *red_cls = cls; |
1499 | void *cb_cls; | 1527 | |
1500 | json_t *new_state; | 1528 | if (NULL != red_cls->read_task) |
1501 | }; | 1529 | { |
1530 | GNUNET_SCHEDULER_cancel (red_cls->read_task); | ||
1531 | red_cls->read_task = NULL; | ||
1532 | } | ||
1533 | |||
1534 | GNUNET_buffer_clear (&red_cls->read_buffer); | ||
1535 | if (NULL != red_cls->reducer_stdin) | ||
1536 | { | ||
1537 | GNUNET_DISK_pipe_close (red_cls->reducer_stdin); | ||
1538 | red_cls->reducer_stdin = NULL; | ||
1539 | } | ||
1540 | if (NULL != red_cls->reducer_stdout) | ||
1541 | { | ||
1542 | GNUNET_DISK_pipe_close (red_cls->reducer_stdout); | ||
1543 | red_cls->reducer_stdout = NULL; | ||
1544 | } | ||
1545 | |||
1546 | if (NULL != red_cls->reducer_process) | ||
1547 | { | ||
1548 | enum GNUNET_OS_ProcessStatusType type; | ||
1549 | unsigned long code; | ||
1550 | enum GNUNET_GenericReturnValue pwret; | ||
1551 | |||
1552 | pwret = GNUNET_OS_process_wait_status (red_cls->reducer_process, | ||
1553 | &type, | ||
1554 | &code); | ||
1555 | |||
1556 | GNUNET_assert (GNUNET_SYSERR != pwret); | ||
1557 | if (GNUNET_NO == pwret) | ||
1558 | { | ||
1559 | GNUNET_OS_process_kill (red_cls->reducer_process, | ||
1560 | SIGTERM); | ||
1561 | GNUNET_assert (GNUNET_SYSERR != GNUNET_OS_process_wait ( | ||
1562 | red_cls->reducer_process)); | ||
1563 | } | ||
1564 | |||
1565 | GNUNET_OS_process_destroy (red_cls->reducer_process); | ||
1566 | red_cls->reducer_process = NULL; | ||
1567 | } | ||
1568 | |||
1569 | GNUNET_free (red_cls); | ||
1570 | } | ||
1502 | 1571 | ||
1503 | 1572 | ||
1504 | /** | 1573 | /** |
1505 | * Callback called when the redux action has been processed by | 1574 | * Task called when |
1506 | * the external reducer. | 1575 | * |
1576 | * @param cls closure, a 'struct ExternalReducerCls *' | ||
1507 | */ | 1577 | */ |
1508 | static void | 1578 | static void |
1509 | external_redux_done (void *cls) | 1579 | external_reducer_read_cb (void *cls) |
1510 | { | 1580 | { |
1511 | struct ExternalReduxCls *erc = cls; | 1581 | struct ExternalReducerCls *red_cls = cls; |
1512 | erc->cb (erc->cb_cls, | 1582 | ssize_t sret; |
1513 | TALER_EC_NONE, | 1583 | char buf[256]; |
1514 | erc->new_state); | 1584 | |
1585 | red_cls->read_task = NULL; | ||
1586 | |||
1587 | sret = GNUNET_DISK_file_read (GNUNET_DISK_pipe_handle ( | ||
1588 | red_cls->reducer_stdout, | ||
1589 | GNUNET_DISK_PIPE_END_READ), | ||
1590 | buf, | ||
1591 | 256); | ||
1592 | if (sret < 0) | ||
1593 | { | ||
1594 | GNUNET_break (0); | ||
1595 | red_cls->action_cb (red_cls->action_cb_cls, | ||
1596 | TALER_EC_ANASTASIS_REDUCER_INTERNAL_ERROR, | ||
1597 | NULL); | ||
1598 | cleanup_external_reducer (red_cls); | ||
1599 | return; | ||
1600 | } | ||
1601 | else if (0 == sret) | ||
1602 | { | ||
1603 | char *str = GNUNET_buffer_reap_str (&red_cls->read_buffer); | ||
1604 | json_t *json; | ||
1605 | |||
1606 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | ||
1607 | "Got external reducer response: '%s'\n", | ||
1608 | str); | ||
1609 | |||
1610 | json = json_loads (str, 0, NULL); | ||
1611 | |||
1612 | if (NULL == json) | ||
1613 | { | ||
1614 | GNUNET_break (0); | ||
1615 | red_cls->action_cb (red_cls->action_cb_cls, | ||
1616 | TALER_EC_ANASTASIS_REDUCER_INTERNAL_ERROR, | ||
1617 | NULL); | ||
1618 | cleanup_external_reducer (red_cls); | ||
1619 | return; | ||
1620 | } | ||
1621 | |||
1622 | red_cls->action_cb (red_cls->action_cb_cls, | ||
1623 | TALER_EC_NONE, | ||
1624 | json); | ||
1625 | cleanup_external_reducer (red_cls); | ||
1626 | return; | ||
1627 | } | ||
1628 | else | ||
1629 | { | ||
1630 | GNUNET_buffer_write (&red_cls->read_buffer, | ||
1631 | buf, | ||
1632 | sret); | ||
1633 | |||
1634 | red_cls->read_task = GNUNET_SCHEDULER_add_read_file ( | ||
1635 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
1636 | GNUNET_DISK_pipe_handle ( | ||
1637 | red_cls->reducer_stdout, | ||
1638 | GNUNET_DISK_PIPE_END_READ), | ||
1639 | external_reducer_read_cb, | ||
1640 | red_cls); | ||
1641 | } | ||
1515 | } | 1642 | } |
1516 | 1643 | ||
1517 | 1644 | ||
@@ -1527,116 +1654,95 @@ redux_action_external (const char *ext_reducer, | |||
1527 | ANASTASIS_ActionCallback cb, | 1654 | ANASTASIS_ActionCallback cb, |
1528 | void *cb_cls) | 1655 | void *cb_cls) |
1529 | { | 1656 | { |
1530 | struct ANASTASIS_ReduxAction *act = NULL; | 1657 | char *arg_str; |
1531 | int pipefd_stdout[2]; | 1658 | char *state_str = json_dumps (state, JSON_COMPACT); |
1532 | int pipefd_stdin[2]; | 1659 | ssize_t sret; |
1533 | pid_t pid = 0; | 1660 | struct ExternalReducerCls *red_cls = GNUNET_new (struct ExternalReducerCls); |
1534 | int status; | ||
1535 | FILE *reducer_stdout; | ||
1536 | FILE *reducer_stdin; | ||
1537 | json_t *next_state; | ||
1538 | 1661 | ||
1539 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | 1662 | if (NULL == arguments) |
1540 | "Using external reducer '%s'\n", | 1663 | arg_str = GNUNET_strdup ("{}"); |
1541 | ext_reducer); | 1664 | else |
1542 | 1665 | arg_str = json_dumps (arguments, JSON_COMPACT); | |
1543 | GNUNET_assert (0 == pipe (pipefd_stdout)); | ||
1544 | GNUNET_assert (0 == pipe (pipefd_stdin)); | ||
1545 | pid = fork (); | ||
1546 | if (pid == 0) | ||
1547 | { | ||
1548 | /* Child */ | ||
1549 | |||
1550 | char *arg_str = json_dumps (arguments, JSON_COMPACT); | ||
1551 | |||
1552 | close (pipefd_stdout[0]); | ||
1553 | dup2 (pipefd_stdout[1], STDOUT_FILENO); | ||
1554 | |||
1555 | close (pipefd_stdin[1]); | ||
1556 | dup2 (pipefd_stdin[0], STDIN_FILENO); | ||
1557 | |||
1558 | /* Unset environment variable, otherwise anastasis-reducer | ||
1559 | would recursively shell out to itself. */ | ||
1560 | unsetenv ("ANASTASIS_EXTERNAL_REDUCER"); | ||
1561 | |||
1562 | execlp (ext_reducer, | ||
1563 | ext_reducer, | ||
1564 | "-a", | ||
1565 | arg_str, | ||
1566 | action, | ||
1567 | NULL); | ||
1568 | GNUNET_assert (0); | ||
1569 | } | ||
1570 | |||
1571 | /* Only parent reaches here */ | ||
1572 | |||
1573 | close (pipefd_stdout[1]); | ||
1574 | close (pipefd_stdin[0]); | ||
1575 | 1666 | ||
1576 | reducer_stdout = fdopen (pipefd_stdout[0], | 1667 | red_cls->action_cb = cb; |
1577 | "r"); | 1668 | red_cls->action_cb_cls = cb_cls; |
1578 | reducer_stdin = fdopen (pipefd_stdin[1], | ||
1579 | "w"); | ||
1580 | 1669 | ||
1581 | GNUNET_assert (0 == json_dumpf (state, | 1670 | GNUNET_assert (NULL != (red_cls->reducer_stdin = GNUNET_DISK_pipe ( |
1582 | reducer_stdin, | 1671 | GNUNET_DISK_PF_NONE))); |
1583 | JSON_COMPACT)); | 1672 | GNUNET_assert (NULL != (red_cls->reducer_stdout = GNUNET_DISK_pipe ( |
1673 | GNUNET_DISK_PF_NONE))); | ||
1584 | 1674 | ||
1585 | GNUNET_assert (0 == fclose (reducer_stdin)); | 1675 | /* By the time we're here, this variable should be unset, because |
1586 | reducer_stdin = NULL; | 1676 | otherwise using anastasis-reducer as the external reducer |
1677 | will lead to infinite recursion. */ | ||
1678 | GNUNET_assert (NULL == getenv ("ANASTASIS_EXTERNAL_REDUCER")); | ||
1587 | 1679 | ||
1588 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | 1680 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, |
1589 | "Wrote old state to reducer stdin.\n"); | 1681 | "Starting external reducer with action '%s' and argument '%s'\n", |
1590 | 1682 | action, | |
1683 | arg_str); | ||
1684 | |||
1685 | red_cls->reducer_process = GNUNET_OS_start_process (GNUNET_OS_INHERIT_STD_ERR, | ||
1686 | red_cls->reducer_stdin, | ||
1687 | red_cls->reducer_stdout, | ||
1688 | NULL, | ||
1689 | ext_reducer, | ||
1690 | ext_reducer, | ||
1691 | "-a", | ||
1692 | arg_str, | ||
1693 | action, | ||
1694 | NULL); | ||
1695 | |||
1696 | GNUNET_free (arg_str); | ||
1697 | |||
1698 | if (NULL == red_cls->reducer_process) | ||
1591 | { | 1699 | { |
1592 | json_error_t err; | 1700 | GNUNET_break (0); |
1593 | 1701 | GNUNET_free (state_str); | |
1594 | next_state = json_loadf (reducer_stdout, | 1702 | cleanup_external_reducer (red_cls); |
1595 | 0, | 1703 | return NULL; |
1596 | &err); | ||
1597 | |||
1598 | if (NULL == next_state) | ||
1599 | { | ||
1600 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
1601 | "External reducer did not output valid JSON: %s:%d:%d %s\n", | ||
1602 | err.source, | ||
1603 | err.line, | ||
1604 | err.column, | ||
1605 | err.text); | ||
1606 | GNUNET_assert (0 == fclose (reducer_stdout)); | ||
1607 | waitpid (pid, &status, 0); | ||
1608 | return NULL; | ||
1609 | } | ||
1610 | } | 1704 | } |
1611 | 1705 | ||
1612 | /* FIXME: report error instead! */ | 1706 | /* Close pipe ends we don't use. */ |
1613 | GNUNET_assert (NULL != next_state); | 1707 | GNUNET_assert (GNUNET_OK == |
1614 | 1708 | GNUNET_DISK_pipe_close_end (red_cls->reducer_stdin, | |
1615 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | 1709 | GNUNET_DISK_PIPE_END_READ)); |
1616 | "Waiting for external reducer to terminate.\n"); | 1710 | GNUNET_assert (GNUNET_OK == |
1617 | GNUNET_assert (0 == fclose (reducer_stdout)); | 1711 | GNUNET_DISK_pipe_close_end (red_cls->reducer_stdout, |
1618 | reducer_stdout = NULL; | 1712 | GNUNET_DISK_PIPE_END_WRITE)); |
1619 | waitpid (pid, &status, 0); | 1713 | |
1714 | sret = GNUNET_DISK_file_write_blocking (GNUNET_DISK_pipe_handle ( | ||
1715 | red_cls->reducer_stdin, | ||
1716 | GNUNET_DISK_PIPE_END_WRITE), | ||
1717 | state_str, | ||
1718 | strlen (state_str)); | ||
1719 | GNUNET_free (state_str); | ||
1720 | if (sret <= 0) | ||
1721 | { | ||
1722 | GNUNET_break (0); | ||
1723 | cleanup_external_reducer (red_cls); | ||
1724 | return NULL; | ||
1725 | } | ||
1620 | 1726 | ||
1621 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | 1727 | GNUNET_assert (GNUNET_OK == |
1622 | "External reducer finished with exit status '%d'\n", | 1728 | GNUNET_DISK_pipe_close_end (red_cls->reducer_stdin, |
1623 | status); | 1729 | GNUNET_DISK_PIPE_END_WRITE)); |
1624 | 1730 | ||
1625 | act = GNUNET_new (struct ANASTASIS_ReduxAction); | 1731 | red_cls->read_task = GNUNET_SCHEDULER_add_read_file ( |
1626 | /* Callback is called immediately, cleanup must never be called */ | 1732 | GNUNET_TIME_UNIT_FOREVER_REL, |
1627 | act->cleanup = &dummy_cleanup; | 1733 | GNUNET_DISK_pipe_handle ( |
1734 | red_cls->reducer_stdout, | ||
1735 | GNUNET_DISK_PIPE_END_READ), | ||
1736 | external_reducer_read_cb, | ||
1737 | red_cls); | ||
1628 | 1738 | ||
1629 | { | 1739 | { |
1630 | struct ExternalReduxCls *sched_cls = GNUNET_new (struct ExternalReduxCls); | 1740 | struct ANASTASIS_ReduxAction *ra = GNUNET_new (struct |
1631 | 1741 | ANASTASIS_ReduxAction); | |
1632 | sched_cls->cb = cb; | 1742 | ra->cleanup_cls = red_cls; |
1633 | sched_cls->cb_cls = cb_cls; | 1743 | ra->cleanup = cleanup_external_reducer; |
1634 | sched_cls->new_state = next_state; | 1744 | return ra; |
1635 | GNUNET_SCHEDULER_add_now (external_redux_done, | ||
1636 | sched_cls); | ||
1637 | } | 1745 | } |
1638 | |||
1639 | return act; | ||
1640 | } | 1746 | } |
1641 | 1747 | ||
1642 | 1748 | ||
@@ -1699,16 +1805,18 @@ ANASTASIS_redux_action (const json_t *state, | |||
1699 | const char *s = json_string_value (json_object_get (state, | 1805 | const char *s = json_string_value (json_object_get (state, |
1700 | "backup_state")); | 1806 | "backup_state")); |
1701 | enum ANASTASIS_GenericState gs; | 1807 | enum ANASTASIS_GenericState gs; |
1702 | const char *ext_reducer = getenv ("ANASTASIS_EXTERNAL_REDUCER"); | ||
1703 | 1808 | ||
1704 | /* If requested, handle action with external reducer, used for testing. */ | 1809 | /* If requested, handle action with external reducer, used for testing. */ |
1705 | if (NULL != ext_reducer) | 1810 | { |
1706 | return redux_action_external (ext_reducer, | 1811 | const char *ext_reducer = ANASTASIS_REDUX_probe_external_reducer (); |
1707 | state, | 1812 | if (NULL != ext_reducer) |
1708 | action, | 1813 | return redux_action_external (ext_reducer, |
1709 | arguments, | 1814 | state, |
1710 | cb, | 1815 | action, |
1711 | cb_cls); | 1816 | arguments, |
1817 | cb, | ||
1818 | cb_cls); | ||
1819 | } | ||
1712 | 1820 | ||
1713 | if (NULL == s) | 1821 | if (NULL == s) |
1714 | { | 1822 | { |