taler-deployment

Deployment scripts and configuration files
Log | Files | Refs | README

config.py (18024B)


      1 #!/usr/bin/env python3
      2 """Python script to ask questions using an interactive prompt"""
      3 
      4 import base64
      5 import getpass
      6 import os
      7 import re
      8 import subprocess
      9 import urllib.parse
     10 import uuid
     11 from base64 import b64decode
     12 from typing import Callable, Dict, TypeVar
     13 
     14 import argon2
     15 from Crypto.Cipher import ChaCha20_Poly1305
     16 from Crypto.Hash import SHA512
     17 from Crypto.Protocol.KDF import PBKDF2
     18 from Crypto.Random import get_random_bytes
     19 
     20 # Early exit if already loaded
     21 if os.environ.get("CONFIG_LOADED") == "y":
     22     exit(0)
     23 
     24 log = open("setup.log", "ab", buffering=0)
     25 CONFIG_FILE = "config/user.conf"
     26 BIC_PATTERN = re.compile("[A-Z0-9]{4}[A-Z]{2}[A-Z0-9]{2}(?:[A-Z0-9]{3})?")
     27 IBAN_PATTERN = re.compile("[A-Z]{2}[0-9]{2}[A-Z0-9]{,28}")
     28 
     29 
     30 def load_conf() -> Dict[str, str]:
     31     """Load user configuration file"""
     32     conf = {}
     33     with open(CONFIG_FILE, "r") as f:
     34         for kv in f.read().splitlines():
     35             if len(kv) != 0:
     36                 [k, v] = [part.strip() for part in kv.split("=", 1)]
     37                 if v.startswith('"') and v.endswith('"'):
     38                     conf[k] = v.strip('"').replace('\\"', '"')
     39                 elif v.startswith("'") and v.endswith("'"):
     40                     conf[k] = v.strip("'").replace("'\\''", "'").replace("\\'", "'")
     41                 else:
     42                     conf[k] = v
     43     return conf
     44 
     45 
     46 conf = load_conf()
     47 result_conf = {**conf, "CONFIG_LOADED": "y"}
     48 
     49 
     50 def store_conf():
     51     """Update the configuration file"""
     52     content = ""
     53     for key, value in conf.items():
     54         escaped = value.replace("'", "'\\''")
     55         content += f"{key}='{escaped}'\n"
     56     with open(CONFIG_FILE, "w") as f:
     57         f.write(content)
     58 
     59 
     60 def add_conf(name: str, value: str):
     61     """Update a user configuration value and update the configuration file"""
     62     conf[name] = value
     63     result_conf[name] = value
     64     store_conf()
     65 
     66 
     67 def run_cmd(
     68     cmd: list[str], input: str | None = None, env: Dict[str, str] | None = None
     69 ) -> int:
     70     """Run a command in a child process and return its exit code"""
     71     result = subprocess.run(
     72         cmd,
     73         stdout=subprocess.PIPE,
     74         stderr=subprocess.STDOUT,
     75         input=input.encode() if input is not None else None,
     76         stdin=subprocess.DEVNULL if input is None else None,
     77         env=env,
     78     )
     79     log.write(result.stdout)
     80     if result.returncode != 0:
     81         print(result.stdout.decode("utf-8"), end="")
     82     return result.returncode
     83 
     84 
     85 def try_cmd(
     86     cmd: list[str], input: str | None = None, env: Dict[str, str] | None = None
     87 ) -> bool:
     88     """Run a command in a child process and return if successful"""
     89     return run_cmd(cmd, input, env) == 0
     90 
     91 
# Generic type variables for annotations; A is not referenced in the visible code
A = TypeVar("A")
# T is the type of a checked configuration value (see conf_value and ask)
T = TypeVar("T")
     94 
     95 
     96 def conf_value(
     97     name: str | None,
     98     action: Callable[[], str | None],
     99     default: T | None = None,
    100     check: Callable[[str], T | None] = lambda it: it,
    101     fmt: Callable[[T], str] = lambda it: str(it),
    102 ) -> T:
    103     """
    104     Logic to configure a value
    105 
    106     :param name: if present will try to fetch the current value and will store the new value
    107     :param action: how a value will be obtained
    108     :param default: default value to use if no value is given
    109     :param check: check and normalize the value
    110     :param fmt: format value for storage
    111     :return: the configuration value
    112     """
    113     value = None
    114 
    115     # Fetch current value
    116     if name is not None:
    117         curr = conf.get(name)
    118         if curr is not None:
    119             # Check the current value and ask again if invalid
    120             value = check(curr)
    121 
    122     # Ask for a new value until we get a valid one
    123     while value is None:
    124         new = action()
    125         # Use default if no value was provided else check the new value
    126         value = check(new) if new is not None else default
    127 
    128     # Store the new value
    129     if name is not None:
    130         add_conf(name, fmt(value))
    131 
    132     return value
    133 
    134 
