123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384 |
- {-# LANGUAGE OverloadedStrings, TypeFamilies, MultiParamTypeClasses, FlexibleInstances, UndecidableInstances #-}
- {- |
- Streamline exports a monad that, given an uniform IO target, emulates
- character stream IO using high performance block IO.
- -}
- module System.IO.Uniform.Streamline (
- -- * Basic Type
- Streamline,
- -- * Running streamline targets
- -- ** Single pass runners
- withClient,
- withServer,
- withTarget,
- -- ** Interruptible support
- inStreamlineCtx,
- peelStreamlineCtx,
- closeTarget,
- -- * Sending and recieving data
- send,
- send',
- recieveLine,
- recieveLine',
- recieveN,
- recieveN',
- -- ** Running a parser
- runAttoparsec,
- runAttoparsecAndReturn,
- -- ** Scanning the input
- runScanner,
- runScanner',
- scan,
- scan',
- recieveTill,
- recieveTill',
- -- * Behavior settings
- startTls,
- isSecure,
- transformTarget,
- echoTo,
- setEcho
- ) where
- import System.IO (stdout, Handle)
- import qualified System.IO.Uniform as S
- import qualified System.IO.Uniform.Network as N
- import qualified System.IO.Uniform.Std as Std
- import System.IO.Uniform (UniformIO, SomeIO(..), TlsSettings)
- import System.IO.Uniform.Streamline.Scanner
- import Data.Default.Class
- import Control.Monad.Trans.Class
- import Control.Monad.Trans.Interruptible
- import Control.Monad.Trans.Control
- import Control.Monad (ap, liftM)
- import Control.Monad.Base
- import Control.Monad.IO.Class
- import System.IO.Error
- import Data.ByteString (ByteString)
- import qualified Data.ByteString as BS
- import qualified Data.ByteString.Lazy as LBS
- import Data.Word8 (Word8)
- import Data.IP (IP)
- import qualified Data.Attoparsec.ByteString as A
- -- | Internal state for a Streamline monad
- data StreamlineState = StreamlineState {str :: SomeIO, buff :: ByteString, isEOF :: Bool, echo :: Maybe Handle}
- instance Default StreamlineState where
- -- | Will open StdIO
- def = StreamlineState (SomeIO Std.StdIO) BS.empty False Nothing
- -- | Monad that emulates character stream IO over block IO.
- newtype Streamline m a = Streamline {withTarget' :: StreamlineState -> m (a, StreamlineState)}
- blockSize :: Int
- blockSize = 4096
- readF :: MonadIO m => StreamlineState -> m ByteString
- readF cl = case echo cl of
- Just h -> do
- l <- liftIO $ S.uRead (str cl) blockSize
- liftIO $ BS.hPutStr h "< "
- liftIO $ BS.hPutStr h l
- return l
- Nothing -> liftIO $ S.uRead (str cl) blockSize
- writeF :: MonadIO m => StreamlineState -> ByteString -> m ()
- writeF cl l = case echo cl of
- Just h -> do
- liftIO $ BS.hPutStr h "> "
- liftIO $ BS.hPutStr h l
- liftIO $ S.uPut (str cl) l
- Nothing -> liftIO $ S.uPut (str cl) l
- -- | > withServer f serverIP port
- --
- -- Connects to the given server port, runs f, and closes the connection.
- withServer :: MonadIO m => IP -> Int -> Streamline m a -> m a
- withServer host port f = do
- ds <- liftIO $ N.connectTo host port
- (ret, _) <- withTarget' f def{str=SomeIO ds}
- liftIO $ S.uClose ds
- return ret
- -- | > withClient f boundPort
- --
- -- Accepts a connection at the bound port, runs f and closes the connection.
- withClient :: MonadIO m => N.BoundedPort -> (IP -> Int -> Streamline m a) -> m a
- withClient port f = do
- ds <- liftIO $ N.accept port
- (peerIp, peerPort) <- liftIO $ N.getPeer ds
- (ret, _) <- withTarget' (f peerIp peerPort) def{str=SomeIO ds}
- liftIO $ S.uClose ds
- return ret
- {- |
- > withTarget f someIO
- Runs f wrapped on a Streamline monad that does IO on someIO.
- -}
- withTarget :: (Monad m, UniformIO a) => a -> Streamline m b -> m b
- withTarget s f = do
- (r, _) <- withTarget' f def{str=SomeIO s}
- return r
- instance Monad m => Monad (Streamline m) where
- --return :: (Monad m) => a -> Streamline m a
- return x = Streamline $ \cl -> return (x, cl)
- --(>>=) :: Monad m => Streamline m a -> (a -> Streamline m b) -> Streamline m b
- a >>= b = Streamline $ \cl -> do
- (x, cl') <- withTarget' a cl
- withTarget' (b x) cl'
- instance Monad m => Functor (Streamline m) where
- --fmap :: (a -> b) -> Streamline m a -> Streamline m b
- fmap f m = Streamline $ \cl -> do
- (x, cl') <- withTarget' m cl
- return (f x, cl')
- instance Monad m => Applicative (Streamline m) where
- pure = return
- (<*>) = ap
- instance MonadTrans Streamline where
- --lift :: Monad m => m a -> Streamline m a
- lift x = Streamline $ \cl -> do
- a <- x
- return (a, cl)
- instance MonadIO m => MonadIO (Streamline m) where
- liftIO = lift . liftIO
- -- | Sends data over the IO target.
- send :: MonadIO m => ByteString -> Streamline m ()
- send r = Streamline $ \cl -> do
- writeF cl r
- return ((), cl)
- -- | Sends data from a lazy byte string
- send' :: MonadIO m => LBS.ByteString -> Streamline m ()
- send' r = Streamline $ \cl -> do
- let dd = LBS.toChunks r
- mapM_ (writeF cl) dd
- return ((), cl)
- {- |
- Very much like Attoparsec's runScanner:
- > runScanner scanner initial_state
- Recieves data, running the scanner on each byte,
- using the scanner result as initial state for the
- next byte, and stopping when the scanner returns
- Nothing.
- Returns the scanned ByteString.
- -}
- runScanner :: MonadIO m => s -> IOScanner s -> Streamline m (ByteString, s)
- runScanner state scanner = do
- (rt, st) <- runScanner' state scanner
- return (LBS.toStrict rt, st)
- -- | Equivalent to runScanner, but returns a strict, completely
- -- evaluated ByteString.
- runScanner' :: MonadIO m => s -> IOScanner s -> Streamline m (LBS.ByteString, s)
- runScanner' state scanner = Streamline $ \d ->
- do
- (tx, st, d') <- in_scan d state
- return ((LBS.fromChunks tx, st), d')
- where
- --in_scan :: StreamlineState -> s -> m ([ByteString], s, StreamlineState)
- in_scan d st
- | isEOF d = eofError "System.IO.Uniform.Streamline.scan'"
- | BS.null (buff d) = do
- dt <- readF d
- if BS.null dt
- then return ([], st, d{isEOF=True})
- else in_scan d{buff=dt} st
- | otherwise = case sscan scanner st 0 (BS.unpack . buff $ d) of
- AllInput st' -> do
- (tx', st'', d') <- in_scan d{buff=""} st'
- return (buff d:tx', st'', d')
- SplitAt n st' -> let
- (r, i) = BS.splitAt n (buff d)
- in return ([r], st', d{buff=i})
- -- I'll avoid rebuilding a list on high level code. The ByteString functions are way better.
- sscan :: (s -> Word8 -> IOScannerState s) -> s -> Int -> [Word8] -> ScanResult s
- sscan _ s0 _ [] = AllInput s0
- sscan s s0 i (w:ww) = case s s0 w of
- Finished -> SplitAt i s0
- LastPass s1 -> SplitAt (i+1) s1
- Running s1 -> sscan s s1 (i+1) ww
- data ScanResult s = SplitAt Int s | AllInput s
- -- | Equivalent to runScanner, but discards the final state
- scan :: MonadIO m => s -> IOScanner s -> Streamline m ByteString
- scan state scanner = fst <$> runScanner state scanner
- -- | Equivalent to runScanner', but discards the final state
- scan' :: MonadIO m => s -> IOScanner s -> Streamline m LBS.ByteString
- scan' state scanner = fst <$> runScanner' state scanner
- -- | Recieves data untill the next end of line (\n or \r\n)
- recieveLine :: MonadIO m => Streamline m ByteString
- recieveLine = recieveTill "\n"
-
- -- | Lazy version of recieveLine
- recieveLine' :: MonadIO m => Streamline m LBS.ByteString
- recieveLine' = recieveTill' "\n"
- -- | Recieves the given number of bytes.
- recieveN :: MonadIO m => Int -> Streamline m ByteString
- recieveN n = LBS.toStrict <$> recieveN' n
- -- | Lazy version of recieveN
- recieveN' :: MonadIO m => Int -> Streamline m LBS.ByteString
- recieveN' n | n <= 0 = return ""
- | otherwise = Streamline $ \cl ->
- do
- (tt, cl') <- recieve cl n
- return (LBS.fromChunks tt, cl')
- where
- recieve d b
- | isEOF d = eofError "System.IO.Uniform.Streamline.recieveN"
- | BS.null . buff $ d = do
- dt <- readF d
- recieve d{buff=dt}{isEOF=BS.null dt} b
- | b <= (BS.length . buff $ d) = let
- (r, dt) = BS.splitAt b $ buff d
- in return ([r], d{buff=dt})
- | otherwise = do
- (r, d') <- recieve d{buff=""} $ b - (BS.length . buff $ d)
- return (buff d : r, d')
- -- | Recieves data until it matches the argument.
- -- Returns all of it, including the matching data.
- recieveTill :: MonadIO m => ByteString -> Streamline m ByteString
- recieveTill t = LBS.toStrict <$> recieveTill' t
- -- | Lazy version of recieveTill
- recieveTill' :: MonadIO m => ByteString -> Streamline m LBS.ByteString
- recieveTill' = recieve . BS.unpack
- where
- recieve t' = scan' [] (textScanner t')
- -- | Wraps the streamlined IO target on TLS, streamlining
- -- the new wrapper afterwads.
- startTls :: MonadIO m => TlsSettings -> Streamline m ()
- startTls st = Streamline $ \cl -> do
- ds' <- liftIO $ S.startTls st $ str cl
- return ((), cl{str=SomeIO ds'}{buff=""})
- -- | Runs an Attoparsec parser over the data read from the
- -- streamlined IO target. Returns both the parser
- -- result and the string consumed by it.
- runAttoparsecAndReturn :: MonadIO m => A.Parser a -> Streamline m (ByteString, Either String a)
- runAttoparsecAndReturn p = Streamline $ \cl ->
- if isEOF cl
- then eofError "System.IO.Uniform.Streamline.runAttoparsecAndReturn"
- else do
- let c = A.parse p $ buff cl
- (cl', i, a) <- liftIO $ continueResult cl c
- return ((i, a), cl')
- where
- continueResult :: StreamlineState -> A.Result a -> IO (StreamlineState, ByteString, Either String a)
- -- tx eof ds
- continueResult cl c = case c of
- A.Fail i _ msg -> return (cl{buff=i}, BS.take (BS.length (buff cl) - BS.length i) (buff cl), Left msg)
- A.Done i r -> return (cl{buff=i}, BS.take (BS.length (buff cl) - BS.length i) (buff cl), Right r)
- A.Partial c' -> do
- d <- readF cl
- let cl' = cl{buff=BS.append (buff cl) d}{isEOF=BS.null d}
- continueResult cl' (c' d)
- -- | Runs an Attoparsec parser over the data read from the
- -- streamlined IO target. Returning the parser result.
- runAttoparsec :: MonadIO m => A.Parser a -> Streamline m (Either String a)
- runAttoparsec p = Streamline $ \cl ->
- if isEOF cl
- then eofError "System.IO.Uniform.Streamline.runAttoparsec"
- else do
- let c = A.parse p $ buff cl
- (cl', a) <- liftIO $ continueResult cl c
- return (a, cl')
- where
- continueResult :: StreamlineState -> A.Result a -> IO (StreamlineState, Either String a)
- continueResult cl c = case c of
- A.Fail i _ msg -> return (cl{buff=i}, Left msg)
- A.Done i r -> return (cl{buff=i}, Right r)
- A.Partial c' -> do
- d <- readF cl
- let eof' = BS.null d
- continueResult cl{buff=d}{isEOF=eof'} (c' d)
- -- | Indicates whether transport layer security is being used.
- isSecure :: Monad m => Streamline m Bool
- isSecure = Streamline $ \cl -> return (S.isSecure $ str cl, cl)
- -- | Sets echo of the streamlines IO target.
- -- If echo is set, all the data read an written to the target
- -- will be echoed in stdout, with ">" and "<" markers indicating
- -- what is read and written.
- setEcho :: Monad m => Bool -> Streamline m ()
- setEcho e = Streamline $ \cl ->
- if e then return ((), cl{echo=Just stdout}) else return ((), cl{echo=Nothing})
- {- |
- Replaces the enclosed target with the result of the given transformation.
- Discards all buffered data in the process.
- -}
- transformTarget :: (UniformIO a, Monad m) => (SomeIO -> a) -> Streamline m ()
- transformTarget w = Streamline $ \cl -> return ((), cl{str = SomeIO . w . str $ cl})
- {- |
- Sets echo of the streamlined IO target.
- If echo is set, all the data read an written to the target
- will be echoed to the handle, with ">" and "<" markers indicating
- what is read and written.
- Setting to Nothing will disable echo.
- -}
- echoTo :: Monad m => Maybe Handle -> Streamline m ()
- echoTo h = Streamline $ \cl -> return ((), cl{echo=h})
- eofError :: MonadIO m => String -> m a
- eofError msg = liftIO . ioError $ mkIOError eofErrorType msg Nothing Nothing
- instance Interruptible Streamline where
- type RSt Streamline a = (a, StreamlineState)
- resume f (a, st) = withTarget' (f a) st
- -- | Creates a Streamline interrutible context
- inStreamlineCtx :: UniformIO io => io -> a -> RSt Streamline a
- inStreamlineCtx io a = (a, def{str = SomeIO io})
- -- | Closes the target of a streamline state, releasing any resource.
- closeTarget :: MonadIO m => Streamline m ()
- closeTarget = Streamline $ \st -> do
- liftIO . S.uClose . str $ st
- return ((), st)
- -- | Removes a Streamline interruptible context
- peelStreamlineCtx :: RSt Streamline a -> (a, SomeIO)
- peelStreamlineCtx (a, dt) = (a, str dt)
- instance MonadTransControl Streamline where
- type StT Streamline a = (a, StreamlineState)
- liftWith f = Streamline $ \s ->
- liftM (\x -> (x, s))
- (f $ \t -> withTarget' t s)
- restoreT = Streamline . const
- instance MonadBase b m => MonadBase b (Streamline m) where
- liftBase = liftBaseDefault
- instance MonadBaseControl b m => MonadBaseControl b (Streamline m) where
- type StM (Streamline m) a = ComposeSt Streamline m a
- liftBaseWith = defaultLiftBaseWith
- restoreM = defaultRestoreM
|