mirror of https://github.com/nyancrimew/goop.git
fix remaining deadlocks due to oversight + clean up
This commit is contained in:
parent
574b161773
commit
5c05777482
|
@ -32,6 +32,8 @@ func NewJobTracker() *JobTracker {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (jt *JobTracker) manageQueue() {
|
func (jt *JobTracker) manageQueue() {
|
||||||
|
defer close(jt.Queue)
|
||||||
|
defer close(jt.send)
|
||||||
queue := list.New()
|
queue := list.New()
|
||||||
for jt.HasWork() {
|
for jt.HasWork() {
|
||||||
if front := queue.Front(); front == nil {
|
if front := queue.Front(); front == nil {
|
||||||
|
@ -50,9 +52,6 @@ func (jt *JobTracker) manageQueue() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
close(jt.Queue)
|
|
||||||
close(jt.send)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (jt *JobTracker) AddJob(job string) {
|
func (jt *JobTracker) AddJob(job string) {
|
||||||
|
@ -68,7 +67,7 @@ func (jt *JobTracker) StartWork() {
|
||||||
func (jt *JobTracker) Nap() {
|
func (jt *JobTracker) Nap() {
|
||||||
ratio := float64(atomic.AddInt32(&jt.naps, 1)) / float64(atomic.LoadInt32(&jt.activeWorkers))
|
ratio := float64(atomic.AddInt32(&jt.naps, 1)) / float64(atomic.LoadInt32(&jt.activeWorkers))
|
||||||
n := utils.MaxInt(int(math.Ceil(ratio)), 1)
|
n := utils.MaxInt(int(math.Ceil(ratio)), 1)
|
||||||
time.Sleep(time.Duration(n) * 10 * time.Millisecond)
|
time.Sleep(time.Duration(n) * 15 * time.Millisecond)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (jt *JobTracker) EndWork() {
|
func (jt *JobTracker) EndWork() {
|
||||||
|
@ -78,12 +77,7 @@ func (jt *JobTracker) EndWork() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (jt *JobTracker) HasWork() bool {
|
func (jt *JobTracker) HasWork() bool {
|
||||||
if jt.stop {
|
hasWork := !jt.stop && (!jt.didWork || (atomic.LoadInt32(&jt.queuedJobs) > 0 && atomic.LoadInt32(&jt.activeWorkers) > 0))
|
||||||
return false
|
|
||||||
}
|
|
||||||
// TODO: didWork is a somewhat ugly workaround to ensure we dont exit before doing work at least once,
|
|
||||||
// this will however result in locking up if we create a JobTracker but never queue any jobs
|
|
||||||
hasWork := !jt.didWork || (atomic.LoadInt32(&jt.queuedJobs) > 0 && atomic.LoadInt32(&jt.activeWorkers) > 0)
|
|
||||||
|
|
||||||
if !hasWork {
|
if !hasWork {
|
||||||
jt.cond.Broadcast()
|
jt.cond.Broadcast()
|
||||||
|
@ -104,9 +98,10 @@ func (jt *JobTracker) QueuedJobs() int32 {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (jt *JobTracker) Wait() {
|
func (jt *JobTracker) Wait() {
|
||||||
jt.KillIfNoJobs()
|
if !jt.KillIfNoJobs() {
|
||||||
jt.cond.L.Lock()
|
jt.cond.L.Lock()
|
||||||
for jt.HasWork() {
|
for jt.HasWork() {
|
||||||
jt.cond.Wait()
|
jt.cond.Wait()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -49,6 +49,7 @@ func findRefWork(c *fasthttp.Client, baseUrl, baseDir, path string, jt *jobtrack
|
||||||
} else {
|
} else {
|
||||||
checkedRefs[path] = true
|
checkedRefs[path] = true
|
||||||
}
|
}
|
||||||
|
checkedRefsMutex.Unlock()
|
||||||
|
|
||||||
targetFile := utils.Url(baseDir, path)
|
targetFile := utils.Url(baseDir, path)
|
||||||
if utils.Exists(targetFile) {
|
if utils.Exists(targetFile) {
|
||||||
|
|
|
@ -324,6 +324,7 @@ func FetchGit(baseUrl, baseDir string) error {
|
||||||
storage.IterEncodedObjects()
|
storage.IterEncodedObjects()
|
||||||
}*/
|
}*/
|
||||||
|
|
||||||
|
jt = jobtracker.NewJobTracker()
|
||||||
log.Info().Str("base", baseUrl).Msg("fetching object")
|
log.Info().Str("base", baseUrl).Msg("fetching object")
|
||||||
for obj := range objs {
|
for obj := range objs {
|
||||||
jt.AddJob(obj)
|
jt.AddJob(obj)
|
||||||
|
|
Loading…
Reference in New Issue