{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Ouroboros.Consensus.Mempool.Init (
openMempool
, openMempoolWithoutSyncThread
) where
import Control.Monad (void)
import Control.ResourceRegistry
import Control.Tracer
import Data.Functor.Contravariant ((>$<))
import Ouroboros.Consensus.Block
import Ouroboros.Consensus.HeaderValidation
import Ouroboros.Consensus.Ledger.Abstract
import Ouroboros.Consensus.Ledger.SupportsMempool
import Ouroboros.Consensus.Mempool.API
import Ouroboros.Consensus.Mempool.Capacity
import Ouroboros.Consensus.Mempool.Impl.Common
import Ouroboros.Consensus.Mempool.Query
import Ouroboros.Consensus.Mempool.Update
import Ouroboros.Consensus.Util.Enclose
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Consensus.Util.STM (Watcher (..), forkLinkedWatcher)
openMempool ::
( IOLike m
, LedgerSupportsMempool blk
, HasTxId (GenTx blk)
, ValidateEnvelope blk
)
=> ResourceRegistry m
-> LedgerInterface m blk
-> LedgerConfig blk
-> MempoolCapacityBytesOverride
-> Tracer m (TraceEventMempool blk)
-> m (Mempool m blk)
openMempool :: forall (m :: * -> *) blk.
(IOLike m, LedgerSupportsMempool blk, HasTxId (GenTx blk),
ValidateEnvelope blk) =>
ResourceRegistry m
-> LedgerInterface m blk
-> LedgerConfig blk
-> MempoolCapacityBytesOverride
-> Tracer m (TraceEventMempool blk)
-> m (Mempool m blk)
openMempool ResourceRegistry m
registry LedgerInterface m blk
ledger LedgerConfig blk
cfg MempoolCapacityBytesOverride
capacityOverride Tracer m (TraceEventMempool blk)
tracer = do
MempoolEnv m blk
env <- LedgerInterface m blk
-> LedgerConfig blk
-> MempoolCapacityBytesOverride
-> Tracer m (TraceEventMempool blk)
-> m (MempoolEnv m blk)
forall (m :: * -> *) blk.
(IOLike m, NoThunks (GenTxId blk), LedgerSupportsMempool blk,
ValidateEnvelope blk) =>
LedgerInterface m blk
-> LedgerConfig blk
-> MempoolCapacityBytesOverride
-> Tracer m (TraceEventMempool blk)
-> m (MempoolEnv m blk)
initMempoolEnv LedgerInterface m blk
ledger LedgerConfig blk
cfg MempoolCapacityBytesOverride
capacityOverride Tracer m (TraceEventMempool blk)
tracer
ResourceRegistry m -> MempoolEnv m blk -> m ()
forall (m :: * -> *) blk.
(IOLike m, LedgerSupportsMempool blk, HasTxId (GenTx blk),
ValidateEnvelope blk) =>
ResourceRegistry m -> MempoolEnv m blk -> m ()
forkSyncStateOnTipPointChange ResourceRegistry m
registry MempoolEnv m blk
env
Mempool m blk -> m (Mempool m blk)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Mempool m blk -> m (Mempool m blk))
-> Mempool m blk -> m (Mempool m blk)
forall a b. (a -> b) -> a -> b
$ MempoolEnv m blk -> Mempool m blk
forall (m :: * -> *) blk.
(IOLike m, LedgerSupportsMempool blk, HasTxId (GenTx blk),
ValidateEnvelope blk) =>
MempoolEnv m blk -> Mempool m blk
mkMempool MempoolEnv m blk
env
forkSyncStateOnTipPointChange :: forall m blk.
( IOLike m
, LedgerSupportsMempool blk
, HasTxId (GenTx blk)
, ValidateEnvelope blk
)
=> ResourceRegistry m
-> MempoolEnv m blk
-> m ()
forkSyncStateOnTipPointChange :: forall (m :: * -> *) blk.
(IOLike m, LedgerSupportsMempool blk, HasTxId (GenTx blk),
ValidateEnvelope blk) =>
ResourceRegistry m -> MempoolEnv m blk -> m ()
forkSyncStateOnTipPointChange ResourceRegistry m
registry MempoolEnv m blk
menv =
m (Thread m Void) -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m (Thread m Void) -> m ()) -> m (Thread m Void) -> m ()
forall a b. (a -> b) -> a -> b
$ ResourceRegistry m
-> String -> Watcher m (Point blk) (Point blk) -> m (Thread m Void)
forall (m :: * -> *) a fp.
(IOLike m, Eq fp, HasCallStack) =>
ResourceRegistry m -> String -> Watcher m a fp -> m (Thread m Void)
forkLinkedWatcher
ResourceRegistry m
registry
String
"Mempool.syncStateOnTipPointChange"
Watcher {
wFingerprint :: Point blk -> Point blk
wFingerprint = Point blk -> Point blk
forall a. a -> a
id
, wInitial :: Maybe (Point blk)
wInitial = Maybe (Point blk)
forall a. Maybe a
Nothing
, wNotify :: Point blk -> m ()
wNotify = Point blk -> m ()
action
, wReader :: STM m (Point blk)
wReader = STM m (Point blk)
getCurrentTip
}
where
action :: Point blk -> m ()
action :: Point blk -> m ()
action Point blk
_tipPoint =
Tracer m EnclosingTimed -> m () -> m ()
forall (m :: * -> *) a.
MonadMonotonicTime m =>
Tracer m EnclosingTimed -> m a -> m a
encloseTimedWith (EnclosingTimed -> TraceEventMempool blk
forall blk. EnclosingTimed -> TraceEventMempool blk
TraceMempoolSynced (EnclosingTimed -> TraceEventMempool blk)
-> Tracer m (TraceEventMempool blk) -> Tracer m EnclosingTimed
forall (f :: * -> *) a b. Contravariant f => (a -> b) -> f b -> f a
>$< MempoolEnv m blk -> Tracer m (TraceEventMempool blk)
forall (m :: * -> *) blk.
MempoolEnv m blk -> Tracer m (TraceEventMempool blk)
mpEnvTracer MempoolEnv m blk
menv) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
m (MempoolSnapshot blk) -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m (MempoolSnapshot blk) -> m ())
-> m (MempoolSnapshot blk) -> m ()
forall a b. (a -> b) -> a -> b
$ MempoolEnv m blk -> m (MempoolSnapshot blk)
forall (m :: * -> *) blk.
(IOLike m, LedgerSupportsMempool blk, HasTxId (GenTx blk),
ValidateEnvelope blk) =>
MempoolEnv m blk -> m (MempoolSnapshot blk)
implSyncWithLedger MempoolEnv m blk
menv
getCurrentTip :: STM m (Point blk)
getCurrentTip :: STM m (Point blk)
getCurrentTip =
LedgerState blk -> Point blk
forall blk. UpdateLedger blk => LedgerState blk -> Point blk
ledgerTipPoint
(LedgerState blk -> Point blk)
-> STM m (LedgerState blk) -> STM m (Point blk)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> LedgerInterface m blk -> STM m (LedgerState blk)
forall (m :: * -> *) blk.
LedgerInterface m blk -> STM m (LedgerState blk)
getCurrentLedgerState (MempoolEnv m blk -> LedgerInterface m blk
forall (m :: * -> *) blk. MempoolEnv m blk -> LedgerInterface m blk
mpEnvLedger MempoolEnv m blk
menv)
openMempoolWithoutSyncThread ::
( IOLike m
, LedgerSupportsMempool blk
, HasTxId (GenTx blk)
, ValidateEnvelope blk
)
=> LedgerInterface m blk
-> LedgerConfig blk
-> MempoolCapacityBytesOverride
-> Tracer m (TraceEventMempool blk)
-> m (Mempool m blk)
openMempoolWithoutSyncThread :: forall (m :: * -> *) blk.
(IOLike m, LedgerSupportsMempool blk, HasTxId (GenTx blk),
ValidateEnvelope blk) =>
LedgerInterface m blk
-> LedgerConfig blk
-> MempoolCapacityBytesOverride
-> Tracer m (TraceEventMempool blk)
-> m (Mempool m blk)
openMempoolWithoutSyncThread LedgerInterface m blk
ledger LedgerConfig blk
cfg MempoolCapacityBytesOverride
capacityOverride Tracer m (TraceEventMempool blk)
tracer =
MempoolEnv m blk -> Mempool m blk
forall (m :: * -> *) blk.
(IOLike m, LedgerSupportsMempool blk, HasTxId (GenTx blk),
ValidateEnvelope blk) =>
MempoolEnv m blk -> Mempool m blk
mkMempool (MempoolEnv m blk -> Mempool m blk)
-> m (MempoolEnv m blk) -> m (Mempool m blk)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> LedgerInterface m blk
-> LedgerConfig blk
-> MempoolCapacityBytesOverride
-> Tracer m (TraceEventMempool blk)
-> m (MempoolEnv m blk)
forall (m :: * -> *) blk.
(IOLike m, NoThunks (GenTxId blk), LedgerSupportsMempool blk,
ValidateEnvelope blk) =>
LedgerInterface m blk
-> LedgerConfig blk
-> MempoolCapacityBytesOverride
-> Tracer m (TraceEventMempool blk)
-> m (MempoolEnv m blk)
initMempoolEnv LedgerInterface m blk
ledger LedgerConfig blk
cfg MempoolCapacityBytesOverride
capacityOverride Tracer m (TraceEventMempool blk)
tracer
mkMempool ::
( IOLike m
, LedgerSupportsMempool blk
, HasTxId (GenTx blk)
, ValidateEnvelope blk
)
=> MempoolEnv m blk -> Mempool m blk
mkMempool :: forall (m :: * -> *) blk.
(IOLike m, LedgerSupportsMempool blk, HasTxId (GenTx blk),
ValidateEnvelope blk) =>
MempoolEnv m blk -> Mempool m blk
mkMempool MempoolEnv m blk
mpEnv = Mempool
{ addTx :: AddTxOnBehalfOf -> GenTx blk -> m (MempoolAddTxResult blk)
addTx = StrictTVar m (InternalState blk)
-> MVar m ()
-> MVar m ()
-> LedgerConfig blk
-> Tracer m (TraceEventMempool blk)
-> AddTxOnBehalfOf
-> GenTx blk
-> m (MempoolAddTxResult blk)
forall (m :: * -> *) blk.
(MonadSTM m, MonadMVar m, LedgerSupportsMempool blk,
HasTxId (GenTx blk)) =>
StrictTVar m (InternalState blk)
-> MVar m ()
-> MVar m ()
-> LedgerConfig blk
-> Tracer m (TraceEventMempool blk)
-> AddTxOnBehalfOf
-> GenTx blk
-> m (MempoolAddTxResult blk)
implAddTx StrictTVar m (InternalState blk)
istate MVar m ()
remoteFifo MVar m ()
allFifo LedgerConfig blk
cfg Tracer m (TraceEventMempool blk)
trcr
, removeTxs :: [GenTxId blk] -> m ()
removeTxs = MempoolEnv m blk -> [GenTxId blk] -> m ()
forall (m :: * -> *) blk.
(IOLike m, LedgerSupportsMempool blk, HasTxId (GenTx blk),
ValidateEnvelope blk) =>
MempoolEnv m blk -> [GenTxId blk] -> m ()
implRemoveTxs MempoolEnv m blk
mpEnv
, syncWithLedger :: m (MempoolSnapshot blk)
syncWithLedger = MempoolEnv m blk -> m (MempoolSnapshot blk)
forall (m :: * -> *) blk.
(IOLike m, LedgerSupportsMempool blk, HasTxId (GenTx blk),
ValidateEnvelope blk) =>
MempoolEnv m blk -> m (MempoolSnapshot blk)
implSyncWithLedger MempoolEnv m blk
mpEnv
, getSnapshot :: STM m (MempoolSnapshot blk)
getSnapshot = InternalState blk -> MempoolSnapshot blk
forall blk.
(HasTxId (GenTx blk), TxLimits blk) =>
InternalState blk -> MempoolSnapshot blk
snapshotFromIS (InternalState blk -> MempoolSnapshot blk)
-> STM m (InternalState blk) -> STM m (MempoolSnapshot blk)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> StrictTVar m (InternalState blk) -> STM m (InternalState blk)
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m (InternalState blk)
istate
, getSnapshotFor :: ForgeLedgerState blk -> STM m (MempoolSnapshot blk)
getSnapshotFor = \ForgeLedgerState blk
fls -> LedgerConfig blk
-> ForgeLedgerState blk
-> MempoolCapacityBytesOverride
-> InternalState blk
-> MempoolSnapshot blk
forall blk.
(LedgerSupportsMempool blk, HasTxId (GenTx blk),
ValidateEnvelope blk) =>
LedgerConfig blk
-> ForgeLedgerState blk
-> MempoolCapacityBytesOverride
-> InternalState blk
-> MempoolSnapshot blk
pureGetSnapshotFor LedgerConfig blk
cfg ForgeLedgerState blk
fls MempoolCapacityBytesOverride
co (InternalState blk -> MempoolSnapshot blk)
-> STM m (InternalState blk) -> STM m (MempoolSnapshot blk)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> StrictTVar m (InternalState blk) -> STM m (InternalState blk)
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m (InternalState blk)
istate
, getCapacity :: STM m (TxMeasure blk)
getCapacity = InternalState blk -> TxMeasure blk
forall blk. InternalState blk -> TxMeasure blk
isCapacity (InternalState blk -> TxMeasure blk)
-> STM m (InternalState blk) -> STM m (TxMeasure blk)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> StrictTVar m (InternalState blk) -> STM m (InternalState blk)
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m (InternalState blk)
istate
}
where MempoolEnv { mpEnvStateVar :: forall (m :: * -> *) blk.
MempoolEnv m blk -> StrictTVar m (InternalState blk)
mpEnvStateVar = StrictTVar m (InternalState blk)
istate
, mpEnvAddTxsRemoteFifo :: forall (m :: * -> *) blk. MempoolEnv m blk -> MVar m ()
mpEnvAddTxsRemoteFifo = MVar m ()
remoteFifo
, mpEnvAddTxsAllFifo :: forall (m :: * -> *) blk. MempoolEnv m blk -> MVar m ()
mpEnvAddTxsAllFifo = MVar m ()
allFifo
, mpEnvLedgerCfg :: forall (m :: * -> *) blk. MempoolEnv m blk -> LedgerConfig blk
mpEnvLedgerCfg = LedgerConfig blk
cfg
, mpEnvTracer :: forall (m :: * -> *) blk.
MempoolEnv m blk -> Tracer m (TraceEventMempool blk)
mpEnvTracer = Tracer m (TraceEventMempool blk)
trcr
, mpEnvCapacityOverride :: forall (m :: * -> *) blk.
MempoolEnv m blk -> MempoolCapacityBytesOverride
mpEnvCapacityOverride = MempoolCapacityBytesOverride
co
} = MempoolEnv m blk
mpEnv