Streamline.hs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379
  1. {-# LANGUAGE OverloadedStrings #-}
  2. -- |
  3. -- Streamline exports a monad that, given an uniform IO target, emulates
  4. -- character tream IO using high performance block IO.
  5. module System.IO.Uniform.Streamline (
  6. Streamline,
  7. IOScannerState(..),
  8. withClient,
  9. withServer,
  10. withTarget,
  11. send,
  12. send',
  13. recieveLine,
  14. recieveLine',
  15. lazyRecieveLine,
  16. recieveN,
  17. recieveN',
  18. lazyRecieveN,
  19. recieveTill,
  20. recieveTill',
  21. startTls,
  22. runAttoparsec,
  23. runAttoparsecAndReturn,
  24. isSecure,
  25. setTimeout,
  26. setEcho,
  27. runScanner,
  28. runScanner',
  29. scan,
  30. scan',
  31. textScanner
  32. ) where
  33. import qualified System.IO.Uniform as S
  34. import qualified System.IO.Uniform.Network as N
  35. import System.IO.Uniform (UniformIO, SomeIO(..), TlsSettings)
  36. import System.IO.Uniform.Streamline.Scanner
  37. import Control.Monad.Trans.Class
  38. import Control.Applicative
  39. import Control.Monad (ap)
  40. import Control.Monad.IO.Class
  41. import System.IO.Error
  42. import Data.ByteString (ByteString)
  43. import qualified Data.ByteString as BS
  44. import qualified Data.ByteString.Lazy as LBS
  45. import Data.Word8 (Word8)
  46. import Data.IP (IP)
  47. import qualified Data.Char as C
  48. import qualified Data.Attoparsec.ByteString as A
  49. data Data = Data {str :: SomeIO, timeout :: Int, buff :: ByteString, isEOF :: Bool, echo :: Bool}
  50. -- | Monad that emulates character stream IO over block IO.
  51. newtype Streamline m a = Streamline {withTarget' :: Data -> m (a, Data)}
  52. blockSize :: Int
  53. blockSize = 4096
  54. defaultTimeout :: Int
  55. defaultTimeout = 1000000 * 600
  56. readF :: MonadIO m => Data -> m ByteString
  57. readF cl = if echo cl
  58. then do
  59. l <- liftIO $ S.uRead (str cl) blockSize
  60. liftIO $ BS.putStr "<"
  61. liftIO $ BS.putStr l
  62. return l
  63. else liftIO $ S.uRead (str cl) blockSize
  64. writeF :: MonadIO m => Data -> ByteString -> m ()
  65. writeF cl l = if echo cl
  66. then do
  67. liftIO $ BS.putStr ">"
  68. liftIO $ BS.putStr l
  69. liftIO $ S.uPut (str cl) l
  70. else liftIO $ S.uPut (str cl) l
  71. -- | withServer f serverIP port
  72. --
  73. -- Connects to the given server port, runs f, and closes the connection.
  74. withServer :: MonadIO m => IP -> Int -> Streamline m a -> m a
  75. withServer host port f = do
  76. ds <- liftIO $ N.connectTo host port
  77. (ret, _) <- withTarget' f $ Data (SomeIO ds) defaultTimeout "" False False
  78. liftIO $ S.uClose ds
  79. return ret
  80. -- | withClient f boundPort
  81. --
  82. -- Accepts a connection at the bound port, runs f and closes the connection.
  83. withClient :: MonadIO m => N.BoundedPort -> (IP -> Int -> Streamline m a) -> m a
  84. withClient port f = do
  85. ds <- liftIO $ N.accept port
  86. (peerIp, peerPort) <- liftIO $ N.getPeer ds
  87. (ret, _) <- withTarget' (f peerIp peerPort) $ Data (SomeIO ds) defaultTimeout "" False False
  88. liftIO $ S.uClose ds
  89. return ret
  90. -- | withTarget f someIO
  91. --
  92. -- Runs f wrapped on a Streamline monad that does IO on nomeIO.
  93. withTarget :: (MonadIO m, UniformIO a) => a -> Streamline m b -> m b
  94. withTarget s f = do
  95. (ret, _) <- withTarget' f $ Data (SomeIO s) defaultTimeout "" False False
  96. return ret
  97. instance Monad m => Monad (Streamline m) where
  98. --return :: (Monad m) => a -> Streamline m a
  99. return x = Streamline $ \cl -> return (x, cl)
  100. --(>>=) :: Monad m => Streamline m a -> (a -> Streamline m b) -> Streamline m b
  101. a >>= b = Streamline $ \cl -> do
  102. (x, cl') <- withTarget' a cl
  103. withTarget' (b x) cl'
  104. instance Monad m => Functor (Streamline m) where
  105. --fmap :: (a -> b) -> Streamline m a -> Streamline m b
  106. fmap f m = Streamline $ \cl -> do
  107. (x, cl') <- withTarget' m cl
  108. return (f x, cl')
  109. instance (Functor m, Monad m) => Applicative (Streamline m) where
  110. pure = return
  111. (<*>) = ap
  112. instance MonadTrans Streamline where
  113. --lift :: Monad m => m a -> Streamline m a
  114. lift x = Streamline $ \cl -> do
  115. a <- x
  116. return (a, cl)
  117. instance MonadIO m => MonadIO (Streamline m) where
  118. liftIO = lift . liftIO
  119. -- | Sends data over the streamlines an IO target.
  120. send :: MonadIO m => ByteString -> Streamline m ()
  121. send r = Streamline $ \cl -> do
  122. writeF cl r
  123. return ((), cl)
  124. -- | Sends data from a lazy byte string
  125. send' :: MonadIO m => LBS.ByteString -> Streamline m ()
  126. send' r = Streamline $ \cl -> do
  127. let dd = LBS.toChunks r
  128. mapM (writeF cl) dd
  129. return ((), cl)
  130. -- | Equivalent to runScanner', but returns a strict, completely
  131. -- evaluated ByteString.
  132. runScanner :: MonadIO m => s -> IOScanner s -> Streamline m (ByteString, s)
  133. runScanner state scanner = do
  134. (rt, st) <- runScanner' state scanner
  135. return (LBS.toStrict rt, st)
  136. {- |
  137. Very much like Attoparsec's runScanner:
  138. runScanner' scanner initial_state
  139. Recieves data, running the scanner on each byte,
  140. using the scanner result as initial state for the
  141. next byte, and stopping when the scanner returns
  142. Nothing.
  143. Returns the scanned ByteString.
  144. -}
  145. runScanner' :: MonadIO m => s -> IOScanner s -> Streamline m (LBS.ByteString, s)
  146. runScanner' state scanner = Streamline $ \d ->
  147. do
  148. (tx, st, d') <- in_scan d state
  149. return ((LBS.fromChunks tx, st), d')
  150. where
  151. --in_scan :: Data -> s -> m ([ByteString], s, Data)
  152. in_scan d st
  153. | isEOF d = eofError "System.IO.Uniform.Streamline.scan'"
  154. | BS.null (buff d) = do
  155. dt <- readF d
  156. if BS.null dt
  157. then return ([], st, d{isEOF=True})
  158. else in_scan d{buff=dt} st
  159. | otherwise = case sscan scanner st 0 (BS.unpack . buff $ d) of
  160. AllInput st' -> do
  161. (tx', st'', d') <- in_scan d{buff=""} st'
  162. return (buff d:tx', st'', d')
  163. SplitAt n st' -> let
  164. (r, i) = BS.splitAt n (buff d)
  165. in return ([r], st', d{buff=i})
  166. -- I'll avoid rebuilding a list on high level code. The ByteString functions are way better.
  167. sscan :: (s -> Word8 -> IOScannerState s) -> s -> Int -> [Word8] -> ScanResult s
  168. sscan _ s0 _ [] = AllInput s0
  169. sscan s s0 i (w:ww) = case s s0 w of
  170. Finished -> SplitAt i s0
  171. LastPass s1 -> SplitAt (i+1) s1
  172. Running s1 -> sscan s s1 (i+1) ww
  173. data ScanResult s = SplitAt Int s | AllInput s
  174. -- | Equivalent to runScanner, but dischards the final state
  175. scan :: MonadIO m => s -> IOScanner s -> Streamline m ByteString
  176. scan state scanner = fst <$> runScanner state scanner
  177. -- | Equivalent to runScanner', but dischards the final state
  178. scan' :: MonadIO m => s -> IOScanner s -> Streamline m LBS.ByteString
  179. scan' state scanner = fst <$> runScanner' state scanner
  180. -- | Recieves data untill the next end of line (\n or \r\n)
  181. recieveLine :: MonadIO m => Streamline m ByteString
  182. recieveLine = recieveTill "\n"
  183. -- | Lazy version of recieveLine
  184. recieveLine' :: MonadIO m => Streamline m LBS.ByteString
  185. recieveLine' = recieveTill' "\n"
  186. -- | Use recieveLine'.
  187. lazyRecieveLine :: MonadIO m => Streamline m [ByteString]
  188. {-# DEPRECATED #-}
  189. lazyRecieveLine = Streamline $ \cl -> lazyRecieveLine' cl
  190. where
  191. lazyRecieveLine' :: MonadIO m => Data -> m ([ByteString], Data)
  192. lazyRecieveLine' cl' =
  193. if isEOF cl'
  194. then eofError "System.IO.Uniform.Streamline.lazyRecieveLine"
  195. else
  196. if BS.null $ buff cl'
  197. then do
  198. dt <- readF cl'
  199. lazyRecieveLine' cl'{buff=dt}{isEOF=BS.null dt}
  200. else do
  201. let l = A.parseOnly lineWithEol $ buff cl'
  202. case l of
  203. Left _ -> do
  204. l' <- readF cl'
  205. (ret, cl'') <- lazyRecieveLine' cl'{buff=l'}{isEOF=BS.null l'}
  206. return ((buff cl') : ret, cl'')
  207. Right (ret, dt) -> return ([ret], cl'{buff=dt})
  208. -- | Recieves the given number of bytes.
  209. recieveN :: MonadIO m => Int -> Streamline m ByteString
  210. recieveN n = LBS.toStrict <$> recieveN' n
  211. -- | Lazy version of recieveN
  212. recieveN' :: MonadIO m => Int -> Streamline m LBS.ByteString
  213. recieveN' n = Streamline $ \cl ->
  214. do
  215. (tt, cl') <- recieve cl n
  216. return (LBS.fromChunks tt, cl')
  217. where
  218. recieve d b
  219. | isEOF d = eofError "System.IO.Uniform.Streamline.lazyRecieveN"
  220. | BS.null . buff $ d = do
  221. dt <- readF d
  222. recieve d{buff=dt}{isEOF=BS.null dt} b
  223. | b <= (BS.length . buff $ d) = let
  224. (r, dt) = BS.splitAt b $ buff d
  225. in return ([r], d{buff=dt})
  226. | otherwise = do
  227. (r, d') <- recieve d{buff=""} $ b - (BS.length . buff $ d)
  228. return (buff d : r, d')
  229. -- | Use recieveN'.
  230. lazyRecieveN :: (Functor m, MonadIO m) => Int -> Streamline m [ByteString]
  231. {-# DEPRECATED #-}
  232. lazyRecieveN n' = Streamline $ \cl' -> lazyRecieveN' cl' n'
  233. where
  234. lazyRecieveN' :: (Functor m, MonadIO m) => Data -> Int -> m ([ByteString], Data)
  235. lazyRecieveN' cl n =
  236. if isEOF cl
  237. then eofError "System.IO.Uniform.Streamline.lazyRecieveN"
  238. else
  239. if BS.null (buff cl)
  240. then do
  241. b <- readF cl
  242. let eof = BS.null b
  243. let cl' = cl{buff=b}{isEOF=eof}
  244. lazyRecieveN' cl' n
  245. else
  246. if n <= BS.length (buff cl)
  247. then let
  248. ret = [BS.take n (buff cl)]
  249. buff' = BS.drop n (buff cl)
  250. in return (ret, cl{buff=buff'})
  251. else let
  252. cl' = cl{buff=""}
  253. b = buff cl
  254. in fmap (appFst b) $ lazyRecieveN' cl' (n - BS.length b)
  255. appFst :: a -> ([a], b) -> ([a], b)
  256. appFst a (l, b) = (a:l, b)
  257. -- | Recieves data until it matches the argument.
  258. -- Returns all of it, including the matching data.
  259. recieveTill :: MonadIO m => ByteString -> Streamline m ByteString
  260. recieveTill t = LBS.toStrict <$> recieveTill' t
  261. -- | Lazy version of recieveTill
  262. recieveTill' :: MonadIO m => ByteString -> Streamline m LBS.ByteString
  263. recieveTill' t = recieve . BS.unpack $ t
  264. where
  265. recieve t' = scan' [] (textScanner t')
  266. -- | Wraps the streamlined IO target on TLS, streamlining
  267. -- the new wrapper afterwads.
  268. startTls :: MonadIO m => TlsSettings -> Streamline m ()
  269. startTls st = Streamline $ \cl -> do
  270. ds' <- liftIO $ S.startTls st $ str cl
  271. return ((), cl{str=SomeIO ds'}{buff=""})
  272. -- | Runs an Attoparsec parser over the data read from the
  273. -- streamlined IO target. Returns both the parser
  274. -- result and the string consumed by it.
  275. runAttoparsecAndReturn :: MonadIO m => A.Parser a -> Streamline m (ByteString, Either String a)
  276. runAttoparsecAndReturn p = Streamline $ \cl ->
  277. if isEOF cl
  278. then eofError "System.IO.Uniform.Streamline.runAttoparsecAndReturn"
  279. else do
  280. let c = A.parse p $ buff cl
  281. (cl', i, a) <- liftIO $ continueResult cl c
  282. return ((i, a), cl')
  283. where
  284. continueResult :: Data -> A.Result a -> IO (Data, ByteString, (Either String a))
  285. -- tx eof ds
  286. continueResult cl c = case c of
  287. A.Fail i _ msg -> return (cl{buff=i}, BS.take (BS.length (buff cl) - BS.length i) (buff cl), Left msg)
  288. A.Done i r -> return (cl{buff=i}, BS.take (BS.length (buff cl) - BS.length i) (buff cl), Right r)
  289. A.Partial c' -> do
  290. d <- readF cl
  291. let cl' = cl{buff=BS.append (buff cl) d}{isEOF=BS.null d}
  292. continueResult cl' (c' d)
  293. -- | Runs an Attoparsec parser over the data read from the
  294. -- streamlined IO target. Returning the parser result.
  295. runAttoparsec :: MonadIO m => A.Parser a -> Streamline m (Either String a)
  296. runAttoparsec p = Streamline $ \cl ->
  297. if isEOF cl
  298. then eofError "System.IO.Uniform.Streamline.runAttoparsec"
  299. else do
  300. let c = A.parse p $ buff cl
  301. (cl', a) <- liftIO $ continueResult cl c
  302. return (a, cl')
  303. where
  304. continueResult :: Data -> A.Result a -> IO (Data, (Either String a))
  305. continueResult cl c = case c of
  306. A.Fail i _ msg -> return (cl{buff=i}, Left msg)
  307. A.Done i r -> return (cl{buff=i}, Right r)
  308. A.Partial c' -> do
  309. d <- readF cl
  310. let eof' = BS.null d
  311. continueResult cl{buff=d}{isEOF=eof'} (c' d)
  312. -- | Indicates whether transport layer security is being used.
  313. isSecure :: Monad m => Streamline m Bool
  314. isSecure = Streamline $ \cl -> return (S.isSecure $ str cl, cl)
  315. -- | Sets the timeout for the streamlined IO target.
  316. setTimeout :: Monad m => Int -> Streamline m ()
  317. setTimeout t = Streamline $ \cl -> return ((), cl{timeout=t})
  318. -- | Sets echo of the streamlines IO target.
  319. -- If echo is set, all the data read an written to the target
  320. -- will be echoed in stdout, with ">" and "<" markers indicating
  321. -- what is read and written.
  322. setEcho :: Monad m => Bool -> Streamline m ()
  323. setEcho e = Streamline $ \cl -> return ((), cl{echo=e})
  324. lineWithEol :: A.Parser (ByteString, ByteString)
  325. lineWithEol = do
  326. l <- A.scan False lineScanner
  327. r <- A.takeByteString
  328. return (l, r)
  329. eofError :: MonadIO m => String -> m a
  330. eofError msg = liftIO . ioError $ mkIOError eofErrorType msg Nothing Nothing
  331. lineScanner :: Bool -> Word8 -> Maybe Bool
  332. lineScanner False c
  333. | c == (fromIntegral . C.ord $ '\n') = Just True
  334. | otherwise = Just False
  335. lineScanner True _ = Nothing