From 187eb188da22c4ebff544a658e1954f4159e2f9b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Budai?= Date: Fri, 18 Mar 2022 10:43:51 +0100 Subject: [PATCH] dbjoqbqueue: wait for listener to become ready before returning from New MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Otherwise, there might be an already waiting dequeuer and if something is enqueued before `sqlListen` is called, we will lost this notification. Also, a small log message was added when shutting down the listener. Signed-off-by: Ondřej Budai --- internal/jobqueue/dbjobqueue/dbjobqueue.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/internal/jobqueue/dbjobqueue/dbjobqueue.go b/internal/jobqueue/dbjobqueue/dbjobqueue.go index 46d1a4e6d..be8da0af7 100644 --- a/internal/jobqueue/dbjobqueue/dbjobqueue.go +++ b/internal/jobqueue/dbjobqueue/dbjobqueue.go @@ -186,12 +186,16 @@ func New(url string) (*DBJobQueue, error) { stopListener: cancel, } - go q.listen(listenContext) + listenerReady := make(chan struct{}) + go q.listen(listenContext, listenerReady) + + // wait for the listener to become ready + <-listenerReady return q, nil } -func (q *DBJobQueue) listen(ctx context.Context) { +func (q *DBJobQueue) listen(ctx context.Context, ready chan<- struct{}) { conn, err := q.pool.Acquire(ctx) if err != nil { panic(fmt.Errorf("error connecting to database: %v", err)) @@ -209,11 +213,14 @@ func (q *DBJobQueue) listen(ctx context.Context) { panic(fmt.Errorf("error listening on jobs channel: %v", err)) } + ready <- struct{}{} + for { _, err = conn.Conn().WaitForNotification(ctx) if err != nil { // shutdown the listener if the context is canceled if errors.Is(err, context.Canceled) { + logrus.Info("Shutting down the listener") return }