debian-forge-composer/vendor/github.com/vbauerster/mpb/v7/progress.go
Christian Kellner 986f076276 container: add support for uploading to registries
Add a new generic container registry client via a new `container`
package. Use this to create a command line utility as well as a
new upload target for container registries.

The code uses the github.com/containers/* project and packages to
interact with container registires that is also used by skopeo,
podman et al. One if the dependencies is `proglottis/gpgme` that
is using cgo to bind libgpgme, so we have to add the corresponding
devel package to the BuildRequires as well as installing it on CI.

Checks will follow later via an integration test.
2022-06-29 10:02:46 +02:00

409 lines
8.6 KiB
Go

package mpb
import (
"bytes"
"container/heap"
"context"
"fmt"
"io"
"math"
"os"
"sync"
"time"
"github.com/vbauerster/mpb/v7/cwriter"
"github.com/vbauerster/mpb/v7/decor"
)
const (
prr = 150 * time.Millisecond // default RefreshRate
)
// Progress represents a container that renders one or more progress bars.
type Progress struct {
ctx context.Context
uwg *sync.WaitGroup
cwg *sync.WaitGroup
bwg *sync.WaitGroup
operateState chan func(*pState)
done chan struct{}
refreshCh chan time.Time
once sync.Once
}
// pState holds bars in its priorityQueue, it gets passed to (*Progress).serve monitor goroutine.
type pState struct {
bHeap priorityQueue
heapUpdated bool
pMatrix map[int][]chan int
aMatrix map[int][]chan int
// following are provided/overrided by user
idCount int
reqWidth int
popCompleted bool
outputDiscarded bool
rr time.Duration
uwg *sync.WaitGroup
externalRefresh <-chan interface{}
renderDelay <-chan struct{}
shutdownNotifier chan struct{}
queueBars map[*Bar]*Bar
output io.Writer
debugOut io.Writer
}
// New creates new Progress container instance. It's not possible to
// reuse instance after (*Progress).Wait method has been called.
func New(options ...ContainerOption) *Progress {
return NewWithContext(context.Background(), options...)
}
// NewWithContext creates new Progress container instance with provided
// context. It's not possible to reuse instance after (*Progress).Wait
// method has been called.
func NewWithContext(ctx context.Context, options ...ContainerOption) *Progress {
s := &pState{
bHeap: priorityQueue{},
rr: prr,
queueBars: make(map[*Bar]*Bar),
output: os.Stdout,
}
for _, opt := range options {
if opt != nil {
opt(s)
}
}
p := &Progress{
ctx: ctx,
uwg: s.uwg,
cwg: new(sync.WaitGroup),
bwg: new(sync.WaitGroup),
operateState: make(chan func(*pState)),
done: make(chan struct{}),
}
p.cwg.Add(1)
go p.serve(s, cwriter.New(s.output))
return p
}
// AddBar creates a bar with default bar filler.
func (p *Progress) AddBar(total int64, options ...BarOption) *Bar {
return p.New(total, BarStyle(), options...)
}
// AddSpinner creates a bar with default spinner filler.
func (p *Progress) AddSpinner(total int64, options ...BarOption) *Bar {
return p.New(total, SpinnerStyle(), options...)
}
// New creates a bar with provided BarFillerBuilder.
func (p *Progress) New(total int64, builder BarFillerBuilder, options ...BarOption) *Bar {
return p.Add(total, builder.Build(), options...)
}
// Add creates a bar which renders itself by provided filler.
// If `total <= 0` triggering complete event by increment methods is disabled.
// Panics if *Progress instance is done, i.e. called after (*Progress).Wait().
func (p *Progress) Add(total int64, filler BarFiller, options ...BarOption) *Bar {
if filler == nil {
filler = NopStyle().Build()
}
p.bwg.Add(1)
result := make(chan *Bar)
select {
case p.operateState <- func(ps *pState) {
bs := ps.makeBarState(total, filler, options...)
bar := newBar(p, bs)
if bs.afterBar != nil {
ps.queueBars[bs.afterBar] = bar
} else {
heap.Push(&ps.bHeap, bar)
ps.heapUpdated = true
}
ps.idCount++
result <- bar
}:
bar := <-result
return bar
case <-p.done:
p.bwg.Done()
panic(fmt.Sprintf("%T instance can't be reused after it's done!", p))
}
}
func (p *Progress) traverseBars(cb func(b *Bar) bool) {
sync := make(chan struct{})
select {
case p.operateState <- func(s *pState) {
for i := 0; i < s.bHeap.Len(); i++ {
bar := s.bHeap[i]
if !cb(bar) {
break
}
}
close(sync)
}:
<-sync
case <-p.done:
}
}
// UpdateBarPriority same as *Bar.SetPriority(int).
func (p *Progress) UpdateBarPriority(b *Bar, priority int) {
select {
case p.operateState <- func(s *pState) {
if b.index < 0 {
return
}
b.priority = priority
heap.Fix(&s.bHeap, b.index)
}:
case <-p.done:
}
}
// BarCount returns bars count.
func (p *Progress) BarCount() int {
result := make(chan int)
select {
case p.operateState <- func(s *pState) { result <- s.bHeap.Len() }:
return <-result
case <-p.done:
return 0
}
}
// Wait waits for all bars to complete and finally shutdowns container.
// After this method has been called, there is no way to reuse *Progress
// instance.
func (p *Progress) Wait() {
// wait for user wg, if any
if p.uwg != nil {
p.uwg.Wait()
}
// wait for bars to quit, if any
p.bwg.Wait()
p.once.Do(p.shutdown)
// wait for container to quit
p.cwg.Wait()
}
func (p *Progress) shutdown() {
close(p.done)
}
func (p *Progress) serve(s *pState, cw *cwriter.Writer) {
defer p.cwg.Done()
p.refreshCh = s.newTicker(p.done)
for {
select {
case op := <-p.operateState:
op(s)
case <-p.refreshCh:
if err := s.render(cw); err != nil {
if s.debugOut != nil {
_, e := fmt.Fprintln(s.debugOut, err)
if e != nil {
panic(err)
}
} else {
panic(err)
}
}
case <-s.shutdownNotifier:
for s.heapUpdated {
if err := s.render(cw); err != nil {
if s.debugOut != nil {
_, e := fmt.Fprintln(s.debugOut, err)
if e != nil {
panic(err)
}
} else {
panic(err)
}
}
}
return
}
}
}
func (s *pState) render(cw *cwriter.Writer) error {
if s.heapUpdated {
s.updateSyncMatrix()
s.heapUpdated = false
}
syncWidth(s.pMatrix)
syncWidth(s.aMatrix)
tw, err := cw.GetWidth()
if err != nil {
tw = s.reqWidth
}
for i := 0; i < s.bHeap.Len(); i++ {
bar := s.bHeap[i]
go bar.render(tw)
}
return s.flush(cw)
}
func (s *pState) flush(cw *cwriter.Writer) error {
var lines int
pool := make([]*Bar, 0, s.bHeap.Len())
for s.bHeap.Len() > 0 {
b := heap.Pop(&s.bHeap).(*Bar)
frame := <-b.frameCh
lines += frame.lines
_, err := cw.ReadFrom(frame.reader)
if err != nil {
return err
}
if frame.shutdown {
b.Wait() // waiting for b.done, so it's safe to read b.bs
var toDrop bool
if qb, ok := s.queueBars[b]; ok {
delete(s.queueBars, b)
qb.priority = b.priority
pool = append(pool, qb)
toDrop = true
} else if s.popCompleted && !b.bs.noPop {
lines -= frame.lines
toDrop = true
}
if toDrop || b.bs.dropOnComplete {
s.heapUpdated = true
continue
}
}
pool = append(pool, b)
}
for _, b := range pool {
heap.Push(&s.bHeap, b)
}
return cw.Flush(lines)
}
func (s *pState) newTicker(done <-chan struct{}) chan time.Time {
ch := make(chan time.Time)
if s.shutdownNotifier == nil {
s.shutdownNotifier = make(chan struct{})
}
go func() {
if s.renderDelay != nil {
<-s.renderDelay
}
var internalRefresh <-chan time.Time
if !s.outputDiscarded {
if s.externalRefresh == nil {
ticker := time.NewTicker(s.rr)
defer ticker.Stop()
internalRefresh = ticker.C
}
} else {
s.externalRefresh = nil
}
for {
select {
case t := <-internalRefresh:
ch <- t
case x := <-s.externalRefresh:
if t, ok := x.(time.Time); ok {
ch <- t
} else {
ch <- time.Now()
}
case <-done:
close(s.shutdownNotifier)
return
}
}
}()
return ch
}
func (s *pState) updateSyncMatrix() {
s.pMatrix = make(map[int][]chan int)
s.aMatrix = make(map[int][]chan int)
for i := 0; i < s.bHeap.Len(); i++ {
bar := s.bHeap[i]
table := bar.wSyncTable()
pRow, aRow := table[0], table[1]
for i, ch := range pRow {
s.pMatrix[i] = append(s.pMatrix[i], ch)
}
for i, ch := range aRow {
s.aMatrix[i] = append(s.aMatrix[i], ch)
}
}
}
func (s *pState) makeBarState(total int64, filler BarFiller, options ...BarOption) *bState {
bs := &bState{
id: s.idCount,
priority: s.idCount,
reqWidth: s.reqWidth,
total: total,
filler: filler,
extender: func(r io.Reader, _ int, _ decor.Statistics) (io.Reader, int) { return r, 0 },
debugOut: s.debugOut,
}
if total > 0 {
bs.triggerComplete = true
}
for _, opt := range options {
if opt != nil {
opt(bs)
}
}
if bs.middleware != nil {
bs.filler = bs.middleware(filler)
bs.middleware = nil
}
if s.popCompleted && !bs.noPop {
bs.priority = -(math.MaxInt32 - s.idCount)
}
for i := 0; i < len(bs.buffers); i++ {
bs.buffers[i] = bytes.NewBuffer(make([]byte, 0, 512))
}
bs.subscribeDecorators()
return bs
}
func syncWidth(matrix map[int][]chan int) {
for _, column := range matrix {
go maxWidthDistributor(column)
}
}
func maxWidthDistributor(column []chan int) {
var maxWidth int
for _, ch := range column {
if w := <-ch; w > maxWidth {
maxWidth = w
}
}
for _, ch := range column {
ch <- maxWidth
}
}