worker: move ComposerClient to jobqueue package
This moves the client code into the same package as the server code, which makes it easier to change (and version) the two in sync. Also, it will allow to make some structs private to the jobqueue package and to test `Client`. Also rename it to jobqueue.Client.
This commit is contained in:
parent
cb4421b69f
commit
b5432e78b9
2 changed files with 124 additions and 114 deletions
|
|
@ -1,19 +1,13 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
|
||||
"github.com/osbuild/osbuild-composer/internal/common"
|
||||
"github.com/osbuild/osbuild-composer/internal/jobqueue"
|
||||
|
|
@ -21,12 +15,6 @@ import (
|
|||
|
||||
const RemoteWorkerPort = 8700
|
||||
|
||||
type ComposerClient struct {
|
||||
client *http.Client
|
||||
scheme string
|
||||
hostname string
|
||||
}
|
||||
|
||||
type connectionConfig struct {
|
||||
CACertFile string
|
||||
ClientKeyFile string
|
||||
|
|
@ -56,105 +44,7 @@ func createTLSConfig(config *connectionConfig) (*tls.Config, error) {
|
|||
}, nil
|
||||
}
|
||||
|
||||
func NewClient(address string, conf *tls.Config) *ComposerClient {
|
||||
client := &http.Client{
|
||||
Transport: &http.Transport{
|
||||
TLSClientConfig: conf,
|
||||
},
|
||||
}
|
||||
|
||||
var scheme string
|
||||
if conf != nil {
|
||||
scheme = "http"
|
||||
} else {
|
||||
scheme = "https"
|
||||
}
|
||||
|
||||
return &ComposerClient{client, scheme, address}
|
||||
}
|
||||
|
||||
func NewClientUnix(path string) *ComposerClient {
|
||||
client := &http.Client{
|
||||
Transport: &http.Transport{
|
||||
DialContext: func(context context.Context, network, addr string) (net.Conn, error) {
|
||||
return net.Dial("unix", path)
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
return &ComposerClient{client, "http", "localhost"}
|
||||
}
|
||||
|
||||
func (c *ComposerClient) AddJob() (*jobqueue.Job, error) {
|
||||
type request struct {
|
||||
}
|
||||
|
||||
var b bytes.Buffer
|
||||
err := json.NewEncoder(&b).Encode(request{})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
response, err := c.client.Post(c.createURL("/job-queue/v1/jobs"), "application/json", &b)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer response.Body.Close()
|
||||
|
||||
if response.StatusCode != http.StatusCreated {
|
||||
rawR, _ := ioutil.ReadAll(response.Body)
|
||||
r := string(rawR)
|
||||
return nil, fmt.Errorf("couldn't create job, got %d: %s", response.StatusCode, r)
|
||||
}
|
||||
|
||||
job := &jobqueue.Job{}
|
||||
err = json.NewDecoder(response.Body).Decode(job)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return job, nil
|
||||
}
|
||||
|
||||
func (c *ComposerClient) UpdateJob(job *jobqueue.Job, status common.ImageBuildState, result *common.ComposeResult) error {
|
||||
var b bytes.Buffer
|
||||
err := json.NewEncoder(&b).Encode(&jobqueue.JobStatus{status, result})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
urlPath := fmt.Sprintf("/job-queue/v1/jobs/%s/builds/%d", job.ID.String(), job.ImageBuildID)
|
||||
url := c.createURL(urlPath)
|
||||
req, err := http.NewRequest("PATCH", url, &b)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
response, err := c.client.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer response.Body.Close()
|
||||
|
||||
if response.StatusCode != http.StatusOK {
|
||||
return errors.New("error setting job status")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *ComposerClient) UploadImage(job *jobqueue.Job, reader io.Reader) error {
|
||||
// content type doesn't really matter
|
||||
url := c.createURL(fmt.Sprintf("/job-queue/v1/jobs/%s/builds/%d/image", job.ID.String(), job.ImageBuildID))
|
||||
_, err := c.client.Post(url, "application/octet-stream", reader)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *ComposerClient) createURL(path string) string {
|
||||
return c.scheme + "://" + c.hostname + path
|
||||
}
|
||||
|
||||
func handleJob(client *ComposerClient) error {
|
||||
func handleJob(client *jobqueue.Client) error {
|
||||
fmt.Println("Waiting for a new job...")
|
||||
job, err := client.AddJob()
|
||||
if err != nil {
|
||||
|
|
@ -181,7 +71,7 @@ func main() {
|
|||
flag.StringVar(&address, "remote", "", "Connect to a remote composer using the specified address")
|
||||
flag.Parse()
|
||||
|
||||
var client *ComposerClient
|
||||
var client *jobqueue.Client
|
||||
if address != "" {
|
||||
address = fmt.Sprintf("%s:%d", address, RemoteWorkerPort)
|
||||
|
||||
|
|
@ -194,9 +84,9 @@ func main() {
|
|||
log.Fatalf("Error creating TLS config: %v", err)
|
||||
}
|
||||
|
||||
client = NewClient(address, conf)
|
||||
client = jobqueue.NewClient(address, conf)
|
||||
} else {
|
||||
client = NewClientUnix("/run/osbuild-composer/job.socket")
|
||||
client = jobqueue.NewClientUnix(address)
|
||||
}
|
||||
|
||||
for {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue