dbjoqbqueue: wait for listener to become ready before returning from New
Otherwise, there might be an already waiting dequeuer and if something is enqueued before `sqlListen` is called, we will lost this notification. Also, a small log message was added when shutting down the listener. Signed-off-by: Ondřej Budai <ondrej@budai.cz>
This commit is contained in:
parent
c21596cd99
commit
187eb188da
1 changed files with 9 additions and 2 deletions
|
|
@ -186,12 +186,16 @@ func New(url string) (*DBJobQueue, error) {
|
|||
stopListener: cancel,
|
||||
}
|
||||
|
||||
go q.listen(listenContext)
|
||||
listenerReady := make(chan struct{})
|
||||
go q.listen(listenContext, listenerReady)
|
||||
|
||||
// wait for the listener to become ready
|
||||
<-listenerReady
|
||||
|
||||
return q, nil
|
||||
}
|
||||
|
||||
func (q *DBJobQueue) listen(ctx context.Context) {
|
||||
func (q *DBJobQueue) listen(ctx context.Context, ready chan<- struct{}) {
|
||||
conn, err := q.pool.Acquire(ctx)
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("error connecting to database: %v", err))
|
||||
|
|
@ -209,11 +213,14 @@ func (q *DBJobQueue) listen(ctx context.Context) {
|
|||
panic(fmt.Errorf("error listening on jobs channel: %v", err))
|
||||
}
|
||||
|
||||
ready <- struct{}{}
|
||||
|
||||
for {
|
||||
_, err = conn.Conn().WaitForNotification(ctx)
|
||||
if err != nil {
|
||||
// shutdown the listener if the context is canceled
|
||||
if errors.Is(err, context.Canceled) {
|
||||
logrus.Info("Shutting down the listener")
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue