logtail: fix race condition with sockstats label (#8578)

Updates tailscale/corp#8427

Signed-off-by: Joe Tsai <joetsai@digital-static.net>
This commit is contained in:
Joe Tsai 2023-07-11 10:51:51 -07:00 committed by GitHub
parent 2bbedd2001
commit 49015b00fe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 9 additions and 4 deletions

View File

@ -146,7 +146,6 @@ func NewLogger(cfg Config, logf tslogger.Logf) *Logger {
flushDelayFn: cfg.FlushDelayFn,
timeNow: cfg.TimeNow,
metricsDelta: cfg.MetricsDelta,
sockstatsLabel: sockstats.LabelLogtailLogger,
procID: procID,
includeProcSequence: cfg.IncludeProcSequence,
@ -154,6 +153,7 @@ func NewLogger(cfg Config, logf tslogger.Logf) *Logger {
shutdownStart: make(chan struct{}),
shutdownDone: make(chan struct{}),
}
l.SetSockstatsLabel(sockstats.LabelLogtailLogger)
if cfg.NewZstdEncoder != nil {
l.zstdEncoder = cfg.NewZstdEncoder()
}
@ -188,7 +188,7 @@ type Logger struct {
metricsDelta func() string // or nil
privateID logid.PrivateID
httpDoCalls atomic.Int32
sockstatsLabel sockstats.Label
sockstatsLabel atomicSocktatsLabel
procID uint32
includeProcSequence bool
@ -202,6 +202,11 @@ type Logger struct {
shutdownDone chan struct{} // closed when shutdown complete
}
type atomicSocktatsLabel struct{ p atomic.Uint32 }
func (p *atomicSocktatsLabel) Load() sockstats.Label { return sockstats.Label(p.p.Load()) }
func (p *atomicSocktatsLabel) Store(label sockstats.Label) { p.p.Store(uint32(label)) }
// SetVerbosityLevel controls the verbosity level that should be
// written to stderr. 0 is the default (not verbose). Levels 1 or higher
// are increasingly verbose.
@ -219,7 +224,7 @@ func (l *Logger) SetNetMon(lm *netmon.Monitor) {
// SetSockstatsLabel sets the label used in sockstat logs to identify network traffic from this logger.
func (l *Logger) SetSockstatsLabel(label sockstats.Label) {
l.sockstatsLabel = label
l.sockstatsLabel.Store(label)
}
// PrivateID returns the logger's private log ID.
@ -445,7 +450,7 @@ func (l *Logger) awaitInternetUp(ctx context.Context) {
// origlen of -1 indicates that the body is not compressed.
func (l *Logger) upload(ctx context.Context, body []byte, origlen int) (retryAfter time.Duration, err error) {
const maxUploadTime = 45 * time.Second
ctx = sockstats.WithSockStats(ctx, l.sockstatsLabel, l.Logf)
ctx = sockstats.WithSockStats(ctx, l.sockstatsLabel.Load(), l.Logf)
ctx, cancel := context.WithTimeout(ctx, maxUploadTime)
defer cancel()