diff --git a/internal/jobqueue/dbjobqueue/dbjobqueue.go b/internal/jobqueue/dbjobqueue/dbjobqueue.go index 46d1a4e6d..be8da0af7 100644 --- a/internal/jobqueue/dbjobqueue/dbjobqueue.go +++ b/internal/jobqueue/dbjobqueue/dbjobqueue.go @@ -186,12 +186,16 @@ func New(url string) (*DBJobQueue, error) { stopListener: cancel, } - go q.listen(listenContext) + listenerReady := make(chan struct{}) + go q.listen(listenContext, listenerReady) + + // wait for the listener to become ready + <-listenerReady return q, nil } -func (q *DBJobQueue) listen(ctx context.Context) { +func (q *DBJobQueue) listen(ctx context.Context, ready chan<- struct{}) { conn, err := q.pool.Acquire(ctx) if err != nil { panic(fmt.Errorf("error connecting to database: %v", err)) @@ -209,11 +213,14 @@ func (q *DBJobQueue) listen(ctx context.Context) { panic(fmt.Errorf("error listening on jobs channel: %v", err)) } + ready <- struct{}{} + for { _, err = conn.Conn().WaitForNotification(ctx) if err != nil { // shutdown the listener if the context is canceled if errors.Is(err, context.Canceled) { + logrus.Info("Shutting down the listener") return }