gen-manifests: separate worker queue code

Add the worker queue code to a separate file for better organisation
and readability.
This commit is contained in:
Achilleas Koutsou 2022-05-30 11:57:27 +02:00 committed by Tom Gundersen
parent c1f7003e12
commit 150d490ba8
2 changed files with 132 additions and 126 deletions

View file

@ -15,9 +15,6 @@ import (
"os"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/osbuild/osbuild-composer/internal/blueprint"
"github.com/osbuild/osbuild-composer/internal/distro"
@ -265,129 +262,6 @@ func requestsByImageType(requestMap formatRequestMap) map[string]map[string]mani
return imgTypeRequestMap
}
type workerQueue struct {
// manifest job channel
jobQueue chan manifestJob
// channel for sending messages from jobs to the printer
msgQueue chan string
// channel for sending errors from jobs to the collector
errQueue chan error
// global error list
errors []error
// total job count defined on workerQueue creation
// sets the length of the job queue so that pushing to the queue doesn't block
njobs uint32
// total workers defined on workerQueue creation
nworkers uint32
// active worker count
activeWorkers int32
// wait group for all workers
workerWG sync.WaitGroup
// wait group for internal routines (printer and error collector)
utilWG sync.WaitGroup
}
func newWorkerQueue(nworkers uint32, njobs uint32) *workerQueue {
wq := workerQueue{
jobQueue: make(chan manifestJob, njobs),
msgQueue: make(chan string, nworkers),
errQueue: make(chan error, nworkers),
errors: make([]error, 0, nworkers),
nworkers: nworkers,
activeWorkers: 0,
njobs: njobs,
}
return &wq
}
func (wq *workerQueue) start() {
wq.startMessagePrinter()
wq.startErrorCollector()
for idx := uint32(0); idx < wq.nworkers; idx++ {
wq.startWorker(idx)
}
}
func (wq *workerQueue) wait() []error {
wq.finish()
return wq.errors
}
// close all queues and wait for waitgroups
func (wq *workerQueue) finish() {
// close job channel and wait for workers to finish
close(wq.jobQueue)
wq.workerWG.Wait()
// close message channels and wait for them to finish their work so we don't miss any messages or errors
close(wq.msgQueue)
close(wq.errQueue)
wq.utilWG.Wait()
}
func (wq *workerQueue) startWorker(idx uint32) {
wq.workerWG.Add(1)
go func() {
atomic.AddInt32(&(wq.activeWorkers), 1)
defer atomic.AddInt32(&(wq.activeWorkers), -1)
defer wq.workerWG.Done()
for job := range wq.jobQueue {
err := job(wq.msgQueue)
if err != nil {
wq.errQueue <- err
}
}
}()
wq.msgQueue <- fmt.Sprintf("Worker %d started", idx)
}
func (wq *workerQueue) startMessagePrinter() {
wq.utilWG.Add(1)
go func() {
defer wq.utilWG.Done()
var msglen int
for {
select {
case msg, open := <-wq.msgQueue:
// clear previous line (avoids leftover trailing characters from progress)
fmt.Printf(strings.Repeat(" ", msglen) + "\r")
if !open {
fmt.Println()
return
}
fmt.Println(msg)
default:
msglen, _ = fmt.Printf(" == Jobs == Queue: %4d Active: %4d Total: %4d\r", len(wq.jobQueue), wq.activeWorkers, wq.njobs)
// sleep a bit when printing progress to avoid constantly pushing out the same progress message
time.Sleep(10 * time.Millisecond)
}
}
}()
}
func (wq *workerQueue) startErrorCollector() {
wq.utilWG.Add(1)
go func() {
defer wq.utilWG.Done()
for err := range wq.errQueue {
wq.errors = append(wq.errors, err)
}
}()
}
func (wq *workerQueue) submitJob(j manifestJob) {
wq.jobQueue <- j
}
func archIsSupported(req manifestRequest, arch string) bool {
if len(req.SupportedArches) == 0 {
// none specified: all arches supported implicitly

View file

@ -0,0 +1,132 @@
package main
import (
"fmt"
"strings"
"sync"
"sync/atomic"
"time"
)
type workerQueue struct {
// manifest job channel
jobQueue chan manifestJob
// channel for sending messages from jobs to the printer
msgQueue chan string
// channel for sending errors from jobs to the collector
errQueue chan error
// global error list
errors []error
// total job count defined on workerQueue creation
// sets the length of the job queue so that pushing to the queue doesn't block
njobs uint32
// total workers defined on workerQueue creation
nworkers uint32
// active worker count
activeWorkers int32
// wait group for all workers
workerWG sync.WaitGroup
// wait group for internal routines (printer and error collector)
utilWG sync.WaitGroup
}
func newWorkerQueue(nworkers uint32, njobs uint32) *workerQueue {
wq := workerQueue{
jobQueue: make(chan manifestJob, njobs),
msgQueue: make(chan string, nworkers),
errQueue: make(chan error, nworkers),
errors: make([]error, 0, nworkers),
nworkers: nworkers,
activeWorkers: 0,
njobs: njobs,
}
return &wq
}
func (wq *workerQueue) start() {
wq.startMessagePrinter()
wq.startErrorCollector()
for idx := uint32(0); idx < wq.nworkers; idx++ {
wq.startWorker(idx)
}
}
func (wq *workerQueue) wait() []error {
wq.finish()
return wq.errors
}
// close all queues and wait for waitgroups
func (wq *workerQueue) finish() {
// close job channel and wait for workers to finish
close(wq.jobQueue)
wq.workerWG.Wait()
// close message channels and wait for them to finish their work so we don't miss any messages or errors
close(wq.msgQueue)
close(wq.errQueue)
wq.utilWG.Wait()
}
func (wq *workerQueue) startWorker(idx uint32) {
wq.workerWG.Add(1)
go func() {
atomic.AddInt32(&(wq.activeWorkers), 1)
defer atomic.AddInt32(&(wq.activeWorkers), -1)
defer wq.workerWG.Done()
for job := range wq.jobQueue {
err := job(wq.msgQueue)
if err != nil {
wq.errQueue <- err
}
}
}()
wq.msgQueue <- fmt.Sprintf("Worker %d started", idx)
}
func (wq *workerQueue) startMessagePrinter() {
wq.utilWG.Add(1)
go func() {
defer wq.utilWG.Done()
var msglen int
for {
select {
case msg, open := <-wq.msgQueue:
// clear previous line (avoids leftover trailing characters from progress)
fmt.Printf(strings.Repeat(" ", msglen) + "\r")
if !open {
fmt.Println()
return
}
fmt.Println(msg)
default:
msglen, _ = fmt.Printf(" == Jobs == Queue: %4d Active: %4d Total: %4d\r", len(wq.jobQueue), wq.activeWorkers, wq.njobs)
// sleep a bit when printing progress to avoid constantly pushing out the same progress message
time.Sleep(10 * time.Millisecond)
}
}
}()
}
func (wq *workerQueue) startErrorCollector() {
wq.utilWG.Add(1)
go func() {
defer wq.utilWG.Done()
for err := range wq.errQueue {
wq.errors = append(wq.errors, err)
}
}()
}
func (wq *workerQueue) submitJob(j manifestJob) {
wq.jobQueue <- j
}