{-# LANGUAGE OverloadedStrings #-} {- | Streamline exports a monad that, given an uniform IO target, emulates character stream IO using high performance block IO. -} module System.IO.Uniform.Streamline ( -- * Basic Type Streamline, -- * Running streamline targets -- ** Single pass runners withClient, withServer, withTarget, -- ** Several pass runner StreamlineState, streamline, resume, close, remaining, -- * Sending and recieving data send, send', recieveLine, recieveLine', recieveN, recieveN', -- ** Running a parser runAttoparsec, runAttoparsecAndReturn, -- ** Scanning the input runScanner, runScanner', scan, scan', recieveTill, recieveTill', -- ** Deprecated functions lazyRecieveLine, lazyRecieveN, -- * Behavior settings startTls, isSecure, setTimeout, echoTo, setEcho ) where import System.IO (stdout, Handle) import qualified System.IO.Uniform as S import qualified System.IO.Uniform.Network as N import qualified System.IO.Uniform.Std as Std import System.IO.Uniform (UniformIO, SomeIO(..), TlsSettings) import System.IO.Uniform.Streamline.Scanner import Data.Default.Class import Control.Monad.Trans.Class import Control.Applicative import Control.Monad (ap) import Control.Monad.IO.Class import System.IO.Error import Data.ByteString (ByteString) import qualified Data.ByteString as BS import qualified Data.ByteString.Lazy as LBS import Data.Word8 (Word8) import Data.IP (IP) import qualified Data.Char as C import qualified Data.Attoparsec.ByteString as A -- | Internal state for a Streamline monad data StreamlineState = StreamlineState {str :: SomeIO, timeout :: Int, buff :: ByteString, isEOF :: Bool, echo :: Maybe Handle} instance Default StreamlineState where -- | Will open StdIO def = StreamlineState (SomeIO Std.StdIO) defaultTimeout BS.empty False Nothing -- | Monad that emulates character stream IO over block IO. newtype Streamline m a = Streamline {withTarget' :: StreamlineState -> m (a, StreamlineState)} blockSize :: Int blockSize = 4096 defaultTimeout :: Int defaultTimeout = 1000000 * 600 readF :: MonadIO m => StreamlineState -> m ByteString readF cl = case echo cl of Just h -> do l <- liftIO $ S.uRead (str cl) blockSize liftIO $ BS.hPutStr h "<" liftIO $ BS.hPutStr h l return l Nothing -> liftIO $ S.uRead (str cl) blockSize writeF :: MonadIO m => StreamlineState -> ByteString -> m () writeF cl l = case echo cl of Just h -> do liftIO $ BS.hPutStr h ">" liftIO $ BS.hPutStr h l liftIO $ S.uPut (str cl) l Nothing -> liftIO $ S.uPut (str cl) l -- | withServer f serverIP port -- -- Connects to the given server port, runs f, and closes the connection. withServer :: MonadIO m => IP -> Int -> Streamline m a -> m a withServer host port f = do ds <- liftIO $ N.connectTo host port (ret, _) <- withTarget' f def{str=SomeIO ds} liftIO $ S.uClose ds return ret -- | withClient f boundPort -- -- Accepts a connection at the bound port, runs f and closes the connection. withClient :: MonadIO m => N.BoundedPort -> (IP -> Int -> Streamline m a) -> m a withClient port f = do ds <- liftIO $ N.accept port (peerIp, peerPort) <- liftIO $ N.getPeer ds (ret, _) <- withTarget' (f peerIp peerPort) def{str=SomeIO ds} liftIO $ S.uClose ds return ret {- | withTarget f someIO Runs f wrapped on a Streamline monad that does IO on someIO. -} withTarget :: (Monad m, UniformIO a) => a -> Streamline m b -> m b withTarget s f = do (r, _) <- withTarget' f def{str=SomeIO s} return r {- | Run f wrapped on a Streamline monad, returning the final state in a way that can be continued with "resume". If run with this function, the state must be closed, explicitly with "close" or implicitly with "remaining". -} streamline :: (Monad m, UniformIO a) => a -> Streamline m b -> m (b, StreamlineState) streamline s f = withTarget' f def{str=SomeIO s} {- | Continues the execution of functions on a Streamline monad comming from "start" or another "resume" call. -} resume :: Monad m => StreamlineState -> Streamline m b -> m (b, StreamlineState) resume dt f = withTarget' f dt instance Monad m => Monad (Streamline m) where --return :: (Monad m) => a -> Streamline m a return x = Streamline $ \cl -> return (x, cl) --(>>=) :: Monad m => Streamline m a -> (a -> Streamline m b) -> Streamline m b a >>= b = Streamline $ \cl -> do (x, cl') <- withTarget' a cl withTarget' (b x) cl' instance Monad m => Functor (Streamline m) where --fmap :: (a -> b) -> Streamline m a -> Streamline m b fmap f m = Streamline $ \cl -> do (x, cl') <- withTarget' m cl return (f x, cl') instance (Functor m, Monad m) => Applicative (Streamline m) where pure = return (<*>) = ap instance MonadTrans Streamline where --lift :: Monad m => m a -> Streamline m a lift x = Streamline $ \cl -> do a <- x return (a, cl) instance MonadIO m => MonadIO (Streamline m) where liftIO = lift . liftIO -- | Sends data over the IO target. send :: MonadIO m => ByteString -> Streamline m () send r = Streamline $ \cl -> do writeF cl r return ((), cl) -- | Sends data from a lazy byte string send' :: MonadIO m => LBS.ByteString -> Streamline m () send' r = Streamline $ \cl -> do let dd = LBS.toChunks r mapM (writeF cl) dd return ((), cl) {- | Very much like Attoparsec's runScanner: runScanner scanner initial_state Recieves data, running the scanner on each byte, using the scanner result as initial state for the next byte, and stopping when the scanner returns Nothing. Returns the scanned ByteString. -} runScanner :: MonadIO m => s -> IOScanner s -> Streamline m (ByteString, s) runScanner state scanner = do (rt, st) <- runScanner' state scanner return (LBS.toStrict rt, st) -- | Equivalent to runScanner, but returns a strict, completely -- evaluated ByteString. runScanner' :: MonadIO m => s -> IOScanner s -> Streamline m (LBS.ByteString, s) runScanner' state scanner = Streamline $ \d -> do (tx, st, d') <- in_scan d state return ((LBS.fromChunks tx, st), d') where --in_scan :: StreamlineState -> s -> m ([ByteString], s, StreamlineState) in_scan d st | isEOF d = eofError "System.IO.Uniform.Streamline.scan'" | BS.null (buff d) = do dt <- readF d if BS.null dt then return ([], st, d{isEOF=True}) else in_scan d{buff=dt} st | otherwise = case sscan scanner st 0 (BS.unpack . buff $ d) of AllInput st' -> do (tx', st'', d') <- in_scan d{buff=""} st' return (buff d:tx', st'', d') SplitAt n st' -> let (r, i) = BS.splitAt n (buff d) in return ([r], st', d{buff=i}) -- I'll avoid rebuilding a list on high level code. The ByteString functions are way better. sscan :: (s -> Word8 -> IOScannerState s) -> s -> Int -> [Word8] -> ScanResult s sscan _ s0 _ [] = AllInput s0 sscan s s0 i (w:ww) = case s s0 w of Finished -> SplitAt i s0 LastPass s1 -> SplitAt (i+1) s1 Running s1 -> sscan s s1 (i+1) ww data ScanResult s = SplitAt Int s | AllInput s -- | Equivalent to runScanner, but discards the final state scan :: MonadIO m => s -> IOScanner s -> Streamline m ByteString scan state scanner = fst <$> runScanner state scanner -- | Equivalent to runScanner', but discards the final state scan' :: MonadIO m => s -> IOScanner s -> Streamline m LBS.ByteString scan' state scanner = fst <$> runScanner' state scanner -- | Recieves data untill the next end of line (\n or \r\n) recieveLine :: MonadIO m => Streamline m ByteString recieveLine = recieveTill "\n" -- | Lazy version of recieveLine recieveLine' :: MonadIO m => Streamline m LBS.ByteString recieveLine' = recieveTill' "\n" -- | Use recieveLine'. lazyRecieveLine :: MonadIO m => Streamline m [ByteString] {-# DEPRECATED #-} lazyRecieveLine = Streamline $ \cl -> lazyRecieveLine' cl where lazyRecieveLine' :: MonadIO m => StreamlineState -> m ([ByteString], StreamlineState) lazyRecieveLine' cl' = if isEOF cl' then eofError "System.IO.Uniform.Streamline.lazyRecieveLine" else if BS.null $ buff cl' then do dt <- readF cl' lazyRecieveLine' cl'{buff=dt}{isEOF=BS.null dt} else do let l = A.parseOnly lineWithEol $ buff cl' case l of Left _ -> do l' <- readF cl' (ret, cl'') <- lazyRecieveLine' cl'{buff=l'}{isEOF=BS.null l'} return ((buff cl') : ret, cl'') Right (ret, dt) -> return ([ret], cl'{buff=dt}) -- | Recieves the given number of bytes. recieveN :: MonadIO m => Int -> Streamline m ByteString recieveN n = LBS.toStrict <$> recieveN' n -- | Lazy version of recieveN recieveN' :: MonadIO m => Int -> Streamline m LBS.ByteString recieveN' n | n <= 0 = return "" | otherwise = Streamline $ \cl -> do (tt, cl') <- recieve cl n return (LBS.fromChunks tt, cl') where recieve d b | isEOF d = eofError "System.IO.Uniform.Streamline.lazyRecieveN" | BS.null . buff $ d = do dt <- readF d recieve d{buff=dt}{isEOF=BS.null dt} b | b <= (BS.length . buff $ d) = let (r, dt) = BS.splitAt b $ buff d in return ([r], d{buff=dt}) | otherwise = do (r, d') <- recieve d{buff=""} $ b - (BS.length . buff $ d) return (buff d : r, d') -- | Use recieveN'. lazyRecieveN :: (Functor m, MonadIO m) => Int -> Streamline m [ByteString] {-# DEPRECATED #-} lazyRecieveN n' = Streamline $ \cl' -> lazyRecieveN' cl' n' where lazyRecieveN' :: (Functor m, MonadIO m) => StreamlineState -> Int -> m ([ByteString], StreamlineState) lazyRecieveN' cl n = if isEOF cl then eofError "System.IO.Uniform.Streamline.lazyRecieveN" else if BS.null (buff cl) then do b <- readF cl let eof = BS.null b let cl' = cl{buff=b}{isEOF=eof} lazyRecieveN' cl' n else if n <= BS.length (buff cl) then let ret = [BS.take n (buff cl)] buff' = BS.drop n (buff cl) in return (ret, cl{buff=buff'}) else let cl' = cl{buff=""} b = buff cl in fmap (appFst b) $ lazyRecieveN' cl' (n - BS.length b) appFst :: a -> ([a], b) -> ([a], b) appFst a (l, b) = (a:l, b) -- | Recieves data until it matches the argument. -- Returns all of it, including the matching data. recieveTill :: MonadIO m => ByteString -> Streamline m ByteString recieveTill t = LBS.toStrict <$> recieveTill' t -- | Lazy version of recieveTill recieveTill' :: MonadIO m => ByteString -> Streamline m LBS.ByteString recieveTill' t = recieve . BS.unpack $ t where recieve t' = scan' [] (textScanner t') -- | Wraps the streamlined IO target on TLS, streamlining -- the new wrapper afterwads. startTls :: MonadIO m => TlsSettings -> Streamline m () startTls st = Streamline $ \cl -> do ds' <- liftIO $ S.startTls st $ str cl return ((), cl{str=SomeIO ds'}{buff=""}) -- | Runs an Attoparsec parser over the data read from the -- streamlined IO target. Returns both the parser -- result and the string consumed by it. runAttoparsecAndReturn :: MonadIO m => A.Parser a -> Streamline m (ByteString, Either String a) runAttoparsecAndReturn p = Streamline $ \cl -> if isEOF cl then eofError "System.IO.Uniform.Streamline.runAttoparsecAndReturn" else do let c = A.parse p $ buff cl (cl', i, a) <- liftIO $ continueResult cl c return ((i, a), cl') where continueResult :: StreamlineState -> A.Result a -> IO (StreamlineState, ByteString, (Either String a)) -- tx eof ds continueResult cl c = case c of A.Fail i _ msg -> return (cl{buff=i}, BS.take (BS.length (buff cl) - BS.length i) (buff cl), Left msg) A.Done i r -> return (cl{buff=i}, BS.take (BS.length (buff cl) - BS.length i) (buff cl), Right r) A.Partial c' -> do d <- readF cl let cl' = cl{buff=BS.append (buff cl) d}{isEOF=BS.null d} continueResult cl' (c' d) -- | Runs an Attoparsec parser over the data read from the -- streamlined IO target. Returning the parser result. runAttoparsec :: MonadIO m => A.Parser a -> Streamline m (Either String a) runAttoparsec p = Streamline $ \cl -> if isEOF cl then eofError "System.IO.Uniform.Streamline.runAttoparsec" else do let c = A.parse p $ buff cl (cl', a) <- liftIO $ continueResult cl c return (a, cl') where continueResult :: StreamlineState -> A.Result a -> IO (StreamlineState, (Either String a)) continueResult cl c = case c of A.Fail i _ msg -> return (cl{buff=i}, Left msg) A.Done i r -> return (cl{buff=i}, Right r) A.Partial c' -> do d <- readF cl let eof' = BS.null d continueResult cl{buff=d}{isEOF=eof'} (c' d) -- | Indicates whether transport layer security is being used. isSecure :: Monad m => Streamline m Bool isSecure = Streamline $ \cl -> return (S.isSecure $ str cl, cl) -- | Sets the timeout for the streamlined IO target. setTimeout :: Monad m => Int -> Streamline m () setTimeout t = Streamline $ \cl -> return ((), cl{timeout=t}) -- | Sets echo of the streamlines IO target. -- If echo is set, all the data read an written to the target -- will be echoed in stdout, with ">" and "<" markers indicating -- what is read and written. setEcho :: Monad m => Bool -> Streamline m () setEcho e = Streamline $ \cl -> if e then return ((), cl{echo=Just stdout}) else return ((), cl{echo=Nothing}) {- | Sets echo of the streamlined IO target. If echo is set, all the data read an written to the target will be echoed to the handle, with ">" and "<" markers indicating what is read and written. Setting to Nothing will disable echo. -} echoTo :: Monad m => Maybe Handle -> Streamline m () echoTo h = Streamline $ \cl -> return ((), cl{echo=h}) lineWithEol :: A.Parser (ByteString, ByteString) lineWithEol = do l <- A.scan False lineScanner r <- A.takeByteString return (l, r) eofError :: MonadIO m => String -> m a eofError msg = liftIO . ioError $ mkIOError eofErrorType msg Nothing Nothing lineScanner :: Bool -> Word8 -> Maybe Bool lineScanner False c | c == (fromIntegral . C.ord $ '\n') = Just True | otherwise = Just False lineScanner True _ = Nothing {- | Closes the target of a streamline state, releasing any used resource. -} close :: MonadIO m => StreamlineState -> m () close st = liftIO . S.uClose . str $ st {- | Retrieves the remaining contents of a streamline state, closing it afterwards. -} remaining :: MonadIO m => StreamlineState -> m LBS.ByteString remaining st = if isEOF st then close st >> return LBS.empty else if BS.null . buff $ st then do dt <- readF st remaining st{buff=dt}{isEOF=BS.null dt} else do dt' <- remaining st{buff=BS.empty} return $ LBS.append (LBS.fromStrict . buff $ st) dt'