tailscale/derp/derp_server.go

2262 lines
67 KiB
Go
Raw Normal View History

// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
package derp
// TODO(crawshaw): with predefined serverKey in clients and HMAC on packets we could skip TLS
import (
"bufio"
"bytes"
"context"
"crypto/ed25519"
crand "crypto/rand"
"crypto/x509"
"crypto/x509/pkix"
"encoding/binary"
"encoding/json"
"errors"
"expvar"
"fmt"
"io"
"log"
"math"
"math/big"
"math/rand/v2"
"net"
"net/http"
"net/netip"
"os/exec"
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"go4.org/mem"
"golang.org/x/sync/errgroup"
"tailscale.com/client/tailscale"
"tailscale.com/disco"
"tailscale.com/envknob"
"tailscale.com/metrics"
"tailscale.com/syncs"
"tailscale.com/tailcfg"
"tailscale.com/tstime"
"tailscale.com/tstime/rate"
"tailscale.com/types/key"
"tailscale.com/types/logger"
"tailscale.com/util/mak"
"tailscale.com/util/set"
"tailscale.com/util/slicesx"
"tailscale.com/version"
)
// verboseDropKeys holds the destination public keys for which DERP
// logs verbosely every time it drops a packet.
var verboseDropKeys = map[key.NodePublic]bool{}

