debian-forge-composer/internal/upload/azure/azurestorage.go
Ondřej Budai 2e39d629a9 worker: add azure image upload target
This commit adds and implements org.osbuild.azure.image target.

First, let's talk about the already implemented org.osbuild.azure target:
The purpose of this target is to authenticate using the Azure Storage
credentials and upload the image file as a Page Blob. Page Blob is basically
an object in storage and it cannot be directly used to launch a VM. To achieve
that, you need to define an actual Azure Image with the Page Blob attached.

For the cloud API, we would like to create an actual Azure Image that is
immediately available for new VMs. The new target accomplishes it.
To achieve this, it must use a different authentication method: Azure OAuth.
The other important difference is that currently, the credentials are stored
on the worker and not in target options. This should lead to better security
because we don't send the credentials over the network. In the future, we would
like to have credential-less setup using workers in Azure with the right
IAM policies applied but this requires more investigation and is not
implemented in this commit.

Signed-off-by: Ondřej Budai <ondrej@budai.cz>
2021-03-06 15:40:48 +00:00

203 lines
6.7 KiB
Go

package azure
import (
"bufio"
"bytes"
"context"
"crypto/md5"
"errors"
"fmt"
"io"
"net/url"
"os"
"strings"
"sync"
"github.com/Azure/azure-pipeline-go/pipeline"
"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/google/uuid"
)
// StorageClient is a client for the Azure Storage API,
// see the docs: https://docs.microsoft.com/en-us/rest/api/storageservices/
type StorageClient struct {
// pipeline carries the shared-key credential and request policies used by
// every blob operation issued through this client.
pipeline pipeline.Pipeline
}
// NewStorageClient creates a new client for the Azure Storage API,
// authenticated with the given storage account name and access key.
// See the following link for how to retrieve the storageAccessKey using
// Azure's API:
// https://docs.microsoft.com/en-us/rest/api/storagerp/storageaccounts/listkeys
func NewStorageClient(storageAccount, storageAccessKey string) (*StorageClient, error) {
	credential, err := azblob.NewSharedKeyCredential(storageAccount, storageAccessKey)
	if err != nil {
		return nil, fmt.Errorf("cannot create shared key credential: %v", err)
	}

	return &StorageClient{
		pipeline: azblob.NewPipeline(credential, azblob.PipelineOptions{}),
	}, nil
}
// BlobMetadata contains information needed to store the image in a proper place.
// In case of Azure cloud storage this includes container name and blob name.
type BlobMetadata struct {
// StorageAccount is the Azure storage account name, i.e. the <account> part
// of https://<account>.blob.core.windows.net.
StorageAccount string
// ContainerName is the storage container the blob is uploaded into.
ContainerName string
// BlobName is the name of the blob; UploadPageBlob appends ".vhd" to it
// when the suffix is missing.
BlobName string
}
// DefaultUploadThreads defines a tested default value for the UploadPageBlob method's threads parameter.
// It bounds the number of concurrent page-upload goroutines.
const DefaultUploadThreads = 16
// UploadPageBlob takes the metadata and credentials required to upload the image specified by `fileName`.
// It can speed up the upload by using goroutines. The number of parallel goroutines is bounded by
// the `threads` argument. After the upload, the blob's MD5 is compared against
// the local file's to detect corruption.
func (c StorageClient) UploadPageBlob(metadata BlobMetadata, fileName string, threads int) error {
	// Azure cannot create an image from a storage blob without .vhd extension
	if !strings.HasSuffix(metadata.BlobName, ".vhd") {
		metadata.BlobName = metadata.BlobName + ".vhd"
	}

	// get storage account blob service URL endpoint.
	URL, _ := url.Parse(fmt.Sprintf("https://%s.blob.core.windows.net/%s", metadata.StorageAccount, metadata.ContainerName))

	// Create a ContainerURL object that wraps the container URL and a request
	// pipeline to make requests.
	containerURL := azblob.NewContainerURL(*URL, c.pipeline)

	// Use a never-expiring context for the whole upload.
	ctx := context.Background()

	// Open the image file for reading
	imageFile, err := os.Open(fileName)
	if err != nil {
		return fmt.Errorf("cannot open the image: %v", err)
	}
	defer imageFile.Close()

	// Stat image to get the file size
	stat, err := imageFile.Stat()
	if err != nil {
		return fmt.Errorf("cannot stat the image: %v", err)
	}

	// Page blobs are addressed in 512-byte pages, so the size must be aligned.
	if stat.Size()%512 != 0 {
		return errors.New("size for azure image must be aligned to 512 bytes")
	}

	// Hash the imageFile so its integrity can be verified after the upload
	imageFileHash := md5.New()
	if _, err := io.Copy(imageFileHash, imageFile); err != nil {
		return fmt.Errorf("cannot create md5 of the image: %v", err)
	}
	// Move the cursor back to the start of the imageFile
	if _, err := imageFile.Seek(0, io.SeekStart); err != nil {
		return fmt.Errorf("cannot seek the image: %v", err)
	}

	// Create page blob URL. Page blob is required for VM images
	blobURL := newPageBlobURL(containerURL, metadata.BlobName)
	_, err = blobURL.Create(ctx, stat.Size(), 0, azblob.BlobHTTPHeaders{}, azblob.Metadata{}, azblob.BlobAccessConditions{})
	if err != nil {
		return fmt.Errorf("cannot create the blob URL: %v", err)
	}
	// Wrong MD5 does not seem to have any impact on the upload
	_, err = blobURL.SetHTTPHeaders(ctx, azblob.BlobHTTPHeaders{ContentMD5: imageFileHash.Sum(nil)}, azblob.BlobAccessConditions{})
	if err != nil {
		return fmt.Errorf("cannot set the HTTP headers on the blob URL: %v", err)
	}

	// This channel simulates behavior of a semaphore and bounds the number of parallel goroutines.
	semaphore := make(chan int, threads)
	// Forwards the first error from a goroutine to the caller; later errors are dropped.
	errorInGoroutine := make(chan error, 1)

	// Create buffered reader to speed up the upload
	reader := bufio.NewReader(imageFile)

	var wg sync.WaitGroup
	var counter int64
	for {
		buffer := make([]byte, azblob.PageBlobMaxUploadPagesBytes)
		// io.ReadFull guarantees every chunk except the last is exactly
		// PageBlobMaxUploadPagesBytes long. A plain Read may return short
		// counts, which would corrupt the page offsets computed from
		// `counter` below.
		n, err := io.ReadFull(reader, buffer)
		if err == io.EOF {
			// No data left.
			break
		}
		if err != nil && err != io.ErrUnexpectedEOF {
			return fmt.Errorf("reading the image failed: %v", err)
		}
		// Here either err == nil (full chunk) or err == io.ErrUnexpectedEOF
		// (final, partial chunk); in both cases n > 0 bytes must be uploaded.

		wg.Add(1)
		semaphore <- 1
		// NOTE: the upload error must be a fresh local variable. Assigning to
		// the enclosing `err` from multiple goroutines is a data race.
		go func(pageID int64, chunk []byte) {
			defer wg.Done()
			defer func() { <-semaphore }()
			_, uploadErr := blobURL.UploadPages(ctx, pageID*azblob.PageBlobMaxUploadPagesBytes, bytes.NewReader(chunk), azblob.PageBlobAccessConditions{}, nil)
			if uploadErr != nil {
				// Send the error to the error channel in a non-blocking way.
				// If there is already an error, just discard this one.
				select {
				case errorInGoroutine <- fmt.Errorf("uploading a page failed: %v", uploadErr):
				default:
				}
			}
		}(counter, buffer[:n])
		counter++
	}

	// Wait for all goroutines to finish
	wg.Wait()

	// Check any errors during the transmission using a nonblocking read from the channel
	select {
	case err := <-errorInGoroutine:
		return err
	default:
	}

	// Check properties, specifically MD5 sum of the blob
	props, err := blobURL.GetProperties(ctx, azblob.BlobAccessConditions{})
	if err != nil {
		return fmt.Errorf("getting the properties of the new blob failed: %v", err)
	}
	if !bytes.Equal(props.ContentMD5(), imageFileHash.Sum(nil)) {
		return errors.New("error during image upload. the image seems to be corrupted")
	}

	return nil
}
// CreateStorageContainerIfNotExist creates an empty storage container inside
// a storage account. If a container with the same name already exists,
// this method is no-op.
func (c StorageClient) CreateStorageContainerIfNotExist(ctx context.Context, storageAccount, name string) error {
	URL, _ := url.Parse(fmt.Sprintf("https://%s.blob.core.windows.net/%s", storageAccount, name))
	containerURL := azblob.NewContainerURL(*URL, c.pipeline)

	_, err := containerURL.Create(ctx, azblob.Metadata{}, azblob.PublicAccessNone)
	if err != nil {
		// "Container already exists" is expected and means success; the
		// redundant second type assertion on storageErr was removed.
		if storageErr, ok := err.(azblob.StorageError); ok && storageErr.ServiceCode() == azblob.ServiceCodeContainerAlreadyExists {
			return nil
		}
		return fmt.Errorf("cannot create a storage container: %v", err)
	}

	return nil
}
// RandomStorageAccountName returns a randomly generated name that can be used
// for a storage account. Azure requires storage account names to consist of
// lowercase alphanumeric characters only, with a length of 24 or lower, so
// the result is lowercased and truncated to 24 characters. The (lowercased)
// prefix is preserved at the start of the returned name; the caller must
// ensure the prefix itself is alphanumeric.
func RandomStorageAccountName(prefix string) string {
	// A UUID with the dashes stripped is 32 hexadecimal characters, so the
	// concatenation below is always at least 32 characters long and the
	// [:24] slice cannot panic.
	id := strings.ReplaceAll(uuid.New().String(), "-", "")

	// Lowercase the result so that an uppercase prefix cannot produce a name
	// that Azure would reject.
	return strings.ToLower((prefix + id)[:24])
}