diff --git a/internal/jobtracker/jobtracker.go b/internal/jobtracker/jobtracker.go index 23d3066..21df13b 100644 --- a/internal/jobtracker/jobtracker.go +++ b/internal/jobtracker/jobtracker.go @@ -14,8 +14,7 @@ type JobTracker struct { activeWorkers int32 queuedJobs int32 naps int32 - didWork bool - stop bool + started bool cond *sync.Cond Queue chan string send chan string @@ -73,37 +72,26 @@ func (jt *JobTracker) Nap() { } func (jt *JobTracker) EndWork() { - jt.didWork = true atomic.AddInt32(&jt.activeWorkers, -1) atomic.AddInt32(&jt.queuedJobs, -1) } func (jt *JobTracker) HasWork() bool { - hasWork := !jt.stop && (!jt.didWork || (atomic.LoadInt32(&jt.queuedJobs) > 0 && atomic.LoadInt32(&jt.activeWorkers) > 0)) - + hasWork := !jt.started || atomic.LoadInt32(&jt.queuedJobs) > 0 || atomic.LoadInt32(&jt.activeWorkers) > 0 if !hasWork { jt.cond.Broadcast() } return hasWork } -func (jt *JobTracker) KillIfNoJobs() bool { - if atomic.LoadInt32(&jt.queuedJobs) == 0 { - jt.stop = true - return true - } - return false -} - func (jt *JobTracker) QueuedJobs() int32 { return atomic.LoadInt32(&jt.queuedJobs) } -func (jt *JobTracker) Wait() { - if !jt.KillIfNoJobs() { - jt.cond.L.Lock() - for jt.HasWork() { - jt.cond.Wait() - } +func (jt *JobTracker) StartAndWait() { + jt.started = true + jt.cond.L.Lock() + for jt.HasWork() { + jt.cond.Wait() } } diff --git a/pkg/goop/clone.go b/pkg/goop/clone.go index 9adbf04..b8feb7c 100644 --- a/pkg/goop/clone.go +++ b/pkg/goop/clone.go @@ -180,7 +180,7 @@ func FetchGit(baseUrl, baseDir string) error { for w := 1; w <= maxConcurrency; w++ { go workers.RecursiveDownloadWorker(c, baseUrl, baseDir, jt) } - jt.Wait() + jt.StartAndWait() log.Info().Str("dir", baseDir).Msg("running git checkout .") cmd := exec.Command("git", "checkout", ".") @@ -198,7 +198,7 @@ func FetchGit(baseUrl, baseDir string) error { for w := 1; w <= concurrency; w++ { go workers.DownloadWorker(c, baseUrl, baseDir, jt, false, false) } - jt.Wait() + jt.StartAndWait() jt = jobtracker.NewJobTracker() log.Info().Str("base", baseUrl).Msg("finding refs") @@ -208,7 +208,7 @@ func FetchGit(baseUrl, baseDir string) error { for w := 1; w <= maxConcurrency; w++ { go workers.FindRefWorker(c, baseUrl, baseDir, jt) } - jt.Wait() + jt.StartAndWait() log.Info().Str("base", baseUrl).Msg("finding packs") infoPacksPath := utils.Url(baseDir, ".git/objects/info/packs") @@ -227,7 +227,7 @@ func FetchGit(baseUrl, baseDir string) error { for w := 1; w <= concurrency; w++ { go workers.DownloadWorker(c, baseUrl, baseDir, jt, false, false) } - jt.Wait() + jt.StartAndWait() } log.Info().Str("base", baseUrl).Msg("finding objects") @@ -363,7 +363,7 @@ func FetchGit(baseUrl, baseDir string) error { for w := 1; w <= maxConcurrency; w++ { go workers.FindObjectsWorker(c, baseUrl, baseDir, jt, storage) } - jt.Wait() + jt.StartAndWait() // TODO: does this even make sense??????? if !utils.Exists(baseDir) { @@ -395,7 +395,7 @@ func FetchGit(baseUrl, baseDir string) error { for w := 1; w <= concurrency; w++ { go workers.DownloadWorker(c, baseUrl, baseDir, jt, true, true) } - jt.Wait() + jt.StartAndWait() /*// Fetch files marked as missing in status // TODO: why do we parse status AND decode index ??????? @@ -448,7 +448,7 @@ func FetchGit(baseUrl, baseDir string) error { for w := 1; w <= concurrency; w++ { go workers.DownloadWorker(c, baseUrl, baseDir, jt, true, true) } - jt.Wait() + jt.StartAndWait() } // TODO: turn into worker? @@ -510,7 +510,7 @@ func FetchGit(baseUrl, baseDir string) error { for w := 1; w <= concurrency; w++ { go workers.DownloadWorker(c, baseUrl, baseDir, jt, true, true) } - jt.Wait() + jt.StartAndWait() } } return nil