diff --git a/pkg/jobqueue/dbjobqueue/dbjobqueue.go b/pkg/jobqueue/dbjobqueue/dbjobqueue.go index 9ec482982..0af916077 100644 --- a/pkg/jobqueue/dbjobqueue/dbjobqueue.go +++ b/pkg/jobqueue/dbjobqueue/dbjobqueue.go @@ -199,8 +199,33 @@ func NewWithConfig(url string, config Config) (*DBJobQueue, error) { } func (q *DBJobQueue) listen(ctx context.Context, ready chan<- struct{}) { + ready <- struct{}{} + + for { + err := q.waitAndNotify(ctx) + if err != nil { + // shutdown the listener if the context is canceled + if errors.Is(err, context.Canceled) { + q.logger.Info("Shutting down the listener") + return + } + + // otherwise, just log the error and continue, there might just + // be a temporary networking issue + q.logger.Error(err, "Error waiting for notification on jobs channel") + + // backoff to avoid log spam + time.Sleep(time.Millisecond * 500) + } + } +} + +func (q *DBJobQueue) waitAndNotify(ctx context.Context) error { conn, err := q.pool.Acquire(ctx) if err != nil { + if errors.Is(err, context.Canceled) { + return err + } panic(fmt.Errorf("error connecting to database: %v", err)) } defer func() { @@ -214,31 +239,20 @@ func (q *DBJobQueue) listen(ctx context.Context, ready chan<- struct{}) { _, err = conn.Exec(ctx, sqlListen) if err != nil { + if errors.Is(err, context.Canceled) { + return err + } panic(fmt.Errorf("error listening on jobs channel: %v", err)) } - ready <- struct{}{} - - for { - _, err = conn.Conn().WaitForNotification(ctx) - if err != nil { - // shutdown the listener if the context is canceled - if errors.Is(err, context.Canceled) { - q.logger.Info("Shutting down the listener") - return - } - - // otherwise, just log the error and continue, there might just - // be a temporary networking issue - q.logger.Error(err, "Error waiting for notification on jobs channel") - // backoff to avoid log spam - time.Sleep(time.Millisecond * 500) - continue - } - - // something happened in the database, notify all dequeuers - q.dequeuers.notifyAll() + _, err = conn.Conn().WaitForNotification(ctx) + if err != nil { + return err } + + // something happened in the database, notify all dequeuers + q.dequeuers.notifyAll() + return nil } func (q *DBJobQueue) Close() {