upload/azure: migrate from azure-storage-blob-go to azure-sdk-for-go
https://github.com/Azure/azure-storage-blob-go/ is deprecated, the main SDK should be now used instead. Let's migrate the code. There should be no functional changes. Signed-off-by: Ondřej Budai <ondrej@budai.cz>
This commit is contained in:
parent
9beddf626f
commit
abe6ccfb50
226 changed files with 29224 additions and 30426 deletions
|
|
@ -16,15 +16,20 @@ import (
|
|||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/Azure/azure-pipeline-go/pipeline"
|
||||
"github.com/Azure/azure-storage-blob-go/azblob"
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/pageblob"
|
||||
"github.com/google/uuid"
|
||||
|
||||
"github.com/osbuild/osbuild-composer/internal/common"
|
||||
)
|
||||
|
||||
// StorageClient is a client for the Azure Storage API,
|
||||
// see the docs: https://docs.microsoft.com/en-us/rest/api/storageservices/
|
||||
type StorageClient struct {
|
||||
pipeline pipeline.Pipeline
|
||||
credential *azblob.SharedKeyCredential
|
||||
}
|
||||
|
||||
// NewStorageClient creates a new client for Azure Storage API.
|
||||
|
|
@ -37,9 +42,8 @@ func NewStorageClient(storageAccount, storageAccessKey string) (*StorageClient,
|
|||
return nil, fmt.Errorf("cannot create shared key credential: %v", err)
|
||||
}
|
||||
|
||||
p := azblob.NewPipeline(credential, azblob.PipelineOptions{})
|
||||
return &StorageClient{
|
||||
pipeline: p,
|
||||
credential: credential,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
|
@ -54,6 +58,10 @@ type BlobMetadata struct {
|
|||
// DefaultUploadThreads defines a tested default value for the UploadPageBlob method's threads parameter.
|
||||
const DefaultUploadThreads = 16
|
||||
|
||||
// PageBlobMaxUploadPagesBytes defines how much bytes can we upload in a single UploadPages call.
|
||||
// See https://learn.microsoft.com/en-us/rest/api/storageservices/put-page
|
||||
const PageBlobMaxUploadPagesBytes = 4 * 1024 * 1024
|
||||
|
||||
// UploadPageBlob takes the metadata and credentials required to upload the image specified by `fileName`
|
||||
// It can speed up the upload by using goroutines. The number of parallel goroutines is bounded by
|
||||
// the `threads` argument.
|
||||
|
|
@ -61,12 +69,12 @@ const DefaultUploadThreads = 16
|
|||
// Note that if you want to create an image out of the page blob, make sure that metadata.BlobName
|
||||
// has a .vhd extension, see EnsureVHDExtension.
|
||||
func (c StorageClient) UploadPageBlob(metadata BlobMetadata, fileName string, threads int) error {
|
||||
// get storage account blob service URL endpoint.
|
||||
URL, _ := url.Parse(fmt.Sprintf("https://%s.blob.core.windows.net/%s", metadata.StorageAccount, metadata.ContainerName))
|
||||
|
||||
// Create a ContainerURL object that wraps the container URL and a request
|
||||
// pipeline to make requests.
|
||||
containerURL := azblob.NewContainerURL(*URL, c.pipeline)
|
||||
// Create a page blob client.
|
||||
URL, _ := url.Parse(fmt.Sprintf("https://%s.blob.core.windows.net/%s/%s", metadata.StorageAccount, metadata.ContainerName, metadata.BlobName))
|
||||
client, err := pageblob.NewClientWithSharedKeyCredential(URL.String(), c.credential, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot create a pageblob client: %w", err)
|
||||
}
|
||||
|
||||
// Create the container, use a never-expiring context
|
||||
ctx := context.Background()
|
||||
|
|
@ -100,16 +108,14 @@ func (c StorageClient) UploadPageBlob(metadata BlobMetadata, fileName string, th
|
|||
return fmt.Errorf("cannot seek the image: %v", err)
|
||||
}
|
||||
|
||||
// Create page blob URL. Page blob is required for VM images
|
||||
blobURL := containerURL.NewPageBlobURL(metadata.BlobName)
|
||||
_, err = blobURL.Create(ctx, stat.Size(), 0, azblob.BlobHTTPHeaders{}, azblob.Metadata{}, azblob.BlobAccessConditions{}, azblob.PremiumPageBlobAccessTierNone, azblob.BlobTagsMap{}, azblob.ClientProvidedKeyOptions{})
|
||||
// Create page blob. Page blob is required for VM images
|
||||
_, err = client.Create(ctx, stat.Size(), &pageblob.CreateOptions{
|
||||
HTTPHeaders: &blob.HTTPHeaders{
|
||||
BlobContentMD5: imageFileHash.Sum(nil),
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot create the blob URL: %v", err)
|
||||
}
|
||||
// Wrong MD5 does not seem to have any impact on the upload
|
||||
_, err = blobURL.SetHTTPHeaders(ctx, azblob.BlobHTTPHeaders{ContentMD5: imageFileHash.Sum(nil)}, azblob.BlobAccessConditions{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot set the HTTP headers on the blob URL: %v", err)
|
||||
return fmt.Errorf("cannot create a new page blob: %w", err)
|
||||
}
|
||||
|
||||
// Create control variables
|
||||
|
|
@ -125,7 +131,7 @@ func (c StorageClient) UploadPageBlob(metadata BlobMetadata, fileName string, th
|
|||
run := true
|
||||
var wg sync.WaitGroup
|
||||
for run {
|
||||
buffer := make([]byte, azblob.PageBlobMaxUploadPagesBytes)
|
||||
buffer := make([]byte, PageBlobMaxUploadPagesBytes)
|
||||
n, err := reader.Read(buffer)
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
|
|
@ -141,7 +147,11 @@ func (c StorageClient) UploadPageBlob(metadata BlobMetadata, fileName string, th
|
|||
semaphore <- 1
|
||||
go func(counter int64, buffer []byte, n int) {
|
||||
defer wg.Done()
|
||||
_, err = blobURL.UploadPages(ctx, counter*azblob.PageBlobMaxUploadPagesBytes, bytes.NewReader(buffer[:n]), azblob.PageBlobAccessConditions{}, nil, azblob.ClientProvidedKeyOptions{})
|
||||
uploadRange := blob.HTTPRange{
|
||||
Offset: counter * PageBlobMaxUploadPagesBytes,
|
||||
Count: int64(n),
|
||||
}
|
||||
_, err := client.UploadPages(ctx, common.NopSeekCloser(bytes.NewReader(buffer[:n])), uploadRange, nil)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("uploading a page failed: %v", err)
|
||||
// Send the error to the error channel in a non-blocking way. If there is already an error, just discard this one
|
||||
|
|
@ -171,14 +181,19 @@ func (c StorageClient) UploadPageBlob(metadata BlobMetadata, fileName string, th
|
|||
// this method is no-op.
|
||||
func (c StorageClient) CreateStorageContainerIfNotExist(ctx context.Context, storageAccount, name string) error {
|
||||
URL, _ := url.Parse(fmt.Sprintf("https://%s.blob.core.windows.net/%s", storageAccount, name))
|
||||
containerURL := azblob.NewContainerURL(*URL, c.pipeline)
|
||||
|
||||
_, err := containerURL.Create(ctx, azblob.Metadata{}, azblob.PublicAccessNone)
|
||||
cl, err := container.NewClientWithSharedKeyCredential(URL.String(), c.credential, nil)
|
||||
if err != nil {
|
||||
if storageErr, ok := err.(azblob.StorageError); ok && storageErr.ServiceCode() == azblob.ServiceCodeContainerAlreadyExists {
|
||||
return fmt.Errorf("cannot create a storage container client: %w", err)
|
||||
}
|
||||
|
||||
_, err = cl.Create(ctx, nil)
|
||||
if err != nil {
|
||||
if bloberror.HasCode(err, bloberror.ContainerAlreadyExists) {
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("cannot create a storage container: %v", err)
|
||||
|
||||
return fmt.Errorf("cannot create a storage container: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
@ -198,12 +213,14 @@ func (c StorageClient) TagBlob(ctx context.Context, metadata BlobMetadata, tags
|
|||
}
|
||||
}
|
||||
|
||||
URL, _ := url.Parse(fmt.Sprintf("https://%s.blob.core.windows.net/%s", metadata.StorageAccount, metadata.ContainerName))
|
||||
containerURL := azblob.NewContainerURL(*URL, c.pipeline)
|
||||
URL, _ := url.Parse(fmt.Sprintf("https://%s.blob.core.windows.net/%s/%s", metadata.StorageAccount, metadata.ContainerName, metadata.BlobName))
|
||||
|
||||
blobURL := containerURL.NewPageBlobURL(metadata.BlobName)
|
||||
client, err := blob.NewClientWithSharedKeyCredential(URL.String(), c.credential, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot create a blob client: %w", err)
|
||||
}
|
||||
|
||||
_, err := blobURL.SetTags(ctx, nil, nil, nil, nil, nil, nil, tags)
|
||||
_, err = client.SetTags(ctx, tags, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot tag the blob: %v", err)
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue