worker: add proxy support to composer and oauth calls

In the internal deployment, we want to talk with composer over a http/https
proxy. This proxy adds new composer.proxy field to the worker config that
causes the worker to connect to composer and the oauth server using
a specified proxy.

NB: The proxy is not supported when connection to composer via unix sockets.

For testing this, I added a small HTTP proxy implementation, pls don't
use this in production, it's just good enough for tests.

Signed-off-by: Ondřej Budai <ondrej@budai.cz>
This commit is contained in:
Ondřej Budai 2022-04-21 14:00:33 +02:00 committed by Tom Gundersen
parent 9ee3997428
commit 6fce34a5ea
4 changed files with 149 additions and 0 deletions

View file

@ -123,6 +123,9 @@ func RequestAndRunJob(client *worker.Client, acceptedJobTypes []string, jobImpls
func main() {
var config struct {
Composer *struct {
Proxy string `toml:"proxy"`
}
Koji map[string]struct {
Kerberos *struct {
Principal string `toml:"principal"`
@ -240,6 +243,11 @@ func main() {
clientSecret = strings.TrimSpace(string(cs))
}
proxy := ""
if config.Composer != nil && config.Composer.Proxy != "" {
proxy = config.Composer.Proxy
}
client, err = worker.NewClient(worker.ClientConfig{
BaseURL: fmt.Sprintf("https://%s", address),
TlsConfig: conf,
@ -248,6 +256,7 @@ func main() {
ClientId: config.Authentication.ClientId,
ClientSecret: clientSecret,
BasePath: config.BasePath,
ProxyURL: proxy,
})
if err != nil {
logrus.Fatalf("Error creating worker client: %v", err)
@ -266,10 +275,16 @@ func main() {
}
}
proxy := ""
if config.Composer != nil && config.Composer.Proxy != "" {
proxy = config.Composer.Proxy
}
client, err = worker.NewClient(worker.ClientConfig{
BaseURL: fmt.Sprintf("https://%s", address),
TlsConfig: conf,
BasePath: config.BasePath,
ProxyURL: proxy,
})
if err != nil {
logrus.Fatalf("Error creating worker client: %v", err)

View file

@ -38,6 +38,7 @@ type ClientConfig struct {
ClientId string
ClientSecret string
BasePath string
ProxyURL string
}
type Job interface {
@ -91,6 +92,17 @@ func NewClient(conf ClientConfig) (*Client, error) {
requester := &http.Client{}
transport := http.DefaultTransport.(*http.Transport).Clone()
if conf.ProxyURL != "" {
proxyURL, err := url.Parse(conf.ProxyURL)
if err != nil {
return nil, err
}
transport.Proxy = func(request *http.Request) (*url.URL, error) {
return proxyURL, nil
}
}
if conf.TlsConfig != nil {
transport.TLSClientConfig = conf.TlsConfig
}

View file

@ -100,3 +100,39 @@ func TestOAuth(t *testing.T) {
require.False(t, c)
require.NoError(t, err)
}
func TestProxy(t *testing.T) {
workerURL, oauthURL, offlineToken := newTestWorkerServer(t)
// initialize a test proxy server
proxy := &proxy{}
proxySrv := httptest.NewServer(proxy)
t.Cleanup(proxySrv.Close)
client, err := worker.NewClient(worker.ClientConfig{
BaseURL: workerURL,
TlsConfig: nil,
ClientId: "rhsm-api",
OfflineToken: offlineToken,
OAuthURL: oauthURL,
BasePath: "/api/image-builder-worker/v1",
ProxyURL: proxySrv.URL,
})
require.NoError(t, err)
job, err := client.RequestJob([]string{"osbuild"}, "arch")
require.NoError(t, err)
r := strings.NewReader("artifact contents")
require.NoError(t, job.UploadArtifact("some-artifact", r))
c, err := job.Canceled()
require.False(t, c)
require.NoError(t, err)
// we expect 5 calls to go through the proxy:
// - request job (fails, no oauth token)
// - oauth call
// - request job (succeeds)
// - upload artifact
// - cancel
require.Equal(t, 5, proxy.calls)
}

View file

@ -0,0 +1,86 @@
package worker_test
import (
"io"
"net"
"net/http"
"strings"
)
// Hop-by-hop headers. These are removed when sent to the backend.
// http://www.w3.org/Protocols/rfc2616/rfc2616-sec13.html
var hopHeaders = []string{
"Connection",
"Keep-Alive",
"Proxy-Authenticate",
"Proxy-Authorization",
"Te", // canonicalized version of "TE"
"Trailers",
"Transfer-Encoding",
"Upgrade",
}
func copyHeader(dst, src http.Header) {
for k, vv := range src {
for _, v := range vv {
dst.Add(k, v)
}
}
}
func delHopHeaders(header http.Header) {
for _, h := range hopHeaders {
header.Del(h)
}
}
func appendHostToXForwardHeader(header http.Header, host string) {
// If we aren't the first proxy retain prior
// X-Forwarded-For information as a comma+space
// separated list and fold multiple headers into one.
if prior, ok := header["X-Forwarded-For"]; ok {
host = strings.Join(prior, ", ") + ", " + host
}
header.Set("X-Forwarded-For", host)
}
// proxy is a simple http-only proxy implementation.
// Don't use it in production. Also don't use it for https.
type proxy struct {
// number of calls that were made through the proxy
calls int
}
func (p *proxy) ServeHTTP(wr http.ResponseWriter, req *http.Request) {
p.calls++
if req.URL.Scheme != "http" && req.URL.Scheme != "https" {
msg := "unsupported protocol scheme " + req.URL.Scheme
http.Error(wr, msg, http.StatusBadRequest)
return
}
client := &http.Client{}
//http: Request.RequestURI can't be set in client requests.
//http://golang.org/src/pkg/net/http/client.go
req.RequestURI = ""
delHopHeaders(req.Header)
if clientIP, _, err := net.SplitHostPort(req.RemoteAddr); err == nil {
appendHostToXForwardHeader(req.Header, clientIP)
}
resp, err := client.Do(req)
if err != nil {
http.Error(wr, "Server Error", http.StatusInternalServerError)
}
defer resp.Body.Close()
delHopHeaders(resp.Header)
copyHeader(wr.Header(), resp.Header)
wr.WriteHeader(resp.StatusCode)
_, _ = io.Copy(wr, resp.Body)
}