build(deps): bump cloud.google.com/go/storage from 1.16.1 to 1.18.1

Bumps [cloud.google.com/go/storage](https://github.com/googleapis/google-cloud-go) from 1.16.1 to 1.18.1.
- [Release notes](https://github.com/googleapis/google-cloud-go/releases)
- [Changelog](https://github.com/googleapis/google-cloud-go/blob/master/CHANGES.md)
- [Commits](https://github.com/googleapis/google-cloud-go/compare/storage/v1.16.1...storage/v1.18.1)

---
updated-dependencies:
- dependency-name: cloud.google.com/go/storage
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
This commit is contained in:
dependabot[bot] 2021-10-18 04:42:23 +00:00 committed by Tom Gundersen
parent 51024c482d
commit 9075dbc61d
79 changed files with 4237 additions and 1468 deletions

View file

@ -1,5 +1,37 @@
# Changes
### [1.18.1](https://www.github.com/googleapis/google-cloud-go/compare/storage/v1.18.0...storage/v1.18.1) (2021-10-14)
### Bug Fixes
* **storage:** don't assume auth from a client option ([#4982](https://www.github.com/googleapis/google-cloud-go/issues/4982)) ([e17334d](https://www.github.com/googleapis/google-cloud-go/commit/e17334d1fe7645d89d14ae7148313498b984dfbb))
## [1.18.0](https://www.github.com/googleapis/google-cloud-go/compare/storage/v1.17.0...storage/v1.18.0) (2021-10-11)
### Features
* **storage:** returned wrapped error for timeouts ([#4802](https://www.github.com/googleapis/google-cloud-go/issues/4802)) ([0e102a3](https://www.github.com/googleapis/google-cloud-go/commit/0e102a385dc67a06f6b444b3a93e6998428529be)), refs [#4197](https://www.github.com/googleapis/google-cloud-go/issues/4197)
* **storage:** SignedUrl can use existing creds to authenticate ([#4604](https://www.github.com/googleapis/google-cloud-go/issues/4604)) ([b824c89](https://www.github.com/googleapis/google-cloud-go/commit/b824c897e6941270747b612f6d36a8d6ae081315))
### Bug Fixes
* **storage:** update PAP to use inherited instead of unspecified ([#4909](https://www.github.com/googleapis/google-cloud-go/issues/4909)) ([dac26b1](https://www.github.com/googleapis/google-cloud-go/commit/dac26b1af2f2972f12775341173bcc5f982438b8))
## [1.17.0](https://www.github.com/googleapis/google-cloud-go/compare/storage/v1.16.1...storage/v1.17.0) (2021-09-28)
### Features
* **storage:** add projectNumber field to bucketAttrs. ([#4805](https://www.github.com/googleapis/google-cloud-go/issues/4805)) ([07343af](https://www.github.com/googleapis/google-cloud-go/commit/07343afc15085b164cc41d202d13f9d46f5c0d02))
### Bug Fixes
* **storage:** align retry idempotency (part 1) ([#4715](https://www.github.com/googleapis/google-cloud-go/issues/4715)) ([ffa903e](https://www.github.com/googleapis/google-cloud-go/commit/ffa903eeec61aa3869e5220e2f09371127b5c393))
### [1.16.1](https://www.github.com/googleapis/google-cloud-go/compare/storage/v1.16.0...storage/v1.16.1) (2021-08-30)

View file

@ -133,11 +133,9 @@ func (a *ACLHandle) bucketDefaultList(ctx context.Context) ([]ACLRule, error) {
}
func (a *ACLHandle) bucketDefaultDelete(ctx context.Context, entity ACLEntity) error {
return runWithRetry(ctx, func() error {
req := a.c.raw.DefaultObjectAccessControls.Delete(a.bucket, string(entity))
a.configureCall(ctx, req)
return req.Do()
})
req := a.c.raw.DefaultObjectAccessControls.Delete(a.bucket, string(entity))
a.configureCall(ctx, req)
return req.Do()
}
func (a *ACLHandle) bucketList(ctx context.Context) ([]ACLRule, error) {
@ -161,24 +159,16 @@ func (a *ACLHandle) bucketSet(ctx context.Context, entity ACLEntity, role ACLRol
Entity: string(entity),
Role: string(role),
}
err := runWithRetry(ctx, func() error {
req := a.c.raw.BucketAccessControls.Update(a.bucket, string(entity), acl)
a.configureCall(ctx, req)
_, err := req.Do()
return err
})
if err != nil {
return err
}
return nil
req := a.c.raw.BucketAccessControls.Update(a.bucket, string(entity), acl)
a.configureCall(ctx, req)
_, err := req.Do()
return err
}
func (a *ACLHandle) bucketDelete(ctx context.Context, entity ACLEntity) error {
return runWithRetry(ctx, func() error {
req := a.c.raw.BucketAccessControls.Delete(a.bucket, string(entity))
a.configureCall(ctx, req)
return req.Do()
})
req := a.c.raw.BucketAccessControls.Delete(a.bucket, string(entity))
a.configureCall(ctx, req)
return req.Do()
}
func (a *ACLHandle) objectList(ctx context.Context) ([]ACLRule, error) {
@ -214,18 +204,14 @@ func (a *ACLHandle) objectSet(ctx context.Context, entity ACLEntity, role ACLRol
req = a.c.raw.ObjectAccessControls.Update(a.bucket, a.object, string(entity), acl)
}
a.configureCall(ctx, req)
return runWithRetry(ctx, func() error {
_, err := req.Do()
return err
})
_, err := req.Do()
return err
}
func (a *ACLHandle) objectDelete(ctx context.Context, entity ACLEntity) error {
return runWithRetry(ctx, func() error {
req := a.c.raw.ObjectAccessControls.Delete(a.bucket, a.object, string(entity))
a.configureCall(ctx, req)
return req.Do()
})
req := a.c.raw.ObjectAccessControls.Delete(a.bucket, a.object, string(entity))
a.configureCall(ctx, req)
return req.Do()
}
func (a *ACLHandle) configureCall(ctx context.Context, call interface{ Header() http.Header }) {

View file

@ -16,15 +16,22 @@ package storage
import (
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"net/http"
"reflect"
"time"
"cloud.google.com/go/compute/metadata"
"cloud.google.com/go/internal/optional"
"cloud.google.com/go/internal/trace"
"golang.org/x/xerrors"
"google.golang.org/api/googleapi"
"google.golang.org/api/iamcredentials/v1"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
raw "google.golang.org/api/storage/v1"
)
@ -166,7 +173,8 @@ func (b *BucketHandle) Attrs(ctx context.Context) (attrs *BucketAttrs, err error
resp, err = req.Context(ctx).Do()
return err
})
if e, ok := err.(*googleapi.Error); ok && e.Code == http.StatusNotFound {
var e *googleapi.Error
if ok := xerrors.As(err, &e); ok && e.Code == http.StatusNotFound {
return nil, ErrBucketNotExist
}
if err != nil {
@ -223,6 +231,118 @@ func (b *BucketHandle) newPatchCall(uattrs *BucketAttrsToUpdate) (*raw.BucketsPa
return req, nil
}
// SignedURL returns a URL for the specified object. Signed URLs allow anyone
// access to a restricted resource for a limited time without needing a
// Google account or signing in. For more information about signed URLs, see
// https://cloud.google.com/storage/docs/accesscontrol#signed_urls_query_string_authentication
//
// This method only requires the Method and Expires fields in the specified
// SignedURLOptions opts to be non-nil. If not provided, it attempts to fill the
// GoogleAccessID and PrivateKey from the GOOGLE_APPLICATION_CREDENTIALS environment variable.
// If you are authenticating with a custom HTTP client, Service Account based
// auto-detection will be hindered.
//
// If no private key is found, it attempts to use the GoogleAccessID to sign the URL.
// This requires the IAM Service Account Credentials API to be enabled
// (https://console.developers.google.com/apis/api/iamcredentials.googleapis.com/overview)
// and iam.serviceAccounts.signBlob permissions on the GoogleAccessID service account.
// If you do not want these fields set for you, you may pass them in through opts or use
// SignedURL(bucket, name string, opts *SignedURLOptions) instead.
func (b *BucketHandle) SignedURL(object string, opts *SignedURLOptions) (string, error) {
if opts.GoogleAccessID != "" && (opts.SignBytes != nil || len(opts.PrivateKey) > 0) {
return SignedURL(b.name, object, opts)
}
// Make a copy of opts so we don't modify the pointer parameter.
newopts := opts.clone()
if newopts.GoogleAccessID == "" {
id, err := b.detectDefaultGoogleAccessID()
if err != nil {
return "", err
}
newopts.GoogleAccessID = id
}
if newopts.SignBytes == nil && len(newopts.PrivateKey) == 0 {
if b.c.creds != nil && len(b.c.creds.JSON) > 0 {
var sa struct {
PrivateKey string `json:"private_key"`
}
err := json.Unmarshal(b.c.creds.JSON, &sa)
if err == nil && sa.PrivateKey != "" {
newopts.PrivateKey = []byte(sa.PrivateKey)
}
}
// Don't error out if we can't unmarshal the private key from the client,
// fallback to the default sign function for the service account.
if len(newopts.PrivateKey) == 0 {
newopts.SignBytes = b.defaultSignBytesFunc(newopts.GoogleAccessID)
}
}
return SignedURL(b.name, object, newopts)
}
// TODO: Add a similar wrapper for GenerateSignedPostPolicyV4 allowing users to
// omit PrivateKey/SignBytes
func (b *BucketHandle) detectDefaultGoogleAccessID() (string, error) {
returnErr := errors.New("no credentials found on client and not on GCE (Google Compute Engine)")
if b.c.creds != nil && len(b.c.creds.JSON) > 0 {
var sa struct {
ClientEmail string `json:"client_email"`
}
err := json.Unmarshal(b.c.creds.JSON, &sa)
if err == nil && sa.ClientEmail != "" {
return sa.ClientEmail, nil
} else if err != nil {
returnErr = err
} else {
returnErr = errors.New("storage: empty client email in credentials")
}
}
// Don't error out if we can't unmarshal, fallback to GCE check.
if metadata.OnGCE() {
email, err := metadata.Email("default")
if err == nil && email != "" {
return email, nil
} else if err != nil {
returnErr = err
} else {
returnErr = errors.New("got empty email from GCE metadata service")
}
}
return "", fmt.Errorf("storage: unable to detect default GoogleAccessID: %v", returnErr)
}
func (b *BucketHandle) defaultSignBytesFunc(email string) func([]byte) ([]byte, error) {
return func(in []byte) ([]byte, error) {
ctx := context.Background()
// It's ok to recreate this service per call since we pass in the http client,
// circumventing the cost of recreating the auth/transport layer
svc, err := iamcredentials.NewService(ctx, option.WithHTTPClient(b.c.hc))
if err != nil {
return nil, fmt.Errorf("unable to create iamcredentials client: %v", err)
}
resp, err := svc.Projects.ServiceAccounts.SignBlob(fmt.Sprintf("projects/-/serviceAccounts/%s", email), &iamcredentials.SignBlobRequest{
Payload: base64.StdEncoding.EncodeToString(in),
}).Do()
if err != nil {
return nil, fmt.Errorf("unable to sign bytes: %v", err)
}
out, err := base64.StdEncoding.DecodeString(resp.SignedBlob)
if err != nil {
return nil, fmt.Errorf("unable to base64 decode response: %v", err)
}
return out, nil
}
}
// BucketAttrs represents the metadata for a Google Cloud Storage bucket.
// Read-only fields are ignored by BucketHandle.Create.
type BucketAttrs struct {
@ -337,6 +457,10 @@ type BucketAttrs struct {
// Typical values are "multi-region", "region" and "dual-region".
// This field is read-only.
LocationType string
// The project number of the project the bucket belongs to.
// This field is read-only.
ProjectNumber uint64
}
// BucketPolicyOnly is an alias for UniformBucketLevelAccess.
@ -372,23 +496,29 @@ const (
// not set in a call to GCS.
PublicAccessPreventionUnknown PublicAccessPrevention = iota
// PublicAccessPreventionUnspecified corresponds to a value of "unspecified"
// and is the default for buckets.
// PublicAccessPreventionUnspecified corresponds to a value of "unspecified".
// Deprecated: use PublicAccessPreventionInherited
PublicAccessPreventionUnspecified
// PublicAccessPreventionEnforced corresponds to a value of "enforced". This
// enforces Public Access Prevention on the bucket.
PublicAccessPreventionEnforced
publicAccessPreventionUnknown string = ""
publicAccessPreventionUnspecified = "unspecified"
publicAccessPreventionEnforced = "enforced"
// PublicAccessPreventionInherited corresponds to a value of "inherited"
// and is the default for buckets.
PublicAccessPreventionInherited
publicAccessPreventionUnknown string = ""
// TODO: remove unspecified when change is fully completed
publicAccessPreventionUnspecified = "unspecified"
publicAccessPreventionEnforced = "enforced"
publicAccessPreventionInherited = "inherited"
)
func (p PublicAccessPrevention) String() string {
switch p {
case PublicAccessPreventionUnspecified:
return publicAccessPreventionUnspecified
case PublicAccessPreventionInherited, PublicAccessPreventionUnspecified:
return publicAccessPreventionInherited
case PublicAccessPreventionEnforced:
return publicAccessPreventionEnforced
default:
@ -597,6 +727,7 @@ func newBucket(b *raw.Bucket) (*BucketAttrs, error) {
PublicAccessPrevention: toPublicAccessPrevention(b.IamConfiguration),
Etag: b.Etag,
LocationType: b.LocationType,
ProjectNumber: b.ProjectNumber,
}, nil
}
@ -1207,8 +1338,8 @@ func toPublicAccessPrevention(b *raw.BucketIamConfiguration) PublicAccessPrevent
return PublicAccessPreventionUnknown
}
switch b.PublicAccessPrevention {
case publicAccessPreventionUnspecified:
return PublicAccessPreventionUnspecified
case publicAccessPreventionInherited, publicAccessPreventionUnspecified:
return PublicAccessPreventionInherited
case publicAccessPreventionEnforced:
return PublicAccessPreventionEnforced
default:
@ -1308,7 +1439,8 @@ func (it *ObjectIterator) fetch(pageSize int, pageToken string) (string, error)
return err
})
if err != nil {
if e, ok := err.(*googleapi.Error); ok && e.Code == http.StatusNotFound {
var e *googleapi.Error
if ok := xerrors.As(err, &e); ok && e.Code == http.StatusNotFound {
err = ErrBucketNotExist
}
return "", err

View file

@ -247,9 +247,10 @@ as the documentation of GenerateSignedPostPolicyV4.
Errors
Errors returned by this client are often of the type [`googleapi.Error`](https://godoc.org/google.golang.org/api/googleapi#Error).
These errors can be introspected for more information by type asserting to the richer `googleapi.Error` type. For example:
These errors can be introspected for more information by using `errors.As` with the richer `googleapi.Error` type. For example:
if e, ok := err.(*googleapi.Error); ok {
var e *googleapi.Error
if ok := errors.As(err, &e); ok {
if e.Code == 409 { ... }
}
*/

View file

@ -3,14 +3,14 @@ module cloud.google.com/go/storage
go 1.11
require (
cloud.google.com/go v0.93.3
cloud.google.com/go v0.97.0
github.com/golang/protobuf v1.5.2
github.com/google/go-cmp v0.5.6
github.com/googleapis/gax-go/v2 v2.0.5
golang.org/x/oauth2 v0.0.0-20210805134026-6f1e6394065a
github.com/googleapis/gax-go/v2 v2.1.1
golang.org/x/oauth2 v0.0.0-20211005180243-6b3c2da341f1
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
google.golang.org/api v0.54.0
google.golang.org/genproto v0.0.0-20210825212027-de86158e7fda
google.golang.org/api v0.58.0
google.golang.org/genproto v0.0.0-20210924002016-3dee208752a0
google.golang.org/grpc v1.40.0
google.golang.org/protobuf v1.27.1
)

View file

@ -22,8 +22,10 @@ cloud.google.com/go v0.83.0/go.mod h1:Z7MJUsANfY0pYPdw0lbnivPx4/vhy/e2FEkSkF7vAV
cloud.google.com/go v0.84.0/go.mod h1:RazrYuxIK6Kb7YrzzhPoLmCVzl7Sup4NrbKPg8KHSUM=
cloud.google.com/go v0.87.0/go.mod h1:TpDYlFy7vuLzZMMZ+B6iRiELaY7z/gJPaqbMx6mlWcY=
cloud.google.com/go v0.90.0/go.mod h1:kRX0mNRHe0e2rC6oNakvwQqzyDmg57xJ+SZU1eT2aDQ=
cloud.google.com/go v0.93.3 h1:wPBktZFzYBcCZVARvwVKqH1uEj+aLXofJEtrb4oOsio=
cloud.google.com/go v0.93.3/go.mod h1:8utlLll2EF5XMAV15woO4lSbWQlk8rer9aLOfLh7+YI=
cloud.google.com/go v0.94.1/go.mod h1:qAlAugsXlC+JWO+Bke5vCtc9ONxjQT3drlTTnAplMW4=
cloud.google.com/go v0.97.0 h1:3DXvAyifywvq64LfkKaMOmkWPS1CikIQdMe2lY9vxU8=
cloud.google.com/go v0.97.0/go.mod h1:GF7l59pYBVlXQIBLx3a761cZ41F9bBH3JUlihCt2Udc=
cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o=
cloud.google.com/go/bigquery v1.3.0/go.mod h1:PjpwJnslEMmckchkHFfq+HTD2DmtT67aNFKH1/VBDHE=
cloud.google.com/go/bigquery v1.4.0/go.mod h1:S8dzgnTigyfTmLBfrtrhyYhwRxG72rYxvftPBK2Dvzc=
@ -141,8 +143,10 @@ github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLe
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5 h1:sjZBwGj9Jlw33ImPtvFviGYvseOtDM7hkSKB7+Tv3SM=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/googleapis/gax-go/v2 v2.1.0/go.mod h1:Q3nei7sK6ybPYH7twZdmQpAd1MKb7pfu6SK+H1/DsU0=
github.com/googleapis/gax-go/v2 v2.1.1 h1:dp3bWCh+PPO1zjRRiCSczJav13sBvG4UhNyVTa1KqdU=
github.com/googleapis/gax-go/v2 v2.1.1/go.mod h1:hddJymUZASv3XPyGkUpKj8pPO47Rmb0eJc8R6ouapiM=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
@ -266,8 +270,10 @@ golang.org/x/oauth2 v0.0.0-20210220000619-9bb904979d93/go.mod h1:KelEdhl1UZF7XfJ
golang.org/x/oauth2 v0.0.0-20210313182246-cd4f82c27b84/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20210628180205-a41e5a781914/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20210805134026-6f1e6394065a h1:4Kd8OPUx1xgUwrHDaviWZO8MsgoZTZYC3g+8m16RBww=
golang.org/x/oauth2 v0.0.0-20210805134026-6f1e6394065a/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20211005180243-6b3c2da341f1 h1:B333XXssMuKQeBwiNODx4TupZy7bf4sxFZnN2ZOcvUE=
golang.org/x/oauth2 v0.0.0-20211005180243-6b3c2da341f1/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@ -321,8 +327,11 @@ golang.org/x/sys v0.0.0-20210514084401-e8d321eab015/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210603125802-9665404d3644/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069 h1:siQdpVirKtzPhKl3lZWozZraCFObP8S1v6PRp0bLrtU=
golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210908233432-aa78b53d3365/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210917161153-d61c044b1678 h1:J27LZFQBFoihqXoegpscI10HpjZ7B5WQLLKL2FZXQKw=
golang.org/x/sys v0.0.0-20210917161153-d61c044b1678/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@ -417,8 +426,12 @@ google.golang.org/api v0.47.0/go.mod h1:Wbvgpq1HddcWVtzsVLyfLp8lDg6AA241LmgIL59t
google.golang.org/api v0.48.0/go.mod h1:71Pr1vy+TAZRPkPs/xlCf5SsU8WjuAWv1Pfjbtukyy4=
google.golang.org/api v0.50.0/go.mod h1:4bNT5pAuq5ji4SRZm+5QIkjny9JAyVD/3gaSihNefaw=
google.golang.org/api v0.51.0/go.mod h1:t4HdrdoNgyN5cbEfm7Lum0lcLDLiise1F8qDKX00sOU=
google.golang.org/api v0.54.0 h1:ECJUVngj71QI6XEm7b1sAf8BljU5inEhMbKPR8Lxhhk=
google.golang.org/api v0.54.0/go.mod h1:7C4bFFOvVDGXjfDTAsgGwDgAxRDeQ4X8NvUedIt6z3k=
google.golang.org/api v0.55.0/go.mod h1:38yMfeP1kfjsl8isn0tliTjIb1rJXcQi4UXlbqivdVE=
google.golang.org/api v0.56.0/go.mod h1:38yMfeP1kfjsl8isn0tliTjIb1rJXcQi4UXlbqivdVE=
google.golang.org/api v0.57.0/go.mod h1:dVPlbZyBo2/OjBpmvNdpn2GRm6rPy75jyU7bmhdrMgI=
google.golang.org/api v0.58.0 h1:MDkAbYIB1JpSgCTOCYYoIec/coMlKK4oVbpnBLLcyT0=
google.golang.org/api v0.58.0/go.mod h1:cAbP2FsxoGVNwtgNAmmn3y5G1TWAiVYRmg4yku3lv+E=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
@ -477,8 +490,14 @@ google.golang.org/genproto v0.0.0-20210716133855-ce7ef5c701ea/go.mod h1:AxrInvYm
google.golang.org/genproto v0.0.0-20210728212813-7823e685a01f/go.mod h1:ob2IJxKrgPT52GcgX759i1sleT07tiKowYBGbczaW48=
google.golang.org/genproto v0.0.0-20210805201207-89edb61ffb67/go.mod h1:ob2IJxKrgPT52GcgX759i1sleT07tiKowYBGbczaW48=
google.golang.org/genproto v0.0.0-20210813162853-db860fec028c/go.mod h1:cFeNkxwySK631ADgubI+/XFU/xp8FD5KIVV4rj8UC5w=
google.golang.org/genproto v0.0.0-20210825212027-de86158e7fda h1:iT5uhT54PtbqUsWddv/nnEWdE5e/MTr+Nv3vjxlBP1A=
google.golang.org/genproto v0.0.0-20210825212027-de86158e7fda/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY=
google.golang.org/genproto v0.0.0-20210821163610-241b8fcbd6c8/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY=
google.golang.org/genproto v0.0.0-20210828152312-66f60bf46e71/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY=
google.golang.org/genproto v0.0.0-20210831024726-fe130286e0e2/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY=
google.golang.org/genproto v0.0.0-20210903162649-d08c68adba83/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY=
google.golang.org/genproto v0.0.0-20210909211513-a8c4777a87af/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY=
google.golang.org/genproto v0.0.0-20210917145530-b395a37504d4/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY=
google.golang.org/genproto v0.0.0-20210924002016-3dee208752a0 h1:5Tbluzus3QxoAJx4IefGt1W0HQZW4nuMrVk684jI74Q=
google.golang.org/genproto v0.0.0-20210924002016-3dee208752a0/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=

View file

@ -214,12 +214,7 @@ func (c *Client) CreateHMACKey(ctx context.Context, projectID, serviceAccountEma
setClientHeader(call.Header())
var hkPb *raw.HmacKey
var err error
err = runWithRetry(ctx, func() error {
hkPb, err = call.Context(ctx).Do()
return err
})
hkPb, err := call.Context(ctx).Do()
if err != nil {
return nil, err
}

View file

@ -68,7 +68,7 @@ import (
type clientHookParams struct{}
type clientHook func(context.Context, clientHookParams) ([]option.ClientOption, error)
const versionClient = "20210821"
const versionClient = "20210921"
func insertMetadata(ctx context.Context, mds ...metadata.MD) context.Context {
out, _ := metadata.FromOutgoingContext(ctx)

View file

@ -36,7 +36,7 @@ func runWithRetry(ctx context.Context, call func() error) error {
return true, nil
}
if shouldRetry(err) {
return false, nil
return false, err
}
return true, err
})

View file

@ -184,5 +184,7 @@ func (b *BucketHandle) DeleteNotification(ctx context.Context, id string) (err e
if b.userProject != "" {
call.UserProject(b.userProject)
}
return call.Context(ctx).Do()
return runWithRetry(ctx, func() error {
return call.Context(ctx).Do()
})
}

View file

@ -30,7 +30,6 @@ import (
"cloud.google.com/go/internal/trace"
"google.golang.org/api/googleapi"
storagepb "google.golang.org/genproto/googleapis/storage/v2"
"google.golang.org/protobuf/proto"
)
var crc32cTable = crc32.MakeTable(crc32.Castagnoli)
@ -475,7 +474,10 @@ func (o *ObjectHandle) newRangeReaderWithGRPC(ctx context.Context, offset, lengt
}
req.ReadOffset = start
setRequestConditions(req, o.conds)
if err := applyCondsProto("reopenWithGRPC", o.gen, o.conds, req); err != nil {
cancel()
return nil, nil, err
}
var stream storagepb.Storage_ReadObjectClient
var msg *storagepb.ReadObjectResponse
@ -631,10 +633,7 @@ func (r *Reader) readWithGRPC(p []byte) (int, error) {
if leftover > 0 {
// Wasn't able to copy all of the data in the message, store for
// future Read calls.
// TODO: Instead of acquiring a new block of memory, should we reuse
// the existing leftovers slice, expanding it if necessary?
r.leftovers = make([]byte, leftover)
copy(r.leftovers, content[n:])
r.leftovers = content[n:]
}
r.seen += int64(n)
@ -683,29 +682,6 @@ func (r *Reader) reopenStream(seen int64) (*storagepb.ReadObjectResponse, error)
return res.response, nil
}
// setRequestConditions is used to apply the given Conditions to a gRPC request
// message.
//
// This is an experimental API and not intended for public use.
func setRequestConditions(req *storagepb.ReadObjectRequest, conds *Conditions) {
if conds == nil {
return
}
if conds.MetagenerationMatch != 0 {
req.IfMetagenerationMatch = proto.Int64(conds.MetagenerationMatch)
} else if conds.MetagenerationNotMatch != 0 {
req.IfMetagenerationNotMatch = proto.Int64(conds.MetagenerationNotMatch)
}
switch {
case conds.GenerationNotMatch != 0:
req.IfGenerationNotMatch = proto.Int64(conds.GenerationNotMatch)
case conds.GenerationMatch != 0:
req.IfGenerationMatch = proto.Int64(conds.GenerationMatch)
case conds.DoesNotExist:
req.IfGenerationMatch = proto.Int64(0)
}
}
// Size returns the size of the object in bytes.
// The returned value is always the same and is not affected by
// calls to Read or Close.

View file

@ -41,13 +41,17 @@ import (
"cloud.google.com/go/internal/trace"
"cloud.google.com/go/internal/version"
gapic "cloud.google.com/go/storage/internal/apiv2"
"golang.org/x/oauth2/google"
"golang.org/x/xerrors"
"google.golang.org/api/googleapi"
"google.golang.org/api/option"
"google.golang.org/api/option/internaloption"
raw "google.golang.org/api/storage/v1"
"google.golang.org/api/transport"
htransport "google.golang.org/api/transport/http"
storagepb "google.golang.org/genproto/googleapis/storage/v2"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/known/timestamppb"
)
@ -96,6 +100,8 @@ type Client struct {
scheme string
// ReadHost is the default host used on the reader.
readHost string
// May be nil.
creds *google.Credentials
// gc is an optional gRPC-based, GAPIC client.
//
@ -110,6 +116,7 @@ type Client struct {
// Clients should be reused instead of created as needed. The methods of Client
// are safe for concurrent use by multiple goroutines.
func NewClient(ctx context.Context, opts ...option.ClientOption) (*Client, error) {
var creds *google.Credentials
// In general, it is recommended to use raw.NewService instead of htransport.NewClient
// since raw.NewService configures the correct default endpoints when initializing the
@ -120,10 +127,18 @@ func NewClient(ctx context.Context, opts ...option.ClientOption) (*Client, error
// need to account for STORAGE_EMULATOR_HOST override when setting the default endpoints.
if host := os.Getenv("STORAGE_EMULATOR_HOST"); host == "" {
// Prepend default options to avoid overriding options passed by the user.
opts = append([]option.ClientOption{option.WithScopes(ScopeFullControl), option.WithUserAgent(userAgent)}, opts...)
opts = append([]option.ClientOption{option.WithScopes(ScopeFullControl, "https://www.googleapis.com/auth/cloud-platform"), option.WithUserAgent(userAgent)}, opts...)
opts = append(opts, internaloption.WithDefaultEndpoint("https://storage.googleapis.com/storage/v1/"))
opts = append(opts, internaloption.WithDefaultMTLSEndpoint("https://storage.mtls.googleapis.com/storage/v1/"))
// Don't error out here. The user may have passed in their own HTTP
// client which does not auth with ADC or other common conventions.
c, err := transport.Creds(ctx, opts...)
if err == nil {
creds = c
opts = append(opts, internaloption.WithCredentials(creds))
}
} else {
var hostURL *url.URL
@ -170,6 +185,7 @@ func NewClient(ctx context.Context, opts ...option.ClientOption) (*Client, error
raw: rawService,
scheme: u.Scheme,
readHost: u.Host,
creds: creds,
}, nil
}
@ -208,6 +224,7 @@ func (c *Client) Close() error {
// Set fields to nil so that subsequent uses will panic.
c.hc = nil
c.raw = nil
c.creds = nil
if c.gc != nil {
return c.gc.Close()
}
@ -394,6 +411,23 @@ type SignedURLOptions struct {
Scheme SigningScheme
}
func (opts *SignedURLOptions) clone() *SignedURLOptions {
return &SignedURLOptions{
GoogleAccessID: opts.GoogleAccessID,
SignBytes: opts.SignBytes,
PrivateKey: opts.PrivateKey,
Method: opts.Method,
Expires: opts.Expires,
ContentType: opts.ContentType,
Headers: opts.Headers,
QueryParameters: opts.QueryParameters,
MD5: opts.MD5,
Style: opts.Style,
Insecure: opts.Insecure,
Scheme: opts.Scheme,
}
}
var (
tabRegex = regexp.MustCompile(`[\t]+`)
// I was tempted to call this spacex. :)
@ -507,11 +541,11 @@ func v4SanitizeHeaders(hdrs []string) []string {
return sanitizedHeaders
}
// SignedURL returns a URL for the specified object. Signed URLs allow
// the users access to a restricted resource for a limited time without having a
// Google account or signing in. For more information about the signed
// URLs, see https://cloud.google.com/storage/docs/accesscontrol#Signed-URLs.
func SignedURL(bucket, name string, opts *SignedURLOptions) (string, error) {
// SignedURL returns a URL for the specified object. Signed URLs allow anyone
// access to a restricted resource for a limited time without needing a
// Google account or signing in. For more information about signed URLs, see
// https://cloud.google.com/storage/docs/accesscontrol#signed_urls_query_string_authentication
func SignedURL(bucket, object string, opts *SignedURLOptions) (string, error) {
now := utcNow()
if err := validateOptions(opts, now); err != nil {
return "", err
@ -520,13 +554,13 @@ func SignedURL(bucket, name string, opts *SignedURLOptions) (string, error) {
switch opts.Scheme {
case SigningSchemeV2:
opts.Headers = v2SanitizeHeaders(opts.Headers)
return signedURLV2(bucket, name, opts)
return signedURLV2(bucket, object, opts)
case SigningSchemeV4:
opts.Headers = v4SanitizeHeaders(opts.Headers)
return signedURLV4(bucket, name, opts, now)
return signedURLV4(bucket, object, opts, now)
default: // SigningSchemeDefault
opts.Headers = v2SanitizeHeaders(opts.Headers)
return signedURLV2(bucket, name, opts)
return signedURLV2(bucket, object, opts)
}
}
@ -866,7 +900,8 @@ func (o *ObjectHandle) Attrs(ctx context.Context) (attrs *ObjectAttrs, err error
var obj *raw.Object
setClientHeader(call.Header())
err = runWithRetry(ctx, func() error { obj, err = call.Do(); return err })
if e, ok := err.(*googleapi.Error); ok && e.Code == http.StatusNotFound {
var e *googleapi.Error
if ok := xerrors.As(err, &e); ok && e.Code == http.StatusNotFound {
return nil, ErrObjectNotExist
}
if err != nil {
@ -966,7 +1001,8 @@ func (o *ObjectHandle) Update(ctx context.Context, uattrs ObjectAttrsToUpdate) (
var obj *raw.Object
setClientHeader(call.Header())
err = runWithRetry(ctx, func() error { obj, err = call.Do(); return err })
if e, ok := err.(*googleapi.Error); ok && e.Code == http.StatusNotFound {
var e *googleapi.Error
if ok := xerrors.As(err, &e); ok && e.Code == http.StatusNotFound {
return nil, ErrObjectNotExist
}
if err != nil {
@ -1029,13 +1065,9 @@ func (o *ObjectHandle) Delete(ctx context.Context) error {
// Encryption doesn't apply to Delete.
setClientHeader(call.Header())
err := runWithRetry(ctx, func() error { return call.Do() })
switch e := err.(type) {
case nil:
return nil
case *googleapi.Error:
if e.Code == http.StatusNotFound {
return ErrObjectNotExist
}
var e *googleapi.Error
if ok := xerrors.As(err, &e); ok && e.Code == http.StatusNotFound {
return ErrObjectNotExist
}
return err
}
@ -1388,9 +1420,8 @@ func newObject(o *raw.Object) *ObjectAttrs {
}
}
func newObjectFromProto(r *storagepb.WriteObjectResponse) *ObjectAttrs {
o := r.GetResource()
if r == nil || o == nil {
func newObjectFromProto(o *storagepb.Object) *ObjectAttrs {
if o == nil {
return nil
}
return &ObjectAttrs{
@ -1769,7 +1800,12 @@ func setEncryptionHeaders(headers http.Header, key []byte, copySource bool) erro
// ServiceAccount fetches the email address of the given project's Google Cloud Storage service account.
func (c *Client) ServiceAccount(ctx context.Context, projectID string) (string, error) {
r := c.raw.Projects.ServiceAccount.Get(projectID)
res, err := r.Context(ctx).Do()
var res *raw.ServiceAccount
var err error
err = runWithRetry(ctx, func() error {
res, err = r.Context(ctx).Do()
return err
})
if err != nil {
return "", err
}
@ -1788,3 +1824,63 @@ func bucketResourceName(p, b string) string {
func parseBucketName(b string) string {
return strings.TrimPrefix(b, "projects/_/buckets/")
}
// setConditionProtoField uses protobuf reflection to set named condition field
// to the given condition value if supported on the protobuf message.
//
// This is an experimental API and not intended for public use.
func setConditionProtoField(m protoreflect.Message, f string, v int64) bool {
fields := m.Descriptor().Fields()
if rf := fields.ByName(protoreflect.Name(f)); rf != nil {
m.Set(rf, protoreflect.ValueOfInt64(v))
return true
}
return false
}
// applyCondsProto validates and attempts to set the conditions on a protobuf
// message using protobuf reflection.
//
// This is an experimental API and not intended for public use.
func applyCondsProto(method string, gen int64, conds *Conditions, msg proto.Message) error {
rmsg := msg.ProtoReflect()
if gen >= 0 {
if !setConditionProtoField(rmsg, "generation", gen) {
return fmt.Errorf("storage: %s: generation not supported", method)
}
}
if conds == nil {
return nil
}
if err := conds.validate(method); err != nil {
return err
}
switch {
case conds.GenerationMatch != 0:
if !setConditionProtoField(rmsg, "if_generation_match", conds.GenerationMatch) {
return fmt.Errorf("storage: %s: ifGenerationMatch not supported", method)
}
case conds.GenerationNotMatch != 0:
if !setConditionProtoField(rmsg, "if_generation_not_match", conds.GenerationNotMatch) {
return fmt.Errorf("storage: %s: ifGenerationNotMatch not supported", method)
}
case conds.DoesNotExist:
if !setConditionProtoField(rmsg, "if_generation_match", int64(0)) {
return fmt.Errorf("storage: %s: DoesNotExist not supported", method)
}
}
switch {
case conds.MetagenerationMatch != 0:
if !setConditionProtoField(rmsg, "if_metageneration_match", conds.MetagenerationMatch) {
return fmt.Errorf("storage: %s: ifMetagenerationMatch not supported", method)
}
case conds.MetagenerationNotMatch != 0:
if !setConditionProtoField(rmsg, "if_metageneration_not_match", conds.MetagenerationNotMatch) {
return fmt.Errorf("storage: %s: ifMetagenerationNotMatch not supported", method)
}
}
return nil
}

View file

@ -23,8 +23,21 @@ import (
"sync"
"unicode/utf8"
"github.com/golang/protobuf/proto"
"golang.org/x/xerrors"
"google.golang.org/api/googleapi"
raw "google.golang.org/api/storage/v1"
storagepb "google.golang.org/genproto/googleapis/storage/v2"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
const (
// Maximum amount of content that can be sent per WriteObjectRequest message.
// A buffer reaching this amount will precipitate a flush of the buffer.
//
// This is only used for the gRPC-based Writer.
maxPerMessageWriteSize int = int(storagepb.ServiceConstants_MAX_WRITE_CHUNK_BYTES)
)
// A Writer writes a Cloud Storage object.
@ -84,30 +97,30 @@ type Writer struct {
mu sync.Mutex
err error
// The gRPC client-stream used for sending buffers.
//
// This is an experimental API and not intended for public use.
stream storagepb.Storage_WriteObjectClient
// The Resumable Upload ID started by a gRPC-based Writer.
//
// This is an experimental API and not intended for public use.
upid string
}
func (w *Writer) open() error {
attrs := w.ObjectAttrs
// Check the developer didn't change the object Name (this is unfortunate, but
// we don't want to store an object under the wrong name).
if attrs.Name != w.o.object {
return fmt.Errorf("storage: Writer.Name %q does not match object name %q", attrs.Name, w.o.object)
}
if !utf8.ValidString(attrs.Name) {
return fmt.Errorf("storage: object name %q is not valid UTF-8", attrs.Name)
}
if attrs.KMSKeyName != "" && w.o.encryptionKey != nil {
return errors.New("storage: cannot use KMSKeyName with a customer-supplied encryption key")
}
if w.ChunkSize < 0 {
return errors.New("storage: Writer.ChunkSize must be non-negative")
if err := w.validateWriteAttrs(); err != nil {
return err
}
pr, pw := io.Pipe()
w.pw = pw
w.opened = true
go w.monitorCancel()
attrs := w.ObjectAttrs
mediaOpts := []googleapi.MediaOption{
googleapi.ChunkSize(w.ChunkSize),
}
@ -190,7 +203,12 @@ func (w *Writer) Write(p []byte) (n int, err error) {
return 0, werr
}
if !w.opened {
if err := w.open(); err != nil {
// gRPC client has been initialized - use gRPC to upload.
if w.o.c.gc != nil {
if err := w.openGRPC(); err != nil {
return 0, err
}
} else if err := w.open(); err != nil {
return 0, err
}
}
@ -202,7 +220,7 @@ func (w *Writer) Write(p []byte) (n int, err error) {
// Preserve existing functionality that when context is canceled, Write will return
// context.Canceled instead of "io: read/write on closed pipe". This hides the
// pipe implementation detail from users and makes Write seem as though it's an RPC.
if werr == context.Canceled || werr == context.DeadlineExceeded {
if xerrors.Is(werr, context.Canceled) || xerrors.Is(werr, context.DeadlineExceeded) {
return n, werr
}
}
@ -263,3 +281,373 @@ func (w *Writer) CloseWithError(err error) error {
func (w *Writer) Attrs() *ObjectAttrs {
return w.obj
}
func (w *Writer) validateWriteAttrs() error {
attrs := w.ObjectAttrs
// Check the developer didn't change the object Name (this is unfortunate, but
// we don't want to store an object under the wrong name).
if attrs.Name != w.o.object {
return fmt.Errorf("storage: Writer.Name %q does not match object name %q", attrs.Name, w.o.object)
}
if !utf8.ValidString(attrs.Name) {
return fmt.Errorf("storage: object name %q is not valid UTF-8", attrs.Name)
}
if attrs.KMSKeyName != "" && w.o.encryptionKey != nil {
return errors.New("storage: cannot use KMSKeyName with a customer-supplied encryption key")
}
if w.ChunkSize < 0 {
return errors.New("storage: Writer.ChunkSize must be non-negative")
}
return nil
}
// progress is a convenience wrapper that reports write progress to the Writer
// ProgressFunc if it is set and progress is non-zero.
func (w *Writer) progress(p int64) {
if w.ProgressFunc != nil && p != 0 {
w.ProgressFunc(p)
}
}
// error acquires the Writer's lock, sets the Writer's err to the given error,
// then relinquishes the lock.
func (w *Writer) error(err error) {
w.mu.Lock()
w.err = err
w.mu.Unlock()
}
// openGRPC initializes a pipe for the user to write data to, and a routine to
// read from that pipe and upload the data to GCS via gRPC.
//
// This is an experimental API and not intended for public use.
func (w *Writer) openGRPC() error {
if err := w.validateWriteAttrs(); err != nil {
return err
}
pr, pw := io.Pipe()
w.pw = pw
w.opened = true
go w.monitorCancel()
bufSize := w.ChunkSize
if w.ChunkSize == 0 {
// TODO: Should we actually use the minimum of 256 KB here when the user
// indicates they want minimal memory usage? We cannot do a zero-copy,
// bufferless upload like HTTP/JSON can.
// TODO: We need to determine if we can avoid starting a
// resumable upload when the user *plans* to send more than bufSize but
// with a bufferless upload.
bufSize = maxPerMessageWriteSize
}
buf := make([]byte, bufSize)
var offset int64
// This function reads the data sent to the pipe and sends sets of messages
// on the gRPC client-stream as the buffer is filled.
go func() {
defer close(w.donec)
// Loop until there is an error or the Object has been finalized.
for {
// Note: This blocks until either the buffer is full or EOF is read.
recvd, doneReading, err := read(pr, buf)
if err != nil {
err = checkCanceled(err)
w.error(err)
pr.CloseWithError(err)
return
}
toWrite := buf[:recvd]
// TODO: Figure out how to set up encryption via CommonObjectRequestParams.
// The chunk buffer is full, but there is no end in sight. This
// means that a resumable upload will need to be used to send
// multiple chunks, until we are done reading data. Start a
// resumable upload if it has not already been started.
// Otherwise, all data will be sent over a single gRPC stream.
if !doneReading && w.upid == "" {
err = w.startResumableUpload()
if err != nil {
err = checkCanceled(err)
w.error(err)
pr.CloseWithError(err)
return
}
}
o, off, finalized, err := w.uploadBuffer(toWrite, recvd, offset, doneReading)
if err != nil {
err = checkCanceled(err)
w.error(err)
pr.CloseWithError(err)
return
}
// At this point, the current buffer has been uploaded. Capture the
// committed offset here in case the upload was not finalized and
// another chunk is to be uploaded.
offset = off
w.progress(offset)
// When we are done reading data and the chunk has been finalized,
// we are done.
if doneReading && finalized {
// Build Object from server's response.
w.obj = newObjectFromProto(o)
return
}
}
}()
return nil
}
// startResumableUpload initializes a Resumable Upload with gRPC and sets the
// upload ID on the Writer.
//
// This is an experimental API and not intended for public use.
func (w *Writer) startResumableUpload() error {
var common *storagepb.CommonRequestParams
if w.o.userProject != "" {
common = &storagepb.CommonRequestParams{UserProject: w.o.userProject}
}
spec, err := w.writeObjectSpec()
if err != nil {
return err
}
upres, err := w.o.c.gc.StartResumableWrite(w.ctx, &storagepb.StartResumableWriteRequest{
WriteObjectSpec: spec,
CommonRequestParams: common,
})
w.upid = upres.GetUploadId()
return err
}
// queryProgress is a helper that queries the status of the resumable upload
// associated with the given upload ID.
//
// This is an experimental API and not intended for public use.
func (w *Writer) queryProgress() (int64, error) {
q, err := w.o.c.gc.QueryWriteStatus(w.ctx, &storagepb.QueryWriteStatusRequest{UploadId: w.upid})
// q.GetCommittedSize() will return 0 if q is nil.
return q.GetCommittedSize(), err
}
// uploadBuffer opens a Write stream and uploads the buffer at the given offset (if
// uploading a chunk for a resumable uploadBuffer), and will mark the write as
// finished if we are done receiving data from the user. The resulting write
// offset after uploading the buffer is returned, as well as a boolean
// indicating if the Object has been finalized. If it has been finalized, the
// final Object will be returned as well. Finalizing the upload is primarily
// important for Resumable Uploads. A simple or multi-part upload will always
// be finalized once the entire buffer has been written.
//
// This is an experimental API and not intended for public use.
func (w *Writer) uploadBuffer(buf []byte, recvd int, start int64, doneReading bool) (*storagepb.Object, int64, bool, error) {
var err error
var finishWrite bool
var sent, limit int = 0, maxPerMessageWriteSize
offset := start
for {
first := sent == 0
// This indicates that this is the last message and the remaining
// data fits in one message.
belowLimit := recvd-sent <= limit
if belowLimit {
limit = recvd - sent
}
if belowLimit && doneReading {
finishWrite = true
}
// Prepare chunk section for upload.
data := buf[sent : sent+limit]
req := &storagepb.WriteObjectRequest{
Data: &storagepb.WriteObjectRequest_ChecksummedData{
ChecksummedData: &storagepb.ChecksummedData{
Content: data,
},
},
WriteOffset: offset,
FinishWrite: finishWrite,
}
// Open a new stream and set the first_message field on the request.
// The first message on the WriteObject stream must either be the
// Object or the Resumable Upload ID.
if first {
w.stream, err = w.o.c.gc.WriteObject(w.ctx)
if err != nil {
return nil, 0, false, err
}
if w.upid != "" {
req.FirstMessage = &storagepb.WriteObjectRequest_UploadId{UploadId: w.upid}
} else {
spec, err := w.writeObjectSpec()
if err != nil {
return nil, 0, false, err
}
req.FirstMessage = &storagepb.WriteObjectRequest_WriteObjectSpec{
WriteObjectSpec: spec,
}
}
// TODO: Currently the checksums are only sent on the first message
// of the stream, but in the future, we must also support sending it
// on the *last* message of the stream (instead of the first).
if w.SendCRC32C {
req.ObjectChecksums = &storagepb.ObjectChecksums{
Crc32C: proto.Uint32(w.CRC32C),
Md5Hash: w.MD5,
}
}
}
err = w.stream.Send(req)
if err == io.EOF {
// err was io.EOF. The client-side of a stream only gets an EOF on Send
// when the backend closes the stream and wants to return an error
// status. Closing the stream receives the status as an error.
_, err = w.stream.CloseAndRecv()
// Retriable errors mean we should start over and attempt to
// resend the entire buffer via a new stream.
// If not retriable, falling through will return the error received
// from closing the stream.
if shouldRetry(err) {
sent = 0
finishWrite = false
// TODO: Add test case for failure modes of querying progress.
offset, err = w.determineOffset(start)
if err == nil {
continue
}
}
}
if err != nil {
return nil, 0, false, err
}
// Update the immediate stream's sent total and the upload offset with
// the data sent.
sent += len(data)
offset += int64(len(data))
// Not done sending data, do not attempt to commit it yet, loop around
// and send more data.
if recvd-sent > 0 {
continue
}
// Done sending data. Close the stream to "commit" the data sent.
resp, finalized, err := w.commit()
// Retriable errors mean we should start over and attempt to
// resend the entire buffer via a new stream.
// If not retriable, falling through will return the error received
// from closing the stream.
if shouldRetry(err) {
sent = 0
finishWrite = false
offset, err = w.determineOffset(start)
if err == nil {
continue
}
}
if err != nil {
return nil, 0, false, err
}
return resp.GetResource(), offset, finalized, nil
}
}
// determineOffset either returns the offset given to it in the case of a simple
// upload, or queries the write status in the case a resumable upload is being
// used.
//
// This is an experimental API and not intended for public use.
func (w *Writer) determineOffset(offset int64) (int64, error) {
// For a Resumable Upload, we must start from however much data
// was committed.
if w.upid != "" {
committed, err := w.queryProgress()
if err != nil {
return 0, err
}
offset = committed
}
return offset, nil
}
// commit closes the stream to commit the data sent and potentially receive
// the finalized object if finished uploading. If the last request sent
// indicated that writing was finished, the Object will be finalized and
// returned. If not, then the Object will be nil, and the boolean returned will
// be false.
//
// This is an experimental API and not intended for public use.
func (w *Writer) commit() (*storagepb.WriteObjectResponse, bool, error) {
finalized := true
resp, err := w.stream.CloseAndRecv()
if err == io.EOF {
// Closing a stream for a resumable upload finish_write = false results
// in an EOF which can be ignored, as we aren't done uploading yet.
finalized = false
err = nil
}
// Drop the stream reference as it has been closed.
w.stream = nil
return resp, finalized, err
}
// writeObjectSpec constructs a WriteObjectSpec proto using the Writer's
// ObjectAttrs and applies its Conditions. This is only used for gRPC.
//
// This is an experimental API and not intended for public use.
func (w *Writer) writeObjectSpec() (*storagepb.WriteObjectSpec, error) {
spec := &storagepb.WriteObjectSpec{
Resource: w.ObjectAttrs.toProtoObject(w.o.bucket),
}
// WriteObject doesn't support the generation condition, so use -1.
if err := applyCondsProto("WriteObject", -1, w.o.conds, spec); err != nil {
return nil, err
}
return spec, nil
}
// read copies the data in the reader to the given buffer and reports how much
// data was read into the buffer and if there is no more data to read (EOF).
//
// This is an experimental API and not intended for public use.
func read(r io.Reader, buf []byte) (int, bool, error) {
// Set n to -1 to start the Read loop.
var n, recvd int = -1, 0
var err error
for err == nil && n != 0 {
// The routine blocks here until data is received.
n, err = r.Read(buf[recvd:])
recvd += n
}
var done bool
if err == io.EOF {
done = true
err = nil
}
return recvd, done, err
}
func checkCanceled(err error) error {
if status.Code(err) == codes.Canceled {
return context.Canceled
}
return err
}