@@ -1,41 +1,58 @@
{-# LANGUAGE OverloadedStrings #-}
+Streamline exports a monad that, given an uniform IO target, emulates
+character stream IO using high performance block IO.
module System.IO.Uniform.Streamline (
- IOScannerState(..),
+ StreamlineState,
+ streamline,
+ resume,
+ close,
+ remaining,
- lazyRecieveLine,
- lazyRecieveN,
- recieveTill,
- recieveTill',
- startTls,
- isSecure,
- setTimeout,
- setEcho,
- textScanner
+ recieveTill,
+ recieveTill',
+ lazyRecieveLine,
+ lazyRecieveN,
+ startTls,
+ isSecure,
+ setTimeout,
+ echoTo,
+ setEcho
) where
+import System.IO (stdout, Handle)
import qualified System.IO.Uniform as S
import qualified System.IO.Uniform.Network as N
+import qualified System.IO.Uniform.Std as Std
import System.IO.Uniform (UniformIO, SomeIO(..), TlsSettings)
import System.IO.Uniform.Streamline.Scanner
+import Data.Default.Class
import Control.Monad.Trans.Class
import Control.Applicative
@@ -51,31 +68,36 @@ import qualified Data.Char as C
import qualified Data.Attoparsec.ByteString as A
-data Data = Data {str :: SomeIO, timeout :: Int, buff :: ByteString, isEOF :: Bool, echo :: Bool}
+data StreamlineState = StreamlineState {str :: SomeIO, timeout :: Int, buff :: ByteString, isEOF :: Bool, echo :: Maybe Handle}
+instance Default StreamlineState where
+ def = StreamlineState (SomeIO Std.StdIO) defaultTimeout BS.empty False Nothing
-newtype Streamline m a = Streamline {withTarget' :: Data -> m (a, Data)}
+newtype Streamline m a = Streamline {withTarget' :: StreamlineState -> m (a, StreamlineState)}
blockSize :: Int
blockSize = 4096
defaultTimeout :: Int
defaultTimeout = 1000000 * 600
-readF :: MonadIO m => Data -> m ByteString
-readF cl = if echo cl
- then do
- l <- liftIO $ S.uRead (str cl) blockSize
- liftIO $ BS.putStr "<"
- liftIO $ BS.putStr l
- return l
- else liftIO $ S.uRead (str cl) blockSize
-writeF :: MonadIO m => Data -> ByteString -> m ()
-writeF cl l = if echo cl
- then do
- liftIO $ BS.putStr ">"
- liftIO $ BS.putStr l
- liftIO $ S.uPut (str cl) l
- else liftIO $ S.uPut (str cl) l
+readF :: MonadIO m => StreamlineState -> m ByteString
+readF cl = case echo cl of
+ Just h -> do
+ l <- liftIO $ S.uRead (str cl) blockSize
+ liftIO $ BS.hPutStr h "<"
+ liftIO $ BS.hPutStr h l
+ return l
+ Nothing -> liftIO $ S.uRead (str cl) blockSize
+writeF :: MonadIO m => StreamlineState -> ByteString -> m ()
+writeF cl l = case echo cl of
+ Just h -> do
+ liftIO $ BS.hPutStr h ">"
+ liftIO $ BS.hPutStr h l
+ liftIO $ S.uPut (str cl) l
+ Nothing -> liftIO $ S.uPut (str cl) l
@@ -83,7 +105,7 @@ writeF cl l = if echo cl
withServer :: MonadIO m => IP -> Int -> Streamline m a -> m a
withServer host port f = do
ds <- liftIO $ N.connectTo host port
- (ret, _) <- withTarget' f $ Data (SomeIO ds) defaultTimeout "" False False
+ (ret, _) <- withTarget' f def{str=SomeIO ds}
liftIO $ S.uClose ds
return ret
@@ -94,17 +116,37 @@ withClient :: MonadIO m => N.BoundedPort -> (IP -> Int -> Streamline m a) -> m a
withClient port f = do
ds <- liftIO $ N.accept port
(peerIp, peerPort) <- liftIO $ N.getPeer ds
- (ret, _) <- withTarget' (f peerIp peerPort) $ Data (SomeIO ds) defaultTimeout "" False False
+ (ret, _) <- withTarget' (f peerIp peerPort) def{str=SomeIO ds}
liftIO $ S.uClose ds
return ret
-withTarget :: (MonadIO m, UniformIO a) => a -> Streamline m b -> m b
-withTarget s f = do
- (ret, _) <- withTarget' f $ Data (SomeIO s) defaultTimeout "" False False
- return ret
+withTarget f someIO
+Runs f wrapped on a Streamline monad that does IO on someIO.
+withTarget :: (Monad m, UniformIO a) => a -> Streamline m b -> m b
+withTarget s f = do
+ (r, _) <- withTarget' f def{str=SomeIO s}
+ return r
+Run f wrapped on a Streamline monad, returning the final state in a way that
+can be continued with "resume".
+If run with this function, the state must be closed, explicitly with "close" or
+implicitly with "remaining".
+streamline :: (Monad m, UniformIO a) => a -> Streamline m b -> m (b, StreamlineState)
+streamline s f = withTarget' f def{str=SomeIO s}
+Continues the execution of functions on a Streamline monad comming from
+"start" or another "resume" call.
+resume :: Monad m => StreamlineState -> Streamline m b -> m (b, StreamlineState)
+resume dt f = withTarget' f dt
instance Monad m => Monad (Streamline m) where
@@ -133,7 +175,7 @@ instance MonadTrans Streamline where
instance MonadIO m => MonadIO (Streamline m) where
liftIO = lift . liftIO
send :: MonadIO m => ByteString -> Streamline m ()
send r = Streamline $ \cl -> do
writeF cl r
@@ -146,17 +188,10 @@ send' r = Streamline $ \cl -> do
mapM (writeF cl) dd
return ((), cl)
-runScanner :: MonadIO m => s -> IOScanner s -> Streamline m (ByteString, s)
-runScanner state scanner = do
- (rt, st) <- runScanner' state scanner
- return (LBS.toStrict rt, st)
Very much like Attoparsec's runScanner:
-runScanner' scanner initial_state
+runScanner scanner initial_state
Recieves data, running the scanner on each byte,
using the scanner result as initial state for the
@@ -165,13 +200,20 @@ Nothing.
Returns the scanned ByteString.
+runScanner :: MonadIO m => s -> IOScanner s -> Streamline m (ByteString, s)
+runScanner state scanner = do
+ (rt, st) <- runScanner' state scanner
+ return (LBS.toStrict rt, st)
runScanner' :: MonadIO m => s -> IOScanner s -> Streamline m (LBS.ByteString, s)
runScanner' state scanner = Streamline $ \d ->
(tx, st, d') <- in_scan d state
return ((LBS.fromChunks tx, st), d')
in_scan d st
| isEOF d = eofError "System.IO.Uniform.Streamline.scan'"
| BS.null (buff d) = do
@@ -197,11 +239,11 @@ runScanner' state scanner = Streamline $ \d ->
data ScanResult s = SplitAt Int s | AllInput s
scan :: MonadIO m => s -> IOScanner s -> Streamline m ByteString
scan state scanner = fst <$> runScanner state scanner
scan' :: MonadIO m => s -> IOScanner s -> Streamline m LBS.ByteString
scan' state scanner = fst <$> runScanner' state scanner
@@ -218,7 +260,7 @@ lazyRecieveLine :: MonadIO m => Streamline m [ByteString]
lazyRecieveLine = Streamline $ \cl -> lazyRecieveLine' cl
- lazyRecieveLine' :: MonadIO m => Data -> m ([ByteString], Data)
+ lazyRecieveLine' :: MonadIO m => StreamlineState -> m ([ByteString], StreamlineState)
lazyRecieveLine' cl' =
if isEOF cl'
then eofError "System.IO.Uniform.Streamline.lazyRecieveLine"
@@ -264,7 +306,7 @@ lazyRecieveN :: (Functor m, MonadIO m) => Int -> Streamline m [ByteString]
lazyRecieveN n' = Streamline $ \cl' -> lazyRecieveN' cl' n'
- lazyRecieveN' :: (Functor m, MonadIO m) => Data -> Int -> m ([ByteString], Data)
+ lazyRecieveN' :: (Functor m, MonadIO m) => StreamlineState -> Int -> m ([ByteString], StreamlineState)
lazyRecieveN' cl n =
if isEOF cl
then eofError "System.IO.Uniform.Streamline.lazyRecieveN"
@@ -318,7 +360,7 @@ runAttoparsecAndReturn p = Streamline $ \cl ->
(cl', i, a) <- liftIO $ continueResult cl c
return ((i, a), cl')
- continueResult :: Data -> A.Result a -> IO (Data, ByteString, (Either String a))
+ continueResult :: StreamlineState -> A.Result a -> IO (StreamlineState, ByteString, (Either String a))
continueResult cl c = case c of
A.Fail i _ msg -> return (cl{buff=i}, BS.take (BS.length (buff cl) - BS.length i) (buff cl), Left msg)
@@ -339,7 +381,7 @@ runAttoparsec p = Streamline $ \cl ->
(cl', a) <- liftIO $ continueResult cl c
return (a, cl')
- continueResult :: Data -> A.Result a -> IO (Data, (Either String a))
+ continueResult :: StreamlineState -> A.Result a -> IO (StreamlineState, (Either String a))
continueResult cl c = case c of
A.Fail i _ msg -> return (cl{buff=i}, Left msg)
A.Done i r -> return (cl{buff=i}, Right r)
@@ -347,7 +389,7 @@ runAttoparsec p = Streamline $ \cl ->
d <- readF cl
let eof' = BS.null d
continueResult cl{buff=d}{isEOF=eof'} (c' d)
isSecure :: Monad m => Streamline m Bool
isSecure = Streamline $ \cl -> return (S.isSecure $ str cl, cl)
@@ -361,7 +403,20 @@ setTimeout t = Streamline $ \cl -> return ((), cl{timeout=t})
setEcho :: Monad m => Bool -> Streamline m ()
-setEcho e = Streamline $ \cl -> return ((), cl{echo=e})
+setEcho e = Streamline $ \cl ->
+ if e then return ((), cl{echo=Just stdout}) else return ((), cl{echo=Nothing})
+Sets echo of the streamlined IO target.
+If echo is set, all the data read an written to the target
+will be echoed to the handle, with ">" and "<" markers indicating
+what is read and written.
+Setting to Nothing will disable echo.
+echoTo :: Monad m => Maybe Handle -> Streamline m ()
+echoTo h = Streamline $ \cl -> return ((), cl{echo=h})
lineWithEol :: A.Parser (ByteString, ByteString)
lineWithEol = do
@@ -377,3 +432,24 @@ lineScanner False c
| c == (fromIntegral . C.ord $ '\n') = Just True
| otherwise = Just False
lineScanner True _ = Nothing
+Closes the target of a streamline state, releasing any used resource.
+close :: MonadIO m => StreamlineState -> m ()
+close st = liftIO . S.uClose . str $ st
+Retrieves the remaining contents of a streamline state, closing it afterwards.
+remaining :: MonadIO m => StreamlineState -> m LBS.ByteString
+remaining st =
+ if isEOF st then close st >> return LBS.empty
+ else
+ if BS.null . buff $ st
+ then do
+ dt <- readF st
+ remaining st{buff=dt}{isEOF=BS.null dt}
+ else do
+ dt' <- remaining st{buff=BS.empty}
+ return $ LBS.append (LBS.fromStrict . buff $ st) dt'