taler-merchant-demos

Python-based Frontends for the Demonstration Web site
Log | Files | Refs | Submodules | README | LICENSE

__init__.py (7622B)


      1 import flask
      2 import requests
      3 from urllib.parse import urljoin, urlencode, urlparse, urlunparse
      4 from flask import request
      5 from datetime import datetime
      6 import time
      7 from flask_babel import gettext
      8 import babel  # used for lang sanity check
      9 import os
     10 import re
     11 import logging
     12 
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
     14 
     15 
     16 class BackendException(Exception):
     17     """Exception for failed communication with the Taler merchant backend"""
     18 
     19     def __init__(self, message, backend_status=None, backend_json={}):
     20         super().__init__(backend_json.get("hint", message))
     21         self.backend_status = backend_status
     22         self.backend_json = backend_json
     23 
     24 
     25 ##
     26 # POST a request to the backend, and return a error
     27 # response if any error occurs.
     28 #
     29 # @param endpoint the backend endpoint where to POST
     30 #        this request.
     31 # @param json the POST's body.
     32 # @return the backend response (JSON format).
     33 def backend_post(backend_url, endpoint, json, auth_token=None):
     34     headers = dict()
     35     if auth_token:
     36         headers["Authorization"] = "Bearer " + auth_token
     37     final_url = urljoin(backend_url, endpoint)
     38     print("POSTing to: " + final_url)
     39     try:
     40         resp = requests.post(final_url, json=json, headers=headers)
     41     except requests.ConnectionError:
     42         raise BackendException(
     43             message=gettext("Could not establish connection to backend")
     44         )
     45     try:
     46         response_json = resp.json()
     47     except Exception:
     48         raise BackendException(
     49             message=gettext("Could not parse the response from backend"),
     50             backend_status=resp.status_code,
     51         )
     52     print(
     53         "Backend responds to {}: {}/{}".format(
     54             final_url, str(response_json), resp.status_code
     55         )
     56     )
     57     if resp.status_code != 200:
     58         raise BackendException(
     59             message=gettext("Backend returned error status"),
     60             backend_status=resp.status_code,
     61             backend_json=response_json,
     62         )
     63     return response_json
     64 
     65 
     66 ##
     67 # Issue a GET request to the backend.
     68 #
     69 # @param endpoint the backend endpoint where to issue the request.
     70 # @param params (dict type of) URL parameters to append to the request.
     71 # @return the JSON response from the backend, or a error response
     72 #         if something unexpected happens.
     73 def backend_get(backend_url, endpoint, params, auth_token=None):
     74     status, json = backend_get_with_status(backend_url, endpoint, params, auth_token)
     75     if status != 200:
     76         raise BackendException(
     77             message=gettext("Backend returned error status"),
     78             backend_status=status,
     79             backend_json=json,
     80         )
     81     return json
     82 
     83 
     84 def backend_get_with_status(backend_url, endpoint, params, auth_token=None):
     85     headers = dict()
     86     if auth_token is not None:
     87         headers["Authorization"] = "Bearer " + auth_token
     88     final_url = urljoin(backend_url, endpoint)
     89     print("GETting: " + final_url + " with params: " + str(params))
     90     try:
     91         resp = requests.get(final_url, params=params, headers=headers)
     92     except requests.ConnectionError:
     93         raise BackendException(
     94             message=gettext("Could not establish connection to backend")
     95         )
     96     try:
     97         response_json = resp.json()
     98     except Exception:
     99         raise BackendException(message=gettext("Could not parse response from backend"))
    100     print("Backend responds to {}: {}".format(final_url, str(response_json)))
    101     return resp.status_code, response_json
    102 
    103 
    104 def get_locale():
    105     parts = request.path.split("/", 2)
    106     if 2 >= len(parts):
    107         # Totally unexpected path format, do not localize
    108         return "en"
    109     lang = parts[1]
    110 
    111     # Sanity check on the language code.
    112     try:
    113         babel.core.Locale.parse(lang)
    114     except Exception as err:
    115         # Not a locale, default to english.
    116         logger.error(f"language {lang} did not parse, default to english")
    117         return "en"
    118     if lang == "static":
    119         # Static resource, not a language indicator.
    120         # Do not localize then.
    121         return "en"
    122     return lang
    123 
    124 
    125 ##
    126 # Construct the payment URL where customer can pay the order
    127 #
    128 # @param backend_url where the backend is located
    129 # @param order_id id of the order already created
    130 # @param session_id session in which the order is going to be paid
    131 # @param token if the order requires a token
    132 # @return the JSON response from the backend, or a error response
    133 #         if something unexpected happens.
    134 def backend_payment_url(backend_url, endpoint, session_id, token):
    135     final_url = urljoin(backend_url, endpoint)
    136     query = urlencode({"token": token, "session_id": session_id})
    137     redirect_url = urlparse(final_url)._replace(query=query)
    138     return urlunparse(redirect_url)
    139 
    140 
    141 ##
    142 # Helper function used inside Jinja2 logic to create a links
    143 # to the current page but in a different language. Used to
    144 # implement the "Language" menu.
    145 #
    146 def self_localized(lang):
    147     """
    148     Return URL for the current page in another locale.
    149     """
    150     path = request.path
    151     # path must have the form "/$LANG/$STUFF"
    152     parts = path.split("/", 2)
    153     if 2 >= len(parts):
    154         # Totally unexpected path format, do not localize
    155         return path
    156     return "/" + lang + "/" + parts[2]
    157 
    158 
    159 class Deadline:
    160     def __init__(self, value):
    161         self.value = value
    162 
    163     def isExpired(self):
    164         if self.value == "never":
    165             return False
    166         now = int(round(time.time()) * 1000)
    167         now_dt = datetime.fromtimestamp(now / 1000)
    168         deadline_dt = datetime.fromtimestamp(self.value / 1000)
    169         print(
    170             "debug: checking refund expiration, now: {}, deadline: {}".format(
    171                 now_dt.strftime("%c"), deadline_dt.strftime("%c")
    172             )
    173         )
    174         return now > self.value
    175 
    176 
