Pull request: querylog imp code
Merge in DNS/adguard-home from querylog-imp-code to master Squashed commit of the following: commit a58ad36508a2355b686d314dec51ac0b5e357281 Merge: df5494f2c941eb1dd7
Author: Dimitry Kolyshev <dkolyshev@adguard.com> Date: Wed May 24 15:26:55 2023 +0300 Merge remote-tracking branch 'origin/master' into querylog-imp-code commit df5494f2c337736690a3c2a547c2d71858d0378f Author: Dimitry Kolyshev <dkolyshev@adguard.com> Date: Wed May 24 15:24:43 2023 +0300 querylog: imp code commit 8c3c2b76dd5858e7b107f222c112e9cde2477fb3 Author: Dimitry Kolyshev <dkolyshev@adguard.com> Date: Wed May 24 12:14:15 2023 +0300 all: lint script commit be04a4decfaf20a1649d32ecaab3c1c6bb205ffd Author: Dimitry Kolyshev <dkolyshev@adguard.com> Date: Wed May 24 12:03:12 2023 +0300 querylog: imp code commit fe7beacff3a5cfcf2332c4998b9c65820284eaf7 Author: Dimitry Kolyshev <dkolyshev@adguard.com> Date: Wed May 24 11:57:33 2023 +0300 querylog: imp docs commit 2ae239c57d12524fbc092f582842af2ad726c1d0 Author: Dimitry Kolyshev <dkolyshev@adguard.com> Date: Wed May 24 11:46:54 2023 +0300 querylog: imp code commit 417216cefbf154fa870f8f43468f35e0e345971f Author: Dimitry Kolyshev <dkolyshev@adguard.com> Date: Wed May 24 11:25:44 2023 +0300 querylog: imp code commit 514b6ee99113844a4e0dad30dc53703e3220c289 Author: Dimitry Kolyshev <dkolyshev@adguard.com> Date: Wed May 24 11:14:13 2023 +0300 querylog: imp docs commit 321351a3abb524208daacd5a3a7fbf5f07ab259d Author: Dimitry Kolyshev <dkolyshev@adguard.com> Date: Mon May 22 16:38:31 2023 +0300 querylog: imp code commit ee91de5c43210b5bc213f933d411adb894d2e586 Author: Dimitry Kolyshev <dkolyshev@adguard.com> Date: Mon May 22 16:01:32 2023 +0300 querylog: imp code commit 862ff12177fb769d5cb2ec250eaee538dc91d70a Author: Dimitry Kolyshev <dkolyshev@adguard.com> Date: Mon May 22 15:07:24 2023 +0300 querylog: imp code commit cc62c1c4ae8b813d03ccf51b596ba1ebf44d9a1f Merge: 37ace34e924b41100c
Author: Dimitry Kolyshev <dkolyshev@adguard.com> Date: Mon May 22 13:09:10 2023 +0300 Merge remote-tracking branch 'origin/master' into querylog-imp-code commit 37ace34e91e5189bef6e774db960f40cdaa18270 Author: Dimitry Kolyshev <dkolyshev@adguard.com> Date: Mon May 22 11:23:08 2023 +0300 querylog: imp code commit 8417815a6349f10b5dbad410ce28aab98bc479fa Author: Dimitry Kolyshev <dkolyshev@adguard.com> Date: Mon May 22 11:08:29 2023 +0300 querylog: imp docs commit 4e5cde74d25713f78675aa3e18083b4fb5e619f3 Author: Dimitry Kolyshev <dkolyshev@adguard.com> Date: Fri May 19 16:41:34 2023 +0300 querylog: imp code commit 3494eab7006240f652a0217d305ac916bd6c3c83 Author: Dimitry Kolyshev <dkolyshev@adguard.com> Date: Fri May 19 16:13:08 2023 +0300 all: lint script commit 704534ce6278e7d9b1bef30a3acc4e59f25693bc Author: Dimitry Kolyshev <dkolyshev@adguard.com> Date: Fri May 19 16:12:04 2023 +0300 querylog: imp code commit 48510102a2fa5187f78067d2b9157dac62f8bb56 Author: Dimitry Kolyshev <dkolyshev@adguard.com> Date: Fri May 19 15:52:57 2023 +0300 querylog: imp code commit 89c273aea0e6758eb749a2d3bbaf1bc385a57797 Author: Dimitry Kolyshev <dkolyshev@adguard.com> Date: Fri May 19 15:40:50 2023 +0300 querylog: imp code commit 0057fe64553ad38de0fda10efb9d3512c9a00e45 Author: Dimitry Kolyshev <dkolyshev@adguard.com> Date: Fri May 19 13:54:46 2023 +0300 querylog: imp code ... and 1 more commit
This commit is contained in:
parent
941eb1dd73
commit
cbc7985e75
|
@ -3,19 +3,24 @@ package querylog
|
||||||
import (
|
import (
|
||||||
"encoding/base64"
|
"encoding/base64"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net"
|
"net"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/AdguardTeam/AdGuardHome/internal/filtering"
|
"github.com/AdguardTeam/AdGuardHome/internal/filtering"
|
||||||
|
"github.com/AdguardTeam/golibs/errors"
|
||||||
"github.com/AdguardTeam/golibs/log"
|
"github.com/AdguardTeam/golibs/log"
|
||||||
"github.com/AdguardTeam/urlfilter/rules"
|
"github.com/AdguardTeam/urlfilter/rules"
|
||||||
"github.com/miekg/dns"
|
"github.com/miekg/dns"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// logEntryHandler represents a handler for decoding json token to the logEntry
|
||||||
|
// struct.
|
||||||
type logEntryHandler func(t json.Token, ent *logEntry) error
|
type logEntryHandler func(t json.Token, ent *logEntry) error
|
||||||
|
|
||||||
|
// logEntryHandlers is the map of log entry decode handlers for various keys.
|
||||||
var logEntryHandlers = map[string]logEntryHandler{
|
var logEntryHandlers = map[string]logEntryHandler{
|
||||||
"CID": func(t json.Token, ent *logEntry) error {
|
"CID": func(t json.Token, ent *logEntry) error {
|
||||||
v, ok := t.(string)
|
v, ok := t.(string)
|
||||||
|
@ -166,6 +171,7 @@ var logEntryHandlers = map[string]logEntryHandler{
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// decodeResultRuleKey decodes the token of "Rules" type to logEntry struct.
|
||||||
func decodeResultRuleKey(key string, i int, dec *json.Decoder, ent *logEntry) {
|
func decodeResultRuleKey(key string, i int, dec *json.Decoder, ent *logEntry) {
|
||||||
var vToken json.Token
|
var vToken json.Token
|
||||||
switch key {
|
switch key {
|
||||||
|
@ -189,6 +195,8 @@ func decodeResultRuleKey(key string, i int, dec *json.Decoder, ent *logEntry) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// decodeVTokenAndAddRule decodes the "Rules" toke as [filtering.ResultRule]
|
||||||
|
// and then adds the decoded object to the slice of result rules.
|
||||||
func decodeVTokenAndAddRule(
|
func decodeVTokenAndAddRule(
|
||||||
key string,
|
key string,
|
||||||
i int,
|
i int,
|
||||||
|
@ -213,6 +221,8 @@ func decodeVTokenAndAddRule(
|
||||||
return newRules, vToken
|
return newRules, vToken
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// decodeResultRules parses the dec's tokens into logEntry ent interpreting it
|
||||||
|
// as a slice of the result rules.
|
||||||
func decodeResultRules(dec *json.Decoder, ent *logEntry) {
|
func decodeResultRules(dec *json.Decoder, ent *logEntry) {
|
||||||
for {
|
for {
|
||||||
delimToken, err := dec.Token()
|
delimToken, err := dec.Token()
|
||||||
|
@ -224,48 +234,53 @@ func decodeResultRules(dec *json.Decoder, ent *logEntry) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if d, ok := delimToken.(json.Delim); ok {
|
if d, ok := delimToken.(json.Delim); !ok {
|
||||||
if d != '[' {
|
return
|
||||||
log.Debug("decodeResultRules: unexpected delim %q", d)
|
} else if d != '[' {
|
||||||
|
log.Debug("decodeResultRules: unexpected delim %q", d)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = decodeResultRuleToken(dec, ent)
|
||||||
|
if err != nil {
|
||||||
|
if err != io.EOF && !errors.Is(err, ErrEndOfToken) {
|
||||||
|
log.Debug("decodeResultRules err: %s", err)
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
i := 0
|
// decodeResultRuleToken decodes the tokens of "Rules" type to the logEntry ent.
|
||||||
for {
|
func decodeResultRuleToken(dec *json.Decoder, ent *logEntry) (err error) {
|
||||||
var keyToken json.Token
|
i := 0
|
||||||
keyToken, err = dec.Token()
|
for {
|
||||||
if err != nil {
|
var keyToken json.Token
|
||||||
if err != io.EOF {
|
keyToken, err = dec.Token()
|
||||||
log.Debug("decodeResultRules err: %s", err)
|
if err != nil {
|
||||||
}
|
// Don't wrap the error, because it's informative enough as is.
|
||||||
|
return err
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if d, ok := keyToken.(json.Delim); ok {
|
|
||||||
switch d {
|
|
||||||
case '}':
|
|
||||||
i++
|
|
||||||
case ']':
|
|
||||||
return
|
|
||||||
default:
|
|
||||||
// Go on.
|
|
||||||
}
|
|
||||||
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
key, ok := keyToken.(string)
|
|
||||||
if !ok {
|
|
||||||
log.Debug("decodeResultRules: keyToken is %T (%[1]v) and not string", keyToken)
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
decodeResultRuleKey(key, i, dec, ent)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if d, ok := keyToken.(json.Delim); ok {
|
||||||
|
switch d {
|
||||||
|
case '}':
|
||||||
|
i++
|
||||||
|
case ']':
|
||||||
|
return ErrEndOfToken
|
||||||
|
default:
|
||||||
|
// Go on.
|
||||||
|
}
|
||||||
|
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
key, ok := keyToken.(string)
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("keyToken is %T (%[1]v) and not string", keyToken)
|
||||||
|
}
|
||||||
|
|
||||||
|
decodeResultRuleKey(key, i, dec, ent)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -322,6 +337,8 @@ func decodeResultReverseHosts(dec *json.Decoder, ent *logEntry) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// decodeResultIPList parses the dec's tokens into logEntry ent interpreting it
|
||||||
|
// as the result IP addresses list.
|
||||||
func decodeResultIPList(dec *json.Decoder, ent *logEntry) {
|
func decodeResultIPList(dec *json.Decoder, ent *logEntry) {
|
||||||
for {
|
for {
|
||||||
itemToken, err := dec.Token()
|
itemToken, err := dec.Token()
|
||||||
|
@ -355,6 +372,8 @@ func decodeResultIPList(dec *json.Decoder, ent *logEntry) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// decodeResultDNSRewriteResultKey decodes the token of "DNSRewriteResult" type
|
||||||
|
// to the logEntry struct.
|
||||||
func decodeResultDNSRewriteResultKey(key string, dec *json.Decoder, ent *logEntry) {
|
func decodeResultDNSRewriteResultKey(key string, dec *json.Decoder, ent *logEntry) {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
|
@ -395,50 +414,29 @@ func decodeResultDNSRewriteResultKey(key string, dec *json.Decoder, ent *logEntr
|
||||||
log.Debug("decodeResultDNSRewriteResultKey response err: %s", err)
|
log.Debug("decodeResultDNSRewriteResultKey response err: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
for rrType, rrValues := range ent.Result.DNSRewriteResult.Response {
|
ent.parseDNSRewriteResultIPs()
|
||||||
switch rrType {
|
|
||||||
case
|
|
||||||
dns.TypeA,
|
|
||||||
dns.TypeAAAA:
|
|
||||||
for i, v := range rrValues {
|
|
||||||
s, _ := v.(string)
|
|
||||||
rrValues[i] = net.ParseIP(s)
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
// Go on.
|
|
||||||
}
|
|
||||||
}
|
|
||||||
default:
|
default:
|
||||||
// Go on.
|
// Go on.
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// decodeResultDNSRewriteResult parses the dec's tokens into logEntry ent
|
||||||
|
// interpreting it as the result DNSRewriteResult.
|
||||||
func decodeResultDNSRewriteResult(dec *json.Decoder, ent *logEntry) {
|
func decodeResultDNSRewriteResult(dec *json.Decoder, ent *logEntry) {
|
||||||
for {
|
for {
|
||||||
keyToken, err := dec.Token()
|
key, err := parseKeyToken(dec)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err != io.EOF {
|
if err != io.EOF && !errors.Is(err, ErrEndOfToken) {
|
||||||
log.Debug("decodeResultDNSRewriteResult err: %s", err)
|
log.Debug("decodeResultDNSRewriteResult: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if d, ok := keyToken.(json.Delim); ok {
|
if key == "" {
|
||||||
if d == '}' {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
key, ok := keyToken.(string)
|
|
||||||
if !ok {
|
|
||||||
log.Debug("decodeResultDNSRewriteResult: keyToken is %T (%[1]v) and not string", keyToken)
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
decodeResultDNSRewriteResultKey(key, dec, ent)
|
decodeResultDNSRewriteResultKey(key, dec, ent)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -474,34 +472,51 @@ func translateResult(ent *logEntry) {
|
||||||
res.IPList = nil
|
res.IPList = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ErrEndOfToken is an error returned by parse key token when the closing
|
||||||
|
// bracket is found.
|
||||||
|
const ErrEndOfToken errors.Error = "end of token"
|
||||||
|
|
||||||
|
// parseKeyToken parses the dec's token key.
|
||||||
|
func parseKeyToken(dec *json.Decoder) (key string, err error) {
|
||||||
|
keyToken, err := dec.Token()
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
if d, ok := keyToken.(json.Delim); ok {
|
||||||
|
if d == '}' {
|
||||||
|
return "", ErrEndOfToken
|
||||||
|
}
|
||||||
|
|
||||||
|
return "", nil
|
||||||
|
}
|
||||||
|
|
||||||
|
key, ok := keyToken.(string)
|
||||||
|
if !ok {
|
||||||
|
return "", fmt.Errorf("keyToken is %T (%[1]v) and not string", keyToken)
|
||||||
|
}
|
||||||
|
|
||||||
|
return key, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// decodeResult decodes a token of "Result" type to logEntry struct.
|
||||||
func decodeResult(dec *json.Decoder, ent *logEntry) {
|
func decodeResult(dec *json.Decoder, ent *logEntry) {
|
||||||
defer translateResult(ent)
|
defer translateResult(ent)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
keyToken, err := dec.Token()
|
key, err := parseKeyToken(dec)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err != io.EOF {
|
if err != io.EOF && !errors.Is(err, ErrEndOfToken) {
|
||||||
log.Debug("decodeResult err: %s", err)
|
log.Debug("decodeResult: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if d, ok := keyToken.(json.Delim); ok {
|
if key == "" {
|
||||||
if d == '}' {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
key, ok := keyToken.(string)
|
|
||||||
if !ok {
|
|
||||||
log.Debug("decodeResult: keyToken is %T (%[1]v) and not string", keyToken)
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
decHandler, ok := resultDecHandlers[key]
|
decHandler, ok := resultDecHandlers[key]
|
||||||
if ok {
|
if ok {
|
||||||
decHandler(dec, ent)
|
decHandler(dec, ent)
|
||||||
|
@ -527,13 +542,16 @@ func decodeResult(dec *json.Decoder, ent *logEntry) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// resultHandlers is the map of log entry decode handlers for various keys.
|
||||||
var resultHandlers = map[string]logEntryHandler{
|
var resultHandlers = map[string]logEntryHandler{
|
||||||
"IsFiltered": func(t json.Token, ent *logEntry) error {
|
"IsFiltered": func(t json.Token, ent *logEntry) error {
|
||||||
v, ok := t.(bool)
|
v, ok := t.(bool)
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
ent.Result.IsFiltered = v
|
ent.Result.IsFiltered = v
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
},
|
},
|
||||||
"Rule": func(t json.Token, ent *logEntry) error {
|
"Rule": func(t json.Token, ent *logEntry) error {
|
||||||
|
@ -578,11 +596,14 @@ var resultHandlers = map[string]logEntryHandler{
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
i, err := v.Int64()
|
i, err := v.Int64()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
ent.Result.Reason = filtering.Reason(i)
|
ent.Result.Reason = filtering.Reason(i)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
},
|
},
|
||||||
"ServiceName": func(t json.Token, ent *logEntry) error {
|
"ServiceName": func(t json.Token, ent *logEntry) error {
|
||||||
|
@ -607,6 +628,7 @@ var resultHandlers = map[string]logEntryHandler{
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// resultDecHandlers is the map of decode handlers for various keys.
|
||||||
var resultDecHandlers = map[string]func(dec *json.Decoder, ent *logEntry){
|
var resultDecHandlers = map[string]func(dec *json.Decoder, ent *logEntry){
|
||||||
"ReverseHosts": decodeResultReverseHosts,
|
"ReverseHosts": decodeResultReverseHosts,
|
||||||
"IPList": decodeResultIPList,
|
"IPList": decodeResultIPList,
|
||||||
|
@ -614,9 +636,11 @@ var resultDecHandlers = map[string]func(dec *json.Decoder, ent *logEntry){
|
||||||
"DNSRewriteResult": decodeResultDNSRewriteResult,
|
"DNSRewriteResult": decodeResultDNSRewriteResult,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// decodeLogEntry decodes string str to logEntry ent.
|
||||||
func decodeLogEntry(ent *logEntry, str string) {
|
func decodeLogEntry(ent *logEntry, str string) {
|
||||||
dec := json.NewDecoder(strings.NewReader(str))
|
dec := json.NewDecoder(strings.NewReader(str))
|
||||||
dec.UseNumber()
|
dec.UseNumber()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
keyToken, err := dec.Token()
|
keyToken, err := dec.Token()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -68,3 +68,19 @@ func (e *logEntry) addResponse(resp *dns.Msg, isOrig bool) {
|
||||||
log.Error("querylog: %s", err)
|
log.Error("querylog: %s", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// parseDNSRewriteResultIPs fills logEntry's DNSRewriteResult response records
|
||||||
|
// with the IP addresses parsed from the raw strings.
|
||||||
|
func (e *logEntry) parseDNSRewriteResultIPs() {
|
||||||
|
for rrType, rrValues := range e.Result.DNSRewriteResult.Response {
|
||||||
|
switch rrType {
|
||||||
|
case dns.TypeA, dns.TypeAAAA:
|
||||||
|
for i, v := range rrValues {
|
||||||
|
s, _ := v.(string)
|
||||||
|
rrValues[i] = net.ParseIP(s)
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
// Go on.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -16,32 +16,35 @@ import (
|
||||||
"github.com/miekg/dns"
|
"github.com/miekg/dns"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
// queryLogFileName is a name of the log file. ".gz" extension is added later
|
||||||
queryLogFileName = "querylog.json" // .gz added during compression
|
// during compression.
|
||||||
)
|
const queryLogFileName = "querylog.json"
|
||||||
|
|
||||||
// queryLog is a structure that writes and reads the DNS query log
|
// queryLog is a structure that writes and reads the DNS query log.
|
||||||
type queryLog struct {
|
type queryLog struct {
|
||||||
findClient func(ids []string) (c *Client, err error)
|
|
||||||
|
|
||||||
// confMu protects conf.
|
// confMu protects conf.
|
||||||
confMu *sync.RWMutex
|
confMu *sync.RWMutex
|
||||||
conf *Config
|
|
||||||
|
conf *Config
|
||||||
|
anonymizer *aghnet.IPMut
|
||||||
|
|
||||||
|
findClient func(ids []string) (c *Client, err error)
|
||||||
|
|
||||||
// logFile is the path to the log file.
|
// logFile is the path to the log file.
|
||||||
logFile string
|
logFile string
|
||||||
|
|
||||||
// bufferLock protects buffer.
|
|
||||||
bufferLock sync.RWMutex
|
|
||||||
// buffer contains recent log entries. The entries in this buffer must not
|
// buffer contains recent log entries. The entries in this buffer must not
|
||||||
// be modified.
|
// be modified.
|
||||||
buffer []*logEntry
|
buffer []*logEntry
|
||||||
|
|
||||||
fileFlushLock sync.Mutex // synchronize a file-flushing goroutine and main thread
|
// bufferLock protects buffer.
|
||||||
flushPending bool // don't start another goroutine while the previous one is still running
|
bufferLock sync.RWMutex
|
||||||
|
|
||||||
|
// fileFlushLock synchronizes a file-flushing goroutine and main thread.
|
||||||
|
fileFlushLock sync.Mutex
|
||||||
fileWriteLock sync.Mutex
|
fileWriteLock sync.Mutex
|
||||||
|
|
||||||
anonymizer *aghnet.IPMut
|
flushPending bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// ClientProto values are names of the client protocols.
|
// ClientProto values are names of the client protocols.
|
||||||
|
@ -155,6 +158,43 @@ func (l *queryLog) clear() {
|
||||||
log.Debug("querylog: cleared")
|
log.Debug("querylog: cleared")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// newLogEntry creates an instance of logEntry from parameters.
|
||||||
|
func newLogEntry(params *AddParams) (entry *logEntry) {
|
||||||
|
q := params.Question.Question[0]
|
||||||
|
|
||||||
|
entry = &logEntry{
|
||||||
|
// TODO(d.kolyshev): Export this timestamp to func params.
|
||||||
|
Time: time.Now(),
|
||||||
|
|
||||||
|
QHost: strings.ToLower(q.Name[:len(q.Name)-1]),
|
||||||
|
QType: dns.Type(q.Qtype).String(),
|
||||||
|
QClass: dns.Class(q.Qclass).String(),
|
||||||
|
|
||||||
|
ClientID: params.ClientID,
|
||||||
|
ClientProto: params.ClientProto,
|
||||||
|
|
||||||
|
Result: *params.Result,
|
||||||
|
Upstream: params.Upstream,
|
||||||
|
|
||||||
|
IP: params.ClientIP,
|
||||||
|
|
||||||
|
Elapsed: params.Elapsed,
|
||||||
|
|
||||||
|
Cached: params.Cached,
|
||||||
|
AuthenticatedData: params.AuthenticatedData,
|
||||||
|
}
|
||||||
|
|
||||||
|
if params.ReqECS != nil {
|
||||||
|
entry.ReqECS = params.ReqECS.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
entry.addResponse(params.Answer, false)
|
||||||
|
entry.addResponse(params.OrigAnswer, true)
|
||||||
|
|
||||||
|
return entry
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add implements the [QueryLog] interface for *queryLog.
|
||||||
func (l *queryLog) Add(params *AddParams) {
|
func (l *queryLog) Add(params *AddParams) {
|
||||||
var isEnabled, fileIsEnabled bool
|
var isEnabled, fileIsEnabled bool
|
||||||
var memSize uint32
|
var memSize uint32
|
||||||
|
@ -181,35 +221,7 @@ func (l *queryLog) Add(params *AddParams) {
|
||||||
params.Result = &filtering.Result{}
|
params.Result = &filtering.Result{}
|
||||||
}
|
}
|
||||||
|
|
||||||
now := time.Now()
|
entry := newLogEntry(params)
|
||||||
q := params.Question.Question[0]
|
|
||||||
entry := &logEntry{
|
|
||||||
Time: now,
|
|
||||||
|
|
||||||
QHost: strings.ToLower(q.Name[:len(q.Name)-1]),
|
|
||||||
QType: dns.Type(q.Qtype).String(),
|
|
||||||
QClass: dns.Class(q.Qclass).String(),
|
|
||||||
|
|
||||||
ClientID: params.ClientID,
|
|
||||||
ClientProto: params.ClientProto,
|
|
||||||
|
|
||||||
Result: *params.Result,
|
|
||||||
Upstream: params.Upstream,
|
|
||||||
|
|
||||||
IP: params.ClientIP,
|
|
||||||
|
|
||||||
Elapsed: params.Elapsed,
|
|
||||||
|
|
||||||
Cached: params.Cached,
|
|
||||||
AuthenticatedData: params.AuthenticatedData,
|
|
||||||
}
|
|
||||||
|
|
||||||
if params.ReqECS != nil {
|
|
||||||
entry.ReqECS = params.ReqECS.String()
|
|
||||||
}
|
|
||||||
|
|
||||||
entry.addResponse(params.Answer, false)
|
|
||||||
entry.addResponse(params.OrigAnswer, true)
|
|
||||||
|
|
||||||
needFlush := false
|
needFlush := false
|
||||||
func() {
|
func() {
|
||||||
|
|
|
@ -45,9 +45,10 @@ func TestQueryLog(t *testing.T) {
|
||||||
addEntry(l, "example.com", net.IPv4(1, 1, 1, 4), net.IPv4(2, 2, 2, 4))
|
addEntry(l, "example.com", net.IPv4(1, 1, 1, 4), net.IPv4(2, 2, 2, 4))
|
||||||
|
|
||||||
type tcAssertion struct {
|
type tcAssertion struct {
|
||||||
num int
|
host string
|
||||||
host string
|
answer net.IP
|
||||||
answer, client net.IP
|
client net.IP
|
||||||
|
num int
|
||||||
}
|
}
|
||||||
|
|
||||||
testCases := []struct {
|
testCases := []struct {
|
||||||
|
|
|
@ -12,141 +12,181 @@ import (
|
||||||
"github.com/AdguardTeam/golibs/log"
|
"github.com/AdguardTeam/golibs/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Timestamp not found errors.
|
|
||||||
const (
|
const (
|
||||||
ErrTSNotFound errors.Error = "ts not found"
|
// Timestamp not found errors.
|
||||||
ErrTSTooLate errors.Error = "ts too late"
|
errTSNotFound errors.Error = "ts not found"
|
||||||
ErrTSTooEarly errors.Error = "ts too early"
|
errTSTooLate errors.Error = "ts too late"
|
||||||
|
errTSTooEarly errors.Error = "ts too early"
|
||||||
|
|
||||||
|
// maxEntrySize is a maximum size of the entry.
|
||||||
|
//
|
||||||
|
// TODO: Find a way to grow buffer instead of relying on this value when
|
||||||
|
// reading strings.
|
||||||
|
maxEntrySize = 16 * 1024
|
||||||
|
|
||||||
|
// bufferSize should be enough for at least this number of entries.
|
||||||
|
bufferSize = 100 * maxEntrySize
|
||||||
)
|
)
|
||||||
|
|
||||||
// TODO: Find a way to grow buffer instead of relying on this value when reading strings
|
// qLogFile represents a single query log file. It allows reading from the
|
||||||
const maxEntrySize = 16 * 1024
|
// file in the reverse order.
|
||||||
|
|
||||||
// buffer should be enough for at least this number of entries
|
|
||||||
const bufferSize = 100 * maxEntrySize
|
|
||||||
|
|
||||||
// QLogFile represents a single query log file
|
|
||||||
// It allows reading from the file in the reverse order
|
|
||||||
//
|
//
|
||||||
// Please note that this is a stateful object.
|
// Please note, that this is a stateful object. Internally, it contains a
|
||||||
// Internally, it contains a pointer to a specific position in the file,
|
// pointer to a specific position in the file, and it reads lines in reverse
|
||||||
// and it reads lines in reverse order starting from that position.
|
// order starting from that position.
|
||||||
type QLogFile struct {
|
type qLogFile struct {
|
||||||
file *os.File // the query log file
|
// file is the query log file.
|
||||||
position int64 // current position in the file
|
file *os.File
|
||||||
|
|
||||||
buffer []byte // buffer that we've read from the file
|
// buffer that we've read from the file.
|
||||||
bufferStart int64 // start of the buffer (in the file)
|
buffer []byte
|
||||||
bufferLen int // buffer len
|
|
||||||
|
|
||||||
lock sync.Mutex // We use mutex to make it thread-safe
|
// lock is a mutex to make it thread-safe.
|
||||||
|
lock sync.Mutex
|
||||||
|
|
||||||
|
// position is the position in the file.
|
||||||
|
position int64
|
||||||
|
|
||||||
|
// bufferStart is the start of the buffer (in the file).
|
||||||
|
bufferStart int64
|
||||||
|
|
||||||
|
// bufferLen is the length of the buffer.
|
||||||
|
bufferLen int
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewQLogFile initializes a new instance of the QLogFile
|
// newQLogFile initializes a new instance of the qLogFile.
|
||||||
func NewQLogFile(path string) (*QLogFile, error) {
|
func newQLogFile(path string) (qf *qLogFile, err error) {
|
||||||
f, err := os.OpenFile(path, os.O_RDONLY, 0o644)
|
f, err := os.OpenFile(path, os.O_RDONLY, 0o644)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return &QLogFile{
|
return &qLogFile{file: f}, nil
|
||||||
file: f,
|
}
|
||||||
}, nil
|
|
||||||
|
// validateQLogLineIdx returns error if the line index is not valid to continue
|
||||||
|
// search.
|
||||||
|
func (q *qLogFile) validateQLogLineIdx(lineIdx, lastProbeLineIdx, ts, fSize int64) (err error) {
|
||||||
|
if lineIdx == lastProbeLineIdx {
|
||||||
|
if lineIdx == 0 {
|
||||||
|
return errTSTooEarly
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we're testing the same line twice then most likely the scope is
|
||||||
|
// too narrow and we won't find anything anymore in any other file.
|
||||||
|
return fmt.Errorf("looking up timestamp %d in %q: %w", ts, q.file.Name(), errTSNotFound)
|
||||||
|
} else if lineIdx == fSize {
|
||||||
|
return errTSTooLate
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// seekTS performs binary search in the query log file looking for a record
|
// seekTS performs binary search in the query log file looking for a record
|
||||||
// with the specified timestamp. Once the record is found, it sets
|
// with the specified timestamp. Once the record is found, it sets "position"
|
||||||
// "position" so that the next ReadNext call returned that record.
|
// so that the next ReadNext call returned that record.
|
||||||
//
|
//
|
||||||
// The algorithm is rather simple:
|
// The algorithm is rather simple:
|
||||||
// 1. It starts with the position in the middle of a file
|
// 1. It starts with the position in the middle of a file.
|
||||||
// 2. Shifts back to the beginning of the line
|
// 2. Shifts back to the beginning of the line.
|
||||||
// 3. Checks the log record timestamp
|
// 3. Checks the log record timestamp.
|
||||||
// 4. If it is lower than the timestamp we are looking for,
|
// 4. If it is lower than the timestamp we are looking for, it shifts seek
|
||||||
// it shifts seek position to 3/4 of the file. Otherwise, to 1/4 of the file.
|
// position to 3/4 of the file. Otherwise, to 1/4 of the file.
|
||||||
// 5. It performs the search again, every time the search scope is narrowed twice.
|
// 5. It performs the search again, every time the search scope is narrowed
|
||||||
|
// twice.
|
||||||
//
|
//
|
||||||
// Returns:
|
// Returns:
|
||||||
// * It returns the position of the the line with the timestamp we were looking for
|
// - It returns the position of the line with the timestamp we were looking
|
||||||
// so that when we call "ReadNext" this line was returned.
|
// for so that when we call "ReadNext" this line was returned.
|
||||||
// * Depth of the search (how many times we compared timestamps).
|
// - Depth of the search (how many times we compared timestamps).
|
||||||
// * If we could not find it, it returns one of the errors described above.
|
// - If we could not find it, it returns one of the errors described above.
|
||||||
func (q *QLogFile) seekTS(timestamp int64) (int64, int, error) {
|
func (q *qLogFile) seekTS(timestamp int64) (pos int64, depth int, err error) {
|
||||||
q.lock.Lock()
|
q.lock.Lock()
|
||||||
defer q.lock.Unlock()
|
defer q.lock.Unlock()
|
||||||
|
|
||||||
// Empty the buffer
|
// Empty the buffer.
|
||||||
q.buffer = nil
|
q.buffer = nil
|
||||||
|
|
||||||
// First of all, check the file size
|
// First of all, check the file size.
|
||||||
fileInfo, err := q.file.Stat()
|
fileInfo, err := q.file.Stat()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, 0, err
|
return 0, 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Define the search scope
|
// Define the search scope.
|
||||||
start := int64(0) // start of the search interval (position in the file)
|
|
||||||
end := fileInfo.Size() // end of the search interval (position in the file)
|
// Start of the search interval (position in the file).
|
||||||
probe := (end - start) / 2 // probe -- approximate index of the line we'll try to check
|
start := int64(0)
|
||||||
|
// End of the search interval (position in the file).
|
||||||
|
end := fileInfo.Size()
|
||||||
|
// Probe is the approximate index of the line we'll try to check.
|
||||||
|
probe := (end - start) / 2
|
||||||
|
|
||||||
var line string
|
var line string
|
||||||
var lineIdx int64 // index of the probe line in the file
|
// Index of the probe line in the file.
|
||||||
|
var lineIdx int64
|
||||||
var lineEndIdx int64
|
var lineEndIdx int64
|
||||||
var lastProbeLineIdx int64 // index of the last probe line
|
// Index of the last probe line.
|
||||||
|
var lastProbeLineIdx int64
|
||||||
lastProbeLineIdx = -1
|
lastProbeLineIdx = -1
|
||||||
|
|
||||||
// Count seek depth in order to detect mistakes
|
// Count seek depth in order to detect mistakes. If depth is too large,
|
||||||
// If depth is too large, we should stop the search
|
// we should stop the search.
|
||||||
depth := 0
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
// Get the line at the specified position
|
// Get the line at the specified position.
|
||||||
line, lineIdx, lineEndIdx, err = q.readProbeLine(probe)
|
line, lineIdx, lineEndIdx, err = q.readProbeLine(probe)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, depth, err
|
return 0, depth, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if lineIdx == lastProbeLineIdx {
|
// Check if the line index if invalid.
|
||||||
if lineIdx == 0 {
|
err = q.validateQLogLineIdx(lineIdx, lastProbeLineIdx, timestamp, fileInfo.Size())
|
||||||
return 0, depth, ErrTSTooEarly
|
if err != nil {
|
||||||
}
|
return 0, depth, err
|
||||||
|
|
||||||
// If we're testing the same line twice then most likely
|
|
||||||
// the scope is too narrow and we won't find anything
|
|
||||||
// anymore in any other file.
|
|
||||||
return 0, depth, fmt.Errorf("looking up timestamp %d in %q: %w", timestamp, q.file.Name(), ErrTSNotFound)
|
|
||||||
} else if lineIdx == fileInfo.Size() {
|
|
||||||
return 0, depth, ErrTSTooLate
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Save the last found idx
|
// Save the last found idx.
|
||||||
lastProbeLineIdx = lineIdx
|
lastProbeLineIdx = lineIdx
|
||||||
|
|
||||||
// Get the timestamp from the query log record
|
// Get the timestamp from the query log record.
|
||||||
ts := readQLogTimestamp(line)
|
ts := readQLogTimestamp(line)
|
||||||
if ts == 0 {
|
if ts == 0 {
|
||||||
return 0, depth, fmt.Errorf("looking up timestamp %d in %q: record %q has empty timestamp", timestamp, q.file.Name(), line)
|
return 0, depth, fmt.Errorf(
|
||||||
|
"looking up timestamp %d in %q: record %q has empty timestamp",
|
||||||
|
timestamp,
|
||||||
|
q.file.Name(),
|
||||||
|
line,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
if ts == timestamp {
|
if ts == timestamp {
|
||||||
// Hurray, returning the result
|
// Hurray, returning the result.
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
// Narrow the scope and repeat the search
|
// Narrow the scope and repeat the search.
|
||||||
if ts > timestamp {
|
if ts > timestamp {
|
||||||
// If the timestamp we're looking for is OLDER than what we found
|
// If the timestamp we're looking for is OLDER than what we found,
|
||||||
// Then the line is somewhere on the LEFT side from the current probe position
|
// then the line is somewhere on the LEFT side from the current
|
||||||
|
// probe position.
|
||||||
end = lineIdx
|
end = lineIdx
|
||||||
} else {
|
} else {
|
||||||
// If the timestamp we're looking for is NEWER than what we found
|
// If the timestamp we're looking for is NEWER than what we found,
|
||||||
// Then the line is somewhere on the RIGHT side from the current probe position
|
// then the line is somewhere on the RIGHT side from the current
|
||||||
|
// probe position.
|
||||||
start = lineEndIdx
|
start = lineEndIdx
|
||||||
}
|
}
|
||||||
probe = start + (end-start)/2
|
probe = start + (end-start)/2
|
||||||
|
|
||||||
depth++
|
depth++
|
||||||
if depth >= 100 {
|
if depth >= 100 {
|
||||||
return 0, depth, fmt.Errorf("looking up timestamp %d in %q: depth %d too high: %w", timestamp, q.file.Name(), depth, ErrTSNotFound)
|
return 0, depth, fmt.Errorf(
|
||||||
|
"looking up timestamp %d in %q: depth %d too high: %w",
|
||||||
|
timestamp,
|
||||||
|
q.file.Name(),
|
||||||
|
depth,
|
||||||
|
errTSNotFound,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -154,37 +194,39 @@ func (q *QLogFile) seekTS(timestamp int64) (int64, int, error) {
|
||||||
return q.position, depth, nil
|
return q.position, depth, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// SeekStart changes the current position to the end of the file
|
// SeekStart changes the current position to the end of the file. Please note,
|
||||||
// Please note that we're reading query log in the reverse order
|
// that we're reading query log in the reverse order and that's why log start
|
||||||
// and that's why log start is actually the end of file
|
// is actually the end of file.
|
||||||
//
|
//
|
||||||
// Returns nil if we were able to change the current position.
|
// Returns nil if we were able to change the current position. Returns error
|
||||||
// Returns error in any other case.
|
// in any other case.
|
||||||
func (q *QLogFile) SeekStart() (int64, error) {
|
func (q *qLogFile) SeekStart() (int64, error) {
|
||||||
q.lock.Lock()
|
q.lock.Lock()
|
||||||
defer q.lock.Unlock()
|
defer q.lock.Unlock()
|
||||||
|
|
||||||
// Empty the buffer
|
// Empty the buffer.
|
||||||
q.buffer = nil
|
q.buffer = nil
|
||||||
|
|
||||||
// First of all, check the file size
|
// First of all, check the file size.
|
||||||
fileInfo, err := q.file.Stat()
|
fileInfo, err := q.file.Stat()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Place the position to the very end of file
|
// Place the position to the very end of file.
|
||||||
q.position = fileInfo.Size() - 1
|
q.position = fileInfo.Size() - 1
|
||||||
if q.position < 0 {
|
if q.position < 0 {
|
||||||
q.position = 0
|
q.position = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
return q.position, nil
|
return q.position, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReadNext reads the next line (in the reverse order) from the file
|
// ReadNext reads the next line (in the reverse order) from the file and shifts
|
||||||
// and shifts the current position left to the next (actually prev) line.
|
// the current position left to the next (actually prev) line.
|
||||||
// returns io.EOF if there's nothing to read more
|
//
|
||||||
func (q *QLogFile) ReadNext() (string, error) {
|
// Returns io.EOF if there's nothing more to read.
|
||||||
|
func (q *qLogFile) ReadNext() (string, error) {
|
||||||
q.lock.Lock()
|
q.lock.Lock()
|
||||||
defer q.lock.Unlock()
|
defer q.lock.Unlock()
|
||||||
|
|
||||||
|
@ -197,35 +239,34 @@ func (q *QLogFile) ReadNext() (string, error) {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Shift position
|
// Shift position.
|
||||||
if lineIdx == 0 {
|
if lineIdx == 0 {
|
||||||
q.position = 0
|
q.position = 0
|
||||||
} else {
|
} else {
|
||||||
// there's usually a line break before the line
|
// There's usually a line break before the line, so we should shift one
|
||||||
// so we should shift one more char left from the line
|
// more char left from the line "\nline".
|
||||||
// line\nline
|
|
||||||
q.position = lineIdx - 1
|
q.position = lineIdx - 1
|
||||||
}
|
}
|
||||||
return line, err
|
return line, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close frees the underlying resources
|
// Close frees the underlying resources.
|
||||||
func (q *QLogFile) Close() error {
|
func (q *qLogFile) Close() error {
|
||||||
return q.file.Close()
|
return q.file.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
// readNextLine reads the next line from the specified position
|
// readNextLine reads the next line from the specified position. This line
|
||||||
// this line actually have to END on that position.
|
// actually have to END on that position.
|
||||||
//
|
//
|
||||||
// the algorithm is:
|
// The algorithm is:
|
||||||
// 1. check if we have the buffer initialized
|
// 1. Check if we have the buffer initialized.
|
||||||
// 2. if it is, scan it and look for the line there
|
// 2. If it is so, scan it and look for the line there.
|
||||||
// 3. if we cannot find the line there, read the prev chunk into the buffer
|
// 3. If we cannot find the line there, read the prev chunk into the buffer.
|
||||||
// 4. read the line from the buffer
|
// 4. Read the line from the buffer.
|
||||||
func (q *QLogFile) readNextLine(position int64) (string, int64, error) {
|
func (q *qLogFile) readNextLine(position int64) (string, int64, error) {
|
||||||
relativePos := position - q.bufferStart
|
relativePos := position - q.bufferStart
|
||||||
if q.buffer == nil || (relativePos < maxEntrySize && q.bufferStart != 0) {
|
if q.buffer == nil || (relativePos < maxEntrySize && q.bufferStart != 0) {
|
||||||
// Time to re-init the buffer
|
// Time to re-init the buffer.
|
||||||
err := q.initBuffer(position)
|
err := q.initBuffer(position)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", 0, err
|
return "", 0, err
|
||||||
|
@ -233,8 +274,7 @@ func (q *QLogFile) readNextLine(position int64) (string, int64, error) {
|
||||||
relativePos = position - q.bufferStart
|
relativePos = position - q.bufferStart
|
||||||
}
|
}
|
||||||
|
|
||||||
// Look for the end of the prev line
|
// Look for the end of the prev line, this is where we'll read from.
|
||||||
// This is where we'll read from
|
|
||||||
startLine := int64(0)
|
startLine := int64(0)
|
||||||
for i := relativePos - 1; i >= 0; i-- {
|
for i := relativePos - 1; i >= 0; i-- {
|
||||||
if q.buffer[i] == '\n' {
|
if q.buffer[i] == '\n' {
|
||||||
|
@ -245,18 +285,19 @@ func (q *QLogFile) readNextLine(position int64) (string, int64, error) {
|
||||||
|
|
||||||
line := string(q.buffer[startLine:relativePos])
|
line := string(q.buffer[startLine:relativePos])
|
||||||
lineIdx := q.bufferStart + startLine
|
lineIdx := q.bufferStart + startLine
|
||||||
|
|
||||||
return line, lineIdx, nil
|
return line, lineIdx, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// initBuffer initializes the QLogFile buffer.
|
// initBuffer initializes the qLogFile buffer. The goal is to read a chunk of
|
||||||
// the goal is to read a chunk of file that includes the line with the specified position.
|
// file that includes the line with the specified position.
|
||||||
func (q *QLogFile) initBuffer(position int64) error {
|
func (q *qLogFile) initBuffer(position int64) error {
|
||||||
q.bufferStart = int64(0)
|
q.bufferStart = int64(0)
|
||||||
if position > bufferSize {
|
if position > bufferSize {
|
||||||
q.bufferStart = position - bufferSize
|
q.bufferStart = position - bufferSize
|
||||||
}
|
}
|
||||||
|
|
||||||
// Seek to this position
|
// Seek to this position.
|
||||||
_, err := q.file.Seek(q.bufferStart, io.SeekStart)
|
_, err := q.file.Seek(q.bufferStart, io.SeekStart)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -271,34 +312,35 @@ func (q *QLogFile) initBuffer(position int64) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// readProbeLine reads a line that includes the specified position
|
// readProbeLine reads a line that includes the specified position. This
|
||||||
// this method is supposed to be used when we use binary search in the Seek method
|
// method is supposed to be used when we use binary search in the Seek method.
|
||||||
// in the case of consecutive reads, use readNext (it uses a better buffer)
|
// In the case of consecutive reads, use readNext, cause it uses better buffer.
|
||||||
func (q *QLogFile) readProbeLine(position int64) (string, int64, int64, error) {
|
func (q *qLogFile) readProbeLine(position int64) (string, int64, int64, error) {
|
||||||
// First of all, we should read a buffer that will include the query log line
|
// First of all, we should read a buffer that will include the query log
|
||||||
// In order to do this, we'll define the boundaries
|
// line. In order to do this, we'll define the boundaries.
|
||||||
seekPosition := int64(0)
|
seekPosition := int64(0)
|
||||||
relativePos := position // position relative to the buffer we're going to read
|
// Position relative to the buffer we're going to read.
|
||||||
|
relativePos := position
|
||||||
if position > maxEntrySize {
|
if position > maxEntrySize {
|
||||||
seekPosition = position - maxEntrySize
|
seekPosition = position - maxEntrySize
|
||||||
relativePos = maxEntrySize
|
relativePos = maxEntrySize
|
||||||
}
|
}
|
||||||
|
|
||||||
// Seek to this position
|
// Seek to this position.
|
||||||
_, err := q.file.Seek(seekPosition, io.SeekStart)
|
_, err := q.file.Seek(seekPosition, io.SeekStart)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", 0, 0, err
|
return "", 0, 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// The buffer size is 2*maxEntrySize
|
// The buffer size is 2*maxEntrySize.
|
||||||
buffer := make([]byte, maxEntrySize*2)
|
buffer := make([]byte, maxEntrySize*2)
|
||||||
bufferLen, err := q.file.Read(buffer)
|
bufferLen, err := q.file.Read(buffer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", 0, 0, err
|
return "", 0, 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Now start looking for the new line character starting
|
// Now start looking for the new line character starting from the
|
||||||
// from the relativePos and going left
|
// relativePos and going left.
|
||||||
startLine := int64(0)
|
startLine := int64(0)
|
||||||
for i := relativePos - 1; i >= 0; i-- {
|
for i := relativePos - 1; i >= 0; i-- {
|
||||||
if buffer[i] == '\n' {
|
if buffer[i] == '\n' {
|
||||||
|
@ -306,7 +348,7 @@ func (q *QLogFile) readProbeLine(position int64) (string, int64, int64, error) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Looking for the end of line now
|
// Looking for the end of line now.
|
||||||
endLine := int64(bufferLen)
|
endLine := int64(bufferLen)
|
||||||
lineEndIdx := endLine + seekPosition
|
lineEndIdx := endLine + seekPosition
|
||||||
for i := relativePos; i < int64(bufferLen); i++ {
|
for i := relativePos; i < int64(bufferLen); i++ {
|
||||||
|
@ -317,13 +359,13 @@ func (q *QLogFile) readProbeLine(position int64) (string, int64, int64, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Finally we can return the string we were looking for
|
// Finally we can return the string we were looking for.
|
||||||
lineIdx := startLine + seekPosition
|
lineIdx := startLine + seekPosition
|
||||||
return string(buffer[startLine:endLine]), lineIdx, lineEndIdx, nil
|
return string(buffer[startLine:endLine]), lineIdx, lineEndIdx, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// readJSONvalue reads a JSON string in form of '"key":"value"'. prefix must be
|
// readJSONValue reads a JSON string in form of '"key":"value"'. prefix must
|
||||||
// of the form '"key":"' to generate less garbage.
|
// be of the form '"key":"' to generate less garbage.
|
||||||
func readJSONValue(s, prefix string) string {
|
func readJSONValue(s, prefix string) string {
|
||||||
i := strings.Index(s, prefix)
|
i := strings.Index(s, prefix)
|
||||||
if i == -1 {
|
if i == -1 {
|
||||||
|
@ -340,7 +382,7 @@ func readJSONValue(s, prefix string) string {
|
||||||
return s[start:end]
|
return s[start:end]
|
||||||
}
|
}
|
||||||
|
|
||||||
// readQLogTimestamp reads the timestamp field from the query log line
|
// readQLogTimestamp reads the timestamp field from the query log line.
|
||||||
func readQLogTimestamp(str string) int64 {
|
func readQLogTimestamp(str string) int64 {
|
||||||
val := readJSONValue(str, `"T":"`)
|
val := readJSONValue(str, `"T":"`)
|
||||||
if len(val) == 0 {
|
if len(val) == 0 {
|
||||||
|
@ -351,10 +393,12 @@ func readQLogTimestamp(str string) int64 {
|
||||||
log.Error("Couldn't find timestamp: %s", str)
|
log.Error("Couldn't find timestamp: %s", str)
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
|
|
||||||
tm, err := time.Parse(time.RFC3339Nano, val)
|
tm, err := time.Parse(time.RFC3339Nano, val)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Couldn't parse timestamp: %s", val)
|
log.Error("Couldn't parse timestamp: %s", val)
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
|
|
||||||
return tm.UnixNano()
|
return tm.UnixNano()
|
||||||
}
|
}
|
||||||
|
|
|
@ -72,15 +72,15 @@ func prepareTestFiles(t *testing.T, filesNum, linesNum int) []string {
|
||||||
return files
|
return files
|
||||||
}
|
}
|
||||||
|
|
||||||
// newTestQLogFile creates new *QLogFile for tests and registers the required
|
// newTestQLogFile creates new *qLogFile for tests and registers the required
|
||||||
// cleanup functions.
|
// cleanup functions.
|
||||||
func newTestQLogFile(t *testing.T, linesNum int) (file *QLogFile) {
|
func newTestQLogFile(t *testing.T, linesNum int) (file *qLogFile) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
|
|
||||||
testFile := prepareTestFiles(t, 1, linesNum)[0]
|
testFile := prepareTestFiles(t, 1, linesNum)[0]
|
||||||
|
|
||||||
// Create the new QLogFile instance.
|
// Create the new qLogFile instance.
|
||||||
file, err := NewQLogFile(testFile)
|
file, err := newQLogFile(testFile)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
assert.NotNil(t, file)
|
assert.NotNil(t, file)
|
||||||
|
@ -240,7 +240,7 @@ func TestQLogFile_SeekTS_bad(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func getQLogFileLine(q *QLogFile, lineNumber int) (line string, err error) {
|
func getQLogFileLine(q *qLogFile, lineNumber int) (line string, err error) {
|
||||||
if _, err = q.SeekStart(); err != nil {
|
if _, err = q.SeekStart(); err != nil {
|
||||||
return line, err
|
return line, err
|
||||||
}
|
}
|
||||||
|
@ -256,7 +256,7 @@ func getQLogFileLine(q *QLogFile, lineNumber int) (line string, err error) {
|
||||||
|
|
||||||
// Check adding and loading (with filtering) entries from disk and memory.
|
// Check adding and loading (with filtering) entries from disk and memory.
|
||||||
func TestQLogFile(t *testing.T) {
|
func TestQLogFile(t *testing.T) {
|
||||||
// Create the new QLogFile instance.
|
// Create the new qLogFile instance.
|
||||||
q := newTestQLogFile(t, 2)
|
q := newTestQLogFile(t, 2)
|
||||||
|
|
||||||
// Seek to the start.
|
// Seek to the start.
|
||||||
|
@ -285,7 +285,7 @@ func TestQLogFile(t *testing.T) {
|
||||||
assert.Empty(t, line)
|
assert.Empty(t, line)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTestQLogFileData(t *testing.T, data string) (file *QLogFile) {
|
func newTestQLogFileData(t *testing.T, data string) (file *qLogFile) {
|
||||||
f, err := os.CreateTemp(t.TempDir(), "*.txt")
|
f, err := os.CreateTemp(t.TempDir(), "*.txt")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
testutil.CleanupAndRequireSuccess(t, f.Close)
|
testutil.CleanupAndRequireSuccess(t, f.Close)
|
||||||
|
@ -293,7 +293,7 @@ func NewTestQLogFileData(t *testing.T, data string) (file *QLogFile) {
|
||||||
_, err = f.WriteString(data)
|
_, err = f.WriteString(data)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
file, err = NewQLogFile(f.Name())
|
file, err = newQLogFile(f.Name())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
testutil.CleanupAndRequireSuccess(t, file.Close)
|
testutil.CleanupAndRequireSuccess(t, file.Close)
|
||||||
|
|
||||||
|
@ -309,9 +309,9 @@ func TestQLog_Seek(t *testing.T) {
|
||||||
timestamp, _ := time.Parse(time.RFC3339Nano, "2020-08-31T18:44:25.376690873+03:00")
|
timestamp, _ := time.Parse(time.RFC3339Nano, "2020-08-31T18:44:25.376690873+03:00")
|
||||||
|
|
||||||
testCases := []struct {
|
testCases := []struct {
|
||||||
|
wantErr error
|
||||||
name string
|
name string
|
||||||
delta int
|
delta int
|
||||||
wantErr error
|
|
||||||
wantDepth int
|
wantDepth int
|
||||||
}{{
|
}{{
|
||||||
name: "ok",
|
name: "ok",
|
||||||
|
@ -321,12 +321,12 @@ func TestQLog_Seek(t *testing.T) {
|
||||||
}, {
|
}, {
|
||||||
name: "too_late",
|
name: "too_late",
|
||||||
delta: 2,
|
delta: 2,
|
||||||
wantErr: ErrTSTooLate,
|
wantErr: errTSTooLate,
|
||||||
wantDepth: 2,
|
wantDepth: 2,
|
||||||
}, {
|
}, {
|
||||||
name: "too_early",
|
name: "too_early",
|
||||||
delta: -2,
|
delta: -2,
|
||||||
wantErr: ErrTSTooEarly,
|
wantErr: errTSTooEarly,
|
||||||
wantDepth: 1,
|
wantDepth: 1,
|
||||||
}}
|
}}
|
||||||
|
|
||||||
|
@ -338,7 +338,7 @@ func TestQLog_Seek(t *testing.T) {
|
||||||
timestamp.Add(time.Second).Format(time.RFC3339Nano),
|
timestamp.Add(time.Second).Format(time.RFC3339Nano),
|
||||||
)
|
)
|
||||||
|
|
||||||
q := NewTestQLogFileData(t, data)
|
q := newTestQLogFileData(t, data)
|
||||||
|
|
||||||
_, depth, err := q.seekTS(timestamp.Add(time.Second * time.Duration(tc.delta)).UnixNano())
|
_, depth, err := q.seekTS(timestamp.Add(time.Second * time.Duration(tc.delta)).UnixNano())
|
||||||
require.Truef(t, errors.Is(err, tc.wantErr), "%v", err)
|
require.Truef(t, errors.Is(err, tc.wantErr), "%v", err)
|
||||||
|
|
|
@ -9,36 +9,36 @@ import (
|
||||||
"github.com/AdguardTeam/golibs/log"
|
"github.com/AdguardTeam/golibs/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
// QLogReader allows reading from multiple query log files in the reverse order.
|
// qLogReader allows reading from multiple query log files in the reverse
|
||||||
|
// order.
|
||||||
//
|
//
|
||||||
// Please note that this is a stateful object.
|
// Please note that this is a stateful object. Internally, it contains a
|
||||||
// Internally, it contains a pointer to a particular query log file, and
|
// pointer to a particular query log file, and to a specific position in this
|
||||||
// to a specific position in this file, and it reads lines in reverse order
|
// file, and it reads lines in reverse order starting from that position.
|
||||||
// starting from that position.
|
type qLogReader struct {
|
||||||
type QLogReader struct {
|
// qFiles is an array with the query log files. The order is from oldest
|
||||||
// qFiles - array with the query log files
|
// to newest.
|
||||||
// The order is - from oldest to newest
|
qFiles []*qLogFile
|
||||||
qFiles []*QLogFile
|
|
||||||
|
|
||||||
currentFile int // Index of the current file
|
// currentFile is the index of the current file.
|
||||||
|
currentFile int
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewQLogReader initializes a QLogReader instance
|
// newQLogReader initializes a qLogReader instance with the specified files.
|
||||||
// with the specified files
|
func newQLogReader(files []string) (*qLogReader, error) {
|
||||||
func NewQLogReader(files []string) (*QLogReader, error) {
|
qFiles := make([]*qLogFile, 0)
|
||||||
qFiles := make([]*QLogFile, 0)
|
|
||||||
|
|
||||||
for _, f := range files {
|
for _, f := range files {
|
||||||
q, err := NewQLogFile(f)
|
q, err := newQLogFile(f)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, os.ErrNotExist) {
|
if errors.Is(err, os.ErrNotExist) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close what we've already opened.
|
// Close what we've already opened.
|
||||||
cerr := closeQFiles(qFiles)
|
cErr := closeQFiles(qFiles)
|
||||||
if cerr != nil {
|
if cErr != nil {
|
||||||
log.Debug("querylog: closing files: %s", cerr)
|
log.Debug("querylog: closing files: %s", cErr)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -47,31 +47,28 @@ func NewQLogReader(files []string) (*QLogReader, error) {
|
||||||
qFiles = append(qFiles, q)
|
qFiles = append(qFiles, q)
|
||||||
}
|
}
|
||||||
|
|
||||||
return &QLogReader{
|
return &qLogReader{qFiles: qFiles, currentFile: len(qFiles) - 1}, nil
|
||||||
qFiles: qFiles,
|
|
||||||
currentFile: (len(qFiles) - 1),
|
|
||||||
}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// seekTS performs binary search of a query log record with the specified
|
// seekTS performs binary search of a query log record with the specified
|
||||||
// timestamp. If the record is found, it sets QLogReader's position to point to
|
// timestamp. If the record is found, it sets qLogReader's position to point
|
||||||
// that line, so that the next ReadNext call returned this line.
|
// to that line, so that the next ReadNext call returned this line.
|
||||||
func (r *QLogReader) seekTS(timestamp int64) (err error) {
|
func (r *qLogReader) seekTS(timestamp int64) (err error) {
|
||||||
for i := len(r.qFiles) - 1; i >= 0; i-- {
|
for i := len(r.qFiles) - 1; i >= 0; i-- {
|
||||||
q := r.qFiles[i]
|
q := r.qFiles[i]
|
||||||
_, _, err = q.seekTS(timestamp)
|
_, _, err = q.seekTS(timestamp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, ErrTSTooEarly) {
|
if errors.Is(err, errTSTooEarly) {
|
||||||
// Look at the next file, since we've reached the end of this
|
// Look at the next file, since we've reached the end of this
|
||||||
// one. If there is no next file, it's not found.
|
// one. If there is no next file, it's not found.
|
||||||
err = ErrTSNotFound
|
err = errTSNotFound
|
||||||
|
|
||||||
continue
|
continue
|
||||||
} else if errors.Is(err, ErrTSTooLate) {
|
} else if errors.Is(err, errTSTooLate) {
|
||||||
// Just seek to the start then. timestamp is probably between
|
// Just seek to the start then. timestamp is probably between
|
||||||
// the end of the previous one and the start of this one.
|
// the end of the previous one and the start of this one.
|
||||||
return r.SeekStart()
|
return r.SeekStart()
|
||||||
} else if errors.Is(err, ErrTSNotFound) {
|
} else if errors.Is(err, errTSNotFound) {
|
||||||
return err
|
return err
|
||||||
} else {
|
} else {
|
||||||
return fmt.Errorf("seekts: file at index %d: %w", i, err)
|
return fmt.Errorf("seekts: file at index %d: %w", i, err)
|
||||||
|
@ -80,7 +77,7 @@ func (r *QLogReader) seekTS(timestamp int64) (err error) {
|
||||||
|
|
||||||
// The search is finished, and the searched element has been found.
|
// The search is finished, and the searched element has been found.
|
||||||
// Update currentFile only, position is already set properly in
|
// Update currentFile only, position is already set properly in
|
||||||
// QLogFile.
|
// qLogFile.
|
||||||
r.currentFile = i
|
r.currentFile = i
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -93,13 +90,13 @@ func (r *QLogReader) seekTS(timestamp int64) (err error) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// SeekStart changes the current position to the end of the newest file
|
// SeekStart changes the current position to the end of the newest file.
|
||||||
// Please note that we're reading query log in the reverse order
|
// Please note that we're reading query log in the reverse order and that's why
|
||||||
// and that's why log start is actually the end of file
|
// the log starts actually at the end of file.
|
||||||
//
|
//
|
||||||
// Returns nil if we were able to change the current position.
|
// Returns nil if we were able to change the current position. Returns error
|
||||||
// Returns error in any other case.
|
// in any other cases.
|
||||||
func (r *QLogReader) SeekStart() error {
|
func (r *qLogReader) SeekStart() error {
|
||||||
if len(r.qFiles) == 0 {
|
if len(r.qFiles) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -110,10 +107,12 @@ func (r *QLogReader) SeekStart() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReadNext reads the next line (in the reverse order) from the query log files.
|
// ReadNext reads the next line (in the reverse order) from the query log
|
||||||
// and shifts the current position left to the next (actually prev) line (or the next file).
|
// files. Then shifts the current position left to the next (actually prev)
|
||||||
// returns io.EOF if there's nothing to read more.
|
// line (or the next file).
|
||||||
func (r *QLogReader) ReadNext() (string, error) {
|
//
|
||||||
|
// Returns io.EOF if there is nothing more to read.
|
||||||
|
func (r *qLogReader) ReadNext() (string, error) {
|
||||||
if len(r.qFiles) == 0 {
|
if len(r.qFiles) == 0 {
|
||||||
return "", io.EOF
|
return "", io.EOF
|
||||||
}
|
}
|
||||||
|
@ -122,7 +121,7 @@ func (r *QLogReader) ReadNext() (string, error) {
|
||||||
q := r.qFiles[r.currentFile]
|
q := r.qFiles[r.currentFile]
|
||||||
line, err := q.ReadNext()
|
line, err := q.ReadNext()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Shift to the older file
|
// Shift to the older file.
|
||||||
r.currentFile--
|
r.currentFile--
|
||||||
if r.currentFile < 0 {
|
if r.currentFile < 0 {
|
||||||
break
|
break
|
||||||
|
@ -130,10 +129,10 @@ func (r *QLogReader) ReadNext() (string, error) {
|
||||||
|
|
||||||
q = r.qFiles[r.currentFile]
|
q = r.qFiles[r.currentFile]
|
||||||
|
|
||||||
// Set it's position to the start right away
|
// Set its position to the start right away.
|
||||||
_, err = q.SeekStart()
|
_, err = q.SeekStart()
|
||||||
|
|
||||||
// This is unexpected, return an error right away
|
// This is unexpected, return an error right away.
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
@ -142,17 +141,17 @@ func (r *QLogReader) ReadNext() (string, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Nothing to read anymore
|
// Nothing to read anymore.
|
||||||
return "", io.EOF
|
return "", io.EOF
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close closes the QLogReader
|
// Close closes the qLogReader.
|
||||||
func (r *QLogReader) Close() error {
|
func (r *qLogReader) Close() error {
|
||||||
return closeQFiles(r.qFiles)
|
return closeQFiles(r.qFiles)
|
||||||
}
|
}
|
||||||
|
|
||||||
// closeQFiles - helper method to close multiple QLogFile instances
|
// closeQFiles is a helper method to close multiple qLogFile instances.
|
||||||
func closeQFiles(qFiles []*QLogFile) error {
|
func closeQFiles(qFiles []*qLogFile) error {
|
||||||
var errs []error
|
var errs []error
|
||||||
|
|
||||||
for _, q := range qFiles {
|
for _, q := range qFiles {
|
||||||
|
@ -163,7 +162,7 @@ func closeQFiles(qFiles []*QLogFile) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(errs) > 0 {
|
if len(errs) > 0 {
|
||||||
return errors.List("error while closing QLogReader", errs...)
|
return errors.List("error while closing qLogReader", errs...)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -10,15 +10,15 @@ import (
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
// newTestQLogReader creates new *QLogReader for tests and registers the
|
// newTestQLogReader creates new *qLogReader for tests and registers the
|
||||||
// required cleanup functions.
|
// required cleanup functions.
|
||||||
func newTestQLogReader(t *testing.T, filesNum, linesNum int) (reader *QLogReader) {
|
func newTestQLogReader(t *testing.T, filesNum, linesNum int) (reader *qLogReader) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
|
|
||||||
testFiles := prepareTestFiles(t, filesNum, linesNum)
|
testFiles := prepareTestFiles(t, filesNum, linesNum)
|
||||||
|
|
||||||
// Create the new QLogReader instance.
|
// Create the new qLogReader instance.
|
||||||
reader, err := NewQLogReader(testFiles)
|
reader, err := newQLogReader(testFiles)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
assert.NotNil(t, reader)
|
assert.NotNil(t, reader)
|
||||||
|
@ -75,9 +75,9 @@ func TestQLogReader_Seek(t *testing.T) {
|
||||||
r := newTestQLogReader(t, 2, 10000)
|
r := newTestQLogReader(t, 2, 10000)
|
||||||
|
|
||||||
testCases := []struct {
|
testCases := []struct {
|
||||||
|
want error
|
||||||
name string
|
name string
|
||||||
time string
|
time string
|
||||||
want error
|
|
||||||
}{{
|
}{{
|
||||||
name: "not_too_old",
|
name: "not_too_old",
|
||||||
time: "2020-02-18T22:39:35.920973+03:00",
|
time: "2020-02-18T22:39:35.920973+03:00",
|
||||||
|
@ -97,7 +97,7 @@ func TestQLogReader_Seek(t *testing.T) {
|
||||||
}, {
|
}, {
|
||||||
name: "non-existent_long_ago",
|
name: "non-existent_long_ago",
|
||||||
time: "2000-02-19T01:23:16.920973+03:00",
|
time: "2000-02-19T01:23:16.920973+03:00",
|
||||||
want: ErrTSNotFound,
|
want: errTSNotFound,
|
||||||
}, {
|
}, {
|
||||||
name: "non-existent_far_ahead",
|
name: "non-existent_far_ahead",
|
||||||
time: "2100-02-19T01:23:16.920973+03:00",
|
time: "2100-02-19T01:23:16.920973+03:00",
|
||||||
|
@ -105,7 +105,7 @@ func TestQLogReader_Seek(t *testing.T) {
|
||||||
}, {
|
}, {
|
||||||
name: "non-existent_but_could",
|
name: "non-existent_but_could",
|
||||||
time: "2020-02-18T22:36:37.000000+03:00",
|
time: "2020-02-18T22:36:37.000000+03:00",
|
||||||
want: ErrTSNotFound,
|
want: errTSNotFound,
|
||||||
}}
|
}}
|
||||||
|
|
||||||
for _, tc := range testCases {
|
for _, tc := range testCases {
|
||||||
|
@ -125,9 +125,9 @@ func TestQLogReader_ReadNext(t *testing.T) {
|
||||||
r := newTestQLogReader(t, filesNum, linesNum)
|
r := newTestQLogReader(t, filesNum, linesNum)
|
||||||
|
|
||||||
testCases := []struct {
|
testCases := []struct {
|
||||||
|
want error
|
||||||
name string
|
name string
|
||||||
start int
|
start int
|
||||||
want error
|
|
||||||
}{{
|
}{{
|
||||||
name: "ok",
|
name: "ok",
|
||||||
start: 0,
|
start: 0,
|
||||||
|
|
|
@ -1,9 +1,11 @@
|
||||||
package querylog
|
package querylog
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/AdguardTeam/golibs/errors"
|
||||||
"github.com/AdguardTeam/golibs/log"
|
"github.com/AdguardTeam/golibs/log"
|
||||||
"golang.org/x/exp/slices"
|
"golang.org/x/exp/slices"
|
||||||
)
|
)
|
||||||
|
@ -134,84 +136,112 @@ func (l *queryLog) search(params *searchParams) (entries []*logEntry, oldest tim
|
||||||
return entries, oldest
|
return entries, oldest
|
||||||
}
|
}
|
||||||
|
|
||||||
// searchFiles looks up log records from all log files. It optionally uses the
|
// seekRecord changes the current position to the next record older than the
|
||||||
// client cache, if provided. searchFiles does not scan more than
|
// provided parameter.
|
||||||
// maxFileScanEntries so callers may need to call it several times to get all
|
func (r *qLogReader) seekRecord(olderThan time.Time) (err error) {
|
||||||
// results. oldest and total are the time of the oldest processed entry and the
|
if olderThan.IsZero() {
|
||||||
// total number of processed entries, including discarded ones, correspondingly.
|
return r.SeekStart()
|
||||||
func (l *queryLog) searchFiles(
|
}
|
||||||
params *searchParams,
|
|
||||||
cache clientCache,
|
err = r.seekTS(olderThan.UnixNano())
|
||||||
) (entries []*logEntry, oldest time.Time, total int) {
|
if err == nil {
|
||||||
|
// Read to the next record, because we only need the one that goes
|
||||||
|
// after it.
|
||||||
|
_, err = r.ReadNext()
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// setQLogReader creates a reader with the specified files and sets the
|
||||||
|
// position to the next record older than the provided parameter.
|
||||||
|
func (l *queryLog) setQLogReader(olderThan time.Time) (qr *qLogReader, err error) {
|
||||||
files := []string{
|
files := []string{
|
||||||
l.logFile + ".1",
|
l.logFile + ".1",
|
||||||
l.logFile,
|
l.logFile,
|
||||||
}
|
}
|
||||||
|
|
||||||
r, err := NewQLogReader(files)
|
r, err := newQLogReader(files)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("querylog: opening qlog reader: %s", err)
|
return nil, fmt.Errorf("opening qlog reader: %s", err)
|
||||||
|
|
||||||
return entries, oldest, 0
|
|
||||||
}
|
|
||||||
|
|
||||||
defer func() {
|
|
||||||
closeErr := r.Close()
|
|
||||||
if closeErr != nil {
|
|
||||||
log.Error("querylog: closing file: %s", err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
if params.olderThan.IsZero() {
|
|
||||||
err = r.SeekStart()
|
|
||||||
} else {
|
|
||||||
err = r.seekTS(params.olderThan.UnixNano())
|
|
||||||
if err == nil {
|
|
||||||
// Read to the next record, because we only need the one that goes
|
|
||||||
// after it.
|
|
||||||
_, err = r.ReadNext()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err = r.seekRecord(olderThan)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Debug("querylog: cannot seek to %s: %s", params.olderThan, err)
|
defer func() { err = errors.WithDeferred(err, r.Close()) }()
|
||||||
|
log.Debug("querylog: cannot seek to %s: %s", olderThan, err)
|
||||||
|
|
||||||
return entries, oldest, 0
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
totalLimit := params.offset + params.limit
|
return r, nil
|
||||||
oldestNano := int64(0)
|
}
|
||||||
|
|
||||||
// By default, we do not scan more than maxFileScanEntries at once. The
|
// readEntries reads entries from the reader to totalLimit. By default, we do
|
||||||
// idea is to make search calls faster so that the UI could handle it and
|
// not scan more than maxFileScanEntries at once. The idea is to make search
|
||||||
// show something quicker. This behavior can be overridden if
|
// calls faster so that the UI could handle it and show something quicker.
|
||||||
// maxFileScanEntries is set to 0.
|
// This behavior can be overridden if maxFileScanEntries is set to 0.
|
||||||
|
func (l *queryLog) readEntries(
|
||||||
|
r *qLogReader,
|
||||||
|
params *searchParams,
|
||||||
|
cache clientCache,
|
||||||
|
totalLimit int,
|
||||||
|
) (entries []*logEntry, oldestNano int64, total int) {
|
||||||
for total < params.maxFileScanEntries || params.maxFileScanEntries <= 0 {
|
for total < params.maxFileScanEntries || params.maxFileScanEntries <= 0 {
|
||||||
var e *logEntry
|
ent, ts, rErr := l.readNextEntry(r, params, cache)
|
||||||
var ts int64
|
if rErr != nil {
|
||||||
|
if rErr == io.EOF {
|
||||||
e, ts, err = l.readNextEntry(r, params, cache)
|
|
||||||
if err != nil {
|
|
||||||
if err == io.EOF {
|
|
||||||
oldestNano = 0
|
oldestNano = 0
|
||||||
|
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Error("querylog: reading next entry: %s", err)
|
log.Error("querylog: reading next entry: %s", rErr)
|
||||||
}
|
}
|
||||||
|
|
||||||
oldestNano = ts
|
oldestNano = ts
|
||||||
total++
|
total++
|
||||||
|
|
||||||
if e != nil {
|
if ent == nil {
|
||||||
entries = append(entries, e)
|
continue
|
||||||
if len(entries) == totalLimit {
|
}
|
||||||
break
|
|
||||||
}
|
entries = append(entries, ent)
|
||||||
|
if len(entries) == totalLimit {
|
||||||
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return entries, oldestNano, total
|
||||||
|
}
|
||||||
|
|
||||||
|
// searchFiles looks up log records from all log files. It optionally uses the
|
||||||
|
// client cache, if provided. searchFiles does not scan more than
|
||||||
|
// maxFileScanEntries so callers may need to call it several times to get all
|
||||||
|
// the results. oldest and total are the time of the oldest processed entry
|
||||||
|
// and the total number of processed entries, including discarded ones,
|
||||||
|
// correspondingly.
|
||||||
|
func (l *queryLog) searchFiles(
|
||||||
|
params *searchParams,
|
||||||
|
cache clientCache,
|
||||||
|
) (entries []*logEntry, oldest time.Time, total int) {
|
||||||
|
r, err := l.setQLogReader(params.olderThan)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("querylog: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if r == nil {
|
||||||
|
return entries, oldest, 0
|
||||||
|
}
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
if closeErr := r.Close(); closeErr != nil {
|
||||||
|
log.Error("querylog: closing file: %s", closeErr)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
totalLimit := params.offset + params.limit
|
||||||
|
entries, oldestNano, total := l.readEntries(r, params, cache, totalLimit)
|
||||||
if oldestNano != 0 {
|
if oldestNano != 0 {
|
||||||
oldest = time.Unix(0, oldestNano)
|
oldest = time.Unix(0, oldestNano)
|
||||||
}
|
}
|
||||||
|
@ -243,11 +273,11 @@ func (f quickMatchClientFinder) findClient(clientID, ip string) (c *Client) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// readNextEntry reads the next log entry and checks if it matches the search
|
// readNextEntry reads the next log entry and checks if it matches the search
|
||||||
// criteria. It optionally uses the client cache, if provided. e is nil if the
|
// criteria. It optionally uses the client cache, if provided. e is nil if
|
||||||
// entry doesn't match the search criteria. ts is the timestamp of the
|
// the entry doesn't match the search criteria. ts is the timestamp of the
|
||||||
// processed entry.
|
// processed entry.
|
||||||
func (l *queryLog) readNextEntry(
|
func (l *queryLog) readNextEntry(
|
||||||
r *QLogReader,
|
r *qLogReader,
|
||||||
params *searchParams,
|
params *searchParams,
|
||||||
cache clientCache,
|
cache clientCache,
|
||||||
) (e *logEntry, ts int64, err error) {
|
) (e *logEntry, ts int64, err error) {
|
||||||
|
|
|
@ -2,18 +2,25 @@ package querylog
|
||||||
|
|
||||||
import "time"
|
import "time"
|
||||||
|
|
||||||
// searchParams represent the search query sent by the client
|
// searchParams represent the search query sent by the client.
|
||||||
type searchParams struct {
|
type searchParams struct {
|
||||||
// searchCriteria - list of search criteria that we use to get filter results
|
// olderThen represents a parameter for entries that are older than this
|
||||||
searchCriteria []searchCriterion
|
// parameter value. If not set, disregard it and return any value.
|
||||||
|
|
||||||
// olderThen - return entries that are older than this value
|
|
||||||
// if not set - disregard it and return any value
|
|
||||||
olderThan time.Time
|
olderThan time.Time
|
||||||
|
|
||||||
offset int // offset for the search
|
// searchCriteria is a list of search criteria that we use to get filter
|
||||||
limit int // limit the number of records returned
|
// results.
|
||||||
maxFileScanEntries int // maximum log entries to scan in query log files. if 0 - no limit
|
searchCriteria []searchCriterion
|
||||||
|
|
||||||
|
// offset for the search.
|
||||||
|
offset int
|
||||||
|
|
||||||
|
// limit the number of records returned.
|
||||||
|
limit int
|
||||||
|
|
||||||
|
// maxFileScanEntries is a maximum of log entries to scan in query log
|
||||||
|
// files. If not set, then no limit.
|
||||||
|
maxFileScanEntries int
|
||||||
}
|
}
|
||||||
|
|
||||||
// newSearchParams - creates an empty instance of searchParams
|
// newSearchParams - creates an empty instance of searchParams
|
||||||
|
|
|
@ -161,7 +161,6 @@ run_linter "$GO" vet ./...
|
||||||
run_linter govulncheck ./...
|
run_linter govulncheck ./...
|
||||||
|
|
||||||
# Apply more lax standards to the code we haven't properly refactored yet.
|
# Apply more lax standards to the code we haven't properly refactored yet.
|
||||||
run_linter gocyclo --over 13 ./internal/querylog
|
|
||||||
run_linter gocyclo --over 12 ./internal/dhcpd
|
run_linter gocyclo --over 12 ./internal/dhcpd
|
||||||
|
|
||||||
# Apply the normal standards to new or somewhat refactored code.
|
# Apply the normal standards to new or somewhat refactored code.
|
||||||
|
@ -173,10 +172,11 @@ run_linter gocyclo --over 10\
|
||||||
./internal/dnsforward/\
|
./internal/dnsforward/\
|
||||||
./internal/filtering/\
|
./internal/filtering/\
|
||||||
./internal/home/\
|
./internal/home/\
|
||||||
|
./internal/next/\
|
||||||
|
./internal/querylog/\
|
||||||
./internal/stats/\
|
./internal/stats/\
|
||||||
./internal/tools/\
|
./internal/tools/\
|
||||||
./internal/updater/\
|
./internal/updater/\
|
||||||
./internal/next/\
|
|
||||||
./internal/version/\
|
./internal/version/\
|
||||||
./scripts/blocked-services/\
|
./scripts/blocked-services/\
|
||||||
./scripts/vetted-filters/\
|
./scripts/vetted-filters/\
|
||||||
|
|
Loading…
Reference in New Issue