quickjs-tart

quickjs-based runtime for wallet-core logic
Log | Files | Refs | README | LICENSE

mqtt.c (27397B)


      1 /***************************************************************************
      2  *                                  _   _ ____  _
      3  *  Project                     ___| | | |  _ \| |
      4  *                             / __| | | | |_) | |
      5  *                            | (__| |_| |  _ <| |___
      6  *                             \___|\___/|_| \_\_____|
      7  *
      8  * Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
      9  * Copyright (C) Björn Stenberg, <bjorn@haxx.se>
     10  *
     11  * This software is licensed as described in the file COPYING, which
     12  * you should have received as part of this distribution. The terms
     13  * are also available at https://curl.se/docs/copyright.html.
     14  *
     15  * You may opt to use, copy, modify, merge, publish, distribute and/or sell
     16  * copies of the Software, and permit persons to whom the Software is
     17  * furnished to do so, under the terms of the COPYING file.
     18  *
     19  * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
     20  * KIND, either express or implied.
     21  *
     22  * SPDX-License-Identifier: curl
     23  *
     24  ***************************************************************************/
     25 
     26 #include "curl_setup.h"
     27 
     28 #ifndef CURL_DISABLE_MQTT
     29 
     30 #include "urldata.h"
     31 #include <curl/curl.h>
     32 #include "transfer.h"
     33 #include "sendf.h"
     34 #include "progress.h"
     35 #include "mqtt.h"
     36 #include "select.h"
     37 #include "strdup.h"
     38 #include "url.h"
     39 #include "escape.h"
     40 #include "curlx/warnless.h"
     41 #include "curl_printf.h"
     42 #include "curl_memory.h"
     43 #include "multiif.h"
     44 #include "rand.h"
     45 
     46 /* The last #include file should be: */
     47 #include "memdebug.h"
     48 
/* Values for the first byte of an MQTT packet. The upper four bits select
   the command and the lower four bits carry flags; mqtt_read_publish() masks
   the received byte with 0xf0 before comparing it against these. */
#define MQTT_MSG_CONNECT    0x10
/* #define MQTT_MSG_CONNACK    0x20 */
#define MQTT_MSG_PUBLISH    0x30
#define MQTT_MSG_SUBSCRIBE  0x82
#define MQTT_MSG_SUBACK     0x90
#define MQTT_MSG_DISCONNECT 0xe0
#define MQTT_MSG_PINGREQ    0xC0
#define MQTT_MSG_PINGRESP   0xD0

/* number of bytes mqtt_verify_connack() reads and checks */
#define MQTT_CONNACK_LEN 2
/* number of bytes mqtt_verify_suback() reads: two packet id bytes plus one
   byte that must be zero */
#define MQTT_SUBACK_LEN 3
/* length of the generated client id, without null terminator */
#define MQTT_CLIENTID_LEN 12 /* "curl0123abcd" */

/* meta key for storing protocol meta at easy handle */
#define CURL_META_MQTT_EASY   "meta:proto:mqtt:easy"
/* meta key for storing protocol meta at connection */
#define CURL_META_MQTT_CONN   "meta:proto:mqtt:conn"
     68 
/* States of the receive side state machine. The current state is stored in
   struct mqtt_conn and is only changed through mqstate(). */
enum mqttstate {
  MQTT_FIRST,             /* 0 - read the first byte of the next packet */
  MQTT_REMAINING_LENGTH,  /* 1 - read the remaining-length bytes */
  MQTT_CONNACK,           /* 2 - verify the CONNACK bytes */
  MQTT_SUBACK,            /* 3 - a SUBACK (or PUBLISH) is expected */
  MQTT_SUBACK_COMING,     /* 4 - the SUBACK remainder */
  MQTT_PUBWAIT,    /* 5 - wait for publish */
  MQTT_PUB_REMAIN,  /* 6 - wait for the remainder of the publish */

  MQTT_NOSTATE /* 7 - never used as an actual state */
};
     80 
/* protocol-specific connection-related data, stored under
   CURL_META_MQTT_CONN */
struct mqtt_conn {
  enum mqttstate state;     /* current state, set by mqstate() */
  enum mqttstate nextstate; /* switch to this after remaining length is
                               done */
  unsigned int packetid;    /* incremented for each SUBSCRIBE sent; the low
                               16 bits go on the wire */
};
     87 
/* protocol-specific transfer-related data, stored under
   CURL_META_MQTT_EASY */
struct MQTT {
  /* bytes of an outgoing packet that could not be sent yet; flushed at the
     start of mqtt_doing() */
  struct dynbuf sendbuf;
  /* when receiving */
  struct dynbuf recvbuf;
  size_t npacket; /* byte counter */
  size_t remaining_length; /* decoded remaining length of current packet */
  unsigned char pkt_hd[4]; /* for decoding the arriving packet length */
  struct curltime lastTime; /* last time we sent or received data */
  unsigned char firstbyte; /* first byte of the packet being received */
  BIT(pingsent); /* 1 while we wait for ping response */
};
    100 
    101 
