worker: Add identity filter and client oauth support

This commit is contained in:
sanne 2021-06-07 09:33:21 +02:00 committed by Sanne Raymaekers
parent 968e7b210f
commit 0ea31c39d5
11 changed files with 277 additions and 65 deletions

View file

@ -3,3 +3,4 @@
package api
const BasePath = "/api/worker/v1"
const CloudBasePath = "/api/composer-worker/v1"

View file

@ -10,15 +10,29 @@ import (
"net"
"net/http"
"net/url"
"strings"
"sync"
"time"
"github.com/google/uuid"
"github.com/osbuild/osbuild-composer/internal/common"
"github.com/osbuild/osbuild-composer/internal/worker/api"
)
type bearerToken struct {
AccessToken string `json:"access_token"`
ValidForSeconds int `json:"expires_in"`
}
type Client struct {
server *url.URL
requester *http.Client
server *url.URL
requester *http.Client
offlineToken *string
oAuthURL *string
lastTokenRefresh *time.Time
bearerToken *bearerToken
tokenMu *sync.Mutex
}
type Job interface {
@ -33,7 +47,7 @@ type Job interface {
}
type job struct {
requester *http.Client
client *Client
id uuid.UUID
location string
artifactLocation string
@ -42,24 +56,33 @@ type job struct {
dynamicArgs []json.RawMessage
}
func NewClient(baseURL string, conf *tls.Config) (*Client, error) {
func NewClient(baseURL string, conf *tls.Config, offlineToken, oAuthURL *string) (*Client, error) {
server, err := url.Parse(baseURL)
if err != nil {
return nil, err
}
server, err = server.Parse(api.BasePath + "/")
bp := api.BasePath
if offlineToken != nil {
bp = api.CloudBasePath
}
server, err = server.Parse(bp + "/")
if err != nil {
panic(err)
}
requester := &http.Client{
Transport: &http.Transport{
TLSClientConfig: conf,
},
if conf != nil && offlineToken != nil {
return nil, fmt.Errorf("error creating client, both tls and oauth are enabled")
}
return &Client{server, requester}, nil
requester := &http.Client{}
if conf != nil {
requester.Transport = &http.Transport{
TLSClientConfig: conf,
}
}
return &Client{server, requester, offlineToken, oAuthURL, nil, nil, &sync.Mutex{}}, nil
}
func NewClientUnix(path string) *Client {
@ -81,7 +104,62 @@ func NewClientUnix(path string) *Client {
},
}
return &Client{server, requester}
return &Client{server, requester, nil, nil, nil, nil, nil}
}
// Note: Only call this function with Client.tokenMu locked!
func (c *Client) refreshBearerToken() error {
if c.offlineToken == nil || c.oAuthURL == nil {
return fmt.Errorf("No offline token or oauth url available")
}
data := url.Values{}
data.Set("grant_type", "refresh_token")
data.Set("client_id", "rhsm-api")
data.Set("refresh_token", *c.offlineToken)
t := time.Now()
resp, err := http.Post(*c.oAuthURL, "application/x-www-form-urlencoded", strings.NewReader(data.Encode()))
if err != nil {
return err
}
var bt bearerToken
err = json.NewDecoder(resp.Body).Decode(&bt)
if err != nil {
return err
}
c.bearerToken = &bt
c.lastTokenRefresh = &t
return nil
}
func (c *Client) NewRequest(method, url string, body io.Reader) (*http.Request, error) {
req, err := http.NewRequest(method, url, body)
if err != nil {
return nil, err
}
// If we're using OAUTH, add the Bearer token
if c.offlineToken != nil {
// make sure we have a valid token
var d time.Duration
c.tokenMu.Lock()
defer c.tokenMu.Unlock()
if c.lastTokenRefresh != nil {
d = time.Since(*c.lastTokenRefresh)
}
if c.bearerToken == nil || d.Seconds() >= (float64(c.bearerToken.ValidForSeconds)*0.8) {
err = c.refreshBearerToken()
if err != nil {
return nil, err
}
}
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", c.bearerToken.AccessToken))
}
return req, nil
}
func (c *Client) RequestJob(types []string) (Job, error) {
@ -100,7 +178,13 @@ func (c *Client) RequestJob(types []string) (Job, error) {
panic(err)
}
response, err := c.requester.Post(url.String(), "application/json", &buf)
req, err := c.NewRequest("POST", url.String(), &buf)
if err != nil {
return nil, err
}
req.Header.Add("Content-Type", "application/json")
response, err := c.requester.Do(req)
if err != nil {
return nil, fmt.Errorf("error requesting job: %v", err)
}
@ -127,7 +211,7 @@ func (c *Client) RequestJob(types []string) (Job, error) {
}
return &job{
requester: c.requester,
client: c,
id: jr.Id,
jobType: jr.Type,
args: jr.Args,
@ -174,14 +258,14 @@ func (j *job) Update(result interface{}) error {
panic(err)
}
req, err := http.NewRequest("PATCH", j.location, &buf)
req, err := j.client.NewRequest("PATCH", j.location, &buf)
if err != nil {
panic(err)
}
req.Header.Add("Content-Type", "application/json")
response, err := j.requester.Do(req)
response, err := j.client.requester.Do(req)
if err != nil {
return fmt.Errorf("error fetching job info: %v", err)
}
@ -195,7 +279,12 @@ func (j *job) Update(result interface{}) error {
}
func (j *job) Canceled() (bool, error) {
response, err := j.requester.Get(j.location)
req, err := j.client.NewRequest("GET", j.location, nil)
if err != nil {
return false, err
}
response, err := j.client.requester.Do(req)
if err != nil {
return false, fmt.Errorf("error fetching job info: %v", err)
}
@ -229,14 +318,14 @@ func (j *job) UploadArtifact(name string, reader io.Reader) error {
panic(err)
}
req, err := http.NewRequest("PUT", loc.String(), reader)
req, err := j.client.NewRequest("PUT", loc.String(), reader)
if err != nil {
return fmt.Errorf("cannot create request: %v", err)
}
req.Header.Add("Content-Type", "application/octet-stream")
response, err := j.requester.Do(req)
response, err := j.client.requester.Do(req)
if err != nil {
return fmt.Errorf("error uploading artifact: %v", err)
}

View file

@ -2,6 +2,7 @@ package worker
import (
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
@ -11,6 +12,7 @@ import (
"net/http"
"os"
"path"
"strings"
"sync"
"time"
@ -22,9 +24,11 @@ import (
)
type Server struct {
jobs jobqueue.JobQueue
logger *log.Logger
artifactsDir string
jobs jobqueue.JobQueue
logger *log.Logger
artifactsDir string
identityFilter []string
basePath string
// Currently running jobs. Workers are not handed job ids, but
// independent tokens which serve as an indirection. This enables
@ -48,12 +52,14 @@ type JobStatus struct {
var ErrTokenNotExist = errors.New("worker token does not exist")
func NewServer(logger *log.Logger, jobs jobqueue.JobQueue, artifactsDir string) *Server {
func NewServer(logger *log.Logger, jobs jobqueue.JobQueue, artifactsDir string, identityFilter []string) *Server {
return &Server{
jobs: jobs,
logger: logger,
artifactsDir: artifactsDir,
running: make(map[uuid.UUID]uuid.UUID),
jobs: jobs,
logger: logger,
artifactsDir: artifactsDir,
identityFilter: identityFilter,
running: make(map[uuid.UUID]uuid.UUID),
}
}
@ -68,14 +74,57 @@ func (s *Server) Handler() http.Handler {
e.DefaultHTTPErrorHandler(err, c)
}
var mws []echo.MiddlewareFunc
if len(s.identityFilter) > 0 {
mws = append(mws, s.VerifyIdentityHeader)
}
handler := apiHandlers{
server: s,
}
api.RegisterHandlers(e.Group(api.BasePath), &handler)
api.RegisterHandlers(e.Group(api.BasePath, mws...), &handler)
api.RegisterHandlers(e.Group(api.CloudBasePath, mws...), &handler)
return e
}
func (s *Server) VerifyIdentityHeader(nextHandler echo.HandlerFunc) echo.HandlerFunc {
return func(ctx echo.Context) error {
type identityHeader struct {
Identity struct {
AccountNumber string `json:"account_number"`
} `json:"identity"`
}
request := ctx.Request()
idHeaderB64 := request.Header["X-Rh-Identity"]
if len(idHeaderB64) != 1 {
return echo.NewHTTPError(http.StatusNotFound, "Auth header is not present")
}
b64Result, err := base64.StdEncoding.DecodeString(idHeaderB64[0])
if err != nil {
return echo.NewHTTPError(http.StatusNotFound, "Auth header has incorrect format")
}
var idHeader identityHeader
err = json.Unmarshal([]byte(strings.TrimSuffix(fmt.Sprintf("%s", b64Result), "\n")), &idHeader)
if err != nil {
return echo.NewHTTPError(http.StatusNotFound, "Auth header has incorrect format")
}
for _, i := range s.identityFilter {
if idHeader.Identity.AccountNumber == i {
ctx.Set("IdentityHeader", idHeader)
return nextHandler(ctx)
}
}
return echo.NewHTTPError(http.StatusNotFound, "Account not allowed")
}
}
func (s *Server) EnqueueOSBuild(arch string, job *OSBuildJob) (uuid.UUID, error) {
return s.jobs.Enqueue("osbuild:"+arch, job, nil)
}
@ -302,8 +351,8 @@ func (h *apiHandlers) RequestJob(ctx echo.Context) error {
return ctx.JSON(http.StatusCreated, requestJobResponse{
Id: jobId,
Location: fmt.Sprintf("%s/jobs/%v", api.BasePath, token),
ArtifactLocation: fmt.Sprintf("%s/jobs/%v/artifacts/", api.BasePath, token),
Location: fmt.Sprintf("%s/jobs/%v", h.server.basePath, token),
ArtifactLocation: fmt.Sprintf("%s/jobs/%v/artifacts/", h.server.basePath, token),
Type: jobType,
Args: jobArgs,
DynamicArgs: dynamicJobArgs,

View file

@ -9,6 +9,7 @@ import (
"testing"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/osbuild/osbuild-composer/internal/distro"
@ -18,12 +19,12 @@ import (
"github.com/osbuild/osbuild-composer/internal/worker"
)
func newTestServer(t *testing.T, tempdir string) *worker.Server {
func newTestServer(t *testing.T, tempdir string, identities []string) *worker.Server {
q, err := fsjobqueue.New(tempdir)
if err != nil {
t.Fatalf("error creating fsjobqueue: %v", err)
}
return worker.NewServer(nil, q, "")
return worker.NewServer(nil, q, "", identities)
}
// Ensure that the status request returns OK.
@ -32,7 +33,7 @@ func TestStatus(t *testing.T) {
require.NoError(t, err)
defer os.RemoveAll(tempdir)
server := newTestServer(t, tempdir)
server := newTestServer(t, tempdir, []string{})
handler := server.Handler()
test.TestRoute(t, handler, false, "GET", "/api/worker/v1/status", ``, http.StatusOK, `{"status":"OK"}`, "message")
}
@ -63,7 +64,7 @@ func TestErrors(t *testing.T) {
defer os.RemoveAll(tempdir)
for _, c := range cases {
server := newTestServer(t, tempdir)
server := newTestServer(t, tempdir, []string{})
handler := server.Handler()
test.TestRoute(t, handler, false, c.Method, c.Path, c.Body, c.ExpectedStatus, "{}", "message")
}
@ -87,7 +88,7 @@ func TestCreate(t *testing.T) {
if err != nil {
t.Fatalf("error creating osbuild manifest: %v", err)
}
server := newTestServer(t, tempdir)
server := newTestServer(t, tempdir, []string{})
handler := server.Handler()
_, err = server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest})
@ -116,7 +117,7 @@ func TestCancel(t *testing.T) {
if err != nil {
t.Fatalf("error creating osbuild manifest: %v", err)
}
server := newTestServer(t, tempdir)
server := newTestServer(t, tempdir, []string{})
handler := server.Handler()
jobId, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest})
@ -157,7 +158,7 @@ func TestUpdate(t *testing.T) {
if err != nil {
t.Fatalf("error creating osbuild manifest: %v", err)
}
server := newTestServer(t, tempdir)
server := newTestServer(t, tempdir, []string{})
handler := server.Handler()
jobId, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest})
@ -186,7 +187,7 @@ func TestArgs(t *testing.T) {
tempdir, err := ioutil.TempDir("", "worker-tests-")
require.NoError(t, err)
defer os.RemoveAll(tempdir)
server := newTestServer(t, tempdir)
server := newTestServer(t, tempdir, []string{})
job := worker.OSBuildJob{
Manifest: manifest,
@ -226,7 +227,7 @@ func TestUpload(t *testing.T) {
if err != nil {
t.Fatalf("error creating osbuild manifest: %v", err)
}
server := newTestServer(t, tempdir)
server := newTestServer(t, tempdir, []string{})
handler := server.Handler()
jobID, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest})
@ -241,3 +242,38 @@ func TestUpload(t *testing.T) {
test.TestRoute(t, handler, false, "PUT", fmt.Sprintf("/api/worker/v1/jobs/%s/artifacts/foobar", token), `this is my artifact`, http.StatusOK, `?`)
}
func TestIdentities(t *testing.T) {
tempdir, err := ioutil.TempDir("", "worker-tests-")
require.NoError(t, err)
defer os.RemoveAll(tempdir)
// distroStruct := test_distro.New()
// arch, err := distroStruct.GetArch(test_distro.TestArchName)
// require.NoError(t, err)
// imageType, err := arch.GetImageType(test_distro.TestImageTypeName)
// require.NoError(t, err)
// manifest, err := imageType.Manifest(nil, distro.ImageOptions{Size: imageType.Size(0)}, nil, nil, 0)
// require.NoError(t, err)
server := newTestServer(t, tempdir, []string{"000000"})
handler := server.Handler()
// _, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest})
// require.NoError(t, err)
test.TestRoute(t, handler, false, "GET", "/api/worker/v1/status", ``, http.StatusNotFound, `{"message":"Auth header is not present"}`, "message")
header := map[string]string{
"x-rh-identity": "eyJlbnRpdGxlbWVudHMiOnsiaW5zaWdodHMiOnsiaXNfZW50aXRsZWQiOnRydWV9LCJzbWFydF9tYW5hZ2VtZW50Ijp7ImlzX2VudGl0bGVkIjp0cnVlfSwib3BlbnNoaWZ0Ijp7ImlzX2VudGl0bGVkIjp0cnVlfSwiaHlicmlkIjp7ImlzX2VudGl0bGVkIjp0cnVlfSwibWlncmF0aW9ucyI6eyJpc19lbnRpdGxlZCI6dHJ1ZX0sImFuc2libGUiOnsiaXNfZW50aXRsZWQiOnRydWV9fSwiaWRlbnRpdHkiOnsiYWNjb3VudF9udW1iZXIiOiIwMDAwMDMiLCJ0eXBlIjoiVXNlciIsInVzZXIiOnsidXNlcm5hbWUiOiJ1c2VyIiwiZW1haWwiOiJ1c2VyQHVzZXIudXNlciIsImZpcnN0X25hbWUiOiJ1c2VyIiwibGFzdF9uYW1lIjoidXNlciIsImlzX2FjdGl2ZSI6dHJ1ZSwiaXNfb3JnX2FkbWluIjp0cnVlLCJpc19pbnRlcm5hbCI6dHJ1ZSwibG9jYWxlIjoiZW4tVVMifSwiaW50ZXJuYWwiOnsib3JnX2lkIjoiMDAwMDAwIn19fQo=",
}
response := test.SendHTTPWithHeader(handler, "GET", "/api/worker/v1/status", ``, header)
assert.Equal(t, 404, response.StatusCode, "status mismatch")
header = map[string]string{
"x-rh-identity": "eyJlbnRpdGxlbWVudHMiOnsiaW5zaWdodHMiOnsiaXNfZW50aXRsZWQiOnRydWV9LCJzbWFydF9tYW5hZ2VtZW50Ijp7ImlzX2VudGl0bGVkIjp0cnVlfSwib3BlbnNoaWZ0Ijp7ImlzX2VudGl0bGVkIjp0cnVlfSwiaHlicmlkIjp7ImlzX2VudGl0bGVkIjp0cnVlfSwibWlncmF0aW9ucyI6eyJpc19lbnRpdGxlZCI6dHJ1ZX0sImFuc2libGUiOnsiaXNfZW50aXRsZWQiOnRydWV9fSwiaWRlbnRpdHkiOnsiYWNjb3VudF9udW1iZXIiOiIwMDAwMDAiLCJ0eXBlIjoiVXNlciIsInVzZXIiOnsidXNlcm5hbWUiOiJ1c2VyIiwiZW1haWwiOiJ1c2VyQHVzZXIudXNlciIsImZpcnN0X25hbWUiOiJ1c2VyIiwibGFzdF9uYW1lIjoidXNlciIsImlzX2FjdGl2ZSI6dHJ1ZSwiaXNfb3JnX2FkbWluIjp0cnVlLCJpc19pbnRlcm5hbCI6dHJ1ZSwibG9jYWxlIjoiZW4tVVMifSwiaW50ZXJuYWwiOnsib3JnX2lkIjoiMDAwMDAwIn19fQ==",
}
response = test.SendHTTPWithHeader(handler, "GET", "/api/worker/v1/status", ``, header)
assert.Equal(t, 200, response.StatusCode, "status mismatch")
}