// init fills verboseDropKeys from the comma-separated key list in the
// TS_DEBUG_VERBOSE_DROPS envknob. Entries that fail to parse are logged
// and skipped; an empty or unset knob leaves the set empty.
func init() {
	raw := envknob.String("TS_DEBUG_VERBOSE_DROPS")
	if raw == "" {
		return
	}
	for _, field := range strings.Split(raw, ",") {
		parsed, err := key.ParseNodePublicUntyped(mem.S(field))
		if err != nil {
			log.Printf("ignoring invalid debug key %q: %v", field, err)
			continue
		}
		verboseDropKeys[parsed] = true
	}
}
const (
// perClientSendQueueDepth is the capacity given to each client's
// sendQueue and discoSendQueue channels when the client is accepted.
perClientSendQueueDepth = 32 // packets buffered for sending
// writeTimeout is a two second write timeout.
// NOTE(review): presumably the default used when Server.WriteTimeout
// is zero; the use site is not in this part of the file -- confirm.
writeTimeout = 2 * time.Second
)
// dupPolicy is a temporary (2021-08-30) mechanism to change the policy
// of how duplicate connections for the same key are handled.
type dupPolicy int8
const (
// lastWriterIsActive is a dupPolicy where the connection
// to send traffic for a peer is the active one.
//
// It is the zero value of dupPolicy.
lastWriterIsActive dupPolicy = iota
// disableFighters is a dupPolicy that detects if peers
// are trying to send interleaved with each other and
// then disables all of them.
disableFighters
)
// align64 is a zero-length array of atomic.Int64. It is used as a blank
// (_) field inside Server purely for the alignment it carries.
type align64 [0]atomic.Int64 // for side effect of its 64-bit alignment
// Server is a DERP server.
//
// A Server is created with NewServer and is handed connections through
// Accept; it does not listen on its own.
type Server struct {
// WriteTimeout, if non-zero, specifies how long to wait
// before failing when writing to a client.
WriteTimeout time.Duration
// privateKey is the key passed to NewServer; publicKey is derived
// from it there via privateKey.Public().
privateKey key.NodePrivate
publicKey key.NodePublic
// logf is the logger passed to NewServer.
logf logger.Logf
memSys0 uint64 // runtime.MemStats.Sys at start (or early-ish)
// meshKey is set by SetMeshKey; the empty string means no mesh key
// is configured (see HasMeshKey).
meshKey string
// limitedLogf is logf wrapped by logger.RateLimitedFn in NewServer.
limitedLogf logger.Logf
metaCert []byte // the encoded x509 cert to send after LetsEncrypt cert+intermediate
// dupPolicy is not assigned by NewServer, so it starts as the zero
// value, lastWriterIsActive.
dupPolicy dupPolicy
// debug is read from the DERP_DEBUG_LOGS envknob in NewServer. It
// gates Server.debugLogf and turns on debug for every accepted client.
debug bool
// Counters:
packetsSent, bytesSent expvar.Int
packetsRecv, bytesRecv expvar.Int
packetsRecvByKind metrics.LabelMap
// packetsRecvDisco and packetsRecvOther are the "disco" and "other"
// entries of packetsRecvByKind, looked up once in NewServer.
packetsRecvDisco *expvar.Int
packetsRecvOther *expvar.Int
_ align64
packetsDropped expvar.Int
packetsDroppedReason metrics.LabelMap
packetsDroppedReasonCounters []*expvar.Int // indexed by dropReason
packetsDroppedType metrics.LabelMap
// packetsDroppedTypeDisco and packetsDroppedTypeOther are the "disco"
// and "other" entries of packetsDroppedType, looked up once in NewServer.
packetsDroppedTypeDisco *expvar.Int
packetsDroppedTypeOther *expvar.Int
_ align64
packetsForwardedOut expvar.Int
packetsForwardedIn expvar.Int
peerGoneDisconnectedFrames expvar.Int // number of peer disconnected frames sent
peerGoneNotHereFrames expvar.Int // number of peer not here frames sent
gotPing expvar.Int // number of ping frames from client
sentPong expvar.Int // number of pong frames enqueued to client
// accepts is incremented once per Accept call (under mu); its value
// at that moment becomes the connection number.
accepts expvar.Int
// curClients is incremented in registerClient and decremented in
// unregisterClient.
curClients expvar.Int
curHomeClients expvar.Int // ones with preferred
dupClientKeys expvar.Int // current number of public keys we have 2+ connections for
dupClientConns expvar.Int // current number of connections sharing a public key
dupClientConnTotal expvar.Int // total number of accepted connections when a dup key existed
unknownFrames expvar.Int
homeMovesIn expvar.Int // established clients announce home server moves in
homeMovesOut expvar.Int // established clients announce home server moves out
multiForwarderCreated expvar.Int
multiForwarderDeleted expvar.Int
removePktForwardOther expvar.Int
avgQueueDuration *uint64 // In milliseconds; accessed atomically
tcpRtt metrics.LabelMap // histogram
meshUpdateBatchSize *metrics.Histogram
meshUpdateLoopCount *metrics.Histogram
bufferedWriteFrames *metrics.Histogram // how many sendLoop frames (or groups of related frames) get written per flush
// verifyClientsLocalTailscaled only accepts client connections to the DERP
// server if the clientKey is a known peer in the network, as specified by a
// running tailscaled's client's LocalAPI.
verifyClientsLocalTailscaled bool
// verifyClientsURL is the admission controller URL set by
// SetVerifyClientURL; verifyClientsURLFailOpen is set by
// SetVerifyClientURLFailOpen.
verifyClientsURL string
verifyClientsURLFailOpen bool
// mu is held by the methods in this file while they read or modify
// closed, netConns, clients, watchers, clientsMesh, peerGoneWatchers
// and keyOfAddr.
// NOTE(review): whether mu guards anything else is not visible here.
mu sync.Mutex
// closed is set by Close and read through isClosed.
closed bool
netConns map[Conn]chan struct{} // chan is closed when conn closes
clients map[key.NodePublic]*clientSet
watchers set.Set[*sclient] // mesh peers
// clientsMesh tracks all clients in the cluster, both locally
// and to mesh peers. If the value is nil, that means the
// peer is only local (and thus in the clients Map, but not
// remote). If the value is non-nil, it's remote (+ maybe also
// local).
clientsMesh map[key.NodePublic]PacketForwarder
// peerGoneWatchers is the set of watchers that subscribed to a
// peer disconnecting from the region overall. When a peer
// is gone from the region, we notify all of these watchers,
// calling their funcs in a new goroutine.
peerGoneWatchers map[key.NodePublic]set.HandleSet[func(key.NodePublic)]
// maps from netip.AddrPort to a client's public key
keyOfAddr map[netip.AddrPort]key.NodePublic
// clock is the time source; NewServer sets it to tstime.StdClock{}.
clock tstime.Clock
}
// clientSet represents 1 or more *sclients.
//
// In the common case, a client should only have one connection to the
// DERP server for a given key. When a client is connected multiple times,
// we record its set of connections in dupClientSet and keep those
// connections open to make them happy (to keep them from spinning,
// etc) and keep track of which is the latest connection. If only the last
// is sending traffic, that last one is the active connection and it
// gets traffic. Otherwise, in the case of a cloned node key, the
// whole set of dups doesn't receive data frames.
//
// All methods should only be called while holding Server.mu.
//
// TODO(bradfitz): Issue 2746: in the future we'll send some sort of
// "health_error" frame to them that'll communicate to the end users
// that they cloned a device key, and we'll also surface it in the
// admin panel, etc.
type clientSet struct {
// activeClient holds the currently active connection for the set. It's nil
// if there are no connections or the connection is disabled.
//
// A pointer to a clientSet can be held by peers for long periods of time
// without holding Server.mu to avoid mutex contention on Server.mu, only
// re-acquiring the mutex and checking the clients map if activeClient is
// nil.
activeClient atomic.Pointer[sclient]
// dup is non-nil if there are multiple connections for the
// public key. It's nil in the common case of only one
// client being connected.
//
// dup is guarded by Server.mu.
dup *dupClientSet
}
// Len reports how many clients are in s: the size of the duplicate set
// when one exists, otherwise 1 if there is an active client and 0 if
// there is none.
func (s *clientSet) Len() int {
	switch {
	case s.dup != nil:
		return len(s.dup.set)
	case s.activeClient.Load() != nil:
		return 1
	default:
		return 0
	}
}
// ForeachClient invokes f once for every client in the set: each member
// of the duplicate set when there is one, otherwise the single active
// client if it is non-nil.
//
// The Server.mu must be held.
func (s *clientSet) ForeachClient(f func(*sclient)) {
	if s.dup == nil {
		if only := s.activeClient.Load(); only != nil {
			f(only)
		}
		return
	}
	for member := range s.dup.set {
		f(member)
	}
}
// A dupClientSet is a clientSet of more than 1 connection.
//
// This can occur in some reasonable cases (temporarily while users
// are changing networks) or in the case of a cloned key. In the
// cloned key case, both peers are speaking and the clients get
// disabled.
//
// All fields are guarded by Server.mu.
type dupClientSet struct {
// set is the set of connected clients for sclient.key,
// including the clientSet's active one.
set set.Set[*sclient]
// last is the most recent addition to set, or nil if the most
// recent one has since disconnected and nobody else has sent
// data since.
//
// clientSet.pickActiveClient returns last as the active client
// when it is non-nil and not disabled.
last *sclient
// sendHistory is a log of which members of set have sent
// frames to the derp server, with adjacent duplicates
// removed. When a member of set is removed, the same
// element(s) are removed from sendHistory.
sendHistory []*sclient
}
// pickActiveClient returns the client that should be active for s.
//
// Without a duplicate set this is whatever activeClient currently holds.
// With one, it is the duplicate set's last client, provided that client
// exists and is not disabled; otherwise nil.
func (s *clientSet) pickActiveClient() *sclient {
	if s.dup == nil {
		return s.activeClient.Load()
	}
	candidate := s.dup.last
	if candidate == nil || candidate.isDisabled.Load() {
		return nil
	}
	return candidate
}
// removeClient deletes c from s and reports whether c had been a member.
//
// When c was a member, sendHistory is rewritten in place to keep only
// remaining members, with adjacent repeats collapsed; the unused tail of
// the old slice is zeroed so it holds no stale *sclient pointers. If last
// was c (and is therefore now nil), it falls back to the newest entry
// left in sendHistory, if any.
func (s *dupClientSet) removeClient(c *sclient) bool {
	before := len(s.set)
	delete(s.set, c)
	if s.last == c {
		s.last = nil
	}
	if before == len(s.set) {
		// Nothing was deleted: c was not in the set.
		return false
	}

	kept := s.sendHistory[:0]
	for _, sender := range s.sendHistory {
		if !s.set.Contains(sender) {
			continue
		}
		if n := len(kept); n > 0 && kept[n-1] == sender {
			continue
		}
		kept = append(kept, sender)
	}
	clear(s.sendHistory[len(kept):])
	s.sendHistory = kept

	if s.last == nil && len(kept) > 0 {
		s.last = kept[len(kept)-1]
	}
	return true
}
// PacketForwarder is something that can forward packets.
//
// It's mostly an interface for circular dependency reasons; the
// typical implementation is derphttp.Client. The other implementation
// is a multiForwarder, which this package creates as needed if a
// public key gets more than one PacketForwarder registered for it.
type PacketForwarder interface {
ForwardPacket(src, dst key.NodePublic, payload []byte) error
derp: add optional debug logging for prober clients This allows tracking packet flow via logs for prober clients. Note that the new sclient.debug() function is called on every received packet, but will do nothing for most clients. I have adjusted sclient logging to print public keys in short format rather than full. This takes effect even for existing non-debug logging (mostly client disconnect messages). Example logs for a packet being sent from client [SbsJn] (connected to derper [dM2E3]) to client [10WOo] (connected to derper [AVxvv]): ``` derper [dM2E3]: derp client 10.0.0.1:35470[SbsJn]: register single client mesh("10.0.1.1"): 4 peers derp client 10.0.0.1:35470[SbsJn]: read frame type 4 len 40 err <nil> derp client 10.0.0.1:35470[SbsJn]: SendPacket for [10WOo], forwarding via <derphttp_client.Client [AVxvv] url=https://10.0.1.1/derp>: <nil> derp client 10.0.0.1:35470[SbsJn]: read frame type 0 len 0 err EOF derp client 10.0.0.1:35470[SbsJn]: read EOF derp client 10.0.0.1:35470[SbsJn]: sender failed: context canceled derp client 10.0.0.1:35470[SbsJn]: removing connection derper [AVxvv]: derp client 10.0.1.1:50650[10WOo]: register single client derp client 10.0.1.1:50650[10WOo]: received forwarded packet from [SbsJn] via [dM2E3] derp client 10.0.1.1:50650[10WOo]: sendPkt attempt 0 enqueued derp client 10.0.1.1:50650[10WOo]: sendPacket from [SbsJn]: <nil> derp client 10.0.1.1:50650[10WOo]: read frame type 0 len 0 err EOF derp client 10.0.1.1:50650[10WOo]: read EOF derp client 10.0.1.1:50650[10WOo]: sender failed: context canceled derp client 10.0.1.1:50650[10WOo]: removing connection ``` Signed-off-by: Anton Tolchanov <anton@tailscale.com>
2023-03-20 15:15:45 +00:00
String() string
}
// Conn is the subset of the underlying net.Conn the DERP Server needs.
// It is a defined type so that non-net connections can be used.
//
// Conn has no Read method: Server.Accept takes a separate
// *bufio.ReadWriter that must already be connected to the Conn.
type Conn interface {
io.WriteCloser
LocalAddr() net.Addr
// The *Deadline methods follow the semantics of net.Conn.
SetDeadline(time.Time) error
SetReadDeadline(time.Time) error
SetWriteDeadline(time.Time) error
}
// NewServer builds a DERP server that identifies itself with privateKey
// and logs through logf. The server does not listen by itself; hand it
// connections with Server.Accept.
//
// It snapshots runtime.MemStats.Sys, reads the DERP_DEBUG_LOGS envknob,
// creates the metrics containers and state maps, generates the meta
// cert, and resolves the per-label counters once so they are not looked
// up again later.
func NewServer(privateKey key.NodePrivate, logf logger.Logf) *Server {
	var memStats runtime.MemStats
	runtime.ReadMemStats(&memStats)

	srv := &Server{
		// Identity, logging and environment.
		debug:       envknob.Bool("DERP_DEBUG_LOGS"),
		privateKey:  privateKey,
		publicKey:   privateKey.Public(),
		logf:        logf,
		limitedLogf: logger.RateLimitedFn(logf, 30*time.Second, 5, 100),
		memSys0:     memStats.Sys,
		clock:       tstime.StdClock{},

		// Metrics.
		packetsRecvByKind:    metrics.LabelMap{Label: "kind"},
		packetsDroppedReason: metrics.LabelMap{Label: "reason"},
		packetsDroppedType:   metrics.LabelMap{Label: "type"},
		avgQueueDuration:     new(uint64),
		tcpRtt:               metrics.LabelMap{Label: "le"},
		meshUpdateBatchSize:  metrics.NewHistogram([]float64{0, 1, 2, 5, 10, 20, 50, 100, 200, 500, 1000}),
		meshUpdateLoopCount:  metrics.NewHistogram([]float64{0, 1, 2, 5, 10, 20, 50, 100}),
		bufferedWriteFrames:  metrics.NewHistogram([]float64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 15, 20, 25, 50, 100}),

		// Connection and peer state; all start out empty.
		clients:          map[key.NodePublic]*clientSet{},
		clientsMesh:      map[key.NodePublic]PacketForwarder{},
		netConns:         map[Conn]chan struct{}{},
		watchers:         set.Set[*sclient]{},
		peerGoneWatchers: map[key.NodePublic]set.HandleSet[func(key.NodePublic)]{},
		keyOfAddr:        map[netip.AddrPort]key.NodePublic{},
	}

	// initMetacert reads srv.publicKey and srv.clock, so it must run
	// after the literal above has populated them.
	srv.initMetacert()

	srv.packetsRecvDisco = srv.packetsRecvByKind.Get("disco")
	srv.packetsRecvOther = srv.packetsRecvByKind.Get("other")
	srv.packetsDroppedReasonCounters = srv.genPacketsDroppedReasonCounters()
	srv.packetsDroppedTypeDisco = srv.packetsDroppedType.Get("disco")
	srv.packetsDroppedTypeOther = srv.packetsDroppedType.Get("other")
	return srv
}
// genPacketsDroppedReasonCounters returns a slice, indexed by dropReason,
// of the per-reason counters taken from s.packetsDroppedReason.
//
// It panics if the slice length differs from numDropReasons or if any
// slot is left nil, i.e. when the list below is out of step with the
// dropReason constants.
func (s *Server) genPacketsDroppedReasonCounters() []*expvar.Int {
	const outOfSync = "dropReason metrics out of sync"

	counter := s.packetsDroppedReason.Get
	counters := []*expvar.Int{
		dropReasonUnknownDest:      counter("unknown_dest"),
		dropReasonUnknownDestOnFwd: counter("unknown_dest_on_fwd"),
		dropReasonGoneDisconnected: counter("gone_disconnected"),
		dropReasonQueueHead:        counter("queue_head"),
		dropReasonQueueTail:        counter("queue_tail"),
		dropReasonWriteError:       counter("write_error"),
		dropReasonDupClient:        counter("dup_client"),
	}
	if len(counters) != int(numDropReasons) {
		panic(outOfSync)
	}
	// The length check above means every index below numDropReasons is
	// visited here.
	for _, m := range counters {
		if m == nil {
			panic(outOfSync)
		}
	}
	return counters
}
// SetMeshKey sets the pre-shared key that regional DERP servers use to mesh
// amongst themselves.
//
// It must be called before serving begins.
func (s *Server) SetMeshKey(v string) {
s.meshKey = v
}
// SetVerifyClient sets whether this DERP server verifies clients through tailscaled.
//
// It must be called before serving begins.
func (s *Server) SetVerifyClient(v bool) {
s.verifyClientsLocalTailscaled = v
}
// SetVerifyClientURL sets the admission controller URL to use for verifying clients.
// If empty, all clients are accepted (unless restricted by SetVerifyClient checking
// against tailscaled).
//
// NOTE(review): the field is written without holding s.mu, so this
// presumably must be called before serving begins, like SetMeshKey -- confirm.
func (s *Server) SetVerifyClientURL(v string) {
s.verifyClientsURL = v
}
// SetVerifyClientURLFailOpen sets whether to allow clients to connect if the
// admission controller URL is unreachable.
//
// NOTE(review): the field is written without holding s.mu, so this
// presumably must be called before serving begins, like SetMeshKey -- confirm.
func (s *Server) SetVerifyClientURLFailOpen(v bool) {
s.verifyClientsURLFailOpen = v
}
// HasMeshKey reports whether the server is configured with a mesh key,
// that is, whether the key set by SetMeshKey is non-empty.
func (s *Server) HasMeshKey() bool { return s.meshKey != "" }
// MeshKey returns the configured mesh key, if any. It is the empty
// string when no key has been set.
func (s *Server) MeshKey() string { return s.meshKey }
// PrivateKey returns the server's private key, as passed to NewServer.
func (s *Server) PrivateKey() key.NodePrivate { return s.privateKey }
// PublicKey returns the server's public key, derived from the private
// key in NewServer.
func (s *Server) PublicKey() key.NodePublic { return s.publicKey }
// Close marks the server closed, closes every tracked connection and
// then blocks until each of those connections has signalled (by closing
// its channel) that it is done. Calling Close again is a no-op. Close
// always returns nil.
func (s *Server) Close() error {
	s.mu.Lock()
	alreadyClosed := s.closed
	s.closed = true
	s.mu.Unlock()
	if alreadyClosed {
		return nil
	}

	s.mu.Lock()
	pending := make([]chan struct{}, 0, len(s.netConns))
	for conn, done := range s.netConns {
		conn.Close()
		pending = append(pending, done)
	}
	s.mu.Unlock()

	// Wait outside the lock: Accept's cleanup takes s.mu after closing
	// its channel.
	for _, done := range pending {
		<-done
	}
	return nil
}
// isClosed reports whether Close has been called, reading s.closed under s.mu.
func (s *Server) isClosed() bool {
	s.mu.Lock()
	closed := s.closed
	s.mu.Unlock()
	return closed
}
// IsClientConnectedForTest reports whether k has an entry in the clients
// map with a non-nil active client. Tests use it to verify that nodes
// are connected.
func (s *Server) IsClientConnectedForTest(k key.NodePublic) bool {
	s.mu.Lock()
	defer s.mu.Unlock()

	cs, known := s.clients[k]
	return known && cs.activeClient.Load() != nil
}
// Accept registers nc with the server and serves it, blocking until the
// connection ends or the Server is closed.
//
// brw must already be connected to nc. Accept always closes nc before it
// returns. An error from serving the connection is logged unless the
// server has been closed.
func (s *Server) Accept(ctx context.Context, nc Conn, brw *bufio.ReadWriter, remoteAddr string) {
	done := make(chan struct{})

	s.mu.Lock()
	// expvar's Add does not return the new value, so bump and read the
	// counter while holding s.mu to get this connection's number.
	s.accepts.Add(1)
	connNum := s.accepts.Value()
	s.netConns[nc] = done
	s.mu.Unlock()

	defer func() {
		nc.Close()
		close(done) // lets Close stop waiting on this connection

		s.mu.Lock()
		delete(s.netConns, nc)
		s.mu.Unlock()
	}()

	err := s.accept(ctx, nc, brw, remoteAddr, connNum)
	if err == nil || s.isClosed() {
		return
	}
	s.logf("derp: %s: %v", remoteAddr, err)
}
// initMetacert initializes s.metaCert with a self-signed x509 cert that
// encodes this server's public key (in the subject common name) and the
// protocol version (as the serial number). cmd/derper then sends it
// after the Let's Encrypt leaf + intermediate certs, following the
// ServerHello (encrypted in TLS 1.3, not that it matters much).
//
// This lets the client save a round trip: it learns the server key from
// the cert and can start speaking DERP right away. (We don't use ALPN
// because that's sent in the clear and we're being paranoid to not look
// too weird to any middleboxes, given that DERP is an ultimate fallback
// path.) Since the post-ServerHello certs are encrypted, the client can
// also treat them as the signal to start speaking DERP immediately,
// starting with its identity proof, encrypted to the server's public key.
//
// This RTT optimization fails where there's a corp-mandated TLS proxy
// with corp-mandated root certs on employee machines and the TLS proxy
// cleans up unnecessary certs. In that case we just fall back to the
// extra RTT.
//
// Failure to generate the key or the certificate is fatal (log.Fatal).
func (s *Server) initMetacert() {
	pub, priv, err := ed25519.GenerateKey(crand.Reader)
	if err != nil {
		log.Fatal(err)
	}

	const validity = 30 * 24 * time.Hour
	tmpl := &x509.Certificate{
		SerialNumber: big.NewInt(ProtocolVersion),
		Subject: pkix.Name{
			CommonName: fmt.Sprintf("derpkey%s", s.publicKey.UntypedHexString()),
		},
		// Windows requires NotAfter and NotBefore set:
		NotAfter:  s.clock.Now().Add(validity),
		NotBefore: s.clock.Now().Add(-validity),

		// Per https://github.com/golang/go/issues/51759#issuecomment-1071147836,
		// macOS requires BasicConstraints when subject == issuer:
		BasicConstraintsValid: true,
	}

	// Self-signed: the template is its own parent.
	der, err := x509.CreateCertificate(crand.Reader, tmpl, tmpl, pub, priv)
	if err != nil {
		log.Fatalf("CreateCertificate: %v", err)
	}
	s.metaCert = der
}
// MetaCert returns the server metadata cert that can be sent by the
// TLS server to let the client skip a round trip during start-up.
//
// The returned bytes are the encoded certificate generated by
// initMetacert; the slice is returned as-is, not copied.
func (s *Server) MetaCert() []byte { return s.metaCert }
// registerClient records that c is authenticated and ready for packets,
// and makes it the active client for c.key.
//
// If c.key already has a connection, the earlier connection(s) stay
// tracked in the key's dupClientSet in a non-active state: we still read
// from them (primarily to observe EOFs/timeouts) but won't send them
// frames, on the assumption that they're dead. Both the previously
// active connection and c are flagged as duplicates.
//
// It also records c's address in keyOfAddr, bumps curClients and tells
// all watchers that the peer is present.
func (s *Server) registerClient(c *sclient) {
	s.mu.Lock()
	defer s.mu.Unlock()

	cs, known := s.clients[c.key]
	if !known {
		c.debugLogf("register single client")
		cs = &clientSet{}
		s.clients[c.key] = cs
	}

	prev := cs.activeClient.Load()
	if prev != nil {
		prev.isDup.Store(true)
		c.isDup.Store(true)
	}

	switch dup := cs.dup; {
	case dup != nil:
		// The key already had duplicates; c joins them.
		s.dupClientConns.Add(1)     // the gauge
		s.dupClientConnTotal.Add(1) // the counter
		dup.set.Add(c)
		dup.last = c
		dup.sendHistory = append(dup.sendHistory, c)
		c.debugLogf("register another duplicate client")
	case prev != nil:
		// First duplicate for this key: start a dup set holding
		// the old connection and the new one.
		s.dupClientKeys.Add(1)
		s.dupClientConns.Add(2) // both old and new count
		s.dupClientConnTotal.Add(1)
		cs.dup = &dupClientSet{
			set:         set.Of(c, prev),
			last:        c,
			sendHistory: []*sclient{prev},
		}
		c.debugLogf("register duplicate client")
	}

	cs.activeClient.Store(c)

	if _, tracked := s.clientsMesh[c.key]; !tracked {
		s.clientsMesh[c.key] = nil // just for varz of total users in cluster
	}
	s.keyOfAddr[c.remoteIPPort] = c.key
	s.curClients.Add(1)
	s.broadcastPeerStateChangeLocked(c.key, c.remoteIPPort, c.presentFlags(), true)
}
// broadcastPeerStateChangeLocked queues, on every watcher (other DERP
// nodes in the region, or trusted clients), a record saying that peer's
// presence changed, and asks each watcher in a new goroutine to send a
// mesh update.
//
// s.mu must be held.
func (s *Server) broadcastPeerStateChangeLocked(peer key.NodePublic, ipPort netip.AddrPort, flags PeerPresentFlags, present bool) {
	change := peerConnState{
		peer:    peer,
		present: present,
		ipPort:  ipPort,
		flags:   flags,
	}
	for watcher := range s.watchers {
		watcher.peerStateChange = append(watcher.peerStateChange, change)
		go watcher.requestMeshUpdate()
	}
}
// unregisterClient removes c from the server's bookkeeping.
//
// For a key with a single connection, the key is dropped from the clients
// map, peer-gone watchers are notified if the key was local-only in
// clientsMesh, and watchers are told the peer is no longer present. If
// the bookkeeping is not in the expected state, it logs and returns
// without touching anything else.
//
// For a key with duplicate connections, c is taken out of the dup set.
// When exactly one connection remains it is re-enabled and promoted back
// to a regular single client; otherwise a new active client is picked.
//
// In both cases c is then removed from watchers (if it could mesh) and
// from keyOfAddr, and the client gauges are decremented.
func (s *Server) unregisterClient(c *sclient) {
	s.mu.Lock()
	defer s.mu.Unlock()

	cs, known := s.clients[c.key]
	if !known {
		c.logf("[unexpected]; clients map is empty")
		return
	}

	if dup := cs.dup; dup != nil {
		c.debugLogf("removed duplicate client")
		if dup.removeClient(c) {
			s.dupClientConns.Add(-1)
		} else {
			c.logf("[unexpected]; dup client set didn't shrink")
		}

		if dup.set.Len() != 1 {
			// Still a duplicate. Pick a winner.
			cs.activeClient.Store(cs.pickActiveClient())
		} else {
			// Down to one connection: demote it to a regular
			// single client (a nil dup set).
			cs.dup = nil
			s.dupClientConns.Add(-1) // again; for the original one's
			s.dupClientKeys.Add(-1)

			var survivor *sclient
			for survivor = range dup.set {
				break
			}
			if survivor == nil {
				panic("unexpected nil remain from single element dup set")
			}
			survivor.isDisabled.Store(false)
			survivor.isDup.Store(false)
			cs.activeClient.Store(survivor)
		}
	} else {
		// The common case.
		switch cur := cs.activeClient.Load(); {
		case cur == nil:
			c.logf("[unexpected]; active client is nil")
			return
		case cur != c:
			c.logf("[unexpected]; active client is not c")
			return
		}

		c.debugLogf("removed connection")
		cs.activeClient.Store(nil)
		delete(s.clients, c.key)
		if fwd, tracked := s.clientsMesh[c.key]; tracked && fwd == nil {
			delete(s.clientsMesh, c.key)
			s.notePeerGoneFromRegionLocked(c.key)
		}
		s.broadcastPeerStateChangeLocked(c.key, netip.AddrPort{}, 0, false)
	}

	if c.canMesh {
		delete(s.watchers, c)
	}
	delete(s.keyOfAddr, c.remoteIPPort)
	s.curClients.Add(-1)
	if c.preferred {
		s.curHomeClients.Add(-1)
	}
}
// addPeerGoneFromRegionWatcher registers f to be called when peer is gone
// from the region overall, creating the peer's watcher set on first use.
// The returned handle is what removePeerGoneFromRegionWatcher needs to
// undo the registration.
//
// The provided f func is usually [sclient.onPeerGoneFromRegion], added by
// [sclient.noteSendFromSrc]; this func doesn't take a whole *sclient to make it
// clear what has access to what.
func (s *Server) addPeerGoneFromRegionWatcher(peer key.NodePublic, f func(key.NodePublic)) set.Handle {
	s.mu.Lock()
	defer s.mu.Unlock()

	watchers, exists := s.peerGoneWatchers[peer]
	if !exists {
		watchers = set.HandleSet[func(key.NodePublic)]{}
		s.peerGoneWatchers[peer] = watchers
	}
	return watchers.Add(f)
}
// removePeerGoneFromRegionWatcher undoes an earlier
// addPeerGoneFromRegionWatcher call for peer, identified by the handle h
// that call returned. The peer's entry is dropped from peerGoneWatchers
// once its watcher set becomes empty. It does nothing if peer has no
// watchers.
func (s *Server) removePeerGoneFromRegionWatcher(peer key.NodePublic, h set.Handle) {
	s.mu.Lock()
	defer s.mu.Unlock()

	if watchers, exists := s.peerGoneWatchers[peer]; exists {
		delete(watchers, h)
		if len(watchers) == 0 {
			delete(s.peerGoneWatchers, peer)
		}
	}
}
// notePeerGoneFromRegionLocked notifies the parties that key has
// previously sent to (whether from a local client or forwarded) that key
// is gone from the region. Each registered watcher func runs in its own
// goroutine, and the watcher set for key is then discarded.
//
// Watchers use this either to drop their route entries to the peer
// (issue 150) or to move over to the active client when a replaced
// client connection is being unregistered.
//
// It must only be called after key has been removed from clientsMesh;
// it panics otherwise. unregisterClient calls it with s.mu held.
func (s *Server) notePeerGoneFromRegionLocked(key key.NodePublic) {
	if _, stillTracked := s.clientsMesh[key]; stillTracked {
		panic("usage")
	}
	for _, notify := range s.peerGoneWatchers[key] {
		go notify(key)
	}
	delete(s.peerGoneWatchers, key)
}
// requestPeerGoneWriteLimited sends a request to write a "peer gone"
// frame for peer with the given reason, but only in reply to a disco
// packet, and only if we haven't sent one recently.
//
// contents is the packet being replied to; it is only inspected to decide
// whether it looks like a disco wrapper. The rate limit is c.peerGoneLim.
// The write request itself is issued from a new goroutine.
func (c *sclient) requestPeerGoneWriteLimited(peer key.NodePublic, contents []byte, reason PeerGoneReasonType) {
	if !disco.LooksLikeDiscoWrapper(contents) {
		return
	}
	if c.peerGoneLim.Allow() {
		go c.requestPeerGoneWrite(peer, reason)
	}
}
// addWatcher enrolls c as a watcher of peer presence changes.
//
// It panics if c is not allowed to mesh, and does nothing when c's key
// is the server's own public key (we're connecting to ourself).
// Otherwise it queues a "present" record on c for every key that
// currently has an active client, adds c to the watcher set so it gets
// future connection and disconnection updates, and requests a mesh
// update in a new goroutine.
func (s *Server) addWatcher(c *sclient) {
	if !c.canMesh {
		panic("invariant: addWatcher called without permissions")
	}
	if c.key == s.publicKey {
		return
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	// Queue messages for each already-connected client.
	for peerKey, cs := range s.clients {
		if active := cs.activeClient.Load(); active != nil {
			c.peerStateChange = append(c.peerStateChange, peerConnState{
				peer:    peerKey,
				present: true,
				ipPort:  active.remoteIPPort,
				flags:   active.presentFlags(),
			})
		}
	}

	s.watchers.Add(c)
	go c.requestMeshUpdate()
}
func (s *Server) accept(ctx context.Context, nc Conn, brw *bufio.ReadWriter, remoteAddr string, connNum int64) error {
br := brw.Reader
nc.SetDeadline(time.Now().Add(10 * time.Second))
bw := &lazyBufioWriter{w: nc, lbw: brw.Writer}
if err := s.sendServerKey(bw); err != nil {
return fmt.Errorf("send server key: %v", err)
}
nc.SetDeadline(time.Now().Add(10 * time.Second))
clientKey, clientInfo, err := s.recvClientKey(br)
if err != nil {
return fmt.Errorf("receive client key: %v", err)
}
clientAP, _ := netip.ParseAddrPort(remoteAddr)
if err := s.verifyClient(ctx, clientKey, clientInfo, clientAP.Addr()); err != nil {
return fmt.Errorf("client %v rejected: %v", clientKey, err)
}
// At this point we trust the client so we don't time out.
nc.SetDeadline(time.Time{})
ctx, cancel := context.WithCancel(ctx)
defer cancel()
remoteIPPort, _ := netip.ParseAddrPort(remoteAddr)
c := &sclient{
connNum: connNum,
s: s,
key: clientKey,
nc: nc,
br: br,
bw: bw,
derp: add optional debug logging for prober clients This allows tracking packet flow via logs for prober clients. Note that the new sclient.debug() function is called on every received packet, but will do nothing for most clients. I have adjusted sclient logging to print public keys in short format rather than full. This takes effect even for existing non-debug logging (mostly client disconnect messages). Example logs for a packet being sent from client [SbsJn] (connected to derper [dM2E3]) to client [10WOo] (connected to derper [AVxvv]): ``` derper [dM2E3]: derp client 10.0.0.1:35470[SbsJn]: register single client mesh("10.0.1.1"): 4 peers derp client 10.0.0.1:35470[SbsJn]: read frame type 4 len 40 err <nil> derp client 10.0.0.1:35470[SbsJn]: SendPacket for [10WOo], forwarding via <derphttp_client.Client [AVxvv] url=https://10.0.1.1/derp>: <nil> derp client 10.0.0.1:35470[SbsJn]: read frame type 0 len 0 err EOF derp client 10.0.0.1:35470[SbsJn]: read EOF derp client 10.0.0.1:35470[SbsJn]: sender failed: context canceled derp client 10.0.0.1:35470[SbsJn]: removing connection derper [AVxvv]: derp client 10.0.1.1:50650[10WOo]: register single client derp client 10.0.1.1:50650[10WOo]: received forwarded packet from [SbsJn] via [dM2E3] derp client 10.0.1.1:50650[10WOo]: sendPkt attempt 0 enqueued derp client 10.0.1.1:50650[10WOo]: sendPacket from [SbsJn]: <nil> derp client 10.0.1.1:50650[10WOo]: read frame type 0 len 0 err EOF derp client 10.0.1.1:50650[10WOo]: read EOF derp client 10.0.1.1:50650[10WOo]: sender failed: context canceled derp client 10.0.1.1:50650[10WOo]: removing connection ``` Signed-off-by: Anton Tolchanov <anton@tailscale.com>
2023-03-20 15:15:45 +00:00
logf: logger.WithPrefix(s.logf, fmt.Sprintf("derp client %v%s: ", remoteAddr, clientKey.ShortString())),
done: ctx.Done(),
remoteIPPort: remoteIPPort,
connectedAt: s.clock.Now(),
sendQueue: make(chan pkt, perClientSendQueueDepth),
discoSendQueue: make(chan pkt, perClientSendQueueDepth),
sendPongCh: make(chan [8]byte, 1),
peerGone: make(chan peerGoneMsg),
canMesh: s.isMeshPeer(clientInfo),
peerGoneLim: rate.NewLimiter(rate.Every(time.Second), 3),
}
if c.canMesh {
c.meshUpdate = make(chan struct{}, 1) // must be buffered; >1 is fine but wasteful
}
if clientInfo != nil {
c.info = *clientInfo
derp: add optional debug logging for prober clients This allows tracking packet flow via logs for prober clients. Note that the new sclient.debug() function is called on every received packet, but will do nothing for most clients. I have adjusted sclient logging to print public keys in short format rather than full. This takes effect even for existing non-debug logging (mostly client disconnect messages). Example logs for a packet being sent from client [SbsJn] (connected to derper [dM2E3]) to client [10WOo] (connected to derper [AVxvv]): ``` derper [dM2E3]: derp client 10.0.0.1:35470[SbsJn]: register single client mesh("10.0.1.1"): 4 peers derp client 10.0.0.1:35470[SbsJn]: read frame type 4 len 40 err <nil> derp client 10.0.0.1:35470[SbsJn]: SendPacket for [10WOo], forwarding via <derphttp_client.Client [AVxvv] url=https://10.0.1.1/derp>: <nil> derp client 10.0.0.1:35470[SbsJn]: read frame type 0 len 0 err EOF derp client 10.0.0.1:35470[SbsJn]: read EOF derp client 10.0.0.1:35470[SbsJn]: sender failed: context canceled derp client 10.0.0.1:35470[SbsJn]: removing connection derper [AVxvv]: derp client 10.0.1.1:50650[10WOo]: register single client derp client 10.0.1.1:50650[10WOo]: received forwarded packet from [SbsJn] via [dM2E3] derp client 10.0.1.1:50650[10WOo]: sendPkt attempt 0 enqueued derp client 10.0.1.1:50650[10WOo]: sendPacket from [SbsJn]: <nil> derp client 10.0.1.1:50650[10WOo]: read frame type 0 len 0 err EOF derp client 10.0.1.1:50650[10WOo]: read EOF derp client 10.0.1.1:50650[10WOo]: sender failed: context canceled derp client 10.0.1.1:50650[10WOo]: removing connection ``` Signed-off-by: Anton Tolchanov <anton@tailscale.com>
2023-03-20 15:15:45 +00:00
if envknob.Bool("DERP_PROBER_DEBUG_LOGS") && clientInfo.IsProber {
c.debug = true
derp: add optional debug logging for prober clients This allows tracking packet flow via logs for prober clients. Note that the new sclient.debug() function is called on every received packet, but will do nothing for most clients. I have adjusted sclient logging to print public keys in short format rather than full. This takes effect even for existing non-debug logging (mostly client disconnect messages). Example logs for a packet being sent from client [SbsJn] (connected to derper [dM2E3]) to client [10WOo] (connected to derper [AVxvv]): ``` derper [dM2E3]: derp client 10.0.0.1:35470[SbsJn]: register single client mesh("10.0.1.1"): 4 peers derp client 10.0.0.1:35470[SbsJn]: read frame type 4 len 40 err <nil> derp client 10.0.0.1:35470[SbsJn]: SendPacket for [10WOo], forwarding via <derphttp_client.Client [AVxvv] url=https://10.0.1.1/derp>: <nil> derp client 10.0.0.1:35470[SbsJn]: read frame type 0 len 0 err EOF derp client 10.0.0.1:35470[SbsJn]: read EOF derp client 10.0.0.1:35470[SbsJn]: sender failed: context canceled derp client 10.0.0.1:35470[SbsJn]: removing connection derper [AVxvv]: derp client 10.0.1.1:50650[10WOo]: register single client derp client 10.0.1.1:50650[10WOo]: received forwarded packet from [SbsJn] via [dM2E3] derp client 10.0.1.1:50650[10WOo]: sendPkt attempt 0 enqueued derp client 10.0.1.1:50650[10WOo]: sendPacket from [SbsJn]: <nil> derp client 10.0.1.1:50650[10WOo]: read frame type 0 len 0 err EOF derp client 10.0.1.1:50650[10WOo]: read EOF derp client 10.0.1.1:50650[10WOo]: sender failed: context canceled derp client 10.0.1.1:50650[10WOo]: removing connection ``` Signed-off-by: Anton Tolchanov <anton@tailscale.com>
2023-03-20 15:15:45 +00:00
}
}
if s.debug {
c.debug = true
}
s.registerClient(c)
defer s.unregisterClient(c)
err = s.sendServerInfo(c.bw, clientKey)
if err != nil {
return fmt.Errorf("send server info: %v", err)
}
return c.run(ctx)
}
// debugLogf forwards the formatted message to s.logf, but only when the
// server-wide debug flag is set. With debugging off it does nothing.
func (s *Server) debugLogf(format string, v ...any) {
	if !s.debug {
		return
	}
	s.logf(format, v...)
}
// run serves the client until there's an error.
// If the client hangs up or the server is closed, run returns nil, otherwise run returns an error.
func (c *sclient) run(ctx context.Context) error {
// Launch sender, but don't return from run until sender goroutine is done.
var grp errgroup.Group
sendCtx, cancelSender := context.WithCancel(ctx)
grp.Go(func() error { return c.sendLoop(sendCtx) })
defer func() {
cancelSender()
if err := grp.Wait(); err != nil && !c.s.isClosed() {
if errors.Is(err, context.Canceled) {
c.debugLogf("sender canceled by reader exiting")
} else {
c.logf("sender failed: %v", err)
}
}
}()
c.startStatsLoop(sendCtx)
for {
ft, fl, err := readFrameHeader(c.br)
c.debugLogf("read frame type %d len %d err %v", ft, fl, err)
if err != nil {
if errors.Is(err, io.EOF) {
c.debugLogf("read EOF")
return nil
}
if c.s.isClosed() {
c.logf("closing; server closed")
return nil
}
derp: add optional debug logging for prober clients This allows tracking packet flow via logs for prober clients. Note that the new sclient.debug() function is called on every received packet, but will do nothing for most clients. I have adjusted sclient logging to print public keys in short format rather than full. This takes effect even for existing non-debug logging (mostly client disconnect messages). Example logs for a packet being sent from client [SbsJn] (connected to derper [dM2E3]) to client [10WOo] (connected to derper [AVxvv]): ``` derper [dM2E3]: derp client 10.0.0.1:35470[SbsJn]: register single client mesh("10.0.1.1"): 4 peers derp client 10.0.0.1:35470[SbsJn]: read frame type 4 len 40 err <nil> derp client 10.0.0.1:35470[SbsJn]: SendPacket for [10WOo], forwarding via <derphttp_client.Client [AVxvv] url=https://10.0.1.1/derp>: <nil> derp client 10.0.0.1:35470[SbsJn]: read frame type 0 len 0 err EOF derp client 10.0.0.1:35470[SbsJn]: read EOF derp client 10.0.0.1:35470[SbsJn]: sender failed: context canceled derp client 10.0.0.1:35470[SbsJn]: removing connection derper [AVxvv]: derp client 10.0.1.1:50650[10WOo]: register single client derp client 10.0.1.1:50650[10WOo]: received forwarded packet from [SbsJn] via [dM2E3] derp client 10.0.1.1:50650[10WOo]: sendPkt attempt 0 enqueued derp client 10.0.1.1:50650[10WOo]: sendPacket from [SbsJn]: <nil> derp client 10.0.1.1:50650[10WOo]: read frame type 0 len 0 err EOF derp client 10.0.1.1:50650[10WOo]: read EOF derp client 10.0.1.1:50650[10WOo]: sender failed: context canceled derp client 10.0.1.1:50650[10WOo]: removing connection ``` Signed-off-by: Anton Tolchanov <anton@tailscale.com>
2023-03-20 15:15:45 +00:00
return fmt.Errorf("client %s: readFrameHeader: %w", c.key.ShortString(), err)
}
c.s.noteClientActivity(c)
switch ft {
case frameNotePreferred:
err = c.handleFrameNotePreferred(ft, fl)
case frameSendPacket:
err = c.handleFrameSendPacket(ft, fl)
case frameForwardPacket:
err = c.handleFrameForwardPacket(ft, fl)
case frameWatchConns:
err = c.handleFrameWatchConns(ft, fl)
case frameClosePeer:
err = c.handleFrameClosePeer(ft, fl)
case framePing:
err = c.handleFramePing(ft, fl)
default:
err = c.handleUnknownFrame(ft, fl)
}
if err != nil {
return err
}
}
}
// handleUnknownFrame consumes a frame whose type has no dedicated handler:
// it reads the fl payload bytes from c.br and throws them away. It returns
// any error from that read.
func (c *sclient) handleUnknownFrame(ft frameType, fl uint32) error {
	payloadLen := int64(fl)
	if _, err := io.CopyN(io.Discard, c.br, payloadLen); err != nil {
		return err
	}
	return nil
}
// handleFrameNotePreferred reads the single payload byte of a
// frameNotePreferred frame and passes "byte is non-zero" to setPreferred.
// A frame length other than 1 is rejected without reading the payload.
func (c *sclient) handleFrameNotePreferred(ft frameType, fl uint32) error {
	const wantLen = 1
	if fl != wantLen {
		return fmt.Errorf("frameNotePreferred wrong size")
	}
	flag, err := c.br.ReadByte()
	if err != nil {
		return fmt.Errorf("frameNotePreferred ReadByte: %v", err)
	}
	isPreferred := flag != 0
	c.setPreferred(isPreferred)
	return nil
}
// handleFrameWatchConns registers c with the server as a watcher via
// addWatcher. The frame must carry no payload and the client must have
// mesh permission (c.canMesh); otherwise an error is returned.
func (c *sclient) handleFrameWatchConns(ft frameType, fl uint32) error {
	switch {
	case fl != 0:
		return fmt.Errorf("handleFrameWatchConns wrong size")
	case !c.canMesh:
		return fmt.Errorf("insufficient permissions")
	}
	c.s.addWatcher(c)
	return nil
}
// handleFramePing reads a ping frame and queues a pong carrying the same
// payload for the send loop.
//
// The frame must be at least len(PingMessage) bytes and at most 1000 bytes.
// Bytes past the ping payload are read and discarded. The pong is queued
// without blocking; if c.sendPongCh is full the ping is silently ignored.
// An error while discarding trailing bytes is returned after the pong has
// been offered to the channel.
func (c *sclient) handleFramePing(ft frameType, fl uint32) error {
	c.s.gotPing.Add(1)

	// Upper bound on the frame length: leaves some room for future
	// extensibility, but not too much.
	const maxPingLen = 1000

	var ping PingMessage
	switch {
	case fl < uint32(len(ping)):
		return fmt.Errorf("short ping: %v", fl)
	case fl > maxPingLen:
		return fmt.Errorf("ping body too large: %v", fl)
	}
	if _, err := io.ReadFull(c.br, ping[:]); err != nil {
		return err
	}

	var discardErr error
	if trailing := int64(fl) - int64(len(ping)); trailing > 0 {
		_, discardErr = io.CopyN(io.Discard, c.br, trailing)
	}

	select {
	case c.sendPongCh <- [8]byte(ping):
	default:
		// They're pinging too fast. Ignore.
		// TODO(bradfitz): add a rate limiter too.
	}
	return discardErr
}
// handleFrameClosePeer reads a target node key from the frame and closes
// the network connection of every client registered under that key.
//
// The frame must be exactly keyLen bytes and the sender must have mesh
// permission. The lookup and the logging happen with s.mu held; each
// connection is closed in its own goroutine. An unknown target key is
// logged and is not an error.
func (c *sclient) handleFrameClosePeer(ft frameType, fl uint32) error {
	if fl != keyLen {
		return fmt.Errorf("handleFrameClosePeer wrong size")
	}
	if !c.canMesh {
		return fmt.Errorf("insufficient permissions")
	}

	var targetKey key.NodePublic
	if err := targetKey.ReadRawWithoutAllocating(c.br); err != nil {
		return err
	}

	srv := c.s
	srv.mu.Lock()
	defer srv.mu.Unlock()

	conns, found := srv.clients[targetKey]
	if !found {
		c.logf("frameClosePeer failed to find peer %x", targetKey)
		return nil
	}
	if n := conns.Len(); n == 1 {
		c.logf("frameClosePeer closing peer %x", targetKey)
	} else {
		c.logf("frameClosePeer closing peer %x (%d connections)", targetKey, n)
	}
	conns.ForeachClient(func(target *sclient) {
		go target.nc.Close()
	})
	return nil
}
// handleFrameForwardPacket reads a "forward packet" frame from the client
// (which must be a trusted client, a peer in our mesh).
func (c *sclient) handleFrameForwardPacket(ft frameType, fl uint32) error {
if !c.canMesh {
return fmt.Errorf("insufficient permissions")
}
s := c.s
srcKey, dstKey, contents, err := s.recvForwardPacket(c.br, fl)
if err != nil {
return fmt.Errorf("client %v: recvForwardPacket: %v", c.key, err)
}
s.packetsForwardedIn.Add(1)
var dstLen int
var dst *sclient
s.mu.Lock()
if set, ok := s.clients[dstKey]; ok {
dstLen = set.Len()
dst = set.activeClient.Load()
}
s.mu.Unlock()
if dst == nil {
reason := dropReasonUnknownDestOnFwd
if dstLen > 1 {
reason = dropReasonDupClient
} else {
c.requestPeerGoneWriteLimited(dstKey, contents, PeerGoneReasonNotHere)
}
s.recordDrop(contents, srcKey, dstKey, reason)
return nil
}
dst.debugLogf("received forwarded packet from %s via %s", srcKey.ShortString(), c.key.ShortString())
derp: add optional debug logging for prober clients This allows tracking packet flow via logs for prober clients. Note that the new sclient.debug() function is called on every received packet, but will do nothing for most clients. I have adjusted sclient logging to print public keys in short format rather than full. This takes effect even for existing non-debug logging (mostly client disconnect messages). Example logs for a packet being sent from client [SbsJn] (connected to derper [dM2E3]) to client [10WOo] (connected to derper [AVxvv]): ``` derper [dM2E3]: derp client 10.0.0.1:35470[SbsJn]: register single client mesh("10.0.1.1"): 4 peers derp client 10.0.0.1:35470[SbsJn]: read frame type 4 len 40 err <nil> derp client 10.0.0.1:35470[SbsJn]: SendPacket for [10WOo], forwarding via <derphttp_client.Client [AVxvv] url=https://10.0.1.1/derp>: <nil> derp client 10.0.0.1:35470[SbsJn]: read frame type 0 len 0 err EOF derp client 10.0.0.1:35470[SbsJn]: read EOF derp client 10.0.0.1:35470[SbsJn]: sender failed: context canceled derp client 10.0.0.1:35470[SbsJn]: removing connection derper [AVxvv]: derp client 10.0.1.1:50650[10WOo]: register single client derp client 10.0.1.1:50650[10WOo]: received forwarded packet from [SbsJn] via [dM2E3] derp client 10.0.1.1:50650[10WOo]: sendPkt attempt 0 enqueued derp client 10.0.1.1:50650[10WOo]: sendPacket from [SbsJn]: <nil> derp client 10.0.1.1:50650[10WOo]: read frame type 0 len 0 err EOF derp client 10.0.1.1:50650[10WOo]: read EOF derp client 10.0.1.1:50650[10WOo]: sender failed: context canceled derp client 10.0.1.1:50650[10WOo]: removing connection ``` Signed-off-by: Anton Tolchanov <anton@tailscale.com>
2023-03-20 15:15:45 +00:00
return c.sendPkt(dst, pkt{
bs: contents,
enqueuedAt: c.s.clock.Now(),
src: srcKey,
})
}
// handleFrameSendPacket reads a "send packet" frame from the client.
func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
s := c.s
dstKey, contents, err := s.recvPacket(c.br, fl)
if err != nil {
return fmt.Errorf("client %v: recvPacket: %v", c.key, err)
}
var fwd PacketForwarder
var dstLen int
var dst *sclient
s.mu.Lock()
if set, ok := s.clients[dstKey]; ok {
dstLen = set.Len()
dst = set.activeClient.Load()
}
if dst == nil && dstLen < 1 {
fwd = s.clientsMesh[dstKey]
}
s.mu.Unlock()
if dst == nil {
if fwd != nil {
s.packetsForwardedOut.Add(1)
derp: add optional debug logging for prober clients This allows tracking packet flow via logs for prober clients. Note that the new sclient.debug() function is called on every received packet, but will do nothing for most clients. I have adjusted sclient logging to print public keys in short format rather than full. This takes effect even for existing non-debug logging (mostly client disconnect messages). Example logs for a packet being sent from client [SbsJn] (connected to derper [dM2E3]) to client [10WOo] (connected to derper [AVxvv]): ``` derper [dM2E3]: derp client 10.0.0.1:35470[SbsJn]: register single client mesh("10.0.1.1"): 4 peers derp client 10.0.0.1:35470[SbsJn]: read frame type 4 len 40 err <nil> derp client 10.0.0.1:35470[SbsJn]: SendPacket for [10WOo], forwarding via <derphttp_client.Client [AVxvv] url=https://10.0.1.1/derp>: <nil> derp client 10.0.0.1:35470[SbsJn]: read frame type 0 len 0 err EOF derp client 10.0.0.1:35470[SbsJn]: read EOF derp client 10.0.0.1:35470[SbsJn]: sender failed: context canceled derp client 10.0.0.1:35470[SbsJn]: removing connection derper [AVxvv]: derp client 10.0.1.1:50650[10WOo]: register single client derp client 10.0.1.1:50650[10WOo]: received forwarded packet from [SbsJn] via [dM2E3] derp client 10.0.1.1:50650[10WOo]: sendPkt attempt 0 enqueued derp client 10.0.1.1:50650[10WOo]: sendPacket from [SbsJn]: <nil> derp client 10.0.1.1:50650[10WOo]: read frame type 0 len 0 err EOF derp client 10.0.1.1:50650[10WOo]: read EOF derp client 10.0.1.1:50650[10WOo]: sender failed: context canceled derp client 10.0.1.1:50650[10WOo]: removing connection ``` Signed-off-by: Anton Tolchanov <anton@tailscale.com>
2023-03-20 15:15:45 +00:00
err := fwd.ForwardPacket(c.key, dstKey, contents)
c.debugLogf("SendPacket for %s, forwarding via %s: %v", dstKey.ShortString(), fwd, err)
derp: add optional debug logging for prober clients This allows tracking packet flow via logs for prober clients. Note that the new sclient.debug() function is called on every received packet, but will do nothing for most clients. I have adjusted sclient logging to print public keys in short format rather than full. This takes effect even for existing non-debug logging (mostly client disconnect messages). Example logs for a packet being sent from client [SbsJn] (connected to derper [dM2E3]) to client [10WOo] (connected to derper [AVxvv]): ``` derper [dM2E3]: derp client 10.0.0.1:35470[SbsJn]: register single client mesh("10.0.1.1"): 4 peers derp client 10.0.0.1:35470[SbsJn]: read frame type 4 len 40 err <nil> derp client 10.0.0.1:35470[SbsJn]: SendPacket for [10WOo], forwarding via <derphttp_client.Client [AVxvv] url=https://10.0.1.1/derp>: <nil> derp client 10.0.0.1:35470[SbsJn]: read frame type 0 len 0 err EOF derp client 10.0.0.1:35470[SbsJn]: read EOF derp client 10.0.0.1:35470[SbsJn]: sender failed: context canceled derp client 10.0.0.1:35470[SbsJn]: removing connection derper [AVxvv]: derp client 10.0.1.1:50650[10WOo]: register single client derp client 10.0.1.1:50650[10WOo]: received forwarded packet from [SbsJn] via [dM2E3] derp client 10.0.1.1:50650[10WOo]: sendPkt attempt 0 enqueued derp client 10.0.1.1:50650[10WOo]: sendPacket from [SbsJn]: <nil> derp client 10.0.1.1:50650[10WOo]: read frame type 0 len 0 err EOF derp client 10.0.1.1:50650[10WOo]: read EOF derp client 10.0.1.1:50650[10WOo]: sender failed: context canceled derp client 10.0.1.1:50650[10WOo]: removing connection ``` Signed-off-by: Anton Tolchanov <anton@tailscale.com>
2023-03-20 15:15:45 +00:00
if err != nil {
// TODO:
return nil
}
return nil
}
reason := dropReasonUnknownDest
if dstLen > 1 {
reason = dropReasonDupClient
} else {
c.requestPeerGoneWriteLimited(dstKey, contents, PeerGoneReasonNotHere)
}
s.recordDrop(contents, c.key, dstKey, reason)
c.debugLogf("SendPacket for %s, dropping with reason=%s", dstKey.ShortString(), reason)
return nil
}
c.debugLogf("SendPacket for %s, sending directly", dstKey.ShortString())
p := pkt{
bs: contents,
enqueuedAt: c.s.clock.Now(),
src: c.key,
}
return c.sendPkt(dst, p)
}
func (c *sclient) debugLogf(format string, v ...any) {
if c.debug {
derp: add optional debug logging for prober clients This allows tracking packet flow via logs for prober clients. Note that the new sclient.debug() function is called on every received packet, but will do nothing for most clients. I have adjusted sclient logging to print public keys in short format rather than full. This takes effect even for existing non-debug logging (mostly client disconnect messages). Example logs for a packet being sent from client [SbsJn] (connected to derper [dM2E3]) to client [10WOo] (connected to derper [AVxvv]): ``` derper [dM2E3]: derp client 10.0.0.1:35470[SbsJn]: register single client mesh("10.0.1.1"): 4 peers derp client 10.0.0.1:35470[SbsJn]: read frame type 4 len 40 err <nil> derp client 10.0.0.1:35470[SbsJn]: SendPacket for [10WOo], forwarding via <derphttp_client.Client [AVxvv] url=https://10.0.1.1/derp>: <nil> derp client 10.0.0.1:35470[SbsJn]: read frame type 0 len 0 err EOF derp client 10.0.0.1:35470[SbsJn]: read EOF derp client 10.0.0.1:35470[SbsJn]: sender failed: context canceled derp client 10.0.0.1:35470[SbsJn]: removing connection derper [AVxvv]: derp client 10.0.1.1:50650[10WOo]: register single client derp client 10.0.1.1:50650[10WOo]: received forwarded packet from [SbsJn] via [dM2E3] derp client 10.0.1.1:50650[10WOo]: sendPkt attempt 0 enqueued derp client 10.0.1.1:50650[10WOo]: sendPacket from [SbsJn]: <nil> derp client 10.0.1.1:50650[10WOo]: read frame type 0 len 0 err EOF derp client 10.0.1.1:50650[10WOo]: read EOF derp client 10.0.1.1:50650[10WOo]: sender failed: context canceled derp client 10.0.1.1:50650[10WOo]: removing connection ``` Signed-off-by: Anton Tolchanov <anton@tailscale.com>
2023-03-20 15:15:45 +00:00
c.logf(format, v...)
}
}
// dropReason is why we dropped a DERP frame.
//
// recordDrop uses a dropReason as the index into the server's per-reason
// drop counters.
type dropReason int
//go:generate go run tailscale.com/cmd/addlicense -file dropreason_string.go go run golang.org/x/tools/cmd/stringer -type=dropReason -trimprefix=dropReason
// The dropReason values are numbered sequentially from zero by iota, so
// numDropReasons, the final entry, equals the number of reasons declared
// before it. Add new reasons above numDropReasons.
const (
dropReasonUnknownDest dropReason = iota // unknown destination pubkey
dropReasonUnknownDestOnFwd // unknown destination pubkey on a derp-forwarded packet
dropReasonGoneDisconnected // destination tailscaled disconnected before we could send
dropReasonQueueHead // destination queue is full, dropped packet at queue head
dropReasonQueueTail // destination queue is full, dropped packet at queue tail
dropReasonWriteError // OS write() failed
dropReasonDupClient // the public key is connected 2+ times (active/active, fighting)
numDropReasons // unused; keep last
)
// recordDrop accounts for one dropped packet. It bumps the total drop
// counter, the counter for reason, and either the disco or the "other"
// counter depending on whether packetBytes looks like a disco wrapper.
//
// When dstKey is listed in verboseDropKeys the drop is also written to the
// rate-limited log, and it is always offered to the server debug log.
func (s *Server) recordDrop(packetBytes []byte, srcKey, dstKey key.NodePublic, reason dropReason) {
	s.packetsDropped.Add(1)
	s.packetsDroppedReasonCounters[reason].Add(1)

	isDisco := disco.LooksLikeDiscoWrapper(packetBytes)
	switch {
	case isDisco:
		s.packetsDroppedTypeDisco.Add(1)
	default:
		s.packetsDroppedTypeOther.Add(1)
	}

	if verboseDropKeys[dstKey] {
		// The limiter keys on the format string, so the complete line is
		// built up front: that rate-limits per src/dst pair instead of
		// on one generic "dropped stuff" message.
		line := fmt.Sprintf("drop (%s) %s -> %s", srcKey.ShortString(), reason, dstKey.ShortString())
		s.limitedLogf(line)
	}
	s.debugLogf("dropping packet reason=%s dst=%s disco=%v", reason, dstKey, isDisco)
}
// sendPkt queues p for delivery to dst, on dst's disco queue when p looks
// like a disco wrapper and on its regular send queue otherwise.
//
// It never blocks and always returns nil. If dst is already done the packet
// is dropped. If the queue is full, the packet at the head of the queue is
// dropped to make room and the enqueue is retried, up to maxAttempts times;
// after that p itself is dropped.
//
// Every drop is attributed to the dropped packet's own src key. That can
// differ from c.key when c is a mesh peer relaying a forwarded packet, or
// when the evicted queue-head packet came from another sender.
func (c *sclient) sendPkt(dst *sclient, p pkt) error {
	s := c.s
	dstKey := dst.key

	// Attempt to queue for sending up to maxAttempts times. On each
	// attempt, if the queue is full, try to drop from queue head to
	// prioritize fresher packets.
	const maxAttempts = 3

	sendQueue := dst.sendQueue
	if disco.LooksLikeDiscoWrapper(p.bs) {
		sendQueue = dst.discoSendQueue
	}
	for attempt := 0; attempt < maxAttempts; attempt++ {
		select {
		case <-dst.done:
			s.recordDrop(p.bs, p.src, dstKey, dropReasonGoneDisconnected)
			dst.debugLogf("sendPkt attempt %d dropped, dst gone", attempt)
			return nil
		default:
		}
		select {
		case sendQueue <- p:
			dst.debugLogf("sendPkt attempt %d enqueued", attempt)
			return nil
		default:
		}

		select {
		case head := <-sendQueue:
			s.recordDrop(head.bs, head.src, dstKey, dropReasonQueueHead)
			c.recordQueueTime(head.enqueuedAt)
		default:
		}
	}
	// Failed to make room for packet. This can happen in a heavily
	// contended queue with racing writers. Give up and tail-drop in
	// this case to keep reader unblocked.
	s.recordDrop(p.bs, p.src, dstKey, dropReasonQueueTail)
	dst.debugLogf("sendPkt dropped after %d attempts, queue full", maxAttempts)

	return nil
}
// onPeerGoneFromRegion is the callback registered with the Server that is
// invoked (in a new goroutine) once a peer has disconnected from every DERP
// node in the current region. It asks the send loop to tell this client
// that the peer is gone because it disconnected.
func (c *sclient) onPeerGoneFromRegion(peer key.NodePublic) {
	var reason PeerGoneReasonType = PeerGoneReasonDisconnected
	c.requestPeerGoneWrite(peer, reason)
}
// requestPeerGoneWrite asks for a "peer gone" frame, including the reason
// the peer is gone, to be written to this client. It blocks until the
// request has been handed over on c.peerGone or the client is done,
// whichever comes first.
func (c *sclient) requestPeerGoneWrite(peer key.NodePublic, reason PeerGoneReasonType) {
	msg := peerGoneMsg{peer: peer, reason: reason}
	select {
	case <-c.done:
	case c.peerGone <- msg:
	}
}
// requestMeshUpdate signals that c's peerStateChange list has grown and
// should now be written out.
//
// It never blocks: if a signal is already pending on c.meshUpdate, the call
// is a no-op. It panics when called on a client without mesh permission.
func (c *sclient) requestMeshUpdate() {
	if !c.canMesh {
		panic("unexpected requestMeshUpdate")
	}
	var signal struct{}
	select {
	case c.meshUpdate <- signal:
	default:
	}
}
// localClient is the tailscale.LocalClient that verifyClient uses to look
// up a connecting node's key in the local tailscaled. It is declared here
// without an explicit initializer.
var localClient tailscale.LocalClient
// isMeshPeer reports whether the client is a trusted mesh peer node in the
// DERP region: info must be non-nil and carry a non-empty mesh key equal to
// the server's own.
func (s *Server) isMeshPeer(info *clientInfo) bool {
	if info == nil || info.MeshKey == "" {
		return false
	}
	return info.MeshKey == s.meshKey
}
// verifyClient checks whether the client is allowed to connect to the derper,
// depending on how & whether the server's been configured to verify.
//
// Trusted mesh peers are always admitted. Otherwise up to two checks run in
// order, each only when configured:
//
//  1. s.verifyClientsLocalTailscaled: the node key must be known to the
//     local tailscaled.
//  2. s.verifyClientsURL: an admission controller is sent a JSON
//     DERPAdmitClientRequest by POST (5 second timeout) and must answer
//     200 with Allow set. If the request itself fails and
//     s.verifyClientsURLFailOpen is set, the client is admitted.
//
// It returns nil when the client is admitted and a descriptive error
// otherwise.
func (s *Server) verifyClient(ctx context.Context, clientKey key.NodePublic, info *clientInfo, clientIP netip.Addr) error {
	if s.isMeshPeer(info) {
		// Trusted mesh peer. No need to verify further. In fact, verifying
		// further wouldn't work: it's not part of the tailnet so tailscaled and
		// likely the admission control URL wouldn't know about it.
		return nil
	}

	// tailscaled-based verification:
	if s.verifyClientsLocalTailscaled {
		_, err := localClient.WhoIsNodeKey(ctx, clientKey)
		// errors.Is also matches the sentinel if it ever arrives wrapped.
		if errors.Is(err, tailscale.ErrPeerNotFound) {
			return fmt.Errorf("peer %v not authorized (not found in local tailscaled)", clientKey)
		}
		if err != nil {
			if strings.Contains(err.Error(), "invalid 'addr' parameter") {
				// Issue 12617
				return errors.New("tailscaled version is too old (out of sync with derper binary)")
			}
			return fmt.Errorf("failed to query local tailscaled status for %v: %w", clientKey, err)
		}
	}

	// admission controller-based verification:
	if s.verifyClientsURL != "" {
		ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
		defer cancel()

		jreq, err := json.Marshal(&tailcfg.DERPAdmitClientRequest{
			NodePublic: clientKey,
			Source:     clientIP,
		})
		if err != nil {
			return err
		}
		req, err := http.NewRequestWithContext(ctx, http.MethodPost, s.verifyClientsURL, bytes.NewReader(jreq))
		if err != nil {
			return err
		}
		res, err := http.DefaultClient.Do(req)
		if err != nil {
			if s.verifyClientsURLFailOpen {
				s.logf("admission controller unreachable; allowing client %v", clientKey)
				return nil
			}
			return err
		}
		defer res.Body.Close()
		if res.StatusCode != http.StatusOK {
			return fmt.Errorf("admission controller: %v", res.Status)
		}
		var jres tailcfg.DERPAdmitClientResponse
		// Cap how much of the response body is decoded at 4 KiB.
		if err := json.NewDecoder(io.LimitReader(res.Body, 4<<10)).Decode(&jres); err != nil {
			return err
		}
		if !jres.Allow {
			return fmt.Errorf("admission controller: %v/%v not allowed", clientKey, clientIP)
		}
		// TODO(bradfitz): add policy for configurable bandwidth rate per client?
	}
	return nil
}
// sendServerKey writes a frameServerKey frame whose payload is the magic
// prefix followed by the server's public key. It returns the error from
// writing the frame; the result of the trailing Flush is not checked.
func (s *Server) sendServerKey(lw *lazyBufioWriter) error {
	payload := make([]byte, 0, len(magic)+key.NodePublicRawLen)
	payload = s.publicKey.AppendTo(append(payload, magic...))

	// redundant (no-op) flush to release bufio.Writer
	defer lw.Flush()
	return writeFrame(lw.bw(), frameServerKey, payload)
}
// noteClientActivity records that c just sent a frame, which only matters
// when c's key is connected more than once (a "dup" set).
//
// For clients in a dup set it may make c the set's active client, depending
// on s.dupPolicy, and it appends c to the set's send history. Under the
// disableFighters policy, if c already appears earlier in that history
// (other than as the most recent entry), every client in the set is
// disabled and an active client that belongs to the set is cleared.
//
// Clients that are not dups, or that are already disabled, return early
// without taking s.mu.
func (s *Server) noteClientActivity(c *sclient) {
	// Fast paths: not part of a dup set, or already disabled so there is
	// no point checking more.
	if !c.isDup.Load() || c.isDisabled.Load() {
		return
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	cs, found := s.clients[c.key]
	if !found {
		return
	}
	dup := cs.dup
	if dup == nil {
		// The set stopped being a dup between the isDup fast-path check
		// above and taking the mutex. Nothing to do.
		return
	}

	// With lastWriterIsActive the current speaker always becomes the
	// primary; otherwise it only does so when there is no primary yet.
	if s.dupPolicy == lastWriterIsActive || dup.last == nil {
		dup.last = c
		cs.activeClient.Store(c)
	}

	if slicesx.LastEqual(dup.sendHistory, c) {
		// c was already the most recently recorded speaker in this set.
		return
	}

	// If this connection has spoken before (with someone else speaking
	// since), consider the group fighting and disable them all.
	if s.dupPolicy == disableFighters {
		spokeBefore := false
		for _, prior := range dup.sendHistory {
			if prior == c {
				spokeBefore = true
				break
			}
		}
		if spokeBefore {
			cs.ForeachClient(func(member *sclient) {
				member.isDisabled.Store(true)
				if cs.activeClient.Load() == member {
					cs.activeClient.Store(nil)
				}
			})
		}
	}

	// Remember this client as the one that spoke last.
	dup.sendHistory = append(dup.sendHistory, c)
}
// serverInfo is the message that sendServerInfo marshals to JSON, seals to
// the client's key and sends in a frameServerInfo frame. All fields are
// omitted from the JSON when zero.
type serverInfo struct {
// Version is set to ProtocolVersion by sendServerInfo.
Version int `json:"version,omitempty"`
// NOTE(review): the two token-bucket fields are not populated by
// sendServerInfo; presumably per-client rate-limit parameters — confirm.
TokenBucketBytesPerSecond int `json:",omitempty"`
TokenBucketBytesBurst int `json:",omitempty"`
}
// sendServerInfo sends the client a frameServerInfo frame. The payload is a
// JSON serverInfo carrying ProtocolVersion, sealed with the server's private
// key to clientKey. The writer is flushed before returning, and the first
// error encountered is returned.
func (s *Server) sendServerInfo(bw *lazyBufioWriter, clientKey key.NodePublic) error {
	plaintext, err := json.Marshal(serverInfo{Version: ProtocolVersion})
	if err != nil {
		return err
	}

	sealed := s.privateKey.SealTo(clientKey, plaintext)
	err = writeFrameHeader(bw.bw(), frameServerInfo, uint32(len(sealed)))
	if err != nil {
		return err
	}
	_, err = bw.Write(sealed)
	if err != nil {
		return err
	}
	return bw.Flush()
}
// recvClientKey reads the frameClientInfo frame that a client sends as proof
// of identity when it first connects. The peer is entirely untrusted at this
// point.
//
// The frame holds the client's raw public key followed by a sealed box. The
// box is opened with the server's private key and its contents are decoded
// as a JSON clientInfo. On any failure the zero key and a nil info are
// returned together with the error.
func (s *Server) recvClientKey(br *bufio.Reader) (clientKey key.NodePublic, info *clientInfo, err error) {
	frameLen, err := readFrameTypeHeader(br, frameClientInfo)
	if err != nil {
		return zpub, nil, err
	}

	const (
		minLen = keyLen + nonceLen
		// We don't trust the client at all yet, so limit its input size to limit
		// things like JSON resource exhausting (http://github.com/golang/go/issues/31789).
		maxLen = 256 << 10
	)
	switch {
	case frameLen < minLen:
		return zpub, nil, errors.New("short client info")
	case frameLen > maxLen:
		return zpub, nil, errors.New("long client info")
	}

	if err := clientKey.ReadRawWithoutAllocating(br); err != nil {
		return zpub, nil, err
	}

	boxLen := int(frameLen - keyLen)
	sealed := make([]byte, boxLen)
	if _, err := io.ReadFull(br, sealed); err != nil {
		return zpub, nil, fmt.Errorf("msgbox: %v", err)
	}
	plaintext, opened := s.privateKey.OpenFrom(clientKey, sealed)
	if !opened {
		return zpub, nil, fmt.Errorf("msgbox: cannot open len=%d with client key %s", boxLen, clientKey)
	}

	decoded := new(clientInfo)
	if err := json.Unmarshal(plaintext, decoded); err != nil {
		return zpub, nil, fmt.Errorf("msg: %v", err)
	}
	return clientKey, decoded, nil
}
// recvPacket reads the body of a "send packet" frame of frameLen bytes from
// br: the destination key followed by the packet contents.
//
// The contents may be at most MaxPacketSize bytes. On success the server's
// receive counters (packets, bytes, and disco vs. other) are updated. On
// failure the zero key and nil contents are returned with the error.
func (s *Server) recvPacket(br *bufio.Reader, frameLen uint32) (dstKey key.NodePublic, contents []byte, err error) {
	if frameLen < keyLen {
		return zpub, nil, errors.New("short send packet frame")
	}
	if err := dstKey.ReadRawWithoutAllocating(br); err != nil {
		return zpub, nil, err
	}

	payloadLen := frameLen - keyLen
	if payloadLen > MaxPacketSize {
		return zpub, nil, fmt.Errorf("data packet longer (%d) than max of %v", payloadLen, MaxPacketSize)
	}
	payload := make([]byte, payloadLen)
	if _, err := io.ReadFull(br, payload); err != nil {
		return zpub, nil, err
	}

	s.packetsRecv.Add(1)
	s.bytesRecv.Add(int64(len(payload)))
	switch {
	case disco.LooksLikeDiscoWrapper(payload):
		s.packetsRecvDisco.Add(1)
	default:
		s.packetsRecvOther.Add(1)
	}
	return dstKey, payload, nil
}
// zpub is the key.NodePublic zero value. The recv* functions return it in
// place of a key on their error paths.
var zpub key.NodePublic
// recvForwardPacket reads the body of a "forward packet" frame of frameLen
// bytes from br: the original source key, the destination key, and then the
// packet contents.
//
// The contents may be at most MaxPacketSize bytes. On failure zero keys and
// nil contents are returned with the error. Unlike recvPacket, it does not
// update the server's receive counters.
func (s *Server) recvForwardPacket(br *bufio.Reader, frameLen uint32) (srcKey, dstKey key.NodePublic, contents []byte, err error) {
	const keysLen = keyLen * 2
	if frameLen < keysLen {
		return zpub, zpub, nil, errors.New("short send packet frame")
	}
	if err := srcKey.ReadRawWithoutAllocating(br); err != nil {
		return zpub, zpub, nil, err
	}
	if err := dstKey.ReadRawWithoutAllocating(br); err != nil {
		return zpub, zpub, nil, err
	}

	payloadLen := frameLen - keysLen
	if payloadLen > MaxPacketSize {
		return zpub, zpub, nil, fmt.Errorf("data packet longer (%d) than max of %v", payloadLen, MaxPacketSize)
	}
	payload := make([]byte, payloadLen)
	if _, err := io.ReadFull(br, payload); err != nil {
		return zpub, zpub, nil, err
	}
	// TODO: was s.packetsRecv.Add(1)
	// TODO: was s.bytesRecv.Add(int64(len(contents)))
	return srcKey, dstKey, payload, nil
}
// sclient is a client connection to the server.
//
// A node (a wireguard public key) can be connected multiple times to a DERP server
// and thus have multiple sclient instances. An sclient represents
// only one of these possibly multiple connections. See clientSet for the
// type that represents the set of all connections for a given key.
//
// (The "s" prefix is to more explicitly distinguish it from Client in derp_client.go)
type sclient struct {
// Static after construction.
connNum int64 // process-wide unique counter, incremented each Accept
s *Server
nc Conn
key key.NodePublic
info clientInfo
logf logger.Logf
done <-chan struct{} // closed when connection closes
remoteIPPort netip.AddrPort // zero if remoteAddr is not ip:port.
sendQueue chan pkt // packets queued to this client; never closed
discoSendQueue chan pkt // important packets queued to this client; never closed
sendPongCh chan [8]byte // pong replies to send to the client; never closed
peerGone chan peerGoneMsg // write request that a peer is not at this server (not used by mesh peers)
meshUpdate chan struct{} // write request to write peerStateChange
canMesh bool // clientInfo had correct mesh token for inter-region routing
isDup atomic.Bool // whether more than 1 sclient for key is connected
isDisabled atomic.Bool // whether sends to this peer are disabled due to active/active dups
debug bool // turn on for verbose logging
derp: add optional debug logging for prober clients This allows tracking packet flow via logs for prober clients. Note that the new sclient.debug() function is called on every received packet, but will do nothing for most clients. I have adjusted sclient logging to print public keys in short format rather than full. This takes effect even for existing non-debug logging (mostly client disconnect messages). Example logs for a packet being sent from client [SbsJn] (connected to derper [dM2E3]) to client [10WOo] (connected to derper [AVxvv]): ``` derper [dM2E3]: derp client 10.0.0.1:35470[SbsJn]: register single client mesh("10.0.1.1"): 4 peers derp client 10.0.0.1:35470[SbsJn]: read frame type 4 len 40 err <nil> derp client 10.0.0.1:35470[SbsJn]: SendPacket for [10WOo], forwarding via <derphttp_client.Client [AVxvv] url=https://10.0.1.1/derp>: <nil> derp client 10.0.0.1:35470[SbsJn]: read frame type 0 len 0 err EOF derp client 10.0.0.1:35470[SbsJn]: read EOF derp client 10.0.0.1:35470[SbsJn]: sender failed: context canceled derp client 10.0.0.1:35470[SbsJn]: removing connection derper [AVxvv]: derp client 10.0.1.1:50650[10WOo]: register single client derp client 10.0.1.1:50650[10WOo]: received forwarded packet from [SbsJn] via [dM2E3] derp client 10.0.1.1:50650[10WOo]: sendPkt attempt 0 enqueued derp client 10.0.1.1:50650[10WOo]: sendPacket from [SbsJn]: <nil> derp client 10.0.1.1:50650[10WOo]: read frame type 0 len 0 err EOF derp client 10.0.1.1:50650[10WOo]: read EOF derp client 10.0.1.1:50650[10WOo]: sender failed: context canceled derp client 10.0.1.1:50650[10WOo]: removing connection ``` Signed-off-by: Anton Tolchanov <anton@tailscale.com>
2023-03-20 15:15:45 +00:00
// Owned by run, not thread-safe.
br *bufio.Reader
connectedAt time.Time
preferred bool
// Owned by sendLoop, not thread-safe.
sawSrc map[key.NodePublic]set.Handle
bw *lazyBufioWriter
// Guarded by s.mu
//
// peerStateChange is used by mesh peers (a set of regional
// DERP servers) and contains records that need to be sent to
// the client for them to update their map of who's connected
// to this node.
peerStateChange []peerConnState
// peerGoneLimiter limits how often the server will inform a
// client that it's trying to establish a direct connection
// through us with a peer we have no record of.
peerGoneLim *rate.Limiter
}
// presentFlags returns the PeerPresentFlags describing this client:
// PeerPresentIsProber and/or PeerPresentIsMeshPeer when they apply, and
// PeerPresentIsRegular when neither does.
func (c *sclient) presentFlags() PeerPresentFlags {
	var flags PeerPresentFlags
	if c.canMesh {
		flags |= PeerPresentIsMeshPeer
	}
	if c.info.IsProber {
		flags |= PeerPresentIsProber
	}
	if flags != 0 {
		return flags
	}
	return PeerPresentIsRegular
}
// peerConnState represents whether a peer is connected to the server
// or not.
//
// Records of this type are accumulated in sclient.peerStateChange for
// delivery to mesh peers.
type peerConnState struct {
ipPort netip.AddrPort // if present, the peer's IP:port
// peer is the public key of the peer this record is about.
peer key.NodePublic
// flags has the same type that sclient.presentFlags returns.
// NOTE(review): presumably filled in from presentFlags — confirm.
flags PeerPresentFlags
// NOTE(review): presumably true for "connected" and false for "gone" —
// confirm against the code that builds these records.
present bool
}
// pkt is a request to write a data frame to an sclient.
//
// Values travel over an sclient's sendQueue or discoSendQueue channel.
type pkt struct {
// enqueuedAt is when a packet was put onto a queue before it was sent,
// and is used for reporting metrics on the duration of packets in the queue.
enqueuedAt time.Time
// bs is the data packet bytes.
// The memory is owned by pkt.
bs []byte
// src is the sender of the packet.
src key.NodePublic
}
// peerGoneMsg is a request to write a peerGone frame to an sclient.
//
// Values are sent on sclient.peerGone by requestPeerGoneWrite.
type peerGoneMsg struct {
// peer is the public key of the peer that is gone.
peer key.NodePublic
// reason says why the peer is gone.
reason PeerGoneReasonType
}
// setPreferred records whether the client considers this server preferred
// and keeps the server's home-client metrics in step.
//
// A call that does not change the current value does nothing. Otherwise
// curHomeClients is adjusted by +1 or -1, and, if the connection is more
// than 5 seconds old, the matching homeMovesIn or homeMovesOut counter is
// incremented.
func (c *sclient) setPreferred(v bool) {
	if v == c.preferred {
		return
	}
	c.preferred = v

	srv := c.s
	moves := &srv.homeMovesOut
	if v {
		srv.curHomeClients.Add(1)
		moves = &srv.homeMovesIn
	} else {
		srv.curHomeClients.Add(-1)
	}

	// Keep track of varz for home serve moves in/out. But ignore
	// the initial packet set when a client connects, which we
	// assume happens within 5 seconds. In any case, just for
	// graphs, so not important to miss a move. But it shouldn't:
	// the netcheck/re-STUNs in magicsock only happen about every
	// 30 seconds.
	if srv.clock.Since(c.connectedAt) <= 5*time.Second {
		return
	}
	moves.Add(1)
}
// expMovingAverage folds newValue into the running average prev using the
// decay factor alpha and returns the updated average:
//
//	alpha*newValue + (1-alpha)*prev
//
// https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
func expMovingAverage(prev, newValue, alpha float64) float64 {
	fresh := alpha * newValue
	carried := (1 - alpha) * prev
	return fresh + carried
}
// recordQueueTime folds the time a packet spent queued (now minus
// enqueuedAt, in whole milliseconds) into the server's average queue
// duration metric.
//
// The average is kept as float64 bits in a uint64 and updated with a
// compare-and-swap loop that retries until the swap succeeds.
func (c *sclient) recordQueueTime(enqueuedAt time.Time) {
	waitedMs := float64(c.s.clock.Since(enqueuedAt).Milliseconds())
	for {
		prevBits := atomic.LoadUint64(c.s.avgQueueDuration)
		updated := expMovingAverage(math.Float64frombits(prevBits), waitedMs, 0.1)
		nextBits := math.Float64bits(updated)
		if atomic.CompareAndSwapUint64(c.s.avgQueueDuration, prevBits, nextBits) {
			return
		}
	}
}
// onSendLoopDone cleans up once the send loop has finished: it closes the
// connection, removes the peer-gone watchers recorded in c.sawSrc, and
// drains both send queues, recording each leftover packet as dropped.
//
// It must only be called from the sendLoop goroutine.
func (c *sclient) onSendLoopDone() {
	// If the sender stopped on its own because of an error, closing the
	// connection unblocks the receive loop so it can clean up the rest.
	c.nc.Close()

	// Clean up watches.
	for peer, handle := range c.sawSrc {
		c.s.removePeerGoneFromRegionWatcher(peer, handle)
	}

	// Drain the send queues to count dropped packets.
	for {
		var leftover pkt
		select {
		case leftover = <-c.sendQueue:
		case leftover = <-c.discoSendQueue:
		default:
			return
		}
		c.s.recordDrop(leftover.bs, leftover.src, c.key, dropReasonGoneDisconnected)
	}
}
// sendLoop writes frames to the client until ctx is done or a write fails.
//
// Each iteration first handles every event that is already pending without
// blocking, buffering the resulting frames. Once nothing is pending it
// flushes the buffer and then blocks until the next event arrives. It
// returns nil when ctx is done and the first write or flush error otherwise.
// c.onSendLoopDone runs on every exit path.
func (c *sclient) sendLoop(ctx context.Context) error {
defer c.onSendLoopDone()
// Lengthen the keep-alive period by a random amount below 5 seconds
// (presumably so that clients' keep-alives do not all fire together).
jitter := rand.N(5 * time.Second)
keepAliveTick, keepAliveTickChannel := c.s.clock.NewTicker(keepAlive + jitter)
defer keepAliveTick.Stop()
var werr error // last write error
// inBatch counts the frames written since the last flush, for the
// bufferedWriteFrames metric. It starts at -1 so that the increment at
// the top of the first iteration brings it to 0.
inBatch := -1 // for bufferedWriteFrames
for {
if werr != nil {
return werr
}
inBatch++
// First, a non-blocking select (with a default) that
// does as many non-flushing writes as possible.
select {
case <-ctx.Done():
return nil
case msg := <-c.peerGone:
werr = c.sendPeerGone(msg.peer, msg.reason)
continue
case <-c.meshUpdate:
werr = c.sendMeshUpdates()
continue
case msg := <-c.sendQueue:
werr = c.sendPacket(msg.src, msg.bs)
c.recordQueueTime(msg.enqueuedAt)
continue
case msg := <-c.discoSendQueue:
werr = c.sendPacket(msg.src, msg.bs)
c.recordQueueTime(msg.enqueuedAt)
continue
case msg := <-c.sendPongCh:
werr = c.sendPong(msg)
continue
case <-keepAliveTickChannel:
werr = c.sendKeepAlive()
continue
default:
// Nothing else is ready: flush whatever the non-blocking cases
// above, or the blocking select below, have buffered.
if werr = c.bw.Flush(); werr != nil {
return werr
}
if inBatch != 0 { // the first loop will almost always hit default & be size zero
c.s.bufferedWriteFrames.Observe(float64(inBatch))
inBatch = 0
}
}
// Then a blocking select with the same cases. The frame written here
// is flushed by a later pass through the default case above.
select {
case <-ctx.Done():
return nil
case msg := <-c.peerGone:
werr = c.sendPeerGone(msg.peer, msg.reason)
case <-c.meshUpdate:
werr = c.sendMeshUpdates()
case msg := <-c.sendQueue:
werr = c.sendPacket(msg.src, msg.bs)
c.recordQueueTime(msg.enqueuedAt)
case msg := <-c.discoSendQueue:
werr = c.sendPacket(msg.src, msg.bs)
c.recordQueueTime(msg.enqueuedAt)
case msg := <-c.sendPongCh:
werr = c.sendPong(msg)
case <-keepAliveTickChannel:
werr = c.sendKeepAlive()
}
}
}
// setWriteDeadline gives the next writes on the client connection
// writeTimeout from now to complete.
func (c *sclient) setWriteDeadline() {
	deadline := time.Now().Add(writeTimeout)
	c.nc.SetWriteDeadline(deadline)
}
// sendKeepAlive writes a keep-alive frame (a header with a zero-length
// payload) to the buffered writer. It does not flush.
func (c *sclient) sendKeepAlive() error {
	c.setWriteDeadline()
	w := c.bw.bw()
	return writeFrameHeader(w, frameKeepAlive, 0)
}
// sendPong writes a pong frame carrying the 8 bytes of data to the
// buffered writer and bumps the sentPong counter. It does not flush.
func (c *sclient) sendPong(data [8]byte) error {
	c.s.sentPong.Add(1)
	c.setWriteDeadline()

	err := writeFrameHeader(c.bw.bw(), framePong, uint32(len(data)))
	if err == nil {
		_, err = c.bw.Write(data[:])
	}
	return err
}
// Payload lengths, in bytes, of the peerGone and peerPresent frames
// written by sendPeerGone and sendPeerPresent.
const (
peerGoneFrameLen = keyLen + 1 // peer key + 1 byte reason
peerPresentFrameLen = keyLen + 16 + 2 + 1 // 16 byte IP + 2 byte port + 1 byte flags
)
// sendPeerGone writes a peerGone frame for peer to the buffered writer,
// without flushing. The payload is the peer's key followed by one byte
// holding reason. The per-reason frame counter is bumped for the two
// known reasons; any other reason is sent but not counted.
func (c *sclient) sendPeerGone(peer key.NodePublic, reason PeerGoneReasonType) error {
	if reason == PeerGoneReasonDisconnected {
		c.s.peerGoneDisconnectedFrames.Add(1)
	} else if reason == PeerGoneReasonNotHere {
		c.s.peerGoneNotHereFrames.Add(1)
	}
	c.setWriteDeadline()

	payload := peer.AppendTo(make([]byte, 0, peerGoneFrameLen))
	payload = append(payload, byte(reason))

	err := writeFrameHeader(c.bw.bw(), framePeerGone, uint32(len(payload)))
	if err != nil {
		return err
	}
	_, err = c.bw.Write(payload)
	return err
}
// sendPeerPresent writes a peerPresent frame for peer to the buffered
// writer, without flushing.
//
// The payload is peerPresentFrameLen bytes: the peer key, the address of
// ipPort in 16-byte form, the port in big-endian order, and one byte of
// flags.
func (c *sclient) sendPeerPresent(peer key.NodePublic, ipPort netip.AddrPort, flags PeerPresentFlags) error {
	// Offsets of the fields that follow the key within the payload.
	const (
		addrOff  = keyLen
		portOff  = addrOff + 16
		flagsOff = portOff + 2
	)

	c.setWriteDeadline()
	err := writeFrameHeader(c.bw.bw(), framePeerPresent, peerPresentFrameLen)
	if err != nil {
		return err
	}

	frame := make([]byte, peerPresentFrameLen)
	// AppendTo writes the key into the start of frame; the returned slice
	// is not needed because frame already has its final length.
	_ = peer.AppendTo(frame[:0])
	addr := ipPort.Addr().As16()
	copy(frame[addrOff:], addr[:])
	binary.BigEndian.PutUint16(frame[portOff:], ipPort.Port())
	frame[flagsOff] = byte(flags)

	_, err = c.bw.Write(frame)
	return err
}
// sendMeshUpdates writes every queued peer state change to the buffered
// writer as peerPresent / peerGone frames, without flushing. It keeps
// taking batches from c.peerStateChange until one comes back empty, and
// stops early on the first write error.
func (c *sclient) sendMeshUpdates() error {
	// spare is the previous, fully written batch. Its backing array is
	// handed back to c.peerStateChange on a best-effort basis so that it
	// can be refilled without allocating.
	var spare []peerConnState

	for rounds := 0; ; rounds++ {
		// Take the whole pending batch while holding the server lock and
		// leave an empty slice in its place.
		c.s.mu.Lock()
		pending := c.peerStateChange
		if len(pending) != 0 {
			if cap(spare) > 16 {
				// Don't keep recycling a large backing array.
				spare = nil
			}
			c.peerStateChange = spare[:0]
		}
		c.s.mu.Unlock()

		if len(pending) == 0 {
			c.s.meshUpdateLoopCount.Observe(float64(rounds))
			return nil
		}
		c.s.meshUpdateBatchSize.Observe(float64(len(pending)))

		for _, change := range pending {
			var err error
			switch {
			case change.present:
				err = c.sendPeerPresent(change.peer, change.ipPort, change.flags)
			default:
				err = c.sendPeerGone(change.peer, PeerGoneReasonDisconnected)
			}
			if err != nil {
				return err
			}
		}
		spare = pending
	}
}
// sendPacket writes contents to the client in a RecvPacket frame. If
// srcKey.IsZero, the old DERPv1 framing is used (payload only); otherwise
// the DERPv2 framing is used (raw source key followed by the payload).
//
// The bytes of contents are only valid until this function returns, so
// slices of it must not be retained. The bufio.Writer is not flushed.
//
// On return the packet is accounted for: as sent (packet and byte
// counters) on success, or as a write-error drop on failure.
func (c *sclient) sendPacket(srcKey key.NodePublic, contents []byte) (err error) {
	defer func() {
		// Stats update, based on the final value of err.
		if err == nil {
			c.s.packetsSent.Add(1)
			c.s.bytesSent.Add(int64(len(contents)))
		} else {
			c.s.recordDrop(contents, srcKey, c.key, dropReasonWriteError)
		}
		c.debugLogf("sendPacket from %s: %v", srcKey.ShortString(), err)
	}()

	c.setWriteDeadline()

	hasSrc := !srcKey.IsZero()
	frameLen := len(contents)
	if hasSrc {
		frameLen += key.NodePublicRawLen
		c.noteSendFromSrc(srcKey)
	}

	if err = writeFrameHeader(c.bw.bw(), frameRecvPacket, uint32(frameLen)); err != nil {
		return err
	}
	if hasSrc {
		if err = srcKey.WriteRawWithoutAllocating(c.bw.bw()); err != nil {
			return err
		}
	}
	_, err = c.bw.Write(contents)
	return err
}
// noteSendFromSrc notes that we are about to write a packet from src to
// this sclient. The first time a given src is seen, a peer-gone-from-region
// watcher is registered for it and its handle is remembered in c.sawSrc;
// later calls for the same src do nothing.
//
// It must only be called from the sendLoop goroutine.
func (c *sclient) noteSendFromSrc(src key.NodePublic) {
	if _, seen := c.sawSrc[src]; !seen {
		handle := c.s.addPeerGoneFromRegionWatcher(src, c.onPeerGoneFromRegion)
		mak.Set(&c.sawSrc, src, handle)
	}
}
// AddPacketForwarder registers fwd as a packet forwarder for dst.
// fwd must be comparable.
//
// Registering a forwarder that is already present is a no-op. If dst
// already has a different forwarder, the two are combined into a
// multiForwarder in which the earlier one keeps priority.
func (s *Server) AddPacketForwarder(dst key.NodePublic, fwd PacketForwarder) {
	s.mu.Lock()
	defer s.mu.Unlock()

	if existing, found := s.clientsMesh[dst]; found {
		if existing == fwd {
			// Same forwarder registered twice; nothing to do.
			return
		}
		if multi, isMulti := existing.(*multiForwarder); isMulti {
			// Already a set: add fwd unless it is a member already.
			if _, dup := multi.all[fwd]; !dup {
				multi.add(fwd)
			}
			return
		}
		if existing != nil {
			// The current value is a single, different forwarder (not
			// the nil that marks a local-only client), so promote the
			// entry to a set. `existing` was there first and therefore
			// gets the higher priority.
			fwd = newMultiForwarder(existing, fwd)
			s.multiForwarderCreated.Add(1)
		}
	}
	s.clientsMesh[dst] = fwd
}
// RemovePacketForwarder removes fwd as a packet forwarder for dst.
// fwd must be comparable.
//
// Removing a forwarder that is not registered for dst is harmless.
func (s *Server) RemovePacketForwarder(dst key.NodePublic, fwd PacketForwarder) {
	s.mu.Lock()
	defer s.mu.Unlock()

	cur, found := s.clientsMesh[dst]
	if !found {
		return
	}

	if multi, isMulti := cur.(*multiForwarder); isMulti {
		// A multiForwarder always holds at least two forwarders.
		if len(multi.all) < 2 {
			panic("unexpected")
		}
		remain, isLast := multi.deleteLocked(fwd)
		if isLast {
			// Only one forwarder is left, so the set is no longer
			// needed: store the survivor directly.
			s.clientsMesh[dst] = remain
			s.multiForwarderDeleted.Add(1)
		}
		return
	}

	if cur != fwd {
		// Asked to delete a forwarder that isn't the registered one.
		// Harmless, so just count it. (This might happen if a user is
		// moving around between nodes and/or the server sent duplicate
		// connection change broadcasts.)
		s.removePktForwardOther.Add(1)
		return
	}

	if _, isLocal := s.clients[dst]; isLocal {
		// Still connected locally: keep the key, with a nil forwarder.
		s.clientsMesh[dst] = nil
		return
	}
	delete(s.clientsMesh, dst)
	s.notePeerGoneFromRegionLocked(dst)
}
// multiForwarder is a PacketForwarder that represents a set of
// forwarding options. It's used in the rare cases that a client is
// connected to multiple DERP nodes in a region. That shouldn't really
// happen except for perhaps during brief moments while the client is
// reconfiguring, in which case we don't want to forget where the
// client is. The map value is a unique connection number; the lowest
// one has been seen the longest. It's used to make sure we forward
// packets consistently to the same node and don't pick randomly.
type multiForwarder struct {
// fwd is the forwarder that ForwardPacket sends through. When it is
// removed, deleteLocked replaces it with the remaining forwarder that
// has the lowest connection number.
fwd syncs.AtomicValue[PacketForwarder] // preferred forwarder.
// all maps every forwarder in the set to its connection number.
all map[PacketForwarder]uint8 // all forwarders, protected by s.mu.
}
// newMultiForwarder creates a new multiForwarder holding fwds.
// The first PacketForwarder passed in becomes the preferred one, and each
// forwarder's connection number is its position in the argument list.
// At least one forwarder must be passed.
func newMultiForwarder(fwds ...PacketForwarder) *multiForwarder {
	mf := &multiForwarder{all: make(map[PacketForwarder]uint8)}
	mf.fwd.Store(fwds[0])
	for i := range fwds {
		mf.all[fwds[i]] = uint8(i)
	}
	return mf
}
// add inserts fwd into the set with a connection number one higher than
// the highest number currently in use, so that it ranks after every
// existing forwarder.
func (f *multiForwarder) add(fwd PacketForwarder) {
	var highest uint8
	for _, num := range f.all {
		if num > highest {
			highest = num
		}
	}
	f.all[fwd] = highest + 1
}
// deleteLocked removes fwd from the set. It expects Server.mu to be held.
//
// If fwd was the preferred forwarder, the remaining forwarder with the
// lowest connection number becomes preferred. If exactly one forwarder
// remains after the removal, it is returned together with isLast == true;
// otherwise the result is (nil, false).
func (f *multiForwarder) deleteLocked(fwd PacketForwarder) (_ PacketForwarder, isLast bool) {
	delete(f.all, fwd)

	if f.fwd.Load() == fwd {
		// The preferred forwarder is gone: promote the one that has
		// been around the longest (lowest number), if any is left.
		var (
			best    PacketForwarder
			bestNum uint8
		)
		for cand, num := range f.all {
			if best == nil || num < bestNum {
				best, bestNum = cand, num
			}
		}
		if best != nil {
			f.fwd.Store(best)
		}
	}

	if len(f.all) != 1 {
		return nil, false
	}
	for only := range f.all {
		return only, true
	}
	return nil, false
}
// ForwardPacket forwards the packet through the currently preferred
// forwarder of the set and returns its result.
func (f *multiForwarder) ForwardPacket(src, dst key.NodePublic, payload []byte) error {
	preferred := f.fwd.Load()
	return preferred.ForwardPacket(src, dst, payload)
}
derp: add optional debug logging for prober clients This allows tracking packet flow via logs for prober clients. Note that the new sclient.debug() function is called on every received packet, but will do nothing for most clients. I have adjusted sclient logging to print public keys in short format rather than full. This takes effect even for existing non-debug logging (mostly client disconnect messages). Example logs for a packet being sent from client [SbsJn] (connected to derper [dM2E3]) to client [10WOo] (connected to derper [AVxvv]): ``` derper [dM2E3]: derp client 10.0.0.1:35470[SbsJn]: register single client mesh("10.0.1.1"): 4 peers derp client 10.0.0.1:35470[SbsJn]: read frame type 4 len 40 err <nil> derp client 10.0.0.1:35470[SbsJn]: SendPacket for [10WOo], forwarding via <derphttp_client.Client [AVxvv] url=https://10.0.1.1/derp>: <nil> derp client 10.0.0.1:35470[SbsJn]: read frame type 0 len 0 err EOF derp client 10.0.0.1:35470[SbsJn]: read EOF derp client 10.0.0.1:35470[SbsJn]: sender failed: context canceled derp client 10.0.0.1:35470[SbsJn]: removing connection derper [AVxvv]: derp client 10.0.1.1:50650[10WOo]: register single client derp client 10.0.1.1:50650[10WOo]: received forwarded packet from [SbsJn] via [dM2E3] derp client 10.0.1.1:50650[10WOo]: sendPkt attempt 0 enqueued derp client 10.0.1.1:50650[10WOo]: sendPacket from [SbsJn]: <nil> derp client 10.0.1.1:50650[10WOo]: read frame type 0 len 0 err EOF derp client 10.0.1.1:50650[10WOo]: read EOF derp client 10.0.1.1:50650[10WOo]: sender failed: context canceled derp client 10.0.1.1:50650[10WOo]: removing connection ``` Signed-off-by: Anton Tolchanov <anton@tailscale.com>
2023-03-20 15:15:45 +00:00
// String returns a short description of the set for logs: the preferred
// forwarder and the total number of forwarders.
func (f *multiForwarder) String() string {
	preferred, total := f.fwd.Load(), len(f.all)
	return fmt.Sprintf("<MultiForwarder fwd=%s total=%d>", preferred, total)
}
// expVarFunc wraps f in an expvar.Func that holds s.mu while f runs, so
// that f may read state guarded by the server mutex.
func (s *Server) expVarFunc(f func() any) expvar.Func {
	locked := func() any {
		s.mu.Lock()
		defer s.mu.Unlock()
		return f()
	}
	return expvar.Func(locked)
}
// ExpVar returns an expvar variable suitable for registering with expvar.Publish.
func (s *Server) ExpVar() expvar.Var {
m := new(metrics.Set)
m.Set("gauge_memstats_sys0", expvar.Func(func() any { return int64(s.memSys0) }))
m.Set("gauge_watchers", s.expVarFunc(func() any { return len(s.watchers) }))
m.Set("gauge_current_file_descriptors", expvar.Func(func() any { return metrics.CurrentFDs() }))
m.Set("gauge_current_connections", &s.curClients)
m.Set("gauge_current_home_connections", &s.curHomeClients)
m.Set("gauge_clients_total", expvar.Func(func() any { return len(s.clientsMesh) }))
m.Set("gauge_clients_local", expvar.Func(func() any { return len(s.clients) }))
m.Set("gauge_clients_remote", expvar.Func(func() any { return len(s.clientsMesh) - len(s.clients) }))
m.Set("gauge_current_dup_client_keys", &s.dupClientKeys)
m.Set("gauge_current_dup_client_conns", &s.dupClientConns)
m.Set("counter_total_dup_client_conns", &s.dupClientConnTotal)
m.Set("accepts", &s.accepts)
m.Set("bytes_received", &s.bytesRecv)
m.Set("bytes_sent", &s.bytesSent)
m.Set("packets_dropped", &s.packetsDropped)
m.Set("counter_packets_dropped_reason", &s.packetsDroppedReason)
m.Set("counter_packets_dropped_type", &s.packetsDroppedType)
2020-08-11 20:59:08 +01:00
m.Set("counter_packets_received_kind", &s.packetsRecvByKind)
m.Set("packets_sent", &s.packetsSent)
m.Set("packets_received", &s.packetsRecv)
m.Set("unknown_frames", &s.unknownFrames)
m.Set("home_moves_in", &s.homeMovesIn)
m.Set("home_moves_out", &s.homeMovesOut)
m.Set("got_ping", &s.gotPing)
m.Set("sent_pong", &s.sentPong)
m.Set("peer_gone_disconnected_frames", &s.peerGoneDisconnectedFrames)
m.Set("peer_gone_not_here_frames", &s.peerGoneNotHereFrames)
m.Set("packets_forwarded_out", &s.packetsForwardedOut)
m.Set("packets_forwarded_in", &s.packetsForwardedIn)
m.Set("multiforwarder_created", &s.multiForwarderCreated)
m.Set("multiforwarder_deleted", &s.multiForwarderDeleted)
m.Set("packet_forwarder_delete_other_value", &s.removePktForwardOther)
m.Set("average_queue_duration_ms", expvar.Func(func() any {
return math.Float64frombits(atomic.LoadUint64(s.avgQueueDuration))
}))
m.Set("counter_tcp_rtt", &s.tcpRtt)
m.Set("counter_mesh_update_batch_size", s.meshUpdateBatchSize)
m.Set("counter_mesh_update_loop_count", s.meshUpdateLoopCount)
m.Set("counter_buffered_write_frames", s.bufferedWriteFrames)
var expvarVersion expvar.String
expvarVersion.Set(version.Long())
m.Set("version", &expvarVersion)
return m
}
// ConsistencyCheck verifies that the server's client bookkeeping agrees
// with itself and returns nil if it does. Otherwise it returns a single
// error whose message lists every problem found, separated by ", ".
//
// NOTE(review): the self-verification step runs while s.mu is held and may
// take up to its 5 second timeout; confirm that this is acceptable.
func (s *Server) ConsistencyCheck() error {
	s.mu.Lock()
	defer s.mu.Unlock()

	var problems []string

	// A nil forwarder in clientsMesh marks a client that is only connected
	// locally, so each such key must also be present in clients.
	nilMeshWithoutClient := 0
	for nodeKey, fwd := range s.clientsMesh {
		if fwd != nil {
			continue
		}
		if _, isLocal := s.clients[nodeKey]; !isLocal {
			nilMeshWithoutClient++
		}
	}
	if nilMeshWithoutClient != 0 {
		problems = append(problems, fmt.Sprintf("%d s.clientsMesh keys not in s.clients", nilMeshWithoutClient))
	}

	// Conversely, every local client must have a clientsMesh entry.
	clientsWithoutMesh := 0
	for nodeKey := range s.clients {
		if _, inMesh := s.clientsMesh[nodeKey]; !inMesh {
			clientsWithoutMesh++
		}
	}
	if clientsWithoutMesh != 0 {
		problems = append(problems, fmt.Sprintf("%d s.clients keys not in s.clientsMesh", clientsWithoutMesh))
	}

	// The exported connection gauge must match the map size.
	if s.curClients.Value() != int64(len(s.clients)) {
		msg := fmt.Sprintf("expvar connections = %d != clients map says of %d",
			s.curClients.Value(),
			len(s.clients))
		problems = append(problems, msg)
	}

	if s.verifyClientsLocalTailscaled {
		if err := s.checkVerifyClientsLocalTailscaled(); err != nil {
			problems = append(problems, err.Error())
		}
	}

	if len(problems) > 0 {
		return errors.New(strings.Join(problems, ", "))
	}
	return nil
}
derp: remove two key.Public allocations Reading and writing a [32]byte key to a bufio.Reader/bufio.Writer can easily by done without allocating. Do so. It is slower; on my machine, it adds about 100ns per read/write. However, the overall request takes a minimum of several µs, and it cuts allocations meaningfully, so it is probably worth it. name old time/op new time/op delta SendRecv/msgsize=10-8 9.21µs ± 9% 9.08µs ± 8% ~ (p=0.250 n=15+15) SendRecv/msgsize=100-8 6.51µs ± 9% 6.60µs ± 7% ~ (p=0.259 n=15+13) SendRecv/msgsize=1000-8 7.24µs ±13% 7.61µs ±36% ~ (p=1.000 n=11+15) SendRecv/msgsize=10000-8 19.5µs ±15% 19.9µs ±25% ~ (p=0.890 n=14+15) name old speed new speed delta SendRecv/msgsize=10-8 1.09MB/s ± 8% 1.10MB/s ± 8% ~ (p=0.286 n=15+15) SendRecv/msgsize=100-8 15.4MB/s ± 8% 15.1MB/s ± 6% ~ (p=0.129 n=15+12) SendRecv/msgsize=1000-8 139MB/s ±15% 135MB/s ±28% ~ (p=1.000 n=11+15) SendRecv/msgsize=10000-8 516MB/s ±17% 506MB/s ±21% ~ (p=0.880 n=14+15) name old alloc/op new alloc/op delta SendRecv/msgsize=10-8 170B ± 1% 108B ± 1% -36.63% (p=0.000 n=15+15) SendRecv/msgsize=100-8 265B ± 1% 203B ± 1% -23.34% (p=0.000 n=15+15) SendRecv/msgsize=1000-8 1.18kB ± 1% 1.12kB ± 0% -5.31% (p=0.000 n=14+14) SendRecv/msgsize=10000-8 18.8kB ± 2% 18.8kB ± 2% ~ (p=0.443 n=12+12) name old allocs/op new allocs/op delta SendRecv/msgsize=10-8 4.00 ± 0% 2.00 ± 0% -50.00% (p=0.000 n=15+15) SendRecv/msgsize=100-8 4.00 ± 0% 2.00 ± 0% -50.00% (p=0.000 n=15+15) SendRecv/msgsize=1000-8 4.00 ± 0% 2.00 ± 0% -50.00% (p=0.000 n=15+15) SendRecv/msgsize=10000-8 5.00 ± 0% 3.00 ± 0% -40.00% (p=0.000 n=13+14) Signed-off-by: Josh Bleecher Snyder <josharian@gmail.com>
2020-08-12 00:40:36 +01:00
// checkVerifyClientsLocalTailscaled checks that a verifyClient call can be
// made successfully for the derper host's own node key.
//
// It fetches the local status (without peers) and verifies the reported
// self public key as a prober client coming from the IPv6 loopback
// address. Both steps share a single 5 second timeout.
func (s *Server) checkVerifyClientsLocalTailscaled() error {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	status, err := localClient.StatusWithoutPeers(ctx)
	if err != nil {
		return fmt.Errorf("localClient.Status: %w", err)
	}

	err = s.verifyClient(ctx, status.Self.PublicKey, &clientInfo{IsProber: true}, netip.IPv6Loopback())
	if err != nil {
		return fmt.Errorf("verifyClient for self nodekey: %w", err)
	}
	return nil
}
// minTimeBetweenLogs is the pause ServeDebugTraffic takes between two
// consecutive traffic samples.
const minTimeBetweenLogs = 2 * time.Second
// BytesSentRecv holds the byte counters reported by ss for one TCP
// connection, together with the public key of the client on that
// connection.
type BytesSentRecv struct {
// Sent is the value of the bytes_sent field in the ss output.
Sent uint64
// Recv is the value of the bytes_received field in the ss output.
Recv uint64
// Key is the public key of the client which sent/received these bytes.
Key key.NodePublic
}
// parseSSOutput parses the output from the specific call to ss in ServeDebugTraffic.
// Separated out for ease of testing.
//
// The output is read as pairs of lines: a connection line whose fifth
// whitespace-separated field is an address:port, followed by a stats line
// of fields such as "bytes_sent:N" and "bytes_received:N". The result maps
// that address:port to the counters found on the stats line. Pairs whose
// connection line cannot be parsed are skipped, a counter that is not a
// valid unsigned integer is left at zero, and a trailing connection line
// with no stats line after it is ignored.
func parseSSOutput(raw string) map[netip.AddrPort]BytesSentRecv {
	newState := map[netip.AddrPort]BytesSentRecv{}
	lines := strings.Split(raw, "\n")
	// Step through the lines two at a time. Requiring i+1 to be in range
	// keeps lines[i+1] from panicking on truncated or odd-length output.
	for i := 0; i+1 < len(lines); i += 2 {
		ipInfo := strings.Fields(lines[i])
		if len(ipInfo) < 5 {
			continue
		}
		src, err := netip.ParseAddrPort(ipInfo[4])
		if err != nil {
			continue
		}
		var stat BytesSentRecv
		for _, field := range strings.Fields(lines[i+1]) {
			// The value is whatever follows the first ':' in the field.
			value := field[strings.Index(field, ":")+1:]
			switch {
			case strings.Contains(field, "bytes_sent"):
				// ParseUint rejects signed input instead of letting a
				// negative number wrap around to a huge counter.
				if sent, err := strconv.ParseUint(value, 10, 64); err == nil {
					stat.Sent = sent
				}
			case strings.Contains(field, "bytes_received"):
				if recv, err := strconv.ParseUint(value, 10, 64); err == nil {
					stat.Recv = recv
				}
			}
		}
		newState[src] = stat
	}
	return newState
}
// ServeDebugTraffic streams per-client traffic counters to w until the
// request's context is done or a write to w fails.
//
// Every minTimeBetweenLogs it runs ss, and for each connection whose sent
// or received byte counter grew since the previous sample and whose
// address belongs to a known client, it writes one JSON-encoded
// BytesSentRecv. Each sample is followed by a blank line and a flush.
func (s *Server) ServeDebugTraffic(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()
	prevState := map[netip.AddrPort]BytesSentRecv{}
	enc := json.NewEncoder(w)
	for ctx.Err() == nil {
		// Tie ss to the request so it is killed if the client goes away.
		output, err := exec.CommandContext(ctx, "ss", "-i", "-H", "-t").Output()
		if err != nil {
			if ctx.Err() != nil {
				// The request is gone; there is nobody left to tell.
				return
			}
			fmt.Fprintf(w, "ss failed: %v", err)
			return
		}
		newState := parseSSOutput(string(output))

		// Resolve addresses to client keys under the server lock, but do
		// the writing after releasing it: w is a network connection and a
		// slow reader must not be able to stall the whole server.
		var changed []BytesSentRecv
		s.mu.Lock()
		for k, next := range newState {
			prev := prevState[k]
			if prev.Sent >= next.Sent && prev.Recv >= next.Recv {
				continue
			}
			if pkey, ok := s.keyOfAddr[k]; ok {
				next.Key = pkey
				changed = append(changed, next)
			}
		}
		s.mu.Unlock()

		for _, rec := range changed {
			if err := enc.Encode(rec); err != nil {
				return
			}
		}
		prevState = newState
		if _, err := fmt.Fprintln(w); err != nil {
			return
		}
		if f, ok := w.(http.Flusher); ok {
			f.Flush()
		}

		// Wait before the next sample, but stop promptly on cancellation.
		select {
		case <-ctx.Done():
			return
		case <-time.After(minTimeBetweenLogs):
		}
	}
}
// bufioWriterPool holds idle 2 KiB bufio.Writers for lazyBufioWriter.
// Writers in the pool are always pointed at io.Discard.
var bufioWriterPool = &sync.Pool{
	New: func() any {
		const bufSize = 2 << 10
		return bufio.NewWriterSize(io.Discard, bufSize)
	},
}

// lazyBufioWriter is a bufio.Writer-like wrapping writer that lazily
// allocates its actual bufio.Writer from a sync.Pool, releasing it to
// the pool upon flush.
//
// We do this to reduce memory overhead; most DERP connections are
// idle and the idle bufio.Writers were 30% of overall memory usage.
type lazyBufioWriter struct {
	w   io.Writer     // underlying
	lbw *bufio.Writer // lazy; nil means it needs an associated buffer
}

// bw returns the bufio.Writer currently attached to w, first taking one
// from the pool and pointing it at the underlying writer if needed.
func (w *lazyBufioWriter) bw() *bufio.Writer {
	if w.lbw != nil {
		return w.lbw
	}
	buffered := bufioWriterPool.Get().(*bufio.Writer)
	buffered.Reset(w.w)
	w.lbw = buffered
	return buffered
}

// Available reports how many bytes are unused in the buffer, attaching a
// buffer first if there is none.
func (w *lazyBufioWriter) Available() int {
	return w.bw().Available()
}

// Write writes p into the buffer, attaching a buffer first if there is
// none.
func (w *lazyBufioWriter) Write(p []byte) (int, error) {
	return w.bw().Write(p)
}

// Flush writes any buffered data to the underlying writer and returns
// the buffer to the pool. It is a no-op when no buffer is attached.
func (w *lazyBufioWriter) Flush() error {
	buffered := w.lbw
	if buffered == nil {
		return nil
	}
	err := buffered.Flush()
	// Detach the buffer from the underlying writer before pooling it.
	buffered.Reset(io.Discard)
	bufioWriterPool.Put(buffered)
	w.lbw = nil
	return err
}