syncs: add Waiter as a way to wakeup worker goroutines

Signed-off-by: Maisem Ali <maisem@tailscale.com>
This commit is contained in:
Maisem Ali 2023-02-19 14:02:25 -08:00
parent d38abe90be
commit 4da7a50c4b
2 changed files with 60 additions and 0 deletions

View File

@ -12,6 +12,42 @@ import (
"tailscale.com/util/mak"
)
// Waiter is used to wake up a goroutine waiting for something to happen.
type Waiter struct {
ch chan struct{} // buffered chan of size 1
}
// NewWaiter returns a new Waiter.
func NewWaiter() *Waiter {
return &Waiter{ch: make(chan struct{}, 1)}
}
// Wake wakes up a goroutine waiting on Wait. It returns true if it managed to
// mark the waiter as woken up. If it returns false, a Wake was already pending.
// If there are multiple goroutines waiting, only one will wake up.
// If there are no goroutines waiting, the next call to Wait will return
// immediately. Multiple calls to Wake without a call to Wait in between will
// only wake up one goroutine.
func (t *Waiter) Wake() (ok bool) {
select {
case t.ch <- struct{}{}:
return true
default:
return false
}
}
// Wait blocks until Wake is called. If a wake is already pending, it returns
// immediately. If the context is canceled, it returns ctx.Err().
func (t *Waiter) Wait(ctx context.Context) error {
select {
case <-t.ch:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
// ClosedChan returns a channel that's already closed.
func ClosedChan() <-chan struct{} { return closedChan }

View File

@ -7,10 +7,34 @@ import (
"context"
"sync"
"testing"
"time"
"github.com/google/go-cmp/cmp"
)
func TestWaiter(t *testing.T) {
w := NewWaiter()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
if !w.Wake() {
t.Fatal("Wake() = false; want true")
}
if w.Wake() { // second call should return false
t.Fatal("Wake() = true; want false")
}
if err := w.Wait(ctx); err != nil {
t.Fatal(err)
}
go func() {
if !w.Wake() {
t.Errorf("Wake() = false; want true")
}
}()
if err := w.Wait(ctx); err != nil {
t.Fatal(err)
}
}
func TestWaitGroupChan(t *testing.T) {
wg := NewWaitGroupChan()