{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DisambiguateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
module Ouroboros.Consensus.Storage.ImmutableDB.Impl.Parser (
BlockSummary (..)
, ChunkFileError (..)
, parseChunkFile
) where
import Codec.CBOR.Decoding (Decoder)
import Data.Bifunctor (first)
import qualified Data.ByteString.Lazy as Lazy
import Data.Functor ((<&>))
import Data.Word (Word64)
import Ouroboros.Consensus.Block hiding (headerHash)
import Ouroboros.Consensus.Storage.Common
import qualified Ouroboros.Consensus.Storage.ImmutableDB.Impl.Index.Secondary as Secondary
import Ouroboros.Consensus.Storage.ImmutableDB.Impl.Types
import Ouroboros.Consensus.Storage.Serialisation (DecodeDisk (..),
HasBinaryBlockInfo (..))
import Ouroboros.Consensus.Util.CBOR (withStreamIncrementalOffsets)
import Ouroboros.Consensus.Util.IOLike
import qualified Streaming as S
import Streaming (Of, Stream)
import qualified Streaming.Prelude as S
import System.FS.API (HasFS)
import System.FS.API.Types (FsPath)
import System.FS.CRC
-- | Per-block information produced by 'parseChunkFile'.
--
-- All fields are strict, so evaluating a 'BlockSummary' to WHNF also
-- evaluates its fields.
--
-- NOTE(review): presumably the strictness is there so that a summary does not
-- keep the block it was derived from alive via unevaluated thunks -- confirm.
data BlockSummary blk = BlockSummary {
      -- | The secondary index entry constructed for the block.
      summaryEntry   :: !(Secondary.Entry blk)
      -- | The block number of the block.
    , summaryBlockNo :: !BlockNo
      -- | The slot number of the block.
    , summarySlotNo  :: !SlotNo
    }
-- | Parse the contents of a chunk file and pass the resulting stream of
-- block summaries to the continuation.
--
-- The blocks are read with 'withStreamIncrementalOffsets' and then run through
-- the following stages:
--
-- * A read\/decode error reported by the underlying stream is wrapped in
--   'ChunkErrRead'.
--
-- * Each block's computed checksum is compared against the next expected
--   checksum. When there is no expected checksum left, or when it does not
--   match, the given integrity check is used instead. A block failing that
--   check ends the stream with 'ChunkErrCorrupt'.
--
-- * Starting from the second block, the previous hash stored in each block
--   must equal the hash of the block before it; otherwise the stream ends
--   with 'ChunkErrHashMismatch'.
--
-- The stream's final result is 'Nothing' or @'Just' (err, offset)@. Entries
-- yielded before an error remain available to the continuation, as the error
-- is the return value of the stream rather than a replacement for it. For
-- 'ChunkErrCorrupt' and 'ChunkErrHashMismatch', @offset@ is the offset
-- associated with the offending block.
--
-- NOTE(review): for 'ChunkErrRead' the offset is whatever
-- 'withStreamIncrementalOffsets' reports -- confirm its exact meaning there.
parseChunkFile ::
     forall m blk h r.
     ( IOLike m
     , GetPrevHash blk
     , HasBinaryBlockInfo blk
     , DecodeDisk blk (Lazy.ByteString -> blk)
     )
  => CodecConfig blk
  -> HasFS m h
  -> (blk -> Bool)
     -- ^ Integrity check of a block; 'False' means the block is corrupt.
  -> FsPath
     -- ^ Path of the chunk file to parse.
  -> [CRC]
     -- ^ Expected checksums, matched against the blocks in order.
  -> (   Stream (Of (BlockSummary blk, ChainHash blk))
                m
                (Maybe (ChunkFileError blk, Word64))
      -> m r
     )
     -- ^ Continuation consuming the stream of summaries, each paired with
     -- the previous hash of its block.
  -> m r
parseChunkFile ccfg hasFS isNotCorrupt fsPath expectedChecksums k =
    withStreamIncrementalOffsets hasFS decoder fsPath
      ( k
      . checkIfHashesLineUp
      . checkEntries expectedChecksums
      . fmap (fmap (first ChunkErrRead))
      )
  where
    -- Decode a block and compute the CRC of the bytes it was built from.
    -- Both results are forced before the pair is returned.
    decoder :: forall s. Decoder s (Lazy.ByteString -> (blk, CRC))
    decoder = decodeDisk ccfg <&> \mkBlk bs ->
      let !blk      = mkBlk bs
          !checksum = computeCRC bs
      in (blk, checksum)

    -- Walk over the expected checksums and the blocks in lockstep, yielding a
    -- summary for every accepted block and stopping at the first corrupt one.
    checkEntries
      :: [CRC]
         -- ^ Expected checksums
      -> Stream (Of (Word64, (Word64, (blk, CRC))))
                m
                (Maybe (ChunkFileError blk, Word64))
         -- ^ Input stream: @(offset, (size, (block, computed checksum)))@
      -> Stream (Of (BlockSummary blk, ChainHash blk))
                m
                (Maybe (ChunkFileError blk, Word64))
    checkEntries = \expected -> mapAccumS expected updateAcc
      where
        updateAcc
          :: [CRC]
          -> (Word64, (Word64, (blk, CRC)))
          -> Either (Maybe (ChunkFileError blk, Word64))
                    ( (BlockSummary blk, ChainHash blk)
                    , [CRC]
                    )
        updateAcc expected blkAndInfo@(offset, (_, (blk, checksum))) =
            case expected of
              -- The expected checksum matches: accept the block.
              expectedChecksum:expected'
                | expectedChecksum == checksum
                -> Right (entryAndPrevHash, expected')
              -- No expected checksum left, or a mismatch: fall back to the
              -- integrity check.
              _ | isNotCorrupt blk
                  -- The check passed: accept the block and discard the
                  -- (possibly mismatching) expected checksum, if any.
                -> Right (entryAndPrevHash, drop 1 expected)
                | otherwise
                  -- The block is corrupt: stop.
                -> Left $ Just (ChunkErrCorrupt (blockPoint blk), offset)
          where
            entryAndPrevHash = entryForBlockAndInfo blkAndInfo

    -- Build the summary of a block together with the block's previous hash.
    entryForBlockAndInfo
      :: (Word64, (Word64, (blk, CRC)))
      -> (BlockSummary blk, ChainHash blk)
    entryForBlockAndInfo (offset, (_size, (blk, checksum))) =
        (blockSummary, prevHash)
      where
        -- The bang patterns force the previous hash and the summary before
        -- the pair is returned.
        -- NOTE(review): presumably so that the result does not hold on to
        -- the block itself -- confirm.
        !prevHash = blockPrevHash blk
        entry = Secondary.Entry {
              blockOffset  = Secondary.BlockOffset  offset
            , headerOffset = Secondary.HeaderOffset headerOffset
            , headerSize   = Secondary.HeaderSize   headerSize
            , checksum     = checksum
            , headerHash   = blockHash blk
            , blockOrEBB   = case blockIsEBB blk of
                Just epoch -> EBB epoch
                Nothing    -> Block (blockSlot blk)
            }
        !blockSummary = BlockSummary {
              summaryEntry   = entry
            , summaryBlockNo = blockNo blk
            , summarySlotNo  = blockSlot blk
            }
        BinaryBlockInfo { headerOffset, headerSize } = getBinaryBlockInfo blk

    -- Check that every block after the first one refers to its predecessor
    -- in the stream.
    checkIfHashesLineUp
      :: Stream (Of (BlockSummary blk, ChainHash blk))
                m
                (Maybe (ChunkFileError blk, Word64))
      -> Stream (Of (BlockSummary blk, ChainHash blk))
                m
                (Maybe (ChunkFileError blk, Word64))
    checkIfHashesLineUp = mapAccumS0 checkFirst checkNext
      where
        -- The state threaded through the stream is the hash of the most
        -- recently accepted block. The first block is accepted
        -- unconditionally and only seeds that state.
        checkFirst x@(BlockSummary { summaryEntry }, _) =
            Right (x, Secondary.headerHash summaryEntry)

        checkNext hashOfPrevBlock x@(BlockSummary { summaryEntry }, prevHash)
          | prevHash == BlockHash hashOfPrevBlock
          = Right (x, Secondary.headerHash summaryEntry)
          | otherwise
          = Left (Just (err, offset))
          where
            err    = ChunkErrHashMismatch hashOfPrevBlock prevHash
            -- Report the offset of the block that does not fit.
            offset = Secondary.unBlockOffset $ Secondary.blockOffset summaryEntry
-- | Thread some state through a 'Stream', mapping each element along the way.
--
-- For every element, the update function receives the current state and the
-- element. Returning @'Right' (b, s')@ yields @b@ and continues with state
-- @s'@; returning @'Left' r@ stops immediately and makes @r@ the result of
-- the output stream, leaving the rest of the input unconsumed. When the input
-- ends, its own result becomes the result of the output stream.
mapAccumS ::
     Monad m
  => s  -- ^ Initial state
  -> (s -> a -> Either r (b, s))
  -> Stream (Of a) m r
  -> Stream (Of b) m r
mapAccumS st0 updateAcc = go st0
  where
    go st input = S.lift (S.next input) >>= \case
      -- The input is exhausted: propagate its result.
      Left  r           -> return r
      Right (a, input') -> case updateAcc st a of
        -- Early exit requested by the update function.
        Left  r        -> return r
        Right (b, st') -> S.yield b *> go st' input'
-- | Variant of 'mapAccumS' without an initial state.
--
-- The first function is applied to the first element of the stream and
-- produces the initial state; the second function is used for every element
-- after that. Internally the state is tracked as a 'Maybe', with 'Nothing'
-- meaning that no element has been processed yet.
mapAccumS0 ::
     forall m a b r s. Monad m
  => (a -> Either r (b, s))
     -- ^ Applied to the first element only
  -> (s -> a -> Either r (b, s))
     -- ^ Applied to all subsequent elements
  -> Stream (Of a) m r
  -> Stream (Of b) m r
mapAccumS0 initAcc updateAcc = mapAccumS Nothing updateAcc'
  where
    -- Select 'initAcc' or 'updateAcc' depending on whether a state exists
    -- yet, and wrap the resulting state in 'Just'.
    updateAcc' :: Maybe s -> a -> Either r (b, Maybe s)
    updateAcc' mbSt = fmap (fmap Just) . maybe initAcc updateAcc mbSt