@ -4,9 +4,12 @@
%if style == newcode
\begin{code}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-@ embed GHC.Natural.Natural as int @-}
module Cardano.BM.Output.Aggregation
(
Aggregation
@ -28,7 +31,7 @@ import qualified Data.HashMap.Strict as HM
import Data.Text (Text, pack)
import qualified Data.Text.IO as TIO
import Data.Time.Calendar (toModifiedJulianDay)
import Data.Time.Clock (UTCTime (..), getCurrentTime)
import Data.Time.Clock (UTCTime (..), diffTimeToPicoseconds, getCurrentTime)
import Data.Word (Word64)
import GHC.Clock (getMonotonicTimeNSec)
import System.IO (stderr)
@ -36,7 +39,7 @@ import System.IO (stderr)
import Cardano.BM.Configuration.Model (Configuration, getAggregatedKind)
import Cardano.BM.Data.Aggregated (Aggregated (..), BaseStats (..),
EWMA (..), Measurable (..), Stats (..), getDouble,
singletonStats)
getInteger, singletonStats, subtractMeasurable )
import Cardano.BM.Data.AggregatedKind (AggregatedKind (..))
import Cardano.BM.Data.Backend
import Cardano.BM.Data.Counter (Counter (..), CounterState (..),
@ -78,7 +81,7 @@ type Timestamp = Word64
data AggregatedExpanded = AggregatedExpanded
{ aeAggregated :: !Aggregated
, aeResetAfter :: !(Maybe Int )
, aeResetAfter :: !(Maybe Word64 )
, aeLastSent :: {-# UNPACK #-} !Timestamp
}
@ -107,7 +110,7 @@ instance IsEffectuator Aggregation a where
instance IsBackend Aggregation a where
typeof _ = AggregationBK
realize _ = error "Aggregation cannot be instantiated by 'realize'"
realize _ = fail "Aggregation cannot be instantiated by 'realize'"
realizefrom config trace _ = do
aggref <- newEmptyMVar
@ -158,11 +161,12 @@ spawnDispatcher conf aggMap aggregationQueue basetrace = do
Async.async $ qProc trace countersMVar aggMap
where
{-@ lazy qProc @-}
qProc trace counters aggregatedMap = do
maybeItem <- atomically $ TBQ.readTBQueue aggregationQueue
case maybeItem of
Just (lo@(LogObject logname lm _)) -> do
(updatedMap, aggregations) <- update lo aggregatedMap
(updatedMap, aggregations) <- update lo aggregatedMap trace
unless (null aggregations) $
sendAggregated trace (LogObject logname lm (AggregatedMessage aggregations))
-- increase the counter for the specific severity and message type
@ -170,81 +174,107 @@ spawnDispatcher conf aggMap aggregationQueue basetrace = do
qProc trace counters updatedMap
Nothing -> return ()
createNupdate :: Text -> Measurable -> LOMeta -> AggregationMap -> IO (Either Text Aggregated)
createNupdate name value lme agmap = do
case HM.lookup name agmap of
Nothing -> do
-- if Aggregated does not exist; initialize it.
aggregatedKind <- getAggregatedKind conf name
case aggregatedKind of
StatsAK -> return $ singletonStats value
StatsAK -> return $ Right $ singletonStats value
EwmaAK aEWMA -> do
let initEWMA = EmptyEWMA aEWMA
return $ AggregatedEWMA $ ewma initEWMA value
return $ AggregatedEWMA < $> ewma initEWMA value
Just a -> return $ updateAggregation value (aeAggregated a) lme (aeResetAfter a)
update :: LogObject a
-> AggregationMap
-> Trace.Trace IO a
-> IO (AggregationMap, [(Text, Aggregated)])
update (LogObject logname lme (LogValue iname value)) agmap = do
update (LogObject logname lme (LogValue iname value)) agmap trace = do
let fullname = logname <> "." <> iname
aggregated <- createNupdate fullname value lme agmap
now <- getMonotonicTimeNSec
let aggregatedX = AggregatedExpanded {
aeAggregated = aggregated
, aeResetAfter = Nothing
, aeLastSent = now
}
namedAggregated = [(iname, aeAggregated aggregatedX)]
updatedMap = HM.alter (const $ Just $ aggregatedX) fullname agmap
return (updatedMap, namedAggregated)
update (LogObject logname lme (ObserveDiff counterState)) agmap =
updateCounters (csCounters counterState) lme (logname, "diff") agmap []
update (LogObject logname lme (ObserveOpen counterState)) agmap =
updateCounters (csCounters counterState) lme (logname, "open") agmap []
update (LogObject logname lme (ObserveClose counterState)) agmap =
updateCounters (csCounters counterState) lme (logname, "close") agmap []
update (LogObject logname lme (LogMessage _)) agmap = do
eitherAggregated <- createNupdate fullname value lme agmap
case eitherAggregated of
Right aggregated -> do
now <- getMonotonicTimeNSec
let aggregatedX = AggregatedExpanded {
aeAggregated = aggregated
, aeResetAfter = Nothing
, aeLastSent = now
}
namedAggregated = [(iname, aeAggregated aggregatedX)]
updatedMap = HM.alter (const $ Just $ aggregatedX) fullname agmap
return (updatedMap, namedAggregated)
Left w -> do
trace' <- Trace.appendName "update" trace
Trace.traceNamedObject trace' =<<
(,) <$> liftIO (mkLOMeta Warning Public)
<*> pure (LogError w)
return (agmap, [])
update (LogObject logname lme (ObserveDiff counterState)) agmap trace =
updateCounters (csCounters counterState) lme (logname, "diff") agmap [] trace
update (LogObject logname lme (ObserveOpen counterState)) agmap trace =
updateCounters (csCounters counterState) lme (logname, "open") agmap [] trace
update (LogObject logname lme (ObserveClose counterState)) agmap trace =
updateCounters (csCounters counterState) lme (logname, "close") agmap [] trace
update (LogObject logname lme (LogMessage _)) agmap trace = do
let iname = pack $ show (severity lme)
let fullname = logname <> "." <> iname
aggregated <- createNupdate fullname (PureI 0) lme agmap
now <- getMonotonicTimeNSec
let aggregatedX = AggregatedExpanded {
aeAggregated = aggregated
, aeResetAfter = Nothing
, aeLastSent = now
}
namedAggregated = [(iname, aeAggregated aggregatedX)]
updatedMap = HM.alter (const $ Just $ aggregatedX) fullname agmap
return (updatedMap, namedAggregated)
eitherAggregated <- createNupdate fullname (PureI 0) lme agmap
case eitherAggregated of
Right aggregated -> do
now <- getMonotonicTimeNSec
let aggregatedX = AggregatedExpanded {
aeAggregated = aggregated
, aeResetAfter = Nothing
, aeLastSent = now
}
namedAggregated = [(iname, aeAggregated aggregatedX)]
updatedMap = HM.alter (const $ Just $ aggregatedX) fullname agmap
return (updatedMap, namedAggregated)
Left w -> do
trace' <- Trace.appendName "update" trace
Trace.traceNamedObject trace' =<<
(,) <$> liftIO (mkLOMeta Warning Public)
<*> pure (LogError w)
return (agmap, [])
-- everything else
update _ agmap = return (agmap, [])
update _ agmap _ = return (agmap, [])
updateCounters :: [Counter]
-> LOMeta
-> (LoggerName,LoggerName)
-> AggregationMap
-> [(Text, Aggregated)]
-> Trace.Trace IO a
-> IO (AggregationMap, [(Text, Aggregated)])
updateCounters [] _ _ aggrMap aggs = return $ (aggrMap, aggs)
updateCounters (counter : cs) lme (logname, msgname) aggrMap aggs = do
updateCounters [] _ _ aggrMap aggs _ = return $ (aggrMap, aggs)
updateCounters (counter : cs) lme (logname, msgname) aggrMap aggs trace = do
let name = cName counter
subname = msgname <> "." <> (nameCounter counter) <> "." <> name
fullname = logname <> "." <> subname
value = cValue counter
aggregated <- createNupdate fullname value lme aggrMap
now <- getMonotonicTimeNSec
let aggregatedX = AggregatedExpanded {
aeAggregated = aggregated
, aeResetAfter = Nothing
, aeLastSent = now
}
namedAggregated = (subname, aggregated)
updatedMap = HM.alter (const $ Just $ aggregatedX) fullname aggrMap
updateCounters cs lme (logname, msgname) updatedMap (namedAggregated : aggs)
eitherAggregated <- createNupdate fullname value lme aggrMap
case eitherAggregated of
Right aggregated -> do
now <- getMonotonicTimeNSec
let aggregatedX = AggregatedExpanded {
aeAggregated = aggregated
, aeResetAfter = Nothing
, aeLastSent = now
}
namedAggregated = (subname, aggregated)
updatedMap = HM.alter (const $ Just $ aggregatedX) fullname aggrMap
updateCounters cs lme (logname, msgname) updatedMap (namedAggregated : aggs) trace
Left w -> do
trace' <- Trace.appendName "updateCounters" trace
Trace.traceNamedObject trace' =<<
(,) <$> liftIO (mkLOMeta Warning Public)
<*> pure (LogError w)
updateCounters cs lme (logname, msgname) aggrMap aggs trace
sendAggregated :: Trace.Trace IO a -> LogObject a -> IO ()
sendAggregated trace (LogObject logname meta v@(AggregatedMessage _)) = do
@ -263,50 +293,62 @@ We use Welford's online algorithm to update the estimation of mean and variance
(see \url{https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_Online_algorithm})
\begin{code}
updateAggregation :: Measurable -> Aggregated -> LOMeta -> Maybe Int -> Aggregated
updateAggregation :: Measurable -> Aggregated -> LOMeta -> Maybe Word64 -> Either Text Aggregated
updateAggregation v (AggregatedStats s) lme resetAfter =
let count = fcount (fbasic s)
reset = maybe False (count >=) resetAfter
in
if reset
then
singletonStats v
Right $ singletonStats v
else
AggregatedStats $! Stats { flast = v
, fold = mkTimestamp
, fbasic = updateBaseStats (count >= 1) v (fbasic s)
, fdelta = updateBaseStats (count >= 2) (v - flast s) (fdelta s)
, ftimed = updateBaseStats (count >= 2) (mkTimestamp - fold s) (ftimed s)
}
Right $ AggregatedStats $! Stats { flast = v
, fold = mkTimestamp
, fbasic = updateBaseStats 1 v (fbasic s)
, fdelta = updateBaseStats 2 deltav (fdelta s)
, ftimed = updateBaseStats 2 timediff (ftimed s)
}
where
deltav = subtractMeasurable v (flast s)
mkTimestamp = utc2ns (tstamp lme)
timediff = Nanoseconds $ fromInteger $ (getInteger mkTimestamp) - (getInteger $ fold s)
utc2ns (UTCTime days secs) =
let yearsecs :: Rational
yearsecs = 365 * 24 * 3600
rdays,rsecs :: Rational
rdays = toRational $ toModifiedJulianDay days
rsecs = toRational secs
s2ns = 1000000000
let daysecs = 24 * 3600
rdays,rsecs :: Integer
rdays = toModifiedJulianDay days
rsecs = diffTimeToPicoseconds secs `div` 1000
s2ns = 1000*1000*1000
ns = rsecs + s2ns * rdays * daysecs
in
Nanoseconds $ round $ (fromRational $ s2ns * rsecs + rdays * yearsecs :: Double)
updateAggregation v (AggregatedEWMA e) _ _ = AggregatedEWMA $! ewma e v
updateBaseStats :: Bool -> Measurable -> BaseStats -> BaseStats
updateBaseStats False _ s = s {fcount = fcount s + 1}
updateBaseStats True v s =
let newcount = fcount s + 1
newvalue = getDouble v
delta = newvalue - fsum_A s
dincr = (delta / fromIntegral newcount)
delta2 = newvalue - fsum_A s - dincr
Nanoseconds $ fromInteger ns
updateAggregation v (AggregatedEWMA e) _ _ =
let !eitherAvg = ewma e v
in
BaseStats { fmin = min (fmin s) v
, fmax = max v (fmax s)
, fcount = newcount
, fsum_A = fsum_A s + dincr
, fsum_B = fsum_B s + (delta * delta2)
}
AggregatedEWMA <$> eitherAvg
updateBaseStats :: Word64 -> Measurable -> BaseStats -> BaseStats
updateBaseStats startAt v s =
let newcount = fcount s + 1 in
if (startAt > newcount)
then s {fcount = fcount s + 1}
else
let newcountRel = newcount - startAt + 1
newvalue = getDouble v
delta = newvalue - fsum_A s
dincr = (delta / fromIntegral newcountRel)
delta2 = newvalue - fsum_A s - dincr
(minim, maxim) =
if startAt == newcount
then (v, v)
else (min v (fmin s), max v (fmax s))
in
BaseStats { fmin = minim
, fmax = maxim
, fcount = newcount
, fsum_A = fsum_A s + dincr
, fsum_B = fsum_B s + (delta * delta2)
}
\end{code}
@ -325,18 +367,18 @@ $$
The pattern matching below ensures that the |EWMA| will start with the first value passed in,
and will not change type, once determined.
\begin{code}
ewma :: EWMA -> Measurable -> EWMA
ewma (EmptyEWMA a) v = EWMA a v
ewma :: EWMA -> Measurable -> Either Text E WMA
ewma (EmptyEWMA a) v = Right $ EWMA a v
ewma (EWMA a s@(Microseconds _)) y@(Microseconds _) =
EWMA a $ Microseconds $ round $ a * (getDouble y) + (1 - a) * (getDouble s)
Right $ EWMA a $ Microseconds $ round $ a * (getDouble y) + (1 - a) * (getDouble s)
ewma (EWMA a s@(Seconds _)) y@(Seconds _) =
EWMA a $ Seconds $ round $ a * (getDouble y) + (1 - a) * (getDouble s)
Right $ EWMA a $ Seconds $ round $ a * (getDouble y) + (1 - a) * (getDouble s)
ewma (EWMA a s@(Bytes _)) y@(Bytes _) =
EWMA a $ Bytes $ round $ a * (getDouble y) + (1 - a) * (getDouble s)
Right $ EWMA a $ Bytes $ round $ a * (getDouble y) + (1 - a) * (getDouble s)
ewma (EWMA a (PureI s)) (PureI y) =
EWMA a $ PureI $ round $ a * (fromInteger y) + (1 - a) * (fromInteger s)
Right $ EWMA a $ PureI $ round $ a * (fromInteger y) + (1 - a) * (fromInteger s)
ewma (EWMA a (PureD s)) (PureD y) =
EWMA a $ PureD $ a * y + (1 - a) * s
ewma _ _ = error "Cannot average on values of different type "
Right $ EWMA a $ PureD $ a * y + (1 - a) * s
ewma _ _ = Left "EWMA: Cannot compute average on values of different types "
\end{code}