wgengine/magicsock: add disabled failing (deadlocking) test for #1282
The fix can make this test run unconditionally. This moves code from5c619882bc
for testability but doesn't fix it yet. The #1282 problem remains (when I wrote its wake-up mechanism, I forgot there were N DERP readers funneling into 1 UDP reader, and the code just isn't correct at all for that case). Also factor out some test helper code from BenchmarkReceiveFrom. The refactoring in magicsock.go for testability should have no behavior change. (cherry picked from commit6d2b8df06d
)
This commit is contained in:
parent
0cf60b5185
commit
d06ceffd02
|
@ -1443,28 +1443,40 @@ func (c *Conn) runDerpReader(ctx context.Context, derpFakeAddr netaddr.IPPort, d
|
|||
continue
|
||||
|
||||
}
|
||||
// Before we wake up ReceiveIPv4 with SetReadDeadline,
|
||||
// note that a DERP packet has arrived. ReceiveIPv4
|
||||
// will read this field to note that its UDP read
|
||||
// error is due to us.
|
||||
atomic.AddInt64(&c.derpRecvCountAtomic, 1)
|
||||
// Cancel the pconn read goroutine.
|
||||
c.pconn4.SetReadDeadline(aLongTimeAgo)
|
||||
|
||||
if !c.sendDerpReadResult(ctx, res) {
|
||||
return
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case c.derpRecvCh <- res:
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-didCopy:
|
||||
continue
|
||||
}
|
||||
case <-didCopy:
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// sendDerpReadResult sends res to c.derpRecvCh and reports whether it
|
||||
// was sent. (It reports false if ctx was done first.)
|
||||
//
|
||||
// This includes doing the whole wake-up dance to interrupt
|
||||
// ReceiveIPv4's blocking UDP read.
|
||||
func (c *Conn) sendDerpReadResult(ctx context.Context, res derpReadResult) (sent bool) {
|
||||
// Before we wake up ReceiveIPv4 with SetReadDeadline,
|
||||
// note that a DERP packet has arrived. ReceiveIPv4
|
||||
// will read this field to note that its UDP read
|
||||
// error is due to us.
|
||||
atomic.AddInt64(&c.derpRecvCountAtomic, 1)
|
||||
// Cancel the pconn read goroutine.
|
||||
c.pconn4.SetReadDeadline(aLongTimeAgo)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return false
|
||||
case c.derpRecvCh <- res:
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
type derpWriteRequest struct {
|
||||
addr netaddr.IPPort
|
||||
pubKey key.Public
|
||||
|
|
|
@ -11,12 +11,14 @@ import (
|
|||
"crypto/tls"
|
||||
"encoding/binary"
|
||||
"encoding/json"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
|
@ -1410,19 +1412,112 @@ func Test32bitAlignment(t *testing.T) {
|
|||
atomic.AddInt64(&c.derpRecvCountAtomic, 1)
|
||||
}
|
||||
|
||||
func BenchmarkReceiveFrom(b *testing.B) {
|
||||
port := pickPort(b)
|
||||
// newNonLegacyTestConn returns a new Conn with DisableLegacyNetworking set true.
|
||||
func newNonLegacyTestConn(t testing.TB) *Conn {
|
||||
t.Helper()
|
||||
port := pickPort(t)
|
||||
conn, err := NewConn(Options{
|
||||
Logf: b.Logf,
|
||||
Logf: t.Logf,
|
||||
Port: port,
|
||||
EndpointsFunc: func(eps []string) {
|
||||
b.Logf("endpoints: %q", eps)
|
||||
t.Logf("endpoints: %q", eps)
|
||||
},
|
||||
DisableLegacyNetworking: true,
|
||||
})
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
return conn
|
||||
}
|
||||
|
||||
var testIssue1282 = flag.Bool("test-issue-1282", false, "run test for https://github.com/tailscale/tailscale/issues/1282 on all CPUs")
|
||||
|
||||
// Tests concurrent DERP readers pushing DERP data into ReceiveIPv4
|
||||
// (which should blend all DERP reads into UDP reads).
|
||||
func TestDerpReceiveFromIPv4(t *testing.T) {
|
||||
conn := newNonLegacyTestConn(t)
|
||||
defer conn.Close()
|
||||
|
||||
sendConn, err := net.ListenPacket("udp4", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer sendConn.Close()
|
||||
nodeKey, _ := addTestEndpoint(conn, sendConn)
|
||||
|
||||
var sends int = 500
|
||||
senders := runtime.NumCPU()
|
||||
if !*testIssue1282 {
|
||||
t.Logf("--test-issue-1282 was not specified; so doing single-threaded test (instead of NumCPU=%d) to work around https://github.com/tailscale/tailscale/issues/1282", senders)
|
||||
senders = 1
|
||||
}
|
||||
sends -= (sends % senders)
|
||||
var wg sync.WaitGroup
|
||||
defer wg.Wait()
|
||||
t.Logf("doing %v sends over %d senders", sends, senders)
|
||||
ctx := context.Background()
|
||||
|
||||
for i := 0; i < senders; i++ {
|
||||
wg.Add(1)
|
||||
regionID := i + 1
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
ch := make(chan bool, 1)
|
||||
for i := 0; i < sends/senders; i++ {
|
||||
if !conn.sendDerpReadResult(ctx, derpReadResult{
|
||||
regionID: regionID,
|
||||
n: 123,
|
||||
src: key.Public(nodeKey),
|
||||
copyBuf: func(dst []byte) int {
|
||||
ch <- true
|
||||
return 123
|
||||
},
|
||||
}) {
|
||||
t.Error("unexpected false")
|
||||
return
|
||||
}
|
||||
<-ch
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
buf := make([]byte, 1500)
|
||||
for i := 0; i < sends; i++ {
|
||||
n, ep, err := conn.ReceiveIPv4(buf)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
_ = n
|
||||
_ = ep
|
||||
}
|
||||
}
|
||||
|
||||
// addTestEndpoint sets conn's network map to a single peer expected
|
||||
// to receive packets from sendConn (or DERP), and returns that peer's
|
||||
// nodekey and discokey.
|
||||
func addTestEndpoint(conn *Conn, sendConn net.PacketConn) (tailcfg.NodeKey, tailcfg.DiscoKey) {
|
||||
// Give conn just enough state that it'll recognize sendConn as a
|
||||
// valid peer and not fall through to the legacy magicsock
|
||||
// codepath.
|
||||
discoKey := tailcfg.DiscoKey{31: 1}
|
||||
nodeKey := tailcfg.NodeKey{0: 'N', 1: 'K'}
|
||||
conn.SetNetworkMap(&controlclient.NetworkMap{
|
||||
Peers: []*tailcfg.Node{
|
||||
{
|
||||
Key: nodeKey,
|
||||
DiscoKey: discoKey,
|
||||
Endpoints: []string{sendConn.LocalAddr().String()},
|
||||
},
|
||||
},
|
||||
})
|
||||
conn.SetPrivateKey(wgkey.Private{0: 1})
|
||||
conn.CreateEndpoint([32]byte(nodeKey), "0000000000000000000000000000000000000000000000000000000000000001.disco.tailscale:12345")
|
||||
conn.addValidDiscoPathForTest(discoKey, netaddr.MustParseIPPort(sendConn.LocalAddr().String()))
|
||||
return nodeKey, discoKey
|
||||
}
|
||||
|
||||
func BenchmarkReceiveFrom(b *testing.B) {
|
||||
conn := newNonLegacyTestConn(b)
|
||||
defer conn.Close()
|
||||
|
||||
sendConn, err := net.ListenPacket("udp4", "127.0.0.1:0")
|
||||
|
@ -1431,20 +1526,7 @@ func BenchmarkReceiveFrom(b *testing.B) {
|
|||
}
|
||||
defer sendConn.Close()
|
||||
|
||||
// Give conn just enough state that it'll recognize sendConn as a
|
||||
// valid peer and not fall through to the legacy magicsock
|
||||
// codepath.
|
||||
discoKey := tailcfg.DiscoKey{31: 1}
|
||||
conn.SetNetworkMap(&controlclient.NetworkMap{
|
||||
Peers: []*tailcfg.Node{
|
||||
{
|
||||
DiscoKey: discoKey,
|
||||
Endpoints: []string{sendConn.LocalAddr().String()},
|
||||
},
|
||||
},
|
||||
})
|
||||
conn.CreateEndpoint([32]byte{1: 1}, "0000000000000000000000000000000000000000000000000000000000000001.disco.tailscale:12345")
|
||||
conn.addValidDiscoPathForTest(discoKey, netaddr.MustParseIPPort(sendConn.LocalAddr().String()))
|
||||
addTestEndpoint(conn, sendConn)
|
||||
|
||||
var dstAddr net.Addr = conn.pconn4.LocalAddr()
|
||||
sendBuf := make([]byte, 1<<10)
|
||||
|
|
Loading…
Reference in New Issue