debian-forge-composer/vendor/github.com/klauspost/pgzip/gzip.go
Christian Kellner 986f076276 container: add support for uploading to registries
Add a new generic container registry client via a new `container`
package. Use this to create a command line utility as well as a
new upload target for container registries.

The code uses the github.com/containers/* project and packages to
interact with container registries; the same packages are also used by
skopeo, podman et al. One of the dependencies is `proglottis/gpgme`, which
uses cgo to bind libgpgme, so we have to add the corresponding
devel package to the BuildRequires as well as install it on CI.

Checks will follow later via an integration test.
2022-06-29 10:02:46 +02:00

519 lines
13 KiB
Go

// Copyright 2010 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package pgzip
import (
"bytes"
"errors"
"fmt"
"hash"
"hash/crc32"
"io"
"runtime"
"sync"
"time"
"github.com/klauspost/compress/flate"
)
// Sizing defaults for the parallel writer.
const (
// defaultBlockSize is the uncompressed block size (1 MiB) that
// NewWriterLevel and Reset pass to SetConcurrency.
defaultBlockSize = 1 << 20
// tailSize is the number of trailing bytes of a block that
// compressCurrent keeps and hands to the next block's compressor as its
// dictionary. SetConcurrency rejects block sizes that are not larger
// than this.
tailSize = 16384
// defaultBlocks is not referenced by the Writer code in this file.
// NOTE(review): presumably used elsewhere in the package - confirm.
defaultBlocks = 4
)
// These constants are re-exported from the flate package, so that code that
// imports this package does not also have to import the flate package to
// name a compression level.
//
// NewWriterLevel accepts any level from ConstantCompression up to and
// including BestCompression.
const (
NoCompression = flate.NoCompression
BestSpeed = flate.BestSpeed
BestCompression = flate.BestCompression
DefaultCompression = flate.DefaultCompression
ConstantCompression = flate.ConstantCompression
HuffmanOnly = flate.HuffmanOnly
)
// A Writer is an io.WriteCloser.
// Writes to a Writer are compressed and written to w.
//
// Input is collected into blocks of blockSize bytes; each block is compressed
// on its own goroutine and the compressed blocks are written to w in
// submission order by a single result-writer goroutine started by the first
// Write.
type Writer struct {
// Header is embedded; its Extra, Name, Comment, ModTime and OS fields are
// emitted by the first Write.
Header
// w is the destination for the header, compressed blocks and trailer.
w io.Writer
// level is the compression level handed to flate.NewWriterDict; it also
// selects the header byte written at offset 8.
level int
// wroteHeader is set by Write once header emission has been attempted.
wroteHeader bool
// blockSize is the amount of buffered input that triggers compression of
// the current block.
blockSize int
// blocks is the capacity of the results channel.
blocks int
// currentBuffer holds input bytes not yet handed to a compressor.
currentBuffer []byte
// prevTail holds the last tailSize bytes of the previously submitted
// block; it is passed to the next block's compressor as its dictionary.
prevTail []byte
// digest is the running CRC-32 (IEEE) of all input; written in the trailer.
digest hash.Hash32
// size counts the input bytes accepted by Write.
size int
// closed is set by Close; compressCurrent passes it to compressBlock so
// the compressor is closed after the last block.
closed bool
// buf is scratch space for the 10-byte header, length prefixes, the NUL
// string terminator and the 8-byte trailer.
buf [10]byte
// errMu guards err.
errMu sync.RWMutex
// err is the first error recorded through pushError.
err error
// pushedErr is closed by pushError when err is first set.
pushedErr chan struct{}
// results queues one entry per submitted block, in submission order.
results chan result
// dictFlatePool recycles *flate.Writer values between blocks.
dictFlatePool sync.Pool
// dstPool recycles []byte buffers used for input blocks, tails and
// compressed output.
dstPool sync.Pool
// wg counts running compressBlock goroutines; init waits on it.
wg sync.WaitGroup
}
// result tracks one submitted block from compression through to output.
type result struct {
// result receives the compressed bytes from compressBlock, which closes
// the channel when it returns (without sending if compression failed).
result chan []byte
// notifyWritten is closed by the result-writer goroutine once it has
// handled the block; compressCurrent waits on it when flushing.
notifyWritten chan struct{}
}
// SetConcurrency fine-tunes the concurrency level of the Writer.
//
// blockSize is the approximate number of uncompressed bytes per block and
// must be larger than tailSize. blocks is the number of blocks that may be
// queued for processing before the writer blocks and must be positive.
//
// NewWriterLevel and Reset apply
// SetConcurrency(defaultBlockSize, runtime.GOMAXPROCS(0)), i.e. blocks are
// split at 1 MB and up to one block per CPU thread can be in flight.
//
// Calling it with the values already in effect is a no-op; otherwise the
// result queue and the buffer allocator are replaced.
func (z *Writer) SetConcurrency(blockSize, blocks int) error {
	switch {
	case blockSize <= tailSize:
		return fmt.Errorf("gzip: block size cannot be less than or equal to %d", tailSize)
	case blocks <= 0:
		return errors.New("gzip: blocks cannot be zero or less")
	case blockSize == z.blockSize && blocks == z.blocks:
		// Nothing to change.
		return nil
	}

	z.blockSize, z.blocks = blockSize, blocks
	z.results = make(chan result, blocks)
	// New buffers get 1/16 of headroom on top of the block size.
	z.dstPool.New = func() interface{} {
		return make([]byte, 0, blockSize+blockSize>>4)
	}
	return nil
}
// NewWriter returns a new Writer that compresses at DefaultCompression.
// Writes to the returned writer are compressed and written to w.
//
// It is the caller's responsibility to call Close on the WriteCloser when
// done. Writes may be buffered and not flushed until Close.
//
// Callers that wish to set the fields in Writer.Header must do so before
// the first call to Write or Close. The Comment and Name header fields are
// UTF-8 strings in Go, but the underlying format requires NUL-terminated ISO
// 8859-1 (Latin-1). NUL or non-Latin-1 runes in those strings will lead to an
// error on Write.
func NewWriter(w io.Writer) *Writer {
	// The error result is discarded here, exactly as before.
	zw, _ := NewWriterLevel(w, DefaultCompression)
	return zw
}
// NewWriterLevel is like NewWriter but specifies the compression level
// instead of assuming DefaultCompression.
//
// Any level from ConstantCompression through BestCompression (inclusive) is
// accepted; for any other value a nil Writer and an error are returned.
//
// The returned Writer uses the default concurrency settings
// (defaultBlockSize, runtime.GOMAXPROCS(0)).
func NewWriterLevel(w io.Writer, level int) (*Writer, error) {
	if valid := level >= ConstantCompression && level <= BestCompression; !valid {
		return nil, fmt.Errorf("gzip: invalid compression level: %d", level)
	}

	zw := &Writer{}
	// The SetConcurrency error is deliberately not inspected.
	_ = zw.SetConcurrency(defaultBlockSize, runtime.GOMAXPROCS(0))
	zw.init(w, level)
	return zw, nil
}
// pushError records err as the Writer's error and closes pushedErr to signal
// that an error is now set. Only the first error is kept; later calls leave
// both z.err and pushedErr untouched.
//
// Worker goroutines must report failures through this method, because it
// takes errMu before touching z.err.
func (z *Writer) pushError(err error) {
	z.errMu.Lock()
	defer z.errMu.Unlock()

	if z.err == nil {
		z.err = err
		close(z.pushedErr)
	}
}
// init puts z into its initial state for writing to w at the given level.
//
// It first waits for outstanding compressBlock goroutines, then resets the
// header, checksum, error state, buffers and result queue. The CRC-32 digest
// is reused when one already exists. The flate writer pool's constructor is
// installed only once, so it keeps using the w and level of the first call.
func (z *Writer) init(w io.Writer, level int) {
	// Let any in-flight block compression finish before touching state.
	z.wg.Wait()

	crc := z.digest
	if crc == nil {
		crc = crc32.NewIEEE()
	} else {
		crc.Reset()
	}

	// Header: everything cleared, OS set to 255.
	z.Header = Header{OS: 255}
	z.Comment = ""
	z.Extra = nil
	z.ModTime = time.Time{}

	// Output and checksum.
	z.w = w
	z.level = level
	z.digest = crc

	// Error signalling and result queue.
	z.err = nil
	z.pushedErr = make(chan struct{})
	z.results = make(chan result, z.blocks)

	// Progress flags, buffers and counters.
	z.closed = false
	z.wroteHeader = false
	z.currentBuffer = nil
	z.prevTail = nil
	z.buf = [10]byte{}
	z.size = 0

	if z.dictFlatePool.New != nil {
		return
	}
	z.dictFlatePool.New = func() interface{} {
		fw, _ := flate.NewWriterDict(w, level, nil)
		return fw
	}
}
// Reset discards the Writer z's state and makes it equivalent to the
// result of its original state from NewWriter or NewWriterLevel, but
// writing to w instead. This permits reusing a Writer rather than
// allocating a new one.
//
// The compression level is kept; the concurrency settings are put back to
// the defaults (defaultBlockSize, runtime.GOMAXPROCS(0)).
func (z *Writer) Reset(w io.Writer) {
	// Close has not shut the result queue down yet, so do it here.
	if !z.closed && z.results != nil {
		close(z.results)
	}
	_ = z.SetConcurrency(defaultBlockSize, runtime.GOMAXPROCS(0))
	z.init(w, z.level)
}
// put2 stores v into p[0:2] in little-endian byte order.
// GZIP (RFC 1952) is little-endian, unlike ZLIB (RFC 1950).
func put2(p []byte, v uint16) {
	lo, hi := byte(v), byte(v>>8)
	p[0] = lo
	p[1] = hi
}
// put4 stores v into p[0:4] in little-endian byte order, least significant
// byte first.
func put4(p []byte, v uint32) {
	for i := 0; i < 4; i++ {
		p[i] = byte(v >> (8 * uint(i)))
	}
}
// writeBytes writes b to z.w preceded by its length as a two-byte
// little-endian prefix. Slices longer than 0xffff bytes cannot be
// represented and are rejected before anything is written.
func (z *Writer) writeBytes(b []byte) error {
	const maxLen = 0xffff
	if len(b) > maxLen {
		return errors.New("gzip.Write: Extra data is too large")
	}

	prefix := z.buf[:2]
	put2(prefix, uint16(len(b)))
	if _, err := z.w.Write(prefix); err != nil {
		return err
	}

	_, err := z.w.Write(b)
	return err
}
// writeString writes the UTF-8 string s to z.w in GZIP's string format.
// GZIP (RFC 1952) specifies that strings are NUL-terminated ISO 8859-1
// (Latin-1), so a string containing NUL or any rune above 0xff is rejected
// with an error before anything is written.
func (z *Writer) writeString(s string) (err error) {
	// Pure ASCII can be written as-is; runes in 0x80..0xff need to be
	// narrowed to a single byte each.
	ascii := true
	for _, r := range s {
		switch {
		case r == 0 || r > 0xff:
			return errors.New("gzip.Write: non-Latin-1 header string")
		case r > 0x7f:
			ascii = false
		}
	}

	if ascii {
		_, err = io.WriteString(z.w, s)
	} else {
		latin1 := make([]byte, 0, len(s))
		for _, r := range s {
			latin1 = append(latin1, byte(r))
		}
		_, err = z.w.Write(latin1)
	}
	if err != nil {
		return err
	}

	// GZIP strings are NUL-terminated.
	z.buf[0] = 0
	_, err = z.w.Write(z.buf[:1])
	return err
}
// compressCurrent hands the currently buffered data to a new compressBlock
// goroutine and starts a fresh buffer.
//
// It must only be called from the goroutine driving Write/Flush/Close. If an
// error has already been pushed and no result slot can be reserved, it
// returns without doing anything. When flush is true it does not return
// until the result writer has handled the block.
func (z *Writer) compressCurrent(flush bool) {
	block := z.currentBuffer
	if len(block) > z.blockSize {
		// This can never happen through the public interface.
		panic("len(z.currentBuffer) > z.blockSize (most likely due to concurrent Write race)")
	}

	res := result{
		result:        make(chan []byte, 1),
		notifyWritten: make(chan struct{}),
	}

	// Reserve a slot in the ordered result queue, unless the writer has
	// already failed.
	select {
	case z.results <- res:
	case <-z.pushedErr:
		return
	}

	z.wg.Add(1)

	// The previous block's tail is this block's dictionary. Remember this
	// block's own tail (as a copy, since the block itself is handed over to
	// the compressBlock goroutine) for the next one.
	dict := z.prevTail
	z.prevTail = nil
	if len(block) > tailSize {
		tailBuf := z.dstPool.Get().([]byte) // Put in .compressBlock
		z.prevTail = append(tailBuf[:0], block[len(block)-tailSize:]...)
	}

	go z.compressBlock(block, dict, res, z.closed)

	z.currentBuffer = z.dstPool.Get().([]byte)[:0] // Put in .compressBlock

	// Wait if flushing
	if flush {
		<-res.notifyWritten
	}
}
// checkError returns the error recorded by pushError, or nil if none has
// been set. The read is done under the read lock of errMu.
func (z *Writer) checkError() error {
	z.errMu.RLock()
	defer z.errMu.RUnlock()
	return z.err
}
// Write writes a compressed form of p to the underlying io.Writer. The
// compressed bytes are not necessarily flushed to output until
// the Writer is closed or Flush() is called.
//
// The function will return quickly, if there are unused buffers.
// The sent slice (p) is copied, and the caller is free to re-use the buffer
// when the function returns.
//
// Errors that occur during compression will be reported later, and a nil error
// does not signify that the compression succeeded (since it is most likely still running)
// That means that the call that returns an error may not be the call that caused it.
// Only Flush and Close functions are guaranteed to return any errors up to that point.
//
// The returned count is the number of bytes of p that were accepted. If
// emitting the GZIP header fails, nothing from p has been consumed and the
// count is 0.
func (z *Writer) Write(p []byte) (int, error) {
	if err := z.checkError(); err != nil {
		return 0, err
	}

	// Write the GZIP header lazily.
	if !z.wroteHeader {
		z.wroteHeader = true
		if err := z.writeHeader(); err != nil {
			z.pushError(err)
			// io.Writer requires 0 <= n <= len(p); header bytes are not
			// part of p, so none of p has been written.
			return 0, err
		}

		// Start receiving data from compressors. The queue and destination
		// are evaluated here, on the calling goroutine, so that a later
		// Reset (which replaces z.results and z.w) can neither race with
		// this goroutine's first read nor redirect its output.
		go func(listen <-chan result, w io.Writer) {
			failed := false
			// The loop ends when the queue is closed by Close or Reset.
			for r := range listen {
				if failed {
					// After a failure, just release any waiting flusher.
					close(r.notifyWritten)
					continue
				}
				buf := <-r.result
				n, err := w.Write(buf)
				switch {
				case err != nil:
					z.pushError(err)
					failed = true
				case n != len(buf):
					z.pushError(fmt.Errorf("gzip: short write %d should be %d", n, len(buf)))
					failed = true
				default:
					z.dstPool.Put(buf)
				}
				close(r.notifyWritten)
			}
		}(z.results, z.w)

		z.currentBuffer = z.dstPool.Get().([]byte)
		z.currentBuffer = z.currentBuffer[:0]
	}

	q := p
	for len(q) > 0 {
		// Take only as much as fits into the current block.
		length := len(q)
		if length+len(z.currentBuffer) > z.blockSize {
			length = z.blockSize - len(z.currentBuffer)
		}
		z.digest.Write(q[:length])
		z.currentBuffer = append(z.currentBuffer, q[:length]...)
		if len(z.currentBuffer) > z.blockSize {
			panic("z.currentBuffer too large (most likely due to concurrent Write race)")
		}
		if len(z.currentBuffer) == z.blockSize {
			// Block is full: hand it to a compressor.
			z.compressCurrent(false)
			if err := z.checkError(); err != nil {
				return len(p) - len(q), err
			}
		}
		z.size += length
		q = q[length:]
	}
	return len(p), z.checkError()
}

// writeHeader emits the GZIP member header to z.w: the fixed 10-byte part
// followed by the optional Extra, Name and Comment fields, in that order.
func (z *Writer) writeHeader() error {
	z.buf[0] = gzipID1
	z.buf[1] = gzipID2
	z.buf[2] = gzipDeflate

	// Flag byte: one bit per optional field that follows.
	z.buf[3] = 0
	if z.Extra != nil {
		z.buf[3] |= 0x04
	}
	if z.Name != "" {
		z.buf[3] |= 0x08
	}
	if z.Comment != "" {
		z.buf[3] |= 0x10
	}

	// NOTE(review): for a zero ModTime, Unix() is negative and the uint32
	// conversion produces a non-zero MTIME, whereas RFC 1952 uses 0 for
	// "no time stamp". Left unchanged to keep the output bytes stable.
	put4(z.buf[4:8], uint32(z.ModTime.Unix()))

	switch z.level {
	case BestCompression:
		z.buf[8] = 2
	case BestSpeed:
		z.buf[8] = 4
	default:
		z.buf[8] = 0
	}
	z.buf[9] = z.OS

	if _, err := z.w.Write(z.buf[0:10]); err != nil {
		return err
	}
	if z.Extra != nil {
		if err := z.writeBytes(z.Extra); err != nil {
			return err
		}
	}
	if z.Name != "" {
		if err := z.writeString(z.Name); err != nil {
			return err
		}
	}
	if z.Comment != "" {
		if err := z.writeString(z.Comment); err != nil {
			return err
		}
	}
	return nil
}
// compressBlock compresses one block on its own goroutine.
//
// Step 1: compress p into a pooled buffer, using prevTail as the dictionary
// Step 2: send the compressed bytes on r.result
// Step 3: close r.result to indicate we are done
//
// When closed is true the compressor is closed after flushing. On a flush or
// close error the error is pushed and nothing is sent, so the receiver only
// sees the closed channel. z.wg is released in every case.
func (z *Writer) compressBlock(p, prevTail []byte, r result, closed bool) {
	// Deferred calls run in reverse order: r.result is closed first, then
	// the wait group is released.
	defer z.wg.Done()
	defer close(r.result)

	// Corresponding Put in .Write's result writer
	out := bytes.NewBuffer(z.dstPool.Get().([]byte)[:0])

	fw := z.dictFlatePool.Get().(*flate.Writer) // Put below
	fw.ResetDict(out, prevTail)
	fw.Write(p)
	// The input has been consumed, so its buffer can be recycled.
	z.dstPool.Put(p) // Corresponding Get in .Write and .compressCurrent

	if err := fw.Flush(); err != nil {
		z.pushError(err)
		return
	}
	if closed {
		if err := fw.Close(); err != nil {
			z.pushError(err)
			return
		}
	}

	z.dictFlatePool.Put(fw) // Get above
	if prevTail != nil {
		z.dstPool.Put(prevTail) // Get in .compressCurrent
	}

	// Hand the compressed bytes to the result writer.
	r.result <- out.Bytes()
}
// Flush flushes any pending compressed data to the underlying writer.
//
// It is useful mainly in compressed network protocols, to ensure that
// a remote reader has enough data to reconstruct a packet. Flush does
// not return until the data has been written. If the underlying
// writer returns an error, Flush returns that error.
//
// Flush on a closed Writer is a no-op. If nothing has been written yet, the
// header is emitted first.
//
// In the terminology of the zlib library, Flush is equivalent to Z_SYNC_FLUSH.
func (z *Writer) Flush() error {
	if err := z.checkError(); err != nil {
		return err
	}
	if z.closed {
		return nil
	}

	// An empty Write emits the header and starts the result writer.
	if !z.wroteHeader {
		if _, err := z.Write(nil); err != nil {
			return err
		}
	}

	// Submit the current block and wait until it has been written out.
	z.compressCurrent(true)
	return z.checkError()
}
// UncompressedSize returns the number of uncompressed bytes accepted by
// Write so far. The count restarts at zero when the Writer is
// re-initialised (see Reset).
// pgzip only, not a function in the official gzip package.
func (z *Writer) UncompressedSize() int {
return z.size
}
// Close closes the Writer, flushing any unwritten data to the underlying
// io.Writer, but does not close the underlying io.Writer.
//
// It emits the header if nothing was written yet, submits the final block,
// waits for it to be written and then appends the GZIP trailer (CRC-32 and
// uncompressed size, both little-endian). Calling Close on an already
// closed Writer returns nil; calling it on a failed Writer returns the
// recorded error.
//
// On every return path after the first successful entry, and when called on
// a Writer that has already failed, the result queue is closed so that the
// result-writer goroutine started by Write terminates instead of leaking.
func (z *Writer) Close() error {
	// stopResults closes the result queue. Only this goroutine ever sends on
	// it, so closing here cannot race with a sender. The nil check keeps a
	// never-initialised Writer from panicking.
	stopResults := func() {
		if z.results != nil {
			close(z.results)
		}
	}

	if err := z.checkError(); err != nil {
		// The Writer failed earlier. Mark it closed and release the result
		// writer; the closed flag stops Reset and later Close calls from
		// closing the queue a second time.
		if !z.closed {
			z.closed = true
			stopResults()
		}
		return err
	}
	if z.closed {
		return nil
	}

	z.closed = true
	if !z.wroteHeader {
		// The return values are ignored; any failure shows up in checkError.
		z.Write(nil)
		if err := z.checkError(); err != nil {
			stopResults()
			return err
		}
	}

	// Submit the final block (compressBlock closes the deflate stream
	// because z.closed is set) and wait until it has been written.
	z.compressCurrent(true)
	if err := z.checkError(); err != nil {
		stopResults()
		return err
	}
	stopResults()

	// Trailer: CRC-32 of the input, then the input size truncated to 32 bits.
	put4(z.buf[0:4], z.digest.Sum32())
	put4(z.buf[4:8], uint32(z.size))
	if _, err := z.w.Write(z.buf[0:8]); err != nil {
		z.pushError(err)
		return err
	}
	return nil
}