derp: make RunConnectionLoop funcs take Messages, support PeerPresentFlags

PeerPresentFlags was added in 5ffb2668ef but wasn't plumbed through to
the RunConnectionLoop. Rather than add yet another parameter (as
IP:port was added earlier), pass in the raw PeerPresentMessage and
PeerGoneMessage struct values, which are the same things, plus two
fields: PeerGoneReasonType for gone and the PeerPresentFlags from
5ffb2668ef.

Updates tailscale/corp#17816

Change-Id: Ib19d9f95353651ada90656071fc3656cf58b7987
Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
This commit is contained in:
Brad Fitzpatrick 2024-06-25 08:04:12 -07:00 committed by Brad Fitzpatrick
parent 7eb8a77ac8
commit 3485e4bf5a
4 changed files with 26 additions and 28 deletions

View File

@ -9,14 +9,12 @@ import (
"fmt" "fmt"
"log" "log"
"net" "net"
"net/netip"
"strings" "strings"
"time" "time"
"tailscale.com/derp" "tailscale.com/derp"
"tailscale.com/derp/derphttp" "tailscale.com/derp/derphttp"
"tailscale.com/net/netmon" "tailscale.com/net/netmon"
"tailscale.com/types/key"
"tailscale.com/types/logger" "tailscale.com/types/logger"
) )
@ -71,8 +69,8 @@ func startMeshWithHost(s *derp.Server, host string) error {
return d.DialContext(ctx, network, addr) return d.DialContext(ctx, network, addr)
}) })
add := func(k key.NodePublic, _ netip.AddrPort) { s.AddPacketForwarder(k, c) } add := func(m derp.PeerPresentMessage) { s.AddPacketForwarder(m.Key, c) }
remove := func(k key.NodePublic) { s.RemovePacketForwarder(k, c) } remove := func(m derp.PeerGoneMessage) { s.RemovePacketForwarder(m.Peer, c) }
go c.RunWatchConnectionLoop(context.Background(), s.PublicKey(), logf, add, remove) go c.RunWatchConnectionLoop(context.Background(), s.PublicKey(), logf, add, remove)
return nil return nil
} }

View File

