{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Ouroboros.Consensus.Storage.ImmutableDB.Stream (
NextItem (..)
, StreamAPI (..)
, streamAPI
, streamAPI'
, streamAll
) where
import Control.Monad.Except
import GHC.Stack
import Ouroboros.Consensus.Block
import Ouroboros.Consensus.Storage.Common
import Ouroboros.Consensus.Storage.ImmutableDB hiding (streamAll)
import qualified Ouroboros.Consensus.Storage.ImmutableDB.API as ImmutableDB
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Consensus.Util.ResourceRegistry
-- | Outcome of asking a stream for its next element.
data NextItem blk
  = NoMoreItems
    -- ^ The stream has nothing further to yield.
  | NextItem blk
    -- ^ The stream yielded one more element.
-- | Abstract streaming interface, in continuation-passing style.
--
-- The single field starts a stream after a given point and hands the outcome
-- to a continuation, so implementations can wrap the whole traversal in a
-- bracket-style (@withXYZ@) scope.
newtype StreamAPI m blk a = StreamAPI {
      -- | Start streaming after the given point.
      --
      -- The continuation receives either
      --
      -- * @Left pt@: streaming could not be started; @pt@ is the 'RealPoint'
      --   reported by the implementation, or
      --
      -- * @Right next@: an action that yields the next item each time it is
      --   run, returning 'NoMoreItems' once the stream is finished.
      streamAfter :: forall b. HasCallStack
        => Point blk
        -> (Either (RealPoint blk) (m (NextItem a)) -> m b)
        -> m b
    }
-- | Fold over every item produced by a 'StreamAPI'.
--
-- Streaming is started after @tip@. If the underlying 'streamAfter' reports
-- @Left pt@, the result is the error @notFound pt@. Otherwise items are pulled
-- one at a time and combined with the accumulator using the step function
-- until 'NoMoreItems' is returned, at which point the final accumulator is the
-- result.
streamAll ::
     forall m blk e b a. (Monad m, HasCallStack)
  => StreamAPI m blk b
  -> Point blk             -- ^ Point after which streaming starts
  -> (RealPoint blk -> e)  -- ^ Builds the error when streaming cannot start
  -> a                     -- ^ Initial accumulator
  -> (b -> a -> m a)       -- ^ Step function applied to each streamed item
  -> ExceptT e m a
streamAll StreamAPI{..} tip notFound initial f = ExceptT $
    streamAfter tip $ \case
      Left tip' -> return $ Left (notFound tip')

      Right getNext -> do
        -- The signature relies on ScopedTypeVariables: @a@ and @m@ are the
        -- type variables bound by the top-level @forall@.
        let go :: a -> m a
            go acc = do
              mNext <- getNext
              case mNext of
                NoMoreItems -> return acc
                NextItem b  -> go =<< f b acc
        Right <$> go initial
-- | 'StreamAPI' over an 'ImmutableDB' that streams whole blocks.
--
-- Specialises 'streamAPI'' to the 'GetBlock' component and wraps every value
-- the iterator produces in 'NextItem', so the stream only ends when the
-- iterator itself is exhausted.
streamAPI ::
     (IOLike m, HasHeader blk)
  => ImmutableDB m blk -> StreamAPI m blk blk
streamAPI = streamAPI' (return . NextItem) GetBlock
-- | Build a 'StreamAPI' on top of an 'ImmutableDB' iterator.
--
-- Each value produced by the iterator is passed to the first argument, which
-- decides what the stream reports for it ('NextItem' to continue with a
-- value, or 'NoMoreItems'). When the iterator is exhausted the stream reports
-- 'NoMoreItems' without consulting that function.
streamAPI' ::
     forall m blk a.
     (IOLike m, HasHeader blk)
  => (a -> m (NextItem a)) -- ^ Applied to every iterator result
  -> BlockComponent   blk a -- ^ Component requested from the iterator
  -> ImmutableDB    m blk
  -> StreamAPI      m blk a
streamAPI' shouldStop blockComponent immutableDB = StreamAPI streamAfter
  where
    -- Open an iterator after @tip@ and run the continuation @k@ inside the
    -- 'withRegistry' scope, so the iterator is only used within that scope.
    -- NOTE(review): presumably the registry releases the iterator when the
    -- scope exits; confirm against 'withRegistry'.
    streamAfter :: forall b.
                   Point blk
                -> (Either (RealPoint blk) (m (NextItem a)) -> m b)
                -> m b
    streamAfter tip k = withRegistry $ \registry -> do
        eItr <-
          ImmutableDB.streamAfterPoint
            immutableDB
            registry
            blockComponent
            tip
        case eItr of
          -- The requested point is missing: report it to the continuation.
          Left  err -> k $ Left  $ ImmutableDB.missingBlockPoint err
          Right itr -> k $ Right $ streamUsing itr

    -- Pull one result from the iterator and translate it to a 'NextItem'.
    streamUsing :: ImmutableDB.Iterator m blk a
                -> m (NextItem a)
    streamUsing itr = do
        itrResult <- ImmutableDB.iteratorNext itr
        case itrResult of
          ImmutableDB.IteratorExhausted -> return NoMoreItems
          ImmutableDB.IteratorResult b  -> shouldStop b