config.py (18024B)
#!/usr/bin/env python3
"""Python script to ask questions using an interactive prompt"""

import base64
import getpass
import os
import re
import subprocess
import urllib.parse
import uuid
from base64 import b64decode
from typing import Callable, Dict, TypeVar

import argon2
from Crypto.Cipher import ChaCha20_Poly1305
from Crypto.Hash import SHA512
from Crypto.Protocol.KDF import PBKDF2
from Crypto.Random import get_random_bytes

# Early exit if already loaded
if os.environ.get("CONFIG_LOADED") == "y":
    # SystemExit is raised directly: the `exit` builtin is only injected by
    # the `site` module and is not guaranteed to exist.
    raise SystemExit(0)

# Unbuffered binary log shared by the prompt and command helpers
log = open("setup.log", "ab", buffering=0)
CONFIG_FILE = "config/user.conf"
BIC_PATTERN = re.compile("[A-Z0-9]{4}[A-Z]{2}[A-Z0-9]{2}(?:[A-Z0-9]{3})?")
IBAN_PATTERN = re.compile("[A-Z]{2}[0-9]{2}[A-Z0-9]{,28}")


def load_conf() -> Dict[str, str]:
    """
    Load user configuration file

    Each non-blank line is parsed as ``KEY=VALUE``. A value wrapped in double
    or single quotes has exactly one quote removed from each end before the
    escapes written by :func:`store_conf` are undone.

    :return: mapping of configuration names to their unquoted values
    :raises ValueError: if a non-blank line has no ``=`` separator
    """
    conf = {}
    with open(CONFIG_FILE, "r") as f:
        for lineno, line in enumerate(f.read().splitlines(), start=1):
            # Whitespace-only lines carry no entry
            if not line.strip():
                continue
            key, sep, value = line.partition("=")
            if not sep:
                raise ValueError(
                    f"{CONFIG_FILE}:{lineno}: expected KEY=VALUE, got {line!r}"
                )
            key = key.strip()
            value = value.strip()
            # Slice off one quote per side: str.strip() would also eat quotes
            # that belong to the value itself (e.g. a value ending with "'")
            if value.startswith('"') and value.endswith('"'):
                conf[key] = value[1:-1].replace('\\"', '"')
            elif value.startswith("'") and value.endswith("'"):
                conf[key] = value[1:-1].replace("'\\''", "'").replace("\\'", "'")
            else:
                conf[key] = value
    return conf


conf = load_conf()
result_conf = {**conf, "CONFIG_LOADED": "y"}


def store_conf():
    """
    Update the configuration file

    Every entry of ``conf`` is written as ``KEY='VALUE'`` with embedded single
    quotes escaped as ``'\\''``.
    """
    lines = []
    for key, value in conf.items():
        escaped = value.replace("'", "'\\''")
        lines.append(f"{key}='{escaped}'\n")
    with open(CONFIG_FILE, "w") as f:
        f.write("".join(lines))


def add_conf(name: str, value: str):
    """
    Update a user configuration value and update the configuration file

    :param name: configuration name
    :param value: value stored both in ``conf`` and in ``result_conf``
    """
    conf[name] = value
    result_conf[name] = value
    store_conf()


def run_cmd(
    cmd: list[str], input: str | None = None, env: Dict[str, str] | None = None
) -> int:
    """
    Run a command in a child process and return its exit code

    The combined stdout/stderr of the child is appended to the setup log and
    echoed to the terminal only when the command fails.

    :param cmd: program and arguments
    :param input: text sent to the child's stdin, stdin is /dev/null if absent
    :param env: environment of the child process
    :return: the exit code of the child process
    """
    result = subprocess.run(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        input=input.encode() if input is not None else None,
        stdin=subprocess.DEVNULL if input is None else None,
        env=env,
    )
    log.write(result.stdout)
    if result.returncode != 0:
        # Child output is arbitrary bytes: never crash while reporting it
        print(result.stdout.decode("utf-8", errors="replace"), end="")
    return result.returncode


def try_cmd(
    cmd: list[str], input: str | None = None, env: Dict[str, str] | None = None
) -> bool:
    """Run a command in a child process and return if successful"""
    return run_cmd(cmd, input, env) == 0


A = TypeVar("A")
T = TypeVar("T")


