#  This file is part of TALER
#  (C) 2014, 2015, 2016 INRIA
#
#  TALER is free software; you can redistribute it and/or modify
#  it under the terms of the GNU Affero General Public License as
#  published by the Free Software Foundation; either version 3,
#  or (at your option) any later version.
#
#  TALER is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty
#  of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#  See the GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public
#  License along with TALER; see the file COPYING.  If not,
#  see <http://www.gnu.org/licenses/>.
#
#  @author Marcello Stanisci

"""Test suite for the bank application (views, models and Amount)."""

import json
import time
import zlib
import timeit
import logging
from urllib.parse import unquote
from django.db import connection
from django.test import TestCase, Client
from django.urls import reverse
from django.conf import settings
from django.contrib.auth.models import User
from mock import patch, MagicMock
from .models import BankAccount, BankTransaction
from . import urls
from .views import wire_transfer
from .amount import Amount, CurrencyMismatch, BadFormatAmount

LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)


def clear_db():
    """Delete all users, accounts and transactions, and restart the
    database sequences that number bank accounts and transactions."""
    User.objects.all().delete()
    BankAccount.objects.all().delete()
    BankTransaction.objects.all().delete()
    with connection.cursor() as cursor:
        cursor.execute(
            "ALTER SEQUENCE app_bankaccount_account_no_seq RESTART")
        cursor.execute(
            "ALTER SEQUENCE app_banktransaction_id_seq RESTART")


class PublicAccountsTestCase(TestCase):
    """Smoke test for the "public-accounts" page."""

    def setUp(self):
        clear_db()
        self.user = User.objects.create_user(
            username="Bank", password="Bank")
        self.user.save()
        self.user_bank_account = BankAccount(
            account_no=100,
            is_public=True,
            user=self.user)
        self.user_bank_account.save()

    def test_public_accounts(self):
        """Request the page while one public account exists."""
        self.assertTrue(User.objects.get(username="Bank"))
        # No assertion is made on the response itself; the request
        # only exercises the view.
        self.client.get(reverse("public-accounts", urlconf=urls))


class WithdrawTestCase(TestCase):
    """Withdrawal flow: PIN question followed by PIN verification."""

    def setUp(self):
        self.user_bank_account = BankAccount(
            user=User.objects.create_user(
                username="test_user",
                password="test_password"),
            account_no=100)
        self.user_bank_account.save()

        self.exchange_bank_account = BankAccount(
            user=User.objects.create_user(
                username="test_exchange",
                password=""),
            account_no=99)
        self.exchange_bank_account.save()
        self.client = Client()

    # Decorators are applied bottom-up, so the mocks reach the test
    # in the order: time.time, hashlib.new, wire_transfer.
    @patch('talerbank.app.views.wire_transfer')
    @patch('hashlib.new')
    @patch('time.time')
    def test_withdraw(self, mocked_time,
                      mocked_hashlib, mocked_wire_transfer):
        """Check which arguments the view hands to wire_transfer()."""
        amount = Amount(settings.TALER_CURRENCY, 0, 1)
        params = {
            "amount_value": str(amount.value),
            "amount_fraction": str(amount.fraction),
            "amount_currency": amount.currency,
            "reserve_pub": "UVZ789",
            "exchange": "https://exchange.example.com/",
            "exchange_wire_details":
                "payto://x-taler-bank/bank.example/99",
        }
        self.client.login(username="test_user",
                          password="test_password")

        response = self.client.get(
            reverse("pin-question", urlconf=urls),
            params)
        self.assertEqual(response.status_code, 200)

        # We mock hashlib in order to fake the CAPTCHA.
        hasher = MagicMock()
        hasher.hexdigest.return_value = "0"
        mocked_hashlib.return_value = hasher
        mocked_time.return_value = 0

        self.client.post(
            reverse("pin-verify", urlconf=urls),
            {"pin_1": "0"})

        # Fail with a readable message, rather than with a TypeError
        # on unpacking None, if the view never reached wire_transfer.
        self.assertTrue(mocked_wire_transfer.called,
                        "wire_transfer() was not called")
        args, _ = mocked_wire_transfer.call_args
        self.assertEqual(args[0].dump(), amount.dump())
        self.assertIn(self.user_bank_account, args)
        self.assertIn("UVZ789", args)
        self.assertIn(self.exchange_bank_account, args)

    def tearDown(self):
        clear_db()


