From 0bd66cf8e866667e88b31d4b2e3037b35a209487 Mon Sep 17 00:00:00 2001 From: Simon de Vlieger Date: Thu, 25 Jan 2024 15:00:37 +0100 Subject: [PATCH] jobsite: `manager` and `builder` WIP --- cmd/osbuild-jobsite-builder/main.go | 415 ++++++++++++++++++++++++++++ cmd/osbuild-jobsite-manager/main.go | 387 ++++++++++++++++++++++++++ 2 files changed, 802 insertions(+) create mode 100644 cmd/osbuild-jobsite-builder/main.go create mode 100644 cmd/osbuild-jobsite-manager/main.go diff --git a/cmd/osbuild-jobsite-builder/main.go b/cmd/osbuild-jobsite-builder/main.go new file mode 100644 index 000000000..a94784b16 --- /dev/null +++ b/cmd/osbuild-jobsite-builder/main.go @@ -0,0 +1,415 @@ +package main + +import ( + "bufio" + "encoding/json" + "flag" + "fmt" + "io" + "net/http" + "os" + "os/exec" + "path" + "sync" + "time" + + "github.com/sirupsen/logrus" +) + +const ( + ExitOk int = iota +) + +type State int + +const ( + StateClaim State = iota + StateProvision + StatePopulate + StateBuild + StateProgress + StateExport + StateDone + + StateError + StateSignal + StateTimeout +) + +var ( + argJSON bool + + argBuilderHost string + argBuilderPort int + + argTimeoutClaim int + argTimeoutProvision int + argTimeoutPopulate int + argTimeoutBuild int + argTimeoutExport int + + argBuildPath string +) + +type BuildRequest struct { + Pipelines []string `json:"pipelines"` + Environments []string `json:"environments"` +} + +func init() { + flag.BoolVar(&argJSON, "json", false, "Enable JSON output") + + flag.StringVar(&argBuilderHost, "builder-host", "localhost", "Hostname or IP where this program will listen on.") + flag.IntVar(&argBuilderPort, "builder-port", 3333, "Port this program will listen on.") + + flag.IntVar(&argTimeoutClaim, "timeout-claim", 600, "Timeout before the claim phase needs to be completed in seconds.") + flag.IntVar(&argTimeoutProvision, "timeout-provision", 30, "Timeout before the provision phase needs to be completed in seconds.") + flag.IntVar(&argTimeoutPopulate, "timeout-populate", 30, "Timeout before the populate phase needs to be completed in seconds.") + flag.IntVar(&argTimeoutBuild, "timeout-build", 3600, "Timeout before the build phase needs to be completed in seconds.") + flag.IntVar(&argTimeoutExport, "timeout-export", 1800, "Timeout before the export phase needs to be completed in seconds.") + + flag.StringVar(&argBuildPath, "build-path", "/run/osbuild", "Path to use as a build directory.") + + flag.Parse() + + logrus.SetLevel(logrus.InfoLevel) + + if argJSON { + logrus.SetFormatter(&logrus.JSONFormatter{}) + } +} + +type Agent struct { + Host string + Port int + State State + StateLock sync.Mutex + StateChannel chan State +} + +type BackgroundProcess struct { + Process *exec.Cmd + Stdout io.ReadCloser + Stderr io.ReadCloser + Done bool + Error error +} + +var ( + Build *BackgroundProcess +) + +func (agent *Agent) SetState(state State) { + agent.StateLock.Lock() + defer agent.StateLock.Unlock() + + if state <= agent.State { + agent.State = StateError + } else { + agent.State = state + } + + agent.StateChannel <- agent.State +} + +func (agent *Agent) GetState() State { + agent.StateLock.Lock() + defer agent.StateLock.Unlock() + + return agent.State +} + +func (agent *Agent) GuardState(stateWanted State) { + if stateCurrent := agent.GetState(); stateWanted != stateCurrent { + logrus.Fatalf("Agent.GuardState: Requested guard for %d but we're in %d. Exit.", stateWanted, stateCurrent) + } +} + +func (agent *Agent) HandleClaim(w http.ResponseWriter, r *http.Request) { + agent.GuardState(StateClaim) + + if r.Method != "POST" { + logrus.WithFields( + logrus.Fields{"method": r.Method}, + ).Fatal("Agent.HandleClaim: unexpected request method") + } + + fmt.Fprintf(w, "%s", "done") + + logrus.Info("Agent.HandleClaim: Done.") + + agent.SetState(StateProvision) +} + +func (agent *Agent) HandleProvision(w http.ResponseWriter, r *http.Request) { + agent.GuardState(StateProvision) + + if r.Method != "PUT" { + logrus.WithFields( + logrus.Fields{"method": r.Method}, + ).Fatal("Agent.HandleProvision: Unexpected request method.") + } + + logrus.WithFields(logrus.Fields{"argBuildPath": argBuildPath}).Debug("Agent.HandleProvision: Opening manifest.json.") + + dst, err := os.OpenFile( + path.Join(argBuildPath, "manifest.json"), + os.O_WRONLY|os.O_CREATE|os.O_EXCL, + 0400, + ) + defer dst.Close() + + if err != nil { + logrus.Fatal("Agent.HandleProvision: Failed to open manifest.json.") + } + + logrus.Debug("Agent.HandleProvision: Writing manifest.json.") + + _, err = io.Copy(dst, r.Body) + + if err != nil { + logrus.Fatal("Agent.HandleProvision: Failed to write manifest.json.") + } + + w.WriteHeader(http.StatusCreated) + + if _, err := w.Write([]byte(`done`)); err != nil { + logrus.Fatal("Agent.HandleProvision: Failed to write response.") + } + + logrus.Info("Agent.HandleProvision: Done.") + + agent.SetState(StatePopulate) +} + +func (agent *Agent) HandlePopulate(w http.ResponseWriter, r *http.Request) { + agent.GuardState(StatePopulate) + + if r.Method != "POST" { + logrus.WithFields( + logrus.Fields{"method": r.Method}, + ).Fatal("Agent.HandlePopulate: unexpected request method") + } + + w.WriteHeader(http.StatusOK) + + if _, err := w.Write([]byte(`done`)); err != nil { + logrus.Fatal("Agent.HandlePopulate: Failed to write response.") + } + + logrus.Info("Agent.HandlePopulate: Done.") + + agent.SetState(StateBuild) +} + +func (agent *Agent) HandleBuild(w http.ResponseWriter, r *http.Request) { + agent.GuardState(StateBuild) + + if r.Method != "POST" { + logrus.WithFields( + logrus.Fields{"method": r.Method}, + ).Fatal("Agent.HandleBuild: Unexpected request method.") + } + + var buildRequest BuildRequest + + var err error + + if err = json.NewDecoder(r.Body).Decode(&buildRequest); err != nil { + logrus.Fatal("HandleBuild: Failed to decode body.") + } + + if Build != nil { + logrus.Fatal("HandleBuild: Build started but Build was non-nil.") + } + + args := []string{ + "--store", path.Join(argBuildPath, "store"), + "--cache-max-size", "unlimited", + "--checkpoint", "*", + "--output-directory", path.Join(argBuildPath, "export"), + } + + for _, pipeline := range buildRequest.Pipelines { + args = append(args, "--export") + args = append(args, pipeline) + } + + args = append(args, path.Join(argBuildPath, "manifest.json")) + + envs := os.Environ() + envs = append(envs, buildRequest.Environments...) + + Build = &BackgroundProcess{} + Build.Process = exec.Command( + "/usr/bin/osbuild", + args..., + ) + Build.Process.Env = envs + + logrus.Infof("BackgroundProcess: Starting %s with %s", Build.Process, envs) + + Build.Stdout, err = Build.Process.StdoutPipe() + + if err != nil { + logrus.Fatal(err) + } + + Build.Stderr, err = Build.Process.StderrPipe() + + if err != nil { + logrus.Fatal(err) + } + + if err := Build.Process.Start(); err != nil { + logrus.Fatal("BackgroundProcess: Failed to start process.") + } + + go func() { + Build.Error = Build.Process.Wait() + Build.Done = true + + logrus.Info("BackgroundProcess: Exited.") + }() + + go func() { + scanner := bufio.NewScanner(Build.Stdout) + for scanner.Scan() { + m := scanner.Text() + logrus.Infof("BackgroundProcess: Stdout: %s", m) + } + }() + + go func() { + scanner := bufio.NewScanner(Build.Stderr) + for scanner.Scan() { + m := scanner.Text() + logrus.Infof("BackgroundProcess: Stderr: %s", m) + } + }() + + w.WriteHeader(http.StatusCreated) + + agent.SetState(StateProgress) +} + +func (agent *Agent) HandleProgress(w http.ResponseWriter, r *http.Request) { + agent.GuardState(StateProgress) + + if r.Method != "GET" { + logrus.WithFields( + logrus.Fields{"method": r.Method}, + ).Fatal("Agent.HandleProgress: Unexpected request method.") + } + + if Build == nil { + logrus.Fatal("HandleProgress: Progress requested but Build was nil.") + } + + if Build.Done { + w.WriteHeader(http.StatusOK) + + if Build.Error != nil { + logrus.Fatalf("Agent.HandleBuild: Buildprocess exited with error: %s", Build.Error) + } + + agent.SetState(StateExport) + } else { + w.WriteHeader(http.StatusAccepted) + } + + if _, err := w.Write([]byte(`done`)); err != nil { + logrus.Fatal("Agent.HandleBuild: Failed to write response.") + } + + logrus.Info("Agent.HandleBuild: Done.") +} + +func (agent *Agent) HandleExport(w http.ResponseWriter, r *http.Request) { + agent.GuardState(StateExport) + + if r.Method != "GET" { + logrus.WithFields( + logrus.Fields{"method": r.Method}, + ).Fatal("Agent.HandleExport: unexpected request method") + } + + exportPath := r.URL.Query().Get("path") + + if exportPath == "" { + logrus.Fatal("Agent.HandleExport: Missing export.") + } + + // XXX check subdir + srcPath := path.Join(argBuildPath, "export", exportPath) + + src, err := os.Open( + srcPath, + ) + + if err != nil { + logrus.Fatalf("Agent.HandleExport: Failed to open source: %s.", err) + } + + _, err = io.Copy(w, src) + + if err != nil { + logrus.Fatalf("Agent.HandleExport: Failed to write response: %s.", err) + } + + logrus.Info("Agent.HandleExport: Done.") + + agent.SetState(StateDone) +} + +func (agent *Agent) Serve() error { + mux := http.NewServeMux() + mux.HandleFunc("/claim", agent.HandleClaim) + + mux.HandleFunc("/provision", agent.HandleProvision) + mux.HandleFunc("/populate", agent.HandlePopulate) + + mux.HandleFunc("/build", agent.HandleBuild) + mux.HandleFunc("/progress", agent.HandleProgress) + + mux.HandleFunc("/export", agent.HandleExport) + + net := &http.Server{ + ReadTimeout: 1 * time.Second, + WriteTimeout: 1800 * time.Second, + IdleTimeout: 30 * time.Second, + ReadHeaderTimeout: 1 * time.Second, + Addr: fmt.Sprintf("%s:%d", agent.Host, agent.Port), + Handler: mux, + } + + return net.ListenAndServe() +} + +func main() { + logrus.WithFields( + logrus.Fields{ + "argJSON": argJSON, + "argBuilderHost": argBuilderHost, + "argBuilderPort": argBuilderPort, + "argTimeoutClaim": argTimeoutClaim, + "argTimeoutProvision": argTimeoutProvision, + "argTimeoutBuild": argTimeoutBuild, + "argTimeoutExport": argTimeoutExport, + }).Info("main: startup") + + agent := Agent{ + State: StateClaim, + StateChannel: make(chan State, 16), + Host: argBuilderHost, + Port: argBuilderPort, + } + go agent.Serve() + + for state := range agent.StateChannel { + if state == StateDone { + logrus.Info("main: Shutdown.") + os.Exit(ExitOk) + } + } +} diff --git a/cmd/osbuild-jobsite-manager/main.go b/cmd/osbuild-jobsite-manager/main.go new file mode 100644 index 000000000..a77241414 --- /dev/null +++ b/cmd/osbuild-jobsite-manager/main.go @@ -0,0 +1,387 @@ +// # `jobsite-manager` +package main + +import ( + "bytes" + "encoding/json" + "errors" + "flag" + "fmt" + "io" + "net/http" + "os" + "os/signal" + "path" + "syscall" + "time" + + "github.com/sirupsen/logrus" +) + +const ( + ExitOk int = iota + ExitError + ExitTimeout + ExitSignal +) + +type ArgumentList []string + +func (AL *ArgumentList) String() string { + return "" +} + +func (AL *ArgumentList) Set(value string) error { + *AL = append(*AL, value) + return nil +} + +var ( + argJSON bool + + argJobsiteHost string + argJobsitePort int + argBuilderHost string + argBuilderPort int + + argTimeoutClaim int + argTimeoutProvision int + argTimeoutPopulate int + argTimeoutBuild int + argTimeoutProgress int + argTimeoutExport int + + argPipelines ArgumentList + argEnvironments ArgumentList + argExports ArgumentList + argOutputPath string +) + +type BuildRequest struct { + Pipelines []string `json:"pipelines"` + Environments []string `json:"environments"` +} + +type Step func(chan<- struct{}, chan<- error) + +func init() { + flag.BoolVar(&argJSON, "json", false, "Enable JSON output") + + flag.StringVar(&argJobsiteHost, "manager-host", "localhost", "Hostname or IP where this program will listen on.") + flag.IntVar(&argJobsitePort, "manager-port", 3333, "Port this program will listen on.") + + flag.StringVar(&argBuilderHost, "builder-host", "localhost", "Hostname or IP of a jobsite-builder that this program will connect to.") + flag.IntVar(&argBuilderPort, "builder-port", 3333, "Port of a jobsite-builder that this program will connect to.") + + flag.IntVar(&argTimeoutClaim, "timeout-claim", 600, "Timeout before the claim phase needs to be completed in seconds.") + flag.IntVar(&argTimeoutProvision, "timeout-provision", 30, "Timeout before the provision phase needs to be completed in seconds.") + flag.IntVar(&argTimeoutPopulate, "timeout-populate", 30, "Timeout before the populate phase needs to be completed in seconds.") + flag.IntVar(&argTimeoutBuild, "timeout-build", 30, "Timeout before the build phase needs to be completed in seconds.") + flag.IntVar(&argTimeoutProgress, "timeout-progress", 3600, "Timeout before the progress phase needs to be completed in seconds.") + flag.IntVar(&argTimeoutExport, "timeout-export", 1800, "Timeout before the export phase needs to be completed in seconds.") + + flag.Var(&argPipelines, "pipeline", "Pipelines to export. Can be passed multiple times.") + flag.Var(&argEnvironments, "environment", "Environments to add. Can be passed multiple times.") + flag.Var(&argExports, "export", "Files to export. Can be passed multiple times.") + flag.StringVar(&argOutputPath, "output", "/dev/null", "Output directory to write to.") + + flag.Parse() + + logrus.SetLevel(logrus.InfoLevel) + + if argJSON { + logrus.SetFormatter(&logrus.JSONFormatter{}) + } +} + +func main() { + logrus.Info("main: Starting up.") + + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + + done := make(chan struct{}, 1) + errs := make(chan error, 1) + + go Dance(done, errs) + + for { + select { + case sig := <-sigs: + logrus.WithFields( + logrus.Fields{ + "signal": sig, + }).Info("main: Exiting on signal.") + os.Exit(ExitSignal) + case err := <-errs: + logrus.WithFields( + logrus.Fields{ + "error": err, + }).Info("main: Exiting on error.") + os.Exit(ExitError) + case <-done: + logrus.Info("main: Shutting down succesfully.") + os.Exit(ExitOk) + } + } +} + +func Dance(done chan<- struct{}, errs chan<- error) { + manifest, err := io.ReadAll(os.Stdin) + + if err != nil { + errs <- err + return + } + + if err := StepClaim(); err != nil { + errs <- err + return + } + + if err := StepProvision(manifest); err != nil { + errs <- err + return + } + + if err := StepPopulate(); err != nil { + errs <- err + return + } + + if err := StepBuild(); err != nil { + errs <- err + return + } + + if err := StepProgress(); err != nil { + errs <- err + return + } + + if err := StepExport(); err != nil { + errs <- err + return + } + + close(done) +} + +func Request(method string, path string, body []byte) (*http.Response, error) { + cli := &http.Client{} + url := fmt.Sprintf("http://%s:%d/%s", argBuilderHost, argBuilderPort, path) + + req, err := http.NewRequest(method, url, bytes.NewBuffer(body)) + + if err != nil { + return nil, err + } + + logrus.Debugf("Request: Making a %s request to %s.", method, url) + + res, err := cli.Do(req) + + if err != nil { + return nil, err + } + + return res, nil +} + +func Wait(timeout int, fn Step) error { + done := make(chan struct{}, 1) + errs := make(chan error, 1) + + go fn(done, errs) + + select { + case <-time.After(time.Duration(timeout) * time.Second): + return fmt.Errorf("timeout") + case <-done: + return nil + case err := <-errs: + return err + } +} + +func StepClaim() error { + return Wait(argTimeoutClaim, func(done chan<- struct{}, errs chan<- error) { + for { + res, err := Request("POST", "claim", []byte("")) + + if err != nil { + if errors.Is(err, syscall.ECONNREFUSED) { + logrus.Info("StepClaim: Got ECONNREFUSED, the builder is likely not yet up. Retry.") + time.Sleep(1 * time.Second) + continue + } + errs <- err + return + } + + defer res.Body.Close() + + if res.StatusCode != http.StatusOK { + errs <- fmt.Errorf("StepClaim: Got an unexpected response %d while expecting %d. Exiting.", res.StatusCode, http.StatusOK) + return + } + + break + } + + logrus.Info("StepClaim: Done.") + + close(done) + }) +} + +func StepProvision(manifest []byte) error { + return Wait(argTimeoutProvision, func(done chan<- struct{}, errs chan<- error) { + res, err := Request("PUT", "provision", manifest) + + if err != nil { + errs <- err + return + } + + defer res.Body.Close() + + if res.StatusCode != http.StatusCreated { + errs <- fmt.Errorf("StepProvision: Got an unexpected response %d while expecting %d. Exiting.", res.StatusCode, http.StatusCreated) + return + } + + logrus.Info("StepProvision: Done.") + + close(done) + }) +} + +func StepPopulate() error { + return Wait(argTimeoutPopulate, func(done chan<- struct{}, errs chan<- error) { + res, err := Request("POST", "populate", []byte("")) + + if err != nil { + errs <- err + return + } + + defer res.Body.Close() + + if res.StatusCode != http.StatusOK { + errs <- fmt.Errorf("StepPopulate: Got an unexpected response %d while expecting %d. Exiting.", res.StatusCode, http.StatusOK) + return + } + + logrus.Info("StepPopulate: Done.") + + close(done) + }) +} + +func StepBuild() error { + return Wait(argTimeoutBuild, func(done chan<- struct{}, errs chan<- error) { + arg := BuildRequest{ + Pipelines: argPipelines, + Environments: argEnvironments, + } + + dat, err := json.Marshal(arg) + + if err != nil { + logrus.Fatalf("StepBuild: Failed to marshal data: %s", err) + } + + res, err := Request("POST", "build", dat) + + if err != nil { + errs <- err + return + } + + defer res.Body.Close() + + if res.StatusCode != http.StatusCreated { + errs <- fmt.Errorf("StepBuild: Got an unexpected response %d while expecting %d. Exiting.", res.StatusCode, http.StatusOK) + return + } + + logrus.Info("StepBuild: Done.") + + close(done) + }) +} + +func StepProgress() error { + return Wait(argTimeoutProgress, func(done chan<- struct{}, errs chan<- error) { + for { + res, err := Request("GET", "progress", []byte("")) + + if err != nil { + errs <- err + return + } + + defer res.Body.Close() + + if res.StatusCode == http.StatusAccepted { + logrus.Info("StepProgress: Build is pending. Retry.") + time.Sleep(5 * time.Second) + continue + } + + if res.StatusCode != http.StatusOK { + errs <- fmt.Errorf("StepProgress: Got an unexpected response %d while expecting %d. Exiting.", res.StatusCode, http.StatusOK) + return + } + + break + } + + logrus.Info("StepProgress: Done.") + + close(done) + }) +} + +func StepExport() error { + return Wait(argTimeoutExport, func(done chan<- struct{}, errs chan<- error) { + for _, export := range argExports { + res, err := Request("GET", fmt.Sprintf("export?path=%s", export), []byte("")) + + if err != nil { + errs <- err + return + } + + defer res.Body.Close() + + if res.StatusCode != http.StatusOK { + errs <- fmt.Errorf("StepExport: Got an unexpected response %d while expecting %d. Exiting.", res.StatusCode, http.StatusOK) + return + } + + dst, err := os.OpenFile( + path.Join(argOutputPath, export), + os.O_WRONLY|os.O_CREATE|os.O_EXCL, + 0400, + ) + + if err != nil { + errs <- fmt.Errorf("StepExport: Failed to open destination response: %s.", err) + return + } + + _, err = io.Copy(dst, res.Body) + + if err != nil { + errs <- fmt.Errorf("StepExport: Failed to copy response: %s.", err) + return + } + } + + logrus.Info("StepExport: Done.") + + close(done) + }) +}