quickjs-tart

quickjs-based runtime for wallet-core logic
Log | Files | Refs | README | LICENSE

curl.py (40095B)


      1 #!/usr/bin/env python3
      2 # -*- coding: utf-8 -*-
      3 #***************************************************************************
      4 #                                  _   _ ____  _
      5 #  Project                     ___| | | |  _ \| |
      6 #                             / __| | | | |_) | |
      7 #                            | (__| |_| |  _ <| |___
      8 #                             \___|\___/|_| \_\_____|
      9 #
     10 # Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
     11 #
     12 # This software is licensed as described in the file COPYING, which
     13 # you should have received as part of this distribution. The terms
     14 # are also available at https://curl.se/docs/copyright.html.
     15 #
     16 # You may opt to use, copy, modify, merge, publish, distribute and/or sell
     17 # copies of the Software, and permit persons to whom the Software is
     18 # furnished to do so, under the terms of the COPYING file.
     19 #
     20 # This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
     21 # KIND, either express or implied.
     22 #
     23 # SPDX-License-Identifier: curl
     24 #
     25 ###########################################################################
     26 #
     27 import json
     28 import logging
     29 import os
     30 import sys
     31 import time
     32 from threading import Thread
     33 
     34 import psutil
     35 import re
     36 import shutil
     37 import subprocess
     38 from statistics import mean, fmean
     39 from datetime import timedelta, datetime
     40 from typing import List, Optional, Dict, Union, Any
     41 from urllib.parse import urlparse
     42 
     43 from .env import Env
     44 
     45 