# Prompt numbering state as a (step, substep) pair; substep 0 is rendered without a substep number
progress = (1, 0)
    136 
    137 
    138 def step():
    139     global progress
    140     (step, _) = progress
    141     progress = (step + 1, 0)
    142 
    143 
    144 def substep():
    145     global progress
    146     (step, substep) = progress
    147     progress = (step, substep + 1)
    148 
    149 
    150 def fmt_progress():
    151     global progress
    152     (step, substep) = progress
    153     if substep == 0:
    154         return f"{step}."
    155     else:
    156         return f"{step}.{substep}."
    157 
    158 
    159 def ask(
    160     name: str | None,
    161     msg: str,
    162     default: T | None = None,
    163     example: T | None = None,
    164     check: Callable[[str], T | None] = lambda it: it,
    165     fmt: Callable[[T], str] = lambda it: str(it),
    166     fmt_hint: Callable[[], str] | None = None,
    167     secret: bool = False,
    168 ) -> T:
    169     """
    170     Prompt the user to configure a value
    171     :param name: if present will try to fetch the current value and will store the new value
    172     :param msg: the message to prompt the user with
    173     :param default: default value to use if no value is obtained
    174     :param example: an example to use as a user hint
    175     :param check: check and normalize the value
    176     :param fmt: format value for storage
    177     :param fmt_hint: format value for input hint
    178     :param secret: hide the input content
    179     :return: the configuration value
    180     """
    181 
    182     if fmt_hint is None:
    183         if default is not None and not secret:
    184             hint = f" (def. '{default}')"
    185         elif example is not None:
    186             hint = f" (e.g. '{example}')"
    187         else:
    188             hint = ""
    189     else:
    190         hint = fmt_hint()
    191 
    192     end = ": " if "\n" not in msg else ""
    193 
    194     fmt_msg = f"{fmt_progress()} {msg}{hint}{end}"
    195     substep()
    196 
    197     def do_ask() -> str | None:
    198         # Log the prompt
    199         log.write(fmt_msg.encode() + "\n".encode())
    200         # Actual prompt
    201         if secret:
    202             raw = getpass.getpass(fmt_msg).strip()
    203         else:
    204             print(fmt_msg, end="")
    205             raw = input().strip()
    206         if raw == "":
    207             if default is None:
    208                 print("You must enter a value")
    209             return None
    210         return raw
    211 
    212     return conf_value(name, do_ask, default, check, fmt)
    213 
    214 
    215 def ask_str(
    216     name: str | None,
    217     msg: str,
    218     default: str | None = None,
    219     example: str | None = None,
    220     secret: bool = False,
    221 ) -> str:
    222     "Prompt the user to configure a string"
    223     return ask(name, msg, default, example=example, secret=secret)
    224 
    225 
    226 def ask_bank_name(
    227     name: str | None, msg: str, test: bool, default: str | None = None
    228 ) -> str:
    229     "Prompt the user to configure a BIC"
    230 
    231     def check_bank_name(raw: str) -> str | None:
    232         if test and "test" not in raw.lower():
    233             print("Test bank name must contain 'Test'")
    234             return None
    235         else:
    236             return raw
    237 
    238     return ask(name, msg, default, check=check_bank_name)
    239 
    240 
    241 def ask_bic(
    242     name: str | None, msg: str, default: str | None = None, example: str | None = None
    243 ) -> str:
    244     "Prompt the user to configure a BIC"
    245 
    246     def check_bic(raw: str) -> str | None:
    247         raw = raw.translate({ord(i): None for i in " -"})
    248         if not BIC_PATTERN.fullmatch(raw):
    249             print("Invalid BIC")
    250             return None
    251         else:
    252             return raw
    253 
    254     return ask(name, msg, default, example, check=check_bic)
    255 
    256 
    257 def ask_iban(
    258     name: str | None, msg: str, default: str | None = None, example: str | None = None
    259 ) -> str:
    260     "Prompt the user to configure a IBAN"
    261 
    262     def check_iban(raw: str) -> str | None:
    263         raw = raw.translate({ord(i): None for i in " -"})
    264         if not IBAN_PATTERN.fullmatch(raw):
    265             print("Invalid IBAN")  # Checksum check ?
    266             return None
    267         else:
    268             return raw
    269 
    270     return ask(name, msg, default, example, check=check_iban)
    271 
    272 
    273 def ask_currency(
    274     name: str, msg: str, default: str | None = None, example: str | None = None
    275 ) -> str:
    276     "Prompt the user to configure a currency name"
    277 
    278     def check_currency(currency: str) -> str | None:
    279         currency = currency.upper()
    280         if not all([c.isascii() and c.isalpha() for c in currency]):
    281             print("The currency name must be an ASCII alphabetic string")
    282         elif len(currency) < 3 or 11 < len(currency):
    283             print("The currency name had to be between 3 and 11 characters long")
    284         else:
    285             return currency
    286         return None
    287 
    288     return ask(name, msg, default, example, check=check_currency)
    289 
    290 
    291 def ask_host(
    292     name: str, msg: str, default: str | None = None, example: str | None = None
    293 ) -> str:
    294     "Prompt the user to configure the installation hostname"
    295 
    296     def check_host(host: str) -> str | None:
    297         success = True
    298         for subdomain in ["backend", "bank", "exchange"]:
    299             success = try_cmd(["ping", "-c", "1", f"{subdomain}.{host}"]) and success
    300         if success:
    301             return host
    302         else:
    303             return None
    304 
    305     return ask(name, msg, default, example, check=check_host)
    306 
    307 
    308 def ask_terms(name: str, msg: str, kind: str) -> str:
    309     "Prompt the user to select a ToS/privacy policy"
    310 
    311     # msg = "9.1. Enter the filename of the ToS. Some available options are:\n"
    312     tos_msg = msg
    313 
    314     # Recollect example ToS files
    315     tos_path = "/usr/share/taler-exchange/terms"
    316     for f in os.listdir(tos_path):
    317         tos_file = os.path.join(tos_path, f)
    318         if os.path.isfile(tos_file) and f.endswith(".rst") and kind in f:
    319             tos_msg += f"- {tos_file}\n"
    320 
    321     tos_msg += "=> "
    322 
    323     def check_file(path: str) -> str | None:
    324         if not os.path.isfile(path):
    325             print("Not a file")  # Checksum check ?
    326             return None
    327         else:
    328             return path
    329 
    330     return ask(name, tos_msg, check=check_file)
    331 
    332 
    333 def ask_yes_no(name: str | None, msg: str, default: bool | None = None) -> bool:
    334     "Prompt the user to configure a boolean"
    335 
    336     def check_yes_no(raw: str) -> bool | None:
    337         raw = raw.lower()
    338         if raw == "y" or raw == "yes":
    339             return True
    340         elif raw == "n" or raw == "no":
    341             return False
    342         else:
    343             print("Expected 'y' or 'n'")
    344             return None
    345 
    346     def fmt_yes_no_default() -> str:
    347         if default is None:
    348             return " (y/n)"
    349         elif default is True:
    350             return " (Y/n)"
    351         else:
    352             return " (y/N)"
    353 
    354     return ask(
    355         name,
    356         msg,
    357         default,
    358         check=check_yes_no,
    359         fmt=lambda it: "y" if it else "n",
    360         fmt_hint=fmt_yes_no_default,
    361     )
    362 
    363 
    364 # ----- Crypto ----- #
    365 
    366 
    367 def ask_config_password() -> str:
    368     "Prompt the user to configure a password stored hashed with argon2id"
    369     ph = argon2.PasswordHasher()
    370     hash = conf.get("CONFIG_PASSWORD")
    371     passwd = None
    372     if hash is not None:
    373         while True:
    374             passwd = ask_str(None, "Enter the config password", secret=True)
    375             try:
    376                 ph.verify(hash, passwd)
    377                 break
    378             except argon2.exceptions.VerifyMismatchError:
    379                 print("invalid password")
    380     else:
    381         passwd = ask_str(None, "Choose a config password", secret=True)
    382 
    383     if hash is None or ph.check_needs_rehash(hash):
    384         add_conf("CONFIG_PASSWORD", ph.hash(passwd))
    385 
    386     return passwd
    387 
    388 
    389 def ask_secret(
    390     name: str, msg: str, passwd: str | None, default: str | None = None
    391 ) -> str:
    392     "Prompt the user to configure a string stored encryped using pbkdf2_sha512 and chacha20_poly1305"
    393     if passwd is None:
    394         return ask_str(name, msg, default, secret=True)
    395     else:
    396         raw = conf.get(name)
    397         plaintext = None
    398         if raw is not None:
    399             method = "$pbkdf2_sha512_chacha20_poly1305$1000000$"
    400             if raw.startswith(method):
    401                 salt, nonce, tag, ciphertext = [
    402                     b64decode(it) for it in raw.removeprefix(method).split("$", 3)
    403                 ]
    404                 key = PBKDF2(passwd, salt, 32, count=1000000, hmac_hash_module=SHA512)
    405                 cipher = ChaCha20_Poly1305.new(key=key, nonce=nonce)
    406                 cipher.update(name.encode())
    407                 plaintext = cipher.decrypt_and_verify(ciphertext, tag).decode()
    408             else:
    409                 salt = get_random_bytes(16)
    410                 key = PBKDF2(passwd, salt, 32, count=1000000, hmac_hash_module=SHA512)
    411                 cipher = ChaCha20_Poly1305.new(key=key)
    412                 cipher.update(name.encode())
    413                 ciphertext, tag = cipher.encrypt_and_digest(raw.encode())
    414                 add_conf(
    415                     name,
    416                     f"$pbkdf2_sha512_chacha20_poly1305$1000000${base64.b64encode(salt).decode()}${base64.b64encode(cipher.nonce).decode()}${base64.b64encode(tag).decode()}${base64.b64encode(ciphertext).decode()}",
    417                 )
    418         else:
    419             plaintext = ask_str(None, msg, default, secret=True)
    420             salt = get_random_bytes(16)
    421             key = PBKDF2(passwd, salt, 32, count=1000000, hmac_hash_module=SHA512)
    422             cipher = ChaCha20_Poly1305.new(key=key)
    423             cipher.update(name.encode())
    424             ciphertext, tag = cipher.encrypt_and_digest(plaintext.encode())
    425             add_conf(
    426                 name,
    427                 f"$pbkdf2_sha512_chacha20_poly1305$1000000${base64.b64encode(salt).decode()}${base64.b64encode(cipher.nonce).decode()}${base64.b64encode(tag).decode()}${base64.b64encode(ciphertext).decode()}",
    428             )
    429         result_conf[name] = plaintext
    430         return plaintext
    431 
    432 
    433 # ----- Prompt ----- #
    434 
    435 config_passwd = (
    436     ask_config_password()
    437     if ask_yes_no(
    438         "DO_CONFIG_ENCRYPTION",
    439         "Do you want to encrypt sensitive config values",
    440         True,
    441     )
    442     else None
    443 )
    444 
