create jobs queue for scheduling new builds
This commit is contained in:
parent
7df735e36b
commit
0861b80c99
5 changed files with 150 additions and 9 deletions
|
|
@ -11,6 +11,7 @@ import (
|
|||
"os/signal"
|
||||
"path/filepath"
|
||||
|
||||
"osbuild-composer/internal/queue"
|
||||
"osbuild-composer/internal/rpmmd"
|
||||
"osbuild-composer/internal/weldr"
|
||||
)
|
||||
|
|
@ -59,7 +60,8 @@ func main() {
|
|||
}
|
||||
|
||||
stateChannel := make(chan []byte, 10)
|
||||
api := weldr.New(repo, packages, logger, state, stateChannel)
|
||||
buildChannel := make(chan queue.Build, 200)
|
||||
api := weldr.New(repo, packages, logger, state, stateChannel, buildChannel)
|
||||
go func() {
|
||||
for {
|
||||
err := writeFileAtomically(StateFile, <-stateChannel, 0755)
|
||||
|
|
|
|||
62
internal/queue/queue.go
Normal file
62
internal/queue/queue.go
Normal file
|
|
@ -0,0 +1,62 @@
|
|||
package queue
|
||||
|
||||
import "sync"
|
||||
|
||||
// Build is a request waiting for a worker
|
||||
type Build struct {
|
||||
Pipeline string `json:"pipeline"`
|
||||
Manifest string `json:"manifest"`
|
||||
}
|
||||
|
||||
// Manifest contains additional metadata attached do a pipeline that are necessary for workers
|
||||
type Manifest struct {
|
||||
destination string
|
||||
}
|
||||
|
||||
// Job is an image build already in progress
|
||||
type Job struct {
|
||||
UUID string `json:"uuid"`
|
||||
Build Build `json:"build"`
|
||||
}
|
||||
|
||||
// JobQueue contains already running jobs waiting for
|
||||
type JobQueue struct {
|
||||
sync.Mutex
|
||||
incomingBuilds chan Build // Channel of incoming builds form Weldr API, we never want to block on this
|
||||
waitingBuilds []Build // Unbounded FIFO queue of waiting builds
|
||||
runningJobs map[string]Job // Already running jobs, key is UUID
|
||||
}
|
||||
|
||||
// NewJobQueue creates object of type JobQueue
|
||||
func NewJobQueue(timeout int, builds chan Build) *JobQueue {
|
||||
jobs := &JobQueue{
|
||||
incomingBuilds: builds,
|
||||
waitingBuilds: make([]Build, 0),
|
||||
runningJobs: make(map[string]Job),
|
||||
}
|
||||
go func() {
|
||||
for {
|
||||
// This call will block, do not put it inside the locked zone
|
||||
newBuild := <-jobs.incomingBuilds
|
||||
// Locking the whole job queue => as short as possible
|
||||
jobs.Lock()
|
||||
jobs.waitingBuilds = append(jobs.waitingBuilds, newBuild)
|
||||
jobs.Unlock()
|
||||
}
|
||||
}()
|
||||
return jobs
|
||||
}
|
||||
|
||||
// StartNewJob starts a new job
|
||||
func (j *JobQueue) StartNewJob(id string, worker string) Job {
|
||||
j.Lock()
|
||||
newBuild := j.waitingBuilds[0] // Take the first element
|
||||
j.waitingBuilds = j.waitingBuilds[1:] // Discart 1st element
|
||||
j.Unlock()
|
||||
job := Job{
|
||||
UUID: id,
|
||||
Build: newBuild,
|
||||
}
|
||||
j.runningJobs[id] = job
|
||||
return job
|
||||
}
|
||||
|
|
@ -9,11 +9,13 @@ import (
|
|||
|
||||
"github.com/julienschmidt/httprouter"
|
||||
|
||||
"osbuild-composer/internal/queue"
|
||||
"osbuild-composer/internal/rpmmd"
|
||||
)
|
||||
|
||||
type API struct {
|
||||
store *store
|
||||
store *store
|
||||
pendingBuilds chan queue.Build
|
||||
|
||||
repo rpmmd.RepoConfig
|
||||
packages rpmmd.PackageList
|
||||
|
|
@ -22,12 +24,15 @@ type API struct {
|
|||
router *httprouter.Router
|
||||
}
|
||||
|
||||
func New(repo rpmmd.RepoConfig, packages rpmmd.PackageList, logger *log.Logger, initialState []byte, stateChannel chan<- []byte) *API {
|
||||
func New(repo rpmmd.RepoConfig, packages rpmmd.PackageList, logger *log.Logger, initialState []byte, stateChannel chan<- []byte, builds chan queue.Build) *API {
|
||||
// This needs to be shared with the worker API so that they can communicate with each other
|
||||
// builds := make(chan queue.Build, 200)
|
||||
api := &API{
|
||||
store: newStore(initialState, stateChannel),
|
||||
repo: repo,
|
||||
packages: packages,
|
||||
logger: logger,
|
||||
store: newStore(initialState, stateChannel),
|
||||
pendingBuilds: builds,
|
||||
repo: repo,
|
||||
packages: packages,
|
||||
logger: logger,
|
||||
}
|
||||
|
||||
// sample blueprint on first run
|
||||
|
|
@ -67,6 +72,7 @@ func New(repo rpmmd.RepoConfig, packages rpmmd.PackageList, logger *log.Logger,
|
|||
api.router.DELETE("/api/v0/blueprints/delete/:blueprint", api.blueprintDeleteHandler)
|
||||
api.router.DELETE("/api/v0/blueprints/workspace/:blueprint", api.blueprintDeleteWorkspaceHandler)
|
||||
|
||||
api.router.POST("/api/v0/compose", api.composeHandler)
|
||||
api.router.GET("/api/v0/compose/queue", api.composeQueueHandler)
|
||||
api.router.GET("/api/v0/compose/finished", api.composeFinishedHandler)
|
||||
api.router.GET("/api/v0/compose/failed", api.composeFailedHandler)
|
||||
|
|
@ -510,6 +516,46 @@ func (api *API) blueprintDeleteWorkspaceHandler(writer http.ResponseWriter, requ
|
|||
statusResponseOK(writer)
|
||||
}
|
||||
|
||||
// Schedule new compose by first translating the appropriate blueprint into a pipeline and then
|
||||
// pushing it into the channel for waiting builds.
|
||||
func (api *API) composeHandler(writer http.ResponseWriter, request *http.Request, _ httprouter.Params) {
|
||||
// https://weldr.io/lorax/pylorax.api.html#pylorax.api.v0.v0_compose_start
|
||||
type ComposeRequest struct {
|
||||
BlueprintName string `json:"blueprint_name"`
|
||||
ComposeType string `json:"compose_type"`
|
||||
Branch string `json:"branch"`
|
||||
}
|
||||
|
||||
contentType := request.Header["Content-Type"]
|
||||
if len(contentType) != 1 || contentType[0] != "application/json" {
|
||||
statusResponseError(writer, http.StatusUnsupportedMediaType, "blueprint must be json")
|
||||
return
|
||||
}
|
||||
|
||||
var cr ComposeRequest
|
||||
err := json.NewDecoder(request.Body).Decode(&cr)
|
||||
if err != nil {
|
||||
statusResponseError(writer, http.StatusBadRequest, "invalid request format: "+err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
bp := blueprint{}
|
||||
changed := false
|
||||
found := api.store.getBlueprint(cr.BlueprintName, &bp, &changed) // TODO: what to do with changed?
|
||||
|
||||
if found {
|
||||
api.pendingBuilds <- queue.Build{
|
||||
Pipeline: bp.translateToPipeline(cr.ComposeType),
|
||||
Manifest: "{\"output-path\": \"/var/cache/osbuild\"}",
|
||||
}
|
||||
} else {
|
||||
statusResponseError(writer, http.StatusBadRequest, "blueprint does not exist")
|
||||
return
|
||||
}
|
||||
|
||||
statusResponseOK(writer)
|
||||
}
|
||||
|
||||
func (api *API) composeQueueHandler(writer http.ResponseWriter, request *http.Request, _ httprouter.Params) {
|
||||
var reply struct {
|
||||
New []interface{} `json:"new"`
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ import (
|
|||
"strings"
|
||||
"testing"
|
||||
|
||||
"osbuild-composer/internal/queue"
|
||||
"osbuild-composer/internal/rpmmd"
|
||||
"osbuild-composer/internal/weldr"
|
||||
)
|
||||
|
|
@ -111,13 +112,13 @@ func TestBasic(t *testing.T) {
|
|||
}
|
||||
|
||||
for _, c := range cases {
|
||||
api := weldr.New(repo, packages, nil, nil, nil)
|
||||
api := weldr.New(repo, packages, nil, nil, nil, nil)
|
||||
testRoute(t, api, "GET", c.Path, ``, c.ExpectedStatus, c.ExpectedJSON)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBlueprints(t *testing.T) {
|
||||
api := weldr.New(repo, packages, nil, nil, nil)
|
||||
api := weldr.New(repo, packages, nil, nil, nil, nil)
|
||||
|
||||
testRoute(t, api, "POST", "/api/v0/blueprints/new",
|
||||
`{"name":"test","description":"Test","packages":[{"name":"httpd","version":"2.4.*"}],"version":"0"}`,
|
||||
|
|
@ -135,3 +136,28 @@ func TestBlueprints(t *testing.T) {
|
|||
http.StatusOK, `{"blueprints":[{"name":"test","description":"Test","modules":[],"packages":[{"name":"systemd","version":"123"}],"version":"0"}],
|
||||
"changes":[{"name":"test","changed":true}], "errors":[]}`)
|
||||
}
|
||||
|
||||
func TestCompose(t *testing.T) {
|
||||
buildChannel := make(chan queue.Build, 200)
|
||||
api := weldr.New(repo, packages, nil, nil, nil, buildChannel)
|
||||
|
||||
testRoute(t, api, "POST", "/api/v0/blueprints/new",
|
||||
`{"name":"test","description":"Test","packages":[{"name":"httpd","version":"2.4.*"}],"version":"0"}`,
|
||||
http.StatusOK, `{"status":true}`)
|
||||
|
||||
testRoute(t, api, "POST", "/api/v0/compose", `{"blueprint_name": "http-server","compose_type": "tar","branch": "master"}`,
|
||||
http.StatusBadRequest, `{"status":false,"errors":["blueprint does not exist"]}`)
|
||||
|
||||
testRoute(t, api, "POST", "/api/v0/compose", `{"blueprint_name": "test","compose_type": "tar","branch": "master"}`,
|
||||
http.StatusOK, `{"status":true}`)
|
||||
|
||||
build := <-buildChannel
|
||||
expected_pipeline := `{"pipeline": "string"}`
|
||||
expected_manifest := `{"output-path": "/var/cache/osbuild"}`
|
||||
if expected_manifest != build.Manifest {
|
||||
t.Errorf("Expected this manifest: %s; got this: %s", expected_manifest, build.Manifest)
|
||||
}
|
||||
if expected_pipeline != build.Pipeline {
|
||||
t.Errorf("Expected this manifest: %s; got this: %s", expected_pipeline, build.Pipeline)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ package weldr
|
|||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"sort"
|
||||
"sync"
|
||||
|
|
@ -133,3 +134,7 @@ func (s *store) deleteBlueprintFromWorkspace(name string) {
|
|||
delete(s.Workspace, name)
|
||||
})
|
||||
}
|
||||
|
||||
func (b *blueprint) translateToPipeline(outputFormat string) string {
|
||||
return fmt.Sprintf("{\"pipeline\": \"%s\"}", "string")
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue