Streamline.hs 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. {-# LANGUAGE OverloadedStrings #-}
  2. -- |
  3. -- Streamline exports a monad that, given an uniform IO target, emulates
  4. -- character tream IO using high performance block IO.
  5. module System.IO.Uniform.Streamline (Streamline, withClient, withServer, withTarget, send, receiveLine, lazyRecieveLine, lazyReceiveN, startTls, runAttoparsec, runAttoparsecAndReturn, isSecure, setTimeout, setEcho) where
  6. import qualified System.IO.Uniform as S
  7. import System.IO.Uniform (UniformIO, SomeIO(..), TlsSettings)
  8. import Control.Monad.Trans.Class
  9. import Control.Applicative
  10. import Control.Monad (ap)
  11. import Control.Monad.IO.Class
  12. import System.IO.Error
  13. import Data.ByteString (ByteString)
  14. import qualified Data.ByteString as BS
  15. import Data.Char (ord)
  16. import Data.Word8 (Word8)
  17. import Data.IP (IP)
  18. import qualified Data.Attoparsec.ByteString as A
  19. data Data = Data {str :: SomeIO, timeout :: Int, buff :: ByteString, isEOF :: Bool, echo :: Bool}
  20. -- | Monad that emulates character stream IO over block IO.
  21. newtype Streamline m a = Streamline {withTarget' :: Data -> m (a, Data)}
  22. blockSize :: Int
  23. blockSize = 4096
  24. defaultTimeout :: Int
  25. defaultTimeout = 1000000 * 600
  26. readF :: MonadIO m => Data -> m ByteString
  27. readF cl = if echo cl
  28. then do
  29. l <- liftIO $ S.fRead (str cl) blockSize
  30. liftIO $ BS.putStr "<"
  31. liftIO $ BS.putStr l
  32. return l
  33. else liftIO $ S.fRead (str cl) blockSize
  34. writeF :: MonadIO m => Data -> ByteString -> m ()
  35. writeF cl l = if echo cl
  36. then do
  37. liftIO $ BS.putStr ">"
  38. liftIO $ BS.putStr l
  39. liftIO $ S.fPut (str cl) l
  40. else liftIO $ S.fPut (str cl) l
  41. -- | withServer f serverIP port
  42. -- Connects to the given server port, runs f, and closes the connection.
  43. withServer :: MonadIO m => Streamline m a -> IP -> Int -> m a
  44. withServer f host port = do
  45. ds <- liftIO $ S.connectTo host port
  46. (ret, _) <- withTarget' f $ Data (SomeIO ds) defaultTimeout "" False False
  47. liftIO $ S.fClose ds
  48. return ret
  49. -- | withClient f boundPort
  50. -- Accepts a connection at the bound port, runs f and closes the connection.
  51. withClient :: MonadIO m => (IP -> Int -> Streamline m a) -> S.BoundPort -> m a
  52. withClient f port = do
  53. ds <- liftIO $ S.accept port
  54. (peerIp, peerPort) <- liftIO $ S.getPeer ds
  55. (ret, _) <- withTarget' (f peerIp peerPort) $ Data (SomeIO ds) defaultTimeout "" False False
  56. liftIO $ S.fClose ds
  57. return ret
  58. -- | withTarget f someIO
  59. -- Runs f wrapped on a Streamline monad that does IO on nomeIO.
  60. withTarget :: (MonadIO m, UniformIO a) => Streamline m b -> a -> m b
  61. withTarget f s = do
  62. (ret, _) <- withTarget' f $ Data (SomeIO s) defaultTimeout "" False False
  63. return ret
  64. instance Monad m => Monad (Streamline m) where
  65. --return :: (Monad m) => a -> Streamline m a
  66. return x = Streamline $ \cl -> return (x, cl)
  67. --(>>=) :: Monad m => Streamline m a -> (a -> Streamline m b) -> Streamline m b
  68. a >>= b = Streamline $ \cl -> do
  69. (x, cl') <- withTarget' a cl
  70. withTarget' (b x) cl'
  71. instance Monad m => Functor (Streamline m) where
  72. --fmap :: (a -> b) -> Streamline m a -> Streamline m b
  73. fmap f m = Streamline $ \cl -> do
  74. (x, cl') <- withTarget' m cl
  75. return (f x, cl')
  76. instance (Functor m, Monad m) => Applicative (Streamline m) where
  77. pure = return
  78. (<*>) = ap
  79. instance MonadTrans Streamline where
  80. --lift :: Monad m => m a -> Streamline m a
  81. lift x = Streamline $ \cl -> do
  82. a <- x
  83. return (a, cl)
  84. instance MonadIO m => MonadIO (Streamline m) where
  85. liftIO = lift . liftIO
  86. -- | Sends data over the streamlines an IO target.
  87. send :: MonadIO m => ByteString -> Streamline m ()
  88. send r = Streamline $ \cl -> do
  89. writeF cl r
  90. return ((), cl)
  91. -- | Receives a line from the streamlined IO target.
  92. receiveLine :: MonadIO m => Streamline m ByteString
  93. receiveLine = do
  94. l <- runAttoparsec parseLine
  95. case l of
  96. Left _ -> return ""
  97. Right l' -> return l'
  98. -- | Receives a line from the streamlined IO target,
  99. -- but breaks the line on reasonably sized chuncks, and
  100. -- reads them lazyly, so that IO can be done in constant
  101. -- memory space.
  102. lazyRecieveLine :: MonadIO m => Streamline m [ByteString]
  103. lazyRecieveLine = Streamline $ \cl -> lazyReceiveLine' cl
  104. where
  105. lazyReceiveLine' :: MonadIO m => Data -> m ([ByteString], Data)
  106. lazyReceiveLine' cl' =
  107. if isEOF cl'
  108. then eofError "System.IO.Uniform.Streamline.lazyReceiveLine"
  109. else
  110. if BS.null $ buff cl'
  111. then do
  112. dt <- readF cl'
  113. lazyReceiveLine' cl'{buff=dt}{isEOF=BS.null dt}
  114. else do
  115. let l = A.parseOnly lineWithEol $ buff cl'
  116. case l of
  117. Left _ -> do
  118. l' <- readF cl'
  119. (ret, cl'') <- lazyReceiveLine' cl'{buff=l'}{isEOF=BS.null l'}
  120. return ((buff cl') : ret, cl'')
  121. Right (ret, dt) -> return ([ret], cl'{buff=dt})
  122. -- | lazyReceiveN n
  123. -- Receives n bytes of data from the streamlined IO target,
  124. -- but breaks the data on reasonably sized chuncks, and reads
  125. -- them lazyly, so that IO can be done in constant memory space.
  126. lazyReceiveN :: (Functor m, MonadIO m) => Int -> Streamline m [ByteString]
  127. lazyReceiveN n' = Streamline $ \cl' -> lazyReceiveN' cl' n'
  128. where
  129. lazyReceiveN' :: (Functor m, MonadIO m) => Data -> Int -> m ([ByteString], Data)
  130. lazyReceiveN' cl n =
  131. if isEOF cl
  132. then eofError "System.IO.Uniform.Streamline.lazyReceiveN"
  133. else
  134. if BS.null (buff cl)
  135. then do
  136. b <- readF cl
  137. let eof = BS.null b
  138. let cl' = cl{buff=b}{isEOF=eof}
  139. lazyReceiveN' cl' n
  140. else
  141. if n <= BS.length (buff cl)
  142. then let
  143. ret = [BS.take n (buff cl)]
  144. buff' = BS.drop n (buff cl)
  145. in return (ret, cl{buff=buff'})
  146. else let
  147. cl' = cl{buff=""}
  148. b = buff cl
  149. in fmap (appFst b) $ lazyReceiveN' cl' (n - BS.length b)
  150. appFst :: a -> ([a], b) -> ([a], b)
  151. appFst a (l, b) = (a:l, b)
  152. -- | Wraps the streamlined IO target on TLS, streamlining
  153. -- the new wrapper afterwads.
  154. startTls :: MonadIO m => TlsSettings -> Streamline m ()
  155. startTls st = Streamline $ \cl -> do
  156. ds' <- liftIO $ S.startTls st $ str cl
  157. return ((), cl{str=SomeIO ds'}{buff=""})
  158. -- | Runs an Attoparsec parser over the data read from the
  159. -- streamlined IO target. Returns both the parser
  160. -- result and the string consumed by it.
  161. runAttoparsecAndReturn :: MonadIO m => A.Parser a -> Streamline m (ByteString, Either String a)
  162. runAttoparsecAndReturn p = Streamline $ \cl ->
  163. if isEOF cl
  164. then eofError "System.IO.Uniform.Streamline.runAttoparsecAndReturn"
  165. else do
  166. let c = A.parse p $ buff cl
  167. (cl', i, a) <- liftIO $ continueResult cl c
  168. return ((i, a), cl')
  169. where
  170. continueResult :: Data -> A.Result a -> IO (Data, ByteString, (Either String a))
  171. -- tx eof ds
  172. continueResult cl c = case c of
  173. A.Fail i _ msg -> return (cl{buff=i}, BS.take (BS.length (buff cl) - BS.length i) (buff cl), Left msg)
  174. A.Done i r -> return (cl{buff=i}, BS.take (BS.length (buff cl) - BS.length i) (buff cl), Right r)
  175. A.Partial c' -> do
  176. d <- readF cl
  177. let cl' = cl{buff=BS.append (buff cl) d}{isEOF=BS.null d}
  178. continueResult cl' (c' d)
  179. -- | Runs an Attoparsec parser over the data read from the
  180. -- streamlined IO target. Returning the parser result.
  181. runAttoparsec :: MonadIO m => A.Parser a -> Streamline m (Either String a)
  182. runAttoparsec p = Streamline $ \cl ->
  183. if isEOF cl
  184. then eofError "System.IO.Uniform.Streamline.runAttoparsec"
  185. else do
  186. let c = A.parse p $ buff cl
  187. (cl', a) <- liftIO $ continueResult cl c
  188. return (a, cl')
  189. where
  190. continueResult :: Data -> A.Result a -> IO (Data, (Either String a))
  191. continueResult cl c = case c of
  192. A.Fail i _ msg -> return (cl{buff=i}, Left msg)
  193. A.Done i r -> return (cl{buff=i}, Right r)
  194. A.Partial c' -> do
  195. d <- readF cl
  196. let eof' = BS.null d
  197. continueResult cl{buff=d}{isEOF=eof'} (c' d)
  198. -- | Indicates whether transport layer security is being used.
  199. isSecure :: Monad m => Streamline m Bool
  200. isSecure = Streamline $ \cl -> return (S.isSecure $ str cl, cl)
  201. -- | Sets the timeout for the streamlined IO target.
  202. setTimeout :: Monad m => Int -> Streamline m ()
  203. setTimeout t = Streamline $ \cl -> return ((), cl{timeout=t})
  204. -- | Sets echo of the streamlines IO target.
  205. -- If echo is set, all the data read an written to the target
  206. -- will be echoed in stdout, with ">" and "<" markers indicating
  207. -- what is read and written.
  208. setEcho :: Monad m => Bool -> Streamline m ()
  209. setEcho e = Streamline $ \cl -> return ((), cl{echo=e})
  210. parseLine :: A.Parser ByteString
  211. parseLine = do
  212. l <- A.takeTill isEol
  213. (A.string "\r\n" <|> A.string "\n")
  214. return l
  215. lineWithEol :: A.Parser (ByteString, ByteString)
  216. lineWithEol = do
  217. l <- A.scan False lineScanner
  218. r <- A.takeByteString
  219. return (l, r)
  220. lineScanner :: Bool -> Word8 -> Maybe Bool
  221. lineScanner False c = Just $ isEol c
  222. lineScanner True c = if isEol c then Just True else Nothing
  223. isEol :: Word8 -> Bool
  224. isEol c = elem c (map (fromIntegral . ord) "\r\n")
  225. eofError :: MonadIO m => String -> m a
  226. eofError msg = liftIO . ioError $ mkIOError eofErrorType msg Nothing Nothing