fsjobqueue: pass accepted job types to New()

This makes the queue more type safe and allows to get rid of the
`pendingChannel` and `pendingChannels` helpers, which only existed to
create not-yet-existing pending channels.
This commit is contained in:
Lars Karlitski 2020-05-10 18:17:05 +02:00 committed by Tom Gundersen
parent 3240f11647
commit 8df143fabe
3 changed files with 33 additions and 43 deletions

View file

@ -125,7 +125,7 @@ func main() {
log.Fatalf("cannot create queue directory: %v", err) log.Fatalf("cannot create queue directory: %v", err)
} }
jobs, err := fsjobqueue.New(queueDir) jobs, err := fsjobqueue.New(queueDir, []string{"osbuild"})
if err != nil { if err != nil {
log.Fatalf("cannot create jobqueue: %v", err) log.Fatalf("cannot create jobqueue: %v", err)
} }

View file

@ -35,9 +35,7 @@ type fsJobQueue struct {
db *jsondb.JSONDatabase db *jsondb.JSONDatabase
// Maps job types to channels of job ids for that type. Only access // Maps job types to channels of job ids for that type.
// through pendingChannel(), which ensures that a map for the given job
// typ exists.
pending map[string]chan uuid.UUID pending map[string]chan uuid.UUID
// Maps job ids to the jobs that depend on it, if any of those // Maps job ids to the jobs that depend on it, if any of those
@ -63,13 +61,17 @@ type job struct {
// Create a new fsJobQueue object for `dir`. This object must have exclusive // Create a new fsJobQueue object for `dir`. This object must have exclusive
// access to `dir`. If `dir` contains jobs created from previous runs, they are // access to `dir`. If `dir` contains jobs created from previous runs, they are
// loaded and rescheduled to run if necessary. // loaded and rescheduled to run if necessary.
func New(dir string) (*fsJobQueue, error) { func New(dir string, acceptedJobTypes []string) (*fsJobQueue, error) {
q := &fsJobQueue{ q := &fsJobQueue{
db: jsondb.New(dir, 0600), db: jsondb.New(dir, 0600),
pending: make(map[string]chan uuid.UUID), pending: make(map[string]chan uuid.UUID),
dependants: make(map[uuid.UUID][]uuid.UUID), dependants: make(map[uuid.UUID][]uuid.UUID),
} }
for _, jt := range acceptedJobTypes {
q.pending[jt] = make(chan uuid.UUID, 100)
}
// Look for jobs that are still pending and build the dependant map. // Look for jobs that are still pending and build the dependant map.
ids, err := q.db.List() ids, err := q.db.List()
if err != nil { if err != nil {
@ -97,6 +99,10 @@ func (q *fsJobQueue) Enqueue(jobType string, args interface{}, dependencies []uu
q.mu.Lock() q.mu.Lock()
defer q.mu.Unlock() defer q.mu.Unlock()
if _, exists := q.pending[jobType]; !exists {
return uuid.Nil, fmt.Errorf("this queue does not accept job type '%s'", jobType)
}
var j = job{ var j = job{
Id: uuid.New(), Id: uuid.New(),
Type: jobType, Type: jobType,
@ -146,7 +152,14 @@ func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string, args interf
return uuid.Nil, err return uuid.Nil, err
} }
chans := q.pendingChannels(jobTypes) // Filter q.pending by the `jobTypes`. Ignore those job types that this
// queue doesn't accept.
chans := []chan uuid.UUID{}
for _, jt := range jobTypes {
if c, exists := q.pending[jt]; exists {
chans = append(chans, c)
}
}
// Unlock the mutex while polling channels, so that multiple goroutines // Unlock the mutex while polling channels, so that multiple goroutines
// can wait at the same time. // can wait at the same time.
@ -278,7 +291,11 @@ func (q *fsJobQueue) maybeEnqueue(j *job, updateDependants bool) error {
} }
if depsFinished { if depsFinished {
q.pendingChannel(j.Type) <- j.Id c, exists := q.pending[j.Type]
if !exists {
return fmt.Errorf("this queue doesn't accept job type '%s'", j.Type)
}
c <- j.Id
} else if updateDependants { } else if updateDependants {
for _, id := range j.Dependencies { for _, id := range j.Dependencies {
q.dependants[id] = append(q.dependants[id], j.Id) q.dependants[id] = append(q.dependants[id], j.Id)
@ -288,34 +305,6 @@ func (q *fsJobQueue) maybeEnqueue(j *job, updateDependants bool) error {
return nil return nil
} }
// Safe access to the pending channel for `jobType`. Channels are created on
// demand.
func (q *fsJobQueue) pendingChannel(jobType string) chan uuid.UUID {
c, exists := q.pending[jobType]
if !exists {
c = make(chan uuid.UUID, 100)
q.pending[jobType] = c
}
return c
}
// Same as pendingChannel(), but for multiple job types.
func (q *fsJobQueue) pendingChannels(jobTypes []string) []chan uuid.UUID {
chans := make([]chan uuid.UUID, len(jobTypes))
for i, jt := range jobTypes {
c, exists := q.pending[jt]
if !exists {
c = make(chan uuid.UUID, 100)
q.pending[jt] = c
}
chans[i] = c
}
return chans
}
// Sorts and removes duplicates from `ids`. // Sorts and removes duplicates from `ids`.
func uniqueUUIDList(ids []uuid.UUID) []uuid.UUID { func uniqueUUIDList(ids []uuid.UUID) []uuid.UUID {
s := map[uuid.UUID]bool{} s := map[uuid.UUID]bool{}

View file

@ -23,11 +23,11 @@ func cleanupTempDir(t *testing.T, dir string) {
require.NoError(t, err) require.NoError(t, err)
} }
func newTemporaryQueue(t *testing.T) (jobqueue.JobQueue, string) { func newTemporaryQueue(t *testing.T, jobTypes []string) (jobqueue.JobQueue, string) {
dir, err := ioutil.TempDir("", "jobqueue-test-") dir, err := ioutil.TempDir("", "jobqueue-test-")
require.NoError(t, err) require.NoError(t, err)
q, err := fsjobqueue.New(dir) q, err := fsjobqueue.New(dir, jobTypes)
require.NoError(t, err) require.NoError(t, err)
require.NotNil(t, q) require.NotNil(t, q)
@ -35,6 +35,7 @@ func newTemporaryQueue(t *testing.T) (jobqueue.JobQueue, string) {
} }
func pushTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, args interface{}, dependencies []uuid.UUID) uuid.UUID { func pushTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, args interface{}, dependencies []uuid.UUID) uuid.UUID {
t.Helper()
id, err := q.Enqueue(jobType, args, dependencies) id, err := q.Enqueue(jobType, args, dependencies)
require.NoError(t, err) require.NoError(t, err)
require.NotEmpty(t, id) require.NotEmpty(t, id)
@ -53,13 +54,13 @@ func finishNextTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, result
} }
func TestNonExistant(t *testing.T) { func TestNonExistant(t *testing.T) {
q, err := fsjobqueue.New("/non-existant-directory") q, err := fsjobqueue.New("/non-existant-directory", []string{})
require.Error(t, err) require.Error(t, err)
require.Nil(t, q) require.Nil(t, q)
} }
func TestErrors(t *testing.T) { func TestErrors(t *testing.T) {
q, dir := newTemporaryQueue(t) q, dir := newTemporaryQueue(t, []string{"test"})
defer cleanupTempDir(t, dir) defer cleanupTempDir(t, dir)
// not serializable to JSON // not serializable to JSON
@ -79,7 +80,7 @@ func TestArgs(t *testing.T) {
S string S string
} }
q, dir := newTemporaryQueue(t) q, dir := newTemporaryQueue(t, []string{"fish", "octopus"})
defer cleanupTempDir(t, dir) defer cleanupTempDir(t, dir)
oneargs := argument{7, "🐠"} oneargs := argument{7, "🐠"}
@ -101,7 +102,7 @@ func TestArgs(t *testing.T) {
} }
func TestJobTypes(t *testing.T) { func TestJobTypes(t *testing.T) {
q, dir := newTemporaryQueue(t) q, dir := newTemporaryQueue(t, []string{"octopus", "clownfish"})
defer cleanupTempDir(t, dir) defer cleanupTempDir(t, dir)
one := pushTestJob(t, q, "octopus", nil, nil) one := pushTestJob(t, q, "octopus", nil, nil)
@ -118,7 +119,7 @@ func TestJobTypes(t *testing.T) {
} }
func TestDependencies(t *testing.T) { func TestDependencies(t *testing.T) {
q, dir := newTemporaryQueue(t) q, dir := newTemporaryQueue(t, []string{"test"})
defer cleanupTempDir(t, dir) defer cleanupTempDir(t, dir)
t.Run("done-before-pushing-dependant", func(t *testing.T) { t.Run("done-before-pushing-dependant", func(t *testing.T) {
@ -175,7 +176,7 @@ func TestDependencies(t *testing.T) {
// Test that a job queue allows parallel access to multiple workers, mainly to // Test that a job queue allows parallel access to multiple workers, mainly to
// verify the quirky unlocking in Dequeue(). // verify the quirky unlocking in Dequeue().
func TestMultipleWorkers(t *testing.T) { func TestMultipleWorkers(t *testing.T) {
q, dir := newTemporaryQueue(t) q, dir := newTemporaryQueue(t, []string{"octopus", "clownfish"})
defer cleanupTempDir(t, dir) defer cleanupTempDir(t, dir)
done := make(chan struct{}) done := make(chan struct{})