class InternalWireTransferTestCase(TestCase):
    """Wire transfer between two customers via the profile page."""

    def setUp(self):
        BankAccount(
            user=User.objects.create_user(
                username='give_money',
                password="gm")).save()
        self.take_money = BankAccount(
            user=User.objects.create_user(
                username='take_money'),
            account_no=4)
        self.take_money.save()

    def tearDown(self):
        clear_db()

    def test_internal_wire_transfer(self):
        """Send 3 units to account 4 and check the receiver's balance."""
        client = Client()
        client.login(username="give_money", password="gm")
        response = client.post(
            reverse("profile", urlconf=urls),
            {"amount": 3.0,
             "receiver": self.take_money.account_no,
             "subject": "charity"})
        take_money = BankAccount.objects.get(account_no=4)
        self.assertEqual(
            0,
            Amount.cmp(Amount(settings.TALER_CURRENCY, 3),
                       take_money.amount))
        self.assertEqual(302, response.status_code)


class RegisterTestCase(TestCase):
    """User registration"""

    def setUp(self):
        clear_db()
        BankAccount(
            user=User.objects.create_user(
                username='Bank')).save()

    def tearDown(self):
        clear_db()

    def test_register(self):
        """Register a new user and follow the redirect to /profile."""
        client = Client()
        response = client.post(
            reverse("register", urlconf=urls),
            {"username": "test_register",
             "password": "test_register"},
            follow=True)
        self.assertIn(("/profile", 302), response.redirect_chain)
        # this assertion tests "/profile"'s view
        self.assertEqual(200, response.status_code)


class LoginTestCase(TestCase):
    """User login"""

    def setUp(self):
        BankAccount(
            user=User.objects.create_user(
                username="test_user",
                password="test_password")).save()
        self.client = Client()

    def tearDown(self):
        clear_db()

    def test_login(self):
        """Good credentials log in, a wrong password does not."""
        self.assertTrue(self.client.login(
            username="test_user",
            password="test_password"))
        self.assertFalse(self.client.login(
            username="test_user",
            password="test_passwordii"))

    def test_failing_login(self):
        """Wrong credentials in the HTTP headers yield a 401."""
        response = self.client.get(
            reverse("history", urlconf=urls),
            {"auth": "basic"},
            **{"HTTP_X_TALER_BANK_USERNAME": "Wrong",
               "HTTP_X_TALER_BANK_PASSWORD": "Credentials"})
        self.assertEqual(401, response.status_code)


class AmountTestCase(TestCase):
    """Comparison of Amount objects."""

    def test_cmp(self):
        """Amount.cmp() returns -1, 1 or 0."""
        amount1 = Amount("X", 1)
        _amount1 = Amount("X", 1)
        amount2 = Amount("X", 2)

        self.assertEqual(-1, Amount.cmp(amount1, amount2))
        self.assertEqual(1, Amount.cmp(amount2, amount1))
        self.assertEqual(0, Amount.cmp(amount1, _amount1))

    def test_cmp_diff_curr(self):
        """Comparing amounts of different currencies must raise."""
        amount1 = Amount("X", 1)
        amount2 = Amount("Y", 2)
        with self.assertRaises(CurrencyMismatch):
            Amount.cmp(amount1, amount2)


class RejectTestCase(TestCase):
    """Rejection of an incoming transfer by its receiver."""

    def setUp(self):
        BankAccount(
            user=User.objects.create_user(
                username="rejected_user",
                password="rejected_password")).save()
        BankAccount(
            user=User.objects.create_user(
                username="rejecting_user",
                password="rejecting_password")).save()

    def tearDown(self):
        clear_db()

    def test_reject(self):
        """Create a transfer with add-incoming, then reject it."""
        client = Client()
        rejecting = User.objects.get(username="rejecting_user")
        data = json.dumps({
            "auth": {"type": "basic"},
            "credit_account": rejecting.bankaccount.account_no,
            "subject": "TESTWTID",
            "exchange_url": "https://exchange.test",
            "amount": "%s:5.0" % settings.TALER_CURRENCY,
        })
        response = client.post(
            reverse("add-incoming", urlconf=urls),
            data=data,
            content_type="application/json",
            follow=True,
            **{"HTTP_X_TALER_BANK_USERNAME": "rejected_user",
               "HTTP_X_TALER_BANK_PASSWORD": "rejected_password"})
        self.assertEqual(response.status_code, 200)

        jdata = json.loads(response.content.decode("utf-8"))
        rejected = User.objects.get(username="rejected_user")
        response = client.put(
            reverse("reject", urlconf=urls),
            data=json.dumps({
                "row_id": jdata["row_id"],
                "auth": {"type": "basic"},
                "account_number": rejected.bankaccount.account_no,
            }),
            content_type="application/json",
            **{"HTTP_X_TALER_BANK_USERNAME": "rejecting_user",
               "HTTP_X_TALER_BANK_PASSWORD": "rejecting_password"})
        self.assertEqual(response.status_code, 204)


