2020-02-05 22:16:58 +00:00
|
|
|
// Copyright (c) 2020 Tailscale Inc & AUTHORS All rights reserved.
|
|
|
|
// Use of this source code is governed by a BSD-style
|
|
|
|
// license that can be found in the LICENSE file.
|
|
|
|
|
|
|
|
// Package logtail sends logs to log.tailscale.io.
|
|
|
|
package logtail
|
|
|
|
|
|
|
|
import (
|
|
|
|
"bytes"
|
|
|
|
"context"
|
2022-05-18 06:28:57 +01:00
|
|
|
"crypto/rand"
|
|
|
|
"encoding/binary"
|
2020-02-05 22:16:58 +00:00
|
|
|
"encoding/json"
|
|
|
|
"fmt"
|
|
|
|
"io"
|
|
|
|
"io/ioutil"
|
|
|
|
"net/http"
|
|
|
|
"os"
|
2020-11-24 02:35:49 +00:00
|
|
|
"strconv"
|
2022-05-18 06:28:57 +01:00
|
|
|
"sync"
|
2021-05-25 23:02:52 +01:00
|
|
|
"sync/atomic"
|
2020-02-05 22:16:58 +00:00
|
|
|
"time"
|
|
|
|
|
|
|
|
"tailscale.com/logtail/backoff"
|
2021-03-02 23:21:32 +00:00
|
|
|
"tailscale.com/net/interfaces"
|
2022-04-18 21:43:03 +01:00
|
|
|
"tailscale.com/syncs"
|
Add tstest.PanicOnLog(), and fix various problems detected by this.
If a test calls log.Printf, 'go test' horrifyingly rearranges the
output to no longer be in chronological order, which makes debugging
virtually impossible. Let's stop that from happening by making
log.Printf panic if called from any module, no matter how deep, during
tests.
This required us to change the default error handler in at least one
http.Server, as well as plumbing a bunch of logf functions around,
especially in magicsock and wgengine, but also in logtail and backoff.
To add insult to injury, 'go test' also rearranges the output when a
parent test has multiple sub-tests (all the sub-test's t.Logf is always
printed after all the parent tests t.Logf), so we need to screw around
with a special Logf that can point at the "current" t (current_t.Logf)
in some places. Probably our entire way of using subtests is wrong,
since 'go test' would probably like to run them all in parallel if you
called t.Parallel(), but it definitely can't because the're all
manipulating the shared state created by the parent test. They should
probably all be separate toplevel tests instead, with common
setup/teardown logic. But that's a job for another time.
Signed-off-by: Avery Pennarun <apenwarr@tailscale.com>
2020-05-14 03:59:54 +01:00
|
|
|
tslogger "tailscale.com/types/logger"
|
2021-03-02 06:09:43 +00:00
|
|
|
"tailscale.com/wgengine/monitor"
|
2020-02-05 22:16:58 +00:00
|
|
|
)
|
|
|
|
|
2020-04-26 16:31:07 +01:00
|
|
|
// DefaultHost is the host name that logs are uploaded to when
// Config.BaseURL is left empty.
const (
	DefaultHost = "log.tailscale.io"
)
|
|
|
|
|
2021-11-16 04:52:43 +00:00
|
|
|
// CollectionNode is the name of a logtail Config.Collection
// for tailscaled (or equivalent: IPNExtension, Android app).
const CollectionNode = "tailnode.log.tailscale.io"
|
|
|
|
|
2020-02-05 22:16:58 +00:00
|
|
|
// Encoder compresses a batch of logs before it is uploaded.
// See Config.NewZstdEncoder.
type Encoder interface {
	// EncodeAll encodes src and returns the encoded bytes.
	//
	// NOTE(review): presumably the output is appended to dst, as is
	// conventional for zstd encoders; confirm against the implementation
	// that is plugged in.
	EncodeAll(src, dst []byte) []byte

	// Close releases the encoder.
	Close() error
}
|
|
|
|
|
|
|
|
type Config struct {
|
|
|
|
Collection string // collection name, a domain name
|
|
|
|
PrivateID PrivateID // machine-specific private identifier
|
|
|
|
BaseURL string // if empty defaults to "https://log.tailscale.io"
|
|
|
|
HTTPC *http.Client // if empty defaults to http.DefaultClient
|
|
|
|
SkipClientTime bool // if true, client_time is not written to logs
|
|
|
|
LowMemory bool // if true, logtail minimizes memory use
|
|
|
|
TimeNow func() time.Time // if set, subsitutes uses of time.Now
|
|
|
|
Stderr io.Writer // if set, logs are sent here instead of os.Stderr
|
2020-12-21 18:53:18 +00:00
|
|
|
StderrLevel int // max verbosity level to write to stderr; 0 means the non-verbose messages only
|
2020-02-05 22:16:58 +00:00
|
|
|
Buffer Buffer // temp storage, if nil a MemoryBuffer
|
|
|
|
NewZstdEncoder func() Encoder // if set, used to compress logs for transmission
|
2020-03-20 02:13:36 +00:00
|
|
|
|
2021-11-16 04:52:43 +00:00
|
|
|
// MetricsDelta, if non-nil, is a func that returns an encoding
|
|
|
|
// delta in clientmetrics to upload alongside existing logs.
|
|
|
|
// It can return either an empty string (for nothing) or a string
|
|
|
|
// that's safe to embed in a JSON string literal without further escaping.
|
|
|
|
MetricsDelta func() string
|
|
|
|
|
2020-11-17 21:43:40 +00:00
|
|
|
// DrainLogs, if non-nil, disables automatic uploading of new logs,
|
2020-03-20 02:13:36 +00:00
|
|
|
// so that logs are only uploaded when a token is sent to DrainLogs.
|
|
|
|
DrainLogs <-chan struct{}
|
2022-05-18 06:28:57 +01:00
|
|
|
|
|
|
|
// IncludeProcID, if true, results in an ephemeral process identifier being
|
|
|
|
// included in logs. The ID is random and not guaranteed to be globally
|
|
|
|
// unique, but it can be used to distinguish between different instances
|
|
|
|
// running with same PrivateID.
|
|
|
|
IncludeProcID bool
|
|
|
|
|
|
|
|
// IncludeProcSequence, if true, results in an ephemeral sequence number
|
|
|
|
// being included in the logs. The sequence number is incremented for each
|
|
|
|
// log message sent, but is not peristed across process restarts.
|
|
|
|
IncludeProcSequence bool
|
2020-02-05 22:16:58 +00:00
|
|
|
}
|
|
|
|
|
2020-12-21 17:03:39 +00:00
|
|
|
func NewLogger(cfg Config, logf tslogger.Logf) *Logger {
|
2020-02-05 22:16:58 +00:00
|
|
|
if cfg.BaseURL == "" {
|
2020-04-26 16:31:07 +01:00
|
|
|
cfg.BaseURL = "https://" + DefaultHost
|
2020-02-05 22:16:58 +00:00
|
|
|
}
|
|
|
|
if cfg.HTTPC == nil {
|
|
|
|
cfg.HTTPC = http.DefaultClient
|
|
|
|
}
|
|
|
|
if cfg.TimeNow == nil {
|
|
|
|
cfg.TimeNow = time.Now
|
|
|
|
}
|
|
|
|
if cfg.Stderr == nil {
|
|
|
|
cfg.Stderr = os.Stderr
|
|
|
|
}
|
|
|
|
if cfg.Buffer == nil {
|
|
|
|
pendingSize := 256
|
|
|
|
if cfg.LowMemory {
|
|
|
|
pendingSize = 64
|
|
|
|
}
|
|
|
|
cfg.Buffer = NewMemoryBuffer(pendingSize)
|
|
|
|
}
|
2022-05-18 06:28:57 +01:00
|
|
|
var procID uint32
|
|
|
|
if cfg.IncludeProcID {
|
|
|
|
keyBytes := make([]byte, 4)
|
|
|
|
rand.Read(keyBytes)
|
|
|
|
procID = binary.LittleEndian.Uint32(keyBytes)
|
|
|
|
if procID == 0 {
|
|
|
|
// 0 is the empty/off value, assign a different (non-zero) value to
|
|
|
|
// make sure we still include an ID (actual value does not matter).
|
|
|
|
procID = 7
|
|
|
|
}
|
|
|
|
}
|
2020-12-21 17:03:39 +00:00
|
|
|
l := &Logger{
|
2022-03-15 02:52:06 +00:00
|
|
|
privateID: cfg.PrivateID,
|
2020-02-05 22:16:58 +00:00
|
|
|
stderr: cfg.Stderr,
|
2021-05-25 23:02:52 +01:00
|
|
|
stderrLevel: int64(cfg.StderrLevel),
|
2020-02-05 22:16:58 +00:00
|
|
|
httpc: cfg.HTTPC,
|
|
|
|
url: cfg.BaseURL + "/c/" + cfg.Collection + "/" + cfg.PrivateID.String(),
|
|
|
|
lowMem: cfg.LowMemory,
|
|
|
|
buffer: cfg.Buffer,
|
|
|
|
skipClientTime: cfg.SkipClientTime,
|
|
|
|
sent: make(chan struct{}, 1),
|
|
|
|
sentinel: make(chan int32, 16),
|
2020-03-20 02:13:36 +00:00
|
|
|
drainLogs: cfg.DrainLogs,
|
2020-02-05 22:16:58 +00:00
|
|
|
timeNow: cfg.TimeNow,
|
2020-08-09 05:03:20 +01:00
|
|
|
bo: backoff.NewBackoff("logtail", logf, 30*time.Second),
|
2021-11-16 04:52:43 +00:00
|
|
|
metricsDelta: cfg.MetricsDelta,
|
2020-02-05 22:16:58 +00:00
|
|
|
|
2022-05-18 06:28:57 +01:00
|
|
|
procID: procID,
|
|
|
|
includeProcSequence: cfg.IncludeProcSequence,
|
|
|
|
|
2020-02-05 22:16:58 +00:00
|
|
|
shutdownStart: make(chan struct{}),
|
|
|
|
shutdownDone: make(chan struct{}),
|
|
|
|
}
|
|
|
|
if cfg.NewZstdEncoder != nil {
|
|
|
|
l.zstdEncoder = cfg.NewZstdEncoder()
|
|
|
|
}
|
|
|
|
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
l.uploadCancel = cancel
|
|
|
|
|
|
|
|
go l.uploading(ctx)
|
|
|
|
l.Write([]byte("logtail started"))
|
|
|
|
return l
|
|
|
|
}
|
|
|
|
|
2020-12-21 17:03:39 +00:00
|
|
|
// Logger writes logs, splitting them as configured between local
|
|
|
|
// logging facilities and uploading to a log server.
|
|
|
|
type Logger struct {
|
2020-02-05 22:16:58 +00:00
|
|
|
stderr io.Writer
|
2021-05-25 23:02:52 +01:00
|
|
|
stderrLevel int64 // accessed atomically
|
2020-02-05 22:16:58 +00:00
|
|
|
httpc *http.Client
|
|
|
|
url string
|
|
|
|
lowMem bool
|
|
|
|
skipClientTime bool
|
2021-03-02 06:09:43 +00:00
|
|
|
linkMonitor *monitor.Mon
|
2020-02-05 22:16:58 +00:00
|
|
|
buffer Buffer
|
|
|
|
sent chan struct{} // signal to speed up drain
|
2020-03-20 02:13:36 +00:00
|
|
|
drainLogs <-chan struct{} // if non-nil, external signal to attempt a drain
|
2020-02-05 22:16:58 +00:00
|
|
|
sentinel chan int32
|
|
|
|
timeNow func() time.Time
|
2020-08-09 05:03:20 +01:00
|
|
|
bo *backoff.Backoff
|
2020-02-05 22:16:58 +00:00
|
|
|
zstdEncoder Encoder
|
|
|
|
uploadCancel func()
|
2021-07-28 23:36:23 +01:00
|
|
|
explainedRaw bool
|
2021-11-16 04:52:43 +00:00
|
|
|
metricsDelta func() string // or nil
|
2022-03-15 02:52:06 +00:00
|
|
|
privateID PrivateID
|
2020-02-05 22:16:58 +00:00
|
|
|
|
2022-05-18 06:28:57 +01:00
|
|
|
procID uint32
|
|
|
|
includeProcSequence bool
|
2022-07-28 05:06:25 +01:00
|
|
|
|
|
|
|
writeLock sync.Mutex // guards increments of procSequence
|
|
|
|
procSequence uint64
|
2022-05-18 06:28:57 +01:00
|
|
|
|
2020-02-05 22:16:58 +00:00
|
|
|
shutdownStart chan struct{} // closed when shutdown begins
|
2021-08-02 22:32:02 +01:00
|
|
|
shutdownDone chan struct{} // closed when shutdown complete
|
2020-02-05 22:16:58 +00:00
|
|
|
}
|
|
|
|
|
2020-12-21 18:53:18 +00:00
|
|
|
// SetVerbosityLevel controls the verbosity level that should be
|
|
|
|
// written to stderr. 0 is the default (not verbose). Levels 1 or higher
|
|
|
|
// are increasingly verbose.
|
|
|
|
func (l *Logger) SetVerbosityLevel(level int) {
|
2021-05-25 23:02:52 +01:00
|
|
|
atomic.StoreInt64(&l.stderrLevel, int64(level))
|
2020-12-21 18:53:18 +00:00
|
|
|
}
|
|
|
|
|
2021-03-02 06:09:43 +00:00
|
|
|
// SetLinkMonitor sets the optional the link monitor.
|
|
|
|
//
|
|
|
|
// It should not be changed concurrently with log writes and should
|
|
|
|
// only be set once.
|
|
|
|
func (l *Logger) SetLinkMonitor(lm *monitor.Mon) {
|
|
|
|
l.linkMonitor = lm
|
|
|
|
}
|
|
|
|
|
2022-03-15 02:52:06 +00:00
|
|
|
// PrivateID returns the logger's private log ID.
|
|
|
|
//
|
|
|
|
// It exists for internal use only.
|
|
|
|
func (l *Logger) PrivateID() PrivateID { return l.privateID }
|
|
|
|
|
2020-12-21 17:03:39 +00:00
|
|
|
// Shutdown gracefully shuts down the logger while completing any
|
|
|
|
// remaining uploads.
|
|
|
|
//
|
|
|
|
// It will block, continuing to try and upload unless the passed
|
|
|
|
// context object interrupts it by being done.
|
|
|
|
// If the shutdown is interrupted, an error is returned.
|
|
|
|
func (l *Logger) Shutdown(ctx context.Context) error {
|
2020-02-05 22:16:58 +00:00
|
|
|
done := make(chan struct{})
|
|
|
|
go func() {
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
l.uploadCancel()
|
|
|
|
<-l.shutdownDone
|
|
|
|
case <-l.shutdownDone:
|
|
|
|
}
|
|
|
|
close(done)
|
|
|
|
}()
|
|
|
|
|
|
|
|
close(l.shutdownStart)
|
|
|
|
io.WriteString(l, "logger closing down\n")
|
|
|
|
<-done
|
|
|
|
|
|
|
|
if l.zstdEncoder != nil {
|
|
|
|
return l.zstdEncoder.Close()
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2020-12-21 17:03:39 +00:00
|
|
|
// Close shuts down this logger object, the background log uploader
|
|
|
|
// process, and any associated goroutines.
|
|
|
|
//
|
|
|
|
// Deprecated: use Shutdown
|
|
|
|
func (l *Logger) Close() {
|
2020-02-11 07:36:17 +00:00
|
|
|
l.Shutdown(context.Background())
|
2020-02-05 22:16:58 +00:00
|
|
|
}
|
|
|
|
|
2020-03-20 02:13:36 +00:00
|
|
|
// drainBlock is called by drainPending when there are no logs to drain.
|
|
|
|
//
|
|
|
|
// In typical operation, every call to the Write method unblocks and triggers
|
|
|
|
// a buffer.TryReadline, so logs are written with very low latency.
|
|
|
|
//
|
|
|
|
// If the caller provides a DrainLogs channel, then unblock-drain-on-Write
|
|
|
|
// is disabled, and it is up to the caller to trigger unblock the drain.
|
2020-12-21 17:03:39 +00:00
|
|
|
func (l *Logger) drainBlock() (shuttingDown bool) {
|
2020-03-20 02:13:36 +00:00
|
|
|
if l.drainLogs == nil {
|
|
|
|
select {
|
|
|
|
case <-l.shutdownStart:
|
|
|
|
return true
|
|
|
|
case <-l.sent:
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
select {
|
|
|
|
case <-l.shutdownStart:
|
|
|
|
return true
|
|
|
|
case <-l.drainLogs:
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
|
|
|
// drainPending drains and encodes a batch of logs from the buffer for upload.
|
2021-08-17 20:29:58 +01:00
|
|
|
// It uses scratch as its initial buffer.
|
2020-03-20 02:13:36 +00:00
|
|
|
// If no logs are available, drainPending blocks until logs are available.
|
2021-08-17 20:29:58 +01:00
|
|
|
func (l *Logger) drainPending(scratch []byte) (res []byte) {
|
|
|
|
buf := bytes.NewBuffer(scratch[:0])
|
2021-08-17 20:14:38 +01:00
|
|
|
buf.WriteByte('[')
|
2020-02-05 22:16:58 +00:00
|
|
|
entries := 0
|
|
|
|
|
|
|
|
var batchDone bool
|
2020-03-16 05:41:50 +00:00
|
|
|
const maxLen = 256 << 10
|
|
|
|
for buf.Len() < maxLen && !batchDone {
|
2020-02-05 22:16:58 +00:00
|
|
|
b, err := l.buffer.TryReadLine()
|
|
|
|
if err == io.EOF {
|
|
|
|
break
|
|
|
|
} else if err != nil {
|
|
|
|
b = []byte(fmt.Sprintf("reading ringbuffer: %v", err))
|
|
|
|
batchDone = true
|
|
|
|
} else if b == nil {
|
|
|
|
if entries > 0 {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
|
2020-03-20 02:13:36 +00:00
|
|
|
batchDone = l.drainBlock()
|
2020-02-05 22:16:58 +00:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
if len(b) == 0 {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
if b[0] != '{' || !json.Valid(b) {
|
|
|
|
// This is probably a log added to stderr by filch
|
|
|
|
// outside of the logtail logger. Encode it.
|
2021-07-28 23:36:23 +01:00
|
|
|
if !l.explainedRaw {
|
|
|
|
fmt.Fprintf(l.stderr, "RAW-STDERR: ***\n")
|
|
|
|
fmt.Fprintf(l.stderr, "RAW-STDERR: *** Lines prefixed with RAW-STDERR below bypassed logtail and probably come from a previous run of the program\n")
|
|
|
|
fmt.Fprintf(l.stderr, "RAW-STDERR: ***\n")
|
|
|
|
fmt.Fprintf(l.stderr, "RAW-STDERR:\n")
|
|
|
|
l.explainedRaw = true
|
|
|
|
}
|
|
|
|
fmt.Fprintf(l.stderr, "RAW-STDERR: %s", b)
|
2022-05-18 06:28:57 +01:00
|
|
|
// Do not add a client time, as it could have been
|
|
|
|
// been written a long time ago. Don't include instance key or ID
|
|
|
|
// either, since this came from a different instance.
|
|
|
|
b = l.encodeText(b, true, 0, 0, 0)
|
2020-02-05 22:16:58 +00:00
|
|
|
}
|
|
|
|
|
2021-08-17 20:14:38 +01:00
|
|
|
if entries > 0 {
|
2020-02-05 22:16:58 +00:00
|
|
|
buf.WriteByte(',')
|
|
|
|
}
|
2021-08-17 20:14:38 +01:00
|
|
|
buf.Write(b)
|
2020-02-05 22:16:58 +00:00
|
|
|
entries++
|
|
|
|
}
|
|
|
|
|
2021-08-17 20:14:38 +01:00
|
|
|
buf.WriteByte(']')
|
|
|
|
if buf.Len() <= len("[]") {
|
2020-02-05 22:16:58 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
return buf.Bytes()
|
|
|
|
}
|
|
|
|
|
|
|
|
// This is the goroutine that repeatedly uploads logs in the background.
|
2020-12-21 17:03:39 +00:00
|
|
|
func (l *Logger) uploading(ctx context.Context) {
|
2020-02-05 22:16:58 +00:00
|
|
|
defer close(l.shutdownDone)
|
|
|
|
|
2021-08-17 20:29:58 +01:00
|
|
|
scratch := make([]byte, 4096) // reusable buffer to write into
|
2020-02-05 22:16:58 +00:00
|
|
|
for {
|
2021-08-17 20:29:58 +01:00
|
|
|
body := l.drainPending(scratch)
|
2020-11-24 02:35:49 +00:00
|
|
|
origlen := -1 // sentinel value: uncompressed
|
|
|
|
// Don't attempt to compress tiny bodies; not worth the CPU cycles.
|
|
|
|
if l.zstdEncoder != nil && len(body) > 256 {
|
|
|
|
zbody := l.zstdEncoder.EncodeAll(body, nil)
|
|
|
|
// Only send it compressed if the bandwidth savings are sufficient.
|
|
|
|
// Just the extra headers associated with enabling compression
|
|
|
|
// are 50 bytes by themselves.
|
|
|
|
if len(body)-len(zbody) > 64 {
|
|
|
|
origlen = len(body)
|
|
|
|
body = zbody
|
|
|
|
}
|
2020-02-05 22:16:58 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
for len(body) > 0 {
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
return
|
|
|
|
default:
|
|
|
|
}
|
2020-11-24 02:35:49 +00:00
|
|
|
uploaded, err := l.upload(ctx, body, origlen)
|
2020-02-05 22:16:58 +00:00
|
|
|
if err != nil {
|
2021-03-02 23:21:32 +00:00
|
|
|
if !l.internetUp() {
|
|
|
|
fmt.Fprintf(l.stderr, "logtail: internet down; waiting\n")
|
|
|
|
l.awaitInternetUp(ctx)
|
|
|
|
continue
|
|
|
|
}
|
2020-02-05 22:16:58 +00:00
|
|
|
fmt.Fprintf(l.stderr, "logtail: upload: %v\n", err)
|
|
|
|
}
|
2020-06-05 08:06:29 +01:00
|
|
|
l.bo.BackOff(ctx, err)
|
2020-02-05 22:16:58 +00:00
|
|
|
if uploaded {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
select {
|
|
|
|
case <-l.shutdownStart:
|
|
|
|
return
|
|
|
|
default:
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-03-02 23:21:32 +00:00
|
|
|
func (l *Logger) internetUp() bool {
|
|
|
|
if l.linkMonitor == nil {
|
|
|
|
// No way to tell, so assume it is.
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
return l.linkMonitor.InterfaceState().AnyInterfaceUp()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (l *Logger) awaitInternetUp(ctx context.Context) {
|
|
|
|
upc := make(chan bool, 1)
|
|
|
|
defer l.linkMonitor.RegisterChangeCallback(func(changed bool, st *interfaces.State) {
|
|
|
|
if st.AnyInterfaceUp() {
|
|
|
|
select {
|
|
|
|
case upc <- true:
|
|
|
|
default:
|
|
|
|
}
|
|
|
|
}
|
|
|
|
})()
|
|
|
|
if l.internetUp() {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
select {
|
|
|
|
case <-upc:
|
|
|
|
fmt.Fprintf(l.stderr, "logtail: internet back up\n")
|
|
|
|
case <-ctx.Done():
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-11-24 02:35:49 +00:00
|
|
|
// upload uploads body to the log server.
|
|
|
|
// origlen indicates the pre-compression body length.
|
|
|
|
// origlen of -1 indicates that the body is not compressed.
|
2020-12-21 17:03:39 +00:00
|
|
|
func (l *Logger) upload(ctx context.Context, body []byte, origlen int) (uploaded bool, err error) {
|
2022-05-27 23:59:08 +01:00
|
|
|
const maxUploadTime = 45 * time.Second
|
|
|
|
ctx, cancel := context.WithTimeout(ctx, maxUploadTime)
|
|
|
|
defer cancel()
|
|
|
|
|
|
|
|
req, err := http.NewRequestWithContext(ctx, "POST", l.url, bytes.NewReader(body))
|
2020-02-05 22:16:58 +00:00
|
|
|
if err != nil {
|
|
|
|
// I know of no conditions under which this could fail.
|
|
|
|
// Report it very loudly.
|
|
|
|
// TODO record logs to disk
|
|
|
|
panic("logtail: cannot build http request: " + err.Error())
|
|
|
|
}
|
2020-11-24 02:35:49 +00:00
|
|
|
if origlen != -1 {
|
2020-02-05 22:16:58 +00:00
|
|
|
req.Header.Add("Content-Encoding", "zstd")
|
2020-11-24 02:35:49 +00:00
|
|
|
req.Header.Add("Orig-Content-Length", strconv.Itoa(origlen))
|
2020-02-05 22:16:58 +00:00
|
|
|
}
|
2020-04-05 16:45:29 +01:00
|
|
|
req.Header["User-Agent"] = nil // not worth writing one; save some bytes
|
2020-02-05 22:16:58 +00:00
|
|
|
|
|
|
|
compressedNote := "not-compressed"
|
2020-11-24 02:35:49 +00:00
|
|
|
if origlen != -1 {
|
2020-02-05 22:16:58 +00:00
|
|
|
compressedNote = "compressed"
|
|
|
|
}
|
|
|
|
|
|
|
|
resp, err := l.httpc.Do(req)
|
|
|
|
if err != nil {
|
|
|
|
return false, fmt.Errorf("log upload of %d bytes %s failed: %v", len(body), compressedNote, err)
|
|
|
|
}
|
|
|
|
defer resp.Body.Close()
|
2020-04-04 18:46:54 +01:00
|
|
|
|
2020-02-05 22:16:58 +00:00
|
|
|
if resp.StatusCode != 200 {
|
|
|
|
uploaded = resp.StatusCode == 400 // the server saved the logs anyway
|
2020-04-04 18:46:54 +01:00
|
|
|
b, _ := ioutil.ReadAll(io.LimitReader(resp.Body, 1<<20))
|
2020-03-16 05:41:50 +00:00
|
|
|
return uploaded, fmt.Errorf("log upload of %d bytes %s failed %d: %q", len(body), compressedNote, resp.StatusCode, b)
|
2020-02-05 22:16:58 +00:00
|
|
|
}
|
2020-04-04 18:46:54 +01:00
|
|
|
|
|
|
|
// Try to read to EOF, in case server's response is
|
|
|
|
// chunked. We want to reuse the TCP connection if it's
|
|
|
|
// HTTP/1. On success, we expect 0 bytes.
|
|
|
|
// TODO(bradfitz): can remove a few days after 2020-04-04 once
|
|
|
|
// server is fixed.
|
|
|
|
if resp.ContentLength == -1 {
|
|
|
|
resp.Body.Read(make([]byte, 1))
|
|
|
|
}
|
2020-02-05 22:16:58 +00:00
|
|
|
return true, nil
|
|
|
|
}
|
|
|
|
|
2020-12-21 17:03:39 +00:00
|
|
|
// Flush uploads all logs to the server.
|
|
|
|
// It blocks until complete or there is an unrecoverable error.
|
|
|
|
func (l *Logger) Flush() error {
|
2020-02-05 22:16:58 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2022-04-18 21:43:03 +01:00
|
|
|
// logtailDisabled is whether logtail uploads to logcatcher are disabled.
|
|
|
|
var logtailDisabled syncs.AtomicBool
|
|
|
|
|
|
|
|
// Disable disables logtail uploads for the lifetime of the process.
|
|
|
|
func Disable() {
|
|
|
|
logtailDisabled.Set(true)
|
|
|
|
}
|
|
|
|
|
2022-07-28 05:06:25 +01:00
|
|
|
func (l *Logger) sendLocked(jsonBlob []byte) (int, error) {
|
2022-04-18 21:43:03 +01:00
|
|
|
if logtailDisabled.Get() {
|
|
|
|
return len(jsonBlob), nil
|
|
|
|
}
|
2020-02-05 22:16:58 +00:00
|
|
|
n, err := l.buffer.Write(jsonBlob)
|
2020-03-20 02:13:36 +00:00
|
|
|
if l.drainLogs == nil {
|
|
|
|
select {
|
|
|
|
case l.sent <- struct{}{}:
|
|
|
|
default:
|
|
|
|
}
|
2020-02-05 22:16:58 +00:00
|
|
|
}
|
|
|
|
return n, err
|
|
|
|
}
|
|
|
|
|
2020-04-04 22:16:33 +01:00
|
|
|
// TODO: instead of allocating, this should probably just append
|
|
|
|
// directly into the output log buffer.
|
2022-05-18 06:28:57 +01:00
|
|
|
func (l *Logger) encodeText(buf []byte, skipClientTime bool, procID uint32, procSequence uint64, level int) []byte {
|
2020-02-05 22:16:58 +00:00
|
|
|
now := l.timeNow()
|
|
|
|
|
2020-04-04 22:16:33 +01:00
|
|
|
// Factor in JSON encoding overhead to try to only do one alloc
|
|
|
|
// in the make below (so appends don't resize the buffer).
|
2022-05-18 06:28:57 +01:00
|
|
|
overhead := len(`{"text": ""}\n`)
|
|
|
|
includeLogtail := !skipClientTime || procID != 0 || procSequence != 0
|
|
|
|
if includeLogtail {
|
|
|
|
overhead += len(`"logtail": {},`)
|
|
|
|
}
|
2020-04-04 22:16:33 +01:00
|
|
|
if !skipClientTime {
|
2022-05-18 06:28:57 +01:00
|
|
|
overhead += len(`"client_time": "2006-01-02T15:04:05.999999999Z07:00",`)
|
|
|
|
}
|
|
|
|
if procID != 0 {
|
|
|
|
overhead += len(`"proc_id": 4294967296,`)
|
|
|
|
}
|
|
|
|
if procSequence != 0 {
|
|
|
|
overhead += len(`"proc_seq": 9007199254740992,`)
|
2020-04-04 22:16:33 +01:00
|
|
|
}
|
|
|
|
// TODO: do a pass over buf and count how many backslashes will be needed?
|
|
|
|
// For now just factor in a dozen.
|
|
|
|
overhead += 12
|
|
|
|
|
2022-01-13 22:02:46 +00:00
|
|
|
// Put a sanity cap on buf's size.
|
|
|
|
max := 16 << 10
|
|
|
|
if l.lowMem {
|
|
|
|
max = 255
|
|
|
|
}
|
|
|
|
var nTruncated int
|
|
|
|
if len(buf) > max {
|
|
|
|
nTruncated = len(buf) - max
|
|
|
|
// TODO: this can break a UTF-8 character
|
|
|
|
// mid-encoding. We don't tend to log
|
|
|
|
// non-ASCII stuff ourselves, but e.g. client
|
|
|
|
// names might be.
|
|
|
|
buf = buf[:max]
|
|
|
|
}
|
|
|
|
|
2020-04-04 22:16:33 +01:00
|
|
|
b := make([]byte, 0, len(buf)+overhead)
|
2020-02-05 22:16:58 +00:00
|
|
|
b = append(b, '{')
|
|
|
|
|
2022-05-18 06:28:57 +01:00
|
|
|
if includeLogtail {
|
|
|
|
b = append(b, `"logtail": {`...)
|
|
|
|
if !skipClientTime {
|
|
|
|
b = append(b, `"client_time": "`...)
|
|
|
|
b = now.AppendFormat(b, time.RFC3339Nano)
|
|
|
|
b = append(b, `",`...)
|
|
|
|
}
|
|
|
|
if procID != 0 {
|
|
|
|
b = append(b, `"proc_id": `...)
|
|
|
|
b = strconv.AppendUint(b, uint64(procID), 10)
|
|
|
|
b = append(b, ',')
|
|
|
|
}
|
|
|
|
if procSequence != 0 {
|
|
|
|
b = append(b, `"proc_seq": `...)
|
|
|
|
b = strconv.AppendUint(b, procSequence, 10)
|
|
|
|
b = append(b, ',')
|
|
|
|
}
|
|
|
|
b = bytes.TrimRight(b, ",")
|
|
|
|
b = append(b, "}, "...)
|
2020-02-05 22:16:58 +00:00
|
|
|
}
|
|
|
|
|
2021-11-16 04:52:43 +00:00
|
|
|
if l.metricsDelta != nil {
|
|
|
|
if d := l.metricsDelta(); d != "" {
|
|
|
|
b = append(b, `"metrics": "`...)
|
|
|
|
b = append(b, d...)
|
|
|
|
b = append(b, `",`...)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-02-13 16:54:23 +00:00
|
|
|
// Add the log level, if non-zero. Note that we only use log
|
|
|
|
// levels 1 and 2 currently. It's unlikely we'll ever make it
|
|
|
|
// past 9.
|
|
|
|
if level > 0 && level < 10 {
|
|
|
|
b = append(b, `"v":`...)
|
|
|
|
b = append(b, '0'+byte(level))
|
|
|
|
b = append(b, ',')
|
|
|
|
}
|
2020-02-05 22:16:58 +00:00
|
|
|
b = append(b, "\"text\": \""...)
|
2022-01-13 22:02:46 +00:00
|
|
|
for _, c := range buf {
|
2020-02-05 22:16:58 +00:00
|
|
|
switch c {
|
|
|
|
case '\b':
|
|
|
|
b = append(b, '\\', 'b')
|
|
|
|
case '\f':
|
|
|
|
b = append(b, '\\', 'f')
|
|
|
|
case '\n':
|
|
|
|
b = append(b, '\\', 'n')
|
|
|
|
case '\r':
|
|
|
|
b = append(b, '\\', 'r')
|
|
|
|
case '\t':
|
|
|
|
b = append(b, '\\', 't')
|
|
|
|
case '"':
|
|
|
|
b = append(b, '\\', '"')
|
|
|
|
case '\\':
|
|
|
|
b = append(b, '\\', '\\')
|
|
|
|
default:
|
2020-04-04 22:16:33 +01:00
|
|
|
// TODO: what about binary gibberish or non UTF-8?
|
2020-02-05 22:16:58 +00:00
|
|
|
b = append(b, c)
|
|
|
|
}
|
2022-01-13 22:02:46 +00:00
|
|
|
}
|
|
|
|
if nTruncated > 0 {
|
|
|
|
b = append(b, "…+"...)
|
|
|
|
b = strconv.AppendInt(b, int64(nTruncated), 10)
|
2020-02-05 22:16:58 +00:00
|
|
|
}
|
|
|
|
b = append(b, "\"}\n"...)
|
|
|
|
return b
|
|
|
|
}
|
|
|
|
|
2022-07-28 05:06:25 +01:00
|
|
|
func (l *Logger) encodeLocked(buf []byte, level int) []byte {
|
2022-05-18 06:28:57 +01:00
|
|
|
if l.includeProcSequence {
|
|
|
|
l.procSequence++
|
|
|
|
}
|
2020-02-05 22:16:58 +00:00
|
|
|
if buf[0] != '{' {
|
2022-05-18 06:28:57 +01:00
|
|
|
return l.encodeText(buf, l.skipClientTime, l.procID, l.procSequence, level) // text fast-path
|
2020-02-05 22:16:58 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
now := l.timeNow()
|
|
|
|
|
2022-03-16 23:27:57 +00:00
|
|
|
obj := make(map[string]any)
|
2020-02-05 22:16:58 +00:00
|
|
|
if err := json.Unmarshal(buf, &obj); err != nil {
|
|
|
|
for k := range obj {
|
|
|
|
delete(obj, k)
|
|
|
|
}
|
|
|
|
obj["text"] = string(buf)
|
|
|
|
}
|
|
|
|
if txt, isStr := obj["text"].(string); l.lowMem && isStr && len(txt) > 254 {
|
|
|
|
// TODO(crawshaw): trim to unicode code point
|
|
|
|
obj["text"] = txt[:254] + "…"
|
|
|
|
}
|
|
|
|
|
|
|
|
hasLogtail := obj["logtail"] != nil
|
|
|
|
if hasLogtail {
|
|
|
|
obj["error_has_logtail"] = obj["logtail"]
|
|
|
|
obj["logtail"] = nil
|
|
|
|
}
|
2022-05-18 06:28:57 +01:00
|
|
|
if !l.skipClientTime || l.procID != 0 || l.procSequence != 0 {
|
|
|
|
logtail := map[string]any{}
|
|
|
|
if !l.skipClientTime {
|
|
|
|
logtail["client_time"] = now.Format(time.RFC3339Nano)
|
|
|
|
}
|
|
|
|
if l.procID != 0 {
|
|
|
|
logtail["proc_id"] = l.procID
|
|
|
|
}
|
|
|
|
if l.procSequence != 0 {
|
|
|
|
logtail["proc_seq"] = l.procSequence
|
2020-02-05 22:16:58 +00:00
|
|
|
}
|
2022-05-18 06:28:57 +01:00
|
|
|
obj["logtail"] = logtail
|
2020-02-05 22:16:58 +00:00
|
|
|
}
|
2022-02-18 04:41:49 +00:00
|
|
|
if level > 0 {
|
|
|
|
obj["v"] = level
|
|
|
|
}
|
2020-02-05 22:16:58 +00:00
|
|
|
|
|
|
|
b, err := json.Marshal(obj)
|
|
|
|
if err != nil {
|
|
|
|
fmt.Fprintf(l.stderr, "logtail: re-encoding JSON failed: %v\n", err)
|
|
|
|
// I know of no conditions under which this could fail.
|
|
|
|
// Report it very loudly.
|
|
|
|
panic("logtail: re-encoding JSON failed: " + err.Error())
|
|
|
|
}
|
|
|
|
b = append(b, '\n')
|
|
|
|
return b
|
|
|
|
}
|
|
|
|
|
2021-12-16 03:07:52 +00:00
|
|
|
// Logf logs to l using the provided fmt-style format and optional arguments.
|
2022-03-16 23:27:57 +00:00
|
|
|
func (l *Logger) Logf(format string, args ...any) {
|
2021-12-16 03:07:52 +00:00
|
|
|
fmt.Fprintf(l, format, args...)
|
|
|
|
}
|
|
|
|
|
2020-12-21 17:03:39 +00:00
|
|
|
// Write logs an encoded JSON blob.
|
|
|
|
//
|
|
|
|
// If the []byte passed to Write is not an encoded JSON blob,
|
|
|
|
// then contents is fit into a JSON blob and written.
|
|
|
|
//
|
|
|
|
// This is intended as an interface for the stdlib "log" package.
|
|
|
|
func (l *Logger) Write(buf []byte) (int, error) {
|
2020-02-05 22:16:58 +00:00
|
|
|
if len(buf) == 0 {
|
|
|
|
return 0, nil
|
|
|
|
}
|
2020-12-21 18:53:18 +00:00
|
|
|
level, buf := parseAndRemoveLogLevel(buf)
|
2021-05-25 23:02:52 +01:00
|
|
|
if l.stderr != nil && l.stderr != ioutil.Discard && int64(level) <= atomic.LoadInt64(&l.stderrLevel) {
|
2020-02-05 22:16:58 +00:00
|
|
|
if buf[len(buf)-1] == '\n' {
|
|
|
|
l.stderr.Write(buf)
|
|
|
|
} else {
|
|
|
|
// The log package always line-terminates logs,
|
|
|
|
// so this is an uncommon path.
|
2020-03-16 05:41:50 +00:00
|
|
|
withNL := append(buf[:len(buf):len(buf)], '\n')
|
|
|
|
l.stderr.Write(withNL)
|
2020-02-05 22:16:58 +00:00
|
|
|
}
|
|
|
|
}
|
2022-07-28 05:06:25 +01:00
|
|
|
|
2022-05-18 06:28:57 +01:00
|
|
|
l.writeLock.Lock()
|
2022-07-28 05:06:25 +01:00
|
|
|
defer l.writeLock.Unlock()
|
|
|
|
|
|
|
|
b := l.encodeLocked(buf, level)
|
|
|
|
_, err := l.sendLocked(b)
|
2020-07-25 11:40:18 +01:00
|
|
|
return len(buf), err
|
2020-02-05 22:16:58 +00:00
|
|
|
}
|
2020-12-21 18:53:18 +00:00
|
|
|
|
|
|
|
// Byte patterns recognized by parseAndRemoveLogLevel.
var (
	openBracketV = []byte("[v")            // common prefix of every level tag
	v1           = []byte("[v1] ")         // verbosity level 1 tag
	v2           = []byte("[v2] ")         // verbosity level 2 tag
	vJSON        = []byte("[v\x00JSON]") // precedes log level '0'-'9' byte, then JSON value
)

// parseAndRemoveLogLevel extracts the verbosity level from buf and returns
// it together with buf stripped of the level tag.
//
// Level 0 is the normal (or unknown) level; 1+ are increasingly verbose.
// Empty input, input starting with '{' and input without any tag are
// returned unchanged with level 0. For the "[v1] " and "[v2] " tags every
// occurrence is removed. For the JSON tag, the result is whatever follows
// the tag's level digit; anything before the tag is dropped.
func parseAndRemoveLogLevel(buf []byte) (level int, cleanBuf []byte) {
	// Cheap rejections first: nothing to parse, a JSON blob, or no tag at all.
	if len(buf) == 0 || buf[0] == '{' {
		return 0, buf
	}
	if !bytes.Contains(buf, openBracketV) {
		return 0, buf
	}

	switch {
	case bytes.Contains(buf, v1):
		return 1, bytes.ReplaceAll(buf, v1, nil)
	case bytes.Contains(buf, v2):
		return 2, bytes.ReplaceAll(buf, v2, nil)
	}

	at := bytes.Index(buf, vJSON)
	if at < 0 {
		return 0, buf
	}
	// Need at least the level digit plus one byte of payload.
	tail := buf[at+len(vJSON):]
	if len(tail) < 2 {
		return 0, buf
	}
	if d := tail[0]; '0' <= d && d <= '9' {
		return int(d - '0'), tail[1:]
	}
	return 0, buf
}
|