cw-pause.c (7388B)
1 /*************************************************************************** 2 * _ _ ____ _ 3 * Project ___| | | | _ \| | 4 * / __| | | | |_) | | 5 * | (__| |_| | _ <| |___ 6 * \___|\___/|_| \_\_____| 7 * 8 * Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al. 9 * 10 * This software is licensed as described in the file COPYING, which 11 * you should have received as part of this distribution. The terms 12 * are also available at https://curl.se/docs/copyright.html. 13 * 14 * You may opt to use, copy, modify, merge, publish, distribute and/or sell 15 * copies of the Software, and permit persons to whom the Software is 16 * furnished to do so, under the terms of the COPYING file. 17 * 18 * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY 19 * KIND, either express or implied. 20 * 21 * SPDX-License-Identifier: curl 22 * 23 ***************************************************************************/ 24 25 #include "curl_setup.h" 26 27 #include <curl/curl.h> 28 29 #include "urldata.h" 30 #include "bufq.h" 31 #include "cfilters.h" 32 #include "headers.h" 33 #include "multiif.h" 34 #include "sendf.h" 35 #include "cw-pause.h" 36 37 /* The last 3 #include files should be in this order */ 38 #include "curl_printf.h" 39 #include "curl_memory.h" 40 #include "memdebug.h" 41 42 43 /* body dynbuf sizes */ 44 #define CW_PAUSE_BUF_CHUNK (16 * 1024) 45 /* when content decoding, write data in chunks */ 46 #define CW_PAUSE_DEC_WRITE_CHUNK (4096) 47 48 struct cw_pause_buf { 49 struct cw_pause_buf *next; 50 struct bufq b; 51 int type; 52 }; 53 54 static struct cw_pause_buf *cw_pause_buf_create(int type, size_t buflen) 55 { 56 struct cw_pause_buf *cwbuf = calloc(1, sizeof(*cwbuf)); 57 if(cwbuf) { 58 cwbuf->type = type; 59 if(type & CLIENTWRITE_BODY) 60 Curl_bufq_init2(&cwbuf->b, CW_PAUSE_BUF_CHUNK, 1, 61 (BUFQ_OPT_SOFT_LIMIT|BUFQ_OPT_NO_SPARES)); 62 else 63 Curl_bufq_init(&cwbuf->b, buflen, 1); 64 } 65 return cwbuf; 66 } 67 68 static void cw_pause_buf_free(struct cw_pause_buf *cwbuf) 69 { 70 if(cwbuf) { 71 Curl_bufq_free(&cwbuf->b); 72 free(cwbuf); 73 } 74 } 75 76 struct cw_pause_ctx { 77 struct Curl_cwriter super; 78 struct cw_pause_buf *buf; 79 size_t buf_total; 80 }; 81 82 static CURLcode cw_pause_write(struct Curl_easy *data, 83 struct Curl_cwriter *writer, int type, 84 const char *buf, size_t nbytes); 85 static void cw_pause_close(struct Curl_easy *data, 86 struct Curl_cwriter *writer); 87 static CURLcode cw_pause_init(struct Curl_easy *data, 88 struct Curl_cwriter *writer); 89 90 const struct Curl_cwtype Curl_cwt_pause = { 91 "cw-pause", 92 NULL, 93 cw_pause_init, 94 cw_pause_write, 95 cw_pause_close, 96 sizeof(struct cw_pause_ctx) 97 }; 98 99 static CURLcode cw_pause_init(struct Curl_easy *data, 100 struct Curl_cwriter *writer) 101 { 102 struct cw_pause_ctx *ctx = writer->ctx; 103 (void)data; 104 ctx->buf = NULL; 105 return CURLE_OK; 106 } 107 108 static void cw_pause_bufs_free(struct cw_pause_ctx *ctx) 109 { 110 while(ctx->buf) { 111 struct cw_pause_buf *next = ctx->buf->next; 112 cw_pause_buf_free(ctx->buf); 113 ctx->buf = next; 114 } 115 } 116 117 static void cw_pause_close(struct Curl_easy *data, struct Curl_cwriter *writer) 118 { 119 struct cw_pause_ctx *ctx = writer->ctx; 120 121 (void)data; 122 cw_pause_bufs_free(ctx); 123 } 124 125 static CURLcode cw_pause_flush(struct Curl_easy *data, 126 struct Curl_cwriter *cw_pause) 127 { 128 struct cw_pause_ctx *ctx = (struct cw_pause_ctx *)cw_pause; 129 bool decoding = Curl_cwriter_is_content_decoding(data); 130 CURLcode result = CURLE_OK; 131 132 /* write the end of the chain until it blocks or gets empty */ 133 while(ctx->buf && !Curl_cwriter_is_paused(data)) { 134 struct cw_pause_buf **plast = &ctx->buf; 135 size_t blen, wlen = 0; 136 const unsigned char *buf = NULL; 137 138 while((*plast)->next) /* got to last in list */ 139 plast = &(*plast)->next; 140 if(Curl_bufq_peek(&(*plast)->b, &buf, &blen)) { 141 wlen = (decoding && ((*plast)->type & CLIENTWRITE_BODY)) ? 142 CURLMIN(blen, CW_PAUSE_DEC_WRITE_CHUNK) : blen; 143 result = Curl_cwriter_write(data, cw_pause->next, (*plast)->type, 144 (const char *)buf, wlen); 145 CURL_TRC_WRITE(data, "[PAUSE] flushed %zu/%zu bytes, type=%x -> %d", 146 wlen, ctx->buf_total, (*plast)->type, result); 147 Curl_bufq_skip(&(*plast)->b, wlen); 148 DEBUGASSERT(ctx->buf_total >= wlen); 149 ctx->buf_total -= wlen; 150 if(result) 151 return result; 152 } 153 else if((*plast)->type & CLIENTWRITE_EOS) { 154 result = Curl_cwriter_write(data, cw_pause->next, (*plast)->type, 155 (const char *)buf, 0); 156 CURL_TRC_WRITE(data, "[PAUSE] flushed 0/%zu bytes, type=%x -> %d", 157 ctx->buf_total, (*plast)->type, result); 158 } 159 160 if(Curl_bufq_is_empty(&(*plast)->b)) { 161 cw_pause_buf_free(*plast); 162 *plast = NULL; 163 } 164 } 165 return result; 166 } 167 168 static CURLcode cw_pause_write(struct Curl_easy *data, 169 struct Curl_cwriter *writer, int type, 170 const char *buf, size_t blen) 171 { 172 struct cw_pause_ctx *ctx = writer->ctx; 173 CURLcode result = CURLE_OK; 174 size_t wlen = 0; 175 bool decoding = Curl_cwriter_is_content_decoding(data); 176 177 if(ctx->buf && !Curl_cwriter_is_paused(data)) { 178 result = cw_pause_flush(data, writer); 179 if(result) 180 return result; 181 } 182 183 while(!ctx->buf && !Curl_cwriter_is_paused(data)) { 184 int wtype = type; 185 DEBUGASSERT(!ctx->buf); 186 /* content decoding might blow up size considerably, write smaller 187 * chunks to make pausing need buffer less. */ 188 wlen = (decoding && (type & CLIENTWRITE_BODY)) ? 189 CURLMIN(blen, CW_PAUSE_DEC_WRITE_CHUNK) : blen; 190 if(wlen < blen) 191 wtype &= ~CLIENTWRITE_EOS; 192 result = Curl_cwriter_write(data, writer->next, wtype, buf, wlen); 193 CURL_TRC_WRITE(data, "[PAUSE] writing %zu/%zu bytes of type %x -> %d", 194 wlen, blen, wtype, result); 195 if(result) 196 return result; 197 buf += wlen; 198 blen -= wlen; 199 if(!blen) 200 return result; 201 } 202 203 do { 204 size_t nwritten = 0; 205 if(ctx->buf && (ctx->buf->type == type) && (type & CLIENTWRITE_BODY)) { 206 /* same type and body, append to current buffer which has a soft 207 * limit and should take everything up to OOM. */ 208 result = Curl_bufq_cwrite(&ctx->buf->b, buf, blen, &nwritten); 209 } 210 else { 211 /* Need a new buf, type changed */ 212 struct cw_pause_buf *cwbuf = cw_pause_buf_create(type, blen); 213 if(!cwbuf) 214 return CURLE_OUT_OF_MEMORY; 215 cwbuf->next = ctx->buf; 216 ctx->buf = cwbuf; 217 result = Curl_bufq_cwrite(&ctx->buf->b, buf, blen, &nwritten); 218 } 219 CURL_TRC_WRITE(data, "[PAUSE] buffer %zu more bytes of type %x, " 220 "total=%zu -> %d", nwritten, type, ctx->buf_total + wlen, 221 result); 222 if(result) 223 return result; 224 buf += nwritten; 225 blen -= nwritten; 226 ctx->buf_total += nwritten; 227 } while(blen); 228 229 return result; 230 } 231 232 CURLcode Curl_cw_pause_flush(struct Curl_easy *data) 233 { 234 struct Curl_cwriter *cw_pause; 235 CURLcode result = CURLE_OK; 236 237 cw_pause = Curl_cwriter_get_by_type(data, &Curl_cwt_pause); 238 if(cw_pause) 239 result = cw_pause_flush(data, cw_pause); 240 241 return result; 242 }