{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE UndecidableInstances #-}
module Ouroboros.Consensus.Util.LeakyBucket (
Config (..)
, Handlers (..)
, State (..)
, atomicallyWithMonotonicTime
, diffTimeToSecondsRational
, dummyConfig
, evalAgainstBucket
, execAgainstBucket
, execAgainstBucket'
, fill'
, microsecondsPerSecond
, picosecondsPerSecond
, runAgainstBucket
, secondsRationalToDiffTime
, setPaused'
, updateConfig'
) where
import Control.Exception (assert)
import Control.Monad (forever, void, when)
import qualified Control.Monad.Class.MonadSTM.Internal as TVar
import Control.Monad.Class.MonadTimer (MonadTimer, registerDelay)
import Control.Monad.Class.MonadTimer.SI (diffTimeToMicrosecondsAsInt)
import Data.Ratio ((%))
import Data.Time.Clock (diffTimeToPicoseconds)
import GHC.Generics (Generic)
import Ouroboros.Consensus.Util.IOLike hiding (killThread)
import Ouroboros.Consensus.Util.STM (blockUntilChanged)
import Prelude hiding (init)
-- | Configuration of a leaky bucket.
data Config m = Config
  { -- | Upper bound of the bucket level. A bucket created by 'init' starts at
    -- this level, and every snapshot clamps the level into @[0, capacity]@.
    capacity :: !Rational,
    -- | Amount removed from the level per second of elapsed time while the
    -- bucket is not paused.
    rate :: !Rational,
    -- | What to do when a fill would exceed 'capacity': if 'True', the level
    -- is capped at 'capacity'; if 'False', the fill is discarded and only
    -- the leak is applied.
    fillOnOverflow :: !Bool,
    -- | Action run by the leaking thread when it finds the bucket empty.
    onEmpty :: !(m ())
  }
  deriving (Generic)

deriving instance NoThunks (m ()) => NoThunks (Config m)
-- | A configuration with zero capacity, zero rate and an 'onEmpty' action
-- that does nothing. Because 'runAgainstBucket' only lets the leaking thread
-- proceed when the rate is strictly positive, a bucket running with this
-- configuration stays idle until its configuration is updated.
dummyConfig :: (Applicative m) => Config m
dummyConfig =
  Config
    { capacity = 0,
      rate = 0,
      fillOnOverflow = True,
      onEmpty = pure ()
    }
-- | State of a leaky bucket: a level together with the time at which that
-- level was computed, plus the configuration in force.
data State m = State
  { -- | Level of the bucket as of 'time'.
    level :: !Rational,
    -- | Time of the last snapshot, i.e. the instant 'level' refers to.
    time :: !Time,
    -- | While 'True', snapshots do not leak anything from the bucket.
    paused :: !Bool,
    -- | Counter starting at 0 and incremented on every configuration update;
    -- the leaking thread watches it to notice configuration changes.
    configGeneration :: !Int,
    -- | Configuration currently in force.
    config :: !(Config m)
  }
  deriving (Generic)

deriving instance (NoThunks (m ())) => NoThunks (State m)
-- | A bucket is a strict transactional variable holding its current 'State'.
type Bucket m = StrictTVar m (State m)
-- | Outcome of adding an amount to a bucket.
data FillResult
  = -- | The leaked level plus the added amount exceeded the capacity.
    Overflew
  | -- | The leaked level plus the added amount fit within the capacity.
    DidNotOverflow
-- | Operations handed to the action run against a bucket. Each one is an
-- 'STM' computation parameterised by the current time; the primed variants
-- ('fill'', 'setPaused'', 'updateConfig'') read the monotonic clock and run
-- the transaction for the caller.
data Handlers m = Handlers
  { -- | Add the given amount to the bucket as of the given time and report
    -- whether doing so overflew the capacity.
    fill ::
      !( Rational ->
         Time ->
         STM m FillResult
       ),
    -- | Pause or resume the bucket as of the given time.
    setPaused ::
      !( Bool ->
         Time ->
         STM m ()
       ),
    -- | Transform the current level and configuration, as of the given
    -- time, with the given function.
    updateConfig ::
      !( ((Rational, Config m) -> (Rational, Config m)) ->
         Time ->
         STM m ()
       )
  }
-- | Variant of 'fill' that reads the monotonic clock itself and runs the
-- resulting transaction.
fill' ::
  ( MonadMonotonicTime m,
    MonadSTM m
  ) =>
  Handlers m ->
  Rational ->
  m FillResult
fill' h r = atomicallyWithMonotonicTime $ fill h r
-- | Variant of 'setPaused' that reads the monotonic clock itself and runs the
-- resulting transaction.
setPaused' ::
  ( MonadMonotonicTime m,
    MonadSTM m
  ) =>
  Handlers m ->
  Bool ->
  m ()
setPaused' h p = atomicallyWithMonotonicTime $ setPaused h p
-- | Variant of 'updateConfig' that reads the monotonic clock itself and runs
-- the resulting transaction.
updateConfig' ::
  ( MonadMonotonicTime m,
    MonadSTM m
  ) =>
  Handlers m ->
  ((Rational, Config m) -> (Rational, Config m)) ->
  m ()
updateConfig' h f = atomicallyWithMonotonicTime $ updateConfig h f
-- | Run an action against a bucket with the given configuration and return
-- only the action's result, discarding the final bucket 'State'. See
-- 'runAgainstBucket' for the details.
execAgainstBucket ::
  ( MonadDelay m,
    MonadAsync m,
    MonadFork m,
    MonadMask m,
    MonadTimer m,
    NoThunks (m ())
  ) =>
  Config m ->
  (Handlers m -> m a) ->
  m a
execAgainstBucket config action = snd <$> runAgainstBucket config action
-- | Same as 'execAgainstBucket', but starting from 'dummyConfig'. The action
-- can install a real configuration later through 'updateConfig'.
execAgainstBucket' ::
  ( MonadDelay m,
    MonadAsync m,
    MonadFork m,
    MonadMask m,
    MonadTimer m,
    NoThunks (m ())
  ) =>
  (Handlers m -> m a) ->
  m a
execAgainstBucket' action =
  execAgainstBucket dummyConfig action
-- | Run an action against a bucket with the given configuration and return
-- only the 'State' of the bucket once the action has finished, discarding
-- the action's result. See 'runAgainstBucket' for the details.
evalAgainstBucket ::
  ( MonadDelay m,
    MonadAsync m,
    MonadFork m,
    MonadMask m,
    MonadTimer m,
    NoThunks (m ())
  ) =>
  Config m ->
  (Handlers m -> m a) ->
  m (State m)
evalAgainstBucket config action = fst <$> runAgainstBucket config action
-- | Create a bucket from the given configuration, run 'leak' in a thread
-- scoped by 'withAsync', and run the action with 'Handlers' wired to that
-- bucket. Returns the bucket state snapshotted right after the action
-- finished, paired with the action's result.
--
-- The leaking thread is driven by a 'StrictTMVar' holding a "leaking period
-- version": it is filled only while the configured rate is strictly positive
-- and receives a new value on every configuration update, which lets 'leak'
-- block while the rate is not positive and restart its wait on changes.
runAgainstBucket ::
  forall m a.
  ( MonadDelay m,
    MonadAsync m,
    MonadFork m,
    MonadMask m,
    MonadTimer m,
    NoThunks (m ())
  ) =>
  Config m ->
  (Handlers m -> m a) ->
  m (State m, a)
runAgainstBucket config action = do
  leakingPeriodVersionTMVar <- atomically newEmptyTMVar
  -- The leaking thread forwards exceptions raised by 'onEmpty' to this thread.
  tid <- myThreadId
  bucket <- init config
  withAsync (leak (readTMVar leakingPeriodVersionTMVar) tid bucket) $ \_ -> do
    -- Let the leaking thread run if the initial rate is positive.
    atomicallyWithMonotonicTime $ maybeStartThread Nothing leakingPeriodVersionTMVar bucket
    result <-
      action $
        Handlers
          { fill = \r t -> (snd <$>) $ snapshotFill bucket r t,
            setPaused = setPaused bucket,
            updateConfig = updateConfig leakingPeriodVersionTMVar bucket
          }
    state <- atomicallyWithMonotonicTime $ snapshot bucket
    pure (state, result)
  where
    -- Fill the (expected to be empty) version variable when the current rate
    -- is strictly positive: with 0 when no previous version is given,
    -- otherwise with the previous version plus one.
    maybeStartThread :: Maybe Int -> StrictTMVar m Int -> Bucket m -> Time -> STM m ()
    maybeStartThread mLeakingPeriodVersion leakingPeriodVersionTMVar bucket time = do
      State {config = Config {rate}} <- snapshot bucket time
      when (rate > 0) $
        void $
          tryPutTMVar leakingPeriodVersionTMVar $
            maybe 0 (+ 1) mLeakingPeriodVersion

    -- Bring the bucket up to date at the given time, then set its paused flag.
    setPaused :: Bucket m -> Bool -> Time -> STM m ()
    setPaused bucket paused time = do
      newState <- snapshot bucket time
      writeTVar bucket newState {paused}

    -- Bring the bucket up to date, apply the user's function to the level and
    -- configuration, clamp the resulting level into @[0, new capacity]@, bump
    -- the configuration generation and refresh the leaking period version.
    updateConfig ::
      StrictTMVar m Int ->
      Bucket m ->
      ((Rational, Config m) -> (Rational, Config m)) ->
      Time ->
      STM m ()
    updateConfig leakingPeriodVersionTMVar bucket f time = do
      State
        { level = oldLevel,
          paused,
          configGeneration = oldConfigGeneration,
          config = oldConfig
        } <-
        snapshot bucket time
      let (newLevel, newConfig) = f (oldLevel, oldConfig)
          Config {capacity = newCapacity} = newConfig
          newLevel' = clamp (0, newCapacity) newLevel
      writeTVar bucket $
        State
          { level = newLevel',
            time,
            paused,
            configGeneration = oldConfigGeneration + 1,
            config = newConfig
          }
      -- Empty the version variable first so that 'maybeStartThread' can
      -- refill it (or leave it empty when the new rate is not positive).
      mLeakingPeriodVersion <- tryTakeTMVar leakingPeriodVersionTMVar
      maybeStartThread mLeakingPeriodVersion leakingPeriodVersionTMVar bucket time
-- | Create a bucket that is full (its level equals the configured capacity),
-- not paused, at configuration generation 0, and timestamped with the
-- current monotonic time.
init ::
  (MonadMonotonicTime m, MonadSTM m, NoThunks (m ())) =>
  Config m ->
  m (Bucket m)
init config@Config {capacity} = do
  time <- getMonotonicTime
  newTVarIO $
    State
      { time,
        level = capacity,
        paused = False,
        configGeneration = 0,
        config = config
      }
-- | Body of the leaking thread; loops forever.
--
-- Each iteration blocks on the given 'STM' computation to obtain the current
-- leaking period version, snapshots the bucket at the current time, and then:
--
-- * if the bucket is empty (or would be within less than a microsecond),
--   runs 'onEmpty', forwarding any exception it throws to the given thread,
--   and then waits for the configuration generation to change;
--
-- * otherwise sleeps until the bucket is expected to be empty, or until the
--   leaking period version changes, whichever happens first.
leak ::
  ( MonadDelay m,
    MonadCatch m,
    MonadFork m,
    MonadAsync m,
    MonadTimer m
  ) =>
  -- | Computation returning the current leaking period version; it is
  -- expected to block while no version is available.
  STM m Int ->
  -- | Thread to which exceptions thrown by 'onEmpty' are forwarded.
  ThreadId m ->
  Bucket m ->
  m ()
leak leakingPeriodVersionSTM actionThreadId bucket = forever $ do
  -- Block until a leaking period version is available.
  leakingPeriodVersion <- atomically leakingPeriodVersionSTM
  -- The snapshot is a separate transaction from the blocking read above, so
  -- the time handed to it is read right before it runs.
  State {level, configGeneration = oldConfigGeneration, config = Config {rate, onEmpty}} <-
    atomicallyWithMonotonicTime $ snapshot bucket
  let waitForConfigChange =
        void $
          atomically $
            blockUntilChanged configGeneration oldConfigGeneration $
              readTVar bucket
      -- Both bindings are lazy: they are only forced once @rate > 0@ holds.
      timeToWait = secondsRationalToDiffTime (level / rate)
      timeToWaitMicroseconds = diffTimeToMicrosecondsAsInt timeToWait
  if rate <= 0
    then
      -- The configuration can change between the two transactions above. With
      -- a zero rate, @level / rate@ would throw and kill this thread; with a
      -- negative rate it would make a non-empty bucket look empty. There is
      -- nothing to leak in either case, so wait for the next configuration.
      waitForConfigChange
    else
      -- The second disjunct covers a positive level so small that the wait
      -- rounds down to zero microseconds.
      if level <= 0 || timeToWaitMicroseconds <= 0
        then do
          handle (\(e :: SomeException) -> throwTo actionThreadId e) onEmpty
          -- 'onEmpty' has run; nothing more to do until the configuration
          -- changes.
          waitForConfigChange
        else
          -- The delay is only registered with a strictly positive duration.
          assert (timeToWaitMicroseconds > 0) $ do
            varTimeout <- registerDelay timeToWaitMicroseconds
            atomically $
              (check =<< TVar.readTVar varTimeout)
                `orElse` (void $ blockUntilChanged id leakingPeriodVersion leakingPeriodVersionSTM)
-- | Bring the bucket up to date at the given time (applying the leak since
-- its last snapshot, without adding anything), store the result in the
-- bucket and return it.
snapshot ::
  ( MonadSTM m
  ) =>
  Bucket m ->
  Time ->
  STM m (State m)
snapshot bucket newTime = fst <$> snapshotFill bucket 0 newTime
-- | Bring the bucket up to date at the given time and add the given amount
-- to it. The new state is written to the bucket and returned together with
-- whether the addition overflew the capacity.
--
-- The level first leaks by @elapsed seconds * rate@ (nothing leaks while
-- paused) and is clamped into @[0, capacity]@. The amount is then added; if
-- that exceeds the capacity, the result is 'Overflew' and the addition is
-- kept (capped at the capacity) only when 'fillOnOverflow' is set.
snapshotFill ::
  ( MonadSTM m
  ) =>
  Bucket m ->
  Rational ->
  Time ->
  STM m (State m, FillResult)
snapshotFill bucket toAdd newTime = do
  State {level, time, paused, configGeneration, config = config} <- readTVar bucket
  let Config {rate, capacity, fillOnOverflow} = config
      elapsed = diffTime newTime time
      leaked = if paused then 0 else (diffTimeToSecondsRational elapsed * rate)
      levelLeaked = clamp (0, capacity) (level - leaked)
      levelFilled = clamp (0, capacity) (levelLeaked + toAdd)
      overflew = levelLeaked + toAdd > capacity
      newLevel = if not overflew || fillOnOverflow then levelFilled else levelLeaked
      -- Forced before being written to the bucket.
      !newState = State {time = newTime, level = newLevel, paused, configGeneration, config}
  writeTVar bucket newState
  pure (newState, if overflew then Overflew else DidNotOverflow)
-- | Convert a 'DiffTime' to a 'Rational' number of seconds, by dividing its
-- picosecond count by 'picosecondsPerSecond'.
diffTimeToSecondsRational :: DiffTime -> Rational
diffTimeToSecondsRational = (% picosecondsPerSecond) . diffTimeToPicoseconds
-- | Convert a 'Rational' number of seconds to a 'DiffTime' (via 'realToFrac').
secondsRationalToDiffTime :: Rational -> DiffTime
secondsRationalToDiffTime = realToFrac
-- | Read the monotonic clock, then atomically run the transaction obtained
-- by passing that time to the given function. The clock is read once, before
-- the transaction starts.
atomicallyWithMonotonicTime ::
  ( MonadMonotonicTime m,
    MonadSTM m
  ) =>
  (Time -> STM m b) ->
  m b
atomicallyWithMonotonicTime f =
  atomically . f =<< getMonotonicTime
-- | Restrict a value to the interval given as @(low, high)@: values below
-- @low@ become @low@ and values above @high@ become @high@. The upper bound
-- is applied last, so it wins if @low > high@.
clamp :: Ord a => (a, a) -> a -> a
clamp (low, high) x = min high (max low x)
-- | Number of microseconds in a second.
microsecondsPerSecond :: Integer
microsecondsPerSecond = 1_000_000
-- | Number of picoseconds in a second.
picosecondsPerSecond :: Integer
picosecondsPerSecond = 1_000_000_000_000