clean up JobTracker and make it more reliable

This commit is contained in:
maia tillie arson crimew 2021-10-21 00:55:18 +02:00
parent 1a084596d5
commit 9d7d6f58e7
2 changed files with 15 additions and 27 deletions

View File

@ -14,8 +14,7 @@ type JobTracker struct {
activeWorkers int32
queuedJobs int32
naps int32
didWork bool
stop bool
started bool
cond *sync.Cond
Queue chan string
send chan string
@ -73,37 +72,26 @@ func (jt *JobTracker) Nap() {
}
func (jt *JobTracker) EndWork() {
jt.didWork = true
atomic.AddInt32(&jt.activeWorkers, -1)
atomic.AddInt32(&jt.queuedJobs, -1)
}
func (jt *JobTracker) HasWork() bool {
hasWork := !jt.stop && (!jt.didWork || (atomic.LoadInt32(&jt.queuedJobs) > 0 && atomic.LoadInt32(&jt.activeWorkers) > 0))
hasWork := !jt.started || atomic.LoadInt32(&jt.queuedJobs) > 0 || atomic.LoadInt32(&jt.activeWorkers) > 0
if !hasWork {
jt.cond.Broadcast()
}
return hasWork
}
func (jt *JobTracker) KillIfNoJobs() bool {
if atomic.LoadInt32(&jt.queuedJobs) == 0 {
jt.stop = true
return true
}
return false
}
func (jt *JobTracker) QueuedJobs() int32 {
return atomic.LoadInt32(&jt.queuedJobs)
}
func (jt *JobTracker) Wait() {
if !jt.KillIfNoJobs() {
jt.cond.L.Lock()
for jt.HasWork() {
jt.cond.Wait()
}
func (jt *JobTracker) StartAndWait() {
jt.started = true
jt.cond.L.Lock()
for jt.HasWork() {
jt.cond.Wait()
}
}

View File

@ -180,7 +180,7 @@ func FetchGit(baseUrl, baseDir string) error {
for w := 1; w <= maxConcurrency; w++ {
go workers.RecursiveDownloadWorker(c, baseUrl, baseDir, jt)
}
jt.Wait()
jt.StartAndWait()
log.Info().Str("dir", baseDir).Msg("running git checkout .")
cmd := exec.Command("git", "checkout", ".")
@ -198,7 +198,7 @@ func FetchGit(baseUrl, baseDir string) error {
for w := 1; w <= concurrency; w++ {
go workers.DownloadWorker(c, baseUrl, baseDir, jt, false, false)
}
jt.Wait()
jt.StartAndWait()
jt = jobtracker.NewJobTracker()
log.Info().Str("base", baseUrl).Msg("finding refs")
@ -208,7 +208,7 @@ func FetchGit(baseUrl, baseDir string) error {
for w := 1; w <= maxConcurrency; w++ {
go workers.FindRefWorker(c, baseUrl, baseDir, jt)
}
jt.Wait()
jt.StartAndWait()
log.Info().Str("base", baseUrl).Msg("finding packs")
infoPacksPath := utils.Url(baseDir, ".git/objects/info/packs")
@ -227,7 +227,7 @@ func FetchGit(baseUrl, baseDir string) error {
for w := 1; w <= concurrency; w++ {
go workers.DownloadWorker(c, baseUrl, baseDir, jt, false, false)
}
jt.Wait()
jt.StartAndWait()
}
log.Info().Str("base", baseUrl).Msg("finding objects")
@ -363,7 +363,7 @@ func FetchGit(baseUrl, baseDir string) error {
for w := 1; w <= maxConcurrency; w++ {
go workers.FindObjectsWorker(c, baseUrl, baseDir, jt, storage)
}
jt.Wait()
jt.StartAndWait()
// TODO: does this even make sense???????
if !utils.Exists(baseDir) {
@ -395,7 +395,7 @@ func FetchGit(baseUrl, baseDir string) error {
for w := 1; w <= concurrency; w++ {
go workers.DownloadWorker(c, baseUrl, baseDir, jt, true, true)
}
jt.Wait()
jt.StartAndWait()
/*// Fetch files marked as missing in status
// TODO: why do we parse status AND decode index ???????
@ -448,7 +448,7 @@ func FetchGit(baseUrl, baseDir string) error {
for w := 1; w <= concurrency; w++ {
go workers.DownloadWorker(c, baseUrl, baseDir, jt, true, true)
}
jt.Wait()
jt.StartAndWait()
}
// TODO: turn into worker?
@ -510,7 +510,7 @@ func FetchGit(baseUrl, baseDir string) error {
for w := 1; w <= concurrency; w++ {
go workers.DownloadWorker(c, baseUrl, baseDir, jt, true, true)
}
jt.Wait()
jt.StartAndWait()
}
}
return nil