worker: Separate goroutine for depsolve jobs

The worker client is thread-safe, so we can share it between routines.
This commit is contained in:
sanne 2021-10-13 13:09:35 +02:00 committed by Sanne Raymaekers
parent 87cc722021
commit 963688eb07

View file

@ -24,6 +24,7 @@ import (
)
const configFile = "/etc/osbuild-worker/osbuild-worker.toml"
const backoffDuration = time.Second * 10
type connectionConfig struct {
CACertFile string
@ -83,6 +84,39 @@ func WatchJob(ctx context.Context, job worker.Job) {
}
}
// Requests and runs 1 job of specified type(s)
// Returning an error here will result in the worker backing off for a while and retrying
func RequestAndRunJob(client *worker.Client, acceptedJobTypes []string, jobImpls map[string]JobImplementation) error {
logrus.Info("Waiting for a new job...")
job, err := client.RequestJob(acceptedJobTypes, common.CurrentArch())
if err != nil {
logrus.Errorf("Requesting job failed: %v", err)
return err
}
impl, exists := jobImpls[job.Type()]
if !exists {
logrus.Errorf("Ignoring job with unknown type %s", job.Type())
return err
}
logrus.Infof("Running '%s' job %v\n", job.Type(), job.Id())
ctx, cancelWatcher := context.WithCancel(context.Background())
go WatchJob(ctx, job)
err = impl.Run(job)
cancelWatcher()
if err != nil {
logrus.Warnf("Job %s failed: %v", job.Id(), err)
// Don't return this error so the worker picks up the next job immediately
return nil
}
logrus.Infof("Job %s finished", job.Id())
return nil
}
func main() {
var config struct {
KojiServers map[string]struct {
@ -243,6 +277,38 @@ func main() {
awsCredentials = config.AWS.Credentials
}
// depsolve jobs can be done during other jobs
depsolveCtx, depsolveCtxCancel := context.WithCancel(context.Background())
defer depsolveCtxCancel()
go func() {
jobImpls := map[string]JobImplementation{
"depsolve": &DepsolveJobImpl{
RPMMD: rpmmd.NewRPMMD(rpmmd_cache, "/usr/libexec/osbuild-composer/dnf-json"),
},
}
acceptedJobTypes := []string{}
for jt := range jobImpls {
acceptedJobTypes = append(acceptedJobTypes, jt)
}
for {
err := RequestAndRunJob(client, acceptedJobTypes, jobImpls)
if err != nil {
logrus.Warn("Received error from RequestAndRunJob, backing off")
time.Sleep(backoffDuration)
}
select {
case <-depsolveCtx.Done():
return
default:
continue
}
}
}()
// non-depsolve job
jobImpls := map[string]JobImplementation{
"osbuild": &OSBuildJobImpl{
Store: store,
@ -263,9 +329,6 @@ func main() {
"koji-finalize": &KojiFinalizeJobImpl{
KojiServers: kojiServers,
},
"depsolve": &DepsolveJobImpl{
RPMMD: rpmmd.NewRPMMD(rpmmd_cache, "/usr/libexec/osbuild-composer/dnf-json"),
},
}
acceptedJobTypes := []string{}
@ -274,30 +337,10 @@ func main() {
}
for {
logrus.Info("Waiting for a new job...")
job, err := client.RequestJob(acceptedJobTypes, common.CurrentArch())
err = RequestAndRunJob(client, acceptedJobTypes, jobImpls)
if err != nil {
logrus.Fatal(err)
logrus.Warn("Received error from RequestAndRunJob, backing off")
time.Sleep(backoffDuration)
}
impl, exists := jobImpls[job.Type()]
if !exists {
logrus.Warnf("Ignoring job with unknown type %s", job.Type())
continue
}
logrus.Infof("Running '%s' job %v\n", job.Type(), job.Id())
ctx, cancelWatcher := context.WithCancel(context.Background())
go WatchJob(ctx, job)
err = impl.Run(job)
cancelWatcher()
if err != nil {
logrus.Warnf("Job %s failed: %v", job.Id(), err)
continue
}
logrus.Infof("Job %s finished", job.Id())
}
}