quickjs-tart

quickjs-based runtime for wallet-core logic
Log | Files | Refs | README | LICENSE

mqttd.c (23082B)


      1 /***************************************************************************
      2  *                                  _   _ ____  _
      3  *  Project                     ___| | | |  _ \| |
      4  *                             / __| | | | |_) | |
      5  *                            | (__| |_| |  _ <| |___
      6  *                             \___|\___/|_| \_\_____|
      7  *
      8  * Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
      9  *
     10  * This software is licensed as described in the file COPYING, which
     11  * you should have received as part of this distribution. The terms
     12  * are also available at https://curl.se/docs/copyright.html.
     13  *
     14  * You may opt to use, copy, modify, merge, publish, distribute and/or sell
     15  * copies of the Software, and permit persons to whom the Software is
     16  * furnished to do so, under the terms of the COPYING file.
     17  *
     18  * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
     19  * KIND, either express or implied.
     20  *
     21  * SPDX-License-Identifier: curl
     22  *
     23  ***************************************************************************/
     24 #include "first.h"
     25 
     26 #include <stdlib.h>
     27 #include <string.h>
     28 
     29 /* Function
     30  *
     31  * Accepts a TCP connection on a custom port (IPv4 or IPv6).  Speaks MQTT.
     32  *
     33  * Read commands from FILE (set with --config). The commands control how to
     34  * act and is reset to defaults each client TCP connect.
     35  *
     36  */
     37 
     38 /* based on sockfilt.c */
     39 
     40 #define MQTT_MSG_CONNECT    0x10
     41 #define MQTT_MSG_CONNACK    0x20
     42 #define MQTT_MSG_PUBLISH    0x30
     43 /* #define MQTT_MSG_PUBACK     0x40 */
     44 #define MQTT_MSG_SUBSCRIBE  0x82
     45 #define MQTT_MSG_SUBACK     0x90
     46 #define MQTT_MSG_DISCONNECT 0xe0
     47 
     48 struct mqttd_configurable {
     49   unsigned char version; /* initial version byte in the request must match
     50                             this */
     51   bool publish_before_suback;
     52   bool short_publish;
     53   bool excessive_remaining;
     54   unsigned char error_connack;
     55   int testnum;
     56 };
     57 
     58 #define REQUEST_DUMP  "server.input"
     59 #define CONFIG_VERSION 5
     60 
     61 static struct mqttd_configurable m_config;
     62 
     63 static void mqttd_resetdefaults(void)
     64 {
     65   logmsg("Reset to defaults");
     66   m_config.version = CONFIG_VERSION;
     67   m_config.publish_before_suback = FALSE;
     68   m_config.short_publish = FALSE;
     69   m_config.excessive_remaining = FALSE;
     70   m_config.error_connack = 0;
     71   m_config.testnum = 0;
     72 }
     73 
     74 static void mqttd_getconfig(void)
     75 {
     76   FILE *fp = fopen(configfile, FOPEN_READTEXT);
     77   mqttd_resetdefaults();
     78   if(fp) {
     79     char buffer[512];
     80     logmsg("parse config file");
     81     while(fgets(buffer, sizeof(buffer), fp)) {
     82       char key[32];
     83       char value[32];
     84       if(2 == sscanf(buffer, "%31s %31s", key, value)) {
     85         if(!strcmp(key, "version")) {
     86           m_config.version = byteval(value);
     87           logmsg("version [%d] set", m_config.version);
     88         }
     89         else if(!strcmp(key, "PUBLISH-before-SUBACK")) {
     90           logmsg("PUBLISH-before-SUBACK set");
     91           m_config.publish_before_suback = TRUE;
     92         }
     93         else if(!strcmp(key, "short-PUBLISH")) {
     94           logmsg("short-PUBLISH set");
     95           m_config.short_publish = TRUE;
     96         }
     97         else if(!strcmp(key, "error-CONNACK")) {
     98           m_config.error_connack = byteval(value);
     99           logmsg("error-CONNACK = %d", m_config.error_connack);
    100         }
    101         else if(!strcmp(key, "Testnum")) {
    102           m_config.testnum = atoi(value);
    103           logmsg("testnum = %d", m_config.testnum);
    104         }
    105         else if(!strcmp(key, "excessive-remaining")) {
    106           logmsg("excessive-remaining set");
    107           m_config.excessive_remaining = TRUE;
    108         }
    109       }
    110     }
    111     fclose(fp);
    112   }
    113   else {
    114     logmsg("No config file '%s' to read", configfile);
    115   }
    116 }
    117 
    118 typedef enum {
    119   FROM_CLIENT,
    120   FROM_SERVER
    121 } mqttdir;
    122 
    123 static void logprotocol(mqttdir dir,
    124                         const char *prefix, size_t remlen,
    125                         FILE *output,
    126                         unsigned char *buffer, ssize_t len)
    127 {
    128   char data[12000] = "";
    129   ssize_t i;
    130   unsigned char *ptr = buffer;
    131   char *optr = data;
    132   int left = sizeof(data);
    133 
    134   for(i = 0; i < len && (left >= 0); i++) {
    135     snprintf(optr, left, "%02x", ptr[i]);
    136     optr += 2;
    137     left -= 2;
    138   }
    139   fprintf(output, "%s %s %x %s\n",
    140           dir == FROM_CLIENT ? "client" : "server",
    141           prefix, (int)remlen, data);
    142 }
    143 
    144 
    145 /* return 0 on success */
    146 static int connack(FILE *dump, curl_socket_t fd)
    147 {
    148   unsigned char packet[]={
    149     MQTT_MSG_CONNACK, 0x02,
    150     0x00, 0x00
    151   };
    152   ssize_t rc;
    153 
    154   packet[3] = m_config.error_connack;
    155 
    156   rc = swrite(fd, (char *)packet, sizeof(packet));
    157   if(rc > 0) {
    158     logmsg("WROTE %zd bytes [CONNACK]", rc);
    159     loghex(packet, rc);
    160     logprotocol(FROM_SERVER, "CONNACK", 2, dump, packet, sizeof(packet));
    161   }
    162   if(rc == sizeof(packet)) {
    163     return 0;
    164   }
    165   return 1;
    166 }
    167 
    168 /* return 0 on success */
    169 static int suback(FILE *dump, curl_socket_t fd, unsigned short packetid)
    170 {
    171   unsigned char packet[]={
    172     MQTT_MSG_SUBACK, 0x03,
    173     0, 0, /* filled in below */
    174     0x00
    175   };
    176   ssize_t rc;
    177   packet[2] = (unsigned char)(packetid >> 8);
    178   packet[3] = (unsigned char)(packetid & 0xff);
    179 
    180   rc = swrite(fd, (char *)packet, sizeof(packet));
    181   if(rc == sizeof(packet)) {
    182     logmsg("WROTE %zd bytes [SUBACK]", rc);
    183     loghex(packet, rc);
    184     logprotocol(FROM_SERVER, "SUBACK", 3, dump, packet, rc);
    185     return 0;
    186   }
    187   return 1;
    188 }
    189 
    190 #ifdef QOS
    191 /* return 0 on success */
    192 static int puback(FILE *dump, curl_socket_t fd, unsigned short packetid)
    193 {
    194   unsigned char packet[]={
    195     MQTT_MSG_PUBACK, 0x00,
    196     0, 0 /* filled in below */
    197   };
    198   ssize_t rc;
    199   packet[2] = (unsigned char)(packetid >> 8);
    200   packet[3] = (unsigned char)(packetid & 0xff);
    201 
    202   rc = swrite(fd, (char *)packet, sizeof(packet));
    203   if(rc == sizeof(packet)) {
    204     logmsg("WROTE %zd bytes [PUBACK]", rc);
    205     loghex(packet, rc);
    206     logprotocol(FROM_SERVER, dump, packet, rc);
    207     return 0;
    208   }
    209   logmsg("Failed sending [PUBACK]");
    210   return 1;
    211 }
    212 #endif
    213 
    214 /* return 0 on success */
    215 static int disconnect(FILE *dump, curl_socket_t fd)
    216 {
    217   unsigned char packet[]={
    218     MQTT_MSG_DISCONNECT, 0x00,
    219   };
    220   ssize_t rc = swrite(fd, (char *)packet, sizeof(packet));
    221   if(rc == sizeof(packet)) {
    222     logmsg("WROTE %zd bytes [DISCONNECT]", rc);
    223     loghex(packet, rc);
    224     logprotocol(FROM_SERVER, "DISCONNECT", 0, dump, packet, rc);
    225     return 0;
    226   }
    227   logmsg("Failed sending [DISCONNECT]");
    228   return 1;
    229 }
    230 
    231 
    232 
    233 /*
    234   do
    235 
    236      encodedByte = X MOD 128
    237 
    238      X = X DIV 128
    239 
    240      // if there are more data to encode, set the top bit of this byte
    241 
    242      if ( X > 0 )
    243 
    244         encodedByte = encodedByte OR 128
    245 
    246       endif
    247 
    248     'output' encodedByte
    249 
    250   while ( X > 0 )
    251 
    252 */
    253 
    254 /* return number of bytes used */
    255 static size_t encode_length(size_t packetlen,
    256                             unsigned char *remlength) /* 4 bytes */
    257 {
    258   size_t bytes = 0;
    259   unsigned char encode;
    260 
    261   do {
    262     encode = packetlen % 0x80;
    263     packetlen /= 0x80;
    264     if(packetlen)
    265       encode |= 0x80;
    266 
    267     remlength[bytes++] = encode;
    268 
    269     if(bytes > 3) {
    270       logmsg("too large packet!");
    271       return 0;
    272     }
    273   } while(packetlen);
    274 
    275   return bytes;
    276 }
    277 
    278 
    279 static size_t decode_length(unsigned char *buffer,
    280                             size_t buflen, size_t *lenbytes)
    281 {
    282   size_t len = 0;
    283   size_t mult = 1;
    284   size_t i;
    285   unsigned char encoded = 0x80;
    286 
    287   for(i = 0; (i < buflen) && (encoded & 0x80); i++) {
    288     encoded = buffer[i];
    289     len += (encoded & 0x7f) * mult;
    290     mult *= 0x80;
    291   }
    292 
    293   if(lenbytes)
    294     *lenbytes = i;
    295 
    296   return len;
    297 }
    298 
    299 
    300 /* return 0 on success */
    301 static int publish(FILE *dump,
    302                    curl_socket_t fd, unsigned short packetid,
    303                    char *topic, const char *payload, size_t payloadlen)
    304 {
    305   size_t topiclen = strlen(topic);
    306   unsigned char *packet;
    307   size_t payloadindex;
    308   size_t remaininglength = topiclen + 2 + payloadlen;
    309   size_t packetlen;
    310   size_t sendamount;
    311   ssize_t rc;
    312   unsigned char rembuffer[4];
    313   size_t encodedlen;
    314 
    315   if(m_config.excessive_remaining) {
    316     /* manually set illegal remaining length */
    317     rembuffer[0] = 0xff;
    318     rembuffer[1] = 0xff;
    319     rembuffer[2] = 0xff;
    320     rembuffer[3] = 0x80; /* maximum allowed here by spec is 0x7f */
    321     encodedlen = 4;
    322   }
    323   else
    324     encodedlen = encode_length(remaininglength, rembuffer);
    325 
    326   /* one packet type byte (possibly two more for packetid) */
    327   packetlen = remaininglength + encodedlen + 1;
    328   packet = malloc(packetlen);
    329   if(!packet)
    330     return 1;
    331 
    332   packet[0] = MQTT_MSG_PUBLISH;
    333   memcpy(&packet[1], rembuffer, encodedlen);
    334 
    335   (void)packetid;
    336   /* packet_id if QoS is set */
    337 
    338   packet[1 + encodedlen] = (unsigned char)(topiclen >> 8);
    339   packet[2 + encodedlen] = (unsigned char)(topiclen & 0xff);
    340   memcpy(&packet[3 + encodedlen], topic, topiclen);
    341 
    342   payloadindex = 3 + topiclen + encodedlen;
    343   memcpy(&packet[payloadindex], payload, payloadlen);
    344 
    345   sendamount = packetlen;
    346   if(m_config.short_publish)
    347     sendamount -= 2;
    348 
    349   rc = swrite(fd, (char *)packet, sendamount);
    350   if(rc > 0) {
    351     logmsg("WROTE %zd bytes [PUBLISH]", rc);
    352     loghex(packet, rc);
    353     logprotocol(FROM_SERVER, "PUBLISH", remaininglength, dump, packet, rc);
    354   }
    355   free(packet);
    356   if((size_t)rc == packetlen)
    357     return 0;
    358   return 1;
    359 }
    360 
    361 #define MAX_TOPIC_LENGTH 65535
    362 #define MAX_CLIENT_ID_LENGTH 32
    363 
    364 static char topic[MAX_TOPIC_LENGTH + 1];
    365 
    366 static int fixedheader(curl_socket_t fd,
    367                        unsigned char *bytep,
    368                        size_t *remaining_lengthp,
    369                        size_t *remaining_length_bytesp)
    370 {
    371   /* get the fixed header */
    372   unsigned char buffer[10];
    373 
    374   /* get the first two bytes */
    375   ssize_t rc = sread(fd, (char *)buffer, 2);
    376   size_t i;
    377   if(rc < 2) {
    378     logmsg("READ %zd bytes [SHORT!]", rc);
    379     return 1; /* fail */
    380   }
    381   logmsg("READ %zd bytes", rc);
    382   loghex(buffer, rc);
    383   *bytep = buffer[0];
    384 
    385   /* if the length byte has the top bit set, get the next one too */
    386   i = 1;
    387   while(buffer[i] & 0x80) {
    388     i++;
    389     rc = sread(fd, (char *)&buffer[i], 1);
    390     if(rc != 1) {
    391       logmsg("Remaining Length broken");
    392       return 1;
    393     }
    394   }
    395   *remaining_lengthp = decode_length(&buffer[1], i, remaining_length_bytesp);
    396   logmsg("Remaining Length: %zu [%zu bytes]", *remaining_lengthp,
    397          *remaining_length_bytesp);
    398   return 0;
    399 }
    400 
    401 static curl_socket_t mqttit(curl_socket_t fd)
    402 {
    403   size_t buff_size = 10*1024;
    404   unsigned char *buffer = NULL;
    405   ssize_t rc;
    406   unsigned char byte;
    407   unsigned short packet_id;
    408   size_t payload_len;
    409   size_t client_id_length;
    410   size_t topic_len;
    411   size_t remaining_length = 0;
    412   size_t bytes = 0; /* remaining length field size in bytes */
    413   char client_id[MAX_CLIENT_ID_LENGTH];
    414   long testno;
    415   FILE *stream = NULL;
    416   FILE *dump;
    417   char dumpfile[256];
    418 
    419   static const char protocol[7] = {
    420     0x00, 0x04,       /* protocol length */
    421     'M','Q','T','T',  /* protocol name */
    422     0x04              /* protocol level */
    423   };
    424   snprintf(dumpfile, sizeof(dumpfile), "%s/%s", logdir, REQUEST_DUMP);
    425   dump = fopen(dumpfile, "ab");
    426   if(!dump)
    427     goto end;
    428 
    429   mqttd_getconfig();
    430 
    431   testno = m_config.testnum;
    432 
    433   if(testno)
    434     logmsg("Found test number %ld", testno);
    435 
    436   buffer = malloc(buff_size);
    437   if(!buffer) {
    438     logmsg("Out of memory, unable to allocate buffer");
    439     goto end;
    440   }
    441   memset(buffer, 0, buff_size);
    442 
    443   do {
    444     unsigned char usr_flag = 0x80;
    445     unsigned char passwd_flag = 0x40;
    446     unsigned char conn_flags;
    447     const size_t client_id_offset = 12;
    448     size_t start_usr;
    449     size_t start_passwd;
    450 
    451     /* get the fixed header */
    452     rc = fixedheader(fd, &byte, &remaining_length, &bytes);
    453     if(rc)
    454       break;
    455 
    456     if(remaining_length >= buff_size) {
    457       unsigned char *newbuffer;
    458       buff_size = remaining_length;
    459       newbuffer = realloc(buffer, buff_size);
    460       if(!newbuffer) {
    461         logmsg("Failed realloc of size %zu", buff_size);
    462         goto end;
    463       }
    464       buffer = newbuffer;
    465     }
    466 
    467     if(remaining_length) {
    468       /* reading variable header and payload into buffer */
    469       rc = sread(fd, (char *)buffer, remaining_length);
    470       if(rc > 0) {
    471         logmsg("READ %zd bytes", rc);
    472         loghex(buffer, rc);
    473       }
    474     }
    475 
    476     if(byte == MQTT_MSG_CONNECT) {
    477       logprotocol(FROM_CLIENT, "CONNECT", remaining_length,
    478                   dump, buffer, rc);
    479 
    480       if(memcmp(protocol, buffer, sizeof(protocol))) {
    481         logmsg("Protocol preamble mismatch");
    482         goto end;
    483       }
    484       /* ignore the connect flag byte and two keepalive bytes */
    485       payload_len = (size_t)(buffer[10] << 8) | buffer[11];
    486       /* first part of the payload is the client ID */
    487       client_id_length = payload_len;
    488 
    489       /* checking if user and password flags were set */
    490       conn_flags = buffer[7];
    491 
    492       start_usr = client_id_offset + payload_len;
    493       if(usr_flag == (unsigned char)(conn_flags & usr_flag)) {
    494         logmsg("User flag is present in CONN flag");
    495         payload_len += (size_t)(buffer[start_usr] << 8) |
    496                        buffer[start_usr + 1];
    497         payload_len += 2; /* MSB and LSB for user length */
    498       }
    499 
    500       start_passwd = client_id_offset + payload_len;
    501       if(passwd_flag == (char)(conn_flags & passwd_flag)) {
    502         logmsg("Password flag is present in CONN flags");
    503         payload_len += (size_t)(buffer[start_passwd] << 8) |
    504                        buffer[start_passwd + 1];
    505         payload_len += 2; /* MSB and LSB for password length */
    506       }
    507 
    508       /* check the length of the payload */
    509       if((ssize_t)payload_len != (rc - 12)) {
    510         logmsg("Payload length mismatch, expected %zx got %zx",
    511                rc - 12, payload_len);
    512         goto end;
    513       }
    514       /* check the length of the client ID */
    515       else if((client_id_length + 1) > MAX_CLIENT_ID_LENGTH) {
    516         logmsg("Too large client id");
    517         goto end;
    518       }
    519       memcpy(client_id, &buffer[12], client_id_length);
    520       client_id[client_id_length] = 0;
    521 
    522       logmsg("MQTT client connect accepted: %s", client_id);
    523 
    524       /* The first packet sent from the Server to the Client MUST be a
    525          CONNACK Packet */
    526 
    527       if(connack(dump, fd)) {
    528         logmsg("failed sending CONNACK");
    529         goto end;
    530       }
    531     }
    532     else if(byte == MQTT_MSG_SUBSCRIBE) {
    533       int error;
    534       char *data;
    535       size_t datalen;
    536       logprotocol(FROM_CLIENT, "SUBSCRIBE", remaining_length,
    537                   dump, buffer, rc);
    538       logmsg("Incoming SUBSCRIBE");
    539 
    540       if(rc < 6) {
    541         logmsg("Too small SUBSCRIBE");
    542         goto end;
    543       }
    544 
    545       /* two bytes packet id */
    546       packet_id = (unsigned short)((buffer[0] << 8) | buffer[1]);
    547 
    548       /* two bytes topic length */
    549       topic_len = (size_t)(buffer[2] << 8) | buffer[3];
    550       if(topic_len != (remaining_length - 5)) {
    551         logmsg("Wrong topic length, got %zu expected %zu",
    552                topic_len, remaining_length - 5);
    553         goto end;
    554       }
    555       memcpy(topic, &buffer[4], topic_len);
    556       topic[topic_len] = 0;
    557 
    558       /* there's a QoS byte (two bits) after the topic */
    559 
    560       logmsg("SUBSCRIBE to '%s' [%d]", topic, packet_id);
    561       stream = test2fopen(testno, logdir);
    562       if(!stream) {
    563         error = errno;
    564         logmsg("fopen() failed with error (%d) %s", error, strerror(error));
    565         logmsg("Couldn't open test file %ld", testno);
    566         goto end;
    567       }
    568       error = getpart(&data, &datalen, "reply", "data", stream);
    569       if(!error) {
    570         if(!m_config.publish_before_suback) {
    571           if(suback(dump, fd, packet_id)) {
    572             logmsg("failed sending SUBACK");
    573             free(data);
    574             goto end;
    575           }
    576         }
    577         if(publish(dump, fd, packet_id, topic, data, datalen)) {
    578           logmsg("PUBLISH failed");
    579           free(data);
    580           goto end;
    581         }
    582         free(data);
    583         if(m_config.publish_before_suback) {
    584           if(suback(dump, fd, packet_id)) {
    585             logmsg("failed sending SUBACK");
    586             goto end;
    587           }
    588         }
    589       }
    590       else {
    591         const char *def = "this is random payload yes yes it is";
    592         publish(dump, fd, packet_id, topic, def, strlen(def));
    593       }
    594       disconnect(dump, fd);
    595     }
    596     else if((byte & 0xf0) == (MQTT_MSG_PUBLISH & 0xf0)) {
    597       size_t topiclen;
    598 
    599       logmsg("Incoming PUBLISH");
    600       logprotocol(FROM_CLIENT, "PUBLISH", remaining_length,
    601                   dump, buffer, rc);
    602 
    603       topiclen = (size_t)(buffer[1 + bytes] << 8) | buffer[2 + bytes];
    604       logmsg("Got %zu bytes topic", topiclen);
    605 
    606 #ifdef QOS
    607       /* Handle packetid if there is one. Send puback if QoS > 0 */
    608       puback(dump, fd, 0);
    609 #endif
    610       /* expect a disconnect here */
    611       /* get the request */
    612       rc = sread(fd, (char *)&buffer[0], 2);
    613 
    614       logmsg("READ %zd bytes [DISCONNECT]", rc);
    615       loghex(buffer, rc);
    616       logprotocol(FROM_CLIENT, "DISCONNECT", 0, dump, buffer, rc);
    617       goto end;
    618     }
    619     else {
    620       /* not supported (yet) */
    621       goto end;
    622     }
    623   } while(1);
    624 
    625 end:
    626   if(buffer)
    627     free(buffer);
    628   if(dump)
    629     fclose(dump);
    630   if(stream)
    631     fclose(stream);
    632   return CURL_SOCKET_BAD;
    633 }
    634 
    635 /*
    636   sockfdp is a pointer to an established stream or CURL_SOCKET_BAD
    637 
    638   if sockfd is CURL_SOCKET_BAD, listendfd is a listening socket we must
    639   accept()
    640 */
    641 static bool mqttd_incoming(curl_socket_t listenfd)
    642 {
    643   fd_set fds_read;
    644   fd_set fds_write;
    645   fd_set fds_err;
    646   int clients = 0; /* connected clients */
    647 
    648   if(got_exit_signal) {
    649     logmsg("signalled to die, exiting...");
    650     return FALSE;
    651   }
    652 
    653 #ifdef HAVE_GETPPID
    654   /* As a last resort, quit if socks5 process becomes orphan. */
    655   if(getppid() <= 1) {
    656     logmsg("process becomes orphan, exiting");
    657     return FALSE;
    658   }
    659 #endif
    660 
    661   do {
    662     ssize_t rc;
    663     int error = 0;
    664     curl_socket_t sockfd = listenfd;
    665     int maxfd = (int)sockfd;
    666 
    667     FD_ZERO(&fds_read);
    668     FD_ZERO(&fds_write);
    669     FD_ZERO(&fds_err);
    670 
    671     /* there's always a socket to wait for */
    672 #if defined(__DJGPP__)
    673 #pragma GCC diagnostic push
    674 #pragma GCC diagnostic ignored "-Warith-conversion"
    675 #endif
    676     FD_SET(sockfd, &fds_read);
    677 #if defined(__DJGPP__)
    678 #pragma GCC diagnostic pop
    679 #endif
    680 
    681     do {
    682       /* select() blocking behavior call on blocking descriptors please */
    683       rc = select(maxfd + 1, &fds_read, &fds_write, &fds_err, NULL);
    684       if(got_exit_signal) {
    685         logmsg("signalled to die, exiting...");
    686         return FALSE;
    687       }
    688     } while((rc == -1) && ((error = SOCKERRNO) == SOCKEINTR));
    689 
    690     if(rc < 0) {
    691       logmsg("select() failed with error (%d) %s",
    692              error, strerror(error));
    693       return FALSE;
    694     }
    695 
    696     if(FD_ISSET(sockfd, &fds_read)) {
    697       curl_socket_t newfd = accept(sockfd, NULL, NULL);
    698       if(CURL_SOCKET_BAD == newfd) {
    699         error = SOCKERRNO;
    700         logmsg("accept() failed with error (%d) %s", error, sstrerror(error));
    701       }
    702       else {
    703         logmsg("====> Client connect, fd %ld. "
    704                "Read config from %s", (long)newfd, configfile);
    705         set_advisor_read_lock(loglockfile);
    706         (void)mqttit(newfd); /* until done */
    707         clear_advisor_read_lock(loglockfile);
    708 
    709         logmsg("====> Client disconnect");
    710         sclose(newfd);
    711       }
    712     }
    713   } while(clients);
    714 
    715   return TRUE;
    716 }
    717 
    718 static int test_mqttd(int argc, char *argv[])
    719 {
    720   curl_socket_t sock = CURL_SOCKET_BAD;
    721   curl_socket_t msgsock = CURL_SOCKET_BAD;
    722   int wrotepidfile = 0;
    723   int wroteportfile = 0;
    724   bool juggle_again;
    725   int error;
    726   int arg = 1;
    727 
    728   pidname = ".mqttd.pid";
    729   portname = ".mqttd.port";
    730   serverlogfile = "log/mqttd.log";
    731   configfile = "mqttd.config";
    732   server_port = 1883; /* MQTT default port */
    733 
    734   while(argc > arg) {
    735     if(!strcmp("--version", argv[arg])) {
    736       printf("mqttd IPv4%s\n",
    737 #ifdef USE_IPV6
    738              "/IPv6"
    739 #else
    740              ""
    741 #endif
    742              );
    743       return 0;
    744     }
    745     else if(!strcmp("--pidfile", argv[arg])) {
    746       arg++;
    747       if(argc > arg)
    748         pidname = argv[arg++];
    749     }
    750     else if(!strcmp("--portfile", argv[arg])) {
    751       arg++;
    752       if(argc > arg)
    753         portname = argv[arg++];
    754     }
    755     else if(!strcmp("--config", argv[arg])) {
    756       arg++;
    757       if(argc > arg)
    758         configfile = argv[arg++];
    759     }
    760     else if(!strcmp("--logfile", argv[arg])) {
    761       arg++;
    762       if(argc > arg)
    763         serverlogfile = argv[arg++];
    764     }
    765     else if(!strcmp("--logdir", argv[arg])) {
    766       arg++;
    767       if(argc > arg)
    768         logdir = argv[arg++];
    769     }
    770     else if(!strcmp("--ipv6", argv[arg])) {
    771 #ifdef USE_IPV6
    772       socket_domain = AF_INET6;
    773       ipv_inuse = "IPv6";
    774 #endif
    775       arg++;
    776     }
    777     else if(!strcmp("--ipv4", argv[arg])) {
    778       /* for completeness, we support this option as well */
    779 #ifdef USE_IPV6
    780       socket_domain = AF_INET;
    781       ipv_inuse = "IPv4";
    782 #endif
    783       arg++;
    784     }
    785     else if(!strcmp("--port", argv[arg])) {
    786       arg++;
    787       if(argc > arg) {
    788         char *endptr;
    789         unsigned long ulnum = strtoul(argv[arg], &endptr, 10);
    790         if((endptr != argv[arg] + strlen(argv[arg])) ||
    791            ((ulnum != 0UL) && ((ulnum < 1025UL) || (ulnum > 65535UL)))) {
    792           fprintf(stderr, "mqttd: invalid --port argument (%s)\n",
    793                   argv[arg]);
    794           return 0;
    795         }
    796         server_port = util_ultous(ulnum);
    797         arg++;
    798       }
    799     }
    800     else {
    801       puts("Usage: mqttd [option]\n"
    802            " --config [file]\n"
    803            " --version\n"
    804            " --logfile [file]\n"
    805            " --logdir [directory]\n"
    806            " --pidfile [file]\n"
    807            " --portfile [file]\n"
    808            " --ipv4\n"
    809            " --ipv6\n"
    810            " --port [port]\n");
    811       return 0;
    812     }
    813   }
    814 
    815   snprintf(loglockfile, sizeof(loglockfile), "%s/%s/mqtt-%s.lock",
    816            logdir, SERVERLOGS_LOCKDIR, ipv_inuse);
    817 
    818 #ifdef _WIN32
    819   if(win32_init())
    820     return 2;
    821 #endif
    822 
    823   CURLX_SET_BINMODE(stdin);
    824   CURLX_SET_BINMODE(stdout);
    825   CURLX_SET_BINMODE(stderr);
    826 
    827   install_signal_handlers(FALSE);
    828 
    829   sock = socket(socket_domain, SOCK_STREAM, 0);
    830 
    831   if(CURL_SOCKET_BAD == sock) {
    832     error = SOCKERRNO;
    833     logmsg("Error creating socket (%d) %s", error, sstrerror(error));
    834     goto mqttd_cleanup;
    835   }
    836 
    837   {
    838     /* passive daemon style */
    839     sock = sockdaemon(sock, &server_port, NULL, FALSE);
    840     if(CURL_SOCKET_BAD == sock) {
    841       goto mqttd_cleanup;
    842     }
    843     msgsock = CURL_SOCKET_BAD; /* no stream socket yet */
    844   }
    845 
    846   logmsg("Running %s version", ipv_inuse);
    847   logmsg("Listening on port %hu", server_port);
    848 
    849   wrotepidfile = write_pidfile(pidname);
    850   if(!wrotepidfile) {
    851     goto mqttd_cleanup;
    852   }
    853 
    854   wroteportfile = write_portfile(portname, server_port);
    855   if(!wroteportfile) {
    856     goto mqttd_cleanup;
    857   }
    858 
    859   do {
    860     juggle_again = mqttd_incoming(sock);
    861   } while(juggle_again);
    862 
    863 mqttd_cleanup:
    864 
    865   if((msgsock != sock) && (msgsock != CURL_SOCKET_BAD))
    866     sclose(msgsock);
    867 
    868   if(sock != CURL_SOCKET_BAD)
    869     sclose(sock);
    870 
    871   if(wrotepidfile)
    872     unlink(pidname);
    873   if(wroteportfile)
    874     unlink(portname);
    875 
    876   restore_signal_handlers(FALSE);
    877 
    878   if(got_exit_signal) {
    879     logmsg("============> mqttd exits with signal (%d)", exit_signal);
    880     /*
    881      * To properly set the return status of the process we
    882      * must raise the same signal SIGINT or SIGTERM that we
    883      * caught and let the old handler take care of it.
    884      */
    885     raise(exit_signal);
    886   }
    887 
    888   logmsg("============> mqttd quits");
    889   return 0;
    890 }