From c4360a67f592b29491e3bd5a4c5b26b706ac0225 Mon Sep 17 00:00:00 2001
From: Sanne Raymaekers
Date: Fri, 18 Jul 2025 12:58:34 +0200
Subject: [PATCH] jobqueue: handle escaped null bytes in postgres

Postgres doesn't accept `\u0000` in the jsonb datatype. Switch to the
json datatype which is larger and slower, but accepts escaped null
bytes.

As we don't actually query or index the result jsonb directly, the
impact of this should be minimal.

See: https://www.postgresql.org/docs/current/datatype-json.html
---
 .../jobqueue/jobqueuetest/jobqueuetest.go     | 17 +++++-
 .../schemas/009_jobs_alter_result_type.sql    | 53 +++++++++++++++++++
 2 files changed, 69 insertions(+), 1 deletion(-)
 create mode 100644 pkg/jobqueue/dbjobqueue/schemas/009_jobs_alter_result_type.sql

diff --git a/internal/jobqueue/jobqueuetest/jobqueuetest.go b/internal/jobqueue/jobqueuetest/jobqueuetest.go
index 42dd3bde8..32b935100 100644
--- a/internal/jobqueue/jobqueuetest/jobqueuetest.go
+++ b/internal/jobqueue/jobqueuetest/jobqueuetest.go
@@ -25,6 +25,7 @@ import (
 type MakeJobQueue func() (q jobqueue.JobQueue, stop func(), err error)
 
 type testResult struct {
+	Logs json.RawMessage `json:"logs,omitempty"` // raw JSON payload under the "logs" key; omitted by encoding/json when empty (omitempty)
 }
 
 func TestDbURL() string {
@@ -50,6 +51,7 @@ func TestJobQueue(t *testing.T, makeJobQueue MakeJobQueue) {
 	t.Run("cancel", wrap(testCancel))
 	t.Run("requeue", wrap(testRequeue))
 	t.Run("requeue-limit", wrap(testRequeueLimit))
+	t.Run("escaped-null-bytes", wrap(testEscapedNullBytes))
 	t.Run("job-types", wrap(testJobTypes))
 	t.Run("dependencies", wrap(testDependencies))
 	t.Run("multiple-workers", wrap(testMultipleWorkers))
@@ -110,7 +112,8 @@ func testErrors(t *testing.T, q jobqueue.JobQueue) {
 	require.NoError(t, err)
 	require.Equal(t, id, idFromT)
 
-	requeued, err := q.RequeueOrFinishJob(id, 0, nil)
+	require.NoError(t, err) // NOTE(review): err was already checked two statements above and is not reassigned in between - this looks redundant, confirm and drop
+	requeued, err := q.RequeueOrFinishJob(id, 0, &testResult{})
 	require.NoError(t, err)
 	require.False(t, requeued)
 
@@ -504,6 +507,18 @@ func testRequeueLimit(t *testing.T, q jobqueue.JobQueue) {
 	require.NotNil(t, result)
 }
 
+func testEscapedNullBytes(t *testing.T, q jobqueue.JobQueue) {
+	pushTestJob(t, q, "octopus", nil, nil, "")
+	id, tok, _, _, _, err := q.Dequeue(context.Background(), uuid.Nil, []string{"octopus"}, []string{""})
+	require.NoError(t, err)
+	require.NotEmpty(t, tok)
+
+	// Ensure postgres accepts escaped null bytes: both the key and the value of the Logs JSON below carry a \u0000 escape, and storing that result must not return an error.
+	requeued, err := q.RequeueOrFinishJob(id, 0, &testResult{Logs: []byte("{\"blegh\\u0000\": \"\\u0000\"}")})
+	require.NoError(t, err)
+	require.False(t, requeued) // the job is expected to be finished here, not put back on the queue
+}
+
 func testHeartbeats(t *testing.T, q jobqueue.JobQueue) {
 	id := pushTestJob(t, q, "octopus", nil, nil, "")
 	// No heartbeats for queued job
diff --git a/pkg/jobqueue/dbjobqueue/schemas/009_jobs_alter_result_type.sql b/pkg/jobqueue/dbjobqueue/schemas/009_jobs_alter_result_type.sql
new file mode 100644
index 000000000..9e488aa45
--- /dev/null
+++ b/pkg/jobqueue/dbjobqueue/schemas/009_jobs_alter_result_type.sql
@@ -0,0 +1,53 @@
+---- tern: disable-tx ----
+-- the directive above disables the migration transaction, to allow for batched updates
+
+-- jsonb does not support unicode escaped null bytes (\u0000), and we don't
+-- control much of the output generated by osbuild, so results move to a json column.
+ALTER TABLE jobs ADD COLUMN IF NOT EXISTS new_result json; -- staging column; renamed to "result" in the final step of this migration
+
+-- backfill new_result from result in batches of up to 500 rows until none remain. NOTE(review): the DO block contains no COMMIT, so confirm the batches really commit separately and do not hold every row lock until the block ends
+DO $$
+DECLARE
+    remaining_rows INTEGER := 1;
+BEGIN
+    WHILE remaining_rows > 0 LOOP
+        -- CTE on jobs
+        WITH cte_jobs AS (
+            SELECT ctid
+            FROM jobs
+            WHERE result IS NOT NULL AND new_result IS NULL
+            FOR UPDATE
+            LIMIT 500
+        )
+        -- self-join on ctid
+        UPDATE jobs set new_result = result::json
+        FROM cte_jobs
+        WHERE jobs.ctid = cte_jobs.ctid;
+
+        SELECT COUNT(ctid)
+        FROM jobs
+        WHERE result IS NOT NULL AND new_result IS NULL
+        INTO remaining_rows;
+        RAISE NOTICE 'REMAINING ROWS: %', remaining_rows;
+    END LOOP;
+END $$;
+
+-- switching over the columns needs to happen in a transaction: the old jsonb column is dropped and new_result takes over its name in one step
+BEGIN;
+
+DROP VIEW ready_jobs; -- presumably required because the SELECT * view depends on the column being dropped - confirm; it is recreated below
+ALTER TABLE jobs DROP COLUMN result;
+ALTER TABLE jobs RENAME COLUMN new_result TO result;
+
+CREATE VIEW ready_jobs AS -- jobs that are not started, not canceled, and have no dependency that is still unfinished, oldest first
+SELECT *
+FROM jobs
+WHERE started_at IS NULL
+  AND canceled = FALSE
+  AND id NOT IN (
+    SELECT job_id
+    FROM job_dependencies JOIN jobs ON dependency_id = id
+    WHERE finished_at IS NULL
+)
+ORDER BY queued_at ASC;
+COMMIT;