{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE ScopedTypeVariables #-}

-- | Creating a mempool
module Ouroboros.Consensus.Mempool.Init
  ( openMempool
  , openMempoolWithoutSyncThread
  ) where

import Control.Monad (void)
import Control.Monad.Class.MonadTimer.SI (MonadTimer)
import Control.ResourceRegistry
import Control.Tracer
import Data.Functor.Identity (runIdentity)
import Ouroboros.Consensus.HeaderValidation
import Ouroboros.Consensus.Ledger.Abstract
import Ouroboros.Consensus.Ledger.SupportsMempool
import Ouroboros.Consensus.Mempool.API (Mempool (..), MempoolTimeoutConfig)
import Ouroboros.Consensus.Mempool.Capacity
import Ouroboros.Consensus.Mempool.Impl.Common
import Ouroboros.Consensus.Mempool.Query
import Ouroboros.Consensus.Mempool.Update
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Consensus.Util.STM

{-------------------------------------------------------------------------------
  Opening the mempool
-------------------------------------------------------------------------------}

-- | Create a @Mempool m blk@ in @m@ to manipulate the mempool. It will also
-- fork a thread that syncs the mempool and the ledger when the ledger changes.
--
-- The mempool environment is built with 'initMempoolEnv'; the sync thread is
-- then forked by 'forkSyncStateOnTipPointChange' before the 'Mempool' record
-- is returned. Use 'openMempoolWithoutSyncThread' when the background thread
-- is not wanted.
openMempool ::
  ( IOLike m
  , MonadTimer m
  , LedgerSupportsMempool blk
  , HasTxId (GenTx blk)
  , ValidateEnvelope blk
  ) =>
  ResourceRegistry m ->
  LedgerInterface m blk ->
  LedgerConfig blk ->
  MempoolCapacityBytesOverride ->
  Maybe MempoolTimeoutConfig ->
  Tracer m (TraceEventMempool blk) ->
  m (Mempool m blk)
openMempool topLevelRegistry ledger cfg capacityOverride timeoutConfig tracer = do
  -- Note the argument order: 'initMempoolEnv' takes the registry last.
  env <-
    initMempoolEnv
      ledger
      cfg
      capacityOverride
      timeoutConfig
      tracer
      topLevelRegistry
  forkSyncStateOnTipPointChange env
  pure (mkMempool env)

-- | Spawn a thread which syncs the 'Mempool' state whenever the 'LedgerState'
-- changes.
--
-- The watcher is forked with 'forkLinkedWatcher' in the registry stored in
-- the environment ('mpEnvRegistry'). The resulting thread handle is
-- discarded.
forkSyncStateOnTipPointChange ::
  forall m blk.
  ( IOLike m
  , LedgerSupportsMempool blk
  , HasTxId (GenTx blk)
  , ValidateEnvelope blk
  ) =>
  MempoolEnv m blk ->
  m ()
forkSyncStateOnTipPointChange menv =
  void $
    forkLinkedWatcher
      (mpEnvRegistry menv)
      "Mempool.syncStateOnTipPointChange"
      Watcher
        { -- The value read is used as its own fingerprint.
          wFingerprint = id
        , -- No initial fingerprint is supplied.
          -- NOTE(review): presumably this makes the first observed value
          -- trigger 'wNotify' -- confirm against the 'Watcher' docs.
          wInitial = Nothing
        , wNotify = action
        , wReader = getCurrentTip
        }
 where
  -- Re-sync the mempool with the ledger. The observed view itself is not
  -- used, and the snapshot returned by 'implSyncWithLedger' is discarded.
  action :: MempoolLedgerDBView m blk -> m ()
  action _a =
    void $ implSyncWithLedger menv

  -- Using the tip ('Point') allows for quicker equality checks
  --
  -- NOTE(review): the fingerprint is 'id' over the whole
  -- 'MempoolLedgerDBView', so the comparison cost depends on that type's
  -- 'Eq' instance (presumably tip-based) -- confirm the comment above still
  -- holds.
  getCurrentTip :: STM m (MempoolLedgerDBView m blk)
  getCurrentTip =
    getCurrentLedgerState (mpEnvLedger menv) (mpEnvRegistry menv)

-- | Unlike 'openMempool', this function does not fork a background thread
-- that synchronises with the ledger state whenever the latter changes.
--
-- It only builds the environment with 'initMempoolEnv' and wraps it with
-- 'mkMempool'.
--
-- Intended for testing purposes.
openMempoolWithoutSyncThread ::
  ( IOLike m
  , MonadTimer m
  , LedgerSupportsMempool blk
  , HasTxId (GenTx blk)
  , ValidateEnvelope blk
  ) =>
  ResourceRegistry m ->
  LedgerInterface m blk ->
  LedgerConfig blk ->
  MempoolCapacityBytesOverride ->
  Maybe MempoolTimeoutConfig ->
  Tracer m (TraceEventMempool blk) ->
  m (Mempool m blk)
openMempoolWithoutSyncThread registry ledger cfg capacityOverride timeoutConfig tracer =
  -- Note the argument order: 'initMempoolEnv' takes the registry last.
  mkMempool
    <$> initMempoolEnv
      ledger
      cfg
      capacityOverride
      timeoutConfig
      tracer
      registry

-- | Assemble the 'Mempool' record of operations from a mempool environment.
--
-- Each field is one of the @impl*@ functions partially applied to the
-- environment, or an STM read of the environment's internal state variable
-- ('mpEnvStateVar').
mkMempool ::
  ( IOLike m
  , MonadTimer m
  , LedgerSupportsMempool blk
  , HasTxId (GenTx blk)
  , ValidateEnvelope blk
  ) =>
  MempoolEnv m blk -> Mempool m blk
mkMempool mpEnv =
  Mempool
    { -- 'implAddTx' in 'ProductionAddTx' mode yields its result wrapped in
      -- 'Identity'; unwrap it so callers see a plain result.
      addTx = fmap runIdentity .: implAddTx mpEnv ProductionAddTx
    , removeTxsEvenIfValid = implRemoveTxsEvenIfValid mpEnv
    , -- Snapshot derived from the current internal state.
      getSnapshot = snapshotFromIS <$> readTMVar istate
    , getSnapshotFor = implGetSnapshotFor mpEnv
    , -- Capacity as recorded in the current internal state.
      getCapacity = isCapacity <$> readTMVar istate
    , testSyncWithLedger = implSyncWithLedger mpEnv
    , -- Threads are forked in the environment's registry.
      testForkMempoolThread = forkLinkedThread (mpEnvRegistry mpEnv)
    , -- The 'DiffTime' argument is wrapped in 'TestingAddTx', which selects
      -- the 'Maybe'-returning mode of 'implAddTx'.
      -- NOTE(review): presumably the 'DiffTime' is a timeout -- confirm
      -- against 'WhichAddTx'.
      testTryAddTx = implAddTx mpEnv . TestingAddTx
    }
 where
  -- The internal state variable, read by 'getSnapshot' and 'getCapacity'.
  MempoolEnv
    { mpEnvStateVar = istate
    } = mpEnv