def conf_value(
    name: str | None,
    action: Callable[[], str | None],
    default: T | None = None,
    check: Callable[[str], T | None] = lambda it: it,
    fmt: Callable[[T], str] = lambda it: str(it),
) -> T:
    """
    Logic to configure a value

    :param name: if present will try to fetch the current value and will store the new value
    :param action: how a value will be obtained
    :param default: default value to use if no value is given
    :param check: check and normalize the value
    :param fmt: format value for storage
    :return: the configuration value
    """
    value = None

    # Fetch current value
    if name is not None:
        curr = conf.get(name)
        if curr is not None:
            # Check the current value and ask again if invalid
            value = check(curr)

    # Ask for a new value until we get a valid one
    while value is None:
        new = action()
        # Use default if no value was provided else check the new value
        value = check(new) if new is not None else default

    # Store the new value
    if name is not None:
        add_conf(name, fmt(value))

    return value


# (step, substep) counters used to number the prompts
progress = (1, 0)


def step():
    """Move to the next step and reset the substep counter"""
    global progress
    current_step, _ = progress
    progress = (current_step + 1, 0)


def substep():
    """Move to the next substep of the current step"""
    global progress
    current_step, current_substep = progress
    progress = (current_step, current_substep + 1)


def fmt_progress():
    """Format the current progress as a prompt prefix such as '3.' or '3.2.'"""
    (current_step, current_substep) = progress
    if current_substep == 0:
        return f"{current_step}."
    else:
        return f"{current_step}.{current_substep}."


def ask(
    name: str | None,
    msg: str,
    default: T | None = None,
    example: T | None = None,
    check: Callable[[str], T | None] = lambda it: it,
    fmt: Callable[[T], str] = lambda it: str(it),
    fmt_hint: Callable[[], str] | None = None,
    secret: bool = False,
) -> T:
    """
    Prompt the user to configure a value
    :param name: if present will try to fetch the current value and will store the new value
    :param msg: the message to prompt the user with
    :param default: default value to use if no value is obtained
    :param example: an example to use as a user hint
    :param check: check and normalize the value
    :param fmt: format value for storage
    :param fmt_hint: format value for input hint
    :param secret: hide the input content
    :return: the configuration value
    """

    if fmt_hint is None:
        # A default is never shown for secret values
        if default is not None and not secret:
            hint = f" (def. '{default}')"
        elif example is not None:
            hint = f" (e.g. '{example}')"
        else:
            hint = ""
    else:
        hint = fmt_hint()

    # Multi-line messages bring their own prompt terminator
    end = ": " if "\n" not in msg else ""

    fmt_msg = f"{fmt_progress()} {msg}{hint}{end}"
    substep()

    def do_ask() -> str | None:
        # Log the prompt
        log.write(fmt_msg.encode() + "\n".encode())
        # Actual prompt
        if secret:
            raw = getpass.getpass(fmt_msg).strip()
        else:
            print(fmt_msg, end="")
            raw = input().strip()
        if raw == "":
            # An empty answer falls back to the default when there is one
            if default is None:
                print("You must enter a value")
            return None
        return raw

    return conf_value(name, do_ask, default, check, fmt)


def ask_str(
    name: str | None,
    msg: str,
    default: str | None = None,
    example: str | None = None,
    secret: bool = False,
) -> str:
    """Prompt the user to configure a string"""
    return ask(name, msg, default, example=example, secret=secret)


def ask_bank_name(
    name: str | None, msg: str, test: bool, default: str | None = None
) -> str:
    """
    Prompt the user to configure a bank name

    :param test: when true the name must contain 'test' (case-insensitive)
    """

    def check_bank_name(raw: str) -> str | None:
        if test and "test" not in raw.lower():
            print("Test bank name must contain 'Test'")
            return None
        return raw

    return ask(name, msg, default, check=check_bank_name)


def ask_bic(
    name: str | None, msg: str, default: str | None = None, example: str | None = None
) -> str:
    """Prompt the user to configure a BIC"""

    def check_bic(raw: str) -> str | None:
        # Drop the spaces and dashes users commonly type as separators
        raw = raw.translate({ord(i): None for i in " -"})
        if not BIC_PATTERN.fullmatch(raw):
            print("Invalid BIC")
            return None
        return raw

    return ask(name, msg, default, example, check=check_bic)