# module-level logger, named after this module
log = logging.getLogger(__name__)
     47 
     48 
     49 class RunProfile:
     50 
     51     STAT_KEYS = ['cpu', 'rss', 'vsz']
     52 
     53     @classmethod
     54     def AverageStats(cls, profiles: List['RunProfile']):
     55         avg = {}
     56         stats = [p.stats for p in profiles]
     57         for key in cls.STAT_KEYS:
     58             vals = [s[key] for s in stats]
     59             avg[key] = mean(vals) if len(vals) else 0.0
     60         return avg
     61 
     62     def __init__(self, pid: int, started_at: datetime, run_dir):
     63         self._pid = pid
     64         self._started_at = started_at
     65         self._duration = timedelta(seconds=0)
     66         self._run_dir = run_dir
     67         self._samples = []
     68         self._psu = None
     69         self._stats = None
     70 
     71     @property
     72     def duration(self) -> timedelta:
     73         return self._duration
     74 
     75     @property
     76     def stats(self) -> Optional[Dict[str,Any]]:
     77         return self._stats
     78 
     79     def sample(self):
     80         elapsed = datetime.now() - self._started_at
     81         try:
     82             if self._psu is None:
     83                 self._psu = psutil.Process(pid=self._pid)
     84             mem = self._psu.memory_info()
     85             self._samples.append({
     86                 'time': elapsed,
     87                 'cpu': self._psu.cpu_percent(),
     88                 'vsz': mem.vms,
     89                 'rss': mem.rss,
     90             })
     91         except psutil.NoSuchProcess:
     92             pass
     93 
     94     def finish(self):
     95         self._duration = datetime.now() - self._started_at
     96         if len(self._samples) > 0:
     97             weights = [s['time'].total_seconds() for s in self._samples]
     98             self._stats = {}
     99             for key in self.STAT_KEYS:
    100                 self._stats[key] = fmean([s[key] for s in self._samples], weights)
    101         else:
    102             self._stats = None
    103         self._psu = None
    104 
    105     def __repr__(self):
    106         return f'RunProfile[pid={self._pid}, '\
    107                f'duration={self.duration.total_seconds():.3f}s, '\
    108                f'stats={self.stats}]'
    109 
    110 
    111 class DTraceProfile:
    112 
    113     def __init__(self, pid: int, run_dir):
    114         self._pid = pid
    115         self._run_dir = run_dir
    116         self._proc = None
    117         self._rc = 0
    118         self._file = os.path.join(self._run_dir, 'curl.user_stacks')
    119 
    120     def start(self):
    121         if os.path.exists(self._file):
    122             os.remove(self._file)
    123         args = [
    124             'sudo', 'dtrace',
    125             '-x', 'ustackframes=100',
    126             '-n', f'profile-97 /pid == {self._pid}/ {{ @[ustack()] = count(); }} tick-60s {{ exit(0); }}',
    127             '-o', f'{self._file}'
    128         ]
    129         self._proc = subprocess.Popen(args, text=True, cwd=self._run_dir, shell=False)
    130         assert self._proc
    131 
    132     def finish(self):
    133         if self._proc:
    134             self._proc.terminate()
    135             self._rc = self._proc.returncode
    136 
    137     @property
    138     def file(self):
    139         return self._file
    140 
    141 
    142 class RunTcpDump:
    143 
    144     def __init__(self, env, run_dir):
    145         self._env = env
    146         self._run_dir = run_dir
    147         self._proc = None
    148         self._stdoutfile = os.path.join(self._run_dir, 'tcpdump.out')
    149         self._stderrfile = os.path.join(self._run_dir, 'tcpdump.err')
    150 
    151     def get_rsts(self, ports: List[int]|None = None) -> Optional[List[str]]:
    152         if self._proc:
    153             raise Exception('tcpdump still running')
    154         lines = []
    155         for line in open(self._stdoutfile):
    156             m = re.match(r'.* IP 127\.0\.0\.1\.(\d+) [<>] 127\.0\.0\.1\.(\d+):.*', line)
    157             if m:
    158                 sport = int(m.group(1))
    159                 dport = int(m.group(2))
    160                 if ports is None or sport in ports or dport in ports:
    161                     lines.append(line)
    162         return lines
    163 
    164     @property
    165     def stats(self) -> Optional[List[str]]:
    166         return self.get_rsts()
    167 
    168     @property
    169     def stderr(self) -> List[str]:
    170         if self._proc:
    171             raise Exception('tcpdump still running')
    172         return open(self._stderrfile).readlines()
    173 
    174     def sample(self):
    175         # not sure how to make that detection reliable for all platforms
    176         local_if = 'lo0' if sys.platform.startswith('darwin') else 'lo'
    177         try:
    178             tcpdump = self._env.tcpdump()
    179             if tcpdump is None:
    180                 raise Exception('tcpdump not available')
    181             # look with tcpdump for TCP RST packets which indicate
    182             # we did not shut down connections cleanly
    183             args = []
    184             # at least on Linux, we need root permissions to run tcpdump
    185             if sys.platform.startswith('linux'):
    186                 args.append('sudo')
    187             args.extend([
    188                 tcpdump, '-i', local_if, '-n', 'tcp[tcpflags] & (tcp-rst)!=0'
    189             ])
    190             with open(self._stdoutfile, 'w') as cout, open(self._stderrfile, 'w') as cerr:
    191                 self._proc = subprocess.Popen(args, stdout=cout, stderr=cerr,
    192                                               text=True, cwd=self._run_dir,
    193                                               shell=False)
    194                 assert self._proc
    195                 assert self._proc.returncode is None
    196                 while self._proc:
    197                     try:
    198                         self._proc.wait(timeout=1)
    199                     except subprocess.TimeoutExpired:
    200                         pass
    201         except Exception:
    202             log.exception('Tcpdump')
    203 
    204     def start(self):
    205         def do_sample():
    206             self.sample()
    207         t = Thread(target=do_sample)
    208         t.start()
    209 
    210     def finish(self):
    211         if self._proc:
    212             time.sleep(1)
    213             self._proc.terminate()
    214             self._proc = None
    215 
    216 
    217 class ExecResult:
    218 
    219     def __init__(self, args: List[str], exit_code: int,
    220                  stdout: List[str], stderr: List[str],
    221                  duration: Optional[timedelta] = None,
    222                  with_stats: bool = False,
    223                  exception: Optional[str] = None,
    224                  profile: Optional[RunProfile] = None,
    225                  tcpdump: Optional[RunTcpDump] = None):
    226         self._args = args
    227         self._exit_code = exit_code
    228         self._exception = exception
    229         self._stdout = stdout
    230         self._stderr = stderr
    231         self._profile = profile
    232         self._tcpdump = tcpdump
    233         self._duration = duration if duration is not None else timedelta()
    234         self._response = None
    235         self._responses = []
    236         self._results = {}
    237         self._assets = []
    238         self._stats = []
    239         self._json_out = None
    240         self._with_stats = with_stats
    241         if with_stats:
    242             self._parse_stats()
    243         else:
    244             # noinspection PyBroadException
    245             try:
    246                 out = ''.join(self._stdout)
    247                 self._json_out = json.loads(out)
    248             except:  # noqa: E722
    249                 pass
    250 
    251     def __repr__(self):
    252         return f"ExecResult[code={self.exit_code}, exception={self._exception}, "\
    253                f"args={self._args}, stdout={self._stdout}, stderr={self._stderr}]"
    254 
    255     def _parse_stats(self):
    256         self._stats = []
    257         for line in self._stdout:
    258             try:
    259                 self._stats.append(json.loads(line))
    260             # TODO: specify specific exceptions here
    261             except:  # noqa: E722
    262                 log.exception(f'not a JSON stat: {line}')
    263                 break
    264 
    265     @property
    266     def exit_code(self) -> int:
    267         return self._exit_code
    268 
    269     @property
    270     def args(self) -> List[str]:
    271         return self._args
    272 
    273     @property
    274     def outraw(self) -> bytes:
    275         return ''.join(self._stdout).encode()
    276 
    277     @property
    278     def stdout(self) -> str:
    279         return ''.join(self._stdout)
    280 
    281     @property
    282     def json(self) -> Optional[Dict]:
    283         """Output as JSON dictionary or None if not parseable."""
    284         return self._json_out
    285 
    286     @property
    287     def stderr(self) -> str:
    288         return ''.join(self._stderr)
    289 
    290     @property
    291     def trace_lines(self) -> List[str]:
    292         return self._stderr
    293 
    294     @property
    295     def duration(self) -> timedelta:
    296         return self._duration
    297 
    298     @property
    299     def profile(self) -> Optional[RunProfile]:
    300         return self._profile
    301 
    302     @property
    303     def tcpdump(self) -> Optional[RunTcpDump]:
    304         return self._tcpdump
    305 
    306     @property
    307     def response(self) -> Optional[Dict]:
    308         return self._response
    309 
    310     @property
    311     def responses(self) -> List[Dict]:
    312         return self._responses
    313 
    314     @property
    315     def results(self) -> Dict:
    316         return self._results
    317 
    318     @property
    319     def assets(self) -> List:
    320         return self._assets
    321 
    322     @property
    323     def with_stats(self) -> bool:
    324         return self._with_stats
    325 
    326     @property
    327     def stats(self) -> List:
    328         return self._stats
    329 
    330     @property
    331     def total_connects(self) -> Optional[int]:
    332         if len(self.stats):
    333             n = 0
    334             for stat in self.stats:
    335                 n += stat['num_connects']
    336             return n
    337         return None
    338 
    339     def add_response(self, resp: Dict):
    340         self._response = resp
    341         self._responses.append(resp)
    342 
    343     def add_results(self, results: Dict):
    344         self._results.update(results)
    345         if 'response' in results:
    346             self.add_response(results['response'])
    347 
    348     def add_assets(self, assets: List):
    349         self._assets.extend(assets)
    350 
    351     def check_exit_code(self, code: Union[int, bool]):
    352         if code is True:
    353             assert self.exit_code == 0, f'expected exit code {code}, '\
    354                                         f'got {self.exit_code}\n{self.dump_logs()}'
    355         elif code is False:
    356             assert self.exit_code != 0, f'expected exit code {code}, '\
    357                                                 f'got {self.exit_code}\n{self.dump_logs()}'
    358         else:
    359             assert self.exit_code == code, f'expected exit code {code}, '\
    360                                            f'got {self.exit_code}\n{self.dump_logs()}'
    361 
    362     def check_response(self, http_status: Optional[int] = 200,
    363                        count: Optional[int] = 1,
    364                        protocol: Optional[str] = None,
    365                        exitcode: Optional[int] = 0,
    366                        connect_count: Optional[int] = None):
    367         if exitcode:
    368             self.check_exit_code(exitcode)
    369             if self.with_stats and isinstance(exitcode, int):
    370                 for idx, x in enumerate(self.stats):
    371                     if 'exitcode' in x:
    372                         assert int(x['exitcode']) == exitcode, \
    373                             f'response #{idx} exitcode: expected {exitcode}, '\
    374                             f'got {x["exitcode"]}\n{self.dump_logs()}'
    375 
    376         if self.with_stats:
    377             assert len(self.stats) == count, \
    378                 f'response count: expected {count}, ' \
    379                 f'got {len(self.stats)}\n{self.dump_logs()}'
    380         else:
    381             assert len(self.responses) == count, \
    382                 f'response count: expected {count}, ' \
    383                 f'got {len(self.responses)}\n{self.dump_logs()}'
    384         if http_status is not None:
    385             if self.with_stats:
    386                 for idx, x in enumerate(self.stats):
    387                     assert 'http_code' in x, \
    388                         f'response #{idx} reports no http_code\n{self.dump_stat(x)}'
    389                     assert x['http_code'] == http_status, \
    390                         f'response #{idx} http_code: expected {http_status}, '\
    391                         f'got {x["http_code"]}\n{self.dump_stat(x)}'
    392             else:
    393                 for idx, x in enumerate(self.responses):
    394                     assert x['status'] == http_status, \
    395                         f'response #{idx} status: expected {http_status},'\
    396                         f'got {x["status"]}\n{self.dump_stat(x)}'
    397         if protocol is not None:
    398             if self.with_stats:
    399                 http_version = None
    400                 if protocol == 'HTTP/1.1':
    401                     http_version = '1.1'
    402                 elif protocol == 'HTTP/2':
    403                     http_version = '2'
    404                 elif protocol == 'HTTP/3':
    405                     http_version = '3'
    406                 if http_version is not None:
    407                     for idx, x in enumerate(self.stats):
    408                         assert x['http_version'] == http_version, \
    409                             f'response #{idx} protocol: expected http/{http_version},' \
    410                             f'got version {x["http_version"]}\n{self.dump_stat(x)}'
    411             else:
    412                 for idx, x in enumerate(self.responses):
    413                     assert x['protocol'] == protocol, \
    414                         f'response #{idx} protocol: expected {protocol},'\
    415                         f'got {x["protocol"]}\n{self.dump_logs()}'
    416         if connect_count is not None:
    417             assert self.total_connects == connect_count, \
    418                 f'expected {connect_count}, but {self.total_connects} '\
    419                 f'were made\n{self.dump_logs()}'
    420 
    421     def check_stats(self, count: int, http_status: Optional[int] = None,
    422                     exitcode: Optional[Union[int, List[int]]] = None,
    423                     remote_port: Optional[int] = None,
    424                     remote_ip: Optional[str] = None):
    425         if exitcode is None:
    426             self.check_exit_code(0)
    427         elif isinstance(exitcode, int):
    428             exitcode = [exitcode]
    429         assert len(self.stats) == count, \
    430             f'stats count: expected {count}, got {len(self.stats)}\n{self.dump_logs()}'
    431         if http_status is not None:
    432             for idx, x in enumerate(self.stats):
    433                 assert 'http_code' in x, \
    434                     f'status #{idx} reports no http_code\n{self.dump_stat(x)}'
    435                 assert x['http_code'] == http_status, \
    436                     f'status #{idx} http_code: expected {http_status}, '\
    437                     f'got {x["http_code"]}\n{self.dump_stat(x)}'
    438         if exitcode is not None:
    439             for idx, x in enumerate(self.stats):
    440                 if 'exitcode' in x:
    441                     assert x['exitcode'] in exitcode, \
    442                         f'status #{idx} exitcode: expected {exitcode}, '\
    443                         f'got {x["exitcode"]}\n{self.dump_stat(x)}'
    444         if remote_port is not None:
    445             for idx, x in enumerate(self.stats):
    446                 assert 'remote_port' in x, f'remote_port missing\n{self.dump_stat(x)}'
    447                 assert x['remote_port'] == remote_port, \
    448                         f'status #{idx} remote_port: expected {remote_port}, '\
    449                         f'got {x["remote_port"]}\n{self.dump_stat(x)}'
    450         if remote_ip is not None:
    451             for idx, x in enumerate(self.stats):
    452                 assert 'remote_ip' in x, f'remote_ip missing\n{self.dump_stat(x)}'
    453                 assert x['remote_ip'] == remote_ip, \
    454                         f'status #{idx} remote_ip: expected {remote_ip}, '\
    455                         f'got {x["remote_ip"]}\n{self.dump_stat(x)}'
    456 
    457     def dump_logs(self):
    458         lines = ['>>--stdout ----------------------------------------------\n']
    459         lines.extend(self._stdout)
    460         lines.append('>>--stderr ----------------------------------------------\n')
    461         lines.extend(self._stderr)
    462         lines.append('<<-------------------------------------------------------\n')
    463         return ''.join(lines)
    464 
    465     def dump_stat(self, x):
    466         lines = [
    467             'json stat from curl:',
    468             json.JSONEncoder(indent=2).encode(x),
    469         ]
    470         if 'xfer_id' in x:
    471             xfer_id = x['xfer_id']
    472             lines.append(f'>>--xfer {xfer_id} trace:\n')
    473             lines.extend(self.xfer_trace_for(xfer_id))
    474         else:
    475             lines.append('>>--full trace-------------------------------------------\n')
    476             lines.extend(self._stderr)
    477             lines.append('<<-------------------------------------------------------\n')
    478         return ''.join(lines)
    479 
    480     def xfer_trace_for(self, xfer_id) -> List[str]:
    481             pat = re.compile(f'^[^[]* \\[{xfer_id}-.*$')
    482             return [line for line in self._stderr if pat.match(line)]
    483 
    484 
    485 class CurlClient:
    486 
    # curl command line option by ALPN protocol name; presumably used to
    # select the HTTP version when a caller passes `alpn_proto` (the use
    # is not visible in this part of the file)
    ALPN_ARG = {
        'http/0.9': '--http0.9',
        'http/1.0': '--http1.0',
        'http/1.1': '--http1.1',
        'h2': '--http2',
        'h2c': '--http2',
        'h3': '--http3-only',
    }
    495 
    def __init__(self, env: Env,
                 run_dir: Optional[str] = None,
                 timeout: Optional[float] = None,
                 silent: bool = False,
                 run_env: Optional[Dict[str, str]] = None,
                 server_addr: Optional[str] = None,
                 with_dtrace: bool = False,
                 with_flame: bool = False):
        """Set up a curl client working in a freshly created `run_dir`.

        An existing `run_dir` is removed with all its content first.
        Defaults: `timeout` is `env.test_timeout`, `run_dir` is 'curl'
        below `env.gen_dir`, `server_addr` is 127.0.0.1. The curl binary
        is taken from the environment variable CURL when set, otherwise
        from `env.curl`. `with_flame` always enables dtrace as well.
        """
        self.env = env
        self._timeout = timeout or env.test_timeout
        if 'CURL' in os.environ:
            self._curl = os.environ['CURL']
        else:
            self._curl = env.curl
        self._run_dir = run_dir or os.path.join(env.gen_dir, 'curl')
        # files of a curl run, all inside the run directory
        self._stdoutfile = f'{self._run_dir}/curl.stdout'
        self._stderrfile = f'{self._run_dir}/curl.stderr'
        self._headerfile = f'{self._run_dir}/curl.headers'
        self._log_path = f'{self._run_dir}/curl.log'
        self._with_flame = with_flame
        self._with_dtrace = True if with_flame else with_dtrace
        self._silent = silent
        self._run_env = run_env
        self._server_addr = server_addr or '127.0.0.1'
        # always start with an empty run directory
        self._rmrf(self._run_dir)
        self._mkpath(self._run_dir)
    521 
    @property
    def run_dir(self) -> str:
        """The directory this client uses for its files."""
        return self._run_dir
    525 
    526     def download_file(self, i: int) -> str:
    527         return os.path.join(self.run_dir, f'download_{i}.data')
    528 
    529     def _rmf(self, path):
    530         if os.path.exists(path):
    531             return os.remove(path)
    532 
    533     def _rmrf(self, path):
    534         if os.path.exists(path):
    535             return shutil.rmtree(path)
    536 
    537     def _mkpath(self, path):
    538         if not os.path.exists(path):
    539             return os.makedirs(path)
    540 
    def get_proxy_args(self, proto: str = 'http/1.1',
                       proxys: bool = True, tunnel: bool = False,
                       use_ip: bool = False):
        """Return the curl arguments for going through the test proxy.

        `proxys` selects the https: proxy (with its CA certificate and,
        for proto 'h2', --proxy-http2) over the plain http: one. `tunnel`
        adds --proxytunnel. With `use_ip` the server address is used as
        proxy name instead of the proxy domain of the environment.
        """
        if use_ip:
            proxy_name = self._server_addr
        else:
            proxy_name = self.env.proxy_domain
        if not proxys:
            xargs = [
                '--proxy', f'http://{proxy_name}:{self.env.proxy_port}/',
                '--resolve', f'{proxy_name}:{self.env.proxy_port}:{self._server_addr}',
            ]
        else:
            if tunnel:
                pport = self.env.pts_port(proto)
            else:
                pport = self.env.proxys_port
            xargs = [
                '--proxy', f'https://{proxy_name}:{pport}/',
                '--resolve', f'{proxy_name}:{pport}:{self._server_addr}',
                '--proxy-cacert', self.env.ca.cert_file,
            ]
            if proto == 'h2':
                xargs += ['--proxy-http2']
        if tunnel:
            xargs += ['--proxytunnel']
        return xargs
    562 
    def http_get(self, url: str, extra_args: Optional[List[str]] = None,
                 alpn_proto: Optional[str] = None,
                 def_tracing: bool = True,
                 with_stats: bool = False,
                 with_profile: bool = False,
                 with_tcpdump: bool = False):
        """Run curl on `url` and return what `self._raw()` returns.

        `extra_args` are handed on as curl options, all other parameters
        are passed through to `self._raw()` unchanged.
        """
        return self._raw(url, options=extra_args,
                         with_stats=with_stats,
                         alpn_proto=alpn_proto,
                         def_tracing=def_tracing,
                         with_profile=with_profile,
                         with_tcpdump=with_tcpdump)
    575 
    def http_download(self, urls: List[str],
                      alpn_proto: Optional[str] = None,
                      with_stats: bool = True,
                      with_headers: bool = False,
                      with_profile: bool = False,
                      with_tcpdump: bool = False,
                      no_save: bool = False,
                      extra_args: Optional[List[str]] = None):
        """Download `urls` with curl and return what `self._raw()` returns.

        Responses are saved as 'download_#1.data' or, with `no_save`,
        sent to /dev/null. Download files 0-99 left over in the run
        directory are removed before. With `with_stats`, curl is asked
        to write one JSON stat line per transfer.

        `extra_args` are additional curl options; the list passed in is
        not modified.
        """
        # work on a copy, the caller may use its list again
        xargs = list(extra_args) if extra_args is not None else []
        if no_save:
            xargs.extend([
                '-o', '/dev/null',
            ])
        else:
            xargs.extend([
                '-o', 'download_#1.data',
            ])
        # remove any existing ones
        for i in range(100):
            self._rmf(self.download_file(i))
        if with_stats:
            xargs.extend([
                '-w', '%{json}\\n'
            ])
        return self._raw(urls, alpn_proto=alpn_proto, options=xargs,
                         with_stats=with_stats,
                         with_headers=with_headers,
                         with_profile=with_profile,
                         with_tcpdump=with_tcpdump)
    606 
    def http_upload(self, urls: List[str], data: str,
                    alpn_proto: Optional[str] = None,
                    with_stats: bool = True,
                    with_headers: bool = False,
                    with_profile: bool = False,
                    with_tcpdump: bool = False,
                    extra_args: Optional[List[str]] = None):
        """Send `data` to `urls` via --data-binary and return what
        `self._raw()` returns.

        Responses are saved as 'download_#1.data'. With `with_stats`,
        curl is asked to write one JSON stat line per transfer.

        `extra_args` are additional curl options; the list passed in is
        not modified.
        """
        # work on a copy, the caller may use its list again
        xargs = list(extra_args) if extra_args is not None else []
        xargs.extend([
            '--data-binary', data, '-o', 'download_#1.data',
        ])
        if with_stats:
            xargs.extend([
                '-w', '%{json}\\n'
            ])
        return self._raw(urls, alpn_proto=alpn_proto, options=xargs,
                         with_stats=with_stats,
                         with_headers=with_headers,
                         with_profile=with_profile,
                         with_tcpdump=with_tcpdump)
    628 
    def http_delete(self, urls: List[str],
                    alpn_proto: Optional[str] = None,
                    with_stats: bool = True,
                    with_profile: bool = False,
                    extra_args: Optional[List[str]] = None):
        """Send DELETE requests to `urls` and return what `self._raw()`
        returns.

        Responses are sent to /dev/null. With `with_stats`, curl is asked
        to write one JSON stat line per transfer.

        `extra_args` are additional curl options; the list passed in is
        not modified.
        """
        # work on a copy, the caller may use its list again
        xargs = list(extra_args) if extra_args is not None else []
        xargs.extend([
            '-X', 'DELETE', '-o', '/dev/null',
        ])
        if with_stats:
            xargs.extend([
                '-w', '%{json}\\n'
            ])
        return self._raw(urls, alpn_proto=alpn_proto, options=xargs,
                         with_stats=with_stats,
                         with_headers=False,
                         with_profile=with_profile)
    647 
    def http_put(self, urls: List[str], data=None, fdata=None,
                 alpn_proto: Optional[str] = None,
                 with_stats: bool = True,
                 with_headers: bool = False,
                 with_profile: bool = False,
                 extra_args: Optional[List[str]] = None):
        """Upload to `urls` with curl -T and return what `self._raw()`
        returns.

        `fdata` names the file to upload and wins over `data`. `data` is
        uploaded via '-T -' and handed to `self._raw()` as `intext`.
        Responses are saved as 'download_#1.data'. With `with_stats`,
        curl is asked to write one JSON stat line per transfer.

        `extra_args` are additional curl options; the list passed in is
        not modified.
        """
        # work on a copy, the caller may use its list again
        xargs = list(extra_args) if extra_args is not None else []
        if fdata is not None:
            xargs.extend(['-T', fdata])
        elif data is not None:
            xargs.extend(['-T', '-'])
        xargs.extend([
            '-o', 'download_#1.data',
        ])
        if with_stats:
            xargs.extend([
                '-w', '%{json}\\n'
            ])
        return self._raw(urls, intext=data,
                         alpn_proto=alpn_proto, options=xargs,
                         with_stats=with_stats,
                         with_headers=with_headers,
                         with_profile=with_profile)
    672 
    def http_form(self, urls: List[str], form: Dict[str, str],
                  alpn_proto: Optional[str] = None,
                  with_stats: bool = True,
                  with_headers: bool = False,
                  extra_args: Optional[List[str]] = None):
        """Send `form` to `urls` and return what `self._raw()` returns.

        Every entry of `form` becomes one '-F key=value' option.
        Responses are saved as 'download_#1.data'. With `with_stats`,
        curl is asked to write one JSON stat line per transfer.

        `extra_args` are additional curl options; the list passed in is
        not modified.
        """
        # work on a copy, the caller may use its list again
        xargs = list(extra_args) if extra_args is not None else []
        for key, val in form.items():
            xargs.extend(['-F', f'{key}={val}'])
        xargs.extend([
            '-o', 'download_#1.data',
        ])
        if with_stats:
            xargs.extend([
                '-w', '%{json}\\n'
            ])
        return self._raw(urls, alpn_proto=alpn_proto, options=xargs,
                         with_stats=with_stats,
                         with_headers=with_headers)
    692 
    def ftp_get(self, urls: List[str],
                with_stats: bool = True,
                with_profile: bool = False,
                with_tcpdump: bool = False,
                no_save: bool = False,
                extra_args: Optional[List[str]] = None):
        """Retrieve `urls` with curl and return what `self._raw()` returns.

        Downloads are saved as 'download_#1.data' or, with `no_save`,
        sent to /dev/null. Download files 0-99 left over in the run
        directory are removed before. With `with_stats`, curl is asked
        to write one JSON stat line per transfer.

        `extra_args` are additional curl options; the list passed in is
        not modified.
        """
        # work on a copy, the caller may use its list again
        xargs = list(extra_args) if extra_args is not None else []
        if no_save:
            xargs.extend([
                '-o', '/dev/null',
            ])
        else:
            xargs.extend([
                '-o', 'download_#1.data',
            ])
        # remove any existing ones
        for i in range(100):
            self._rmf(self.download_file(i))
        if with_stats:
            xargs.extend([
                '-w', '%{json}\\n'
            ])
        return self._raw(urls, options=xargs,
                         with_stats=with_stats,
                         with_headers=False,
                         with_profile=with_profile,
                         with_tcpdump=with_tcpdump)
    721 
    def ftp_ssl_get(self, urls: List[str],
                      with_stats: bool = True,
                      with_profile: bool = False,
                      with_tcpdump: bool = False,
                      no_save: bool = False,
                      extra_args: Optional[List[str]] = None):
        """Same as `ftp_get()`, with `--ssl-reqd` added to the arguments.

        :param extra_args: additional curl arguments placed first;
            the list passed in is not modified
        :return: the result of `ftp_get()`
        """
        # Build a new list instead of appending to the caller's one.
        ssl_args = [*(extra_args or []), '--ssl-reqd']
        return self.ftp_get(urls=urls, with_stats=with_stats,
                            with_profile=with_profile, no_save=no_save,
                            with_tcpdump=with_tcpdump,
                            extra_args=ssl_args)
    737 
    def ftp_upload(self, urls: List[str],
                   fupload: Optional[Any] = None,
                   updata: Optional[str] = None,
                   with_stats: bool = True,
                   with_profile: bool = False,
                   with_tcpdump: bool = False,
                   extra_args: Optional[List[str]] = None):
        """Upload a file or in-memory data to the given URLs.

        When `fupload` is given it is passed to `--upload-file`.
        Otherwise `updata` must be given; curl is then told to upload
        from `-` and `updata` is handed to `_raw()` as `intext`.
        `fupload` takes precedence when both are given.

        :param urls: the URLs to upload to
        :param fupload: value for `--upload-file`
        :param updata: text to upload when no file is given
        :param with_stats: request and collect JSON transfer statistics
        :param with_profile: handed on to `_raw()`
        :param with_tcpdump: handed on to `_raw()`
        :param extra_args: additional curl arguments placed first;
            the list passed in is not modified
        :return: whatever `_raw()` returns for the invocation
        :raises Exception: when neither `fupload` nor `updata` is given
        """
        if fupload is not None:
            source = fupload
        elif updata is not None:
            source = '-'
        else:
            raise Exception('need either file or data to upload')
        # Work on a copy so a list the caller reuses for several
        # invocations does not accumulate our additions.
        options = list(extra_args) if extra_args is not None else []
        options.extend(['--upload-file', source])
        if with_stats:
            options.extend(['-w', '%{json}\\n'])
        return self._raw(urls, options=options,
                         intext=updata,
                         with_stats=with_stats,
                         with_headers=False,
                         with_profile=with_profile,
                         with_tcpdump=with_tcpdump)
    767 
    def ftp_ssl_upload(self, urls: List[str],
                       fupload: Optional[Any] = None,
                       updata: Optional[str] = None,
                       with_stats: bool = True,
                       with_profile: bool = False,
                       with_tcpdump: bool = False,
                       extra_args: Optional[List[str]] = None):
        """Same as `ftp_upload()`, with `--ssl-reqd` added to the arguments.

        :param extra_args: additional curl arguments placed first;
            the list passed in is not modified
        :return: the result of `ftp_upload()`
        """
        # Build a new list instead of appending to the caller's one.
        ssl_args = [*(extra_args or []), '--ssl-reqd']
        return self.ftp_upload(urls=urls, fupload=fupload, updata=updata,
                               with_stats=with_stats, with_profile=with_profile,
                               with_tcpdump=with_tcpdump,
                               extra_args=ssl_args)
    784 
    def response_file(self, idx: int):
        """Return the path of the `download_{idx}.data` file in the run directory."""
        filename = f'download_{idx}.data'
        return os.path.join(self._run_dir, filename)
    787 
    def run_direct(self, args, with_stats: bool = False, with_profile: bool = False):
        """Run curl with the caller's arguments only.

        The command line is the curl binary, an optional `-w` request
        for JSON statistics, `-o download.data` and then `args`.

        :param args: curl arguments appended after the fixed ones
        :param with_stats: request and collect JSON transfer statistics
        :param with_profile: handed on to `_run()`
        :return: whatever `_run()` returns for the invocation
        """
        stats_opts = ['-w', '%{json}\\n'] if with_stats else []
        cmdline = [self._curl, *stats_opts, '-o', 'download.data', *args]
        return self._run(args=cmdline, with_stats=with_stats,
                         with_profile=with_profile)
    799 
    def _run(self, args, intext='', with_stats: bool = False,
             with_profile: bool = True, with_tcpdump: bool = False):
        """Execute a complete command line and collect its outcome.

        Stale stdout, stderr and header files are removed first. The
        process runs in the run directory with its stdout and stderr
        redirected into files, which are read back once it has ended.

        :param args: the full command line, program first
        :param intext: text passed to the process as stdin. This only
            happens when `with_profile` is false; in profiling mode the
            process is started without any input from here.
        :param with_stats: handed on to the `ExecResult`
        :param with_profile: start the process with `Popen`, sample a
            `RunProfile` while it runs and enforce the timeout here;
            otherwise use a plain `subprocess.run()`
        :param with_tcpdump: run a `RunTcpDump` around the invocation
        :return: an `ExecResult`; on timeout its exit code is -1 and
            its exception is 'TimeoutExpired'
        """
        self._rmf(self._stdoutfile)
        self._rmf(self._stderrfile)
        self._rmf(self._headerfile)
        exception = None
        profile = None
        tcpdump = None
        dtrace = None
        if with_tcpdump:
            tcpdump = RunTcpDump(self.env, self._run_dir)
            tcpdump.start()
        started_at = datetime.now()
        try:
            with open(self._stdoutfile, 'w') as cout, open(self._stderrfile, 'w') as cerr:
                if with_profile:
                    end_at = started_at + timedelta(seconds=self._timeout) \
                        if self._timeout else None
                    log.info(f'starting: {args}')
                    # NOTE: `intext` is not fed to the process in this mode
                    p = subprocess.Popen(args, stderr=cerr, stdout=cout,
                                         cwd=self._run_dir, shell=False,
                                         env=self._run_env)
                    profile = RunProfile(p.pid, started_at, self._run_dir)
                    if self._with_dtrace:
                        dtrace = DTraceProfile(p.pid, self._run_dir)
                        dtrace.start()
                    # first check returns at once, later ones poll every 10ms
                    ptimeout = 0.0
                    while True:
                        try:
                            p.wait(timeout=ptimeout)
                            break
                        except subprocess.TimeoutExpired:
                            if end_at and datetime.now() >= end_at:
                                p.kill()
                                # reap the killed child, so that it does
                                # not linger as a zombie process
                                p.wait()
                                raise subprocess.TimeoutExpired(cmd=args, timeout=self._timeout)
                            profile.sample()
                            ptimeout = 0.01
                    exitcode = p.returncode
                    profile.finish()
                    log.info(f'done: exit={exitcode}, profile={profile}')
                else:
                    p = subprocess.run(args, stderr=cerr, stdout=cout,
                                       cwd=self._run_dir, shell=False,
                                       input=intext.encode() if intext else None,
                                       timeout=self._timeout,
                                       env=self._run_env)
                    exitcode = p.returncode
        except subprocess.TimeoutExpired:
            now = datetime.now()
            duration = now - started_at
            log.warning(f'Timeout at {now} after {duration.total_seconds()}s '
                        f'(configured {self._timeout}s): {args}')
            exitcode = -1
            exception = 'TimeoutExpired'
        ended_at = datetime.now()
        if tcpdump:
            tcpdump.finish()
        if dtrace:
            dtrace.finish()
        if self._with_flame and dtrace:
            self._generate_flame(dtrace, args)
        # read the captured output, closing the files right away
        with open(self._stdoutfile) as fd:
            coutput = fd.readlines()
        with open(self._stderrfile) as fd:
            cerrput = fd.readlines()
        return ExecResult(args=args, exit_code=exitcode, exception=exception,
                          stdout=coutput, stderr=cerrput,
                          duration=ended_at - started_at,
                          with_stats=with_stats,
                          profile=profile, tcpdump=tcpdump)
    870 
    871     def _raw(self, urls, intext='', timeout=None, options=None, insecure=False,
    872              alpn_proto: Optional[str] = None,
    873              force_resolve=True,
    874              with_stats=False,
    875              with_headers=True,
    876              def_tracing=True,
    877              with_profile=False,
    878              with_tcpdump=False):
    879         args = self._complete_args(
    880             urls=urls, timeout=timeout, options=options, insecure=insecure,
    881             alpn_proto=alpn_proto, force_resolve=force_resolve,
    882             with_headers=with_headers, def_tracing=def_tracing)
    883         r = self._run(args, intext=intext, with_stats=with_stats,
    884                       with_profile=with_profile, with_tcpdump=with_tcpdump)
    885         if r.exit_code == 0 and with_headers:
    886             self._parse_headerfile(self._headerfile, r=r)
    887         return r
    888 
    889     def _complete_args(self, urls, timeout=None, options=None,
    890                        insecure=False, force_resolve=True,
    891                        alpn_proto: Optional[str] = None,
    892                        with_headers: bool = True,
    893                        def_tracing: bool = True):
    894         if not isinstance(urls, list):
    895             urls = [urls]
    896 
    897         args = [self._curl, "-s", "--path-as-is"]
    898         if 'CURL_TEST_EVENT' in os.environ:
    899             args.append('--test-event')
    900 
    901         if with_headers:
    902             args.extend(["-D", self._headerfile])
    903         if def_tracing is not False and not self._silent:
    904             args.extend(['-v', '--trace-ids', '--trace-time'])
    905             if self.env.verbose > 1:
    906                 args.extend(['--trace-config', 'http/2,http/3,h2-proxy,h1-proxy'])
    907 
    908         active_options = options
    909         if options is not None and '--next' in options:
    910             active_options = options[options.index('--next') + 1:]
    911 
    912         for url in urls:
    913             u = urlparse(urls[0])
    914             if options:
    915                 args.extend(options)
    916             if alpn_proto is not None:
    917                 if alpn_proto not in self.ALPN_ARG:
    918                     raise Exception(f'unknown ALPN protocol: "{alpn_proto}"')
    919                 args.append(self.ALPN_ARG[alpn_proto])
    920 
    921             if u.scheme == 'http':
    922                 pass
    923             elif insecure:
    924                 args.append('--insecure')
    925             elif active_options and "--cacert" in active_options:
    926                 pass
    927             elif u.hostname:
    928                 args.extend(["--cacert", self.env.ca.cert_file])
    929 
    930             if force_resolve and u.hostname and u.hostname != 'localhost' \
    931                     and not re.match(r'^(\d+|\[|:).*', u.hostname):
    932                 port = u.port if u.port else 443
    933                 args.extend([
    934                     '--resolve', f'{u.hostname}:{port}:{self._server_addr}',
    935                 ])
    936             if timeout is not None and int(timeout) > 0:
    937                 args.extend(["--connect-timeout", str(int(timeout))])
    938             args.append(url)
    939         return args
    940 
    def _parse_headerfile(self, headerfile: str, r: Optional[ExecResult] = None) -> ExecResult:
        """Parse a dumped header file into responses added to `r`.

        The file holds, per response, a status line, header lines, an
        empty line and possibly trailer lines ended by another empty
        line. Header and trailer names are stored lower-cased. Every
        parsed response is handed to `r.add_response()`.

        :param headerfile: path of the file to parse
        :param r: the result to add responses to; a fresh, empty
            `ExecResult` is used when not given
        :return: the result the responses were added to
        :raises AssertionError: when a line does not fit the format
        """
        with open(headerfile) as fd:
            lines = fd.readlines()
        if r is None:
            r = ExecResult(args=[], exit_code=0, stdout=[], stderr=[])

        response = None

        def fin_response(resp):
            if resp:
                r.add_response(resp)

        # the kinds of line that may legally come next
        expected = ['status']
        for line in lines:
            line = line.strip()
            if not line:
                if 'trailer' in expected:
                    # end of trailers
                    fin_response(response)
                    response = None
                    expected = ['status']
                elif 'header' in expected:
                    # end of header, another status or trailers might follow
                    expected = ['status', 'trailer']
                else:
                    # raised explicitly, an `assert` vanishes under -O
                    raise AssertionError(f"unexpected line: '{line}'")
                continue
            if 'status' in expected:
                # log.debug("reading 1st response line: %s", line)
                m = re.match(r'^(\S+) (\d+)( .*)?$', line)
                if m:
                    fin_response(response)
                    response = {
                        "protocol": m.group(1),
                        "status": int(m.group(2)),
                        "description": m.group(3),
                        "header": {},
                        "trailer": {},
                        "body": r.outraw
                    }
                    expected = ['header']
                    continue
            if 'trailer' in expected:
                m = re.match(r'^([^:]+):\s*(.*)$', line)
                if m:
                    response['trailer'][m.group(1).lower()] = m.group(2)
                    continue
            if 'header' in expected:
                m = re.match(r'^([^:]+):\s*(.*)$', line)
                if m:
                    response['header'][m.group(1).lower()] = m.group(2)
                    continue
            raise AssertionError(f"unexpected line: '{line}', expected: {expected}")

        fin_response(response)
        return r
    996 
    def _generate_flame(self, dtrace: DTraceProfile, curl_args: List[str]):
        """Turn the dtrace output of a run into a flame graph SVG.

        Uses the `stackcollapse.pl` and `flamegraph.pl` scripts from
        the directory named by the `FLAMEGRAPH` environment variable.
        The collapsed stacks go to `<dtrace file>.collapsed`, the graph
        to `curl.flamegraph.svg` in the run directory. The curl command
        line serves as title, split into title and subtitle when it is
        longer than 80 characters.

        :param dtrace: the profile whose `file` holds the dtrace output
        :param curl_args: the command line that was profiled
        :raises Exception: when an input or script is missing or one of
            the scripts returns an error
        """
        log.info('generating flame graph from dtrace for this run')
        if not os.path.exists(dtrace.file):
            raise Exception(f'dtrace output file does not exist: {dtrace.file}')
        fg_dir = os.environ.get('FLAMEGRAPH')
        if fg_dir is None:
            raise Exception('Env variable FLAMEGRAPH not set')
        if not os.path.exists(fg_dir):
            raise Exception(f'FlameGraph directory not found: {fg_dir}')

        # locate both scripts, in the order they are used
        scripts = []
        for script_name in ('stackcollapse.pl', 'flamegraph.pl'):
            script_path = os.path.join(fg_dir, script_name)
            if not os.path.exists(script_path):
                raise Exception(f'FlameGraph script not found: {script_path}')
            scripts.append(script_path)
        fg_collapse, fg_gen_flame = scripts

        file_collapsed = f'{dtrace.file}.collapsed'
        file_svg = os.path.join(self._run_dir, 'curl.flamegraph.svg')
        file_err = os.path.join(self._run_dir, 'curl.flamegraph.stderr')
        log.info('waiting a sec for dtrace to finish flusheing its buffers')
        time.sleep(1)

        log.info(f'collapsing stacks into {file_collapsed}')
        with open(file_collapsed, 'w') as cout, open(file_err, 'w') as cerr:
            collapsed = subprocess.run([fg_collapse, dtrace.file],
                                       stdout=cout, stderr=cerr,
                                       cwd=self._run_dir, shell=False)
            if collapsed.returncode != 0:
                raise Exception(f'{fg_collapse} returned error {collapsed.returncode}')

        log.info(f'generating graph into {file_svg}')
        cmdline = ' '.join(curl_args)
        too_long = len(cmdline) > 80
        title = f'{cmdline[:80]}...' if too_long else cmdline
        subtitle = f'...{cmdline[-80:]}' if too_long else ''
        flame_cmd = [
            fg_gen_flame, '--colors', 'green',
            '--title', title, '--subtitle', subtitle,
            file_collapsed
        ]
        with open(file_svg, 'w') as cout, open(file_err, 'w') as cerr:
            generated = subprocess.run(flame_cmd, stdout=cout, stderr=cerr,
                                       cwd=self._run_dir, shell=False)
            if generated.returncode != 0:
                raise Exception(f'{fg_gen_flame} returned error {generated.returncode}')