summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoel-Haeberli <haebu@rubigen.ch>2024-04-22 21:52:28 +0200
committerJoel-Haeberli <haebu@rubigen.ch>2024-04-22 21:52:28 +0200
commit0bc574725b901e50910acb11ba21180d4c6fcd2f (patch)
tree49b0b876c4e167b05772df77debb68de487ce0fb
parent39f0572004f1cc8d9ee999eb47630e2e8a7099ca (diff)
downloadcashless2ecash-0bc574725b901e50910acb11ba21180d4c6fcd2f.tar.gz
cashless2ecash-0bc574725b901e50910acb11ba21180d4c6fcd2f.tar.bz2
cashless2ecash-0bc574725b901e50910acb11ba21180d4c6fcd2f.zip
fix: queries and allow ini config format
-rw-r--r--bruno/c2ec/(LOCAL-WIRE) Transaction History Incoming.bru2
-rw-r--r--c2ec/attestor.go58
-rw-r--r--c2ec/bank-integration.go39
-rw-r--r--c2ec/c2ec-config.conf58
-rw-r--r--c2ec/c2ec-config.yaml1
-rw-r--r--c2ec/config.go168
-rw-r--r--c2ec/db.go52
-rw-r--r--c2ec/db/0001-c2ec_schema.sql23
-rw-r--r--c2ec/db/proc-c2ec_transfer_listener.sql41
-rw-r--r--c2ec/db/procedures.sql41
-rw-r--r--c2ec/encoding.go5
-rw-r--r--c2ec/go.mod1
-rw-r--r--c2ec/go.sum2
-rw-r--r--c2ec/http-util.go88
-rw-r--r--c2ec/http-util_test.go7
-rw-r--r--c2ec/listener.go47
-rw-r--r--c2ec/main.go12
-rw-r--r--c2ec/model.go91
-rw-r--r--c2ec/postgres.go231
-rw-r--r--c2ec/provider.go (renamed from c2ec/provider-client.go)1
-rw-r--r--c2ec/retrier.go55
-rw-r--r--c2ec/simulation-client.go5
-rw-r--r--c2ec/transfer.go122
-rw-r--r--c2ec/utils.go31
-rw-r--r--c2ec/wallee-client.go272
-rw-r--r--c2ec/wallee-models.go197
-rw-r--r--c2ec/wire-gateway.go76
-rw-r--r--docs/content/implementation/c2ec.tex5
-rw-r--r--docs/content/introduction/introduction.tex6
-rw-r--r--docs/thesis.pdfbin1671172 -> 1671263 bytes
-rwxr-xr-xsimulation/c2ec-simulationbin7570759 -> 7570759 bytes
31 files changed, 1081 insertions, 656 deletions
diff --git a/bruno/c2ec/(LOCAL-WIRE) Transaction History Incoming.bru b/bruno/c2ec/(LOCAL-WIRE) Transaction History Incoming.bru
index 4c63876..f847dcb 100644
--- a/bruno/c2ec/(LOCAL-WIRE) Transaction History Incoming.bru
+++ b/bruno/c2ec/(LOCAL-WIRE) Transaction History Incoming.bru
@@ -5,7 +5,7 @@ meta {
}
get {
- url: http://localhost:8081/wire/history/incoming
+ url: http://localhost:8082/wire/history/incoming
body: none
auth: none
}
diff --git a/c2ec/attestor.go b/c2ec/attestor.go
index f268a0a..fb2bda2 100644
--- a/c2ec/attestor.go
+++ b/c2ec/attestor.go
@@ -18,58 +18,16 @@ func RunAttestor(
errs chan error,
) {
- for _, p := range CONFIG.Providers {
- if PROVIDER_CLIENTS[p.Name] == nil {
- err := errors.New("no provider client initialized for provider " + p.Name)
- LogError("attestor", err)
- errs <- err
- }
- }
-
- notifications := make(chan *Notification, PAYMENT_NOTIFICATION_CHANNEL_BUFFER_SIZE)
- go listenCallback(ctx, notifications, errs)
-}
-
-func listenCallback(
- ctx context.Context,
- notifications chan *Notification,
- errs chan error,
-) {
-
- listener, err := NewListener(PS_PAYMENT_NOTIFICATION_CHANNEL, notifications)
- if err != nil {
- LogError("attestor", err)
- errs <- errors.New("attestor needs to be setup first")
- }
-
- go func() {
- LogInfo("attestor", "attestor starts listening for payment notifications at the db")
- err := listener.Listen(ctx)
- if err != nil {
- LogError("attestor-listener", err)
- errs <- err
- }
- close(notifications)
- close(errs)
- }()
-
- // Listen is started async. We can therefore block here and must
- // not run the retrieval logic in own goroutine
- for {
- select {
- case notification := <-notifications:
- // the dispatching and further attestation can be done asynchronously
- // thus not blocking further incoming notifications.
- LogInfo("attestor", "received notification upon payment. channel="+notification.Channel+", payload="+notification.Payload)
- go dispatch(notification, errs)
- case <-ctx.Done():
- errs <- ctx.Err()
- return
- }
- }
+ go RunListener(
+ ctx,
+ PS_PAYMENT_NOTIFICATION_CHANNEL,
+ attestationCallback,
+ make(chan *Notification, PAYMENT_NOTIFICATION_CHANNEL_BUFFER_SIZE),
+ errs,
+ )
}
-func dispatch(notification *Notification, errs chan error) {
+func attestationCallback(notification *Notification, errs chan error) {
LogInfo("attestor", fmt.Sprintf("retrieved information on channel=%s with payload=%s", notification.Channel, notification.Payload))
diff --git a/c2ec/bank-integration.go b/c2ec/bank-integration.go
index d4d72f0..35710a6 100644
--- a/c2ec/bank-integration.go
+++ b/c2ec/bank-integration.go
@@ -189,10 +189,30 @@ func handleWithdrawalStatus(res http.ResponseWriter, req *http.Request) {
)
defer cancelFunc()
- statusChannel := make(chan WithdrawalOperationStatus)
- errChan := make(chan error)
+ notifications := make(chan *Notification)
+ channel := "w_" + base64.StdEncoding.EncodeToString(wpd)
- go DB.ListenForWithdrawalStatusChange(timeoutCtx, WithdrawalIdentifier(base64.StdEncoding.EncodeToString(wpd)), statusChannel, errChan)
+ listenFunc, err := DB.NewListener(
+ channel,
+ notifications,
+ )
+
+ if err != nil {
+ err := WriteProblem(res, HTTP_NO_CONTENT, &RFC9457Problem{
+ TypeUri: TALER_URI_PROBLEM_PREFIX + "/C2EC_LISTEN_FAILURE",
+ Title: "Failed setting up listener",
+ Detail: fmt.Sprintf("unable to start long polling due to %s", err.Error()),
+ Instance: req.RequestURI,
+ })
+ if err != nil {
+ res.WriteHeader(HTTP_INTERNAL_SERVER_ERROR)
+ }
+ return
+ }
+
+ go listenFunc(timeoutCtx)
+
+ // go DB.ListenForWithdrawalStatusChange(timeoutCtx, WithdrawalIdentifier(base64.StdEncoding.EncodeToString(wpd)), statusChannel, errChan)
for {
select {
case <-timeoutCtx.Done():
@@ -206,18 +226,7 @@ func handleWithdrawalStatus(res http.ResponseWriter, req *http.Request) {
res.WriteHeader(HTTP_INTERNAL_SERVER_ERROR)
}
return
- case err := <-errChan:
- err = WriteProblem(res, HTTP_INTERNAL_SERVER_ERROR, &RFC9457Problem{
- TypeUri: TALER_URI_PROBLEM_PREFIX + "/C2EC_INTERNAL_SERVER_ERROR",
- Title: "internal server error",
- Detail: err.Error(),
- Instance: req.RequestURI,
- })
- if err != nil {
- res.WriteHeader(HTTP_INTERNAL_SERVER_ERROR)
- }
- return
- case <-statusChannel:
+ case <-notifications:
writeWithdrawalOrError(wpd, res, req.RequestURI)
return
}
diff --git a/c2ec/c2ec-config.conf b/c2ec/c2ec-config.conf
new file mode 100644
index 0000000..3d0f94b
--- /dev/null
+++ b/c2ec/c2ec-config.conf
@@ -0,0 +1,58 @@
+[c2ec]
+
+# Will force production specific configuration
+# for example the simulation terminal cannot be
+# used in production
+PROD = false
+
+# tcp or unix
+SERVE = tcp
+
+# only effective when SERVE = tcp
+HOST = localhost
+
+# only effective when SERVE = tcp
+PORT = 8082
+
+# only effective when SERVE = unix
+UNIXPATH = c2ec.sock
+
+# only effective when SERVE = unix
+UNIXPATH_MODE = 660
+
+# how shall the application behave if
+# an attestor is not configured?
+# forced when PROD = true
+FAIL_ON_MISSING_ATTESTORS = false
+
+# The account where the exchange receives payments
+# of the providers. Must be the same, in the providers
+# backend.
+EXCHANGE_ACCOUNT = payto://iban/CH50030202099498
+
+# How many retries shall be triggered, when the attestation
+# of a transaction fails
+MAX_RETRIES = 3
+
+# How long shall the attestations retry be delayed in milliseconds.
+RETRY_DELAY_MS = 1000
+
+[wire-gateway]
+
+USERNAME = wire
+
+PASSWORD = secret
+
+[database]
+
+CONFIG = postgres://local:local@localhost:5432/postgres
+
+[provider-wallee]
+
+NAME = Wallee
+CREDENTIALS_PASSWORD = secret
+
+[provider-simulation]
+
+NAME = Simulation
+CREDENTIALS_PASSWORD = secret
diff --git a/c2ec/c2ec-config.yaml b/c2ec/c2ec-config.yaml
index 7698a60..7796384 100644
--- a/c2ec/c2ec-config.yaml
+++ b/c2ec/c2ec-config.yaml
@@ -4,6 +4,7 @@ c2ec:
port: 8082
unix-domain-socket: false
unix-socket-path: "c2ec.sock"
+ unix-path-mode: 660
fail-on-missing-attestors: false # forced if prod=true
credit-account: "payto://iban/CH50030202099498" # this account must be specified at the providers backends as well
max-retries: 3
diff --git a/c2ec/config.go b/c2ec/config.go
index 25617ff..b6ec403 100644
--- a/c2ec/config.go
+++ b/c2ec/config.go
@@ -3,7 +3,9 @@ package main
import (
"errors"
"os"
+ "strings"
+ "gopkg.in/ini.v1"
"gopkg.in/yaml.v3"
)
@@ -19,6 +21,7 @@ type C2ECServerConfig struct {
Port int `yaml:"port"`
UseUnixDomainSocket bool `yaml:"unix-domain-socket"`
UnixSocketPath string `yaml:"unix-socket-path"`
+ UnixPathMode int `yaml:"unix-path-mode"`
StrictAttestors bool `yaml:"fail-on-missing-attestors"`
CreditAccount string `yaml:"credit-account"`
MaxRetries int32 `yaml:"max-retries"`
@@ -63,8 +66,16 @@ func Parse(path string) (*C2ECConfig, error) {
return nil, err
}
- cfg := new(C2ECConfig)
- err = yaml.Unmarshal(content, cfg)
+ if strings.HasSuffix(path, ".yml") || strings.HasSuffix(path, ".yaml") {
+ cfg := new(C2ECConfig)
+ err = yaml.Unmarshal(content, cfg)
+ if err != nil {
+ return nil, err
+ }
+ return cfg, nil
+ }
+
+ cfg, err := ParseIni(content)
if err != nil {
return nil, err
}
@@ -82,3 +93,156 @@ func ConfigForProvider(name string) (*C2ECProviderConfig, error) {
}
return nil, errors.New("no such provider")
}
+
+func ParseIni(content []byte) (*C2ECConfig, error) {
+
+ ini, err := ini.Load(content)
+ if err != nil {
+ return nil, err
+ }
+
+ cfg := new(C2ECConfig)
+ for _, s := range ini.Sections() {
+
+ if s.Name() == "c2ec" {
+
+ value, err := s.GetKey("PROD")
+ if err != nil {
+ return nil, err
+ }
+
+ cfg.Server.IsProd, err = value.Bool()
+ if err != nil {
+ return nil, err
+ }
+
+ value, err = s.GetKey("SERVE")
+ if err != nil {
+ return nil, err
+ }
+
+ str := value.String()
+ cfg.Server.UseUnixDomainSocket = str == "unix"
+
+ value, err = s.GetKey("HOST")
+ if err != nil {
+ return nil, err
+ }
+
+ cfg.Server.Host = value.String()
+
+ value, err = s.GetKey("PORT")
+ if err != nil {
+ return nil, err
+ }
+
+ cfg.Server.Port, err = value.Int()
+ if err != nil {
+ return nil, err
+ }
+
+ value, err = s.GetKey("UNIXPATH")
+ if err != nil {
+ return nil, err
+ }
+
+ cfg.Server.UnixSocketPath = value.String()
+
+ value, err = s.GetKey("UNIXPATH_MODE")
+ if err != nil {
+ return nil, err
+ }
+
+ cfg.Server.UnixSocketPath = value.String()
+
+ value, err = s.GetKey("FAIL_ON_MISSING_ATTESTORS")
+ if err != nil {
+ return nil, err
+ }
+
+ cfg.Server.StrictAttestors, err = value.Bool()
+ if err != nil {
+ return nil, err
+ }
+
+ value, err = s.GetKey("EXCHANGE_ACCOUNT")
+ if err != nil {
+ return nil, err
+ }
+
+ cfg.Server.CreditAccount = value.String()
+
+ value, err = s.GetKey("MAX_RETRIES")
+ if err != nil {
+ return nil, err
+ }
+
+ num, err := value.Int()
+ if err != nil {
+ return nil, err
+ }
+ cfg.Server.MaxRetries = int32(num)
+
+ value, err = s.GetKey("RETRY_DELAY_MS")
+ if err != nil {
+ return nil, err
+ }
+
+ cfg.Server.RetryDelayMs, err = value.Int()
+ if err != nil {
+ return nil, err
+ }
+
+ }
+
+ if s.Name() == "wire-gateway" {
+
+ value, err := s.GetKey("USERNAME")
+ if err != nil {
+ return nil, err
+ }
+ cfg.Server.CreditAccount = value.String()
+
+ value, err = s.GetKey("PASSWORD")
+ if err != nil {
+ return nil, err
+ }
+ cfg.Server.CreditAccount = value.String()
+ }
+
+ if s.Name() == "database" {
+
+ value, err := s.GetKey("CONFIG")
+ if err != nil {
+ return nil, err
+ }
+
+ connstr := value.String()
+
+ err = os.Setenv("PGHOST", connstr)
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ if strings.HasPrefix(s.Name(), "provider-") {
+
+ provider := C2ECProviderConfig{}
+
+ value, err := s.GetKey("NAME")
+ if err != nil {
+ return nil, err
+ }
+ provider.Name = value.String()
+
+ value, err = s.GetKey("CREDENTIALS_PASSWORD")
+ if err != nil {
+ return nil, err
+ }
+ provider.CredentialsPassword = value.String()
+
+ cfg.Providers = append(cfg.Providers, provider)
+ }
+ }
+ return cfg, nil
+}
diff --git a/c2ec/db.go b/c2ec/db.go
index 6607415..b48d88f 100644
--- a/c2ec/db.go
+++ b/c2ec/db.go
@@ -39,6 +39,9 @@ const TRANSFER_FIELD_NAME_AMOUNT = "amount"
const TRANSFER_FIELD_NAME_EXCHANGE_BASE_URL = "exchange_base_url"
const TRANSFER_FIELD_NAME_WTID = "wtid"
const TRANSFER_FIELD_NAME_CREDIT_ACCOUNT = "credit_account"
+const TRANSFER_FIELD_NAME_TS = "transfer_ts"
+const TRANSFER_FIELD_NAME_STATUS = "transfer_status"
+const TRANSFER_FIELD_NAME_RETRIES = "retries"
type Provider struct {
ProviderId int64 `db:"provider_id"`
@@ -79,12 +82,14 @@ type TalerAmountCurrency struct {
type Transfer struct {
RowId int `db:"row_id"`
- RequestUid HashCode `db:"request_uid"`
+ RequestUid []byte `db:"request_uid"`
Amount *TalerAmountCurrency `db:"amount"`
ExchangeBaseUrl string `db:"exchange_base_url"`
Wtid string `db:"wtid"`
CreditAccount string `db:"credit_account"`
- TransactionTs int64 `db:"transaction_ts"`
+ TransferTs int64 `db:"transfer_ts"`
+ Status int16 `db:"transfer_status"`
+ Retries int16 `db:"retries"`
}
type Notification struct {
@@ -169,17 +174,26 @@ type C2ECDatabase interface {
GetTerminalById(id int) (*Terminal, error)
// Returns the transfer for the given hashcode.
- GetTransferById(requestUid HashCode) (*Transfer, error)
+ GetTransferById(requestUid []byte) (*Transfer, error)
// Inserts a new transfer into the database.
AddTransfer(
- requestUid HashCode,
+ requestUid []byte,
amount *Amount,
exchangeBaseUrl string,
wtid string,
credit_account string,
) error
+ // Updates the transfer, if retries is changed, the transfer will be
+ // triggered again.
+ UpdateTransfer(
+ requestUid []byte,
+ timestamp int64,
+ status int16,
+ retries int16,
+ ) error
+
// The wire gateway allows the exchange to retrieve transactions
// starting at a certain starting point up until a certain delta
// if the delta is negative, previous transactions relative to the
@@ -187,24 +201,14 @@ type C2ECDatabase interface {
// id shall be used as starting point.
GetTransfers(start int, delta int) ([]*Transfer, error)
- // This will listen for notifications on the
- // channel withdrawal notifications are sent.
- // Results will be written to the out channel.
- // Errors will be propagated through the errs
- // channel. Supply a context with timeout if
- // you want to use time limitations.
- ListenForWithdrawalStatusChange(
- ctx context.Context,
- wopid WithdrawalIdentifier,
- out chan WithdrawalOperationStatus,
- errs chan error,
- )
-
- // A listener can listen for the specified channel.
- // It will send received notifications through the channel
- // supplied. The specific implementation must convert the
- // database specific message to the generic Notification
- // type in order to decouple the database implementation
- // from the rest of the logic.
- Listen(ctx context.Context, channel string) (chan *Notification, chan error, error)
+ // A listener can listen for notifications ont the specified
+ // channel. Returns a listen function, which must be called
+ // by the caller to start listening on the channel. The returned
+ // listen function will return an error if it fails, and takes
+ // a context as argument which allows the underneath implementation
+ // to control the execution context of the listener.
+ NewListener(
+ channel string,
+ out chan *Notification,
+ ) (func(context.Context) error, error)
}
diff --git a/c2ec/db/0001-c2ec_schema.sql b/c2ec/db/0001-c2ec_schema.sql
index 80171c1..7fbfa60 100644
--- a/c2ec/db/0001-c2ec_schema.sql
+++ b/c2ec/db/0001-c2ec_schema.sql
@@ -124,11 +124,13 @@ COMMENT ON INDEX wopid_index
CREATE TABLE IF NOT EXISTS transfer (
request_uid BYTEA UNIQUE PRIMARY KEY,
row_id INT8 GENERATED BY DEFAULT AS IDENTITY,
- amount taler_amount_currency,
- exchange_base_url TEXT,
- wtid TEXT,
- credit_account TEXT,
- transaction_ts INT8,
+ amount taler_amount_currency NOT NULL,
+ exchange_base_url TEXT NOT NULL,
+ wtid TEXT NOT NULL,
+ credit_account TEXT NOT NULL,
+ transfer_ts INT8 NOT NULL,
+ transfer_status INT2 NOT NULL DEFAULT 1,
+ retries INT2 NOT NULL DEFAULT 0
);
COMMENT ON TABLE transfer
IS 'Table storing transfers which are sent by the exchange.';
@@ -144,5 +146,14 @@ COMMENT ON COLUMN transfer.wtid
IS 'The id of the transaction';
COMMENT ON COLUMN transfer.credit_account
IS 'The payto address of the transfer target';
+COMMENT ON COLUMN transfer.transfer_ts
+ IS 'Timestamp when the transfer was last processesd';
+COMMENT ON COLUMN transfer.transfer_status
+ IS 'Non-zero when the transfer failed at the last retry.
+ Zero if transfer succeeded. Negative, when max amount of
+ retries was exceeded. Because the transfer was not yet triggered
+ when it is added, the status is set to 1 by default.';
+COMMENT ON COLUMN transfer.retries
+ IS 'Number of retries';
-COMMIT;
+COMMIT; \ No newline at end of file
diff --git a/c2ec/db/proc-c2ec_transfer_listener.sql b/c2ec/db/proc-c2ec_transfer_listener.sql
new file mode 100644
index 0000000..ded172d
--- /dev/null
+++ b/c2ec/db/proc-c2ec_transfer_listener.sql
@@ -0,0 +1,41 @@
+
+BEGIN;
+
+SELECT _v.register_patch('proc-c2ec-transfer-listener', ARRAY['0001-c2ec-schema'], NULL);
+
+SET search_path TO c2ec;
+
+-- to create a function, the user needs USAGE privilege on arguments and return types
+CREATE OR REPLACE FUNCTION emit_transfer_notification()
+RETURNS TRIGGER AS $$
+BEGIN
+ PERFORM pg_notify('transfer', encode(NEW.request_uid::BYTEA, 'base64'));
+ RETURN NULL;
+END;
+$$ LANGUAGE plpgsql;
+COMMENT ON FUNCTION emit_transfer_notification
+ IS 'The function emits the request_uid of a transfer which shall trigger a transfer
+ by the receiver of the notification.';
+
+-- for creating a trigger the user must have TRIGGER pivilege on the table.
+-- to execute the trigger, the user needs EXECUTE privilege on the trigger function.
+CREATE OR REPLACE TRIGGER c2ec_on_transfer_failed
+ AFTER INSERT
+ ON transfer
+ FOR EACH ROW
+ EXECUTE FUNCTION emit_transfer_notification();
+COMMENT ON TRIGGER c2ec_on_transfer_failed ON transfer
+ IS 'When a new transfer is set, the transfer shall executed. This trigger aims to
+ trigger this operation at its listeners.';
+
+CREATE OR REPLACE TRIGGER c2ec_on_transfer_failed
+ AFTER UPDATE OF retries
+ ON transfer
+ FOR EACH ROW
+ WHEN (NEW.retries > 0)
+ EXECUTE FUNCTION emit_transfer_notification();
+COMMENT ON TRIGGER c2ec_on_transfer_failed ON transfer
+ IS 'When retries is (re)set this will trigger the notification of the listening
+ receivers, which will further process the transfer';
+
+COMMIT; \ No newline at end of file
diff --git a/c2ec/db/procedures.sql b/c2ec/db/procedures.sql
index 8bac56b..50ff87f 100644
--- a/c2ec/db/procedures.sql
+++ b/c2ec/db/procedures.sql
@@ -119,3 +119,44 @@ COMMENT ON TRIGGER c2ec_on_payment_notify ON withdrawal
trigger the emit to the respective channel.';
COMMIT;
+
+BEGIN;
+
+SELECT _v.register_patch('proc-c2ec-transfer-listener', ARRAY['0001-c2ec-schema'], NULL);
+
+SET search_path TO c2ec;
+
+-- to create a function, the user needs USAGE privilege on arguments and return types
+CREATE OR REPLACE FUNCTION emit_transfer_notification()
+RETURNS TRIGGER AS $$
+BEGIN
+ PERFORM pg_notify('transfer', encode(NEW.request_uid::BYTEA, 'base64'));
+ RETURN NULL;
+END;
+$$ LANGUAGE plpgsql;
+COMMENT ON FUNCTION emit_transfer_notification
+ IS 'The function emits the request_uid of a transfer which shall trigger a transfer
+ by the receiver of the notification.';
+
+-- for creating a trigger the user must have TRIGGER pivilege on the table.
+-- to execute the trigger, the user needs EXECUTE privilege on the trigger function.
+CREATE OR REPLACE TRIGGER c2ec_on_transfer_failed
+ AFTER INSERT
+ ON transfer
+ FOR EACH ROW
+ EXECUTE FUNCTION emit_transfer_notification();
+COMMENT ON TRIGGER c2ec_on_transfer_failed ON transfer
+ IS 'When a new transfer is set, the transfer shall executed. This trigger aims to
+ trigger this operation at its listeners.';
+
+CREATE OR REPLACE TRIGGER c2ec_on_transfer_failed
+ AFTER UPDATE OF retries
+ ON transfer
+ FOR EACH ROW
+ WHEN (NEW.retries > 0)
+ EXECUTE FUNCTION emit_transfer_notification();
+COMMENT ON TRIGGER c2ec_on_transfer_failed ON transfer
+ IS 'When retries is (re)set this will trigger the notification of the listening
+ receivers, which will further process the transfer';
+
+COMMIT; \ No newline at end of file
diff --git a/c2ec/encoding.go b/c2ec/encoding.go
index 970f062..e00a8f4 100644
--- a/c2ec/encoding.go
+++ b/c2ec/encoding.go
@@ -60,3 +60,8 @@ func ParseEddsaPubKey(key EddsaPublicKey) ([]byte, error) {
return talerBase32Decode(string(key))
}
+
+func FormatEddsaPubKey(key []byte) EddsaPublicKey {
+
+ return EddsaPublicKey(talerBase32Encode(key))
+}
diff --git a/c2ec/go.mod b/c2ec/go.mod
index 0875a33..e47d704 100644
--- a/c2ec/go.mod
+++ b/c2ec/go.mod
@@ -19,4 +19,5 @@ require (
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.19.0 // indirect
golang.org/x/text v0.14.0 // indirect
+ gopkg.in/ini.v1 v1.67.0 // indirect
)
diff --git a/c2ec/go.sum b/c2ec/go.sum
index f7003f1..7fdb47b 100644
--- a/c2ec/go.sum
+++ b/c2ec/go.sum
@@ -42,6 +42,8 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
+gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA=
+gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/c2ec/http-util.go b/c2ec/http-util.go
index e4022c9..f753050 100644
--- a/c2ec/http-util.go
+++ b/c2ec/http-util.go
@@ -3,7 +3,6 @@ package main
import (
"bytes"
"errors"
- "fmt"
"io"
"net/http"
"strings"
@@ -185,86 +184,41 @@ func HttpGet[T any](
}
// execute a POST request and parse response or retrieve error
-func HttpPost2[T any, R any](
- req string,
- body *T,
- requestCodec Codec[T],
- responseCodec Codec[R],
-) (*R, int, error) {
-
- return HttpPost(
- req,
- nil,
- nil,
- body,
- requestCodec,
- responseCodec,
- )
-}
-
-// execute a POST request and parse response or retrieve error
// path- and query-parameters can be set to add query and path parameters
func HttpPost[T any, R any](
- req string,
- pathParams map[string]string,
- queryParams map[string]string,
+ url string,
+ headers map[string]string,
body *T,
- requestCodec Codec[T],
- responseCodec Codec[R],
+ reqCodec Codec[T],
+ resCodec Codec[R],
) (*R, int, error) {
- url := FormatUrl(req, pathParams, queryParams)
- fmt.Println("POST:", url)
-
- var res *http.Response
- if body == nil {
- if requestCodec == nil {
- res, err := http.Post(
- url,
- "",
- nil,
- )
-
- if err != nil {
- return nil, -1, err
- }
-
- return nil, res.StatusCode, nil
- } else {
- return nil, -1, errors.New("invalid arguments - body was not present but codec was defined")
- }
- } else {
- if requestCodec == nil {
- return nil, -1, errors.New("invalid arguments - body was present but no codec was defined")
- } else {
-
- encodedBody, err := requestCodec.Encode(body)
- if err != nil {
- return nil, -1, err
- }
-
- res, err = http.Post(
- url,
- requestCodec.HttpApplicationContentHeader(),
- encodedBody,
- )
+ bodyEncoded, err := reqCodec.EncodeToBytes(body)
+ if err != nil {
+ return nil, -1, err
+ }
- if err != nil {
- return nil, -1, err
- }
- }
+ req, err := http.NewRequest(HTTP_POST, url, bytes.NewBuffer(bodyEncoded))
+ if err != nil {
+ return nil, -1, err
}
- if responseCodec == nil {
- return nil, res.StatusCode, nil
+ for k, v := range headers {
+ req.Header.Add(k, v)
}
+ req.Header.Add("Accept", resCodec.HttpApplicationContentHeader())
- resBody, err := responseCodec.Decode(res.Body)
+ res, err := http.DefaultClient.Do(req)
if err != nil {
return nil, -1, err
}
- return resBody, res.StatusCode, err
+ if resCodec == nil {
+ return nil, res.StatusCode, err
+ } else {
+ resBody, err := resCodec.Decode(res.Body)
+ return resBody, res.StatusCode, err
+ }
}
// builds request URL containing the path and query
diff --git a/c2ec/http-util_test.go b/c2ec/http-util_test.go
index e8d8d1c..a6b6424 100644
--- a/c2ec/http-util_test.go
+++ b/c2ec/http-util_test.go
@@ -42,12 +42,17 @@ func TestGET(t *testing.T) {
func TestPOST(t *testing.T) {
- res, status, err := HttpPost(
+ url := FormatUrl(
URL_POST,
map[string]string{
"id": "1",
},
map[string]string{},
+ )
+
+ res, status, err := HttpPost(
+ url,
+ map[string]string{},
&TestStruct{
UserId: 1,
Id: 1,
diff --git a/c2ec/listener.go b/c2ec/listener.go
new file mode 100644
index 0000000..9e53602
--- /dev/null
+++ b/c2ec/listener.go
@@ -0,0 +1,47 @@
+package main
+
+import (
+ "context"
+ "errors"
+)
+
+func RunListener(
+ ctx context.Context,
+ channel string,
+ callback func(*Notification, chan error),
+ notifications chan *Notification,
+ errs chan error,
+) {
+
+ listenFunc, err := DB.NewListener(channel, notifications)
+ if err != nil {
+ LogError("listener", err)
+ errs <- errors.New("failed setting up listener")
+ return
+ }
+
+ go func() {
+ LogInfo("listener", "listener starts listening for notifications at the db for channel="+channel)
+ err := listenFunc(ctx)
+ if err != nil {
+ LogError("listener", err)
+ errs <- err
+ }
+ close(notifications)
+ close(errs)
+ }()
+
+ // Listen is started async. We can therefore block here and must
+ // not run the retrieval logic in own goroutine
+ for {
+ select {
+ case notification := <-notifications:
+ // the dispatching and further processing can be done asynchronously
+ // thus not blocking further incoming notifications.
+ go callback(notification, errs)
+ case <-ctx.Done():
+ errs <- ctx.Err()
+ return
+ }
+ }
+}
diff --git a/c2ec/main.go b/c2ec/main.go
index f7be5d8..dcd551a 100644
--- a/c2ec/main.go
+++ b/c2ec/main.go
@@ -18,7 +18,7 @@ const POST = "POST "
const BANK_INTEGRATION_API = "/c2ec"
const WIRE_GATEWAY_API = "/wire"
-const DEFAULT_C2EC_CONFIG_PATH = "c2ec-config.yaml"
+const DEFAULT_C2EC_CONFIG_PATH = "c2ec-config.yaml" // "c2ec-config.conf"
var CONFIG C2ECConfig
@@ -66,7 +66,7 @@ func main() {
err = setupProviderClients(&CONFIG)
if err != nil {
- panic("unable initialize attestors: " + err.Error())
+ panic("unable initialize provider clients: " + err.Error())
}
LogInfo("main", "provider clients are setup")
@@ -211,6 +211,14 @@ func setupProviderClients(cfg *C2ECConfig) error {
// For new added provider, add the respective if-clause
}
+ for _, p := range CONFIG.Providers {
+ if PROVIDER_CLIENTS[p.Name] == nil {
+ err := errors.New("no provider client initialized for provider " + p.Name)
+ LogError("retrier", err)
+ return err
+ }
+ }
+
return nil
}
diff --git a/c2ec/model.go b/c2ec/model.go
deleted file mode 100644
index 9057241..0000000
--- a/c2ec/model.go
+++ /dev/null
@@ -1,91 +0,0 @@
-package main
-
-import (
- "fmt"
-)
-
-// https://docs.taler.net/core/api-common.html#hash-codes
-type WithdrawalIdentifier string
-
-// https://docs.taler.net/core/api-common.html#cryptographic-primitives
-type EddsaPublicKey string
-
-// https://docs.taler.net/core/api-common.html#hash-codes
-type HashCode string
-
-// https://docs.taler.net/core/api-common.html#hash-codes
-type ShortHashCode string
-
-// https://docs.taler.net/core/api-common.html#timestamps
-type Timestamp struct {
- Ts int `json:"t_s"`
-}
-
-// https://docs.taler.net/core/api-common.html#wadid
-type WadId [6]uint32
-
-// according to https://docs.taler.net/core/api-bank-integration.html#tsref-type-BankWithdrawalOperationStatus
-type WithdrawalOperationStatus string
-
-const (
- PENDING WithdrawalOperationStatus = "pending"
- SELECTED WithdrawalOperationStatus = "selected"
- ABORTED WithdrawalOperationStatus = "aborted"
- CONFIRMED WithdrawalOperationStatus = "confirmed"
-)
-
-func ToWithdrawalOpStatus(s string) (WithdrawalOperationStatus, error) {
- switch s {
- case string(PENDING):
- return PENDING, nil
- case string(SELECTED):
- return SELECTED, nil
- case string(ABORTED):
- return ABORTED, nil
- case string(CONFIRMED):
- return CONFIRMED, nil
- default:
- return "", fmt.Errorf("unknown withdrawal operation status '%s'", s)
- }
-}
-
-type ErrorDetail struct {
-
- // Numeric error code unique to the condition.
- // The other arguments are specific to the error value reported here.
- Code int `json:"code"`
-
- // Human-readable description of the error, i.e. "missing parameter", "commitment violation", ...
- // Should give a human-readable hint about the error's nature. Optional, may change without notice!
- Hint string `json:"hint"`
-
- // Optional detail about the specific input value that failed. May change without notice!
- Detail string `json:"detail"`
-
- // Name of the parameter that was bogus (if applicable).
- Parameter string `json:"parameter"`
-
- // Path to the argument that was bogus (if applicable).
- Path string `json:"path"`
-
- // Offset of the argument that was bogus (if applicable).
- Offset string `json:"offset"`
-
- // Index of the argument that was bogus (if applicable).
- Index string `json:"index"`
-
- // Name of the object that was bogus (if applicable).
- Object string `json:"object"`
-
- // Name of the currency that was problematic (if applicable).
- Currency string `json:"currency"`
-
- // Expected type (if applicable).
- TypeExpected string `json:"type_expected"`
-
- // Type that was provided instead (if applicable).
- TypeActual string `json:"type_actual"`
-
- // Extra information that doesn't fit into the above (if applicable).
- Extra []byte `json:"extra"`
-}
diff --git a/c2ec/postgres.go b/c2ec/postgres.go
index ce9bc9a..e9a085a 100644
--- a/c2ec/postgres.go
+++ b/c2ec/postgres.go
@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"math"
+ "os"
"strconv"
"time"
@@ -14,9 +15,6 @@ import (
"github.com/jackc/pgxlisten"
)
-const PS_ASC_SELECTOR = "ASC"
-const PS_DESC_SELECTOR = "DESC"
-
const PS_INSERT_WITHDRAWAL = "INSERT INTO " + WITHDRAWAL_TABLE_NAME + " (" +
WITHDRAWAL_FIELD_NAME_WOPID + "," +
WITHDRAWAL_FIELD_NAME_RESPUBKEY + "," +
@@ -45,13 +43,20 @@ const PS_SET_LAST_RETRY = "UPDATE " + WITHDRAWAL_TABLE_NAME +
" WHERE " + WITHDRAWAL_FIELD_NAME_ID + "=$2"
const PS_SET_RETRY_COUNTER = "UPDATE " + WITHDRAWAL_TABLE_NAME +
- " SET " + WITHDRAWAL_FIELD_NAME_RETRY_COUNTER + "=($1)" +
+ " SET " + WITHDRAWAL_FIELD_NAME_RETRY_COUNTER + "=$1" +
" WHERE " + WITHDRAWAL_FIELD_NAME_ID + "=$2"
-const PS_CONFIRMED_TRANSACTIONS = "SELECT * FROM " + WITHDRAWAL_TABLE_NAME +
+const PS_CONFIRMED_TRANSACTIONS_ASC = "SELECT * FROM " + WITHDRAWAL_TABLE_NAME +
+ " WHERE " + WITHDRAWAL_FIELD_NAME_STATUS + "='" + string(CONFIRMED) + "'" +
+ " ORDER BY " + WITHDRAWAL_FIELD_NAME_ID + " ASC" +
+ " LIMIT $1" +
+ " OFFSET $2"
+
+const PS_CONFIRMED_TRANSACTIONS_DESC = "SELECT * FROM " + WITHDRAWAL_TABLE_NAME +
+ " WHERE " + WITHDRAWAL_FIELD_NAME_STATUS + "='" + string(CONFIRMED) + "'" +
+ " ORDER BY " + WITHDRAWAL_FIELD_NAME_ID + " DESC" +
" LIMIT $1" +
- " OFFSET $2" +
- " ORDER BY " + WITHDRAWAL_FIELD_NAME_ID + " $3"
+ " OFFSET $2"
const PS_GET_WITHDRAWAL_BY_ID = "SELECT * FROM " + WITHDRAWAL_TABLE_NAME +
" WHERE " + WITHDRAWAL_FIELD_NAME_ID + "=$1"
@@ -82,7 +87,22 @@ const PS_GET_TRANSFER_BY_ID = "SELECT * FROM " + TRANSFER_TABLE_NAME +
const PS_ADD_TRANSFER = "INSERT INTO " + TRANSFER_TABLE_NAME +
" (" + TRANSFER_FIELD_NAME_ID + ", " + TRANSFER_FIELD_NAME_AMOUNT + ", " +
TRANSFER_FIELD_NAME_EXCHANGE_BASE_URL + ", " + TRANSFER_FIELD_NAME_WTID + ", " +
- TRANSFER_FIELD_NAME_CREDIT_ACCOUNT + ")" + " VALUES ($1, $2, $3, $4, $5, $6)"
+ TRANSFER_FIELD_NAME_CREDIT_ACCOUNT + ") VALUES ($1, $2, $3, $4, $5)"
+
+const PS_UPDATE_TRANSFER = "UPDATE " + TRANSFER_TABLE_NAME + " SET (" +
+ TRANSFER_FIELD_NAME_TS + ", " + TRANSFER_FIELD_NAME_STATUS + ", " +
+ TRANSFER_FIELD_NAME_RETRIES + ") VALUES ($1,$2,$3) WHERE " +
+ TRANSFER_FIELD_NAME_ID + "=$4"
+
+const PS_GET_TRANSFERS_ASC = "SELECT * FROM " + TRANSFER_TABLE_NAME +
+ " ORDER BY " + TRANSFER_FIELD_NAME_ROW_ID + " ASC" +
+ " LIMIT $1" +
+ " OFFSET $2"
+
+const PS_GET_TRANSFERS_DESC = "SELECT * FROM " + TRANSFER_TABLE_NAME +
+ " ORDER BY " + TRANSFER_FIELD_NAME_ROW_ID + " DESC" +
+ " LIMIT $1" +
+ " OFFSET $2"
// Postgres implementation of the C2ECDatabase
type C2ECPostgres struct {
@@ -109,6 +129,11 @@ func NewC2ECPostgres(cfg *C2ECDatabseConfig) (*C2ECPostgres, error) {
db := new(C2ECPostgres)
connectionString := PostgresConnectionString(cfg)
+ pgHost := os.Getenv("PGHOST")
+ if pgHost != "" {
+ LogInfo("postgres", "pghost was set")
+ connectionString = pgHost
+ }
dbConnCfg, err := pgxpool.ParseConfig(connectionString)
if err != nil {
@@ -179,15 +204,8 @@ func (db *C2ECPostgres) GetWithdrawalById(withdrawalId int) (*Withdrawal, error)
} else {
defer row.Close()
-
- withdrawal, err := pgx.CollectExactlyOneRow(row, pgx.RowToAddrOfStructByName[Withdrawal])
- if err != nil {
- LogError("postgres", err)
- return nil, err
- }
-
LogInfo("postgres", "query="+PS_GET_WITHDRAWAL_BY_ID)
- return withdrawal, nil
+ return pgx.CollectExactlyOneRow(row, pgx.RowToAddrOfStructByName[Withdrawal])
}
}
@@ -206,15 +224,8 @@ func (db *C2ECPostgres) GetWithdrawalByWopid(wopid []byte) (*Withdrawal, error)
} else {
defer row.Close()
-
- withdrawal, err := pgx.CollectExactlyOneRow(row, pgx.RowToAddrOfStructByName[Withdrawal])
- if err != nil {
- LogError("postgres", err)
- return nil, err
- }
-
LogInfo("postgres", "query="+PS_GET_WITHDRAWAL_BY_WOPID)
- return withdrawal, nil
+ return pgx.CollectExactlyOneRow(row, pgx.RowToAddrOfStructByName[Withdrawal])
}
}
@@ -232,18 +243,8 @@ func (db *C2ECPostgres) GetWithdrawalByProviderTransactionId(tid string) (*Withd
} else {
defer row.Close()
-
- withdrawals, err := pgx.CollectRows(row, pgx.RowToAddrOfStructByName[Withdrawal])
- if err != nil {
- LogError("postgres", err)
- return nil, err
- }
-
- if len(withdrawals) < 1 {
- return nil, nil
- }
LogInfo("postgres", "query="+PS_GET_WITHDRAWAL_BY_PTID)
- return withdrawals[0], nil
+ return pgx.CollectExactlyOneRow(row, pgx.RowToAddrOfStructByName[Withdrawal])
}
}
@@ -298,7 +299,7 @@ func (db *C2ECPostgres) GetAttestableWithdrawals() ([]*Withdrawal, error) {
}
LogInfo("postgres", "query="+PS_GET_UNCONFIRMED_WITHDRAWALS)
- return withdrawals, nil
+ return removeNulls(withdrawals), nil
}
}
@@ -363,9 +364,9 @@ func (db *C2ECPostgres) SetRetryCounter(withdrawalId int, retryCounter int) erro
// wire gateway api.
func (db *C2ECPostgres) GetConfirmedWithdrawals(start int, delta int) ([]*Withdrawal, error) {
- sort := PS_ASC_SELECTOR
+ query := PS_CONFIRMED_TRANSACTIONS_ASC
if delta < 0 {
- sort = PS_DESC_SELECTOR
+ query = PS_CONFIRMED_TRANSACTIONS_DESC
}
limit := math.Abs(float64(delta))
@@ -385,21 +386,20 @@ func (db *C2ECPostgres) GetConfirmedWithdrawals(start int, delta int) ([]*Withdr
// recent ids.
row, err = db.pool.Query(
db.ctx,
- PS_CONFIRMED_TRANSACTIONS,
+ query,
limit,
"MAX("+WITHDRAWAL_FIELD_NAME_ID+")",
- sort,
)
} else {
row, err = db.pool.Query(
db.ctx,
- PS_CONFIRMED_TRANSACTIONS,
+ query,
limit,
offset,
- sort,
)
}
+ LogInfo("postgres", "query="+query)
if err != nil {
LogError("postgres", err)
if row != nil {
@@ -416,8 +416,7 @@ func (db *C2ECPostgres) GetConfirmedWithdrawals(start int, delta int) ([]*Withdr
return nil, err
}
- LogInfo("postgres", "query="+PS_CONFIRMED_TRANSACTIONS)
- return withdrawals, nil
+ return removeNulls(withdrawals), nil
}
}
@@ -529,7 +528,7 @@ func (db *C2ECPostgres) GetTerminalById(id int) (*Terminal, error) {
}
}
-func (db *C2ECPostgres) GetTransferById(requestUid HashCode) (*Transfer, error) {
+func (db *C2ECPostgres) GetTransferById(requestUid []byte) (*Transfer, error) {
if row, err := db.pool.Query(
db.ctx,
@@ -558,7 +557,7 @@ func (db *C2ECPostgres) GetTransferById(requestUid HashCode) (*Transfer, error)
}
func (db *C2ECPostgres) AddTransfer(
- requestUid HashCode,
+ requestUid []byte,
amount *Amount,
exchangeBaseUrl string,
wtid string,
@@ -589,81 +588,100 @@ func (db *C2ECPostgres) AddTransfer(
return nil
}
-func (db *C2ECPostgres) ListenForWithdrawalStatusChange(
- ctx context.Context,
- wopid WithdrawalIdentifier,
- out chan WithdrawalOperationStatus,
- errs chan error,
-) {
-
- notifications := make(chan *Notification)
- errsInternal := make(chan error)
+func (db *C2ECPostgres) UpdateTransfer(
+ requestUid []byte,
+ timestamp int64,
+ status int16,
+ retries int16,
+) error {
- go func() {
+ _, err := db.pool.Exec(
+ db.ctx,
+ PS_UPDATE_TRANSFER,
+ timestamp,
+ status,
+ retries,
+ requestUid,
+ )
+ if err != nil {
+ LogError("postgres", err)
+ return err
+ }
+ LogInfo("postgres", "query="+PS_UPDATE_TRANSFER)
+ return nil
+}
- channel := "w_" + string(wopid)
- listener, err := NewListener(channel, notifications)
- if err != nil {
- errsInternal <- err
- return
- }
- LogInfo("postgres", fmt.Sprintf("listening for status change of wopid=%s", wopid))
+func (db *C2ECPostgres) GetTransfers(start int, delta int) ([]*Transfer, error) {
- if err := listener.Listen(ctx); err != nil {
- LogError("postgres", err)
- errs <- err
- }
- }()
-
- for {
- select {
- case e := <-errsInternal:
- LogError("postgres", e)
- errs <- e
- case <-ctx.Done():
- err := ctx.Err()
- msg := "context sent done signal while listening for status change"
- if err != nil {
- LogError("postgres", err)
- }
- LogWarn("postgres", msg)
- errs <- errors.New(msg)
- case n := <-notifications:
- LogInfo("postgres", fmt.Sprintf("received notification for channel %s: %s", n.Channel, n.Payload))
- out <- WithdrawalOperationStatus(n.Payload)
- }
+ query := PS_GET_TRANSFERS_ASC
+ if delta < 0 {
+ query = PS_GET_TRANSFERS_DESC
}
-}
-func Listen(ctx context.Context, channel string) (chan *Notification, chan error, error) {
+ limit := math.Abs(float64(delta))
+ offset := start
+ if delta < 0 {
+ offset = start - int(limit)
+ }
+ if offset < 0 {
+ offset = 0
+ }
- out := make(chan *Notification)
- errs := make(chan error)
+ var row pgx.Rows
+ var err error
+ if start < 0 {
+ // use MAX(id) instead of a concrete id, because start
+ // identifier was negative. Inidicates to read the most
+ // recent ids.
+ row, err = db.pool.Query(
+ db.ctx,
+ query,
+ limit,
+ "MAX("+TRANSFER_FIELD_NAME_ROW_ID+")",
+ )
+ } else {
+ row, err = db.pool.Query(
+ db.ctx,
+ query,
+ limit,
+ offset,
+ )
+ }
- listener, err := NewListener(channel, out)
+ LogInfo("postgres", "query="+query)
if err != nil {
- return nil, nil, err
- }
+ LogError("postgres", err)
+ if row != nil {
+ row.Close()
+ }
+ return nil, err
+ } else {
- go func() {
+ defer row.Close()
- err := listener.Listen(ctx)
+ transfers, err := pgx.CollectRows(row, pgx.RowToAddrOfStructByName[Transfer])
if err != nil {
- errs <- err
+ LogError("postgres", err)
+ return nil, err
}
- }()
- return out, errs, nil
+ return removeNulls(transfers), nil
+ }
}
// Sets up a a listener for the given channel.
// Notifications will be sent through the out channel.
-func NewListener(
+func (db *C2ECPostgres) NewListener(
cn string,
out chan *Notification,
-) (*pgxlisten.Listener, error) {
+) (func(context.Context) error, error) {
connectionString := PostgresConnectionString(&CONFIG.Database)
+ pgHost := os.Getenv("PGHOST")
+ if pgHost != "" {
+ LogInfo("postgres", "pghost was set")
+ connectionString = pgHost
+ }
cfg, err := pgx.ParseConfig(connectionString)
if err != nil {
@@ -687,5 +705,16 @@ func NewListener(
return nil
}))
- return listener, nil
+ return listener.Listen, nil
+}
+
+func removeNulls[T any](l []*T) []*T {
+
+ withoutNulls := make([]*T, 0)
+ for _, e := range l {
+ if e != nil {
+ withoutNulls = append(withoutNulls, e)
+ }
+ }
+ return withoutNulls
}
diff --git a/c2ec/provider-client.go b/c2ec/provider.go
index 023586d..33bee3b 100644
--- a/c2ec/provider-client.go
+++ b/c2ec/provider.go
@@ -10,4 +10,5 @@ type ProviderClient interface {
SetupClient(provider *Provider) error
GetTransaction(transactionId string) (ProviderTransaction, error)
Refund(transactionId string) error
+ FormatPayto(w *Withdrawal) string
}
diff --git a/c2ec/retrier.go b/c2ec/retrier.go
index d553708..a1ceee2 100644
--- a/c2ec/retrier.go
+++ b/c2ec/retrier.go
@@ -2,7 +2,6 @@ package main
import (
"context"
- "errors"
"strconv"
"time"
)
@@ -12,54 +11,16 @@ const PS_RETRY_CHANNEL = "retry"
func RunRetrier(ctx context.Context, errs chan error) {
- for _, p := range CONFIG.Providers {
- if PROVIDER_CLIENTS[p.Name] == nil {
- err := errors.New("no provider client initialized for provider " + p.Name)
- LogError("retrier", err)
- errs <- err
- }
- }
-
- notifications := make(chan *Notification, RETRY_CHANNEL_BUFFER_SIZE)
- go retryCallback(ctx, notifications, errs)
-}
-
-func retryCallback(ctx context.Context, notifications chan *Notification, errs chan error) {
-
- listener, err := NewListener(PS_RETRY_CHANNEL, notifications)
- if err != nil {
- LogError("retrier", err)
- errs <- errors.New("retrier needs to be setup first")
- }
-
- go func() {
- LogInfo("retrier", "retrier starts listening for retry notifications at the db")
- err := listener.Listen(ctx)
- if err != nil {
- LogError("retry-listener", err)
- errs <- err
- }
- close(notifications)
- close(errs)
- }()
-
- // Listen is started async. We can therefore block here and must
- // not run the retrieval logic in own goroutine
- for {
- select {
- case notification := <-notifications:
- // the dispatching and setup of the retry process can
- // be kicked off asynchronically, thus not blocking
- // further incoming notifications.
- go dispatchRetry(notification, errs)
- case <-ctx.Done():
- errs <- ctx.Err()
- return
- }
- }
+ go RunListener(
+ ctx,
+ PS_RETRY_CHANNEL,
+ retryCallback,
+ make(chan *Notification, RETRY_CHANNEL_BUFFER_SIZE),
+ errs,
+ )
}
-func dispatchRetry(n *Notification, errs chan error) {
+func retryCallback(n *Notification, errs chan error) {
withdrawalId, err := strconv.Atoi(n.Payload)
if err != nil {
diff --git a/c2ec/simulation-client.go b/c2ec/simulation-client.go
index a6708ec..b950167 100644
--- a/c2ec/simulation-client.go
+++ b/c2ec/simulation-client.go
@@ -37,6 +37,11 @@ func (st *SimulationTransaction) Bytes() []byte {
return bytes.NewBufferString("this is a simulated transaction and therefore has no content.").Bytes()
}
+func (sc *SimulationClient) FormatPayto(w *Withdrawal) string {
+
+ return fmt.Sprintf("payto://void/%s", *w.ProviderTransactionId)
+}
+
func (sc *SimulationClient) SetupClient(p *Provider) error {
fmt.Println("setting up simulation client. probably not what you want in production")
diff --git a/c2ec/transfer.go b/c2ec/transfer.go
new file mode 100644
index 0000000..915db67
--- /dev/null
+++ b/c2ec/transfer.go
@@ -0,0 +1,122 @@
+package main
+
+import (
+ "context"
+ "encoding/base64"
+ "errors"
+ "fmt"
+ "strconv"
+ "time"
+)
+
+const REFUND_CHANNEL_BUFFER_SIZE = 10
+const PS_REFUND_CHANNEL = "transfer"
+
+const TRANSFER_STATUS_SUCCESS = 0
+const TRANSFER_STATUS_RETRY = 1
+const TRANSFER_STATUS_FAILED = -1
+
+// Sets up and runs an attestor in the background. This must be called at startup.
+func RunRefunder(
+ ctx context.Context,
+ errs chan error,
+) {
+
+ go RunListener(
+ ctx,
+ PS_REFUND_CHANNEL,
+ transferCallback,
+ make(chan *Notification, REFUND_CHANNEL_BUFFER_SIZE),
+ errs,
+ )
+}
+
+func transferCallback(notification *Notification, errs chan error) {
+
+ LogInfo("refunder", fmt.Sprintf("retrieved information on channel=%s with payload=%s", notification.Channel, notification.Payload))
+
+ transferRequestUidBase64 := notification.Payload
+ if transferRequestUidBase64 == "" {
+ errs <- errors.New("the transfer to refund is not specified")
+ return
+ }
+
+ transferRequestUid, err := base64.StdEncoding.DecodeString(transferRequestUidBase64)
+ if err != nil {
+ errs <- errors.New("malformed transfer request uid: " + err.Error())
+ return
+ }
+
+ // Load Transfer from DB
+ transfer, err := DB.GetTransferById(transferRequestUid)
+ if err != nil {
+ LogError("refunder", err)
+ transferFailed(transfer, errs)
+ errs <- err
+ }
+
+ // Load Provider from DB (by payto uri)
+ paytoTargetType, tid, err := ParsePaytoWalleeTransaction(transfer.CreditAccount)
+ if err != nil {
+ errs <- errors.New("malformed transfer request uid: " + err.Error())
+ transferFailed(transfer, errs)
+ return
+ }
+
+ provider, err := DB.GetTerminalProviderByPaytoTargetType(paytoTargetType)
+ if err != nil {
+ LogError("refunder", err)
+ transferFailed(transfer, errs)
+ errs <- err
+ }
+
+ client := PROVIDER_CLIENTS[provider.Name]
+ if client == nil {
+ errs <- errors.New("no provider client registered for provider " + provider.Name)
+ }
+
+ err = client.Refund(strconv.Itoa(tid))
+ if err != nil {
+ LogError("refunder", err)
+ transferFailed(transfer, errs)
+ return
+ }
+
+ err = DB.UpdateTransfer(
+ transfer.RequestUid,
+ time.Now().Unix(),
+ TRANSFER_STATUS_SUCCESS, // success
+ transfer.Retries,
+ )
+ if err != nil {
+ errs <- err
+ }
+}
+
+func transferFailed(
+ transfer *Transfer,
+ errs chan error,
+) {
+
+ if transfer.Retries > 2 {
+ err := DB.UpdateTransfer(
+ transfer.RequestUid,
+ time.Now().Unix(),
+ TRANSFER_STATUS_FAILED, // transfer ultimatively failed.
+ transfer.Retries,
+ )
+ if err != nil {
+ errs <- err
+ }
+ } else {
+ err := DB.UpdateTransfer(
+ transfer.RequestUid,
+ time.Now().Unix(),
+ TRANSFER_STATUS_RETRY, // retry transfer.
+ transfer.Retries+1,
+ )
+ if err != nil {
+ errs <- err
+ }
+ }
+}
diff --git a/c2ec/utils.go b/c2ec/utils.go
new file mode 100644
index 0000000..f85569a
--- /dev/null
+++ b/c2ec/utils.go
@@ -0,0 +1,31 @@
+package main
+
+// https://docs.taler.net/core/api-common.html#hash-codes
+type WithdrawalIdentifier string
+
+// https://docs.taler.net/core/api-common.html#cryptographic-primitives
+type EddsaPublicKey string
+
+// https://docs.taler.net/core/api-common.html#hash-codes
+type HashCode string
+
+// https://docs.taler.net/core/api-common.html#hash-codes
+type ShortHashCode string
+
+// https://docs.taler.net/core/api-common.html#timestamps
+type Timestamp struct {
+ Ts int `json:"t_s"`
+}
+
+// https://docs.taler.net/core/api-common.html#wadid
+type WadId [6]uint32
+
+// according to https://docs.taler.net/core/api-bank-integration.html#tsref-type-BankWithdrawalOperationStatus
+type WithdrawalOperationStatus string
+
+const (
+ PENDING WithdrawalOperationStatus = "pending"
+ SELECTED WithdrawalOperationStatus = "selected"
+ ABORTED WithdrawalOperationStatus = "aborted"
+ CONFIRMED WithdrawalOperationStatus = "confirmed"
+)
diff --git a/c2ec/wallee-client.go b/c2ec/wallee-client.go
index 2f304d0..a7f9f14 100644
--- a/c2ec/wallee-client.go
+++ b/c2ec/wallee-client.go
@@ -52,6 +52,11 @@ func (wt *WalleeTransaction) AbortWithdrawal() bool {
strings.EqualFold(string(wt.State), string(StateDecline))
}
+func (wt *WalleeTransaction) FormatPayto() string {
+
+ return fmt.Sprintf("payto://wallee-transaction/%d", wt.ID)
+}
+
func (wt *WalleeTransaction) Bytes() []byte {
reader, err := NewJsonCodec[WalleeTransaction]().Encode(wt)
@@ -112,8 +117,44 @@ func (w *WalleeClient) GetTransaction(transactionId string) (ProviderTransaction
return t, nil
}
+func (sc *WalleeClient) FormatPayto(w *Withdrawal) string {
+
+ return fmt.Sprintf("payto://wallee-transaction/%s", *w.ProviderTransactionId)
+}
+
func (w *WalleeClient) Refund(transactionId string) error {
- return errors.New("not yet implemented")
+
+ call := WALLEE_CREATE_REFUND_API
+ queryParams := map[string]string{
+ WALLEE_API_SPACEID_PARAM_NAME: strconv.Itoa(w.credentials.SpaceId),
+ }
+ url := FormatUrl(call, map[string]string{}, queryParams)
+
+ hdrs, err := w.prepareWalleeHeaders(url, HTTP_GET)
+ if err != nil {
+ return err
+ }
+
+ // TODO generate refund object. needs Completion and Transaction IDs
+ refund := &WalleeRefund{
+ Amount: 10,
+ Completion: 10,
+ ExternalID: "",
+ MerchantReference: "",
+ Reductions: []WalleeLineItemReduction{},
+ Transaction: 10,
+ Type: "",
+ }
+
+ _, status, err := HttpPost[WalleeRefund, any](url, hdrs, refund, NewJsonCodec[WalleeRefund](), nil)
+ if err != nil {
+ LogError("wallee-client", err)
+ return err
+ }
+ if status != HTTP_OK {
+ return errors.New("no result")
+ }
+ return nil
}
func (w *WalleeClient) prepareWalleeHeaders(url string, method string) (map[string]string, error) {
@@ -218,232 +259,3 @@ func calculateWalleeAuthToken(
return base64.StdEncoding.EncodeToString(mac), nil
}
-
-type TransactionState string
-
-const (
- StateCreate TransactionState = "CREATE"
- StatePending TransactionState = "PENDING"
- StateConfirmed TransactionState = "CONFIRMED"
- StateProcessing TransactionState = "PROCESSING"
- StateFailed TransactionState = "FAILED"
- StateAuthorized TransactionState = "AUTHORIZED"
- StateCompleted TransactionState = "COMPLETED"
- StateFulfill TransactionState = "FULFILL"
- StateDecline TransactionState = "DECLINE"
- StateVoided TransactionState = "VOIDED"
-)
-
-type WalleeTransaction struct {
- ProviderTransaction
-
- // acceptHeader contains the header which indicates the language preferences of the buyer.
- AcceptHeader string `json:"acceptHeader"`
-
- // acceptLanguageHeader contains the header which indicates the language preferences of the buyer.
- AcceptLanguageHeader string `json:"acceptLanguageHeader"`
-
- // allowedPaymentMethodBrands is a collection of payment method brand IDs.
- AllowedPaymentMethodBrands []int64 `json:"allowedPaymentMethodBrands"`
-
- // allowedPaymentMethodConfigurations is a collection of payment method configuration IDs.
- AllowedPaymentMethodConfigurations []int64 `json:"allowedPaymentMethodConfigurations"`
-
- // authorizationAmount is the amount authorized for the transaction.
- AuthorizationAmount float64 `json:"authorizationAmount"`
-
- // authorizationEnvironment is the environment in which this transaction was successfully authorized.
- AuthorizationEnvironment string `json:"authorizationEnvironment"`
-
- // authorizationSalesChannel is the sales channel through which the transaction was placed.
- AuthorizationSalesChannel int64 `json:"authorizationSalesChannel"`
-
- // authorizationTimeoutOn is the time on which the transaction will be timed out when it is not at least authorized.
- AuthorizationTimeoutOn time.Time `json:"authorizationTimeoutOn"`
-
- // authorizedOn is the timestamp when the transaction was authorized.
- AuthorizedOn time.Time `json:"authorizedOn"`
-
- // autoConfirmationEnabled indicates whether auto confirmation is enabled for the transaction.
- AutoConfirmationEnabled bool `json:"autoConfirmationEnabled"`
-
- // billingAddress is the address associated with the transaction.
- BillingAddress string `json:"-"`
-
- // chargeRetryEnabled indicates whether charging retry is enabled for the transaction.
- ChargeRetryEnabled bool `json:"chargeRetryEnabled"`
-
- // completedAmount is the total amount which has been captured so far.
- CompletedAmount float64 `json:"completedAmount"`
-
- // completedOn is the timestamp when the transaction was completed.
- CompletedOn time.Time `json:"completedOn"`
-
- // completionBehavior controls when the transaction is completed.
- CompletionBehavior string `json:"completionBehavior"`
-
- // completionTimeoutOn is the timestamp when the transaction completion will time out.
- CompletionTimeoutOn time.Time `json:"completionTimeoutOn"`
-
- // confirmedBy is the user ID who confirmed the transaction.
- ConfirmedBy int64 `json:"confirmedBy"`
-
- // confirmedOn is the timestamp when the transaction was confirmed.
- ConfirmedOn time.Time `json:"confirmedOn"`
-
- // createdBy is the user ID who created the transaction.
- CreatedBy int64 `json:"createdBy"`
-
- // createdOn is the timestamp when the transaction was created.
- CreatedOn time.Time `json:"createdOn"`
-
- // currency is the currency code associated with the transaction.
- Currency string `json:"currency"`
-
- // customerEmailAddress is the email address of the customer.
- CustomerEmailAddress string `json:"customerEmailAddress"`
-
- // customerId is the ID of the customer associated with the transaction.
- CustomerID string `json:"customerId"`
-
- // customersPresence indicates what kind of authentication method was used during authorization.
- CustomersPresence string `json:"customersPresence"`
-
- // deliveryDecisionMadeOn is the timestamp when the decision has been made if a transaction should be delivered or not.
- DeliveryDecisionMadeOn time.Time `json:"deliveryDecisionMadeOn"`
-
- // deviceSessionIdentifier links the transaction with the session identifier provided in the URL of the device data JavaScript.
- DeviceSessionIdentifier string `json:"deviceSessionIdentifier"`
-
- // emailsDisabled indicates whether email sending is disabled for this particular transaction.
- EmailsDisabled bool `json:"emailsDisabled"`
-
- // endOfLife indicates the date from which on no operation can be carried out anymore.
- EndOfLife time.Time `json:"endOfLife"`
-
- // environment is the environment in which the transaction is processed.
- Environment string `json:"environment"`
-
- // environmentSelectionStrategy determines how the environment (test or production) for processing the transaction is selected.
- EnvironmentSelectionStrategy string `json:"environmentSelectionStrategy"`
-
- // failedOn is the timestamp when the transaction failed.
- FailedOn time.Time `json:"failedOn"`
-
- // failedUrl is the URL to which the user will be redirected when the transaction fails.
- FailedURL string `json:"failedUrl"`
-
- // failureReason describes why the transaction failed.
- FailureReason string `json:"failureReason"`
-
- // group is the transaction group associated with the transaction.
- Group string `json:"-"`
-
- // id is the unique identifier for the transaction.
- ID int64 `json:"id"`
-
- // internetProtocolAddress identifies the device of the buyer.
- InternetProtocolAddress string `json:"internetProtocolAddress"`
-
- // internetProtocolAddressCountry is the country associated with the Internet Protocol (IP) address.
- InternetProtocolAddressCountry string `json:"internetProtocolAddressCountry"`
-
- // invoiceMerchantReference is the merchant reference associated with the invoice.
- InvoiceMerchantReference string `json:"invoiceMerchantReference"`
-
- // javaEnabled indicates whether Java is enabled for the transaction.
- JavaEnabled bool `json:"javaEnabled"`
-
- // language is the language linked to the transaction.
- Language string `json:"language"`
-
- // lineItems is a collection of line items associated with the transaction.
- LineItems []string `json:"-"`
-
- // linkedSpaceId is the ID of the space this transaction belongs to.
- LinkedSpaceID int64 `json:"linkedSpaceId"`
-
- // merchantReference is the merchant reference associated with the transaction.
- MerchantReference string `json:"merchantReference"`
-
- // metaData allows storing additional information about the transaction.
- MetaData map[string]string `json:"metaData"`
-
- // parent is the parent transaction associated with this transaction.
- Parent int64 `json:"parent"`
-
- // paymentConnectorConfiguration is the connector configuration associated with the payment.
- PaymentConnectorConfiguration string `json:"-"`
-
- // plannedPurgeDate is the date when the transaction is planned to be permanently removed.
- PlannedPurgeDate time.Time `json:"plannedPurgeDate"`
-
- // processingOn is the timestamp when the transaction is being processed.
- ProcessingOn time.Time `json:"processingOn"`
-
- // refundedAmount is the total amount which has been refunded so far.
- RefundedAmount float64 `json:"refundedAmount"`
-
- // screenColorDepth is the color depth of the screen associated with the transaction.
- ScreenColorDepth string `json:"screenColorDepth"`
-
- // screenHeight is the height of the screen associated with the transaction.
- ScreenHeight string `json:"screenHeight"`
-
- // screenWidth is the width of the screen associated with the transaction.
- ScreenWidth string `json:"screenWidth"`
-
- // shippingAddress is the address associated with the shipping of the transaction.
- ShippingAddress string `json:"-"`
-
- // shippingMethod is the method used for shipping in the transaction.
- ShippingMethod string `json:"shippingMethod"`
-
- // spaceViewId is the ID of the space view associated with the transaction.
- SpaceViewID int64 `json:"spaceViewId"`
-
- // state is the current state of the transaction.
- State TransactionState `json:"state"`
-
- // successUrl is the URL to which the user will be redirected when the transaction succeeds.
- SuccessURL string `json:"successUrl"`
-
- // terminal is the terminal on which the payment was processed.
- Terminal string `json:"-"`
-
- // timeZone is the time zone in which the customer is located.
- TimeZone string `json:"timeZone"`
-
- // token is the token associated with the transaction.
- Token string `json:"-"`
-
- // tokenizationMode controls if and how the tokenization of payment information is applied to the transaction.
- TokenizationMode string `json:"tokenizationMode"`
-
- // totalAppliedFees is the sum of all fees that have been applied so far.
- TotalAppliedFees float64 `json:"totalAppliedFees"`
-
- // totalSettledAmount is the total amount which has been settled so far.
- TotalSettledAmount float64 `json:"totalSettledAmount"`
-
- // userAgentHeader provides the user agent of the buyer.
- UserAgentHeader string `json:"userAgentHeader"`
-
- // userFailureMessage describes why the transaction failed for the end user.
- UserFailureMessage string `json:"userFailureMessage"`
-
- // userInterfaceType defines through which user interface the transaction has been processed.
- UserInterfaceType string `json:"userInterfaceType"`
-
- // version is used for optimistic locking and incremented whenever the object is updated.
- Version int `json:"version"`
-
- // windowHeight is the height of the window associated with the transaction.
- WindowHeight string `json:"windowHeight"`
-
- // windowWidth is the width of the window associated with the transaction.
- WindowWidth string `json:"windowWidth"`
-
- // yearsToKeep is the number of years the transaction will be stored after it has been authorized.
- YearsToKeep int `json:"yearsToKeep"`
-}
diff --git a/c2ec/wallee-models.go b/c2ec/wallee-models.go
new file mode 100644
index 0000000..ebf1cf4
--- /dev/null
+++ b/c2ec/wallee-models.go
@@ -0,0 +1,197 @@
+package main
+
+import (
+ "time"
+)
+
+type WalleeTransactionCompletion struct {
+ Amount float64 `json:"amount"`
+ BaseLineItems []WalleeLineItem `json:"baseLineItems"`
+ CreatedBy int64 `json:"createdBy"`
+ CreatedOn time.Time `json:"createdOn"`
+ ExternalID string `json:"externalId"`
+ FailedOn time.Time `json:"failedOn"`
+ FailureReason string `json:"failureReason"`
+ ID int64 `json:"id"`
+ InvoiceMerchantRef string `json:"invoiceMerchantReference"`
+ Labels []WalleeLabel `json:"labels"`
+ Language string `json:"language"`
+ LastCompletion bool `json:"lastCompletion"`
+ LineItemVersion string `json:"lineItemVersion"`
+ LineItems []WalleeLineItem `json:"lineItems"`
+ LinkedSpaceID int64 `json:"linkedSpaceId"`
+ LinkedTransaction int64 `json:"linkedTransaction"`
+ Mode string `json:"mode"`
+ NextUpdateOn time.Time `json:"nextUpdateOn"`
+ PaymentInformation string `json:"paymentInformation"`
+ PlannedPurgeDate time.Time `json:"plannedPurgeDate"`
+ ProcessingOn time.Time `json:"processingOn"`
+ ProcessorReference string `json:"processorReference"`
+ RemainingLineItems []WalleeLineItem `json:"remainingLineItems"`
+ SpaceViewID int64 `json:"spaceViewId"`
+ State string `json:"state"`
+ StatementDescriptor string `json:"statementDescriptor"`
+ SucceededOn time.Time `json:"succeededOn"`
+ TaxAmount float64 `json:"taxAmount"`
+ TimeZone string `json:"timeZone"`
+ TimeoutOn time.Time `json:"timeoutOn"`
+ Version int `json:"version"`
+}
+
+type WalleeRefund struct {
+ Amount float64 `json:"amount"`
+ Completion int64 `json:"completion"` // ID of WalleeTransactionCompletion
+ ExternalID string `json:"externalId"` // Unique per transaction
+ MerchantReference string `json:"merchantReference"`
+ Reductions []WalleeLineItemReduction `json:"reductions"`
+ Transaction int64 `json:"transaction"` // ID of WalleeTransaction
+ Type string `json:"type"` // Refund Type
+}
+
+type WalleeLabel struct {
+ Content []byte `json:"content"`
+ ContentAsString string `json:"contentAsString"`
+ Descriptor WalleeLabelDescriptor `json:"descriptor"`
+ ID int64 `json:"id"`
+ Version int `json:"version"`
+}
+
+type WalleeLabelDescriptor struct {
+ Category string `json:"category"`
+ Description map[string]string `json:"description"`
+ Features []int64 `json:"features"`
+ Group int64 `json:"group"`
+ ID int64 `json:"id"`
+ Name map[string]string `json:"name"`
+ Type int64 `json:"type"`
+ Weight int `json:"weight"`
+}
+
+type WalleeLineItemReduction struct {
+ LineItemUniqueId string
+ QuantityReduction float64
+ UnitPriceReduction float64
+}
+
+type WalleeLineItemAttribute struct {
+ Label string
+ Value string
+}
+
+type WalleeTax struct {
+ Rate float64
+ Title string
+}
+
+type WalleeLineItem struct {
+ AggregatedTaxRate float64 `json:"aggregatedTaxRate"`
+ AmountExcludingTax float64 `json:"amountExcludingTax"`
+ AmountIncludingTax float64 `json:"amountIncludingTax"`
+ Attributes map[string]WalleeLineItemAttribute `json:"attributes"`
+ DiscountExcludingTax float64 `json:"discountExcludingTax"`
+ DiscountIncludingTax float64 `json:"discountIncludingTax"`
+ Name string `json:"name"`
+ Quantity float64 `json:"quantity"`
+ ShippingRequired bool `json:"shippingRequired"`
+ SKU string `json:"sku"`
+ TaxAmount float64 `json:"taxAmount"`
+ TaxAmountPerUnit float64 `json:"taxAmountPerUnit"`
+ Taxes []WalleeTax `json:"taxes"`
+ Type string `json:"type"`
+ UndiscountedAmountExcludingTax float64 `json:"undiscountedAmountExcludingTax"`
+ UndiscountedAmountIncludingTax float64 `json:"undiscountedAmountIncludingTax"`
+ UndiscountedUnitPriceExclTax float64 `json:"undiscountedUnitPriceExcludingTax"`
+ UndiscountedUnitPriceInclTax float64 `json:"undiscountedUnitPriceIncludingTax"`
+ UniqueID string `json:"uniqueId"`
+ UnitPriceExcludingTax float64 `json:"unitPriceExcludingTax"`
+ UnitPriceIncludingTax float64 `json:"unitPriceIncludingTax"`
+}
+
+type WalleeTransactionState string
+
+const (
+ StateCreate WalleeTransactionState = "CREATE"
+ StatePending WalleeTransactionState = "PENDING"
+ StateConfirmed WalleeTransactionState = "CONFIRMED"
+ StateProcessing WalleeTransactionState = "PROCESSING"
+ StateFailed WalleeTransactionState = "FAILED"
+ StateAuthorized WalleeTransactionState = "AUTHORIZED"
+ StateCompleted WalleeTransactionState = "COMPLETED"
+ StateFulfill WalleeTransactionState = "FULFILL"
+ StateDecline WalleeTransactionState = "DECLINE"
+ StateVoided WalleeTransactionState = "VOIDED"
+)
+
+type WalleeTransaction struct {
+ ProviderTransaction
+ AcceptHeader string `json:"acceptHeader"`
+ AcceptLanguageHeader string `json:"acceptLanguageHeader"`
+ AllowedPaymentMethodBrands []int64 `json:"allowedPaymentMethodBrands"`
+ AllowedPaymentMethodConfigurations []int64 `json:"allowedPaymentMethodConfigurations"`
+ AuthorizationAmount float64 `json:"authorizationAmount"`
+ AuthorizationEnvironment string `json:"authorizationEnvironment"`
+ AuthorizationSalesChannel int64 `json:"authorizationSalesChannel"`
+ AuthorizationTimeoutOn time.Time `json:"authorizationTimeoutOn"`
+ AuthorizedOn time.Time `json:"authorizedOn"`
+ AutoConfirmationEnabled bool `json:"autoConfirmationEnabled"`
+ BillingAddress string `json:"-"`
+ ChargeRetryEnabled bool `json:"chargeRetryEnabled"`
+ CompletedAmount float64 `json:"completedAmount"`
+ CompletedOn time.Time `json:"completedOn"`
+ CompletionBehavior string `json:"completionBehavior"`
+ CompletionTimeoutOn time.Time `json:"completionTimeoutOn"`
+ ConfirmedBy int64 `json:"confirmedBy"`
+ ConfirmedOn time.Time `json:"confirmedOn"`
+ CreatedBy int64 `json:"createdBy"`
+ CreatedOn time.Time `json:"createdOn"`
+ Currency string `json:"currency"`
+ CustomerEmailAddress string `json:"customerEmailAddress"`
+ CustomerID string `json:"customerId"`
+ CustomersPresence string `json:"customersPresence"`
+ DeliveryDecisionMadeOn time.Time `json:"deliveryDecisionMadeOn"`
+ DeviceSessionIdentifier string `json:"deviceSessionIdentifier"`
+ EmailsDisabled bool `json:"emailsDisabled"`
+ EndOfLife time.Time `json:"endOfLife"`
+ Environment string `json:"environment"`
+ EnvironmentSelectionStrategy string `json:"environmentSelectionStrategy"`
+ FailedOn time.Time `json:"failedOn"`
+ FailedURL string `json:"failedUrl"`
+ FailureReason string `json:"failureReason"`
+ Group string `json:"-"`
+ ID int64 `json:"id"`
+ InternetProtocolAddress string `json:"internetProtocolAddress"`
+ InternetProtocolAddressCountry string `json:"internetProtocolAddressCountry"`
+ InvoiceMerchantReference string `json:"invoiceMerchantReference"`
+ JavaEnabled bool `json:"javaEnabled"`
+ Language string `json:"language"`
+ LineItems []string `json:"-"`
+ LinkedSpaceID int64 `json:"linkedSpaceId"`
+ MerchantReference string `json:"merchantReference"`
+ MetaData map[string]string `json:"metaData"`
+ Parent int64 `json:"parent"`
+ PaymentConnectorConfiguration string `json:"-"`
+ PlannedPurgeDate time.Time `json:"plannedPurgeDate"`
+ ProcessingOn time.Time `json:"processingOn"`
+ RefundedAmount float64 `json:"refundedAmount"`
+ ScreenColorDepth string `json:"screenColorDepth"`
+ ScreenHeight string `json:"screenHeight"`
+ ScreenWidth string `json:"screenWidth"`
+ ShippingAddress string `json:"-"`
+ ShippingMethod string `json:"shippingMethod"`
+ SpaceViewID int64 `json:"spaceViewId"`
+ State WalleeTransactionState `json:"state"`
+ SuccessURL string `json:"successUrl"`
+ Terminal string `json:"-"`
+ TimeZone string `json:"timeZone"`
+ Token string `json:"-"`
+ TokenizationMode string `json:"tokenizationMode"`
+ TotalAppliedFees float64 `json:"totalAppliedFees"`
+ TotalSettledAmount float64 `json:"totalSettledAmount"`
+ UserAgentHeader string `json:"userAgentHeader"`
+ UserFailureMessage string `json:"userFailureMessage"`
+ UserInterfaceType string `json:"userInterfaceType"`
+ Version int `json:"version"`
+ WindowHeight string `json:"windowHeight"`
+ WindowWidth string `json:"windowWidth"`
+ YearsToKeep int `json:"yearsToKeep"`
+}
diff --git a/c2ec/wire-gateway.go b/c2ec/wire-gateway.go
index 424c69c..0e793af 100644
--- a/c2ec/wire-gateway.go
+++ b/c2ec/wire-gateway.go
@@ -2,6 +2,7 @@ package main
import (
"context"
+ "errors"
"log"
http "net/http"
"strconv"
@@ -73,6 +74,24 @@ type OutgoingBankTransaction struct {
}
func NewIncomingReserveTransaction(w *Withdrawal) *IncomingReserveTransaction {
+
+ if w == nil {
+ LogWarn("wire-gateway", "the withdrawal was nil")
+ return nil
+ }
+
+ provider, err := DB.GetProviderByTerminal(int(*w.TerminalId))
+ if err != nil {
+ LogError("wire-gateway", err)
+ return nil
+ }
+
+ client := PROVIDER_CLIENTS[provider.Name]
+ if client == nil {
+ LogError("wire-gateway", errors.New("no provider client with name="+provider.Name))
+ return nil
+ }
+
t := new(IncomingReserveTransaction)
t.Amount = Amount{
Value: uint64(w.Amount.Val),
@@ -82,8 +101,8 @@ func NewIncomingReserveTransaction(w *Withdrawal) *IncomingReserveTransaction {
t.Date = Timestamp{
Ts: int(w.RegistrationTs),
}
- t.DebitAccount = "" // TODO provider specific payto uri -> needs new interface operation
- t.ReservePub = EddsaPublicKey(w.ReservePubKey)
+ t.DebitAccount = client.FormatPayto(w)
+ t.ReservePub = FormatEddsaPubKey(w.ReservePubKey)
t.RowId = int(w.WithdrawalId)
t.Type = INCOMING_RESERVE_TRANSACTION_TYPE
return t
@@ -97,7 +116,7 @@ func NewOutgoingBankTransaction(tr *Transfer) *OutgoingBankTransaction {
Currency: tr.Amount.Curr,
}
t.Date = Timestamp{
- Ts: int(tr.TransactionTs),
+ Ts: int(tr.TransferTs),
}
t.CreditAccount = tr.CreditAccount
t.ExchangeBaseUrl = tr.ExchangeBaseUrl
@@ -170,7 +189,21 @@ func transfer(res http.ResponseWriter, req *http.Request) {
return
}
- t, err := DB.GetTransferById(transfer.RequestUid)
+ decodedRequestUid, err := talerBase32Decode(string(transfer.RequestUid))
+ if err != nil {
+ err := WriteProblem(res, HTTP_BAD_REQUEST, &RFC9457Problem{
+ TypeUri: TALER_URI_PROBLEM_PREFIX + "/C2EC_TRANSFER_INVALID_REQ",
+ Title: "invalid request",
+ Detail: "the transfer request is malformed (error: " + err.Error() + ")",
+ Instance: req.RequestURI,
+ })
+ if err != nil {
+ res.WriteHeader(HTTP_INTERNAL_SERVER_ERROR)
+ }
+ return
+ }
+
+ t, err := DB.GetTransferById(decodedRequestUid)
if err != nil {
err := WriteProblem(res, HTTP_INTERNAL_SERVER_ERROR, &RFC9457Problem{
TypeUri: TALER_URI_PROBLEM_PREFIX + "/C2EC_DATABASE_FAILURE",
@@ -187,7 +220,7 @@ func transfer(res http.ResponseWriter, req *http.Request) {
if t == nil {
// no transfer for this request_id -> generate new
err := DB.AddTransfer(
- transfer.RequestUid,
+ decodedRequestUid,
&transfer.Amount,
transfer.ExchangeBaseUrl,
string(transfer.Wtid),
@@ -315,6 +348,10 @@ func historyIncoming(res http.ResponseWriter, req *http.Request) {
}
}
+ if delta == 0 {
+ delta = 10
+ }
+
if shouldStartLongPoll {
// wait for the completion of the context
@@ -331,7 +368,7 @@ func historyIncoming(res http.ResponseWriter, req *http.Request) {
err := WriteProblem(res, HTTP_INTERNAL_SERVER_ERROR, &RFC9457Problem{
TypeUri: TALER_URI_PROBLEM_PREFIX + "/C2EC_DATABASE_FAILURE",
Title: "database request failed",
- Detail: "there was an error processing the database query",
+ Detail: "there was an error processing the database query. error=" + err.Error(),
Instance: req.RequestURI,
})
if err != nil {
@@ -345,9 +382,12 @@ func historyIncoming(res http.ResponseWriter, req *http.Request) {
return
}
- transactions := make([]*IncomingReserveTransaction, len(withdrawals))
+ transactions := make([]*IncomingReserveTransaction, 0)
for _, w := range withdrawals {
- transactions = append(transactions, NewIncomingReserveTransaction(w))
+ transaction := NewIncomingReserveTransaction(w)
+ if transaction != nil {
+ transactions = append(transactions, transaction)
+ }
}
enc, err := NewJsonCodec[[]*IncomingReserveTransaction]().EncodeToBytes(&transactions)
@@ -412,12 +452,8 @@ func historyOutgoing(res http.ResponseWriter, req *http.Request) {
if shouldStartLongPoll {
- // wait for the completion of the context
- waitMs, cancelFunc := context.WithTimeout(req.Context(), time.Duration(longPollMilli)*time.Millisecond)
- defer cancelFunc()
-
// this will just wait / block until the milliseconds are exceeded.
- <-waitMs.Done()
+ time.Sleep(time.Duration(longPollMilli) * time.Millisecond)
}
transfers, err := DB.GetTransfers(start, delta)
@@ -435,13 +471,21 @@ func historyOutgoing(res http.ResponseWriter, req *http.Request) {
return
}
- if len(transfers) < 1 {
+ filtered := make([]*Transfer, 0)
+ for _, t := range transfers {
+ if t.Status == 0 {
+ // only consider transfer which were successful
+ filtered = append(filtered, t)
+ }
+ }
+
+ if len(filtered) < 1 {
res.WriteHeader(HTTP_NOT_FOUND)
return
}
- transactions := make([]*OutgoingBankTransaction, len(transfers))
- for _, t := range transfers {
+ transactions := make([]*OutgoingBankTransaction, len(filtered))
+ for _, t := range filtered {
transactions = append(transactions, NewOutgoingBankTransaction(t))
}
diff --git a/docs/content/implementation/c2ec.tex b/docs/content/implementation/c2ec.tex
index 0899670..b44ee26 100644
--- a/docs/content/implementation/c2ec.tex
+++ b/docs/content/implementation/c2ec.tex
@@ -27,6 +27,11 @@ Following a short list of events and from whom they are triggered and who listen
\item Registered by: Attestor
\item Listened by: Retrier
\end{itemize}
+ \item Transfers which represent refunds in C2EC.
+ \begin{itemize}
+ \item Registered by: Exchange (through the wire gateway API)
+ \item Listened by: Transfer
+ \end{itemize}
\end{itemize}
\subsection{Bank-Integration API}
diff --git a/docs/content/introduction/introduction.tex b/docs/content/introduction/introduction.tex
index 720f444..5c6ff0e 100644
--- a/docs/content/introduction/introduction.tex
+++ b/docs/content/introduction/introduction.tex
@@ -2,7 +2,7 @@
Which payment systems do you use in your daily live and why? Probably one you know it is universally accepted, reliable, secure and the payment goes through more or less instantly.
-The \textbf{universal acceptance} was identified as one of the most important in a report which was published on behalf of the ECB (European Central Bank) in march 2022 as result of a focus group concerning the acceptance of a digital euro \cite{panetta-speech-march-30} as new payment system. The universal acceptance was even identified as \textit{the} most important property amongst the general public and tech-savvy people in the report \cite{study-new-digital-payment-methods}.
+The \textbf{universal acceptance} was identified as one of the most important aspects in a report which was published on behalf of the ECB (European Central Bank) in march 2022 as result of a focus group concerning the acceptance of a digital euro \cite{panetta-speech-march-30} as new payment system. The universal acceptance was even identified as \textit{the} most important property amongst the general public and tech-savvy people in the report \cite{study-new-digital-payment-methods}.
In a world, where everything is connected and everything is accessible from everywhere (one might think), it is therefore very important to make it as easy as possible to on-board people on a product. This is also the case for Taler. For a wide acceptance of the payment system Taler, it is important that various ways exist to withdraw digital cash in Taler.
@@ -14,7 +14,7 @@ To make the withdrawals using a credit card possible, various loose ends must be
Therefore a new component C2EC shall help, establishing a trustworthy relationship, which makes it possible for the \textit{Exchange} to issue digital cash to a customer. Therefore the \textit{Exchange} is not putting his trust on cash received but rather on the promise of a trusted third party (a terminal provider) to put the received digital cash in a location, controlled by the \textit{Exchange} eventually (e.g. a bank account owned by the \textit{Exchange}).
-This enables a broader group of people to leverage Taler for their payments. Which eventually leads to wider adoption of the payment system Taler.
+This enables a broader group of people to leverage Taler for their payments. Which eventually leads to a wider adoption of the payment system Taler.
\section{Perspectives}
During the initial analysis of the task, three areas of work were discovered. One is the \textit{Taler Exchange}, one the Application for the terminal and the (Taler) \textit{Wallet}. This led to different views on the system by two different players within it. To allow a more concise view on the system and to support the readers and implementer, two perspectives shall be kept in mind. They have different views on the process but need to interact with each other seamlessly.
@@ -23,7 +23,7 @@ During the initial analysis of the task, three areas of work were discovered. On
The perspective of the \textit{Taler Exchange} includes all processes within C2EC component and the interaction with the terminal application, terminal backend and the wallet of the user. The \textit{Taler Exchange} wants to allow withdrawal of digital digital cash only to users who pay the equivalent value to the \textit{Exchange}. The \textit{Exchange} wants to stay out of any legal implications at all costs.
\subsection{Terminal Application}
-The perspective of the terminal application includes all processes within the application which interacts with the user, their \textit{Wallet} and credit card allowing the withdrawal of digital cash. The terminal application wants to conviently allow the withdrawal of digital cash and charge fees to cover its costs and risks.
+The perspective of the terminal application includes all processes within the application which interacts with the user, their \textit{Wallet} and credit card allowing the withdrawal of digital cash. The terminal application wants to conveniently allow the withdrawal of digital cash and charge fees to cover its costs and risks.
\subsection{Taler Wallet}
The \textit{Wallet} holds the digital cash owned by the customer. The \textit{Wallet} wants to eventually gather the digital cash from the \textit{Taler Exchange}. The owner of the \textit{Wallet} must therefore present their credit card at a \textit{Terminal} of the terminal provider and pay the \textit{Exchange} as well accept the fees of the provider.
diff --git a/docs/thesis.pdf b/docs/thesis.pdf
index 167e042..a247e4b 100644
--- a/docs/thesis.pdf
+++ b/docs/thesis.pdf
Binary files differ
diff --git a/simulation/c2ec-simulation b/simulation/c2ec-simulation
index b0be13e..d1066dd 100755
--- a/simulation/c2ec-simulation
+++ b/simulation/c2ec-simulation
Binary files differ