cashless2ecash

cashless2ecash: pay with cards for digital cash (experimental)
Log | Files | Refs | README

commit 338d1be4dd74bbb4374d2c59b79e444cb66b2146
parent 86598316f22771cebf467960d374922f2d14ccbe
Author: Joel-Haeberli <haebu@rubigen.ch>
Date:   Fri, 12 Apr 2024 10:06:55 +0200

fix: basic flow

Diffstat:
Mbruno/c2ec/(LOCAL-BIA) Register Withdrawal.bru | 13+++----------
Mbruno/c2ec/(LOCAL-BIA) Withdrawal Status.bru | 4++--
Mc2ec/amount.go | 11+++++++++--
Mc2ec/attestor.go | 152++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------------------
Mc2ec/bank-integration.go | 36+++++++++++++-----------------------
Mc2ec/db.go | 30++++++++++++++++++++++--------
Dc2ec/db/0000-c2ec_payment_notification_listener.sql | 47-----------------------------------------------
Dc2ec/db/0000-c2ec_schema.sql | 126-------------------------------------------------------------------------------
Dc2ec/db/0000-c2ec_status_listener.sql | 41-----------------------------------------
Dc2ec/db/0000-c2ec_test.sql | 17-----------------
Dc2ec/db/0000-c2ec_test_rollback.sql | 18------------------
Dc2ec/db/0000-c2ec_transfers.sql | 23-----------------------
Ac2ec/db/0001-c2ec_schema.sql | 139+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Ac2ec/db/drop.sql | 7+++++++
Ac2ec/db/proc-c2ec_payment_notification_listener.sql | 47+++++++++++++++++++++++++++++++++++++++++++++++
Ac2ec/db/proc-c2ec_status_listener.sql | 41+++++++++++++++++++++++++++++++++++++++++
Ac2ec/db/procedures.sql | 2++
Ac2ec/db/procedures.sql.in | 2++
Ac2ec/db/test_c2ec_test.sql | 19+++++++++++++++++++
Ac2ec/db/test_c2ec_test_rollback.sql | 21+++++++++++++++++++++
Mc2ec/http-util.go | 10+---------
Mc2ec/main.go | 75++++++++++++++++++++++++++++++++++++++++++++++++++++-----------------------
Mc2ec/model.go | 11+++++++++++
Mc2ec/postgres.go | 139+++++++++++++++++++++++++++++++++++++++++++++++++++++--------------------------
Dc2ec/simulation-attestor.go | 123-------------------------------------------------------------------------------
Mc2ec/simulation-client.go | 9++++++++-
Dc2ec/wallee-attestor.go | 128-------------------------------------------------------------------------------
Mc2ec/wire-gateway.go | 10+++++-----
Msimulation/c2ec-simulation | 0
Msimulation/http-util.go | 2+-
Msimulation/main.go | 2+-
Msimulation/sim-terminal.go | 48++++++++++++++++++++++++++++++------------------
Msimulation/sim-wallet.go | 28++++++++++++++--------------
33 files changed, 658 insertions(+), 723 deletions(-)

