From d06ceffd02d03539d5a6f6422b8807ea73ad760a Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Sat, 6 Feb 2021 21:27:02 -0800 Subject: [PATCH] wgengine/magicsock: add disabled failing (deadlocking) test for #1282 The fix can make this test run unconditionally. This moves code from 5c619882bc4911a2c9e7d0bb491b9e50d27afcd7 for testability but doesn't fix it yet. The #1282 problem remains (when I wrote its wake-up mechanism, I forgot there were N DERP readers funneling into 1 UDP reader, and the code just isn't correct at all for that case). Also factor out some test helper code from BenchmarkReceiveFrom. The refactoring in magicsock.go for testability should have no behavior change. (cherry picked from commit 6d2b8df06df1d20d4ae92793a066340bb26b6e25) --- wgengine/magicsock/magicsock.go | 40 +++++---- wgengine/magicsock/magicsock_test.go | 120 ++++++++++++++++++++++----- 2 files changed, 127 insertions(+), 33 deletions(-) diff --git a/wgengine/magicsock/magicsock.go b/wgengine/magicsock/magicsock.go index b4a1817b8..f7670e329 100644 --- a/wgengine/magicsock/magicsock.go +++ b/wgengine/magicsock/magicsock.go @@ -1443,28 +1443,40 @@ func (c *Conn) runDerpReader(ctx context.Context, derpFakeAddr netaddr.IPPort, d continue } - // Before we wake up ReceiveIPv4 with SetReadDeadline, - // note that a DERP packet has arrived. ReceiveIPv4 - // will read this field to note that its UDP read - // error is due to us. - atomic.AddInt64(&c.derpRecvCountAtomic, 1) - // Cancel the pconn read goroutine. - c.pconn4.SetReadDeadline(aLongTimeAgo) + if !c.sendDerpReadResult(ctx, res) { + return + } select { case <-ctx.Done(): return - case c.derpRecvCh <- res: - select { - case <-ctx.Done(): - return - case <-didCopy: - continue - } + case <-didCopy: + continue } } } +// sendDerpReadResult sends res to c.derpRecvCh and reports whether it +// was sent. (It reports false if ctx was done first.) +// +// This includes doing the whole wake-up dance to interrupt +// ReceiveIPv4's blocking UDP read. +func (c *Conn) sendDerpReadResult(ctx context.Context, res derpReadResult) (sent bool) { + // Before we wake up ReceiveIPv4 with SetReadDeadline, + // note that a DERP packet has arrived. ReceiveIPv4 + // will read this field to note that its UDP read + // error is due to us. + atomic.AddInt64(&c.derpRecvCountAtomic, 1) + // Cancel the pconn read goroutine. + c.pconn4.SetReadDeadline(aLongTimeAgo) + select { + case <-ctx.Done(): + return false + case c.derpRecvCh <- res: + return true + } +} + type derpWriteRequest struct { addr netaddr.IPPort pubKey key.Public diff --git a/wgengine/magicsock/magicsock_test.go b/wgengine/magicsock/magicsock_test.go index 620394b12..ed056ffd0 100644 --- a/wgengine/magicsock/magicsock_test.go +++ b/wgengine/magicsock/magicsock_test.go @@ -11,12 +11,14 @@ import ( "crypto/tls" "encoding/binary" "encoding/json" + "flag" "fmt" "io/ioutil" "net" "net/http" "net/http/httptest" "os" + "runtime" "strconv" "strings" "sync" @@ -1410,19 +1412,112 @@ func Test32bitAlignment(t *testing.T) { atomic.AddInt64(&c.derpRecvCountAtomic, 1) } -func BenchmarkReceiveFrom(b *testing.B) { - port := pickPort(b) +// newNonLegacyTestConn returns a new Conn with DisableLegacyNetworking set true. +func newNonLegacyTestConn(t testing.TB) *Conn { + t.Helper() + port := pickPort(t) conn, err := NewConn(Options{ - Logf: b.Logf, + Logf: t.Logf, Port: port, EndpointsFunc: func(eps []string) { - b.Logf("endpoints: %q", eps) + t.Logf("endpoints: %q", eps) }, DisableLegacyNetworking: true, }) if err != nil { - b.Fatal(err) + t.Fatal(err) } + return conn +} + +var testIssue1282 = flag.Bool("test-issue-1282", false, "run test for https://github.com/tailscale/tailscale/issues/1282 on all CPUs") + +// Tests concurrent DERP readers pushing DERP data into ReceiveIPv4 +// (which should blend all DERP reads into UDP reads). +func TestDerpReceiveFromIPv4(t *testing.T) { + conn := newNonLegacyTestConn(t) + defer conn.Close() + + sendConn, err := net.ListenPacket("udp4", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + defer sendConn.Close() + nodeKey, _ := addTestEndpoint(conn, sendConn) + + var sends int = 500 + senders := runtime.NumCPU() + if !*testIssue1282 { + t.Logf("--test-issue-1282 was not specified; so doing single-threaded test (instead of NumCPU=%d) to work around https://github.com/tailscale/tailscale/issues/1282", senders) + senders = 1 + } + sends -= (sends % senders) + var wg sync.WaitGroup + defer wg.Wait() + t.Logf("doing %v sends over %d senders", sends, senders) + ctx := context.Background() + + for i := 0; i < senders; i++ { + wg.Add(1) + regionID := i + 1 + go func() { + defer wg.Done() + ch := make(chan bool, 1) + for i := 0; i < sends/senders; i++ { + if !conn.sendDerpReadResult(ctx, derpReadResult{ + regionID: regionID, + n: 123, + src: key.Public(nodeKey), + copyBuf: func(dst []byte) int { + ch <- true + return 123 + }, + }) { + t.Error("unexpected false") + return + } + <-ch + } + }() + } + + buf := make([]byte, 1500) + for i := 0; i < sends; i++ { + n, ep, err := conn.ReceiveIPv4(buf) + if err != nil { + t.Fatal(err) + } + _ = n + _ = ep + } +} + +// addTestEndpoint sets conn's network map to a single peer expected +// to receive packets from sendConn (or DERP), and returns that peer's +// nodekey and discokey. +func addTestEndpoint(conn *Conn, sendConn net.PacketConn) (tailcfg.NodeKey, tailcfg.DiscoKey) { + // Give conn just enough state that it'll recognize sendConn as a + // valid peer and not fall through to the legacy magicsock + // codepath. + discoKey := tailcfg.DiscoKey{31: 1} + nodeKey := tailcfg.NodeKey{0: 'N', 1: 'K'} + conn.SetNetworkMap(&controlclient.NetworkMap{ + Peers: []*tailcfg.Node{ + { + Key: nodeKey, + DiscoKey: discoKey, + Endpoints: []string{sendConn.LocalAddr().String()}, + }, + }, + }) + conn.SetPrivateKey(wgkey.Private{0: 1}) + conn.CreateEndpoint([32]byte(nodeKey), "0000000000000000000000000000000000000000000000000000000000000001.disco.tailscale:12345") + conn.addValidDiscoPathForTest(discoKey, netaddr.MustParseIPPort(sendConn.LocalAddr().String())) + return nodeKey, discoKey +} + +func BenchmarkReceiveFrom(b *testing.B) { + conn := newNonLegacyTestConn(b) defer conn.Close() sendConn, err := net.ListenPacket("udp4", "127.0.0.1:0") @@ -1431,20 +1526,7 @@ func BenchmarkReceiveFrom(b *testing.B) { } defer sendConn.Close() - // Give conn just enough state that it'll recognize sendConn as a - // valid peer and not fall through to the legacy magicsock - // codepath. - discoKey := tailcfg.DiscoKey{31: 1} - conn.SetNetworkMap(&controlclient.NetworkMap{ - Peers: []*tailcfg.Node{ - { - DiscoKey: discoKey, - Endpoints: []string{sendConn.LocalAddr().String()}, - }, - }, - }) - conn.CreateEndpoint([32]byte{1: 1}, "0000000000000000000000000000000000000000000000000000000000000001.disco.tailscale:12345") - conn.addValidDiscoPathForTest(discoKey, netaddr.MustParseIPPort(sendConn.LocalAddr().String())) + addTestEndpoint(conn, sendConn) var dstAddr net.Addr = conn.pconn4.LocalAddr() sendBuf := make([]byte, 1<<10)