diff options
Diffstat (limited to 'regional-currency/config.py')
-rwxr-xr-x | regional-currency/config.py | 476 |
1 files changed, 476 insertions, 0 deletions
diff --git a/regional-currency/config.py b/regional-currency/config.py new file mode 100755 index 0000000..d00ff51 --- /dev/null +++ b/regional-currency/config.py @@ -0,0 +1,476 @@ +#!/usr/bin/env python3 +"""Python script to ask questions using an interactive prompt""" + +import base64 +import os +import re +import subprocess +import urllib.parse +import uuid +import argon2 +from base64 import b64decode, b64encode +from typing import Callable, Dict, TypeVar +from Crypto.Cipher import ChaCha20_Poly1305 +from Crypto.Hash import SHA512 +from Crypto.Protocol.KDF import PBKDF2 +from Crypto.Random import get_random_bytes + +# Early exit if already loaded +if os.environ.get("CONFIG_LOADED") == "y": + exit(0) + +log = open("setup.log", "ab", buffering=0) +CONFIG_FILE = "config/user.conf" +BIC_PATTERN = re.compile("[A-Z0-9]{4}[A-Z]{2}[A-Z0-9]{2}(?:[A-Z0-9]{3})?") +IBAN_PATTERN = re.compile("[A-Z]{2}[0-9]{2}[A-Z0-9]{,28}") + + +def load_conf() -> Dict[str, str]: + """Load user configuration file""" + conf = {} + with open(CONFIG_FILE, "r") as f: + for kv in f.read().splitlines(): + if len(kv) != 0: + [k, v] = [part.strip() for part in kv.split("=", 1)] + if v.startswith('"') and v.endswith('"'): + conf[k] = v.strip('"').replace('\\"', '"') + elif v.startswith("'") and v.endswith("'"): + conf[k] = v.strip("'").replace("\\'", "'") + else: + conf[k] = v + return conf + + +conf = load_conf() +result_conf = { + **conf, + "CONFIG_LOADED": "y" +} + + +def add_conf(name: str, value: str): + """Update a user configuration value and update the configuration file""" + conf[name] = value + result_conf[name] = value + content = "" + for key, value in conf.items(): + escaped = value.replace("'", "\\'") + content += f'{key}=\'{escaped}\'\n' + with open(CONFIG_FILE, "w") as f: + f.write(content) + + +def run_cmd( + cmd: list[str], input: str | None = None, env: Dict[str, str] | None = None +) -> int: + """Run a command in a child process and return its exit code""" + result = subprocess.run( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + input=input.encode() if input is not None else None, + stdin=subprocess.DEVNULL if input is None else None, + env=env, + ) + log.write(result.stdout) + if result.returncode != 0: + print(result.stdout.decode("utf-8"), end="") + return result.returncode + + +def try_cmd( + cmd: list[str], input: str | None = None, env: Dict[str, str] | None = None +) -> bool: + """Run a command in a child process and return if successful""" + return run_cmd(cmd, input, env) == 0 + + +A = TypeVar("A") +T = TypeVar("T") + + +def conf_value( + name: str | None, + action: Callable[[], str | None], + default: T | None = None, + check: Callable[[str], T | None] = lambda it: it, + fmt: Callable[[T], str] = lambda it: str(it), +) -> T: + """ + Logic to configure a value + + :param name: if present will try to fetch the current value and will store the new value + :param action: how a value will be obtained + :param default: default value to use if no value is given + :param check: check and normalize the value + :param fmt: format value for storage + :return: the configuration value + """ + value = None + + # Fetch current value + if name is not None: + curr = conf.get(name) + if curr is not None: + # Check the current value and ask again if invalid + value = check(curr) + + # Ask for a new value until we get a valid one + while value is None: + new = action() + # Use default if no value was provided else check the new value + value = check(new) if new is not None else default + + # Store the new value + if name is not None: + add_conf(name, fmt(value)) + + return value + + +def ask( + name: str | None, + msg: str, + default: T | None = None, + check: Callable[[str], T | None] = lambda it: it, + fmt: Callable[[T], str] = lambda it: str(it), +) -> T: + """ + Prompt the user to configurea value + :param name: if present will try to fetch the current value and will store the new value + :param msg: the message to prompt the user with + :param default: default value to use if no value is obtained + :param check: check and normalize the value + :param fmt: format value for storage + :return: the configuration value + """ + + def do_ask() -> str | None: + # Log the prompt + log.write(msg.encode() + "\n".encode()) + # Actual prompt + raw = input(msg).strip() + if raw == "": + if default is None: + print("You must enter a value") + return None + return raw + + return conf_value(name, do_ask, default, check, fmt) + + +def ask_str(name: str | None, msg: str, default: str | None = None) -> str: + "Prompt the user to configure a string" + return ask(name, msg, default) + + +def ask_bic(name: str | None, msg: str, default: str | None = None) -> str: + "Prompt the user to configure a BIC" + + def check_bic(raw: str) -> str | None: + raw = raw.translate({ord(i): None for i in " -"}) + if not BIC_PATTERN.fullmatch(raw): + print("Invalid BIC") + return None + else: + return raw + + return ask(name, msg, default, check_bic) + + +def ask_iban(name: str | None, msg: str, default: str | None = None) -> str: + "Prompt the user to configure a IBAN" + + def check_iban(raw: str) -> str | None: + raw = raw.translate({ord(i): None for i in " -"}) + if not IBAN_PATTERN.fullmatch(raw): + print("Invalid IBAN") # Checksum check ? + return None + else: + return raw + + return ask(name, msg, default, check_iban) + + +def ask_currency(name: str, msg: str, default: str | None = None) -> str: + "Prompt the user to configure a currency name" + + def check_currency(currency: str) -> str | None: + currency = currency.upper() + if not all([c.isascii() and c.isalpha() for c in currency]): + print("The currency name must be an ASCII alphabetic string") + elif len(currency) < 3 or 11 < len(currency): + print("The currency name had to be between 3 and 11 characters long") + else: + return currency + return None + + return ask(name, msg, default, check_currency) + + +def ask_host(name: str, msg: str, default: str | None = None) -> str: + "Prompt the user to configure the installation hostname" + + def check_host(host: str) -> str | None: + success = True + for subdomain in ["backend", "bank", "exchange"]: + success = try_cmd(["ping", "-c", "1", f"{subdomain}.{host}"]) and success + if success: + return host + else: + return None + + return ask(name, msg, default, check_host) + + +def ask_terms(name: str, msg: str, kind: str) -> str: + "Prompt the user to select a ToS/privacy policy" + + # msg = "9.1. Enter the filename of the ToS. Some available options are:\n" + tos_msg = msg + + # Recollect example ToS files + tos_path = "/usr/share/taler/terms" + for f in os.listdir(tos_path): + tos_file = os.path.join(tos_path, f) + if os.path.isfile(tos_file) and f.endswith(".rst") and kind in f: + tos_msg += f"- {tos_file}\n" + + tos_msg += "=> " + + def check_file(path: str) -> str | None: + if not os.path.isfile(path): + print("Not a file") # Checksum check ? + return None + else: + return path + + return ask(name, tos_msg, None, check_file) + + +def ask_yes_no(name: str | None, msg: str, default: bool | None = None) -> bool: + "Prompt the user to configure a boolean" + + def check_yes_no(raw: str) -> bool | None: + raw = raw.lower() + if raw == "y" or raw == "yes": + return True + elif raw == "n" or raw == "no": + return False + else: + print("Expected 'y' or 'n'") + return None + + return ask(name, msg, default, check_yes_no, lambda it: "y" if it else "n") + + +# ----- Crypto ----- # + + +def ask_config_password() -> str: + "Prompt the user to configure a password stored hashed with argon2id" + ph = argon2.PasswordHasher() + hash = conf.get("CONFIG_PASSWORD") + passwd = None + if hash is not None: + while True: + passwd = ask_str(None, "Enter the config password : ") + try: + ph.verify(hash, passwd) + break + except argon2.exceptions.VerifyMismatchError: + print("invalid password") + else: + passwd = ask_str(None, "1.1 Choose a config password : ") + + if hash is None or ph.check_needs_rehash(hash): + add_conf("CONFIG_PASSWORD", ph.hash(passwd)) + + return passwd + + +def ask_secret( + name: str, msg: str, passwd: str | None, default: str | None = None +) -> str: + "Prompt the user to configure a string stored encryped using pbkdf2_sha512 and chacha20_poly1305" + if passwd is None: + return ask_str(name, msg, default) + else: + raw = conf.get(name) + plaintext = None + if raw is not None: + method = "$pbkdf2_sha512_chacha20_poly1305$1000000$" + if raw.startswith(method): + salt, nonce, tag, ciphertext = [ + b64decode(it) for it in raw.removeprefix(method).split("$", 3) + ] + key = PBKDF2(passwd, salt, 32, count=1000000, hmac_hash_module=SHA512) + cipher = ChaCha20_Poly1305.new(key=key, nonce=nonce) + cipher.update(name.encode()) + plaintext = cipher.decrypt_and_verify(ciphertext, tag).decode() + else: + salt = get_random_bytes(16) + key = PBKDF2(passwd, salt, 32, count=1000000, hmac_hash_module=SHA512) + cipher = ChaCha20_Poly1305.new(key=key) + cipher.update(name.encode()) + ciphertext, tag = cipher.encrypt_and_digest(raw.encode()) + add_conf( + name, + f"$pbkdf2_sha512_chacha20_poly1305$1000000${base64.b64encode(salt).decode()}${base64.b64encode(cipher.nonce).decode()}${base64.b64encode(tag).decode()}${base64.b64encode(ciphertext).decode()}", + ) + else: + plaintext = ask_str(None, msg, default) + salt = get_random_bytes(16) + key = PBKDF2(passwd, salt, 32, count=1000000, hmac_hash_module=SHA512) + cipher = ChaCha20_Poly1305.new(key=key) + cipher.update(name.encode()) + ciphertext, tag = cipher.encrypt_and_digest(plaintext.encode()) + add_conf( + name, + f"$pbkdf2_sha512_chacha20_poly1305$1000000${base64.b64encode(salt).decode()}${base64.b64encode(cipher.nonce).decode()}${base64.b64encode(tag).decode()}${base64.b64encode(ciphertext).decode()}", + ) + result_conf[name] = plaintext + return plaintext + + +# ----- Prompt ----- # + +config_passwd = ( + ask_config_password() + if ask_yes_no( + "DO_CONFIG_ENCRYPTION", + "1. Do you want to encrypt sensitive config values (Y/n): ", + True, + ) + else None +) +ask_currency( + "CURRENCY", + "2. Enter the name of the regional currency (e.g. 'NETZBON'): ", + "NETZBON", +) +do_conversion = ask_yes_no( + "DO_CONVERSION", + "3. Do you want setup regional currency conversion to fiat currency (Y/n): ", + True, +) +if do_conversion: + ask_currency( + "FIAT_CURRENCY", + "3.1. Enter the name of the fiat currency (e.g. 'CHF'): ", + "CHF", + ) + iban = ask_iban( + "FIAT_ACCOUNT_IBAN", + "3.2. Enter the IBAN of your fiat bank account (e.g. 'CH7789144474425692816'): ", + ) + bic = ask_bic( + "FIAT_ACCOUNT_BIC", + "3.3. Enter the BIC of your fiat bank account (e.g. 'POFICHBEXXX'): ", + ) + name = ask_str( + "FIAT_ACCOUNT_NAME", "3.4. Enter the legal name of your fiat bank account: " + ) + params = urllib.parse.urlencode({"receiver-name": name}) + add_conf("CONVERSION_PAYTO", f"payto://iban/{bic}/{iban}?{params}") +bank_name = ask_str( + "BANK_NAME", + "4. Enter the human-readable name of the bank (e.g. 'Taler Bank'): ", + "Taler Bank", +) +ask_host("DOMAIN_NAME", "5. Enter the domain name (e.g. 'example.com'): ") +if ask_yes_no("ENABLE_TLS", "6. Setup TLS using Let's Encrypt? (Y/n): ", True): + ask_str("TLS_EMAIL", "6.1. Enter an email address for Let's Encrypt: ") + + def ask_tos(): + print( + "6.2. Please read the Terms of Service at https://letsencrypt.org/documents/LE-SA-v1.3-September-21-2022.pdf." + ) + if not ask_yes_no( + None, + "6.2. You must agree in order to register with the ACME server. Do you agree? (y/n): ", + False, + ): + print("You must agree in order to register with the ACME server") + return None + else: + return "y" + + conf_value("TLS_TOS", ask_tos) + add_conf("PROTO", "https") +else: + add_conf("PROTO", "http") + +add_conf( + "DO_OFFLINE", "y" +) # TODO support offline setup again when the documentation is ready + +if ask_yes_no( + "DO_TELESIGN", + "7. Setup SMS two-factor authentication using Telesign https://www.telesign.com? (Y/n): ", + True, +): + + def ask_telesign(): + customer_id = ask_str(None, "7.1. Enter your Telesign Customer ID: ") + api_key = ask_str(None, "7.2. Enter your Telesign API Key: ") + phone_number = ask_str( + None, + "6.3. Enter a phone number to test your API key (e.g. '+447911123456'): ", + ) + auth_token = base64.b64encode(f"{customer_id}:{api_key}".encode()).decode() + if not try_cmd( + ["libeufin-tan-sms.sh", phone_number], + f"T-12345 is your verification code for {bank_name} setup", + {**os.environ, "AUTH_TOKEN": auth_token}, + ): + print( + "Failed to send an SMS using Telesign API, check your credentials and phone number" + ) + return None + code = ask_str(None, f"7.4. Enter the code received by {phone_number} : ") + if code != "12345" and code != "T-12345": + print( + f"Wrong code got '{code}' expected '12345', check your credentials and phone number" + ) + return None + return auth_token + + conf_value("TELESIGN_AUTH_TOKEN", ask_telesign) +ask_secret( + "BANK_ADMIN_PASSWORD", + "8. Enter the admin password for the bank (or press enter to autogenerate password): ", + config_passwd, + str(uuid.uuid4()), +) + +if ask_yes_no( + "DO_EXCHANGE_TERMS", + "9. Do you wish to configure terms of service (ToS) for the exchange? (Y/n): ", + True, +): + ask_terms( + "EXCHANGE_TERMS_FILE", + "9.1. Enter the filename of the ToS. Some available options are:\n", + "-tos-", + ) + +if ask_yes_no( + "DO_EXCHANGE_PRIVACY", + "10. Do you wish to configure a privacy policy for the exchange? (Y/n): ", + True, +): + ask_terms( + "EXCHANGE_PRIVACY_FILE", + "10.1. Enter the filename of the privacy policy. Some available options are:\n", + "-pp-", + ) + +# ----- Return conf ----- # + +content = "" +for key, value in result_conf.items(): + escaped = value.replace('\'', '\\\'') + content += f'export {key}=\'{escaped}\'\n' +with os.fdopen(3, "w") as f: + f.write(content) |