I got confused as the jobqueue interface is asymmetric.
It expects an object and returns a json.RawMessage
and when handing over to postgres this is abstracted
away by postgres
Fixes the special case that if no worker is available and we
generate an internal timeout and cancel the depsolve including all
followup jobs, no error was propagated.
This handles corrupt job json files by skipping them. They still exist,
and errors are logged, but the system keeps working.
If one or more of the json files in /var/lib/osbuild-composer/jobs/
becomes corrupt they can stop the osbuild-composer service from
starting, or stop commands like 'composer-cli compose status' from
working because they quit on the first error and miss any job that
aren't broken.
UBI and the oldest support Fedora (37) now all have go 1.19, so we are
cleared to switch.
gofmt now reformats comments in certain cases, so that explains the formatting
changes in this commit.
See https://go.dev/doc/go1.19#go-doc
Signed-off-by: Ondřej Budai <ondrej@budai.cz>
If a job is unresponsive the worker has most likely crashed or been shut
down and the in-progress job been lost.
Instead of failing these jobs, requeue them up to two times. Once a job is lost
a third time it fails. This avoids infinite loops.
This is implemented by extending FinishJob to RequeuOrFinish job. It takes a
max number of requeues as an argument, and if that is 0, it has the same
behavior as FinishJob used to have.
If the maximum number of requeues has not yet been reached, then the running
job is returned to pending state to be picked up again.
This introduces an expiry date (default: 14 days from insert date) and
adjust the service-maintenance script to delete jobs that are older than
the expiration date.
Call vacuum analyze after each chunk of updates, and dump vacuum stats
at the beginning and end of the db cleanup.
Nulling results can increase size on disk, but calling vacuum analyze
will free up space within the table (not on disk) and reuse the space
for new inserts and updates.
Include a tenant label for all prometheus metrics. Modify
jobstatus function in the worker accordingly to return channel
so it can be passed to prometheus.
Instead of deleting records, delete the results from the manifest and
depsolve jobs. This redacts sensitive data which the manifest can
contain, and this conserves space.
The directory created by `T.TempDir` is automatically removed when the
test and all its subtests complete.
Reference: https://pkg.go.dev/testing#T.TempDir
Signed-off-by: Eng Zer Jun <engzerjun@gmail.com>
Transactions are tied to a connection so this is actually not a functional
change. Nevertheless, I think it's nice to explicitly state that we are
using a transaction.
Signed-off-by: Ondřej Budai <ondrej@budai.cz>
Otherwise, there might be an already waiting dequeuer and if something is
enqueued before `sqlListen` is called, we will lost this notification.
Also, a small log message was added when shutting down the listener.
Signed-off-by: Ondřej Budai <ondrej@budai.cz>
Previously, all dequeuers (goroutines waiting for a job to be dequeued) were
listening for new messages on postgres channel jobs (LISTEN jobs). This didn't
scale well as each dequeuer required to have its own DB connection and the
number of DB connections is hard-limited in the pool's config.
I changed the logic to work somewhat differently: dbjobqueue.New() now spawns
a goroutine that listens on the postgres channel. If there's a new message,
the goroutine just wakes up all dequeuers using a standard go channel.
Go channels are cheap so this should scale much better.
A test was added that confirms that 100 dequeuers are not a big deal now. This
test failed when I tried to run on it on the previous commit. I tried even 1000
locally and it was still fine.
Signed-off-by: Ondřej Budai <ondrej@budai.cz>
Removing queued_at and started_at is pretty straightforward, it wasn't needed.
Removing token might seem concerning but basically we were just pulling
the same value from DB as we were pushing there. I think there's no value in
doing that.
Signed-off-by: Ondřej Budai <ondrej@budai.cz>
jobqueue.Job must return the channel specified in jobqueue.Enqueue during
the whole lifecycle of the given job.
Signed-off-by: Ondřej Budai <ondrej@budai.cz>
Channels are a concept similar to job types. Callers must specify a channel
name when queueing a new job. A list of channels is also specified when
dequeueing a job. The dequeued job's channel will always be from one of the
specified channel. Of course, the job types are also respected. The dequeued
job will also always be from one of the specified type.
Currently, all calls to jobqueue were changed so all queue operations use
an empty channel name and all dequeue operations use a list containing
an empty channel.
Thus, this is a non-functional change.
Signed-off-by: Ondřej Budai <ondrej@budai.cz>
Previous implementation of fsjobqueue is amazing but it has its drawbacks:
- dequeueing can be done only based on a job type
- it's limited to 100 jobs per a job type
As we soon want to be able to dequeue also by another criteria (job channel),
we need to refactor the queue.
The new implementation is more naive but also more flexible. It basically
works like the dbjobqueue - dequeueing goroutines listen for newly added
jobs. When that happens, a signal is sent to all of them and they all inspect
all pending jobs and dequeue ones that match their needs. Ones that don't find
a suitable job, are waiting for the next signal.
This is certainly slower implementation as every time a new job is added into
the queue, all dequeueing goroutines will have to iterate over all
pending jobs. I think that's fine because fsjobqueue is not recommended
to use for composer instances with heavy load.
Signed-off-by: Ondřej Budai <ondrej@budai.cz>
For simplicity, the collection of the job metrics
was carried out in the job queue. This was only
being done in the dbqueue and not in the fsqueue.
This pr refactors the metric collection and moves
the job metrics to the worker server, by adding a
wrapper function to enqueueing jobs so that the
metrics only have to be recorded in one place when
queueing a job.
Refactor the current metric collection to make use
of re-usable functions, since some of the same queries
are repeated. This will also make it easier to move
the collection of metrics from the job queue.
Replace Job() and JobStatus() with typesafe versions, and introduce JobType()
for the rare instances where we don't know the type up front.
Additionally, catch a few more error cases:
- if OSBuildResult is nil, then we failed to invoke osbuild
- make sure the same JobResult handling is done for osbuild-koji, as for osbuild
We will soon need to dequeue a job using its ID. This commit adds ability
to do that to the Jobqueue interface. As always, the fsjobqueue implementation
is slightly naive but it should fine for the usecases that it's designed for.
Signed-off-by: Ondřej Budai <ondrej@budai.cz>
Previously, we deleted empty channels when a job was dequeued. This is
completely wrong because there still might be some clients waiting for
a job. This commit removes the cleanup and adds a regression test.
Note that this has the potential to leak memory if we ever use a lot of
job types. Currently, we have just handful of them, so this is fine.
Signed-off-by: Ondřej Budai <ondrej@budai.cz>
This is backwards compatible, as long as the timeout is 0 (never
timeout), which is the default.
In case of the dbjobqueue the underlying timeout is due to
context.Canceled, context.DeadlineExceeded, or net.Error with Timeout()
true. For the fsjobqueue only the first two are considered.
fsjobqueue_test contained tests that are generically testing the
JobQueue interface. Split those out into its own package `jobqueuetest`.
These tests will be useful when implementing a new package that conforms
to the JobQueue interface.
An occupied worker checks about every 15 seconds if it's current job was
cancelled. Use this to introduce a heartbeat mechanism, where if
composer hasn't heard from the worker in 2 minutes, the job times out
and is set to fail.