osbuild-service-maintenance: Move maintenance queries out of jobqueue

This commit is contained in:
Sanne Raymaekers 2022-06-10 15:10:17 +02:00 committed by Achilleas Koutsou
parent 8676d3342d
commit d9bd19404d
6 changed files with 256 additions and 235 deletions

View file

@ -126,6 +126,7 @@ build:
go test -c -tags=integration -o bin/osbuild-auth-tests ./cmd/osbuild-auth-tests/
go test -c -tags=integration -o bin/osbuild-koji-tests ./cmd/osbuild-koji-tests/
go test -c -tags=integration -o bin/osbuild-composer-dbjobqueue-tests ./cmd/osbuild-composer-dbjobqueue-tests/
go test -c -tags=integration -o bin/osbuild-composer-maintenance-tests ./cmd/osbuild-service-maintenance/
go test -c -tags=integration -o bin/osbuild-composer-manifest-tests ./cmd/osbuild-composer-manifest-tests/
.PHONY: install

View file

@ -4,14 +4,10 @@ package main
import (
"context"
"encoding/json"
"fmt"
"testing"
"time"
"github.com/google/uuid"
"github.com/jackc/pgx/v4"
"github.com/stretchr/testify/require"
"github.com/osbuild/osbuild-composer/internal/jobqueue"
"github.com/osbuild/osbuild-composer/internal/jobqueue/dbjobqueue"
@ -50,97 +46,4 @@ func TestJobQueueInterface(t *testing.T) {
}
jobqueuetest.TestJobQueue(t, makeJobQueue)
wrap := func(f func(t *testing.T, q *dbjobqueue.DBJobQueue)) func(*testing.T) {
q, stop, err := makeJobQueue()
require.NoError(t, err)
return func(t *testing.T) {
defer stop() // use defer because f() might call testing.T.FailNow()
dbq, ok := q.(*dbjobqueue.DBJobQueue)
require.True(t, ok)
f(t, dbq)
}
}
t.Run("maintenance-query-jobs-before", wrap(testJobsUptoByType))
t.Run("maintenance-delete-job-results", wrap(testDeleteJobResult))
}
// setFinishedAt rewrites the timestamps of job id so that it looks as if it
// finished at the given time, started one second before that and was queued
// one second earlier still. The job's result is overwritten with a fixed
// success document. The update goes through a dedicated connection to the
// test database; any failure aborts the test.
func setFinishedAt(t *testing.T, q *dbjobqueue.DBJobQueue, id uuid.UUID, finished time.Time) {
	ctx := context.Background()

	conn, err := pgx.Connect(ctx, url)
	require.NoError(t, err)
	defer conn.Close(ctx)

	started := finished.Add(-time.Second)
	queued := started.Add(-time.Second)

	const stmt = "UPDATE jobs SET queued_at = $1, started_at = $2, finished_at = $3, result = '{\"result\": \"success\" }' WHERE id = $4"
	_, err = conn.Exec(ctx, stmt, queued, started, finished, id)
	require.NoError(t, err)
}
// testJobsUptoByType verifies that JobsUptoByType reports only those finished
// jobs whose finished_at lies before the given cutoff.
func testJobsUptoByType(t *testing.T, q *dbjobqueue.DBJobQueue) {
	newYear := func(year int) time.Time {
		return time.Date(year, time.January, 1, 0, 0, 0, 0, time.UTC)
	}

	// finishedOctopus pushes one "octopus" job through enqueue, dequeue and
	// finish, then backdates it so that it appears to have finished at `at`.
	finishedOctopus := func(at time.Time) uuid.UUID {
		id, err := q.Enqueue("octopus", nil, nil, "")
		require.NoError(t, err)
		require.NotEqual(t, uuid.Nil, id)

		_, _, _, _, _, err = q.Dequeue(context.Background(), []string{"octopus"}, []string{""})
		require.NoError(t, err)

		err = q.FinishJob(id, nil)
		require.NoError(t, err)

		setFinishedAt(t, q, id, at)
		return id
	}

	id80 := finishedOctopus(newYear(1980))
	id85 := finishedOctopus(newYear(1985))

	// The 1985 job finished exactly at the cutoff and must not be reported.
	found, err := q.JobsUptoByType([]string{"octopus"}, newYear(1985))
	require.NoError(t, err)
	require.ElementsMatch(t, []uuid.UUID{id80}, found["octopus"])

	// A later cutoff covers both jobs.
	found, err = q.JobsUptoByType([]string{"octopus"}, newYear(1990))
	require.NoError(t, err)
	require.ElementsMatch(t, []uuid.UUID{id80, id85}, found["octopus"])
}
// testDeleteJobResult finishes a job with a known result, confirms the result
// can be read back, deletes it through DeleteJobResult and then checks that
// the job status no longer carries a result.
func testDeleteJobResult(t *testing.T, q *dbjobqueue.DBJobQueue) {
	type Result struct {
		Result string `json:"result"`
	}
	want := Result{Result: "deleteme"}

	id, err := q.Enqueue("octopus", nil, nil, "")
	require.NoError(t, err)
	require.NotEqual(t, uuid.Nil, id)

	_, _, _, _, _, err = q.Dequeue(context.Background(), []string{"octopus"}, []string{""})
	require.NoError(t, err)

	payload, err := json.Marshal(want)
	require.NoError(t, err)
	require.NoError(t, q.FinishJob(id, payload))

	// The stored result must round-trip before it is deleted.
	_, _, stored, _, _, _, _, _, err := q.JobStatus(id)
	require.NoError(t, err)
	var got Result
	require.NoError(t, json.Unmarshal(stored, &got))
	require.Equal(t, want, got)

	affected, err := q.DeleteJobResult([]uuid.UUID{id})
	require.NoError(t, err)
	require.Equal(t, int64(1), affected)

	_, _, stored, _, _, _, _, _, err = q.JobStatus(id)
	require.NoError(t, err)
	require.Nil(t, stored)
}

