From 10e97aad664aefef472cf2c9065b3de45cb0429f Mon Sep 17 00:00:00 2001 From: maia tillie arson crimew Date: Wed, 20 Oct 2021 21:38:29 +0200 Subject: [PATCH] potentially fix deadlock issues with new queueing system --- internal/jobtracker/jobtracker.go | 19 ++++++++++++++++--- pkg/goop/clone.go | 1 + 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/internal/jobtracker/jobtracker.go b/internal/jobtracker/jobtracker.go index 78e6867..eb92e92 100644 --- a/internal/jobtracker/jobtracker.go +++ b/internal/jobtracker/jobtracker.go @@ -11,6 +11,7 @@ type JobTracker struct { activeWorkers int32 queuedJobs int32 didWork bool + stop bool cond *sync.Cond Queue chan string send chan string @@ -31,9 +32,6 @@ func NewJobTracker() *JobTracker { } func (jt *JobTracker) manageQueue() { - defer close(jt.Queue) - defer close(jt.send) - queue := list.New() for jt.HasWork() { if front := queue.Front(); front == nil { @@ -52,6 +50,9 @@ func (jt *JobTracker) manageQueue() { } } } + + close(jt.Queue) + close(jt.send) } func (jt *JobTracker) AddJob(job string) { @@ -71,6 +72,9 @@ func (jt *JobTracker) EndWork() { } func (jt *JobTracker) HasWork() bool { + if jt.stop { + return false + } // TODO: didWork is a somewhat ugly workaround to ensure we dont exit before doing work at least once, // this will however result in locking up if we create a JobTracker but never queue any jobs hasWork := !jt.didWork || (atomic.LoadInt32(&jt.queuedJobs) > 0 && atomic.LoadInt32(&jt.activeWorkers) > 0) @@ -81,11 +85,20 @@ func (jt *JobTracker) HasWork() bool { return hasWork } +func (jt *JobTracker) KillIfNoJobs() bool { + if atomic.LoadInt32(&jt.queuedJobs) == 0 { + jt.stop = true + return true + } + return false +} + func (jt *JobTracker) QueuedJobs() int32 { return atomic.LoadInt32(&jt.queuedJobs) } func (jt *JobTracker) Wait() { + jt.KillIfNoJobs() jt.cond.L.Lock() for jt.HasWork() { jt.cond.Wait() diff --git a/pkg/goop/clone.go b/pkg/goop/clone.go index 725fa1b..cf7e88b 100644 --- a/pkg/goop/clone.go +++ b/pkg/goop/clone.go @@ -169,6 +169,7 @@ func FetchGit(baseUrl, baseDir string) error { } jt.Wait() + jt = jobtracker.NewJobTracker() log.Info().Str("base", baseUrl).Msg("finding refs") for _, ref := range commonRefs { jt.AddJob(ref)