@ -131,8 +131,9 @@ const (
type PeerGoneReasonType byte type PeerGoneReasonType byte
const ( const (
PeerGoneReasonDisconnected = PeerGoneReasonType(0x00) // peer disconnected from this server PeerGoneReasonDisconnected = PeerGoneReasonType(0x00) // peer disconnected from this server
PeerGoneReasonNotHere = PeerGoneReasonType(0x01) // server doesn't know about this peer, unexpected PeerGoneReasonNotHere = PeerGoneReasonType(0x01) // server doesn't know about this peer, unexpected
PeerGoneReasonMeshConnBroke = PeerGoneReasonType(0xf0) // invented by Client.RunWatchConnectionLoop on disconnect; not sent on the wire
) )
// PeerPresentFlags is an optional byte of bit flags sent after a framePeerPresent message. // PeerPresentFlags is an optional byte of bit flags sent after a framePeerPresent message.

View File

@ -11,7 +11,6 @@ import (
"net" "net"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"net/netip"
"sync" "sync"
"testing" "testing"
"time" "time"
@ -299,13 +298,13 @@ func TestBreakWatcherConnRecv(t *testing.T) {
go func() { go func() {
defer wg.Done() defer wg.Done()
var peers int var peers int
add := func(k key.NodePublic, _ netip.AddrPort) { add := func(m derp.PeerPresentMessage) {
t.Logf("add: %v", k.ShortString()) t.Logf("add: %v", m.Key.ShortString())
peers++ peers++
// Signal that the watcher has run // Signal that the watcher has run
watcherChan <- peers watcherChan <- peers
} }
remove := func(k key.NodePublic) { t.Logf("remove: %v", k.ShortString()); peers-- } remove := func(m derp.PeerGoneMessage) { t.Logf("remove: %v", m.Peer.ShortString()); peers-- }
watcher1.RunWatchConnectionLoop(ctx, serverPrivateKey1.Public(), t.Logf, add, remove) watcher1.RunWatchConnectionLoop(ctx, serverPrivateKey1.Public(), t.Logf, add, remove)
}() }()
@ -370,15 +369,15 @@ func TestBreakWatcherConn(t *testing.T) {
go func() { go func() {
defer wg.Done() defer wg.Done()
var peers int var peers int
add := func(k key.NodePublic, _ netip.AddrPort) { add := func(m derp.PeerPresentMessage) {
t.Logf("add: %v", k.ShortString()) t.Logf("add: %v", m.Key.ShortString())
peers++ peers++
// Signal that the watcher has run // Signal that the watcher has run
watcherChan <- peers watcherChan <- peers
// Wait for breaker to run // Wait for breaker to run
<-breakerChan <-breakerChan
} }
remove := func(k key.NodePublic) { t.Logf("remove: %v", k.ShortString()); peers-- } remove := func(m derp.PeerGoneMessage) { t.Logf("remove: %v", m.Peer.ShortString()); peers-- }
watcher1.RunWatchConnectionLoop(ctx, serverPrivateKey1.Public(), t.Logf, add, remove) watcher1.RunWatchConnectionLoop(ctx, serverPrivateKey1.Public(), t.Logf, add, remove)
}() }()
@ -407,8 +406,8 @@ func TestBreakWatcherConn(t *testing.T) {
} }
} }
func noopAdd(key.NodePublic, netip.AddrPort) {} func noopAdd(derp.PeerPresentMessage) {}
func noopRemove(key.NodePublic) {} func noopRemove(derp.PeerGoneMessage) {}
func TestRunWatchConnectionLoopServeConnect(t *testing.T) { func TestRunWatchConnectionLoopServeConnect(t *testing.T) {
defer func() { testHookWatchLookConnectResult = nil }() defer func() { testHookWatchLookConnectResult = nil }()

View File

@ -5,7 +5,6 @@ package derphttp
import ( import (
"context" "context"
"net/netip"
"sync" "sync"
"time" "time"
@ -35,9 +34,14 @@ var testHookWatchLookConnectResult func(connectError error, wasSelfConnect bool)
// To force RunWatchConnectionLoop to return quickly, its ctx needs to be // To force RunWatchConnectionLoop to return quickly, its ctx needs to be
// closed, and c itself needs to be closed. // closed, and c itself needs to be closed.
// //
// It is a fatal error to call this on an already-started Client withoutq having // It is a fatal error to call this on an already-started Client without having
// initialized Client.WatchConnectionChanges to true. // initialized Client.WatchConnectionChanges to true.
func (c *Client) RunWatchConnectionLoop(ctx context.Context, ignoreServerKey key.NodePublic, infoLogf logger.Logf, add func(key.NodePublic, netip.AddrPort), remove func(key.NodePublic)) { //
// If the DERP connection breaks and reconnects, remove will be called for all
// previously seen peers, with Reason type PeerGoneReasonSynthetic. Those
// clients are likely still connected and their add message will appear after
// reconnect.
func (c *Client) RunWatchConnectionLoop(ctx context.Context, ignoreServerKey key.NodePublic, infoLogf logger.Logf, add func(derp.PeerPresentMessage), remove func(derp.PeerGoneMessage)) {
if !c.WatchConnectionChanges { if !c.WatchConnectionChanges {
if c.isStarted() { if c.isStarted() {
panic("invalid use of RunWatchConnectionLoop on already-started Client without setting Client.RunWatchConnectionLoop") panic("invalid use of RunWatchConnectionLoop on already-started Client without setting Client.RunWatchConnectionLoop")
@ -62,7 +66,7 @@ func (c *Client) RunWatchConnectionLoop(ctx context.Context, ignoreServerKey key
} }
logf("reconnected; clearing %d forwarding mappings", len(present)) logf("reconnected; clearing %d forwarding mappings", len(present))
for k := range present { for k := range present {
remove(k) remove(derp.PeerGoneMessage{Peer: k, Reason: derp.PeerGoneReasonMeshConnBroke})
} }
present = map[key.NodePublic]bool{} present = map[key.NodePublic]bool{}
} }
@ -84,13 +88,7 @@ func (c *Client) RunWatchConnectionLoop(ctx context.Context, ignoreServerKey key
}) })
defer timer.Stop() defer timer.Stop()
updatePeer := func(k key.NodePublic, ipPort netip.AddrPort, isPresent bool) { updatePeer := func(k key.NodePublic, isPresent bool) {
if isPresent {
add(k, ipPort)
} else {
remove(k)
}
mu.Lock() mu.Lock()
defer mu.Unlock() defer mu.Unlock()
if isPresent { if isPresent {
@ -148,7 +146,8 @@ func (c *Client) RunWatchConnectionLoop(ctx context.Context, ignoreServerKey key
} }
switch m := m.(type) { switch m := m.(type) {
case derp.PeerPresentMessage: case derp.PeerPresentMessage:
updatePeer(m.Key, m.IPPort, true) add(m)
updatePeer(m.Key, true)
case derp.PeerGoneMessage: case derp.PeerGoneMessage:
switch m.Reason { switch m.Reason {
case derp.PeerGoneReasonDisconnected: case derp.PeerGoneReasonDisconnected:
@ -160,7 +159,8 @@ func (c *Client) RunWatchConnectionLoop(ctx context.Context, ignoreServerKey key
logf("Recv: peer %s not at server %s for unknown reason %v", logf("Recv: peer %s not at server %s for unknown reason %v",
key.NodePublic(m.Peer).ShortString(), c.ServerPublicKey().ShortString(), m.Reason) key.NodePublic(m.Peer).ShortString(), c.ServerPublicKey().ShortString(), m.Reason)
} }
updatePeer(key.NodePublic(m.Peer), netip.AddrPort{}, false) remove(m)
updatePeer(m.Peer, false)
default: default:
continue continue
} }