quickjs-tart

quickjs-based runtime for wallet-core logic
Log | Files | Refs | README | LICENSE

env.py (24085B)


      1 #!/usr/bin/env python3
      2 # -*- coding: utf-8 -*-
      3 #***************************************************************************
      4 #                                  _   _ ____  _
      5 #  Project                     ___| | | |  _ \| |
      6 #                             / __| | | | |_) | |
      7 #                            | (__| |_| |  _ <| |___
      8 #                             \___|\___/|_| \_\_____|
      9 #
     10 # Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
     11 #
     12 # This software is licensed as described in the file COPYING, which
     13 # you should have received as part of this distribution. The terms
     14 # are also available at https://curl.se/docs/copyright.html.
     15 #
     16 # You may opt to use, copy, modify, merge, publish, distribute and/or sell
     17 # copies of the Software, and permit persons to whom the Software is
     18 # furnished to do so, under the terms of the COPYING file.
     19 #
     20 # This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
     21 # KIND, either express or implied.
     22 #
     23 # SPDX-License-Identifier: curl
     24 #
     25 ###########################################################################
     26 #
     27 import gzip
     28 import logging
     29 import os
     30 import re
     31 import shutil
     32 import subprocess
     33 import tempfile
     34 from configparser import ConfigParser, ExtendedInterpolation
     35 from datetime import timedelta
     36 from typing import Optional, Dict, List
     37 
     38 import pytest
     39 from filelock import FileLock
     40 
     41 from .certs import CertificateSpec, Credentials, TestCA
     42 
     43 
     44 log = logging.getLogger(__name__)
     45 
     46 
     47 def init_config_from(conf_path):
     48     if os.path.isfile(conf_path):
     49         config = ConfigParser(interpolation=ExtendedInterpolation())
     50         config.read(conf_path)
     51         return config
     52     return None
     53 
     54 
     55 TESTS_HTTPD_PATH = os.path.dirname(os.path.dirname(__file__))
     56 PROJ_PATH = os.path.dirname(os.path.dirname(TESTS_HTTPD_PATH))
     57 TOP_PATH = os.path.join(os.getcwd(), os.path.pardir)
     58 CONFIG_PATH = os.path.join(TOP_PATH, 'tests', 'http', 'config.ini')
     59 if not os.path.exists(CONFIG_PATH):
     60     ALT_CONFIG_PATH = os.path.join(PROJ_PATH, 'tests', 'http', 'config.ini')
     61     if not os.path.exists(ALT_CONFIG_PATH):
     62         raise Exception(f'unable to find config.ini in {CONFIG_PATH} nor {ALT_CONFIG_PATH}')
     63     TOP_PATH = PROJ_PATH
     64     CONFIG_PATH = ALT_CONFIG_PATH
     65 DEF_CONFIG = init_config_from(CONFIG_PATH)
     66 CURL = os.path.join(TOP_PATH, 'src', 'curl')
     67 
     68 
     69 class NghttpxUtil:
     70 
     71     CMD = None
     72     VERSION_FULL = None
     73 
     74     @classmethod
     75     def version(cls, cmd):
     76         if cmd is None:
     77             return None
     78         if cls.VERSION_FULL is None or cmd != cls.CMD:
     79             p = subprocess.run(args=[cmd, '--version'],
     80                                capture_output=True, text=True)
     81             if p.returncode != 0:
     82                 raise RuntimeError(f'{cmd} --version failed with exit code: {p.returncode}')
     83             cls.CMD = cmd
     84             for line in p.stdout.splitlines(keepends=False):
     85                 if line.startswith('nghttpx '):
     86                     cls.VERSION_FULL = line
     87             if cls.VERSION_FULL is None:
     88                 raise RuntimeError(f'{cmd}: unable to determine version')
     89         return cls.VERSION_FULL
     90 
     91     @staticmethod
     92     def version_with_h3(version):
     93         return re.match(r'.* ngtcp2/\d+\.\d+\.\d+.*', version) is not None
     94 
     95 
     96 class EnvConfig:
     97 
     98     def __init__(self, pytestconfig: Optional[pytest.Config] = None,
     99                  testrun_uid=None,
    100                  worker_id=None):
    101         self.pytestconfig = pytestconfig
    102         self.testrun_uid = testrun_uid
    103         self.worker_id = worker_id if worker_id is not None else 'master'
    104         self.tests_dir = TESTS_HTTPD_PATH
    105         self.gen_root = self.gen_dir = os.path.join(self.tests_dir, 'gen')
    106         if self.worker_id != 'master':
    107             self.gen_dir = os.path.join(self.gen_dir, self.worker_id)
    108         self.project_dir = os.path.dirname(os.path.dirname(self.tests_dir))
    109         self.build_dir = TOP_PATH
    110         self.config = DEF_CONFIG
    111         # check cur and its features
    112         self.curl = CURL
    113         if 'CURL' in os.environ:
    114             self.curl = os.environ['CURL']
    115         self.curl_props = {
    116             'version_string': '',
    117             'version': '',
    118             'os': '',
    119             'fullname': '',
    120             'features_string': '',
    121             'features': set(),
    122             'protocols_string': '',
    123             'protocols': set(),
    124             'libs': set(),
    125             'lib_versions': set(),
    126         }
    127         self.curl_is_debug = False
    128         self.curl_protos = []
    129         p = subprocess.run(args=[self.curl, '-V'],
    130                            capture_output=True, text=True)
    131         if p.returncode != 0:
    132             raise RuntimeError(f'{self.curl} -V failed with exit code: {p.returncode}')
    133         if p.stderr.startswith('WARNING:'):
    134             self.curl_is_debug = True
    135         for line in p.stdout.splitlines(keepends=False):
    136             if line.startswith('curl '):
    137                 self.curl_props['version_string'] = line
    138                 m = re.match(r'^curl (?P<version>\S+) (?P<os>\S+) (?P<libs>.*)$', line)
    139                 if m:
    140                     self.curl_props['fullname'] = m.group(0)
    141                     self.curl_props['version'] = m.group('version')
    142                     self.curl_props['os'] = m.group('os')
    143                     self.curl_props['lib_versions'] = {
    144                         lib.lower() for lib in m.group('libs').split(' ')
    145                     }
    146                     self.curl_props['libs'] = {
    147                         re.sub(r'/[a-z0-9.-]*', '', lib) for lib in self.curl_props['lib_versions']
    148                     }
    149             if line.startswith('Features: '):
    150                 self.curl_props['features_string'] = line[10:]
    151                 self.curl_props['features'] = {
    152                     feat.lower() for feat in line[10:].split(' ')
    153                 }
    154             if line.startswith('Protocols: '):
    155                 self.curl_props['protocols_string'] = line[11:]
    156                 self.curl_props['protocols'] = {
    157                     prot.lower() for prot in line[11:].split(' ')
    158                 }
    159 
    160         self.ports = {}
    161 
    162         self.httpd = self.config['httpd']['httpd']
    163         self.apxs = self.config['httpd']['apxs']
    164         if len(self.apxs) == 0:
    165             self.apxs = None
    166         self._httpd_version = None
    167 
    168         self.examples_pem = {
    169             'key': 'xxx',
    170             'cert': 'xxx',
    171         }
    172         self.htdocs_dir = os.path.join(self.gen_dir, 'htdocs')
    173         self.tld = 'http.curl.se'
    174         self.domain1 = f"one.{self.tld}"
    175         self.domain1brotli = f"brotli.one.{self.tld}"
    176         self.domain2 = f"two.{self.tld}"
    177         self.ftp_domain = f"ftp.{self.tld}"
    178         self.proxy_domain = f"proxy.{self.tld}"
    179         self.expired_domain = f"expired.{self.tld}"
    180         self.cert_specs = [
    181             CertificateSpec(domains=[self.domain1, self.domain1brotli, 'localhost', '127.0.0.1'], key_type='rsa2048'),
    182             CertificateSpec(name='domain1-no-ip', domains=[self.domain1, self.domain1brotli], key_type='rsa2048'),
    183             CertificateSpec(domains=[self.domain2], key_type='rsa2048'),
    184             CertificateSpec(domains=[self.ftp_domain], key_type='rsa2048'),
    185             CertificateSpec(domains=[self.proxy_domain, '127.0.0.1'], key_type='rsa2048'),
    186             CertificateSpec(domains=[self.expired_domain], key_type='rsa2048',
    187                             valid_from=timedelta(days=-100), valid_to=timedelta(days=-10)),
    188             CertificateSpec(name="clientsX", sub_specs=[
    189                CertificateSpec(name="user1", client=True),
    190             ]),
    191         ]
    192 
    193         self.nghttpx = self.config['nghttpx']['nghttpx']
    194         if len(self.nghttpx.strip()) == 0:
    195             self.nghttpx = None
    196         self._nghttpx_version = None
    197         self.nghttpx_with_h3 = False
    198         if self.nghttpx is not None:
    199             try:
    200                 self._nghttpx_version = NghttpxUtil.version(self.nghttpx)
    201                 self.nghttpx_with_h3 = NghttpxUtil.version_with_h3(self._nghttpx_version)
    202             except RuntimeError:
    203                 # not a working nghttpx
    204                 log.exception('checking nghttpx version')
    205                 self.nghttpx = None
    206 
    207         self.caddy = self.config['caddy']['caddy']
    208         self._caddy_version = None
    209         if len(self.caddy.strip()) == 0:
    210             self.caddy = None
    211         if self.caddy is not None:
    212             try:
    213                 p = subprocess.run(args=[self.caddy, 'version'],
    214                                    capture_output=True, text=True)
    215                 if p.returncode != 0:
    216                     # not a working caddy
    217                     self.caddy = None
    218                 m = re.match(r'v?(\d+\.\d+\.\d+).*', p.stdout)
    219                 if m:
    220                     self._caddy_version = m.group(1)
    221                 else:
    222                     raise RuntimeError(f'Unable to determine cadd version from: {p.stdout}')
    223             # TODO: specify specific exceptions here
    224             except:  # noqa: E722
    225                 self.caddy = None
    226 
    227         self.vsftpd = self.config['vsftpd']['vsftpd']
    228         self._vsftpd_version = None
    229         if self.vsftpd is not None:
    230             try:
    231                 with tempfile.TemporaryFile('w+') as tmp:
    232                     p = subprocess.run(args=[self.vsftpd, '-v'],
    233                                        capture_output=True, text=True, stdin=tmp)
    234                     if p.returncode != 0:
    235                         # not a working vsftpd
    236                         self.vsftpd = None
    237                     if p.stderr:
    238                         ver_text = p.stderr
    239                     else:
    240                         # Oddly, some versions of vsftpd write to stdin (!)
    241                         # instead of stderr, which is odd but works. If there
    242                         # is nothing on stderr, read the file on stdin and use
    243                         # any data there instead.
    244                         tmp.seek(0)
    245                         ver_text = tmp.read()
    246                 m = re.match(r'vsftpd: version (\d+\.\d+\.\d+)', ver_text)
    247                 if m:
    248                     self._vsftpd_version = m.group(1)
    249                 elif len(p.stderr) == 0:
    250                     # vsftp does not use stdout or stderr for printing its version... -.-
    251                     self._vsftpd_version = 'unknown'
    252                 else:
    253                     raise Exception(f'Unable to determine VsFTPD version from: {p.stderr}')
    254             except Exception:
    255                 self.vsftpd = None
    256 
    257         self._tcpdump = shutil.which('tcpdump')
    258 
    259     @property
    260     def httpd_version(self):
    261         if self._httpd_version is None and self.apxs is not None:
    262             try:
    263                 p = subprocess.run(args=[self.apxs, '-q', 'HTTPD_VERSION'],
    264                                    capture_output=True, text=True)
    265                 if p.returncode != 0:
    266                     log.error(f'{self.apxs} failed to query HTTPD_VERSION: {p}')
    267                 else:
    268                     self._httpd_version = p.stdout.strip()
    269             except Exception:
    270                 log.exception(f'{self.apxs} failed to run')
    271         return self._httpd_version
    272 
    273     def versiontuple(self, v):
    274         v = re.sub(r'(\d+\.\d+(\.\d+)?)(-\S+)?', r'\1', v)
    275         return tuple(map(int, v.split('.')))
    276 
    277     def httpd_is_at_least(self, minv):
    278         if self.httpd_version is None:
    279             return False
    280         hv = self.versiontuple(self.httpd_version)
    281         return hv >= self.versiontuple(minv)
    282 
    283     def caddy_is_at_least(self, minv):
    284         if self.caddy_version is None:
    285             return False
    286         hv = self.versiontuple(self.caddy_version)
    287         return hv >= self.versiontuple(minv)
    288 
    289     def is_complete(self) -> bool:
    290         return os.path.isfile(self.httpd) and \
    291                self.apxs is not None and \
    292                os.path.isfile(self.apxs)
    293 
    294     def get_incomplete_reason(self) -> Optional[str]:
    295         if self.httpd is None or len(self.httpd.strip()) == 0:
    296             return 'httpd not configured, see `--with-test-httpd=<path>`'
    297         if not os.path.isfile(self.httpd):
    298             return f'httpd ({self.httpd}) not found'
    299         if self.apxs is None:
    300             return "command apxs not found (commonly provided in apache2-dev)"
    301         if not os.path.isfile(self.apxs):
    302             return f"apxs ({self.apxs}) not found"
    303         return None
    304 
    305     @property
    306     def nghttpx_version(self):
    307         return self._nghttpx_version
    308 
    309     @property
    310     def caddy_version(self):
    311         return self._caddy_version
    312 
    313     @property
    314     def vsftpd_version(self):
    315         return self._vsftpd_version
    316 
    317     @property
    318     def tcpdmp(self) -> Optional[str]:
    319         return self._tcpdump
    320 
    321     def clear_locks(self):
    322         ca_lock = os.path.join(self.gen_root, 'ca/ca.lock')
    323         if os.path.exists(ca_lock):
    324             os.remove(ca_lock)
    325 
    326 
    327 class Env:
    328 
    329     SERVER_TIMEOUT = 30  # seconds to wait for server to come up/reload
    330 
    331     CONFIG = EnvConfig()
    332 
    333     @staticmethod
    334     def setup_incomplete() -> bool:
    335         return not Env.CONFIG.is_complete()
    336 
    337     @staticmethod
    338     def incomplete_reason() -> Optional[str]:
    339         return Env.CONFIG.get_incomplete_reason()
    340 
    341     @staticmethod
    342     def have_nghttpx() -> bool:
    343         return Env.CONFIG.nghttpx is not None
    344 
    345     @staticmethod
    346     def have_h3_server() -> bool:
    347         return Env.CONFIG.nghttpx_with_h3
    348 
    349     @staticmethod
    350     def have_ssl_curl() -> bool:
    351         return Env.curl_has_feature('ssl') or Env.curl_has_feature('multissl')
    352 
    353     @staticmethod
    354     def have_h2_curl() -> bool:
    355         return 'http2' in Env.CONFIG.curl_props['features']
    356 
    357     @staticmethod
    358     def have_h3_curl() -> bool:
    359         return 'http3' in Env.CONFIG.curl_props['features']
    360 
    361     @staticmethod
    362     def curl_uses_lib(libname: str) -> bool:
    363         return libname.lower() in Env.CONFIG.curl_props['libs']
    364 
    365     @staticmethod
    366     def curl_uses_any_libs(libs: List[str]) -> bool:
    367         for libname in libs:
    368             if libname.lower() in Env.CONFIG.curl_props['libs']:
    369                 return True
    370         return False
    371 
    372     @staticmethod
    373     def curl_uses_ossl_quic() -> bool:
    374         if Env.have_h3_curl():
    375             return not Env.curl_uses_lib('ngtcp2') and Env.curl_uses_lib('nghttp3')
    376         return False
    377 
    378     @staticmethod
    379     def curl_version_string() -> str:
    380         return Env.CONFIG.curl_props['version_string']
    381 
    382     @staticmethod
    383     def curl_features_string() -> str:
    384         return Env.CONFIG.curl_props['features_string']
    385 
    386     @staticmethod
    387     def curl_has_feature(feature: str) -> bool:
    388         return feature.lower() in Env.CONFIG.curl_props['features']
    389 
    390     @staticmethod
    391     def curl_protocols_string() -> str:
    392         return Env.CONFIG.curl_props['protocols_string']
    393 
    394     @staticmethod
    395     def curl_has_protocol(protocol: str) -> bool:
    396         return protocol.lower() in Env.CONFIG.curl_props['protocols']
    397 
    398     @staticmethod
    399     def curl_lib_version(libname: str) -> str:
    400         prefix = f'{libname.lower()}/'
    401         for lversion in Env.CONFIG.curl_props['lib_versions']:
    402             if lversion.startswith(prefix):
    403                 return lversion[len(prefix):]
    404         return 'unknown'
    405 
    406     @staticmethod
    407     def curl_lib_version_at_least(libname: str, min_version) -> bool:
    408         lversion = Env.curl_lib_version(libname)
    409         if lversion != 'unknown':
    410             return Env.CONFIG.versiontuple(min_version) <= \
    411                    Env.CONFIG.versiontuple(lversion)
    412         return False
    413 
    414     @staticmethod
    415     def curl_lib_version_before(libname: str, lib_version) -> bool:
    416         lversion = Env.curl_lib_version(libname)
    417         if lversion != 'unknown':
    418             if m := re.match(r'(\d+\.\d+\.\d+).*', lversion):
    419                 lversion = m.group(1)
    420             return Env.CONFIG.versiontuple(lib_version) > \
    421                 Env.CONFIG.versiontuple(lversion)
    422         return False
    423 
    424     @staticmethod
    425     def curl_os() -> str:
    426         return Env.CONFIG.curl_props['os']
    427 
    428     @staticmethod
    429     def curl_fullname() -> str:
    430         return Env.CONFIG.curl_props['fullname']
    431 
    432     @staticmethod
    433     def curl_version() -> str:
    434         return Env.CONFIG.curl_props['version']
    435 
    436     @staticmethod
    437     def curl_is_debug() -> bool:
    438         return Env.CONFIG.curl_is_debug
    439 
    440     @staticmethod
    441     def curl_can_early_data() -> bool:
    442         return Env.curl_uses_any_libs(['gnutls', 'wolfssl', 'quictls', 'openssl'])
    443 
    444     @staticmethod
    445     def curl_can_h3_early_data() -> bool:
    446         return Env.curl_can_early_data() and \
    447             Env.curl_uses_lib('ngtcp2')
    448 
    449     @staticmethod
    450     def have_h3() -> bool:
    451         return Env.have_h3_curl() and Env.have_h3_server()
    452 
    453     @staticmethod
    454     def httpd_version() -> str:
    455         return Env.CONFIG.httpd_version
    456 
    457     @staticmethod
    458     def nghttpx_version() -> str:
    459         return Env.CONFIG.nghttpx_version
    460 
    461     @staticmethod
    462     def caddy_version() -> str:
    463         return Env.CONFIG.caddy_version
    464 
    465     @staticmethod
    466     def caddy_is_at_least(minv) -> bool:
    467         return Env.CONFIG.caddy_is_at_least(minv)
    468 
    469     @staticmethod
    470     def httpd_is_at_least(minv) -> bool:
    471         return Env.CONFIG.httpd_is_at_least(minv)
    472 
    473     @staticmethod
    474     def has_caddy() -> bool:
    475         return Env.CONFIG.caddy is not None
    476 
    477     @staticmethod
    478     def has_vsftpd() -> bool:
    479         return Env.CONFIG.vsftpd is not None
    480 
    481     @staticmethod
    482     def vsftpd_version() -> str:
    483         return Env.CONFIG.vsftpd_version
    484 
    485     @staticmethod
    486     def tcpdump() -> Optional[str]:
    487         return Env.CONFIG.tcpdmp
    488 
    489     def __init__(self, pytestconfig=None, env_config=None):
    490         if env_config:
    491             Env.CONFIG = env_config
    492         self._verbose = pytestconfig.option.verbose \
    493             if pytestconfig is not None else 0
    494         self._ca = None
    495         self._test_timeout = 300.0 if self._verbose > 1 else 60.0  # seconds
    496 
    497     def issue_certs(self):
    498         if self._ca is None:
    499             ca_dir = os.path.join(self.CONFIG.gen_root, 'ca')
    500             os.makedirs(ca_dir, exist_ok=True)
    501             lock_file = os.path.join(ca_dir, 'ca.lock')
    502             with FileLock(lock_file):
    503                 self._ca = TestCA.create_root(name=self.CONFIG.tld,
    504                                               store_dir=ca_dir,
    505                                               key_type="rsa2048")
    506                 self._ca.issue_certs(self.CONFIG.cert_specs)
    507 
    508     def setup(self):
    509         os.makedirs(self.gen_dir, exist_ok=True)
    510         os.makedirs(self.htdocs_dir, exist_ok=True)
    511         self.issue_certs()
    512 
    513     def get_credentials(self, domain) -> Optional[Credentials]:
    514         creds = self.ca.get_credentials_for_name(domain)
    515         if len(creds) > 0:
    516             return creds[0]
    517         return None
    518 
    519     @property
    520     def verbose(self) -> int:
    521         return self._verbose
    522 
    523     @property
    524     def test_timeout(self) -> Optional[float]:
    525         return self._test_timeout
    526 
    527     @test_timeout.setter
    528     def test_timeout(self, val: Optional[float]):
    529         self._test_timeout = val
    530 
    531     @property
    532     def gen_dir(self) -> str:
    533         return self.CONFIG.gen_dir
    534 
    535     @property
    536     def gen_root(self) -> str:
    537         return self.CONFIG.gen_root
    538 
    539     @property
    540     def project_dir(self) -> str:
    541         return self.CONFIG.project_dir
    542 
    543     @property
    544     def build_dir(self) -> str:
    545         return self.CONFIG.build_dir
    546 
    547     @property
    548     def ca(self):
    549         return self._ca
    550 
    551     @property
    552     def htdocs_dir(self) -> str:
    553         return self.CONFIG.htdocs_dir
    554 
    555     @property
    556     def tld(self) -> str:
    557         return self.CONFIG.tld
    558 
    559     @property
    560     def domain1(self) -> str:
    561         return self.CONFIG.domain1
    562 
    563     @property
    564     def domain1brotli(self) -> str:
    565         return self.CONFIG.domain1brotli
    566 
    567     @property
    568     def domain2(self) -> str:
    569         return self.CONFIG.domain2
    570 
    571     @property
    572     def ftp_domain(self) -> str:
    573         return self.CONFIG.ftp_domain
    574 
    575     @property
    576     def proxy_domain(self) -> str:
    577         return self.CONFIG.proxy_domain
    578 
    579     @property
    580     def expired_domain(self) -> str:
    581         return self.CONFIG.expired_domain
    582 
    583     @property
    584     def ports(self) -> Dict[str, int]:
    585         return self.CONFIG.ports
    586 
    587     def update_ports(self, ports: Dict[str, int]):
    588         self.CONFIG.ports.update(ports)
    589 
    590     @property
    591     def http_port(self) -> int:
    592         return self.CONFIG.ports.get('http', 0)
    593 
    594     @property
    595     def https_port(self) -> int:
    596         return self.CONFIG.ports['https']
    597 
    598     @property
    599     def https_only_tcp_port(self) -> int:
    600         return self.CONFIG.ports['https-tcp-only']
    601 
    602     @property
    603     def nghttpx_https_port(self) -> int:
    604         return self.CONFIG.ports['nghttpx_https']
    605 
    606     @property
    607     def h3_port(self) -> int:
    608         return self.https_port
    609 
    610     @property
    611     def proxy_port(self) -> int:
    612         return self.CONFIG.ports['proxy']
    613 
    614     @property
    615     def proxys_port(self) -> int:
    616         return self.CONFIG.ports['proxys']
    617 
    618     @property
    619     def ftp_port(self) -> int:
    620         return self.CONFIG.ports['ftp']
    621 
    622     @property
    623     def ftps_port(self) -> int:
    624         return self.CONFIG.ports['ftps']
    625 
    626     @property
    627     def h2proxys_port(self) -> int:
    628         return self.CONFIG.ports['h2proxys']
    629 
    630     def pts_port(self, proto: str = 'http/1.1') -> int:
    631         # proxy tunnel port
    632         return self.CONFIG.ports['h2proxys' if proto == 'h2' else 'proxys']
    633 
    634     @property
    635     def caddy(self) -> str:
    636         return self.CONFIG.caddy
    637 
    638     @property
    639     def caddy_https_port(self) -> int:
    640         return self.CONFIG.ports['caddys']
    641 
    642     @property
    643     def caddy_http_port(self) -> int:
    644         return self.CONFIG.ports['caddy']
    645 
    646     @property
    647     def vsftpd(self) -> str:
    648         return self.CONFIG.vsftpd
    649 
    650     @property
    651     def ws_port(self) -> int:
    652         return self.CONFIG.ports['ws']
    653 
    654     @property
    655     def curl(self) -> str:
    656         return self.CONFIG.curl
    657 
    658     @property
    659     def httpd(self) -> str:
    660         return self.CONFIG.httpd
    661 
    662     @property
    663     def apxs(self) -> str:
    664         return self.CONFIG.apxs
    665 
    666     @property
    667     def nghttpx(self) -> Optional[str]:
    668         return self.CONFIG.nghttpx
    669 
    670     @property
    671     def slow_network(self) -> bool:
    672         return "CURL_DBG_SOCK_WBLOCK" in os.environ or \
    673                "CURL_DBG_SOCK_WPARTIAL" in os.environ
    674 
    675     @property
    676     def ci_run(self) -> bool:
    677         return "CURL_CI" in os.environ
    678 
    679     def port_for(self, alpn_proto: Optional[str] = None):
    680         if alpn_proto is None or \
    681                 alpn_proto in ['h2', 'http/1.1', 'http/1.0', 'http/0.9']:
    682             return self.https_port
    683         if alpn_proto in ['h3']:
    684             return self.h3_port
    685         return self.http_port
    686 
    687     def authority_for(self, domain: str, alpn_proto: Optional[str] = None):
    688         return f'{domain}:{self.port_for(alpn_proto=alpn_proto)}'
    689 
    690     def make_data_file(self, indir: str, fname: str, fsize: int,
    691                        line_length: int = 1024) -> str:
    692         if line_length < 11:
    693             raise RuntimeError('line_length less than 11 not supported')
    694         fpath = os.path.join(indir, fname)
    695         s10 = "0123456789"
    696         s = round((line_length / 10) + 1) * s10
    697         s = s[0:line_length-11]
    698         with open(fpath, 'w') as fd:
    699             for i in range(int(fsize / line_length)):
    700                 fd.write(f"{i:09d}-{s}\n")
    701             remain = int(fsize % line_length)
    702             if remain != 0:
    703                 i = int(fsize / line_length) + 1
    704                 fd.write(f"{i:09d}-{s}"[0:remain-1] + "\n")
    705         return fpath
    706 
    707     def make_data_gzipbomb(self, indir: str, fname: str, fsize: int) -> str:
    708         fpath = os.path.join(indir, fname)
    709         gzpath = f'{fpath}.gz'
    710         varpath = f'{fpath}.var'
    711 
    712         with open(fpath, 'w') as fd:
    713             fd.write('not what we are looking for!\n')
    714         count = int(fsize / 1024)
    715         zero1k = bytearray(1024)
    716         with gzip.open(gzpath, 'wb') as fd:
    717             for _ in range(count):
    718                 fd.write(zero1k)
    719         with open(varpath, 'w') as fd:
    720             fd.write(f'URI: {fname}\n')
    721             fd.write('\n')
    722             fd.write(f'URI: {fname}.gz\n')
    723             fd.write('Content-Type: text/plain\n')
    724             fd.write('Content-Encoding: x-gzip\n')
    725             fd.write('\n')
    726         return fpath