From d9bd19404d9512a4afe4710b51b744d66832f7bd Mon Sep 17 00:00:00 2001 From: Sanne Raymaekers Date: Fri, 10 Jun 2022 15:10:17 +0200 Subject: [PATCH] osbuild-service-maintenance: Move maintenance queries out of jobqueue --- Makefile | 1 + .../main_test.go | 97 ------------ cmd/osbuild-service-maintenance/db.go | 142 +++++++++++++++++- cmd/osbuild-service-maintenance/db_test.go | 118 +++++++++++++++ internal/jobqueue/dbjobqueue/dbjobqueue.go | 131 ---------------- osbuild-composer.spec | 2 + 6 files changed, 256 insertions(+), 235 deletions(-) create mode 100644 cmd/osbuild-service-maintenance/db_test.go diff --git a/Makefile b/Makefile index 5a22e1f37..f371da4cd 100644 --- a/Makefile +++ b/Makefile @@ -126,6 +126,7 @@ build: go test -c -tags=integration -o bin/osbuild-auth-tests ./cmd/osbuild-auth-tests/ go test -c -tags=integration -o bin/osbuild-koji-tests ./cmd/osbuild-koji-tests/ go test -c -tags=integration -o bin/osbuild-composer-dbjobqueue-tests ./cmd/osbuild-composer-dbjobqueue-tests/ + go test -c -tags=integration -o bin/osbuild-composer-maintenance-tests ./cmd/osbuild-service-maintenance/ go test -c -tags=integration -o bin/osbuild-composer-manifest-tests ./cmd/osbuild-composer-manifest-tests/ .PHONY: install diff --git a/cmd/osbuild-composer-dbjobqueue-tests/main_test.go b/cmd/osbuild-composer-dbjobqueue-tests/main_test.go index 5537f45e4..39cfcf02f 100644 --- a/cmd/osbuild-composer-dbjobqueue-tests/main_test.go +++ b/cmd/osbuild-composer-dbjobqueue-tests/main_test.go @@ -4,14 +4,10 @@ package main import ( "context" - "encoding/json" "fmt" "testing" - "time" - "github.com/google/uuid" "github.com/jackc/pgx/v4" - "github.com/stretchr/testify/require" "github.com/osbuild/osbuild-composer/internal/jobqueue" "github.com/osbuild/osbuild-composer/internal/jobqueue/dbjobqueue" @@ -50,97 +46,4 @@ func TestJobQueueInterface(t *testing.T) { } jobqueuetest.TestJobQueue(t, makeJobQueue) - - wrap := func(f func(t *testing.T, q *dbjobqueue.DBJobQueue)) func(*testing.T) { - q, stop, err := makeJobQueue() - require.NoError(t, err) - return func(t *testing.T) { - defer stop() // use defer because f() might call testing.T.FailNow() - dbq, ok := q.(*dbjobqueue.DBJobQueue) - require.True(t, ok) - f(t, dbq) - } - } - - t.Run("maintenance-query-jobs-before", wrap(testJobsUptoByType)) - t.Run("maintenance-delete-job-results", wrap(testDeleteJobResult)) -} - -func setFinishedAt(t *testing.T, q *dbjobqueue.DBJobQueue, id uuid.UUID, finished time.Time) { - conn, err := pgx.Connect(context.Background(), url) - require.NoError(t, err) - defer conn.Close(context.Background()) - - started := finished.Add(-time.Second) - queued := started.Add(-time.Second) - - _, err = conn.Exec(context.Background(), "UPDATE jobs SET queued_at = $1, started_at = $2, finished_at = $3, result = '{\"result\": \"success\" }' WHERE id = $4", queued, started, finished, id) - require.NoError(t, err) -} - -func testJobsUptoByType(t *testing.T, q *dbjobqueue.DBJobQueue) { - date80 := time.Date(1980, time.January, 1, 0, 0, 0, 0, time.UTC) - date85 := time.Date(1985, time.January, 1, 0, 0, 0, 0, time.UTC) - date90 := time.Date(1990, time.January, 1, 0, 0, 0, 0, time.UTC) - - id80, err := q.Enqueue("octopus", nil, nil, "") - require.NoError(t, err) - require.NotEqual(t, uuid.Nil, id80) - _, _, _, _, _, err = q.Dequeue(context.Background(), []string{"octopus"}, []string{""}) - require.NoError(t, err) - err = q.FinishJob(id80, nil) - require.NoError(t, err) - setFinishedAt(t, q, id80, date80) - - id85, err := q.Enqueue("octopus", nil, nil, "") - require.NoError(t, err) - require.NotEqual(t, uuid.Nil, id85) - _, _, _, _, _, err = q.Dequeue(context.Background(), []string{"octopus"}, []string{""}) - require.NoError(t, err) - err = q.FinishJob(id85, nil) - require.NoError(t, err) - setFinishedAt(t, q, id85, date85) - - ids, err := q.JobsUptoByType([]string{"octopus"}, date85) - require.NoError(t, err) - require.ElementsMatch(t, []uuid.UUID{id80}, ids["octopus"]) - - ids, err = q.JobsUptoByType([]string{"octopus"}, date90) - require.NoError(t, err) - require.ElementsMatch(t, []uuid.UUID{id80, id85}, ids["octopus"]) -} - -func testDeleteJobResult(t *testing.T, q *dbjobqueue.DBJobQueue) { - id, err := q.Enqueue("octopus", nil, nil, "") - require.NoError(t, err) - require.NotEqual(t, uuid.Nil, id) - _, _, _, _, _, err = q.Dequeue(context.Background(), []string{"octopus"}, []string{""}) - require.NoError(t, err) - - type Result struct { - Result string `json:"result"` - } - result := Result{ - "deleteme", - } - - res, err := json.Marshal(result) - require.NoError(t, err) - err = q.FinishJob(id, res) - require.NoError(t, err) - - _, _, r, _, _, _, _, _, err := q.JobStatus(id) - require.NoError(t, err) - - var r1 Result - require.NoError(t, json.Unmarshal(r, &r1)) - require.Equal(t, result, r1) - - rows, err := q.DeleteJobResult([]uuid.UUID{id}) - require.NoError(t, err) - require.Equal(t, int64(1), rows) - - _, _, r, _, _, _, _, _, err = q.JobStatus(id) - require.NoError(t, err) - require.Nil(t, r) } diff --git a/cmd/osbuild-service-maintenance/db.go b/cmd/osbuild-service-maintenance/db.go index 5f8f8ebbc..eb6adf543 100644 --- a/cmd/osbuild-service-maintenance/db.go +++ b/cmd/osbuild-service-maintenance/db.go @@ -1,29 +1,157 @@ package main import ( + "context" "fmt" "time" + "github.com/google/uuid" + "github.com/jackc/pgx/v4" "github.com/sirupsen/logrus" - "github.com/osbuild/osbuild-composer/internal/jobqueue/dbjobqueue" "github.com/osbuild/osbuild-composer/internal/worker" ) +const ( + // Maintenance queries + sqlQueryJobsUptoByType = ` + SELECT array_agg(id), type + FROM jobs + WHERE type = ANY($1) AND finished_at < $2 AND result IS NOT NULL + GROUP BY type` + sqlDeleteJobResult = ` + UPDATE jobs + SET result = NULL + WHERE id = ANY($1)` + sqlVacuumAnalyze = ` + VACUUM ANALYZE` + sqlVacuumStats = ` + SELECT relname, pg_size_pretty(pg_total_relation_size(relid)), + n_tup_ins, n_tup_upd, n_tup_del, n_live_tup, n_dead_tup, + vacuum_count, autovacuum_count, analyze_count, autoanalyze_count, + last_vacuum, last_autovacuum, last_analyze, last_autoanalyze + FROM pg_stat_user_tables` +) + +type db struct { + Conn *pgx.Conn +} + +func newDB(dbURL string) (db, error) { + conn, err := pgx.Connect(context.Background(), dbURL) + if err != nil { + return db{}, err + } + + return db{ + conn, + }, nil +} + +func (d *db) Close() { + d.Conn.Close(context.Background()) +} + +// return map id -> jobtype ? +func (d *db) JobsUptoByType(jobTypes []string, upto time.Time) (result map[string][]uuid.UUID, err error) { + result = make(map[string][]uuid.UUID) + + rows, err := d.Conn.Query(context.Background(), sqlQueryJobsUptoByType, jobTypes, upto) + if err != nil { + return + } + defer rows.Close() + + for rows.Next() { + var ids []uuid.UUID + var jt string + err = rows.Scan(&ids, &jt) + if err != nil { + return + } + + result[jt] = ids + } + err = rows.Err() + return +} + +func (d *db) DeleteJobResult(jobIds []uuid.UUID) (int64, error) { + tag, err := d.Conn.Exec(context.Background(), sqlDeleteJobResult, jobIds) + if err != nil { + return tag.RowsAffected(), fmt.Errorf("Error deleting results from jobs: %v", err) + } + return tag.RowsAffected(), nil +} + +func (d *db) VacuumAnalyze() error { + _, err := d.Conn.Exec(context.Background(), sqlVacuumAnalyze) + if err != nil { + return fmt.Errorf("Error running VACUUM ANALYZE: %v", err) + } + return nil +} + +func (d *db) LogVacuumStats() error { + rows, err := d.Conn.Query(context.Background(), sqlVacuumStats) + if err != nil { + return fmt.Errorf("Error querying vacuum stats: %v", err) + } + defer rows.Close() + + for rows.Next() { + var relName, relSize string + var ins, upd, del, live, dead, vc, avc, ac, aac int64 + var lvc, lavc, lan, laan *time.Time + + err = rows.Scan(&relName, &relSize, &ins, &upd, &del, &live, &dead, + &vc, &avc, &ac, &aac, + &lvc, &lavc, &lan, &laan) + if err != nil { + return err + } + + logrus.Infof("Stats for table %s", relName) + logrus.Infof(" Total table size: %s", relSize) + logrus.Info(" Tuples:") + logrus.Infof(" Inserted: %d", ins) + logrus.Infof(" Updated: %d", upd) + logrus.Infof(" Deleted: %d", del) + logrus.Infof(" Live: %d", live) + logrus.Infof(" Dead: %d", dead) + logrus.Info(" Vacuum stats:") + logrus.Infof(" Vacuum count: %d", vc) + logrus.Infof(" AutoVacuum count: %d", avc) + logrus.Infof(" Last vacuum: %v", lvc) + logrus.Infof(" Last autovacuum: %v", lavc) + logrus.Info(" Analyze stats:") + logrus.Infof(" Analyze count: %d", ac) + logrus.Infof(" AutoAnalyze count: %d", aac) + logrus.Infof(" Last analyze: %v", lan) + logrus.Infof(" Last autoanalyze: %v", laan) + logrus.Info("---") + } + if rows.Err() != nil { + return rows.Err() + } + return nil + +} + func DBCleanup(dbURL string, dryRun bool, cutoff time.Time) error { - jobs, err := dbjobqueue.New(dbURL) + db, err := newDB(dbURL) if err != nil { return err } // The results of these jobs take up the most space and can contain sensitive data. Delete // them after a while. - jobsByType, err := jobs.JobsUptoByType([]string{worker.JobTypeManifestIDOnly, worker.JobTypeDepsolve}, cutoff) + jobsByType, err := db.JobsUptoByType([]string{worker.JobTypeManifestIDOnly, worker.JobTypeDepsolve}, cutoff) if err != nil { return fmt.Errorf("Error querying jobs: %v", err) } - err = jobs.LogVacuumStats() + err = db.LogVacuumStats() if err != nil { logrus.Errorf("Error running vacuum stats: %v", err) } @@ -42,20 +170,20 @@ func DBCleanup(dbURL string, dryRun bool, cutoff time.Time) error { max = len(v) } - rows, err := jobs.DeleteJobResult(v[i:max]) + rows, err := db.DeleteJobResult(v[i:max]) if err != nil { logrus.Errorf("Error deleting results for jobs: %v, %d rows affected", rows, err) continue } logrus.Infof("Deleted results from %d jobs out of %d job ids", rows, len(v)) - err = jobs.VacuumAnalyze() + err = db.VacuumAnalyze() if err != nil { logrus.Errorf("Error running vacuum analyze: %v", err) } } } - err = jobs.LogVacuumStats() + err = db.LogVacuumStats() if err != nil { logrus.Errorf("Error running vacuum stats: %v", err) } diff --git a/cmd/osbuild-service-maintenance/db_test.go b/cmd/osbuild-service-maintenance/db_test.go new file mode 100644 index 000000000..85831625e --- /dev/null +++ b/cmd/osbuild-service-maintenance/db_test.go @@ -0,0 +1,118 @@ +// +build integration + +package main + +import ( + "context" + "encoding/json" + "testing" + "time" + + "github.com/google/uuid" + "github.com/osbuild/osbuild-composer/internal/jobqueue/dbjobqueue" + "github.com/stretchr/testify/require" +) + +const url = "postgres://postgres:foobar@localhost:5432/osbuildcomposer" + +func TestDBJobQueueMaintenance(t *testing.T) { + dbMaintenance, err := newDB(url) + require.NoError(t, err) + defer dbMaintenance.Close() + q, err := dbjobqueue.New(url) + require.NoError(t, err) + defer q.Close() + + _, err = dbMaintenance.Conn.Exec(context.Background(), "DELETE FROM jobs") + require.NoError(t, err) + + t.Run("testJobsUptoByType", func(t *testing.T) { + testJobsUptoByType(t, dbMaintenance, q) + }) + t.Run("testDeleteJobResult", func(t *testing.T) { + testDeleteJobResult(t, dbMaintenance, q) + }) + t.Run("testVacuum", func(t *testing.T) { + testVacuum(t, dbMaintenance, q) + }) + +} + +func setFinishedAt(t *testing.T, d db, id uuid.UUID, finished time.Time) { + started := finished.Add(-time.Second) + queued := started.Add(-time.Second) + _, err := d.Conn.Exec(context.Background(), "UPDATE jobs SET queued_at = $1, started_at = $2, finished_at = $3, result = '{\"result\": \"success\" }' WHERE id = $4", queued, started, finished, id) + require.NoError(t, err) +} + +func testJobsUptoByType(t *testing.T, d db, q *dbjobqueue.DBJobQueue) { + date80 := time.Date(1980, time.January, 1, 0, 0, 0, 0, time.UTC) + date85 := time.Date(1985, time.January, 1, 0, 0, 0, 0, time.UTC) + date90 := time.Date(1990, time.January, 1, 0, 0, 0, 0, time.UTC) + + id80, err := q.Enqueue("octopus", nil, nil, "") + require.NoError(t, err) + require.NotEqual(t, uuid.Nil, id80) + _, _, _, _, _, err = q.Dequeue(context.Background(), []string{"octopus"}, []string{""}) + require.NoError(t, err) + err = q.FinishJob(id80, nil) + require.NoError(t, err) + setFinishedAt(t, d, id80, date80) + + id85, err := q.Enqueue("octopus", nil, nil, "") + require.NoError(t, err) + require.NotEqual(t, uuid.Nil, id85) + _, _, _, _, _, err = q.Dequeue(context.Background(), []string{"octopus"}, []string{""}) + require.NoError(t, err) + err = q.FinishJob(id85, nil) + require.NoError(t, err) + setFinishedAt(t, d, id85, date85) + + ids, err := d.JobsUptoByType([]string{"octopus"}, date85) + require.NoError(t, err) + require.ElementsMatch(t, []uuid.UUID{id80}, ids["octopus"]) + + ids, err = d.JobsUptoByType([]string{"octopus"}, date90) + require.NoError(t, err) + require.ElementsMatch(t, []uuid.UUID{id80, id85}, ids["octopus"]) +} + +func testDeleteJobResult(t *testing.T, d db, q *dbjobqueue.DBJobQueue) { + id, err := q.Enqueue("octopus", nil, nil, "") + require.NoError(t, err) + require.NotEqual(t, uuid.Nil, id) + _, _, _, _, _, err = q.Dequeue(context.Background(), []string{"octopus"}, []string{""}) + require.NoError(t, err) + + type Result struct { + Result string `json:"result"` + } + result := Result{ + "deleteme", + } + + res, err := json.Marshal(result) + require.NoError(t, err) + err = q.FinishJob(id, res) + require.NoError(t, err) + + _, _, r, _, _, _, _, _, err := q.JobStatus(id) + require.NoError(t, err) + + var r1 Result + require.NoError(t, json.Unmarshal(r, &r1)) + require.Equal(t, result, r1) + + rows, err := d.DeleteJobResult([]uuid.UUID{id}) + require.NoError(t, err) + require.Equal(t, int64(1), rows) + + _, _, r, _, _, _, _, _, err = q.JobStatus(id) + require.NoError(t, err) + require.Nil(t, r) +} + +func testVacuum(t *testing.T, d db, q *dbjobqueue.DBJobQueue) { + require.NoError(t, d.VacuumAnalyze()) + require.NoError(t, d.LogVacuumStats()) +} diff --git a/internal/jobqueue/dbjobqueue/dbjobqueue.go b/internal/jobqueue/dbjobqueue/dbjobqueue.go index e3059dc25..106614ee0 100644 --- a/internal/jobqueue/dbjobqueue/dbjobqueue.go +++ b/internal/jobqueue/dbjobqueue/dbjobqueue.go @@ -99,25 +99,6 @@ const ( sqlDeleteHeartbeat = ` DELETE FROM heartbeats WHERE id = $1` - - // Maintenance queries - sqlQueryJobsUptoByType = ` - SELECT array_agg(id), type - FROM jobs - WHERE type = ANY($1) AND finished_at < $2 AND result IS NOT NULL - GROUP BY type` - sqlDeleteJobResult = ` - UPDATE jobs - SET result = NULL - WHERE id = ANY($1)` - sqlVacuumAnalyze = ` - VACUUM ANALYZE` - sqlVacuumStats = ` - SELECT relname, pg_size_pretty(pg_total_relation_size(relid)), - n_tup_ins, n_tup_upd, n_tup_del, n_live_tup, n_dead_tup, - vacuum_count, autovacuum_count, analyze_count, autoanalyze_count, - last_vacuum, last_autovacuum, last_analyze, last_autoanalyze - FROM pg_stat_user_tables` ) type DBJobQueue struct { @@ -621,115 +602,3 @@ func (q *DBJobQueue) jobDependencies(ctx context.Context, conn *pgxpool.Conn, id return dependencies, nil } - -// return map id -> jobtype ? -func (q *DBJobQueue) JobsUptoByType(jobTypes []string, upto time.Time) (result map[string][]uuid.UUID, err error) { - result = make(map[string][]uuid.UUID) - - conn, err := q.pool.Acquire(context.Background()) - if err != nil { - err = fmt.Errorf("error connecting to database: %v", err) - return - } - defer conn.Release() - - rows, err := conn.Query(context.Background(), sqlQueryJobsUptoByType, jobTypes, upto) - if err != nil { - return - } - defer rows.Close() - - for rows.Next() { - var ids []uuid.UUID - var jt string - err = rows.Scan(&ids, &jt) - if err != nil { - return - } - - result[jt] = ids - } - err = rows.Err() - return -} - -func (q *DBJobQueue) DeleteJobResult(jobIds []uuid.UUID) (int64, error) { - conn, err := q.pool.Acquire(context.Background()) - if err != nil { - return -1, fmt.Errorf("error connecting to database: %v", err) - } - defer conn.Release() - - tag, err := conn.Exec(context.Background(), sqlDeleteJobResult, jobIds) - if err != nil { - return tag.RowsAffected(), fmt.Errorf("Error deleting results from jobs: %v", err) - } - return tag.RowsAffected(), nil -} - -func (q *DBJobQueue) VacuumAnalyze() error { - conn, err := q.pool.Acquire(context.Background()) - if err != nil { - return fmt.Errorf("error connecting to database: %v", err) - } - defer conn.Release() - - _, err = conn.Exec(context.Background(), sqlVacuumAnalyze) - if err != nil { - return fmt.Errorf("Error running VACUUM ANALYZE: %v", err) - } - - return nil -} - -func (q *DBJobQueue) LogVacuumStats() error { - conn, err := q.pool.Acquire(context.Background()) - if err != nil { - return fmt.Errorf("error connecting to database: %v", err) - } - defer conn.Release() - - rows, err := conn.Query(context.Background(), sqlVacuumStats) - if err != nil { - return fmt.Errorf("Error querying vacuum stats: %v", err) - } - defer rows.Close() - - for rows.Next() { - var relName, relSize string - var ins, upd, del, live, dead, vc, avc, ac, aac int64 - var lvc, lavc, lan, laan *time.Time - - err = rows.Scan(&relName, &relSize, &ins, &upd, &del, &live, &dead, - &vc, &avc, &ac, &aac, - &lvc, &lavc, &lan, &laan) - if err != nil { - return err - } - - logrus.Infof("Stats for table %s", relName) - logrus.Infof(" Total table size: %s", relSize) - logrus.Info(" Tuples:") - logrus.Infof(" Inserted: %d", ins) - logrus.Infof(" Updated: %d", upd) - logrus.Infof(" Deleted: %d", del) - logrus.Infof(" Live: %d", live) - logrus.Infof(" Dead: %d", dead) - logrus.Info(" Vacuum stats:") - logrus.Infof(" Vacuum count: %d", vc) - logrus.Infof(" AutoVacuum count: %d", avc) - logrus.Infof(" Last vacuum: %v", lvc) - logrus.Infof(" Last autovacuum: %v", lavc) - logrus.Info(" Analyze stats:") - logrus.Infof(" Analyze count: %d", ac) - logrus.Infof(" AutoAnalyze count: %d", aac) - logrus.Infof(" Last analyze: %v", lan) - logrus.Infof(" Last autoanalyze: %v", laan) - logrus.Info("---") - } - if rows.Err() != nil { - return rows.Err() - } - return nil - -} diff --git a/osbuild-composer.spec b/osbuild-composer.spec index 1d1aeb331..cee037835 100644 --- a/osbuild-composer.spec +++ b/osbuild-composer.spec @@ -129,6 +129,7 @@ go test -c -tags=integration -ldflags="${TEST_LDFLAGS}" -o _bin/osbuild-auth-tes go test -c -tags=integration -ldflags="${TEST_LDFLAGS}" -o _bin/osbuild-koji-tests %{goipath}/cmd/osbuild-koji-tests go test -c -tags=integration -ldflags="${TEST_LDFLAGS}" -o _bin/osbuild-composer-dbjobqueue-tests %{goipath}/cmd/osbuild-composer-dbjobqueue-tests go test -c -tags=integration -ldflags="${TEST_LDFLAGS}" -o _bin/osbuild-composer-manifest-tests %{goipath}/cmd/osbuild-composer-manifest-tests +go test -c -tags=integration -ldflags="${TEST_LDFLAGS}" -o _bin/osbuild-service-maintenance-tests %{goipath}/cmd/osbuild-service-maintenance go build -tags=integration -ldflags="${TEST_LDFLAGS}" -o _bin/osbuild-mock-openid-provider %{goipath}/cmd/osbuild-mock-openid-provider %endif @@ -194,6 +195,7 @@ install -m 0755 -vp _bin/osbuild-auth-tests %{buildroot}% install -m 0755 -vp _bin/osbuild-koji-tests %{buildroot}%{_libexecdir}/osbuild-composer-test/ install -m 0755 -vp _bin/osbuild-composer-dbjobqueue-tests %{buildroot}%{_libexecdir}/osbuild-composer-test/ install -m 0755 -vp _bin/osbuild-composer-manifest-tests %{buildroot}%{_libexecdir}/osbuild-composer-test/ +install -m 0755 -vp _bin/osbuild-service-maintenance-tests %{buildroot}%{_libexecdir}/osbuild-composer-test/ install -m 0755 -vp _bin/osbuild-mock-openid-provider %{buildroot}%{_libexecdir}/osbuild-composer-test/ install -m 0755 -vp tools/define-compose-url.sh %{buildroot}%{_libexecdir}/osbuild-composer-test/ install -m 0755 -vp tools/provision.sh %{buildroot}%{_libexecdir}/osbuild-composer-test/