2020-12-07 11:32:06 +00:00
// Package stats provides units for managing statistics of the filtering DNS
// server.
2019-08-22 14:34:58 +01:00
package stats
import (
2022-08-17 12:09:13 +01:00
"fmt"
"io"
2019-08-22 14:34:58 +01:00
"net"
2022-11-09 11:37:07 +00:00
"net/netip"
2022-08-17 12:09:13 +01:00
"os"
"sync"
"sync/atomic"
"time"
2022-08-04 17:05:28 +01:00
"github.com/AdguardTeam/AdGuardHome/internal/aghhttp"
2022-08-17 12:09:13 +01:00
"github.com/AdguardTeam/golibs/errors"
"github.com/AdguardTeam/golibs/log"
2023-02-13 15:15:33 +00:00
"github.com/AdguardTeam/golibs/stringutil"
2023-03-23 10:46:57 +00:00
"github.com/AdguardTeam/golibs/timeutil"
2022-08-17 12:09:13 +01:00
"go.etcd.io/bbolt"
2019-08-22 14:34:58 +01:00
)
2022-08-17 12:09:13 +01:00
// checkInterval reports whether days is one of the supported statistics
// retention intervals, which are 0, 1, 7, 30 and 90 days.
func checkInterval(days uint32) (ok bool) {
	switch days {
	case 0, 1, 7, 30, 90:
		return true
	default:
		return false
	}
}
2023-03-23 10:46:57 +00:00
// validateIvl checks that ivl lies within the supported range. It returns an
// error if ivl is shorter than an hour or longer than 365 days, and nil
// otherwise.
func validateIvl(ivl time.Duration) (err error) {
	switch {
	case ivl < time.Hour:
		return errors.Error("less than an hour")
	case ivl > timeutil.Day*365:
		return errors.Error("more than a year")
	default:
		return nil
	}
}
2022-08-04 17:05:28 +01:00
// Config is the configuration structure for the statistics collecting.
2019-09-16 14:14:52 +01:00
type Config struct {
2022-08-04 17:05:28 +01:00
// UnitID is the function to generate the identifier for current unit. If
// nil, the default function is used, see newUnitID.
UnitID UnitIDGenFunc
2019-09-25 13:36:09 +01:00
2022-08-04 17:05:28 +01:00
// ConfigModified will be called each time the configuration changed via web
// interface.
2019-09-25 13:36:09 +01:00
ConfigModified func ( )
2022-08-04 17:05:28 +01:00
// HTTPRegister is the function that registers handlers for the stats
// endpoints.
HTTPRegister aghhttp . RegisterFunc
2019-12-11 09:38:58 +00:00
2022-08-04 17:05:28 +01:00
// Filename is the name of the database file.
Filename string
2019-09-16 14:14:52 +01:00
2023-03-23 10:46:57 +00:00
// Limit is an upper limit for collecting statistics.
Limit time . Duration
2023-02-13 15:15:33 +00:00
// Enabled tells if the statistics are enabled.
Enabled bool
// Ignored is the list of host names, which should not be counted.
Ignored * stringutil . Set
2019-08-22 14:34:58 +01:00
}
2022-08-04 17:05:28 +01:00
// Interface is the statistics interface to be used by other packages.
type Interface interface {
// Start begins the statistics collecting.
2020-01-16 11:25:40 +00:00
Start ( )
2022-08-17 12:09:13 +01:00
io . Closer
2019-08-22 14:34:58 +01:00
2022-08-04 17:05:28 +01:00
// Update collects the incoming statistics data.
2019-08-22 14:34:58 +01:00
Update ( e Entry )
2022-08-04 17:05:28 +01:00
// GetTopClientIP returns at most limit IP addresses corresponding to the
// clients with the most number of requests.
2022-11-09 11:37:07 +00:00
TopClientsIP ( limit uint ) [ ] netip . Addr
2019-10-07 13:56:33 +01:00
2022-08-04 17:05:28 +01:00
// WriteDiskConfig puts the Interface's configuration to the dc.
2023-02-13 15:15:33 +00:00
WriteDiskConfig ( dc * Config )
// ShouldCount returns true if request for the host should be counted.
ShouldCount ( host string , qType , qClass uint16 ) bool
2019-08-22 14:34:58 +01:00
}
2022-08-17 12:09:13 +01:00
// StatsCtx collects the statistics and flushes it to the database. Its default
// flushing interval is one hour.
type StatsCtx struct {
// currMu protects curr.
currMu * sync . RWMutex
// curr is the actual statistics collection result.
curr * unit
2019-08-22 14:34:58 +01:00
2022-08-17 12:09:13 +01:00
// db is the opened statistics database, if any.
2023-02-08 10:39:04 +00:00
db atomic . Pointer [ bbolt . DB ]
2019-08-22 14:34:58 +01:00
2022-08-17 12:09:13 +01:00
// unitIDGen is the function that generates an identifier for the current
// unit. It's here for only testing purposes.
unitIDGen UnitIDGenFunc
2019-08-22 14:34:58 +01:00
2022-08-17 12:09:13 +01:00
// httpRegister is used to set HTTP handlers.
httpRegister aghhttp . RegisterFunc
// configModified is called whenever the configuration is modified via web
// interface.
configModified func ( )
// filename is the name of database file.
filename string
2023-02-13 15:15:33 +00:00
// lock protects all the fields below.
lock sync . Mutex
// enabled tells if the statistics are enabled.
enabled bool
2023-03-23 10:46:57 +00:00
// limit is an upper limit for collecting statistics.
limit time . Duration
2023-02-13 15:15:33 +00:00
// ignored is the list of host names, which should not be counted.
ignored * stringutil . Set
2022-08-17 12:09:13 +01:00
}
// New creates s from conf and properly initializes it. Don't use s before
// calling it's Start method.
func New ( conf Config ) ( s * StatsCtx , err error ) {
defer withRecovered ( & err )
s = & StatsCtx {
2023-02-13 15:15:33 +00:00
enabled : conf . Enabled ,
2022-08-17 12:09:13 +01:00
currMu : & sync . RWMutex { } ,
filename : conf . Filename ,
configModified : conf . ConfigModified ,
httpRegister : conf . HTTPRegister ,
2023-02-13 15:15:33 +00:00
ignored : conf . Ignored ,
2022-08-17 12:09:13 +01:00
}
2023-03-23 10:46:57 +00:00
err = validateIvl ( conf . Limit )
if err != nil {
return nil , fmt . Errorf ( "unsupported interval: %w" , err )
2022-08-17 12:09:13 +01:00
}
2023-03-23 10:46:57 +00:00
s . limit = conf . Limit
2022-08-17 12:09:13 +01:00
if s . unitIDGen = newUnitID ; conf . UnitID != nil {
s . unitIDGen = conf . UnitID
}
// TODO(e.burkov): Move the code below to the Start method.
err = s . openDB ( )
if err != nil {
return nil , fmt . Errorf ( "opening database: %w" , err )
}
var udb * unitDB
id := s . unitIDGen ( )
2023-02-08 10:39:04 +00:00
tx , err := s . db . Load ( ) . Begin ( true )
2022-08-17 12:09:13 +01:00
if err != nil {
return nil , fmt . Errorf ( "stats: opening a transaction: %w" , err )
}
2023-03-23 10:46:57 +00:00
deleted := deleteOldUnits ( tx , id - uint32 ( s . limit . Hours ( ) ) - 1 )
2022-08-17 12:09:13 +01:00
udb = loadUnitFromDB ( tx , id )
err = finishTxn ( tx , deleted > 0 )
if err != nil {
log . Error ( "stats: %s" , err )
}
s . curr = newUnit ( id )
s . curr . deserialize ( udb )
log . Debug ( "stats: initialized" )
return s , nil
}
// withRecovered converts the value recovered from a panic, if there is one,
// into an error and attaches it to the error pointed to by orig. It must be
// deferred directly, since it calls recover itself. orig must be non-nil.
func withRecovered(orig *error) {
	rec := recover()
	if rec == nil {
		return
	}

	var panicErr error
	if recErr, isErr := rec.(error); isErr {
		panicErr = fmt.Errorf("panic: %w", recErr)
	} else {
		panicErr = fmt.Errorf("panic: recovered value of type %[1]T: %[1]v", rec)
	}

	*orig = errors.WithDeferred(*orig, panicErr)
}
2022-11-09 11:37:07 +00:00
// type check: the assignment below fails to compile unless *StatsCtx
// implements Interface.
var _ Interface = ( * StatsCtx ) ( nil )
2023-01-09 10:38:31 +00:00
// Start implements the [Interface] interface for *StatsCtx.
2022-08-17 12:09:13 +01:00
func ( s * StatsCtx ) Start ( ) {
s . initWeb ( )
go s . periodicFlush ( )
}
// Close implements the io.Closer interface for *StatsCtx.
func ( s * StatsCtx ) Close ( ) ( err error ) {
defer func ( ) { err = errors . Annotate ( err , "stats: closing: %w" ) } ( )
2023-02-08 10:39:04 +00:00
db := s . db . Swap ( nil )
2022-08-17 12:09:13 +01:00
if db == nil {
return nil
}
defer func ( ) {
cerr := db . Close ( )
if cerr == nil {
log . Debug ( "stats: database closed" )
}
err = errors . WithDeferred ( err , cerr )
} ( )
tx , err := db . Begin ( true )
if err != nil {
return fmt . Errorf ( "opening transaction: %w" , err )
}
defer func ( ) { err = errors . WithDeferred ( err , finishTxn ( tx , err == nil ) ) } ( )
s . currMu . RLock ( )
defer s . currMu . RUnlock ( )
udb := s . curr . serialize ( )
return udb . flushUnitToDB ( tx , s . curr . id )
}
// Update implements the Interface interface for *StatsCtx.
func ( s * StatsCtx ) Update ( e Entry ) {
2023-02-13 15:15:33 +00:00
s . lock . Lock ( )
defer s . lock . Unlock ( )
2023-03-23 10:46:57 +00:00
if ! s . enabled || s . limit == 0 {
2022-08-17 12:09:13 +01:00
return
}
if e . Result == 0 || e . Result >= resultLast || e . Domain == "" || e . Client == "" {
log . Debug ( "stats: malformed entry" )
return
}
s . currMu . Lock ( )
defer s . currMu . Unlock ( )
if s . curr == nil {
log . Error ( "stats: current unit is nil" )
return
}
clientID := e . Client
if ip := net . ParseIP ( clientID ) ; ip != nil {
clientID = ip . String ( )
}
s . curr . add ( e . Result , e . Domain , clientID , uint64 ( e . Time ) )
}
// WriteDiskConfig implements the Interface interface for *StatsCtx.
2023-02-13 15:15:33 +00:00
func ( s * StatsCtx ) WriteDiskConfig ( dc * Config ) {
s . lock . Lock ( )
defer s . lock . Unlock ( )
2023-03-23 10:46:57 +00:00
dc . Limit = s . limit
2023-02-13 15:15:33 +00:00
dc . Enabled = s . enabled
dc . Ignored = s . ignored
2022-08-17 12:09:13 +01:00
}
2022-11-09 11:37:07 +00:00
// TopClientsIP implements the [Interface] interface for *StatsCtx.
func ( s * StatsCtx ) TopClientsIP ( maxCount uint ) ( ips [ ] netip . Addr ) {
2023-02-13 15:15:33 +00:00
s . lock . Lock ( )
defer s . lock . Unlock ( )
2023-03-23 10:46:57 +00:00
limit := uint32 ( s . limit . Hours ( ) )
2023-02-13 15:15:33 +00:00
if ! s . enabled || limit == 0 {
2022-08-17 12:09:13 +01:00
return nil
}
units , _ := s . loadUnits ( limit )
if units == nil {
return nil
}
// Collect data for all the clients to sort and crop it afterwards.
m := map [ string ] uint64 { }
for _ , u := range units {
for _ , it := range u . Clients {
m [ it . Name ] += it . Count
}
}
a := convertMapToSlice ( m , int ( maxCount ) )
2022-11-09 11:37:07 +00:00
ips = [ ] netip . Addr { }
2022-08-17 12:09:13 +01:00
for _ , it := range a {
2022-11-09 11:37:07 +00:00
ip , err := netip . ParseAddr ( it . Name )
if err == nil {
2022-08-17 12:09:13 +01:00
ips = append ( ips , ip )
}
}
return ips
}
// deleteOldUnits iterates over the buckets available to tx and deletes them
// until it meets a bucket whose name is a unit ID not less than firstID. It
// returns the number of buckets successfully deleted. Failures are only
// logged at the debug level.
func deleteOldUnits(tx *bbolt.Tx, firstID uint32) (deleted int) {
	log.Debug("stats: deleting old units until id %d", firstID)

	// TODO(a.garipov): See if this is actually necessary. Looks like a rather
	// bizarre solution.
	const errStop errors.Error = "stop iteration"

	walkErr := tx.ForEach(func(name []byte, _ *bbolt.Bucket) (err error) {
		id, isUnit := unitNameToID(name)
		if isUnit && id >= firstID {
			// The remaining units are recent enough, so abort the walk.
			return errStop
		}

		if delErr := tx.DeleteBucket(name); delErr != nil {
			log.Debug("stats: deleting bucket: %s", delErr)

			return nil
		}

		log.Debug("stats: deleted unit %d (name %x)", id, name)
		deleted++

		return nil
	})
	if walkErr == nil || errors.Is(walkErr, errStop) {
		return deleted
	}

	log.Debug("stats: deleting units: %s", walkErr)

	return deleted
}
// openDB returns an error if the database can't be opened from the specified
// file. It's safe for concurrent use.
func ( s * StatsCtx ) openDB ( ) ( err error ) {
log . Debug ( "stats: opening database" )
var db * bbolt . DB
db , err = bbolt . Open ( s . filename , 0 o644 , nil )
if err != nil {
if err . Error ( ) == "invalid argument" {
log . Error ( "AdGuard Home cannot be initialized due to an incompatible file system.\nPlease read the explanation here: https://github.com/AdguardTeam/AdGuardHome/wiki/Getting-Started#limitations" )
}
return err
}
// Use defer to unlock the mutex as soon as possible.
defer log . Debug ( "stats: database opened" )
2023-02-08 10:39:04 +00:00
s . db . Store ( db )
2022-08-17 12:09:13 +01:00
return nil
}
func ( s * StatsCtx ) flush ( ) ( cont bool , sleepFor time . Duration ) {
id := s . unitIDGen ( )
2023-02-13 15:15:33 +00:00
s . lock . Lock ( )
defer s . lock . Unlock ( )
2022-08-17 12:09:13 +01:00
s . currMu . Lock ( )
defer s . currMu . Unlock ( )
ptr := s . curr
if ptr == nil {
return false , 0
}
2023-03-23 10:46:57 +00:00
limit := uint32 ( s . limit . Hours ( ) )
2022-08-17 12:09:13 +01:00
if limit == 0 || ptr . id == id {
return true , time . Second
}
2023-02-08 10:39:04 +00:00
db := s . db . Load ( )
2022-08-17 12:09:13 +01:00
if db == nil {
return true , 0
}
2022-08-22 12:21:41 +01:00
isCommitable := true
2022-08-17 12:09:13 +01:00
tx , err := db . Begin ( true )
if err != nil {
log . Error ( "stats: opening transaction: %s" , err )
return true , 0
}
2022-08-22 12:21:41 +01:00
defer func ( ) {
if err = finishTxn ( tx , isCommitable ) ; err != nil {
log . Error ( "stats: %s" , err )
}
} ( )
2022-08-17 12:09:13 +01:00
s . curr = newUnit ( id )
2022-08-22 12:21:41 +01:00
flushErr := ptr . serialize ( ) . flushUnitToDB ( tx , ptr . id )
if flushErr != nil {
log . Error ( "stats: flushing unit: %s" , flushErr )
2022-08-17 12:09:13 +01:00
isCommitable = false
}
2022-08-22 12:21:41 +01:00
delErr := tx . DeleteBucket ( idToUnitName ( id - limit ) )
if delErr != nil {
// TODO(e.burkov): Improve the algorithm of deleting the oldest bucket
// to avoid the error.
if errors . Is ( delErr , bbolt . ErrBucketNotFound ) {
log . Debug ( "stats: warning: deleting unit: %s" , delErr )
} else {
2022-08-17 12:09:13 +01:00
isCommitable = false
2022-08-22 12:21:41 +01:00
log . Error ( "stats: deleting unit: %s" , delErr )
2022-08-17 12:09:13 +01:00
}
}
return true , 0
}
// periodicFlush checks and flushes the unit to the database if the freshly
// generated unit ID differs from the current's ID. Flushing process includes:
2022-08-31 16:57:02 +01:00
// - swapping the current unit with the new empty one;
// - writing the current unit to the database;
// - removing the stale unit from the database.
2022-08-17 12:09:13 +01:00
func ( s * StatsCtx ) periodicFlush ( ) {
for cont , sleepFor := true , time . Duration ( 0 ) ; cont ; time . Sleep ( sleepFor ) {
cont , sleepFor = s . flush ( )
}
log . Debug ( "periodic flushing finished" )
}
2023-03-23 10:46:57 +00:00
// setLimit sets the limit. s.lock is expected to be locked.
//
// TODO(s.chzhen): Remove it when migration to the new API is over.
func ( s * StatsCtx ) setLimit ( limit time . Duration ) {
if limit != 0 {
2023-02-13 15:15:33 +00:00
s . enabled = true
2023-03-23 10:46:57 +00:00
s . limit = limit
log . Debug ( "stats: set limit: %d days" , limit / timeutil . Day )
2023-02-13 15:15:33 +00:00
return
2022-08-17 12:09:13 +01:00
}
2023-02-13 15:15:33 +00:00
s . enabled = false
log . Debug ( "stats: disabled" )
if err := s . clear ( ) ; err != nil {
log . Error ( "stats: %s" , err )
}
2022-08-17 12:09:13 +01:00
}
// Reset counters and clear database
func ( s * StatsCtx ) clear ( ) ( err error ) {
defer func ( ) { err = errors . Annotate ( err , "clearing: %w" ) } ( )
2023-02-08 10:39:04 +00:00
db := s . db . Swap ( nil )
2022-08-17 12:09:13 +01:00
if db != nil {
var tx * bbolt . Tx
tx , err = db . Begin ( true )
if err != nil {
log . Error ( "stats: opening a transaction: %s" , err )
} else if err = finishTxn ( tx , false ) ; err != nil {
// Don't wrap the error since it's informative enough as is.
return err
}
// Active transactions will continue using database, but new ones won't
// be created.
err = db . Close ( )
if err != nil {
return fmt . Errorf ( "closing database: %w" , err )
}
// All active transactions are now closed.
log . Debug ( "stats: database closed" )
}
err = os . Remove ( s . filename )
if err != nil {
log . Error ( "stats: %s" , err )
}
err = s . openDB ( )
if err != nil {
log . Error ( "stats: opening database: %s" , err )
}
// Use defer to unlock the mutex as soon as possible.
defer log . Debug ( "stats: cleared" )
s . currMu . Lock ( )
defer s . currMu . Unlock ( )
s . curr = newUnit ( s . unitIDGen ( ) )
return nil
}
func ( s * StatsCtx ) loadUnits ( limit uint32 ) ( units [ ] * unitDB , firstID uint32 ) {
2023-02-08 10:39:04 +00:00
db := s . db . Load ( )
2022-08-17 12:09:13 +01:00
if db == nil {
return nil , 0
}
// Use writable transaction to ensure any ongoing writable transaction is
// taken into account.
tx , err := db . Begin ( true )
if err != nil {
log . Error ( "stats: opening transaction: %s" , err )
return nil , 0
}
s . currMu . RLock ( )
defer s . currMu . RUnlock ( )
cur := s . curr
var curID uint32
if cur != nil {
curID = cur . id
} else {
curID = s . unitIDGen ( )
}
// Per-hour units.
units = make ( [ ] * unitDB , 0 , limit )
firstID = curID - limit + 1
for i := firstID ; i != curID ; i ++ {
u := loadUnitFromDB ( tx , i )
if u == nil {
u = & unitDB { NResult : make ( [ ] uint64 , resultLast ) }
}
units = append ( units , u )
}
err = finishTxn ( tx , false )
if err != nil {
log . Error ( "stats: %s" , err )
}
2021-01-27 15:32:13 +00:00
2022-08-17 12:09:13 +01:00
if cur != nil {
units = append ( units , cur . serialize ( ) )
}
2022-08-04 17:05:28 +01:00
2022-08-17 12:09:13 +01:00
if unitsLen := len ( units ) ; unitsLen != int ( limit ) {
log . Fatalf ( "loaded %d units whilst the desired number is %d" , unitsLen , limit )
}
2022-08-04 17:05:28 +01:00
2022-08-17 12:09:13 +01:00
return units , firstID
2019-08-22 14:34:58 +01:00
}
2023-02-13 15:15:33 +00:00
// ShouldCount implements the [Interface] interface for *StatsCtx. It returns
// true if a request for the host should be counted, that is, if the host is
// not in the ignored set. The query type and class are not taken into
// account.
//
// It holds s.lock while consulting the ignored set, since that field is among
// those documented as protected by the lock. It must therefore not be called
// with s.lock already held.
func (s *StatsCtx) ShouldCount(host string, _, _ uint16) bool {
	s.lock.Lock()
	defer s.lock.Unlock()

	return !s.isIgnored(host)
}
// isIgnored reports whether host is present in the ignored set of s. It
// reads s.ignored without taking s.lock itself.
func (s *StatsCtx) isIgnored(host string) bool {
	ignored := s.ignored

	return ignored.Has(host)
}