{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Ouroboros.Consensus.Mempool.Init
( openMempool
, openMempoolWithoutSyncThread
) where
import Control.Monad (void)
import Control.Monad.Class.MonadTimer.SI (MonadTimer)
import Control.ResourceRegistry
import Control.Tracer
import Data.Functor.Identity (runIdentity)
import Ouroboros.Consensus.HeaderValidation
import Ouroboros.Consensus.Ledger.Abstract
import Ouroboros.Consensus.Ledger.SupportsMempool
import Ouroboros.Consensus.Mempool.API (Mempool (..), MempoolTimeoutConfig)
import Ouroboros.Consensus.Mempool.Capacity
import Ouroboros.Consensus.Mempool.Impl.Common
import Ouroboros.Consensus.Mempool.Query
import Ouroboros.Consensus.Mempool.Update
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Consensus.Util.STM
openMempool ::
( IOLike m
, MonadTimer m
, LedgerSupportsMempool blk
, HasTxId (GenTx blk)
, ValidateEnvelope blk
) =>
ResourceRegistry m ->
LedgerInterface m blk ->
LedgerConfig blk ->
MempoolCapacityBytesOverride ->
Maybe MempoolTimeoutConfig ->
Tracer m (TraceEventMempool blk) ->
m (Mempool m blk)
openMempool :: forall (m :: * -> *) blk.
(IOLike m, MonadTimer m, LedgerSupportsMempool blk,
HasTxId (GenTx blk), ValidateEnvelope blk) =>
ResourceRegistry m
-> LedgerInterface m blk
-> LedgerConfig blk
-> MempoolCapacityBytesOverride
-> Maybe MempoolTimeoutConfig
-> Tracer m (TraceEventMempool blk)
-> m (Mempool m blk)
openMempool ResourceRegistry m
topLevelRegistry LedgerInterface m blk
ledger LedgerConfig blk
cfg MempoolCapacityBytesOverride
capacityOverride Maybe MempoolTimeoutConfig
timeoutConfig Tracer m (TraceEventMempool blk)
tracer = do
env <- LedgerInterface m blk
-> LedgerConfig blk
-> MempoolCapacityBytesOverride
-> Maybe MempoolTimeoutConfig
-> Tracer m (TraceEventMempool blk)
-> ResourceRegistry m
-> m (MempoolEnv m blk)
forall (m :: * -> *) blk.
(IOLike m, LedgerSupportsMempool blk, ValidateEnvelope blk) =>
LedgerInterface m blk
-> LedgerConfig blk
-> MempoolCapacityBytesOverride
-> Maybe MempoolTimeoutConfig
-> Tracer m (TraceEventMempool blk)
-> ResourceRegistry m
-> m (MempoolEnv m blk)
initMempoolEnv LedgerInterface m blk
ledger LedgerConfig blk
cfg MempoolCapacityBytesOverride
capacityOverride Maybe MempoolTimeoutConfig
timeoutConfig Tracer m (TraceEventMempool blk)
tracer ResourceRegistry m
topLevelRegistry
forkSyncStateOnTipPointChange env
return $ mkMempool env
forkSyncStateOnTipPointChange ::
forall m blk.
( IOLike m
, LedgerSupportsMempool blk
, HasTxId (GenTx blk)
, ValidateEnvelope blk
) =>
MempoolEnv m blk ->
m ()
forkSyncStateOnTipPointChange :: forall (m :: * -> *) blk.
(IOLike m, LedgerSupportsMempool blk, HasTxId (GenTx blk),
ValidateEnvelope blk) =>
MempoolEnv m blk -> m ()
forkSyncStateOnTipPointChange MempoolEnv m blk
menv =
m (Thread m Void) -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m (Thread m Void) -> m ()) -> m (Thread m Void) -> m ()
forall a b. (a -> b) -> a -> b
$
ResourceRegistry m
-> String
-> Watcher
m (MempoolLedgerDBView m blk) (MempoolLedgerDBView m blk)
-> m (Thread m Void)
forall (m :: * -> *) a fp.
(IOLike m, Eq fp, HasCallStack) =>
ResourceRegistry m -> String -> Watcher m a fp -> m (Thread m Void)
forkLinkedWatcher
(MempoolEnv m blk -> ResourceRegistry m
forall (m :: * -> *) blk. MempoolEnv m blk -> ResourceRegistry m
mpEnvRegistry MempoolEnv m blk
menv)
String
"Mempool.syncStateOnTipPointChange"
Watcher
{ wFingerprint :: MempoolLedgerDBView m blk -> MempoolLedgerDBView m blk
wFingerprint = MempoolLedgerDBView m blk -> MempoolLedgerDBView m blk
forall a. a -> a
id
, wInitial :: Maybe (MempoolLedgerDBView m blk)
wInitial = Maybe (MempoolLedgerDBView m blk)
forall a. Maybe a
Nothing
, wNotify :: MempoolLedgerDBView m blk -> m ()
wNotify = MempoolLedgerDBView m blk -> m ()
action
, wReader :: STM m (MempoolLedgerDBView m blk)
wReader = STM m (MempoolLedgerDBView m blk)
getCurrentTip
}
where
action :: MempoolLedgerDBView m blk -> m ()
action :: MempoolLedgerDBView m blk -> m ()
action MempoolLedgerDBView m blk
_a =
m (MempoolSnapshot blk) -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m (MempoolSnapshot blk) -> m ())
-> m (MempoolSnapshot blk) -> m ()
forall a b. (a -> b) -> a -> b
$ MempoolEnv m blk -> m (MempoolSnapshot blk)
forall (m :: * -> *) blk.
(IOLike m, LedgerSupportsMempool blk, ValidateEnvelope blk,
HasTxId (GenTx blk)) =>
MempoolEnv m blk -> m (MempoolSnapshot blk)
implSyncWithLedger MempoolEnv m blk
menv
getCurrentTip :: STM m (MempoolLedgerDBView m blk)
getCurrentTip :: STM m (MempoolLedgerDBView m blk)
getCurrentTip =
LedgerInterface m blk
-> ResourceRegistry m -> STM m (MempoolLedgerDBView m blk)
forall (m :: * -> *) blk.
LedgerInterface m blk
-> ResourceRegistry m -> STM m (MempoolLedgerDBView m blk)
getCurrentLedgerState (MempoolEnv m blk -> LedgerInterface m blk
forall (m :: * -> *) blk. MempoolEnv m blk -> LedgerInterface m blk
mpEnvLedger MempoolEnv m blk
menv) (MempoolEnv m blk -> ResourceRegistry m
forall (m :: * -> *) blk. MempoolEnv m blk -> ResourceRegistry m
mpEnvRegistry MempoolEnv m blk
menv)
openMempoolWithoutSyncThread ::
( IOLike m
, MonadTimer m
, LedgerSupportsMempool blk
, HasTxId (GenTx blk)
, ValidateEnvelope blk
) =>
ResourceRegistry m ->
LedgerInterface m blk ->
LedgerConfig blk ->
MempoolCapacityBytesOverride ->
Maybe MempoolTimeoutConfig ->
Tracer m (TraceEventMempool blk) ->
m (Mempool m blk)
openMempoolWithoutSyncThread :: forall (m :: * -> *) blk.
(IOLike m, MonadTimer m, LedgerSupportsMempool blk,
HasTxId (GenTx blk), ValidateEnvelope blk) =>
ResourceRegistry m
-> LedgerInterface m blk
-> LedgerConfig blk
-> MempoolCapacityBytesOverride
-> Maybe MempoolTimeoutConfig
-> Tracer m (TraceEventMempool blk)
-> m (Mempool m blk)
openMempoolWithoutSyncThread ResourceRegistry m
registry LedgerInterface m blk
ledger LedgerConfig blk
cfg MempoolCapacityBytesOverride
capacityOverride Maybe MempoolTimeoutConfig
timeoutConfig Tracer m (TraceEventMempool blk)
tracer =
MempoolEnv m blk -> Mempool m blk
forall (m :: * -> *) blk.
(IOLike m, MonadTimer m, LedgerSupportsMempool blk,
HasTxId (GenTx blk), ValidateEnvelope blk) =>
MempoolEnv m blk -> Mempool m blk
mkMempool (MempoolEnv m blk -> Mempool m blk)
-> m (MempoolEnv m blk) -> m (Mempool m blk)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> LedgerInterface m blk
-> LedgerConfig blk
-> MempoolCapacityBytesOverride
-> Maybe MempoolTimeoutConfig
-> Tracer m (TraceEventMempool blk)
-> ResourceRegistry m
-> m (MempoolEnv m blk)
forall (m :: * -> *) blk.
(IOLike m, LedgerSupportsMempool blk, ValidateEnvelope blk) =>
LedgerInterface m blk
-> LedgerConfig blk
-> MempoolCapacityBytesOverride
-> Maybe MempoolTimeoutConfig
-> Tracer m (TraceEventMempool blk)
-> ResourceRegistry m
-> m (MempoolEnv m blk)
initMempoolEnv LedgerInterface m blk
ledger LedgerConfig blk
cfg MempoolCapacityBytesOverride
capacityOverride Maybe MempoolTimeoutConfig
timeoutConfig Tracer m (TraceEventMempool blk)
tracer ResourceRegistry m
registry
mkMempool ::
( IOLike m
, MonadTimer m
, LedgerSupportsMempool blk
, HasTxId (GenTx blk)
, ValidateEnvelope blk
) =>
MempoolEnv m blk -> Mempool m blk
mkMempool :: forall (m :: * -> *) blk.
(IOLike m, MonadTimer m, LedgerSupportsMempool blk,
HasTxId (GenTx blk), ValidateEnvelope blk) =>
MempoolEnv m blk -> Mempool m blk
mkMempool MempoolEnv m blk
mpEnv =
Mempool
{ addTx :: AddTxOnBehalfOf -> GenTx blk -> m (MempoolAddTxResult blk)
addTx = (Identity (MempoolAddTxResult blk) -> MempoolAddTxResult blk)
-> m (Identity (MempoolAddTxResult blk))
-> m (MempoolAddTxResult blk)
forall a b. (a -> b) -> m a -> m b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Identity (MempoolAddTxResult blk) -> MempoolAddTxResult blk
forall a. Identity a -> a
runIdentity (m (Identity (MempoolAddTxResult blk))
-> m (MempoolAddTxResult blk))
-> (AddTxOnBehalfOf
-> GenTx blk -> m (Identity (MempoolAddTxResult blk)))
-> AddTxOnBehalfOf
-> GenTx blk
-> m (MempoolAddTxResult blk)
forall y z x0 x1. (y -> z) -> (x0 -> x1 -> y) -> x0 -> x1 -> z
.: MempoolEnv m blk
-> WhichAddTx Identity
-> AddTxOnBehalfOf
-> GenTx blk
-> m (Identity (MempoolAddTxResult blk))
forall (m :: * -> *) blk (f :: * -> *).
(IOLike m, MonadTimer m, LedgerSupportsMempool blk,
HasTxId (GenTx blk)) =>
MempoolEnv m blk
-> WhichAddTx f
-> AddTxOnBehalfOf
-> GenTx blk
-> m (f (MempoolAddTxResult blk))
implAddTx MempoolEnv m blk
mpEnv WhichAddTx Identity
ProductionAddTx
, removeTxsEvenIfValid :: NonEmpty (GenTxId blk) -> m ()
removeTxsEvenIfValid = MempoolEnv m blk -> NonEmpty (GenTxId blk) -> m ()
forall (m :: * -> *) blk.
(IOLike m, LedgerSupportsMempool blk, HasTxId (GenTx blk)) =>
MempoolEnv m blk -> NonEmpty (GenTxId blk) -> m ()
implRemoveTxsEvenIfValid MempoolEnv m blk
mpEnv
, getSnapshot :: STM m (MempoolSnapshot blk)
getSnapshot = InternalState blk -> MempoolSnapshot blk
forall blk.
(HasTxId (GenTx blk), TxLimits blk,
GetTip (TickedLedgerState blk)) =>
InternalState blk -> MempoolSnapshot blk
snapshotFromIS (InternalState blk -> MempoolSnapshot blk)
-> STM m (InternalState blk) -> STM m (MempoolSnapshot blk)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> StrictTMVar m (InternalState blk) -> STM m (InternalState blk)
forall (m :: * -> *) a. MonadSTM m => StrictTMVar m a -> STM m a
readTMVar StrictTMVar m (InternalState blk)
istate
, getSnapshotFor :: SlotNo
-> TickedLedgerState blk DiffMK
-> (LedgerTables (LedgerState blk) KeysMK
-> m (LedgerTables (LedgerState blk) ValuesMK))
-> m (MempoolSnapshot blk)
getSnapshotFor = MempoolEnv m blk
-> SlotNo
-> TickedLedgerState blk DiffMK
-> (LedgerTables (LedgerState blk) KeysMK
-> m (LedgerTables (LedgerState blk) ValuesMK))
-> m (MempoolSnapshot blk)
forall (m :: * -> *) blk.
(IOLike m, LedgerSupportsMempool blk, HasTxId (GenTx blk)) =>
MempoolEnv m blk
-> SlotNo
-> TickedLedgerState blk DiffMK
-> (LedgerTables (LedgerState blk) KeysMK
-> m (LedgerTables (LedgerState blk) ValuesMK))
-> m (MempoolSnapshot blk)
implGetSnapshotFor MempoolEnv m blk
mpEnv
, getCapacity :: STM m (TxMeasure blk)
getCapacity = InternalState blk -> TxMeasure blk
forall blk. InternalState blk -> TxMeasure blk
isCapacity (InternalState blk -> TxMeasure blk)
-> STM m (InternalState blk) -> STM m (TxMeasure blk)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> StrictTMVar m (InternalState blk) -> STM m (InternalState blk)
forall (m :: * -> *) a. MonadSTM m => StrictTMVar m a -> STM m a
readTMVar StrictTMVar m (InternalState blk)
istate
, testSyncWithLedger :: m (MempoolSnapshot blk)
testSyncWithLedger = MempoolEnv m blk -> m (MempoolSnapshot blk)
forall (m :: * -> *) blk.
(IOLike m, LedgerSupportsMempool blk, ValidateEnvelope blk,
HasTxId (GenTx blk)) =>
MempoolEnv m blk -> m (MempoolSnapshot blk)
implSyncWithLedger MempoolEnv m blk
mpEnv
, testForkMempoolThread :: forall a. String -> m a -> m (Thread m a)
testForkMempoolThread = ResourceRegistry m -> String -> m a -> m (Thread m a)
forall (m :: * -> *) a.
(MonadAsync m, MonadFork m, MonadMask m, HasCallStack) =>
ResourceRegistry m -> String -> m a -> m (Thread m a)
forkLinkedThread (MempoolEnv m blk -> ResourceRegistry m
forall (m :: * -> *) blk. MempoolEnv m blk -> ResourceRegistry m
mpEnvRegistry MempoolEnv m blk
mpEnv)
, testTryAddTx :: DiffTime
-> AddTxOnBehalfOf
-> GenTx blk
-> m (Maybe (MempoolAddTxResult blk))
testTryAddTx = MempoolEnv m blk
-> WhichAddTx Maybe
-> AddTxOnBehalfOf
-> GenTx blk
-> m (Maybe (MempoolAddTxResult blk))
forall (m :: * -> *) blk (f :: * -> *).
(IOLike m, MonadTimer m, LedgerSupportsMempool blk,
HasTxId (GenTx blk)) =>
MempoolEnv m blk
-> WhichAddTx f
-> AddTxOnBehalfOf
-> GenTx blk
-> m (f (MempoolAddTxResult blk))
implAddTx MempoolEnv m blk
mpEnv (WhichAddTx Maybe
-> AddTxOnBehalfOf
-> GenTx blk
-> m (Maybe (MempoolAddTxResult blk)))
-> (DiffTime -> WhichAddTx Maybe)
-> DiffTime
-> AddTxOnBehalfOf
-> GenTx blk
-> m (Maybe (MempoolAddTxResult blk))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. DiffTime -> WhichAddTx Maybe
TestingAddTx
}
where
MempoolEnv
{ mpEnvStateVar :: forall (m :: * -> *) blk.
MempoolEnv m blk -> StrictTMVar m (InternalState blk)
mpEnvStateVar = StrictTMVar m (InternalState blk)
istate
} = MempoolEnv m blk
mpEnv