123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405 |
- {-# LANGUAGE OverloadedStrings, TypeFamilies, MultiParamTypeClasses, FlexibleInstances, UndecidableInstances #-}
- {- |
- Streamline exports a monad that, given an uniform IO target, emulates
- character stream IO using high performance block IO.
- -}
- module System.IO.Uniform.Streamline (
- -- * Basic Type
- Streamline,
- -- * Running streamline targets
- -- ** Single pass runners
- withClient,
- withServer,
- withTarget,
- -- ** Interruptible support
- inStreamlineCtx,
- peelStreamlineCtx,
- closeTarget,
- -- * Sending and recieving data
- send,
- send',
- recieveLine,
- recieveLine',
- recieveN,
- recieveN',
- -- ** Running a parser
- runAttoparsec,
- runAttoparsecAndReturn,
- -- ** Scanning the input
- runScanner,
- runScanner',
- scan,
- scan',
- recieveTill,
- recieveTill',
- -- * Behavior settings
- startTls,
- isSecure,
- transformTarget,
- limitInput,
- echoTo,
- setEcho
- ) where
- import System.IO (stdout, Handle)
- import qualified System.IO.Uniform as S
- import qualified System.IO.Uniform.Network as N
- import qualified System.IO.Uniform.Std as Std
- import System.IO.Uniform (UniformIO, SomeIO(..), TlsSettings)
- import System.IO.Uniform.Streamline.Scanner
- import Data.Default.Class
- import Control.Monad.Trans.Class
- import Control.Monad.Trans.Interruptible
- import Control.Monad.Trans.Control
- import Control.Monad
- import Control.Monad.Base
- import Control.Monad.IO.Class
- import System.IO.Error
- import Data.ByteString (ByteString)
- import qualified Data.ByteString as BS
- import qualified Data.ByteString.Lazy as LBS
- import Data.Word8 (Word8)
- import Data.IP (IP)
- import qualified Data.Attoparsec.ByteString as A
- -- | Internal state for a Streamline monad
- data StreamlineState = StreamlineState {str :: SomeIO, buff :: ByteString, isEOF :: Bool, echo :: Maybe Handle, inLimit :: Int, sentEmpty :: Bool}
- instance Default StreamlineState where
- -- | Will open StdIO
- def = StreamlineState (SomeIO Std.StdIO) BS.empty False Nothing (-1) False
- -- | Monad that emulates character stream IO over block IO.
- newtype Streamline m a = Streamline {withTarget' :: StreamlineState -> m (a, StreamlineState)}
- blockSize :: Int
- blockSize = 4096
- readF :: MonadIO m => Streamline m ()
- readF = -- Must try just not to read more than the limit, actual limiting is done by takeBuff
- Streamline $ \cl -> if not . BS.null . buff $ cl then return ((), cl)
- else do
- let lim = inLimit cl
- sz = if lim < 0 then blockSize
- else if lim <= blockSize then lim
- else blockSize
- l <- liftIO $ S.uRead (str cl) sz
- let cl' = cl{buff= l}
- case echo cl of
- Just h -> do
- liftIO $ BS.hPutStr h "< "
- liftIO $ BS.hPutStr h l
- Nothing -> return ()
- return ((), cl')
- -- | Takes the buffer for processing
- takeBuff :: MonadIO m => Streamline m ByteString
- takeBuff = do
- readF
- Streamline $ \cl ->
- let lim = inLimit cl
- eof = isEOF cl
- b = buff cl
- in if eof then eofError "System.IO.Uniform.Streamline"
- else if lim < 0 then return (b, cl{buff="", isEOF=BS.null b})
- else let (r, b') = BS.splitAt lim b
- in return (r, cl{
- -- EOF is at the real end of file, not on limited input
- isEOF = lim /= 0 && (BS.null b || sentEmpty cl),
- sentEmpty = BS.null r,
- buff = b',
- inLimit = lim - BS.length r
- })
- -- | Pushes remaining data back into the buffer
- pushBuff :: Monad m => ByteString -> Streamline m ()
- pushBuff dt = Streamline $ \cl -> let
- lim = inLimit cl
- b = buff cl
- newb = BS.append dt b
- newl = if lim < 0 then lim else lim + BS.length dt
- in return ((), cl{buff=newb, inLimit=newl})
- writeF :: MonadIO m => ByteString -> Streamline m ()
- writeF l = Streamline $ \cl -> case echo cl of
- Just h -> do
- liftIO $ BS.hPutStr h "> "
- liftIO $ BS.hPutStr h l
- liftIO $ S.uPut (str cl) l
- return ((), cl)
- Nothing -> liftIO $ S.uPut (str cl) l >> return ((), cl)
- -- | > withServer f serverIP port
- --
- -- Connects to the given server port, runs f, and closes the connection.
- withServer :: MonadIO m => IP -> Int -> Streamline m a -> m a
- withServer host port f = do
- ds <- liftIO $ N.connectTo host port
- (ret, _) <- withTarget' f def{str=SomeIO ds}
- liftIO $ S.uClose ds
- return ret
- -- | > withClient f boundPort
- --
- -- Accepts a connection at the bound port, runs f and closes the connection.
- withClient :: MonadIO m => N.BoundedPort -> (IP -> Int -> Streamline m a) -> m a
- withClient port f = do
- ds <- liftIO $ N.accept port
- (peerIp, peerPort) <- liftIO $ N.getPeer ds
- (ret, _) <- withTarget' (f peerIp peerPort) def{str=SomeIO ds}
- liftIO $ S.uClose ds
- return ret
- {- |
- > withTarget f someIO
- Runs f wrapped on a Streamline monad that does IO on someIO.
- -}
- withTarget :: (Monad m, UniformIO a) => a -> Streamline m b -> m b
- withTarget s f = do
- (r, _) <- withTarget' f def{str=SomeIO s}
- return r
- instance Monad m => Monad (Streamline m) where
- --return :: (Monad m) => a -> Streamline m a
- return x = Streamline $ \cl -> return (x, cl)
- --(>>=) :: Monad m => Streamline m a -> (a -> Streamline m b) -> Streamline m b
- a >>= b = Streamline $ \cl -> do
- (x, cl') <- withTarget' a cl
- withTarget' (b x) cl'
- instance Monad m => Functor (Streamline m) where
- --fmap :: (a -> b) -> Streamline m a -> Streamline m b
- fmap f m = Streamline $ \cl -> do
- (x, cl') <- withTarget' m cl
- return (f x, cl')
- instance Monad m => Applicative (Streamline m) where
- pure = return
- (<*>) = ap
- instance MonadTrans Streamline where
- --lift :: Monad m => m a -> Streamline m a
- lift x = Streamline $ \cl -> do
- a <- x
- return (a, cl)
- instance MonadIO m => MonadIO (Streamline m) where
- liftIO = lift . liftIO
- -- | Sends data over the IO target.
- send :: MonadIO m => ByteString -> Streamline m ()
- send r = writeF r
- -- | Sends data from a lazy byte string
- send' :: MonadIO m => LBS.ByteString -> Streamline m ()
- send' r = do
- let dd = LBS.toChunks r
- mapM_ writeF dd
- {- |
- Very much like Attoparsec's runScanner:
- > runScanner scanner initial_state
- Recieves data, running the scanner on each byte,
- using the scanner result as initial state for the
- next byte, and stopping when the scanner returns
- Nothing.
- Returns the scanned ByteString.
- -}
- runScanner :: MonadIO m => s -> IOScanner s -> Streamline m (ByteString, s)
- runScanner state scanner = do
- (rt, st) <- runScanner' state scanner
- return (LBS.toStrict rt, st)
- -- | Equivalent to runScanner, but returns a lazy ByteString
- runScanner' :: MonadIO m => s -> IOScanner s -> Streamline m (LBS.ByteString, s)
- runScanner' state scanner = do
- (tx, st) <- in_scan state
- return (LBS.fromChunks tx, st)
- where
- --in_scan :: MonadIO m => s -> Streamline m ([ByteString], s)
- in_scan st = do
- d <- takeBuff
- if BS.null d then return ([], st)
- else case sscan scanner st 0 $ BS.unpack d of
- AllInput st' -> do
- (tx', st'') <- in_scan st'
- return (d:tx', st'')
- SplitAt n st' -> do
- let (r, i) = BS.splitAt n d
- pushBuff i
- return ([r], st')
- -- I'll avoid rebuilding a list on high level code. The ByteString functions are way better.
- sscan :: (s -> Word8 -> IOScannerState s) -> s -> Int -> [Word8] -> ScanResult s
- sscan _ s0 _ [] = AllInput s0
- sscan s s0 i (w:ww) = case s s0 w of
- Finished -> SplitAt i s0
- LastPass s1 -> SplitAt (i+1) s1
- Running s1 -> sscan s s1 (i+1) ww
- data ScanResult s = SplitAt Int s | AllInput s
- -- | Equivalent to runScanner, but discards the final state
- scan :: MonadIO m => s -> IOScanner s -> Streamline m ByteString
- scan state scanner = fst <$> runScanner state scanner
- -- | Equivalent to runScanner', but discards the final state
- scan' :: MonadIO m => s -> IOScanner s -> Streamline m LBS.ByteString
- scan' state scanner = fst <$> runScanner' state scanner
- -- | Recieves data untill the next end of line (\n or \r\n)
- recieveLine :: MonadIO m => Streamline m ByteString
- recieveLine = recieveTill "\n"
-
- -- | Lazy version of recieveLine
- recieveLine' :: MonadIO m => Streamline m LBS.ByteString
- recieveLine' = recieveTill' "\n"
- {- |
- Recieves the given number of bytes, or less in case of end of file.
- -}
- recieveN :: MonadIO m => Int -> Streamline m ByteString
- recieveN n = LBS.toStrict <$> recieveN' n
- -- | Lazy version of recieveN
- recieveN' :: MonadIO m => Int -> Streamline m LBS.ByteString
- recieveN' n = LBS.fromChunks <$> recieve n
- where
- recieve sz
- | sz <= 0 = return []
- | otherwise = do
- d <- takeBuff
- if BS.null d then return []
- else do
- let (h, t) = BS.splitAt sz d
- sz' = sz - BS.length h
- unless (BS.null t) $ pushBuff t
- r <- recieve sz'
- return $ h : r
- -- | Recieves data until it matches the argument.
- -- Returns all of it, including the matching data.
- recieveTill :: MonadIO m => ByteString -> Streamline m ByteString
- recieveTill t = LBS.toStrict <$> recieveTill' t
- -- | Lazy version of recieveTill
- recieveTill' :: MonadIO m => ByteString -> Streamline m LBS.ByteString
- recieveTill' = recieve . BS.unpack
- where
- recieve t' = scan' [] (textScanner t')
- -- | Wraps the streamlined IO target on TLS, streamlining
- -- the new wrapper afterwads.
- startTls :: MonadIO m => TlsSettings -> Streamline m ()
- startTls st = Streamline $ \cl -> do
- ds' <- liftIO $ S.startTls st $ str cl
- return ((), cl{str=SomeIO ds'}{buff=""})
- -- | Runs an Attoparsec parser over the data read from the
- -- streamlined IO target. Returns both the parser
- -- result and the string consumed by it.
- runAttoparsecAndReturn :: MonadIO m => A.Parser a -> Streamline m (ByteString, Either String a)
- runAttoparsecAndReturn p = do
- d <- takeBuff
- let c = A.parse p d
- continueResult c d [d]
- where
- continueResult c d dd = case c of
- A.Fail i _ msg -> do
- pushBuff $ BS.concat (reverse dd) `BS.append` i
- return (BS.take (BS.length d - BS.length i) d, Left msg)
- A.Done i r -> do
- pushBuff i
- return (BS.concat (reverse dd) `BS.append`
- BS.take (BS.length d - BS.length i) d,
- Right r)
- A.Partial c' -> do
- dt <- takeBuff
- continueResult (c' dt) dt $ dt:dd
- -- | Runs an Attoparsec parser over the data read from the
- -- streamlined IO target. Returning the parser result.
- runAttoparsec :: MonadIO m => A.Parser a -> Streamline m (Either String a)
- runAttoparsec p = snd <$> runAttoparsecAndReturn p
- -- | Indicates whether transport layer security is being used.
- isSecure :: Monad m => Streamline m Bool
- isSecure = Streamline $ \cl -> return (S.isSecure $ str cl, cl)
- -- | Sets echo of the streamlines IO target.
- -- If echo is set, all the data read an written to the target
- -- will be echoed in stdout, with ">" and "<" markers indicating
- -- what is read and written.
- setEcho :: Monad m => Bool -> Streamline m ()
- setEcho e = Streamline $ \cl ->
- if e then return ((), cl{echo=Just stdout}) else return ((), cl{echo=Nothing})
- {- |
- Replaces the enclosed target with the result of the given transformation.
- Discards all buffered data in the process.
- -}
- transformTarget :: (UniformIO a, Monad m) => (SomeIO -> a) -> Streamline m ()
- transformTarget w = Streamline $ \cl -> return ((), cl{str = SomeIO . w . str $ cl})
- {- |
- Limits the input to the given number of bytes, emulating an end of file after them.
- If the limit is negative, the input will not be limited.
- -}
- limitInput :: Monad m => Int -> Streamline m ()
- limitInput n = Streamline $ \cl -> return ((), cl{inLimit = n})
- {- |
- Sets echo of the streamlined IO target.
- If echo is set, all the data read an written to the target
- will be echoed to the handle, with ">" and "<" markers indicating
- what is read and written.
- Setting to Nothing will disable echo.
- -}
- echoTo :: Monad m => Maybe Handle -> Streamline m ()
- echoTo h = Streamline $ \cl -> return ((), cl{echo=h})
- eofError :: MonadIO m => String -> m a
- eofError msg = liftIO . ioError $ mkIOError eofErrorType msg Nothing Nothing
- instance Interruptible Streamline where
- type RSt Streamline a = (a, StreamlineState)
- resume f (a, st) = withTarget' (f a) st
- -- | Creates a Streamline interrutible context
- inStreamlineCtx :: UniformIO io => io -> a -> RSt Streamline a
- inStreamlineCtx io a = (a, def{str = SomeIO io})
- -- | Closes the target of a streamline state, releasing any resource.
- closeTarget :: MonadIO m => Streamline m ()
- closeTarget = Streamline $ \st -> do
- liftIO . S.uClose . str $ st
- return ((), st)
- -- | Removes a Streamline interruptible context
- peelStreamlineCtx :: RSt Streamline a -> (a, SomeIO)
- peelStreamlineCtx (a, dt) = (a, str dt)
- instance MonadTransControl Streamline where
- type StT Streamline a = (a, StreamlineState)
- liftWith f = Streamline $ \s ->
- liftM (\x -> (x, s))
- (f $ \t -> withTarget' t s)
- restoreT = Streamline . const
- instance MonadBase b m => MonadBase b (Streamline m) where
- liftBase = liftBaseDefault
- instance MonadBaseControl b m => MonadBaseControl b (Streamline m) where
- type StM (Streamline m) a = ComposeSt Streamline m a
- liftBaseWith = defaultLiftBaseWith
- restoreM = defaultRestoreM
|