diff --git a/cmd/osbuild-service-maintenance/db.go b/cmd/osbuild-service-maintenance/db.go index eb6adf543..79ce79399 100644 --- a/cmd/osbuild-service-maintenance/db.go +++ b/cmd/osbuild-service-maintenance/db.go @@ -5,24 +5,20 @@ import ( "fmt" "time" - "github.com/google/uuid" "github.com/jackc/pgx/v4" "github.com/sirupsen/logrus" - - "github.com/osbuild/osbuild-composer/internal/worker" ) const ( // Maintenance queries - sqlQueryJobsUptoByType = ` - SELECT array_agg(id), type - FROM jobs - WHERE type = ANY($1) AND finished_at < $2 AND result IS NOT NULL - GROUP BY type` - sqlDeleteJobResult = ` - UPDATE jobs - SET result = NULL - WHERE id = ANY($1)` + sqlDeleteJob = ` + DELETE FROM jobs + WHERE id IN ( + SELECT id FROM jobs + WHERE expires_at < NOW() + ORDER BY expires_at + LIMIT 1000 + )` sqlVacuumAnalyze = ` VACUUM ANALYZE` sqlVacuumStats = ` @@ -52,32 +48,8 @@ func (d *db) Close() { d.Conn.Close(context.Background()) } -// return map id -> jobtype ? -func (d *db) JobsUptoByType(jobTypes []string, upto time.Time) (result map[string][]uuid.UUID, err error) { - result = make(map[string][]uuid.UUID) - - rows, err := d.Conn.Query(context.Background(), sqlQueryJobsUptoByType, jobTypes, upto) - if err != nil { - return - } - defer rows.Close() - - for rows.Next() { - var ids []uuid.UUID - var jt string - err = rows.Scan(&ids, &jt) - if err != nil { - return - } - - result[jt] = ids - } - err = rows.Err() - return -} - -func (d *db) DeleteJobResult(jobIds []uuid.UUID) (int64, error) { - tag, err := d.Conn.Exec(context.Background(), sqlDeleteJobResult, jobIds) +func (d *db) DeleteJob() (int64, error) { + tag, err := d.Conn.Exec(context.Background(), sqlDeleteJob) if err != nil { return tag.RowsAffected(), fmt.Errorf("Error deleting results from jobs: %v", err) } @@ -144,43 +116,31 @@ func DBCleanup(dbURL string, dryRun bool, cutoff time.Time) error { return err } - // The results of these jobs take up the most space and can contain sensitive data. Delete - // them after a while. - jobsByType, err := db.JobsUptoByType([]string{worker.JobTypeManifestIDOnly, worker.JobTypeDepsolve}, cutoff) - if err != nil { - return fmt.Errorf("Error querying jobs: %v", err) - } - err = db.LogVacuumStats() if err != nil { logrus.Errorf("Error running vacuum stats: %v", err) } - for k, v := range jobsByType { - logrus.Infof("Deleting results from %d %s jobs", len(v), k) - if dryRun { - logrus.Info("Dry run, skipping deletion of jobs") - continue + var rows int64 + + for { + rows, err = db.DeleteJob() + + if err != nil { + logrus.Errorf("Error deleting results for jobs: %v, %d rows affected", rows, err) + return err } - // Delete results in chunks to avoid starving the rds instance - for i := 0; i < len(v); i += 100 { - max := i + 100 - if max > len(v) { - max = len(v) - } - - rows, err := db.DeleteJobResult(v[i:max]) - if err != nil { - logrus.Errorf("Error deleting results for jobs: %v, %d rows affected", rows, err) - continue - } - logrus.Infof("Deleted results from %d jobs out of %d job ids", rows, len(v)) - err = db.VacuumAnalyze() - if err != nil { - logrus.Errorf("Error running vacuum analyze: %v", err) - } + if rows == 0 { + break } + + logrus.Infof("Deleted results for %d", rows) + } + + err = db.VacuumAnalyze() + if err != nil { + logrus.Errorf("Error running vacuum analyze: %v", err) } err = db.LogVacuumStats() diff --git a/cmd/osbuild-service-maintenance/db_test.go b/cmd/osbuild-service-maintenance/db_test.go index beeda9266..4ca008bfa 100644 --- a/cmd/osbuild-service-maintenance/db_test.go +++ b/cmd/osbuild-service-maintenance/db_test.go @@ -26,11 +26,8 @@ func TestDBJobQueueMaintenance(t *testing.T) { _, err = dbMaintenance.Conn.Exec(context.Background(), "DELETE FROM jobs") require.NoError(t, err) - t.Run("testJobsUptoByType", func(t *testing.T) { - testJobsUptoByType(t, dbMaintenance, q) - }) - t.Run("testDeleteJobResult", func(t *testing.T) { - testDeleteJobResult(t, dbMaintenance, q) + t.Run("testDeleteJob", func(t *testing.T) { + testDeleteJob(t, dbMaintenance, q) }) t.Run("testVacuum", func(t *testing.T) { testVacuum(t, dbMaintenance, q) @@ -38,46 +35,13 @@ func TestDBJobQueueMaintenance(t *testing.T) { } -func setFinishedAt(t *testing.T, d db, id uuid.UUID, finished time.Time) { - started := finished.Add(-time.Second) - queued := started.Add(-time.Second) - _, err := d.Conn.Exec(context.Background(), "UPDATE jobs SET queued_at = $1, started_at = $2, finished_at = $3, result = '{\"result\": \"success\" }' WHERE id = $4", queued, started, finished, id) +func setExpired(t *testing.T, d db, id uuid.UUID) { + expires := time.Now().Add(-time.Second) + _, err := d.Conn.Exec(context.Background(), "UPDATE jobs SET expires_at = $1 WHERE id = $2", expires, id) require.NoError(t, err) } -func testJobsUptoByType(t *testing.T, d db, q *dbjobqueue.DBJobQueue) { - date80 := time.Date(1980, time.January, 1, 0, 0, 0, 0, time.UTC) - date85 := time.Date(1985, time.January, 1, 0, 0, 0, 0, time.UTC) - date90 := time.Date(1990, time.January, 1, 0, 0, 0, 0, time.UTC) - - id80, err := q.Enqueue("octopus", nil, nil, "") - require.NoError(t, err) - require.NotEqual(t, uuid.Nil, id80) - _, _, _, _, _, err = q.Dequeue(context.Background(), []string{"octopus"}, []string{""}) - require.NoError(t, err) - err = q.FinishJob(id80, nil) - require.NoError(t, err) - setFinishedAt(t, d, id80, date80) - - id85, err := q.Enqueue("octopus", nil, nil, "") - require.NoError(t, err) - require.NotEqual(t, uuid.Nil, id85) - _, _, _, _, _, err = q.Dequeue(context.Background(), []string{"octopus"}, []string{""}) - require.NoError(t, err) - err = q.FinishJob(id85, nil) - require.NoError(t, err) - setFinishedAt(t, d, id85, date85) - - ids, err := d.JobsUptoByType([]string{"octopus"}, date85) - require.NoError(t, err) - require.ElementsMatch(t, []uuid.UUID{id80}, ids["octopus"]) - - ids, err = d.JobsUptoByType([]string{"octopus"}, date90) - require.NoError(t, err) - require.ElementsMatch(t, []uuid.UUID{id80, id85}, ids["octopus"]) -} - -func testDeleteJobResult(t *testing.T, d db, q *dbjobqueue.DBJobQueue) { +func testDeleteJob(t *testing.T, d db, q *dbjobqueue.DBJobQueue) { id, err := q.Enqueue("octopus", nil, nil, "") require.NoError(t, err) require.NotEqual(t, uuid.Nil, id) @@ -103,13 +67,18 @@ func testDeleteJobResult(t *testing.T, d db, q *dbjobqueue.DBJobQueue) { require.NoError(t, json.Unmarshal(r, &r1)) require.Equal(t, result, r1) - rows, err := d.DeleteJobResult([]uuid.UUID{id}) + rows, err := d.DeleteJob() + require.NoError(t, err) + require.Equal(t, int64(0), rows) + + setExpired(t, d, id) + + rows, err = d.DeleteJob() require.NoError(t, err) require.Equal(t, int64(1), rows) - _, _, r, _, _, _, _, _, err = q.JobStatus(id) - require.NoError(t, err) - require.Nil(t, r) + _, _, _, _, _, _, _, _, err = q.JobStatus(id) + require.Error(t, err) } func testVacuum(t *testing.T, d db, q *dbjobqueue.DBJobQueue) { diff --git a/internal/jobqueue/fsjobqueue/fsjobqueue.go b/internal/jobqueue/fsjobqueue/fsjobqueue.go index 62451fab5..b2c3fa464 100644 --- a/internal/jobqueue/fsjobqueue/fsjobqueue.go +++ b/internal/jobqueue/fsjobqueue/fsjobqueue.go @@ -72,6 +72,7 @@ type job struct { QueuedAt time.Time `json:"queued_at,omitempty"` StartedAt time.Time `json:"started_at,omitempty"` FinishedAt time.Time `json:"finished_at,omitempty"` + ExpiresAt time.Time `json:"expires_at,omitempty"` Canceled bool `json:"canceled,omitempty"` } diff --git a/pkg/jobqueue/dbjobqueue/schemas/005_jobs_expiry.sql b/pkg/jobqueue/dbjobqueue/schemas/005_jobs_expiry.sql new file mode 100644 index 000000000..5e0b35e18 --- /dev/null +++ b/pkg/jobqueue/dbjobqueue/schemas/005_jobs_expiry.sql @@ -0,0 +1,16 @@ +-- add the expires_at column +ALTER TABLE jobs +ADD COLUMN expires_at timestamp DEFAULT NOW() + interval '14 days'; + +-- We added a column, thus we have to recreate the view. +CREATE OR REPLACE VIEW ready_jobs AS +SELECT * +FROM jobs +WHERE started_at IS NULL + AND canceled = FALSE + AND id NOT IN ( + SELECT job_id + FROM job_dependencies JOIN jobs ON dependency_id = id + WHERE finished_at IS NULL +) +ORDER BY queued_at ASC