{-# LANGUAGE FlexibleContexts #-}

-- | Operations that update the mempool. Internally, each operation is split
-- into a pure part and an impure part.
module Ouroboros.Consensus.Mempool.Update (
    implAddTx
  , implRemoveTxs
  , implSyncWithLedger
  ) where

import           Control.Concurrent.Class.MonadMVar (MVar, withMVar)
import           Control.Exception (assert)
import           Control.Monad.Except (runExcept)
import           Control.Tracer
import           Data.Maybe (isJust)
import qualified Data.Measure as Measure
import qualified Data.Set as Set
import           Ouroboros.Consensus.HeaderValidation
import           Ouroboros.Consensus.Ledger.Abstract
import           Ouroboros.Consensus.Ledger.SupportsMempool
import           Ouroboros.Consensus.Mempool.API
import           Ouroboros.Consensus.Mempool.Capacity
import           Ouroboros.Consensus.Mempool.Impl.Common
import           Ouroboros.Consensus.Mempool.TxSeq (TxTicket (..))
import qualified Ouroboros.Consensus.Mempool.TxSeq as TxSeq
import           Ouroboros.Consensus.Util (whenJust)
import           Ouroboros.Consensus.Util.IOLike hiding (withMVar)

{-------------------------------------------------------------------------------
  Add transactions
-------------------------------------------------------------------------------}

-- | Add a single transaction to the mempool, blocking if there is no space.
--
-- Callers are serialised through the two FIFO 'MVar's (see the comment in the
-- body). While holding them, the STM transaction built around 'implTryAddTx'
-- is retried for as long as the outcome is 'NotEnoughSpaceLeft'. The event
-- carried by the outcome is traced after that STM transaction has completed.
implAddTx ::
     ( MonadSTM m
     , MonadMVar m
     , LedgerSupportsMempool blk
     , HasTxId (GenTx blk)
     )
  => StrictTVar m (InternalState blk)
     -- ^ The InternalState TVar.
  -> MVar m ()
     -- ^ The FIFO for remote peers
  -> MVar m ()
     -- ^ The FIFO for all remote peers and local clients
  -> LedgerConfig blk
     -- ^ The configuration of the ledger.
  -> Tracer m (TraceEventMempool blk)
     -- ^ Tracer that receives the event produced by the attempt.
  -> AddTxOnBehalfOf
     -- ^ Whether we're acting on behalf of a remote peer or a local client.
  -> GenTx blk
     -- ^ The transaction to add to the mempool.
  -> m (MempoolAddTxResult blk)
implAddTx istate remoteFifo allFifo cfg trcr onbehalf tx =
    -- To ensure fair behaviour between threads that are trying to add
    -- transactions, we make them all queue in a fifo. Only the one at the head
    -- of the queue gets to actually wait for space to get freed up in the
    -- mempool. This avoids small transactions repeatedly squeezing in ahead of
    -- larger transactions.
    --
    -- The fifo behaviour is implemented using a simple MVar. We take this
    -- MVar lock on a transaction by transaction basis. So if several threads
    -- are each trying to add several transactions, then they'll interleave at
    -- transaction granularity, not batches of transactions.
    --
    -- To add back in a bit of deliberate unfairness, we want to prioritise
    -- transactions being added on behalf of local clients, over ones being
    -- added on behalf of remote peers. We do this by using a pair of mvar
    -- fifos: remote peers must wait on both mvars, while local clients only
    -- need to wait on the second.
    case onbehalf of
      AddTxForRemotePeer ->
        withMVar remoteFifo $ \() ->
        withMVar allFifo $ \() ->
          -- This action can also block. Holding the MVars means
          -- there is only a single such thread blocking at once.
          implAddTx'

      AddTxForLocalClient ->
        withMVar allFifo $ \() ->
          -- As above but skip the first MVar fifo so we will get
          -- service sooner if there's lots of other remote
          -- threads waiting.
          implAddTx'
  where
    -- Try to add the transaction atomically, then trace the resulting event
    -- outside of STM and return the result.
    implAddTx' = do
      (result, ev) <- atomically $ do
        outcome <- implTryAddTx istate cfg
                                (whetherToIntervene onbehalf)
                                tx
        case outcome of
          TryAddTx _ result ev -> return (result, ev)

          -- or block until space is available to fit the next transaction
          NotEnoughSpaceLeft   -> retry

      traceWith trcr ev
      return result

    -- Local clients get 'Intervene', remote peers get 'DoNotIntervene'.
    whetherToIntervene :: AddTxOnBehalfOf -> WhetherToIntervene
    whetherToIntervene AddTxForRemotePeer  = DoNotIntervene
    whetherToIntervene AddTxForLocalClient = Intervene

-- | Result of trying to add a transaction to the mempool.
--
-- Values of this type are produced by 'pureTryAddTx'. 'implTryAddTx' writes
-- the new state (if any) to the state variable, and 'implAddTx' retries when
-- it sees 'NotEnoughSpaceLeft'.
data TryAddTx blk =
    -- | Adding the next transaction would put the mempool over capacity.
    --
    -- 'pureTryAddTx' also returns this when adding the transaction's measure
    -- to the current mempool size would overflow.
    NotEnoughSpaceLeft
    -- | A transaction was processed.
  | TryAddTx
      (Maybe (InternalState blk))
      -- ^ If the transaction was accepted, the new state that can be written to
      -- the TVar. 'Nothing' if the transaction was rejected.
      (MempoolAddTxResult blk)
      -- ^ The result of trying to add the transaction to the mempool.
      (TraceEventMempool blk)
      -- ^ The event emitted by the operation.

-- | Add a single transaction by interpreting a 'TryAddTx' from 'pureTryAddTx'.
--
-- This function returns whether the transaction was added or rejected, or if
-- the Mempool capacity is reached. See 'implAddTx' for a function that blocks
-- in case the Mempool capacity is reached.
--
-- Concretely, it reads the current 'InternalState', runs 'pureTryAddTx' on
-- it, writes the new state back only if the transaction was accepted, and
-- returns the outcome unchanged.
--
-- Transactions are added one by one, updating the Mempool each time one was
-- added successfully.
--
-- See the necessary invariants on the Haddock for 'API.addTxs'.
--
-- This function does not sync the Mempool contents with the ledger state in
-- case the latter changes, it relies on the background thread to do that.
--
-- INVARIANT: The code needs that read and writes on the state are coupled
-- together or inconsistencies will arise. To ensure that STM transactions are
-- short, each iteration of the helper function is a separate STM transaction.
implTryAddTx ::
     ( MonadSTM m
     , LedgerSupportsMempool blk
     , HasTxId (GenTx blk)
     )
  => StrictTVar m (InternalState blk)
     -- ^ The InternalState TVar.
  -> LedgerConfig blk
     -- ^ The configuration of the ledger.
  -> WhetherToIntervene
     -- ^ Passed through unchanged to 'pureTryAddTx'.
  -> GenTx blk
     -- ^ The transaction to add to the mempool.
  -> STM m (TryAddTx blk)
implTryAddTx istate cfg wti tx = do
    is <- readTVar istate
    let outcome = pureTryAddTx cfg wti tx is
    case outcome of
      -- Only an accepted transaction carries a new state to commit.
      TryAddTx (Just is') _ _ -> writeTVar istate is'
      TryAddTx Nothing    _ _ -> return ()
      NotEnoughSpaceLeft      -> return ()
    return outcome

-- | Pure core of 'implTryAddTx': decide what happens when the transaction is
-- offered to a mempool in the given state. See the documentation of
-- 'implTryAddTx' for some more context.
--
-- The outcome is one of:
--
-- * 'TryAddTx' with 'Nothing' and 'MempoolTxRejected', when either
--   'txMeasure' or 'extendVRNew' fails;
--
-- * 'NotEnoughSpaceLeft', when adding the transaction's measure to the
--   current size would overflow or would exceed 'isCapacity';
--
-- * 'TryAddTx' with the new state and 'MempoolTxAdded' otherwise.
pureTryAddTx ::
     ( LedgerSupportsMempool blk
     , HasTxId (GenTx blk)
     )
  => LedgerCfg (LedgerState blk)
     -- ^ The ledger configuration.
  -> WhetherToIntervene
     -- ^ Passed through unchanged to 'extendVRNew'.
  -> GenTx blk
     -- ^ The transaction to add to the mempool.
  -> InternalState blk
     -- ^ The current internal state of the mempool.
  -> TryAddTx blk
pureTryAddTx cfg wti tx is =
  case runExcept $ txMeasure cfg (isLedgerState is) tx of
    Left err ->
      -- The transaction does not have a valid measure (eg its ExUnits is
      -- greater than what this ledger state allows for a single transaction).
      --
      -- It might seem simpler to remove the failure case from 'txMeasure' and
      -- simply fully validate the tx before determining whether it'd fit in
      -- the mempool; that way we could reject invalid txs ASAP. However, for a
      -- valid tx, we'd pay that validation cost every time the node's
      -- selection changed, even if the tx wouldn't fit. So it'd very much be
      -- as if the mempool were effectively over capacity! What's worse, each
      -- attempt would not be using 'extendVRPrevApplied'.
      rejectTx err
    Right txsz
      -- Check for overflow
      --
      -- No measure of a transaction can ever be negative, so the only way
      -- adding two measures could result in a smaller measure is if some
      -- modular arithmetic overflowed. Also, overflow necessarily yields a
      -- lesser result, since adding 'maxBound' is modularly equivalent to
      -- subtracting one. Recall that we're checking each individual addition.
      --
      -- We assume that the 'txMeasure' limit and the mempool capacity
      -- 'isCapacity' are much smaller than the modulus, and so this should
      -- never happen. Despite that, blocking until adding the transaction
      -- doesn't overflow seems like a reasonable way to handle this case.
      | not (currentSize Measure.<= (currentSize `Measure.plus` txsz))
      ->
        NotEnoughSpaceLeft
      -- We add the transaction if and only if it wouldn't overrun any component
      -- of the mempool capacity.
      --
      -- In the past, this condition was instead @TxSeq.toSize (isTxs is) <
      -- isCapacity is@. Thus the effective capacity of the mempool was
      -- actually one increment less than the reported capacity plus one
      -- transaction. That subtlety's cost paid for two benefits.
      --
      -- First, the absence of addition avoids a risk of overflow, since the
      -- transaction's sizes (eg ExUnits) have not yet been bounded by
      -- validation (which presumably enforces a low enough bound that any
      -- reasonably-sized mempool would never overflow the representation's
      -- 'maxBound').
      --
      -- Second, it is more fair, since it does not depend on the transaction
      -- at all. EG a large transaction might struggle to win the race against
      -- a firehose of tiny transactions.
      --
      -- However, we prefer to avoid the subtlety. Overflow is handled by the
      -- previous guard. And fairness is already ensured elsewhere (the 'MVar's
      -- in 'implAddTx' --- which the "Test.Consensus.Mempool.Fairness" test
      -- exercises). Moreover, the notion of "is under capacity" becomes
      -- difficult to assess independently of the pending tx when the measure
      -- is multi-dimensional; both typical options (any component is not full
      -- or every component is not full) lead to some confusing behaviors
      -- (denying some txs that would "obviously" fit and accepting some txs
      -- that "obviously" don't, respectively).
      --
      -- Even with the overflow handler, it's important that 'txMeasure'
      -- returns a well-bounded result. Otherwise, if an adversarial tx arrived
      -- that couldn't even fit in an empty mempool, then that thread would
      -- never release the 'MVar'. In particular, we tacitly assume here that a
      -- tx that wouldn't even fit in an empty mempool would be rejected by
      -- 'txMeasure'.
      | not ((currentSize `Measure.plus` txsz) Measure.<= isCapacity is)
      ->
        NotEnoughSpaceLeft
      | otherwise
      ->
        case extendVRNew cfg wti tx $ validationResultFromIS is of
          Left err ->
            rejectTx err
          Right (vtx, vr) ->
            let is' = internalStateFromVR vr
            in
            assert (isJust (vrNewValid vr)) $
              TryAddTx
                (Just is')
                (MempoolTxAdded vtx)
                (TraceMempoolAddedTx
                  vtx
                  (isMempoolSize is)
                  (isMempoolSize is')
                )
  where
    -- Total measure of the transactions currently in the mempool.
    currentSize = TxSeq.toSize (isTxs is)

    -- Outcome for a transaction that failed with the given error: no new
    -- state, a rejection result, and a rejection trace event carrying the
    -- size of the (unchanged) mempool.
    rejectTx err =
      TryAddTx
        Nothing
        (MempoolTxRejected tx err)
        (TraceMempoolRejectedTx
          tx
          err
          (isMempoolSize is)
        )

{-------------------------------------------------------------------------------
  Remove transactions
-------------------------------------------------------------------------------}

-- | A datatype containing the state resulting after removing the requested
-- transactions from the mempool and maybe a message to be traced while removing
-- them.
--
-- Produced by 'pureRemoveTxs'; 'implRemoveTxs' writes the state to the state
-- variable and traces the message, if there is one.
data RemoveTxs blk =
    -- | The new internal state, and optionally an event to trace.
    WriteRemoveTxs (InternalState blk) (Maybe (TraceEventMempool blk))

-- | See 'Ouroboros.Consensus.Mempool.API.removeTxs'.
--
-- Does nothing when given an empty list. Otherwise, in a single STM
-- transaction, it reads the mempool state and the current ledger state,
-- computes the new state with 'pureRemoveTxs' and writes it back. The trace
-- event, if any, is emitted after that transaction.
implRemoveTxs ::
     ( IOLike m
     , LedgerSupportsMempool blk
     , HasTxId (GenTx blk)
     , ValidateEnvelope blk
     )
  => MempoolEnv m blk
     -- ^ The mempool environment (state variable, ledger interface, tracer,
     -- ledger configuration and capacity override).
  -> [GenTxId blk]
     -- ^ The ids of the transactions to remove.
  -> m ()
implRemoveTxs menv txs
  | null txs = pure ()
  | otherwise = do
    tr <- atomically $ do
        is <- readTVar istate
        ls <- getCurrentLedgerState ldgrInterface
        let WriteRemoveTxs is' t = pureRemoveTxs cfg co txs is ls
        writeTVar istate is'
        pure t
    whenJust tr (traceWith trcr)
  where
    MempoolEnv { mpEnvStateVar = istate
               , mpEnvLedger = ldgrInterface
               , mpEnvTracer = trcr
               , mpEnvLedgerCfg = cfg
               , mpEnvCapacityOverride = co
               } = menv

-- | Craft a 'RemoveTxs' that manually removes the given transactions from the
-- mempool, returning inside it an updated InternalState.
--
-- The remaining transactions are revalidated (via 'revalidateTxsFor') against
-- the given ledger state ticked with 'ForgeInUnknownSlot'. The trace event is
-- 'Nothing' when the list of ids is empty; otherwise it is a
-- 'TraceMempoolManuallyRemovedTxs' carrying the requested ids, the
-- transactions found invalid during revalidation, and the new mempool size.
pureRemoveTxs ::
     ( LedgerSupportsMempool blk
     , HasTxId (GenTx blk)
     , ValidateEnvelope blk
     )
  => LedgerConfig blk
     -- ^ The configuration of the ledger.
  -> MempoolCapacityBytesOverride
     -- ^ Capacity override, passed through to 'revalidateTxsFor'.
  -> [GenTxId blk]
     -- ^ The ids of the transactions to remove.
  -> InternalState blk
     -- ^ The current internal state of the mempool.
  -> LedgerState blk
     -- ^ The ledger state to revalidate the remaining transactions against.
  -> RemoveTxs blk
pureRemoveTxs cfg capacityOverride txIds is lstate =
    -- Filtering traverses every ticket in the mempool, but this function will
    -- rarely be used, as it is an escape hatch when there's an inconsistency
    -- between the ledger and the mempool.
    let toRemove       = Set.fromList txIds
        -- Use the set's own membership test rather than the 'Foldable'
        -- 'notElem', which would scan the whole set for every ticket.
        txTickets'     = filter
                           (   (`Set.notMember` toRemove)
                             . txId
                             . txForgetValidated
                             . txTicketTx
                           )
                           (TxSeq.toList (isTxs is))
        (slot, ticked) = tickLedgerState cfg (ForgeInUnknownSlot lstate)
        vr             = revalidateTxsFor
                           capacityOverride
                           cfg
                           slot
                           ticked
                           (isLastTicketNo is)
                           txTickets'
        is'            = internalStateFromVR vr
        needsTrace     = if null txIds
                         then
                           Nothing
                         else
                           Just $ TraceMempoolManuallyRemovedTxs
                             txIds
                             (map fst (vrInvalid vr))
                             (isMempoolSize is')
    in WriteRemoveTxs is' needsTrace

{-------------------------------------------------------------------------------
  Sync with ledger
-------------------------------------------------------------------------------}

-- | The outcome of syncing the mempool with the ledger: the new state, a
-- snapshot of that state and, if needed, a tracing message.
data SyncWithLedger blk =
    NewSyncedState
      (InternalState blk)
      -- ^ The new mempool state produced by the sync.
      (MempoolSnapshot blk)
      -- ^ A snapshot of that new state.
      (Maybe (TraceEventMempool blk))
      -- ^ The trace event to emit, if any.

-- | See 'Ouroboros.Consensus.Mempool.API.syncWithLedger'.
--
-- In a single STM transaction this reads the current 'InternalState' and the
-- current ledger state, computes the synced state with 'pureSyncWithLedger'
-- and writes the new 'InternalState' back to the state variable.
--
-- The trace event, if 'pureSyncWithLedger' produced one, is emitted only after
-- the transaction has finished. The result is the 'MempoolSnapshot' that
-- 'pureSyncWithLedger' returned alongside the new state.
implSyncWithLedger ::
     (
       IOLike m
     , LedgerSupportsMempool blk
     , HasTxId (GenTx blk)
     , ValidateEnvelope blk
     )
  => MempoolEnv m blk
  -> m (MempoolSnapshot blk)
implSyncWithLedger menv = do
    (mTrace, snapshot) <- atomically $ do
      is <- readTVar istate
      ls <- getCurrentLedgerState ldgrInterface
      let NewSyncedState is' msp mTrace' = pureSyncWithLedger is ls cfg co
      writeTVar istate is'
      return (mTrace', msp)
    -- Tracing is a side effect in @m@, so it happens outside the transaction.
    whenJust mTrace (traceWith trcr)
    return snapshot
  where
    MempoolEnv { mpEnvStateVar         = istate
               , mpEnvLedger           = ldgrInterface
               , mpEnvTracer           = trcr
               , mpEnvLedgerCfg        = cfg
               , mpEnvCapacityOverride = co
               } = menv

-- | Create a 'SyncWithLedger' value representing the values that will need to
-- be stored for committing this synchronization with the Ledger.
--
-- The given 'InternalState' is passed through 'validateStateFor' together with
-- the ledger state wrapped in 'ForgeInUnknownSlot'. From the resulting
-- 'ValidationResult' this function derives:
--
-- * the new 'InternalState' (via 'internalStateFromVR');
-- * a 'MempoolSnapshot' of that new state (via 'snapshotFromIS');
-- * a 'TraceMempoolRemoveTxs' event carrying the entries of 'vrInvalid' and
--   the size of the new state, or 'Nothing' when 'vrInvalid' is empty.
--
-- See 'implSyncWithLedger' for how the result is committed.
pureSyncWithLedger ::
     (LedgerSupportsMempool blk, HasTxId (GenTx blk), ValidateEnvelope blk)
  => InternalState blk
  -> LedgerState blk
  -> LedgerConfig blk
  -> MempoolCapacityBytesOverride
  -> SyncWithLedger blk
pureSyncWithLedger istate lstate lcfg capacityOverride =
    let vr          = validateStateFor
                        capacityOverride
                        lcfg
                        (ForgeInUnknownSlot lstate)
                        istate
        removed     = vrInvalid vr
        istate'     = internalStateFromVR vr
        -- Only produce a trace event when something was actually removed.
        mTrace      = if null removed
                      then
                        Nothing
                      else
                        Just $ TraceMempoolRemoveTxs removed (isMempoolSize istate')
        snapshot    = snapshotFromIS istate'
    in
      NewSyncedState istate' snapshot mTrace