many: switch to osbuild/images/pkg/upload for azure
This is part of consolidating all the upload code in images.
This commit is contained in:
parent
26ab15b1c9
commit
0e2daa201f
11 changed files with 59 additions and 77 deletions
|
|
@ -21,7 +21,7 @@ import (
|
|||
"github.com/Azure/go-autorest/autorest/azure/auth"
|
||||
|
||||
"github.com/osbuild/osbuild-composer/internal/common"
|
||||
"github.com/osbuild/osbuild-composer/internal/upload/azure"
|
||||
"github.com/osbuild/images/pkg/upload/azure"
|
||||
)
|
||||
|
||||
// wrapErrorf returns error constructed using fmt.Errorf from format and any
|
||||
|
|
|
|||
|
|
@ -1,200 +0,0 @@
|
|||
package azure
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/compute/armcompute/v5"
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources"
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage"
|
||||
|
||||
"github.com/osbuild/osbuild-composer/internal/common"
|
||||
"github.com/osbuild/osbuild-composer/internal/target"
|
||||
)
|
||||
|
||||
type Client struct {
|
||||
creds *azidentity.ClientSecretCredential
|
||||
resFact *armresources.ClientFactory
|
||||
storFact *armstorage.ClientFactory
|
||||
}
|
||||
|
||||
// NewClient creates a client for accessing the Azure API.
|
||||
// See https://docs.microsoft.com/en-us/rest/api/azure/
|
||||
// If you need to work with the Azure Storage API, see NewStorageClient
|
||||
func NewClient(credentials Credentials, tenantID, subscriptionID string) (*Client, error) {
|
||||
creds, err := azidentity.NewClientSecretCredential(tenantID, credentials.clientID, credentials.clientSecret, nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("creating azure ClientSecretCredential failed: %v", err)
|
||||
}
|
||||
|
||||
resFact, err := armresources.NewClientFactory(subscriptionID, creds, nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("creating resources client factory failed: %v", err)
|
||||
}
|
||||
|
||||
storFact, err := armstorage.NewClientFactory(subscriptionID, creds, nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("creating storage client factory failed: %v", err)
|
||||
}
|
||||
|
||||
return &Client{
|
||||
creds,
|
||||
resFact,
|
||||
storFact,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Tag is a name-value pair representing the tag structure in Azure
|
||||
type Tag struct {
|
||||
Name string
|
||||
Value string
|
||||
}
|
||||
|
||||
// GetResourceNameByTag returns a name of an Azure resource tagged with the
|
||||
// given `tag`. Note that if multiple resources with the same tag exists
|
||||
// in the specified resource group, only one name is returned. It's undefined
|
||||
// which one it is.
|
||||
func (ac Client) GetResourceNameByTag(ctx context.Context, resourceGroup string, tag Tag) (string, error) {
|
||||
c := ac.resFact.NewClient()
|
||||
|
||||
pager := c.NewListByResourceGroupPager(resourceGroup, &armresources.ClientListByResourceGroupOptions{
|
||||
Filter: common.ToPtr(fmt.Sprintf("tagName eq '%s' and tagValue eq '%s'", tag.Name, tag.Value)),
|
||||
})
|
||||
|
||||
result, err := pager.NextPage(ctx)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("listing resources failed: %v", err)
|
||||
}
|
||||
|
||||
if len(result.Value) < 1 {
|
||||
return "", nil
|
||||
}
|
||||
return *result.Value[0].Name, nil
|
||||
}
|
||||
|
||||
// GetResourceGroupLocation returns the location of the given resource group.
|
||||
func (ac Client) GetResourceGroupLocation(ctx context.Context, resourceGroup string) (string, error) {
|
||||
c := ac.resFact.NewResourceGroupsClient()
|
||||
|
||||
group, err := c.Get(ctx, resourceGroup, nil)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("retrieving resource group failed: %v", err)
|
||||
}
|
||||
|
||||
return *group.Location, nil
|
||||
}
|
||||
|
||||
// CreateStorageAccount creates a storage account in the specified resource
|
||||
// group. The location parameter can be used to specify its location. The tag
|
||||
// can be used to specify a tag attached to the account.
|
||||
// The location is optional and if not provided, it is determined
|
||||
// from the resource group.
|
||||
func (ac Client) CreateStorageAccount(ctx context.Context, resourceGroup, name, location string, tag Tag) error {
|
||||
c := ac.storFact.NewAccountsClient()
|
||||
|
||||
var err error
|
||||
if location == "" {
|
||||
location, err = ac.GetResourceGroupLocation(ctx, resourceGroup)
|
||||
if err != nil {
|
||||
return fmt.Errorf("retrieving resource group location failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
poller, err := c.BeginCreate(ctx, resourceGroup, name, armstorage.AccountCreateParameters{
|
||||
SKU: &armstorage.SKU{
|
||||
Name: common.ToPtr(armstorage.SKUNameStandardLRS),
|
||||
Tier: common.ToPtr(armstorage.SKUTierStandard),
|
||||
},
|
||||
Location: &location,
|
||||
Tags: map[string]*string{
|
||||
tag.Name: &tag.Value,
|
||||
},
|
||||
Properties: &armstorage.AccountPropertiesCreateParameters{
|
||||
AllowBlobPublicAccess: common.ToPtr(false),
|
||||
MinimumTLSVersion: common.ToPtr(armstorage.MinimumTLSVersionTLS12),
|
||||
},
|
||||
}, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("sending the create storage account request failed: %v", err)
|
||||
}
|
||||
|
||||
_, err = poller.PollUntilDone(ctx, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("create storage account request failed: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetStorageAccountKey returns a storage account key that can be used to
|
||||
// access the given storage account. This method always returns only the first
|
||||
// key.
|
||||
func (ac Client) GetStorageAccountKey(ctx context.Context, resourceGroup string, storageAccount string) (string, error) {
|
||||
c := ac.storFact.NewAccountsClient()
|
||||
keys, err := c.ListKeys(ctx, resourceGroup, storageAccount, nil)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("retrieving keys for a storage account failed: %v", err)
|
||||
}
|
||||
|
||||
if len(keys.Keys) == 0 {
|
||||
return "", errors.New("azure returned an empty list of keys")
|
||||
}
|
||||
|
||||
return *keys.Keys[0].Value, nil
|
||||
}
|
||||
|
||||
// RegisterImage creates a generalized V1 Linux image from a given blob.
|
||||
// The location is optional and if not provided, it is determined
|
||||
// from the resource group.
|
||||
func (ac Client) RegisterImage(ctx context.Context, subscriptionID, resourceGroup, storageAccount, storageContainer, blobName, imageName, location string, hyperVGen target.HyperVGenerationType) error {
|
||||
c, err := armcompute.NewImagesClient(subscriptionID, ac.creds, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to create compute client: %v", err)
|
||||
}
|
||||
|
||||
blobURI := fmt.Sprintf("https://%s.blob.core.windows.net/%s/%s", storageAccount, storageContainer, blobName)
|
||||
|
||||
if location == "" {
|
||||
location, err = ac.GetResourceGroupLocation(ctx, resourceGroup)
|
||||
if err != nil {
|
||||
return fmt.Errorf("retrieving resource group location failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
var hypvgen armcompute.HyperVGenerationTypes
|
||||
switch hyperVGen {
|
||||
case target.HyperVGenV1:
|
||||
hypvgen = armcompute.HyperVGenerationTypes(armcompute.HyperVGenerationTypesV1)
|
||||
case target.HyperVGenV2:
|
||||
hypvgen = armcompute.HyperVGenerationTypes(armcompute.HyperVGenerationTypesV2)
|
||||
default:
|
||||
return fmt.Errorf("Unknown hyper v generation type %v", hyperVGen)
|
||||
}
|
||||
|
||||
imageFuture, err := c.BeginCreateOrUpdate(ctx, resourceGroup, imageName, armcompute.Image{
|
||||
Properties: &armcompute.ImageProperties{
|
||||
HyperVGeneration: common.ToPtr(hypvgen),
|
||||
SourceVirtualMachine: nil,
|
||||
StorageProfile: &armcompute.ImageStorageProfile{
|
||||
OSDisk: &armcompute.ImageOSDisk{
|
||||
OSType: common.ToPtr(armcompute.OperatingSystemTypesLinux),
|
||||
BlobURI: &blobURI,
|
||||
OSState: common.ToPtr(armcompute.OperatingSystemStateTypesGeneralized),
|
||||
},
|
||||
},
|
||||
},
|
||||
Location: &location,
|
||||
}, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("sending the create image request failed: %v", err)
|
||||
}
|
||||
|
||||
_, err = imageFuture.PollUntilDone(ctx, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("create image request failed: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
@ -1,269 +0,0 @@
|
|||
package azure
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
// azure uses MD5 hashes
|
||||
/* #nosec G501 */
|
||||
"crypto/md5"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/url"
|
||||
"os"
|
||||
"regexp"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/pageblob"
|
||||
"github.com/google/uuid"
|
||||
|
||||
"github.com/osbuild/osbuild-composer/internal/common"
|
||||
)
|
||||
|
||||
// StorageClient is a client for the Azure Storage API,
|
||||
// see the docs: https://docs.microsoft.com/en-us/rest/api/storageservices/
|
||||
type StorageClient struct {
|
||||
credential *azblob.SharedKeyCredential
|
||||
}
|
||||
|
||||
// NewStorageClient creates a new client for Azure Storage API.
|
||||
// See the following keys how to retrieve the storageAccessKey using the
|
||||
// Azure's API:
|
||||
// https://docs.microsoft.com/en-us/rest/api/storagerp/storageaccounts/listkeys
|
||||
func NewStorageClient(storageAccount, storageAccessKey string) (*StorageClient, error) {
|
||||
credential, err := azblob.NewSharedKeyCredential(storageAccount, storageAccessKey)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot create shared key credential: %v", err)
|
||||
}
|
||||
|
||||
return &StorageClient{
|
||||
credential: credential,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// BlobMetadata contains information needed to store the image in a proper place.
|
||||
// In case of Azure cloud storage this includes container name and blob name.
|
||||
type BlobMetadata struct {
|
||||
StorageAccount string
|
||||
ContainerName string
|
||||
BlobName string
|
||||
}
|
||||
|
||||
// DefaultUploadThreads defines a tested default value for the UploadPageBlob method's threads parameter.
|
||||
const DefaultUploadThreads = 16
|
||||
|
||||
// PageBlobMaxUploadPagesBytes defines how much bytes can we upload in a single UploadPages call.
|
||||
// See https://learn.microsoft.com/en-us/rest/api/storageservices/put-page
|
||||
const PageBlobMaxUploadPagesBytes = 4 * 1024 * 1024
|
||||
|
||||
// allZerosSlice returns true if all values in the slice are equal to 0
|
||||
func allZerosSlice(slice []byte) bool {
|
||||
for i := 0; i < len(slice); i++ {
|
||||
if slice[i] != 0 {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// UploadPageBlob takes the metadata and credentials required to upload the image specified by `fileName`
|
||||
// It can speed up the upload by using goroutines. The number of parallel goroutines is bounded by
|
||||
// the `threads` argument.
|
||||
//
|
||||
// Note that if you want to create an image out of the page blob, make sure that metadata.BlobName
|
||||
// has a .vhd extension, see EnsureVHDExtension.
|
||||
func (c StorageClient) UploadPageBlob(metadata BlobMetadata, fileName string, threads int) error {
|
||||
// Create a page blob client.
|
||||
URL, _ := url.Parse(fmt.Sprintf("https://%s.blob.core.windows.net/%s/%s", metadata.StorageAccount, metadata.ContainerName, metadata.BlobName))
|
||||
client, err := pageblob.NewClientWithSharedKeyCredential(URL.String(), c.credential, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot create a pageblob client: %w", err)
|
||||
}
|
||||
|
||||
// Create the container, use a never-expiring context
|
||||
ctx := context.Background()
|
||||
|
||||
// Open the image file for reading
|
||||
imageFile, err := os.Open(fileName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot open the image: %v", err)
|
||||
}
|
||||
defer imageFile.Close()
|
||||
|
||||
// Stat image to get the file size
|
||||
stat, err := imageFile.Stat()
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot stat the image: %v", err)
|
||||
}
|
||||
|
||||
if stat.Size()%512 != 0 {
|
||||
return errors.New("size for azure image must be aligned to 512 bytes")
|
||||
}
|
||||
|
||||
// Hash the imageFile
|
||||
// azure uses MD5 hashes
|
||||
/* #nosec G401 */
|
||||
imageFileHash := md5.New()
|
||||
if _, err := io.Copy(imageFileHash, imageFile); err != nil {
|
||||
return fmt.Errorf("cannot create md5 of the image: %v", err)
|
||||
}
|
||||
// Move the cursor back to the start of the imageFile
|
||||
if _, err := imageFile.Seek(0, 0); err != nil {
|
||||
return fmt.Errorf("cannot seek the image: %v", err)
|
||||
}
|
||||
|
||||
// Create page blob. Page blob is required for VM images
|
||||
_, err = client.Create(ctx, stat.Size(), &pageblob.CreateOptions{
|
||||
HTTPHeaders: &blob.HTTPHeaders{
|
||||
BlobContentMD5: imageFileHash.Sum(nil),
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot create a new page blob: %w", err)
|
||||
}
|
||||
|
||||
// Create control variables
|
||||
// This channel simulates behavior of a semaphore and bounds the number of parallel threads
|
||||
var semaphore = make(chan int, threads)
|
||||
// Forward error from goroutine to the caller
|
||||
var errorInGoroutine = make(chan error, 1)
|
||||
var counter int64 = 0
|
||||
|
||||
// Create buffered reader to speed up the upload
|
||||
reader := bufio.NewReader(imageFile)
|
||||
// Run the upload
|
||||
run := true
|
||||
var wg sync.WaitGroup
|
||||
for run {
|
||||
buffer := make([]byte, PageBlobMaxUploadPagesBytes)
|
||||
n, err := reader.Read(buffer)
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
run = false
|
||||
} else {
|
||||
return fmt.Errorf("reading the image failed: %v", err)
|
||||
}
|
||||
}
|
||||
if n == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
// Skip the uploading part if there are only zeros in the buffer.
|
||||
// We already defined the size of the blob in the initial call and the blob is zero-initialized,
|
||||
// so this pushing zeros would actually be a no-op.
|
||||
if allZerosSlice(buffer) {
|
||||
counter++
|
||||
continue
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
semaphore <- 1
|
||||
go func(counter int64, buffer []byte, n int) {
|
||||
defer wg.Done()
|
||||
uploadRange := blob.HTTPRange{
|
||||
Offset: counter * PageBlobMaxUploadPagesBytes,
|
||||
Count: int64(n),
|
||||
}
|
||||
_, err := client.UploadPages(ctx, common.NopSeekCloser(bytes.NewReader(buffer[:n])), uploadRange, nil)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("uploading a page failed: %v", err)
|
||||
// Send the error to the error channel in a non-blocking way. If there is already an error, just discard this one
|
||||
select {
|
||||
case errorInGoroutine <- err:
|
||||
default:
|
||||
}
|
||||
}
|
||||
<-semaphore
|
||||
}(counter, buffer, n)
|
||||
counter++
|
||||
}
|
||||
// Wait for all goroutines to finish
|
||||
wg.Wait()
|
||||
// Check any errors during the transmission using a nonblocking read from the channel
|
||||
select {
|
||||
case err := <-errorInGoroutine:
|
||||
return err
|
||||
default:
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// CreateStorageContainerIfNotExist creates an empty storage container inside
|
||||
// a storage account. If a container with the same name already exists,
|
||||
// this method is no-op.
|
||||
func (c StorageClient) CreateStorageContainerIfNotExist(ctx context.Context, storageAccount, name string) error {
|
||||
URL, _ := url.Parse(fmt.Sprintf("https://%s.blob.core.windows.net/%s", storageAccount, name))
|
||||
|
||||
cl, err := container.NewClientWithSharedKeyCredential(URL.String(), c.credential, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot create a storage container client: %w", err)
|
||||
}
|
||||
|
||||
_, err = cl.Create(ctx, nil)
|
||||
if err != nil {
|
||||
if bloberror.HasCode(err, bloberror.ContainerAlreadyExists) {
|
||||
return nil
|
||||
}
|
||||
|
||||
return fmt.Errorf("cannot create a storage container: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Taken from https://docs.microsoft.com/en-us/rest/api/storageservices/set-blob-tags#request-body
|
||||
var tagKeyRegexp = regexp.MustCompile(`^[a-zA-Z0-9 +-./:=_]{1,256}$`)
|
||||
var tagValueRegexp = regexp.MustCompile(`^[a-zA-Z0-9 +-./:=_]{0,256}$`)
|
||||
|
||||
func (c StorageClient) TagBlob(ctx context.Context, metadata BlobMetadata, tags map[string]string) error {
|
||||
for key, value := range tags {
|
||||
if !tagKeyRegexp.MatchString(key) {
|
||||
return fmt.Errorf("tag key `%s` doesn't match the format accepted by Azure", key)
|
||||
}
|
||||
if !tagValueRegexp.MatchString(key) {
|
||||
return fmt.Errorf("tag value `%s` of key `%s` doesn't match the format accepted by Azure", value, key)
|
||||
}
|
||||
}
|
||||
|
||||
URL, _ := url.Parse(fmt.Sprintf("https://%s.blob.core.windows.net/%s/%s", metadata.StorageAccount, metadata.ContainerName, metadata.BlobName))
|
||||
|
||||
client, err := blob.NewClientWithSharedKeyCredential(URL.String(), c.credential, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot create a blob client: %w", err)
|
||||
}
|
||||
|
||||
_, err = client.SetTags(ctx, tags, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot tag the blob: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// RandomStorageAccountName returns a randomly generated name that can be used
|
||||
// for a storage account. This means that it must use only alphanumeric
|
||||
// characters and its length must be 24 or lower.
|
||||
func RandomStorageAccountName(prefix string) string {
|
||||
id := uuid.New().String()
|
||||
id = strings.ReplaceAll(id, "-", "")
|
||||
|
||||
return (prefix + id)[:24]
|
||||
}
|
||||
|
||||
// EnsureVHDExtension returns the given string with .vhd suffix if it already
|
||||
// doesn't have one.
|
||||
func EnsureVHDExtension(s string) string {
|
||||
if strings.HasSuffix(s, ".vhd") {
|
||||
return s
|
||||
}
|
||||
|
||||
return s + ".vhd"
|
||||
}
|
||||
|
|
@ -1,33 +0,0 @@
|
|||
package azure
|
||||
|
||||
import (
|
||||
"regexp"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestRandomStorageAccountName(t *testing.T) {
|
||||
randomName := RandomStorageAccountName("ib")
|
||||
|
||||
assert.Len(t, randomName, 24)
|
||||
|
||||
r := regexp.MustCompile(`^[\d\w]{24}$`)
|
||||
assert.True(t, r.MatchString(randomName), "the returned name should be 24 characters long and contain only alphanumerical characters")
|
||||
}
|
||||
|
||||
func TestEnsureVHDExtension(t *testing.T) {
|
||||
tests := []struct {
|
||||
s string
|
||||
want string
|
||||
}{
|
||||
{s: "toucan.zip", want: "toucan.zip.vhd"},
|
||||
{s: "kingfisher.vhd", want: "kingfisher.vhd"},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.s, func(t *testing.T) {
|
||||
require.Equal(t, tt.want, EnsureVHDExtension(tt.s))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
@ -1,35 +0,0 @@
|
|||
package azure
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/BurntSushi/toml"
|
||||
)
|
||||
|
||||
type Credentials struct {
|
||||
clientID string
|
||||
clientSecret string
|
||||
}
|
||||
|
||||
// ParseAzureCredentialsFile parses a credentials file for azure.
|
||||
// The file is in toml format and contains two keys: client_id and
|
||||
// client_secret
|
||||
//
|
||||
// Example of the file:
|
||||
// client_id = "clientIdOfMyApplication"
|
||||
// client_secret = "ToucanToucan~"
|
||||
func ParseAzureCredentialsFile(filename string) (*Credentials, error) {
|
||||
var creds struct {
|
||||
ClientID string `toml:"client_id"`
|
||||
ClientSecret string `toml:"client_secret"`
|
||||
}
|
||||
_, err := toml.DecodeFile(filename, &creds)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse azure credentials: %v", err)
|
||||
}
|
||||
|
||||
return &Credentials{
|
||||
clientID: creds.ClientID,
|
||||
clientSecret: creds.ClientSecret,
|
||||
}, nil
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue