log/sockstatlog: add delay before writing logs to disk

Split apart polling of sockstats and logging them to disk.  Add a 3
second delay before writing logs to disk to prevent an infinite upload
loop when uploading stats to logcatcher.

Fixes #7719

Signed-off-by: Will Norris <will@tailscale.com>
This commit is contained in:
Will Norris 2023-03-28 20:27:52 -07:00 committed by Will Norris
parent 985535aebc
commit 62a1e9a44f
2 changed files with 69 additions and 17 deletions

View File

@ -1,7 +1,9 @@
// Copyright (c) Tailscale Inc & AUTHORS // Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause // SPDX-License-Identifier: BSD-3-Clause
// Package sockstatlog provides a logger for capturing and storing network socket stats. // Package sockstatlog provides a logger for capturing network socket stats for debugging.
// Stats are collected at a frequency of 10 Hz and logged to disk.
// Stats are only uploaded to the log server on demand.
package sockstatlog package sockstatlog
import ( import (
@ -25,19 +27,29 @@ import (
"tailscale.com/util/mak" "tailscale.com/util/mak"
) )
// pollPeriod specifies how often to poll for socket stats. // pollInterval specifies how often to poll for socket stats.
const pollPeriod = time.Second / 10 const pollInterval = time.Second / 10
// logInterval specifies how often to log sockstat events to disk.
// This delay is added to prevent an infinite loop when logs are uploaded,
// which itself creates additional sockstat events.
const logInterval = 3 * time.Second
// Logger logs statistics about network sockets. // Logger logs statistics about network sockets.
type Logger struct { type Logger struct {
// enabled identifies whether the logger is enabled.
enabled atomic.Bool enabled atomic.Bool
ctx context.Context ctx context.Context
cancelFn context.CancelFunc cancelFn context.CancelFunc
ticker *time.Ticker // eventCh is used to pass events from the poller to the logger.
logf logger.Logf eventCh chan event
logf logger.Logf
// logger is the underlying logtail logger than manages log files on disk
// and uploading to the log server.
logger *logtail.Logger logger *logtail.Logger
filch *filch.Filch filch *filch.Filch
tr http.RoundTripper tr http.RoundTripper
@ -73,6 +85,7 @@ func SockstatLogID(logID logid.PublicID) logid.PrivateID {
// NewLogger returns a new Logger that will store stats in logdir. // NewLogger returns a new Logger that will store stats in logdir.
// On platforms that do not support sockstat logging, a nil Logger will be returned. // On platforms that do not support sockstat logging, a nil Logger will be returned.
// The returned Logger is not yet enabled, and must be shut down with Shutdown when it is no longer needed. // The returned Logger is not yet enabled, and must be shut down with Shutdown when it is no longer needed.
// Logs will be uploaded to the log server using a new log ID derived from the provided backend logID.
func NewLogger(logdir string, logf logger.Logf, logID logid.PublicID) (*Logger, error) { func NewLogger(logdir string, logf logger.Logf, logID logid.PublicID) (*Logger, error) {
if !sockstats.IsAvailable { if !sockstats.IsAvailable {
return nil, nil return nil, nil
@ -88,10 +101,9 @@ func NewLogger(logdir string, logf logger.Logf, logID logid.PublicID) (*Logger,
} }
logger := &Logger{ logger := &Logger{
ticker: time.NewTicker(pollPeriod), logf: logf,
logf: logf, filch: filch,
filch: filch, tr: logpolicy.NewLogtailTransport(logtail.DefaultHost),
tr: logpolicy.NewLogtailTransport(logtail.DefaultHost),
} }
logger.logger = logtail.NewLogger(logtail.Config{ logger.logger = logtail.NewLogger(logtail.Config{
BaseURL: logpolicy.LogURL(), BaseURL: logpolicy.LogURL(),
@ -124,8 +136,14 @@ func (l *Logger) SetLoggingEnabled(v bool) {
old := l.enabled.Load() old := l.enabled.Load()
if old != v && l.enabled.CompareAndSwap(old, v) { if old != v && l.enabled.CompareAndSwap(old, v) {
if v { if v {
if l.eventCh == nil {
// eventCh should be large enough for the number of events that will occur within logInterval.
// Add an extra second's worth of events to ensure we don't drop any.
l.eventCh = make(chan event, (logInterval+time.Second)/pollInterval)
}
l.ctx, l.cancelFn = context.WithCancel(context.Background()) l.ctx, l.cancelFn = context.WithCancel(context.Background())
go l.poll() go l.poll()
go l.logEvents()
} else { } else {
l.cancelFn() l.cancelFn()
} }
@ -137,19 +155,21 @@ func (l *Logger) Write(p []byte) (int, error) {
} }
// poll fetches the current socket stats at the configured time interval, // poll fetches the current socket stats at the configured time interval,
// calculates the delta since the last poll, and logs any non-zero values. // calculates the delta since the last poll,
// and writes any non-zero values to the logger event channel.
// This method does not return. // This method does not return.
func (l *Logger) poll() { func (l *Logger) poll() {
// last is the last set of socket stats we saw. // last is the last set of socket stats we saw.
var lastStats *sockstats.SockStats var lastStats *sockstats.SockStats
var lastTime time.Time var lastTime time.Time
enc := json.NewEncoder(l) ticker := time.NewTicker(pollInterval)
for { for {
select { select {
case <-l.ctx.Done(): case <-l.ctx.Done():
ticker.Stop()
return return
case t := <-l.ticker.C: case t := <-ticker.C:
stats := sockstats.Get() stats := sockstats.Get()
if lastStats != nil { if lastStats != nil {
diffstats := delta(lastStats, stats) diffstats := delta(lastStats, stats)
@ -162,9 +182,7 @@ func (l *Logger) poll() {
if stats.CurrentInterfaceCellular { if stats.CurrentInterfaceCellular {
e.IsCellularInterface = 1 e.IsCellularInterface = 1
} }
if err := enc.Encode(e); err != nil { l.eventCh <- e
l.logf("sockstatlog: error encoding log: %v", err)
}
} }
} }
lastTime = t lastTime = t
@ -173,6 +191,34 @@ func (l *Logger) poll() {
} }
} }
// logEvents reads events from the event channel at logInterval and logs them to disk.
// This method does not return.
func (l *Logger) logEvents() {
enc := json.NewEncoder(l)
flush := func() {
for {
select {
case e := <-l.eventCh:
if err := enc.Encode(e); err != nil {
l.logf("sockstatlog: error encoding log: %v", err)
}
default:
return
}
}
}
ticker := time.NewTicker(logInterval)
for {
select {
case <-l.ctx.Done():
ticker.Stop()
return
case <-ticker.C:
flush()
}
}
}
func (l *Logger) LogID() string { func (l *Logger) LogID() string {
if l.logger == nil { if l.logger == nil {
return "" return ""
@ -189,7 +235,6 @@ func (l *Logger) Shutdown() {
if l.cancelFn != nil { if l.cancelFn != nil {
l.cancelFn() l.cancelFn()
} }
l.ticker.Stop()
l.filch.Close() l.filch.Close()
l.logger.Shutdown(context.Background()) l.logger.Shutdown(context.Background())

View File

@ -150,6 +150,7 @@ func NewLogger(cfg Config, logf tslogger.Logf) *Logger {
timeNow: cfg.TimeNow, timeNow: cfg.TimeNow,
bo: backoff.NewBackoff("logtail", stdLogf, 30*time.Second), bo: backoff.NewBackoff("logtail", stdLogf, 30*time.Second),
metricsDelta: cfg.MetricsDelta, metricsDelta: cfg.MetricsDelta,
sockstatsLabel: sockstats.LabelLogtailLogger,
procID: procID, procID: procID,
includeProcSequence: cfg.IncludeProcSequence, includeProcSequence: cfg.IncludeProcSequence,
@ -192,6 +193,7 @@ type Logger struct {
metricsDelta func() string // or nil metricsDelta func() string // or nil
privateID logid.PrivateID privateID logid.PrivateID
httpDoCalls atomic.Int32 httpDoCalls atomic.Int32
sockstatsLabel sockstats.Label
procID uint32 procID uint32
includeProcSequence bool includeProcSequence bool
@ -220,6 +222,11 @@ func (l *Logger) SetLinkMonitor(lm *monitor.Mon) {
l.linkMonitor = lm l.linkMonitor = lm
} }
// SetSockstatsLabel sets the label used in sockstat logs to identify network traffic from this logger.
func (l *Logger) SetSockstatsLabel(label sockstats.Label) {
l.sockstatsLabel = label
}
// PrivateID returns the logger's private log ID. // PrivateID returns the logger's private log ID.
// //
// It exists for internal use only. // It exists for internal use only.
@ -428,7 +435,7 @@ func (l *Logger) awaitInternetUp(ctx context.Context) {
// origlen of -1 indicates that the body is not compressed. // origlen of -1 indicates that the body is not compressed.
func (l *Logger) upload(ctx context.Context, body []byte, origlen int) (uploaded bool, err error) { func (l *Logger) upload(ctx context.Context, body []byte, origlen int) (uploaded bool, err error) {
const maxUploadTime = 45 * time.Second const maxUploadTime = 45 * time.Second
ctx = sockstats.WithSockStats(ctx, sockstats.LabelLogtailLogger) ctx = sockstats.WithSockStats(ctx, l.sockstatsLabel)
ctx, cancel := context.WithTimeout(ctx, maxUploadTime) ctx, cancel := context.WithTimeout(ctx, maxUploadTime)
defer cancel() defer cancel()