2020-02-05 22:16:58 +00:00
// Copyright (c) 2020 Tailscale Inc & AUTHORS All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package derp
// TODO(crawshaw): with predefined serverKey in clients and HMAC on packets we could skip TLS
import (
"bufio"
"context"
2020-08-18 23:32:32 +01:00
"crypto/ed25519"
2020-02-17 21:52:11 +00:00
crand "crypto/rand"
2020-08-18 23:32:32 +01:00
"crypto/x509"
"crypto/x509/pkix"
2020-02-05 22:16:58 +00:00
"encoding/json"
2020-02-20 20:27:12 +00:00
"errors"
2020-02-21 17:35:53 +00:00
"expvar"
2020-02-05 22:16:58 +00:00
"fmt"
"io"
2020-03-05 23:00:56 +00:00
"io/ioutil"
2020-08-12 22:51:21 +01:00
"log"
2021-06-09 23:06:15 +01:00
"math"
2020-08-18 23:32:32 +01:00
"math/big"
2020-08-13 22:06:43 +01:00
"math/rand"
2021-06-18 05:34:01 +01:00
"net/http"
2020-02-28 21:33:47 +00:00
"os"
2021-06-18 05:34:01 +01:00
"os/exec"
2020-03-20 22:22:02 +00:00
"runtime"
2020-02-28 21:33:47 +00:00
"strconv"
2020-06-23 21:59:48 +01:00
"strings"
2020-02-05 22:16:58 +00:00
"sync"
2021-06-09 23:06:15 +01:00
"sync/atomic"
2020-02-05 22:16:58 +00:00
"time"
2020-08-12 22:51:21 +01:00
"go4.org/mem"
2020-02-05 22:16:58 +00:00
"golang.org/x/crypto/nacl/box"
2020-03-22 20:08:17 +00:00
"golang.org/x/sync/errgroup"
2021-07-20 16:54:48 +01:00
"golang.org/x/time/rate"
2021-06-18 05:34:01 +01:00
"inet.af/netaddr"
2021-06-24 21:31:05 +01:00
"tailscale.com/client/tailscale"
2020-08-11 20:16:15 +01:00
"tailscale.com/disco"
2020-03-03 19:33:22 +00:00
"tailscale.com/metrics"
2020-02-17 21:52:11 +00:00
"tailscale.com/types/key"
2020-02-15 03:23:16 +00:00
"tailscale.com/types/logger"
2021-08-04 22:43:01 +01:00
"tailscale.com/types/pad32"
2020-08-07 19:51:44 +01:00
"tailscale.com/version"
2020-02-05 22:16:58 +00:00
)
2020-02-28 21:33:47 +00:00
var debug , _ = strconv . ParseBool ( os . Getenv ( "DERP_DEBUG_LOGS" ) )
2020-08-12 22:51:21 +01:00
// verboseDropKeys is the set of destination public keys that should
// verbosely log whenever DERP drops a packet.
var verboseDropKeys = map [ key . Public ] bool { }
// init fills verboseDropKeys from the comma-separated list of hex
// public keys in the TS_DEBUG_VERBOSE_DROPS environment variable.
// Entries that fail to parse are logged and skipped.
func init() {
	env := os.Getenv("TS_DEBUG_VERBOSE_DROPS")
	if env == "" {
		return
	}
	for _, hexKey := range strings.Split(env, ",") {
		pub, err := key.NewPublicFromHexMem(mem.S(hexKey))
		if err != nil {
			log.Printf("ignoring invalid debug key %q: %v", hexKey, err)
			continue
		}
		verboseDropKeys[pub] = true
	}
}
2020-08-13 22:06:43 +01:00
// init seeds the global math/rand source from the current wall-clock
// time, in nanoseconds.
func init() {
	seed := time.Now().UnixNano()
	rand.Seed(seed)
}
2020-03-20 07:38:52 +00:00
// perClientSendQueueDepth is the number of packets buffered for
// sending in each per-client send queue.
const perClientSendQueueDepth = 32

