env.py (24085B)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#***************************************************************************
#                                  _   _ ____  _
#  Project                     ___| | | |  _ \| |
#                             / __| | | | |_) | |
#                            | (__| |_| |  _ <| |___
#                             \___|\___/|_| \_\_____|
#
# Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
#
# This software is licensed as described in the file COPYING, which
# you should have received as part of this distribution. The terms
# are also available at https://curl.se/docs/copyright.html.
#
# You may opt to use, copy, modify, merge, publish, distribute and/or sell
# copies of the Software, and permit persons to whom the Software is
# furnished to do so, under the terms of the COPYING file.
#
# This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
# KIND, either express or implied.
#
# SPDX-License-Identifier: curl
#
###########################################################################
#
import gzip
import logging
import os
import re
import shutil
import subprocess
import tempfile
from configparser import ConfigParser, ExtendedInterpolation
from datetime import timedelta
from typing import Optional, Dict, List

import pytest
from filelock import FileLock

from .certs import CertificateSpec, Credentials, TestCA


log = logging.getLogger(__name__)


def init_config_from(conf_path):
    """Parse the INI file at `conf_path` with extended interpolation.

    Returns the `ConfigParser`, or None when `conf_path` is not a file.
    """
    if os.path.isfile(conf_path):
        config = ConfigParser(interpolation=ExtendedInterpolation())
        config.read(conf_path)
        return config
    return None


# Locate config.ini: first relative to the parent of the current working
# directory, then relative to the project directory derived from this file.
TESTS_HTTPD_PATH = os.path.dirname(os.path.dirname(__file__))
PROJ_PATH = os.path.dirname(os.path.dirname(TESTS_HTTPD_PATH))
TOP_PATH = os.path.join(os.getcwd(), os.path.pardir)
CONFIG_PATH = os.path.join(TOP_PATH, 'tests', 'http', 'config.ini')
if not os.path.exists(CONFIG_PATH):
    ALT_CONFIG_PATH = os.path.join(PROJ_PATH, 'tests', 'http', 'config.ini')
    if not os.path.exists(ALT_CONFIG_PATH):
        raise Exception(f'unable to find config.ini in {CONFIG_PATH} nor {ALT_CONFIG_PATH}')
    TOP_PATH = PROJ_PATH
    CONFIG_PATH = ALT_CONFIG_PATH
DEF_CONFIG = init_config_from(CONFIG_PATH)
CURL = os.path.join(TOP_PATH, 'src', 'curl')


class NghttpxUtil:
    """Caches the `--version` output line of the last queried nghttpx."""

    CMD = None
    VERSION_FULL = None

    @classmethod
    def version(cls, cmd):
        """Return the 'nghttpx ...' line printed by `cmd --version`.

        The result is cached per command. Returns None when `cmd` is None.
        Raises RuntimeError when the command exits non-zero or prints no
        line starting with 'nghttpx '.
        """
        if cmd is None:
            return None
        if cls.VERSION_FULL is None or cmd != cls.CMD:
            p = subprocess.run(args=[cmd, '--version'],
                               capture_output=True, text=True)
            if p.returncode != 0:
                raise RuntimeError(f'{cmd} --version failed with exit code: {p.returncode}')
            cls.CMD = cmd
            # Drop the version cached for a previous command so it cannot
            # be reported for this one when parsing finds nothing.
            cls.VERSION_FULL = None
            for line in p.stdout.splitlines(keepends=False):
                if line.startswith('nghttpx '):
                    cls.VERSION_FULL = line
            if cls.VERSION_FULL is None:
                raise RuntimeError(f'{cmd}: unable to determine version')
        return cls.VERSION_FULL

    @staticmethod
    def version_with_h3(version):
        """Return True when `version` mentions ' ngtcp2/<x.y.z>'."""
        return re.match(r'.* ngtcp2/\d+\.\d+\.\d+.*', version) is not None


