Merge pull request #66 in DNS/adguard-dns from bugfix/356 to master
* commit 'e689c7d940e9a20bc13f024e18b86f3c1e5ba759':
  Do not lose filter name when saving to yaml
  coredns querylog -- since we read the entire querylog json once at startup, fill the querylog cache from it and then rotate it on each incoming DNS query
  Cache DNS lookups when resolving safebrowsing or parental servers; cache replacement hostnames as well
This commit is contained in:
commit c3df81bb8d
```diff
@@ -50,7 +50,7 @@ type coreDNSConfig struct {
 type filter struct {
 	URL        string `json:"url"`
-	Name       string `json:"name" yaml:"-"`
+	Name       string `json:"name" yaml:"name"`
 	Enabled    bool   `json:"enabled"`
 	RulesCount int    `json:"rules_count" yaml:"-"`
 	contents   []byte
```
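
This one-tag change is the whole "do not lose filter name" fix: with `yaml:"-"`, the encoder skipped `Name` entirely when the config was written, so the name vanished on the next load. A minimal sketch of the before/after behavior, assuming the config is serialized with gopkg.in/yaml.v2 (the field values here are illustrative):

```go
package main

import (
	"fmt"

	yaml "gopkg.in/yaml.v2"
)

// filter mirrors the struct in the diff; only the Name tag changed.
type filter struct {
	URL        string `json:"url"`
	Name       string `json:"name" yaml:"name"` // was yaml:"-": never written to disk
	Enabled    bool   `json:"enabled"`
	RulesCount int    `json:"rules_count" yaml:"-"` // derived at load time, still skipped
	contents   []byte // unexported, ignored by both encoders
}

func main() {
	f := filter{URL: "https://example.org/filter.txt", Name: "Example list", Enabled: true}
	out, _ := yaml.Marshal(&f)
	fmt.Print(string(out))
	// With yaml:"name" the output now includes:
	//   name: Example list
	// With the old yaml:"-" tag that line was silently dropped.
}
```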
```diff
@@ -45,6 +45,16 @@ func init() {
 	})
 }
 
+type cacheEntry struct {
+	answer      []dns.RR
+	lastUpdated time.Time
+}
+
+var (
+	lookupCacheTime = time.Minute * 30
+	lookupCache     = map[string]cacheEntry{}
+)
+
 type plugSettings struct {
 	SafeBrowsingBlockHost string
 	ParentalBlockHost     string
```
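
A note on the zero value, since the lookup code below relies on it: reading a missing key from `lookupCache` returns an empty `cacheEntry` whose `lastUpdated` is the zero time, so `time.Since(...)` is enormous and the TTL check treats a cache miss and an expired entry identically. A self-contained sketch of that check, assuming github.com/miekg/dns (which CoreDNS plugins use); the `cachedAnswer` helper and hostname are hypothetical, not part of the diff:

```go
package main

import (
	"fmt"
	"net"
	"time"

	"github.com/miekg/dns"
)

// cacheEntry and lookupCache mirror the declarations added above.
type cacheEntry struct {
	answer      []dns.RR
	lastUpdated time.Time
}

var (
	lookupCacheTime = time.Minute * 30
	lookupCache     = map[string]cacheEntry{}
)

// cachedAnswer shows the miss/expiry logic: a missing key yields the
// zero-value cacheEntry, whose zero lastUpdated makes time.Since exceed
// lookupCacheTime, so miss and expiry take the same upstream path.
func cachedAnswer(host string) ([]dns.RR, bool) {
	entry := lookupCache[host]
	if time.Since(entry.lastUpdated) > lookupCacheTime {
		return nil, false
	}
	return entry.answer, true
}

func main() {
	if _, ok := cachedAnswer("safebrowsing.example.org"); !ok {
		fmt.Println("cache miss: would look up upstream and store the answer")
	}
	lookupCache["safebrowsing.example.org"] = cacheEntry{
		answer:      []dns.RR{&dns.A{A: net.IPv4(127, 0, 0, 1)}},
		lastUpdated: time.Now(),
	}
	if ans, ok := cachedAnswer("safebrowsing.example.org"); ok {
		fmt.Println("cache hit:", ans)
	}
}
```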
```diff
@@ -324,20 +334,29 @@ func (p *plug) replaceHostWithValAndReply(ctx context.Context, w dns.ResponseWri
 		records = append(records, result)
 	} else {
 		// this is a domain name, need to look it up
-		req := new(dns.Msg)
-		req.SetQuestion(dns.Fqdn(val), question.Qtype)
-		req.RecursionDesired = true
-		reqstate := request.Request{W: w, Req: req, Context: ctx}
-		result, err := p.upstream.Lookup(reqstate, dns.Fqdn(val), reqstate.QType())
-		if err != nil {
-			log.Printf("Got error %s\n", err)
-			return dns.RcodeServerFailure, fmt.Errorf("plugin/dnsfilter: %s", err)
-		}
-		if result != nil {
-			for _, answer := range result.Answer {
-				answer.Header().Name = question.Name
-			}
-			records = result.Answer
+		cacheentry := lookupCache[val]
+		if time.Since(cacheentry.lastUpdated) > lookupCacheTime {
+			req := new(dns.Msg)
+			req.SetQuestion(dns.Fqdn(val), question.Qtype)
+			req.RecursionDesired = true
+			reqstate := request.Request{W: w, Req: req, Context: ctx}
+			result, err := p.upstream.Lookup(reqstate, dns.Fqdn(val), reqstate.QType())
+			if err != nil {
+				log.Printf("Got error %s\n", err)
+				return dns.RcodeServerFailure, fmt.Errorf("plugin/dnsfilter: %s", err)
+			}
+			if result != nil {
+				for _, answer := range result.Answer {
+					answer.Header().Name = question.Name
+				}
+				records = result.Answer
+				cacheentry.answer = result.Answer
+				cacheentry.lastUpdated = time.Now()
+				lookupCache[val] = cacheentry
+			}
+		} else {
+			// get from cache
+			records = cacheentry.answer
 		}
 	}
 	m := new(dns.Msg)
```
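
Two details in this hunk are worth noting. `cacheentry` is copied out of the map, mutated, and stored back with `lookupCache[val] = cacheentry` because Go map values of struct type are not addressable, so their fields cannot be assigned through the map directly. And nothing is cached on the error path, so a failing upstream is retried on the next query rather than poisoning the cache for the full 30-minute TTL.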
```diff
@@ -25,16 +25,15 @@ const (
 	queryLogRotationPeriod = time.Hour * 24   // rotate the log every 24 hours
 	queryLogFileName       = "querylog.json"  // .gz added during compression
 	queryLogSize           = 5000             // maximum API response for /querylog
-	queryLogCacheTime      = time.Minute      // if requested more often than this, give out cached response
 	queryLogTopSize        = 500              // Keep in memory only top N values
 	queryLogAPIPort        = "8618"           // 8618 is sha512sum of "querylog" then each byte summed
 )
 
 var (
 	logBufferLock sync.RWMutex
-	logBuffer     []logEntry
+	logBuffer     []*logEntry
 
-	queryLogCache []logEntry
+	queryLogCache []*logEntry
 	queryLogLock  sync.RWMutex
 	queryLogTime  time.Time
 )
```
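
Switching `logBuffer` and `queryLogCache` from `[]logEntry` to `[]*logEntry` lets the flush buffer and the API cache share the same entry allocations instead of copying whole entries at every hand-off. The `queryLogCacheTime` constant can go because the cache is now updated on every query rather than refreshed on a timer, as the `handleQueryLog` hunk below shows.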
```diff
@@ -77,15 +76,22 @@ func logRequest(question *dns.Msg, answer *dns.Msg, result dnsfilter.Result, ela
 		Elapsed: elapsed,
 		IP:      ip,
 	}
-	var flushBuffer []logEntry
+	var flushBuffer []*logEntry
 
 	logBufferLock.Lock()
-	logBuffer = append(logBuffer, entry)
+	logBuffer = append(logBuffer, &entry)
 	if len(logBuffer) >= logBufferCap {
 		flushBuffer = logBuffer
 		logBuffer = nil
 	}
 	logBufferLock.Unlock()
+	queryLogLock.Lock()
+	queryLogCache = append(queryLogCache, &entry)
+	if len(queryLogCache) > queryLogSize {
+		toremove := len(queryLogCache) - queryLogSize
+		queryLogCache = queryLogCache[toremove:]
+	}
+	queryLogLock.Unlock()
 
 	// add it to running top
 	err = runningTop.addEntry(&entry, question, now)
```
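
`logRequest` now maintains `queryLogCache` inline: every logged query is appended, then the slice is re-sliced down to `queryLogSize`, giving a sliding window over the newest entries. A toy but runnable version of that trim (ints stand in for `*logEntry`):

```go
package main

import "fmt"

// appendTrimmed mirrors the queryLogCache trimming above: append, then
// re-slice so that only the newest max entries survive.
func appendTrimmed(cache []int, v, max int) []int {
	cache = append(cache, v)
	if len(cache) > max {
		toremove := len(cache) - max
		cache = cache[toremove:]
	}
	return cache
}

func main() {
	cache := []int{}
	for i := 1; i <= 7; i++ {
		cache = appendTrimmed(cache, i, 5)
	}
	fmt.Println(cache) // [3 4 5 6 7]
}
```

One subtlety of re-slicing in place: the trimmed-off prefix stays in the backing array until a later `append` reallocates, so the oldest entries are not collected immediately; for a cache bounded at `queryLogSize` that overhead is itself bounded.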
```diff
@@ -103,26 +109,14 @@ func logRequest(question *dns.Msg, answer *dns.Msg, result dnsfilter.Result, ela
 }
 
 func handleQueryLog(w http.ResponseWriter, r *http.Request) {
-	now := time.Now()
-
 	queryLogLock.RLock()
-	values := queryLogCache
-	needRefresh := now.Sub(queryLogTime) >= queryLogCacheTime
+	values := make([]*logEntry, len(queryLogCache))
+	copy(values, queryLogCache)
 	queryLogLock.RUnlock()
 
-	if needRefresh {
-		// need to get fresh data
-		logBufferLock.RLock()
-		values = logBuffer
-		logBufferLock.RUnlock()
-
-		if len(values) < queryLogSize {
-			values = appendFromLogFile(values, queryLogSize, queryLogTimeLimit)
-		}
-		queryLogLock.Lock()
-		queryLogCache = values
-		queryLogTime = now
-		queryLogLock.Unlock()
+	// reverse it so that newest is first
+	for left, right := 0, len(values)-1; left < right; left, right = left+1, right-1 {
+		values[left], values[right] = values[right], values[left]
 	}
 
 	var data = []map[string]interface{}{}
```
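
The handler now serves straight from the always-fresh cache: it copies the slice under the read lock, then reverses the private copy so the API returns newest-first. The critical section shrinks to a single `copy`, the shared slice is never mutated by readers, and the whole on-demand refresh path with its `needRefresh`/`queryLogTime` bookkeeping disappears.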
```diff
@@ -19,7 +19,7 @@ var (
 
 const enableGzip = false
 
-func flushToFile(buffer []logEntry) error {
+func flushToFile(buffer []*logEntry) error {
 	if len(buffer) == 0 {
 		return nil
 	}
```
```diff
@@ -90,7 +90,7 @@ func flushToFile(buffer []logEntry) error {
 	return nil
 }
 
-func checkBuffer(buffer []logEntry, b bytes.Buffer) error {
+func checkBuffer(buffer []*logEntry, b bytes.Buffer) error {
 	l := len(buffer)
 	d := json.NewDecoder(&b)
 
```
```diff
@@ -237,11 +237,11 @@ func genericLoader(onEntry func(entry *logEntry) error, needMore func() bool, ti
 	return nil
 }
 
-func appendFromLogFile(values []logEntry, maxLen int, timeWindow time.Duration) []logEntry {
-	a := []logEntry{}
+func appendFromLogFile(values []*logEntry, maxLen int, timeWindow time.Duration) []*logEntry {
+	a := []*logEntry{}
 
 	onEntry := func(entry *logEntry) error {
-		a = append(a, *entry)
+		a = append(a, entry)
 		if len(a) > maxLen {
 			toskip := len(a) - maxLen
 			a = a[toskip:]
```
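
These three hunks are the mechanical fallout of the `[]logEntry` to `[]*logEntry` change: `flushToFile`, `checkBuffer`, and `appendFromLogFile` now take and return pointer slices, and the loader callback appends `entry` instead of dereferencing with `*entry`, so entries read back from disk are shared rather than copied too.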
```diff
@@ -223,6 +223,14 @@ func fillStatsFromQueryLog() error {
 			return err
 		}
 
+		queryLogLock.Lock()
+		queryLogCache = append(queryLogCache, entry)
+		if len(queryLogCache) > queryLogSize {
+			toremove := len(queryLogCache) - queryLogSize
+			queryLogCache = queryLogCache[toremove:]
+		}
+		queryLogLock.Unlock()
+
 		requests.IncWithTime(entry.Time)
 		if entry.Result.IsFiltered {
 			filtered.IncWithTime(entry.Time)
```
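
This is the startup half of the querylog change from the commit message: `fillStatsFromQueryLog` already walks the entire on-disk querylog json once to rebuild the stats counters, so the same pass now seeds `queryLogCache` as well, trimmed to `queryLogSize` with the same append-and-reslice pattern used in `logRequest`. From then on each incoming DNS query rotates the cache.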
```diff
@@ -16,6 +16,7 @@ import (
 	"sync/atomic"
 	"time"
 
+	_ "github.com/benburkert/dns/init"
 	"github.com/bluele/gcache"
 	"golang.org/x/net/publicsuffix"
 )
```
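
The underscore marks a blank import: `github.com/benburkert/dns/init` is pulled in solely so its package-level `init()` runs for its side effects, since no identifiers from it are referenced. Presumably (the diff itself does not show it) that `init` subpackage installs the benburkert/dns client as the process-wide resolver, which is the typical reason to blank-import an `init`-style package.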