Even when CreateFleet reports errors, attempt to use the instance as long as one was returned. In some cases AWS returns `InsufficientInstanceCapacity` but still creates an instance, as these log lines show: ``` msg="Won't retry CreateFleet with OnDemand instance, retry: false, errors: InsufficientInstanceCapacity: There is no Spot capacity available that matches your request.; Already launched instance ([i-...]), aborting create fleet" msg="doCreateFleetRetry: returning retry: false, msg: [InsufficientInstanceCapacity: There is no Spot capacity available that matches your request. Already launched instance ([i-...]), aborting create fleet]" msg="doCreateFleetRetry: cancelling retry, instance already exists: [i-...]" msg="doCreateFleetRetry: setting retry to true" msg="Checking to retry fleet create on error InsufficientInstanceCapacity (msg: There is no Spot capacity available that matches your request.)" ```
703 lines
20 KiB
Go
703 lines
20 KiB
Go
package awscloud
|
|
|
|
import (
|
|
"context"
|
|
"encoding/base64"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"slices"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/aws/aws-sdk-go-v2/aws"
|
|
"github.com/aws/aws-sdk-go-v2/feature/ec2/imds"
|
|
"github.com/aws/aws-sdk-go-v2/service/ec2"
|
|
ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types"
|
|
smithy "github.com/aws/smithy-go"
|
|
"github.com/sirupsen/logrus"
|
|
)
|
|
|
|
type SecureInstance struct {
|
|
FleetID string
|
|
SGID string
|
|
LTID string
|
|
Instance *ec2types.Instance
|
|
InstanceID string
|
|
}
|
|
|
|
// SecureInstanceUserData returns the cloud-init user data for a secure instance.
//
// The result is a "#cloud-config" document whose write_files list always
// contains an empty /tmp/worker-run-executor-service file. When
// cloudWatchGroup and/or hostname is non-empty, a second file,
// /tmp/cloud_init_vars, is appended with one KEY='value' line per non-empty
// argument (OSBUILD_EXECUTOR_CLOUDWATCH_GROUP, OSBUILD_EXECUTOR_HOSTNAME).
//
// The values are inserted verbatim between single quotes; no escaping is
// performed, so callers must not pass values containing a single quote or a
// newline.
func SecureInstanceUserData(cloudWatchGroup, hostname string) string {
	var b strings.Builder

	// The nesting below is significant YAML: list items are indented by two
	// spaces, their keys by four, and the block-scalar content by six.
	b.WriteString("#cloud-config\n")
	b.WriteString("write_files:\n")
	b.WriteString("  - path: /tmp/worker-run-executor-service\n")
	b.WriteString("    content: ''\n")

	if cloudWatchGroup == "" && hostname == "" {
		return b.String()
	}

	b.WriteString("  - path: /tmp/cloud_init_vars\n")
	b.WriteString("    content: |\n")
	if cloudWatchGroup != "" {
		fmt.Fprintf(&b, "      OSBUILD_EXECUTOR_CLOUDWATCH_GROUP='%s'\n", cloudWatchGroup)
	}
	if hostname != "" {
		fmt.Fprintf(&b, "      OSBUILD_EXECUTOR_HOSTNAME='%s'\n", hostname)
	}
	return b.String()
}
|
|
|
|
// Runs an instance with a security group that only allows traffic to
|
|
// the host. Will replace resources if they already exists.
|
|
func (a *AWS) RunSecureInstance(iamProfile, keyName, cloudWatchGroup, hostname string) (*SecureInstance, error) {
|
|
identity, err := a.ec2imds.GetInstanceIdentityDocument(context.Background(), &imds.GetInstanceIdentityDocumentInput{})
|
|
if err != nil {
|
|
logrus.Errorf("Error getting the identity document, %s", err)
|
|
return nil, err
|
|
}
|
|
|
|
descrInstancesOutput, err := a.ec2.DescribeInstances(
|
|
context.Background(),
|
|
&ec2.DescribeInstancesInput{
|
|
InstanceIds: []string{
|
|
identity.InstanceID,
|
|
},
|
|
},
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if len(descrInstancesOutput.Reservations) != 1 || len(descrInstancesOutput.Reservations[0].Instances) != 1 {
|
|
return nil, fmt.Errorf("Expected exactly one reservation (got %d) with one instance (got %d)", len(descrInstancesOutput.Reservations), len(descrInstancesOutput.Reservations[0].Instances))
|
|
}
|
|
vpcID := *descrInstancesOutput.Reservations[0].Instances[0].VpcId
|
|
imageID := *descrInstancesOutput.Reservations[0].Instances[0].ImageId
|
|
subnetID := *descrInstancesOutput.Reservations[0].Instances[0].SubnetId
|
|
|
|
secureInstance := &SecureInstance{}
|
|
defer func() {
|
|
if secureInstance.Instance == nil {
|
|
logrus.Errorf("Unable to create secure instance, deleting resources")
|
|
if err := a.TerminateSecureInstance(secureInstance); err != nil {
|
|
logrus.Errorf("Deleting secure instance in defer unsuccessful: %v", err)
|
|
}
|
|
}
|
|
}()
|
|
|
|
previousSI, err := a.terminatePreviousSI(identity.InstanceID)
|
|
if err != nil {
|
|
logrus.Errorf("Unable to terminate previous secure instance %s (parent instance ID: %s): %v", previousSI, identity.InstanceID, err)
|
|
return nil, fmt.Errorf("Unable to terminate previous secure instance %s (parent instance ID: %s): %v", previousSI, identity.InstanceID, err)
|
|
}
|
|
if previousSI != "" {
|
|
logrus.Warningf("Previous instance (%s) terminated by parent instance (%s)", previousSI, identity.InstanceID)
|
|
}
|
|
|
|
sgID, err := a.createOrReplaceSG(identity.InstanceID, identity.PrivateIP, vpcID)
|
|
if sgID != "" {
|
|
secureInstance.SGID = sgID
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
ltID, err := a.createOrReplaceLT(identity.InstanceID, imageID, sgID, iamProfile, keyName, cloudWatchGroup, hostname)
|
|
if ltID != "" {
|
|
secureInstance.LTID = ltID
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
descrSubnetsOutput, err := a.ec2.DescribeSubnets(
|
|
context.Background(),
|
|
&ec2.DescribeSubnetsInput{
|
|
Filters: []ec2types.Filter{
|
|
ec2types.Filter{
|
|
Name: aws.String("vpc-id"),
|
|
Values: []string{
|
|
vpcID,
|
|
},
|
|
},
|
|
},
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if len(descrSubnetsOutput.Subnets) == 0 {
|
|
return nil, fmt.Errorf("Expected at least 1 subnet in the VPC, got 0")
|
|
}
|
|
|
|
createFleetOutput, err := a.createFleet(&ec2.CreateFleetInput{
|
|
LaunchTemplateConfigs: []ec2types.FleetLaunchTemplateConfigRequest{
|
|
ec2types.FleetLaunchTemplateConfigRequest{
|
|
LaunchTemplateSpecification: &ec2types.FleetLaunchTemplateSpecificationRequest{
|
|
LaunchTemplateId: aws.String(secureInstance.LTID),
|
|
Version: aws.String("1"),
|
|
},
|
|
Overrides: []ec2types.FleetLaunchTemplateOverridesRequest{
|
|
ec2types.FleetLaunchTemplateOverridesRequest{
|
|
SubnetId: aws.String(subnetID),
|
|
},
|
|
},
|
|
},
|
|
},
|
|
TagSpecifications: []ec2types.TagSpecification{
|
|
ec2types.TagSpecification{
|
|
ResourceType: ec2types.ResourceTypeInstance,
|
|
Tags: []ec2types.Tag{
|
|
ec2types.Tag{
|
|
Key: aws.String("parent"),
|
|
Value: aws.String(identity.InstanceID),
|
|
},
|
|
ec2types.Tag{
|
|
Key: aws.String("Name"),
|
|
Value: aws.String(fmt.Sprintf("Executor-for-%s", identity.InstanceID)),
|
|
},
|
|
},
|
|
},
|
|
},
|
|
TargetCapacitySpecification: &ec2types.TargetCapacitySpecificationRequest{
|
|
DefaultTargetCapacityType: ec2types.DefaultTargetCapacityTypeSpot,
|
|
TotalTargetCapacity: aws.Int32(1),
|
|
},
|
|
SpotOptions: &ec2types.SpotOptionsRequest{
|
|
AllocationStrategy: ec2types.SpotAllocationStrategyPriceCapacityOptimized,
|
|
},
|
|
Type: ec2types.FleetTypeInstant,
|
|
})
|
|
|
|
// retrieve any instance information even if there's an error, that way the instance
|
|
// will be terminated before other resources are removed.
|
|
if createFleetOutput != nil {
|
|
if createFleetOutput.FleetId != nil {
|
|
secureInstance.FleetID = *createFleetOutput.FleetId
|
|
}
|
|
if len(createFleetOutput.Instances) > 0 && len(createFleetOutput.Instances[0].InstanceIds) > 0 {
|
|
secureInstance.InstanceID = createFleetOutput.Instances[0].InstanceIds[0]
|
|
}
|
|
}
|
|
if err != nil {
|
|
if secureInstance.FleetID == "" || secureInstance.InstanceID == "" {
|
|
logrus.Infof("CreateFleet returned an error (%v), without either an instance (%s) or a fleet (%s)", err, secureInstance.InstanceID, secureInstance.FleetID)
|
|
return nil, err
|
|
}
|
|
logrus.Warnf("CreateFleet returned an error (%v) but also created an instance (%s) in fleet (%s), continuing as normal.", err, secureInstance.InstanceID, secureInstance.FleetID)
|
|
}
|
|
|
|
instWaiter := ec2.NewInstanceStatusOkWaiter(a.ec2)
|
|
err = instWaiter.Wait(
|
|
context.Background(),
|
|
&ec2.DescribeInstanceStatusInput{
|
|
InstanceIds: []string{
|
|
secureInstance.InstanceID,
|
|
},
|
|
},
|
|
time.Hour,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
descrInstOutput, err := a.ec2.DescribeInstances(
|
|
context.Background(),
|
|
&ec2.DescribeInstancesInput{
|
|
InstanceIds: []string{
|
|
secureInstance.InstanceID,
|
|
},
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if len(descrInstOutput.Reservations) != 1 {
|
|
return nil, fmt.Errorf("Expected exactly 1 reservation for instance: %s, got %d", secureInstance.InstanceID, len(descrInstOutput.Reservations))
|
|
}
|
|
if len(descrInstOutput.Reservations[0].Instances) != 1 {
|
|
return nil, fmt.Errorf("Expected exactly 1 instance for instance: %s, got %d", secureInstance.InstanceID, len(descrInstOutput.Reservations[0].Instances))
|
|
}
|
|
secureInstance.Instance = &descrInstOutput.Reservations[0].Instances[0]
|
|
|
|
logrus.Infof(
|
|
"Secure instance created: https://%s.console.aws.amazon.com/ec2/v2/home?region=%s#InstanceDetails:instanceId=%s",
|
|
identity.Region,
|
|
identity.Region,
|
|
secureInstance.InstanceID)
|
|
|
|
return secureInstance, nil
|
|
}
|
|
|
|
func (a *AWS) TerminateSecureInstance(si *SecureInstance) error {
|
|
if err := a.deleteFleetIfExists(si); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := a.deleteSGIfExists(si); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := a.deleteLTIfExists(si); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (a *AWS) terminatePreviousSI(hostInstanceID string) (string, error) {
|
|
var retryCount int
|
|
const maxRetries = 10
|
|
|
|
descrInstancesOutput, err := a.ec2.DescribeInstances(
|
|
context.Background(),
|
|
&ec2.DescribeInstancesInput{
|
|
Filters: []ec2types.Filter{
|
|
ec2types.Filter{
|
|
Name: aws.String("tag:parent"),
|
|
Values: []string{hostInstanceID},
|
|
},
|
|
},
|
|
})
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
if len(descrInstancesOutput.Reservations) == 0 || len(descrInstancesOutput.Reservations[0].Instances) == 0 {
|
|
return "", nil
|
|
}
|
|
|
|
if descrInstancesOutput.Reservations[0].Instances[0].State.Name == ec2types.InstanceStateNameTerminated {
|
|
return "", nil
|
|
}
|
|
|
|
instanceID := *descrInstancesOutput.Reservations[0].Instances[0].InstanceId
|
|
_, err = a.ec2.TerminateInstances(
|
|
context.Background(),
|
|
&ec2.TerminateInstancesInput{
|
|
InstanceIds: []string{instanceID},
|
|
},
|
|
)
|
|
if err != nil {
|
|
return instanceID, err
|
|
}
|
|
|
|
retryFunction := func(ctx context.Context, input *ec2.DescribeInstancesInput, output *ec2.DescribeInstancesOutput, err error) (bool, error) {
|
|
|
|
if err == nil {
|
|
var lastState ec2types.InstanceStateName = "unknown"
|
|
for _, reservation := range output.Reservations {
|
|
for _, instance := range reservation.Instances {
|
|
if instance.State.Name == ec2types.InstanceStateNameTerminated {
|
|
return false, nil
|
|
}
|
|
lastState = instance.State.Name
|
|
}
|
|
}
|
|
logrus.Warnf("instance not terminated, but %s", lastState)
|
|
} else {
|
|
logrus.Errorf("Error encountered while waiting for termination: %v", err)
|
|
}
|
|
|
|
retryCount++
|
|
if retryCount > maxRetries {
|
|
logrus.Errorf("maximum retries reached, while waiting for %v to terminate", instanceID)
|
|
return false, err
|
|
}
|
|
logrus.Warnf("retry waiting for termination %d/%d", retryCount, maxRetries)
|
|
return true, nil
|
|
|
|
}
|
|
|
|
TerminatedWaiterOptions := func(o *ec2.InstanceTerminatedWaiterOptions) {
|
|
o.Retryable = retryFunction
|
|
o.MinDelay = 60 * time.Second
|
|
}
|
|
|
|
instTermWaiter := ec2.NewInstanceTerminatedWaiter(a.ec2)
|
|
err = instTermWaiter.Wait(
|
|
context.Background(),
|
|
&ec2.DescribeInstancesInput{
|
|
InstanceIds: []string{instanceID},
|
|
},
|
|
time.Hour,
|
|
TerminatedWaiterOptions,
|
|
)
|
|
if err != nil {
|
|
return instanceID, err
|
|
}
|
|
return instanceID, nil
|
|
}
|
|
|
|
func isInvalidGroupNotFoundErr(err error) bool {
|
|
var apiErr smithy.APIError
|
|
if errors.As(err, &apiErr) {
|
|
if apiErr.ErrorCode() == "InvalidGroup.NotFound" {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (a *AWS) createOrReplaceSG(hostInstanceID, hostIP, vpcID string) (string, error) {
|
|
sgName := fmt.Sprintf("SG for %s (%s)", hostInstanceID, hostIP)
|
|
descrSGOutput, err := a.ec2.DescribeSecurityGroups(
|
|
context.Background(),
|
|
&ec2.DescribeSecurityGroupsInput{
|
|
Filters: []ec2types.Filter{
|
|
ec2types.Filter{
|
|
Name: aws.String("group-name"),
|
|
Values: []string{
|
|
sgName,
|
|
},
|
|
},
|
|
},
|
|
})
|
|
if err != nil && !isInvalidGroupNotFoundErr(err) {
|
|
return "", err
|
|
}
|
|
if descrSGOutput != nil {
|
|
for _, sg := range descrSGOutput.SecurityGroups {
|
|
_, err := a.ec2.DeleteSecurityGroup(
|
|
context.Background(),
|
|
&ec2.DeleteSecurityGroupInput{
|
|
GroupId: sg.GroupId,
|
|
},
|
|
)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
}
|
|
}
|
|
|
|
cSGOutput, err := a.ec2.CreateSecurityGroup(
|
|
context.Background(),
|
|
&ec2.CreateSecurityGroupInput{
|
|
Description: aws.String(sgName),
|
|
GroupName: aws.String(sgName),
|
|
VpcId: aws.String(vpcID),
|
|
},
|
|
)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
sgID := *cSGOutput.GroupId
|
|
|
|
sgIngressOutput, err := a.ec2.AuthorizeSecurityGroupIngress(
|
|
context.Background(),
|
|
&ec2.AuthorizeSecurityGroupIngressInput{
|
|
GroupId: aws.String(sgID),
|
|
IpPermissions: []ec2types.IpPermission{
|
|
ec2types.IpPermission{
|
|
IpProtocol: aws.String(string(ec2types.ProtocolTcp)),
|
|
FromPort: aws.Int32(1),
|
|
ToPort: aws.Int32(65535),
|
|
IpRanges: []ec2types.IpRange{
|
|
ec2types.IpRange{
|
|
CidrIp: aws.String(fmt.Sprintf("%s/32", hostIP)),
|
|
},
|
|
},
|
|
},
|
|
},
|
|
})
|
|
if err != nil {
|
|
return sgID, err
|
|
}
|
|
if !*sgIngressOutput.Return {
|
|
return sgID, fmt.Errorf("Unable to attach ingress rules to SG")
|
|
}
|
|
|
|
describeSGOutput, err := a.ec2.DescribeSecurityGroups(
|
|
context.Background(),
|
|
&ec2.DescribeSecurityGroupsInput{
|
|
GroupIds: []string{
|
|
sgID,
|
|
},
|
|
},
|
|
)
|
|
if err != nil {
|
|
return sgID, err
|
|
}
|
|
|
|
// SGs are created with a predefind egress rule that allows all outgoing traffic, so expecting 1 outbound rule
|
|
if len(describeSGOutput.SecurityGroups[0].IpPermissions) != 1 || len(describeSGOutput.SecurityGroups[0].IpPermissionsEgress) != 1 {
|
|
return sgID, fmt.Errorf("Expected 2 security group rules: 1 inbound (got %d) and 1 outbound (got %d)",
|
|
len(describeSGOutput.SecurityGroups[0].IpPermissions), len(describeSGOutput.SecurityGroups[0].IpPermissionsEgress))
|
|
}
|
|
|
|
return sgID, nil
|
|
}
|
|
|
|
func isLaunchTemplateNotFoundError(err error) bool {
|
|
var apiErr smithy.APIError
|
|
if errors.As(err, &apiErr) {
|
|
if apiErr.ErrorCode() == "InvalidLaunchTemplateId.NotFound" || apiErr.ErrorCode() == "InvalidLaunchTemplateName.NotFoundException" {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (a *AWS) createOrReplaceLT(hostInstanceID, imageID, sgID, iamProfile, keyName, cloudWatchGroup, hostname string) (string, error) {
|
|
ltName := fmt.Sprintf("launch-template-for-%s-runner-instance", hostInstanceID)
|
|
descrLTOutput, err := a.ec2.DescribeLaunchTemplates(
|
|
context.Background(),
|
|
&ec2.DescribeLaunchTemplatesInput{
|
|
LaunchTemplateNames: []string{
|
|
ltName,
|
|
},
|
|
},
|
|
)
|
|
|
|
if err != nil && !isLaunchTemplateNotFoundError(err) {
|
|
return "", err
|
|
}
|
|
|
|
if descrLTOutput != nil && len(descrLTOutput.LaunchTemplates) == 1 {
|
|
_, err := a.ec2.DeleteLaunchTemplate(
|
|
context.Background(),
|
|
&ec2.DeleteLaunchTemplateInput{
|
|
LaunchTemplateId: descrLTOutput.LaunchTemplates[0].LaunchTemplateId,
|
|
},
|
|
)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
}
|
|
|
|
input := &ec2.CreateLaunchTemplateInput{
|
|
LaunchTemplateData: &ec2types.RequestLaunchTemplateData{
|
|
ImageId: aws.String(imageID),
|
|
InstanceInitiatedShutdownBehavior: ec2types.ShutdownBehaviorTerminate,
|
|
InstanceRequirements: &ec2types.InstanceRequirementsRequest{
|
|
AcceleratorCount: &ec2types.AcceleratorCountRequest{
|
|
Max: aws.Int32(0),
|
|
},
|
|
BareMetal: ec2types.BareMetalExcluded,
|
|
// el10 needs x86_64-v3
|
|
ExcludedInstanceTypes: []string{
|
|
"m1.*",
|
|
"c1.*",
|
|
"t1.*",
|
|
"m2.*",
|
|
"i2.*",
|
|
"m3.*",
|
|
"c3.*",
|
|
"r3.*",
|
|
},
|
|
MemoryMiB: &ec2types.MemoryMiBRequest{
|
|
Min: aws.Int32(4096),
|
|
},
|
|
NetworkInterfaceCount: &ec2types.NetworkInterfaceCountRequest{
|
|
Min: aws.Int32(1),
|
|
},
|
|
VCpuCount: &ec2types.VCpuCountRangeRequest{
|
|
Min: aws.Int32(2),
|
|
},
|
|
},
|
|
BlockDeviceMappings: []ec2types.LaunchTemplateBlockDeviceMappingRequest{
|
|
ec2types.LaunchTemplateBlockDeviceMappingRequest{
|
|
DeviceName: aws.String("/dev/sda1"),
|
|
Ebs: &ec2types.LaunchTemplateEbsBlockDeviceRequest{
|
|
DeleteOnTermination: aws.Bool(true),
|
|
Encrypted: aws.Bool(true),
|
|
VolumeSize: aws.Int32(50),
|
|
VolumeType: ec2types.VolumeTypeGp3,
|
|
},
|
|
},
|
|
},
|
|
SecurityGroupIds: []string{
|
|
sgID,
|
|
},
|
|
UserData: aws.String(base64.StdEncoding.EncodeToString([]byte(SecureInstanceUserData(cloudWatchGroup, hostname)))),
|
|
},
|
|
TagSpecifications: []ec2types.TagSpecification{
|
|
ec2types.TagSpecification{
|
|
ResourceType: ec2types.ResourceTypeLaunchTemplate,
|
|
Tags: []ec2types.Tag{
|
|
ec2types.Tag{
|
|
Key: aws.String("parent"),
|
|
Value: aws.String(hostInstanceID),
|
|
},
|
|
},
|
|
},
|
|
},
|
|
LaunchTemplateName: aws.String(ltName),
|
|
}
|
|
|
|
if iamProfile != "" {
|
|
input.LaunchTemplateData.IamInstanceProfile = &ec2types.LaunchTemplateIamInstanceProfileSpecificationRequest{
|
|
Name: aws.String(iamProfile),
|
|
}
|
|
}
|
|
|
|
if keyName != "" {
|
|
input.LaunchTemplateData.KeyName = aws.String(keyName)
|
|
}
|
|
|
|
createLaunchTemplateOutput, err := a.ec2.CreateLaunchTemplate(context.Background(), input)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
return *createLaunchTemplateOutput.LaunchTemplate.LaunchTemplateId, nil
|
|
}
|
|
|
|
func (a *AWS) deleteFleetIfExists(si *SecureInstance) error {
|
|
if si.FleetID == "" {
|
|
return nil
|
|
}
|
|
|
|
delFlOutput, err := a.ec2.DeleteFleets(
|
|
context.Background(),
|
|
&ec2.DeleteFleetsInput{
|
|
FleetIds: []string{
|
|
si.FleetID,
|
|
},
|
|
TerminateInstances: aws.Bool(true),
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if len(delFlOutput.UnsuccessfulFleetDeletions) != 0 || len(delFlOutput.SuccessfulFleetDeletions) != 1 {
|
|
return fmt.Errorf("Deleting fleet unsuccessful")
|
|
}
|
|
|
|
if si.InstanceID != "" {
|
|
instTermWaiter := ec2.NewInstanceTerminatedWaiter(a.ec2)
|
|
err = instTermWaiter.Wait(
|
|
context.Background(),
|
|
&ec2.DescribeInstancesInput{
|
|
InstanceIds: []string{si.InstanceID},
|
|
},
|
|
time.Hour,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
si.FleetID = ""
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (a *AWS) deleteLTIfExists(si *SecureInstance) error {
|
|
if si.LTID == "" {
|
|
return nil
|
|
}
|
|
|
|
_, err := a.ec2.DeleteLaunchTemplate(
|
|
context.Background(),
|
|
&ec2.DeleteLaunchTemplateInput{
|
|
LaunchTemplateId: aws.String(si.LTID),
|
|
},
|
|
)
|
|
if err == nil {
|
|
si.LTID = ""
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (a *AWS) deleteSGIfExists(si *SecureInstance) error {
|
|
if si.SGID == "" {
|
|
return nil
|
|
}
|
|
|
|
_, err := a.ec2.DeleteSecurityGroup(
|
|
context.Background(),
|
|
&ec2.DeleteSecurityGroupInput{
|
|
GroupId: aws.String(si.SGID),
|
|
},
|
|
)
|
|
if err == nil {
|
|
si.SGID = ""
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (a *AWS) createFleet(input *ec2.CreateFleetInput) (*ec2.CreateFleetOutput, error) {
|
|
logCreateFleetInput(input)
|
|
createFleetOutput, err := a.ec2.CreateFleet(context.Background(), input)
|
|
if err != nil {
|
|
return createFleetOutput, fmt.Errorf("Unable to create spot fleet: %w", err)
|
|
}
|
|
|
|
retry, fleetErrs := doCreateFleetRetry(createFleetOutput)
|
|
if len(fleetErrs) > 0 && retry {
|
|
logrus.Warnf("Received errors (%s) from CreateFleet, retrying CreateFleet with OnDemand instance", strings.Join(fleetErrs, "; "))
|
|
input.SpotOptions = nil
|
|
input.TargetCapacitySpecification.DefaultTargetCapacityType = ec2types.DefaultTargetCapacityTypeOnDemand
|
|
logCreateFleetInput(input)
|
|
createFleetOutput, err = a.ec2.CreateFleet(context.Background(), input)
|
|
if err != nil {
|
|
return createFleetOutput, fmt.Errorf("Unable to create on demand fleet: %w", err)
|
|
}
|
|
} else {
|
|
logrus.Infof("Won't retry CreateFleet with OnDemand instance, retry: %v, errors: %s", retry, strings.Join(fleetErrs, "; "))
|
|
}
|
|
|
|
retry, fleetErrs = doCreateFleetRetry(createFleetOutput)
|
|
if len(fleetErrs) > 0 && retry {
|
|
logrus.Warnf("Received errors (%s) from CreateFleet with OnDemand instance option, retrying across availability zones", strings.Join(fleetErrs, "; "))
|
|
input.LaunchTemplateConfigs[0].Overrides = nil
|
|
logCreateFleetInput(input)
|
|
createFleetOutput, err = a.ec2.CreateFleet(context.Background(), input)
|
|
if err != nil {
|
|
return createFleetOutput, fmt.Errorf("Unable to create on demand fleet across AZs: %w", err)
|
|
}
|
|
} else {
|
|
logrus.Infof("Won't retry CreateFleet across AZs, retry: %v, errors: %s", retry, strings.Join(fleetErrs, "; "))
|
|
}
|
|
|
|
if len(createFleetOutput.Errors) > 0 {
|
|
fleetErrs := []string{}
|
|
for _, fleetErr := range createFleetOutput.Errors {
|
|
fleetErrs = append(fleetErrs, fmt.Sprintf("%s: %s", *fleetErr.ErrorCode, *fleetErr.ErrorMessage))
|
|
}
|
|
return createFleetOutput, fmt.Errorf("Unable to create fleet: %v", strings.Join(fleetErrs, "; "))
|
|
}
|
|
|
|
if len(createFleetOutput.Instances) != 1 {
|
|
return createFleetOutput, fmt.Errorf("Unable to create fleet with exactly one instance, got %d instances", len(createFleetOutput.Instances))
|
|
}
|
|
if len(createFleetOutput.Instances[0].InstanceIds) != 1 {
|
|
return createFleetOutput, fmt.Errorf("Expected exactly one instance ID on fleet, got %d", len(createFleetOutput.Instances[0].InstanceIds))
|
|
}
|
|
return createFleetOutput, nil
|
|
}
|
|
|
|
func doCreateFleetRetry(cfOutput *ec2.CreateFleetOutput) (bool, []string) {
|
|
if cfOutput == nil {
|
|
return false, nil
|
|
}
|
|
|
|
retryCodes := []string{
|
|
"UnfulfillableCapacity",
|
|
"InsufficientInstanceCapacity",
|
|
}
|
|
msg := []string{}
|
|
retry := false
|
|
for _, err := range cfOutput.Errors {
|
|
logrus.Infof("Checking to retry fleet create on error %s (msg: %s)", *err.ErrorCode, *err.ErrorMessage)
|
|
if slices.Contains(retryCodes, *err.ErrorCode) {
|
|
retry = true
|
|
logrus.Infof("doCreateFleetRetry: setting retry to true")
|
|
}
|
|
msg = append(msg, fmt.Sprintf("%s: %s", *err.ErrorCode, *err.ErrorMessage))
|
|
}
|
|
|
|
// Do not retry in case an instance already exists, in that case just fail and let the worker terminate the SI
|
|
if len(cfOutput.Instances) > 0 && len(cfOutput.Instances[0].InstanceIds) > 0 {
|
|
logrus.Infof("doCreateFleetRetry: cancelling retry, instance already exists: %s", cfOutput.Instances[0].InstanceIds)
|
|
retry = false
|
|
msg = append(msg, fmt.Sprintf("Already launched instance (%s), aborting create fleet", cfOutput.Instances[0].InstanceIds))
|
|
}
|
|
|
|
logrus.Infof("doCreateFleetRetry: returning retry: %v, msg: %v", retry, msg)
|
|
return retry, msg
|
|
}
|
|
|
|
func logCreateFleetInput(input *ec2.CreateFleetInput) {
|
|
if inputJSON, err := json.Marshal(input); err != nil {
|
|
logrus.Warnf("Unable to marshal input for logging: %v", input)
|
|
} else {
|
|
logrus.Infof("Creating fleet with input: %s", inputJSON)
|
|
}
|
|
}
|