View file

@ -1,29 +1,157 @@
package main
import (
"context"
"fmt"
"time"
"github.com/google/uuid"
"github.com/jackc/pgx/v4"
"github.com/sirupsen/logrus"
"github.com/osbuild/osbuild-composer/internal/jobqueue/dbjobqueue"
"github.com/osbuild/osbuild-composer/internal/worker"
)
const (
// Maintenance queries

// sqlQueryJobsUptoByType returns one row per job type: the aggregated ids of
// all jobs whose type is in $1, whose finished_at is earlier than $2 and
// whose result has not been cleared yet.
sqlQueryJobsUptoByType = `
SELECT array_agg(id), type
FROM jobs
WHERE type = ANY($1) AND finished_at < $2 AND result IS NOT NULL
GROUP BY type`
// sqlDeleteJobResult sets the result column to NULL for every job whose id
// is in $1; the job rows themselves are kept.
sqlDeleteJobResult = `
UPDATE jobs
SET result = NULL
WHERE id = ANY($1)`
// sqlVacuumAnalyze runs VACUUM ANALYZE without naming a table.
sqlVacuumAnalyze = `
VACUUM ANALYZE`
// sqlVacuumStats reads, for each row of pg_stat_user_tables, the relation
// name, its pretty-printed total size, the tuple counters and the
// vacuum/analyze counters and timestamps.
sqlVacuumStats = `
SELECT relname, pg_size_pretty(pg_total_relation_size(relid)),
n_tup_ins, n_tup_upd, n_tup_del, n_live_tup, n_dead_tup,
vacuum_count, autovacuum_count, analyze_count, autoanalyze_count,
last_vacuum, last_autovacuum, last_analyze, last_autoanalyze
FROM pg_stat_user_tables`
)
// db wraps the single database connection that the maintenance queries in
// this package are executed on.
type db struct {
// Conn is the connection used by every method of db.
Conn *pgx.Conn
}
// newDB connects to the database at dbURL and returns a db wrapping that
// connection. If connecting fails, the zero db and the connect error are
// returned.
func newDB(dbURL string) (db, error) {
	var (
		handle db
		err    error
	)
	if handle.Conn, err = pgx.Connect(context.Background(), dbURL); err != nil {
		return db{}, err
	}
	return handle, nil
}
// Close closes the wrapped database connection. Nothing is reported back to
// the caller.
func (d *db) Close() {
	ctx := context.Background()
	d.Conn.Close(ctx)
}
// JobsUptoByType looks up jobs of the given types that finished before upto
// and still have a result, and returns their ids grouped by job type.
//
// The returned map is never nil. If the query or a row scan fails, the map
// holds whatever was collected up to that point and err is set.
//
// Open question kept from the original author: should this return a map of
// id -> job type instead?
func (d *db) JobsUptoByType(jobTypes []string, upto time.Time) (result map[string][]uuid.UUID, err error) {
	result = map[string][]uuid.UUID{}

	rows, err := d.Conn.Query(context.Background(), sqlQueryJobsUptoByType, jobTypes, upto)
	if err != nil {
		return result, err
	}
	defer rows.Close()

	for rows.Next() {
		var (
			jobType string
			jobIDs  []uuid.UUID
		)
		// Column order follows the query: aggregated ids first, then type.
		if err = rows.Scan(&jobIDs, &jobType); err != nil {
			return result, err
		}
		result[jobType] = jobIDs
	}

	return result, rows.Err()
}
// DeleteJobResult clears the stored result of every job listed in jobIds by
// setting it to NULL; the job rows themselves are kept.
//
// It returns the number of rows the update affected. On failure the returned
// error wraps the underlying database error, so callers can inspect it with
// errors.Is / errors.As.
func (d *db) DeleteJobResult(jobIds []uuid.UUID) (int64, error) {
	tag, err := d.Conn.Exec(context.Background(), sqlDeleteJobResult, jobIds)
	if err != nil {
		return tag.RowsAffected(), fmt.Errorf("Error deleting results from jobs: %w", err)
	}
	return tag.RowsAffected(), nil
}
// VacuumAnalyze runs VACUUM ANALYZE on the connected database.
//
// On failure the returned error wraps the underlying database error, so
// callers can inspect it with errors.Is / errors.As.
func (d *db) VacuumAnalyze() error {
	if _, err := d.Conn.Exec(context.Background(), sqlVacuumAnalyze); err != nil {
		return fmt.Errorf("Error running VACUUM ANALYZE: %w", err)
	}
	return nil
}
// LogVacuumStats queries pg_stat_user_tables and writes, for every row, the
// table size, tuple counters and vacuum/analyze statistics to the log at info
// level.
//
// It returns an error if the query fails, a row cannot be scanned or row
// iteration ends with an error. Query and scan errors wrap the underlying
// database error.
func (d *db) LogVacuumStats() error {
	rows, err := d.Conn.Query(context.Background(), sqlVacuumStats)
	if err != nil {
		return fmt.Errorf("Error querying vacuum stats: %w", err)
	}
	defer rows.Close()

	for rows.Next() {
		var relName, relSize string
		var ins, upd, del, live, dead, vc, avc, ac, aac int64
		// Timestamps are scanned through pointers so that a NULL column ends
		// up as nil (presumably for tables never vacuumed/analyzed — confirm).
		var lvc, lavc, lan, laan *time.Time

		err = rows.Scan(&relName, &relSize, &ins, &upd, &del, &live, &dead,
			&vc, &avc, &ac, &aac,
			&lvc, &lavc, &lan, &laan)
		if err != nil {
			return fmt.Errorf("Error scanning vacuum stats: %w", err)
		}

		logrus.Infof("Stats for table %s", relName)
		logrus.Infof(" Total table size: %s", relSize)
		logrus.Info(" Tuples:")
		logrus.Infof(" Inserted: %d", ins)
		logrus.Infof(" Updated: %d", upd)
		logrus.Infof(" Deleted: %d", del)
		logrus.Infof(" Live: %d", live)
		logrus.Infof(" Dead: %d", dead)
		logrus.Info(" Vacuum stats:")
		logrus.Infof(" Vacuum count: %d", vc)
		logrus.Infof(" AutoVacuum count: %d", avc)
		logrus.Infof(" Last vacuum: %v", lvc)
		logrus.Infof(" Last autovacuum: %v", lavc)
		logrus.Info(" Analyze stats:")
		logrus.Infof(" Analyze count: %d", ac)
		logrus.Infof(" AutoAnalyze count: %d", aac)
		logrus.Infof(" Last analyze: %v", lan)
		logrus.Infof(" Last autoanalyze: %v", laan)
		logrus.Info("---")
	}

	// Report any error that terminated the iteration early.
	return rows.Err()
}
func DBCleanup(dbURL string, dryRun bool, cutoff time.Time) error {
jobs, err := dbjobqueue.New(dbURL)
db, err := newDB(dbURL)
if err != nil {
return err
}
// The results of these jobs take up the most space and can contain sensitive data. Delete
// them after a while.
jobsByType, err := jobs.JobsUptoByType([]string{worker.JobTypeManifestIDOnly, worker.JobTypeDepsolve}, cutoff)
jobsByType, err := db.JobsUptoByType([]string{worker.JobTypeManifestIDOnly, worker.JobTypeDepsolve}, cutoff)
if err != nil {
return fmt.Errorf("Error querying jobs: %v", err)
}
err = jobs.LogVacuumStats()
err = db.LogVacuumStats()
if err != nil {
logrus.Errorf("Error running vacuum stats: %v", err)
}
@ -42,20 +170,20 @@ func DBCleanup(dbURL string, dryRun bool, cutoff time.Time) error {
max = len(v)
}
rows, err := jobs.DeleteJobResult(v[i:max])
rows, err := db.DeleteJobResult(v[i:max])
if err != nil {
logrus.Errorf("Error deleting results for jobs: %v, %d rows affected", rows, err)
continue
}
logrus.Infof("Deleted results from %d jobs out of %d job ids", rows, len(v))
err = jobs.VacuumAnalyze()
err = db.VacuumAnalyze()
if err != nil {
logrus.Errorf("Error running vacuum analyze: %v", err)
}
}
}
err = jobs.LogVacuumStats()
err = db.LogVacuumStats()
if err != nil {
logrus.Errorf("Error running vacuum stats: %v", err)
}

