Pull request: querylog imp code

Merge in DNS/adguard-home from querylog-imp-code to master

Squashed commit of the following:

commit a58ad36508a2355b686d314dec51ac0b5e357281
Merge: df5494f2c 941eb1dd7
Author: Dimitry Kolyshev <dkolyshev@adguard.com>
Date:   Wed May 24 15:26:55 2023 +0300

    Merge remote-tracking branch 'origin/master' into querylog-imp-code

commit df5494f2c337736690a3c2a547c2d71858d0378f
Author: Dimitry Kolyshev <dkolyshev@adguard.com>
Date:   Wed May 24 15:24:43 2023 +0300

    querylog: imp code

commit 8c3c2b76dd5858e7b107f222c112e9cde2477fb3
Author: Dimitry Kolyshev <dkolyshev@adguard.com>
Date:   Wed May 24 12:14:15 2023 +0300

    all: lint script

commit be04a4decfaf20a1649d32ecaab3c1c6bb205ffd
Author: Dimitry Kolyshev <dkolyshev@adguard.com>
Date:   Wed May 24 12:03:12 2023 +0300

    querylog: imp code

commit fe7beacff3a5cfcf2332c4998b9c65820284eaf7
Author: Dimitry Kolyshev <dkolyshev@adguard.com>
Date:   Wed May 24 11:57:33 2023 +0300

    querylog: imp docs

commit 2ae239c57d12524fbc092f582842af2ad726c1d0
Author: Dimitry Kolyshev <dkolyshev@adguard.com>
Date:   Wed May 24 11:46:54 2023 +0300

    querylog: imp code

commit 417216cefbf154fa870f8f43468f35e0e345971f
Author: Dimitry Kolyshev <dkolyshev@adguard.com>
Date:   Wed May 24 11:25:44 2023 +0300

    querylog: imp code

commit 514b6ee99113844a4e0dad30dc53703e3220c289
Author: Dimitry Kolyshev <dkolyshev@adguard.com>
Date:   Wed May 24 11:14:13 2023 +0300

    querylog: imp docs

commit 321351a3abb524208daacd5a3a7fbf5f07ab259d
Author: Dimitry Kolyshev <dkolyshev@adguard.com>
Date:   Mon May 22 16:38:31 2023 +0300

    querylog: imp code

commit ee91de5c43210b5bc213f933d411adb894d2e586
Author: Dimitry Kolyshev <dkolyshev@adguard.com>
Date:   Mon May 22 16:01:32 2023 +0300

    querylog: imp code

commit 862ff12177fb769d5cb2ec250eaee538dc91d70a
Author: Dimitry Kolyshev <dkolyshev@adguard.com>
Date:   Mon May 22 15:07:24 2023 +0300

    querylog: imp code

commit cc62c1c4ae8b813d03ccf51b596ba1ebf44d9a1f
Merge: 37ace34e9 24b41100c
Author: Dimitry Kolyshev <dkolyshev@adguard.com>
Date:   Mon May 22 13:09:10 2023 +0300

    Merge remote-tracking branch 'origin/master' into querylog-imp-code

commit 37ace34e91e5189bef6e774db960f40cdaa18270
Author: Dimitry Kolyshev <dkolyshev@adguard.com>
Date:   Mon May 22 11:23:08 2023 +0300

    querylog: imp code

commit 8417815a6349f10b5dbad410ce28aab98bc479fa
Author: Dimitry Kolyshev <dkolyshev@adguard.com>
Date:   Mon May 22 11:08:29 2023 +0300

    querylog: imp docs

commit 4e5cde74d25713f78675aa3e18083b4fb5e619f3
Author: Dimitry Kolyshev <dkolyshev@adguard.com>
Date:   Fri May 19 16:41:34 2023 +0300

    querylog: imp code

commit 3494eab7006240f652a0217d305ac916bd6c3c83
Author: Dimitry Kolyshev <dkolyshev@adguard.com>
Date:   Fri May 19 16:13:08 2023 +0300

    all: lint script

commit 704534ce6278e7d9b1bef30a3acc4e59f25693bc
Author: Dimitry Kolyshev <dkolyshev@adguard.com>
Date:   Fri May 19 16:12:04 2023 +0300

    querylog: imp code

commit 48510102a2fa5187f78067d2b9157dac62f8bb56
Author: Dimitry Kolyshev <dkolyshev@adguard.com>
Date:   Fri May 19 15:52:57 2023 +0300

    querylog: imp code

commit 89c273aea0e6758eb749a2d3bbaf1bc385a57797
Author: Dimitry Kolyshev <dkolyshev@adguard.com>
Date:   Fri May 19 15:40:50 2023 +0300

    querylog: imp code

commit 0057fe64553ad38de0fda10efb9d3512c9a00e45
Author: Dimitry Kolyshev <dkolyshev@adguard.com>
Date:   Fri May 19 13:54:46 2023 +0300

    querylog: imp code

... and 1 more commit
This commit is contained in:
Dimitry Kolyshev 2023-05-24 16:33:15 +03:00
parent 941eb1dd73
commit cbc7985e75
11 changed files with 515 additions and 382 deletions

View File

@@ -3,19 +3,24 @@ package querylog

 import (
 	"encoding/base64"
 	"encoding/json"
+	"fmt"
 	"io"
 	"net"
 	"strings"
 	"time"

 	"github.com/AdguardTeam/AdGuardHome/internal/filtering"
+	"github.com/AdguardTeam/golibs/errors"
 	"github.com/AdguardTeam/golibs/log"
 	"github.com/AdguardTeam/urlfilter/rules"
 	"github.com/miekg/dns"
 )

+// logEntryHandler represents a handler for decoding json token to the logEntry
+// struct.
 type logEntryHandler func(t json.Token, ent *logEntry) error

+// logEntryHandlers is the map of log entry decode handlers for various keys.
 var logEntryHandlers = map[string]logEntryHandler{
 	"CID": func(t json.Token, ent *logEntry) error {
 		v, ok := t.(string)
@@ -166,6 +171,7 @@ var logEntryHandlers = map[string]logEntryHandler{
 	},
 }

+// decodeResultRuleKey decodes the token of "Rules" type to logEntry struct.
 func decodeResultRuleKey(key string, i int, dec *json.Decoder, ent *logEntry) {
 	var vToken json.Token
 	switch key {
@@ -189,6 +195,8 @@ func decodeResultRuleKey(key string, i int, dec *json.Decoder, ent *logEntry) {
 	}
 }

+// decodeVTokenAndAddRule decodes the "Rules" token as [filtering.ResultRule]
+// and then adds the decoded object to the slice of result rules.
 func decodeVTokenAndAddRule(
 	key string,
 	i int,
@@ -213,6 +221,8 @@ func decodeVTokenAndAddRule(
 	return newRules, vToken
 }

+// decodeResultRules parses the dec's tokens into logEntry ent interpreting it
+// as a slice of the result rules.
 func decodeResultRules(dec *json.Decoder, ent *logEntry) {
 	for {
 		delimToken, err := dec.Token()
@@ -224,48 +234,53 @@ func decodeResultRules(dec *json.Decoder, ent *logEntry) {
 			return
 		}

-		if d, ok := delimToken.(json.Delim); ok {
-			if d != '[' {
-				log.Debug("decodeResultRules: unexpected delim %q", d)
-			}
-		} else {
+		if d, ok := delimToken.(json.Delim); !ok {
 			return
+		} else if d != '[' {
+			log.Debug("decodeResultRules: unexpected delim %q", d)
 		}

-		i := 0
-		for {
-			var keyToken json.Token
-			keyToken, err = dec.Token()
-			if err != nil {
-				if err != io.EOF {
-					log.Debug("decodeResultRules err: %s", err)
-				}
-
-				return
+		err = decodeResultRuleToken(dec, ent)
+		if err != nil {
+			if err != io.EOF && !errors.Is(err, ErrEndOfToken) {
+				log.Debug("decodeResultRules err: %s", err)
 			}

-			if d, ok := keyToken.(json.Delim); ok {
-				switch d {
-				case '}':
-					i++
-				case ']':
-					return
-				default:
-					// Go on.
-				}
+			return
+		}
+	}
+}

-				continue
-			}
+// decodeResultRuleToken decodes the tokens of "Rules" type to the logEntry ent.
+func decodeResultRuleToken(dec *json.Decoder, ent *logEntry) (err error) {
+	i := 0
+	for {
+		var keyToken json.Token
+		keyToken, err = dec.Token()
+		if err != nil {
+			// Don't wrap the error, because it's informative enough as is.
+			return err
+		}

-			key, ok := keyToken.(string)
-			if !ok {
-				log.Debug("decodeResultRules: keyToken is %T (%[1]v) and not string", keyToken)
+		if d, ok := keyToken.(json.Delim); ok {
+			switch d {
+			case '}':
+				i++
+			case ']':
+				return ErrEndOfToken
+			default:
+				// Go on.
+			}

-				return
-			}
+			continue
+		}

-			decodeResultRuleKey(key, i, dec, ent)
+		key, ok := keyToken.(string)
+		if !ok {
+			return fmt.Errorf("keyToken is %T (%[1]v) and not string", keyToken)
 		}
+
+		decodeResultRuleKey(key, i, dec, ent)
 	}
 }
@@ -322,6 +337,8 @@ func decodeResultReverseHosts(dec *json.Decoder, ent *logEntry) {
 	}
 }

+// decodeResultIPList parses the dec's tokens into logEntry ent interpreting it
+// as the result IP addresses list.
 func decodeResultIPList(dec *json.Decoder, ent *logEntry) {
 	for {
 		itemToken, err := dec.Token()
@@ -355,6 +372,8 @@ func decodeResultIPList(dec *json.Decoder, ent *logEntry) {
 	}
 }

+// decodeResultDNSRewriteResultKey decodes the token of "DNSRewriteResult" type
+// to the logEntry struct.
 func decodeResultDNSRewriteResultKey(key string, dec *json.Decoder, ent *logEntry) {
 	var err error
@@ -395,50 +414,29 @@ func decodeResultDNSRewriteResultKey(key string, dec *json.Decoder, ent *logEntr
 		log.Debug("decodeResultDNSRewriteResultKey response err: %s", err)
 	}

-		for rrType, rrValues := range ent.Result.DNSRewriteResult.Response {
-			switch rrType {
-			case
-				dns.TypeA,
-				dns.TypeAAAA:
-				for i, v := range rrValues {
-					s, _ := v.(string)
-					rrValues[i] = net.ParseIP(s)
-				}
-			default:
-				// Go on.
-			}
-		}
+		ent.parseDNSRewriteResultIPs()
 	default:
 		// Go on.
 	}
 }

+// decodeResultDNSRewriteResult parses the dec's tokens into logEntry ent
+// interpreting it as the result DNSRewriteResult.
 func decodeResultDNSRewriteResult(dec *json.Decoder, ent *logEntry) {
 	for {
-		keyToken, err := dec.Token()
+		key, err := parseKeyToken(dec)
 		if err != nil {
-			if err != io.EOF {
-				log.Debug("decodeResultDNSRewriteResult err: %s", err)
+			if err != io.EOF && !errors.Is(err, ErrEndOfToken) {
+				log.Debug("decodeResultDNSRewriteResult: %s", err)
 			}

 			return
 		}

-		if d, ok := keyToken.(json.Delim); ok {
-			if d == '}' {
-				return
-			}
-
+		if key == "" {
 			continue
 		}

-		key, ok := keyToken.(string)
-		if !ok {
-			log.Debug("decodeResultDNSRewriteResult: keyToken is %T (%[1]v) and not string", keyToken)
-
-			return
-		}
-
 		decodeResultDNSRewriteResultKey(key, dec, ent)
 	}
 }
@@ -474,34 +472,51 @@ func translateResult(ent *logEntry) {
 	res.IPList = nil
 }

+// ErrEndOfToken is an error returned by parse key token when the closing
+// bracket is found.
+const ErrEndOfToken errors.Error = "end of token"
+
+// parseKeyToken parses the dec's token key.
+func parseKeyToken(dec *json.Decoder) (key string, err error) {
+	keyToken, err := dec.Token()
+	if err != nil {
+		return "", err
+	}
+
+	if d, ok := keyToken.(json.Delim); ok {
+		if d == '}' {
+			return "", ErrEndOfToken
+		}
+
+		return "", nil
+	}
+
+	key, ok := keyToken.(string)
+	if !ok {
+		return "", fmt.Errorf("keyToken is %T (%[1]v) and not string", keyToken)
+	}
+
+	return key, nil
+}
+
+// decodeResult decodes a token of "Result" type to logEntry struct.
 func decodeResult(dec *json.Decoder, ent *logEntry) {
 	defer translateResult(ent)

 	for {
-		keyToken, err := dec.Token()
+		key, err := parseKeyToken(dec)
 		if err != nil {
-			if err != io.EOF {
-				log.Debug("decodeResult err: %s", err)
+			if err != io.EOF && !errors.Is(err, ErrEndOfToken) {
+				log.Debug("decodeResult: %s", err)
 			}

 			return
 		}

-		if d, ok := keyToken.(json.Delim); ok {
-			if d == '}' {
-				return
-			}
-
+		if key == "" {
 			continue
 		}

-		key, ok := keyToken.(string)
-		if !ok {
-			log.Debug("decodeResult: keyToken is %T (%[1]v) and not string", keyToken)
-
-			return
-		}
-
 		decHandler, ok := resultDecHandlers[key]
 		if ok {
 			decHandler(dec, ent)
@@ -527,13 +542,16 @@ func decodeResult(dec *json.Decoder, ent *logEntry) {
 	}
 }

+// resultHandlers is the map of log entry decode handlers for various keys.
 var resultHandlers = map[string]logEntryHandler{
 	"IsFiltered": func(t json.Token, ent *logEntry) error {
 		v, ok := t.(bool)
 		if !ok {
 			return nil
 		}
+
 		ent.Result.IsFiltered = v
+
 		return nil
 	},
 	"Rule": func(t json.Token, ent *logEntry) error {
@@ -578,11 +596,14 @@ var resultHandlers = map[string]logEntryHandler{
 		if !ok {
 			return nil
 		}
+
 		i, err := v.Int64()
 		if err != nil {
 			return err
 		}
+
 		ent.Result.Reason = filtering.Reason(i)
+
 		return nil
 	},
 	"ServiceName": func(t json.Token, ent *logEntry) error {
@@ -607,6 +628,7 @@ var resultHandlers = map[string]logEntryHandler{
 	},
 }

+// resultDecHandlers is the map of decode handlers for various keys.
 var resultDecHandlers = map[string]func(dec *json.Decoder, ent *logEntry){
 	"ReverseHosts": decodeResultReverseHosts,
 	"IPList":       decodeResultIPList,
@@ -614,9 +636,11 @@ var resultDecHandlers = map[string]func(dec *json.Decoder, ent *logEntry){
 	"DNSRewriteResult": decodeResultDNSRewriteResult,
 }

+// decodeLogEntry decodes string str to logEntry ent.
 func decodeLogEntry(ent *logEntry, str string) {
 	dec := json.NewDecoder(strings.NewReader(str))
 	dec.UseNumber()

 	for {
 		keyToken, err := dec.Token()
 		if err != nil {
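The sentinel-error pattern this change introduces (parseKeyToken returning ErrEndOfToken at a closing brace) can be shown in isolation. Below is a minimal, self-contained sketch; the names nextKey and errEndOfObject and the sample JSON are hypothetical, not part of this change:

package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"strings"
)

// errEndOfObject is a sentinel returned when the decoder meets the closing
// brace, mirroring the ErrEndOfToken approach from the diff.
var errEndOfObject = errors.New("end of object")

// nextKey returns the next object key, "" for other delimiters, or
// errEndOfObject at '}'.
func nextKey(dec *json.Decoder) (key string, err error) {
	tok, err := dec.Token()
	if err != nil {
		return "", err
	}

	if d, ok := tok.(json.Delim); ok {
		if d == '}' {
			return "", errEndOfObject
		}

		return "", nil
	}

	s, ok := tok.(string)
	if !ok {
		return "", fmt.Errorf("token is %T (%[1]v) and not string", tok)
	}

	return s, nil
}

func main() {
	dec := json.NewDecoder(strings.NewReader(`{"IsFiltered":true,"Rule":"||example"}`))
	for {
		key, err := nextKey(dec)
		if err != nil {
			if err != io.EOF && !errors.Is(err, errEndOfObject) {
				fmt.Println("unexpected:", err)
			}

			return
		}

		if key == "" {
			continue
		}

		// Read the value that follows the key.
		val, _ := dec.Token()
		fmt.Printf("%s = %v\n", key, val)
	}
}

A sentinel constant lets the caller distinguish "object ended normally" from a real decoding failure without carrying extra state between calls.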

View File

@@ -68,3 +68,19 @@ func (e *logEntry) addResponse(resp *dns.Msg, isOrig bool) {
 		log.Error("querylog: %s", err)
 	}
 }
+
+// parseDNSRewriteResultIPs fills logEntry's DNSRewriteResult response records
+// with the IP addresses parsed from the raw strings.
+func (e *logEntry) parseDNSRewriteResultIPs() {
+	for rrType, rrValues := range e.Result.DNSRewriteResult.Response {
+		switch rrType {
+		case dns.TypeA, dns.TypeAAAA:
+			for i, v := range rrValues {
+				s, _ := v.(string)
+				rrValues[i] = net.ParseIP(s)
+			}
+		default:
+			// Go on.
+		}
+	}
+}
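For background on why parseDNSRewriteResultIPs is needed: net.IP marshals to a JSON string, so after a decode round-trip the response map holds plain strings that must be parsed back into net.IP values. A small illustration with made-up data, not taken from the change itself:

package main

import (
	"encoding/json"
	"fmt"
	"net"
)

func main() {
	// net.IP marshals to a JSON string, e.g. "1.2.3.4".
	b, _ := json.Marshal(map[uint16][]any{1: {net.ParseIP("1.2.3.4")}})
	fmt.Println(string(b)) // {"1":["1.2.3.4"]}

	// After unmarshalling, the value is a string again and must be
	// converted back, which is what parseDNSRewriteResultIPs does for
	// A and AAAA records.
	var m map[uint16][]any
	_ = json.Unmarshal(b, &m)

	s, _ := m[1][0].(string)
	fmt.Printf("%T -> %v\n", net.ParseIP(s), net.ParseIP(s))
}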

View File

@@ -16,32 +16,35 @@ import (
 	"github.com/miekg/dns"
 )

-const (
-	queryLogFileName = "querylog.json" // .gz added during compression
-)
+// queryLogFileName is a name of the log file. ".gz" extension is added later
+// during compression.
+const queryLogFileName = "querylog.json"

-// queryLog is a structure that writes and reads the DNS query log
+// queryLog is a structure that writes and reads the DNS query log.
 type queryLog struct {
-	findClient func(ids []string) (c *Client, err error)
-
 	// confMu protects conf.
 	confMu *sync.RWMutex
+
 	conf *Config

+	anonymizer *aghnet.IPMut
+
+	findClient func(ids []string) (c *Client, err error)
+
 	// logFile is the path to the log file.
 	logFile string

-	// bufferLock protects buffer.
-	bufferLock sync.RWMutex
 	// buffer contains recent log entries. The entries in this buffer must not
 	// be modified.
 	buffer []*logEntry

-	fileFlushLock sync.Mutex // synchronize a file-flushing goroutine and main thread
-	flushPending  bool       // don't start another goroutine while the previous one is still running
+	// bufferLock protects buffer.
+	bufferLock sync.RWMutex
+
+	// fileFlushLock synchronizes a file-flushing goroutine and main thread.
+	fileFlushLock sync.Mutex
+
 	fileWriteLock sync.Mutex
-	anonymizer    *aghnet.IPMut
+
+	flushPending bool
 }

 // ClientProto values are names of the client protocols.
// ClientProto values are names of the client protocols. // ClientProto values are names of the client protocols.
@@ -155,6 +158,43 @@ func (l *queryLog) clear() {
 	log.Debug("querylog: cleared")
 }

+// newLogEntry creates an instance of logEntry from parameters.
+func newLogEntry(params *AddParams) (entry *logEntry) {
+	q := params.Question.Question[0]
+
+	entry = &logEntry{
+		// TODO(d.kolyshev): Export this timestamp to func params.
+		Time: time.Now(),
+
+		QHost:  strings.ToLower(q.Name[:len(q.Name)-1]),
+		QType:  dns.Type(q.Qtype).String(),
+		QClass: dns.Class(q.Qclass).String(),
+
+		ClientID:    params.ClientID,
+		ClientProto: params.ClientProto,
+
+		Result:            *params.Result,
+		Upstream:          params.Upstream,
+		IP:                params.ClientIP,
+		Elapsed:           params.Elapsed,
+		Cached:            params.Cached,
+		AuthenticatedData: params.AuthenticatedData,
+	}
+
+	if params.ReqECS != nil {
+		entry.ReqECS = params.ReqECS.String()
+	}
+
+	entry.addResponse(params.Answer, false)
+	entry.addResponse(params.OrigAnswer, true)
+
+	return entry
+}
+
+// Add implements the [QueryLog] interface for *queryLog.
 func (l *queryLog) Add(params *AddParams) {
 	var isEnabled, fileIsEnabled bool
 	var memSize uint32
@@ -181,35 +221,7 @@ func (l *queryLog) Add(params *AddParams) {
 		params.Result = &filtering.Result{}
 	}

-	now := time.Now()
-	q := params.Question.Question[0]
-	entry := &logEntry{
-		Time:   now,
-		QHost:  strings.ToLower(q.Name[:len(q.Name)-1]),
-		QType:  dns.Type(q.Qtype).String(),
-		QClass: dns.Class(q.Qclass).String(),
-
-		ClientID:    params.ClientID,
-		ClientProto: params.ClientProto,
-
-		Result:            *params.Result,
-		Upstream:          params.Upstream,
-		IP:                params.ClientIP,
-		Elapsed:           params.Elapsed,
-		Cached:            params.Cached,
-		AuthenticatedData: params.AuthenticatedData,
-	}
-
-	if params.ReqECS != nil {
-		entry.ReqECS = params.ReqECS.String()
-	}
-
-	entry.addResponse(params.Answer, false)
-	entry.addResponse(params.OrigAnswer, true)
+	entry := newLogEntry(params)

 	needFlush := false

 	func() {
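The shape of this refactor, pulling entry construction out of Add, can be sketched with trimmed-down stand-in types; addParams and logEntry below are hypothetical reductions of the real ones:

package main

import (
	"fmt"
	"strings"
	"time"
)

// addParams and logEntry are trimmed-down stand-ins for the real types.
type addParams struct {
	QName    string
	ClientIP string
}

type logEntry struct {
	Time  time.Time
	QHost string
	IP    string
}

// newLogEntry mirrors the extracted constructor: all field mapping lives in
// one place, so the caller stays focused on locking and flushing.
func newLogEntry(params *addParams) (entry *logEntry) {
	return &logEntry{
		Time: time.Now(),
		// Lowercase the name and trim the trailing dot of the FQDN, as the
		// real code does with q.Name[:len(q.Name)-1].
		QHost: strings.ToLower(strings.TrimSuffix(params.QName, ".")),
		IP:    params.ClientIP,
	}
}

func main() {
	e := newLogEntry(&addParams{QName: "Example.COM.", ClientIP: "127.0.0.1"})
	fmt.Println(e.QHost, e.IP)
}

Keeping the field mapping in one constructor also gives the cyclomatic-complexity win that lets the lint script drop its special case for this package (see the last file in this diff).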

View File

@@ -45,9 +45,10 @@ func TestQueryLog(t *testing.T) {
 	addEntry(l, "example.com", net.IPv4(1, 1, 1, 4), net.IPv4(2, 2, 2, 4))

 	type tcAssertion struct {
-		num            int
-		host           string
-		answer, client net.IP
+		host   string
+		answer net.IP
+		client net.IP
+		num    int
 	}

 	testCases := []struct {

View File

@@ -12,141 +12,181 @@ import (
 	"github.com/AdguardTeam/golibs/log"
 )

 const (
-	ErrTSNotFound errors.Error = "ts not found"
-	ErrTSTooLate  errors.Error = "ts too late"
-	ErrTSTooEarly errors.Error = "ts too early"
-)
+	// Timestamp not found errors.
+	errTSNotFound errors.Error = "ts not found"
+	errTSTooLate  errors.Error = "ts too late"
+	errTSTooEarly errors.Error = "ts too early"

-// TODO: Find a way to grow buffer instead of relying on this value when reading strings
-const maxEntrySize = 16 * 1024
+	// maxEntrySize is a maximum size of the entry.
+	//
+	// TODO: Find a way to grow buffer instead of relying on this value when
+	// reading strings.
+	maxEntrySize = 16 * 1024

-// buffer should be enough for at least this number of entries
-const bufferSize = 100 * maxEntrySize
+	// bufferSize should be enough for at least this number of entries.
+	bufferSize = 100 * maxEntrySize
+)

-// QLogFile represents a single query log file
-// It allows reading from the file in the reverse order
+// qLogFile represents a single query log file. It allows reading from the
+// file in the reverse order.
 //
-// Please note that this is a stateful object.
-// Internally, it contains a pointer to a specific position in the file,
-// and it reads lines in reverse order starting from that position.
-type QLogFile struct {
-	file        *os.File // the query log file
-	position    int64    // current position in the file
-	buffer      []byte   // buffer that we've read from the file
-	bufferStart int64    // start of the buffer (in the file)
-	bufferLen   int      // buffer len
-	lock        sync.Mutex // We use mutex to make it thread-safe
+// Please note, that this is a stateful object. Internally, it contains a
+// pointer to a specific position in the file, and it reads lines in reverse
+// order starting from that position.
+type qLogFile struct {
+	// file is the query log file.
+	file *os.File
+
+	// buffer that we've read from the file.
+	buffer []byte
+
+	// lock is a mutex to make it thread-safe.
+	lock sync.Mutex
+
+	// position is the position in the file.
+	position int64
+
+	// bufferStart is the start of the buffer (in the file).
+	bufferStart int64
+
+	// bufferLen is the length of the buffer.
+	bufferLen int
 }

-// NewQLogFile initializes a new instance of the QLogFile
-func NewQLogFile(path string) (*QLogFile, error) {
+// newQLogFile initializes a new instance of the qLogFile.
+func newQLogFile(path string) (qf *qLogFile, err error) {
 	f, err := os.OpenFile(path, os.O_RDONLY, 0o644)
 	if err != nil {
 		return nil, err
 	}

-	return &QLogFile{
-		file: f,
-	}, nil
+	return &qLogFile{file: f}, nil
+}
+
+// validateQLogLineIdx returns error if the line index is not valid to continue
+// search.
+func (q *qLogFile) validateQLogLineIdx(lineIdx, lastProbeLineIdx, ts, fSize int64) (err error) {
+	if lineIdx == lastProbeLineIdx {
+		if lineIdx == 0 {
+			return errTSTooEarly
+		}
+
+		// If we're testing the same line twice then most likely the scope is
+		// too narrow and we won't find anything anymore in any other file.
+		return fmt.Errorf("looking up timestamp %d in %q: %w", ts, q.file.Name(), errTSNotFound)
+	} else if lineIdx == fSize {
+		return errTSTooLate
+	}
+
+	return nil
 }

 // seekTS performs binary search in the query log file looking for a record
-// with the specified timestamp. Once the record is found, it sets
-// "position" so that the next ReadNext call returned that record.
+// with the specified timestamp. Once the record is found, it sets "position"
+// so that the next ReadNext call returned that record.
 //
 // The algorithm is rather simple:
-// 1. It starts with the position in the middle of a file
-// 2. Shifts back to the beginning of the line
-// 3. Checks the log record timestamp
-// 4. If it is lower than the timestamp we are looking for,
-// it shifts seek position to 3/4 of the file. Otherwise, to 1/4 of the file.
-// 5. It performs the search again, every time the search scope is narrowed twice.
+// 1. It starts with the position in the middle of a file.
+// 2. Shifts back to the beginning of the line.
+// 3. Checks the log record timestamp.
+// 4. If it is lower than the timestamp we are looking for, it shifts seek
+// position to 3/4 of the file. Otherwise, to 1/4 of the file.
+// 5. It performs the search again, every time the search scope is narrowed
+// twice.
 //
 // Returns:
-// * It returns the position of the the line with the timestamp we were looking for
-// so that when we call "ReadNext" this line was returned.
-// * Depth of the search (how many times we compared timestamps).
-// * If we could not find it, it returns one of the errors described above.
-func (q *QLogFile) seekTS(timestamp int64) (int64, int, error) {
+//   - It returns the position of the line with the timestamp we were looking
+//     for so that when we call "ReadNext" this line was returned.
+//   - Depth of the search (how many times we compared timestamps).
+//   - If we could not find it, it returns one of the errors described above.
+func (q *qLogFile) seekTS(timestamp int64) (pos int64, depth int, err error) {
 	q.lock.Lock()
 	defer q.lock.Unlock()

-	// Empty the buffer
+	// Empty the buffer.
 	q.buffer = nil

-	// First of all, check the file size
+	// First of all, check the file size.
 	fileInfo, err := q.file.Stat()
 	if err != nil {
 		return 0, 0, err
 	}

-	// Define the search scope
-	start := int64(0)          // start of the search interval (position in the file)
-	end := fileInfo.Size()     // end of the search interval (position in the file)
-	probe := (end - start) / 2 // probe -- approximate index of the line we'll try to check
+	// Define the search scope.
+
+	// Start of the search interval (position in the file).
+	start := int64(0)
+	// End of the search interval (position in the file).
+	end := fileInfo.Size()
+	// Probe is the approximate index of the line we'll try to check.
+	probe := (end - start) / 2

 	var line string
-	var lineIdx int64 // index of the probe line in the file
+	// Index of the probe line in the file.
+	var lineIdx int64
 	var lineEndIdx int64
-	var lastProbeLineIdx int64 // index of the last probe line
+	// Index of the last probe line.
+	var lastProbeLineIdx int64
 	lastProbeLineIdx = -1

-	// Count seek depth in order to detect mistakes
-	// If depth is too large, we should stop the search
-	depth := 0
+	// Count seek depth in order to detect mistakes. If depth is too large,
+	// we should stop the search.
 	for {
-		// Get the line at the specified position
+		// Get the line at the specified position.
 		line, lineIdx, lineEndIdx, err = q.readProbeLine(probe)
 		if err != nil {
 			return 0, depth, err
 		}

-		if lineIdx == lastProbeLineIdx {
-			if lineIdx == 0 {
-				return 0, depth, ErrTSTooEarly
-			}
-
-			// If we're testing the same line twice then most likely
-			// the scope is too narrow and we won't find anything
-			// anymore in any other file.
-			return 0, depth, fmt.Errorf("looking up timestamp %d in %q: %w", timestamp, q.file.Name(), ErrTSNotFound)
-		} else if lineIdx == fileInfo.Size() {
-			return 0, depth, ErrTSTooLate
+		// Check if the line index if invalid.
+		err = q.validateQLogLineIdx(lineIdx, lastProbeLineIdx, timestamp, fileInfo.Size())
+		if err != nil {
+			return 0, depth, err
 		}

-		// Save the last found idx
+		// Save the last found idx.
 		lastProbeLineIdx = lineIdx

-		// Get the timestamp from the query log record
+		// Get the timestamp from the query log record.
 		ts := readQLogTimestamp(line)
 		if ts == 0 {
-			return 0, depth, fmt.Errorf("looking up timestamp %d in %q: record %q has empty timestamp", timestamp, q.file.Name(), line)
+			return 0, depth, fmt.Errorf(
+				"looking up timestamp %d in %q: record %q has empty timestamp",
+				timestamp,
+				q.file.Name(),
+				line,
+			)
 		}

 		if ts == timestamp {
-			// Hurray, returning the result
+			// Hurray, returning the result.
 			break
 		}

-		// Narrow the scope and repeat the search
+		// Narrow the scope and repeat the search.
 		if ts > timestamp {
-			// If the timestamp we're looking for is OLDER than what we found
-			// Then the line is somewhere on the LEFT side from the current probe position
+			// If the timestamp we're looking for is OLDER than what we found,
+			// then the line is somewhere on the LEFT side from the current
+			// probe position.
 			end = lineIdx
 		} else {
-			// If the timestamp we're looking for is NEWER than what we found
-			// Then the line is somewhere on the RIGHT side from the current probe position
+			// If the timestamp we're looking for is NEWER than what we found,
+			// then the line is somewhere on the RIGHT side from the current
+			// probe position.
 			start = lineEndIdx
 		}
 		probe = start + (end-start)/2

 		depth++
 		if depth >= 100 {
-			return 0, depth, fmt.Errorf("looking up timestamp %d in %q: depth %d too high: %w", timestamp, q.file.Name(), depth, ErrTSNotFound)
+			return 0, depth, fmt.Errorf(
+				"looking up timestamp %d in %q: depth %d too high: %w",
+				timestamp,
+				q.file.Name(),
+				depth,
+				errTSNotFound,
+			)
 		}
 	}
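The probing search described in the seekTS doc comment above can be sketched in-memory. seekLine below is a hypothetical, simplified stand-in that searches newline-separated records sorted by a leading numeric timestamp, probing the middle of the scope and aligning each probe to a line boundary:

package main

import (
	"fmt"
	"strings"
)

// seekLine returns the offset of the line whose leading timestamp equals ts.
// data is newline-separated records sorted by a leading integer timestamp.
// This is a stand-in, not the real buffered implementation.
func seekLine(data string, ts int) (pos int, ok bool) {
	start, end := 0, len(data)
	for start < end {
		probe := start + (end-start)/2

		// Shift back to the beginning of the probed line, then find its end.
		lineIdx := strings.LastIndexByte(data[:probe], '\n') + 1
		lineEnd := lineIdx + strings.IndexByte(data[lineIdx:], '\n')
		if lineEnd < lineIdx {
			lineEnd = len(data)
		}

		var got int
		fmt.Sscanf(data[lineIdx:lineEnd], "%d", &got)

		switch {
		case got == ts:
			return lineIdx, true
		case got > ts:
			// Target is to the LEFT of the probe.
			end = lineIdx
		default:
			// Target is to the RIGHT of the probe.
			start = lineEnd + 1
		}
	}

	return 0, false
}

func main() {
	data := "10 a\n20 b\n30 c\n40 d"
	fmt.Println(seekLine(data, 30)) // 10 true
}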
@@ -154,37 +194,39 @@ func (q *qLogFile) seekTS(timestamp int64) (pos int64, depth int, err error) {
 	return q.position, depth, nil
 }

-// SeekStart changes the current position to the end of the file
-// Please note that we're reading query log in the reverse order
-// and that's why log start is actually the end of file
+// SeekStart changes the current position to the end of the file. Please note,
+// that we're reading query log in the reverse order and that's why log start
+// is actually the end of file.
 //
-// Returns nil if we were able to change the current position.
-// Returns error in any other case.
-func (q *QLogFile) SeekStart() (int64, error) {
+// Returns nil if we were able to change the current position. Returns error
+// in any other case.
+func (q *qLogFile) SeekStart() (int64, error) {
 	q.lock.Lock()
 	defer q.lock.Unlock()

-	// Empty the buffer
+	// Empty the buffer.
 	q.buffer = nil

-	// First of all, check the file size
+	// First of all, check the file size.
 	fileInfo, err := q.file.Stat()
 	if err != nil {
 		return 0, err
 	}

-	// Place the position to the very end of file
+	// Place the position to the very end of file.
 	q.position = fileInfo.Size() - 1
 	if q.position < 0 {
 		q.position = 0
 	}

 	return q.position, nil
 }

-// ReadNext reads the next line (in the reverse order) from the file
-// and shifts the current position left to the next (actually prev) line.
-// returns io.EOF if there's nothing to read more
-func (q *QLogFile) ReadNext() (string, error) {
+// ReadNext reads the next line (in the reverse order) from the file and shifts
+// the current position left to the next (actually prev) line.
+//
+// Returns io.EOF if there's nothing more to read.
+func (q *qLogFile) ReadNext() (string, error) {
 	q.lock.Lock()
 	defer q.lock.Unlock()
@@ -197,35 +239,34 @@ func (q *qLogFile) ReadNext() (string, error) {
 		return "", err
 	}

-	// Shift position
+	// Shift position.
 	if lineIdx == 0 {
 		q.position = 0
 	} else {
-		// there's usually a line break before the line
-		// so we should shift one more char left from the line
-		// line\nline
+		// There's usually a line break before the line, so we should shift one
+		// more char left from the line "\nline".
 		q.position = lineIdx - 1
 	}
 	return line, err
 }

-// Close frees the underlying resources
-func (q *QLogFile) Close() error {
+// Close frees the underlying resources.
+func (q *qLogFile) Close() error {
 	return q.file.Close()
 }

-// readNextLine reads the next line from the specified position
-// this line actually have to END on that position.
+// readNextLine reads the next line from the specified position. This line
+// actually have to END on that position.
 //
-// the algorithm is:
-// 1. check if we have the buffer initialized
-// 2. if it is, scan it and look for the line there
-// 3. if we cannot find the line there, read the prev chunk into the buffer
-// 4. read the line from the buffer
-func (q *QLogFile) readNextLine(position int64) (string, int64, error) {
+// The algorithm is:
+// 1. Check if we have the buffer initialized.
+// 2. If it is so, scan it and look for the line there.
+// 3. If we cannot find the line there, read the prev chunk into the buffer.
+// 4. Read the line from the buffer.
+func (q *qLogFile) readNextLine(position int64) (string, int64, error) {
 	relativePos := position - q.bufferStart
 	if q.buffer == nil || (relativePos < maxEntrySize && q.bufferStart != 0) {
-		// Time to re-init the buffer
+		// Time to re-init the buffer.
 		err := q.initBuffer(position)
 		if err != nil {
 			return "", 0, err
@@ -233,8 +274,7 @@ func (q *qLogFile) readNextLine(position int64) (string, int64, error) {
 		relativePos = position - q.bufferStart
 	}

-	// Look for the end of the prev line
-	// This is where we'll read from
+	// Look for the end of the prev line, this is where we'll read from.
 	startLine := int64(0)
 	for i := relativePos - 1; i >= 0; i-- {
 		if q.buffer[i] == '\n' {
@@ -245,18 +285,19 @@ func (q *qLogFile) readNextLine(position int64) (string, int64, error) {
 	line := string(q.buffer[startLine:relativePos])
 	lineIdx := q.bufferStart + startLine
 	return line, lineIdx, nil
 }

-// initBuffer initializes the QLogFile buffer.
-// the goal is to read a chunk of file that includes the line with the specified position.
-func (q *QLogFile) initBuffer(position int64) error {
+// initBuffer initializes the qLogFile buffer. The goal is to read a chunk of
+// file that includes the line with the specified position.
+func (q *qLogFile) initBuffer(position int64) error {
 	q.bufferStart = int64(0)
 	if position > bufferSize {
 		q.bufferStart = position - bufferSize
 	}

-	// Seek to this position
+	// Seek to this position.
 	_, err := q.file.Seek(q.bufferStart, io.SeekStart)
 	if err != nil {
 		return err
@@ -271,34 +312,35 @@ func (q *qLogFile) initBuffer(position int64) error {
 	return err
 }

-// readProbeLine reads a line that includes the specified position
-// this method is supposed to be used when we use binary search in the Seek method
-// in the case of consecutive reads, use readNext (it uses a better buffer)
-func (q *QLogFile) readProbeLine(position int64) (string, int64, int64, error) {
-	// First of all, we should read a buffer that will include the query log line
-	// In order to do this, we'll define the boundaries
+// readProbeLine reads a line that includes the specified position. This
+// method is supposed to be used when we use binary search in the Seek method.
+// In the case of consecutive reads, use readNext, cause it uses better buffer.
+func (q *qLogFile) readProbeLine(position int64) (string, int64, int64, error) {
+	// First of all, we should read a buffer that will include the query log
+	// line. In order to do this, we'll define the boundaries.
 	seekPosition := int64(0)
-	relativePos := position // position relative to the buffer we're going to read
+	// Position relative to the buffer we're going to read.
+	relativePos := position
 	if position > maxEntrySize {
 		seekPosition = position - maxEntrySize
 		relativePos = maxEntrySize
 	}

-	// Seek to this position
+	// Seek to this position.
 	_, err := q.file.Seek(seekPosition, io.SeekStart)
 	if err != nil {
 		return "", 0, 0, err
 	}

-	// The buffer size is 2*maxEntrySize
+	// The buffer size is 2*maxEntrySize.
 	buffer := make([]byte, maxEntrySize*2)
 	bufferLen, err := q.file.Read(buffer)
 	if err != nil {
 		return "", 0, 0, err
 	}

-	// Now start looking for the new line character starting
-	// from the relativePos and going left
+	// Now start looking for the new line character starting from the
+	// relativePos and going left.
 	startLine := int64(0)
 	for i := relativePos - 1; i >= 0; i-- {
 		if buffer[i] == '\n' {
@@ -306,7 +348,7 @@ func (q *qLogFile) readProbeLine(position int64) (string, int64, int64, error) {
 			break
 		}
 	}

-	// Looking for the end of line now
+	// Looking for the end of line now.
 	endLine := int64(bufferLen)
 	lineEndIdx := endLine + seekPosition
 	for i := relativePos; i < int64(bufferLen); i++ {
@@ -317,13 +359,13 @@ func (q *qLogFile) readProbeLine(position int64) (string, int64, int64, error) {
 		}
 	}

-	// Finally we can return the string we were looking for
+	// Finally we can return the string we were looking for.
 	lineIdx := startLine + seekPosition
 	return string(buffer[startLine:endLine]), lineIdx, lineEndIdx, nil
 }

-// readJSONvalue reads a JSON string in form of '"key":"value"'. prefix must be
-// of the form '"key":"' to generate less garbage.
+// readJSONValue reads a JSON string in form of '"key":"value"'. prefix must
+// be of the form '"key":"' to generate less garbage.
 func readJSONValue(s, prefix string) string {
 	i := strings.Index(s, prefix)
 	if i == -1 {
@@ -340,7 +382,7 @@ func readJSONValue(s, prefix string) string {
 	return s[start:end]
 }

-// readQLogTimestamp reads the timestamp field from the query log line
+// readQLogTimestamp reads the timestamp field from the query log line.
 func readQLogTimestamp(str string) int64 {
 	val := readJSONValue(str, `"T":"`)
 	if len(val) == 0 {
@@ -351,10 +393,12 @@ func readQLogTimestamp(str string) int64 {
 		log.Error("Couldn't find timestamp: %s", str)
 		return 0
 	}
+
 	tm, err := time.Parse(time.RFC3339Nano, val)
 	if err != nil {
 		log.Error("Couldn't parse timestamp: %s", val)
 		return 0
 	}
+
 	return tm.UnixNano()
 }
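How callers are expected to use the qLogFile contract, reading newest to oldest via SeekStart and then ReadNext until io.EOF, can be shown with an in-memory stand-in; revReader below is hypothetical, not the real buffered reader:

package main

import (
	"fmt"
	"io"
	"strings"
)

// revReader yields lines of s in reverse order, imitating qLogFile's
// SeekStart/ReadNext contract.
type revReader struct {
	lines []string
	pos   int
}

func newRevReader(s string) *revReader {
	return &revReader{lines: strings.Split(strings.TrimRight(s, "\n"), "\n")}
}

// SeekStart positions the reader at the "start" of the log, which is the end
// of the file, since we read in reverse.
func (r *revReader) SeekStart() { r.pos = len(r.lines) - 1 }

// ReadNext returns the next (i.e. previous in file order) line, or io.EOF.
func (r *revReader) ReadNext() (string, error) {
	if r.pos < 0 {
		return "", io.EOF
	}

	line := r.lines[r.pos]
	r.pos--

	return line, nil
}

func main() {
	r := newRevReader("oldest\nmiddle\nnewest\n")
	r.SeekStart()
	for {
		line, err := r.ReadNext()
		if err == io.EOF {
			break
		}

		fmt.Println(line) // newest, middle, oldest
	}
}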

View File

@@ -72,15 +72,15 @@ func prepareTestFiles(t *testing.T, filesNum, linesNum int) []string {
 	return files
 }

-// newTestQLogFile creates new *QLogFile for tests and registers the required
+// newTestQLogFile creates new *qLogFile for tests and registers the required
 // cleanup functions.
-func newTestQLogFile(t *testing.T, linesNum int) (file *QLogFile) {
+func newTestQLogFile(t *testing.T, linesNum int) (file *qLogFile) {
 	t.Helper()

 	testFile := prepareTestFiles(t, 1, linesNum)[0]

-	// Create the new QLogFile instance.
-	file, err := NewQLogFile(testFile)
+	// Create the new qLogFile instance.
+	file, err := newQLogFile(testFile)
 	require.NoError(t, err)

 	assert.NotNil(t, file)
@@ -240,7 +240,7 @@ func TestQLogFile_SeekTS_bad(t *testing.T) {
 	}
 }

-func getQLogFileLine(q *QLogFile, lineNumber int) (line string, err error) {
+func getQLogFileLine(q *qLogFile, lineNumber int) (line string, err error) {
 	if _, err = q.SeekStart(); err != nil {
 		return line, err
 	}
@@ -256,7 +256,7 @@ func getQLogFileLine(q *qLogFile, lineNumber int) (line string, err error) {

 // Check adding and loading (with filtering) entries from disk and memory.
 func TestQLogFile(t *testing.T) {
-	// Create the new QLogFile instance.
+	// Create the new qLogFile instance.
 	q := newTestQLogFile(t, 2)

 	// Seek to the start.
@@ -285,7 +285,7 @@ func TestQLogFile(t *testing.T) {
 	assert.Empty(t, line)
 }

-func NewTestQLogFileData(t *testing.T, data string) (file *QLogFile) {
+func newTestQLogFileData(t *testing.T, data string) (file *qLogFile) {
 	f, err := os.CreateTemp(t.TempDir(), "*.txt")
 	require.NoError(t, err)
 	testutil.CleanupAndRequireSuccess(t, f.Close)
@@ -293,7 +293,7 @@ func newTestQLogFileData(t *testing.T, data string) (file *qLogFile) {
 	_, err = f.WriteString(data)
 	require.NoError(t, err)

-	file, err = NewQLogFile(f.Name())
+	file, err = newQLogFile(f.Name())
 	require.NoError(t, err)
 	testutil.CleanupAndRequireSuccess(t, file.Close)
@@ -309,9 +309,9 @@ func TestQLog_Seek(t *testing.T) {
 	timestamp, _ := time.Parse(time.RFC3339Nano, "2020-08-31T18:44:25.376690873+03:00")

 	testCases := []struct {
+		wantErr   error
 		name      string
 		delta     int
-		wantErr   error
 		wantDepth int
 	}{{
 		name:      "ok",
@@ -321,12 +321,12 @@ func TestQLog_Seek(t *testing.T) {
 	}, {
 		name:      "too_late",
 		delta:     2,
-		wantErr:   ErrTSTooLate,
+		wantErr:   errTSTooLate,
 		wantDepth: 2,
 	}, {
 		name:      "too_early",
 		delta:     -2,
-		wantErr:   ErrTSTooEarly,
+		wantErr:   errTSTooEarly,
 		wantDepth: 1,
 	}}
@@ -338,7 +338,7 @@ func TestQLog_Seek(t *testing.T) {
 			timestamp.Add(time.Second).Format(time.RFC3339Nano),
 		)

-		q := NewTestQLogFileData(t, data)
+		q := newTestQLogFileData(t, data)

 		_, depth, err := q.seekTS(timestamp.Add(time.Second * time.Duration(tc.delta)).UnixNano())
 		require.Truef(t, errors.Is(err, tc.wantErr), "%v", err)

View File

@@ -9,36 +9,36 @@ import (
 	"github.com/AdguardTeam/golibs/log"
 )

-// QLogReader allows reading from multiple query log files in the reverse order.
+// qLogReader allows reading from multiple query log files in the reverse
+// order.
 //
-// Please note that this is a stateful object.
-// Internally, it contains a pointer to a particular query log file, and
-// to a specific position in this file, and it reads lines in reverse order
-// starting from that position.
-type QLogReader struct {
-	// qFiles - array with the query log files
-	// The order is - from oldest to newest
-	qFiles []*QLogFile
+// Please note that this is a stateful object. Internally, it contains a
+// pointer to a particular query log file, and to a specific position in this
+// file, and it reads lines in reverse order starting from that position.
+type qLogReader struct {
+	// qFiles is an array with the query log files. The order is from oldest
+	// to newest.
+	qFiles []*qLogFile

-	currentFile int // Index of the current file
+	// currentFile is the index of the current file.
+	currentFile int
 }

-// NewQLogReader initializes a QLogReader instance
-// with the specified files
-func NewQLogReader(files []string) (*QLogReader, error) {
-	qFiles := make([]*QLogFile, 0)
+// newQLogReader initializes a qLogReader instance with the specified files.
+func newQLogReader(files []string) (*qLogReader, error) {
+	qFiles := make([]*qLogFile, 0)

 	for _, f := range files {
-		q, err := NewQLogFile(f)
+		q, err := newQLogFile(f)
 		if err != nil {
 			if errors.Is(err, os.ErrNotExist) {
 				continue
 			}

 			// Close what we've already opened.
-			cerr := closeQFiles(qFiles)
-			if cerr != nil {
-				log.Debug("querylog: closing files: %s", cerr)
+			cErr := closeQFiles(qFiles)
+			if cErr != nil {
+				log.Debug("querylog: closing files: %s", cErr)
 			}

 			return nil, err
@@ -47,31 +47,28 @@ func newQLogReader(files []string) (*qLogReader, error) {
 		qFiles = append(qFiles, q)
 	}

-	return &QLogReader{
-		qFiles:      qFiles,
-		currentFile: (len(qFiles) - 1),
-	}, nil
+	return &qLogReader{qFiles: qFiles, currentFile: len(qFiles) - 1}, nil
 }

 // seekTS performs binary search of a query log record with the specified
-// timestamp. If the record is found, it sets QLogReader's position to point to
-// that line, so that the next ReadNext call returned this line.
-func (r *QLogReader) seekTS(timestamp int64) (err error) {
+// timestamp. If the record is found, it sets qLogReader's position to point
+// to that line, so that the next ReadNext call returned this line.
+func (r *qLogReader) seekTS(timestamp int64) (err error) {
 	for i := len(r.qFiles) - 1; i >= 0; i-- {
 		q := r.qFiles[i]
 		_, _, err = q.seekTS(timestamp)
 		if err != nil {
-			if errors.Is(err, ErrTSTooEarly) {
+			if errors.Is(err, errTSTooEarly) {
 				// Look at the next file, since we've reached the end of this
 				// one. If there is no next file, it's not found.
-				err = ErrTSNotFound
+				err = errTSNotFound

 				continue
-			} else if errors.Is(err, ErrTSTooLate) {
+			} else if errors.Is(err, errTSTooLate) {
 				// Just seek to the start then. timestamp is probably between
 				// the end of the previous one and the start of this one.
 				return r.SeekStart()
-			} else if errors.Is(err, ErrTSNotFound) {
+			} else if errors.Is(err, errTSNotFound) {
 				return err
 			} else {
 				return fmt.Errorf("seekts: file at index %d: %w", i, err)
@@ -80,7 +77,7 @@ func (r *qLogReader) seekTS(timestamp int64) (err error) {

 		// The search is finished, and the searched element has been found.
 		// Update currentFile only, position is already set properly in
-		// QLogFile.
+		// qLogFile.
 		r.currentFile = i

 		return nil
@@ -93,13 +90,13 @@ func (r *qLogReader) seekTS(timestamp int64) (err error) {
 	return nil
 }

-// SeekStart changes the current position to the end of the newest file
-// Please note that we're reading query log in the reverse order
-// and that's why log start is actually the end of file
+// SeekStart changes the current position to the end of the newest file.
+// Please note that we're reading query log in the reverse order and that's why
+// the log starts actually at the end of file.
 //
-// Returns nil if we were able to change the current position.
-// Returns error in any other case.
-func (r *QLogReader) SeekStart() error {
+// Returns nil if we were able to change the current position. Returns error
+// in any other cases.
+func (r *qLogReader) SeekStart() error {
 	if len(r.qFiles) == 0 {
 		return nil
 	}
@@ -110,10 +107,12 @@ func (r *qLogReader) SeekStart() error {
 	return err
 }

-// ReadNext reads the next line (in the reverse order) from the query log files.
-// and shifts the current position left to the next (actually prev) line (or the next file).
-// returns io.EOF if there's nothing to read more.
-func (r *QLogReader) ReadNext() (string, error) {
+// ReadNext reads the next line (in the reverse order) from the query log
+// files. Then shifts the current position left to the next (actually prev)
+// line (or the next file).
+//
+// Returns io.EOF if there is nothing more to read.
+func (r *qLogReader) ReadNext() (string, error) {
 	if len(r.qFiles) == 0 {
 		return "", io.EOF
 	}
@@ -122,7 +121,7 @@ func (r *qLogReader) ReadNext() (string, error) {
 		q := r.qFiles[r.currentFile]
 		line, err := q.ReadNext()
 		if err != nil {
-			// Shift to the older file
+			// Shift to the older file.
 			r.currentFile--
 			if r.currentFile < 0 {
 				break
@@ -130,10 +129,10 @@ func (r *qLogReader) ReadNext() (string, error) {
 			q = r.qFiles[r.currentFile]

-			// Set it's position to the start right away
+			// Set its position to the start right away.
 			_, err = q.SeekStart()

-			// This is unexpected, return an error right away
+			// This is unexpected, return an error right away.
 			if err != nil {
 				return "", err
 			}
@@ -142,17 +141,17 @@ func (r *qLogReader) ReadNext() (string, error) {
 		}
 	}

-	// Nothing to read anymore
+	// Nothing to read anymore.
 	return "", io.EOF
 }

-// Close closes the QLogReader
-func (r *QLogReader) Close() error {
+// Close closes the qLogReader.
+func (r *qLogReader) Close() error {
 	return closeQFiles(r.qFiles)
 }

-// closeQFiles - helper method to close multiple QLogFile instances
-func closeQFiles(qFiles []*QLogFile) error {
+// closeQFiles is a helper method to close multiple qLogFile instances.
+func closeQFiles(qFiles []*qLogFile) error {
 	var errs []error

 	for _, q := range qFiles {
@@ -163,7 +162,7 @@ func closeQFiles(qFiles []*qLogFile) error {
 	}

 	if len(errs) > 0 {
-		return errors.List("error while closing QLogReader", errs...)
+		return errors.List("error while closing qLogReader", errs...)
 	}

 	return nil
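The per-file fallback in qLogReader.seekTS (errTSTooEarly means try the next older file; errTSTooLate means the timestamp falls into a gap between files, so seek to the start instead) can be sketched as follows; seekAcross and the error values here are hypothetical stand-ins, not the real API:

package main

import (
	"errors"
	"fmt"
)

var (
	errTooEarly = errors.New("ts too early")
	errTooLate  = errors.New("ts too late")
	errNotFound = errors.New("ts not found")
)

// seekAcross imitates qLogReader.seekTS: files are ordered oldest to newest,
// and we try them newest-first. seek is a hypothetical per-file search.
func seekAcross(files []string, seek func(file string) error) error {
	err := errNotFound
	for i := len(files) - 1; i >= 0; i-- {
		err = seek(files[i])
		switch {
		case err == nil:
			return nil
		case errors.Is(err, errTooEarly):
			// The record is older than this file; look at the next one. If
			// there is no next file, it's not found.
			err = errNotFound

			continue
		case errors.Is(err, errTooLate):
			// The timestamp falls into the gap between files; the real code
			// seeks to the start instead of failing.
			return nil
		default:
			return err
		}
	}

	return err
}

func main() {
	files := []string{"querylog.json.1", "querylog.json"}
	err := seekAcross(files, func(f string) error {
		if f == "querylog.json" {
			return errTooEarly
		}

		return nil
	})
	fmt.Println(err) // <nil>: found in the older file
}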

View File

@@ -10,15 +10,15 @@ import (
 	"github.com/stretchr/testify/require"
 )

-// newTestQLogReader creates new *QLogReader for tests and registers the
+// newTestQLogReader creates new *qLogReader for tests and registers the
 // required cleanup functions.
-func newTestQLogReader(t *testing.T, filesNum, linesNum int) (reader *QLogReader) {
+func newTestQLogReader(t *testing.T, filesNum, linesNum int) (reader *qLogReader) {
 	t.Helper()

 	testFiles := prepareTestFiles(t, filesNum, linesNum)

-	// Create the new QLogReader instance.
-	reader, err := NewQLogReader(testFiles)
+	// Create the new qLogReader instance.
+	reader, err := newQLogReader(testFiles)
 	require.NoError(t, err)

 	assert.NotNil(t, reader)
@@ -75,9 +75,9 @@ func TestQLogReader_Seek(t *testing.T) {
 	r := newTestQLogReader(t, 2, 10000)

 	testCases := []struct {
+		want error
 		name string
 		time string
-		want error
 	}{{
 		name: "not_too_old",
 		time: "2020-02-18T22:39:35.920973+03:00",
@@ -97,7 +97,7 @@ func TestQLogReader_Seek(t *testing.T) {
 	}, {
 		name: "non-existent_long_ago",
 		time: "2000-02-19T01:23:16.920973+03:00",
-		want: ErrTSNotFound,
+		want: errTSNotFound,
 	}, {
 		name: "non-existent_far_ahead",
 		time: "2100-02-19T01:23:16.920973+03:00",
@@ -105,7 +105,7 @@ func TestQLogReader_Seek(t *testing.T) {
 	}, {
 		name: "non-existent_but_could",
 		time: "2020-02-18T22:36:37.000000+03:00",
-		want: ErrTSNotFound,
+		want: errTSNotFound,
 	}}

 	for _, tc := range testCases {
@@ -125,9 +125,9 @@ func TestQLogReader_ReadNext(t *testing.T) {
 	r := newTestQLogReader(t, filesNum, linesNum)

 	testCases := []struct {
+		want  error
 		name  string
 		start int
-		want  error
 	}{{
 		name:  "ok",
 		start: 0,

View File

@@ -1,9 +1,11 @@
 package querylog

 import (
+	"fmt"
 	"io"
 	"time"

+	"github.com/AdguardTeam/golibs/errors"
 	"github.com/AdguardTeam/golibs/log"
 	"golang.org/x/exp/slices"
 )
@@ -134,84 +136,112 @@ func (l *queryLog) search(params *searchParams) (entries []*logEntry, oldest tim
 	return entries, oldest
 }

-// searchFiles looks up log records from all log files. It optionally uses the
-// client cache, if provided. searchFiles does not scan more than
-// maxFileScanEntries so callers may need to call it several times to get all
-// results. oldest and total are the time of the oldest processed entry and the
-// total number of processed entries, including discarded ones, correspondingly.
-func (l *queryLog) searchFiles(
-	params *searchParams,
-	cache clientCache,
-) (entries []*logEntry, oldest time.Time, total int) {
+// seekRecord changes the current position to the next record older than the
+// provided parameter.
+func (r *qLogReader) seekRecord(olderThan time.Time) (err error) {
+	if olderThan.IsZero() {
+		return r.SeekStart()
+	}
+
+	err = r.seekTS(olderThan.UnixNano())
+	if err == nil {
+		// Read to the next record, because we only need the one that goes
+		// after it.
+		_, err = r.ReadNext()
+	}
+
+	return err
+}
+
+// setQLogReader creates a reader with the specified files and sets the
+// position to the next record older than the provided parameter.
+func (l *queryLog) setQLogReader(olderThan time.Time) (qr *qLogReader, err error) {
 	files := []string{
 		l.logFile + ".1",
 		l.logFile,
 	}

-	r, err := NewQLogReader(files)
+	r, err := newQLogReader(files)
 	if err != nil {
-		log.Error("querylog: opening qlog reader: %s", err)
-
-		return entries, oldest, 0
-	}
-	defer func() {
-		closeErr := r.Close()
-		if closeErr != nil {
-			log.Error("querylog: closing file: %s", err)
-		}
-	}()
-
-	if params.olderThan.IsZero() {
-		err = r.SeekStart()
-	} else {
-		err = r.seekTS(params.olderThan.UnixNano())
-		if err == nil {
-			// Read to the next record, because we only need the one that goes
-			// after it.
-			_, err = r.ReadNext()
-		}
+		return nil, fmt.Errorf("opening qlog reader: %s", err)
 	}

+	err = r.seekRecord(olderThan)
 	if err != nil {
-		log.Debug("querylog: cannot seek to %s: %s", params.olderThan, err)
+		defer func() { err = errors.WithDeferred(err, r.Close()) }()
+		log.Debug("querylog: cannot seek to %s: %s", olderThan, err)

-		return entries, oldest, 0
+		return nil, nil
 	}

-	totalLimit := params.offset + params.limit
-	oldestNano := int64(0)
+	return r, nil
+}

-	// By default, we do not scan more than maxFileScanEntries at once. The
-	// idea is to make search calls faster so that the UI could handle it and
-	// show something quicker. This behavior can be overridden if
-	// maxFileScanEntries is set to 0.
+// readEntries reads entries from the reader to totalLimit. By default, we do
+// not scan more than maxFileScanEntries at once. The idea is to make search
+// calls faster so that the UI could handle it and show something quicker.
+// This behavior can be overridden if maxFileScanEntries is set to 0.
+func (l *queryLog) readEntries(
+	r *qLogReader,
+	params *searchParams,
+	cache clientCache,
+	totalLimit int,
+) (entries []*logEntry, oldestNano int64, total int) {
 	for total < params.maxFileScanEntries || params.maxFileScanEntries <= 0 {
-		var e *logEntry
-		var ts int64
-
-		e, ts, err = l.readNextEntry(r, params, cache)
-		if err != nil {
-			if err == io.EOF {
+		ent, ts, rErr := l.readNextEntry(r, params, cache)
+		if rErr != nil {
+			if rErr == io.EOF {
 				oldestNano = 0

 				break
 			}

-			log.Error("querylog: reading next entry: %s", err)
+			log.Error("querylog: reading next entry: %s", rErr)
 		}

 		oldestNano = ts
 		total++

-		if e != nil {
-			entries = append(entries, e)
-			if len(entries) == totalLimit {
-				break
-			}
+		if ent == nil {
+			continue
+		}
+
+		entries = append(entries, ent)
+		if len(entries) == totalLimit {
+			break
 		}
 	}

+	return entries, oldestNano, total
+}
+
+// searchFiles looks up log records from all log files. It optionally uses the
+// client cache, if provided. searchFiles does not scan more than
+// maxFileScanEntries so callers may need to call it several times to get all
+// the results. oldest and total are the time of the oldest processed entry
+// and the total number of processed entries, including discarded ones,
+// correspondingly.
+func (l *queryLog) searchFiles(
+	params *searchParams,
+	cache clientCache,
+) (entries []*logEntry, oldest time.Time, total int) {
+	r, err := l.setQLogReader(params.olderThan)
+	if err != nil {
+		log.Error("querylog: %s", err)
+	}
+
+	if r == nil {
+		return entries, oldest, 0
+	}
+
+	defer func() {
+		if closeErr := r.Close(); closeErr != nil {
+			log.Error("querylog: closing file: %s", closeErr)
+		}
+	}()
+
+	totalLimit := params.offset + params.limit
+	entries, oldestNano, total := l.readEntries(r, params, cache, totalLimit)
 	if oldestNano != 0 {
 		oldest = time.Unix(0, oldestNano)
 	}
@@ -243,11 +273,11 @@ func (f quickMatchClientFinder) findClient(clientID, ip string) (c *Client) {
 }

 // readNextEntry reads the next log entry and checks if it matches the search
-// criteria. It optionally uses the client cache, if provided. e is nil if the
-// entry doesn't match the search criteria. ts is the timestamp of the
+// criteria. It optionally uses the client cache, if provided. e is nil if
+// the entry doesn't match the search criteria. ts is the timestamp of the
 // processed entry.
 func (l *queryLog) readNextEntry(
-	r *QLogReader,
+	r *qLogReader,
 	params *searchParams,
 	cache clientCache,
 ) (e *logEntry, ts int64, err error) {
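The limiting logic that the new readEntries function implements (a scan cap via maxFileScanEntries and a result cap via offset + limit) can be sketched with plain ints; readPage below is a hypothetical, simplified stand-in:

package main

import "fmt"

// readPage scans at most scanLimit records (0 means no limit), keeps at most
// offset+limit matches, and reports how many records were processed in total,
// mirroring readEntries.
func readPage(records []int, match func(int) bool, offset, limit, scanLimit int) (kept []int, total int) {
	totalLimit := offset + limit
	for _, r := range records {
		if scanLimit > 0 && total >= scanLimit {
			break
		}

		total++
		if !match(r) {
			continue
		}

		kept = append(kept, r)
		if len(kept) == totalLimit {
			break
		}
	}

	// The caller then slices off the first offset entries to get the page.
	if len(kept) > offset {
		kept = kept[offset:]
	} else {
		kept = nil
	}

	return kept, total
}

func main() {
	recs := []int{1, 2, 3, 4, 5, 6, 7, 8}
	page, total := readPage(recs, func(n int) bool { return n%2 == 0 }, 1, 2, 0)
	fmt.Println(page, total) // [4 6] 6
}

Capping the per-call scan keeps individual search requests fast; the UI can issue repeated calls to page through the rest.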

View File

@@ -2,18 +2,25 @@ package querylog

 import "time"

-// searchParams represent the search query sent by the client
+// searchParams represent the search query sent by the client.
 type searchParams struct {
-	// searchCriteria - list of search criteria that we use to get filter results
-	searchCriteria []searchCriterion
-
-	// olderThen - return entries that are older than this value
-	// if not set - disregard it and return any value
+	// olderThan represents a parameter for entries that are older than this
+	// parameter value. If not set, disregard it and return any value.
 	olderThan time.Time

-	offset             int // offset for the search
-	limit              int // limit the number of records returned
-	maxFileScanEntries int // maximum log entries to scan in query log files. if 0 - no limit
+	// searchCriteria is a list of search criteria that we use to get filter
+	// results.
+	searchCriteria []searchCriterion
+
+	// offset for the search.
+	offset int
+
+	// limit the number of records returned.
+	limit int
+
+	// maxFileScanEntries is a maximum of log entries to scan in query log
+	// files. If not set, then no limit.
+	maxFileScanEntries int
 }

 // newSearchParams - creates an empty instance of searchParams
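For reference, filling these fields for a paginated search might look like the following sketch; searchParams here is a trimmed stand-in with the same field names, since the real type is unexported:

package main

import (
	"fmt"
	"time"
)

// searchParams is a trimmed stand-in mirroring the fields documented above.
type searchParams struct {
	olderThan          time.Time
	offset             int
	limit              int
	maxFileScanEntries int
}

func main() {
	// Second page of 20 entries older than now; scan at most 50000 file
	// entries per searchFiles call.
	p := &searchParams{
		olderThan:          time.Now(),
		offset:             20,
		limit:              20,
		maxFileScanEntries: 50000,
	}
	fmt.Printf("%+v\n", *p)
}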

View File

@@ -161,7 +161,6 @@ run_linter "$GO" vet ./...
 run_linter govulncheck ./...

 # Apply more lax standards to the code we haven't properly refactored yet.
-run_linter gocyclo --over 13 ./internal/querylog
 run_linter gocyclo --over 12 ./internal/dhcpd

 # Apply the normal standards to new or somewhat refactored code.
@@ -173,10 +172,11 @@ run_linter gocyclo --over 10\
 	./internal/dnsforward/\
 	./internal/filtering/\
 	./internal/home/\
+	./internal/next/\
+	./internal/querylog/\
 	./internal/stats/\
 	./internal/tools/\
 	./internal/updater/\
-	./internal/next/\
 	./internal/version/\
 	./scripts/blocked-services/\
 	./scripts/vetted-filters/\