jobqueue: add DequeueByID

We will soon need to dequeue a job using its ID. This commit adds ability
to do that to the Jobqueue interface. As always, the fsjobqueue implementation
is slightly naive but it should fine for the usecases that it's designed for.

Signed-off-by: Ondřej Budai <ondrej@budai.cz>
This commit is contained in:
Ondřej Budai 2021-11-12 11:36:44 +01:00 committed by Sanne Raymaekers
parent 2ecc48727f
commit d3a3dbafed
4 changed files with 145 additions and 4 deletions

View file

@ -18,8 +18,9 @@ import (
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/osbuild/osbuild-composer/internal/jobqueue"
logrus "github.com/sirupsen/logrus"
"github.com/osbuild/osbuild-composer/internal/jobqueue"
)
const (
@ -42,6 +43,18 @@ const (
)
RETURNING id, token, type, args`
sqlDequeueByID = `
UPDATE jobs
SET token = $1, started_at = now()
WHERE id = (
SELECT id
FROM ready_jobs
WHERE id = $2
LIMIT 1
FOR UPDATE SKIP LOCKED
)
RETURNING token, type, args`
sqlInsertDependency = `INSERT INTO job_dependencies VALUES ($1, $2)`
sqlQueryDependencies = `
SELECT dependency_id
@ -208,6 +221,44 @@ func (q *dbJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID,
return id, token, dependencies, jobType, args, nil
}
func (q *dbJobQueue) DequeueByID(ctx context.Context, id uuid.UUID) (uuid.UUID, []uuid.UUID, string, json.RawMessage, error) {
// Return early if the context is already canceled.
if err := ctx.Err(); err != nil {
return uuid.Nil, nil, "", nil, jobqueue.ErrDequeueTimeout
}
conn, err := q.pool.Acquire(ctx)
if err != nil {
return uuid.Nil, nil, "", nil, fmt.Errorf("error connecting to database: %v", err)
}
defer conn.Release()
var jobType string
var args json.RawMessage
token := uuid.New()
err = conn.QueryRow(ctx, sqlDequeueByID, token, id).Scan(&token, &jobType, &args)
if err == pgx.ErrNoRows {
return uuid.Nil, nil, "", nil, jobqueue.ErrNotPending
} else if err != nil {
return uuid.Nil, nil, "", nil, fmt.Errorf("error dequeuing job: %v", err)
}
// insert heartbeat
_, err = conn.Exec(ctx, sqlInsertHeartbeat, token, id)
if err != nil {
return uuid.Nil, nil, "", nil, fmt.Errorf("error inserting the job's heartbeat: %v", err)
}
dependencies, err := q.jobDependencies(ctx, conn, id)
if err != nil {
return uuid.Nil, nil, "", nil, fmt.Errorf("error querying the job's dependencies: %v", err)
}
logrus.Infof("Dequeued job of type %v with ID %s", jobType, id)
return token, dependencies, jobType, args, nil
}
func (q *dbJobQueue) FinishJob(id uuid.UUID, result interface{}) error {
conn, err := q.pool.Acquire(context.Background())

View file

@ -194,7 +194,7 @@ func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID,
chans = append(chans, c)
}
// Loop until finding a non-canceled job.
// Loop until finding a non-canceled job and pending.
var j *job
for {
// Unlock the mutex while polling channels, so that multiple goroutines
@ -215,7 +215,8 @@ func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID,
return uuid.Nil, uuid.Nil, nil, "", nil, err
}
if !j.Canceled {
// jobs must be non-cancelled and pending
if !j.Canceled && j.StartedAt.IsZero() {
break
}
}
@ -234,6 +235,41 @@ func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID,
return j.Id, j.Token, j.Dependencies, j.Type, j.Args, nil
}
func (q *fsJobQueue) DequeueByID(ctx context.Context, id uuid.UUID) (uuid.UUID, []uuid.UUID, string, json.RawMessage, error) {
q.mu.Lock()
defer q.mu.Unlock()
j, err := q.readJob(id)
if err != nil {
return uuid.Nil, nil, "", nil, err
}
if !j.StartedAt.IsZero() {
return uuid.Nil, nil, "", nil, jobqueue.ErrNotPending
}
depsFinished, err := q.hasAllFinishedDependencies(j)
if err != nil {
return uuid.Nil, nil, "", nil, err
}
if !depsFinished {
return uuid.Nil, nil, "", nil, jobqueue.ErrNotPending
}
j.StartedAt = time.Now()
j.Token = uuid.New()
q.jobIdByToken[j.Token] = j.Id
q.heartbeats[j.Token] = time.Now()
err = q.db.Write(j.Id.String(), j)
if err != nil {
return uuid.Nil, nil, "", nil, fmt.Errorf("error writing job %s: %v", j.Id, err)
}
return j.Token, j.Dependencies, j.Type, j.Args, nil
}
func (q *fsJobQueue) FinishJob(id uuid.UUID, result interface{}) error {
q.mu.Lock()
defer q.mu.Unlock()

View file

@ -39,10 +39,16 @@ type JobQueue interface {
// Waits until a job with a type of any of `jobTypes` is available, or `ctx` is
// canceled.
//
// Returns the job's id, dependencies, type, and arguments, or an error. Arguments
// Returns the job's id, token, dependencies, type, and arguments, or an error. Arguments
// can be unmarshaled to the type given in Enqueue().
Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, uuid.UUID, []uuid.UUID, string, json.RawMessage, error)
// Dequeues a pending job by its ID in a non-blocking way.
//
// Returns the job's token, dependencies, type, and arguments, or an error. Arguments
// can be unmarshaled to the type given in Enqueue().
DequeueByID(ctx context.Context, id uuid.UUID) (uuid.UUID, []uuid.UUID, string, json.RawMessage, error)
// Mark the job with `id` as finished. `result` must fit the associated
// job type and must be serializable to JSON.
FinishJob(id uuid.UUID, result interface{}) error
@ -75,6 +81,7 @@ type JobQueue interface {
var (
ErrNotExist = errors.New("job does not exist")
ErrNotPending = errors.New("job is not pending")
ErrNotRunning = errors.New("job is not running")
ErrCanceled = errors.New("job ws canceled")
ErrDequeueTimeout = errors.New("dequeue context timed out or was canceled")

View file

@ -40,6 +40,7 @@ func TestJobQueue(t *testing.T, makeJobQueue MakeJobQueue) {
t.Run("multiple-workers-single-job-type", wrap(testMultipleWorkersSingleJobType))
t.Run("heartbeats", wrap(testHeartbeats))
t.Run("timeout", wrap(testDequeueTimeout))
t.Run("dequeue-by-id", wrap(testDequeueByID))
}
func pushTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, args interface{}, dependencies []uuid.UUID) uuid.UUID {
@ -406,3 +407,49 @@ func testHeartbeats(t *testing.T, q jobqueue.JobQueue) {
_, err = q.IdFromToken(tok)
require.Equal(t, err, jobqueue.ErrNotExist)
}
func testDequeueByID(t *testing.T, q jobqueue.JobQueue) {
t.Run("basic", func(t *testing.T) {
one := pushTestJob(t, q, "octopus", nil, nil)
two := pushTestJob(t, q, "octopus", nil, nil)
tok, d, typ, args, err := q.DequeueByID(context.Background(), one)
require.NoError(t, err)
require.NotEmpty(t, tok)
require.Empty(t, d)
require.Equal(t, "octopus", typ)
require.NotNil(t, args)
err = q.FinishJob(one, nil)
require.NoError(t, err)
require.Equal(t, two, finishNextTestJob(t, q, "octopus", testResult{}, nil))
})
t.Run("cannot dequeue a job without finished deps", func(t *testing.T) {
one := pushTestJob(t, q, "octopus", nil, nil)
two := pushTestJob(t, q, "octopus", nil, []uuid.UUID{one})
_, _, _, _, err := q.DequeueByID(context.Background(), two)
require.Equal(t, jobqueue.ErrNotPending, err)
require.Equal(t, one, finishNextTestJob(t, q, "octopus", testResult{}, nil))
require.Equal(t, two, finishNextTestJob(t, q, "octopus", testResult{}, []uuid.UUID{one}))
})
t.Run("cannot dequeue a non-pending job", func(t *testing.T) {
one := pushTestJob(t, q, "octopus", nil, nil)
_, _, _, _, _, err := q.Dequeue(context.Background(), []string{"octopus"})
require.NoError(t, err)
_, _, _, _, err = q.DequeueByID(context.Background(), one)
require.Equal(t, jobqueue.ErrNotPending, err)
err = q.FinishJob(one, nil)
require.NoError(t, err)
_, _, _, _, err = q.DequeueByID(context.Background(), one)
require.Equal(t, jobqueue.ErrNotPending, err)
})
}