From d7686244cdcdec3dbc3009e5c7eb31bf91e5bbfe Mon Sep 17 00:00:00 2001 From: Sanne Raymaekers Date: Wed, 23 Jul 2025 18:05:16 +0200 Subject: [PATCH] cmd/osbuild-composer-dbjobqueue-tests: add migration test Migration 9 alters the result column in the jobs table is relied on for compose statuses. Because it has to be kept consistent across migrations, add a test to verify this. As a side effect, the test itself handles the migration now, so remove that part from the tests GHA. --- .github/workflows/tests.yml | 1 - .../main_test.go | 116 ++++++++++++++++-- .../jobqueue/jobqueuetest/jobqueuetest.go | 52 ++++---- 3 files changed, 131 insertions(+), 38 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index f2af6fa74..9bf247e43 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -80,7 +80,6 @@ jobs: PGPORT: 5432 run: | ./tools/dbtest-prepare-env.sh - ./tools/dbtest-run-migrations.sh - run: ./tools/dbtest-entrypoint.sh python-lint: diff --git a/cmd/osbuild-composer-dbjobqueue-tests/main_test.go b/cmd/osbuild-composer-dbjobqueue-tests/main_test.go index 24e8deffc..7034a3983 100644 --- a/cmd/osbuild-composer-dbjobqueue-tests/main_test.go +++ b/cmd/osbuild-composer-dbjobqueue-tests/main_test.go @@ -4,33 +4,74 @@ package main import ( "context" + "encoding/json" "fmt" + "os" + "os/exec" + "strings" "testing" + "github.com/google/uuid" "github.com/jackc/pgx/v4" + "github.com/stretchr/testify/require" + "github.com/osbuild/osbuild-composer/pkg/jobqueue" "github.com/osbuild/osbuild-composer/pkg/jobqueue/dbjobqueue" "github.com/osbuild/osbuild-composer/internal/jobqueue/jobqueuetest" ) +func migrate(migration string) error { + // migrate + gopath, err := exec.Command("go", "env", "GOPATH").Output() + if err != nil { + return err + } + tern := fmt.Sprintf("%s/bin/tern", strings.TrimSpace(string(gopath))) + wd, err := os.Getwd() + if err != nil { + return err + } + + if out, err := exec.Command( + tern, + "migrate", + "--conn-string", + jobqueuetest.TestDbURL(), + "-m", + fmt.Sprintf("%s/../../pkg/jobqueue/dbjobqueue/schemas", wd), + "-d", + migration, + ).CombinedOutput(); err != nil { + fmt.Println("tern output:", string(out)) + return err + } + return nil +} + func TestJobQueueInterface(t *testing.T) { - makeJobQueue := func() (jobqueue.JobQueue, func(), error) { - // clear db before each run - conn, err := pgx.Connect(context.Background(), jobqueuetest.TestDbURL()) + makeJobQueue := func(migration string, clean bool) (jobqueue.JobQueue, func(), error) { + err := migrate(migration) if err != nil { return nil, nil, err } - defer conn.Close(context.Background()) - for _, table := range []string{"job_dependencies", "heartbeats", "jobs"} { - _, err = conn.Exec(context.Background(), fmt.Sprintf("DELETE FROM %s", table)) + + if clean { + conn, err := pgx.Connect(context.Background(), jobqueuetest.TestDbURL()) + if err != nil { + return nil, nil, err + } + defer conn.Close(context.Background()) + for _, table := range []string{"job_dependencies", "heartbeats", "jobs"} { + _, err = conn.Exec(context.Background(), fmt.Sprintf("DELETE FROM %s", table)) + if err != nil { + return nil, nil, err + } + } + err = conn.Close(context.Background()) if err != nil { return nil, nil, err } - } - err = conn.Close(context.Background()) - if err != nil { - return nil, nil, err } q, err := dbjobqueue.New(jobqueuetest.TestDbURL()) @@ -43,5 +84,58 @@ func TestJobQueueInterface(t *testing.T) { return q, stop, nil } - jobqueuetest.TestJobQueue(t, makeJobQueue) + // run first, as migrations aren't reversible + testMigrationPath(t, makeJobQueue) + + jobqueuetest.TestJobQueue(t, func () (jobqueue.JobQueue, func(), error) { + return makeJobQueue("last", true) + }) +} + +func testMigrationPath(t *testing.T, makeJobQueue func(migration string, clean bool) (jobqueue.JobQueue, func(), error)) { + q, stop, err := makeJobQueue("8", false) + defer stop() + require.NoError(t, err) + + id, err := q.Enqueue("test", "{\"arg\": \"impormtanmt\"}", nil, "") + require.NoError(t, err) + require.NotEmpty(t, id) + id, tok, _, _, _, err := q.Dequeue(context.Background(), uuid.Nil, []string{"test"}, []string{""}) + require.NoError(t, err) + require.NotEmpty(t, id) + require.NotEmpty(t, tok) + + // make sure entering escaped nullbytes fails in 8 + _, err = q.RequeueOrFinishJob(id, 0, &jobqueuetest.TestResult{Logs: []byte("{\"blegh\\u0000\": \"\\u0000reallyimportant stuff!\"}")}) + require.Error(t, err) + + tr := jobqueuetest.TestResult{Logs: []byte("{\"blegh\": \"really important stuff!\"}")} + _, err = q.RequeueOrFinishJob(id, 0, &tr) + require.NoError(t, err) + + require.NoError(t, migrate("last")) + + // reopen the connection to the db, as the old connection will still treat the result column as jsonb + stop() + db_q, err := dbjobqueue.New(jobqueuetest.TestDbURL()) + require.NoError(t, err) + defer db_q.Close() + require.NoError(t, err) + + // make sure entering escaped nullbytes works in last + id2, err := db_q.Enqueue("test", "{\"arg\": \"impormtanmt\"}", nil, "") + require.NoError(t, err) + require.NotEmpty(t, id2) + id2, tok2, _, _, _, err := db_q.Dequeue(context.Background(), uuid.Nil, []string{"test"}, []string{""}) + require.NoError(t, err) + require.NotEmpty(t, id) + require.NotEmpty(t, tok2) + _, err = db_q.RequeueOrFinishJob(id2, 0, &jobqueuetest.TestResult{Logs: []byte("{\"blegh\\u0000\": \"\\u0000\"}")}) + require.NoError(t, err) + + _, _, result, _, _, _, _, _, _, err := db_q.JobStatus(id) + require.NoError(t, err) + var tr2 jobqueuetest.TestResult + require.NoError(t, json.Unmarshal(result, &tr2)) + require.Equal(t, tr, tr2) } diff --git a/internal/jobqueue/jobqueuetest/jobqueuetest.go b/internal/jobqueue/jobqueuetest/jobqueuetest.go index 32b935100..f5bb6abcf 100644 --- a/internal/jobqueue/jobqueuetest/jobqueuetest.go +++ b/internal/jobqueue/jobqueuetest/jobqueuetest.go @@ -24,7 +24,7 @@ import ( type MakeJobQueue func() (q jobqueue.JobQueue, stop func(), err error) -type testResult struct { +type TestResult struct { Logs json.RawMessage `json:"logs,omitempty"` } @@ -113,7 +113,7 @@ func testErrors(t *testing.T, q jobqueue.JobQueue) { require.Equal(t, id, idFromT) require.NoError(t, err) - requeued, err := q.RequeueOrFinishJob(id, 0, &testResult{}) + requeued, err := q.RequeueOrFinishJob(id, 0, &TestResult{}) require.NoError(t, err) require.False(t, requeued) @@ -200,8 +200,8 @@ func testJobTypes(t *testing.T, q jobqueue.JobQueue) { one := pushTestJob(t, q, "octopus", nil, nil, "") two := pushTestJob(t, q, "clownfish", nil, nil, "") - require.Equal(t, two, finishNextTestJob(t, q, "clownfish", testResult{}, nil)) - require.Equal(t, one, finishNextTestJob(t, q, "octopus", testResult{}, nil)) + require.Equal(t, two, finishNextTestJob(t, q, "clownfish", TestResult{}, nil)) + require.Equal(t, one, finishNextTestJob(t, q, "octopus", TestResult{}, nil)) ctx, cancel := context.WithCancel(context.Background()) cancel() @@ -232,8 +232,8 @@ func testDependencies(t *testing.T, q jobqueue.JobQueue) { two := pushTestJob(t, q, "test", nil, nil, "") r := []uuid.UUID{} - r = append(r, finishNextTestJob(t, q, "test", testResult{}, nil)) - r = append(r, finishNextTestJob(t, q, "test", testResult{}, nil)) + r = append(r, finishNextTestJob(t, q, "test", TestResult{}, nil)) + r = append(r, finishNextTestJob(t, q, "test", TestResult{}, nil)) require.ElementsMatch(t, []uuid.UUID{one, two}, r) j := pushTestJob(t, q, "test", nil, []uuid.UUID{one, two}, "") @@ -251,7 +251,7 @@ func testDependencies(t *testing.T, q jobqueue.JobQueue) { require.ElementsMatch(t, deps, []uuid.UUID{one, two}) require.Empty(t, dependents) - require.Equal(t, j, finishNextTestJob(t, q, "test", testResult{}, []uuid.UUID{one, two})) + require.Equal(t, j, finishNextTestJob(t, q, "test", TestResult{}, []uuid.UUID{one, two})) jobType, _, result, queued, started, finished, canceled, deps, dependents, err := q.JobStatus(j) require.NoError(t, err) @@ -263,7 +263,7 @@ func testDependencies(t *testing.T, q jobqueue.JobQueue) { require.ElementsMatch(t, deps, []uuid.UUID{one, two}) require.Empty(t, dependents) - err = json.Unmarshal(result, &testResult{}) + err = json.Unmarshal(result, &TestResult{}) require.NoError(t, err) }) @@ -282,11 +282,11 @@ func testDependencies(t *testing.T, q jobqueue.JobQueue) { require.ElementsMatch(t, deps, []uuid.UUID{one, two}) r := []uuid.UUID{} - r = append(r, finishNextTestJob(t, q, "test", testResult{}, nil)) - r = append(r, finishNextTestJob(t, q, "test", testResult{}, nil)) + r = append(r, finishNextTestJob(t, q, "test", TestResult{}, nil)) + r = append(r, finishNextTestJob(t, q, "test", TestResult{}, nil)) require.ElementsMatch(t, []uuid.UUID{one, two}, r) - require.Equal(t, j, finishNextTestJob(t, q, "test", testResult{}, []uuid.UUID{one, two})) + require.Equal(t, j, finishNextTestJob(t, q, "test", TestResult{}, []uuid.UUID{one, two})) jobType, _, result, queued, started, finished, canceled, deps, _, err := q.JobStatus(j) require.NoError(t, err) @@ -297,7 +297,7 @@ func testDependencies(t *testing.T, q jobqueue.JobQueue) { require.False(t, canceled) require.ElementsMatch(t, deps, []uuid.UUID{one, two}) - err = json.Unmarshal(result, &testResult{}) + err = json.Unmarshal(result, &TestResult{}) require.NoError(t, err) }) } @@ -389,7 +389,7 @@ func testCancel(t *testing.T, q jobqueue.JobQueue) { require.Equal(t, jobType, "clownfish") require.True(t, canceled) require.Nil(t, result) - _, err = q.RequeueOrFinishJob(id, 0, &testResult{}) + _, err = q.RequeueOrFinishJob(id, 0, &TestResult{}) require.Error(t, err) // Cancel a running job, which should not dequeue the canceled job from above @@ -409,7 +409,7 @@ func testCancel(t *testing.T, q jobqueue.JobQueue) { require.Equal(t, jobType, "clownfish") require.True(t, canceled) require.Nil(t, result) - _, err = q.RequeueOrFinishJob(id, 0, &testResult{}) + _, err = q.RequeueOrFinishJob(id, 0, &TestResult{}) require.Error(t, err) // Cancel a finished job, which is a no-op @@ -422,7 +422,7 @@ func testCancel(t *testing.T, q jobqueue.JobQueue) { require.Empty(t, deps) require.Equal(t, "clownfish", typ) require.Equal(t, json.RawMessage("null"), args) - requeued, err := q.RequeueOrFinishJob(id, 0, &testResult{}) + requeued, err := q.RequeueOrFinishJob(id, 0, &TestResult{}) require.NoError(t, err) require.False(t, requeued) err = q.CancelJob(id) @@ -432,7 +432,7 @@ func testCancel(t *testing.T, q jobqueue.JobQueue) { require.NoError(t, err) require.Equal(t, jobType, "clownfish") require.False(t, canceled) - err = json.Unmarshal(result, &testResult{}) + err = json.Unmarshal(result, &TestResult{}) require.NoError(t, err) } @@ -471,7 +471,7 @@ func testRequeue(t *testing.T, q jobqueue.JobQueue) { require.Equal(t, jobType, "clownfish") require.False(t, canceled) require.Nil(t, result) - requeued, err = q.RequeueOrFinishJob(id, 0, &testResult{}) + requeued, err = q.RequeueOrFinishJob(id, 0, &TestResult{}) require.NoError(t, err) require.False(t, requeued) @@ -498,7 +498,7 @@ func testRequeueLimit(t *testing.T, q jobqueue.JobQueue) { require.True(t, finished.IsZero()) require.Nil(t, result) // Requeue a second time, this time finishing it - requeued, err = q.RequeueOrFinishJob(id, 1, &testResult{}) + requeued, err = q.RequeueOrFinishJob(id, 1, &TestResult{}) require.NoError(t, err) require.False(t, requeued) _, _, result, _, _, finished, _, _, _, err = q.JobStatus(id) @@ -514,7 +514,7 @@ func testEscapedNullBytes(t *testing.T, q jobqueue.JobQueue) { require.NotEmpty(t, tok) // Ensure postgres accepts escaped null bytes - requeued, err := q.RequeueOrFinishJob(id, 0, &testResult{Logs: []byte("{\"blegh\\u0000\": \"\\u0000\"}")}) + requeued, err := q.RequeueOrFinishJob(id, 0, &TestResult{Logs: []byte("{\"blegh\\u0000\": \"\\u0000\"}")}) require.NoError(t, err) require.False(t, requeued) } @@ -544,7 +544,7 @@ func testHeartbeats(t *testing.T, q jobqueue.JobQueue) { require.NoError(t, err) require.Equal(t, id2, id) - requeued, err := q.RequeueOrFinishJob(id, 0, &testResult{}) + requeued, err := q.RequeueOrFinishJob(id, 0, &TestResult{}) require.NoError(t, err) require.False(t, requeued) @@ -571,7 +571,7 @@ func testDequeueByID(t *testing.T, q jobqueue.JobQueue) { require.NoError(t, err) require.False(t, requeued) - require.Equal(t, two, finishNextTestJob(t, q, "octopus", testResult{}, nil)) + require.Equal(t, two, finishNextTestJob(t, q, "octopus", TestResult{}, nil)) }) t.Run("cannot dequeue a job without finished deps", func(t *testing.T) { @@ -581,8 +581,8 @@ func testDequeueByID(t *testing.T, q jobqueue.JobQueue) { _, _, _, _, err := q.DequeueByID(context.Background(), two, uuid.Nil) require.Equal(t, jobqueue.ErrNotPending, err) - require.Equal(t, one, finishNextTestJob(t, q, "octopus", testResult{}, nil)) - require.Equal(t, two, finishNextTestJob(t, q, "octopus", testResult{}, []uuid.UUID{one})) + require.Equal(t, one, finishNextTestJob(t, q, "octopus", TestResult{}, nil)) + require.Equal(t, two, finishNextTestJob(t, q, "octopus", TestResult{}, []uuid.UUID{one})) }) t.Run("cannot dequeue a non-pending job", func(t *testing.T) { @@ -721,7 +721,7 @@ func test100dequeuers(t *testing.T, q jobqueue.JobQueue) { wg.Done() }() - finishNextTestJob(t, q, "octopus", testResult{}, nil) + finishNextTestJob(t, q, "octopus", TestResult{}, nil) }() } @@ -734,7 +734,7 @@ func test100dequeuers(t *testing.T, q jobqueue.JobQueue) { _, _, _, _, _, _, _, _, _, err := q.JobStatus(id) require.NoError(t, err) - finishNextTestJob(t, q, "clownfish", testResult{}, nil) + finishNextTestJob(t, q, "clownfish", TestResult{}, nil) // fulfill the needs of all dequeuers for i := 0; i < count; i++ { @@ -775,7 +775,7 @@ func testWorkers(t *testing.T, q jobqueue.JobQueue) { err = q.UpdateWorkerStatus(uuid.New()) require.Equal(t, err, jobqueue.ErrWorkerNotExist) - requeued, err := q.RequeueOrFinishJob(one, 0, &testResult{}) + requeued, err := q.RequeueOrFinishJob(one, 0, &TestResult{}) require.NoError(t, err) require.False(t, requeued)