View file

@ -0,0 +1,118 @@
// +build integration
package main
import (
"context"
"encoding/json"
"testing"
"time"
"github.com/google/uuid"
"github.com/osbuild/osbuild-composer/internal/jobqueue/dbjobqueue"
"github.com/stretchr/testify/require"
)
const url = "postgres://postgres:foobar@localhost:5432/osbuildcomposer"
// TestDBJobQueueMaintenance runs the maintenance query tests against the
// database at url. It opens a maintenance connection and a job queue, empties
// the jobs table, and then executes the subtests in a fixed order.
func TestDBJobQueueMaintenance(t *testing.T) {
	dbMaintenance, err := newDB(url)
	require.NoError(t, err)
	defer dbMaintenance.Close()

	q, err := dbjobqueue.New(url)
	require.NoError(t, err)
	defer q.Close()

	// Start from an empty jobs table.
	_, err = dbMaintenance.Conn.Exec(context.Background(), "DELETE FROM jobs")
	require.NoError(t, err)

	subtests := []struct {
		name string
		run  func(*testing.T, db, *dbjobqueue.DBJobQueue)
	}{
		{"testJobsUptoByType", testJobsUptoByType},
		{"testDeleteJobResult", testDeleteJobResult},
		{"testVacuum", testVacuum},
	}
	for _, st := range subtests {
		run := st.run
		t.Run(st.name, func(t *testing.T) {
			run(t, dbMaintenance, q)
		})
	}
}
// setFinishedAt rewrites the timestamps of job id over d's connection so that
// the job looks as if it finished at the given time, started one second
// before that and was queued one second earlier still. The job's result is
// overwritten with a fixed success document. Any failure aborts the test.
func setFinishedAt(t *testing.T, d db, id uuid.UUID, finished time.Time) {
	const stmt = "UPDATE jobs SET queued_at = $1, started_at = $2, finished_at = $3, result = '{\"result\": \"success\" }' WHERE id = $4"

	started := finished.Add(-time.Second)
	queued := started.Add(-time.Second)

	_, err := d.Conn.Exec(context.Background(), stmt, queued, started, finished, id)
	require.NoError(t, err)
}
// testJobsUptoByType verifies that db.JobsUptoByType reports only those
// finished jobs whose finished_at lies before the given cutoff. Jobs are
// created through the job queue q and backdated through d.
func testJobsUptoByType(t *testing.T, d db, q *dbjobqueue.DBJobQueue) {
	newYear := func(year int) time.Time {
		return time.Date(year, time.January, 1, 0, 0, 0, 0, time.UTC)
	}

	// finishedOctopus pushes one "octopus" job through enqueue, dequeue and
	// finish, then backdates it so that it appears to have finished at `at`.
	finishedOctopus := func(at time.Time) uuid.UUID {
		id, err := q.Enqueue("octopus", nil, nil, "")
		require.NoError(t, err)
		require.NotEqual(t, uuid.Nil, id)

		_, _, _, _, _, err = q.Dequeue(context.Background(), []string{"octopus"}, []string{""})
		require.NoError(t, err)

		err = q.FinishJob(id, nil)
		require.NoError(t, err)

		setFinishedAt(t, d, id, at)
		return id
	}

	id80 := finishedOctopus(newYear(1980))
	id85 := finishedOctopus(newYear(1985))

	// The 1985 job finished exactly at the cutoff and must not be reported.
	found, err := d.JobsUptoByType([]string{"octopus"}, newYear(1985))
	require.NoError(t, err)
	require.ElementsMatch(t, []uuid.UUID{id80}, found["octopus"])

	// A later cutoff covers both jobs.
	found, err = d.JobsUptoByType([]string{"octopus"}, newYear(1990))
	require.NoError(t, err)
	require.ElementsMatch(t, []uuid.UUID{id80, id85}, found["octopus"])
}
// testDeleteJobResult finishes a job with a known result via the job queue,
// confirms the result can be read back, clears it through db.DeleteJobResult
// and then checks that the job status no longer carries a result.
func testDeleteJobResult(t *testing.T, d db, q *dbjobqueue.DBJobQueue) {
	type Result struct {
		Result string `json:"result"`
	}
	want := Result{Result: "deleteme"}

	id, err := q.Enqueue("octopus", nil, nil, "")
	require.NoError(t, err)
	require.NotEqual(t, uuid.Nil, id)

	_, _, _, _, _, err = q.Dequeue(context.Background(), []string{"octopus"}, []string{""})
	require.NoError(t, err)

	payload, err := json.Marshal(want)
	require.NoError(t, err)
	require.NoError(t, q.FinishJob(id, payload))

	// The stored result must round-trip before it is deleted.
	_, _, stored, _, _, _, _, _, err := q.JobStatus(id)
	require.NoError(t, err)
	var got Result
	require.NoError(t, json.Unmarshal(stored, &got))
	require.Equal(t, want, got)

	affected, err := d.DeleteJobResult([]uuid.UUID{id})
	require.NoError(t, err)
	require.Equal(t, int64(1), affected)

	_, _, stored, _, _, _, _, _, err = q.JobStatus(id)
	require.NoError(t, err)
	require.Nil(t, stored)
}
// testVacuum checks that VacuumAnalyze and LogVacuumStats both succeed on d,
// in that order. The job queue argument is not used.
func testVacuum(t *testing.T, d db, q *dbjobqueue.DBJobQueue) {
	steps := []func() error{
		d.VacuumAnalyze,
		d.LogVacuumStats,
	}
	for _, step := range steps {
		require.NoError(t, step())
	}
}

