{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE NumericUnderscores #-}
module Test.Consensus.Mempool.Fairness (
testTxSizeFairness
, tests
) where
import qualified Cardano.Slotting.Time as Time
import Control.Arrow ((***))
import Control.Concurrent (threadDelay)
import qualified Control.Concurrent.Async as Async
import Control.Exception (assert)
import Control.Monad (forever, void)
import qualified Control.Tracer as Tracer
import Data.Foldable (asum)
import qualified Data.List as List
import Data.Void (Void, vacuous)
import Ouroboros.Consensus.Config.SecurityParam as Consensus
import qualified Ouroboros.Consensus.HardFork.History as HardFork
import Ouroboros.Consensus.Ledger.SupportsMempool (ByteSize32 (..))
import qualified Ouroboros.Consensus.Ledger.SupportsMempool as Mempool
import Ouroboros.Consensus.Mempool (Mempool)
import qualified Ouroboros.Consensus.Mempool as Mempool
import qualified Ouroboros.Consensus.Mempool.Capacity as Mempool
import Ouroboros.Consensus.Util.IOLike (STM, atomically, retry)
import System.Random (randomIO)
import Test.Consensus.Mempool.Fairness.TestBlock
import Test.Tasty (TestTree, testGroup)
import Test.Tasty.HUnit (testCase, (@?), (@?=))
import Test.Util.TestBlock (testBlockLedgerConfigFrom,
testInitLedgerWithState)
tests :: TestTree
tests :: TestTree
tests = TestName -> [TestTree] -> TestTree
testGroup TestName
"Mempool fairness"
[ TestName -> Assertion -> TestTree
testCase TestName
"There is no substantial bias in added transaction sizes" (Assertion -> TestTree) -> Assertion -> TestTree
forall a b. (a -> b) -> a -> b
$
TestParams -> Assertion
testTxSizeFairness TestParams { mempoolMaxCapacity :: ByteSize32
mempoolMaxCapacity = Word32 -> ByteSize32
ByteSize32 Word32
100
, smallTxSize :: ByteSize32
smallTxSize = Word32 -> ByteSize32
ByteSize32 Word32
1
, largeTxSize :: ByteSize32
largeTxSize = Word32 -> ByteSize32
ByteSize32 Word32
10
, nrOftxsToCollect :: Int
nrOftxsToCollect = Int
1_000
, toleranceThreshold :: Double
toleranceThreshold = Double
0.2
}
]
type TestMempool = Mempool IO TestBlock
testTxSizeFairness :: TestParams -> IO ()
testTxSizeFairness :: TestParams -> Assertion
testTxSizeFairness TestParams { ByteSize32
mempoolMaxCapacity :: TestParams -> ByteSize32
mempoolMaxCapacity :: ByteSize32
mempoolMaxCapacity, ByteSize32
smallTxSize :: TestParams -> ByteSize32
smallTxSize :: ByteSize32
smallTxSize, ByteSize32
largeTxSize :: TestParams -> ByteSize32
largeTxSize :: ByteSize32
largeTxSize, Int
nrOftxsToCollect :: TestParams -> Int
nrOftxsToCollect :: Int
nrOftxsToCollect, Double
toleranceThreshold :: TestParams -> Double
toleranceThreshold :: Double
toleranceThreshold } = do
let
ledgerItf :: LedgerInterface IO (TestBlockWith Tx)
ledgerItf = Mempool.LedgerInterface {
getCurrentLedgerState :: STM IO (LedgerState (TestBlockWith Tx))
Mempool.getCurrentLedgerState = LedgerState (TestBlockWith Tx)
-> STM IO (LedgerState (TestBlockWith Tx))
forall a. a -> STM IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (LedgerState (TestBlockWith Tx)
-> STM IO (LedgerState (TestBlockWith Tx)))
-> LedgerState (TestBlockWith Tx)
-> STM IO (LedgerState (TestBlockWith Tx))
forall a b. (a -> b) -> a -> b
$ PayloadDependentState Tx -> LedgerState (TestBlockWith Tx)
forall ptype.
PayloadDependentState ptype -> LedgerState (TestBlockWith ptype)
testInitLedgerWithState ()
}
eraParams :: EraParams
eraParams =
SecurityParam -> SlotLength -> EraParams
HardFork.defaultEraParams (Word64 -> SecurityParam
Consensus.SecurityParam Word64
10) (Integer -> SlotLength
Time.slotLengthFromSec Integer
2)
Mempool IO (TestBlockWith Tx)
mempool <- LedgerInterface IO (TestBlockWith Tx)
-> LedgerConfig (TestBlockWith Tx)
-> MempoolCapacityBytesOverride
-> Tracer IO (TraceEventMempool (TestBlockWith Tx))
-> IO (Mempool IO (TestBlockWith Tx))
forall (m :: * -> *) blk.
(IOLike m, LedgerSupportsMempool blk, HasTxId (GenTx blk),
ValidateEnvelope blk) =>
LedgerInterface m blk
-> LedgerConfig blk
-> MempoolCapacityBytesOverride
-> Tracer m (TraceEventMempool blk)
-> m (Mempool m blk)
Mempool.openMempoolWithoutSyncThread
LedgerInterface IO (TestBlockWith Tx)
ledgerItf
(EraParams -> TestBlockLedgerConfig
testBlockLedgerConfigFrom EraParams
eraParams)
(ByteSize32 -> MempoolCapacityBytesOverride
Mempool.mkCapacityBytesOverride ByteSize32
mempoolMaxCapacity)
Tracer IO (TraceEventMempool (TestBlockWith Tx))
forall (m :: * -> *) a. Applicative m => Tracer m a
Tracer.nullTracer
let waitForSmallAddersToFillMempool :: Assertion
waitForSmallAddersToFillMempool = Int -> Assertion
threadDelay Int
1_000
[Tx]
txs <- [IO [Tx]] -> IO [Tx]
forall a. [IO a] -> IO a
runConcurrently [
Mempool IO (TestBlockWith Tx) -> ByteSize32 -> IO [Tx]
forall a. Mempool IO (TestBlockWith Tx) -> ByteSize32 -> IO a
adders Mempool IO (TestBlockWith Tx)
mempool ByteSize32
smallTxSize
, Assertion
waitForSmallAddersToFillMempool Assertion -> IO [Tx] -> IO [Tx]
forall a b. IO a -> IO b -> IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Mempool IO (TestBlockWith Tx) -> ByteSize32 -> IO [Tx]
forall a. Mempool IO (TestBlockWith Tx) -> ByteSize32 -> IO a
adders Mempool IO (TestBlockWith Tx)
mempool ByteSize32
largeTxSize
, Assertion
waitForSmallAddersToFillMempool Assertion -> IO [Tx] -> IO [Tx]
forall a b. IO a -> IO b -> IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Mempool IO (TestBlockWith Tx) -> Int -> IO [Tx]
remover Mempool IO (TestBlockWith Tx)
mempool Int
nrOftxsToCollect
]
let
nrSmall :: Double
nrLarge :: Double
(Double
nrSmall, Double
nrLarge) = (Int -> Double
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> Double) -> ([ByteSize32] -> Int) -> [ByteSize32] -> Double
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [ByteSize32] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length ([ByteSize32] -> Double)
-> ([ByteSize32] -> Double)
-> ([ByteSize32], [ByteSize32])
-> (Double, Double)
forall b c b' c'. (b -> c) -> (b' -> c') -> (b, b') -> (c, c')
forall (a :: * -> * -> *) b c b' c'.
Arrow a =>
a b c -> a b' c' -> a (b, b') (c, c')
*** Int -> Double
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> Double) -> ([ByteSize32] -> Int) -> [ByteSize32] -> Double
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [ByteSize32] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length)
(([ByteSize32], [ByteSize32]) -> (Double, Double))
-> ([ByteSize32], [ByteSize32]) -> (Double, Double)
forall a b. (a -> b) -> a -> b
$ (ByteSize32 -> Bool)
-> [ByteSize32] -> ([ByteSize32], [ByteSize32])
forall a. (a -> Bool) -> [a] -> ([a], [a])
List.partition (ByteSize32 -> ByteSize32 -> Bool
forall a. Ord a => a -> a -> Bool
<= ByteSize32
smallTxSize)
([ByteSize32] -> ([ByteSize32], [ByteSize32]))
-> [ByteSize32] -> ([ByteSize32], [ByteSize32])
forall a b. (a -> b) -> a -> b
$ (Tx -> ByteSize32) -> [Tx] -> [ByteSize32]
forall a b. (a -> b) -> [a] -> [b]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Tx -> ByteSize32
txSize [Tx]
txs
[Tx] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [Tx]
txs Int -> Int -> Assertion
forall a.
(Eq a, Show a, ?callStack::CallStack) =>
a -> a -> Assertion
@?= Int
nrOftxsToCollect
Double -> Double -> (Double, Double, Double)
forall {c}. Fractional c => c -> c -> (c, c, c)
theRatioOfTheDifferenceBetween Double
nrSmall Double
nrLarge (Double, Double, Double) -> Double -> Assertion
forall {a} {a} {a}.
(Ord a, Show a, Show a, Show a) =>
(a, a, a) -> a -> Assertion
`isBelow` Double
toleranceThreshold
where
theRatioOfTheDifferenceBetween :: c -> c -> (c, c, c)
theRatioOfTheDifferenceBetween c
x c
y = (c -> c
forall a. Num a => a -> a
abs (c
x c -> c -> c
forall a. Num a => a -> a -> a
- c
y) c -> c -> c
forall a. Fractional a => a -> a -> a
/ (c
x c -> c -> c
forall a. Num a => a -> a -> a
+ c
y), c
x, c
y)
isBelow :: (a, a, a) -> a -> Assertion
isBelow (a
ratioDiff, a
nrSmall, a
nrLarge) a
threshold = a
ratioDiff a -> a -> Bool
forall a. Ord a => a -> a -> Bool
<= a
threshold
Bool -> TestName -> Assertion
forall t.
(AssertionPredicable t, ?callStack::CallStack) =>
t -> TestName -> Assertion
@? ( TestName
"The difference between the number of large and small transactions added "
TestName -> TestName -> TestName
forall a. Semigroup a => a -> a -> a
<> TestName
"exeeds the threshold (" TestName -> TestName -> TestName
forall a. Semigroup a => a -> a -> a
<> a -> TestName
forall a. Show a => a -> TestName
show a
threshold TestName -> TestName -> TestName
forall a. Semigroup a => a -> a -> a
<> TestName
")\n"
TestName -> TestName -> TestName
forall a. Semigroup a => a -> a -> a
<> TestName
"Total number of small transactions that were added: " TestName -> TestName -> TestName
forall a. Semigroup a => a -> a -> a
<> a -> TestName
forall a. Show a => a -> TestName
show a
nrSmall TestName -> TestName -> TestName
forall a. Semigroup a => a -> a -> a
<> TestName
"\n"
TestName -> TestName -> TestName
forall a. Semigroup a => a -> a -> a
<> TestName
"Total number of large transactions that were added: " TestName -> TestName -> TestName
forall a. Semigroup a => a -> a -> a
<> a -> TestName
forall a. Show a => a -> TestName
show a
nrLarge TestName -> TestName -> TestName
forall a. Semigroup a => a -> a -> a
<> TestName
"\n"
TestName -> TestName -> TestName
forall a. Semigroup a => a -> a -> a
<> TestName
"Difference / Total: " TestName -> TestName -> TestName
forall a. Semigroup a => a -> a -> a
<> a -> TestName
forall a. Show a => a -> TestName
show a
ratioDiff
)
runConcurrently :: [IO a] -> IO a
runConcurrently :: forall a. [IO a] -> IO a
runConcurrently = Concurrently a -> IO a
forall a. Concurrently a -> IO a
Async.runConcurrently (Concurrently a -> IO a)
-> ([IO a] -> Concurrently a) -> [IO a] -> IO a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [Concurrently a] -> Concurrently a
forall (t :: * -> *) (f :: * -> *) a.
(Foldable t, Alternative f) =>
t (f a) -> f a
asum ([Concurrently a] -> Concurrently a)
-> ([IO a] -> [Concurrently a]) -> [IO a] -> Concurrently a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (IO a -> Concurrently a) -> [IO a] -> [Concurrently a]
forall a b. (a -> b) -> [a] -> [b]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap IO a -> Concurrently a
forall a. IO a -> Concurrently a
Async.Concurrently
data TestParams = TestParams {
TestParams -> ByteSize32
mempoolMaxCapacity :: ByteSize32
, TestParams -> ByteSize32
smallTxSize :: ByteSize32
, TestParams -> ByteSize32
largeTxSize :: ByteSize32
, TestParams -> Int
nrOftxsToCollect :: Int
, TestParams -> Double
toleranceThreshold :: Double
}
adders ::
TestMempool
-> ByteSize32
-> IO a
adders :: forall a. Mempool IO (TestBlockWith Tx) -> ByteSize32 -> IO a
adders Mempool IO (TestBlockWith Tx)
mempool ByteSize32
fixedTxSize = IO Void -> IO a
forall (f :: * -> *) a. Functor f => f Void -> f a
vacuous (IO Void -> IO a) -> IO Void -> IO a
forall a b. (a -> b) -> a -> b
$ [IO Void] -> IO Void
forall a. [IO a] -> IO a
runConcurrently ([IO Void] -> IO Void) -> [IO Void] -> IO Void
forall a b. (a -> b) -> a -> b
$ (Int -> IO Void) -> [Int] -> [IO Void]
forall a b. (a -> b) -> [a] -> [b]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Int -> IO Void
adder [Int
0..Int
2]
where
adder :: Int -> IO Void
adder :: Int -> IO Void
adder Int
_i = Assertion -> IO Void
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (Assertion -> IO Void) -> Assertion -> IO Void
forall a b. (a -> b) -> a -> b
$ do
Int
thisTxId <- IO Int
forall a (m :: * -> *). (Random a, MonadIO m) => m a
randomIO
IO [MempoolAddTxResult (TestBlockWith Tx)] -> Assertion
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO [MempoolAddTxResult (TestBlockWith Tx)] -> Assertion)
-> IO [MempoolAddTxResult (TestBlockWith Tx)] -> Assertion
forall a b. (a -> b) -> a -> b
$ Mempool IO (TestBlockWith Tx)
-> [GenTx (TestBlockWith Tx)]
-> IO [MempoolAddTxResult (TestBlockWith Tx)]
forall (m :: * -> *) blk (t :: * -> *).
(MonadSTM m, Traversable t) =>
Mempool m blk -> t (GenTx blk) -> m (t (MempoolAddTxResult blk))
Mempool.addTxs Mempool IO (TestBlockWith Tx)
mempool [Int -> ByteSize32 -> GenTx (TestBlockWith Tx)
mkGenTx Int
thisTxId ByteSize32
fixedTxSize]
remover ::
TestMempool
-> Int
-> IO [Tx]
remover :: Mempool IO (TestBlockWith Tx) -> Int -> IO [Tx]
remover Mempool IO (TestBlockWith Tx)
mempool Int
total = do
let result :: IO [Tx]
result = [Tx] -> Int -> IO [Tx]
forall {t}. (Eq t, Num t) => [Tx] -> t -> IO [Tx]
loop [] Int
total
[Tx]
removedTxs <- IO [Tx]
result
Bool -> IO [Tx] -> IO [Tx]
forall a. (?callStack::CallStack) => Bool -> a -> a
assert ([Tx] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [Tx]
removedTxs Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
total) IO [Tx]
result
where
loop :: [Tx] -> t -> IO [Tx]
loop [Tx]
txs t
0 = [Tx] -> IO [Tx]
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure [Tx]
txs
loop [Tx]
txs t
n = do
Int -> Assertion
threadDelay Int
1000
GenTx (TestBlockWith Tx)
gtx <- STM IO (GenTx (TestBlockWith Tx)) -> IO (GenTx (TestBlockWith Tx))
forall a. (?callStack::CallStack) => STM IO a -> IO a
forall (m :: * -> *) a.
(MonadSTM m, ?callStack::CallStack) =>
STM m a -> m a
atomically (STM IO (GenTx (TestBlockWith Tx))
-> IO (GenTx (TestBlockWith Tx)))
-> STM IO (GenTx (TestBlockWith Tx))
-> IO (GenTx (TestBlockWith Tx))
forall a b. (a -> b) -> a -> b
$ STM (GenTx (TestBlockWith Tx))
STM IO (GenTx (TestBlockWith Tx))
getATxFromTheMempool
Mempool IO (TestBlockWith Tx)
-> [GenTxId (TestBlockWith Tx)] -> Assertion
forall (m :: * -> *) blk. Mempool m blk -> [GenTxId blk] -> m ()
Mempool.removeTxs Mempool IO (TestBlockWith Tx)
mempool [GenTx (TestBlockWith Tx) -> GenTxId (TestBlockWith Tx)
forall tx. HasTxId tx => tx -> TxId tx
Mempool.txId GenTx (TestBlockWith Tx)
gtx]
[Tx] -> t -> IO [Tx]
loop (GenTx (TestBlockWith Tx) -> Tx
unGenTx GenTx (TestBlockWith Tx)
gtxTx -> [Tx] -> [Tx]
forall a. a -> [a] -> [a]
:[Tx]
txs) (t
nt -> t -> t
forall a. Num a => a -> a -> a
-t
1)
where
getATxFromTheMempool :: STM (GenTx (TestBlockWith Tx))
getATxFromTheMempool =
Mempool IO (TestBlockWith Tx) -> STM IO [GenTx (TestBlockWith Tx)]
getTxsInSnapshot Mempool IO (TestBlockWith Tx)
mempool STM [GenTx (TestBlockWith Tx)]
-> ([GenTx (TestBlockWith Tx)] -> STM (GenTx (TestBlockWith Tx)))
-> STM (GenTx (TestBlockWith Tx))
forall a b. STM a -> (a -> STM b) -> STM b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
[] -> STM (GenTx (TestBlockWith Tx))
STM IO (GenTx (TestBlockWith Tx))
forall a. STM IO a
forall (m :: * -> *) a. MonadSTM m => STM m a
retry
GenTx (TestBlockWith Tx)
x:[GenTx (TestBlockWith Tx)]
_ -> GenTx (TestBlockWith Tx) -> STM (GenTx (TestBlockWith Tx))
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure GenTx (TestBlockWith Tx)
x
getTxsInSnapshot :: Mempool IO TestBlock -> STM IO [Mempool.GenTx TestBlock]
getTxsInSnapshot :: Mempool IO (TestBlockWith Tx) -> STM IO [GenTx (TestBlockWith Tx)]
getTxsInSnapshot Mempool IO (TestBlockWith Tx)
mempool = (MempoolSnapshot (TestBlockWith Tx) -> [GenTx (TestBlockWith Tx)])
-> STM IO (MempoolSnapshot (TestBlockWith Tx))
-> STM IO [GenTx (TestBlockWith Tx)]
forall a b. (a -> b) -> STM IO a -> STM IO b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap MempoolSnapshot (TestBlockWith Tx) -> [GenTx (TestBlockWith Tx)]
txsInSnapshot
(STM IO (MempoolSnapshot (TestBlockWith Tx))
-> STM IO [GenTx (TestBlockWith Tx)])
-> STM IO (MempoolSnapshot (TestBlockWith Tx))
-> STM IO [GenTx (TestBlockWith Tx)]
forall a b. (a -> b) -> a -> b
$ Mempool IO (TestBlockWith Tx)
-> STM IO (MempoolSnapshot (TestBlockWith Tx))
forall (m :: * -> *) blk.
Mempool m blk -> STM m (MempoolSnapshot blk)
Mempool.getSnapshot Mempool IO (TestBlockWith Tx)
mempool
where
txsInSnapshot :: MempoolSnapshot (TestBlockWith Tx) -> [GenTx (TestBlockWith Tx)]
txsInSnapshot = ((Validated (GenTx (TestBlockWith Tx)), TicketNo, ByteSize32)
-> GenTx (TestBlockWith Tx))
-> [(Validated (GenTx (TestBlockWith Tx)), TicketNo, ByteSize32)]
-> [GenTx (TestBlockWith Tx)]
forall a b. (a -> b) -> [a] -> [b]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Validated (GenTx (TestBlockWith Tx)), TicketNo, ByteSize32)
-> GenTx (TestBlockWith Tx)
forall {b} {c}.
(Validated (GenTx (TestBlockWith Tx)), b, c)
-> GenTx (TestBlockWith Tx)
prjTx
([(Validated (GenTx (TestBlockWith Tx)), TicketNo, ByteSize32)]
-> [GenTx (TestBlockWith Tx)])
-> (MempoolSnapshot (TestBlockWith Tx)
-> [(Validated (GenTx (TestBlockWith Tx)), TicketNo, ByteSize32)])
-> MempoolSnapshot (TestBlockWith Tx)
-> [GenTx (TestBlockWith Tx)]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. MempoolSnapshot (TestBlockWith Tx)
-> [(Validated (GenTx (TestBlockWith Tx)), TicketNo, ByteSize32)]
forall blk.
MempoolSnapshot blk
-> [(Validated (GenTx blk), TicketNo, ByteSize32)]
Mempool.snapshotTxs
prjTx :: (Validated (GenTx (TestBlockWith Tx)), b, c)
-> GenTx (TestBlockWith Tx)
prjTx (Validated (GenTx (TestBlockWith Tx))
a, b
_b, c
_c) = Validated (GenTx (TestBlockWith Tx)) -> GenTx (TestBlockWith Tx)
forall blk.
LedgerSupportsMempool blk =>
Validated (GenTx blk) -> GenTx blk
Mempool.txForgetValidated Validated (GenTx (TestBlockWith Tx))
a :: Mempool.GenTx TestBlock