fsjobqueue: use a single mutex to protect all fields

In addition, it ensures that all public functions operate on `db`
in transactions.
This commit is contained in:
Lars Karlitski 2020-05-10 12:13:58 +02:00 committed by Tom Gundersen
parent e03c1fff65
commit e599a95067
2 changed files with 64 additions and 21 deletions

View file

@ -27,19 +27,22 @@ import (
)
type fsJobQueue struct {
// Protects all fields of this struct. In particular, it ensures
// transactions on `db` are atomic. All public functions except
// JobStatus hold it while they're running. Dequeue() releases it
// briefly while waiting on pending channels.
mu sync.Mutex
db *jsondb.JSONDatabase
// Maps job types to channels of job ids for that type. Only access
// through pendingChannel() to ensure concurrent access is restricted
// by the mutex.
pending map[string]chan uuid.UUID
pendingMutex sync.Mutex
// through pendingChannel(), which ensures that a map for the given job
// typ exists.
pending map[string]chan uuid.UUID
// Maps job ids to the jobs that depend on it, if any of those
// dependants have not yet finished. Only acccess while holding the
// associated mutex.
dependants map[uuid.UUID][]uuid.UUID
dependantsMutex sync.Mutex
// dependants have not yet finished.
dependants map[uuid.UUID][]uuid.UUID
}
// On-disk job struct. Contains all necessary (but non-redundant) information
@ -96,8 +99,6 @@ func New(dir string) (*fsJobQueue, error) {
if n == len(j.Dependencies) {
q.pendingChannel(j.Type) <- j.Id
} else {
q.dependantsMutex.Lock()
defer q.dependantsMutex.Unlock()
for _, dep := range j.Dependencies {
q.dependants[dep] = append(q.dependants[dep], j.Id)
}
@ -108,6 +109,9 @@ func New(dir string) (*fsJobQueue, error) {
}
func (q *fsJobQueue) Enqueue(jobType string, args interface{}, dependencies []uuid.UUID) (uuid.UUID, error) {
q.mu.Lock()
defer q.mu.Unlock()
var j = job{
Id: uuid.New(),
Type: jobType,
@ -140,8 +144,6 @@ func (q *fsJobQueue) Enqueue(jobType string, args interface{}, dependencies []uu
if finished == len(j.Dependencies) {
q.pendingChannel(j.Type) <- j.Id
} else {
q.dependantsMutex.Lock()
defer q.dependantsMutex.Unlock()
for _, id := range j.Dependencies {
q.dependants[id] = append(q.dependants[id], j.Id)
}
@ -151,8 +153,22 @@ func (q *fsJobQueue) Enqueue(jobType string, args interface{}, dependencies []uu
}
func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string, args interface{}) (uuid.UUID, error) {
q.mu.Lock()
defer q.mu.Unlock()
// Return early if the context is already canceled.
if err := ctx.Err(); err != nil {
return uuid.Nil, err
}
chans := q.pendingChannels(jobTypes)
// Unlock the mutex while polling channels, so that multiple goroutines
// can wait at the same time.
q.mu.Unlock()
id, err := selectUUIDChannel(ctx, chans)
q.mu.Lock()
id, err := selectUUIDChannel(ctx, q.pendingChannels(jobTypes))
if err != nil {
return uuid.Nil, err
}
@ -178,6 +194,9 @@ func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string, args interf
}
func (q *fsJobQueue) FinishJob(id uuid.UUID, result interface{}) error {
q.mu.Lock()
defer q.mu.Unlock()
j, err := q.readJob(id)
if err != nil {
return err
@ -200,8 +219,6 @@ func (q *fsJobQueue) FinishJob(id uuid.UUID, result interface{}) error {
return fmt.Errorf("error writing job %s: %v", id, err)
}
q.dependantsMutex.Lock()
defer q.dependantsMutex.Unlock()
for _, depid := range q.dependants[id] {
dep, err := q.readJob(depid)
if err != nil {
@ -277,9 +294,6 @@ func (q *fsJobQueue) readJob(id uuid.UUID) (*job, error) {
// Safe access to the pending channel for `jobType`. Channels are created on
// demand.
func (q *fsJobQueue) pendingChannel(jobType string) chan uuid.UUID {
q.pendingMutex.Lock()
defer q.pendingMutex.Unlock()
c, exists := q.pending[jobType]
if !exists {
c = make(chan uuid.UUID, 100)
@ -291,9 +305,6 @@ func (q *fsJobQueue) pendingChannel(jobType string) chan uuid.UUID {
// Same as pendingChannel(), but for multiple job types.
func (q *fsJobQueue) pendingChannels(jobTypes []string) []chan uuid.UUID {
q.pendingMutex.Lock()
defer q.pendingMutex.Unlock()
chans := make([]chan uuid.UUID, len(jobTypes))
for i, jt := range jobTypes {

View file

@ -6,6 +6,7 @@ import (
"io/ioutil"
"os"
"testing"
"time"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
@ -170,3 +171,34 @@ func TestDependencies(t *testing.T) {
require.True(t, !finished.IsZero())
})
}
// Test that a job queue allows parallel access to multiple workers, mainly to
// verify the quirky unlocking in Dequeue().
func TestMultipleWorkers(t *testing.T) {
	q, dir := newTemporaryQueue(t)
	defer cleanupTempDir(t, dir)

	done := make(chan struct{})
	go func() {
		defer close(done)
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()
		id, err := q.Dequeue(ctx, []string{"octopus"}, &json.RawMessage{})
		// Do not use require.* here: require calls t.FailNow, which must
		// only be called from the goroutine running the test. Report
		// failures with t.Error instead, which is safe from any goroutine.
		if err != nil {
			t.Errorf("Dequeue in goroutine failed: %v", err)
			return
		}
		if id == uuid.Nil {
			t.Error("Dequeue in goroutine returned an empty job id")
		}
	}()

	// Increase the likelihood that the above goroutine was scheduled and
	// is waiting in Dequeue().
	time.Sleep(10 * time.Millisecond)

	// This call to Dequeue() should not block on the one in the goroutine.
	id := pushTestJob(t, q, "clownfish", nil, nil)
	r, err := q.Dequeue(context.Background(), []string{"clownfish"}, &json.RawMessage{})
	require.NoError(t, err)
	require.Equal(t, id, r)

	// Now wake up the Dequeue() in the goroutine and wait for it to finish.
	_ = pushTestJob(t, q, "octopus", nil, nil)
	<-done
}