2023-01-27 21:37:20 +00:00
|
|
|
// Copyright (c) Tailscale Inc & AUTHORS
|
|
|
|
// SPDX-License-Identifier: BSD-3-Clause
|
2020-02-05 22:16:58 +00:00
|
|
|
|
|
|
|
package logtail
|
|
|
|
|
|
|
|
import (
|
2024-04-08 23:01:07 +01:00
|
|
|
"bytes"
|
2020-02-05 22:16:58 +00:00
|
|
|
"errors"
|
|
|
|
"fmt"
|
|
|
|
"sync"
|
|
|
|
)
|
|
|
|
|
|
|
|
type Buffer interface {
|
|
|
|
// TryReadLine tries to read a log line from the ring buffer.
|
|
|
|
// If no line is available it returns a nil slice.
|
|
|
|
// If the ring buffer is closed it returns io.EOF.
|
2020-03-20 02:13:36 +00:00
|
|
|
//
|
|
|
|
// The returned slice may point to data that will be overwritten
|
|
|
|
// by a subsequent call to TryReadLine.
|
2020-02-05 22:16:58 +00:00
|
|
|
TryReadLine() ([]byte, error)
|
|
|
|
|
|
|
|
// Write writes a log line into the ring buffer.
|
2024-04-08 23:01:07 +01:00
|
|
|
// Implementations must not retain the provided buffer.
|
2020-02-05 22:16:58 +00:00
|
|
|
Write([]byte) (int, error)
|
|
|
|
}
|
|
|
|
|
|
|
|
func NewMemoryBuffer(numEntries int) Buffer {
|
|
|
|
return &memBuffer{
|
|
|
|
pending: make(chan qentry, numEntries),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
type memBuffer struct {
|
|
|
|
next []byte
|
|
|
|
pending chan qentry
|
|
|
|
|
|
|
|
dropMu sync.Mutex
|
|
|
|
dropCount int
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *memBuffer) TryReadLine() ([]byte, error) {
|
|
|
|
if m.next != nil {
|
|
|
|
msg := m.next
|
|
|
|
m.next = nil
|
|
|
|
return msg, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
select {
|
|
|
|
case ent := <-m.pending:
|
|
|
|
if ent.dropCount > 0 {
|
|
|
|
m.next = ent.msg
|
2022-08-04 05:29:11 +01:00
|
|
|
return fmt.Appendf(nil, "----------- %d logs dropped ----------", ent.dropCount), nil
|
2020-02-05 22:16:58 +00:00
|
|
|
}
|
|
|
|
return ent.msg, nil
|
|
|
|
default:
|
|
|
|
return nil, nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *memBuffer) Write(b []byte) (int, error) {
|
|
|
|
m.dropMu.Lock()
|
|
|
|
defer m.dropMu.Unlock()
|
|
|
|
|
|
|
|
ent := qentry{
|
2024-04-08 23:01:07 +01:00
|
|
|
msg: bytes.Clone(b),
|
2020-02-05 22:16:58 +00:00
|
|
|
dropCount: m.dropCount,
|
|
|
|
}
|
|
|
|
select {
|
|
|
|
case m.pending <- ent:
|
|
|
|
m.dropCount = 0
|
|
|
|
return len(b), nil
|
|
|
|
default:
|
|
|
|
m.dropCount++
|
|
|
|
return 0, errBufferFull
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
type qentry struct {
|
|
|
|
msg []byte
|
|
|
|
dropCount int
|
|
|
|
}
|
|
|
|
|
|
|
|
var errBufferFull = errors.New("logtail: buffer full")
|