{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE ScopedTypeVariables #-}

-- | Creating a mempool
module Ouroboros.Consensus.Mempool.Init
  ( openMempool
  , openMempoolWithoutSyncThread
  ) where

import Control.Monad (void)
import Control.ResourceRegistry
import Control.Tracer
import Ouroboros.Consensus.HeaderValidation
import Ouroboros.Consensus.Ledger.Abstract
import Ouroboros.Consensus.Ledger.SupportsMempool
import Ouroboros.Consensus.Mempool.API (Mempool (..))
import Ouroboros.Consensus.Mempool.Capacity
import Ouroboros.Consensus.Mempool.Impl.Common
import Ouroboros.Consensus.Mempool.Query
import Ouroboros.Consensus.Mempool.Update
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Consensus.Util.STM

{-------------------------------------------------------------------------------
  Opening the mempool
-------------------------------------------------------------------------------}

-- | Create a @Mempool m blk@ in @m@ to manipulate the mempool. It will also
-- fork a thread that syncs the mempool and the ledger when the ledger changes.
--
-- The steps are:
--
-- 1. build the mempool environment with 'initMempoolEnv';
-- 2. start the background watcher via 'forkSyncStateOnTipPointChange';
-- 3. wrap the environment into the 'Mempool' record with 'mkMempool'.
--
-- Use 'openMempoolWithoutSyncThread' to skip step 2.
openMempool ::
  ( IOLike m
  , LedgerSupportsMempool blk
  , HasTxId (GenTx blk)
  , ValidateEnvelope blk
  ) =>
  -- | Registry passed to 'initMempoolEnv'; it is also the registry in which
  -- the cancellation of the sync thread is registered.
  ResourceRegistry m ->
  LedgerInterface m blk ->
  LedgerConfig blk ->
  MempoolCapacityBytesOverride ->
  Tracer m (TraceEventMempool blk) ->
  m (Mempool m blk)
openMempool topLevelRegistry ledger cfg capacityOverride tracer = do
  env <- initMempoolEnv ledger cfg capacityOverride tracer topLevelRegistry
  -- The watcher must be started before the mempool is handed to the caller.
  forkSyncStateOnTipPointChange topLevelRegistry env
  pure (mkMempool env)

-- | Spawn a thread which syncs the 'Mempool' state whenever the 'LedgerState'
-- changes.
--
-- The watcher thread is forked in the mempool's own registry
-- ('mpEnvRegistry'), while the action that cancels it is allocated in the
-- registry given as the first argument.
forkSyncStateOnTipPointChange ::
  forall m blk.
  ( IOLike m
  , LedgerSupportsMempool blk
  , HasTxId (GenTx blk)
  , ValidateEnvelope blk
  ) =>
  -- | Registry that owns the cancellation of the watcher thread.
  ResourceRegistry m ->
  MempoolEnv m blk ->
  m ()
forkSyncStateOnTipPointChange topLevelRegistry menv = do
  w <-
    forkLinkedWatcher
      (mpEnvRegistry menv)
      "Mempool.syncStateOnTipPointChange"
      Watcher
        { wFingerprint = id
        , wInitial = Nothing
        , wNotify = action
        , wReader = getCurrentTip
        }

  -- With this allocation on the top level registry, we make sure that we first
  -- stop the watcher thread before closing the mempool registry, as otherwise
  -- we would run into a race condition (the thread might try to re-sync and
  -- allocate a forker on the mempool registry which would be closing down).
  void $ allocate topLevelRegistry (\_ -> pure w) cancelThread
 where
  -- Re-sync the mempool with the ledger; the observed view itself is ignored
  -- and the resulting snapshot is discarded.
  action :: MempoolLedgerDBView m blk -> m ()
  action _a =
    void $ implSyncWithLedger menv

  -- Using the tip ('Point') allows for quicker equality checks
  --
  -- NOTE(review): the fingerprint above is the view itself
  -- (@wFingerprint = id@), so change detection relies on the 'Eq' instance of
  -- 'MempoolLedgerDBView' -- presumably that instance compares tip points;
  -- confirm against its definition.
  getCurrentTip :: STM m (MempoolLedgerDBView m blk)
  getCurrentTip =
    getCurrentLedgerState (mpEnvLedger menv) (mpEnvRegistry menv)

-- | Unlike 'openMempool', this function does not fork a background thread
-- that synchronises with the ledger state whenever the latter changes.
--
-- It only builds the environment with 'initMempoolEnv' and wraps it with
-- 'mkMempool'.
--
-- Intended for testing purposes.
openMempoolWithoutSyncThread ::
  ( IOLike m
  , LedgerSupportsMempool blk
  , HasTxId (GenTx blk)
  , ValidateEnvelope blk
  ) =>
  ResourceRegistry m ->
  LedgerInterface m blk ->
  LedgerConfig blk ->
  MempoolCapacityBytesOverride ->
  Tracer m (TraceEventMempool blk) ->
  m (Mempool m blk)
openMempoolWithoutSyncThread registry ledger cfg capacityOverride tracer =
  mkMempool <$> initMempoolEnv ledger cfg capacityOverride tracer registry

-- | Assemble the 'Mempool' record of operations from a 'MempoolEnv'.
--
-- This function is pure: it performs no effects itself, and every field is a
-- closure over the given environment.
mkMempool ::
  ( IOLike m
  , LedgerSupportsMempool blk
  , HasTxId (GenTx blk)
  , ValidateEnvelope blk
  ) =>
  MempoolEnv m blk -> Mempool m blk
mkMempool mpEnv =
  Mempool
    { addTx = implAddTx mpEnv
    , removeTxsEvenIfValid = implRemoveTxsEvenIfValid mpEnv
    , -- Snapshot of the current internal state.
      getSnapshot = snapshotFromIS <$> readTMVar istate
    , getSnapshotFor = implGetSnapshotFor mpEnv
    , -- Capacity as recorded in the current internal state.
      getCapacity = isCapacity <$> readTMVar istate
    , testSyncWithLedger = implSyncWithLedger mpEnv
    , -- Threads forked through this field live in the mempool's own registry.
      testForkMempoolThread = forkLinkedThread (mpEnvRegistry mpEnv)
    }
 where
  -- The variable holding the mempool's internal state.
  MempoolEnv
    { mpEnvStateVar = istate
    } = mpEnv