|
@@ -0,0 +1,257 @@
|
|
|
+{-# LANGUAGE OverloadedStrings #-}
|
|
|
+
|
|
|
+-- |
|
|
|
+-- Streamline exports a monad that, given an uniform IO target, emulates
|
|
|
+-- character tream IO using high performance block IO.
|
|
|
+module System.IO.Uniform.Streamline (Streamline, withClient, withServer, withTarget, send, receiveLine, lazyRecieveLine, lazyReceiveN, startTls, runAttoparsec, runAttoparsecAndReturn, isSecure, setTimeout, setEcho) where
|
|
|
+
|
|
|
+import qualified System.IO.Uniform as S
|
|
|
+import System.IO.Uniform (UniformIO, SomeIO(..), TlsSettings)
|
|
|
+
|
|
|
+import Control.Monad.Trans.Class
|
|
|
+import Control.Applicative
|
|
|
+import Control.Monad (ap)
|
|
|
+import Control.Monad.IO.Class
|
|
|
+import System.IO.Error
|
|
|
+import Data.ByteString (ByteString)
|
|
|
+import qualified Data.ByteString as BS
|
|
|
+import Data.Char (ord)
|
|
|
+import Data.Word8 (Word8)
|
|
|
+import Data.IP (IP)
|
|
|
+
|
|
|
+import qualified Data.Attoparsec.ByteString as A
|
|
|
+
|
|
|
+data Data = Data {str :: SomeIO, timeout :: Int, buff :: ByteString, isEOF :: Bool, echo :: Bool}
|
|
|
+-- | Monad that emulates character stream IO over block IO.
|
|
|
+newtype Streamline m a = Streamline {withTarget' :: Data -> m (a, Data)}
|
|
|
+
|
|
|
+blockSize :: Int
|
|
|
+blockSize = 4096
|
|
|
+defaultTimeout :: Int
|
|
|
+defaultTimeout = 1000000 * 600
|
|
|
+
|
|
|
+readF :: MonadIO m => Data -> m ByteString
|
|
|
+readF cl = if echo cl
|
|
|
+ then do
|
|
|
+ l <- liftIO $ S.fRead (str cl) blockSize
|
|
|
+ liftIO $ BS.putStr "<"
|
|
|
+ liftIO $ BS.putStr l
|
|
|
+ return l
|
|
|
+ else liftIO $ S.fRead (str cl) blockSize
|
|
|
+
|
|
|
+writeF :: MonadIO m => Data -> ByteString -> m ()
|
|
|
+writeF cl l = if echo cl
|
|
|
+ then do
|
|
|
+ liftIO $ BS.putStr ">"
|
|
|
+ liftIO $ BS.putStr l
|
|
|
+ liftIO $ S.fPut (str cl) l
|
|
|
+ else liftIO $ S.fPut (str cl) l
|
|
|
+
|
|
|
+-- | withServer f serverIP port
|
|
|
+-- Connects to the given server port, runs f, and closes the connection.
|
|
|
+withServer :: MonadIO m => Streamline m a -> IP -> Int -> m a
|
|
|
+withServer f host port = do
|
|
|
+ ds <- liftIO $ S.connectTo host port
|
|
|
+ (ret, _) <- withTarget' f $ Data (SomeIO ds) defaultTimeout "" False False
|
|
|
+ liftIO $ S.fClose ds
|
|
|
+ return ret
|
|
|
+
|
|
|
+-- | withClient f boundPort
|
|
|
+-- Accepts a connection at the bound port, runs f and closes the connection.
|
|
|
+withClient :: MonadIO m => (IP -> Int -> Streamline m a) -> S.BoundPort -> m a
|
|
|
+withClient f port = do
|
|
|
+ ds <- liftIO $ S.accept port
|
|
|
+ (peerIp, peerPort) <- liftIO $ S.getPeer ds
|
|
|
+ (ret, _) <- withTarget' (f peerIp peerPort) $ Data (SomeIO ds) defaultTimeout "" False False
|
|
|
+ liftIO $ S.fClose ds
|
|
|
+ return ret
|
|
|
+
|
|
|
+-- | withTarget f someIO
|
|
|
+-- Runs f wrapped on a Streamline monad that does IO on nomeIO.
|
|
|
+withTarget :: (MonadIO m, UniformIO a) => Streamline m b -> a -> m b
|
|
|
+withTarget f s = do
|
|
|
+ (ret, _) <- withTarget' f $ Data (SomeIO s) defaultTimeout "" False False
|
|
|
+ return ret
|
|
|
+
|
|
|
+instance Monad m => Monad (Streamline m) where
|
|
|
+ --return :: (Monad m) => a -> Streamline m a
|
|
|
+ return x = Streamline $ \cl -> return (x, cl)
|
|
|
+ --(>>=) :: Monad m => Streamline m a -> (a -> Streamline m b) -> Streamline m b
|
|
|
+ a >>= b = Streamline $ \cl -> do
|
|
|
+ (x, cl') <- withTarget' a cl
|
|
|
+ withTarget' (b x) cl'
|
|
|
+
|
|
|
+instance Monad m => Functor (Streamline m) where
|
|
|
+ --fmap :: (a -> b) -> Streamline m a -> Streamline m b
|
|
|
+ fmap f m = Streamline $ \cl -> do
|
|
|
+ (x, cl') <- withTarget' m cl
|
|
|
+ return (f x, cl')
|
|
|
+
|
|
|
+instance (Functor m, Monad m) => Applicative (Streamline m) where
|
|
|
+ pure = return
|
|
|
+ (<*>) = ap
|
|
|
+
|
|
|
+instance MonadTrans Streamline where
|
|
|
+ --lift :: Monad m => m a -> Streamline m a
|
|
|
+ lift x = Streamline $ \cl -> do
|
|
|
+ a <- x
|
|
|
+ return (a, cl)
|
|
|
+
|
|
|
+instance MonadIO m => MonadIO (Streamline m) where
|
|
|
+ liftIO = lift . liftIO
|
|
|
+
|
|
|
+-- | Sends data over the streamlines an IO target.
|
|
|
+send :: MonadIO m => ByteString -> Streamline m ()
|
|
|
+send r = Streamline $ \cl -> do
|
|
|
+ writeF cl r
|
|
|
+ return ((), cl)
|
|
|
+
|
|
|
+-- | Receives a line from the streamlined IO target.
|
|
|
+receiveLine :: MonadIO m => Streamline m ByteString
|
|
|
+receiveLine = do
|
|
|
+ l <- runAttoparsec parseLine
|
|
|
+ case l of
|
|
|
+ Left _ -> return ""
|
|
|
+ Right l' -> return l'
|
|
|
+
|
|
|
+-- | Receives a line from the streamlined IO target,
|
|
|
+-- but breaks the line on reasonably sized chuncks, and
|
|
|
+-- reads them lazyly, so that IO can be done in constant
|
|
|
+-- memory space.
|
|
|
+lazyRecieveLine :: MonadIO m => Streamline m [ByteString]
|
|
|
+lazyRecieveLine = Streamline $ \cl -> lazyReceiveLine' cl
|
|
|
+ where
|
|
|
+ lazyReceiveLine' :: MonadIO m => Data -> m ([ByteString], Data)
|
|
|
+ lazyReceiveLine' cl' =
|
|
|
+ if isEOF cl'
|
|
|
+ then eofError "System.IO.Uniform.Streamline.lazyReceiveLine"
|
|
|
+ else
|
|
|
+ if BS.null $ buff cl'
|
|
|
+ then do
|
|
|
+ dt <- readF cl'
|
|
|
+ lazyReceiveLine' cl'{buff=dt}{isEOF=BS.null dt}
|
|
|
+ else do
|
|
|
+ let l = A.parseOnly lineWithEol $ buff cl'
|
|
|
+ case l of
|
|
|
+ Left _ -> do
|
|
|
+ l' <- readF cl'
|
|
|
+ (ret, cl'') <- lazyReceiveLine' cl'{buff=l'}{isEOF=BS.null l'}
|
|
|
+ return ((buff cl') : ret, cl'')
|
|
|
+ Right (ret, dt) -> return ([ret], cl'{buff=dt})
|
|
|
+
|
|
|
+-- | lazyReceiveN n
|
|
|
+-- Receives n bytes of data from the streamlined IO target,
|
|
|
+-- but breaks the data on reasonably sized chuncks, and reads
|
|
|
+-- them lazyly, so that IO can be done in constant memory space.
|
|
|
+lazyReceiveN :: (Functor m, MonadIO m) => Int -> Streamline m [ByteString]
|
|
|
+lazyReceiveN n' = Streamline $ \cl' -> lazyReceiveN' cl' n'
|
|
|
+ where
|
|
|
+ lazyReceiveN' :: (Functor m, MonadIO m) => Data -> Int -> m ([ByteString], Data)
|
|
|
+ lazyReceiveN' cl n =
|
|
|
+ if isEOF cl
|
|
|
+ then eofError "System.IO.Uniform.Streamline.lazyReceiveN"
|
|
|
+ else
|
|
|
+ if BS.null (buff cl)
|
|
|
+ then do
|
|
|
+ b <- readF cl
|
|
|
+ let eof = BS.null b
|
|
|
+ let cl' = cl{buff=b}{isEOF=eof}
|
|
|
+ lazyReceiveN' cl' n
|
|
|
+ else
|
|
|
+ if n <= BS.length (buff cl)
|
|
|
+ then let
|
|
|
+ ret = [BS.take n (buff cl)]
|
|
|
+ buff' = BS.drop n (buff cl)
|
|
|
+ in return (ret, cl{buff=buff'})
|
|
|
+ else let
|
|
|
+ cl' = cl{buff=""}
|
|
|
+ b = buff cl
|
|
|
+ in fmap (appFst b) $ lazyReceiveN' cl' (n - BS.length b)
|
|
|
+ appFst :: a -> ([a], b) -> ([a], b)
|
|
|
+ appFst a (l, b) = (a:l, b)
|
|
|
+
|
|
|
+-- | Wraps the streamlined IO target on TLS, streamlining
|
|
|
+-- the new wrapper afterwads.
|
|
|
+startTls :: MonadIO m => TlsSettings -> Streamline m ()
|
|
|
+startTls st = Streamline $ \cl -> do
|
|
|
+ ds' <- liftIO $ S.startTls st $ str cl
|
|
|
+ return ((), cl{str=SomeIO ds'}{buff=""})
|
|
|
+
|
|
|
+-- | Runs an Attoparsec parser over the data read from the
|
|
|
+-- streamlined IO target. Returns both the parser
|
|
|
+-- result and the string consumed by it.
|
|
|
+runAttoparsecAndReturn :: MonadIO m => A.Parser a -> Streamline m (ByteString, Either String a)
|
|
|
+runAttoparsecAndReturn p = Streamline $ \cl ->
|
|
|
+ if isEOF cl
|
|
|
+ then eofError "System.IO.Uniform.Streamline.runAttoparsecAndReturn"
|
|
|
+ else do
|
|
|
+ let c = A.parse p $ buff cl
|
|
|
+ (cl', i, a) <- liftIO $ continueResult cl c
|
|
|
+ return ((i, a), cl')
|
|
|
+ where
|
|
|
+ continueResult :: Data -> A.Result a -> IO (Data, ByteString, (Either String a))
|
|
|
+ -- tx eof ds
|
|
|
+ continueResult cl c = case c of
|
|
|
+ A.Fail i _ msg -> return (cl{buff=i}, BS.take (BS.length (buff cl) - BS.length i) (buff cl), Left msg)
|
|
|
+ A.Done i r -> return (cl{buff=i}, BS.take (BS.length (buff cl) - BS.length i) (buff cl), Right r)
|
|
|
+ A.Partial c' -> do
|
|
|
+ d <- readF cl
|
|
|
+ let cl' = cl{buff=BS.append (buff cl) d}{isEOF=BS.null d}
|
|
|
+ continueResult cl' (c' d)
|
|
|
+
|
|
|
+-- | Runs an Attoparsec parser over the data read from the
|
|
|
+-- streamlined IO target. Returning the parser result.
|
|
|
+runAttoparsec :: MonadIO m => A.Parser a -> Streamline m (Either String a)
|
|
|
+runAttoparsec p = Streamline $ \cl ->
|
|
|
+ if isEOF cl
|
|
|
+ then eofError "System.IO.Uniform.Streamline.runAttoparsec"
|
|
|
+ else do
|
|
|
+ let c = A.parse p $ buff cl
|
|
|
+ (cl', a) <- liftIO $ continueResult cl c
|
|
|
+ return (a, cl')
|
|
|
+ where
|
|
|
+ continueResult :: Data -> A.Result a -> IO (Data, (Either String a))
|
|
|
+ continueResult cl c = case c of
|
|
|
+ A.Fail i _ msg -> return (cl{buff=i}, Left msg)
|
|
|
+ A.Done i r -> return (cl{buff=i}, Right r)
|
|
|
+ A.Partial c' -> do
|
|
|
+ d <- readF cl
|
|
|
+ let eof' = BS.null d
|
|
|
+ continueResult cl{buff=d}{isEOF=eof'} (c' d)
|
|
|
+
|
|
|
+-- | Indicates whether transport layer security is being used.
|
|
|
+isSecure :: Monad m => Streamline m Bool
|
|
|
+isSecure = Streamline $ \cl -> return (S.isSecure $ str cl, cl)
|
|
|
+
|
|
|
+-- | Sets the timeout for the streamlined IO target.
|
|
|
+setTimeout :: Monad m => Int -> Streamline m ()
|
|
|
+setTimeout t = Streamline $ \cl -> return ((), cl{timeout=t})
|
|
|
+
|
|
|
+-- | Sets echo of the streamlines IO target.
|
|
|
+-- If echo is set, all the data read an written to the target
|
|
|
+-- will be echoed in stdout, with ">" and "<" markers indicating
|
|
|
+-- what is read and written.
|
|
|
+setEcho :: Monad m => Bool -> Streamline m ()
|
|
|
+setEcho e = Streamline $ \cl -> return ((), cl{echo=e})
|
|
|
+
|
|
|
+parseLine :: A.Parser ByteString
|
|
|
+parseLine = do
|
|
|
+ l <- A.takeTill isEol
|
|
|
+ (A.string "\r\n" <|> A.string "\n")
|
|
|
+ return l
|
|
|
+
|
|
|
+lineWithEol :: A.Parser (ByteString, ByteString)
|
|
|
+lineWithEol = do
|
|
|
+ l <- A.scan False lineScanner
|
|
|
+ r <- A.takeByteString
|
|
|
+ return (l, r)
|
|
|
+
|
|
|
+lineScanner :: Bool -> Word8 -> Maybe Bool
|
|
|
+lineScanner False c = Just $ isEol c
|
|
|
+lineScanner True c = if isEol c then Just True else Nothing
|
|
|
+
|
|
|
+isEol :: Word8 -> Bool
|
|
|
+isEol c = elem c (map (fromIntegral . ord) "\r\n")
|
|
|
+
|
|
|
+eofError :: MonadIO m => String -> m a
|
|
|
+eofError msg = liftIO . ioError $ mkIOError eofErrorType msg Nothing Nothing
|