#!/usr/bin/env python3 """Python script to ask questions using an interactive prompt""" import base64 import os import re import subprocess import urllib.parse import uuid from base64 import b64decode, b64encode from typing import Callable, Dict, TypeVar import argon2 from Crypto.Cipher import ChaCha20_Poly1305 from Crypto.Hash import SHA512 from Crypto.Protocol.KDF import PBKDF2 from Crypto.Random import get_random_bytes # Early exit if already loaded if os.environ.get("CONFIG_LOADED") == "y": exit(0) log = open("setup.log", "ab", buffering=0) CONFIG_FILE = "config/user.conf" BIC_PATTERN = re.compile("[A-Z0-9]{4}[A-Z]{2}[A-Z0-9]{2}(?:[A-Z0-9]{3})?") IBAN_PATTERN = re.compile("[A-Z]{2}[0-9]{2}[A-Z0-9]{,28}") def load_conf() -> Dict[str, str]: """Load user configuration file""" conf = {} with open(CONFIG_FILE, "r") as f: for kv in f.read().splitlines(): if len(kv) != 0: [k, v] = [part.strip() for part in kv.split("=", 1)] if v.startswith('"') and v.endswith('"'): conf[k] = v.strip('"').replace('\\"', '"') elif v.startswith("'") and v.endswith("'"): conf[k] = v.strip("'").replace("'\\''", "'").replace("\\'", "'") else: conf[k] = v return conf conf = load_conf() result_conf = {**conf, "CONFIG_LOADED": "y"} def add_conf(name: str, value: str): """Update a user configuration value and update the configuration file""" conf[name] = value result_conf[name] = value content = "" for key, value in conf.items(): escaped = value.replace("'", "'\\''") content += f"export {key}='{escaped}'\n" with open(CONFIG_FILE, "w") as f: f.write(content) def run_cmd( cmd: list[str], input: str | None = None, env: Dict[str, str] | None = None ) -> int: """Run a command in a child process and return its exit code""" result = subprocess.run( cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, input=input.encode() if input is not None else None, stdin=subprocess.DEVNULL if input is None else None, env=env, ) log.write(result.stdout) if result.returncode != 0: print(result.stdout.decode("utf-8"), end="") return result.returncode def try_cmd( cmd: list[str], input: str | None = None, env: Dict[str, str] | None = None ) -> bool: """Run a command in a child process and return if successful""" return run_cmd(cmd, input, env) == 0 A = TypeVar("A") T = TypeVar("T") def conf_value( name: str | None, action: Callable[[], str | None], default: T | None = None, check: Callable[[str], T | None] = lambda it: it, fmt: Callable[[T], str] = lambda it: str(it), ) -> T: """ Logic to configure a value :param name: if present will try to fetch the current value and will store the new value :param action: how a value will be obtained :param default: default value to use if no value is given :param check: check and normalize the value :param fmt: format value for storage :return: the configuration value """ value = None # Fetch current value if name is not None: curr = conf.get(name) if curr is not None: # Check the current value and ask again if invalid value = check(curr) # Ask for a new value until we get a valid one while value is None: new = action() # Use default if no value was provided else check the new value value = check(new) if new is not None else default # Store the new value if name is not None: add_conf(name, fmt(value)) return value def ask( name: str | None, msg: str, default: T | None = None, check: Callable[[str], T | None] = lambda it: it, fmt: Callable[[T], str] = lambda it: str(it), ) -> T: """ Prompt the user to configurea value :param name: if present will try to fetch the current value and will store the new value :param msg: the message to prompt the user with :param default: default value to use if no value is obtained :param check: check and normalize the value :param fmt: format value for storage :return: the configuration value """ def do_ask() -> str | None: # Log the prompt log.write(msg.encode() + "\n".encode()) # Actual prompt raw = input(msg).strip() if raw == "": if default is None: print("You must enter a value") return None return raw return conf_value(name, do_ask, default, check, fmt) def ask_str(name: str | None, msg: str, default: str | None = None) -> str: "Prompt the user to configure a string" return ask(name, msg, default) def ask_bic(name: str | None, msg: str, default: str | None = None) -> str: "Prompt the user to configure a BIC" def check_bic(raw: str) -> str | None: raw = raw.translate({ord(i): None for i in " -"}) if not BIC_PATTERN.fullmatch(raw): print("Invalid BIC") return None else: return raw return ask(name, msg, default, check_bic) def ask_iban(name: str | None, msg: str, default: str | None = None) -> str: "Prompt the user to configure a IBAN" def check_iban(raw: str) -> str | None: raw = raw.translate({ord(i): None for i in " -"}) if not IBAN_PATTERN.fullmatch(raw): print("Invalid IBAN") # Checksum check ? return None else: return raw return ask(name, msg, default, check_iban) def ask_currency(name: str, msg: str, default: str | None = None) -> str: "Prompt the user to configure a currency name" def check_currency(currency: str) -> str | None: currency = currency.upper() if not all([c.isascii() and c.isalpha() for c in currency]): print("The currency name must be an ASCII alphabetic string") elif len(currency) < 3 or 11 < len(currency): print("The currency name had to be between 3 and 11 characters long") else: return currency return None return ask(name, msg, default, check_currency) def ask_host(name: str, msg: str, default: str | None = None) -> str: "Prompt the user to configure the installation hostname" def check_host(host: str) -> str | None: success = True for subdomain in ["backend", "bank", "exchange"]: success = try_cmd(["ping", "-c", "1", f"{subdomain}.{host}"]) and success if success: return host else: return None return ask(name, msg, default, check_host) def ask_terms(name: str, msg: str, kind: str) -> str: "Prompt the user to select a ToS/privacy policy" # msg = "9.1. Enter the filename of the ToS. Some available options are:\n" tos_msg = msg # Recollect example ToS files tos_path = "/usr/share/taler/terms" for f in os.listdir(tos_path): tos_file = os.path.join(tos_path, f) if os.path.isfile(tos_file) and f.endswith(".rst") and kind in f: tos_msg += f"- {tos_file}\n" tos_msg += "=> " def check_file(path: str) -> str | None: if not os.path.isfile(path): print("Not a file") # Checksum check ? return None else: return path return ask(name, tos_msg, None, check_file) def ask_yes_no(name: str | None, msg: str, default: bool | None = None) -> bool: "Prompt the user to configure a boolean" def check_yes_no(raw: str) -> bool | None: raw = raw.lower() if raw == "y" or raw == "yes": return True elif raw == "n" or raw == "no": return False else: print("Expected 'y' or 'n'") return None return ask(name, msg, default, check_yes_no, lambda it: "y" if it else "n") # ----- Crypto ----- # def ask_config_password() -> str: "Prompt the user to configure a password stored hashed with argon2id" ph = argon2.PasswordHasher() hash = conf.get("CONFIG_PASSWORD") passwd = None if hash is not None: while True: passwd = ask_str(None, "Enter the config password : ") try: ph.verify(hash, passwd) break except argon2.exceptions.VerifyMismatchError: print("invalid password") else: passwd = ask_str(None, "1.1 Choose a config password : ") if hash is None or ph.check_needs_rehash(hash): add_conf("CONFIG_PASSWORD", ph.hash(passwd)) return passwd def ask_secret( name: str, msg: str, passwd: str | None, default: str | None = None ) -> str: "Prompt the user to configure a string stored encryped using pbkdf2_sha512 and chacha20_poly1305" if passwd is None: return ask_str(name, msg, default) else: raw = conf.get(name) plaintext = None if raw is not None: method = "$pbkdf2_sha512_chacha20_poly1305$1000000$" if raw.startswith(method): salt, nonce, tag, ciphertext = [ b64decode(it) for it in raw.removeprefix(method).split("$", 3) ] key = PBKDF2(passwd, salt, 32, count=1000000, hmac_hash_module=SHA512) cipher = ChaCha20_Poly1305.new(key=key, nonce=nonce) cipher.update(name.encode()) plaintext = cipher.decrypt_and_verify(ciphertext, tag).decode() else: salt = get_random_bytes(16) key = PBKDF2(passwd, salt, 32, count=1000000, hmac_hash_module=SHA512) cipher = ChaCha20_Poly1305.new(key=key) cipher.update(name.encode()) ciphertext, tag = cipher.encrypt_and_digest(raw.encode()) add_conf( name, f"$pbkdf2_sha512_chacha20_poly1305$1000000${base64.b64encode(salt).decode()}${base64.b64encode(cipher.nonce).decode()}${base64.b64encode(tag).decode()}${base64.b64encode(ciphertext).decode()}", ) else: plaintext = ask_str(None, msg, default) salt = get_random_bytes(16) key = PBKDF2(passwd, salt, 32, count=1000000, hmac_hash_module=SHA512) cipher = ChaCha20_Poly1305.new(key=key) cipher.update(name.encode()) ciphertext, tag = cipher.encrypt_and_digest(plaintext.encode()) add_conf( name, f"$pbkdf2_sha512_chacha20_poly1305$1000000${base64.b64encode(salt).decode()}${base64.b64encode(cipher.nonce).decode()}${base64.b64encode(tag).decode()}${base64.b64encode(ciphertext).decode()}", ) result_conf[name] = plaintext return plaintext # ----- Prompt ----- # config_passwd = ( ask_config_password() if ask_yes_no( "DO_CONFIG_ENCRYPTION", "1. Do you want to encrypt sensitive config values (Y/n): ", True, ) else None ) ask_currency( "CURRENCY", "2. Enter the name of the regional currency (e.g. 'NETZBON'): ", "NETZBON", ) do_conversion = ask_yes_no( "DO_CONVERSION", "3. Do you want setup regional currency conversion to fiat currency (Y/n): ", True, ) if do_conversion: ask_currency( "FIAT_CURRENCY", "3.1. Enter the name of the fiat currency (e.g. 'CHF'): ", "CHF", ) ask_str( "FIAT_BANK_NAME", "3.2. Enter the name of your fiat bank (e.g. POSTFINANCE AG): ", ) iban = ask_iban( "FIAT_ACCOUNT_IBAN", "3.3. Enter the IBAN of your fiat bank account (e.g. 'CH7789144474425692816'): ", ) bic = ask_bic( "FIAT_ACCOUNT_BIC", "3.4. Enter the BIC of your fiat bank account (e.g. 'POFICHBEXXX'): ", ) name = ask_str( "FIAT_ACCOUNT_NAME", "3.5. Enter the legal name of your fiat bank account: " ) params = urllib.parse.urlencode({"receiver-name": name}) add_conf("CONVERSION_PAYTO", f"payto://iban/{bic}/{iban}?{params}") bank_name = ask_str( "BANK_NAME", "4. Enter the human-readable name of the bank (e.g. 'Taler Bank'): ", "Taler Bank", ) ask_host("DOMAIN_NAME", "5. Enter the domain name (e.g. 'example.com'): ") if ask_yes_no("ENABLE_TLS", "6. Setup TLS using Let's Encrypt? (Y/n): ", True): ask_str("TLS_EMAIL", "6.1. Enter an email address for Let's Encrypt: ") def ask_tos(): print( "6.2. Please read the Terms of Service at https://letsencrypt.org/documents/LE-SA-v1.3-September-21-2022.pdf." ) if not ask_yes_no( None, "6.2. You must agree in order to register with the ACME server. Do you agree? (y/n): ", False, ): print("You must agree in order to register with the ACME server") return None else: return "y" conf_value("TLS_TOS", ask_tos) add_conf("PROTO", "https") else: add_conf("PROTO", "http") add_conf( "DO_OFFLINE", "y" ) # TODO support offline setup again when the documentation is ready if ask_yes_no( "DO_TELESIGN", "7. Setup SMS two-factor authentication using Telesign https://www.telesign.com? (Y/n): ", True, ): def ask_telesign(): customer_id = ask_str(None, "7.1. Enter your Telesign Customer ID: ") api_key = ask_str(None, "7.2. Enter your Telesign API Key: ") phone_number = ask_str( None, "6.3. Enter a phone number to test your API key (e.g. '+447911123456'): ", ) auth_token = base64.b64encode(f"{customer_id}:{api_key}".encode()).decode() if not try_cmd( ["libeufin-tan-sms.sh", phone_number], f"T-12345 is your verification code for {bank_name} setup", {**os.environ, "AUTH_TOKEN": auth_token}, ): print( "Failed to send an SMS using Telesign API, check your credentials and phone number" ) return None code = ask_str(None, f"7.4. Enter the code received by {phone_number} : ") if code != "12345" and code != "T-12345": print( f"Wrong code got '{code}' expected '12345', check your credentials and phone number" ) return None return auth_token conf_value("TELESIGN_AUTH_TOKEN", ask_telesign) ask_secret( "BANK_ADMIN_PASSWORD", "8. Enter the admin password for the bank (or press enter to autogenerate password): ", config_passwd, str(uuid.uuid4()), ) if ask_yes_no( "DO_EXCHANGE_TERMS", "9. Do you wish to configure terms of service (ToS) for the exchange? (Y/n): ", True, ): ask_terms( "EXCHANGE_TERMS_FILE", "9.1. Enter the filename of the ToS. Some available options are:\n", "-tos-", ) if ask_yes_no( "DO_EXCHANGE_PRIVACY", "10. Do you wish to configure a privacy policy for the exchange? (Y/n): ", True, ): ask_terms( "EXCHANGE_PRIVACY_FILE", "10.1. Enter the filename of the privacy policy. Some available options are:\n", "-pp-", ) # ----- Return conf ----- # content = "" for key, value in result_conf.items(): escaped = value.replace("'", "'\\''") content += f"export {key}='{escaped}'\n" with os.fdopen(3, "w") as f: f.write(content)