From 557ddced6c8ffbbe57f46958bf382a7e49b1de1d Mon Sep 17 00:00:00 2001 From: Rhea Ghosh Date: Fri, 6 Oct 2023 09:47:03 -0500 Subject: [PATCH] {ipn/ipnlocal, taildrop}: move put logic to taildrop (#9680) Cleaning up taildrop logic for sending files. Updates tailscale/corp#14772 Signed-off-by: Rhea Ghosh Co-authored-by: Joe Tsai --- ipn/ipnlocal/local.go | 19 +-- ipn/ipnlocal/peerapi.go | 178 +------------------------ ipn/ipnlocal/peerapi_test.go | 92 +------------ taildrop/{receiver.go => retrieve.go} | 4 +- taildrop/send.go | 182 ++++++++++++++++++++++++++ taildrop/taildrop.go | 32 ++++- taildrop/taildrop_test.go | 88 +++++++++++++ 7 files changed, 312 insertions(+), 283 deletions(-) rename taildrop/{receiver.go => retrieve.go} (99%) create mode 100644 taildrop/send.go create mode 100644 taildrop/taildrop_test.go diff --git a/ipn/ipnlocal/local.go b/ipn/ipnlocal/local.go index 031e399d4..b92d45958 100644 --- a/ipn/ipnlocal/local.go +++ b/ipn/ipnlocal/local.go @@ -239,7 +239,6 @@ type LocalBackend struct { peerAPIServer *peerAPIServer // or nil peerAPIListeners []*peerAPIListener loginFlags controlclient.LoginFlags - incomingFiles map[*incomingFile]bool fileWaiters set.HandleSet[context.CancelFunc] // of wake-up funcs notifyWatchers set.HandleSet[*watchSession] lastStatusTime time.Time // status.AsOf value of the last processed status update @@ -2213,10 +2212,7 @@ func (b *LocalBackend) sendFileNotify() { // Make sure we always set n.IncomingFiles non-nil so it gets encoded // in JSON to clients. They distinguish between empty and non-nil // to know whether a Notify should be able about files. - n.IncomingFiles = make([]ipn.PartialFile, 0) - for f := range b.incomingFiles { - n.IncomingFiles = append(n.IncomingFiles, f.PartialFile()) - } + n.IncomingFiles = apiSrv.taildrop.IncomingFiles() b.mu.Unlock() sort.Slice(n.IncomingFiles, func(i, j int) bool { @@ -4590,19 +4586,6 @@ func (b *LocalBackend) SetDNS(ctx context.Context, name, value string) error { return cc.SetDNS(ctx, req) } -func (b *LocalBackend) registerIncomingFile(inf *incomingFile, active bool) { - b.mu.Lock() - defer b.mu.Unlock() - if b.incomingFiles == nil { - b.incomingFiles = make(map[*incomingFile]bool) - } - if active { - b.incomingFiles[inf] = true - } else { - delete(b.incomingFiles, inf) - } -} - func peerAPIPorts(peer tailcfg.NodeView) (p4, p6 uint16) { svcs := peer.Hostinfo().Services() for i := range svcs.LenIter() { diff --git a/ipn/ipnlocal/peerapi.go b/ipn/ipnlocal/peerapi.go index 71ce4f50f..f9c194fc8 100644 --- a/ipn/ipnlocal/peerapi.go +++ b/ipn/ipnlocal/peerapi.go @@ -15,7 +15,6 @@ import ( "net" "net/http" "net/netip" - "net/url" "os" "runtime" "slices" @@ -39,10 +38,8 @@ import ( "tailscale.com/net/sockstats" "tailscale.com/tailcfg" "tailscale.com/taildrop" - "tailscale.com/tstime" "tailscale.com/types/views" "tailscale.com/util/clientmetric" - "tailscale.com/version/distro" "tailscale.com/wgengine/filter" ) @@ -586,64 +583,6 @@ func (h *peerAPIHandler) handleServeSockStats(w http.ResponseWriter, r *http.Req fmt.Fprintln(w, "") } -type incomingFile struct { - clock tstime.Clock - - name string // "foo.jpg" - started time.Time - size int64 // or -1 if unknown; never 0 - w io.Writer // underlying writer - sendFileNotify func() // called when done - partialPath string // non-empty in direct mode - - mu sync.Mutex - copied int64 - done bool - lastNotify time.Time -} - -func (f *incomingFile) markAndNotifyDone() { - f.mu.Lock() - f.done = true - f.mu.Unlock() - f.sendFileNotify() -} - -func (f *incomingFile) Write(p []byte) (n int, err error) { - n, err = f.w.Write(p) - - var needNotify bool - defer func() { - if needNotify { - f.sendFileNotify() - } - }() - if n > 0 { - f.mu.Lock() - defer f.mu.Unlock() - f.copied += int64(n) - now := f.clock.Now() - if f.lastNotify.IsZero() || now.Sub(f.lastNotify) > time.Second { - f.lastNotify = now - needNotify = true - } - } - return n, err -} - -func (f *incomingFile) PartialFile() ipn.PartialFile { - f.mu.Lock() - defer f.mu.Unlock() - return ipn.PartialFile{ - Name: f.name, - Started: f.started, - DeclaredSize: f.size, - Received: f.copied, - PartialPath: f.partialPath, - Done: f.done, - } -} - // canPutFile reports whether h can put a file ("Taildrop") to this node. func (h *peerAPIHandler) canPutFile() bool { if h.peerNode.UnsignedPeerAPIOnly() { @@ -687,10 +626,6 @@ func (h *peerAPIHandler) peerHasCap(wantCap tailcfg.PeerCapability) bool { } func (h *peerAPIHandler) handlePeerPut(w http.ResponseWriter, r *http.Request) { - if !envknob.CanTaildrop() { - http.Error(w, "Taildrop disabled on device", http.StatusForbidden) - return - } if !h.canPutFile() { http.Error(w, "Taildrop access denied", http.StatusForbidden) return @@ -699,117 +634,12 @@ func (h *peerAPIHandler) handlePeerPut(w http.ResponseWriter, r *http.Request) { http.Error(w, "file sharing not enabled by Tailscale admin", http.StatusForbidden) return } - if r.Method != "PUT" { - http.Error(w, "expected method PUT", http.StatusMethodNotAllowed) - return - } - if mayDeref(h.ps.taildrop).RootDir == "" { - http.Error(w, taildrop.ErrNoTaildrop.Error(), http.StatusInternalServerError) - return - } - if distro.Get() == distro.Unraid && !h.ps.taildrop.DirectFileMode { - http.Error(w, "Taildrop folder not configured or accessible", http.StatusInternalServerError) - return - } - rawPath := r.URL.EscapedPath() - suffix, ok := strings.CutPrefix(rawPath, "/v0/put/") - if !ok { - http.Error(w, "misconfigured internals", 500) - return - } - if suffix == "" { - http.Error(w, "empty filename", 400) - return - } - if strings.Contains(suffix, "/") { - http.Error(w, "directories not supported", 400) - return - } - baseName, err := url.PathUnescape(suffix) - if err != nil { - http.Error(w, "bad path encoding", 400) - return - } - dstFile, ok := h.ps.taildrop.DiskPath(baseName) - if !ok { - http.Error(w, "bad filename", 400) - return - } t0 := h.ps.b.clock.Now() - // TODO(bradfitz): prevent same filename being sent by two peers at once - - // prevent same filename being sent twice - if _, err := os.Stat(dstFile); err == nil { - http.Error(w, "file exists", http.StatusConflict) - return + n, ok := h.ps.taildrop.HandlePut(w, r) + if ok { + d := h.ps.b.clock.Since(t0).Round(time.Second / 10) + h.logf("got put of %s in %v from %v/%v", approxSize(n), d, h.remoteAddr.Addr(), h.peerNode.ComputedName) } - - partialFile := dstFile + taildrop.PartialSuffix - f, err := os.Create(partialFile) - if err != nil { - h.logf("put Create error: %v", taildrop.RedactErr(err)) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - var success bool - defer func() { - if !success { - os.Remove(partialFile) - } - }() - var finalSize int64 - var inFile *incomingFile - if r.ContentLength != 0 { - inFile = &incomingFile{ - clock: h.ps.b.clock, - name: baseName, - started: h.ps.b.clock.Now(), - size: r.ContentLength, - w: f, - sendFileNotify: h.ps.b.sendFileNotify, - } - if h.ps.taildrop.DirectFileMode { - inFile.partialPath = partialFile - } - h.ps.b.registerIncomingFile(inFile, true) - defer h.ps.b.registerIncomingFile(inFile, false) - n, err := io.Copy(inFile, r.Body) - if err != nil { - err = taildrop.RedactErr(err) - f.Close() - h.logf("put Copy error: %v", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - finalSize = n - } - if err := taildrop.RedactErr(f.Close()); err != nil { - h.logf("put Close error: %v", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - if h.ps.taildrop.DirectFileMode && !h.ps.taildrop.DirectFileDoFinalRename { - if inFile != nil { // non-zero length; TODO: notify even for zero length - inFile.markAndNotifyDone() - } - } else { - if err := os.Rename(partialFile, dstFile); err != nil { - err = taildrop.RedactErr(err) - h.logf("put final rename: %v", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - } - - d := h.ps.b.clock.Since(t0).Round(time.Second / 10) - h.logf("got put of %s in %v from %v/%v", approxSize(finalSize), d, h.remoteAddr.Addr(), h.peerNode.ComputedName) - - // TODO: set modtime - // TODO: some real response - success = true - io.WriteString(w, "{}\n") - h.ps.taildrop.KnownEmpty.Store(false) - h.ps.b.sendFileNotify() } func approxSize(n int64) string { diff --git a/ipn/ipnlocal/peerapi_test.go b/ipn/ipnlocal/peerapi_test.go index 776897c50..71dfb3386 100644 --- a/ipn/ipnlocal/peerapi_test.go +++ b/ipn/ipnlocal/peerapi_test.go @@ -15,7 +15,6 @@ import ( "net/netip" "os" "path/filepath" - "runtime" "strings" "testing" @@ -494,7 +493,10 @@ func TestHandlePeerAPI(t *testing.T) { if !tt.omitRoot { rootDir = t.TempDir() if e.ph.ps.taildrop == nil { - e.ph.ps.taildrop = &taildrop.Handler{} + e.ph.ps.taildrop = &taildrop.Handler{ + Logf: e.logBuf.Logf, + Clock: &tstest.Clock{}, + } } e.ph.ps.taildrop.RootDir = rootDir } @@ -579,92 +581,6 @@ func TestFileDeleteRace(t *testing.T) { } } -// Tests "foo.jpg.deleted" marks (for Windows). -func TestDeletedMarkers(t *testing.T) { - dir := t.TempDir() - ps := &peerAPIServer{ - b: &LocalBackend{ - logf: t.Logf, - capFileSharing: true, - }, - taildrop: &taildrop.Handler{ - RootDir: dir, - }, - } - - nothingWaiting := func() { - t.Helper() - ps.taildrop.KnownEmpty.Store(false) - if ps.taildrop.HasFilesWaiting() { - t.Fatal("unexpected files waiting") - } - } - touch := func(base string) { - t.Helper() - if err := taildrop.TouchFile(filepath.Join(dir, base)); err != nil { - t.Fatal(err) - } - } - wantEmptyTempDir := func() { - t.Helper() - if fis, err := os.ReadDir(dir); err != nil { - t.Fatal(err) - } else if len(fis) > 0 && runtime.GOOS != "windows" { - for _, fi := range fis { - t.Errorf("unexpected file in tempdir: %q", fi.Name()) - } - } - } - - nothingWaiting() - wantEmptyTempDir() - - touch("foo.jpg.deleted") - nothingWaiting() - wantEmptyTempDir() - - touch("foo.jpg.deleted") - touch("foo.jpg") - nothingWaiting() - wantEmptyTempDir() - - touch("foo.jpg.deleted") - touch("foo.jpg") - wf, err := ps.taildrop.WaitingFiles() - if err != nil { - t.Fatal(err) - } - if len(wf) != 0 { - t.Fatalf("WaitingFiles = %d; want 0", len(wf)) - } - wantEmptyTempDir() - - touch("foo.jpg.deleted") - touch("foo.jpg") - if rc, _, err := ps.taildrop.OpenFile("foo.jpg"); err == nil { - rc.Close() - t.Fatal("unexpected foo.jpg open") - } - wantEmptyTempDir() - - // And verify basics still work in non-deleted cases. - touch("foo.jpg") - touch("bar.jpg.deleted") - if wf, err := ps.taildrop.WaitingFiles(); err != nil { - t.Error(err) - } else if len(wf) != 1 { - t.Errorf("WaitingFiles = %d; want 1", len(wf)) - } else if wf[0].Name != "foo.jpg" { - t.Errorf("unexpected waiting file %+v", wf[0]) - } - if rc, _, err := ps.taildrop.OpenFile("foo.jpg"); err != nil { - t.Fatal(err) - } else { - rc.Close() - } - -} - func TestPeerAPIReplyToDNSQueries(t *testing.T) { var h peerAPIHandler diff --git a/taildrop/receiver.go b/taildrop/retrieve.go similarity index 99% rename from taildrop/receiver.go rename to taildrop/retrieve.go index def924297..a472ee270 100644 --- a/taildrop/receiver.go +++ b/taildrop/retrieve.go @@ -25,7 +25,7 @@ func (s *Handler) HasFilesWaiting() bool { if s == nil || s.RootDir == "" || s.DirectFileMode { return false } - if s.KnownEmpty.Load() { + if s.knownEmpty.Load() { // Optimization: this is usually empty, so avoid opening // the directory and checking. We can't cache the actual // has-files-or-not values as the macOS/iOS client might @@ -66,7 +66,7 @@ func (s *Handler) HasFilesWaiting() bool { } } if err == io.EOF { - s.KnownEmpty.Store(true) + s.knownEmpty.Store(true) } if err != nil { break diff --git a/taildrop/send.go b/taildrop/send.go new file mode 100644 index 000000000..7546eeeef --- /dev/null +++ b/taildrop/send.go @@ -0,0 +1,182 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +package taildrop + +import ( + "io" + "net/http" + "net/url" + "os" + "strings" + "sync" + "time" + + "tailscale.com/envknob" + "tailscale.com/tstime" + "tailscale.com/version/distro" +) + +type incomingFile struct { + clock tstime.Clock + + name string // "foo.jpg" + started time.Time + size int64 // or -1 if unknown; never 0 + w io.Writer // underlying writer + sendFileNotify func() // called when done + partialPath string // non-empty in direct mode + + mu sync.Mutex + copied int64 + done bool + lastNotify time.Time +} + +func (f *incomingFile) markAndNotifyDone() { + f.mu.Lock() + f.done = true + f.mu.Unlock() + f.sendFileNotify() +} + +func (f *incomingFile) Write(p []byte) (n int, err error) { + n, err = f.w.Write(p) + + var needNotify bool + defer func() { + if needNotify { + f.sendFileNotify() + } + }() + if n > 0 { + f.mu.Lock() + defer f.mu.Unlock() + f.copied += int64(n) + now := f.clock.Now() + if f.lastNotify.IsZero() || now.Sub(f.lastNotify) > time.Second { + f.lastNotify = now + needNotify = true + } + } + return n, err +} + +// HandlePut receives a file. +// It returns the number of bytes received and whether it was received successfully. +func (h *Handler) HandlePut(w http.ResponseWriter, r *http.Request) (finalSize int64, success bool) { + if !envknob.CanTaildrop() { + http.Error(w, "Taildrop disabled on device", http.StatusForbidden) + return finalSize, success + } + if r.Method != "PUT" { + http.Error(w, "expected method PUT", http.StatusMethodNotAllowed) + return finalSize, success + } + if h == nil || h.RootDir == "" { + http.Error(w, ErrNoTaildrop.Error(), http.StatusInternalServerError) + return finalSize, success + } + if distro.Get() == distro.Unraid && !h.DirectFileMode { + http.Error(w, "Taildrop folder not configured or accessible", http.StatusInternalServerError) + return finalSize, success + } + rawPath := r.URL.EscapedPath() + suffix, ok := strings.CutPrefix(rawPath, "/v0/put/") + if !ok { + http.Error(w, "misconfigured internals", http.StatusInternalServerError) + return finalSize, success + } + if suffix == "" { + http.Error(w, "empty filename", http.StatusBadRequest) + return finalSize, success + } + if strings.Contains(suffix, "/") { + http.Error(w, "directories not supported", http.StatusBadRequest) + return finalSize, success + } + baseName, err := url.PathUnescape(suffix) + if err != nil { + http.Error(w, "bad path encoding", http.StatusBadRequest) + return finalSize, success + } + dstFile, ok := h.DiskPath(baseName) + if !ok { + http.Error(w, "bad filename", 400) + return finalSize, success + } + // TODO(bradfitz): prevent same filename being sent by two peers at once + + // prevent same filename being sent twice + if _, err := os.Stat(dstFile); err == nil { + http.Error(w, "file exists", http.StatusConflict) + return finalSize, success + } + + partialFile := dstFile + PartialSuffix + f, err := os.Create(partialFile) + if err != nil { + h.Logf("put Create error: %v", RedactErr(err)) + http.Error(w, err.Error(), http.StatusInternalServerError) + return finalSize, success + } + defer func() { + if !success { + os.Remove(partialFile) + } + }() + var inFile *incomingFile + sendFileNotify := h.SendFileNotify + if sendFileNotify == nil { + sendFileNotify = func() {} // avoid nil panics below + } + if r.ContentLength != 0 { + inFile = &incomingFile{ + clock: h.Clock, + name: baseName, + started: h.Clock.Now(), + size: r.ContentLength, + w: f, + sendFileNotify: sendFileNotify, + } + if h.DirectFileMode { + inFile.partialPath = partialFile + } + h.incomingFiles.Store(inFile, struct{}{}) + defer h.incomingFiles.Delete(inFile) + n, err := io.Copy(inFile, r.Body) + if err != nil { + err = RedactErr(err) + f.Close() + h.Logf("put Copy error: %v", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return finalSize, success + } + finalSize = n + } + if err := RedactErr(f.Close()); err != nil { + h.Logf("put Close error: %v", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return finalSize, success + } + if h.DirectFileMode && !h.DirectFileDoFinalRename { + if inFile != nil { // non-zero length; TODO: notify even for zero length + inFile.markAndNotifyDone() + } + } else { + if err := os.Rename(partialFile, dstFile); err != nil { + err = RedactErr(err) + h.Logf("put final rename: %v", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return finalSize, success + } + } + + // TODO: set modtime + // TODO: some real response + success = true + io.WriteString(w, "{}\n") + h.knownEmpty.Store(false) + sendFileNotify() + return finalSize, success +} diff --git a/taildrop/taildrop.go b/taildrop/taildrop.go index 1c9caf389..46342f1a4 100644 --- a/taildrop/taildrop.go +++ b/taildrop/taildrop.go @@ -15,6 +15,8 @@ import ( "unicode" "unicode/utf8" + "tailscale.com/ipn" + "tailscale.com/syncs" "tailscale.com/tstime" "tailscale.com/types/logger" "tailscale.com/util/multierr" @@ -41,7 +43,14 @@ type Handler struct { // it's received. DirectFileDoFinalRename bool - KnownEmpty atomic.Bool + // SendFileNotify is called periodically while a file is actively + // receiving the contents for the file. There is a final call + // to the function when reception completes. + SendFileNotify func() + + knownEmpty atomic.Bool + + incomingFiles syncs.Map[*incomingFile, struct{}] } var ( @@ -113,6 +122,27 @@ func (s *Handler) DiskPath(baseName string) (fullPath string, ok bool) { return filepath.Join(s.RootDir, baseName), true } +func (s *Handler) IncomingFiles() []ipn.PartialFile { + // Make sure we always set n.IncomingFiles non-nil so it gets encoded + // in JSON to clients. They distinguish between empty and non-nil + // to know whether a Notify should be able about files. + files := make([]ipn.PartialFile, 0) + s.incomingFiles.Range(func(f *incomingFile, _ struct{}) bool { + f.mu.Lock() + defer f.mu.Unlock() + files = append(files, ipn.PartialFile{ + Name: f.name, + Started: f.started, + DeclaredSize: f.size, + Received: f.copied, + PartialPath: f.partialPath, + Done: f.done, + }) + return true + }) + return files +} + type redactedErr struct { msg string inner error diff --git a/taildrop/taildrop_test.go b/taildrop/taildrop_test.go new file mode 100644 index 000000000..3f37f3582 --- /dev/null +++ b/taildrop/taildrop_test.go @@ -0,0 +1,88 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +package taildrop + +import ( + "os" + "path/filepath" + "runtime" + "testing" +) + +// Tests "foo.jpg.deleted" marks (for Windows). +func TestDeletedMarkers(t *testing.T) { + dir := t.TempDir() + h := &Handler{RootDir: dir} + + nothingWaiting := func() { + t.Helper() + h.knownEmpty.Store(false) + if h.HasFilesWaiting() { + t.Fatal("unexpected files waiting") + } + } + touch := func(base string) { + t.Helper() + if err := TouchFile(filepath.Join(dir, base)); err != nil { + t.Fatal(err) + } + } + wantEmptyTempDir := func() { + t.Helper() + if fis, err := os.ReadDir(dir); err != nil { + t.Fatal(err) + } else if len(fis) > 0 && runtime.GOOS != "windows" { + for _, fi := range fis { + t.Errorf("unexpected file in tempdir: %q", fi.Name()) + } + } + } + + nothingWaiting() + wantEmptyTempDir() + + touch("foo.jpg.deleted") + nothingWaiting() + wantEmptyTempDir() + + touch("foo.jpg.deleted") + touch("foo.jpg") + nothingWaiting() + wantEmptyTempDir() + + touch("foo.jpg.deleted") + touch("foo.jpg") + wf, err := h.WaitingFiles() + if err != nil { + t.Fatal(err) + } + if len(wf) != 0 { + t.Fatalf("WaitingFiles = %d; want 0", len(wf)) + } + wantEmptyTempDir() + + touch("foo.jpg.deleted") + touch("foo.jpg") + if rc, _, err := h.OpenFile("foo.jpg"); err == nil { + rc.Close() + t.Fatal("unexpected foo.jpg open") + } + wantEmptyTempDir() + + // And verify basics still work in non-deleted cases. + touch("foo.jpg") + touch("bar.jpg.deleted") + if wf, err := h.WaitingFiles(); err != nil { + t.Error(err) + } else if len(wf) != 1 { + t.Errorf("WaitingFiles = %d; want 1", len(wf)) + } else if wf[0].Name != "foo.jpg" { + t.Errorf("unexpected waiting file %+v", wf[0]) + } + if rc, _, err := h.OpenFile("foo.jpg"); err != nil { + t.Fatal(err) + } else { + rc.Close() + } +}