curl.py (40095B)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#***************************************************************************
#                                  _   _ ____  _
#  Project                     ___| | | |  _ \| |
#                             / __| | | | |_) | |
#                            | (__| |_| |  _ <| |___
#                             \___|\___/|_| \_\_____|
#
# Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
#
# This software is licensed as described in the file COPYING, which
# you should have received as part of this distribution. The terms
# are also available at https://curl.se/docs/copyright.html.
#
# You may opt to use, copy, modify, merge, publish, distribute and/or sell
# copies of the Software, and permit persons to whom the Software is
# furnished to do so, under the terms of the COPYING file.
#
# This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
# KIND, either express or implied.
#
# SPDX-License-Identifier: curl
#
###########################################################################
#
import json
import logging
import os
import sys
import time
from threading import Thread

import psutil
import re
import shutil
import subprocess
from statistics import mean, fmean
from datetime import timedelta, datetime
from typing import List, Optional, Dict, Union, Any
from urllib.parse import urlparse

from .env import Env


log = logging.getLogger(__name__)


class RunProfile:

    STAT_KEYS = ['cpu', 'rss', 'vsz']

    @classmethod
    def AverageStats(cls, profiles: List['RunProfile']):
        avg = {}
        stats = [p.stats for p in profiles]
        for key in cls.STAT_KEYS:
            vals = [s[key] for s in stats]
            avg[key] = mean(vals) if len(vals) else 0.0
        return avg

    def __init__(self, pid: int, started_at: datetime, run_dir):
        self._pid = pid
        self._started_at = started_at
        self._duration = timedelta(seconds=0)
        self._run_dir = run_dir
        self._samples = []
        self._psu = None
        self._stats = None

    @property
    def duration(self) -> timedelta:
        return self._duration

    @property
    def stats(self) -> Optional[Dict[str, Any]]:
        return self._stats

    def sample(self):
        elapsed = datetime.now() - self._started_at
        try:
            if self._psu is None:
                self._psu = psutil.Process(pid=self._pid)
            mem = self._psu.memory_info()
            self._samples.append({
                'time': elapsed,
                'cpu': self._psu.cpu_percent(),
                'vsz': mem.vms,
                'rss': mem.rss,
            })
        except psutil.NoSuchProcess:
            pass

    def finish(self):
        self._duration = datetime.now() - self._started_at
        if len(self._samples) > 0:
            weights = [s['time'].total_seconds() for s in self._samples]
            self._stats = {}
            for key in self.STAT_KEYS:
                self._stats[key] = fmean([s[key] for s in self._samples], weights)
        else:
            self._stats = None
        self._psu = None

    def __repr__(self):
        return f'RunProfile[pid={self._pid}, '\
               f'duration={self.duration.total_seconds():.3f}s, '\
               f'stats={self.stats}]'


class DTraceProfile:

    def __init__(self, pid: int, run_dir):
        self._pid = pid
        self._run_dir = run_dir
        self._proc = None
        self._rc = 0
        self._file = os.path.join(self._run_dir, 'curl.user_stacks')

    def start(self):
        if os.path.exists(self._file):
            os.remove(self._file)
        args = [
            'sudo', 'dtrace',
            '-x', 'ustackframes=100',
            '-n', f'profile-97 /pid == {self._pid}/ {{ @[ustack()] = count(); }} tick-60s {{ exit(0); }}',
            '-o', f'{self._file}'
        ]
        self._proc = subprocess.Popen(args, text=True, cwd=self._run_dir, shell=False)
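        # dtrace now keeps sampling user stacks of the target pid at 97Hz and
        # writes them to self._file until finish() terminates it or its own
        # 60s tick fires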
        assert self._proc

    def finish(self):
        if self._proc:
            self._proc.terminate()
            self._rc = self._proc.returncode

    @property
    def file(self):
        return self._file


class RunTcpDump:

    def __init__(self, env, run_dir):
        self._env = env
        self._run_dir = run_dir
        self._proc = None
        self._stdoutfile = os.path.join(self._run_dir, 'tcpdump.out')
        self._stderrfile = os.path.join(self._run_dir, 'tcpdump.err')

    def get_rsts(self, ports: List[int]|None = None) -> Optional[List[str]]:
        if self._proc:
            raise Exception('tcpdump still running')
        lines = []
        for line in open(self._stdoutfile):
            m = re.match(r'.* IP 127\.0\.0\.1\.(\d+) [<>] 127\.0\.0\.1\.(\d+):.*', line)
            if m:
                sport = int(m.group(1))
                dport = int(m.group(2))
                if ports is None or sport in ports or dport in ports:
                    lines.append(line)
        return lines

    @property
    def stats(self) -> Optional[List[str]]:
        return self.get_rsts()

    @property
    def stderr(self) -> List[str]:
        if self._proc:
            raise Exception('tcpdump still running')
        return open(self._stderrfile).readlines()

    def sample(self):
        # not sure how to make that detection reliable for all platforms
        local_if = 'lo0' if sys.platform.startswith('darwin') else 'lo'
        try:
            tcpdump = self._env.tcpdump()
            if tcpdump is None:
                raise Exception('tcpdump not available')
            # look with tcpdump for TCP RST packets which indicate
            # we did not shut down connections cleanly
            args = []
            # at least on Linux, we need root permissions to run tcpdump
            if sys.platform.startswith('linux'):
                args.append('sudo')
            args.extend([
                tcpdump, '-i', local_if, '-n', 'tcp[tcpflags] & (tcp-rst)!=0'
            ])
            with open(self._stdoutfile, 'w') as cout, open(self._stderrfile, 'w') as cerr:
                self._proc = subprocess.Popen(args, stdout=cout, stderr=cerr,
                                              text=True, cwd=self._run_dir,
                                              shell=False)
                assert self._proc
                assert self._proc.returncode is None
                while self._proc:
                    try:
                        self._proc.wait(timeout=1)
                    except subprocess.TimeoutExpired:
                        pass
        except Exception:
            log.exception('Tcpdump')

    def start(self):
        def do_sample():
            self.sample()
        t = Thread(target=do_sample)
        t.start()

    def finish(self):
        if self._proc:
            time.sleep(1)
            self._proc.terminate()
            self._proc = None


class ExecResult:

    def __init__(self, args: List[str], exit_code: int,
                 stdout: List[str], stderr: List[str],
                 duration: Optional[timedelta] = None,
                 with_stats: bool = False,
                 exception: Optional[str] = None,
                 profile: Optional[RunProfile] = None,
                 tcpdump: Optional[RunTcpDump] = None):
        self._args = args
        self._exit_code = exit_code
        self._exception = exception
        self._stdout = stdout
        self._stderr = stderr
        self._profile = profile
        self._tcpdump = tcpdump
        self._duration = duration if duration is not None else timedelta()
        self._response = None
        self._responses = []
        self._results = {}
        self._assets = []
        self._stats = []
        self._json_out = None
        self._with_stats = with_stats
        if with_stats:
            self._parse_stats()
        else:
            # noinspection PyBroadException
            try:
                out = ''.join(self._stdout)
                self._json_out = json.loads(out)
            except:  # noqa: E722
                pass

    def __repr__(self):
        return f"ExecResult[code={self.exit_code}, exception={self._exception}, "\
f"args={self._args}, stdout={self._stdout}, stderr={self._stderr}]" 254 255 def _parse_stats(self): 256 self._stats = [] 257 for line in self._stdout: 258 try: 259 self._stats.append(json.loads(line)) 260 # TODO: specify specific exceptions here 261 except: # noqa: E722 262 log.exception(f'not a JSON stat: {line}') 263 break 264 265 @property 266 def exit_code(self) -> int: 267 return self._exit_code 268 269 @property 270 def args(self) -> List[str]: 271 return self._args 272 273 @property 274 def outraw(self) -> bytes: 275 return ''.join(self._stdout).encode() 276 277 @property 278 def stdout(self) -> str: 279 return ''.join(self._stdout) 280 281 @property 282 def json(self) -> Optional[Dict]: 283 """Output as JSON dictionary or None if not parseable.""" 284 return self._json_out 285 286 @property 287 def stderr(self) -> str: 288 return ''.join(self._stderr) 289 290 @property 291 def trace_lines(self) -> List[str]: 292 return self._stderr 293 294 @property 295 def duration(self) -> timedelta: 296 return self._duration 297 298 @property 299 def profile(self) -> Optional[RunProfile]: 300 return self._profile 301 302 @property 303 def tcpdump(self) -> Optional[RunTcpDump]: 304 return self._tcpdump 305 306 @property 307 def response(self) -> Optional[Dict]: 308 return self._response 309 310 @property 311 def responses(self) -> List[Dict]: 312 return self._responses 313 314 @property 315 def results(self) -> Dict: 316 return self._results 317 318 @property 319 def assets(self) -> List: 320 return self._assets 321 322 @property 323 def with_stats(self) -> bool: 324 return self._with_stats 325 326 @property 327 def stats(self) -> List: 328 return self._stats 329 330 @property 331 def total_connects(self) -> Optional[int]: 332 if len(self.stats): 333 n = 0 334 for stat in self.stats: 335 n += stat['num_connects'] 336 return n 337 return None 338 339 def add_response(self, resp: Dict): 340 self._response = resp 341 self._responses.append(resp) 342 343 def add_results(self, results: Dict): 344 self._results.update(results) 345 if 'response' in results: 346 self.add_response(results['response']) 347 348 def add_assets(self, assets: List): 349 self._assets.extend(assets) 350 351 def check_exit_code(self, code: Union[int, bool]): 352 if code is True: 353 assert self.exit_code == 0, f'expected exit code {code}, '\ 354 f'got {self.exit_code}\n{self.dump_logs()}' 355 elif code is False: 356 assert self.exit_code != 0, f'expected exit code {code}, '\ 357 f'got {self.exit_code}\n{self.dump_logs()}' 358 else: 359 assert self.exit_code == code, f'expected exit code {code}, '\ 360 f'got {self.exit_code}\n{self.dump_logs()}' 361 362 def check_response(self, http_status: Optional[int] = 200, 363 count: Optional[int] = 1, 364 protocol: Optional[str] = None, 365 exitcode: Optional[int] = 0, 366 connect_count: Optional[int] = None): 367 if exitcode: 368 self.check_exit_code(exitcode) 369 if self.with_stats and isinstance(exitcode, int): 370 for idx, x in enumerate(self.stats): 371 if 'exitcode' in x: 372 assert int(x['exitcode']) == exitcode, \ 373 f'response #{idx} exitcode: expected {exitcode}, '\ 374 f'got {x["exitcode"]}\n{self.dump_logs()}' 375 376 if self.with_stats: 377 assert len(self.stats) == count, \ 378 f'response count: expected {count}, ' \ 379 f'got {len(self.stats)}\n{self.dump_logs()}' 380 else: 381 assert len(self.responses) == count, \ 382 f'response count: expected {count}, ' \ 383 f'got {len(self.responses)}\n{self.dump_logs()}' 384 if http_status is not None: 385 if self.with_stats: 386 for 
                    assert 'http_code' in x, \
                        f'response #{idx} reports no http_code\n{self.dump_stat(x)}'
                    assert x['http_code'] == http_status, \
                        f'response #{idx} http_code: expected {http_status}, '\
                        f'got {x["http_code"]}\n{self.dump_stat(x)}'
            else:
                for idx, x in enumerate(self.responses):
                    assert x['status'] == http_status, \
                        f'response #{idx} status: expected {http_status},'\
                        f'got {x["status"]}\n{self.dump_stat(x)}'
        if protocol is not None:
            if self.with_stats:
                http_version = None
                if protocol == 'HTTP/1.1':
                    http_version = '1.1'
                elif protocol == 'HTTP/2':
                    http_version = '2'
                elif protocol == 'HTTP/3':
                    http_version = '3'
                if http_version is not None:
                    for idx, x in enumerate(self.stats):
                        assert x['http_version'] == http_version, \
                            f'response #{idx} protocol: expected http/{http_version},' \
                            f'got version {x["http_version"]}\n{self.dump_stat(x)}'
            else:
                for idx, x in enumerate(self.responses):
                    assert x['protocol'] == protocol, \
                        f'response #{idx} protocol: expected {protocol},'\
                        f'got {x["protocol"]}\n{self.dump_logs()}'
        if connect_count is not None:
            assert self.total_connects == connect_count, \
                f'expected {connect_count}, but {self.total_connects} '\
                f'were made\n{self.dump_logs()}'

    def check_stats(self, count: int, http_status: Optional[int] = None,
                    exitcode: Optional[Union[int, List[int]]] = None,
                    remote_port: Optional[int] = None,
                    remote_ip: Optional[str] = None):
        if exitcode is None:
            self.check_exit_code(0)
        elif isinstance(exitcode, int):
            exitcode = [exitcode]
        assert len(self.stats) == count, \
            f'stats count: expected {count}, got {len(self.stats)}\n{self.dump_logs()}'
        if http_status is not None:
            for idx, x in enumerate(self.stats):
                assert 'http_code' in x, \
                    f'status #{idx} reports no http_code\n{self.dump_stat(x)}'
                assert x['http_code'] == http_status, \
                    f'status #{idx} http_code: expected {http_status}, '\
                    f'got {x["http_code"]}\n{self.dump_stat(x)}'
        if exitcode is not None:
            for idx, x in enumerate(self.stats):
                if 'exitcode' in x:
                    assert x['exitcode'] in exitcode, \
                        f'status #{idx} exitcode: expected {exitcode}, '\
                        f'got {x["exitcode"]}\n{self.dump_stat(x)}'
        if remote_port is not None:
            for idx, x in enumerate(self.stats):
                assert 'remote_port' in x, f'remote_port missing\n{self.dump_stat(x)}'
                assert x['remote_port'] == remote_port, \
                    f'status #{idx} remote_port: expected {remote_port}, '\
                    f'got {x["remote_port"]}\n{self.dump_stat(x)}'
        if remote_ip is not None:
            for idx, x in enumerate(self.stats):
                assert 'remote_ip' in x, f'remote_ip missing\n{self.dump_stat(x)}'
                assert x['remote_ip'] == remote_ip, \
                    f'status #{idx} remote_ip: expected {remote_ip}, '\
                    f'got {x["remote_ip"]}\n{self.dump_stat(x)}'

    def dump_logs(self):
        lines = ['>>--stdout ----------------------------------------------\n']
        lines.extend(self._stdout)
        lines.append('>>--stderr ----------------------------------------------\n')
        lines.extend(self._stderr)
        lines.append('<<-------------------------------------------------------\n')
        return ''.join(lines)

    def dump_stat(self, x):
        lines = [
            'json stat from curl:',
            json.JSONEncoder(indent=2).encode(x),
        ]
        if 'xfer_id' in x:
            xfer_id = x['xfer_id']
            lines.append(f'>>--xfer {xfer_id} trace:\n')
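            # include only the trace lines that belong to this transfer id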
            lines.extend(self.xfer_trace_for(xfer_id))
        else:
            lines.append('>>--full trace-------------------------------------------\n')
            lines.extend(self._stderr)
        lines.append('<<-------------------------------------------------------\n')
        return ''.join(lines)

    def xfer_trace_for(self, xfer_id) -> List[str]:
        pat = re.compile(f'^[^[]* \\[{xfer_id}-.*$')
        return [line for line in self._stderr if pat.match(line)]


class CurlClient:

    ALPN_ARG = {
        'http/0.9': '--http0.9',
        'http/1.0': '--http1.0',
        'http/1.1': '--http1.1',
        'h2': '--http2',
        'h2c': '--http2',
        'h3': '--http3-only',
    }

    def __init__(self, env: Env,
                 run_dir: Optional[str] = None,
                 timeout: Optional[float] = None,
                 silent: bool = False,
                 run_env: Optional[Dict[str, str]] = None,
                 server_addr: Optional[str] = None,
                 with_dtrace: bool = False,
                 with_flame: bool = False):
        self.env = env
        self._timeout = timeout if timeout else env.test_timeout
        self._curl = os.environ['CURL'] if 'CURL' in os.environ else env.curl
        self._run_dir = run_dir if run_dir else os.path.join(env.gen_dir, 'curl')
        self._stdoutfile = f'{self._run_dir}/curl.stdout'
        self._stderrfile = f'{self._run_dir}/curl.stderr'
        self._headerfile = f'{self._run_dir}/curl.headers'
        self._log_path = f'{self._run_dir}/curl.log'
        self._with_dtrace = with_dtrace
        self._with_flame = with_flame
        if self._with_flame:
            self._with_dtrace = True
        self._silent = silent
        self._run_env = run_env
        self._server_addr = server_addr if server_addr else '127.0.0.1'
        self._rmrf(self._run_dir)
        self._mkpath(self._run_dir)

    @property
    def run_dir(self) -> str:
        return self._run_dir

    def download_file(self, i: int) -> str:
        return os.path.join(self.run_dir, f'download_{i}.data')

    def _rmf(self, path):
        if os.path.exists(path):
            return os.remove(path)

    def _rmrf(self, path):
        if os.path.exists(path):
            return shutil.rmtree(path)

    def _mkpath(self, path):
        if not os.path.exists(path):
            return os.makedirs(path)

    def get_proxy_args(self, proto: str = 'http/1.1',
                       proxys: bool = True, tunnel: bool = False,
                       use_ip: bool = False):
        proxy_name = self._server_addr if use_ip else self.env.proxy_domain
        if proxys:
            pport = self.env.pts_port(proto) if tunnel else self.env.proxys_port
            xargs = [
                '--proxy', f'https://{proxy_name}:{pport}/',
                '--resolve', f'{proxy_name}:{pport}:{self._server_addr}',
                '--proxy-cacert', self.env.ca.cert_file,
            ]
            if proto == 'h2':
                xargs.append('--proxy-http2')
        else:
            xargs = [
                '--proxy', f'http://{proxy_name}:{self.env.proxy_port}/',
                '--resolve', f'{proxy_name}:{self.env.proxy_port}:{self._server_addr}',
            ]
        if tunnel:
            xargs.append('--proxytunnel')
        return xargs

    def http_get(self, url: str, extra_args: Optional[List[str]] = None,
                 alpn_proto: Optional[str] = None,
                 def_tracing: bool = True,
                 with_stats: bool = False,
                 with_profile: bool = False,
                 with_tcpdump: bool = False):
        return self._raw(url, options=extra_args,
                         with_stats=with_stats,
                         alpn_proto=alpn_proto,
                         def_tracing=def_tracing,
                         with_profile=with_profile,
                         with_tcpdump=with_tcpdump)

    def http_download(self, urls: List[str],
                      alpn_proto: Optional[str] = None,
                      with_stats: bool = True,
                      with_headers: bool = False,
                      with_profile: bool = False,
                      with_tcpdump: bool = False,
                      no_save: bool = False,
                      extra_args: Optional[List[str]] = None):
        if extra_args is None:
            extra_args = []
        if no_save:
            extra_args.extend([
                '-o', '/dev/null',
            ])
        else:
            extra_args.extend([
                '-o', 'download_#1.data',
            ])
        # remove any existing ones
        for i in range(100):
            self._rmf(self.download_file(i))
        if with_stats:
            extra_args.extend([
                '-w', '%{json}\\n'
            ])
        return self._raw(urls, alpn_proto=alpn_proto, options=extra_args,
                         with_stats=with_stats,
                         with_headers=with_headers,
                         with_profile=with_profile,
                         with_tcpdump=with_tcpdump)

    def http_upload(self, urls: List[str], data: str,
                    alpn_proto: Optional[str] = None,
                    with_stats: bool = True,
                    with_headers: bool = False,
                    with_profile: bool = False,
                    with_tcpdump: bool = False,
                    extra_args: Optional[List[str]] = None):
        if extra_args is None:
            extra_args = []
        extra_args.extend([
            '--data-binary', data, '-o', 'download_#1.data',
        ])
        if with_stats:
            extra_args.extend([
                '-w', '%{json}\\n'
            ])
        return self._raw(urls, alpn_proto=alpn_proto, options=extra_args,
                         with_stats=with_stats,
                         with_headers=with_headers,
                         with_profile=with_profile,
                         with_tcpdump=with_tcpdump)

    def http_delete(self, urls: List[str],
                    alpn_proto: Optional[str] = None,
                    with_stats: bool = True,
                    with_profile: bool = False,
                    extra_args: Optional[List[str]] = None):
        if extra_args is None:
            extra_args = []
        extra_args.extend([
            '-X', 'DELETE', '-o', '/dev/null',
        ])
        if with_stats:
            extra_args.extend([
                '-w', '%{json}\\n'
            ])
        return self._raw(urls, alpn_proto=alpn_proto, options=extra_args,
                         with_stats=with_stats,
                         with_headers=False,
                         with_profile=with_profile)

    def http_put(self, urls: List[str], data=None, fdata=None,
                 alpn_proto: Optional[str] = None,
                 with_stats: bool = True,
                 with_headers: bool = False,
                 with_profile: bool = False,
                 extra_args: Optional[List[str]] = None):
        if extra_args is None:
            extra_args = []
        if fdata is not None:
            extra_args.extend(['-T', fdata])
        elif data is not None:
            extra_args.extend(['-T', '-'])
        extra_args.extend([
            '-o', 'download_#1.data',
        ])
        if with_stats:
            extra_args.extend([
                '-w', '%{json}\\n'
            ])
        return self._raw(urls, intext=data,
                         alpn_proto=alpn_proto, options=extra_args,
                         with_stats=with_stats,
                         with_headers=with_headers,
                         with_profile=with_profile)

    def http_form(self, urls: List[str], form: Dict[str, str],
                  alpn_proto: Optional[str] = None,
                  with_stats: bool = True,
                  with_headers: bool = False,
                  extra_args: Optional[List[str]] = None):
        if extra_args is None:
            extra_args = []
        for key, val in form.items():
            extra_args.extend(['-F', f'{key}={val}'])
        extra_args.extend([
            '-o', 'download_#1.data',
        ])
        if with_stats:
            extra_args.extend([
                '-w', '%{json}\\n'
            ])
        return self._raw(urls, alpn_proto=alpn_proto, options=extra_args,
                         with_stats=with_stats,
                         with_headers=with_headers)

    def ftp_get(self, urls: List[str],
                with_stats: bool = True,
                with_profile: bool = False,
                with_tcpdump: bool = False,
                no_save: bool = False,
                extra_args: Optional[List[str]] = None):
        if extra_args is None:
            extra_args = []
        if no_save:
            extra_args.extend([
                '-o', '/dev/null',
            ])
        else:
            extra_args.extend([
                '-o', 'download_#1.data',
            ])
        # remove any existing ones
        for i in range(100):
            self._rmf(self.download_file(i))
        if with_stats:
            extra_args.extend([
                '-w', '%{json}\\n'
            ])
        return self._raw(urls, options=extra_args,
                         with_stats=with_stats,
                         with_headers=False,
                         with_profile=with_profile,
                         with_tcpdump=with_tcpdump)

    def ftp_ssl_get(self, urls: List[str],
                    with_stats: bool = True,
                    with_profile: bool = False,
                    with_tcpdump: bool = False,
                    no_save: bool = False,
                    extra_args: Optional[List[str]] = None):
        if extra_args is None:
            extra_args = []
        extra_args.extend([
            '--ssl-reqd',
        ])
        return self.ftp_get(urls=urls, with_stats=with_stats,
                            with_profile=with_profile, no_save=no_save,
                            with_tcpdump=with_tcpdump,
                            extra_args=extra_args)

    def ftp_upload(self, urls: List[str],
                   fupload: Optional[Any] = None,
                   updata: Optional[str] = None,
                   with_stats: bool = True,
                   with_profile: bool = False,
                   with_tcpdump: bool = False,
                   extra_args: Optional[List[str]] = None):
        if extra_args is None:
            extra_args = []
        if fupload is not None:
            extra_args.extend([
                '--upload-file', fupload
            ])
        elif updata is not None:
            extra_args.extend([
                '--upload-file', '-'
            ])
        else:
            raise Exception('need either file or data to upload')
        if with_stats:
            extra_args.extend([
                '-w', '%{json}\\n'
            ])
        return self._raw(urls, options=extra_args,
                         intext=updata,
                         with_stats=with_stats,
                         with_headers=False,
                         with_profile=with_profile,
                         with_tcpdump=with_tcpdump)

    def ftp_ssl_upload(self, urls: List[str],
                       fupload: Optional[Any] = None,
                       updata: Optional[str] = None,
                       with_stats: bool = True,
                       with_profile: bool = False,
                       with_tcpdump: bool = False,
                       extra_args: Optional[List[str]] = None):
        if extra_args is None:
            extra_args = []
        extra_args.extend([
            '--ssl-reqd',
        ])
        return self.ftp_upload(urls=urls, fupload=fupload, updata=updata,
                               with_stats=with_stats, with_profile=with_profile,
                               with_tcpdump=with_tcpdump,
                               extra_args=extra_args)

    def response_file(self, idx: int):
        return os.path.join(self._run_dir, f'download_{idx}.data')

    def run_direct(self, args, with_stats: bool = False, with_profile: bool = False):
        my_args = [self._curl]
        if with_stats:
            my_args.extend([
                '-w', '%{json}\\n'
            ])
        my_args.extend([
            '-o', 'download.data',
        ])
        my_args.extend(args)
        return self._run(args=my_args, with_stats=with_stats, with_profile=with_profile)

    def _run(self, args, intext='', with_stats: bool = False,
             with_profile: bool = True, with_tcpdump: bool = False):
        self._rmf(self._stdoutfile)
        self._rmf(self._stderrfile)
        self._rmf(self._headerfile)
        exception = None
        profile = None
        tcpdump = None
        dtrace = None
        if with_tcpdump:
            tcpdump = RunTcpDump(self.env, self._run_dir)
            tcpdump.start()
        started_at = datetime.now()
        try:
            with open(self._stdoutfile, 'w') as cout, open(self._stderrfile, 'w') as cerr:
                if with_profile:
                    end_at = started_at + timedelta(seconds=self._timeout) \
                        if self._timeout else None
                    log.info(f'starting: {args}')
                    p = subprocess.Popen(args, stderr=cerr, stdout=cout,
                                         cwd=self._run_dir, shell=False,
                                         env=self._run_env)
                    profile = RunProfile(p.pid, started_at, self._run_dir)
                    if intext is not None and False:
                        p.communicate(input=intext.encode(), timeout=1)
                    if self._with_dtrace:
                        dtrace = DTraceProfile(p.pid, self._run_dir)
                        dtrace.start()
                    ptimeout = 0.0
                    while True:
                        try:
                            p.wait(timeout=ptimeout)
                            break
                        except subprocess.TimeoutExpired:
                            if end_at and datetime.now() >= end_at:
                                p.kill()
                                raise subprocess.TimeoutExpired(cmd=args, timeout=self._timeout)
                            profile.sample()
                            ptimeout = 0.01
                    exitcode = p.returncode
                    profile.finish()
                    log.info(f'done: exit={exitcode}, profile={profile}')
                else:
                    p = subprocess.run(args, stderr=cerr, stdout=cout,
                                       cwd=self._run_dir, shell=False,
                                       input=intext.encode() if intext else None,
                                       timeout=self._timeout,
                                       env=self._run_env)
                    exitcode = p.returncode
        except subprocess.TimeoutExpired:
            now = datetime.now()
            duration = now - started_at
            log.warning(f'Timeout at {now} after {duration.total_seconds()}s '
                        f'(configured {self._timeout}s): {args}')
            exitcode = -1
            exception = 'TimeoutExpired'
        ended_at = datetime.now()
        if tcpdump:
            tcpdump.finish()
        if dtrace:
            dtrace.finish()
        if self._with_flame and dtrace:
            self._generate_flame(dtrace, args)
        coutput = open(self._stdoutfile).readlines()
        cerrput = open(self._stderrfile).readlines()
        return ExecResult(args=args, exit_code=exitcode, exception=exception,
                          stdout=coutput, stderr=cerrput,
                          duration=ended_at - started_at,
                          with_stats=with_stats,
                          profile=profile, tcpdump=tcpdump)

    def _raw(self, urls, intext='', timeout=None, options=None, insecure=False,
             alpn_proto: Optional[str] = None,
             force_resolve=True,
             with_stats=False,
             with_headers=True,
             def_tracing=True,
             with_profile=False,
             with_tcpdump=False):
        args = self._complete_args(
            urls=urls, timeout=timeout, options=options, insecure=insecure,
            alpn_proto=alpn_proto, force_resolve=force_resolve,
            with_headers=with_headers, def_tracing=def_tracing)
        r = self._run(args, intext=intext, with_stats=with_stats,
                      with_profile=with_profile, with_tcpdump=with_tcpdump)
        if r.exit_code == 0 and with_headers:
            self._parse_headerfile(self._headerfile, r=r)
        return r

    def _complete_args(self, urls, timeout=None, options=None,
                       insecure=False, force_resolve=True,
                       alpn_proto: Optional[str] = None,
                       with_headers: bool = True,
                       def_tracing: bool = True):
        if not isinstance(urls, list):
            urls = [urls]

        args = [self._curl, "-s", "--path-as-is"]
        if 'CURL_TEST_EVENT' in os.environ:
            args.append('--test-event')

        if with_headers:
            args.extend(["-D", self._headerfile])
        if def_tracing is not False and not self._silent:
            args.extend(['-v', '--trace-ids', '--trace-time'])
            if self.env.verbose > 1:
                args.extend(['--trace-config', 'http/2,http/3,h2-proxy,h1-proxy'])

        active_options = options
        if options is not None and '--next' in options:
            active_options = options[options.index('--next') + 1:]

        for url in urls:
            u = urlparse(urls[0])
            if options:
                args.extend(options)
            if alpn_proto is not None:
                if alpn_proto not in self.ALPN_ARG:
                    raise Exception(f'unknown ALPN protocol: "{alpn_proto}"')
                args.append(self.ALPN_ARG[alpn_proto])

            if u.scheme == 'http':
                pass
            elif insecure:
                args.append('--insecure')
            elif active_options and "--cacert" in active_options:
                pass
            elif u.hostname:
                args.extend(["--cacert", self.env.ca.cert_file])

            if force_resolve and u.hostname and u.hostname != 'localhost' \
                    and not re.match(r'^(\d+|\[|:).*', u.hostname):
                port = u.port if u.port else 443
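                # pin the test host name to the local server address, so no
                # real DNS resolution is attempted for it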
                args.extend([
                    '--resolve', f'{u.hostname}:{port}:{self._server_addr}',
                ])
            if timeout is not None and int(timeout) > 0:
                args.extend(["--connect-timeout", str(int(timeout))])
            args.append(url)
        return args

    def _parse_headerfile(self, headerfile: str, r: Optional[ExecResult] = None) -> ExecResult:
        lines = open(headerfile).readlines()
        if r is None:
            r = ExecResult(args=[], exit_code=0, stdout=[], stderr=[])

        response = None

        def fin_response(resp):
            if resp:
                r.add_response(resp)

        expected = ['status']
        for line in lines:
            line = line.strip()
            if re.match(r'^$', line):
                if 'trailer' in expected:
                    # end of trailers
                    fin_response(response)
                    response = None
                    expected = ['status']
                elif 'header' in expected:
                    # end of header, another status or trailers might follow
                    expected = ['status', 'trailer']
                else:
                    assert False, f"unexpected line: '{line}'"
                continue
            if 'status' in expected:
                # log.debug("reading 1st response line: %s", line)
                m = re.match(r'^(\S+) (\d+)( .*)?$', line)
                if m:
                    fin_response(response)
                    response = {
                        "protocol": m.group(1),
                        "status": int(m.group(2)),
                        "description": m.group(3),
                        "header": {},
                        "trailer": {},
                        "body": r.outraw
                    }
                    expected = ['header']
                    continue
            if 'trailer' in expected:
                m = re.match(r'^([^:]+):\s*(.*)$', line)
                if m:
                    response['trailer'][m.group(1).lower()] = m.group(2)
                    continue
            if 'header' in expected:
                m = re.match(r'^([^:]+):\s*(.*)$', line)
                if m:
                    response['header'][m.group(1).lower()] = m.group(2)
                    continue
            assert False, f"unexpected line: '{line}, expected: {expected}'"

        fin_response(response)
        return r

    def _generate_flame(self, dtrace: DTraceProfile, curl_args: List[str]):
        log.info('generating flame graph from dtrace for this run')
        if not os.path.exists(dtrace.file):
            raise Exception(f'dtrace output file does not exist: {dtrace.file}')
        if 'FLAMEGRAPH' not in os.environ:
            raise Exception('Env variable FLAMEGRAPH not set')
        fg_dir = os.environ['FLAMEGRAPH']
        if not os.path.exists(fg_dir):
            raise Exception(f'FlameGraph directory not found: {fg_dir}')

        fg_collapse = os.path.join(fg_dir, 'stackcollapse.pl')
        if not os.path.exists(fg_collapse):
            raise Exception(f'FlameGraph script not found: {fg_collapse}')

        fg_gen_flame = os.path.join(fg_dir, 'flamegraph.pl')
        if not os.path.exists(fg_gen_flame):
            raise Exception(f'FlameGraph script not found: {fg_gen_flame}')

        file_collapsed = f'{dtrace.file}.collapsed'
        file_svg = os.path.join(self._run_dir, 'curl.flamegraph.svg')
        file_err = os.path.join(self._run_dir, 'curl.flamegraph.stderr')
        log.info('waiting a sec for dtrace to finish flushing its buffers')
        time.sleep(1)
        log.info(f'collapsing stacks into {file_collapsed}')
        with open(file_collapsed, 'w') as cout, open(file_err, 'w') as cerr:
            p = subprocess.run([
                fg_collapse, dtrace.file
            ], stdout=cout, stderr=cerr, cwd=self._run_dir, shell=False)
            rc = p.returncode
            if rc != 0:
                raise Exception(f'{fg_collapse} returned error {rc}')
        log.info(f'generating graph into {file_svg}')
        cmdline = ' '.join(curl_args)
        if len(cmdline) > 80:
            title = f'{cmdline[:80]}...'
            subtitle = f'...{cmdline[-80:]}'
        else:
            title = cmdline
            subtitle = ''
        with open(file_svg, 'w') as cout, open(file_err, 'w') as cerr:
            p = subprocess.run([
                fg_gen_flame, '--colors', 'green',
                '--title', title, '--subtitle', subtitle,
                file_collapsed
            ], stdout=cout, stderr=cerr, cwd=self._run_dir, shell=False)
            rc = p.returncode
            if rc != 0:
                raise Exception(f'{fg_gen_flame} returned error {rc}')
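
# Illustrative usage sketch (not part of this module; assumes a configured
# testenv Env instance as normally provided by the pytest fixtures, and a
# test server already running that answers the h2 ALPN protocol on
# env.https_port for env.domain1):
#
#     curl = CurlClient(env=env)
#     url = f'https://{env.domain1}:{env.https_port}/data.json'
#     r = curl.http_get(url=url, alpn_proto='h2')
#     r.check_response(http_status=200, count=1)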