Streamline.hs 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. {-# LANGUAGE OverloadedStrings #-}
  -- |
  -- Streamline exports a monad that, given a uniform IO target, emulates
  -- character stream IO using high performance block IO.
  5. module System.IO.Uniform.Streamline (Streamline, withClient, withServer, withTarget, send, receiveLine, lazyRecieveLine, lazyReceiveN, startTls, runAttoparsec, runAttoparsecAndReturn, isSecure, setTimeout, setEcho) where
  6. import qualified System.IO.Uniform as S
  7. import System.IO.Uniform (UniformIO, SomeIO(..), TlsSettings)
  8. import Control.Monad.Trans.Class
  9. import Control.Applicative
  10. import Control.Monad (ap)
  11. import Control.Monad.IO.Class
  12. import System.IO.Error
  13. import Data.ByteString (ByteString)
  14. import qualified Data.ByteString as BS
  15. import Data.Word8 (Word8)
  16. import Data.IP (IP)
  17. import qualified Data.Attoparsec.ByteString as A
  -- Internal state threaded through every 'Streamline' action.
  data Data = Data
    { str :: SomeIO
      -- The wrapped IO target all reads and writes go to.
    , timeout :: Int
      -- Value recorded by 'setTimeout'. Nothing in the visible part of
      -- this module reads it back.
    , buff :: ByteString
      -- Bytes already read from the target but not yet consumed.
    , isEOF :: Bool
      -- Set when a read from the target returned an empty block.
    , echo :: Bool
      -- When set, 'readF' and 'writeF' copy the traffic to stdout.
    }
  -- | Monad transformer that emulates character stream IO over block IO.
  --
  -- An action is a function from the current internal state to a
  -- computation in the underlying monad that yields a result together
  -- with the updated state.
  newtype Streamline m a = Streamline
    { withTarget' :: Data -> m (a, Data)
    }
  -- Number of bytes requested from the target on every read ('readF').
  blockSize :: Int
  blockSize = 4 * 1024
  -- Timeout value every new session starts with: 600 * 10^6.
  -- NOTE(review): presumably microseconds (ten minutes) - confirm against
  -- whatever consumes the timeout.
  defaultTimeout :: Int
  defaultTimeout = 600 * 1000000
  -- Reads one block from the target, asking for 'blockSize' bytes.
  -- When echo is enabled the block is also written to stdout, preceded
  -- by a "<" marker.
  readF :: MonadIO m => Data -> m ByteString
  readF cl = liftIO $ do
    chunk <- S.uRead (str cl) blockSize
    if echo cl
      then BS.putStr "<" >> BS.putStr chunk
      else return ()
    return chunk
  -- Writes the given bytes to the target.
  -- When echo is enabled the bytes are first written to stdout, preceded
  -- by a ">" marker.
  writeF :: MonadIO m => Data -> ByteString -> m ()
  writeF cl payload = liftIO $ do
    if echo cl
      then BS.putStr ">" >> BS.putStr payload
      else return ()
    S.uPut (str cl) payload
  -- | withServer f serverIP port
  --
  -- Connects to the given server port, runs f, and closes the connection.
  --
  -- The session starts with an empty buffer, the default timeout and echo
  -- disabled. The connection is closed only after f returns; if f throws,
  -- the close is never reached.
  withServer :: MonadIO m => Streamline m a -> IP -> Int -> m a
  withServer f host port = do
    conn <- liftIO (S.connectTo host port)
    (answer, _) <- withTarget' f (freshState conn)
    liftIO (S.uClose conn)
    return answer
    where
      freshState c = Data (SomeIO c) defaultTimeout "" False False
  -- | withClient f boundPort
  --
  -- Accepts a connection at the bound port, runs f and closes the connection.
  --
  -- f receives the peer's address and port. The session starts with an
  -- empty buffer, the default timeout and echo disabled. The connection is
  -- closed only after f returns; if f throws, the close is never reached.
  withClient :: MonadIO m => (IP -> Int -> Streamline m a) -> S.BoundedPort -> m a
  withClient f port = do
    conn <- liftIO (S.accept port)
    (peerIp, peerPort) <- liftIO (S.getPeer conn)
    (answer, _) <- withTarget' (f peerIp peerPort) (freshState conn)
    liftIO (S.uClose conn)
    return answer
    where
      freshState c = Data (SomeIO c) defaultTimeout "" False False
  -- | withTarget f someIO
  --
  -- Runs f wrapped on a Streamline monad that does IO on someIO.
  --
  -- The session starts with an empty buffer, the default timeout and echo
  -- disabled. The target is not closed by this function.
  withTarget :: (MonadIO m, UniformIO a) => Streamline m b -> a -> m b
  withTarget f s = withTarget' f freshState >>= \(answer, _) -> return answer
    where
      freshState = Data (SomeIO s) defaultTimeout "" False False
  instance Monad m => Monad (Streamline m) where
    -- Yields the value and hands the state on unchanged.
    return x = Streamline $ \st -> return (x, st)
    -- Runs the first action, then passes its result and the state it
    -- produced on to the continuation.
    m >>= k = Streamline $ \st ->
      withTarget' m st >>= \(x, st') -> withTarget' (k x) st'
  instance Monad m => Functor (Streamline m) where
    -- Applies the function to the action's result; the state produced by
    -- the action is passed through as is.
    fmap f m = Streamline $ \st ->
      withTarget' m st >>= \(x, st') -> return (f x, st')
  instance (Functor m, Monad m) => Applicative (Streamline m) where
    -- Same as 'return': yields the value without touching the state.
    pure x = Streamline $ \st -> return (x, st)
    -- Sequencing is taken from the Monad instance.
    (<*>) = ap
  instance MonadTrans Streamline where
    -- Runs an action of the underlying monad, leaving the state unchanged.
    lift act = Streamline $ \st -> act >>= \a -> return (a, st)
  instance MonadIO m => MonadIO (Streamline m) where
    -- Lifts the IO action into the underlying monad first, then into Streamline.
    liftIO io = lift (liftIO io)
  -- | Sends data to the streamlined IO target.
  --
  -- The data is written right away; the internal state is not changed.
  send :: MonadIO m => ByteString -> Streamline m ()
  send r = Streamline $ \cl -> writeF cl r >> return ((), cl)
  -- | Receives a line from the streamlined IO target.
  --
  -- The line is parsed with 'parseLine'; if that parser fails, an empty
  -- string is returned instead of an error.
  receiveLine :: MonadIO m => Streamline m ByteString
  receiveLine = runAttoparsec parseLine >>= return . either (const "") id
  -- | Receives a line from the streamlined IO target, broken into chunks
  -- so that a long line never has to be held in one piece.
  --
  -- Each chunk is the content of the internal buffer at that point; the
  -- last chunk ends with the line terminator (a run of CR / LF bytes, as
  -- recognised by 'lineWithEol'). Bytes after the terminator stay buffered.
  -- Throws an EOF 'IOError' when the target reaches end of input before a
  -- terminator is seen.
  --
  -- The misspelled name is part of the exported interface and is kept.
  --
  -- NOTE(review): a terminator split across two reads (CR at the very end
  -- of one block, LF at the start of the next) is still seen as two
  -- terminators, because the line is returned as soon as the CR is found.
  lazyRecieveLine :: MonadIO m => Streamline m [ByteString]
  lazyRecieveLine = Streamline collect
    where
      collect :: MonadIO m => Data -> m ([ByteString], Data)
      collect cl
        | isEOF cl = eofError "System.IO.Uniform.Streamline.lazyReceiveLine"
        | BS.null (buff cl) = do
            -- Nothing buffered: fetch a block and try again.
            dt <- readF cl
            collect cl{buff = dt, isEOF = BS.null dt}
        | otherwise =
            case A.parseOnly lineWithEol (buff cl) of
              -- 'lineWithEol' never fails: without a terminator in the
              -- buffer it hands back the whole buffer as the "line". So the
              -- result only counts as a complete line when it really ends
              -- with an end-of-line byte.
              Right (ln, rest)
                | endsWithEol ln -> return ([ln], cl{buff = rest})
              -- No terminator yet: emit the buffer as one chunk and go on
              -- reading until the end of the line shows up.
              _ -> do
                next <- readF cl
                (more, cl') <- collect cl{buff = next, isEOF = BS.null next}
                return (buff cl : more, cl')
      -- True when the last byte of a non-empty string is CR or LF.
      endsWithEol :: ByteString -> Bool
      endsWithEol bs = not (BS.null bs) && isEol (BS.last bs)
  -- | lazyReceiveN n
  --
  -- Receives n bytes of data from the streamlined IO target, broken into
  -- chunks so that the data never has to be held in one piece.
  --
  -- Buffered bytes are used first; further blocks are read from the target
  -- only while bytes are still missing. Bytes beyond the n-th stay
  -- buffered. For n <= 0 the result is an empty list and the target is not
  -- touched. Throws an EOF 'IOError' when the target reaches end of input
  -- before n bytes were delivered.
  lazyReceiveN :: (Functor m, MonadIO m) => Int -> Streamline m [ByteString]
  lazyReceiveN n' = Streamline $ \cl' -> go cl' n'
    where
      go :: (Functor m, MonadIO m) => Data -> Int -> m ([ByteString], Data)
      go cl n
        -- Nothing requested: answer at once instead of blocking on a read
        -- (or failing on EOF) for data nobody asked for.
        | n <= 0 = return ([], cl)
        | isEOF cl = eofError "System.IO.Uniform.Streamline.lazyReceiveN"
        | BS.null available = do
            b <- readF cl
            go cl{buff = b, isEOF = BS.null b} n
        -- The buffer covers the rest of the request: split it.
        | n <= BS.length available =
            return ([BS.take n available], cl{buff = BS.drop n available})
        -- The buffer is only part of the request: emit all of it as one
        -- chunk and continue with an empty buffer.
        | otherwise =
            fmap (appFst available) $ go cl{buff = ""} (n - BS.length available)
        where
          available = buff cl
      -- Prepends an element to the list held in the first component.
      appFst :: a -> ([a], b) -> ([a], b)
      appFst a (l, b) = (a : l, b)
  -- | Wraps the streamlined IO target on TLS, streamlining
  -- the new wrapper afterwards.
  --
  -- Whatever was still buffered from before the switch is dropped.
  startTls :: MonadIO m => TlsSettings -> Streamline m ()
  startTls st = Streamline $ \cl ->
    liftIO (S.startTls st (str cl)) >>= \secured ->
      return ((), cl{str = SomeIO secured, buff = ""})
  -- | Runs an Attoparsec parser over the data read from the
  -- streamlined IO target. Returns both the parser
  -- result and the string consumed by it.
  --
  -- Input the parser left over stays buffered. Throws an EOF 'IOError'
  -- when end of input was already detected before the call.
  runAttoparsecAndReturn :: MonadIO m => A.Parser a -> Streamline m (ByteString, Either String a)
  runAttoparsecAndReturn p = Streamline run
    where
      run cl
        | isEOF cl = eofError "System.IO.Uniform.Streamline.runAttoparsecAndReturn"
        | otherwise = do
            (cl', i, a) <- liftIO $ continueResult cl (A.parse p (buff cl))
            return ((i, a), cl')
      -- Feeds the parser with further blocks until it stops. While it
      -- runs, the buffer accumulates everything given to the parser, so
      -- the consumed part can be cut out once the leftover is known.
      continueResult :: Data -> A.Result a -> IO (Data, ByteString, Either String a)
      continueResult cl (A.Fail rest _ msg) = return (finish cl rest (Left msg))
      continueResult cl (A.Done rest r) = return (finish cl rest (Right r))
      continueResult cl (A.Partial k) = do
        d <- readF cl
        continueResult cl{buff = BS.append (buff cl) d, isEOF = BS.null d} (k d)
      -- Keeps the leftover as the new buffer and reports the bytes in
      -- front of it as consumed.
      finish :: Data -> ByteString -> Either String a -> (Data, ByteString, Either String a)
      finish cl rest outcome =
        (cl{buff = rest}, BS.take (BS.length seen - BS.length rest) seen, outcome)
        where
          seen = buff cl
  -- | Runs an Attoparsec parser over the data read from the
  -- streamlined IO target, returning the parser result.
  --
  -- Input the parser left over stays buffered. Throws an EOF 'IOError'
  -- when end of input was already detected before the call.
  runAttoparsec :: MonadIO m => A.Parser a -> Streamline m (Either String a)
  runAttoparsec p = Streamline run
    where
      run cl
        | isEOF cl = eofError "System.IO.Uniform.Streamline.runAttoparsec"
        | otherwise = do
            (cl', a) <- liftIO $ continueResult cl (A.parse p (buff cl))
            return (a, cl')
      -- Feeds the parser with further blocks until it stops; the input it
      -- did not use becomes the new buffer.
      continueResult :: Data -> A.Result a -> IO (Data, Either String a)
      continueResult cl (A.Fail rest _ msg) = return (cl{buff = rest}, Left msg)
      continueResult cl (A.Done rest r) = return (cl{buff = rest}, Right r)
      continueResult cl (A.Partial k) = do
        d <- readF cl
        continueResult cl{buff = d, isEOF = BS.null d} (k d)
  -- | Indicates whether transport layer security is being used.
  --
  -- The answer is whatever the current IO target reports.
  isSecure :: Monad m => Streamline m Bool
  isSecure = Streamline query
    where
      query cl = return (S.isSecure (str cl), cl)
  -- | Sets the timeout for the streamlined IO target.
  --
  -- The value is recorded in the internal state; this function does not
  -- itself pass it on to the target.
  setTimeout :: Monad m => Int -> Streamline m ()
  setTimeout t = Streamline update
    where
      update cl = return ((), cl{timeout = t})
  -- | Sets echo of the streamlined IO target.
  -- If echo is set, all the data read and written to the target
  -- will be echoed in stdout, with "<" marking data that was read
  -- and ">" marking data that was written.
  setEcho :: Monad m => Bool -> Streamline m ()
  setEcho e = Streamline update
    where
      update cl = return ((), cl{echo = e})
  -- Parses one line: every byte up to the first CR or LF, followed by a
  -- terminator that must be CR LF or a single LF. The terminator is
  -- consumed but not returned. A CR that is not followed by LF makes the
  -- parser fail.
  parseLine :: A.Parser ByteString
  parseLine = A.takeTill isEol <* terminator
    where
      terminator = (A.word8 13 *> A.word8 10) <|> A.word8 10
  -- Splits the input into a line, with its trailing run of end-of-line
  -- bytes still attached, and everything after it. Where the split falls
  -- is decided by 'lineScanner'.
  lineWithEol :: A.Parser (ByteString, ByteString)
  lineWithEol = (,) <$> A.scan False lineScanner <*> A.takeByteString
  -- Scanner step for 'lineWithEol'. The state tells whether the previous
  -- byte was an end-of-line byte. Scanning goes on through ordinary bytes
  -- and through a run of end-of-line bytes, and stops at the first
  -- ordinary byte that follows such a run.
  lineScanner :: Bool -> Word8 -> Maybe Bool
  lineScanner afterEol c
    | isEol c = Just True
    | afterEol = Nothing
    | otherwise = Just False
  -- Tells whether a byte is an end-of-line byte: CR (13) or LF (10).
  isEol :: Word8 -> Bool
  isEol c = c `elem` [cr, lf]
    where
      cr = 13
      lf = 10
  -- Throws an end-of-file 'IOError' whose location is the given text.
  eofError :: MonadIO m => String -> m a
  eofError msg = liftIO (ioError failure)
    where
      failure = mkIOError eofErrorType msg Nothing Nothing