worker: Configurable timeout for RequestJob
This is backwards compatible, as long as the timeout is 0 (never timeout), which is the default. In case of the dbjobqueue the underlying timeout is due to context.Canceled, context.DeadlineExceeded, or net.Error with Timeout() true. For the fsjobqueue only the first two are considered.
This commit is contained in:
parent
9075dbc61d
commit
d25ae71fef
19 changed files with 171 additions and 74 deletions
|
|
@ -14,6 +14,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/jackc/pgconn"
|
||||
"github.com/jackc/pgtype"
|
||||
"github.com/jackc/pgx/v4"
|
||||
"github.com/jackc/pgx/v4/pgxpool"
|
||||
|
|
@ -149,7 +150,7 @@ func (q *dbJobQueue) Enqueue(jobType string, args interface{}, dependencies []uu
|
|||
func (q *dbJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, uuid.UUID, []uuid.UUID, string, json.RawMessage, error) {
|
||||
// Return early if the context is already canceled.
|
||||
if err := ctx.Err(); err != nil {
|
||||
return uuid.Nil, uuid.Nil, nil, "", nil, err
|
||||
return uuid.Nil, uuid.Nil, nil, "", nil, jobqueue.ErrDequeueTimeout
|
||||
}
|
||||
|
||||
conn, err := q.pool.Acquire(ctx)
|
||||
|
|
@ -183,6 +184,9 @@ func (q *dbJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID,
|
|||
}
|
||||
_, err = conn.Conn().WaitForNotification(ctx)
|
||||
if err != nil {
|
||||
if pgconn.Timeout(err) {
|
||||
return uuid.Nil, uuid.Nil, nil, "", nil, jobqueue.ErrDequeueTimeout
|
||||
}
|
||||
return uuid.Nil, uuid.Nil, nil, "", nil, fmt.Errorf("error waiting for notification on jobs channel: %v", err)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -178,7 +178,7 @@ func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID,
|
|||
|
||||
// Return early if the context is already canceled.
|
||||
if err := ctx.Err(); err != nil {
|
||||
return uuid.Nil, uuid.Nil, nil, "", nil, err
|
||||
return uuid.Nil, uuid.Nil, nil, "", nil, jobqueue.ErrDequeueTimeout
|
||||
}
|
||||
|
||||
// Filter q.pending by the `jobTypes`. Ignore those job types that this
|
||||
|
|
@ -212,6 +212,9 @@ func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID,
|
|||
}
|
||||
|
||||
if err != nil {
|
||||
if errors.As(err, &context.Canceled) || errors.As(err, &context.DeadlineExceeded) {
|
||||
return uuid.Nil, uuid.Nil, nil, "", nil, jobqueue.ErrDequeueTimeout
|
||||
}
|
||||
return uuid.Nil, uuid.Nil, nil, "", nil, err
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -74,7 +74,8 @@ type JobQueue interface {
|
|||
}
|
||||
|
||||
var (
|
||||
ErrNotExist = errors.New("job does not exist")
|
||||
ErrNotRunning = errors.New("job is not running")
|
||||
ErrCanceled = errors.New("job ws canceled")
|
||||
ErrNotExist = errors.New("job does not exist")
|
||||
ErrNotRunning = errors.New("job is not running")
|
||||
ErrCanceled = errors.New("job ws canceled")
|
||||
ErrDequeueTimeout = errors.New("dequeue context timed out or was canceled")
|
||||
)
|
||||
|
|
|
|||
|
|
@ -37,6 +37,7 @@ func TestJobQueue(t *testing.T, makeJobQueue MakeJobQueue) {
|
|||
t.Run("dependencies", wrap(testDependencies))
|
||||
t.Run("multiple-workers", wrap(testMultipleWorkers))
|
||||
t.Run("heartbeats", wrap(testHeartbeats))
|
||||
t.Run("timeout", wrap(testDequeueTimeout))
|
||||
}
|
||||
|
||||
func pushTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, args interface{}, dependencies []uuid.UUID) uuid.UUID {
|
||||
|
|
@ -153,7 +154,7 @@ func testJobTypes(t *testing.T, q jobqueue.JobQueue) {
|
|||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cancel()
|
||||
id, tok, deps, typ, args, err := q.Dequeue(ctx, []string{"zebra"})
|
||||
require.Equal(t, err, context.Canceled)
|
||||
require.Equal(t, err, jobqueue.ErrDequeueTimeout)
|
||||
require.Equal(t, uuid.Nil, id)
|
||||
require.Equal(t, uuid.Nil, tok)
|
||||
require.Empty(t, deps)
|
||||
|
|
@ -161,6 +162,18 @@ func testJobTypes(t *testing.T, q jobqueue.JobQueue) {
|
|||
require.Nil(t, args)
|
||||
}
|
||||
|
||||
func testDequeueTimeout(t *testing.T, q jobqueue.JobQueue) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*20)
|
||||
defer cancel()
|
||||
_, _, _, _, _, err := q.Dequeue(ctx, []string{"octopus"})
|
||||
require.Equal(t, jobqueue.ErrDequeueTimeout, err)
|
||||
|
||||
ctx2, cancel2 := context.WithCancel(context.Background())
|
||||
cancel2()
|
||||
_, _, _, _, _, err = q.Dequeue(ctx2, []string{"octopus"})
|
||||
require.Equal(t, jobqueue.ErrDequeueTimeout, err)
|
||||
}
|
||||
|
||||
func testDependencies(t *testing.T, q jobqueue.JobQueue) {
|
||||
t.Run("done-before-pushing-dependant", func(t *testing.T) {
|
||||
one := pushTestJob(t, q, "test", nil, nil)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue