summaryrefslogtreecommitdiff
path: root/regional-currency/config.py
diff options
context:
space:
mode:
Diffstat (limited to 'regional-currency/config.py')
-rwxr-xr-xregional-currency/config.py491
1 files changed, 491 insertions, 0 deletions
diff --git a/regional-currency/config.py b/regional-currency/config.py
new file mode 100755
index 0000000..e382927
--- /dev/null
+++ b/regional-currency/config.py
@@ -0,0 +1,491 @@
+#!/usr/bin/env python3
+"""Python script to ask questions using an interactive prompt"""
+
+import base64
+import os
+import re
+import subprocess
+import urllib.parse
+import uuid
+import getpass
+from base64 import b64decode, b64encode
+from typing import Callable, Dict, TypeVar
+
+import argon2
+from Crypto.Cipher import ChaCha20_Poly1305
+from Crypto.Hash import SHA512
+from Crypto.Protocol.KDF import PBKDF2
+from Crypto.Random import get_random_bytes
+
+# Early exit if already loaded
+if os.environ.get("CONFIG_LOADED") == "y":
+ exit(0)
+
+log = open("setup.log", "ab", buffering=0)
+CONFIG_FILE = "config/user.conf"
+BIC_PATTERN = re.compile("[A-Z0-9]{4}[A-Z]{2}[A-Z0-9]{2}(?:[A-Z0-9]{3})?")
+IBAN_PATTERN = re.compile("[A-Z]{2}[0-9]{2}[A-Z0-9]{,28}")
+
+
+def load_conf() -> Dict[str, str]:
+ """Load user configuration file"""
+ conf = {}
+ with open(CONFIG_FILE, "r") as f:
+ for kv in f.read().splitlines():
+ if len(kv) != 0:
+ [k, v] = [part.strip() for part in kv.split("=", 1)]
+ if v.startswith('"') and v.endswith('"'):
+ conf[k] = v.strip('"').replace('\\"', '"')
+ elif v.startswith("'") and v.endswith("'"):
+ conf[k] = v.strip("'").replace("'\\''", "'").replace("\\'", "'")
+ else:
+ conf[k] = v
+ return conf
+
+
+conf = load_conf()
+result_conf = {**conf, "CONFIG_LOADED": "y"}
+
+def store_conf():
+ """Update the configuration file"""
+ content = ""
+ for key, value in conf.items():
+ escaped = value.replace("'", "'\\''")
+ content += f"{key}='{escaped}'\n"
+ with open(CONFIG_FILE, "w") as f:
+ f.write(content)
+
+def add_conf(name: str, value: str):
+ """Update a user configuration value and update the configuration file"""
+ conf[name] = value
+ result_conf[name] = value
+ store_conf()
+
+def run_cmd(
+ cmd: list[str], input: str | None = None, env: Dict[str, str] | None = None
+) -> int:
+ """Run a command in a child process and return its exit code"""
+ result = subprocess.run(
+ cmd,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ input=input.encode() if input is not None else None,
+ stdin=subprocess.DEVNULL if input is None else None,
+ env=env,
+ )
+ log.write(result.stdout)
+ if result.returncode != 0:
+ print(result.stdout.decode("utf-8"), end="")
+ return result.returncode
+
+
+def try_cmd(
+ cmd: list[str], input: str | None = None, env: Dict[str, str] | None = None
+) -> bool:
+ """Run a command in a child process and return if successful"""
+ return run_cmd(cmd, input, env) == 0
+
+
+A = TypeVar("A")
+T = TypeVar("T")
+
+
+def conf_value(
+ name: str | None,
+ action: Callable[[], str | None],
+ default: T | None = None,
+ check: Callable[[str], T | None] = lambda it: it,
+ fmt: Callable[[T], str] = lambda it: str(it),
+) -> T:
+ """
+ Logic to configure a value
+
+ :param name: if present will try to fetch the current value and will store the new value
+ :param action: how a value will be obtained
+ :param default: default value to use if no value is given
+ :param check: check and normalize the value
+ :param fmt: format value for storage
+ :return: the configuration value
+ """
+ value = None
+
+ # Fetch current value
+ if name is not None:
+ curr = conf.get(name)
+ if curr is not None:
+ # Check the current value and ask again if invalid
+ value = check(curr)
+
+ # Ask for a new value until we get a valid one
+ while value is None:
+ new = action()
+ # Use default if no value was provided else check the new value
+ value = check(new) if new is not None else default
+
+ # Store the new value
+ if name is not None:
+ add_conf(name, fmt(value))
+
+ return value
+
+
+def ask(
+ name: str | None,
+ msg: str,
+ default: T | None = None,
+ check: Callable[[str], T | None] = lambda it: it,
+ fmt: Callable[[T], str] = lambda it: str(it),
+ secret: bool = False
+) -> T:
+ """
+ Prompt the user to configure a value
+ :param name: if present will try to fetch the current value and will store the new value
+ :param msg: the message to prompt the user with
+ :param default: default value to use if no value is obtained
+ :param check: check and normalize the value
+ :param fmt: format value for storage
+ :param secret: hide the input content
+ :return: the configuration value
+ """
+
+ def do_ask() -> str | None:
+ # Log the prompt
+ log.write(msg.encode() + "\n".encode())
+ # Actual prompt
+ if secret:
+ raw = getpass.getpass(msg).strip()
+ else:
+ raw = input(msg).strip()
+ if raw == "":
+ if default is None:
+ print("You must enter a value")
+ return None
+ return raw
+
+ return conf_value(name, do_ask, default, check, fmt)
+
+
+def ask_str(name: str | None, msg: str, default: str | None = None, secret: bool = False) -> str:
+ "Prompt the user to configure a string"
+ return ask(name, msg, default, secret=secret)
+
+
+def ask_bic(name: str | None, msg: str, default: str | None = None) -> str:
+ "Prompt the user to configure a BIC"
+
+ def check_bic(raw: str) -> str | None:
+ raw = raw.translate({ord(i): None for i in " -"})
+ if not BIC_PATTERN.fullmatch(raw):
+ print("Invalid BIC")
+ return None
+ else:
+ return raw
+
+ return ask(name, msg, default, check_bic)
+
+
+def ask_iban(name: str | None, msg: str, default: str | None = None) -> str:
+ "Prompt the user to configure a IBAN"
+
+ def check_iban(raw: str) -> str | None:
+ raw = raw.translate({ord(i): None for i in " -"})
+ if not IBAN_PATTERN.fullmatch(raw):
+ print("Invalid IBAN") # Checksum check ?
+ return None
+ else:
+ return raw
+
+ return ask(name, msg, default, check_iban)
+
+
+def ask_currency(name: str, msg: str, default: str | None = None) -> str:
+ "Prompt the user to configure a currency name"
+
+ def check_currency(currency: str) -> str | None:
+ currency = currency.upper()
+ if not all([c.isascii() and c.isalpha() for c in currency]):
+ print("The currency name must be an ASCII alphabetic string")
+ elif len(currency) < 3 or 11 < len(currency):
+ print("The currency name had to be between 3 and 11 characters long")
+ else:
+ return currency
+ return None
+
+ return ask(name, msg, default, check_currency)
+
+
+def ask_host(name: str, msg: str, default: str | None = None) -> str:
+ "Prompt the user to configure the installation hostname"
+
+ def check_host(host: str) -> str | None:
+ success = True
+ for subdomain in ["backend", "bank", "exchange"]:
+ success = try_cmd(["ping", "-c", "1", f"{subdomain}.{host}"]) and success
+ if success:
+ return host
+ else:
+ return None
+
+ return ask(name, msg, default, check_host)
+
+
+def ask_terms(name: str, msg: str, kind: str) -> str:
+ "Prompt the user to select a ToS/privacy policy"
+
+ # msg = "9.1. Enter the filename of the ToS. Some available options are:\n"
+ tos_msg = msg
+
+ # Recollect example ToS files
+ tos_path = "/usr/share/taler/terms"
+ for f in os.listdir(tos_path):
+ tos_file = os.path.join(tos_path, f)
+ if os.path.isfile(tos_file) and f.endswith(".rst") and kind in f:
+ tos_msg += f"- {tos_file}\n"
+
+ tos_msg += "=> "
+
+ def check_file(path: str) -> str | None:
+ if not os.path.isfile(path):
+ print("Not a file") # Checksum check ?
+ return None
+ else:
+ return path
+
+ return ask(name, tos_msg, None, check_file)
+
+
+def ask_yes_no(name: str | None, msg: str, default: bool | None = None) -> bool:
+ "Prompt the user to configure a boolean"
+
+ def check_yes_no(raw: str) -> bool | None:
+ raw = raw.lower()
+ if raw == "y" or raw == "yes":
+ return True
+ elif raw == "n" or raw == "no":
+ return False
+ else:
+ print("Expected 'y' or 'n'")
+ return None
+
+ return ask(name, msg, default, check_yes_no, lambda it: "y" if it else "n")
+
+
+# ----- Crypto ----- #
+
+
+def ask_config_password() -> str:
+ "Prompt the user to configure a password stored hashed with argon2id"
+ ph = argon2.PasswordHasher()
+ hash = conf.get("CONFIG_PASSWORD")
+ passwd = None
+ if hash is not None:
+ while True:
+ passwd = ask_str(None, "Enter the config password : ", secret=True)
+ try:
+ ph.verify(hash, passwd)
+ break
+ except argon2.exceptions.VerifyMismatchError:
+ print("invalid password")
+ else:
+ passwd = ask_str(None, "1.1 Choose a config password : ", secret=True)
+
+ if hash is None or ph.check_needs_rehash(hash):
+ add_conf("CONFIG_PASSWORD", ph.hash(passwd))
+
+ return passwd
+
+
+def ask_secret(
+ name: str, msg: str, passwd: str | None, default: str | None = None
+) -> str:
+ "Prompt the user to configure a string stored encryped using pbkdf2_sha512 and chacha20_poly1305"
+ if passwd is None:
+ return ask_str(name, msg, default)
+ else:
+ raw = conf.get(name)
+ plaintext = None
+ if raw is not None:
+ method = "$pbkdf2_sha512_chacha20_poly1305$1000000$"
+ if raw.startswith(method):
+ salt, nonce, tag, ciphertext = [
+ b64decode(it) for it in raw.removeprefix(method).split("$", 3)
+ ]
+ key = PBKDF2(passwd, salt, 32, count=1000000, hmac_hash_module=SHA512)
+ cipher = ChaCha20_Poly1305.new(key=key, nonce=nonce)
+ cipher.update(name.encode())
+ plaintext = cipher.decrypt_and_verify(ciphertext, tag).decode()
+ else:
+ salt = get_random_bytes(16)
+ key = PBKDF2(passwd, salt, 32, count=1000000, hmac_hash_module=SHA512)
+ cipher = ChaCha20_Poly1305.new(key=key)
+ cipher.update(name.encode())
+ ciphertext, tag = cipher.encrypt_and_digest(raw.encode())
+ add_conf(
+ name,
+ f"$pbkdf2_sha512_chacha20_poly1305$1000000${base64.b64encode(salt).decode()}${base64.b64encode(cipher.nonce).decode()}${base64.b64encode(tag).decode()}${base64.b64encode(ciphertext).decode()}",
+ )
+ else:
+ plaintext = ask_str(None, msg, default, True)
+ salt = get_random_bytes(16)
+ key = PBKDF2(passwd, salt, 32, count=1000000, hmac_hash_module=SHA512)
+ cipher = ChaCha20_Poly1305.new(key=key)
+ cipher.update(name.encode())
+ ciphertext, tag = cipher.encrypt_and_digest(plaintext.encode())
+ add_conf(
+ name,
+ f"$pbkdf2_sha512_chacha20_poly1305$1000000${base64.b64encode(salt).decode()}${base64.b64encode(cipher.nonce).decode()}${base64.b64encode(tag).decode()}${base64.b64encode(ciphertext).decode()}",
+ )
+ result_conf[name] = plaintext
+ return plaintext
+
+
+# ----- Prompt ----- #
+
+config_passwd = (
+ ask_config_password()
+ if ask_yes_no(
+ "DO_CONFIG_ENCRYPTION",
+ "1. Do you want to encrypt sensitive config values (Y/n): ",
+ True,
+ )
+ else None
+)
+ask_currency(
+ "CURRENCY",
+ "2. Enter the name of the regional currency (e.g. 'NETZBON'): ",
+ "NETZBON",
+)
+do_conversion = ask_yes_no(
+ "DO_CONVERSION",
+ "3. Do you want setup regional currency conversion to fiat currency (Y/n): ",
+ True,
+)
+if do_conversion:
+ ask_currency(
+ "FIAT_CURRENCY",
+ "3.1. Enter the name of the fiat currency (e.g. 'CHF'): ",
+ "CHF",
+ )
+ ask_str(
+ "FIAT_BANK_NAME",
+ "3.2. Enter the name of your fiat bank (e.g. POSTFINANCE AG): ",
+ )
+ iban = ask_iban(
+ "FIAT_ACCOUNT_IBAN",
+ "3.3. Enter the IBAN of your fiat bank account (e.g. 'CH7789144474425692816'): ",
+ )
+ bic = ask_bic(
+ "FIAT_ACCOUNT_BIC",
+ "3.4. Enter the BIC of your fiat bank account (e.g. 'POFICHBEXXX'): ",
+ )
+ name = ask_str(
+ "FIAT_ACCOUNT_NAME", "3.5. Enter the legal name of your fiat bank account: "
+ )
+ params = urllib.parse.urlencode({"receiver-name": name})
+ add_conf("CONVERSION_PAYTO", f"payto://iban/{bic}/{iban}?{params}")
+bank_name = ask_str(
+ "BANK_NAME",
+ "4. Enter the human-readable name of the bank (e.g. 'Taler Bank'): ",
+ "Taler Bank",
+)
+ask_host("DOMAIN_NAME", "5. Enter the domain name (e.g. 'example.com'): ")
+if ask_yes_no("ENABLE_TLS", "6. Setup TLS using Let's Encrypt? (Y/n): ", True):
+ ask_str("TLS_EMAIL", "6.1. Enter an email address for Let's Encrypt: ")
+
+ def ask_tos():
+ print(
+ "6.2. Please read the Terms of Service at https://letsencrypt.org/documents/LE-SA-v1.3-September-21-2022.pdf."
+ )
+ if not ask_yes_no(
+ None,
+ "6.2. You must agree in order to register with the ACME server. Do you agree? (y/n): ",
+ False,
+ ):
+ print("You must agree in order to register with the ACME server")
+ return None
+ else:
+ return "y"
+
+ conf_value("TLS_TOS", ask_tos)
+ add_conf("PROTO", "https")
+else:
+ add_conf("PROTO", "http")
+
+add_conf(
+ "DO_OFFLINE", "y"
+) # TODO support offline setup again when the documentation is ready
+
+if ask_yes_no(
+ "DO_TELESIGN",
+ "7. Setup SMS two-factor authentication using Telesign https://www.telesign.com? (Y/n): ",
+ True,
+):
+
+ def ask_telesign():
+ customer_id = ask_str(None, "7.1. Enter your Telesign Customer ID: ")
+ api_key = ask_str(None, "7.2. Enter your Telesign API Key: ")
+ phone_number = ask_str(
+ None,
+ "6.3. Enter a phone number to test your API key (e.g. '+447911123456'): ",
+ )
+ auth_token = base64.b64encode(f"{customer_id}:{api_key}".encode()).decode()
+ if not try_cmd(
+ ["libeufin-tan-sms.sh", phone_number],
+ f"T-12345 is your verification code for {bank_name} setup",
+ {**os.environ, "AUTH_TOKEN": auth_token},
+ ):
+ print(
+ "Failed to send an SMS using Telesign API, check your credentials and phone number"
+ )
+ return None
+ code = ask_str(None, f"7.4. Enter the code received by {phone_number} : ")
+ if code != "12345" and code != "T-12345":
+ print(
+ f"Wrong code got '{code}' expected '12345', check your credentials and phone number"
+ )
+ return None
+ return auth_token
+
+ conf_value("TELESIGN_AUTH_TOKEN", ask_telesign)
+generated_password= str(uuid.uuid4())
+admin_password = ask_secret(
+ "BANK_ADMIN_PASSWORD",
+ "8. Enter the admin password for the bank (or press enter to autogenerate password): ",
+ config_passwd,
+ generated_password,
+)
+add_conf("BANK_ADMIN_PASSWORD_GENERATED", "y" if generated_password==admin_password else "n")
+
+if ask_yes_no(
+ "DO_EXCHANGE_TERMS",
+ "9. Do you wish to configure terms of service (ToS) for the exchange? (Y/n): ",
+ True,
+):
+ ask_terms(
+ "EXCHANGE_TERMS_FILE",
+ "9.1. Enter the filename of the ToS. Some available options are:\n",
+ "-tos-",
+ )
+
+if ask_yes_no(
+ "DO_EXCHANGE_PRIVACY",
+ "10. Do you wish to configure a privacy policy for the exchange? (Y/n): ",
+ True,
+):
+ ask_terms(
+ "EXCHANGE_PRIVACY_FILE",
+ "10.1. Enter the filename of the privacy policy. Some available options are:\n",
+ "-pp-",
+ )
+
+# Update on disk format even if nothing have changed
+store_conf()
+
+# ----- Return conf ----- #
+
+content = ""
+for key, value in result_conf.items():
+ escaped = value.replace("'", "'\\''")
+ content += f"export {key}='{escaped}'\n"
+with os.fdopen(3, "w") as f:
+ f.write(content)