summaryrefslogtreecommitdiff
path: root/talermerchantdemos/httpcommon/__init__.py
blob: 01b5c84b74f0a70d92f70d21b454a5c5ebf32150 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
import flask
import requests
from urllib.parse import urljoin
from flask import request
from datetime import datetime
import time
from flask_babel import gettext
import os
import re
import logging

LOGGER = logging.getLogger(__name__)

class BackendException(Exception):
    """Exception for failed communication with the Taler merchant backend"""

    def __init__(self, message, backend_status=None, backend_json=None):
        super().__init__(backend_json.get("hint", message))
        self.backend_status = backend_status
        self.backend_json = backend_json

##
# POST a request to the backend, and return a error
# response if any error occurs.
#
# @param endpoint the backend endpoint where to POST
#        this request.
# @param json the POST's body.
# @return the backend response (JSON format).
def backend_post(backend_url, endpoint, json, auth_token=None):
    headers = dict()
    if auth_token:
        headers["Authorization"] = "Bearer " + auth_token
    final_url = urljoin(backend_url, endpoint)
    print("POSTing to: " + final_url)
    try:
        resp = requests.post(final_url, json=json, headers=headers)
    except requests.ConnectionError:
        raise BackendException(
            message=gettext("Could not establish connection to backend")
        )
    try:
        response_json = resp.json()
    except ValueError:
        raise BackendException(
            message=gettext("Could not parse response from backend"),
            backend_status=resp.status_code,
        )
    if resp.status_code != 200:
        raise BackendException(
            message=gettext("Backend returned error status"),
            backend_status=resp.status_code,
            backend_json=response_json,
        )
    print("Backend responds to {}: {}/{}".format(
        final_url,
        str(response_json),
        resp.status_code
    ))
    return response_json


##
# Issue a GET request to the backend.
#
# @param endpoint the backend endpoint where to issue the request.
# @param params (dict type of) URL parameters to append to the request.
# @return the JSON response from the backend, or a error response
#         if something unexpected happens.
def backend_get(backend_url, endpoint, params, auth_token=None):
    headers = dict()
    if auth_token is not None:
        headers["Authorization"] = "Bearer " + auth_token
    final_url = urljoin(backend_url, endpoint)
    print("GETting: " + final_url + " with params: " + str(params))
    try:
        resp = requests.get(final_url, params=params, headers=headers)
    except requests.ConnectionError:
        raise BackendException(
            message=gettext("Could not establish connection to backend")
        )
    try:
        response_json = resp.json()
    except ValueError:
        raise BackendException(message=gettext("Could not parse response from backend"))
    if resp.status_code != 200:
        raise BackendException(
            message=gettext("Backend returned error status"),
            backend_status=resp.status_code,
            backend_json=response_json,
        )
    print("Backend responds to {}: {}".format(final_url, str(response_json)))
    return response_json


def get_locale():
    parts = request.path.split("/", 2)
    if 2 >= len(parts):
        # Totally unexpected path format, do not localize
        return "en"
    lang = parts[1]
    if lang == "static":
        # Static resource, not a language indicator.
        # Do not localize then.
        return "en"
    return lang


##
# Helper function used inside Jinja2 logic to create a links
# to the current page but in a different language. Used to
# implement the "Language" menu.
#
def self_localized(lang):
    """
    Return URL for the current page in another locale.
    """
    path = request.path
    # path must have the form "/$LANG/$STUFF"
    parts = path.split("/", 2)
    if 2 >= len(parts):
        # Totally unexpected path format, do not localize
        return path
    return "/" + lang + "/" + parts[2]


class Deadline:
    def __init__(self, value):
        self.value = value

    def isExpired(self):
        if self.value == "never":
            return False
        now = int(round(time.time()) * 1000)
        now_dt = datetime.fromtimestamp(now / 1000)
        deadline_dt = datetime.fromtimestamp(self.value / 1000)
        print(
            "debug: checking refund expiration, now: {}, deadline: {}".format(
                now_dt.strftime("%c"), deadline_dt.strftime("%c")
            )
        )
        return now > self.value


all_languages = {
    "en": "English [en]",
    "ar": "عربى [ar]",
    "zh_Hant": "繁體中文 [zh]",
    "fr": "Français [fr]",
    "de": "Deutsch [de]",
    "hi": "हिंदी [hi]",
    "it": "Italiano [it]",
    "ja": "日本語 [ja]",
    "ko": "한국어 [ko]",
    "pt": "Português [pt]",
    "pt_BR": "Português (Brazil) [pt_BR]",
    "ru": "Ру́сский язы́к [ru]",
    "es": "Español [es]",
    "sv": "Svenska [sv]",
    "tr": "Türk [tr]",
}


##
# Make the environment available into templates.
#
# @return the environment-reading function
def make_utility_processor(pagename):
    def utility_processor():
        def getactive():
            return pagename

        def getlang():
            return get_locale()

        def env(name, default=None):
            return os.environ.get(name, default)

        def prettydate(talerdate):
            parsed_time = re.search(r"/Date\(([0-9]+)\)/", talerdate)
            if not parsed_time:
                return "malformed date given"
            parsed_time = int(parsed_time.group(1))
            timestamp = datetime.datetime.fromtimestamp(parsed_time)
            # returns the YYYY-MM-DD date format.
            return timestamp.strftime("%Y-%b-%d")

        def static(name):
            return flask.url_for("static", filename=name)

        return dict(
            env=env,
            prettydate=prettydate,
            getactive=getactive,
            getlang=getlang,
            all_languages=all_languages,
            static=static,
        )

    return utility_processor