diff --git a/cmd/stunstamp/stunstamp.go b/cmd/stunstamp/stunstamp.go index 605901775..0ec0efeb2 100644 --- a/cmd/stunstamp/stunstamp.go +++ b/cmd/stunstamp/stunstamp.go @@ -89,13 +89,19 @@ func (t timestampSource) String() string { } } -type result struct { - at time.Time +// resultKey contains the stable dimensions and their values for a given +// timeseries, i.e. not time and not rtt/timeout. +type resultKey struct { meta nodeMeta timestampSource timestampSource connStability connStability dstPort int - rtt *time.Duration // nil signifies failure, e.g. timeout +} + +type result struct { + key resultKey + at time.Time + rtt *time.Duration // nil signifies failure, e.g. timeout } func measureRTT(conn io.ReadWriteCloser, dst *net.UDPAddr) (rtt time.Duration, err error) { @@ -149,6 +155,10 @@ type nodeMeta struct { type measureFn func(conn io.ReadWriteCloser, dst *net.UDPAddr) (rtt time.Duration, err error) +// probe measures STUN round trip time for the node described by meta over +// conn against dstPort. It may return a nil duration and nil error if the +// STUN request timed out. A non-nil error indicates an unrecoverable or +// non-temporary error. func probe(meta nodeMeta, conn io.ReadWriteCloser, fn measureFn, dstPort int) (*time.Duration, error) { ua := &net.UDPAddr{ IP: net.IP(meta.addr.AsSlice()), @@ -162,10 +172,15 @@ func probe(meta nodeMeta, conn io.ReadWriteCloser, fn measureFn, dstPort int) (* log.Printf("temp error measuring RTT to %s(%s): %v", meta.hostname, ua.String(), err) return nil, nil } + return nil, err } return &rtt, nil } +// nodeMetaFromDERPMap parses the provided DERP map in order to update nodeMeta +// in the provided nodeMetaByAddr. It returns a slice of nodeMeta containing +// the nodes that are no longer seen in the DERP map, but were previously held +// in nodeMetaByAddr. func nodeMetaFromDERPMap(dm *tailcfg.DERPMap, nodeMetaByAddr map[netip.Addr]nodeMeta, ipv6 bool) (stale []nodeMeta, err error) { // Parse the new derp map before making any state changes in nodeMetaByAddr. // If parse fails we just stick with the old state. @@ -271,10 +286,12 @@ func probeNodes(nodeMetaByAddr map[netip.Addr]nodeMeta, stableConns map[netip.Ad doProbe := func(conn io.ReadWriteCloser, meta nodeMeta, source timestampSource, dstPort int) { defer wg.Done() r := result{ - at: at, - meta: meta, - timestampSource: source, - dstPort: dstPort, + key: resultKey{ + meta: meta, + timestampSource: source, + dstPort: dstPort, + }, + at: at, } if conn == nil { var err error @@ -293,7 +310,7 @@ func probeNodes(nodeMetaByAddr map[netip.Addr]nodeMeta, stableConns map[netip.Ad } defer conn.Close() } else { - r.connStability = stableConn + r.key.connStability = stableConn } fn := measureRTT if source == timestampSourceKernel { @@ -373,7 +390,12 @@ const ( stableConn connStability = true ) -func timeSeriesLabels(meta nodeMeta, instance string, source timestampSource, stability connStability, dstPort int) []prompb.Label { +const ( + rttMetricName = "stunstamp_derp_stun_rtt_ns" + timeoutsMetricName = "stunstamp_derp_stun_timeouts_total" +) + +func timeSeriesLabels(metricName string, meta nodeMeta, instance string, source timestampSource, stability connStability, dstPort int) []prompb.Label { addressFamily := "ipv4" if meta.addr.Is6() { addressFamily = "ipv6" @@ -409,7 +431,7 @@ func timeSeriesLabels(meta nodeMeta, instance string, source timestampSource, st }) labels = append(labels, prompb.Label{ Name: "__name__", - Value: "stunstamp_derp_stun_rtt_ns", + Value: metricName, }) labels = append(labels, prompb.Label{ Name: "timestamp_source", @@ -443,20 +465,36 @@ func staleMarkersFromNodeMeta(stale []nodeMeta, instance string, dstPorts []int) }, } staleMarkers = append(staleMarkers, prompb.TimeSeries{ - Labels: timeSeriesLabels(s, instance, timestampSourceUserspace, unstableConn, dstPort), + Labels: timeSeriesLabels(rttMetricName, s, instance, timestampSourceUserspace, unstableConn, dstPort), Samples: samples, }) staleMarkers = append(staleMarkers, prompb.TimeSeries{ - Labels: timeSeriesLabels(s, instance, timestampSourceUserspace, stableConn, dstPort), + Labels: timeSeriesLabels(rttMetricName, s, instance, timestampSourceUserspace, stableConn, dstPort), + Samples: samples, + }) + staleMarkers = append(staleMarkers, prompb.TimeSeries{ + Labels: timeSeriesLabels(timeoutsMetricName, s, instance, timestampSourceUserspace, unstableConn, dstPort), + Samples: samples, + }) + staleMarkers = append(staleMarkers, prompb.TimeSeries{ + Labels: timeSeriesLabels(timeoutsMetricName, s, instance, timestampSourceUserspace, stableConn, dstPort), Samples: samples, }) if supportsKernelTS() { staleMarkers = append(staleMarkers, prompb.TimeSeries{ - Labels: timeSeriesLabels(s, instance, timestampSourceKernel, unstableConn, dstPort), + Labels: timeSeriesLabels(rttMetricName, s, instance, timestampSourceKernel, unstableConn, dstPort), Samples: samples, }) staleMarkers = append(staleMarkers, prompb.TimeSeries{ - Labels: timeSeriesLabels(s, instance, timestampSourceKernel, stableConn, dstPort), + Labels: timeSeriesLabels(rttMetricName, s, instance, timestampSourceKernel, stableConn, dstPort), + Samples: samples, + }) + staleMarkers = append(staleMarkers, prompb.TimeSeries{ + Labels: timeSeriesLabels(timeoutsMetricName, s, instance, timestampSourceKernel, unstableConn, dstPort), + Samples: samples, + }) + staleMarkers = append(staleMarkers, prompb.TimeSeries{ + Labels: timeSeriesLabels(timeoutsMetricName, s, instance, timestampSourceKernel, stableConn, dstPort), Samples: samples, }) } @@ -465,25 +503,47 @@ func staleMarkersFromNodeMeta(stale []nodeMeta, instance string, dstPorts []int) return staleMarkers } -func resultToPromTimeSeries(r result, instance string) prompb.TimeSeries { - labels := timeSeriesLabels(r.meta, instance, r.timestampSource, r.connStability, r.dstPort) - samples := make([]prompb.Sample, 1) - samples[0].Timestamp = r.at.UnixMilli() - if r.rtt != nil { - samples[0].Value = float64(*r.rtt) - } else { - samples[0].Value = math.NaN() - // TODO: timeout counter +// resultsToPromTimeSeries returns a slice of prometheus TimeSeries for the +// provided results and instance. timeouts is updated based on results, i.e. +// all result.key's are added to timeouts if they do not exist, and removed +// from timeouts if they are not present in results. +func resultsToPromTimeSeries(results []result, instance string, timeouts map[resultKey]uint64) []prompb.TimeSeries { + all := make([]prompb.TimeSeries, 0, len(results)*2) + seenKeys := make(map[resultKey]bool) + for _, r := range results { + timeoutsCount := timeouts[r.key] // a non-existent key will return a zero val + seenKeys[r.key] = true + rttLabels := timeSeriesLabels(rttMetricName, r.key.meta, instance, r.key.timestampSource, r.key.connStability, r.key.dstPort) + rttSamples := make([]prompb.Sample, 1) + rttSamples[0].Timestamp = r.at.UnixMilli() + if r.rtt != nil { + rttSamples[0].Value = float64(*r.rtt) + } else { + rttSamples[0].Value = math.NaN() + timeoutsCount++ + } + rttTS := prompb.TimeSeries{ + Labels: rttLabels, + Samples: rttSamples, + } + all = append(all, rttTS) + timeouts[r.key] = timeoutsCount + timeoutsLabels := timeSeriesLabels(timeoutsMetricName, r.key.meta, instance, r.key.timestampSource, r.key.connStability, r.key.dstPort) + timeoutsSamples := make([]prompb.Sample, 1) + timeoutsSamples[0].Timestamp = r.at.UnixMilli() + timeoutsSamples[0].Value = float64(timeoutsCount) + timeoutsTS := prompb.TimeSeries{ + Labels: timeoutsLabels, + Samples: timeoutsSamples, + } + all = append(all, timeoutsTS) } - ts := prompb.TimeSeries{ - Labels: labels, - Samples: samples, + for k := range timeouts { + if !seenKeys[k] { + delete(timeouts, k) + } } - slices.SortFunc(ts.Labels, func(a, b prompb.Label) int { - // prometheus remote-write spec requires lexicographically sorted label names - return cmp.Compare(a.Name, b.Name) - }) - return ts + return all } type remoteWriteClient struct { @@ -719,6 +779,10 @@ CREATE TABLE IF NOT EXISTS rtt(at_unix INT, region_id INT, hostname TEXT, af INT // comes into play. stableConns := make(map[netip.Addr]map[int][2]io.ReadWriteCloser) + // timeouts holds counts of timeout events. Values are persisted for the + // lifetime of the related node in the DERP map. + timeouts := make(map[resultKey]uint64) + derpMapTicker := time.NewTicker(time.Minute * 5) defer derpMapTicker.Stop() probeTicker := time.NewTicker(*flagInterval) @@ -744,10 +808,7 @@ CREATE TABLE IF NOT EXISTS rtt(at_unix INT, region_id INT, hostname TEXT, af INT shutdown() return } - ts := make([]prompb.TimeSeries, 0, len(results)) - for _, r := range results { - ts = append(ts, resultToPromTimeSeries(r, *flagInstance)) - } + ts := resultsToPromTimeSeries(results, *flagInstance, timeouts) select { case tsCh <- ts: default: @@ -766,11 +827,11 @@ CREATE TABLE IF NOT EXISTS rtt(at_unix INT, region_id INT, hostname TEXT, af INT } for _, result := range results { af := 4 - if result.meta.addr.Is6() { + if result.key.meta.addr.Is6() { af = 6 } _, err = tx.Exec("INSERT INTO rtt(at_unix, region_id, hostname, af, address, timestamp_source, stable_conn, dst_port, rtt_ns) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?)", - result.at.Unix(), result.meta.regionID, result.meta.hostname, af, result.meta.addr.String(), result.timestampSource, result.connStability, result.dstPort, result.rtt) + result.at.Unix(), result.key.meta.regionID, result.key.meta.hostname, af, result.key.meta.addr.String(), result.key.timestampSource, result.key.connStability, result.key.dstPort, result.rtt) if err != nil { tx.Rollback() log.Printf("error adding result to tx: %v", err)