123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259 |
- {-# LANGUAGE OverloadedStrings #-}
- -- |
- -- Streamline exports a monad that, given a uniform IO target, emulates
- -- character stream IO using high-performance block IO.
- module System.IO.Uniform.Streamline (Streamline, withClient, withServer, withTarget, send, receiveLine, lazyRecieveLine, lazyReceiveN, startTls, runAttoparsec, runAttoparsecAndReturn, isSecure, setTimeout, setEcho) where
- import qualified System.IO.Uniform as S
- import System.IO.Uniform (UniformIO, SomeIO(..), TlsSettings)
- import Control.Monad.Trans.Class
- import Control.Applicative
- import Control.Monad (ap)
- import Control.Monad.IO.Class
- import System.IO.Error
- import Data.ByteString (ByteString)
- import qualified Data.ByteString as BS
- import Data.Word8 (Word8)
- import Data.IP (IP)
- import qualified Data.Attoparsec.ByteString as A
-- | Internal state threaded through every 'Streamline' action.
data Data = Data
  { str     :: SomeIO      -- ^ IO target currently being streamlined.
  , timeout :: Int         -- ^ Value stored by 'setTimeout'; not read in this section.
  , buff    :: ByteString  -- ^ Bytes already read from the target but not yet consumed.
  , isEOF   :: Bool        -- ^ Set once a block read returned no data.
  , echo    :: Bool        -- ^ When set, 'readF' and 'writeF' copy traffic to stdout.
  }
-- | Monad that emulates character stream IO over block IO.
--
-- An action is a function from the current stream state to a computation
-- in the base monad that yields a result together with the updated state.
newtype Streamline m a = Streamline
  { withTarget' :: Data -> m (a, Data)
  }
-- | Number of bytes requested from the target on every block read.
blockSize :: Int
blockSize = 4 * 1024
-- | Initial value of the 'timeout' field of a fresh stream state.
--
-- NOTE(review): presumably microseconds (ten minutes); confirm against the
-- code that consumes the field, which is not part of this section.
defaultTimeout :: Int
defaultTimeout = 600 * 1000000
-- | Reads one block from the target, requesting 'blockSize' bytes.
--
-- When echo is enabled the block is also written to stdout, preceded by
-- a "<" marker, after it has been read.
readF :: MonadIO m => Data -> m ByteString
readF cl = liftIO $ do
  block <- S.uRead (str cl) blockSize
  if echo cl
    then BS.putStr "<" >> BS.putStr block
    else return ()
  return block
-- | Writes a block to the target.
--
-- When echo is enabled the block is first written to stdout, preceded by
-- a ">" marker, and only then sent to the target.
writeF :: MonadIO m => Data -> ByteString -> m ()
writeF cl payload = liftIO $ do
  if echo cl
    then BS.putStr ">" >> BS.putStr payload
    else return ()
  S.uPut (str cl) payload
-- | withServer f serverIP port
--
-- Connects to the given server port, runs f over the new connection
-- (default timeout, empty buffer, echo off), closes the connection and
-- returns the result of f.
--
-- NOTE(review): the connection is closed only when f returns normally;
-- if f raises an exception the close is skipped.
withServer :: MonadIO m => Streamline m a -> IP -> Int -> m a
withServer f host port = do
  conn <- liftIO (S.connectTo host port)
  let initial =
        Data
          { str = SomeIO conn
          , timeout = defaultTimeout
          , buff = ""
          , isEOF = False
          , echo = False
          }
  (result, _) <- withTarget' f initial
  liftIO (S.uClose conn)
  return result
-- | withClient f boundPort
--
-- Accepts a connection at the bound port, runs f with the peer address
-- and port reported for that connection, closes the connection and
-- returns the result of f.
--
-- NOTE(review): the connection is closed only when f returns normally;
-- if f raises an exception the close is skipped.
withClient :: MonadIO m => (IP -> Int -> Streamline m a) -> S.BoundedPort -> m a
withClient f port = do
  conn <- liftIO (S.accept port)
  (peerIp, peerPort) <- liftIO (S.getPeer conn)
  let initial =
        Data
          { str = SomeIO conn
          , timeout = defaultTimeout
          , buff = ""
          , isEOF = False
          , echo = False
          }
  (result, _) <- withTarget' (f peerIp peerPort) initial
  liftIO (S.uClose conn)
  return result
-- | withTarget f someIO
--
-- Runs f wrapped in a Streamline monad that does IO on someIO.
-- The target is neither opened nor closed here; the final stream state
-- is discarded.
withTarget :: (MonadIO m, UniformIO a) => Streamline m b -> a -> m b
withTarget f s =
  withTarget' f initial >>= \(result, _) -> return result
  where
    initial =
      Data
        { str = SomeIO s
        , timeout = defaultTimeout
        , buff = ""
        , isEOF = False
        , echo = False
        }
-- | Sequencing threads the stream state from one action into the next.
instance Monad m => Monad (Streamline m) where
  -- return :: Monad m => a -> Streamline m a
  return x = Streamline (\st -> return (x, st))

  -- (>>=) :: Monad m => Streamline m a -> (a -> Streamline m b) -> Streamline m b
  action >>= next = Streamline $ \st ->
    withTarget' action st >>= \(x, st') -> withTarget' (next x) st'
