{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE ScopedTypeVariables #-}

-- | Operations that update the mempool. They are internally divided in the pure
-- and impure sides of the operation.
module Ouroboros.Consensus.Mempool.Update (
    implAddTx
  , implRemoveTxsEvenIfValid
  , implSyncWithLedger
  ) where

import           Cardano.Slotting.Slot
import           Control.Concurrent.Class.MonadMVar (withMVar)
import           Control.Monad (void)
import           Control.Monad.Except (runExcept)
import           Control.Tracer
import qualified Data.Foldable as Foldable
import           Data.Functor.Contravariant ((>$<))
import qualified Data.List.NonEmpty as NE
import           Data.Maybe (fromMaybe)
import qualified Data.Measure as Measure
import qualified Data.Set as Set
import           Data.Void
import           Ouroboros.Consensus.HeaderValidation
import           Ouroboros.Consensus.Ledger.Abstract
import           Ouroboros.Consensus.Ledger.SupportsMempool
import           Ouroboros.Consensus.Mempool.API
import           Ouroboros.Consensus.Mempool.Capacity
import           Ouroboros.Consensus.Mempool.Impl.Common
import           Ouroboros.Consensus.Mempool.TxSeq (TxTicket (..))
import qualified Ouroboros.Consensus.Mempool.TxSeq as TxSeq
import           Ouroboros.Consensus.Util (whenJust)
import           Ouroboros.Consensus.Util.Enclose
import           Ouroboros.Consensus.Util.IOLike hiding (withMVar)
import           Ouroboros.Consensus.Util.STM
import           Ouroboros.Network.Block

{-------------------------------------------------------------------------------
  Add transactions
-------------------------------------------------------------------------------}

-- | Add a single transaction to the mempool, blocking if there is no space.
implAddTx ::
     ( IOLike m
     , LedgerSupportsMempool blk
     , ValidateEnvelope blk
     , HasTxId (GenTx blk)
     )
  => MempoolEnv m blk
  -> AddTxOnBehalfOf
     -- ^ Whether we're acting on behalf of a remote peer or a local client.
  -> GenTx blk
     -- ^ The transaction to add to the mempool.
  -> m (MempoolAddTxResult blk)
implAddTx :: forall (m :: * -> *) blk.
(IOLike m, LedgerSupportsMempool blk, ValidateEnvelope blk,
 HasTxId (GenTx blk)) =>
MempoolEnv m blk
-> AddTxOnBehalfOf -> GenTx blk -> m (MempoolAddTxResult blk)
implAddTx MempoolEnv m blk
mpEnv AddTxOnBehalfOf
onbehalf GenTx blk
tx =
    -- To ensure fair behaviour between threads that are trying to add
    -- transactions, we make them all queue in a fifo. Only the one at the head
    -- of the queue gets to actually wait for space to get freed up in the
    -- mempool. This avoids small transactions repeatedly squeezing in ahead of
    -- larger transactions.
    --
    -- The fifo behaviour is implemented using a simple MVar. And take this
    -- MVar lock on a transaction by transaction basis. So if several threads
    -- are each trying to add several transactions, then they'll interleave at
    -- transaction granularity, not batches of transactions.
    --
    -- To add back in a bit of deliberate unfairness, we want to prioritise
    -- transactions being added on behalf of local clients, over ones being
    -- added on behalf of remote peers. We do this by using a pair of mvar
    -- fifos: remote peers must wait on both mvars, while local clients only
    -- need to wait on the second.
    case AddTxOnBehalfOf
onbehalf of
      AddTxOnBehalfOf
AddTxForRemotePeer ->
        MVar m ()
-> (() -> m (MempoolAddTxResult blk)) -> m (MempoolAddTxResult blk)
forall a b. MVar m a -> (a -> m b) -> m b
forall (m :: * -> *) a b.
MonadMVar m =>
MVar m a -> (a -> m b) -> m b
withMVar MVar m ()
remoteFifo ((() -> m (MempoolAddTxResult blk)) -> m (MempoolAddTxResult blk))
-> (() -> m (MempoolAddTxResult blk)) -> m (MempoolAddTxResult blk)
forall a b. (a -> b) -> a -> b
$ \() ->
        MVar m ()
-> (() -> m (MempoolAddTxResult blk)) -> m (MempoolAddTxResult blk)
forall a b. MVar m a -> (a -> m b) -> m b
forall (m :: * -> *) a b.
MonadMVar m =>
MVar m a -> (a -> m b) -> m b
withMVar MVar m ()
allFifo ((() -> m (MempoolAddTxResult blk)) -> m (MempoolAddTxResult blk))
-> (() -> m (MempoolAddTxResult blk)) -> m (MempoolAddTxResult blk)
forall a b. (a -> b) -> a -> b
$ \() ->
          -- This action can also block. Holding the MVars means
          -- there is only a single such thread blocking at once.
          m (MempoolAddTxResult blk)
implAddTx'

      AddTxOnBehalfOf
AddTxForLocalClient ->
        MVar m ()
-> (() -> m (MempoolAddTxResult blk)) -> m (MempoolAddTxResult blk)
forall a b. MVar m a -> (a -> m b) -> m b
forall (m :: * -> *) a b.
MonadMVar m =>
MVar m a -> (a -> m b) -> m b
withMVar MVar m ()
allFifo ((() -> m (MempoolAddTxResult blk)) -> m (MempoolAddTxResult blk))
-> (() -> m (MempoolAddTxResult blk)) -> m (MempoolAddTxResult blk)
forall a b. (a -> b) -> a -> b
$ \() ->
          -- As above but skip the first MVar fifo so we will get
          -- service sooner if there's lots of other remote
          -- threads waiting.
          m (MempoolAddTxResult blk)
implAddTx'
  where
    MempoolEnv {
        mpEnvAddTxsRemoteFifo :: forall (m :: * -> *) blk. MempoolEnv m blk -> MVar m ()
mpEnvAddTxsRemoteFifo = MVar m ()
remoteFifo
      , mpEnvAddTxsAllFifo :: forall (m :: * -> *) blk. MempoolEnv m blk -> MVar m ()
mpEnvAddTxsAllFifo = MVar m ()
allFifo
      , mpEnvTracer :: forall (m :: * -> *) blk.
MempoolEnv m blk -> Tracer m (TraceEventMempool blk)
mpEnvTracer = Tracer m (TraceEventMempool blk)
trcr
      } = MempoolEnv m blk
mpEnv

    implAddTx' :: m (MempoolAddTxResult blk)
implAddTx' = do
      TransactionProcessingResult _ result ev <-
        MempoolEnv m blk
-> WhetherToIntervene -> GenTx blk -> m (TransactionProcessed blk)
forall blk (m :: * -> *).
(LedgerSupportsMempool blk, HasTxId (GenTx blk),
 ValidateEnvelope blk, IOLike m) =>
MempoolEnv m blk
-> WhetherToIntervene -> GenTx blk -> m (TransactionProcessed blk)
doAddTx
          MempoolEnv m blk
mpEnv
          (AddTxOnBehalfOf -> WhetherToIntervene
whetherToIntervene AddTxOnBehalfOf
onbehalf)
          GenTx blk
tx
      traceWith trcr ev
      return result

    whetherToIntervene :: AddTxOnBehalfOf -> WhetherToIntervene
    whetherToIntervene :: AddTxOnBehalfOf -> WhetherToIntervene
whetherToIntervene AddTxOnBehalfOf
AddTxForRemotePeer  = WhetherToIntervene
DoNotIntervene
    whetherToIntervene AddTxOnBehalfOf
AddTxForLocalClient = WhetherToIntervene
Intervene

-- | Tried to add a transaction, was it processed or is there no space left?
data TriedToAddTx blk =
    -- | Adding the next transaction would put the mempool over capacity.
    NotEnoughSpaceLeft
  | Processed (TransactionProcessed blk)

-- | The new state, if the transaction was accepted
data TransactionProcessed blk =
  TransactionProcessingResult
    (Maybe (InternalState blk))
    -- ^ If the transaction was accepted, the new state that can be written to
    -- the TVar.
    (MempoolAddTxResult blk)
    -- ^ The result of trying to add the transaction to the mempool.
    (TraceEventMempool blk)
    -- ^ The event emitted by the operation.

-- | This function returns whether the transaction was added or rejected, and
-- will block if the mempool is full.
--
-- This function returns whether the transaction was added or rejected, or if
-- the Mempool capacity is reached. See 'implAddTx' for a function that blocks
-- in case the Mempool capacity is reached.
--
-- Transactions are added one by one, updating the Mempool each time one was
-- added successfully.
--
-- See the necessary invariants on the Haddock for 'API.addTxs'.
--
-- NOTE when using V1 LedgerDB: This function does not sync the Mempool contents
-- with the ledger state in case the latter changes in a way that doesn't
-- invalidate the db changelog, it relies on the background thread to do
-- that. If the db changelog is invalidated (by rolling back the last synced
-- ledger state), it will sync in-place.
--
-- INVARIANT: The code needs that read and writes on the state are coupled
-- together or inconsistencies will arise.
doAddTx ::
     ( LedgerSupportsMempool blk
     , HasTxId (GenTx blk)
     , ValidateEnvelope blk
     , IOLike m
     )
  => MempoolEnv m blk
  -> WhetherToIntervene
  -> GenTx blk
     -- ^ The transaction to add to the mempool.
  -> m (TransactionProcessed blk)
doAddTx :: forall blk (m :: * -> *).
(LedgerSupportsMempool blk, HasTxId (GenTx blk),
 ValidateEnvelope blk, IOLike m) =>
MempoolEnv m blk
-> WhetherToIntervene -> GenTx blk -> m (TransactionProcessed blk)
doAddTx MempoolEnv m blk
mpEnv WhetherToIntervene
wti GenTx blk
tx =
    Maybe MempoolSize -> m (TransactionProcessed blk)
doAddTx' Maybe MempoolSize
forall a. Maybe a
Nothing
  where
    MempoolEnv {
        mpEnvLedger :: forall (m :: * -> *) blk. MempoolEnv m blk -> LedgerInterface m blk
mpEnvLedger = LedgerInterface m blk
ldgrInterface
      , mpEnvLedgerCfg :: forall (m :: * -> *) blk. MempoolEnv m blk -> LedgerConfig blk
mpEnvLedgerCfg = LedgerCfg (LedgerState blk)
cfg
      , mpEnvStateVar :: forall (m :: * -> *) blk.
MempoolEnv m blk -> StrictTMVar m (InternalState blk)
mpEnvStateVar = StrictTMVar m (InternalState blk)
istate
      , mpEnvTracer :: forall (m :: * -> *) blk.
MempoolEnv m blk -> Tracer m (TraceEventMempool blk)
mpEnvTracer = Tracer m (TraceEventMempool blk)
trcr
      } = MempoolEnv m blk
mpEnv

    doAddTx' :: Maybe MempoolSize -> m (TransactionProcessed blk)
doAddTx' Maybe MempoolSize
mbPrevSize = do
      Tracer m (TraceEventMempool blk) -> TraceEventMempool blk -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceEventMempool blk)