/*
 * Forward declarations of the callbacks referenced by Curl_handler_mqtt
 * below; their definitions come later in this file.
 */

static CURLcode mqtt_do(struct Curl_easy *data, bool *done);
static CURLcode mqtt_done(struct Curl_easy *data,
                          CURLcode status, bool premature);
static CURLcode mqtt_doing(struct Curl_easy *data, bool *done);
static int mqtt_getsock(struct Curl_easy *data, struct connectdata *conn,
                        curl_socket_t *sock);
static CURLcode mqtt_setup_conn(struct Curl_easy *data,
                                struct connectdata *conn);
    114 
/*
 * MQTT protocol handler.
 *
 * Only setup_connection, do_it, done, doing and doing_getsock are provided;
 * every other callback slot is left as ZERO_NULL.
 */

const struct Curl_handler Curl_handler_mqtt = {
  "mqtt",                             /* scheme */
  mqtt_setup_conn,                    /* setup_connection */
  mqtt_do,                            /* do_it */
  mqtt_done,                          /* done */
  ZERO_NULL,                          /* do_more */
  ZERO_NULL,                          /* connect_it */
  ZERO_NULL,                          /* connecting */
  mqtt_doing,                         /* doing */
  ZERO_NULL,                          /* proto_getsock */
  mqtt_getsock,                       /* doing_getsock */
  ZERO_NULL,                          /* domore_getsock */
  ZERO_NULL,                          /* perform_getsock */
  ZERO_NULL,                          /* disconnect */
  ZERO_NULL,                          /* write_resp */
  ZERO_NULL,                          /* write_resp_hd */
  ZERO_NULL,                          /* connection_check */
  ZERO_NULL,                          /* attach connection */
  ZERO_NULL,                          /* follow */
  PORT_MQTT,                          /* defport */
  CURLPROTO_MQTT,                     /* protocol */
  CURLPROTO_MQTT,                     /* family */
  PROTOPT_NONE                        /* flags */
};
    143 
    144 static void mqtt_easy_dtor(void *key, size_t klen, void *entry)
    145 {
    146   struct MQTT *mq = entry;
    147   (void)key;
    148   (void)klen;
    149   curlx_dyn_free(&mq->sendbuf);
    150   curlx_dyn_free(&mq->recvbuf);
    151   free(mq);
    152 }
    153 
    154 static void mqtt_conn_dtor(void *key, size_t klen, void *entry)
    155 {
    156   (void)key;
    157   (void)klen;
    158   free(entry);
    159 }
    160 
    161 static CURLcode mqtt_setup_conn(struct Curl_easy *data,
    162                                 struct connectdata *conn)
    163 {
    164   /* setup MQTT specific meta data at easy handle and connection */
    165   struct mqtt_conn *mqtt;
    166   struct MQTT *mq;
    167 
    168   mqtt = calloc(1, sizeof(*mqtt));
    169   if(!mqtt ||
    170      Curl_conn_meta_set(conn, CURL_META_MQTT_CONN, mqtt, mqtt_conn_dtor))
    171     return CURLE_OUT_OF_MEMORY;
    172 
    173   mq = calloc(1, sizeof(struct MQTT));
    174   if(!mq)
    175     return CURLE_OUT_OF_MEMORY;
    176   curlx_dyn_init(&mq->recvbuf, DYN_MQTT_RECV);
    177   curlx_dyn_init(&mq->sendbuf, DYN_MQTT_SEND);
    178   if(Curl_meta_set(data, CURL_META_MQTT_EASY, mq, mqtt_easy_dtor))
    179     return CURLE_OUT_OF_MEMORY;
    180   return CURLE_OK;
    181 }
    182 
    183 static CURLcode mqtt_send(struct Curl_easy *data,
    184                           const char *buf, size_t len)
    185 {
    186   size_t n;
    187   CURLcode result;
    188   struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
    189 
    190   if(!mq)
    191     return CURLE_FAILED_INIT;
    192 
    193   result = Curl_xfer_send(data, buf, len, FALSE, &n);
    194   if(result)
    195     return result;
    196   mq->lastTime = curlx_now();
    197   Curl_debug(data, CURLINFO_HEADER_OUT, buf, (size_t)n);
    198   if(len != n) {
    199     size_t nsend = len - n;
    200     if(curlx_dyn_len(&mq->sendbuf)) {
    201       DEBUGASSERT(curlx_dyn_len(&mq->sendbuf) >= nsend);
    202       result = curlx_dyn_tail(&mq->sendbuf, nsend); /* keep this much */
    203     }
    204     else {
    205       result = curlx_dyn_addn(&mq->sendbuf, &buf[n], nsend);
    206     }
    207   }
    208   else
    209     curlx_dyn_reset(&mq->sendbuf);
    210   return result;
    211 }
    212 
    213 /* Generic function called by the multi interface to figure out what socket(s)
    214    to wait for and for what actions during the DOING and PROTOCONNECT
    215    states */
    216 static int mqtt_getsock(struct Curl_easy *data,
    217                         struct connectdata *conn,
    218                         curl_socket_t *sock)
    219 {
    220   (void)data;
    221   sock[0] = conn->sock[FIRSTSOCKET];
    222   return GETSOCK_READSOCK(FIRSTSOCKET);
    223 }
    224 
    225 static int mqtt_encode_len(char *buf, size_t len)
    226 {
    227   int i;
    228 
    229   for(i = 0; (len > 0) && (i < 4); i++) {
    230     unsigned char encoded;
    231     encoded = len % 0x80;
    232     len /= 0x80;
    233     if(len)
    234       encoded |= 0x80;
    235     buf[i] = (char)encoded;
    236   }
    237 
    238   return i;
    239 }
    240 
    241 /* add the passwd to the CONNECT packet */
    242 static int add_passwd(const char *passwd, const size_t plen,
    243                       char *pkt, const size_t start, int remain_pos)
    244 {
    245   /* magic number that need to be set properly */
    246   const size_t conn_flags_pos = remain_pos + 8;
    247   if(plen > 0xffff)
    248     return 1;
    249 
    250   /* set password flag */
    251   pkt[conn_flags_pos] |= 0x40;
    252 
    253   /* length of password provided */
    254   pkt[start] = (char)((plen >> 8) & 0xFF);
    255   pkt[start + 1] = (char)(plen & 0xFF);
    256   memcpy(&pkt[start + 2], passwd, plen);
    257   return 0;
    258 }
    259 
    260 /* add user to the CONNECT packet */
    261 static int add_user(const char *username, const size_t ulen,
    262                     unsigned char *pkt, const size_t start, int remain_pos)
    263 {
    264   /* magic number that need to be set properly */
    265   const size_t conn_flags_pos = remain_pos + 8;
    266   if(ulen > 0xffff)
    267     return 1;
    268 
    269   /* set username flag */
    270   pkt[conn_flags_pos] |= 0x80;
    271   /* length of username provided */
    272   pkt[start] = (unsigned char)((ulen >> 8) & 0xFF);
    273   pkt[start + 1] = (unsigned char)(ulen & 0xFF);
    274   memcpy(&pkt[start + 2], username, ulen);
    275   return 0;
    276 }
    277 
    278 /* add client ID to the CONNECT packet */
    279 static int add_client_id(const char *client_id, const size_t client_id_len,
    280                          char *pkt, const size_t start)
    281 {
    282   if(client_id_len != MQTT_CLIENTID_LEN)
    283     return 1;
    284   pkt[start] = 0x00;
    285   pkt[start + 1] = MQTT_CLIENTID_LEN;
    286   memcpy(&pkt[start + 2], client_id, MQTT_CLIENTID_LEN);
    287   return 0;
    288 }
    289 
    290 /* Set initial values of CONNECT packet */
    291 static int init_connpack(char *packet, char *remain, int remain_pos)
    292 {
    293   /* Fixed header starts */
    294   /* packet type */
    295   packet[0] = MQTT_MSG_CONNECT;
    296   /* remaining length field */
    297   memcpy(&packet[1], remain, remain_pos);
    298   /* Fixed header ends */
    299 
    300   /* Variable header starts */
    301   /* protocol length */
    302   packet[remain_pos + 1] = 0x00;
    303   packet[remain_pos + 2] = 0x04;
    304   /* protocol name */
    305   packet[remain_pos + 3] = 'M';
    306   packet[remain_pos + 4] = 'Q';
    307   packet[remain_pos + 5] = 'T';
    308   packet[remain_pos + 6] = 'T';
    309   /* protocol level */
    310   packet[remain_pos + 7] = 0x04;
    311   /* CONNECT flag: CleanSession */
    312   packet[remain_pos + 8] = 0x02;
    313   /* keep-alive 0 = disabled */
    314   packet[remain_pos + 9] = 0x00;
    315   packet[remain_pos + 10] = 0x3c;
    316   /* end of variable header */
    317   return remain_pos + 10;
    318 }
    319 
    320 static CURLcode mqtt_connect(struct Curl_easy *data)
    321 {
    322   CURLcode result = CURLE_OK;
    323   int pos = 0;
    324   int rc = 0;
    325   /* remain length */
    326   int remain_pos = 0;
    327   char remain[4] = {0};
    328   size_t packetlen = 0;
    329   size_t start_user = 0;
    330   size_t start_pwd = 0;
    331   char client_id[MQTT_CLIENTID_LEN + 1] = "curl";
    332   const size_t clen = strlen("curl");
    333   char *packet = NULL;
    334 
    335   /* extracting username from request */
    336   const char *username = data->state.aptr.user ?
    337     data->state.aptr.user : "";
    338   const size_t ulen = strlen(username);
    339   /* extracting password from request */
    340   const char *passwd = data->state.aptr.passwd ?
    341     data->state.aptr.passwd : "";
    342   const size_t plen = strlen(passwd);
    343   const size_t payloadlen = ulen + plen + MQTT_CLIENTID_LEN + 2 +
    344   /* The plus 2s below are for the MSB and LSB describing the length of the
    345      string to be added on the payload. Refer to spec 1.5.2 and 1.5.4 */
    346     (ulen ? 2 : 0) +
    347     (plen ? 2 : 0);
    348 
    349   /* getting how much occupy the remain length */
    350   remain_pos = mqtt_encode_len(remain, payloadlen + 10);
    351 
    352   /* 10 length of variable header and 1 the first byte of the fixed header */
    353   packetlen = payloadlen + 10 + remain_pos + 1;
    354 
    355   /* allocating packet */
    356   if(packetlen > 0xFFFFFFF)
    357     return CURLE_WEIRD_SERVER_REPLY;
    358   packet = calloc(1, packetlen);
    359   if(!packet)
    360     return CURLE_OUT_OF_MEMORY;
    361 
    362   /* set initial values for the CONNECT packet */
    363   pos = init_connpack(packet, remain, remain_pos);
    364 
    365   result = Curl_rand_alnum(data, (unsigned char *)&client_id[clen],
    366                            MQTT_CLIENTID_LEN - clen + 1);
    367   /* add client id */
    368   rc = add_client_id(client_id, strlen(client_id), packet, pos + 1);
    369   if(rc) {
    370     failf(data, "Client ID length mismatched: [%zu]", strlen(client_id));
    371     result = CURLE_WEIRD_SERVER_REPLY;
    372     goto end;
    373   }
    374   infof(data, "Using client id '%s'", client_id);
    375 
    376   /* position where the user payload starts */
    377   start_user = pos + 3 + MQTT_CLIENTID_LEN;
    378   /* position where the password payload starts */
    379   start_pwd = start_user + ulen;
    380   /* if username was provided, add it to the packet */
    381   if(ulen) {
    382     start_pwd += 2;
    383 
    384     rc = add_user(username, ulen,
    385                   (unsigned char *)packet, start_user, remain_pos);
    386     if(rc) {
    387       failf(data, "Username too long: [%zu]", ulen);
    388       result = CURLE_WEIRD_SERVER_REPLY;
    389       goto end;
    390     }
    391   }
    392 
    393   /* if passwd was provided, add it to the packet */
    394   if(plen) {
    395     rc = add_passwd(passwd, plen, packet, start_pwd, remain_pos);
    396     if(rc) {
    397       failf(data, "Password too long: [%zu]", plen);
    398       result = CURLE_WEIRD_SERVER_REPLY;
    399       goto end;
    400     }
    401   }
    402 
    403   if(!result)
    404     result = mqtt_send(data, packet, packetlen);
    405 
    406 end:
    407   if(packet)
    408     free(packet);
    409   Curl_safefree(data->state.aptr.user);
    410   Curl_safefree(data->state.aptr.passwd);
    411   return result;
    412 }
    413 
    414 static CURLcode mqtt_disconnect(struct Curl_easy *data)
    415 {
    416   return mqtt_send(data, "\xe0\x00", 2);
    417 }
    418 
    419 static CURLcode mqtt_recv_atleast(struct Curl_easy *data, size_t nbytes)
    420 {
    421   struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
    422   size_t rlen;
    423   CURLcode result;
    424 
    425   if(!mq)
    426     return CURLE_FAILED_INIT;
    427   rlen = curlx_dyn_len(&mq->recvbuf);
    428 
    429   if(rlen < nbytes) {
    430     unsigned char readbuf[1024];
    431     size_t nread;
    432 
    433     DEBUGASSERT(nbytes - rlen < sizeof(readbuf));
    434     result = Curl_xfer_recv(data, (char *)readbuf, nbytes - rlen, &nread);
    435     if(result)
    436       return result;
    437     if(curlx_dyn_addn(&mq->recvbuf, readbuf, nread))
    438       return CURLE_OUT_OF_MEMORY;
    439     rlen = curlx_dyn_len(&mq->recvbuf);
    440   }
    441   return (rlen >= nbytes) ? CURLE_OK : CURLE_AGAIN;
    442 }
    443 
    444 static void mqtt_recv_consume(struct Curl_easy *data, size_t nbytes)
    445 {
    446   struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
    447   DEBUGASSERT(mq);
    448   if(mq) {
    449     size_t rlen = curlx_dyn_len(&mq->recvbuf);
    450     if(rlen <= nbytes)
    451       curlx_dyn_reset(&mq->recvbuf);
    452     else
    453       curlx_dyn_tail(&mq->recvbuf, rlen - nbytes);
    454   }
    455 }
    456 
    457 static CURLcode mqtt_verify_connack(struct Curl_easy *data)
    458 {
    459   struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
    460   CURLcode result;
    461   char *ptr;
    462 
    463   DEBUGASSERT(mq);
    464   if(!mq)
    465     return CURLE_FAILED_INIT;
    466 
    467   result = mqtt_recv_atleast(data, MQTT_CONNACK_LEN);
    468   if(result)
    469     goto fail;
    470 
    471   /* verify CONNACK */
    472   DEBUGASSERT(curlx_dyn_len(&mq->recvbuf) >= MQTT_CONNACK_LEN);
    473   ptr = curlx_dyn_ptr(&mq->recvbuf);
    474   Curl_debug(data, CURLINFO_HEADER_IN, ptr, MQTT_CONNACK_LEN);
    475 
    476   if(ptr[0] != 0x00 || ptr[1] != 0x00) {
    477     failf(data, "Expected %02x%02x but got %02x%02x",
    478           0x00, 0x00, ptr[0], ptr[1]);
    479     curlx_dyn_reset(&mq->recvbuf);
    480     result = CURLE_WEIRD_SERVER_REPLY;
    481     goto fail;
    482   }
    483   mqtt_recv_consume(data, MQTT_CONNACK_LEN);
    484 fail:
    485   return result;
    486 }
    487 
    488 static CURLcode mqtt_get_topic(struct Curl_easy *data,
    489                                char **topic, size_t *topiclen)
    490 {
    491   char *path = data->state.up.path;
    492   CURLcode result = CURLE_URL_MALFORMAT;
    493   if(strlen(path) > 1) {
    494     result = Curl_urldecode(path + 1, 0, topic, topiclen, REJECT_NADA);
    495     if(!result && (*topiclen > 0xffff)) {
    496       failf(data, "Too long MQTT topic");
    497       result = CURLE_URL_MALFORMAT;
    498     }
    499   }
    500   else
    501     failf(data, "No MQTT topic found. Forgot to URL encode it?");
    502 
    503   return result;
    504 }
    505 
    506 static CURLcode mqtt_subscribe(struct Curl_easy *data)
    507 {
    508   CURLcode result = CURLE_OK;
    509   char *topic = NULL;
    510   size_t topiclen;
    511   unsigned char *packet = NULL;
    512   size_t packetlen;
    513   char encodedsize[4];
    514   size_t n;
    515   struct connectdata *conn = data->conn;
    516   struct mqtt_conn *mqtt = Curl_conn_meta_get(conn, CURL_META_MQTT_CONN);
    517 
    518   if(!mqtt)
    519     return CURLE_FAILED_INIT;
    520 
    521   result = mqtt_get_topic(data, &topic, &topiclen);
    522   if(result)
    523     goto fail;
    524 
    525   mqtt->packetid++;
    526 
    527   packetlen = topiclen + 5; /* packetid + topic (has a two byte length field)
    528                                + 2 bytes topic length + QoS byte */
    529   n = mqtt_encode_len((char *)encodedsize, packetlen);
    530   packetlen += n + 1; /* add one for the control packet type byte */
    531 
    532   packet = malloc(packetlen);
    533   if(!packet) {
    534     result = CURLE_OUT_OF_MEMORY;
    535     goto fail;
    536   }
    537 
    538   packet[0] = MQTT_MSG_SUBSCRIBE;
    539   memcpy(&packet[1], encodedsize, n);
    540   packet[1 + n] = (mqtt->packetid >> 8) & 0xff;
    541   packet[2 + n] = mqtt->packetid & 0xff;
    542   packet[3 + n] = (topiclen >> 8) & 0xff;
    543   packet[4 + n ] = topiclen & 0xff;
    544   memcpy(&packet[5 + n], topic, topiclen);
    545   packet[5 + n + topiclen] = 0; /* QoS zero */
    546 
    547   result = mqtt_send(data, (const char *)packet, packetlen);
    548 
    549 fail:
    550   free(topic);
    551   free(packet);
    552   return result;
    553 }
    554 
    555 /*
    556  * Called when the first byte was already read.
    557  */
    558 static CURLcode mqtt_verify_suback(struct Curl_easy *data)
    559 {
    560   struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
    561   struct connectdata *conn = data->conn;
    562   struct mqtt_conn *mqtt = Curl_conn_meta_get(conn, CURL_META_MQTT_CONN);
    563   CURLcode result;
    564   char *ptr;
    565 
    566   if(!mqtt || !mq)
    567     return CURLE_FAILED_INIT;
    568 
    569   result = mqtt_recv_atleast(data, MQTT_SUBACK_LEN);
    570   if(result)
    571     goto fail;
    572 
    573   /* verify SUBACK */
    574   DEBUGASSERT(curlx_dyn_len(&mq->recvbuf) >= MQTT_SUBACK_LEN);
    575   ptr = curlx_dyn_ptr(&mq->recvbuf);
    576   Curl_debug(data, CURLINFO_HEADER_IN, ptr, MQTT_SUBACK_LEN);
    577 
    578   if(((unsigned char)ptr[0]) != ((mqtt->packetid >> 8) & 0xff) ||
    579      ((unsigned char)ptr[1]) != (mqtt->packetid & 0xff) ||
    580      ptr[2] != 0x00) {
    581     curlx_dyn_reset(&mq->recvbuf);
    582     result = CURLE_WEIRD_SERVER_REPLY;
    583     goto fail;
    584   }
    585   mqtt_recv_consume(data, MQTT_SUBACK_LEN);
    586 fail:
    587   return result;
    588 }
    589 
    590 static CURLcode mqtt_publish(struct Curl_easy *data)
    591 {
    592   CURLcode result;
    593   char *payload = data->set.postfields;
    594   size_t payloadlen;
    595   char *topic = NULL;
    596   size_t topiclen;
    597   unsigned char *pkt = NULL;
    598   size_t i = 0;
    599   size_t remaininglength;
    600   size_t encodelen;
    601   char encodedbytes[4];
    602   curl_off_t postfieldsize = data->set.postfieldsize;
    603 
    604   if(!payload) {
    605     DEBUGF(infof(data, "mqtt_publish without payload, return bad arg"));
    606     return CURLE_BAD_FUNCTION_ARGUMENT;
    607   }
    608   if(postfieldsize < 0)
    609     payloadlen = strlen(payload);
    610   else
    611     payloadlen = (size_t)postfieldsize;
    612 
    613   result = mqtt_get_topic(data, &topic, &topiclen);
    614   if(result)
    615     goto fail;
    616 
    617   remaininglength = payloadlen + 2 + topiclen;
    618   encodelen = mqtt_encode_len(encodedbytes, remaininglength);
    619 
    620   /* add the control byte and the encoded remaining length */
    621   pkt = malloc(remaininglength + 1 + encodelen);
    622   if(!pkt) {
    623     result = CURLE_OUT_OF_MEMORY;
    624     goto fail;
    625   }
    626 
    627   /* assemble packet */
    628   pkt[i++] = MQTT_MSG_PUBLISH;
    629   memcpy(&pkt[i], encodedbytes, encodelen);
    630   i += encodelen;
    631   pkt[i++] = (topiclen >> 8) & 0xff;
    632   pkt[i++] = (topiclen & 0xff);
    633   memcpy(&pkt[i], topic, topiclen);
    634   i += topiclen;
    635   memcpy(&pkt[i], payload, payloadlen);
    636   i += payloadlen;
    637   result = mqtt_send(data, (const char *)pkt, i);
    638 
    639 fail:
    640   free(pkt);
    641   free(topic);
    642   return result;
    643 }
    644 
    645 static size_t mqtt_decode_len(unsigned char *buf,
    646                               size_t buflen, size_t *lenbytes)
    647 {
    648   size_t len = 0;
    649   size_t mult = 1;
    650   size_t i;
    651   unsigned char encoded = 128;
    652 
    653   for(i = 0; (i < buflen) && (encoded & 128); i++) {
    654     encoded = buf[i];
    655     len += (encoded & 127) * mult;
    656     mult *= 128;
    657   }
    658 
    659   if(lenbytes)
    660     *lenbytes = i;
    661 
    662   return len;
    663 }
    664 
