diff --git a/prober/derp.go b/prober/derp.go index ad8cf7066..e7ea60f9c 100644 --- a/prober/derp.go +++ b/prober/derp.go @@ -10,9 +10,9 @@ import ( crand "crypto/rand" "encoding/json" "errors" + "expvar" "fmt" "log" - "maps" "net" "net/http" "strconv" @@ -20,6 +20,7 @@ import ( "sync" "time" + "github.com/prometheus/client_golang/prometheus" "tailscale.com/derp" "tailscale.com/derp/derphttp" "tailscale.com/net/stun" @@ -42,11 +43,14 @@ type derpProber struct { bwInterval time.Duration bwProbeSize int64 - // Probe functions that can be overridden for testing. - tlsProbeFn func(string) ProbeFunc - udpProbeFn func(string, int) ProbeFunc - meshProbeFn func(string, string) ProbeFunc - bwProbeFn func(string, string, int64) ProbeFunc + // Probe class for fetching & updating the DERP map. + ProbeMap ProbeClass + + // Probe classes for probing individual derpers. + tlsProbeFn func(string) ProbeClass + udpProbeFn func(string, int) ProbeClass + meshProbeFn func(string, string) ProbeClass + bwProbeFn func(string, string, int64) ProbeClass sync.Mutex lastDERPMap *tailcfg.DERPMap @@ -100,6 +104,10 @@ func DERP(p *Prober, derpMapURL string, opts ...DERPOpt) (*derpProber, error) { nodes: make(map[string]*tailcfg.DERPNode), probes: make(map[string]*Probe), } + d.ProbeMap = ProbeClass{ + Probe: d.probeMapFn, + Labels: Labels{"class": "derp_map"}, + } for _, o := range opts { o(d) } @@ -109,10 +117,10 @@ func DERP(p *Prober, derpMapURL string, opts ...DERPOpt) (*derpProber, error) { return d, nil } -// ProbeMap fetches the DERPMap and creates/destroys probes for each +// probeMapFn fetches the DERPMap and creates/destroys probes for each // DERP server as necessary. It should get regularly executed as a // probe function itself. -func (d *derpProber) ProbeMap(ctx context.Context) error { +func (d *derpProber) probeMapFn(ctx context.Context) error { if err := d.updateMap(ctx); err != nil { return err } @@ -123,7 +131,7 @@ func (d *derpProber) ProbeMap(ctx context.Context) error { for _, region := range d.lastDERPMap.Regions { for _, server := range region.Nodes { - labels := map[string]string{ + labels := Labels{ "region": region.RegionCode, "region_id": strconv.Itoa(region.RegionID), "hostname": server.HostName, @@ -169,18 +177,11 @@ func (d *derpProber) ProbeMap(ctx context.Context) error { } if d.bwInterval > 0 && d.bwProbeSize > 0 { - bwLabels := maps.Clone(labels) - bwLabels["probe_size_bytes"] = fmt.Sprintf("%d", d.bwProbeSize) - if server.Name == to.Name { - bwLabels["derp_path"] = "single" - } else { - bwLabels["derp_path"] = "mesh" - } n := fmt.Sprintf("derp/%s/%s/%s/bw", region.RegionCode, server.Name, to.Name) wantProbes[n] = true if d.probes[n] == nil { log.Printf("adding DERP bandwidth probe for %s->%s (%s) %v bytes every %v", server.Name, to.Name, region.RegionName, d.bwProbeSize, d.bwInterval) - d.probes[n] = d.p.Run(n, d.bwInterval, bwLabels, d.bwProbeFn(server.Name, to.Name, d.bwProbeSize)) + d.probes[n] = d.p.Run(n, d.bwInterval, labels, d.bwProbeFn(server.Name, to.Name, d.bwProbeSize)) } } } @@ -198,32 +199,61 @@ func (d *derpProber) ProbeMap(ctx context.Context) error { return nil } -// probeMesh returs a probe func that sends a test packet through a pair of DERP +// probeMesh returs a probe class that sends a test packet through a pair of DERP // servers (or just one server, if 'from' and 'to' are the same). 'from' and 'to' // are expected to be names (DERPNode.Name) of two DERP servers in the same region. -func (d *derpProber) probeMesh(from, to string) ProbeFunc { - return func(ctx context.Context) error { - fromN, toN, err := d.getNodePair(from, to) - if err != nil { - return err - } +func (d *derpProber) probeMesh(from, to string) ProbeClass { + labels := Labels{ + "class": "derp_mesh", + } + if from == to { + labels["derp_path"] = "single" + } else { + labels["derp_path"] = "mesh" + } + return ProbeClass{ + Probe: func(ctx context.Context) error { + fromN, toN, err := d.getNodePair(from, to) + if err != nil { + return err + } - dm := d.lastDERPMap - return derpProbeNodePair(ctx, dm, fromN, toN) + dm := d.lastDERPMap + return derpProbeNodePair(ctx, dm, fromN, toN) + }, + Labels: labels, } } -// probeBandwidth returs a probe func that sends a payload of a given size +// probeBandwidth returs a probe class that sends a payload of a given size // through a pair of DERP servers (or just one server, if 'from' and 'to' are // the same). 'from' and 'to' are expected to be names (DERPNode.Name) of two // DERP servers in the same region. -func (d *derpProber) probeBandwidth(from, to string, size int64) ProbeFunc { - return func(ctx context.Context) error { - fromN, toN, err := d.getNodePair(from, to) - if err != nil { - return err - } - return derpProbeBandwidth(ctx, d.lastDERPMap, fromN, toN, size) +func (d *derpProber) probeBandwidth(from, to string, size int64) ProbeClass { + labels := Labels{ + "class": "derp_bw", + } + if from == to { + labels["derp_path"] = "single" + } else { + labels["derp_path"] = "mesh" + } + var transferTime expvar.Float + return ProbeClass{ + Probe: func(ctx context.Context) error { + fromN, toN, err := d.getNodePair(from, to) + if err != nil { + return err + } + return derpProbeBandwidth(ctx, d.lastDERPMap, fromN, toN, size, &transferTime) + }, + Labels: labels, + Metrics: func(l prometheus.Labels) []prometheus.Metric { + return []prometheus.Metric{ + prometheus.MustNewConstMetric(prometheus.NewDesc("derp_bw_probe_size_bytes", "Payload size of the bandwidth prober", nil, l), prometheus.GaugeValue, float64(size)), + prometheus.MustNewConstMetric(prometheus.NewDesc("derp_bw_transfer_time_seconds_total", "Time it took to transfer data", nil, l), prometheus.CounterValue, transferTime.Value()), + } + }, } } @@ -289,9 +319,12 @@ func (d *derpProber) updateMap(ctx context.Context) error { return nil } -func (d *derpProber) ProbeUDP(ipaddr string, port int) ProbeFunc { - return func(ctx context.Context) error { - return derpProbeUDP(ctx, ipaddr, port) +func (d *derpProber) ProbeUDP(ipaddr string, port int) ProbeClass { + return ProbeClass{ + Probe: func(ctx context.Context) error { + return derpProbeUDP(ctx, ipaddr, port) + }, + Labels: Labels{"class": "derp_udp"}, } } @@ -347,7 +380,7 @@ func derpProbeUDP(ctx context.Context, ipStr string, port int) error { // derpProbeBandwidth sends a payload of a given size between two local // DERP clients connected to two DERP servers. -func derpProbeBandwidth(ctx context.Context, dm *tailcfg.DERPMap, from, to *tailcfg.DERPNode, size int64) (err error) { +func derpProbeBandwidth(ctx context.Context, dm *tailcfg.DERPMap, from, to *tailcfg.DERPNode, size int64, transferTime *expvar.Float) (err error) { // This probe uses clients with isProber=false to avoid spamming the derper logs with every packet // sent by the bandwidth probe. fromc, err := newConn(ctx, dm, from, false) @@ -368,6 +401,9 @@ func derpProbeBandwidth(ctx context.Context, dm *tailcfg.DERPMap, from, to *tail time.Sleep(100 * time.Millisecond) // pretty arbitrary } + start := time.Now() + defer func() { transferTime.Add(time.Since(start).Seconds()) }() + if err := runDerpProbeNodePair(ctx, from, to, fromc, toc, size); err != nil { // Record pubkeys on failed probes to aid investigation. return fmt.Errorf("%s -> %s: %w", diff --git a/prober/derp_test.go b/prober/derp_test.go index 5674f9653..138852b91 100644 --- a/prober/derp_test.go +++ b/prober/derp_test.go @@ -60,16 +60,16 @@ func TestDerpProber(t *testing.T) { p: p, derpMapURL: srv.URL, tlsInterval: time.Second, - tlsProbeFn: func(_ string) ProbeFunc { return func(context.Context) error { return nil } }, + tlsProbeFn: func(_ string) ProbeClass { return FuncProbe(func(context.Context) error { return nil }) }, udpInterval: time.Second, - udpProbeFn: func(_ string, _ int) ProbeFunc { return func(context.Context) error { return nil } }, + udpProbeFn: func(_ string, _ int) ProbeClass { return FuncProbe(func(context.Context) error { return nil }) }, meshInterval: time.Second, - meshProbeFn: func(_, _ string) ProbeFunc { return func(context.Context) error { return nil } }, + meshProbeFn: func(_, _ string) ProbeClass { return FuncProbe(func(context.Context) error { return nil }) }, nodes: make(map[string]*tailcfg.DERPNode), probes: make(map[string]*Probe), } - if err := dp.ProbeMap(context.Background()); err != nil { - t.Errorf("unexpected ProbeMap() error: %s", err) + if err := dp.probeMapFn(context.Background()); err != nil { + t.Errorf("unexpected probeMapFn() error: %s", err) } if len(dp.nodes) != 2 || dp.nodes["n1"] == nil || dp.nodes["n2"] == nil { t.Errorf("unexpected nodes: %+v", dp.nodes) @@ -89,8 +89,8 @@ func TestDerpProber(t *testing.T) { IPv4: "1.1.1.1", IPv6: "::1", }) - if err := dp.ProbeMap(context.Background()); err != nil { - t.Errorf("unexpected ProbeMap() error: %s", err) + if err := dp.probeMapFn(context.Background()); err != nil { + t.Errorf("unexpected probeMapFn() error: %s", err) } if len(dp.nodes) != 3 { t.Errorf("unexpected nodes: %+v", dp.nodes) @@ -102,8 +102,8 @@ func TestDerpProber(t *testing.T) { // Remove 2 nodes and check that probes have been destroyed. dm.Regions[0].Nodes = dm.Regions[0].Nodes[:1] - if err := dp.ProbeMap(context.Background()); err != nil { - t.Errorf("unexpected ProbeMap() error: %s", err) + if err := dp.probeMapFn(context.Background()); err != nil { + t.Errorf("unexpected probeMapFn() error: %s", err) } if len(dp.nodes) != 1 { t.Errorf("unexpected nodes: %+v", dp.nodes) diff --git a/prober/http.go b/prober/http.go index 5c3355e46..ac9374a01 100644 --- a/prober/http.go +++ b/prober/http.go @@ -13,14 +13,17 @@ import ( const maxHTTPBody = 4 << 20 // MiB -// HTTP returns a Probe that healthchecks an HTTP URL. +// HTTP returns a ProbeClass that healthchecks an HTTP URL. // -// The ProbeFunc sends a GET request for url, expects an HTTP 200 +// The probe function sends a GET request for url, expects an HTTP 200 // response, and verifies that want is present in the response // body. -func HTTP(url, wantText string) ProbeFunc { - return func(ctx context.Context) error { - return probeHTTP(ctx, url, []byte(wantText)) +func HTTP(url, wantText string) ProbeClass { + return ProbeClass{ + Probe: func(ctx context.Context) error { + return probeHTTP(ctx, url, []byte(wantText)) + }, + Labels: Labels{"class": "http"}, } } diff --git a/prober/prober.go b/prober/prober.go index 35e8b12a3..3cb0efa9a 100644 --- a/prober/prober.go +++ b/prober/prober.go @@ -12,6 +12,7 @@ import ( "fmt" "hash/fnv" "log" + "maps" "math/rand" "sync" "time" @@ -19,10 +20,29 @@ import ( "github.com/prometheus/client_golang/prometheus" ) -// ProbeFunc is a function that probes something and reports whether -// the probe succeeded. The provided context's deadline must be obeyed -// for correct probe scheduling. -type ProbeFunc func(context.Context) error +// ProbeClass defines a probe of a specific type: a probing function that will +// be regularly ran, and metric labels that will be added automatically to all +// probes using this class. +type ProbeClass struct { + // Probe is a function that probes something and reports whether the Probe + // succeeded. The provided context's deadline must be obeyed for correct + // Probe scheduling. + Probe func(context.Context) error + + // Labels defines a set of metric labels that will be added to all metrics + // exposed by this probe class. + Labels Labels + + // Metrics allows a probe class to export custom Metrics. Can be nil. + Metrics func(prometheus.Labels) []prometheus.Metric +} + +// FuncProbe wraps a simple probe function in a ProbeClass. +func FuncProbe(fn func(context.Context) error) ProbeClass { + return ProbeClass{ + Probe: fn, + } +} // a Prober manages a set of probes and keeps track of their results. type Prober struct { @@ -61,10 +81,10 @@ func newForTest(now func() time.Time, newTicker func(time.Duration) ticker) *Pro return p } -// Run executes fun every interval, and exports probe results under probeName. +// Run executes probe class function every interval, and exports probe results under probeName. // // Registering a probe under an already-registered name panics. -func (p *Prober) Run(name string, interval time.Duration, labels map[string]string, fun ProbeFunc) *Probe { +func (p *Prober) Run(name string, interval time.Duration, labels Labels, pc ProbeClass) *Probe { p.mu.Lock() defer p.mu.Unlock() if _, ok := p.probes[name]; ok { @@ -72,6 +92,9 @@ func (p *Prober) Run(name string, interval time.Duration, labels map[string]stri } l := prometheus.Labels{"name": name} + for k, v := range pc.Labels { + l[k] = v + } for k, v := range labels { l[k] = v } @@ -84,10 +107,11 @@ func (p *Prober) Run(name string, interval time.Duration, labels map[string]stri stopped: make(chan struct{}), name: name, - doProbe: fun, + probeClass: pc, interval: interval, initialDelay: initialDelay(name, interval), metrics: prometheus.NewRegistry(), + metricLabels: l, mInterval: prometheus.NewDesc("interval_secs", "Probe interval in seconds", nil, l), mStartTime: prometheus.NewDesc("start_secs", "Latest probe start time (seconds since epoch)", nil, l), mEndTime: prometheus.NewDesc("end_secs", "Latest probe end time (seconds since epoch)", nil, l), @@ -177,7 +201,7 @@ type Probe struct { stopped chan struct{} // closed when shutdown is complete name string - doProbe ProbeFunc + probeClass ProbeClass interval time.Duration initialDelay time.Duration tick ticker @@ -185,14 +209,15 @@ type Probe struct { // metrics is a Prometheus metrics registry for metrics exported by this probe. // Using a separate registry allows cleanly removing metrics exported by this // probe when it gets unregistered. - metrics *prometheus.Registry - mInterval *prometheus.Desc - mStartTime *prometheus.Desc - mEndTime *prometheus.Desc - mLatency *prometheus.Desc - mResult *prometheus.Desc - mAttempts *prometheus.CounterVec - mSeconds *prometheus.CounterVec + metrics *prometheus.Registry + metricLabels prometheus.Labels + mInterval *prometheus.Desc + mStartTime *prometheus.Desc + mEndTime *prometheus.Desc + mLatency *prometheus.Desc + mResult *prometheus.Desc + mAttempts *prometheus.CounterVec + mSeconds *prometheus.CounterVec mu sync.Mutex start time.Time // last time doProbe started @@ -268,7 +293,7 @@ func (p *Probe) run() { ctx, cancel := context.WithTimeout(p.ctx, timeout) defer cancel() - err := p.doProbe(ctx) + err := p.probeClass.Probe(ctx) p.recordEnd(start, err) if err != nil { log.Printf("probe %s: %v", p.name, err) @@ -349,6 +374,11 @@ func (p *Probe) Describe(ch chan<- *prometheus.Desc) { ch <- p.mLatency p.mAttempts.Describe(ch) p.mSeconds.Describe(ch) + if p.probeClass.Metrics != nil { + for _, m := range p.probeClass.Metrics(p.metricLabels) { + ch <- m.Desc() + } + } } // Collect implements prometheus.Collector. @@ -373,6 +403,11 @@ func (p *Probe) Collect(ch chan<- prometheus.Metric) { } p.mAttempts.Collect(ch) p.mSeconds.Collect(ch) + if p.probeClass.Metrics != nil { + for _, m := range p.probeClass.Metrics(p.metricLabels) { + ch <- m + } + } } // ticker wraps a time.Ticker in a way that can be faked for tests. @@ -401,3 +436,12 @@ func initialDelay(seed string, interval time.Duration) time.Duration { r := rand.New(rand.NewSource(int64(h.Sum64()))).Float64() return time.Duration(float64(interval) * r) } + +// Labels is a set of metric labels used by a prober. +type Labels map[string]string + +func (l Labels) With(k, v string) Labels { + new := maps.Clone(l) + new[k] = v + return new +} diff --git a/prober/prober_test.go b/prober/prober_test.go index 0eedf01cc..63d4236ed 100644 --- a/prober/prober_test.go +++ b/prober/prober_test.go @@ -51,10 +51,10 @@ func TestProberTiming(t *testing.T) { } } - p.Run("test-probe", probeInterval, nil, func(context.Context) error { + p.Run("test-probe", probeInterval, nil, FuncProbe(func(context.Context) error { invoked <- struct{}{} return nil - }) + })) waitActiveProbes(t, p, clk, 1) @@ -93,10 +93,10 @@ func TestProberTimingSpread(t *testing.T) { } } - probe := p.Run("test-spread-probe", probeInterval, nil, func(context.Context) error { + probe := p.Run("test-spread-probe", probeInterval, nil, FuncProbe(func(context.Context) error { invoked <- struct{}{} return nil - }) + })) waitActiveProbes(t, p, clk, 1) @@ -156,12 +156,12 @@ func TestProberRun(t *testing.T) { var probes []*Probe for i := 0; i < startingProbes; i++ { - probes = append(probes, p.Run(fmt.Sprintf("probe%d", i), probeInterval, nil, func(context.Context) error { + probes = append(probes, p.Run(fmt.Sprintf("probe%d", i), probeInterval, nil, FuncProbe(func(context.Context) error { mu.Lock() defer mu.Unlock() cnt++ return nil - })) + }))) } checkCnt := func(want int) { @@ -207,13 +207,13 @@ func TestPrometheus(t *testing.T) { p := newForTest(clk.Now, clk.NewTicker).WithMetricNamespace("probe") var succeed atomic.Bool - p.Run("testprobe", probeInterval, map[string]string{"label": "value"}, func(context.Context) error { + p.Run("testprobe", probeInterval, map[string]string{"label": "value"}, FuncProbe(func(context.Context) error { clk.Advance(aFewMillis) if succeed.Load() { return nil } return errors.New("failing, as instructed by test") - }) + })) waitActiveProbes(t, p, clk, 1) @@ -274,14 +274,14 @@ func TestOnceMode(t *testing.T) { clk := newFakeTime() p := newForTest(clk.Now, clk.NewTicker).WithOnce(true) - p.Run("probe1", probeInterval, nil, func(context.Context) error { return nil }) - p.Run("probe2", probeInterval, nil, func(context.Context) error { return fmt.Errorf("error2") }) - p.Run("probe3", probeInterval, nil, func(context.Context) error { - p.Run("probe4", probeInterval, nil, func(context.Context) error { + p.Run("probe1", probeInterval, nil, FuncProbe(func(context.Context) error { return nil })) + p.Run("probe2", probeInterval, nil, FuncProbe(func(context.Context) error { return fmt.Errorf("error2") })) + p.Run("probe3", probeInterval, nil, FuncProbe(func(context.Context) error { + p.Run("probe4", probeInterval, nil, FuncProbe(func(context.Context) error { return fmt.Errorf("error4") - }) + })) return nil - }) + })) p.Wait() wantCount := 4 diff --git a/prober/tcp.go b/prober/tcp.go index 9f35a2697..b7e55aced 100644 --- a/prober/tcp.go +++ b/prober/tcp.go @@ -12,9 +12,12 @@ import ( // TCP returns a Probe that healthchecks a TCP endpoint. // // The ProbeFunc reports whether it can successfully connect to addr. -func TCP(addr string) ProbeFunc { - return func(ctx context.Context) error { - return probeTCP(ctx, addr) +func TCP(addr string) ProbeClass { + return ProbeClass{ + Probe: func(ctx context.Context) error { + return probeTCP(ctx, addr) + }, + Labels: Labels{"class": "tcp"}, } } diff --git a/prober/tls.go b/prober/tls.go index db25c9fd5..d8850d3e7 100644 --- a/prober/tls.go +++ b/prober/tls.go @@ -27,22 +27,28 @@ const expiresSoon = 7 * 24 * time.Hour // 7 days from now // The ProbeFunc connects to a hostPort (host:port string), does a TLS // handshake, verifies that the hostname matches the presented certificate, // checks certificate validity time and OCSP revocation status. -func TLS(hostPort string) ProbeFunc { - return func(ctx context.Context) error { - certDomain, _, err := net.SplitHostPort(hostPort) - if err != nil { - return err - } - return probeTLS(ctx, certDomain, hostPort) +func TLS(hostPort string) ProbeClass { + return ProbeClass{ + Probe: func(ctx context.Context) error { + certDomain, _, err := net.SplitHostPort(hostPort) + if err != nil { + return err + } + return probeTLS(ctx, certDomain, hostPort) + }, + Labels: Labels{"class": "tls"}, } } // TLSWithIP is like TLS, but dials the provided dialAddr instead // of using DNS resolution. The certDomain is the expected name in // the cert (and the SNI name to send). -func TLSWithIP(certDomain string, dialAddr netip.AddrPort) ProbeFunc { - return func(ctx context.Context) error { - return probeTLS(ctx, certDomain, dialAddr.String()) +func TLSWithIP(certDomain string, dialAddr netip.AddrPort) ProbeClass { + return ProbeClass{ + Probe: func(ctx context.Context) error { + return probeTLS(ctx, certDomain, dialAddr.String()) + }, + Labels: Labels{"class": "tls"}, } }