Pull request 1799: AG-21072-stats-conf-race

Merge in DNS/adguard-home from AG-21072-stats-conf-race to master

Squashed commit of the following:

commit 26843b70ab8bfe0fa110c4156f2fd3f53ec5ee74
Author: Ainar Garipov <A.Garipov@AdGuard.COM>
Date:   Tue Apr 4 20:30:22 2023 +0300

    all: imp docs, locks

commit 942a5baa9606376dd06a3ef3284c17133014f768
Author: Ainar Garipov <A.Garipov@AdGuard.COM>
Date:   Tue Apr 4 19:15:18 2023 +0300

    stats: fix conf races
This commit is contained in:
Ainar Garipov 2023-04-05 15:50:14 +03:00
parent 27dbb427a1
commit 91ecbede7d
8 changed files with 128 additions and 91 deletions

2
go.mod
View File

@ -4,7 +4,7 @@ go 1.19
require ( require (
github.com/AdguardTeam/dnsproxy v0.48.3 github.com/AdguardTeam/dnsproxy v0.48.3
github.com/AdguardTeam/golibs v0.13.0 github.com/AdguardTeam/golibs v0.13.1
github.com/AdguardTeam/urlfilter v0.16.1 github.com/AdguardTeam/urlfilter v0.16.1
github.com/NYTimes/gziphandler v1.1.1 github.com/NYTimes/gziphandler v1.1.1
github.com/ameshkov/dnscrypt/v2 v2.2.6 github.com/ameshkov/dnscrypt/v2 v2.2.6

4
go.sum
View File

@ -2,8 +2,8 @@ github.com/AdguardTeam/dnsproxy v0.48.3 h1:h9xgDSmd1MqsPFNApyaPVXolmSTtzOWOcfWvP
github.com/AdguardTeam/dnsproxy v0.48.3/go.mod h1:Y7g7jRTd/u7+KJ/QvnGI2PCE8vnisp6EsW47/Sz0DZw= github.com/AdguardTeam/dnsproxy v0.48.3/go.mod h1:Y7g7jRTd/u7+KJ/QvnGI2PCE8vnisp6EsW47/Sz0DZw=
github.com/AdguardTeam/golibs v0.4.0/go.mod h1:skKsDKIBB7kkFflLJBpfGX+G8QFTx0WKUzB6TIgtUj4= github.com/AdguardTeam/golibs v0.4.0/go.mod h1:skKsDKIBB7kkFflLJBpfGX+G8QFTx0WKUzB6TIgtUj4=
github.com/AdguardTeam/golibs v0.10.4/go.mod h1:rSfQRGHIdgfxriDDNgNJ7HmE5zRoURq8R+VdR81Zuzw= github.com/AdguardTeam/golibs v0.10.4/go.mod h1:rSfQRGHIdgfxriDDNgNJ7HmE5zRoURq8R+VdR81Zuzw=
github.com/AdguardTeam/golibs v0.13.0 h1:hVBeNQXT/BgcjKz/4FMpFGvEYqXiXDJG+b5XpGCUOLk= github.com/AdguardTeam/golibs v0.13.1 h1:x6ChoXk2jborbCWJ01TyBAEY3SilHts0SCG7yjnf6Sc=
github.com/AdguardTeam/golibs v0.13.0/go.mod h1:rIglKDHdLvFT1UbhumBLHO9S4cvWS9MEyT1njommI/Y= github.com/AdguardTeam/golibs v0.13.1/go.mod h1:7ylQLv2Lqsc3UW3jHoITynYk6Y1tYtgEMkR09ppfsN8=
github.com/AdguardTeam/gomitmproxy v0.2.0/go.mod h1:Qdv0Mktnzer5zpdpi5rAwixNJzW2FN91LjKJCkVbYGU= github.com/AdguardTeam/gomitmproxy v0.2.0/go.mod h1:Qdv0Mktnzer5zpdpi5rAwixNJzW2FN91LjKJCkVbYGU=
github.com/AdguardTeam/urlfilter v0.16.1 h1:ZPi0rjqo8cQf2FVdzo6cqumNoHZx2KPXj2yZa1A5BBw= github.com/AdguardTeam/urlfilter v0.16.1 h1:ZPi0rjqo8cQf2FVdzo6cqumNoHZx2KPXj2yZa1A5BBw=
github.com/AdguardTeam/urlfilter v0.16.1/go.mod h1:46YZDOV1+qtdRDuhZKVPSSp7JWWes0KayqHrKAFBdEI= github.com/AdguardTeam/urlfilter v0.16.1/go.mod h1:46YZDOV1+qtdRDuhZKVPSSp7JWWes0KayqHrKAFBdEI=

View File

@ -12,7 +12,6 @@ import (
"github.com/AdguardTeam/AdGuardHome/internal/filtering" "github.com/AdguardTeam/AdGuardHome/internal/filtering"
"github.com/AdguardTeam/golibs/errors" "github.com/AdguardTeam/golibs/errors"
"github.com/AdguardTeam/golibs/log" "github.com/AdguardTeam/golibs/log"
"github.com/AdguardTeam/golibs/stringutil"
"github.com/AdguardTeam/golibs/timeutil" "github.com/AdguardTeam/golibs/timeutil"
"github.com/miekg/dns" "github.com/miekg/dns"
) )
@ -126,9 +125,7 @@ func (l *queryLog) WriteDiskConfig(c *Config) {
defer l.confMu.RUnlock() defer l.confMu.RUnlock()
*c = *l.conf *c = *l.conf
c.Ignored = l.conf.Ignored.Clone()
// TODO(a.garipov): Add stringutil.Set.Clone.
c.Ignored = stringutil.NewSet(l.conf.Ignored.Values()...)
} }
// Clear memory buffer and remove log files // Clear memory buffer and remove log files

