diff --git a/cmd/goop.go b/cmd/goop.go
index 8f2019e..445aeda 100644
--- a/cmd/goop.go
+++ b/cmd/goop.go
@@ -1,10 +1,11 @@
 package cmd

 import (
-	"fmt"
-	"github.com/deletescape/goop/pkg/goop"
-	"github.com/spf13/cobra"
 	"os"
+
+	"github.com/deletescape/goop/pkg/goop"
+	"github.com/phuslu/log"
+	"github.com/spf13/cobra"
 )

 var force bool
@@ -21,12 +22,12 @@ var rootCmd = &cobra.Command{
 		}
 		if list {
 			if err := goop.CloneList(args[0], dir, force, keep); err != nil {
-				fmt.Fprintln(os.Stderr, err)
+				log.Error().Err(err).Msg("exiting")
 				os.Exit(1)
 			}
 		} else {
 			if err := goop.Clone(args[0], dir, force, keep); err != nil {
-				fmt.Fprintln(os.Stderr, err)
+				log.Error().Err(err).Msg("exiting")
 				os.Exit(1)
 			}
 		}
@@ -41,7 +42,7 @@ func init() {

 func Execute() {
 	if err := rootCmd.Execute(); err != nil {
-		fmt.Fprintln(os.Stderr, err)
+		log.Error().Err(err).Msg("exiting")
 		os.Exit(1)
 	}
 }
diff --git a/go.mod b/go.mod
index d0ca31b..4c09afe 100644
--- a/go.mod
+++ b/go.mod
@@ -6,6 +6,7 @@ require (
 	github.com/PuerkitoBio/goquery v1.6.0
 	github.com/go-git/go-billy/v5 v5.0.0
 	github.com/go-git/go-git/v5 v5.2.0
+	github.com/phuslu/log v1.0.75
 	github.com/spf13/cobra v1.1.1
 	github.com/valyala/fasthttp v1.16.0
 )
diff --git a/go.sum b/go.sum
index 7022e73..1ab499d 100644
--- a/go.sum
+++ b/go.sum
@@ -164,6 +164,8 @@ github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLA
 github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
 github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
 github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
+github.com/phuslu/log v1.0.75 h1:2Qcqgwo1sOsvj7QIuclIS92hmWxIISI2+XskYM1Nw2A=
+github.com/phuslu/log v1.0.75/go.mod h1:kzJN3LRifrepxThMjufQwS7S35yFAB+jAV1qgA7eBW4=
 github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
 github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
diff --git a/internal/jobtracker/jobtracker.go b/internal/jobtracker/jobtracker.go
new file mode 100644
index 0000000..55d2277
--- /dev/null
+++ b/internal/jobtracker/jobtracker.go
@@ -0,0 +1,64 @@
+package jobtracker
+
+import (
+	"sync"
+	"sync/atomic"
+	"time"
+)
+
+type JobTracker struct {
+	activeWorkers int32
+	queuedJobs    int32
+	didWork       bool
+	cond          *sync.Cond
+	Queue         chan string
+}
+
+func Nap() {
+	time.Sleep(40 * time.Millisecond)
+}
+
+func NewJobTracker() *JobTracker {
+	return &JobTracker{
+		cond:  sync.NewCond(&sync.Mutex{}),
+		Queue: make(chan string, 999999), // TODO: dont create oversized queues, we should try to save memory; maybe read the channel docs again
+	}
+}
+
+func (jt *JobTracker) AddJob(job string) {
+	// TODO: can we discard empty jobs here?
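Aside: the command layer above swaps its `fmt.Fprintln(os.Stderr, err)` error reporting for structured phuslu/log calls. A minimal, self-contained sketch of that logging pattern in isolation — the error value here is a stand-in, not code from the patch:

```go
package main

import (
	"errors"
	"os"

	"github.com/phuslu/log"
)

func main() {
	// stand-in error, e.g. what a failed goop.Clone call might return
	err := errors.New("connection refused")
	if err != nil {
		// structured fields replace the old unstructured fmt.Fprintln output
		log.Error().Err(err).Msg("exiting")
		os.Exit(1)
	}
}
```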
+ jt.cond.L.Lock() + atomic.AddInt32(&jt.queuedJobs, 1) + jt.Queue <- job + jt.cond.L.Unlock() +} + +func (jt *JobTracker) StartWork() { + atomic.AddInt32(&jt.activeWorkers, 1) +} + +func (jt *JobTracker) EndWork() { + jt.didWork = true + atomic.AddInt32(&jt.activeWorkers, -1) + atomic.AddInt32(&jt.queuedJobs, -1) +} + +func (jt *JobTracker) HasWork() bool { + // TODO: didWork is a somewhat ugly workaround to ensure we dont exit before doing work at least once, + // this will however result in locking up if we create a JobTracker but never queue any jobs + hasWork := !jt.didWork || (atomic.LoadInt32(&jt.queuedJobs) > 0 && atomic.LoadInt32(&jt.activeWorkers) > 0) + + if !hasWork { + jt.cond.Broadcast() + } + return hasWork +} + +func (jt *JobTracker) Wait() { + defer close(jt.Queue) + + jt.cond.L.Lock() + for jt.HasWork() { + jt.cond.Wait() + } +} diff --git a/internal/utils/html.go b/internal/utils/html.go index 0747a2d..746e390 100644 --- a/internal/utils/html.go +++ b/internal/utils/html.go @@ -2,9 +2,10 @@ package utils import ( "bytes" - "github.com/PuerkitoBio/goquery" "net/url" "strings" + + "github.com/PuerkitoBio/goquery" ) var htmlTag = []byte{'<', 'h', 't', 'm', 'l'} @@ -36,5 +37,5 @@ func GetIndexedFiles(body []byte) ([]string, error) { } return true }) - return files, err + return files, exitErr } diff --git a/internal/utils/urls.go b/internal/utils/urls.go index 97b8144..c8c7db8 100644 --- a/internal/utils/urls.go +++ b/internal/utils/urls.go @@ -2,6 +2,7 @@ package utils import "strings" +//TODO: replace all uses of this with the proper path utils func Url(base, path string) string { return strings.TrimSuffix(base, "/") + "/" + strings.TrimPrefix(path, "/") } diff --git a/internal/workers/consts.go b/internal/workers/consts.go deleted file mode 100644 index 91b11e8..0000000 --- a/internal/workers/consts.go +++ /dev/null @@ -1,8 +0,0 @@ -package workers - -import "time" - -const ( - gracePeriod = 350 * time.Millisecond - graceTimes = 15 -) diff --git a/internal/workers/download.go b/internal/workers/download.go index baf5c42..6812c50 100644 --- a/internal/workers/download.go +++ b/internal/workers/download.go @@ -1,65 +1,73 @@ package workers import ( - "fmt" - "github.com/deletescape/goop/internal/utils" - "github.com/valyala/fasthttp" "io/ioutil" "os" - "sync" - "time" + + "github.com/deletescape/goop/internal/jobtracker" + "github.com/deletescape/goop/internal/utils" + "github.com/phuslu/log" + "github.com/valyala/fasthttp" ) -func DownloadWorker(c *fasthttp.Client, queue chan string, baseUrl, baseDir string, wg *sync.WaitGroup, allowHtml bool) { - defer wg.Done() - var ctr int +func DownloadWorker(c *fasthttp.Client, baseUrl, baseDir string, jt *jobtracker.JobTracker, allowHtml, allowEmpty bool) { for { select { - case file := <-queue: - - checkRatelimted() - if file == "" { - continue - } - targetFile := utils.Url(baseDir, file) - if utils.Exists(targetFile) { - fmt.Printf("%s was downloaded already, skipping\n", targetFile) - continue - } - uri := utils.Url(baseUrl, file) - code, body, err := c.Get(nil, uri) - fmt.Printf("[-] Fetching %s [%d]\n", uri, code) - if err != nil { - fmt.Fprintf(os.Stderr, "error: %s\n", err) - continue - } - if code == 200 { - if !allowHtml && utils.IsHtml(body) { - fmt.Printf("warning: %s appears to be an html file, skipping\n", uri) - continue - } - if utils.IsEmptyBytes(body) { - fmt.Printf("warning: %s appears to be an empty file, skipping\n", uri) - continue - } - if err := utils.CreateParentFolders(targetFile); err != nil { - 
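Aside: taken together, jobtracker.go replaces the old WaitGroup-and-grace-counter machinery that the workers are being migrated away from. A rough sketch of the intended lifecycle, mirroring the select/HasWork/Nap loop used by DownloadWorker — the worker body and job string here are illustrative stand-ins, not code from the patch:

```go
package main

import (
	"github.com/deletescape/goop/internal/jobtracker"
	"github.com/phuslu/log"
)

// stand-in worker that mirrors the select/HasWork/Nap pattern of the real workers
func worker(jt *jobtracker.JobTracker) {
	for {
		select {
		case job := <-jt.Queue:
			jt.StartWork()
			log.Info().Str("job", job).Msg("processing")
			jt.EndWork()
		default:
			if !jt.HasWork() {
				return // nothing queued and nothing in flight
			}
			jobtracker.Nap()
		}
	}
}

func main() {
	jt := jobtracker.NewJobTracker()
	for w := 0; w < 4; w++ {
		go worker(jt)
	}
	jt.AddJob(".git/HEAD") // queue at least one job before Wait, or HasWork never flips
	jt.Wait()              // blocks until all queued jobs have been worked off
}
```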
fmt.Fprintf(os.Stderr, "error: %s\n", err) - continue - } - if err := ioutil.WriteFile(targetFile, body, os.ModePerm); err != nil { - fmt.Fprintf(os.Stderr, "error: %s\n", err) - } - } else if code == 429 { - setRatelimited() - queue <- file - } + case file := <-jt.Queue: + downloadWork(c, baseUrl, baseDir, file, jt, allowHtml, allowEmpty) default: - // TODO: get rid of dirty hack somehow - if ctr >= graceTimes { + if !jt.HasWork() { return } - ctr++ - time.Sleep(gracePeriod) + jobtracker.Nap() } } } + +func downloadWork(c *fasthttp.Client, baseUrl, baseDir, file string, jt *jobtracker.JobTracker, allowHtml, allowEmpty bool) { + jt.StartWork() + defer jt.EndWork() + + if file == "" { + return + } + checkRatelimted() + + targetFile := utils.Url(baseDir, file) + if utils.Exists(targetFile) { + log.Info().Str("file", targetFile).Msg("already fetched, skipping redownload") + return + } + uri := utils.Url(baseUrl, file) + code, body, err := c.Get(nil, uri) + if err == nil && code != 200 { + if code == 429 { + setRatelimited() + jt.AddJob(file) + return + } + log.Warn().Str("uri", uri).Int("code", code).Msg("couldn't fetch file") + return + } else if err != nil { + log.Error().Str("uri", uri).Int("code", code).Err(err).Msg("couldn't fetch file") + return + } + + if !allowHtml && utils.IsHtml(body) { + log.Warn().Str("uri", uri).Msg("file appears to be html, skipping") + return + } + if !allowEmpty && utils.IsEmptyBytes(body) { + log.Warn().Str("uri", uri).Msg("file appears to be empty, skipping") + return + } + if err := utils.CreateParentFolders(targetFile); err != nil { + log.Error().Str("uri", uri).Str("file", targetFile).Err(err).Msg("couldn't create parent directories") + return + } + if err := ioutil.WriteFile(targetFile, body, os.ModePerm); err != nil { + log.Error().Str("uri", uri).Str("file", targetFile).Err(err).Msg("clouldn't write file") + return + } + log.Info().Str("uri", uri).Str("file", file).Msg("fetched file") +} diff --git a/internal/workers/findobjects.go b/internal/workers/findobjects.go index 804d9d6..7f09263 100644 --- a/internal/workers/findobjects.go +++ b/internal/workers/findobjects.go @@ -2,110 +2,123 @@ package workers import ( "fmt" + "io/ioutil" + "os" + "sync" + + "github.com/deletescape/goop/internal/jobtracker" "github.com/deletescape/goop/internal/utils" "github.com/go-git/go-git/v5/plumbing" "github.com/go-git/go-git/v5/plumbing/object" "github.com/go-git/go-git/v5/storage/filesystem" + "github.com/phuslu/log" "github.com/valyala/fasthttp" - "io/ioutil" - "os" - "sync" - "time" ) var checkedObjs = make(map[string]bool) var checkedObjsMutex sync.Mutex -func FindObjectsWorker(c *fasthttp.Client, queue chan string, baseUrl, baseDir string, wg *sync.WaitGroup, storage *filesystem.ObjectStorage) { - defer wg.Done() - var ctr int +func FindObjectsWorker(c *fasthttp.Client, baseUrl, baseDir string, jt *jobtracker.JobTracker, storage *filesystem.ObjectStorage) { for { select { - case obj := <-queue: - checkRatelimted() - if obj == "" { - continue - } - ctr = 0 - checkedObjsMutex.Lock() - if checked, ok := checkedObjs[obj]; checked && ok { - // Obj has already been checked - checkedObjsMutex.Unlock() - continue - } else { - checkedObjs[obj] = true - } - checkedObjsMutex.Unlock() - file := fmt.Sprintf(".git/objects/%s/%s", obj[:2], obj[2:]) - fullPath := utils.Url(baseDir, file) - if utils.Exists(fullPath) { - fmt.Printf("%s was downloaded already, skipping\n", fullPath) - encObj, err := storage.EncodedObject(plumbing.AnyObject, plumbing.NewHash(obj)) - if err != nil { 
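Aside: both the old and the new object worker map a hash to its loose-object path by splitting off the first two hex characters as the fan-out directory. A small illustration with an arbitrary example hash:

```go
package main

import "fmt"

func main() {
	// arbitrary example hash; the workers receive these from refs, logs and the index
	obj := "d670460b4b4aece5915caf5c68d12f560a9fe3e4"
	file := fmt.Sprintf(".git/objects/%s/%s", obj[:2], obj[2:])
	fmt.Println(file) // .git/objects/d6/70460b4b4aece5915caf5c68d12f560a9fe3e4
}
```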
- fmt.Fprintf(os.Stderr, "error: %s\n", err) - continue - } - decObj, err := object.DecodeObject(storage, encObj) - if err != nil { - fmt.Fprintf(os.Stderr, "error: %s\n", err) - continue - } - referencedHashes := utils.GetReferencedHashes(decObj) - for _, h := range referencedHashes { - queue <- h - } - continue - } - uri := utils.Url(baseUrl, file) - code, body, err := c.Get(nil, uri) - fmt.Printf("[-] Fetching %s [%d]\n", uri, code) - if err != nil { - fmt.Fprintf(os.Stderr, "error: %s\n", err) - continue - } - if code == 200 { - if utils.IsHtml(body) { - fmt.Printf("warning: %s appears to be an html file, skipping\n", uri) - continue - } - if utils.IsEmptyBytes(body) { - fmt.Printf("warning: %s appears to be an empty file, skipping\n", uri) - continue - } - if err := utils.CreateParentFolders(fullPath); err != nil { - fmt.Fprintf(os.Stderr, "error: %s\n", err) - continue - } - if err := ioutil.WriteFile(fullPath, body, os.ModePerm); err != nil { - fmt.Fprintf(os.Stderr, "error: %s\n", err) - continue - } - - encObj, err := storage.EncodedObject(plumbing.AnyObject, plumbing.NewHash(obj)) - if err != nil { - fmt.Fprintf(os.Stderr, "error: %s\n", err) - continue - } - decObj, err := object.DecodeObject(storage, encObj) - if err != nil { - fmt.Fprintf(os.Stderr, "error: %s\n", err) - continue - } - referencedHashes := utils.GetReferencedHashes(decObj) - for _, h := range referencedHashes { - queue <- h - } - } else if code == 429 { - setRatelimited() - queue <- obj - } + case obj := <-jt.Queue: + findObjWork(c, baseUrl, baseDir, obj, jt, storage) default: - // TODO: get rid of dirty hack somehow - if ctr >= graceTimes { + if !jt.HasWork() { return } - ctr++ - time.Sleep(gracePeriod) + jobtracker.Nap() } } } + +func findObjWork(c *fasthttp.Client, baseUrl, baseDir, obj string, jt *jobtracker.JobTracker, storage *filesystem.ObjectStorage) { + jt.StartWork() + defer jt.EndWork() + + if obj == "" { + return + } + + checkRatelimted() + + checkedObjsMutex.Lock() + if checked, ok := checkedObjs[obj]; checked && ok { + // Obj has already been checked + checkedObjsMutex.Unlock() + return + } else { + checkedObjs[obj] = true + } + checkedObjsMutex.Unlock() + + file := fmt.Sprintf(".git/objects/%s/%s", obj[:2], obj[2:]) + fullPath := utils.Url(baseDir, file) + if utils.Exists(fullPath) { + log.Info().Str("obj", obj).Msg("already fetched, skipping redownload") + encObj, err := storage.EncodedObject(plumbing.AnyObject, plumbing.NewHash(obj)) + if err != nil { + log.Error().Str("obj", obj).Err(err).Msg("couldn't read object") + return + } + decObj, err := object.DecodeObject(storage, encObj) + if err != nil { + log.Error().Str("obj", obj).Err(err).Msg("couldn't decode object") + return + } + referencedHashes := utils.GetReferencedHashes(decObj) + for _, h := range referencedHashes { + jt.AddJob(h) + } + return + } + + uri := utils.Url(baseUrl, file) + code, body, err := c.Get(nil, uri) + if err == nil && code != 200 { + if code == 429 { + setRatelimited() + jt.AddJob(obj) + return + } + log.Warn().Str("obj", obj).Int("code", code).Msg("failed to fetch object") + return + } else if err != nil { + log.Error().Str("obj", obj).Int("code", code).Err(err).Msg("failed to fetch object") + return + } + + if utils.IsHtml(body) { + log.Warn().Str("uri", uri).Msg("file appears to be html, skipping") + return + } + if utils.IsEmptyBytes(body) { + log.Warn().Str("uri", uri).Msg("file appears to be empty, skipping") + return + } + if err := utils.CreateParentFolders(fullPath); err != nil { + log.Error().Str("uri", 
uri).Str("file", fullPath).Err(err).Msg("couldn't create parent directories") + return + } + if err := ioutil.WriteFile(fullPath, body, os.ModePerm); err != nil { + log.Error().Str("uri", uri).Str("file", fullPath).Err(err).Msg("clouldn't write file") + return + } + + log.Info().Str("obj", obj).Msg("fetched object") + + encObj, err := storage.EncodedObject(plumbing.AnyObject, plumbing.NewHash(obj)) + if err != nil { + log.Error().Str("obj", obj).Err(err).Msg("couldn't read object") + return + } + decObj, err := object.DecodeObject(storage, encObj) + if err != nil { + log.Error().Str("obj", obj).Err(err).Msg("couldn't decode object") + return + } + referencedHashes := utils.GetReferencedHashes(decObj) + for _, h := range referencedHashes { + jt.AddJob(h) + } +} diff --git a/internal/workers/findref.go b/internal/workers/findref.go index 13ed3d8..17ef345 100644 --- a/internal/workers/findref.go +++ b/internal/workers/findref.go @@ -1,14 +1,15 @@ package workers import ( - "fmt" - "github.com/deletescape/goop/internal/utils" - "github.com/valyala/fasthttp" "io/ioutil" "os" "regexp" "sync" - "time" + + "github.com/deletescape/goop/internal/jobtracker" + "github.com/deletescape/goop/internal/utils" + "github.com/phuslu/log" + "github.com/valyala/fasthttp" ) var refRegex = regexp.MustCompile(`(?m)(refs(/[a-zA-Z0-9\-\.\_\*]+)+)`) @@ -17,92 +18,102 @@ var branchRegex = regexp.MustCompile(`(?m)branch ["'](.+)["']`) var checkedRefs = make(map[string]bool) var checkedRefsMutex sync.Mutex -func FindRefWorker(c *fasthttp.Client, queue chan string, baseUrl, baseDir string, wg *sync.WaitGroup) { - defer wg.Done() - var ctr int +func FindRefWorker(c *fasthttp.Client, baseUrl, baseDir string, jt *jobtracker.JobTracker) { for { select { - case path := <-queue: - checkRatelimted() - if path == "" { - continue - } - ctr = 0 - checkedRefsMutex.Lock() - if checked, ok := checkedRefs[path]; checked && ok { - // Ref has already been checked - checkedRefsMutex.Unlock() - continue - } else { - checkedRefs[path] = true - } - checkedRefsMutex.Unlock() - targetFile := utils.Url(baseDir, path) - if utils.Exists(targetFile) { - fmt.Printf("%s was downloaded already, skipping\n", targetFile) - content, err := ioutil.ReadFile(targetFile) - if err != nil { - fmt.Fprintf(os.Stderr, "error: %s\n", err) - } - for _, ref := range refRegex.FindAll(content, -1) { - queue <- utils.Url(".git", string(ref)) - queue <- utils.Url(".git/logs", string(ref)) - } - if path == ".git/config" || path == ".git/FETCH_HEAD" { - // TODO check the actual origin instead of just assuming origin here - for _, branch := range branchRegex.FindAllSubmatch(content, -1) { - queue <- utils.Url(".git/refs/remotes/origin", string(branch[1])) - queue <- utils.Url(".git/logs/refs/remotes/origin", string(branch[1])) - } - } - continue - } - uri := utils.Url(baseUrl, path) - code, body, err := c.Get(nil, uri) - fmt.Printf("[-] Fetching %s [%d]\n", uri, code) - if err != nil { - fmt.Fprintf(os.Stderr, "error: %s\n", err) - continue - } - if code == 200 { - if utils.IsHtml(body) { - fmt.Printf("warning: %s appears to be an html file, skipping\n", uri) - continue - } - if utils.IsEmptyBytes(body) { - fmt.Printf("warning: %s appears to be an empty file, skipping\n", uri) - continue - } - if err := utils.CreateParentFolders(targetFile); err != nil { - fmt.Fprintf(os.Stderr, "error: %s\n", err) - continue - } - if err := ioutil.WriteFile(targetFile, body, os.ModePerm); err != nil { - fmt.Fprintf(os.Stderr, "error: %s\n", err) - continue - } - - for _, ref := range 
refRegex.FindAll(body, -1) { - queue <- utils.Url(".git", string(ref)) - queue <- utils.Url(".git/logs", string(ref)) - } - if path == ".git/config" || path == ".git/FETCH_HEAD" { - // TODO check the actual origin instead of just assuming origin here - for _, branch := range branchRegex.FindAllSubmatch(body, -1) { - queue <- utils.Url(".git/refs/remotes/origin", string(branch[1])) - } - } - } else if code == 429 { - setRatelimited() - queue <- path - } + case path := <-jt.Queue: + findRefWork(c, baseUrl, baseDir, path, jt) default: - // TODO: get rid of dirty hack somehow - if ctr >= graceTimes { + if !jt.HasWork() { return } - ctr++ - time.Sleep(gracePeriod) + jobtracker.Nap() + } + } +} + +func findRefWork(c *fasthttp.Client, baseUrl, baseDir, path string, jt *jobtracker.JobTracker) { + jt.StartWork() + defer jt.EndWork() + + // TODO: do we still need this check here? + if path == "" { + return + } + + checkedRefsMutex.Lock() + if checked, ok := checkedRefs[path]; checked && ok { + // Ref has already been checked + checkedRefsMutex.Unlock() + return + } else { + checkedRefs[path] = true + } + + targetFile := utils.Url(baseDir, path) + if utils.Exists(targetFile) { + log.Info().Str("file", targetFile).Msg("already fetched, skipping redownload") + content, err := ioutil.ReadFile(targetFile) + if err != nil { + log.Error().Str("file", targetFile).Err(err).Msg("error while reading file") + } + for _, ref := range refRegex.FindAll(content, -1) { + jt.AddJob(utils.Url(".git", string(ref))) + jt.AddJob(utils.Url(".git/logs", string(ref))) + } + if path == ".git/config" || path == ".git/FETCH_HEAD" { + // TODO check the actual origin instead of just assuming origin here + for _, branch := range branchRegex.FindAllSubmatch(content, -1) { + jt.AddJob(utils.Url(".git/refs/remotes/origin", string(branch[1]))) + jt.AddJob(utils.Url(".git/logs/refs/remotes/origin", string(branch[1]))) + } + } + return + } + + uri := utils.Url(baseUrl, path) + code, body, err := c.Get(nil, uri) + if err == nil && code != 200 { + if code == 429 { + setRatelimited() + jt.AddJob(path) + return + } + log.Warn().Str("uri", uri).Int("code", code).Msg("failed to fetch ref") + return + } else if err != nil { + log.Error().Str("uri", uri).Int("code", code).Err(err).Msg("failed to fetch ref") + return + } + + if utils.IsHtml(body) { + log.Warn().Str("uri", uri).Msg("file appears to be html, skipping") + return + } + if utils.IsEmptyBytes(body) { + log.Warn().Str("uri", uri).Msg("file appears to be empty, skipping") + return + } + if err := utils.CreateParentFolders(targetFile); err != nil { + log.Error().Str("uri", uri).Str("file", targetFile).Err(err).Msg("couldn't create parent directories") + return + } + if err := ioutil.WriteFile(targetFile, body, os.ModePerm); err != nil { + log.Error().Str("uri", uri).Str("file", targetFile).Err(err).Msg("clouldn't write file") + return + } + + log.Info().Str("uri", uri).Msg("fetched ref") + + for _, ref := range refRegex.FindAll(body, -1) { + jt.AddJob(utils.Url(".git", string(ref))) + jt.AddJob(utils.Url(".git/logs", string(ref))) + } + if path == ".git/config" || path == ".git/FETCH_HEAD" { + // TODO check the actual origin instead of just assuming origin here + for _, branch := range branchRegex.FindAllSubmatch(body, -1) { + jt.AddJob(utils.Url(".git/refs/remotes/origin", string(branch[1]))) + jt.AddJob(utils.Url(".git/logs/refs/remotes/origin", string(branch[1]))) } } } diff --git a/internal/workers/ratelimit.go b/internal/workers/ratelimit.go index 159dc8e..109bc70 100644 --- 
a/internal/workers/ratelimit.go +++ b/internal/workers/ratelimit.go @@ -1,9 +1,10 @@ package workers import ( - "fmt" "sync/atomic" "time" + + "github.com/phuslu/log" ) var rateLimited int32 @@ -13,7 +14,7 @@ var unsetter int32 func setRatelimited() { if atomic.CompareAndSwapInt32(&rateLimited, 0, 1) { atomic.StoreUint32(&ratelimitCount, atomic.LoadUint32(&ratelimitCount)+1) - fmt.Println("[-] Server is rate limiting us, starting to wait") + log.Warn().Uint32("count", atomic.LoadUint32(&ratelimitCount)).Msg("server is rate limiting us, waiting...") } } diff --git a/internal/workers/recursivedownload.go b/internal/workers/recursivedownload.go index 93c6483..131897a 100644 --- a/internal/workers/recursivedownload.go +++ b/internal/workers/recursivedownload.go @@ -1,74 +1,87 @@ package workers import ( - "fmt" - "github.com/deletescape/goop/internal/utils" - "github.com/valyala/fasthttp" "io/ioutil" "os" "strings" - "sync" - "time" + + "github.com/deletescape/goop/internal/jobtracker" + "github.com/deletescape/goop/internal/utils" + "github.com/phuslu/log" + "github.com/valyala/fasthttp" ) -func RecursiveDownloadWorker(c *fasthttp.Client, queue chan string, baseUrl, baseDir string, wg *sync.WaitGroup) { - defer wg.Done() - var ctr int +func RecursiveDownloadWorker(c *fasthttp.Client, baseUrl, baseDir string, jt *jobtracker.JobTracker) { for { select { - case f := <-queue: - checkRatelimted() - if f == "" { - continue - } - ctr = 0 - filePath := utils.Url(baseDir, f) - isDir := strings.HasSuffix(f, "/") - if !isDir && utils.Exists(filePath) { - fmt.Printf("%s was downloaded already, skipping\n", filePath) - continue - } - uri := utils.Url(baseUrl, f) - code, body, err := c.Get(nil, uri) - fmt.Printf("[-] Fetching %s [%d]\n", uri, code) - if err != nil { - fmt.Fprintf(os.Stderr, "error: %s\n", err) - continue - } - if code == 429 { - setRatelimited() - queue <- f - continue - } - if isDir { - if !utils.IsHtml(body) { - fmt.Printf("warning: %s doesn't appear to be an index", uri) - continue - } - indexedFiles, err := utils.GetIndexedFiles(body) - if err != nil { - fmt.Fprintf(os.Stderr, "error: %s\n", err) - continue - } - for _, idxf := range indexedFiles { - queue <- utils.Url(f, idxf) - } - } else { - if err := utils.CreateParentFolders(filePath); err != nil { - fmt.Fprintf(os.Stderr, "error: %s\n", err) - continue - } - if err := ioutil.WriteFile(filePath, body, os.ModePerm); err != nil { - fmt.Fprintf(os.Stderr, "error: %s\n", err) - } + case f, ok := <-jt.Queue: + if ok { + recursiveDownload(c, baseUrl, baseDir, f, jt) } default: - // TODO: get rid of dirty hack somehow - if ctr >= graceTimes { + if !jt.HasWork() { return } - ctr++ - time.Sleep(gracePeriod) + jobtracker.Nap() } } } + +func recursiveDownload(c *fasthttp.Client, baseUrl, baseDir, f string, jt *jobtracker.JobTracker) { + jt.StartWork() + defer jt.EndWork() + + if f == "" { + return + } + + checkRatelimted() + + filePath := utils.Url(baseDir, f) + isDir := strings.HasSuffix(f, "/") + if !isDir && utils.Exists(filePath) { + log.Info().Str("file", filePath).Msg("already fetched, skipping redownload") + return + } + uri := utils.Url(baseUrl, f) + code, body, err := c.Get(nil, uri) + if err == nil && code != 200 { + if code == 429 { + setRatelimited() + jt.AddJob(f) + return + } + log.Warn().Str("uri", uri).Int("code", code).Msg("failed to fetch file") + return + } else if err != nil { + log.Error().Str("uri", uri).Int("code", code).Err(err).Msg("failed to fetch file") + return + } + + if isDir { + if !utils.IsHtml(body) { + 
log.Warn().Str("uri", uri).Msg("not a directory index, skipping") + return + } + + indexedFiles, err := utils.GetIndexedFiles(body) + if err != nil { + log.Error().Str("uri", uri).Err(err).Msg("couldn't get list of indexed files") + return + } + log.Info().Str("uri", uri).Msg("fetched directory listing") + for _, idxf := range indexedFiles { + jt.AddJob(utils.Url(f, idxf)) + } + } else { + if err := utils.CreateParentFolders(filePath); err != nil { + log.Error().Str("file", filePath).Err(err).Msg("couldn't create parent directories") + return + } + if err := ioutil.WriteFile(filePath, body, os.ModePerm); err != nil { + log.Error().Str("file", filePath).Err(err).Msg("couldn't write to file") + return + } + log.Info().Str("uri", uri).Msg("fetched file") + } +} diff --git a/main.go b/main.go index 4320403..ce35acb 100644 --- a/main.go +++ b/main.go @@ -1,7 +1,23 @@ package main -import "github.com/deletescape/goop/cmd" +import ( + "os" -func main() { + "github.com/deletescape/goop/cmd" + "github.com/phuslu/log" +) + +func main() { + if log.IsTerminal(os.Stderr.Fd()) { + log.DefaultLogger = log.Logger{ + TimeFormat: "15:04:05", + Caller: 1, + Writer: &log.ConsoleWriter{ + ColorOutput: true, + QuoteString: true, + EndWithMessage: true, + }, + } + } cmd.Execute() } diff --git a/pkg/goop/clone.go b/pkg/goop/clone.go index e3dcc29..2b1467b 100644 --- a/pkg/goop/clone.go +++ b/pkg/goop/clone.go @@ -5,6 +5,15 @@ import ( "bytes" "crypto/tls" "fmt" + "io/ioutil" + "net/url" + "os" + "os/exec" + "path/filepath" + "strings" + "time" + + "github.com/deletescape/goop/internal/jobtracker" "github.com/deletescape/goop/internal/utils" "github.com/deletescape/goop/internal/workers" "github.com/go-git/go-billy/v5/osfs" @@ -14,17 +23,11 @@ import ( "github.com/go-git/go-git/v5/plumbing/object" "github.com/go-git/go-git/v5/storage/filesystem" "github.com/go-git/go-git/v5/storage/filesystem/dotgit" + "github.com/phuslu/log" "github.com/valyala/fasthttp" - "io/ioutil" - "net/url" - "os" - "os/exec" - "path/filepath" - "strings" - "sync" - "time" ) +// TODO: support proxy environment variables var c = &fasthttp.Client{ Name: "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.102 Safari/537.36", MaxConnsPerHost: utils.MaxInt(maxConcurrency+250, fasthttp.DefaultMaxConnsPerHost), @@ -35,18 +38,6 @@ var c = &fasthttp.Client{ MaxConnWaitTimeout: 10 * time.Second, } -var wg sync.WaitGroup - -func createQueue(scale int) chan string { - wg = sync.WaitGroup{} - return make(chan string, maxConcurrency*scale) -} - -func waitForQueue(queue chan string) { - wg.Wait() - close(queue) -} - func CloneList(listFile, baseDir string, force, keep bool) error { lf, err := os.Open(listFile) if err != nil { @@ -64,17 +55,15 @@ func CloneList(listFile, baseDir string, force, keep bool) error { if dir != "" { parsed, err := url.Parse(u) if err != nil { - fmt.Fprintf(os.Stderr, "error: %s\n", err) + log.Error().Str("uri", u).Err(err).Msg("couldn't parse uri") continue } dir = utils.Url(dir, parsed.Host) } - fmt.Printf("[-] Downloading %s to %s\n", u, dir) + log.Info().Str("target", u).Str("dir", dir).Bool("force", force).Bool("keep", keep).Msg("starting download") if err := Clone(u, dir, force, keep); err != nil { - fmt.Fprintf(os.Stderr, "error: %s\n", err) + log.Error().Str("target", u).Str("dir", dir).Bool("force", force).Bool("keep", keep).Msg("download failed") } - fmt.Println() - fmt.Println() } return nil } @@ -100,55 +89,46 @@ func Clone(u, dir string, force, keep bool) error { baseDir = 
parsed.Host } - if !utils.Exists(baseDir) { - if err := os.MkdirAll(baseDir, os.ModePerm); err != nil { + if utils.Exists(baseDir) { + if !utils.IsFolder(baseDir) { + return fmt.Errorf("%s is not a directory", baseDir) + } + isEmpty, err := utils.IsEmpty(baseDir) + if err != nil { return err } - } - if !utils.IsFolder(baseDir) { - return fmt.Errorf("%s is not a directory", dir) - } - isEmpty, err := utils.IsEmpty(baseDir) - if err != nil { - return err - } - if !isEmpty { - if force || keep { - if !keep { + if !isEmpty { + if force { if err := os.RemoveAll(baseDir); err != nil { return err } - if err := os.MkdirAll(baseDir, os.ModePerm); err != nil { - return err - } + } else if !keep { + return fmt.Errorf("%s is not empty", baseDir) } - } else { - return fmt.Errorf("%s is not empty", baseDir) } } + return FetchGit(baseUrl, baseDir) } func FetchGit(baseUrl, baseDir string) error { - fmt.Printf("[-] Testing %s/.git/HEAD ", baseUrl) + log.Info().Str("base", baseUrl).Msg("testing for .git/HEAD") code, body, err := c.Get(nil, utils.Url(baseUrl, ".git/HEAD")) - fmt.Printf("[%d]\n", code) if err != nil { return err } if code != 200 { - fmt.Fprintf(os.Stderr, "error: %s/.git/HEAD does not exist\n", baseUrl) + log.Warn().Str("base", baseUrl).Int("code", code).Msg(".git/HEAD doesn't appear to exist, clone will most likely fail") } else if !bytes.HasPrefix(body, refPrefix) { - fmt.Fprintf(os.Stderr, "error: %s/.git/HEAD is not a git HEAD file\n", baseUrl) + log.Warn().Str("base", baseUrl).Int("code", code).Msg(".git/HEAD doesn't appear to be a git HEAD file, clone will most likely fail") } - fmt.Printf("[-] Testing %s/.git/ ", baseUrl) - code, body, err = c.Get(nil, utils.Url(baseUrl, ".git/")) - fmt.Printf("[%d]\n", code) + log.Info().Str("base", baseUrl).Msg("testing if recursive download is possible") + code, body, err = c.Get(body, utils.Url(baseUrl, ".git/")) if err != nil { if utils.IgnoreError(err) { - fmt.Fprintf(os.Stderr, "error: %s\n", err) + log.Error().Str("base", baseUrl).Int("code", code).Err(err) } else { return err } @@ -160,48 +140,45 @@ func FetchGit(baseUrl, baseDir string) error { return err } if utils.StringsContain(indexedFiles, "HEAD") { - fmt.Println("[-] Fetching .git recursively") - queue := createQueue(2000) - wg.Add(maxConcurrency) + log.Info().Str("base", baseUrl).Msg("fetching .git/ recursively") + jt := jobtracker.NewJobTracker() for w := 1; w <= maxConcurrency; w++ { - go workers.RecursiveDownloadWorker(c, queue, baseUrl, baseDir, &wg) + go workers.RecursiveDownloadWorker(c, baseUrl, baseDir, jt) } for _, f := range indexedFiles { // TODO: add support for non top level git repos - queue <- utils.Url(".git", f) + jt.AddJob(utils.Url(".git", f)) } - waitForQueue(queue) - fmt.Println("[-] Running git checkout .") + jt.Wait() + + log.Info().Str("dir", baseDir).Msg("running git checkout .") cmd := exec.Command("git", "checkout", ".") cmd.Dir = baseDir return cmd.Run() } } - fmt.Println("[-] Fetching common files") - queue := createQueue(len(commonFiles)) + log.Info().Str("base", baseUrl).Msg("fetching common files") + jt := jobtracker.NewJobTracker() concurrency := utils.MinInt(maxConcurrency, len(commonFiles)) - wg.Add(concurrency) for w := 1; w <= concurrency; w++ { - go workers.DownloadWorker(c, queue, baseUrl, baseDir, &wg, false) + go workers.DownloadWorker(c, baseUrl, baseDir, jt, false, false) } for _, f := range commonFiles { - queue <- f + jt.AddJob(f) } - waitForQueue(queue) + jt.Wait() - fmt.Println("[-] Finding refs") - queue = createQueue(100) - 
wg.Add(maxConcurrency) + log.Info().Str("base", baseUrl).Msg("finding refs") for w := 1; w <= maxConcurrency; w++ { - go workers.FindRefWorker(c, queue, baseUrl, baseDir, &wg) + go workers.FindRefWorker(c, baseUrl, baseDir, jt) } for _, ref := range commonRefs { - queue <- ref + jt.AddJob(ref) } - waitForQueue(queue) + jt.Wait() - fmt.Println("[-] Finding packs") + log.Info().Str("base", baseUrl).Msg("finding packs") infoPacksPath := utils.Url(baseDir, ".git/objects/info/packs") if utils.Exists(infoPacksPath) { infoPacks, err := ioutil.ReadFile(infoPacksPath) @@ -209,20 +186,19 @@ func FetchGit(baseUrl, baseDir string) error { return err } hashes := packRegex.FindAllSubmatch(infoPacks, -1) - queue = createQueue(len(hashes) * 3) + jt = jobtracker.NewJobTracker() concurrency := utils.MinInt(maxConcurrency, len(hashes)) - wg.Add(concurrency) for w := 1; w <= concurrency; w++ { - go workers.DownloadWorker(c, queue, baseUrl, baseDir, &wg, false) + go workers.DownloadWorker(c, baseUrl, baseDir, jt, false, false) } for _, sha1 := range hashes { - queue <- fmt.Sprintf(".git/objects/pack/pack-%s.idx", sha1[1]) - queue <- fmt.Sprintf(".git/objects/pack/pack-%s.pack", sha1[1]) + jt.AddJob(fmt.Sprintf(".git/objects/pack/pack-%s.idx", sha1[1])) + jt.AddJob(fmt.Sprintf(".git/objects/pack/pack-%s.pack", sha1[1])) } - waitForQueue(queue) + jt.Wait() } - fmt.Println("[-] Finding objects") + log.Info().Str("base", baseUrl).Msg("finding objects") objs := make(map[string]bool) // object "set" //var packed_objs [][]byte @@ -262,11 +238,11 @@ func FetchGit(baseUrl, baseDir string) error { refName := strings.TrimPrefix(path, refLogPrefix) filePath := utils.Url(gitRefsDir, refName) if !utils.Exists(filePath) { - fmt.Println("[-] Generating ref file for", refName) + log.Info().Str("dir", baseDir).Str("ref", refName).Msg("generating ref file") content, err := ioutil.ReadFile(path) if err != nil { - fmt.Fprintf(os.Stderr, "error: %s\n", err) + log.Error().Str("dir", baseDir).Str("ref", refName).Err(err).Msg("couldn't read reflog file") return nil } @@ -275,12 +251,12 @@ func FetchGit(baseUrl, baseDir string) error { lastEntryObj := logObjs[len(logObjs)-1][1] if err := utils.CreateParentFolders(filePath); err != nil { - fmt.Fprintf(os.Stderr, "error: %s\n", err) + log.Error().Str("file", filePath).Err(err).Msg("couldn't create parent directories") return nil } if err := ioutil.WriteFile(filePath, lastEntryObj, os.ModePerm); err != nil { - fmt.Fprintf(os.Stderr, "error: %s\n", err) + log.Error().Str("file", filePath).Err(err).Msg("couldn't write to file") } } } @@ -297,7 +273,7 @@ func FetchGit(baseUrl, baseDir string) error { content, err := ioutil.ReadFile(f) if err != nil { - fmt.Fprintf(os.Stderr, "error: %s\n", err) + log.Error().Str("file", f).Err(err).Msg("couldn't read reflog file") return err } @@ -316,8 +292,7 @@ func FetchGit(baseUrl, baseDir string) error { var idx index.Index decoder := index.NewDecoder(f) if err := decoder.Decode(&idx); err != nil { - fmt.Fprintf(os.Stderr, "error: %s\n", err) - //return err + log.Error().Str("dir", baseDir).Err(err).Msg("couldn't decode git index") } for _, entry := range idx.Entries { objs[entry.Hash.String()] = true @@ -329,63 +304,66 @@ func FetchGit(baseUrl, baseDir string) error { objs[hash.String()] = true encObj, err := storage.EncodedObject(plumbing.AnyObject, hash) if err != nil { - return fmt.Errorf("error: %s\n", err) + return err } decObj, err := object.DecodeObject(storage, encObj) if err != nil { - return fmt.Errorf("error: %s\n", err) + return err } for 
_, hash := range utils.GetReferencedHashes(decObj) { objs[hash] = true } return nil }); err != nil { - fmt.Fprintf(os.Stderr, "error: %s\n", err) + log.Error().Str("dir", baseDir).Err(err).Msg("error while processing object files") } // TODO: find more objects to fetch in pack files and remove packed objects from list of objects to be fetched /*for _, pack := range storage.ObjectPacks() { storage.IterEncodedObjects() }*/ - fmt.Println("[-] Fetching objects") - queue = createQueue(2000) - wg.Add(maxConcurrency) + log.Info().Str("base", baseUrl).Msg("fetching object") for w := 1; w <= maxConcurrency; w++ { - go workers.FindObjectsWorker(c, queue, baseUrl, baseDir, &wg, storage) + go workers.FindObjectsWorker(c, baseUrl, baseDir, jt, storage) } for obj := range objs { - queue <- obj + jt.AddJob(obj) } - waitForQueue(queue) + jt.Wait() - fmt.Println("[-] Running git checkout .") + // TODO: does this even make sense??????? + if !utils.Exists(baseDir) { + return nil + } + + log.Info().Str("dir", baseDir).Msg("running git checkout .") cmd := exec.Command("git", "checkout", ".") cmd.Dir = baseDir stderr := &bytes.Buffer{} cmd.Stderr = stderr if err := cmd.Run(); err != nil { - if exErr, ok := err.(*exec.ExitError); ok && exErr.ProcessState.ExitCode() == 255 || exErr.ProcessState.ExitCode() == 128 { - fmt.Println("[-] Attempting to fetch missing files") + if exErr, ok := err.(*exec.ExitError); ok && (exErr.ProcessState.ExitCode() == 255 || exErr.ProcessState.ExitCode() == 128) { + log.Info().Str("base", baseUrl).Str("dir", baseDir).Msg("attempting to fetch missing files") out, err := ioutil.ReadAll(stderr) if err != nil { return err } errors := stdErrRegex.FindAllSubmatch(out, -1) - queue = createQueue(len(errors) * 3) + jt = jobtracker.NewJobTracker() concurrency := utils.MinInt(maxConcurrency, len(errors)) - wg.Add(concurrency) for w := 1; w <= concurrency; w++ { - go workers.DownloadWorker(c, queue, baseUrl, baseDir, &wg, true) + go workers.DownloadWorker(c, baseUrl, baseDir, jt, true, true) } for _, e := range errors { if !bytes.HasSuffix(e[1], phpSuffix) { - queue <- string(e[1]) + jt.AddJob(string(e[1])) } } - waitForQueue(queue) + jt.Wait() // Fetch files marked as missing in status + // TODO: why do we parse status AND decode index ??????? 
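Aside: the checkout fallback above keys off git's exit status — codes 128 and 255 are treated as "files are missing", and stderr is then scraped for the paths to re-fetch. A hedged sketch of just that exit-code handling, with a placeholder directory:

```go
package main

import (
	"bytes"
	"os/exec"

	"github.com/phuslu/log"
)

func main() {
	cmd := exec.Command("git", "checkout", ".")
	cmd.Dir = "./example-dump" // placeholder directory
	stderr := &bytes.Buffer{}
	cmd.Stderr = stderr

	if err := cmd.Run(); err != nil {
		if exErr, ok := err.(*exec.ExitError); ok {
			code := exErr.ProcessState.ExitCode()
			if code == 128 || code == 255 {
				// git couldn't check out some paths; stderr now names the missing files
				log.Warn().Int("code", code).Msg("checkout incomplete, stderr lists files to re-fetch")
				return
			}
		}
		log.Error().Err(err).Msg("checkout failed")
	}
}
```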
cmd := exec.Command("git", "status") cmd.Dir = baseDir stdout := &bytes.Buffer{} @@ -398,18 +376,17 @@ func FetchGit(baseUrl, baseDir string) error { return err } deleted := statusRegex.FindAllSubmatch(out, -1) - queue = createQueue(len(deleted) * 3) concurrency = utils.MinInt(maxConcurrency, len(deleted)) - wg.Add(concurrency) + jt = jobtracker.NewJobTracker() for w := 1; w <= concurrency; w++ { - go workers.DownloadWorker(c, queue, baseUrl, baseDir, &wg, true) + go workers.DownloadWorker(c, baseUrl, baseDir, jt, true, true) } for _, e := range deleted { if !bytes.HasSuffix(e[1], phpSuffix) { - queue <- string(e[1]) + jt.AddJob(string(e[1])) } } - waitForQueue(queue) + jt.Wait() } // Iterate over index to find missing files @@ -425,18 +402,17 @@ func FetchGit(baseUrl, baseDir string) error { fmt.Fprintf(os.Stderr, "error: %s\n", err) //return err } - queue = createQueue(len(idx.Entries) * 3) concurrency = utils.MinInt(maxConcurrency, len(idx.Entries)) - wg.Add(concurrency) + jt = jobtracker.NewJobTracker() for w := 1; w <= concurrency; w++ { - go workers.DownloadWorker(c, queue, baseUrl, baseDir, &wg, true) + go workers.DownloadWorker(c, baseUrl, baseDir, jt, true, true) } for _, entry := range idx.Entries { if !strings.HasSuffix(entry.Name, ".php") && !utils.Exists(utils.Url(baseDir, entry.Name)) { - queue <- entry.Name + jt.AddJob(entry.Name) } } - waitForQueue(queue) + jt.Wait() } } else {
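For reference, the exported entry points driven by cmd/goop.go keep their signatures under this refactor. A minimal example of calling goop as a library, with a placeholder target URL and output directory:

```go
package main

import (
	"os"

	"github.com/deletescape/goop/pkg/goop"
	"github.com/phuslu/log"
)

func main() {
	// placeholder target and output directory; force=false, keep=false
	if err := goop.Clone("http://example.com/", "example.com-dump", false, false); err != nil {
		log.Error().Err(err).Msg("exiting")
		os.Exit(1)
	}
}
```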