-- | Maps over the result of an action, leaving the stream state it
-- produces untouched.
instance Monad m => Functor (Streamline m) where
  -- fmap :: (a -> b) -> Streamline m a -> Streamline m b
  fmap f action = Streamline $ \st ->
    withTarget' action st >>= \(x, st') -> return (f x, st')
-- | Applicative interface defined in terms of the 'Monad' instance.
instance (Functor m, Monad m) => Applicative (Streamline m) where
  pure x = return x
  mf <*> mx = ap mf mx
-- | Lifting runs the base action and passes the stream state through
-- unchanged.
instance MonadTrans Streamline where
  -- lift :: Monad m => m a -> Streamline m a
  lift action = Streamline $ \st ->
    action >>= \result -> return (result, st)
-- | IO actions are lifted into the base monad first and from there into
-- 'Streamline'.
instance MonadIO m => MonadIO (Streamline m) where
  liftIO action = lift (liftIO action)
-- | Sends data over the streamlined IO target.
--
-- The stream state is left unchanged.
send :: MonadIO m => ByteString -> Streamline m ()
send payload = Streamline $ \st -> writeF st payload >> return ((), st)
-- | Receives a line from the streamlined IO target.
--
-- The line is parsed with 'parseLine', so the terminator (LF or CR LF)
-- is consumed but not part of the result. When the parser fails, an
-- empty string is returned instead of the parse error.
receiveLine :: MonadIO m => Streamline m ByteString
receiveLine = runAttoparsec parseLine >>= either (const (return "")) return
-
-- | Receives a line from the streamlined IO target, broken into
-- reasonably sized chunks (at most one per block read) instead of one
-- contiguous string.
--
-- The concatenation of the returned chunks is the line together with the
-- run of end-of-line bytes (CR and/or LF) that terminates it. A line that
-- spans several blocks yields several chunks; only the last chunk ends in
-- an end-of-line byte.
--
-- If the target reaches end of input in the middle of a line, the bytes
-- received so far are returned and the stream is marked as being at EOF.
-- Calling this when the stream is already at EOF, or when the first read
-- of a fresh line returns no data, raises an EOF 'IOError'.
--
-- NOTE(review): a CR LF pair that is split across two blocks ends the
-- line after the CR; the LF is then seen at the start of the next line.
lazyRecieveLine :: MonadIO m => Streamline m [ByteString]
lazyRecieveLine = Streamline go
  where
    go :: MonadIO m => Data -> m ([ByteString], Data)
    go cl
      | isEOF cl = eofError "System.IO.Uniform.Streamline.lazyReceiveLine"
      | BS.null (buff cl) = do
          dt <- readF cl
          go cl {buff = dt, isEOF = BS.null dt}
      | otherwise =
          case A.parseOnly lineWithEol (buff cl) of
            Right (chunk, rest)
              | endsWithEol chunk -> return ([chunk], cl {buff = rest})
            -- 'lineWithEol' never fails: without an end-of-line byte it
            -- simply consumes the whole buffer. A chunk that does not end
            -- in an end-of-line byte is therefore an unfinished line (and
            -- equal to the whole buffer), so more data has to be read.
            _ -> continueLine cl

    -- The whole buffer belongs to the current line but does not finish it:
    -- emit it as one chunk and carry on with the next block.
    continueLine :: MonadIO m => Data -> m ([ByteString], Data)
    continueLine cl = do
      next <- readF cl
      if BS.null next
        then return ([buff cl], cl {buff = BS.empty, isEOF = True})
        else do
          (chunks, cl') <- go cl {buff = next}
          return (buff cl : chunks, cl')

    endsWithEol :: ByteString -> Bool
    endsWithEol bs = not (BS.null bs) && isEol (BS.last bs)
-- | lazyReceiveN n
--
-- Receives n bytes of data from the streamlined IO target, broken into
-- reasonably sized chunks (at most one per block read) instead of one
-- contiguous string. The concatenation of the returned chunks is exactly
-- n bytes long.
--
-- For @n <= 0@ a single empty chunk is returned without touching the
-- target, so a zero-length request neither blocks on a read nor fails at
-- end of input.
--
-- Raises an EOF 'IOError' if the target runs out of data before n bytes
-- have been received.
lazyReceiveN :: (Functor m, MonadIO m) => Int -> Streamline m [ByteString]
lazyReceiveN total = Streamline $ \st -> go st total
  where
    go :: (Functor m, MonadIO m) => Data -> Int -> m ([ByteString], Data)
    go cl n
      -- Nothing requested: answer without doing any IO.
      | n <= 0 = return ([BS.empty], cl)
      | isEOF cl = eofError "System.IO.Uniform.Streamline.lazyReceiveN"
      | BS.null (buff cl) = do
          b <- readF cl
          go cl {buff = b, isEOF = BS.null b} n
      -- The buffer holds everything that is still missing.
      | n <= BS.length (buff cl) =
          let (wanted, rest) = BS.splitAt n (buff cl)
           in return ([wanted], cl {buff = rest})
      -- Hand out the whole buffer and fetch the remainder.
      | otherwise =
          let b = buff cl
           in fmap (prepend b) (go cl {buff = BS.empty} (n - BS.length b))

    prepend :: a -> ([a], b) -> ([a], b)
    prepend chunk (chunks, st) = (chunk : chunks, st)
-- | Wraps the streamlined IO target in TLS and streamlines the new
-- wrapper afterwards.
--
-- Any data still sitting in the read buffer from before the upgrade is
-- discarded.
startTls :: MonadIO m => TlsSettings -> Streamline m ()
startTls settings = Streamline $ \st ->
  liftIO (S.startTls settings (str st)) >>= \secured ->
    return ((), st {str = SomeIO secured, buff = ""})
-- | Runs an Attoparsec parser over the data read from the streamlined
-- IO target. Returns both the string consumed by the parser and the
-- parser result ('Left' with the parser's message on failure).
--
-- Input the parser did not consume stays in the buffer. Raises an EOF
-- 'IOError' when the stream is already at EOF.
runAttoparsecAndReturn :: MonadIO m => A.Parser a -> Streamline m (ByteString, Either String a)
runAttoparsecAndReturn p = Streamline run
  where
    run st
      | isEOF st = eofError "System.IO.Uniform.Streamline.runAttoparsecAndReturn"
      | otherwise = do
          (st', used, outcome) <- liftIO (feed st (A.parse p (buff st)))
          return ((used, outcome), st')

    -- Drives the parser until it finishes. While it runs, the buffer
    -- accumulates every byte handed to the parser, so the consumed prefix
    -- can be recovered once the leftover input is known.
    feed :: Data -> A.Result a -> IO (Data, ByteString, Either String a)
    feed st (A.Fail rest _ msg) = return (st {buff = rest}, consumed st rest, Left msg)
    feed st (A.Done rest r) = return (st {buff = rest}, consumed st rest, Right r)
    feed st (A.Partial k) = do
      d <- readF st
      feed st {buff = BS.append (buff st) d, isEOF = BS.null d} (k d)

    -- Everything that was fed to the parser minus what it left over.
    consumed :: Data -> ByteString -> ByteString
    consumed st rest = BS.take (BS.length (buff st) - BS.length rest) (buff st)
-- | Runs an Attoparsec parser over the data read from the streamlined
-- IO target, returning the parser result ('Left' with the parser's
-- message on failure).
--
-- Input the parser did not consume stays in the buffer. Raises an EOF
-- 'IOError' when the stream is already at EOF.
runAttoparsec :: MonadIO m => A.Parser a -> Streamline m (Either String a)
runAttoparsec p = Streamline run
  where
    run st
      | isEOF st = eofError "System.IO.Uniform.Streamline.runAttoparsec"
      | otherwise = do
          (st', outcome) <- liftIO (feed st (A.parse p (buff st)))
          return (outcome, st')

    -- Drives the parser until it finishes, reading a fresh block whenever
    -- it asks for more input; an empty block marks the stream as at EOF.
    feed :: Data -> A.Result a -> IO (Data, Either String a)
    feed st (A.Fail rest _ msg) = return (st {buff = rest}, Left msg)
    feed st (A.Done rest r) = return (st {buff = rest}, Right r)
    feed st (A.Partial k) = do
      d <- readF st
      feed st {buff = d, isEOF = BS.null d} (k d)
-
-- | Indicates whether transport layer security is being used, as
-- reported by the current IO target.
isSecure :: Monad m => Streamline m Bool
isSecure = Streamline (\st -> return (S.isSecure (str st), st))
-- | Sets the timeout for the streamlined IO target.
--
-- Only the value kept in the stream state is replaced.
setTimeout :: Monad m => Int -> Streamline m ()
setTimeout t = Streamline (\st -> return ((), st {timeout = t}))
-- | Sets echo of the streamlined IO target.
--
-- If echo is set, all the data read from and written to the target is
-- echoed on stdout: data read is preceded by a "<" marker and data
-- written by a ">" marker.
setEcho :: Monad m => Bool -> Streamline m ()
setEcho e = Streamline (\st -> return ((), st {echo = e}))
-- | Parses one line: every byte up to the first CR or LF, followed by a
-- terminator that must be either CR LF or a single LF. The terminator is
-- consumed but not returned.
parseLine :: A.Parser ByteString
parseLine = A.takeTill isEol <* lineEnd
  where
    lineEnd = (A.word8 13 *> A.word8 10) <|> A.word8 10
-
-- | Splits the input into the prefix accepted by 'lineScanner' (a line
-- together with the run of end-of-line bytes that follows it) and all of
-- the remaining input.
lineWithEol :: A.Parser (ByteString, ByteString)
lineWithEol = (,) <$> A.scan False lineScanner <*> A.takeByteString
-
-- | Scanner step for 'lineWithEol'. The state records whether an
-- end-of-line byte has been seen; scanning stops at the first byte that
-- is not an end-of-line byte once one has been seen.
lineScanner :: Bool -> Word8 -> Maybe Bool
lineScanner seenEol c
  | isEol c = Just True
  | seenEol = Nothing
  | otherwise = Just False
-- | True for the end-of-line bytes: carriage return (13) and line feed (10).
isEol :: Word8 -> Bool
isEol 13 = True
isEol 10 = True
isEol _ = False
-- | Raises an end-of-file 'IOError' whose location is the given string,
-- with no handle and no file path attached.
eofError :: MonadIO m => String -> m a
eofError location = liftIO (ioError eof)
  where
    eof = mkIOError eofErrorType location Nothing Nothing
|