summaryrefslogtreecommitdiff
path: root/regional-currency/config.py
blob: e382927261f573be148dc1573b6a21058e327de5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
#!/usr/bin/env python3
"""Python script to ask questions using an interactive prompt"""

import base64
import os
import re
import subprocess
import urllib.parse
import uuid
import getpass
from base64 import b64decode, b64encode
from typing import Callable, Dict, TypeVar

import argon2
from Crypto.Cipher import ChaCha20_Poly1305
from Crypto.Hash import SHA512
from Crypto.Protocol.KDF import PBKDF2
from Crypto.Random import get_random_bytes

# Early exit if already loaded
if os.environ.get("CONFIG_LOADED") == "y":
    exit(0)

log = open("setup.log", "ab", buffering=0)
CONFIG_FILE = "config/user.conf"
BIC_PATTERN = re.compile("[A-Z0-9]{4}[A-Z]{2}[A-Z0-9]{2}(?:[A-Z0-9]{3})?")
IBAN_PATTERN = re.compile("[A-Z]{2}[0-9]{2}[A-Z0-9]{,28}")


def load_conf() -> Dict[str, str]:
    """Load user configuration file"""
    conf = {}
    with open(CONFIG_FILE, "r") as f:
        for kv in f.read().splitlines():
            if len(kv) != 0:
                [k, v] = [part.strip() for part in kv.split("=", 1)]
                if v.startswith('"') and v.endswith('"'):
                    conf[k] = v.strip('"').replace('\\"', '"')
                elif v.startswith("'") and v.endswith("'"):
                    conf[k] = v.strip("'").replace("'\\''", "'").replace("\\'", "'")
                else:
                    conf[k] = v
    return conf


conf = load_conf()
result_conf = {**conf, "CONFIG_LOADED": "y"}

def store_conf():
    """Update the configuration file"""
    content = ""
    for key, value in conf.items():
        escaped = value.replace("'", "'\\''")
        content += f"{key}='{escaped}'\n"
    with open(CONFIG_FILE, "w") as f:
        f.write(content)

def add_conf(name: str, value: str):
    """Update a user configuration value and update the configuration file"""
    conf[name] = value
    result_conf[name] = value
    store_conf()

def run_cmd(
    cmd: list[str], input: str | None = None, env: Dict[str, str] | None = None
) -> int:
    """Run a command in a child process and return its exit code"""
    result = subprocess.run(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        input=input.encode() if input is not None else None,
        stdin=subprocess.DEVNULL if input is None else None,
        env=env,
    )
    log.write(result.stdout)
    if result.returncode != 0:
        print(result.stdout.decode("utf-8"), end="")
    return result.returncode


def try_cmd(
    cmd: list[str], input: str | None = None, env: Dict[str, str] | None = None
) -> bool:
    """Run a command in a child process and return if successful"""
    return run_cmd(cmd, input, env) == 0


A = TypeVar("A")
T = TypeVar("T")


def conf_value(
    name: str | None,
    action: Callable[[], str | None],
    default: T | None = None,
    check: Callable[[str], T | None] = lambda it: it,
    fmt: Callable[[T], str] = lambda it: str(it),
) -> T:
    """
    Logic to configure a value

    :param name: if present will try to fetch the current value and will store the new value
    :param action: how a value will be obtained
    :param default: default value to use if no value is given
    :param check: check and normalize the value
    :param fmt: format value for storage
    :return: the configuration value
    """
    value = None

    # Fetch current value
    if name is not None:
        curr = conf.get(name)
        if curr is not None:
            # Check the current value and ask again if invalid
            value = check(curr)

    # Ask for a new value until we get a valid one
    while value is None:
        new = action()
        # Use default if no value was provided else check the new value
        value = check(new) if new is not None else default

    # Store the new value
    if name is not None:
        add_conf(name, fmt(value))

    return value