class AddIncomingTestCase(TestCase):
    """Test money transfer's API"""

    def setUp(self):
        BankAccount(
            user=User.objects.create_user(
                username="bank_user",
                password="bank_password")).save()
        BankAccount(
            user=User.objects.create_user(
                username="user_user",
                password="user_password")).save()

    def tearDown(self):
        clear_db()

    @staticmethod
    def _request_body(amount, credit_account=1):
        """Return the JSON text of an add-incoming request."""
        return json.dumps({
            "auth": {"type": "basic"},
            "credit_account": credit_account,
            "subject": "TESTWTID",
            "exchange_url": "https://exchange.test",
            "amount": amount,
        })

    @staticmethod
    def _post_add_incoming(client, data, **extra_headers):
        """POST `data` to add-incoming with user_user's credentials.

        `extra_headers` are WSGI-style header names (HTTP_...) added
        on top of the authentication headers.
        """
        headers = {
            "HTTP_X_TALER_BANK_USERNAME": "user_user",
            "HTTP_X_TALER_BANK_PASSWORD": "user_password",
        }
        headers.update(extra_headers)
        return client.post(
            reverse("add-incoming", urlconf=urls),
            data=data,
            content_type="application/json",
            follow=True,
            **headers)

    def test_add_incoming(self):
        """Cover the success path and the main failure modes."""
        client = Client()
        data = self._request_body(
            "%s:1.0" % settings.TALER_CURRENCY)
        response = self._post_add_incoming(client, data)
        self.assertEqual(200, response.status_code)

        # Trying the same request, but compressed.
        response = self._post_add_incoming(
            client,
            zlib.compress(data.encode("utf-8")),
            HTTP_CONTENT_ENCODING="deflate")
        self.assertEqual(200, response.status_code)

        # note: a bad currency request gets 406.
        response = self._post_add_incoming(
            client, self._request_body("WRONGCURRENCY:1.0"))
        self.assertEqual(406, response.status_code)
        LOGGER.info(response.content.decode("utf-8"))

        # Try to go debit
        response = self._post_add_incoming(
            client,
            self._request_body("%s:50.1" % settings.TALER_CURRENCY))
        self.assertEqual(403, response.status_code)

        # Try use a non-existent recipient.
        response = self._post_add_incoming(
            client,
            self._request_body("%s:1" % settings.TALER_CURRENCY,
                               credit_account=1987))
        self.assertEqual(404, response.status_code)


class HistoryContext:
    """One history request: its URL arguments and what the response
    is expected to look like.

    `expected_resp` is a dict with a "status" key (HTTP status code)
    and, optionally, a "fields" key holding (name, value) pairs to
    look up in the first returned record.  Every other keyword
    argument becomes a URL argument of the request.
    """

    def __init__(self, expected_resp, **kwargs):
        self.expected_resp = expected_resp
        self.urlargs = kwargs
        self.urlargs["auth"] = "basic"

    def dump(self):
        """Return the URL arguments of the request."""
        return self.urlargs


