gnunet

Main GNUnet Logic
Log | Files | Refs | Submodules | README | LICENSE

gnunet-service-namestore.c (85111B)


      1 /*
      2      This file is part of GNUnet.
      3      Copyright (C) 2012, 2013, 2014, 2018 GNUnet e.V.
      4 
      5      GNUnet is free software: you can redistribute it and/or modify it
      6      under the terms of the GNU Affero General Public License as published
      7      by the Free Software Foundation, either version 3 of the License,
      8      or (at your option) any later version.
      9 
     10      GNUnet is distributed in the hope that it will be useful, but
     11      WITHOUT ANY WARRANTY; without even the implied warranty of
     12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
     13      Affero General Public License for more details.
     14 
     15      You should have received a copy of the GNU Affero General Public License
     16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
     17 
     18      SPDX-License-Identifier: AGPL3.0-or-later
     19  */
     20 
     21 /**
     22  * @file namestore/gnunet-service-namestore.c
     23  * @brief namestore for the GNUnet naming system
     24  * @author Matthias Wachs
     25  * @author Christian Grothoff
     26  */
     27 #include "gnunet_error_codes.h"
     28 #include "gnunet_gnsrecord_lib.h"
     29 #include "gnunet_protocols.h"
     30 #include "gnunet_util_lib.h"
     31 #include "gnunet_namestore_plugin.h"
     32 #include "gnunet_statistics_service.h"
     33 #include "namestore.h"
     34 
     35 #define LOG_STRERROR_FILE(kind, syscall, filename) \
     36         GNUNET_log_from_strerror_file (kind, "util", syscall, filename)
     37 
     38 /**
     39  * If a monitor takes more than 1 minute to process an event, print a warning.
     40  */
     41 #define MONITOR_STALL_WARN_DELAY GNUNET_TIME_UNIT_MINUTES
     42 
     43 /**
     44  * Size of the cache used by #get_nick_record()
     45  */
     46 #define NC_SIZE 16
     47 
     48 /**
     49  * A namestore client
     50  */
     51 struct NamestoreClient;
     52 
     53 
     54 /**
     55  * A namestore iteration operation.
     56  */
     57 struct ZoneIteration
     58 {
     59   /**
     60    * Next element in the DLL
     61    */
     62   struct ZoneIteration *next;
     63 
     64   /**
     65    * Previous element in the DLL
     66    */
     67   struct ZoneIteration *prev;
     68 
     69   /**
     70    * Namestore client which initiated this zone iteration
     71    */
     72   struct NamestoreClient *nc;
     73 
     74   /**
     75    * The nick to add to the records
     76    */
     77   struct GNUNET_GNSRECORD_Data *nick;
     78 
     79   /**
     80    * Key of the zone we are iterating over.
     81    */
     82   struct GNUNET_CRYPTO_BlindablePrivateKey zone;
     83 
     84   /**
     85    * The record set filter
     86    */
     87   enum GNUNET_GNSRECORD_Filter filter;
     88 
     89   /**
     90    * Last sequence number in the zone iteration used to address next
     91    * result of the zone iteration in the store
     92    *
     93    * Initially set to 0.
     94    * Updated in #zone_iterate_proc()
     95    */
     96   uint64_t seq;
     97 
     98   /**
     99    * The operation id for the zone iteration in the response for the client
    100    */
    101   uint32_t request_id;
    102 
    103   /**
    104    * Offset of the zone iteration used to address next result of the zone
    105    * iteration in the store
    106    *
    107    * Initially set to 0 in #handle_iteration_start
    108    * Incremented with by every call to #handle_iteration_next
    109    */
    110   uint32_t offset;
    111 
    112   /**
    113    * Set to #GNUNET_YES if the last iteration exhausted the limit set by the
    114    * client and we should send the #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_RESULT_END
    115    * message and free the data structure.
    116    */
    117   int send_end;
    118 };
    119 
    120 /**
    121  * A namestore client
    122  */
    123 struct NamestoreClient
    124 {
    125   /**
    126    * The client
    127    */
    128   struct GNUNET_SERVICE_Client *client;
    129 
    130   /**
    131    * Database handle for client
    132    */
    133   struct GNUNET_NAMESTORE_PluginFunctions *GSN_database;
    134 
    135   /**
    136    * Name of loaded plugin (needed for cleanup)
    137    */
    138   char *db_lib_name;
    139 
    140   /**
    141    * Message queue for transmission to @e client
    142    */
    143   struct GNUNET_MQ_Handle *mq;
    144 
    145   /**
    146    * Head of the DLL of
    147    * Zone iteration operations in progress initiated by this client
    148    */
    149   struct ZoneIteration *op_head;
    150 
    151   /**
    152    * Tail of the DLL of
    153    * Zone iteration operations in progress initiated by this client
    154    */
    155   struct ZoneIteration *op_tail;
    156 };
    157 
    158 
    159 /**
    160  * A namestore monitor.
    161  */
    162 struct ZoneMonitor
    163 {
    164   /**
    165    * Next element in the DLL
    166    */
    167   struct ZoneMonitor *next;
    168 
    169   /**
    170    * Previous element in the DLL
    171    */
    172   struct ZoneMonitor *prev;
    173 
    174   /**
    175    * Namestore client which initiated this zone monitor
    176    */
    177   struct NamestoreClient *nc;
    178 
    179   /**
    180    * Private key of the zone.
    181    */
    182   struct GNUNET_CRYPTO_BlindablePrivateKey zone;
    183 
    184   /**
    185    * The record set filter
    186    */
    187   enum GNUNET_GNSRECORD_Filter filter;
    188 
    189   /**
    190    * Task active during initial iteration.
    191    */
    192   struct GNUNET_SCHEDULER_Task *task;
    193 
    194   /**
    195    * Task to warn about slow monitors.
    196    */
    197   struct GNUNET_SCHEDULER_Task *sa_wait_warning;
    198 
    199   /**
    200    * Since when are we blocked on this monitor?
    201    */
    202   struct GNUNET_TIME_Absolute sa_waiting_start;
    203 
    204   /**
    205    * Last sequence number in the zone iteration used to address next
    206    * result of the zone iteration in the store
    207    *
    208    * Initially set to 0.
    209    * Updated in #monitor_iterate_cb()
    210    */
    211   uint64_t seq;
    212 
    213   /**
    214    * Current limit of how many more messages we are allowed
    215    * to queue to this monitor.
    216    */
    217   uint64_t limit;
    218 
    219   /**
    220    * How many more requests may we receive from the iterator
    221    * before it is at the limit we gave it?  Will be below or
    222    * equal to @e limit.  The effective limit for monitor
    223    * events is thus @e iteration_cnt - @e limit!
    224    */
    225   uint64_t iteration_cnt;
    226 
    227   /**
    228    * Are we (still) in the initial iteration pass?
    229    */
    230   int in_first_iteration;
    231 
    232   /**
    233    * Run again because we skipped an orphan
    234    */
    235   int run_again;
    236 
    237   /**
    238    * Is there a store activity waiting for this monitor?  We only raise the
    239    * flag when it happens and search the DLL for the store activity when we
    240    * had a limit increase.  If we cannot find any waiting store activity at
    241    * that time, we clear the flag again.
    242    */
    243   int sa_waiting;
    244 };
    245 
    246 
    247 /**
    248  * Information for an ongoing #handle_record_store() operation.
    249  * Needed as we may wait for monitors to be ready for the notification.
    250  */
    251 struct StoreActivity
    252 {
    253   /**
    254    * Kept in a DLL.
    255    */
    256   struct StoreActivity *next;
    257 
    258   /**
    259    * Kept in a DLL.
    260    */
    261   struct StoreActivity *prev;
    262 
    263   /**
    264    * Which client triggered the store activity?
    265    */
    266   struct NamestoreClient *nc;
    267 
    268   /**
    269    * The request ID
    270    */
    271   uint32_t rid;
    272 
    273   /**
    274    * The currently processed record
    275    */
    276   uint16_t rd_set_pos;
    277 
    278   /**
    279    * The number of records in this activity
    280    */
    281   uint16_t rd_set_count;
    282 
    283   /**
    284    * The zone private key
    285    */
    286   struct GNUNET_CRYPTO_BlindablePrivateKey private_key;
    287 
    288   /**
    289    * Copy of the original record set (as data fields in @e rd will
    290    * point into it!).
    291    */
    292   const struct RecordSet *rs;
    293 
    294   /**
    295    * Next zone monitor that still needs to be notified about this PUT.
    296    */
    297   struct ZoneMonitor *zm_pos;
    298 
    299 };
    300 
    301 
    302 /**
    303  * Entry in list of cached nick resolutions.
    304  */
    305 struct NickCache
    306 {
    307   /**
    308    * Zone the cache entry is for.
    309    */
    310   struct GNUNET_CRYPTO_BlindablePrivateKey zone;
    311 
    312   /**
    313    * Cached record data.
    314    */
    315   struct GNUNET_GNSRECORD_Data *rd;
    316 
    317   /**
    318    * Timestamp when this cache entry was used last.
    319    */
    320   struct GNUNET_TIME_Absolute last_used;
    321 };
    322 
    323 /**
    324  * We cache nick records to reduce DB load.
    325  */
    326 static struct NickCache nick_cache[NC_SIZE];
    327 
    328 /**
    329  * Public key of all zeros.
    330  */
    331 static const struct GNUNET_CRYPTO_BlindablePrivateKey zero;
    332 
    333 /**
    334  * Configuration handle.
    335  */
    336 static const struct GNUNET_CONFIGURATION_Handle *GSN_cfg;
    337 
    338 /**
    339  * Handle to the statistics service
    340  */
    341 static struct GNUNET_STATISTICS_Handle *statistics;
    342 
    343 /**
    344  * Name of the database plugin
    345  */
    346 static char *db_lib_name;
    347 
    348 /**
    349  * Database handle for service
    350  */
    351 struct GNUNET_NAMESTORE_PluginFunctions *GSN_database;
    352 
    353 
    354 /**
    355  * First active zone monitor.
    356  */
    357 static struct ZoneMonitor *monitor_head;
    358 
    359 /**
    360  * Last active zone monitor.
    361  */
    362 static struct ZoneMonitor *monitor_tail;
    363 
    364 /**
    365  * Head of DLL of monitor-blocked store activities.
    366  */
    367 static struct StoreActivity *sa_head;
    368 
    369 /**
    370  * Tail of DLL of monitor-blocked store activities.
    371  */
    372 static struct StoreActivity *sa_tail;
    373 
    374 /**
    375  * Notification context shared by all monitors.
    376  */
    377 static struct GNUNET_NotificationContext *monitor_nc;
    378 
    379 /**
    380  * Returned orphaned records?
    381  */
    382 static int return_orphaned;
    383 
    384 /**
    385  * Task run during shutdown.
    386  *
    387  * @param cls unused
    388  */
    389 static void
    390 cleanup_task (void *cls)
    391 {
    392   (void) cls;
    393   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Stopping namestore service\n");
    394   if (NULL != monitor_nc)
    395   {
    396     GNUNET_notification_context_destroy (monitor_nc);
    397     monitor_nc = NULL;
    398   }
    399   if (NULL != statistics)
    400   {
    401     GNUNET_STATISTICS_destroy (statistics, GNUNET_NO);
    402     statistics = NULL;
    403   }
    404   GNUNET_break (NULL == GNUNET_PLUGIN_unload (db_lib_name, GSN_database));
    405   GNUNET_free (db_lib_name);
    406   db_lib_name = NULL;
    407 }
    408 
    409 
    410 /**
    411  * Release memory used by @a sa.
    412  *
    413  * @param sa activity to free
    414  */
    415 static void
    416 free_store_activity (struct StoreActivity *sa)
    417 {
    418   GNUNET_CONTAINER_DLL_remove (sa_head, sa_tail, sa);
    419   GNUNET_free (sa);
    420 }
    421 
    422 
    423 /**
    424  * Function called with the records for the #GNUNET_GNS_EMPTY_LABEL_AT
    425  * label in the zone.  Used to locate the #GNUNET_GNSRECORD_TYPE_NICK
    426  * record, which (if found) is then copied to @a cls for future use.
    427  *
    428  * @param cls a `struct GNUNET_GNSRECORD_Data **` for storing the nick (if found)
    429  * @param seq sequence number of the record, MUST NOT BE ZERO
    430  * @param private_key the private key of the zone (unused)
    431  * @param label should be #GNUNET_GNS_EMPTY_LABEL_AT
    432  * @param rd_count number of records in @a rd
    433  * @param rd records stored under @a label in the zone
    434  */
    435 static void
    436 lookup_nick_it (void *cls,
    437                 uint64_t seq,
    438                 const char *editor_hint,
    439                 const struct GNUNET_CRYPTO_BlindablePrivateKey *private_key,
    440                 const char *label,
    441                 unsigned int rd_count,
    442                 const struct GNUNET_GNSRECORD_Data *rd)
    443 {
    444   struct GNUNET_GNSRECORD_Data **res = cls;
    445 
    446   (void) private_key;
    447   GNUNET_assert (0 != seq);
    448   if (0 != strcmp (label, GNUNET_GNS_EMPTY_LABEL_AT))
    449   {
    450     GNUNET_break (0);
    451     return;
    452   }
    453   for (unsigned int c = 0; c < rd_count; c++)
    454   {
    455     if (GNUNET_GNSRECORD_TYPE_NICK == rd[c].record_type)
    456     {
    457       (*res) =
    458         GNUNET_malloc (rd[c].data_size + sizeof(struct GNUNET_GNSRECORD_Data));
    459       (*res)->data = &(*res)[1];
    460       GNUNET_memcpy ((void *) (*res)->data, rd[c].data, rd[c].data_size);
    461       (*res)->data_size = rd[c].data_size;
    462       (*res)->expiration_time = rd[c].expiration_time;
    463       (*res)->flags = rd[c].flags;
    464       (*res)->record_type = GNUNET_GNSRECORD_TYPE_NICK;
    465       return;
    466     }
    467   }
    468   (*res) = NULL;
    469 }
    470 
    471 
    472 /**
    473  * Add entry to the cache for @a zone and @a nick
    474  *
    475  * @param zone zone key to cache under
    476  * @param nick nick entry to cache
    477  */
    478 static void
    479 cache_nick (const struct GNUNET_CRYPTO_BlindablePrivateKey *zone,
    480             const struct GNUNET_GNSRECORD_Data *nick)
    481 {
    482   struct NickCache *oldest;
    483 
    484   oldest = NULL;
    485   for (unsigned int i = 0; i < NC_SIZE; i++)
    486   {
    487     struct NickCache *pos = &nick_cache[i];
    488 
    489     if ((NULL == oldest) ||
    490         (oldest->last_used.abs_value_us > pos->last_used.abs_value_us))
    491       oldest = pos;
    492     if (0 == GNUNET_memcmp (zone, &pos->zone))
    493     {
    494       oldest = pos;
    495       break;
    496     }
    497   }
    498   GNUNET_free (oldest->rd);
    499   oldest->zone = *zone;
    500   if (NULL != nick)
    501   {
    502     oldest->rd = GNUNET_malloc (sizeof(*nick) + nick->data_size);
    503     *oldest->rd = *nick;
    504     oldest->rd->data = &oldest->rd[1];
    505     memcpy (&oldest->rd[1], nick->data, nick->data_size);
    506   }
    507   else
    508   {
    509     oldest->rd = NULL;
    510   }
    511   oldest->last_used = GNUNET_TIME_absolute_get ();
    512 }
    513 
    514 
    515 /**
    516  * Return the NICK record for the zone (if it exists).
    517  *
    518  * @param nc the namestore client
    519  * @param zone private key for the zone to look for nick
    520  * @return NULL if no NICK record was found
    521  */
    522 static struct GNUNET_GNSRECORD_Data *
    523 get_nick_record (const struct GNUNET_CRYPTO_BlindablePrivateKey *zone)
    524 {
    525   struct GNUNET_CRYPTO_BlindablePublicKey pub;
    526   struct GNUNET_GNSRECORD_Data *nick;
    527   int res;
    528 
    529   /* check cache first */
    530   for (unsigned int i = 0; i < NC_SIZE; i++)
    531   {
    532     struct NickCache *pos = &nick_cache[i];
    533     if ((NULL != pos->rd) && (0 == GNUNET_memcmp (zone, &pos->zone)))
    534     {
    535       if (NULL == pos->rd)
    536         return NULL;
    537       nick = GNUNET_malloc (sizeof(*nick) + pos->rd->data_size);
    538       *nick = *pos->rd;
    539       nick->data = &nick[1];
    540       memcpy (&nick[1], pos->rd->data, pos->rd->data_size);
    541       pos->last_used = GNUNET_TIME_absolute_get ();
    542       return nick;
    543     }
    544   }
    545 
    546   nick = NULL;
    547   res = GSN_database->lookup_records (GSN_database->cls,
    548                                       zone,
    549                                       GNUNET_GNS_EMPTY_LABEL_AT,
    550                                       &lookup_nick_it,
    551                                       &nick);
    552   if ((GNUNET_OK != res) || (NULL == nick))
    553   {
    554 #if ! defined(GNUNET_CULL_LOGGING)
    555     static int do_log = GNUNET_LOG_CALL_STATUS;
    556 
    557     if (0 == do_log)
    558       do_log = GNUNET_get_log_call_status (GNUNET_ERROR_TYPE_DEBUG,
    559                                            "namestore",
    560                                            __FILE__,
    561                                            __FUNCTION__,
    562                                            __LINE__);
    563     if (1 == do_log)
    564     {
    565       GNUNET_CRYPTO_blindable_key_get_public (zone, &pub);
    566       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
    567                   "No nick name set for zone `%s'\n",
    568                   GNUNET_GNSRECORD_z2s (&pub));
    569     }
    570 #endif
    571     /* update cache */
    572     cache_nick (zone, NULL);
    573     return NULL;
    574   }
    575 
    576   /* update cache */
    577   cache_nick (zone, nick);
    578   return nick;
    579 }
    580 
    581 
    582 /**
    583  * Merge the nick record @a nick_rd with the rest of the
    584  * record set given in @a rd2.  Store the result in @a rdc_res
    585  * and @a rd_res.  The @a nick_rd's expiration time is set to
    586  * the maximum expiration time of all of the records in @a rd2.
    587  *
    588  * @param nick_rd the nick record to integrate
    589  * @param rd2_length length of the @a rd2 array
    590  * @param rd2 array of records
    591  * @param[out] rdc_res length of the resulting @a rd_res array
    592  * @param[out] rd_res set to an array of records,
    593  *                    including @a nick_rd and @a rd2;
    594  *           all of the variable-size 'data' fields in @a rd2 are
    595  *           allocated in the same chunk of memory!
    596  */
    597 static void
    598 merge_with_nick_records (const struct GNUNET_GNSRECORD_Data *nick_rd,
    599                          unsigned int rd2_length,
    600                          const struct GNUNET_GNSRECORD_Data *rd2,
    601                          unsigned int *rdc_res,
    602                          struct GNUNET_GNSRECORD_Data **rd_res)
    603 {
    604   uint64_t latest_expiration;
    605   size_t req;
    606   char *data;
    607   size_t data_offset;
    608   struct GNUNET_GNSRECORD_Data *target;
    609 
    610   (*rdc_res) = 1 + rd2_length;
    611   if (0 == 1 + rd2_length)
    612   {
    613     GNUNET_break (0);
    614     (*rd_res) = NULL;
    615     return;
    616   }
    617   req = sizeof(struct GNUNET_GNSRECORD_Data) + nick_rd->data_size;
    618   for (unsigned int i = 0; i < rd2_length; i++)
    619   {
    620     const struct GNUNET_GNSRECORD_Data *orig = &rd2[i];
    621 
    622     if (req + sizeof(struct GNUNET_GNSRECORD_Data) + orig->data_size < req)
    623     {
    624       GNUNET_break (0);
    625       (*rd_res) = NULL;
    626       return;
    627     }
    628     req += sizeof(struct GNUNET_GNSRECORD_Data) + orig->data_size;
    629   }
    630   target = GNUNET_malloc (req);
    631   (*rd_res) = target;
    632   data = (char *) &target[1 + rd2_length];
    633   data_offset = 0;
    634   latest_expiration = 0;
    635   for (unsigned int i = 0; i < rd2_length; i++)
    636   {
    637     const struct GNUNET_GNSRECORD_Data *orig = &rd2[i];
    638 
    639     if (0 != (orig->flags & GNUNET_GNSRECORD_RF_RELATIVE_EXPIRATION))
    640     {
    641       if ((GNUNET_TIME_absolute_get ().abs_value_us + orig->expiration_time) >
    642           latest_expiration)
    643         latest_expiration = orig->expiration_time;
    644     }
    645     else if (orig->expiration_time > latest_expiration)
    646       latest_expiration = orig->expiration_time;
    647     target[i] = *orig;
    648     target[i].data = (void *) &data[data_offset];
    649     GNUNET_memcpy (&data[data_offset], orig->data, orig->data_size);
    650     data_offset += orig->data_size;
    651   }
    652   /* append nick */
    653   target[rd2_length] = *nick_rd;
    654   /* Mark as supplemental */
    655   target[rd2_length].flags = nick_rd->flags | GNUNET_GNSRECORD_RF_SUPPLEMENTAL;
    656   target[rd2_length].expiration_time = latest_expiration;
    657   target[rd2_length].data = (void *) &data[data_offset];
    658   GNUNET_memcpy (&data[data_offset], nick_rd->data, nick_rd->data_size);
    659   data_offset += nick_rd->data_size;
    660   GNUNET_assert (req == (sizeof(struct GNUNET_GNSRECORD_Data)) * (*rdc_res)
    661                  + data_offset);
    662 }
    663 
    664 
    665 /**
    666  * Generate a `struct LookupNameResponseMessage` and send it to the
    667  * given client using the given notification context.
    668  *
    669  * @param nc client to unicast to
    670  * @param request_id request ID to use
    671  * @param zone_key zone key of the zone
    672  * @param name name
    673  * @param rd_count number of records in @a rd
    674  * @param rd array of records
    675  * @param filter record set filter
    676  */
    677 static int
    678 send_lookup_response_with_filter (struct NamestoreClient *nc,
    679                                   uint32_t request_id,
    680                                   const struct
    681                                   GNUNET_CRYPTO_BlindablePrivateKey *zone_key,
    682                                   const char *name,
    683                                   unsigned int rd_count,
    684                                   const struct GNUNET_GNSRECORD_Data *rd,
    685                                   enum GNUNET_GNSRECORD_Filter filter)
    686 {
    687   struct GNUNET_MQ_Envelope *env;
    688   struct RecordResultMessage *zir_msg;
    689   struct GNUNET_GNSRECORD_Data *nick;
    690   struct GNUNET_GNSRECORD_Data *res;
    691   struct GNUNET_GNSRECORD_Data rd_nf[rd_count];
    692   struct GNUNET_TIME_Absolute block_exp;
    693   unsigned int res_count;
    694   unsigned int rd_nf_count;
    695   size_t name_len;
    696   size_t key_len;
    697   ssize_t rd_ser_len;
    698   char *name_tmp;
    699   char *rd_ser;
    700   char *emsg;
    701 
    702   block_exp = GNUNET_TIME_UNIT_ZERO_ABS;
    703   nick = get_nick_record (zone_key);
    704   GNUNET_assert (-1 != GNUNET_GNSRECORD_records_get_size (rd_count, rd));
    705 
    706   if (GNUNET_OK != GNUNET_GNSRECORD_normalize_record_set (name,
    707                                                           rd,
    708                                                           rd_count,
    709                                                           rd_nf,
    710                                                           &rd_nf_count,
    711                                                           &block_exp,
    712                                                           filter,
    713                                                           &emsg))
    714   {
    715     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%s\n", emsg);
    716     GNUNET_free (emsg);
    717     GNUNET_assert (0);
    718   }
    719 
    720   /**
    721    * FIXME if we ever support GNUNET_NAMESTORE_OMIT_PUBLIC,
    722    * we need to omit adding this public record here
    723    */
    724   if ((NULL != nick) && (0 != strcmp (name, GNUNET_GNS_EMPTY_LABEL_AT)))
    725   {
    726     nick->flags =
    727       (nick->flags | GNUNET_GNSRECORD_RF_PRIVATE) ^ GNUNET_GNSRECORD_RF_PRIVATE;
    728     merge_with_nick_records (nick, rd_nf_count, rd_nf, &res_count, &res);
    729   }
    730   else
    731   {
    732     res_count = rd_nf_count;
    733     res = (struct GNUNET_GNSRECORD_Data *) rd_nf;
    734   }
    735   if (NULL != nick)
    736     GNUNET_free (nick);
    737 
    738   if (0 == res_count)
    739   {
    740     if (rd_nf != res)
    741       GNUNET_free (res);
    742     return 0;
    743   }
    744   GNUNET_assert (-1 != GNUNET_GNSRECORD_records_get_size (res_count, res));
    745 
    746 
    747   name_len = strlen (name) + 1;
    748   rd_ser_len = GNUNET_GNSRECORD_records_get_size (res_count, res);
    749   if (rd_ser_len < 0)
    750   {
    751     if (rd_nf != res)
    752       GNUNET_free (res);
    753     GNUNET_break (0);
    754     GNUNET_SERVICE_client_drop (nc->client);
    755     return 0;
    756   }
    757   if (((size_t) rd_ser_len) >= UINT16_MAX - name_len - sizeof(*zir_msg))
    758   {
    759     if (rd_nf != res)
    760       GNUNET_free (res);
    761     GNUNET_break (0);
    762     GNUNET_SERVICE_client_drop (nc->client);
    763     return 0;
    764   }
    765   key_len = GNUNET_CRYPTO_blindable_sk_get_length (zone_key);
    766   env = GNUNET_MQ_msg_extra (zir_msg,
    767                              name_len + rd_ser_len + key_len,
    768                              GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_RESULT);
    769   zir_msg->gns_header.r_id = htonl (request_id);
    770   zir_msg->name_len = htons (name_len);
    771   zir_msg->rd_count = htons (res_count);
    772   zir_msg->rd_len = htons ((uint16_t) rd_ser_len);
    773   zir_msg->key_len = htons (key_len);
    774   GNUNET_CRYPTO_write_blindable_sk_to_buffer (zone_key,
    775                                               &zir_msg[1],
    776                                               key_len);
    777   zir_msg->expire = GNUNET_TIME_absolute_hton (block_exp);
    778   name_tmp = (char *) &zir_msg[1] + key_len;
    779   GNUNET_memcpy (name_tmp, name, name_len);
    780   rd_ser = &name_tmp[name_len];
    781   GNUNET_assert (
    782     rd_ser_len ==
    783     GNUNET_GNSRECORD_records_serialize (res_count, res, rd_ser_len, rd_ser));
    784   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
    785               "Sending RECORD_RESULT message with %u records\n",
    786               res_count);
    787   GNUNET_STATISTICS_update (statistics,
    788                             "Record sets sent to clients",
    789                             1,
    790                             GNUNET_NO);
    791   GNUNET_MQ_send (nc->mq, env);
    792   if (rd_nf != res)
    793     GNUNET_free (res);
    794   return res_count;
    795 }
    796 
    797 
    798 /**
    799  * Send response to the store request to the client.
    800  *
    801  * @param nc client to talk to
    802  * @param ec status of the operation
    803  * @param rid client's request ID
    804  */
    805 static void
    806 send_store_response (struct NamestoreClient *nc,
    807                      enum GNUNET_ErrorCode ec,
    808                      uint32_t rid)
    809 {
    810   struct GNUNET_MQ_Envelope *env;
    811   struct NamestoreResponseMessage *rcr_msg;
    812 
    813   GNUNET_assert (NULL != nc);
    814   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
    815               "Sending GENERIC_RESPONSE message\n");
    816   GNUNET_STATISTICS_update (statistics,
    817                             "Store requests completed",
    818                             1,
    819                             GNUNET_NO);
    820   env = GNUNET_MQ_msg (rcr_msg,
    821                        GNUNET_MESSAGE_TYPE_NAMESTORE_GENERIC_RESPONSE);
    822   rcr_msg->gns_header.r_id = htonl (rid);
    823   rcr_msg->ec = htonl (ec);
    824   GNUNET_MQ_send (nc->mq, env);
    825 }
    826 
    827 
    828 /**
    829  * Function called once we are done with the zone iteration and
    830  * allow the zone iteration client to send us more messages.
    831  *
    832  * @param zi zone iteration we are processing
    833  */
    834 static void
    835 zone_iteration_done_client_continue (struct ZoneIteration *zi)
    836 {
    837   struct GNUNET_MQ_Envelope *env;
    838   struct GNUNET_NAMESTORE_Header *em;
    839 
    840   GNUNET_SERVICE_client_continue (zi->nc->client);
    841   if (! zi->send_end)
    842     return;
    843   /* send empty response to indicate end of list */
    844   env = GNUNET_MQ_msg (em, GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_RESULT_END);
    845   em->r_id = htonl (zi->request_id);
    846   GNUNET_MQ_send (zi->nc->mq, env);
    847 
    848   GNUNET_CONTAINER_DLL_remove (zi->nc->op_head, zi->nc->op_tail, zi);
    849   GNUNET_free (zi);
    850 }
    851 
    852 
    853 /**
    854  * Print a warning that one of our monitors is no longer reacting.
    855  *
    856  * @param cls a `struct ZoneMonitor` to warn about
    857  */
    858 static void
    859 warn_monitor_slow (void *cls)
    860 {
    861   struct ZoneMonitor *zm = cls;
    862 
    863   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
    864               "No response from monitor since %s\n",
    865               GNUNET_STRINGS_absolute_time_to_string (zm->sa_waiting_start));
    866   zm->sa_wait_warning = GNUNET_SCHEDULER_add_delayed (MONITOR_STALL_WARN_DELAY,
    867                                                       &warn_monitor_slow,
    868                                                       zm);
    869 }
    870 
    871 
    872 /**
    873  * Continue processing the @a sa.
    874  *
    875  * @param sa store activity to process
    876  */
    877 static int
    878 continue_store_activity (struct StoreActivity *sa,
    879                          int call_continue)
    880 {
    881   const struct RecordSet *rd_set = sa->rs;
    882   unsigned int rd_count;
    883   size_t name_len;
    884   size_t rd_ser_len;
    885   const char *name;
    886   const char *rd_ser;
    887   const char *buf;
    888 
    889   buf = (const char *) &sa[1];
    890   for (int i = sa->rd_set_pos; i < sa->rd_set_count; i++)
    891   {
    892     rd_set = (struct RecordSet *) buf;
    893     name_len = ntohs (rd_set->name_len);
    894     rd_count = ntohs (rd_set->rd_count);
    895     rd_ser_len = ntohs (rd_set->rd_len);
    896     name = (const char *) &rd_set[1];
    897     buf += sizeof (struct RecordSet) + name_len + rd_ser_len;
    898     rd_ser = &name[name_len];
    899     {
    900       struct GNUNET_GNSRECORD_Data rd[GNUNET_NZL (rd_count)];
    901 
    902       /* We did this before, must succeed again */
    903       GNUNET_assert (
    904         GNUNET_OK ==
    905         GNUNET_GNSRECORD_records_deserialize (rd_ser_len, rd_ser, rd_count,
    906                                               rd));
    907 
    908       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
    909                   "Checking monitors watching for `%s'\n",
    910                   name);
    911       for (struct ZoneMonitor *zm = sa->zm_pos; NULL != zm; zm = sa->zm_pos)
    912       {
    913         if ((0 != GNUNET_memcmp (&sa->private_key, &zm->zone)) &&
    914             (0 != GNUNET_memcmp (&zm->zone, &zero)))
    915         {
    916           sa->zm_pos = zm->next;   /* not interesting to this monitor */
    917           continue;
    918         }
    919         if (zm->limit == zm->iteration_cnt)
    920         {
    921           zm->sa_waiting = GNUNET_YES;
    922           zm->sa_waiting_start = GNUNET_TIME_absolute_get ();
    923           if (NULL != zm->sa_wait_warning)
    924             GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
    925           zm->sa_wait_warning =
    926             GNUNET_SCHEDULER_add_delayed (MONITOR_STALL_WARN_DELAY,
    927                                           &warn_monitor_slow,
    928                                           zm);
    929           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
    930                       "Monitor is blocking client for `%s'\n",
    931                       name);
    932           return GNUNET_NO;    /* blocked on zone monitor */
    933         }
    934         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
    935                     "Notifying monitor about changes under label `%s'\n",
    936                     name);
    937         if (0 < send_lookup_response_with_filter (zm->nc,
    938                                                   0,
    939                                                   &sa->private_key,
    940                                                   name,
    941                                                   rd_count,
    942                                                   rd,
    943                                                   zm->filter))
    944           zm->limit--;
    945         sa->zm_pos = zm->next;
    946       }
    947       sa->rd_set_pos++;
    948     }
    949   }
    950   if (GNUNET_YES == call_continue)
    951     GNUNET_SERVICE_client_continue (sa->nc->client);
    952   send_store_response (sa->nc, GNUNET_EC_NONE, sa->rid);
    953   free_store_activity (sa);
    954   return GNUNET_OK;
    955 }
    956 
    957 
    958 /**
    959  * Called whenever a client is disconnected.
    960  * Frees our resources associated with that client.
    961  *
    962  * @param cls closure
    963  * @param client identification of the client
    964  * @param app_ctx the `struct NamestoreClient` of @a client
    965  */
    966 static void
    967 client_disconnect_cb (void *cls,
    968                       struct GNUNET_SERVICE_Client *client,
    969                       void *app_ctx)
    970 {
    971   struct NamestoreClient *nc = app_ctx;
    972   struct ZoneIteration *no;
    973   struct StoreActivity *sa = sa_head;
    974   struct StoreActivity *sn;
    975 
    976   (void) cls;
    977   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client %p disconnected\n", client);
    978   for (struct ZoneMonitor *zm = monitor_head; NULL != zm; zm = zm->next)
    979   {
    980     if (nc != zm->nc)
    981       continue;
    982     GNUNET_CONTAINER_DLL_remove (monitor_head, monitor_tail, zm);
    983     if (NULL != zm->task)
    984     {
    985       GNUNET_SCHEDULER_cancel (zm->task);
    986       zm->task = NULL;
    987     }
    988     if (NULL != zm->sa_wait_warning)
    989     {
    990       GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
    991       zm->sa_wait_warning = NULL;
    992     }
    993     for (sa = sa_head; NULL != sa; sa = sn)
    994     {
    995       sn = sa->next;
    996       if (zm == sa->zm_pos)
    997       {
    998         sa->zm_pos = zm->next;
    999         /* this may free sa */
   1000         continue_store_activity (sa, GNUNET_YES);
   1001       }
   1002     }
   1003     GNUNET_free (zm);
   1004     break;
   1005   }
   1006   sa = sa_head;
   1007   while (NULL != sa)
   1008   {
   1009     if (nc != sa->nc)
   1010     {
   1011       sa = sa->next;
   1012       continue;
   1013     }
   1014     sn = sa->next;
   1015     free_store_activity (sa);
   1016     sa = sn;
   1017   }
   1018   while (NULL != (no = nc->op_head))
   1019   {
   1020     GNUNET_CONTAINER_DLL_remove (nc->op_head, nc->op_tail, no);
   1021     GNUNET_free (no);
   1022   }
   1023   GNUNET_break (NULL == GNUNET_PLUGIN_unload (nc->db_lib_name,
   1024                                               nc->GSN_database));
   1025   GNUNET_free (nc->db_lib_name);
   1026   GNUNET_free (nc);
   1027 }
   1028 
   1029 
   1030 /**
   1031  * Add a client to our list of active clients.
   1032  *
   1033  * @param cls NULL
   1034  * @param client client to add
   1035  * @param mq message queue for @a client
   1036  * @return internal namestore client structure for this client
   1037  */
   1038 static void *
   1039 client_connect_cb (void *cls,
   1040                    struct GNUNET_SERVICE_Client *client,
   1041                    struct GNUNET_MQ_Handle *mq)
   1042 {
   1043   struct NamestoreClient *nc;
   1044   char *database;
   1045 
   1046   (void) cls;
   1047   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client %p connected\n", client);
   1048   nc = GNUNET_new (struct NamestoreClient);
   1049   nc->client = client;
   1050   nc->mq = mq;
   1051   /* Loading database plugin */
   1052   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (GSN_cfg,
   1053                                                           "namestore",
   1054                                                           "database",
   1055                                                           &database))
   1056   {
   1057     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "No database backend configured\n");
   1058     GNUNET_free (nc);
   1059     return NULL;
   1060   }
   1061   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
   1062               "Loading %s\n",
   1063               db_lib_name);
   1064   nc->GSN_database = GNUNET_PLUGIN_load (GNUNET_OS_project_data_gnunet (),
   1065                                          db_lib_name,
   1066                                          (void *) GSN_cfg);
   1067   GNUNET_free (database);
   1068   if (NULL == nc->GSN_database)
   1069   {
   1070     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
   1071                 "Could not load database backend `%s'\n",
   1072                 db_lib_name);
   1073     GNUNET_free (nc);
   1074     return NULL;
   1075   }
   1076   nc->db_lib_name = GNUNET_strdup (db_lib_name);
   1077   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
   1078               "Loaded %s\n",
   1079               db_lib_name);
   1080   return nc;
   1081 }
   1082 
   1083 
   1084 /**
   1085  * Closure for #lookup_it().
   1086  */
   1087 struct RecordLookupContext
   1088 {
   1089   /**
   1090    * The label to look up.
   1091    */
   1092   const char *label;
   1093 
   1094   /**
   1095    * The editor hint for set
   1096    */
   1097   char *editor_hint;
   1098 
   1099   /**
   1100    * The record result.
   1101    */
   1102   char *res_rd;
   1103 
   1104   /**
   1105    * The nick for the zone.
   1106    */
   1107   struct GNUNET_GNSRECORD_Data *nick;
   1108 
   1109   /**
   1110    * If a record set was found or not.
   1111    */
   1112   int found;
   1113 
   1114   /**
   1115    * The record filter
   1116    */
   1117   enum GNUNET_GNSRECORD_Filter filter;
   1118 
   1119   /**
   1120    * The number of found records.
   1121    */
   1122   unsigned int res_rd_count;
   1123 
   1124   /**
   1125    * The length of the serialized records.
   1126    */
   1127   ssize_t rd_ser_len;
   1128 };
   1129 
   1130 
   1131 /**
   1132  * Function called by the namestore plugin when we are trying to lookup
   1133  * a record as part of #handle_record_lookup().  Merges all results into
   1134  * the context.
   1135  *
   1136  * @param cls closure with a `struct RecordLookupContext`
   1137  * @param seq unique serial number of the record, MUST NOT BE ZERO
   1138  * @param private_key private key of the zone
   1139  * @param label name that is being mapped (at most 255 characters long)
   1140  * @param rd_count number of entries in @a rd array
   1141  * @param rd array of records with data to store
   1142  */
   1143 static void
   1144 lookup_it (void *cls,
   1145            uint64_t seq,
   1146            const char *editor_hint,
   1147            const struct GNUNET_CRYPTO_BlindablePrivateKey *private_key,
   1148            const char *label,
   1149            unsigned int rd_count_nf,
   1150            const struct GNUNET_GNSRECORD_Data *rd_nf)
   1151 {
   1152   struct RecordLookupContext *rlc = cls;
   1153   struct GNUNET_GNSRECORD_Data rd[rd_count_nf];
   1154   struct GNUNET_TIME_Absolute block_exp;
   1155   unsigned int rd_count = 0;
   1156   char *emsg;
   1157 
   1158   (void) private_key;
   1159   GNUNET_assert (0 != seq);
   1160   if (0 != strcmp (label, rlc->label))
   1161     return;
   1162   rlc->found = GNUNET_YES;
   1163   if (NULL == rlc->editor_hint)
   1164     rlc->editor_hint = GNUNET_strdup (editor_hint);
   1165   if (GNUNET_OK != GNUNET_GNSRECORD_normalize_record_set (rlc->label,
   1166                                                           rd_nf,
   1167                                                           rd_count_nf,
   1168                                                           rd,
   1169                                                           &rd_count,
   1170                                                           &block_exp,
   1171                                                           rlc->filter,
   1172                                                           &emsg))
   1173   {
   1174     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%s\n", emsg);
   1175     GNUNET_free (emsg);
   1176     GNUNET_assert (0);
   1177   }
   1178 
   1179   if (0 == rd_count)
   1180   {
   1181     rlc->rd_ser_len = 0;
   1182     rlc->res_rd_count = 0;
   1183     rlc->res_rd = NULL;
   1184     return;
   1185   }
   1186   if ((NULL != rlc->nick) && (0 != strcmp (label, GNUNET_GNS_EMPTY_LABEL_AT)))
   1187   {
   1188     /* Merge */
   1189     struct GNUNET_GNSRECORD_Data *rd_res;
   1190     unsigned int rdc_res;
   1191 
   1192     rd_res = NULL;
   1193     rdc_res = 0;
   1194     rlc->nick->flags = (rlc->nick->flags | GNUNET_GNSRECORD_RF_PRIVATE)
   1195                        ^ GNUNET_GNSRECORD_RF_PRIVATE;
   1196     merge_with_nick_records (rlc->nick, rd_count, rd, &rdc_res, &rd_res);
   1197     rlc->rd_ser_len = GNUNET_GNSRECORD_records_get_size (rdc_res, rd_res);
   1198     if (rlc->rd_ser_len < 0)
   1199     {
   1200       GNUNET_break (0);
   1201       GNUNET_free (rd_res);
   1202       rlc->found = GNUNET_NO;
   1203       rlc->rd_ser_len = 0;
   1204       return;
   1205     }
   1206     rlc->res_rd_count = rdc_res;
   1207     rlc->res_rd = GNUNET_malloc (rlc->rd_ser_len);
   1208     if (rlc->rd_ser_len != GNUNET_GNSRECORD_records_serialize (rdc_res,
   1209                                                                rd_res,
   1210                                                                rlc->rd_ser_len,
   1211                                                                rlc->res_rd))
   1212     {
   1213       GNUNET_break (0);
   1214       GNUNET_free (rlc->res_rd);
   1215       rlc->res_rd = NULL;
   1216       rlc->res_rd_count = 0;
   1217       rlc->rd_ser_len = 0;
   1218       GNUNET_free (rd_res);
   1219       rlc->found = GNUNET_NO;
   1220       return;
   1221     }
   1222     GNUNET_free (rd_res);
   1223     GNUNET_free (rlc->nick);
   1224     rlc->nick = NULL;
   1225   }
   1226   else
   1227   {
   1228     rlc->rd_ser_len = GNUNET_GNSRECORD_records_get_size (rd_count, rd);
   1229     if (rlc->rd_ser_len < 0)
   1230     {
   1231       GNUNET_break (0);
   1232       rlc->found = GNUNET_NO;
   1233       rlc->rd_ser_len = 0;
   1234       return;
   1235     }
   1236     rlc->res_rd_count = rd_count;
   1237     rlc->res_rd = GNUNET_malloc (rlc->rd_ser_len);
   1238     if (rlc->rd_ser_len != GNUNET_GNSRECORD_records_serialize (rd_count,
   1239                                                                rd,
   1240                                                                rlc->rd_ser_len,
   1241                                                                rlc->res_rd))
   1242     {
   1243       GNUNET_break (0);
   1244       GNUNET_free (rlc->res_rd);
   1245       rlc->res_rd = NULL;
   1246       rlc->res_rd_count = 0;
   1247       rlc->rd_ser_len = 0;
   1248       rlc->found = GNUNET_NO;
   1249       return;
   1250     }
   1251   }
   1252 }
   1253 
   1254 
   1255 /**
   1256  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_SET_EDIT message
   1257  *
   1258  * @param cls client sending the message
   1259  * @param er_msg message of type `struct EditRecordSetMessage`
   1260  * @return #GNUNET_OK if @a er_msg is well-formed
   1261  */
   1262 static int
   1263 check_edit_record_set (void *cls, const struct EditRecordSetMessage *er_msg)
   1264 {
   1265   uint16_t name_len;
   1266   uint16_t editor_hint_len;
   1267   size_t src_size;
   1268   size_t key_len;
   1269 
   1270   (void) cls;
   1271   name_len = ntohs (er_msg->label_len);
   1272   editor_hint_len = ntohs (er_msg->editor_hint_len);
   1273   key_len = ntohs (er_msg->key_len);
   1274   src_size = ntohs (er_msg->gns_header.header.size);
   1275   if (name_len + editor_hint_len + key_len != src_size - sizeof(struct
   1276                                                                 EditRecordSetMessage))
   1277   {
   1278     GNUNET_break (0);
   1279     return GNUNET_SYSERR;
   1280   }
   1281   return GNUNET_OK;
   1282 }
   1283 
   1284 
   1285 /**
   1286  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_SET_EDIT message
   1287  *
   1288  * @param cls client sending the message
   1289  * @param ll_msg message of type `struct EditRecordSetMessage`
   1290  */
   1291 static void
   1292 handle_edit_record_set (void *cls, const struct EditRecordSetMessage *er_msg)
   1293 {
   1294   struct GNUNET_CRYPTO_BlindablePrivateKey zone;
   1295   struct NamestoreClient *nc = cls;
   1296   struct GNUNET_MQ_Envelope *env;
   1297   struct EditRecordSetResponseMessage *rer_msg;
   1298   struct RecordLookupContext rlc;
   1299   const char *name_tmp;
   1300   const char *editor_hint;
   1301   char *conv_name;
   1302   uint16_t name_len;
   1303   uint16_t old_editor_hint_len;
   1304   int res;
   1305   size_t key_len;
   1306   size_t kb_read;
   1307 
   1308   key_len = ntohs (er_msg->key_len);
   1309   name_len = ntohs (er_msg->label_len);
   1310   if ((GNUNET_SYSERR ==
   1311        GNUNET_CRYPTO_read_private_key_from_buffer (&er_msg[1],
   1312                                                    key_len,
   1313                                                    &zone,
   1314                                                    &kb_read)) ||
   1315       (kb_read != key_len))
   1316   {
   1317     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
   1318                 "Error reading private key\n");
   1319     GNUNET_SERVICE_client_drop (nc->client);
   1320     return;
   1321   }
   1322   name_tmp = (const char *) &er_msg[1] + key_len;
   1323   editor_hint = (const char *) name_tmp + name_len;
   1324   GNUNET_SERVICE_client_continue (nc->client);
   1325   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
   1326               "Received NAMESTORE_RECORD_SET_EDIT message for name `%s'\n",
   1327               name_tmp);
   1328 
   1329   conv_name = GNUNET_GNSRECORD_string_normalize (name_tmp);
   1330   if (NULL == conv_name)
   1331   {
   1332     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
   1333                 "Error converting name `%s'\n",
   1334                 name_tmp);
   1335     GNUNET_SERVICE_client_drop (nc->client);
   1336     return;
   1337   }
   1338   name_len = strlen (conv_name) + 1;
   1339   rlc.editor_hint = NULL;
   1340   rlc.label = conv_name;
   1341   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
   1342               "Looking up without filter\n");
   1343   rlc.filter = GNUNET_GNSRECORD_FILTER_NONE;
   1344   rlc.found = GNUNET_NO;
   1345   rlc.res_rd_count = 0;
   1346   rlc.res_rd = NULL;
   1347   rlc.rd_ser_len = 0;
   1348   rlc.nick = get_nick_record (&zone);
   1349   res = nc->GSN_database->edit_records (nc->GSN_database->cls,
   1350                                         editor_hint,
   1351                                         &zone,
   1352                                         conv_name,
   1353                                         &lookup_it,
   1354                                         &rlc);
   1355 
   1356   old_editor_hint_len = 0;
   1357   if (NULL != rlc.editor_hint)
   1358     old_editor_hint_len = strlen (rlc.editor_hint) + 1;
   1359   env =
   1360     GNUNET_MQ_msg_extra (rer_msg,
   1361                          rlc.rd_ser_len + old_editor_hint_len,
   1362                          GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_SET_EDIT_RESPONSE)
   1363   ;
   1364   rer_msg->editor_hint_len = htons (old_editor_hint_len);
   1365   rer_msg->gns_header.r_id = er_msg->gns_header.r_id;
   1366   rer_msg->rd_count = htons (rlc.res_rd_count);
   1367   rer_msg->rd_len = htons (rlc.rd_ser_len);
   1368   if (GNUNET_YES == rlc.found)
   1369     rer_msg->ec = htons (GNUNET_EC_NONE);
   1370   else if (GNUNET_SYSERR == res)
   1371     rer_msg->ec = htons (GNUNET_EC_NAMESTORE_UNKNOWN);
   1372   else
   1373     rer_msg->ec = htons (GNUNET_EC_NAMESTORE_NO_RESULTS);
   1374   GNUNET_memcpy (&rer_msg[1], rlc.editor_hint, old_editor_hint_len);
   1375   GNUNET_memcpy ((char*) &rer_msg[1] + old_editor_hint_len, rlc.res_rd,
   1376                  rlc.rd_ser_len);
   1377   GNUNET_MQ_send (nc->mq, env);
   1378   GNUNET_free (rlc.editor_hint);
   1379   GNUNET_free (rlc.res_rd);
   1380   GNUNET_free (conv_name);
   1381 }
   1382 
   1383 
   1384 /**
   1385  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_SET_EDIT_CANCEL message
   1386  *
   1387  * @param cls client sending the message
   1388  * @param er_msg message of type `struct EditRecordSetMessage`
   1389  * @return #GNUNET_OK if @a er_msg is well-formed
   1390  */
   1391 static int
   1392 check_edit_record_set_cancel (void *cls, const struct
   1393                               EditRecordSetCancelMessage *er_msg)
   1394 {
   1395   uint16_t name_len;
   1396   uint16_t editor_hint_len;
   1397   uint16_t editor_hint_repl_len;
   1398   size_t src_size;
   1399   size_t key_len;
   1400 
   1401   (void) cls;
   1402   name_len = ntohs (er_msg->label_len);
   1403   editor_hint_len = ntohs (er_msg->editor_hint_len);
   1404   editor_hint_repl_len = ntohs (er_msg->editor_hint_len);
   1405   key_len = ntohs (er_msg->key_len);
   1406   src_size = ntohs (er_msg->gns_header.header.size);
   1407   if (name_len + editor_hint_len + editor_hint_repl_len + key_len != src_size
   1408       - sizeof(struct
   1409                EditRecordSetCancelMessage))
   1410   {
   1411     GNUNET_break (0);
   1412     return GNUNET_SYSERR;
   1413   }
   1414   return GNUNET_OK;
   1415 }
   1416 
   1417 
   1418 /**
   1419  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_SET_EDIT_CANCEL message
   1420  *
   1421  * @param cls client sending the message
   1422  * @param ll_msg message of type `struct EditRecordSetCancelMessage`
   1423  */
   1424 static void
   1425 handle_edit_record_set_cancel (void *cls, const struct
   1426                                EditRecordSetCancelMessage *er_msg)
   1427 {
   1428   struct GNUNET_CRYPTO_BlindablePrivateKey zone;
   1429   struct NamestoreClient *nc = cls;
   1430   struct GNUNET_MQ_Envelope *env;
   1431   struct NamestoreResponseMessage *rer_msg;
   1432   const char *name_tmp;
   1433   const char *editor_hint;
   1434   const char *editor_hint_repl;
   1435   char *conv_name;
   1436   uint16_t name_len;
   1437   uint16_t editor_hint_len;
   1438   int res;
   1439   size_t key_len;
   1440   size_t kb_read;
   1441 
   1442   key_len = ntohs (er_msg->key_len);
   1443   name_len = ntohs (er_msg->label_len);
   1444   editor_hint_len = ntohs (er_msg->editor_hint_len);
   1445   if ((GNUNET_SYSERR ==
   1446        GNUNET_CRYPTO_read_private_key_from_buffer (&er_msg[1],
   1447                                                    key_len,
   1448                                                    &zone,
   1449                                                    &kb_read)) ||
   1450       (kb_read != key_len))
   1451   {
   1452     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
   1453                 "Error reading private key\n");
   1454     GNUNET_SERVICE_client_drop (nc->client);
   1455     return;
   1456   }
   1457   name_tmp = (const char *) &er_msg[1] + key_len;
   1458   editor_hint = (const char *) name_tmp + name_len;
   1459   editor_hint_repl = (const char *) name_tmp + name_len + editor_hint_len;
   1460   GNUNET_SERVICE_client_continue (nc->client);
   1461   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
   1462               "Received NAMESTORE_RECORD_SET_EDIT message for name `%s'\n",
   1463               name_tmp);
   1464 
   1465   conv_name = GNUNET_GNSRECORD_string_normalize (name_tmp);
   1466   if (NULL == conv_name)
   1467   {
   1468     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
   1469                 "Error converting name `%s'\n",
   1470                 name_tmp);
   1471     GNUNET_SERVICE_client_drop (nc->client);
   1472     return;
   1473   }
   1474   name_len = strlen (conv_name) + 1;
   1475   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
   1476               "Clearing editor hint\n");
   1477   res = nc->GSN_database->clear_editor_hint (nc->GSN_database->cls,
   1478                                              editor_hint,
   1479                                              editor_hint_repl,
   1480                                              &zone,
   1481                                              conv_name);
   1482   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
   1483               "Clearing editor hint %s\n", (GNUNET_SYSERR == res) ? "failed." :
   1484               "successful.");
   1485 
   1486   env =
   1487     GNUNET_MQ_msg (rer_msg,
   1488                    GNUNET_MESSAGE_TYPE_NAMESTORE_GENERIC_RESPONSE);
   1489   rer_msg->gns_header.r_id = er_msg->gns_header.r_id;
   1490   rer_msg->ec = htons ((GNUNET_OK == res) ? GNUNET_EC_NONE :
   1491                        GNUNET_EC_NAMESTORE_BACKEND_FAILED);
   1492   GNUNET_MQ_send (nc->mq, env);
   1493   GNUNET_free (conv_name);
   1494 }
   1495 
   1496 
   1497 /**
   1498  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_LOOKUP message
   1499  *
   1500  * @param cls client sending the message
   1501  * @param ll_msg message of type `struct LabelLookupMessage`
   1502  * @return #GNUNET_OK if @a ll_msg is well-formed
   1503  */
   1504 static int
   1505 check_record_lookup (void *cls, const struct LabelLookupMessage *ll_msg)
   1506 {
   1507   uint32_t name_len;
   1508   size_t src_size;
   1509   size_t key_len;
   1510 
   1511   (void) cls;
   1512   name_len = ntohs (ll_msg->label_len);
   1513   key_len = ntohs (ll_msg->key_len);
   1514   src_size = ntohs (ll_msg->gns_header.header.size);
   1515   if (name_len + key_len != src_size - sizeof(struct LabelLookupMessage))
   1516   {
   1517     GNUNET_break (0);
   1518     return GNUNET_SYSERR;
   1519   }
   1520   return GNUNET_OK;
   1521 }
   1522 
   1523 
   1524 /**
   1525  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_LOOKUP message
   1526  *
   1527  * @param cls client sending the message
   1528  * @param ll_msg message of type `struct LabelLookupMessage`
   1529  */
   1530 static void
   1531 handle_record_lookup (void *cls, const struct LabelLookupMessage *ll_msg)
   1532 {
   1533   struct GNUNET_CRYPTO_BlindablePrivateKey zone;
   1534   struct NamestoreClient *nc = cls;
   1535   struct GNUNET_MQ_Envelope *env;
   1536   struct LabelLookupResponseMessage *llr_msg;
   1537   struct RecordLookupContext rlc;
   1538   const char *name_tmp;
   1539   char *res_name;
   1540   char *conv_name;
   1541   uint32_t name_len;
   1542   int res;
   1543   size_t key_len;
   1544   size_t kb_read;
   1545 
   1546   key_len = ntohs (ll_msg->key_len);
   1547   if ((GNUNET_SYSERR ==
   1548        GNUNET_CRYPTO_read_private_key_from_buffer (&ll_msg[1],
   1549                                                    key_len,
   1550                                                    &zone,
   1551                                                    &kb_read)) ||
   1552       (kb_read != key_len))
   1553   {
   1554     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
   1555                 "Error reading private key\n");
   1556     GNUNET_SERVICE_client_drop (nc->client);
   1557     return;
   1558   }
   1559   name_tmp = (const char *) &ll_msg[1] + key_len;
   1560   GNUNET_SERVICE_client_continue (nc->client);
   1561   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
   1562               "Received NAMESTORE_RECORD_LOOKUP message for name `%s'\n",
   1563               name_tmp);
   1564 
   1565   conv_name = GNUNET_GNSRECORD_string_normalize (name_tmp);
   1566   if (NULL == conv_name)
   1567   {
   1568     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
   1569                 "Error converting name `%s'\n",
   1570                 name_tmp);
   1571     GNUNET_SERVICE_client_drop (nc->client);
   1572     return;
   1573   }
   1574   name_len = strlen (conv_name) + 1;
   1575   rlc.editor_hint = NULL;
   1576   rlc.label = conv_name;
   1577   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
   1578               "Looking up with filter %u\n", ntohs (ll_msg->filter));
   1579   rlc.filter = ntohs (ll_msg->filter);
   1580   rlc.found = GNUNET_NO;
   1581   rlc.res_rd_count = 0;
   1582   rlc.res_rd = NULL;
   1583   rlc.rd_ser_len = 0;
   1584   rlc.nick = get_nick_record (&zone);
   1585   res = nc->GSN_database->lookup_records (nc->GSN_database->cls,
   1586                                           &zone,
   1587                                           conv_name,
   1588                                           &lookup_it,
   1589                                           &rlc);
   1590   env =
   1591     GNUNET_MQ_msg_extra (llr_msg,
   1592                          key_len + name_len + rlc.rd_ser_len,
   1593                          GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_LOOKUP_RESPONSE);
   1594   llr_msg->gns_header.r_id = ll_msg->gns_header.r_id;
   1595   GNUNET_memcpy (&llr_msg[1], &ll_msg[1], key_len);
   1596   llr_msg->key_len = ll_msg->key_len;
   1597   llr_msg->name_len = htons (name_len);
   1598   llr_msg->rd_count = htons (rlc.res_rd_count);
   1599   llr_msg->rd_len = htons (rlc.rd_ser_len);
   1600   llr_msg->reserved = htons (0);
   1601   res_name = ((char *) &llr_msg[1]) + key_len;
   1602   if (GNUNET_YES == rlc.found)
   1603     llr_msg->found = htons (GNUNET_YES);
   1604   else if (GNUNET_SYSERR == res)
   1605     llr_msg->found = htons (GNUNET_SYSERR);
   1606   else
   1607     llr_msg->found = htons (GNUNET_NO);
   1608   GNUNET_memcpy (res_name, conv_name, name_len);
   1609   GNUNET_memcpy (&res_name[name_len], rlc.res_rd, rlc.rd_ser_len);
   1610   GNUNET_MQ_send (nc->mq, env);
   1611   GNUNET_free (rlc.editor_hint);
   1612   GNUNET_free (rlc.res_rd);
   1613   GNUNET_free (conv_name);
   1614 }
   1615 
   1616 
   1617 /**
   1618  * Checks a #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_STORE message
   1619  *
   1620  * @param cls client sending the message
   1621  * @param rp_msg message of type `struct RecordStoreMessage`
   1622  * @return #GNUNET_OK if @a rp_msg is well-formed
   1623  */
   1624 static int
   1625 check_record_store (void *cls, const struct RecordStoreMessage *rp_msg)
   1626 {
   1627   size_t msg_size;
   1628   size_t min_size_exp;
   1629   size_t rd_set_count;
   1630   size_t key_len;
   1631 
   1632   (void) cls;
   1633   msg_size = ntohs (rp_msg->gns_header.header.size);
   1634   rd_set_count = ntohs (rp_msg->rd_set_count);
   1635   key_len = ntohs (rp_msg->key_len);
   1636 
   1637   min_size_exp = sizeof(*rp_msg) + key_len + sizeof (struct RecordSet)
   1638                  * rd_set_count;
   1639   if (msg_size < min_size_exp)
   1640   {
   1641     GNUNET_break (0);
   1642     return GNUNET_SYSERR;
   1643   }
   1644   return GNUNET_OK;
   1645 }
   1646 
   1647 
   1648 struct LookupExistingRecordsContext
   1649 {
   1650 
   1651   /**
   1652    * The expiration of the existing records or tombstone
   1653    */
   1654   struct GNUNET_TIME_Absolute exp;
   1655 
   1656   /**
   1657    * Whether the existing record set consists only of a tombstone
   1658    * (e.g. is "empty")
   1659    */
   1660   int only_tombstone;
   1661 
   1662 };
   1663 
   1664 
   1665 /**
   1666  * Check if set contains a tombstone, store if necessary
   1667  *
   1668  * @param cls a `struct GNUNET_GNSRECORD_Data **` for storing the nick (if found)
   1669  * @param seq sequence number of the record, MUST NOT BE ZERO
   1670  * @param private_key the private key of the zone (unused)
   1671  * @param label should be #GNUNET_GNS_EMPTY_LABEL_AT
   1672  * @param rd_count number of records in @a rd
   1673  * @param rd records stored under @a label in the zone
   1674  */
   1675 static void
   1676 get_existing_rd_exp (void *cls,
   1677                      uint64_t seq,
   1678                      const char *editor_hint,
   1679                      const struct
   1680                      GNUNET_CRYPTO_BlindablePrivateKey *private_key,
   1681                      const char *label,
   1682                      unsigned int rd_count,
   1683                      const struct GNUNET_GNSRECORD_Data *rd)
   1684 {
   1685   struct LookupExistingRecordsContext *lctx = cls;
   1686   struct GNUNET_GNSRECORD_Data rd_pub[rd_count];
   1687   unsigned int rd_pub_count;
   1688   char *emsg;
   1689 
   1690   if ((1 == rd_count) &&
   1691       (GNUNET_GNSRECORD_TYPE_TOMBSTONE == rd[0].record_type))
   1692   {
   1693     /* This record set contains only a tombstone! */
   1694     lctx->only_tombstone = GNUNET_YES;
   1695   }
   1696   if (GNUNET_OK !=
   1697       GNUNET_GNSRECORD_normalize_record_set (label,
   1698                                              rd,
   1699                                              rd_count,
   1700                                              rd_pub,
   1701                                              &rd_pub_count,
   1702                                              &lctx->exp,
   1703                                              GNUNET_GNSRECORD_FILTER_OMIT_PRIVATE,
   1704                                              &emsg))
   1705   {
   1706     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
   1707                 "%s\n", emsg);
   1708     GNUNET_free (emsg);
   1709   }
   1710 }
   1711 
   1712 
   1713 static enum GNUNET_ErrorCode
   1714 store_record_set (struct NamestoreClient *nc,
   1715                   const struct GNUNET_CRYPTO_BlindablePrivateKey *private_key,
   1716                   const struct RecordSet *rd_set,
   1717                   ssize_t *len)
   1718 {
   1719   size_t name_len;
   1720   size_t rd_ser_len;
   1721   const char *name_tmp;
   1722   const char *rd_ser;
   1723   char *emsg;
   1724   char *conv_name;
   1725   unsigned int rd_count;
   1726   int res;
   1727   enum GNUNET_ErrorCode ec;
   1728   struct GNUNET_TIME_Absolute new_block_exp;
   1729   struct LookupExistingRecordsContext lctx;
   1730   *len = sizeof (struct RecordSet);
   1731 
   1732   ec = GNUNET_EC_NONE;
   1733   lctx.only_tombstone = GNUNET_NO;
   1734   lctx.exp = GNUNET_TIME_UNIT_ZERO_ABS;
   1735   new_block_exp = GNUNET_TIME_UNIT_ZERO_ABS;
   1736   name_len = ntohs (rd_set->name_len);
   1737   *len += name_len;
   1738   rd_count = ntohs (rd_set->rd_count);
   1739   rd_ser_len = ntohs (rd_set->rd_len);
   1740   *len += rd_ser_len;
   1741   name_tmp = (const char *) &rd_set[1];
   1742   rd_ser = &name_tmp[name_len];
   1743   {
   1744     struct GNUNET_GNSRECORD_Data rd[GNUNET_NZL (rd_count)];
   1745 
   1746     /* Extracting and converting private key */
   1747     conv_name = GNUNET_GNSRECORD_string_normalize (name_tmp);
   1748     if (NULL == conv_name)
   1749     {
   1750       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
   1751                   "Error normalizing name `%s'\n",
   1752                   name_tmp);
   1753       return GNUNET_EC_NAMESTORE_LABEL_INVALID;
   1754     }
   1755 
   1756     /* Check name for validity */
   1757     if (GNUNET_OK != GNUNET_GNSRECORD_label_check (conv_name, &emsg))
   1758     {
   1759       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
   1760                   "Label invalid: `%s'\n",
   1761                   emsg);
   1762       GNUNET_free (emsg);
   1763       GNUNET_free (conv_name);
   1764       return GNUNET_EC_NAMESTORE_LABEL_INVALID;
   1765     }
   1766 
   1767     if (GNUNET_OK !=
   1768         GNUNET_GNSRECORD_records_deserialize (rd_ser_len, rd_ser, rd_count,
   1769                                               rd))
   1770     {
   1771       GNUNET_free (conv_name);
   1772       return GNUNET_EC_NAMESTORE_RECORD_DATA_INVALID;
   1773     }
   1774 
   1775     GNUNET_STATISTICS_update (statistics,
   1776                               "Well-formed store requests received",
   1777                               1,
   1778                               GNUNET_NO);
   1779     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
   1780                 "Creating %u records for name `%s'\n",
   1781                 (unsigned int) rd_count,
   1782                 conv_name);
   1783     if ((0 == rd_count) &&
   1784         (GNUNET_NO == nc->GSN_database->lookup_records (nc->GSN_database->cls,
   1785                                                         private_key,
   1786                                                         conv_name,
   1787                                                         &get_existing_rd_exp,
   1788                                                         &lctx)))
   1789     {
   1790       /* This name does not exist, so cannot be removed */
   1791       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
   1792                   "Name `%s' does not exist, no deletion required\n",
   1793                   conv_name);
   1794       res = GNUNET_NO;
   1795       ec = GNUNET_EC_NAMESTORE_RECORD_NOT_FOUND;
   1796     }
   1797     else
   1798     {
   1799       /* remove "NICK" records, unless this is for the
   1800        #GNUNET_GNS_EMPTY_LABEL_AT label
   1801        We may need one additional record later for tombstone.
   1802        FIXME: Since we must normalize the record set (check for
   1803        consistency etc) we have to iterate the set twice.
   1804        May be inefficient.
   1805        We cannot really move the nick caching into GNSRECORD.
   1806        */
   1807       struct GNUNET_GNSRECORD_Data rd_clean[GNUNET_NZL (rd_count)];
   1808       struct GNUNET_GNSRECORD_Data rd_nf[GNUNET_NZL (rd_count) + 1];
   1809       unsigned int rd_clean_off;
   1810       unsigned int rd_nf_count;
   1811       int have_nick;
   1812 
   1813       rd_clean_off = 0;
   1814       have_nick = GNUNET_NO;
   1815       for (unsigned int i = 0; i < rd_count; i++)
   1816       {
   1817         rd_clean[rd_clean_off] = rd[i];
   1818 
   1819         if ((0 == strcmp (GNUNET_GNS_EMPTY_LABEL_AT, conv_name)) ||
   1820             (GNUNET_GNSRECORD_TYPE_NICK != rd[i].record_type))
   1821           rd_clean_off++;
   1822 
   1823         if ((0 == strcmp (GNUNET_GNS_EMPTY_LABEL_AT, conv_name)) &&
   1824             (GNUNET_GNSRECORD_TYPE_NICK == rd[i].record_type))
   1825         {
   1826           cache_nick (private_key, &rd[i]);
   1827           have_nick = GNUNET_YES;
   1828         }
   1829       }
   1830       if (GNUNET_OK !=
   1831           GNUNET_GNSRECORD_normalize_record_set (conv_name,
   1832                                                  rd_clean,
   1833                                                  rd_clean_off,
   1834                                                  rd_nf,
   1835                                                  &rd_nf_count,
   1836                                                  &new_block_exp,
   1837                                                  GNUNET_GNSRECORD_FILTER_INCLUDE_MAINTENANCE,
   1838                                                  &emsg))
   1839       {
   1840         GNUNET_free (conv_name);
   1841         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
   1842                     "Error normalizing record set: `%s'\n",
   1843                     emsg);
   1844         GNUNET_free (emsg);
   1845         return GNUNET_EC_NAMESTORE_RECORD_DATA_INVALID;
   1846       }
   1847       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
   1848                   "%u/%u records before tombstone\n", rd_nf_count,
   1849                   rd_clean_off);
   1850       /*
   1851        * If existing_block_exp is 0, then there was no record set
   1852        * and no tombstone.
   1853        * Otherwise, if the existing block expiration is after the
   1854        * new block expiration would be, we need to add a tombstone
   1855        * or update it.
   1856        */
   1857       if (GNUNET_TIME_absolute_cmp (new_block_exp, <=, lctx.exp))
   1858       {
   1859         rd_nf[rd_nf_count].record_type = GNUNET_GNSRECORD_TYPE_TOMBSTONE;
   1860         rd_nf[rd_nf_count].expiration_time =
   1861           lctx.exp.abs_value_us;
   1862         rd_nf[rd_nf_count].data = NULL;
   1863         rd_nf[rd_nf_count].data_size = 0;
   1864         rd_nf[rd_nf_count].flags = GNUNET_GNSRECORD_RF_PRIVATE
   1865                                    | GNUNET_GNSRECORD_RF_MAINTENANCE;
   1866         rd_nf_count++;
   1867       }
   1868       if ((0 == strcmp (GNUNET_GNS_EMPTY_LABEL_AT, conv_name)) &&
   1869           (GNUNET_NO == have_nick))
   1870       {
   1871         cache_nick (private_key, NULL);
   1872       }
   1873       res = nc->GSN_database->store_records (nc->GSN_database->cls,
   1874                                              private_key,
   1875                                              conv_name,
   1876                                              rd_nf_count,
   1877                                              rd_nf);
   1878       /* If after a store there is only a TOMBSTONE left, and
   1879        * there was >1 record under this label found (the tombstone; indicated
   1880        * through res != GNUNET_NO) then we should return "NOT FOUND" == GNUNET_NO
   1881        */
   1882       if ((GNUNET_SYSERR != res) &&
   1883           (0 == rd_count) &&
   1884           (1 == rd_nf_count) &&
   1885           (GNUNET_GNSRECORD_TYPE_TOMBSTONE == rd_nf[0].record_type) &&
   1886           (GNUNET_YES == lctx.only_tombstone))
   1887       {
   1888         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
   1889                     "Client tried to remove non-existent record\n");
   1890         ec = GNUNET_EC_NAMESTORE_RECORD_NOT_FOUND;
   1891       }
   1892     }
   1893 
   1894     if (GNUNET_SYSERR == res)
   1895     {
   1896       /* store not successful, no need to tell monitors */
   1897       GNUNET_free (conv_name);
   1898       return GNUNET_EC_NAMESTORE_STORE_FAILED;
   1899     }
   1900   }
   1901   return ec;
   1902 }
   1903 
   1904 
   1905 /**
   1906  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_STORE message
   1907  *
   1908  * @param cls client sending the message
   1909  * @param rp_msg message of type `struct RecordStoreMessage`
   1910  */
   1911 static void
   1912 handle_record_store (void *cls, const struct RecordStoreMessage *rp_msg)
   1913 {
   1914   struct GNUNET_CRYPTO_BlindablePrivateKey zone;
   1915   struct NamestoreClient *nc = cls;
   1916   uint32_t rid;
   1917   uint16_t rd_set_count;
   1918   const char *buf;
   1919   ssize_t read;
   1920   size_t key_len;
   1921   size_t kb_read;
   1922   size_t rp_msg_len;
   1923   size_t rs_len;
   1924   size_t rs_off;
   1925   struct StoreActivity *sa;
   1926   struct RecordSet *rs;
   1927   enum GNUNET_ErrorCode res;
   1928 
   1929   key_len = ntohs (rp_msg->key_len);
   1930   rp_msg_len = ntohs (rp_msg->gns_header.header.size);
   1931   rs_off = sizeof (*rp_msg) + key_len;
   1932   rs_len = rp_msg_len - rs_off;
   1933   if ((GNUNET_SYSERR ==
   1934        GNUNET_CRYPTO_read_private_key_from_buffer (&rp_msg[1],
   1935                                                    key_len,
   1936                                                    &zone,
   1937                                                    &kb_read)) ||
   1938       (kb_read != key_len))
   1939   {
   1940     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
   1941                 "Error reading private key\n");
   1942     GNUNET_SERVICE_client_drop (nc->client);
   1943     return;
   1944   }
   1945   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
   1946               "Received NAMESTORE_RECORD_STORE message\n");
   1947   rid = ntohl (rp_msg->gns_header.r_id);
   1948   rd_set_count = ntohs (rp_msg->rd_set_count);
   1949   buf = (const char *) rp_msg + rs_off;
   1950   if (GNUNET_YES == ntohs (rp_msg->single_tx))
   1951     nc->GSN_database->begin_tx (nc->GSN_database->cls);
   1952   for (int i = 0; i < rd_set_count; i++)
   1953   {
   1954     rs = (struct RecordSet *) buf;
   1955     res = store_record_set (nc, &zone,
   1956                             rs, &read);
   1957     if (GNUNET_EC_NONE != res)
   1958     {
   1959       if (GNUNET_YES == ntohs (rp_msg->single_tx))
   1960         nc->GSN_database->rollback_tx (nc->GSN_database->cls);
   1961       send_store_response (nc, res, rid);
   1962       GNUNET_SERVICE_client_continue (nc->client);
   1963       return;
   1964     }
   1965     buf += read;
   1966   }
   1967   if (GNUNET_YES == ntohs (rp_msg->single_tx))
   1968     nc->GSN_database->commit_tx (nc->GSN_database->cls);
   1969   sa = GNUNET_malloc (sizeof(struct StoreActivity) + rs_len);
   1970   GNUNET_CONTAINER_DLL_insert (sa_head, sa_tail, sa);
   1971   sa->nc = nc;
   1972   sa->rs = (struct RecordSet *) &sa[1];
   1973   sa->rd_set_count = rd_set_count;
   1974   GNUNET_memcpy (&sa[1], (char *) rp_msg + rs_off, rs_len);
   1975   sa->rid = rid;
   1976   sa->rd_set_pos = 0;
   1977   sa->private_key = zone;
   1978   sa->zm_pos = monitor_head;
   1979   continue_store_activity (sa, GNUNET_YES);
   1980 }
   1981 
   1982 
   1983 /**
   1984  * Context for record remove operations passed from #handle_zone_to_name to
   1985  * #handle_zone_to_name_it as closure
   1986  */
   1987 struct ZoneToNameCtx
   1988 {
   1989   /**
   1990    * Namestore client
   1991    */
   1992   struct NamestoreClient *nc;
   1993 
   1994   /**
   1995    * Request id (to be used in the response to the client).
   1996    */
   1997   uint32_t rid;
   1998 
   1999   /**
   2000    * Set to #GNUNET_OK on success, #GNUNET_SYSERR on error.  Note that
   2001    * not finding a name for the zone still counts as a 'success' here,
   2002    * as this field is about the success of executing the IPC protocol.
   2003    */
   2004   enum GNUNET_ErrorCode ec;
   2005 };
   2006 
   2007 
   2008 /**
   2009  * Zone to name iterator
   2010  *
   2011  * @param cls struct ZoneToNameCtx *
   2012  * @param seq sequence number of the record, MUST NOT BE ZERO
   2013  * @param zone_key the zone key
   2014  * @param name name
   2015  * @param rd_count number of records in @a rd
   2016  * @param rd record data
   2017  */
   2018 static void
   2019 handle_zone_to_name_it (void *cls,
   2020                         uint64_t seq,
   2021                         const char *editor_hint,
   2022                         const struct GNUNET_CRYPTO_BlindablePrivateKey *zone_key
   2023                         ,
   2024                         const char *name,
   2025                         unsigned int rd_count,
   2026                         const struct GNUNET_GNSRECORD_Data *rd)
   2027 {
   2028   struct ZoneToNameCtx *ztn_ctx = cls;
   2029   struct GNUNET_MQ_Envelope *env;
   2030   struct ZoneToNameResponseMessage *ztnr_msg;
   2031   size_t name_len;
   2032   size_t key_len;
   2033   ssize_t rd_ser_len;
   2034   size_t msg_size;
   2035   char *name_tmp;
   2036   char *rd_tmp;
   2037 
   2038   GNUNET_assert (0 != seq);
   2039   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
   2040               "Found result for zone-to-name lookup: `%s'\n",
   2041               name);
   2042   ztn_ctx->ec = GNUNET_EC_NONE;
   2043   name_len = (NULL == name) ? 0 : strlen (name) + 1;
   2044   rd_ser_len = GNUNET_GNSRECORD_records_get_size (rd_count, rd);
   2045   if (rd_ser_len < 0)
   2046   {
   2047     GNUNET_break (0);
   2048     ztn_ctx->ec = htonl (GNUNET_EC_NAMESTORE_UNKNOWN);
   2049     return;
   2050   }
   2051   key_len = GNUNET_CRYPTO_blindable_sk_get_length (zone_key);
   2052   msg_size = sizeof(struct ZoneToNameResponseMessage)
   2053              + name_len + rd_ser_len + key_len;
   2054   if (msg_size >= GNUNET_MAX_MESSAGE_SIZE)
   2055   {
   2056     GNUNET_break (0);
   2057     ztn_ctx->ec = GNUNET_EC_NAMESTORE_RECORD_TOO_BIG;
   2058     return;
   2059   }
   2060   env =
   2061     GNUNET_MQ_msg_extra (ztnr_msg,
   2062                          key_len + name_len + rd_ser_len,
   2063                          GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_TO_NAME_RESPONSE);
   2064   ztnr_msg->gns_header.header.size = htons (msg_size);
   2065   ztnr_msg->gns_header.r_id = htonl (ztn_ctx->rid);
   2066   ztnr_msg->ec = htonl (ztn_ctx->ec);
   2067   ztnr_msg->rd_len = htons (rd_ser_len);
   2068   ztnr_msg->rd_count = htons (rd_count);
   2069   ztnr_msg->name_len = htons (name_len);
   2070   ztnr_msg->key_len = htons (key_len);
   2071   GNUNET_CRYPTO_write_blindable_sk_to_buffer (zone_key,
   2072                                               &ztnr_msg[1],
   2073                                               key_len);
   2074   name_tmp = (char *) &ztnr_msg[1] + key_len;
   2075   GNUNET_memcpy (name_tmp, name, name_len);
   2076   rd_tmp = &name_tmp[name_len];
   2077   GNUNET_assert (
   2078     rd_ser_len ==
   2079     GNUNET_GNSRECORD_records_serialize (rd_count, rd, rd_ser_len, rd_tmp));
   2080   ztn_ctx->ec = GNUNET_EC_NONE;
   2081   GNUNET_MQ_send (ztn_ctx->nc->mq, env);
   2082 }
   2083 
   2084 
   2085 static enum GNUNET_GenericReturnValue
   2086 check_zone_to_name (void *cls,
   2087                     const struct ZoneToNameMessage *zis_msg)
   2088 {
   2089   return GNUNET_OK;
   2090 }
   2091 
   2092 
   2093 /**
   2094  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_TO_NAME message
   2095  *
   2096  * @param cls client client sending the message
   2097  * @param ztn_msg message of type 'struct ZoneToNameMessage'
   2098  */
   2099 static void
   2100 handle_zone_to_name (void *cls, const struct ZoneToNameMessage *ztn_msg)
   2101 {
   2102   struct GNUNET_CRYPTO_BlindablePrivateKey zone;
   2103   struct GNUNET_CRYPTO_BlindablePublicKey value_zone;
   2104   struct NamestoreClient *nc = cls;
   2105   struct ZoneToNameCtx ztn_ctx;
   2106   struct GNUNET_MQ_Envelope *env;
   2107   struct ZoneToNameResponseMessage *ztnr_msg;
   2108   size_t key_len;
   2109   size_t pkey_len;
   2110   size_t kb_read;
   2111 
   2112   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received ZONE_TO_NAME message\n");
   2113   ztn_ctx.rid = ntohl (ztn_msg->gns_header.r_id);
   2114   ztn_ctx.nc = nc;
   2115   ztn_ctx.ec = GNUNET_EC_NAMESTORE_ZONE_NOT_FOUND;
   2116   key_len = ntohs (ztn_msg->key_len);
   2117   if ((GNUNET_SYSERR ==
   2118        GNUNET_CRYPTO_read_private_key_from_buffer (&ztn_msg[1],
   2119                                                    key_len,
   2120                                                    &zone,
   2121                                                    &kb_read)) ||
   2122       (kb_read != key_len))
   2123   {
   2124     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
   2125                 "Error parsing private key.\n");
   2126     GNUNET_SERVICE_client_drop (nc->client);
   2127     GNUNET_break (0);
   2128     return;
   2129   }
   2130   pkey_len = ntohs (ztn_msg->pkey_len);
   2131   if ((GNUNET_SYSERR ==
   2132        GNUNET_CRYPTO_read_blindable_pk_from_buffer ((char*) &ztn_msg[1]
   2133                                                     + key_len,
   2134                                                     pkey_len,
   2135                                                     &value_zone,
   2136                                                     &kb_read)) ||
   2137       (kb_read != pkey_len))
   2138   {
   2139     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
   2140                 "Error parsing public key.\n");
   2141     GNUNET_SERVICE_client_drop (nc->client);
   2142     GNUNET_break (0);
   2143     return;
   2144   }
   2145   if (GNUNET_SYSERR == nc->GSN_database->zone_to_name (nc->GSN_database->cls,
   2146                                                        &zone,
   2147                                                        &value_zone,
   2148                                                        &handle_zone_to_name_it,
   2149                                                        &ztn_ctx))
   2150   {
   2151     /* internal error, hang up instead of signalling something
   2152        that might be wrong */
   2153     GNUNET_break (0);
   2154     GNUNET_SERVICE_client_drop (nc->client);
   2155     return;
   2156   }
   2157   if (GNUNET_EC_NAMESTORE_ZONE_NOT_FOUND == ztn_ctx.ec)
   2158   {
   2159     /* no result found, send empty response */
   2160     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
   2161                 "Found no result for zone-to-name lookup.\n");
   2162     env = GNUNET_MQ_msg (ztnr_msg,
   2163                          GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_TO_NAME_RESPONSE);
   2164     ztnr_msg->gns_header.r_id = ztn_msg->gns_header.r_id;
   2165     ztnr_msg->ec = htonl (GNUNET_EC_NAMESTORE_ZONE_NOT_FOUND);
   2166     GNUNET_MQ_send (nc->mq, env);
   2167   }
   2168   GNUNET_SERVICE_client_continue (nc->client);
   2169 }
   2170 
   2171 
   2172 /**
   2173  * Context for record remove operations passed from
   2174  * #run_zone_iteration_round to #zone_iterate_proc as closure
   2175  */
   2176 struct ZoneIterationProcResult
   2177 {
   2178   /**
   2179    * The zone iteration handle
   2180    */
   2181   struct ZoneIteration *zi;
   2182 
   2183   /**
   2184    * Number of results left to be returned in this iteration.
   2185    */
   2186   uint64_t limit;
   2187 
   2188   /**
   2189    * Skip a result and run again unless GNUNET_NO
   2190    */
   2191   int run_again;
   2192 };
   2193 
   2194 
   2195 /**
   2196  * Process results for zone iteration from database
   2197  *
   2198  * @param cls struct ZoneIterationProcResult
   2199  * @param seq sequence number of the record, MUST NOT BE ZERO
   2200  * @param zone_key the zone key
   2201  * @param name name
   2202  * @param rd_count number of records for this name
   2203  * @param rd record data
   2204  */
   2205 static void
   2206 zone_iterate_proc (void *cls,
   2207                    uint64_t seq,
   2208                    const char *editor_hint,
   2209                    const struct GNUNET_CRYPTO_BlindablePrivateKey *zone_key,
   2210                    const char *name,
   2211                    unsigned int rd_count,
   2212                    const struct GNUNET_GNSRECORD_Data *rd)
   2213 {
   2214   struct ZoneIterationProcResult *proc = cls;
   2215 
   2216   GNUNET_assert (0 != seq);
   2217   if ((NULL == zone_key) && (NULL == name))
   2218   {
   2219     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Iteration done\n");
   2220     return;
   2221   }
   2222   if ((NULL == zone_key) || (NULL == name))
   2223   {
   2224     /* what is this!? should never happen */
   2225     GNUNET_break (0);
   2226     return;
   2227   }
   2228   if (0 == proc->limit)
   2229   {
   2230     /* what is this!? should never happen */
   2231     GNUNET_break (0);
   2232     return;
   2233   }
   2234   proc->zi->seq = seq;
   2235   if (0 < send_lookup_response_with_filter (proc->zi->nc,
   2236                                             proc->zi->request_id,
   2237                                             zone_key,
   2238                                             name,
   2239                                             rd_count,
   2240                                             rd,
   2241                                             proc->zi->filter))
   2242     proc->limit--;
   2243   else
   2244     proc->run_again = GNUNET_YES;
   2245 }
   2246 
   2247 
   2248 /**
   2249  * Perform the next round of the zone iteration.
   2250  *
   2251  * @param zi zone iterator to process
   2252  * @param limit number of results to return in one pass
   2253  */
   2254 static void
   2255 run_zone_iteration_round (struct ZoneIteration *zi, uint64_t limit)
   2256 {
   2257   struct ZoneIterationProcResult proc;
   2258   struct GNUNET_TIME_Absolute start;
   2259   struct GNUNET_TIME_Relative duration;
   2260   struct NamestoreClient *nc = zi->nc;
   2261 
   2262   memset (&proc, 0, sizeof(proc));
   2263   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
   2264               "Asked to return up to %llu records at position %llu\n",
   2265               (unsigned long long) limit,
   2266               (unsigned long long) zi->seq);
   2267   proc.zi = zi;
   2268   proc.limit = limit;
   2269   proc.run_again = GNUNET_YES;
   2270   start = GNUNET_TIME_absolute_get ();
   2271   while (GNUNET_YES == proc.run_again)
   2272   {
   2273     proc.run_again = GNUNET_NO;
   2274     GNUNET_break (GNUNET_SYSERR !=
   2275                   nc->GSN_database->iterate_records (nc->GSN_database->cls,
   2276                                                      (GNUNET_YES ==
   2277                                                       GNUNET_is_zero (
   2278                                                         &zi->zone))
   2279                                                ? NULL
   2280                                                : &zi->zone,
   2281                                                      zi->seq,
   2282                                                      proc.limit,
   2283                                                      &zone_iterate_proc,
   2284                                                      &proc));
   2285   }
   2286   duration = GNUNET_TIME_absolute_get_duration (start);
   2287   duration = GNUNET_TIME_relative_divide (duration, limit - proc.limit);
   2288   GNUNET_STATISTICS_set (statistics,
   2289                          "NAMESTORE iteration delay (μs/record)",
   2290                          duration.rel_value_us,
   2291                          GNUNET_NO);
   2292   if (0 == proc.limit)
   2293     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
   2294                 "Returned %llu results, more results available\n",
   2295                 (unsigned long long) limit);
   2296   zi->send_end = (0 != proc.limit);
   2297   zone_iteration_done_client_continue (zi);
   2298 }
   2299 
   2300 
   2301 static enum GNUNET_GenericReturnValue
   2302 check_iteration_start (void *cls,
   2303                        const struct ZoneIterationStartMessage *zis_msg)
   2304 {
   2305   uint16_t size;
   2306   size_t key_len;
   2307 
   2308   size = ntohs (zis_msg->gns_header.header.size);
   2309   key_len = ntohs (zis_msg->key_len);
   2310 
   2311   if (size < key_len + sizeof(*zis_msg))
   2312   {
   2313     GNUNET_break (0);
   2314     return GNUNET_SYSERR;
   2315   }
   2316   return GNUNET_OK;
   2317 }
   2318 
   2319 
   2320 /**
   2321  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_START message
   2322  *
   2323  * @param cls the client sending the message
   2324  * @param zis_msg message from the client
   2325  */
   2326 static void
   2327 handle_iteration_start (void *cls,
   2328                         const struct ZoneIterationStartMessage *zis_msg)
   2329 {
   2330   struct GNUNET_CRYPTO_BlindablePrivateKey zone;
   2331   struct NamestoreClient *nc = cls;
   2332   struct ZoneIteration *zi;
   2333   size_t key_len;
   2334   size_t kb_read;
   2335 
   2336   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
   2337               "Received ZONE_ITERATION_START message\n");
   2338   key_len = ntohs (zis_msg->key_len);
   2339   zi = GNUNET_new (struct ZoneIteration);
   2340   if (0 < key_len)
   2341   {
   2342     if ((GNUNET_SYSERR ==
   2343          GNUNET_CRYPTO_read_private_key_from_buffer (&zis_msg[1],
   2344                                                      key_len,
   2345                                                      &zone,
   2346                                                      &kb_read)) ||
   2347         (kb_read != key_len))
   2348     {
   2349       GNUNET_SERVICE_client_drop (nc->client);
   2350       GNUNET_free (zi);
   2351       return;
   2352     }
   2353     zi->zone = zone;
   2354   }
   2355   zi->request_id = ntohl (zis_msg->gns_header.r_id);
   2356   zi->filter = ntohs (zis_msg->filter);
   2357   zi->offset = 0;
   2358   zi->nc = nc;
   2359   GNUNET_CONTAINER_DLL_insert (nc->op_head, nc->op_tail, zi);
   2360   run_zone_iteration_round (zi, 1);
   2361 }
   2362 
   2363 
   2364 /**
   2365  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_STOP message
   2366  *
   2367  * @param cls the client sending the message
   2368  * @param zis_msg message from the client
   2369  */
   2370 static void
   2371 handle_iteration_stop (void *cls,
   2372                        const struct ZoneIterationStopMessage *zis_msg)
   2373 {
   2374   struct NamestoreClient *nc = cls;
   2375   struct ZoneIteration *zi;
   2376   uint32_t rid;
   2377 
   2378   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
   2379               "Received ZONE_ITERATION_STOP message\n");
   2380   rid = ntohl (zis_msg->gns_header.r_id);
   2381   for (zi = nc->op_head; NULL != zi; zi = zi->next)
   2382     if (zi->request_id == rid)
   2383       break;
   2384   if (NULL == zi)
   2385   {
   2386     GNUNET_break (0);
   2387     GNUNET_SERVICE_client_drop (nc->client);
   2388     return;
   2389   }
   2390   GNUNET_CONTAINER_DLL_remove (nc->op_head, nc->op_tail, zi);
   2391   GNUNET_free (zi);
   2392   GNUNET_SERVICE_client_continue (nc->client);
   2393 }
   2394 
   2395 
   2396 /**
   2397  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_NEXT message
   2398  *
   2399  * @param cls the client sending the message
   2400  * @param zis_msg message from the client
   2401  */
   2402 static void
   2403 handle_iteration_next (void *cls,
   2404                        const struct ZoneIterationNextMessage *zis_msg)
   2405 {
   2406   struct NamestoreClient *nc = cls;
   2407   struct ZoneIteration *zi;
   2408   uint32_t rid;
   2409   uint64_t limit;
   2410 
   2411   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
   2412               "Received ZONE_ITERATION_NEXT message\n");
   2413   GNUNET_STATISTICS_update (statistics,
   2414                             "Iteration NEXT messages received",
   2415                             1,
   2416                             GNUNET_NO);
   2417   rid = ntohl (zis_msg->gns_header.r_id);
   2418   limit = GNUNET_ntohll (zis_msg->limit);
   2419   for (zi = nc->op_head; NULL != zi; zi = zi->next)
   2420     if (zi->request_id == rid)
   2421       break;
   2422   if (NULL == zi)
   2423   {
   2424     GNUNET_break (0);
   2425     GNUNET_SERVICE_client_drop (nc->client);
   2426     return;
   2427   }
   2428   run_zone_iteration_round (zi, limit);
   2429 }
   2430 
   2431 
   2432 /**
   2433  * Function called when the monitor is ready for more data, and we
   2434  * should thus unblock PUT operations that were blocked on the
   2435  * monitor not being ready.
   2436  */
   2437 static void
   2438 monitor_unblock (struct ZoneMonitor *zm)
   2439 {
   2440   struct StoreActivity *sa = sa_head;
   2441 
   2442   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
   2443               "Unblocking zone monitor %p\n", zm);
   2444   while ((NULL != sa) && (zm->limit > zm->iteration_cnt))
   2445   {
   2446     struct StoreActivity *sn = sa->next;
   2447 
   2448     if (sa->zm_pos == zm)
   2449       continue_store_activity (sa, GNUNET_YES);
   2450     sa = sn;
   2451   }
   2452   if (zm->limit > zm->iteration_cnt)
   2453   {
   2454     zm->sa_waiting = GNUNET_NO;
   2455     if (NULL != zm->sa_wait_warning)
   2456     {
   2457       GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
   2458       zm->sa_wait_warning = NULL;
   2459     }
   2460   }
   2461   else if (GNUNET_YES == zm->sa_waiting)
   2462   {
   2463     zm->sa_waiting_start = GNUNET_TIME_absolute_get ();
   2464     if (NULL != zm->sa_wait_warning)
   2465       GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
   2466     zm->sa_wait_warning =
   2467       GNUNET_SCHEDULER_add_delayed (MONITOR_STALL_WARN_DELAY,
   2468                                     &warn_monitor_slow,
   2469                                     zm);
   2470   }
   2471 }
   2472 
   2473 
   2474 /**
   2475  * Send 'sync' message to zone monitor, we're now in sync.
   2476  *
   2477  * @param zm monitor that is now in sync
   2478  */
   2479 static void
   2480 monitor_sync (struct ZoneMonitor *zm)
   2481 {
   2482   struct GNUNET_MQ_Envelope *env;
   2483   struct GNUNET_MessageHeader *sync;
   2484   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
   2485               "Syncing zone monitor %p\n", zm);
   2486 
   2487   env = GNUNET_MQ_msg (sync, GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_SYNC);
   2488   GNUNET_MQ_send (zm->nc->mq, env);
   2489   /* mark iteration done */
   2490   zm->in_first_iteration = GNUNET_NO;
   2491   zm->iteration_cnt = 0;
   2492   if ((zm->limit > 0) && (zm->sa_waiting))
   2493     monitor_unblock (zm);
   2494 }
   2495 
   2496 
   2497 /**
   2498  * Obtain the next datum during the zone monitor's zone initial iteration.
   2499  *
   2500  * @param cls zone monitor that does its initial iteration
   2501  */
   2502 static void
   2503 monitor_iteration_next (void *cls);
   2504 
   2505 
   2506 /**
   2507  * A #GNUNET_NAMESTORE_RecordIterator for monitors.
   2508  *
   2509  * @param cls a 'struct ZoneMonitor *' with information about the monitor
   2510  * @param seq sequence number of the record, MUST NOT BE ZERO
   2511  * @param zone_key zone key of the zone
   2512  * @param name name
   2513  * @param rd_count number of records in @a rd
   2514  * @param rd array of records
   2515  */
   2516 static void
   2517 monitor_iterate_cb (void *cls,
   2518                     uint64_t seq,
   2519                     const char *editor_hint,
   2520                     const struct GNUNET_CRYPTO_BlindablePrivateKey *zone_key,
   2521                     const char *name,
   2522                     unsigned int rd_count,
   2523                     const struct GNUNET_GNSRECORD_Data *rd)
   2524 {
   2525   struct ZoneMonitor *zm = cls;
   2526 
   2527   GNUNET_assert (0 != seq);
   2528   zm->seq = seq;
   2529   GNUNET_assert (NULL != name);
   2530   GNUNET_STATISTICS_update (statistics,
   2531                             "Monitor notifications sent",
   2532                             1,
   2533                             GNUNET_NO);
   2534   if (0 < send_lookup_response_with_filter (zm->nc, 0, zone_key, name,
   2535                                             rd_count, rd, zm->filter))
   2536   {
   2537     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
   2538                 "Sent records.\n");
   2539     zm->limit--;
   2540     zm->iteration_cnt--;
   2541   }
   2542   else
   2543     zm->run_again = GNUNET_YES;
   2544   if ((0 == zm->iteration_cnt) && (0 != zm->limit))
   2545   {
   2546     /* We are done with the current iteration batch, AND the
   2547        client would right now accept more, so go again! */
   2548     GNUNET_assert (NULL == zm->task);
   2549     zm->task = GNUNET_SCHEDULER_add_now (&monitor_iteration_next, zm);
   2550   }
   2551 }
   2552 
   2553 
   2554 static enum GNUNET_GenericReturnValue
   2555 check_monitor_start (void *cls,
   2556                      const struct ZoneMonitorStartMessage *zis_msg)
   2557 {
   2558   uint16_t size;
   2559   size_t key_len;
   2560 
   2561   size = ntohs (zis_msg->header.size);
   2562   key_len = ntohs (zis_msg->key_len);
   2563 
   2564   if (size < key_len + sizeof(*zis_msg))
   2565   {
   2566     GNUNET_break (0);
   2567     return GNUNET_SYSERR;
   2568   }
   2569   return GNUNET_OK;
   2570 }
   2571 
   2572 
   2573 /**
   2574  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_START message
   2575  *
   2576  * @param cls the client sending the message
   2577  * @param zis_msg message from the client
   2578  */
   2579 static void
   2580 handle_monitor_start (void *cls, const struct
   2581                       ZoneMonitorStartMessage *zis_msg)
   2582 {
   2583   struct GNUNET_CRYPTO_BlindablePrivateKey zone;
   2584   struct NamestoreClient *nc = cls;
   2585   struct ZoneMonitor *zm;
   2586   size_t key_len;
   2587   size_t kb_read;
   2588 
   2589   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
   2590               "Received ZONE_MONITOR_START message\n");
   2591   zm = GNUNET_new (struct ZoneMonitor);
   2592   zm->nc = nc;
   2593   key_len = ntohs (zis_msg->key_len);
   2594   if (0 < key_len)
   2595   {
   2596     if ((GNUNET_SYSERR ==
   2597          GNUNET_CRYPTO_read_private_key_from_buffer (&zis_msg[1],
   2598                                                      key_len,
   2599                                                      &zone,
   2600                                                      &kb_read)) ||
   2601         (kb_read != key_len))
   2602     {
   2603       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
   2604                   "Error reading private key\n");
   2605       GNUNET_SERVICE_client_drop (nc->client);
   2606       GNUNET_free (zm);
   2607       return;
   2608     }
   2609     zm->zone = zone;
   2610   }
   2611   zm->limit = 1;
   2612   zm->filter = ntohs (zis_msg->filter);
   2613   zm->in_first_iteration = (GNUNET_YES == ntohl (zis_msg->iterate_first));
   2614   GNUNET_CONTAINER_DLL_insert (monitor_head, monitor_tail, zm);
   2615   GNUNET_SERVICE_client_mark_monitor (nc->client);
   2616   GNUNET_SERVICE_client_continue (nc->client);
   2617   GNUNET_notification_context_add (monitor_nc, nc->mq);
   2618   if (zm->in_first_iteration)
   2619     zm->task = GNUNET_SCHEDULER_add_now (&monitor_iteration_next, zm);
   2620   else
   2621     monitor_sync (zm);
   2622 }
   2623 
   2624 
   2625 /**
   2626  * Obtain the next datum during the zone monitor's zone initial iteration.
   2627  *
   2628  * @param cls zone monitor that does its initial iteration
   2629  */
   2630 static void
   2631 monitor_iteration_next (void *cls)
   2632 {
   2633   struct ZoneMonitor *zm = cls;
   2634   struct NamestoreClient *nc = zm->nc;
   2635   int ret;
   2636 
   2637   zm->task = NULL;
   2638   GNUNET_assert (0 == zm->iteration_cnt);
   2639   if (zm->limit > 16)
   2640     zm->iteration_cnt = zm->limit / 2;   /* leave half for monitor events */
   2641   else
   2642     zm->iteration_cnt = zm->limit;   /* use it all */
   2643   zm->run_again = GNUNET_YES;
   2644   while (GNUNET_YES == zm->run_again)
   2645   {
   2646     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
   2647                 "Running iteration\n");
   2648     zm->run_again = GNUNET_NO;
   2649     ret = nc->GSN_database->iterate_records (nc->GSN_database->cls,
   2650                                              (GNUNET_YES == GNUNET_is_zero (
   2651                                                 &zm->zone)) ? NULL : &zm->zone,
   2652                                              zm->seq,
   2653                                              zm->iteration_cnt,
   2654                                              &monitor_iterate_cb,
   2655                                              zm);
   2656   }
   2657   if (GNUNET_SYSERR == ret)
   2658   {
   2659     GNUNET_SERVICE_client_drop (zm->nc->client);
   2660     return;
   2661   }
   2662   if (GNUNET_NO == ret)
   2663   {
   2664     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
   2665                 "Zone empty... syncing\n");
   2666     /* empty zone */
   2667     monitor_sync (zm);
   2668     return;
   2669   }
   2670 }
   2671 
   2672 
   2673 /**
   2674  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_NEXT message
   2675  *
   2676  * @param cls the client sending the message
   2677  * @param nm message from the client
   2678  */
   2679 static void
   2680 handle_monitor_next (void *cls, const struct ZoneMonitorNextMessage *nm)
   2681 {
   2682   struct NamestoreClient *nc = cls;
   2683   struct ZoneMonitor *zm;
   2684   uint64_t inc;
   2685 
   2686   inc = GNUNET_ntohll (nm->limit);
   2687   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
   2688               "Received ZONE_MONITOR_NEXT message with limit %llu\n",
   2689               (unsigned long long) inc);
   2690   for (zm = monitor_head; NULL != zm; zm = zm->next)
   2691     if (zm->nc == nc)
   2692       break;
   2693   if (NULL == zm)
   2694   {
   2695     GNUNET_break (0);
   2696     GNUNET_SERVICE_client_drop (nc->client);
   2697     return;
   2698   }
   2699   GNUNET_SERVICE_client_continue (nc->client);
   2700   if (zm->limit + inc < zm->limit)
   2701   {
   2702     GNUNET_break (0);
   2703     GNUNET_SERVICE_client_drop (nc->client);
   2704     return;
   2705   }
   2706   zm->limit += inc;
   2707   if ((zm->in_first_iteration) && (zm->limit == inc))
   2708   {
   2709     /* We are still iterating, and the previous iteration must
   2710        have stopped due to the client's limit, so continue it! */
   2711     GNUNET_assert (NULL == zm->task);
   2712     zm->task = GNUNET_SCHEDULER_add_now (&monitor_iteration_next, zm);
   2713   }
   2714   GNUNET_assert (zm->iteration_cnt <= zm->limit);
   2715   if ((zm->limit > zm->iteration_cnt) && (zm->sa_waiting))
   2716   {
   2717     monitor_unblock (zm);
   2718   }
   2719   else if (GNUNET_YES == zm->sa_waiting)
   2720   {
   2721     if (NULL != zm->sa_wait_warning)
   2722       GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
   2723     zm->sa_waiting_start = GNUNET_TIME_absolute_get ();
   2724     zm->sa_wait_warning =
   2725       GNUNET_SCHEDULER_add_delayed (MONITOR_STALL_WARN_DELAY,
   2726                                     &warn_monitor_slow,
   2727                                     zm);
   2728   }
   2729 }
   2730 
   2731 
   2732 /**
   2733  * Process namestore requests.
   2734  *
   2735  * @param cls closure
   2736  * @param cfg configuration to use
   2737  * @param service the initialized service
   2738  */
   2739 static void
   2740 run (void *cls,
   2741      const struct GNUNET_CONFIGURATION_Handle *cfg,
   2742      struct GNUNET_SERVICE_Handle *service)
   2743 {
   2744   char *database;
   2745   (void) cls;
   2746   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Starting namestore service\n");
   2747   return_orphaned = GNUNET_CONFIGURATION_get_value_yesno (cfg,
   2748                                                           "namestore",
   2749                                                           "RETURN_ORPHANED");
   2750   GSN_cfg = cfg;
   2751   monitor_nc = GNUNET_notification_context_create (1);
   2752   statistics = GNUNET_STATISTICS_create ("namestore", cfg);
   2753   /* Loading database plugin */
   2754   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg,
   2755                                                           "namestore",
   2756                                                           "database",
   2757                                                           &database))
   2758   {
   2759     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "No database backend configured\n");
   2760     GNUNET_SCHEDULER_add_now (&cleanup_task, NULL);
   2761     return;
   2762   }
   2763   GNUNET_asprintf (&db_lib_name,
   2764                    "libgnunet_plugin_namestore_%s",
   2765                    database);
   2766   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
   2767               "Loading %s\n",
   2768               db_lib_name);
   2769   GSN_database = GNUNET_PLUGIN_load (GNUNET_OS_project_data_gnunet (),
   2770                                      db_lib_name,
   2771                                      (void *) cfg);
   2772   GNUNET_free (database);
   2773   if (NULL == GSN_database)
   2774   {
   2775     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
   2776                 "Could not load database backend `%s'\n",
   2777                 db_lib_name);
   2778     GNUNET_free (db_lib_name);
   2779     GNUNET_SCHEDULER_add_now (&cleanup_task, NULL);
   2780     return;
   2781   }
   2782   GNUNET_SCHEDULER_add_shutdown (&cleanup_task, NULL);
   2783 }
   2784 
   2785 
   2786 /**
   2787  * Define "main" method using service macro.
   2788  */
   2789 GNUNET_SERVICE_MAIN (
   2790   GNUNET_OS_project_data_gnunet (),
   2791   "namestore",
   2792   GNUNET_SERVICE_OPTION_NONE,
   2793   &run,
   2794   &client_connect_cb,
   2795   &client_disconnect_cb,
   2796   NULL,
   2797   GNUNET_MQ_hd_var_size (record_store,
   2798                          GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_STORE,
   2799                          struct RecordStoreMessage,
   2800                          NULL),
   2801   GNUNET_MQ_hd_var_size (edit_record_set,
   2802                          GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_SET_EDIT,
   2803                          struct EditRecordSetMessage,
   2804                          NULL),
   2805   GNUNET_MQ_hd_var_size (edit_record_set_cancel,
   2806                          GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_SET_EDIT_CANCEL,
   2807                          struct EditRecordSetCancelMessage,
   2808                          NULL),
   2809   GNUNET_MQ_hd_var_size (record_lookup,
   2810                          GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_LOOKUP,
   2811                          struct LabelLookupMessage,
   2812                          NULL),
   2813   GNUNET_MQ_hd_var_size (zone_to_name,
   2814                          GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_TO_NAME,
   2815                          struct ZoneToNameMessage,
   2816                          NULL),
   2817   GNUNET_MQ_hd_var_size (iteration_start,
   2818                          GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_START,
   2819                          struct ZoneIterationStartMessage,
   2820                          NULL),
   2821   GNUNET_MQ_hd_fixed_size (iteration_next,
   2822                            GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_NEXT,
   2823                            struct ZoneIterationNextMessage,
   2824                            NULL),
   2825   GNUNET_MQ_hd_fixed_size (iteration_stop,
   2826                            GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_STOP,
   2827                            struct ZoneIterationStopMessage,
   2828                            NULL),
   2829   GNUNET_MQ_hd_var_size (monitor_start,
   2830                          GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_START,
   2831                          struct ZoneMonitorStartMessage,
   2832                          NULL),
   2833   GNUNET_MQ_hd_fixed_size (monitor_next,
   2834                            GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_NEXT,
   2835                            struct ZoneMonitorNextMessage,
   2836                            NULL),
   2837   GNUNET_MQ_handler_end ());
   2838 
   2839 
   2840 /* end of gnunet-service-namestore.c */