diff --git a/internal/jobqueue/fsjobqueue/fsjobqueue.go b/internal/jobqueue/fsjobqueue/fsjobqueue.go index d98171044..ba7f69646 100644 --- a/internal/jobqueue/fsjobqueue/fsjobqueue.go +++ b/internal/jobqueue/fsjobqueue/fsjobqueue.go @@ -40,6 +40,17 @@ type fsJobQueue struct { // Maps job ids to the jobs that depend on it, if any of those // dependants have not yet finished. dependants map[uuid.UUID][]uuid.UUID + + // Currently running jobs. Workers are not handed job ids, but + // independent tokens which serve as an indirection. This enables + // race-free uploading of artifacts and makes restarting composer more + // robust (workers from an old run cannot report results for jobs + // composer thinks are not running). + // This map maps these tokens to job ids. Artifacts are stored in + // `$STATE_DIRECTORY/artifacts/tmp/$TOKEN` while the worker is running, + // and renamed to `$STATE_DIRECTORY/artifacts/$JOB_ID` once the job is + // reported as done. + jobIdByToken map[uuid.UUID]uuid.UUID } // On-disk job struct. Contains all necessary (but non-redundant) information @@ -47,6 +58,7 @@ type fsJobQueue struct { // (de)serialized on each access. type job struct { Id uuid.UUID `json:"id"` + Token uuid.UUID `json:"token"` Type string `json:"type"` Args json.RawMessage `json:"args,omitempty"` Dependencies []uuid.UUID `json:"dependencies"` @@ -68,9 +80,10 @@ const channelSize = 100 // loaded and rescheduled to run if necessary. func New(dir string) (*fsJobQueue, error) { q := &fsJobQueue{ - db: jsondb.New(dir, 0600), - pending: make(map[string]chan uuid.UUID), - dependants: make(map[uuid.UUID][]uuid.UUID), + db: jsondb.New(dir, 0600), + pending: make(map[string]chan uuid.UUID), + dependants: make(map[uuid.UUID][]uuid.UUID), + jobIdByToken: make(map[uuid.UUID]uuid.UUID), } // Look for jobs that are still pending and build the dependant map. @@ -78,15 +91,30 @@ func New(dir string) (*fsJobQueue, error) { if err != nil { return nil, fmt.Errorf("error listing jobs: %v", err) } + for _, id := range ids { - uuid, err := uuid.Parse(id) + jobId, err := uuid.Parse(id) if err != nil { return nil, fmt.Errorf("invalid job '%s' in db: %v", id, err) } - j, err := q.readJob(uuid) + j, err := q.readJob(jobId) if err != nil { return nil, err } + + // If a job is running, and not cancelled, track the token + if !j.StartedAt.IsZero() && j.FinishedAt.IsZero() && !j.Canceled { + // Fail older running jobs which don't have a token stored + if j.Token == uuid.Nil { + err = q.FinishJob(j.Id, nil) + if err != nil { + return nil, fmt.Errorf("Error finishing job '%s' without a token: %v", j.Id, err) + } + } else { + q.jobIdByToken[j.Token] = j.Id + } + } + err = q.maybeEnqueue(j, true) if err != nil { return nil, err @@ -102,6 +130,7 @@ func (q *fsJobQueue) Enqueue(jobType string, args interface{}, dependencies []uu var j = job{ Id: uuid.New(), + Token: uuid.Nil, Type: jobType, Dependencies: dependencies, QueuedAt: time.Now(), @@ -140,13 +169,13 @@ func (q *fsJobQueue) Enqueue(jobType string, args interface{}, dependencies []uu return j.Id, nil } -func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, []uuid.UUID, string, json.RawMessage, error) { +func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, uuid.UUID, []uuid.UUID, string, json.RawMessage, error) { q.mu.Lock() defer q.mu.Unlock() // Return early if the context is already canceled. if err := ctx.Err(); err != nil { - return uuid.Nil, nil, "", nil, err + return uuid.Nil, uuid.Nil, nil, "", nil, err } // Filter q.pending by the `jobTypes`. Ignore those job types that this @@ -180,12 +209,12 @@ func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, } if err != nil { - return uuid.Nil, nil, "", nil, err + return uuid.Nil, uuid.Nil, nil, "", nil, err } j, err = q.readJob(id) if err != nil { - return uuid.Nil, nil, "", nil, err + return uuid.Nil, uuid.Nil, nil, "", nil, err } if !j.Canceled { @@ -195,12 +224,15 @@ func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, j.StartedAt = time.Now() + j.Token = uuid.New() + q.jobIdByToken[j.Token] = j.Id + err := q.db.Write(j.Id.String(), j) if err != nil { - return uuid.Nil, nil, "", nil, fmt.Errorf("error writing job %s: %v", j.Id, err) + return uuid.Nil, uuid.Nil, nil, "", nil, fmt.Errorf("error writing job %s: %v", j.Id, err) } - return j.Id, j.Dependencies, j.Type, j.Args, nil + return j.Id, j.Token, j.Dependencies, j.Type, j.Args, nil } func (q *fsJobQueue) FinishJob(id uuid.UUID, result interface{}) error { @@ -227,6 +259,9 @@ func (q *fsJobQueue) FinishJob(id uuid.UUID, result interface{}) error { return fmt.Errorf("error marshaling result: %v", err) } + delete(q.jobIdByToken, j.Token) + j.Token = uuid.Nil + // Write before notifying dependants, because it will be read again. err = q.db.Write(id.String(), j) if err != nil { @@ -300,6 +335,16 @@ func (q *fsJobQueue) Job(id uuid.UUID) (jobType string, args json.RawMessage, de return } +func (q *fsJobQueue) IdFromToken(token uuid.UUID) (id uuid.UUID, err error) { + q.mu.Lock() + defer q.mu.Unlock() + id, ok := q.jobIdByToken[token] + if !ok { + return uuid.Nil, jobqueue.ErrNotExist + } + return id, nil +} + // Reads job with `id`. This is a thin wrapper around `q.db.Read`, which // returns the job directly, or and error if a job with `id` does not exist. func (q *fsJobQueue) readJob(id uuid.UUID) (*job, error) { diff --git a/internal/jobqueue/fsjobqueue/fsjobqueue_test.go b/internal/jobqueue/fsjobqueue/fsjobqueue_test.go index 1348e4252..cf7dee86a 100644 --- a/internal/jobqueue/fsjobqueue/fsjobqueue_test.go +++ b/internal/jobqueue/fsjobqueue/fsjobqueue_test.go @@ -43,9 +43,10 @@ func pushTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, args interfa } func finishNextTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, result interface{}, deps []uuid.UUID) uuid.UUID { - id, d, typ, args, err := q.Dequeue(context.Background(), []string{jobType}) + id, tok, d, typ, args, err := q.Dequeue(context.Background(), []string{jobType}) require.NoError(t, err) require.NotEmpty(t, id) + require.NotEmpty(t, tok) require.ElementsMatch(t, deps, d) require.Equal(t, jobType, typ) require.NotNil(t, args) @@ -75,6 +76,24 @@ func TestErrors(t *testing.T) { id, err = q.Enqueue("test", "arg0", []uuid.UUID{uuid.New()}) require.Error(t, err) require.Equal(t, uuid.Nil, id) + + // token gets removed + pushTestJob(t, q, "octopus", nil, nil) + id, tok, _, _, _, err := q.Dequeue(context.Background(), []string{"octopus"}) + require.NoError(t, err) + require.NotEmpty(t, tok) + + idFromT, err := q.IdFromToken(tok) + require.NoError(t, err) + require.Equal(t, id, idFromT) + + err = q.FinishJob(id, nil) + require.NoError(t, err) + + // Make sure the token gets removed + id, err = q.IdFromToken(tok) + require.Equal(t, uuid.Nil, id) + require.Equal(t, jobqueue.ErrNotExist, err) } func TestArgs(t *testing.T) { @@ -94,9 +113,10 @@ func TestArgs(t *testing.T) { var parsedArgs argument - id, deps, typ, args, err := q.Dequeue(context.Background(), []string{"octopus"}) + id, tok, deps, typ, args, err := q.Dequeue(context.Background(), []string{"octopus"}) require.NoError(t, err) require.Equal(t, two, id) + require.NotEmpty(t, tok) require.Empty(t, deps) require.Equal(t, "octopus", typ) err = json.Unmarshal(args, &parsedArgs) @@ -110,9 +130,10 @@ func TestArgs(t *testing.T) { require.Equal(t, deps, jdeps) require.Equal(t, typ, jtype) - id, deps, typ, args, err = q.Dequeue(context.Background(), []string{"fish"}) + id, tok, deps, typ, args, err = q.Dequeue(context.Background(), []string{"fish"}) require.NoError(t, err) require.Equal(t, one, id) + require.NotEmpty(t, tok) require.Empty(t, deps) require.Equal(t, "fish", typ) err = json.Unmarshal(args, &parsedArgs) @@ -141,9 +162,10 @@ func TestJobTypes(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) cancel() - id, deps, typ, args, err := q.Dequeue(ctx, []string{"zebra"}) + id, tok, deps, typ, args, err := q.Dequeue(ctx, []string{"zebra"}) require.Equal(t, err, context.Canceled) require.Equal(t, uuid.Nil, id) + require.Equal(t, uuid.Nil, tok) require.Empty(t, deps) require.Equal(t, "", typ) require.Nil(t, args) @@ -229,9 +251,10 @@ func TestMultipleWorkers(t *testing.T) { defer close(done) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - id, deps, typ, args, err := q.Dequeue(ctx, []string{"octopus"}) + id, tok, deps, typ, args, err := q.Dequeue(ctx, []string{"octopus"}) require.NoError(t, err) require.NotEmpty(t, id) + require.NotEmpty(t, tok) require.Empty(t, deps) require.Equal(t, "octopus", typ) require.Equal(t, json.RawMessage("null"), args) @@ -243,9 +266,10 @@ func TestMultipleWorkers(t *testing.T) { // This call to Dequeue() should not block on the one in the goroutine. id := pushTestJob(t, q, "clownfish", nil, nil) - r, deps, typ, args, err := q.Dequeue(context.Background(), []string{"clownfish"}) + r, tok, deps, typ, args, err := q.Dequeue(context.Background(), []string{"clownfish"}) require.NoError(t, err) require.Equal(t, id, r) + require.NotEmpty(t, tok) require.Empty(t, deps) require.Equal(t, "clownfish", typ) require.Equal(t, json.RawMessage("null"), args) @@ -278,9 +302,10 @@ func TestCancel(t *testing.T) { // Cancel a running job, which should not dequeue the canceled job from above id = pushTestJob(t, q, "clownfish", nil, nil) require.NotEmpty(t, id) - r, deps, typ, args, err := q.Dequeue(context.Background(), []string{"clownfish"}) + r, tok, deps, typ, args, err := q.Dequeue(context.Background(), []string{"clownfish"}) require.NoError(t, err) require.Equal(t, id, r) + require.NotEmpty(t, tok) require.Empty(t, deps) require.Equal(t, "clownfish", typ) require.Equal(t, json.RawMessage("null"), args) @@ -296,9 +321,10 @@ func TestCancel(t *testing.T) { // Cancel a finished job, which is a no-op id = pushTestJob(t, q, "clownfish", nil, nil) require.NotEmpty(t, id) - r, deps, typ, args, err = q.Dequeue(context.Background(), []string{"clownfish"}) + r, tok, deps, typ, args, err = q.Dequeue(context.Background(), []string{"clownfish"}) require.NoError(t, err) require.Equal(t, id, r) + require.NotEmpty(t, tok) require.Empty(t, deps) require.Equal(t, "clownfish", typ) require.Equal(t, json.RawMessage("null"), args) diff --git a/internal/jobqueue/jobqueue.go b/internal/jobqueue/jobqueue.go index 6330930b6..7bc217034 100644 --- a/internal/jobqueue/jobqueue.go +++ b/internal/jobqueue/jobqueue.go @@ -41,7 +41,7 @@ type JobQueue interface { // // Returns the job's id, dependencies, type, and arguments, or an error. Arguments // can be unmarshaled to the type given in Enqueue(). - Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, []uuid.UUID, string, json.RawMessage, error) + Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, uuid.UUID, []uuid.UUID, string, json.RawMessage, error) // Mark the job with `id` as finished. `result` must fit the associated // job type and must be serializable to JSON. @@ -62,6 +62,9 @@ type JobQueue interface { // Job returns all the parameters that define a job (everything provided during Enqueue). Job(id uuid.UUID) (jobType string, args json.RawMessage, dependencies []uuid.UUID, err error) + + // Find job by token, this will return an error if the job hasn't been dequeued + IdFromToken(token uuid.UUID) (id uuid.UUID, err error) } var ( diff --git a/internal/kojiapi/server_test.go b/internal/kojiapi/server_test.go index 1ef97a46f..dd6d01d7b 100644 --- a/internal/kojiapi/server_test.go +++ b/internal/kojiapi/server_test.go @@ -235,7 +235,7 @@ func TestCompose(t *testing.T) { wg.Add(1) go func(t *testing.T, result worker.KojiInitJobResult) { - token, _, jobType, rawJob, _, err := workerServer.RequestJob(context.Background(), test_distro.TestArchName, []string{"koji-init"}) + _, token, jobType, rawJob, _, err := workerServer.RequestJob(context.Background(), test_distro.TestArchName, []string{"koji-init"}) require.NoError(t, err) require.Equal(t, "koji-init", jobType) @@ -287,7 +287,7 @@ func TestCompose(t *testing.T) { c.composeReplyCode, c.composeReply, "id") wg.Wait() - token, _, jobType, rawJob, _, err := workerServer.RequestJob(context.Background(), test_distro.TestArchName, []string{"osbuild-koji"}) + _, token, jobType, rawJob, _, err := workerServer.RequestJob(context.Background(), test_distro.TestArchName, []string{"osbuild-koji"}) require.NoError(t, err) require.Equal(t, "osbuild-koji", jobType) @@ -302,7 +302,7 @@ func TestCompose(t *testing.T) { require.NoError(t, err) test.TestRoute(t, workerHandler, false, "PATCH", fmt.Sprintf("/api/worker/v1/jobs/%v", token), string(buildJobResult), http.StatusOK, `{}`) - token, _, jobType, rawJob, _, err = workerServer.RequestJob(context.Background(), test_distro.TestArchName, []string{"osbuild-koji"}) + _, token, jobType, rawJob, _, err = workerServer.RequestJob(context.Background(), test_distro.TestArchName, []string{"osbuild-koji"}) require.NoError(t, err) require.Equal(t, "osbuild-koji", jobType) @@ -324,7 +324,7 @@ func TestCompose(t *testing.T) { } }`, test_distro.TestArchName, test_distro.TestDistroName), http.StatusOK, `{}`) - token, finalizeID, jobType, rawJob, _, err := workerServer.RequestJob(context.Background(), test_distro.TestArchName, []string{"koji-finalize"}) + finalizeID, token, jobType, rawJob, _, err := workerServer.RequestJob(context.Background(), test_distro.TestArchName, []string{"koji-finalize"}) require.NoError(t, err) require.Equal(t, "koji-finalize", jobType) diff --git a/internal/worker/server.go b/internal/worker/server.go index 43f8a6a00..d817af22a 100644 --- a/internal/worker/server.go +++ b/internal/worker/server.go @@ -13,7 +13,6 @@ import ( "os" "path" "strings" - "sync" "time" "github.com/google/uuid" @@ -28,18 +27,6 @@ type Server struct { logger *log.Logger artifactsDir string identityFilter []string - - // Currently running jobs. Workers are not handed job ids, but - // independent tokens which serve as an indirection. This enables - // race-free uploading of artifacts and makes restarting composer more - // robust (workers from an old run cannot report results for jobs - // composer thinks are not running). - // This map maps these tokens to job ids. Artifacts are stored in - // `$STATE_DIRECTORY/artifacts/tmp/$TOKEN` while the worker is running, - // and renamed to `$STATE_DIRECTORY/artifacts/$JOB_ID` once the job is - // reported as done. - running map[uuid.UUID]uuid.UUID - runningMutex sync.Mutex } type JobStatus struct { @@ -49,7 +36,8 @@ type JobStatus struct { Canceled bool } -var ErrTokenNotExist = errors.New("worker token does not exist") +var ErrInvalidToken = errors.New("token does not exist") +var ErrJobNotRunning = errors.New("job isn't running") func NewServer(logger *log.Logger, jobs jobqueue.JobQueue, artifactsDir string, identityFilter []string) *Server { @@ -58,7 +46,6 @@ func NewServer(logger *log.Logger, jobs jobqueue.JobQueue, artifactsDir string, logger: logger, artifactsDir: artifactsDir, identityFilter: identityFilter, - running: make(map[uuid.UUID]uuid.UUID), } } @@ -236,8 +223,6 @@ func (s *Server) DeleteArtifacts(id uuid.UUID) error { } func (s *Server) RequestJob(ctx context.Context, arch string, jobTypes []string) (uuid.UUID, uuid.UUID, string, json.RawMessage, []json.RawMessage, error) { - token := uuid.New() - // treat osbuild jobs specially until we have found a generic way to // specify dequeuing restrictions. For now, we only have one // restriction: arch for osbuild jobs. @@ -249,7 +234,7 @@ func (s *Server) RequestJob(ctx context.Context, arch string, jobTypes []string) jts = append(jts, t) } - jobId, depIDs, jobType, args, err := s.jobs.Dequeue(ctx, jts) + jobId, token, depIDs, jobType, args, err := s.jobs.Dequeue(ctx, jts) if err != nil { return uuid.Nil, uuid.Nil, "", nil, nil, err } @@ -267,47 +252,34 @@ func (s *Server) RequestJob(ctx context.Context, arch string, jobTypes []string) } } - s.runningMutex.Lock() - defer s.runningMutex.Unlock() - s.running[token] = jobId - if jobType == "osbuild:"+arch { jobType = "osbuild" } else if jobType == "osbuild-koji:"+arch { jobType = "osbuild-koji" } - return token, jobId, jobType, args, dynamicArgs, nil -} - -func (s *Server) RunningJob(token uuid.UUID) (uuid.UUID, error) { - s.runningMutex.Lock() - defer s.runningMutex.Unlock() - - jobId, ok := s.running[token] - if !ok { - return uuid.Nil, ErrTokenNotExist - } - - return jobId, nil + return jobId, token, jobType, args, dynamicArgs, nil } func (s *Server) FinishJob(token uuid.UUID, result json.RawMessage) error { - s.runningMutex.Lock() - defer s.runningMutex.Unlock() - - jobId, ok := s.running[token] - if !ok { - return ErrTokenNotExist + jobId, err := s.jobs.IdFromToken(token) + if err != nil { + switch err { + case jobqueue.ErrNotExist: + return ErrInvalidToken + default: + return err + } } - // Always delete the running job, even if there are errors finishing - // the job, because callers won't call this a second time on error. - delete(s.running, token) - - err := s.jobs.FinishJob(jobId, result) + err = s.jobs.FinishJob(jobId, result) if err != nil { - return fmt.Errorf("error finishing job: %v", err) + switch err { + case jobqueue.ErrNotRunning: + return ErrJobNotRunning + default: + return fmt.Errorf("error finishing job: %v", err) + } } // Move artifacts from the temporary location to the final job @@ -343,7 +315,7 @@ func (h *apiHandlers) RequestJob(ctx echo.Context) error { return err } - token, jobId, jobType, jobArgs, dynamicJobArgs, err := h.server.RequestJob(ctx.Request().Context(), body.Arch, body.Types) + jobId, token, jobType, jobArgs, dynamicJobArgs, err := h.server.RequestJob(ctx.Request().Context(), body.Arch, body.Types) if err != nil { return err } @@ -369,11 +341,11 @@ func (h *apiHandlers) GetJob(ctx echo.Context, tokenstr string) error { return echo.NewHTTPError(http.StatusBadRequest, "cannot parse job token") } - jobId, err := h.server.RunningJob(token) + jobId, err := h.server.jobs.IdFromToken(token) if err != nil { switch err { - case ErrTokenNotExist: - return echo.NewHTTPError(http.StatusNotFound, "not found") + case jobqueue.ErrNotExist: + return ErrInvalidToken default: return err } @@ -408,7 +380,9 @@ func (h *apiHandlers) UpdateJob(ctx echo.Context, idstr string) error { err = h.server.FinishJob(token, body.Result) if err != nil { switch err { - case ErrTokenNotExist: + case ErrInvalidToken: + fallthrough + case ErrJobNotRunning: return echo.NewHTTPError(http.StatusNotFound, "not found") default: return err diff --git a/internal/worker/server_test.go b/internal/worker/server_test.go index a7609362c..76d620e0c 100644 --- a/internal/worker/server_test.go +++ b/internal/worker/server_test.go @@ -126,7 +126,7 @@ func TestCancel(t *testing.T) { jobId, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}) require.NoError(t, err) - token, j, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{"osbuild"}) + j, token, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{"osbuild"}) require.NoError(t, err) require.Equal(t, jobId, j) require.Equal(t, "osbuild", typ) @@ -167,7 +167,7 @@ func TestUpdate(t *testing.T) { jobId, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}) require.NoError(t, err) - token, j, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{"osbuild"}) + j, token, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{"osbuild"}) require.NoError(t, err) require.Equal(t, jobId, j) require.Equal(t, "osbuild", typ) @@ -236,7 +236,7 @@ func TestUpload(t *testing.T) { jobID, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}) require.NoError(t, err) - token, j, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{"osbuild"}) + j, token, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{"osbuild"}) require.NoError(t, err) require.Equal(t, jobID, j) require.Equal(t, "osbuild", typ)