From 6d90966c1fc097c7738f81cf8a14538482221233 Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Thu, 21 Mar 2024 06:15:47 -0700 Subject: [PATCH] logtail: move a scratch buffer to Logger Rather than pass around a scratch buffer, put it on the Logger. This is a baby step towards removing the background uploading goroutine and starting it as needed. Updates tailscale/corp#18514 (insofar as it led me to look at this code) Change-Id: I6fd94581c28bde40fdb9fca788eb9590bcedae1b Signed-off-by: Brad Fitzpatrick --- logtail/logtail.go | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/logtail/logtail.go b/logtail/logtail.go index c6d6c5a06..da19c3bbc 100644 --- a/logtail/logtail.go +++ b/logtail/logtail.go @@ -185,6 +185,7 @@ type Logger struct { netMonitor *netmon.Monitor buffer Buffer drainWake chan struct{} // signal to speed up drain + drainBuf bytes.Buffer // owned by drainPending for reuse flushDelayFn func() time.Duration // negative or zero return value to upload aggressively, or >0 to batch at this delay flushPending atomic.Bool sentinel chan int32 @@ -302,10 +303,10 @@ func (l *Logger) drainBlock() (shuttingDown bool) { } // drainPending drains and encodes a batch of logs from the buffer for upload. -// It uses scratch as its initial buffer. // If no logs are available, drainPending blocks until logs are available. -func (l *Logger) drainPending(scratch []byte) (res []byte) { - buf := bytes.NewBuffer(scratch[:0]) +func (l *Logger) drainPending() (res []byte) { + buf := &l.drainBuf + buf.Reset() buf.WriteByte('[') entries := 0 @@ -323,6 +324,14 @@ func (l *Logger) drainPending(scratch []byte) (res []byte) { break } + // We're about to block. If we're holding on to too much memory + // in our buffer from a previous large write, let it go. + if buf.Available() > 4<<10 { + cur := buf.Bytes() + l.drainBuf = bytes.Buffer{} + buf.Write(cur) + } + batchDone = l.drainBlock() continue } @@ -365,9 +374,8 @@ func (l *Logger) drainPending(scratch []byte) (res []byte) { func (l *Logger) uploading(ctx context.Context) { defer close(l.shutdownDone) - scratch := make([]byte, 4096) // reusable buffer to write into for { - body := l.drainPending(scratch) + body := l.drainPending() origlen := -1 // sentinel value: uncompressed // Don't attempt to compress tiny bodies; not worth the CPU cycles. if (l.compressLogs || l.zstdEncoder != nil) && len(body) > 256 {