trcr (TraceEventMempool blk -> m ()) -> TraceEventMempool blk -> m ()
forall a b. (a -> b) -> a -> b
$ GenTx blk -> TraceEventMempool blk
forall blk. GenTx blk -> TraceEventMempool blk
TraceMempoolAttemptingAdd GenTx blk
tx

      -- If retrying, wait until the mempool size changes before attempting to
      -- add the tx again
      let additionalCheck :: InternalState blk -> STM m ()
additionalCheck InternalState blk
is =
            case Maybe MempoolSize
mbPrevSize of
              Maybe MempoolSize
Nothing       -> () -> STM m ()
forall a. a -> STM m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
              Just MempoolSize
prevSize -> Bool -> STM m ()
forall (m :: * -> *). MonadSTM m => Bool -> STM m ()
check (Bool -> STM m ()) -> Bool -> STM m ()
forall a b. (a -> b) -> a -> b
$ InternalState blk -> MempoolSize
forall blk. TxLimits blk => InternalState blk -> MempoolSize
isMempoolSize InternalState blk
is MempoolSize -> MempoolSize -> Bool
forall a. Eq a => a -> a -> Bool
/= MempoolSize
prevSize

      res <- StrictTMVar m (InternalState blk)
-> (InternalState blk -> STM m ())
-> (InternalState blk
    -> ()
    -> m (WithTMVarOutcome MempoolSize (TransactionProcessed blk),
          InternalState blk))
-> m (WithTMVarOutcome MempoolSize (TransactionProcessed blk))
forall (m :: * -> *) a b c.
IOLike m =>
StrictTMVar m a -> (a -> STM m b) -> (a -> b -> m (c, a)) -> m c
withTMVarAnd StrictTMVar m (InternalState blk)
istate InternalState blk -> STM m ()
forall {m :: * -> *} {blk}.
(MonadSTM m, TxLimits blk) =>
InternalState blk -> STM m ()
additionalCheck
       ((InternalState blk
  -> ()
  -> m (WithTMVarOutcome MempoolSize (TransactionProcessed blk),
        InternalState blk))
 -> m (WithTMVarOutcome MempoolSize (TransactionProcessed blk)))
-> (InternalState blk
    -> ()
    -> m (WithTMVarOutcome MempoolSize (TransactionProcessed blk),
          InternalState blk))
-> m (WithTMVarOutcome MempoolSize (TransactionProcessed blk))
forall a b. (a -> b) -> a -> b
$ \InternalState blk
is () -> do
          mTbs <- LedgerInterface m blk
-> Point blk
-> LedgerTables (LedgerState blk) KeysMK
-> m (Maybe (LedgerTables (LedgerState blk) ValuesMK))
forall (m :: * -> *) blk.
LedgerInterface m blk
-> Point blk
-> LedgerTables (LedgerState blk) KeysMK
-> m (Maybe (LedgerTables (LedgerState blk) ValuesMK))
getLedgerTablesAtFor LedgerInterface m blk
ldgrInterface (InternalState blk -> Point blk
forall blk. InternalState blk -> Point blk
isTip InternalState blk
is) (GenTx blk -> LedgerTables (LedgerState blk) KeysMK
forall blk.
LedgerSupportsMempool blk =>
GenTx blk -> LedgerTables (LedgerState blk) KeysMK
getTransactionKeySets GenTx blk
tx)
          case mTbs of
            Just LedgerTables (LedgerState blk) ValuesMK
tbs -> do
              Tracer m (TraceEventMempool blk) -> TraceEventMempool blk -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceEventMempool blk)
trcr (TraceEventMempool blk -> m ()) -> TraceEventMempool blk -> m ()
forall a b. (a -> b) -> a -> b
$ Point blk -> TraceEventMempool blk
forall blk. Point blk -> TraceEventMempool blk
TraceMempoolLedgerFound (InternalState blk -> Point blk
forall blk. InternalState blk -> Point blk
isTip InternalState blk
is)
              case LedgerCfg (LedgerState blk)
-> WhetherToIntervene
-> GenTx blk
-> InternalState blk
-> LedgerTables (LedgerState blk) ValuesMK
-> TriedToAddTx blk
forall blk.
(LedgerSupportsMempool blk, HasTxId (GenTx blk)) =>
LedgerCfg (LedgerState blk)
-> WhetherToIntervene
-> GenTx blk
-> InternalState blk
-> LedgerTables (LedgerState blk) ValuesMK
-> TriedToAddTx blk
pureTryAddTx LedgerCfg (LedgerState blk)
cfg WhetherToIntervene
wti GenTx blk
tx InternalState blk
is LedgerTables (LedgerState blk) ValuesMK
tbs of
                TriedToAddTx blk
NotEnoughSpaceLeft -> do
                  (WithTMVarOutcome MempoolSize (TransactionProcessed blk),
 InternalState blk)
-> m (WithTMVarOutcome MempoolSize (TransactionProcessed blk),
      InternalState blk)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (MempoolSize
-> WithTMVarOutcome MempoolSize (TransactionProcessed blk)
forall retry ok. retry -> WithTMVarOutcome retry ok
Retry (InternalState blk -> MempoolSize
forall blk. TxLimits blk => InternalState blk -> MempoolSize
isMempoolSize InternalState blk
is), InternalState blk
is)
                Processed outcome :: TransactionProcessed blk
outcome@(TransactionProcessingResult Maybe (InternalState blk)
is' MempoolAddTxResult blk
_ TraceEventMempool blk
_) -> do
                  (WithTMVarOutcome MempoolSize (TransactionProcessed blk),
 InternalState blk)
-> m (WithTMVarOutcome MempoolSize (TransactionProcessed blk),
      InternalState blk)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (TransactionProcessed blk
-> WithTMVarOutcome MempoolSize (TransactionProcessed blk)
forall retry ok. ok -> WithTMVarOutcome retry ok
OK TransactionProcessed blk
outcome, InternalState blk -> Maybe (InternalState blk) -> InternalState blk
forall a. a -> Maybe a -> a
fromMaybe InternalState blk
is Maybe (InternalState blk)
is')
            Maybe (LedgerTables (LedgerState blk) ValuesMK)
Nothing -> do
              Tracer m (TraceEventMempool blk) -> TraceEventMempool blk -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceEventMempool blk)
