diff --git a/cmd/osbuild-composer-dbjobqueue-tests/main_test.go b/cmd/osbuild-composer-dbjobqueue-tests/main_test.go index be3f20ed2..5e3623575 100644 --- a/cmd/osbuild-composer-dbjobqueue-tests/main_test.go +++ b/cmd/osbuild-composer-dbjobqueue-tests/main_test.go @@ -4,6 +4,7 @@ package main import ( "context" + "encoding/json" "fmt" "testing" "time" @@ -62,7 +63,7 @@ func TestJobQueueInterface(t *testing.T) { } t.Run("maintenance-query-jobs-before", wrap(testJobsUptoByType)) - t.Run("maintenance-delete-job-and-dependencies", wrap(testDeleteJobAndDependencies)) + t.Run("maintenance-delete-job-results", wrap(testDeleteJobResult)) } func setFinishedAt(t *testing.T, q *dbjobqueue.DBJobQueue, id uuid.UUID, finished time.Time) { @@ -109,112 +110,37 @@ func testJobsUptoByType(t *testing.T, q *dbjobqueue.DBJobQueue) { require.ElementsMatch(t, []uuid.UUID{id80, id85}, ids["octopus"]) } -func testDeleteJobAndDependencies(t *testing.T, q *dbjobqueue.DBJobQueue) { - // id1 -> id2 -> id3 - id1, err := q.Enqueue("octopus", nil, nil, "") +func testDeleteJobResult(t *testing.T, q *dbjobqueue.DBJobQueue) { + id, err := q.Enqueue("octopus", nil, nil, "") require.NoError(t, err) - require.NotEqual(t, uuid.Nil, id1) - id2, err := q.Enqueue("octopus", nil, []uuid.UUID{id1}, "") - require.NoError(t, err) - require.NotEqual(t, uuid.Nil, id2) - id3, err := q.Enqueue("octopus", nil, []uuid.UUID{id2}, "") - require.NoError(t, err) - require.NotEqual(t, uuid.Nil, id3) - - c1, err := q.Enqueue("octopus", nil, nil, "") - require.NoError(t, err) - require.NotEqual(t, uuid.Nil, c1) - c2, err := q.Enqueue("octopus", nil, []uuid.UUID{c1}, "") - require.NoError(t, err) - require.NotEqual(t, uuid.Nil, c2) - c3, err := q.Enqueue("octopus", nil, []uuid.UUID{c2}, "") - require.NoError(t, err) - require.NotEqual(t, uuid.Nil, c3) - controls := []uuid.UUID{c1, c2, c3} - - _, _, _, _, err = q.Job(c1) + require.NotEqual(t, uuid.Nil, id) + _, _, _, _, _, err = q.Dequeue(context.Background(), []string{"octopus"}, []string{""}) require.NoError(t, err) - require.NoError(t, q.DeleteJobIncludingDependencies(id3)) - for _, id := range []uuid.UUID{id1, id2, id3} { - _, _, _, _, err = q.Job(id) - require.ErrorIs(t, err, jobqueue.ErrNotExist) + type Result struct { + Result string `json:"result"` + } + result := Result{ + "deleteme", } - // controls should still exist - for _, c := range controls { - _, _, _, _, err = q.Job(c) - require.NoError(t, err) - } - - // id1 -> id2 -> id4 && id3 -> id4 - id1, err = q.Enqueue("octopus", nil, nil, "") + res, err := json.Marshal(result) require.NoError(t, err) - require.NotEqual(t, uuid.Nil, id1) - id2, err = q.Enqueue("octopus", nil, []uuid.UUID{id1}, "") - require.NoError(t, err) - require.NotEqual(t, uuid.Nil, id2) - id3, err = q.Enqueue("octopus", nil, nil, "") - require.NoError(t, err) - require.NotEqual(t, uuid.Nil, id3) - id4, err := q.Enqueue("octopus", nil, []uuid.UUID{id2, id3}, "") - require.NoError(t, err) - require.NotEqual(t, uuid.Nil, id4) - - require.NoError(t, q.DeleteJobIncludingDependencies(id4)) - for _, id := range []uuid.UUID{id1, id2, id3, id4} { - _, _, _, _, err = q.Job(id) - require.ErrorIs(t, err, jobqueue.ErrNotExist) - } - - // controls should still exist - for _, c := range controls { - _, _, _, _, err = q.Job(c) - require.NoError(t, err) - } - - // id1 has 2 dependants, and the maintenance queries currently do not account for this - // situation as it does not occur in the service. This should be changed once we allow - // multiple build job per depsolve job, and the depsolve job should only be removed once all - // the build jobs have been dealt with. - id1, err = q.Enqueue("octopus", nil, nil, "") - require.NoError(t, err) - require.NotEqual(t, uuid.Nil, id1) - id2a, err := q.Enqueue("octopus", nil, []uuid.UUID{id1}, "") - require.NoError(t, err) - require.NotEqual(t, uuid.Nil, id2a) - id2b, err := q.Enqueue("octopus", nil, []uuid.UUID{id1}, "") - require.NoError(t, err) - require.NotEqual(t, uuid.Nil, id2b) - id3, err = q.Enqueue("octopus", nil, []uuid.UUID{id2a}, "") - require.NoError(t, err) - require.NotEqual(t, uuid.Nil, id3) - - require.NoError(t, q.DeleteJobIncludingDependencies(id3)) - for _, id := range []uuid.UUID{id1, id2a, id3} { - _, _, _, _, err = q.Job(id) - require.ErrorIs(t, err, jobqueue.ErrNotExist) - } - - // id2b still exists - _, _, _, _, err = q.Job(id2b) + err = q.FinishJob(id, res) require.NoError(t, err) - // id2b can still be deleted with it's dependencies missing - require.NoError(t, q.DeleteJobIncludingDependencies(id2b)) - _, _, _, _, err = q.Job(id2b) - require.ErrorIs(t, err, jobqueue.ErrNotExist) + _,r,_,_,_,_,_,err := q.JobStatus(id) + require.NoError(t, err) - // controls should still exist - for _, c := range controls { - _, _, _, _, err = q.Job(c) - require.NoError(t, err) - } + var r1 Result + require.NoError(t, json.Unmarshal(r, &r1)) + require.Equal(t, result, r1) - require.NoError(t, q.DeleteJobIncludingDependencies(uuid.Nil)) - // controls should still exist - for _, c := range controls { - _, _, _, _, err = q.Job(c) - require.NoError(t, err) - } + rows, err := q.DeleteJobResult([]uuid.UUID{id}) + require.NoError(t, err) + require.Equal(t, int64(1), rows) + + _,r,_,_,_,_,_,err = q.JobStatus(id) + require.NoError(t, err) + require.Nil(t, r) } diff --git a/cmd/osbuild-service-maintenance/db.go b/cmd/osbuild-service-maintenance/db.go index 75486dfe4..f7867b95a 100644 --- a/cmd/osbuild-service-maintenance/db.go +++ b/cmd/osbuild-service-maintenance/db.go @@ -10,37 +10,31 @@ import ( ) func DBCleanup(dbURL string, dryRun bool, cutoff time.Time) error { - archs := []string{"x86_64"} - jobType := "osbuild" - jobs, err := dbjobqueue.New(dbURL) if err != nil { return err } - var jobTypes []string - for _, a := range archs { - jobTypes = append(jobTypes, fmt.Sprintf("%s:%s", jobType, a)) - } - - jobsByType, err := jobs.JobsUptoByType(jobTypes, cutoff) + // The results of these jobs take up the most space and can contain sensitive data. Delete + // them after a while. + jobsByType, err := jobs.JobsUptoByType([]string{"manifest-id-only", "depsolve"}, cutoff) if err != nil { return fmt.Errorf("Error querying jobs: %v", err) } for k, v := range jobsByType { - logrus.Infof("Deleting jobs and their dependencies of type %v", k) + logrus.Infof("Deleting results from %d %s jobs", len(v), k) if dryRun { - logrus.Infof("Dry run, skipping deletion of jobs: %v", v) + logrus.Info("Dry run, skipping deletion of jobs") continue } - - for _, jobId := range v { - err = jobs.DeleteJobIncludingDependencies(jobId) - if err != nil { - return fmt.Errorf("Error deleting job: %v", jobId) - } + rows, err := jobs.DeleteJobResult(v) + if err != nil { + logrus.Errorf("Error deleting results for jobs: %v, %d rows affected", rows, err) + continue } + logrus.Infof("Deleted results from %d jobs out of %d job ids", rows, len(v)) } + return nil } diff --git a/internal/jobqueue/dbjobqueue/dbjobqueue.go b/internal/jobqueue/dbjobqueue/dbjobqueue.go index 6679032bd..1df982b50 100644 --- a/internal/jobqueue/dbjobqueue/dbjobqueue.go +++ b/internal/jobqueue/dbjobqueue/dbjobqueue.go @@ -106,21 +106,9 @@ const ( FROM jobs WHERE type = ANY($1) AND finished_at < $2 GROUP BY type` - sqlQueryDepedenciesRecursively = ` - WITH RECURSIVE dependencies(d) AS ( - SELECT dependency_id - FROM job_dependencies - WHERE job_id = $1 - UNION ALL - SELECT dependency_id - FROM dependencies, job_dependencies - WHERE job_dependencies.job_id = d ) - SELECT * FROM dependencies` - sqlDeleteJobDependencies = ` - DELETE FROM job_dependencies - WHERE dependency_id = ANY($1)` - sqlDeleteJobs = ` - DELETE FROM jobs + sqlDeleteJobResult = ` + UPDATE jobs + SET result = NULL WHERE id = ANY($1)` ) @@ -657,59 +645,16 @@ func (q *DBJobQueue) JobsUptoByType(jobTypes []string, upto time.Time) (result m return } -// Deletes single job and dependencies (recursively) -func (q *DBJobQueue) DeleteJobIncludingDependencies(jobId uuid.UUID) error { +func (q *DBJobQueue) DeleteJobResult(jobIds []uuid.UUID) (int64, error) { conn, err := q.pool.Acquire(context.Background()) if err != nil { - - return fmt.Errorf("error connecting to database: %v", err) + return -1, fmt.Errorf("error connecting to database: %v", err) } defer conn.Release() - tx, err := conn.Begin(context.Background()) + tag, err := conn.Exec(context.Background(), sqlDeleteJobResult, jobIds) if err != nil { - return fmt.Errorf("error starting database transaction: %v", err) + return tag.RowsAffected(), fmt.Errorf("Error deleting results from jobs: %v", err) } - defer func() { - err := tx.Rollback(context.Background()) - if err != nil && !errors.As(err, &pgx.ErrTxClosed) { - logrus.Error("error rolling back enqueue transaction: ", err) - } - }() - - rows, err := tx.Query(context.Background(), sqlQueryDepedenciesRecursively, jobId) - if err != nil { - return fmt.Errorf("error querying the job's dependencies: %v", err) - } - - var dependencies []uuid.UUID - for rows.Next() { - var dep uuid.UUID - err = rows.Scan(&dep) - if err != nil { - return err - } - - dependencies = append(dependencies, dep) - } - - depTag, err := tx.Exec(context.Background(), sqlDeleteJobDependencies, dependencies) - if err != nil { - return fmt.Errorf("Error removing from dependencies recursively for job %v: %v", jobId, err) - } - - jobAndDependencies := append(dependencies, jobId) - jobsTag, err := tx.Exec(context.Background(), sqlDeleteJobs, jobAndDependencies) - if err != nil { - return fmt.Errorf("Error removing from jobs recursively for job %v: %v", jobId, err) - } - - err = tx.Commit(context.Background()) - if err != nil { - return fmt.Errorf("unable to commit database transaction: %v", err) - } - - logrus.Infof("Removed %d rows from dependencies for job %v", depTag.RowsAffected(), jobId) - logrus.Infof("Removed %d rows from jobs for job %v, this includes dependencies", jobsTag.RowsAffected(), jobId) - return nil + return tag.RowsAffected(), nil }