diff --git a/go.mod b/go.mod index 9d1f797df..0637b4972 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.14 require ( github.com/Azure/azure-sdk-for-go v41.3.0+incompatible - github.com/Azure/azure-storage-blob-go v0.11.1-0.20201209121048-6df5d9af221d + github.com/Azure/azure-storage-blob-go v0.13.0 github.com/Azure/go-autorest/autorest v0.10.0 github.com/Azure/go-autorest/autorest/azure/auth v0.4.2 github.com/Azure/go-autorest/autorest/to v0.3.0 // indirect diff --git a/go.sum b/go.sum index 9f1661d72..af30ea5a9 100644 --- a/go.sum +++ b/go.sum @@ -2,8 +2,8 @@ github.com/Azure/azure-pipeline-go v0.2.3 h1:7U9HBg1JFK3jHl5qmo4CTZKFTVgMwdFHMVt github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl2bqk7S3ta6S6u4k= github.com/Azure/azure-sdk-for-go v41.3.0+incompatible h1:W5px0x53aa47nmIAuF1XWR1ZzFuUnkJBGUuzHnNp+Nk= github.com/Azure/azure-sdk-for-go v41.3.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= -github.com/Azure/azure-storage-blob-go v0.11.1-0.20201209121048-6df5d9af221d h1:YEjZNZ0HS7ITX+BJ7wUXtTk6GXM3g8xftaqQ94XU/cs= -github.com/Azure/azure-storage-blob-go v0.11.1-0.20201209121048-6df5d9af221d/go.mod h1:A0u4VjtpgZJ7Y7um/+ix2DHBuEKFC6sEIlj0xc13a4Q= +github.com/Azure/azure-storage-blob-go v0.13.0 h1:lgWHvFh+UYBNVQLFHXkvul2f6yOPA9PIH82RTG2cSwc= +github.com/Azure/azure-storage-blob-go v0.13.0/go.mod h1:pA9kNqtjUeQF2zOSu4s//nUdBD+e64lEuc4sVnuOfNs= github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs= github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24= github.com/Azure/go-autorest/autorest v0.9.0 h1:MRvx8gncNaXJqOoLmhNjUAKh33JJF8LyxPhomEtOsjs= diff --git a/vendor/github.com/Azure/azure-storage-blob-go/azblob/bytes_writer.go b/vendor/github.com/Azure/azure-storage-blob-go/azblob/bytes_writer.go index 5926eaa10..8d82ebe8c 100644 --- a/vendor/github.com/Azure/azure-storage-blob-go/azblob/bytes_writer.go +++ b/vendor/github.com/Azure/azure-storage-blob-go/azblob/bytes_writer.go @@ -1,24 +1,24 @@ -package azblob - -import ( - "errors" -) - -type bytesWriter []byte - -func newBytesWriter(b []byte) bytesWriter { - return b -} - -func (c bytesWriter) WriteAt(b []byte, off int64) (int, error) { - if off >= int64(len(c)) || off < 0 { - return 0, errors.New("Offset value is out of range") - } - - n := copy(c[int(off):], b) - if n < len(b) { - return n, errors.New("Not enough space for all bytes") - } - - return n, nil -} +package azblob + +import ( + "errors" +) + +type bytesWriter []byte + +func newBytesWriter(b []byte) bytesWriter { + return b +} + +func (c bytesWriter) WriteAt(b []byte, off int64) (int, error) { + if off >= int64(len(c)) || off < 0 { + return 0, errors.New("Offset value is out of range") + } + + n := copy(c[int(off):], b) + if n < len(b) { + return n, errors.New("Not enough space for all bytes") + } + + return n, nil +} diff --git a/vendor/github.com/Azure/azure-storage-blob-go/azblob/chunkwriting.go b/vendor/github.com/Azure/azure-storage-blob-go/azblob/chunkwriting.go index 179e49c74..b7dc0d739 100644 --- a/vendor/github.com/Azure/azure-storage-blob-go/azblob/chunkwriting.go +++ b/vendor/github.com/Azure/azure-storage-blob-go/azblob/chunkwriting.go @@ -9,6 +9,7 @@ import ( "fmt" "io" "sync" + "sync/atomic" guuid "github.com/google/uuid" ) @@ -29,7 +30,9 @@ type blockWriter interface { // choose a max value for the memory setting based on internal transfers within Azure (which will give us the maximum throughput model). // We can even provide a utility to dial this number in for customer networks to optimize their copies. func copyFromReader(ctx context.Context, from io.Reader, to blockWriter, o UploadStreamToBlockBlobOptions) (*BlockBlobCommitBlockListResponse, error) { - o.defaults() + if err := o.defaults(); err != nil { + return nil, err + } ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -41,19 +44,7 @@ func copyFromReader(ctx context.Context, from io.Reader, to blockWriter, o Uploa to: to, id: newID(), o: o, - ch: make(chan copierChunk, 1), errCh: make(chan error, 1), - buffers: sync.Pool{ - New: func() interface{} { - return make([]byte, o.BufferSize) - }, - }, - } - - // Starts the pools of concurrent writers. - cp.wg.Add(o.MaxBuffers) - for i := 0; i < o.MaxBuffers; i++ { - go cp.writer() } // Send all our chunks until we get an error. @@ -84,24 +75,21 @@ type copier struct { ctx context.Context cancel context.CancelFunc + // o contains our options for uploading. + o UploadStreamToBlockBlobOptions + + // id provides the ids for each chunk. + id *id + // reader is the source to be written to storage. reader io.Reader // to is the location we are writing our chunks to. to blockWriter - id *id - o UploadStreamToBlockBlobOptions - - // num is the current chunk we are on. - num int32 - // ch is used to pass the next chunk of data from our reader to one of the writers. - ch chan copierChunk // errCh is used to hold the first error from our concurrent writers. errCh chan error // wg provides a count of how many writers we are waiting to finish. wg sync.WaitGroup - // buffers provides a pool of chunks that can be reused. - buffers sync.Pool // result holds the final result from blob storage after we have submitted all chunks. result *BlockBlobCommitBlockListResponse @@ -130,26 +118,38 @@ func (c *copier) sendChunk() error { return err } - buffer := c.buffers.Get().([]byte) + buffer := c.o.TransferManager.Get() + if len(buffer) == 0 { + return fmt.Errorf("TransferManager returned a 0 size buffer, this is a bug in the manager") + } + n, err := io.ReadFull(c.reader, buffer) switch { case err == nil && n == 0: return nil case err == nil: - c.ch <- copierChunk{ - buffer: buffer[0:n], - id: c.id.next(), - } + id := c.id.next() + c.wg.Add(1) + c.o.TransferManager.Run( + func() { + defer c.wg.Done() + c.write(copierChunk{buffer: buffer[0:n], id: id}) + }, + ) return nil case err != nil && (err == io.EOF || err == io.ErrUnexpectedEOF) && n == 0: return io.EOF } if err == io.EOF || err == io.ErrUnexpectedEOF { - c.ch <- copierChunk{ - buffer: buffer[0:n], - id: c.id.next(), - } + id := c.id.next() + c.wg.Add(1) + c.o.TransferManager.Run( + func() { + defer c.wg.Done() + c.write(copierChunk{buffer: buffer[0:n], id: id}) + }, + ) return io.EOF } if err := c.getErr(); err != nil { @@ -158,41 +158,23 @@ func (c *copier) sendChunk() error { return err } -// writer writes chunks sent on a channel. -func (c *copier) writer() { - defer c.wg.Done() - - for chunk := range c.ch { - if err := c.write(chunk); err != nil { - if !errors.Is(err, context.Canceled) { - select { - case c.errCh <- err: - c.cancel() - default: - } - return - } - } - } -} - // write uploads a chunk to blob storage. -func (c *copier) write(chunk copierChunk) error { - defer c.buffers.Put(chunk.buffer) +func (c *copier) write(chunk copierChunk) { + defer c.o.TransferManager.Put(chunk.buffer) if err := c.ctx.Err(); err != nil { - return err + return } _, err := c.to.StageBlock(c.ctx, chunk.id, bytes.NewReader(chunk.buffer), c.o.AccessConditions.LeaseAccessConditions, nil, c.o.ClientProvidedKeyOptions) if err != nil { - return fmt.Errorf("write error: %w", err) + c.errCh <- fmt.Errorf("write error: %w", err) + return } - return nil + return } // close commits our blocks to blob storage and closes our writer. func (c *copier) close() error { - close(c.ch) c.wg.Wait() if err := c.getErr(); err != nil { @@ -219,11 +201,11 @@ func newID() *id { return &id{u: u} } -// next returns the next ID. This is not thread-safe. +// next returns the next ID. func (id *id) next() string { - defer func() { id.num++ }() + defer atomic.AddUint32(&id.num, 1) - binary.BigEndian.PutUint32((id.u[len(guuid.UUID{}):]), id.num) + binary.BigEndian.PutUint32((id.u[len(guuid.UUID{}):]), atomic.LoadUint32(&id.num)) str := base64.StdEncoding.EncodeToString(id.u[:]) id.all = append(id.all, str) diff --git a/vendor/github.com/Azure/azure-storage-blob-go/azblob/highlevel.go b/vendor/github.com/Azure/azure-storage-blob-go/azblob/highlevel.go index 3fc7f8b85..7d5a13b3b 100644 --- a/vendor/github.com/Azure/azure-storage-blob-go/azblob/highlevel.go +++ b/vendor/github.com/Azure/azure-storage-blob-go/azblob/highlevel.go @@ -3,6 +3,7 @@ package azblob import ( "context" "encoding/base64" + "fmt" "io" "net/http" @@ -361,9 +362,148 @@ func DoBatchTransfer(ctx context.Context, o BatchTransferOptions) error { //////////////////////////////////////////////////////////////////////////////////////////////// +// TransferManager provides a buffer and thread pool manager for certain transfer options. +// It is undefined behavior if code outside of this package call any of these methods. +type TransferManager interface { + // Get provides a buffer that will be used to read data into and write out to the stream. + // It is guaranteed by this package to not read or write beyond the size of the slice. + Get() []byte + // Put may or may not put the buffer into underlying storage, depending on settings. + // The buffer must not be touched after this has been called. + Put(b []byte) + // Run will use a goroutine pool entry to run a function. This blocks until a pool + // goroutine becomes available. + Run(func()) + // Closes shuts down all internal goroutines. This must be called when the TransferManager + // will no longer be used. Not closing it will cause a goroutine leak. + Close() +} + +type staticBuffer struct { + buffers chan []byte + size int + threadpool chan func() +} + +// NewStaticBuffer creates a TransferManager that will use a channel as a circular buffer +// that can hold "max" buffers of "size". The goroutine pool is also sized at max. This +// can be shared between calls if you wish to control maximum memory and concurrency with +// multiple concurrent calls. +func NewStaticBuffer(size, max int) (TransferManager, error) { + if size < 1 || max < 1 { + return nil, fmt.Errorf("cannot be called with size or max set to < 1") + } + + if size < _1MiB { + return nil, fmt.Errorf("cannot have size < 1MiB") + } + + threadpool := make(chan func(), max) + buffers := make(chan []byte, max) + for i := 0; i < max; i++ { + go func() { + for f := range threadpool { + f() + } + }() + + buffers <- make([]byte, size) + } + return staticBuffer{ + buffers: buffers, + size: size, + threadpool: threadpool, + }, nil +} + +// Get implements TransferManager.Get(). +func (s staticBuffer) Get() []byte { + return <-s.buffers +} + +// Put implements TransferManager.Put(). +func (s staticBuffer) Put(b []byte) { + select { + case s.buffers <- b: + default: // This shouldn't happen, but just in case they call Put() with there own buffer. + } +} + +// Run implements TransferManager.Run(). +func (s staticBuffer) Run(f func()) { + s.threadpool <- f +} + +// Close implements TransferManager.Close(). +func (s staticBuffer) Close() { + close(s.threadpool) + close(s.buffers) +} + +type syncPool struct { + threadpool chan func() + pool sync.Pool +} + +// NewSyncPool creates a TransferManager that will use a sync.Pool +// that can hold a non-capped number of buffers constrained by concurrency. This +// can be shared between calls if you wish to share memory and concurrency. +func NewSyncPool(size, concurrency int) (TransferManager, error) { + if size < 1 || concurrency < 1 { + return nil, fmt.Errorf("cannot be called with size or max set to < 1") + } + + if size < _1MiB { + return nil, fmt.Errorf("cannot have size < 1MiB") + } + + threadpool := make(chan func(), concurrency) + for i := 0; i < concurrency; i++ { + go func() { + for f := range threadpool { + f() + } + }() + } + + return &syncPool{ + threadpool: threadpool, + pool: sync.Pool{ + New: func() interface{} { + return make([]byte, size) + }, + }, + }, nil +} + +// Get implements TransferManager.Get(). +func (s *syncPool) Get() []byte { + return s.pool.Get().([]byte) +} + +// Put implements TransferManager.Put(). +func (s *syncPool) Put(b []byte) { + s.pool.Put(b) +} + +// Run implements TransferManager.Run(). +func (s *syncPool) Run(f func()) { + s.threadpool <- f +} + +// Close implements TransferManager.Close(). +func (s *syncPool) Close() { + close(s.threadpool) +} + const _1MiB = 1024 * 1024 +// UploadStreamToBlockBlobOptions is options for UploadStreamToBlockBlob. type UploadStreamToBlockBlobOptions struct { + // TransferManager provides a TransferManager that controls buffer allocation/reuse and + // concurrency. This overrides BufferSize and MaxBuffers if set. + TransferManager TransferManager + transferMangerNotSet bool // BufferSize sizes the buffer used to read data from source. If < 1 MiB, defaults to 1 MiB. BufferSize int // MaxBuffers defines the number of simultaneous uploads will be performed to upload the file. @@ -376,7 +516,11 @@ type UploadStreamToBlockBlobOptions struct { ClientProvidedKeyOptions ClientProvidedKeyOptions } -func (u *UploadStreamToBlockBlobOptions) defaults() { +func (u *UploadStreamToBlockBlobOptions) defaults() error { + if u.TransferManager != nil { + return nil + } + if u.MaxBuffers == 0 { u.MaxBuffers = 1 } @@ -384,13 +528,27 @@ func (u *UploadStreamToBlockBlobOptions) defaults() { if u.BufferSize < _1MiB { u.BufferSize = _1MiB } + + var err error + u.TransferManager, err = NewStaticBuffer(u.BufferSize, u.MaxBuffers) + if err != nil { + return fmt.Errorf("bug: default transfer manager could not be created: %s", err) + } + u.transferMangerNotSet = true + return nil } // UploadStreamToBlockBlob copies the file held in io.Reader to the Blob at blockBlobURL. // A Context deadline or cancellation will cause this to error. -func UploadStreamToBlockBlob(ctx context.Context, reader io.Reader, blockBlobURL BlockBlobURL, - o UploadStreamToBlockBlobOptions) (CommonResponse, error) { - o.defaults() +func UploadStreamToBlockBlob(ctx context.Context, reader io.Reader, blockBlobURL BlockBlobURL, o UploadStreamToBlockBlobOptions) (CommonResponse, error) { + if err := o.defaults(); err != nil { + return nil, err + } + + // If we used the default manager, we need to close it. + if o.transferMangerNotSet { + defer o.TransferManager.Close() + } result, err := copyFromReader(ctx, reader, blockBlobURL, o) if err != nil { @@ -401,6 +559,7 @@ func UploadStreamToBlockBlob(ctx context.Context, reader io.Reader, blockBlobURL } // UploadStreamOptions (defunct) was used internally. This will be removed or made private in a future version. +// TODO: Remove on next minor release in v0 or before v1. type UploadStreamOptions struct { BufferSize int MaxBuffers int diff --git a/vendor/github.com/Azure/azure-storage-blob-go/azblob/version.go b/vendor/github.com/Azure/azure-storage-blob-go/azblob/version.go index 6d316546c..287e1e4b8 100644 --- a/vendor/github.com/Azure/azure-storage-blob-go/azblob/version.go +++ b/vendor/github.com/Azure/azure-storage-blob-go/azblob/version.go @@ -1,3 +1,3 @@ package azblob -const serviceLibVersion = "0.12" +const serviceLibVersion = "0.13" diff --git a/vendor/github.com/Azure/azure-storage-blob-go/azblob/zc_pipeline.go b/vendor/github.com/Azure/azure-storage-blob-go/azblob/zc_pipeline.go index 7c249a298..ba99255c1 100644 --- a/vendor/github.com/Azure/azure-storage-blob-go/azblob/zc_pipeline.go +++ b/vendor/github.com/Azure/azure-storage-blob-go/azblob/zc_pipeline.go @@ -41,6 +41,5 @@ func NewPipeline(c Credential, o PipelineOptions) pipeline.Pipeline { NewRequestLogPolicyFactory(o.RequestLog), pipeline.MethodFactoryMarker()) // indicates at what stage in the pipeline the method factory is invoked - return pipeline.NewPipeline(f, pipeline.Options{HTTPSender: o.HTTPSender, Log: o.Log}) } diff --git a/vendor/github.com/Azure/azure-storage-blob-go/azblob/zc_policy_unique_request_id.go b/vendor/github.com/Azure/azure-storage-blob-go/azblob/zc_policy_unique_request_id.go index a75c7d1d2..db8cee7b4 100644 --- a/vendor/github.com/Azure/azure-storage-blob-go/azblob/zc_policy_unique_request_id.go +++ b/vendor/github.com/Azure/azure-storage-blob-go/azblob/zc_policy_unique_request_id.go @@ -2,7 +2,7 @@ package azblob import ( "context" - + "errors" "github.com/Azure/azure-pipeline-go/pipeline" ) @@ -14,9 +14,22 @@ func NewUniqueRequestIDPolicyFactory() pipeline.Factory { return func(ctx context.Context, request pipeline.Request) (pipeline.Response, error) { id := request.Header.Get(xMsClientRequestID) if id == "" { // Add a unique request ID if the caller didn't specify one already - request.Header.Set(xMsClientRequestID, newUUID().String()) + id = newUUID().String() + request.Header.Set(xMsClientRequestID, id) } - return next.Do(ctx, request) + + resp, err := next.Do(ctx, request) + + if err == nil && resp != nil { + val := resp.Response().Header.Values(xMsClientRequestID) + if len(val) > 0 { + if val[0] != id { + err = errors.New("client Request ID from request and response does not match") + } + } + } + + return resp, err } }) } diff --git a/vendor/github.com/Azure/azure-storage-blob-go/azblob/zc_sas_query_params.go b/vendor/github.com/Azure/azure-storage-blob-go/azblob/zc_sas_query_params.go index 427b1709a..f87ef2b7f 100644 --- a/vendor/github.com/Azure/azure-storage-blob-go/azblob/zc_sas_query_params.go +++ b/vendor/github.com/Azure/azure-storage-blob-go/azblob/zc_sas_query_params.go @@ -40,7 +40,7 @@ func FormatTimesForSASSigning(startTime, expiryTime, snapshotTime time.Time) (st } // SASTimeFormat represents the format of a SAS start or expiry time. Use it when formatting/parsing a time.Time. -const SASTimeFormat = "2006-01-02T15:04:05Z" //"2017-07-27T00:00:00Z" // ISO 8601 +const SASTimeFormat = "2006-01-02T15:04:05Z" //"2017-07-27T00:00:00Z" // ISO 8601 var SASTimeFormats = []string{"2006-01-02T15:04:05.0000000Z", SASTimeFormat, "2006-01-02T15:04Z", "2006-01-02"} // ISO 8601 formats, please refer to https://docs.microsoft.com/en-us/rest/api/storageservices/constructing-a-service-sas for more details. // formatSASTimeWithDefaultFormat format time with ISO 8601 in "yyyy-MM-ddTHH:mm:ssZ". @@ -108,8 +108,8 @@ type SASQueryParameters struct { signedVersion string `param:"skv"` // private member used for startTime and expiryTime formatting. - stTimeFormat string - seTimeFormat string + stTimeFormat string + seTimeFormat string } func (p *SASQueryParameters) SignedOid() string { diff --git a/vendor/modules.txt b/vendor/modules.txt index 4191e0e89..8cac2b8b2 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -5,7 +5,7 @@ github.com/Azure/azure-pipeline-go/pipeline github.com/Azure/azure-sdk-for-go/services/network/mgmt/2019-09-01/network github.com/Azure/azure-sdk-for-go/services/resources/mgmt/2019-05-01/resources github.com/Azure/azure-sdk-for-go/version -# github.com/Azure/azure-storage-blob-go v0.11.1-0.20201209121048-6df5d9af221d +# github.com/Azure/azure-storage-blob-go v0.13.0 ## explicit github.com/Azure/azure-storage-blob-go/azblob # github.com/Azure/go-autorest v14.2.0+incompatible