def ask(
    name: str | None,
    msg: str,
    default: T | None = None,
    check: Callable[[str], T | None] = lambda it: it,
    fmt: Callable[[T], str] = lambda it: str(it),
    secret: bool = False
) -> T:
    """
    Prompt the user to configure a value
    :param name: if present will try to fetch the current value and will store the new value
    :param msg: the message to prompt the user with
    :param default: default value to use if no value is obtained
    :param check: check and normalize the value
    :param fmt: format value for storage
    :param secret: hide the input content
    :return: the configuration value
    """

    def do_ask() -> str | None:
        # Log the prompt
        log.write(msg.encode() + "\n".encode())
        # Actual prompt
        if secret:
            raw = getpass.getpass(msg).strip()
        else:
            raw = input(msg).strip()
        if raw == "":
            if default is None:
                print("You must enter a value")
            return None
        return raw

    return conf_value(name, do_ask, default, check, fmt)


def ask_str(name: str | None, msg: str, default: str | None = None, secret: bool = False) -> str:
    "Prompt the user to configure a string"
    return ask(name, msg, default, secret=secret)


def ask_bic(name: str | None, msg: str, default: str | None = None) -> str:
    "Prompt the user to configure a BIC"

    def check_bic(raw: str) -> str | None:
        raw = raw.translate({ord(i): None for i in " -"})
        if not BIC_PATTERN.fullmatch(raw):
            print("Invalid BIC")
            return None
        else:
            return raw

    return ask(name, msg, default, check_bic)


def ask_iban(name: str | None, msg: str, default: str | None = None) -> str:
    "Prompt the user to configure a IBAN"

    def check_iban(raw: str) -> str | None:
        raw = raw.translate({ord(i): None for i in " -"})
        if not IBAN_PATTERN.fullmatch(raw):
            print("Invalid IBAN")  # Checksum check ?
            return None
        else:
            return raw

    return ask(name, msg, default, check_iban)


def ask_currency(name: str, msg: str, default: str | None = None) -> str:
    "Prompt the user to configure a currency name"

    def check_currency(currency: str) -> str | None:
        currency = currency.upper()
        if not all([c.isascii() and c.isalpha() for c in currency]):
            print("The currency name must be an ASCII alphabetic string")
        elif len(currency) < 3 or 11 < len(currency):
            print("The currency name had to be between 3 and 11 characters long")
        else:
            return currency
        return None

    return ask(name, msg, default, check_currency)


def ask_host(name: str, msg: str, default: str | None = None) -> str:
    "Prompt the user to configure the installation hostname"

    def check_host(host: str) -> str | None:
        success = True
        for subdomain in ["backend", "bank", "exchange"]:
            success = try_cmd(["ping", "-c", "1", f"{subdomain}.{host}"]) and success
        if success:
            return host
        else:
            return None

    return ask(name, msg, default, check_host)


def ask_terms(name: str, msg: str, kind: str) -> str:
    "Prompt the user to select a ToS/privacy policy"

    # msg = "9.1. Enter the filename of the ToS. Some available options are:\n"
    tos_msg = msg

    # Recollect example ToS files
    tos_path = "/usr/share/taler/terms"
    for f in os.listdir(tos_path):
        tos_file = os.path.join(tos_path, f)
        if os.path.isfile(tos_file) and f.endswith(".rst") and kind in f:
            tos_msg += f"- {tos_file}\n"

    tos_msg += "=> "

    def check_file(path: str) -> str | None:
        if not os.path.isfile(path):
            print("Not a file")  # Checksum check ?
            return None
        else:
            return path

    return ask(name, tos_msg, None, check_file)


def ask_yes_no(name: str | None, msg: str, default: bool | None = None) -> bool:
    "Prompt the user to configure a boolean"

    def check_yes_no(raw: str) -> bool | None:
        raw = raw.lower()
        if raw == "y" or raw == "yes":
            return True
        elif raw == "n" or raw == "no":
            return False
        else:
            print("Expected 'y' or 'n'")
            return None

    return ask(name, msg, default, check_yes_no, lambda it: "y" if it else "n")


# ----- Crypto ----- #


