diff --git a/wgengine/magicsock/magicsock.go b/wgengine/magicsock/magicsock.go index d1b65a35e..d83dd0f18 100644 --- a/wgengine/magicsock/magicsock.go +++ b/wgengine/magicsock/magicsock.go @@ -771,7 +771,8 @@ func (c *Conn) Send(b []byte, ep conn.Endpoint) error { c.logf("magicsock: [unexpected] DERP BUG: attempting to send packet to DERP address %v", addr) return nil } - return c.sendUDPStd(addr, b) + _, err := c.sendUDPStd(addr, b) + return err case *AddrSet: as = v } @@ -786,8 +787,8 @@ func (c *Conn) Send(b []byte, ep conn.Endpoint) error { var success bool var ret error for _, addr := range dsts { - err := c.sendAddr(addr, as.publicKey, b) - if err == nil { + sent, err := c.sendAddr(addr, as.publicKey, b) + if sent { success = true } else if ret == nil { ret = err @@ -809,45 +810,55 @@ var errConnClosed = errors.New("Conn closed") var errDropDerpPacket = errors.New("too many DERP packets queued; dropping") // sendUDP sends UDP packet b to ipp. -func (c *Conn) sendUDP(ipp netaddr.IPPort, b []byte) error { +// See sendAddr's docs on the return value meanings. +func (c *Conn) sendUDP(ipp netaddr.IPPort, b []byte) (sent bool, err error) { ua := ipp.UDPAddr() defer netaddr.PutUDPAddr(ua) return c.sendUDPStd(ua, b) } -func (c *Conn) sendUDPStd(addr *net.UDPAddr, b []byte) (err error) { +// sendUDPStd sends UDP packet b to addr. +// See sendAddr's docs on the return value meanings. +func (c *Conn) sendUDPStd(addr *net.UDPAddr, b []byte) (sent bool, err error) { switch { case addr.IP.To4() != nil: _, err = c.pconn4.WriteTo(b, addr) if err != nil && c.noV4.Get() { - return nil + return false, nil } case len(addr.IP) == net.IPv6len: if c.pconn6 == nil { // ignore IPv6 dest if we don't have an IPv6 address. 
- return nil + return false, nil } _, err = c.pconn6.WriteTo(b, addr) if err != nil && c.noV6.Get() { - return nil + return false, nil } default: - return errors.New("bogus sendUDPStd addr type") + panic("bogus sendUDPStd addr type") } - return err + return err == nil, err } // sendAddr sends packet b to addr, which is either a real UDP address // or a fake UDP address representing a DERP server (see derpmap.go). // The provided public key identifies the recipient. -func (c *Conn) sendAddr(addr netaddr.IPPort, pubKey key.Public, b []byte) error { +// +// The returned err is whether there was an error writing when it +// should've worked. +// The returned sent is whether a packet went out at all. +// An example of when they might be different: sending to an +// IPv6 address when the local machine doesn't have IPv6 support +// returns (false, nil); it's not an error, but nothing was sent. +func (c *Conn) sendAddr(addr netaddr.IPPort, pubKey key.Public, b []byte) (sent bool, err error) { if addr.IP != derpMagicIPAddr { return c.sendUDP(addr, b) } ch := c.derpWriteChanOfAddr(addr, pubKey) if ch == nil { - return nil + return false, nil } // TODO(bradfitz): this makes garbage for now; we could use a @@ -860,12 +871,12 @@ func (c *Conn) sendAddr(addr netaddr.IPPort, pubKey key.Public, b []byte) error select { case <-c.donec(): - return errConnClosed + return false, errConnClosed case ch <- derpWriteRequest{addr, pubKey, pkt}: - return nil + return true, nil default: // Too many writes queued. Drop packet. 
- return errDropDerpPacket + return false, errDropDerpPacket } } @@ -1372,7 +1383,7 @@ func (c *Conn) ReceiveIPv6(b []byte) (int, conn.Endpoint, *net.UDPAddr, error) { } } -func (c *Conn) sendDiscoMessage(dst netaddr.IPPort, dstKey key.Public, dstDisco tailcfg.DiscoKey, m disco.Message) error { +func (c *Conn) sendDiscoMessage(dst netaddr.IPPort, dstKey key.Public, dstDisco tailcfg.DiscoKey, m disco.Message) (sent bool, err error) { c.mu.Lock() var nonce [disco.NonceLen]byte if _, err := crand.Read(nonce[:]); err != nil { @@ -1386,9 +1397,15 @@ func (c *Conn) sendDiscoMessage(dst netaddr.IPPort, dstKey key.Public, dstDisco c.mu.Unlock() pkt = box.SealAfterPrecomputation(pkt, m.AppendMarshal(nil), &nonce, sharedKey) - err := c.sendAddr(dst, dstKey, pkt) - c.logf("magicsock: disco: sent %T to %v; err=%v", m, dst, err) - return err + sent, err = c.sendAddr(dst, dstKey, pkt) + if sent { + c.logf("magicsock: disco: sent %T to %v", m, dst) + } else if err == nil { + c.logf("magicsock: disco: can't send %T to %v", m, dst) + } else { + c.logf("magicsock: disco: failed to send %T to %v: %v", m, dst, err) + } + return sent, err } // handleDiscoMessage reports whether msg was a Tailscale inter-node discovery message @@ -2528,7 +2545,7 @@ type discoEndpoint struct { // mu protects all following fields. mu sync.Mutex // Lock ordering: Conn.mu, then discoEndpoint.mu - lastSend time.Time + lastSend time.Time // last time there were outgoing packets sent to this peer (from wireguard-go) derpAddr netaddr.IPPort // fallback/bootstrap path, if non-zero (non-zero for well-behaved clients) bestAddr netaddr.IPPort // best non-DERP path; zero if none @@ -2537,6 +2554,9 @@ type discoEndpoint struct { sentPing map[stun.TxID]sentPing endpointState map[netaddr.IPPort]*endpointState + // timers are all outstanding timers. They're all stopped on + // cleanup if the discovery endpoint is removed from the + // network map. 
timers map[*time.Timer]bool } @@ -2547,8 +2567,9 @@ type endpointState struct { } type sentPing struct { - to netaddr.IPPort - at time.Time + to netaddr.IPPort + at time.Time + timer *time.Timer // timeout timer } // initFakeUDPAddr populates fakeWGAddr with a globally unique fake UDPAddr. @@ -2611,24 +2632,72 @@ func (de *discoEndpoint) send(b []byte) error { return errors.New("no DERP addr") } } - return de.c.sendAddr(bestAddr, de.publicKey, b) + _, err := de.c.sendAddr(bestAddr, de.publicKey, b) + return err +} + +// newTimerLocked creates a new AfterFunc(d, f) and returns it after +// registering it with de.timers. +// +// The timer will unregister itself from de.timers after it fires and after f runs. +func (de *discoEndpoint) newTimerLocked(d time.Duration, f func()) *time.Timer { + var t *time.Timer + t = time.AfterFunc(d, func() { + f() + + de.mu.Lock() + delete(de.timers, t) + de.mu.Unlock() + }) + de.timers[t] = true + return t +} + +// forgetPing is called by a timer when a ping either fails to send or +// has taken too long to get a pong reply. +func (de *discoEndpoint) forgetPing(txid stun.TxID) { + de.mu.Lock() + defer de.mu.Unlock() + if sp, ok := de.sentPing[txid]; ok { + // Stop the timer for the case where sendPing failed to write to UDP. + // In the case of a timer already having fired, this is a no-op: + sp.timer.Stop() + delete(de.sentPing, txid) + delete(de.timers, sp.timer) + } +} + +// sendPing sends a ping with the provided txid to ep. +// The caller should've already recorded the ping in sentPing +// and set up the timer. 
+func (de *discoEndpoint) sendPing(ep netaddr.IPPort, txid stun.TxID) { + sent, _ := de.sendDiscoMessage(ep, &disco.Ping{TxID: [12]byte(txid)}) + if !sent { + de.forgetPing(txid) + } } func (de *discoEndpoint) sendPingsLocked(now time.Time) { sent := false for ep, st := range de.endpointState { + ep := ep if !st.lastPing.IsZero() && now.Sub(st.lastPing) < 5*time.Second { continue } st.lastPing = now txid := stun.NewTxID() + t := de.newTimerLocked(5*time.Second, func() { + de.c.logf("magicsock: disco: timeout waiting for ping %x from %v", txid, ep) + de.forgetPing(txid) + }) de.sentPing[txid] = sentPing{ - to: ep, - at: now, + to: ep, + at: now, + timer: t, } sent = true - go de.sendDiscoMessage(ep, &disco.Ping{TxID: [12]byte(txid)}) + go de.sendPing(ep, txid) } derpAddr := de.derpAddr if sent && derpAddr.Port != 0 { @@ -2642,7 +2711,7 @@ func (de *discoEndpoint) sendPingsLocked(now time.Time) { } } -func (de *discoEndpoint) sendDiscoMessage(dst netaddr.IPPort, dm disco.Message) error { +func (de *discoEndpoint) sendDiscoMessage(dst netaddr.IPPort, dm disco.Message) (sent bool, err error) { return de.c.sendDiscoMessage(dst, de.publicKey, de.discoKey, dm) }