{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE ImportQualifiedPost #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
module Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound
( objectDiffusionInbound
, TraceObjectDiffusionInbound (..)
, ObjectDiffusionInboundError (..)
, NumObjectsProcessed (..)
) where
import Cardano.Network.NodeToNode.Version (NodeToNodeVersion)
import Cardano.Prelude (catMaybes, (&))
import Control.Concurrent.Class.MonadSTM.Strict.TVar.Checked
import Control.Exception (assert)
import Control.Monad (when)
import Control.Monad.Class.MonadSTM
import Control.Monad.Class.MonadThrow
import Control.Tracer (Tracer, traceWith)
import Data.Data (Typeable)
import Data.Foldable as Foldable (foldl', toList)
import Data.List qualified as List
import Data.List.NonEmpty qualified as NonEmpty
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as Map
import Data.Sequence.Strict (StrictSeq)
import Data.Sequence.Strict qualified as Seq
import Data.Set (Set)
import Data.Set qualified as Set
import Data.Word (Word64)
import GHC.Generics (Generic)
import Network.TypedProtocol.Core (N (Z), Nat (..), natToInt)
import NoThunks.Class (NoThunks (..))
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API
import Ouroboros.Consensus.Util.NormalForm.Invariant (noThunksInvariant)
import Ouroboros.Network.ControlMessage
import Ouroboros.Network.Protocol.ObjectDiffusion.Inbound
import Ouroboros.Network.Protocol.ObjectDiffusion.Type
newtype NumObjectsProcessed
= NumObjectsProcessed
{ NumObjectsProcessed -> Word64
getNumObjectsProcessed :: Word64
}
deriving (NumObjectsProcessed -> NumObjectsProcessed -> Bool
(NumObjectsProcessed -> NumObjectsProcessed -> Bool)
-> (NumObjectsProcessed -> NumObjectsProcessed -> Bool)
-> Eq NumObjectsProcessed
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: NumObjectsProcessed -> NumObjectsProcessed -> Bool
== :: NumObjectsProcessed -> NumObjectsProcessed -> Bool
$c/= :: NumObjectsProcessed -> NumObjectsProcessed -> Bool
/= :: NumObjectsProcessed -> NumObjectsProcessed -> Bool
Eq, Int -> NumObjectsProcessed -> ShowS
[NumObjectsProcessed] -> ShowS
NumObjectsProcessed -> String
(Int -> NumObjectsProcessed -> ShowS)
-> (NumObjectsProcessed -> String)
-> ([NumObjectsProcessed] -> ShowS)
-> Show NumObjectsProcessed
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> NumObjectsProcessed -> ShowS
showsPrec :: Int -> NumObjectsProcessed -> ShowS
$cshow :: NumObjectsProcessed -> String
show :: NumObjectsProcessed -> String
$cshowList :: [NumObjectsProcessed] -> ShowS
showList :: [NumObjectsProcessed] -> ShowS
Show)
data TraceObjectDiffusionInbound objectId object
=
TraceObjectDiffusionInboundCollectedObjects Int
|
TraceObjectDiffusionInboundAddedObjects NumObjectsProcessed
|
TraceObjectDiffusionInboundRecvControlMessage ControlMessage
| TraceObjectDiffusionInboundCanRequestMoreObjects Int
| TraceObjectDiffusionInboundCannotRequestMoreObjects Int
deriving (TraceObjectDiffusionInbound objectId object
-> TraceObjectDiffusionInbound objectId object -> Bool
(TraceObjectDiffusionInbound objectId object
-> TraceObjectDiffusionInbound objectId object -> Bool)
-> (TraceObjectDiffusionInbound objectId object
-> TraceObjectDiffusionInbound objectId object -> Bool)
-> Eq (TraceObjectDiffusionInbound objectId object)
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
forall objectId object.
TraceObjectDiffusionInbound objectId object
-> TraceObjectDiffusionInbound objectId object -> Bool
$c== :: forall objectId object.
TraceObjectDiffusionInbound objectId object
-> TraceObjectDiffusionInbound objectId object -> Bool
== :: TraceObjectDiffusionInbound objectId object
-> TraceObjectDiffusionInbound objectId object -> Bool
$c/= :: forall objectId object.
TraceObjectDiffusionInbound objectId object
-> TraceObjectDiffusionInbound objectId object -> Bool
/= :: TraceObjectDiffusionInbound objectId object
-> TraceObjectDiffusionInbound objectId object -> Bool
Eq, Int -> TraceObjectDiffusionInbound objectId object -> ShowS
[TraceObjectDiffusionInbound objectId object] -> ShowS
TraceObjectDiffusionInbound objectId object -> String
(Int -> TraceObjectDiffusionInbound objectId object -> ShowS)
-> (TraceObjectDiffusionInbound objectId object -> String)
-> ([TraceObjectDiffusionInbound objectId object] -> ShowS)
-> Show (TraceObjectDiffusionInbound objectId object)
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
forall objectId object.
Int -> TraceObjectDiffusionInbound objectId object -> ShowS
forall objectId object.
[TraceObjectDiffusionInbound objectId object] -> ShowS
forall objectId object.
TraceObjectDiffusionInbound objectId object -> String
$cshowsPrec :: forall objectId object.
Int -> TraceObjectDiffusionInbound objectId object -> ShowS
showsPrec :: Int -> TraceObjectDiffusionInbound objectId object -> ShowS
$cshow :: forall objectId object.
TraceObjectDiffusionInbound objectId object -> String
show :: TraceObjectDiffusionInbound objectId object -> String
$cshowList :: forall objectId object.
[TraceObjectDiffusionInbound objectId object] -> ShowS
showList :: [TraceObjectDiffusionInbound objectId object] -> ShowS
Show)
data ObjectDiffusionInboundError objectId object
= ProtocolErrorObjectsDifferentThanRequested (Set objectId) (Set objectId)
| ProtocolErrorObjectIdsNotRequested Int Int
| ProtocolErrorObjectIdsAlreadyKnown (Set objectId)
| ProtocolErrorObjectIdsDuplicate (Map objectId Int)
deriving Int -> ObjectDiffusionInboundError objectId object -> ShowS
[ObjectDiffusionInboundError objectId object] -> ShowS
ObjectDiffusionInboundError objectId object -> String
(Int -> ObjectDiffusionInboundError objectId object -> ShowS)
-> (ObjectDiffusionInboundError objectId object -> String)
-> ([ObjectDiffusionInboundError objectId object] -> ShowS)
-> Show (ObjectDiffusionInboundError objectId object)
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
forall objectId object.
Show objectId =>
Int -> ObjectDiffusionInboundError objectId object -> ShowS
forall objectId object.
Show objectId =>
[ObjectDiffusionInboundError objectId object] -> ShowS
forall objectId object.
Show objectId =>
ObjectDiffusionInboundError objectId object -> String
$cshowsPrec :: forall objectId object.
Show objectId =>
Int -> ObjectDiffusionInboundError objectId object -> ShowS
showsPrec :: Int -> ObjectDiffusionInboundError objectId object -> ShowS
$cshow :: forall objectId object.
Show objectId =>
ObjectDiffusionInboundError objectId object -> String
show :: ObjectDiffusionInboundError objectId object -> String
$cshowList :: forall objectId object.
Show objectId =>
[ObjectDiffusionInboundError objectId object] -> ShowS
showList :: [ObjectDiffusionInboundError objectId object] -> ShowS
Show
instance
(Show objectId, Typeable object, Typeable objectId) =>
Exception (ObjectDiffusionInboundError objectId object)
where
displayException :: ObjectDiffusionInboundError objectId object -> String
displayException (ProtocolErrorObjectsDifferentThanRequested Set objectId
reqButNotRecvd Set objectId
recvButNotReq) =
String
"The peer replied with different objects than those we asked for: "
String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
"requested but didn't receive "
String -> ShowS
forall a. [a] -> [a] -> [a]
++ Set objectId -> String
forall a. Show a => a -> String
show Set objectId
reqButNotRecvd
String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
"; received but didn't requested "
String -> ShowS
forall a. [a] -> [a] -> [a]
++ Set objectId -> String
forall a. Show a => a -> String
show Set objectId
recvButNotReq
displayException (ProtocolErrorObjectIdsNotRequested Int
expected Int
actual) =
String
"The peer replied with more objectIds than we asked for: expected "
String -> ShowS
forall a. [a] -> [a] -> [a]
++ Int -> String
forall a. Show a => a -> String
show Int
expected
String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
" , received "
String -> ShowS
forall a. [a] -> [a] -> [a]
++ Int -> String
forall a. Show a => a -> String
show Int
actual
displayException (ProtocolErrorObjectIdsAlreadyKnown Set objectId
offenders) =
String
"The peer replied with some objectIds that it has already sent us previously: "
String -> ShowS
forall a. [a] -> [a] -> [a]
++ Set objectId -> String
forall a. Show a => a -> String
show Set objectId
offenders
displayException (ProtocolErrorObjectIdsDuplicate Map objectId Int
offenders) =
String
"The peer replied with a batch of objectIds containing duplicates: "
String -> ShowS
forall a. [a] -> [a] -> [a]
++ Map objectId Int -> String
forall a. Show a => a -> String
show Map objectId Int
offenders
data InboundSt objectId object = InboundSt
{ forall objectId object.
InboundSt objectId object -> NumObjectIdsReq
numIdsInFlight :: !NumObjectIdsReq
, forall objectId object.
InboundSt objectId object -> StrictSeq objectId
outstandingFifo :: !(StrictSeq objectId)
, forall objectId object. InboundSt objectId object -> Set objectId
canRequestNext :: !(Set objectId)
, forall objectId object.
InboundSt objectId object -> Map objectId (Maybe object)
pendingObjects :: !(Map objectId (Maybe object))
, forall objectId object.
InboundSt objectId object -> NumObjectIdsAck
numToAckOnNextReq :: !NumObjectIdsAck
}
deriving stock (Int -> InboundSt objectId object -> ShowS
[InboundSt objectId object] -> ShowS
InboundSt objectId object -> String
(Int -> InboundSt objectId object -> ShowS)
-> (InboundSt objectId object -> String)
-> ([InboundSt objectId object] -> ShowS)
-> Show (InboundSt objectId object)
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
forall objectId object.
(Show objectId, Show object) =>
Int -> InboundSt objectId object -> ShowS
forall objectId object.
(Show objectId, Show object) =>
[InboundSt objectId object] -> ShowS
forall objectId object.
(Show objectId, Show object) =>
InboundSt objectId object -> String
$cshowsPrec :: forall objectId object.
(Show objectId, Show object) =>
Int -> InboundSt objectId object -> ShowS
showsPrec :: Int -> InboundSt objectId object -> ShowS
$cshow :: forall objectId object.
(Show objectId, Show object) =>
InboundSt objectId object -> String
show :: InboundSt objectId object -> String
$cshowList :: forall objectId object.
(Show objectId, Show object) =>
[InboundSt objectId object] -> ShowS
showList :: [InboundSt objectId object] -> ShowS
Show, (forall x.
InboundSt objectId object -> Rep (InboundSt objectId object) x)
-> (forall x.
Rep (InboundSt objectId object) x -> InboundSt objectId object)
-> Generic (InboundSt objectId object)
forall x.
Rep (InboundSt objectId object) x -> InboundSt objectId object
forall x.
InboundSt objectId object -> Rep (InboundSt objectId object) x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
forall objectId object x.
Rep (InboundSt objectId object) x -> InboundSt objectId object
forall objectId object x.
InboundSt objectId object -> Rep (InboundSt objectId object) x
$cfrom :: forall objectId object x.
InboundSt objectId object -> Rep (InboundSt objectId object) x
from :: forall x.
InboundSt objectId object -> Rep (InboundSt objectId object) x
$cto :: forall objectId object x.
Rep (InboundSt objectId object) x -> InboundSt objectId object
to :: forall x.
Rep (InboundSt objectId object) x -> InboundSt objectId object
Generic)
deriving anyclass Context -> InboundSt objectId object -> IO (Maybe ThunkInfo)
Proxy (InboundSt objectId object) -> String
(Context -> InboundSt objectId object -> IO (Maybe ThunkInfo))
-> (Context -> InboundSt objectId object -> IO (Maybe ThunkInfo))
-> (Proxy (InboundSt objectId object) -> String)
-> NoThunks (InboundSt objectId object)
forall a.
(Context -> a -> IO (Maybe ThunkInfo))
-> (Context -> a -> IO (Maybe ThunkInfo))
-> (Proxy a -> String)
-> NoThunks a
forall objectId object.
(NoThunks objectId, NoThunks object) =>
Context -> InboundSt objectId object -> IO (Maybe ThunkInfo)
forall objectId object.
(NoThunks objectId, NoThunks object) =>
Proxy (InboundSt objectId object) -> String
$cnoThunks :: forall objectId object.
(NoThunks objectId, NoThunks object) =>
Context -> InboundSt objectId object -> IO (Maybe ThunkInfo)
noThunks :: Context -> InboundSt objectId object -> IO (Maybe ThunkInfo)
$cwNoThunks :: forall objectId object.
(NoThunks objectId, NoThunks object) =>
Context -> InboundSt objectId object -> IO (Maybe ThunkInfo)
wNoThunks :: Context -> InboundSt objectId object -> IO (Maybe ThunkInfo)
$cshowTypeOf :: forall objectId object.
(NoThunks objectId, NoThunks object) =>
Proxy (InboundSt objectId object) -> String
showTypeOf :: Proxy (InboundSt objectId object) -> String
NoThunks
initialInboundSt :: InboundSt objectId object
initialInboundSt :: forall objectId object. InboundSt objectId object
initialInboundSt = NumObjectIdsReq
-> StrictSeq objectId
-> Set objectId
-> Map objectId (Maybe object)
-> NumObjectIdsAck
-> InboundSt objectId object
forall objectId object.
NumObjectIdsReq
-> StrictSeq objectId
-> Set objectId
-> Map objectId (Maybe object)
-> NumObjectIdsAck
-> InboundSt objectId object
InboundSt NumObjectIdsReq
0 StrictSeq objectId
forall a. StrictSeq a
Seq.empty Set objectId
forall a. Set a
Set.empty Map objectId (Maybe object)
forall k a. Map k a
Map.empty NumObjectIdsAck
0
objectDiffusionInbound ::
forall objectId object m.
( Ord objectId
, Show objectId
, Typeable objectId
, Typeable object
, NoThunks objectId
, NoThunks object
, MonadSTM m
, MonadThrow m
) =>
Tracer m (TraceObjectDiffusionInbound objectId object) ->
(NumObjectsUnacknowledged, NumObjectIdsReq, NumObjectsReq) ->
ObjectPoolWriter objectId object m ->
NodeToNodeVersion ->
ControlMessageSTM m ->
ObjectDiffusionInboundPipelined objectId object m ()
objectDiffusionInbound :: forall objectId object (m :: * -> *).
(Ord objectId, Show objectId, Typeable objectId, Typeable object,
NoThunks objectId, NoThunks object, MonadSTM m, MonadThrow m) =>
Tracer m (TraceObjectDiffusionInbound objectId object)
-> (NumObjectsUnacknowledged, NumObjectIdsReq, NumObjectsReq)
-> ObjectPoolWriter objectId object m
-> NodeToNodeVersion
-> ControlMessageSTM m
-> ObjectDiffusionInboundPipelined objectId object m ()
objectDiffusionInbound
Tracer m (TraceObjectDiffusionInbound objectId object)
tracer
(NumObjectsUnacknowledged
maxFifoLength, NumObjectIdsReq
maxNumIdsToReq, NumObjectsReq
maxNumObjectsToReq)
ObjectPoolWriter{STM m (objectId -> Bool)
object -> objectId
[object] -> m ()
opwObjectId :: object -> objectId
opwAddObjects :: [object] -> m ()
opwHasObject :: STM m (objectId -> Bool)
opwAddObjects :: forall objectId object (m :: * -> *).
ObjectPoolWriter objectId object m -> [object] -> m ()
opwHasObject :: forall objectId object (m :: * -> *).
ObjectPoolWriter objectId object m -> STM m (objectId -> Bool)
opwObjectId :: forall objectId object (m :: * -> *).
ObjectPoolWriter objectId object m -> object -> objectId
..}
NodeToNodeVersion
_version
ControlMessageSTM m
controlMessageSTM =
InboundStIdle 'Z objectId object m ()
-> ObjectDiffusionInboundPipelined objectId object m ()
forall objectId object (m :: * -> *) a.
InboundStIdle 'Z objectId object m a
-> ObjectDiffusionInboundPipelined objectId object m a
ObjectDiffusionInboundPipelined (InboundStIdle 'Z objectId object m ()
-> ObjectDiffusionInboundPipelined objectId object m ())
-> InboundStIdle 'Z objectId object m ()
-> ObjectDiffusionInboundPipelined objectId object m ()
forall a b. (a -> b) -> a -> b
$!
InboundSt objectId object -> InboundSt objectId object
forall s. NoThunks s => s -> s
checkState InboundSt objectId object
forall objectId object. InboundSt objectId object
initialInboundSt InboundSt objectId object
-> (InboundSt objectId object
-> InboundStIdle 'Z objectId object m ())
-> InboundStIdle 'Z objectId object m ()
forall a b. a -> (a -> b) -> b
& Nat 'Z
-> InboundSt objectId object
-> InboundStIdle 'Z objectId object m ()
forall (n :: N).
Nat n
-> InboundSt objectId object
-> InboundStIdle n objectId object m ()
go Nat 'Z
forall (n :: N). ('Z ~ n) => Nat n
Zero
where
canRequestMoreObjects :: InboundSt k object -> Bool
canRequestMoreObjects :: forall k. InboundSt k object -> Bool
canRequestMoreObjects !InboundSt k object
st =
Bool -> Bool
not (Set k -> Bool
forall a. Set a -> Bool
Set.null (InboundSt k object -> Set k
forall objectId object. InboundSt objectId object -> Set objectId
canRequestNext InboundSt k object
st))
numIdsToReq :: InboundSt objectId object -> NumObjectIdsReq
numIdsToReq :: InboundSt objectId object -> NumObjectIdsReq
numIdsToReq !InboundSt objectId object
st =
NumObjectIdsReq
maxNumIdsToReq
NumObjectIdsReq -> NumObjectIdsReq -> NumObjectIdsReq
forall a. Ord a => a -> a -> a
`min` ( NumObjectsUnacknowledged -> NumObjectIdsReq
forall a b. (Integral a, Num b) => a -> b
fromIntegral NumObjectsUnacknowledged
maxFifoLength
NumObjectIdsReq -> NumObjectIdsReq -> NumObjectIdsReq
forall a. Num a => a -> a -> a
- (Int -> NumObjectIdsReq
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> NumObjectIdsReq) -> Int -> NumObjectIdsReq
forall a b. (a -> b) -> a -> b
$ StrictSeq objectId -> Int
forall a. StrictSeq a -> Int
Seq.length (StrictSeq objectId -> Int) -> StrictSeq objectId -> Int
forall a b. (a -> b) -> a -> b
$ InboundSt objectId object -> StrictSeq objectId
forall objectId object.
InboundSt objectId object -> StrictSeq objectId
outstandingFifo InboundSt objectId object
st)
NumObjectIdsReq -> NumObjectIdsReq -> NumObjectIdsReq
forall a. Num a => a -> a -> a
- InboundSt objectId object -> NumObjectIdsReq
forall objectId object.
InboundSt objectId object -> NumObjectIdsReq
numIdsInFlight InboundSt objectId object
st
)
preAcknowledge ::
InboundSt objectId object ->
(objectId -> Bool) ->
[objectId] ->
InboundSt objectId object
preAcknowledge :: InboundSt objectId object
-> (objectId -> Bool) -> [objectId] -> InboundSt objectId object
preAcknowledge !InboundSt objectId object
st objectId -> Bool
_ [objectId]
collectedIds | [objectId] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [objectId]
collectedIds = InboundSt objectId object
st
preAcknowledge !InboundSt objectId object
st objectId -> Bool
poolHasObject [objectId]
collectedIds =
let
([objectId]
alreadyObtained, [objectId]
notYetObtained) =
(objectId -> Bool) -> [objectId] -> ([objectId], [objectId])
forall a. (a -> Bool) -> [a] -> ([a], [a])
List.partition
objectId -> Bool
poolHasObject
[objectId]
collectedIds
pendingObjects' :: Map objectId (Maybe object)
pendingObjects' =
(Map objectId (Maybe object)
-> objectId -> Map objectId (Maybe object))
-> Map objectId (Maybe object)
-> [objectId]
-> Map objectId (Maybe object)
forall b a. (b -> a -> b) -> b -> [a] -> b
forall (t :: * -> *) b a.
Foldable t =>
(b -> a -> b) -> b -> t a -> b
foldl'
(\Map objectId (Maybe object)
accMap objectId
objectId -> objectId
-> Maybe object
-> Map objectId (Maybe object)
-> Map objectId (Maybe object)
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert objectId
objectId Maybe object
forall a. Maybe a
Nothing Map objectId (Maybe object)
accMap)
(InboundSt objectId object -> Map objectId (Maybe object)
forall objectId object.
InboundSt objectId object -> Map objectId (Maybe object)
pendingObjects InboundSt objectId object
st)
[objectId]
alreadyObtained
outstandingFifo' :: StrictSeq objectId
outstandingFifo' = InboundSt objectId object -> StrictSeq objectId
forall objectId object.
InboundSt objectId object -> StrictSeq objectId
outstandingFifo InboundSt objectId object
st StrictSeq objectId -> StrictSeq objectId -> StrictSeq objectId
forall a. Semigroup a => a -> a -> a
<> [objectId] -> StrictSeq objectId
forall a. [a] -> StrictSeq a
Seq.fromList [objectId]
collectedIds
(StrictSeq objectId
objectIdsToAck, StrictSeq objectId
outstandingFifo'') =
(objectId -> Bool)
-> StrictSeq objectId -> (StrictSeq objectId, StrictSeq objectId)
forall a. (a -> Bool) -> StrictSeq a -> (StrictSeq a, StrictSeq a)
Seq.spanl (objectId -> Map objectId (Maybe object) -> Bool
forall k a. Ord k => k -> Map k a -> Bool
`Map.member` Map objectId (Maybe object)
pendingObjects') StrictSeq objectId
outstandingFifo'
pendingObjects'' :: Map objectId (Maybe object)
pendingObjects'' =
(Map objectId (Maybe object)
-> objectId -> Map objectId (Maybe object))
-> Map objectId (Maybe object)
-> StrictSeq objectId
-> Map objectId (Maybe object)
forall b a. (b -> a -> b) -> b -> StrictSeq a -> b
forall (t :: * -> *) b a.
Foldable t =>
(b -> a -> b) -> b -> t a -> b
Foldable.foldl'
((objectId
-> Map objectId (Maybe object) -> Map objectId (Maybe object))
-> Map objectId (Maybe object)
-> objectId
-> Map objectId (Maybe object)
forall a b c. (a -> b -> c) -> b -> a -> c
flip objectId
-> Map objectId (Maybe object) -> Map objectId (Maybe object)
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete)
Map objectId (Maybe object)
pendingObjects'
StrictSeq objectId
objectIdsToAck
!st' :: InboundSt objectId object
st' =
InboundSt objectId object
st
{ canRequestNext = canRequestNext st <> Set.fromList notYetObtained
, pendingObjects = pendingObjects''
, outstandingFifo = outstandingFifo''
, numToAckOnNextReq =
numToAckOnNextReq st
+ fromIntegral (Seq.length objectIdsToAck)
}
in
InboundSt objectId object
st'
go ::
forall (n :: N).
Nat n ->
InboundSt objectId object ->
InboundStIdle n objectId object m ()
go :: forall (n :: N).
Nat n
-> InboundSt objectId object
-> InboundStIdle n objectId object m ()
go Nat n
n !InboundSt objectId object
st = m (InboundStIdle n objectId object m ())
-> InboundStIdle n objectId object m ()
forall (m :: * -> *) (n :: N) objectId object a.
m (InboundStIdle n objectId object m a)
-> InboundStIdle n objectId object m a
WithEffect (m (InboundStIdle n objectId object m ())
-> InboundStIdle n objectId object m ())
-> m (InboundStIdle n objectId object m ())
-> InboundStIdle n objectId object m ()
forall a b. (a -> b) -> a -> b
$ do
ctrlMsg <- ControlMessageSTM m -> m ControlMessage
forall a. (?callStack::CallStack) => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, ?callStack::CallStack) =>
STM m a -> m a
atomically ControlMessageSTM m
controlMessageSTM
traceWith tracer $
TraceObjectDiffusionInboundRecvControlMessage ctrlMsg
case ctrlMsg of
ControlMessage
Terminate ->
InboundStIdle n objectId object m ()
-> m (InboundStIdle n objectId object m ())
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (InboundStIdle n objectId object m ()
-> m (InboundStIdle n objectId object m ()))
-> InboundStIdle n objectId object m ()
-> m (InboundStIdle n objectId object m ())
forall a b. (a -> b) -> a -> b
$! Nat n -> InboundStIdle n objectId object m ()
forall (n :: N). Nat n -> InboundStIdle n objectId object m ()
terminateAfterDrain Nat n
n
ControlMessage
_continue -> case Nat n
n of
Nat n
Zero -> do
if InboundSt objectId object -> Bool
forall k. InboundSt k object -> Bool
canRequestMoreObjects InboundSt objectId object
st
then do
Tracer m (TraceObjectDiffusionInbound objectId object)
-> TraceObjectDiffusionInbound objectId object -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceObjectDiffusionInbound objectId object)
tracer (TraceObjectDiffusionInbound objectId object -> m ())
-> TraceObjectDiffusionInbound objectId object -> m ()
forall a b. (a -> b) -> a -> b
$
Int -> TraceObjectDiffusionInbound objectId object
forall objectId object.
Int -> TraceObjectDiffusionInbound objectId object
TraceObjectDiffusionInboundCanRequestMoreObjects (Nat n -> Int
forall (n :: N). Nat n -> Int
natToInt Nat n
n)
InboundStIdle n objectId object m ()
-> m (InboundStIdle n objectId object m ())
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (InboundStIdle n objectId object m ()
-> m (InboundStIdle n objectId object m ()))
-> InboundStIdle n objectId object m ()
-> m (InboundStIdle n objectId object m ())
forall a b. (a -> b) -> a -> b
$! InboundSt objectId object -> InboundSt objectId object
forall s. NoThunks s => s -> s
checkState InboundSt objectId object
st InboundSt objectId object
-> (InboundSt objectId object
-> InboundStIdle n objectId object m ())
-> InboundStIdle n objectId object m ()
forall a b. a -> (a -> b) -> b
& Nat n
-> InboundSt objectId object
-> InboundStIdle n objectId object m ()
forall (n :: N).
Nat n
-> InboundSt objectId object
-> InboundStIdle n objectId object m ()
goReqObjectsAndObjectIdsPipelined Nat n
forall (n :: N). ('Z ~ n) => Nat n
Zero
else do
Tracer m (TraceObjectDiffusionInbound objectId object)
-> TraceObjectDiffusionInbound objectId object -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceObjectDiffusionInbound objectId object)
tracer (TraceObjectDiffusionInbound objectId object -> m ())
-> TraceObjectDiffusionInbound objectId object -> m ()
forall a b. (a -> b) -> a -> b
$
Int -> TraceObjectDiffusionInbound objectId object
forall objectId object.
Int -> TraceObjectDiffusionInbound objectId object
TraceObjectDiffusionInboundCannotRequestMoreObjects (Nat n -> Int
forall (n :: N). Nat n -> Int
natToInt Nat n
n)
InboundStIdle n objectId object m ()
-> m (InboundStIdle n objectId object m ())
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (InboundStIdle n objectId object m ()
-> m (InboundStIdle n objectId object m ()))
-> InboundStIdle n objectId object m ()
-> m (InboundStIdle n objectId object m ())
forall a b. (a -> b) -> a -> b
$! InboundSt objectId object -> InboundSt objectId object
forall s. NoThunks s => s -> s
checkState InboundSt objectId object
st InboundSt objectId object
-> (InboundSt objectId object
-> InboundStIdle n objectId object m ())
-> InboundStIdle n objectId object m ()
forall a b. a -> (a -> b) -> b
& InboundSt objectId object -> InboundStIdle n objectId object m ()
InboundSt objectId object -> InboundStIdle 'Z objectId object m ()
goReqObjectIdsBlocking
Succ Nat n
n' ->
if InboundSt objectId object -> Bool
forall k. InboundSt k object -> Bool
canRequestMoreObjects InboundSt objectId object
st
then do
Tracer m (TraceObjectDiffusionInbound objectId object)
-> TraceObjectDiffusionInbound objectId object -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceObjectDiffusionInbound objectId object)
tracer (TraceObjectDiffusionInbound objectId object -> m ())
-> TraceObjectDiffusionInbound objectId object -> m ()
forall a b. (a -> b) -> a -> b
$
Int -> TraceObjectDiffusionInbound objectId object
forall objectId object.
Int -> TraceObjectDiffusionInbound objectId object
TraceObjectDiffusionInboundCanRequestMoreObjects (Nat n -> Int
forall (n :: N). Nat n -> Int
natToInt Nat n
n)
InboundStIdle n objectId object m ()
-> m (InboundStIdle n objectId object m ())
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (InboundStIdle n objectId object m ()
-> m (InboundStIdle n objectId object m ()))
-> InboundStIdle n objectId object m ()
-> m (InboundStIdle n objectId object m ())
forall a b. (a -> b) -> a -> b
$!
Maybe (InboundStIdle ('S n) objectId object m ())
-> (Collect objectId object
-> InboundStIdle n objectId object m ())
-> InboundStIdle ('S n) objectId object m ()
forall (n1 :: N) objectId object (m :: * -> *) a.
Maybe (InboundStIdle ('S n1) objectId object m a)
-> (Collect objectId object
-> InboundStIdle n1 objectId object m a)
-> InboundStIdle ('S n1) objectId object m a
CollectPipelined
(InboundStIdle ('S n) objectId object m ()
-> Maybe (InboundStIdle ('S n) objectId object m ())
forall a. a -> Maybe a
Just (InboundSt objectId object -> InboundSt objectId object
forall s. NoThunks s => s -> s
checkState InboundSt objectId object
st InboundSt objectId object
-> (InboundSt objectId object
-> InboundStIdle ('S n) objectId object m ())
-> InboundStIdle ('S n) objectId object m ()
forall a b. a -> (a -> b) -> b
& Nat ('S n)
-> InboundSt objectId object
-> InboundStIdle ('S n) objectId object m ()
forall (n :: N).
Nat n
-> InboundSt objectId object
-> InboundStIdle n objectId object m ()
goReqObjectsAndObjectIdsPipelined (Nat n -> Nat ('S n)
forall (m :: N) (n :: N). (m ~ 'S n) => Nat n -> Nat m
Succ Nat n
n')))
(\Collect objectId object
collected -> InboundSt objectId object -> InboundSt objectId object
forall s. NoThunks s => s -> s
checkState InboundSt objectId object
st InboundSt objectId object
-> (InboundSt objectId object
-> InboundStIdle n objectId object m ())
-> InboundStIdle n objectId object m ()
forall a b. a -> (a -> b) -> b
& Nat n
-> Collect objectId object
-> InboundSt objectId object
-> InboundStIdle n objectId object m ()
forall (n :: N).
Nat n
-> Collect objectId object
-> InboundSt objectId object
-> InboundStIdle n objectId object m ()
goCollect Nat n
n' Collect objectId object
collected)
else do
Tracer m (TraceObjectDiffusionInbound objectId object)
-> TraceObjectDiffusionInbound objectId object -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceObjectDiffusionInbound objectId object)
tracer (TraceObjectDiffusionInbound objectId object -> m ())
-> TraceObjectDiffusionInbound objectId object -> m ()
forall a b. (a -> b) -> a -> b
$
Int -> TraceObjectDiffusionInbound objectId object
forall objectId object.
Int -> TraceObjectDiffusionInbound objectId object
TraceObjectDiffusionInboundCannotRequestMoreObjects (Nat n -> Int
forall (n :: N). Nat n -> Int
natToInt Nat n
n)
InboundStIdle n objectId object m ()
-> m (InboundStIdle n objectId object m ())
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (InboundStIdle n objectId object m ()
-> m (InboundStIdle n objectId object m ()))
-> InboundStIdle n objectId object m ()
-> m (InboundStIdle n objectId object m ())
forall a b. (a -> b) -> a -> b
$!
Maybe (InboundStIdle ('S n) objectId object m ())
-> (Collect objectId object
-> InboundStIdle n objectId object m ())
-> InboundStIdle ('S n) objectId object m ()
forall (n1 :: N) objectId object (m :: * -> *) a.
Maybe (InboundStIdle ('S n1) objectId object m a)
-> (Collect objectId object
-> InboundStIdle n1 objectId object m a)
-> InboundStIdle ('S n1) objectId object m a
CollectPipelined
Maybe (InboundStIdle ('S n) objectId object m ())
forall a. Maybe a
Nothing
(\Collect objectId object
collected -> InboundSt objectId object -> InboundSt objectId object
forall s. NoThunks s => s -> s
checkState InboundSt objectId object
st InboundSt objectId object
-> (InboundSt objectId object
-> InboundStIdle n objectId object m ())
-> InboundStIdle n objectId object m ()
forall a b. a -> (a -> b) -> b
& Nat n
-> Collect objectId object
-> InboundSt objectId object
-> InboundStIdle n objectId object m ()
forall (n :: N).
Nat n
-> Collect objectId object
-> InboundSt objectId object
-> InboundStIdle n objectId object m ()
goCollect Nat n
n' Collect objectId object
collected)
goCollect ::
forall (n :: N).
Nat n ->
Collect objectId object ->
InboundSt objectId object ->
InboundStIdle n objectId object m ()
goCollect :: forall (n :: N).
Nat n
-> Collect objectId object
-> InboundSt objectId object
-> InboundStIdle n objectId object m ()
goCollect Nat n
n Collect objectId object
collect !InboundSt objectId object
st = case Collect objectId object
collect of
CollectObjectIds NumObjectIdsReq
numIdsRequested [objectId]
collectedIds -> m (InboundStIdle n objectId object m ())
-> InboundStIdle n objectId object m ()
forall (m :: * -> *) (n :: N) objectId object a.
m (InboundStIdle n objectId object m a)
-> InboundStIdle n objectId object m a
WithEffect (m (InboundStIdle n objectId object m ())
-> InboundStIdle n objectId object m ())
-> m (InboundStIdle n objectId object m ())
-> InboundStIdle n objectId object m ()
forall a b. (a -> b) -> a -> b
$ do
let numCollectedIds :: Int
numCollectedIds = [objectId] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [objectId]
collectedIds
collectedIdsMap :: Map objectId Int
collectedIdsMap = (Int -> Int -> Int) -> [(objectId, Int)] -> Map objectId Int
forall k a. Ord k => (a -> a -> a) -> [(k, a)] -> Map k a
Map.fromListWith Int -> Int -> Int
forall a. Num a => a -> a -> a
(+) [(objectId
x, Int
1 :: Int) | objectId
x <- [objectId]
collectedIds]
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
numCollectedIds Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> NumObjectIdsReq -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral NumObjectIdsReq
numIdsRequested) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
ObjectDiffusionInboundError objectId object -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO
( forall objectId object.
Int -> Int -> ObjectDiffusionInboundError objectId object
ProtocolErrorObjectIdsNotRequested @objectId @object
(NumObjectIdsReq -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral NumObjectIdsReq
numIdsRequested)
Int
numCollectedIds
)
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Map objectId Int -> Int
forall k a. Map k a -> Int
Map.size Map objectId Int
collectedIdsMap Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
/= Int
numCollectedIds) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
ObjectDiffusionInboundError objectId object -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO
( forall objectId object.
Map objectId Int -> ObjectDiffusionInboundError objectId object
ProtocolErrorObjectIdsDuplicate @objectId @object (Map objectId Int -> ObjectDiffusionInboundError objectId object)
-> Map objectId Int -> ObjectDiffusionInboundError objectId object
forall a b. (a -> b) -> a -> b
$
(Int -> Bool) -> Map objectId Int -> Map objectId Int
forall a k. (a -> Bool) -> Map k a -> Map k a
Map.filter (Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
1) (Map objectId Int -> Map objectId Int)
-> Map objectId Int -> Map objectId Int
forall a b. (a -> b) -> a -> b
$
Map objectId Int
collectedIdsMap
)
let alreadyKnownIds :: StrictSeq objectId
alreadyKnownIds = (objectId -> Bool) -> StrictSeq objectId -> StrictSeq objectId
forall a. (a -> Bool) -> StrictSeq a -> StrictSeq a
Seq.filter (objectId -> Map objectId Int -> Bool
forall k a. Ord k => k -> Map k a -> Bool
`Map.member` Map objectId Int
collectedIdsMap) (InboundSt objectId object -> StrictSeq objectId
forall objectId object.
InboundSt objectId object -> StrictSeq objectId
outstandingFifo InboundSt objectId object
st)
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool -> Bool
not (StrictSeq objectId -> Bool
forall a. StrictSeq a -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null StrictSeq objectId
alreadyKnownIds)) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
ObjectDiffusionInboundError objectId object -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO
( forall objectId object.
Set objectId -> ObjectDiffusionInboundError objectId object
ProtocolErrorObjectIdsAlreadyKnown @objectId @object (Set objectId -> ObjectDiffusionInboundError objectId object)
-> Set objectId -> ObjectDiffusionInboundError objectId object
forall a b. (a -> b) -> a -> b
$
[objectId] -> Set objectId
forall a. Ord a => [a] -> Set a
Set.fromList (StrictSeq objectId -> [objectId]
forall a. StrictSeq a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList StrictSeq objectId
alreadyKnownIds)
)
let !st' :: InboundSt objectId object
st' = InboundSt objectId object
st{numIdsInFlight = numIdsInFlight st - numIdsRequested}
poolHasObject <- STM m (objectId -> Bool) -> m (objectId -> Bool)
forall a. (?callStack::CallStack) => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, ?callStack::CallStack) =>
STM m a -> m a
atomically (STM m (objectId -> Bool) -> m (objectId -> Bool))
-> STM m (objectId -> Bool) -> m (objectId -> Bool)
forall a b. (a -> b) -> a -> b
$ STM m (objectId -> Bool)
opwHasObject
let !st'' = InboundSt objectId object
-> (objectId -> Bool) -> [objectId] -> InboundSt objectId object
preAcknowledge InboundSt objectId object
st' objectId -> Bool
poolHasObject [objectId]
collectedIds
pure $! checkState st'' & go n
CollectObjects [objectId]
requestedIds [object]
collectedObjects -> m (InboundStIdle n objectId object m ())
-> InboundStIdle n objectId object m ()
forall (m :: * -> *) (n :: N) objectId object a.
m (InboundStIdle n objectId object m a)
-> InboundStIdle n objectId object m a
WithEffect (m (InboundStIdle n objectId object m ())
-> InboundStIdle n objectId object m ())
-> m (InboundStIdle n objectId object m ())
-> InboundStIdle n objectId object m ()
forall a b. (a -> b) -> a -> b
$ do
let requestedIdsSet :: Set objectId
requestedIdsSet = [objectId] -> Set objectId
forall a. Ord a => [a] -> Set a
Set.fromList [objectId]
requestedIds
obtainedIdsSet :: Set objectId
obtainedIdsSet = [objectId] -> Set objectId
forall a. Ord a => [a] -> Set a
Set.fromList (object -> objectId
opwObjectId (object -> objectId) -> [object] -> [objectId]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [object]
collectedObjects)
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Set objectId
requestedIdsSet Set objectId -> Set objectId -> Bool
forall a. Eq a => a -> a -> Bool
/= Set objectId
obtainedIdsSet) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
let reqButNotRecvd :: Set objectId
reqButNotRecvd = Set objectId
requestedIdsSet Set objectId -> Set objectId -> Set objectId
forall a. Ord a => Set a -> Set a -> Set a
`Set.difference` Set objectId
obtainedIdsSet
recvButNotReq :: Set objectId
recvButNotReq = Set objectId
obtainedIdsSet Set objectId -> Set objectId -> Set objectId
forall a. Ord a => Set a -> Set a -> Set a
`Set.difference` Set objectId
requestedIdsSet
in ObjectDiffusionInboundError objectId object -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO
( forall objectId object.
Set objectId
-> Set objectId -> ObjectDiffusionInboundError objectId object
ProtocolErrorObjectsDifferentThanRequested @objectId @object
Set objectId
reqButNotRecvd
Set objectId
recvButNotReq
)
Tracer m (TraceObjectDiffusionInbound objectId object)
-> TraceObjectDiffusionInbound objectId object -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceObjectDiffusionInbound objectId object)
tracer (TraceObjectDiffusionInbound objectId object -> m ())
-> TraceObjectDiffusionInbound objectId object -> m ()
forall a b. (a -> b) -> a -> b
$
Int -> TraceObjectDiffusionInbound objectId object
forall objectId object.
Int -> TraceObjectDiffusionInbound objectId object
TraceObjectDiffusionInboundCollectedObjects ([object] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [object]
collectedObjects)
let pendingObjects' :: Map objectId (Maybe object)
pendingObjects' =
(Map objectId (Maybe object)
-> object -> Map objectId (Maybe object))
-> Map objectId (Maybe object)
-> [object]
-> Map objectId (Maybe object)
forall b a. (b -> a -> b) -> b -> [a] -> b
forall (t :: * -> *) b a.
Foldable t =>
(b -> a -> b) -> b -> t a -> b
foldl'
(\Map objectId (Maybe object)
accMap object
object -> objectId
-> Maybe object
-> Map objectId (Maybe object)
-> Map objectId (Maybe object)
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert (object -> objectId
opwObjectId object
object) (object -> Maybe object
forall a. a -> Maybe a
Just object
object) Map objectId (Maybe object)
accMap)
(InboundSt objectId object -> Map objectId (Maybe object)
forall objectId object.
InboundSt objectId object -> Map objectId (Maybe object)
pendingObjects InboundSt objectId object
st)
[object]
collectedObjects
(StrictSeq objectId
objectIdsToAck, StrictSeq objectId
outstandingFifo') =
(objectId -> Bool)
-> StrictSeq objectId -> (StrictSeq objectId, StrictSeq objectId)
forall a. (a -> Bool) -> StrictSeq a -> (StrictSeq a, StrictSeq a)
Seq.spanl (objectId -> Map objectId (Maybe object) -> Bool
forall k a. Ord k => k -> Map k a -> Bool
`Map.member` Map objectId (Maybe object)
pendingObjects') (InboundSt objectId object -> StrictSeq objectId
forall objectId object.
InboundSt objectId object -> StrictSeq objectId
outstandingFifo InboundSt objectId object
st)
pendingObjects'' :: Map objectId (Maybe object)
pendingObjects'' =
(Map objectId (Maybe object)
-> objectId -> Map objectId (Maybe object))
-> Map objectId (Maybe object)
-> StrictSeq objectId
-> Map objectId (Maybe object)
forall b a. (b -> a -> b) -> b -> StrictSeq a -> b
forall (t :: * -> *) b a.
Foldable t =>
(b -> a -> b) -> b -> t a -> b
Foldable.foldl'
((objectId
-> Map objectId (Maybe object) -> Map objectId (Maybe object))
-> Map objectId (Maybe object)
-> objectId
-> Map objectId (Maybe object)
forall a b c. (a -> b -> c) -> b -> a -> c
flip objectId
-> Map objectId (Maybe object) -> Map objectId (Maybe object)
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete)
Map objectId (Maybe object)
pendingObjects'
StrictSeq objectId
objectIdsToAck
objectsToAck :: [object]
objectsToAck =
[Maybe object] -> [object]
forall a. [Maybe a] -> [a]
catMaybes ([Maybe object] -> [object]) -> [Maybe object] -> [object]
forall a b. (a -> b) -> a -> b
$
((Map objectId (Maybe object) -> objectId -> Maybe object
forall k a. Ord k => Map k a -> k -> a
(Map.!) Map objectId (Maybe object)
pendingObjects') (objectId -> Maybe object) -> [objectId] -> [Maybe object]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> StrictSeq objectId -> [objectId]
forall a. StrictSeq a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList StrictSeq objectId
objectIdsToAck)
[object] -> m ()
opwAddObjects [object]
objectsToAck
Tracer m (TraceObjectDiffusionInbound objectId object)
-> TraceObjectDiffusionInbound objectId object -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceObjectDiffusionInbound objectId object)
tracer (TraceObjectDiffusionInbound objectId object -> m ())
-> TraceObjectDiffusionInbound objectId object -> m ()
forall a b. (a -> b) -> a -> b
$
NumObjectsProcessed -> TraceObjectDiffusionInbound objectId object
forall objectId object.
NumObjectsProcessed -> TraceObjectDiffusionInbound objectId object
TraceObjectDiffusionInboundAddedObjects
(Word64 -> NumObjectsProcessed
NumObjectsProcessed (Int -> Word64
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> Word64) -> Int -> Word64
forall a b. (a -> b) -> a -> b
$ [object] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [object]
objectsToAck))
let !st' :: InboundSt objectId object
st' =
InboundSt objectId object
st
{ pendingObjects = pendingObjects''
, outstandingFifo = outstandingFifo'
, numToAckOnNextReq =
numToAckOnNextReq st
+ fromIntegral (Seq.length objectIdsToAck)
}
InboundStIdle n objectId object m ()
-> m (InboundStIdle n objectId object m ())
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (InboundStIdle n objectId object m ()
-> m (InboundStIdle n objectId object m ()))
-> InboundStIdle n objectId object m ()
-> m (InboundStIdle n objectId object m ())
forall a b. (a -> b) -> a -> b
$! InboundSt objectId object -> InboundSt objectId object
forall s. NoThunks s => s -> s
checkState InboundSt objectId object
st' InboundSt objectId object
-> (InboundSt objectId object
-> InboundStIdle n objectId object m ())
-> InboundStIdle n objectId object m ()
forall a b. a -> (a -> b) -> b
& Nat n
-> InboundSt objectId object
-> InboundStIdle n objectId object m ()
forall (n :: N).
Nat n
-> InboundSt objectId object
-> InboundStIdle n objectId object m ()
go Nat n
n
goReqObjectIdsBlocking ::
InboundSt objectId object ->
InboundStIdle 'Z objectId object m ()
goReqObjectIdsBlocking :: InboundSt objectId object -> InboundStIdle 'Z objectId object m ()
goReqObjectIdsBlocking !InboundSt objectId object
st =
let numIdsToRequest :: NumObjectIdsReq
numIdsToRequest = InboundSt objectId object -> NumObjectIdsReq
numIdsToReq InboundSt objectId object
st
!st' :: InboundSt objectId object
st' =
InboundSt objectId object
st
{ numToAckOnNextReq = 0
, numIdsInFlight = numIdsToRequest
}
in Bool
-> InboundStIdle 'Z objectId object m ()
-> InboundStIdle 'Z objectId object m ()
forall a. (?callStack::CallStack) => Bool -> a -> a
assert
( InboundSt objectId object -> NumObjectIdsReq
forall objectId object.
InboundSt objectId object -> NumObjectIdsReq
numIdsInFlight InboundSt objectId object
st NumObjectIdsReq -> NumObjectIdsReq -> Bool
forall a. Eq a => a -> a -> Bool
== NumObjectIdsReq
0
Bool -> Bool -> Bool
&& StrictSeq objectId -> Bool
forall a. StrictSeq a -> Bool
Seq.null (InboundSt objectId object -> StrictSeq objectId
forall objectId object.
InboundSt objectId object -> StrictSeq objectId
outstandingFifo InboundSt objectId object
st)
Bool -> Bool -> Bool
&& Set objectId -> Bool
forall a. Set a -> Bool
Set.null (InboundSt objectId object -> Set objectId
forall objectId object. InboundSt objectId object -> Set objectId
canRequestNext InboundSt objectId object
st)
Bool -> Bool -> Bool
&& Map objectId (Maybe object) -> Bool
forall k a. Map k a -> Bool
Map.null (InboundSt objectId object -> Map objectId (Maybe object)
forall objectId object.
InboundSt objectId object -> Map objectId (Maybe object)
pendingObjects InboundSt objectId object
st)
)
(InboundStIdle 'Z objectId object m ()
-> InboundStIdle 'Z objectId object m ())
-> InboundStIdle 'Z objectId object m ()
-> InboundStIdle 'Z objectId object m ()
forall a b. (a -> b) -> a -> b
$ NumObjectIdsAck
-> NumObjectIdsReq
-> (NonEmpty objectId -> InboundStIdle 'Z objectId object m ())
-> InboundStIdle 'Z objectId object m ()
forall objectId object (m :: * -> *) a.
NumObjectIdsAck
-> NumObjectIdsReq
-> (NonEmpty objectId -> InboundStIdle 'Z objectId object m a)
-> InboundStIdle 'Z objectId object m a
SendMsgRequestObjectIdsBlocking
(InboundSt objectId object -> NumObjectIdsAck
forall objectId object.
InboundSt objectId object -> NumObjectIdsAck
numToAckOnNextReq InboundSt objectId object
st)
NumObjectIdsReq
numIdsToRequest
( \NonEmpty objectId
neCollectedIds ->
InboundSt objectId object -> InboundSt objectId object
forall s. NoThunks s => s -> s
checkState InboundSt objectId object
st' InboundSt objectId object
-> (InboundSt objectId object
-> InboundStIdle 'Z objectId object m ())
-> InboundStIdle 'Z objectId object m ()
forall a b. a -> (a -> b) -> b
& Nat 'Z
-> Collect objectId object
-> InboundSt objectId object
-> InboundStIdle 'Z objectId object m ()
forall (n :: N).
Nat n
-> Collect objectId object
-> InboundSt objectId object
-> InboundStIdle n objectId object m ()
goCollect Nat 'Z
forall (n :: N). ('Z ~ n) => Nat n
Zero (NumObjectIdsReq -> [objectId] -> Collect objectId object
forall objectId object.
NumObjectIdsReq -> [objectId] -> Collect objectId object
CollectObjectIds NumObjectIdsReq
numIdsToRequest (NonEmpty objectId -> [objectId]
forall a. NonEmpty a -> [a]
NonEmpty.toList NonEmpty objectId
neCollectedIds))
)
goReqObjectsAndObjectIdsPipelined ::
forall (n :: N).
Nat n ->
InboundSt objectId object ->
InboundStIdle n objectId object m ()
goReqObjectsAndObjectIdsPipelined :: forall (n :: N).
Nat n
-> InboundSt objectId object
-> InboundStIdle n objectId object m ()
goReqObjectsAndObjectIdsPipelined Nat n
n !InboundSt objectId object
st =
let (Set objectId
toRequest, Set objectId
canRequestNext') =
Int -> Set objectId -> (Set objectId, Set objectId)
forall a. Int -> Set a -> (Set a, Set a)
Set.splitAt (NumObjectsReq -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral NumObjectsReq
maxNumObjectsToReq) (InboundSt objectId object -> Set objectId
forall objectId object. InboundSt objectId object -> Set objectId
canRequestNext InboundSt objectId object
st)
!st' :: InboundSt objectId object
st' = InboundSt objectId object
st{canRequestNext = canRequestNext'}
in [objectId]
-> InboundStIdle ('S n) objectId object m ()
-> InboundStIdle n objectId object m ()
forall objectId (n :: N) object (m :: * -> *) a.
[objectId]
-> InboundStIdle ('S n) objectId object m a
-> InboundStIdle n objectId object m a
SendMsgRequestObjectsPipelined
(Set objectId -> [objectId]
forall a. Set a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList Set objectId
toRequest)
(InboundSt objectId object -> InboundSt objectId object
forall s. NoThunks s => s -> s
checkState InboundSt objectId object
st' InboundSt objectId object
-> (InboundSt objectId object
-> InboundStIdle ('S n) objectId object m ())
-> InboundStIdle ('S n) objectId object m ()
forall a b. a -> (a -> b) -> b
& Nat ('S n)
-> InboundSt objectId object
-> InboundStIdle ('S n) objectId object m ()
forall (n :: N).
Nat n
-> InboundSt objectId object
-> InboundStIdle n objectId object m ()
goReqObjectIdsPipelined (Nat n -> Nat ('S n)
forall (m :: N) (n :: N). (m ~ 'S n) => Nat n -> Nat m
Succ Nat n
n))
goReqObjectIdsPipelined ::
forall (n :: N).
Nat n ->
InboundSt objectId object ->
InboundStIdle n objectId object m ()
goReqObjectIdsPipelined :: forall (n :: N).
Nat n
-> InboundSt objectId object
-> InboundStIdle n objectId object m ()
goReqObjectIdsPipelined Nat n
n !InboundSt objectId object
st =
let numIdsToRequest :: NumObjectIdsReq
numIdsToRequest = InboundSt objectId object -> NumObjectIdsReq
numIdsToReq InboundSt objectId object
st
in if NumObjectIdsReq
numIdsToRequest NumObjectIdsReq -> NumObjectIdsReq -> Bool
forall a. Ord a => a -> a -> Bool
<= NumObjectIdsReq
0
then InboundSt objectId object -> InboundSt objectId object
forall s. NoThunks s => s -> s
checkState InboundSt objectId object
st InboundSt objectId object
-> (InboundSt objectId object
-> InboundStIdle n objectId object m ())
-> InboundStIdle n objectId object m ()
forall a b. a -> (a -> b) -> b
& Nat n
-> InboundSt objectId object
-> InboundStIdle n objectId object m ()
forall (n :: N).
Nat n
-> InboundSt objectId object
-> InboundStIdle n objectId object m ()
go Nat n
n
else
let !st' :: InboundSt objectId object
st' =
InboundSt objectId object
st
{ numIdsInFlight =
numIdsInFlight st
+ numIdsToRequest
, numToAckOnNextReq = 0
}
in NumObjectIdsAck
-> NumObjectIdsReq
-> InboundStIdle ('S n) objectId object m ()
-> InboundStIdle n objectId object m ()
forall (n :: N) objectId object (m :: * -> *) a.
NumObjectIdsAck
-> NumObjectIdsReq
-> InboundStIdle ('S n) objectId object m a
-> InboundStIdle n objectId object m a
SendMsgRequestObjectIdsPipelined
(InboundSt objectId object -> NumObjectIdsAck
forall objectId object.
InboundSt objectId object -> NumObjectIdsAck
numToAckOnNextReq InboundSt objectId object
st)
NumObjectIdsReq
numIdsToRequest
(InboundSt objectId object -> InboundSt objectId object
forall s. NoThunks s => s -> s
checkState InboundSt objectId object
st' InboundSt objectId object
-> (InboundSt objectId object
-> InboundStIdle ('S n) objectId object m ())
-> InboundStIdle ('S n) objectId object m ()
forall a b. a -> (a -> b) -> b
& Nat ('S n)
-> InboundSt objectId object
-> InboundStIdle ('S n) objectId object m ()
forall (n :: N).
Nat n
-> InboundSt objectId object
-> InboundStIdle n objectId object m ()
go (Nat n -> Nat ('S n)
forall (m :: N) (n :: N). (m ~ 'S n) => Nat n -> Nat m
Succ Nat n
n))
terminateAfterDrain ::
Nat n -> InboundStIdle n objectId object m ()
terminateAfterDrain :: forall (n :: N). Nat n -> InboundStIdle n objectId object m ()
terminateAfterDrain = \case
Nat n
Zero -> () -> InboundStIdle 'Z objectId object m ()
forall a objectId object (m :: * -> *).
a -> InboundStIdle 'Z objectId object m a
SendMsgDone ()
Succ Nat n
n -> Maybe (InboundStIdle ('S n) objectId object m ())
-> (Collect objectId object
-> InboundStIdle n objectId object m ())
-> InboundStIdle ('S n) objectId object m ()
forall (n1 :: N) objectId object (m :: * -> *) a.
Maybe (InboundStIdle ('S n1) objectId object m a)
-> (Collect objectId object
-> InboundStIdle n1 objectId object m a)
-> InboundStIdle ('S n1) objectId object m a
CollectPipelined Maybe (InboundStIdle ('S n) objectId object m ())
forall a. Maybe a
Nothing ((Collect objectId object -> InboundStIdle n objectId object m ())
-> InboundStIdle ('S n) objectId object m ())
-> (Collect objectId object
-> InboundStIdle n objectId object m ())
-> InboundStIdle ('S n) objectId object m ()
forall a b. (a -> b) -> a -> b
$ \Collect objectId object
_ignoredMsg -> Nat n -> InboundStIdle n objectId object m ()
forall (n :: N). Nat n -> InboundStIdle n objectId object m ()
terminateAfterDrain Nat n
n
checkState :: NoThunks s => s -> s
checkState :: forall s. NoThunks s => s -> s
checkState !s
st = Maybe String -> s -> s
forall a. (?callStack::CallStack) => Maybe String -> a -> a
checkInvariant (s -> Maybe String
forall a. NoThunks a => a -> Maybe String
noThunksInvariant s
st) s
st