// writeTimeout is the package's default write timeout.
const writeTimeout = 2 * time.Second
2020-03-20 07:14:43 +00:00
2020-02-20 16:50:25 +00:00
// Server is a DERP server.
//
// Use NewServer to construct one: it initializes the maps and the
// metric counters declared below.
2020-02-05 22:16:58 +00:00
type Server struct {
2020-03-12 15:13:07 +00:00
// WriteTimeout, if non-zero, specifies how long to wait
// before failing when writing to a client.
WriteTimeout time . Duration
2020-08-12 22:51:21 +01:00
// privateKey and publicKey are the server's key pair; publicKey is
// derived from privateKey in NewServer.
privateKey key . Private
publicKey key . Public
logf logger . Logf
memSys0 uint64 // runtime.MemStats.Sys at start (or early-ish)
// meshKey is the pre-shared mesh key set by SetMeshKey; empty means none.
meshKey string
// limitedLogf is a rate-limited wrapper around logf, built in NewServer.
limitedLogf logger . Logf
2020-08-18 23:32:32 +01:00
metaCert [ ] byte // the encoded x509 cert to send after LetsEncrypt cert+intermediate
2020-02-05 22:16:58 +00:00
2020-02-21 17:35:53 +00:00
// Counters:
// NOTE(review): the pad32.Four fields presumably keep the counters
// that follow them suitably aligned on 32-bit platforms — confirm
// against package pad32 before reordering any field here.
2021-08-04 22:43:01 +01:00
_ pad32 . Four
2021-07-12 21:05:55 +01:00
packetsSent , bytesSent expvar . Int
packetsRecv , bytesRecv expvar . Int
packetsRecvByKind metrics . LabelMap
packetsRecvDisco * expvar . Int
packetsRecvOther * expvar . Int
2021-08-04 22:43:01 +01:00
_ pad32 . Four
2021-07-12 21:05:55 +01:00
packetsDropped expvar . Int
packetsDroppedReason metrics . LabelMap
packetsDroppedReasonCounters [ ] * expvar . Int // indexed by dropReason
packetsDroppedType metrics . LabelMap
packetsDroppedTypeDisco * expvar . Int
packetsDroppedTypeOther * expvar . Int
2021-08-04 22:43:01 +01:00
_ pad32 . Four
2021-07-12 21:05:55 +01:00
packetsForwardedOut expvar . Int
packetsForwardedIn expvar . Int
peerGoneFrames expvar . Int // number of peer gone frames sent
accepts expvar . Int
curClients expvar . Int
curHomeClients expvar . Int // ones with preferred
clientsReplaced expvar . Int
2021-07-20 16:54:48 +01:00
clientsReplaceLimited expvar . Int
clientsReplaceSleeping expvar . Int
2021-07-12 21:05:55 +01:00
unknownFrames expvar . Int
homeMovesIn expvar . Int // established clients announce home server moves in
homeMovesOut expvar . Int // established clients announce home server moves out
multiForwarderCreated expvar . Int
multiForwarderDeleted expvar . Int
removePktForwardOther expvar . Int
avgQueueDuration * uint64 // In milliseconds; accessed atomically
2020-02-21 17:35:53 +00:00
2021-06-24 21:31:05 +01:00
// verifyClients only accepts client connections to the DERP server if the clientKey is a
// known peer in the network, as specified by a running tailscaled's client's local api.
verifyClients bool
2021-07-13 16:14:10 +01:00
// mu is held by the methods in this file whenever they read or
// write the fields below.
mu sync . Mutex
closed bool
netConns map [ Conn ] chan struct { } // chan is closed when conn closes
clients map [ key . Public ] * sclient
watchers map [ * sclient ] bool // mesh peer -> true
2020-06-05 20:47:23 +01:00
// clientsMesh tracks all clients in the cluster, both locally
// and to mesh peers. If the value is nil, that means the
2020-06-23 21:59:48 +01:00
// peer is only local (and thus in the clients map, but not
// remote). If the value is non-nil, it's remote (+ maybe also
// local).
2020-06-05 20:47:23 +01:00
clientsMesh map [ key . Public ] PacketForwarder
2020-06-22 18:06:42 +01:00
// sentTo tracks which peers have sent to which other peers,
// and at which connection number. This isn't on sclient
// because it includes intra-region forwarded packets as the
// src.
sentTo map [ key . Public ] map [ key . Public ] int64 // src => dst => dst's latest sclient.connNum
2021-06-18 05:34:01 +01:00
// maps from netaddr.IPPort to a client's public key
keyOfAddr map [ netaddr . IPPort ] key . Public
2020-06-03 22:42:20 +01:00
}
// PacketForwarder is something that can forward packets.
//
2021-08-24 15:36:48 +01:00
// It's mostly an interface for circular dependency reasons; the
2020-06-03 22:42:20 +01:00
// typical implementation is derphttp.Client. The other implementation
// is a multiForwarder, which this package creates as needed if a
// public key gets more than one PacketForwarder registered for it.
type PacketForwarder interface {
// ForwardPacket forwards payload, sent by src, on towards dst.
// The server calls it for packets whose destination is not a
// locally connected client but has an entry in clientsMesh.
ForwardPacket ( src , dst key . Public , payload [ ] byte ) error
2020-02-05 22:16:58 +00:00
}
2020-03-12 15:05:03 +00:00
// Conn is the subset of the underlying net.Conn the DERP Server needs.
// It is a defined type so that non-net connections can be used.
type Conn interface {
2021-08-02 17:17:08 +01:00
// The server writes to and closes the connection through this
// interface; reads go through the bufio.Reader handed to Accept.
io . WriteCloser
2020-03-12 15:05:03 +00:00
// The *Deadline methods follow the semantics of net.Conn.
SetDeadline ( time . Time ) error
SetReadDeadline ( time . Time ) error
SetWriteDeadline ( time . Time ) error
}
2020-02-20 16:50:25 +00:00
// NewServer returns a new DERP server. It doesn't listen on its own.
// Connections are given to it via Server.Accept.
2020-02-17 21:52:11 +00:00
func NewServer ( privateKey key . Private , logf logger . Logf ) * Server {
2020-03-20 22:22:02 +00:00
var ms runtime . MemStats
runtime . ReadMemStats ( & ms )
2020-02-05 22:16:58 +00:00
s := & Server {
2020-03-20 07:57:53 +00:00
privateKey : privateKey ,
publicKey : privateKey . Public ( ) ,
logf : logf ,
2020-08-12 22:51:21 +01:00
limitedLogf : logger . RateLimitedFn ( logf , 30 * time . Second , 5 , 100 ) ,
2020-08-11 20:16:15 +01:00
packetsRecvByKind : metrics . LabelMap { Label : "kind" } ,
2020-03-20 07:57:53 +00:00
packetsDroppedReason : metrics . LabelMap { Label : "reason" } ,
2021-07-12 21:05:55 +01:00
packetsDroppedType : metrics . LabelMap { Label : "type" } ,
2020-06-22 18:06:42 +01:00
clients : map [ key . Public ] * sclient { } ,
2020-06-03 22:42:20 +01:00
clientsMesh : map [ key . Public ] PacketForwarder { } ,
2020-06-22 18:06:42 +01:00
netConns : map [ Conn ] chan struct { } { } ,
2020-03-20 22:22:02 +00:00
memSys0 : ms . Sys ,
2020-06-01 23:19:41 +01:00
watchers : map [ * sclient ] bool { } ,
2020-06-22 18:06:42 +01:00
sentTo : map [ key . Public ] map [ key . Public ] int64 { } ,
2021-06-09 23:06:15 +01:00
avgQueueDuration : new ( uint64 ) ,
2021-06-18 05:34:01 +01:00
keyOfAddr : map [ netaddr . IPPort ] key . Public { } ,
2020-03-20 07:57:53 +00:00
}
2020-08-18 23:32:32 +01:00
s . initMetacert ( )
2020-08-11 20:16:15 +01:00
s . packetsRecvDisco = s . packetsRecvByKind . Get ( "disco" )
2020-08-11 20:30:15 +01:00
s . packetsRecvOther = s . packetsRecvByKind . Get ( "other" )
2021-07-12 21:05:55 +01:00
s . packetsDroppedReasonCounters = [ ] * expvar . Int {
s . packetsDroppedReason . Get ( "unknown_dest" ) ,
s . packetsDroppedReason . Get ( "unknown_dest_on_fwd" ) ,
s . packetsDroppedReason . Get ( "gone" ) ,
s . packetsDroppedReason . Get ( "queue_head" ) ,
s . packetsDroppedReason . Get ( "queue_tail" ) ,
s . packetsDroppedReason . Get ( "write_error" ) ,
}
s . packetsDroppedTypeDisco = s . packetsDroppedType . Get ( "disco" )
s . packetsDroppedTypeOther = s . packetsDroppedType . Get ( "other" )
2020-02-05 22:16:58 +00:00
return s
}
2020-06-01 23:19:41 +01:00
// SetMeshKey sets the pre-shared key that regional DERP servers use to
// mesh amongst themselves.
//
// It must be called before serving begins.
func (s *Server) SetMeshKey(v string) {
	s.meshKey = v
}
2021-06-24 21:31:05 +01:00
// SetVerifyClient sets whether this DERP server verifies clients
// through tailscaled.
//
// It must be called before serving begins.
func (s *Server) SetVerifyClient(v bool) {
	s.verifyClients = v
}
2020-06-04 19:35:53 +01:00
// HasMeshKey reports whether the server is configured with a mesh key.
2020-06-01 23:19:41 +01:00
func ( s * Server ) HasMeshKey ( ) bool { return s . meshKey != "" }
2020-06-04 19:35:53 +01:00
// MeshKey returns the mesh key configured via SetMeshKey, or the empty
// string if none was set.
func (s *Server) MeshKey() string {
	return s.meshKey
}
// PrivateKey returns the private key the server was created with.
func (s *Server) PrivateKey() key.Private {
	return s.privateKey
}
// PublicKey returns the server's public key, derived from its private
// key when the server was created.
func (s *Server) PublicKey() key.Public {
	return s.publicKey
}
2020-02-20 16:50:25 +00:00
// Close closes the server and waits for the connections to disconnect.
2020-02-05 22:16:58 +00:00
func ( s * Server ) Close ( ) error {
2020-02-20 22:27:28 +00:00
s . mu . Lock ( )
wasClosed := s . closed
s . closed = true
s . mu . Unlock ( )
if wasClosed {
return nil
}
2020-02-05 22:16:58 +00:00
var closedChs [ ] chan struct { }
s . mu . Lock ( )
2020-02-20 16:50:25 +00:00
for nc , closed := range s . netConns {
nc . Close ( )
2020-02-05 22:16:58 +00:00
closedChs = append ( closedChs , closed )
}
s . mu . Unlock ( )
for _ , closed := range closedChs {
<- closed
}
return nil
}
2020-02-20 22:27:28 +00:00
// isClosed reports whether Close has been called on the server.
func (s *Server) isClosed() bool {
	s.mu.Lock()
	closed := s.closed
	s.mu.Unlock()
	return closed
}
2020-02-28 21:18:10 +00:00
// Accept adds a new connection to the server and serves it.
//
2020-02-20 16:50:25 +00:00
// The provided bufio ReadWriter must be already connected to nc.
// Accept blocks until the Server is closed or the connection closes
// on its own.
2020-02-28 21:18:10 +00:00
//
// Accept closes nc.
2020-03-12 15:05:03 +00:00
func ( s * Server ) Accept ( nc Conn , brw * bufio . ReadWriter , remoteAddr string ) {
2020-02-05 22:16:58 +00:00
closed := make ( chan struct { } )
s . mu . Lock ( )
2020-03-22 01:24:28 +00:00
s . accepts . Add ( 1 ) // while holding s.mu for connNum read on next line
connNum := s . accepts . Value ( ) // expvar sadly doesn't return new value on Add(1)
2020-02-20 16:50:25 +00:00
s . netConns [ nc ] = closed
2020-02-05 22:16:58 +00:00
s . mu . Unlock ( )
defer func ( ) {
2020-02-20 16:50:25 +00:00
nc . Close ( )
2020-02-05 22:16:58 +00:00
close ( closed )
s . mu . Lock ( )
2020-02-20 16:50:25 +00:00
delete ( s . netConns , nc )
2020-02-05 22:16:58 +00:00
s . mu . Unlock ( )
} ( )
2020-03-22 01:24:28 +00:00
if err := s . accept ( nc , brw , remoteAddr , connNum ) ; err != nil && ! s . isClosed ( ) {
2020-03-12 15:05:03 +00:00
s . logf ( "derp: %s: %v" , remoteAddr , err )
2020-02-05 22:16:58 +00:00
}
}
2020-08-18 23:32:32 +01:00
// initMetacert initialized s.metaCert with a self-signed x509 cert
// encoding this server's public key and protocol version. cmd/derper
// then sends this after the Let's Encrypt leaf + intermediate certs
// after the ServerHello (encrypted in TLS 1.3, not that it matters
// much).
//
// Then the client can save a round trip getting that and can start
// speaking DERP right away. (We don't use ALPN because that's sent in
// the clear and we're being paranoid to not look too weird to any
// middleboxes, given that DERP is an ultimate fallback path). But
// since the post-ServerHello certs are encrypted we can have the
// client also use them as a signal to be able to start speaking DERP
// right away, starting with its identity proof, encrypted to the
// server's public key.
//
// This RTT optimization fails where there's a corp-mandated
// TLS proxy with corp-mandated root certs on employee machines and
// and TLS proxy cleans up unnecessary certs. In that case we just fall
// back to the extra RTT.
func ( s * Server ) initMetacert ( ) {
pub , priv , err := ed25519 . GenerateKey ( crand . Reader )
if err != nil {
log . Fatal ( err )
}
tmpl := & x509 . Certificate {
SerialNumber : big . NewInt ( ProtocolVersion ) ,
Subject : pkix . Name {
CommonName : fmt . Sprintf ( "derpkey%x" , s . publicKey [ : ] ) ,
} ,
2020-08-24 22:55:26 +01:00
// Windows requires NotAfter and NotBefore set:
NotAfter : time . Now ( ) . Add ( 30 * 24 * time . Hour ) ,
NotBefore : time . Now ( ) . Add ( - 30 * 24 * time . Hour ) ,
2020-08-18 23:32:32 +01:00
}
cert , err := x509 . CreateCertificate ( crand . Reader , tmpl , tmpl , pub , priv )
if err != nil {
log . Fatalf ( "CreateCertificate: %v" , err )
}
s . metaCert = cert
}
// MetaCert returns the encoded server metadata cert that the TLS
// server can send to let the client skip a round trip during start-up.
func (s *Server) MetaCert() []byte {
	return s.metaCert
}
2020-02-20 17:56:19 +00:00
// registerClient notes that client c is now authenticated and ready for packets.
2021-07-20 16:54:48 +01:00
//
// If c's public key was already connected with a different
// connection, the prior one is closed, unless it's fighting rapidly
// with another client with the same key, in which case the returned
// ok is false, and the caller should wait the provided duration
// before trying again.
func ( s * Server ) registerClient ( c * sclient ) ( ok bool , d time . Duration ) {
2020-02-20 17:56:19 +00:00
s . mu . Lock ( )
defer s . mu . Unlock ( )
old := s . clients [ c . key ]
if old == nil {
2020-03-05 23:00:56 +00:00
c . logf ( "adding connection" )
2020-02-20 17:56:19 +00:00
} else {
2021-07-20 16:54:48 +01:00
// Take over the old rate limiter, discarding the one
// our caller just made.
c . replaceLimiter = old . replaceLimiter
if rr := c . replaceLimiter . ReserveN ( timeNow ( ) , 1 ) ; rr . OK ( ) {
if d := rr . DelayFrom ( timeNow ( ) ) ; d > 0 {
s . clientsReplaceLimited . Add ( 1 )
return false , d
}
}
2020-03-11 18:53:13 +00:00
s . clientsReplaced . Add ( 1 )
2020-03-12 15:05:03 +00:00
c . logf ( "adding connection, replacing %s" , old . remoteAddr )
go old . nc . Close ( )
2020-02-20 17:56:19 +00:00
}
s . clients [ c . key ] = c
2020-06-03 22:42:20 +01:00
if _ , ok := s . clientsMesh [ c . key ] ; ! ok {
s . clientsMesh [ c . key ] = nil // just for varz of total users in cluster
}
2021-06-18 05:34:01 +01:00
s . keyOfAddr [ c . remoteIPPort ] = c . key
2020-03-05 23:00:56 +00:00
s . curClients . Add ( 1 )
2020-06-01 23:19:41 +01:00
s . broadcastPeerStateChangeLocked ( c . key , true )
2021-07-20 16:54:48 +01:00
return true , 0
2020-06-01 23:19:41 +01:00
}
// broadcastPeerStateChangeLocked enqueues a message to all watchers
// (other DERP nodes in the region, or trusted clients) that peer's
// presence changed.
//
// s.mu must be held.
func ( s * Server ) broadcastPeerStateChangeLocked ( peer key . Public , present bool ) {
for w := range s . watchers {
w . peerStateChange = append ( w . peerStateChange , peerConnState { peer : peer , present : present } )
go w . requestMeshUpdate ( )
}
2020-02-20 17:56:19 +00:00
}
2020-02-20 20:27:12 +00:00
// unregisterClient removes a client from the server.
2020-02-20 17:56:19 +00:00
func ( s * Server ) unregisterClient ( c * sclient ) {
s . mu . Lock ( )
defer s . mu . Unlock ( )
cur := s . clients [ c . key ]
if cur == c {
2020-03-05 23:00:56 +00:00
c . logf ( "removing connection" )
2020-02-20 17:56:19 +00:00
delete ( s . clients , c . key )
2020-06-22 18:06:42 +01:00
if v , ok := s . clientsMesh [ c . key ] ; ok && v == nil {
delete ( s . clientsMesh , c . key )
s . notePeerGoneFromRegionLocked ( c . key )
}
s . broadcastPeerStateChangeLocked ( c . key , false )
2020-02-20 17:56:19 +00:00
}
2020-06-01 23:19:41 +01:00
if c . canMesh {
delete ( s . watchers , c )
}
2020-03-05 23:00:56 +00:00
2021-06-18 05:34:01 +01:00
delete ( s . keyOfAddr , c . remoteIPPort )
2020-03-05 23:00:56 +00:00
s . curClients . Add ( - 1 )
if c . preferred {
s . curHomeClients . Add ( - 1 )
}
2020-06-22 18:06:42 +01:00
}
// notePeerGoneFromRegionLocked sends peerGone frames to parties that
// key has sent to previously (whether those sends were from a local
// client or forwarded). It must only be called after the key has
// been removed from clientsMesh.
func ( s * Server ) notePeerGoneFromRegionLocked ( key key . Public ) {
if _ , ok := s . clientsMesh [ key ] ; ok {
panic ( "usage" )
}
2020-03-22 01:24:28 +00:00
2020-04-06 08:18:37 +01:00
// Find still-connected peers and either notify that we've gone away
// so they can drop their route entries to us (issue 150)
// or move them over to the active client (in case a replaced client
// connection is being unregistered).
2020-06-22 18:06:42 +01:00
for pubKey , connNum := range s . sentTo [ key ] {
2020-03-22 01:24:28 +00:00
if peer , ok := s . clients [ pubKey ] ; ok && peer . connNum == connNum {
2020-06-22 18:06:42 +01:00
go peer . requestPeerGoneWrite ( key )
2020-03-22 01:24:28 +00:00
}
}
2020-06-22 18:06:42 +01:00
delete ( s . sentTo , key )
2020-02-20 17:56:19 +00:00
}
2020-06-01 23:19:41 +01:00
func ( s * Server ) addWatcher ( c * sclient ) {
if ! c . canMesh {
panic ( "invariant: addWatcher called without permissions" )
}
2020-06-04 16:26:05 +01:00
if c . key == s . publicKey {
// We're connecting to ourself. Do nothing.
return
}
2020-06-01 23:19:41 +01:00
s . mu . Lock ( )
defer s . mu . Unlock ( )
// Queue messages for each already-connected client.
for peer := range s . clients {
c . peerStateChange = append ( c . peerStateChange , peerConnState { peer : peer , present : true } )
}
// And enroll the watcher in future updates (of both
// connections & disconnections).
s . watchers [ c ] = true
go c . requestMeshUpdate ( )
}
2020-03-22 01:24:28 +00:00
func ( s * Server ) accept ( nc Conn , brw * bufio . ReadWriter , remoteAddr string , connNum int64 ) error {
2021-08-02 17:17:08 +01:00
br := brw . Reader
2020-02-20 16:50:25 +00:00
nc . SetDeadline ( time . Now ( ) . Add ( 10 * time . Second ) )
2021-08-02 17:17:08 +01:00
bw := & lazyBufioWriter { w : nc , lbw : brw . Writer }
2020-02-20 16:50:25 +00:00
if err := s . sendServerKey ( bw ) ; err != nil {
2020-02-05 22:16:58 +00:00
return fmt . Errorf ( "send server key: %v" , err )
}
2020-02-20 16:50:25 +00:00
nc . SetDeadline ( time . Now ( ) . Add ( 10 * time . Second ) )
clientKey , clientInfo , err := s . recvClientKey ( br )
2020-02-05 22:16:58 +00:00
if err != nil {
return fmt . Errorf ( "receive client key: %v" , err )
}
if err := s . verifyClient ( clientKey , clientInfo ) ; err != nil {
return fmt . Errorf ( "client %x rejected: %v" , clientKey , err )
}
// At this point we trust the client so we don't time out.
2020-02-20 16:50:25 +00:00
nc . SetDeadline ( time . Time { } )
2020-02-05 22:16:58 +00:00
2020-03-22 01:28:34 +00:00
ctx , cancel := context . WithCancel ( context . Background ( ) )
defer cancel ( )
2021-06-18 05:34:01 +01:00
remoteIPPort , _ := netaddr . ParseIPPort ( remoteAddr )
2020-02-20 16:50:25 +00:00
c := & sclient {
2021-07-12 22:01:51 +01:00
connNum : connNum ,
s : s ,
key : clientKey ,
nc : nc ,
br : br ,
bw : bw ,
logf : logger . WithPrefix ( s . logf , fmt . Sprintf ( "derp client %v/%x: " , remoteAddr , clientKey ) ) ,
done : ctx . Done ( ) ,
remoteAddr : remoteAddr ,
remoteIPPort : remoteIPPort ,
connectedAt : time . Now ( ) ,
sendQueue : make ( chan pkt , perClientSendQueueDepth ) ,
discoSendQueue : make ( chan pkt , perClientSendQueueDepth ) ,
peerGone : make ( chan key . Public ) ,
canMesh : clientInfo . MeshKey != "" && clientInfo . MeshKey == s . meshKey ,
2021-07-20 16:54:48 +01:00
// Allow kicking out previous connections once a
// minute, with a very high burst of 100. Once a
// minute is less than the client's 2 minute
// inactivity timeout.
replaceLimiter : rate . NewLimiter ( rate . Every ( time . Minute ) , 100 ) ,
2020-06-01 23:19:41 +01:00
}
2021-07-20 16:54:48 +01:00
2020-06-01 23:19:41 +01:00
if c . canMesh {
c . meshUpdate = make ( chan struct { } )
2020-02-05 22:16:58 +00:00
}
if clientInfo != nil {
c . info = * clientInfo
}
2021-07-20 16:54:48 +01:00
for {
ok , d := s . registerClient ( c )
if ok {
break
}
s . clientsReplaceSleeping . Add ( 1 )
timeSleep ( d )
s . clientsReplaceSleeping . Add ( - 1 )
}
2020-04-06 07:45:33 +01:00
defer s . unregisterClient ( c )
2021-08-02 17:17:08 +01:00
err = s . sendServerInfo ( c . bw , clientKey )
2020-02-20 22:27:28 +00:00
if err != nil {
return fmt . Errorf ( "send server info: %v" , err )
}
2020-02-05 22:16:58 +00:00
2020-03-22 20:08:17 +00:00
return c . run ( ctx )
2020-03-05 23:00:56 +00:00
}
2021-07-20 16:54:48 +01:00
// timeSleep and timeNow stand in for time.Sleep and time.Now so that
// tests can replace them.
var timeSleep = time.Sleep
var timeNow = time.Now
2020-03-22 20:08:17 +00:00
// run serves the client until there's an error.
// If the client hangs up or the server is closed, run returns nil, otherwise run returns an error.
func ( c * sclient ) run ( ctx context . Context ) error {
// Launch sender, but don't return from run until sender goroutine is done.
var grp errgroup . Group
sendCtx , cancelSender := context . WithCancel ( ctx )
grp . Go ( func ( ) error { return c . sendLoop ( sendCtx ) } )
defer func ( ) {
cancelSender ( )
if err := grp . Wait ( ) ; err != nil && ! c . s . isClosed ( ) {
c . logf ( "sender failed: %v" , err )
}
} ( )
2020-02-20 23:14:24 +00:00
2020-02-05 22:16:58 +00:00
for {
2020-02-20 20:27:12 +00:00
ft , fl , err := readFrameHeader ( c . br )
2020-02-05 22:16:58 +00:00
if err != nil {
2020-03-22 20:08:17 +00:00
if errors . Is ( err , io . EOF ) {
c . logf ( "read EOF" )
return nil
}
if c . s . isClosed ( ) {
c . logf ( "closing; server closed" )
return nil
}
return fmt . Errorf ( "client %x: readFrameHeader: %w" , c . key , err )
2020-02-20 20:27:12 +00:00
}
2020-03-05 23:00:56 +00:00
switch ft {
case frameNotePreferred :
err = c . handleFrameNotePreferred ( ft , fl )
case frameSendPacket :
2020-03-22 01:28:34 +00:00
err = c . handleFrameSendPacket ( ft , fl )
2020-06-03 22:42:20 +01:00
case frameForwardPacket :
err = c . handleFrameForwardPacket ( ft , fl )
2020-06-01 23:19:41 +01:00
case frameWatchConns :
err = c . handleFrameWatchConns ( ft , fl )
2020-06-25 17:33:10 +01:00
case frameClosePeer :
err = c . handleFrameClosePeer ( ft , fl )
2020-03-05 23:00:56 +00:00
default :
2020-03-22 01:28:34 +00:00
err = c . handleUnknownFrame ( ft , fl )
2020-02-20 20:27:12 +00:00
}
if err != nil {
2020-03-05 23:00:56 +00:00
return err
2020-02-05 22:16:58 +00:00
}
2020-03-05 23:00:56 +00:00
}
}
2020-02-05 22:16:58 +00:00
2020-03-22 01:28:34 +00:00
func ( c * sclient ) handleUnknownFrame ( ft frameType , fl uint32 ) error {
2020-03-05 23:00:56 +00:00
_ , err := io . CopyN ( ioutil . Discard , c . br , int64 ( fl ) )
return err
}
2020-02-05 22:16:58 +00:00
2020-03-05 23:00:56 +00:00
// handleFrameNotePreferred reads a one-byte "note preferred" frame and
// records whether the client marked this server as preferred (any
// non-zero byte means preferred). A frame length other than 1 is an
// error.
func (c *sclient) handleFrameNotePreferred(ft frameType, fl uint32) error {
	if fl != 1 {
		return fmt.Errorf("frameNotePreferred wrong size")
	}
	flag, err := c.br.ReadByte()
	if err != nil {
		return fmt.Errorf("frameNotePreferred ReadByte: %v", err)
	}
	preferred := flag != 0
	c.setPreferred(preferred)
	return nil
}
2020-06-01 23:19:41 +01:00
// handleFrameWatchConns handles a "watch connections" frame by
// enrolling the client as a watcher. The frame must be empty and the
// client must have mesh permission; otherwise an error is returned.
func (c *sclient) handleFrameWatchConns(ft frameType, fl uint32) error {
	switch {
	case fl != 0:
		return fmt.Errorf("handleFrameWatchConns wrong size")
	case !c.canMesh:
		return fmt.Errorf("insufficient permissions")
	}
	c.s.addWatcher(c)
	return nil
}
2020-06-25 17:33:10 +01:00
// handleFrameClosePeer handles a "close peer" frame from a mesh
// client: it reads the target public key and, if that peer is
// connected locally, closes its connection in a new goroutine.
//
// The frame must be exactly keyLen bytes and the client must have mesh
// permission. An unknown target is logged but is not an error.
func (c *sclient) handleFrameClosePeer(ft frameType, fl uint32) error {
	switch {
	case fl != keyLen:
		return fmt.Errorf("handleFrameClosePeer wrong size")
	case !c.canMesh:
		return fmt.Errorf("insufficient permissions")
	}

	var targetKey key.Public
	if _, err := io.ReadFull(c.br, targetKey[:]); err != nil {
		return err
	}

	srv := c.s
	srv.mu.Lock()
	defer srv.mu.Unlock()

	target, found := srv.clients[targetKey]
	if !found {
		c.logf("frameClosePeer failed to find peer %x", targetKey)
		return nil
	}
	c.logf("frameClosePeer closing peer %x", targetKey)
	go target.nc.Close()
	return nil
}
2020-06-01 23:19:41 +01:00
2020-06-03 22:42:20 +01:00
// handleFrameForwardPacket reads a "forward packet" frame from the client
// (which must be a trusted client, a peer in our mesh).
func ( c * sclient ) handleFrameForwardPacket ( ft frameType , fl uint32 ) error {
if ! c . canMesh {
return fmt . Errorf ( "insufficient permissions" )
}
s := c . s
srcKey , dstKey , contents , err := s . recvForwardPacket ( c . br , fl )
if err != nil {
return fmt . Errorf ( "client %x: recvForwardPacket: %v" , c . key , err )
}
s . packetsForwardedIn . Add ( 1 )
s . mu . Lock ( )
dst := s . clients [ dstKey ]
2020-06-22 18:06:42 +01:00
if dst != nil {
s . notePeerSendLocked ( srcKey , dst )
}
2020-06-03 22:42:20 +01:00
s . mu . Unlock ( )
if dst == nil {
2021-07-12 21:05:55 +01:00
s . recordDrop ( contents , srcKey , dstKey , dropReasonUnknownDestOnFwd )
2020-06-03 22:42:20 +01:00
return nil
}
return c . sendPkt ( dst , pkt {
2021-06-09 23:06:15 +01:00
bs : contents ,
enqueuedAt : time . Now ( ) ,
src : srcKey ,
2020-06-03 22:42:20 +01:00
} )
}
2020-06-22 18:06:42 +01:00
// notePeerSendLocked records that src sent to dst, remembering dst's
// current connection number. The record lets the server tell dst (if
// it's still around) that src is gone, via a peerGone frame, once src
// disconnects.
//
// As the name says, it is called with s.mu held.
func (s *Server) notePeerSendLocked(src key.Public, dst *sclient) {
	dsts, known := s.sentTo[src]
	if !known {
		dsts = make(map[key.Public]int64)
		s.sentTo[src] = dsts
	}
	dsts[dst.key] = dst.connNum
}
2020-06-03 22:42:20 +01:00
// handleFrameSendPacket reads a "send packet" frame from the client.
2020-03-22 01:28:34 +00:00
func ( c * sclient ) handleFrameSendPacket ( ft frameType , fl uint32 ) error {
2020-03-05 23:00:56 +00:00
s := c . s
2020-03-22 01:28:34 +00:00
dstKey , contents , err := s . recvPacket ( c . br , fl )
2020-03-05 23:00:56 +00:00
if err != nil {
return fmt . Errorf ( "client %x: recvPacket: %v" , c . key , err )
}
2020-06-03 22:42:20 +01:00
var fwd PacketForwarder
2020-03-05 23:00:56 +00:00
s . mu . Lock ( )
dst := s . clients [ dstKey ]
2020-06-03 22:42:20 +01:00
if dst == nil {
fwd = s . clientsMesh [ dstKey ]
} else {
2020-06-22 18:06:42 +01:00
s . notePeerSendLocked ( c . key , dst )
2020-04-06 08:18:37 +01:00
}
2020-03-05 23:00:56 +00:00
s . mu . Unlock ( )
if dst == nil {
2020-06-03 22:42:20 +01:00
if fwd != nil {
s . packetsForwardedOut . Add ( 1 )
if err := fwd . ForwardPacket ( c . key , dstKey , contents ) ; err != nil {
// TODO:
return nil
}
return nil
}
2021-07-12 21:05:55 +01:00
s . recordDrop ( contents , c . key , dstKey , dropReasonUnknownDest )
2020-03-05 23:00:56 +00:00
return nil
}
2020-02-05 22:16:58 +00:00
2020-03-22 05:17:22 +00:00
p := pkt {
2021-06-09 23:06:15 +01:00
bs : contents ,
enqueuedAt : time . Now ( ) ,
src : c . key ,
2020-03-20 07:14:43 +00:00
}
2020-06-03 22:42:20 +01:00
return c . sendPkt ( dst , p )
}
2021-07-12 21:05:55 +01:00
// dropReason is why we dropped a DERP frame.
//
// Its values index Server.packetsDroppedReasonCounters, so the order of
// the constants below must match the order in which NewServer creates
// those counters.
type dropReason int
2021-08-02 03:14:08 +01:00
//go:generate go run tailscale.com/cmd/addlicense -year 2021 -file dropreason_string.go go run golang.org/x/tools/cmd/stringer -type=dropReason -trimprefix=dropReason
2021-07-12 21:05:55 +01:00
const (
dropReasonUnknownDest dropReason = iota // unknown destination pubkey
dropReasonUnknownDestOnFwd // unknown destination pubkey on a derp-forwarded packet
dropReasonGone // destination tailscaled disconnected before we could send
dropReasonQueueHead // destination queue is full, dropped packet at queue head
dropReasonQueueTail // destination queue is full, dropped packet at queue tail
dropReasonWriteError // OS write() failed
)
// recordDrop accounts for a dropped packet.
//
// It bumps the total, per-reason and per-type (disco vs. other) drop
// counters. If dstKey is listed in verboseDropKeys it also emits a
// rate-limited log line, and when debug logging is enabled it logs
// every drop.
//
// packetBytes is the dropped payload, srcKey and dstKey identify the
// sender and intended receiver, and reason must be a valid index into
// packetsDroppedReasonCounters.
func (s *Server) recordDrop(packetBytes []byte, srcKey, dstKey key.Public, reason dropReason) {
	// Classify once; the result feeds both the counters and the debug log.
	isDisco := disco.LooksLikeDiscoWrapper(packetBytes)

	s.packetsDropped.Add(1)
	s.packetsDroppedReasonCounters[reason].Add(1)
	if isDisco {
		s.packetsDroppedTypeDisco.Add(1)
	} else {
		s.packetsDroppedTypeOther.Add(1)
	}

	if verboseDropKeys[dstKey] {
		// Preformat the log string prior to calling limitedLogf. The
		// limiter acts based on the format string, and we want to
		// rate-limit per src/dst keys, not on the generic "dropped
		// stuff" message.
		//
		// The reason goes in the parentheses and the keys on either
		// side of the arrow: "drop (reason) src -> dst".
		msg := fmt.Sprintf("drop (%s) %s -> %s", reason, srcKey.ShortString(), dstKey.ShortString())
		s.limitedLogf(msg)
	}
	if debug {
		s.logf("dropping packet reason=%s dst=%s disco=%v", reason, dstKey, isDisco)
	}
}
2020-06-03 22:42:20 +01:00
func ( c * sclient ) sendPkt ( dst * sclient , p pkt ) error {
s := c . s
dstKey := dst . key
2020-03-20 07:14:43 +00:00
// Attempt to queue for sending up to 3 times. On each attempt, if
// the queue is full, try to drop from queue head to prioritize
// fresher packets.
2021-07-12 22:01:51 +01:00
sendQueue := dst . sendQueue
if disco . LooksLikeDiscoWrapper ( p . bs ) {
sendQueue = dst . discoSendQueue
}
2020-03-20 07:14:43 +00:00
for attempt := 0 ; attempt < 3 ; attempt ++ {
2020-03-22 03:43:50 +00:00
select {
case <- dst . done :
2021-07-12 21:05:55 +01:00
s . recordDrop ( p . bs , c . key , dstKey , dropReasonGone )
2020-03-22 03:43:50 +00:00
return nil
default :
}
2020-03-20 07:14:43 +00:00
select {
2021-07-12 22:01:51 +01:00
case sendQueue <- p :
2020-03-20 07:14:43 +00:00
return nil
default :
}
2020-02-05 22:16:58 +00:00
2020-03-20 07:14:43 +00:00
select {
2021-07-12 22:01:51 +01:00
case pkt := <- sendQueue :
2021-07-12 21:05:55 +01:00
s . recordDrop ( pkt . bs , c . key , dstKey , dropReasonQueueHead )
2021-06-09 23:06:15 +01:00
c . recordQueueTime ( pkt . enqueuedAt )
2020-03-20 07:14:43 +00:00
default :
2020-02-05 22:16:58 +00:00
}
2020-03-20 07:14:43 +00:00
}
// Failed to make room for packet. This can happen in a heavily
// contended queue with racing writers. Give up and tail-drop in
// this case to keep reader unblocked.
2021-07-12 21:05:55 +01:00
s . recordDrop ( p . bs , c . key , dstKey , dropReasonQueueTail )
2020-03-12 15:10:55 +00:00
return nil
2020-02-05 22:16:58 +00:00
}
2020-03-22 01:24:28 +00:00
// requestPeerGoneWrite asks for a "peer gone" frame to be written,
// announcing that the provided peer has disconnected. It blocks until
// the request has been handed off on c.peerGone or the client's done
// channel is closed, whichever happens first.
func (c *sclient) requestPeerGoneWrite(peer key.Public) {
	select {
	case <-c.done:
		// Client is gone; nothing left to notify.
	case c.peerGone <- peer:
	}
}
2020-06-01 23:19:41 +01:00
// requestMeshUpdate signals on c.meshUpdate that mesh state changes
// are pending. It blocks until the signal is received or the client's
// done channel is closed. It panics if c lacks mesh permission.
func (c *sclient) requestMeshUpdate() {
	if !c.canMesh {
		panic("unexpected requestMeshUpdate")
	}
	select {
	case <-c.done:
		// Client is gone; drop the request.
	case c.meshUpdate <- struct{}{}:
	}
}
2020-03-04 17:35:32 +00:00
func ( s * Server ) verifyClient ( clientKey key . Public , info * clientInfo ) error {
2021-06-24 21:31:05 +01:00
if ! s . verifyClients {
return nil
}
status , err := tailscale . Status ( context . TODO ( ) )
if err != nil {
return fmt . Errorf ( "failed to query local tailscaled status: %w" , err )
}
2021-07-13 16:25:43 +01:00
if clientKey == status . Self . PublicKey {
return nil
}
2021-06-24 21:31:05 +01:00
if _ , exists := status . Peer [ clientKey ] ; ! exists {
return fmt . Errorf ( "client %v not in set of peers" , clientKey )
}
// TODO(bradfitz): add policy for configurable bandwidth rate per client?
2020-02-05 22:16:58 +00:00
return nil
}
2021-08-02 17:17:08 +01:00
func ( s * Server ) sendServerKey ( lw * lazyBufioWriter ) error {
2020-02-20 20:27:12 +00:00
buf := make ( [ ] byte , 0 , len ( magic ) + len ( s . publicKey ) )
buf = append ( buf , magic ... )
buf = append ( buf , s . publicKey [ : ] ... )
2021-08-02 17:17:08 +01:00
err := writeFrame ( lw . bw ( ) , frameServerKey , buf )
lw . Flush ( ) // redundant (no-op) flush to release bufio.Writer
return err
2020-02-05 22:16:58 +00:00
}
2020-03-04 17:35:32 +00:00
// serverInfo is the message the server JSON-encodes and sends to a
// newly connected client inside the frameServerInfo frame.
type serverInfo struct {
	// Version is the protocol version spoken by the server. A zero
	// value is omitted from the JSON encoding.
	Version int `json:"version,omitempty"`
}
2021-08-02 17:17:08 +01:00
func ( s * Server ) sendServerInfo ( bw * lazyBufioWriter , clientKey key . Public ) error {
2020-02-05 22:16:58 +00:00
var nonce [ 24 ] byte
2020-02-17 21:52:11 +00:00
if _ , err := crand . Read ( nonce [ : ] ) ; err != nil {
2020-02-05 22:16:58 +00:00
return err
}
2020-08-18 23:32:32 +01:00
msg , err := json . Marshal ( serverInfo { Version : ProtocolVersion } )
2020-03-04 17:35:32 +00:00
if err != nil {
return err
}
2020-02-17 21:52:11 +00:00
msgbox := box . Seal ( nil , msg , & nonce , clientKey . B32 ( ) , s . privateKey . B32 ( ) )
2021-08-02 17:17:08 +01:00
if err := writeFrameHeader ( bw . bw ( ) , frameServerInfo , nonceLen + uint32 ( len ( msgbox ) ) ) ; err != nil {
2020-02-05 22:16:58 +00:00
return err
}
2020-02-20 16:50:25 +00:00
if _ , err := bw . Write ( nonce [ : ] ) ; err != nil {
2020-02-05 22:16:58 +00:00
return err
}
2020-02-20 16:50:25 +00:00
if _ , err := bw . Write ( msgbox ) ; err != nil {
2020-02-05 22:16:58 +00:00
return err
}
2020-02-20 16:50:25 +00:00
return bw . Flush ( )
2020-02-05 22:16:58 +00:00
}
2020-02-20 20:27:12 +00:00
// recvClientKey reads the frameClientInfo frame from the client (its
// proof of identity) upon its initial connection. It should be
// considered especially untrusted at this point.
2020-03-04 17:35:32 +00:00
func ( s * Server ) recvClientKey ( br * bufio . Reader ) ( clientKey key . Public , info * clientInfo , err error ) {
2020-02-20 20:27:12 +00:00
fl , err := readFrameTypeHeader ( br , frameClientInfo )
if err != nil {
2020-06-04 19:28:00 +01:00
return zpub , nil , err
2020-02-20 20:27:12 +00:00
}
const minLen = keyLen + nonceLen
if fl < minLen {
2020-06-04 19:28:00 +01:00
return zpub , nil , errors . New ( "short client info" )
2020-02-20 20:27:12 +00:00
}
// We don't trust the client at all yet, so limit its input size to limit
// things like JSON resource exhausting (http://github.com/golang/go/issues/31789).
if fl > 256 << 10 {
2020-06-04 19:28:00 +01:00
return zpub , nil , errors . New ( "long client info" )
2020-02-20 20:27:12 +00:00
}
2020-02-18 18:08:51 +00:00
if _ , err := io . ReadFull ( br , clientKey [ : ] ) ; err != nil {
2020-06-04 19:28:00 +01:00
return zpub , nil , err
2020-02-05 22:16:58 +00:00
}
var nonce [ 24 ] byte
2020-02-18 18:08:51 +00:00
if _ , err := io . ReadFull ( br , nonce [ : ] ) ; err != nil {
2020-06-04 19:28:00 +01:00
return zpub , nil , fmt . Errorf ( "nonce: %v" , err )
2020-02-05 22:16:58 +00:00
}
2020-02-20 20:27:12 +00:00
msgLen := int ( fl - minLen )
2020-02-05 22:16:58 +00:00
msgbox := make ( [ ] byte , msgLen )
2020-02-18 18:08:51 +00:00
if _ , err := io . ReadFull ( br , msgbox ) ; err != nil {
2020-06-04 19:28:00 +01:00
return zpub , nil , fmt . Errorf ( "msgbox: %v" , err )
2020-02-05 22:16:58 +00:00
}
2020-02-17 21:52:11 +00:00
msg , ok := box . Open ( nil , msgbox , & nonce , ( * [ 32 ] byte ) ( & clientKey ) , s . privateKey . B32 ( ) )
2020-02-05 22:16:58 +00:00
if ! ok {
2020-06-04 19:28:00 +01:00
return zpub , nil , fmt . Errorf ( "msgbox: cannot open len=%d with client key %x" , msgLen , clientKey [ : ] )
2020-02-05 22:16:58 +00:00
}
2020-03-04 17:35:32 +00:00
info = new ( clientInfo )
2020-02-05 22:16:58 +00:00
if err := json . Unmarshal ( msg , info ) ; err != nil {
2020-06-04 19:28:00 +01:00
return zpub , nil , fmt . Errorf ( "msg: %v" , err )
2020-02-05 22:16:58 +00:00
}
return clientKey , info , nil
}
2020-03-22 01:28:34 +00:00
func ( s * Server ) recvPacket ( br * bufio . Reader , frameLen uint32 ) ( dstKey key . Public , contents [ ] byte , err error ) {
2020-02-20 20:27:12 +00:00
if frameLen < keyLen {
2020-06-04 19:28:00 +01:00
return zpub , nil , errors . New ( "short send packet frame" )
2020-02-05 22:16:58 +00:00
}
2020-08-12 00:40:36 +01:00
if err := readPublicKey ( br , & dstKey ) ; err != nil {
2020-06-04 19:28:00 +01:00
return zpub , nil , err
2020-02-05 22:16:58 +00:00
}
2020-02-20 20:27:12 +00:00
packetLen := frameLen - keyLen
2020-02-21 03:10:54 +00:00
if packetLen > MaxPacketSize {
2020-06-04 19:28:00 +01:00
return zpub , nil , fmt . Errorf ( "data packet longer (%d) than max of %v" , packetLen , MaxPacketSize )
2020-02-20 23:14:24 +00:00
}
2020-02-05 22:16:58 +00:00
contents = make ( [ ] byte , packetLen )
2020-02-18 18:08:51 +00:00
if _ , err := io . ReadFull ( br , contents ) ; err != nil {
2020-06-04 19:28:00 +01:00
return zpub , nil , err
2020-02-05 22:16:58 +00:00
}
2020-03-03 19:33:22 +00:00
s . packetsRecv . Add ( 1 )
s . bytesRecv . Add ( int64 ( len ( contents ) ) )
2020-08-11 20:16:15 +01:00
if disco . LooksLikeDiscoWrapper ( contents ) {
s . packetsRecvDisco . Add ( 1 )
} else {
2020-08-11 20:30:15 +01:00
s . packetsRecvOther . Add ( 1 )
2020-08-11 20:16:15 +01:00
}
2020-02-05 22:16:58 +00:00
return dstKey , contents , nil
}
2020-06-04 19:28:00 +01:00
// zpub is the zero key.Public. The frame readers return it in place
// of a key whenever they return an error.
var zpub key.Public
2020-06-03 22:42:20 +01:00
// recvForwardPacket reads the body of a forwarded-packet frame of
// frameLen bytes from br: the source public key, the destination
// public key, and then the packet contents.
//
// Frames too short to hold both keys, and packets larger than
// MaxPacketSize, are rejected with an error. Unlike recvPacket it
// does not update the receive counters.
func (s *Server) recvForwardPacket(br *bufio.Reader, frameLen uint32) (srcKey, dstKey key.Public, contents []byte, err error) {
	const keysLen = keyLen * 2
	if frameLen < keysLen {
		return zpub, zpub, nil, errors.New("short send packet frame")
	}
	if _, err := io.ReadFull(br, srcKey[:]); err != nil {
		return zpub, zpub, nil, err
	}
	if _, err := io.ReadFull(br, dstKey[:]); err != nil {
		return zpub, zpub, nil, err
	}
	size := frameLen - keysLen
	if size > MaxPacketSize {
		return zpub, zpub, nil, fmt.Errorf("data packet longer (%d) than max of %v", size, MaxPacketSize)
	}
	contents = make([]byte, size)
	if _, err := io.ReadFull(br, contents); err != nil {
		return zpub, zpub, nil, err
	}
	// TODO: was s.packetsRecv.Add(1)
	// TODO: was s.bytesRecv.Add(int64(len(contents)))
	return srcKey, dstKey, contents, nil
}
2020-02-20 16:50:25 +00:00
// sclient is a client connection to the server.
//
// (The "s" prefix is to more explicitly distinguish it from Client in derp_client.go)
type sclient struct {
2020-03-20 07:14:43 +00:00
// Static after construction.
2021-07-12 22:01:51 +01:00
connNum int64 // process-wide unique counter, incremented each Accept
s * Server
nc Conn
key key . Public
info clientInfo
logf logger . Logf
done <- chan struct { } // closed when connection closes
remoteAddr string // usually ip:port from net.Conn.RemoteAddr().String()
remoteIPPort netaddr . IPPort // zero if remoteAddr is not ip:port.
sendQueue chan pkt // packets queued to this client; never closed
discoSendQueue chan pkt // important packets queued to this client; never closed
peerGone chan key . Public // write request that a previous sender has disconnected (not used by mesh peers)
meshUpdate chan struct { } // write request to write peerStateChange
canMesh bool // clientInfo had correct mesh token for inter-region routing
2020-03-20 07:14:43 +00:00
2021-07-20 16:54:48 +01:00
// replaceLimiter controls how quickly two connections with
// the same client key can kick each other off the server by
// taking over ownership of a key.
replaceLimiter * rate . Limiter
2020-03-20 07:14:43 +00:00
// Owned by run, not thread-safe.
br * bufio . Reader
2020-03-09 17:25:04 +00:00
connectedAt time . Time
2020-03-20 07:14:43 +00:00
preferred bool
2020-02-05 22:16:58 +00:00
2020-03-20 07:14:43 +00:00
// Owned by sender, not thread-safe.
2021-08-02 17:17:08 +01:00
bw * lazyBufioWriter
2020-04-06 08:18:37 +01:00
2020-06-01 23:19:41 +01:00
// Guarded by s.mu
//
// peerStateChange is used by mesh peers (a set of regional
// DERP servers) and contains records that need to be sent to
// the client for them to update their map of who's connected
// to this node.
peerStateChange [ ] peerConnState
}
// peerConnState represents whether a peer is connected to the server
// or not.
type peerConnState struct {
peer key . Public
present bool
2020-03-20 07:14:43 +00:00
}
2020-03-05 23:00:56 +00:00
2020-03-22 05:17:22 +00:00
// pkt is a request to write a data frame to an sclient.
type pkt struct {
2020-03-22 03:34:49 +00:00
// src is the who's the sender of the packet.
2020-03-20 07:14:43 +00:00
src key . Public
2020-03-22 03:34:49 +00:00
2021-06-09 23:06:15 +01:00
// enqueuedAt is when a packet was put onto a queue before it was sent,
// and is used for reporting metrics on the duration of packets in the queue.
enqueuedAt time . Time
2020-03-22 03:34:49 +00:00
// bs is the data packet bytes.
2020-03-22 05:17:22 +00:00
// The memory is owned by pkt.
2020-03-22 03:34:49 +00:00
bs [ ] byte
2020-02-05 22:16:58 +00:00
}
2020-03-05 23:00:56 +00:00
func ( c * sclient ) setPreferred ( v bool ) {
if c . preferred == v {
return
}
2020-03-06 03:02:54 +00:00
c . preferred = v
2020-03-09 17:25:04 +00:00
var homeMove * expvar . Int
2020-03-05 23:00:56 +00:00
if v {
c . s . curHomeClients . Add ( 1 )
2020-03-09 17:25:04 +00:00
homeMove = & c . s . homeMovesIn
2020-03-05 23:00:56 +00:00
} else {
c . s . curHomeClients . Add ( - 1 )
2020-03-09 17:25:04 +00:00
homeMove = & c . s . homeMovesOut
}
// Keep track of varz for home serve moves in/out. But ignore
// the initial packet set when a client connects, which we
// assume happens within 5 seconds. In any case, just for
// graphs, so not important to miss a move. But it shouldn't:
// the netcheck/re-STUNs in magicsock only happen about every
// 30 seconds.
if time . Since ( c . connectedAt ) > 5 * time . Second {
homeMove . Add ( 1 )
2020-03-05 23:00:56 +00:00
}
}
2021-06-09 23:06:15 +01:00
// expMovingAverage folds newValue into the exponential moving average
// prev and returns the updated average. alpha is the weight given to
// newValue; the remaining (1 - alpha) is given to prev, so alpha == 1
// yields newValue and alpha == 0 yields prev.
//
// See https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
func expMovingAverage(prev, newValue, alpha float64) float64 {
	return alpha*newValue + (1-alpha)*prev
}
// recordQueueTime folds the time elapsed since enqueuedAt (in whole
// milliseconds) into the server's average queue duration metric.
//
// The average is stored as float64 bits behind c.s.avgQueueDuration
// and updated with a compare-and-swap loop, retrying whenever another
// writer changed the value in between.
func (c *sclient) recordQueueTime(enqueuedAt time.Time) {
	sample := float64(time.Since(enqueuedAt).Milliseconds())
	for {
		cur := atomic.LoadUint64(c.s.avgQueueDuration)
		updated := expMovingAverage(math.Float64frombits(cur), sample, 0.1)
		if atomic.CompareAndSwapUint64(c.s.avgQueueDuration, cur, math.Float64bits(updated)) {
			return
		}
	}
}
2020-03-22 20:08:17 +00:00
func ( c * sclient ) sendLoop ( ctx context . Context ) error {
2020-03-20 07:14:43 +00:00
defer func ( ) {
2020-03-22 20:08:17 +00:00
// If the sender shuts down unilaterally due to an error, close so
// that the receive loop unblocks and cleans up the rest.
c . nc . Close ( )
2020-03-20 07:14:43 +00:00
// Drain the send queue to count dropped packets
for {
2020-03-22 03:43:50 +00:00
select {
2021-07-12 21:05:55 +01:00
case pkt := <- c . sendQueue :
c . s . recordDrop ( pkt . bs , pkt . src , c . key , dropReasonGone )
2021-07-12 22:01:51 +01:00
case pkt := <- c . discoSendQueue :
c . s . recordDrop ( pkt . bs , pkt . src , c . key , dropReasonGone )
2020-03-22 03:43:50 +00:00
default :
return
2020-03-20 07:14:43 +00:00
}
}
} ( )
2020-08-13 22:06:43 +01:00
jitter := time . Duration ( rand . Intn ( 5000 ) ) * time . Millisecond
2020-03-20 07:14:43 +00:00
keepAliveTick := time . NewTicker ( keepAlive + jitter )
defer keepAliveTick . Stop ( )
2020-02-05 22:16:58 +00:00
2020-03-22 01:24:28 +00:00
var werr error // last write error
2020-02-05 22:16:58 +00:00
for {
2020-03-22 01:24:28 +00:00
if werr != nil {
return werr
}
// First, a non-blocking select (with a default) that
// does as many non-flushing writes as possible.
2020-02-05 22:16:58 +00:00
select {
2020-03-22 20:08:17 +00:00
case <- ctx . Done ( ) :
2020-02-05 22:16:58 +00:00
return nil
2020-03-22 01:24:28 +00:00
case peer := <- c . peerGone :
werr = c . sendPeerGone ( peer )
continue
2020-06-01 23:19:41 +01:00
case <- c . meshUpdate :
werr = c . sendMeshUpdates ( )
continue
2020-03-22 01:24:28 +00:00
case msg := <- c . sendQueue :
werr = c . sendPacket ( msg . src , msg . bs )
2021-06-09 23:06:15 +01:00
c . recordQueueTime ( msg . enqueuedAt )
2020-03-22 01:24:28 +00:00
continue
2021-07-12 22:01:51 +01:00
case msg := <- c . discoSendQueue :
werr = c . sendPacket ( msg . src , msg . bs )
c . recordQueueTime ( msg . enqueuedAt )
continue
2020-03-22 01:24:28 +00:00
case <- keepAliveTick . C :
werr = c . sendKeepAlive ( )
continue
default :
// Flush any writes from the 3 sends above, or from
// the blocking loop below.
if werr = c . bw . Flush ( ) ; werr != nil {
return werr
2020-02-05 22:16:58 +00:00
}
2020-03-22 01:24:28 +00:00
}
2020-02-05 22:16:58 +00:00
2020-03-22 01:24:28 +00:00
// Then a blocking select with same:
select {
2020-03-22 20:08:17 +00:00
case <- ctx . Done ( ) :
2020-03-22 01:24:28 +00:00
return nil
2020-03-22 01:24:28 +00:00
case peer := <- c . peerGone :
werr = c . sendPeerGone ( peer )
2020-06-01 23:19:41 +01:00
case <- c . meshUpdate :
werr = c . sendMeshUpdates ( )
continue
2020-03-22 01:24:28 +00:00
case msg := <- c . sendQueue :
werr = c . sendPacket ( msg . src , msg . bs )
2021-06-09 23:06:15 +01:00
c . recordQueueTime ( msg . enqueuedAt )
2021-07-12 22:01:51 +01:00
case msg := <- c . discoSendQueue :
werr = c . sendPacket ( msg . src , msg . bs )
c . recordQueueTime ( msg . enqueuedAt )
2020-03-20 07:14:43 +00:00
case <- keepAliveTick . C :
2020-03-22 01:24:28 +00:00
werr = c . sendKeepAlive ( )
2020-02-05 22:16:58 +00:00
}
}
}
2020-03-22 01:24:28 +00:00
func ( c * sclient ) setWriteDeadline ( ) {
2020-03-20 07:38:52 +00:00
c . nc . SetWriteDeadline ( time . Now ( ) . Add ( writeTimeout ) )
2020-03-22 01:24:28 +00:00
}
// sendKeepAlive sends a keep-alive frame, without flushing.
func ( c * sclient ) sendKeepAlive ( ) error {
c . setWriteDeadline ( )
2021-08-02 17:17:08 +01:00
return writeFrameHeader ( c . bw . bw ( ) , frameKeepAlive , 0 )
2020-03-20 07:14:43 +00:00
}
2020-03-22 01:24:28 +00:00
// sendPeerGone sends a peerGone frame, without flushing.
func ( c * sclient ) sendPeerGone ( peer key . Public ) error {
c . s . peerGoneFrames . Add ( 1 )
c . setWriteDeadline ( )
2021-08-02 17:17:08 +01:00
if err := writeFrameHeader ( c . bw . bw ( ) , framePeerGone , keyLen ) ; err != nil {
2020-03-22 01:24:28 +00:00
return err
}
_ , err := c . bw . Write ( peer [ : ] )
return err
}
2020-06-01 23:19:41 +01:00
// sendPeerPresent sends a peerPresent frame, without flushing.
func ( c * sclient ) sendPeerPresent ( peer key . Public ) error {
c . setWriteDeadline ( )
2021-08-02 17:17:08 +01:00
if err := writeFrameHeader ( c . bw . bw ( ) , framePeerPresent , keyLen ) ; err != nil {
2020-06-01 23:19:41 +01:00
return err
}
_ , err := c . bw . Write ( peer [ : ] )
return err
}
// sendMeshUpdates writes as many queued peerStateChange records as fit
// into the write buffer, WITHOUT flushing or otherwise blocking (it
// holds c.s.mu for its whole duration). Records that were written are
// removed from the queue; if some remain, another run is scheduled via
// requestMeshUpdate.
func (c *sclient) sendMeshUpdates() error {
	c.s.mu.Lock()
	defer c.s.mu.Unlock()

	pending := c.peerStateChange
	written := 0
	// Stop as soon as the buffer can no longer hold one more
	// header+key frame without needing a flush.
	for written < len(pending) && c.bw.Available() > frameHeaderLen+keyLen {
		var err error
		if rec := pending[written]; rec.present {
			err = c.sendPeerPresent(rec.peer)
		} else {
			err = c.sendPeerGone(rec.peer)
		}
		if err != nil {
			// Shouldn't happen, though, as we're writing
			// into available buffer space, not the
			// network.
			return err
		}
		written++
	}

	// Slide the unwritten records to the front of the slice.
	remain := copy(c.peerStateChange, c.peerStateChange[written:])
	c.peerStateChange = c.peerStateChange[:remain]

	switch {
	case len(c.peerStateChange) > 0:
		// Didn't finish in the buffer space provided; schedule a future run.
		go c.requestMeshUpdate()
	case cap(c.peerStateChange) > 16:
		// Everything went out; don't hold on to a large backing array.
		c.peerStateChange = nil
	}
	return nil
}
2020-03-20 19:00:20 +00:00
// sendPacket writes contents to the client in a RecvPacket frame. If
// srcKey.IsZero, uses the old DERPv1 framing format, otherwise uses
// DERPv2. The bytes of contents are only valid until this function
// returns, do not retain slices.
2020-03-22 01:24:28 +00:00
// It does not flush its bufio.Writer.
2020-03-20 07:14:43 +00:00
func ( c * sclient ) sendPacket ( srcKey key . Public , contents [ ] byte ) ( err error ) {
defer func ( ) {
// Stats update.
if err != nil {
2021-07-12 21:05:55 +01:00
c . s . recordDrop ( contents , srcKey , c . key , dropReasonWriteError )
2020-03-20 07:14:43 +00:00
} else {
c . s . packetsSent . Add ( 1 )
c . s . bytesSent . Add ( int64 ( len ( contents ) ) )
}
} ( )
2020-03-22 01:24:28 +00:00
c . setWriteDeadline ( )
2020-03-20 07:14:43 +00:00
withKey := ! srcKey . IsZero ( )
pktLen := len ( contents )
if withKey {
pktLen += len ( srcKey )
}
2021-08-02 17:17:08 +01:00
if err = writeFrameHeader ( c . bw . bw ( ) , frameRecvPacket , uint32 ( pktLen ) ) ; err != nil {
2020-03-20 07:14:43 +00:00
return err
}
if withKey {
2021-08-02 17:17:08 +01:00
err := writePublicKey ( c . bw . bw ( ) , & srcKey )
2020-08-12 00:40:36 +01:00
if err != nil {
2020-03-20 07:14:43 +00:00
return err
}
}
2020-03-22 01:24:28 +00:00
_ , err = c . bw . Write ( contents )
return err
2020-03-20 07:14:43 +00:00
}
2020-06-03 22:42:20 +01:00
// AddPacketForwarder registers fwd as a packet forwarder for dst.
// fwd must be comparable.
func ( s * Server ) AddPacketForwarder ( dst key . Public , fwd PacketForwarder ) {
s . mu . Lock ( )
defer s . mu . Unlock ( )
if prev , ok := s . clientsMesh [ dst ] ; ok {
if prev == fwd {
// Duplicate registration of same forwarder. Ignore.
return
}
if m , ok := prev . ( multiForwarder ) ; ok {
2021-08-22 03:50:50 +01:00
if _ , ok := m [ fwd ] ; ok {
2020-06-03 22:42:20 +01:00
// Duplicate registration of same forwarder in set; ignore.
return
}
m [ fwd ] = m . maxVal ( ) + 1
return
}
2020-06-05 20:47:23 +01:00
if prev != nil {
// Otherwise, the existing value is not a set,
// not a dup, and not local-only (nil) so make
// it a set.
fwd = multiForwarder {
prev : 1 , // existed 1st, higher priority
fwd : 2 , // the passed in fwd is in 2nd place
}
s . multiForwarderCreated . Add ( 1 )
2020-06-03 22:42:20 +01:00
}
}
s . clientsMesh [ dst ] = fwd
}
// RemovePacketForwarder removes fwd as a packet forwarder for dst.
// fwd must be comparable.
func ( s * Server ) RemovePacketForwarder ( dst key . Public , fwd PacketForwarder ) {
s . mu . Lock ( )
defer s . mu . Unlock ( )
v , ok := s . clientsMesh [ dst ]
if ! ok {
return
}
if m , ok := v . ( multiForwarder ) ; ok {
if len ( m ) < 2 {
panic ( "unexpected" )
}
delete ( m , fwd )
// If fwd was in m and we no longer need to be a
// multiForwarder, replace the entry with the
// remaining PacketForwarder.
if len ( m ) == 1 {
var remain PacketForwarder
for k := range m {
remain = k
}
s . clientsMesh [ dst ] = remain
s . multiForwarderDeleted . Add ( 1 )
}
return
}
if v != fwd {
2020-06-23 21:59:48 +01:00
s . removePktForwardOther . Add ( 1 )
2020-06-03 22:42:20 +01:00
// Delete of an entry that wasn't in the
// map. Harmless, so ignore.
// (This might happen if a user is moving around
// between nodes and/or the server sent duplicate
// connection change broadcasts.)
return
}
if _ , isLocal := s . clients [ dst ] ; isLocal {
s . clientsMesh [ dst ] = nil
} else {
delete ( s . clientsMesh , dst )
2020-06-22 18:06:42 +01:00
s . notePeerGoneFromRegionLocked ( dst )
2020-06-03 22:42:20 +01:00
}
}
// multiForwarder is a PacketForwarder made of several candidate
// forwarders. It's used in the rare cases that a client is connected
// to multiple DERP nodes in a region. That shouldn't really happen
// except for perhaps during brief moments while the client is
// reconfiguring, in which case we don't want to forget where the
// client is.
//
// The map value is a per-set sequence number: the member with the
// lowest value has been seen the longest. It's used to make sure we
// forward packets consistently to the same node and don't pick
// randomly.
type multiForwarder map[PacketForwarder]uint8
// maxVal returns the largest sequence number stored in m, or zero if
// m is empty.
func (m multiForwarder) maxVal() (max uint8) {
	for _, n := range m {
		if n <= max {
			continue
		}
		max = n
	}
	return max
}
// ForwardPacket forwards the packet through the member of m that has
// the lowest sequence number, i.e. the one registered the longest ago,
// and returns that forwarder's result.
func (m multiForwarder) ForwardPacket(src, dst key.Public, payload []byte) error {
	var (
		best    PacketForwarder
		bestSeq uint8
	)
	for candidate, seq := range m {
		if best != nil && seq >= bestSeq {
			continue
		}
		best, bestSeq = candidate, seq
	}
	return best.ForwardPacket(src, dst, payload)
}
2020-03-03 19:33:22 +00:00
func ( s * Server ) expVarFunc ( f func ( ) interface { } ) expvar . Func {
return expvar . Func ( func ( ) interface { } {
s . mu . Lock ( )
defer s . mu . Unlock ( )
return f ( )
} )
2020-02-21 17:35:53 +00:00
}
// ExpVar returns an expvar variable suitable for registering with expvar.Publish.
func ( s * Server ) ExpVar ( ) expvar . Var {
2020-03-03 19:33:22 +00:00
m := new ( metrics . Set )
2020-03-20 22:22:02 +00:00
m . Set ( "gauge_memstats_sys0" , expvar . Func ( func ( ) interface { } { return int64 ( s . memSys0 ) } ) )
2020-06-01 23:19:41 +01:00
m . Set ( "gauge_watchers" , s . expVarFunc ( func ( ) interface { } { return len ( s . watchers ) } ) )
2020-06-25 16:43:28 +01:00
m . Set ( "gauge_current_connections" , & s . curClients )
m . Set ( "gauge_current_home_connections" , & s . curHomeClients )
2020-06-03 22:42:20 +01:00
m . Set ( "gauge_clients_total" , expvar . Func ( func ( ) interface { } { return len ( s . clientsMesh ) } ) )
2020-06-23 21:59:48 +01:00
m . Set ( "gauge_clients_local" , expvar . Func ( func ( ) interface { } { return len ( s . clients ) } ) )
2020-06-03 22:42:20 +01:00
m . Set ( "gauge_clients_remote" , expvar . Func ( func ( ) interface { } { return len ( s . clientsMesh ) - len ( s . clients ) } ) )
2020-03-03 19:33:22 +00:00
m . Set ( "accepts" , & s . accepts )
2020-03-11 18:53:13 +00:00
m . Set ( "clients_replaced" , & s . clientsReplaced )
2021-07-20 16:54:48 +01:00
m . Set ( "clients_replace_limited" , & s . clientsReplaceLimited )
m . Set ( "gauge_clients_replace_sleeping" , & s . clientsReplaceSleeping )
2020-03-03 19:33:22 +00:00
m . Set ( "bytes_received" , & s . bytesRecv )
m . Set ( "bytes_sent" , & s . bytesSent )
m . Set ( "packets_dropped" , & s . packetsDropped )
2020-03-20 22:52:50 +00:00
m . Set ( "counter_packets_dropped_reason" , & s . packetsDroppedReason )
2021-07-12 21:32:04 +01:00
m . Set ( "counter_packets_dropped_type" , & s . packetsDroppedType )
2020-08-11 20:59:08 +01:00
m . Set ( "counter_packets_received_kind" , & s . packetsRecvByKind )
2020-03-03 19:33:22 +00:00
m . Set ( "packets_sent" , & s . packetsSent )
m . Set ( "packets_received" , & s . packetsRecv )
2020-03-05 23:00:56 +00:00
m . Set ( "unknown_frames" , & s . unknownFrames )
2020-03-09 17:25:04 +00:00
m . Set ( "home_moves_in" , & s . homeMovesIn )
m . Set ( "home_moves_out" , & s . homeMovesOut )
2020-03-22 01:24:28 +00:00
m . Set ( "peer_gone_frames" , & s . peerGoneFrames )
2020-06-03 22:42:20 +01:00
m . Set ( "packets_forwarded_out" , & s . packetsForwardedOut )
m . Set ( "packets_forwarded_in" , & s . packetsForwardedIn )
m . Set ( "multiforwarder_created" , & s . multiForwarderCreated )
m . Set ( "multiforwarder_deleted" , & s . multiForwarderDeleted )
2020-06-23 21:59:48 +01:00
m . Set ( "packet_forwarder_delete_other_value" , & s . removePktForwardOther )
2021-06-09 23:06:15 +01:00
m . Set ( "average_queue_duration_ms" , expvar . Func ( func ( ) interface { } {
return math . Float64frombits ( atomic . LoadUint64 ( s . avgQueueDuration ) )
} ) )
2020-08-07 19:51:44 +01:00
var expvarVersion expvar . String
2020-10-27 04:23:58 +00:00
expvarVersion . Set ( version . Long )
2020-08-07 19:51:44 +01:00
m . Set ( "version" , & expvarVersion )
2020-03-03 19:33:22 +00:00
return m
2020-02-21 17:35:53 +00:00
}
2020-06-23 21:59:48 +01:00
// ConsistencyCheck verifies, under s.mu, that the server's client
// bookkeeping agrees with itself:
//
//   - every nil entry in s.clientsMesh has a matching key in s.clients,
//   - every key in s.clients is also present in s.clientsMesh,
//   - the current-connections expvar equals len(s.clients).
//
// It returns nil if all checks pass, otherwise a single error whose
// message lists every failed check separated by ", ".
func (s *Server) ConsistencyCheck() error {
	s.mu.Lock()
	defer s.mu.Unlock()

	var problems []string

	orphanedNil := 0
	for k, f := range s.clientsMesh {
		if f != nil {
			continue
		}
		if _, local := s.clients[k]; !local {
			orphanedNil++
		}
	}
	if orphanedNil != 0 {
		problems = append(problems, fmt.Sprintf("%d s.clientsMesh keys not in s.clients", orphanedNil))
	}

	missingFromMesh := 0
	for k := range s.clients {
		if _, inMesh := s.clientsMesh[k]; !inMesh {
			missingFromMesh++
		}
	}
	if missingFromMesh != 0 {
		problems = append(problems, fmt.Sprintf("%d s.clients keys not in s.clientsMesh", missingFromMesh))
	}

	if s.curClients.Value() != int64(len(s.clients)) {
		problems = append(problems, fmt.Sprintf("expvar connections = %d != clients map says of %d",
			s.curClients.Value(),
			len(s.clients)))
	}

	if len(problems) > 0 {
		return errors.New(strings.Join(problems, ", "))
	}
	return nil
}
2020-08-12 00:40:36 +01:00
// readPublicKey fills key with bytes read from br, returning the
// first read error encountered.
//
// It is ~4x slower than io.ReadFull(br, key),
// but it prevents key from escaping and thus being allocated.
// If io.ReadFull(br, key) does not cause key to escape, use that instead.
func readPublicKey(br *bufio.Reader, key *key.Public) error {
	// Do io.ReadFull(br, key), but one byte at a time, to avoid allocation.
	for i := 0; i < len(key); i++ {
		b, err := br.ReadByte()
		if err != nil {
			return err
		}
		key[i] = b
	}
	return nil
}
// writePublicKey writes every byte of key to bw, returning the first
// write error encountered.
//
// It is ~3x slower than bw.Write(key[:]),
// but it prevents key from escaping and thus being allocated.
// If bw.Write(key[:]) does not cause key to escape, use that instead.
func writePublicKey(bw *bufio.Writer, key *key.Public) error {
	// Do bw.Write(key[:]), but one byte at a time to avoid allocation.
	for i := range key {
		if err := bw.WriteByte(key[i]); err != nil {
			return err
		}
	}
	return nil
}
2021-06-18 05:34:01 +01:00
// minTimeBetweenLogs is how long ServeDebugTraffic sleeps between two
// successive polls of per-connection traffic statistics.
const minTimeBetweenLogs = 2 * time.Second
// BytesSentRecv holds the byte counters reported for one connection
// at a traffic check, together with the public key of the client that
// owns the connection.
type BytesSentRecv struct {
	// Sent is the connection's bytes_sent counter.
	Sent uint64
	// Recv is the connection's bytes_received counter.
	Recv uint64
	// Key is the public key of the client which sent/received these bytes.
	Key key.Public
}
// parseSSOutput parses the output from the specific call to ss in ServeDebugTraffic.
// Separated out for ease of testing.
func parseSSOutput ( raw string ) map [ netaddr . IPPort ] BytesSentRecv {
newState := map [ netaddr . IPPort ] BytesSentRecv { }
// parse every 2 lines and get src and dst ips, and kv pairs
lines := strings . Split ( raw , "\n" )
for i := 0 ; i < len ( lines ) ; i += 2 {
ipInfo := strings . Fields ( strings . TrimSpace ( lines [ i ] ) )
if len ( ipInfo ) < 5 {
continue
}
2021-06-19 00:02:41 +01:00
src , err := netaddr . ParseIPPort ( ipInfo [ 4 ] )
2021-06-18 05:34:01 +01:00
if err != nil {
continue
}
stats := strings . Fields ( strings . TrimSpace ( lines [ i + 1 ] ) )
stat := BytesSentRecv { }
for _ , s := range stats {
if strings . Contains ( s , "bytes_sent" ) {
sent , err := strconv . Atoi ( s [ strings . Index ( s , ":" ) + 1 : ] )
if err == nil {
stat . Sent = uint64 ( sent )
}
} else if strings . Contains ( s , "bytes_received" ) {
recv , err := strconv . Atoi ( s [ strings . Index ( s , ":" ) + 1 : ] )
if err == nil {
stat . Recv = uint64 ( recv )
}
}
}
newState [ src ] = stat
}
return newState
}
// ServeDebugTraffic is an HTTP handler that streams per-client traffic
// deltas until the request's context is done.
//
// Every minTimeBetweenLogs it runs "ss -i -H -t", parses the output
// with parseSSOutput, and for every connection whose sent or received
// byte counter grew since the previous poll, and whose address maps to
// a known client key, writes one JSON-encoded BytesSentRecv to w. Each
// poll is terminated by an empty line and a flush (when w supports
// it). The handler returns if ss fails or a write to w fails.
func (s *Server) ServeDebugTraffic(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()
	prevState := map[netaddr.IPPort]BytesSentRecv{}
	enc := json.NewEncoder(w)
	for ctx.Err() == nil {
		// Tie ss to the request so it doesn't outlive a cancelled client.
		output, err := exec.CommandContext(ctx, "ss", "-i", "-H", "-t").Output()
		if err != nil {
			fmt.Fprintf(w, "ss failed: %v", err)
			return
		}
		newState := parseSSOutput(string(output))

		// Collect what needs reporting while holding s.mu, but do the
		// network writes only after releasing it, so a slow HTTP
		// client cannot stall the rest of the server.
		var changed []BytesSentRecv
		s.mu.Lock()
		for k, next := range newState {
			prev := prevState[k]
			if prev.Sent >= next.Sent && prev.Recv >= next.Recv {
				continue
			}
			if pkey, ok := s.keyOfAddr[k]; ok {
				next.Key = pkey
				changed = append(changed, next)
			}
		}
		s.mu.Unlock()

		for _, rec := range changed {
			if err := enc.Encode(rec); err != nil {
				return
			}
		}
		prevState = newState
		if _, err := fmt.Fprintln(w); err != nil {
			return
		}
		if f, ok := w.(http.Flusher); ok {
			f.Flush()
		}
		time.Sleep(minTimeBetweenLogs)
	}
}
2021-08-02 17:17:08 +01:00
// bufioWriterPool holds idle bufio.Writers with a 2 KiB buffer. While
// pooled, a writer points at ioutil.Discard rather than at any real
// destination.
var bufioWriterPool = &sync.Pool{
	New: func() interface{} {
		return bufio.NewWriterSize(ioutil.Discard, 2<<10)
	},
}

