Refactor Azure upload code to a separate package

As part of this, also move the AWS upload code to the same directory.
This commit is contained in:
Martin Sehnoutka 2019-11-27 09:07:30 +01:00 committed by Tom Gundersen
parent 7b54f5cfdc
commit 1135e9fe01
7 changed files with 131 additions and 111 deletions

.gitignore

@@ -4,3 +4,4 @@ __pycache__
/osbuild-worker
/osbuild-pipeline
/osbuild-upload-azure
/osbuild-upload-aws


@@ -4,6 +4,7 @@ build:
go build -o osbuild-worker ./cmd/osbuild-worker/
go build -o osbuild-pipeline ./cmd/osbuild-pipeline/
go build -o osbuild-upload-azure ./cmd/osbuild-upload-azure/
go build -o osbuild-upload-aws ./cmd/osbuild-upload-aws/
.PHONY: install
install:


@@ -5,7 +5,7 @@ import (
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/osbuild/osbuild-composer/internal/awsupload"
"github.com/osbuild/osbuild-composer/internal/upload/awsupload"
)
func main() {


@@ -1,116 +1,14 @@
package main
import (
"bufio"
"bytes"
"context"
"flag"
"fmt"
"io"
"log"
"net/url"
"os"
"path"
"sync"
"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/osbuild/osbuild-composer/internal/upload/azure"
)
func handleErrors(err error) {
if err != nil {
if serr, ok := err.(azblob.StorageError); ok { // This error is a Service-specific error
switch serr.ServiceCode() { // Compare serviceCode to ServiceCodeXxx constants
case azblob.ServiceCodeContainerAlreadyExists:
// This error is not fatal
fmt.Println("Received 409. Container already exists")
return
}
}
// All other errors cause the program to exit
fmt.Println(err)
os.Exit(1)
}
}
type azureCredentials struct {
storageAccount string
storageAccessKey string
}
type azureImageMetadata struct {
containerName string
imageName string
}
func azureUploadImage(credentials azureCredentials, metadata azureImageMetadata, fileName string, threads int) {
// Create a default request pipeline using your storage account name and account key.
credential, err := azblob.NewSharedKeyCredential(credentials.storageAccount, credentials.storageAccessKey)
handleErrors(err)
p := azblob.NewPipeline(credential, azblob.PipelineOptions{})
// Get the storage account blob service URL endpoint.
URL, _ := url.Parse(fmt.Sprintf("https://%s.blob.core.windows.net/%s", credentials.storageAccount, metadata.containerName))
// Create a ContainerURL object that wraps the container URL and a request
// pipeline to make requests.
containerURL := azblob.NewContainerURL(*URL, p)
// Use a never-expiring context for the requests
ctx := context.Background()
// Open the image file for reading
imageFile, err := os.Open(fileName)
handleErrors(err)
// Stat image to get the file size
stat, err := imageFile.Stat()
handleErrors(err)
// Create the page blob URL. A page blob is required for VM images
blobURL := containerURL.NewPageBlobURL(metadata.imageName)
_, err = blobURL.Create(ctx, stat.Size(), 0, azblob.BlobHTTPHeaders{}, azblob.Metadata{}, azblob.BlobAccessConditions{})
handleErrors(err)
// Create control variables
// This channel simulates the behavior of a semaphore and bounds the number of parallel goroutines
var semaphore = make(chan int, threads)
var counter int64 = 0
// Create buffered reader to speed up the upload
reader := bufio.NewReader(imageFile)
imageSize := stat.Size()
// Run the upload
run := true
var wg sync.WaitGroup
for run {
buffer := make([]byte, azblob.PageBlobMaxUploadPagesBytes)
n, err := reader.Read(buffer)
if err != nil {
if err == io.EOF {
run = false
} else {
panic(err)
}
}
if n == 0 {
break
}
wg.Add(1)
semaphore <- 1
go func(counter int64, buffer []byte, n int) {
defer wg.Done()
_, err = blobURL.UploadPages(ctx, counter*azblob.PageBlobMaxUploadPagesBytes, bytes.NewReader(buffer[:n]), azblob.PageBlobAccessConditions{}, nil)
if err != nil {
log.Fatal(err)
}
<-semaphore
}(counter, buffer, n)
fmt.Printf("\rProgress: uploading bytest %d-%d from %d bytes", counter*azblob.PageBlobMaxUploadPagesBytes, counter*azblob.PageBlobMaxUploadPagesBytes+int64(n), imageSize)
counter++
}
wg.Wait()
}
func checkStringNotEmpty(variable string, errorMessage string) {
if variable == "" {
log.Fatal(errorMessage)
@@ -137,12 +35,12 @@ func main() {
fmt.Println("Image to upload is:", fileName)
azureUploadImage(azureCredentials{
storageAccount: storageAccount,
storageAccessKey: storageAccessKey,
}, azureImageMetadata{
imageName: path.Base(fileName),
containerName: containerName,
azure.UploadImage(azure.Credentials{
StorageAccount: storageAccount,
StorageAccessKey: storageAccessKey,
}, azure.ImageMetadata{
ImageName: path.Base(fileName),
ContainerName: containerName,
}, fileName, threads)
}
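
For illustration, a minimal sketch of a caller using the new package API and checking the error that azure.UploadImage now returns (the account name, key, container, and file path below are placeholders, not values from this commit):

```go
package main

import (
	"log"
	"path"

	"github.com/osbuild/osbuild-composer/internal/upload/azure"
)

func main() {
	// Placeholder value; the real command reads this from its arguments.
	fileName := "/var/tmp/disk.vhd"

	err := azure.UploadImage(
		azure.Credentials{
			StorageAccount:   "examplestorageaccount",
			StorageAccessKey: "example-base64-key==",
		},
		azure.ImageMetadata{
			ImageName:     path.Base(fileName),
			ContainerName: "imagecontainer",
		},
		fileName,
		16, // number of parallel upload goroutines
	)
	if err != nil {
		log.Fatalf("upload to Azure failed: %v", err)
	}
}
```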


@@ -6,9 +6,9 @@ import (
"os/exec"
"github.com/google/uuid"
"github.com/osbuild/osbuild-composer/internal/awsupload"
"github.com/osbuild/osbuild-composer/internal/pipeline"
"github.com/osbuild/osbuild-composer/internal/target"
"github.com/osbuild/osbuild-composer/internal/upload/awsupload"
)
type Job struct {


@@ -0,0 +1,120 @@
package azure
import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"net/url"
"os"
"sync"
"github.com/Azure/azure-storage-blob-go/azblob"
)
// Credentials contains the credentials needed to connect to your Azure storage account.
// It uses so-called "client credentials"; see the official documentation for more information:
// https://docs.microsoft.com/en-us/azure/go/azure-sdk-go-authorization#available-authentication-types-and-methods
type Credentials struct {
StorageAccount string
StorageAccessKey string
}
// ImageMetadata contains the information needed to store the image in the proper place.
// For Azure cloud storage this includes the container name and the blob name.
type ImageMetadata struct {
ContainerName string
ImageName string
}
// UploadImage takes the metadata and credentials required to upload the image specified by `fileName`.
// It can speed up the upload by using goroutines. The number of parallel goroutines is bounded by
// the `threads` argument.
func UploadImage(credentials Credentials, metadata ImageMetadata, fileName string, threads int) error {
// Create a default request pipeline using your storage account name and account key.
credential, err := azblob.NewSharedKeyCredential(credentials.StorageAccount, credentials.StorageAccessKey)
if err != nil {
return err
}
p := azblob.NewPipeline(credential, azblob.PipelineOptions{})
// Get the storage account blob service URL endpoint.
URL, _ := url.Parse(fmt.Sprintf("https://%s.blob.core.windows.net/%s", credentials.StorageAccount, metadata.ContainerName))
// Create a ContainerURL object that wraps the container URL and a request
// pipeline to make requests.
containerURL := azblob.NewContainerURL(*URL, p)
// Use a never-expiring context for the requests
ctx := context.Background()
// Open the image file for reading
imageFile, err := os.Open(fileName)
if err != nil {
return err
}
// Stat image to get the file size
stat, err := imageFile.Stat()
if err != nil {
return err
}
// Create the page blob URL. A page blob is required for VM images
blobURL := containerURL.NewPageBlobURL(metadata.ImageName)
_, err = blobURL.Create(ctx, stat.Size(), 0, azblob.BlobHTTPHeaders{}, azblob.Metadata{}, azblob.BlobAccessConditions{})
if err != nil {
return err
}
// Create control variables
// This channel simulates the behavior of a semaphore and bounds the number of parallel goroutines
var semaphore = make(chan int, threads)
// Forward errors from goroutines to the caller
var errorInGoroutine = make(chan error, 1)
var counter int64 = 0
// Create buffered reader to speed up the upload
reader := bufio.NewReader(imageFile)
// Run the upload
run := true
var wg sync.WaitGroup
for run {
buffer := make([]byte, azblob.PageBlobMaxUploadPagesBytes)
n, err := reader.Read(buffer)
if err != nil {
if err == io.EOF {
run = false
} else {
return err
}
}
if n == 0 {
break
}
wg.Add(1)
semaphore <- 1
go func(counter int64, buffer []byte, n int) {
defer wg.Done()
_, err := blobURL.UploadPages(ctx, counter*azblob.PageBlobMaxUploadPagesBytes, bytes.NewReader(buffer[:n]), azblob.PageBlobAccessConditions{}, nil)
if err != nil {
// Send the error to the error channel in a non-blocking way. If there is already an error, just discard this one
select {
case errorInGoroutine <- err:
default:
}
}
<-semaphore
}(counter, buffer, n)
counter++
}
wg.Wait()
select {
case err := <-errorInGoroutine:
return err
default:
return nil
}
}
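
The upload loop above bounds concurrency with a buffered channel used as a counting semaphore, a sync.WaitGroup for completion, and a one-slot channel that keeps only the first goroutine error. A minimal standalone sketch of the same pattern, independent of the azblob types (the chunk count, worker count, and processChunk helper are arbitrary stand-ins):

```go
package main

import (
	"fmt"
	"sync"
)

// processChunk stands in for one page-blob upload; it fails on one chunk
// purely to exercise the error path.
func processChunk(chunk int) error {
	if chunk == 7 {
		return fmt.Errorf("chunk %d failed", chunk)
	}
	return nil
}

func main() {
	const workers = 4
	semaphore := make(chan struct{}, workers) // at most `workers` goroutines in flight
	errCh := make(chan error, 1)              // holds only the first error

	var wg sync.WaitGroup
	for chunk := 0; chunk < 16; chunk++ {
		wg.Add(1)
		semaphore <- struct{}{} // acquire a slot; blocks while `workers` are running
		go func(chunk int) {
			defer wg.Done()
			if err := processChunk(chunk); err != nil {
				// Non-blocking send: if an error is already queued, drop this one,
				// mirroring the select/default in UploadImage above.
				select {
				case errCh <- err:
				default:
				}
			}
			<-semaphore // release the slot
		}(chunk)
	}

	wg.Wait()
	select {
	case err := <-errCh:
		fmt.Println("upload failed:", err)
	default:
		fmt.Println("all chunks uploaded")
	}
}
```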