Fedora 33 and rawhide got an updated version of the azblob library. Sadly, it introduced an incompatible API change. This commit does the same thing as a67baf5a did for kolo/xmlrpc: we now have two wrappers around the affected part of the API. Fedora 32 uses the wrapper around the old API, whereas Fedora 33 and 34 (and RHEL with its vendored deps) use the wrapper around the new API. The switch is implemented using go build flags and spec file magic. See a67baf5a for more thoughts.

Also, why is v0.11.1-0.20201209121048-6df5d9af221d in go.mod? The azblob maintainers most likely tagged the wrong commit as v0.12.0, which breaks go. The long v0.11.1-* pseudo-version points at the proper v0.12.0 commit. See https://github.com/Azure/azure-storage-blob-go/issues/236 for more information.

Signed-off-by: Ondřej Budai <ondrej@budai.cz>
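The build-flag switch works roughly as sketched below (a minimal illustration; the tag, file, and package names are assumptions, not the actual wrapper code). One file compiles by default, the other only when the spec file passes the tag, e.g. go build -tags new_azblob:

// oldapi.go (hypothetical): compiled by default, i.e. in the Fedora 32 build.
// +build !new_azblob

package azblobcompat

// Wrap the pre-update azblob API here.
const usesNewAPI = false

// newapi.go (hypothetical): compiled when the tag is set, i.e. on Fedora 33/34 and RHEL.
// +build new_azblob

package azblobcompat

// Wrap the post-update azblob API here.
const usesNewAPI = true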
237 lines · 6.8 KiB · Go
package azblob

import (
	"bytes"
	"context"
	"encoding/base64"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"sync"

	guuid "github.com/google/uuid"
)

// blockWriter provides methods to upload blocks that represent a file to a server and commit them.
// This allows us to provide a local implementation that fakes the server for hermetic testing.
type blockWriter interface {
	StageBlock(context.Context, string, io.ReadSeeker, LeaseAccessConditions, []byte, ClientProvidedKeyOptions) (*BlockBlobStageBlockResponse, error)
	CommitBlockList(context.Context, []string, BlobHTTPHeaders, Metadata, BlobAccessConditions, AccessTierType, BlobTagsMap, ClientProvidedKeyOptions) (*BlockBlobCommitBlockListResponse, error)
}
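
// A minimal sketch of the local fake for hermetic testing that the comment
// above alludes to (illustrative only; fakeBlockWriter and its bookkeeping
// are assumptions, not part of the upstream file):
type fakeBlockWriter struct {
	mu     sync.Mutex
	staged map[string]int64 // block ID -> chunk size in bytes
}

func (f *fakeBlockWriter) StageBlock(ctx context.Context, id string, body io.ReadSeeker, _ LeaseAccessConditions, _ []byte, _ ClientProvidedKeyOptions) (*BlockBlobStageBlockResponse, error) {
	// Measure the chunk instead of uploading it. Seeking to the end of the
	// ReadSeeker yields its length without copying any data.
	n, err := body.Seek(0, io.SeekEnd)
	if err != nil {
		return nil, err
	}
	// StageBlock is called from several writer goroutines, so guard the map.
	f.mu.Lock()
	defer f.mu.Unlock()
	if f.staged == nil {
		f.staged = map[string]int64{}
	}
	f.staged[id] = n
	return &BlockBlobStageBlockResponse{}, nil
}

func (f *fakeBlockWriter) CommitBlockList(ctx context.Context, ids []string, _ BlobHTTPHeaders, _ Metadata, _ BlobAccessConditions, _ AccessTierType, _ BlobTagsMap, _ ClientProvidedKeyOptions) (*BlockBlobCommitBlockListResponse, error) {
	return &BlockBlobCommitBlockListResponse{}, nil
}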

// copyFromReader copies a source io.Reader to blob storage using concurrent uploads.
// TODO(someone): The existing model provides a buffer size and buffer limit as limiting factors. The buffer size is probably
// useless other than needing to be above some number, as the network stack is going to chop up the buffer over some size. The
// max buffers value caps how much memory we use (max buffers multiplied by the buffer size) and how many goroutines can upload
// at a time. I think having a single max-memory dial would be more efficient. We can choose an internal buffer size that works
// well, 4 MiB or 8 MiB, and autoscale to as many goroutines as fit within the memory limit. This gives a single dial to tweak, and we
// can choose a max value for the memory setting based on internal transfers within Azure (which will give us the maximum throughput model).
// We can even provide a utility to dial this number in for customer networks to optimize their copies.
func copyFromReader(ctx context.Context, from io.Reader, to blockWriter, o UploadStreamToBlockBlobOptions) (*BlockBlobCommitBlockListResponse, error) {
	o.defaults()

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	cp := &copier{
		ctx:    ctx,
		cancel: cancel,
		reader: from,
		to:     to,
		id:     newID(),
		o:      o,
		ch:     make(chan copierChunk, 1),
		errCh:  make(chan error, 1),
		buffers: sync.Pool{
			New: func() interface{} {
				return make([]byte, o.BufferSize)
			},
		},
	}

	// Start the pool of concurrent writers.
	cp.wg.Add(o.MaxBuffers)
	for i := 0; i < o.MaxBuffers; i++ {
		go cp.writer()
	}

	// Send all our chunks until we get an error.
	var err error
	for {
		if err = cp.sendChunk(); err != nil {
			break
		}
	}
	// If the error is not EOF, then we have a problem.
	if err != nil && !errors.Is(err, io.EOF) {
		return nil, err
	}

	// Close out our upload.
	if err := cp.close(); err != nil {
		return nil, err
	}

	return cp.result, nil
}
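
// A sketch of exercising copyFromReader hermetically with the fakeBlockWriter
// above (exampleCopy is illustrative and not part of the upstream file).
// Memory held in flight is bounded by MaxBuffers * BufferSize, here ~2 MiB.
func exampleCopy(ctx context.Context, src io.Reader) error {
	_, err := copyFromReader(ctx, src, &fakeBlockWriter{}, UploadStreamToBlockBlobOptions{
		BufferSize: 1024 * 1024, // assumed 1 MiB chunks; defaults() fills anything left zero
		MaxBuffers: 2,           // at most two chunks buffered and uploading at once
	})
	return err
}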

// copier streams a file via chunks in parallel from a reader representing a file.
// Do not use directly; instead use copyFromReader().
type copier struct {
	// ctx holds the context of a copier. Storing a Context in a struct is normally a faux pas,
	// but in this case the copier has the lifetime of a function call, so it's fine.
	ctx    context.Context
	cancel context.CancelFunc

	// reader is the source to be written to storage.
	reader io.Reader
	// to is the location we are writing our chunks to.
	to blockWriter

	id *id
	o  UploadStreamToBlockBlobOptions

	// num is the current chunk we are on.
	num int32
	// ch is used to pass the next chunk of data from our reader to one of the writers.
	ch chan copierChunk
	// errCh is used to hold the first error from our concurrent writers.
	errCh chan error
	// wg provides a count of how many writers we are waiting to finish.
	wg sync.WaitGroup
	// buffers provides a pool of chunks that can be reused.
	buffers sync.Pool

	// result holds the final result from blob storage after we have submitted all chunks.
	result *BlockBlobCommitBlockListResponse
}

type copierChunk struct {
	buffer []byte
	id     string
}

// getErr returns an error by priority. First, if a function set an error, it returns that error.
// Next, if the Context has an error, it returns that error. Otherwise it is nil.
// getErr supports only returning an error once per copier.
func (c *copier) getErr() error {
	select {
	case err := <-c.errCh:
		return err
	default:
	}
	return c.ctx.Err()
}

// sendChunk reads data from our internal reader, creates a chunk, and sends it to be written via a channel.
// sendChunk returns io.EOF when the reader returns an io.EOF or io.ErrUnexpectedEOF.
func (c *copier) sendChunk() error {
	if err := c.getErr(); err != nil {
		return err
	}

	buffer := c.buffers.Get().([]byte)
	n, err := io.ReadFull(c.reader, buffer)
	switch {
	case err == nil && n == 0:
		return nil
	case err == nil:
		c.ch <- copierChunk{
			buffer: buffer[0:n],
			id:     c.id.next(),
		}
		return nil
	case err != nil && (err == io.EOF || err == io.ErrUnexpectedEOF) && n == 0:
		return io.EOF
	}

	if err == io.EOF || err == io.ErrUnexpectedEOF {
		c.ch <- copierChunk{
			buffer: buffer[0:n],
			id:     c.id.next(),
		}
		return io.EOF
	}
	if err := c.getErr(); err != nil {
		return err
	}
	return err
}
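
// Note (added for clarity; not in the upstream file): io.ReadFull returns
// io.EOF only when zero bytes were read and io.ErrUnexpectedEOF when the final
// read was short, so the cases above forward any trailing partial chunk before
// reporting io.EOF to the caller.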

// writer writes chunks sent on a channel.
func (c *copier) writer() {
	defer c.wg.Done()

	for chunk := range c.ch {
		if err := c.write(chunk); err != nil {
			if !errors.Is(err, context.Canceled) {
				select {
				case c.errCh <- err:
					c.cancel()
				default:
				}
				return
			}
		}
	}
}

// write uploads a chunk to blob storage.
func (c *copier) write(chunk copierChunk) error {
	defer c.buffers.Put(chunk.buffer)

	if err := c.ctx.Err(); err != nil {
		return err
	}
	_, err := c.to.StageBlock(c.ctx, chunk.id, bytes.NewReader(chunk.buffer), c.o.AccessConditions.LeaseAccessConditions, nil, c.o.ClientProvidedKeyOptions)
	if err != nil {
		return fmt.Errorf("write error: %w", err)
	}
	return nil
}

// close commits our blocks to blob storage and closes our writer.
func (c *copier) close() error {
	close(c.ch)
	c.wg.Wait()

	if err := c.getErr(); err != nil {
		return err
	}

	var err error
	c.result, err = c.to.CommitBlockList(c.ctx, c.id.issued(), c.o.BlobHTTPHeaders, c.o.Metadata, c.o.AccessConditions, c.o.BlobAccessTier, c.o.BlobTagsMap, c.o.ClientProvidedKeyOptions)
	return err
}

// id allows the creation of unique IDs based on UUID4 + an int32. This auto-increments.
type id struct {
	u   [64]byte
	num uint32
	all []string
}

// newID constructs a new id.
func newID() *id {
	uu := guuid.New()
	u := [64]byte{}
	copy(u[:], uu[:])
	return &id{u: u}
}

// next returns the next ID. This is not thread-safe.
func (id *id) next() string {
	defer func() { id.num++ }()

	binary.BigEndian.PutUint32(id.u[len(guuid.UUID{}):], id.num)
	str := base64.StdEncoding.EncodeToString(id.u[:])
	id.all = append(id.all, str)

	return str
}
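
// Illustration (not part of the upstream file): every ID encodes the same
// 64-byte array (16 UUID bytes, a 4-byte big-endian counter, 44 zero bytes),
// so all IDs have equal length, which Azure requires of block IDs in a blob.
func exampleIDs() {
	blockIDs := newID()
	first := blockIDs.next()  // base64 of uuid || counter=0 || zero padding
	second := blockIDs.next() // identical except for the four counter bytes
	_, _ = first, second
}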

// issued returns all ids that have been issued. The returned value shares the internal slice,
// so it is not safe to modify and is only valid until the next time next() is called.
func (id *id) issued() []string {
	return id.all
}