diff --git a/internal/jobtracker/jobtracker.go b/internal/jobtracker/jobtracker.go index 5b19086..26b2d3e 100644 --- a/internal/jobtracker/jobtracker.go +++ b/internal/jobtracker/jobtracker.go @@ -32,6 +32,8 @@ func NewJobTracker() *JobTracker { } func (jt *JobTracker) manageQueue() { + defer close(jt.Queue) + defer close(jt.send) queue := list.New() for jt.HasWork() { if front := queue.Front(); front == nil { @@ -50,9 +52,6 @@ func (jt *JobTracker) manageQueue() { } } } - - close(jt.Queue) - close(jt.send) } func (jt *JobTracker) AddJob(job string) { @@ -68,7 +67,7 @@ func (jt *JobTracker) StartWork() { func (jt *JobTracker) Nap() { ratio := float64(atomic.AddInt32(&jt.naps, 1)) / float64(atomic.LoadInt32(&jt.activeWorkers)) n := utils.MaxInt(int(math.Ceil(ratio)), 1) - time.Sleep(time.Duration(n) * 10 * time.Millisecond) + time.Sleep(time.Duration(n) * 15 * time.Millisecond) } func (jt *JobTracker) EndWork() { @@ -78,12 +77,7 @@ func (jt *JobTracker) EndWork() { } func (jt *JobTracker) HasWork() bool { - if jt.stop { - return false - } - // TODO: didWork is a somewhat ugly workaround to ensure we dont exit before doing work at least once, - // this will however result in locking up if we create a JobTracker but never queue any jobs - hasWork := !jt.didWork || (atomic.LoadInt32(&jt.queuedJobs) > 0 && atomic.LoadInt32(&jt.activeWorkers) > 0) + hasWork := !jt.stop && (!jt.didWork || (atomic.LoadInt32(&jt.queuedJobs) > 0 && atomic.LoadInt32(&jt.activeWorkers) > 0)) if !hasWork { jt.cond.Broadcast() @@ -104,9 +98,10 @@ func (jt *JobTracker) QueuedJobs() int32 { } func (jt *JobTracker) Wait() { - jt.KillIfNoJobs() - jt.cond.L.Lock() - for jt.HasWork() { - jt.cond.Wait() + if !jt.KillIfNoJobs() { + jt.cond.L.Lock() + for jt.HasWork() { + jt.cond.Wait() + } } } diff --git a/internal/workers/findref.go b/internal/workers/findref.go index 0e83034..f6748a6 100644 --- a/internal/workers/findref.go +++ b/internal/workers/findref.go @@ -49,6 +49,7 @@ func findRefWork(c *fasthttp.Client, baseUrl, baseDir, path string, jt *jobtrack } else { checkedRefs[path] = true } + checkedRefsMutex.Unlock() targetFile := utils.Url(baseDir, path) if utils.Exists(targetFile) { diff --git a/pkg/goop/clone.go b/pkg/goop/clone.go index cf7e88b..35f89fe 100644 --- a/pkg/goop/clone.go +++ b/pkg/goop/clone.go @@ -324,6 +324,7 @@ func FetchGit(baseUrl, baseDir string) error { storage.IterEncodedObjects() }*/ + jt = jobtracker.NewJobTracker() log.Info().Str("base", baseUrl).Msg("fetching object") for obj := range objs { jt.AddJob(obj)