def ask_iban(
    name: str | None, msg: str, default: str | None = None, example: str | None = None
) -> str:
    """Prompt the user to configure a IBAN"""

    def check_iban(raw: str) -> str | None:
        # Drop the spaces and dashes users commonly type as separators
        raw = raw.translate({ord(i): None for i in " -"})
        if not IBAN_PATTERN.fullmatch(raw):
            print("Invalid IBAN")  # Checksum check ?
            return None
        return raw

    return ask(name, msg, default, example, check=check_iban)


def ask_currency(
    name: str, msg: str, default: str | None = None, example: str | None = None
) -> str:
    """Prompt the user to configure a currency name"""

    def check_currency(currency: str) -> str | None:
        currency = currency.upper()
        if not all(c.isascii() and c.isalpha() for c in currency):
            print("The currency name must be an ASCII alphabetic string")
        elif len(currency) < 3 or 11 < len(currency):
            print("The currency name had to be between 3 and 11 characters long")
        else:
            return currency
        return None

    return ask(name, msg, default, example, check=check_currency)


def ask_host(
    name: str, msg: str, default: str | None = None, example: str | None = None
) -> str:
    """
    Prompt the user to configure the installation hostname

    The host is accepted only if the 'backend', 'bank' and 'exchange'
    subdomains all answer a single ping.
    """

    def check_host(host: str) -> str | None:
        success = True
        # Every subdomain is pinged even after a failure so that the output
        # of all failing commands is shown
        for subdomain in ["backend", "bank", "exchange"]:
            success = try_cmd(["ping", "-c", "1", f"{subdomain}.{host}"]) and success
        return host if success else None

    return ask(name, msg, default, example, check=check_host)


def ask_terms(name: str, msg: str, kind: str) -> str:
    """
    Prompt the user to select a ToS/privacy policy

    :param name: configuration name under which the chosen path is stored
    :param msg: prompt header, the list of example files is appended to it
    :param kind: substring an example '.rst' file name must contain to be listed
    :return: path of an existing file
    """

    # msg = "9.1. Enter the filename of the ToS. Some available options are:\n"
    tos_msg = msg

    # Recollect example ToS files
    tos_path = "/usr/share/taler-exchange/terms"
    try:
        # Sorted so the options are listed in a stable order
        candidates = sorted(os.listdir(tos_path))
    except FileNotFoundError:
        # The examples are only hints: the user can still enter any path
        candidates = []
    for f in candidates:
        tos_file = os.path.join(tos_path, f)
        if os.path.isfile(tos_file) and f.endswith(".rst") and kind in f:
            tos_msg += f"- {tos_file}\n"

    tos_msg += "=> "

    def check_file(path: str) -> str | None:
        if not os.path.isfile(path):
            print("Not a file")
            return None
        return path

    return ask(name, tos_msg, check=check_file)


def ask_yes_no(name: str | None, msg: str, default: bool | None = None) -> bool:
    """
    Prompt the user to configure a boolean

    The value is stored as 'y' or 'n'.
    """

    def check_yes_no(raw: str) -> bool | None:
        raw = raw.lower()
        if raw == "y" or raw == "yes":
            return True
        elif raw == "n" or raw == "no":
            return False
        else:
            print("Expected 'y' or 'n'")
            return None

    def fmt_yes_no_default() -> str:
        # The capital letter marks the answer used on empty input
        if default is None:
            return " (y/n)"
        elif default is True:
            return " (Y/n)"
        else:
            return " (y/N)"

    return ask(
        name,
        msg,
        default,
        check=check_yes_no,
        fmt=lambda it: "y" if it else "n",
        fmt_hint=fmt_yes_no_default,
    )


# ----- Crypto ----- #

_SECRET_KDF_ROUNDS = 1000000
# Prefix identifying values encrypted by ask_secret
_SECRET_METHOD = f"$pbkdf2_sha512_chacha20_poly1305${_SECRET_KDF_ROUNDS}$"


