go.mod: bump azure-storage-blob-go/azblob

Due to https://github.com/Azure/azure-storage-blob-go/issues/236, we had to
pin a weird, untagged version of this library (see 1b051922).

A new release that is tagged correctly came out yesterday, so let's switch to it
and drop the hack from go.mod.

Signed-off-by: Ondřej Budai <ondrej@budai.cz>
Authored and committed by Ondřej Budai, 2021-01-29 08:59:20 +01:00
parent 440753a274
commit 2b41190cf3
10 changed files with 251 additions and 98 deletions
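The "weird version" being dropped is a Go pseudo-version: with no usable tag upstream (see the issue above), go.mod had to pin a specific commit. As an illustrative sketch of what such a pin encodes (the real before/after lines are in the go.mod and go.sum hunks below):

require (
	// Pseudo-version: the base version bumped to the next patch, plus the commit's
	// UTC timestamp and the 12-character prefix of the pinned commit (6df5d9af221d).
	github.com/Azure/azure-storage-blob-go v0.11.1-0.20201209121048-6df5d9af221d
)

With v0.13.0 tagged correctly, the requirement can point at the release again and the pseudo-version pin disappears from both go.mod and go.sum.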

go.mod

@@ -4,7 +4,7 @@ go 1.14
require (
github.com/Azure/azure-sdk-for-go v41.3.0+incompatible
github.com/Azure/azure-storage-blob-go v0.11.1-0.20201209121048-6df5d9af221d
github.com/Azure/azure-storage-blob-go v0.13.0
github.com/Azure/go-autorest/autorest v0.10.0
github.com/Azure/go-autorest/autorest/azure/auth v0.4.2
github.com/Azure/go-autorest/autorest/to v0.3.0 // indirect

go.sum

@@ -2,8 +2,8 @@ github.com/Azure/azure-pipeline-go v0.2.3 h1:7U9HBg1JFK3jHl5qmo4CTZKFTVgMwdFHMVt
github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl2bqk7S3ta6S6u4k=
github.com/Azure/azure-sdk-for-go v41.3.0+incompatible h1:W5px0x53aa47nmIAuF1XWR1ZzFuUnkJBGUuzHnNp+Nk=
github.com/Azure/azure-sdk-for-go v41.3.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-storage-blob-go v0.11.1-0.20201209121048-6df5d9af221d h1:YEjZNZ0HS7ITX+BJ7wUXtTk6GXM3g8xftaqQ94XU/cs=
github.com/Azure/azure-storage-blob-go v0.11.1-0.20201209121048-6df5d9af221d/go.mod h1:A0u4VjtpgZJ7Y7um/+ix2DHBuEKFC6sEIlj0xc13a4Q=
github.com/Azure/azure-storage-blob-go v0.13.0 h1:lgWHvFh+UYBNVQLFHXkvul2f6yOPA9PIH82RTG2cSwc=
github.com/Azure/azure-storage-blob-go v0.13.0/go.mod h1:pA9kNqtjUeQF2zOSu4s//nUdBD+e64lEuc4sVnuOfNs=
github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs=
github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
github.com/Azure/go-autorest/autorest v0.9.0 h1:MRvx8gncNaXJqOoLmhNjUAKh33JJF8LyxPhomEtOsjs=


@@ -1,24 +1,24 @@
package azblob
import (
"errors"
)
type bytesWriter []byte
func newBytesWriter(b []byte) bytesWriter {
return b
}
func (c bytesWriter) WriteAt(b []byte, off int64) (int, error) {
if off >= int64(len(c)) || off < 0 {
return 0, errors.New("Offset value is out of range")
}
n := copy(c[int(off):], b)
if n < len(b) {
return n, errors.New("Not enough space for all bytes")
}
return n, nil
}
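The hunk above defines an io.WriterAt over a plain byte slice, so code that needs an io.WriterAt (for example, downloading into a caller-supplied buffer) can write each chunk at its own offset. A standalone sketch of that behavior, with sliceWriterAt as a hypothetical stand-in for the unexported bytesWriter (this is not code from the library):

package main

import (
	"errors"
	"fmt"
)

type sliceWriterAt []byte // stand-in for azblob's unexported bytesWriter

func (c sliceWriterAt) WriteAt(b []byte, off int64) (int, error) {
	if off >= int64(len(c)) || off < 0 {
		return 0, errors.New("offset value is out of range")
	}
	n := copy(c[int(off):], b)
	if n < len(b) {
		return n, errors.New("not enough space for all bytes")
	}
	return n, nil
}

func main() {
	buf := make([]byte, 11)
	w := sliceWriterAt(buf)
	w.WriteAt([]byte("world"), 6) // chunks may arrive out of order
	w.WriteAt([]byte("hello "), 0)
	fmt.Println(string(buf)) // hello world
}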


@@ -9,6 +9,7 @@ import (
"fmt"
"io"
"sync"
"sync/atomic"
guuid "github.com/google/uuid"
)
@@ -29,7 +30,9 @@ type blockWriter interface {
// choose a max value for the memory setting based on internal transfers within Azure (which will give us the maximum throughput model).
// We can even provide a utility to dial this number in for customer networks to optimize their copies.
func copyFromReader(ctx context.Context, from io.Reader, to blockWriter, o UploadStreamToBlockBlobOptions) (*BlockBlobCommitBlockListResponse, error) {
o.defaults()
if err := o.defaults(); err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@@ -41,19 +44,7 @@ func copyFromReader(ctx context.Context, from io.Reader, to blockWriter, o Uploa
to: to,
id: newID(),
o: o,
ch: make(chan copierChunk, 1),
errCh: make(chan error, 1),
buffers: sync.Pool{
New: func() interface{} {
return make([]byte, o.BufferSize)
},
},
}
// Starts the pools of concurrent writers.
cp.wg.Add(o.MaxBuffers)
for i := 0; i < o.MaxBuffers; i++ {
go cp.writer()
}
// Send all our chunks until we get an error.
@@ -84,24 +75,21 @@ type copier struct {
ctx context.Context
cancel context.CancelFunc
// o contains our options for uploading.
o UploadStreamToBlockBlobOptions
// id provides the ids for each chunk.
id *id
// reader is the source to be written to storage.
reader io.Reader
// to is the location we are writing our chunks to.
to blockWriter
id *id
o UploadStreamToBlockBlobOptions
// num is the current chunk we are on.
num int32
// ch is used to pass the next chunk of data from our reader to one of the writers.
ch chan copierChunk
// errCh is used to hold the first error from our concurrent writers.
errCh chan error
// wg provides a count of how many writers we are waiting to finish.
wg sync.WaitGroup
// buffers provides a pool of chunks that can be reused.
buffers sync.Pool
// result holds the final result from blob storage after we have submitted all chunks.
result *BlockBlobCommitBlockListResponse
@@ -130,26 +118,38 @@ func (c *copier) sendChunk() error {
return err
}
buffer := c.buffers.Get().([]byte)
buffer := c.o.TransferManager.Get()
if len(buffer) == 0 {
return fmt.Errorf("TransferManager returned a 0 size buffer, this is a bug in the manager")
}
n, err := io.ReadFull(c.reader, buffer)
switch {
case err == nil && n == 0:
return nil
case err == nil:
c.ch <- copierChunk{
buffer: buffer[0:n],
id: c.id.next(),
}
id := c.id.next()
c.wg.Add(1)
c.o.TransferManager.Run(
func() {
defer c.wg.Done()
c.write(copierChunk{buffer: buffer[0:n], id: id})
},
)
return nil
case err != nil && (err == io.EOF || err == io.ErrUnexpectedEOF) && n == 0:
return io.EOF
}
if err == io.EOF || err == io.ErrUnexpectedEOF {
c.ch <- copierChunk{
buffer: buffer[0:n],
id: c.id.next(),
}
id := c.id.next()
c.wg.Add(1)
c.o.TransferManager.Run(
func() {
defer c.wg.Done()
c.write(copierChunk{buffer: buffer[0:n], id: id})
},
)
return io.EOF
}
if err := c.getErr(); err != nil {
@@ -158,41 +158,23 @@ func (c *copier) sendChunk() error {
return err
}
// writer writes chunks sent on a channel.
func (c *copier) writer() {
defer c.wg.Done()
for chunk := range c.ch {
if err := c.write(chunk); err != nil {
if !errors.Is(err, context.Canceled) {
select {
case c.errCh <- err:
c.cancel()
default:
}
return
}
}
}
}
// write uploads a chunk to blob storage.
func (c *copier) write(chunk copierChunk) error {
defer c.buffers.Put(chunk.buffer)
func (c *copier) write(chunk copierChunk) {
defer c.o.TransferManager.Put(chunk.buffer)
if err := c.ctx.Err(); err != nil {
return err
return
}
_, err := c.to.StageBlock(c.ctx, chunk.id, bytes.NewReader(chunk.buffer), c.o.AccessConditions.LeaseAccessConditions, nil, c.o.ClientProvidedKeyOptions)
if err != nil {
return fmt.Errorf("write error: %w", err)
c.errCh <- fmt.Errorf("write error: %w", err)
return
}
return nil
return
}
// close commits our blocks to blob storage and closes our writer.
func (c *copier) close() error {
close(c.ch)
c.wg.Wait()
if err := c.getErr(); err != nil {
@@ -219,11 +201,11 @@ func newID() *id {
return &id{u: u}
}
// next returns the next ID. This is not thread-safe.
// next returns the next ID.
func (id *id) next() string {
defer func() { id.num++ }()
defer atomic.AddUint32(&id.num, 1)
binary.BigEndian.PutUint32((id.u[len(guuid.UUID{}):]), id.num)
binary.BigEndian.PutUint32((id.u[len(guuid.UUID{}):]), atomic.LoadUint32(&id.num))
str := base64.StdEncoding.EncodeToString(id.u[:])
id.all = append(id.all, str)
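The rewrite above replaces the copier's internal chunk channel and fixed writer goroutines with the TransferManager: sendChunk grabs a buffer from the manager, bumps a sync.WaitGroup, and hands the upload closure to TransferManager.Run, and close waits on that WaitGroup before committing the block list. A standalone sketch of that dispatch pattern (toy work instead of StageBlock, pool size of 3; not code from the library):

package main

import (
	"fmt"
	"sync"
)

func main() {
	const workers = 3
	pool := make(chan func(), workers) // mirrors staticBuffer.threadpool
	for i := 0; i < workers; i++ {
		go func() {
			for f := range pool {
				f()
			}
		}()
	}

	var wg sync.WaitGroup
	for chunk := 0; chunk < 10; chunk++ {
		chunk := chunk // capture the loop variable for the closure
		wg.Add(1)      // add before handing off, as sendChunk does
		pool <- func() {
			defer wg.Done()
			fmt.Println("staging chunk", chunk)
		}
	}

	wg.Wait()   // close() waits for in-flight chunks before committing the block list
	close(pool) // Close() tears the worker goroutines down once the transfer is done
}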


@@ -3,6 +3,7 @@ package azblob
import (
"context"
"encoding/base64"
"fmt"
"io"
"net/http"
@@ -361,9 +362,148 @@ func DoBatchTransfer(ctx context.Context, o BatchTransferOptions) error {
////////////////////////////////////////////////////////////////////////////////////////////////
// TransferManager provides a buffer and thread pool manager for certain transfer options.
// It is undefined behavior if code outside of this package calls any of these methods.
type TransferManager interface {
// Get provides a buffer that will be used to read data into and write out to the stream.
// It is guaranteed by this package to not read or write beyond the size of the slice.
Get() []byte
// Put may or may not put the buffer into underlying storage, depending on settings.
// The buffer must not be touched after this has been called.
Put(b []byte)
// Run will use a goroutine pool entry to run a function. This blocks until a pool
// goroutine becomes available.
Run(func())
// Close shuts down all internal goroutines. This must be called when the TransferManager
// will no longer be used. Not closing it will cause a goroutine leak.
Close()
}
type staticBuffer struct {
buffers chan []byte
size int
threadpool chan func()
}
// NewStaticBuffer creates a TransferManager that will use a channel as a circular buffer
// that can hold "max" buffers of "size". The goroutine pool is also sized at max. This
// can be shared between calls if you wish to control maximum memory and concurrency with
// multiple concurrent calls.
func NewStaticBuffer(size, max int) (TransferManager, error) {
if size < 1 || max < 1 {
return nil, fmt.Errorf("cannot be called with size or max set to < 1")
}
if size < _1MiB {
return nil, fmt.Errorf("cannot have size < 1MiB")
}
threadpool := make(chan func(), max)
buffers := make(chan []byte, max)
for i := 0; i < max; i++ {
go func() {
for f := range threadpool {
f()
}
}()
buffers <- make([]byte, size)
}
return staticBuffer{
buffers: buffers,
size: size,
threadpool: threadpool,
}, nil
}
// Get implements TransferManager.Get().
func (s staticBuffer) Get() []byte {
return <-s.buffers
}
// Put implements TransferManager.Put().
func (s staticBuffer) Put(b []byte) {
select {
case s.buffers <- b:
default: // This shouldn't happen, but just in case they call Put() with their own buffer.
}
}
// Run implements TransferManager.Run().
func (s staticBuffer) Run(f func()) {
s.threadpool <- f
}
// Close implements TransferManager.Close().
func (s staticBuffer) Close() {
close(s.threadpool)
close(s.buffers)
}
type syncPool struct {
threadpool chan func()
pool sync.Pool
}
// NewSyncPool creates a TransferManager that will use a sync.Pool
// that can hold a non-capped number of buffers constrained by concurrency. This
// can be shared between calls if you wish to share memory and concurrency.
func NewSyncPool(size, concurrency int) (TransferManager, error) {
if size < 1 || concurrency < 1 {
return nil, fmt.Errorf("cannot be called with size or max set to < 1")
}
if size < _1MiB {
return nil, fmt.Errorf("cannot have size < 1MiB")
}
threadpool := make(chan func(), concurrency)
for i := 0; i < concurrency; i++ {
go func() {
for f := range threadpool {
f()
}
}()
}
return &syncPool{
threadpool: threadpool,
pool: sync.Pool{
New: func() interface{} {
return make([]byte, size)
},
},
}, nil
}
// Get implements TransferManager.Get().
func (s *syncPool) Get() []byte {
return s.pool.Get().([]byte)
}
// Put implements TransferManager.Put().
func (s *syncPool) Put(b []byte) {
s.pool.Put(b)
}
// Run implements TransferManager.Run().
func (s *syncPool) Run(f func()) {
s.threadpool <- f
}
// Close implements TransferManager.Close().
func (s *syncPool) Close() {
close(s.threadpool)
}
const _1MiB = 1024 * 1024
// UploadStreamToBlockBlobOptions is options for UploadStreamToBlockBlob.
type UploadStreamToBlockBlobOptions struct {
// TransferManager provides a TransferManager that controls buffer allocation/reuse and
// concurrency. This overrides BufferSize and MaxBuffers if set.
TransferManager TransferManager
transferMangerNotSet bool
// BufferSize sizes the buffer used to read data from source. If < 1 MiB, defaults to 1 MiB.
BufferSize int
// MaxBuffers defines the number of simultaneous uploads that will be performed to upload the file.
@@ -376,7 +516,11 @@ type UploadStreamToBlockBlobOptions struct {
ClientProvidedKeyOptions ClientProvidedKeyOptions
}
func (u *UploadStreamToBlockBlobOptions) defaults() {
func (u *UploadStreamToBlockBlobOptions) defaults() error {
if u.TransferManager != nil {
return nil
}
if u.MaxBuffers == 0 {
u.MaxBuffers = 1
}
@@ -384,13 +528,27 @@ func (u *UploadStreamToBlockBlobOptions) defaults() {
if u.BufferSize < _1MiB {
u.BufferSize = _1MiB
}
var err error
u.TransferManager, err = NewStaticBuffer(u.BufferSize, u.MaxBuffers)
if err != nil {
return fmt.Errorf("bug: default transfer manager could not be created: %s", err)
}
u.transferMangerNotSet = true
return nil
}
// UploadStreamToBlockBlob copies the file held in io.Reader to the Blob at blockBlobURL.
// A Context deadline or cancellation will cause this to error.
func UploadStreamToBlockBlob(ctx context.Context, reader io.Reader, blockBlobURL BlockBlobURL,
o UploadStreamToBlockBlobOptions) (CommonResponse, error) {
o.defaults()
func UploadStreamToBlockBlob(ctx context.Context, reader io.Reader, blockBlobURL BlockBlobURL, o UploadStreamToBlockBlobOptions) (CommonResponse, error) {
if err := o.defaults(); err != nil {
return nil, err
}
// If we used the default manager, we need to close it.
if o.transferMangerNotSet {
defer o.TransferManager.Close()
}
result, err := copyFromReader(ctx, reader, blockBlobURL, o)
if err != nil {
@@ -401,6 +559,7 @@ func UploadStreamToBlockBlob(ctx context.Context, reader io.Reader, blockBlobURL
}
// UploadStreamOptions (defunct) was used internally. This will be removed or made private in a future version.
// TODO: Remove on next minor release in v0 or before v1.
type UploadStreamOptions struct {
BufferSize int
MaxBuffers int
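For callers, the practical change is that buffer reuse and concurrency can now be shared across uploads by passing a TransferManager explicitly; when none is set, defaults() builds a throwaway static buffer from BufferSize/MaxBuffers and UploadStreamToBlockBlob closes it itself. A sketch of the shared-manager usage, assuming an already-constructed BlockBlobURL for each source and using only identifiers from this diff; treat it as an illustration, not the library's documented example:

package azupload // hypothetical helper package

import (
	"context"
	"io"

	"github.com/Azure/azure-storage-blob-go/azblob"
)

type blobUpload struct {
	Source io.Reader           // data to stream
	Dest   azblob.BlockBlobURL // destination blob
}

// UploadAll streams every source to its blob while sharing one TransferManager,
// so total buffer memory and upload concurrency stay capped (4 x 4 MiB buffers
// and 4 worker goroutines in this sketch) no matter how many uploads run.
func UploadAll(ctx context.Context, uploads []blobUpload) error {
	tm, err := azblob.NewStaticBuffer(4*1024*1024, 4)
	if err != nil {
		return err
	}
	// UploadStreamToBlockBlob only closes the manager it created itself, so a
	// caller-provided TransferManager must be closed by the caller.
	defer tm.Close()

	for _, up := range uploads {
		opts := azblob.UploadStreamToBlockBlobOptions{TransferManager: tm} // overrides BufferSize/MaxBuffers
		if _, err := azblob.UploadStreamToBlockBlob(ctx, up.Source, up.Dest, opts); err != nil {
			return err
		}
	}
	return nil
}

NewSyncPool is the alternative when the buffer count should float with a sync.Pool while only concurrency is capped.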


@@ -1,3 +1,3 @@
package azblob
const serviceLibVersion = "0.12"
const serviceLibVersion = "0.13"


@@ -41,6 +41,5 @@ func NewPipeline(c Credential, o PipelineOptions) pipeline.Pipeline {
NewRequestLogPolicyFactory(o.RequestLog),
pipeline.MethodFactoryMarker()) // indicates at what stage in the pipeline the method factory is invoked
return pipeline.NewPipeline(f, pipeline.Options{HTTPSender: o.HTTPSender, Log: o.Log})
}


@@ -2,7 +2,7 @@ package azblob
import (
"context"
"errors"
"github.com/Azure/azure-pipeline-go/pipeline"
)
@@ -14,9 +14,22 @@ func NewUniqueRequestIDPolicyFactory() pipeline.Factory {
return func(ctx context.Context, request pipeline.Request) (pipeline.Response, error) {
id := request.Header.Get(xMsClientRequestID)
if id == "" { // Add a unique request ID if the caller didn't specify one already
request.Header.Set(xMsClientRequestID, newUUID().String())
id = newUUID().String()
request.Header.Set(xMsClientRequestID, id)
}
return next.Do(ctx, request)
resp, err := next.Do(ctx, request)
if err == nil && resp != nil {
val := resp.Response().Header.Values(xMsClientRequestID)
if len(val) > 0 {
if val[0] != id {
err = errors.New("client Request ID from request and response does not match")
}
}
}
return resp, err
}
})
}
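The policy above now checks that the service echoes the x-ms-client-request-id header it was sent and turns a mismatch into an error. A standalone sketch of that echo check against a test server (an illustration of the idea only, not the pipeline code; the real policy generates the ID with newUUID and reads the response via pipeline.Response):

package main

import (
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
)

func main() {
	// A fake service that echoes the client request ID back, as Azure Storage does.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("x-ms-client-request-id", r.Header.Get("x-ms-client-request-id"))
	}))
	defer srv.Close()

	const id = "00000000-1111-2222-3333-444444444444" // the policy would use a fresh UUID
	req, err := http.NewRequest(http.MethodGet, srv.URL, nil)
	if err != nil {
		panic(err)
	}
	req.Header.Set("x-ms-client-request-id", id)

	resp, err := http.DefaultClient.Do(req)
	if err == nil && resp != nil {
		defer resp.Body.Close()
		if got := resp.Header.Get("x-ms-client-request-id"); got != id {
			err = errors.New("client Request ID from request and response does not match")
		}
	}
	fmt.Println("mismatch error:", err) // <nil> when the echo matches
}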


@@ -40,7 +40,7 @@ func FormatTimesForSASSigning(startTime, expiryTime, snapshotTime time.Time) (st
}
// SASTimeFormat represents the format of a SAS start or expiry time. Use it when formatting/parsing a time.Time.
const SASTimeFormat = "2006-01-02T15:04:05Z" //"2017-07-27T00:00:00Z" // ISO 8601
const SASTimeFormat = "2006-01-02T15:04:05Z" //"2017-07-27T00:00:00Z" // ISO 8601
var SASTimeFormats = []string{"2006-01-02T15:04:05.0000000Z", SASTimeFormat, "2006-01-02T15:04Z", "2006-01-02"} // ISO 8601 formats, please refer to https://docs.microsoft.com/en-us/rest/api/storageservices/constructing-a-service-sas for more details.
// formatSASTimeWithDefaultFormat format time with ISO 8601 in "yyyy-MM-ddTHH:mm:ssZ".
@@ -108,8 +108,8 @@ type SASQueryParameters struct {
signedVersion string `param:"skv"`
// private member used for startTime and expiryTime formatting.
stTimeFormat string
seTimeFormat string
stTimeFormat string
seTimeFormat string
}
func (p *SASQueryParameters) SignedOid() string {

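SASTimeFormat above is a Go reference-time layout for the ISO 8601 shape used in SAS start/expiry fields, and SASTimeFormats lists the layouts accepted when parsing. A small illustration of formatting and parsing with that layout (not code from the library; the constant is mirrored locally to keep the sketch self-contained):

package main

import (
	"fmt"
	"time"
)

// Mirrors azblob.SASTimeFormat: Go's reference time written in the ISO 8601
// form that SAS start and expiry fields use.
const sasTimeFormat = "2006-01-02T15:04:05Z"

func main() {
	expiry := time.Date(2021, time.January, 29, 8, 59, 20, 0, time.UTC)
	s := expiry.Format(sasTimeFormat)
	fmt.Println(s) // 2021-01-29T08:59:20Z

	parsed, err := time.Parse(sasTimeFormat, s)
	fmt.Println(parsed.Equal(expiry), err) // true <nil>
}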
vendor/modules.txt

@@ -5,7 +5,7 @@ github.com/Azure/azure-pipeline-go/pipeline
github.com/Azure/azure-sdk-for-go/services/network/mgmt/2019-09-01/network
github.com/Azure/azure-sdk-for-go/services/resources/mgmt/2019-05-01/resources
github.com/Azure/azure-sdk-for-go/version
# github.com/Azure/azure-storage-blob-go v0.11.1-0.20201209121048-6df5d9af221d
# github.com/Azure/azure-storage-blob-go v0.13.0
## explicit
github.com/Azure/azure-storage-blob-go/azblob
# github.com/Azure/go-autorest v14.2.0+incompatible