def ask_config_password() -> str:
    "Prompt the user to configure a password stored hashed with argon2id"
    ph = argon2.PasswordHasher()
    hash = conf.get("CONFIG_PASSWORD")
    passwd = None
    if hash is not None:
        while True:
            passwd = ask_str(None, "Enter the config password : ", secret=True)
            try:
                ph.verify(hash, passwd)
                break
            except argon2.exceptions.VerifyMismatchError:
                print("invalid password")
    else:
        passwd = ask_str(None, "1.1 Choose a config password : ", secret=True)

    if hash is None or ph.check_needs_rehash(hash):
        add_conf("CONFIG_PASSWORD", ph.hash(passwd))

    return passwd


def ask_secret(
    name: str, msg: str, passwd: str | None, default: str | None = None
) -> str:
    "Prompt the user to configure a string stored encryped using pbkdf2_sha512 and chacha20_poly1305"
    if passwd is None:
        return ask_str(name, msg, default)
    else:
        raw = conf.get(name)
        plaintext = None
        if raw is not None:
            method = "$pbkdf2_sha512_chacha20_poly1305$1000000$"
            if raw.startswith(method):
                salt, nonce, tag, ciphertext = [
                    b64decode(it) for it in raw.removeprefix(method).split("$", 3)
                ]
                key = PBKDF2(passwd, salt, 32, count=1000000, hmac_hash_module=SHA512)
                cipher = ChaCha20_Poly1305.new(key=key, nonce=nonce)
                cipher.update(name.encode())
                plaintext = cipher.decrypt_and_verify(ciphertext, tag).decode()
            else:
                salt = get_random_bytes(16)
                key = PBKDF2(passwd, salt, 32, count=1000000, hmac_hash_module=SHA512)
                cipher = ChaCha20_Poly1305.new(key=key)
                cipher.update(name.encode())
                ciphertext, tag = cipher.encrypt_and_digest(raw.encode())
                add_conf(
                    name,
                    f"$pbkdf2_sha512_chacha20_poly1305$1000000${base64.b64encode(salt).decode()}${base64.b64encode(cipher.nonce).decode()}${base64.b64encode(tag).decode()}${base64.b64encode(ciphertext).decode()}",
                )
        else:
            plaintext = ask_str(None, msg, default, True)
            salt = get_random_bytes(16)
            key = PBKDF2(passwd, salt, 32, count=1000000, hmac_hash_module=SHA512)
            cipher = ChaCha20_Poly1305.new(key=key)
            cipher.update(name.encode())
            ciphertext, tag = cipher.encrypt_and_digest(plaintext.encode())
            add_conf(
                name,
                f"$pbkdf2_sha512_chacha20_poly1305$1000000${base64.b64encode(salt).decode()}${base64.b64encode(cipher.nonce).decode()}${base64.b64encode(tag).decode()}${base64.b64encode(ciphertext).decode()}",
            )
        result_conf[name] = plaintext
        return plaintext


# ----- Prompt ----- #

config_passwd = (
    ask_config_password()
    if ask_yes_no(
        "DO_CONFIG_ENCRYPTION",
        "1. Do you want to encrypt sensitive config values (Y/n): ",
        True,
    )
    else None
)
ask_currency(
    "CURRENCY",
    "2. Enter the name of the regional currency (e.g. 'NETZBON'): ",
    "NETZBON",
)
do_conversion = ask_yes_no(
    "DO_CONVERSION",
    "3. Do you want setup regional currency conversion to fiat currency (Y/n): ",
    True,
)
if do_conversion:
    ask_currency(
        "FIAT_CURRENCY",
        "3.1. Enter the name of the fiat currency (e.g. 'CHF'): ",
        "CHF",
    )
    ask_str(
        "FIAT_BANK_NAME",
        "3.2. Enter the name of your fiat bank (e.g. POSTFINANCE AG): ",
    )
    iban = ask_iban(
        "FIAT_ACCOUNT_IBAN",
        "3.3. Enter the IBAN of your fiat bank account (e.g. 'CH7789144474425692816'): ",
    )
    bic = ask_bic(
        "FIAT_ACCOUNT_BIC",
        "3.4. Enter the BIC of your fiat bank account (e.g. 'POFICHBEXXX'): ",
    )
    name = ask_str(
        "FIAT_ACCOUNT_NAME", "3.5. Enter the legal name of your fiat bank account: "
    )
    params = urllib.parse.urlencode({"receiver-name": name})
    add_conf("CONVERSION_PAYTO", f"payto://iban/{bic}/{iban}?{params}")