def ask_config_password() -> str:
    """
    Prompt the user to configure a password stored hashed with argon2id

    If a hash is already stored the user is asked again until the password
    verifies against it, otherwise a new password is chosen and its hash is
    stored.

    :return: the clear text password
    """
    ph = argon2.PasswordHasher()
    stored_hash = conf.get("CONFIG_PASSWORD")
    passwd = None
    if stored_hash is not None:
        while True:
            passwd = ask_str(None, "Enter the config password", secret=True)
            try:
                ph.verify(stored_hash, passwd)
                break
            except argon2.exceptions.VerifyMismatchError:
                print("invalid password")
    else:
        passwd = ask_str(None, "Choose a config password", secret=True)

    if stored_hash is None or ph.check_needs_rehash(stored_hash):
        add_conf("CONFIG_PASSWORD", ph.hash(passwd))

    return passwd


def _derive_secret_key(passwd: str, salt: bytes) -> bytes:
    """Derive the 32 bytes encryption key from the config password"""
    return PBKDF2(
        passwd, salt, 32, count=_SECRET_KDF_ROUNDS, hmac_hash_module=SHA512
    )


def _encrypt_secret(name: str, passwd: str, plaintext: str) -> str:
    """Encrypt a value into its '$method$rounds$salt$nonce$tag$ciphertext' storage form"""
    salt = get_random_bytes(16)
    cipher = ChaCha20_Poly1305.new(key=_derive_secret_key(passwd, salt))
    # The configuration name is authenticated so a ciphertext cannot be moved
    # to another key
    cipher.update(name.encode())
    ciphertext, tag = cipher.encrypt_and_digest(plaintext.encode())
    parts = (salt, cipher.nonce, tag, ciphertext)
    return _SECRET_METHOD + "$".join(base64.b64encode(it).decode() for it in parts)


def _decrypt_secret(name: str, passwd: str, stored: str) -> str:
    """Decrypt and authenticate a value produced by _encrypt_secret"""
    salt, nonce, tag, ciphertext = [
        b64decode(it) for it in stored.removeprefix(_SECRET_METHOD).split("$", 3)
    ]
    cipher = ChaCha20_Poly1305.new(key=_derive_secret_key(passwd, salt), nonce=nonce)
    cipher.update(name.encode())
    return cipher.decrypt_and_verify(ciphertext, tag).decode()


def ask_secret(
    name: str, msg: str, passwd: str | None, default: str | None = None
) -> str:
    """
    Prompt the user to configure a string stored encrypted using pbkdf2_sha512 and chacha20_poly1305

    :param name: configuration name, also used as authenticated data
    :param msg: the message to prompt the user with
    :param passwd: config password, the value is stored in clear if absent
    :param default: default value to use if no value is obtained
    :return: the clear text value, also exported through ``result_conf``
    """
    if passwd is None:
        return ask_str(name, msg, default, secret=True)

    raw = conf.get(name)
    if raw is None:
        # Nothing stored yet: ask, then store encrypted
        plaintext = ask_str(None, msg, default, secret=True)
        add_conf(name, _encrypt_secret(name, passwd, plaintext))
    elif raw.startswith(_SECRET_METHOD):
        plaintext = _decrypt_secret(name, passwd, raw)
    else:
        # Value stored in clear by a run without encryption: keep it as the
        # result and migrate the stored form
        plaintext = raw
        add_conf(name, _encrypt_secret(name, passwd, plaintext))

    # add_conf exported the encrypted form, callers need the clear text
    result_conf[name] = plaintext
    return plaintext


# ----- Prompt ----- #

config_passwd = (
    ask_config_password()
    if ask_yes_no(
        "DO_CONFIG_ENCRYPTION",
        "Do you want to encrypt sensitive config values",
        True,
    )
    else None
)

step()
ask_currency(
    "CURRENCY",
    "Enter the name of the regional currency",
    default="NETZBON",
)

step()
do_conversion = ask_yes_no(
    "DO_CONVERSION",
    "Do you want setup regional currency conversion to fiat currency",
    True,
)
if do_conversion:
    ask_currency(
        "FIAT_CURRENCY",
        "Enter the name of the fiat currency",
        default="CHF",
    )
    ask_str(
        "FIAT_BANK_NAME", "Enter the name of your fiat bank", example="POSTFINANCE AG"
    )
    iban = ask_iban(
        "FIAT_ACCOUNT_IBAN",
        "Enter the IBAN of your fiat bank account",
        example="CH7789144474425692816",
    )
    bic = ask_bic(
        "FIAT_ACCOUNT_BIC",
        "Enter the BIC of your fiat bank account",
        example="POFICHBEXXX",
    )
    name = ask_str(
        "FIAT_ACCOUNT_NAME", "Enter the legal name of your fiat bank account"
    )
    params = urllib.parse.urlencode({"receiver-name": name})
    add_conf("CONVERSION_PAYTO", f"payto://iban/{bic}/{iban}?{params}")

