wgengine/magicsock: track per-endpoint changes in ringbuffer
This change adds a ringbuffer to each magicsock endpoint that keeps a fixed set of "changes"–debug information about what updates have been made to that endpoint. Additionally, this adds a LocalAPI endpoint and associated "debug peer-status" CLI subcommand to fetch the set of changes for a given IP or hostname. Updates tailscale/corp#9364 Signed-off-by: Andrew Dunham <andrew@du.nham.ca> Change-Id: I34f726a71bddd0dfa36ec05ebafffb24f6e0516a
This commit is contained in:
parent
9245d813c6
commit
be107f92d3
|
@ -213,6 +213,11 @@ var debugCmd = &ffcli.Command{
|
|||
return fs
|
||||
})(),
|
||||
},
|
||||
{
|
||||
Name: "peer-endpoint-changes",
|
||||
Exec: runPeerEndpointChanges,
|
||||
ShortHelp: "prints debug information about a peer's endpoint changes",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -822,3 +827,61 @@ func debugPortmap(ctx context.Context, args []string) error {
|
|||
_, err = io.Copy(os.Stdout, rc)
|
||||
return err
|
||||
}
|
||||
|
||||
func runPeerEndpointChanges(ctx context.Context, args []string) error {
|
||||
st, err := localClient.Status(ctx)
|
||||
if err != nil {
|
||||
return fixTailscaledConnectError(err)
|
||||
}
|
||||
description, ok := isRunningOrStarting(st)
|
||||
if !ok {
|
||||
printf("%s\n", description)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
if len(args) != 1 || args[0] == "" {
|
||||
return errors.New("usage: peer-status <hostname-or-IP>")
|
||||
}
|
||||
var ip string
|
||||
|
||||
hostOrIP := args[0]
|
||||
ip, self, err := tailscaleIPFromArg(ctx, hostOrIP)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if self {
|
||||
printf("%v is local Tailscale IP\n", ip)
|
||||
return nil
|
||||
}
|
||||
|
||||
if ip != hostOrIP {
|
||||
log.Printf("lookup %q => %q", hostOrIP, ip)
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, "GET", "http://local-tailscaled.sock/localapi/v0/debug-peer-endpoint-changes?ip="+ip, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
resp, err := localClient.DoLocalRequest(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var dst bytes.Buffer
|
||||
if err := json.Indent(&dst, body, "", " "); err != nil {
|
||||
return fmt.Errorf("indenting returned JSON: %w", err)
|
||||
}
|
||||
|
||||
if ss := dst.String(); !strings.HasSuffix(ss, "\n") {
|
||||
dst.WriteByte('\n')
|
||||
}
|
||||
fmt.Printf("%s", dst.String())
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -303,6 +303,7 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de
|
|||
tailscale.com/util/osshare from tailscale.com/ipn/ipnlocal+
|
||||
W tailscale.com/util/pidowner from tailscale.com/ipn/ipnauth
|
||||
tailscale.com/util/racebuild from tailscale.com/logpolicy
|
||||
tailscale.com/util/ringbuffer from tailscale.com/wgengine/magicsock
|
||||
tailscale.com/util/set from tailscale.com/health+
|
||||
tailscale.com/util/singleflight from tailscale.com/control/controlclient+
|
||||
tailscale.com/util/slicesx from tailscale.com/net/dnscache+
|
||||
|
|
|
@ -42,6 +42,7 @@ var (
|
|||
regBool = map[string]*bool{}
|
||||
regOptBool = map[string]*opt.Bool{}
|
||||
regDuration = map[string]*time.Duration{}
|
||||
regInt = map[string]*int{}
|
||||
)
|
||||
|
||||
func noteEnv(k, v string) {
|
||||
|
@ -182,6 +183,25 @@ func RegisterDuration(envVar string) func() time.Duration {
|
|||
return func() time.Duration { return *p }
|
||||
}
|
||||
|
||||
// RegisterInt returns a func that gets the named environment variable as an
|
||||
// integer, without a map lookup per call. It assumes that any mutations happen
|
||||
// via envknob.Setenv.
|
||||
func RegisterInt(envVar string) func() int {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
p, ok := regInt[envVar]
|
||||
if !ok {
|
||||
val := os.Getenv(envVar)
|
||||
if val != "" {
|
||||
noteEnvLocked(envVar, val)
|
||||
}
|
||||
p = new(int)
|
||||
setIntLocked(p, envVar, val)
|
||||
regInt[envVar] = p
|
||||
}
|
||||
return func() int { return *p }
|
||||
}
|
||||
|
||||
func setBoolLocked(p *bool, envVar, val string) {
|
||||
noteEnvLocked(envVar, val)
|
||||
if val == "" {
|
||||
|
@ -221,6 +241,19 @@ func setDurationLocked(p *time.Duration, envVar, val string) {
|
|||
}
|
||||
}
|
||||
|
||||
func setIntLocked(p *int, envVar, val string) {
|
||||
noteEnvLocked(envVar, val)
|
||||
if val == "" {
|
||||
*p = 0
|
||||
return
|
||||
}
|
||||
var err error
|
||||
*p, err = strconv.Atoi(val)
|
||||
if err != nil {
|
||||
log.Fatalf("invalid int environment variable %s value %q", envVar, val)
|
||||
}
|
||||
}
|
||||
|
||||
// Bool returns the boolean value of the named environment variable.
|
||||
// If the variable is not set, it returns false.
|
||||
// An invalid value exits the binary with a failure.
|
||||
|
|
|
@ -4897,3 +4897,25 @@ func (b *LocalBackend) StreamDebugCapture(ctx context.Context, w io.Writer) erro
|
|||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *LocalBackend) GetPeerEndpointChanges(ctx context.Context, ip netip.Addr) ([]magicsock.EndpointChange, error) {
|
||||
pip, ok := b.e.PeerForIP(ip)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("no matching peer")
|
||||
}
|
||||
if pip.IsSelf {
|
||||
return nil, fmt.Errorf("%v is local Tailscale IP", ip)
|
||||
}
|
||||
peer := pip.Node
|
||||
|
||||
mc, err := b.magicConn()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("getting magicsock conn: %w", err)
|
||||
}
|
||||
|
||||
chs, err := mc.GetEndpointChanges(peer)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("getting endpoint changes: %w", err)
|
||||
}
|
||||
return chs, nil
|
||||
}
|
||||
|
|
|
@ -71,6 +71,7 @@ var handler = map[string]localAPIHandler{
|
|||
"debug-packet-filter-matches": (*Handler).serveDebugPacketFilterMatches,
|
||||
"debug-packet-filter-rules": (*Handler).serveDebugPacketFilterRules,
|
||||
"debug-portmap": (*Handler).serveDebugPortmap,
|
||||
"debug-peer-endpoint-changes": (*Handler).serveDebugPeerEndpointChanges,
|
||||
"debug-capture": (*Handler).serveDebugCapture,
|
||||
"derpmap": (*Handler).serveDERPMap,
|
||||
"dev-set-state-store": (*Handler).serveDevSetStateStore,
|
||||
|
@ -868,6 +869,34 @@ func (h *Handler) serveStatus(w http.ResponseWriter, r *http.Request) {
|
|||
e.Encode(st)
|
||||
}
|
||||
|
||||
func (h *Handler) serveDebugPeerEndpointChanges(w http.ResponseWriter, r *http.Request) {
|
||||
if !h.PermitRead {
|
||||
http.Error(w, "status access denied", http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
|
||||
ipStr := r.FormValue("ip")
|
||||
if ipStr == "" {
|
||||
http.Error(w, "missing 'ip' parameter", 400)
|
||||
return
|
||||
}
|
||||
ip, err := netip.ParseAddr(ipStr)
|
||||
if err != nil {
|
||||
http.Error(w, "invalid IP", 400)
|
||||
return
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
chs, err := h.b.GetPeerEndpointChanges(r.Context(), ip)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), 500)
|
||||
return
|
||||
}
|
||||
|
||||
e := json.NewEncoder(w)
|
||||
e.SetIndent("", "\t")
|
||||
e.Encode(chs)
|
||||
}
|
||||
|
||||
// InUseOtherUserIPNStream reports whether r is a request for the watch-ipn-bus
|
||||
// handler. If so, it writes an ipn.Notify InUseOtherUser message to the user
|
||||
// and returns true. Otherwise it returns false, in which case it doesn't write
|
||||
|
|
|
@ -62,6 +62,7 @@ import (
|
|||
"tailscale.com/types/nettype"
|
||||
"tailscale.com/util/clientmetric"
|
||||
"tailscale.com/util/mak"
|
||||
"tailscale.com/util/ringbuffer"
|
||||
"tailscale.com/util/uniq"
|
||||
"tailscale.com/version"
|
||||
"tailscale.com/wgengine/capture"
|
||||
|
@ -1022,6 +1023,25 @@ func (c *Conn) populateCLIPingResponseLocked(res *ipnstate.PingResult, latency t
|
|||
res.DERPRegionCode = c.derpRegionCodeLocked(regionID)
|
||||
}
|
||||
|
||||
// GetEndpointChanges returns the most recent changes for a particular
|
||||
// endpoint. The returned EndpointChange structs are for debug use only and
|
||||
// there are no guarantees about order, size, or content.
|
||||
func (c *Conn) GetEndpointChanges(peer *tailcfg.Node) ([]EndpointChange, error) {
|
||||
c.mu.Lock()
|
||||
if c.privateKey.IsZero() {
|
||||
c.mu.Unlock()
|
||||
return nil, fmt.Errorf("tailscaled stopped")
|
||||
}
|
||||
ep, ok := c.peerMap.endpointForNodeKey(peer.Key)
|
||||
c.mu.Unlock()
|
||||
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("unknown peer")
|
||||
}
|
||||
|
||||
return ep.debugUpdates.GetAll(), nil
|
||||
}
|
||||
|
||||
func (c *Conn) derpRegionCodeLocked(regionID int) string {
|
||||
if c.derpMap == nil {
|
||||
return ""
|
||||
|
@ -2540,6 +2560,8 @@ func nodesEqual(x, y []*tailcfg.Node) bool {
|
|||
return true
|
||||
}
|
||||
|
||||
var debugRingBufferMaxSizeBytes = envknob.RegisterInt("TS_DEBUG_MAGICSOCK_RING_BUFFER_MAX_SIZE_BYTES")
|
||||
|
||||
// SetNetworkMap is called when the control client gets a new network
|
||||
// map from the control server. It must always be non-nil.
|
||||
//
|
||||
|
@ -2574,6 +2596,30 @@ func (c *Conn) SetNetworkMap(nm *netmap.NetworkMap) {
|
|||
c.logf("[v1] magicsock: got updated network map; %d peers", len(nm.Peers))
|
||||
heartbeatDisabled := debugEnableSilentDisco() || (c.netMap != nil && c.netMap.Debug != nil && c.netMap.Debug.EnableSilentDisco)
|
||||
|
||||
// Set a maximum size for our set of endpoint ring buffers by assuming
|
||||
// that a single large update is ~500 bytes, and that we want to not
|
||||
// use more than 1MiB of memory on phones / 4MiB on other devices.
|
||||
// Calculate the per-endpoint ring buffer size by dividing that out,
|
||||
// but always storing at least two entries.
|
||||
var entriesPerBuffer int = 2
|
||||
if len(nm.Peers) > 0 {
|
||||
var maxRingBufferSize int
|
||||
if runtime.GOOS == "ios" || runtime.GOOS == "android" {
|
||||
maxRingBufferSize = 1 * 1024 * 1024
|
||||
} else {
|
||||
maxRingBufferSize = 4 * 1024 * 1024
|
||||
}
|
||||
if v := debugRingBufferMaxSizeBytes(); v > 0 {
|
||||
maxRingBufferSize = v
|
||||
}
|
||||
|
||||
const averageRingBufferElemSize = 512
|
||||
entriesPerBuffer = maxRingBufferSize / (averageRingBufferElemSize * len(nm.Peers))
|
||||
if entriesPerBuffer < 2 {
|
||||
entriesPerBuffer = 2
|
||||
}
|
||||
}
|
||||
|
||||
// Try a pass of just upserting nodes and creating missing
|
||||
// endpoints. If the set of nodes is the same, this is an
|
||||
// efficient alloc-free update. If the set of nodes is different,
|
||||
|
@ -2599,6 +2645,7 @@ func (c *Conn) SetNetworkMap(nm *netmap.NetworkMap) {
|
|||
|
||||
ep := &endpoint{
|
||||
c: c,
|
||||
debugUpdates: ringbuffer.New[EndpointChange](entriesPerBuffer),
|
||||
publicKey: n.Key,
|
||||
publicKeyHex: n.Key.UntypedHexString(),
|
||||
sentPing: map[stun.TxID]sentPing{},
|
||||
|
@ -2751,6 +2798,16 @@ func (c *Conn) logActiveDerpLocked() {
|
|||
}))
|
||||
}
|
||||
|
||||
// EndpointChange is a structure containing information about changes made to a
|
||||
// particular endpoint. This is not a stable interface and could change at any
|
||||
// time.
|
||||
type EndpointChange struct {
|
||||
When time.Time // when the change occurred
|
||||
What string // what this change is
|
||||
From any `json:",omitempty"` // information about the previous state
|
||||
To any `json:",omitempty"` // information about the new state
|
||||
}
|
||||
|
||||
func (c *Conn) logEndpointChange(endpoints []tailcfg.Endpoint) {
|
||||
c.logf("magicsock: endpoints changed: %s", logger.ArgWriter(func(buf *bufio.Writer) {
|
||||
for i, ep := range endpoints {
|
||||
|
@ -3706,6 +3763,7 @@ type endpoint struct {
|
|||
lastRecv mono.Time
|
||||
numStopAndResetAtomic int64
|
||||
sendFunc syncs.AtomicValue[endpointSendFunc] // nil or unset means unused
|
||||
debugUpdates *ringbuffer.RingBuffer[EndpointChange]
|
||||
|
||||
// These fields are initialized once and never modified.
|
||||
c *Conn
|
||||
|
@ -3847,9 +3905,19 @@ func (st *endpointState) shouldDeleteLocked() bool {
|
|||
}
|
||||
}
|
||||
|
||||
func (de *endpoint) deleteEndpointLocked(ep netip.AddrPort) {
|
||||
func (de *endpoint) deleteEndpointLocked(why string, ep netip.AddrPort) {
|
||||
de.debugUpdates.Add(EndpointChange{
|
||||
When: time.Now(),
|
||||
What: "deleteEndpointLocked-" + why,
|
||||
From: ep,
|
||||
})
|
||||
delete(de.endpointState, ep)
|
||||
if de.bestAddr.AddrPort == ep {
|
||||
de.debugUpdates.Add(EndpointChange{
|
||||
When: time.Now(),
|
||||
What: "deleteEndpointLocked-bestAddr-" + why,
|
||||
From: de.bestAddr,
|
||||
})
|
||||
de.bestAddr = addrLatency{}
|
||||
}
|
||||
}
|
||||
|
@ -4189,7 +4257,7 @@ func (de *endpoint) sendPingsLocked(now mono.Time, sendCallMeMaybe bool) {
|
|||
var sentAny bool
|
||||
for ep, st := range de.endpointState {
|
||||
if st.shouldDeleteLocked() {
|
||||
de.deleteEndpointLocked(ep)
|
||||
de.deleteEndpointLocked("sendPingsLocked", ep)
|
||||
continue
|
||||
}
|
||||
if runtime.GOOS == "js" {
|
||||
|
@ -4233,17 +4301,39 @@ func (de *endpoint) updateFromNode(n *tailcfg.Node, heartbeatDisabled bool) {
|
|||
de.c.logf("[v1] magicsock: disco: node %s changed from %s to %s", de.publicKey.ShortString(), de.discoKey, n.DiscoKey)
|
||||
de.discoKey = n.DiscoKey
|
||||
de.discoShort = de.discoKey.ShortString()
|
||||
de.debugUpdates.Add(EndpointChange{
|
||||
When: time.Now(),
|
||||
What: "updateFromNode-resetLocked",
|
||||
})
|
||||
de.resetLocked()
|
||||
}
|
||||
if n.DERP == "" {
|
||||
if de.derpAddr.IsValid() {
|
||||
de.debugUpdates.Add(EndpointChange{
|
||||
When: time.Now(),
|
||||
What: "updateFromNode-remove-DERP",
|
||||
From: de.derpAddr,
|
||||
})
|
||||
}
|
||||
de.derpAddr = netip.AddrPort{}
|
||||
} else {
|
||||
de.derpAddr, _ = netip.ParseAddrPort(n.DERP)
|
||||
newDerp, _ := netip.ParseAddrPort(n.DERP)
|
||||
if de.derpAddr != newDerp {
|
||||
de.debugUpdates.Add(EndpointChange{
|
||||
When: time.Now(),
|
||||
What: "updateFromNode-DERP",
|
||||
From: de.derpAddr,
|
||||
To: newDerp,
|
||||
})
|
||||
}
|
||||
de.derpAddr = newDerp
|
||||
}
|
||||
|
||||
for _, st := range de.endpointState {
|
||||
st.index = indexSentinelDeleted // assume deleted until updated in next loop
|
||||
}
|
||||
|
||||
var newIpps []netip.AddrPort
|
||||
for i, epStr := range n.Endpoints {
|
||||
if i > math.MaxInt16 {
|
||||
// Seems unlikely.
|
||||
|
@ -4258,14 +4348,22 @@ func (de *endpoint) updateFromNode(n *tailcfg.Node, heartbeatDisabled bool) {
|
|||
st.index = int16(i)
|
||||
} else {
|
||||
de.endpointState[ipp] = &endpointState{index: int16(i)}
|
||||
newIpps = append(newIpps, ipp)
|
||||
}
|
||||
}
|
||||
if len(newIpps) > 0 {
|
||||
de.debugUpdates.Add(EndpointChange{
|
||||
When: time.Now(),
|
||||
What: "updateFromNode-new-Endpoints",
|
||||
To: newIpps,
|
||||
})
|
||||
}
|
||||
|
||||
// Now delete anything unless it's still in the network map or
|
||||
// was a recently discovered endpoint.
|
||||
for ep, st := range de.endpointState {
|
||||
if st.shouldDeleteLocked() {
|
||||
de.deleteEndpointLocked(ep)
|
||||
de.deleteEndpointLocked("updateFromNode", ep)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -4308,7 +4406,7 @@ func (de *endpoint) addCandidateEndpoint(ep netip.AddrPort, forRxPingTxID stun.T
|
|||
if size := len(de.endpointState); size > 100 {
|
||||
for ep, st := range de.endpointState {
|
||||
if st.shouldDeleteLocked() {
|
||||
de.deleteEndpointLocked(ep)
|
||||
de.deleteEndpointLocked("addCandidateEndpoint", ep)
|
||||
}
|
||||
}
|
||||
size2 := len(de.endpointState)
|
||||
|
@ -4386,9 +4484,21 @@ func (de *endpoint) handlePongConnLocked(m *disco.Pong, di *discoInfo, src netip
|
|||
thisPong := addrLatency{sp.to, latency}
|
||||
if betterAddr(thisPong, de.bestAddr) {
|
||||
de.c.logf("magicsock: disco: node %v %v now using %v", de.publicKey.ShortString(), de.discoShort, sp.to)
|
||||
de.debugUpdates.Add(EndpointChange{
|
||||
When: time.Now(),
|
||||
What: "handlePingLocked-bestAddr-update",
|
||||
From: de.bestAddr,
|
||||
To: thisPong,
|
||||
})
|
||||
de.bestAddr = thisPong
|
||||
}
|
||||
if de.bestAddr.AddrPort == thisPong.AddrPort {
|
||||
de.debugUpdates.Add(EndpointChange{
|
||||
When: time.Now(),
|
||||
What: "handlePingLocked-bestAddr-latency",
|
||||
From: de.bestAddr,
|
||||
To: thisPong,
|
||||
})
|
||||
de.bestAddr.latency = latency
|
||||
de.bestAddrAt = now
|
||||
de.trustBestAddrUntil = now.Add(trustUDPAddrDuration)
|
||||
|
@ -4417,6 +4527,10 @@ type addrLatency struct {
|
|||
latency time.Duration
|
||||
}
|
||||
|
||||
func (a addrLatency) String() string {
|
||||
return a.AddrPort.String() + "@" + a.latency.String()
|
||||
}
|
||||
|
||||
// betterAddr reports whether a is a better addr to use than b.
|
||||
func betterAddr(a, b addrLatency) bool {
|
||||
if a.AddrPort == b.AddrPort {
|
||||
|
@ -4490,6 +4604,12 @@ func (de *endpoint) handleCallMeMaybe(m *disco.CallMeMaybe) {
|
|||
}
|
||||
}
|
||||
if len(newEPs) > 0 {
|
||||
de.debugUpdates.Add(EndpointChange{
|
||||
When: time.Now(),
|
||||
What: "handleCallMeMaybe-new-endpoints",
|
||||
To: newEPs,
|
||||
})
|
||||
|
||||
de.c.dlogf("[v1] magicsock: disco: call-me-maybe from %v %v added new endpoints: %v",
|
||||
de.publicKey.ShortString(), de.discoShort,
|
||||
logger.ArgWriter(func(w *bufio.Writer) {
|
||||
|
@ -4507,7 +4627,7 @@ func (de *endpoint) handleCallMeMaybe(m *disco.CallMeMaybe) {
|
|||
for ep, want := range de.isCallMeMaybeEP {
|
||||
if !want {
|
||||
delete(de.isCallMeMaybeEP, ep)
|
||||
de.deleteEndpointLocked(ep)
|
||||
de.deleteEndpointLocked("handleCallMeMaybe", ep)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -4551,6 +4671,10 @@ func (de *endpoint) stopAndReset() {
|
|||
de.c.logf("[v1] magicsock: doing cleanup for discovery key %s", de.discoKey.ShortString())
|
||||
}
|
||||
|
||||
de.debugUpdates.Add(EndpointChange{
|
||||
When: time.Now(),
|
||||
What: "stopAndReset-resetLocked",
|
||||
})
|
||||
de.resetLocked()
|
||||
if de.heartBeatTimer != nil {
|
||||
de.heartBeatTimer.Stop()
|
||||
|
|
Loading…
Reference in New Issue