Refactor JobTracker to prepare for library extraction

maia tillie arson crimew 2021-10-24 18:52:27 +02:00
parent 0703f40da3
commit 967f998486
7 changed files with 174 additions and 213 deletions

View File

@@ -6,53 +6,49 @@ import (
     "sync"
     "sync/atomic"
     "time"
-    "github.com/deletescape/goop/internal/utils"
 )

 type JobTracker struct {
-    activeWorkers int32
-    queuedJobs    int32
-    naps          int32
-    started       bool
-    cond          *sync.Cond
-    Queue         chan string
-    send          chan string
+    worker         Worker
+    maxConcurrency int32
+    activeWorkers  int32
+    queuedJobs     int32
+    naps           int32
+    napper         Napper
+    started        bool
+    cond           *sync.Cond
+    queue          chan string
+    send           chan string
 }

-func NewJobTracker() *JobTracker {
+type Worker func(jt *JobTracker, job string, context Context)
+type Napper func(naps, activeWorkers int32)
+type Context interface{}
+
+// DefaultNapper will sleep a minimum of 15 milliseconds per nap, but increasingly longer the more often workers go idle
+// this ensures that idle workers do not take up too much cpu time, and should prevent accidental early exits
+func DefaultNapper(naps int32, activeWorkers int32) {
+    ratio := float64(naps) / float64(activeWorkers)
+    n := int(math.Ceil(ratio))
+    if n < 1 {
+        n = 1
+    }
+    time.Sleep(time.Duration(n) * 15 * time.Millisecond)
+}
+
+func NewJobTracker(worker Worker, maxConcurrency int32, napper Napper) *JobTracker {
     jt := &JobTracker{
-        cond:  sync.NewCond(&sync.Mutex{}),
-        Queue: make(chan string, 1),
-        send:  make(chan string, 1),
+        worker:         worker,
+        maxConcurrency: maxConcurrency,
+        cond:           sync.NewCond(&sync.Mutex{}),
+        napper:         napper,
+        queue:          make(chan string, 1),
+        send:           make(chan string, 1),
     }
     go jt.manageQueue()
     return jt
 }

-func (jt *JobTracker) manageQueue() {
-    defer close(jt.Queue)
-    defer close(jt.send)
-    queue := list.New()
-    for jt.HasWork() {
-        if front := queue.Front(); front == nil {
-            value, ok := <-jt.send
-            if ok {
-                queue.PushBack(value)
-            }
-        } else {
-            select {
-            case jt.Queue <- front.Value.(string):
-                queue.Remove(front)
-            case value, ok := <-jt.send:
-                if ok {
-                    queue.PushBack(value)
-                }
-            }
-        }
-    }
-}

 func (jt *JobTracker) AddJob(job string) {
     if job == "" {
         return
@@ -61,22 +57,61 @@ func (jt *JobTracker) AddJob(job string) {
     jt.send <- job
 }

-func (jt *JobTracker) StartWork() {
+func (jt *JobTracker) AddJobs(jobs ...string) {
+    for _, job := range jobs {
+        jt.AddJob(job)
+    }
+}
+
+func (jt *JobTracker) StartAndWait(context Context) {
+    numWorkers := int(min32(atomic.LoadInt32(&jt.queuedJobs), jt.maxConcurrency))
+    for w := 0; w < numWorkers; w++ {
+        go workRoutine(jt, jt.worker, context)
+    }
+    jt.started = true
+    jt.cond.L.Lock()
+    for jt.hasWork() {
+        jt.cond.Wait()
+    }
+}
+
+func (jt *JobTracker) manageQueue() {
+    defer close(jt.queue)
+    defer close(jt.send)
+    queue := list.New()
+    for jt.hasWork() {
+        if front := queue.Front(); front == nil {
+            value, ok := <-jt.send
+            if ok {
+                queue.PushBack(value)
+            }
+        } else {
+            select {
+            case jt.queue <- front.Value.(string):
+                queue.Remove(front)
+            case value, ok := <-jt.send:
+                if ok {
+                    queue.PushBack(value)
+                }
+            }
+        }
+    }
+}
+
+func (jt *JobTracker) startWork() {
     atomic.AddInt32(&jt.activeWorkers, 1)
 }

-func (jt *JobTracker) Nap() {
-    ratio := float64(atomic.AddInt32(&jt.naps, 1)) / float64(atomic.LoadInt32(&jt.activeWorkers))
-    n := utils.MaxInt(int(math.Ceil(ratio)), 1)
-    time.Sleep(time.Duration(n) * 15 * time.Millisecond)
+func (jt *JobTracker) nap() {
+    jt.napper(atomic.LoadInt32(&jt.naps), atomic.LoadInt32(&jt.activeWorkers))
 }

-func (jt *JobTracker) EndWork() {
+func (jt *JobTracker) endWork() {
     atomic.AddInt32(&jt.activeWorkers, -1)
     atomic.AddInt32(&jt.queuedJobs, -1)
 }

-func (jt *JobTracker) HasWork() bool {
+func (jt *JobTracker) hasWork() bool {
     hasWork := !jt.started || atomic.LoadInt32(&jt.queuedJobs) > 0 || atomic.LoadInt32(&jt.activeWorkers) > 0
     if !hasWork {
         jt.cond.Broadcast()
@@ -84,14 +119,25 @@ func (jt *JobTracker) HasWork() bool {
     return hasWork
 }

 func (jt *JobTracker) QueuedJobs() int32 {
     return atomic.LoadInt32(&jt.queuedJobs)
 }

-func (jt *JobTracker) StartAndWait() {
-    jt.started = true
-    jt.cond.L.Lock()
-    for jt.HasWork() {
-        jt.cond.Wait()
+func workRoutine(jt *JobTracker, worker Worker, context Context) {
+    for {
+        select {
+        case job := <-jt.queue:
+            jt.startWork()
+            worker(jt, job, context)
+            jt.endWork()
+        default:
+            if !jt.hasWork() {
+                return
+            }
+            jt.nap()
+        }
     }
 }
+
+func min32(x, y int32) int32 {
+    if x < y {
+        return x
+    }
+    return y
+}
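
In practice the refactored API reduces to three calls: construct a tracker with a worker, a concurrency cap, and a napper; enqueue jobs; then block in StartAndWait, which launches at most maxConcurrency workRoutines and sleeps on the condition variable until hasWork reports false. A minimal caller sketch using only identifiers visible in this diff — the worker body, job strings, and main wrapper are illustrative, not part of the codebase:

package main

import (
    "fmt"

    "github.com/deletescape/goop/internal/jobtracker" // current in-tree path; this commit prepares the package for extraction
)

// printWorker is a hypothetical Worker; any func matching
// func(*jobtracker.JobTracker, string, jobtracker.Context) qualifies.
func printWorker(jt *jobtracker.JobTracker, job string, context jobtracker.Context) {
    prefix := context.(string) // context is whatever value was handed to StartAndWait
    fmt.Println(prefix, job)   // a worker may also call jt.AddJob to enqueue follow-up jobs
}

func main() {
    jt := jobtracker.NewJobTracker(printWorker, 8, jobtracker.DefaultNapper)
    jt.AddJobs("refs/heads/main", "refs/heads/develop")
    jt.StartAndWait("fetched:") // returns once the queue drains and all workers go idle
}

Note that jt.started is only set inside StartAndWait, so hasWork treats a never-started tracker as still having work; jobs must therefore be queued before or during the wait, exactly as the call sites below do.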

View File

@@ -12,27 +12,18 @@ import (
     "github.com/phuslu/log"
 )

-func CreateObjectWorker(baseDir string, jt *jobtracker.JobTracker, storage *filesystem.ObjectStorage, index *index.Index) {
-    for {
-        select {
-        case file := <-jt.Queue:
-            createObjWork(baseDir, file, jt, storage, index)
-        default:
-            if !jt.HasWork() {
-                return
-            }
-            jt.Nap()
-        }
-    }
-}
+type CreateObjectContext struct {
+    BaseDir string
+    Storage *filesystem.ObjectStorage
+    Index   *index.Index
+}

-func createObjWork(baseDir, f string, jt *jobtracker.JobTracker, storage *filesystem.ObjectStorage, idx *index.Index) {
-    jt.StartWork()
-    defer jt.EndWork()
+func CreateObjectWorker(jt *jobtracker.JobTracker, f string, context jobtracker.Context) {
+    c := context.(CreateObjectContext)

-    fp := utils.Url(baseDir, f)
+    fp := utils.Url(c.BaseDir, f)

-    entry, err := idx.Entry(f)
+    entry, err := c.Index.Entry(f)
     if err != nil {
         log.Error().Str("file", f).Err(err).Msg("file is not in index")
         return
@@ -60,7 +51,7 @@ func createObjWork(baseDir, f string, jt *jobtracker.JobTracker, storage *filesystem.ObjectStorage, idx *index.Index) {
         return
     }

-    obj := storage.NewEncodedObject()
+    obj := c.Storage.NewEncodedObject()
     obj.SetSize(int64(len(content)))
     obj.SetType(plumbing.BlobObject)
@@ -72,7 +63,7 @@ func createObjWork(baseDir, f string, jt *jobtracker.JobTracker, storage *filesystem.ObjectStorage, idx *index.Index) {
     defer ow.Close()
     ow.Write(content)

-    _, err = storage.SetEncodedObject(obj)
+    _, err = c.Storage.SetEncodedObject(obj)
     if err != nil {
         log.Error().Str("file", f).Err(err).Msg("failed to create object")
         return

View File

@@ -10,33 +10,25 @@ import (
     "github.com/valyala/fasthttp"
 )

-func DownloadWorker(c *fasthttp.Client, baseUrl, baseDir string, jt *jobtracker.JobTracker, allowHtml, allowEmpty bool) {
-    for {
-        select {
-        case file := <-jt.Queue:
-            downloadWork(c, baseUrl, baseDir, file, jt, allowHtml, allowEmpty)
-        default:
-            if !jt.HasWork() {
-                return
-            }
-            jt.Nap()
-        }
-    }
-}
+type DownloadContext struct {
+    C          *fasthttp.Client
+    BaseUrl    string
+    BaseDir    string
+    AllowHtml  bool
+    AllowEmpty bool
+}

-func downloadWork(c *fasthttp.Client, baseUrl, baseDir, file string, jt *jobtracker.JobTracker, allowHtml, allowEmpty bool) {
-    jt.StartWork()
-    defer jt.EndWork()
+func DownloadWorker(jt *jobtracker.JobTracker, file string, context jobtracker.Context) {
+    c := context.(DownloadContext)
     checkRatelimted()

-    targetFile := utils.Url(baseDir, file)
+    targetFile := utils.Url(c.BaseDir, file)
     if utils.Exists(targetFile) {
         log.Info().Str("file", targetFile).Msg("already fetched, skipping redownload")
         return
     }

-    uri := utils.Url(baseUrl, file)
-    code, body, err := c.Get(nil, uri)
+    uri := utils.Url(c.BaseUrl, file)
+    code, body, err := c.C.Get(nil, uri)
     if err == nil && code != 200 {
         if code == 429 {
             setRatelimited()
@@ -50,11 +42,11 @@ func downloadWork(c *fasthttp.Client, baseUrl, baseDir, file string, jt *jobtracker.JobTracker, allowHtml, allowEmpty bool) {
         return
     }

-    if !allowHtml && utils.IsHtml(body) {
+    if !c.AllowHtml && utils.IsHtml(body) {
         log.Warn().Str("uri", uri).Msg("file appears to be html, skipping")
         return
     }
-    if !allowEmpty && utils.IsEmptyBytes(body) {
+    if !c.AllowEmpty && utils.IsEmptyBytes(body) {
         log.Warn().Str("uri", uri).Msg("file appears to be empty, skipping")
         return
     }

View File

@@ -18,23 +18,15 @@ import (
 var checkedObjs = make(map[string]bool)
 var checkedObjsMutex sync.Mutex

-func FindObjectsWorker(c *fasthttp.Client, baseUrl, baseDir string, jt *jobtracker.JobTracker, storage *filesystem.ObjectStorage) {
-    for {
-        select {
-        case obj := <-jt.Queue:
-            findObjWork(c, baseUrl, baseDir, obj, jt, storage)
-        default:
-            if !jt.HasWork() {
-                return
-            }
-            jt.Nap()
-        }
-    }
-}
+type FindObjectsContext struct {
+    C       *fasthttp.Client
+    BaseUrl string
+    BaseDir string
+    Storage *filesystem.ObjectStorage
+}

-func findObjWork(c *fasthttp.Client, baseUrl, baseDir, obj string, jt *jobtracker.JobTracker, storage *filesystem.ObjectStorage) {
-    jt.StartWork()
-    defer jt.EndWork()
+func FindObjectsWorker(jt *jobtracker.JobTracker, obj string, context jobtracker.Context) {
+    c := context.(FindObjectsContext)
     checkRatelimted()
@@ -53,15 +45,15 @@ func findObjWork(c *fasthttp.Client, baseUrl, baseDir, obj string, jt *jobtracker.JobTracker, storage *filesystem.ObjectStorage) {
     checkedObjsMutex.Unlock()

     file := fmt.Sprintf(".git/objects/%s/%s", obj[:2], obj[2:])
-    fullPath := utils.Url(baseDir, file)
+    fullPath := utils.Url(c.BaseDir, file)
     if utils.Exists(fullPath) {
         log.Info().Str("obj", obj).Msg("already fetched, skipping redownload")
-        encObj, err := storage.EncodedObject(plumbing.AnyObject, plumbing.NewHash(obj))
+        encObj, err := c.Storage.EncodedObject(plumbing.AnyObject, plumbing.NewHash(obj))
         if err != nil {
             log.Error().Str("obj", obj).Err(err).Msg("couldn't read object")
             return
         }
-        decObj, err := object.DecodeObject(storage, encObj)
+        decObj, err := object.DecodeObject(c.Storage, encObj)
         if err != nil {
             log.Error().Str("obj", obj).Err(err).Msg("couldn't decode object")
             return
@@ -73,8 +65,8 @@ func findObjWork(c *fasthttp.Client, baseUrl, baseDir, obj string, jt *jobtracker.JobTracker, storage *filesystem.ObjectStorage) {
         return
     }

-    uri := utils.Url(baseUrl, file)
-    code, body, err := c.Get(nil, uri)
+    uri := utils.Url(c.BaseUrl, file)
+    code, body, err := c.C.Get(nil, uri)
     if err == nil && code != 200 {
         if code == 429 {
             setRatelimited()
@@ -107,12 +99,12 @@ func findObjWork(c *fasthttp.Client, baseUrl, baseDir, obj string, jt *jobtracker.JobTracker, storage *filesystem.ObjectStorage) {
     log.Info().Str("obj", obj).Msg("fetched object")

-    encObj, err := storage.EncodedObject(plumbing.AnyObject, plumbing.NewHash(obj))
+    encObj, err := c.Storage.EncodedObject(plumbing.AnyObject, plumbing.NewHash(obj))
     if err != nil {
         log.Error().Str("obj", obj).Err(err).Msg("couldn't read object")
         return
     }
-    decObj, err := object.DecodeObject(storage, encObj)
+    decObj, err := object.DecodeObject(c.Storage, encObj)
     if err != nil {
         log.Error().Str("obj", obj).Err(err).Msg("couldn't decode object")
         return

View File

@@ -21,23 +21,14 @@ var branchRegex = regexp.MustCompile(`(?m)branch ["'](.+)["']`)
 var checkedRefs = make(map[string]bool)
 var checkedRefsMutex sync.Mutex

-func FindRefWorker(c *fasthttp.Client, baseUrl, baseDir string, jt *jobtracker.JobTracker) {
-    for {
-        select {
-        case path := <-jt.Queue:
-            findRefWork(c, baseUrl, baseDir, path, jt)
-        default:
-            if !jt.HasWork() {
-                return
-            }
-            jt.Nap()
-        }
-    }
-}
+type FindRefContext struct {
+    C       *fasthttp.Client
+    BaseUrl string
+    BaseDir string
+}

-func findRefWork(c *fasthttp.Client, baseUrl, baseDir, path string, jt *jobtracker.JobTracker) {
-    jt.StartWork()
-    defer jt.EndWork()
+func FindRefWorker(jt *jobtracker.JobTracker, path string, context jobtracker.Context) {
+    c := context.(FindRefContext)
     checkRatelimted()
@@ -51,7 +42,7 @@ func findRefWork(c *fasthttp.Client, baseUrl, baseDir, path string, jt *jobtracker.JobTracker) {
     }
     checkedRefsMutex.Unlock()

-    targetFile := utils.Url(baseDir, path)
+    targetFile := utils.Url(c.BaseDir, path)
     if utils.Exists(targetFile) {
         log.Info().Str("file", targetFile).Msg("already fetched, skipping redownload")
         content, err := ioutil.ReadFile(targetFile)
@@ -90,8 +81,8 @@ func findRefWork(c *fasthttp.Client, baseUrl, baseDir, path string, jt *jobtracker.JobTracker) {
         return
     }

-    uri := utils.Url(baseUrl, path)
-    code, body, err := c.Get(nil, uri)
+    uri := utils.Url(c.BaseUrl, path)
+    code, body, err := c.C.Get(nil, uri)
     if err == nil && code != 200 {
         if code == 429 {
             setRatelimited()

View File

@@ -12,36 +12,25 @@ import (
     "github.com/valyala/fasthttp"
 )

-func RecursiveDownloadWorker(c *fasthttp.Client, baseUrl, baseDir string, jt *jobtracker.JobTracker) {
-    for {
-        select {
-        case f, ok := <-jt.Queue:
-            if ok {
-                recursiveDownload(c, baseUrl, baseDir, f, jt)
-            }
-        default:
-            if !jt.HasWork() {
-                return
-            }
-            jt.Nap()
-        }
-    }
-}
+type RecursiveDownloadContext struct {
+    C       *fasthttp.Client
+    BaseUrl string
+    BaseDir string
+}

-func recursiveDownload(c *fasthttp.Client, baseUrl, baseDir, f string, jt *jobtracker.JobTracker) {
-    jt.StartWork()
-    defer jt.EndWork()
+func RecursiveDownloadWorker(jt *jobtracker.JobTracker, f string, context jobtracker.Context) {
+    c := context.(RecursiveDownloadContext)
     checkRatelimted()

-    filePath := utils.Url(baseDir, f)
+    filePath := utils.Url(c.BaseDir, f)
     isDir := strings.HasSuffix(f, "/")
     if !isDir && utils.Exists(filePath) {
         log.Info().Str("file", filePath).Msg("already fetched, skipping redownload")
         return
     }

-    uri := utils.Url(baseUrl, f)
-    code, body, err := c.Get(nil, uri)
+    uri := utils.Url(c.BaseUrl, f)
+    code, body, err := c.C.Get(nil, uri)
     if err == nil && code != 200 {
         if code == 429 {
             setRatelimited()

View File

@@ -172,15 +172,9 @@ func FetchGit(baseUrl, baseDir string) error {
     }
     if utils.StringsContain(indexedFiles, "HEAD") {
         log.Info().Str("base", baseUrl).Msg("fetching .git/ recursively")
-        jt := jobtracker.NewJobTracker()
-        for _, f := range indexedFiles {
-            // TODO: add support for non top level git repos
-            jt.AddJob(utils.Url(".git", f))
-        }
-        for w := 1; w <= maxConcurrency; w++ {
-            go workers.RecursiveDownloadWorker(c, baseUrl, baseDir, jt)
-        }
-        jt.StartAndWait()
+        jt := jobtracker.NewJobTracker(workers.RecursiveDownloadWorker, maxConcurrency, jobtracker.DefaultNapper)
+        jt.AddJobs(indexedFiles...)
+        jt.StartAndWait(workers.RecursiveDownloadContext{C: c, BaseUrl: baseUrl, BaseDir: baseDir})

         if err := checkout(baseDir); err != nil {
             log.Error().Str("dir", baseDir).Err(err).Msg("failed to checkout")
@@ -192,25 +186,14 @@ func FetchGit(baseUrl, baseDir string) error {
     }

     log.Info().Str("base", baseUrl).Msg("fetching common files")
-    jt := jobtracker.NewJobTracker()
-    for _, f := range commonFiles {
-        jt.AddJob(f)
-    }
-    concurrency := utils.MinInt(maxConcurrency, len(commonFiles))
-    for w := 1; w <= concurrency; w++ {
-        go workers.DownloadWorker(c, baseUrl, baseDir, jt, false, false)
-    }
-    jt.StartAndWait()
+    jt := jobtracker.NewJobTracker(workers.DownloadWorker, maxConcurrency, jobtracker.DefaultNapper)
+    jt.AddJobs(commonFiles...)
+    jt.StartAndWait(workers.DownloadContext{C: c, BaseDir: baseDir, BaseUrl: baseUrl})

-    jt = jobtracker.NewJobTracker()
     log.Info().Str("base", baseUrl).Msg("finding refs")
-    for _, ref := range commonRefs {
-        jt.AddJob(ref)
-    }
-    for w := 1; w <= maxConcurrency; w++ {
-        go workers.FindRefWorker(c, baseUrl, baseDir, jt)
-    }
-    jt.StartAndWait()
+    jt = jobtracker.NewJobTracker(workers.FindRefWorker, maxConcurrency, jobtracker.DefaultNapper)
+    jt.AddJobs(commonRefs...)
+    jt.StartAndWait(workers.FindRefContext{C: c, BaseUrl: baseUrl, BaseDir: baseDir})

     log.Info().Str("base", baseUrl).Msg("finding packs")
     infoPacksPath := utils.Url(baseDir, ".git/objects/info/packs")
@@ -220,16 +203,12 @@ func FetchGit(baseUrl, baseDir string) error {
             return err
         }
         hashes := packRegex.FindAllSubmatch(infoPacks, -1)
-        jt = jobtracker.NewJobTracker()
+        jt = jobtracker.NewJobTracker(workers.DownloadWorker, maxConcurrency, jobtracker.DefaultNapper)
         for _, sha1 := range hashes {
             jt.AddJob(fmt.Sprintf(".git/objects/pack/pack-%s.idx", sha1[1]))
             jt.AddJob(fmt.Sprintf(".git/objects/pack/pack-%s.pack", sha1[1]))
         }
-        concurrency := utils.MinInt(maxConcurrency, int(jt.QueuedJobs()))
-        for w := 1; w <= concurrency; w++ {
-            go workers.DownloadWorker(c, baseUrl, baseDir, jt, false, false)
-        }
-        jt.StartAndWait()
+        jt.StartAndWait(workers.DownloadContext{C: c, BaseUrl: baseUrl, BaseDir: baseDir})
     }

     log.Info().Str("base", baseUrl).Msg("finding objects")
@@ -357,15 +336,12 @@ func FetchGit(baseUrl, baseDir string) error {
         storage.IterEncodedObjects()
     }*/

-    jt = jobtracker.NewJobTracker()
     log.Info().Str("base", baseUrl).Msg("fetching object")
+    jt = jobtracker.NewJobTracker(workers.FindObjectsWorker, maxConcurrency, jobtracker.DefaultNapper)
     for obj := range objs {
         jt.AddJob(obj)
     }
-    for w := 1; w <= maxConcurrency; w++ {
-        go workers.FindObjectsWorker(c, baseUrl, baseDir, jt, objStorage)
-    }
-    jt.StartAndWait()
+    jt.StartAndWait(workers.FindObjectsContext{C: c, BaseUrl: baseUrl, BaseDir: baseDir, Storage: objStorage})

     // exit early if we haven't managed to dump anything
     if !utils.Exists(baseDir) {
@@ -484,15 +460,11 @@ func fetchLfs(baseDir, baseUrl string) {
         // TODO: global filters

-        jt := jobtracker.NewJobTracker()
+        jt := jobtracker.NewJobTracker(workers.DownloadWorker, maxConcurrency, jobtracker.DefaultNapper)
         for _, hash := range hashes {
             jt.AddJob(fmt.Sprintf(".git/lfs/objects/%s/%s/%s", hash[:2], hash[2:4], hash))
         }
-        concurrency := utils.MinInt(maxConcurrency, int(jt.QueuedJobs()))
-        for w := 1; w <= concurrency; w++ {
-            go workers.DownloadWorker(c, baseUrl, baseDir, jt, false, false)
-        }
-        jt.StartAndWait()
+        jt.StartAndWait(workers.DownloadContext{C: c, BaseUrl: baseUrl, BaseDir: baseDir})
     }
 }
@@ -515,30 +487,22 @@ func fetchMissing(baseDir, baseUrl string, objStorage *filesystem.ObjectStorage) {
         log.Error().Str("dir", baseDir).Err(err).Msg("couldn't decode git index")
         return
     } else {
-        jt := jobtracker.NewJobTracker()
+        jt := jobtracker.NewJobTracker(workers.DownloadWorker, maxConcurrency, jobtracker.DefaultNapper)
         for _, entry := range idx.Entries {
             if !strings.HasSuffix(entry.Name, ".php") && !utils.Exists(utils.Url(baseDir, fmt.Sprintf(".git/objects/%s/%s", entry.Hash.String()[:2], entry.Hash.String()[2:]))) {
                 missingFiles = append(missingFiles, entry.Name)
                 jt.AddJob(entry.Name)
             }
         }
-        concurrency := utils.MinInt(maxConcurrency, int(jt.QueuedJobs()))
-        for w := 1; w <= concurrency; w++ {
-            go workers.DownloadWorker(c, baseUrl, baseDir, jt, true, true)
-        }
-        jt.StartAndWait()
+        jt.StartAndWait(workers.DownloadContext{C: c, BaseUrl: baseUrl, BaseDir: baseDir, AllowHtml: true, AllowEmpty: true})

-        jt = jobtracker.NewJobTracker()
+        jt = jobtracker.NewJobTracker(workers.CreateObjectWorker, maxConcurrency, jobtracker.DefaultNapper)
         for _, f := range missingFiles {
             if utils.Exists(utils.Url(baseDir, f)) {
                 jt.AddJob(f)
             }
         }
-        concurrency = utils.MinInt(maxConcurrency, int(jt.QueuedJobs()))
-        for w := 1; w <= concurrency; w++ {
-            go workers.CreateObjectWorker(baseDir, jt, objStorage, &idx)
-        }
-        jt.StartAndWait()
+        jt.StartAndWait(workers.CreateObjectContext{BaseDir: baseDir, Storage: objStorage, Index: &idx})
     }
 }
}
@@ -554,7 +518,7 @@ func fetchIgnored(baseDir, baseUrl string) error {
         }
         defer ignoreFile.Close()

-        jt := jobtracker.NewJobTracker()
+        jt := jobtracker.NewJobTracker(workers.DownloadWorker, maxConcurrency, jobtracker.DefaultNapper)

         scanner := bufio.NewScanner(ignoreFile)
         for scanner.Scan() {
@@ -571,11 +535,7 @@ func fetchIgnored(baseDir, baseUrl string) error {
            return err
         }

-        concurrency := utils.MinInt(maxConcurrency, int(jt.QueuedJobs()))
-        for w := 1; w <= concurrency; w++ {
-            go workers.DownloadWorker(c, baseUrl, baseDir, jt, true, true)
-        }
-        jt.StartAndWait()
+        jt.StartAndWait(workers.DownloadContext{C: c, BaseUrl: baseUrl, BaseDir: baseDir, AllowHtml: true, AllowEmpty: true})
     }

     return nil
 }
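
Because the back-off policy is now an injected Napper rather than the fixed sleep the old Nap method hard-coded, call sites like the ones above could swap in a different policy without touching the tracker. A sketch of an alternative, assuming only the Napper signature from this diff (fixedNapper is a hypothetical name, not part of this commit):

// fixedNapper ignores the naps/activeWorkers ratio and always sleeps 50ms
func fixedNapper(naps, activeWorkers int32) {
    time.Sleep(50 * time.Millisecond)
}

// usage: jt := jobtracker.NewJobTracker(workers.DownloadWorker, maxConcurrency, fixedNapper)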