diff --git a/bruno/c2ec/(LOCAL-BIA) Register Withdrawal.bru b/bruno/c2ec/(LOCAL-BIA) Register Withdrawal.bru @@ -5,21 +5,14 @@ meta { } post { - url: http://localhost:8081/c2ec/withdrawal-operation/ + url: http://localhost:8082/c2ec/withdrawal-operation/sN1zl2B3BuoOSlCYh9MZLi-iKUWLfMxLaFOjalKgFYE= body: json auth: none } body:json { { - "status": "pending", - "amount": { - "currency": "CHF", - "value": "", - "fraction": "" - }, - "sender_wire": "payto://wallee-transaction/asdfsadf", - "wire_types": ["wallee-transaction"], - "reserve_public_key": "" + "reserve_pub_key":"NJO606A4R7TDKBBT6075ME55GLJL42QS8DCPC3DLT4NOU8N6CM70====", + "terminal_id":1 } } diff --git a/bruno/c2ec/(LOCAL-BIA) Withdrawal Status.bru b/bruno/c2ec/(LOCAL-BIA) Withdrawal Status.bru @@ -5,12 +5,12 @@ meta { } get { - url: http://localhost:8081/c2ec/withdrawal-operation/WOPID?long_poll_ms=5000&old_state=pending + url: http://localhost:8082/c2ec/withdrawal-operation/sN1zl2B3BuoOSlCYh9MZLi-iKUWLfMxLaFOjalKgFYE=?long_poll_ms=10000&old_state=pending body: none auth: none } query { - long_poll_ms: 5000 + long_poll_ms: 10000 old_state: pending } diff --git a/c2ec/amount.go b/c2ec/amount.go @@ -40,8 +40,15 @@ type Amount struct { Fraction uint64 `json:"fraction"` } -func ToAmount(amount TalerAmountCurrency) (*Amount, error) { - +func ToAmount(amount *TalerAmountCurrency) (*Amount, error) { + + if amount == nil { + return &Amount{ + Currency: "", + Value: 0, + Fraction: 0, + }, nil + } a := new(Amount) a.Currency = amount.Curr a.Value = uint64(amount.Val) diff --git a/c2ec/attestor.go b/c2ec/attestor.go @@ -3,60 +3,138 @@ package main import ( "context" "errors" + "strconv" + "strings" ) const PAYMENT_NOTIFICATION_CHANNEL_BUFFER_SIZE = 10 const PS_PAYMENT_NOTIFICATION_CHANNEL = "payment_notification" -// An attestor attests a withdrawal using the providers -// backend. Attestations are triggered by a database event. -// An attestor registers itself to those events using the -// Listen function. An attestor shall be run on its own. -// The generic argument T is the type of the channel which -// is used to specify the expected result type. -type Attestor[T any] interface { - // This will setup the attestor. Don't call this function - // on your own. Instead, use RunAttestor which will setup - // and run the Attestor for you. - Setup(p *Provider) (chan *T, error) - // Listen for database event. If event is catched, - // dispatch and kick off attestation. The notifications - // will be sent through the notification channel. Since the - // process is started in the background, errors can - // be retrieved through the supplied error channel. - Listen(ctx context.Context, notificationChannel chan *T, errs chan error) error - // Attests a single withdrawal. - Attest(withdrawalId int, providerTransactionId string, errs chan error) +// Sets up and runs an attestor in the background. This must be called at startup. +func RunAttestor( + ctx context.Context, + errs chan error, +) { + + for _, p := range CONFIG.Providers { + if PROVIDER_CLIENTS[p.Name] == nil { + err := errors.New("no provider client initialized for provider " + p.Name) + LogError("attestor", err) + errs <- err + } + } + + notifications := make(chan *Notification, PAYMENT_NOTIFICATION_CHANNEL_BUFFER_SIZE) + go listenCallback(ctx, notifications, errs) } -// Sets up and runs an attestor in the background. This must be called at startup. -func RunAttestor[T any]( +func listenCallback( ctx context.Context, - attestor Attestor[T], - provider *Provider, -) (chan error, error) { + notifications chan *Notification, + errs chan error, +) { - if attestor == nil { - return nil, errors.New("the attestor was null") + listener, err := NewListener(PS_PAYMENT_NOTIFICATION_CHANNEL, notifications) + if err != nil { + LogError("attestor", err) + errs <- errors.New("attestor needs to be setup first") } - var providerCfg *C2ECProviderConfig - for _, p := range CONFIG.Providers { - if p.Name == provider.Name { - providerCfg = &p + go func() { + LogInfo("attestor", "attestor starts listening for payment notifications at the db") + err := listener.Listen(ctx) + if err != nil { + LogError("attestor-listener", err) + errs <- err + } + close(notifications) + close(errs) + }() + + // Listen is started async. We can therefore block here and must + // not run the retrieval logic in own goroutine + for { + select { + case notification := <-notifications: + // the dispatching and further attestation can be done asynchronously + // thus not blocking further incoming notifications. + LogInfo("attestor", "received notification upon payment. channel="+notification.Channel+", payload="+notification.Payload) + go dispatch(notification, errs) + case <-ctx.Done(): + errs <- ctx.Err() + return } } - if providerCfg == nil { - return nil, errors.New("provider is not configured in runtime configuration") +} + +func dispatch(notification *Notification, errs chan error) { + + // The payload is formatted like: "{PROVIDER_NAME}|{WITHDRAWAL_ID}|{PROVIDER_TRANSACTION_ID}" + // the validation is strict. This means, that the dispatcher emits an error + // and returns, if a property is malformed. + payload := strings.Split(notification.Payload, "|") + if len(payload) != 3 { + errs <- errors.New("malformed notification payload: " + notification.Payload) + return } - notificationChannel, err := attestor.Setup(provider) + providerName := payload[0] + if providerName == "" { + errs <- errors.New("the provider of the payment is not specified") + return + } + withdrawalRowId, err := strconv.Atoi(payload[1]) if err != nil { - return nil, err + errs <- errors.New("malformed withdrawal_id: " + err.Error()) + return } + providerTransactionId := payload[2] - errs := make(chan error) - go attestor.Listen(ctx, notificationChannel, errs) + client := PROVIDER_CLIENTS[providerName] + if client == nil { + errs <- errors.New("no provider client registered for provider " + providerName) + } - return errs, nil + transaction, err := client.GetTransaction(providerTransactionId) + finaliseOrSetRetry( + transaction, + providerName, + withdrawalRowId, + providerTransactionId, + err, + ) +} + +func finaliseOrSetRetry( + t ProviderTransaction, + providerName string, + withdrawalRowId int, + providerTransactionId string, + err error, +) { + + if err != nil { + LogError("attestor", err) + // set retry + } + + // TODO: call generic function with parameters. This function must trigger the retry flow when not (ABORTED | CONFIRMED) + if t.AllowWithdrawal() { + + err = DB.FinaliseWithdrawal(withdrawalRowId, CONFIRMED, t.Bytes()) + if err != nil { + LogError("attestor", err) + // set retry + //errs <- err + } + } else { + // TODO : this might be too early ?! What if the payment was not yet + // processed by the Wallee backend? Needs testing. + err = DB.FinaliseWithdrawal(withdrawalRowId, ABORTED, t.Bytes()) + if err != nil { + LogError("attestor", err) + // set retry + //errs <- err + } + } } diff --git a/c2ec/bank-integration.go b/c2ec/bank-integration.go @@ -98,7 +98,8 @@ func handleWithdrawalRegistration(res http.ResponseWriter, req *http.Request) { // read and validate the wopid path parameter wopid := req.PathValue(WOPID_PARAMETER) - if !WopidValid(wopid) { + wopid, err = ParseWopid(wopid) + if err != nil { LogWarn("bank-integration-api", "wopid "+wopid+" not valid") if wopid == "" { err := WriteProblem(res, HTTP_BAD_REQUEST, &RFC9457Problem{ @@ -166,7 +167,8 @@ func handleWithdrawalStatus(res http.ResponseWriter, req *http.Request) { // read and validate the wopid path parameter wopid := req.PathValue(WOPID_PARAMETER) - if !WopidValid(wopid) { + wopid, err := ParseWopid(wopid) + if err != nil { LogWarn("bank-integration-api", "wopid "+wopid+" not valid") if wopid == "" { err := WriteProblem(res, HTTP_BAD_REQUEST, &RFC9457Problem{ @@ -231,7 +233,8 @@ func handleWithdrawalStatus(res http.ResponseWriter, req *http.Request) { func handlePaymentNotification(res http.ResponseWriter, req *http.Request) { wopid := req.PathValue(WOPID_PARAMETER) - if !WopidValid(wopid) { + wopid, err := ParseWopid(wopid) + if err != nil { LogWarn("bank-integration-api", "wopid "+wopid+" not valid") if wopid == "" { err := WriteProblem(res, HTTP_BAD_REQUEST, &RFC9457Problem{ @@ -247,27 +250,14 @@ func handlePaymentNotification(res http.ResponseWriter, req *http.Request) { } } - body := make([]byte, req.ContentLength) - _, err := req.Body.Read(body) + jsonCodec := NewJsonCodec[C2ECPaymentNotification]() + paymentNotification, err := ReadStructFromBody(req, jsonCodec) if err != nil { + LogWarn("bank-integration-api", fmt.Sprintf("invalid body for payment notification error=%s", err.Error())) err := WriteProblem(res, HTTP_BAD_REQUEST, &RFC9457Problem{ - TypeUri: TALER_URI_PROBLEM_PREFIX + "/C2EC_INVALID_BODY", - Title: "invalid request body", - Detail: "the body of the payment notification request is malformed: " + err.Error(), - Instance: req.RequestURI, - }) - if err != nil { - res.WriteHeader(HTTP_INTERNAL_SERVER_ERROR) - } - return - } - - paymentNotification, err := NewJsonCodec[C2ECPaymentNotification]().Decode(bytes.NewBuffer(body)) - if err != nil { - err := WriteProblem(res, HTTP_BAD_REQUEST, &RFC9457Problem{ - TypeUri: TALER_URI_PROBLEM_PREFIX + "/C2EC_INVALID_BODY", - Title: "invalid request body", - Detail: "the body of the payment notification request is malformed: " + err.Error(), + TypeUri: TALER_URI_PROBLEM_PREFIX + "/C2EC_WITHDRAW_REGISTRATION_INVALID_REQ", + Title: "invalid request", + Detail: "the payment notification request for the withdrawal is malformed (error: " + err.Error() + ")", Instance: req.RequestURI, }) if err != nil { @@ -314,7 +304,7 @@ func getWithdrawalOrWriteError(wopid string, res http.ResponseWriter, reqUri str err := WriteProblem(res, HTTP_INTERNAL_SERVER_ERROR, &RFC9457Problem{ TypeUri: TALER_URI_PROBLEM_PREFIX + "/C2EC_WITHDRAWAL_STATUS_DB_FAILURE", Title: "database failure", - Detail: "db failure while requesting withdrawal (error:" + err.Error() + ")", + Detail: "db failure while requesting withdrawal (error=" + err.Error() + ")", Instance: reqUri, }) if err != nil { diff --git a/c2ec/db.go b/c2ec/db.go @@ -54,15 +54,15 @@ type Terminal struct { type Withdrawal struct { WithdrawalId uint64 `db:"withdrawal_id"` - Wopid uint64 `db:"wopid"` + Wopid []byte `db:"wopid"` ReservePubKey []byte `db:"reserve_pub_key"` RegistrationTs int64 `db:"registration_ts"` - Amount TalerAmountCurrency `db:"amount"` - Fees TalerAmountCurrency `db:"fees"` + Amount *TalerAmountCurrency `db:"amount" scan:"follow"` + Fees *TalerAmountCurrency `db:"fees" scan:"follow"` WithdrawalStatus WithdrawalOperationStatus `db:"withdrawal_status"` TerminalId int64 `db:"terminal_id"` - ProviderTransactionId string `db:"provider_transaction_id"` - LastRetryTs int64 `db:"last_retry_ts"` + ProviderTransactionId *string `db:"provider_transaction_id"` + LastRetryTs *int64 `db:"last_retry_ts"` RetryCounter int32 `db:"retry_counter"` CompletionProof []byte `db:"completion_proof"` } @@ -78,6 +78,11 @@ type Transfer struct { RequestHash string `db:"request_hash"` } +type Notification struct { + Channel string + Payload string +} + // C2ECDatabase defines the operations which a // C2EC compliant database interface must implement // in order to be bound to the c2ec API. @@ -110,7 +115,7 @@ type C2ECDatabase interface { // a provider backend. This means that the provider // specific transaction id was set and the status is // 'selected'. The attestor can then attest and finalise - // the payments. + // the payments. GetAttestableWithdrawals() ([]*Withdrawal, error) // When an attestation (or fail message) could be @@ -145,8 +150,9 @@ type C2ECDatabase interface { // Inserts a new transfer into the database. AddTransfer(requestId HashCode, requestHash string) error - // This will listen for on the given channel - // and write results to the out channels. + // This will listen for notifications on the + // channel withdrawal notifications are sent. + // Results will be written to the out channel. // Errors will be propagated through the errs // channel. Supply a context with timeout if // you want to use time limitations. @@ -156,4 +162,12 @@ type C2ECDatabase interface { out chan WithdrawalOperationStatus, errs chan error, ) + + // A listener can listen for the specified channel. + // It will send received notifications through the channel + // supplied. The specific implementation must convert the + // database specific message to the generic Notification + // type in order to decouple the database implementation + // from the rest of the logic. + Listen(ctx context.Context, channel string) (chan *Notification, chan error, error) } diff --git a/c2ec/db/0000-c2ec_payment_notification_listener.sql b/c2ec/db/0000-c2ec_payment_notification_listener.sql @@ -1,47 +0,0 @@ -BEGIN; - -SELECT _v.register_patch('0000-c2ec-payment-notification-listener', ARRAY['0000-c2ec-schema'], NULL); - -SET search_path TO c2ec; - --- to create a function, the user needs USAGE privilege on arguments and return types -CREATE OR REPLACE FUNCTION emit_payment_notification() -RETURNS TRIGGER AS $$ -DECLARE - provider_name TEXT; -BEGIN - SELECT p.name INTO provider_name FROM provider AS p - LEFT JOIN terminal AS t - ON t.provider_id = p.provider_id - LEFT JOIN withdrawal AS w - ON t.terminal_id = w.terminal_id - WHERE w.withdrawal_id = NEW.withdrawal_id; - PERFORM pg_notify('payment_notification', - provider_name || '|' || - NEW.withdrawal_id || '|' || - NEW.provider_transaction_id - ); - RETURN NULL; -END; -$$ LANGUAGE plpgsql; -COMMENT ON FUNCTION emit_payment_notification - IS 'The function emits the name of the provider, row id of the withdrawal - and the provider_transaction_id, on the channel "payment_notification". - The format of the payload is as follows: - "{PROVIDER_NAME}|{WITHDRAWAL_ID}|{PROVIDER_TRANSACTION_ID}". The subscriber - shall decide which attestation process to use, based on the name of - the provider.'; - --- for creating a trigger the user must have TRIGGER pivilege on the table. --- to execute the trigger, the user needs EXECUTE privilege on the trigger function. -CREATE OR REPLACE TRIGGER c2ec_on_payment_notify - AFTER UPDATE OF provider_transaction_id - ON withdrawal - FOR EACH ROW - WHEN (NEW.provider_transaction_id IS NOT NULL) - EXECUTE FUNCTION emit_payment_notification(); -COMMENT ON TRIGGER c2ec_on_payment_notify ON withdrawal - IS 'After setting the provider transaction id following a payment notification, - trigger the emit to the respective channel.'; - -COMMIT; diff --git a/c2ec/db/0000-c2ec_schema.sql b/c2ec/db/0000-c2ec_schema.sql @@ -1,126 +0,0 @@ -BEGIN; - -SELECT _v.register_patch('0000-c2ec-schema', NULL, NULL); - -DROP SCHEMA IF EXISTS c2ec CASCADE; - -CREATE SCHEMA c2ec; -COMMENT ON SCHEMA c2ec - IS 'Schema containing all tables and types related to c2ec (cashless2ecash)'; - -SET search_path TO c2ec; - -CREATE TYPE withdrawal_operation_status AS ENUM ( - 'pending', - 'selected', - 'aborted', - 'confirmed' -); -COMMENT ON TYPE withdrawal_operation_status - IS 'Enumerates the states of a withdrawal operation. - The states are the same as in the bank-integration API: - pending : the operation is pending parameters selection (exchange and reserve public key) - selected : the operations has been selected and is pending confirmation - aborted : the operation has been aborted - confirmed: the transfer has been confirmed and registered by the bank'; - - -CREATE TYPE taler_amount_currency - AS (val INT8, frac INT4 , curr VARCHAR(12)); -COMMENT ON TYPE taler_amount_currency - IS 'Stores an amount, fraction is in units of 1/100000000 of the base value. - copied from https://git.taler.net/merchant.git/tree/src/backenddb/merchant-0001.sql'; - - -CREATE TABLE IF NOT EXISTS provider ( - provider_id INT8 GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY, - name TEXT NOT NULL UNIQUE, - payto_target_type TEXT NOT NULL UNIQUE, - backend_base_url TEXT NOT NULL, - backend_credentials TEXT NOT NULL -); -COMMENT ON TABLE provider - IS 'Table describing providers of c2ec terminal'; -COMMENT ON COLUMN provider.provider_id - IS 'Uniquely identifies a provider'; -COMMENT ON COLUMN provider.name - IS 'Name of the provider, used for selection in transaction proofing'; -COMMENT ON COLUMN provider.payto_target_type - IS 'The Payto target type associated with the provider. Each payto target type - has exctly one provider. This is needed so that the attestor client can be dynamically - selected by C2EC.'; -COMMENT ON COLUMN provider.backend_base_url - IS 'URL of the provider backend for transaction proofing'; -COMMENT ON COLUMN provider.backend_credentials - IS 'Credentials used to access the backend of the provider'; - - -CREATE TABLE IF NOT EXISTS terminal ( - terminal_id INT8 GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY, - access_token TEXT NOT NULL, - active BOOLEAN NOT NULL DEFAULT TRUE, - description TEXT, - provider_id INT8 NOT NULL REFERENCES provider(provider_id) -); -COMMENT ON TABLE terminal - IS 'Table containing information about terminals of providers'; -COMMENT ON COLUMN terminal.terminal_id - IS 'Uniquely identifies a terminal'; -COMMENT ON COLUMN terminal.access_token - IS 'The access token of the terminal used for authentication against the c2ec API. It is hashed using a PBKDF.'; -COMMENT ON COLUMN terminal.active - IS 'Indicates if the terminal is active or deactivated'; -COMMENT ON COLUMN terminal.description - IS 'Description to help identify the terminal. This may include the location and an identifier of the terminal.'; -COMMENT ON COLUMN terminal.provider_id - IS 'Indicates the terminal provider to which the terminal belongs'; - - -CREATE TABLE IF NOT EXISTS withdrawal ( - withdrawal_id INT8 GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY, - wopid BYTEA CHECK (LENGTH(wopid)=32) NOT NULL, - reserve_pub_key BYTEA CHECK (LENGTH(reserve_pub_key)=32) NOT NULL, - registration_ts INT8 NOT NULL, - amount taler_amount_currency, - fees taler_amount_currency, - withdrawal_status withdrawal_operation_status NOT NULL DEFAULT 'pending', - terminal_id INT8 NOT NULL REFERENCES terminal(terminal_id), - provider_transaction_id TEXT, - last_retry_ts INT8, - retry_counter INT4 NOT NULL DEFAULT 0, - completion_proof BYTEA -); -COMMENT ON TABLE withdrawal - IS 'Table representing withdrawal processes initiated by terminals'; -COMMENT ON COLUMN withdrawal.withdrawal_id - IS 'The withdrawal id is used a technical id used by the wire gateway to sequentially select new transactions'; -COMMENT ON COLUMN withdrawal.wopid - IS 'The wopid (withdrawal operation id) is a nonce generated by the terminal requesting a withdrawal. - The wopid identifies a specific withdrawal spawning all involved systems.'; -COMMENT ON COLUMN withdrawal.reserve_pub_key - IS 'Reserve public key for the reserve which will hold the withdrawal amount after completion'; -COMMENT ON COLUMN withdrawal.registration_ts - IS 'Timestamp of when the withdrawal request was registered'; -COMMENT ON COLUMN withdrawal.amount - IS 'Effective amount to be put into the reserve after completion'; -COMMENT ON COLUMN withdrawal.fees - IS 'Fees associated with the withdrawal, including exchange and provider fees'; -COMMENT ON COLUMN withdrawal.withdrawal_status - IS 'Status of the withdrawal process'; -COMMENT ON COLUMN withdrawal.terminal_id - IS 'ID of the terminal that initiated the withdrawal'; -COMMENT ON COLUMN withdrawal.provider_transaction_id - IS 'Transaction identifier supplied by the provider for backend request'; -COMMENT ON COLUMN withdrawal.last_retry_ts - IS 'Timestamp of the last retry attempt'; -COMMENT ON COLUMN withdrawal.retry_counter - IS 'Number of retry attempts'; -COMMENT ON COLUMN withdrawal.completion_proof - IS 'Proof of transaction upon final completion delivered by the providers system'; - -CREATE INDEX wopid_index ON withdrawal (wopid); -COMMENT ON INDEX wopid_index - IS 'The wopid is the search key for each bank-integration api related request. - Thus it makes sense to create an index on the column.'; - -COMMIT; diff --git a/c2ec/db/0000-c2ec_status_listener.sql b/c2ec/db/0000-c2ec_status_listener.sql @@ -1,41 +0,0 @@ -BEGIN; - -SELECT _v.register_patch('0000-c2ec-status-listener', ARRAY['0000-c2ec-schema'], NULL); - -SET search_path TO c2ec; - --- to create a function, the user needs USAGE privilege on arguments and return types -CREATE OR REPLACE FUNCTION emit_withdrawal_status() -RETURNS TRIGGER AS $$ -BEGIN - PERFORM pg_notify('w_' || encode(NEW.wopid::BYTEA, 'base64'), NEW.withdrawal_status::TEXT); - RETURN NULL; -END; -$$ LANGUAGE plpgsql; -COMMENT ON FUNCTION emit_withdrawal_status - IS 'The function encodes the wopid in base64 and - sends a notification on the channel "w_{wopid}" - with the status in the payload.'; - --- for creating a trigger the user must have TRIGGER pivilege on the table. --- to execute the trigger, the user needs EXECUTE privilege on the trigger function. -CREATE OR REPLACE TRIGGER c2ec_withdrawal_created - AFTER INSERT - ON withdrawal - FOR EACH ROW - EXECUTE FUNCTION emit_withdrawal_status(); -COMMENT ON TRIGGER c2ec_withdrawal_created ON withdrawal - IS 'After creation of the withdrawal entry a notification shall - be triggered using this trigger.'; - -CREATE OR REPLACE TRIGGER c2ec_withdrawal_changed - AFTER UPDATE OF withdrawal_status - ON withdrawal - FOR EACH ROW - WHEN (OLD.withdrawal_status IS DISTINCT FROM NEW.withdrawal_status) - EXECUTE FUNCTION emit_withdrawal_status(); -COMMENT ON TRIGGER c2ec_withdrawal_changed ON withdrawal - IS 'After the update of the status (only the status is of interest) - a notification shall be triggered using this trigger.'; - -COMMIT; diff --git a/c2ec/db/0000-c2ec_test.sql b/c2ec/db/0000-c2ec_test.sql @@ -1,17 +0,0 @@ -BEGIN; - -SET search_path TO c2ec; - -DROP TABLE IF EXISTS p_id; - -INSERT INTO provider (name, backend_base_url, backend_credentials) - VALUES ('Simulation', 'will be simulated', 'no creds'); - -SELECT provider_id INTO p_id FROM provider WHERE name = 'Simulation'; - -INSERT INTO terminal (access_token, description, provider_id) - VALUES ('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa', 'this is a simulated terminal', (SELECT * FROM p_id)); - -COMMIT; - -SELECT * FROM provider; diff --git a/c2ec/db/0000-c2ec_test_rollback.sql b/c2ec/db/0000-c2ec_test_rollback.sql @@ -1,17 +0,0 @@ -BEGIN; - -SET search_path TO c2ec; - -DROP TABLE IF EXISTS p_r_id; -DROP TABLE IF EXISTS t_r_id; - -SELECT provider_id INTO p_r_id FROM provider WHERE name = 'Simulation'; -SELECT terminal_id INTO t_r_id FROM terminal WHERE provider_id = (SELECT * FROM p_r_id); - -DELETE FROM withdrawal WHERE terminal_id = (SELECT * FROM t_r_id); -DELETE FROM terminal WHERE provider_id = (SELECT * FROM p_r_id); -DELETE FROM provider WHERE provider_id = (SELECT * FROM p_r_id); - -COMMIT; - -SELECT * FROM provider; -\ No newline at end of file diff --git a/c2ec/db/0000-c2ec_transfers.sql b/c2ec/db/0000-c2ec_transfers.sql @@ -1,22 +0,0 @@ -BEGIN; - -SELECT _v.register_patch('0000-c2ec-transfers', ARRAY['0000-c2ec-schema'], NULL); - -SET search_path TO c2ec; - -CREATE TABLE IF NOT EXISTS transfer ( - request_uid INT8 UNIQUE PRIMARY KEY, - request_hash TEXT NOT NULL -); -COMMENT ON TABLE transfer - IS 'Table storing transfers which are sent by the exchange.'; -COMMENT ON COLUMN transfers.request_uid - IS 'A unique identifier for the transfer. In the case of this - implementation its gonna be the wopid of the withdrawal which - is addressed by the transfer.'; -COMMENT ON COLUMN transfers.request_hash - IS 'Hash of the entire transfer request. Requests with the same - request identifier must have the identical hash to be processed - further.'; - -COMMIT; -\ No newline at end of file diff --git a/c2ec/db/0001-c2ec_schema.sql b/c2ec/db/0001-c2ec_schema.sql @@ -0,0 +1,139 @@ +BEGIN; + +SELECT _v.register_patch('0001-c2ec-schema', NULL, NULL); + +CREATE SCHEMA c2ec; +COMMENT ON SCHEMA c2ec + IS 'Schema containing all tables and types related to c2ec (cashless2ecash)'; + +SET search_path TO c2ec; + +CREATE TYPE withdrawal_operation_status AS ENUM ( + 'pending', + 'selected', + 'aborted', + 'confirmed' +); +COMMENT ON TYPE withdrawal_operation_status + IS 'Enumerates the states of a withdrawal operation. + The states are the same as in the bank-integration API: + pending : the operation is pending parameters selection (exchange and reserve public key) + selected : the operations has been selected and is pending confirmation + aborted : the operation has been aborted + confirmed: the transfer has been confirmed and registered by the bank'; + + +CREATE TYPE taler_amount_currency + AS (val INT8, frac INT4 , curr VARCHAR(12)); +COMMENT ON TYPE taler_amount_currency + IS 'Stores an amount, fraction is in units of 1/100000000 of the base value. + copied from https://git.taler.net/merchant.git/tree/src/backenddb/merchant-0001.sql'; + + +CREATE TABLE IF NOT EXISTS provider ( + provider_id INT8 GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY, + name TEXT NOT NULL UNIQUE, + payto_target_type TEXT NOT NULL UNIQUE, + backend_base_url TEXT NOT NULL, + backend_credentials TEXT NOT NULL +); +COMMENT ON TABLE provider + IS 'Table describing providers of c2ec terminal'; +COMMENT ON COLUMN provider.provider_id + IS 'Uniquely identifies a provider'; +COMMENT ON COLUMN provider.name + IS 'Name of the provider, used for selection in transaction proofing'; +COMMENT ON COLUMN provider.payto_target_type + IS 'The Payto target type associated with the provider. Each payto target type + has exctly one provider. This is needed so that the attestor client can be dynamically + selected by C2EC.'; +COMMENT ON COLUMN provider.backend_base_url + IS 'URL of the provider backend for transaction proofing'; +COMMENT ON COLUMN provider.backend_credentials + IS 'Credentials used to access the backend of the provider'; + + +CREATE TABLE IF NOT EXISTS terminal ( + terminal_id INT8 GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY, + access_token TEXT NOT NULL, + active BOOLEAN NOT NULL DEFAULT TRUE, + description TEXT, + provider_id INT8 NOT NULL REFERENCES provider(provider_id) +); +COMMENT ON TABLE terminal + IS 'Table containing information about terminals of providers'; +COMMENT ON COLUMN terminal.terminal_id + IS 'Uniquely identifies a terminal'; +COMMENT ON COLUMN terminal.access_token + IS 'The access token of the terminal used for authentication against the c2ec API. It is hashed using a PBKDF.'; +COMMENT ON COLUMN terminal.active + IS 'Indicates if the terminal is active or deactivated'; +COMMENT ON COLUMN terminal.description + IS 'Description to help identify the terminal. This may include the location and an identifier of the terminal.'; +COMMENT ON COLUMN terminal.provider_id + IS 'Indicates the terminal provider to which the terminal belongs'; + + +CREATE TABLE IF NOT EXISTS withdrawal ( + withdrawal_id INT8 GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY, + wopid BYTEA CHECK (LENGTH(wopid)=32) UNIQUE NOT NULL, + reserve_pub_key BYTEA CHECK (LENGTH(reserve_pub_key)=32) NOT NULL, + registration_ts INT8 NOT NULL, + amount taler_amount_currency, + fees taler_amount_currency, + withdrawal_status withdrawal_operation_status NOT NULL DEFAULT 'pending', + terminal_id INT8 NOT NULL REFERENCES terminal(terminal_id), + provider_transaction_id TEXT, + last_retry_ts INT8, + retry_counter INT4 NOT NULL DEFAULT 0, + completion_proof BYTEA +); +COMMENT ON TABLE withdrawal + IS 'Table representing withdrawal processes initiated by terminals'; +COMMENT ON COLUMN withdrawal.withdrawal_id + IS 'The withdrawal id is used a technical id used by the wire gateway to sequentially select new transactions'; +COMMENT ON COLUMN withdrawal.wopid + IS 'The wopid (withdrawal operation id) is a nonce generated by the terminal requesting a withdrawal. + The wopid identifies a specific withdrawal spawning all involved systems.'; +COMMENT ON COLUMN withdrawal.reserve_pub_key + IS 'Reserve public key for the reserve which will hold the withdrawal amount after completion'; +COMMENT ON COLUMN withdrawal.registration_ts + IS 'Timestamp of when the withdrawal request was registered'; +COMMENT ON COLUMN withdrawal.amount + IS 'Effective amount to be put into the reserve after completion'; +COMMENT ON COLUMN withdrawal.fees + IS 'Fees associated with the withdrawal, including exchange and provider fees'; +COMMENT ON COLUMN withdrawal.withdrawal_status + IS 'Status of the withdrawal process'; +COMMENT ON COLUMN withdrawal.terminal_id + IS 'ID of the terminal that initiated the withdrawal'; +COMMENT ON COLUMN withdrawal.provider_transaction_id + IS 'Transaction identifier supplied by the provider for backend request'; +COMMENT ON COLUMN withdrawal.last_retry_ts + IS 'Timestamp of the last retry attempt'; +COMMENT ON COLUMN withdrawal.retry_counter + IS 'Number of retry attempts'; +COMMENT ON COLUMN withdrawal.completion_proof + IS 'Proof of transaction upon final completion delivered by the providers system'; + +CREATE INDEX wopid_index ON withdrawal (wopid); +COMMENT ON INDEX wopid_index + IS 'The wopid is the search key for each bank-integration api related request. + Thus it makes sense to create an index on the column.'; + +CREATE TABLE IF NOT EXISTS transfer ( + request_uid INT8 UNIQUE PRIMARY KEY, + request_hash TEXT NOT NULL +); +COMMENT ON TABLE transfer + IS 'Table storing transfers which are sent by the exchange.'; +COMMENT ON COLUMN transfer.request_uid + IS 'A unique identifier for the transfer. In the case of this + implementation its gonna be the wopid of the withdrawal which + is addressed by the transfer.'; +COMMENT ON COLUMN transfer.request_hash + IS 'Hash of the entire transfer request. Requests with the same + request identifier must have the identical hash to be processed + further.'; + +COMMIT; diff --git a/c2ec/db/drop.sql b/c2ec/db/drop.sql @@ -0,0 +1,6 @@ +-- counter part to initializing / schema +BEGIN; + +DROP SCHEMA IF EXISTS c2ec CASCADE; + +COMMIT; +\ No newline at end of file diff --git a/c2ec/db/proc-c2ec_payment_notification_listener.sql b/c2ec/db/proc-c2ec_payment_notification_listener.sql @@ -0,0 +1,47 @@ +BEGIN; + +SELECT _v.register_patch('proc-c2ec-payment-notification-listener', ARRAY['0001-c2ec-schema'], NULL); + +SET search_path TO c2ec; + +-- to create a function, the user needs USAGE privilege on arguments and return types +CREATE OR REPLACE FUNCTION emit_payment_notification() +RETURNS TRIGGER AS $$ +DECLARE + provider_name TEXT; +BEGIN + SELECT p.name INTO provider_name FROM c2ec.provider AS p + LEFT JOIN c2ec.terminal AS t + ON t.provider_id = p.provider_id + LEFT JOIN c2ec.withdrawal AS w + ON t.terminal_id = w.terminal_id + WHERE w.withdrawal_id = NEW.withdrawal_id; + PERFORM pg_notify('payment_notification', + provider_name || '|' || + NEW.withdrawal_id || '|' || + NEW.provider_transaction_id + ); + RETURN NULL; +END; +$$ LANGUAGE plpgsql; +COMMENT ON FUNCTION emit_payment_notification + IS 'The function emits the name of the provider, row id of the withdrawal + and the provider_transaction_id, on the channel "payment_notification". + The format of the payload is as follows: + "{PROVIDER_NAME}|{WITHDRAWAL_ID}|{PROVIDER_TRANSACTION_ID}". The subscriber + shall decide which attestation process to use, based on the name of + the provider.'; + +-- for creating a trigger the user must have TRIGGER pivilege on the table. +-- to execute the trigger, the user needs EXECUTE privilege on the trigger function. +CREATE OR REPLACE TRIGGER c2ec_on_payment_notify + AFTER UPDATE OF provider_transaction_id + ON withdrawal + FOR EACH ROW + WHEN (NEW.provider_transaction_id IS NOT NULL) + EXECUTE FUNCTION emit_payment_notification(); +COMMENT ON TRIGGER c2ec_on_payment_notify ON withdrawal + IS 'After setting the provider transaction id following a payment notification, + trigger the emit to the respective channel.'; + +COMMIT; diff --git a/c2ec/db/proc-c2ec_status_listener.sql b/c2ec/db/proc-c2ec_status_listener.sql @@ -0,0 +1,41 @@ +BEGIN; + +SELECT _v.register_patch('proc-c2ec-status-listener', ARRAY['0001-c2ec-schema'], NULL); + +SET search_path TO c2ec; + +-- to create a function, the user needs USAGE privilege on arguments and return types +CREATE OR REPLACE FUNCTION emit_withdrawal_status() +RETURNS TRIGGER AS $$ +BEGIN + PERFORM pg_notify('w_' || encode(NEW.wopid::BYTEA, 'base64'), NEW.withdrawal_status::TEXT); + RETURN NULL; +END; +$$ LANGUAGE plpgsql; +COMMENT ON FUNCTION emit_withdrawal_status + IS 'The function encodes the wopid in base64 and + sends a notification on the channel "w_{wopid}" + with the status in the payload.'; + +-- for creating a trigger the user must have TRIGGER pivilege on the table. +-- to execute the trigger, the user needs EXECUTE privilege on the trigger function. +CREATE OR REPLACE TRIGGER c2ec_withdrawal_created + AFTER INSERT + ON withdrawal + FOR EACH ROW + EXECUTE FUNCTION emit_withdrawal_status(); +COMMENT ON TRIGGER c2ec_withdrawal_created ON withdrawal + IS 'After creation of the withdrawal entry a notification shall + be triggered using this trigger.'; + +CREATE OR REPLACE TRIGGER c2ec_withdrawal_changed + AFTER UPDATE OF withdrawal_status + ON withdrawal + FOR EACH ROW + WHEN (OLD.withdrawal_status IS DISTINCT FROM NEW.withdrawal_status) + EXECUTE FUNCTION emit_withdrawal_status(); +COMMENT ON TRIGGER c2ec_withdrawal_changed ON withdrawal + IS 'After the update of the status (only the status is of interest) + a notification shall be triggered using this trigger.'; + +COMMIT; diff --git a/c2ec/db/procedures.sql b/c2ec/db/procedures.sql @@ -0,0 +1 @@ +-- generated from +\ No newline at end of file diff --git a/c2ec/db/procedures.sql.in b/c2ec/db/procedures.sql.in @@ -0,0 +1 @@ +Note: cat into procedures.sql +\ No newline at end of file diff --git a/c2ec/db/test_c2ec_test.sql b/c2ec/db/test_c2ec_test.sql @@ -0,0 +1,19 @@ +BEGIN; + +SET search_path TO c2ec; + +DROP TABLE IF EXISTS p_id; + +INSERT INTO provider (name, payto_target_type, backend_base_url, backend_credentials) + VALUES ('Simulation', 'void', 'will be simulated', 'no creds'); + +SELECT provider_id INTO p_id FROM provider WHERE name = 'Simulation'; + +INSERT INTO terminal (access_token, description, provider_id) + VALUES ('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa', 'this is a simulated terminal', (SELECT * FROM p_id)); + +DROP TABLE IF EXISTS p_id; + +COMMIT; + +SELECT * FROM provider; diff --git a/c2ec/db/test_c2ec_test_rollback.sql b/c2ec/db/test_c2ec_test_rollback.sql @@ -0,0 +1,20 @@ +BEGIN; + +SET search_path TO c2ec; + +DROP TABLE IF EXISTS p_r_id; +DROP TABLE IF EXISTS t_r_id; + +SELECT provider_id INTO p_r_id FROM provider WHERE name = 'Simulation'; +SELECT terminal_id INTO t_r_id FROM terminal WHERE provider_id = (SELECT * FROM p_r_id); + +DELETE FROM withdrawal WHERE terminal_id = (SELECT * FROM t_r_id); +DELETE FROM terminal WHERE provider_id = (SELECT * FROM p_r_id); +DELETE FROM provider WHERE provider_id = (SELECT * FROM p_r_id); + +DROP TABLE IF EXISTS p_r_id; +DROP TABLE IF EXISTS t_r_id; + +COMMIT; + +SELECT * FROM provider; +\ No newline at end of file diff --git a/c2ec/http-util.go b/c2ec/http-util.go @@ -75,15 +75,7 @@ func AcceptOptionalParamOrWriteResponse[T any]( } if ptr == nil { - err := WriteProblem(res, HTTP_INTERNAL_SERVER_ERROR, &RFC9457Problem{ - TypeUri: TALER_URI_PROBLEM_PREFIX + "/C2EC_INVALID_REQUEST_QUERY_PARAMETER", - Title: "invalid request query parameter", - Detail: "the withdrawal status request parameter '" + name + "' resulted in a nil pointer)", - Instance: req.RequestURI, - }) - if err != nil { - res.WriteHeader(HTTP_INTERNAL_SERVER_ERROR) - } + LogInfo("http", "optional parameter "+name+" was not set") return nil, false } diff --git a/c2ec/main.go b/c2ec/main.go @@ -31,15 +31,16 @@ var PROVIDER_CLIENTS = map[string]ProviderClient{} // Starts the c2ec process. // The program takes following arguments (ordered): -// 1. path to configuration file (.yaml) (optional) +// 1. path to configuration file (.yaml | .ini style format) (optional) // // The startup follows these steps: // 1. load configuration or panic // 2. setup database or panic -// 3. setup attestors -// 4. setup routes for the bank-integration-api -// 5. setup routes for the wire-gateway-api -// 6. listen for incoming requests (as specified in config) +// 3. setup provider clients +// 4. setup attestor +// 5. setup routes for the bank-integration-api +// 6. setup routes for the wire-gateway-api +// 7. listen for incoming requests (as specified in config) func main() { LogInfo("main", fmt.Sprintf("starting c2ec at %s", time.Now().Format(time.UnixDate))) @@ -59,13 +60,22 @@ func main() { DB, err = setupDatabase(&CONFIG.Database) if err != nil { - panic("unable initialize datatbase: " + err.Error()) + panic("unable to connect to datatbase: " + err.Error()) } - err = setupAttestors(&CONFIG) + err = setupProviderClients(&CONFIG) if err != nil { panic("unable initialize attestors: " + err.Error()) } + LogInfo("main", "provider clients are setup") + + attestorCtx, attestorCancel := context.WithCancel(context.Background()) + defer attestorCancel() // run cancel anyway when main exits. + attestorErrs := make(chan error) + RunAttestor(attestorCtx, attestorErrs) + LogInfo("main", "attestor is running") + + // TODO run retry process here router := http.NewServeMux() @@ -93,16 +103,43 @@ func main() { os.Exit(1) }() + // move this to goroutine + LogInfo("main", "serving at unix-domain-socket "+server.Addr) if err = server.Serve(socket); err != nil { panic(err.Error()) } } else { + // move this to goroutine server.Addr = fmt.Sprintf("%s:%d", CONFIG.Server.Host, CONFIG.Server.Port) + LogInfo("main", "serving at "+server.Addr) if err = server.ListenAndServe(); err != nil { panic(err.Error()) } } + + // TODO : do proper + + // since listening for incoming request, attesting payments and + // retrying payments are three separated processes who can fail + // we must take care of this here. The main process is used to + // dispatch incoming http request and parent of the attestation + // and retry processes. If the main process fails somehow, also + // attestation and retries will end. But if somehow the attestation + // or retry process fail, they will be restarted and the error is + // written to the log. If some setup tasks are failing, the program + // panics. + // for { + // select { + // case attestationError := <-attestorErrs: + // LogError("main", attestationError) + // case <-attestorCtx.Done(): + // // The attestation process died for some reason. let's restart it. + // attestorCtx, attestorCancel = context.WithCancel(context.Background()) + // defer attestorCancel() // does this the right thing? + // RunAttestor(attestorCtx, attestorErrs) + // } + // } } func setupDatabase(cfg *C2ECDatabseConfig) (C2ECDatabase, error) { @@ -110,7 +147,7 @@ func setupDatabase(cfg *C2ECDatabseConfig) (C2ECDatabase, error) { return NewC2ECPostgres(cfg) } -func setupAttestors(cfg *C2ECConfig) error { +func setupProviderClients(cfg *C2ECConfig) error { if DB == nil { return errors.New("setup database first") @@ -133,24 +170,23 @@ func setupAttestors(cfg *C2ECConfig) error { } if !cfg.Server.IsProd { - // Prevent simulation provider to be loaded in productive environments. + // Prevent simulation client to be loaded in productive environments. if p.Name == "Simulation" { - attestor := new(SimulationAttestor) - errs, err := RunAttestor(context.Background(), attestor, p) + + simulationClient := new(SimulationClient) + err := simulationClient.SetupClient(p) if err != nil { return err } - go EndlessChannelResultPrinter("Simulation Attestor Error:", errs) } } if p.Name == "Wallee" { - attestor := new(WalleeAttestor) - errs, err := RunAttestor(context.Background(), attestor, p) + + walleeClient := new(WalleeClient) + err := walleeClient.SetupClient(p) if err != nil { return err - } else { - go EndlessChannelResultPrinter("Wallee Attestor Error:", errs) } } @@ -215,10 +251,3 @@ func setupWireGatewayRoutes(router *http.ServeMux) { adminAddIncoming, ) } - -func EndlessChannelResultPrinter[T any](prefix string, c chan T) { - - out := <-c - fmt.Println(prefix, out) - EndlessChannelResultPrinter(prefix, c) -} diff --git a/c2ec/model.go b/c2ec/model.go @@ -2,6 +2,7 @@ package main import ( "encoding/base64" + "errors" "fmt" ) @@ -50,6 +51,16 @@ func ToWithdrawalOpStatus(s string) (WithdrawalOperationStatus, error) { } } +func ParseWopid(wopid string) (string, error) { + + decoded, err := base64.URLEncoding.DecodeString(wopid) + if err != nil { + LogError("model", err) + return "", errors.New("decoding failed") + } + return base64.StdEncoding.EncodeToString(decoded), nil +} + func WopidValid(wopid string) bool { decoded, err := base64.URLEncoding.DecodeString(wopid) diff --git a/c2ec/postgres.go b/c2ec/postgres.go @@ -1,7 +1,6 @@ package main import ( - "bytes" "context" "encoding/base32" "encoding/base64" @@ -33,15 +32,13 @@ const PS_GET_UNCONFIRMED_WITHDRAWALS = "SELECT * FROM " + WITHDRAWAL_TABLE_NAME " AND " + WITHDRAWAL_FIELD_NAME_STATUS + " = '" + string(SELECTED) + "'" const PS_PAYMENT_NOTIFICATION = "UPDATE " + WITHDRAWAL_TABLE_NAME + " SET (" + - WITHDRAWAL_FIELD_NAME_TRANSACTION_ID + "," + - WITHDRAWAL_FIELD_NAME_AMOUNT + "," + - WITHDRAWAL_FIELD_NAME_FEES + "," + - " = ($1, $2, $3)" + - " WHERE " + WITHDRAWAL_FIELD_NAME_WOPID + "=$4" + WITHDRAWAL_FIELD_NAME_AMOUNT + "," + WITHDRAWAL_FIELD_NAME_FEES + "," + WITHDRAWAL_FIELD_NAME_TRANSACTION_ID + ")" + + " = (($1, $2, $3),($4, $5, $6),$7)" + + " WHERE " + WITHDRAWAL_FIELD_NAME_WOPID + "=$8" const PS_FINALISE_PAYMENT = "UPDATE " + WITHDRAWAL_TABLE_NAME + " SET (" + WITHDRAWAL_FIELD_NAME_STATUS + "," + - WITHDRAWAL_FIELD_NAME_COMPLETION_PROOF + "," + + WITHDRAWAL_FIELD_NAME_COMPLETION_PROOF + ")" + " = ($1, $2)" + " WHERE " + WITHDRAWAL_FIELD_NAME_ID + "=$3" @@ -102,6 +99,7 @@ func NewC2ECPostgres(cfg *C2ECDatabseConfig) (*C2ECPostgres, error) { if err != nil { panic(err.Error()) } + dbConnCfg.AfterConnect = db.registerCustomTypesHook db.pool, err = pgxpool.NewWithConfig(context.Background(), dbConnCfg) if err != nil { panic(err.Error()) @@ -112,22 +110,38 @@ func NewC2ECPostgres(cfg *C2ECDatabseConfig) (*C2ECPostgres, error) { return db, nil } +func (db *C2ECPostgres) registerCustomTypesHook(ctx context.Context, conn *pgx.Conn) error { + + t, err := conn.LoadType(ctx, "c2ec.taler_amount_currency") + if err != nil { + return err + } + + conn.TypeMap().RegisterType(t) + return nil +} + func (db *C2ECPostgres) RegisterWithdrawal( wopid WithdrawalIdentifier, resPubKey EddsaPublicKey, terminalId uint64, ) error { + wopidBytes, err := base64.StdEncoding.DecodeString(string(wopid)) + if err != nil { + return err + } + resPubKeyBytes, err := base32.HexEncoding.DecodeString(string(resPubKey)) if err != nil { return err } ts := time.Now() - res, err := db.pool.Query( + res, err := db.pool.Exec( db.ctx, PS_INSERT_WITHDRAWAL, - wopid, + wopidBytes, resPubKeyBytes, SELECTED, ts.Unix(), @@ -137,22 +151,22 @@ func (db *C2ECPostgres) RegisterWithdrawal( LogError("postgres", err) return err } - defer res.Close() - if res.Err() != nil { - LogError("postgres", err) - return err - } LogInfo("postgres", "query="+PS_INSERT_WITHDRAWAL) - LogInfo("postgres", "registered withdrawal successfully. affected rows="+strconv.Itoa(int(res.CommandTag().RowsAffected()))) + LogInfo("postgres", "registered withdrawal successfully. affected rows="+strconv.Itoa(int(res.RowsAffected()))) return nil } func (db *C2ECPostgres) GetWithdrawalByWopid(wopid string) (*Withdrawal, error) { + wopidBytes, err := base64.StdEncoding.DecodeString(string(wopid)) + if err != nil { + return nil, err + } + if row, err := db.pool.Query( db.ctx, PS_GET_WITHDRAWAL_BY_WOPID, - wopid, + wopidBytes, ); err != nil { LogError("postgres", err) if row != nil { @@ -213,19 +227,28 @@ func (db *C2ECPostgres) NotifyPayment( fees Amount, ) error { - res, err := db.pool.Query( + wopidBytes, err := base64.StdEncoding.DecodeString(string(wopid)) + if err != nil { + return err + } + + res, err := db.pool.Exec( db.ctx, PS_PAYMENT_NOTIFICATION, + amount.Value, + amount.Fraction, + amount.Currency, + fees.Value, + fees.Fraction, + fees.Currency, providerTransactionId, - amount, - fees, + wopidBytes, ) if err != nil { LogError("postgres", err) return err } - res.Close() - LogInfo("postgres", "query="+PS_PAYMENT_NOTIFICATION) + LogInfo("postgres", "query="+PS_PAYMENT_NOTIFICATION+", affected rows="+strconv.Itoa(int(res.RowsAffected()))) return nil } @@ -491,19 +514,18 @@ func (db *C2ECPostgres) ListenForWithdrawalStatusChange( errs chan error, ) { - pgNotification := make(chan *pgconn.Notification) + notifications := make(chan *Notification) + errsInternal := make(chan error) go func() { - connstr := PostgresConnectionString(&CONFIG.Database) - cfg, err := pgx.ParseConfig(connstr) + channel := "w_" + string(wopid) + listener, err := NewListener(channel, notifications) if err != nil { - errs <- err + errsInternal <- err + return } - - channel := "w_" + base64.StdEncoding.EncodeToString(bytes.NewBufferString(string(wopid)).Bytes()) - listener := newChannelListener(cfg, channel, pgNotification) - LogInfo("postgres", fmt.Sprintf("listening for %s", wopid)) + LogInfo("postgres", fmt.Sprintf("listening for status change of wopid=%s", wopid)) if err := listener.Listen(ctx); err != nil { LogError("postgres", err) @@ -513,7 +535,7 @@ func (db *C2ECPostgres) ListenForWithdrawalStatusChange( for { select { - case e := <-errs: + case e := <-errsInternal: LogError("postgres", e) errs <- e case <-ctx.Done(): @@ -524,39 +546,64 @@ func (db *C2ECPostgres) ListenForWithdrawalStatusChange( } LogWarn("postgres", msg) errs <- errors.New(msg) - case n := <-pgNotification: + case n := <-notifications: LogInfo("postgres", fmt.Sprintf("received notification for channel %s: %s", n.Channel, n.Payload)) out <- WithdrawalOperationStatus(n.Payload) } } } -// Sets up a channel with the given configurations. -func newChannelListener( - cfg *pgx.ConnConfig, +func Listen(ctx context.Context, channel string) (chan *Notification, chan error, error) { + + out := make(chan *Notification) + errs := make(chan error) + + listener, err := NewListener(channel, out) + if err != nil { + return nil, nil, err + } + + go func() { + + err := listener.Listen(ctx) + if err != nil { + errs <- err + } + }() + + return out, errs, nil +} + +// Sets up a a listener for the given channel. +// Notifications will be sent through the out channel. +func NewListener( cn string, - out chan *pgconn.Notification, -) *pgxlisten.Listener { + out chan *Notification, +) (*pgxlisten.Listener, error) { + + connectionString := PostgresConnectionString(&CONFIG.Database) + + cfg, err := pgx.ParseConfig(connectionString) + if err != nil { + return nil, err + } listener := &pgxlisten.Listener{ Connect: func(ctx context.Context) (*pgx.Conn, error) { - LogInfo("postgres", "connecting to the database") + LogInfo("postgres", "listener connecting to the database") return pgx.ConnectConfig(ctx, cfg) }, } + LogInfo("postgres", "handling notifications on channel="+cn) listener.Handle(cn, pgxlisten.HandlerFunc(func(ctx context.Context, notification *pgconn.Notification, conn *pgx.Conn) error { LogInfo("postgres", fmt.Sprintf("handling postgres notification. channel=%s", notification.Channel)) - for { - select { - case out <- notification: - LogInfo("postgres", fmt.Sprintf("received notification. channel=%s, notification=%s", notification.Channel, notification.Payload)) - return nil - case <-ctx.Done(): - return ctx.Err() - } + out <- &Notification{ + Channel: notification.Channel, + Payload: notification.Payload, } + return nil })) - return listener + return listener, nil } diff --git a/c2ec/simulation-attestor.go b/c2ec/simulation-attestor.go @@ -1,123 +0,0 @@ -package main - -import ( - "context" - "errors" - "strconv" - "strings" - - "github.com/jackc/pgx/v5" - "github.com/jackc/pgx/v5/pgconn" - "github.com/jackc/pgxlisten" -) - -type SimulationAttestor struct { - Attestor[*pgconn.Notification] - - listener *pgxlisten.Listener - provider *Provider - providerClient ProviderClient -} - -func (wa *SimulationAttestor) Setup(p *Provider) (chan *pgconn.Notification, error) { - - connectionString := PostgresConnectionString(&CONFIG.Database) - - dbCfg, err := pgx.ParseConfig(connectionString) - if err != nil { - panic(err.Error()) - } - - wa.provider = p - - wa.providerClient = new(SimulationClient) - err = wa.providerClient.SetupClient(wa.provider) - if err != nil { - panic(err.Error()) - } - - notificationChannel := make(chan *pgconn.Notification, PAYMENT_NOTIFICATION_CHANNEL_BUFFER_SIZE) - wa.listener = newChannelListener(dbCfg, PS_PAYMENT_NOTIFICATION_CHANNEL, notificationChannel) - return notificationChannel, nil -} - -func (wa *SimulationAttestor) Listen( - ctx context.Context, - notificationChannel chan *pgconn.Notification, - errs chan error, -) error { - - if wa.listener == nil { - return errors.New("attestor needs to be setup first") - } - - go func() { - err := wa.listener.Listen(ctx) - if err != nil { - errs <- err - } - close(notificationChannel) - close(errs) - }() - - // Listen is started async. We can therefore block here and must - // not run the retrieval logic in own goroutine - for { - select { - case notification := <-notificationChannel: - // the dispatching can be done asynchronously - go wa.dispatch(notification, errs) - case <-ctx.Done(): - close(notificationChannel) - close(errs) - } - } -} - -func (wa *SimulationAttestor) Attest(withdrawalId int, providerTransactionId string, errs chan error) { - - transaction, err := wa.providerClient.GetTransaction(providerTransactionId) - if err != nil { - // TODO : do we abort the withdrawal here?? - errs <- err - } - - if transaction.AllowWithdrawal() { - - err = DB.FinaliseWithdrawal(withdrawalId, CONFIRMED, transaction.Bytes()) - if err != nil { - errs <- err - } - } else { - err = DB.FinaliseWithdrawal(withdrawalId, ABORTED, transaction.Bytes()) - if err != nil { - errs <- err - } - } -} - -func (wa *SimulationAttestor) dispatch(notification *pgconn.Notification, errs chan error) { - - // The payload is formatted like: "{PROVIDER_NAME}|{WITHDRAWAL_ID}|{PROVIDER_TRANSACTION_ID}" - // the validation is strict. This means, that the dispatcher emits an error - // and returns, if a property is malformed. - payload := strings.Split(notification.Payload, "|") - if len(payload) != 3 { - errs <- errors.New("malformed notification payload: " + notification.Payload) - return - } - - provider := payload[0] - if provider != "Simulation" { - // the Simulation attestor can only handle wallee transactions - return - } - withdrawalRowId, err := strconv.Atoi(payload[1]) - if err != nil { - errs <- errors.New("malformed withdrawal_id: " + err.Error()) - return - } - providerTransactionId := payload[2] - - wa.Attest(withdrawalRowId, providerTransactionId, errs) -} diff --git a/c2ec/simulation-client.go b/c2ec/simulation-client.go @@ -1,6 +1,7 @@ package main import ( + "bytes" "fmt" "time" ) @@ -26,11 +27,17 @@ func (st *SimulationTransaction) AllowWithdrawal() bool { return st.allow } +func (st *SimulationTransaction) Bytes() []byte { + + return bytes.NewBufferString("this is a simulated transaction and therefore has no content.").Bytes() +} + func (sc *SimulationClient) SetupClient(p *Provider) error { fmt.Println("setting up simulation client. probably not what you want in production") - sc.providerBackendAttestationDelayMs = 1000 // one second, might be a lot but for testing this is good. + sc.AllowNextWithdrawal = true + sc.providerBackendAttestationDelayMs = 1000 // one second, might be a lot but for testing this is fine. PROVIDER_CLIENTS["Simulation"] = sc return nil } diff --git a/c2ec/wallee-attestor.go b/c2ec/wallee-attestor.go @@ -1,128 +0,0 @@ -package main - -import ( - "context" - "errors" - "strconv" - "strings" - - "github.com/jackc/pgx/v5" - "github.com/jackc/pgx/v5/pgconn" - "github.com/jackc/pgxlisten" -) - -type WalleeAttestor struct { - Attestor[*pgconn.Notification] - - listener *pgxlisten.Listener - provider *Provider - providerClient ProviderClient -} - -func (wa *WalleeAttestor) Setup(p *Provider) (chan *pgconn.Notification, error) { - - connectionString := PostgresConnectionString(&CONFIG.Database) - - dbCfg, err := pgx.ParseConfig(connectionString) - if err != nil { - return nil, err - } - - wa.provider = p - - wa.providerClient = new(WalleeClient) - err = wa.providerClient.SetupClient(wa.provider) - if err != nil { - return nil, err - } - - notificationChannel := make(chan *pgconn.Notification, PAYMENT_NOTIFICATION_CHANNEL_BUFFER_SIZE) - wa.listener = newChannelListener(dbCfg, PS_PAYMENT_NOTIFICATION_CHANNEL, notificationChannel) - return notificationChannel, nil -} - -func (wa *WalleeAttestor) Listen( - ctx context.Context, - notificationChannel chan *pgconn.Notification, - errs chan error, -) error { - - if wa.listener == nil { - return errors.New("attestor needs to be setup first") - } - - go func() { - err := wa.listener.Listen(ctx) - if err != nil { - LogError("wallee-attestor", err) - errs <- err - } - close(notificationChannel) - close(errs) - }() - - // Listen is started async. We can therefore block here and must - // not run the retrieval logic in own goroutine - for { - select { - case notification := <-notificationChannel: - // the dispatching can be done asynchronously - go wa.dispatch(notification, errs) - case <-ctx.Done(): - close(notificationChannel) - close(errs) - } - } -} - -func (wa *WalleeAttestor) Attest(withdrawalId int, providerTransactionId string, errs chan error) { - - transaction, err := wa.providerClient.GetTransaction(providerTransactionId) - if err != nil { - // TODO : do we abort the withdrawal here?? - errs <- err - } - - if transaction.AllowWithdrawal() { - - err = DB.FinaliseWithdrawal(withdrawalId, CONFIRMED, transaction.Bytes()) - if err != nil { - // TODO : do we abort the withdrawal here?? - errs <- err - } - } else { - // TODO : this might be too early ?! What if the payment was not yet - // processed by the Wallee backend? Needs testing. - err = DB.FinaliseWithdrawal(withdrawalId, ABORTED, transaction.Bytes()) - if err != nil { - // TODO : do we abort the withdrawal here?? - errs <- err - } - } -} - -func (wa *WalleeAttestor) dispatch(notification *pgconn.Notification, errs chan error) { - - // The payload is formatted like: "{PROVIDER_NAME}|{WITHDRAWAL_ID}|{PROVIDER_TRANSACTION_ID}" - // the validation is strict. This means, that the dispatcher emits an error - // and returns, if a property is malformed. - payload := strings.Split(notification.Payload, "|") - if len(payload) != 3 { - errs <- errors.New("malformed notification payload: " + notification.Payload) - return - } - - provider := payload[0] - if provider != "Wallee" { - // the Wallee attestor can only handle wallee transactions - return - } - withdrawalRowId, err := strconv.Atoi(payload[1]) - if err != nil { - errs <- errors.New("malformed withdrawal_id: " + err.Error()) - return - } - providerTransactionId := payload[2] - - wa.Attest(withdrawalRowId, providerTransactionId, errs) -} diff --git a/c2ec/wire-gateway.go b/c2ec/wire-gateway.go @@ -62,11 +62,11 @@ type IncomingReserveTransaction struct { func NewIncomingReserveTransaction(w *Withdrawal) *IncomingReserveTransaction { t := new(IncomingReserveTransaction) - t.Amount = Amount{ - Value: uint64(w.Amount.Val), - Fraction: uint64(w.Amount.Frac), - Currency: w.Amount.Curr, - } + // t.Amount = Amount{ + // Value: uint64(w.Amount.Val), + // Fraction: uint64(w.Amount.Frac), + // Currency: w.Amount.Curr, + // } t.Date = Timestamp{ Ts: int(w.RegistrationTs), } diff --git a/simulation/c2ec-simulation b/simulation/c2ec-simulation Binary files differ. diff --git a/simulation/http-util.go b/simulation/http-util.go @@ -178,7 +178,7 @@ func HttpGet[T any]( } req.Header.Add("Accept", codec.HttpApplicationContentHeader()) - fmt.Printf("requesting GET %s\n", url) + fmt.Printf("HTTP : requesting GET %s\n", url) res, err := http.DefaultClient.Do(req) if err != nil { return nil, -1, err diff --git a/simulation/main.go b/simulation/main.go @@ -7,7 +7,7 @@ import ( const DISABLE_DELAYS = true -const C2EC_BASE_URL = "http://localhost:8081" +const C2EC_BASE_URL = "http://localhost:8082" const C2EC_BANK_BASE_URL = C2EC_BASE_URL + "/c2ec" const C2EC_BANK_CONFIG_URL = C2EC_BANK_BASE_URL + "/config" const C2EC_BANK_WITHDRAWAL_STATUS_URL = C2EC_BANK_BASE_URL + "/withdrawal-operation/:wopid" diff --git a/simulation/sim-terminal.go b/simulation/sim-terminal.go @@ -19,36 +19,34 @@ const TERMINAL_USER_ID = TERMINAL_PROVIDER + "-1" // retrieved from the cli tool when added the terminal const TERMINAL_ACCESS_TOKEN = "secret" -const SIM_TERMINAL_LONG_POLL_MS_STR = "5000" // 20 seconds +const SIM_TERMINAL_LONG_POLL_MS_STR = "20000" // 20 seconds -const QR_CODE_CONTENT_BASE = "taler://withdraw/localhost:8081/c2ec/" +const QR_CODE_CONTENT_BASE = "taler://withdraw/localhost:8082/c2ec/" func Terminal(in chan *SimulatedPhysicalInteraction, out chan *SimulatedPhysicalInteraction, kill chan error) { - fmt.Println("Terminal idle... awaiting readiness message of sim-wallet") + fmt.Println("TERMINAL: Terminal idle... awaiting readiness message of sim-wallet") <-in - fmt.Println("Sim-Wallet ready, simulating QR-Code scan... sending withdrawal uri to wallet") + fmt.Println("TERMINAL: Sim-Wallet ready, generating WOPID... ") wopidBytes := make([]byte, 32) _, err := rand.Read(wopidBytes) if err != nil { - fmt.Println("failed creating the wopid:", err.Error(), "(ends simulation)") + fmt.Println("TERMINAL: failed creating the wopid:", err.Error(), "(ends simulation)") kill <- err } wopid := base64.URLEncoding.EncodeToString(wopidBytes) - fmt.Println("Generated Nonce (base64 url encoded):", wopid) + fmt.Println("TERMINAL: Generated Nonce (base64 url encoded):", wopid) uri := QR_CODE_CONTENT_BASE + wopid - fmt.Println("Taler Withdrawal URI:", uri) + fmt.Println("TERMINAL: Taler Withdrawal URI:", uri) // note for realworld implementation // -> start long polling always before showing the QR code awaitSelection := make(chan *C2ECWithdrawalStatus) longPollFailed := make(chan error) - out <- &SimulatedPhysicalInteraction{Msg: uri} - - fmt.Println("now sending long poll request to c2ec from terminal and await parameter selection") + fmt.Println("TERMINAL: now sending long poll request to c2ec from terminal and await parameter selection") go func() { url := FormatUrl( @@ -73,20 +71,34 @@ func Terminal(in chan *SimulatedPhysicalInteraction, out chan *SimulatedPhysical awaitSelection <- response }() + fmt.Println("Go is too fast :) ... need to sleep a bit that long polling request is guaranteed to be executed before the POST of the registration. This won't be a problem in real world appliance.") + time.Sleep(time.Duration(10) * time.Millisecond) + + if !DISABLE_DELAYS { + fmt.Println("TERMINAL: simulating QR Code scan. delay:", WALLET_SCAN_QR_CODE_DELAY_MS) + time.Sleep(time.Duration(WALLET_SCAN_QR_CODE_DELAY_MS) * time.Millisecond) + } else { + fmt.Println("TERMINAL: simulating QR Code scan.") + } + out <- &SimulatedPhysicalInteraction{Msg: uri} for { select { case w := <-awaitSelection: - fmt.Println("selected parameter:", w.ReservePubKey) - fmt.Println("simulating user interaction. customer presents card. delay:", TERMINAL_ACCEPT_CARD_DELAY_MS) + fmt.Println("TERMINAL: selected parameter:", w.ReservePubKey) if !DISABLE_DELAYS { + fmt.Println("TERMINAL: simulating user interaction. customer presents card. delay:", TERMINAL_ACCEPT_CARD_DELAY_MS) time.Sleep(time.Duration(TERMINAL_ACCEPT_CARD_DELAY_MS)) + } else { + fmt.Println("TERMINAL: simulating user interaction. customer presents card.") } - fmt.Println("card accepted. terminal waits for response of provider backend.") if !DISABLE_DELAYS { + fmt.Println("TERMINAL: card accepted. terminal waits for response of provider backend. delay:", PROVIDER_BACKEND_PAYMENT_DELAY_MS) time.Sleep(time.Duration(PROVIDER_BACKEND_PAYMENT_DELAY_MS)) + } else { + fmt.Println("TERMINAL: card accepted. terminal waits for response of provider backend.") } - fmt.Println("payment was processed at the provider backend. sending payment notification.") + fmt.Println("TERMINAL: payment was processed at the provider backend. sending payment notification.") paymentNotification := &C2ECPaymentNotification{ ProviderTransactionId: "simulation-transaction-id-0", Amount: Amount{ @@ -103,7 +115,7 @@ func Terminal(in chan *SimulatedPhysicalInteraction, out chan *SimulatedPhysical cdc := NewJsonCodec[C2ECPaymentNotification]() pnbytes, err := cdc.EncodeToBytes(paymentNotification) if err != nil { - fmt.Println("failed serializing payment notification") + fmt.Println("TERMINAL: failed serializing payment notification") kill <- err } paymentUrl := FormatUrl( @@ -117,12 +129,12 @@ func Terminal(in chan *SimulatedPhysicalInteraction, out chan *SimulatedPhysical bytes.NewReader(pnbytes), ) if err != nil { - fmt.Println("error on POST request:", err.Error()) + fmt.Println("TERMINAL: error on POST request:", err.Error()) kill <- err } - fmt.Println("Terminal flow ended") + fmt.Println("TERMINAL: Terminal flow ended") case f := <-longPollFailed: - fmt.Println("long-polling for selection failed... error:", err.Error()) + fmt.Println("TERMINAL: long-polling for selection failed... error:", err.Error()) kill <- f } } diff --git a/simulation/sim-wallet.go b/simulation/sim-wallet.go @@ -17,19 +17,19 @@ const SIM_WALLET_LONG_POLL_MS_STR = "5000" // 20 seconds func Wallet(in chan *SimulatedPhysicalInteraction, out chan *SimulatedPhysicalInteraction, kill chan error) { - fmt.Println("Wallet started. Signaling terminal readiness (this is simulation specific)") + fmt.Println("WALLET : Wallet started. Signaling terminal readiness (this is simulation specific)") out <- &SimulatedPhysicalInteraction{Msg: "wallet ready"} uriFromQrCode := <-in if !DISABLE_DELAYS { time.Sleep(time.Duration(WALLET_SCAN_QR_CODE_DELAY_MS) * time.Millisecond) } - fmt.Println("simulated QR code scanning... scanned", uriFromQrCode) + fmt.Println("WALLET : simulated QR code scanning... scanned", uriFromQrCode) wopid, err := parseTalerWithdrawUri(uriFromQrCode.Msg) if err != nil { - fmt.Println("failed parsing taler withdraw uri. error:", err.Error()) + fmt.Println("WALLET : failed parsing taler withdraw uri. error:", err.Error()) } - fmt.Println("Wallet parsed wopid:", wopid) + fmt.Println("WALLET : Wallet parsed wopid:", wopid) // Register Withdrawal registrationUrl := FormatUrl( @@ -50,10 +50,12 @@ func Wallet(in chan *SimulatedPhysicalInteraction, out chan *SimulatedPhysicalIn reg.TerminalId = uint64(tid) body, err := cdc.EncodeToBytes(reg) regByte := bytes.NewBuffer(body) - // fmt.Println("body (bytes):", regByte.Bytes()) + // fmt.Println("WALLET : body (bytes):", regByte.Bytes()) if err != nil { kill <- err } + fmt.Println("WALLET : wallet sends withdrawal registration request with freshly generated public key.") + fmt.Printf("HTTP : requesting POST %s\n", registrationUrl) res, err := http.Post( registrationUrl, cdc.HttpApplicationContentHeader(), @@ -61,22 +63,20 @@ func Wallet(in chan *SimulatedPhysicalInteraction, out chan *SimulatedPhysicalIn ) if err != nil { - fmt.Println("error on POST request:", err.Error()) + fmt.Println("WALLET : error on POST request:", err.Error()) kill <- err } if res.StatusCode != 204 { - fmt.Println("response status from registration:", res.StatusCode) + fmt.Println("WALLET : response status from registration:", res.StatusCode) kill <- errors.New("failed registering the withdrawal parameters") } - fmt.Println("wallet sends withdrawal registration request with freshly generated public key.") - // Start long poll for confirmed or abort awaitConfirmationOrAbortion := make(chan *C2ECWithdrawalStatus) longPollFailed := make(chan error) - // long poll for parameter selection notification by c2ec + // long poll for parameter selection notification by c2ec go func() { url := FormatUrl( C2EC_BANK_WITHDRAWAL_STATUS_URL, @@ -102,17 +102,17 @@ func Wallet(in chan *SimulatedPhysicalInteraction, out chan *SimulatedPhysicalIn for { select { case w := <-awaitConfirmationOrAbortion: - fmt.Println("payment processed:", w.Status) + fmt.Println("WALLET : payment processed:", w.Status) if w.Status == CONFIRMED { - fmt.Println("the exchange would now create the reserve and the wallet can withdraw the reserve") + fmt.Println("WALLET : the exchange would now create the reserve and the wallet can withdraw the reserve") os.Exit(0) } if w.Status == ABORTED { - fmt.Println("the withdrawal was aborted. c2ec cleans up withdrawal") + fmt.Println("WALLET : the withdrawal was aborted. c2ec cleans up withdrawal") os.Exit(0) } case f := <-longPollFailed: - fmt.Println("long-polling for selection failed... error:", f.Error()) + fmt.Println("WALLET : long-polling for selection failed... error:", f.Error()) kill <- f } }