From e03c1fff653e75cedc2989598bc9c5308bc9a252 Mon Sep 17 00:00:00 2001 From: Lars Karlitski Date: Sun, 10 May 2020 11:11:58 +0200 Subject: [PATCH] fsjobqueue: factor reflect.Select out of Dequeue() Hopefully this make it easier to read. --- internal/jobqueue/fsjobqueue/fsjobqueue.go | 81 ++++++++++++++-------- 1 file changed, 54 insertions(+), 27 deletions(-) diff --git a/internal/jobqueue/fsjobqueue/fsjobqueue.go b/internal/jobqueue/fsjobqueue/fsjobqueue.go index eb8a11725..3715657d9 100644 --- a/internal/jobqueue/fsjobqueue/fsjobqueue.go +++ b/internal/jobqueue/fsjobqueue/fsjobqueue.go @@ -14,6 +14,7 @@ package fsjobqueue import ( "context" "encoding/json" + "errors" "fmt" "reflect" "sort" @@ -150,36 +151,12 @@ func (q *fsJobQueue) Enqueue(jobType string, args interface{}, dependencies []uu } func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string, args interface{}) (uuid.UUID, error) { - // Return early if the context is already canceled. - if err := ctx.Err(); err != nil { + + id, err := selectUUIDChannel(ctx, q.pendingChannels(jobTypes)) + if err != nil { return uuid.Nil, err } - // Use reflect.Select(), because the `select` statement cannot operate - // on an unknown amount of channels. - cases := []reflect.SelectCase{ - { - Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(ctx.Done()), - }, - } - for _, jt := range jobTypes { - cases = append(cases, reflect.SelectCase{ - Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(q.pendingChannel(jt)), - }) - } - - chosen, value, recvOK := reflect.Select(cases) - if chosen == 0 && !recvOK { - return uuid.Nil, ctx.Err() - } - // pending channels are never closed - if !recvOK { - panic("pending channel was closed unexpectedly") - } - id := value.Interface().(uuid.UUID) - j, err := q.readJob(id) if err != nil { return uuid.Nil, err @@ -312,6 +289,25 @@ func (q *fsJobQueue) pendingChannel(jobType string) chan uuid.UUID { return c } +// Same as pendingChannel(), but for multiple job types. +func (q *fsJobQueue) pendingChannels(jobTypes []string) []chan uuid.UUID { + q.pendingMutex.Lock() + defer q.pendingMutex.Unlock() + + chans := make([]chan uuid.UUID, len(jobTypes)) + + for i, jt := range jobTypes { + c, exists := q.pending[jt] + if !exists { + c = make(chan uuid.UUID, 100) + q.pending[jt] = c + } + chans[i] = c + } + + return chans +} + // Sorts and removes duplicates from `ids`. func uniqueUUIDList(ids []uuid.UUID) []uuid.UUID { s := map[uuid.UUID]bool{} @@ -335,3 +331,34 @@ func uniqueUUIDList(ids []uuid.UUID) []uuid.UUID { return l } + +// Select on a list of `chan uuid.UUID`s. Returns an error if one of the +// channels is closed. +// +// Uses reflect.Select(), because the `select` statement cannot operate on an +// unknown amount of channels. +func selectUUIDChannel(ctx context.Context, chans []chan uuid.UUID) (uuid.UUID, error) { + cases := []reflect.SelectCase{ + { + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(ctx.Done()), + }, + } + for _, c := range chans { + cases = append(cases, reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(c), + }) + } + + chosen, value, recvOK := reflect.Select(cases) + if !recvOK { + if chosen == 0 { + return uuid.Nil, ctx.Err() + } else { + return uuid.Nil, errors.New("channel was closed unexpectedly") + } + } + + return value.Interface().(uuid.UUID), nil +}