c2ec.go (10619B)
1 package internal 2 3 import ( 4 internal_api "c2ec/internal/api" 5 internal_postgres "c2ec/internal/db/postgres" 6 internal_proc "c2ec/internal/proc" 7 internal_provider_simulation "c2ec/internal/provider/simulation" 8 internal_provider_wallee "c2ec/internal/provider/wallee" 9 internal_utils "c2ec/internal/utils" 10 "c2ec/pkg/config" 11 "c2ec/pkg/db" 12 "c2ec/pkg/provider" 13 "context" 14 "errors" 15 "fmt" 16 "net" 17 "net/http" 18 "os" 19 "os/signal" 20 "syscall" 21 ) 22 23 const GET = "GET " 24 const POST = "POST " 25 const DELETE = "DELETE " 26 27 // https://docs.taler.net/core/api-terminal.html#endpoints-for-integrated-sub-apis 28 const BANK_INTEGRATION_API = "/taler-integration" 29 const WIRE_GATEWAY_API = "/taler-wire-gateway" 30 31 const WIRE_GATEWAY_CONFIG_ENDPOINT = "/config" 32 const WIRE_GATEWAY_HISTORY_ENDPOINT = "/history" 33 34 const WIRE_GATEWAY_CONFIG_PATTERN = WIRE_GATEWAY_CONFIG_ENDPOINT 35 const WIRE_TRANSFER_PATTERN = "/transfer" 36 const WIRE_HISTORY_INCOMING_PATTERN = WIRE_GATEWAY_HISTORY_ENDPOINT + "/incoming" 37 const WIRE_HISTORY_OUTGOING_PATTERN = WIRE_GATEWAY_HISTORY_ENDPOINT + "/outgoing" 38 const WIRE_ADMIN_ADD_INCOMING_PATTERN = "/admin/add-incoming" 39 40 const WITHDRAWAL_OPERATION = "/withdrawal-operation" 41 42 const WOPID_PARAMETER = "wopid" 43 const BANK_INTEGRATION_CONFIG_PATTERN = "/config" 44 const WITHDRAWAL_OPERATION_PATTERN = WITHDRAWAL_OPERATION 45 const WITHDRAWAL_OPERATION_BY_WOPID_PATTERN = WITHDRAWAL_OPERATION + "/{" + WOPID_PARAMETER + "}" 46 const WITHDRAWAL_OPERATION_ABORTION_PATTERN = WITHDRAWAL_OPERATION_BY_WOPID_PATTERN + "/abort" 47 48 const TERMINAL_API_CONFIG = "/config" 49 const TERMINAL_API_REGISTER_WITHDRAWAL = "/withdrawals" 50 const TERMINAL_API_WITHDRAWAL_STATUS = "/withdrawals/{wopid}" 51 const TERMINAL_API_CHECK_WITHDRAWAL = "/withdrawals/{wopid}/check" 52 const TERMINAL_API_ABORT_WITHDRAWAL = "/withdrawals/{wopid}/abort" 53 54 func C2EC() { 55 56 d, err := setupDatabase(&config.CONFIG.Database) 57 if err != nil { 58 panic("unable to connect to datatbase: " + err.Error()) 59 } 60 db.DB = d 61 62 err = setupProviderClients(&config.CONFIG) 63 if err != nil { 64 panic("unable initialize provider clients: " + err.Error()) 65 } 66 internal_utils.LogInfo("c2ec", "provider clients are setup") 67 68 retryCtx, retryCancel := context.WithCancel(context.Background()) 69 defer retryCancel() 70 retryErrs := make(chan error) 71 internal_proc.RunRetrier(retryCtx, retryErrs) 72 internal_utils.LogInfo("c2ec", "retrier is running") 73 74 attestorCtx, attestorCancel := context.WithCancel(context.Background()) 75 defer attestorCancel() 76 attestorErrs := make(chan error) 77 internal_proc.RunAttestor(attestorCtx, attestorErrs) 78 internal_utils.LogInfo("c2ec", "attestor is running") 79 80 transferCtx, transferCancel := context.WithCancel(context.Background()) 81 defer transferCancel() 82 transferErrs := make(chan error) 83 internal_proc.RunTransferrer(transferCtx, transferErrs) 84 internal_utils.LogInfo("c2ec", "refunder is running") 85 86 router := http.NewServeMux() 87 routerErrs := make(chan error) 88 89 setupBankIntegrationRoutes(router) 90 setupWireGatewayRoutes(router) 91 setupTerminalRoutes(router) 92 setupAgplRoutes(router) 93 94 startListening(router, routerErrs) 95 96 // since listening for incoming request, attesting payments and 97 // retrying payments are separated processes who can fail 98 // we must take care of this here. The main process is used to 99 // dispatch incoming http request and parent of the confirmation 100 // and retry processes. If the main process fails somehow, also 101 // confirmation and retries will end. But if somehow the confirmation 102 // or retry process fail, they will be restarted and the error is 103 // written to the log. If some setup tasks are failing, the program 104 // panics. 105 for { 106 select { 107 case routerError := <-routerErrs: 108 internal_utils.LogError("c2ec", routerError) 109 attestorCancel() 110 retryCancel() 111 transferCancel() 112 panic(routerError) 113 case <-attestorCtx.Done(): 114 attestorCancel() // first run old cancellation function 115 attestorCtx, attestorCancel = context.WithCancel(context.Background()) 116 internal_proc.RunAttestor(attestorCtx, attestorErrs) 117 case <-retryCtx.Done(): 118 retryCancel() // first run old cancellation function 119 retryCtx, retryCancel = context.WithCancel(context.Background()) 120 internal_proc.RunRetrier(retryCtx, retryErrs) 121 case <-transferCtx.Done(): 122 transferCancel() // first run old cancellation function 123 transferCtx, transferCancel = context.WithCancel(context.Background()) 124 internal_proc.RunTransferrer(transferCtx, transferErrs) 125 case confirmationError := <-attestorErrs: 126 internal_utils.LogError("c2ec-from-proc-attestor", confirmationError) 127 case retryError := <-retryErrs: 128 internal_utils.LogError("c2ec-from-proc-retrier", retryError) 129 case transferError := <-transferErrs: 130 internal_utils.LogError("c2ec-from-proc-transfer", transferError) 131 } 132 } 133 } 134 135 func setupDatabase(cfg *config.C2ECDatabseConfig) (db.C2ECDatabase, error) { 136 137 return internal_postgres.NewC2ECPostgres(cfg) 138 } 139 140 func setupProviderClients(cfg *config.C2ECConfig) error { 141 142 if db.DB == nil { 143 return errors.New("setup database first") 144 } 145 146 for _, provider := range cfg.Providers { 147 148 p, err := db.DB.GetTerminalProviderByName(provider.Name) 149 if err != nil { 150 return err 151 } 152 153 if p == nil { 154 if cfg.Server.IsProd || cfg.Server.StrictAttestors { 155 panic("no provider entry for " + provider.Name) 156 } else { 157 internal_utils.LogWarn("non-strict attestor initialization. skipping", provider.Name) 158 continue 159 } 160 } 161 162 if !cfg.Server.IsProd { 163 // Prevent simulation client to be loaded in productive environments. 164 if p.Name == "Simulation" { 165 166 simulationClient := new(internal_provider_simulation.SimulationClient) 167 err := simulationClient.SetupClient(p) 168 if err != nil { 169 return err 170 } 171 internal_utils.LogInfo("c2ec", "setup the Simulation provider") 172 } 173 } 174 175 if p.Name == "Wallee" { 176 177 walleeClient := new(internal_provider_wallee.WalleeClient) 178 err := walleeClient.SetupClient(p) 179 if err != nil { 180 return err 181 } 182 internal_utils.LogInfo("c2ec", "setup the Wallee provider") 183 } 184 185 // For new added provider, add the respective if-clause 186 } 187 188 for _, p := range config.CONFIG.Providers { 189 if provider.PROVIDER_CLIENTS[p.Name] == nil { 190 err := errors.New("no provider client initialized for provider " + p.Name) 191 internal_utils.LogError("retrier", err) 192 return err 193 } 194 } 195 196 return nil 197 } 198 199 func setupBankIntegrationRoutes(router *http.ServeMux) { 200 201 router.HandleFunc( 202 GET+BANK_INTEGRATION_API+BANK_INTEGRATION_CONFIG_PATTERN, 203 internal_api.BankIntegrationConfigApi, 204 ) 205 internal_utils.LogInfo("c2ec", "setup "+GET+BANK_INTEGRATION_API+BANK_INTEGRATION_CONFIG_PATTERN) 206 207 router.HandleFunc( 208 GET+BANK_INTEGRATION_API+WITHDRAWAL_OPERATION_BY_WOPID_PATTERN, 209 internal_api.HandleWithdrawalStatus, 210 ) 211 internal_utils.LogInfo("c2ec", "setup "+GET+BANK_INTEGRATION_API+WITHDRAWAL_OPERATION_BY_WOPID_PATTERN) 212 213 router.HandleFunc( 214 POST+BANK_INTEGRATION_API+WITHDRAWAL_OPERATION_BY_WOPID_PATTERN, 215 internal_api.HandleParameterRegistration, 216 ) 217 internal_utils.LogInfo("c2ec", "setup "+POST+BANK_INTEGRATION_API+WITHDRAWAL_OPERATION_BY_WOPID_PATTERN) 218 219 router.HandleFunc( 220 POST+BANK_INTEGRATION_API+WITHDRAWAL_OPERATION_ABORTION_PATTERN, 221 internal_api.HandleWithdrawalAbort, 222 ) 223 internal_utils.LogInfo("c2ec", "setup "+POST+BANK_INTEGRATION_API+WITHDRAWAL_OPERATION_ABORTION_PATTERN) 224 } 225 226 func setupWireGatewayRoutes(router *http.ServeMux) { 227 228 router.HandleFunc( 229 GET+WIRE_GATEWAY_API+WIRE_GATEWAY_CONFIG_PATTERN, 230 internal_api.WireGatewayConfig, 231 ) 232 internal_utils.LogInfo("c2ec", "setup "+GET+WIRE_GATEWAY_API+WIRE_GATEWAY_CONFIG_PATTERN) 233 234 router.HandleFunc( 235 POST+WIRE_GATEWAY_API+WIRE_TRANSFER_PATTERN, 236 internal_api.Transfer, 237 ) 238 internal_utils.LogInfo("c2ec", "setup "+POST+WIRE_GATEWAY_API+WIRE_TRANSFER_PATTERN) 239 240 router.HandleFunc( 241 GET+WIRE_GATEWAY_API+WIRE_HISTORY_INCOMING_PATTERN, 242 internal_api.HistoryIncoming, 243 ) 244 internal_utils.LogInfo("c2ec", "setup "+GET+WIRE_GATEWAY_API+WIRE_HISTORY_INCOMING_PATTERN) 245 246 router.HandleFunc( 247 GET+WIRE_GATEWAY_API+WIRE_HISTORY_OUTGOING_PATTERN, 248 internal_api.HistoryOutgoing, 249 ) 250 internal_utils.LogInfo("c2ec", "setup "+GET+WIRE_GATEWAY_API+WIRE_HISTORY_OUTGOING_PATTERN) 251 252 router.HandleFunc( 253 POST+WIRE_GATEWAY_API+WIRE_ADMIN_ADD_INCOMING_PATTERN, 254 internal_api.AdminAddIncoming, 255 ) 256 internal_utils.LogInfo("c2ec", "setup "+POST+WIRE_GATEWAY_API+WIRE_ADMIN_ADD_INCOMING_PATTERN) 257 } 258 259 func setupTerminalRoutes(router *http.ServeMux) { 260 261 router.HandleFunc( 262 GET+TERMINAL_API_CONFIG, 263 internal_api.HandleTerminalConfig, 264 ) 265 internal_utils.LogInfo("c2ec", "setup "+GET+TERMINAL_API_CONFIG) 266 267 router.HandleFunc( 268 POST+TERMINAL_API_REGISTER_WITHDRAWAL, 269 internal_api.HandleWithdrawalSetup, 270 ) 271 internal_utils.LogInfo("c2ec", "setup "+POST+TERMINAL_API_REGISTER_WITHDRAWAL) 272 273 router.HandleFunc( 274 POST+TERMINAL_API_CHECK_WITHDRAWAL, 275 internal_api.HandleWithdrawalCheck, 276 ) 277 internal_utils.LogInfo("c2ec", "setup "+POST+TERMINAL_API_CHECK_WITHDRAWAL) 278 279 router.HandleFunc( 280 GET+TERMINAL_API_WITHDRAWAL_STATUS, 281 internal_api.HandleWithdrawalStatusTerminal, 282 ) 283 internal_utils.LogInfo("c2ec", "setup "+GET+TERMINAL_API_WITHDRAWAL_STATUS) 284 285 router.HandleFunc( 286 DELETE+TERMINAL_API_ABORT_WITHDRAWAL, 287 internal_api.HandleWithdrawalAbortTerminal, 288 ) 289 internal_utils.LogInfo("c2ec", "setup "+DELETE+TERMINAL_API_ABORT_WITHDRAWAL) 290 } 291 292 func setupAgplRoutes(router *http.ServeMux) { 293 294 router.HandleFunc( 295 GET+"/agpl", 296 internal_api.Agpl, 297 ) 298 internal_utils.LogInfo("c2ec", "setup "+GET+"/agpl") 299 } 300 301 func startListening(router *http.ServeMux, errs chan error) { 302 303 server := http.Server{ 304 Handler: internal_utils.LoggingHandler(router), 305 } 306 307 if config.CONFIG.Server.UseUnixDomainSocket { 308 309 internal_utils.LogInfo("c2ec", "using domain sockets") 310 socket, err := net.Listen("unix", config.CONFIG.Server.UnixSocketPath) 311 if err != nil { 312 panic("failed listening on socket: " + err.Error()) 313 } 314 315 // cleans up socket when process fails and is shutdown. 316 c := make(chan os.Signal, 1) 317 signal.Notify(c, os.Interrupt, syscall.SIGTERM) 318 go func() { 319 <-c 320 os.Remove(config.CONFIG.Server.UnixSocketPath) 321 os.Exit(1) 322 }() 323 324 go func() { 325 internal_utils.LogInfo("c2ec", "serving at unix-domain-socket "+server.Addr) 326 if err = server.Serve(socket); err != nil { 327 errs <- err 328 } 329 }() 330 } else { 331 332 internal_utils.LogInfo("c2ec", "using tcp") 333 go func() { 334 server.Addr = fmt.Sprintf("%s:%d", config.CONFIG.Server.Host, config.CONFIG.Server.Port) 335 internal_utils.LogInfo("c2ec", "serving at "+server.Addr) 336 if err := server.ListenAndServe(); err != nil { 337 internal_utils.LogError("c2ec", err) 338 errs <- err 339 } 340 }() 341 } 342 }