# Maps a language/locale code to the human-readable label for it: the
# language's own name followed by a code in square brackets.
# Exposed to templates under the name "all_languages" by
# make_utility_processor().
# NOTE(review): the label for "zh_Hant" ends in "[zh]" while "pt_BR" repeats
# its full key; confirm whether that difference is intended.
all_languages = {
    "en": "English [en]",
    "ar": "عربى [ar]",
    "zh_Hant": "繁體中文 [zh]",
    "fr": "Français [fr]",
    "de": "Deutsch [de]",
    "hi": "हिंदी [hi]",
    "it": "Italiano [it]",
    "ja": "日本語 [ja]",
    "ko": "한국어 [ko]",
    "pt": "Português [pt]",
    "pt_BR": "Português (Brazil) [pt_BR]",
    "ru": "Ру́сский язы́к [ru]",
    "es": "Español [es]",
    "sv": "Svenska [sv]",
    "tr": "Türkçe [tr]",
    "uk": "Українська [uk]",
}
    195 
    196 
    197 ##
    198 # Make the environment available into templates.
    199 #
    200 # @param pagename name of the target app
    201 # @param base_url url of the app
    202 # @param add additional dictionary elements to add or overwrite
    203 #
    204 # @return the environment-reading function
    205 def make_utility_processor(pagename, base_url, add={}):
    206     def utility_processor():
    207         def getactive():
    208             return pagename
    209 
    210         def getlang():
    211             return get_locale()
    212 
    213         def env(name, default=None):
    214             return os.environ.get(name, default)
    215 
    216         def prettydate(talerdate):
    217             parsed_time = re.search(r"/Date\(([0-9]+)\)/", talerdate)
    218             if not parsed_time:
    219                 return "malformed date given"
    220             parsed_time = int(parsed_time.group(1))
    221             timestamp = datetime.datetime.fromtimestamp(parsed_time)
    222             # returns the YYYY-MM-DD date format.
    223             return timestamp.strftime("%Y-%b-%d")
    224 
    225         def static(name):
    226             return flask.url_for("static", filename=name)
    227 
    228         rd = dict(
    229             env=env,
    230             prettydate=prettydate,
    231             getactive=getactive,
    232             getlang=getlang,
    233             all_languages=all_languages,
    234             static=static,
    235         )
    236         rd.update(add)
    237 
    238         return rd
    239 
    240     return utility_processor