diff --git a/derp/derp_server.go b/derp/derp_server.go index 327f104a1..5f3a226f3 100644 --- a/derp/derp_server.go +++ b/derp/derp_server.go @@ -279,6 +279,7 @@ func (s *dupClientSet) removeClient(c *sclient) bool { // public key gets more than one PacketForwarder registered for it. type PacketForwarder interface { ForwardPacket(src, dst key.NodePublic, payload []byte) error + String() string } // Conn is the subset of the underlying net.Conn the DERP Server needs. @@ -495,6 +496,7 @@ func (s *Server) registerClient(c *sclient) { switch set := set.(type) { case nil: s.clients[c.key] = singleClient{c} + c.debug("register single client") case singleClient: s.dupClientKeys.Add(1) s.dupClientConns.Add(2) // both old and new count @@ -510,6 +512,7 @@ func (s *Server) registerClient(c *sclient) { }, sendHistory: []*sclient{old}, } + c.debug("register duplicate client") case *dupClientSet: s.dupClientConns.Add(1) // the gauge s.dupClientConnTotal.Add(1) // the counter @@ -517,6 +520,7 @@ func (s *Server) registerClient(c *sclient) { set.set[c] = true set.last = c set.sendHistory = append(set.sendHistory, c) + c.debug("register another duplicate client") } if _, ok := s.clientsMesh[c.key]; !ok { @@ -549,7 +553,7 @@ func (s *Server) unregisterClient(c *sclient) { case nil: c.logf("[unexpected]; clients map is empty") case singleClient: - c.logf("removing connection") + c.logf("removed connection") delete(s.clients, c.key) if v, ok := s.clientsMesh[c.key]; ok && v == nil { delete(s.clientsMesh, c.key) @@ -557,6 +561,7 @@ func (s *Server) unregisterClient(c *sclient) { } s.broadcastPeerStateChangeLocked(c.key, false) case *dupClientSet: + c.debug("removed duplicate client") if set.removeClient(c) { s.dupClientConns.Add(-1) } else { @@ -673,7 +678,7 @@ func (s *Server) accept(ctx context.Context, nc Conn, brw *bufio.ReadWriter, rem nc: nc, br: br, bw: bw, - logf: logger.WithPrefix(s.logf, fmt.Sprintf("derp client %v/%x: ", remoteAddr, clientKey)), + logf: logger.WithPrefix(s.logf, fmt.Sprintf("derp client %v%s: ", remoteAddr, clientKey.ShortString())), done: ctx.Done(), remoteAddr: remoteAddr, remoteIPPort: remoteIPPort, @@ -690,6 +695,9 @@ func (s *Server) accept(ctx context.Context, nc Conn, brw *bufio.ReadWriter, rem } if clientInfo != nil { c.info = *clientInfo + if envknob.Bool("DERP_PROBER_DEBUG_LOGS") && clientInfo.IsProber { + c.debugLogging = true + } } s.registerClient(c) @@ -726,6 +734,7 @@ func (c *sclient) run(ctx context.Context) error { for { ft, fl, err := readFrameHeader(c.br) + c.debug("read frame type %d len %d err %v", ft, fl, err) if err != nil { if errors.Is(err, io.EOF) { c.logf("read EOF") @@ -735,7 +744,7 @@ func (c *sclient) run(ctx context.Context) error { c.logf("closing; server closed") return nil } - return fmt.Errorf("client %x: readFrameHeader: %w", c.key, err) + return fmt.Errorf("client %s: readFrameHeader: %w", c.key.ShortString(), err) } c.s.noteClientActivity(c) switch ft { @@ -883,6 +892,8 @@ func (c *sclient) handleFrameForwardPacket(ft frameType, fl uint32) error { return nil } + dst.debug("received forwarded packet from %s via %s", srcKey.ShortString(), c.key.ShortString()) + return c.sendPkt(dst, pkt{ bs: contents, enqueuedAt: time.Now(), @@ -930,7 +941,9 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error { if dst == nil { if fwd != nil { s.packetsForwardedOut.Add(1) - if err := fwd.ForwardPacket(c.key, dstKey, contents); err != nil { + err := fwd.ForwardPacket(c.key, dstKey, contents) + c.debug("SendPacket for %s, forwarding via %s: %v", dstKey.ShortString(), fwd, err) + if err != nil { // TODO: return nil } @@ -941,8 +954,10 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error { reason = dropReasonDupClient } s.recordDrop(contents, c.key, dstKey, reason) + c.debug("SendPacket for %s, dropping with reason=%s", dstKey.ShortString(), reason) return nil } + c.debug("SendPacket for %s, sending directly", dstKey.ShortString()) p := pkt{ bs: contents, @@ -952,6 +967,12 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error { return c.sendPkt(dst, p) } +func (c *sclient) debug(format string, v ...any) { + if c.debugLogging { + c.logf(format, v...) + } +} + // dropReason is why we dropped a DERP frame. type dropReason int @@ -1003,11 +1024,13 @@ func (c *sclient) sendPkt(dst *sclient, p pkt) error { select { case <-dst.done: s.recordDrop(p.bs, c.key, dstKey, dropReasonGone) + dst.debug("sendPkt attempt %d dropped, dst gone", attempt) return nil default: } select { case sendQueue <- p: + dst.debug("sendPkt attempt %d enqueued", attempt) return nil default: } @@ -1023,6 +1046,7 @@ func (c *sclient) sendPkt(dst *sclient, p pkt) error { // contended queue with racing writers. Give up and tail-drop in // this case to keep reader unblocked. s.recordDrop(p.bs, c.key, dstKey, dropReasonQueueTail) + dst.debug("sendPkt attempt %d dropped, queue full") return nil } @@ -1258,6 +1282,8 @@ type sclient struct { isDup atomic.Bool // whether more than 1 sclient for key is connected isDisabled atomic.Bool // whether sends to this peer are disabled due to active/active dups + debugLogging bool + // replaceLimiter controls how quickly two connections with // the same client key can kick each other off the server by // taking over ownership of a key. @@ -1529,6 +1555,7 @@ func (c *sclient) sendPacket(srcKey key.NodePublic, contents []byte) (err error) c.s.packetsSent.Add(1) c.s.bytesSent.Add(int64(len(contents))) } + c.debug("sendPacket from %s: %v", srcKey.ShortString(), err) }() c.setWriteDeadline() @@ -1689,6 +1716,10 @@ func (f *multiForwarder) ForwardPacket(src, dst key.NodePublic, payload []byte) return f.fwd.Load().ForwardPacket(src, dst, payload) } +func (f *multiForwarder) String() string { + return fmt.Sprintf("", f.fwd.Load(), len(f.all)) +} + func (s *Server) expVarFunc(f func() any) expvar.Func { return expvar.Func(func() any { s.mu.Lock() diff --git a/derp/derp_test.go b/derp/derp_test.go index 20a6432b8..ba4568bd5 100644 --- a/derp/derp_test.go +++ b/derp/derp_test.go @@ -660,6 +660,9 @@ type testFwd int func (testFwd) ForwardPacket(key.NodePublic, key.NodePublic, []byte) error { panic("not called in tests") } +func (testFwd) String() string { + panic("not called in tests") +} func pubAll(b byte) (ret key.NodePublic) { var bs [32]byte @@ -787,6 +790,7 @@ type channelFwd struct { c chan []byte } +func (f channelFwd) String() string { return "" } func (f channelFwd) ForwardPacket(_ key.NodePublic, _ key.NodePublic, packet []byte) error { f.c <- packet return nil diff --git a/derp/derphttp/derphttp_client.go b/derp/derphttp/derphttp_client.go index 1a870b0fb..1e7cbf551 100644 --- a/derp/derphttp/derphttp_client.go +++ b/derp/derphttp/derphttp_client.go @@ -82,6 +82,10 @@ type Client struct { pingOut map[derp.PingMessage]chan<- bool // chan to send to on pong } +func (c *Client) String() string { + return fmt.Sprintf("", c.serverPubKey.ShortString(), c.url) +} + // NewRegionClient returns a new DERP-over-HTTP client. It connects lazily. // To trigger a connection, use Connect. func NewRegionClient(privateKey key.NodePrivate, logf logger.Logf, getRegion func() *tailcfg.DERPRegion) *Client {