# summary | refs | log | tree | commit | diff
# path: root/contrib/taler-nexus-prepare
# blob: 66a0b1c1e71f4ee9d2db5c409a8e0bbe11427fff (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#!/usr/bin/env python3
# This file is in the public domain.

import base64
from subprocess import call
from subprocess import check_call

from requests import get, post

# Coordinates of the EBICS endpoint and of the subscriber that talks to it.
EBICS_URL = "http://localhost:5000/ebicsweb"
HOST_ID = "HOST01"
PARTNER_ID = "PARTNER1"
USER_ID = "USER1"
EBICS_VERSION = "H004"

# Bank account details registered for the subscriber.
SUBSCRIBER_IBAN = "ES9121000418450200051332"
SUBSCRIBER_BIC = "BIC"
SUBSCRIBER_NAME = "Exchange"

# Labels under which the account, connection and facade are created.
BANK_ACCOUNT_LABEL = "my-bank-account"
BANK_CONNECTION_LABEL = "my-bank-connection"
FACADE_LABEL = "my-facade"

# User name and the matching Authorization header value
# (base64 of "Exchange:x", prefixed with the auth scheme).
USERNAME = "Exchange"
_basic_credentials = base64.b64encode(b"Exchange:x")
USER_AUTHORIZATION_HEADER = "basic {}".format(_basic_credentials.decode("utf-8"))

def assertResponse(response):
    """Abort the script unless *response* carries HTTP status 200.

    On any other status the URL of the failing request and a hint about
    the service log files are printed, and the process exits with
    status 1 (by raising SystemExit).

    Returns the response object unchanged on success, so callers can run
    finer grained checks on it.
    """
    # sys.exit is used instead of the bare exit() helper: the latter is
    # installed by the 'site' module for interactive sessions and is not
    # guaranteed to exist (e.g. under 'python -S').
    import sys

    if response.status_code != 200:
        print("Test failed on URL: {}".format(response.url))
        # stdout/stderr from both services is A LOT of text.
        # Confusing to dump all that to console.
        print("Check nexus.log and sandbox.log, probably under /tmp")
        sys.exit(1)
    return response

# Create a nexus (super-) user.
# check_call (imported from subprocess at the top of the file) raises
# CalledProcessError when the command exits with a non-zero status, so
# the script stops here instead of sending requests that would fail
# for lack of a user.
check_call(
    [
        "libeufin-nexus",
        "superuser",
        "--db-name", "/tmp/nexus-exchange-test.sqlite3",
        "Exchange",
        "--password", "x",
    ]
)

# Authorization header attached to every request sent to port 5001;
# the Sandbox admin requests on port 5000 are sent without it.
_auth_headers = {"Authorization": USER_AUTHORIZATION_HEADER}

# Triple identifying the EBICS subscriber in the Sandbox requests.
_subscriber = {"hostID": HOST_ID, "partnerID": PARTNER_ID, "userID": USER_ID}

# Step 1: register a new EBICS bank connection.
_connection_request = {
    "name": BANK_CONNECTION_LABEL,
    "source": "new",
    "type": "ebics",
    "data": {
        "ebicsURL": EBICS_URL,
        "hostID": HOST_ID,
        "partnerID": PARTNER_ID,
        "userID": USER_ID,
    },
}
assertResponse(
    post(
        "http://localhost:5001/bank-connections",
        json=_connection_request,
        headers=_auth_headers,
    )
)

# Step 2: create the taler-wire-gateway facade on top of that connection.
_facade_request = {
    "name": FACADE_LABEL,
    "type": "taler-wire-gateway",
    "creator": USERNAME,
    "config": {
        "bankAccount": BANK_ACCOUNT_LABEL,
        "bankConnection": BANK_CONNECTION_LABEL,
        "reserveTransferLevel": "UNUSED",
        "intervalIncremental": "UNUSED",
    },
}
assertResponse(
    post(
        "http://localhost:5001/facades",
        json=_facade_request,
        headers=_auth_headers,
    )
)

# Step 3: create the EBICS host at the Sandbox.
_host_request = {"hostID": HOST_ID, "ebicsVersion": EBICS_VERSION}
assertResponse(post("http://localhost:5000/admin/ebics/host", json=_host_request))

# Step 4: create the Exchange's EBICS subscriber at the Sandbox.
assertResponse(
    post("http://localhost:5000/admin/ebics/subscribers", json=_subscriber)
)

# Step 5: still at the Sandbox, attach a bank account to that subscriber.
_account_request = {
    "subscriber": _subscriber,
    "iban": SUBSCRIBER_IBAN,
    "bic": SUBSCRIBER_BIC,
    "name": SUBSCRIBER_NAME,
    "label": BANK_ACCOUNT_LABEL,
}
assertResponse(
    post("http://localhost:5000/admin/ebics/bank-accounts", json=_account_request)
)

# Step 6: 'connect' to the bank, i.e. upload and download keys.
_connect_url = "http://localhost:5001/bank-connections/{}/connect".format(
    BANK_CONNECTION_LABEL
)
assertResponse(post(_connect_url, json={}, headers=_auth_headers))

# Step 7: download (import) the bank accounts through the connection.
_import_url = "http://localhost:5001/bank-connections/{}/ebics/import-accounts".format(
    BANK_CONNECTION_LABEL
)
assertResponse(post(_import_url, json={}, headers=_auth_headers))