diff --git a/wgengine/magicsock/magicsock.go b/wgengine/magicsock/magicsock.go index b4a1817b8..f7670e329 100644 --- a/wgengine/magicsock/magicsock.go +++ b/wgengine/magicsock/magicsock.go @@ -1443,28 +1443,40 @@ func (c *Conn) runDerpReader(ctx context.Context, derpFakeAddr netaddr.IPPort, d continue } - // Before we wake up ReceiveIPv4 with SetReadDeadline, - // note that a DERP packet has arrived. ReceiveIPv4 - // will read this field to note that its UDP read - // error is due to us. - atomic.AddInt64(&c.derpRecvCountAtomic, 1) - // Cancel the pconn read goroutine. - c.pconn4.SetReadDeadline(aLongTimeAgo) + if !c.sendDerpReadResult(ctx, res) { + return + } select { case <-ctx.Done(): return - case c.derpRecvCh <- res: - select { - case <-ctx.Done(): - return - case <-didCopy: - continue - } + case <-didCopy: + continue } } } +// sendDerpReadResult sends res to c.derpRecvCh and reports whether it +// was sent. (It reports false if ctx was done first.) +// +// This includes doing the whole wake-up dance to interrupt +// ReceiveIPv4's blocking UDP read. +func (c *Conn) sendDerpReadResult(ctx context.Context, res derpReadResult) (sent bool) { + // Before we wake up ReceiveIPv4 with SetReadDeadline, + // note that a DERP packet has arrived. ReceiveIPv4 + // will read this field to note that its UDP read + // error is due to us. + atomic.AddInt64(&c.derpRecvCountAtomic, 1) + // Cancel the pconn read goroutine. + c.pconn4.SetReadDeadline(aLongTimeAgo) + select { + case <-ctx.Done(): + return false + case c.derpRecvCh <- res: + return true + } +} + type derpWriteRequest struct { addr netaddr.IPPort pubKey key.Public diff --git a/wgengine/magicsock/magicsock_test.go b/wgengine/magicsock/magicsock_test.go index 620394b12..ed056ffd0 100644 --- a/wgengine/magicsock/magicsock_test.go +++ b/wgengine/magicsock/magicsock_test.go @@ -11,12 +11,14 @@ import ( "crypto/tls" "encoding/binary" "encoding/json" + "flag" "fmt" "io/ioutil" "net" "net/http" "net/http/httptest" "os" + "runtime" "strconv" "strings" "sync" @@ -1410,19 +1412,112 @@ func Test32bitAlignment(t *testing.T) { atomic.AddInt64(&c.derpRecvCountAtomic, 1) } -func BenchmarkReceiveFrom(b *testing.B) { - port := pickPort(b) +// newNonLegacyTestConn returns a new Conn with DisableLegacyNetworking set true. +func newNonLegacyTestConn(t testing.TB) *Conn { + t.Helper() + port := pickPort(t) conn, err := NewConn(Options{ - Logf: b.Logf, + Logf: t.Logf, Port: port, EndpointsFunc: func(eps []string) { - b.Logf("endpoints: %q", eps) + t.Logf("endpoints: %q", eps) }, DisableLegacyNetworking: true, }) if err != nil { - b.Fatal(err) + t.Fatal(err) } + return conn +} + +var testIssue1282 = flag.Bool("test-issue-1282", false, "run test for https://github.com/tailscale/tailscale/issues/1282 on all CPUs") + +// Tests concurrent DERP readers pushing DERP data into ReceiveIPv4 +// (which should blend all DERP reads into UDP reads). +func TestDerpReceiveFromIPv4(t *testing.T) { + conn := newNonLegacyTestConn(t) + defer conn.Close() + + sendConn, err := net.ListenPacket("udp4", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + defer sendConn.Close() + nodeKey, _ := addTestEndpoint(conn, sendConn) + + var sends int = 500 + senders := runtime.NumCPU() + if !*testIssue1282 { + t.Logf("--test-issue-1282 was not specified; so doing single-threaded test (instead of NumCPU=%d) to work around https://github.com/tailscale/tailscale/issues/1282", senders) + senders = 1 + } + sends -= (sends % senders) + var wg sync.WaitGroup + defer wg.Wait() + t.Logf("doing %v sends over %d senders", sends, senders) + ctx := context.Background() + + for i := 0; i < senders; i++ { + wg.Add(1) + regionID := i + 1 + go func() { + defer wg.Done() + ch := make(chan bool, 1) + for i := 0; i < sends/senders; i++ { + if !conn.sendDerpReadResult(ctx, derpReadResult{ + regionID: regionID, + n: 123, + src: key.Public(nodeKey), + copyBuf: func(dst []byte) int { + ch <- true + return 123 + }, + }) { + t.Error("unexpected false") + return + } + <-ch + } + }() + } + + buf := make([]byte, 1500) + for i := 0; i < sends; i++ { + n, ep, err := conn.ReceiveIPv4(buf) + if err != nil { + t.Fatal(err) + } + _ = n + _ = ep + } +} + +// addTestEndpoint sets conn's network map to a single peer expected +// to receive packets from sendConn (or DERP), and returns that peer's +// nodekey and discokey. +func addTestEndpoint(conn *Conn, sendConn net.PacketConn) (tailcfg.NodeKey, tailcfg.DiscoKey) { + // Give conn just enough state that it'll recognize sendConn as a + // valid peer and not fall through to the legacy magicsock + // codepath. + discoKey := tailcfg.DiscoKey{31: 1} + nodeKey := tailcfg.NodeKey{0: 'N', 1: 'K'} + conn.SetNetworkMap(&controlclient.NetworkMap{ + Peers: []*tailcfg.Node{ + { + Key: nodeKey, + DiscoKey: discoKey, + Endpoints: []string{sendConn.LocalAddr().String()}, + }, + }, + }) + conn.SetPrivateKey(wgkey.Private{0: 1}) + conn.CreateEndpoint([32]byte(nodeKey), "0000000000000000000000000000000000000000000000000000000000000001.disco.tailscale:12345") + conn.addValidDiscoPathForTest(discoKey, netaddr.MustParseIPPort(sendConn.LocalAddr().String())) + return nodeKey, discoKey +} + +func BenchmarkReceiveFrom(b *testing.B) { + conn := newNonLegacyTestConn(b) defer conn.Close() sendConn, err := net.ListenPacket("udp4", "127.0.0.1:0") @@ -1431,20 +1526,7 @@ func BenchmarkReceiveFrom(b *testing.B) { } defer sendConn.Close() - // Give conn just enough state that it'll recognize sendConn as a - // valid peer and not fall through to the legacy magicsock - // codepath. - discoKey := tailcfg.DiscoKey{31: 1} - conn.SetNetworkMap(&controlclient.NetworkMap{ - Peers: []*tailcfg.Node{ - { - DiscoKey: discoKey, - Endpoints: []string{sendConn.LocalAddr().String()}, - }, - }, - }) - conn.CreateEndpoint([32]byte{1: 1}, "0000000000000000000000000000000000000000000000000000000000000001.disco.tailscale:12345") - conn.addValidDiscoPathForTest(discoKey, netaddr.MustParseIPPort(sendConn.LocalAddr().String())) + addTestEndpoint(conn, sendConn) var dstAddr net.Addr = conn.pconn4.LocalAddr() sendBuf := make([]byte, 1<<10)