jobqueue: Better logging

Use appropriate logging levels for different statements
Log important jobqueue events with relevant information
This commit is contained in:
Diaa Sami 2021-09-22 16:22:22 +02:00 committed by Tom Gundersen
parent 97fe226c8a
commit 751ef84fc1

View file

@ -10,7 +10,6 @@ import (
"encoding/json"
"errors"
"fmt"
"log"
"time"
"github.com/google/uuid"
@ -20,6 +19,7 @@ import (
"github.com/jackc/pgx/v4/pgxpool"
"github.com/osbuild/osbuild-composer/internal/jobqueue"
logrus "github.com/sirupsen/logrus"
)
const (
@ -117,7 +117,7 @@ func (q *dbJobQueue) Enqueue(jobType string, args interface{}, dependencies []uu
defer func() {
err := tx.Rollback(context.Background())
if err != nil && !errors.As(err, &pgx.ErrTxClosed) {
log.Println("error rolling back enqueue transaction: ", err)
logrus.Error("error rolling back enqueue transaction: ", err)
}
}()
@ -144,6 +144,8 @@ func (q *dbJobQueue) Enqueue(jobType string, args interface{}, dependencies []uu
return uuid.Nil, fmt.Errorf("unable to commit database transaction: %v", err)
}
logrus.Infof("Enqueued job of type %s with ID %s", jobType, id)
return id, nil
}
@ -160,7 +162,7 @@ func (q *dbJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID,
defer func() {
_, err := conn.Exec(ctx, sqlUnlisten)
if err != nil {
log.Println("Error unlistening for jobs in dequeue: ", err)
logrus.Error("Error unlistening for jobs in dequeue: ", err)
}
conn.Release()
}()
@ -202,6 +204,8 @@ func (q *dbJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID,
return uuid.Nil, uuid.Nil, nil, "", nil, fmt.Errorf("error querying the job's dependencies: %v", err)
}
logrus.Infof("Dequeued job of type %v with ID %s", jobType, id)
return id, token, dependencies, jobType, args, nil
}
@ -219,7 +223,7 @@ func (q *dbJobQueue) FinishJob(id uuid.UUID, result interface{}) error {
defer func() {
err = tx.Rollback(context.Background())
if err != nil && !errors.As(err, &pgx.ErrTxClosed) {
log.Println("error rolling back finish job transaction: ", err)
logrus.Error("error rolling back finish job transaction: ", err)
}
}()
@ -267,6 +271,8 @@ func (q *dbJobQueue) FinishJob(id uuid.UUID, result interface{}) error {
return fmt.Errorf("unable to commit database transaction: %v", err)
}
logrus.Infof("Finished job with ID %s", id)
return nil
}
@ -286,6 +292,8 @@ func (q *dbJobQueue) CancelJob(id uuid.UUID) error {
return jobqueue.ErrNotRunning
}
logrus.Infof("Cancelled job with ID %s", id)
return nil
}
@ -377,13 +385,13 @@ func (q *dbJobQueue) Heartbeats(olderThan time.Duration) (tokens []uuid.UUID) {
err = rows.Scan(&t)
if err != nil {
// Log the error and try to continue with the next row
log.Println("Unable to read token from heartbeats: ", err)
logrus.Error("Unable to read token from heartbeats: ", err)
continue
}
tokens = append(tokens, t)
}
if rows.Err() != nil {
log.Println("Error reading tokens from heartbeats: ", rows.Err())
logrus.Error("Error reading tokens from heartbeats: ", rows.Err())
}
return
@ -399,10 +407,10 @@ func (q *dbJobQueue) RefreshHeartbeat(token uuid.UUID) {
tag, err := conn.Exec(context.Background(), sqlRefreshHeartbeat, token)
if err != nil {
log.Println("Error refreshing heartbeat: ", err)
logrus.Error("Error refreshing heartbeat: ", err)
}
if tag.RowsAffected() != 1 {
log.Println("No rows affected when refreshing heartbeat for ", token)
logrus.Error("No rows affected when refreshing heartbeat for ", token)
}
}