proc-attestor.go (6383B)
1 // This file is part of taler-cashless2ecash. 2 // Copyright (C) 2024 Joel Häberli 3 // 4 // taler-cashless2ecash is free software: you can redistribute it and/or modify it 5 // under the terms of the GNU Affero General Public License as published 6 // by the Free Software Foundation, either version 3 of the License, 7 // or (at your option) any later version. 8 // 9 // taler-cashless2ecash is distributed in the hope that it will be useful, but 10 // WITHOUT ANY WARRANTY; without even the implied warranty of 11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 12 // Affero General Public License for more details. 13 // 14 // You should have received a copy of the GNU Affero General Public License 15 // along with this program. If not, see <http://www.gnu.org/licenses/>. 16 // 17 // SPDX-License-Identifier: AGPL3.0-or-later 18 19 package internal_proc 20 21 import ( 22 internal_utils "c2ec/internal/utils" 23 "c2ec/pkg/config" 24 "c2ec/pkg/db" 25 "c2ec/pkg/provider" 26 "context" 27 "errors" 28 "fmt" 29 "strconv" 30 "strings" 31 "time" 32 ) 33 34 const PAYMENT_NOTIFICATION_CHANNEL_BUFFER_SIZE = 10 35 const PS_PAYMENT_NOTIFICATION_CHANNEL = "payment_notification" 36 const MAX_BACKOFF_MS = 30 * 60000 // thirty minutes 37 38 // Sets up and runs an attestor in the background. This must be called at startup. 39 func RunAttestor( 40 ctx context.Context, 41 errs chan error, 42 ) { 43 44 go RunListener( 45 ctx, 46 PS_PAYMENT_NOTIFICATION_CHANNEL, 47 confirmationCallback, 48 make(chan *db.Notification, PAYMENT_NOTIFICATION_CHANNEL_BUFFER_SIZE), 49 errs, 50 ) 51 } 52 53 func confirmationCallback(notification *db.Notification, errs chan error) { 54 55 internal_utils.LogInfo("proc-attestor", fmt.Sprintf("retrieved information on channel=%s with payload=%s", notification.Channel, notification.Payload)) 56 57 // The payload is formatted like: "{PROVIDER_NAME}|{WITHDRAWAL_ID}|{PROVIDER_TRANSACTION_ID}" 58 // the validation is strict. This means, that the dispatcher emits an error 59 // and returns, if a property is malformed. 60 payload := strings.Split(notification.Payload, "|") 61 if len(payload) != 3 { 62 errs <- errors.New("malformed notification payload: " + notification.Payload) 63 return 64 } 65 66 providerName := payload[0] 67 if providerName == "" { 68 errs <- errors.New("the provider of the payment is not specified") 69 return 70 } 71 withdrawalRowId, err := strconv.Atoi(payload[1]) 72 if err != nil { 73 errs <- errors.New("malformed withdrawal_row_id: " + err.Error()) 74 return 75 } 76 providerTransactionId := payload[2] 77 78 client := provider.PROVIDER_CLIENTS[providerName] 79 if client == nil { 80 errs <- errors.New("no provider client registered for provider " + providerName) 81 } 82 83 transaction, err := client.GetTransaction(providerTransactionId) 84 if err != nil { 85 internal_utils.LogError("proc-attestor", err) 86 prepareRetryOrAbort(withdrawalRowId, errs) 87 return 88 } 89 90 finaliseOrSetRetry( 91 transaction, 92 withdrawalRowId, 93 errs, 94 ) 95 } 96 97 func finaliseOrSetRetry( 98 transaction provider.ProviderTransaction, 99 withdrawalRowId int, 100 errs chan error, 101 ) { 102 103 if transaction == nil { 104 err := errors.New("transaction was nil. will set retry or abort") 105 internal_utils.LogError("proc-attestor", err) 106 errs <- err 107 prepareRetryOrAbort(withdrawalRowId, errs) 108 return 109 } 110 111 if w, err := db.DB.GetWithdrawalById(withdrawalRowId); err != nil { 112 internal_utils.LogError("proc-attestor", err) 113 errs <- err 114 prepareRetryOrAbort(withdrawalRowId, errs) 115 return 116 } else { 117 if w.WithdrawalStatus == internal_utils.CONFIRMED || w.WithdrawalStatus == internal_utils.ABORTED { 118 return 119 } 120 if err := transaction.Confirm(w); err != nil { 121 internal_utils.LogError("proc-attestor", err) 122 errs <- err 123 prepareRetryOrAbort(withdrawalRowId, errs) 124 return 125 } 126 } 127 128 completionProof := transaction.Bytes() 129 if len(completionProof) > 0 { 130 // only allow finalization operation, when the completion 131 // proof of the transaction could be retrieved 132 if transaction.AllowWithdrawal() { 133 134 err := db.DB.FinaliseWithdrawal(withdrawalRowId, internal_utils.CONFIRMED, completionProof) 135 if err != nil { 136 internal_utils.LogError("proc-attestor", err) 137 prepareRetryOrAbort(withdrawalRowId, errs) 138 } 139 } else { 140 // when the received transaction is not allowed, we first check if the 141 // transaction is in a final state which will not allow the withdrawal 142 // and therefore the operation can be aborted, without further retries. 143 if transaction.AbortWithdrawal() { 144 err := db.DB.FinaliseWithdrawal(withdrawalRowId, internal_utils.ABORTED, completionProof) 145 if err != nil { 146 internal_utils.LogError("proc-attestor", err) 147 prepareRetryOrAbort(withdrawalRowId, errs) 148 return 149 } 150 } 151 prepareRetryOrAbort(withdrawalRowId, errs) 152 } 153 return 154 } 155 // when the transaction proof was not present (empty proof), retry. 156 prepareRetryOrAbort(withdrawalRowId, errs) 157 } 158 159 // Checks wether the maximal amount of retries was already 160 // reached and the withdrawal operation shall be aborted or 161 // triggers the next retry by setting the last_retry_ts field 162 // which will trigger the stored procedure triggering the retry 163 // process. The retry counter of the retries is handled by the 164 // retrier logic and shall not be set here! 165 func prepareRetryOrAbort( 166 withdrawalRowId int, 167 errs chan error, 168 ) { 169 170 withdrawal, err := db.DB.GetWithdrawalById(withdrawalRowId) 171 if err != nil { 172 internal_utils.LogError("proc-attestor", err) 173 errs <- err 174 return 175 } 176 177 if config.CONFIG.Server.MaxRetries < 0 { 178 prepareRetry(withdrawal, errs) 179 } else { 180 181 if withdrawal.RetryCounter >= config.CONFIG.Server.MaxRetries { 182 183 internal_utils.LogInfo("proc-attestor", fmt.Sprintf("max retries for withdrawal with id=%d was reached. withdrawal is aborted.", withdrawal.WithdrawalRowId)) 184 err := db.DB.FinaliseWithdrawal(withdrawalRowId, internal_utils.ABORTED, make([]byte, 0)) 185 if err != nil { 186 internal_utils.LogError("proc-attestor", err) 187 } 188 } else { 189 prepareRetry(withdrawal, errs) 190 } 191 } 192 } 193 194 func prepareRetry(w *db.Withdrawal, errs chan error) { 195 // refactor this section to set retry counter and last retry field in one query... 196 err := db.DB.SetRetryCounter(int(w.WithdrawalRowId), int(w.RetryCounter)+1) 197 if err != nil { 198 internal_utils.LogError("proc-attestor", err) 199 errs <- err 200 return 201 } 202 lastRetryTs := time.Now().Unix() 203 err = db.DB.SetLastRetry(int(w.WithdrawalRowId), lastRetryTs) 204 if err != nil { 205 internal_utils.LogError("proc-attestor", err) 206 errs <- err 207 return 208 } 209 }