class HistoryTestCase(TestCase):
    """Tests for the "history" and "history-range" endpoints."""

    def setUp(self):
        clear_db()
        debit_account = BankAccount(
            user=User.objects.create_user(
                username='User',
                password="Password"),
            amount=Amount(settings.TALER_CURRENCY, 100))
        debit_account.save()
        credit_account = BankAccount(
            user=User.objects.create_user(
                username='User0',
                password="Password0"))
        credit_account.save()
        for subject in ("a", "b", "c", "d", "e", "f", "g", "h", "i"):
            wire_transfer(Amount(settings.TALER_CURRENCY, 1),
                          debit_account,
                          credit_account, subject)
        # reject transaction 'i'.
        trans_i = BankTransaction.objects.get(subject="i")
        self.client = Client()
        self.client.post(
            reverse("reject", urlconf=urls),
            data=json.dumps({
                "auth": {"type": "basic"},
                "row_id": trans_i.id,
                "account_number": 44,  # Ignored
            }),
            content_type="application/json",
            follow=True,
            **{"HTTP_X_TALER_BANK_USERNAME": "User0",
               "HTTP_X_TALER_BANK_PASSWORD": "Password0"})

    def tearDown(self):
        clear_db()

    def assert_result(self, response, ctx):
        """Check `response` against the expectations held by `ctx`.

        The status code must equal ctx.expected_resp["status"], and
        every (name, value) pair listed under "fields" must match the
        first record of the response's "data" list.
        """
        data = response.content.decode("utf-8")
        try:
            # FIXME, not always data is found this way.
            data = json.loads(data)["data"][0]
        except (json.JSONDecodeError, KeyError):
            data = {}
        self.assertEqual(
            ctx.expected_resp.get("status"),
            response.status_code,
            "Failing request: %s?%s => raw body: %s" %
            (response.request["PATH_INFO"],
             unquote(response.request["QUERY_STRING"]),
             response.content.decode("utf-8")))

        # extract expected data from response
        expected_data = dict(ctx.expected_resp.get("fields", []))
        response_data = {key: data.get(key) for key in expected_data}
        self.assertEqual(expected_data, response_data)

    def test_history_range(self):
        """Query the history by time range."""
        now = int(time.time())
        contexts = (
            # Expect empty results, range too ancient.
            HistoryContext(expected_resp={"status": 204},
                           start=1, end=2,
                           direction="both"),
            # Expect empty results, range too ahead.
            # NOTE(review): "empty" above was answered with 204, yet
            # 200 is expected here -- confirm which one is intended.
            HistoryContext(expected_resp={"status": 200},
                           start=now + 40, end=now + 50,
                           direction="both"),
            # Expect non empty results.
            HistoryContext(expected_resp={"status": 200},
                           start=now - 30, end=now + 30,
                           direction="both"),
        )
        for ctx in contexts:
            response = self.client.get(
                reverse("history-range", urlconf=urls),
                ctx.urlargs,
                **{"HTTP_X_TALER_BANK_USERNAME": "User",
                   "HTTP_X_TALER_BANK_PASSWORD": "Password"})
            self.assert_result(response, ctx)

    def test_history(self):
        """Query the history by starting row and delta."""
        contexts = (
            HistoryContext(
                expected_resp={"status": 200},
                delta="-4", direction="both"),
            HistoryContext(
                expected_resp={
                    "fields": [("row_id", 9)],
                    "status": 200},
                delta="+1", start="5", direction="both"),
            HistoryContext(
                expected_resp={
                    "fields": [("wt_subject", "c")],
                    "status": 200},
                delta="1", start=2,
                direction="both", ordering="ascending"),
            HistoryContext(
                expected_resp={
                    "fields": [("wt_subject", "a")],
                    "status": 200},
                delta="-1", start=2, direction="both"),
            HistoryContext(
                expected_resp={"status": 204},
                delta="1", start="11", direction="both"),
            HistoryContext(
                expected_resp={
                    "status": 200,
                    "fields": [("wt_subject", "i"),
                               ("sign", "cancel-")]},
                start=8, delta="+1", direction="cancel-"),
            HistoryContext(
                expected_resp={"status": 204},
                start=8, delta="+1",
                direction="cancel-", cancelled="omit"),
            HistoryContext(
                expected_resp={"status": 204},
                start=8, delta="-1", direction="cancel-"),
            HistoryContext(
                expected_resp={"status": 204},
                delta="+1", direction="cancel+"),
            HistoryContext(
                expected_resp={"status": 200},
                delta="-1", direction="debit"),
        )
        for ctx in contexts:
            response = self.client.get(
                reverse("history", urlconf=urls),
                ctx.urlargs,
                **{"HTTP_X_TALER_BANK_USERNAME": "User",
                   "HTTP_X_TALER_BANK_PASSWORD": "Password"})
            self.assert_result(response, ctx)


