diff options
author | Florian Dold <florian@dold.me> | 2021-10-06 10:54:04 +0200 |
---|---|---|
committer | Florian Dold <florian@dold.me> | 2021-10-06 10:54:04 +0200 |
commit | f5a6b377fb13f46b073a892410237c54eceee4ed (patch) | |
tree | 87565625b7d6d90f6dfff3452e7d1bea6f16fc6e /src | |
parent | be5e8be888f4e2ab033c12bbb37b1eec2478aa37 (diff) | |
download | anastasis-f5a6b377fb13f46b073a892410237c54eceee4ed.tar.gz anastasis-f5a6b377fb13f46b073a892410237c54eceee4ed.tar.bz2 anastasis-f5a6b377fb13f46b073a892410237c54eceee4ed.zip |
async external reducer, external reducer for start state
Diffstat (limited to 'src')
-rw-r--r-- | src/reducer/anastasis_api_backup_redux.c | 62 | ||||
-rw-r--r-- | src/reducer/anastasis_api_recovery_redux.c | 62 | ||||
-rw-r--r-- | src/reducer/anastasis_api_redux.c | 354 | ||||
-rw-r--r-- | src/reducer/anastasis_api_redux.h | 10 |
4 files changed, 365 insertions, 123 deletions
diff --git a/src/reducer/anastasis_api_backup_redux.c b/src/reducer/anastasis_api_backup_redux.c index 27b5730..cb3bd5a 100644 --- a/src/reducer/anastasis_api_backup_redux.c +++ b/src/reducer/anastasis_api_backup_redux.c @@ -177,6 +177,68 @@ json_t * ANASTASIS_backup_start (const struct GNUNET_CONFIGURATION_Handle *cfg) { json_t *initial_state; + const char *external_reducer = ANASTASIS_REDUX_probe_external_reducer (); + + if (NULL != external_reducer) + { + int pipefd_stdout[2]; + pid_t pid = 0; + int status; + FILE *reducer_stdout; + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Using external reducer '%s' for backup start status\n", + external_reducer); + + GNUNET_assert (0 == pipe (pipefd_stdout)); + pid = fork (); + if (pid == 0) + { + close (pipefd_stdout[0]); + dup2 (pipefd_stdout[1], STDOUT_FILENO); + execlp (external_reducer, + external_reducer, + "-b", + NULL); + GNUNET_assert (0); + } + + close (pipefd_stdout[1]); + reducer_stdout = fdopen (pipefd_stdout[0], + "r"); + { + json_error_t err; + + initial_state = json_loadf (reducer_stdout, + 0, + &err); + + if (NULL == initial_state) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "External reducer did not output valid JSON: %s:%d:%d %s\n", + err.source, + err.line, + err.column, + err.text); + GNUNET_assert (0 == fclose (reducer_stdout)); + waitpid (pid, &status, 0); + return NULL; + } + } + + GNUNET_assert (NULL != initial_state); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Waiting for external reducer to terminate.\n"); + GNUNET_assert (0 == fclose (reducer_stdout)); + reducer_stdout = NULL; + waitpid (pid, &status, 0); + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "External reducer finished with exit status '%d'\n", + status); + return initial_state; + } (void) cfg; initial_state = ANASTASIS_REDUX_load_continents_ (); diff --git a/src/reducer/anastasis_api_recovery_redux.c b/src/reducer/anastasis_api_recovery_redux.c index 59f5ae3..94e5e10 100644 --- a/src/reducer/anastasis_api_recovery_redux.c +++ b/src/reducer/anastasis_api_recovery_redux.c @@ -83,6 +83,68 @@ json_t * ANASTASIS_recovery_start (const struct GNUNET_CONFIGURATION_Handle *cfg) { json_t *initial_state; + const char *external_reducer = ANASTASIS_REDUX_probe_external_reducer (); + + if (NULL != external_reducer) + { + int pipefd_stdout[2]; + pid_t pid = 0; + int status; + FILE *reducer_stdout; + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Using external reducer '%s' for recovery start status\n", + external_reducer); + + GNUNET_assert (0 == pipe (pipefd_stdout)); + pid = fork (); + if (pid == 0) + { + close (pipefd_stdout[0]); + dup2 (pipefd_stdout[1], STDOUT_FILENO); + execlp (external_reducer, + external_reducer, + "-r", + NULL); + GNUNET_assert (0); + } + + close (pipefd_stdout[1]); + reducer_stdout = fdopen (pipefd_stdout[0], + "r"); + { + json_error_t err; + + initial_state = json_loadf (reducer_stdout, + 0, + &err); + + if (NULL == initial_state) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "External reducer did not output valid JSON: %s:%d:%d %s\n", + err.source, + err.line, + err.column, + err.text); + GNUNET_assert (0 == fclose (reducer_stdout)); + waitpid (pid, &status, 0); + return NULL; + } + } + + GNUNET_assert (NULL != initial_state); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Waiting for external reducer to terminate.\n"); + GNUNET_assert (0 == fclose (reducer_stdout)); + reducer_stdout = NULL; + waitpid (pid, &status, 0); + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "External reducer finished with exit status '%d'\n", + status); + return initial_state; + } (void) cfg; initial_state = ANASTASIS_REDUX_load_continents_ (); diff --git a/src/reducer/anastasis_api_redux.c b/src/reducer/anastasis_api_redux.c index 92801f4..59770f3 100644 --- a/src/reducer/anastasis_api_redux.c +++ b/src/reducer/anastasis_api_redux.c @@ -234,6 +234,26 @@ static json_t *redux_countries; */ static json_t *provider_list; +/** + * External reducer binary or NULL + * to use internal reducer. + */ +static char *external_reducer_binary; + + +const char * +ANASTASIS_REDUX_probe_external_reducer (void) +{ + if (NULL != external_reducer_binary) + return external_reducer_binary; + external_reducer_binary = getenv ("ANASTASIS_EXTERNAL_REDUCER"); + if (NULL != external_reducer_binary) + unsetenv ("ANASTASIS_EXTERNAL_REDUCER"); + + return external_reducer_binary; + +} + /** * Extract the mode of a state from json @@ -1480,38 +1500,145 @@ typedef struct ANASTASIS_ReduxAction * ANASTASIS_ActionCallback cb, void *cb_cls); + /** - * Dummy cleanup function. + * Closure for read operations on the external reducer. */ -static void -dummy_cleanup (void *cls) +struct ExternalReducerCls { - GNUNET_assert (0); -} - + struct GNUNET_Buffer read_buffer; + struct GNUNET_SCHEDULER_Task *read_task; + struct GNUNET_DISK_PipeHandle *reducer_stdin; + struct GNUNET_DISK_PipeHandle *reducer_stdout; + struct GNUNET_OS_Process *reducer_process; + ANASTASIS_ActionCallback action_cb; + void *action_cb_cls; +}; /** - * Closure for external_redux_done. + * Clean up and destroy the external reducer state. + * + * @param cls closure, a 'struct ExternalReducerCls *' */ -struct ExternalReduxCls +static void +cleanup_external_reducer (void *cls) { - ANASTASIS_ActionCallback cb; - void *cb_cls; - json_t *new_state; -}; + struct ExternalReducerCls *red_cls = cls; + + if (NULL != red_cls->read_task) + { + GNUNET_SCHEDULER_cancel (red_cls->read_task); + red_cls->read_task = NULL; + } + + GNUNET_buffer_clear (&red_cls->read_buffer); + if (NULL != red_cls->reducer_stdin) + { + GNUNET_DISK_pipe_close (red_cls->reducer_stdin); + red_cls->reducer_stdin = NULL; + } + if (NULL != red_cls->reducer_stdout) + { + GNUNET_DISK_pipe_close (red_cls->reducer_stdout); + red_cls->reducer_stdout = NULL; + } + + if (NULL != red_cls->reducer_process) + { + enum GNUNET_OS_ProcessStatusType type; + unsigned long code; + enum GNUNET_GenericReturnValue pwret; + + pwret = GNUNET_OS_process_wait_status (red_cls->reducer_process, + &type, + &code); + + GNUNET_assert (GNUNET_SYSERR != pwret); + if (GNUNET_NO == pwret) + { + GNUNET_OS_process_kill (red_cls->reducer_process, + SIGTERM); + GNUNET_assert (GNUNET_SYSERR != GNUNET_OS_process_wait ( + red_cls->reducer_process)); + } + + GNUNET_OS_process_destroy (red_cls->reducer_process); + red_cls->reducer_process = NULL; + } + + GNUNET_free (red_cls); +} /** - * Callback called when the redux action has been processed by - * the external reducer. + * Task called when + * + * @param cls closure, a 'struct ExternalReducerCls *' */ static void -external_redux_done (void *cls) +external_reducer_read_cb (void *cls) { - struct ExternalReduxCls *erc = cls; - erc->cb (erc->cb_cls, - TALER_EC_NONE, - erc->new_state); + struct ExternalReducerCls *red_cls = cls; + ssize_t sret; + char buf[256]; + + red_cls->read_task = NULL; + + sret = GNUNET_DISK_file_read (GNUNET_DISK_pipe_handle ( + red_cls->reducer_stdout, + GNUNET_DISK_PIPE_END_READ), + buf, + 256); + if (sret < 0) + { + GNUNET_break (0); + red_cls->action_cb (red_cls->action_cb_cls, + TALER_EC_ANASTASIS_REDUCER_INTERNAL_ERROR, + NULL); + cleanup_external_reducer (red_cls); + return; + } + else if (0 == sret) + { + char *str = GNUNET_buffer_reap_str (&red_cls->read_buffer); + json_t *json; + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Got external reducer response: '%s'\n", + str); + + json = json_loads (str, 0, NULL); + + if (NULL == json) + { + GNUNET_break (0); + red_cls->action_cb (red_cls->action_cb_cls, + TALER_EC_ANASTASIS_REDUCER_INTERNAL_ERROR, + NULL); + cleanup_external_reducer (red_cls); + return; + } + + red_cls->action_cb (red_cls->action_cb_cls, + TALER_EC_NONE, + json); + cleanup_external_reducer (red_cls); + return; + } + else + { + GNUNET_buffer_write (&red_cls->read_buffer, + buf, + sret); + + red_cls->read_task = GNUNET_SCHEDULER_add_read_file ( + GNUNET_TIME_UNIT_FOREVER_REL, + GNUNET_DISK_pipe_handle ( + red_cls->reducer_stdout, + GNUNET_DISK_PIPE_END_READ), + external_reducer_read_cb, + red_cls); + } } @@ -1527,116 +1654,95 @@ redux_action_external (const char *ext_reducer, ANASTASIS_ActionCallback cb, void *cb_cls) { - struct ANASTASIS_ReduxAction *act = NULL; - int pipefd_stdout[2]; - int pipefd_stdin[2]; - pid_t pid = 0; - int status; - FILE *reducer_stdout; - FILE *reducer_stdin; - json_t *next_state; + char *arg_str; + char *state_str = json_dumps (state, JSON_COMPACT); + ssize_t sret; + struct ExternalReducerCls *red_cls = GNUNET_new (struct ExternalReducerCls); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Using external reducer '%s'\n", - ext_reducer); - - GNUNET_assert (0 == pipe (pipefd_stdout)); - GNUNET_assert (0 == pipe (pipefd_stdin)); - pid = fork (); - if (pid == 0) - { - /* Child */ - - char *arg_str = json_dumps (arguments, JSON_COMPACT); - - close (pipefd_stdout[0]); - dup2 (pipefd_stdout[1], STDOUT_FILENO); - - close (pipefd_stdin[1]); - dup2 (pipefd_stdin[0], STDIN_FILENO); - - /* Unset environment variable, otherwise anastasis-reducer - would recursively shell out to itself. */ - unsetenv ("ANASTASIS_EXTERNAL_REDUCER"); - - execlp (ext_reducer, - ext_reducer, - "-a", - arg_str, - action, - NULL); - GNUNET_assert (0); - } - - /* Only parent reaches here */ - - close (pipefd_stdout[1]); - close (pipefd_stdin[0]); + if (NULL == arguments) + arg_str = GNUNET_strdup ("{}"); + else + arg_str = json_dumps (arguments, JSON_COMPACT); - reducer_stdout = fdopen (pipefd_stdout[0], - "r"); - reducer_stdin = fdopen (pipefd_stdin[1], - "w"); + red_cls->action_cb = cb; + red_cls->action_cb_cls = cb_cls; - GNUNET_assert (0 == json_dumpf (state, - reducer_stdin, - JSON_COMPACT)); + GNUNET_assert (NULL != (red_cls->reducer_stdin = GNUNET_DISK_pipe ( + GNUNET_DISK_PF_NONE))); + GNUNET_assert (NULL != (red_cls->reducer_stdout = GNUNET_DISK_pipe ( + GNUNET_DISK_PF_NONE))); - GNUNET_assert (0 == fclose (reducer_stdin)); - reducer_stdin = NULL; + /* By the time we're here, this variable should be unset, because + otherwise using anastasis-reducer as the external reducer + will lead to infinite recursion. */ + GNUNET_assert (NULL == getenv ("ANASTASIS_EXTERNAL_REDUCER")); GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Wrote old state to reducer stdin.\n"); - + "Starting external reducer with action '%s' and argument '%s'\n", + action, + arg_str); + + red_cls->reducer_process = GNUNET_OS_start_process (GNUNET_OS_INHERIT_STD_ERR, + red_cls->reducer_stdin, + red_cls->reducer_stdout, + NULL, + ext_reducer, + ext_reducer, + "-a", + arg_str, + action, + NULL); + + GNUNET_free (arg_str); + + if (NULL == red_cls->reducer_process) { - json_error_t err; - - next_state = json_loadf (reducer_stdout, - 0, - &err); - - if (NULL == next_state) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "External reducer did not output valid JSON: %s:%d:%d %s\n", - err.source, - err.line, - err.column, - err.text); - GNUNET_assert (0 == fclose (reducer_stdout)); - waitpid (pid, &status, 0); - return NULL; - } + GNUNET_break (0); + GNUNET_free (state_str); + cleanup_external_reducer (red_cls); + return NULL; } - /* FIXME: report error instead! */ - GNUNET_assert (NULL != next_state); - - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Waiting for external reducer to terminate.\n"); - GNUNET_assert (0 == fclose (reducer_stdout)); - reducer_stdout = NULL; - waitpid (pid, &status, 0); + /* Close pipe ends we don't use. */ + GNUNET_assert (GNUNET_OK == + GNUNET_DISK_pipe_close_end (red_cls->reducer_stdin, + GNUNET_DISK_PIPE_END_READ)); + GNUNET_assert (GNUNET_OK == + GNUNET_DISK_pipe_close_end (red_cls->reducer_stdout, + GNUNET_DISK_PIPE_END_WRITE)); + + sret = GNUNET_DISK_file_write_blocking (GNUNET_DISK_pipe_handle ( + red_cls->reducer_stdin, + GNUNET_DISK_PIPE_END_WRITE), + state_str, + strlen (state_str)); + GNUNET_free (state_str); + if (sret <= 0) + { + GNUNET_break (0); + cleanup_external_reducer (red_cls); + return NULL; + } - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "External reducer finished with exit status '%d'\n", - status); + GNUNET_assert (GNUNET_OK == + GNUNET_DISK_pipe_close_end (red_cls->reducer_stdin, + GNUNET_DISK_PIPE_END_WRITE)); - act = GNUNET_new (struct ANASTASIS_ReduxAction); - /* Callback is called immediately, cleanup must never be called */ - act->cleanup = &dummy_cleanup; + red_cls->read_task = GNUNET_SCHEDULER_add_read_file ( + GNUNET_TIME_UNIT_FOREVER_REL, + GNUNET_DISK_pipe_handle ( + red_cls->reducer_stdout, + GNUNET_DISK_PIPE_END_READ), + external_reducer_read_cb, + red_cls); { - struct ExternalReduxCls *sched_cls = GNUNET_new (struct ExternalReduxCls); - - sched_cls->cb = cb; - sched_cls->cb_cls = cb_cls; - sched_cls->new_state = next_state; - GNUNET_SCHEDULER_add_now (external_redux_done, - sched_cls); + struct ANASTASIS_ReduxAction *ra = GNUNET_new (struct + ANASTASIS_ReduxAction); + ra->cleanup_cls = red_cls; + ra->cleanup = cleanup_external_reducer; + return ra; } - - return act; } @@ -1699,16 +1805,18 @@ ANASTASIS_redux_action (const json_t *state, const char *s = json_string_value (json_object_get (state, "backup_state")); enum ANASTASIS_GenericState gs; - const char *ext_reducer = getenv ("ANASTASIS_EXTERNAL_REDUCER"); /* If requested, handle action with external reducer, used for testing. */ - if (NULL != ext_reducer) - return redux_action_external (ext_reducer, - state, - action, - arguments, - cb, - cb_cls); + { + const char *ext_reducer = ANASTASIS_REDUX_probe_external_reducer (); + if (NULL != ext_reducer) + return redux_action_external (ext_reducer, + state, + action, + arguments, + cb, + cb_cls); + } if (NULL == s) { diff --git a/src/reducer/anastasis_api_redux.h b/src/reducer/anastasis_api_redux.h index 4d62d5e..b4fe5c4 100644 --- a/src/reducer/anastasis_api_redux.h +++ b/src/reducer/anastasis_api_redux.h @@ -328,6 +328,16 @@ ANASTASIS_backup_action_ (json_t *state, /** + * Check if an external reducer binary is requested. + * Cache the result and unset the corresponding environment + * variable. + * + * @returns name of the external reducer or NULL to user internal reducer + */ +const char * +ANASTASIS_REDUX_probe_external_reducer (void); + +/** * Generic container for an action with asynchronous activities. */ struct ANASTASIS_ReduxAction |