wgengine: remove all peer status from open timeout diagnostics
Avoid contention from fetching status for all peers, and instead fetch status for a single peer. Updates tailscale/coral#72 Signed-off-by: James Tucker <james@tailscale.com>
This commit is contained in:
parent
ad1cc6cff9
commit
81dba3738e
|
@ -9,7 +9,6 @@ import (
|
||||||
"runtime"
|
"runtime"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"tailscale.com/ipn/ipnstate"
|
|
||||||
"tailscale.com/net/flowtrack"
|
"tailscale.com/net/flowtrack"
|
||||||
"tailscale.com/net/packet"
|
"tailscale.com/net/packet"
|
||||||
"tailscale.com/net/tsaddr"
|
"tailscale.com/net/tsaddr"
|
||||||
|
@ -157,28 +156,8 @@ func (e *userspaceEngine) onOpenTimeout(flow flowtrack.Tuple) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// We don't care if this information is perfectly up-to-date, since
|
ps, found := e.getPeerStatusLite(n.Key)
|
||||||
// we're just using it to print debug information.
|
if !found {
|
||||||
//
|
|
||||||
// In tailscale/coral#72, we see a goroutine profile with thousands of
|
|
||||||
// goroutines blocked on the mutex in getStatus here, so we wrap it in
|
|
||||||
// a singleflight and accept stale information to reduce contention.
|
|
||||||
st, err, _ := e.getStatusSf.Do(struct{}{}, e.getStatus)
|
|
||||||
|
|
||||||
var ps *ipnstate.PeerStatusLite
|
|
||||||
if err == nil {
|
|
||||||
for _, v := range st.Peers {
|
|
||||||
if v.NodeKey == n.Key {
|
|
||||||
v := v // copy
|
|
||||||
ps = &v
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
e.logf("open-conn-track: timeout opening %v to node %v; failed to get engine status: %v", flow, n.Key.ShortString(), err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if ps == nil {
|
|
||||||
onlyZeroRoute := true // whether peerForIP returned n only because its /0 route matched
|
onlyZeroRoute := true // whether peerForIP returned n only because its /0 route matched
|
||||||
for _, r := range n.AllowedIPs {
|
for _, r := range n.AllowedIPs {
|
||||||
if r.Bits() != 0 && r.Contains(flow.Dst.Addr()) {
|
if r.Bits() != 0 && r.Contains(flow.Dst.Addr()) {
|
||||||
|
|
|
@ -982,25 +982,30 @@ var singleNewline = []byte{'\n'}
|
||||||
|
|
||||||
var ErrEngineClosing = errors.New("engine closing; no status")
|
var ErrEngineClosing = errors.New("engine closing; no status")
|
||||||
|
|
||||||
|
func (e *userspaceEngine) getPeerStatusLite(pk key.NodePublic) (status ipnstate.PeerStatusLite, ok bool) {
|
||||||
|
e.wgLock.Lock()
|
||||||
|
if e.wgdev == nil {
|
||||||
|
e.wgLock.Unlock()
|
||||||
|
return status, false
|
||||||
|
}
|
||||||
|
peer := e.wgdev.LookupPeer(pk.Raw32())
|
||||||
|
e.wgLock.Unlock()
|
||||||
|
if peer == nil {
|
||||||
|
return status, false
|
||||||
|
}
|
||||||
|
status.NodeKey = pk
|
||||||
|
status.RxBytes = int64(wgint.PeerRxBytes(peer))
|
||||||
|
status.TxBytes = int64(wgint.PeerTxBytes(peer))
|
||||||
|
status.LastHandshake = time.Unix(0, wgint.PeerLastHandshakeNano(peer))
|
||||||
|
return status, true
|
||||||
|
}
|
||||||
|
|
||||||
func (e *userspaceEngine) getStatus() (*Status, error) {
|
func (e *userspaceEngine) getStatus() (*Status, error) {
|
||||||
// Grab derpConns before acquiring wgLock to not violate lock ordering;
|
// Grab derpConns before acquiring wgLock to not violate lock ordering;
|
||||||
// the DERPs method acquires magicsock.Conn.mu.
|
// the DERPs method acquires magicsock.Conn.mu.
|
||||||
// (See comment in userspaceEngine's declaration.)
|
// (See comment in userspaceEngine's declaration.)
|
||||||
derpConns := e.magicConn.DERPs()
|
derpConns := e.magicConn.DERPs()
|
||||||
|
|
||||||
e.wgLock.Lock()
|
|
||||||
wgdev := e.wgdev
|
|
||||||
e.wgLock.Unlock()
|
|
||||||
|
|
||||||
// Assume that once created, wgdev is typically not replaced in-flight.
|
|
||||||
if wgdev == nil {
|
|
||||||
// RequestStatus was invoked before the wgengine has
|
|
||||||
// finished initializing. This can happen when wgegine
|
|
||||||
// provides a callback to magicsock for endpoint
|
|
||||||
// updates that calls RequestStatus.
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
e.mu.Lock()
|
e.mu.Lock()
|
||||||
closing := e.closing
|
closing := e.closing
|
||||||
peerKeys := make([]key.NodePublic, len(e.peerSequence))
|
peerKeys := make([]key.NodePublic, len(e.peerSequence))
|
||||||
|
@ -1013,18 +1018,9 @@ func (e *userspaceEngine) getStatus() (*Status, error) {
|
||||||
|
|
||||||
peers := make([]ipnstate.PeerStatusLite, 0, len(peerKeys))
|
peers := make([]ipnstate.PeerStatusLite, 0, len(peerKeys))
|
||||||
for _, key := range peerKeys {
|
for _, key := range peerKeys {
|
||||||
// LookupPeer is internally locked in wgdev.
|
if status, found := e.getPeerStatusLite(key); found {
|
||||||
peer := wgdev.LookupPeer(key.Raw32())
|
peers = append(peers, status)
|
||||||
if peer == nil {
|
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var p ipnstate.PeerStatusLite
|
|
||||||
p.NodeKey = key
|
|
||||||
p.RxBytes = int64(wgint.PeerRxBytes(peer))
|
|
||||||
p.TxBytes = int64(wgint.PeerTxBytes(peer))
|
|
||||||
p.LastHandshake = time.Unix(0, wgint.PeerLastHandshakeNano(peer))
|
|
||||||
peers = append(peers, p)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return &Status{
|
return &Status{
|
||||||
|
|
|
@ -5,14 +5,9 @@
|
||||||
package wgengine
|
package wgengine
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
"regexp"
|
|
||||||
"runtime"
|
"runtime"
|
||||||
"strings"
|
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"tailscale.com/tstest"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestWatchdog(t *testing.T) {
|
func TestWatchdog(t *testing.T) {
|
||||||
|
@ -42,49 +37,4 @@ func TestWatchdog(t *testing.T) {
|
||||||
e.RequestStatus()
|
e.RequestStatus()
|
||||||
e.Close()
|
e.Close()
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("watchdog fires on blocked getStatus", func(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
e, err := NewFakeUserspaceEngine(t.Logf, 0)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
t.Cleanup(e.Close)
|
|
||||||
usEngine := e.(*userspaceEngine)
|
|
||||||
e = NewWatchdog(e)
|
|
||||||
wdEngine := e.(*watchdogEngine)
|
|
||||||
wdEngine.maxWait = maxWaitMultiple * 100 * time.Millisecond
|
|
||||||
|
|
||||||
logBuf := new(tstest.MemLogger)
|
|
||||||
fatalCalled := make(chan struct{})
|
|
||||||
wdEngine.logf = logBuf.Logf
|
|
||||||
wdEngine.fatalf = func(format string, args ...any) {
|
|
||||||
t.Logf("FATAL: %s", fmt.Sprintf(format, args...))
|
|
||||||
fatalCalled <- struct{}{}
|
|
||||||
}
|
|
||||||
|
|
||||||
usEngine.wgLock.Lock() // blocks getStatus so the watchdog will fire
|
|
||||||
|
|
||||||
go e.RequestStatus()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-fatalCalled:
|
|
||||||
logs := logBuf.String()
|
|
||||||
if !strings.Contains(logs, "goroutine profile: total ") {
|
|
||||||
t.Errorf("fatal called without watchdog stacks, got: %s", logs)
|
|
||||||
}
|
|
||||||
re := regexp.MustCompile(`(?m)^\s*in-flight\[\d+\]: name=RequestStatus duration=.* start=.*$`)
|
|
||||||
if !re.MatchString(logs) {
|
|
||||||
t.Errorf("fatal called without in-flight list, got: %s", logs)
|
|
||||||
}
|
|
||||||
|
|
||||||
// expected
|
|
||||||
case <-time.After(3 * time.Second):
|
|
||||||
t.Fatalf("watchdog failed to fire")
|
|
||||||
}
|
|
||||||
|
|
||||||
usEngine.wgLock.Unlock()
|
|
||||||
wdEngine.fatalf = t.Fatalf
|
|
||||||
wdEngine.Close()
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue