Streamline.hs 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260
  1. {-# LANGUAGE OverloadedStrings #-}
  2. -- |
  3. -- Streamline exports a monad that, given a uniform IO target, emulates
  4. -- character stream IO using high performance block IO.
  5. module System.IO.Uniform.Streamline (Streamline, withClient, withServer, withTarget, send, receiveLine, lazyRecieveLine, lazyReceiveN, startTls, runAttoparsec, runAttoparsecAndReturn, isSecure, setTimeout, setEcho) where
  6. import qualified System.IO.Uniform as S
  7. import qualified System.IO.Uniform.Network as N
  8. import System.IO.Uniform (UniformIO, SomeIO(..), TlsSettings)
  9. import Control.Monad.Trans.Class
  10. import Control.Applicative
  11. import Control.Monad (ap)
  12. import Control.Monad.IO.Class
  13. import System.IO.Error
  14. import Data.ByteString (ByteString)
  15. import qualified Data.ByteString as BS
  16. import Data.Word8 (Word8)
  17. import Data.IP (IP)
  18. import qualified Data.Attoparsec.ByteString as A
  19. data Data = Data {str :: SomeIO, timeout :: Int, buff :: ByteString, isEOF :: Bool, echo :: Bool}
  20. -- | Monad that emulates character stream IO over block IO.
  21. newtype Streamline m a = Streamline {withTarget' :: Data -> m (a, Data)}
  22. blockSize :: Int
  23. blockSize = 4096
  24. defaultTimeout :: Int
  25. defaultTimeout = 1000000 * 600
  26. readF :: MonadIO m => Data -> m ByteString
  27. readF cl = if echo cl
  28. then do
  29. l <- liftIO $ S.uRead (str cl) blockSize
  30. liftIO $ BS.putStr "<"
  31. liftIO $ BS.putStr l
  32. return l
  33. else liftIO $ S.uRead (str cl) blockSize
  34. writeF :: MonadIO m => Data -> ByteString -> m ()
  35. writeF cl l = if echo cl
  36. then do
  37. liftIO $ BS.putStr ">"
  38. liftIO $ BS.putStr l
  39. liftIO $ S.uPut (str cl) l
  40. else liftIO $ S.uPut (str cl) l
  41. -- | withServer f serverIP port
  42. --
  43. -- Connects to the given server port, runs f, and closes the connection.
  44. withServer :: MonadIO m => Streamline m a -> IP -> Int -> m a
  45. withServer f host port = do
  46. ds <- liftIO $ N.connectTo host port
  47. (ret, _) <- withTarget' f $ Data (SomeIO ds) defaultTimeout "" False False
  48. liftIO $ S.uClose ds
  49. return ret
  50. -- | withClient f boundPort
  51. --
  52. -- Accepts a connection at the bound port, runs f and closes the connection.
  53. withClient :: MonadIO m => (IP -> Int -> Streamline m a) -> N.BoundedPort -> m a
  54. withClient f port = do
  55. ds <- liftIO $ N.accept port
  56. (peerIp, peerPort) <- liftIO $ N.getPeer ds
  57. (ret, _) <- withTarget' (f peerIp peerPort) $ Data (SomeIO ds) defaultTimeout "" False False
  58. liftIO $ S.uClose ds
  59. return ret
  60. -- | withTarget f someIO
  61. --
  62. -- Runs f wrapped on a Streamline monad that does IO on nomeIO.
  63. withTarget :: (MonadIO m, UniformIO a) => Streamline m b -> a -> m b
  64. withTarget f s = do
  65. (ret, _) <- withTarget' f $ Data (SomeIO s) defaultTimeout "" False False
  66. return ret
  67. instance Monad m => Monad (Streamline m) where
  68. --return :: (Monad m) => a -> Streamline m a
  69. return x = Streamline $ \cl -> return (x, cl)
  70. --(>>=) :: Monad m => Streamline m a -> (a -> Streamline m b) -> Streamline m b
  71. a >>= b = Streamline $ \cl -> do
  72. (x, cl') <- withTarget' a cl
  73. withTarget' (b x) cl'
  74. instance Monad m => Functor (Streamline m) where
  75. --fmap :: (a -> b) -> Streamline m a -> Streamline m b
  76. fmap f m = Streamline $ \cl -> do
  77. (x, cl') <- withTarget' m cl
  78. return (f x, cl')
  79. instance (Functor m, Monad m) => Applicative (Streamline m) where
  80. pure = return
  81. (<*>) = ap
  82. instance MonadTrans Streamline where
  83. --lift :: Monad m => m a -> Streamline m a
  84. lift x = Streamline $ \cl -> do
  85. a <- x
  86. return (a, cl)
  87. instance MonadIO m => MonadIO (Streamline m) where
  88. liftIO = lift . liftIO
  89. -- | Sends data over the streamlines an IO target.
  90. send :: MonadIO m => ByteString -> Streamline m ()
  91. send r = Streamline $ \cl -> do
  92. writeF cl r
  93. return ((), cl)
  94. -- | Receives a line from the streamlined IO target.
  95. receiveLine :: MonadIO m => Streamline m ByteString
  96. receiveLine = do
  97. l <- runAttoparsec parseLine
  98. case l of
  99. Left _ -> return ""
  100. Right l' -> return l'
  101. -- | Receives a line from the streamlined IO target,
  102. -- but breaks the line on reasonably sized chuncks, and
  103. -- reads them lazyly, so that IO can be done in constant
  104. -- memory space.
  105. lazyRecieveLine :: MonadIO m => Streamline m [ByteString]
  106. lazyRecieveLine = Streamline $ \cl -> lazyReceiveLine' cl
  107. where
  108. lazyReceiveLine' :: MonadIO m => Data -> m ([ByteString], Data)
  109. lazyReceiveLine' cl' =
  110. if isEOF cl'
  111. then eofError "System.IO.Uniform.Streamline.lazyReceiveLine"
  112. else
  113. if BS.null $ buff cl'
  114. then do
  115. dt <- readF cl'
  116. lazyReceiveLine' cl'{buff=dt}{isEOF=BS.null dt}
  117. else do
  118. let l = A.parseOnly lineWithEol $ buff cl'
  119. case l of
  120. Left _ -> do
  121. l' <- readF cl'
  122. (ret, cl'') <- lazyReceiveLine' cl'{buff=l'}{isEOF=BS.null l'}
  123. return ((buff cl') : ret, cl'')
  124. Right (ret, dt) -> return ([ret], cl'{buff=dt})
  125. -- | lazyReceiveN n
  126. -- Receives n bytes of data from the streamlined IO target,
  127. -- but breaks the data on reasonably sized chuncks, and reads
  128. -- them lazyly, so that IO can be done in constant memory space.
  129. lazyReceiveN :: (Functor m, MonadIO m) => Int -> Streamline m [ByteString]
  130. lazyReceiveN n' = Streamline $ \cl' -> lazyReceiveN' cl' n'
  131. where
  132. lazyReceiveN' :: (Functor m, MonadIO m) => Data -> Int -> m ([ByteString], Data)
  133. lazyReceiveN' cl n =
  134. if isEOF cl
  135. then eofError "System.IO.Uniform.Streamline.lazyReceiveN"
  136. else
  137. if BS.null (buff cl)
  138. then do
  139. b <- readF cl
  140. let eof = BS.null b
  141. let cl' = cl{buff=b}{isEOF=eof}
  142. lazyReceiveN' cl' n
  143. else
  144. if n <= BS.length (buff cl)
  145. then let
  146. ret = [BS.take n (buff cl)]
  147. buff' = BS.drop n (buff cl)
  148. in return (ret, cl{buff=buff'})
  149. else let
  150. cl' = cl{buff=""}
  151. b = buff cl
  152. in fmap (appFst b) $ lazyReceiveN' cl' (n - BS.length b)
  153. appFst :: a -> ([a], b) -> ([a], b)
  154. appFst a (l, b) = (a:l, b)
  155. -- | Wraps the streamlined IO target on TLS, streamlining
  156. -- the new wrapper afterwads.
  157. startTls :: MonadIO m => TlsSettings -> Streamline m ()
  158. startTls st = Streamline $ \cl -> do
  159. ds' <- liftIO $ S.startTls st $ str cl
  160. return ((), cl{str=SomeIO ds'}{buff=""})
  161. -- | Runs an Attoparsec parser over the data read from the
  162. -- streamlined IO target. Returns both the parser
  163. -- result and the string consumed by it.
  164. runAttoparsecAndReturn :: MonadIO m => A.Parser a -> Streamline m (ByteString, Either String a)
  165. runAttoparsecAndReturn p = Streamline $ \cl ->
  166. if isEOF cl
  167. then eofError "System.IO.Uniform.Streamline.runAttoparsecAndReturn"
  168. else do
  169. let c = A.parse p $ buff cl
  170. (cl', i, a) <- liftIO $ continueResult cl c
  171. return ((i, a), cl')
  172. where
  173. continueResult :: Data -> A.Result a -> IO (Data, ByteString, (Either String a))
  174. -- tx eof ds
  175. continueResult cl c = case c of
  176. A.Fail i _ msg -> return (cl{buff=i}, BS.take (BS.length (buff cl) - BS.length i) (buff cl), Left msg)
  177. A.Done i r -> return (cl{buff=i}, BS.take (BS.length (buff cl) - BS.length i) (buff cl), Right r)
  178. A.Partial c' -> do
  179. d <- readF cl
  180. let cl' = cl{buff=BS.append (buff cl) d}{isEOF=BS.null d}
  181. continueResult cl' (c' d)
  182. -- | Runs an Attoparsec parser over the data read from the
  183. -- streamlined IO target. Returning the parser result.
  184. runAttoparsec :: MonadIO m => A.Parser a -> Streamline m (Either String a)
  185. runAttoparsec p = Streamline $ \cl ->
  186. if isEOF cl
  187. then eofError "System.IO.Uniform.Streamline.runAttoparsec"
  188. else do
  189. let c = A.parse p $ buff cl
  190. (cl', a) <- liftIO $ continueResult cl c
  191. return (a, cl')
  192. where
  193. continueResult :: Data -> A.Result a -> IO (Data, (Either String a))
  194. continueResult cl c = case c of
  195. A.Fail i _ msg -> return (cl{buff=i}, Left msg)
  196. A.Done i r -> return (cl{buff=i}, Right r)
  197. A.Partial c' -> do
  198. d <- readF cl
  199. let eof' = BS.null d
  200. continueResult cl{buff=d}{isEOF=eof'} (c' d)
  201. -- | Indicates whether transport layer security is being used.
  202. isSecure :: Monad m => Streamline m Bool
  203. isSecure = Streamline $ \cl -> return (S.isSecure $ str cl, cl)
  204. -- | Sets the timeout for the streamlined IO target.
  205. setTimeout :: Monad m => Int -> Streamline m ()
  206. setTimeout t = Streamline $ \cl -> return ((), cl{timeout=t})
  207. -- | Sets echo of the streamlines IO target.
  208. -- If echo is set, all the data read an written to the target
  209. -- will be echoed in stdout, with ">" and "<" markers indicating
  210. -- what is read and written.
  211. setEcho :: Monad m => Bool -> Streamline m ()
  212. setEcho e = Streamline $ \cl -> return ((), cl{echo=e})
  213. parseLine :: A.Parser ByteString
  214. parseLine = do
  215. l <- A.takeTill isEol
  216. (A.word8 13 >> A.word8 10) <|> A.word8 10
  217. return l
  218. lineWithEol :: A.Parser (ByteString, ByteString)
  219. lineWithEol = do
  220. l <- A.scan False lineScanner
  221. r <- A.takeByteString
  222. return (l, r)
  223. lineScanner :: Bool -> Word8 -> Maybe Bool
  224. lineScanner False c = Just $ isEol c
  225. lineScanner True c = if isEol c then Just True else Nothing
  226. isEol :: Word8 -> Bool
  227. isEol c = c == 13 || c == 10
  228. eofError :: MonadIO m => String -> m a
  229. eofError msg = liftIO . ioError $ mkIOError eofErrorType msg Nothing Nothing