worker: add API support for canceling jobs

This commit is contained in:
Lars Karlitski 2020-06-11 13:03:14 +02:00 committed by Ondřej Budai
parent b759f4e0be
commit 27acd03b68
4 changed files with 89 additions and 0 deletions

View file

@ -91,6 +91,26 @@ func (c *Client) AddJob() (*Job, error) {
}, nil
}
func (c *Client) JobCanceled(job *Job) bool {
response, err := c.client.Get(c.createURL("/job-queue/v1/jobs/" + job.Id.String()))
if err != nil {
return true
}
defer response.Body.Close()
if response.StatusCode != http.StatusOK {
return true
}
var jr jobResponse
err = json.NewDecoder(response.Body).Decode(&jr)
if err != nil {
return true
}
return jr.Canceled
}
func (c *Client) UpdateJob(job *Job, status common.ImageBuildState, result *common.ComposeResult) error {
var b bytes.Buffer
err := json.NewEncoder(&b).Encode(&updateJobRequest{status, result})

View file

@ -42,6 +42,11 @@ type addJobResponse struct {
Targets []*target.Target `json:"targets,omitempty"`
}
type jobResponse struct {
Id uuid.UUID `json:"id"`
Canceled bool `json:"canceled"`
}
type updateJobRequest struct {
Status common.ImageBuildState `json:"status"`
Result *common.ComposeResult `json:"result"`

View file

@ -33,6 +33,7 @@ type JobStatus struct {
Queued time.Time
Started time.Time
Finished time.Time
Canceled bool
Result OSBuildJobResult
}
@ -54,6 +55,7 @@ func NewServer(logger *log.Logger, jobs jobqueue.JobQueue, artifactsDir string)
// Add handlers for managing jobs.
s.router.POST("/job-queue/v1/jobs", s.addJobHandler)
s.router.GET("/job-queue/v1/jobs/:job_id", s.jobHandler)
s.router.PATCH("/job-queue/v1/jobs/:job_id", s.updateJobHandler)
s.router.POST("/job-queue/v1/jobs/:job_id/artifacts/:name", s.addJobImageHandler)
@ -115,10 +117,15 @@ func (s *Server) JobStatus(id uuid.UUID) (*JobStatus, error) {
Queued: queued,
Started: started,
Finished: finished,
Canceled: canceled,
Result: result,
}, nil
}
func (s *Server) Cancel(id uuid.UUID) error {
return s.jobs.CancelJob(id)
}
// Provides access to artifacts of a job. Returns an io.Reader for the artifact
// and the artifact's size.
func (s *Server) JobArtifact(id uuid.UUID, name string) (io.Reader, int64, error) {
@ -187,6 +194,30 @@ func (s *Server) statusHandler(writer http.ResponseWriter, request *http.Request
})
}
func (s *Server) jobHandler(writer http.ResponseWriter, request *http.Request, params httprouter.Params) {
id, err := uuid.Parse(params.ByName("job_id"))
if err != nil {
jsonErrorf(writer, http.StatusBadRequest, "cannot parse compose id: %v", err)
return
}
status, err := s.JobStatus(id)
if err != nil {
switch err {
case jobqueue.ErrNotExist:
jsonErrorf(writer, http.StatusNotFound, "job does not exist: %s", id)
default:
jsonErrorf(writer, http.StatusInternalServerError, "%v", err)
}
return
}
_ = json.NewEncoder(writer).Encode(jobResponse{
Id: id,
Canceled: status.Canceled,
})
}
func (s *Server) addJobHandler(writer http.ResponseWriter, request *http.Request, _ httprouter.Params) {
contentType := request.Header["Content-Type"]
if len(contentType) != 1 || contentType[0] != "application/json" {

View file

@ -1,6 +1,7 @@
package worker_test
import (
"fmt"
"net/http"
"testing"
@ -68,6 +69,38 @@ func TestCreate(t *testing.T) {
test.TestRoute(t, server, false, "POST", "/job-queue/v1/jobs", `{}`, http.StatusCreated,
`{"id":"`+id.String()+`","manifest":{"sources":{},"pipeline":{}}}`, "created")
test.TestRoute(t, server, false, "GET", fmt.Sprintf("/job-queue/v1/jobs/%s", id), `{}`, http.StatusOK,
`{"id":"`+id.String()+`","canceled":false}`)
}
func TestCancel(t *testing.T) {
distroStruct := fedoratest.New()
arch, err := distroStruct.GetArch("x86_64")
if err != nil {
t.Fatalf("error getting arch from distro")
}
imageType, err := arch.GetImageType("qcow2")
if err != nil {
t.Fatalf("error getting image type from arch")
}
manifest, err := imageType.Manifest(nil, distro.ImageOptions{Size: imageType.Size(0)}, nil, nil, nil)
if err != nil {
t.Fatalf("error creating osbuild manifest")
}
server := worker.NewServer(nil, testjobqueue.New(), "")
id, err := server.Enqueue(manifest, nil)
require.NoError(t, err)
test.TestRoute(t, server, false, "POST", "/job-queue/v1/jobs", `{}`, http.StatusCreated,
`{"id":"`+id.String()+`","manifest":{"sources":{},"pipeline":{}}}`, "created")
err = server.Cancel(id)
require.NoError(t, err)
test.TestRoute(t, server, false, "GET", fmt.Sprintf("/job-queue/v1/jobs/%s", id), `{}`, http.StatusOK,
`{"id":"`+id.String()+`","canceled":true}`)
}
func testUpdateTransition(t *testing.T, from, to string, expectedStatus int) {