{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE ScopedTypeVariables #-}

-- | Creating a mempool
module Ouroboros.Consensus.Mempool.Init (
    openMempool
  , openMempoolWithoutSyncThread
  ) where

import           Control.Monad (void)
import           Control.ResourceRegistry
import           Control.Tracer
import           Data.Functor.Contravariant ((>$<))
import           Ouroboros.Consensus.Block
import           Ouroboros.Consensus.HeaderValidation
import           Ouroboros.Consensus.Ledger.Abstract
import           Ouroboros.Consensus.Ledger.SupportsMempool
import           Ouroboros.Consensus.Mempool.API
import           Ouroboros.Consensus.Mempool.Capacity
import           Ouroboros.Consensus.Mempool.Impl.Common
import           Ouroboros.Consensus.Mempool.Query
import           Ouroboros.Consensus.Mempool.Update
import           Ouroboros.Consensus.Util.Enclose
import           Ouroboros.Consensus.Util.IOLike
import           Ouroboros.Consensus.Util.STM (Watcher (..), forkLinkedWatcher)

{-------------------------------------------------------------------------------
  Opening the mempool
-------------------------------------------------------------------------------}

-- | Create a @Mempool m blk@ in @m@ to manipulate the mempool. It will also
-- fork a thread that syncs the mempool and the ledger when the ledger changes.
--
-- The mempool environment is initialised first (via 'initMempoolEnv'), then
-- the sync thread is forked using the given 'ResourceRegistry', and finally
-- the environment is wrapped into the 'Mempool' record of operations.
--
-- See 'openMempoolWithoutSyncThread' for a variant that does not fork the
-- background thread.
openMempool ::
     ( IOLike m
     , LedgerSupportsMempool blk
     , HasTxId (GenTx blk)
     , ValidateEnvelope blk
     )
  => ResourceRegistry m
     -- ^ Registry used to fork the background sync thread
  -> LedgerInterface m blk
  -> LedgerConfig blk
  -> MempoolCapacityBytesOverride
  -> Tracer m (TraceEventMempool blk)
  -> m (Mempool m blk)
openMempool registry ledger cfg capacityOverride tracer = do
    env <- initMempoolEnv ledger cfg capacityOverride tracer
    -- Fork the watcher only once the environment exists: the thread reads the
    -- ledger interface and the tracer out of @env@.
    forkSyncStateOnTipPointChange registry env
    return $ mkMempool env

-- | Spawn a thread which syncs the 'Mempool' state whenever the 'LedgerState'
-- changes.
--
-- The thread is forked with 'forkLinkedWatcher' in the given registry under
-- the label @"Mempool.syncStateOnTipPointChange"@. It watches the tip point of
-- the current ledger state and runs 'implSyncWithLedger' when notified; the
-- resulting thread handle is discarded.
--
-- The explicit @forall m blk.@ brings @m@ and @blk@ into scope (through
-- @ScopedTypeVariables@) for the local signatures in the @where@ clause.
forkSyncStateOnTipPointChange :: forall m blk.
  ( IOLike m
  , LedgerSupportsMempool blk
  , HasTxId (GenTx blk)
  , ValidateEnvelope blk
  )
  => ResourceRegistry m
  -> MempoolEnv m blk
  -> m ()
forkSyncStateOnTipPointChange registry menv =
    void $ forkLinkedWatcher
      registry
      "Mempool.syncStateOnTipPointChange"
      Watcher {
          -- The tip point serves as its own fingerprint.
          wFingerprint = id
          -- No initial fingerprint is supplied.
          -- NOTE(review): presumably this makes the watcher notify on the
          -- first tip it reads -- confirm against the 'Watcher' documentation.
        , wInitial     = Nothing
        , wNotify      = action
        , wReader      = getCurrentTip
        }
  where
    -- | Re-sync the mempool with the ledger, emitting a timed
    -- 'TraceMempoolSynced' event through the environment's tracer.
    --
    -- The new tip point itself is not needed, and the snapshot returned by
    -- 'implSyncWithLedger' is discarded.
    action :: Point blk -> m ()
    action _tipPoint =
      encloseTimedWith (TraceMempoolSynced >$< mpEnvTracer menv) $
        void $ implSyncWithLedger menv

    -- Using the tip ('Point') allows for quicker equality checks
    getCurrentTip :: STM m (Point blk)
    getCurrentTip =
          ledgerTipPoint
      <$> getCurrentLedgerState (mpEnvLedger menv)

-- | Unlike 'openMempool', this function does not fork a background thread
-- that synchronises with the ledger state whenever the latter changes.
--
-- It only initialises the mempool environment (via 'initMempoolEnv') and wraps
-- it into the 'Mempool' record; consequently no 'ResourceRegistry' is needed.
--
-- Intended for testing purposes.
openMempoolWithoutSyncThread ::
     ( IOLike m
     , LedgerSupportsMempool blk
     , HasTxId (GenTx blk)
     , ValidateEnvelope blk
     )
  => LedgerInterface m blk
  -> LedgerConfig blk
  -> MempoolCapacityBytesOverride
  -> Tracer m (TraceEventMempool blk)
  -> m (Mempool m blk)
openMempoolWithoutSyncThread ledger cfg capacityOverride tracer =
    mkMempool <$> initMempoolEnv ledger cfg capacityOverride tracer

-- | Assemble the 'Mempool' record of operations from a 'MempoolEnv'.
--
-- This is a pure wiring step: each field of the resulting 'Mempool' is bound
-- to the corresponding implementation function, applied either to the whole
-- environment or to the individual components extracted from it.
mkMempool ::
     ( IOLike m
     , LedgerSupportsMempool blk
     , HasTxId (GenTx blk)
     , ValidateEnvelope blk
     )
  => MempoolEnv m blk -> Mempool m blk
mkMempool mpEnv = Mempool
    { addTx          = implAddTx istate remoteFifo allFifo cfg trcr
    , removeTxs      = implRemoveTxs mpEnv
    , syncWithLedger = implSyncWithLedger mpEnv
      -- The queries below are STM reads of the internal state variable,
      -- projected into the requested view.
    , getSnapshot    = snapshotFromIS <$> readTVar istate
    , getSnapshotFor = \fls -> pureGetSnapshotFor cfg fls co <$> readTVar istate
    , getCapacity    = isCapacity <$> readTVar istate
    }
  where
    -- Destructure only the environment components that are used directly
    -- above; the remaining operations take the whole @mpEnv@.
    MempoolEnv
      { mpEnvStateVar         = istate
      , mpEnvAddTxsRemoteFifo = remoteFifo
      , mpEnvAddTxsAllFifo    = allFifo
      , mpEnvLedgerCfg        = cfg
      , mpEnvTracer           = trcr
      , mpEnvCapacityOverride = co
      } = mpEnv