{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE ImportQualifiedPost #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Outbound
( objectDiffusionOutbound
, TraceObjectDiffusionOutbound (..)
, ObjectDiffusionOutboundError (..)
) where
import Cardano.Network.NodeToNode.Version (NodeToNodeVersion)
import Control.Monad (join, unless, when)
import Control.Monad.Class.MonadSTM
import Control.Monad.Class.MonadThrow
import Control.Tracer (Tracer, traceWith)
import Data.List.NonEmpty qualified as NonEmpty
import Data.Map qualified as Map
import Data.Maybe (fromMaybe)
import Data.Sequence.Strict (StrictSeq)
import Data.Sequence.Strict qualified as Seq
import Data.Set qualified as Set
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API
import Ouroboros.Network.Protocol.ObjectDiffusion.Outbound
import Ouroboros.Network.Protocol.ObjectDiffusion.Type
data TraceObjectDiffusionOutbound objectId object
= TraceObjectDiffusionOutboundRecvMsgRequestObjectIds NumObjectIdsReq
|
TraceObjectDiffusionOutboundSendMsgReplyObjectIds [objectId]
|
TraceObjectDiffusionOutboundRecvMsgRequestObjects
[objectId]
|
TraceObjectDiffusionOutboundSendMsgReplyObjects
[object]
|
TraceObjectDiffusionOutboundTerminated
deriving Int -> TraceObjectDiffusionOutbound objectId object -> ShowS
[TraceObjectDiffusionOutbound objectId object] -> ShowS
TraceObjectDiffusionOutbound objectId object -> String
(Int -> TraceObjectDiffusionOutbound objectId object -> ShowS)
-> (TraceObjectDiffusionOutbound objectId object -> String)
-> ([TraceObjectDiffusionOutbound objectId object] -> ShowS)
-> Show (TraceObjectDiffusionOutbound objectId object)
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
forall objectId object.
(Show objectId, Show object) =>
Int -> TraceObjectDiffusionOutbound objectId object -> ShowS
forall objectId object.
(Show objectId, Show object) =>
[TraceObjectDiffusionOutbound objectId object] -> ShowS
forall objectId object.
(Show objectId, Show object) =>
TraceObjectDiffusionOutbound objectId object -> String
$cshowsPrec :: forall objectId object.
(Show objectId, Show object) =>
Int -> TraceObjectDiffusionOutbound objectId object -> ShowS
showsPrec :: Int -> TraceObjectDiffusionOutbound objectId object -> ShowS
$cshow :: forall objectId object.
(Show objectId, Show object) =>
TraceObjectDiffusionOutbound objectId object -> String
show :: TraceObjectDiffusionOutbound objectId object -> String
$cshowList :: forall objectId object.
(Show objectId, Show object) =>
[TraceObjectDiffusionOutbound objectId object] -> ShowS
showList :: [TraceObjectDiffusionOutbound objectId object] -> ShowS
Show
data ObjectDiffusionOutboundError
= ProtocolErrorAckedTooManyObjectIds
| ProtocolErrorRequestedNothing
| ProtocolErrorRequestedTooManyObjectIds NumObjectIdsReq NumObjectsUnacknowledged
| ProtocolErrorRequestBlocking
| ProtocolErrorRequestNonBlocking
| ProtocolErrorRequestedUnavailableObject
| ProtocolErrorRequestedDuplicateObject
deriving Int -> ObjectDiffusionOutboundError -> ShowS
[ObjectDiffusionOutboundError] -> ShowS
ObjectDiffusionOutboundError -> String
(Int -> ObjectDiffusionOutboundError -> ShowS)
-> (ObjectDiffusionOutboundError -> String)
-> ([ObjectDiffusionOutboundError] -> ShowS)
-> Show ObjectDiffusionOutboundError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> ObjectDiffusionOutboundError -> ShowS
showsPrec :: Int -> ObjectDiffusionOutboundError -> ShowS
$cshow :: ObjectDiffusionOutboundError -> String
show :: ObjectDiffusionOutboundError -> String
$cshowList :: [ObjectDiffusionOutboundError] -> ShowS
showList :: [ObjectDiffusionOutboundError] -> ShowS
Show
instance Exception ObjectDiffusionOutboundError where
displayException :: ObjectDiffusionOutboundError -> String
displayException ObjectDiffusionOutboundError
ProtocolErrorAckedTooManyObjectIds =
String
"The peer tried to acknowledged more objectIds than are available to do so."
displayException (ProtocolErrorRequestedTooManyObjectIds NumObjectIdsReq
reqNo NumObjectsUnacknowledged
maxUnacked) =
String
"The peer requested "
String -> ShowS
forall a. [a] -> [a] -> [a]
++ NumObjectIdsReq -> String
forall a. Show a => a -> String
show NumObjectIdsReq
reqNo
String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
" objectIds which would put the "
String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
"total in flight over the limit of "
String -> ShowS
forall a. [a] -> [a] -> [a]
++ NumObjectsUnacknowledged -> String
forall a. Show a => a -> String
show NumObjectsUnacknowledged
maxUnacked
displayException ObjectDiffusionOutboundError
ProtocolErrorRequestedNothing =
String
"The peer requested zero objectIds."
displayException ObjectDiffusionOutboundError
ProtocolErrorRequestBlocking =
String
"The peer made a blocking request for more objectIds when there are still "
String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
"unacknowledged objectIds. It should have used a non-blocking request."
displayException ObjectDiffusionOutboundError
ProtocolErrorRequestNonBlocking =
String
"The peer made a non-blocking request for more objectIds when there are "
String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
"no unacknowledged objectIds. It should have used a blocking request."
displayException ObjectDiffusionOutboundError
ProtocolErrorRequestedUnavailableObject =
String
"The peer requested an object which is not available, either "
String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
"because it was never available or because it was previously requested."
displayException ObjectDiffusionOutboundError
ProtocolErrorRequestedDuplicateObject =
String
"The peer requested the same object twice."
data OutboundSt objectId object ticketNo = OutboundSt
{ forall objectId object ticketNo.
OutboundSt objectId object ticketNo -> StrictSeq object
outstandingFifo :: !(StrictSeq object)
, forall objectId object ticketNo.
OutboundSt objectId object ticketNo -> ticketNo
lastTicketNo :: !ticketNo
}
objectDiffusionOutbound ::
forall objectId object ticketNo m.
(Ord objectId, MonadSTM m, MonadThrow m) =>
Tracer m (TraceObjectDiffusionOutbound objectId object) ->
NumObjectsUnacknowledged ->
ObjectPoolReader objectId object ticketNo m ->
NodeToNodeVersion ->
ObjectDiffusionOutbound objectId object m ()
objectDiffusionOutbound :: forall objectId object ticketNo (m :: * -> *).
(Ord objectId, MonadSTM m, MonadThrow m) =>
Tracer m (TraceObjectDiffusionOutbound objectId object)
-> NumObjectsUnacknowledged
-> ObjectPoolReader objectId object ticketNo m
-> NodeToNodeVersion
-> ObjectDiffusionOutbound objectId object m ()
objectDiffusionOutbound Tracer m (TraceObjectDiffusionOutbound objectId object)
tracer NumObjectsUnacknowledged
maxFifoLength ObjectPoolReader{ticketNo
object -> objectId
ticketNo -> Word64 -> STM m (Maybe (m (Map ticketNo object)))
oprObjectId :: object -> objectId
oprZeroTicketNo :: ticketNo
oprObjectsAfter :: ticketNo -> Word64 -> STM m (Maybe (m (Map ticketNo object)))
oprObjectId :: forall objectId object ticketNo (m :: * -> *).
ObjectPoolReader objectId object ticketNo m -> object -> objectId
oprObjectsAfter :: forall objectId object ticketNo (m :: * -> *).
ObjectPoolReader objectId object ticketNo m
-> ticketNo -> Word64 -> STM m (Maybe (m (Map ticketNo object)))
oprZeroTicketNo :: forall objectId object ticketNo (m :: * -> *).
ObjectPoolReader objectId object ticketNo m -> ticketNo
..} NodeToNodeVersion
_version =
m (OutboundStIdle objectId object m ())
-> ObjectDiffusionOutbound objectId object m ()
forall objectId object (m :: * -> *) a.
m (OutboundStIdle objectId object m a)
-> ObjectDiffusionOutbound objectId object m a
ObjectDiffusionOutbound (OutboundStIdle objectId object m ()
-> m (OutboundStIdle objectId object m ())
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (OutboundSt objectId object ticketNo
-> OutboundStIdle objectId object m ()
makeBundle (OutboundSt objectId object ticketNo
-> OutboundStIdle objectId object m ())
-> OutboundSt objectId object ticketNo
-> OutboundStIdle objectId object m ()
forall a b. (a -> b) -> a -> b
$ StrictSeq object -> ticketNo -> OutboundSt objectId object ticketNo
forall objectId object ticketNo.
StrictSeq object -> ticketNo -> OutboundSt objectId object ticketNo
OutboundSt StrictSeq object
forall a. StrictSeq a
Seq.empty ticketNo
oprZeroTicketNo))
where
makeBundle :: OutboundSt objectId object ticketNo -> OutboundStIdle objectId object m ()
makeBundle :: OutboundSt objectId object ticketNo
-> OutboundStIdle objectId object m ()
makeBundle !OutboundSt objectId object ticketNo
st =
OutboundStIdle
{ recvMsgRequestObjectIds :: forall (blocking :: StBlockingStyle).
SingBlockingStyle blocking
-> NumObjectIdsAck
-> NumObjectIdsReq
-> m (OutboundStObjectIds blocking objectId object m ())
recvMsgRequestObjectIds = OutboundSt objectId object ticketNo
-> SingBlockingStyle blocking
-> NumObjectIdsAck
-> NumObjectIdsReq
-> m (OutboundStObjectIds blocking objectId object m ())
forall (blocking :: StBlockingStyle).
OutboundSt objectId object ticketNo
-> SingBlockingStyle blocking
-> NumObjectIdsAck
-> NumObjectIdsReq
-> m (OutboundStObjectIds blocking objectId object m ())
recvMsgRequestObjectIds OutboundSt objectId object ticketNo
st
, recvMsgRequestObjects :: [objectId] -> m (OutboundStObjects objectId object m ())
recvMsgRequestObjects = OutboundSt objectId object ticketNo
-> [objectId] -> m (OutboundStObjects objectId object m ())
recvMsgRequestObjects OutboundSt objectId object ticketNo
st
, recvMsgDone :: m ()
recvMsgDone = Tracer m (TraceObjectDiffusionOutbound objectId object)
-> TraceObjectDiffusionOutbound objectId object -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceObjectDiffusionOutbound objectId object)
tracer TraceObjectDiffusionOutbound objectId object
forall objectId object.
TraceObjectDiffusionOutbound objectId object
TraceObjectDiffusionOutboundTerminated
}
updateStNewObjects ::
OutboundSt objectId object ticketNo ->
[(ticketNo, object)] ->
OutboundSt objectId object ticketNo
updateStNewObjects :: OutboundSt objectId object ticketNo
-> [(ticketNo, object)] -> OutboundSt objectId object ticketNo
updateStNewObjects !OutboundSt{ticketNo
StrictSeq object
outstandingFifo :: forall objectId object ticketNo.
OutboundSt objectId object ticketNo -> StrictSeq object
lastTicketNo :: forall objectId object ticketNo.
OutboundSt objectId object ticketNo -> ticketNo
outstandingFifo :: StrictSeq object
lastTicketNo :: ticketNo
..} [(ticketNo, object)]
sortedNewContent =
let !outstandingFifo' :: StrictSeq object
outstandingFifo' =
StrictSeq object
outstandingFifo
StrictSeq object -> StrictSeq object -> StrictSeq object
forall a. Semigroup a => a -> a -> a
<> ([object] -> StrictSeq object
forall a. [a] -> StrictSeq a
Seq.fromList ([object] -> StrictSeq object) -> [object] -> StrictSeq object
forall a b. (a -> b) -> a -> b
$ (ticketNo, object) -> object
forall a b. (a, b) -> b
snd ((ticketNo, object) -> object) -> [(ticketNo, object)] -> [object]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [(ticketNo, object)]
sortedNewContent)
!lastTicketNo' :: ticketNo
lastTicketNo'
| [(ticketNo, object)] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [(ticketNo, object)]
sortedNewContent = ticketNo
lastTicketNo
| Bool
otherwise = (ticketNo, object) -> ticketNo
forall a b. (a, b) -> a
fst ((ticketNo, object) -> ticketNo) -> (ticketNo, object) -> ticketNo
forall a b. (a -> b) -> a -> b
$ [(ticketNo, object)] -> (ticketNo, object)
forall a. HasCallStack => [a] -> a
last [(ticketNo, object)]
sortedNewContent
in OutboundSt
{ outstandingFifo :: StrictSeq object
outstandingFifo = StrictSeq object
outstandingFifo'
, lastTicketNo :: ticketNo
lastTicketNo = ticketNo
lastTicketNo'
}
recvMsgRequestObjectIds ::
forall blocking.
OutboundSt objectId object ticketNo ->
SingBlockingStyle blocking ->
NumObjectIdsAck ->
NumObjectIdsReq ->
m (OutboundStObjectIds blocking objectId object m ())
recvMsgRequestObjectIds :: forall (blocking :: StBlockingStyle).
OutboundSt objectId object ticketNo
-> SingBlockingStyle blocking
-> NumObjectIdsAck
-> NumObjectIdsReq
-> m (OutboundStObjectIds blocking objectId object m ())
recvMsgRequestObjectIds !st :: OutboundSt objectId object ticketNo
st@OutboundSt{ticketNo
StrictSeq object
outstandingFifo :: forall objectId object ticketNo.
OutboundSt objectId object ticketNo -> StrictSeq object
lastTicketNo :: forall objectId object ticketNo.
OutboundSt objectId object ticketNo -> ticketNo
outstandingFifo :: StrictSeq object
lastTicketNo :: ticketNo
..} SingBlockingStyle blocking
blocking NumObjectIdsAck
numIdsToAck NumObjectIdsReq
numIdsToReq = do
Tracer m (TraceObjectDiffusionOutbound objectId object)
-> TraceObjectDiffusionOutbound objectId object -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceObjectDiffusionOutbound objectId object)
tracer (NumObjectIdsReq -> TraceObjectDiffusionOutbound objectId object
forall objectId object.
NumObjectIdsReq -> TraceObjectDiffusionOutbound objectId object
TraceObjectDiffusionOutboundRecvMsgRequestObjectIds NumObjectIdsReq
numIdsToReq)
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (NumObjectIdsAck
numIdsToAck NumObjectIdsAck -> NumObjectIdsAck -> Bool
forall a. Ord a => a -> a -> Bool
> Int -> NumObjectIdsAck
forall a b. (Integral a, Num b) => a -> b
fromIntegral (StrictSeq object -> Int
forall a. StrictSeq a -> Int
Seq.length StrictSeq object
outstandingFifo)) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
ObjectDiffusionOutboundError -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO ObjectDiffusionOutboundError
ProtocolErrorAckedTooManyObjectIds
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when
( StrictSeq object -> Int
forall a. StrictSeq a -> Int
Seq.length StrictSeq object
outstandingFifo
Int -> Int -> Int
forall a. Num a => a -> a -> a
- NumObjectIdsAck -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral NumObjectIdsAck
numIdsToAck
Int -> Int -> Int
forall a. Num a => a -> a -> a
+ NumObjectIdsReq -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral NumObjectIdsReq
numIdsToReq
Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> NumObjectsUnacknowledged -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral NumObjectsUnacknowledged
maxFifoLength
)
(m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ ObjectDiffusionOutboundError -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (NumObjectIdsReq
-> NumObjectsUnacknowledged -> ObjectDiffusionOutboundError
ProtocolErrorRequestedTooManyObjectIds NumObjectIdsReq
numIdsToReq NumObjectsUnacknowledged
maxFifoLength)
let !outstandingFifo' :: StrictSeq object
outstandingFifo' = Int -> StrictSeq object -> StrictSeq object
forall a. Int -> StrictSeq a -> StrictSeq a
Seq.drop (NumObjectIdsAck -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral NumObjectIdsAck
numIdsToAck) StrictSeq object
outstandingFifo
st' :: OutboundSt objectId object ticketNo
!st' :: OutboundSt objectId object ticketNo
st' = OutboundSt objectId object ticketNo
st{outstandingFifo = outstandingFifo'}
case SingBlockingStyle blocking
blocking of
SingBlockingStyle blocking
SingBlocking -> do
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (NumObjectIdsReq
numIdsToReq NumObjectIdsReq -> NumObjectIdsReq -> Bool
forall a. Eq a => a -> a -> Bool
== NumObjectIdsReq
0) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
ObjectDiffusionOutboundError -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO ObjectDiffusionOutboundError
ProtocolErrorRequestedNothing
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (StrictSeq object -> Bool
forall a. StrictSeq a -> Bool
Seq.null StrictSeq object
outstandingFifo') (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
ObjectDiffusionOutboundError -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO ObjectDiffusionOutboundError
ProtocolErrorRequestBlocking
let getNewContent :: m (Map ticketNo object)
getNewContent = do
content <- m (m (Map ticketNo object)) -> m (Map ticketNo object)
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (m (m (Map ticketNo object)) -> m (Map ticketNo object))
-> (STM m (m (Map ticketNo object)) -> m (m (Map ticketNo object)))
-> STM m (m (Map ticketNo object))
-> m (Map ticketNo object)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM m (m (Map ticketNo object)) -> m (m (Map ticketNo object))
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (m (Map ticketNo object)) -> m (Map ticketNo object))
-> STM m (m (Map ticketNo object)) -> m (Map ticketNo object)
forall a b. (a -> b) -> a -> b
$ do
maybeNewObjectsAction <-
ticketNo -> Word64 -> STM m (Maybe (m (Map ticketNo object)))
oprObjectsAfter
ticketNo
lastTicketNo
(NumObjectIdsReq -> Word64
forall a b. (Integral a, Num b) => a -> b
fromIntegral NumObjectIdsReq
numIdsToReq)
case maybeNewObjectsAction of
Maybe (m (Map ticketNo object))
Nothing -> STM m (m (Map ticketNo object))
forall a. STM m a
forall (m :: * -> *) a. MonadSTM m => STM m a
retry
Just m (Map ticketNo object)
newObjectsAction -> m (Map ticketNo object) -> STM m (m (Map ticketNo object))
forall a. a -> STM m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure m (Map ticketNo object)
newObjectsAction
if null content then getNewContent else pure content
sortedNewContent <- Map ticketNo object -> [(ticketNo, object)]
forall k a. Map k a -> [(k, a)]
Map.toAscList (Map ticketNo object -> [(ticketNo, object)])
-> m (Map ticketNo object) -> m [(ticketNo, object)]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> m (Map ticketNo object)
getNewContent
let !newIds = object -> objectId
oprObjectId (object -> objectId)
-> ((ticketNo, object) -> object) -> (ticketNo, object) -> objectId
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ticketNo, object) -> object
forall a b. (a, b) -> b
snd ((ticketNo, object) -> objectId)
-> [(ticketNo, object)] -> [objectId]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [(ticketNo, object)]
sortedNewContent
st'' = OutboundSt objectId object ticketNo
-> [(ticketNo, object)] -> OutboundSt objectId object ticketNo
updateStNewObjects OutboundSt objectId object ticketNo
st' [(ticketNo, object)]
sortedNewContent
traceWith tracer (TraceObjectDiffusionOutboundSendMsgReplyObjectIds newIds)
pure $
SendMsgReplyObjectIds
(BlockingReply (NonEmpty.fromList $ newIds))
(makeBundle st'')
SingBlockingStyle blocking
SingNonBlocking -> do
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (NumObjectIdsReq
numIdsToReq NumObjectIdsReq -> NumObjectIdsReq -> Bool
forall a. Eq a => a -> a -> Bool
== NumObjectIdsReq
0 Bool -> Bool -> Bool
&& NumObjectIdsAck
numIdsToAck NumObjectIdsAck -> NumObjectIdsAck -> Bool
forall a. Eq a => a -> a -> Bool
== NumObjectIdsAck
0) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
ObjectDiffusionOutboundError -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO ObjectDiffusionOutboundError
ProtocolErrorRequestedNothing
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (StrictSeq object -> Bool
forall a. StrictSeq a -> Bool
Seq.null StrictSeq object
outstandingFifo') (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
ObjectDiffusionOutboundError -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO ObjectDiffusionOutboundError
ProtocolErrorRequestNonBlocking
let getNewContent :: m (Map ticketNo object)
getNewContent = m (m (Map ticketNo object)) -> m (Map ticketNo object)
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (m (m (Map ticketNo object)) -> m (Map ticketNo object))
-> (STM m (m (Map ticketNo object)) -> m (m (Map ticketNo object)))
-> STM m (m (Map ticketNo object))
-> m (Map ticketNo object)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM m (m (Map ticketNo object)) -> m (m (Map ticketNo object))
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (m (Map ticketNo object)) -> m (Map ticketNo object))
-> STM m (m (Map ticketNo object)) -> m (Map ticketNo object)
forall a b. (a -> b) -> a -> b
$ do
maybeNewObjectsAction <-
ticketNo -> Word64 -> STM m (Maybe (m (Map ticketNo object)))
oprObjectsAfter
ticketNo
lastTicketNo
(NumObjectIdsReq -> Word64
forall a b. (Integral a, Num b) => a -> b
fromIntegral NumObjectIdsReq
numIdsToReq)
pure $ fromMaybe (pure Map.empty) maybeNewObjectsAction
sortedNewContent <- Map ticketNo object -> [(ticketNo, object)]
forall k a. Map k a -> [(k, a)]
Map.toAscList (Map ticketNo object -> [(ticketNo, object)])
-> m (Map ticketNo object) -> m [(ticketNo, object)]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> m (Map ticketNo object)
getNewContent
let !newIds = object -> objectId
oprObjectId (object -> objectId)
-> ((ticketNo, object) -> object) -> (ticketNo, object) -> objectId
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ticketNo, object) -> object
forall a b. (a, b) -> b
snd ((ticketNo, object) -> objectId)
-> [(ticketNo, object)] -> [objectId]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [(ticketNo, object)]
sortedNewContent
st'' = OutboundSt objectId object ticketNo
-> [(ticketNo, object)] -> OutboundSt objectId object ticketNo
updateStNewObjects OutboundSt objectId object ticketNo
st' [(ticketNo, object)]
sortedNewContent
traceWith tracer (TraceObjectDiffusionOutboundSendMsgReplyObjectIds newIds)
pure (SendMsgReplyObjectIds (NonBlockingReply newIds) (makeBundle st''))
recvMsgRequestObjects ::
OutboundSt objectId object ticketNo ->
[objectId] ->
m (OutboundStObjects objectId object m ())
recvMsgRequestObjects :: OutboundSt objectId object ticketNo
-> [objectId] -> m (OutboundStObjects objectId object m ())
recvMsgRequestObjects !st :: OutboundSt objectId object ticketNo
st@OutboundSt{ticketNo
StrictSeq object
outstandingFifo :: forall objectId object ticketNo.
OutboundSt objectId object ticketNo -> StrictSeq object
lastTicketNo :: forall objectId object ticketNo.
OutboundSt objectId object ticketNo -> ticketNo
outstandingFifo :: StrictSeq object
lastTicketNo :: ticketNo
..} [objectId]
requestedIds = do
Tracer m (TraceObjectDiffusionOutbound objectId object)
-> TraceObjectDiffusionOutbound objectId object -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceObjectDiffusionOutbound objectId object)
tracer ([objectId] -> TraceObjectDiffusionOutbound objectId object
forall objectId object.
[objectId] -> TraceObjectDiffusionOutbound objectId object
TraceObjectDiffusionOutboundRecvMsgRequestObjects [objectId]
requestedIds)
let requestedIdsSet :: Set objectId
requestedIdsSet = [objectId] -> Set objectId
forall a. Ord a => [a] -> Set a
Set.fromList [objectId]
requestedIds
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Set objectId -> Int
forall a. Set a -> Int
Set.size Set objectId
requestedIdsSet Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
/= [objectId] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [objectId]
requestedIds) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
ObjectDiffusionOutboundError -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO ObjectDiffusionOutboundError
ProtocolErrorRequestedDuplicateObject
let requestedObjects :: [object]
requestedObjects =
(object -> [object] -> [object])
-> [object] -> StrictSeq object -> [object]
forall a b. (a -> b -> b) -> b -> StrictSeq a -> b
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr
( \object
obj [object]
acc ->
if objectId -> Set objectId -> Bool
forall a. Ord a => a -> Set a -> Bool
Set.member (object -> objectId
oprObjectId object
obj) Set objectId
requestedIdsSet
then object
obj object -> [object] -> [object]
forall a. a -> [a] -> [a]
: [object]
acc
else [object]
acc
)
[]
StrictSeq object
outstandingFifo
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Set objectId -> Int
forall a. Set a -> Int
Set.size Set objectId
requestedIdsSet Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
/= [object] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [object]
requestedObjects) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
ObjectDiffusionOutboundError -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO ObjectDiffusionOutboundError
ProtocolErrorRequestedUnavailableObject
Tracer m (TraceObjectDiffusionOutbound objectId object)
-> TraceObjectDiffusionOutbound objectId object -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceObjectDiffusionOutbound objectId object)
tracer ([object] -> TraceObjectDiffusionOutbound objectId object
forall objectId object.
[object] -> TraceObjectDiffusionOutbound objectId object
TraceObjectDiffusionOutboundSendMsgReplyObjects [object]
requestedObjects)
OutboundStObjects objectId object m ()
-> m (OutboundStObjects objectId object m ())
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ([object]
-> OutboundStIdle objectId object m ()
-> OutboundStObjects objectId object m ()
forall object objectId (m :: * -> *) a.
[object]
-> OutboundStIdle objectId object m a
-> OutboundStObjects objectId object m a
SendMsgReplyObjects [object]
requestedObjects (OutboundSt objectId object ticketNo
-> OutboundStIdle objectId object m ()
makeBundle OutboundSt objectId object ticketNo
st))