From 26b8e2ff6ebdf06ecd71f23218b6dfee0bb7304c Mon Sep 17 00:00:00 2001
From: Sanne Raymaekers
Date: Thu, 3 Nov 2022 10:58:31 +0100
Subject: [PATCH] dbjobqueue: acquire a new connection for each listen query

This fixes a bug where the listen function would keep trying to use a
closed, unrecoverable connection to listen for a notification. This
continued failing, which essentially made the queue instance useless.
---
 pkg/jobqueue/dbjobqueue/dbjobqueue.go | 56 +++++++++++++++++----------
 1 file changed, 35 insertions(+), 21 deletions(-)

diff --git a/pkg/jobqueue/dbjobqueue/dbjobqueue.go b/pkg/jobqueue/dbjobqueue/dbjobqueue.go
index 9ec482982..0af916077 100644
--- a/pkg/jobqueue/dbjobqueue/dbjobqueue.go
+++ b/pkg/jobqueue/dbjobqueue/dbjobqueue.go
@@ -199,8 +199,33 @@ func NewWithConfig(url string, config Config) (*DBJobQueue, error) {
 }
 
 func (q *DBJobQueue) listen(ctx context.Context, ready chan<- struct{}) {
+	ready <- struct{}{}
+
+	for {
+		err := q.waitAndNotify(ctx)
+		if err != nil {
+			// shutdown the listener if the context is canceled
+			if errors.Is(err, context.Canceled) {
+				q.logger.Info("Shutting down the listener")
+				return
+			}
+
+			// otherwise, just log the error and continue, there might just
+			// be a temporary networking issue
+			q.logger.Error(err, "Error waiting for notification on jobs channel")
+
+			// backoff to avoid log spam
+			time.Sleep(time.Millisecond * 500)
+		}
+	}
+}
+
+func (q *DBJobQueue) waitAndNotify(ctx context.Context) error {
 	conn, err := q.pool.Acquire(ctx)
 	if err != nil {
+		if errors.Is(err, context.Canceled) {
+			return err
+		}
 		panic(fmt.Errorf("error connecting to database: %v", err))
 	}
 	defer func() {
@@ -214,31 +239,20 @@ func (q *DBJobQueue) listen(ctx context.Context, ready chan<- struct{}) {
 
 	_, err = conn.Exec(ctx, sqlListen)
 	if err != nil {
+		if errors.Is(err, context.Canceled) {
+			return err
+		}
 		panic(fmt.Errorf("error listening on jobs channel: %v", err))
 	}
 
-	ready <- struct{}{}
-
-	for {
-		_, err = conn.Conn().WaitForNotification(ctx)
-		if err != nil {
-			// shutdown the listener if the context is canceled
-			if errors.Is(err, context.Canceled) {
-				q.logger.Info("Shutting down the listener")
-				return
-			}
-
-			// otherwise, just log the error and continue, there might just
-			// be a temporary networking issue
-			q.logger.Error(err, "Error waiting for notification on jobs channel")
-			// backoff to avoid log spam
-			time.Sleep(time.Millisecond * 500)
-			continue
-		}
-
-		// something happened in the database, notify all dequeuers
-		q.dequeuers.notifyAll()
+	_, err = conn.Conn().WaitForNotification(ctx)
+	if err != nil {
+		return err
 	}
+
+	// something happened in the database, notify all dequeuers
+	q.dequeuers.notifyAll()
+	return nil
 }
 
 func (q *DBJobQueue) Close() {