# This file is part of TALER # (C) 2014, 2015, 2016 INRIA # # TALER is free software; you can redistribute it and/or modify it under the # terms of the GNU Affero General Public License as published by the Free Software # Foundation; either version 3, or (at your option) any later version. # # TALER is distributed in the hope that it will be useful, but WITHOUT ANY # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR # A PARTICULAR PURPOSE. See the GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along with # TALER; see the file COPYING. If not, see # # @author Marcello Stanisci import json import timeit import logging from django.db import connection from django.test import TestCase, Client from django.urls import reverse from django.conf import settings from django.contrib.auth.models import User from mock import patch, MagicMock from urllib.parse import unquote from .models import BankAccount, BankTransaction from . import urls from .views import wire_transfer, LoginFailed from .amount import Amount, CurrencyMismatch, BadFormatAmount LOGGER = logging.getLogger() LOGGER.setLevel(logging.WARNING) def clear_db(): User.objects.all().delete() BankAccount.objects.all().delete() BankTransaction.objects.all().delete() with connection.cursor() as cursor: cursor.execute("ALTER SEQUENCE app_bankaccount_account_no_seq RESTART") cursor.execute("ALTER SEQUENCE app_banktransaction_id_seq RESTART") class WithdrawTestCase(TestCase): def setUp(self): self.user_bank_account = BankAccount( user=User.objects.create_user( username="test_user", password="test_password"), account_no=100) self.user_bank_account.save() self.exchange_bank_account = BankAccount( user=User.objects.create_user( username="test_exchange", password=""), account_no=99) self.exchange_bank_account.save() self.client = Client() def tearDown(self): clear_db() @patch('talerbank.app.views.wire_transfer') @patch('hashlib.new') @patch('time.time') def test_withdraw(self, mocked_time, mocked_hashlib, mocked_wire_transfer): wire_details = '''{ "test": { "type":"test", "account_number":99, "bank_uri":"http://bank.example/", "name":"example" } }''' amount = Amount(settings.TALER_CURRENCY, 0, 1) params = { "amount_value": str(amount.value), "amount_fraction": str(amount.fraction), "amount_currency": amount.currency, "reserve_pub": "UVZ789", "exchange": "https://exchange.example.com/", "exchange_wire_details": wire_details.replace("\n", "").replace(" ", "") } self.client.login(username="test_user", password="test_password") response = self.client.get( reverse("pin-question", urlconf=urls), params) self.assertEqual(response.status_code, 200) # We mock hashlib in order to fake the CAPTCHA. hasher = MagicMock() hasher.hexdigest = MagicMock() hasher.hexdigest.return_value = "0" mocked_hashlib.return_value = hasher mocked_time.return_value = 0 response = self.client.post( reverse("pin-verify", urlconf=urls), {"pin_1": "0"}) args, kwargs = mocked_wire_transfer.call_args self.assertTrue( args[0].dump() == amount.dump() \ and self.user_bank_account in args \ and "UVZ789" in args \ and self.exchange_bank_account in args) def tearDown(self): clear_db() class InternalWireTransferTestCase(TestCase): def setUp(self): BankAccount(user=User.objects.create_user( username='give_money', password="gm")).save() self.take_money = BankAccount( user=User.objects.create_user( username='take_money'), account_no=4) self.take_money.save() def tearDown(self): clear_db() def test_internal_wire_transfer(self): client = Client() client.login(username="give_money", password="gm") response = client.post(reverse("profile", urlconf=urls), {"amount": 3.0, "receiver": self.take_money.account_no, "subject": "charity"}) take_money = BankAccount.objects.get(account_no=4) self.assertEqual(0, Amount.cmp( Amount(settings.TALER_CURRENCY, 3), take_money.amount), msg=self.take_money.amount.stringify(2)) self.assertEqual(200, response.status_code) class RegisterTestCase(TestCase): """User registration""" def setUp(self): BankAccount( user=User.objects.create_user( username='Bank')).save() def tearDown(self): clear_db() def test_register(self): client = Client() response = client.post(reverse("register", urlconf=urls), {"username": "test_register", "password": "test_register"}, follow=True) self.assertIn(("/profile", 302), response.redirect_chain) # this assertion tests "/profile""s view self.assertEqual(200, response.status_code) class RegisterWrongCurrencyTestCase(TestCase): """User registration""" def setUp(self): # Note, config has KUDOS as currency. BankAccount( user=User.objects.create_user(username='Bank'), amount=Amount('WRONGCURRENCY')).save() # Takes account_no = 1, as the first one. def tearDown(self): clear_db() def test_register(self): client = Client() response = client.post(reverse("register", urlconf=urls), {"username": "test_register", "password": "test_register"}, follow=True) self.assertEqual(500, response.status_code) class LoginTestCase(TestCase): """User login""" def setUp(self): BankAccount( user=User.objects.create_user( username="test_user", password="test_password")).save() self.client = Client() def tearDown(self): clear_db() def test_login(self): self.assertTrue(self.client.login( username="test_user", password="test_password")) self.assertFalse(self.client.login( username="test_user", password="test_passwordii")) def test_failing_login(self): response = self.client.get( reverse("history", urlconf=urls), {"auth": "basic"}, **{"HTTP_X_TALER_BANK_USERNAME": "Wrong", "HTTP_X_TALER_BANK_PASSWORD": "Credentials"}) data = response.content.decode("utf-8") self.assertJSONEqual('{"error": "Wrong username/password", "ec": 5212}', json.loads(data)) self.assertEqual(401, response.status_code) class AmountTestCase(TestCase): def test_cmp(self): amount1 = Amount("X", 1) _amount1 = Amount("X", 1) amount2 = Amount("X", 2) self.assertEqual(-1, Amount.cmp(amount1, amount2)) self.assertEqual(1, Amount.cmp(amount2, amount1)) self.assertEqual(0, Amount.cmp(amount1, _amount1)) # Trying to compare amount of different currencies def test_cmp_diff_curr(self): amount1 = Amount("X", 1) amount2 = Amount("Y", 2) with self.assertRaises(CurrencyMismatch): Amount.cmp(amount1, amount2) class RejectTestCase(TestCase): def setUp(self): BankAccount( user=User.objects.create_user( username="rejected_user", password="rejected_password")).save() BankAccount( user=User.objects.create_user( username="rejecting_user", password="rejecting_password")).save() def tearDown(self): clear_db() def test_reject(self): client = Client() rejecting = User.objects.get(username="rejecting_user") data = '{"auth": {"type": "basic"}, \ "credit_account": %d, \ "subject": "TESTWTID", \ "exchange_url": "https://exchange.test", \ "amount": \ {"value": 5, \ "fraction": 0, \ "currency": "%s"}}' \ % (rejecting.bankaccount.account_no, settings.TALER_CURRENCY) response = client.post( reverse("add-incoming", urlconf=urls), data=data, content_type="application/json", follow=True, **{ "HTTP_X_TALER_BANK_USERNAME": "rejected_user", "HTTP_X_TALER_BANK_PASSWORD": "rejected_password"}) data = response.content.decode("utf-8") jdata = json.loads(data) rejected = User.objects.get(username="rejected_user") response = client.put( reverse("reject", urlconf=urls), data='{"row_id": %d, \ "auth": {"type": "basic"}, \ "account_number": %d}' \ % (jdata["row_id"], rejected.bankaccount.account_no), content_type="application/json", **{"HTTP_X_TALER_BANK_USERNAME": "rejecting_user", "HTTP_X_TALER_BANK_PASSWORD": "rejecting_password"}) self.assertEqual(response.status_code, 204) class AddIncomingTestCase(TestCase): """Test money transfer's API""" def setUp(self): BankAccount(user=User.objects.create_user( username="bank_user", password="bank_password")).save() BankAccount(user=User.objects.create_user( username="user_user", password="user_password")).save() def tearDown(self): clear_db() def test_add_incoming(self): client = Client() data = '{"auth": {"type": "basic"}, \ "credit_account": 1, \ "subject": "TESTWTID", \ "exchange_url": "https://exchange.test", \ "amount": \ {"value": 1, \ "fraction": 0, \ "currency": "%s"}}' \ % settings.TALER_CURRENCY response = client.post(reverse("add-incoming", urlconf=urls), data=data, content_type="application/json", follow=True, **{"HTTP_X_TALER_BANK_USERNAME": "user_user", "HTTP_X_TALER_BANK_PASSWORD": "user_password"}) self.assertEqual(200, response.status_code) data = '{"auth": {"type": "basic"}, \ "credit_account": 1, \ "subject": "TESTWTID", \ "exchange_url": "https://exchange.test", \ "amount": \ {"value": 1, \ "fraction": 0, \ "currency": "%s"}}' \ % "WRONGCURRENCY" response = client.post(reverse("add-incoming", urlconf=urls), data=data, content_type="application/json", follow=True, **{"HTTP_X_TALER_BANK_USERNAME": "user_user", "HTTP_X_TALER_BANK_PASSWORD": "user_password"}) self.assertEqual(406, response.status_code) # Try to go debit data = '{"auth": {"type": "basic"}, \ "credit_account": 1, \ "subject": "TESTWTID", \ "exchange_url": "https://exchange.test", \ "amount": \ {"value": 50, \ "fraction": 1, \ "currency": "%s"}}' \ % settings.TALER_CURRENCY response = client.post(reverse("add-incoming", urlconf=urls), data=data, content_type="application/json", follow=True, **{"HTTP_X_TALER_BANK_USERNAME": "user_user", "HTTP_X_TALER_BANK_PASSWORD": "user_password"}) self.assertEqual(403, response.status_code) class HistoryContext: def __init__(self, expected_resp, **kwargs): self.expected_resp = expected_resp self.urlargs = kwargs self.urlargs.update({"auth": "basic"}) def dump(self): return self.urlargs class HistoryTestCase(TestCase): def setUp(self): debit_account = BankAccount( user=User.objects.create_user( username='User', password="Password"), amount=Amount(settings.TALER_CURRENCY, 100)) debit_account.save() credit_account = BankAccount( user=User.objects.create_user( username='User0', password="Password0")) credit_account.save() for subject in ("a", "b", "c", "d", "e", "f", "g", "h", "i"): wire_transfer(Amount(settings.TALER_CURRENCY, 1), debit_account, credit_account, subject) # reject transaction 'i'. trans_i = BankTransaction.objects.get(subject="i") self.client = Client() self.client.post( reverse("reject", urlconf=urls), data='{"auth": {"type": "basic"}, \ "row_id": %d, \ "account_number": 44}' % trans_i.id, # Ignored content_type="application/json", follow=True, **{"HTTP_X_TALER_BANK_USERNAME": "User0", "HTTP_X_TALER_BANK_PASSWORD": "Password0"}) def tearDown(self): clear_db() def test_history(self): for ctx in (HistoryContext( expected_resp={"status": 200}, delta="4", direction="both"), HistoryContext( expected_resp={ "fields": [("row_id", 6)], "status": 200}, delta="+1", start="5", direction="both"), HistoryContext( expected_resp={ "fields": [("wt_subject", "h")], "status": 200}, delta="-1", start=9, direction="both"), HistoryContext( expected_resp={"status": 204}, delta="1", start="11", direction="both"), HistoryContext( expected_resp={ "status": 200, "fields": [("wt_subject", "i"), ("sign", "cancel-")]}, start=8, delta="+1", direction="cancel-"), HistoryContext( expected_resp={"status": 204}, start=8, delta="+1", direction="cancel-", cancelled="omit"), HistoryContext( expected_resp={"status": 204}, start=8, delta="-1", direction="cancel-"), HistoryContext( expected_resp={"status": 204}, delta="+1", direction="cancel+"), HistoryContext(expected_resp={"status": 200}, delta="+1", direction="debit")): response = self.client.get( reverse("history", urlconf=urls), ctx.urlargs, **{"HTTP_X_TALER_BANK_USERNAME": "User", "HTTP_X_TALER_BANK_PASSWORD": "Password"}) data = response.content.decode("utf-8") try: data = json.loads(data)["data"][0] except (json.JSONDecodeError, KeyError): data = {} self.assertEqual( ctx.expected_resp.get("status"), response.status_code, "Failing request: %s?%s" % \ (response.request["PATH_INFO"], unquote(response.request["QUERY_STRING"]))) # extract expected data from response expected_data = {} response_data = {} for k, v in ctx.expected_resp.get("fields", []): response_data.update({k: data.get(k)}) expected_data.update({k: v}) self.assertEqual(expected_data, response_data) class DBAmountSubtraction(TestCase): def setUp(self): BankAccount( user=User.objects.create_user(username='U'), amount=Amount(settings.TALER_CURRENCY, 3)).save() def tearDown(self): clear_db() def test_subtraction(self): user_bankaccount = BankAccount.objects.get( user=User.objects.get(username='U')) user_bankaccount.amount.subtract( Amount(settings.TALER_CURRENCY, 2)) self.assertEqual( Amount.cmp(Amount(settings.TALER_CURRENCY, 1), user_bankaccount.amount), 0) class DBCustomColumnTestCase(TestCase): def setUp(self): BankAccount( user=User.objects.create_user(username='U')).save() def tearDown(self): clear_db() def test_exists(self): user_bankaccount = BankAccount.objects.get( user=User.objects.get(username='U')) self.assertTrue(isinstance(user_bankaccount.amount, Amount)) # This tests whether a bank account goes debit and then goes >=0 again class DebitTestCase(TestCase): def setUp(self): BankAccount( user=User.objects.create_user(username='U')).save() BankAccount( user=User.objects.create_user(username='U0')).save() def tearDown(self): clear_db() def test_green(self): user_bankaccount = BankAccount.objects.get( user=User.objects.get(username='U')) self.assertEqual(False, user_bankaccount.debit) def test_red(self): user_bankaccount = BankAccount.objects.get( user=User.objects.get(username='U')) user_bankaccount0 = BankAccount.objects.get( user=User.objects.get(username='U0')) wire_transfer(Amount(settings.TALER_CURRENCY, 10, 0), user_bankaccount0, user_bankaccount, "Go green") tmp = Amount(settings.TALER_CURRENCY, 10) self.assertEqual(0, Amount.cmp(user_bankaccount.amount, tmp)) self.assertEqual(0, Amount.cmp(user_bankaccount0.amount, tmp)) self.assertFalse(user_bankaccount.debit) self.assertTrue(user_bankaccount0.debit) wire_transfer(Amount(settings.TALER_CURRENCY, 11), user_bankaccount, user_bankaccount0, "Go red") tmp.value = 1 self.assertTrue(user_bankaccount.debit) self.assertFalse(user_bankaccount0.debit) self.assertEqual(0, Amount.cmp(user_bankaccount.amount, tmp)) self.assertEqual(0, Amount.cmp(user_bankaccount0.amount, tmp)) class ParseAmountTestCase(TestCase): def test_parse_amount(self): ret = Amount.parse("KUDOS:4.0") self.assertJSONEqual('{"value": 4, "fraction": 0, "currency": "KUDOS"}', ret.dump()) ret = Amount.parse("KUDOS:4.3") self.assertJSONEqual('{"value": 4, "fraction": 30000000, "currency": "KUDOS"}', ret.dump()) ret = Amount.parse("KUDOS:4") self.assertJSONEqual('{"value": 4, "fraction": 0, "currency": "KUDOS"}', ret.dump()) ret = Amount.parse("KUDOS:4.") # forbid? self.assertJSONEqual('{"value": 4, "fraction": 0, "currency": "KUDOS"}', ret.dump()) try: Amount.parse("Buggy") except BadFormatAmount: return # make sure the control doesn't get here self.assertEqual(True, False) class MeasureHistory(TestCase): def setUp(self): self.user_bankaccount0 = BankAccount( user=User.objects.create_user(username='U0'), amount=Amount(settings.TALER_CURRENCY, 3000)) self.user_bankaccount0.save() user_bankaccount = BankAccount( user=User.objects.create_user(username='U')) user_bankaccount.save() self.ntransfers = 1000 # Make sure logging level is WARNING, otherwise the loop # will overwhelm the console. for i in range(self.ntransfers): del i # to pacify PEP checkers wire_transfer(Amount(settings.TALER_CURRENCY, 1), self.user_bankaccount0, user_bankaccount, "bulk") def tearDown(self): clear_db() def test_extract_history(self): # Measure the time extract_history() needs to retrieve # ~ntransfers records. timer = timeit.Timer(stmt="extract_history(self.user_bankaccount0)", setup="from talerbank.app.views import extract_history", globals=locals()) total_time = timer.timeit(number=1) allowed_time_per_record = 0.003 self.assertLess(total_time, self.ntransfers*allowed_time_per_record)