diff --git a/logpolicy/logpolicy.go b/logpolicy/logpolicy.go
index 696be98f2..c0a29a396 100644
--- a/logpolicy/logpolicy.go
+++ b/logpolicy/logpolicy.go
@@ -519,6 +519,8 @@ func New(collection string) *Policy {
 	}
 	if collection == logtail.CollectionNode {
 		c.MetricsDelta = clientmetric.EncodeLogTailMetricsDelta
+		c.IncludeProcID = true
+		c.IncludeProcSequence = true
 	}
 
 	if val := getLogTarget(); val != "" {
diff --git a/logtail/logtail.go b/logtail/logtail.go
index 9e7a26805..19dd6145a 100644
--- a/logtail/logtail.go
+++ b/logtail/logtail.go
@@ -8,6 +8,8 @@ package logtail
 import (
 	"bytes"
 	"context"
+	"crypto/rand"
+	"encoding/binary"
 	"encoding/json"
 	"fmt"
 	"io"
@@ -15,6 +17,7 @@
 	"net/http"
 	"os"
 	"strconv"
+	"sync"
 	"sync/atomic"
 	"time"
 
@@ -62,6 +65,17 @@ type Config struct {
 	// DrainLogs, if non-nil, disables automatic uploading of new logs,
 	// so that logs are only uploaded when a token is sent to DrainLogs.
 	DrainLogs <-chan struct{}
+
+	// IncludeProcID, if true, results in an ephemeral process identifier being
+	// included in logs. The ID is random and not guaranteed to be globally
+	// unique, but it can be used to distinguish between different instances
+	// running with the same PrivateID.
+	IncludeProcID bool
+
+	// IncludeProcSequence, if true, results in an ephemeral sequence number
+	// being included in the logs. The sequence number is incremented for each
+	// log message sent, but is not persisted across process restarts.
+	IncludeProcSequence bool
 }
 
 func NewLogger(cfg Config, logf tslogger.Logf) *Logger {
@@ -84,6 +98,17 @@ func NewLogger(cfg Config, logf tslogger.Logf) *Logger {
 		}
 		cfg.Buffer = NewMemoryBuffer(pendingSize)
 	}
+	var procID uint32
+	if cfg.IncludeProcID {
+		keyBytes := make([]byte, 4)
+		rand.Read(keyBytes)
+		procID = binary.LittleEndian.Uint32(keyBytes)
+		if procID == 0 {
+			// 0 is the empty/off value; assign a different (non-zero) value to
+			// make sure we still include an ID (the actual value does not matter).
+			procID = 7
+		}
+	}
 	l := &Logger{
 		privateID:      cfg.PrivateID,
 		stderr:         cfg.Stderr,
@@ -100,6 +125,9 @@ func NewLogger(cfg Config, logf tslogger.Logf) *Logger {
 		bo:           backoff.NewBackoff("logtail", logf, 30*time.Second),
 		metricsDelta: cfg.MetricsDelta,
 
+		procID:              procID,
+		includeProcSequence: cfg.IncludeProcSequence,
+
 		shutdownStart: make(chan struct{}),
 		shutdownDone:  make(chan struct{}),
 	}
@@ -137,6 +165,11 @@ type Logger struct {
 
 	metricsDelta func() string // or nil
 	privateID    PrivateID
+	procID              uint32
+	includeProcSequence bool
+	writeLock           sync.Mutex // guards increments of procSequence
+	procSequence        uint64
+
 	shutdownStart chan struct{} // closed when shutdown begins
 	shutdownDone  chan struct{} // closed when shutdown complete
 }
@@ -253,8 +286,6 @@ func (l *Logger) drainPending(scratch []byte) (res []byte) {
 		if b[0] != '{' || !json.Valid(b) {
 			// This is probably a log added to stderr by filch
 			// outside of the logtail logger. Encode it.
-			// Do not add a client time, as it could have been
-			// been written a long time ago.
 			if !l.explainedRaw {
 				fmt.Fprintf(l.stderr, "RAW-STDERR: ***\n")
 				fmt.Fprintf(l.stderr, "RAW-STDERR: *** Lines prefixed with RAW-STDERR below bypassed logtail and probably come from a previous run of the program\n")
@@ -263,7 +294,10 @@ func (l *Logger) drainPending(scratch []byte) (res []byte) {
 			l.explainedRaw = true
 		}
 		fmt.Fprintf(l.stderr, "RAW-STDERR: %s", b)
-		b = l.encodeText(b, true, 0)
+		// Do not add a client time, as it could have been
+		// written a long time ago. Don't include the instance key or ID
+		// either, since this came from a different instance.
+		b = l.encodeText(b, true, 0, 0, 0)
 	}
 
 	if entries > 0 {
@@ -437,14 +471,24 @@ func (l *Logger) send(jsonBlob []byte) (int, error) {
 
 // TODO: instead of allocating, this should probably just append
 // directly into the output log buffer.
-func (l *Logger) encodeText(buf []byte, skipClientTime bool, level int) []byte {
+func (l *Logger) encodeText(buf []byte, skipClientTime bool, procID uint32, procSequence uint64, level int) []byte {
 	now := l.timeNow()
 
 	// Factor in JSON encoding overhead to try to only do one alloc
 	// in the make below (so appends don't resize the buffer).
-	overhead := 13
+	overhead := len(`{"text": ""}\n`)
+	includeLogtail := !skipClientTime || procID != 0 || procSequence != 0
+	if includeLogtail {
+		overhead += len(`"logtail": {},`)
+	}
 	if !skipClientTime {
-		overhead += 67
+		overhead += len(`"client_time": "2006-01-02T15:04:05.999999999Z07:00",`)
+	}
+	if procID != 0 {
+		overhead += len(`"proc_id": 4294967296,`)
+	}
+	if procSequence != 0 {
+		overhead += len(`"proc_seq": 9007199254740992,`)
 	}
 	// TODO: do a pass over buf and count how many backslashes will be needed?
 	// For now just factor in a dozen.
@@ -468,10 +512,25 @@ func (l *Logger) encodeText(buf []byte, skipClientTime bool, level int) []byte {
 
 	b := make([]byte, 0, len(buf)+overhead)
 	b = append(b, '{')
-	if !skipClientTime {
-		b = append(b, `"logtail": {"client_time": "`...)
-		b = now.AppendFormat(b, time.RFC3339Nano)
-		b = append(b, "\"}, "...)
+	if includeLogtail {
+		b = append(b, `"logtail": {`...)
+		if !skipClientTime {
+			b = append(b, `"client_time": "`...)
+			b = now.AppendFormat(b, time.RFC3339Nano)
+			b = append(b, `",`...)
+		}
+		if procID != 0 {
+			b = append(b, `"proc_id": `...)
+			b = strconv.AppendUint(b, uint64(procID), 10)
+			b = append(b, ',')
+		}
+		if procSequence != 0 {
+			b = append(b, `"proc_seq": `...)
+			b = strconv.AppendUint(b, procSequence, 10)
+			b = append(b, ',')
+		}
+		b = bytes.TrimRight(b, ",")
+		b = append(b, "}, "...)
 	}
 
 	if l.metricsDelta != nil {
@@ -521,8 +580,11 @@ func (l *Logger) encodeText(buf []byte, skipClientTime bool, level int) []byte {
 }
 
 func (l *Logger) encode(buf []byte, level int) []byte {
+	if l.includeProcSequence {
+		l.procSequence++
+	}
 	if buf[0] != '{' {
-		return l.encodeText(buf, l.skipClientTime, level) // text fast-path
+		return l.encodeText(buf, l.skipClientTime, l.procID, l.procSequence, level) // text fast-path
 	}
 
 	now := l.timeNow()
@@ -544,10 +606,18 @@ func (l *Logger) encode(buf []byte, level int) []byte {
 		obj["error_has_logtail"] = obj["logtail"]
 		obj["logtail"] = nil
 	}
-	if !l.skipClientTime {
-		obj["logtail"] = map[string]string{
-			"client_time": now.Format(time.RFC3339Nano),
+	if !l.skipClientTime || l.procID != 0 || l.procSequence != 0 {
+		logtail := map[string]any{}
+		if !l.skipClientTime {
+			logtail["client_time"] = now.Format(time.RFC3339Nano)
 		}
+		if l.procID != 0 {
+			logtail["proc_id"] = l.procID
+		}
+		if l.procSequence != 0 {
+			logtail["proc_seq"] = l.procSequence
+		}
+		obj["logtail"] = logtail
 	}
 	if level > 0 {
 		obj["v"] = level
@@ -590,8 +660,10 @@ func (l *Logger) Write(buf []byte) (int, error) {
 			l.stderr.Write(withNL)
 		}
 	}
+	l.writeLock.Lock()
 	b := l.encode(buf, level)
 	_, err := l.send(b)
+	l.writeLock.Unlock()
 	return len(buf), err
 }
 
diff --git a/logtail/logtail_test.go b/logtail/logtail_test.go
index f5be19eae..7cd306496 100644
--- a/logtail/logtail_test.go
+++ b/logtail/logtail_test.go
@@ -216,8 +216,10 @@ var sink []byte
 func TestLoggerEncodeTextAllocs(t *testing.T) {
 	lg := &Logger{timeNow: time.Now}
 	inBuf := []byte("some text to encode")
+	procID := uint32(0x24d32ee9)
+	procSequence := uint64(0x12346)
 	err := tstest.MinAllocsPerRun(t, 1, func() {
-		sink = lg.encodeText(inBuf, false, 0)
+		sink = lg.encodeText(inBuf, false, procID, procSequence, 0)
 	})
 	if err != nil {
 		t.Fatal(err)
@@ -333,7 +335,7 @@ func unmarshalOne(t *testing.T, body []byte) map[string]any {
 func TestEncodeTextTruncation(t *testing.T) {
 	lg := &Logger{timeNow: time.Now, lowMem: true}
 	in := bytes.Repeat([]byte("a"), 300)
-	b := lg.encodeText(in, true, 0)
+	b := lg.encodeText(in, true, 0, 0, 0)
 	got := string(b)
 	want := `{"text": "` + strings.Repeat("a", 255) + `…+45"}` + "\n"
 	if got != want {
@@ -355,38 +357,40 @@ func TestEncode(t *testing.T) {
 	}{
 		{
 			"normal",
-			`{"logtail": {"client_time": "1970-01-01T00:02:03.000000456Z"}, "text": "normal"}` + "\n",
+			`{"logtail": {"client_time": "1970-01-01T00:02:03.000000456Z","proc_id": 7,"proc_seq": 1}, "text": "normal"}` + "\n",
 		},
 		{
 			"and a [v1] level one",
-			`{"logtail": {"client_time": "1970-01-01T00:02:03.000000456Z"}, "v":1,"text": "and a level one"}` + "\n",
+			`{"logtail": {"client_time": "1970-01-01T00:02:03.000000456Z","proc_id": 7,"proc_seq": 1}, "v":1,"text": "and a level one"}` + "\n",
 		},
 		{
 			"[v2] some verbose two",
-			`{"logtail": {"client_time": "1970-01-01T00:02:03.000000456Z"}, "v":2,"text": "some verbose two"}` + "\n",
+			`{"logtail": {"client_time": "1970-01-01T00:02:03.000000456Z","proc_id": 7,"proc_seq": 1}, "v":2,"text": "some verbose two"}` + "\n",
 		},
 		{
 			"{}",
-			`{"logtail":{"client_time":"1970-01-01T00:02:03.000000456Z"}}` + "\n",
+			`{"logtail":{"client_time":"1970-01-01T00:02:03.000000456Z","proc_id":7,"proc_seq":1}}` + "\n",
 		},
 		{
 			`{"foo":"bar"}`,
-			`{"foo":"bar","logtail":{"client_time":"1970-01-01T00:02:03.000000456Z"}}` + "\n",
+			`{"foo":"bar","logtail":{"client_time":"1970-01-01T00:02:03.000000456Z","proc_id":7,"proc_seq":1}}` + "\n",
 		},
 		{
 			"foo: [v\x00JSON]0{\"foo\":1}",
-			"{\"foo\":1,\"logtail\":{\"client_time\":\"1970-01-01T00:02:03.000000456Z\"}}\n",
+			"{\"foo\":1,\"logtail\":{\"client_time\":\"1970-01-01T00:02:03.000000456Z\",\"proc_id\":7,\"proc_seq\":1}}\n",
		},
 		{
 			"foo: [v\x00JSON]2{\"foo\":1}",
-			"{\"foo\":1,\"logtail\":{\"client_time\":\"1970-01-01T00:02:03.000000456Z\"},\"v\":2}\n",
+			"{\"foo\":1,\"logtail\":{\"client_time\":\"1970-01-01T00:02:03.000000456Z\",\"proc_id\":7,\"proc_seq\":1},\"v\":2}\n",
 		},
 	}
 	for _, tt := range tests {
 		buf := new(simpleMemBuf)
 		lg := &Logger{
-			timeNow: func() time.Time { return time.Unix(123, 456).UTC() },
-			buffer:  buf,
+			timeNow:      func() time.Time { return time.Unix(123, 456).UTC() },
+			buffer:       buf,
+			procID:       7,
+			procSequence: 1,
 		}
 		io.WriteString(lg, tt.in)
 		got := buf.buf.String()