Pull request 1963: AG-24051-stats-collector
Updates #6108. Squashed commit of the following: commit ca584c8dbbece70b90f6298a0a18a933a698fcf6 Merge: b6e13623228cfde921
Author: Stanislav Chzhen <s.chzhen@adguard.com> Date: Wed Aug 23 17:00:52 2023 +0300 Merge branch 'master' into AG-24051-stats-collector commit b6e136232dd619ce09150b608ae5017676031e25 Merge: bbd4780b03722c2846
Author: Stanislav Chzhen <s.chzhen@adguard.com> Date: Wed Aug 23 16:25:45 2023 +0300 Merge branch 'master' into AG-24051-stats-collector commit bbd4780b03a1c954fe2b349d27f1ab3bf7739518 Author: Stanislav Chzhen <s.chzhen@adguard.com> Date: Tue Aug 22 17:47:51 2023 +0300 stats: imp test commit cfe3b9bdf5fd75bff98f985884b3bff8a88ae8ee Author: Stanislav Chzhen <s.chzhen@adguard.com> Date: Tue Aug 22 16:57:31 2023 +0300 stats: add test commit cb579a157056f79c1c3d08479a718698a74e0bb9 Author: Stanislav Chzhen <s.chzhen@adguard.com> Date: Mon Aug 21 15:24:40 2023 +0300 stats: imp docs commit 3c6ab3affb9ac402db7e3cc3d9696154770e1037 Author: Stanislav Chzhen <s.chzhen@adguard.com> Date: Thu Aug 17 14:41:35 2023 +0300 stats: imp code commit 125a31b73bb31f7f4886daad9ce7e3bbc97b38c9 Author: Stanislav Chzhen <s.chzhen@adguard.com> Date: Wed Aug 16 12:29:10 2023 +0300 stats: imp test commit 1ba1eb3b7bd540621bf17ca50d4c2ba4bc55a9f8 Author: Stanislav Chzhen <s.chzhen@adguard.com> Date: Tue Aug 15 19:57:34 2023 +0300 stats: add test commit 46622f4fdf2775ddaba626b9786af183680e8889 Author: Stanislav Chzhen <s.chzhen@adguard.com> Date: Tue Aug 15 15:47:06 2023 +0300 stats: rm stats collector
This commit is contained in:
parent
28cfde9212
commit
4b04c620f2
|
@ -53,7 +53,7 @@ func (s *StatsCtx) handleStats(w http.ResponseWriter, r *http.Request) {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
||||||
var (
|
var (
|
||||||
resp StatsResp
|
resp *StatsResp
|
||||||
ok bool
|
ok bool
|
||||||
)
|
)
|
||||||
func() {
|
func() {
|
||||||
|
|
|
@ -535,7 +535,8 @@ func (s *StatsCtx) clear() (err error) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *StatsCtx) loadUnits(limit uint32) (units []*unitDB, firstID uint32) {
|
// loadUnits returns stored units from the database and current unit ID.
|
||||||
|
func (s *StatsCtx) loadUnits(limit uint32) (units []*unitDB, curID uint32) {
|
||||||
db := s.db.Load()
|
db := s.db.Load()
|
||||||
if db == nil {
|
if db == nil {
|
||||||
return nil, 0
|
return nil, 0
|
||||||
|
@ -555,7 +556,6 @@ func (s *StatsCtx) loadUnits(limit uint32) (units []*unitDB, firstID uint32) {
|
||||||
|
|
||||||
cur := s.curr
|
cur := s.curr
|
||||||
|
|
||||||
var curID uint32
|
|
||||||
if cur != nil {
|
if cur != nil {
|
||||||
curID = cur.id
|
curID = cur.id
|
||||||
} else {
|
} else {
|
||||||
|
@ -564,7 +564,7 @@ func (s *StatsCtx) loadUnits(limit uint32) (units []*unitDB, firstID uint32) {
|
||||||
|
|
||||||
// Per-hour units.
|
// Per-hour units.
|
||||||
units = make([]*unitDB, 0, limit)
|
units = make([]*unitDB, 0, limit)
|
||||||
firstID = curID - limit + 1
|
firstID := curID - limit + 1
|
||||||
for i := firstID; i != curID; i++ {
|
for i := firstID; i != curID; i++ {
|
||||||
u := loadUnitFromDB(tx, i)
|
u := loadUnitFromDB(tx, i)
|
||||||
if u == nil {
|
if u == nil {
|
||||||
|
@ -586,7 +586,7 @@ func (s *StatsCtx) loadUnits(limit uint32) (units []*unitDB, firstID uint32) {
|
||||||
log.Fatalf("loaded %d units whilst the desired number is %d", unitsLen, limit)
|
log.Fatalf("loaded %d units whilst the desired number is %d", unitsLen, limit)
|
||||||
}
|
}
|
||||||
|
|
||||||
return units, firstID
|
return units, curID
|
||||||
}
|
}
|
||||||
|
|
||||||
// ShouldCount returns true if request for the host should be counted.
|
// ShouldCount returns true if request for the host should be counted.
|
||||||
|
|
|
@ -14,24 +14,6 @@ import (
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
// TODO(e.burkov): Use more realistic data.
|
|
||||||
func TestStatsCollector(t *testing.T) {
|
|
||||||
ng := func(_ *unitDB) uint64 { return 0 }
|
|
||||||
units := make([]*unitDB, 720)
|
|
||||||
|
|
||||||
t.Run("hours", func(t *testing.T) {
|
|
||||||
statsData := statsCollector(units, 0, Hours, ng)
|
|
||||||
assert.Len(t, statsData, 720)
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("days", func(t *testing.T) {
|
|
||||||
for i := 0; i != 25; i++ {
|
|
||||||
statsData := statsCollector(units, uint32(i), Days, ng)
|
|
||||||
require.Lenf(t, statsData, 30, "i=%d", i)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestStats_races(t *testing.T) {
|
func TestStats_races(t *testing.T) {
|
||||||
var r uint32
|
var r uint32
|
||||||
idGen := func() (id uint32) { return atomic.LoadUint32(&r) }
|
idGen := func() (id uint32) { return atomic.LoadUint32(&r) }
|
||||||
|
@ -103,3 +85,86 @@ func TestStats_races(t *testing.T) {
|
||||||
finWG.Wait()
|
finWG.Wait()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestStatsCtx_FillCollectedStats_daily(t *testing.T) {
|
||||||
|
const (
|
||||||
|
daysCount = 10
|
||||||
|
|
||||||
|
timeUnits = "days"
|
||||||
|
)
|
||||||
|
|
||||||
|
s, err := New(Config{
|
||||||
|
ShouldCountClient: func([]string) bool { return true },
|
||||||
|
Filename: filepath.Join(t.TempDir(), "./stats.db"),
|
||||||
|
Limit: time.Hour,
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
testutil.CleanupAndRequireSuccess(t, s.Close)
|
||||||
|
|
||||||
|
sum := make([][]uint64, resultLast)
|
||||||
|
sum[RFiltered] = make([]uint64, daysCount)
|
||||||
|
sum[RSafeBrowsing] = make([]uint64, daysCount)
|
||||||
|
sum[RParental] = make([]uint64, daysCount)
|
||||||
|
|
||||||
|
total := make([]uint64, daysCount)
|
||||||
|
|
||||||
|
dailyData := []*unitDB{}
|
||||||
|
|
||||||
|
for i := 0; i < daysCount*24; i++ {
|
||||||
|
n := uint64(i)
|
||||||
|
nResult := make([]uint64, resultLast)
|
||||||
|
nResult[RFiltered] = n
|
||||||
|
nResult[RSafeBrowsing] = n
|
||||||
|
nResult[RParental] = n
|
||||||
|
|
||||||
|
day := i / 24
|
||||||
|
sum[RFiltered][day] += n
|
||||||
|
sum[RSafeBrowsing][day] += n
|
||||||
|
sum[RParental][day] += n
|
||||||
|
|
||||||
|
t := n * 3
|
||||||
|
|
||||||
|
total[day] += t
|
||||||
|
|
||||||
|
dailyData = append(dailyData, &unitDB{
|
||||||
|
NTotal: t,
|
||||||
|
NResult: nResult,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
data := &StatsResp{}
|
||||||
|
|
||||||
|
// In this way we will not skip first hours.
|
||||||
|
curID := uint32(daysCount * 24)
|
||||||
|
|
||||||
|
s.fillCollectedStats(data, dailyData, curID)
|
||||||
|
|
||||||
|
assert.Equal(t, timeUnits, data.TimeUnits)
|
||||||
|
assert.Equal(t, sum[RFiltered], data.BlockedFiltering)
|
||||||
|
assert.Equal(t, sum[RSafeBrowsing], data.ReplacedSafebrowsing)
|
||||||
|
assert.Equal(t, sum[RParental], data.ReplacedParental)
|
||||||
|
assert.Equal(t, total, data.DNSQueries)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStatsCtx_DataFromUnits_month(t *testing.T) {
|
||||||
|
const hoursInMonth = 720
|
||||||
|
|
||||||
|
s, err := New(Config{
|
||||||
|
ShouldCountClient: func([]string) bool { return true },
|
||||||
|
Filename: filepath.Join(t.TempDir(), "./stats.db"),
|
||||||
|
Limit: time.Hour,
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
testutil.CleanupAndRequireSuccess(t, s.Close)
|
||||||
|
|
||||||
|
units, curID := s.loadUnits(hoursInMonth)
|
||||||
|
require.Len(t, units, hoursInMonth)
|
||||||
|
|
||||||
|
var h uint32
|
||||||
|
for h = 1; h <= hoursInMonth; h++ {
|
||||||
|
data := s.dataFromUnits(units[:h], curID)
|
||||||
|
require.NotNil(t, data)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -30,13 +30,10 @@ const (
|
||||||
// the statistics unit.
|
// the statistics unit.
|
||||||
type UnitIDGenFunc func() (id uint32)
|
type UnitIDGenFunc func() (id uint32)
|
||||||
|
|
||||||
// TimeUnit is the unit of measuring time while aggregating the statistics.
|
// Supported values of [StatsResp.TimeUnits].
|
||||||
type TimeUnit int
|
|
||||||
|
|
||||||
// Supported TimeUnit values.
|
|
||||||
const (
|
const (
|
||||||
Hours TimeUnit = iota
|
timeUnitsHours = "hours"
|
||||||
Days
|
timeUnitsDays = "days"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Result is the resulting code of processing the DNS request.
|
// Result is the resulting code of processing the DNS request.
|
||||||
|
@ -368,42 +365,6 @@ func convertTopSlice(a []countPair) (m []map[string]uint64) {
|
||||||
return m
|
return m
|
||||||
}
|
}
|
||||||
|
|
||||||
// numsGetter is a signature for statsCollector argument.
|
|
||||||
type numsGetter func(u *unitDB) (num uint64)
|
|
||||||
|
|
||||||
// statsCollector collects statisctics for the given *unitDB slice by specified
|
|
||||||
// timeUnit using ng to retrieve data.
|
|
||||||
func statsCollector(units []*unitDB, firstID uint32, timeUnit TimeUnit, ng numsGetter) (nums []uint64) {
|
|
||||||
if timeUnit == Hours {
|
|
||||||
nums = make([]uint64, 0, len(units))
|
|
||||||
for _, u := range units {
|
|
||||||
nums = append(nums, ng(u))
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// Per time unit counters: 720 hours may span 31 days, so we
|
|
||||||
// skip data for the first day in this case.
|
|
||||||
// align_ceil(24)
|
|
||||||
firstDayID := (firstID + 24 - 1) / 24 * 24
|
|
||||||
|
|
||||||
var sum uint64
|
|
||||||
id := firstDayID
|
|
||||||
nextDayID := firstDayID + 24
|
|
||||||
for i := int(firstDayID - firstID); i != len(units); i++ {
|
|
||||||
sum += ng(units[i])
|
|
||||||
if id == nextDayID {
|
|
||||||
nums = append(nums, sum)
|
|
||||||
sum = 0
|
|
||||||
nextDayID += 24
|
|
||||||
}
|
|
||||||
id++
|
|
||||||
}
|
|
||||||
if id <= nextDayID {
|
|
||||||
nums = append(nums, sum)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nums
|
|
||||||
}
|
|
||||||
|
|
||||||
// pairsGetter is a signature for topsCollector argument.
|
// pairsGetter is a signature for topsCollector argument.
|
||||||
type pairsGetter func(u *unitDB) (pairs []countPair)
|
type pairsGetter func(u *unitDB) (pairs []countPair)
|
||||||
|
|
||||||
|
@ -442,9 +403,9 @@ func topsCollector(units []*unitDB, max int, ignored *stringutil.Set, pg pairsGe
|
||||||
//
|
//
|
||||||
// The total counters (DNS queries, blocked, etc.) are just the sum of data
|
// The total counters (DNS queries, blocked, etc.) are just the sum of data
|
||||||
// for all units.
|
// for all units.
|
||||||
func (s *StatsCtx) getData(limit uint32) (StatsResp, bool) {
|
func (s *StatsCtx) getData(limit uint32) (resp *StatsResp, ok bool) {
|
||||||
if limit == 0 {
|
if limit == 0 {
|
||||||
return StatsResp{
|
return &StatsResp{
|
||||||
TimeUnits: "days",
|
TimeUnits: "days",
|
||||||
|
|
||||||
TopBlocked: []topAddrs{},
|
TopBlocked: []topAddrs{},
|
||||||
|
@ -460,38 +421,19 @@ func (s *StatsCtx) getData(limit uint32) (StatsResp, bool) {
|
||||||
}, true
|
}, true
|
||||||
}
|
}
|
||||||
|
|
||||||
timeUnit := Hours
|
units, curID := s.loadUnits(limit)
|
||||||
if limit/24 > 7 {
|
|
||||||
timeUnit = Days
|
|
||||||
}
|
|
||||||
|
|
||||||
units, firstID := s.loadUnits(limit)
|
|
||||||
if units == nil {
|
if units == nil {
|
||||||
return StatsResp{}, false
|
return &StatsResp{}, false
|
||||||
}
|
}
|
||||||
|
|
||||||
dnsQueries := statsCollector(units, firstID, timeUnit, func(u *unitDB) (num uint64) { return u.NTotal })
|
return s.dataFromUnits(units, curID), true
|
||||||
if timeUnit != Hours && len(dnsQueries) != int(limit/24) {
|
|
||||||
log.Fatalf("len(dnsQueries) != limit: %d %d", len(dnsQueries), limit)
|
|
||||||
}
|
|
||||||
|
|
||||||
return s.dataFromUnits(units, dnsQueries, firstID, timeUnit), true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// dataFromUnits collects and returns the statistics data.
|
// dataFromUnits collects and returns the statistics data.
|
||||||
func (s *StatsCtx) dataFromUnits(
|
func (s *StatsCtx) dataFromUnits(units []*unitDB, curID uint32) (resp *StatsResp) {
|
||||||
units []*unitDB,
|
|
||||||
dnsQueries []uint64,
|
|
||||||
firstID uint32,
|
|
||||||
timeUnit TimeUnit,
|
|
||||||
) (resp StatsResp) {
|
|
||||||
topUpstreamsResponses, topUpstreamsAvgTime := topUpstreamsPairs(units)
|
topUpstreamsResponses, topUpstreamsAvgTime := topUpstreamsPairs(units)
|
||||||
|
|
||||||
data := StatsResp{
|
resp = &StatsResp{
|
||||||
DNSQueries: dnsQueries,
|
|
||||||
BlockedFiltering: statsCollector(units, firstID, timeUnit, func(u *unitDB) (num uint64) { return u.NResult[RFiltered] }),
|
|
||||||
ReplacedSafebrowsing: statsCollector(units, firstID, timeUnit, func(u *unitDB) (num uint64) { return u.NResult[RSafeBrowsing] }),
|
|
||||||
ReplacedParental: statsCollector(units, firstID, timeUnit, func(u *unitDB) (num uint64) { return u.NResult[RParental] }),
|
|
||||||
TopQueried: topsCollector(units, maxDomains, s.ignored, func(u *unitDB) (pairs []countPair) { return u.Domains }),
|
TopQueried: topsCollector(units, maxDomains, s.ignored, func(u *unitDB) (pairs []countPair) { return u.Domains }),
|
||||||
TopBlocked: topsCollector(units, maxDomains, s.ignored, func(u *unitDB) (pairs []countPair) { return u.BlockedDomains }),
|
TopBlocked: topsCollector(units, maxDomains, s.ignored, func(u *unitDB) (pairs []countPair) { return u.BlockedDomains }),
|
||||||
TopUpstreamsResponses: topUpstreamsResponses,
|
TopUpstreamsResponses: topUpstreamsResponses,
|
||||||
|
@ -499,6 +441,8 @@ func (s *StatsCtx) dataFromUnits(
|
||||||
TopClients: topsCollector(units, maxClients, nil, topClientPairs(s)),
|
TopClients: topsCollector(units, maxClients, nil, topClientPairs(s)),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.fillCollectedStats(resp, units, curID)
|
||||||
|
|
||||||
// Total counters:
|
// Total counters:
|
||||||
sum := unitDB{
|
sum := unitDB{
|
||||||
NResult: make([]uint64, resultLast),
|
NResult: make([]uint64, resultLast),
|
||||||
|
@ -516,22 +460,83 @@ func (s *StatsCtx) dataFromUnits(
|
||||||
sum.NResult[RParental] += u.NResult[RParental]
|
sum.NResult[RParental] += u.NResult[RParental]
|
||||||
}
|
}
|
||||||
|
|
||||||
data.NumDNSQueries = sum.NTotal
|
resp.NumDNSQueries = sum.NTotal
|
||||||
data.NumBlockedFiltering = sum.NResult[RFiltered]
|
resp.NumBlockedFiltering = sum.NResult[RFiltered]
|
||||||
data.NumReplacedSafebrowsing = sum.NResult[RSafeBrowsing]
|
resp.NumReplacedSafebrowsing = sum.NResult[RSafeBrowsing]
|
||||||
data.NumReplacedSafesearch = sum.NResult[RSafeSearch]
|
resp.NumReplacedSafesearch = sum.NResult[RSafeSearch]
|
||||||
data.NumReplacedParental = sum.NResult[RParental]
|
resp.NumReplacedParental = sum.NResult[RParental]
|
||||||
|
|
||||||
if timeN != 0 {
|
if timeN != 0 {
|
||||||
data.AvgProcessingTime = microsecondsToSeconds(float64(sum.TimeAvg / timeN))
|
resp.AvgProcessingTime = microsecondsToSeconds(float64(sum.TimeAvg / timeN))
|
||||||
}
|
}
|
||||||
|
|
||||||
data.TimeUnits = "hours"
|
return resp
|
||||||
if timeUnit == Days {
|
}
|
||||||
data.TimeUnits = "days"
|
|
||||||
|
// fillCollectedStats fills data with collected statistics.
|
||||||
|
func (s *StatsCtx) fillCollectedStats(data *StatsResp, units []*unitDB, curID uint32) {
|
||||||
|
size := len(units)
|
||||||
|
data.TimeUnits = timeUnitsHours
|
||||||
|
|
||||||
|
daysCount := size / 24
|
||||||
|
if daysCount > 7 {
|
||||||
|
size = daysCount
|
||||||
|
data.TimeUnits = timeUnitsDays
|
||||||
}
|
}
|
||||||
|
|
||||||
return data
|
data.DNSQueries = make([]uint64, size)
|
||||||
|
data.BlockedFiltering = make([]uint64, size)
|
||||||
|
data.ReplacedSafebrowsing = make([]uint64, size)
|
||||||
|
data.ReplacedParental = make([]uint64, size)
|
||||||
|
|
||||||
|
if data.TimeUnits == timeUnitsDays {
|
||||||
|
s.fillCollectedStatsDaily(data, units, curID, size)
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, u := range units {
|
||||||
|
data.DNSQueries[i] += u.NTotal
|
||||||
|
data.BlockedFiltering[i] += u.NResult[RFiltered]
|
||||||
|
data.ReplacedSafebrowsing[i] += u.NResult[RSafeBrowsing]
|
||||||
|
data.ReplacedParental[i] += u.NResult[RParental]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// fillCollectedStatsDaily fills data with collected daily statistics. units
|
||||||
|
// must contain data for the count of days.
|
||||||
|
func (s *StatsCtx) fillCollectedStatsDaily(
|
||||||
|
data *StatsResp,
|
||||||
|
units []*unitDB,
|
||||||
|
curHour uint32,
|
||||||
|
days int,
|
||||||
|
) {
|
||||||
|
// Per time unit counters: 720 hours may span 31 days, so we skip data for
|
||||||
|
// the first hours in this case. align_ceil(24)
|
||||||
|
hours := countHours(curHour, days)
|
||||||
|
units = units[len(units)-hours:]
|
||||||
|
|
||||||
|
for i := 0; i < len(units); i++ {
|
||||||
|
day := i / 24
|
||||||
|
u := units[i]
|
||||||
|
|
||||||
|
data.DNSQueries[day] += u.NTotal
|
||||||
|
data.BlockedFiltering[day] += u.NResult[RFiltered]
|
||||||
|
data.ReplacedSafebrowsing[day] += u.NResult[RSafeBrowsing]
|
||||||
|
data.ReplacedParental[day] += u.NResult[RParental]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// countHours returns the number of hours in the last days.
|
||||||
|
func countHours(curHour uint32, days int) (n int) {
|
||||||
|
hoursInCurDay := int(curHour % 24)
|
||||||
|
if hoursInCurDay == 0 {
|
||||||
|
hoursInCurDay = 24
|
||||||
|
}
|
||||||
|
|
||||||
|
hoursInRestDays := (days - 1) * 24
|
||||||
|
|
||||||
|
return hoursInRestDays + hoursInCurDay
|
||||||
}
|
}
|
||||||
|
|
||||||
func topClientPairs(s *StatsCtx) (pg pairsGetter) {
|
func topClientPairs(s *StatsCtx) (pg pairsGetter) {
|
||||||
|
|
Loading…
Reference in New Issue