koji: fix excessive logging & monitoring
update koji upload to use custom leveled logging, this only affects logging since uploading uses a different connection to send the chunks, it is done separately in this commit
This commit is contained in:
parent
ed5cd56c5a
commit
7c4d74481a
1 changed files with 36 additions and 41 deletions
|
|
@ -295,14 +295,14 @@ func (k *Koji) CGImport(build ImageBuild, buildRoots []BuildRoot, images []Image
|
|||
}
|
||||
|
||||
// uploadChunk uploads a byte slice to a given filepath/filname at a given offset
|
||||
func (k *Koji) uploadChunk(chunk []byte, filepath, filename string, offset uint64) (uint, error) {
|
||||
func (k *Koji) uploadChunk(chunk []byte, filepath, filename string, offset uint64) error {
|
||||
// We have to open-code a bastardized version of XML-RPC: We send an octet-stream, as
|
||||
// if it was an RPC call, and get a regular XML-RPC reply back. In addition to the
|
||||
// standard URL parameters, we also have to pass any other parameters as part of the
|
||||
// URL, as the body can only contain the payload.
|
||||
u, err := url.Parse(k.server)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
return err
|
||||
}
|
||||
q := u.Query()
|
||||
q.Add("filepath", filepath)
|
||||
|
|
@ -311,49 +311,23 @@ func (k *Koji) uploadChunk(chunk []byte, filepath, filename string, offset uint6
|
|||
q.Add("fileverify", "adler32")
|
||||
u.RawQuery = q.Encode()
|
||||
|
||||
retries := uint(0)
|
||||
|
||||
countingCheckRetry := func(ctx context.Context, resp *http.Response, err error) (bool, error) {
|
||||
shouldRetry, retErr := rh.DefaultRetryPolicy(ctx, resp, err)
|
||||
|
||||
// DefaultRetryPolicy denies retrying for any certificate related error.
|
||||
// Override it in case the error is a timeout.
|
||||
if !shouldRetry && err != nil {
|
||||
if v, ok := err.(*url.Error); ok {
|
||||
if _, ok := v.Err.(x509.UnknownAuthorityError); ok {
|
||||
// retry if it's a timeout
|
||||
shouldRetry = strings.Contains(strings.ToLower(v.Error()), "timeout")
|
||||
retErr = v
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if shouldRetry {
|
||||
retries++
|
||||
}
|
||||
|
||||
return shouldRetry, retErr
|
||||
}
|
||||
|
||||
client := rh.NewClient()
|
||||
client := createCustomRetryableClient()
|
||||
|
||||
client.HTTPClient = &http.Client{
|
||||
Transport: k.transport,
|
||||
}
|
||||
client.CheckRetry = countingCheckRetry
|
||||
client.Logger = rh.LeveledLogger(&LeveledLogrus{logrus.StandardLogger()})
|
||||
|
||||
respData, err := client.Post(u.String(), "application/octet-stream", bytes.NewBuffer(chunk))
|
||||
|
||||
if err != nil {
|
||||
return retries, err
|
||||
return err
|
||||
}
|
||||
|
||||
defer respData.Body.Close()
|
||||
|
||||
body, err := ioutil.ReadAll(respData.Body)
|
||||
if err != nil {
|
||||
return retries, err
|
||||
return err
|
||||
}
|
||||
|
||||
var reply struct {
|
||||
|
|
@ -364,31 +338,30 @@ func (k *Koji) uploadChunk(chunk []byte, filepath, filename string, offset uint6
|
|||
resp := xmlrpc.Response(body)
|
||||
|
||||
if resp.Err() != nil {
|
||||
return retries, fmt.Errorf("xmlrpc server returned an error: %v", resp.Err())
|
||||
return fmt.Errorf("xmlrpc server returned an error: %v", resp.Err())
|
||||
}
|
||||
|
||||
err = resp.Unmarshal(&reply)
|
||||
if err != nil {
|
||||
return retries, fmt.Errorf("cannot unmarshal the xmlrpc response: %v", err)
|
||||
return fmt.Errorf("cannot unmarshal the xmlrpc response: %v", err)
|
||||
}
|
||||
|
||||
if reply.Size != len(chunk) {
|
||||
return retries, fmt.Errorf("Sent a chunk of %d bytes, but server got %d bytes", len(chunk), reply.Size)
|
||||
return fmt.Errorf("Sent a chunk of %d bytes, but server got %d bytes", len(chunk), reply.Size)
|
||||
}
|
||||
|
||||
digest := fmt.Sprintf("%08x", adler32.Checksum(chunk))
|
||||
if reply.HexDigest != digest {
|
||||
return retries, fmt.Errorf("Sent a chunk with Adler32 digest %s, but server computed digest %s", digest, reply.HexDigest)
|
||||
return fmt.Errorf("Sent a chunk with Adler32 digest %s, but server computed digest %s", digest, reply.HexDigest)
|
||||
}
|
||||
|
||||
return retries, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
// Upload uploads file to the temporary filepath on the kojiserver under the name filename
|
||||
// The md5sum and size of the file is returned on success.
|
||||
func (k *Koji) Upload(file io.Reader, filepath, filename string) (string, uint64, error) {
|
||||
chunk := make([]byte, 1024*1024) // upload a megabyte at a time
|
||||
retries := uint(0)
|
||||
offset := uint64(0)
|
||||
// Koji uses MD5 hashes
|
||||
/* #nosec G401 */
|
||||
|
|
@ -401,12 +374,11 @@ func (k *Koji) Upload(file io.Reader, filepath, filename string) (string, uint64
|
|||
}
|
||||
return "", 0, err
|
||||
}
|
||||
r, err := k.uploadChunk(chunk[:n], filepath, filename, offset)
|
||||
retries += r
|
||||
err = k.uploadChunk(chunk[:n], filepath, filename, offset)
|
||||
if err != nil {
|
||||
logrus.Infof("Koji upload failed after %d retries", retries)
|
||||
return "", 0, err
|
||||
}
|
||||
|
||||
offset += uint64(n)
|
||||
|
||||
m, err := hash.Write(chunk[:n])
|
||||
|
|
@ -417,7 +389,6 @@ func (k *Koji) Upload(file io.Reader, filepath, filename string) (string, uint64
|
|||
return "", 0, fmt.Errorf("sent %d bytes, but hashed %d", n, m)
|
||||
}
|
||||
}
|
||||
logrus.Infof("Koji upload successful after %d retries", retries)
|
||||
return fmt.Sprintf("%x", hash.Sum(nil)), offset, nil
|
||||
}
|
||||
|
||||
|
|
@ -492,3 +463,27 @@ func CreateKojiTransport(relaxTimeout uint) http.RoundTripper {
|
|||
|
||||
return rt
|
||||
}
|
||||
|
||||
func customCheckRetry(ctx context.Context, resp *http.Response, err error) (bool, error) {
|
||||
shouldRetry, retErr := rh.DefaultRetryPolicy(ctx, resp, err)
|
||||
|
||||
// DefaultRetryPolicy denies retrying for any certificate related error.
|
||||
// Override it in case the error is a timeout.
|
||||
if !shouldRetry && err != nil {
|
||||
if v, ok := err.(*url.Error); ok {
|
||||
if _, ok := v.Err.(x509.UnknownAuthorityError); ok {
|
||||
// retry if it's a timeout
|
||||
return strings.Contains(strings.ToLower(v.Error()), "timeout"), v
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return shouldRetry, retErr
|
||||
}
|
||||
|
||||
func createCustomRetryableClient() *rh.Client {
|
||||
client := rh.NewClient()
|
||||
client.Logger = rh.LeveledLogger(&LeveledLogrus{logrus.StandardLogger()})
|
||||
client.CheckRetry = customCheckRetry
|
||||
return client
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue