gnunet

Main GNUnet Logic
Log | Files | Refs | Submodules | README | LICENSE

commit 64aefd7b6fb27b8625af12783201f3c87da41f47
parent 3d7d23db1764973179fe9fc0013b942692c47df5
Author: Martin Schanzenbach <schanzen@gnunet.org>
Date:   Thu, 20 Oct 2022 17:01:48 +0900

ZONEMASTER: Use parallel worker thread for GNS block signing

Diffstat:
Msrc/gnsrecord/gnsrecord_crypto.c | 233++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---------------------
Msrc/include/gnunet_gnsrecord_lib.h | 40+++++++++++++++++++++++++++++++++++++++-
Msrc/zonemaster/gnunet-service-zonemaster.c | 311++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------------
Msrc/zonemaster/zonemaster.conf.in | 1+
4 files changed, 472 insertions(+), 113 deletions(-)

diff --git a/src/gnsrecord/gnsrecord_crypto.c b/src/gnsrecord/gnsrecord_crypto.c @@ -95,7 +95,8 @@ eddsa_symmetric_decrypt ( if (ctlen < 0) return GNUNET_SYSERR; if (0 != crypto_secretbox_open_detached (result, - ((unsigned char*) block) + crypto_secretbox_MACBYTES, // Ciphertext + ((unsigned char*) block) + + crypto_secretbox_MACBYTES, // Ciphertext block, // Tag ctlen, nonce, key)) @@ -193,6 +194,116 @@ block_get_size_ecdsa (const struct GNUNET_GNSRECORD_Data *rd, return len; } +enum GNUNET_GenericReturnValue +block_sign_ecdsa (const struct + GNUNET_CRYPTO_EcdsaPrivateKey *key, + const struct + GNUNET_CRYPTO_EcdsaPublicKey *pkey, + const char *label, + struct GNUNET_GNSRECORD_Block *block) +{ + struct GNRBlockPS *gnr_block; + struct GNUNET_GNSRECORD_EcdsaBlock *ecblock; + size_t size = ntohl (block->size) - sizeof (*block) + sizeof (*gnr_block); + + gnr_block = GNUNET_malloc (size); + ecblock = &(block)->ecdsa_block; + gnr_block->purpose.size = htonl (size); + gnr_block->purpose.purpose = + htonl (GNUNET_SIGNATURE_PURPOSE_GNS_RECORD_SIGN); + gnr_block->expiration_time = ecblock->expiration_time; + /* encrypt and sign */ + GNUNET_memcpy (&gnr_block[1], &ecblock[1], + size - sizeof (*gnr_block)); + GNUNET_CRYPTO_ecdsa_public_key_derive (pkey, + label, + "gns", + &ecblock->derived_key); + if (GNUNET_OK != + GNUNET_CRYPTO_ecdsa_sign_derived (key, + label, + "gns", + &gnr_block->purpose, + &ecblock->signature)) + { + GNUNET_break (0); + GNUNET_free (gnr_block); + return GNUNET_SYSERR; + } + GNUNET_free (gnr_block); + return GNUNET_OK; +} + + +enum GNUNET_GenericReturnValue +block_sign_eddsa (const struct + GNUNET_CRYPTO_EddsaPrivateKey *key, + const struct + GNUNET_CRYPTO_EddsaPublicKey *pkey, + const char *label, + struct GNUNET_GNSRECORD_Block *block) +{ + struct GNRBlockPS *gnr_block; + struct GNUNET_GNSRECORD_EddsaBlock *edblock; + size_t size = ntohl (block->size) - sizeof (*block) + sizeof (*gnr_block); + gnr_block = GNUNET_malloc (size); + edblock = &(block)->eddsa_block; + gnr_block->purpose.size = htonl (size); + gnr_block->purpose.purpose = + htonl (GNUNET_SIGNATURE_PURPOSE_GNS_RECORD_SIGN); + gnr_block->expiration_time = edblock->expiration_time; + GNUNET_memcpy (&gnr_block[1], &edblock[1], + size - sizeof (*gnr_block)); + /* encrypt and sign */ + GNUNET_CRYPTO_eddsa_public_key_derive (pkey, + label, + "gns", + &edblock->derived_key); + GNUNET_CRYPTO_eddsa_sign_derived (key, + label, + "gns", + &gnr_block->purpose, + &edblock->signature); + GNUNET_free (gnr_block); + return GNUNET_OK; +} + + +enum GNUNET_GenericReturnValue +GNUNET_GNSRECORD_block_sign (const struct + GNUNET_IDENTITY_PrivateKey *key, + const char *label, + struct GNUNET_GNSRECORD_Block *block) +{ + struct GNUNET_IDENTITY_PublicKey pkey; + enum GNUNET_GenericReturnValue res = GNUNET_SYSERR; + char *norm_label; + + GNUNET_IDENTITY_key_get_public (key, + &pkey); + norm_label = GNUNET_GNSRECORD_string_normalize (label); + + switch (ntohl (key->type)) + { + case GNUNET_GNSRECORD_TYPE_PKEY: + res = block_sign_ecdsa (&key->ecdsa_key, + &pkey.ecdsa_key, + norm_label, + block); + break; + case GNUNET_GNSRECORD_TYPE_EDKEY: + res = block_sign_eddsa (&key->eddsa_key, + &pkey.eddsa_key, + norm_label, + block); + break; + default: + GNUNET_assert (0); + } + GNUNET_free (norm_label); + return res; +} + /** * Sign name and records @@ -204,6 +315,7 @@ block_get_size_ecdsa (const struct GNUNET_GNSRECORD_Data *rd, * @param rd record data * @param rd_count number of records * @param block the block result. Must be allocated sufficiently. + * @param sign sign the block GNUNET_NO if block will be signed later. * @return GNUNET_SYSERR on error (otherwise GNUNET_OK) */ static enum GNUNET_GenericReturnValue @@ -213,12 +325,12 @@ block_create_ecdsa (const struct GNUNET_CRYPTO_EcdsaPrivateKey *key, const char *label, const struct GNUNET_GNSRECORD_Data *rd, unsigned int rd_count, - struct GNUNET_GNSRECORD_Block **block) + struct GNUNET_GNSRECORD_Block **block, + int sign) { ssize_t payload_len = GNUNET_GNSRECORD_records_get_size (rd_count, rd); struct GNUNET_GNSRECORD_EcdsaBlock *ecblock; - struct GNRBlockPS *gnr_block; unsigned char ctr[GNUNET_CRYPTO_AES_KEY_LENGTH / 2]; unsigned char skey[GNUNET_CRYPTO_AES_KEY_LENGTH]; struct GNUNET_GNSRECORD_Data rdc[GNUNET_NZL (rd_count)]; @@ -251,7 +363,7 @@ block_create_ecdsa (const struct GNUNET_CRYPTO_EcdsaPrivateKey *key, } /* serialize */ *block = GNUNET_malloc (sizeof (struct GNUNET_GNSRECORD_Block) + payload_len); - (*block)->size = htonl(sizeof (struct GNUNET_GNSRECORD_Block) + payload_len); + (*block)->size = htonl (sizeof (struct GNUNET_GNSRECORD_Block) + payload_len); { char payload[payload_len]; @@ -260,19 +372,9 @@ block_create_ecdsa (const struct GNUNET_CRYPTO_EcdsaPrivateKey *key, rdc, payload_len, payload)); - gnr_block = GNUNET_malloc (sizeof (struct GNRBlockPS) + payload_len); ecblock = &(*block)->ecdsa_block; (*block)->type = htonl (GNUNET_GNSRECORD_TYPE_PKEY); - gnr_block->purpose.size = htonl (sizeof(struct GNRBlockPS) + payload_len); - gnr_block->purpose.purpose = - htonl (GNUNET_SIGNATURE_PURPOSE_GNS_RECORD_SIGN); - gnr_block->expiration_time = GNUNET_TIME_absolute_hton (expire); - ecblock->expiration_time = gnr_block->expiration_time; - /* encrypt and sign */ - GNUNET_CRYPTO_ecdsa_public_key_derive (pkey, - label, - "gns", - &ecblock->derived_key); + ecblock->expiration_time = GNUNET_TIME_absolute_hton (expire); GNR_derive_block_aes_key (ctr, skey, label, @@ -284,21 +386,16 @@ block_create_ecdsa (const struct GNUNET_CRYPTO_EcdsaPrivateKey *key, skey, ctr, &ecblock[1])); - GNUNET_memcpy (&gnr_block[1], &ecblock[1], payload_len); } + if (GNUNET_YES != sign) + return GNUNET_OK; if (GNUNET_OK != - GNUNET_CRYPTO_ecdsa_sign_derived (key, - label, - "gns", - &gnr_block->purpose, - &ecblock->signature)) + block_sign_ecdsa (key, pkey, label, *block)) { GNUNET_break (0); GNUNET_free (*block); - GNUNET_free (gnr_block); return GNUNET_SYSERR; } - GNUNET_free (gnr_block); return GNUNET_OK; } @@ -327,6 +424,7 @@ block_get_size_eddsa (const struct GNUNET_GNSRECORD_Data *rd, * @param rd record data * @param rd_count number of records * @param block where to store the block. Must be allocated sufficiently. + * @param sign GNUNET_YES if block shall be signed as well * @return GNUNET_SYSERR on error (otherwise GNUNET_OK) */ enum GNUNET_GenericReturnValue @@ -336,12 +434,12 @@ block_create_eddsa (const struct GNUNET_CRYPTO_EddsaPrivateKey *key, const char *label, const struct GNUNET_GNSRECORD_Data *rd, unsigned int rd_count, - struct GNUNET_GNSRECORD_Block **block) + struct GNUNET_GNSRECORD_Block **block, + int sign) { ssize_t payload_len = GNUNET_GNSRECORD_records_get_size (rd_count, rd); struct GNUNET_GNSRECORD_EddsaBlock *edblock; - struct GNRBlockPS *gnr_block; unsigned char nonce[crypto_secretbox_NONCEBYTES]; unsigned char skey[crypto_secretbox_KEYBYTES]; struct GNUNET_GNSRECORD_Data rdc[GNUNET_NZL (rd_count)]; @@ -375,8 +473,8 @@ block_create_eddsa (const struct GNUNET_CRYPTO_EddsaPrivateKey *key, /* serialize */ *block = GNUNET_malloc (sizeof (struct GNUNET_GNSRECORD_Block) + payload_len + crypto_secretbox_MACBYTES); - (*block)->size = htonl(sizeof (struct GNUNET_GNSRECORD_Block) - + payload_len + crypto_secretbox_MACBYTES); + (*block)->size = htonl (sizeof (struct GNUNET_GNSRECORD_Block) + + payload_len + crypto_secretbox_MACBYTES); { char payload[payload_len]; @@ -385,24 +483,9 @@ block_create_eddsa (const struct GNUNET_CRYPTO_EddsaPrivateKey *key, rdc, payload_len, payload)); - gnr_block = GNUNET_malloc (sizeof (struct GNRBlockPS) - + payload_len - + crypto_secretbox_MACBYTES); edblock = &(*block)->eddsa_block; (*block)->type = htonl (GNUNET_GNSRECORD_TYPE_EDKEY); - gnr_block->purpose.size = - htonl (sizeof(struct GNRBlockPS) - + payload_len - + crypto_secretbox_MACBYTES); - gnr_block->purpose.purpose = - htonl (GNUNET_SIGNATURE_PURPOSE_GNS_RECORD_SIGN); - gnr_block->expiration_time = GNUNET_TIME_absolute_hton (expire); - edblock->expiration_time = gnr_block->expiration_time; - /* encrypt and sign */ - GNUNET_CRYPTO_eddsa_public_key_derive (pkey, - label, - "gns", - &edblock->derived_key); + edblock->expiration_time = GNUNET_TIME_absolute_hton (expire); GNR_derive_block_xsalsa_key (nonce, skey, label, @@ -414,14 +497,9 @@ block_create_eddsa (const struct GNUNET_CRYPTO_EddsaPrivateKey *key, skey, nonce, &edblock[1])); - GNUNET_memcpy (&gnr_block[1], &edblock[1], - payload_len + crypto_secretbox_MACBYTES); - - GNUNET_CRYPTO_eddsa_sign_derived (key, - label, - "gns", - &gnr_block->purpose, - &edblock->signature); + if (GNUNET_YES != sign) + return GNUNET_OK; + block_sign_eddsa (key, pkey, label, *block); } return GNUNET_OK; } @@ -477,7 +555,8 @@ GNUNET_GNSRECORD_block_create (const struct GNUNET_IDENTITY_PrivateKey *key, norm_label, rd, rd_count, - result); + result, + GNUNET_YES); break; case GNUNET_GNSRECORD_TYPE_EDKEY: res = block_create_eddsa (&key->eddsa_key, @@ -486,7 +565,8 @@ GNUNET_GNSRECORD_block_create (const struct GNUNET_IDENTITY_PrivateKey *key, norm_label, rd, rd_count, - result); + result, + GNUNET_YES); break; default: GNUNET_assert (0); @@ -513,13 +593,14 @@ struct KeyCacheLine }; -enum GNUNET_GenericReturnValue -GNUNET_GNSRECORD_block_create2 (const struct GNUNET_IDENTITY_PrivateKey *pkey, - struct GNUNET_TIME_Absolute expire, - const char *label, - const struct GNUNET_GNSRECORD_Data *rd, - unsigned int rd_count, - struct GNUNET_GNSRECORD_Block **result) +static enum GNUNET_GenericReturnValue +block_create2 (const struct GNUNET_IDENTITY_PrivateKey *pkey, + struct GNUNET_TIME_Absolute expire, + const char *label, + const struct GNUNET_GNSRECORD_Data *rd, + unsigned int rd_count, + struct GNUNET_GNSRECORD_Block **result, + int sign) { const struct GNUNET_CRYPTO_EcdsaPrivateKey *key; struct GNUNET_CRYPTO_EddsaPublicKey edpubkey; @@ -552,7 +633,8 @@ GNUNET_GNSRECORD_block_create2 (const struct GNUNET_IDENTITY_PrivateKey *pkey, norm_label, rd, rd_count, - result); + result, + sign); } else if (GNUNET_IDENTITY_TYPE_EDDSA == ntohl (pkey->type)) { @@ -564,13 +646,40 @@ GNUNET_GNSRECORD_block_create2 (const struct GNUNET_IDENTITY_PrivateKey *pkey, norm_label, rd, rd_count, - result); + result, + sign); } GNUNET_free (norm_label); return res; } + +enum GNUNET_GenericReturnValue +GNUNET_GNSRECORD_block_create_unsigned (const struct + GNUNET_IDENTITY_PrivateKey *pkey, + struct GNUNET_TIME_Absolute expire, + const char *label, + const struct GNUNET_GNSRECORD_Data *rd, + unsigned int rd_count, + struct GNUNET_GNSRECORD_Block **result) +{ + return block_create2 (pkey, expire, label, rd, rd_count, result, GNUNET_NO); +} + + + +enum GNUNET_GenericReturnValue +GNUNET_GNSRECORD_block_create2 (const struct GNUNET_IDENTITY_PrivateKey *pkey, + struct GNUNET_TIME_Absolute expire, + const char *label, + const struct GNUNET_GNSRECORD_Data *rd, + unsigned int rd_count, + struct GNUNET_GNSRECORD_Block **result) +{ + return block_create2 (pkey, expire, label, rd, rd_count, result, GNUNET_YES); +} + /** * Check if a signature is valid. This API is used by the GNS Block * to validate signatures received from the network. diff --git a/src/include/gnunet_gnsrecord_lib.h b/src/include/gnunet_gnsrecord_lib.h @@ -150,7 +150,7 @@ enum GNUNET_GNSRECORD_Filter * Filter public records. * FIXME: Not implemented */ - //GNUNET_NAMESTORE_FILTER_OMIT_PUBLIC = 4, + // GNUNET_NAMESTORE_FILTER_OMIT_PUBLIC = 4, }; @@ -554,6 +554,19 @@ GNUNET_GNSRECORD_block_calculate_size (const struct const struct GNUNET_GNSRECORD_Data *rd, unsigned int rd_count); +/** + * Sign a block create with #GNUNET_GNSRECORD_block_create_unsigned + * + * @param key the private key + * @param label the label of the block + * @param block the unsigned block + * @return GNUNET_OK on success + */ +enum GNUNET_GenericReturnValue +GNUNET_GNSRECORD_block_sign (const struct + GNUNET_IDENTITY_PrivateKey *key, + const char *label, + struct GNUNET_GNSRECORD_Block *block); /** * Sign name and records @@ -576,6 +589,31 @@ GNUNET_GNSRECORD_block_create (const struct GNUNET_IDENTITY_PrivateKey *key, /** + * Create name and records but do not sign! + * Sign later with #GNUNET_GNSRECORD_block_sign(). + * Cache derived public key (also keeps the + * private key in static memory, so do not use this function if + * keeping the private key in the process'es RAM is a major issue). + * + * @param key the private key + * @param expire block expiration + * @param label the name for the records + * @param rd record data + * @param rd_count number of records in @a rd + * @param result the block buffer. Will be allocated. + * @return GNUNET_OK on success. + */ +enum GNUNET_GenericReturnValue +GNUNET_GNSRECORD_block_create_unsigned (const struct + GNUNET_IDENTITY_PrivateKey *key, + struct GNUNET_TIME_Absolute expire, + const char *label, + const struct GNUNET_GNSRECORD_Data *rd, + unsigned int rd_count, + struct GNUNET_GNSRECORD_Block **result); + + +/** * Sign name and records, cache derived public key (also keeps the * private key in static memory, so do not use this function if * keeping the private key in the process'es RAM is a major issue). diff --git a/src/zonemaster/gnunet-service-zonemaster.c b/src/zonemaster/gnunet-service-zonemaster.c @@ -97,6 +97,81 @@ #define DHT_GNS_REPLICATION_LEVEL 5 /** + * Our workers + */ +static pthread_t * worker; + +/** + * Lock for the open jobs queue. + */ +static pthread_mutex_t jobs_lock; + +/** + * Lock for the finished results queue. + */ +static pthread_mutex_t results_lock; + +/** + * For threads to know we are shutting down + */ +static int in_shutdown = GNUNET_NO; + +/** + * Our notification pipe + */ +static struct GNUNET_DISK_PipeHandle *notification_pipe; + +/** + * Pipe read task + */ +static struct GNUNET_SCHEDULER_Task *pipe_read_task; + +struct OpenSignJob +{ + + struct OpenSignJob *next; + + struct OpenSignJob *prev; + + struct GNUNET_IDENTITY_PrivateKey zone; + + struct GNUNET_GNSRECORD_Block *block; + + struct GNUNET_GNSRECORD_Block *block_priv; + + struct DhtPutActivity *ma; + + size_t block_size; + + struct GNUNET_TIME_Absolute expire_pub; + + char *label; + +}; + + +/** + * DLL + */ +static struct OpenSignJob *jobs_head; + +/** + * DLL + */ +static struct OpenSignJob *jobs_tail; + +/** + * DLL + */ +static struct OpenSignJob *results_head; + +/** + * DLL + */ +static struct OpenSignJob *results_tail; + + +/** * Handle for DHT PUT activity triggered from the namestore monitor. */ struct DhtPutActivity @@ -319,8 +394,13 @@ shutdown_task (void *cls) struct CacheOperation *cop; (void) cls; + in_shutdown == GNUNET_YES; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Shutting down!\n"); + if (NULL != notification_pipe) + GNUNET_DISK_pipe_close (notification_pipe); + if (NULL != pipe_read_task) + GNUNET_SCHEDULER_cancel (pipe_read_task); while (NULL != (cop = cop_head)) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, @@ -332,7 +412,8 @@ shutdown_task (void *cls) while (NULL != (ma = it_head)) { - GNUNET_DHT_put_cancel (ma->ph); + if (NULL != ma->ph) + GNUNET_DHT_put_cancel (ma->ph); dht_queue_length--; GNUNET_CONTAINER_DLL_remove (it_head, it_tail, @@ -682,6 +763,16 @@ dht_put_continuation (void *cls) GNUNET_free (ma); } +static void +free_job (struct OpenSignJob *job) +{ + if (job->block != job->block_priv) + GNUNET_free (job->block_priv); + GNUNET_free (job->block); + if (NULL != job->label) + GNUNET_free (job->label); + GNUNET_free (job); +} /** @@ -760,35 +851,86 @@ perform_dht_put (const struct GNUNET_IDENTITY_PrivateKey *key, else block_priv = block; block_size = GNUNET_GNSRECORD_block_get_size (block); - GNUNET_GNSRECORD_query_from_private_key (key, - label, + GNUNET_assert (0 == pthread_mutex_lock (&jobs_lock)); + struct OpenSignJob *job = GNUNET_new (struct OpenSignJob); + job->block = GNUNET_malloc (block_size); // FIXME this does not need to be copied, can be freed by worker + memcpy (job->block, block, block_size); + job->block_size = block_size; + job->block_priv = block_priv; + job->zone = *key; + job->ma = ma; + job->label = GNUNET_strdup (label); + job->expire_pub = expire_pub; + GNUNET_CONTAINER_DLL_insert (jobs_head, jobs_tail, job); + GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Storing %u record(s) for label `%s' in DHT with expiration `%s'\n", + rd_public_count, + label, + GNUNET_STRINGS_absolute_time_to_string (expire)); + num_public_records++; +} + +static void +notification_pipe_cb (void *cls); + +static void +initiate_put_from_pipe_trigger (void *cls) +{ + struct GNUNET_HashCode query; + struct OpenSignJob *job; + + pipe_read_task = NULL; + GNUNET_assert (0 == pthread_mutex_lock (&results_lock)); + job = results_head; + if (NULL == job) + { + GNUNET_assert (0 == pthread_mutex_unlock (&results_lock)); + const struct GNUNET_DISK_FileHandle *np_fh = GNUNET_DISK_pipe_handle ( + notification_pipe, + GNUNET_DISK_PIPE_END_READ); + pipe_read_task = + GNUNET_SCHEDULER_add_read_file ( + GNUNET_TIME_UNIT_FOREVER_REL, + np_fh, + notification_pipe_cb, + NULL); + return; + } + GNUNET_CONTAINER_DLL_remove (results_head, results_tail, job); + GNUNET_assert (0 == pthread_mutex_unlock (&results_lock)); + GNUNET_GNSRECORD_query_from_private_key (&job->zone, + job->label, &query); GNUNET_STATISTICS_update (statistics, "DHT put operations initiated", 1, GNUNET_NO); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Storing %u record(s) for label `%s' in DHT with expiration `%s' under key %s\n", - rd_public_count, - label, - GNUNET_STRINGS_absolute_time_to_string (expire), + "Storing record(s) for label `%s' in DHT under key %s\n", + job->label, GNUNET_h2s (&query)); - num_public_records++; - ret = GNUNET_DHT_put (dht_handle, - &query, - DHT_GNS_REPLICATION_LEVEL, - GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE, - GNUNET_BLOCK_TYPE_GNS_NAMERECORD, - block_size, - block, - expire_pub, - &dht_put_continuation, - ma); - refresh_block (block_priv); - if (block != block_priv) - GNUNET_free (block_priv); - GNUNET_free (block); - return ret; + job->ma->ph = GNUNET_DHT_put (dht_handle, + &query, + DHT_GNS_REPLICATION_LEVEL, + GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE, + GNUNET_BLOCK_TYPE_GNS_NAMERECORD, + job->block_size, + job->block, + job->expire_pub, + &dht_put_continuation, + job->ma); + if (NULL == job->ma->ph) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Could not perform DHT PUT, is the DHT running?\n"); + GNUNET_free (job->ma); + free_job (job); + return; + } + refresh_block (job->block_priv); + free_job (job); + return; } @@ -907,45 +1049,29 @@ put_gns_record (void *cls, /* We got a set of records to publish */ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Starting DHT PUT\n"); - - ma = GNUNET_new (struct DhtPutActivity); - ma->start_date = GNUNET_TIME_absolute_get (); - ma->ph = perform_dht_put (key, - label, - rd, - rd_count, - expire, - ma); put_cnt++; if (0 == put_cnt % DELTA_INTERVAL) update_velocity (DELTA_INTERVAL); check_zone_namestore_next (); - if (NULL == ma->ph) + if (dht_queue_length >= DHT_QUEUE_LIMIT) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Could not perform DHT PUT, is the DHT running?\n"); - GNUNET_free (ma); + "DHT PUT queue length exceeded (%u), aborting PUT\n", + DHT_QUEUE_LIMIT); return; } + + ma = GNUNET_new (struct DhtPutActivity); + perform_dht_put (key, + label, + rd, + rd_count, + expire, + ma); dht_queue_length++; GNUNET_CONTAINER_DLL_insert_tail (it_head, it_tail, ma); - if (dht_queue_length > DHT_QUEUE_LIMIT) - { - ma = it_head; - GNUNET_CONTAINER_DLL_remove (it_head, - it_tail, - ma); - GNUNET_DHT_put_cancel (ma->ph); - dht_queue_length--; - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "DHT PUT unconfirmed after %s, aborting PUT\n", - GNUNET_STRINGS_relative_time_to_string ( - GNUNET_TIME_absolute_get_duration (ma->start_date), - GNUNET_YES)); - GNUNET_free (ma); - } } /** @@ -1075,9 +1201,18 @@ perform_dht_put_monitor (const struct GNUNET_IDENTITY_PrivateKey *key, else block_priv = block; block_size = GNUNET_GNSRECORD_block_get_size (block); + GNUNET_assert (0 == pthread_mutex_lock (&jobs_lock)); + struct OpenSignJob *job = GNUNET_new (struct OpenSignJob); + job->block = GNUNET_malloc (block_size); // FIXME this does not need to be copied, can be freed by worker + memcpy (job->block, block, block_size); + job->zone = *key; + job->label = GNUNET_strdup (label); + GNUNET_CONTAINER_DLL_insert (jobs_head, jobs_tail, job); + GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock)); GNUNET_GNSRECORD_query_from_private_key (key, label, &query); + GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock)); GNUNET_STATISTICS_update (statistics, "DHT put operations initiated", 1, @@ -1196,6 +1331,48 @@ handle_monitor_error (void *cls) GNUNET_NO); } +static void* +sign_worker (void *) +{ + struct OpenSignJob *job; + const struct GNUNET_DISK_FileHandle *fh; + + fh = GNUNET_DISK_pipe_handle (notification_pipe, GNUNET_DISK_PIPE_END_WRITE); + while (GNUNET_YES != in_shutdown) + { + GNUNET_assert (0 == pthread_mutex_lock (&jobs_lock)); + if (NULL != jobs_head) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Taking on Job for %s\n", jobs_head->label); + job = jobs_head; + GNUNET_CONTAINER_DLL_remove (jobs_head, jobs_tail, job); + } + GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock)); + if (NULL != job) + { + GNUNET_assert (0 == pthread_mutex_lock (&results_lock)); + GNUNET_CONTAINER_DLL_insert (results_head, results_tail, job); + GNUNET_assert (0 == pthread_mutex_unlock (&results_lock)); + job = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Done, notifying main thread throug pipe!\n"); + GNUNET_DISK_file_write (fh, "!", 1); + } + else { + sleep (1); + } + } + return NULL; +} + +static void +notification_pipe_cb (void *cls) +{ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received wake up notification through pipe, checking results\n"); + GNUNET_SCHEDULER_add_now (&initiate_put_from_pipe_trigger, NULL); +} /** * Perform zonemaster duties: watch namestore, publish records. @@ -1305,6 +1482,40 @@ run (void *cls, GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL); + + notification_pipe = GNUNET_DISK_pipe (GNUNET_DISK_PF_NONE); + const struct GNUNET_DISK_FileHandle *np_fh = GNUNET_DISK_pipe_handle ( + notification_pipe, + GNUNET_DISK_PIPE_END_READ); + pipe_read_task = GNUNET_SCHEDULER_add_read_file (GNUNET_TIME_UNIT_FOREVER_REL, + np_fh, + notification_pipe_cb, NULL); + + long long unsigned int worker_count = 1; + if (GNUNET_OK != + GNUNET_CONFIGURATION_get_value_number (c, + "zonemaster", + "WORKER_COUNT", + &worker_count)) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Number of workers not defined falling back to 1\n"); + } + worker = GNUNET_malloc (sizeof (pthread_t) * worker_count); + /** Start worker */ + for (int i = 0; i < worker_count; i++) + { + if (0 != + pthread_create (&worker[i], + NULL, + &sign_worker, + NULL)) + { + GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, + "pthread_create"); + GNUNET_SCHEDULER_shutdown (); + } + } } diff --git a/src/zonemaster/zonemaster.conf.in b/src/zonemaster/zonemaster.conf.in @@ -6,6 +6,7 @@ HOSTNAME = localhost BINARY = gnunet-service-zonemaster UNIXPATH = $GNUNET_USER_RUNTIME_DIR/gnunet-service-zonemaster.sock @JAVAPORT@PORT = 2123 +WORKER_COUNT = 10 # Do we require users that want to access GNS to run this process # (usually not a good idea)