class DBAmountSubtraction(TestCase):
    """Subtraction on an amount loaded from the database."""

    def setUp(self):
        BankAccount(
            user=User.objects.create_user(username='U'),
            amount=Amount(settings.TALER_CURRENCY, 3)).save()

    def tearDown(self):
        clear_db()

    def test_subtraction(self):
        """3 - 2 leaves 1 on the account's amount."""
        user_bankaccount = BankAccount.objects.get(
            user=User.objects.get(username='U'))
        user_bankaccount.amount.subtract(
            Amount(settings.TALER_CURRENCY, 2))
        self.assertEqual(
            Amount.cmp(Amount(settings.TALER_CURRENCY, 1),
                       user_bankaccount.amount),
            0)


class DBCustomColumnTestCase(TestCase):
    """The account's amount column is loaded as an Amount object."""

    def setUp(self):
        BankAccount(
            user=User.objects.create_user(username='U')).save()

    def tearDown(self):
        clear_db()

    def test_exists(self):
        """The `amount` attribute is an instance of Amount."""
        user_bankaccount = BankAccount.objects.get(
            user=User.objects.get(username='U'))
        self.assertIsInstance(user_bankaccount.amount, Amount)


class DebitTestCase(TestCase):
    """This tests whether a bank account goes debit and then goes
    >=0 again."""

    def setUp(self):
        BankAccount(
            user=User.objects.create_user(username='U')).save()
        BankAccount(
            user=User.objects.create_user(username='U0')).save()

    def tearDown(self):
        clear_db()

    def test_green(self):
        """A freshly created account is not in debit."""
        user_bankaccount = BankAccount.objects.get(
            user=User.objects.get(username='U'))
        self.assertEqual(False, user_bankaccount.debit)

    def test_red(self):
        """Follow both accounts' debit flags across two transfers."""
        user_bankaccount = BankAccount.objects.get(
            user=User.objects.get(username='U'))
        user_bankaccount0 = BankAccount.objects.get(
            user=User.objects.get(username='U0'))

        wire_transfer(Amount(settings.TALER_CURRENCY, 10, 0),
                      user_bankaccount0,
                      user_bankaccount,
                      "Go green")
        # Both accounts now hold an amount of 10; the `debit` flag
        # tells which of the two owes it.
        tmp = Amount(settings.TALER_CURRENCY, 10)
        self.assertEqual(
            0, Amount.cmp(user_bankaccount.amount, tmp))
        self.assertEqual(
            0, Amount.cmp(user_bankaccount0.amount, tmp))
        self.assertFalse(user_bankaccount.debit)
        self.assertTrue(user_bankaccount0.debit)

        wire_transfer(Amount(settings.TALER_CURRENCY, 11),
                      user_bankaccount,
                      user_bankaccount0,
                      "Go red")
        tmp.value = 1
        self.assertTrue(user_bankaccount.debit)
        self.assertFalse(user_bankaccount0.debit)
        self.assertEqual(
            0, Amount.cmp(user_bankaccount.amount, tmp))
        self.assertEqual(
            0, Amount.cmp(user_bankaccount0.amount, tmp))


class ParseAmountTestCase(TestCase):
    """Parsing of "CURRENCY:value" strings into Amount objects."""

    def test_parse_amount(self):
        """Parse several valid strings and one malformed string."""
        ret = Amount.parse("KUDOS:4.0")
        self.assertJSONEqual(
            '{"value": 4, "fraction": 0, "currency": "KUDOS"}',
            ret.dump())
        ret = Amount.parse("KUDOS:4.3")
        self.assertJSONEqual(
            '{"value": 4, "fraction": 30000000, "currency": "KUDOS"}',
            ret.dump())
        ret = Amount.parse("KUDOS:4")
        self.assertJSONEqual(
            '{"value": 4, "fraction": 0, "currency": "KUDOS"}',
            ret.dump())
        ret = Amount.parse("KUDOS:4.")  # forbid?
        self.assertJSONEqual(
            '{"value": 4, "fraction": 0, "currency": "KUDOS"}',
            ret.dump())
        # A string that is not an amount must be refused.
        with self.assertRaises(BadFormatAmount):
            Amount.parse("Buggy")


