From a975e86bb8a42f0be9faf16067ee55a6da55d767 Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Sun, 28 Jun 2020 11:53:37 -0700 Subject: [PATCH] wgengine/magicsock: add new endpoint type used for discovery-supporting peers This adds a new magicsock endpoint type only used when both sides support discovery (that is, are advertising a discovery key). Otherwise the old code is used. So far the new code only communicates over DERP as proof that the new code paths are wired up. None of the actual discovery messaging is implemented yet. Support for discovery (generating and advertising a key) is still behind an environment variable for now. Updates #483 --- control/controlclient/direct.go | 46 +++++- control/controlclient/netmap.go | 47 +++--- ipn/local.go | 4 +- wgengine/magicsock/magicsock.go | 231 ++++++++++++++++++++++++--- wgengine/magicsock/magicsock_test.go | 4 +- 5 files changed, 282 insertions(+), 50 deletions(-) diff --git a/control/controlclient/direct.go b/control/controlclient/direct.go index 8658fd220..e26f864ee 100644 --- a/control/controlclient/direct.go +++ b/control/controlclient/direct.go @@ -451,8 +451,6 @@ func (c *Direct) SetEndpoints(localPort uint16, endpoints []string) (changed boo return c.newEndpoints(localPort, endpoints) } -var debugNetmap, _ = strconv.ParseBool(os.Getenv("TS_DEBUG_NETMAP")) - func (c *Direct) PollNetMap(ctx context.Context, maxPolls int, cb func(*NetworkMap)) error { c.mu.Lock() persist := c.persist @@ -472,7 +470,7 @@ func (c *Direct) PollNetMap(ctx context.Context, maxPolls int, cb func(*NetworkM c.logf("PollNetMap: stream=%v :%v %v", maxPolls, localPort, ep) vlogf := logger.Discard - if debugNetmap { + if Debug.NetMap { vlogf = c.logf } @@ -603,6 +601,18 @@ func (c *Direct) PollNetMap(ctx context.Context, maxPolls int, cb func(*NetworkM if resp.Debug != nil && resp.Debug.LogHeapPprof { go logheap.LogHeap(resp.Debug.LogHeapURL) } + // Temporarily (2020-06-29) support removing all but + // discovery-supporting nodes 
during development, for + // less noise. + if Debug.OnlyDisco { + filtered := resp.Peers[:0] + for _, p := range resp.Peers { + if !p.DiscoKey.IsZero() { + filtered = append(filtered, p) + } + } + resp.Peers = filtered + } nm := &NetworkMap{ NodeKey: tailcfg.NodeKey(persist.PrivateNodeKey.Public()), @@ -766,3 +776,33 @@ func loadServerKey(ctx context.Context, httpc *http.Client, serverURL string) (w } return key, nil } + +// Debug contains temporary internal-only debug knobs. +// Its type is unexported to not draw attention to them. +var Debug = initDebug() + +type debug struct { + NetMap bool + OnlyDisco bool + Disco bool +} + +func initDebug() debug { + return debug{ + NetMap: envBool("TS_DEBUG_NETMAP"), + OnlyDisco: os.Getenv("TS_DEBUG_USE_DISCO") == "only", + Disco: os.Getenv("TS_DEBUG_USE_DISCO") == "only" || envBool("TS_DEBUG_USE_DISCO"), + } +} + +func envBool(k string) bool { + e := os.Getenv(k) + if e == "" { + return false + } + v, err := strconv.ParseBool(e) + if err != nil { + panic(fmt.Sprintf("invalid non-bool %q for env var %q", e, k)) + } + return v +} diff --git a/control/controlclient/netmap.go b/control/controlclient/netmap.go index 0556a25e4..917f87463 100644 --- a/control/controlclient/netmap.go +++ b/control/controlclient/netmap.go @@ -204,6 +204,11 @@ func (nm *NetworkMap) WGCfg(logf logger.Logf, uflags int, dnsOverride []wgcfg.IP return wgcfg.FromWgQuick(s, "tailscale") } +// EndpointDiscoSuffix is appended to the hex representation of a peer's discovery key +// and is then the sole wireguard endpoint for peers with a non-zero discovery key. +// This form is then recognized by magicsock's CreateEndpoint. 
+const EndpointDiscoSuffix = ".disco.tailscale:12345" + func (nm *NetworkMap) _WireGuardConfig(logf logger.Logf, uflags int, dnsOverride []wgcfg.IP, allEndpoints bool) string { buf := new(strings.Builder) fmt.Fprintf(buf, "[Interface]\n") @@ -229,6 +234,9 @@ func (nm *NetworkMap) _WireGuardConfig(logf logger.Logf, uflags int, dnsOverride fmt.Fprintf(buf, "\n") for i, peer := range nm.Peers { + if Debug.OnlyDisco && peer.DiscoKey.IsZero() { + continue + } if (uflags&UAllowSingleHosts) == 0 && len(peer.AllowedIPs) < 2 { logf("wgcfg: %v skipping a single-host peer.\n", peer.Key.ShortString()) continue @@ -239,25 +247,28 @@ func (nm *NetworkMap) _WireGuardConfig(logf logger.Logf, uflags int, dnsOverride fmt.Fprintf(buf, "[Peer]\n") fmt.Fprintf(buf, "PublicKey = %s\n", base64.StdEncoding.EncodeToString(peer.Key[:])) var endpoints []string - if peer.DERP != "" { - endpoints = append(endpoints, peer.DERP) - } - endpoints = append(endpoints, peer.Endpoints...) - if len(endpoints) > 0 { - if len(endpoints) == 1 { - fmt.Fprintf(buf, "Endpoint = %s", endpoints[0]) - } else if allEndpoints { - // TODO(apenwarr): This mode is incompatible. - // Normal wireguard clients don't know how to - // parse it (yet?) - fmt.Fprintf(buf, "Endpoint = %s", - strings.Join(endpoints, ",")) - } else { - fmt.Fprintf(buf, "Endpoint = %s # other endpoints: %s", - endpoints[0], - strings.Join(endpoints[1:], ", ")) + if !peer.DiscoKey.IsZero() { + fmt.Fprintf(buf, "Endpoint = %x%s\n", peer.DiscoKey[:], EndpointDiscoSuffix) + } else { + if peer.DERP != "" { + endpoints = append(endpoints, peer.DERP) + } + endpoints = append(endpoints, peer.Endpoints...) + if len(endpoints) > 0 { + if len(endpoints) == 1 { + fmt.Fprintf(buf, "Endpoint = %s", endpoints[0]) + } else if allEndpoints { + // TODO(apenwarr): This mode is incompatible. + // Normal wireguard clients don't know how to + // parse it (yet?) 
+ fmt.Fprintf(buf, "Endpoint = %s", strings.Join(endpoints, ",")) + } else { + fmt.Fprintf(buf, "Endpoint = %s # other endpoints: %s", + endpoints[0], + strings.Join(endpoints[1:], ", ")) + } + buf.WriteByte('\n') } - buf.WriteByte('\n') } var aips []string for _, allowedIP := range peer.AllowedIPs { diff --git a/ipn/local.go b/ipn/local.go index a940ca4ce..3ba92186a 100644 --- a/ipn/local.go +++ b/ipn/local.go @@ -8,8 +8,6 @@ import ( "context" "errors" "fmt" - "os" - "strconv" "strings" "sync" "time" @@ -362,7 +360,7 @@ func (b *LocalBackend) Start(opts Options) error { b.updateFilter(nil) var discoPublic tailcfg.DiscoKey - if useDisco, _ := strconv.ParseBool(os.Getenv("TS_DEBUG_USE_DISCO")); useDisco { + if controlclient.Debug.Disco { discoPrivate := key.NewPrivate() b.e.SetDiscoPrivateKey(discoPrivate) discoPublic = tailcfg.DiscoKey(discoPrivate.Public()) diff --git a/wgengine/magicsock/magicsock.go b/wgengine/magicsock/magicsock.go index a567c463c..be0183ec3 100644 --- a/wgengine/magicsock/magicsock.go +++ b/wgengine/magicsock/magicsock.go @@ -28,6 +28,7 @@ import ( "github.com/tailscale/wireguard-go/conn" "github.com/tailscale/wireguard-go/device" "github.com/tailscale/wireguard-go/wgcfg" + "go4.org/mem" "golang.org/x/crypto/nacl/box" "golang.org/x/time/rate" "inet.af/netaddr" @@ -80,8 +81,8 @@ type Conn struct { // ============================================================ mu sync.Mutex // guards all following fields - started bool - closed bool + started bool // Start was called + closed bool // Close was called endpointsUpdateWaiter *sync.Cond endpointsUpdateActive bool @@ -90,9 +91,11 @@ type Conn struct { peerSet map[key.Public]struct{} discoPrivate key.Private - nodeOfDisco map[tailcfg.DiscoKey]tailcfg.NodeKey + nodeOfDisco map[tailcfg.DiscoKey]*tailcfg.Node discoOfNode map[tailcfg.NodeKey]tailcfg.DiscoKey + endpointOfDisco map[tailcfg.DiscoKey]*discoEndpoint + // addrsByUDP is a map of every remote ip:port to a priority // list of endpoint addresses 
for a peer. // The priority list is provided by wgengine configuration. @@ -239,13 +242,14 @@ func (o *Options) endpointsFunc() func([]string) { // of NewConn. Mostly for tests. func newConn() *Conn { c := &Conn{ - sendLogLimit: rate.NewLimiter(rate.Every(1*time.Minute), 1), - addrsByUDP: make(map[netaddr.IPPort]*AddrSet), - addrsByKey: make(map[key.Public]*AddrSet), - derpRecvCh: make(chan derpReadResult), - udpRecvCh: make(chan udpReadResult), - derpStarted: make(chan struct{}), - peerLastDerp: make(map[key.Public]int), + sendLogLimit: rate.NewLimiter(rate.Every(1*time.Minute), 1), + addrsByUDP: make(map[netaddr.IPPort]*AddrSet), + addrsByKey: make(map[key.Public]*AddrSet), + derpRecvCh: make(chan derpReadResult), + udpRecvCh: make(chan udpReadResult), + derpStarted: make(chan struct{}), + peerLastDerp: make(map[key.Public]int), + endpointOfDisco: make(map[tailcfg.DiscoKey]*discoEndpoint), } c.endpointsUpdateWaiter = sync.NewCond(&c.mu) return c @@ -732,6 +736,8 @@ func (c *Conn) Send(b []byte, ep conn.Endpoint) error { switch v := ep.(type) { default: panic(fmt.Sprintf("[unexpected] Endpoint type %T", v)) + case *discoEndpoint: + return v.send(b) case *singleEndpoint: addr := (*net.UDPAddr)(v) if addr.IP.Equal(derpMagicIP) { @@ -1179,14 +1185,25 @@ func (c *Conn) awaitUDP4(b []byte) { } return } +} +// wgRecvAddr conditionally alters the returned UDPAddr we tell +// wireguard-go we received a packet from. For peers with discovery +// keys, we always use the same one, a unique synthetic value created +// per peer. +func wgRecvAddr(e conn.Endpoint, addr *net.UDPAddr) *net.UDPAddr { + if de, ok := e.(*discoEndpoint); ok { + return de.fakeWGAddr + } + return addr } func (c *Conn) ReceiveIPv4(b []byte) (n int, ep conn.Endpoint, addr *net.UDPAddr, err error) { // First, process any buffered packet from earlier. 
if addr := c.bufferedIPv4From; addr != nil { c.bufferedIPv4From = nil - return copy(b, c.bufferedIPv4Packet), c.findEndpoint(addr), addr, nil + ep := c.findEndpoint(addr) + return copy(b, c.bufferedIPv4Packet), ep, wgRecvAddr(ep, addr), nil } go c.awaitUDP4(b) @@ -1196,6 +1213,7 @@ func (c *Conn) ReceiveIPv4(b []byte) (n int, ep conn.Endpoint, addr *net.UDPAddr // completed a successful receive on udpRecvCh. var addrSet *AddrSet + var discoEp *discoEndpoint select { case dm := <-c.derpRecvCh: @@ -1229,10 +1247,15 @@ func (c *Conn) ReceiveIPv4(b []byte) (n int, ep conn.Endpoint, addr *net.UDPAddr } c.mu.Lock() - addrSet = c.addrsByKey[dm.src] + if dk, ok := c.discoOfNode[tailcfg.NodeKey(dm.src)]; ok { + discoEp = c.endpointOfDisco[dk] + } + if discoEp == nil { + addrSet = c.addrsByKey[dm.src] + } c.mu.Unlock() - if addrSet == nil { + if addrSet == nil && discoEp == nil { key := wgcfg.Key(dm.src) c.logf("magicsock: DERP packet from unknown key: %s", key.ShortString()) } @@ -1259,10 +1282,12 @@ func (c *Conn) ReceiveIPv4(b []byte) (n int, ep conn.Endpoint, addr *net.UDPAddr if addrSet != nil { ep = addrSet + } else if discoEp != nil { + ep = discoEp } else { ep = c.findEndpoint(addr) } - return n, ep, addr, nil + return n, ep, wgRecvAddr(ep, addr), nil } func (c *Conn) ReceiveIPv6(b []byte) (int, conn.Endpoint, *net.UDPAddr, error) { @@ -1280,7 +1305,7 @@ func (c *Conn) ReceiveIPv6(b []byte) (int, conn.Endpoint, *net.UDPAddr, error) { continue } ep := c.findEndpoint(addr) - return n, ep, addr, nil + return n, ep, wgRecvAddr(ep, addr), nil } } @@ -1310,7 +1335,7 @@ func (c *Conn) handleDiscoMessage(msg []byte, addr *net.UDPAddr) bool { return false } - senderNodeKey, ok := c.nodeOfDisco[sender] + senderNode, ok := c.nodeOfDisco[sender] if !ok { // Returning false keeps passing it down, to WireGuard. // WireGuard will almost surely reject it, but give it a chance. 
@@ -1325,11 +1350,11 @@ func (c *Conn) handleDiscoMessage(msg []byte, addr *net.UDPAddr) bool { sealedBox := msg[headerLen:] payload, ok := box.Open(nil, sealedBox, &nonce, key.Public(sender).B32(), c.discoPrivate.B32()) if !ok { - c.logf("magicsock: failed to open disco message box purportedly from %s (disco key %x)", senderNodeKey.ShortString(), sender[:]) + c.logf("magicsock: failed to open disco message box purportedly from %s (disco key %x)", senderNode.Key.ShortString(), sender[:]) return false } - c.logf("magicsock: got disco message from %s: %x (%q)", senderNodeKey.ShortString(), payload, payload) + c.logf("magicsock: got disco message from %s: %x (%q)", senderNode.Key.ShortString(), payload, payload) return true } @@ -1427,8 +1452,12 @@ func (c *Conn) SetNetworkMap(nm *controlclient.NetworkMap) { numDisco := 0 for _, n := range nm.Peers { - if !n.DiscoKey.IsZero() { - numDisco++ + if n.DiscoKey.IsZero() { + continue + } + numDisco++ + if ep, ok := c.endpointOfDisco[n.DiscoKey]; ok { + ep.updateFromNode(n) } } @@ -1439,12 +1468,12 @@ func (c *Conn) SetNetworkMap(nm *controlclient.NetworkMap) { // the set of discokeys changed. for pass := 1; pass <= 2; pass++ { if c.nodeOfDisco == nil || pass == 2 { - c.nodeOfDisco = map[tailcfg.DiscoKey]tailcfg.NodeKey{} + c.nodeOfDisco = map[tailcfg.DiscoKey]*tailcfg.Node{} c.discoOfNode = map[tailcfg.NodeKey]tailcfg.DiscoKey{} } for _, n := range nm.Peers { if !n.DiscoKey.IsZero() { - c.nodeOfDisco[n.DiscoKey] = n.Key + c.nodeOfDisco[n.DiscoKey] = n if old, ok := c.discoOfNode[n.Key]; ok && old != n.DiscoKey { c.logf("magicsock: node %s changed discovery key from %x to %x", n.Key.ShortString(), old[:8], n.DiscoKey[:8]) // TODO: reset AddrSet states, reset wireguard session key, etc. @@ -1457,6 +1486,14 @@ func (c *Conn) SetNetworkMap(nm *controlclient.NetworkMap) { } } + // Clean c.endpointOfDisco for discovery keys that are no longer present. 
+ for dk, de := range c.endpointOfDisco { + if _, ok := c.nodeOfDisco[dk]; !ok { + de.cleanup() + delete(c.endpointOfDisco, dk) + } + } + } func (c *Conn) wantDerpLocked() bool { return c.derpMap != nil } @@ -1776,9 +1813,15 @@ func (c *Conn) resetAddrSetStates() { as.curAddr = -1 as.stopSpray = as.timeNow().Add(sprayPeriod) } + for _, de := range c.endpointOfDisco { + de.noteConnectivityChange() + } } // AddrSet is a set of UDP addresses that implements wireguard/conn.Endpoint. +// +// This is the legacy endpoint for peers that don't support discovery; +// it predates discoEndpoint. type AddrSet struct { publicKey key.Public // peer public key used for DERP communication @@ -2018,11 +2061,39 @@ func (c *Conn) CreateBind(uint16) (conn.Bind, uint16, error) { } // CreateEndpoint is called by WireGuard to connect to an endpoint. -// The key is the public key of the peer and addrs is a -// comma-separated list of UDP ip:ports. +// +// The key is the public key of the peer and addrs is either: +// +// 1) a comma-separated list of UDP ip:ports (if the peer doesn't have a discovery key) +// 2) the peer's hex discovery key followed by ".disco.tailscale:12345", a magic value that means the peer +// is running code that supports active discovery, so CreateEndpoint returns +// a discoEndpoint. 
+// + func (c *Conn) CreateEndpoint(pubKey [32]byte, addrs string) (conn.Endpoint, error) { pk := key.Public(pubKey) c.logf("magicsock: CreateEndpoint: key=%s: %s", pk.ShortString(), strings.ReplaceAll(addrs, "127.3.3.40:", "derp-")) + + if strings.HasSuffix(addrs, controlclient.EndpointDiscoSuffix) { + discoHex := strings.TrimSuffix(addrs, controlclient.EndpointDiscoSuffix) + discoKey, err := key.NewPublicFromHexMem(mem.S(discoHex)) + if err != nil { + return nil, fmt.Errorf("magicsock: invalid discokey endpoint %q for %v: %w", addrs, pk.ShortString(), err) + } + c.mu.Lock() + defer c.mu.Unlock() + de := &discoEndpoint{ + c: c, + publicKey: pk, // peer public key (for WireGuard + DERP) + discoKey: tailcfg.DiscoKey(discoKey), // for discovery messages + wgEndpointHostPort: addrs, + } + de.initFakeUDPAddr() + de.updateFromNode(c.nodeOfDisco[de.discoKey]) + c.endpointOfDisco[de.discoKey] = de + return de, nil + } + a := &AddrSet{ Logf: c.logf, publicKey: pk, @@ -2075,6 +2146,9 @@ func (c *Conn) CreateEndpoint(pubKey [32]byte, addrs string) (conn.Endpoint, err return a, nil } +// singleEndpoint is a wireguard-go/conn.Endpoint used for "roaming +// addressed" in releases of Tailscale that predate discovery +// messages. New peers use discoEndpoint. type singleEndpoint net.UDPAddr func (e *singleEndpoint) ClearSrc() {} @@ -2250,3 +2324,112 @@ func udpAddrDebugString(ua net.UDPAddr) string { } return ua.String() } + +// discoEndpoint is a wireguard/conn.Endpoint for new-style peers that +// advertise a DiscoKey and participate in active discovery. 
+type discoEndpoint struct { + c *Conn + publicKey key.Public // peer public key (for WireGuard + DERP) + discoKey tailcfg.DiscoKey // for discovery messages + fakeWGAddr *net.UDPAddr // the UDPAddr we tell wireguard-go we're using + wgEndpointHostPort string // string from CreateEndpoint: hex discovery key + ".disco.tailscale:12345" + + mu sync.Mutex // Lock ordering: Conn.mu, then discoEndpoint.mu + derpAddr *net.UDPAddr +} + +// initFakeUDPAddr populates fakeWGAddr with a globally unique fake UDPAddr. +// The current implementation just uses the pointer value of de jammed into an IPv6 +// address, but it could also be, say, a counter. +func (de *discoEndpoint) initFakeUDPAddr() { + var addr [16]byte + addr[0] = 0xfd + addr[1] = 0x00 + binary.BigEndian.PutUint64(addr[2:], uint64(reflect.ValueOf(de).Pointer())) + ipp := netaddr.IPPort{ + IP: netaddr.IPFrom16(addr), + Port: 12345, + } + de.fakeWGAddr = ipp.UDPAddr() +} + +func (de *discoEndpoint) Addrs() []wgcfg.Endpoint { + // This has to be the same string that was passed to + // CreateEndpoint, otherwise Reconfig will end up recreating + // Endpoints and losing state over time. + host, portStr, err := net.SplitHostPort(de.wgEndpointHostPort) + if err != nil { + panic(err) + } + port, err := strconv.ParseUint(portStr, 10, 16) + if err != nil { + panic(err) + } + return []wgcfg.Endpoint{{host, uint16(port)}} +} + +func (de *discoEndpoint) ClearSrc() {} +func (de *discoEndpoint) SrcToString() string { panic("unused") } // unused by wireguard-go +func (de *discoEndpoint) SrcIP() net.IP { panic("unused") } // unused by wireguard-go +func (de *discoEndpoint) DstToString() string { return de.wgEndpointHostPort } +func (de *discoEndpoint) DstIP() net.IP { panic("unused") } +func (de *discoEndpoint) DstToBytes() []byte { return de.fakeWGAddr.IP[:] } +func (de *discoEndpoint) UpdateDst(addr *net.UDPAddr) error { + // This is called ~per packet (and requiring a mutex acquisition inside wireguard-go). 
+ // TODO(bradfitz): make that cheaper and/or remove it. We don't need it. + return nil +} + +func (de *discoEndpoint) send(b []byte) error { + // TODO: all the disco messaging & state tracking & spraying, + // bringing over relevant AddrSet code. For now, just do DERP + // as a crutch while I work on other bits. + de.mu.Lock() + derpAddr := de.derpAddr + de.mu.Unlock() + + if derpAddr == nil { + return errors.New("no DERP addr") + } + return de.c.sendAddr(derpAddr, de.publicKey, b) +} + +func (de *discoEndpoint) updateFromNode(n *tailcfg.Node) { + if n == nil { + // TODO: log, error, count? if this even happens. + return + } + de.mu.Lock() + defer de.mu.Unlock() + + if n.DERP == "" { + de.derpAddr = nil + } else { + // TODO: add ParseIPPort to netaddr package; only safe to use ResolveUDPAddr + // here because we know no DNS lookups are involved + ua, _ := net.ResolveUDPAddr("udp", n.DERP) + de.derpAddr = ua + } + + // TODO: parse all the endpoints, not just DERP +} + +// noteConnectivityChange is called when connectivity changes enough +// that we should question our earlier assumptions about which paths +// work. +func (de *discoEndpoint) noteConnectivityChange() { + de.mu.Lock() + defer de.mu.Unlock() + + // TODO: reset state +} + +// cleanup is called when a discovery endpoint is no longer present in the NetworkMap. +// This is where we can do cleanup such as closing goroutines or canceling timers. 
+func (de *discoEndpoint) cleanup() { + de.mu.Lock() + defer de.mu.Unlock() + + // TODO: real work later, when there's stuff to do + de.c.logf("magicsock: doing cleanup for discovery key %x", de.discoKey[:]) +} diff --git a/wgengine/magicsock/magicsock_test.go b/wgengine/magicsock/magicsock_test.go index 6f83af9a4..174be429e 100644 --- a/wgengine/magicsock/magicsock_test.go +++ b/wgengine/magicsock/magicsock_test.go @@ -844,8 +844,8 @@ func TestDiscoMessage(t *testing.T) { c := &Conn{ logf: t.Logf, discoPrivate: key.NewPrivate(), - nodeOfDisco: map[tailcfg.DiscoKey]tailcfg.NodeKey{ - tailcfg.DiscoKey(peer1Pub): tailcfg.NodeKey{1: 1}, + nodeOfDisco: map[tailcfg.DiscoKey]*tailcfg.Node{ + tailcfg.DiscoKey(peer1Pub): &tailcfg.Node{Key: tailcfg.NodeKey{1: 1}}, }, }