# Regional currency name, stored as CURRENCY
step()
ask_currency(
    "CURRENCY",
    "Enter the name of the regional currency",
    default="NETZBON",
)
    451 
# Optional conversion between the regional currency and a fiat currency
step()
do_conversion = ask_yes_no(
    "DO_CONVERSION",
    "Do you want setup regional currency conversion to fiat currency",
    True,
)
if do_conversion:
    ask_currency(
        "FIAT_CURRENCY",
        "Enter the name of the fiat currency",
        default="CHF",
    )
    ask_str(
        "FIAT_BANK_NAME", "Enter the name of your fiat bank", example="POSTFINANCE AG"
    )
    iban = ask_iban(
        "FIAT_ACCOUNT_IBAN",
        "Enter the IBAN of your fiat bank account",
        example="CH7789144474425692816",
    )
    bic = ask_bic(
        "FIAT_ACCOUNT_BIC",
        "Enter the BIC of your fiat bank account",
        example="POFICHBEXXX",
    )
    name = ask_str(
        "FIAT_ACCOUNT_NAME", "Enter the legal name of your fiat bank account"
    )
    # Derive the payto URI of the fiat account; the account holder name is
    # URL-encoded into the receiver-name query parameter
    params = urllib.parse.urlencode({"receiver-name": name})
    add_conf("CONVERSION_PAYTO", f"payto://iban/{bic}/{iban}?{params}")
    482 