class MeasureHistory(TestCase):
    """Rough timing check on extract_history()."""

    def setUp(self):
        self.user_bankaccount0 = BankAccount(
            user=User.objects.create_user(username='U0'),
            amount=Amount(settings.TALER_CURRENCY, 3000))
        self.user_bankaccount0.save()

        user_bankaccount = BankAccount(
            user=User.objects.create_user(username='U'))
        user_bankaccount.save()

        self.ntransfers = 1000

        # Make sure logging level is WARNING, otherwise the loop
        # will overwhelm the console.
        for _ in range(self.ntransfers):
            wire_transfer(Amount(settings.TALER_CURRENCY, 1),
                          self.user_bankaccount0,
                          user_bankaccount,
                          "bulk")

    def tearDown(self):
        clear_db()

    def test_extract_history(self):
        """Retrieval must stay below 0.003 seconds per record."""
        # Measure the time extract_history() needs to retrieve
        # ~ntransfers records.  locals() is handed over as the
        # timer's globals so that the statement can see `self`.
        timer = timeit.Timer(
            stmt="extract_history(self.user_bankaccount0, False)",
            setup="from talerbank.app.views import extract_history",
            globals=locals())
        total_time = timer.timeit(number=1)
        allowed_time_per_record = 0.003
        self.assertLess(
            total_time, self.ntransfers * allowed_time_per_record)


class BalanceTestCase(TestCase):
    """Cross-check an account's balance against its history."""

    def setUp(self):
        self.the_bank = BankAccount(
            user=User.objects.create_user(
                username='U0', password='U0PASS'),
            amount=Amount(settings.TALER_CURRENCY, 3))
        self.the_bank.save()

        user = BankAccount(
            user=User.objects.create_user(username='U'),
            amount=Amount(settings.TALER_CURRENCY, 10))
        user.save()

        # The running balances below are derived from the amounts,
        # reading wire_transfer(amount, debit, credit, subject).

        # bank: 3, user: 10 (START).

        # bank: 2, user: 11
        wire_transfer(Amount(settings.TALER_CURRENCY, 1),
                      self.the_bank,
                      user,
                      "mock")

        # bank: 4, user: 9
        wire_transfer(Amount(settings.TALER_CURRENCY, 2),
                      user,
                      self.the_bank,
                      "mock")

        # bank: -1, user: 14
        wire_transfer(Amount(settings.TALER_CURRENCY, 5),
                      self.the_bank,
                      user,
                      "mock")

        # bank: 7, user: 6
        wire_transfer(Amount(settings.TALER_CURRENCY, 8),
                      user,
                      self.the_bank,
                      "mock")

        # bank: 17, user: -4 (END)
        wire_transfer(Amount(settings.TALER_CURRENCY, 10),
                      user,
                      self.the_bank,
                      "mock")

        self.client = Client()

    def tearDown(self):
        clear_db()

    def test_balance(self):
        """Rebuild the bank's balance from its history entries and
        compare it with the balance stored on the account."""
        self.client.login(username="U0", password="U0PASS")
        response = self.client.get(
            reverse("history", urlconf=urls),
            {"auth": "basic",
             "delta": -30,
             "direction": "both",
             "account_number": 55},  # unused
            **{"HTTP_X_TALER_BANK_USERNAME": "U0",
               "HTTP_X_TALER_BANK_PASSWORD": "U0PASS"})
        data = response.content.decode("utf-8")
        self.assertEqual(response.status_code, 200)
        entries = json.loads(data)

        acc_in = Amount(settings.TALER_CURRENCY)
        acc_out = Amount(settings.TALER_CURRENCY)

        for entry in entries["data"]:
            if entry["sign"] == "+":
                acc_in.add(Amount(**entry["amount"]))
            if entry["sign"] == "-":
                acc_out.add(Amount(**entry["amount"]))

        # Starting balance of the bank's account, see setUp().
        expected_amount = Amount(settings.TALER_CURRENCY, 3)
        debit = False
        try:
            acc_in.subtract(acc_out)
            expected_amount.add(acc_in)
        except ValueError:
            # "out" is bigger than "in"
            LOGGER.info("out > in")
            acc_out.subtract(acc_in)
            try:
                expected_amount.subtract(acc_out)
            except ValueError:
                # initial amount wasn't enough to cover expenses
                debit = True
                acc_out.subtract(expected_amount)
                expected_amount = acc_out

        self.assertEqual(
            Amount.cmp(expected_amount, self.the_bank.amount), 0)
        # The sign of the balance has to agree as well.
        self.assertEqual(debit, self.the_bank.debit)