diff --git a/.gitignore b/.gitignore index 1c8887b0a..e68b4a8e6 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ __pycache__ /osbuild-worker /osbuild-pipeline /osbuild-upload-azure +/osbuild-upload-aws diff --git a/Makefile b/Makefile index 63d786b4a..d1f7a97dd 100644 --- a/Makefile +++ b/Makefile @@ -4,6 +4,7 @@ build: go build -o osbuild-worker ./cmd/osbuild-worker/ go build -o osbuild-pipeline ./cmd/osbuild-pipeline/ go build -o osbuild-upload-azure ./cmd/osbuild-upload-azure/ + go build -o osbuild-upload-aws ./cmd/osbuild-upload-aws/ .PHONY: install install: diff --git a/cmd/osbuild-upload-aws/main.go b/cmd/osbuild-upload-aws/main.go index 32f0ffd6b..7bc132b75 100644 --- a/cmd/osbuild-upload-aws/main.go +++ b/cmd/osbuild-upload-aws/main.go @@ -5,7 +5,7 @@ import ( "fmt" "github.com/aws/aws-sdk-go/aws" - "github.com/osbuild/osbuild-composer/internal/awsupload" + "github.com/osbuild/osbuild-composer/internal/upload/awsupload" ) func main() { diff --git a/cmd/osbuild-upload-azure/main.go b/cmd/osbuild-upload-azure/main.go index 2e6943e53..779a2e13b 100644 --- a/cmd/osbuild-upload-azure/main.go +++ b/cmd/osbuild-upload-azure/main.go @@ -1,116 +1,14 @@ package main import ( - "bufio" - "bytes" - "context" "flag" "fmt" - "io" "log" - "net/url" - "os" "path" - "sync" - "github.com/Azure/azure-storage-blob-go/azblob" + "github.com/osbuild/osbuild-composer/internal/upload/azure" ) -func handleErrors(err error) { - if err != nil { - if serr, ok := err.(azblob.StorageError); ok { // This error is a Service-specific - switch serr.ServiceCode() { // Compare serviceCode to ServiceCodeXxx constants - case azblob.ServiceCodeContainerAlreadyExists: - // This error is not fatal - fmt.Println("Received 409. 
Container already exists") - return - } - } - // All other error causes the program to exit - fmt.Println(err) - os.Exit(1) - } -} - -type azureCredentials struct { - storageAccount string - storageAccessKey string -} - -type azureImageMetadata struct { - containerName string - imageName string -} - -func azureUploadImage(credentials azureCredentials, metadata azureImageMetadata, fileName string, threads int) { - // Create a default request pipeline using your storage account name and account key. - credential, err := azblob.NewSharedKeyCredential(credentials.storageAccount, credentials.storageAccessKey) - handleErrors(err) - p := azblob.NewPipeline(credential, azblob.PipelineOptions{}) - - // get storage account blob service URL endpoint. - URL, _ := url.Parse(fmt.Sprintf("https://%s.blob.core.windows.net/%s", credentials.storageAccount, metadata.containerName)) - - // Create a ContainerURL object that wraps the container URL and a request - // pipeline to make requests. - containerURL := azblob.NewContainerURL(*URL, p) - - // Create the container, use a never-expiring context - ctx := context.Background() - - // Open the image file for reading - imageFile, err := os.Open(fileName) - handleErrors(err) - - // Stat image to get the file size - stat, err := imageFile.Stat() - handleErrors(err) - - // Create page blob URL. 
Page blob is required for VM images - blobURL := containerURL.NewPageBlobURL(metadata.imageName) - _, err = blobURL.Create(ctx, stat.Size(), 0, azblob.BlobHTTPHeaders{}, azblob.Metadata{}, azblob.BlobAccessConditions{}) - handleErrors(err) - - // Create control variables - // This channel simulates behavior of a semaphore and bounds the number of parallel threads - var semaphore = make(chan int, threads) - var counter int64 = 0 - - // Create buffered reader to speed up the upload - reader := bufio.NewReader(imageFile) - imageSize := stat.Size() - // Run the upload - run := true - var wg sync.WaitGroup - for run { - buffer := make([]byte, azblob.PageBlobMaxUploadPagesBytes) - n, err := reader.Read(buffer) - if err != nil { - if err == io.EOF { - run = false - } else { - panic(err) - } - } - if n == 0 { - break - } - wg.Add(1) - semaphore <- 1 - go func(counter int64, buffer []byte, n int) { - defer wg.Done() - _, err = blobURL.UploadPages(ctx, counter*azblob.PageBlobMaxUploadPagesBytes, bytes.NewReader(buffer[:n]), azblob.PageBlobAccessConditions{}, nil) - if err != nil { - log.Fatal(err) - } - <-semaphore - }(counter, buffer, n) - fmt.Printf("\rProgress: uploading bytest %d-%d from %d bytes", counter*azblob.PageBlobMaxUploadPagesBytes, counter*azblob.PageBlobMaxUploadPagesBytes+int64(n), imageSize) - counter++ - } - wg.Wait() -} - func checkStringNotEmpty(variable string, errorMessage string) { if variable == "" { log.Fatal(errorMessage) @@ -137,12 +35,12 @@ func main() { fmt.Println("Image to upload is:", fileName) - azureUploadImage(azureCredentials{ - storageAccount: storageAccount, - storageAccessKey: storageAccessKey, - }, azureImageMetadata{ - imageName: path.Base(fileName), - containerName: containerName, + azure.UploadImage(azure.Credentials{ + StorageAccount: storageAccount, + StorageAccessKey: storageAccessKey, + }, azure.ImageMetadata{ + ImageName: path.Base(fileName), + ContainerName: containerName, }, fileName, threads) } diff --git 
a/internal/jobqueue/job.go b/internal/jobqueue/job.go index 0defad289..09ca106a6 100644 --- a/internal/jobqueue/job.go +++ b/internal/jobqueue/job.go @@ -6,9 +6,9 @@ import ( "os/exec" "github.com/google/uuid" - "github.com/osbuild/osbuild-composer/internal/awsupload" "github.com/osbuild/osbuild-composer/internal/pipeline" "github.com/osbuild/osbuild-composer/internal/target" + "github.com/osbuild/osbuild-composer/internal/upload/awsupload" ) type Job struct { diff --git a/internal/awsupload/awsupload.go b/internal/upload/awsupload/awsupload.go similarity index 100% rename from internal/awsupload/awsupload.go rename to internal/upload/awsupload/awsupload.go diff --git a/internal/upload/azure/azure.go b/internal/upload/azure/azure.go new file mode 100644 index 000000000..eec8717ca --- /dev/null +++ b/internal/upload/azure/azure.go @@ -0,0 +1,120 @@ +package azure + +import ( + "bufio" + "bytes" + "context" + "fmt" + "io" + "net/url" + "os" + "sync" + + "github.com/Azure/azure-storage-blob-go/azblob" +) + +// Credentials contains credentials to connect to your account. +// It uses so-called "Client credentials", see the official documentation for more information: +// https://docs.microsoft.com/en-us/azure/go/azure-sdk-go-authorization#available-authentication-types-and-methods +type Credentials struct { + StorageAccount string + StorageAccessKey string +} + +// ImageMetadata contains information needed to store the image in a proper place. +// In case of Azure cloud storage this includes container name and blob name. +type ImageMetadata struct { + ContainerName string + ImageName string +} + +// UploadImage takes the metadata and credentials required to upload the image specified by `fileName` +// It can speed up the upload by using goroutines. The number of parallel goroutines is bounded by +// the `threads` argument. 
+func UploadImage(credentials Credentials, metadata ImageMetadata, fileName string, threads int) error { + // Create a default request pipeline using your storage account name and account key. + credential, err := azblob.NewSharedKeyCredential(credentials.StorageAccount, credentials.StorageAccessKey) + if err != nil { + return err + } + + p := azblob.NewPipeline(credential, azblob.PipelineOptions{}) + + // get storage account blob service URL endpoint. + URL, _ := url.Parse(fmt.Sprintf("https://%s.blob.core.windows.net/%s", credentials.StorageAccount, metadata.ContainerName)) + + // Create a ContainerURL object that wraps the container URL and a request + // pipeline to make requests. + containerURL := azblob.NewContainerURL(*URL, p) + + // Create the container, use a never-expiring context + ctx := context.Background() + + // Open the image file for reading + imageFile, err := os.Open(fileName) + if err != nil { + return err + } + + // Stat image to get the file size + stat, err := imageFile.Stat() + if err != nil { + return err + } + + // Create page blob URL. 
Page blob is required for VM images + blobURL := containerURL.NewPageBlobURL(metadata.ImageName) + _, err = blobURL.Create(ctx, stat.Size(), 0, azblob.BlobHTTPHeaders{}, azblob.Metadata{}, azblob.BlobAccessConditions{}) + if err != nil { + return err + } + + // Create control variables + // This channel simulates behavior of a semaphore and bounds the number of parallel threads + var semaphore = make(chan int, threads) + // Forward error from goroutine to the caller + var errorInGorutine = make(chan error, 1) + var counter int64 = 0 + + // Create buffered reader to speed up the upload + reader := bufio.NewReader(imageFile) + // Run the upload + run := true + var wg sync.WaitGroup + for run { + buffer := make([]byte, azblob.PageBlobMaxUploadPagesBytes) + n, err := reader.Read(buffer) + if err != nil { + if err == io.EOF { + run = false + } else { + return err + } + } + if n == 0 { + break + } + wg.Add(1) + semaphore <- 1 + go func(counter int64, buffer []byte, n int) { + defer wg.Done() + _, err = blobURL.UploadPages(ctx, counter*azblob.PageBlobMaxUploadPagesBytes, bytes.NewReader(buffer[:n]), azblob.PageBlobAccessConditions{}, nil) + if err != nil { + // Send the error to the error channel in a non-blocking way. If there is already an error, just discard this one + select { + case errorInGorutine <- err: + default: + } + } + <-semaphore + }(counter, buffer, n) + counter++ + } + wg.Wait() + select { + case err := <-errorInGorutine: + return err + default: + return nil + } +}