#ifdef DEBUGBUILD
/* Printable names for enum mqttstate, indexed by the enum value; only used
   by the debug trace in mqstate(). Must be kept in the same order as the
   enum. */
static const char *statenames[]={
  "MQTT_FIRST",
  "MQTT_REMAINING_LENGTH",
  "MQTT_CONNACK",
  "MQTT_SUBACK",
  "MQTT_SUBACK_COMING",
  "MQTT_PUBWAIT",
  "MQTT_PUB_REMAIN",

  "NOT A STATE"
};
#endif
    678 
    679 /* The only way to change state */
    680 static void mqstate(struct Curl_easy *data,
    681                     enum mqttstate state,
    682                     enum mqttstate nextstate) /* used if state == FIRST */
    683 {
    684   struct connectdata *conn = data->conn;
    685   struct mqtt_conn *mqtt = Curl_conn_meta_get(conn, CURL_META_MQTT_CONN);
    686   DEBUGASSERT(mqtt);
    687   if(!mqtt)
    688     return;
    689 #ifdef DEBUGBUILD
    690   infof(data, "%s (from %s) (next is %s)",
    691         statenames[state],
    692         statenames[mqtt->state],
    693         (state == MQTT_FIRST) ? statenames[nextstate] : "");
    694 #endif
    695   mqtt->state = state;
    696   if(state == MQTT_FIRST)
    697     mqtt->nextstate = nextstate;
    698 }
    699 
    700 
