From 099b34b3016bb20d95924e9d27b32f3cd0fd6cf1 Mon Sep 17 00:00:00 2001 From: Sanne Raymaekers Date: Thu, 30 Jun 2022 11:46:24 +0200 Subject: [PATCH] worker: Define new jobs to handle copying and resharing of images The copy job copies from one region to another. It does not preserve the sharing on the ami and it's snapshot, that needs to be queued separately. --- cmd/osbuild-worker/jobimpl-awsec2.go | 144 ++++++++++++++++++ cmd/osbuild-worker/main.go | 6 + internal/cloud/awscloud/awscloud.go | 220 ++++++++++++++++++++++----- internal/worker/json.go | 27 ++++ internal/worker/server.go | 50 ++++++ 5 files changed, 405 insertions(+), 42 deletions(-) create mode 100644 cmd/osbuild-worker/jobimpl-awsec2.go diff --git a/cmd/osbuild-worker/jobimpl-awsec2.go b/cmd/osbuild-worker/jobimpl-awsec2.go new file mode 100644 index 000000000..1a055d3ee --- /dev/null +++ b/cmd/osbuild-worker/jobimpl-awsec2.go @@ -0,0 +1,144 @@ +package main + +import ( + "fmt" + + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/sirupsen/logrus" + + "github.com/osbuild/osbuild-composer/internal/cloud/awscloud" + "github.com/osbuild/osbuild-composer/internal/worker" + "github.com/osbuild/osbuild-composer/internal/worker/clienterrors" +) + +func getAWS(awsCreds, region string) (*awscloud.AWS, error) { + if awsCreds != "" { + return awscloud.NewFromFile(awsCreds, region) + } + return awscloud.NewDefault(region) +} + +type AWSEC2CopyJobImpl struct { + AWSCreds string +} + +func (impl *AWSEC2CopyJobImpl) Run(job worker.Job) error { + logWithId := logrus.WithField("jobId", job.Id()) + result := worker.AWSEC2CopyJobResult{} + + defer func() { + err := job.Update(&result) + if err != nil { + logWithId.Errorf("Error reporting job result: %v", err) + } + }() + + var args worker.AWSEC2CopyJob + err := job.Args(&args) + if err != nil { + result.JobError = clienterrors.WorkerClientError(clienterrors.ErrorParsingJobArgs, fmt.Sprintf("Error parsing arguments: %v", err), nil) + return err + } + + aws, err := getAWS(impl.AWSCreds, args.TargetRegion) + if err != nil { + logWithId.Errorf("Error creating aws client: %v", err) + result.JobError = clienterrors.WorkerClientError(clienterrors.ErrorInvalidConfig, "Invalid worker config", nil) + return err + } + + ami, err := aws.CopyImage(args.TargetName, args.Ami, args.SourceRegion) + if err != nil { + logWithId.Errorf("Error copying ami: %v", err) + result.JobError = clienterrors.WorkerClientError(clienterrors.ErrorSharingTarget, fmt.Sprintf("Error copying ami %s", args.Ami), nil) + if aerr, ok := err.(awserr.Error); ok { + switch aerr.Code() { + case "InvalidRegion": + result.JobError = clienterrors.WorkerClientError(clienterrors.ErrorSharingTarget, fmt.Sprintf("Invalid source region '%s'", args.SourceRegion), nil) + case "InvalidAMIID.Malformed": + result.JobError = clienterrors.WorkerClientError(clienterrors.ErrorSharingTarget, fmt.Sprintf("Malformed source ami id '%s'", args.Ami), nil) + case "InvalidAMIID.NotFound": + fallthrough // CopyImage returns InvalidRequest instead of InvalidAMIID.NotFound + case "InvalidRequest": + result.JobError = clienterrors.WorkerClientError(clienterrors.ErrorSharingTarget, fmt.Sprintf("Source ami '%s' not found", args.Ami), nil) + } + } + return err + } + + result.Ami = ami + result.Region = args.TargetRegion + return nil +} + +type AWSEC2ShareJobImpl struct { + AWSCreds string +} + +func (impl *AWSEC2ShareJobImpl) Run(job worker.Job) error { + logWithId := logrus.WithField("jobId", job.Id()) + result := worker.AWSEC2ShareJobResult{} + + defer func() { + err := job.Update(&result) + if err != nil { + logWithId.Errorf("Error reporting job result: %v", err) + } + }() + + var args worker.AWSEC2ShareJob + err := job.Args(&args) + if err != nil { + result.JobError = clienterrors.WorkerClientError(clienterrors.ErrorParsingJobArgs, fmt.Sprintf("Error parsing arguments: %v", err), nil) + return err + } + + if args.Ami == "" || args.Region == "" { + if job.NDynamicArgs() != 1 { + logWithId.Error("No arguments given and dynamic args empty") + result.JobError = clienterrors.WorkerClientError(clienterrors.ErrorNoDynamicArgs, "An ec2 share job should have args or depend on an ec2 copy job", nil) + return nil + } + var cjResult worker.AWSEC2CopyJobResult + err = job.DynamicArgs(0, &cjResult) + if err != nil { + result.JobError = clienterrors.WorkerClientError(clienterrors.ErrorParsingDynamicArgs, "Error parsing dynamic args as ec2 copy job", nil) + return err + } + if cjResult.JobError != nil { + result.JobError = clienterrors.WorkerClientError(clienterrors.ErrorJobDependency, "AWSEC2CopyJob dependency failed", nil) + return nil + } + + args.Ami = cjResult.Ami + args.Region = cjResult.Region + } + + aws, err := getAWS(impl.AWSCreds, args.Region) + if err != nil { + logWithId.Errorf("Error creating aws client: %v", err) + result.JobError = clienterrors.WorkerClientError(clienterrors.ErrorInvalidConfig, "Invalid worker config", nil) + return err + } + + err = aws.ShareImage(args.Ami, args.ShareWithAccounts) + if err != nil { + logWithId.Errorf("Error sharing image: %v", err) + result.JobError = clienterrors.WorkerClientError(clienterrors.ErrorSharingTarget, fmt.Sprintf("Error sharing image with target %v", args.ShareWithAccounts), nil) + if aerr, ok := err.(awserr.Error); ok { + switch aerr.Code() { + case "InvalidAMIID.Malformed": + result.JobError = clienterrors.WorkerClientError(clienterrors.ErrorSharingTarget, fmt.Sprintf("Malformed ami id '%s'", args.Ami), nil) + case "InvalidAMIID.NotFound": + result.JobError = clienterrors.WorkerClientError(clienterrors.ErrorSharingTarget, fmt.Sprintf("Ami '%s' not found in region '%s'", args.Ami, args.Region), nil) + case "InvalidAMIAttributeItemValue": + result.JobError = clienterrors.WorkerClientError(clienterrors.ErrorSharingTarget, fmt.Sprintf("Invalid user id to share ami with: %v", args.ShareWithAccounts), nil) + } + } + return err + } + + result.Ami = args.Ami + result.Region = args.Region + return nil +} diff --git a/cmd/osbuild-worker/main.go b/cmd/osbuild-worker/main.go index d5e6c94c5..7c5478682 100644 --- a/cmd/osbuild-worker/main.go +++ b/cmd/osbuild-worker/main.go @@ -464,6 +464,12 @@ func main() { worker.JobTypeContainerResolve: &ContainerResolveJobImpl{ AuthFilePath: containersAuthFilePath, }, + worker.JobTypeAWSEC2Copy: &AWSEC2CopyJobImpl{ + AWSCreds: awsCredentials, + }, + worker.JobTypeAWSEC2Share: &AWSEC2ShareJobImpl{ + AWSCreds: awsCredentials, + }, } acceptedJobTypes := []string{} diff --git a/internal/cloud/awscloud/awscloud.go b/internal/cloud/awscloud/awscloud.go index 62e8d1704..fd896e3c6 100644 --- a/internal/cloud/awscloud/awscloud.go +++ b/internal/cloud/awscloud/awscloud.go @@ -266,29 +266,6 @@ func (a *AWS) Register(name, bucket, key string, shareWith []string, rpmArch str snapshotID := importOutput.ImportSnapshotTasks[0].SnapshotTaskDetail.SnapshotId - if len(shareWith) > 0 { - logrus.Info("[AWS] 🎥 Sharing ec2 snapshot") - var userIds []*string - for _, v := range shareWith { - // Implicit memory alasing doesn't couse any bug in this case - /* #nosec G601 */ - userIds = append(userIds, &v) - } - _, err := a.ec2.ModifySnapshotAttribute( - &ec2.ModifySnapshotAttributeInput{ - Attribute: aws.String("createVolumePermission"), - OperationType: aws.String("add"), - SnapshotId: snapshotID, - UserIds: userIds, - }, - ) - if err != nil { - logrus.Warnf("[AWS] 📨 Error sharing ec2 snapshot: %v", err) - return nil, err - } - logrus.Info("[AWS] 📨 Shared ec2 snapshot") - } - // Tag the snapshot with the image name. req, _ := a.ec2.CreateTagsRequest( &ec2.CreateTagsInput{ @@ -348,33 +325,192 @@ func (a *AWS) Register(name, bucket, key string, shareWith []string, rpmArch str } if len(shareWith) > 0 { - logrus.Info("[AWS] 💿 Sharing ec2 AMI") - var launchPerms []*ec2.LaunchPermission - for _, id := range shareWith { - launchPerms = append(launchPerms, &ec2.LaunchPermission{ - // Implicit memory alasing doesn't couse any bug in this case - /* #nosec G601 */ - UserId: &id, - }) - } - _, err := a.ec2.ModifyImageAttribute( - &ec2.ModifyImageAttributeInput{ - ImageId: registerOutput.ImageId, - LaunchPermission: &ec2.LaunchPermissionModifications{ - Add: launchPerms, - }, - }, - ) + err = a.shareSnapshot(snapshotID, shareWith) + if err != nil { + return nil, err + } + err = a.shareImage(registerOutput.ImageId, shareWith) if err != nil { - logrus.Warnf("[AWS] 📨 Error sharing AMI: %v", err) return nil, err } - logrus.Info("[AWS] 💿 Shared AMI") } return registerOutput.ImageId, nil } +// target region is determined by the region configured in the aws session +func (a *AWS) CopyImage(name, ami, sourceRegion string) (string, error) { + result, err := a.ec2.CopyImage( + &ec2.CopyImageInput{ + Name: aws.String(name), + SourceImageId: aws.String(ami), + SourceRegion: aws.String(sourceRegion), + }, + ) + if err != nil { + return "", err + } + + dIInput := &ec2.DescribeImagesInput{ + ImageIds: []*string{result.ImageId}, + } + + // Custom waiter which waits indefinitely until a final state + w := request.Waiter{ + Name: "WaitUntilImageAvailable", + MaxAttempts: 0, + Delay: request.ConstantWaiterDelay(15 * time.Second), + Acceptors: []request.WaiterAcceptor{ + { + State: request.SuccessWaiterState, + Matcher: request.PathAllWaiterMatch, Argument: "Images[].State", + Expected: "available", + }, + { + State: request.FailureWaiterState, + Matcher: request.PathAnyWaiterMatch, Argument: "Images[].State", + Expected: "failed", + }, + }, + Logger: a.ec2.Config.Logger, + NewRequest: func(opts []request.Option) (*request.Request, error) { + var inCpy *ec2.DescribeImagesInput + if dIInput != nil { + tmp := *dIInput + inCpy = &tmp + } + req, _ := a.ec2.DescribeImagesRequest(inCpy) + req.SetContext(aws.BackgroundContext()) + req.ApplyOptions(opts...) + return req, nil + }, + } + err = w.WaitWithContext(aws.BackgroundContext()) + if err != nil { + return *result.ImageId, err + } + + // Tag image with name + _, err = a.ec2.CreateTags(&ec2.CreateTagsInput{ + Resources: []*string{result.ImageId}, + Tags: []*ec2.Tag{ + { + Key: aws.String("Name"), + Value: aws.String(name), + }, + }, + }) + + if err != nil { + return *result.ImageId, err + } + + imgs, err := a.ec2.DescribeImages(dIInput) + if err != nil { + return *result.ImageId, err + } + if len(imgs.Images) == 0 { + return *result.ImageId, fmt.Errorf("Unable to find image with id: %v", ami) + } + + // Tag snapshot with name + for _, bdm := range imgs.Images[0].BlockDeviceMappings { + _, err = a.ec2.CreateTags(&ec2.CreateTagsInput{ + Resources: []*string{bdm.Ebs.SnapshotId}, + Tags: []*ec2.Tag{ + { + Key: aws.String("Name"), + Value: aws.String(name), + }, + }, + }) + if err != nil { + return *result.ImageId, err + } + } + + return *result.ImageId, nil +} + +func (a *AWS) ShareImage(ami string, userIds []string) error { + imgs, err := a.ec2.DescribeImages( + &ec2.DescribeImagesInput{ + ImageIds: []*string{aws.String(ami)}, + }, + ) + if err != nil { + return err + } + if len(imgs.Images) == 0 { + return fmt.Errorf("Unable to find image with id: %v", ami) + } + + for _, bdm := range imgs.Images[0].BlockDeviceMappings { + err = a.shareSnapshot(bdm.Ebs.SnapshotId, userIds) + if err != nil { + return err + } + } + + err = a.shareImage(aws.String(ami), userIds) + if err != nil { + return err + } + return nil +} + +func (a *AWS) shareImage(ami *string, userIds []string) error { + logrus.Info("[AWS] 🎥 Sharing ec2 snapshot") + var uIds []*string + for i := range userIds { + uIds = append(uIds, &userIds[i]) + } + + logrus.Info("[AWS] 💿 Sharing ec2 AMI") + var launchPerms []*ec2.LaunchPermission + for _, id := range uIds { + launchPerms = append(launchPerms, &ec2.LaunchPermission{ + UserId: id, + }) + } + _, err := a.ec2.ModifyImageAttribute( + &ec2.ModifyImageAttributeInput{ + ImageId: ami, + LaunchPermission: &ec2.LaunchPermissionModifications{ + Add: launchPerms, + }, + }, + ) + if err != nil { + logrus.Warnf("[AWS] 📨 Error sharing AMI: %v", err) + return err + } + logrus.Info("[AWS] 💿 Shared AMI") + return nil +} + +func (a *AWS) shareSnapshot(snapshotId *string, userIds []string) error { + logrus.Info("[AWS] 🎥 Sharing ec2 snapshot") + var uIds []*string + for i := range userIds { + uIds = append(uIds, &userIds[i]) + } + _, err := a.ec2.ModifySnapshotAttribute( + &ec2.ModifySnapshotAttributeInput{ + Attribute: aws.String(ec2.SnapshotAttributeNameCreateVolumePermission), + OperationType: aws.String("add"), + SnapshotId: snapshotId, + UserIds: uIds, + }, + ) + if err != nil { + logrus.Warnf("[AWS] 📨 Error sharing ec2 snapshot: %v", err) + return err + } + logrus.Info("[AWS] 📨 Shared ec2 snapshot") + return nil +} + func (a *AWS) RemoveSnapshotAndDeregisterImage(image *ec2.Image) error { if image == nil { return fmt.Errorf("image is nil") diff --git a/internal/worker/json.go b/internal/worker/json.go index adb094aac..7adc79447 100644 --- a/internal/worker/json.go +++ b/internal/worker/json.go @@ -256,6 +256,33 @@ type ContainerResolveJobResult struct { JobResult } +type AWSEC2ShareJob struct { + Ami string `json:"ami"` + Region string `json:"region"` + ShareWithAccounts []string `json:"shareWithAccounts"` +} + +type AWSEC2ShareJobResult struct { + JobResult + + Ami string `json:"ami"` + Region string `json:"region"` +} + +type AWSEC2CopyJob struct { + Ami string `json:"ami"` + SourceRegion string `json:"source_region"` + TargetRegion string `json:"target_region"` + TargetName string `json:"target_name"` +} + +type AWSEC2CopyJobResult struct { + JobResult + + Ami string `json:"ami"` + Region string `json:"region"` +} + // // JSON-serializable types for the client // diff --git a/internal/worker/server.go b/internal/worker/server.go index 1289a0681..2ac477511 100644 --- a/internal/worker/server.go +++ b/internal/worker/server.go @@ -35,6 +35,8 @@ const ( JobTypeDepsolve string = "depsolve" JobTypeManifestIDOnly string = "manifest-id-only" JobTypeContainerResolve string = "container-resolve" + JobTypeAWSEC2Copy string = "aws-ec2-copy" + JobTypeAWSEC2Share string = "aws-ec2-share" ) type Server struct { @@ -157,6 +159,14 @@ func (s *Server) EnqueueContainerResolveJob(job *ContainerResolveJob, channel st return s.enqueue(JobTypeContainerResolve, job, nil, channel) } +func (s *Server) EnqueueAWSEC2CopyJob(job *AWSEC2CopyJob, parent uuid.UUID, channel string) (uuid.UUID, error) { + return s.enqueue(JobTypeAWSEC2Copy, job, []uuid.UUID{parent}, channel) +} + +func (s *Server) EnqueueAWSEC2ShareJob(job *AWSEC2ShareJob, parent uuid.UUID, channel string) (uuid.UUID, error) { + return s.enqueue(JobTypeAWSEC2Share, job, []uuid.UUID{parent}, channel) +} + func (s *Server) enqueue(jobType string, job interface{}, dependencies []uuid.UUID, channel string) (uuid.UUID, error) { prometheus.EnqueueJobMetrics(strings.Split(jobType, ":")[0], channel) return s.jobs.Enqueue(jobType, job, dependencies, channel) @@ -359,6 +369,32 @@ func (s *Server) ContainerResolveJobInfo(id uuid.UUID, result *ContainerResolveJ return jobInfo, nil } +func (s *Server) AWSEC2CopyJobInfo(id uuid.UUID, result *AWSEC2CopyJobResult) (*JobInfo, error) { + jobInfo, err := s.jobInfo(id, result) + if err != nil { + return nil, err + } + + if jobInfo.JobType != JobTypeAWSEC2Copy { + return nil, fmt.Errorf("expected %q, found %q job instead", JobTypeAWSEC2Copy, jobInfo.JobType) + } + + return jobInfo, nil +} + +func (s *Server) AWSEC2ShareJobInfo(id uuid.UUID, result *AWSEC2ShareJobResult) (*JobInfo, error) { + jobInfo, err := s.jobInfo(id, result) + if err != nil { + return nil, err + } + + if jobInfo.JobType != JobTypeAWSEC2Share { + return nil, fmt.Errorf("expected %q, found %q job instead", JobTypeAWSEC2Share, jobInfo.JobType) + } + + return jobInfo, nil +} + func (s *Server) jobInfo(id uuid.UUID, result interface{}) (*JobInfo, error) { jobType, channel, rawResult, queued, started, finished, canceled, deps, err := s.jobs.JobStatus(id) if err != nil { @@ -624,6 +660,20 @@ func (s *Server) FinishJob(token uuid.UUID, result json.RawMessage) error { return err } jobResult = &kojiFinalizeJR.JobResult + case JobTypeAWSEC2Copy: + var awsEC2CopyJR AWSEC2CopyJobResult + jobInfo, err = s.AWSEC2CopyJobInfo(jobId, &awsEC2CopyJR) + if err != nil { + return err + } + jobResult = &awsEC2CopyJR.JobResult + case JobTypeAWSEC2Share: + var awsEC2ShareJR AWSEC2ShareJobResult + jobInfo, err = s.AWSEC2ShareJobInfo(jobId, &awsEC2ShareJR) + if err != nil { + return err + } + jobResult = &awsEC2ShareJR.JobResult case JobTypeContainerResolve: var containerResolveJR ContainerResolveJobResult