summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/exchange/taler-exchange-httpd.c2
-rw-r--r--src/exchange/taler-exchange-httpd_parsing.c202
-rw-r--r--src/exchange/test_taler_exchange_aggregator.c385
3 files changed, 388 insertions, 201 deletions
diff --git a/src/exchange/taler-exchange-httpd.c b/src/exchange/taler-exchange-httpd.c
index 2ef58b0f7..93636b00c 100644
--- a/src/exchange/taler-exchange-httpd.c
+++ b/src/exchange/taler-exchange-httpd.c
@@ -121,7 +121,7 @@ handle_mhd_completion_callback (void *cls,
/**
- * Handle a request coming from libmicrohttpd.
+ * Handle incoming HTTP request.
*
* @param cls closure for MHD daemon (unused)
* @param connection the connection
diff --git a/src/exchange/taler-exchange-httpd_parsing.c b/src/exchange/taler-exchange-httpd_parsing.c
index f6e367813..beac81a1c 100644
--- a/src/exchange/taler-exchange-httpd_parsing.c
+++ b/src/exchange/taler-exchange-httpd_parsing.c
@@ -24,122 +24,18 @@
#include "platform.h"
#include <gnunet/gnunet_util_lib.h>
+#include <gnunet/gnunet_json_lib.h>
#include "taler_json_lib.h"
#include "taler-exchange-httpd_parsing.h"
#include "taler-exchange-httpd_responses.h"
/**
- * Initial size for POST request buffer.
- */
-#define REQUEST_BUFFER_INITIAL (2*1024)
-
-/**
* Maximum POST request size.
*/
#define REQUEST_BUFFER_MAX (1024*1024)
-/**
- * Buffer for POST requests.
- */
-struct Buffer
-{
- /**
- * Allocated memory
- */
- char *data;
-
- /**
- * Number of valid bytes in buffer.
- */
- size_t fill;
-
- /**
- * Number of allocated bytes in buffer.
- */
- size_t alloc;
-};
-
-
-/**
- * Initialize a buffer.
- *
- * @param buf the buffer to initialize
- * @param data the initial data
- * @param data_size size of the initial data
- * @param alloc_size size of the buffer
- * @param max_size maximum size that the buffer can grow to
- * @return a GNUnet result code
- */
-static int
-buffer_init (struct Buffer *buf,
- const void *data,
- size_t data_size,
- size_t alloc_size,
- size_t max_size)
-{
- if (data_size > max_size || alloc_size > max_size)
- return GNUNET_SYSERR;
- if (data_size > alloc_size)
- alloc_size = data_size;
- buf->data = GNUNET_malloc (alloc_size);
- memcpy (buf->data, data, data_size);
- return GNUNET_OK;
-}
-
-
-/**
- * Free the data in a buffer. Does *not* free
- * the buffer object itself.
- *
- * @param buf buffer to de-initialize
- */
-static void
-buffer_deinit (struct Buffer *buf)
-{
- GNUNET_free (buf->data);
- buf->data = NULL;
-}
-
-
-/**
- * Append data to a buffer, growing the buffer if necessary.
- *
- * @param buf the buffer to append to
- * @param data the data to append
- * @param data_size the size of @a data
- * @param max_size maximum size that the buffer can grow to
- * @return #GNUNET_OK on success,
- * #GNUNET_NO if the buffer can't accomodate for the new data
- */
-static int
-buffer_append (struct Buffer *buf,
- const void *data,
- size_t data_size,
- size_t max_size)
-{
- if (buf->fill + data_size > max_size)
- return GNUNET_NO;
- if (data_size + buf->fill > buf->alloc)
- {
- char *new_buf;
- size_t new_size = buf->alloc;
- while (new_size < buf->fill + data_size)
- new_size += 2;
- if (new_size > max_size)
- return GNUNET_NO;
- new_buf = GNUNET_malloc (new_size);
- memcpy (new_buf, buf->data, buf->fill);
- GNUNET_free (buf->data);
- buf->data = new_buf;
- buf->alloc = new_size;
- }
- memcpy (buf->data + buf->fill, data, data_size);
- buf->fill += data_size;
- return GNUNET_OK;
-}
-
/**
* Process a POST request containing a JSON object. This function
@@ -171,75 +67,37 @@ TMH_PARSE_post_json (struct MHD_Connection *connection,
size_t *upload_data_size,
json_t **json)
{
- struct Buffer *r = *con_cls;
-
- *json = NULL;
- if (NULL == *con_cls)
- {
- /* We are seeing a fresh POST request. */
- r = GNUNET_new (struct Buffer);
- if (GNUNET_OK !=
- buffer_init (r,
- upload_data,
- *upload_data_size,
- REQUEST_BUFFER_INITIAL,
- REQUEST_BUFFER_MAX))
- {
- *con_cls = NULL;
- buffer_deinit (r);
- GNUNET_free (r);
- return (MHD_NO ==
- TMH_RESPONSE_reply_internal_error (connection,
- "out of memory"))
- ? GNUNET_SYSERR : GNUNET_NO;
- }
- /* everything OK, wait for more POST data */
- *upload_data_size = 0;
- *con_cls = r;
- return GNUNET_YES;
- }
- if (0 != *upload_data_size)
+ enum GNUNET_JSON_PostResult pr;
+
+ pr = GNUNET_JSON_post_parser (REQUEST_BUFFER_MAX,
+ con_cls,
+ upload_data,
+ upload_data_size,
+ json);
+ switch (pr)
{
- /* We are seeing an old request with more data available. */
-
- if (GNUNET_OK !=
- buffer_append (r,
- upload_data,
- *upload_data_size,
- REQUEST_BUFFER_MAX))
- {
- /* Request too long */
- *con_cls = NULL;
- buffer_deinit (r);
- GNUNET_free (r);
- return (MHD_NO ==
- TMH_RESPONSE_reply_request_too_large (connection))
- ? GNUNET_SYSERR : GNUNET_NO;
- }
- /* everything OK, wait for more POST data */
- *upload_data_size = 0;
+ case GNUNET_JSON_PR_OUT_OF_MEMORY:
+ return (MHD_NO ==
+ TMH_RESPONSE_reply_internal_error (connection,
+ "out of memory"))
+ ? GNUNET_SYSERR : GNUNET_NO;
+ case GNUNET_JSON_PR_CONTINUE:
return GNUNET_YES;
- }
-
- /* We have seen the whole request. */
-
- *json = json_loadb (r->data,
- r->fill,
- 0,
- NULL);
- if (NULL == *json)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Failed to parse JSON request body\n");
+ case GNUNET_JSON_PR_REQUEST_TOO_LARGE:
+ return (MHD_NO ==
+ TMH_RESPONSE_reply_request_too_large (connection))
+ ? GNUNET_SYSERR : GNUNET_NO;
+ case GNUNET_JSON_PR_JSON_INVALID:
return (MHD_YES ==
TMH_RESPONSE_reply_invalid_json (connection))
? GNUNET_NO : GNUNET_SYSERR;
+ case GNUNET_JSON_PR_SUCCESS:
+ GNUNET_break (NULL != *json);
+ return GNUNET_YES;
}
- buffer_deinit (r);
- GNUNET_free (r);
- *con_cls = NULL;
-
- return GNUNET_YES;
+ /* this should never happen */
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
}
@@ -253,13 +111,7 @@ TMH_PARSE_post_json (struct MHD_Connection *connection,
void
TMH_PARSE_post_cleanup_callback (void *con_cls)
{
- struct Buffer *r = con_cls;
-
- if (NULL != r)
- {
- buffer_deinit (r);
- GNUNET_free (r);
- }
+ GNUNET_JSON_post_parser_cleanup (con_cls);
}
diff --git a/src/exchange/test_taler_exchange_aggregator.c b/src/exchange/test_taler_exchange_aggregator.c
index d4821bbd1..f239461f4 100644
--- a/src/exchange/test_taler_exchange_aggregator.c
+++ b/src/exchange/test_taler_exchange_aggregator.c
@@ -22,6 +22,71 @@
#include "platform.h"
#include "taler_util.h"
#include "taler_exchangedb_plugin.h"
+#include <microhttpd.h>
+
+/**
+ * Commands for the interpreter.
+ */
+enum OpCode {
+
+ /**
+ * Terminate testcase with 'skipped' result.
+ */
+ OPCODE_TERMINATE_SKIP,
+
+ /**
+ * Run taler-exchange-aggregator.
+ */
+ OPCODE_RUN_AGGREGATOR,
+
+ /**
+ * Finish testcase with success.
+ */
+ OPCODE_TERMINATE_SUCCESS
+};
+
+/**
+ * Command state for the interpreter.
+ */
+struct Command
+{
+
+ enum OpCode opcode;
+
+};
+
+
+/**
+ * State of the interpreter.
+ */
+struct State
+{
+ /**
+ * Array of commands to run.
+ */
+ struct Command* commands;
+
+ /**
+ * Offset of the next command to be run.
+ */
+ unsigned int ioff;
+};
+
+
+/**
+ * Pipe used to communicate child death via signal.
+ */
+static struct GNUNET_DISK_PipeHandle *sigpipe;
+
+/**
+ * ID of task called whenever we get a SIGCHILD.
+ */
+static struct GNUNET_SCHEDULER_Task *child_death_task;
+
+/**
+ * ID of task called whenever are shutting down.
+ */
+static struct GNUNET_SCHEDULER_Task *shutdown_task;
/**
* Return value from main().
@@ -33,26 +98,257 @@ static int result;
*/
static char *config_filename;
+/**
+ * Database plugin.
+ */
+static struct TALER_EXCHANGEDB_Plugin *plugin;
+
+/**
+ * Our session with the database.
+ */
+static struct TALER_EXCHANGEDB_Session *session;
/**
- * Runs the aggregator process.
+ * The handle for the aggregator process that we are testing.
+ */
+static struct GNUNET_OS_Process *aggregator_proc;
+
+/**
+ * State of our interpreter while we are running the aggregator
+ * process.
+ */
+static struct State *aggregator_state;
+
+/**
+ * HTTP server we run to pretend to be the "test" bank.
+ */
+static struct MHD_Daemon *mhd_bank;
+
+/**
+ * Task running HTTP server for the "test" bank.
+ */
+static struct GNUNET_SCHEDULER_Task *mhd_task;
+
+
+/**
+ * Interprets the commands from the test program.
+ *
+ * @param cls the `struct State` of the interpreter
+ * @param tc scheduler context
*/
static void
-run_aggregator ()
+interpreter (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
+/**
+ * Task triggered whenever we are to shutdown.
+ *
+ * @param cls closure, NULL if we need to self-restart
+ * @param tc context
+ */
+static void
+shutdown_action (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
{
- struct GNUNET_OS_Process *proc;
-
- proc = GNUNET_OS_start_process (GNUNET_NO,
- GNUNET_OS_INHERIT_STD_ALL,
- NULL, NULL, NULL,
- "taler-exchange-aggregator",
- "taler-exchange-aggregator",
- /* "-c", config_filename, */
- "-d", "test-exchange-home",
- "-t", /* enable temporary tables */
- NULL);
- GNUNET_OS_process_wait (proc);
- GNUNET_OS_process_destroy (proc);
+ shutdown_task = NULL;
+ if (NULL != mhd_task)
+ {
+ GNUNET_SCHEDULER_cancel (mhd_task);
+ mhd_task = NULL;
+ }
+ if (NULL != mhd_bank)
+ {
+ MHD_stop_daemon (mhd_bank);
+ mhd_bank = NULL;
+ }
+ if (NULL == aggregator_proc)
+ {
+ GNUNET_SCHEDULER_cancel (child_death_task);
+ child_death_task = NULL;
+ }
+ else
+ {
+ GNUNET_break (0 == GNUNET_OS_process_kill (aggregator_proc,
+ SIGKILL));
+ }
+ plugin->drop_temporary (plugin->cls,
+ session);
+ TALER_EXCHANGEDB_plugin_unload (plugin);
+ plugin = NULL;
+}
+
+
+/**
+ * Task triggered whenever we receive a SIGCHLD (child
+ * process died).
+ *
+ * @param cls closure, NULL if we need to self-restart
+ * @param tc context
+ */
+static void
+maint_child_death (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ const struct GNUNET_DISK_FileHandle *pr;
+ char c[16];
+ struct State *state;
+
+ child_death_task = NULL;
+ pr = GNUNET_DISK_pipe_handle (sigpipe, GNUNET_DISK_PIPE_END_READ);
+ if (0 == (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY))
+ {
+ /* shutdown scheduled us, ignore! */
+ child_death_task =
+ GNUNET_SCHEDULER_add_read_file (GNUNET_TIME_UNIT_FOREVER_REL,
+ pr,
+ &maint_child_death,
+ NULL);
+ return;
+ }
+ GNUNET_break (0 < GNUNET_DISK_file_read (pr, &c, sizeof (c)));
+ GNUNET_OS_process_wait (aggregator_proc);
+ GNUNET_OS_process_destroy (aggregator_proc);
+ aggregator_proc = NULL;
+ aggregator_state->ioff++;
+ state = aggregator_state;
+ aggregator_state = NULL;
+ interpreter (state, NULL);
+ if (NULL == shutdown_task)
+ return;
+ child_death_task = GNUNET_SCHEDULER_add_read_file (GNUNET_TIME_UNIT_FOREVER_REL,
+ pr,
+ &maint_child_death, NULL);
+
+}
+
+
+/**
+ * Interprets the commands from the test program.
+ *
+ * @param cls the `struct State` of the interpreter
+ * @param tc scheduler context
+ */
+static void
+interpreter (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct State *state = cls;
+ struct Command *cmd = &state->commands[state->ioff];
+
+ switch (cmd->opcode)
+ {
+ case OPCODE_TERMINATE_SKIP:
+ /* return skip: test not finished, but did not fail either */
+ result = 77;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ case OPCODE_RUN_AGGREGATOR:
+ GNUNET_assert (NULL == aggregator_state);
+ aggregator_state = state;
+ aggregator_proc
+ = GNUNET_OS_start_process (GNUNET_NO,
+ GNUNET_OS_INHERIT_STD_ALL,
+ NULL, NULL, NULL,
+ "taler-exchange-aggregator",
+ "taler-exchange-aggregator",
+ /* "-c", config_filename, */
+ "-d", "test-exchange-home",
+ "-t", /* enable temporary tables */
+ NULL);
+ return;
+ case OPCODE_TERMINATE_SUCCESS:
+ result = 0;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
+}
+
+
+/**
+ * Contains the test program. Here each step of the testcase
+ * is defined.
+ */
+static void
+run_test ()
+{
+ static struct Command commands[] = {
+ /* FIXME: prime DB */
+ {
+ .opcode = OPCODE_RUN_AGGREGATOR
+ },
+ {
+ .opcode = OPCODE_TERMINATE_SKIP
+ }
+ };
+ static struct State state = {
+ .commands = commands
+ };
+
+ GNUNET_SCHEDULER_add_now (&interpreter,
+ &state);
+}
+
+
+/**
+ * Function called whenever MHD is done with a request. If the
+ * request was a POST, we may have stored a `struct Buffer *` in the
+ * @a con_cls that might still need to be cleaned up. Call the
+ * respective function to free the memory.
+ *
+ * @param cls client-defined closure
+ * @param connection connection handle
+ * @param con_cls value as set by the last call to
+ * the #MHD_AccessHandlerCallback
+ * @param toe reason for request termination
+ * @see #MHD_OPTION_NOTIFY_COMPLETED
+ * @ingroup request
+ */
+static void
+handle_mhd_completion_callback (void *cls,
+ struct MHD_Connection *connection,
+ void **con_cls,
+ enum MHD_RequestTerminationCode toe)
+{
+ TMH_PARSE_post_cleanup_callback (*con_cls);
+ *con_cls = NULL;
+}
+
+
+/**
+ * Handle incoming HTTP request.
+ *
+ * @param cls closure for MHD daemon (unused)
+ * @param connection the connection
+ * @param url the requested url
+ * @param method the method (POST, GET, ...)
+ * @param version HTTP version (ignored)
+ * @param upload_data request data
+ * @param upload_data_size size of @a upload_data in bytes
+ * @param con_cls closure for request (a `struct Buffer *`)
+ * @return MHD result code
+ */
+static int
+handle_mhd_request (void *cls,
+ struct MHD_Connection *connection,
+ const char *url,
+ const char *method,
+ const char *version,
+ const char *upload_data,
+ size_t *upload_data_size,
+ void **con_cls)
+{
+ if (0 != strcasecmp (url,
+ "/admin/add/incoming"))
+ {
+ /* Unexpected URI path, just close the connection. */
+ /* we're rather impolite here, but it's a testcase. */
+ GNUNET_break_op (0);
+ return MHD_NO;
+ }
+ /* FIXME: to be implemented! */
+ GNUNET_break (0);
+ return MHD_NO;
}
@@ -67,8 +363,6 @@ run (void *cls,
const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct GNUNET_CONFIGURATION_Handle *cfg = cls;
- struct TALER_EXCHANGEDB_Plugin *plugin;
- struct TALER_EXCHANGEDB_Session *session;
plugin = TALER_EXCHANGEDB_plugin_load (cfg);
if (GNUNET_OK !=
@@ -81,15 +375,48 @@ run (void *cls,
}
session = plugin->get_session (plugin->cls,
GNUNET_YES);
- /* FIXME: prime DB */
- /* FIXME: launch bank on 8082! */
- run_aggregator ();
- /* FIXME: check DB and bank */
+ GNUNET_assert (NULL != session);
+ child_death_task =
+ GNUNET_SCHEDULER_add_read_file (GNUNET_TIME_UNIT_FOREVER_REL,
+ GNUNET_DISK_pipe_handle (sigpipe,
+ GNUNET_DISK_PIPE_END_READ),
+ &maint_child_death, NULL);
+ shutdown_task =
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
+ &shutdown_action,
+ NULL);
+ result = 1; /* test failed for undefined reason */
+ mhd_bank = MHD_start_daemon (MHD_USE_SELECT_INTERNALLY | MHD_USE_DEBUG,
+ 8082,
+ NULL, NULL,
+ &handle_mhd_request, NULL,
+ MHD_OPTION_NOTIFY_COMPLETED, &handle_mhd_completion_callback, NULL,
+ MHD_OPTION_END);
+ if (NULL == mhd_bank)
+ {
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
+ mhd_task = FIXME;
+ run_test ();
+}
- plugin->drop_temporary (plugin->cls,
- session);
- TALER_EXCHANGEDB_plugin_unload (plugin);
- result = 77; /* skip: not finished */
+
+/**
+ * Signal handler called for SIGCHLD. Triggers the
+ * respective handler by writing to the trigger pipe.
+ */
+static void
+sighandler_child_death ()
+{
+ static char c;
+ int old_errno = errno; /* back-up errno */
+
+ GNUNET_break (1 ==
+ GNUNET_DISK_file_write (GNUNET_DISK_pipe_handle
+ (sigpipe, GNUNET_DISK_PIPE_END_WRITE),
+ &c, sizeof (c)));
+ errno = old_errno; /* restore errno */
}
@@ -100,6 +427,7 @@ main (int argc,
const char *plugin_name;
char *testname;
struct GNUNET_CONFIGURATION_Handle *cfg;
+ struct GNUNET_SIGNAL_Context *shc_chld;
result = -1;
if (NULL == (plugin_name = strrchr (argv[0], (int) '-')))
@@ -122,7 +450,14 @@ main (int argc,
GNUNET_free (testname);
return 2;
}
+ sigpipe = GNUNET_DISK_pipe (GNUNET_NO, GNUNET_NO, GNUNET_NO, GNUNET_NO);
+ GNUNET_assert (NULL != sigpipe);
+ shc_chld =
+ GNUNET_SIGNAL_handler_install (GNUNET_SIGCHLD, &sighandler_child_death);
GNUNET_SCHEDULER_run (&run, cfg);
+ GNUNET_SIGNAL_handler_uninstall (shc_chld);
+ shc_chld = NULL;
+ GNUNET_DISK_pipe_close (sigpipe);
GNUNET_CONFIGURATION_destroy (cfg);
GNUNET_free (config_filename);
GNUNET_free (testname);