From 2c0ffd91fddd286254b53be790146a2931b7b55b Mon Sep 17 00:00:00 2001 From: Stanislav Chzhen Date: Mon, 2 Sep 2024 19:47:11 +0300 Subject: [PATCH] all: slog stats --- internal/home/dns.go | 2 + internal/stats/http.go | 3 +- internal/stats/http_test.go | 2 + internal/stats/stats.go | 87 +++++++++++++++------------ internal/stats/stats_internal_test.go | 4 ++ internal/stats/stats_test.go | 4 ++ internal/stats/unit.go | 13 ++-- 7 files changed, 70 insertions(+), 45 deletions(-) diff --git a/internal/home/dns.go b/internal/home/dns.go index ed1f1675..41e521ce 100644 --- a/internal/home/dns.go +++ b/internal/home/dns.go @@ -20,6 +20,7 @@ import ( "github.com/AdguardTeam/AdGuardHome/internal/stats" "github.com/AdguardTeam/golibs/errors" "github.com/AdguardTeam/golibs/log" + "github.com/AdguardTeam/golibs/logutil/slogutil" "github.com/AdguardTeam/golibs/netutil" "github.com/ameshkov/dnscrypt/v2" yaml "gopkg.in/yaml.v3" @@ -54,6 +55,7 @@ func initDNS(l *slog.Logger) (err error) { } statsConf := stats.Config{ + Logger: l.With(slogutil.KeyPrefix, "stats"), Filename: filepath.Join(statsDir, "stats.db"), Limit: config.Stats.Interval.Duration, ConfigModified: onConfigModified, diff --git a/internal/stats/http.go b/internal/stats/http.go index faec0d14..d151e74d 100644 --- a/internal/stats/http.go +++ b/internal/stats/http.go @@ -10,7 +10,6 @@ import ( "github.com/AdguardTeam/AdGuardHome/internal/aghalg" "github.com/AdguardTeam/AdGuardHome/internal/aghhttp" "github.com/AdguardTeam/AdGuardHome/internal/aghnet" - "github.com/AdguardTeam/golibs/log" "github.com/AdguardTeam/golibs/timeutil" ) @@ -62,7 +61,7 @@ func (s *StatsCtx) handleStats(w http.ResponseWriter, r *http.Request) { resp, ok = s.getData(uint32(s.limit.Hours())) }() - log.Debug("stats: prepared data in %v", time.Since(start)) + s.logger.Debug("prepared data", "elapsed", time.Since(start)) if !ok { // Don't bring the message to the lower case since it's a part of UI diff --git a/internal/stats/http_test.go b/internal/stats/http_test.go index 7e358a03..b53668d6 100644 --- a/internal/stats/http_test.go +++ b/internal/stats/http_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/AdguardTeam/AdGuardHome/internal/aghalg" + "github.com/AdguardTeam/golibs/logutil/slogutil" "github.com/AdguardTeam/golibs/testutil" "github.com/AdguardTeam/golibs/timeutil" "github.com/stretchr/testify/assert" @@ -24,6 +25,7 @@ func TestHandleStatsConfig(t *testing.T) { ) conf := Config{ + Logger: slogutil.NewDiscardLogger(), UnitID: func() (id uint32) { return 0 }, ConfigModified: func() {}, ShouldCountClient: func([]string) bool { return true }, diff --git a/internal/stats/stats.go b/internal/stats/stats.go index e9533ca2..b3372c7b 100644 --- a/internal/stats/stats.go +++ b/internal/stats/stats.go @@ -5,6 +5,7 @@ package stats import ( "fmt" "io" + "log/slog" "net/netip" "os" "sync" @@ -14,7 +15,7 @@ import ( "github.com/AdguardTeam/AdGuardHome/internal/aghhttp" "github.com/AdguardTeam/AdGuardHome/internal/aghnet" "github.com/AdguardTeam/golibs/errors" - "github.com/AdguardTeam/golibs/log" + "github.com/AdguardTeam/golibs/logutil/slogutil" "github.com/AdguardTeam/golibs/timeutil" "go.etcd.io/bbolt" ) @@ -43,6 +44,10 @@ func validateIvl(ivl time.Duration) (err error) { // // Do not alter any fields of this structure after using it. type Config struct { + // Logger is used for logging the operation of the statistics management. + // It must not be nil. + Logger *slog.Logger + // UnitID is the function to generate the identifier for current unit. If // nil, the default function is used, see newUnitID. UnitID UnitIDGenFunc @@ -96,6 +101,10 @@ type Interface interface { // StatsCtx collects the statistics and flushes it to the database. Its default // flushing interval is one hour. type StatsCtx struct { + // logger is used for logging the operation of the statistics management. + // It must not be nil. + logger *slog.Logger + // currMu protects curr. currMu *sync.RWMutex // curr is the actual statistics collection result. @@ -150,6 +159,7 @@ func New(conf Config) (s *StatsCtx, err error) { } s = &StatsCtx{ + logger: conf.Logger, currMu: &sync.RWMutex{}, httpRegister: conf.HTTPRegister, configModified: conf.ConfigModified, @@ -181,18 +191,18 @@ func New(conf Config) (s *StatsCtx, err error) { return nil, fmt.Errorf("stats: opening a transaction: %w", err) } - deleted := deleteOldUnits(tx, id-uint32(s.limit.Hours())-1) - udb = loadUnitFromDB(tx, id) + deleted := s.deleteOldUnits(tx, id-uint32(s.limit.Hours())-1) + udb = s.loadUnitFromDB(tx, id) err = finishTxn(tx, deleted > 0) if err != nil { - log.Error("stats: %s", err) + s.logger.Error("finishing transacation", slogutil.KeyError, err) } s.curr = newUnit(id) s.curr.deserialize(udb) - log.Debug("stats: initialized") + s.logger.Debug("initialized") return s, nil } @@ -237,7 +247,7 @@ func (s *StatsCtx) Close() (err error) { defer func() { cerr := db.Close() if cerr == nil { - log.Debug("stats: database closed") + s.logger.Debug("database closed") } err = errors.WithDeferred(err, cerr) @@ -254,7 +264,7 @@ func (s *StatsCtx) Close() (err error) { udb := s.curr.serialize() - return udb.flushUnitToDB(tx, s.curr.id) + return s.flushUnitToDB(udb, tx, s.curr.id) } // Update implements the [Interface] interface for *StatsCtx. e must not be @@ -269,7 +279,7 @@ func (s *StatsCtx) Update(e *Entry) { err := e.validate() if err != nil { - log.Debug("stats: updating: validating entry: %s", err) + s.logger.Debug("validating entry", slogutil.KeyError, err) return } @@ -278,7 +288,7 @@ func (s *StatsCtx) Update(e *Entry) { defer s.currMu.Unlock() if s.curr == nil { - log.Error("stats: current unit is nil") + s.logger.Error("current unit is nil") return } @@ -333,8 +343,8 @@ func (s *StatsCtx) TopClientsIP(maxCount uint) (ips []netip.Addr) { // deleteOldUnits walks the buckets available to tx and deletes old units. It // returns the number of deletions performed. -func deleteOldUnits(tx *bbolt.Tx, firstID uint32) (deleted int) { - log.Debug("stats: deleting old units until id %d", firstID) +func (s *StatsCtx) deleteOldUnits(tx *bbolt.Tx, firstID uint32) (deleted int) { + s.logger.Debug("deleting old units up to", "unit", firstID) // TODO(a.garipov): See if this is actually necessary. Looks like a rather // bizarre solution. @@ -348,12 +358,12 @@ func deleteOldUnits(tx *bbolt.Tx, firstID uint32) (deleted int) { err = tx.DeleteBucket(name) if err != nil { - log.Debug("stats: deleting bucket: %s", err) + s.logger.Debug("deleting bucket", slogutil.KeyError, err) return nil } - log.Debug("stats: deleted unit %d (name %x)", nameID, name) + s.logger.Debug("deleted unit", "name_id", nameID, "name", fmt.Sprintf("%x", name)) deleted++ @@ -362,7 +372,7 @@ func deleteOldUnits(tx *bbolt.Tx, firstID uint32) (deleted int) { err := tx.ForEach(walk) if err != nil && !errors.Is(err, errStop) { - log.Debug("stats: deleting units: %s", err) + s.logger.Debug("deleting units", slogutil.KeyError, err) } return deleted @@ -371,20 +381,19 @@ func deleteOldUnits(tx *bbolt.Tx, firstID uint32) (deleted int) { // openDB returns an error if the database can't be opened from the specified // file. It's safe for concurrent use. func (s *StatsCtx) openDB() (err error) { - log.Debug("stats: opening database") + s.logger.Debug("opening database") var db *bbolt.DB db, err = bbolt.Open(s.filename, 0o644, nil) if err != nil { if err.Error() == "invalid argument" { - log.Error("AdGuard Home cannot be initialized due to an incompatible file system.\nPlease read the explanation here: https://github.com/AdguardTeam/AdGuardHome/wiki/Getting-Started#limitations") + s.logger.Error("AdGuard Home cannot be initialized due to an incompatible file system.\nPlease read the explanation here: https://github.com/AdguardTeam/AdGuardHome/wiki/Getting-Started#limitations") } return err } - // Use defer to unlock the mutex as soon as possible. - defer log.Debug("stats: database opened") + defer s.logger.Debug("database opened") s.db.Store(db) @@ -424,21 +433,22 @@ func (s *StatsCtx) flushDB(id, limit uint32, ptr *unit) (cont bool, sleepFor tim isCommitable := true tx, err := db.Begin(true) if err != nil { - log.Error("stats: opening transaction: %s", err) + s.logger.Error("opening transaction", slogutil.KeyError, err) return true, 0 } defer func() { if err = finishTxn(tx, isCommitable); err != nil { - log.Error("stats: %s", err) + s.logger.Error("finishing transaction", slogutil.KeyError, err) } }() s.curr = newUnit(id) - flushErr := ptr.serialize().flushUnitToDB(tx, ptr.id) + udb := ptr.serialize() + flushErr := s.flushUnitToDB(udb, tx, ptr.id) if flushErr != nil { - log.Error("stats: flushing unit: %s", flushErr) + s.logger.Error("flushing unit", slogutil.KeyError, flushErr) isCommitable = false } @@ -446,11 +456,12 @@ func (s *StatsCtx) flushDB(id, limit uint32, ptr *unit) (cont bool, sleepFor tim if delErr != nil { // TODO(e.burkov): Improve the algorithm of deleting the oldest bucket // to avoid the error. + msg := "deleting bucket" if errors.Is(delErr, bbolt.ErrBucketNotFound) { - log.Debug("stats: warning: deleting unit: %s", delErr) + s.logger.Warn(msg, slogutil.KeyError, delErr) } else { isCommitable = false - log.Error("stats: deleting unit: %s", delErr) + s.logger.Error(msg, slogutil.KeyError, delErr) } } @@ -467,7 +478,7 @@ func (s *StatsCtx) periodicFlush() { cont, sleepFor = s.flush() } - log.Debug("periodic flushing finished") + s.logger.Debug("periodic flushing finished") } // setLimit sets the limit. s.lock is expected to be locked. @@ -477,16 +488,16 @@ func (s *StatsCtx) setLimit(limit time.Duration) { if limit != 0 { s.enabled = true s.limit = limit - log.Debug("stats: set limit: %d days", limit/timeutil.Day) + s.logger.Debug("setting limit in days", "num", limit/timeutil.Day) return } s.enabled = false - log.Debug("stats: disabled") + s.logger.Debug("disabled") if err := s.clear(); err != nil { - log.Error("stats: %s", err) + s.logger.Error("clearing", slogutil.KeyError, err) } } @@ -499,7 +510,7 @@ func (s *StatsCtx) clear() (err error) { var tx *bbolt.Tx tx, err = db.Begin(true) if err != nil { - log.Error("stats: opening a transaction: %s", err) + s.logger.Error("opening transaction", slogutil.KeyError, err) } else if err = finishTxn(tx, false); err != nil { // Don't wrap the error since it's informative enough as is. return err @@ -513,21 +524,21 @@ func (s *StatsCtx) clear() (err error) { } // All active transactions are now closed. - log.Debug("stats: database closed") + s.logger.Debug("database closed") } err = os.Remove(s.filename) if err != nil { - log.Error("stats: %s", err) + s.logger.Error("removing", slogutil.KeyError, err) } err = s.openDB() if err != nil { - log.Error("stats: opening database: %s", err) + s.logger.Error("opening database", slogutil.KeyError, err) } // Use defer to unlock the mutex as soon as possible. - defer log.Debug("stats: cleared") + defer s.logger.Debug("cleared") s.currMu.Lock() defer s.currMu.Unlock() @@ -548,7 +559,7 @@ func (s *StatsCtx) loadUnits(limit uint32) (units []*unitDB, curID uint32) { // taken into account. tx, err := db.Begin(true) if err != nil { - log.Error("stats: opening transaction: %s", err) + s.logger.Error("opening transaction", slogutil.KeyError, err) return nil, 0 } @@ -568,7 +579,7 @@ func (s *StatsCtx) loadUnits(limit uint32) (units []*unitDB, curID uint32) { units = make([]*unitDB, 0, limit) firstID := curID - limit + 1 for i := firstID; i != curID; i++ { - u := loadUnitFromDB(tx, i) + u := s.loadUnitFromDB(tx, i) if u == nil { u = &unitDB{NResult: make([]uint64, resultLast)} } @@ -577,7 +588,7 @@ func (s *StatsCtx) loadUnits(limit uint32) (units []*unitDB, curID uint32) { err = finishTxn(tx, false) if err != nil { - log.Error("stats: %s", err) + s.logger.Error("finishing transaction", slogutil.KeyError, err) } if cur != nil { @@ -585,7 +596,9 @@ func (s *StatsCtx) loadUnits(limit uint32) (units []*unitDB, curID uint32) { } if unitsLen := len(units); unitsLen != int(limit) { - log.Fatalf("loaded %d units whilst the desired number is %d", unitsLen, limit) + // Should not happen. + s.logger.Error("number of loaded units not equal to limit", "loaded", unitsLen, "limit", limit) + units = units[:limit] } return units, curID diff --git a/internal/stats/stats_internal_test.go b/internal/stats/stats_internal_test.go index 3423c7ad..e7ed446a 100644 --- a/internal/stats/stats_internal_test.go +++ b/internal/stats/stats_internal_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/AdguardTeam/golibs/logutil/slogutil" "github.com/AdguardTeam/golibs/testutil" "github.com/AdguardTeam/golibs/timeutil" "github.com/stretchr/testify/assert" @@ -18,6 +19,7 @@ func TestStats_races(t *testing.T) { var r uint32 idGen := func() (id uint32) { return atomic.LoadUint32(&r) } conf := Config{ + Logger: slogutil.NewDiscardLogger(), ShouldCountClient: func([]string) bool { return true }, UnitID: idGen, Filename: filepath.Join(t.TempDir(), "./stats.db"), @@ -94,6 +96,7 @@ func TestStatsCtx_FillCollectedStats_daily(t *testing.T) { ) s, err := New(Config{ + Logger: slogutil.NewDiscardLogger(), ShouldCountClient: func([]string) bool { return true }, Filename: filepath.Join(t.TempDir(), "./stats.db"), Limit: time.Hour, @@ -151,6 +154,7 @@ func TestStatsCtx_DataFromUnits_month(t *testing.T) { const hoursInMonth = 720 s, err := New(Config{ + Logger: slogutil.NewDiscardLogger(), ShouldCountClient: func([]string) bool { return true }, Filename: filepath.Join(t.TempDir(), "./stats.db"), Limit: time.Hour, diff --git a/internal/stats/stats_test.go b/internal/stats/stats_test.go index 2f7c526a..e5af7031 100644 --- a/internal/stats/stats_test.go +++ b/internal/stats/stats_test.go @@ -13,6 +13,7 @@ import ( "github.com/AdguardTeam/AdGuardHome/internal/aghnet" "github.com/AdguardTeam/AdGuardHome/internal/stats" + "github.com/AdguardTeam/golibs/logutil/slogutil" "github.com/AdguardTeam/golibs/netutil" "github.com/AdguardTeam/golibs/testutil" "github.com/AdguardTeam/golibs/timeutil" @@ -55,6 +56,7 @@ func TestStats(t *testing.T) { handlers := map[string]http.Handler{} conf := stats.Config{ + Logger: slogutil.NewDiscardLogger(), ShouldCountClient: func([]string) bool { return true }, Filename: filepath.Join(t.TempDir(), "stats.db"), Limit: timeutil.Day, @@ -171,6 +173,7 @@ func TestLargeNumbers(t *testing.T) { handlers := map[string]http.Handler{} conf := stats.Config{ + Logger: slogutil.NewDiscardLogger(), ShouldCountClient: func([]string) bool { return true }, Filename: filepath.Join(t.TempDir(), "stats.db"), Limit: timeutil.Day, @@ -222,6 +225,7 @@ func TestShouldCount(t *testing.T) { require.NoError(t, err) s, err := stats.New(stats.Config{ + Logger: slogutil.NewDiscardLogger(), Enabled: true, Filename: filepath.Join(t.TempDir(), "stats.db"), Limit: timeutil.Day, diff --git a/internal/stats/unit.go b/internal/stats/unit.go index 621f1cda..e9aeb87b 100644 --- a/internal/stats/unit.go +++ b/internal/stats/unit.go @@ -10,7 +10,7 @@ import ( "github.com/AdguardTeam/AdGuardHome/internal/aghnet" "github.com/AdguardTeam/golibs/errors" - "github.com/AdguardTeam/golibs/log" + "github.com/AdguardTeam/golibs/logutil/slogutil" "go.etcd.io/bbolt" "golang.org/x/exp/maps" ) @@ -277,13 +277,14 @@ func (u *unit) serialize() (udb *unitDB) { } } -func loadUnitFromDB(tx *bbolt.Tx, id uint32) (udb *unitDB) { +// loadUnitFromDB loads unit by id from the database. +func (s *StatsCtx) loadUnitFromDB(tx *bbolt.Tx, id uint32) (udb *unitDB) { bkt := tx.Bucket(idToUnitName(id)) if bkt == nil { return nil } - log.Tracef("Loading unit %d", id) + s.logger.Debug("loading unit", "id", id) var buf bytes.Buffer buf.Write(bkt.Get([]byte{0})) @@ -291,7 +292,7 @@ func loadUnitFromDB(tx *bbolt.Tx, id uint32) (udb *unitDB) { err := gob.NewDecoder(&buf).Decode(udb) if err != nil { - log.Error("gob Decode: %s", err) + s.logger.Error("gob decode", slogutil.KeyError, err) return nil } @@ -339,8 +340,8 @@ func (u *unit) add(e *Entry) { } // flushUnitToDB puts udb to the database at id. -func (udb *unitDB) flushUnitToDB(tx *bbolt.Tx, id uint32) (err error) { - log.Debug("stats: flushing unit with id %d and total of %d", id, udb.NTotal) +func (s *StatsCtx) flushUnitToDB(udb *unitDB, tx *bbolt.Tx, id uint32) (err error) { + s.logger.Debug("flushing unit", "id", id, "req_num", udb.NTotal) bkt, err := tx.CreateBucketIfNotExists(idToUnitName(id)) if err != nil {