proc-transfer.go (7393B)
1 // This file is part of taler-cashless2ecash. 2 // Copyright (C) 2024 Joel Häberli 3 // 4 // taler-cashless2ecash is free software: you can redistribute it and/or modify it 5 // under the terms of the GNU Affero General Public License as published 6 // by the Free Software Foundation, either version 3 of the License, 7 // or (at your option) any later version. 8 // 9 // taler-cashless2ecash is distributed in the hope that it will be useful, but 10 // WITHOUT ANY WARRANTY; without even the implied warranty of 11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 12 // Affero General Public License for more details. 13 // 14 // You should have received a copy of the GNU Affero General Public License 15 // along with this program. If not, see <http://www.gnu.org/licenses/>. 16 // 17 // SPDX-License-Identifier: AGPL3.0-or-later 18 19 package internal_proc 20 21 import ( 22 internal_utils "c2ec/internal/utils" 23 "c2ec/pkg/db" 24 "c2ec/pkg/provider" 25 "context" 26 "errors" 27 "fmt" 28 "strings" 29 "time" 30 ) 31 32 const REFUND_RETRY_INTERVAL_SECONDS = 1 33 34 const REFUND_CHANNEL_BUFFER_SIZE = 10 35 const PS_REFUND_CHANNEL = "transfer" 36 37 const TRANSFER_STATUS_SUCCESS = 0 38 const TRANSFER_STATUS_RETRY = 1 39 const TRANSFER_STATUS_FAILED = -1 40 41 const MAX_TRANSFER_BACKOFF_MS = 24 * 60 * 60 * 1000 // 1 day 42 43 // Sets up and runs the transferrer in the background. This must be called at startup. 
44 func RunTransferrer( 45 ctx context.Context, 46 errs chan error, 47 ) { 48 49 // go RunListener( 50 // ctx, 51 // PS_REFUND_CHANNEL, 52 // transferCallback, 53 // make(chan *Notification, REFUND_CHANNEL_BUFFER_SIZE), 54 // errs, 55 // ) 56 57 go func() { 58 lastlog := time.Now().Add(time.Minute * -3) 59 lastlog2 := time.Now().Add(time.Minute * -3) 60 for { 61 time.Sleep(REFUND_RETRY_INTERVAL_SECONDS * time.Second) 62 if lastlog.Before(time.Now().Add(time.Second * -30)) { 63 internal_utils.LogInfo("proc-transfer", "transferrer executing transfers") 64 lastlog = time.Now() 65 } 66 executePendingTransfers(errs, lastlog2) 67 if lastlog2.Before(time.Now().Add(time.Second * -30)) { 68 lastlog2 = time.Now() 69 } 70 } 71 }() 72 } 73 74 // func transferCallback(notification *Notification, errs chan error) { 75 76 // internal_utils.LogInfo("proc-transfer", fmt.Sprintf("retrieved information on channel=%s with payload=%s", notification.Channel, notification.Payload)) 77 78 // transferRequestUidBase64 := notification.Payload 79 // if transferRequestUidBase64 == "" { 80 // errs <- errors.New("the transfer to refund is not specified") 81 // return 82 // } 83 84 // transferRequestUid, err := base64.StdEncoding.DecodeString(transferRequestUidBase64) 85 // if err != nil { 86 // errs <- errors.New("malformed transfer request uid: " + err.Error()) 87 // return 88 // } 89 90 // transfer, err := db.DB.GetTransferById(transferRequestUid) 91 // if err != nil { 92 // internal_utils.LogWarn("proc-transfer", "unable to retrieve transfer with requestUid") 93 // internal_utils.LogError("proc-transfer", err) 94 // transferFailed(transfer, errs) 95 // errs <- err 96 // return 97 // } 98 99 // if transfer == nil { 100 // err := errors.New("expected an existing transfer. 
very strange") 101 // internal_utils.LogError("proc-transfer", err) 102 // transferFailed(transfer, errs) 103 // errs <- err 104 // return 105 // } 106 107 // paytoTargetType, tid, err := ParsePaytoUri(transfer.CreditAccount) 108 // internal_utils.LogInfo("proc-transfer", "parsed payto-target-type="+paytoTargetType) 109 // if err != nil { 110 // internal_utils.LogWarn("proc-transfer", "unable to parse payto-uri="+transfer.CreditAccount) 111 // errs <- errors.New("malformed transfer request uid: " + err.Error()) 112 // transferFailed(transfer, errs) 113 // return 114 // } 115 116 // provider, err := db.DB.GetTerminalProviderByPaytoTargetType(paytoTargetType) 117 // if err != nil { 118 // internal_utils.LogWarn("proc-transfer", "unable to find provider for provider-target-type="+paytoTargetType) 119 // internal_utils.LogError("proc-transfer", err) 120 // transferFailed(transfer, errs) 121 // errs <- err 122 // } 123 124 // client := PROVIDER_CLIENTS[provider.Name] 125 // if client == nil { 126 // errs <- errors.New("no provider client registered for provider " + provider.Name) 127 // } 128 129 // err = client.Refund(tid) 130 // if err != nil { 131 // internal_utils.LogError("proc-transfer", err) 132 // transferFailed(transfer, errs) 133 // return 134 // } 135 136 // err = db.DB.UpdateTransfer( 137 // transfer.RequestUid, 138 // time.Now().Unix(), 139 // TRANSFER_STATUS_SUCCESS, // success 140 // transfer.Retries, 141 // ) 142 // if err != nil { 143 // errs <- err 144 // } 145 // } 146 147 func executePendingTransfers(errs chan error, lastlog time.Time) { 148 149 transfers, err := db.DB.GetTransfersByState(TRANSFER_STATUS_RETRY) 150 if err != nil { 151 internal_utils.LogError("proc-transfer-1", err) 152 errs <- err 153 return 154 } 155 156 if lastlog.Before(time.Now().Add(time.Second * -30)) { 157 internal_utils.LogInfo("proc-transfer", fmt.Sprintf("found %d pending transfers", len(transfers))) 158 } 159 for _, t := range transfers { 160 161 shouldRetry := 
internal_utils.ShouldStartRetry(time.Unix(t.TransferTs, 0), int(t.Retries), MAX_TRANSFER_BACKOFF_MS) 162 if !shouldRetry { 163 if lastlog.Before(time.Now().Add(time.Second * -30)) { 164 internal_utils.LogInfo("proc-transfer", fmt.Sprintf("not retrying transfer id=%d, because backoff not yet exceeded", t.RowId)) 165 } 166 continue 167 } 168 169 paytoTargetType, tid, err := internal_utils.ParsePaytoUri(t.CreditAccount) 170 internal_utils.LogInfo("proc-transfer", "parsed payto-target-type="+paytoTargetType) 171 if err != nil { 172 internal_utils.LogWarn("proc-transfer", "parsing payto-target-type failed") 173 internal_utils.LogError("proc-transfer-2", err) 174 continue 175 } 176 177 prvdr, err := db.DB.GetTerminalProviderByPaytoTargetType(paytoTargetType) 178 if err != nil { 179 internal_utils.LogWarn("proc-transfer", "finding terminal by payto target type failed") 180 internal_utils.LogError("proc-transfer-3", err) 181 continue 182 } 183 184 client := provider.PROVIDER_CLIENTS[prvdr.Name] 185 if client == nil { 186 errs <- errors.New("no provider client registered for provider " + prvdr.Name) 187 continue 188 } 189 190 internal_utils.LogInfo("proc-transfer", "refunding transaction "+tid) 191 err = client.Refund(strings.Trim(tid, " \n")) 192 if err != nil { 193 internal_utils.LogWarn("proc-transfer", "refunding using provider client failed") 194 internal_utils.LogError("proc-transfer-4", err) 195 transferFailed(t, errs) 196 continue 197 } 198 199 internal_utils.LogInfo("proc-transfer", "setting transfer to success state") 200 err = db.DB.UpdateTransfer( 201 t.RowId, 202 t.RequestUid, 203 time.Now().Unix(), 204 TRANSFER_STATUS_SUCCESS, // success 205 t.Retries, 206 ) 207 if err != nil { 208 internal_utils.LogWarn("proc-transfer", "failed setting refund to success state") 209 internal_utils.LogError("proc-transfer", err) 210 } 211 } 212 } 213 214 func transferFailed( 215 transfer *db.Transfer, 216 errs chan error, 217 ) { 218 219 err := db.DB.UpdateTransfer( 220 
transfer.RowId, 221 transfer.RequestUid, 222 time.Now().Unix(), 223 TRANSFER_STATUS_RETRY, // retry transfer. 224 transfer.Retries+1, 225 ) 226 if err != nil { 227 errs <- err 228 } 229 230 // if transfer.Retries > 2 { 231 // err := db.DB.UpdateTransfer( 232 // transfer.RequestUid, 233 // time.Now().Unix(), 234 // TRANSFER_STATUS_FAILED, // transfer ultimatively failed. 235 // transfer.Retries, 236 // ) 237 // if err != nil { 238 // errs <- err 239 // } 240 // } else { 241 // err := db.DB.UpdateTransfer( 242 // transfer.RequestUid, 243 // time.Now().Unix(), 244 // TRANSFER_STATUS_RETRY, // retry transfer. 245 // transfer.Retries+1, 246 // ) 247 // if err != nil { 248 // errs <- err 249 // } 250 //} 251 }