Streamline.hs 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  {-# LANGUAGE OverloadedStrings #-}
  -- |
  -- Streamline exports a monad that, given a uniform IO target, emulates
  -- character stream IO on top of high performance block IO.
  module System.IO.Uniform.Streamline
    ( Streamline
    , withClient
    , withServer
    , withTarget
    , send
    , send'
    , receiveLine
    , lazyRecieveLine
    , lazyReceiveN
    , startTls
    , runAttoparsec
    , runAttoparsecAndReturn
    , isSecure
    , setTimeout
    , setEcho
    ) where
  6. import qualified System.IO.Uniform as S
  7. import qualified System.IO.Uniform.Network as N
  8. import System.IO.Uniform (UniformIO, SomeIO(..), TlsSettings)
  9. import Control.Monad.Trans.Class
  10. import Control.Applicative
  11. import Control.Monad (ap)
  12. import Control.Monad.IO.Class
  13. import System.IO.Error
  14. import Data.ByteString (ByteString)
  15. import qualified Data.ByteString as BS
  16. import qualified Data.ByteString.Lazy as LBS
  17. import Data.Word8 (Word8)
  18. import Data.IP (IP)
  19. import qualified Data.Attoparsec.ByteString as A
  -- | Internal state threaded through every 'Streamline' action.
  data Data = Data
    { str     :: SomeIO      -- ^ The IO target that reads and writes go to.
    , timeout :: Int         -- ^ Stored timeout; no code visible in this module reads it.
    , buff    :: ByteString  -- ^ Bytes already read from the target but not yet consumed.
    , isEOF   :: Bool        -- ^ Set once a read has returned an empty chunk.
    , echo    :: Bool        -- ^ When set, traffic is copied to stdout.
    }
  -- | Monad that emulates character stream IO over block IO.
  --
  -- An action is a function from the current connection state to a result
  -- paired with the updated state, run in the underlying monad @m@.
  newtype Streamline m a = Streamline
    { withTarget' :: Data -> m (a, Data)
    }
  -- | Number of bytes requested from the target on every block read.
  blockSize :: Int
  blockSize = 4 * 1024
  -- | Timeout installed in every freshly created connection state.
  --
  -- NOTE(review): presumably microseconds (i.e. ten minutes) - confirm against
  -- whatever consumes the value; nothing visible in this module does.
  defaultTimeout :: Int
  defaultTimeout = 600 * 1000000
  -- | Reads one block (at most 'blockSize' bytes) from the target.
  --
  -- When echo is enabled the block is also written to stdout, prefixed by "<".
  readF :: MonadIO m => Data -> m ByteString
  readF cl = liftIO $ do
    chunk <- S.uRead (str cl) blockSize
    if echo cl
      then BS.putStr "<" >> BS.putStr chunk
      else return ()
    return chunk
  -- | Writes a string to the target.
  --
  -- When echo is enabled the string is first written to stdout, prefixed by
  -- ">", and only then sent to the target.
  writeF :: MonadIO m => Data -> ByteString -> m ()
  writeF cl l = liftIO $ do
    if echo cl
      then BS.putStr ">" >> BS.putStr l
      else return ()
    S.uPut (str cl) l
  -- | withServer f serverIP port
  --
  -- Connects to the given server port, runs f over the new connection, and
  -- closes the connection once f has returned.
  --
  -- NOTE: the close happens only on normal return; if f throws, the
  -- connection is not closed here.
  withServer :: MonadIO m => Streamline m a -> IP -> Int -> m a
  withServer f host port = do
    conn <- liftIO $ N.connectTo host port
    let initial = Data
          { str = SomeIO conn
          , timeout = defaultTimeout
          , buff = ""
          , isEOF = False
          , echo = False
          }
    (ret, _) <- withTarget' f initial
    liftIO $ S.uClose conn
    return ret
  -- | withClient f boundPort
  --
  -- Accepts a connection at the bound port, runs f (which is handed the
  -- peer's address and port) and closes the connection once f has returned.
  --
  -- NOTE: the close happens only on normal return; if f throws, the
  -- connection is not closed here.
  withClient :: MonadIO m => (IP -> Int -> Streamline m a) -> N.BoundedPort -> m a
  withClient f port = do
    conn <- liftIO $ N.accept port
    (peerIp, peerPort) <- liftIO $ N.getPeer conn
    let initial = Data
          { str = SomeIO conn
          , timeout = defaultTimeout
          , buff = ""
          , isEOF = False
          , echo = False
          }
    (ret, _) <- withTarget' (f peerIp peerPort) initial
    liftIO $ S.uClose conn
    return ret
  -- | withTarget f someIO
  --
  -- Runs f wrapped on a Streamline monad that does IO on someIO.
  -- The target is not closed afterwards.
  withTarget :: (MonadIO m, UniformIO a) => Streamline m b -> a -> m b
  withTarget f s =
    withTarget' f initial >>= \(ret, _) -> return ret
    where
      initial = Data
        { str = SomeIO s
        , timeout = defaultTimeout
        , buff = ""
        , isEOF = False
        , echo = False
        }
  -- | Sequencing threads the connection state from one action into the next.
  instance Monad m => Monad (Streamline m) where
    -- return :: Monad m => a -> Streamline m a
    return x = Streamline $ \st -> return (x, st)
    -- (>>=) :: Monad m => Streamline m a -> (a -> Streamline m b) -> Streamline m b
    step >>= next = Streamline $ \st ->
      withTarget' step st >>= \(x, st') -> withTarget' (next x) st'
  -- | Mapping transforms the result and leaves the connection state as the
  -- wrapped action produced it.
  instance Monad m => Functor (Streamline m) where
    -- fmap :: (a -> b) -> Streamline m a -> Streamline m b
    fmap f action = Streamline $ \st ->
      withTarget' action st >>= \(x, st') -> return (f x, st')
  -- | Applicative interface, defined in terms of the 'Monad' instance.
  instance (Functor m, Monad m) => Applicative (Streamline m) where
    pure x = return x
    mf <*> mx = mf `ap` mx
  -- | Lifting runs the underlying action and leaves the connection state
  -- untouched.
  instance MonadTrans Streamline where
    -- lift :: Monad m => m a -> Streamline m a
    lift action = Streamline $ \st ->
      action >>= \a -> return (a, st)
  -- | IO actions are lifted into the underlying monad first, then into
  -- 'Streamline'.
  instance MonadIO m => MonadIO (Streamline m) where
    liftIO action = lift (liftIO action)
  -- | Sends data over the streamlined IO target.
  send :: MonadIO m => ByteString -> Streamline m ()
  send r = Streamline $ \st ->
    writeF st r >> return ((), st)
  -- | Sends data from a lazy byte string, one strict chunk at a time.
  send' :: MonadIO m => LBS.ByteString -> Streamline m ()
  send' r = Streamline $ \cl -> do
    -- mapM_ rather than mapM: only the writes matter, so no list of unit
    -- results is built up (and then thrown away) for large payloads.
    mapM_ (writeF cl) (LBS.toChunks r)
    return ((), cl)
  -- | Receives a line from the streamlined IO target.
  --
  -- The line terminator is not part of the result. If the line parser
  -- fails, the empty string is returned instead of an error.
  receiveLine :: MonadIO m => Streamline m ByteString
  receiveLine = fmap (either (const "") id) (runAttoparsec parseLine)
  -- | Receives a line from the streamlined IO target, but breaks the line
  -- into reasonably sized chunks instead of building one contiguous string.
  --
  -- The chunks, concatenated, are the line including its terminator (the run
  -- of CR/LF bytes that ends it). Throws an EOF 'IOError' if the target is
  -- exhausted before a terminator is seen.
  --
  -- Known limitation: a terminator split across two reads (CR at the very end
  -- of one block, LF at the start of the next) is seen as two terminators, so
  -- the following call returns the lone LF as a line of its own.
  lazyRecieveLine :: MonadIO m => Streamline m [ByteString]
  lazyRecieveLine = Streamline lazyReceiveLine'
    where
      lazyReceiveLine' :: MonadIO m => Data -> m ([ByteString], Data)
      lazyReceiveLine' cl'
        | isEOF cl' = eofError "System.IO.Uniform.Streamline.lazyReceiveLine"
        | BS.null (buff cl') = do
            -- Nothing buffered: fetch a block and try again.
            dt <- readF cl'
            lazyReceiveLine' cl'{buff = dt, isEOF = BS.null dt}
        | otherwise =
            case A.parseOnly lineWithEol (buff cl') of
              -- The scanner never fails and simply stops at the end of the
              -- buffer, so a successful parse alone does not prove the line
              -- is complete: it must actually contain an end-of-line byte.
              Right (ret, dt)
                | BS.any isEol ret -> return ([ret], cl'{buff = dt})
              -- No terminator buffered yet: hand out what we have as one
              -- chunk and keep reading for the rest of the line.
              _ -> do
                l' <- readF cl'
                (ret, cl'') <- lazyReceiveLine' cl'{buff = l', isEOF = BS.null l'}
                return (buff cl' : ret, cl'')
  -- | lazyReceiveN n
  --
  -- Receives n bytes of data from the streamlined IO target, but breaks the
  -- data into reasonably sized chunks instead of building one contiguous
  -- string. The chunks, concatenated, are exactly the n bytes requested.
  --
  -- A request for zero (or fewer) bytes yields no chunks and performs no IO.
  -- Throws an EOF 'IOError' if the target is exhausted before n bytes arrived.
  lazyReceiveN :: (Functor m, MonadIO m) => Int -> Streamline m [ByteString]
  lazyReceiveN n' = Streamline $ \cl' -> lazyReceiveN' cl' n'
    where
      lazyReceiveN' :: (Functor m, MonadIO m) => Data -> Int -> m ([ByteString], Data)
      lazyReceiveN' cl n
        -- Nothing to deliver: answer right away, so an empty request neither
        -- blocks on a read nor fails at end of input.
        | n <= 0 = return ([], cl)
        | isEOF cl = eofError "System.IO.Uniform.Streamline.lazyReceiveN"
        | BS.null (buff cl) = do
            b <- readF cl
            lazyReceiveN' cl{buff = b, isEOF = BS.null b} n
        -- The buffer covers the rest of the request: split it and stop.
        | n <= BS.length (buff cl) =
            return ([BS.take n (buff cl)], cl{buff = BS.drop n (buff cl)})
        -- The buffer is only part of the request: emit it whole and go on
        -- reading for the remainder.
        | otherwise =
            let b = buff cl
            in fmap (appFst b) $ lazyReceiveN' cl{buff = ""} (n - BS.length b)
      -- Prepends an element to the list in the first component of a pair.
      appFst :: a -> ([a], b) -> ([a], b)
      appFst a (l, b) = (a:l, b)
  -- | Wraps the streamlined IO target on TLS, streamlining
  -- the new wrapper afterwards.
  --
  -- Any bytes still sitting in the read buffer are discarded.
  startTls :: MonadIO m => TlsSettings -> Streamline m ()
  startTls st = Streamline $ \cl ->
    liftIO (S.startTls st (str cl)) >>= \secured ->
      return ((), cl{str = SomeIO secured, buff = ""})
  -- | Runs an Attoparsec parser over the data read from the
  -- streamlined IO target. Returns both the parser
  -- result and the string consumed by it.
  --
  -- Throws an EOF 'IOError' when the target had already reached end of input
  -- before the call.
  runAttoparsecAndReturn :: MonadIO m => A.Parser a -> Streamline m (ByteString, Either String a)
  runAttoparsecAndReturn p = Streamline run
    where
      run cl
        | isEOF cl = eofError "System.IO.Uniform.Streamline.runAttoparsecAndReturn"
        | otherwise = do
            (cl', consumed, outcome) <- liftIO $ feed cl (A.parse p (buff cl))
            return ((consumed, outcome), cl')
      -- Drives the parser to completion, reading a new block whenever it
      -- asks for more input. The buffer accumulates everything fed so far,
      -- which is what lets the consumed prefix be recovered at the end.
      feed :: Data -> A.Result a -> IO (Data, ByteString, Either String a)
      feed st (A.Fail rest _ msg) = return (st{buff = rest}, eaten st rest, Left msg)
      feed st (A.Done rest r) = return (st{buff = rest}, eaten st rest, Right r)
      feed st (A.Partial k) = do
        chunk <- readF st
        feed st{buff = BS.append (buff st) chunk, isEOF = BS.null chunk} (k chunk)
      -- Everything in the buffer except the unconsumed remainder.
      eaten :: Data -> ByteString -> ByteString
      eaten st rest = BS.take (BS.length (buff st) - BS.length rest) (buff st)
  -- | Runs an Attoparsec parser over the data read from the
  -- streamlined IO target. Returns the parser result.
  --
  -- Throws an EOF 'IOError' when the target had already reached end of input
  -- before the call.
  runAttoparsec :: MonadIO m => A.Parser a -> Streamline m (Either String a)
  runAttoparsec p = Streamline run
    where
      run cl
        | isEOF cl = eofError "System.IO.Uniform.Streamline.runAttoparsec"
        | otherwise = do
            (cl', outcome) <- liftIO $ feed cl (A.parse p (buff cl))
            return (outcome, cl')
      -- Drives the parser to completion, reading a new block whenever it
      -- asks for more input; whatever it leaves unconsumed becomes the
      -- new buffer.
      feed :: Data -> A.Result a -> IO (Data, Either String a)
      feed st (A.Fail rest _ msg) = return (st{buff = rest}, Left msg)
      feed st (A.Done rest r) = return (st{buff = rest}, Right r)
      feed st (A.Partial k) = do
        chunk <- readF st
        feed st{buff = chunk, isEOF = BS.null chunk} (k chunk)
  -- | Indicates whether transport layer security is being used,
  -- as reported by the current IO target.
  isSecure :: Monad m => Streamline m Bool
  isSecure = Streamline query
    where
      query st = return (S.isSecure (str st), st)
  -- | Sets the timeout for the streamlined IO target.
  --
  -- The value is stored in the connection state.
  setTimeout :: Monad m => Int -> Streamline m ()
  setTimeout t = Streamline store
    where
      store st = return ((), st{timeout = t})
  -- | Sets echo of the streamlined IO target.
  -- If echo is set, all the data read from and written to the target
  -- is echoed to stdout, with "<" marking what is read and ">" marking
  -- what is written.
  setEcho :: Monad m => Bool -> Streamline m ()
  setEcho e = Streamline store
    where
      store st = return ((), st{echo = e})
  -- | Parses one line: every byte up to the first CR or LF, followed by a
  -- terminator that must be either CR LF or a bare LF. The terminator is
  -- consumed but not returned; a CR that is not followed by LF makes the
  -- parser fail.
  parseLine :: A.Parser ByteString
  parseLine = A.takeTill isEol <* terminator
    where
      terminator = (A.word8 13 *> A.word8 10) <|> A.word8 10
  -- | Splits the input into the part accepted by 'lineScanner' (a line
  -- together with the end-of-line bytes that follow it) and everything
  -- after that.
  lineWithEol :: A.Parser (ByteString, ByteString)
  lineWithEol = (,) <$> A.scan False lineScanner <*> A.takeByteString
  -- | Scanner step for 'lineWithEol'. The state records whether an
  -- end-of-line byte has been seen: bytes are accepted until the first
  -- non-EOL byte that follows an EOL byte, where scanning stops.
  lineScanner :: Bool -> Word8 -> Maybe Bool
  lineScanner seenEol c
    | isEol c = Just True
    | seenEol = Nothing
    | otherwise = Just False
  -- | True for the two end-of-line bytes: LF (10) and CR (13).
  isEol :: Word8 -> Bool
  isEol = (`elem` [10, 13])
  -- | Throws an end-of-file 'IOError' whose location is the given message.
  eofError :: MonadIO m => String -> m a
  eofError msg = liftIO (ioError failure)
    where
      failure = mkIOError eofErrorType msg Nothing Nothing