jobqueue: store an expiry date

This introduces an expiry date (default: 14 days from insert date) and
adjust the service-maintenance script to delete jobs that are older than
the expiration date.
This commit is contained in:
Simon de Vlieger 2022-07-08 16:55:27 +02:00 committed by Sanne Raymaekers
parent 37fc807bfa
commit 78ae275c61
4 changed files with 59 additions and 113 deletions

View file

@ -5,24 +5,20 @@ import (
"fmt"
"time"
"github.com/google/uuid"
"github.com/jackc/pgx/v4"
"github.com/sirupsen/logrus"
"github.com/osbuild/osbuild-composer/internal/worker"
)
const (
// Maintenance queries
sqlQueryJobsUptoByType = `
SELECT array_agg(id), type
FROM jobs
WHERE type = ANY($1) AND finished_at < $2 AND result IS NOT NULL
GROUP BY type`
sqlDeleteJobResult = `
UPDATE jobs
SET result = NULL
WHERE id = ANY($1)`
sqlDeleteJob = `
DELETE FROM jobs
WHERE id IN (
SELECT id FROM jobs
WHERE expires_at < NOW()
ORDER BY expires_at
LIMIT 1000
)`
sqlVacuumAnalyze = `
VACUUM ANALYZE`
sqlVacuumStats = `
@ -52,32 +48,8 @@ func (d *db) Close() {
d.Conn.Close(context.Background())
}
// return map id -> jobtype ?
func (d *db) JobsUptoByType(jobTypes []string, upto time.Time) (result map[string][]uuid.UUID, err error) {
result = make(map[string][]uuid.UUID)
rows, err := d.Conn.Query(context.Background(), sqlQueryJobsUptoByType, jobTypes, upto)
if err != nil {
return
}
defer rows.Close()
for rows.Next() {
var ids []uuid.UUID
var jt string
err = rows.Scan(&ids, &jt)
if err != nil {
return
}
result[jt] = ids
}
err = rows.Err()
return
}
func (d *db) DeleteJobResult(jobIds []uuid.UUID) (int64, error) {
tag, err := d.Conn.Exec(context.Background(), sqlDeleteJobResult, jobIds)
func (d *db) DeleteJob() (int64, error) {
tag, err := d.Conn.Exec(context.Background(), sqlDeleteJob)
if err != nil {
return tag.RowsAffected(), fmt.Errorf("Error deleting results from jobs: %v", err)
}
@ -144,43 +116,31 @@ func DBCleanup(dbURL string, dryRun bool, cutoff time.Time) error {
return err
}
// The results of these jobs take up the most space and can contain sensitive data. Delete
// them after a while.
jobsByType, err := db.JobsUptoByType([]string{worker.JobTypeManifestIDOnly, worker.JobTypeDepsolve}, cutoff)
if err != nil {
return fmt.Errorf("Error querying jobs: %v", err)
}
err = db.LogVacuumStats()
if err != nil {
logrus.Errorf("Error running vacuum stats: %v", err)
}
for k, v := range jobsByType {
logrus.Infof("Deleting results from %d %s jobs", len(v), k)
if dryRun {
logrus.Info("Dry run, skipping deletion of jobs")
continue
var rows int64
for {
rows, err = db.DeleteJob()
if err != nil {
logrus.Errorf("Error deleting results for jobs: %v, %d rows affected", rows, err)
return err
}
// Delete results in chunks to avoid starving the rds instance
for i := 0; i < len(v); i += 100 {
max := i + 100
if max > len(v) {
max = len(v)
}
rows, err := db.DeleteJobResult(v[i:max])
if err != nil {
logrus.Errorf("Error deleting results for jobs: %v, %d rows affected", rows, err)
continue
}
logrus.Infof("Deleted results from %d jobs out of %d job ids", rows, len(v))
err = db.VacuumAnalyze()
if err != nil {
logrus.Errorf("Error running vacuum analyze: %v", err)
}
if rows == 0 {
break
}
logrus.Infof("Deleted results for %d", rows)
}
err = db.VacuumAnalyze()
if err != nil {
logrus.Errorf("Error running vacuum analyze: %v", err)
}
err = db.LogVacuumStats()

View file

@ -26,11 +26,8 @@ func TestDBJobQueueMaintenance(t *testing.T) {
_, err = dbMaintenance.Conn.Exec(context.Background(), "DELETE FROM jobs")
require.NoError(t, err)
t.Run("testJobsUptoByType", func(t *testing.T) {
testJobsUptoByType(t, dbMaintenance, q)
})
t.Run("testDeleteJobResult", func(t *testing.T) {
testDeleteJobResult(t, dbMaintenance, q)
t.Run("testDeleteJob", func(t *testing.T) {
testDeleteJob(t, dbMaintenance, q)
})
t.Run("testVacuum", func(t *testing.T) {
testVacuum(t, dbMaintenance, q)
@ -38,46 +35,13 @@ func TestDBJobQueueMaintenance(t *testing.T) {
}
func setFinishedAt(t *testing.T, d db, id uuid.UUID, finished time.Time) {
started := finished.Add(-time.Second)
queued := started.Add(-time.Second)
_, err := d.Conn.Exec(context.Background(), "UPDATE jobs SET queued_at = $1, started_at = $2, finished_at = $3, result = '{\"result\": \"success\" }' WHERE id = $4", queued, started, finished, id)
func setExpired(t *testing.T, d db, id uuid.UUID) {
expires := time.Now().Add(-time.Second)
_, err := d.Conn.Exec(context.Background(), "UPDATE jobs SET expires_at = $1 WHERE id = $2", expires, id)
require.NoError(t, err)
}
func testJobsUptoByType(t *testing.T, d db, q *dbjobqueue.DBJobQueue) {
date80 := time.Date(1980, time.January, 1, 0, 0, 0, 0, time.UTC)
date85 := time.Date(1985, time.January, 1, 0, 0, 0, 0, time.UTC)
date90 := time.Date(1990, time.January, 1, 0, 0, 0, 0, time.UTC)
id80, err := q.Enqueue("octopus", nil, nil, "")
require.NoError(t, err)
require.NotEqual(t, uuid.Nil, id80)
_, _, _, _, _, err = q.Dequeue(context.Background(), []string{"octopus"}, []string{""})
require.NoError(t, err)
err = q.FinishJob(id80, nil)
require.NoError(t, err)
setFinishedAt(t, d, id80, date80)
id85, err := q.Enqueue("octopus", nil, nil, "")
require.NoError(t, err)
require.NotEqual(t, uuid.Nil, id85)
_, _, _, _, _, err = q.Dequeue(context.Background(), []string{"octopus"}, []string{""})
require.NoError(t, err)
err = q.FinishJob(id85, nil)
require.NoError(t, err)
setFinishedAt(t, d, id85, date85)
ids, err := d.JobsUptoByType([]string{"octopus"}, date85)
require.NoError(t, err)
require.ElementsMatch(t, []uuid.UUID{id80}, ids["octopus"])
ids, err = d.JobsUptoByType([]string{"octopus"}, date90)
require.NoError(t, err)
require.ElementsMatch(t, []uuid.UUID{id80, id85}, ids["octopus"])
}
func testDeleteJobResult(t *testing.T, d db, q *dbjobqueue.DBJobQueue) {
func testDeleteJob(t *testing.T, d db, q *dbjobqueue.DBJobQueue) {
id, err := q.Enqueue("octopus", nil, nil, "")
require.NoError(t, err)
require.NotEqual(t, uuid.Nil, id)
@ -103,13 +67,18 @@ func testDeleteJobResult(t *testing.T, d db, q *dbjobqueue.DBJobQueue) {
require.NoError(t, json.Unmarshal(r, &r1))
require.Equal(t, result, r1)
rows, err := d.DeleteJobResult([]uuid.UUID{id})
rows, err := d.DeleteJob()
require.NoError(t, err)
require.Equal(t, int64(0), rows)
setExpired(t, d, id)
rows, err = d.DeleteJob()
require.NoError(t, err)
require.Equal(t, int64(1), rows)
_, _, r, _, _, _, _, _, err = q.JobStatus(id)
require.NoError(t, err)
require.Nil(t, r)
_, _, _, _, _, _, _, _, err = q.JobStatus(id)
require.Error(t, err)
}
func testVacuum(t *testing.T, d db, q *dbjobqueue.DBJobQueue) {

View file

@ -72,6 +72,7 @@ type job struct {
QueuedAt time.Time `json:"queued_at,omitempty"`
StartedAt time.Time `json:"started_at,omitempty"`
FinishedAt time.Time `json:"finished_at,omitempty"`
ExpiresAt time.Time `json:"expires_at,omitempty"`
Canceled bool `json:"canceled,omitempty"`
}

View file

@ -0,0 +1,16 @@
-- add the expires_at column
ALTER TABLE jobs
ADD COLUMN expires_at timestamp DEFAULT NOW() + interval '14 days';
-- We added a column, thus we have to recreate the view.
CREATE OR REPLACE VIEW ready_jobs AS
SELECT *
FROM jobs
WHERE started_at IS NULL
AND canceled = FALSE
AND id NOT IN (
SELECT job_id
FROM job_dependencies JOIN jobs ON dependency_id = id
WHERE finished_at IS NULL
)
ORDER BY queued_at ASC