From 4df04643ab8ecf059860478531209331d91f598b Mon Sep 17 00:00:00 2001 From: Sanne Raymaekers Date: Mon, 3 Jun 2024 18:57:18 +0200 Subject: [PATCH] internal/osbuildexecutor/aws-ec2: use osbuild-worker-executor Adds some unit tests as well. --- cmd/osbuild-worker/jobimpl-osbuild.go | 13 +- .../osbuildexecutor/runner-aws-ec2_test.go | 126 +++++++++ .../osbuildexecutor/runner-impl-aws-ec2.go | 239 +++++++++++++++--- 3 files changed, 335 insertions(+), 43 deletions(-) create mode 100644 internal/osbuildexecutor/runner-aws-ec2_test.go diff --git a/cmd/osbuild-worker/jobimpl-osbuild.go b/cmd/osbuild-worker/jobimpl-osbuild.go index 6f3930cbb..cb735de3f 100644 --- a/cmd/osbuild-worker/jobimpl-osbuild.go +++ b/cmd/osbuild-worker/jobimpl-osbuild.go @@ -502,7 +502,18 @@ func (impl *OSBuildJobImpl) Run(job worker.Job) error { case "host": executor = osbuildexecutor.NewHostExecutor() case "aws.ec2": - executor = osbuildexecutor.NewAWSEC2Executor(impl.OSBuildExecutor.IAMProfile, impl.OSBuildExecutor.KeyName, impl.OSBuildExecutor.CloudWatchGroup) + err = os.MkdirAll("/var/tmp/osbuild-composer", 0755) + if err != nil { + osbuildJobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorInvalidConfig, "Unable to create /var/tmp/osbuild-composer needed to aws.ec2 executor", nil) + return err + } + tmpDir, err := os.MkdirTemp("/var/tmp/osbuild-composer", "") + if err != nil { + osbuildJobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorInvalidConfig, "Unable to create /var/tmp/osbuild-composer needed to aws.ec2 executor", nil) + return err + } + defer os.RemoveAll(tmpDir) + executor = osbuildexecutor.NewAWSEC2Executor(impl.OSBuildExecutor.IAMProfile, impl.OSBuildExecutor.KeyName, impl.OSBuildExecutor.CloudWatchGroup, tmpDir) default: osbuildJobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorInvalidConfig, "No osbuild executor defined", nil) return err diff --git a/internal/osbuildexecutor/runner-aws-ec2_test.go b/internal/osbuildexecutor/runner-aws-ec2_test.go new file mode 100644 index 000000000..883c43d33 --- /dev/null +++ b/internal/osbuildexecutor/runner-aws-ec2_test.go @@ -0,0 +1,126 @@ +package osbuildexecutor + +import ( + "context" + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "os" + "os/exec" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/osbuild/images/pkg/osbuild" + "github.com/stretchr/testify/require" +) + +func TestWaitForSI(t *testing.T) { + server := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + })) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*1) + defer cancel() + require.False(t, waitForSI(ctx, server.URL)) + + server.Start() + ctx2, cancel2 := context.WithTimeout(context.Background(), time.Second*1) + defer cancel2() + require.True(t, waitForSI(ctx2, server.URL)) +} + +func TestWriteInputArchive(t *testing.T) { + cacheDir := t.TempDir() + storeDir := filepath.Join(cacheDir, "store") + require.NoError(t, os.Mkdir(storeDir, 0755)) + storeSubDir := filepath.Join(storeDir, "subdir") + require.NoError(t, os.Mkdir(storeSubDir, 0755)) + + require.NoError(t, os.WriteFile(filepath.Join(storeDir, "contents"), []byte("storedata"), 0600)) + require.NoError(t, os.WriteFile(filepath.Join(storeSubDir, "contents"), []byte("storedata"), 0600)) + + archive, err := writeInputArchive(cacheDir, storeDir, []string{"image"}, []byte("{\"version\": 2}")) + require.NoError(t, err) + + cmd := exec.Command("tar", + "-tf", + archive, + ) + out, err := cmd.CombinedOutput() + require.NoError(t, err) + require.ElementsMatch(t, []string{ + "control.json", + "manifest.json", + "store/", + "store/subdir/", + "store/subdir/contents", + "store/contents", + "", + }, strings.Split(string(out), "\n")) +} + +func TestHandleBuild(t *testing.T) { + buildServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusCreated) + osbuildResult := osbuild.Result{ + Success: true, + } + data, err := json.Marshal(osbuildResult) + require.NoError(t, err) + _, err = w.Write(data) + require.NoError(t, err) + })) + + cacheDir := t.TempDir() + inputArchive := filepath.Join(cacheDir, "test.tar") + require.NoError(t, os.WriteFile(inputArchive, []byte("test"), 0600)) + + osbuildResult, err := handleBuild(inputArchive, buildServer.URL) + require.NoError(t, err) + require.True(t, osbuildResult.Success) +} + +func TestHandleOutputArchive(t *testing.T) { + serverDir := t.TempDir() + serverOutputDir := filepath.Join(serverDir, "output") + require.NoError(t, os.Mkdir(serverOutputDir, 0755)) + serverImageDir := filepath.Join(serverOutputDir, "image") + require.NoError(t, os.Mkdir(serverImageDir, 0755)) + require.NoError(t, os.WriteFile(filepath.Join(serverImageDir, "disk.img"), []byte("image"), 0600)) + + serverOutput := filepath.Join(serverDir, "server-output.tar") + cmd := exec.Command("tar", + "-C", + serverDir, + "-cf", + serverOutput, + filepath.Base(serverOutputDir), + ) + require.NoError(t, cmd.Run()) + + resultServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + file, err := os.Open(serverOutput) + if err != nil { + require.NoError(t, err) + } + defer file.Close() + _, err = io.Copy(w, file) + require.NoError(t, err) + })) + + outputDir := t.TempDir() + archive, err := fetchOutputArchive(outputDir, resultServer.URL) + require.NoError(t, err) + + extractDir := filepath.Join(outputDir, "extracted") + require.NoError(t, os.Mkdir(extractDir, 0755)) + require.NoError(t, extractOutputArchive(extractDir, archive)) + + content, err := os.ReadFile(filepath.Join(extractDir, "image", "disk.img")) + require.NoError(t, err) + require.Equal(t, []byte("image"), content) +} diff --git a/internal/osbuildexecutor/runner-impl-aws-ec2.go b/internal/osbuildexecutor/runner-impl-aws-ec2.go index 6486b1b3f..74a34f336 100644 --- a/internal/osbuildexecutor/runner-impl-aws-ec2.go +++ b/internal/osbuildexecutor/runner-impl-aws-ec2.go @@ -1,10 +1,15 @@ package osbuildexecutor import ( - "bytes" + "context" "encoding/json" + "fmt" "io" + "net/http" + "os" "os/exec" + "path/filepath" + "time" "github.com/osbuild/images/pkg/osbuild" "github.com/sirupsen/logrus" @@ -16,6 +21,7 @@ type awsEC2Executor struct { iamProfile string keyName string cloudWatchGroup string + tmpDir string } func prepareSources(manifest []byte, store string, extraEnv []string, result bool, errorWriter io.Writer) error { @@ -24,6 +30,167 @@ func prepareSources(manifest []byte, store string, extraEnv []string, result boo return err } +// TODO extract this, also used in the osbuild-worker-executor unit +// tests. +func waitForSI(ctx context.Context, host string) bool { + client := http.Client{ + Timeout: time.Second * 1, + } + + for { + resp, err := client.Get(fmt.Sprintf("%s/api/v1/", host)) + if err != nil { + logrus.Debugf("Waiting for secure instance continues: %v", err) + } + if resp != nil { + defer resp.Body.Close() + if resp.StatusCode == 200 { + return true + } + body, err := io.ReadAll(resp.Body) + if err != nil { + logrus.Warningf("Unable to read body waiting for secure instance: %v", err) + } + logrus.Debugf("Waiting for secure instance continues: %s", body) + } + select { + case <-ctx.Done(): + logrus.Error("Timeout waiting for secure instance to spin up") + return false + default: + time.Sleep(time.Second) + continue + } + } +} + +func writeInputArchive(cacheDir, store string, exports []string, manifestData []byte) (string, error) { + archive := filepath.Join(cacheDir, "input.tar") + control := filepath.Join(cacheDir, "control.json") + manifest := filepath.Join(cacheDir, "manifest.json") + + controlData := struct { + Exports []string `json:"exports"` + }{ + Exports: exports, + } + controlDataBytes, err := json.Marshal(controlData) + if err != nil { + return "", err + } + err = os.WriteFile(control, controlDataBytes, 0600) + if err != nil { + return "", err + } + err = os.WriteFile(manifest, manifestData, 0600) + if err != nil { + return "", err + } + + cmd := exec.Command("tar", + "-C", + cacheDir, + "-cf", + archive, + filepath.Base(control), + filepath.Base(manifest), + ) + if output, err := cmd.CombinedOutput(); err != nil { + return "", fmt.Errorf("Unable to create input tar: %w, %s", err, output) + } + // Separate tar call, as we need to switch to the store directory. + /* #nosec G204 */ + cmd = exec.Command("tar", + "-C", + filepath.Dir(store), + "-rf", + archive, + filepath.Base(store), + ) + if output, err := cmd.CombinedOutput(); err != nil { + return "", fmt.Errorf("Unable to create input tar: %w, %s", err, output) + } + + return archive, nil +} + +func handleBuild(inputArchive, host string) (*osbuild.Result, error) { + client := http.Client{ + Timeout: time.Minute * 60, + } + inputFile, err := os.Open(inputArchive) + if err != nil { + return nil, err + } + defer inputFile.Close() + + resp, err := client.Post(fmt.Sprintf("%s/api/v1/build", host), "application/x-tar", inputFile) + if err != nil { + return nil, fmt.Errorf("Unable to request build from executor instance: %w", err) + } + defer resp.Body.Close() + if resp.StatusCode != 201 { + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("Unable to read body waiting for build to run: %w, http status: %d", err, resp.StatusCode) + } + return nil, fmt.Errorf("Something went wrong during executor build: http status: %v, %d, %s", err, resp.StatusCode, body) + } + + var osbuildResult osbuild.Result + + err = json.NewDecoder(resp.Body).Decode(&osbuildResult) + if err != nil { + return nil, fmt.Errorf("Unable to decode response body into osbuild result: %w", err) + } + + return &osbuildResult, nil +} + +func fetchOutputArchive(cacheDir, host string) (string, error) { + client := http.Client{ + Timeout: time.Minute * 30, + } + + resp, err := client.Get(fmt.Sprintf("%s/api/v1/result/output.tar", host)) + if err != nil { + return "", err + } + defer resp.Body.Close() + if resp.StatusCode != 200 { + body, err := io.ReadAll(resp.Body) + if err != nil { + return "", fmt.Errorf("cannot fetch output archive: %w, http status: %d", err, resp.StatusCode) + } + return "", fmt.Errorf("cannot fetch output archive: %w, http status: %d, body: %s", err, resp.StatusCode, body) + } + file, err := os.Create(filepath.Join(cacheDir, "output.tar")) + if err != nil { + return "", fmt.Errorf("Unable to write executor result tarball: %w", err) + } + defer file.Close() + _, err = io.Copy(file, resp.Body) + if err != nil { + return "", fmt.Errorf("Unable to write executor result tarball: %w", err) + } + return file.Name(), nil +} + +func extractOutputArchive(outputDirectory, outputTar string) error { + cmd := exec.Command("tar", + "--strip-components=1", + "-C", + outputDirectory, + "-Sxf", + outputTar, + ) + if output, err := cmd.CombinedOutput(); err != nil { + return fmt.Errorf("Unable to create input tar: %w, %s", err, output) + } + return nil + +} + func (ec2e *awsEC2Executor) RunOSBuild(manifest []byte, store, outputDirectory string, exports, exportPaths, checkpoints, extraEnv []string, result bool, errorWriter io.Writer) (*osbuild.Result, error) { @@ -53,61 +220,49 @@ func (ec2e *awsEC2Executor) RunOSBuild(manifest []byte, store, outputDirectory s } }() - logrus.Info("Spinning up jobsite manager") - args := []string{ - "--builder-host", - *si.Instance.PrivateIpAddress, - "--store", - store, + executorHost := fmt.Sprintf("http://%s:8001", *si.Instance.PrivateIpAddress) + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10) + defer cancel() + if !waitForSI(ctx, executorHost) { + return nil, fmt.Errorf("Timeout waiting for executor to come online") } - for _, exp := range exports { - args = append(args, "--export", exp) - } - for _, exp := range exportPaths { - args = append(args, "--export-file", exp) - } - args = append(args, "--output", outputDirectory) - - cmd := exec.Command( - "/usr/libexec/osbuild-composer/osbuild-jobsite-manager", - args..., - ) - - stdout := &bytes.Buffer{} - stderr := &bytes.Buffer{} - cmd.Stdout = stdout - cmd.Stderr = stderr - cmd.Stdin = bytes.NewReader(manifest) - - err = cmd.Start() + inputArchive, err := writeInputArchive(ec2e.tmpDir, store, exports, manifest) if err != nil { - logrus.Errorf("Starting osbuild-jobsite-manager failed: %v", err) - return nil, err - } - err = cmd.Wait() - if err != nil { - logrus.Errorf("Waiting for osbuild-jobsite-manager failed: %v", err) - if e, ok := err.(*exec.ExitError); ok { - logrus.Errorf("Exit code: %d", e.ExitCode()) - } - logrus.Errorf("StdErr :%s", stderr.String()) + logrus.Errorf("Unable to write input archive: %v", err) return nil, err } - var osbuildResult osbuild.Result - err = json.Unmarshal(stdout.Bytes(), &osbuildResult) + osbuildResult, err := handleBuild(inputArchive, executorHost) if err != nil { - logrus.Errorf("Unable to unmarshal stdout into osbuild result: %v", stdout.String()) + logrus.Errorf("Something went wrong handling the executor's build: %v", err) return nil, err } - return &osbuildResult, nil + if !osbuildResult.Success { + return osbuildResult, nil + } + + outputArchive, err := fetchOutputArchive(ec2e.tmpDir, executorHost) + if err != nil { + logrus.Errorf("Unable to fetch executor output: %v", err) + return nil, err + } + + err = extractOutputArchive(outputDirectory, outputArchive) + if err != nil { + logrus.Errorf("Unable to extract executor output: %v", err) + return nil, err + } + + return osbuildResult, nil } -func NewAWSEC2Executor(iamProfile, keyName, cloudWatchGroup string) Executor { +func NewAWSEC2Executor(iamProfile, keyName, cloudWatchGroup, tmpDir string) Executor { return &awsEC2Executor{ iamProfile, keyName, cloudWatchGroup, + tmpDir, } }