class EnvConfig:
    """Probes curl and the configured server binaries for the test run."""

    def __init__(self, pytestconfig: Optional[pytest.Config] = None,
                 testrun_uid=None,
                 worker_id=None):
        """Collect paths, curl properties and server versions.

        Raises RuntimeError when `curl -V` exits non-zero. Server binaries
        (nghttpx, caddy, vsftpd) that are not configured or do not work
        are recorded as None instead of raising.
        """
        self.pytestconfig = pytestconfig
        self.testrun_uid = testrun_uid
        self.worker_id = worker_id if worker_id is not None else 'master'
        self.tests_dir = TESTS_HTTPD_PATH
        self.gen_root = self.gen_dir = os.path.join(self.tests_dir, 'gen')
        if self.worker_id != 'master':
            # every non-master worker gets its own sub directory
            self.gen_dir = os.path.join(self.gen_dir, self.worker_id)
        self.project_dir = os.path.dirname(os.path.dirname(self.tests_dir))
        self.build_dir = TOP_PATH
        self.config = DEF_CONFIG
        # check curl and its features
        self.curl = CURL
        if 'CURL' in os.environ:
            self.curl = os.environ['CURL']
        self.curl_props = {
            'version_string': '',
            'version': '',
            'os': '',
            'fullname': '',
            'features_string': '',
            'features': set(),
            'protocols_string': '',
            'protocols': set(),
            'libs': set(),
            'lib_versions': set(),
        }
        self.curl_is_debug = False
        self.curl_protos = []
        p = subprocess.run(args=[self.curl, '-V'],
                           capture_output=True, text=True)
        if p.returncode != 0:
            raise RuntimeError(f'{self.curl} -V failed with exit code: {p.returncode}')
        if p.stderr.startswith('WARNING:'):
            self.curl_is_debug = True
        for line in p.stdout.splitlines(keepends=False):
            if line.startswith('curl '):
                self.curl_props['version_string'] = line
                m = re.match(r'^curl (?P<version>\S+) (?P<os>\S+) (?P<libs>.*)$', line)
                if m:
                    self.curl_props['fullname'] = m.group(0)
                    self.curl_props['version'] = m.group('version')
                    self.curl_props['os'] = m.group('os')
                    # split() without argument avoids empty entries on
                    # doubled or trailing spaces
                    self.curl_props['lib_versions'] = {
                        lib.lower() for lib in m.group('libs').split()
                    }
                    # library names are the entries without '/<version>'
                    self.curl_props['libs'] = {
                        re.sub(r'/[a-z0-9.-]*', '', lib) for lib in self.curl_props['lib_versions']
                    }
            if line.startswith('Features: '):
                self.curl_props['features_string'] = line[10:]
                self.curl_props['features'] = {
                    feat.lower() for feat in line[10:].split()
                }
            if line.startswith('Protocols: '):
                self.curl_props['protocols_string'] = line[11:]
                self.curl_props['protocols'] = {
                    prot.lower() for prot in line[11:].split()
                }

        self.ports = {}

        self.httpd = self.config['httpd']['httpd']
        self.apxs = self.config['httpd']['apxs']
        if not self.apxs:
            self.apxs = None
        self._httpd_version = None

        self.examples_pem = {
            'key': 'xxx',
            'cert': 'xxx',
        }
        self.htdocs_dir = os.path.join(self.gen_dir, 'htdocs')
        self.tld = 'http.curl.se'
        self.domain1 = f"one.{self.tld}"
        self.domain1brotli = f"brotli.one.{self.tld}"
        self.domain2 = f"two.{self.tld}"
        self.ftp_domain = f"ftp.{self.tld}"
        self.proxy_domain = f"proxy.{self.tld}"
        self.expired_domain = f"expired.{self.tld}"
        self.cert_specs = [
            CertificateSpec(domains=[self.domain1, self.domain1brotli, 'localhost', '127.0.0.1'], key_type='rsa2048'),
            CertificateSpec(name='domain1-no-ip', domains=[self.domain1, self.domain1brotli], key_type='rsa2048'),
            CertificateSpec(domains=[self.domain2], key_type='rsa2048'),
            CertificateSpec(domains=[self.ftp_domain], key_type='rsa2048'),
            CertificateSpec(domains=[self.proxy_domain, '127.0.0.1'], key_type='rsa2048'),
            CertificateSpec(domains=[self.expired_domain], key_type='rsa2048',
                            valid_from=timedelta(days=-100), valid_to=timedelta(days=-10)),
            CertificateSpec(name="clientsX", sub_specs=[
                CertificateSpec(name="user1", client=True),
            ]),
        ]

        self.nghttpx = self.config['nghttpx']['nghttpx']
        if len(self.nghttpx.strip()) == 0:
            self.nghttpx = None
        self._nghttpx_version = None
        self.nghttpx_with_h3 = False
        if self.nghttpx is not None:
            try:
                self._nghttpx_version = NghttpxUtil.version(self.nghttpx)
                self.nghttpx_with_h3 = NghttpxUtil.version_with_h3(self._nghttpx_version)
            except (RuntimeError, OSError):
                # not a working nghttpx; OSError covers a configured path
                # that cannot be executed at all
                log.exception('checking nghttpx version')
                self.nghttpx = None

        self.caddy = self.config['caddy']['caddy']
        self._caddy_version = None
        if len(self.caddy.strip()) == 0:
            self.caddy = None
        if self.caddy is not None:
            try:
                p = subprocess.run(args=[self.caddy, 'version'],
                                   capture_output=True, text=True)
                if p.returncode != 0:
                    # not a working caddy
                    raise RuntimeError(f'{self.caddy} version failed with exit code: {p.returncode}')
                m = re.match(r'v?(\d+\.\d+\.\d+).*', p.stdout)
                if m is None:
                    raise RuntimeError(f'Unable to determine caddy version from: {p.stdout}')
                self._caddy_version = m.group(1)
            except Exception:
                # best effort: any failure means caddy is not available,
                # but do not swallow KeyboardInterrupt/SystemExit
                log.debug('checking caddy version', exc_info=True)
                self.caddy = None

        self.vsftpd = self.config['vsftpd']['vsftpd']
        self._vsftpd_version = None
        if len(self.vsftpd.strip()) == 0:
            self.vsftpd = None
        if self.vsftpd is not None:
            try:
                with tempfile.TemporaryFile('w+') as tmp:
                    p = subprocess.run(args=[self.vsftpd, '-v'],
                                       capture_output=True, text=True, stdin=tmp)
                    if p.returncode != 0:
                        # not a working vsftpd
                        raise RuntimeError(f'{self.vsftpd} -v failed with exit code: {p.returncode}')
                    if p.stderr:
                        ver_text = p.stderr
                    else:
                        # Oddly, some versions of vsftpd write to stdin (!)
                        # instead of stderr, which is odd but works. If there
                        # is nothing on stderr, read the file on stdin and use
                        # any data there instead.
                        tmp.seek(0)
                        ver_text = tmp.read()
                m = re.match(r'vsftpd: version (\d+\.\d+\.\d+)', ver_text)
                if m:
                    self._vsftpd_version = m.group(1)
                elif len(p.stderr) == 0:
                    # vsftp does not use stdout or stderr for printing its version... -.-
                    self._vsftpd_version = 'unknown'
                else:
                    raise Exception(f'Unable to determine VsFTPD version from: {p.stderr}')
            except Exception:
                log.debug('checking vsftpd version', exc_info=True)
                self.vsftpd = None

        self._tcpdump = shutil.which('tcpdump')

    @property
    def httpd_version(self):
        """HTTPD_VERSION as reported by apxs, queried on first access.

        Stays None when apxs is not configured or the query fails.
        """
        if self._httpd_version is None and self.apxs is not None:
            try:
                p = subprocess.run(args=[self.apxs, '-q', 'HTTPD_VERSION'],
                                   capture_output=True, text=True)
                if p.returncode != 0:
                    log.error(f'{self.apxs} failed to query HTTPD_VERSION: {p}')
                else:
                    self._httpd_version = p.stdout.strip()
            except Exception:
                log.exception(f'{self.apxs} failed to run')
        return self._httpd_version

    def versiontuple(self, v):
        """Convert a dotted version string into a tuple of ints.

        A '-suffix' after the version is dropped. When the remainder does
        not parse (e.g. '1.1.1w'), the leading numeric part is used.
        Raises ValueError when `v` does not start with a number.
        """
        v = re.sub(r'(\d+\.\d+(\.\d+)?)(-\S+)?', r'\1', v)
        try:
            return tuple(map(int, v.split('.')))
        except ValueError:
            # fall back to the leading numeric part only
            m = re.match(r'\s*(\d+(?:\.\d+)*)', v)
            if m is None:
                raise
            return tuple(map(int, m.group(1).split('.')))

    def httpd_is_at_least(self, minv):
        """Return True when the httpd version is known and >= `minv`."""
        if self.httpd_version is None:
            return False
        hv = self.versiontuple(self.httpd_version)
        return hv >= self.versiontuple(minv)

    def caddy_is_at_least(self, minv):
        """Return True when the caddy version is known and >= `minv`."""
        if self.caddy_version is None:
            return False
        hv = self.versiontuple(self.caddy_version)
        return hv >= self.versiontuple(minv)

    def is_complete(self) -> bool:
        """Return True when both httpd and apxs exist as files."""
        return os.path.isfile(self.httpd) and \
            self.apxs is not None and \
            os.path.isfile(self.apxs)

    def get_incomplete_reason(self) -> Optional[str]:
        """Describe what is missing for `is_complete()`, or return None."""
        if self.httpd is None or len(self.httpd.strip()) == 0:
            return 'httpd not configured, see `--with-test-httpd=<path>`'
        if not os.path.isfile(self.httpd):
            return f'httpd ({self.httpd}) not found'
        if self.apxs is None:
            return "command apxs not found (commonly provided in apache2-dev)"
        if not os.path.isfile(self.apxs):
            return f"apxs ({self.apxs}) not found"
        return None

    @property
    def nghttpx_version(self):
        return self._nghttpx_version

    @property
    def caddy_version(self):
        return self._caddy_version

    @property
    def vsftpd_version(self):
        return self._vsftpd_version

    @property
    def tcpdmp(self) -> Optional[str]:
        """Path of tcpdump as found by `shutil.which`, or None."""
        return self._tcpdump

    def clear_locks(self):
        """Remove the CA lock file below `gen_root`, if present."""
        ca_lock = os.path.join(self.gen_root, 'ca/ca.lock')
        try:
            os.remove(ca_lock)
        except FileNotFoundError:
            # nothing to clear, or already removed by someone else
            pass