View File

@ -33,6 +33,8 @@ type QueryLog interface {
} }
// Config is the query log configuration structure. // Config is the query log configuration structure.
//
// Do not alter any fields of this structure after using it.
type Config struct { type Config struct {
// Ignored is the list of host names, which should not be written to log. // Ignored is the list of host names, which should not be written to log.
Ignored *stringutil.Set Ignored *stringutil.Set

View File

@ -42,13 +42,21 @@ type StatsResp struct {
AvgProcessingTime float64 `json:"avg_processing_time"` AvgProcessingTime float64 `json:"avg_processing_time"`
} }
// handleStats handles requests to the GET /control/stats endpoint. // handleStats is the handler for the GET /control/stats HTTP API.
func (s *StatsCtx) handleStats(w http.ResponseWriter, r *http.Request) { func (s *StatsCtx) handleStats(w http.ResponseWriter, r *http.Request) {
s.lock.Lock()
defer s.lock.Unlock()
start := time.Now() start := time.Now()
resp, ok := s.getData(uint32(s.limit.Hours()))
var (
resp StatsResp
ok bool
)
func() {
s.confMu.RLock()
defer s.confMu.RUnlock()
resp, ok = s.getData(uint32(s.limit.Hours()))
}()
log.Debug("stats: prepared data in %v", time.Since(start)) log.Debug("stats: prepared data in %v", time.Since(start))
if !ok { if !ok {
@ -80,47 +88,58 @@ type getConfigResp struct {
Enabled aghalg.NullBool `json:"enabled"` Enabled aghalg.NullBool `json:"enabled"`
} }
// handleStatsInfo handles requests to the GET /control/stats_info endpoint. // handleStatsInfo is the handler for the GET /control/stats_info HTTP API.
// //
// Deprecated: Remove it when migration to the new API is over. // Deprecated: Remove it when migration to the new API is over.
func (s *StatsCtx) handleStatsInfo(w http.ResponseWriter, r *http.Request) { func (s *StatsCtx) handleStatsInfo(w http.ResponseWriter, r *http.Request) {
s.lock.Lock() var (
defer s.lock.Unlock() enabled bool
limit time.Duration
)
func() {
s.confMu.RLock()
defer s.confMu.RUnlock()
days := uint32(s.limit / timeutil.Day) enabled, limit = s.enabled, s.limit
}()
days := uint32(limit / timeutil.Day)
ok := checkInterval(days) ok := checkInterval(days)
if !ok || (s.enabled && days == 0) { if !ok || (enabled && days == 0) {
// NOTE: If interval is custom we set it to 90 days for compatibility // NOTE: If interval is custom we set it to 90 days for compatibility
// with old API. // with old API.
days = 90 days = 90
} }
resp := configResp{IntervalDays: days} resp := configResp{IntervalDays: days}
if !s.enabled { if !enabled {
resp.IntervalDays = 0 resp.IntervalDays = 0
} }
_ = aghhttp.WriteJSONResponse(w, r, resp) _ = aghhttp.WriteJSONResponse(w, r, resp)
} }
// handleGetStatsConfig handles requests to the GET /control/stats/config // handleGetStatsConfig is the handler for the GET /control/stats/config HTTP
// endpoint. // API.
func (s *StatsCtx) handleGetStatsConfig(w http.ResponseWriter, r *http.Request) { func (s *StatsCtx) handleGetStatsConfig(w http.ResponseWriter, r *http.Request) {
s.lock.Lock() var resp *getConfigResp
defer s.lock.Unlock() func() {
s.confMu.RLock()
defer s.confMu.RUnlock()
ignored := s.ignored.Values() resp = &getConfigResp{
slices.Sort(ignored) Ignored: s.ignored.Values(),
Interval: float64(s.limit.Milliseconds()),
Enabled: aghalg.BoolToNullBool(s.enabled),
}
}()
slices.Sort(resp.Ignored)
resp := getConfigResp{
Ignored: ignored,
Interval: float64(s.limit.Milliseconds()),
Enabled: aghalg.BoolToNullBool(s.enabled),
}
_ = aghhttp.WriteJSONResponse(w, r, resp) _ = aghhttp.WriteJSONResponse(w, r, resp)
} }
// handleStatsConfig handles requests to the POST /control/stats_config // handleStatsConfig is the handler for the POST /control/stats_config HTTP API.
// endpoint.
// //
// Deprecated: Remove it when migration to the new API is over. // Deprecated: Remove it when migration to the new API is over.
func (s *StatsCtx) handleStatsConfig(w http.ResponseWriter, r *http.Request) { func (s *StatsCtx) handleStatsConfig(w http.ResponseWriter, r *http.Request) {
@ -138,17 +157,18 @@ func (s *StatsCtx) handleStatsConfig(w http.ResponseWriter, r *http.Request) {
return return
} }
limit := time.Duration(reqData.IntervalDays) * timeutil.Day
defer s.configModified() defer s.configModified()
s.lock.Lock() s.confMu.Lock()
defer s.lock.Unlock() defer s.confMu.Unlock()
limit := time.Duration(reqData.IntervalDays) * timeutil.Day
s.setLimit(limit) s.setLimit(limit)
} }
// handlePutStatsConfig handles requests to the PUT /control/stats/config/update // handlePutStatsConfig is the handler for the PUT /control/stats/config/update
// endpoint. // HTTP API.
func (s *StatsCtx) handlePutStatsConfig(w http.ResponseWriter, r *http.Request) { func (s *StatsCtx) handlePutStatsConfig(w http.ResponseWriter, r *http.Request) {
reqData := getConfigResp{} reqData := getConfigResp{}
err := json.NewDecoder(r.Body).Decode(&reqData) err := json.NewDecoder(r.Body).Decode(&reqData)
@ -181,15 +201,15 @@ func (s *StatsCtx) handlePutStatsConfig(w http.ResponseWriter, r *http.Request)
defer s.configModified() defer s.configModified()
s.lock.Lock() s.confMu.Lock()
defer s.lock.Unlock() defer s.confMu.Unlock()
s.ignored = set s.ignored = set
s.limit = ivl s.limit = ivl
s.enabled = reqData.Enabled == aghalg.NBTrue s.enabled = reqData.Enabled == aghalg.NBTrue
} }
// handleStatsReset handles requests to the POST /control/stats_reset endpoint. // handleStatsReset is the handler for the POST /control/stats_reset HTTP API.
func (s *StatsCtx) handleStatsReset(w http.ResponseWriter, r *http.Request) { func (s *StatsCtx) handleStatsReset(w http.ResponseWriter, r *http.Request) {
err := s.clear() err := s.clear()
if err != nil { if err != nil {
@ -205,9 +225,10 @@ func (s *StatsCtx) initWeb() {
s.httpRegister(http.MethodGet, "/control/stats", s.handleStats) s.httpRegister(http.MethodGet, "/control/stats", s.handleStats)
s.httpRegister(http.MethodPost, "/control/stats_reset", s.handleStatsReset) s.httpRegister(http.MethodPost, "/control/stats_reset", s.handleStatsReset)
s.httpRegister(http.MethodPost, "/control/stats_config", s.handleStatsConfig)
s.httpRegister(http.MethodGet, "/control/stats_info", s.handleStatsInfo)
s.httpRegister(http.MethodGet, "/control/stats/config", s.handleGetStatsConfig) s.httpRegister(http.MethodGet, "/control/stats/config", s.handleGetStatsConfig)
s.httpRegister(http.MethodPut, "/control/stats/config/update", s.handlePutStatsConfig) s.httpRegister(http.MethodPut, "/control/stats/config/update", s.handlePutStatsConfig)
// Deprecated handlers.
s.httpRegister(http.MethodGet, "/control/stats_info", s.handleStatsInfo)
s.httpRegister(http.MethodPost, "/control/stats_config", s.handleStatsConfig)
} }

View File

@ -24,11 +24,11 @@ func TestHandleStatsConfig(t *testing.T) {
) )
conf := Config{ conf := Config{
UnitID: func() (id uint32) { return 0 },
ConfigModified: func() {},
Filename: filepath.Join(t.TempDir(), "stats.db"), Filename: filepath.Join(t.TempDir(), "stats.db"),
Limit: time.Hour * 24, Limit: time.Hour * 24,
Enabled: true, Enabled: true,
UnitID: func() (id uint32) { return 0 },
ConfigModified: func() {},
} }
testCases := []struct { testCases := []struct {

View File

@ -41,6 +41,8 @@ func validateIvl(ivl time.Duration) (err error) {
} }
// Config is the configuration structure for the statistics collecting. // Config is the configuration structure for the statistics collecting.
//
// Do not alter any fields of this structure after using it.
type Config struct { type Config struct {
// UnitID is the function to generate the identifier for current unit. If // UnitID is the function to generate the identifier for current unit. If
// nil, the default function is used, see newUnitID. // nil, the default function is used, see newUnitID.
@ -54,6 +56,9 @@ type Config struct {
// endpoints. // endpoints.
HTTPRegister aghhttp.RegisterFunc HTTPRegister aghhttp.RegisterFunc
// Ignored is the list of host names, which should not be counted.
Ignored *stringutil.Set
// Filename is the name of the database file. // Filename is the name of the database file.
Filename string Filename string
@ -62,9 +67,6 @@ type Config struct {
// Enabled tells if the statistics are enabled. // Enabled tells if the statistics are enabled.
Enabled bool Enabled bool
// Ignored is the list of host names, which should not be counted.
Ignored *stringutil.Set
} }
// Interface is the statistics interface to be used by other packages. // Interface is the statistics interface to be used by other packages.
@ -110,20 +112,20 @@ type StatsCtx struct {
// interface. // interface.
configModified func() configModified func()
// confMu protects ignored, limit, and enabled.
confMu *sync.RWMutex
// ignored is the list of host names, which should not be counted.
ignored *stringutil.Set
// filename is the name of database file. // filename is the name of database file.
filename string filename string
// lock protects all the fields below.
lock sync.Mutex
// enabled tells if the statistics are enabled.
enabled bool
// limit is an upper limit for collecting statistics. // limit is an upper limit for collecting statistics.
limit time.Duration limit time.Duration
// ignored is the list of host names, which should not be counted. // enabled tells if the statistics are enabled.
ignored *stringutil.Set enabled bool
} }
// New creates s from conf and properly initializes it. Don't use s before // New creates s from conf and properly initializes it. Don't use s before
@ -131,21 +133,22 @@ type StatsCtx struct {
func New(conf Config) (s *StatsCtx, err error) { func New(conf Config) (s *StatsCtx, err error) {
defer withRecovered(&err) defer withRecovered(&err)
s = &StatsCtx{
enabled: conf.Enabled,
currMu: &sync.RWMutex{},
filename: conf.Filename,
configModified: conf.ConfigModified,
httpRegister: conf.HTTPRegister,
ignored: conf.Ignored,
}
err = validateIvl(conf.Limit) err = validateIvl(conf.Limit)
if err != nil { if err != nil {
return nil, fmt.Errorf("unsupported interval: %w", err) return nil, fmt.Errorf("unsupported interval: %w", err)
} }
s.limit = conf.Limit s = &StatsCtx{
currMu: &sync.RWMutex{},
httpRegister: conf.HTTPRegister,
configModified: conf.ConfigModified,
filename: conf.Filename,
confMu: &sync.RWMutex{},
ignored: conf.Ignored,
limit: conf.Limit,
enabled: conf.Enabled,
}
if s.unitIDGen = newUnitID; conf.UnitID != nil { if s.unitIDGen = newUnitID; conf.UnitID != nil {
s.unitIDGen = conf.UnitID s.unitIDGen = conf.UnitID
@ -244,8 +247,8 @@ func (s *StatsCtx) Close() (err error) {
// Update implements the Interface interface for *StatsCtx. // Update implements the Interface interface for *StatsCtx.
func (s *StatsCtx) Update(e Entry) { func (s *StatsCtx) Update(e Entry) {
s.lock.Lock() s.confMu.Lock()
defer s.lock.Unlock() defer s.confMu.Unlock()
if !s.enabled || s.limit == 0 { if !s.enabled || s.limit == 0 {
return return
@ -276,18 +279,18 @@ func (s *StatsCtx) Update(e Entry) {
// WriteDiskConfig implements the Interface interface for *StatsCtx. // WriteDiskConfig implements the Interface interface for *StatsCtx.
func (s *StatsCtx) WriteDiskConfig(dc *Config) { func (s *StatsCtx) WriteDiskConfig(dc *Config) {
s.lock.Lock() s.confMu.RLock()
defer s.lock.Unlock() defer s.confMu.RUnlock()
dc.Ignored = s.ignored.Clone()
dc.Limit = s.limit dc.Limit = s.limit
dc.Enabled = s.enabled dc.Enabled = s.enabled
dc.Ignored = s.ignored
} }
// TopClientsIP implements the [Interface] interface for *StatsCtx. // TopClientsIP implements the [Interface] interface for *StatsCtx.
func (s *StatsCtx) TopClientsIP(maxCount uint) (ips []netip.Addr) { func (s *StatsCtx) TopClientsIP(maxCount uint) (ips []netip.Addr) {
s.lock.Lock() s.confMu.RLock()
defer s.lock.Unlock() defer s.confMu.RUnlock()
limit := uint32(s.limit.Hours()) limit := uint32(s.limit.Hours())
if !s.enabled || limit == 0 { if !s.enabled || limit == 0 {
@ -382,8 +385,8 @@ func (s *StatsCtx) openDB() (err error) {
func (s *StatsCtx) flush() (cont bool, sleepFor time.Duration) { func (s *StatsCtx) flush() (cont bool, sleepFor time.Duration) {
id := s.unitIDGen() id := s.unitIDGen()
s.lock.Lock() s.confMu.Lock()
defer s.lock.Unlock() defer s.confMu.Unlock()
s.currMu.Lock() s.currMu.Lock()
defer s.currMu.Unlock() defer s.currMu.Unlock()
@ -575,10 +578,14 @@ func (s *StatsCtx) loadUnits(limit uint32) (units []*unitDB, firstID uint32) {
// ShouldCount returns true if request for the host should be counted. // ShouldCount returns true if request for the host should be counted.
func (s *StatsCtx) ShouldCount(host string, _, _ uint16) bool { func (s *StatsCtx) ShouldCount(host string, _, _ uint16) bool {
s.confMu.RLock()
defer s.confMu.RUnlock()
return !s.isIgnored(host) return !s.isIgnored(host)
} }
// isIgnored returns true if the host is in the Ignored list. // isIgnored returns true if the host is in the ignored domains list. It
// assumes that s.confMu is locked for reading.
func (s *StatsCtx) isIgnored(host string) bool { func (s *StatsCtx) isIgnored(host string) bool {
return s.ignored.Has(host) return s.ignored.Has(host)
} }

View File

@ -72,6 +72,19 @@ type Entry struct {
// unit collects the statistics data for a specific period of time. // unit collects the statistics data for a specific period of time.
type unit struct { type unit struct {
// domains stores the number of requests for each domain.
domains map[string]uint64
// blockedDomains stores the number of requests for each domain that has
// been blocked.
blockedDomains map[string]uint64
// clients stores the number of requests from each client.
clients map[string]uint64
// nResult stores the number of requests grouped by it's result.
nResult []uint64
// id is the unique unit's identifier. It's set to an absolute hour number // id is the unique unit's identifier. It's set to an absolute hour number
// since the beginning of UNIX time by the default ID generating function. // since the beginning of UNIX time by the default ID generating function.
// //
@ -81,29 +94,20 @@ type unit struct {
// nTotal stores the total number of requests. // nTotal stores the total number of requests.
nTotal uint64 nTotal uint64
// nResult stores the number of requests grouped by it's result.
nResult []uint64
// timeSum stores the sum of processing time in milliseconds of each request // timeSum stores the sum of processing time in milliseconds of each request
// written by the unit. // written by the unit.
timeSum uint64 timeSum uint64
// domains stores the number of requests for each domain.
domains map[string]uint64
// blockedDomains stores the number of requests for each domain that has
// been blocked.
blockedDomains map[string]uint64
// clients stores the number of requests from each client.
clients map[string]uint64
} }
// newUnit allocates the new *unit. // newUnit allocates the new *unit.
func newUnit(id uint32) (u *unit) { func newUnit(id uint32) (u *unit) {
return &unit{ return &unit{
id: id, domains: map[string]uint64{},
blockedDomains: map[string]uint64{},
clients: map[string]uint64{},
nResult: make([]uint64, resultLast), nResult: make([]uint64, resultLast),
domains: make(map[string]uint64), id: id,
blockedDomains: make(map[string]uint64),
clients: make(map[string]uint64),
} }
} }
@ -115,19 +119,25 @@ type countPair struct {
} }
// unitDB is the structure for serializing statistics data into the database. // unitDB is the structure for serializing statistics data into the database.
//
// NOTE: Do not change the names or types of fields, as this structure is used
// for GOB encoding.
type unitDB struct { type unitDB struct {
// NTotal is the total number of requests.
NTotal uint64
// NResult is the number of requests by the result's kind. // NResult is the number of requests by the result's kind.
NResult []uint64 NResult []uint64
// Domains is the number of requests for each domain name. // Domains is the number of requests for each domain name.
Domains []countPair Domains []countPair
// BlockedDomains is the number of requests blocked for each domain name. // BlockedDomains is the number of requests blocked for each domain name.
BlockedDomains []countPair BlockedDomains []countPair
// Clients is the number of requests from each client. // Clients is the number of requests from each client.
Clients []countPair Clients []countPair
// NTotal is the total number of requests.
NTotal uint64
// TimeAvg is the average of processing times in milliseconds of all the // TimeAvg is the average of processing times in milliseconds of all the
// requests in the unit. // requests in the unit.
TimeAvg uint32 TimeAvg uint32