dbjobqueue: acquire a new connection for each listen query
This fixes a bug where the listen function would keep trying to use a closed, unrecoverable connection to listen for a notification. This continued failing, which essentially made the queue instance useless.
This commit is contained in:
parent
63f1b8d9fb
commit
26b8e2ff6e
1 changed files with 35 additions and 21 deletions
|
|
@ -199,8 +199,33 @@ func NewWithConfig(url string, config Config) (*DBJobQueue, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *DBJobQueue) listen(ctx context.Context, ready chan<- struct{}) {
|
func (q *DBJobQueue) listen(ctx context.Context, ready chan<- struct{}) {
|
||||||
|
ready <- struct{}{}
|
||||||
|
|
||||||
|
for {
|
||||||
|
err := q.waitAndNotify(ctx)
|
||||||
|
if err != nil {
|
||||||
|
// shutdown the listener if the context is canceled
|
||||||
|
if errors.Is(err, context.Canceled) {
|
||||||
|
q.logger.Info("Shutting down the listener")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// otherwise, just log the error and continue, there might just
|
||||||
|
// be a temporary networking issue
|
||||||
|
q.logger.Error(err, "Error waiting for notification on jobs channel")
|
||||||
|
|
||||||
|
// backoff to avoid log spam
|
||||||
|
time.Sleep(time.Millisecond * 500)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *DBJobQueue) waitAndNotify(ctx context.Context) error {
|
||||||
conn, err := q.pool.Acquire(ctx)
|
conn, err := q.pool.Acquire(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if errors.Is(err, context.Canceled) {
|
||||||
|
return err
|
||||||
|
}
|
||||||
panic(fmt.Errorf("error connecting to database: %v", err))
|
panic(fmt.Errorf("error connecting to database: %v", err))
|
||||||
}
|
}
|
||||||
defer func() {
|
defer func() {
|
||||||
|
|
@ -214,31 +239,20 @@ func (q *DBJobQueue) listen(ctx context.Context, ready chan<- struct{}) {
|
||||||
|
|
||||||
_, err = conn.Exec(ctx, sqlListen)
|
_, err = conn.Exec(ctx, sqlListen)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if errors.Is(err, context.Canceled) {
|
||||||
|
return err
|
||||||
|
}
|
||||||
panic(fmt.Errorf("error listening on jobs channel: %v", err))
|
panic(fmt.Errorf("error listening on jobs channel: %v", err))
|
||||||
}
|
}
|
||||||
|
|
||||||
ready <- struct{}{}
|
_, err = conn.Conn().WaitForNotification(ctx)
|
||||||
|
if err != nil {
|
||||||
for {
|
return err
|
||||||
_, err = conn.Conn().WaitForNotification(ctx)
|
|
||||||
if err != nil {
|
|
||||||
// shutdown the listener if the context is canceled
|
|
||||||
if errors.Is(err, context.Canceled) {
|
|
||||||
q.logger.Info("Shutting down the listener")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// otherwise, just log the error and continue, there might just
|
|
||||||
// be a temporary networking issue
|
|
||||||
q.logger.Error(err, "Error waiting for notification on jobs channel")
|
|
||||||
// backoff to avoid log spam
|
|
||||||
time.Sleep(time.Millisecond * 500)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// something happened in the database, notify all dequeuers
|
|
||||||
q.dequeuers.notifyAll()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// something happened in the database, notify all dequeuers
|
||||||
|
q.dequeuers.notifyAll()
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *DBJobQueue) Close() {
|
func (q *DBJobQueue) Close() {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue