summaryrefslogtreecommitdiff
path: root/src/reducer/anastasis_api_redux.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/reducer/anastasis_api_redux.c')
-rw-r--r--src/reducer/anastasis_api_redux.c354
1 files changed, 231 insertions, 123 deletions
diff --git a/src/reducer/anastasis_api_redux.c b/src/reducer/anastasis_api_redux.c
index 92801f4..59770f3 100644
--- a/src/reducer/anastasis_api_redux.c
+++ b/src/reducer/anastasis_api_redux.c
@@ -234,6 +234,26 @@ static json_t *redux_countries;
*/
static json_t *provider_list;
+/**
+ * External reducer binary or NULL
+ * to use internal reducer.
+ */
+static char *external_reducer_binary;
+
+
+const char *
+ANASTASIS_REDUX_probe_external_reducer (void)
+{
+ if (NULL != external_reducer_binary)
+ return external_reducer_binary;
+ external_reducer_binary = getenv ("ANASTASIS_EXTERNAL_REDUCER");
+ if (NULL != external_reducer_binary)
+ unsetenv ("ANASTASIS_EXTERNAL_REDUCER");
+
+ return external_reducer_binary;
+
+}
+
/**
* Extract the mode of a state from json
@@ -1480,38 +1500,145 @@ typedef struct ANASTASIS_ReduxAction *
ANASTASIS_ActionCallback cb,
void *cb_cls);
+
/**
- * Dummy cleanup function.
+ * Closure for read operations on the external reducer.
*/
-static void
-dummy_cleanup (void *cls)
+struct ExternalReducerCls
{
- GNUNET_assert (0);
-}
-
+ struct GNUNET_Buffer read_buffer;
+ struct GNUNET_SCHEDULER_Task *read_task;
+ struct GNUNET_DISK_PipeHandle *reducer_stdin;
+ struct GNUNET_DISK_PipeHandle *reducer_stdout;
+ struct GNUNET_OS_Process *reducer_process;
+ ANASTASIS_ActionCallback action_cb;
+ void *action_cb_cls;
+};
/**
- * Closure for external_redux_done.
+ * Clean up and destroy the external reducer state.
+ *
+ * @param cls closure, a 'struct ExternalReducerCls *'
*/
-struct ExternalReduxCls
+static void
+cleanup_external_reducer (void *cls)
{
- ANASTASIS_ActionCallback cb;
- void *cb_cls;
- json_t *new_state;
-};
+ struct ExternalReducerCls *red_cls = cls;
+
+ if (NULL != red_cls->read_task)
+ {
+ GNUNET_SCHEDULER_cancel (red_cls->read_task);
+ red_cls->read_task = NULL;
+ }
+
+ GNUNET_buffer_clear (&red_cls->read_buffer);
+ if (NULL != red_cls->reducer_stdin)
+ {
+ GNUNET_DISK_pipe_close (red_cls->reducer_stdin);
+ red_cls->reducer_stdin = NULL;
+ }
+ if (NULL != red_cls->reducer_stdout)
+ {
+ GNUNET_DISK_pipe_close (red_cls->reducer_stdout);
+ red_cls->reducer_stdout = NULL;
+ }
+
+ if (NULL != red_cls->reducer_process)
+ {
+ enum GNUNET_OS_ProcessStatusType type;
+ unsigned long code;
+ enum GNUNET_GenericReturnValue pwret;
+
+ pwret = GNUNET_OS_process_wait_status (red_cls->reducer_process,
+ &type,
+ &code);
+
+ GNUNET_assert (GNUNET_SYSERR != pwret);
+ if (GNUNET_NO == pwret)
+ {
+ GNUNET_OS_process_kill (red_cls->reducer_process,
+ SIGTERM);
+ GNUNET_assert (GNUNET_SYSERR != GNUNET_OS_process_wait (
+ red_cls->reducer_process));
+ }
+
+ GNUNET_OS_process_destroy (red_cls->reducer_process);
+ red_cls->reducer_process = NULL;
+ }
+
+ GNUNET_free (red_cls);
+}
/**
- * Callback called when the redux action has been processed by
- * the external reducer.
+ * Task called when
+ *
+ * @param cls closure, a 'struct ExternalReducerCls *'
*/
static void
-external_redux_done (void *cls)
+external_reducer_read_cb (void *cls)
{
- struct ExternalReduxCls *erc = cls;
- erc->cb (erc->cb_cls,
- TALER_EC_NONE,
- erc->new_state);
+ struct ExternalReducerCls *red_cls = cls;
+ ssize_t sret;
+ char buf[256];
+
+ red_cls->read_task = NULL;
+
+ sret = GNUNET_DISK_file_read (GNUNET_DISK_pipe_handle (
+ red_cls->reducer_stdout,
+ GNUNET_DISK_PIPE_END_READ),
+ buf,
+ 256);
+ if (sret < 0)
+ {
+ GNUNET_break (0);
+ red_cls->action_cb (red_cls->action_cb_cls,
+ TALER_EC_ANASTASIS_REDUCER_INTERNAL_ERROR,
+ NULL);
+ cleanup_external_reducer (red_cls);
+ return;
+ }
+ else if (0 == sret)
+ {
+ char *str = GNUNET_buffer_reap_str (&red_cls->read_buffer);
+ json_t *json;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Got external reducer response: '%s'\n",
+ str);
+
+ json = json_loads (str, 0, NULL);
+
+ if (NULL == json)
+ {
+ GNUNET_break (0);
+ red_cls->action_cb (red_cls->action_cb_cls,
+ TALER_EC_ANASTASIS_REDUCER_INTERNAL_ERROR,
+ NULL);
+ cleanup_external_reducer (red_cls);
+ return;
+ }
+
+ red_cls->action_cb (red_cls->action_cb_cls,
+ TALER_EC_NONE,
+ json);
+ cleanup_external_reducer (red_cls);
+ return;
+ }
+ else
+ {
+ GNUNET_buffer_write (&red_cls->read_buffer,
+ buf,
+ sret);
+
+ red_cls->read_task = GNUNET_SCHEDULER_add_read_file (
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ GNUNET_DISK_pipe_handle (
+ red_cls->reducer_stdout,
+ GNUNET_DISK_PIPE_END_READ),
+ external_reducer_read_cb,
+ red_cls);
+ }
}
@@ -1527,116 +1654,95 @@ redux_action_external (const char *ext_reducer,
ANASTASIS_ActionCallback cb,
void *cb_cls)
{
- struct ANASTASIS_ReduxAction *act = NULL;
- int pipefd_stdout[2];
- int pipefd_stdin[2];
- pid_t pid = 0;
- int status;
- FILE *reducer_stdout;
- FILE *reducer_stdin;
- json_t *next_state;
+ char *arg_str;
+ char *state_str = json_dumps (state, JSON_COMPACT);
+ ssize_t sret;
+ struct ExternalReducerCls *red_cls = GNUNET_new (struct ExternalReducerCls);
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Using external reducer '%s'\n",
- ext_reducer);
-
- GNUNET_assert (0 == pipe (pipefd_stdout));
- GNUNET_assert (0 == pipe (pipefd_stdin));
- pid = fork ();
- if (pid == 0)
- {
- /* Child */
-
- char *arg_str = json_dumps (arguments, JSON_COMPACT);
-
- close (pipefd_stdout[0]);
- dup2 (pipefd_stdout[1], STDOUT_FILENO);
-
- close (pipefd_stdin[1]);
- dup2 (pipefd_stdin[0], STDIN_FILENO);
-
- /* Unset environment variable, otherwise anastasis-reducer
- would recursively shell out to itself. */
- unsetenv ("ANASTASIS_EXTERNAL_REDUCER");
-
- execlp (ext_reducer,
- ext_reducer,
- "-a",
- arg_str,
- action,
- NULL);
- GNUNET_assert (0);
- }
-
- /* Only parent reaches here */
-
- close (pipefd_stdout[1]);
- close (pipefd_stdin[0]);
+ if (NULL == arguments)
+ arg_str = GNUNET_strdup ("{}");
+ else
+ arg_str = json_dumps (arguments, JSON_COMPACT);
- reducer_stdout = fdopen (pipefd_stdout[0],
- "r");
- reducer_stdin = fdopen (pipefd_stdin[1],
- "w");
+ red_cls->action_cb = cb;
+ red_cls->action_cb_cls = cb_cls;
- GNUNET_assert (0 == json_dumpf (state,
- reducer_stdin,
- JSON_COMPACT));
+ GNUNET_assert (NULL != (red_cls->reducer_stdin = GNUNET_DISK_pipe (
+ GNUNET_DISK_PF_NONE)));
+ GNUNET_assert (NULL != (red_cls->reducer_stdout = GNUNET_DISK_pipe (
+ GNUNET_DISK_PF_NONE)));
- GNUNET_assert (0 == fclose (reducer_stdin));
- reducer_stdin = NULL;
+ /* By the time we're here, this variable should be unset, because
+ otherwise using anastasis-reducer as the external reducer
+ will lead to infinite recursion. */
+ GNUNET_assert (NULL == getenv ("ANASTASIS_EXTERNAL_REDUCER"));
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Wrote old state to reducer stdin.\n");
-
+ "Starting external reducer with action '%s' and argument '%s'\n",
+ action,
+ arg_str);
+
+ red_cls->reducer_process = GNUNET_OS_start_process (GNUNET_OS_INHERIT_STD_ERR,
+ red_cls->reducer_stdin,
+ red_cls->reducer_stdout,
+ NULL,
+ ext_reducer,
+ ext_reducer,
+ "-a",
+ arg_str,
+ action,
+ NULL);
+
+ GNUNET_free (arg_str);
+
+ if (NULL == red_cls->reducer_process)
{
- json_error_t err;
-
- next_state = json_loadf (reducer_stdout,
- 0,
- &err);
-
- if (NULL == next_state)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "External reducer did not output valid JSON: %s:%d:%d %s\n",
- err.source,
- err.line,
- err.column,
- err.text);
- GNUNET_assert (0 == fclose (reducer_stdout));
- waitpid (pid, &status, 0);
- return NULL;
- }
+ GNUNET_break (0);
+ GNUNET_free (state_str);
+ cleanup_external_reducer (red_cls);
+ return NULL;
}
- /* FIXME: report error instead! */
- GNUNET_assert (NULL != next_state);
-
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Waiting for external reducer to terminate.\n");
- GNUNET_assert (0 == fclose (reducer_stdout));
- reducer_stdout = NULL;
- waitpid (pid, &status, 0);
+ /* Close pipe ends we don't use. */
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_DISK_pipe_close_end (red_cls->reducer_stdin,
+ GNUNET_DISK_PIPE_END_READ));
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_DISK_pipe_close_end (red_cls->reducer_stdout,
+ GNUNET_DISK_PIPE_END_WRITE));
+
+ sret = GNUNET_DISK_file_write_blocking (GNUNET_DISK_pipe_handle (
+ red_cls->reducer_stdin,
+ GNUNET_DISK_PIPE_END_WRITE),
+ state_str,
+ strlen (state_str));
+ GNUNET_free (state_str);
+ if (sret <= 0)
+ {
+ GNUNET_break (0);
+ cleanup_external_reducer (red_cls);
+ return NULL;
+ }
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "External reducer finished with exit status '%d'\n",
- status);
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_DISK_pipe_close_end (red_cls->reducer_stdin,
+ GNUNET_DISK_PIPE_END_WRITE));
- act = GNUNET_new (struct ANASTASIS_ReduxAction);
- /* Callback is called immediately, cleanup must never be called */
- act->cleanup = &dummy_cleanup;
+ red_cls->read_task = GNUNET_SCHEDULER_add_read_file (
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ GNUNET_DISK_pipe_handle (
+ red_cls->reducer_stdout,
+ GNUNET_DISK_PIPE_END_READ),
+ external_reducer_read_cb,
+ red_cls);
{
- struct ExternalReduxCls *sched_cls = GNUNET_new (struct ExternalReduxCls);
-
- sched_cls->cb = cb;
- sched_cls->cb_cls = cb_cls;
- sched_cls->new_state = next_state;
- GNUNET_SCHEDULER_add_now (external_redux_done,
- sched_cls);
+ struct ANASTASIS_ReduxAction *ra = GNUNET_new (struct
+ ANASTASIS_ReduxAction);
+ ra->cleanup_cls = red_cls;
+ ra->cleanup = cleanup_external_reducer;
+ return ra;
}
-
- return act;
}
@@ -1699,16 +1805,18 @@ ANASTASIS_redux_action (const json_t *state,
const char *s = json_string_value (json_object_get (state,
"backup_state"));
enum ANASTASIS_GenericState gs;
- const char *ext_reducer = getenv ("ANASTASIS_EXTERNAL_REDUCER");
/* If requested, handle action with external reducer, used for testing. */
- if (NULL != ext_reducer)
- return redux_action_external (ext_reducer,
- state,
- action,
- arguments,
- cb,
- cb_cls);
+ {
+ const char *ext_reducer = ANASTASIS_REDUX_probe_external_reducer ();
+ if (NULL != ext_reducer)
+ return redux_action_external (ext_reducer,
+ state,
+ action,
+ arguments,
+ cb,
+ cb_cls);
+ }
if (NULL == s)
{