944 lines
25 KiB
Go
944 lines
25 KiB
Go
// Copyright (c) Tailscale Inc & AUTHORS
|
|
// SPDX-License-Identifier: BSD-3-Clause
|
|
|
|
package magicsock
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"fmt"
|
|
"hash/fnv"
|
|
"math/rand"
|
|
"net"
|
|
"net/netip"
|
|
"reflect"
|
|
"runtime"
|
|
"sort"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/tailscale/wireguard-go/conn"
|
|
"tailscale.com/derp"
|
|
"tailscale.com/derp/derphttp"
|
|
"tailscale.com/health"
|
|
"tailscale.com/logtail/backoff"
|
|
"tailscale.com/net/dnscache"
|
|
"tailscale.com/net/tsaddr"
|
|
"tailscale.com/syncs"
|
|
"tailscale.com/tailcfg"
|
|
"tailscale.com/types/key"
|
|
"tailscale.com/types/logger"
|
|
"tailscale.com/util/mak"
|
|
"tailscale.com/util/sysresources"
|
|
)
|
|
|
|
// useDerpRoute reports whether magicsock should enable the DERP
|
|
// return path optimization (Issue 150).
|
|
//
|
|
// By default it's enabled, unless an environment variable
|
|
// or control says to disable it.
|
|
func (c *Conn) useDerpRoute() bool {
|
|
if b, ok := debugUseDerpRoute().Get(); ok {
|
|
return b
|
|
}
|
|
return c.controlKnobs == nil || !c.controlKnobs.DisableDRPO.Load()
|
|
}
|
|
|
|
// derpRoute is a route entry for a public key, saying that a certain
|
|
// peer should be available at DERP node derpID, as long as the
|
|
// current connection for that derpID is dc. (but dc should not be
|
|
// used to write directly; it's owned by the read/write loops)
|
|
type derpRoute struct {
|
|
derpID int
|
|
dc *derphttp.Client // don't use directly; see comment above
|
|
}
|
|
|
|
// removeDerpPeerRoute removes a DERP route entry previously added by addDerpPeerRoute.
|
|
func (c *Conn) removeDerpPeerRoute(peer key.NodePublic, derpID int, dc *derphttp.Client) {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
r2 := derpRoute{derpID, dc}
|
|
if r, ok := c.derpRoute[peer]; ok && r == r2 {
|
|
delete(c.derpRoute, peer)
|
|
}
|
|
}
|
|
|
|
// addDerpPeerRoute adds a DERP route entry, noting that peer was seen
|
|
// on DERP node derpID, at least on the connection identified by dc.
|
|
// See issue 150 for details.
|
|
func (c *Conn) addDerpPeerRoute(peer key.NodePublic, derpID int, dc *derphttp.Client) {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
mak.Set(&c.derpRoute, peer, derpRoute{derpID, dc})
|
|
}
|
|
|
|
// activeDerp contains fields for an active DERP connection.
|
|
type activeDerp struct {
|
|
c *derphttp.Client
|
|
cancel context.CancelFunc
|
|
writeCh chan<- derpWriteRequest
|
|
// lastWrite is the time of the last request for its write
|
|
// channel (currently even if there was no write).
|
|
// It is always non-nil and initialized to a non-zero Time.
|
|
lastWrite *time.Time
|
|
createTime time.Time
|
|
}
|
|
|
|
var processStartUnixNano = time.Now().UnixNano()
|
|
|
|
// pickDERPFallback returns a non-zero but deterministic DERP node to
|
|
// connect to. This is only used if netcheck couldn't find the
|
|
// nearest one (for instance, if UDP is blocked and thus STUN latency
|
|
// checks aren't working).
|
|
//
|
|
// c.mu must NOT be held.
|
|
func (c *Conn) pickDERPFallback() int {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
if !c.wantDerpLocked() {
|
|
return 0
|
|
}
|
|
ids := c.derpMap.RegionIDs()
|
|
if len(ids) == 0 {
|
|
// No DERP regions in non-nil map.
|
|
return 0
|
|
}
|
|
|
|
// TODO: figure out which DERP region most of our peers are using,
|
|
// and use that region as our fallback.
|
|
//
|
|
// If we already had selected something in the past and it has any
|
|
// peers, we want to stay on it. If there are no peers at all,
|
|
// stay on whatever DERP we previously picked. If we need to pick
|
|
// one and have no peer info, pick a region randomly.
|
|
//
|
|
// We used to do the above for legacy clients, but never updated
|
|
// it for disco.
|
|
|
|
if c.myDerp != 0 {
|
|
return c.myDerp
|
|
}
|
|
|
|
h := fnv.New64()
|
|
fmt.Fprintf(h, "%p/%d", c, processStartUnixNano) // arbitrary
|
|
return ids[rand.New(rand.NewSource(int64(h.Sum64()))).Intn(len(ids))]
|
|
}
|
|
|
|
func (c *Conn) derpRegionCodeLocked(regionID int) string {
|
|
if c.derpMap == nil {
|
|
return ""
|
|
}
|
|
if dr, ok := c.derpMap.Regions[regionID]; ok {
|
|
return dr.RegionCode
|
|
}
|
|
return ""
|
|
}
|
|
|
|
// c.mu must NOT be held.
|
|
func (c *Conn) setNearestDERP(derpNum int) (wantDERP bool) {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
if !c.wantDerpLocked() {
|
|
c.myDerp = 0
|
|
health.SetMagicSockDERPHome(0, c.homeless)
|
|
return false
|
|
}
|
|
if c.homeless {
|
|
c.myDerp = 0
|
|
health.SetMagicSockDERPHome(0, c.homeless)
|
|
return false
|
|
}
|
|
if derpNum == c.myDerp {
|
|
// No change.
|
|
return true
|
|
}
|
|
if c.myDerp != 0 && derpNum != 0 {
|
|
metricDERPHomeChange.Add(1)
|
|
}
|
|
c.myDerp = derpNum
|
|
health.SetMagicSockDERPHome(derpNum, c.homeless)
|
|
|
|
if c.privateKey.IsZero() {
|
|
// No private key yet, so DERP connections won't come up anyway.
|
|
// Return early rather than ultimately log a couple lines of noise.
|
|
return true
|
|
}
|
|
|
|
// On change, notify all currently connected DERP servers and
|
|
// start connecting to our home DERP if we are not already.
|
|
dr := c.derpMap.Regions[derpNum]
|
|
if dr == nil {
|
|
c.logf("[unexpected] magicsock: derpMap.Regions[%v] is nil", derpNum)
|
|
} else {
|
|
c.logf("magicsock: home is now derp-%v (%v)", derpNum, c.derpMap.Regions[derpNum].RegionCode)
|
|
}
|
|
for i, ad := range c.activeDerp {
|
|
go ad.c.NotePreferred(i == c.myDerp)
|
|
}
|
|
c.goDerpConnect(derpNum)
|
|
return true
|
|
}
|
|
|
|
// startDerpHomeConnectLocked starts connecting to our DERP home, if any.
|
|
//
|
|
// c.mu must be held.
|
|
func (c *Conn) startDerpHomeConnectLocked() {
|
|
c.goDerpConnect(c.myDerp)
|
|
}
|
|
|
|
// goDerpConnect starts a goroutine to start connecting to the given
|
|
// DERP node.
|
|
//
|
|
// c.mu may be held, but does not need to be.
|
|
func (c *Conn) goDerpConnect(node int) {
|
|
if node == 0 {
|
|
return
|
|
}
|
|
go c.derpWriteChanOfAddr(netip.AddrPortFrom(tailcfg.DerpMagicIPAddr, uint16(node)), key.NodePublic{})
|
|
}
|
|
|
|
var (
|
|
bufferedDerpWrites int
|
|
bufferedDerpWritesOnce sync.Once
|
|
)
|
|
|
|
// bufferedDerpWritesBeforeDrop returns how many packets writes can be queued
|
|
// up the DERP client to write on the wire before we start dropping.
|
|
func bufferedDerpWritesBeforeDrop() int {
|
|
// For mobile devices, always return the previous minimum value of 32;
|
|
// we can do this outside the sync.Once to avoid that overhead.
|
|
if runtime.GOOS == "ios" || runtime.GOOS == "android" {
|
|
return 32
|
|
}
|
|
|
|
bufferedDerpWritesOnce.Do(func() {
|
|
// Some rough sizing: for the previous fixed value of 32, the
|
|
// total consumed memory can be:
|
|
// = numDerpRegions * messages/region * sizeof(message)
|
|
//
|
|
// For sake of this calculation, assume 100 DERP regions; at
|
|
// time of writing (2023-04-03), we have 24.
|
|
//
|
|
// A reasonable upper bound for the worst-case average size of
|
|
// a message is a *disco.CallMeMaybe message with 16 endpoints;
|
|
// since sizeof(netip.AddrPort) = 32, that's 512 bytes. Thus:
|
|
// = 100 * 32 * 512
|
|
// = 1638400 (1.6MiB)
|
|
//
|
|
// On a reasonably-small node with 4GiB of memory that's
|
|
// connected to each region and handling a lot of load, 1.6MiB
|
|
// is about 0.04% of the total system memory.
|
|
//
|
|
// For sake of this calculation, then, let's double that memory
|
|
// usage to 0.08% and scale based on total system memory.
|
|
//
|
|
// For a 16GiB Linux box, this should buffer just over 256
|
|
// messages.
|
|
systemMemory := sysresources.TotalMemory()
|
|
memoryUsable := float64(systemMemory) * 0.0008
|
|
|
|
const (
|
|
theoreticalDERPRegions = 100
|
|
messageMaximumSizeBytes = 512
|
|
)
|
|
bufferedDerpWrites = int(memoryUsable / (theoreticalDERPRegions * messageMaximumSizeBytes))
|
|
|
|
// Never drop below the previous minimum value.
|
|
if bufferedDerpWrites < 32 {
|
|
bufferedDerpWrites = 32
|
|
}
|
|
})
|
|
return bufferedDerpWrites
|
|
}
|
|
|
|
// derpWriteChanOfAddr returns a DERP client for fake UDP addresses that
|
|
// represent DERP servers, creating them as necessary. For real UDP
|
|
// addresses, it returns nil.
|
|
//
|
|
// If peer is non-zero, it can be used to find an active reverse
|
|
// path, without using addr.
|
|
func (c *Conn) derpWriteChanOfAddr(addr netip.AddrPort, peer key.NodePublic) chan<- derpWriteRequest {
|
|
if addr.Addr() != tailcfg.DerpMagicIPAddr {
|
|
return nil
|
|
}
|
|
regionID := int(addr.Port())
|
|
|
|
if c.networkDown() {
|
|
return nil
|
|
}
|
|
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
if !c.wantDerpLocked() || c.closed {
|
|
return nil
|
|
}
|
|
if c.derpMap == nil || c.derpMap.Regions[regionID] == nil {
|
|
return nil
|
|
}
|
|
if c.privateKey.IsZero() {
|
|
c.logf("magicsock: DERP lookup of %v with no private key; ignoring", addr)
|
|
return nil
|
|
}
|
|
|
|
// See if we have a connection open to that DERP node ID
|
|
// first. If so, might as well use it. (It's a little
|
|
// arbitrary whether we use this one vs. the reverse route
|
|
// below when we have both.)
|
|
ad, ok := c.activeDerp[regionID]
|
|
if ok {
|
|
*ad.lastWrite = time.Now()
|
|
c.setPeerLastDerpLocked(peer, regionID, regionID)
|
|
return ad.writeCh
|
|
}
|
|
|
|
// If we don't have an open connection to the peer's home DERP
|
|
// node, see if we have an open connection to a DERP node
|
|
// where we'd heard from that peer already. For instance,
|
|
// perhaps peer's home is Frankfurt, but they dialed our home DERP
|
|
// node in SF to reach us, so we can reply to them using our
|
|
// SF connection rather than dialing Frankfurt. (Issue 150)
|
|
if !peer.IsZero() && c.useDerpRoute() {
|
|
if r, ok := c.derpRoute[peer]; ok {
|
|
if ad, ok := c.activeDerp[r.derpID]; ok && ad.c == r.dc {
|
|
c.setPeerLastDerpLocked(peer, r.derpID, regionID)
|
|
*ad.lastWrite = time.Now()
|
|
return ad.writeCh
|
|
}
|
|
}
|
|
}
|
|
|
|
why := "home-keep-alive"
|
|
if !peer.IsZero() {
|
|
why = peer.ShortString()
|
|
}
|
|
c.logf("magicsock: adding connection to derp-%v for %v", regionID, why)
|
|
|
|
firstDerp := false
|
|
if c.activeDerp == nil {
|
|
firstDerp = true
|
|
c.activeDerp = make(map[int]activeDerp)
|
|
c.prevDerp = make(map[int]*syncs.WaitGroupChan)
|
|
}
|
|
|
|
// Note that derphttp.NewRegionClient does not dial the server
|
|
// (it doesn't block) so it is safe to do under the c.mu lock.
|
|
dc := derphttp.NewRegionClient(c.privateKey, c.logf, c.netMon, func() *tailcfg.DERPRegion {
|
|
// Warning: it is not legal to acquire
|
|
// magicsock.Conn.mu from this callback.
|
|
// It's run from derphttp.Client.connect (via Send, etc)
|
|
// and the lock ordering rules are that magicsock.Conn.mu
|
|
// must be acquired before derphttp.Client.mu.
|
|
// See https://github.com/tailscale/tailscale/issues/3726
|
|
if c.connCtx.Err() != nil {
|
|
// We're closing anyway; return nil to stop dialing.
|
|
return nil
|
|
}
|
|
derpMap := c.derpMapAtomic.Load()
|
|
if derpMap == nil {
|
|
return nil
|
|
}
|
|
return derpMap.Regions[regionID]
|
|
})
|
|
|
|
dc.SetCanAckPings(true)
|
|
dc.NotePreferred(c.myDerp == regionID)
|
|
dc.SetAddressFamilySelector(derpAddrFamSelector{c})
|
|
dc.DNSCache = dnscache.Get()
|
|
|
|
ctx, cancel := context.WithCancel(c.connCtx)
|
|
ch := make(chan derpWriteRequest, bufferedDerpWritesBeforeDrop())
|
|
|
|
ad.c = dc
|
|
ad.writeCh = ch
|
|
ad.cancel = cancel
|
|
ad.lastWrite = new(time.Time)
|
|
*ad.lastWrite = time.Now()
|
|
ad.createTime = time.Now()
|
|
c.activeDerp[regionID] = ad
|
|
metricNumDERPConns.Set(int64(len(c.activeDerp)))
|
|
c.logActiveDerpLocked()
|
|
c.setPeerLastDerpLocked(peer, regionID, regionID)
|
|
c.scheduleCleanStaleDerpLocked()
|
|
|
|
// Build a startGate for the derp reader+writer
|
|
// goroutines, so they don't start running until any
|
|
// previous generation is closed.
|
|
startGate := syncs.ClosedChan()
|
|
if prev := c.prevDerp[regionID]; prev != nil {
|
|
startGate = prev.DoneChan()
|
|
}
|
|
// And register a WaitGroup(Chan) for this generation.
|
|
wg := syncs.NewWaitGroupChan()
|
|
wg.Add(2)
|
|
c.prevDerp[regionID] = wg
|
|
|
|
if firstDerp {
|
|
startGate = c.derpStarted
|
|
go func() {
|
|
dc.Connect(ctx)
|
|
close(c.derpStarted)
|
|
c.muCond.Broadcast()
|
|
}()
|
|
}
|
|
|
|
go c.runDerpReader(ctx, addr, dc, wg, startGate)
|
|
go c.runDerpWriter(ctx, dc, ch, wg, startGate)
|
|
go c.derpActiveFunc()
|
|
|
|
return ad.writeCh
|
|
}
|
|
|
|
// setPeerLastDerpLocked notes that peer is now being written to via
|
|
// the provided DERP regionID, and that the peer advertises a DERP
|
|
// home region ID of homeID.
|
|
//
|
|
// If there's any change, it logs.
|
|
//
|
|
// c.mu must be held.
|
|
func (c *Conn) setPeerLastDerpLocked(peer key.NodePublic, regionID, homeID int) {
|
|
if peer.IsZero() {
|
|
return
|
|
}
|
|
old := c.peerLastDerp[peer]
|
|
if old == regionID {
|
|
return
|
|
}
|
|
c.peerLastDerp[peer] = regionID
|
|
|
|
var newDesc string
|
|
switch {
|
|
case regionID == homeID && regionID == c.myDerp:
|
|
newDesc = "shared home"
|
|
case regionID == homeID:
|
|
newDesc = "their home"
|
|
case regionID == c.myDerp:
|
|
newDesc = "our home"
|
|
case regionID != homeID:
|
|
newDesc = "alt"
|
|
}
|
|
if old == 0 {
|
|
c.logf("[v1] magicsock: derp route for %s set to derp-%d (%s)", peer.ShortString(), regionID, newDesc)
|
|
} else {
|
|
c.logf("[v1] magicsock: derp route for %s changed from derp-%d => derp-%d (%s)", peer.ShortString(), old, regionID, newDesc)
|
|
}
|
|
}
|
|
|
|
// derpReadResult is the type sent by runDerpClient to ReceiveIPv4
|
|
// when a DERP packet is available.
|
|
//
|
|
// Notably, it doesn't include the derp.ReceivedPacket because we
|
|
// don't want to give the receiver access to the aliased []byte. To
|
|
// get at the packet contents they need to call copyBuf to copy it
|
|
// out, which also releases the buffer.
|
|
type derpReadResult struct {
|
|
regionID int
|
|
n int // length of data received
|
|
src key.NodePublic
|
|
// copyBuf is called to copy the data to dst. It returns how
|
|
// much data was copied, which will be n if dst is large
|
|
// enough. copyBuf can only be called once.
|
|
// If copyBuf is nil, that's a signal from the sender to ignore
|
|
// this message.
|
|
copyBuf func(dst []byte) int
|
|
}
|
|
|
|
// runDerpReader runs in a goroutine for the life of a DERP
|
|
// connection, handling received packets.
|
|
func (c *Conn) runDerpReader(ctx context.Context, derpFakeAddr netip.AddrPort, dc *derphttp.Client, wg *syncs.WaitGroupChan, startGate <-chan struct{}) {
|
|
defer wg.Decr()
|
|
defer dc.Close()
|
|
|
|
select {
|
|
case <-startGate:
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
|
|
didCopy := make(chan struct{}, 1)
|
|
regionID := int(derpFakeAddr.Port())
|
|
res := derpReadResult{regionID: regionID}
|
|
var pkt derp.ReceivedPacket
|
|
res.copyBuf = func(dst []byte) int {
|
|
n := copy(dst, pkt.Data)
|
|
didCopy <- struct{}{}
|
|
return n
|
|
}
|
|
|
|
defer health.SetDERPRegionConnectedState(regionID, false)
|
|
defer health.SetDERPRegionHealth(regionID, "")
|
|
|
|
// peerPresent is the set of senders we know are present on this
|
|
// connection, based on messages we've received from the server.
|
|
peerPresent := map[key.NodePublic]bool{}
|
|
bo := backoff.NewBackoff(fmt.Sprintf("derp-%d", regionID), c.logf, 5*time.Second)
|
|
var lastPacketTime time.Time
|
|
var lastPacketSrc key.NodePublic
|
|
|
|
for {
|
|
msg, connGen, err := dc.RecvDetail()
|
|
if err != nil {
|
|
health.SetDERPRegionConnectedState(regionID, false)
|
|
// Forget that all these peers have routes.
|
|
for peer := range peerPresent {
|
|
delete(peerPresent, peer)
|
|
c.removeDerpPeerRoute(peer, regionID, dc)
|
|
}
|
|
if err == derphttp.ErrClientClosed {
|
|
return
|
|
}
|
|
if c.networkDown() {
|
|
c.logf("[v1] magicsock: derp.Recv(derp-%d): network down, closing", regionID)
|
|
return
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
|
|
c.logf("magicsock: [%p] derp.Recv(derp-%d): %v", dc, regionID, err)
|
|
|
|
// If our DERP connection broke, it might be because our network
|
|
// conditions changed. Start that check.
|
|
c.ReSTUN("derp-recv-error")
|
|
|
|
// Back off a bit before reconnecting.
|
|
bo.BackOff(ctx, err)
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
continue
|
|
}
|
|
bo.BackOff(ctx, nil) // reset
|
|
|
|
now := time.Now()
|
|
if lastPacketTime.IsZero() || now.Sub(lastPacketTime) > 5*time.Second {
|
|
health.NoteDERPRegionReceivedFrame(regionID)
|
|
lastPacketTime = now
|
|
}
|
|
|
|
switch m := msg.(type) {
|
|
case derp.ServerInfoMessage:
|
|
health.SetDERPRegionConnectedState(regionID, true)
|
|
health.SetDERPRegionHealth(regionID, "") // until declared otherwise
|
|
c.logf("magicsock: derp-%d connected; connGen=%v", regionID, connGen)
|
|
continue
|
|
case derp.ReceivedPacket:
|
|
pkt = m
|
|
res.n = len(m.Data)
|
|
res.src = m.Source
|
|
if logDerpVerbose() {
|
|
c.logf("magicsock: got derp-%v packet: %q", regionID, m.Data)
|
|
}
|
|
// If this is a new sender we hadn't seen before, remember it and
|
|
// register a route for this peer.
|
|
if res.src != lastPacketSrc { // avoid map lookup w/ high throughput single peer
|
|
lastPacketSrc = res.src
|
|
if _, ok := peerPresent[res.src]; !ok {
|
|
peerPresent[res.src] = true
|
|
c.addDerpPeerRoute(res.src, regionID, dc)
|
|
}
|
|
}
|
|
case derp.PingMessage:
|
|
// Best effort reply to the ping.
|
|
pingData := [8]byte(m)
|
|
go func() {
|
|
if err := dc.SendPong(pingData); err != nil {
|
|
c.logf("magicsock: derp-%d SendPong error: %v", regionID, err)
|
|
}
|
|
}()
|
|
continue
|
|
case derp.HealthMessage:
|
|
health.SetDERPRegionHealth(regionID, m.Problem)
|
|
case derp.PeerGoneMessage:
|
|
switch m.Reason {
|
|
case derp.PeerGoneReasonDisconnected:
|
|
// Do nothing.
|
|
case derp.PeerGoneReasonNotHere:
|
|
metricRecvDiscoDERPPeerNotHere.Add(1)
|
|
c.logf("[unexpected] magicsock: derp-%d does not know about peer %s, removing route",
|
|
regionID, key.NodePublic(m.Peer).ShortString())
|
|
default:
|
|
metricRecvDiscoDERPPeerGoneUnknown.Add(1)
|
|
c.logf("[unexpected] magicsock: derp-%d peer %s gone, reason %v, removing route",
|
|
regionID, key.NodePublic(m.Peer).ShortString(), m.Reason)
|
|
}
|
|
c.removeDerpPeerRoute(key.NodePublic(m.Peer), regionID, dc)
|
|
default:
|
|
// Ignore.
|
|
continue
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case c.derpRecvCh <- res:
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-didCopy:
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
|
|
type derpWriteRequest struct {
|
|
addr netip.AddrPort
|
|
pubKey key.NodePublic
|
|
b []byte // copied; ownership passed to receiver
|
|
}
|
|
|
|
// runDerpWriter runs in a goroutine for the life of a DERP
|
|
// connection, handling received packets.
|
|
func (c *Conn) runDerpWriter(ctx context.Context, dc *derphttp.Client, ch <-chan derpWriteRequest, wg *syncs.WaitGroupChan, startGate <-chan struct{}) {
|
|
defer wg.Decr()
|
|
select {
|
|
case <-startGate:
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case wr := <-ch:
|
|
err := dc.Send(wr.pubKey, wr.b)
|
|
if err != nil {
|
|
c.logf("magicsock: derp.Send(%v): %v", wr.addr, err)
|
|
metricSendDERPError.Add(1)
|
|
} else {
|
|
metricSendDERP.Add(1)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *connBind) receiveDERP(buffs [][]byte, sizes []int, eps []conn.Endpoint) (int, error) {
|
|
health.ReceiveDERP.Enter()
|
|
defer health.ReceiveDERP.Exit()
|
|
|
|
for dm := range c.derpRecvCh {
|
|
if c.isClosed() {
|
|
break
|
|
}
|
|
n, ep := c.processDERPReadResult(dm, buffs[0])
|
|
if n == 0 {
|
|
// No data read occurred. Wait for another packet.
|
|
continue
|
|
}
|
|
metricRecvDataDERP.Add(1)
|
|
sizes[0] = n
|
|
eps[0] = ep
|
|
return 1, nil
|
|
}
|
|
return 0, net.ErrClosed
|
|
}
|
|
|
|
func (c *Conn) processDERPReadResult(dm derpReadResult, b []byte) (n int, ep *endpoint) {
|
|
if dm.copyBuf == nil {
|
|
return 0, nil
|
|
}
|
|
var regionID int
|
|
n, regionID = dm.n, dm.regionID
|
|
ncopy := dm.copyBuf(b)
|
|
if ncopy != n {
|
|
err := fmt.Errorf("received DERP packet of length %d that's too big for WireGuard buf size %d", n, ncopy)
|
|
c.logf("magicsock: %v", err)
|
|
return 0, nil
|
|
}
|
|
|
|
ipp := netip.AddrPortFrom(tailcfg.DerpMagicIPAddr, uint16(regionID))
|
|
if c.handleDiscoMessage(b[:n], ipp, dm.src, discoRXPathDERP) {
|
|
return 0, nil
|
|
}
|
|
|
|
var ok bool
|
|
c.mu.Lock()
|
|
ep, ok = c.peerMap.endpointForNodeKey(dm.src)
|
|
c.mu.Unlock()
|
|
if !ok {
|
|
// We don't know anything about this node key, nothing to
|
|
// record or process.
|
|
return 0, nil
|
|
}
|
|
|
|
ep.noteRecvActivity(ipp)
|
|
if stats := c.stats.Load(); stats != nil {
|
|
stats.UpdateRxPhysical(ep.nodeAddr, ipp, dm.n)
|
|
}
|
|
return n, ep
|
|
}
|
|
|
|
// SetDERPMap controls which (if any) DERP servers are used.
|
|
// A nil value means to disable DERP; it's disabled by default.
|
|
func (c *Conn) SetDERPMap(dm *tailcfg.DERPMap) {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
var derpAddr = debugUseDERPAddr()
|
|
if derpAddr != "" {
|
|
derpPort := 443
|
|
if debugUseDERPHTTP() {
|
|
// Match the port for -dev in derper.go
|
|
derpPort = 3340
|
|
}
|
|
dm = &tailcfg.DERPMap{
|
|
OmitDefaultRegions: true,
|
|
Regions: map[int]*tailcfg.DERPRegion{
|
|
999: {
|
|
RegionID: 999,
|
|
Nodes: []*tailcfg.DERPNode{{
|
|
Name: "999dev",
|
|
RegionID: 999,
|
|
HostName: derpAddr,
|
|
DERPPort: derpPort,
|
|
}},
|
|
},
|
|
},
|
|
}
|
|
}
|
|
|
|
if reflect.DeepEqual(dm, c.derpMap) {
|
|
return
|
|
}
|
|
|
|
c.derpMapAtomic.Store(dm)
|
|
old := c.derpMap
|
|
c.derpMap = dm
|
|
if dm == nil {
|
|
c.closeAllDerpLocked("derp-disabled")
|
|
return
|
|
}
|
|
|
|
// Reconnect any DERP region that changed definitions.
|
|
if old != nil {
|
|
changes := false
|
|
for rid, oldDef := range old.Regions {
|
|
if reflect.DeepEqual(oldDef, dm.Regions[rid]) {
|
|
continue
|
|
}
|
|
changes = true
|
|
if rid == c.myDerp {
|
|
c.myDerp = 0
|
|
}
|
|
c.closeDerpLocked(rid, "derp-region-redefined")
|
|
}
|
|
if changes {
|
|
c.logActiveDerpLocked()
|
|
}
|
|
}
|
|
|
|
go c.ReSTUN("derp-map-update")
|
|
}
|
|
func (c *Conn) wantDerpLocked() bool { return c.derpMap != nil }
|
|
|
|
// c.mu must be held.
|
|
func (c *Conn) closeAllDerpLocked(why string) {
|
|
if len(c.activeDerp) == 0 {
|
|
return // without the useless log statement
|
|
}
|
|
for i := range c.activeDerp {
|
|
c.closeDerpLocked(i, why)
|
|
}
|
|
c.logActiveDerpLocked()
|
|
}
|
|
|
|
// DebugBreakDERPConns breaks all DERP connections for debug/testing reasons.
|
|
func (c *Conn) DebugBreakDERPConns() error {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
if len(c.activeDerp) == 0 {
|
|
c.logf("magicsock: DebugBreakDERPConns: no active DERP connections")
|
|
return nil
|
|
}
|
|
c.closeAllDerpLocked("debug-break-derp")
|
|
c.startDerpHomeConnectLocked()
|
|
return nil
|
|
}
|
|
|
|
// maybeCloseDERPsOnRebind, in response to a rebind, closes all
|
|
// DERP connections that don't have a local address in okayLocalIPs
|
|
// and pings all those that do.
|
|
func (c *Conn) maybeCloseDERPsOnRebind(okayLocalIPs []netip.Prefix) {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
for regionID, ad := range c.activeDerp {
|
|
la, err := ad.c.LocalAddr()
|
|
if err != nil {
|
|
c.closeOrReconnectDERPLocked(regionID, "rebind-no-localaddr")
|
|
continue
|
|
}
|
|
if !tsaddr.PrefixesContainsIP(okayLocalIPs, la.Addr()) {
|
|
c.closeOrReconnectDERPLocked(regionID, "rebind-default-route-change")
|
|
continue
|
|
}
|
|
regionID := regionID
|
|
dc := ad.c
|
|
go func() {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
|
defer cancel()
|
|
if err := dc.Ping(ctx); err != nil {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
c.closeOrReconnectDERPLocked(regionID, "rebind-ping-fail")
|
|
return
|
|
}
|
|
c.logf("post-rebind ping of DERP region %d okay", regionID)
|
|
}()
|
|
}
|
|
c.logActiveDerpLocked()
|
|
}
|
|
|
|
// closeOrReconnectDERPLocked closes the DERP connection to the
|
|
// provided regionID and starts reconnecting it if it's our current
|
|
// home DERP.
|
|
//
|
|
// why is a reason for logging.
|
|
//
|
|
// c.mu must be held.
|
|
func (c *Conn) closeOrReconnectDERPLocked(regionID int, why string) {
|
|
c.closeDerpLocked(regionID, why)
|
|
if !c.privateKey.IsZero() && c.myDerp == regionID {
|
|
c.startDerpHomeConnectLocked()
|
|
}
|
|
}
|
|
|
|
// c.mu must be held.
|
|
// It is the responsibility of the caller to call logActiveDerpLocked after any set of closes.
|
|
func (c *Conn) closeDerpLocked(regionID int, why string) {
|
|
if ad, ok := c.activeDerp[regionID]; ok {
|
|
c.logf("magicsock: closing connection to derp-%v (%v), age %v", regionID, why, time.Since(ad.createTime).Round(time.Second))
|
|
go ad.c.Close()
|
|
ad.cancel()
|
|
delete(c.activeDerp, regionID)
|
|
metricNumDERPConns.Set(int64(len(c.activeDerp)))
|
|
}
|
|
}
|
|
|
|
// c.mu must be held.
|
|
func (c *Conn) logActiveDerpLocked() {
|
|
now := time.Now()
|
|
c.logf("magicsock: %v active derp conns%s", len(c.activeDerp), logger.ArgWriter(func(buf *bufio.Writer) {
|
|
if len(c.activeDerp) == 0 {
|
|
return
|
|
}
|
|
buf.WriteString(":")
|
|
c.foreachActiveDerpSortedLocked(func(node int, ad activeDerp) {
|
|
fmt.Fprintf(buf, " derp-%d=cr%v,wr%v", node, simpleDur(now.Sub(ad.createTime)), simpleDur(now.Sub(*ad.lastWrite)))
|
|
})
|
|
}))
|
|
}
|
|
|
|
// c.mu must be held.
|
|
func (c *Conn) foreachActiveDerpSortedLocked(fn func(regionID int, ad activeDerp)) {
|
|
if len(c.activeDerp) < 2 {
|
|
for id, ad := range c.activeDerp {
|
|
fn(id, ad)
|
|
}
|
|
return
|
|
}
|
|
ids := make([]int, 0, len(c.activeDerp))
|
|
for id := range c.activeDerp {
|
|
ids = append(ids, id)
|
|
}
|
|
sort.Ints(ids)
|
|
for _, id := range ids {
|
|
fn(id, c.activeDerp[id])
|
|
}
|
|
}
|
|
|
|
func (c *Conn) cleanStaleDerp() {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
if c.closed {
|
|
return
|
|
}
|
|
c.derpCleanupTimerArmed = false
|
|
|
|
tooOld := time.Now().Add(-derpInactiveCleanupTime)
|
|
dirty := false
|
|
someNonHomeOpen := false
|
|
for i, ad := range c.activeDerp {
|
|
if i == c.myDerp {
|
|
continue
|
|
}
|
|
if ad.lastWrite.Before(tooOld) {
|
|
c.closeDerpLocked(i, "idle")
|
|
dirty = true
|
|
} else {
|
|
someNonHomeOpen = true
|
|
}
|
|
}
|
|
if dirty {
|
|
c.logActiveDerpLocked()
|
|
}
|
|
if someNonHomeOpen {
|
|
c.scheduleCleanStaleDerpLocked()
|
|
}
|
|
}
|
|
|
|
func (c *Conn) scheduleCleanStaleDerpLocked() {
|
|
if c.derpCleanupTimerArmed {
|
|
// Already going to fire soon. Let the existing one
|
|
// fire lest it get infinitely delayed by repeated
|
|
// calls to scheduleCleanStaleDerpLocked.
|
|
return
|
|
}
|
|
c.derpCleanupTimerArmed = true
|
|
if c.derpCleanupTimer != nil {
|
|
c.derpCleanupTimer.Reset(derpCleanStaleInterval)
|
|
} else {
|
|
c.derpCleanupTimer = time.AfterFunc(derpCleanStaleInterval, c.cleanStaleDerp)
|
|
}
|
|
}
|
|
|
|
// DERPs reports the number of active DERP connections.
|
|
func (c *Conn) DERPs() int {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
return len(c.activeDerp)
|
|
}
|
|
|
|
func (c *Conn) derpRegionCodeOfIDLocked(regionID int) string {
|
|
if c.derpMap == nil {
|
|
return ""
|
|
}
|
|
if r, ok := c.derpMap.Regions[regionID]; ok {
|
|
return r.RegionCode
|
|
}
|
|
return ""
|
|
}
|
|
|
|
// derpAddrFamSelector is the derphttp.AddressFamilySelector we pass
|
|
// to derphttp.Client.SetAddressFamilySelector.
|
|
//
|
|
// It provides the hint as to whether in an IPv4-vs-IPv6 race that
|
|
// IPv4 should be held back a bit to give IPv6 a better-than-50/50
|
|
// chance of winning. We only return true when we believe IPv6 will
|
|
// work anyway, so we don't artificially delay the connection speed.
|
|
type derpAddrFamSelector struct{ c *Conn }
|
|
|
|
func (s derpAddrFamSelector) PreferIPv6() bool {
|
|
if r := s.c.lastNetCheckReport.Load(); r != nil {
|
|
return r.IPv6
|
|
}
|
|
return false
|
|
}
|
|
|
|
const (
|
|
// derpInactiveCleanupTime is how long a non-home DERP connection
|
|
// needs to be idle (last written to) before we close it.
|
|
derpInactiveCleanupTime = 60 * time.Second
|
|
|
|
// derpCleanStaleInterval is how often cleanStaleDerp runs when there
|
|
// are potentially-stale DERP connections to close.
|
|
derpCleanStaleInterval = 15 * time.Second
|
|
)
|