Pull request: * querylog: fix end of log handling

Merge in DNS/adguard-home from 2229-query-log to master

Closes #2229.

Squashed commit of the following:

commit 32508a3f3b1e098869e1649a2774f1f17d14d41f
Author: Ainar Garipov <A.Garipov@AdGuard.COM>
Date:   Tue Nov 10 16:51:25 2020 +0300

    * querylog: add test

commit 774159cc313a0284a8bb8327489671e5d7a3e4eb
Author: Ainar Garipov <A.Garipov@AdGuard.COM>
Date:   Tue Nov 10 15:26:26 2020 +0300

    * querylog: better errors

commit 27b13a4dcaff9e8f9b08aec81c0c03f62ebd3fa5
Author: Ainar Garipov <A.Garipov@AdGuard.COM>
Date:   Tue Nov 10 15:18:51 2020 +0300

    * querylog: fix end of log handling
This commit is contained in:
Ainar Garipov 2020-11-10 19:00:55 +03:00
parent 979874c92b
commit 13f708b483
5 changed files with 66 additions and 18 deletions

View File

@ -7,6 +7,14 @@ import (
"strings" "strings"
) )
// Error is the constant error type.
type Error string
// Error implements the error interface for Error.
func (err Error) Error() (msg string) {
return string(err)
}
// manyError is an error containing several wrapped errors. It is created to be // manyError is an error containing several wrapped errors. It is created to be
// a simpler version of the API provided by github.com/joomcode/errorx. // a simpler version of the API provided by github.com/joomcode/errorx.
type manyError struct { type manyError struct {

View File

@ -1,18 +1,21 @@
package querylog package querylog
import ( import (
"errors"
"io" "io"
"os" "os"
"sync" "sync"
"time" "time"
"github.com/AdguardTeam/AdGuardHome/internal/agherr"
"github.com/AdguardTeam/golibs/log" "github.com/AdguardTeam/golibs/log"
) )
// ErrSeekNotFound is returned from the Seek method // ErrSeekNotFound is returned from Seek if when it fails to find the requested
// if we failed to find the desired record // record.
var ErrSeekNotFound = errors.New("Seek not found the record") const ErrSeekNotFound agherr.Error = "seek: record not found"
// ErrEndOfLog is returned from Seek when the end of the current log is reached.
const ErrEndOfLog agherr.Error = "seek: end of log"
// TODO: Find a way to grow buffer instead of relying on this value when reading strings // TODO: Find a way to grow buffer instead of relying on this value when reading strings
const maxEntrySize = 16 * 1024 const maxEntrySize = 16 * 1024
@ -39,8 +42,7 @@ type QLogFile struct {
// NewQLogFile initializes a new instance of the QLogFile // NewQLogFile initializes a new instance of the QLogFile
func NewQLogFile(path string) (*QLogFile, error) { func NewQLogFile(path string) (*QLogFile, error) {
f, err := os.OpenFile(path, os.O_RDONLY, 0644) f, err := os.OpenFile(path, os.O_RDONLY, 0o644)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -106,6 +108,8 @@ func (q *QLogFile) Seek(timestamp int64) (int64, int, error) {
// the scope is too narrow and we won't find anything anymore // the scope is too narrow and we won't find anything anymore
log.Error("querylog: didn't find timestamp:%v", timestamp) log.Error("querylog: didn't find timestamp:%v", timestamp)
return 0, depth, ErrSeekNotFound return 0, depth, ErrSeekNotFound
} else if lineIdx == end && lineEndIdx == end {
return 0, depth, ErrEndOfLog
} }
// Save the last found idx // Save the last found idx
@ -227,7 +231,7 @@ func (q *QLogFile) readNextLine(position int64) (string, int64, error) {
// Look for the end of the prev line // Look for the end of the prev line
// This is where we'll read from // This is where we'll read from
var startLine = int64(0) startLine := int64(0)
for i := relativePos - 1; i >= 0; i-- { for i := relativePos - 1; i >= 0; i-- {
if q.buffer[i] == '\n' { if q.buffer[i] == '\n' {
startLine = i + 1 startLine = i + 1
@ -293,7 +297,7 @@ func (q *QLogFile) readProbeLine(position int64) (string, int64, int64, error) {
// Now start looking for the new line character starting // Now start looking for the new line character starting
// from the relativePos and going left // from the relativePos and going left
var startLine = int64(0) startLine := int64(0)
for i := relativePos - 1; i >= 0; i-- { for i := relativePos - 1; i >= 0; i-- {
if buffer[i] == '\n' { if buffer[i] == '\n' {
startLine = i + 1 startLine = i + 1
@ -301,7 +305,7 @@ func (q *QLogFile) readProbeLine(position int64) (string, int64, int64, error) {
} }
} }
// Looking for the end of line now // Looking for the end of line now
var endLine = int64(bufferLen) endLine := int64(bufferLen)
lineEndIdx := endLine + seekPosition lineEndIdx := endLine + seekPosition
for i := relativePos; i < int64(bufferLen); i++ { for i := relativePos; i < int64(bufferLen); i++ {
if buffer[i] == '\n' { if buffer[i] == '\n' {

View File

@ -288,3 +288,32 @@ func TestQLogSeek(t *testing.T) {
assert.Nil(t, err) assert.Nil(t, err)
assert.Equal(t, 1, depth) assert.Equal(t, 1, depth)
} }
func TestQLogSeek_ErrEndOfLog(t *testing.T) {
testDir := prepareTestDir()
t.Cleanup(func() {
_ = os.RemoveAll(testDir)
})
d := `{"T":"2020-08-31T18:44:23.911246629+03:00","QH":"wfqvjymurpwegyv","QT":"A","QC":"IN","CP":"","Answer":"","Result":{},"Elapsed":66286385,"Upstream":"tls://dns-unfiltered.adguard.com:853"}
{"T":"2020-08-31T18:44:25.376690873+03:00"}
{"T":"2020-08-31T18:44:25.382540454+03:00"}
`
f, err := ioutil.TempFile(testDir, "*.txt")
assert.Nil(t, err)
defer f.Close()
_, err = f.WriteString(d)
assert.Nil(t, err)
q, err := NewQLogFile(f.Name())
assert.Nil(t, err)
defer q.Close()
target, err := time.Parse(time.RFC3339, "2020-08-31T18:44:25.382540454+03:00")
assert.Nil(t, err)
_, depth, err := q.Seek(target.UnixNano() + int64(time.Second))
assert.Equal(t, ErrEndOfLog, err)
assert.Equal(t, 2, depth)
}

View File

@ -1,6 +1,7 @@
package querylog package querylog
import ( import (
"errors"
"io" "io"
"github.com/AdguardTeam/AdGuardHome/internal/agherr" "github.com/AdguardTeam/AdGuardHome/internal/agherr"
@ -52,11 +53,14 @@ func (r *QLogReader) Seek(timestamp int64) error {
for i := len(r.qFiles) - 1; i >= 0; i-- { for i := len(r.qFiles) - 1; i >= 0; i-- {
q := r.qFiles[i] q := r.qFiles[i]
_, _, err := q.Seek(timestamp) _, _, err := q.Seek(timestamp)
if err == nil { if err == nil || errors.Is(err, ErrEndOfLog) {
// Our search is finished, we found the element we were looking for // Our search is finished, and we either found the
// Update currentFile only, position is already set properly in the QLogFile // element we were looking for or reached the end of the
// log. Update currentFile only, position is already
// set properly in QLogFile.
r.currentFile = i r.currentFile = i
return nil
return err
} }
} }

View File

@ -1,6 +1,7 @@
package querylog package querylog
import ( import (
"errors"
"io" "io"
"time" "time"
@ -45,6 +46,7 @@ func (l *queryLog) search(params *searchParams) ([]*logEntry, time.Time) {
// remove extra records // remove extra records
entries = entries[:totalLimit] entries = entries[:totalLimit]
} }
if params.offset > 0 { if params.offset > 0 {
if len(entries) > params.offset { if len(entries) > params.offset {
entries = entries[params.offset:] entries = entries[params.offset:]
@ -53,11 +55,9 @@ func (l *queryLog) search(params *searchParams) ([]*logEntry, time.Time) {
oldest = time.Time{} oldest = time.Time{}
} }
} }
if len(entries) == totalLimit {
// change the "oldest" value here. if len(entries) > 0 && len(entries) <= totalLimit {
// we cannot use the "oldest" we got from "searchFiles" anymore // Update oldest after merging in the memory buffer.
// because after adding in-memory records and removing extra records
// the situation has changed
oldest = entries[len(entries)-1].Time oldest = entries[len(entries)-1].Time
} }
@ -95,6 +95,9 @@ func (l *queryLog) searchFiles(params *searchParams) ([]*logEntry, time.Time, in
// The one that was specified in the "oldest" param is not needed, // The one that was specified in the "oldest" param is not needed,
// we need only the one next to it // we need only the one next to it
_, err = r.ReadNext() _, err = r.ReadNext()
} else if errors.Is(err, ErrEndOfLog) {
// We've reached the end of the log.
return entries, time.Time{}, 0
} }
} }