osbuild-service-maintenance: Delete results from select jobs

Instead of deleting job records, delete only the results of the manifest and
depsolve jobs. This redacts sensitive data that the manifest can
contain and also conserves space.
This commit is contained in:
Sanne Raymaekers 2022-04-13 12:25:36 +02:00 committed by Ondřej Budai
parent eeb2238b12
commit 9b119fa4cf
3 changed files with 44 additions and 179 deletions

View file

@ -4,6 +4,7 @@ package main
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"testing" "testing"
"time" "time"
@ -62,7 +63,7 @@ func TestJobQueueInterface(t *testing.T) {
} }
t.Run("maintenance-query-jobs-before", wrap(testJobsUptoByType)) t.Run("maintenance-query-jobs-before", wrap(testJobsUptoByType))
t.Run("maintenance-delete-job-and-dependencies", wrap(testDeleteJobAndDependencies)) t.Run("maintenance-delete-job-results", wrap(testDeleteJobResult))
} }
func setFinishedAt(t *testing.T, q *dbjobqueue.DBJobQueue, id uuid.UUID, finished time.Time) { func setFinishedAt(t *testing.T, q *dbjobqueue.DBJobQueue, id uuid.UUID, finished time.Time) {
@ -109,112 +110,37 @@ func testJobsUptoByType(t *testing.T, q *dbjobqueue.DBJobQueue) {
require.ElementsMatch(t, []uuid.UUID{id80, id85}, ids["octopus"]) require.ElementsMatch(t, []uuid.UUID{id80, id85}, ids["octopus"])
} }
func testDeleteJobAndDependencies(t *testing.T, q *dbjobqueue.DBJobQueue) { func testDeleteJobResult(t *testing.T, q *dbjobqueue.DBJobQueue) {
// id1 -> id2 -> id3 id, err := q.Enqueue("octopus", nil, nil, "")
id1, err := q.Enqueue("octopus", nil, nil, "")
require.NoError(t, err) require.NoError(t, err)
require.NotEqual(t, uuid.Nil, id1) require.NotEqual(t, uuid.Nil, id)
id2, err := q.Enqueue("octopus", nil, []uuid.UUID{id1}, "") _, _, _, _, _, err = q.Dequeue(context.Background(), []string{"octopus"}, []string{""})
require.NoError(t, err)
require.NotEqual(t, uuid.Nil, id2)
id3, err := q.Enqueue("octopus", nil, []uuid.UUID{id2}, "")
require.NoError(t, err)
require.NotEqual(t, uuid.Nil, id3)
c1, err := q.Enqueue("octopus", nil, nil, "")
require.NoError(t, err)
require.NotEqual(t, uuid.Nil, c1)
c2, err := q.Enqueue("octopus", nil, []uuid.UUID{c1}, "")
require.NoError(t, err)
require.NotEqual(t, uuid.Nil, c2)
c3, err := q.Enqueue("octopus", nil, []uuid.UUID{c2}, "")
require.NoError(t, err)
require.NotEqual(t, uuid.Nil, c3)
controls := []uuid.UUID{c1, c2, c3}
_, _, _, _, err = q.Job(c1)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, q.DeleteJobIncludingDependencies(id3)) type Result struct {
for _, id := range []uuid.UUID{id1, id2, id3} { Result string `json:"result"`
_, _, _, _, err = q.Job(id) }
require.ErrorIs(t, err, jobqueue.ErrNotExist) result := Result{
"deleteme",
} }
// controls should still exist res, err := json.Marshal(result)
for _, c := range controls {
_, _, _, _, err = q.Job(c)
require.NoError(t, err)
}
// id1 -> id2 -> id4 && id3 -> id4
id1, err = q.Enqueue("octopus", nil, nil, "")
require.NoError(t, err) require.NoError(t, err)
require.NotEqual(t, uuid.Nil, id1) err = q.FinishJob(id, res)
id2, err = q.Enqueue("octopus", nil, []uuid.UUID{id1}, "")
require.NoError(t, err)
require.NotEqual(t, uuid.Nil, id2)
id3, err = q.Enqueue("octopus", nil, nil, "")
require.NoError(t, err)
require.NotEqual(t, uuid.Nil, id3)
id4, err := q.Enqueue("octopus", nil, []uuid.UUID{id2, id3}, "")
require.NoError(t, err)
require.NotEqual(t, uuid.Nil, id4)
require.NoError(t, q.DeleteJobIncludingDependencies(id4))
for _, id := range []uuid.UUID{id1, id2, id3, id4} {
_, _, _, _, err = q.Job(id)
require.ErrorIs(t, err, jobqueue.ErrNotExist)
}
// controls should still exist
for _, c := range controls {
_, _, _, _, err = q.Job(c)
require.NoError(t, err)
}
// id1 has 2 dependants, and the maintenance queries currently do not account for this
// situation as it does not occur in the service. This should be changed once we allow
// multiple build job per depsolve job, and the depsolve job should only be removed once all
// the build jobs have been dealt with.
id1, err = q.Enqueue("octopus", nil, nil, "")
require.NoError(t, err)
require.NotEqual(t, uuid.Nil, id1)
id2a, err := q.Enqueue("octopus", nil, []uuid.UUID{id1}, "")
require.NoError(t, err)
require.NotEqual(t, uuid.Nil, id2a)
id2b, err := q.Enqueue("octopus", nil, []uuid.UUID{id1}, "")
require.NoError(t, err)
require.NotEqual(t, uuid.Nil, id2b)
id3, err = q.Enqueue("octopus", nil, []uuid.UUID{id2a}, "")
require.NoError(t, err)
require.NotEqual(t, uuid.Nil, id3)
require.NoError(t, q.DeleteJobIncludingDependencies(id3))
for _, id := range []uuid.UUID{id1, id2a, id3} {
_, _, _, _, err = q.Job(id)
require.ErrorIs(t, err, jobqueue.ErrNotExist)
}
// id2b still exists
_, _, _, _, err = q.Job(id2b)
require.NoError(t, err) require.NoError(t, err)
// id2b can still be deleted with it's dependencies missing _,r,_,_,_,_,_,err := q.JobStatus(id)
require.NoError(t, q.DeleteJobIncludingDependencies(id2b)) require.NoError(t, err)
_, _, _, _, err = q.Job(id2b)
require.ErrorIs(t, err, jobqueue.ErrNotExist)
// controls should still exist var r1 Result
for _, c := range controls { require.NoError(t, json.Unmarshal(r, &r1))
_, _, _, _, err = q.Job(c) require.Equal(t, result, r1)
require.NoError(t, err)
}
require.NoError(t, q.DeleteJobIncludingDependencies(uuid.Nil)) rows, err := q.DeleteJobResult([]uuid.UUID{id})
// controls should still exist require.NoError(t, err)
for _, c := range controls { require.Equal(t, int64(1), rows)
_, _, _, _, err = q.Job(c)
require.NoError(t, err) _,r,_,_,_,_,_,err = q.JobStatus(id)
} require.NoError(t, err)
require.Nil(t, r)
} }

