GCP: refactor logging and storage cleanup
Previously, the internal GCP library in `internal/upload/gcp` logged various information and errors itself. Move all logging to the callers of the library. As a result, some methods now return additional information, so that callers can log the same details as before. Refactor the methods to have a single purpose and not do any extra work, such as storage cleanup. Methods which create new resources no longer do any cleanup at all; the caller is responsible for checking for errors and performing any necessary cleanup, and dedicated cleanup methods are provided. Modify the worker's job implementation and the GCP CLI tool to explicitly do all necessary cleanup, including in case of errors.

Signed-off-by: Tomas Hozza <thozza@redhat.com>
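As an illustration of the new caller-side contract, here is a minimal sketch of what a caller (e.g. the worker job) now has to do. The `gcp` method names are the ones introduced by this diff; the package wrapper, import path, variable names and log wiring are assumptions made for the example only:

package example // hypothetical wrapper; in the real tree this flow lives in the worker job and the GCP CLI tool

import (
	"log"

	"github.com/osbuild/osbuild-composer/internal/upload/gcp" // assumed import path
)

// uploadAndImport sketches the caller-side contract after this refactor:
// the library performs single-purpose operations only, so the caller logs
// everything and cleans up explicitly, even when the import fails.
func uploadAndImport(credentials []byte, imagePath, bucket, object, imageName, osID, region string) error {
	g, err := gcp.New(credentials)
	if err != nil {
		return err
	}

	objAttrs, err := g.StorageObjectUpload(imagePath, bucket, object)
	if err != nil {
		return err
	}
	log.Printf("[GCP] uploaded %s/%s (md5: %x)", objAttrs.Bucket, objAttrs.Name, objAttrs.MD5)

	// The import does no cleanup on its own, regardless of the outcome.
	imageBuild, importErr := g.ComputeImageImport(bucket, object, imageName, osID, region)
	if imageBuild != nil {
		log.Printf("[GCP] image import log URL: %s", imageBuild.LogUrl)
	}

	// Clean up explicitly, also on the error path.
	deleted, errs := g.StorageImageImportCleanup(imageName)
	for _, d := range deleted {
		log.Printf("[GCP] deleted image import job file: %s", d)
	}
	for _, e := range errs {
		log.Printf("[GCP] cleanup error: %v", e)
	}
	if err := g.StorageObjectDelete(bucket, object); err != nil {
		log.Printf("[GCP] failed to delete the uploaded object: %v", err)
	}

	if importErr != nil {
		return importErr
	}
	log.Printf("[GCP] image URL: %s", g.ComputeImageURL(imageName))
	return nil
}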
This commit is contained in:
parent
d7b0323a2d
commit
7de2011beb
3 changed files with 238 additions and 180 deletions
@@ -1,12 +1,10 @@
 package gcp
 
 import (
-	"bytes"
 	"context"
 	"crypto/md5"
 	"fmt"
 	"io"
-	"log"
 	"os"
 	"regexp"
 	"strings"
@@ -23,10 +21,12 @@ import (
 	"google.golang.org/protobuf/types/known/durationpb"
 )
 
+// GCP structure holds necessary information to authenticate and interact with GCP.
 type GCP struct {
 	creds *google.Credentials
 }
 
+// New returns an authenticated GCP instance, allowing to interact with GCP API.
 func New(credentials []byte) (*GCP, error) {
 	scopes := []string{
 		compute.ComputeScope, // permissions to image
@@ -66,13 +66,65 @@ func (g *GCP) GetProjectID() string {
 	return g.creds.ProjectID
 }
 
-// Upload uploads an OS image to specified Cloud Storage bucket and object.
+// StorageObjectUpload uploads an OS image to specified Cloud Storage bucket and object.
 // The bucket must exist. MD5 sum of the image file and uploaded object is
 // compared after the upload to verify the integrity of the uploaded image.
 //
+// The ObjectAttrs is returned if the object has been created.
+//
 // Uses:
 // - Storage API
-func (g *GCP) Upload(filename, bucket, object string) error {
+func (g *GCP) StorageObjectUpload(filename, bucket, object string) (*storage.ObjectAttrs, error) {
 	ctx := context.Background()
 
 	storageClient, err := storage.NewClient(ctx, option.WithCredentials(g.creds))
+	if err != nil {
+		return nil, fmt.Errorf("failed to get Storage client: %v", err)
+	}
+	defer storageClient.Close()
+
+	// Open the image file
+	imageFile, err := os.Open(filename)
+	if err != nil {
+		return nil, fmt.Errorf("cannot open the image: %v", err)
+	}
+	defer imageFile.Close()
+
+	// Compute MD5 checksum of the image file for later verification
+	imageFileHash := md5.New()
+	if _, err := io.Copy(imageFileHash, imageFile); err != nil {
+		return nil, fmt.Errorf("cannot create md5 of the image: %v", err)
+	}
+	// Move the cursor of opened file back to the start
+	if _, err := imageFile.Seek(0, 0); err != nil {
+		return nil, fmt.Errorf("cannot seek the image: %v", err)
+	}
+
+	// Upload the image
+	// The Bucket MUST exist and be of a STANDARD storage class
+	obj := storageClient.Bucket(bucket).Object(object)
+	wc := obj.NewWriter(ctx)
+
+	// Uploaded data is rejected if its MD5 hash does not match the set value.
+	wc.MD5 = imageFileHash.Sum(nil)
+
+	if _, err = io.Copy(wc, imageFile); err != nil {
+		return nil, fmt.Errorf("uploading the image failed: %v", err)
+	}
+
+	// The object will not be available until Close has been called.
+	if err := wc.Close(); err != nil {
+		return nil, fmt.Errorf("Writer.Close: %v", err)
+	}
+
+	return wc.Attrs(), nil
+}
+
+// StorageObjectDelete deletes the given object from a bucket.
+//
+// Uses:
+// - Storage API
+func (g *GCP) StorageObjectDelete(bucket, object string) error {
+	ctx := context.Background()
+
+	storageClient, err := storage.NewClient(ctx, option.WithCredentials(g.creds))
@@ -81,57 +133,119 @@ func (g *GCP) Upload(filename, bucket, object string) error {
 	}
 	defer storageClient.Close()
 
-	// Open the image file
-	imageFile, err := os.Open(filename)
-	if err != nil {
-		return fmt.Errorf("cannot open the image: %v", err)
-	}
-	defer imageFile.Close()
-
-	// Compute MD5 checksum of the image file for later verification
-	imageFileHash := md5.New()
-	if _, err := io.Copy(imageFileHash, imageFile); err != nil {
-		return fmt.Errorf("cannot create md5 of the image: %v", err)
-	}
-	// Move the cursor of opened file back to the start
-	if _, err := imageFile.Seek(0, 0); err != nil {
-		return fmt.Errorf("cannot seek the image: %v", err)
-	}
-
-	// Upload the image
-	// The Bucket MUST exist and be of a STANDARD storage class
-	obj := storageClient.Bucket(bucket).Object(object)
-	wc := obj.NewWriter(ctx)
-	log.Printf("[GCP] 🚀 Uploading image to: %s/%s\n", bucket, object)
-	if _, err = io.Copy(wc, imageFile); err != nil {
-		return fmt.Errorf("uploading the image failed: %v", err)
-	}
-
-	if err := wc.Close(); err != nil {
-		return fmt.Errorf("Writer.Close: %v", err)
-	}
-
-	// Verify the MD5 sum of the uploaded file
-	objAttrs, err := obj.Attrs(ctx)
-	if err != nil {
-		return fmt.Errorf("cannot get uploaded object attributed: %v", err)
-	}
-	objChecksum := objAttrs.MD5
-	fileChecksum := imageFileHash.Sum(nil)
-	if !bytes.Equal(objChecksum, fileChecksum) {
-		return fmt.Errorf("error during image upload. the image seems to be corrupted")
+	objectHandle := storageClient.Bucket(bucket).Object(object)
+	if err = objectHandle.Delete(ctx); err != nil {
+		return fmt.Errorf("failed to delete image file object: %v", err)
 	}
 
 	return nil
 }
 
-// Import imports a previously uploaded image by submitting a Cloud Build API
+// StorageImageImportCleanup deletes all objects created as part of an Image
+// import into Compute Engine and the related Build Job. The method returns a list
+// of deleted Storage objects, as well as list of errors which occurred during
+// the cleanup. The method tries to clean up as much as possible, therefore
+// it does not return on non-fatal errors.
+//
+// The Build job stores a copy of the to-be-imported image in a region specific
+// bucket, along with the Build job logs and some cache files.
+//
+// Uses:
+// - Compute Engine API
+// - Storage API
+func (g *GCP) StorageImageImportCleanup(imageName string) ([]string, []error) {
+	var deletedObjects []string
+	var errors []error
+
+	ctx := context.Background()
+
+	storageClient, err := storage.NewClient(ctx, option.WithCredentials(g.creds))
+	if err != nil {
+		errors = append(errors, fmt.Errorf("failed to get Storage client: %v", err))
+		return deletedObjects, errors
+	}
+	defer storageClient.Close()
+
+	computeService, err := compute.NewService(ctx, option.WithCredentials(g.creds))
+	if err != nil {
+		errors = append(errors, fmt.Errorf("failed to get Compute Engine client: %v", err))
+		return deletedObjects, errors
+	}
+
+	// Clean up the cache bucket
+	imageGetCall := computeService.Images.Get(g.creds.ProjectID, imageName)
+	image, err := imageGetCall.Do()
+	if err != nil {
+		// Without the image, we can not determine which objects to delete, just return
+		errors = append(errors, fmt.Errorf("failed to get image: %v", err))
+		return deletedObjects, errors
+	}
+
+	// Determine the regular expression to match files related to the specific Image Import
+	// e.g. "https://www.googleapis.com/compute/v1/projects/ascendant-braid-303513/zones/europe-west1-b/disks/disk-d7tr4"
+	// e.g. "https://www.googleapis.com/compute/v1/projects/ascendant-braid-303513/zones/europe-west1-b/disks/disk-l7s2w-1"
+	// Needed is only the part between "disk-" and possible "-<num>"/"EOF"
+	ss := strings.Split(image.SourceDisk, "/")
+	srcDiskName := ss[len(ss)-1]
+	ss = strings.Split(srcDiskName, "-")
+	if len(ss) < 2 {
+		errors = append(errors, fmt.Errorf("unexpected source disk name '%s', can not clean up storage", srcDiskName))
+		return deletedObjects, errors
+	}
+	scrDiskSuffix := ss[1]
+	// e.g. "gce-image-import-2021-02-05T17:27:40Z-2xhp5/daisy-import-image-20210205-17:27:43-s6l0l/logs/daisy.log"
+	reStr := fmt.Sprintf("gce-image-import-.+-%s", scrDiskSuffix)
+	cacheFilesRe := regexp.MustCompile(reStr)
+
+	buckets := storageClient.Buckets(ctx, g.creds.ProjectID)
+	for {
+		bkt, err := buckets.Next()
+		if err == iterator.Done {
+			break
+		}
+		if err != nil {
+			errors = append(errors, fmt.Errorf("failure while iterating over storage buckets: %v", err))
+			return deletedObjects, errors
+		}
+
+		// Check all buckets created by the Image Import Build jobs
+		// These are named e.g. "<project_id>-daisy-bkt-eu" - "ascendant-braid-303513-daisy-bkt-eu"
+		if strings.HasPrefix(bkt.Name, fmt.Sprintf("%s-daisy-bkt", g.creds.ProjectID)) {
+			objects := storageClient.Bucket(bkt.Name).Objects(ctx, nil)
+			for {
+				obj, err := objects.Next()
+				if err == iterator.Done {
+					break
+				}
+				if err != nil {
+					// Do not return, just log, to clean up as much as possible!
+					errors = append(errors, fmt.Errorf("failure while iterating over bucket objects: %v", err))
+					break
+				}
+				if cacheFilesRe.FindString(obj.Name) != "" {
+					o := storageClient.Bucket(bkt.Name).Object(obj.Name)
+					if err = o.Delete(ctx); err != nil {
+						// Do not return, just log, to clean up as much as possible!
+						errors = append(errors, fmt.Errorf("failed to delete storage object: %v", err))
+					}
+					deletedObjects = append(deletedObjects, fmt.Sprintf("%s/%s", bkt.Name, obj.Name))
+				}
+			}
+		}
+	}
+
+	return deletedObjects, errors
+}
+
+// ComputeImageImport imports a previously uploaded image by submitting a Cloud Build API
 // job. The job builds an image into Compute Node from an image uploaded to the
 // storage.
 //
-// The source image file is deleted from the storage bucket after a successful
-// image import. Also all cache files created as part of the image import are
-// deleted from the respective storage bucket.
+// The Build job usually creates a number of cache files in the Storage.
+// This method does not do any cleanup, regardless if the image import succeeds or fails.
+//
+// To delete the Storage object used for image import, use StorageObjectDelete().
+// To cleanup cache files after the Build job, use StorageImageImportCleanup().
 //
 // bucket - Google storage bucket name with the uploaded image
 // object - Google storage object name of the uploaded image
@@ -153,11 +267,11 @@ func (g *GCP) Upload(filename, bucket, object string) error {
 //
 // Uses:
 // - Cloud Build API
-func (g *GCP) Import(bucket, object, imageName, os, region string) error {
+func (g *GCP) ComputeImageImport(bucket, object, imageName, os, region string) (*cloudbuildpb.Build, error) {
 	ctx := context.Background()
 	cloudbuildClient, err := cloudbuild.NewClient(ctx, option.WithCredentials(g.creds))
 	if err != nil {
-		return fmt.Errorf("failed to get Cloud Build client: %v", err)
+		return nil, fmt.Errorf("failed to get Cloud Build client: %v", err)
 	}
 	defer cloudbuildClient.Close()
 
@@ -195,33 +309,29 @@ func (g *GCP) Import(bucket, object, imageName, os, region string) error {
 		ProjectId: g.creds.ProjectID,
 		Build:     imageBuild,
 	}
-	log.Printf("[GCP] 📥 Importing image into Compute Node as '%s'\n", imageName)
 
 	resp, err := cloudbuildClient.CreateBuild(ctx, createBuildReq)
 	if err != nil {
-		return fmt.Errorf("failed to create image import build job: %v", err)
+		return nil, fmt.Errorf("failed to create image import build job: %v", err)
 	}
 
 	// Get the returned Build struct
 	buildOpMetadata := &cloudbuildpb.BuildOperationMetadata{}
 	if err := ptypes.UnmarshalAny(resp.Metadata, buildOpMetadata); err != nil {
-		return err
+		return nil, err
 	}
 	imageBuild = buildOpMetadata.Build
 
-	log.Printf("[GCP] 📜 Image import log URL: %s\n", imageBuild.LogUrl)
-	log.Printf("[GCP] 🤔 Image import build status: %+v\n", imageBuild.Status)
 
 	getBuldReq := &cloudbuildpb.GetBuildRequest{
 		ProjectId: imageBuild.ProjectId,
 		Id:        imageBuild.Id,
 	}
 
 	// Wait for the build to finish
-	log.Println("[GCP] 🥱 Waiting for the image import to finish")
 	for {
 		imageBuild, err = cloudbuildClient.GetBuild(ctx, getBuldReq)
 		if err != nil {
-			return fmt.Errorf("failed to get the build info: %v", err)
+			return imageBuild, fmt.Errorf("failed to get the build info: %v", err)
 		}
 		// The build finished
 		if imageBuild.Status != cloudbuildpb.Build_WORKING && imageBuild.Status != cloudbuildpb.Build_QUEUED {
@@ -230,115 +340,20 @@ func (g *GCP) Import(bucket, object, imageName, os, region string) error {
 		time.Sleep(time.Second * 30)
 	}
 
-	fmt.Printf("[GCP] 🎉 Image import finished with status: %s\n", imageBuild.Status)
-
-	// Clean up cache files created by the Image Import Build job
-	if err = g.ImageImportStorageCleanup(bucket, object, imageName); err != nil {
-		fmt.Printf("storage cleanup failed: %v", err)
-	}
-
 	if imageBuild.Status != cloudbuildpb.Build_SUCCESS {
-		return fmt.Errorf("image import didn't finish successfully: %s", imageBuild.Status)
+		return imageBuild, fmt.Errorf("image import didn't finish successfully: %s", imageBuild.Status)
 	}
 
-	fmt.Printf("[GCP] 💿 Image URL: https://console.cloud.google.com/compute/imagesDetail/projects/%s/global/images/%s\n", g.creds.ProjectID, imageName)
-
-	return nil
+	return imageBuild, nil
 }
 
-// ImageImportStorageCleanup deletes all objects created as part of an Image
-// import into Compute Engine and the related Build Job. It also deletes the
-// source image file, which has been used for image import.
-//
-// The Build job stores a copy of the to-be-imported image in a region specific
-// bucket, along with the Build job logs.
-//
-// Uses:
-// - Compute Engine API
-// - Storage API
-func (g *GCP) ImageImportStorageCleanup(bucket, object, imageName string) error {
-	ctx := context.Background()
-
-	storageClient, err := storage.NewClient(ctx, option.WithCredentials(g.creds))
-	if err != nil {
-		return fmt.Errorf("failed to get Storage client: %v", err)
-	}
-	defer storageClient.Close()
-
-	computeService, err := compute.NewService(ctx, option.WithCredentials(g.creds))
-	if err != nil {
-		return fmt.Errorf("failed to get Compute Engine client: %v", err)
-	}
-
-	// Clean up the cache bucket
-	imageGetCall := computeService.Images.Get(g.creds.ProjectID, imageName)
-	image, err := imageGetCall.Do()
-	if err != nil {
-		// Without the image, we can not determine which objects to delete, just return
-		return fmt.Errorf("failed to get image: %v", err)
-	}
-
-	// Determine the regular expression to match files related to the specific Image Import
-	// e.g. "https://www.googleapis.com/compute/v1/projects/ascendant-braid-303513/zones/europe-west1-b/disks/disk-d7tr4"
-	// e.g. "https://www.googleapis.com/compute/v1/projects/ascendant-braid-303513/zones/europe-west1-b/disks/disk-l7s2w-1"
-	// Needed is only the part between "disk-" and possible "-<num>"/"EOF"
-	ss := strings.Split(image.SourceDisk, "/")
-	srcDiskName := ss[len(ss)-1]
-	ss = strings.Split(srcDiskName, "-")
-	if len(ss) < 2 {
-		return fmt.Errorf("unexpected source disk name '%s', can not clean up storage", srcDiskName)
-	}
-	scrDiskSuffix := ss[1]
-	// e.g. "gce-image-import-2021-02-05T17:27:40Z-2xhp5/daisy-import-image-20210205-17:27:43-s6l0l/logs/daisy.log"
-	reStr := fmt.Sprintf("gce-image-import-.+-%s", scrDiskSuffix)
-	cacheFilesRe := regexp.MustCompile(reStr)
-
-	buckets := storageClient.Buckets(ctx, g.creds.ProjectID)
-	for {
-		bkt, err := buckets.Next()
-		if err == iterator.Done {
-			break
-		}
-		if err != nil {
-			return fmt.Errorf("failure while iterating over storage buckets: %v", err)
-		}
-
-		// Check all buckets created by the Image Import Build jobs
-		// These are named e.g. "<project_id>-daisy-bkt-eu" - "ascendant-braid-303513-daisy-bkt-eu"
-		if strings.HasPrefix(bkt.Name, fmt.Sprintf("%s-daisy-bkt", g.creds.ProjectID)) {
-			objects := storageClient.Bucket(bkt.Name).Objects(ctx, nil)
-			for {
-				obj, err := objects.Next()
-				if err == iterator.Done {
-					break
-				}
-				if err != nil {
-					// Do not return, just log, to clean up as much as possible!
-					fmt.Printf("ERROR: failure while iterating over storage objects: %v", err)
-					break
-				}
-				if cacheFilesRe.FindString(obj.Name) != "" {
-					o := storageClient.Bucket(bkt.Name).Object(obj.Name)
-					fmt.Printf("[GCP] 🧹 Deleting image import job file '%s'\n", obj.Name)
-					if err = o.Delete(ctx); err != nil {
-						// Do not return, just log, to clean up as much as possible!
-						fmt.Printf("ERROR: failed to delete storage object: %v", err)
-					}
-				}
-			}
-		}
-	}
-
-	imageFileObject := storageClient.Bucket(bucket).Object(object)
-	fmt.Printf("[GCP] 🧹 Deleting image file from Storage: %s/%s\n", bucket, object)
-	if err = imageFileObject.Delete(ctx); err != nil {
-		return fmt.Errorf("failed to delete image file object: %v", err)
-	}
-
-	return nil
+// ComputeImageURL returns an image's URL to Google Cloud Console. The method does
+// not check at all, if the image actually exists or not.
+func (g *GCP) ComputeImageURL(imageName string) string {
+	return fmt.Sprintf("https://console.cloud.google.com/compute/imagesDetail/projects/%s/global/images/%s", g.creds.ProjectID, imageName)
 }
 
-// Share shares the specified Compute Engine image with list of accounts.
+// ComputeImageShare shares the specified Compute Engine image with list of accounts.
 //
 // "shareWith" is a list of accounts to share the image with. Items can be one
 // of the following options:
@@ -357,7 +372,7 @@ func (g *GCP) ImageImportStorageCleanup(bucket, object, imageName string) error
 //
 // Uses:
 // - Compute Engine API
-func (g *GCP) Share(imageName string, shareWith []string) error {
+func (g *GCP) ComputeImageShare(imageName string, shareWith []string) error {
 	ctx := context.Background()
 
 	computeService, err := compute.NewService(ctx, option.WithCredentials(g.creds))
@@ -389,7 +404,6 @@ func (g *GCP) Share(imageName string, shareWith []string) error {
 		Policy: newPolicy,
 	}
 	newPolicyCall := computeService.Images.SetIamPolicy(g.creds.ProjectID, imageName, req)
-	fmt.Printf("[GCP] Sharing the image with: %+v\n", shareWith)
 	_, err = newPolicyCall.Do()
 	if err != nil {
 		return fmt.Errorf("failed to set new image policy: %v", err)
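For completeness, a short fragment continuing the caller sketch after the commit message (same hypothetical `g` and `imageName`), showing the renamed sharing helper. The account strings are made-up examples following the usual Compute Engine IAM member formats (user:, group:, serviceAccount:, domain:):

	shareWith := []string{
		"user:alice@example.com",
		"serviceAccount:builder@my-project.iam.gserviceaccount.com",
	}
	if err := g.ComputeImageShare(imageName, shareWith); err != nil {
		log.Printf("[GCP] failed to share the image: %v", err) // logging stays with the caller
		return err
	}
	// ComputeImageURL only formats the console URL; it does not check that the image exists.
	log.Printf("[GCP] image shared, URL: %s", g.ComputeImageURL(imageName))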