123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379 |
- {-# LANGUAGE OverloadedStrings #-}
- -- |
- -- Streamline exports a monad that, given an uniform IO target, emulates
- -- character tream IO using high performance block IO.
- module System.IO.Uniform.Streamline (
- Streamline,
- IOScannerState(..),
- withClient,
- withServer,
- withTarget,
- send,
- send',
- recieveLine,
- recieveLine',
- lazyRecieveLine,
- recieveN,
- recieveN',
- lazyRecieveN,
- recieveTill,
- recieveTill',
- startTls,
- runAttoparsec,
- runAttoparsecAndReturn,
- isSecure,
- setTimeout,
- setEcho,
- runScanner,
- runScanner',
- scan,
- scan',
- textScanner
- ) where
- import qualified System.IO.Uniform as S
- import qualified System.IO.Uniform.Network as N
- import System.IO.Uniform (UniformIO, SomeIO(..), TlsSettings)
- import System.IO.Uniform.Streamline.Scanner
- import Control.Monad.Trans.Class
- import Control.Applicative
- import Control.Monad (ap)
- import Control.Monad.IO.Class
- import System.IO.Error
- import Data.ByteString (ByteString)
- import qualified Data.ByteString as BS
- import qualified Data.ByteString.Lazy as LBS
- import Data.Word8 (Word8)
- import Data.IP (IP)
- import qualified Data.Char as C
- import qualified Data.Attoparsec.ByteString as A
- data Data = Data {str :: SomeIO, timeout :: Int, buff :: ByteString, isEOF :: Bool, echo :: Bool}
- -- | Monad that emulates character stream IO over block IO.
- newtype Streamline m a = Streamline {withTarget' :: Data -> m (a, Data)}
- blockSize :: Int
- blockSize = 4096
- defaultTimeout :: Int
- defaultTimeout = 1000000 * 600
- readF :: MonadIO m => Data -> m ByteString
- readF cl = if echo cl
- then do
- l <- liftIO $ S.uRead (str cl) blockSize
- liftIO $ BS.putStr "<"
- liftIO $ BS.putStr l
- return l
- else liftIO $ S.uRead (str cl) blockSize
- writeF :: MonadIO m => Data -> ByteString -> m ()
- writeF cl l = if echo cl
- then do
- liftIO $ BS.putStr ">"
- liftIO $ BS.putStr l
- liftIO $ S.uPut (str cl) l
- else liftIO $ S.uPut (str cl) l
- -- | withServer f serverIP port
- --
- -- Connects to the given server port, runs f, and closes the connection.
- withServer :: MonadIO m => IP -> Int -> Streamline m a -> m a
- withServer host port f = do
- ds <- liftIO $ N.connectTo host port
- (ret, _) <- withTarget' f $ Data (SomeIO ds) defaultTimeout "" False False
- liftIO $ S.uClose ds
- return ret
- -- | withClient f boundPort
- --
- -- Accepts a connection at the bound port, runs f and closes the connection.
- withClient :: MonadIO m => N.BoundedPort -> (IP -> Int -> Streamline m a) -> m a
- withClient port f = do
- ds <- liftIO $ N.accept port
- (peerIp, peerPort) <- liftIO $ N.getPeer ds
- (ret, _) <- withTarget' (f peerIp peerPort) $ Data (SomeIO ds) defaultTimeout "" False False
- liftIO $ S.uClose ds
- return ret
- -- | withTarget f someIO
- --
- -- Runs f wrapped on a Streamline monad that does IO on nomeIO.
- withTarget :: (MonadIO m, UniformIO a) => a -> Streamline m b -> m b
- withTarget s f = do
- (ret, _) <- withTarget' f $ Data (SomeIO s) defaultTimeout "" False False
- return ret
- instance Monad m => Monad (Streamline m) where
- --return :: (Monad m) => a -> Streamline m a
- return x = Streamline $ \cl -> return (x, cl)
- --(>>=) :: Monad m => Streamline m a -> (a -> Streamline m b) -> Streamline m b
- a >>= b = Streamline $ \cl -> do
- (x, cl') <- withTarget' a cl
- withTarget' (b x) cl'
- instance Monad m => Functor (Streamline m) where
- --fmap :: (a -> b) -> Streamline m a -> Streamline m b
- fmap f m = Streamline $ \cl -> do
- (x, cl') <- withTarget' m cl
- return (f x, cl')
- instance (Functor m, Monad m) => Applicative (Streamline m) where
- pure = return
- (<*>) = ap
- instance MonadTrans Streamline where
- --lift :: Monad m => m a -> Streamline m a
- lift x = Streamline $ \cl -> do
- a <- x
- return (a, cl)
- instance MonadIO m => MonadIO (Streamline m) where
- liftIO = lift . liftIO
- -- | Sends data over the streamlines an IO target.
- send :: MonadIO m => ByteString -> Streamline m ()
- send r = Streamline $ \cl -> do
- writeF cl r
- return ((), cl)
- -- | Sends data from a lazy byte string
- send' :: MonadIO m => LBS.ByteString -> Streamline m ()
- send' r = Streamline $ \cl -> do
- let dd = LBS.toChunks r
- mapM (writeF cl) dd
- return ((), cl)
- -- | Equivalent to runScanner', but returns a strict, completely
- -- evaluated ByteString.
- runScanner :: MonadIO m => s -> IOScanner s -> Streamline m (ByteString, s)
- runScanner state scanner = do
- (rt, st) <- runScanner' state scanner
- return (LBS.toStrict rt, st)
- {- |
- Very much like Attoparsec's runScanner:
- runScanner' scanner initial_state
- Recieves data, running the scanner on each byte,
- using the scanner result as initial state for the
- next byte, and stopping when the scanner returns
- Nothing.
- Returns the scanned ByteString.
- -}
- runScanner' :: MonadIO m => s -> IOScanner s -> Streamline m (LBS.ByteString, s)
- runScanner' state scanner = Streamline $ \d ->
- do
- (tx, st, d') <- in_scan d state
- return ((LBS.fromChunks tx, st), d')
- where
- --in_scan :: Data -> s -> m ([ByteString], s, Data)
- in_scan d st
- | isEOF d = eofError "System.IO.Uniform.Streamline.scan'"
- | BS.null (buff d) = do
- dt <- readF d
- if BS.null dt
- then return ([], st, d{isEOF=True})
- else in_scan d{buff=dt} st
- | otherwise = case sscan scanner st 0 (BS.unpack . buff $ d) of
- AllInput st' -> do
- (tx', st'', d') <- in_scan d{buff=""} st'
- return (buff d:tx', st'', d')
- SplitAt n st' -> let
- (r, i) = BS.splitAt n (buff d)
- in return ([r], st', d{buff=i})
- -- I'll avoid rebuilding a list on high level code. The ByteString functions are way better.
- sscan :: (s -> Word8 -> IOScannerState s) -> s -> Int -> [Word8] -> ScanResult s
- sscan _ s0 _ [] = AllInput s0
- sscan s s0 i (w:ww) = case s s0 w of
- Finished -> SplitAt i s0
- LastPass s1 -> SplitAt (i+1) s1
- Running s1 -> sscan s s1 (i+1) ww
- data ScanResult s = SplitAt Int s | AllInput s
- -- | Equivalent to runScanner, but dischards the final state
- scan :: MonadIO m => s -> IOScanner s -> Streamline m ByteString
- scan state scanner = fst <$> runScanner state scanner
- -- | Equivalent to runScanner', but dischards the final state
- scan' :: MonadIO m => s -> IOScanner s -> Streamline m LBS.ByteString
- scan' state scanner = fst <$> runScanner' state scanner
- -- | Recieves data untill the next end of line (\n or \r\n)
- recieveLine :: MonadIO m => Streamline m ByteString
- recieveLine = recieveTill "\n"
-
- -- | Lazy version of recieveLine
- recieveLine' :: MonadIO m => Streamline m LBS.ByteString
- recieveLine' = recieveTill' "\n"
- -- | Use recieveLine'.
- lazyRecieveLine :: MonadIO m => Streamline m [ByteString]
- {-# DEPRECATED #-}
- lazyRecieveLine = Streamline $ \cl -> lazyRecieveLine' cl
- where
- lazyRecieveLine' :: MonadIO m => Data -> m ([ByteString], Data)
- lazyRecieveLine' cl' =
- if isEOF cl'
- then eofError "System.IO.Uniform.Streamline.lazyRecieveLine"
- else
- if BS.null $ buff cl'
- then do
- dt <- readF cl'
- lazyRecieveLine' cl'{buff=dt}{isEOF=BS.null dt}
- else do
- let l = A.parseOnly lineWithEol $ buff cl'
- case l of
- Left _ -> do
- l' <- readF cl'
- (ret, cl'') <- lazyRecieveLine' cl'{buff=l'}{isEOF=BS.null l'}
- return ((buff cl') : ret, cl'')
- Right (ret, dt) -> return ([ret], cl'{buff=dt})
- -- | Recieves the given number of bytes.
- recieveN :: MonadIO m => Int -> Streamline m ByteString
- recieveN n = LBS.toStrict <$> recieveN' n
- -- | Lazy version of recieveN
- recieveN' :: MonadIO m => Int -> Streamline m LBS.ByteString
- recieveN' n = Streamline $ \cl ->
- do
- (tt, cl') <- recieve cl n
- return (LBS.fromChunks tt, cl')
- where
- recieve d b
- | isEOF d = eofError "System.IO.Uniform.Streamline.lazyRecieveN"
- | BS.null . buff $ d = do
- dt <- readF d
- recieve d{buff=dt}{isEOF=BS.null dt} b
- | b <= (BS.length . buff $ d) = let
- (r, dt) = BS.splitAt b $ buff d
- in return ([r], d{buff=dt})
- | otherwise = do
- (r, d') <- recieve d{buff=""} $ b - (BS.length . buff $ d)
- return (buff d : r, d')
- -- | Use recieveN'.
- lazyRecieveN :: (Functor m, MonadIO m) => Int -> Streamline m [ByteString]
- {-# DEPRECATED #-}
- lazyRecieveN n' = Streamline $ \cl' -> lazyRecieveN' cl' n'
- where
- lazyRecieveN' :: (Functor m, MonadIO m) => Data -> Int -> m ([ByteString], Data)
- lazyRecieveN' cl n =
- if isEOF cl
- then eofError "System.IO.Uniform.Streamline.lazyRecieveN"
- else
- if BS.null (buff cl)
- then do
- b <- readF cl
- let eof = BS.null b
- let cl' = cl{buff=b}{isEOF=eof}
- lazyRecieveN' cl' n
- else
- if n <= BS.length (buff cl)
- then let
- ret = [BS.take n (buff cl)]
- buff' = BS.drop n (buff cl)
- in return (ret, cl{buff=buff'})
- else let
- cl' = cl{buff=""}
- b = buff cl
- in fmap (appFst b) $ lazyRecieveN' cl' (n - BS.length b)
- appFst :: a -> ([a], b) -> ([a], b)
- appFst a (l, b) = (a:l, b)
- -- | Recieves data until it matches the argument.
- -- Returns all of it, including the matching data.
- recieveTill :: MonadIO m => ByteString -> Streamline m ByteString
- recieveTill t = LBS.toStrict <$> recieveTill' t
- -- | Lazy version of recieveTill
- recieveTill' :: MonadIO m => ByteString -> Streamline m LBS.ByteString
- recieveTill' t = recieve . BS.unpack $ t
- where
- recieve t' = scan' [] (textScanner t')
- -- | Wraps the streamlined IO target on TLS, streamlining
- -- the new wrapper afterwads.
- startTls :: MonadIO m => TlsSettings -> Streamline m ()
- startTls st = Streamline $ \cl -> do
- ds' <- liftIO $ S.startTls st $ str cl
- return ((), cl{str=SomeIO ds'}{buff=""})
- -- | Runs an Attoparsec parser over the data read from the
- -- streamlined IO target. Returns both the parser
- -- result and the string consumed by it.
- runAttoparsecAndReturn :: MonadIO m => A.Parser a -> Streamline m (ByteString, Either String a)
- runAttoparsecAndReturn p = Streamline $ \cl ->
- if isEOF cl
- then eofError "System.IO.Uniform.Streamline.runAttoparsecAndReturn"
- else do
- let c = A.parse p $ buff cl
- (cl', i, a) <- liftIO $ continueResult cl c
- return ((i, a), cl')
- where
- continueResult :: Data -> A.Result a -> IO (Data, ByteString, (Either String a))
- -- tx eof ds
- continueResult cl c = case c of
- A.Fail i _ msg -> return (cl{buff=i}, BS.take (BS.length (buff cl) - BS.length i) (buff cl), Left msg)
- A.Done i r -> return (cl{buff=i}, BS.take (BS.length (buff cl) - BS.length i) (buff cl), Right r)
- A.Partial c' -> do
- d <- readF cl
- let cl' = cl{buff=BS.append (buff cl) d}{isEOF=BS.null d}
- continueResult cl' (c' d)
- -- | Runs an Attoparsec parser over the data read from the
- -- streamlined IO target. Returning the parser result.
- runAttoparsec :: MonadIO m => A.Parser a -> Streamline m (Either String a)
- runAttoparsec p = Streamline $ \cl ->
- if isEOF cl
- then eofError "System.IO.Uniform.Streamline.runAttoparsec"
- else do
- let c = A.parse p $ buff cl
- (cl', a) <- liftIO $ continueResult cl c
- return (a, cl')
- where
- continueResult :: Data -> A.Result a -> IO (Data, (Either String a))
- continueResult cl c = case c of
- A.Fail i _ msg -> return (cl{buff=i}, Left msg)
- A.Done i r -> return (cl{buff=i}, Right r)
- A.Partial c' -> do
- d <- readF cl
- let eof' = BS.null d
- continueResult cl{buff=d}{isEOF=eof'} (c' d)
-
- -- | Indicates whether transport layer security is being used.
- isSecure :: Monad m => Streamline m Bool
- isSecure = Streamline $ \cl -> return (S.isSecure $ str cl, cl)
- -- | Sets the timeout for the streamlined IO target.
- setTimeout :: Monad m => Int -> Streamline m ()
- setTimeout t = Streamline $ \cl -> return ((), cl{timeout=t})
- -- | Sets echo of the streamlines IO target.
- -- If echo is set, all the data read an written to the target
- -- will be echoed in stdout, with ">" and "<" markers indicating
- -- what is read and written.
- setEcho :: Monad m => Bool -> Streamline m ()
- setEcho e = Streamline $ \cl -> return ((), cl{echo=e})
- lineWithEol :: A.Parser (ByteString, ByteString)
- lineWithEol = do
- l <- A.scan False lineScanner
- r <- A.takeByteString
- return (l, r)
-
- eofError :: MonadIO m => String -> m a
- eofError msg = liftIO . ioError $ mkIOError eofErrorType msg Nothing Nothing
- lineScanner :: Bool -> Word8 -> Maybe Bool
- lineScanner False c
- | c == (fromIntegral . C.ord $ '\n') = Just True
- | otherwise = Just False
- lineScanner True _ = Nothing
|