{-# LANGUAGE OverloadedStrings, TypeFamilies, MultiParamTypeClasses, FlexibleInstances, UndecidableInstances #-} {- | Streamline exports a monad that, given an uniform IO target, emulates character stream IO using high performance block IO. -} module System.IO.Uniform.Streamline ( -- * Basic Type Streamline, -- * Running streamline targets -- ** Single pass runners withClient, withServer, withTarget, -- ** Interruptible support inStreamlineCtx, peelStreamlineCtx, closeTarget, -- * Sending and recieving data send, send', recieveLine, recieveLine', recieveN, recieveN', -- ** Running a parser runAttoparsec, runAttoparsecAndReturn, -- ** Scanning the input runScanner, runScanner', scan, scan', recieveTill, recieveTill', -- * Behavior changing and status startTls, isSecure, isEOF, transformTarget, limitInput, echoTo, setEcho ) where import System.IO (stdout, Handle) import qualified System.IO.Uniform as S import qualified System.IO.Uniform.Network as N import qualified System.IO.Uniform.Std as Std import System.IO.Uniform (UniformIO, SomeIO(..), TlsSettings) import System.IO.Uniform.Streamline.Scanner import Data.Default.Class import Control.Monad.Trans.Class import Control.Monad.Trans.Interruptible import Control.Monad.Trans.Control import Control.Monad import Control.Monad.Base import Control.Monad.IO.Class import System.IO.Error import Data.ByteString (ByteString) import qualified Data.ByteString as BS import qualified Data.ByteString.Lazy as LBS import Data.Word8 (Word8) import Data.IP (IP) import qualified Data.Attoparsec.ByteString as A import Debug.Trace -- | Internal state for a Streamline monad data StreamlineState = StreamlineState {str :: SomeIO, buff :: ByteString, targetEOF :: Bool, echo :: Maybe Handle, inLimit :: Int} instance Default StreamlineState where -- | Will open StdIO def = StreamlineState (SomeIO Std.StdIO) BS.empty False Nothing (-1) -- | Monad that emulates character stream IO over block IO. newtype Streamline m a = Streamline {withTarget' :: StreamlineState -> m (a, StreamlineState)} blockSize :: Int blockSize = 4096 readF :: MonadIO m => Streamline m () readF = -- Must try just not to read more than the limit, actual limiting is done by takeBuff Streamline $ \cl -> if not . BS.null . buff $ cl then return ((), cl) else do let lim = inLimit cl sz = if lim < 0 then blockSize else if lim <= blockSize then lim else blockSize l <- liftIO $ S.uRead (str cl) sz let cl' = cl{ buff = l, targetEOF = BS.null l } case echo cl of Just h -> do liftIO $ BS.hPutStr h "< " liftIO $ BS.hPutStr h l Nothing -> return () return ((), cl') -- | Takes the buffer for processing takeBuff :: MonadIO m => Streamline m ByteString takeBuff = do readF Streamline $ \cl -> let lim = inLimit cl eof = targetEOF cl b = buff cl in if lim < 0 then return (b, cl{buff=""}) else let (r, b') = BS.splitAt lim b in return (r, cl{ buff = b', inLimit = lim - BS.length r }) -- | Pushes remaining data back into the buffer pushBuff :: Monad m => ByteString -> Streamline m () pushBuff dt = Streamline $ \cl -> let lim = inLimit cl b = buff cl newb = BS.append dt b newl = if lim < 0 then lim else lim + BS.length dt in return ((), cl{buff=newb, inLimit=newl}) writeF :: MonadIO m => ByteString -> Streamline m () writeF l = Streamline $ \cl -> case echo cl of Just h -> do liftIO $ BS.hPutStr h "> " liftIO $ BS.hPutStr h l liftIO $ S.uPut (str cl) l return ((), cl) Nothing -> liftIO $ S.uPut (str cl) l >> return ((), cl) -- | > withServer f serverIP port -- -- Connects to the given server port, runs f, and closes the connection. withServer :: MonadIO m => IP -> Int -> Streamline m a -> m a withServer host port f = do ds <- liftIO $ N.connectTo host port (ret, _) <- withTarget' f def{str=SomeIO ds} liftIO $ S.uClose ds return ret -- | > withClient f boundPort -- -- Accepts a connection at the bound port, runs f and closes the connection. withClient :: MonadIO m => N.BoundedPort -> (IP -> Int -> Streamline m a) -> m a withClient port f = do ds <- liftIO $ N.accept port (peerIp, peerPort) <- liftIO $ N.getPeer ds (ret, _) <- withTarget' (f peerIp peerPort) def{str=SomeIO ds} liftIO $ S.uClose ds return ret {- | > withTarget f someIO Runs f wrapped on a Streamline monad that does IO on someIO. -} withTarget :: (Monad m, UniformIO a) => a -> Streamline m b -> m b withTarget s f = do (r, _) <- withTarget' f def{str=SomeIO s} return r instance Monad m => Monad (Streamline m) where --return :: (Monad m) => a -> Streamline m a return x = Streamline $ \cl -> return (x, cl) --(>>=) :: Monad m => Streamline m a -> (a -> Streamline m b) -> Streamline m b a >>= b = Streamline $ \cl -> do (x, cl') <- withTarget' a cl withTarget' (b x) cl' instance Monad m => Functor (Streamline m) where --fmap :: (a -> b) -> Streamline m a -> Streamline m b fmap f m = Streamline $ \cl -> do (x, cl') <- withTarget' m cl return (f x, cl') instance Monad m => Applicative (Streamline m) where pure = return (<*>) = ap instance MonadTrans Streamline where --lift :: Monad m => m a -> Streamline m a lift x = Streamline $ \cl -> do a <- x return (a, cl) instance MonadIO m => MonadIO (Streamline m) where liftIO = lift . liftIO -- | Sends data over the IO target. send :: MonadIO m => ByteString -> Streamline m () send r = writeF r -- | Sends data from a lazy byte string send' :: MonadIO m => LBS.ByteString -> Streamline m () send' r = do let dd = LBS.toChunks r mapM_ writeF dd {- | Very much like Attoparsec's runScanner: > runScanner scanner initial_state Recieves data, running the scanner on each byte, using the scanner result as initial state for the next byte, and stopping when the scanner returns Nothing. Returns the scanned ByteString. -} runScanner :: MonadIO m => s -> IOScanner s -> Streamline m (ByteString, s) runScanner state scanner = do (rt, st) <- runScanner' state scanner return (LBS.toStrict rt, st) -- | Equivalent to runScanner, but returns a lazy ByteString runScanner' :: MonadIO m => s -> IOScanner s -> Streamline m (LBS.ByteString, s) runScanner' state scanner = do (tx, st) <- in_scan state return (LBS.fromChunks tx, st) where --in_scan :: MonadIO m => s -> Streamline m ([ByteString], s) in_scan st = do d <- takeBuff if BS.null d then return ([], st) else case sscan scanner st 0 $ BS.unpack d of AllInput st' -> do (tx', st'') <- in_scan st' return (d:tx', st'') SplitAt n st' -> do let (r, i) = BS.splitAt n d pushBuff i return ([r], st') -- I'll avoid rebuilding a list on high level code. The ByteString functions are way better. sscan :: (s -> Word8 -> IOScannerState s) -> s -> Int -> [Word8] -> ScanResult s sscan _ s0 _ [] = AllInput s0 sscan s s0 i (w:ww) = case s s0 w of Finished -> SplitAt i s0 LastPass s1 -> SplitAt (i+1) s1 Running s1 -> sscan s s1 (i+1) ww data ScanResult s = SplitAt Int s | AllInput s -- | Equivalent to runScanner, but discards the final state scan :: MonadIO m => s -> IOScanner s -> Streamline m ByteString scan state scanner = fst <$> runScanner state scanner -- | Equivalent to runScanner', but discards the final state scan' :: MonadIO m => s -> IOScanner s -> Streamline m LBS.ByteString scan' state scanner = fst <$> runScanner' state scanner -- | Recieves data untill the next end of line (\n or \r\n) recieveLine :: MonadIO m => Streamline m ByteString recieveLine = recieveTill "\n" -- | Lazy version of recieveLine recieveLine' :: MonadIO m => Streamline m LBS.ByteString recieveLine' = recieveTill' "\n" {- | Recieves the given number of bytes, or less in case of end of file. -} recieveN :: MonadIO m => Int -> Streamline m ByteString recieveN n = LBS.toStrict <$> recieveN' n -- | Lazy version of recieveN recieveN' :: MonadIO m => Int -> Streamline m LBS.ByteString recieveN' n = LBS.fromChunks <$> recieve n where recieve sz | sz <= 0 = return [] | otherwise = do d <- takeBuff if BS.null d then return [] else do let (h, t) = BS.splitAt sz d sz' = sz - BS.length h unless (BS.null t) $ pushBuff t r <- recieve sz' return $ h : r -- | Recieves data until it matches the argument. -- Returns all of it, including the matching data. recieveTill :: MonadIO m => ByteString -> Streamline m ByteString recieveTill t = LBS.toStrict <$> recieveTill' t -- | Lazy version of recieveTill recieveTill' :: MonadIO m => ByteString -> Streamline m LBS.ByteString recieveTill' = recieve . BS.unpack where recieve t' = scan' [] (textScanner t') -- | Wraps the streamlined IO target on TLS, streamlining -- the new wrapper afterwads. startTls :: MonadIO m => TlsSettings -> Streamline m () startTls st = Streamline $ \cl -> do ds' <- liftIO $ S.startTls st $ str cl return ((), cl{str=SomeIO ds'}{buff=""}) -- | Runs an Attoparsec parser over the data read from the -- streamlined IO target. Returns both the parser -- result and the string consumed by it. runAttoparsecAndReturn :: MonadIO m => A.Parser a -> Streamline m (ByteString, Either String a) runAttoparsecAndReturn p = do d <- takeBuff let c = A.parse p d continueResult c d [d] where continueResult c d dd = case c of A.Fail i _ msg -> do pushBuff $ BS.concat (reverse dd) `BS.append` i return (BS.take (BS.length d - BS.length i) d, Left msg) A.Done i r -> do pushBuff i return (BS.concat (reverse dd) `BS.append` BS.take (BS.length d - BS.length i) d, Right r) A.Partial c' -> do dt <- takeBuff continueResult (c' dt) dt $ dt:dd -- | Runs an Attoparsec parser over the data read from the -- streamlined IO target. Returning the parser result. runAttoparsec :: MonadIO m => A.Parser a -> Streamline m (Either String a) runAttoparsec p = snd <$> runAttoparsecAndReturn p -- | Indicates whether transport layer security is being used. isSecure :: Monad m => Streamline m Bool isSecure = Streamline $ \cl -> return (S.isSecure $ str cl, cl) {- | True if the input is at the end of stream. If the input is limited, will be true when either the limit is reached, or the underlining target was at actual end of stream. In case of EOF due to reaching the limit, changing the limit will immediately make this return False again. -} isEOF :: Monad m => Streamline m Bool isEOF = Streamline $ \cl -> return (targetEOF cl || inLimit cl == 0, cl) -- | Sets echo of the streamlines IO target. -- If echo is set, all the data read an written to the target -- will be echoed in stdout, with ">" and "<" markers indicating -- what is read and written. setEcho :: Monad m => Bool -> Streamline m () setEcho e = Streamline $ \cl -> if e then return ((), cl{echo=Just stdout}) else return ((), cl{echo=Nothing}) {- | Replaces the enclosed target with the result of the given transformation. Discards all buffered data in the process. -} transformTarget :: (UniformIO a, Monad m) => (SomeIO -> a) -> Streamline m () transformTarget w = Streamline $ \cl -> return ((), cl{str = SomeIO . w . str $ cl}) {- | Limits the input to the given number of bytes, emulating an end of file after them. If the limit is negative, the input will not be limited. -} limitInput :: Monad m => Int -> Streamline m () limitInput n = Streamline $ \cl -> return ((), cl{inLimit = n}) {- | Sets echo of the streamlined IO target. If echo is set, all the data read an written to the target will be echoed to the handle, with ">" and "<" markers indicating what is read and written. Setting to Nothing will disable echo. -} echoTo :: Monad m => Maybe Handle -> Streamline m () echoTo h = Streamline $ \cl -> return ((), cl{echo=h}) eofError :: MonadIO m => String -> m a eofError msg = liftIO . ioError $ mkIOError eofErrorType msg Nothing Nothing instance Interruptible Streamline where type RSt Streamline a = (a, StreamlineState) resume f (a, st) = withTarget' (f a) st -- | Creates a Streamline interrutible context inStreamlineCtx :: UniformIO io => io -> a -> RSt Streamline a inStreamlineCtx io a = (a, def{str = SomeIO io}) -- | Closes the target of a streamline state, releasing any resource. closeTarget :: MonadIO m => Streamline m () closeTarget = Streamline $ \st -> do liftIO . S.uClose . str $ st return ((), st) -- | Removes a Streamline interruptible context peelStreamlineCtx :: RSt Streamline a -> (a, SomeIO) peelStreamlineCtx (a, dt) = (a, str dt) instance MonadTransControl Streamline where type StT Streamline a = (a, StreamlineState) liftWith f = Streamline $ \s -> liftM (\x -> (x, s)) (f $ \t -> withTarget' t s) restoreT = Streamline . const instance MonadBase b m => MonadBase b (Streamline m) where liftBase = liftBaseDefault instance MonadBaseControl b m => MonadBaseControl b (Streamline m) where type StM (Streamline m) a = ComposeSt Streamline m a liftBaseWith = defaultLiftBaseWith restoreM = defaultRestoreM