package main import ( "fmt" "strings" "sync" "sync/atomic" "time" ) type workerQueue struct { // manifest job channel jobQueue chan manifestJob // channel for sending messages from jobs to the printer msgQueue chan string // channel for sending errors from jobs to the collector errQueue chan error // global error list errors []error // total job count defined on workerQueue creation // sets the length of the job queue so that pushing to the queue doesn't block njobs uint32 // total workers defined on workerQueue creation nworkers uint32 // active worker count activeWorkers int32 // wait group for all workers workerWG sync.WaitGroup // wait group for internal routines (printer and error collector) utilWG sync.WaitGroup } func newWorkerQueue(nworkers uint32, njobs uint32) *workerQueue { wq := workerQueue{ jobQueue: make(chan manifestJob, njobs), msgQueue: make(chan string, nworkers), errQueue: make(chan error, nworkers), errors: make([]error, 0, nworkers), nworkers: nworkers, activeWorkers: 0, njobs: njobs, } return &wq } func (wq *workerQueue) start() { wq.startMessagePrinter() wq.startErrorCollector() for idx := uint32(0); idx < wq.nworkers; idx++ { wq.startWorker(idx) } } // close all queues and wait for waitgroups func (wq *workerQueue) wait() []error { // close job channel and wait for workers to finish close(wq.jobQueue) wq.workerWG.Wait() // close message channels and wait for them to finish their work so we don't miss any messages or errors close(wq.msgQueue) close(wq.errQueue) wq.utilWG.Wait() return wq.errors } func (wq *workerQueue) startWorker(idx uint32) { wq.workerWG.Add(1) go func() { atomic.AddInt32(&(wq.activeWorkers), 1) defer atomic.AddInt32(&(wq.activeWorkers), -1) defer wq.workerWG.Done() for job := range wq.jobQueue { err := job(wq.msgQueue) if err != nil { wq.errQueue <- err } } }() wq.msgQueue <- fmt.Sprintf("Worker %d started", idx) } func (wq *workerQueue) startMessagePrinter() { wq.utilWG.Add(1) go func() { defer wq.utilWG.Done() var msglen int for { select { case msg, open := <-wq.msgQueue: // clear previous line (avoids leftover trailing characters from progress) fmt.Printf(strings.Repeat(" ", msglen) + "\r") if !open { fmt.Println() return } fmt.Println(msg) default: msglen, _ = fmt.Printf(" == Jobs == Queue: %4d Active: %4d Total: %4d\r", len(wq.jobQueue), wq.activeWorkers, wq.njobs) // sleep a bit when printing progress to avoid constantly pushing out the same progress message time.Sleep(10 * time.Millisecond) } } }() } func (wq *workerQueue) startErrorCollector() { wq.utilWG.Add(1) go func() { defer wq.utilWG.Done() for err := range wq.errQueue { wq.errors = append(wq.errors, err) } }() } func (wq *workerQueue) submitJob(j manifestJob) { wq.jobQueue <- j }