{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Ouroboros.Consensus.Mempool.Init
( openMempool
, openMempoolWithoutSyncThread
) where
import Control.Monad (void)
import Control.Monad.Class.MonadTimer.SI (MonadTimer)
import Control.ResourceRegistry
import Control.Tracer
import Data.Functor.Identity (runIdentity)
import Ouroboros.Consensus.HeaderValidation
import Ouroboros.Consensus.Ledger.Abstract
import Ouroboros.Consensus.Ledger.SupportsMempool
import Ouroboros.Consensus.Mempool.API (Mempool (..), MempoolTimeoutConfig)
import Ouroboros.Consensus.Mempool.Capacity
import Ouroboros.Consensus.Mempool.Impl.Common
import Ouroboros.Consensus.Mempool.Query
import Ouroboros.Consensus.Mempool.Update
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Consensus.Util.STM
import Ouroboros.Network.Block (Point)
-- | Open a mempool and start its background synchronisation thread.
--
-- The mempool environment is built with 'initMempoolEnv' from the ledger
-- interface, ledger configuration, capacity override, optional timeout
-- configuration and tracer. 'forkSyncStateOnTipPointChange' is then run
-- against that environment with the supplied registry, and finally the
-- environment is wrapped into a 'Mempool' handle by 'mkMempool'.
--
-- See 'openMempoolWithoutSyncThread' for a variant that performs the same
-- initialisation but does not fork the thread.
openMempool ::
  ( IOLike m
  , MonadTimer m
  , LedgerSupportsMempool blk
  , HasTxId (GenTx blk)
  , ValidateEnvelope blk
  ) =>
  -- | Registry passed on to 'forkSyncStateOnTipPointChange'.
  ResourceRegistry m ->
  -- | Interface to the ledger, stored in the mempool environment.
  LedgerInterface m blk ->
  LedgerConfig blk ->
  MempoolCapacityBytesOverride ->
  -- | Optional timeout configuration, forwarded to 'initMempoolEnv'.
  Maybe MempoolTimeoutConfig ->
  Tracer m (TraceEventMempool blk) ->
  m (Mempool m blk)
openMempool topLevelRegistry ledger cfg capacityOverride timeoutConfig tracer = do
  env <- initMempoolEnv ledger cfg capacityOverride timeoutConfig tracer
  -- The watcher is forked before the handle is handed back to the caller.
  forkSyncStateOnTipPointChange env topLevelRegistry
  return $ mkMempool env
-- | Fork a linked watcher thread, labelled
-- @"Mempool.syncStateOnTipPointChange"@, that re-syncs the mempool with the
-- ledger.
--
-- The watcher reads the tip 'Point' of the current ledger state (obtained
-- through the 'LedgerInterface' stored in the environment), uses that point
-- unchanged as its fingerprint, and starts with no initial fingerprint
-- ('Nothing'). Its notification action runs 'implSyncWithLedger' on the
-- environment and discards the resulting internal state.
--
-- NOTE(review): presumably 'forkLinkedWatcher' only invokes the notification
-- when the fingerprint differs from the previous one, and a 'Nothing' initial
-- value triggers a first notification immediately -- confirm against the
-- watcher's documentation.
--
-- The returned thread handle is dropped; the thread is created via the given
-- registry.
forkSyncStateOnTipPointChange ::
  forall m blk.
  ( IOLike m
  , LedgerSupportsMempool blk
  , HasTxId (GenTx blk)
  , ValidateEnvelope blk
  ) =>
  MempoolEnv m blk ->
  ResourceRegistry m ->
  m ()
forkSyncStateOnTipPointChange menv reg =
  void $
    forkLinkedWatcher
      reg
      "Mempool.syncStateOnTipPointChange"
      Watcher
        { wFingerprint = id
        , wInitial = Nothing
        , wNotify = action
        , wReader = getCurrentTip
        }
 where
  -- The new tip itself is not needed: the sync re-reads whatever it requires
  -- from the environment, and its result is collapsed to unit.
  action :: Point blk -> m ()
  action _a = implSyncWithLedger (const ()) menv

  -- Project the ledger view down to the tip point of its ledger state.
  getCurrentTip :: STM m (Point blk)
  getCurrentTip =
    ledgerTipPoint . mldViewState
      <$> getCurrentLedgerState (mpEnvLedger menv)
-- | Open a mempool without forking the tip-watching synchronisation thread.
--
-- Performs the same 'initMempoolEnv' initialisation as 'openMempool' and
-- wraps the resulting environment with 'mkMempool', but never calls
-- 'forkSyncStateOnTipPointChange'; accordingly it takes no
-- 'ResourceRegistry'.
--
-- NOTE(review): with no watcher running, callers presumably have to trigger
-- synchronisation themselves (e.g. through 'testSyncWithLedger') -- confirm
-- the intended use.
openMempoolWithoutSyncThread ::
  ( IOLike m
  , MonadTimer m
  , LedgerSupportsMempool blk
  , HasTxId (GenTx blk)
  , ValidateEnvelope blk
  ) =>
  -- | Interface to the ledger, stored in the mempool environment.
  LedgerInterface m blk ->
  LedgerConfig blk ->
  MempoolCapacityBytesOverride ->
  -- | Optional timeout configuration, forwarded to 'initMempoolEnv'.
  Maybe MempoolTimeoutConfig ->
  Tracer m (TraceEventMempool blk) ->
  m (Mempool m blk)
openMempoolWithoutSyncThread ledger cfg capacityOverride timeoutConfig tracer =
  mkMempool <$> initMempoolEnv ledger cfg capacityOverride timeoutConfig tracer
-- | Build the 'Mempool' record of operations from a 'MempoolEnv'.
--
-- This is a pure wiring step: every field is one of the @impl*@ functions
-- partially applied to the environment, or an STM read of the environment's
-- internal-state variable ('mpEnvStateVar').
--
-- * 'addTx' runs 'implAddTx' in 'ProductionAddTx' mode and unwraps the
--   'Identity' around its result.
-- * 'removeTxsEvenIfValid' and 'getSnapshotFor' delegate directly to
--   'implRemoveTxsEvenIfValid' and 'implGetSnapshotFor'.
-- * 'getSnapshot' and 'getCapacity' read the internal state with 'readTMVar'
--   and project it with 'snapshotFromIS' and 'isCapacity' respectively.
-- * 'testSyncWithLedger' runs 'implSyncWithLedger', returning a snapshot of
--   the resulting internal state.
-- * 'testTryAddTx' runs 'implAddTx' in 'TestingAddTx' mode, built from the
--   caller-supplied 'DiffTime'; the result is wrapped in 'Maybe'.
--   NOTE(review): the 'DiffTime' is presumably a timeout after which
--   'Nothing' is returned -- confirm in 'implAddTx'.
mkMempool ::
  ( IOLike m
  , MonadTimer m
  , LedgerSupportsMempool blk
  , HasTxId (GenTx blk)
  , ValidateEnvelope blk
  ) =>
  MempoolEnv m blk -> Mempool m blk
mkMempool mpEnv =
  Mempool
    { addTx = fmap runIdentity .: implAddTx mpEnv ProductionAddTx
    , removeTxsEvenIfValid = implRemoveTxsEvenIfValid mpEnv
    , getSnapshot = snapshotFromIS <$> readTMVar istate
    , getSnapshotFor = implGetSnapshotFor mpEnv
    , getCapacity = isCapacity <$> readTMVar istate
    , testSyncWithLedger = implSyncWithLedger snapshotFromIS mpEnv
    , testTryAddTx = implAddTx mpEnv . TestingAddTx
    }
 where
  -- Only the internal-state variable is needed directly; everything else
  -- is reached by passing the whole environment along.
  MempoolEnv
    { mpEnvStateVar = istate
    } = mpEnv