go.mod: vendor aws-sdk-v2

Removes the v1 aws sdk.
Sanne Raymaekers 2024-08-06 15:23:28 +02:00
parent f27f9a2f80
commit 5e3bc8a705
1523 changed files with 628224 additions and 358932 deletions


@@ -0,0 +1,659 @@
# v1.16.9 (2024-03-07)
* **Bug Fix**: Remove dependency on go-cmp.
* **Dependency Update**: Updated to the latest SDK module versions
# v1.16.8 (2024-03-05)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.16.7 (2024-03-04)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.16.6 (2024-02-23)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.16.5 (2024-02-22)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.16.4 (2024-02-21)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.16.3 (2024-02-20)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.16.2 (2024-02-19)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.16.1 (2024-02-16)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.16.0 (2024-02-13)
* **Feature**: Bump minimum Go version to 1.20 per our language support policy.
* **Dependency Update**: Updated to the latest SDK module versions
# v1.15.15 (2024-01-24)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.15.14 (2024-01-22)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.15.13 (2024-01-18)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.15.12 (2024-01-16)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.15.11 (2024-01-05)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.15.10 (2024-01-04)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.15.9 (2023-12-20)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.15.8 (2023-12-18)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.15.7 (2023-12-08)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.15.6 (2023-12-07)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.15.5 (2023-12-06)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.15.4 (2023-12-01)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.15.3 (2023-11-30)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.15.2 (2023-11-29)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.15.1 (2023-11-28.3)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.15.0 (2023-11-28.2)
* **Feature**: Add S3Express support.
* **Dependency Update**: Updated to the latest SDK module versions
# v1.14.4 (2023-11-28)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.14.3 (2023-11-27)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.14.2 (2023-11-21)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.14.1 (2023-11-20)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.14.0 (2023-11-17)
* **Feature**: **BREAKING CHANGE** Correct nullability of a large number of S3 structure fields. See https://github.com/aws/aws-sdk-go-v2/issues/2162.
* **Dependency Update**: Updated to the latest SDK module versions
# v1.13.9 (2023-11-16)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.13.8 (2023-11-15)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.13.7 (2023-11-14)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.13.6 (2023-11-13)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.13.5 (2023-11-09.2)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.13.4 (2023-11-09)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.13.3 (2023-11-07)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.13.2 (2023-11-06)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.13.1 (2023-11-02)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.13.0 (2023-11-01)
* **Feature**: Adds support for configured endpoints via environment variables and the AWS shared configuration file.
* **Dependency Update**: Updated to the latest SDK module versions
# v1.12.0 (2023-10-31)
* **Feature**: **BREAKING CHANGE**: Bump minimum go version to 1.19 per the revised [go version support policy](https://aws.amazon.com/blogs/developer/aws-sdk-for-go-aligns-with-go-release-policy-on-supported-runtimes/).
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.92 (2023-10-24)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.91 (2023-10-16)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.90 (2023-10-12)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.89 (2023-10-06)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.88 (2023-10-02)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.87 (2023-09-26)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.86 (2023-09-22)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.85 (2023-09-20)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.84 (2023-09-18)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.83 (2023-09-05)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.82 (2023-08-31)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.81 (2023-08-23)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.80 (2023-08-21)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.79 (2023-08-18)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.78 (2023-08-17)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.77 (2023-08-07)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.76 (2023-08-01)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.75 (2023-07-31)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.74 (2023-07-28)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.73 (2023-07-25)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.72 (2023-07-13)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.71 (2023-06-28)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.70 (2023-06-16)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.69 (2023-06-15)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.68 (2023-06-13)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.67 (2023-05-09)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.66 (2023-05-08)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.65 (2023-05-04)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.64 (2023-04-24)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.63 (2023-04-19)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.62 (2023-04-10)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.61 (2023-04-07)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.60 (2023-03-31)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.59 (2023-03-21)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.58 (2023-03-16)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.57 (2023-03-14)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.56 (2023-03-10)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.55 (2023-02-22)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.54 (2023-02-20)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.53 (2023-02-15)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.52 (2023-02-14)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.51 (2023-02-03)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.50 (2023-02-01)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.49 (2023-01-25)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.48 (2023-01-23)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.47 (2023-01-05)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.46 (2022-12-20)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.45 (2022-12-19)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.44 (2022-12-15)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.43 (2022-12-02)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.42 (2022-11-22)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.41 (2022-11-17)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.40 (2022-11-16)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.39 (2022-11-11)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.38 (2022-11-10)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.37 (2022-10-24)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.36 (2022-10-21)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.35 (2022-10-19)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.34 (2022-09-30)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.33 (2022-09-20)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.32 (2022-09-14)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.31 (2022-09-02)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.30 (2022-08-31)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.29 (2022-08-30)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.28 (2022-08-29)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.27 (2022-08-15)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.26 (2022-08-14)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.25 (2022-08-11)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.24 (2022-08-10)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.23 (2022-08-09)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.22 (2022-08-08)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.21 (2022-08-01)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.20 (2022-07-11)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.19 (2022-07-05)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.18 (2022-07-01)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.17 (2022-06-29)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.16 (2022-06-16)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.15 (2022-06-07)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.14 (2022-05-26)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.13 (2022-05-25)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.12 (2022-05-17)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.11 (2022-05-16)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.10 (2022-05-09)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.9 (2022-05-06)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.8 (2022-05-03)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.7 (2022-04-27)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.6 (2022-04-25)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.5 (2022-04-12)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.4 (2022-04-07)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.3 (2022-03-30)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.2 (2022-03-24)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.1 (2022-03-23)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.11.0 (2022-03-08)
* **Feature**: Updated `github.com/aws/smithy-go` to latest version
* **Dependency Update**: Updated to the latest SDK module versions
# v1.10.0 (2022-02-24)
* **Feature**: Updated `github.com/aws/smithy-go` to latest version
* **Dependency Update**: Updated to the latest SDK module versions
# v1.9.1 (2022-01-28)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.9.0 (2022-01-14)
* **Feature**: Updated `github.com/aws/smithy-go` to latest version
* **Dependency Update**: Updated to the latest SDK module versions
# v1.8.0 (2022-01-07)
* **Feature**: Updated `github.com/aws/smithy-go` to latest version
* **Dependency Update**: Updated to the latest SDK module versions
# v1.7.5 (2021-12-21)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.7.4 (2021-12-02)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.7.3 (2021-11-30)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.7.2 (2021-11-19)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.7.1 (2021-11-12)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.7.0 (2021-11-06)
* **Feature**: The SDK now supports configuration of FIPS and DualStack endpoints using environment variables, shared configuration, or programmatically.
* **Feature**: Updated `github.com/aws/smithy-go` to latest version
* **Dependency Update**: Updated to the latest SDK module versions
# v1.6.0 (2021-10-21)
* **Feature**: Updated to latest version
* **Dependency Update**: Updated to the latest SDK module versions
# v1.5.4 (2021-10-11)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.5.3 (2021-09-17)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.5.2 (2021-09-10)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.5.1 (2021-09-02)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.5.0 (2021-08-27)
* **Feature**: Updated `github.com/aws/smithy-go` to latest version
* **Dependency Update**: Updated to the latest SDK module versions
# v1.4.1 (2021-08-19)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.4.0 (2021-08-04)
* **Feature**: Adds error handling for deferred close calls
* **Dependency Update**: Updated `github.com/aws/smithy-go` to latest version.
* **Dependency Update**: Updated to the latest SDK module versions
# v1.3.2 (2021-07-15)
* **Dependency Update**: Updated `github.com/aws/smithy-go` to latest version
* **Dependency Update**: Updated to the latest SDK module versions
# v1.3.1 (2021-07-01)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.3.0 (2021-06-25)
* **Feature**: Updated `github.com/aws/smithy-go` to latest version
* **Dependency Update**: Updated to the latest SDK module versions
# v1.2.3 (2021-06-04)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.2.2 (2021-05-25)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.2.1 (2021-05-20)
* **Dependency Update**: Updated to the latest SDK module versions
# v1.2.0 (2021-05-14)
* **Feature**: Constant has been added to modules to enable runtime version inspection for reporting.
* **Dependency Update**: Updated to the latest SDK module versions


@@ -0,0 +1,202 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.


@@ -0,0 +1,37 @@
package manager
import (
"context"
"github.com/aws/aws-sdk-go-v2/service/s3"
)
// DeleteObjectsAPIClient is an S3 API client that can invoke the DeleteObjects operation.
type DeleteObjectsAPIClient interface {
DeleteObjects(context.Context, *s3.DeleteObjectsInput, ...func(*s3.Options)) (*s3.DeleteObjectsOutput, error)
}
// DownloadAPIClient is an S3 API client that can invoke the GetObject operation.
type DownloadAPIClient interface {
GetObject(context.Context, *s3.GetObjectInput, ...func(*s3.Options)) (*s3.GetObjectOutput, error)
}
// HeadBucketAPIClient is an S3 API client that can invoke the HeadBucket operation.
type HeadBucketAPIClient interface {
HeadBucket(context.Context, *s3.HeadBucketInput, ...func(*s3.Options)) (*s3.HeadBucketOutput, error)
}
// ListObjectsV2APIClient is an S3 API client that can invoke the ListObjectsV2 operation.
type ListObjectsV2APIClient interface {
ListObjectsV2(context.Context, *s3.ListObjectsV2Input, ...func(*s3.Options)) (*s3.ListObjectsV2Output, error)
}
// UploadAPIClient is an S3 API client that can invoke PutObject, UploadPart, CreateMultipartUpload,
// CompleteMultipartUpload, and AbortMultipartUpload operations.
type UploadAPIClient interface {
PutObject(context.Context, *s3.PutObjectInput, ...func(*s3.Options)) (*s3.PutObjectOutput, error)
UploadPart(context.Context, *s3.UploadPartInput, ...func(*s3.Options)) (*s3.UploadPartOutput, error)
CreateMultipartUpload(context.Context, *s3.CreateMultipartUploadInput, ...func(*s3.Options)) (*s3.CreateMultipartUploadOutput, error)
CompleteMultipartUpload(context.Context, *s3.CompleteMultipartUploadInput, ...func(*s3.Options)) (*s3.CompleteMultipartUploadOutput, error)
AbortMultipartUpload(context.Context, *s3.AbortMultipartUploadInput, ...func(*s3.Options)) (*s3.AbortMultipartUploadOutput, error)
}
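Because the transfer manager depends only on these narrow interfaces rather than on *s3.Client directly, callers can substitute test doubles. A minimal, hypothetical sketch of a stub that satisfies DownloadAPIClient (the stub type and canned payload are illustrative, not part of the SDK):

package manager_test

import (
    "context"
    "io"
    "strings"

    "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
    "github.com/aws/aws-sdk-go-v2/service/s3"
)

// stubGetObjectClient returns a fixed payload instead of calling S3.
type stubGetObjectClient struct{ body string }

func (c stubGetObjectClient) GetObject(ctx context.Context, in *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) {
    n := int64(len(c.body))
    return &s3.GetObjectOutput{
        Body:          io.NopCloser(strings.NewReader(c.body)),
        ContentLength: &n, // pointer type per the v1.14.0 nullability change
    }, nil
}

// Compile-time check that the stub satisfies the manager's interface.
var _ manager.DownloadAPIClient = stubGetObjectClient{}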


@@ -0,0 +1,23 @@
package manager
import (
"fmt"
"github.com/aws/aws-sdk-go-v2/aws/arn"
)
func validateSupportedARNType(bucket string) error {
if !arn.IsARN(bucket) {
return nil
}
parsedARN, err := arn.Parse(bucket)
if err != nil {
return err
}
if parsedARN.Service == "s3-object-lambda" {
return fmt.Errorf("manager does not support s3-object-lambda service ARNs")
}
return nil
}
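For reference, the aws/arn helpers used above distinguish plain bucket names from ARNs and expose the service component that validateSupportedARNType checks; a small sketch (the ARNs below are made-up examples):

package main

import (
    "fmt"

    "github.com/aws/aws-sdk-go-v2/aws/arn"
)

func main() {
    // A plain bucket name is not an ARN, so validation passes it through.
    fmt.Println(arn.IsARN("my-bucket")) // false

    // An S3 access point ARN parses with Service "s3" and is supported.
    ap, _ := arn.Parse("arn:aws:s3:us-west-2:123456789012:accesspoint/my-ap")
    fmt.Println(ap.Service) // s3

    // An S3 Object Lambda ARN parses with Service "s3-object-lambda",
    // which the manager rejects.
    ol, _ := arn.Parse("arn:aws:s3-object-lambda:us-west-2:123456789012:accesspoint/my-olap")
    fmt.Println(ol.Service) // s3-object-lambda
}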


@@ -0,0 +1,138 @@
package manager
import (
"context"
"errors"
"fmt"
"net/http"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/smithy-go/middleware"
smithyhttp "github.com/aws/smithy-go/transport/http"
)
const bucketRegionHeader = "X-Amz-Bucket-Region"
// GetBucketRegion will attempt to get the region for a bucket using the
// client's configured region to determine which AWS partition to perform the query on.
//
// The request will not be signed, and will not use your AWS credentials.
//
// A BucketNotFound error will be returned if the bucket does not exist in the
// AWS partition the client region belongs to.
//
// For example to get the region of a bucket which exists in "eu-central-1"
// you could provide a region hint of "us-west-2".
//
// cfg, err := config.LoadDefaultConfig(context.TODO())
// if err != nil {
// log.Println("error:", err)
// return
// }
//
// bucket := "my-bucket"
// region, err := manager.GetBucketRegion(ctx, s3.NewFromConfig(cfg), bucket)
// if err != nil {
// var bnf manager.BucketNotFound
// if errors.As(err, &bnf) {
// fmt.Fprintf(os.Stderr, "unable to find bucket %s's region\n", bucket)
// }
// return
// }
// fmt.Printf("Bucket %s is in %s region\n", bucket, region)
//
// By default the request will be made to the Amazon S3 endpoint using the virtual-hosted-style addressing.
//
// bucketname.s3.us-west-2.amazonaws.com/
//
// To configure the GetBucketRegion to make a request via the Amazon
// S3 FIPS endpoints directly when a FIPS region name is not available, (e.g.
// fips-us-gov-west-1) set the EndpointResolver on the config or client the
// utility is called with.
//
// cfg, err := config.LoadDefaultConfig(context.TODO(),
// config.WithEndpointResolver(
// aws.EndpointResolverFunc(func(service, region string) (aws.Endpoint, error) {
// return aws.Endpoint{URL: "https://s3-fips.us-west-2.amazonaws.com"}, nil
// }),
// )
// if err != nil {
// panic(err)
// }
func GetBucketRegion(ctx context.Context, client HeadBucketAPIClient, bucket string, optFns ...func(*s3.Options)) (string, error) {
var captureBucketRegion deserializeBucketRegion
clientOptionFns := make([]func(*s3.Options), len(optFns)+1)
clientOptionFns[0] = func(options *s3.Options) {
options.APIOptions = append(options.APIOptions, captureBucketRegion.RegisterMiddleware)
}
copy(clientOptionFns[1:], optFns)
_, err := client.HeadBucket(ctx, &s3.HeadBucketInput{
Bucket: aws.String(bucket),
}, clientOptionFns...)
if len(captureBucketRegion.BucketRegion) == 0 && err != nil {
var httpStatusErr interface {
HTTPStatusCode() int
}
if !errors.As(err, &httpStatusErr) {
return "", err
}
if httpStatusErr.HTTPStatusCode() == http.StatusNotFound {
return "", &bucketNotFound{}
}
return "", err
}
return captureBucketRegion.BucketRegion, nil
}
type deserializeBucketRegion struct {
BucketRegion string
}
func (d *deserializeBucketRegion) RegisterMiddleware(stack *middleware.Stack) error {
return stack.Deserialize.Add(d, middleware.After)
}
func (d *deserializeBucketRegion) ID() string {
return "DeserializeBucketRegion"
}
func (d *deserializeBucketRegion) HandleDeserialize(ctx context.Context, in middleware.DeserializeInput, next middleware.DeserializeHandler) (
out middleware.DeserializeOutput, metadata middleware.Metadata, err error,
) {
out, metadata, err = next.HandleDeserialize(ctx, in)
if err != nil {
return out, metadata, err
}
resp, ok := out.RawResponse.(*smithyhttp.Response)
if !ok {
return out, metadata, fmt.Errorf("unknown transport type %T", out.RawResponse)
}
d.BucketRegion = resp.Header.Get(bucketRegionHeader)
return out, metadata, err
}
// BucketNotFound indicates the bucket was not found in the partition when calling GetBucketRegion.
type BucketNotFound interface {
error
isBucketNotFound()
}
type bucketNotFound struct{}
func (b *bucketNotFound) Error() string {
return "bucket not found"
}
func (b *bucketNotFound) isBucketNotFound() {}
var _ BucketNotFound = (*bucketNotFound)(nil)


@@ -0,0 +1,79 @@
package manager
import (
"io"
)
// BufferedReadSeeker is a buffered io.ReadSeeker.
type BufferedReadSeeker struct {
r io.ReadSeeker
buffer []byte
readIdx, writeIdx int
}
// NewBufferedReadSeeker returns a new BufferedReadSeeker.
// If len(b) == 0 then the buffer will be initialized to 64 KiB.
func NewBufferedReadSeeker(r io.ReadSeeker, b []byte) *BufferedReadSeeker {
if len(b) == 0 {
b = make([]byte, 64*1024)
}
return &BufferedReadSeeker{r: r, buffer: b}
}
func (b *BufferedReadSeeker) reset(r io.ReadSeeker) {
b.r = r
b.readIdx, b.writeIdx = 0, 0
}
// Read will read up to len(p) bytes into p and will return
// the number of bytes read and any error that occurred.
// If len(p) > the buffer size then a single read request
// will be issued to the underlying io.ReadSeeker for len(p) bytes.
// A Read request will at most perform a single Read to the underlying
// io.ReadSeeker, and may return < len(p) if serviced from the buffer.
func (b *BufferedReadSeeker) Read(p []byte) (n int, err error) {
if len(p) == 0 {
return n, err
}
if b.readIdx == b.writeIdx {
if len(p) >= len(b.buffer) {
n, err = b.r.Read(p)
return n, err
}
b.readIdx, b.writeIdx = 0, 0
n, err = b.r.Read(b.buffer)
if n == 0 {
return n, err
}
b.writeIdx += n
}
n = copy(p, b.buffer[b.readIdx:b.writeIdx])
b.readIdx += n
return n, err
}
// Seek will position the underlying io.ReadSeeker to the given offset
// and will clear the buffer.
func (b *BufferedReadSeeker) Seek(offset int64, whence int) (int64, error) {
n, err := b.r.Seek(offset, whence)
b.reset(b.r)
return n, err
}
// ReadAt will read up to len(p) bytes at the given file offset.
// This will result in the buffer being cleared.
func (b *BufferedReadSeeker) ReadAt(p []byte, off int64) (int, error) {
_, err := b.Seek(off, io.SeekStart)
if err != nil {
return 0, err
}
return b.Read(p)
}
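A brief sketch of how the buffered reader behaves when wrapping an in-memory io.ReadSeeker (the buffer size and data are arbitrary):

package main

import (
    "fmt"
    "io"
    "strings"

    "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
)

func main() {
    src := strings.NewReader("hello, buffered world")

    // Use a 16-byte buffer; passing nil falls back to the 64 KiB default.
    brs := manager.NewBufferedReadSeeker(src, make([]byte, 16))

    p := make([]byte, 5)
    n, _ := brs.Read(p) // served through the internal buffer
    fmt.Println(string(p[:n]))

    // Seeking repositions the underlying reader and clears the buffer.
    if _, err := brs.Seek(0, io.SeekStart); err != nil {
        panic(err)
    }
    n, _ = brs.Read(p)
    fmt.Println(string(p[:n]))
}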


@@ -0,0 +1,8 @@
//go:build !windows
// +build !windows
package manager
func defaultUploadBufferProvider() ReadSeekerWriteToProvider {
return nil
}


@@ -0,0 +1,5 @@
package manager
func defaultUploadBufferProvider() ReadSeekerWriteToProvider {
return NewBufferedReadSeekerWriteToPool(1024 * 1024)
}


@@ -0,0 +1,8 @@
//go:build !windows
// +build !windows
package manager
func defaultDownloadBufferProvider() WriterReadFromProvider {
return nil
}


@@ -0,0 +1,5 @@
package manager
func defaultDownloadBufferProvider() WriterReadFromProvider {
return NewPooledBufferedWriterReadFromProvider(1024 * 1024)
}
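The small files above pick platform defaults: on Windows the upload and download providers use pooled 1 MiB buffers, while other platforms default to nil (no extra buffering). Either default can be overridden per Downloader; a hedged sketch (the 5 MiB size is an arbitrary choice):

package main

import (
    "context"
    "log"

    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
    "github.com/aws/aws-sdk-go-v2/service/s3"
)

func main() {
    cfg, err := config.LoadDefaultConfig(context.TODO())
    if err != nil {
        log.Fatal(err)
    }

    // Replace the platform default with a pooled 5 MiB read-from provider.
    downloader := manager.NewDownloader(s3.NewFromConfig(cfg), func(d *manager.Downloader) {
        d.BufferProvider = manager.NewPooledBufferedWriterReadFromProvider(5 * 1024 * 1024)
    })
    _ = downloader
}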


@@ -0,0 +1,3 @@
// Package manager provides utilities to upload and download objects from
// S3 concurrently. Helpful for when working with large objects.
package manager
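A minimal end-to-end sketch of the concurrent download path this package provides (the bucket, key, and file name are placeholders):

package main

import (
    "context"
    "log"
    "os"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
    "github.com/aws/aws-sdk-go-v2/service/s3"
)

func main() {
    cfg, err := config.LoadDefaultConfig(context.TODO())
    if err != nil {
        log.Fatal(err)
    }

    f, err := os.Create("large-object.bin")
    if err != nil {
        log.Fatal(err)
    }
    defer f.Close()

    // os.File satisfies io.WriterAt, so parts can be written concurrently.
    downloader := manager.NewDownloader(s3.NewFromConfig(cfg))
    n, err := downloader.Download(context.TODO(), f, &s3.GetObjectInput{
        Bucket: aws.String("my-bucket"),
        Key:    aws.String("large-object.bin"),
    })
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("downloaded %d bytes", n)
}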


@@ -0,0 +1,525 @@
package manager
import (
"context"
"errors"
"fmt"
"io"
"net/http"
"strconv"
"strings"
"sync"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/aws/middleware"
"github.com/aws/aws-sdk-go-v2/internal/awsutil"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/smithy-go/logging"
)
const userAgentKey = "s3-transfer"
// DefaultDownloadPartSize is the default range of bytes to get at a time when
// using Download().
const DefaultDownloadPartSize = 1024 * 1024 * 5
// DefaultDownloadConcurrency is the default number of goroutines to spin up
// when using Download().
const DefaultDownloadConcurrency = 5
// DefaultPartBodyMaxRetries is the default number of retries to make when a part fails to download.
const DefaultPartBodyMaxRetries = 3
type errReadingBody struct {
err error
}
func (e *errReadingBody) Error() string {
return fmt.Sprintf("failed to read part body: %v", e.err)
}
func (e *errReadingBody) Unwrap() error {
return e.err
}
// Downloader holds the configuration for downloading objects with Download(). It is
// safe to call Download() on this structure for multiple objects and across
// concurrent goroutines. Mutating the Downloader's properties concurrently is not safe.
type Downloader struct {
// The size (in bytes) to request from S3 for each part.
// The minimum allowed part size is 5MB, and if this value is set to zero,
// the DefaultDownloadPartSize value will be used.
//
// PartSize is ignored if the Range input parameter is provided.
PartSize int64
// PartBodyMaxRetries is the number of retry attempts to make for failed part downloads.
PartBodyMaxRetries int
// Logger to send logging messages to
Logger logging.Logger
// Enable Logging of part download retry attempts
LogInterruptedDownloads bool
// The number of goroutines to spin up in parallel when sending parts.
// If this is set to zero, the DefaultDownloadConcurrency value will be used.
//
// Concurrency of 1 will download the parts sequentially.
//
// Concurrency is ignored if the Range input parameter is provided.
Concurrency int
// An S3 client to use when performing downloads.
S3 DownloadAPIClient
// List of client options that will be passed down to individual API
// operation requests made by the downloader.
ClientOptions []func(*s3.Options)
// Defines the buffer strategy used when downloading a part.
//
// If a WriterReadFromProvider is given the Download manager
// will pass the io.WriterAt of the Download request to the provider
// and will use the returned WriterReadFrom from the provider as the
// destination writer when copying from http response body.
BufferProvider WriterReadFromProvider
}
// WithDownloaderClientOptions appends to the Downloader's API request options.
func WithDownloaderClientOptions(opts ...func(*s3.Options)) func(*Downloader) {
return func(d *Downloader) {
d.ClientOptions = append(d.ClientOptions, opts...)
}
}
// NewDownloader creates a new Downloader instance to download objects from
// S3 in concurrent chunks. Pass in additional functional options to customize
// the downloader behavior. Requires an S3 API client that satisfies the
// DownloadAPIClient interface, such as the client returned by s3.NewFromConfig.
//
// Example:
//
// // Load AWS Config
// cfg, err := config.LoadDefaultConfig(context.TODO())
// if err != nil {
// panic(err)
// }
//
// // Create an S3 client using the loaded configuration
// s3.NewFromConfig(cfg)
//
// // Create a downloader passing it the S3 client
// downloader := manager.NewDownloader(s3.NewFromConfig(cfg))
//
// // Create a downloader with the client and custom downloader options
// downloader := manager.NewDownloader(client, func(d *manager.Downloader) {
// d.PartSize = 64 * 1024 * 1024 // 64MB per part
// })
func NewDownloader(c DownloadAPIClient, options ...func(*Downloader)) *Downloader {
d := &Downloader{
S3: c,
PartSize: DefaultDownloadPartSize,
PartBodyMaxRetries: DefaultPartBodyMaxRetries,
Concurrency: DefaultDownloadConcurrency,
BufferProvider: defaultDownloadBufferProvider(),
}
for _, option := range options {
option(d)
}
return d
}
// Download downloads an object in S3 and writes the payload into w
// using concurrent GET requests. The n int64 returned is the size of the object downloaded
// in bytes.
//
// The Context must not be nil; a nil Context will cause a panic. Use the
// Context to add deadlines, timeouts, etc. Download may create sub-contexts
// for individual underlying requests.
//
// Additional functional options can be provided to configure the individual
// download. These options are copies of the Downloader instance Download is
// called from. Modifying the options will not impact the original Downloader
// instance. Use the WithDownloaderClientOptions helper function to pass in request
// options that will be applied to all API operations made with this downloader.
//
// The w io.WriterAt can be satisfied by an os.File to do multipart concurrent
// downloads, or by an in-memory []byte wrapper using manager.WriteAtBuffer. When
// downloading files into memory, do not forget to pre-allocate memory to avoid
// additional allocations and GC runs.
//
// Example:
//
// // pre-allocate in memory buffer, where headObject type is *s3.HeadObjectOutput
// buf := make([]byte, int(aws.ToInt64(headObject.ContentLength)))
// // wrap with manager.WriteAtBuffer
// w := manager.NewWriteAtBuffer(buf)
// // download file into the memory
// numBytesDownloaded, err := downloader.Download(ctx, w, &s3.GetObjectInput{
// Bucket: aws.String(bucket),
// Key: aws.String(item),
// })
//
// Specifying a Downloader.Concurrency of 1 will cause the Downloader to
// download the parts from S3 sequentially.
//
// It is safe to call this method concurrently across goroutines.
//
// If the GetObjectInput's Range value is provided, the downloader will perform a
// single GetObject request for that object's range. This causes the part size
// and concurrency configurations to be ignored.
func (d Downloader) Download(ctx context.Context, w io.WriterAt, input *s3.GetObjectInput, options ...func(*Downloader)) (n int64, err error) {
if err := validateSupportedARNType(aws.ToString(input.Bucket)); err != nil {
return 0, err
}
impl := downloader{w: w, in: input, cfg: d, ctx: ctx}
// Copy ClientOptions
clientOptions := make([]func(*s3.Options), 0, len(impl.cfg.ClientOptions)+1)
clientOptions = append(clientOptions, func(o *s3.Options) {
o.APIOptions = append(o.APIOptions, middleware.AddSDKAgentKey(middleware.FeatureMetadata, userAgentKey))
})
clientOptions = append(clientOptions, impl.cfg.ClientOptions...)
impl.cfg.ClientOptions = clientOptions
for _, option := range options {
option(&impl.cfg)
}
// Ensures we don't need nil checks later on
impl.cfg.Logger = logging.WithContext(ctx, impl.cfg.Logger)
impl.partBodyMaxRetries = d.PartBodyMaxRetries
impl.totalBytes = -1
if impl.cfg.Concurrency == 0 {
impl.cfg.Concurrency = DefaultDownloadConcurrency
}
if impl.cfg.PartSize == 0 {
impl.cfg.PartSize = DefaultDownloadPartSize
}
return impl.download()
}
// downloader is the implementation structure used internally by Downloader.
type downloader struct {
ctx context.Context
cfg Downloader
in *s3.GetObjectInput
w io.WriterAt
wg sync.WaitGroup
m sync.Mutex
pos int64
totalBytes int64
written int64
err error
partBodyMaxRetries int
}
// download performs the implementation of the object download across ranged
// GETs.
func (d *downloader) download() (n int64, err error) {
// If a range is specified, fall back to a single download of that range.
// This enables ranged gets with the downloader, but at the cost of losing
// multipart downloads.
if rng := aws.ToString(d.in.Range); len(rng) > 0 {
d.downloadRange(rng)
return d.written, d.err
}
// Spin off first worker to check additional header information
d.getChunk()
if total := d.getTotalBytes(); total >= 0 {
// Spin up workers
ch := make(chan dlchunk, d.cfg.Concurrency)
for i := 0; i < d.cfg.Concurrency; i++ {
d.wg.Add(1)
go d.downloadPart(ch)
}
// Assign work
for d.getErr() == nil {
if d.pos >= total {
break // We're finished queuing chunks
}
// Queue the next range of bytes to read.
ch <- dlchunk{w: d.w, start: d.pos, size: d.cfg.PartSize}
d.pos += d.cfg.PartSize
}
// Wait for completion
close(ch)
d.wg.Wait()
} else {
// Checking if we read anything new
for d.err == nil {
d.getChunk()
}
// We expect a 416 error letting us know we are done downloading the
// total bytes. Since we do not know the content's length, this will
// keep grabbing chunks of data until the range of bytes specified in
// the request is out of range of the content. Once this happens, a
// 416 should occur.
var responseError interface {
HTTPStatusCode() int
}
if errors.As(d.err, &responseError) {
if responseError.HTTPStatusCode() == http.StatusRequestedRangeNotSatisfiable {
d.err = nil
}
}
}
// Return error
return d.written, d.err
}
// downloadPart is an individual goroutine worker reading from the ch channel
// and performing a GetObject request on the data with a given byte range.
//
// If this is the first worker, this operation also resolves the total number
// of bytes to be read so that the worker manager knows when it is finished.
func (d *downloader) downloadPart(ch chan dlchunk) {
defer d.wg.Done()
for {
chunk, ok := <-ch
if !ok {
break
}
if d.getErr() != nil {
// Drain the channel if there is an error, to prevent deadlocking
// of download producer.
continue
}
if err := d.downloadChunk(chunk); err != nil {
d.setErr(err)
}
}
}
// getChunk grabs a chunk of data from the body.
// Not thread safe. Should only be used when grabbing data on a single thread.
func (d *downloader) getChunk() {
if d.getErr() != nil {
return
}
chunk := dlchunk{w: d.w, start: d.pos, size: d.cfg.PartSize}
d.pos += d.cfg.PartSize
if err := d.downloadChunk(chunk); err != nil {
d.setErr(err)
}
}
// downloadRange downloads an Object given the passed in Byte-Range value.
// The chunk used to download the range will be configured for that range.
func (d *downloader) downloadRange(rng string) {
if d.getErr() != nil {
return
}
chunk := dlchunk{w: d.w, start: d.pos}
// Ranges specified will short circuit the multipart download
chunk.withRange = rng
if err := d.downloadChunk(chunk); err != nil {
d.setErr(err)
}
// Update the position based on the amount of data received.
d.pos = d.written
}
// downloadChunk downloads the chunk from s3
func (d *downloader) downloadChunk(chunk dlchunk) error {
var params s3.GetObjectInput
awsutil.Copy(&params, d.in)
// Get the next byte range of data
params.Range = aws.String(chunk.ByteRange())
var n int64
var err error
for retry := 0; retry <= d.partBodyMaxRetries; retry++ {
n, err = d.tryDownloadChunk(&params, &chunk)
if err == nil {
break
}
// Check if the returned error is an errReadingBody.
// If err is errReadingBody this indicates that an error
// occurred while copying the http response body.
// If this occurs we unwrap the err to set the underlying error
// and attempt any remaining retries.
if bodyErr, ok := err.(*errReadingBody); ok {
err = bodyErr.Unwrap()
} else {
return err
}
chunk.cur = 0
d.cfg.Logger.Logf(logging.Debug,
"object part body download interrupted %s, err, %v, retrying attempt %d",
aws.ToString(params.Key), err, retry)
}
d.incrWritten(n)
return err
}
func (d *downloader) tryDownloadChunk(params *s3.GetObjectInput, w io.Writer) (int64, error) {
cleanup := func() {}
if d.cfg.BufferProvider != nil {
w, cleanup = d.cfg.BufferProvider.GetReadFrom(w)
}
defer cleanup()
resp, err := d.cfg.S3.GetObject(d.ctx, params, d.cfg.ClientOptions...)
if err != nil {
return 0, err
}
d.setTotalBytes(resp) // Set total if not yet set.
var src io.Reader = resp.Body
if d.cfg.BufferProvider != nil {
src = &suppressWriterAt{suppressed: src}
}
n, err := io.Copy(w, src)
resp.Body.Close()
if err != nil {
return n, &errReadingBody{err: err}
}
return n, nil
}
// getTotalBytes is a thread-safe getter for retrieving the total byte status.
func (d *downloader) getTotalBytes() int64 {
d.m.Lock()
defer d.m.Unlock()
return d.totalBytes
}
// setTotalBytes is a thread-safe setter for the total byte status. It
// extracts the object's total size from the Content-Range header when the
// download is chunked, and falls back to Content-Length when the response
// has no Content-Range, meaning the object was not chunked because the full
// file fits within the PartSize directive.
func (d *downloader) setTotalBytes(resp *s3.GetObjectOutput) {
d.m.Lock()
defer d.m.Unlock()
if d.totalBytes >= 0 {
return
}
if resp.ContentRange == nil {
// ContentRange is nil when the full file contents are provided, and
// is not chunked. Use ContentLength instead.
if aws.ToInt64(resp.ContentLength) > 0 {
d.totalBytes = aws.ToInt64(resp.ContentLength)
return
}
} else {
parts := strings.Split(*resp.ContentRange, "/")
total := int64(-1)
var err error
// Checking for whether or not a numbered total exists
// If one does not exist, we will assume the total to be -1, undefined,
// and sequentially download each chunk until hitting a 416 error
totalStr := parts[len(parts)-1]
if totalStr != "*" {
total, err = strconv.ParseInt(totalStr, 10, 64)
if err != nil {
d.err = err
return
}
}
d.totalBytes = total
}
}
func (d *downloader) incrWritten(n int64) {
d.m.Lock()
defer d.m.Unlock()
d.written += n
}
// getErr is a thread-safe getter for the error object
func (d *downloader) getErr() error {
d.m.Lock()
defer d.m.Unlock()
return d.err
}
// setErr is a thread-safe setter for the error object
func (d *downloader) setErr(e error) {
d.m.Lock()
defer d.m.Unlock()
d.err = e
}
// dlchunk represents a single chunk of data to write by the worker routine.
// This structure also implements an io.SectionReader style interface for
// io.WriterAt, effectively making it an io.SectionWriter (which does not
// exist).
type dlchunk struct {
w io.WriterAt
start int64
size int64
cur int64
// specifies the byte range the chunk should be downloaded with.
withRange string
}
// Write wraps io.WriterAt for the dlchunk, writing from the dlchunk's start
// position to its end (or EOF).
//
// If a range is specified on the dlchunk the size will be ignored when writing,
// as the total size may not be known ahead of time.
func (c *dlchunk) Write(p []byte) (n int, err error) {
if c.cur >= c.size && len(c.withRange) == 0 {
return 0, io.EOF
}
n, err = c.w.WriteAt(p, c.start+c.cur)
c.cur += int64(n)
return
}
// ByteRange returns a HTTP Byte-Range header value that should be used by the
// client to request the chunk's range.
func (c *dlchunk) ByteRange() string {
if len(c.withRange) != 0 {
return c.withRange
}
return fmt.Sprintf("bytes=%d-%d", c.start, c.start+c.size-1)
}
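As the Download documentation above notes, supplying a Range on the input short-circuits the multipart logic and issues a single GetObject for just that range; a hedged sketch (bucket and key are placeholders):

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
    "github.com/aws/aws-sdk-go-v2/service/s3"
)

func main() {
    cfg, err := config.LoadDefaultConfig(context.TODO())
    if err != nil {
        log.Fatal(err)
    }
    downloader := manager.NewDownloader(s3.NewFromConfig(cfg))

    // Pre-allocate a buffer for the requested range and wrap it so it
    // satisfies io.WriterAt.
    w := manager.NewWriteAtBuffer(make([]byte, 1024*1024))

    // Because Range is set, the downloader issues a single GetObject for
    // this byte range; PartSize and Concurrency are ignored.
    n, err := downloader.Download(context.TODO(), w, &s3.GetObjectInput{
        Bucket: aws.String("my-bucket"),
        Key:    aws.String("my-key"),
        Range:  aws.String("bytes=0-1048575"),
    })
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("downloaded %d bytes\n", n)
}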


@@ -0,0 +1,6 @@
// Code generated by internal/repotools/cmd/updatemodulemeta DO NOT EDIT.
package manager
// goModuleVersion is the tagged release for this module
const goModuleVersion = "1.16.9"


@@ -0,0 +1,251 @@
package manager
import (
"context"
"fmt"
"sync"
)
type byteSlicePool interface {
Get(context.Context) (*[]byte, error)
Put(*[]byte)
ModifyCapacity(int)
SliceSize() int64
Close()
}
type maxSlicePool struct {
// allocator is defined as a function pointer to allow
// for test cases to instrument custom tracers when allocations
// occur.
allocator sliceAllocator
slices chan *[]byte
allocations chan struct{}
capacityChange chan struct{}
max int
sliceSize int64
mtx sync.RWMutex
}
func newMaxSlicePool(sliceSize int64) *maxSlicePool {
p := &maxSlicePool{sliceSize: sliceSize}
p.allocator = p.newSlice
return p
}
var errZeroCapacity = fmt.Errorf("get called on zero capacity pool")
func (p *maxSlicePool) Get(ctx context.Context) (*[]byte, error) {
// check if context is canceled before attempting to get a slice
// this ensures priority is given to the cancel case first
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
p.mtx.RLock()
for {
select {
case bs, ok := <-p.slices:
p.mtx.RUnlock()
if !ok {
// attempt to get on a zero capacity pool
return nil, errZeroCapacity
}
return bs, nil
case <-ctx.Done():
p.mtx.RUnlock()
return nil, ctx.Err()
default:
// pass
}
select {
case _, ok := <-p.allocations:
p.mtx.RUnlock()
if !ok {
// attempt to get on a zero capacity pool
return nil, errZeroCapacity
}
return p.allocator(), nil
case <-ctx.Done():
p.mtx.RUnlock()
return nil, ctx.Err()
default:
// In the event that there are no slices or allocations available
// This prevents some deadlock situations that can occur around sync.RWMutex
// When a lock request occurs on ModifyCapacity, no new readers are allowed to acquire a read lock.
// By releasing the read lock here and waiting for a notification, we prevent a deadlock situation where
// Get could hold the read lock indefinitely waiting for capacity, ModifyCapacity is waiting for a write lock,
// and a Put is blocked trying to get a read-lock which is blocked by ModifyCapacity.
// Short-circuit if the pool capacity is zero.
if p.max == 0 {
p.mtx.RUnlock()
return nil, errZeroCapacity
}
// Since we will be releasing the read-lock we need to take the reference to the channel.
// Since channels are references we will still get notified if slices are added, or if
// the channel is closed due to a capacity modification. This specifically avoids a data race condition
// where ModifyCapacity both closes a channel and initializes a new one while we don't have a read-lock.
c := p.capacityChange
p.mtx.RUnlock()
select {
case _ = <-c:
p.mtx.RLock()
case <-ctx.Done():
return nil, ctx.Err()
}
}
}
}
func (p *maxSlicePool) Put(bs *[]byte) {
p.mtx.RLock()
defer p.mtx.RUnlock()
if p.max == 0 {
return
}
select {
case p.slices <- bs:
p.notifyCapacity()
default:
// If the channel is full when attempting to add the slice then we drop the slice.
// The logic here is to prevent a deadlock situation if the channel is already at max capacity.
// This allows us to reap allocations that are returned and are no longer needed.
}
}
func (p *maxSlicePool) ModifyCapacity(delta int) {
if delta == 0 {
return
}
p.mtx.Lock()
defer p.mtx.Unlock()
p.max += delta
if p.max == 0 {
p.empty()
return
}
if p.capacityChange != nil {
close(p.capacityChange)
}
p.capacityChange = make(chan struct{}, p.max)
origAllocations := p.allocations
p.allocations = make(chan struct{}, p.max)
newAllocs := len(origAllocations) + delta
for i := 0; i < newAllocs; i++ {
p.allocations <- struct{}{}
}
if origAllocations != nil {
close(origAllocations)
}
origSlices := p.slices
p.slices = make(chan *[]byte, p.max)
if origSlices == nil {
return
}
close(origSlices)
for bs := range origSlices {
select {
case p.slices <- bs:
default:
// If the new channel blocks while adding slices from the old channel
// then we drop the slice. The logic here is to prevent a deadlock situation
// if the new channel has a smaller capacity than the old.
}
}
}
func (p *maxSlicePool) notifyCapacity() {
select {
case p.capacityChange <- struct{}{}:
default:
// This *shouldn't* happen as the channel is both buffered to the max pool capacity size and is resized
// on capacity modifications. This is just a safety to ensure that a blocking situation can't occur.
}
}
func (p *maxSlicePool) SliceSize() int64 {
return p.sliceSize
}
func (p *maxSlicePool) Close() {
p.mtx.Lock()
defer p.mtx.Unlock()
p.empty()
}
func (p *maxSlicePool) empty() {
p.max = 0
if p.capacityChange != nil {
close(p.capacityChange)
p.capacityChange = nil
}
if p.allocations != nil {
close(p.allocations)
for range p.allocations {
// drain channel
}
p.allocations = nil
}
if p.slices != nil {
close(p.slices)
for range p.slices {
// drain channel
}
p.slices = nil
}
}
func (p *maxSlicePool) newSlice() *[]byte {
bs := make([]byte, p.sliceSize)
return &bs
}
type returnCapacityPoolCloser struct {
byteSlicePool
returnCapacity int
}
func (n *returnCapacityPoolCloser) ModifyCapacity(delta int) {
if delta > 0 {
n.returnCapacity = -1 * delta
}
n.byteSlicePool.ModifyCapacity(delta)
}
func (n *returnCapacityPoolCloser) Close() {
if n.returnCapacity < 0 {
n.byteSlicePool.ModifyCapacity(n.returnCapacity)
}
}
type sliceAllocator func() *[]byte
var newByteSlicePool = func(sliceSize int64) byteSlicePool {
return newMaxSlicePool(sliceSize)
}
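A rough, package-internal sketch of the pool lifecycle described in the comments above (the slice size and capacity are arbitrary, and poolSketch itself is hypothetical, not part of the SDK):

package manager

import "context"

// poolSketch is a hypothetical illustration of the byteSlicePool lifecycle.
func poolSketch(ctx context.Context) error {
    pool := newMaxSlicePool(5 * 1024 * 1024) // one part-sized buffer per slice
    pool.ModifyCapacity(3)                   // allow up to three outstanding slices
    defer pool.Close()

    bs, err := pool.Get(ctx) // blocks until a slice or allocation token is free
    if err != nil {
        return err
    }
    // ... fill *bs with a downloaded part ...
    pool.Put(bs) // return the slice for reuse; dropped if capacity has shrunk
    return nil
}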


@@ -0,0 +1,65 @@
package manager
import (
"io"
"sync"
)
// ReadSeekerWriteTo defines an interface implementing io.WriterTo and io.ReadSeeker
type ReadSeekerWriteTo interface {
io.ReadSeeker
io.WriterTo
}
// BufferedReadSeekerWriteTo wraps a BufferedReadSeeker with an io.WriterTo
// implementation.
type BufferedReadSeekerWriteTo struct {
*BufferedReadSeeker
}
// WriteTo writes to the given io.Writer from BufferedReadSeeker until there's no more data to write or
// an error occurs. Returns the number of bytes written and any error encountered during the write.
func (b *BufferedReadSeekerWriteTo) WriteTo(writer io.Writer) (int64, error) {
return io.Copy(writer, b.BufferedReadSeeker)
}
// ReadSeekerWriteToProvider provides an implementation of io.WriterTo for an io.ReadSeeker
type ReadSeekerWriteToProvider interface {
GetWriteTo(seeker io.ReadSeeker) (r ReadSeekerWriteTo, cleanup func())
}
// BufferedReadSeekerWriteToPool uses a sync.Pool to create and reuse
// []byte slices for buffering parts in memory
type BufferedReadSeekerWriteToPool struct {
pool sync.Pool
}
// NewBufferedReadSeekerWriteToPool will return a new BufferedReadSeekerWriteToPool that will create
// a pool of reusable buffers. If size is less than 64 KiB then the buffer
// will default to 64 KiB. Reason: io.Copy from writers or readers that don't support io.WriteTo or io.ReadFrom
// respectively will default to copying 32 KiB.
func NewBufferedReadSeekerWriteToPool(size int) *BufferedReadSeekerWriteToPool {
if size < 65536 {
size = 65536
}
return &BufferedReadSeekerWriteToPool{
pool: sync.Pool{New: func() interface{} {
return make([]byte, size)
}},
}
}
// GetWriteTo will wrap the provided io.ReadSeeker with a BufferedReadSeekerWriteTo.
// The provided cleanup must be called after operations have been completed on the
// returned io.ReadSeekerWriteTo in order to signal the return of resources to the pool.
func (p *BufferedReadSeekerWriteToPool) GetWriteTo(seeker io.ReadSeeker) (r ReadSeekerWriteTo, cleanup func()) {
buffer := p.pool.Get().([]byte)
r = &BufferedReadSeekerWriteTo{BufferedReadSeeker: NewBufferedReadSeeker(seeker, buffer)}
cleanup = func() {
p.pool.Put(buffer)
}
return r, cleanup
}
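// Usage sketch (illustrative, not part of the vendored source): partReader is
// assumed to be an io.ReadSeeker for one part and dst an io.Writer, both
// hypothetical names used only for this example.
//
//	provider := NewBufferedReadSeekerWriteToPool(1024 * 1024) // 1 MiB buffers
//	wt, cleanup := provider.GetWriteTo(partReader)
//	defer cleanup() // returns the buffer to the pool once the copy is done
//	if _, err := wt.WriteTo(dst); err != nil {
//		// handle error
//	}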

View file

@@ -0,0 +1,187 @@
package manager
import (
"io"
"sync"
)
// ReadSeekCloser wraps an io.Reader, returning a ReaderSeekerCloser. Allows the
// SDK to accept an io.Reader that is not also an io.Seeker for unsigned
// streaming payload API operations.
//
// A ReaderSeekerCloser wrapping a nonseekable io.Reader used in an API operation's
// input will prevent that operation from being retried in the case of
// network errors, and cause operation requests to fail if the operation
// requires payload signing.
//
// Note: If using with S3 PutObject to stream an object upload, the SDK's S3
// Upload Manager (manager.Uploader) provides support for streaming
// with the ability to retry network errors.
func ReadSeekCloser(r io.Reader) *ReaderSeekerCloser {
return &ReaderSeekerCloser{r}
}
// ReaderSeekerCloser represents a reader that can also delegate io.Seeker and
// io.Closer interfaces to the underlying object if they are available.
type ReaderSeekerCloser struct {
r io.Reader
}
// seekerLen attempts to get the number of bytes remaining at the seeker's
// current position. Returns the number of bytes remaining or an error.
func seekerLen(s io.Seeker) (int64, error) {
// Determine if the seeker is actually seekable. ReaderSeekerCloser
// hides the fact that an io.Reader might not actually be seekable.
switch v := s.(type) {
case *ReaderSeekerCloser:
return v.GetLen()
}
return computeSeekerLength(s)
}
// GetLen returns the length of the bytes remaining in the underlying reader.
// Checks first for Len(), then io.Seeker to determine the size of the
// underlying reader.
//
// Will return -1 if the length cannot be determined.
func (r *ReaderSeekerCloser) GetLen() (int64, error) {
if l, ok := r.HasLen(); ok {
return int64(l), nil
}
if s, ok := r.r.(io.Seeker); ok {
return computeSeekerLength(s)
}
return -1, nil
}
func computeSeekerLength(s io.Seeker) (int64, error) {
curOffset, err := s.Seek(0, io.SeekCurrent)
if err != nil {
return 0, err
}
endOffset, err := s.Seek(0, io.SeekEnd)
if err != nil {
return 0, err
}
_, err = s.Seek(curOffset, io.SeekStart)
if err != nil {
return 0, err
}
return endOffset - curOffset, nil
}
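// Worked example (illustrative, assumes a hypothetical file "object.bin"):
// the length is measured relative to the current offset and the offset is
// restored before returning.
//
//	f, _ := os.Open("object.bin")
//	_, _ = f.Seek(128, io.SeekStart)       // pretend 128 bytes were consumed
//	remaining, _ := computeSeekerLength(f) // total file size minus 128
//	_ = remaining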
// HasLen returns the length of the underlying reader if the value implements
// the Len() int method.
func (r *ReaderSeekerCloser) HasLen() (int, bool) {
type lenner interface {
Len() int
}
if lr, ok := r.r.(lenner); ok {
return lr.Len(), true
}
return 0, false
}
// Read reads from the reader up to the size of p. The number of bytes read,
// and any error encountered, will be returned.
//
// If the underlying reader is nil, zero bytes are read and a nil error will be
// returned.
//
// Performs the same functionality as io.Reader Read
func (r *ReaderSeekerCloser) Read(p []byte) (int, error) {
switch t := r.r.(type) {
case io.Reader:
return t.Read(p)
}
return 0, nil
}
// Seek sets the offset for the next Read to offset, interpreted according to
// whence: 0 means relative to the origin of the file, 1 means relative to the
// current offset, and 2 means relative to the end. Seek returns the new offset
// and an error, if any.
//
// If the underlying reader is not an io.Seeker, nothing will be done.
func (r *ReaderSeekerCloser) Seek(offset int64, whence int) (int64, error) {
switch t := r.r.(type) {
case io.Seeker:
return t.Seek(offset, whence)
}
return int64(0), nil
}
// IsSeeker returns if the underlying reader is also a seeker.
func (r *ReaderSeekerCloser) IsSeeker() bool {
_, ok := r.r.(io.Seeker)
return ok
}
// Close closes the ReaderSeekerCloser.
//
// If the underlying reader is not an io.Closer, nothing will be done.
func (r *ReaderSeekerCloser) Close() error {
switch t := r.r.(type) {
case io.Closer:
return t.Close()
}
return nil
}
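// Usage sketch (illustrative, not part of the vendored source): resp is a
// hypothetical *http.Response whose body is a non-seekable stream.
//
//	body := ReadSeekCloser(resp.Body)
//	defer body.Close()
//	if !body.IsSeeker() {
//		// retries and payload signing are unavailable for this input
//	}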
// A WriteAtBuffer provides an in-memory buffer supporting the io.WriterAt interface.
// Can be used with the manager.Downloader to download content to a buffer
// in memory. Safe to use concurrently.
type WriteAtBuffer struct {
buf []byte
m sync.Mutex
// GrowthCoeff defines the growth rate of the internal buffer. By
// default, the growth rate is 1, where expanding the internal
// buffer will allocate only enough capacity to fit the new expected
// length.
GrowthCoeff float64
}
// NewWriteAtBuffer creates a WriteAtBuffer with an internal buffer
// provided by buf.
func NewWriteAtBuffer(buf []byte) *WriteAtBuffer {
return &WriteAtBuffer{buf: buf}
}
// WriteAt writes a slice of bytes to the buffer starting at the position provided.
// The number of bytes written, or an error, will be returned. Previously
// written regions can be overwritten if the writes overlap.
func (b *WriteAtBuffer) WriteAt(p []byte, pos int64) (n int, err error) {
pLen := len(p)
expLen := pos + int64(pLen)
b.m.Lock()
defer b.m.Unlock()
if int64(len(b.buf)) < expLen {
if int64(cap(b.buf)) < expLen {
if b.GrowthCoeff < 1 {
b.GrowthCoeff = 1
}
newBuf := make([]byte, expLen, int64(b.GrowthCoeff*float64(expLen)))
copy(newBuf, b.buf)
b.buf = newBuf
}
b.buf = b.buf[:expLen]
}
copy(b.buf[pos:], p)
return pLen, nil
}
// Bytes returns a slice of bytes written to the buffer.
func (b *WriteAtBuffer) Bytes() []byte {
b.m.Lock()
defer b.m.Unlock()
return b.buf
}
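// Usage sketch (illustrative, not part of the vendored source): downloader is
// assumed to be a configured transfer manager Downloader, and the bucket, key,
// and ctx names are hypothetical.
//
//	buf := NewWriteAtBuffer(nil)
//	buf.GrowthCoeff = 1.5 // over-allocate on growth to reduce copies
//	_, err := downloader.Download(ctx, buf, &s3.GetObjectInput{
//		Bucket: aws.String("my-bucket"),
//		Key:    aws.String("my-key"),
//	})
//	if err == nil {
//		data := buf.Bytes() // the downloaded object in memory
//		_ = data
//	}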

View file

@@ -0,0 +1,855 @@
package manager
import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"sort"
"sync"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/aws/middleware"
"github.com/aws/aws-sdk-go-v2/internal/awsutil"
internalcontext "github.com/aws/aws-sdk-go-v2/internal/context"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
smithymiddleware "github.com/aws/smithy-go/middleware"
smithyhttp "github.com/aws/smithy-go/transport/http"
)
// MaxUploadParts is the maximum allowed number of parts in a multi-part upload
// on Amazon S3.
const MaxUploadParts int32 = 10000
// MinUploadPartSize is the minimum allowed part size when uploading a part to
// Amazon S3.
const MinUploadPartSize int64 = 1024 * 1024 * 5
// DefaultUploadPartSize is the default part size to buffer chunks of a
// payload into.
const DefaultUploadPartSize = MinUploadPartSize
// DefaultUploadConcurrency is the default number of goroutines to spin up when
// using Upload().
const DefaultUploadConcurrency = 5
// A MultiUploadFailure wraps a failed S3 multipart upload. An error returned
// will satisfy this interface when a multi part upload failed to upload all
// chunks to S3. In the case of a failure the UploadID is needed to operate on
// the chunks, if any, which were uploaded.
//
// Example:
//
// u := manager.NewUploader(client)
// output, err := u.upload(context.Background(), input)
// if err != nil {
// var multierr manager.MultiUploadFailure
// if errors.As(err, &multierr) {
// fmt.Printf("upload failure UploadID=%s, %s\n", multierr.UploadID(), multierr.Error())
// } else {
// fmt.Printf("upload failure, %s\n", err.Error())
// }
// }
type MultiUploadFailure interface {
error
// UploadID returns the upload id for the S3 multipart upload that failed.
UploadID() string
}
// A multiUploadError wraps the upload ID of a failed S3 multipart upload.
// Composed of the underlying error and the ID of the failed upload.
//
// Should be used for an error that occurred while failing an S3 multipart
// upload when an upload ID is available. If an upload ID is not available, a
// more relevant error should be returned instead.
type multiUploadError struct {
err error
// ID for multipart upload which failed.
uploadID string
}
// Error returns the string representation of the error, including the upload
// ID and the underlying cause if present.
//
// Satisfies the error interface.
func (m *multiUploadError) Error() string {
var extra string
if m.err != nil {
extra = fmt.Sprintf(", cause: %s", m.err.Error())
}
return fmt.Sprintf("upload multipart failed, upload id: %s%s", m.uploadID, extra)
}
// Unwrap returns the underlying error that caused the upload failure.
func (m *multiUploadError) Unwrap() error {
return m.err
}
// UploadID returns the id of the S3 upload which failed.
func (m *multiUploadError) UploadID() string {
return m.uploadID
}
// UploadOutput represents a response from the Upload() call.
type UploadOutput struct {
// The URL where the object was uploaded to.
Location string
// The ID for a multipart upload to S3. In the case of an error the error
// can be cast to the MultiUploadFailure interface to extract the upload ID.
// Will be empty string if multipart upload was not used, and the object
// was uploaded as a single PutObject call.
UploadID string
// The list of parts that were uploaded and their checksums. Will be empty
// if multipart upload was not used, and the object was uploaded as a
// single PutObject call.
CompletedParts []types.CompletedPart
// Indicates whether the uploaded object uses an S3 Bucket Key for server-side
// encryption with Amazon Web Services KMS (SSE-KMS).
BucketKeyEnabled bool
// The base64-encoded, 32-bit CRC32 checksum of the object.
ChecksumCRC32 *string
// The base64-encoded, 32-bit CRC32C checksum of the object.
ChecksumCRC32C *string
// The base64-encoded, 160-bit SHA-1 digest of the object.
ChecksumSHA1 *string
// The base64-encoded, 256-bit SHA-256 digest of the object.
ChecksumSHA256 *string
// Entity tag for the uploaded object.
ETag *string
// If the object expiration is configured, this will contain the expiration date
// (expiry-date) and rule ID (rule-id). The value of rule-id is URL encoded.
Expiration *string
// The object key of the newly created object.
Key *string
// If present, indicates that the requester was successfully charged for the
// request.
RequestCharged types.RequestCharged
// If present, specifies the ID of the Amazon Web Services Key Management Service
// (Amazon Web Services KMS) symmetric customer managed customer master key (CMK)
// that was used for the object.
SSEKMSKeyId *string
// If you specified server-side encryption either with an Amazon S3-managed
// encryption key or an Amazon Web Services KMS customer master key (CMK) in your
// initiate multipart upload request, the response includes this header. It
// confirms the encryption algorithm that Amazon S3 used to encrypt the object.
ServerSideEncryption types.ServerSideEncryption
// The version of the object that was uploaded. Will only be populated if
// the S3 Bucket is versioned. If the bucket is not versioned this field
// will not be set.
VersionID *string
}
// WithUploaderRequestOptions appends to the Uploader's API client options.
func WithUploaderRequestOptions(opts ...func(*s3.Options)) func(*Uploader) {
return func(u *Uploader) {
u.ClientOptions = append(u.ClientOptions, opts...)
}
}
// The Uploader structure that calls Upload(). It is safe to call Upload()
// on this structure for multiple objects and across concurrent goroutines.
// Mutating the Uploader's properties is not safe to do concurrently.
//
// # Pre-computed Checksums
//
// Care must be taken when using pre-computed checksums with the transfer upload
// manager. The format and value of the checksum differ based on whether the
// upload will be performed as a single or multipart upload.
//
// Uploads that are smaller than the Uploader's PartSize will be uploaded using
// the PutObject API operation. Pre-computed checksums of the uploaded object's
// content are valid for these single part uploads. If the checksum provided
// does not match the uploaded content the upload will fail.
//
// Uploads that are larger than the Uploader's PartSize will be uploaded using
// multi-part upload. The pre-computed checksums for these uploads are a
// checksum of the checksums of each part, not a checksum of the full uploaded
// bytes, with the format "<checksum of checksums>-<numberParts>" (e.g.
// "DUoRhQ==-3"). If a pre-computed checksum is provided that does not match
// this format, or does not match the content uploaded, the upload will fail.
//
// ContentMD5 is explicitly ignored for multipart uploads, and its value is
// suppressed.
//
// # Automatically Computed Checksums
//
// When the ChecksumAlgorithm member of Upload's input parameter PutObjectInput
// is set to a valid value, the SDK will automatically compute the checksum of
// the individual uploaded parts. The UploadOutput result from Upload will
// include the checksum of part checksums provided by S3
// CompleteMultipartUpload API call.
type Uploader struct {
// The buffer size (in bytes) to use when buffering data into chunks and
// sending them as parts to S3. The minimum allowed part size is 5MB, and
// if this value is set to zero, the DefaultUploadPartSize value will be used.
PartSize int64
// The number of goroutines to spin up in parallel per call to Upload when
// sending parts. If this is set to zero, the DefaultUploadConcurrency value
// will be used.
//
// The concurrency pool is not shared between calls to Upload.
Concurrency int
// Setting this value to true will cause the SDK to avoid calling
// AbortMultipartUpload on a failure, leaving all successfully uploaded
// parts on S3 for manual recovery.
//
// Note that storing parts of an incomplete multipart upload counts towards
// space usage on S3 and will add additional costs if not cleaned up.
LeavePartsOnError bool
// MaxUploadParts is the max number of parts which will be uploaded to S3.
// Will be used to calculate the part size of the object to be uploaded.
// E.g: a 5GB file, with MaxUploadParts set to 100, will upload the file
// as 100 parts of 50MB each. Limited to the package's MaxUploadParts (10,000 parts).
//
// MaxUploadParts must not be used to limit the total number of bytes uploaded.
// Use a type like io.LimitReader (https://golang.org/pkg/io/#LimitedReader)
// instead. An io.LimitReader is helpful when uploading an unbounded reader
// to S3, and you know its maximum size. Otherwise the reader's returned io.EOF
// error must be used to signal end of stream.
//
// Defaults to the package const MaxUploadParts value.
MaxUploadParts int32
// The client to use when uploading to S3.
S3 UploadAPIClient
// List of request options that will be passed down to individual API
// operation requests made by the uploader.
ClientOptions []func(*s3.Options)
// Defines the buffer strategy used when uploading a part
BufferProvider ReadSeekerWriteToProvider
// partPool allows for the re-usage of streaming payload part buffers between upload calls
partPool byteSlicePool
}
// NewUploader creates a new Uploader instance to upload objects to S3. Pass in
// additional functional options to customize the uploader's behavior. Requires
// an S3 API client, such as one created with s3.NewFromConfig, that satisfies
// the UploadAPIClient interface.
//
// Example:
//
// // Load AWS Config
// cfg, err := config.LoadDefaultConfig(context.TODO())
// if err != nil {
// panic(err)
// }
//
// // Create an S3 Client with the config
// client := s3.NewFromConfig(cfg)
//
// // Create an uploader passing it the client
// uploader := manager.NewUploader(client)
//
// // Create an uploader with the client and custom options
// uploader := manager.NewUploader(client, func(u *manager.Uploader) {
// u.PartSize = 64 * 1024 * 1024 // 64MB per part
// })
func NewUploader(client UploadAPIClient, options ...func(*Uploader)) *Uploader {
u := &Uploader{
S3: client,
PartSize: DefaultUploadPartSize,
Concurrency: DefaultUploadConcurrency,
LeavePartsOnError: false,
MaxUploadParts: MaxUploadParts,
BufferProvider: defaultUploadBufferProvider(),
}
for _, option := range options {
option(u)
}
u.partPool = newByteSlicePool(u.PartSize)
return u
}
// Upload uploads an object to S3, intelligently buffering large
// files into smaller chunks and sending them in parallel across multiple
// goroutines. You can configure the buffer size and concurrency through the
// Uploader parameters.
//
// Additional functional options can be provided to configure the individual
// upload. These options are applied to a copy of the Uploader instance Upload is
// called on. Modifying the options will not impact the original Uploader instance.
//
// Use the WithUploaderRequestOptions helper function to pass in request
// options that will be applied to all API operations made with this uploader.
//
// It is safe to call this method concurrently across goroutines.
func (u Uploader) Upload(ctx context.Context, input *s3.PutObjectInput, opts ...func(*Uploader)) (
*UploadOutput, error,
) {
i := uploader{in: input, cfg: u, ctx: ctx}
// Copy ClientOptions
clientOptions := make([]func(*s3.Options), 0, len(i.cfg.ClientOptions)+1)
clientOptions = append(clientOptions, func(o *s3.Options) {
o.APIOptions = append(o.APIOptions,
middleware.AddSDKAgentKey(middleware.FeatureMetadata, userAgentKey),
func(s *smithymiddleware.Stack) error {
return s.Finalize.Insert(&setS3ExpressDefaultChecksum{}, "ResolveEndpointV2", smithymiddleware.After)
},
)
})
clientOptions = append(clientOptions, i.cfg.ClientOptions...)
i.cfg.ClientOptions = clientOptions
for _, opt := range opts {
opt(&i.cfg)
}
return i.upload()
}
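// Usage sketch (illustrative, not part of the vendored source): uploader and
// ctx are assumed to be in scope, and the file, bucket, and key names are
// hypothetical. Error handling is abbreviated.
//
//	f, _ := os.Open("large-file.bin")
//	defer f.Close()
//	out, err := uploader.Upload(ctx, &s3.PutObjectInput{
//		Bucket: aws.String("my-bucket"),
//		Key:    aws.String("my-key"),
//		Body:   f,
//	}, func(u *Uploader) {
//		u.Concurrency = 10 // applies to this call only
//	})
//	if err != nil {
//		var mu MultiUploadFailure
//		if errors.As(err, &mu) {
//			// mu.UploadID() identifies the parts left behind, if any
//		}
//	}
//	_ = out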
// internal structure to manage an upload to S3.
type uploader struct {
ctx context.Context
cfg Uploader
in *s3.PutObjectInput
readerPos int64 // current reader position
totalSize int64 // set to -1 if the size is not known
}
// internal logic for deciding whether to upload a single part or use a
// multipart upload.
func (u *uploader) upload() (*UploadOutput, error) {
if err := u.init(); err != nil {
return nil, fmt.Errorf("unable to initialize upload: %w", err)
}
defer u.cfg.partPool.Close()
if u.cfg.PartSize < MinUploadPartSize {
return nil, fmt.Errorf("part size must be at least %d bytes", MinUploadPartSize)
}
// Do one read to determine if we have more than one part
reader, _, cleanup, err := u.nextReader()
if err == io.EOF { // single part
return u.singlePart(reader, cleanup)
} else if err != nil {
cleanup()
return nil, fmt.Errorf("read upload data failed: %w", err)
}
mu := multiuploader{uploader: u}
return mu.upload(reader, cleanup)
}
// init will initialize all default options.
func (u *uploader) init() error {
if err := validateSupportedARNType(aws.ToString(u.in.Bucket)); err != nil {
return err
}
if u.cfg.Concurrency == 0 {
u.cfg.Concurrency = DefaultUploadConcurrency
}
if u.cfg.PartSize == 0 {
u.cfg.PartSize = DefaultUploadPartSize
}
if u.cfg.MaxUploadParts == 0 {
u.cfg.MaxUploadParts = MaxUploadParts
}
// Try to get the total size for some optimizations
if err := u.initSize(); err != nil {
return err
}
// If PartSize was changed or partPool was never set up then we need to allocate a new pool
// so that we return []byte slices of the correct size.
poolCap := u.cfg.Concurrency + 1
if u.cfg.partPool == nil || u.cfg.partPool.SliceSize() != u.cfg.PartSize {
u.cfg.partPool = newByteSlicePool(u.cfg.PartSize)
u.cfg.partPool.ModifyCapacity(poolCap)
} else {
u.cfg.partPool = &returnCapacityPoolCloser{byteSlicePool: u.cfg.partPool}
u.cfg.partPool.ModifyCapacity(poolCap)
}
return nil
}
// initSize tries to detect the total stream size, setting u.totalSize. If
// the size is not known, totalSize is set to -1.
func (u *uploader) initSize() error {
u.totalSize = -1
switch r := u.in.Body.(type) {
case io.Seeker:
n, err := seekerLen(r)
if err != nil {
return err
}
u.totalSize = n
// Try to adjust partSize if it is too small and account for
// integer division truncation.
if u.totalSize/u.cfg.PartSize >= int64(u.cfg.MaxUploadParts) {
// Add one to the part size to account for remainders
// during the size calculation. e.g odd number of bytes.
u.cfg.PartSize = (u.totalSize / int64(u.cfg.MaxUploadParts)) + 1
}
}
return nil
}
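// Worked example (illustrative): a 100 GiB seekable body with the default
// 5 MiB part size would need 20,480 parts, which exceeds MaxUploadParts
// (10,000). PartSize is therefore raised to totalSize/MaxUploadParts + 1,
// roughly 10.24 MiB, so the upload fits within 10,000 parts.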
// nextReader returns a seekable reader representing the next packet of data.
// This operation increases the shared u.readerPos counter, but note that it
// does not need to be wrapped in a mutex because nextReader is only called
// from the main thread.
func (u *uploader) nextReader() (io.ReadSeeker, int, func(), error) {
switch r := u.in.Body.(type) {
case readerAtSeeker:
var err error
n := u.cfg.PartSize
if u.totalSize >= 0 {
bytesLeft := u.totalSize - u.readerPos
if bytesLeft <= u.cfg.PartSize {
err = io.EOF
n = bytesLeft
}
}
var (
reader io.ReadSeeker
cleanup func()
)
reader = io.NewSectionReader(r, u.readerPos, n)
if u.cfg.BufferProvider != nil {
reader, cleanup = u.cfg.BufferProvider.GetWriteTo(reader)
} else {
cleanup = func() {}
}
u.readerPos += n
return reader, int(n), cleanup, err
default:
part, err := u.cfg.partPool.Get(u.ctx)
if err != nil {
return nil, 0, func() {}, err
}
n, err := readFillBuf(r, *part)
u.readerPos += int64(n)
cleanup := func() {
u.cfg.partPool.Put(part)
}
return bytes.NewReader((*part)[0:n]), n, cleanup, err
}
}
func readFillBuf(r io.Reader, b []byte) (offset int, err error) {
for offset < len(b) && err == nil {
var n int
n, err = r.Read(b[offset:])
offset += n
}
return offset, err
}
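// Behavior note (illustrative): readFillBuf keeps reading until the buffer is
// full or the reader errors, so short reads do not produce short parts. The
// strings package is assumed to be imported for this sketch.
//
//	b := make([]byte, 8)
//	n, err := readFillBuf(strings.NewReader("hello"), b)
//	// n == 5 and err == io.EOF: the stream ended before the buffer filled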
// singlePart contains upload logic for uploading a single chunk via
// a regular PutObject request. Multipart requests require at least two
// parts, or at least 5MB of data.
func (u *uploader) singlePart(r io.ReadSeeker, cleanup func()) (*UploadOutput, error) {
defer cleanup()
var params s3.PutObjectInput
awsutil.Copy(&params, u.in)
params.Body = r
// The HTTP client is wrapped so the request URL can be recorded, because the
// generated URL is returned as the upload Location.
var locationRecorder recordLocationClient
out, err := u.cfg.S3.PutObject(u.ctx, &params,
append(u.cfg.ClientOptions, locationRecorder.WrapClient())...)
if err != nil {
return nil, err
}
return &UploadOutput{
Location: locationRecorder.location,
BucketKeyEnabled: aws.ToBool(out.BucketKeyEnabled),
ChecksumCRC32: out.ChecksumCRC32,
ChecksumCRC32C: out.ChecksumCRC32C,
ChecksumSHA1: out.ChecksumSHA1,
ChecksumSHA256: out.ChecksumSHA256,
ETag: out.ETag,
Expiration: out.Expiration,
Key: params.Key,
RequestCharged: out.RequestCharged,
SSEKMSKeyId: out.SSEKMSKeyId,
ServerSideEncryption: out.ServerSideEncryption,
VersionID: out.VersionId,
}, nil
}
type httpClient interface {
Do(r *http.Request) (*http.Response, error)
}
type recordLocationClient struct {
httpClient
location string
}
func (c *recordLocationClient) WrapClient() func(o *s3.Options) {
return func(o *s3.Options) {
c.httpClient = o.HTTPClient
o.HTTPClient = c
}
}
func (c *recordLocationClient) Do(r *http.Request) (resp *http.Response, err error) {
resp, err = c.httpClient.Do(r)
if err != nil {
return resp, err
}
if resp.Request != nil && resp.Request.URL != nil {
url := *resp.Request.URL
url.RawQuery = ""
c.location = url.String()
}
return resp, err
}
// internal structure to manage a specific multipart upload to S3.
type multiuploader struct {
*uploader
wg sync.WaitGroup
m sync.Mutex
err error
uploadID string
parts completedParts
}
// keeps track of a single chunk of data being sent to S3.
type chunk struct {
buf io.ReadSeeker
num int32
cleanup func()
}
// completedParts is a wrapper to make parts sortable by their part number,
// since S3 requires this list to be sent in sorted order.
type completedParts []types.CompletedPart
func (a completedParts) Len() int { return len(a) }
func (a completedParts) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a completedParts) Less(i, j int) bool {
return aws.ToInt32(a[i].PartNumber) < aws.ToInt32(a[j].PartNumber)
}
// upload will perform a multipart upload using the firstBuf buffer containing
// the first chunk of data.
func (u *multiuploader) upload(firstBuf io.ReadSeeker, cleanup func()) (*UploadOutput, error) {
var params s3.CreateMultipartUploadInput
awsutil.Copy(&params, u.in)
// Create the multipart
var locationRecorder recordLocationClient
resp, err := u.cfg.S3.CreateMultipartUpload(u.ctx, &params,
append(u.cfg.ClientOptions, locationRecorder.WrapClient())...)
if err != nil {
cleanup()
return nil, err
}
u.uploadID = *resp.UploadId
// Create the workers
ch := make(chan chunk, u.cfg.Concurrency)
for i := 0; i < u.cfg.Concurrency; i++ {
u.wg.Add(1)
go u.readChunk(ch)
}
// Send part 1 to the workers
var num int32 = 1
ch <- chunk{buf: firstBuf, num: num, cleanup: cleanup}
// Read and queue the rest of the parts
for u.geterr() == nil && err == nil {
var (
reader io.ReadSeeker
nextChunkLen int
ok bool
)
reader, nextChunkLen, cleanup, err = u.nextReader()
ok, err = u.shouldContinue(num, nextChunkLen, err)
if !ok {
cleanup()
if err != nil {
u.seterr(err)
}
break
}
num++
ch <- chunk{buf: reader, num: num, cleanup: cleanup}
}
// Close the channel, wait for workers, and complete upload
close(ch)
u.wg.Wait()
completeOut := u.complete()
if err := u.geterr(); err != nil {
return nil, &multiUploadError{
err: err,
uploadID: u.uploadID,
}
}
return &UploadOutput{
Location: locationRecorder.location,
UploadID: u.uploadID,
CompletedParts: u.parts,
BucketKeyEnabled: aws.ToBool(completeOut.BucketKeyEnabled),
ChecksumCRC32: completeOut.ChecksumCRC32,
ChecksumCRC32C: completeOut.ChecksumCRC32C,
ChecksumSHA1: completeOut.ChecksumSHA1,
ChecksumSHA256: completeOut.ChecksumSHA256,
ETag: completeOut.ETag,
Expiration: completeOut.Expiration,
Key: completeOut.Key,
RequestCharged: completeOut.RequestCharged,
SSEKMSKeyId: completeOut.SSEKMSKeyId,
ServerSideEncryption: completeOut.ServerSideEncryption,
VersionID: completeOut.VersionId,
}, nil
}
func (u *multiuploader) shouldContinue(part int32, nextChunkLen int, err error) (bool, error) {
if err != nil && err != io.EOF {
return false, fmt.Errorf("read multipart upload data failed, %w", err)
}
if nextChunkLen == 0 {
// No need to upload an empty part. If the file was empty to start
// with, an empty single part would have been created and a multipart
// upload never started.
return false, nil
}
part++
// This upload exceeded maximum number of supported parts, error now.
if part > u.cfg.MaxUploadParts || part > MaxUploadParts {
var msg string
if part > u.cfg.MaxUploadParts {
msg = fmt.Sprintf("exceeded total allowed configured MaxUploadParts (%d). Adjust PartSize to fit in this limit",
u.cfg.MaxUploadParts)
} else {
msg = fmt.Sprintf("exceeded total allowed S3 limit MaxUploadParts (%d). Adjust PartSize to fit in this limit",
MaxUploadParts)
}
return false, fmt.Errorf(msg)
}
return true, err
}
// readChunk runs in worker goroutines to pull chunks off of the ch channel
// and send() them as UploadPart requests.
func (u *multiuploader) readChunk(ch chan chunk) {
defer u.wg.Done()
for {
data, ok := <-ch
if !ok {
break
}
if u.geterr() == nil {
if err := u.send(data); err != nil {
u.seterr(err)
}
}
data.cleanup()
}
}
// send performs an UploadPart request and keeps track of the completed
// part information.
func (u *multiuploader) send(c chunk) error {
params := &s3.UploadPartInput{
Bucket: u.in.Bucket,
Key: u.in.Key,
Body: c.buf,
SSECustomerAlgorithm: u.in.SSECustomerAlgorithm,
SSECustomerKey: u.in.SSECustomerKey,
SSECustomerKeyMD5: u.in.SSECustomerKeyMD5,
ExpectedBucketOwner: u.in.ExpectedBucketOwner,
RequestPayer: u.in.RequestPayer,
ChecksumAlgorithm: u.in.ChecksumAlgorithm,
// Invalid to set any of the individual ChecksumXXX members from
// PutObject as they are never valid for individual parts of a
// multipart upload.
PartNumber: aws.Int32(c.num),
UploadId: &u.uploadID,
}
// TODO should do copy then clear?
resp, err := u.cfg.S3.UploadPart(u.ctx, params, u.cfg.ClientOptions...)
if err != nil {
return err
}
var completed types.CompletedPart
awsutil.Copy(&completed, resp)
completed.PartNumber = aws.Int32(c.num)
u.m.Lock()
u.parts = append(u.parts, completed)
u.m.Unlock()
return nil
}
// geterr is a thread-safe getter for the error object
func (u *multiuploader) geterr() error {
u.m.Lock()
defer u.m.Unlock()
return u.err
}
// seterr is a thread-safe setter for the error object
func (u *multiuploader) seterr(e error) {
u.m.Lock()
defer u.m.Unlock()
u.err = e
}
// fail will abort the multipart upload unless LeavePartsOnError is set to true.
func (u *multiuploader) fail() {
if u.cfg.LeavePartsOnError {
return
}
params := &s3.AbortMultipartUploadInput{
Bucket: u.in.Bucket,
Key: u.in.Key,
UploadId: &u.uploadID,
}
_, err := u.cfg.S3.AbortMultipartUpload(u.ctx, params, u.cfg.ClientOptions...)
if err != nil {
// TODO: Add logging
//logMessage(u.cfg.S3, aws.LogDebug, fmt.Sprintf("failed to abort multipart upload, %v", err))
_ = err
}
}
// complete successfully completes a multipart upload and returns the response.
func (u *multiuploader) complete() *s3.CompleteMultipartUploadOutput {
if u.geterr() != nil {
u.fail()
return nil
}
// Parts must be sorted in PartNumber order.
sort.Sort(u.parts)
var params s3.CompleteMultipartUploadInput
awsutil.Copy(&params, u.in)
params.UploadId = &u.uploadID
params.MultipartUpload = &types.CompletedMultipartUpload{Parts: u.parts}
resp, err := u.cfg.S3.CompleteMultipartUpload(u.ctx, &params, u.cfg.ClientOptions...)
if err != nil {
u.seterr(err)
u.fail()
}
return resp
}
type readerAtSeeker interface {
io.ReaderAt
io.ReadSeeker
}
// setS3ExpressDefaultChecksum defaults the checksum algorithm to CRC32 for
// S3Express buckets, which is required when uploading to them through the
// transfer manager.
type setS3ExpressDefaultChecksum struct{}
func (*setS3ExpressDefaultChecksum) ID() string {
return "setS3ExpressDefaultChecksum"
}
func (*setS3ExpressDefaultChecksum) HandleFinalize(
ctx context.Context, in smithymiddleware.FinalizeInput, next smithymiddleware.FinalizeHandler,
) (
out smithymiddleware.FinalizeOutput, metadata smithymiddleware.Metadata, err error,
) {
const checksumHeader = "x-amz-checksum-algorithm"
if internalcontext.GetS3Backend(ctx) != internalcontext.S3BackendS3Express {
return next.HandleFinalize(ctx, in)
}
// If this is CreateMultipartUpload we need to ensure the checksum
// algorithm header is present. Otherwise everything is driven off the
// context setting and we can let it flow from there.
if middleware.GetOperationName(ctx) == "CreateMultipartUpload" {
r, ok := in.Request.(*smithyhttp.Request)
if !ok {
return out, metadata, fmt.Errorf("unknown transport type %T", in.Request)
}
if internalcontext.GetChecksumInputAlgorithm(ctx) == "" {
r.Header.Set(checksumHeader, "CRC32")
}
return next.HandleFinalize(ctx, in)
} else if internalcontext.GetChecksumInputAlgorithm(ctx) == "" {
ctx = internalcontext.SetChecksumInputAlgorithm(ctx, string(types.ChecksumAlgorithmCrc32))
}
return next.HandleFinalize(ctx, in)
}

View file

@@ -0,0 +1,83 @@
package manager
import (
"bufio"
"io"
"sync"
"github.com/aws/aws-sdk-go-v2/internal/sdkio"
)
// WriterReadFrom defines an interface implementing io.Writer and io.ReaderFrom
type WriterReadFrom interface {
io.Writer
io.ReaderFrom
}
// WriterReadFromProvider provides an implementation of io.ReadFrom for the given io.Writer
type WriterReadFromProvider interface {
GetReadFrom(writer io.Writer) (w WriterReadFrom, cleanup func())
}
type bufferedWriter interface {
WriterReadFrom
Flush() error
Reset(io.Writer)
}
type bufferedReadFrom struct {
bufferedWriter
}
func (b *bufferedReadFrom) ReadFrom(r io.Reader) (int64, error) {
n, err := b.bufferedWriter.ReadFrom(r)
if flushErr := b.Flush(); flushErr != nil && err == nil {
err = flushErr
}
return n, err
}
// PooledBufferedReadFromProvider is a WriterReadFromProvider that uses a sync.Pool
// to manage allocation and reuse of *bufio.Writer structures.
type PooledBufferedReadFromProvider struct {
pool sync.Pool
}
// NewPooledBufferedWriterReadFromProvider returns a new PooledBufferedReadFromProvider.
// Size is used to control the size of the underlying *bufio.Writer created for
// calls to GetReadFrom. If size is less than 32 KiB, the writer defaults to 64 KiB.
func NewPooledBufferedWriterReadFromProvider(size int) *PooledBufferedReadFromProvider {
if size < int(32*sdkio.KibiByte) {
size = int(64 * sdkio.KibiByte)
}
return &PooledBufferedReadFromProvider{
pool: sync.Pool{
New: func() interface{} {
return &bufferedReadFrom{bufferedWriter: bufio.NewWriterSize(nil, size)}
},
},
}
}
// GetReadFrom takes an io.Writer and wraps it with a type which satisfies the WriterReadFrom
// interface. Additionally a cleanup function is provided which must be called after usage of the WriterReadFrom
// has been completed in order to allow the reuse of the *bufio.Writer.
func (p *PooledBufferedReadFromProvider) GetReadFrom(writer io.Writer) (r WriterReadFrom, cleanup func()) {
buffer := p.pool.Get().(*bufferedReadFrom)
buffer.Reset(writer)
r = buffer
cleanup = func() {
buffer.Reset(nil) // Reset to nil writer to release reference
p.pool.Put(buffer)
}
return r, cleanup
}
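// Usage sketch (illustrative, not part of the vendored source): dst is a
// hypothetical io.Writer (for example a file) and src a hypothetical io.Reader.
//
//	provider := NewPooledBufferedWriterReadFromProvider(1 << 20) // 1 MiB buffer
//	w, cleanup := provider.GetReadFrom(dst)
//	defer cleanup() // release the pooled bufio.Writer
//	if _, err := w.ReadFrom(src); err != nil {
//		// handle error
//	}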
type suppressWriterAt struct {
suppressed io.Reader
}
func (s *suppressWriterAt) Read(p []byte) (n int, err error) {
return s.suppressed.Read(p)
}