step()
do_test = ask_yes_no(
    "DO_TEST",
    "Do you want setup a testing deployment",
    False,
)

step()
bank_name = ask_bank_name(
    "BANK_NAME",
    "Enter the human-readable name of the bank",
    test=do_test,
    default="Taler Bank" if not do_test else "Taler Test Bank",
)

step()
ask_host("DOMAIN_NAME", "Enter the domain name", example="example.com")

step()
if ask_yes_no("ENABLE_TLS", "Setup TLS using Let's Encrypt?", True):
    ask_str("TLS_EMAIL", "Enter an email address for Let's Encrypt")

    def ask_tos():
        """Ask for the agreement to the Let's Encrypt ToS, None if refused"""
        print(
            "Please read the Terms of Service at https://letsencrypt.org/documents/LE-SA-v1.3-September-21-2022.pdf."
        )
        if not ask_yes_no(
            None,
            "You must agree in order to register with the ACME server. Do you agree?",
            False,
        ):
            print("You must agree in order to register with the ACME server")
            return None
        else:
            return "y"

    conf_value("TLS_TOS", ask_tos)
    add_conf("PROTO", "https")
else:
    add_conf("PROTO", "http")

add_conf(
    "DO_OFFLINE", "y"
)  # TODO support offline setup again when the documentation is ready

step()
if ask_yes_no(
    "DO_TELESIGN",
    "Setup SMS two-factor authentication using Telesign https://www.telesign.com?",
    True,
):

    def ask_telesign():
        """Ask for Telesign credentials and check them by sending a test SMS"""
        customer_id = ask_str(None, "Enter your Telesign Customer ID")
        api_key = ask_str(None, "Enter your Telesign API Key")
        phone_number = ask_str(
            None, "Enter a phone number to test your API key", example="+447911123456"
        )
        auth_token = base64.b64encode(f"{customer_id}:{api_key}".encode()).decode()
        if not try_cmd(
            ["libeufin-tan-sms.sh", phone_number],
            f"T-12345 is your verification code for {bank_name} setup",
            {**os.environ, "TELESIGN_AUTH_TOKEN": auth_token},
        ):
            print(
                "Failed to send an SMS using Telesign API, check your credentials and phone number"
            )
            return None
        code = ask_str(None, f"Enter the code received by {phone_number}")
        if code != "12345" and code != "T-12345":
            print(
                f"Wrong code got '{code}' expected '12345', check your credentials and phone number"
            )
            return None
        return auth_token

    conf_value("TELESIGN_AUTH_TOKEN", ask_telesign)

step()
generated_password = str(uuid.uuid4())
admin_password = ask_secret(
    "BANK_ADMIN_PASSWORD",
    "Enter the admin password for the bank (or press enter to autogenerate password)",
    config_passwd,
    generated_password,
)
add_conf(
    "BANK_ADMIN_PASSWORD_GENERATED",
    "y" if generated_password == admin_password else "n",
)

step()
if ask_yes_no(
    "DO_EXCHANGE_TERMS",
    "Do you wish to configure terms of service (ToS) for the exchange?",
    True,
):
    ask_terms(
        "EXCHANGE_TERMS_FILE",
        "Enter the filename of the ToS. Some available options are:\n",
        "-tos-",
    )

step()
if ask_yes_no(
    "DO_EXCHANGE_PRIVACY",
    "Do you wish to configure a privacy policy for the exchange?",
    True,
):
    ask_terms(
        "EXCHANGE_PRIVACY_FILE",
        "Enter the filename of the privacy policy. Some available options are:\n",
        "-pp-",
    )

# Update the on disk format even if nothing has changed
store_conf()

# ----- Return conf ----- #

# The resulting configuration is written as shell 'export' statements on
# file descriptor 3
export_lines = []
for key, value in result_conf.items():
    escaped = value.replace("'", "'\\''")
    export_lines.append(f"export {key}='{escaped}'\n")
with os.fdopen(3, "w") as f:
    f.write("".join(export_lines))