From c153e6ae2fe4b48af82f6e8e6d934f9f5237dad9 Mon Sep 17 00:00:00 2001 From: Anton Tolchanov Date: Mon, 3 Apr 2023 11:35:12 +0100 Subject: [PATCH] prober: migrate to Prometheus metric library This provides an example of using native Prometheus metrics with tsweb. Prober library seems to be the only user of PrometheusVar, so I am removing support for it in tsweb. Updates https://github.com/tailscale/corp/issues/10205 Signed-off-by: Anton Tolchanov --- cmd/derpprobe/derpprobe.go | 4 +- prober/prober.go | 169 ++++++++++++++++--------------------- prober/prober_test.go | 150 ++++++++++---------------------- tsweb/tsweb.go | 12 --- tsweb/tsweb_test.go | 17 ---- 5 files changed, 122 insertions(+), 230 deletions(-) diff --git a/cmd/derpprobe/derpprobe.go b/cmd/derpprobe/derpprobe.go index 3a295250b..6217eece3 100644 --- a/cmd/derpprobe/derpprobe.go +++ b/cmd/derpprobe/derpprobe.go @@ -5,7 +5,6 @@ package main import ( - "expvar" "flag" "fmt" "html" @@ -30,7 +29,7 @@ var ( func main() { flag.Parse() - p := prober.New().WithSpread(*spread).WithOnce(*probeOnce) + p := prober.New().WithSpread(*spread).WithOnce(*probeOnce).WithMetricNamespace("derpprobe") dp, err := prober.DERP(p, *derpMapURL, *interval, *interval, *interval) if err != nil { log.Fatal(err) @@ -53,7 +52,6 @@ func main() { mux := http.NewServeMux() tsweb.Debugger(mux) - expvar.Publish("derpprobe", p.Expvar()) mux.HandleFunc("/", http.HandlerFunc(serveFunc(p))) log.Fatal(http.ListenAndServe(*listen, mux)) } diff --git a/prober/prober.go b/prober/prober.go index 4a4fdf8b3..06891fe44 100644 --- a/prober/prober.go +++ b/prober/prober.go @@ -8,18 +8,15 @@ package prober import ( "context" - "encoding/json" "errors" - "expvar" "fmt" "hash/fnv" - "io" "log" "math/rand" - "sort" - "strings" "sync" "time" + + "github.com/prometheus/client_golang/prometheus" ) // ProbeFunc is a function that probes something and reports whether @@ -42,6 +39,9 @@ type Prober struct { mu sync.Mutex // protects all following fields probes map[string]*Probe + + namespace string + metrics *prometheus.Registry } // New returns a new Prober. @@ -50,21 +50,15 @@ func New() *Prober { } func newForTest(now func() time.Time, newTicker func(time.Duration) ticker) *Prober { - return &Prober{ + p := &Prober{ now: now, newTicker: newTicker, probes: map[string]*Probe{}, + metrics: prometheus.NewRegistry(), + namespace: "prober", } -} - -// Expvar returns the metrics for running probes. -func (p *Prober) Expvar() expvar.Var { - return varExporter{p} -} - -// ProbeInfo returns information about most recent probe runs. -func (p *Prober) ProbeInfo() map[string]ProbeInfo { - return varExporter{p}.probeInfo() + prometheus.DefaultRegisterer.MustRegister(p.metrics) + return p } // Run executes fun every interval, and exports probe results under probeName. @@ -77,6 +71,11 @@ func (p *Prober) Run(name string, interval time.Duration, labels map[string]stri panic(fmt.Sprintf("probe named %q already registered", name)) } + l := prometheus.Labels{"name": name} + for k, v := range labels { + l[k] = v + } + ctx, cancel := context.WithCancel(context.Background()) probe := &Probe{ prober: p, @@ -88,8 +87,17 @@ func (p *Prober) Run(name string, interval time.Duration, labels map[string]stri doProbe: fun, interval: interval, initialDelay: initialDelay(name, interval), - labels: labels, + metrics: prometheus.NewRegistry(), + mInterval: prometheus.NewDesc("interval_secs", "Probe interval in seconds", nil, l), + mStartTime: prometheus.NewDesc("start_secs", "Latest probe start time (seconds since epoch)", nil, l), + mEndTime: prometheus.NewDesc("end_secs", "Latest probe end time (seconds since epoch)", nil, l), + mLatency: prometheus.NewDesc("latency_millis", "Latest probe latency (ms)", nil, l), + mResult: prometheus.NewDesc("result", "Latest probe result (1 = success, 0 = failure)", nil, l), } + + prometheus.WrapRegistererWithPrefix(p.namespace+"_", p.metrics).MustRegister(probe.metrics) + probe.metrics.MustRegister(probe) + p.probes[name] = probe go probe.loop() return probe @@ -98,6 +106,8 @@ func (p *Prober) Run(name string, interval time.Duration, labels map[string]stri func (p *Prober) unregister(probe *Probe) { p.mu.Lock() defer p.mu.Unlock() + probe.metrics.Unregister(probe) + p.metrics.Unregister(probe.metrics) name := probe.name delete(p.probes, name) } @@ -116,6 +126,12 @@ func (p *Prober) WithOnce(s bool) *Prober { return p } +// WithMetricNamespace allows changing metric name prefix from the default `prober`. +func (p *Prober) WithMetricNamespace(n string) *Prober { + p.namespace = n + return p +} + // Wait blocks until all probes have finished execution. It should typically // be used with the `once` mode to wait for probes to finish before collecting // their results. @@ -159,7 +175,16 @@ type Probe struct { interval time.Duration initialDelay time.Duration tick ticker - labels map[string]string + + // metrics is a Prometheus metrics registry for metrics exported by this probe. + // Using a separate registry allows cleanly removing metrics exported by this + // probe when it gets unregistered. + metrics *prometheus.Registry + mInterval *prometheus.Desc + mStartTime *prometheus.Desc + mEndTime *prometheus.Desc + mLatency *prometheus.Desc + mResult *prometheus.Desc mu sync.Mutex start time.Time // last time doProbe started @@ -264,35 +289,28 @@ func (p *Probe) recordEnd(start time.Time, err error) { } } -type varExporter struct { - p *Prober -} - -// ProbeInfo is the state of a Probe. Used in expvar-format debug -// data. +// ProbeInfo is the state of a Probe. type ProbeInfo struct { - Labels map[string]string Start time.Time End time.Time - Latency string // as a string because time.Duration doesn't encode readably to JSON + Latency string Result bool Error string } -func (v varExporter) probeInfo() map[string]ProbeInfo { +func (p *Prober) ProbeInfo() map[string]ProbeInfo { out := map[string]ProbeInfo{} - v.p.mu.Lock() - probes := make([]*Probe, 0, len(v.p.probes)) - for _, probe := range v.p.probes { + p.mu.Lock() + probes := make([]*Probe, 0, len(p.probes)) + for _, probe := range p.probes { probes = append(probes, probe) } - v.p.mu.Unlock() + p.mu.Unlock() for _, probe := range probes { probe.mu.Lock() inf := ProbeInfo{ - Labels: probe.labels, Start: probe.start, End: probe.end, Result: probe.succeeded, @@ -309,71 +327,34 @@ func (v varExporter) probeInfo() map[string]ProbeInfo { return out } -// String implements expvar.Var, returning the prober's state as an -// encoded JSON map of probe name to its ProbeInfo. -func (v varExporter) String() string { - bs, err := json.Marshal(v.probeInfo()) - if err != nil { - return fmt.Sprintf(`{"error": %q}`, err) - } - return string(bs) +// Describe implements prometheus.Collector. +func (p *Probe) Describe(ch chan<- *prometheus.Desc) { + ch <- p.mInterval + ch <- p.mStartTime + ch <- p.mEndTime + ch <- p.mResult + ch <- p.mLatency } -// WritePrometheus writes the state of all probes to w. -// -// For each probe, WritePrometheus exports 5 variables: -// - _interval_secs, how frequently the probe runs. -// - _start_secs, when the probe last started running, in seconds since epoch. -// - _end_secs, when the probe last finished running, in seconds since epoch. -// - _latency_millis, how long the last probe cycle took, in -// milliseconds. This is just (end_secs-start_secs) in an easier to -// graph form. -// - _result, 1 if the last probe succeeded, 0 if it failed. -// -// Each probe has a set of static key/value labels (defined once at -// probe creation), which are added as Prometheus metric labels to -// that probe's variables. -func (v varExporter) WritePrometheus(w io.Writer, prefix string) { - v.p.mu.Lock() - probes := make([]*Probe, 0, len(v.p.probes)) - for _, probe := range v.p.probes { - probes = append(probes, probe) +// Collect implements prometheus.Collector. +func (p *Probe) Collect(ch chan<- prometheus.Metric) { + p.mu.Lock() + defer p.mu.Unlock() + ch <- prometheus.MustNewConstMetric(p.mInterval, prometheus.GaugeValue, p.interval.Seconds()) + if !p.start.IsZero() { + ch <- prometheus.MustNewConstMetric(p.mStartTime, prometheus.GaugeValue, float64(p.start.Unix())) } - v.p.mu.Unlock() - - sort.Slice(probes, func(i, j int) bool { - return probes[i].name < probes[j].name - }) - for _, probe := range probes { - probe.mu.Lock() - keys := make([]string, 0, len(probe.labels)) - for k := range probe.labels { - keys = append(keys, k) - } - sort.Strings(keys) - var sb strings.Builder - fmt.Fprintf(&sb, "name=%q", probe.name) - for _, k := range keys { - fmt.Fprintf(&sb, ",%s=%q", k, probe.labels[k]) - } - labels := sb.String() - - fmt.Fprintf(w, "%s_interval_secs{%s} %f\n", prefix, labels, probe.interval.Seconds()) - if !probe.start.IsZero() { - fmt.Fprintf(w, "%s_start_secs{%s} %d\n", prefix, labels, probe.start.Unix()) - } - if !probe.end.IsZero() { - fmt.Fprintf(w, "%s_end_secs{%s} %d\n", prefix, labels, probe.end.Unix()) - if probe.latency > 0 { - fmt.Fprintf(w, "%s_latency_millis{%s} %d\n", prefix, labels, probe.latency.Milliseconds()) - } - if probe.succeeded { - fmt.Fprintf(w, "%s_result{%s} 1\n", prefix, labels) - } else { - fmt.Fprintf(w, "%s_result{%s} 0\n", prefix, labels) - } - } - probe.mu.Unlock() + if p.end.IsZero() { + return + } + ch <- prometheus.MustNewConstMetric(p.mEndTime, prometheus.GaugeValue, float64(p.end.Unix())) + if p.succeeded { + ch <- prometheus.MustNewConstMetric(p.mResult, prometheus.GaugeValue, 1) + } else { + ch <- prometheus.MustNewConstMetric(p.mResult, prometheus.GaugeValue, 0) + } + if p.latency > 0 { + ch <- prometheus.MustNewConstMetric(p.mLatency, prometheus.GaugeValue, float64(p.latency.Milliseconds())) } } diff --git a/prober/prober_test.go b/prober/prober_test.go index 2535884e7..0eedf01cc 100644 --- a/prober/prober_test.go +++ b/prober/prober_test.go @@ -4,9 +4,7 @@ package prober import ( - "bytes" "context" - "encoding/json" "errors" "fmt" "strings" @@ -15,9 +13,8 @@ import ( "testing" "time" - "github.com/google/go-cmp/cmp" + "github.com/prometheus/client_golang/prometheus/testutil" "tailscale.com/tstest" - "tailscale.com/tsweb" ) const ( @@ -187,6 +184,9 @@ func TestProberRun(t *testing.T) { checkCnt(startingProbes) clk.Advance(probeInterval + halfProbeInterval) checkCnt(startingProbes) + if c, err := testutil.GatherAndCount(p.metrics, "prober_result"); c != startingProbes || err != nil { + t.Fatalf("expected %d prober_result metrics; got %d (error %s)", startingProbes, c, err) + } keep := startingProbes / 2 @@ -197,69 +197,14 @@ func TestProberRun(t *testing.T) { clk.Advance(probeInterval) checkCnt(keep) -} - -func TestExpvar(t *testing.T) { - clk := newFakeTime() - p := newForTest(clk.Now, clk.NewTicker) - - var succeed atomic.Bool - p.Run("probe", probeInterval, map[string]string{"label": "value"}, func(context.Context) error { - clk.Advance(aFewMillis) - if succeed.Load() { - return nil - } - return errors.New("failing, as instructed by test") - }) - - waitActiveProbes(t, p, clk, 1) - - check := func(name string, want ProbeInfo) { - t.Helper() - err := tstest.WaitFor(convergenceTimeout, func() error { - vars := probeExpvar(t, p) - if got, want := len(vars), 1; got != want { - return fmt.Errorf("wrong probe count in expvar, got %d want %d", got, want) - } - for k, v := range vars { - if k != name { - return fmt.Errorf("wrong probe name in expvar, got %q want %q", k, name) - } - if diff := cmp.Diff(v, &want); diff != "" { - return fmt.Errorf("wrong probe stats (-got+want):\n%s", diff) - } - } - return nil - }) - if err != nil { - t.Fatal(err) - } + if c, err := testutil.GatherAndCount(p.metrics, "prober_result"); c != keep || err != nil { + t.Fatalf("expected %d prober_result metrics; got %d (error %s)", keep, c, err) } - - check("probe", ProbeInfo{ - Labels: map[string]string{"label": "value"}, - Start: epoch, - End: epoch.Add(aFewMillis), - Result: false, - Error: "failing, as instructed by test", - }) - - succeed.Store(true) - clk.Advance(probeInterval + halfProbeInterval) - - st := epoch.Add(probeInterval + halfProbeInterval + aFewMillis) - check("probe", ProbeInfo{ - Labels: map[string]string{"label": "value"}, - Start: st, - End: st.Add(aFewMillis), - Latency: aFewMillis.String(), - Result: true, - }) } func TestPrometheus(t *testing.T) { clk := newFakeTime() - p := newForTest(clk.Now, clk.NewTicker) + p := newForTest(clk.Now, clk.NewTicker).WithMetricNamespace("probe") var succeed atomic.Bool p.Run("testprobe", probeInterval, map[string]string{"label": "value"}, func(context.Context) error { @@ -273,18 +218,22 @@ func TestPrometheus(t *testing.T) { waitActiveProbes(t, p, clk, 1) err := tstest.WaitFor(convergenceTimeout, func() error { - var b bytes.Buffer - p.Expvar().(tsweb.PrometheusVar).WritePrometheus(&b, "probe") - want := strings.TrimSpace(fmt.Sprintf(` -probe_interval_secs{name="testprobe",label="value"} %f -probe_start_secs{name="testprobe",label="value"} %d -probe_end_secs{name="testprobe",label="value"} %d -probe_result{name="testprobe",label="value"} 0 -`, probeInterval.Seconds(), epoch.Unix(), epoch.Add(aFewMillis).Unix())) - if diff := cmp.Diff(strings.TrimSpace(b.String()), want); diff != "" { - return fmt.Errorf("wrong probe stats (-got+want):\n%s", diff) - } - return nil + want := fmt.Sprintf(` +# HELP probe_interval_secs Probe interval in seconds +# TYPE probe_interval_secs gauge +probe_interval_secs{label="value",name="testprobe"} %f +# HELP probe_start_secs Latest probe start time (seconds since epoch) +# TYPE probe_start_secs gauge +probe_start_secs{label="value",name="testprobe"} %d +# HELP probe_end_secs Latest probe end time (seconds since epoch) +# TYPE probe_end_secs gauge +probe_end_secs{label="value",name="testprobe"} %d +# HELP probe_result Latest probe result (1 = success, 0 = failure) +# TYPE probe_result gauge +probe_result{label="value",name="testprobe"} 0 +`, probeInterval.Seconds(), epoch.Unix(), epoch.Add(aFewMillis).Unix()) + return testutil.GatherAndCompare(p.metrics, strings.NewReader(want), + "probe_interval_secs", "probe_start_secs", "probe_end_secs", "probe_result") }) if err != nil { t.Fatal(err) @@ -294,21 +243,27 @@ probe_result{name="testprobe",label="value"} 0 clk.Advance(probeInterval + halfProbeInterval) err = tstest.WaitFor(convergenceTimeout, func() error { - var b bytes.Buffer - p.Expvar().(tsweb.PrometheusVar).WritePrometheus(&b, "probe") start := epoch.Add(probeInterval + halfProbeInterval) end := start.Add(aFewMillis) - want := strings.TrimSpace(fmt.Sprintf(` -probe_interval_secs{name="testprobe",label="value"} %f -probe_start_secs{name="testprobe",label="value"} %d -probe_end_secs{name="testprobe",label="value"} %d -probe_latency_millis{name="testprobe",label="value"} %d -probe_result{name="testprobe",label="value"} 1 -`, probeInterval.Seconds(), start.Unix(), end.Unix(), aFewMillis.Milliseconds())) - if diff := cmp.Diff(strings.TrimSpace(b.String()), want); diff != "" { - return fmt.Errorf("wrong probe stats (-got+want):\n%s", diff) - } - return nil + want := fmt.Sprintf(` +# HELP probe_interval_secs Probe interval in seconds +# TYPE probe_interval_secs gauge +probe_interval_secs{label="value",name="testprobe"} %f +# HELP probe_start_secs Latest probe start time (seconds since epoch) +# TYPE probe_start_secs gauge +probe_start_secs{label="value",name="testprobe"} %d +# HELP probe_end_secs Latest probe end time (seconds since epoch) +# TYPE probe_end_secs gauge +probe_end_secs{label="value",name="testprobe"} %d +# HELP probe_latency_millis Latest probe latency (ms) +# TYPE probe_latency_millis gauge +probe_latency_millis{label="value",name="testprobe"} %d +# HELP probe_result Latest probe result (1 = success, 0 = failure) +# TYPE probe_result gauge +probe_result{label="value",name="testprobe"} 1 +`, probeInterval.Seconds(), start.Unix(), end.Unix(), aFewMillis.Milliseconds()) + return testutil.GatherAndCompare(p.metrics, strings.NewReader(want), + "probe_interval_secs", "probe_start_secs", "probe_end_secs", "probe_latency_millis", "probe_result") }) if err != nil { t.Fatal(err) @@ -329,13 +284,10 @@ func TestOnceMode(t *testing.T) { }) p.Wait() - info := p.ProbeInfo() - if len(info) != 4 { - t.Errorf("expected 4 probe results, got %+v", info) - } - for _, p := range info { - if p.End.IsZero() { - t.Errorf("expected all probes to finish; got %+v", p) + wantCount := 4 + for _, metric := range []string{"prober_result", "prober_end_secs"} { + if c, err := testutil.GatherAndCount(p.metrics, metric); c != wantCount || err != nil { + t.Fatalf("expected %d %s metrics; got %d (error %s)", wantCount, metric, c, err) } } } @@ -433,16 +385,6 @@ func (t *fakeTime) activeTickers() (count int) { return } -func probeExpvar(t *testing.T, p *Prober) map[string]*ProbeInfo { - t.Helper() - s := p.Expvar().String() - ret := map[string]*ProbeInfo{} - if err := json.Unmarshal([]byte(s), &ret); err != nil { - t.Fatalf("expvar json decode failed: %v", err) - } - return ret -} - func waitActiveProbes(t *testing.T, p *Prober, clk *fakeTime, want int) { t.Helper() err := tstest.WaitFor(convergenceTimeout, func() error { diff --git a/tsweb/tsweb.go b/tsweb/tsweb.go index 049765773..3739d18ea 100644 --- a/tsweb/tsweb.go +++ b/tsweb/tsweb.go @@ -448,15 +448,6 @@ func Error(code int, msg string, err error) HTTPError { return HTTPError{Code: code, Msg: msg, Err: err} } -// PrometheusVar is a value that knows how to format itself into -// Prometheus metric syntax. -type PrometheusVar interface { - // WritePrometheus writes the value of the var to w, in Prometheus - // metric syntax. All variables names written out must start with - // prefix (or write out a single variable named exactly prefix) - WritePrometheus(w io.Writer, prefix string) -} - // WritePrometheusExpvar writes kv to w in Prometheus metrics format. // // See VarzHandler for conventions. This is exported primarily for @@ -510,9 +501,6 @@ func writePromExpVar(w io.Writer, prefix string, kv expvar.KeyValue) { name, typ, label := prometheusMetric(prefix, key) switch v := kv.Value.(type) { - case PrometheusVar: - v.WritePrometheus(w, name) - return case *expvar.Int: if typ == "" { typ = "counter" diff --git a/tsweb/tsweb_test.go b/tsweb/tsweb_test.go index 37522029a..cf67d474f 100644 --- a/tsweb/tsweb_test.go +++ b/tsweb/tsweb_test.go @@ -9,7 +9,6 @@ import ( "errors" "expvar" "fmt" - "io" "net" "net/http" "net/http/httptest" @@ -570,12 +569,6 @@ foo_totalY 4 expvar.Func(func() any { return 123 }), "num_goroutines 123\n", }, - { - "var_that_exports_itself", - "custom_var", - promWriter{}, - "custom_var_value 42\n", - }, { "string_version_var", "foo_version", @@ -694,16 +687,6 @@ func (a expvarAdapter2) PrometheusMetricsReflectRoot() any { return a.st } -type promWriter struct{} - -func (promWriter) WritePrometheus(w io.Writer, prefix string) { - fmt.Fprintf(w, "%s_value 42\n", prefix) -} - -func (promWriter) String() string { - return "" -} - func TestAcceptsEncoding(t *testing.T) { tests := []struct { in, enc string