View file

@ -10,37 +10,31 @@ import (
) )
func DBCleanup(dbURL string, dryRun bool, cutoff time.Time) error { func DBCleanup(dbURL string, dryRun bool, cutoff time.Time) error {
archs := []string{"x86_64"}
jobType := "osbuild"
jobs, err := dbjobqueue.New(dbURL) jobs, err := dbjobqueue.New(dbURL)
if err != nil { if err != nil {
return err return err
} }
var jobTypes []string // The results of these jobs take up the most space and can contain sensitive data. Delete
for _, a := range archs { // them after a while.
jobTypes = append(jobTypes, fmt.Sprintf("%s:%s", jobType, a)) jobsByType, err := jobs.JobsUptoByType([]string{"manifest-id-only", "depsolve"}, cutoff)
}
jobsByType, err := jobs.JobsUptoByType(jobTypes, cutoff)
if err != nil { if err != nil {
return fmt.Errorf("Error querying jobs: %v", err) return fmt.Errorf("Error querying jobs: %v", err)
} }
for k, v := range jobsByType { for k, v := range jobsByType {
logrus.Infof("Deleting jobs and their dependencies of type %v", k) logrus.Infof("Deleting results from %d %s jobs", len(v), k)
if dryRun { if dryRun {
logrus.Infof("Dry run, skipping deletion of jobs: %v", v) logrus.Info("Dry run, skipping deletion of jobs")
continue continue
} }
rows, err := jobs.DeleteJobResult(v)
for _, jobId := range v { if err != nil {
err = jobs.DeleteJobIncludingDependencies(jobId) logrus.Errorf("Error deleting results for jobs: %v, %d rows affected", rows, err)
if err != nil { continue
return fmt.Errorf("Error deleting job: %v", jobId)
}
} }
logrus.Infof("Deleted results from %d jobs out of %d job ids", rows, len(v))
} }
return nil return nil
} }