class Env:
    """Test environment facade over a shared `EnvConfig`.

    Static helpers answer questions about curl and the servers from
    `Env.CONFIG`; instances additionally hold the test CA and timeouts.
    """

    SERVER_TIMEOUT = 30  # seconds to wait for server to come up/reload

    # created when the class is defined, which runs `curl -V`
    CONFIG = EnvConfig()

    @staticmethod
    def setup_incomplete() -> bool:
        return not Env.CONFIG.is_complete()

    @staticmethod
    def incomplete_reason() -> Optional[str]:
        return Env.CONFIG.get_incomplete_reason()

    @staticmethod
    def have_nghttpx() -> bool:
        return Env.CONFIG.nghttpx is not None

    @staticmethod
    def have_h3_server() -> bool:
        return Env.CONFIG.nghttpx_with_h3

    @staticmethod
    def have_ssl_curl() -> bool:
        return Env.curl_has_feature('ssl') or Env.curl_has_feature('multissl')

    @staticmethod
    def have_h2_curl() -> bool:
        return 'http2' in Env.CONFIG.curl_props['features']

    @staticmethod
    def have_h3_curl() -> bool:
        return 'http3' in Env.CONFIG.curl_props['features']

    @staticmethod
    def curl_uses_lib(libname: str) -> bool:
        """Return True when `libname` is among the libs of `curl -V`."""
        return libname.lower() in Env.CONFIG.curl_props['libs']

    @staticmethod
    def curl_uses_any_libs(libs: List[str]) -> bool:
        """Return True when curl uses at least one of `libs`."""
        curl_libs = Env.CONFIG.curl_props['libs']
        return any(libname.lower() in curl_libs for libname in libs)

    @staticmethod
    def curl_uses_ossl_quic() -> bool:
        """True when curl has http3 and uses nghttp3 but not ngtcp2."""
        if Env.have_h3_curl():
            return not Env.curl_uses_lib('ngtcp2') and Env.curl_uses_lib('nghttp3')
        return False

    @staticmethod
    def curl_version_string() -> str:
        return Env.CONFIG.curl_props['version_string']

    @staticmethod
    def curl_features_string() -> str:
        return Env.CONFIG.curl_props['features_string']

    @staticmethod
    def curl_has_feature(feature: str) -> bool:
        return feature.lower() in Env.CONFIG.curl_props['features']

    @staticmethod
    def curl_protocols_string() -> str:
        return Env.CONFIG.curl_props['protocols_string']

    @staticmethod
    def curl_has_protocol(protocol: str) -> bool:
        return protocol.lower() in Env.CONFIG.curl_props['protocols']

    @staticmethod
    def curl_lib_version(libname: str) -> str:
        """Return the version part of '<libname>/<version>' or 'unknown'."""
        prefix = f'{libname.lower()}/'
        for lversion in Env.CONFIG.curl_props['lib_versions']:
            if lversion.startswith(prefix):
                return lversion[len(prefix):]
        return 'unknown'

    @staticmethod
    def curl_lib_version_at_least(libname: str, min_version) -> bool:
        """True when curl uses `libname` in version >= `min_version`.

        Returns False when the library version is unknown.
        """
        lversion = Env.curl_lib_version(libname)
        if lversion != 'unknown':
            return Env.CONFIG.versiontuple(min_version) <= \
                   Env.CONFIG.versiontuple(lversion)
        return False

    @staticmethod
    def curl_lib_version_before(libname: str, lib_version) -> bool:
        """True when curl uses `libname` in a version < `lib_version`.

        Returns False when the library version is unknown.
        """
        lversion = Env.curl_lib_version(libname)
        if lversion != 'unknown':
            # compare on the leading x.y.z only, ignoring any suffix
            if m := re.match(r'(\d+\.\d+\.\d+).*', lversion):
                lversion = m.group(1)
            return Env.CONFIG.versiontuple(lib_version) > \
                Env.CONFIG.versiontuple(lversion)
        return False

    @staticmethod
    def curl_os() -> str:
        return Env.CONFIG.curl_props['os']

    @staticmethod
    def curl_fullname() -> str:
        return Env.CONFIG.curl_props['fullname']

    @staticmethod
    def curl_version() -> str:
        return Env.CONFIG.curl_props['version']

    @staticmethod
    def curl_is_debug() -> bool:
        return Env.CONFIG.curl_is_debug

    @staticmethod
    def curl_can_early_data() -> bool:
        return Env.curl_uses_any_libs(['gnutls', 'wolfssl', 'quictls', 'openssl'])

    @staticmethod
    def curl_can_h3_early_data() -> bool:
        return Env.curl_can_early_data() and \
            Env.curl_uses_lib('ngtcp2')

    @staticmethod
    def have_h3() -> bool:
        return Env.have_h3_curl() and Env.have_h3_server()

    @staticmethod
    def httpd_version() -> Optional[str]:
        return Env.CONFIG.httpd_version

    @staticmethod
    def nghttpx_version() -> Optional[str]:
        return Env.CONFIG.nghttpx_version

    @staticmethod
    def caddy_version() -> Optional[str]:
        return Env.CONFIG.caddy_version

    @staticmethod
    def caddy_is_at_least(minv) -> bool:
        return Env.CONFIG.caddy_is_at_least(minv)

    @staticmethod
    def httpd_is_at_least(minv) -> bool:
        return Env.CONFIG.httpd_is_at_least(minv)

    @staticmethod
    def has_caddy() -> bool:
        return Env.CONFIG.caddy is not None

    @staticmethod
    def has_vsftpd() -> bool:
        return Env.CONFIG.vsftpd is not None

    @staticmethod
    def vsftpd_version() -> Optional[str]:
        return Env.CONFIG.vsftpd_version

    @staticmethod
    def tcpdump() -> Optional[str]:
        return Env.CONFIG.tcpdmp

    def __init__(self, pytestconfig=None, env_config=None):
        """Create an environment; a given `env_config` replaces Env.CONFIG.

        The test timeout is 300 seconds when pytest verbosity is above 1,
        otherwise 60 seconds.
        """
        if env_config:
            Env.CONFIG = env_config
        self._verbose = pytestconfig.option.verbose \
            if pytestconfig is not None else 0
        self._ca = None
        self._test_timeout = 300.0 if self._verbose > 1 else 60.0  # seconds

    def issue_certs(self):
        """Create the test CA and issue the configured certificates once.

        Runs under a file lock in the CA directory below `gen_root`.
        """
        if self._ca is None:
            ca_dir = os.path.join(self.CONFIG.gen_root, 'ca')
            os.makedirs(ca_dir, exist_ok=True)
            lock_file = os.path.join(ca_dir, 'ca.lock')
            with FileLock(lock_file):
                self._ca = TestCA.create_root(name=self.CONFIG.tld,
                                              store_dir=ca_dir,
                                              key_type="rsa2048")
                self._ca.issue_certs(self.CONFIG.cert_specs)

    def setup(self):
        """Create the generated directories and issue certificates."""
        os.makedirs(self.gen_dir, exist_ok=True)
        os.makedirs(self.htdocs_dir, exist_ok=True)
        self.issue_certs()

    def get_credentials(self, domain) -> Optional[Credentials]:
        """Return the first credentials the CA has for `domain`, or None."""
        creds = self.ca.get_credentials_for_name(domain)
        if len(creds) > 0:
            return creds[0]
        return None

    @property
    def verbose(self) -> int:
        return self._verbose

    @property
    def test_timeout(self) -> Optional[float]:
        return self._test_timeout

    @test_timeout.setter
    def test_timeout(self, val: Optional[float]):
        self._test_timeout = val

    @property
    def gen_dir(self) -> str:
        return self.CONFIG.gen_dir

    @property
    def gen_root(self) -> str:
        return self.CONFIG.gen_root

    @property
    def project_dir(self) -> str:
        return self.CONFIG.project_dir

    @property
    def build_dir(self) -> str:
        return self.CONFIG.build_dir

    @property
    def ca(self):
        """The test CA; None until `issue_certs()` has run."""
        return self._ca

    @property
    def htdocs_dir(self) -> str:
        return self.CONFIG.htdocs_dir

    @property
    def tld(self) -> str:
        return self.CONFIG.tld

    @property
    def domain1(self) -> str:
        return self.CONFIG.domain1

    @property
    def domain1brotli(self) -> str:
        return self.CONFIG.domain1brotli

    @property
    def domain2(self) -> str:
        return self.CONFIG.domain2

    @property
    def ftp_domain(self) -> str:
        return self.CONFIG.ftp_domain

    @property
    def proxy_domain(self) -> str:
        return self.CONFIG.proxy_domain

    @property
    def expired_domain(self) -> str:
        return self.CONFIG.expired_domain

    @property
    def ports(self) -> Dict[str, int]:
        return self.CONFIG.ports

    def update_ports(self, ports: Dict[str, int]):
        """Merge `ports` into the shared port table."""
        self.CONFIG.ports.update(ports)

    @property
    def http_port(self) -> int:
        """The 'http' port, 0 when none has been registered."""
        return self.CONFIG.ports.get('http', 0)

    @property
    def https_port(self) -> int:
        return self.CONFIG.ports['https']

    @property
    def https_only_tcp_port(self) -> int:
        return self.CONFIG.ports['https-tcp-only']

    @property
    def nghttpx_https_port(self) -> int:
        return self.CONFIG.ports['nghttpx_https']

    @property
    def h3_port(self) -> int:
        return self.https_port

    @property
    def proxy_port(self) -> int:
        return self.CONFIG.ports['proxy']

    @property
    def proxys_port(self) -> int:
        return self.CONFIG.ports['proxys']

    @property
    def ftp_port(self) -> int:
        return self.CONFIG.ports['ftp']

    @property
    def ftps_port(self) -> int:
        return self.CONFIG.ports['ftps']

    @property
    def h2proxys_port(self) -> int:
        return self.CONFIG.ports['h2proxys']

    def pts_port(self, proto: str = 'http/1.1') -> int:
        # proxy tunnel port
        return self.CONFIG.ports['h2proxys' if proto == 'h2' else 'proxys']

    @property
    def caddy(self) -> Optional[str]:
        return self.CONFIG.caddy

    @property
    def caddy_https_port(self) -> int:
        return self.CONFIG.ports['caddys']

    @property
    def caddy_http_port(self) -> int:
        return self.CONFIG.ports['caddy']

    @property
    def vsftpd(self) -> Optional[str]:
        return self.CONFIG.vsftpd

    @property
    def ws_port(self) -> int:
        return self.CONFIG.ports['ws']

    @property
    def curl(self) -> str:
        return self.CONFIG.curl

    @property
    def httpd(self) -> str:
        return self.CONFIG.httpd

    @property
    def apxs(self) -> Optional[str]:
        return self.CONFIG.apxs

    @property
    def nghttpx(self) -> Optional[str]:
        return self.CONFIG.nghttpx

    @property
    def slow_network(self) -> bool:
        """True when a CURL_DBG_SOCK_W* variable is set in the environment."""
        return "CURL_DBG_SOCK_WBLOCK" in os.environ or \
               "CURL_DBG_SOCK_WPARTIAL" in os.environ

    @property
    def ci_run(self) -> bool:
        return "CURL_CI" in os.environ

    def port_for(self, alpn_proto: Optional[str] = None):
        """Return the port to use for the given ALPN protocol.

        None and the HTTP/2, 1.x and 0.9 protocols map to the https port,
        'h3' to the h3 port and anything else to the http port.
        """
        if alpn_proto is None or \
                alpn_proto in ['h2', 'http/1.1', 'http/1.0', 'http/0.9']:
            return self.https_port
        if alpn_proto in ['h3']:
            return self.h3_port
        return self.http_port

    def authority_for(self, domain: str, alpn_proto: Optional[str] = None):
        """Return '<domain>:<port>' for the given ALPN protocol."""
        return f'{domain}:{self.port_for(alpn_proto=alpn_proto)}'

    def make_data_file(self, indir: str, fname: str, fsize: int,
                       line_length: int = 1024) -> str:
        """Write a text file of `fsize` characters made of numbered lines.

        Each full line is `line_length` characters including the newline:
        a 9-digit counter, '-', and repeated digits. A shorter last line
        fills up the remainder. Returns the path of the file.
        Raises RuntimeError when `line_length` is below 11.
        """
        if line_length < 11:
            raise RuntimeError('line_length less than 11 not supported')
        fpath = os.path.join(indir, fname)
        s10 = "0123456789"
        s = round((line_length / 10) + 1) * s10
        # leave room for the 9-digit counter, the '-' and the newline
        s = s[0:line_length-11]
        with open(fpath, 'w') as fd:
            for i in range(int(fsize / line_length)):
                fd.write(f"{i:09d}-{s}\n")
            remain = int(fsize % line_length)
            if remain != 0:
                i = int(fsize / line_length) + 1
                fd.write(f"{i:09d}-{s}"[0:remain-1] + "\n")
        return fpath

    def make_data_gzipbomb(self, indir: str, fname: str, fsize: int) -> str:
        """Create `fname`, `fname`.gz and `fname`.var in `indir`.

        The plain file holds a short text, the .gz file holds `fsize`
        (rounded down to full KiB) zero bytes compressed, and the .var
        file lists both URIs, the second with Content-Encoding x-gzip.
        Returns the path of the plain file.
        """
        fpath = os.path.join(indir, fname)
        gzpath = f'{fpath}.gz'
        varpath = f'{fpath}.var'

        with open(fpath, 'w') as fd:
            fd.write('not what we are looking for!\n')
        count = int(fsize / 1024)
        zero1k = bytearray(1024)
        with gzip.open(gzpath, 'wb') as fd:
            for _ in range(count):
                fd.write(zero1k)
        with open(varpath, 'w') as fd:
            fd.write(f'URI: {fname}\n')
            fd.write('\n')
            fd.write(f'URI: {fname}.gz\n')
            fd.write('Content-Type: text/plain\n')
            fd.write('Content-Encoding: x-gzip\n')
            fd.write('\n')
        return fpath