View file

@ -99,25 +99,6 @@ const (
sqlDeleteHeartbeat = `
DELETE FROM heartbeats
WHERE id = $1`
// Maintenance queries
sqlQueryJobsUptoByType = `
SELECT array_agg(id), type
FROM jobs
WHERE type = ANY($1) AND finished_at < $2 AND result IS NOT NULL
GROUP BY type`
sqlDeleteJobResult = `
UPDATE jobs
SET result = NULL
WHERE id = ANY($1)`
sqlVacuumAnalyze = `
VACUUM ANALYZE`
sqlVacuumStats = `
SELECT relname, pg_size_pretty(pg_total_relation_size(relid)),
n_tup_ins, n_tup_upd, n_tup_del, n_live_tup, n_dead_tup,
vacuum_count, autovacuum_count, analyze_count, autoanalyze_count,
last_vacuum, last_autovacuum, last_analyze, last_autoanalyze
FROM pg_stat_user_tables`
)
type DBJobQueue struct {
@ -621,115 +602,3 @@ func (q *DBJobQueue) jobDependencies(ctx context.Context, conn *pgxpool.Conn, id
return dependencies, nil
}
// JobsUptoByType looks up jobs of the given types that finished before upto
// and still have a result, and returns their ids grouped by job type.
//
// The returned map is never nil. If acquiring a connection, running the query
// or scanning a row fails, the map holds whatever was collected up to that
// point and err is set.
//
// Open question kept from the original author: should this return a map of
// id -> job type instead?
func (q *DBJobQueue) JobsUptoByType(jobTypes []string, upto time.Time) (result map[string][]uuid.UUID, err error) {
	result = map[string][]uuid.UUID{}

	conn, err := q.pool.Acquire(context.Background())
	if err != nil {
		return result, fmt.Errorf("error connecting to database: %v", err)
	}
	defer conn.Release()

	rows, err := conn.Query(context.Background(), sqlQueryJobsUptoByType, jobTypes, upto)
	if err != nil {
		return result, err
	}
	defer rows.Close()

	for rows.Next() {
		var (
			jobType string
			jobIDs  []uuid.UUID
		)
		// Column order follows the query: aggregated ids first, then type.
		if err = rows.Scan(&jobIDs, &jobType); err != nil {
			return result, err
		}
		result[jobType] = jobIDs
	}

	return result, rows.Err()
}
// DeleteJobResult clears the stored result of every job listed in jobIds by
// setting it to NULL; the job rows themselves are kept.
//
// It returns the number of rows the update affected, or -1 together with an
// error when no connection could be acquired from the pool.
func (q *DBJobQueue) DeleteJobResult(jobIds []uuid.UUID) (int64, error) {
	ctx := context.Background()

	conn, err := q.pool.Acquire(ctx)
	if err != nil {
		return -1, fmt.Errorf("error connecting to database: %v", err)
	}
	defer conn.Release()

	tag, err := conn.Exec(ctx, sqlDeleteJobResult, jobIds)
	affected := tag.RowsAffected()
	if err != nil {
		return affected, fmt.Errorf("Error deleting results from jobs: %v", err)
	}
	return affected, nil
}
// VacuumAnalyze acquires a connection from the pool and runs VACUUM ANALYZE
// on it. It returns an error if the connection cannot be acquired or the
// statement fails.
func (q *DBJobQueue) VacuumAnalyze() error {
	ctx := context.Background()

	conn, err := q.pool.Acquire(ctx)
	if err != nil {
		return fmt.Errorf("error connecting to database: %v", err)
	}
	defer conn.Release()

	if _, err = conn.Exec(ctx, sqlVacuumAnalyze); err != nil {
		return fmt.Errorf("Error running VACUUM ANALYZE: %v", err)
	}
	return nil
}
// LogVacuumStats acquires a connection from the pool, queries
// pg_stat_user_tables and writes, for every row, the table size, tuple
// counters and vacuum/analyze statistics to the log at info level.
//
// It returns an error if the connection cannot be acquired, the query fails,
// a row cannot be scanned or row iteration ends with an error.
func (q *DBJobQueue) LogVacuumStats() error {
	ctx := context.Background()

	conn, err := q.pool.Acquire(ctx)
	if err != nil {
		return fmt.Errorf("error connecting to database: %v", err)
	}
	defer conn.Release()

	rows, err := conn.Query(ctx, sqlVacuumStats)
	if err != nil {
		return fmt.Errorf("Error querying vacuum stats: %v", err)
	}
	defer rows.Close()

	for rows.Next() {
		var (
			table, size                                  string
			inserted, updated, deleted, liveTup, deadTup int64
			vacuums, autoVacuums, analyzes, autoAnalyzes int64
			// Timestamps are scanned through pointers so that a NULL column
			// ends up as nil.
			lastVacuum, lastAutoVacuum, lastAnalyze, lastAutoAnalyze *time.Time
		)

		if err = rows.Scan(
			&table, &size,
			&inserted, &updated, &deleted, &liveTup, &deadTup,
			&vacuums, &autoVacuums, &analyzes, &autoAnalyzes,
			&lastVacuum, &lastAutoVacuum, &lastAnalyze, &lastAutoAnalyze,
		); err != nil {
			return err
		}

		logrus.Infof("Stats for table %s", table)
		logrus.Infof(" Total table size: %s", size)
		logrus.Info(" Tuples:")
		logrus.Infof(" Inserted: %d", inserted)
		logrus.Infof(" Updated: %d", updated)
		logrus.Infof(" Deleted: %d", deleted)
		logrus.Infof(" Live: %d", liveTup)
		logrus.Infof(" Dead: %d", deadTup)
		logrus.Info(" Vacuum stats:")
		logrus.Infof(" Vacuum count: %d", vacuums)
		logrus.Infof(" AutoVacuum count: %d", autoVacuums)
		logrus.Infof(" Last vacuum: %v", lastVacuum)
		logrus.Infof(" Last autovacuum: %v", lastAutoVacuum)
		logrus.Info(" Analyze stats:")
		logrus.Infof(" Analyze count: %d", analyzes)
		logrus.Infof(" AutoAnalyze count: %d", autoAnalyzes)
		logrus.Infof(" Last analyze: %v", lastAnalyze)
		logrus.Infof(" Last autoanalyze: %v", lastAutoAnalyze)
		logrus.Info("---")
	}

	// Report any error that terminated the iteration early.
	return rows.Err()
}

View file

@ -129,6 +129,7 @@ go test -c -tags=integration -ldflags="${TEST_LDFLAGS}" -o _bin/osbuild-auth-tes
go test -c -tags=integration -ldflags="${TEST_LDFLAGS}" -o _bin/osbuild-koji-tests %{goipath}/cmd/osbuild-koji-tests
go test -c -tags=integration -ldflags="${TEST_LDFLAGS}" -o _bin/osbuild-composer-dbjobqueue-tests %{goipath}/cmd/osbuild-composer-dbjobqueue-tests
go test -c -tags=integration -ldflags="${TEST_LDFLAGS}" -o _bin/osbuild-composer-manifest-tests %{goipath}/cmd/osbuild-composer-manifest-tests
go test -c -tags=integration -ldflags="${TEST_LDFLAGS}" -o _bin/osbuild-service-maintenance-tests %{goipath}/cmd/osbuild-service-maintenance
go build -tags=integration -ldflags="${TEST_LDFLAGS}" -o _bin/osbuild-mock-openid-provider %{goipath}/cmd/osbuild-mock-openid-provider
%endif
@ -194,6 +195,7 @@ install -m 0755 -vp _bin/osbuild-auth-tests %{buildroot}%
install -m 0755 -vp _bin/osbuild-koji-tests %{buildroot}%{_libexecdir}/osbuild-composer-test/
install -m 0755 -vp _bin/osbuild-composer-dbjobqueue-tests %{buildroot}%{_libexecdir}/osbuild-composer-test/
install -m 0755 -vp _bin/osbuild-composer-manifest-tests %{buildroot}%{_libexecdir}/osbuild-composer-test/
install -m 0755 -vp _bin/osbuild-service-maintenance-tests %{buildroot}%{_libexecdir}/osbuild-composer-test/
install -m 0755 -vp _bin/osbuild-mock-openid-provider %{buildroot}%{_libexecdir}/osbuild-composer-test/
install -m 0755 -vp tools/define-compose-url.sh %{buildroot}%{_libexecdir}/osbuild-composer-test/
install -m 0755 -vp tools/provision.sh %{buildroot}%{_libexecdir}/osbuild-composer-test/