View file

@ -106,21 +106,9 @@ const (
FROM jobs FROM jobs
WHERE type = ANY($1) AND finished_at < $2 WHERE type = ANY($1) AND finished_at < $2
GROUP BY type` GROUP BY type`
sqlQueryDepedenciesRecursively = ` sqlDeleteJobResult = `
WITH RECURSIVE dependencies(d) AS ( UPDATE jobs
SELECT dependency_id SET result = NULL
FROM job_dependencies
WHERE job_id = $1
UNION ALL
SELECT dependency_id
FROM dependencies, job_dependencies
WHERE job_dependencies.job_id = d )
SELECT * FROM dependencies`
sqlDeleteJobDependencies = `
DELETE FROM job_dependencies
WHERE dependency_id = ANY($1)`
sqlDeleteJobs = `
DELETE FROM jobs
WHERE id = ANY($1)` WHERE id = ANY($1)`
) )
@ -657,59 +645,16 @@ func (q *DBJobQueue) JobsUptoByType(jobTypes []string, upto time.Time) (result m
return return
} }
// Deletes single job and dependencies (recursively) func (q *DBJobQueue) DeleteJobResult(jobIds []uuid.UUID) (int64, error) {
func (q *DBJobQueue) DeleteJobIncludingDependencies(jobId uuid.UUID) error {
conn, err := q.pool.Acquire(context.Background()) conn, err := q.pool.Acquire(context.Background())
if err != nil { if err != nil {
return -1, fmt.Errorf("error connecting to database: %v", err)
return fmt.Errorf("error connecting to database: %v", err)
} }
defer conn.Release() defer conn.Release()
tx, err := conn.Begin(context.Background()) tag, err := conn.Exec(context.Background(), sqlDeleteJobResult, jobIds)
if err != nil { if err != nil {
return fmt.Errorf("error starting database transaction: %v", err) return tag.RowsAffected(), fmt.Errorf("Error deleting results from jobs: %v", err)
} }
defer func() { return tag.RowsAffected(), nil
err := tx.Rollback(context.Background())
if err != nil && !errors.As(err, &pgx.ErrTxClosed) {
logrus.Error("error rolling back enqueue transaction: ", err)
}
}()
rows, err := tx.Query(context.Background(), sqlQueryDepedenciesRecursively, jobId)
if err != nil {
return fmt.Errorf("error querying the job's dependencies: %v", err)
}
var dependencies []uuid.UUID
for rows.Next() {
var dep uuid.UUID
err = rows.Scan(&dep)
if err != nil {
return err
}
dependencies = append(dependencies, dep)
}
depTag, err := tx.Exec(context.Background(), sqlDeleteJobDependencies, dependencies)
if err != nil {
return fmt.Errorf("Error removing from dependencies recursively for job %v: %v", jobId, err)
}
jobAndDependencies := append(dependencies, jobId)
jobsTag, err := tx.Exec(context.Background(), sqlDeleteJobs, jobAndDependencies)
if err != nil {
return fmt.Errorf("Error removing from jobs recursively for job %v: %v", jobId, err)
}
err = tx.Commit(context.Background())
if err != nil {
return fmt.Errorf("unable to commit database transaction: %v", err)
}
logrus.Infof("Removed %d rows from dependencies for job %v", depTag.RowsAffected(), jobId)
logrus.Infof("Removed %d rows from jobs for job %v, this includes dependencies", jobsTag.RowsAffected(), jobId)
return nil
} }