diff options
Diffstat (limited to 'c2ec/listener.go')
-rw-r--r-- | c2ec/listener.go | 47 |
1 files changed, 47 insertions, 0 deletions
diff --git a/c2ec/listener.go b/c2ec/listener.go new file mode 100644 index 0000000..9e53602 --- /dev/null +++ b/c2ec/listener.go @@ -0,0 +1,47 @@ +package main + +import ( + "context" + "errors" +) + +func RunListener( + ctx context.Context, + channel string, + callback func(*Notification, chan error), + notifications chan *Notification, + errs chan error, +) { + + listenFunc, err := DB.NewListener(channel, notifications) + if err != nil { + LogError("listener", err) + errs <- errors.New("failed setting up listener") + return + } + + go func() { + LogInfo("listener", "listener starts listening for notifications at the db for channel="+channel) + err := listenFunc(ctx) + if err != nil { + LogError("listener", err) + errs <- err + } + close(notifications) + close(errs) + }() + + // Listen is started async. We can therefore block here and must + // not run the retrieval logic in own goroutine + for { + select { + case notification := <-notifications: + // the dispatching and further processing can be done asynchronously + // thus not blocking further incoming notifications. + go callback(notification, errs) + case <-ctx.Done(): + errs <- ctx.Err() + return + } + } +} |