mqttd.c (23082B)
1 /*************************************************************************** 2 * _ _ ____ _ 3 * Project ___| | | | _ \| | 4 * / __| | | | |_) | | 5 * | (__| |_| | _ <| |___ 6 * \___|\___/|_| \_\_____| 7 * 8 * Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al. 9 * 10 * This software is licensed as described in the file COPYING, which 11 * you should have received as part of this distribution. The terms 12 * are also available at https://curl.se/docs/copyright.html. 13 * 14 * You may opt to use, copy, modify, merge, publish, distribute and/or sell 15 * copies of the Software, and permit persons to whom the Software is 16 * furnished to do so, under the terms of the COPYING file. 17 * 18 * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY 19 * KIND, either express or implied. 20 * 21 * SPDX-License-Identifier: curl 22 * 23 ***************************************************************************/ 24 #include "first.h" 25 26 #include <stdlib.h> 27 #include <string.h> 28 29 /* Function 30 * 31 * Accepts a TCP connection on a custom port (IPv4 or IPv6). Speaks MQTT. 32 * 33 * Read commands from FILE (set with --config). The commands control how to 34 * act and is reset to defaults each client TCP connect. 35 * 36 */ 37 38 /* based on sockfilt.c */ 39 40 #define MQTT_MSG_CONNECT 0x10 41 #define MQTT_MSG_CONNACK 0x20 42 #define MQTT_MSG_PUBLISH 0x30 43 /* #define MQTT_MSG_PUBACK 0x40 */ 44 #define MQTT_MSG_SUBSCRIBE 0x82 45 #define MQTT_MSG_SUBACK 0x90 46 #define MQTT_MSG_DISCONNECT 0xe0 47 48 struct mqttd_configurable { 49 unsigned char version; /* initial version byte in the request must match 50 this */ 51 bool publish_before_suback; 52 bool short_publish; 53 bool excessive_remaining; 54 unsigned char error_connack; 55 int testnum; 56 }; 57 58 #define REQUEST_DUMP "server.input" 59 #define CONFIG_VERSION 5 60 61 static struct mqttd_configurable m_config; 62 63 static void mqttd_resetdefaults(void) 64 { 65 logmsg("Reset to defaults"); 66 m_config.version = CONFIG_VERSION; 67 m_config.publish_before_suback = FALSE; 68 m_config.short_publish = FALSE; 69 m_config.excessive_remaining = FALSE; 70 m_config.error_connack = 0; 71 m_config.testnum = 0; 72 } 73 74 static void mqttd_getconfig(void) 75 { 76 FILE *fp = fopen(configfile, FOPEN_READTEXT); 77 mqttd_resetdefaults(); 78 if(fp) { 79 char buffer[512]; 80 logmsg("parse config file"); 81 while(fgets(buffer, sizeof(buffer), fp)) { 82 char key[32]; 83 char value[32]; 84 if(2 == sscanf(buffer, "%31s %31s", key, value)) { 85 if(!strcmp(key, "version")) { 86 m_config.version = byteval(value); 87 logmsg("version [%d] set", m_config.version); 88 } 89 else if(!strcmp(key, "PUBLISH-before-SUBACK")) { 90 logmsg("PUBLISH-before-SUBACK set"); 91 m_config.publish_before_suback = TRUE; 92 } 93 else if(!strcmp(key, "short-PUBLISH")) { 94 logmsg("short-PUBLISH set"); 95 m_config.short_publish = TRUE; 96 } 97 else if(!strcmp(key, "error-CONNACK")) { 98 m_config.error_connack = byteval(value); 99 logmsg("error-CONNACK = %d", m_config.error_connack); 100 } 101 else if(!strcmp(key, "Testnum")) { 102 m_config.testnum = atoi(value); 103 logmsg("testnum = %d", m_config.testnum); 104 } 105 else if(!strcmp(key, "excessive-remaining")) { 106 logmsg("excessive-remaining set"); 107 m_config.excessive_remaining = TRUE; 108 } 109 } 110 } 111 fclose(fp); 112 } 113 else { 114 logmsg("No config file '%s' to read", configfile); 115 } 116 } 117 118 typedef enum { 119 FROM_CLIENT, 120 FROM_SERVER 121 } mqttdir; 122 123 static void logprotocol(mqttdir dir, 124 const char *prefix, size_t remlen, 125 FILE *output, 126 unsigned char *buffer, ssize_t len) 127 { 128 char data[12000] = ""; 129 ssize_t i; 130 unsigned char *ptr = buffer; 131 char *optr = data; 132 int left = sizeof(data); 133 134 for(i = 0; i < len && (left >= 0); i++) { 135 snprintf(optr, left, "%02x", ptr[i]); 136 optr += 2; 137 left -= 2; 138 } 139 fprintf(output, "%s %s %x %s\n", 140 dir == FROM_CLIENT ? "client" : "server", 141 prefix, (int)remlen, data); 142 } 143 144 145 /* return 0 on success */ 146 static int connack(FILE *dump, curl_socket_t fd) 147 { 148 unsigned char packet[]={ 149 MQTT_MSG_CONNACK, 0x02, 150 0x00, 0x00 151 }; 152 ssize_t rc; 153 154 packet[3] = m_config.error_connack; 155 156 rc = swrite(fd, (char *)packet, sizeof(packet)); 157 if(rc > 0) { 158 logmsg("WROTE %zd bytes [CONNACK]", rc); 159 loghex(packet, rc); 160 logprotocol(FROM_SERVER, "CONNACK", 2, dump, packet, sizeof(packet)); 161 } 162 if(rc == sizeof(packet)) { 163 return 0; 164 } 165 return 1; 166 } 167 168 /* return 0 on success */ 169 static int suback(FILE *dump, curl_socket_t fd, unsigned short packetid) 170 { 171 unsigned char packet[]={ 172 MQTT_MSG_SUBACK, 0x03, 173 0, 0, /* filled in below */ 174 0x00 175 }; 176 ssize_t rc; 177 packet[2] = (unsigned char)(packetid >> 8); 178 packet[3] = (unsigned char)(packetid & 0xff); 179 180 rc = swrite(fd, (char *)packet, sizeof(packet)); 181 if(rc == sizeof(packet)) { 182 logmsg("WROTE %zd bytes [SUBACK]", rc); 183 loghex(packet, rc); 184 logprotocol(FROM_SERVER, "SUBACK", 3, dump, packet, rc); 185 return 0; 186 } 187 return 1; 188 } 189 190 #ifdef QOS 191 /* return 0 on success */ 192 static int puback(FILE *dump, curl_socket_t fd, unsigned short packetid) 193 { 194 unsigned char packet[]={ 195 MQTT_MSG_PUBACK, 0x00, 196 0, 0 /* filled in below */ 197 }; 198 ssize_t rc; 199 packet[2] = (unsigned char)(packetid >> 8); 200 packet[3] = (unsigned char)(packetid & 0xff); 201 202 rc = swrite(fd, (char *)packet, sizeof(packet)); 203 if(rc == sizeof(packet)) { 204 logmsg("WROTE %zd bytes [PUBACK]", rc); 205 loghex(packet, rc); 206 logprotocol(FROM_SERVER, dump, packet, rc); 207 return 0; 208 } 209 logmsg("Failed sending [PUBACK]"); 210 return 1; 211 } 212 #endif 213 214 /* return 0 on success */ 215 static int disconnect(FILE *dump, curl_socket_t fd) 216 { 217 unsigned char packet[]={ 218 MQTT_MSG_DISCONNECT, 0x00, 219 }; 220 ssize_t rc = swrite(fd, (char *)packet, sizeof(packet)); 221 if(rc == sizeof(packet)) { 222 logmsg("WROTE %zd bytes [DISCONNECT]", rc); 223 loghex(packet, rc); 224 logprotocol(FROM_SERVER, "DISCONNECT", 0, dump, packet, rc); 225 return 0; 226 } 227 logmsg("Failed sending [DISCONNECT]"); 228 return 1; 229 } 230 231 232 233 /* 234 do 235 236 encodedByte = X MOD 128 237 238 X = X DIV 128 239 240 // if there are more data to encode, set the top bit of this byte 241 242 if ( X > 0 ) 243 244 encodedByte = encodedByte OR 128 245 246 endif 247 248 'output' encodedByte 249 250 while ( X > 0 ) 251 252 */ 253 254 /* return number of bytes used */ 255 static size_t encode_length(size_t packetlen, 256 unsigned char *remlength) /* 4 bytes */ 257 { 258 size_t bytes = 0; 259 unsigned char encode; 260 261 do { 262 encode = packetlen % 0x80; 263 packetlen /= 0x80; 264 if(packetlen) 265 encode |= 0x80; 266 267 remlength[bytes++] = encode; 268 269 if(bytes > 3) { 270 logmsg("too large packet!"); 271 return 0; 272 } 273 } while(packetlen); 274 275 return bytes; 276 } 277 278 279 static size_t decode_length(unsigned char *buffer, 280 size_t buflen, size_t *lenbytes) 281 { 282 size_t len = 0; 283 size_t mult = 1; 284 size_t i; 285 unsigned char encoded = 0x80; 286 287 for(i = 0; (i < buflen) && (encoded & 0x80); i++) { 288 encoded = buffer[i]; 289 len += (encoded & 0x7f) * mult; 290 mult *= 0x80; 291 } 292 293 if(lenbytes) 294 *lenbytes = i; 295 296 return len; 297 } 298 299 300 /* return 0 on success */ 301 static int publish(FILE *dump, 302 curl_socket_t fd, unsigned short packetid, 303 char *topic, const char *payload, size_t payloadlen) 304 { 305 size_t topiclen = strlen(topic); 306 unsigned char *packet; 307 size_t payloadindex; 308 size_t remaininglength = topiclen + 2 + payloadlen; 309 size_t packetlen; 310 size_t sendamount; 311 ssize_t rc; 312 unsigned char rembuffer[4]; 313 size_t encodedlen; 314 315 if(m_config.excessive_remaining) { 316 /* manually set illegal remaining length */ 317 rembuffer[0] = 0xff; 318 rembuffer[1] = 0xff; 319 rembuffer[2] = 0xff; 320 rembuffer[3] = 0x80; /* maximum allowed here by spec is 0x7f */ 321 encodedlen = 4; 322 } 323 else 324 encodedlen = encode_length(remaininglength, rembuffer); 325 326 /* one packet type byte (possibly two more for packetid) */ 327 packetlen = remaininglength + encodedlen + 1; 328 packet = malloc(packetlen); 329 if(!packet) 330 return 1; 331 332 packet[0] = MQTT_MSG_PUBLISH; 333 memcpy(&packet[1], rembuffer, encodedlen); 334 335 (void)packetid; 336 /* packet_id if QoS is set */ 337 338 packet[1 + encodedlen] = (unsigned char)(topiclen >> 8); 339 packet[2 + encodedlen] = (unsigned char)(topiclen & 0xff); 340 memcpy(&packet[3 + encodedlen], topic, topiclen); 341 342 payloadindex = 3 + topiclen + encodedlen; 343 memcpy(&packet[payloadindex], payload, payloadlen); 344 345 sendamount = packetlen; 346 if(m_config.short_publish) 347 sendamount -= 2; 348 349 rc = swrite(fd, (char *)packet, sendamount); 350 if(rc > 0) { 351 logmsg("WROTE %zd bytes [PUBLISH]", rc); 352 loghex(packet, rc); 353 logprotocol(FROM_SERVER, "PUBLISH", remaininglength, dump, packet, rc); 354 } 355 free(packet); 356 if((size_t)rc == packetlen) 357 return 0; 358 return 1; 359 } 360 361 #define MAX_TOPIC_LENGTH 65535 362 #define MAX_CLIENT_ID_LENGTH 32 363 364 static char topic[MAX_TOPIC_LENGTH + 1]; 365 366 static int fixedheader(curl_socket_t fd, 367 unsigned char *bytep, 368 size_t *remaining_lengthp, 369 size_t *remaining_length_bytesp) 370 { 371 /* get the fixed header */ 372 unsigned char buffer[10]; 373 374 /* get the first two bytes */ 375 ssize_t rc = sread(fd, (char *)buffer, 2); 376 size_t i; 377 if(rc < 2) { 378 logmsg("READ %zd bytes [SHORT!]", rc); 379 return 1; /* fail */ 380 } 381 logmsg("READ %zd bytes", rc); 382 loghex(buffer, rc); 383 *bytep = buffer[0]; 384 385 /* if the length byte has the top bit set, get the next one too */ 386 i = 1; 387 while(buffer[i] & 0x80) { 388 i++; 389 rc = sread(fd, (char *)&buffer[i], 1); 390 if(rc != 1) { 391 logmsg("Remaining Length broken"); 392 return 1; 393 } 394 } 395 *remaining_lengthp = decode_length(&buffer[1], i, remaining_length_bytesp); 396 logmsg("Remaining Length: %zu [%zu bytes]", *remaining_lengthp, 397 *remaining_length_bytesp); 398 return 0; 399 } 400 401 static curl_socket_t mqttit(curl_socket_t fd) 402 { 403 size_t buff_size = 10*1024; 404 unsigned char *buffer = NULL; 405 ssize_t rc; 406 unsigned char byte; 407 unsigned short packet_id; 408 size_t payload_len; 409 size_t client_id_length; 410 size_t topic_len; 411 size_t remaining_length = 0; 412 size_t bytes = 0; /* remaining length field size in bytes */ 413 char client_id[MAX_CLIENT_ID_LENGTH]; 414 long testno; 415 FILE *stream = NULL; 416 FILE *dump; 417 char dumpfile[256]; 418 419 static const char protocol[7] = { 420 0x00, 0x04, /* protocol length */ 421 'M','Q','T','T', /* protocol name */ 422 0x04 /* protocol level */ 423 }; 424 snprintf(dumpfile, sizeof(dumpfile), "%s/%s", logdir, REQUEST_DUMP); 425 dump = fopen(dumpfile, "ab"); 426 if(!dump) 427 goto end; 428 429 mqttd_getconfig(); 430 431 testno = m_config.testnum; 432 433 if(testno) 434 logmsg("Found test number %ld", testno); 435 436 buffer = malloc(buff_size); 437 if(!buffer) { 438 logmsg("Out of memory, unable to allocate buffer"); 439 goto end; 440 } 441 memset(buffer, 0, buff_size); 442 443 do { 444 unsigned char usr_flag = 0x80; 445 unsigned char passwd_flag = 0x40; 446 unsigned char conn_flags; 447 const size_t client_id_offset = 12; 448 size_t start_usr; 449 size_t start_passwd; 450 451 /* get the fixed header */ 452 rc = fixedheader(fd, &byte, &remaining_length, &bytes); 453 if(rc) 454 break; 455 456 if(remaining_length >= buff_size) { 457 unsigned char *newbuffer; 458 buff_size = remaining_length; 459 newbuffer = realloc(buffer, buff_size); 460 if(!newbuffer) { 461 logmsg("Failed realloc of size %zu", buff_size); 462 goto end; 463 } 464 buffer = newbuffer; 465 } 466 467 if(remaining_length) { 468 /* reading variable header and payload into buffer */ 469 rc = sread(fd, (char *)buffer, remaining_length); 470 if(rc > 0) { 471 logmsg("READ %zd bytes", rc); 472 loghex(buffer, rc); 473 } 474 } 475 476 if(byte == MQTT_MSG_CONNECT) { 477 logprotocol(FROM_CLIENT, "CONNECT", remaining_length, 478 dump, buffer, rc); 479 480 if(memcmp(protocol, buffer, sizeof(protocol))) { 481 logmsg("Protocol preamble mismatch"); 482 goto end; 483 } 484 /* ignore the connect flag byte and two keepalive bytes */ 485 payload_len = (size_t)(buffer[10] << 8) | buffer[11]; 486 /* first part of the payload is the client ID */ 487 client_id_length = payload_len; 488 489 /* checking if user and password flags were set */ 490 conn_flags = buffer[7]; 491 492 start_usr = client_id_offset + payload_len; 493 if(usr_flag == (unsigned char)(conn_flags & usr_flag)) { 494 logmsg("User flag is present in CONN flag"); 495 payload_len += (size_t)(buffer[start_usr] << 8) | 496 buffer[start_usr + 1]; 497 payload_len += 2; /* MSB and LSB for user length */ 498 } 499 500 start_passwd = client_id_offset + payload_len; 501 if(passwd_flag == (char)(conn_flags & passwd_flag)) { 502 logmsg("Password flag is present in CONN flags"); 503 payload_len += (size_t)(buffer[start_passwd] << 8) | 504 buffer[start_passwd + 1]; 505 payload_len += 2; /* MSB and LSB for password length */ 506 } 507 508 /* check the length of the payload */ 509 if((ssize_t)payload_len != (rc - 12)) { 510 logmsg("Payload length mismatch, expected %zx got %zx", 511 rc - 12, payload_len); 512 goto end; 513 } 514 /* check the length of the client ID */ 515 else if((client_id_length + 1) > MAX_CLIENT_ID_LENGTH) { 516 logmsg("Too large client id"); 517 goto end; 518 } 519 memcpy(client_id, &buffer[12], client_id_length); 520 client_id[client_id_length] = 0; 521 522 logmsg("MQTT client connect accepted: %s", client_id); 523 524 /* The first packet sent from the Server to the Client MUST be a 525 CONNACK Packet */ 526 527 if(connack(dump, fd)) { 528 logmsg("failed sending CONNACK"); 529 goto end; 530 } 531 } 532 else if(byte == MQTT_MSG_SUBSCRIBE) { 533 int error; 534 char *data; 535 size_t datalen; 536 logprotocol(FROM_CLIENT, "SUBSCRIBE", remaining_length, 537 dump, buffer, rc); 538 logmsg("Incoming SUBSCRIBE"); 539 540 if(rc < 6) { 541 logmsg("Too small SUBSCRIBE"); 542 goto end; 543 } 544 545 /* two bytes packet id */ 546 packet_id = (unsigned short)((buffer[0] << 8) | buffer[1]); 547 548 /* two bytes topic length */ 549 topic_len = (size_t)(buffer[2] << 8) | buffer[3]; 550 if(topic_len != (remaining_length - 5)) { 551 logmsg("Wrong topic length, got %zu expected %zu", 552 topic_len, remaining_length - 5); 553 goto end; 554 } 555 memcpy(topic, &buffer[4], topic_len); 556 topic[topic_len] = 0; 557 558 /* there's a QoS byte (two bits) after the topic */ 559 560 logmsg("SUBSCRIBE to '%s' [%d]", topic, packet_id); 561 stream = test2fopen(testno, logdir); 562 if(!stream) { 563 error = errno; 564 logmsg("fopen() failed with error (%d) %s", error, strerror(error)); 565 logmsg("Couldn't open test file %ld", testno); 566 goto end; 567 } 568 error = getpart(&data, &datalen, "reply", "data", stream); 569 if(!error) { 570 if(!m_config.publish_before_suback) { 571 if(suback(dump, fd, packet_id)) { 572 logmsg("failed sending SUBACK"); 573 free(data); 574 goto end; 575 } 576 } 577 if(publish(dump, fd, packet_id, topic, data, datalen)) { 578 logmsg("PUBLISH failed"); 579 free(data); 580 goto end; 581 } 582 free(data); 583 if(m_config.publish_before_suback) { 584 if(suback(dump, fd, packet_id)) { 585 logmsg("failed sending SUBACK"); 586 goto end; 587 } 588 } 589 } 590 else { 591 const char *def = "this is random payload yes yes it is"; 592 publish(dump, fd, packet_id, topic, def, strlen(def)); 593 } 594 disconnect(dump, fd); 595 } 596 else if((byte & 0xf0) == (MQTT_MSG_PUBLISH & 0xf0)) { 597 size_t topiclen; 598 599 logmsg("Incoming PUBLISH"); 600 logprotocol(FROM_CLIENT, "PUBLISH", remaining_length, 601 dump, buffer, rc); 602 603 topiclen = (size_t)(buffer[1 + bytes] << 8) | buffer[2 + bytes]; 604 logmsg("Got %zu bytes topic", topiclen); 605 606 #ifdef QOS 607 /* Handle packetid if there is one. Send puback if QoS > 0 */ 608 puback(dump, fd, 0); 609 #endif 610 /* expect a disconnect here */ 611 /* get the request */ 612 rc = sread(fd, (char *)&buffer[0], 2); 613 614 logmsg("READ %zd bytes [DISCONNECT]", rc); 615 loghex(buffer, rc); 616 logprotocol(FROM_CLIENT, "DISCONNECT", 0, dump, buffer, rc); 617 goto end; 618 } 619 else { 620 /* not supported (yet) */ 621 goto end; 622 } 623 } while(1); 624 625 end: 626 if(buffer) 627 free(buffer); 628 if(dump) 629 fclose(dump); 630 if(stream) 631 fclose(stream); 632 return CURL_SOCKET_BAD; 633 } 634 635 /* 636 sockfdp is a pointer to an established stream or CURL_SOCKET_BAD 637 638 if sockfd is CURL_SOCKET_BAD, listendfd is a listening socket we must 639 accept() 640 */ 641 static bool mqttd_incoming(curl_socket_t listenfd) 642 { 643 fd_set fds_read; 644 fd_set fds_write; 645 fd_set fds_err; 646 int clients = 0; /* connected clients */ 647 648 if(got_exit_signal) { 649 logmsg("signalled to die, exiting..."); 650 return FALSE; 651 } 652 653 #ifdef HAVE_GETPPID 654 /* As a last resort, quit if socks5 process becomes orphan. */ 655 if(getppid() <= 1) { 656 logmsg("process becomes orphan, exiting"); 657 return FALSE; 658 } 659 #endif 660 661 do { 662 ssize_t rc; 663 int error = 0; 664 curl_socket_t sockfd = listenfd; 665 int maxfd = (int)sockfd; 666 667 FD_ZERO(&fds_read); 668 FD_ZERO(&fds_write); 669 FD_ZERO(&fds_err); 670 671 /* there's always a socket to wait for */ 672 #if defined(__DJGPP__) 673 #pragma GCC diagnostic push 674 #pragma GCC diagnostic ignored "-Warith-conversion" 675 #endif 676 FD_SET(sockfd, &fds_read); 677 #if defined(__DJGPP__) 678 #pragma GCC diagnostic pop 679 #endif 680 681 do { 682 /* select() blocking behavior call on blocking descriptors please */ 683 rc = select(maxfd + 1, &fds_read, &fds_write, &fds_err, NULL); 684 if(got_exit_signal) { 685 logmsg("signalled to die, exiting..."); 686 return FALSE; 687 } 688 } while((rc == -1) && ((error = SOCKERRNO) == SOCKEINTR)); 689 690 if(rc < 0) { 691 logmsg("select() failed with error (%d) %s", 692 error, strerror(error)); 693 return FALSE; 694 } 695 696 if(FD_ISSET(sockfd, &fds_read)) { 697 curl_socket_t newfd = accept(sockfd, NULL, NULL); 698 if(CURL_SOCKET_BAD == newfd) { 699 error = SOCKERRNO; 700 logmsg("accept() failed with error (%d) %s", error, sstrerror(error)); 701 } 702 else { 703 logmsg("====> Client connect, fd %ld. " 704 "Read config from %s", (long)newfd, configfile); 705 set_advisor_read_lock(loglockfile); 706 (void)mqttit(newfd); /* until done */ 707 clear_advisor_read_lock(loglockfile); 708 709 logmsg("====> Client disconnect"); 710 sclose(newfd); 711 } 712 } 713 } while(clients); 714 715 return TRUE; 716 } 717 718 static int test_mqttd(int argc, char *argv[]) 719 { 720 curl_socket_t sock = CURL_SOCKET_BAD; 721 curl_socket_t msgsock = CURL_SOCKET_BAD; 722 int wrotepidfile = 0; 723 int wroteportfile = 0; 724 bool juggle_again; 725 int error; 726 int arg = 1; 727 728 pidname = ".mqttd.pid"; 729 portname = ".mqttd.port"; 730 serverlogfile = "log/mqttd.log"; 731 configfile = "mqttd.config"; 732 server_port = 1883; /* MQTT default port */ 733 734 while(argc > arg) { 735 if(!strcmp("--version", argv[arg])) { 736 printf("mqttd IPv4%s\n", 737 #ifdef USE_IPV6 738 "/IPv6" 739 #else 740 "" 741 #endif 742 ); 743 return 0; 744 } 745 else if(!strcmp("--pidfile", argv[arg])) { 746 arg++; 747 if(argc > arg) 748 pidname = argv[arg++]; 749 } 750 else if(!strcmp("--portfile", argv[arg])) { 751 arg++; 752 if(argc > arg) 753 portname = argv[arg++]; 754 } 755 else if(!strcmp("--config", argv[arg])) { 756 arg++; 757 if(argc > arg) 758 configfile = argv[arg++]; 759 } 760 else if(!strcmp("--logfile", argv[arg])) { 761 arg++; 762 if(argc > arg) 763 serverlogfile = argv[arg++]; 764 } 765 else if(!strcmp("--logdir", argv[arg])) { 766 arg++; 767 if(argc > arg) 768 logdir = argv[arg++]; 769 } 770 else if(!strcmp("--ipv6", argv[arg])) { 771 #ifdef USE_IPV6 772 socket_domain = AF_INET6; 773 ipv_inuse = "IPv6"; 774 #endif 775 arg++; 776 } 777 else if(!strcmp("--ipv4", argv[arg])) { 778 /* for completeness, we support this option as well */ 779 #ifdef USE_IPV6 780 socket_domain = AF_INET; 781 ipv_inuse = "IPv4"; 782 #endif 783 arg++; 784 } 785 else if(!strcmp("--port", argv[arg])) { 786 arg++; 787 if(argc > arg) { 788 char *endptr; 789 unsigned long ulnum = strtoul(argv[arg], &endptr, 10); 790 if((endptr != argv[arg] + strlen(argv[arg])) || 791 ((ulnum != 0UL) && ((ulnum < 1025UL) || (ulnum > 65535UL)))) { 792 fprintf(stderr, "mqttd: invalid --port argument (%s)\n", 793 argv[arg]); 794 return 0; 795 } 796 server_port = util_ultous(ulnum); 797 arg++; 798 } 799 } 800 else { 801 puts("Usage: mqttd [option]\n" 802 " --config [file]\n" 803 " --version\n" 804 " --logfile [file]\n" 805 " --logdir [directory]\n" 806 " --pidfile [file]\n" 807 " --portfile [file]\n" 808 " --ipv4\n" 809 " --ipv6\n" 810 " --port [port]\n"); 811 return 0; 812 } 813 } 814 815 snprintf(loglockfile, sizeof(loglockfile), "%s/%s/mqtt-%s.lock", 816 logdir, SERVERLOGS_LOCKDIR, ipv_inuse); 817 818 #ifdef _WIN32 819 if(win32_init()) 820 return 2; 821 #endif 822 823 CURLX_SET_BINMODE(stdin); 824 CURLX_SET_BINMODE(stdout); 825 CURLX_SET_BINMODE(stderr); 826 827 install_signal_handlers(FALSE); 828 829 sock = socket(socket_domain, SOCK_STREAM, 0); 830 831 if(CURL_SOCKET_BAD == sock) { 832 error = SOCKERRNO; 833 logmsg("Error creating socket (%d) %s", error, sstrerror(error)); 834 goto mqttd_cleanup; 835 } 836 837 { 838 /* passive daemon style */ 839 sock = sockdaemon(sock, &server_port, NULL, FALSE); 840 if(CURL_SOCKET_BAD == sock) { 841 goto mqttd_cleanup; 842 } 843 msgsock = CURL_SOCKET_BAD; /* no stream socket yet */ 844 } 845 846 logmsg("Running %s version", ipv_inuse); 847 logmsg("Listening on port %hu", server_port); 848 849 wrotepidfile = write_pidfile(pidname); 850 if(!wrotepidfile) { 851 goto mqttd_cleanup; 852 } 853 854 wroteportfile = write_portfile(portname, server_port); 855 if(!wroteportfile) { 856 goto mqttd_cleanup; 857 } 858 859 do { 860 juggle_again = mqttd_incoming(sock); 861 } while(juggle_again); 862 863 mqttd_cleanup: 864 865 if((msgsock != sock) && (msgsock != CURL_SOCKET_BAD)) 866 sclose(msgsock); 867 868 if(sock != CURL_SOCKET_BAD) 869 sclose(sock); 870 871 if(wrotepidfile) 872 unlink(pidname); 873 if(wroteportfile) 874 unlink(portname); 875 876 restore_signal_handlers(FALSE); 877 878 if(got_exit_signal) { 879 logmsg("============> mqttd exits with signal (%d)", exit_signal); 880 /* 881 * To properly set the return status of the process we 882 * must raise the same signal SIGINT or SIGTERM that we 883 * caught and let the old handler take care of it. 884 */ 885 raise(exit_signal); 886 } 887 888 logmsg("============> mqttd quits"); 889 return 0; 890 }