# Whether this is a testing deployment, stored as DO_TEST
step()
do_test = ask_yes_no(
    "DO_TEST",
    "Do you want setup a testing deployment",
    False,
)

# Bank name; for a testing deployment ask_bank_name requires it to contain "test"
step()
bank_name = ask_bank_name(
    "BANK_NAME",
    "Enter the human-readable name of the bank",
    test=do_test,
    default="Taler Bank" if not do_test else "Taler Test Bank",
)
    497 
# Domain name; ask_host only accepts it when its service subdomains answer a ping
step()
ask_host("DOMAIN_NAME", "Enter the domain name", example="example.com")
    500 
# TLS setup: PROTO is set to https when enabled and to http otherwise
step()
if ask_yes_no("ENABLE_TLS", "Setup TLS using Let's Encrypt?", True):
    ask_str("TLS_EMAIL", "Enter an email address for Let's Encrypt")

    # Returns "y" once the user agrees to the terms and None otherwise;
    # conf_value keeps calling it until it returns a value
    def ask_tos():
        print(
            "Please read the Terms of Service at https://letsencrypt.org/documents/LE-SA-v1.3-September-21-2022.pdf."
        )
        if not ask_yes_no(
            None,
            "You must agree in order to register with the ACME server. Do you agree?",
            False,
        ):
            print("You must agree in order to register with the ACME server")
            return None
        else:
            return "y"

    # A stored TLS_TOS value is reused without asking again
    conf_value("TLS_TOS", ask_tos)
    add_conf("PROTO", "https")
