{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Ouroboros.Consensus.Storage.VolatileDB.Impl.Parser (
ParseError (..)
, ParsedBlockInfo (..)
, parseBlockFile
, extractBlockInfo
) where
import Data.Bifunctor (bimap)
import qualified Data.ByteString.Lazy as Lazy
import Data.Word (Word64)
import Ouroboros.Consensus.Block
import Ouroboros.Consensus.Storage.Serialisation
import Ouroboros.Consensus.Storage.VolatileDB.API (BlockInfo (..))
import Ouroboros.Consensus.Storage.VolatileDB.Impl.Types
import Ouroboros.Consensus.Util.CBOR (ReadIncrementalErr (..),
withStreamIncrementalOffsets)
import Ouroboros.Consensus.Util.IOLike
import qualified Streaming.Prelude as S
import Streaming.Prelude (Of (..), Stream)
import System.FS.API (HasFS)
import System.FS.API.Types (FsPath)
-- | Information the parser records about a single block found in a file.
--
-- All fields are strict, so evaluating a 'ParsedBlockInfo' to WHNF also
-- evaluates each of its fields to WHNF.
data ParsedBlockInfo blk = ParsedBlockInfo {
      -- | Offset of the block, as reported by the incremental decoder.
      pbiBlockOffset :: !BlockOffset
      -- | Size of the block, as reported by the incremental decoder.
    , pbiBlockSize   :: !BlockSize
      -- | The 'BlockInfo' extracted from the block.
    , pbiBlockInfo   :: !(BlockInfo blk)
      -- | The nested context obtained by 'unnest'ing the block's header.
    , pbiNestedCtxt  :: !(SomeSecond (NestedCtxt Header) blk)
    }
-- | Parse the file at the given path as a sequence of serialised blocks.
--
-- Blocks are decoded one at a time with 'decodeDisk', streamed through
-- 'withStreamIncrementalOffsets'. For every accepted block a
-- 'ParsedBlockInfo' is collected. Parsing stops at the end of the stream, at
-- the first decoding error, or at the first block rejected by the corruption
-- check.
--
-- The result is the list of 'ParsedBlockInfo's for the accepted blocks, in
-- the order in which the stream produced them, paired with:
--
-- * 'Nothing' when the stream ended without reporting an error;
-- * @'Just' ('BlockReadErr' e, off)@ when the decoder reported error @e@
--   together with offset @off@;
-- * @'Just' ('BlockCorruptedErr' h, off)@ when the block with hash @h@,
--   located at offset @off@, was rejected by the corruption check.
parseBlockFile ::
     forall m blk h.
     ( IOLike m
     , GetPrevHash blk
     , HasBinaryBlockInfo blk
     , HasNestedContent Header blk
     , DecodeDisk blk (Lazy.ByteString -> blk)
     )
  => CodecConfig blk
     -- ^ Codec configuration handed to 'decodeDisk'.
  -> HasFS m h
     -- ^ File system used to read the file.
  -> (blk -> Bool)
     -- ^ Predicate that returns 'True' for blocks that are /not/ corrupt.
  -> BlockValidationPolicy
     -- ^ When equal to 'NoValidation', the predicate is never consulted.
  -> FsPath
     -- ^ Path of the file to parse.
  -> m ( [ParsedBlockInfo blk]
       , Maybe (ParseError blk, BlockOffset)
       )
parseBlockFile ccfg hasFS isNotCorrupt validationPolicy fsPath =
    withStreamIncrementalOffsets hasFS (decodeDisk ccfg) fsPath $
      checkEntries []
  where
    -- Whether the corruption check is skipped altogether.
    noValidation :: Bool
    noValidation = validationPolicy == NoValidation

    -- Consume the stream element by element. The first argument accumulates
    -- the accepted blocks in reverse order; it is reversed once on exit.
    checkEntries ::
         [ParsedBlockInfo blk]
      -> Stream (Of (Word64, (Word64, blk)))
                m
                (Maybe (ReadIncrementalErr, Word64))
      -> m ( [ParsedBlockInfo blk]
           , Maybe (ParseError blk, BlockOffset)
           )
    checkEntries parsed stream = S.next stream >>= \case
      -- End of the stream: translate the decoder's optional error, if any.
      Left mbErr
        -> return (reverse parsed, bimap BlockReadErr BlockOffset <$> mbErr)
      Right ((offset, (size, blk)), stream')
        -- '||' short-circuits, so the predicate is not evaluated when
        -- validation is disabled.
        | noValidation || isNotCorrupt blk
        -> -- The bang patterns force both values before recursing
           -- (presumably so that the decoded block itself is not retained
           -- through unevaluated thunks -- TODO confirm).
           let !blockInfo = extractBlockInfo blk
               !newParsed = ParsedBlockInfo {
                   pbiBlockOffset = BlockOffset offset
                   -- NOTE(review): 'fromIntegral' narrows the 'Word64' size
                   -- to the 'Word32' expected by 'BlockSize'; a size above
                   -- @maxBound :: Word32@ would wrap silently.
                 , pbiBlockSize   = BlockSize $ fromIntegral size
                 , pbiBlockInfo   = blockInfo
                 , pbiNestedCtxt  = case unnest (getHeader blk) of
                                      DepPair nestedCtxt _ -> SomeSecond nestedCtxt
                 }
           in checkEntries (newParsed : parsed) stream'
        -- The block was rejected: stop here and report its hash and offset.
        | otherwise
        -> let !hash = blockHash blk
           in return ( reverse parsed
                     , Just (BlockCorruptedErr hash, BlockOffset offset)
                     )
-- | Build the 'BlockInfo' of a block.
--
-- The hash, slot number, block number, EBB flag and previous hash are read
-- from the block itself; the header offset and header size are taken from
-- the block's 'BinaryBlockInfo'.
extractBlockInfo ::
     (GetPrevHash blk, HasBinaryBlockInfo blk)
  => blk
  -> BlockInfo blk
extractBlockInfo blk = BlockInfo {
      biHash         = blockHash     blk
    , biSlotNo       = blockSlot     blk
    , biBlockNo      = blockNo       blk
    , biIsEBB        = blockToIsEBB  blk
    , biPrevHash     = blockPrevHash blk
    , biHeaderOffset = headerOffset
    , biHeaderSize   = headerSize
    }
  where
    -- Field puns: bind 'headerOffset' and 'headerSize' from the record.
    BinaryBlockInfo { headerOffset, headerSize } = getBinaryBlockInfo blk