/* mqtt.c (27397B) */
1 /*************************************************************************** 2 * _ _ ____ _ 3 * Project ___| | | | _ \| | 4 * / __| | | | |_) | | 5 * | (__| |_| | _ <| |___ 6 * \___|\___/|_| \_\_____| 7 * 8 * Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al. 9 * Copyright (C) Björn Stenberg, <bjorn@haxx.se> 10 * 11 * This software is licensed as described in the file COPYING, which 12 * you should have received as part of this distribution. The terms 13 * are also available at https://curl.se/docs/copyright.html. 14 * 15 * You may opt to use, copy, modify, merge, publish, distribute and/or sell 16 * copies of the Software, and permit persons to whom the Software is 17 * furnished to do so, under the terms of the COPYING file. 18 * 19 * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY 20 * KIND, either express or implied. 21 * 22 * SPDX-License-Identifier: curl 23 * 24 ***************************************************************************/ 25 26 #include "curl_setup.h" 27 28 #ifndef CURL_DISABLE_MQTT 29 30 #include "urldata.h" 31 #include <curl/curl.h> 32 #include "transfer.h" 33 #include "sendf.h" 34 #include "progress.h" 35 #include "mqtt.h" 36 #include "select.h" 37 #include "strdup.h" 38 #include "url.h" 39 #include "escape.h" 40 #include "curlx/warnless.h" 41 #include "curl_printf.h" 42 #include "curl_memory.h" 43 #include "multiif.h" 44 #include "rand.h" 45 46 /* The last #include file should be: */ 47 #include "memdebug.h" 48 49 /* first byte is command. 50 second byte is for flags. 
*/ 51 #define MQTT_MSG_CONNECT 0x10 52 /* #define MQTT_MSG_CONNACK 0x20 */ 53 #define MQTT_MSG_PUBLISH 0x30 54 #define MQTT_MSG_SUBSCRIBE 0x82 55 #define MQTT_MSG_SUBACK 0x90 56 #define MQTT_MSG_DISCONNECT 0xe0 57 #define MQTT_MSG_PINGREQ 0xC0 58 #define MQTT_MSG_PINGRESP 0xD0 59 60 #define MQTT_CONNACK_LEN 2 61 #define MQTT_SUBACK_LEN 3 62 #define MQTT_CLIENTID_LEN 12 /* "curl0123abcd" */ 63 64 /* meta key for storing protocol meta at easy handle */ 65 #define CURL_META_MQTT_EASY "meta:proto:mqtt:easy" 66 /* meta key for storing protocol meta at connection */ 67 #define CURL_META_MQTT_CONN "meta:proto:mqtt:conn" 68 69 enum mqttstate { 70 MQTT_FIRST, /* 0 */ 71 MQTT_REMAINING_LENGTH, /* 1 */ 72 MQTT_CONNACK, /* 2 */ 73 MQTT_SUBACK, /* 3 */ 74 MQTT_SUBACK_COMING, /* 4 - the SUBACK remainder */ 75 MQTT_PUBWAIT, /* 5 - wait for publish */ 76 MQTT_PUB_REMAIN, /* 6 - wait for the remainder of the publish */ 77 78 MQTT_NOSTATE /* 7 - never used an actual state */ 79 }; 80 81 struct mqtt_conn { 82 enum mqttstate state; 83 enum mqttstate nextstate; /* switch to this after remaining length is 84 done */ 85 unsigned int packetid; 86 }; 87 88 /* protocol-specific transfer-related data */ 89 struct MQTT { 90 struct dynbuf sendbuf; 91 /* when receiving */ 92 struct dynbuf recvbuf; 93 size_t npacket; /* byte counter */ 94 size_t remaining_length; 95 unsigned char pkt_hd[4]; /* for decoding the arriving packet length */ 96 struct curltime lastTime; /* last time we sent or received data */ 97 unsigned char firstbyte; 98 BIT(pingsent); /* 1 while we wait for ping response */ 99 }; 100 101 102 /* 103 * Forward declarations. 
104 */ 105 106 static CURLcode mqtt_do(struct Curl_easy *data, bool *done); 107 static CURLcode mqtt_done(struct Curl_easy *data, 108 CURLcode status, bool premature); 109 static CURLcode mqtt_doing(struct Curl_easy *data, bool *done); 110 static int mqtt_getsock(struct Curl_easy *data, struct connectdata *conn, 111 curl_socket_t *sock); 112 static CURLcode mqtt_setup_conn(struct Curl_easy *data, 113 struct connectdata *conn); 114 115 /* 116 * MQTT protocol handler. 117 */ 118 119 const struct Curl_handler Curl_handler_mqtt = { 120 "mqtt", /* scheme */ 121 mqtt_setup_conn, /* setup_connection */ 122 mqtt_do, /* do_it */ 123 mqtt_done, /* done */ 124 ZERO_NULL, /* do_more */ 125 ZERO_NULL, /* connect_it */ 126 ZERO_NULL, /* connecting */ 127 mqtt_doing, /* doing */ 128 ZERO_NULL, /* proto_getsock */ 129 mqtt_getsock, /* doing_getsock */ 130 ZERO_NULL, /* domore_getsock */ 131 ZERO_NULL, /* perform_getsock */ 132 ZERO_NULL, /* disconnect */ 133 ZERO_NULL, /* write_resp */ 134 ZERO_NULL, /* write_resp_hd */ 135 ZERO_NULL, /* connection_check */ 136 ZERO_NULL, /* attach connection */ 137 ZERO_NULL, /* follow */ 138 PORT_MQTT, /* defport */ 139 CURLPROTO_MQTT, /* protocol */ 140 CURLPROTO_MQTT, /* family */ 141 PROTOPT_NONE /* flags */ 142 }; 143 144 static void mqtt_easy_dtor(void *key, size_t klen, void *entry) 145 { 146 struct MQTT *mq = entry; 147 (void)key; 148 (void)klen; 149 curlx_dyn_free(&mq->sendbuf); 150 curlx_dyn_free(&mq->recvbuf); 151 free(mq); 152 } 153 154 static void mqtt_conn_dtor(void *key, size_t klen, void *entry) 155 { 156 (void)key; 157 (void)klen; 158 free(entry); 159 } 160 161 static CURLcode mqtt_setup_conn(struct Curl_easy *data, 162 struct connectdata *conn) 163 { 164 /* setup MQTT specific meta data at easy handle and connection */ 165 struct mqtt_conn *mqtt; 166 struct MQTT *mq; 167 168 mqtt = calloc(1, sizeof(*mqtt)); 169 if(!mqtt || 170 Curl_conn_meta_set(conn, CURL_META_MQTT_CONN, mqtt, mqtt_conn_dtor)) 171 return CURLE_OUT_OF_MEMORY; 
172 173 mq = calloc(1, sizeof(struct MQTT)); 174 if(!mq) 175 return CURLE_OUT_OF_MEMORY; 176 curlx_dyn_init(&mq->recvbuf, DYN_MQTT_RECV); 177 curlx_dyn_init(&mq->sendbuf, DYN_MQTT_SEND); 178 if(Curl_meta_set(data, CURL_META_MQTT_EASY, mq, mqtt_easy_dtor)) 179 return CURLE_OUT_OF_MEMORY; 180 return CURLE_OK; 181 } 182 183 static CURLcode mqtt_send(struct Curl_easy *data, 184 const char *buf, size_t len) 185 { 186 size_t n; 187 CURLcode result; 188 struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY); 189 190 if(!mq) 191 return CURLE_FAILED_INIT; 192 193 result = Curl_xfer_send(data, buf, len, FALSE, &n); 194 if(result) 195 return result; 196 mq->lastTime = curlx_now(); 197 Curl_debug(data, CURLINFO_HEADER_OUT, buf, (size_t)n); 198 if(len != n) { 199 size_t nsend = len - n; 200 if(curlx_dyn_len(&mq->sendbuf)) { 201 DEBUGASSERT(curlx_dyn_len(&mq->sendbuf) >= nsend); 202 result = curlx_dyn_tail(&mq->sendbuf, nsend); /* keep this much */ 203 } 204 else { 205 result = curlx_dyn_addn(&mq->sendbuf, &buf[n], nsend); 206 } 207 } 208 else 209 curlx_dyn_reset(&mq->sendbuf); 210 return result; 211 } 212 213 /* Generic function called by the multi interface to figure out what socket(s) 214 to wait for and for what actions during the DOING and PROTOCONNECT 215 states */ 216 static int mqtt_getsock(struct Curl_easy *data, 217 struct connectdata *conn, 218 curl_socket_t *sock) 219 { 220 (void)data; 221 sock[0] = conn->sock[FIRSTSOCKET]; 222 return GETSOCK_READSOCK(FIRSTSOCKET); 223 } 224 225 static int mqtt_encode_len(char *buf, size_t len) 226 { 227 int i; 228 229 for(i = 0; (len > 0) && (i < 4); i++) { 230 unsigned char encoded; 231 encoded = len % 0x80; 232 len /= 0x80; 233 if(len) 234 encoded |= 0x80; 235 buf[i] = (char)encoded; 236 } 237 238 return i; 239 } 240 241 /* add the passwd to the CONNECT packet */ 242 static int add_passwd(const char *passwd, const size_t plen, 243 char *pkt, const size_t start, int remain_pos) 244 { 245 /* magic number that need to be set 
properly */ 246 const size_t conn_flags_pos = remain_pos + 8; 247 if(plen > 0xffff) 248 return 1; 249 250 /* set password flag */ 251 pkt[conn_flags_pos] |= 0x40; 252 253 /* length of password provided */ 254 pkt[start] = (char)((plen >> 8) & 0xFF); 255 pkt[start + 1] = (char)(plen & 0xFF); 256 memcpy(&pkt[start + 2], passwd, plen); 257 return 0; 258 } 259 260 /* add user to the CONNECT packet */ 261 static int add_user(const char *username, const size_t ulen, 262 unsigned char *pkt, const size_t start, int remain_pos) 263 { 264 /* magic number that need to be set properly */ 265 const size_t conn_flags_pos = remain_pos + 8; 266 if(ulen > 0xffff) 267 return 1; 268 269 /* set username flag */ 270 pkt[conn_flags_pos] |= 0x80; 271 /* length of username provided */ 272 pkt[start] = (unsigned char)((ulen >> 8) & 0xFF); 273 pkt[start + 1] = (unsigned char)(ulen & 0xFF); 274 memcpy(&pkt[start + 2], username, ulen); 275 return 0; 276 } 277 278 /* add client ID to the CONNECT packet */ 279 static int add_client_id(const char *client_id, const size_t client_id_len, 280 char *pkt, const size_t start) 281 { 282 if(client_id_len != MQTT_CLIENTID_LEN) 283 return 1; 284 pkt[start] = 0x00; 285 pkt[start + 1] = MQTT_CLIENTID_LEN; 286 memcpy(&pkt[start + 2], client_id, MQTT_CLIENTID_LEN); 287 return 0; 288 } 289 290 /* Set initial values of CONNECT packet */ 291 static int init_connpack(char *packet, char *remain, int remain_pos) 292 { 293 /* Fixed header starts */ 294 /* packet type */ 295 packet[0] = MQTT_MSG_CONNECT; 296 /* remaining length field */ 297 memcpy(&packet[1], remain, remain_pos); 298 /* Fixed header ends */ 299 300 /* Variable header starts */ 301 /* protocol length */ 302 packet[remain_pos + 1] = 0x00; 303 packet[remain_pos + 2] = 0x04; 304 /* protocol name */ 305 packet[remain_pos + 3] = 'M'; 306 packet[remain_pos + 4] = 'Q'; 307 packet[remain_pos + 5] = 'T'; 308 packet[remain_pos + 6] = 'T'; 309 /* protocol level */ 310 packet[remain_pos + 7] = 0x04; 311 /* 
CONNECT flag: CleanSession */ 312 packet[remain_pos + 8] = 0x02; 313 /* keep-alive 0 = disabled */ 314 packet[remain_pos + 9] = 0x00; 315 packet[remain_pos + 10] = 0x3c; 316 /* end of variable header */ 317 return remain_pos + 10; 318 } 319 320 static CURLcode mqtt_connect(struct Curl_easy *data) 321 { 322 CURLcode result = CURLE_OK; 323 int pos = 0; 324 int rc = 0; 325 /* remain length */ 326 int remain_pos = 0; 327 char remain[4] = {0}; 328 size_t packetlen = 0; 329 size_t start_user = 0; 330 size_t start_pwd = 0; 331 char client_id[MQTT_CLIENTID_LEN + 1] = "curl"; 332 const size_t clen = strlen("curl"); 333 char *packet = NULL; 334 335 /* extracting username from request */ 336 const char *username = data->state.aptr.user ? 337 data->state.aptr.user : ""; 338 const size_t ulen = strlen(username); 339 /* extracting password from request */ 340 const char *passwd = data->state.aptr.passwd ? 341 data->state.aptr.passwd : ""; 342 const size_t plen = strlen(passwd); 343 const size_t payloadlen = ulen + plen + MQTT_CLIENTID_LEN + 2 + 344 /* The plus 2s below are for the MSB and LSB describing the length of the 345 string to be added on the payload. Refer to spec 1.5.2 and 1.5.4 */ 346 (ulen ? 2 : 0) + 347 (plen ? 
2 : 0); 348 349 /* getting how much occupy the remain length */ 350 remain_pos = mqtt_encode_len(remain, payloadlen + 10); 351 352 /* 10 length of variable header and 1 the first byte of the fixed header */ 353 packetlen = payloadlen + 10 + remain_pos + 1; 354 355 /* allocating packet */ 356 if(packetlen > 0xFFFFFFF) 357 return CURLE_WEIRD_SERVER_REPLY; 358 packet = calloc(1, packetlen); 359 if(!packet) 360 return CURLE_OUT_OF_MEMORY; 361 362 /* set initial values for the CONNECT packet */ 363 pos = init_connpack(packet, remain, remain_pos); 364 365 result = Curl_rand_alnum(data, (unsigned char *)&client_id[clen], 366 MQTT_CLIENTID_LEN - clen + 1); 367 /* add client id */ 368 rc = add_client_id(client_id, strlen(client_id), packet, pos + 1); 369 if(rc) { 370 failf(data, "Client ID length mismatched: [%zu]", strlen(client_id)); 371 result = CURLE_WEIRD_SERVER_REPLY; 372 goto end; 373 } 374 infof(data, "Using client id '%s'", client_id); 375 376 /* position where the user payload starts */ 377 start_user = pos + 3 + MQTT_CLIENTID_LEN; 378 /* position where the password payload starts */ 379 start_pwd = start_user + ulen; 380 /* if username was provided, add it to the packet */ 381 if(ulen) { 382 start_pwd += 2; 383 384 rc = add_user(username, ulen, 385 (unsigned char *)packet, start_user, remain_pos); 386 if(rc) { 387 failf(data, "Username too long: [%zu]", ulen); 388 result = CURLE_WEIRD_SERVER_REPLY; 389 goto end; 390 } 391 } 392 393 /* if passwd was provided, add it to the packet */ 394 if(plen) { 395 rc = add_passwd(passwd, plen, packet, start_pwd, remain_pos); 396 if(rc) { 397 failf(data, "Password too long: [%zu]", plen); 398 result = CURLE_WEIRD_SERVER_REPLY; 399 goto end; 400 } 401 } 402 403 if(!result) 404 result = mqtt_send(data, packet, packetlen); 405 406 end: 407 if(packet) 408 free(packet); 409 Curl_safefree(data->state.aptr.user); 410 Curl_safefree(data->state.aptr.passwd); 411 return result; 412 } 413 414 static CURLcode mqtt_disconnect(struct 
Curl_easy *data) 415 { 416 return mqtt_send(data, "\xe0\x00", 2); 417 } 418 419 static CURLcode mqtt_recv_atleast(struct Curl_easy *data, size_t nbytes) 420 { 421 struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY); 422 size_t rlen; 423 CURLcode result; 424 425 if(!mq) 426 return CURLE_FAILED_INIT; 427 rlen = curlx_dyn_len(&mq->recvbuf); 428 429 if(rlen < nbytes) { 430 unsigned char readbuf[1024]; 431 size_t nread; 432 433 DEBUGASSERT(nbytes - rlen < sizeof(readbuf)); 434 result = Curl_xfer_recv(data, (char *)readbuf, nbytes - rlen, &nread); 435 if(result) 436 return result; 437 if(curlx_dyn_addn(&mq->recvbuf, readbuf, nread)) 438 return CURLE_OUT_OF_MEMORY; 439 rlen = curlx_dyn_len(&mq->recvbuf); 440 } 441 return (rlen >= nbytes) ? CURLE_OK : CURLE_AGAIN; 442 } 443 444 static void mqtt_recv_consume(struct Curl_easy *data, size_t nbytes) 445 { 446 struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY); 447 DEBUGASSERT(mq); 448 if(mq) { 449 size_t rlen = curlx_dyn_len(&mq->recvbuf); 450 if(rlen <= nbytes) 451 curlx_dyn_reset(&mq->recvbuf); 452 else 453 curlx_dyn_tail(&mq->recvbuf, rlen - nbytes); 454 } 455 } 456 457 static CURLcode mqtt_verify_connack(struct Curl_easy *data) 458 { 459 struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY); 460 CURLcode result; 461 char *ptr; 462 463 DEBUGASSERT(mq); 464 if(!mq) 465 return CURLE_FAILED_INIT; 466 467 result = mqtt_recv_atleast(data, MQTT_CONNACK_LEN); 468 if(result) 469 goto fail; 470 471 /* verify CONNACK */ 472 DEBUGASSERT(curlx_dyn_len(&mq->recvbuf) >= MQTT_CONNACK_LEN); 473 ptr = curlx_dyn_ptr(&mq->recvbuf); 474 Curl_debug(data, CURLINFO_HEADER_IN, ptr, MQTT_CONNACK_LEN); 475 476 if(ptr[0] != 0x00 || ptr[1] != 0x00) { 477 failf(data, "Expected %02x%02x but got %02x%02x", 478 0x00, 0x00, ptr[0], ptr[1]); 479 curlx_dyn_reset(&mq->recvbuf); 480 result = CURLE_WEIRD_SERVER_REPLY; 481 goto fail; 482 } 483 mqtt_recv_consume(data, MQTT_CONNACK_LEN); 484 fail: 485 return result; 486 } 487 488 static 
CURLcode mqtt_get_topic(struct Curl_easy *data, 489 char **topic, size_t *topiclen) 490 { 491 char *path = data->state.up.path; 492 CURLcode result = CURLE_URL_MALFORMAT; 493 if(strlen(path) > 1) { 494 result = Curl_urldecode(path + 1, 0, topic, topiclen, REJECT_NADA); 495 if(!result && (*topiclen > 0xffff)) { 496 failf(data, "Too long MQTT topic"); 497 result = CURLE_URL_MALFORMAT; 498 } 499 } 500 else 501 failf(data, "No MQTT topic found. Forgot to URL encode it?"); 502 503 return result; 504 } 505 506 static CURLcode mqtt_subscribe(struct Curl_easy *data) 507 { 508 CURLcode result = CURLE_OK; 509 char *topic = NULL; 510 size_t topiclen; 511 unsigned char *packet = NULL; 512 size_t packetlen; 513 char encodedsize[4]; 514 size_t n; 515 struct connectdata *conn = data->conn; 516 struct mqtt_conn *mqtt = Curl_conn_meta_get(conn, CURL_META_MQTT_CONN); 517 518 if(!mqtt) 519 return CURLE_FAILED_INIT; 520 521 result = mqtt_get_topic(data, &topic, &topiclen); 522 if(result) 523 goto fail; 524 525 mqtt->packetid++; 526 527 packetlen = topiclen + 5; /* packetid + topic (has a two byte length field) 528 + 2 bytes topic length + QoS byte */ 529 n = mqtt_encode_len((char *)encodedsize, packetlen); 530 packetlen += n + 1; /* add one for the control packet type byte */ 531 532 packet = malloc(packetlen); 533 if(!packet) { 534 result = CURLE_OUT_OF_MEMORY; 535 goto fail; 536 } 537 538 packet[0] = MQTT_MSG_SUBSCRIBE; 539 memcpy(&packet[1], encodedsize, n); 540 packet[1 + n] = (mqtt->packetid >> 8) & 0xff; 541 packet[2 + n] = mqtt->packetid & 0xff; 542 packet[3 + n] = (topiclen >> 8) & 0xff; 543 packet[4 + n ] = topiclen & 0xff; 544 memcpy(&packet[5 + n], topic, topiclen); 545 packet[5 + n + topiclen] = 0; /* QoS zero */ 546 547 result = mqtt_send(data, (const char *)packet, packetlen); 548 549 fail: 550 free(topic); 551 free(packet); 552 return result; 553 } 554 555 /* 556 * Called when the first byte was already read. 
557 */ 558 static CURLcode mqtt_verify_suback(struct Curl_easy *data) 559 { 560 struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY); 561 struct connectdata *conn = data->conn; 562 struct mqtt_conn *mqtt = Curl_conn_meta_get(conn, CURL_META_MQTT_CONN); 563 CURLcode result; 564 char *ptr; 565 566 if(!mqtt || !mq) 567 return CURLE_FAILED_INIT; 568 569 result = mqtt_recv_atleast(data, MQTT_SUBACK_LEN); 570 if(result) 571 goto fail; 572 573 /* verify SUBACK */ 574 DEBUGASSERT(curlx_dyn_len(&mq->recvbuf) >= MQTT_SUBACK_LEN); 575 ptr = curlx_dyn_ptr(&mq->recvbuf); 576 Curl_debug(data, CURLINFO_HEADER_IN, ptr, MQTT_SUBACK_LEN); 577 578 if(((unsigned char)ptr[0]) != ((mqtt->packetid >> 8) & 0xff) || 579 ((unsigned char)ptr[1]) != (mqtt->packetid & 0xff) || 580 ptr[2] != 0x00) { 581 curlx_dyn_reset(&mq->recvbuf); 582 result = CURLE_WEIRD_SERVER_REPLY; 583 goto fail; 584 } 585 mqtt_recv_consume(data, MQTT_SUBACK_LEN); 586 fail: 587 return result; 588 } 589 590 static CURLcode mqtt_publish(struct Curl_easy *data) 591 { 592 CURLcode result; 593 char *payload = data->set.postfields; 594 size_t payloadlen; 595 char *topic = NULL; 596 size_t topiclen; 597 unsigned char *pkt = NULL; 598 size_t i = 0; 599 size_t remaininglength; 600 size_t encodelen; 601 char encodedbytes[4]; 602 curl_off_t postfieldsize = data->set.postfieldsize; 603 604 if(!payload) { 605 DEBUGF(infof(data, "mqtt_publish without payload, return bad arg")); 606 return CURLE_BAD_FUNCTION_ARGUMENT; 607 } 608 if(postfieldsize < 0) 609 payloadlen = strlen(payload); 610 else 611 payloadlen = (size_t)postfieldsize; 612 613 result = mqtt_get_topic(data, &topic, &topiclen); 614 if(result) 615 goto fail; 616 617 remaininglength = payloadlen + 2 + topiclen; 618 encodelen = mqtt_encode_len(encodedbytes, remaininglength); 619 620 /* add the control byte and the encoded remaining length */ 621 pkt = malloc(remaininglength + 1 + encodelen); 622 if(!pkt) { 623 result = CURLE_OUT_OF_MEMORY; 624 goto fail; 625 } 626 627 /* 
assemble packet */ 628 pkt[i++] = MQTT_MSG_PUBLISH; 629 memcpy(&pkt[i], encodedbytes, encodelen); 630 i += encodelen; 631 pkt[i++] = (topiclen >> 8) & 0xff; 632 pkt[i++] = (topiclen & 0xff); 633 memcpy(&pkt[i], topic, topiclen); 634 i += topiclen; 635 memcpy(&pkt[i], payload, payloadlen); 636 i += payloadlen; 637 result = mqtt_send(data, (const char *)pkt, i); 638 639 fail: 640 free(pkt); 641 free(topic); 642 return result; 643 } 644 645 static size_t mqtt_decode_len(unsigned char *buf, 646 size_t buflen, size_t *lenbytes) 647 { 648 size_t len = 0; 649 size_t mult = 1; 650 size_t i; 651 unsigned char encoded = 128; 652 653 for(i = 0; (i < buflen) && (encoded & 128); i++) { 654 encoded = buf[i]; 655 len += (encoded & 127) * mult; 656 mult *= 128; 657 } 658 659 if(lenbytes) 660 *lenbytes = i; 661 662 return len; 663 } 664 665 #ifdef DEBUGBUILD 666 static const char *statenames[]={ 667 "MQTT_FIRST", 668 "MQTT_REMAINING_LENGTH", 669 "MQTT_CONNACK", 670 "MQTT_SUBACK", 671 "MQTT_SUBACK_COMING", 672 "MQTT_PUBWAIT", 673 "MQTT_PUB_REMAIN", 674 675 "NOT A STATE" 676 }; 677 #endif 678 679 /* The only way to change state */ 680 static void mqstate(struct Curl_easy *data, 681 enum mqttstate state, 682 enum mqttstate nextstate) /* used if state == FIRST */ 683 { 684 struct connectdata *conn = data->conn; 685 struct mqtt_conn *mqtt = Curl_conn_meta_get(conn, CURL_META_MQTT_CONN); 686 DEBUGASSERT(mqtt); 687 if(!mqtt) 688 return; 689 #ifdef DEBUGBUILD 690 infof(data, "%s (from %s) (next is %s)", 691 statenames[state], 692 statenames[mqtt->state], 693 (state == MQTT_FIRST) ? 
statenames[nextstate] : ""); 694 #endif 695 mqtt->state = state; 696 if(state == MQTT_FIRST) 697 mqtt->nextstate = nextstate; 698 } 699 700 701 static CURLcode mqtt_read_publish(struct Curl_easy *data, bool *done) 702 { 703 CURLcode result = CURLE_OK; 704 struct connectdata *conn = data->conn; 705 size_t nread; 706 size_t remlen; 707 struct mqtt_conn *mqtt = Curl_conn_meta_get(conn, CURL_META_MQTT_CONN); 708 struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY); 709 unsigned char packet; 710 711 DEBUGASSERT(mqtt); 712 if(!mqtt || !mq) 713 return CURLE_FAILED_INIT; 714 715 switch(mqtt->state) { 716 MQTT_SUBACK_COMING: 717 case MQTT_SUBACK_COMING: 718 result = mqtt_verify_suback(data); 719 if(result) 720 break; 721 722 mqstate(data, MQTT_FIRST, MQTT_PUBWAIT); 723 break; 724 725 case MQTT_SUBACK: 726 case MQTT_PUBWAIT: 727 /* we are expecting PUBLISH or SUBACK */ 728 packet = mq->firstbyte & 0xf0; 729 if(packet == MQTT_MSG_PUBLISH) 730 mqstate(data, MQTT_PUB_REMAIN, MQTT_NOSTATE); 731 else if(packet == MQTT_MSG_SUBACK) { 732 mqstate(data, MQTT_SUBACK_COMING, MQTT_NOSTATE); 733 goto MQTT_SUBACK_COMING; 734 } 735 else if(packet == MQTT_MSG_DISCONNECT) { 736 infof(data, "Got DISCONNECT"); 737 *done = TRUE; 738 goto end; 739 } 740 else { 741 result = CURLE_WEIRD_SERVER_REPLY; 742 goto end; 743 } 744 745 /* -- switched state -- */ 746 remlen = mq->remaining_length; 747 infof(data, "Remaining length: %zu bytes", remlen); 748 if(data->set.max_filesize && 749 (curl_off_t)remlen > data->set.max_filesize) { 750 failf(data, "Maximum file size exceeded"); 751 result = CURLE_FILESIZE_EXCEEDED; 752 goto end; 753 } 754 Curl_pgrsSetDownloadSize(data, remlen); 755 data->req.bytecount = 0; 756 data->req.size = remlen; 757 mq->npacket = remlen; /* get this many bytes */ 758 FALLTHROUGH(); 759 case MQTT_PUB_REMAIN: { 760 /* read rest of packet, but no more. 
Cap to buffer size */ 761 char buffer[4*1024]; 762 size_t rest = mq->npacket; 763 if(rest > sizeof(buffer)) 764 rest = sizeof(buffer); 765 result = Curl_xfer_recv(data, buffer, rest, &nread); 766 if(result) { 767 if(CURLE_AGAIN == result) { 768 infof(data, "EEEE AAAAGAIN"); 769 } 770 goto end; 771 } 772 if(!nread) { 773 infof(data, "server disconnected"); 774 result = CURLE_PARTIAL_FILE; 775 goto end; 776 } 777 778 /* we received something */ 779 mq->lastTime = curlx_now(); 780 781 /* if QoS is set, message contains packet id */ 782 result = Curl_client_write(data, CLIENTWRITE_BODY, buffer, nread); 783 if(result) 784 goto end; 785 786 mq->npacket -= nread; 787 if(!mq->npacket) 788 /* no more PUBLISH payload, back to subscribe wait state */ 789 mqstate(data, MQTT_FIRST, MQTT_PUBWAIT); 790 break; 791 } 792 default: 793 DEBUGASSERT(NULL); /* illegal state */ 794 result = CURLE_WEIRD_SERVER_REPLY; 795 goto end; 796 } 797 end: 798 return result; 799 } 800 801 static CURLcode mqtt_do(struct Curl_easy *data, bool *done) 802 { 803 struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY); 804 CURLcode result = CURLE_OK; 805 *done = FALSE; /* unconditionally */ 806 807 if(!mq) 808 return CURLE_FAILED_INIT; 809 mq->lastTime = curlx_now(); 810 mq->pingsent = FALSE; 811 812 result = mqtt_connect(data); 813 if(result) { 814 failf(data, "Error %d sending MQTT CONNECT request", result); 815 return result; 816 } 817 mqstate(data, MQTT_FIRST, MQTT_CONNACK); 818 return CURLE_OK; 819 } 820 821 static CURLcode mqtt_done(struct Curl_easy *data, 822 CURLcode status, bool premature) 823 { 824 struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY); 825 (void)status; 826 (void)premature; 827 if(mq) { 828 curlx_dyn_free(&mq->sendbuf); 829 curlx_dyn_free(&mq->recvbuf); 830 } 831 return CURLE_OK; 832 } 833 834 /* we ping regularly to avoid being disconnected by the server */ 835 static CURLcode mqtt_ping(struct Curl_easy *data) 836 { 837 struct MQTT *mq = Curl_meta_get(data, 
CURL_META_MQTT_EASY); 838 CURLcode result = CURLE_OK; 839 struct connectdata *conn = data->conn; 840 struct mqtt_conn *mqtt = Curl_conn_meta_get(conn, CURL_META_MQTT_CONN); 841 842 if(!mqtt || !mq) 843 return CURLE_FAILED_INIT; 844 845 if(mqtt->state == MQTT_FIRST && 846 !mq->pingsent && 847 data->set.upkeep_interval_ms > 0) { 848 struct curltime t = curlx_now(); 849 timediff_t diff = curlx_timediff(t, mq->lastTime); 850 851 if(diff > data->set.upkeep_interval_ms) { 852 /* 0xC0 is PINGREQ, and 0x00 is remaining length */ 853 unsigned char packet[2] = { 0xC0, 0x00 }; 854 size_t packetlen = sizeof(packet); 855 856 result = mqtt_send(data, (char *)packet, packetlen); 857 if(!result) { 858 mq->pingsent = TRUE; 859 } 860 infof(data, "mqtt_ping: sent ping request."); 861 } 862 } 863 return result; 864 } 865 866 static CURLcode mqtt_doing(struct Curl_easy *data, bool *done) 867 { 868 struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY); 869 CURLcode result = CURLE_OK; 870 size_t nread; 871 unsigned char recvbyte; 872 struct mqtt_conn *mqtt = Curl_conn_meta_get(data->conn, CURL_META_MQTT_CONN); 873 874 if(!mqtt || !mq) 875 return CURLE_FAILED_INIT; 876 877 *done = FALSE; 878 879 if(curlx_dyn_len(&mq->sendbuf)) { 880 /* send the remainder of an outgoing packet */ 881 result = mqtt_send(data, curlx_dyn_ptr(&mq->sendbuf), 882 curlx_dyn_len(&mq->sendbuf)); 883 if(result) 884 return result; 885 } 886 887 result = mqtt_ping(data); 888 if(result) 889 return result; 890 891 infof(data, "mqtt_doing: state [%d]", (int) mqtt->state); 892 switch(mqtt->state) { 893 case MQTT_FIRST: 894 /* Read the initial byte only */ 895 result = Curl_xfer_recv(data, (char *)&mq->firstbyte, 1, &nread); 896 if(result) 897 break; 898 else if(!nread) { 899 failf(data, "Connection disconnected"); 900 *done = TRUE; 901 result = CURLE_RECV_ERROR; 902 break; 903 } 904 Curl_debug(data, CURLINFO_HEADER_IN, (const char *)&mq->firstbyte, 1); 905 906 /* we received something */ 907 mq->lastTime = 
curlx_now(); 908 909 /* remember the first byte */ 910 mq->npacket = 0; 911 mqstate(data, MQTT_REMAINING_LENGTH, MQTT_NOSTATE); 912 FALLTHROUGH(); 913 case MQTT_REMAINING_LENGTH: 914 do { 915 result = Curl_xfer_recv(data, (char *)&recvbyte, 1, &nread); 916 if(result || !nread) 917 break; 918 Curl_debug(data, CURLINFO_HEADER_IN, (const char *)&recvbyte, 1); 919 mq->pkt_hd[mq->npacket++] = recvbyte; 920 } while((recvbyte & 0x80) && (mq->npacket < 4)); 921 if(!result && nread && (recvbyte & 0x80)) 922 /* MQTT supports up to 127 * 128^0 + 127 * 128^1 + 127 * 128^2 + 923 127 * 128^3 bytes. server tried to send more */ 924 result = CURLE_WEIRD_SERVER_REPLY; 925 if(result) 926 break; 927 mq->remaining_length = mqtt_decode_len(mq->pkt_hd, mq->npacket, NULL); 928 mq->npacket = 0; 929 if(mq->remaining_length) { 930 mqstate(data, mqtt->nextstate, MQTT_NOSTATE); 931 break; 932 } 933 mqstate(data, MQTT_FIRST, MQTT_FIRST); 934 935 if(mq->firstbyte == MQTT_MSG_DISCONNECT) { 936 infof(data, "Got DISCONNECT"); 937 *done = TRUE; 938 } 939 940 /* ping response */ 941 if(mq->firstbyte == MQTT_MSG_PINGRESP) { 942 infof(data, "Received ping response."); 943 mq->pingsent = FALSE; 944 mqstate(data, MQTT_FIRST, MQTT_PUBWAIT); 945 } 946 break; 947 case MQTT_CONNACK: 948 result = mqtt_verify_connack(data); 949 if(result) 950 break; 951 952 if(data->state.httpreq == HTTPREQ_POST) { 953 result = mqtt_publish(data); 954 if(!result) { 955 result = mqtt_disconnect(data); 956 *done = TRUE; 957 } 958 mqtt->nextstate = MQTT_FIRST; 959 } 960 else { 961 result = mqtt_subscribe(data); 962 if(!result) { 963 mqstate(data, MQTT_FIRST, MQTT_SUBACK); 964 } 965 } 966 break; 967 968 case MQTT_SUBACK: 969 case MQTT_PUBWAIT: 970 case MQTT_PUB_REMAIN: 971 result = mqtt_read_publish(data, done); 972 break; 973 974 default: 975 failf(data, "State not handled yet"); 976 *done = TRUE; 977 break; 978 } 979 980 if(result == CURLE_AGAIN) 981 result = CURLE_OK; 982 return result; 983 } 984 985 #endif /* 
CURL_DISABLE_MQTT */