commit 5e8a195e45ea5dbdd5b4c4a3b329f47f851ac901
parent 3856219284e7c27ae70ad8ffb96aec38ba19627c
Author: Joel-Haeberli <haebu@rubigen.ch>
Date: Mon, 1 Apr 2024 21:54:14 +0200
code: add simulation client and attestor
Diffstat:
15 files changed, 353 insertions(+), 37 deletions(-)
diff --git a/c2ec/attestor.go b/c2ec/attestor.go
@@ -26,7 +26,7 @@ type Attestor[T any] interface {
// be retrieved through the supplied error channel.
Listen(ctx context.Context, notificationChannel chan *T, errs chan error) error
// Attests a single withdrawal.
- Attest(withdrawalId int, providerTransactionId string, errs chan error) error
+ Attest(withdrawalId int, providerTransactionId string, errs chan error)
}
// Sets up and runs an attestor in the background. This must be called at startup.
diff --git a/c2ec/bank-integration.go b/c2ec/bank-integration.go
@@ -269,6 +269,18 @@ func handlePaymentNotification(res http.ResponseWriter, req *http.Request) {
paymentNotification.Amount,
paymentNotification.Amount,
)
+ if err != nil {
+ err := WriteProblem(res, HTTP_BAD_REQUEST, &RFC9457Problem{
+ TypeUri: TALER_URI_PROBLEM_PREFIX + "/C2EC_PAYMENT_NOTIFICATION_FAILED",
+ Title: "payment notification failed",
+ Detail: "the payment notification failed during the processing of the message: " + err.Error(),
+ Instance: req.RequestURI,
+ })
+ if err != nil {
+ res.WriteHeader(HTTP_INTERNAL_SERVER_ERROR)
+ }
+ return
+ }
res.WriteHeader(HTTP_NO_CONTENT)
}
@@ -286,11 +298,24 @@ func getWithdrawalOrWriteError(wopid string, res http.ResponseWriter, reqUri str
withdrawal, err := DB.GetWithdrawalByWopid(wopid)
if err != nil {
- // TODO : What, if just no entry has been found -> 404?
err := WriteProblem(res, HTTP_INTERNAL_SERVER_ERROR, &RFC9457Problem{
TypeUri: TALER_URI_PROBLEM_PREFIX + "/C2EC_WITHDRAWAL_STATUS_DB_FAILURE",
Title: "database failure",
- Detail: "the registration of the withdrawal failed due to db failure (error:" + err.Error() + ")",
+ Detail: "db failure while requesting withdrawal (error:" + err.Error() + ")",
+ Instance: reqUri,
+ })
+ if err != nil {
+ res.WriteHeader(HTTP_INTERNAL_SERVER_ERROR)
+ }
+ return
+ }
+
+ if withdrawal == nil {
+ // not found -> 404
+ err := WriteProblem(res, HTTP_NOT_FOUND, &RFC9457Problem{
+ TypeUri: TALER_URI_PROBLEM_PREFIX + "/C2EC_WITHDRAWAL_NOT_FOUND",
+ Title: "Not Found",
+ Detail: "No withdrawal with wopid=" + wopid + " could been found.",
Instance: reqUri,
})
if err != nil {
diff --git a/c2ec/c2ec b/c2ec/c2ec
Binary files differ.
diff --git a/c2ec/c2ec-config.yaml b/c2ec/c2ec-config.yaml
@@ -1,8 +1,10 @@
c2ec:
+ prod: false
host: "localhost"
port: 8081
- unix-domain-socket: true
+ unix-domain-socket: false
unix-socket-path: "c2ec.sock"
+ fail-on-missing-attestors: false # forced if prod=true
db:
host: "localhost"
port: 5432
@@ -11,3 +13,4 @@ db:
database: "postgres"
providers:
- "Wallee"
+ - "Simulation"
diff --git a/c2ec/config.go b/c2ec/config.go
@@ -13,10 +13,12 @@ type C2ECConfig struct {
}
type C2ECServerConfig struct {
+ IsProd bool `yaml:"prod"`
Host string `yaml:"host"`
Port int `yaml:"port"`
UseUnixDomainSocket bool `yaml:"unix-domain-socket"`
UnixSocketPath string `yaml:"unix-socket-path"`
+ StrictAttestors bool `yaml:"fail-on-missing-attestors"`
}
type C2ECDatabseConfig struct {
diff --git a/c2ec/db.go b/c2ec/db.go
@@ -4,20 +4,20 @@ import (
"context"
)
-const PROVIDER_TABLE_NAME = "provider"
+const PROVIDER_TABLE_NAME = "c2ec.provider"
const PROVIDER_FIELD_NAME_ID = "terminal_id"
const PROVIDER_FIELD_NAME_NAME = "name"
const PROVIDER_FIELD_NAME_BACKEND_URL = "backend_base_url"
const PROVIDER_FIELD_NAME_BACKEND_CREDENTIALS = "backend_credentials"
-const TERMINAL_TABLE_NAME = "terminal"
+const TERMINAL_TABLE_NAME = "c2ec.terminal"
const TERMINAL_FIELD_NAME_ID = "terminal_id"
const TERMINAL_FIELD_NAME_ACCESS_TOKEN = "access_token"
const TERMINAL_FIELD_NAME_ACTIVE = "active"
const TERMINAL_FIELD_NAME_DESCRIPTION = "description"
const TERMINAL_FIELD_NAME_PROVIDER_ID = "provider_id"
-const WITHDRAWAL_TABLE_NAME = "withdrawal"
+const WITHDRAWAL_TABLE_NAME = "c2ec.withdrawal"
const WITHDRAWAL_FIELD_NAME_ID = "withdrawal_id"
const WITHDRAWAL_FIELD_NAME_WOPID = "wopid"
const WITHDRAWAL_FIELD_NAME_RESPUBKEY = "reserve_pub_key"
@@ -110,8 +110,8 @@ type C2ECDatabase interface {
completionProof []byte,
) error
- // Get a provider entry by its identifier
- GetTerminalProviderById(id int) (*Provider, error)
+ // Get a provider entry by its name
+ GetTerminalProviderByName(name string) (*Provider, error)
// Get a terminal entry by its identifier
GetTerminalById(id int) (*Terminal, error)
diff --git a/c2ec/db/0000-c2ec_test.sql b/c2ec/db/0000-c2ec_test.sql
@@ -0,0 +1,17 @@
+BEGIN;
+
+SET search_path TO c2ec;
+
+DROP TABLE IF EXISTS p_id;
+
+INSERT INTO provider (name, backend_base_url, backend_credentials)
+ VALUES ('Simulation', 'will be simulated', 'no creds');
+
+SELECT provider_id INTO p_id FROM provider WHERE name = 'Simulation';
+
+INSERT INTO terminal (access_token, description, provider_id)
+ VALUES ('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa', 'this is a simulated terminal', (SELECT * FROM p_id));
+
+COMMIT;
+
+SELECT * FROM provider;
diff --git a/c2ec/db/0000-c2ec_test_rollback.sql b/c2ec/db/0000-c2ec_test_rollback.sql
@@ -0,0 +1,17 @@
+BEGIN;
+
+SET search_path TO c2ec;
+
+DROP TABLE IF EXISTS p_r_id;
+DROP TABLE IF EXISTS t_r_id;
+
+SELECT provider_id INTO p_r_id FROM provider WHERE name = 'Simulation';
+SELECT terminal_id INTO t_r_id FROM terminal WHERE provider_id = (SELECT * FROM p_r_id);
+
+DELETE FROM withdrawal WHERE terminal_id = (SELECT * FROM t_r_id);
+DELETE FROM terminal WHERE provider_id = (SELECT * FROM p_r_id);
+DELETE FROM provider WHERE provider_id = (SELECT * FROM p_r_id);
+
+COMMIT;
+
+SELECT * FROM provider;
+\ No newline at end of file
diff --git a/c2ec/http-util.go b/c2ec/http-util.go
@@ -40,8 +40,8 @@ func WriteProblem(res http.ResponseWriter, status int, problem *RFC9457Problem)
return err
}
- res.Write(problm)
res.WriteHeader(status)
+ res.Write(problm)
return nil
}
@@ -73,6 +73,19 @@ func AcceptOptionalParamOrWriteResponse[T any](
return nil, false
}
+ if ptr == nil {
+ err := WriteProblem(res, HTTP_INTERNAL_SERVER_ERROR, &RFC9457Problem{
+ TypeUri: TALER_URI_PROBLEM_PREFIX + "/C2EC_INVALID_REQUEST_QUERY_PARAMETER",
+ Title: "invalid request query parameter",
+ Detail: "the withdrawal status request parameter '" + name + "' resulted in a nil pointer)",
+ Instance: req.RequestURI,
+ })
+ if err != nil {
+ res.WriteHeader(HTTP_INTERNAL_SERVER_ERROR)
+ }
+ return nil, false
+ }
+
obj := *ptr
assertedObj, ok := any(obj).(T)
if !ok {
diff --git a/c2ec/main.go b/c2ec/main.go
@@ -1,6 +1,8 @@
package main
import (
+ "context"
+ "errors"
"fmt"
"net"
http "net/http"
@@ -26,9 +28,10 @@ var DB C2ECDatabase
// The startup follows these steps:
// 1. load configuration or panic
// 2. setup database or panic
-// 3. setup routes for the bank-integration-api
-// 4. setup routes for the wire-gateway-api
-// 5. listen for incoming requests (as specified in config)
+// 3. setup attestors
+// 4. setup routes for the bank-integration-api
+// 5. setup routes for the wire-gateway-api
+// 6. listen for incoming requests (as specified in config)
func main() {
cfgPath := DEFAULT_C2EC_CONFIG_PATH
@@ -40,11 +43,16 @@ func main() {
panic("unable to load config: " + err.Error())
}
- DB, err = setupDatabase(cfg.Database)
+ DB, err = setupDatabase(&cfg.Database)
if err != nil {
panic("unable initialize datatbase: " + err.Error())
}
+ err = setupAttestors(cfg)
+ if err != nil {
+ panic("unable initialize attestors: " + err.Error())
+ }
+
router := http.NewServeMux()
setupBankIntegrationRoutes(router)
@@ -83,9 +91,59 @@ func main() {
}
}
-func setupDatabase(cfg C2ECDatabseConfig) (C2ECDatabase, error) {
+func setupDatabase(cfg *C2ECDatabseConfig) (C2ECDatabase, error) {
- return NewC2ECPostgres(&cfg)
+ return NewC2ECPostgres(cfg)
+}
+
+func setupAttestors(cfg *C2ECConfig) error {
+
+ if DB == nil {
+ return errors.New("setup database first")
+ }
+
+ for _, providerName := range cfg.Providers {
+
+ p, err := DB.GetTerminalProviderByName(providerName)
+ if err != nil {
+ return err
+ }
+
+ if p == nil {
+ if cfg.Server.IsProd || cfg.Server.StrictAttestors {
+ panic("no provider entry for " + providerName)
+ } else {
+ fmt.Println("non-strict attestor initialization. skipping", providerName)
+ continue
+ }
+ }
+
+ if !cfg.Server.IsProd {
+ // Prevent simulation provider to be loaded in productive environments.
+ if p.Name == "Simulation" {
+ attestor := new(SimulationAttestor)
+ errs, err := RunAttestor(context.Background(), attestor, p, &cfg.Database)
+ if err != nil {
+ return err
+ }
+ go EndlessChannelResultPrinter("Simulation Attestor Error:", errs)
+ }
+ }
+
+ if p.Name == "Wallee" {
+ attestor := new(WalleeAttestor)
+ errs, err := RunAttestor(context.Background(), attestor, p, &cfg.Database)
+ if err != nil {
+ return err
+ } else {
+ go EndlessChannelResultPrinter("Wallee Attestor Error:", errs)
+ }
+ }
+
+ // For new added provider, add the respective if-clause
+ }
+
+ return nil
}
func setupBankIntegrationRoutes(router *http.ServeMux) {
@@ -143,3 +201,10 @@ func setupWireGatewayRoutes(router *http.ServeMux) {
adminAddIncoming,
)
}
+
+func EndlessChannelResultPrinter[T any](prefix string, c chan T) {
+
+ out := <-c
+ fmt.Println(prefix, out)
+ EndlessChannelResultPrinter(prefix, c)
+}
diff --git a/c2ec/postgres.go b/c2ec/postgres.go
@@ -43,8 +43,8 @@ const PS_FINALISE_PAYMENT = "UPDATE " + WITHDRAWAL_TABLE_NAME + " SET (" +
const PS_GET_WITHDRAWAL_BY_WOPID = "SELECT * FROM " + WITHDRAWAL_TABLE_NAME +
" WHERE " + WITHDRAWAL_FIELD_NAME_WOPID + "=$1"
-const PS_GET_PROVIDER_BY_ID = "SELECT * FROM " + PROVIDER_TABLE_NAME +
- " WHERE " + PROVIDER_FIELD_NAME_ID + "=$1"
+const PS_GET_PROVIDER_BY_NAME = "SELECT * FROM " + PROVIDER_TABLE_NAME +
+ " WHERE " + PROVIDER_FIELD_NAME_NAME + "=$1"
const PS_GET_TERMINAL_BY_ID = "SELECT * FROM " + TERMINAL_TABLE_NAME +
" WHERE " + TERMINAL_FIELD_NAME_ID + "=$1"
@@ -134,6 +134,9 @@ func (db *C2ECPostgres) GetWithdrawalByWopid(wopid string) (*Withdrawal, error)
return nil, err
}
+ if len(withdrawals) < 1 {
+ return nil, nil
+ }
return withdrawals[0], nil
}
}
@@ -206,12 +209,12 @@ func (db *C2ECPostgres) FinaliseWithdrawal(
return nil
}
-func (db *C2ECPostgres) GetTerminalProviderById(id int) (*Provider, error) {
+func (db *C2ECPostgres) GetTerminalProviderByName(name string) (*Provider, error) {
if row, err := db.pool.Query(
db.ctx,
- PS_GET_PROVIDER_BY_ID,
- id,
+ PS_GET_PROVIDER_BY_NAME,
+ name,
); err != nil {
if row != nil {
row.Close()
@@ -226,6 +229,10 @@ func (db *C2ECPostgres) GetTerminalProviderById(id int) (*Provider, error) {
return nil, err
}
+ if len(provider) < 1 {
+ return nil, nil
+ }
+
return provider[0], nil
}
}
diff --git a/c2ec/simulation-attestor.go b/c2ec/simulation-attestor.go
@@ -0,0 +1,125 @@
+package main
+
+import (
+ "context"
+ "errors"
+ "strconv"
+ "strings"
+
+ "github.com/jackc/pgx/v5"
+ "github.com/jackc/pgx/v5/pgconn"
+ "github.com/jackc/pgxlisten"
+)
+
+type SimulationAttestor struct {
+ listener *pgxlisten.Listener
+ provider *Provider
+ providerClient ProviderClient[WalleeClient]
+}
+
+func (wa *SimulationAttestor) Setup(p *Provider, cfg *C2ECDatabseConfig) (chan *pgconn.Notification, error) {
+
+ connectionString := PostgresConnectionString(cfg)
+
+ dbCfg, err := pgx.ParseConfig(connectionString)
+ if err != nil {
+ panic(err.Error())
+ }
+
+ wa.provider = p
+
+ wa.providerClient = new(SimulationClient)
+ err = wa.providerClient.SetupClient(wa.provider)
+ if err != nil {
+ panic(err.Error())
+ }
+
+ notificationChannel := make(chan *pgconn.Notification, PAYMENT_NOTIFICATION_CHANNEL_BUFFER_SIZE)
+ wa.listener = newChannelListener(dbCfg, PS_PAYMENT_NOTIFICATION_CHANNEL, notificationChannel)
+ return notificationChannel, nil
+}
+
+func (wa *SimulationAttestor) Listen(
+ ctx context.Context,
+ notificationChannel chan *pgconn.Notification,
+ errs chan error,
+) error {
+
+ if wa.listener == nil {
+ return errors.New("attestor needs to be setup first")
+ }
+
+ go func() {
+ err := wa.listener.Listen(ctx)
+ if err != nil {
+ errs <- err
+ }
+ close(notificationChannel)
+ close(errs)
+ }()
+
+ // Listen is started async. We can therefore block here and must
+ // not run the retrieval logic in own goroutine
+ for {
+ select {
+ case notification := <-notificationChannel:
+ // the dispatching can be done asynchronously
+ go wa.dispatch(notification, errs)
+ case <-ctx.Done():
+ close(notificationChannel)
+ close(errs)
+ }
+ }
+}
+
+func (wa *SimulationAttestor) Attest(withdrawalId int, providerTransactionId string, errs chan error) {
+
+ transaction, err := wa.providerClient.GetTransaction(providerTransactionId)
+ if err != nil {
+ // TODO : do we abort the withdrawal here??
+ errs <- err
+ }
+
+ if transaction.AllowWithdrawal() {
+
+ err = DB.FinaliseWithdrawal(withdrawalId, CONFIRMED, transaction.Bytes())
+ if err != nil {
+ // TODO : do we abort the withdrawal here??
+ errs <- err
+ }
+ } else {
+ // TODO : this might be too early ?! What if the payment was not yet
+ // processed by the Wallee backend? Needs testing.
+ err = DB.FinaliseWithdrawal(withdrawalId, ABORTED, transaction.Bytes())
+ if err != nil {
+ // TODO : do we abort the withdrawal here??
+ errs <- err
+ }
+ }
+}
+
+func (wa *SimulationAttestor) dispatch(notification *pgconn.Notification, errs chan error) {
+
+ // The payload is formatted like: "{PROVIDER_NAME}|{WITHDRAWAL_ID}|{PROVIDER_TRANSACTION_ID}"
+ // the validation is strict. This means, that the dispatcher emits an error
+ // and returns, if a property is malformed.
+ payload := strings.Split(notification.Payload, "|")
+ if len(payload) != 3 {
+ errs <- errors.New("malformed notification payload: " + notification.Payload)
+ return
+ }
+
+ provider := payload[0]
+ if provider != "Simulation" {
+ // the Simulation attestor can only handle wallee transactions
+ return
+ }
+ withdrawalRowId, err := strconv.Atoi(payload[1])
+ if err != nil {
+ errs <- errors.New("malformed withdrawal_id: " + err.Error())
+ return
+ }
+ providerTransactionId := payload[2]
+
+ wa.Attest(withdrawalRowId, providerTransactionId, errs)
+}
diff --git a/c2ec/simulation-client.go b/c2ec/simulation-client.go
@@ -0,0 +1,43 @@
+package main
+
+import (
+ "fmt"
+)
+
+type SimulationTransaction struct {
+ ProviderTransaction
+
+ allow bool
+}
+
+type SimulationClient struct {
+ ProviderClient[SimulationTransaction]
+
+ // toggle this to simulate failed transactions.
+ AllowNextWithdrawal bool
+}
+
+func (st *SimulationTransaction) AllowWithdrawal() bool {
+
+ return st.allow
+}
+
+func (*SimulationClient) SetupClient(p *Provider) error {
+
+ fmt.Println("setting up simulation client. probably not what you want in production")
+ return nil
+}
+
+func (sc *SimulationClient) GetTransaction(transactionId string) (ProviderTransaction, error) {
+
+ fmt.Println("getting transaction from simulation provider")
+ st := new(SimulationTransaction)
+ st.allow = sc.AllowNextWithdrawal
+ return st, nil
+}
+
+func (*SimulationClient) Refund(transactionId string) error {
+
+ fmt.Println("refund triggered for simulation provider with transaction id: ", transactionId)
+ return nil
+}
diff --git a/c2ec/wallee-attestor.go b/c2ec/wallee-attestor.go
@@ -49,21 +49,6 @@ func (wa *WalleeAttestor) Listen(
return errors.New("attestor needs to be setup first")
}
- // we must listen for notifications async
- go func() {
- for {
- select {
- case notification := <-notificationChannel:
- // the dispatching can be done asynchronously
- go wa.dispatch(notification, errs)
- case <-ctx.Done():
- close(notificationChannel)
- close(errs)
- return
- }
- }
- }()
-
go func() {
err := wa.listener.Listen(ctx)
if err != nil {
@@ -73,7 +58,18 @@ func (wa *WalleeAttestor) Listen(
close(errs)
}()
- return nil
+ // Listen is started async. We can therefore block here and must
+ // not run the retrieval logic in own goroutine
+ for {
+ select {
+ case notification := <-notificationChannel:
+ // the dispatching can be done asynchronously
+ go wa.dispatch(notification, errs)
+ case <-ctx.Done():
+ close(notificationChannel)
+ close(errs)
+ }
+ }
}
func (wa *WalleeAttestor) Attest(withdrawalId int, providerTransactionId string, errs chan error) {
diff --git a/c2ec/wire-gateway.go b/c2ec/wire-gateway.go
@@ -103,6 +103,7 @@ func historyIncoming(res http.ResponseWriter, req *http.Request) {
// This method is currently dead and implemented for API conformance
func historyOutgoing(res http.ResponseWriter, req *http.Request) {
+ // not implemented, because not used
res.WriteHeader(HTTP_BAD_REQUEST)
}
@@ -125,5 +126,6 @@ type AddIncomingResponse struct {
// This method is currently dead and implemented for API conformance
func adminAddIncoming(res http.ResponseWriter, req *http.Request) {
+ // not implemented, because not used
res.WriteHeader(HTTP_BAD_REQUEST)
}