// lazyBufioWriter is a bufio.Writer-like wrapping writer that lazily
// allocates its actual bufio.Writer from a sync.Pool, releasing it to
// the pool upon flush.
//
// We do this to reduce memory overhead; most DERP connections are
// idle and the idle bufio.Writers were 30% of overall memory usage.
type lazyBufioWriter struct {
	w   io.Writer     // underlying
	lbw *bufio.Writer // lazy; nil means it needs an associated buffer
}

// bw returns the bufio.Writer currently attached to w, first taking
// one from bufioWriterPool and pointing it at the underlying writer
// if none is attached yet.
func (w *lazyBufioWriter) bw() *bufio.Writer {
	if w.lbw != nil {
		return w.lbw
	}
	buf := bufioWriterPool.Get().(*bufio.Writer)
	buf.Reset(w.w)
	w.lbw = buf
	return buf
}

// Available reports how many bytes are unused in the buffer. It
// attaches a buffer if none is attached.
func (w *lazyBufioWriter) Available() int {
	return w.bw().Available()
}

// Write writes p into the buffer, attaching one if none is attached.
func (w *lazyBufioWriter) Write(p []byte) (int, error) {
	return w.bw().Write(p)
}

// Flush writes any buffered data to the underlying writer and returns
// the buffer to bufioWriterPool. It is a no-op returning nil when no
// buffer is attached. The buffer is released even if flushing fails;
// the flush error is returned.
func (w *lazyBufioWriter) Flush() error {
	buf := w.lbw
	if buf == nil {
		return nil
	}
	err := buf.Flush()
	// Detach from the real destination before pooling.
	buf.Reset(ioutil.Discard)
	bufioWriterPool.Put(buf)
	w.lbw = nil
	return err
}