trcr (TraceEventMempool blk -> m ()) -> TraceEventMempool blk -> m ()
forall a b. (a -> b) -> a -> b
$ Point blk -> TraceEventMempool blk
forall blk. Point blk -> TraceEventMempool blk
TraceMempoolLedgerNotFound (InternalState blk -> Point blk
forall blk. InternalState blk -> Point blk
isTip InternalState blk
is)
              -- We couldn't retrieve the values because the state is no longer on
              -- the db. We need to resync.
              (WithTMVarOutcome MempoolSize (TransactionProcessed blk),
 InternalState blk)
-> m (WithTMVarOutcome MempoolSize (TransactionProcessed blk),
      InternalState blk)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (WithTMVarOutcome MempoolSize (TransactionProcessed blk)
forall retry ok. WithTMVarOutcome retry ok
Resync, InternalState blk
is)
      case res of
        Retry MempoolSize
s' -> Maybe MempoolSize -> m (TransactionProcessed blk)
doAddTx' (MempoolSize -> Maybe MempoolSize
forall a. a -> Maybe a
Just MempoolSize
s')
        OK TransactionProcessed blk
outcome -> TransactionProcessed blk -> m (TransactionProcessed blk)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure TransactionProcessed blk
outcome
        WithTMVarOutcome MempoolSize (TransactionProcessed blk)
Resync -> do
          m (MempoolSnapshot blk) -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m (MempoolSnapshot blk) -> m ())
-> m (MempoolSnapshot blk) -> m ()
forall a b. (a -> b) -> a -> b
$ MempoolEnv m blk -> m (MempoolSnapshot blk)
forall (m :: * -> *) blk.
(IOLike m, LedgerSupportsMempool blk, ValidateEnvelope blk,
 HasTxId (GenTx blk)) =>
MempoolEnv m blk -> m (MempoolSnapshot blk)
implSyncWithLedger MempoolEnv m blk
mpEnv
          Maybe MempoolSize -> m (TransactionProcessed blk)
doAddTx' Maybe MempoolSize
mbPrevSize

data WithTMVarOutcome retry ok =
    Retry !retry
  | OK ok
  | Resync

pureTryAddTx ::
     ( LedgerSupportsMempool blk
     , HasTxId (GenTx blk)
     )
  => LedgerCfg (LedgerState blk)
     -- ^ The ledger configuration.
  -> WhetherToIntervene
  -> GenTx blk
     -- ^ The transaction to add to the mempool.
  -> InternalState blk
     -- ^ The current internal state of the mempool.
  -> LedgerTables (LedgerState blk) ValuesMK
  -> TriedToAddTx blk
pureTryAddTx :: forall blk.
(LedgerSupportsMempool blk, HasTxId (GenTx blk)) =>
LedgerCfg (LedgerState blk)
-> WhetherToIntervene
-> GenTx blk
-> InternalState blk
-> LedgerTables (LedgerState blk) ValuesMK
-> TriedToAddTx blk
pureTryAddTx LedgerCfg (LedgerState blk)
cfg WhetherToIntervene
wti GenTx blk
tx InternalState blk
is LedgerTables (LedgerState blk) ValuesMK
values =
  let st :: TickedLedgerState blk ValuesMK
st = LedgerTables (LedgerState blk) ValuesMK
-> LedgerTables (LedgerState blk) KeysMK
-> TickedLedgerState blk DiffMK
-> TickedLedgerState blk ValuesMK
forall blk.
LedgerSupportsMempool blk =>
LedgerTables (LedgerState blk) ValuesMK
-> LedgerTables (LedgerState blk) KeysMK
-> TickedLedgerState blk DiffMK
-> TickedLedgerState blk ValuesMK
applyMempoolDiffs LedgerTables (LedgerState blk) ValuesMK
values (GenTx blk -> LedgerTables (LedgerState blk) KeysMK
forall blk.
LedgerSupportsMempool blk =>
GenTx blk -> LedgerTables (LedgerState blk) KeysMK
getTransactionKeySets GenTx blk
tx) (InternalState blk -> TickedLedgerState blk DiffMK
forall blk. InternalState blk -> TickedLedgerState blk DiffMK
isLedgerState InternalState blk
is) in
  case Except (ApplyTxErr blk) (TxMeasure blk)
-> Either (ApplyTxErr blk) (TxMeasure blk)
forall e a. Except e a -> Either e a
runExcept (Except (ApplyTxErr blk) (TxMeasure blk)
 -> Either (ApplyTxErr blk) (TxMeasure blk))
-> Except (ApplyTxErr blk) (TxMeasure blk)
-> Either (ApplyTxErr blk) (TxMeasure blk)
forall a b. (a -> b) -> a -> b
$ LedgerCfg (LedgerState blk)
-> TickedLedgerState blk ValuesMK
-> GenTx blk
-> Except (ApplyTxErr blk) (TxMeasure blk)
forall blk.
TxLimits blk =>
LedgerConfig blk
-> TickedLedgerState blk ValuesMK
-> GenTx blk
-> Except (ApplyTxErr blk) (TxMeasure blk)
txMeasure LedgerCfg (LedgerState blk)
cfg TickedLedgerState blk ValuesMK
st GenTx blk
tx of
    Left ApplyTxErr blk
err ->
      -- The transaction does not have a valid measure (eg its ExUnits is
      -- greater than what this ledger state allows for a single transaction).
      --
      -- It might seem simpler to remove the failure case from 'txMeasure' and
      -- simply fully validate the tx before determining whether it'd fit in
      -- the mempool; that way we could reject invalid txs ASAP. However, for a
      -- valid tx, we'd pay that validation cost every time the node's
      -- selection changed, even if the tx wouldn't fit. So it'd very much be
      -- as if the mempool were effectively over capacity! What's worse, each
      -- attempt would not be using 'extendVRPrevApplied'.
      TransactionProcessed blk -> TriedToAddTx blk
forall blk. TransactionProcessed blk -> TriedToAddTx blk
Processed (TransactionProcessed blk -> TriedToAddTx blk)
-> TransactionProcessed blk -> TriedToAddTx blk
forall a b. (a -> b) -> a -> b
$ Maybe (InternalState blk)
-> MempoolAddTxResult blk
-> TraceEventMempool blk
-> TransactionProcessed blk
forall blk.
Maybe (InternalState blk)
-> MempoolAddTxResult blk
-> TraceEventMempool blk
-> TransactionProcessed blk
TransactionProcessingResult
        Maybe (InternalState blk)
forall a. Maybe a
Nothing
        (GenTx blk -> ApplyTxErr blk -> MempoolAddTxResult blk
forall blk. GenTx blk -> ApplyTxErr blk -> MempoolAddTxResult blk
MempoolTxRejected GenTx blk
tx ApplyTxErr blk
err)
        (GenTx blk -> ApplyTxErr blk -> MempoolSize -> TraceEventMempool blk
forall blk.
GenTx blk -> ApplyTxErr blk -> MempoolSize -> TraceEventMempool blk
TraceMempoolRejectedTx
         GenTx blk
tx
         ApplyTxErr blk
err
         (InternalState blk -> MempoolSize
forall blk. TxLimits blk => InternalState blk -> MempoolSize
isMempoolSize InternalState blk
is)
        )
    Right TxMeasure blk
txsz
      -- Check for overflow
      --
      -- No measure of a transaction can ever be negative, so the only way
      -- adding two measures could result in a smaller measure is if some
      -- modular arithmetic overflowed. Also, overflow necessarily yields a
      -- lesser result, since adding 'maxBound' is modularly equivalent to
      -- subtracting one. Recall that we're checking each individual addition.
      --
      -- We assume that the 'txMeasure' limit and the mempool capacity
      -- 'isCapacity' are much smaller than the modulus, and so this should
      -- never happen. Despite that, blocking until adding the transaction
      -- doesn't overflow seems like a reasonable way to handle this case.
     | Bool -> Bool
not (Bool -> Bool) -> Bool -> Bool
forall a b. (a -> b) -> a -> b
$ TxMeasure blk
currentSize TxMeasure blk -> TxMeasure blk -> Bool
forall a. Measure a => a -> a -> Bool
Measure.<= TxMeasure blk
currentSize TxMeasure blk -> TxMeasure blk -> TxMeasure blk
forall a. Measure a => a -> a -> a
`Measure.plus` TxMeasure blk
txsz
     ->
       TriedToAddTx blk
