diff --git a/cmd/tailscaled/depaware.txt b/cmd/tailscaled/depaware.txt index eeb57a3a8..e363278d5 100644 --- a/cmd/tailscaled/depaware.txt +++ b/cmd/tailscaled/depaware.txt @@ -348,6 +348,7 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de 💣 tailscale.com/util/deephash from tailscale.com/ipn/ipnlocal+ L 💣 tailscale.com/util/dirwalk from tailscale.com/metrics+ tailscale.com/util/dnsname from tailscale.com/hostinfo+ + tailscale.com/util/execqueue from tailscale.com/control/controlclient tailscale.com/util/goroutines from tailscale.com/ipn/ipnlocal tailscale.com/util/groupmember from tailscale.com/ipn/ipnauth+ 💣 tailscale.com/util/hashx from tailscale.com/util/deephash diff --git a/control/controlclient/auto.go b/control/controlclient/auto.go index 86b03efa5..d0551cdab 100644 --- a/control/controlclient/auto.go +++ b/control/controlclient/auto.go @@ -22,6 +22,7 @@ import ( "tailscale.com/types/netmap" "tailscale.com/types/persist" "tailscale.com/types/structs" + "tailscale.com/util/execqueue" ) type LoginGoal struct { @@ -118,7 +119,7 @@ type Auto struct { closed bool updateCh chan struct{} // readable when we should inform the server of a change observer Observer // called to update Client status; always non-nil - observerQueue execQueue + observerQueue execqueue.ExecQueue unregisterHealthWatch func() @@ -675,7 +676,7 @@ func (c *Auto) Shutdown() { direct := c.direct if !closed { c.closed = true - c.observerQueue.shutdown() + c.observerQueue.Shutdown() c.cancelAuthCtxLocked() c.cancelMapCtxLocked() for _, w := range c.unpauseWaiters { @@ -696,7 +697,7 @@ func (c *Auto) Shutdown() { } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - c.observerQueue.wait(ctx) + c.observerQueue.Wait(ctx) c.logf("Client.Shutdown done.") } } @@ -737,95 +738,3 @@ func (c *Auto) DoNoiseRequest(req *http.Request) (*http.Response, error) { func (c *Auto) GetSingleUseNoiseRoundTripper(ctx context.Context) (http.RoundTripper, *tailcfg.EarlyNoise, error) { return c.direct.GetSingleUseNoiseRoundTripper(ctx) } - -type execQueue struct { - mu sync.Mutex - closed bool - inFlight bool // whether a goroutine is running q.run - doneWaiter chan struct{} // non-nil if waiter is waiting, then closed - queue []func() -} - -func (q *execQueue) Add(f func()) { - q.mu.Lock() - defer q.mu.Unlock() - if q.closed { - return - } - if q.inFlight { - q.queue = append(q.queue, f) - } else { - q.inFlight = true - go q.run(f) - } -} - -// RunSync waits for the queue to be drained and then synchronously runs f. -// It returns an error if the queue is closed before f is run or ctx expires. -func (q *execQueue) RunSync(ctx context.Context, f func()) error { - for { - if err := q.wait(ctx); err != nil { - return err - } - q.mu.Lock() - if q.inFlight { - q.mu.Unlock() - continue - } - defer q.mu.Unlock() - if q.closed { - return errors.New("closed") - } - f() - return nil - } -} - -func (q *execQueue) run(f func()) { - f() - - q.mu.Lock() - for len(q.queue) > 0 && !q.closed { - f := q.queue[0] - q.queue[0] = nil - q.queue = q.queue[1:] - q.mu.Unlock() - f() - q.mu.Lock() - } - q.inFlight = false - q.queue = nil - if q.doneWaiter != nil { - close(q.doneWaiter) - q.doneWaiter = nil - } - q.mu.Unlock() -} - -func (q *execQueue) shutdown() { - q.mu.Lock() - defer q.mu.Unlock() - q.closed = true -} - -// wait waits for the queue to be empty. -func (q *execQueue) wait(ctx context.Context) error { - q.mu.Lock() - waitCh := q.doneWaiter - if q.inFlight && waitCh == nil { - waitCh = make(chan struct{}) - q.doneWaiter = waitCh - } - q.mu.Unlock() - - if waitCh == nil { - return nil - } - - select { - case <-waitCh: - return nil - case <-ctx.Done(): - return ctx.Err() - } -} diff --git a/util/execqueue/execqueue.go b/util/execqueue/execqueue.go new file mode 100644 index 000000000..889cea255 --- /dev/null +++ b/util/execqueue/execqueue.go @@ -0,0 +1,104 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +// Package execqueue implements an ordered asynchronous queue for executing functions. +package execqueue + +import ( + "context" + "errors" + "sync" +) + +type ExecQueue struct { + mu sync.Mutex + closed bool + inFlight bool // whether a goroutine is running q.run + doneWaiter chan struct{} // non-nil if waiter is waiting, then closed + queue []func() +} + +func (q *ExecQueue) Add(f func()) { + q.mu.Lock() + defer q.mu.Unlock() + if q.closed { + return + } + if q.inFlight { + q.queue = append(q.queue, f) + } else { + q.inFlight = true + go q.run(f) + } +} + +// RunSync waits for the queue to be drained and then synchronously runs f. +// It returns an error if the queue is closed before f is run or ctx expires. +func (q *ExecQueue) RunSync(ctx context.Context, f func()) error { + for { + if err := q.Wait(ctx); err != nil { + return err + } + q.mu.Lock() + if q.inFlight { + q.mu.Unlock() + continue + } + defer q.mu.Unlock() + if q.closed { + return errors.New("closed") + } + f() + return nil + } +} + +func (q *ExecQueue) run(f func()) { + f() + + q.mu.Lock() + for len(q.queue) > 0 && !q.closed { + f := q.queue[0] + q.queue[0] = nil + q.queue = q.queue[1:] + q.mu.Unlock() + f() + q.mu.Lock() + } + q.inFlight = false + q.queue = nil + if q.doneWaiter != nil { + close(q.doneWaiter) + q.doneWaiter = nil + } + q.mu.Unlock() +} + +// Shutdown asynchronously signals the queue to stop. +func (q *ExecQueue) Shutdown() { + q.mu.Lock() + defer q.mu.Unlock() + q.closed = true +} + +// Wait waits for the queue to be empty. +func (q *ExecQueue) Wait(ctx context.Context) error { + q.mu.Lock() + waitCh := q.doneWaiter + if q.inFlight && waitCh == nil { + waitCh = make(chan struct{}) + q.doneWaiter = waitCh + } + q.mu.Unlock() + + if waitCh == nil { + return nil + } + + select { + case <-waitCh: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} diff --git a/util/execqueue/execqueue_test.go b/util/execqueue/execqueue_test.go new file mode 100644 index 000000000..d10b741f7 --- /dev/null +++ b/util/execqueue/execqueue_test.go @@ -0,0 +1,22 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +package execqueue + +import ( + "context" + "sync/atomic" + "testing" +) + +func TestExecQueue(t *testing.T) { + ctx := context.Background() + var n atomic.Int32 + q := &ExecQueue{} + defer q.Shutdown() + q.Add(func() { n.Add(1) }) + q.Wait(ctx) + if got := n.Load(); got != 1 { + t.Errorf("n=%d; want 1", got) + } +}