summaryrefslogtreecommitdiff
path: root/c2ec/listener.go
diff options
context:
space:
mode:
Diffstat (limited to 'c2ec/listener.go')
-rw-r--r--c2ec/listener.go47
1 files changed, 47 insertions, 0 deletions
diff --git a/c2ec/listener.go b/c2ec/listener.go
new file mode 100644
index 0000000..9e53602
--- /dev/null
+++ b/c2ec/listener.go
@@ -0,0 +1,47 @@
+package main
+
+import (
+ "context"
+ "errors"
+)
+
+func RunListener(
+ ctx context.Context,
+ channel string,
+ callback func(*Notification, chan error),
+ notifications chan *Notification,
+ errs chan error,
+) {
+
+ listenFunc, err := DB.NewListener(channel, notifications)
+ if err != nil {
+ LogError("listener", err)
+ errs <- errors.New("failed setting up listener")
+ return
+ }
+
+ go func() {
+ LogInfo("listener", "listener starts listening for notifications at the db for channel="+channel)
+ err := listenFunc(ctx)
+ if err != nil {
+ LogError("listener", err)
+ errs <- err
+ }
+ close(notifications)
+ close(errs)
+ }()
+
+ // Listen is started async. We can therefore block here and must
+ // not run the retrieval logic in own goroutine
+ for {
+ select {
+ case notification := <-notifications:
+ // the dispatching and further processing can be done asynchronously
+ // thus not blocking further incoming notifications.
+ go callback(notification, errs)
+ case <-ctx.Done():
+ errs <- ctx.Err()
+ return
+ }
+ }
+}