bufq.c (15033B)
1 /*************************************************************************** 2 * _ _ ____ _ 3 * Project ___| | | | _ \| | 4 * / __| | | | |_) | | 5 * | (__| |_| | _ <| |___ 6 * \___|\___/|_| \_\_____| 7 * 8 * Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al. 9 * 10 * This software is licensed as described in the file COPYING, which 11 * you should have received as part of this distribution. The terms 12 * are also available at https://curl.se/docs/copyright.html. 13 * 14 * You may opt to use, copy, modify, merge, publish, distribute and/or sell 15 * copies of the Software, and permit persons to whom the Software is 16 * furnished to do so, under the terms of the COPYING file. 17 * 18 * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY 19 * KIND, either express or implied. 20 * 21 * SPDX-License-Identifier: curl 22 * 23 ***************************************************************************/ 24 25 #include "curl_setup.h" 26 #include "bufq.h" 27 28 /* The last 3 #include files should be in this order */ 29 #include "curl_printf.h" 30 #include "curl_memory.h" 31 #include "memdebug.h" 32 33 static bool chunk_is_empty(const struct buf_chunk *chunk) 34 { 35 return chunk->r_offset >= chunk->w_offset; 36 } 37 38 static bool chunk_is_full(const struct buf_chunk *chunk) 39 { 40 return chunk->w_offset >= chunk->dlen; 41 } 42 43 static size_t chunk_len(const struct buf_chunk *chunk) 44 { 45 return chunk->w_offset - chunk->r_offset; 46 } 47 48 static void chunk_reset(struct buf_chunk *chunk) 49 { 50 chunk->next = NULL; 51 chunk->r_offset = chunk->w_offset = 0; 52 } 53 54 static size_t chunk_append(struct buf_chunk *chunk, 55 const unsigned char *buf, size_t len) 56 { 57 unsigned char *p = &chunk->x.data[chunk->w_offset]; 58 size_t n = chunk->dlen - chunk->w_offset; 59 DEBUGASSERT(chunk->dlen >= chunk->w_offset); 60 if(n) { 61 n = CURLMIN(n, len); 62 memcpy(p, buf, n); 63 chunk->w_offset += n; 64 } 65 return n; 66 } 67 68 static size_t chunk_read(struct buf_chunk *chunk, 69 unsigned char *buf, size_t len) 70 { 71 unsigned char *p = &chunk->x.data[chunk->r_offset]; 72 size_t n = chunk->w_offset - chunk->r_offset; 73 DEBUGASSERT(chunk->w_offset >= chunk->r_offset); 74 if(!n) { 75 return 0; 76 } 77 else if(n <= len) { 78 memcpy(buf, p, n); 79 chunk->r_offset = chunk->w_offset = 0; 80 return n; 81 } 82 else { 83 memcpy(buf, p, len); 84 chunk->r_offset += len; 85 return len; 86 } 87 } 88 89 static CURLcode chunk_slurpn(struct buf_chunk *chunk, size_t max_len, 90 Curl_bufq_reader *reader, 91 void *reader_ctx, size_t *pnread) 92 { 93 unsigned char *p = &chunk->x.data[chunk->w_offset]; 94 size_t n = chunk->dlen - chunk->w_offset; /* free amount */ 95 CURLcode result; 96 97 *pnread = 0; 98 DEBUGASSERT(chunk->dlen >= chunk->w_offset); 99 if(!n) 100 return CURLE_AGAIN; 101 if(max_len && n > max_len) 102 n = max_len; 103 result = reader(reader_ctx, p, n, pnread); 104 if(!result) { 105 DEBUGASSERT(*pnread <= n); 106 chunk->w_offset += *pnread; 107 } 108 return result; 109 } 110 111 static void chunk_peek(const struct buf_chunk *chunk, 112 const unsigned char **pbuf, size_t *plen) 113 { 114 DEBUGASSERT(chunk->w_offset >= chunk->r_offset); 115 *pbuf = &chunk->x.data[chunk->r_offset]; 116 *plen = chunk->w_offset - chunk->r_offset; 117 } 118 119 static void chunk_peek_at(const struct buf_chunk *chunk, size_t offset, 120 const unsigned char **pbuf, size_t *plen) 121 { 122 offset += chunk->r_offset; 123 DEBUGASSERT(chunk->w_offset >= offset); 124 *pbuf = &chunk->x.data[offset]; 125 *plen = chunk->w_offset - offset; 126 } 127 128 static size_t chunk_skip(struct buf_chunk *chunk, size_t amount) 129 { 130 size_t n = chunk->w_offset - chunk->r_offset; 131 DEBUGASSERT(chunk->w_offset >= chunk->r_offset); 132 if(n) { 133 n = CURLMIN(n, amount); 134 chunk->r_offset += n; 135 if(chunk->r_offset == chunk->w_offset) 136 chunk->r_offset = chunk->w_offset = 0; 137 } 138 return n; 139 } 140 141 static void chunk_list_free(struct buf_chunk **anchor) 142 { 143 struct buf_chunk *chunk; 144 while(*anchor) { 145 chunk = *anchor; 146 *anchor = chunk->next; 147 free(chunk); 148 } 149 } 150 151 152 153 void Curl_bufcp_init(struct bufc_pool *pool, 154 size_t chunk_size, size_t spare_max) 155 { 156 DEBUGASSERT(chunk_size > 0); 157 DEBUGASSERT(spare_max > 0); 158 memset(pool, 0, sizeof(*pool)); 159 pool->chunk_size = chunk_size; 160 pool->spare_max = spare_max; 161 } 162 163 static CURLcode bufcp_take(struct bufc_pool *pool, 164 struct buf_chunk **pchunk) 165 { 166 struct buf_chunk *chunk = NULL; 167 168 if(pool->spare) { 169 chunk = pool->spare; 170 pool->spare = chunk->next; 171 --pool->spare_count; 172 chunk_reset(chunk); 173 *pchunk = chunk; 174 return CURLE_OK; 175 } 176 177 chunk = calloc(1, sizeof(*chunk) + pool->chunk_size); 178 if(!chunk) { 179 *pchunk = NULL; 180 return CURLE_OUT_OF_MEMORY; 181 } 182 chunk->dlen = pool->chunk_size; 183 *pchunk = chunk; 184 return CURLE_OK; 185 } 186 187 static void bufcp_put(struct bufc_pool *pool, 188 struct buf_chunk *chunk) 189 { 190 if(pool->spare_count >= pool->spare_max) { 191 free(chunk); 192 } 193 else { 194 chunk_reset(chunk); 195 chunk->next = pool->spare; 196 pool->spare = chunk; 197 ++pool->spare_count; 198 } 199 } 200 201 void Curl_bufcp_free(struct bufc_pool *pool) 202 { 203 chunk_list_free(&pool->spare); 204 pool->spare_count = 0; 205 } 206 207 static void bufq_init(struct bufq *q, struct bufc_pool *pool, 208 size_t chunk_size, size_t max_chunks, int opts) 209 { 210 DEBUGASSERT(chunk_size > 0); 211 DEBUGASSERT(max_chunks > 0); 212 memset(q, 0, sizeof(*q)); 213 q->chunk_size = chunk_size; 214 q->max_chunks = max_chunks; 215 q->pool = pool; 216 q->opts = opts; 217 } 218 219 void Curl_bufq_init2(struct bufq *q, size_t chunk_size, size_t max_chunks, 220 int opts) 221 { 222 bufq_init(q, NULL, chunk_size, max_chunks, opts); 223 } 224 225 void Curl_bufq_init(struct bufq *q, size_t chunk_size, size_t max_chunks) 226 { 227 bufq_init(q, NULL, chunk_size, max_chunks, BUFQ_OPT_NONE); 228 } 229 230 void Curl_bufq_initp(struct bufq *q, struct bufc_pool *pool, 231 size_t max_chunks, int opts) 232 { 233 bufq_init(q, pool, pool->chunk_size, max_chunks, opts); 234 } 235 236 void Curl_bufq_free(struct bufq *q) 237 { 238 chunk_list_free(&q->head); 239 chunk_list_free(&q->spare); 240 q->tail = NULL; 241 q->chunk_count = 0; 242 } 243 244 void Curl_bufq_reset(struct bufq *q) 245 { 246 struct buf_chunk *chunk; 247 while(q->head) { 248 chunk = q->head; 249 q->head = chunk->next; 250 chunk->next = q->spare; 251 q->spare = chunk; 252 } 253 q->tail = NULL; 254 } 255 256 size_t Curl_bufq_len(const struct bufq *q) 257 { 258 const struct buf_chunk *chunk = q->head; 259 size_t len = 0; 260 while(chunk) { 261 len += chunk_len(chunk); 262 chunk = chunk->next; 263 } 264 return len; 265 } 266 267 bool Curl_bufq_is_empty(const struct bufq *q) 268 { 269 return !q->head || chunk_is_empty(q->head); 270 } 271 272 bool Curl_bufq_is_full(const struct bufq *q) 273 { 274 if(!q->tail || q->spare) 275 return FALSE; 276 if(q->chunk_count < q->max_chunks) 277 return FALSE; 278 if(q->chunk_count > q->max_chunks) 279 return TRUE; 280 /* we have no spares and cannot make more, is the tail full? */ 281 return chunk_is_full(q->tail); 282 } 283 284 static struct buf_chunk *get_spare(struct bufq *q) 285 { 286 struct buf_chunk *chunk = NULL; 287 288 if(q->spare) { 289 chunk = q->spare; 290 q->spare = chunk->next; 291 chunk_reset(chunk); 292 return chunk; 293 } 294 295 if(q->chunk_count >= q->max_chunks && (!(q->opts & BUFQ_OPT_SOFT_LIMIT))) 296 return NULL; 297 298 if(q->pool) { 299 if(bufcp_take(q->pool, &chunk)) 300 return NULL; 301 ++q->chunk_count; 302 return chunk; 303 } 304 else { 305 chunk = calloc(1, sizeof(*chunk) + q->chunk_size); 306 if(!chunk) 307 return NULL; 308 chunk->dlen = q->chunk_size; 309 ++q->chunk_count; 310 return chunk; 311 } 312 } 313 314 static void prune_head(struct bufq *q) 315 { 316 struct buf_chunk *chunk; 317 318 while(q->head && chunk_is_empty(q->head)) { 319 chunk = q->head; 320 q->head = chunk->next; 321 if(q->tail == chunk) 322 q->tail = q->head; 323 if(q->pool) { 324 bufcp_put(q->pool, chunk); 325 --q->chunk_count; 326 } 327 else if((q->chunk_count > q->max_chunks) || 328 (q->opts & BUFQ_OPT_NO_SPARES)) { 329 /* SOFT_LIMIT allowed us more than max. free spares until 330 * we are at max again. Or free them if we are configured 331 * to not use spares. */ 332 free(chunk); 333 --q->chunk_count; 334 } 335 else { 336 chunk->next = q->spare; 337 q->spare = chunk; 338 } 339 } 340 } 341 342 static struct buf_chunk *get_non_full_tail(struct bufq *q) 343 { 344 struct buf_chunk *chunk; 345 346 if(q->tail && !chunk_is_full(q->tail)) 347 return q->tail; 348 chunk = get_spare(q); 349 if(chunk) { 350 /* new tail, and possibly new head */ 351 if(q->tail) { 352 q->tail->next = chunk; 353 q->tail = chunk; 354 } 355 else { 356 DEBUGASSERT(!q->head); 357 q->head = q->tail = chunk; 358 } 359 } 360 return chunk; 361 } 362 363 CURLcode Curl_bufq_write(struct bufq *q, 364 const unsigned char *buf, size_t len, 365 size_t *pnwritten) 366 { 367 struct buf_chunk *tail; 368 size_t n; 369 370 DEBUGASSERT(q->max_chunks > 0); 371 *pnwritten = 0; 372 while(len) { 373 tail = get_non_full_tail(q); 374 if(!tail) { 375 if((q->chunk_count < q->max_chunks) || (q->opts & BUFQ_OPT_SOFT_LIMIT)) 376 /* should have gotten a tail, but did not */ 377 return CURLE_OUT_OF_MEMORY; 378 break; 379 } 380 n = chunk_append(tail, buf, len); 381 if(!n) 382 break; 383 *pnwritten += n; 384 buf += n; 385 len -= n; 386 } 387 return (!*pnwritten && len) ? CURLE_AGAIN : CURLE_OK; 388 } 389 390 CURLcode Curl_bufq_cwrite(struct bufq *q, 391 const char *buf, size_t len, 392 size_t *pnwritten) 393 { 394 return Curl_bufq_write(q, (const unsigned char *)buf, len, pnwritten); 395 } 396 397 CURLcode Curl_bufq_read(struct bufq *q, unsigned char *buf, size_t len, 398 size_t *pnread) 399 { 400 *pnread = 0; 401 while(len && q->head) { 402 size_t n = chunk_read(q->head, buf, len); 403 if(n) { 404 *pnread += n; 405 buf += n; 406 len -= n; 407 } 408 prune_head(q); 409 } 410 return (!*pnread) ? CURLE_AGAIN : CURLE_OK; 411 } 412 413 CURLcode Curl_bufq_cread(struct bufq *q, char *buf, size_t len, 414 size_t *pnread) 415 { 416 return Curl_bufq_read(q, (unsigned char *)buf, len, pnread); 417 } 418 419 bool Curl_bufq_peek(struct bufq *q, 420 const unsigned char **pbuf, size_t *plen) 421 { 422 if(q->head && chunk_is_empty(q->head)) { 423 prune_head(q); 424 } 425 if(q->head && !chunk_is_empty(q->head)) { 426 chunk_peek(q->head, pbuf, plen); 427 return TRUE; 428 } 429 *pbuf = NULL; 430 *plen = 0; 431 return FALSE; 432 } 433 434 bool Curl_bufq_peek_at(struct bufq *q, size_t offset, 435 const unsigned char **pbuf, size_t *plen) 436 { 437 struct buf_chunk *c = q->head; 438 size_t clen; 439 440 while(c) { 441 clen = chunk_len(c); 442 if(!clen) 443 break; 444 if(offset >= clen) { 445 offset -= clen; 446 c = c->next; 447 continue; 448 } 449 chunk_peek_at(c, offset, pbuf, plen); 450 return TRUE; 451 } 452 *pbuf = NULL; 453 *plen = 0; 454 return FALSE; 455 } 456 457 void Curl_bufq_skip(struct bufq *q, size_t amount) 458 { 459 size_t n; 460 461 while(amount && q->head) { 462 n = chunk_skip(q->head, amount); 463 amount -= n; 464 prune_head(q); 465 } 466 } 467 468 CURLcode Curl_bufq_pass(struct bufq *q, Curl_bufq_writer *writer, 469 void *writer_ctx, size_t *pwritten) 470 { 471 const unsigned char *buf; 472 size_t blen; 473 CURLcode result = CURLE_OK; 474 475 *pwritten = 0; 476 while(Curl_bufq_peek(q, &buf, &blen)) { 477 size_t chunk_written; 478 479 result = writer(writer_ctx, buf, blen, &chunk_written); 480 if(result) { 481 if((result == CURLE_AGAIN) && *pwritten) { 482 /* blocked on subsequent write, report success */ 483 result = CURLE_OK; 484 } 485 break; 486 } 487 if(!chunk_written) { 488 if(!*pwritten) { 489 /* treat as blocked */ 490 result = CURLE_AGAIN; 491 } 492 break; 493 } 494 *pwritten += chunk_written; 495 Curl_bufq_skip(q, chunk_written); 496 } 497 return result; 498 } 499 500 CURLcode Curl_bufq_write_pass(struct bufq *q, 501 const unsigned char *buf, size_t len, 502 Curl_bufq_writer *writer, void *writer_ctx, 503 size_t *pwritten) 504 { 505 CURLcode result = CURLE_OK; 506 size_t n; 507 508 *pwritten = 0; 509 while(len) { 510 if(Curl_bufq_is_full(q)) { 511 /* try to make room in case we are full */ 512 result = Curl_bufq_pass(q, writer, writer_ctx, &n); 513 if(result) { 514 if(result != CURLE_AGAIN) { 515 /* real error, fail */ 516 return result; 517 } 518 /* would block, bufq is full, give up */ 519 break; 520 } 521 } 522 523 /* Add to bufq as much as there is room for */ 524 result = Curl_bufq_write(q, buf, len, &n); 525 if(result) { 526 if(result != CURLE_AGAIN) 527 /* real error, fail */ 528 return result; 529 if((result == CURLE_AGAIN) && *pwritten) 530 /* we did write successfully before */ 531 result = CURLE_OK; 532 return result; 533 } 534 else if(n == 0) 535 /* edge case of writer returning 0 (and len is >0) 536 * break or we might enter an infinite loop here */ 537 break; 538 539 /* Track what we added to bufq */ 540 buf += n; 541 len -= n; 542 *pwritten += n; 543 } 544 545 return (!*pwritten && len) ? CURLE_AGAIN : CURLE_OK; 546 } 547 548 CURLcode Curl_bufq_sipn(struct bufq *q, size_t max_len, 549 Curl_bufq_reader *reader, void *reader_ctx, 550 size_t *pnread) 551 { 552 struct buf_chunk *tail = NULL; 553 554 *pnread = 0; 555 tail = get_non_full_tail(q); 556 if(!tail) { 557 if(q->chunk_count < q->max_chunks) 558 return CURLE_OUT_OF_MEMORY; 559 /* full, blocked */ 560 return CURLE_AGAIN; 561 } 562 563 return chunk_slurpn(tail, max_len, reader, reader_ctx, pnread); 564 } 565 566 /** 567 * Read up to `max_len` bytes and append it to the end of the buffer queue. 568 * if `max_len` is 0, no limit is imposed and the call behaves exactly 569 * the same as `Curl_bufq_slurp()`. 570 * Returns the total amount of buf read (may be 0) in `pnread` or error 571 * Note that even in case of an error chunks may have been read and 572 * the buffer queue will have different length than before. 573 */ 574 static CURLcode bufq_slurpn(struct bufq *q, size_t max_len, 575 Curl_bufq_reader *reader, void *reader_ctx, 576 size_t *pnread) 577 { 578 CURLcode result; 579 580 *pnread = 0; 581 while(1) { 582 size_t n; 583 result = Curl_bufq_sipn(q, max_len, reader, reader_ctx, &n); 584 if(result) { 585 if(!*pnread || result != CURLE_AGAIN) { 586 /* blocked on first read or real error, fail */ 587 return result; 588 } 589 result = CURLE_OK; 590 break; 591 } 592 else if(n == 0) { 593 /* eof */ 594 result = CURLE_OK; 595 break; 596 } 597 *pnread += n; 598 if(max_len) { 599 DEBUGASSERT(n <= max_len); 600 max_len -= n; 601 if(!max_len) 602 break; 603 } 604 /* give up slurping when we get less bytes than we asked for */ 605 if(q->tail && !chunk_is_full(q->tail)) 606 break; 607 } 608 return result; 609 } 610 611 CURLcode Curl_bufq_slurp(struct bufq *q, Curl_bufq_reader *reader, 612 void *reader_ctx, size_t *pnread) 613 { 614 return bufq_slurpn(q, 0, reader, reader_ctx, pnread); 615 }