forall blk. TriedToAddTx blk
NotEnoughSpaceLeft
      -- We add the transaction if and only if it wouldn't overrun any component
      -- of the mempool capacity.
      --
      -- In the past, this condition was instead @TxSeq.toSize (isTxs is) <
      -- isCapacity is@. Thus the effective capacity of the mempool was
      -- actually one increment less than the reported capacity plus one
      -- transaction. That subtlety's cost paid for two benefits.
      --
      -- First, the absence of addition avoids a risk of overflow, since the
      -- transaction's sizes (eg ExUnits) have not yet been bounded by
      -- validation (which presumably enforces a low enough bound that any
      -- reasonably-sized mempool would never overflow the representation's
      -- 'maxBound').
      --
      -- Second, it is more fair, since it does not depend on the transaction
      -- at all. EG a large transaction might struggle to win the race against
      -- a firehose of tiny transactions.
      --
      -- However, we prefer to avoid the subtlety. Overflow is handled by the
      -- previous guard. And fairness is already ensured elsewhere (the 'MVar's
      -- in 'implAddTx' --- which the "Test.Consensus.Mempool.Fairness" test
      -- exercises). Moreover, the notion of "is under capacity" becomes
      -- difficult to assess independently of the pending tx when the measure
      -- is multi-dimensional; both typical options (any component is not full
      -- or every component is not full) lead to some confusing behaviors
      -- (denying some txs that would "obviously" fit and accepting some txs
      -- that "obviously" don't, respectively).
      --
      -- Even with the overflow handler, it's important that 'txMeasure'
      -- returns a well-bounded result. Otherwise, if an adversarial tx arrived
      -- that could't even fit in an empty mempool, then that thread would
      -- never release the 'MVar'. In particular, we tacitly assume here that a
      -- tx that wouldn't even fit in an empty mempool would be rejected by
      -- 'txMeasure'.
      | Bool -> Bool
not (Bool -> Bool) -> Bool -> Bool
forall a b. (a -> b) -> a -> b
$ TxMeasure blk
currentSize TxMeasure blk -> TxMeasure blk -> TxMeasure blk
forall a. Measure a => a -> a -> a
`Measure.plus` TxMeasure blk
txsz TxMeasure blk -> TxMeasure blk -> Bool
forall a. Measure a => a -> a -> Bool
Measure.<= InternalState blk -> TxMeasure blk
forall blk. InternalState blk -> TxMeasure blk
isCapacity InternalState blk
is
      ->
        TriedToAddTx blk
forall blk. TriedToAddTx blk
NotEnoughSpaceLeft
      | Bool
otherwise
      ->
        case LedgerCfg (LedgerState blk)
-> WhetherToIntervene
-> GenTx blk
-> TxMeasure blk
-> LedgerTables (LedgerState blk) ValuesMK
-> TickedLedgerState blk ValuesMK
-> InternalState blk
-> (Either (ApplyTxErr blk) (Validated (GenTx blk)),
    InternalState blk)
forall blk.
(LedgerSupportsMempool blk, HasTxId (GenTx blk)) =>
LedgerConfig blk
-> WhetherToIntervene
-> GenTx blk
-> TxMeasure blk
-> LedgerTables (LedgerState blk) ValuesMK
-> TickedLedgerState blk ValuesMK
-> InternalState blk
-> (Either (ApplyTxErr blk) (Validated (GenTx blk)),
    InternalState blk)
validateNewTransaction LedgerCfg (LedgerState blk)
cfg WhetherToIntervene
wti GenTx blk
tx TxMeasure blk
txsz LedgerTables (LedgerState blk) ValuesMK
values TickedLedgerState blk ValuesMK
st InternalState blk
is of
          (Left ApplyTxErr blk
err, InternalState blk
_) ->
            TransactionProcessed blk -> TriedToAddTx blk
forall blk. TransactionProcessed blk -> TriedToAddTx blk
Processed (TransactionProcessed blk -> TriedToAddTx blk)
-> TransactionProcessed blk -> TriedToAddTx blk
forall a b. (a -> b) -> a -> b
$ Maybe (InternalState blk)
-> MempoolAddTxResult blk
-> TraceEventMempool blk
-> TransactionProcessed blk
forall blk.
Maybe (InternalState blk)
-> MempoolAddTxResult blk
-> TraceEventMempool blk
-> TransactionProcessed blk
TransactionProcessingResult
              Maybe (InternalState blk)
forall a. Maybe a
Nothing
              (GenTx blk -> ApplyTxErr blk -> MempoolAddTxResult blk
forall blk. GenTx blk -> ApplyTxErr blk -> MempoolAddTxResult blk
MempoolTxRejected GenTx blk
tx ApplyTxErr blk
err)
              (GenTx blk -> ApplyTxErr blk -> MempoolSize -> TraceEventMempool blk
forall blk.
GenTx blk -> ApplyTxErr blk -> MempoolSize -> TraceEventMempool blk
TraceMempoolRejectedTx
               GenTx blk
tx
               ApplyTxErr blk
err
               (InternalState blk -> MempoolSize
forall blk. TxLimits blk => InternalState blk -> MempoolSize
isMempoolSize InternalState blk
is)
              )
          (Right Validated (GenTx blk)
vtx, InternalState blk
is') ->
              TransactionProcessed blk -> TriedToAddTx blk
forall blk. TransactionProcessed blk -> TriedToAddTx blk
Processed (TransactionProcessed blk -> TriedToAddTx blk)
-> TransactionProcessed blk -> TriedToAddTx blk
forall a b. (a -> b) -> a -> b
$ Maybe (InternalState blk)
-> MempoolAddTxResult blk
-> TraceEventMempool blk
-> TransactionProcessed blk
forall blk.
Maybe (InternalState blk)
-> MempoolAddTxResult blk
-> TraceEventMempool blk
-> TransactionProcessed blk
TransactionProcessingResult
                (InternalState blk -> Maybe (InternalState blk)
forall a. a -> Maybe a
Just InternalState blk
is')
                (Validated (GenTx blk) -> MempoolAddTxResult blk
forall blk. Validated (GenTx blk) -> MempoolAddTxResult blk
MempoolTxAdded Validated (GenTx blk)
vtx)
                (Validated (GenTx blk)
-> MempoolSize -> MempoolSize -> TraceEventMempool blk
forall blk.
Validated (GenTx blk)
-> MempoolSize -> MempoolSize -> TraceEventMempool blk
TraceMempoolAddedTx
                  Validated (GenTx blk)
vtx
                  (InternalState blk -> MempoolSize
forall blk. TxLimits blk => InternalState blk -> MempoolSize
isMempoolSize InternalState blk
is)
                  (InternalState blk -> MempoolSize
forall blk. TxLimits blk => InternalState blk -> MempoolSize
isMempoolSize InternalState blk
is')
                )
  where
    currentSize :: TxMeasure blk
currentSize = TxSeq (TxMeasure blk) (Validated (GenTx blk)) -> TxMeasure blk
forall sz tx. Measure sz => TxSeq sz tx -> sz
TxSeq.toSize (InternalState blk -> TxSeq (TxMeasure blk) (Validated (GenTx blk))
forall blk.
InternalState blk -> TxSeq (TxMeasure blk) (Validated (GenTx blk))
isTxs InternalState blk
is)

{-------------------------------------------------------------------------------
  Remove transactions
-------------------------------------------------------------------------------}

-- | See 'Ouroboros.Consensus.Mempool.API.removeTxsEvenIfValid'.
implRemoveTxsEvenIfValid ::
     ( IOLike m
     , LedgerSupportsMempool blk
     , HasTxId (GenTx blk)
     , ValidateEnvelope blk
     )
   => MempoolEnv m blk
   -> NE.NonEmpty (GenTxId blk)
   -> m ()
implRemoveTxsEvenIfValid :: forall (m :: * -> *) blk.
(IOLike m, LedgerSupportsMempool blk, HasTxId (GenTx blk),
 ValidateEnvelope blk) =>
MempoolEnv m blk -> NonEmpty (GenTxId blk) -> m ()
implRemoveTxsEvenIfValid MempoolEnv m blk
mpEnv NonEmpty (TxId (GenTx blk))
toRemove = do
  (out :: WithTMVarOutcome Void ()) <- StrictTMVar m (InternalState blk)
-> (InternalState blk -> STM m (LedgerState blk EmptyMK))
-> (InternalState blk
    -> LedgerState blk EmptyMK
    -> m (WithTMVarOutcome Void (), InternalState blk))
-> m (WithTMVarOutcome Void ())
forall (m :: * -> *) a b c.
IOLike m =>
StrictTMVar m a -> (a -> STM m b) -> (a -> b -> m (c, a)) -> m c
withTMVarAnd StrictTMVar m (InternalState blk)
istate (STM m (LedgerState blk EmptyMK)
-> InternalState blk -> STM m (LedgerState blk EmptyMK)
forall a b. a -> b -> a
const (STM m (LedgerState blk EmptyMK)
 -> InternalState blk -> STM m (LedgerState blk EmptyMK))
-> STM m (LedgerState blk EmptyMK)
-> InternalState blk
-> STM m (LedgerState blk EmptyMK)
forall a b. (a -> b) -> a -> b
$ LedgerInterface m blk -> STM m (LedgerState blk EmptyMK)
forall (m :: * -> *) blk.
LedgerInterface m blk -> STM m (LedgerState blk EmptyMK)
getCurrentLedgerState LedgerInterface m blk
ldgrInterface)
   ((InternalState blk
  -> LedgerState blk EmptyMK
  -> m (WithTMVarOutcome Void (), InternalState blk))
 -> m (WithTMVarOutcome Void ()))
-> (InternalState blk
    -> LedgerState blk EmptyMK
    -> m (WithTMVarOutcome Void (), InternalState blk))
-> m (WithTMVarOutcome Void ())
forall a b. (a -> b) -> a -> b
$ \InternalState blk
is LedgerState blk EmptyMK
ls -> do
    let toKeep :: [TxTicket (TxMeasure blk) (Validated (GenTx blk))]
toKeep = (TxTicket (TxMeasure blk) (Validated (GenTx blk)) -> Bool)
-> [TxTicket (TxMeasure blk) (Validated (GenTx blk))]
-> [TxTicket (TxMeasure blk) (Validated (GenTx blk))]
forall a. (a -> Bool) -> [a] -> [a]
filter
                 (   (TxId (GenTx blk) -> Set (TxId (GenTx blk)) -> Bool
forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
`notElem` [TxId (GenTx blk)] -> Set (TxId (GenTx blk))
forall a. Ord a => [a] -> Set a
Set.fromList (NonEmpty (TxId (GenTx blk)) -> [TxId (GenTx blk)]
forall a. NonEmpty a -> [a]
NE.toList NonEmpty (TxId (GenTx blk))
toRemove))
                     (TxId (GenTx blk) -> Bool)
-> (TxTicket (TxMeasure blk) (Validated (GenTx blk))
    -> TxId (GenTx blk))
-> TxTicket (TxMeasure blk) (Validated (GenTx blk))
-> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. GenTx blk -> TxId (GenTx blk)
forall tx. HasTxId tx => tx -> TxId tx
txId
                     (GenTx blk -> TxId (GenTx blk))
-> (TxTicket (TxMeasure blk) (Validated (GenTx blk)) -> GenTx blk)
-> TxTicket (TxMeasure blk) (Validated (GenTx blk))
-> TxId (GenTx blk)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Validated (GenTx blk) -> GenTx blk
forall blk.
LedgerSupportsMempool blk =>
Validated (GenTx blk) -> GenTx blk
txForgetValidated
                     (Validated (GenTx blk) -> GenTx blk)
-> (TxTicket (TxMeasure blk) (Validated (GenTx blk))
    -> Validated (GenTx blk))
-> TxTicket (TxMeasure blk) (Validated (GenTx blk))
-> GenTx blk
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TxTicket (TxMeasure blk) (Validated (GenTx blk))
-> Validated (GenTx blk)
forall sz tx. TxTicket sz tx -> tx
txTicketTx
                 )
                 (TxSeq (TxMeasure blk) (Validated (GenTx blk))
-> [TxTicket (TxMeasure blk) (Validated (GenTx blk))]
forall sz tx. TxSeq sz tx -> [TxTicket sz tx]
TxSeq.toList (TxSeq (TxMeasure blk) (Validated (GenTx blk))
 -> [TxTicket (TxMeasure blk) (Validated (GenTx blk))])
-> TxSeq (TxMeasure blk) (Validated (GenTx blk))
-> [TxTicket (TxMeasure blk) (Validated (GenTx blk))]
forall a b. (a -> b) -> a -> b
$ InternalState blk -> TxSeq (TxMeasure blk) (Validated (GenTx blk))
forall blk.
InternalState blk -> TxSeq (TxMeasure blk) (Validated (GenTx blk))
isTxs InternalState blk
is)
        (SlotNo
slot, TickedLedgerState blk DiffMK
ticked) = LedgerCfg (LedgerState blk)
-> ForgeLedgerState blk -> (SlotNo, TickedLedgerState blk DiffMK)
forall blk.
(UpdateLedger blk, ValidateEnvelope blk) =>
LedgerConfig blk
-> ForgeLedgerState blk -> (SlotNo, TickedLedgerState blk DiffMK)
tickLedgerState LedgerCfg (LedgerState blk)
cfg (LedgerState blk EmptyMK -> ForgeLedgerState blk
forall blk. LedgerState blk EmptyMK -> ForgeLedgerState blk
ForgeInUnknownSlot LedgerState blk EmptyMK
ls)
        toKeep' :: LedgerTables (LedgerState blk) KeysMK
toKeep' = (TxTicket (TxMeasure blk) (Validated (GenTx blk))
 -> LedgerTables (LedgerState blk) KeysMK)
-> [TxTicket (TxMeasure blk) (Validated (GenTx blk))]
-> LedgerTables (LedgerState blk) KeysMK
forall m a. Monoid m => (a -> m) -> [a] -> m
forall (t :: * -> *) m a.
(Foldable t, Monoid m) =>
(a -> m) -> t a -> m
Foldable.foldMap' (GenTx blk -> LedgerTables (LedgerState blk) KeysMK
forall blk.
LedgerSupportsMempool blk =>
GenTx blk -> LedgerTables (LedgerState blk) KeysMK
getTransactionKeySets (GenTx blk -> LedgerTables (LedgerState blk) KeysMK)
-> (TxTicket (TxMeasure blk) (Validated (GenTx blk)) -> GenTx blk)
-> TxTicket (TxMeasure blk) (Validated (GenTx blk))
-> LedgerTables (LedgerState blk) KeysMK
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Validated (GenTx blk) -> GenTx blk
forall blk.
LedgerSupportsMempool blk =>
Validated (GenTx blk) -> GenTx blk
txForgetValidated (Validated (GenTx blk) -> GenTx blk)
-> (TxTicket (TxMeasure blk) (Validated (GenTx blk))
    -> Validated (GenTx blk))
-> TxTicket (TxMeasure blk) (Validated (GenTx blk))
-> GenTx blk
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TxTicket (TxMeasure blk) (Validated (GenTx blk))
-> Validated (GenTx blk)
forall sz tx. TxTicket sz tx -> tx
TxSeq.txTicketTx) [TxTicket (TxMeasure blk) (Validated (GenTx blk))]
toKeep
    mTbs <- LedgerInterface m blk
-> Point blk
-> LedgerTables (LedgerState blk) KeysMK
-> m (Maybe (LedgerTables (LedgerState blk) ValuesMK))
forall (m :: * -> *) blk.
LedgerInterface m blk
-> Point blk
-> LedgerTables (LedgerState blk) KeysMK
-> m (Maybe (LedgerTables (LedgerState blk) ValuesMK))
getLedgerTablesAtFor LedgerInterface m blk
ldgrInterface (Point (LedgerState blk) -> Point blk
forall {k1} {k2} (b :: k1) (b' :: k2).
Coercible (HeaderHash b) (HeaderHash b') =>
Point b -> Point b'
castPoint (LedgerState blk EmptyMK -> Point (LedgerState blk)
forall (mk :: MapKind).
LedgerState blk mk -> Point (LedgerState blk)
forall (l :: LedgerStateKind) (mk :: MapKind).
GetTip l =>
l mk -> Point l
getTip LedgerState blk EmptyMK
ls)) LedgerTables (LedgerState blk) KeysMK
toKeep'
    case mTbs of
      Maybe (LedgerTables (LedgerState blk) ValuesMK)
Nothing -> (WithTMVarOutcome Void (), InternalState blk)
-> m (WithTMVarOutcome Void (), InternalState blk)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (WithTMVarOutcome Void ()
forall retry ok. WithTMVarOutcome retry ok
Resync, InternalState blk
is)
      Just LedgerTables (LedgerState blk) ValuesMK
tbs -> do
        let (InternalState blk
is', TraceEventMempool blk
t) = MempoolCapacityBytesOverride
-> LedgerCfg (LedgerState blk)
-> SlotNo
-> TickedLedgerState blk DiffMK
-> LedgerTables (LedgerState blk) ValuesMK
-> TicketNo
-> [TxTicket (TxMeasure blk) (Validated (GenTx blk))]
-> NonEmpty (TxId (GenTx blk))
-> (InternalState blk, TraceEventMempool blk)
forall blk.
(LedgerSupportsMempool blk, HasTxId (GenTx blk)) =>
MempoolCapacityBytesOverride
-> LedgerConfig blk
-> SlotNo
-> TickedLedgerState blk DiffMK
-> LedgerTables (LedgerState blk) ValuesMK
-> TicketNo
-> [TxTicket (TxMeasure blk) (Validated (GenTx blk))]
-> NonEmpty (GenTxId blk)
-> (InternalState blk, TraceEventMempool blk)
pureRemoveTxs
                         MempoolCapacityBytesOverride
capacityOverride
                         LedgerCfg (LedgerState blk)
cfg
                         SlotNo
slot
                         TickedLedgerState blk DiffMK
ticked
                         LedgerTables (LedgerState blk) ValuesMK
tbs
                         (InternalState blk -> TicketNo
forall blk. InternalState blk -> TicketNo
isLastTicketNo InternalState blk
is)
                         [TxTicket (TxMeasure blk) (Validated (GenTx blk))]
toKeep
                         NonEmpty (TxId (GenTx blk))
toRemove
        Tracer m (TraceEventMempool blk) -> TraceEventMempool blk -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceEventMempool blk)
trcr TraceEventMempool blk
t
        (WithTMVarOutcome Void (), InternalState blk)
-> m (WithTMVarOutcome Void (), InternalState blk)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (() -> WithTMVarOutcome Void ()
forall retry ok. ok -> WithTMVarOutcome retry ok
OK (), InternalState blk
is')
  case out of
    WithTMVarOutcome Void ()
Resync  -> do
      m (MempoolSnapshot blk) -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m (MempoolSnapshot blk) -> m ())
-> m (MempoolSnapshot blk) -> m ()
forall a b. (a -> b) -> a -> b
$ MempoolEnv m blk -> m (MempoolSnapshot blk)
forall (m :: * -> *) blk.
(IOLike m, LedgerSupportsMempool blk, ValidateEnvelope blk,
 HasTxId (GenTx blk)) =>
MempoolEnv m blk -> m (MempoolSnapshot blk)
implSyncWithLedger MempoolEnv m blk
mpEnv
      MempoolEnv m blk -> NonEmpty (TxId (GenTx blk)) -> m ()
forall (m :: * -> *) blk.
(IOLike m, LedgerSupportsMempool blk, HasTxId (GenTx blk),
 ValidateEnvelope blk) =>
MempoolEnv m blk -> NonEmpty (GenTxId blk) -> m ()
implRemoveTxsEvenIfValid MempoolEnv m blk
mpEnv NonEmpty (TxId (GenTx blk))
toRemove
    OK ()   -> () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
  where
    MempoolEnv { mpEnvStateVar :: forall (m :: * -> *) blk.
MempoolEnv m blk -> StrictTMVar m (InternalState blk)
mpEnvStateVar         = StrictTMVar m (InternalState blk)
istate
               , mpEnvLedger :: forall (m :: * -> *) blk. MempoolEnv m blk -> LedgerInterface m blk
mpEnvLedger           = LedgerInterface m blk
ldgrInterface
               , mpEnvTracer :: forall (m :: * -> *) blk.
MempoolEnv m blk -> Tracer m (TraceEventMempool blk)
mpEnvTracer           = Tracer m (TraceEventMempool blk)
trcr
               , mpEnvLedgerCfg :: forall (m :: * -> *) blk. MempoolEnv m blk -> LedgerConfig blk
mpEnvLedgerCfg        = LedgerCfg (LedgerState blk)
cfg
               , mpEnvCapacityOverride :: forall (m :: * -> *) blk.
MempoolEnv m blk -> MempoolCapacityBytesOverride
mpEnvCapacityOverride = MempoolCapacityBytesOverride
capacityOverride
               } = MempoolEnv m blk
mpEnv

-- | Craft a 'RemoveTxs' that manually removes the given transactions from the
-- mempool, returning inside it an updated InternalState.
pureRemoveTxs ::
     ( LedgerSupportsMempool blk
     , HasTxId (GenTx blk)
     )
  => MempoolCapacityBytesOverride
  -> LedgerConfig blk
  -> SlotNo
  -> TickedLedgerState blk DiffMK
  -> LedgerTables (LedgerState blk) ValuesMK
  -> TicketNo
  -> [TxTicket (TxMeasure blk) (Validated (GenTx blk))] -- ^ Txs to keep
  -> NE.NonEmpty (GenTxId blk) -- ^ IDs to remove
  -> (InternalState blk, TraceEventMempool blk)
pureRemoveTxs :: forall blk.
(LedgerSupportsMempool blk, HasTxId (GenTx blk)) =>
MempoolCapacityBytesOverride
-> LedgerConfig blk
-> SlotNo
-> TickedLedgerState blk DiffMK
-> LedgerTables (LedgerState blk) ValuesMK
-> TicketNo
-> [TxTicket (TxMeasure blk) (Validated (GenTx blk))]
-> NonEmpty (GenTxId blk)
-> (InternalState blk, TraceEventMempool blk)
pureRemoveTxs MempoolCapacityBytesOverride
capacityOverride LedgerCfg (LedgerState blk)
lcfg SlotNo
slot TickedLedgerState blk DiffMK
lstate LedgerTables (LedgerState blk) ValuesMK
values TicketNo
tkt [TxTicket (TxMeasure blk) (Validated (GenTx blk))]
txs NonEmpty (TxId (GenTx blk))
txIds =
    let RevalidateTxsResult InternalState blk
is' [Invalidated blk]
removed =
          MempoolCapacityBytesOverride
-> LedgerCfg (LedgerState blk)
-> SlotNo
-> TickedLedgerState blk DiffMK
-> LedgerTables (LedgerState blk) ValuesMK
-> TicketNo
-> [TxTicket (TxMeasure blk) (Validated (GenTx blk))]
-> RevalidateTxsResult blk
forall blk.
(LedgerSupportsMempool blk, HasTxId (GenTx blk)) =>
MempoolCapacityBytesOverride
-> LedgerConfig blk
-> SlotNo
-> TickedLedgerState blk DiffMK
-> LedgerTables (LedgerState blk) ValuesMK
-> TicketNo
-> [TxTicket (TxMeasure blk) (Validated (GenTx blk))]
-> RevalidateTxsResult blk
revalidateTxsFor
            MempoolCapacityBytesOverride
capacityOverride
            LedgerCfg (LedgerState blk)
lcfg
            SlotNo
slot
            TickedLedgerState blk DiffMK
lstate
            LedgerTables (LedgerState blk) ValuesMK
values
            TicketNo
tkt
            [TxTicket (TxMeasure blk) (Validated (GenTx blk))]
txs
        trace :: TraceEventMempool blk
trace = NonEmpty (TxId (GenTx blk))
-> [Validated (GenTx blk)] -> MempoolSize -> TraceEventMempool blk
forall blk.
NonEmpty (GenTxId blk)
-> [Validated (GenTx blk)] -> MempoolSize -> TraceEventMempool blk
TraceMempoolManuallyRemovedTxs
                  NonEmpty (TxId (GenTx blk))
txIds
                  ((Invalidated blk -> Validated (GenTx blk))
-> [Invalidated blk] -> [Validated (GenTx blk)]
forall a b. (a -> b) -> [a] -> [b]
map Invalidated blk -> Validated (GenTx blk)
forall blk. Invalidated blk -> Validated (GenTx blk)
getInvalidated [Invalidated blk]
removed)
                  (InternalState blk -> MempoolSize
forall blk. TxLimits blk => InternalState blk -> MempoolSize
isMempoolSize InternalState blk
is')
    in (InternalState blk
is', TraceEventMempool blk
trace)

{-------------------------------------------------------------------------------
  Sync with ledger
-------------------------------------------------------------------------------}

-- | See 'Ouroboros.Consensus.Mempool.API.syncWithLedger'.
implSyncWithLedger ::
     ( IOLike m
     , LedgerSupportsMempool blk
     , ValidateEnvelope blk
     , HasTxId (GenTx blk)
     )
  => MempoolEnv m blk
  -> m (MempoolSnapshot blk)
implSyncWithLedger :: forall (m :: * -> *) blk.
(IOLike m, LedgerSupportsMempool blk, ValidateEnvelope blk,
 HasTxId (GenTx blk)) =>
MempoolEnv m blk -> m (MempoolSnapshot blk)
implSyncWithLedger MempoolEnv m blk
mpEnv = Tracer m EnclosingTimed
-> m (MempoolSnapshot blk) -> m (MempoolSnapshot blk)
forall (m :: * -> *) a.
MonadMonotonicTime m =>
Tracer m EnclosingTimed -> m a -> m a
encloseTimedWith (EnclosingTimed -> TraceEventMempool blk
forall blk. EnclosingTimed -> TraceEventMempool blk
TraceMempoolSynced (EnclosingTimed -> TraceEventMempool blk)
-> Tracer m (TraceEventMempool blk) -> Tracer m EnclosingTimed
forall (f :: * -> *) a b. Contravariant f => (a -> b) -> f b -> f a
>$< MempoolEnv m blk -> Tracer m (TraceEventMempool blk)
forall (m :: * -> *) blk.
MempoolEnv m blk -> Tracer m (TraceEventMempool blk)
mpEnvTracer MempoolEnv m blk
mpEnv) (m (MempoolSnapshot blk) -> m (MempoolSnapshot blk))
-> m (MempoolSnapshot blk) -> m (MempoolSnapshot blk)
forall a b. (a -> b) -> a -> b
$ do
  (res :: WithTMVarOutcome Void (MempoolSnapshot blk)) <-
   StrictTMVar m (InternalState blk)
-> (InternalState blk -> STM m (LedgerState blk EmptyMK))
-> (InternalState blk
    -> LedgerState blk EmptyMK
    -> m (WithTMVarOutcome Void (MempoolSnapshot blk),
          InternalState blk))
-> m (WithTMVarOutcome Void (MempoolSnapshot blk))
forall (m :: * -> *) a b c.
IOLike m =>
StrictTMVar m a -> (a -> STM m b) -> (a -> b -> m (c, a)) -> m c
withTMVarAnd StrictTMVar m (InternalState blk)
istate (STM m (LedgerState blk EmptyMK)
-> InternalState blk -> STM m (LedgerState blk EmptyMK)
forall a b. a -> b -> a
const (STM m (LedgerState blk EmptyMK)
 -> InternalState blk -> STM m (LedgerState blk EmptyMK))
-> STM m (LedgerState blk EmptyMK)
-> InternalState blk
-> STM m (LedgerState blk EmptyMK)
forall a b. (a -> b) -> a -> b
$ LedgerInterface m blk -> STM m (LedgerState blk EmptyMK)
forall (m :: * -> *) blk.
LedgerInterface m blk -> STM m (LedgerState blk EmptyMK)
getCurrentLedgerState LedgerInterface m blk
ldgrInterface) ((InternalState blk
  -> LedgerState blk EmptyMK
  -> m (WithTMVarOutcome Void (MempoolSnapshot blk),
        InternalState blk))
 -> m (WithTMVarOutcome Void (MempoolSnapshot blk)))
-> (InternalState blk
    -> LedgerState blk EmptyMK
    -> m (WithTMVarOutcome Void (MempoolSnapshot blk),
          InternalState blk))
-> m (WithTMVarOutcome Void (MempoolSnapshot blk))
forall a b. (a -> b) -> a -> b
$
    \InternalState blk
is LedgerState blk EmptyMK
ls -> do
    let (SlotNo
slot, TickedLedgerState blk DiffMK
ls') = LedgerCfg (LedgerState blk)
-> ForgeLedgerState blk -> (SlotNo, TickedLedgerState blk DiffMK)
forall blk.
(UpdateLedger blk, ValidateEnvelope blk) =>
LedgerConfig blk
-> ForgeLedgerState blk -> (SlotNo, TickedLedgerState blk DiffMK)
tickLedgerState LedgerCfg (LedgerState blk)
cfg (ForgeLedgerState blk -> (SlotNo, TickedLedgerState blk DiffMK))
-> ForgeLedgerState blk -> (SlotNo, TickedLedgerState blk DiffMK)
forall a b. (a -> b) -> a -> b
$ LedgerState blk EmptyMK -> ForgeLedgerState blk
forall blk. LedgerState blk EmptyMK -> ForgeLedgerState blk
ForgeInUnknownSlot LedgerState blk EmptyMK
ls
    if Point blk -> ChainHash blk
forall {k} (block :: k). Point block -> ChainHash block
pointHash (InternalState blk -> Point blk
forall blk. InternalState blk -> Point blk
isTip InternalState blk
is) ChainHash blk -> ChainHash blk -> Bool
forall a. Eq a => a -> a -> Bool
== ChainHash (LedgerState blk) -> ChainHash blk
forall {k1} {k2} (b :: k1) (b' :: k2).
Coercible (HeaderHash b) (HeaderHash b') =>
ChainHash b -> ChainHash b'
castHash (LedgerState blk EmptyMK -> ChainHash (LedgerState blk)
forall (l :: LedgerStateKind) (mk :: MapKind).
GetTip l =>
l mk -> ChainHash l
getTipHash LedgerState blk EmptyMK
ls) Bool -> Bool -> Bool
&& InternalState blk -> SlotNo
forall blk. InternalState blk -> SlotNo
isSlotNo InternalState blk
is SlotNo -> SlotNo -> Bool
forall a. Eq a => a -> a -> Bool
== SlotNo
slot
      then do
        -- The tip didn't change, put the same state.
        Tracer m (TraceEventMempool blk) -> TraceEventMempool blk -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceEventMempool blk)
trcr (TraceEventMempool blk -> m ()) -> TraceEventMempool blk -> m ()
forall a b. (a -> b) -> a -> b
$ Point blk -> TraceEventMempool blk
forall blk. Point blk -> TraceEventMempool blk
TraceMempoolSyncNotNeeded (InternalState blk -> Point blk
forall blk. InternalState blk -> Point blk
isTip InternalState blk
is)
        (WithTMVarOutcome Void (MempoolSnapshot blk), InternalState blk)
-> m (WithTMVarOutcome Void (MempoolSnapshot blk),
      InternalState blk)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (MempoolSnapshot blk -> WithTMVarOutcome Void (MempoolSnapshot blk)
forall retry ok. ok -> WithTMVarOutcome retry ok
OK (InternalState blk -> MempoolSnapshot blk
forall blk.
(HasTxId (GenTx blk), TxLimits blk,
 GetTip (TickedLedgerState blk)) =>
InternalState blk -> MempoolSnapshot blk
snapshotFromIS InternalState blk
is), InternalState blk
is)
      else do
        -- We need to revalidate
        let pt :: Point blk
pt = Point (LedgerState blk) -> Point blk
forall {k1} {k2} (b :: k1) (b' :: k2).
Coercible (HeaderHash b) (HeaderHash b') =>
Point b -> Point b'
castPoint (LedgerState blk EmptyMK -> Point (LedgerState blk)
forall (mk :: MapKind).
LedgerState blk mk -> Point (LedgerState blk)
forall (l :: LedgerStateKind) (mk :: MapKind).
GetTip l =>
l mk -> Point l
getTip LedgerState blk EmptyMK
ls)
        mTbs <- LedgerInterface m blk
-> Point blk
-> LedgerTables (LedgerState blk) KeysMK
-> m (Maybe (LedgerTables (LedgerState blk) ValuesMK))
forall (m :: * -> *) blk.
LedgerInterface m blk
-> Point blk
-> LedgerTables (LedgerState blk) KeysMK
-> m (Maybe (LedgerTables (LedgerState blk) ValuesMK))
getLedgerTablesAtFor LedgerInterface m blk
ldgrInterface Point blk
pt (InternalState blk -> LedgerTables (LedgerState blk) KeysMK
forall blk.
InternalState blk -> LedgerTables (LedgerState blk) KeysMK
isTxKeys InternalState blk
is)
        case mTbs of
          Just LedgerTables (LedgerState blk) ValuesMK
tbs -> do
            let (InternalState blk
is', Maybe (TraceEventMempool blk)
mTrace) = MempoolCapacityBytesOverride
-> LedgerCfg (LedgerState blk)
-> SlotNo
-> TickedLedgerState blk DiffMK
-> LedgerTables (LedgerState blk) ValuesMK
-> InternalState blk
-> (InternalState blk, Maybe (TraceEventMempool blk))
forall blk.
(LedgerSupportsMempool blk, HasTxId (GenTx blk)) =>
MempoolCapacityBytesOverride
-> LedgerConfig blk
-> SlotNo
-> TickedLedgerState blk DiffMK
-> LedgerTables (LedgerState blk) ValuesMK
-> InternalState blk
-> (InternalState blk, Maybe (TraceEventMempool blk))
pureSyncWithLedger
                                  MempoolCapacityBytesOverride
capacityOverride
                                  LedgerCfg (LedgerState blk)
cfg
                                  SlotNo
slot
                                  TickedLedgerState blk DiffMK
ls'
                                  LedgerTables (LedgerState blk) ValuesMK
tbs
                                  InternalState blk
is
            Maybe (TraceEventMempool blk)
-> (TraceEventMempool blk -> m ()) -> m ()
forall (f :: * -> *) a.
Applicative f =>
Maybe a -> (a -> f ()) -> f ()
whenJust Maybe (TraceEventMempool blk)
mTrace (Tracer m (TraceEventMempool blk) -> TraceEventMempool blk -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceEventMempool blk)
trcr)
            (WithTMVarOutcome Void (MempoolSnapshot blk), InternalState blk)
-> m (WithTMVarOutcome Void (MempoolSnapshot blk),
      InternalState blk)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (MempoolSnapshot blk -> WithTMVarOutcome Void (MempoolSnapshot blk)
forall retry ok. ok -> WithTMVarOutcome retry ok
OK (InternalState blk -> MempoolSnapshot blk
forall blk.
(HasTxId (GenTx blk), TxLimits blk,
 GetTip (TickedLedgerState blk)) =>
InternalState blk -> MempoolSnapshot blk
snapshotFromIS InternalState blk
is'), InternalState blk
is')
          Maybe (LedgerTables (LedgerState blk) ValuesMK)
Nothing -> do
            -- If the point is gone, resync
            (WithTMVarOutcome Void (MempoolSnapshot blk), InternalState blk)
-> m (WithTMVarOutcome Void (MempoolSnapshot blk),
      InternalState blk)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (WithTMVarOutcome Void (MempoolSnapshot blk)
forall retry ok. WithTMVarOutcome retry ok
Resync, InternalState blk
is)
  case res of
    OK MempoolSnapshot blk
v   -> MempoolSnapshot blk -> m (MempoolSnapshot blk)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure MempoolSnapshot blk
v
    WithTMVarOutcome Void (MempoolSnapshot blk)
Resync -> MempoolEnv m blk -> m (MempoolSnapshot blk)
forall (m :: * -> *) blk.
(IOLike m, LedgerSupportsMempool blk, ValidateEnvelope blk,
 HasTxId (GenTx blk)) =>
MempoolEnv m blk -> m (MempoolSnapshot blk)
implSyncWithLedger MempoolEnv m blk
mpEnv
  where
    MempoolEnv { mpEnvStateVar :: forall (m :: * -> *) blk.
MempoolEnv m blk -> StrictTMVar m (InternalState blk)
mpEnvStateVar         = StrictTMVar m (InternalState blk)
istate
               , mpEnvLedger :: forall (m :: * -> *) blk. MempoolEnv m blk -> LedgerInterface m blk
mpEnvLedger           = LedgerInterface m blk
ldgrInterface
               , mpEnvTracer :: forall (m :: * -> *) blk.
MempoolEnv m blk -> Tracer m (TraceEventMempool blk)
mpEnvTracer           = Tracer m (TraceEventMempool blk)
trcr
               , mpEnvLedgerCfg :: forall (m :: * -> *) blk. MempoolEnv m blk -> LedgerConfig blk
mpEnvLedgerCfg        = LedgerCfg (LedgerState blk)
cfg
               , mpEnvCapacityOverride :: forall (m :: * -> *) blk.
MempoolEnv m blk -> MempoolCapacityBytesOverride
mpEnvCapacityOverride = MempoolCapacityBytesOverride
capacityOverride
               } = MempoolEnv m blk
mpEnv

-- | Create a 'SyncWithLedger' value representing the values that will need to
-- be stored for committing this synchronization with the Ledger.
--
-- See the documentation of 'runSyncWithLedger' for more context.
pureSyncWithLedger
  :: (LedgerSupportsMempool blk, HasTxId (GenTx blk))
  => MempoolCapacityBytesOverride
  -> LedgerConfig blk
  -> SlotNo
  -> TickedLedgerState blk DiffMK
  -> LedgerTables (LedgerState blk) ValuesMK
  -> InternalState blk
  -> ( InternalState blk
     , Maybe (TraceEventMempool blk)
     )
pureSyncWithLedger :: forall blk.
(LedgerSupportsMempool blk, HasTxId (GenTx blk)) =>
MempoolCapacityBytesOverride
-> LedgerConfig blk
-> SlotNo
-> TickedLedgerState blk DiffMK
-> LedgerTables (LedgerState blk) ValuesMK
-> InternalState blk
-> (InternalState blk, Maybe (TraceEventMempool blk))
pureSyncWithLedger MempoolCapacityBytesOverride
capacityOverride LedgerCfg (LedgerState blk)
lcfg SlotNo
slot TickedLedgerState blk DiffMK
lstate LedgerTables (LedgerState blk) ValuesMK
values InternalState blk
istate =
  let RevalidateTxsResult InternalState blk
is' [Invalidated blk]
removed =
        MempoolCapacityBytesOverride
-> LedgerCfg (LedgerState blk)
-> SlotNo
-> TickedLedgerState blk DiffMK
-> LedgerTables (LedgerState blk) ValuesMK
-> TicketNo
-> [TxTicket (TxMeasure blk) (Validated (GenTx blk))]
-> RevalidateTxsResult blk
forall blk.
(LedgerSupportsMempool blk, HasTxId (GenTx blk)) =>
MempoolCapacityBytesOverride
-> LedgerConfig blk
-> SlotNo
-> TickedLedgerState blk DiffMK
-> LedgerTables (LedgerState blk) ValuesMK
-> TicketNo
-> [TxTicket (TxMeasure blk) (Validated (GenTx blk))]
-> RevalidateTxsResult blk
revalidateTxsFor
          MempoolCapacityBytesOverride
capacityOverride
          LedgerCfg (LedgerState blk)
lcfg
          SlotNo
slot
          TickedLedgerState blk DiffMK
lstate
          LedgerTables (LedgerState blk) ValuesMK
values
          (InternalState blk -> TicketNo
forall blk. InternalState blk -> TicketNo
isLastTicketNo InternalState blk
istate)
          (TxSeq (TxMeasure blk) (Validated (GenTx blk))
-> [TxTicket (TxMeasure blk) (Validated (GenTx blk))]
forall sz tx. TxSeq sz tx -> [TxTicket sz tx]
TxSeq.toList (TxSeq (TxMeasure blk) (Validated (GenTx blk))
 -> [TxTicket (TxMeasure blk) (Validated (GenTx blk))])
-> TxSeq (TxMeasure blk) (Validated (GenTx blk))
-> [TxTicket (TxMeasure blk) (Validated (GenTx blk))]
forall a b. (a -> b) -> a -> b
$ InternalState blk -> TxSeq (TxMeasure blk) (Validated (GenTx blk))
forall blk.
InternalState blk -> TxSeq (TxMeasure blk) (Validated (GenTx blk))
isTxs InternalState blk
istate)
      mTrace :: Maybe (TraceEventMempool blk)
mTrace = if [Invalidated blk] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [Invalidated blk]
removed
               then
                 Maybe (TraceEventMempool blk)
forall a. Maybe a
Nothing
               else
                 TraceEventMempool blk -> Maybe (TraceEventMempool blk)
forall a. a -> Maybe a
Just (TraceEventMempool blk -> Maybe (TraceEventMempool blk))
-> TraceEventMempool blk -> Maybe (TraceEventMempool blk)
forall a b. (a -> b) -> a -> b
$ [(Validated (GenTx blk), ApplyTxErr blk)]
-> MempoolSize -> TraceEventMempool blk
forall blk.
[(Validated (GenTx blk), ApplyTxErr blk)]
-> MempoolSize -> TraceEventMempool blk
TraceMempoolRemoveTxs ((Invalidated blk -> (Validated (GenTx blk), ApplyTxErr blk))
-> [Invalidated blk] -> [(Validated (GenTx blk), ApplyTxErr blk)]
forall a b. (a -> b) -> [a] -> [b]
map (\Invalidated blk
x -> (Invalidated blk -> Validated (GenTx blk)
forall blk. Invalidated blk -> Validated (GenTx blk)
getInvalidated Invalidated blk
x, Invalidated blk -> ApplyTxErr blk
forall blk. Invalidated blk -> ApplyTxErr blk
getReason Invalidated blk
x)) [Invalidated blk]
removed) (InternalState blk -> MempoolSize
forall blk. TxLimits blk => InternalState blk -> MempoolSize
isMempoolSize InternalState blk
is')
  in (InternalState blk
is', Maybe (TraceEventMempool blk)
mTrace)