bank_name = ask_str(
    "BANK_NAME",
    "4. Enter the human-readable name of the bank (e.g. 'Taler Bank'): ",
    "Taler Bank",
)
ask_host("DOMAIN_NAME", "5. Enter the domain name (e.g. 'example.com'): ")
if ask_yes_no("ENABLE_TLS", "6. Setup TLS using Let's Encrypt? (Y/n): ", True):
    ask_str("TLS_EMAIL", "6.1. Enter an email address for Let's Encrypt: ")

    def ask_tos():
        print(
            "6.2. Please read the Terms of Service at https://letsencrypt.org/documents/LE-SA-v1.3-September-21-2022.pdf."
        )
        if not ask_yes_no(
            None,
            "6.2. You must agree in order to register with the ACME server. Do you agree? (y/n): ",
            False,
        ):
            print("You must agree in order to register with the ACME server")
            return None
        else:
            return "y"

    conf_value("TLS_TOS", ask_tos)
    add_conf("PROTO", "https")
else:
    add_conf("PROTO", "http")

add_conf(
    "DO_OFFLINE", "y"
)  # TODO support offline setup again when the documentation is ready

if ask_yes_no(
    "DO_TELESIGN",
    "7. Setup SMS two-factor authentication using Telesign https://www.telesign.com? (Y/n): ",
    True,
):

    def ask_telesign():
        customer_id = ask_str(None, "7.1. Enter your Telesign Customer ID: ")
        api_key = ask_str(None, "7.2. Enter your Telesign API Key: ")
        phone_number = ask_str(
            None,
            "6.3. Enter a phone number to test your API key (e.g. '+447911123456'): ",
        )
        auth_token = base64.b64encode(f"{customer_id}:{api_key}".encode()).decode()
        if not try_cmd(
            ["libeufin-tan-sms.sh", phone_number],
            f"T-12345 is your verification code for {bank_name} setup",
            {**os.environ, "AUTH_TOKEN": auth_token},
        ):
            print(
                "Failed to send an SMS using Telesign API, check your credentials and phone number"
            )
            return None
        code = ask_str(None, f"7.4. Enter the code received by {phone_number} : ")
        if code != "12345" and code != "T-12345":
            print(
                f"Wrong code got '{code}' expected '12345', check your credentials and phone number"
            )
            return None
        return auth_token

    conf_value("TELESIGN_AUTH_TOKEN", ask_telesign)
generated_password= str(uuid.uuid4())
admin_password = ask_secret(
    "BANK_ADMIN_PASSWORD",
    "8. Enter the admin password for the bank (or press enter to autogenerate password): ",
    config_passwd,
    generated_password,
)
add_conf("BANK_ADMIN_PASSWORD_GENERATED", "y" if generated_password==admin_password else "n")

if ask_yes_no(
    "DO_EXCHANGE_TERMS",
    "9. Do you wish to configure terms of service (ToS) for the exchange? (Y/n): ",
    True,
):
    ask_terms(
        "EXCHANGE_TERMS_FILE",
        "9.1. Enter the filename of the ToS. Some available options are:\n",
        "-tos-",
    )

if ask_yes_no(
    "DO_EXCHANGE_PRIVACY",
    "10. Do you wish to configure a privacy policy for the exchange? (Y/n): ",
    True,
):
    ask_terms(
        "EXCHANGE_PRIVACY_FILE",
        "10.1. Enter the filename of the privacy policy. Some available options are:\n",
        "-pp-",
    )

# Update on disk format even if nothing have changed
store_conf()

# ----- Return conf ----- #

content = ""
for key, value in result_conf.items():
    escaped = value.replace("'", "'\\''")
    content += f"export {key}='{escaped}'\n"
with os.fdopen(3, "w") as f:
    f.write(content)