diff --git a/internal/aghalg/ringbuffer.go b/internal/aghalg/ringbuffer.go new file mode 100644 index 00000000..b97bcc67 --- /dev/null +++ b/internal/aghalg/ringbuffer.go @@ -0,0 +1,102 @@ +package aghalg + +import ( + "github.com/AdguardTeam/golibs/errors" +) + +// RingBuffer is the implementation of ring buffer data structure. +type RingBuffer[T any] struct { + buf []T + cur int + full bool +} + +// NewRingBuffer initializes the new instance of ring buffer. size must be +// greater or equal to zero. +func NewRingBuffer[T any](size int) (rb *RingBuffer[T]) { + if size < 0 { + panic(errors.Error("ring buffer: size must be greater or equal to zero")) + } + + return &RingBuffer[T]{ + buf: make([]T, size), + } +} + +// Append appends an element to the buffer. +func (rb *RingBuffer[T]) Append(e T) { + if len(rb.buf) == 0 { + return + } + + rb.buf[rb.cur] = e + rb.cur = (rb.cur + 1) % cap(rb.buf) + if rb.cur == 0 { + rb.full = true + } +} + +// Range calls cb for each element of the buffer. If cb returns false it stops. +func (rb *RingBuffer[T]) Range(cb func(T) (cont bool)) { + before, after := rb.splitCur() + + for _, e := range before { + if !cb(e) { + return + } + } + + for _, e := range after { + if !cb(e) { + return + } + } +} + +// ReverseRange calls cb for each element of the buffer in reverse order. If +// cb returns false it stops. +func (rb *RingBuffer[T]) ReverseRange(cb func(T) (cont bool)) { + before, after := rb.splitCur() + + for i := len(after) - 1; i >= 0; i-- { + if !cb(after[i]) { + return + } + } + + for i := len(before) - 1; i >= 0; i-- { + if !cb(before[i]) { + return + } + } +} + +// splitCur splits the buffer in two, before and after current position in +// chronological order. If buffer is not full, after is nil. +func (rb *RingBuffer[T]) splitCur() (before, after []T) { + if len(rb.buf) == 0 { + return nil, nil + } + + cur := rb.cur + if !rb.full { + return rb.buf[:cur], nil + } + + return rb.buf[cur:], rb.buf[:cur] +} + +// Len returns a length of the buffer. +func (rb *RingBuffer[T]) Len() (l int) { + if !rb.full { + return rb.cur + } + + return cap(rb.buf) +} + +// Clear clears the buffer. +func (rb *RingBuffer[T]) Clear() { + rb.full = false + rb.cur = 0 +} diff --git a/internal/aghalg/ringbuffer_test.go b/internal/aghalg/ringbuffer_test.go new file mode 100644 index 00000000..43fbb46b --- /dev/null +++ b/internal/aghalg/ringbuffer_test.go @@ -0,0 +1,173 @@ +package aghalg_test + +import ( + "testing" + + "github.com/AdguardTeam/AdGuardHome/internal/aghalg" + "github.com/stretchr/testify/assert" + "golang.org/x/exp/slices" +) + +// elements is a helper function that returns n elements of the buffer. +func elements(b *aghalg.RingBuffer[int], n int, reverse bool) (es []int) { + fn := b.Range + if reverse { + fn = b.ReverseRange + } + + i := 0 + fn(func(e int) (cont bool) { + if i >= n { + return false + } + + es = append(es, e) + i++ + + return true + }) + + return es +} + +func TestNewRingBuffer(t *testing.T) { + t.Run("success_and_clear", func(t *testing.T) { + b := aghalg.NewRingBuffer[int](5) + for i := 0; i < 10; i++ { + b.Append(i) + } + assert.Equal(t, []int{5, 6, 7, 8, 9}, elements(b, b.Len(), false)) + + b.Clear() + assert.Zero(t, b.Len()) + }) + + t.Run("negative_size", func(t *testing.T) { + assert.PanicsWithError(t, "ring buffer: size must be greater or equal to zero", func() { + aghalg.NewRingBuffer[int](-5) + }) + }) + + t.Run("zero", func(t *testing.T) { + b := aghalg.NewRingBuffer[int](0) + for i := 0; i < 10; i++ { + b.Append(i) + assert.Equal(t, 0, b.Len()) + assert.Empty(t, elements(b, b.Len(), false)) + assert.Empty(t, elements(b, b.Len(), true)) + } + }) + + t.Run("single", func(t *testing.T) { + b := aghalg.NewRingBuffer[int](1) + for i := 0; i < 10; i++ { + b.Append(i) + assert.Equal(t, 1, b.Len()) + assert.Equal(t, []int{i}, elements(b, b.Len(), false)) + assert.Equal(t, []int{i}, elements(b, b.Len(), true)) + } + }) +} + +func TestRingBuffer_Range(t *testing.T) { + const size = 5 + + b := aghalg.NewRingBuffer[int](size) + + testCases := []struct { + name string + want []int + count int + length int + }{{ + name: "three", + count: 3, + length: 3, + want: []int{0, 1, 2}, + }, { + name: "ten", + count: 10, + length: size, + want: []int{5, 6, 7, 8, 9}, + }, { + name: "hundred", + count: 100, + length: size, + want: []int{95, 96, 97, 98, 99}, + }} + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + for i := 0; i < tc.count; i++ { + b.Append(i) + } + + bufLen := b.Len() + assert.Equal(t, tc.length, bufLen) + + want := tc.want + assert.Equal(t, want, elements(b, bufLen, false)) + assert.Equal(t, want[:len(want)-1], elements(b, bufLen-1, false)) + assert.Equal(t, want[:len(want)/2], elements(b, bufLen/2, false)) + + want = want[:cap(want)] + slices.Reverse(want) + + assert.Equal(t, want, elements(b, bufLen, true)) + assert.Equal(t, want[:len(want)-1], elements(b, bufLen-1, true)) + assert.Equal(t, want[:len(want)/2], elements(b, bufLen/2, true)) + }) + } +} + +func TestRingBuffer_Range_increment(t *testing.T) { + const size = 5 + + b := aghalg.NewRingBuffer[int](size) + + testCases := []struct { + name string + want []int + }{{ + name: "one", + want: []int{0}, + }, { + name: "two", + want: []int{0, 1}, + }, { + name: "three", + want: []int{0, 1, 2}, + }, { + name: "four", + want: []int{0, 1, 2, 3}, + }, { + name: "five", + want: []int{0, 1, 2, 3, 4}, + }, { + name: "six", + want: []int{1, 2, 3, 4, 5}, + }, { + name: "seven", + want: []int{2, 3, 4, 5, 6}, + }, { + name: "eight", + want: []int{3, 4, 5, 6, 7}, + }, { + name: "nine", + want: []int{4, 5, 6, 7, 8}, + }, { + name: "ten", + want: []int{5, 6, 7, 8, 9}, + }} + + for i, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + b.Append(i) + + assert.Equal(t, tc.want, elements(b, b.Len(), false)) + + slices.Reverse(tc.want) + assert.Equal(t, tc.want, elements(b, b.Len(), true)) + }) + } +} diff --git a/internal/home/config.go b/internal/home/config.go index 7a70b893..96472168 100644 --- a/internal/home/config.go +++ b/internal/home/config.go @@ -262,7 +262,7 @@ type queryLogConfig struct { // MemSize is the number of entries kept in memory before they are flushed // to disk. - MemSize uint32 `yaml:"size_memory"` + MemSize int `yaml:"size_memory"` // Enabled defines if the query log is enabled. Enabled bool `yaml:"enabled"` diff --git a/internal/querylog/qlog.go b/internal/querylog/qlog.go index 67363510..988e2b0f 100644 --- a/internal/querylog/qlog.go +++ b/internal/querylog/qlog.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/AdguardTeam/AdGuardHome/internal/aghalg" "github.com/AdguardTeam/AdGuardHome/internal/aghnet" "github.com/AdguardTeam/AdGuardHome/internal/filtering" "github.com/AdguardTeam/golibs/errors" @@ -29,12 +30,12 @@ type queryLog struct { findClient func(ids []string) (c *Client, err error) - // logFile is the path to the log file. - logFile string - // buffer contains recent log entries. The entries in this buffer must not // be modified. - buffer []*logEntry + buffer *aghalg.RingBuffer[*logEntry] + + // logFile is the path to the log file. + logFile string // bufferLock protects buffer. bufferLock sync.RWMutex @@ -195,7 +196,7 @@ func newLogEntry(params *AddParams) (entry *logEntry) { // Add implements the [QueryLog] interface for *queryLog. func (l *queryLog) Add(params *AddParams) { var isEnabled, fileIsEnabled bool - var memSize uint32 + var memSize int func() { l.confMu.RLock() defer l.confMu.RUnlock() @@ -204,7 +205,7 @@ func (l *queryLog) Add(params *AddParams) { memSize = l.conf.MemSize }() - if !isEnabled { + if !isEnabled || memSize == 0 { return } @@ -221,36 +222,18 @@ func (l *queryLog) Add(params *AddParams) { entry := newLogEntry(params) - needFlush := false - func() { - l.bufferLock.Lock() - defer l.bufferLock.Unlock() + l.bufferLock.Lock() + defer l.bufferLock.Unlock() - l.buffer = append(l.buffer, entry) + l.buffer.Append(entry) - if !fileIsEnabled { - if len(l.buffer) > int(memSize) { - // Writing to file is disabled, so just remove the oldest entry - // from the slices. - // - // TODO(a.garipov): This should be replaced by a proper ring - // buffer, but it's currently difficult to do that. - l.buffer[0] = nil - l.buffer = l.buffer[1:] - } - } else if !l.flushPending { - needFlush = len(l.buffer) >= int(memSize) - if needFlush { - l.flushPending = true - } - } - }() + if !l.flushPending && fileIsEnabled && l.buffer.Len() >= memSize { + l.flushPending = true - if needFlush { go func() { flushErr := l.flushLogBuffer() if flushErr != nil { - log.Error("querylog: flushing after adding: %s", err) + log.Error("querylog: flushing after adding: %s", flushErr) } }() } diff --git a/internal/querylog/querylog.go b/internal/querylog/querylog.go index 7a09f40f..b9292c7d 100644 --- a/internal/querylog/querylog.go +++ b/internal/querylog/querylog.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/AdguardTeam/AdGuardHome/internal/aghalg" "github.com/AdguardTeam/AdGuardHome/internal/aghhttp" "github.com/AdguardTeam/AdGuardHome/internal/aghnet" "github.com/AdguardTeam/AdGuardHome/internal/filtering" @@ -62,7 +63,7 @@ type Config struct { // MemSize is the number of entries kept in a memory buffer before they are // flushed to disk. - MemSize uint32 + MemSize int // Enabled tells if the query log is enabled. Enabled bool @@ -142,9 +143,15 @@ func newQueryLog(conf Config) (l *queryLog, err error) { } } + if conf.MemSize < 0 { + return nil, errors.Error("memory size must be greater or equal to zero") + } + l = &queryLog{ findClient: findClient, + buffer: aghalg.NewRingBuffer[*logEntry](conf.MemSize), + conf: &Config{}, confMu: &sync.RWMutex{}, logFile: filepath.Join(conf.BaseDir, queryLogFileName), diff --git a/internal/querylog/querylogfile.go b/internal/querylog/querylogfile.go index 96682c0f..3783376f 100644 --- a/internal/querylog/querylogfile.go +++ b/internal/querylog/querylogfile.go @@ -14,68 +14,75 @@ import ( // flushLogBuffer flushes the current buffer to file and resets the current // buffer. func (l *queryLog) flushLogBuffer() (err error) { + defer func() { err = errors.Annotate(err, "flushing log buffer: %w") }() l.fileFlushLock.Lock() defer l.fileFlushLock.Unlock() - var flushBuffer []*logEntry - func() { - l.bufferLock.Lock() - defer l.bufferLock.Unlock() + b, err := l.encodeEntries() + if err != nil { + // Don't wrap the error since it's informative enough as is. + return err + } - flushBuffer = l.buffer - l.buffer = nil - l.flushPending = false - }() - - err = l.flushToFile(flushBuffer) - - return errors.Annotate(err, "writing to file: %w") + return l.flushToFile(b) } -// flushToFile saves the specified log entries to the query log file -func (l *queryLog) flushToFile(buffer []*logEntry) (err error) { - if len(buffer) == 0 { - log.Debug("querylog: nothing to write to a file") +// encodeEntries returns JSON encoded log entries, logs estimated time, clears +// the log buffer. +func (l *queryLog) encodeEntries() (b *bytes.Buffer, err error) { + l.bufferLock.Lock() + defer l.bufferLock.Unlock() - return nil + bufLen := l.buffer.Len() + if bufLen == 0 { + return nil, errors.Error("nothing to write to a file") } start := time.Now() - var b bytes.Buffer - e := json.NewEncoder(&b) - for _, entry := range buffer { - err = e.Encode(entry) - if err != nil { - log.Error("Failed to marshal entry: %s", err) + b = &bytes.Buffer{} + e := json.NewEncoder(b) - return err - } + l.buffer.Range(func(entry *logEntry) (cont bool) { + err = e.Encode(entry) + + return err == nil + }) + + if err != nil { + // Don't wrap the error since it's informative enough as is. + return nil, err } elapsed := time.Since(start) - log.Debug("%d elements serialized via json in %v: %d kB, %v/entry, %v/entry", len(buffer), elapsed, b.Len()/1024, float64(b.Len())/float64(len(buffer)), elapsed/time.Duration(len(buffer))) + log.Debug("%d elements serialized via json in %v: %d kB, %v/entry, %v/entry", bufLen, elapsed, b.Len()/1024, float64(b.Len())/float64(bufLen), elapsed/time.Duration(bufLen)) - var zb bytes.Buffer - filename := l.logFile - zb = b + l.buffer.Clear() + l.flushPending = false + return b, nil +} + +// flushToFile saves the encoded log entries to the query log file. +func (l *queryLog) flushToFile(b *bytes.Buffer) (err error) { l.fileWriteLock.Lock() defer l.fileWriteLock.Unlock() + + filename := l.logFile + f, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0o644) if err != nil { - log.Error("failed to create file \"%s\": %s", filename, err) - return err + return fmt.Errorf("creating file %q: %w", filename, err) } + defer func() { err = errors.WithDeferred(err, f.Close()) }() - n, err := f.Write(zb.Bytes()) + n, err := f.Write(b.Bytes()) if err != nil { - log.Error("Couldn't write to file: %s", err) - return err + return fmt.Errorf("writing to file %q: %w", filename, err) } - log.Debug("querylog: ok \"%s\": %v bytes written", filename, n) + log.Debug("querylog: ok %q: %v bytes written", filename, n) return nil } diff --git a/internal/querylog/search.go b/internal/querylog/search.go index d89a60d5..b4fd3ffe 100644 --- a/internal/querylog/search.go +++ b/internal/querylog/search.go @@ -51,13 +51,12 @@ func (l *queryLog) searchMemory(params *searchParams, cache clientCache) (entrie l.bufferLock.Lock() defer l.bufferLock.Unlock() - // Go through the buffer in the reverse order, from newer to older. - var err error - for i := len(l.buffer) - 1; i >= 0; i-- { + l.buffer.ReverseRange(func(entry *logEntry) (cont bool) { // A shallow clone is enough, since the only thing that this loop // modifies is the client field. - e := l.buffer[i].shallowClone() + e := entry.shallowClone() + var err error e.client, err = l.client(e.ClientID, e.IP.String(), cache) if err != nil { msg := "querylog: enriching memory record at time %s" + @@ -70,9 +69,11 @@ func (l *queryLog) searchMemory(params *searchParams, cache clientCache) (entrie if params.match(e) { entries = append(entries, e) } - } - return entries, len(l.buffer) + return true + }) + + return entries, l.buffer.Len() } // search - searches log entries in the query log using specified parameters