fsjobqueue: factor reflect.Select out of Dequeue()
Hopefully this make it easier to read.
This commit is contained in:
parent
6773c01722
commit
e03c1fff65
1 changed files with 54 additions and 27 deletions
|
|
@ -14,6 +14,7 @@ package fsjobqueue
|
|||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"sort"
|
||||
|
|
@ -150,36 +151,12 @@ func (q *fsJobQueue) Enqueue(jobType string, args interface{}, dependencies []uu
|
|||
}
|
||||
|
||||
func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string, args interface{}) (uuid.UUID, error) {
|
||||
// Return early if the context is already canceled.
|
||||
if err := ctx.Err(); err != nil {
|
||||
|
||||
id, err := selectUUIDChannel(ctx, q.pendingChannels(jobTypes))
|
||||
if err != nil {
|
||||
return uuid.Nil, err
|
||||
}
|
||||
|
||||
// Use reflect.Select(), because the `select` statement cannot operate
|
||||
// on an unknown amount of channels.
|
||||
cases := []reflect.SelectCase{
|
||||
{
|
||||
Dir: reflect.SelectRecv,
|
||||
Chan: reflect.ValueOf(ctx.Done()),
|
||||
},
|
||||
}
|
||||
for _, jt := range jobTypes {
|
||||
cases = append(cases, reflect.SelectCase{
|
||||
Dir: reflect.SelectRecv,
|
||||
Chan: reflect.ValueOf(q.pendingChannel(jt)),
|
||||
})
|
||||
}
|
||||
|
||||
chosen, value, recvOK := reflect.Select(cases)
|
||||
if chosen == 0 && !recvOK {
|
||||
return uuid.Nil, ctx.Err()
|
||||
}
|
||||
// pending channels are never closed
|
||||
if !recvOK {
|
||||
panic("pending channel was closed unexpectedly")
|
||||
}
|
||||
id := value.Interface().(uuid.UUID)
|
||||
|
||||
j, err := q.readJob(id)
|
||||
if err != nil {
|
||||
return uuid.Nil, err
|
||||
|
|
@ -312,6 +289,25 @@ func (q *fsJobQueue) pendingChannel(jobType string) chan uuid.UUID {
|
|||
return c
|
||||
}
|
||||
|
||||
// Same as pendingChannel(), but for multiple job types.
|
||||
func (q *fsJobQueue) pendingChannels(jobTypes []string) []chan uuid.UUID {
|
||||
q.pendingMutex.Lock()
|
||||
defer q.pendingMutex.Unlock()
|
||||
|
||||
chans := make([]chan uuid.UUID, len(jobTypes))
|
||||
|
||||
for i, jt := range jobTypes {
|
||||
c, exists := q.pending[jt]
|
||||
if !exists {
|
||||
c = make(chan uuid.UUID, 100)
|
||||
q.pending[jt] = c
|
||||
}
|
||||
chans[i] = c
|
||||
}
|
||||
|
||||
return chans
|
||||
}
|
||||
|
||||
// Sorts and removes duplicates from `ids`.
|
||||
func uniqueUUIDList(ids []uuid.UUID) []uuid.UUID {
|
||||
s := map[uuid.UUID]bool{}
|
||||
|
|
@ -335,3 +331,34 @@ func uniqueUUIDList(ids []uuid.UUID) []uuid.UUID {
|
|||
|
||||
return l
|
||||
}
|
||||
|
||||
// Select on a list of `chan uuid.UUID`s. Returns an error if one of the
|
||||
// channels is closed.
|
||||
//
|
||||
// Uses reflect.Select(), because the `select` statement cannot operate on an
|
||||
// unknown amount of channels.
|
||||
func selectUUIDChannel(ctx context.Context, chans []chan uuid.UUID) (uuid.UUID, error) {
|
||||
cases := []reflect.SelectCase{
|
||||
{
|
||||
Dir: reflect.SelectRecv,
|
||||
Chan: reflect.ValueOf(ctx.Done()),
|
||||
},
|
||||
}
|
||||
for _, c := range chans {
|
||||
cases = append(cases, reflect.SelectCase{
|
||||
Dir: reflect.SelectRecv,
|
||||
Chan: reflect.ValueOf(c),
|
||||
})
|
||||
}
|
||||
|
||||
chosen, value, recvOK := reflect.Select(cases)
|
||||
if !recvOK {
|
||||
if chosen == 0 {
|
||||
return uuid.Nil, ctx.Err()
|
||||
} else {
|
||||
return uuid.Nil, errors.New("channel was closed unexpectedly")
|
||||
}
|
||||
}
|
||||
|
||||
return value.Interface().(uuid.UUID), nil
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue