potentially fix deadlock issues with new queueing system

This commit is contained in:
maia tillie arson crimew 2021-10-20 21:38:29 +02:00
parent 7a8066e0d5
commit 10e97aad66
2 changed files with 17 additions and 3 deletions

View File

@ -11,6 +11,7 @@ type JobTracker struct {
activeWorkers int32 activeWorkers int32
queuedJobs int32 queuedJobs int32
didWork bool didWork bool
stop bool
cond *sync.Cond cond *sync.Cond
Queue chan string Queue chan string
send chan string send chan string
@ -31,9 +32,6 @@ func NewJobTracker() *JobTracker {
} }
func (jt *JobTracker) manageQueue() { func (jt *JobTracker) manageQueue() {
defer close(jt.Queue)
defer close(jt.send)
queue := list.New() queue := list.New()
for jt.HasWork() { for jt.HasWork() {
if front := queue.Front(); front == nil { if front := queue.Front(); front == nil {
@ -52,6 +50,9 @@ func (jt *JobTracker) manageQueue() {
} }
} }
} }
close(jt.Queue)
close(jt.send)
} }
func (jt *JobTracker) AddJob(job string) { func (jt *JobTracker) AddJob(job string) {
@ -71,6 +72,9 @@ func (jt *JobTracker) EndWork() {
} }
func (jt *JobTracker) HasWork() bool { func (jt *JobTracker) HasWork() bool {
if jt.stop {
return false
}
// TODO: didWork is a somewhat ugly workaround to ensure we dont exit before doing work at least once, // TODO: didWork is a somewhat ugly workaround to ensure we dont exit before doing work at least once,
// this will however result in locking up if we create a JobTracker but never queue any jobs // this will however result in locking up if we create a JobTracker but never queue any jobs
hasWork := !jt.didWork || (atomic.LoadInt32(&jt.queuedJobs) > 0 && atomic.LoadInt32(&jt.activeWorkers) > 0) hasWork := !jt.didWork || (atomic.LoadInt32(&jt.queuedJobs) > 0 && atomic.LoadInt32(&jt.activeWorkers) > 0)
@ -81,11 +85,20 @@ func (jt *JobTracker) HasWork() bool {
return hasWork return hasWork
} }
func (jt *JobTracker) KillIfNoJobs() bool {
if atomic.LoadInt32(&jt.queuedJobs) == 0 {
jt.stop = true
return true
}
return false
}
func (jt *JobTracker) QueuedJobs() int32 { func (jt *JobTracker) QueuedJobs() int32 {
return atomic.LoadInt32(&jt.queuedJobs) return atomic.LoadInt32(&jt.queuedJobs)
} }
func (jt *JobTracker) Wait() { func (jt *JobTracker) Wait() {
jt.KillIfNoJobs()
jt.cond.L.Lock() jt.cond.L.Lock()
for jt.HasWork() { for jt.HasWork() {
jt.cond.Wait() jt.cond.Wait()

View File

@ -169,6 +169,7 @@ func FetchGit(baseUrl, baseDir string) error {
} }
jt.Wait() jt.Wait()
jt = jobtracker.NewJobTracker()
log.Info().Str("base", baseUrl).Msg("finding refs") log.Info().Str("base", baseUrl).Msg("finding refs")
for _, ref := range commonRefs { for _, ref := range commonRefs {
jt.AddJob(ref) jt.AddJob(ref)