/*
 * Handle the subscriber side states: verify a SUBACK, or deliver the body
 * of an incoming PUBLISH to the client.
 *
 * Called from mqtt_doing() after the first byte and the remaining length of
 * a packet have been read into mq->firstbyte and mq->remaining_length.
 *
 * Sets '*done' to TRUE when a DISCONNECT packet is seen. Returns CURLE_OK,
 * CURLE_AGAIN from the receive calls, CURLE_WEIRD_SERVER_REPLY for an
 * unexpected packet or state, CURLE_FILESIZE_EXCEEDED when the packet is
 * larger than the configured maximum, CURLE_PARTIAL_FILE if the receive
 * returns zero bytes, or an error from the client write.
 */
static CURLcode mqtt_read_publish(struct Curl_easy *data, bool *done)
{
  CURLcode result = CURLE_OK;
  struct connectdata *conn = data->conn;
  size_t nread;
  size_t remlen;
  struct mqtt_conn *mqtt = Curl_conn_meta_get(conn, CURL_META_MQTT_CONN);
  struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
  unsigned char packet;

  DEBUGASSERT(mqtt);
  if(!mqtt || !mq)
    return CURLE_FAILED_INIT;

  switch(mqtt->state) {
/* jump target for the SUBACK branch below, which has just switched to this
   state */
MQTT_SUBACK_COMING:
  case MQTT_SUBACK_COMING:
    result = mqtt_verify_suback(data);
    if(result)
      break;

    /* subscription acknowledged, now wait for the first PUBLISH */
    mqstate(data, MQTT_FIRST, MQTT_PUBWAIT);
    break;

  case MQTT_SUBACK:
  case MQTT_PUBWAIT:
    /* we are expecting PUBLISH or SUBACK */
    packet = mq->firstbyte & 0xf0; /* ignore the flag bits */
    if(packet == MQTT_MSG_PUBLISH)
      mqstate(data, MQTT_PUB_REMAIN, MQTT_NOSTATE);
    else if(packet == MQTT_MSG_SUBACK) {
      mqstate(data, MQTT_SUBACK_COMING, MQTT_NOSTATE);
      goto MQTT_SUBACK_COMING;
    }
    else if(packet == MQTT_MSG_DISCONNECT) {
      infof(data, "Got DISCONNECT");
      *done = TRUE;
      goto end;
    }
    else {
      result = CURLE_WEIRD_SERVER_REPLY;
      goto end;
    }

    /* -- switched state -- */
    /* only the PUBLISH branch reaches this point */
    remlen = mq->remaining_length;
    infof(data, "Remaining length: %zu bytes", remlen);
    if(data->set.max_filesize &&
       (curl_off_t)remlen > data->set.max_filesize) {
      failf(data, "Maximum file size exceeded");
      result = CURLE_FILESIZE_EXCEEDED;
      goto end;
    }
    Curl_pgrsSetDownloadSize(data, remlen);
    data->req.bytecount = 0;
    data->req.size = remlen;
    mq->npacket = remlen; /* get this many bytes */
    FALLTHROUGH();
  case MQTT_PUB_REMAIN: {
    /* read rest of packet, but no more. Cap to buffer size */
    char buffer[4*1024];
    size_t rest = mq->npacket;
    if(rest > sizeof(buffer))
      rest = sizeof(buffer);
    result = Curl_xfer_recv(data, buffer, rest, &nread);
    if(result) {
      if(CURLE_AGAIN == result) {
        infof(data, "EEEE AAAAGAIN");
      }
      goto end;
    }
    if(!nread) {
      infof(data, "server disconnected");
      result = CURLE_PARTIAL_FILE;
      goto end;
    }

    /* we received something */
    mq->lastTime = curlx_now();

    /* if QoS is set, message contains packet id */
    /* everything received is passed on as body data */
    result = Curl_client_write(data, CLIENTWRITE_BODY, buffer, nread);
    if(result)
      goto end;

    mq->npacket -= nread;
    if(!mq->npacket)
      /* no more PUBLISH payload, back to subscribe wait state */
      mqstate(data, MQTT_FIRST, MQTT_PUBWAIT);
    break;
  }
  default:
    DEBUGASSERT(NULL); /* illegal state */
    result = CURLE_WEIRD_SERVER_REPLY;
    goto end;
  }
end:
  return result;
}
    800 
    801 static CURLcode mqtt_do(struct Curl_easy *data, bool *done)
    802 {
    803   struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
    804   CURLcode result = CURLE_OK;
    805   *done = FALSE; /* unconditionally */
    806 
    807   if(!mq)
    808     return CURLE_FAILED_INIT;
    809   mq->lastTime = curlx_now();
    810   mq->pingsent = FALSE;
    811 
    812   result = mqtt_connect(data);
    813   if(result) {
    814     failf(data, "Error %d sending MQTT CONNECT request", result);
    815     return result;
    816   }
    817   mqstate(data, MQTT_FIRST, MQTT_CONNACK);
    818   return CURLE_OK;
    819 }
    820 
    821 static CURLcode mqtt_done(struct Curl_easy *data,
    822                           CURLcode status, bool premature)
    823 {
    824   struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
    825   (void)status;
    826   (void)premature;
    827   if(mq) {
    828     curlx_dyn_free(&mq->sendbuf);
    829     curlx_dyn_free(&mq->recvbuf);
    830   }
    831   return CURLE_OK;
    832 }
    833 
    834 /* we ping regularly to avoid being disconnected by the server */
    835 static CURLcode mqtt_ping(struct Curl_easy *data)
    836 {
    837   struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
    838   CURLcode result = CURLE_OK;
    839   struct connectdata *conn = data->conn;
    840   struct mqtt_conn *mqtt = Curl_conn_meta_get(conn, CURL_META_MQTT_CONN);
    841 
    842   if(!mqtt || !mq)
    843     return CURLE_FAILED_INIT;
    844 
    845   if(mqtt->state == MQTT_FIRST &&
    846      !mq->pingsent &&
    847      data->set.upkeep_interval_ms > 0) {
    848     struct curltime t = curlx_now();
    849     timediff_t diff = curlx_timediff(t, mq->lastTime);
    850 
    851     if(diff > data->set.upkeep_interval_ms) {
    852       /* 0xC0 is PINGREQ, and 0x00 is remaining length */
    853       unsigned char packet[2] = { 0xC0, 0x00 };
    854       size_t packetlen = sizeof(packet);
    855 
    856       result = mqtt_send(data, (char *)packet, packetlen);
    857       if(!result) {
    858         mq->pingsent = TRUE;
    859       }
    860       infof(data, "mqtt_ping: sent ping request.");
    861     }
    862   }
    863   return result;
    864 }
    865 
    866 static CURLcode mqtt_doing(struct Curl_easy *data, bool *done)
    867 {
    868   struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
    869   CURLcode result = CURLE_OK;
    870   size_t nread;
    871   unsigned char recvbyte;
    872   struct mqtt_conn *mqtt = Curl_conn_meta_get(data->conn, CURL_META_MQTT_CONN);
    873 
    874   if(!mqtt || !mq)
    875     return CURLE_FAILED_INIT;
    876 
    877   *done = FALSE;
    878 
    879   if(curlx_dyn_len(&mq->sendbuf)) {
    880     /* send the remainder of an outgoing packet */
    881     result = mqtt_send(data, curlx_dyn_ptr(&mq->sendbuf),
    882                        curlx_dyn_len(&mq->sendbuf));
    883     if(result)
    884       return result;
    885   }
    886 
    887   result = mqtt_ping(data);
    888   if(result)
    889     return result;
    890 
    891   infof(data, "mqtt_doing: state [%d]", (int) mqtt->state);
    892   switch(mqtt->state) {
    893   case MQTT_FIRST:
    894     /* Read the initial byte only */
    895     result = Curl_xfer_recv(data, (char *)&mq->firstbyte, 1, &nread);
    896     if(result)
    897       break;
    898     else if(!nread) {
    899       failf(data, "Connection disconnected");
    900       *done = TRUE;
    901       result = CURLE_RECV_ERROR;
    902       break;
    903     }
    904     Curl_debug(data, CURLINFO_HEADER_IN, (const char *)&mq->firstbyte, 1);
    905 
    906     /* we received something */
    907     mq->lastTime = curlx_now();
    908 
    909     /* remember the first byte */
    910     mq->npacket = 0;
    911     mqstate(data, MQTT_REMAINING_LENGTH, MQTT_NOSTATE);
    912     FALLTHROUGH();
    913   case MQTT_REMAINING_LENGTH:
    914     do {
    915       result = Curl_xfer_recv(data, (char *)&recvbyte, 1, &nread);
    916       if(result || !nread)
    917         break;
    918       Curl_debug(data, CURLINFO_HEADER_IN, (const char *)&recvbyte, 1);
    919       mq->pkt_hd[mq->npacket++] = recvbyte;
    920     } while((recvbyte & 0x80) && (mq->npacket < 4));
    921     if(!result && nread && (recvbyte & 0x80))
    922       /* MQTT supports up to 127 * 128^0 + 127 * 128^1 + 127 * 128^2 +
    923          127 * 128^3 bytes. server tried to send more */
    924       result = CURLE_WEIRD_SERVER_REPLY;
    925     if(result)
    926       break;
    927     mq->remaining_length = mqtt_decode_len(mq->pkt_hd, mq->npacket, NULL);
    928     mq->npacket = 0;
    929     if(mq->remaining_length) {
    930       mqstate(data, mqtt->nextstate, MQTT_NOSTATE);
    931       break;
    932     }
    933     mqstate(data, MQTT_FIRST, MQTT_FIRST);
    934 
    935     if(mq->firstbyte == MQTT_MSG_DISCONNECT) {
    936       infof(data, "Got DISCONNECT");
    937       *done = TRUE;
    938     }
    939 
    940     /* ping response */
    941     if(mq->firstbyte == MQTT_MSG_PINGRESP) {
    942       infof(data, "Received ping response.");
    943       mq->pingsent = FALSE;
    944       mqstate(data, MQTT_FIRST, MQTT_PUBWAIT);
    945     }
    946     break;
    947   case MQTT_CONNACK:
    948     result = mqtt_verify_connack(data);
    949     if(result)
    950       break;
    951 
    952     if(data->state.httpreq == HTTPREQ_POST) {
    953       result = mqtt_publish(data);
    954       if(!result) {
    955         result = mqtt_disconnect(data);
    956         *done = TRUE;
    957       }
    958       mqtt->nextstate = MQTT_FIRST;
    959     }
    960     else {
    961       result = mqtt_subscribe(data);
    962       if(!result) {
    963         mqstate(data, MQTT_FIRST, MQTT_SUBACK);
    964       }
    965     }
    966     break;
    967 
    968   case MQTT_SUBACK:
    969   case MQTT_PUBWAIT:
    970   case MQTT_PUB_REMAIN:
    971     result = mqtt_read_publish(data, done);
    972     break;
    973 
    974   default:
    975     failf(data, "State not handled yet");
    976     *done = TRUE;
    977     break;
    978   }
    979 
    980   if(result == CURLE_AGAIN)
    981     result = CURLE_OK;
    982   return result;
    983 }
    984 
    985 #endif /* CURL_DISABLE_MQTT */