else:
    add_conf("PROTO", "http")

# DO_OFFLINE is currently forced to "y" instead of being asked
add_conf(
    "DO_OFFLINE", "y"
)  # TODO support offline setup again when the documentation is ready
    527 
# Optional SMS two-factor authentication through Telesign
step()
if ask_yes_no(
    "DO_TELESIGN",
    "Setup SMS two-factor authentication using Telesign https://www.telesign.com?",
    True,
):

    # Asks for the Telesign credentials and validates them by sending a test
    # SMS; returns the auth token on success and None on failure, in which
    # case conf_value calls it again
    def ask_telesign():
        customer_id = ask_str(None, "Enter your Telesign Customer ID")
        api_key = ask_str(None, "Enter your Telesign API Key")
        phone_number = ask_str(
            None, "Enter a phone number to test your API key", example="+447911123456"
        )
        # The token is the base64 encoding of "<customer id>:<api key>"
        auth_token = base64.b64encode(f"{customer_id}:{api_key}".encode()).decode()
        # The SMS text is sent on stdin and the token through the environment
        if not try_cmd(
            ["libeufin-tan-sms.sh", phone_number],
            f"T-12345 is your verification code for {bank_name} setup",
            {**os.environ, "TELESIGN_AUTH_TOKEN": auth_token},
        ):
            print(
                "Failed to send an SMS using Telesign API, check your credentials and phone number"
            )
            return None
        # The code is accepted with or without its "T-" prefix
        code = ask_str(None, f"Enter the code received by {phone_number}")
        if code != "12345" and code != "T-12345":
            print(
                f"Wrong code got '{code}' expected '12345', check your credentials and phone number"
            )
            return None
        return auth_token

    # A stored TELESIGN_AUTH_TOKEN is reused without sending an SMS
    conf_value("TELESIGN_AUTH_TOKEN", ask_telesign)
    560 
# Bank admin password; a random UUID is used when the user enters nothing
step()
generated_password = str(uuid.uuid4())
admin_password = ask_secret(
    "BANK_ADMIN_PASSWORD",
    "Enter the admin password for the bank (or press enter to autogenerate password)",
    config_passwd,
    generated_password,
)
# Record whether the password in use is the one generated above
# NOTE(review): generated_password is a fresh UUID on every run, so when
# BANK_ADMIN_PASSWORD is already stored this comparison is false and the flag
# is rewritten to "n" even if the stored password was generated by an earlier
# run - confirm this is intended
add_conf(
    "BANK_ADMIN_PASSWORD_GENERATED",
    "y" if generated_password == admin_password else "n",
)
    573 
# Optional terms of service file for the exchange; "-tos-" selects which
# example files ask_terms lists
step()
if ask_yes_no(
    "DO_EXCHANGE_TERMS",
    "Do you wish to configure terms of service (ToS) for the exchange?",
    True,
):
    ask_terms(
        "EXCHANGE_TERMS_FILE",
        "Enter the filename of the ToS. Some available options are:\n",
        "-tos-",
    )

# Optional privacy policy file for the exchange; "-pp-" selects which
# example files ask_terms lists
step()
if ask_yes_no(
    "DO_EXCHANGE_PRIVACY",
    "Do you wish to configure a privacy policy for the exchange?",
    True,
):
    ask_terms(
        "EXCHANGE_PRIVACY_FILE",
        "Enter the filename of the privacy policy. Some available options are:\n",
        "-pp-",
    )
    597 
# Rewrite the file in the current on-disk format even if nothing has changed
store_conf()
    600 
    601 # ----- Return conf ----- #
    602 
    603 content = ""
    604 for key, value in result_conf.items():
    605     escaped = value.replace("'", "'\\''")
    606     content += f"export {key}='{escaped}'\n"
    607 with os.fdopen(3, "w") as f:
    608     f.write(content)