From d3a3dbafede396526e805daa6544f1bf6292af29 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Budai?= Date: Fri, 12 Nov 2021 11:36:44 +0100 Subject: [PATCH] jobqueue: add DequeueByID MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit We will soon need to dequeue a job using its ID. This commit adds ability to do that to the Jobqueue interface. As always, the fsjobqueue implementation is slightly naive but it should fine for the usecases that it's designed for. Signed-off-by: Ondřej Budai --- internal/jobqueue/dbjobqueue/dbjobqueue.go | 53 ++++++++++++++++++- internal/jobqueue/fsjobqueue/fsjobqueue.go | 40 +++++++++++++- internal/jobqueue/jobqueue.go | 9 +++- .../jobqueue/jobqueuetest/jobqueuetest.go | 47 ++++++++++++++++ 4 files changed, 145 insertions(+), 4 deletions(-) diff --git a/internal/jobqueue/dbjobqueue/dbjobqueue.go b/internal/jobqueue/dbjobqueue/dbjobqueue.go index 89c9ffa81..98f7ef9f5 100644 --- a/internal/jobqueue/dbjobqueue/dbjobqueue.go +++ b/internal/jobqueue/dbjobqueue/dbjobqueue.go @@ -18,8 +18,9 @@ import ( "github.com/jackc/pgx/v4" "github.com/jackc/pgx/v4/pgxpool" - "github.com/osbuild/osbuild-composer/internal/jobqueue" logrus "github.com/sirupsen/logrus" + + "github.com/osbuild/osbuild-composer/internal/jobqueue" ) const ( @@ -42,6 +43,18 @@ const ( ) RETURNING id, token, type, args` + sqlDequeueByID = ` + UPDATE jobs + SET token = $1, started_at = now() + WHERE id = ( + SELECT id + FROM ready_jobs + WHERE id = $2 + LIMIT 1 + FOR UPDATE SKIP LOCKED + ) + RETURNING token, type, args` + sqlInsertDependency = `INSERT INTO job_dependencies VALUES ($1, $2)` sqlQueryDependencies = ` SELECT dependency_id @@ -208,6 +221,44 @@ func (q *dbJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, return id, token, dependencies, jobType, args, nil } +func (q *dbJobQueue) DequeueByID(ctx context.Context, id uuid.UUID) (uuid.UUID, []uuid.UUID, string, json.RawMessage, error) { + // Return early if the context is already canceled. + if err := ctx.Err(); err != nil { + return uuid.Nil, nil, "", nil, jobqueue.ErrDequeueTimeout + } + + conn, err := q.pool.Acquire(ctx) + if err != nil { + return uuid.Nil, nil, "", nil, fmt.Errorf("error connecting to database: %v", err) + } + defer conn.Release() + + var jobType string + var args json.RawMessage + token := uuid.New() + + err = conn.QueryRow(ctx, sqlDequeueByID, token, id).Scan(&token, &jobType, &args) + if err == pgx.ErrNoRows { + return uuid.Nil, nil, "", nil, jobqueue.ErrNotPending + } else if err != nil { + return uuid.Nil, nil, "", nil, fmt.Errorf("error dequeuing job: %v", err) + } + + // insert heartbeat + _, err = conn.Exec(ctx, sqlInsertHeartbeat, token, id) + if err != nil { + return uuid.Nil, nil, "", nil, fmt.Errorf("error inserting the job's heartbeat: %v", err) + } + + dependencies, err := q.jobDependencies(ctx, conn, id) + if err != nil { + return uuid.Nil, nil, "", nil, fmt.Errorf("error querying the job's dependencies: %v", err) + } + + logrus.Infof("Dequeued job of type %v with ID %s", jobType, id) + + return token, dependencies, jobType, args, nil +} func (q *dbJobQueue) FinishJob(id uuid.UUID, result interface{}) error { conn, err := q.pool.Acquire(context.Background()) diff --git a/internal/jobqueue/fsjobqueue/fsjobqueue.go b/internal/jobqueue/fsjobqueue/fsjobqueue.go index 4094123f2..f3389ee11 100644 --- a/internal/jobqueue/fsjobqueue/fsjobqueue.go +++ b/internal/jobqueue/fsjobqueue/fsjobqueue.go @@ -194,7 +194,7 @@ func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, chans = append(chans, c) } - // Loop until finding a non-canceled job. + // Loop until finding a non-canceled job and pending. var j *job for { // Unlock the mutex while polling channels, so that multiple goroutines @@ -215,7 +215,8 @@ func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, return uuid.Nil, uuid.Nil, nil, "", nil, err } - if !j.Canceled { + // jobs must be non-cancelled and pending + if !j.Canceled && j.StartedAt.IsZero() { break } } @@ -234,6 +235,41 @@ func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, return j.Id, j.Token, j.Dependencies, j.Type, j.Args, nil } +func (q *fsJobQueue) DequeueByID(ctx context.Context, id uuid.UUID) (uuid.UUID, []uuid.UUID, string, json.RawMessage, error) { + q.mu.Lock() + defer q.mu.Unlock() + + j, err := q.readJob(id) + if err != nil { + return uuid.Nil, nil, "", nil, err + } + + if !j.StartedAt.IsZero() { + return uuid.Nil, nil, "", nil, jobqueue.ErrNotPending + } + + depsFinished, err := q.hasAllFinishedDependencies(j) + if err != nil { + return uuid.Nil, nil, "", nil, err + } + if !depsFinished { + return uuid.Nil, nil, "", nil, jobqueue.ErrNotPending + } + + j.StartedAt = time.Now() + + j.Token = uuid.New() + q.jobIdByToken[j.Token] = j.Id + q.heartbeats[j.Token] = time.Now() + + err = q.db.Write(j.Id.String(), j) + if err != nil { + return uuid.Nil, nil, "", nil, fmt.Errorf("error writing job %s: %v", j.Id, err) + } + + return j.Token, j.Dependencies, j.Type, j.Args, nil +} + func (q *fsJobQueue) FinishJob(id uuid.UUID, result interface{}) error { q.mu.Lock() defer q.mu.Unlock() diff --git a/internal/jobqueue/jobqueue.go b/internal/jobqueue/jobqueue.go index 50fe92d8c..047ed7857 100644 --- a/internal/jobqueue/jobqueue.go +++ b/internal/jobqueue/jobqueue.go @@ -39,10 +39,16 @@ type JobQueue interface { // Waits until a job with a type of any of `jobTypes` is available, or `ctx` is // canceled. // - // Returns the job's id, dependencies, type, and arguments, or an error. Arguments + // Returns the job's id, token, dependencies, type, and arguments, or an error. Arguments // can be unmarshaled to the type given in Enqueue(). Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, uuid.UUID, []uuid.UUID, string, json.RawMessage, error) + // Dequeues a pending job by its ID in a non-blocking way. + // + // Returns the job's token, dependencies, type, and arguments, or an error. Arguments + // can be unmarshaled to the type given in Enqueue(). + DequeueByID(ctx context.Context, id uuid.UUID) (uuid.UUID, []uuid.UUID, string, json.RawMessage, error) + // Mark the job with `id` as finished. `result` must fit the associated // job type and must be serializable to JSON. FinishJob(id uuid.UUID, result interface{}) error @@ -75,6 +81,7 @@ type JobQueue interface { var ( ErrNotExist = errors.New("job does not exist") + ErrNotPending = errors.New("job is not pending") ErrNotRunning = errors.New("job is not running") ErrCanceled = errors.New("job ws canceled") ErrDequeueTimeout = errors.New("dequeue context timed out or was canceled") diff --git a/internal/jobqueue/jobqueuetest/jobqueuetest.go b/internal/jobqueue/jobqueuetest/jobqueuetest.go index 4e17b72a3..9d6b629fe 100644 --- a/internal/jobqueue/jobqueuetest/jobqueuetest.go +++ b/internal/jobqueue/jobqueuetest/jobqueuetest.go @@ -40,6 +40,7 @@ func TestJobQueue(t *testing.T, makeJobQueue MakeJobQueue) { t.Run("multiple-workers-single-job-type", wrap(testMultipleWorkersSingleJobType)) t.Run("heartbeats", wrap(testHeartbeats)) t.Run("timeout", wrap(testDequeueTimeout)) + t.Run("dequeue-by-id", wrap(testDequeueByID)) } func pushTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, args interface{}, dependencies []uuid.UUID) uuid.UUID { @@ -406,3 +407,49 @@ func testHeartbeats(t *testing.T, q jobqueue.JobQueue) { _, err = q.IdFromToken(tok) require.Equal(t, err, jobqueue.ErrNotExist) } + +func testDequeueByID(t *testing.T, q jobqueue.JobQueue) { + t.Run("basic", func(t *testing.T) { + one := pushTestJob(t, q, "octopus", nil, nil) + two := pushTestJob(t, q, "octopus", nil, nil) + + tok, d, typ, args, err := q.DequeueByID(context.Background(), one) + require.NoError(t, err) + require.NotEmpty(t, tok) + require.Empty(t, d) + require.Equal(t, "octopus", typ) + require.NotNil(t, args) + + err = q.FinishJob(one, nil) + require.NoError(t, err) + + require.Equal(t, two, finishNextTestJob(t, q, "octopus", testResult{}, nil)) + }) + + t.Run("cannot dequeue a job without finished deps", func(t *testing.T) { + one := pushTestJob(t, q, "octopus", nil, nil) + two := pushTestJob(t, q, "octopus", nil, []uuid.UUID{one}) + + _, _, _, _, err := q.DequeueByID(context.Background(), two) + require.Equal(t, jobqueue.ErrNotPending, err) + + require.Equal(t, one, finishNextTestJob(t, q, "octopus", testResult{}, nil)) + require.Equal(t, two, finishNextTestJob(t, q, "octopus", testResult{}, []uuid.UUID{one})) + }) + + t.Run("cannot dequeue a non-pending job", func(t *testing.T) { + one := pushTestJob(t, q, "octopus", nil, nil) + + _, _, _, _, _, err := q.Dequeue(context.Background(), []string{"octopus"}) + require.NoError(t, err) + + _, _, _, _, err = q.DequeueByID(context.Background(), one) + require.Equal(t, jobqueue.ErrNotPending, err) + + err = q.FinishJob(one, nil) + require.NoError(t, err) + + _, _, _, _, err = q.DequeueByID(context.Background(), one) + require.Equal(t, jobqueue.ErrNotPending, err) + }) +}