cw-out.c (14753B)
1 /*************************************************************************** 2 * _ _ ____ _ 3 * Project ___| | | | _ \| | 4 * / __| | | | |_) | | 5 * | (__| |_| | _ <| |___ 6 * \___|\___/|_| \_\_____| 7 * 8 * Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al. 9 * 10 * This software is licensed as described in the file COPYING, which 11 * you should have received as part of this distribution. The terms 12 * are also available at https://curl.se/docs/copyright.html. 13 * 14 * You may opt to use, copy, modify, merge, publish, distribute and/or sell 15 * copies of the Software, and permit persons to whom the Software is 16 * furnished to do so, under the terms of the COPYING file. 17 * 18 * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY 19 * KIND, either express or implied. 20 * 21 * SPDX-License-Identifier: curl 22 * 23 ***************************************************************************/ 24 25 #include "curl_setup.h" 26 27 #include <curl/curl.h> 28 29 #include "urldata.h" 30 #include "cfilters.h" 31 #include "headers.h" 32 #include "multiif.h" 33 #include "sendf.h" 34 #include "transfer.h" 35 #include "cw-out.h" 36 #include "cw-pause.h" 37 38 /* The last 3 #include files should be in this order */ 39 #include "curl_printf.h" 40 #include "curl_memory.h" 41 #include "memdebug.h" 42 43 44 /** 45 * OVERALL DESIGN of this client writer 46 * 47 * The 'cw-out' writer is supposed to be the last writer in a transfer's 48 * stack. It is always added when that stack is initialized. Its purpose 49 * is to pass BODY and HEADER bytes to the client-installed callback 50 * functions. 51 * 52 * These callback may return `CURL_WRITEFUNC_PAUSE` to indicate that the 53 * data had not been written and the whole transfer should stop receiving 54 * new data. Or at least, stop calling the functions. When the transfer 55 * is "unpaused" by the client, the previous data shall be passed as 56 * if nothing happened. 57 * 58 * The `cw-out` writer therefore manages buffers for bytes that could 59 * not be written. Data that was already in flight from the server also 60 * needs buffering on paused transfer when it arrives. 61 * 62 * In addition, the writer allows buffering of "small" body writes, 63 * so client functions are called less often. That is only enabled on a 64 * number of conditions. 65 * 66 * HEADER and BODY data may arrive in any order. For paused transfers, 67 * a list of `struct cw_out_buf` is kept for `cw_out_type` types. The 68 * list may be: [BODY]->[HEADER]->[BODY]->[HEADER].... 69 * When unpausing, this list is "played back" to the client callbacks. 70 * 71 * The amount of bytes being buffered is limited by `DYN_PAUSE_BUFFER` 72 * and when that is exceeded `CURLE_TOO_LARGE` is returned as error. 73 */ 74 typedef enum { 75 CW_OUT_NONE, 76 CW_OUT_BODY, 77 CW_OUT_HDS 78 } cw_out_type; 79 80 struct cw_out_buf { 81 struct cw_out_buf *next; 82 struct dynbuf b; 83 cw_out_type type; 84 }; 85 86 static struct cw_out_buf *cw_out_buf_create(cw_out_type otype) 87 { 88 struct cw_out_buf *cwbuf = calloc(1, sizeof(*cwbuf)); 89 if(cwbuf) { 90 cwbuf->type = otype; 91 curlx_dyn_init(&cwbuf->b, DYN_PAUSE_BUFFER); 92 } 93 return cwbuf; 94 } 95 96 static void cw_out_buf_free(struct cw_out_buf *cwbuf) 97 { 98 if(cwbuf) { 99 curlx_dyn_free(&cwbuf->b); 100 free(cwbuf); 101 } 102 } 103 104 struct cw_out_ctx { 105 struct Curl_cwriter super; 106 struct cw_out_buf *buf; 107 BIT(paused); 108 BIT(errored); 109 }; 110 111 static CURLcode cw_out_write(struct Curl_easy *data, 112 struct Curl_cwriter *writer, int type, 113 const char *buf, size_t nbytes); 114 static void cw_out_close(struct Curl_easy *data, struct Curl_cwriter *writer); 115 static CURLcode cw_out_init(struct Curl_easy *data, 116 struct Curl_cwriter *writer); 117 118 const struct Curl_cwtype Curl_cwt_out = { 119 "cw-out", 120 NULL, 121 cw_out_init, 122 cw_out_write, 123 cw_out_close, 124 sizeof(struct cw_out_ctx) 125 }; 126 127 static CURLcode cw_out_init(struct Curl_easy *data, 128 struct Curl_cwriter *writer) 129 { 130 struct cw_out_ctx *ctx = writer->ctx; 131 (void)data; 132 ctx->buf = NULL; 133 return CURLE_OK; 134 } 135 136 static void cw_out_bufs_free(struct cw_out_ctx *ctx) 137 { 138 while(ctx->buf) { 139 struct cw_out_buf *next = ctx->buf->next; 140 cw_out_buf_free(ctx->buf); 141 ctx->buf = next; 142 } 143 } 144 145 static size_t cw_out_bufs_len(struct cw_out_ctx *ctx) 146 { 147 struct cw_out_buf *cwbuf = ctx->buf; 148 size_t len = 0; 149 while(cwbuf) { 150 len += curlx_dyn_len(&cwbuf->b); 151 cwbuf = cwbuf->next; 152 } 153 return len; 154 } 155 156 static void cw_out_close(struct Curl_easy *data, struct Curl_cwriter *writer) 157 { 158 struct cw_out_ctx *ctx = writer->ctx; 159 160 (void)data; 161 cw_out_bufs_free(ctx); 162 } 163 164 /** 165 * Return the current curl_write_callback and user_data for the buf type 166 */ 167 static void cw_get_writefunc(struct Curl_easy *data, cw_out_type otype, 168 curl_write_callback *pwcb, void **pwcb_data, 169 size_t *pmax_write, size_t *pmin_write) 170 { 171 switch(otype) { 172 case CW_OUT_BODY: 173 *pwcb = data->set.fwrite_func; 174 *pwcb_data = data->set.out; 175 *pmax_write = CURL_MAX_WRITE_SIZE; 176 /* if we ever want buffering of BODY output, we can set `min_write` 177 * the preferred size. The default should always be to pass data 178 * to the client as it comes without delay */ 179 *pmin_write = 0; 180 break; 181 case CW_OUT_HDS: 182 *pwcb = data->set.fwrite_header ? data->set.fwrite_header : 183 (data->set.writeheader ? data->set.fwrite_func : NULL); 184 *pwcb_data = data->set.writeheader; 185 *pmax_write = 0; /* do not chunk-write headers, write them as they are */ 186 *pmin_write = 0; 187 break; 188 default: 189 *pwcb = NULL; 190 *pwcb_data = NULL; 191 *pmax_write = CURL_MAX_WRITE_SIZE; 192 *pmin_write = 0; 193 } 194 } 195 196 static CURLcode cw_out_ptr_flush(struct cw_out_ctx *ctx, 197 struct Curl_easy *data, 198 cw_out_type otype, 199 bool flush_all, 200 const char *buf, size_t blen, 201 size_t *pconsumed) 202 { 203 curl_write_callback wcb = NULL; 204 void *wcb_data; 205 size_t max_write, min_write; 206 size_t wlen, nwritten; 207 208 /* If we errored once, we do not invoke the client callback again */ 209 if(ctx->errored) 210 return CURLE_WRITE_ERROR; 211 212 /* write callbacks may get NULLed by the client between calls. */ 213 cw_get_writefunc(data, otype, &wcb, &wcb_data, &max_write, &min_write); 214 if(!wcb) { 215 *pconsumed = blen; 216 return CURLE_OK; 217 } 218 219 *pconsumed = 0; 220 while(blen && !ctx->paused) { 221 if(!flush_all && blen < min_write) 222 break; 223 wlen = max_write ? CURLMIN(blen, max_write) : blen; 224 Curl_set_in_callback(data, TRUE); 225 nwritten = wcb((char *)CURL_UNCONST(buf), 1, wlen, wcb_data); 226 Curl_set_in_callback(data, FALSE); 227 CURL_TRC_WRITE(data, "[OUT] wrote %zu %s bytes -> %zu", 228 wlen, (otype == CW_OUT_BODY) ? "body" : "header", 229 nwritten); 230 if(CURL_WRITEFUNC_PAUSE == nwritten) { 231 if(data->conn && data->conn->handler->flags & PROTOPT_NONETWORK) { 232 /* Protocols that work without network cannot be paused. This is 233 actually only FILE:// just now, and it cannot pause since the 234 transfer is not done using the "normal" procedure. */ 235 failf(data, "Write callback asked for PAUSE when not supported"); 236 return CURLE_WRITE_ERROR; 237 } 238 ctx->paused = TRUE; 239 CURL_TRC_WRITE(data, "[OUT] PAUSE requested by client"); 240 return Curl_xfer_pause_recv(data, TRUE); 241 } 242 else if(CURL_WRITEFUNC_ERROR == nwritten) { 243 failf(data, "client returned ERROR on write of %zu bytes", wlen); 244 return CURLE_WRITE_ERROR; 245 } 246 else if(nwritten != wlen) { 247 failf(data, "Failure writing output to destination, " 248 "passed %zu returned %zd", wlen, nwritten); 249 return CURLE_WRITE_ERROR; 250 } 251 *pconsumed += nwritten; 252 blen -= nwritten; 253 buf += nwritten; 254 } 255 return CURLE_OK; 256 } 257 258 static CURLcode cw_out_buf_flush(struct cw_out_ctx *ctx, 259 struct Curl_easy *data, 260 struct cw_out_buf *cwbuf, 261 bool flush_all) 262 { 263 CURLcode result = CURLE_OK; 264 265 if(curlx_dyn_len(&cwbuf->b)) { 266 size_t consumed; 267 268 result = cw_out_ptr_flush(ctx, data, cwbuf->type, flush_all, 269 curlx_dyn_ptr(&cwbuf->b), 270 curlx_dyn_len(&cwbuf->b), 271 &consumed); 272 if(result) 273 return result; 274 275 if(consumed) { 276 if(consumed == curlx_dyn_len(&cwbuf->b)) { 277 curlx_dyn_free(&cwbuf->b); 278 } 279 else { 280 DEBUGASSERT(consumed < curlx_dyn_len(&cwbuf->b)); 281 result = curlx_dyn_tail(&cwbuf->b, 282 curlx_dyn_len(&cwbuf->b) - consumed); 283 if(result) 284 return result; 285 } 286 } 287 } 288 return result; 289 } 290 291 static CURLcode cw_out_flush_chain(struct cw_out_ctx *ctx, 292 struct Curl_easy *data, 293 struct cw_out_buf **pcwbuf, 294 bool flush_all) 295 { 296 struct cw_out_buf *cwbuf = *pcwbuf; 297 CURLcode result; 298 299 if(!cwbuf) 300 return CURLE_OK; 301 if(ctx->paused) 302 return CURLE_OK; 303 304 /* write the end of the chain until it blocks or gets empty */ 305 while(cwbuf->next) { 306 struct cw_out_buf **plast = &cwbuf->next; 307 while((*plast)->next) 308 plast = &(*plast)->next; 309 result = cw_out_flush_chain(ctx, data, plast, flush_all); 310 if(result) 311 return result; 312 if(*plast) { 313 /* could not write last, paused again? */ 314 DEBUGASSERT(ctx->paused); 315 return CURLE_OK; 316 } 317 } 318 319 result = cw_out_buf_flush(ctx, data, cwbuf, flush_all); 320 if(result) 321 return result; 322 if(!curlx_dyn_len(&cwbuf->b)) { 323 cw_out_buf_free(cwbuf); 324 *pcwbuf = NULL; 325 } 326 return CURLE_OK; 327 } 328 329 static CURLcode cw_out_append(struct cw_out_ctx *ctx, 330 struct Curl_easy *data, 331 cw_out_type otype, 332 const char *buf, size_t blen) 333 { 334 CURL_TRC_WRITE(data, "[OUT] paused, buffering %zu more bytes (%zu/%d)", 335 blen, cw_out_bufs_len(ctx), DYN_PAUSE_BUFFER); 336 if(cw_out_bufs_len(ctx) + blen > DYN_PAUSE_BUFFER) { 337 failf(data, "pause buffer not large enough -> CURLE_TOO_LARGE"); 338 return CURLE_TOO_LARGE; 339 } 340 341 /* if we do not have a buffer, or it is of another type, make a new one. 342 * And for CW_OUT_HDS always make a new one, so we "replay" headers 343 * exactly as they came in */ 344 if(!ctx->buf || (ctx->buf->type != otype) || (otype == CW_OUT_HDS)) { 345 struct cw_out_buf *cwbuf = cw_out_buf_create(otype); 346 if(!cwbuf) 347 return CURLE_OUT_OF_MEMORY; 348 cwbuf->next = ctx->buf; 349 ctx->buf = cwbuf; 350 } 351 DEBUGASSERT(ctx->buf && (ctx->buf->type == otype)); 352 return curlx_dyn_addn(&ctx->buf->b, buf, blen); 353 } 354 355 static CURLcode cw_out_do_write(struct cw_out_ctx *ctx, 356 struct Curl_easy *data, 357 cw_out_type otype, 358 bool flush_all, 359 const char *buf, size_t blen) 360 { 361 CURLcode result = CURLE_OK; 362 363 /* if we have buffered data and it is a different type than what 364 * we are writing now, try to flush all */ 365 if(ctx->buf && ctx->buf->type != otype) { 366 result = cw_out_flush_chain(ctx, data, &ctx->buf, TRUE); 367 if(result) 368 goto out; 369 } 370 371 if(ctx->buf) { 372 /* still have buffered data, append and flush */ 373 result = cw_out_append(ctx, data, otype, buf, blen); 374 if(result) 375 return result; 376 result = cw_out_flush_chain(ctx, data, &ctx->buf, flush_all); 377 if(result) 378 goto out; 379 } 380 else { 381 /* nothing buffered, try direct write */ 382 size_t consumed; 383 result = cw_out_ptr_flush(ctx, data, otype, flush_all, 384 buf, blen, &consumed); 385 if(result) 386 return result; 387 if(consumed < blen) { 388 /* did not write all, append the rest */ 389 result = cw_out_append(ctx, data, otype, 390 buf + consumed, blen - consumed); 391 if(result) 392 goto out; 393 } 394 } 395 396 out: 397 if(result) { 398 /* We do not want to invoked client callbacks a second time after 399 * encountering an error. See issue #13337 */ 400 ctx->errored = TRUE; 401 cw_out_bufs_free(ctx); 402 } 403 return result; 404 } 405 406 static CURLcode cw_out_write(struct Curl_easy *data, 407 struct Curl_cwriter *writer, int type, 408 const char *buf, size_t blen) 409 { 410 struct cw_out_ctx *ctx = writer->ctx; 411 CURLcode result; 412 bool flush_all = !!(type & CLIENTWRITE_EOS); 413 414 if((type & CLIENTWRITE_BODY) || 415 ((type & CLIENTWRITE_HEADER) && data->set.include_header)) { 416 result = cw_out_do_write(ctx, data, CW_OUT_BODY, flush_all, buf, blen); 417 if(result) 418 return result; 419 } 420 421 if(type & (CLIENTWRITE_HEADER|CLIENTWRITE_INFO)) { 422 result = cw_out_do_write(ctx, data, CW_OUT_HDS, flush_all, buf, blen); 423 if(result) 424 return result; 425 } 426 427 return CURLE_OK; 428 } 429 430 bool Curl_cw_out_is_paused(struct Curl_easy *data) 431 { 432 struct Curl_cwriter *cw_out; 433 struct cw_out_ctx *ctx; 434 435 cw_out = Curl_cwriter_get_by_type(data, &Curl_cwt_out); 436 if(!cw_out) 437 return FALSE; 438 439 ctx = (struct cw_out_ctx *)cw_out; 440 return ctx->paused; 441 } 442 443 static CURLcode cw_out_flush(struct Curl_easy *data, 444 struct Curl_cwriter *cw_out, 445 bool flush_all) 446 { 447 struct cw_out_ctx *ctx = (struct cw_out_ctx *)cw_out; 448 CURLcode result = CURLE_OK; 449 450 if(ctx->errored) 451 return CURLE_WRITE_ERROR; 452 if(ctx->paused) 453 return CURLE_OK; /* not doing it */ 454 455 result = cw_out_flush_chain(ctx, data, &ctx->buf, flush_all); 456 if(result) { 457 ctx->errored = TRUE; 458 cw_out_bufs_free(ctx); 459 return result; 460 } 461 return result; 462 } 463 464 CURLcode Curl_cw_out_unpause(struct Curl_easy *data) 465 { 466 struct Curl_cwriter *cw_out; 467 CURLcode result = CURLE_OK; 468 469 cw_out = Curl_cwriter_get_by_type(data, &Curl_cwt_out); 470 if(cw_out) { 471 struct cw_out_ctx *ctx = (struct cw_out_ctx *)cw_out; 472 CURL_TRC_WRITE(data, "[OUT] unpause"); 473 ctx->paused = FALSE; 474 result = Curl_cw_pause_flush(data); 475 if(!result) 476 result = cw_out_flush(data, cw_out, FALSE); 477 } 478 return result; 479 } 480 481 CURLcode Curl_cw_out_done(struct Curl_easy *data) 482 { 483 struct Curl_cwriter *cw_out; 484 CURLcode result = CURLE_OK; 485 486 cw_out = Curl_cwriter_get_by_type(data, &Curl_cwt_out); 487 if(cw_out) { 488 CURL_TRC_WRITE(data, "[OUT] done"); 489 result = Curl_cw_pause_flush(data); 490 if(!result) 491 result = cw_out_flush(data, cw_out, TRUE); 492 } 493 return result; 494 }