From 1ed9bd76d682299376f404521cf1958a7f9bea7a Mon Sep 17 00:00:00 2001 From: Percy Wegmann Date: Tue, 10 Dec 2024 11:52:51 -0600 Subject: [PATCH] prober: perform DERP bandwidth probes over TUN device to mimic real client Updates tailscale/corp#24635 Co-authored-by: Mario Minardi Signed-off-by: Percy Wegmann --- cmd/derpprobe/derpprobe.go | 27 ++-- prober/derp.go | 323 +++++++++++++++++++++++++++++++++++-- prober/tun_darwin.go | 35 ++++ prober/tun_default.go | 18 +++ prober/tun_linux.go | 36 +++++ 5 files changed, 411 insertions(+), 28 deletions(-) create mode 100644 prober/tun_darwin.go create mode 100644 prober/tun_default.go create mode 100644 prober/tun_linux.go diff --git a/cmd/derpprobe/derpprobe.go b/cmd/derpprobe/derpprobe.go index 8f04326b0..620b96609 100644 --- a/cmd/derpprobe/derpprobe.go +++ b/cmd/derpprobe/derpprobe.go @@ -18,18 +18,19 @@ import ( ) var ( - derpMapURL = flag.String("derp-map", "https://login.tailscale.com/derpmap/default", "URL to DERP map (https:// or file://) or 'local' to use the local tailscaled's DERP map") - versionFlag = flag.Bool("version", false, "print version and exit") - listen = flag.String("listen", ":8030", "HTTP listen address") - probeOnce = flag.Bool("once", false, "probe once and print results, then exit; ignores the listen flag") - spread = flag.Bool("spread", true, "whether to spread probing over time") - interval = flag.Duration("interval", 15*time.Second, "probe interval") - meshInterval = flag.Duration("mesh-interval", 15*time.Second, "mesh probe interval") - stunInterval = flag.Duration("stun-interval", 15*time.Second, "STUN probe interval") - tlsInterval = flag.Duration("tls-interval", 15*time.Second, "TLS probe interval") - bwInterval = flag.Duration("bw-interval", 0, "bandwidth probe interval (0 = no bandwidth probing)") - bwSize = flag.Int64("bw-probe-size-bytes", 1_000_000, "bandwidth probe size") - regionCode = flag.String("region-code", "", "probe only this region (e.g. 'lax'); if left blank, all regions will be probed") + derpMapURL = flag.String("derp-map", "https://login.tailscale.com/derpmap/default", "URL to DERP map (https:// or file://) or 'local' to use the local tailscaled's DERP map") + versionFlag = flag.Bool("version", false, "print version and exit") + listen = flag.String("listen", ":8030", "HTTP listen address") + probeOnce = flag.Bool("once", false, "probe once and print results, then exit; ignores the listen flag") + spread = flag.Bool("spread", true, "whether to spread probing over time") + interval = flag.Duration("interval", 15*time.Second, "probe interval") + meshInterval = flag.Duration("mesh-interval", 15*time.Second, "mesh probe interval") + stunInterval = flag.Duration("stun-interval", 15*time.Second, "STUN probe interval") + tlsInterval = flag.Duration("tls-interval", 15*time.Second, "TLS probe interval") + bwInterval = flag.Duration("bw-interval", 0, "bandwidth probe interval (0 = no bandwidth probing)") + bwSize = flag.Int64("bw-probe-size-bytes", 1_000_000, "bandwidth probe size") + bwTUNIPv4Address = flag.String("bw-tun-ipv4-addr", "", "if specified, bandwidth probes will be performed over a TUN device at this address in order to exercise TCP-in-TCP in similar fashion to TCP over Tailscale via DERP. We will use a /30 subnet including this IP address.") + regionCode = flag.String("region-code", "", "probe only this region (e.g. 'lax'); if left blank, all regions will be probed") ) func main() { @@ -46,7 +47,7 @@ func main() { prober.WithTLSProbing(*tlsInterval), } if *bwInterval > 0 { - opts = append(opts, prober.WithBandwidthProbing(*bwInterval, *bwSize)) + opts = append(opts, prober.WithBandwidthProbing(*bwInterval, *bwSize, *bwTUNIPv4Address)) } if *regionCode != "" { opts = append(opts, prober.WithRegion(*regionCode)) diff --git a/prober/derp.go b/prober/derp.go index bce40e34c..8e8e6ac3d 100644 --- a/prober/derp.go +++ b/prober/derp.go @@ -12,20 +12,27 @@ import ( "errors" "expvar" "fmt" + "io" "log" "net" "net/http" + "net/netip" "strconv" "strings" "sync" "time" "github.com/prometheus/client_golang/prometheus" + wgconn "github.com/tailscale/wireguard-go/conn" + "github.com/tailscale/wireguard-go/device" + "github.com/tailscale/wireguard-go/tun" + "go4.org/netipx" "tailscale.com/client/tailscale" "tailscale.com/derp" "tailscale.com/derp/derphttp" "tailscale.com/net/netmon" "tailscale.com/net/stun" + "tailscale.com/net/tstun" "tailscale.com/syncs" "tailscale.com/tailcfg" "tailscale.com/types/key" @@ -42,8 +49,9 @@ type derpProber struct { tlsInterval time.Duration // Optional bandwidth probing. - bwInterval time.Duration - bwProbeSize int64 + bwInterval time.Duration + bwProbeSize int64 + bwTUNIPv4Prefix *netip.Prefix // Optionally restrict probes to a single regionCode. regionCode string @@ -68,11 +76,18 @@ type DERPOpt func(*derpProber) // WithBandwidthProbing enables bandwidth probing. When enabled, a payload of // `size` bytes will be regularly transferred through each DERP server, and each -// pair of DERP servers in every region. -func WithBandwidthProbing(interval time.Duration, size int64) DERPOpt { +// pair of DERP servers in every region. If tunAddress is specified, probes will +// use a TCP connection over a TUN device at this address in order to exercise +// TCP-in-TCP in similar fashion to TCP over Tailscale via DERP +func WithBandwidthProbing(interval time.Duration, size int64, tunAddress string) DERPOpt { return func(d *derpProber) { d.bwInterval = interval d.bwProbeSize = size + prefix, err := netip.ParsePrefix(fmt.Sprintf("%s/30", tunAddress)) + if err != nil { + log.Fatalf("failed to parse IP prefix from bw-tun-ipv4-addr: %v", err) + } + d.bwTUNIPv4Prefix = &prefix } } @@ -200,7 +215,11 @@ func (d *derpProber) probeMapFn(ctx context.Context) error { n := fmt.Sprintf("derp/%s/%s/%s/bw", region.RegionCode, server.Name, to.Name) wantProbes[n] = true if d.probes[n] == nil { - log.Printf("adding DERP bandwidth probe for %s->%s (%s) %v bytes every %v", server.Name, to.Name, region.RegionName, d.bwProbeSize, d.bwInterval) + tunString := "" + if d.bwTUNIPv4Prefix != nil { + tunString = " (TUN)" + } + log.Printf("adding%s DERP bandwidth probe for %s->%s (%s) %v bytes every %v", tunString, server.Name, to.Name, region.RegionName, d.bwProbeSize, d.bwInterval) d.probes[n] = d.p.Run(n, d.bwInterval, labels, d.bwProbeFn(server.Name, to.Name, d.bwProbeSize)) } } @@ -251,21 +270,24 @@ func (d *derpProber) probeBandwidth(from, to string, size int64) ProbeClass { if from == to { derpPath = "single" } - var transferTime expvar.Float + var transferTimeSeconds expvar.Float return ProbeClass{ Probe: func(ctx context.Context) error { fromN, toN, err := d.getNodePair(from, to) if err != nil { return err } - return derpProbeBandwidth(ctx, d.lastDERPMap, fromN, toN, size, &transferTime) + return derpProbeBandwidth(ctx, d.lastDERPMap, fromN, toN, size, &transferTimeSeconds, d.bwTUNIPv4Prefix) + }, + Class: "derp_bw", + Labels: Labels{ + "derp_path": derpPath, + "tcp_in_tcp": strconv.FormatBool(d.bwTUNIPv4Prefix != nil), }, - Class: "derp_bw", - Labels: Labels{"derp_path": derpPath}, Metrics: func(l prometheus.Labels) []prometheus.Metric { return []prometheus.Metric{ prometheus.MustNewConstMetric(prometheus.NewDesc("derp_bw_probe_size_bytes", "Payload size of the bandwidth prober", nil, l), prometheus.GaugeValue, float64(size)), - prometheus.MustNewConstMetric(prometheus.NewDesc("derp_bw_transfer_time_seconds_total", "Time it took to transfer data", nil, l), prometheus.CounterValue, transferTime.Value()), + prometheus.MustNewConstMetric(prometheus.NewDesc("derp_bw_transfer_time_seconds_total", "Time it took to transfer data", nil, l), prometheus.CounterValue, transferTimeSeconds.Value()), } }, } @@ -412,8 +434,10 @@ func derpProbeUDP(ctx context.Context, ipStr string, port int) error { } // derpProbeBandwidth sends a payload of a given size between two local -// DERP clients connected to two DERP servers. -func derpProbeBandwidth(ctx context.Context, dm *tailcfg.DERPMap, from, to *tailcfg.DERPNode, size int64, transferTime *expvar.Float) (err error) { +// DERP clients connected to two DERP servers.If tunIPv4Address is specified, +// probes will use a TCP connection over a TUN device at this address in order +// to exercise TCP-in-TCP in similar fashion to TCP over Tailscale via DERP. +func derpProbeBandwidth(ctx context.Context, dm *tailcfg.DERPMap, from, to *tailcfg.DERPNode, size int64, transferTimeSeconds *expvar.Float, tunIPv4Prefix *netip.Prefix) (err error) { // This probe uses clients with isProber=false to avoid spamming the derper logs with every packet // sent by the bandwidth probe. fromc, err := newConn(ctx, dm, from, false) @@ -434,10 +458,13 @@ func derpProbeBandwidth(ctx context.Context, dm *tailcfg.DERPMap, from, to *tail time.Sleep(100 * time.Millisecond) // pretty arbitrary } - start := time.Now() - defer func() { transferTime.Add(time.Since(start).Seconds()) }() + if tunIPv4Prefix != nil { + err = derpProbeBandwidthTUN(ctx, transferTimeSeconds, from, to, fromc, toc, size, tunIPv4Prefix) + } else { + err = derpProbeBandwidthDirect(ctx, transferTimeSeconds, from, to, fromc, toc, size) + } - if err := runDerpProbeNodePair(ctx, from, to, fromc, toc, size); err != nil { + if err != nil { // Record pubkeys on failed probes to aid investigation. return fmt.Errorf("%s -> %s: %w", fromc.SelfPublicKey().ShortString(), @@ -577,6 +604,272 @@ func runDerpProbeNodePair(ctx context.Context, from, to *tailcfg.DERPNode, fromc return nil } +// derpProbeBandwidthDirect takes two DERP clients (fromc and toc) connected to two +// DERP servers (from and to) and sends a test payload of a given size from one +// to another using runDerpProbeNodePair. The time taken to finish the transfer is +// recorded in `transferTimeSeconds`. +func derpProbeBandwidthDirect(ctx context.Context, transferTimeSeconds *expvar.Float, from, to *tailcfg.DERPNode, fromc, toc *derphttp.Client, size int64) error { + start := time.Now() + defer func() { transferTimeSeconds.Add(time.Since(start).Seconds()) }() + + return runDerpProbeNodePair(ctx, from, to, fromc, toc, size) +} + +// derpProbeBandwidthTUNMu ensures that TUN bandwidth probes don't run concurrently. +// This is necessary to avoid conflicts trying to create the TUN device, and +// it also has the nice benefit of preventing concurrent bandwidth probes from +// influencing each other's results. +// +// This guards derpProbeBandwidthTUN. +var derpProbeBandwidthTUNMu sync.Mutex + +// derpProbeBandwidthTUN takes two DERP clients (fromc and toc) connected to two +// DERP servers (from and to) and sends a test payload of a given size from one +// to another over a TUN device at an address at the start of the usable host IP +// range that the given tunAddress lives in. The time taken to finish the transfer +// is recorded in `transferTimeSeconds`. +func derpProbeBandwidthTUN(ctx context.Context, transferTimeSeconds *expvar.Float, from, to *tailcfg.DERPNode, fromc, toc *derphttp.Client, size int64, prefix *netip.Prefix) error { + // Make sure all goroutines have finished. + var wg sync.WaitGroup + defer wg.Wait() + + // Close the clients to make sure goroutines that are reading/writing from them terminate. + defer fromc.Close() + defer toc.Close() + + ipRange := netipx.RangeOfPrefix(*prefix) + // Start of the usable host IP range from the address we have been passed in. + ifAddr := ipRange.From().Next() + // Destination address to dial. This is the next address in the range from + // our ifAddr to ensure that the underlying networking stack is actually being + // utilized instead of being optimized away and treated as a loopback. Packets + // sent to this address will be routed over the TUN. + destinationAddr := ifAddr.Next() + + derpProbeBandwidthTUNMu.Lock() + defer derpProbeBandwidthTUNMu.Unlock() + + // Temporarily set up a TUN device with which to simulate a real client TCP connection + // tunneling over DERP. Use `tstun.DefaultTUNMTU()` (e.g., 1280) as our MTU as this is + // the minimum safe MTU used by Tailscale. + dev, err := tun.CreateTUN(tunName, int(tstun.DefaultTUNMTU())) + if err != nil { + return fmt.Errorf("failed to create TUN device: %w", err) + } + defer func() { + if err := dev.Close(); err != nil { + log.Printf("failed to close TUN device: %s", err) + } + }() + mtu, err := dev.MTU() + if err != nil { + return fmt.Errorf("failed to get TUN MTU: %w", err) + } + + name, err := dev.Name() + if err != nil { + return fmt.Errorf("failed to get device name: %w", err) + } + + // Perform platform specific configuration of the TUN device. + err = configureTUN(*prefix, name) + if err != nil { + return fmt.Errorf("failed to configure tun: %w", err) + } + + // Depending on platform, we need some space for headers at the front + // of TUN I/O op buffers. The below constant is more than enough space + // for any platform that this might run on. + tunStartOffset := device.MessageTransportHeaderSize + + // This goroutine reads packets from the TUN device and evaluates if they + // are IPv4 packets destined for loopback via DERP. If so, it performs L3 NAT + // (swap src/dst) and writes them towards DERP in order to loopback via the + // `toc` DERP client. It only reports errors to `tunReadErrC`. + wg.Add(1) + tunReadErrC := make(chan error, 1) + go func() { + defer wg.Done() + + numBufs := wgconn.IdealBatchSize + bufs := make([][]byte, 0, numBufs) + sizes := make([]int, numBufs) + for range numBufs { + bufs = append(bufs, make([]byte, mtu+tunStartOffset)) + } + + destinationAddrBytes := destinationAddr.AsSlice() + scratch := make([]byte, 4) + for { + n, err := dev.Read(bufs, sizes, tunStartOffset) + if err != nil { + tunReadErrC <- err + return + } + + for i := range n { + pkt := bufs[i][tunStartOffset : sizes[i]+tunStartOffset] + // Skip everything except valid IPv4 packets + if len(pkt) < 20 { + // Doesn't even have a full IPv4 header + continue + } + if pkt[0]>>4 != 4 { + // Not IPv4 + continue + } + + if !bytes.Equal(pkt[16:20], destinationAddrBytes) { + // Unexpected dst address + continue + } + + copy(scratch, pkt[12:16]) + copy(pkt[12:16], pkt[16:20]) + copy(pkt[16:20], scratch) + + if err := fromc.Send(toc.SelfPublicKey(), pkt); err != nil { + tunReadErrC <- err + return + } + } + } + }() + + // This goroutine reads packets from the `toc` DERP client and writes them towards the TUN. + // It only reports errors to `recvErrC` channel. + wg.Add(1) + recvErrC := make(chan error, 1) + go func() { + defer wg.Done() + + buf := make([]byte, mtu+tunStartOffset) + bufs := make([][]byte, 1) + + for { + m, err := toc.Recv() + if err != nil { + recvErrC <- fmt.Errorf("failed to receive: %w", err) + return + } + switch v := m.(type) { + case derp.ReceivedPacket: + if v.Source != fromc.SelfPublicKey() { + recvErrC <- fmt.Errorf("got data packet from unexpected source, %v", v.Source) + return + } + pkt := v.Data + copy(buf[tunStartOffset:], pkt) + bufs[0] = buf[:len(pkt)+tunStartOffset] + if _, err := dev.Write(bufs, tunStartOffset); err != nil { + recvErrC <- fmt.Errorf("failed to write to TUN device: %w", err) + return + } + case derp.KeepAliveMessage: + // Silently ignore. + default: + log.Printf("%v: ignoring Recv frame type %T", to.Name, v) + // Loop. + } + } + }() + + // Start a listener to receive the data + l, err := net.Listen("tcp", net.JoinHostPort(ifAddr.String(), "0")) + if err != nil { + return fmt.Errorf("failed to listen: %s", err) + } + defer l.Close() + + // 128KB by default + const writeChunkSize = 128 << 10 + + randData := make([]byte, writeChunkSize) + _, err = crand.Read(randData) + if err != nil { + return fmt.Errorf("failed to initialize random data: %w", err) + } + + // Dial ourselves + _, port, err := net.SplitHostPort(l.Addr().String()) + if err != nil { + return fmt.Errorf("failed to split address %q: %w", l.Addr().String(), err) + } + + connAddr := net.JoinHostPort(destinationAddr.String(), port) + conn, err := net.Dial("tcp", connAddr) + if err != nil { + return fmt.Errorf("failed to dial address %q: %w", connAddr, err) + } + defer conn.Close() + + // Timing only includes the actual sending and receiving of data. + start := time.Now() + + // This goroutine reads data from the TCP stream being looped back via DERP. + // It reports to `readFinishedC` when `size` bytes have been read, or if an + // error occurs. + wg.Add(1) + readFinishedC := make(chan error, 1) + go func() { + defer wg.Done() + + readConn, err := l.Accept() + if err != nil { + readFinishedC <- err + } + defer readConn.Close() + deadline, ok := ctx.Deadline() + if ok { + // Don't try reading past our context's deadline. + if err := readConn.SetReadDeadline(deadline); err != nil { + readFinishedC <- fmt.Errorf("unable to set read deadline: %w", err) + } + } + _, err = io.CopyN(io.Discard, readConn, size) + // Measure transfer time irrespective of whether it succeeded or failed. + transferTimeSeconds.Add(time.Since(start).Seconds()) + readFinishedC <- err + }() + + // This goroutine sends data to the TCP stream being looped back via DERP. + // It only reports errors to `sendErrC`. + wg.Add(1) + sendErrC := make(chan error, 1) + go func() { + defer wg.Done() + + for wrote := 0; wrote < int(size); wrote += len(randData) { + b := randData + if wrote+len(randData) > int(size) { + // This is the last chunk and we don't need the whole thing + b = b[0 : int(size)-wrote] + } + if _, err := conn.Write(b); err != nil { + sendErrC <- fmt.Errorf("failed to write to conn: %w", err) + return + } + } + }() + + select { + case <-ctx.Done(): + return fmt.Errorf("timeout: %w", ctx.Err()) + case err := <-tunReadErrC: + return fmt.Errorf("error reading from TUN via %q: %w", from.Name, err) + case err := <-sendErrC: + return fmt.Errorf("error sending via %q: %w", from.Name, err) + case err := <-recvErrC: + return fmt.Errorf("error receiving from %q: %w", to.Name, err) + case err := <-readFinishedC: + if err != nil { + return fmt.Errorf("error reading from %q to TUN: %w", to.Name, err) + } + } + + return nil +} + func newConn(ctx context.Context, dm *tailcfg.DERPMap, n *tailcfg.DERPNode, isProber bool) (*derphttp.Client, error) { // To avoid spamming the log with regular connection messages. l := logger.Filtered(log.Printf, func(s string) bool { diff --git a/prober/tun_darwin.go b/prober/tun_darwin.go new file mode 100644 index 000000000..0ef22e41e --- /dev/null +++ b/prober/tun_darwin.go @@ -0,0 +1,35 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +//go:build darwin + +package prober + +import ( + "fmt" + "net/netip" + "os/exec" + + "go4.org/netipx" +) + +const tunName = "utun" + +func configureTUN(addr netip.Prefix, tunname string) error { + cmd := exec.Command("ifconfig", tunname, "inet", addr.String(), addr.Addr().String()) + res, err := cmd.CombinedOutput() + if err != nil { + return fmt.Errorf("failed to add address: %w (%s)", err, string(res)) + } + + net := netipx.PrefixIPNet(addr) + nip := net.IP.Mask(net.Mask) + nstr := fmt.Sprintf("%v/%d", nip, addr.Bits()) + cmd = exec.Command("route", "-q", "-n", "add", "-inet", nstr, "-iface", addr.Addr().String()) + res, err = cmd.CombinedOutput() + if err != nil { + return fmt.Errorf("failed to add route: %w (%s)", err, string(res)) + } + + return nil +} diff --git a/prober/tun_default.go b/prober/tun_default.go new file mode 100644 index 000000000..93a5b07fd --- /dev/null +++ b/prober/tun_default.go @@ -0,0 +1,18 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +//go:build !linux && !darwin + +package prober + +import ( + "fmt" + "net/netip" + "runtime" +) + +const tunName = "unused" + +func configureTUN(addr netip.Prefix, tunname string) error { + return fmt.Errorf("not implemented on " + runtime.GOOS) +} diff --git a/prober/tun_linux.go b/prober/tun_linux.go new file mode 100644 index 000000000..52a31efbb --- /dev/null +++ b/prober/tun_linux.go @@ -0,0 +1,36 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +//go:build linux + +package prober + +import ( + "fmt" + "net/netip" + + "github.com/tailscale/netlink" + "go4.org/netipx" +) + +const tunName = "derpprobe" + +func configureTUN(addr netip.Prefix, tunname string) error { + link, err := netlink.LinkByName(tunname) + if err != nil { + return fmt.Errorf("failed to look up link %q: %w", tunname, err) + } + + // We need to bring the TUN device up before assigning an address. This + // allows the OS to automatically create a route for it. Otherwise, we'd + // have to manually create the route. + if err := netlink.LinkSetUp(link); err != nil { + return fmt.Errorf("failed to bring tun %q up: %w", tunname, err) + } + + if err := netlink.AddrReplace(link, &netlink.Addr{IPNet: netipx.PrefixIPNet(addr)}); err != nil { + return fmt.Errorf("failed to add address: %w", err) + } + + return nil +}