{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE ScopedTypeVariables #-}

-- | Operations that update the mempool. They are internally divided in the pure
-- and impure sides of the operation.
module Ouroboros.Consensus.Mempool.Update
  ( implAddTx
  , implRemoveTxsEvenIfValid
  , implSyncWithLedger
  ) where

import Cardano.Slotting.Slot
import Control.Monad.Except (runExcept)
import Control.Tracer
import qualified Data.Foldable as Foldable
import Data.Functor.Contravariant ((>$<))
import qualified Data.List.NonEmpty as NE
import Data.Maybe (fromMaybe)
import qualified Data.Measure as Measure
import qualified Data.Set as Set
import Ouroboros.Consensus.HeaderValidation
import Ouroboros.Consensus.Ledger.Abstract
import Ouroboros.Consensus.Ledger.SupportsMempool
import Ouroboros.Consensus.Ledger.Tables.Utils (emptyLedgerTables)
import Ouroboros.Consensus.Mempool.API
import Ouroboros.Consensus.Mempool.Capacity
import Ouroboros.Consensus.Mempool.Impl.Common
import Ouroboros.Consensus.Mempool.TxSeq (TxTicket (..))
import qualified Ouroboros.Consensus.Mempool.TxSeq as TxSeq
import Ouroboros.Consensus.Storage.LedgerDB.Forker hiding (trace)
import Ouroboros.Consensus.Util (whenJust)
import Ouroboros.Consensus.Util.Enclose
import Ouroboros.Consensus.Util.IOLike hiding (withMVar)
import Ouroboros.Consensus.Util.NormalForm.StrictMVar
import Ouroboros.Consensus.Util.STM
import Ouroboros.Network.Block

{-------------------------------------------------------------------------------
  Add transactions
-------------------------------------------------------------------------------}

-- | Add a single transaction to the mempool, blocking if there is no space.
--
-- Callers are serialised through the FIFO 'StrictMVar's stored in the
-- 'MempoolEnv'. Once it is this thread's turn, the work is delegated to
-- 'doAddTx' and the trace event it returns is emitted on 'mpEnvTracer'.
implAddTx ::
  ( IOLike m
  , LedgerSupportsMempool blk
  , ValidateEnvelope blk
  , HasTxId (GenTx blk)
  ) =>
  MempoolEnv m blk ->
  -- | Whether we're acting on behalf of a remote peer or a local client.
  AddTxOnBehalfOf ->
  -- | The transaction to add to the mempool.
  GenTx blk ->
  m (MempoolAddTxResult blk)
implAddTx mpEnv onbehalf tx =
  -- To ensure fair behaviour between threads that are trying to add
  -- transactions, we make them all queue in a fifo. Only the one at the head
  -- of the queue gets to actually wait for space to get freed up in the
  -- mempool. This avoids small transactions repeatedly squeezing in ahead of
  -- larger transactions.
  --
  -- The fifo behaviour is implemented using a simple MVar. We take this
  -- MVar lock on a transaction by transaction basis. So if several threads
  -- are each trying to add several transactions, then they'll interleave at
  -- transaction granularity, not batches of transactions.
  --
  -- To add back in a bit of deliberate unfairness, we want to prioritise
  -- transactions being added on behalf of local clients, over ones being
  -- added on behalf of remote peers. We do this by using a pair of mvar
  -- fifos: remote peers must wait on both mvars, while local clients only
  -- need to wait on the second.
  case onbehalf of
    AddTxForRemotePeer ->
      withMVar remoteFifo $ \() ->
        withMVar allFifo $ \() ->
          -- This action can also block. Holding the MVars means
          -- there is only a single such thread blocking at once.
          implAddTx'
    AddTxForLocalClient ->
      withMVar allFifo $ \() ->
        -- As above but skip the first MVar fifo so we will get
        -- service sooner if there's lots of other remote
        -- threads waiting.
        implAddTx'
 where
  MempoolEnv
    { mpEnvAddTxsRemoteFifo = remoteFifo
    , mpEnvAddTxsAllFifo = allFifo
    , mpEnvTracer = trcr
    } = mpEnv

  -- Run the actual addition, emit the trace event it produced, and hand the
  -- result back to the caller. The new state (first field) has already been
  -- written by 'doAddTx', so it is ignored here.
  implAddTx' = do
    TransactionProcessingResult _ result ev <-
      doAddTx
        mpEnv
        (whetherToIntervene onbehalf)
        tx
    traceWith trcr ev
    return result

  -- Remote peers map to 'DoNotIntervene', local clients to 'Intervene'.
  whetherToIntervene :: AddTxOnBehalfOf -> WhetherToIntervene
  whetherToIntervene AddTxForRemotePeer = DoNotIntervene
  whetherToIntervene AddTxForLocalClient = Intervene

-- | Outcome of one attempt to add a transaction to the mempool.
--
-- Either the transaction was 'Processed' (the payload says whether it was
-- added or rejected), or there was 'NotEnoughSpaceLeft' for it. 'pureTryAddTx'
-- also reports the latter when summing the measures would overflow.
data TriedToAddTx blk
  = -- | Adding the next transaction would put the mempool over capacity.
    NotEnoughSpaceLeft
  | Processed (TransactionProcessed blk)

-- | Everything produced by processing one transaction, whether it ended up
-- accepted or rejected: the optional new mempool state, the result reported to
-- the caller, and the trace event to emit.
data TransactionProcessed blk
  = TransactionProcessingResult
      -- | If the transaction was accepted, the new state that can be written to
      -- the TVar. 'Nothing' when the transaction was rejected.
      (Maybe (InternalState blk))
      -- | The result of trying to add the transaction to the mempool.
      (MempoolAddTxResult blk)
      -- | The event emitted by the operation.
      (TraceEventMempool blk)

-- | Try to add a single transaction to the mempool, blocking while the mempool
-- does not have enough space for it.
--
-- The result says whether the transaction was added or rejected. When the
-- mempool is at capacity the attempt is retried, but only after the mempool
-- size has changed with respect to the size observed by the failed attempt.
--
-- On success the mempool state is updated as part of the same critical
-- section that read it.
--
-- See the necessary invariants on the Haddock for 'API.addTxs'.
--
-- NOTE when using V1 LedgerDB: This function does not sync the Mempool contents
-- with the ledger state in case the latter changes in a way that doesn't
-- invalidate the db changelog, it relies on the background thread to do
-- that. If the db changelog is invalidated (by rolling back the last synced
-- ledger state), it will sync in-place.
--
-- INVARIANT: The code needs that read and writes on the state are coupled
-- together or inconsistencies will arise.
doAddTx ::
  ( LedgerSupportsMempool blk
  , HasTxId (GenTx blk)
  , ValidateEnvelope blk
  , IOLike m
  ) =>
  MempoolEnv m blk ->
  WhetherToIntervene ->
  -- | The transaction to add to the mempool.
  GenTx blk ->
  m (TransactionProcessed blk)
doAddTx mpEnv wti tx =
  doAddTx' Nothing
 where
  MempoolEnv
    { mpEnvForker = forker
    , mpEnvLedgerCfg = cfg
    , mpEnvStateVar = istate
    , mpEnvTracer = trcr
    } = mpEnv

  -- The argument is the mempool size seen by the previous failed attempt, or
  -- 'Nothing' on the first attempt.
  doAddTx' mbPrevSize = do
    traceWith trcr $ TraceMempoolAttemptingAdd tx

    -- If retrying, wait until the mempool size changes before attempting to
    -- add the tx again
    let additionalCheck is =
          case mbPrevSize of
            Nothing -> pure ()
            Just prevSize -> check $ isMempoolSize is /= prevSize

    res <- withTMVarAnd istate additionalCheck $
      \is () -> do
        frkr <- readMVar forker
        -- Read only the ledger table entries this transaction refers to.
        tbs <-
          castLedgerTables
            <$> roforkerReadTables frkr (castLedgerTables $ getTransactionKeySets tx)
        case pureTryAddTx cfg wti tx is tbs of
          NotEnoughSpaceLeft ->
            -- Leave the state untouched and remember the size we saw, so the
            -- retry can wait for it to change.
            pure (Left (isMempoolSize is), is)
          Processed outcome@(TransactionProcessingResult is' _ _) ->
            -- A rejected transaction carries no new state: keep the old one.
            pure (Right outcome, fromMaybe is is')
    either (doAddTx' . Just) pure res

-- | Pure core of 'doAddTx'.
--
-- Given the current internal state and the ledger table values needed by the
-- transaction, decide whether the transaction is rejected, is added (yielding
-- a new internal state), or cannot be attempted yet because there is not
-- enough space left in the mempool.
pureTryAddTx ::
  ( LedgerSupportsMempool blk
  , HasTxId (GenTx blk)
  ) =>
  -- | The ledger configuration.
  LedgerCfg (LedgerState blk) ->
  WhetherToIntervene ->
  -- | The transaction to add to the mempool.
  GenTx blk ->
  -- | The current internal state of the mempool.
  InternalState blk ->
  LedgerTables (LedgerState blk) ValuesMK ->
  TriedToAddTx blk
pureTryAddTx cfg wti tx is values =
  let st =
        applyMempoolDiffs
          values
          (getTransactionKeySets tx)
          (isLedgerState is)
   in case runExcept $ txMeasure cfg st tx of
        Left err ->
          -- The transaction does not have a valid measure (eg its ExUnits is
          -- greater than what this ledger state allows for a single transaction).
          --
          -- It might seem simpler to remove the failure case from 'txMeasure' and
          -- simply fully validate the tx before determining whether it'd fit in
          -- the mempool; that way we could reject invalid txs ASAP. However, for a
          -- valid tx, we'd pay that validation cost every time the node's
          -- selection changed, even if the tx wouldn't fit. So it'd very much be
          -- as if the mempool were effectively over capacity! What's worse, each
          -- attempt would not be using 'extendVRPrevApplied'.
          Processed $
            TransactionProcessingResult
              Nothing
              (MempoolTxRejected tx err)
              ( TraceMempoolRejectedTx
                  tx
                  err
                  (isMempoolSize is)
              )
        Right txsz
          -- Check for overflow
          --
          -- No measure of a transaction can ever be negative, so the only way
          -- adding two measures could result in a smaller measure is if some
          -- modular arithmetic overflowed. Also, overflow necessarily yields a
          -- lesser result, since adding 'maxBound' is modularly equivalent to
          -- subtracting one. Recall that we're checking each individual addition.
          --
          -- We assume that the 'txMeasure' limit and the mempool capacity
          -- 'isCapacity' are much smaller than the modulus, and so this should
          -- never happen. Despite that, blocking until adding the transaction
          -- doesn't overflow seems like a reasonable way to handle this case.
          | not $ currentSize Measure.<= currentSize `Measure.plus` txsz ->
              NotEnoughSpaceLeft
          -- We add the transaction if and only if it wouldn't overrun any component
          -- of the mempool capacity.
          --
          -- In the past, this condition was instead @TxSeq.toSize (isTxs is) <
          -- isCapacity is@. Thus the effective capacity of the mempool was
          -- actually one increment less than the reported capacity plus one
          -- transaction. That subtlety's cost paid for two benefits.
          --
          -- First, the absence of addition avoids a risk of overflow, since the
          -- transaction's sizes (eg ExUnits) have not yet been bounded by
          -- validation (which presumably enforces a low enough bound that any
          -- reasonably-sized mempool would never overflow the representation's
          -- 'maxBound').
          --
          -- Second, it is more fair, since it does not depend on the transaction
          -- at all. EG a large transaction might struggle to win the race against
          -- a firehose of tiny transactions.
          --
          -- However, we prefer to avoid the subtlety. Overflow is handled by the
          -- previous guard. And fairness is already ensured elsewhere (the 'MVar's
          -- in 'implAddTx' --- which the "Test.Consensus.Mempool.Fairness" test
          -- exercises). Moreover, the notion of "is under capacity" becomes
          -- difficult to assess independently of the pending tx when the measure
          -- is multi-dimensional; both typical options (any component is not full
          -- or every component is not full) lead to some confusing behaviors
          -- (denying some txs that would "obviously" fit and accepting some txs
          -- that "obviously" don't, respectively).
          --
          -- Even with the overflow handler, it's important that 'txMeasure'
          -- returns a well-bounded result. Otherwise, if an adversarial tx arrived
          -- that couldn't even fit in an empty mempool, then that thread would
          -- never release the 'MVar'. In particular, we tacitly assume here that a
          -- tx that wouldn't even fit in an empty mempool would be rejected by
          -- 'txMeasure'.
          | not $ currentSize `Measure.plus` txsz Measure.<= isCapacity is ->
              NotEnoughSpaceLeft
          | otherwise ->
              case validateNewTransaction cfg wti tx txsz values st is of
                (Left err, _) ->
                  Processed $
                    TransactionProcessingResult
                      Nothing
                      (MempoolTxRejected tx err)
                      ( TraceMempoolRejectedTx
                          tx
                          err
                          (isMempoolSize is)
                      )
                (Right vtx, is') ->
                  Processed $
                    TransactionProcessingResult
                      (Just is')
                      (MempoolTxAdded vtx)
                      ( TraceMempoolAddedTx
                          vtx
                          (isMempoolSize is)
                          (isMempoolSize is')
                      )
 where
  -- Combined measure of the transactions currently in the mempool.
  currentSize = TxSeq.toSize (isTxs is)

{-------------------------------------------------------------------------------
  Remove transactions
-------------------------------------------------------------------------------}

-- | See 'Ouroboros.Consensus.Mempool.API.removeTxsEvenIfValid'.
--
-- While holding the mempool state, drops every transaction whose id is in the
-- given list, rebuilds the state from the remaining transactions through
-- 'pureRemoveTxs', and emits the trace event it returns.
implRemoveTxsEvenIfValid ::
  ( IOLike m
  , LedgerSupportsMempool blk
  , HasTxId (GenTx blk)
  , ValidateEnvelope blk
  ) =>
  MempoolEnv m blk ->
  NE.NonEmpty (GenTxId blk) ->
  m ()
implRemoveTxsEvenIfValid mpEnv toRemove =
  withTMVar istate $
    \is -> do
      let
        -- Built once so every membership test below is a logarithmic set
        -- lookup rather than a linear 'Foldable' scan.
        toRemoveSet = Set.fromList (NE.toList toRemove)
        toKeep =
          filter
            ( (`Set.notMember` toRemoveSet)
                . txId
                . txForgetValidated
                . txTicketTx
            )
            (TxSeq.toList $ isTxs is)
        -- Keys of the ledger table entries needed by the surviving txs.
        toKeep' =
          Foldable.foldMap'
            (getTransactionKeySets . txForgetValidated . TxSeq.txTicketTx)
            toKeep
      frkr <- readMVar forker
      tbs <- castLedgerTables <$> roforkerReadTables frkr (castLedgerTables toKeep')
      let (is', t) =
            pureRemoveTxs
              capacityOverride
              cfg
              (isSlotNo is)
              (isLedgerState is `withLedgerTables` emptyLedgerTables)
              tbs
              (isLastTicketNo is)
              toKeep
              toRemove
      traceWith trcr t
      pure ((), is')
 where
  MempoolEnv
    { mpEnvStateVar = istate
    , mpEnvForker = forker
    , mpEnvTracer = trcr
    , mpEnvLedgerCfg = cfg
    , mpEnvCapacityOverride = capacityOverride
    } = mpEnv

-- | Pure part of manually removing transactions from the mempool.
--
-- The caller has already split the mempool contents: the tickets in the
-- \"keep\" list are revalidated with 'revalidateTxsFor' against the given
-- ticked ledger state and ledger table values, and the resulting
-- 'InternalState' is returned together with a
-- 'TraceMempoolManuallyRemovedTxs' event. That event carries the requested
-- transaction ids, any kept transactions that revalidation dropped, and the
-- size of the resulting mempool.
pureRemoveTxs ::
  ( LedgerSupportsMempool blk
  , HasTxId (GenTx blk)
  ) =>
  MempoolCapacityBytesOverride ->
  LedgerConfig blk ->
  SlotNo ->
  TickedLedgerState blk DiffMK ->
  LedgerTables (LedgerState blk) ValuesMK ->
  TicketNo ->
  -- | Txs to keep
  [TxTicket (TxMeasure blk) (Validated (GenTx blk))] ->
  -- | IDs to remove
  NE.NonEmpty (GenTxId blk) ->
  (InternalState blk, TraceEventMempool blk)
pureRemoveTxs capacityOverride lcfg slot lstate values tkt txs txIds =
  let RevalidateTxsResult is' removed =
        revalidateTxsFor
          capacityOverride
          lcfg
          slot
          lstate
          values
          tkt
          txs
      -- 'removed' holds the kept transactions that did not survive
      -- revalidation; the explicitly requested ids are reported separately.
      trace =
        TraceMempoolManuallyRemovedTxs
          txIds
          (map getInvalidated removed)
          (isMempoolSize is')
   in (is', trace)

{-------------------------------------------------------------------------------
  Sync with ledger
-------------------------------------------------------------------------------}

-- | See 'Ouroboros.Consensus.Mempool.API.syncWithLedger'.
--
-- Re-reads the current ledger state while holding the mempool's internal
-- state, revalidates the mempool contents against it if the tip or the slot
-- changed, and returns a snapshot of the resulting state. If a forker for the
-- ledger state that was read cannot be acquired, the whole operation is
-- retried.
implSyncWithLedger ::
  ( IOLike m
  , LedgerSupportsMempool blk
  , ValidateEnvelope blk
  , HasTxId (GenTx blk)
  ) =>
  MempoolEnv m blk ->
  m (MempoolSnapshot blk)
implSyncWithLedger mpEnv =
  encloseTimedWith (TraceMempoolSynced >$< mpEnvTracer mpEnv) $ do
    res <-
      -- Using the ledger state that triggered the re-sync in the background
      -- watcher could race with another action that acquires the mempool
      -- state before revalidation starts.
      --
      -- For that reason the ledger state is read again here, in the same STM
      -- transaction in which the internal state of the mempool is acquired.
      --
      -- As a consequence the watcher may fire again for a ledger state the
      -- mempool has already synced with: if the ledger state changes between
      -- the watcher's observation and this read, the watcher will see that
      -- newer state afterwards and trigger another sync. Purely for
      -- performance reasons, revalidation is skipped when the state did not
      -- change from the mempool's point of view.
      withTMVarAnd istate (const $ getCurrentLedgerState ldgrInterface registry) $
        \is (MempoolLedgerDBView ls meFrk) -> do
          eFrk <- meFrk
          case eFrk of
            -- This case should happen only if the tip has moved again, this
            -- time to a separate fork, since the background thread saw a
            -- change in the tip, which should happen very rarely.
            Left{} -> do
              traceWith trcr TraceMempoolTipMovedBetweenSTMBlocks
              pure (Nothing, is)
            Right frk -> do
              let (slot, ls') = tickLedgerState cfg $ ForgeInUnknownSlot ls
              if pointHash (isTip is) == castHash (getTipHash ls) && isSlotNo is == slot
                then do
                  -- The tip didn't change, put the same state.
                  traceWith trcr $ TraceMempoolSyncNotNeeded (isTip is)
                  -- The forker just obtained from 'meFrk' is not stored
                  -- anywhere on this path, so close it here instead of
                  -- dropping it while still open (presumably it would
                  -- otherwise stay allocated in the registry).
                  roforkerClose frk
                  pure (Just (snapshotFromIS is), is)
                else do
                  -- The tip changed, we have to revalidate. The mempool now
                  -- reads from the new forker, so the previous one is closed
                  -- and replaced.
                  modifyMVar_
                    forkerMVar
                    ( \oldFrk -> do
                        roforkerClose oldFrk
                        pure frk
                    )
                  -- Read the ledger table values for the keys of the
                  -- transactions currently in the mempool.
                  tbs <- castLedgerTables <$> roforkerReadTables frk (castLedgerTables $ isTxKeys is)
                  let (is', mTrace) =
                        pureSyncWithLedger
                          capacityOverride
                          cfg
                          slot
                          ls'
                          tbs
                          is
                  whenJust mTrace (traceWith trcr)
                  pure (Just (snapshotFromIS is'), is')
    -- 'Nothing' means the forker could not be acquired: try again from scratch.
    maybe
      (implSyncWithLedger mpEnv)
      pure
      res
 where
  MempoolEnv
    { mpEnvStateVar = istate
    , mpEnvForker = forkerMVar
    , mpEnvLedger = ldgrInterface
    , mpEnvRegistry = registry
    , mpEnvTracer = trcr
    , mpEnvLedgerCfg = cfg
    , mpEnvCapacityOverride = capacityOverride
    } = mpEnv

-- | Pure part of synchronising the mempool with the ledger.
--
-- Revalidates every transaction currently in the given 'InternalState'
-- against the provided ticked ledger state and ledger table values, using
-- 'revalidateTxsFor'. Returns the resulting 'InternalState' and, only when at
-- least one transaction was dropped by revalidation, a
-- 'TraceMempoolRemoveTxs' event listing the dropped transactions with the
-- reason for each, plus the size of the resulting mempool.
pureSyncWithLedger ::
  (LedgerSupportsMempool blk, HasTxId (GenTx blk)) =>
  MempoolCapacityBytesOverride ->
  LedgerConfig blk ->
  SlotNo ->
  TickedLedgerState blk DiffMK ->
  LedgerTables (LedgerState blk) ValuesMK ->
  InternalState blk ->
  ( InternalState blk
  , Maybe (TraceEventMempool blk)
  )
pureSyncWithLedger capacityOverride lcfg slot lstate values istate =
  let RevalidateTxsResult is' removed =
        revalidateTxsFor
          capacityOverride
          lcfg
          slot
          lstate
          values
          (isLastTicketNo istate)
          (TxSeq.toList $ isTxs istate)
      -- Only emit a trace event when revalidation actually dropped something.
      mTrace =
        if null removed
          then
            Nothing
          else
            Just $
              TraceMempoolRemoveTxs
                (map (\x -> (getInvalidated x, getReason x)) removed)
                (isMempoolSize is')
   in (is', mTrace)