Add a new generic container registry client via a new `container` package. Use this to create a command-line utility as well as a new upload target for container registries. The code uses the github.com/containers/* projects and packages to interact with container registries; these are also used by skopeo, podman, et al. One of the dependencies is `proglottis/gpgme`, which uses cgo to bind libgpgme, so we have to add the corresponding devel package to the BuildRequires and install it on CI. Checks will follow later via an integration test.
519 lines
13 KiB
Go
519 lines
13 KiB
Go
// Copyright 2010 The Go Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style
|
|
// license that can be found in the LICENSE file.
|
|
|
|
package pgzip
|
|
|
|
import (
|
|
"bytes"
|
|
"errors"
|
|
"fmt"
|
|
"hash"
|
|
"hash/crc32"
|
|
"io"
|
|
"runtime"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/klauspost/compress/flate"
|
|
)
|
|
|
|
// Tuning defaults for the parallel writer.
const (
	// defaultBlockSize is the block size, in uncompressed bytes, that
	// NewWriterLevel and Reset pass to SetConcurrency (1 MiB).
	defaultBlockSize = 1024 * 1024
	// tailSize is the number of trailing bytes of a block that are kept
	// and handed to the next block's compressor as its dictionary. It is
	// also the exclusive lower bound for a valid block size.
	tailSize = 16 * 1024
	// defaultBlocks is a default block count.
	defaultBlocks = 4
)
|
|
|
|
// These constants are copied from the flate package, so that code that imports
|
|
// "compress/gzip" does not also have to import "compress/flate".
|
|
const (
|
|
NoCompression = flate.NoCompression
|
|
BestSpeed = flate.BestSpeed
|
|
BestCompression = flate.BestCompression
|
|
DefaultCompression = flate.DefaultCompression
|
|
ConstantCompression = flate.ConstantCompression
|
|
HuffmanOnly = flate.HuffmanOnly
|
|
)
|
|
|
|
// A Writer is an io.WriteCloser.
|
|
// Writes to a Writer are compressed and written to w.
|
|
type Writer struct {
|
|
Header
|
|
w io.Writer
|
|
level int
|
|
wroteHeader bool
|
|
blockSize int
|
|
blocks int
|
|
currentBuffer []byte
|
|
prevTail []byte
|
|
digest hash.Hash32
|
|
size int
|
|
closed bool
|
|
buf [10]byte
|
|
errMu sync.RWMutex
|
|
err error
|
|
pushedErr chan struct{}
|
|
results chan result
|
|
dictFlatePool sync.Pool
|
|
dstPool sync.Pool
|
|
wg sync.WaitGroup
|
|
}
|
|
|
|
// result tracks one block on its way from compression to output.
type result struct {
	// result carries the compressed bytes of the block from the
	// compressing goroutine to the output goroutine.
	result chan []byte
	// notifyWritten is closed by the output goroutine once it is done
	// with the block, whether or not the write succeeded.
	notifyWritten chan struct{}
}
|
|
|
|
// Use SetConcurrency to finetune the concurrency level if needed.
|
|
//
|
|
// With this you can control the approximate size of your blocks,
|
|
// as well as how many you want to be processing in parallel.
|
|
//
|
|
// Default values for this is SetConcurrency(defaultBlockSize, runtime.GOMAXPROCS(0)),
|
|
// meaning blocks are split at 1 MB and up to the number of CPU threads
|
|
// can be processing at once before the writer blocks.
|
|
func (z *Writer) SetConcurrency(blockSize, blocks int) error {
|
|
if blockSize <= tailSize {
|
|
return fmt.Errorf("gzip: block size cannot be less than or equal to %d", tailSize)
|
|
}
|
|
if blocks <= 0 {
|
|
return errors.New("gzip: blocks cannot be zero or less")
|
|
}
|
|
if blockSize == z.blockSize && blocks == z.blocks {
|
|
return nil
|
|
}
|
|
z.blockSize = blockSize
|
|
z.results = make(chan result, blocks)
|
|
z.blocks = blocks
|
|
z.dstPool.New = func() interface{} { return make([]byte, 0, blockSize+(blockSize)>>4) }
|
|
return nil
|
|
}
|
|
|
|
// NewWriter returns a new Writer.
|
|
// Writes to the returned writer are compressed and written to w.
|
|
//
|
|
// It is the caller's responsibility to call Close on the WriteCloser when done.
|
|
// Writes may be buffered and not flushed until Close.
|
|
//
|
|
// Callers that wish to set the fields in Writer.Header must do so before
|
|
// the first call to Write or Close. The Comment and Name header fields are
|
|
// UTF-8 strings in Go, but the underlying format requires NUL-terminated ISO
|
|
// 8859-1 (Latin-1). NUL or non-Latin-1 runes in those strings will lead to an
|
|
// error on Write.
|
|
func NewWriter(w io.Writer) *Writer {
|
|
z, _ := NewWriterLevel(w, DefaultCompression)
|
|
return z
|
|
}
|
|
|
|
// NewWriterLevel is like NewWriter but specifies the compression level instead
|
|
// of assuming DefaultCompression.
|
|
//
|
|
// The compression level can be DefaultCompression, NoCompression, or any
|
|
// integer value between BestSpeed and BestCompression inclusive. The error
|
|
// returned will be nil if the level is valid.
|
|
func NewWriterLevel(w io.Writer, level int) (*Writer, error) {
|
|
if level < ConstantCompression || level > BestCompression {
|
|
return nil, fmt.Errorf("gzip: invalid compression level: %d", level)
|
|
}
|
|
z := new(Writer)
|
|
z.SetConcurrency(defaultBlockSize, runtime.GOMAXPROCS(0))
|
|
z.init(w, level)
|
|
return z, nil
|
|
}
|
|
|
|
// This function must be used by goroutines to set an
|
|
// error condition, since z.err access is restricted
|
|
// to the callers goruotine.
|
|
func (z *Writer) pushError(err error) {
|
|
z.errMu.Lock()
|
|
if z.err != nil {
|
|
z.errMu.Unlock()
|
|
return
|
|
}
|
|
z.err = err
|
|
close(z.pushedErr)
|
|
z.errMu.Unlock()
|
|
}
|
|
|
|
func (z *Writer) init(w io.Writer, level int) {
|
|
z.wg.Wait()
|
|
digest := z.digest
|
|
if digest != nil {
|
|
digest.Reset()
|
|
} else {
|
|
digest = crc32.NewIEEE()
|
|
}
|
|
z.Header = Header{OS: 255}
|
|
z.w = w
|
|
z.level = level
|
|
z.digest = digest
|
|
z.pushedErr = make(chan struct{}, 0)
|
|
z.results = make(chan result, z.blocks)
|
|
z.err = nil
|
|
z.closed = false
|
|
z.Comment = ""
|
|
z.Extra = nil
|
|
z.ModTime = time.Time{}
|
|
z.wroteHeader = false
|
|
z.currentBuffer = nil
|
|
z.buf = [10]byte{}
|
|
z.prevTail = nil
|
|
z.size = 0
|
|
if z.dictFlatePool.New == nil {
|
|
z.dictFlatePool.New = func() interface{} {
|
|
f, _ := flate.NewWriterDict(w, level, nil)
|
|
return f
|
|
}
|
|
}
|
|
}
|
|
|
|
// Reset discards the Writer z's state and makes it equivalent to the
|
|
// result of its original state from NewWriter or NewWriterLevel, but
|
|
// writing to w instead. This permits reusing a Writer rather than
|
|
// allocating a new one.
|
|
func (z *Writer) Reset(w io.Writer) {
|
|
if z.results != nil && !z.closed {
|
|
close(z.results)
|
|
}
|
|
z.SetConcurrency(defaultBlockSize, runtime.GOMAXPROCS(0))
|
|
z.init(w, z.level)
|
|
}
|
|
|
|
// put2 stores v in the first two bytes of p, least significant byte first.
// GZIP (RFC 1952) is little-endian, unlike ZLIB (RFC 1950).
func put2(p []byte, v uint16) {
	lo, hi := byte(v), byte(v>>8)
	p[0] = lo
	p[1] = hi
}
|
|
|
|
// put4 stores v in the first four bytes of p, least significant byte first.
func put4(p []byte, v uint32) {
	for i := 0; i < 4; i++ {
		p[i] = byte(v >> (8 * uint(i)))
	}
}
|
|
|
|
// writeBytes writes a length-prefixed byte slice to z.w.
|
|
func (z *Writer) writeBytes(b []byte) error {
|
|
if len(b) > 0xffff {
|
|
return errors.New("gzip.Write: Extra data is too large")
|
|
}
|
|
put2(z.buf[0:2], uint16(len(b)))
|
|
_, err := z.w.Write(z.buf[0:2])
|
|
if err != nil {
|
|
return err
|
|
}
|
|
_, err = z.w.Write(b)
|
|
return err
|
|
}
|
|
|
|
// writeString writes a UTF-8 string s in GZIP's format to z.w.
|
|
// GZIP (RFC 1952) specifies that strings are NUL-terminated ISO 8859-1 (Latin-1).
|
|
func (z *Writer) writeString(s string) (err error) {
|
|
// GZIP stores Latin-1 strings; error if non-Latin-1; convert if non-ASCII.
|
|
needconv := false
|
|
for _, v := range s {
|
|
if v == 0 || v > 0xff {
|
|
return errors.New("gzip.Write: non-Latin-1 header string")
|
|
}
|
|
if v > 0x7f {
|
|
needconv = true
|
|
}
|
|
}
|
|
if needconv {
|
|
b := make([]byte, 0, len(s))
|
|
for _, v := range s {
|
|
b = append(b, byte(v))
|
|
}
|
|
_, err = z.w.Write(b)
|
|
} else {
|
|
_, err = io.WriteString(z.w, s)
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// GZIP strings are NUL-terminated.
|
|
z.buf[0] = 0
|
|
_, err = z.w.Write(z.buf[0:1])
|
|
return err
|
|
}
|
|
|
|
// compressCurrent will compress the data currently buffered
|
|
// This should only be called from the main writer/flush/closer
|
|
func (z *Writer) compressCurrent(flush bool) {
|
|
c := z.currentBuffer
|
|
if len(c) > z.blockSize {
|
|
// This can never happen through the public interface.
|
|
panic("len(z.currentBuffer) > z.blockSize (most likely due to concurrent Write race)")
|
|
}
|
|
|
|
r := result{}
|
|
r.result = make(chan []byte, 1)
|
|
r.notifyWritten = make(chan struct{}, 0)
|
|
// Reserve a result slot
|
|
select {
|
|
case z.results <- r:
|
|
case <-z.pushedErr:
|
|
return
|
|
}
|
|
|
|
z.wg.Add(1)
|
|
tail := z.prevTail
|
|
if len(c) > tailSize {
|
|
buf := z.dstPool.Get().([]byte) // Put in .compressBlock
|
|
// Copy tail from current buffer before handing the buffer over to the
|
|
// compressBlock goroutine.
|
|
buf = append(buf[:0], c[len(c)-tailSize:]...)
|
|
z.prevTail = buf
|
|
} else {
|
|
z.prevTail = nil
|
|
}
|
|
go z.compressBlock(c, tail, r, z.closed)
|
|
|
|
z.currentBuffer = z.dstPool.Get().([]byte) // Put in .compressBlock
|
|
z.currentBuffer = z.currentBuffer[:0]
|
|
|
|
// Wait if flushing
|
|
if flush {
|
|
<-r.notifyWritten
|
|
}
|
|
}
|
|
|
|
// Returns an error if it has been set.
|
|
// Cannot be used by functions that are from internal goroutines.
|
|
func (z *Writer) checkError() error {
|
|
z.errMu.RLock()
|
|
err := z.err
|
|
z.errMu.RUnlock()
|
|
return err
|
|
}
|
|
|
|
// Write writes a compressed form of p to the underlying io.Writer. The
|
|
// compressed bytes are not necessarily flushed to output until
|
|
// the Writer is closed or Flush() is called.
|
|
//
|
|
// The function will return quickly, if there are unused buffers.
|
|
// The sent slice (p) is copied, and the caller is free to re-use the buffer
|
|
// when the function returns.
|
|
//
|
|
// Errors that occur during compression will be reported later, and a nil error
|
|
// does not signify that the compression succeeded (since it is most likely still running)
|
|
// That means that the call that returns an error may not be the call that caused it.
|
|
// Only Flush and Close functions are guaranteed to return any errors up to that point.
|
|
func (z *Writer) Write(p []byte) (int, error) {
|
|
if err := z.checkError(); err != nil {
|
|
return 0, err
|
|
}
|
|
// Write the GZIP header lazily.
|
|
if !z.wroteHeader {
|
|
z.wroteHeader = true
|
|
z.buf[0] = gzipID1
|
|
z.buf[1] = gzipID2
|
|
z.buf[2] = gzipDeflate
|
|
z.buf[3] = 0
|
|
if z.Extra != nil {
|
|
z.buf[3] |= 0x04
|
|
}
|
|
if z.Name != "" {
|
|
z.buf[3] |= 0x08
|
|
}
|
|
if z.Comment != "" {
|
|
z.buf[3] |= 0x10
|
|
}
|
|
put4(z.buf[4:8], uint32(z.ModTime.Unix()))
|
|
if z.level == BestCompression {
|
|
z.buf[8] = 2
|
|
} else if z.level == BestSpeed {
|
|
z.buf[8] = 4
|
|
} else {
|
|
z.buf[8] = 0
|
|
}
|
|
z.buf[9] = z.OS
|
|
var n int
|
|
var err error
|
|
n, err = z.w.Write(z.buf[0:10])
|
|
if err != nil {
|
|
z.pushError(err)
|
|
return n, err
|
|
}
|
|
if z.Extra != nil {
|
|
err = z.writeBytes(z.Extra)
|
|
if err != nil {
|
|
z.pushError(err)
|
|
return n, err
|
|
}
|
|
}
|
|
if z.Name != "" {
|
|
err = z.writeString(z.Name)
|
|
if err != nil {
|
|
z.pushError(err)
|
|
return n, err
|
|
}
|
|
}
|
|
if z.Comment != "" {
|
|
err = z.writeString(z.Comment)
|
|
if err != nil {
|
|
z.pushError(err)
|
|
return n, err
|
|
}
|
|
}
|
|
// Start receiving data from compressors
|
|
go func() {
|
|
listen := z.results
|
|
var failed bool
|
|
for {
|
|
r, ok := <-listen
|
|
// If closed, we are finished.
|
|
if !ok {
|
|
return
|
|
}
|
|
if failed {
|
|
close(r.notifyWritten)
|
|
continue
|
|
}
|
|
buf := <-r.result
|
|
n, err := z.w.Write(buf)
|
|
if err != nil {
|
|
z.pushError(err)
|
|
close(r.notifyWritten)
|
|
failed = true
|
|
continue
|
|
}
|
|
if n != len(buf) {
|
|
z.pushError(fmt.Errorf("gzip: short write %d should be %d", n, len(buf)))
|
|
failed = true
|
|
close(r.notifyWritten)
|
|
continue
|
|
}
|
|
z.dstPool.Put(buf)
|
|
close(r.notifyWritten)
|
|
}
|
|
}()
|
|
z.currentBuffer = z.dstPool.Get().([]byte)
|
|
z.currentBuffer = z.currentBuffer[:0]
|
|
}
|
|
q := p
|
|
for len(q) > 0 {
|
|
length := len(q)
|
|
if length+len(z.currentBuffer) > z.blockSize {
|
|
length = z.blockSize - len(z.currentBuffer)
|
|
}
|
|
z.digest.Write(q[:length])
|
|
z.currentBuffer = append(z.currentBuffer, q[:length]...)
|
|
if len(z.currentBuffer) > z.blockSize {
|
|
panic("z.currentBuffer too large (most likely due to concurrent Write race)")
|
|
}
|
|
if len(z.currentBuffer) == z.blockSize {
|
|
z.compressCurrent(false)
|
|
if err := z.checkError(); err != nil {
|
|
return len(p) - len(q), err
|
|
}
|
|
}
|
|
z.size += length
|
|
q = q[length:]
|
|
}
|
|
return len(p), z.checkError()
|
|
}
|
|
|
|
// Step 1: compresses buffer to buffer
|
|
// Step 2: send writer to channel
|
|
// Step 3: Close result channel to indicate we are done
|
|
func (z *Writer) compressBlock(p, prevTail []byte, r result, closed bool) {
|
|
defer func() {
|
|
close(r.result)
|
|
z.wg.Done()
|
|
}()
|
|
buf := z.dstPool.Get().([]byte) // Corresponding Put in .Write's result writer
|
|
dest := bytes.NewBuffer(buf[:0])
|
|
|
|
compressor := z.dictFlatePool.Get().(*flate.Writer) // Put below
|
|
compressor.ResetDict(dest, prevTail)
|
|
compressor.Write(p)
|
|
z.dstPool.Put(p) // Corresponding Get in .Write and .compressCurrent
|
|
|
|
err := compressor.Flush()
|
|
if err != nil {
|
|
z.pushError(err)
|
|
return
|
|
}
|
|
if closed {
|
|
err = compressor.Close()
|
|
if err != nil {
|
|
z.pushError(err)
|
|
return
|
|
}
|
|
}
|
|
z.dictFlatePool.Put(compressor) // Get above
|
|
|
|
if prevTail != nil {
|
|
z.dstPool.Put(prevTail) // Get in .compressCurrent
|
|
}
|
|
|
|
// Read back buffer
|
|
buf = dest.Bytes()
|
|
r.result <- buf
|
|
}
|
|
|
|
// Flush flushes any pending compressed data to the underlying writer.
|
|
//
|
|
// It is useful mainly in compressed network protocols, to ensure that
|
|
// a remote reader has enough data to reconstruct a packet. Flush does
|
|
// not return until the data has been written. If the underlying
|
|
// writer returns an error, Flush returns that error.
|
|
//
|
|
// In the terminology of the zlib library, Flush is equivalent to Z_SYNC_FLUSH.
|
|
func (z *Writer) Flush() error {
|
|
if err := z.checkError(); err != nil {
|
|
return err
|
|
}
|
|
if z.closed {
|
|
return nil
|
|
}
|
|
if !z.wroteHeader {
|
|
_, err := z.Write(nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
// We send current block to compression
|
|
z.compressCurrent(true)
|
|
|
|
return z.checkError()
|
|
}
|
|
|
|
// UncompressedSize will return the number of bytes written.
|
|
// pgzip only, not a function in the official gzip package.
|
|
func (z *Writer) UncompressedSize() int {
|
|
return z.size
|
|
}
|
|
|
|
// Close closes the Writer, flushing any unwritten data to the underlying
|
|
// io.Writer, but does not close the underlying io.Writer.
|
|
func (z *Writer) Close() error {
|
|
if err := z.checkError(); err != nil {
|
|
return err
|
|
}
|
|
if z.closed {
|
|
return nil
|
|
}
|
|
|
|
z.closed = true
|
|
if !z.wroteHeader {
|
|
z.Write(nil)
|
|
if err := z.checkError(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
z.compressCurrent(true)
|
|
if err := z.checkError(); err != nil {
|
|
return err
|
|
}
|
|
close(z.results)
|
|
put4(z.buf[0:4], z.digest.Sum32())
|
|
put4(z.buf[4:8], uint32(z.size))
|
|
_, err := z.w.Write(z.buf[0:8])
|
|
if err != nil {
|
|
z.pushError(err)
|
|
return err
|
|
}
|
|
return nil
|
|
}
|