cashless2ecash

cashless2ecash: pay with cards for digital cash (experimental)
Log | Files | Refs | README

commit 470230fc83bc8ada370d6422e506d51f899d1a8d
parent 52ba853cc491b2333845bcdc79f491c85ca2eb0e
Author: Joel-Haeberli <haebu@rubigen.ch>
Date:   Tue,  9 Apr 2024 22:43:26 +0200

fix: long-poll

Diffstat:
Mc2ec/bank-integration.go | 3+--
Mc2ec/c2ec-config.yaml | 2+-
Mc2ec/db.go | 2+-
Mc2ec/db/0000-c2ec_schema.sql | 2+-
Mc2ec/main.go | 2+-
Mc2ec/postgres.go | 89++++++++++++++++++++++++++++++++++++++++++++++++++++++++-----------------------
Msimulation/c2ec-simulation | 0
Msimulation/http-util.go | 2+-
Msimulation/sim-terminal.go | 49+++++++++++++++++++++++++++++++++++++++++++------
Msimulation/sim-wallet.go | 15+++++++++------
10 files changed, 121 insertions(+), 45 deletions(-)

diff --git a/c2ec/bank-integration.go b/c2ec/bank-integration.go @@ -193,9 +193,7 @@ func handleWithdrawalStatus(res http.ResponseWriter, req *http.Request) { statusChannel := make(chan WithdrawalOperationStatus) errChan := make(chan error) - // listen for status change in goroutine go DB.ListenForWithdrawalStatusChange(timeoutCtx, WithdrawalIdentifier(wopid), statusChannel, errChan) - for { select { case <-timeoutCtx.Done(): @@ -222,6 +220,7 @@ func handleWithdrawalStatus(res http.ResponseWriter, req *http.Request) { return case <-statusChannel: getWithdrawalOrWriteError(wopid, res, req.RequestURI) + return } } } diff --git a/c2ec/c2ec-config.yaml b/c2ec/c2ec-config.yaml @@ -1,7 +1,7 @@ c2ec: prod: false host: "localhost" - port: 8081 + port: 8082 unix-domain-socket: false unix-socket-path: "c2ec.sock" fail-on-missing-attestors: false # forced if prod=true diff --git a/c2ec/db.go b/c2ec/db.go @@ -155,5 +155,5 @@ type C2ECDatabase interface { wopid WithdrawalIdentifier, out chan WithdrawalOperationStatus, errs chan error, - ) (WithdrawalOperationStatus, error) + ) } diff --git a/c2ec/db/0000-c2ec_schema.sql b/c2ec/db/0000-c2ec_schema.sql @@ -81,7 +81,7 @@ CREATE TABLE IF NOT EXISTS withdrawal ( wopid BYTEA CHECK (LENGTH(wopid)=32) NOT NULL, reserve_pub_key BYTEA CHECK (LENGTH(reserve_pub_key)=32) NOT NULL, registration_ts INT8 NOT NULL, - amount taler_amount_currency NOT NULL, + amount taler_amount_currency, fees taler_amount_currency, withdrawal_status withdrawal_operation_status NOT NULL DEFAULT 'pending', terminal_id INT8 NOT NULL REFERENCES terminal(terminal_id), diff --git a/c2ec/main.go b/c2ec/main.go @@ -127,7 +127,7 @@ func setupAttestors(cfg *C2ECConfig) error { if cfg.Server.IsProd || cfg.Server.StrictAttestors { panic("no provider entry for " + provider.Name) } else { - fmt.Println("non-strict attestor initialization. skipping", provider) + LogWarn("non-strict attestor initialization. skipping", provider.Name) continue } } diff --git a/c2ec/postgres.go b/c2ec/postgres.go @@ -3,10 +3,12 @@ package main import ( "bytes" "context" + "encoding/base32" "encoding/base64" "errors" "fmt" "math" + "strconv" "time" "github.com/jackc/pgx/v5" @@ -18,7 +20,7 @@ import ( const PS_ASC_SELECTOR = "ASC" const PS_DESC_SELECTOR = "DESC" -const PS_INSERT_WITHDRAWAL = "INSERT INTO " + WITHDRAWAL_TABLE_NAME + " (" + +const PS_INSERT_WITHDRAWAL = "INSERT INTO " + WITHDRAWAL_TABLE_NAME + " (" + WITHDRAWAL_FIELD_NAME_WOPID + "," + WITHDRAWAL_FIELD_NAME_RESPUBKEY + "," + WITHDRAWAL_FIELD_NAME_STATUS + "," + @@ -116,12 +118,17 @@ func (db *C2ECPostgres) RegisterWithdrawal( terminalId uint64, ) error { + resPubKeyBytes, err := base32.HexEncoding.DecodeString(string(resPubKey)) + if err != nil { + return err + } + ts := time.Now() res, err := db.pool.Query( db.ctx, PS_INSERT_WITHDRAWAL, wopid, - resPubKey, + resPubKeyBytes, SELECTED, ts.Unix(), terminalId, @@ -130,7 +137,13 @@ func (db *C2ECPostgres) RegisterWithdrawal( LogError("postgres", err) return err } - res.Close() + defer res.Close() + if res.Err() != nil { + LogError("postgres", err) + return err + } + LogInfo("postgres", "query="+PS_INSERT_WITHDRAWAL) + LogInfo("postgres", "registered withdrawal successfully. affected rows="+strconv.Itoa(int(res.CommandTag().RowsAffected()))) return nil } @@ -159,6 +172,7 @@ func (db *C2ECPostgres) GetWithdrawalByWopid(wopid string) (*Withdrawal, error) if len(withdrawals) < 1 { return nil, nil } + LogInfo("postgres", "query="+PS_GET_WITHDRAWAL_BY_WOPID) return withdrawals[0], nil } } @@ -187,6 +201,7 @@ func (db *C2ECPostgres) GetWithdrawalByProviderTransactionId(tid string) (*Withd if len(withdrawals) < 1 { return nil, nil } + LogInfo("postgres", "query="+PS_GET_WITHDRAWAL_BY_PTID) return withdrawals[0], nil } } @@ -210,6 +225,7 @@ func (db *C2ECPostgres) NotifyPayment( return err } res.Close() + LogInfo("postgres", "query="+PS_PAYMENT_NOTIFICATION) return nil } @@ -234,6 +250,7 @@ func (db *C2ECPostgres) GetAttestableWithdrawals() ([]*Withdrawal, error) { return nil, err } + LogInfo("postgres", "query="+PS_GET_UNCONFIRMED_WITHDRAWALS) return withdrawals, nil } } @@ -260,6 +277,7 @@ func (db *C2ECPostgres) FinaliseWithdrawal( return err } res.Close() + LogInfo("postgres", "query="+PS_FINALISE_PAYMENT) return nil } @@ -320,6 +338,7 @@ func (db *C2ECPostgres) GetConfirmedWithdrawals(start int, delta int) ([]*Withdr return nil, err } + LogInfo("postgres", "query="+PS_CONFIRMED_TRANSACTIONS) return withdrawals, nil } } @@ -350,6 +369,7 @@ func (db *C2ECPostgres) GetTerminalProviderByName(name string) (*Provider, error return nil, nil } + LogInfo("postgres", "query="+PS_GET_PROVIDER_BY_NAME) return provider[0], nil } } @@ -380,6 +400,7 @@ func (db *C2ECPostgres) GetTerminalProviderByPaytoTargetType(paytoTargetType str return nil, nil } + LogInfo("postgres", "query="+PS_GET_PROVIDER_BY_PAYTO_TARGET_TYPE) return provider[0], nil } } @@ -410,6 +431,7 @@ func (db *C2ECPostgres) GetTerminalById(id int) (*Terminal, error) { return nil, nil } + LogInfo("postgres", "query="+PS_GET_TERMINAL_BY_ID) return terminals[0], nil } } @@ -439,6 +461,7 @@ func (db *C2ECPostgres) GetTransferById(requestUid HashCode) (*Transfer, error) if len(transfers) < 1 { return nil, nil } + LogInfo("postgres", "query="+PS_GET_TRANSFER_BY_ID) return transfers[0], nil } @@ -457,6 +480,7 @@ func (db *C2ECPostgres) AddTransfer(requestId HashCode, requestHash string) erro return err } res.Close() + LogInfo("postgres", "query="+PS_ADD_TRANSFER) return nil } @@ -465,38 +489,45 @@ func (db *C2ECPostgres) ListenForWithdrawalStatusChange( wopid WithdrawalIdentifier, out chan WithdrawalOperationStatus, errs chan error, -) (WithdrawalOperationStatus, error) { +) { pgNotification := make(chan *pgconn.Notification) - channel := "w_" + base64.StdEncoding.EncodeToString(bytes.NewBufferString(string(wopid)).Bytes()) - listener := newChannelListener(db.pool.Config().ConnConfig, channel, pgNotification) go func() { + + connstr := PostgresConnectionString(&CONFIG.Database) + cfg, err := pgx.ParseConfig(connstr) + if err != nil { + errs <- err + } + + channel := "w_" + base64.StdEncoding.EncodeToString(bytes.NewBufferString(string(wopid)).Bytes()) + listener := newChannelListener(cfg, channel, pgNotification) LogInfo("postgres", fmt.Sprintf("listening for %s", wopid)) + if err := listener.Listen(ctx); err != nil { LogError("postgres", err) errs <- err } - // close the channel we send results, because listener has finished. - close(pgNotification) }() - select { - case e := <-errs: - LogError("postgres", e) - return "", e - case <-ctx.Done(): - err := ctx.Err() - msg := "context sent done signal while listening for status change" - if err != nil { - LogError("postgres", err) - } else { + for { + select { + case e := <-errs: + LogError("postgres", e) + errs <- e + case <-ctx.Done(): + err := ctx.Err() + msg := "context sent done signal while listening for status change" + if err != nil { + LogError("postgres", err) + } LogWarn("postgres", msg) + errs <- errors.New(msg) + case n := <-pgNotification: + LogInfo("postgres", fmt.Sprintf("received notification for channel %s: %s", n.Channel, n.Payload)) + out <- WithdrawalOperationStatus(n.Payload) } - return "", errors.New(msg) - case n := <-pgNotification: - LogInfo("postgres", fmt.Sprintf("received notification for channel %s: %s", n.Channel, n.Payload)) - return WithdrawalOperationStatus(n.Payload), nil } } @@ -509,16 +540,22 @@ func newChannelListener( listener := &pgxlisten.Listener{ Connect: func(ctx context.Context) (*pgx.Conn, error) { + LogInfo("postgres", "connecting to the database") return pgx.ConnectConfig(ctx, cfg) }, } listener.Handle(cn, pgxlisten.HandlerFunc(func(ctx context.Context, notification *pgconn.Notification, conn *pgx.Conn) error { - select { - case out <- notification: - case <-ctx.Done(): + LogInfo("postgres", fmt.Sprintf("handling postgres notification. channel=%s", notification.Channel)) + for { + select { + case out <- notification: + LogInfo("postgres", fmt.Sprintf("received notification. channel=%s, notification=%s", notification.Channel, notification.Payload)) + return nil + case <-ctx.Done(): + return ctx.Err() + } } - return nil })) return listener diff --git a/simulation/c2ec-simulation b/simulation/c2ec-simulation Binary files differ. diff --git a/simulation/http-util.go b/simulation/http-util.go @@ -178,7 +178,7 @@ func HttpGet[T any]( } req.Header.Add("Accept", codec.HttpApplicationContentHeader()) - fmt.Printf("requesting %s\n", url) + fmt.Printf("requesting GET %s\n", url) res, err := http.DefaultClient.Do(req) if err != nil { return nil, -1, err diff --git a/simulation/sim-terminal.go b/simulation/sim-terminal.go @@ -1,10 +1,12 @@ package main import ( + "bytes" "crypto/rand" "encoding/base64" "errors" "fmt" + "net/http" "strconv" "time" ) @@ -17,7 +19,7 @@ const TERMINAL_USER_ID = TERMINAL_PROVIDER + "-1" // retrieved from the cli tool when added the terminal const TERMINAL_ACCESS_TOKEN = "secret" -const SIM_TERMINAL_LONG_POLL_MS_STR = "20000" // 20 seconds +const SIM_TERMINAL_LONG_POLL_MS_STR = "5000" // 20 seconds const QR_CODE_CONTENT_BASE = "taler://withdraw/localhost:8081/c2ec/" @@ -43,8 +45,11 @@ func Terminal(in chan *SimulatedPhysicalInteraction, out chan *SimulatedPhysical // -> start long polling always before showing the QR code awaitSelection := make(chan *C2ECWithdrawalStatus) longPollFailed := make(chan error) + + out <- &SimulatedPhysicalInteraction{Msg: uri} + + fmt.Println("now sending long poll request to c2ec from terminal and await parameter selection") go func() { - // long poll for parameter selection notification by c2ec url := FormatUrl( C2EC_BANK_WITHDRAWAL_STATUS_URL, @@ -68,8 +73,6 @@ func Terminal(in chan *SimulatedPhysicalInteraction, out chan *SimulatedPhysical awaitSelection <- response }() - out <- &SimulatedPhysicalInteraction{Msg: uri} - for { select { case w := <-awaitSelection: @@ -78,12 +81,46 @@ func Terminal(in chan *SimulatedPhysicalInteraction, out chan *SimulatedPhysical if !DISABLE_DELAYS { time.Sleep(time.Duration(TERMINAL_ACCEPT_CARD_DELAY_MS)) } - fmt.Println("the card was tead by the terminal. simulating the payment using the providers backend. delay:", PROVIDER_BACKEND_PAYMENT_DELAY_MS) + fmt.Println("card accepted. terminal waits for response of provider backend.") if !DISABLE_DELAYS { time.Sleep(time.Duration(PROVIDER_BACKEND_PAYMENT_DELAY_MS)) } - // sending payment notification now... + fmt.Println("payment was processed at the provider backend. sending payment notification.") + paymentNotification := &C2ECPaymentNotification{ + ProviderTransactionId: "simulation-transaction-id-0", + Amount: Amount{ + Currency: "CHF", + Fraction: 10, + Value: 10, + }, + Fees: Amount{ + Currency: "CHF", + Fraction: 10, + Value: 0, + }, + } + cdc := NewJsonCodec[C2ECPaymentNotification]() + pnbytes, err := cdc.EncodeToBytes(paymentNotification) + if err != nil { + fmt.Println("failed serializing payment notification") + kill <- err + } + paymentUrl := FormatUrl( + C2EC_BANK_WITHDRAWAL_PAYMENT_URL, + map[string]string{"wopid": wopid}, + map[string]string{}, + ) + _, err = http.Post( + paymentUrl, + cdc.HttpApplicationContentHeader(), + bytes.NewReader(pnbytes), + ) + if err != nil { + fmt.Println("error on POST request:", err.Error()) + kill <- err + } + fmt.Println("Terminal flow ended") case f := <-longPollFailed: fmt.Println("long-polling for selection failed... error:", err.Error()) kill <- f diff --git a/simulation/sim-wallet.go b/simulation/sim-wallet.go @@ -13,7 +13,7 @@ import ( "time" ) -const SIM_WALLET_LONG_POLL_MS_STR = "20000" // 20 seconds +const SIM_WALLET_LONG_POLL_MS_STR = "5000" // 20 seconds func Wallet(in chan *SimulatedPhysicalInteraction, out chan *SimulatedPhysicalInteraction, kill chan error) { @@ -60,6 +60,11 @@ func Wallet(in chan *SimulatedPhysicalInteraction, out chan *SimulatedPhysicalIn bytes.NewReader(regByte.Bytes()), ) + if err != nil { + fmt.Println("error on POST request:", err.Error()) + kill <- err + } + if res.StatusCode != 204 { fmt.Println("response status from registration:", res.StatusCode) kill <- errors.New("failed registering the withdrawal parameters") @@ -70,9 +75,9 @@ func Wallet(in chan *SimulatedPhysicalInteraction, out chan *SimulatedPhysicalIn // Start long poll for confirmed or abort awaitConfirmationOrAbortion := make(chan *C2ECWithdrawalStatus) longPollFailed := make(chan error) - go func() { - // long poll for parameter selection notification by c2ec + // long poll for parameter selection notification by c2ec + go func() { url := FormatUrl( C2EC_BANK_WITHDRAWAL_STATUS_URL, map[string]string{"wopid": wopid}, @@ -91,7 +96,6 @@ func Wallet(in chan *SimulatedPhysicalInteraction, out chan *SimulatedPhysicalIn longPollFailed <- errors.New("status of withdrawal status response was " + strconv.Itoa(status)) return } - awaitConfirmationOrAbortion <- response }() @@ -108,11 +112,10 @@ func Wallet(in chan *SimulatedPhysicalInteraction, out chan *SimulatedPhysicalIn os.Exit(0) } case f := <-longPollFailed: - fmt.Println("long-polling for selection failed... error:", err.Error()) + fmt.Println("long-polling for selection failed... error:", f.Error()) kill <- f } } - } // returns wopid.