Streamline.hs 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414
  1. {-# LANGUAGE OverloadedStrings #-}
  2. -- |
  3. -- Streamline exports a monad that, given a uniform IO target, emulates
  4. -- character stream IO using high performance block IO.
  5. module System.IO.Uniform.Streamline (
  6. Streamline,
  7. IOScannerState,
  8. withClient,
  9. withServer,
  10. withTarget,
  11. send,
  12. send',
  13. recieveLine,
  14. recieveLine',
  15. lazyRecieveLine,
  16. recieveN,
  17. recieveN',
  18. lazyRecieveN,
  19. recieveTill,
  20. recieveTill',
  21. startTls,
  22. runAttoparsec,
  23. runAttoparsecAndReturn,
  24. isSecure,
  25. setTimeout,
  26. setEcho,
  27. runScanner,
  28. runScanner',
  29. scan,
  30. scan'
  31. ) where
  32. import qualified System.IO.Uniform as S
  33. import qualified System.IO.Uniform.Network as N
  34. import System.IO.Uniform (UniformIO, SomeIO(..), TlsSettings)
  35. import Control.Monad.Trans.Class
  36. import Control.Applicative
  37. import Control.Monad (ap)
  38. import Control.Monad.IO.Class
  39. import System.IO.Error
  40. import Data.ByteString (ByteString)
  41. import qualified Data.ByteString as BS
  42. import qualified Data.ByteString.Lazy as LBS
  43. import Data.Word8 (Word8)
  44. import Data.IP (IP)
  45. import qualified Data.Char as C
  46. import qualified Data.Attoparsec.ByteString as A
  47. data Data = Data {str :: SomeIO, timeout :: Int, buff :: ByteString, isEOF :: Bool, echo :: Bool}
  48. -- | Monad that emulates character stream IO over block IO.
  49. newtype Streamline m a = Streamline {withTarget' :: Data -> m (a, Data)}
  50. blockSize :: Int
  51. blockSize = 4096
  52. defaultTimeout :: Int
  53. defaultTimeout = 1000000 * 600
  54. readF :: MonadIO m => Data -> m ByteString
  55. readF cl = if echo cl
  56. then do
  57. l <- liftIO $ S.uRead (str cl) blockSize
  58. liftIO $ BS.putStr "<"
  59. liftIO $ BS.putStr l
  60. return l
  61. else liftIO $ S.uRead (str cl) blockSize
  62. writeF :: MonadIO m => Data -> ByteString -> m ()
  63. writeF cl l = if echo cl
  64. then do
  65. liftIO $ BS.putStr ">"
  66. liftIO $ BS.putStr l
  67. liftIO $ S.uPut (str cl) l
  68. else liftIO $ S.uPut (str cl) l
  69. -- | withServer f serverIP port
  70. --
  71. -- Connects to the given server port, runs f, and closes the connection.
  72. withServer :: MonadIO m => IP -> Int -> Streamline m a -> m a
  73. withServer host port f = do
  74. ds <- liftIO $ N.connectTo host port
  75. (ret, _) <- withTarget' f $ Data (SomeIO ds) defaultTimeout "" False False
  76. liftIO $ S.uClose ds
  77. return ret
  78. -- | withClient f boundPort
  79. --
  80. -- Accepts a connection at the bound port, runs f and closes the connection.
  81. withClient :: MonadIO m => N.BoundedPort -> (IP -> Int -> Streamline m a) -> m a
  82. withClient port f = do
  83. ds <- liftIO $ N.accept port
  84. (peerIp, peerPort) <- liftIO $ N.getPeer ds
  85. (ret, _) <- withTarget' (f peerIp peerPort) $ Data (SomeIO ds) defaultTimeout "" False False
  86. liftIO $ S.uClose ds
  87. return ret
  88. -- | withTarget f someIO
  89. --
  90. -- Runs f wrapped on a Streamline monad that does IO on nomeIO.
  91. withTarget :: (MonadIO m, UniformIO a) => a -> Streamline m b -> m b
  92. withTarget s f = do
  93. (ret, _) <- withTarget' f $ Data (SomeIO s) defaultTimeout "" False False
  94. return ret
  95. instance Monad m => Monad (Streamline m) where
  96. --return :: (Monad m) => a -> Streamline m a
  97. return x = Streamline $ \cl -> return (x, cl)
  98. --(>>=) :: Monad m => Streamline m a -> (a -> Streamline m b) -> Streamline m b
  99. a >>= b = Streamline $ \cl -> do
  100. (x, cl') <- withTarget' a cl
  101. withTarget' (b x) cl'
  102. instance Monad m => Functor (Streamline m) where
  103. --fmap :: (a -> b) -> Streamline m a -> Streamline m b
  104. fmap f m = Streamline $ \cl -> do
  105. (x, cl') <- withTarget' m cl
  106. return (f x, cl')
  107. instance (Functor m, Monad m) => Applicative (Streamline m) where
  108. pure = return
  109. (<*>) = ap
  110. instance MonadTrans Streamline where
  111. --lift :: Monad m => m a -> Streamline m a
  112. lift x = Streamline $ \cl -> do
  113. a <- x
  114. return (a, cl)
  115. instance MonadIO m => MonadIO (Streamline m) where
  116. liftIO = lift . liftIO
  117. -- | Sends data over the streamlines an IO target.
  118. send :: MonadIO m => ByteString -> Streamline m ()
  119. send r = Streamline $ \cl -> do
  120. writeF cl r
  121. return ((), cl)
  122. -- | Sends data from a lazy byte string
  123. send' :: MonadIO m => LBS.ByteString -> Streamline m ()
  124. send' r = Streamline $ \cl -> do
  125. let dd = LBS.toChunks r
  126. mapM (writeF cl) dd
  127. return ((), cl)
  128. -- | State of an IO scanner.
  129. -- Differently from a parser scanner, an IO scanner
  130. -- must deal with blocking behavior.
  131. data IOScannerState a =
  132. -- | A scanner returns Finished when the current input is not
  133. -- part of the result, and the scanning must stop before this
  134. -- input.
  135. Finished |
  136. -- | A scanner returns LastPass when the current input is the
  137. -- last one of the result, and the scanning must stop before
  138. -- after this input, without consuming more data.
  139. LastPass a |
  140. -- | A scanner returns Running when the current input is part
  141. -- of the result, and the scanning must continue.
  142. Running a
  143. -- | Equivalent to runScanner', but returns a strict, completely
  144. -- evaluated ByteString.
  145. runScanner :: MonadIO m => s -> (s -> Word8 -> IOScannerState s) -> Streamline m (ByteString, s)
  146. runScanner state scanner = do
  147. (rt, st) <- runScanner' state scanner
  148. return (LBS.toStrict rt, st)
  149. {- |
  150. Very much like Attoparsec's runScanner:
  151. runScanner' scanner initial_state
  152. Recieves data, running the scanner on each byte,
  153. using the scanner result as initial state for the
  154. next byte, and stopping when the scanner returns
  155. Nothing.
  156. Returns the scanned ByteString.
  157. -}
  158. runScanner' :: MonadIO m => s -> (s -> Word8 -> IOScannerState s) -> Streamline m (LBS.ByteString, s)
  159. runScanner' state scanner = Streamline $ \d ->
  160. do
  161. (tx, st, d') <- in_scan d state
  162. return ((LBS.fromChunks tx, st), d')
  163. where
  164. --in_scan :: Data -> s -> m ([ByteString], s, Data)
  165. in_scan d st
  166. | isEOF d = eofError "System.IO.Uniform.Streamline.scan'"
  167. | BS.null (buff d) = do
  168. dt <- readF d
  169. if BS.null dt
  170. then return ([], st, d{isEOF=True})
  171. else in_scan d{buff=dt} st
  172. | otherwise = case sscan scanner st 0 (BS.unpack . buff $ d) of
  173. AllInput st' -> do
  174. (tx', st'', d') <- in_scan d{buff=""} st'
  175. return (buff d:tx', st'', d')
  176. SplitAt n st' -> let
  177. (r, i) = BS.splitAt n (buff d)
  178. in return ([r], st', d{buff=i})
  179. -- I'll avoid rebuilding a list on high level code. The ByteString functions are way better.
  180. sscan :: (s -> Word8 -> IOScannerState s) -> s -> Int -> [Word8] -> ScanResult s
  181. sscan _ s0 _ [] = AllInput s0
  182. sscan s s0 i (w:ww) = case s s0 w of
  183. Finished -> SplitAt i s0
  184. LastPass s1 -> SplitAt (i+1) s1
  185. Running s1 -> sscan s s1 (i+1) ww
  186. data ScanResult s = SplitAt Int s | AllInput s
  187. -- | Equivalent to runScanner, but dischards the final state
  188. scan :: MonadIO m => s -> (s -> Word8 -> IOScannerState s) -> Streamline m ByteString
  189. scan state scanner = fst <$> runScanner state scanner
  190. -- | Equivalent to runScanner', but dischards the final state
  191. scan' :: MonadIO m => s -> (s -> Word8 -> IOScannerState s) -> Streamline m LBS.ByteString
  192. scan' state scanner = fst <$> runScanner' state scanner
  193. -- | Recieves data untill the next end of line (\n or \r\n)
  194. recieveLine :: MonadIO m => Streamline m ByteString
  195. recieveLine = recieveTill "\n"
  196. -- | Lazy version of recieveLine
  197. recieveLine' :: MonadIO m => Streamline m LBS.ByteString
  198. recieveLine' = recieveTill' "\n"
  199. -- | Use recieveLine'.
  200. lazyRecieveLine :: MonadIO m => Streamline m [ByteString]
  201. {-# DEPRECATED #-}
  202. lazyRecieveLine = Streamline $ \cl -> lazyRecieveLine' cl
  203. where
  204. lazyRecieveLine' :: MonadIO m => Data -> m ([ByteString], Data)
  205. lazyRecieveLine' cl' =
  206. if isEOF cl'
  207. then eofError "System.IO.Uniform.Streamline.lazyRecieveLine"
  208. else
  209. if BS.null $ buff cl'
  210. then do
  211. dt <- readF cl'
  212. lazyRecieveLine' cl'{buff=dt}{isEOF=BS.null dt}
  213. else do
  214. let l = A.parseOnly lineWithEol $ buff cl'
  215. case l of
  216. Left _ -> do
  217. l' <- readF cl'
  218. (ret, cl'') <- lazyRecieveLine' cl'{buff=l'}{isEOF=BS.null l'}
  219. return ((buff cl') : ret, cl'')
  220. Right (ret, dt) -> return ([ret], cl'{buff=dt})
  221. -- | Recieves the given number of bytes.
  222. recieveN :: MonadIO m => Int -> Streamline m ByteString
  223. recieveN n = LBS.toStrict <$> recieveN' n
  224. -- | Lazy version of recieveN
  225. recieveN' :: MonadIO m => Int -> Streamline m LBS.ByteString
  226. recieveN' n = Streamline $ \cl ->
  227. do
  228. (tt, cl') <- recieve cl n
  229. return (LBS.fromChunks tt, cl')
  230. where
  231. recieve d b
  232. | isEOF d = eofError "System.IO.Uniform.Streamline.lazyRecieveN"
  233. | BS.null . buff $ d = do
  234. dt <- readF d
  235. recieve d{buff=dt}{isEOF=BS.null dt} b
  236. | b <= (BS.length . buff $ d) = let
  237. (r, dt) = BS.splitAt b $ buff d
  238. in return ([r], d{buff=dt})
  239. | otherwise = do
  240. (r, d') <- recieve d{buff=""} $ b - (BS.length . buff $ d)
  241. return (buff d : r, d')
  242. -- | Use recieveN'.
  243. lazyRecieveN :: (Functor m, MonadIO m) => Int -> Streamline m [ByteString]
  244. {-# DEPRECATED #-}
  245. lazyRecieveN n' = Streamline $ \cl' -> lazyRecieveN' cl' n'
  246. where
  247. lazyRecieveN' :: (Functor m, MonadIO m) => Data -> Int -> m ([ByteString], Data)
  248. lazyRecieveN' cl n =
  249. if isEOF cl
  250. then eofError "System.IO.Uniform.Streamline.lazyRecieveN"
  251. else
  252. if BS.null (buff cl)
  253. then do
  254. b <- readF cl
  255. let eof = BS.null b
  256. let cl' = cl{buff=b}{isEOF=eof}
  257. lazyRecieveN' cl' n
  258. else
  259. if n <= BS.length (buff cl)
  260. then let
  261. ret = [BS.take n (buff cl)]
  262. buff' = BS.drop n (buff cl)
  263. in return (ret, cl{buff=buff'})
  264. else let
  265. cl' = cl{buff=""}
  266. b = buff cl
  267. in fmap (appFst b) $ lazyRecieveN' cl' (n - BS.length b)
  268. appFst :: a -> ([a], b) -> ([a], b)
  269. appFst a (l, b) = (a:l, b)
  270. -- | Recieves data until it matches the argument.
  271. -- Returns all of it, including the matching data.
  272. recieveTill :: MonadIO m => ByteString -> Streamline m ByteString
  273. recieveTill t = LBS.toStrict <$> recieveTill' t
  274. -- | Lazy version of recieveTill
  275. recieveTill' :: MonadIO m => ByteString -> Streamline m LBS.ByteString
  276. recieveTill' t = recieve . BS.unpack $ t
  277. where
  278. recieve t' = scan' [] (textScanner t')
  279. -- | Wraps the streamlined IO target on TLS, streamlining
  280. -- the new wrapper afterwads.
  281. startTls :: MonadIO m => TlsSettings -> Streamline m ()
  282. startTls st = Streamline $ \cl -> do
  283. ds' <- liftIO $ S.startTls st $ str cl
  284. return ((), cl{str=SomeIO ds'}{buff=""})
  285. -- | Runs an Attoparsec parser over the data read from the
  286. -- streamlined IO target. Returns both the parser
  287. -- result and the string consumed by it.
  288. runAttoparsecAndReturn :: MonadIO m => A.Parser a -> Streamline m (ByteString, Either String a)
  289. runAttoparsecAndReturn p = Streamline $ \cl ->
  290. if isEOF cl
  291. then eofError "System.IO.Uniform.Streamline.runAttoparsecAndReturn"
  292. else do
  293. let c = A.parse p $ buff cl
  294. (cl', i, a) <- liftIO $ continueResult cl c
  295. return ((i, a), cl')
  296. where
  297. continueResult :: Data -> A.Result a -> IO (Data, ByteString, (Either String a))
  298. -- tx eof ds
  299. continueResult cl c = case c of
  300. A.Fail i _ msg -> return (cl{buff=i}, BS.take (BS.length (buff cl) - BS.length i) (buff cl), Left msg)
  301. A.Done i r -> return (cl{buff=i}, BS.take (BS.length (buff cl) - BS.length i) (buff cl), Right r)
  302. A.Partial c' -> do
  303. d <- readF cl
  304. let cl' = cl{buff=BS.append (buff cl) d}{isEOF=BS.null d}
  305. continueResult cl' (c' d)
  306. -- | Runs an Attoparsec parser over the data read from the
  307. -- streamlined IO target. Returning the parser result.
  308. runAttoparsec :: MonadIO m => A.Parser a -> Streamline m (Either String a)
  309. runAttoparsec p = Streamline $ \cl ->
  310. if isEOF cl
  311. then eofError "System.IO.Uniform.Streamline.runAttoparsec"
  312. else do
  313. let c = A.parse p $ buff cl
  314. (cl', a) <- liftIO $ continueResult cl c
  315. return (a, cl')
  316. where
  317. continueResult :: Data -> A.Result a -> IO (Data, (Either String a))
  318. continueResult cl c = case c of
  319. A.Fail i _ msg -> return (cl{buff=i}, Left msg)
  320. A.Done i r -> return (cl{buff=i}, Right r)
  321. A.Partial c' -> do
  322. d <- readF cl
  323. let eof' = BS.null d
  324. continueResult cl{buff=d}{isEOF=eof'} (c' d)
  325. -- | Indicates whether transport layer security is being used.
  326. isSecure :: Monad m => Streamline m Bool
  327. isSecure = Streamline $ \cl -> return (S.isSecure $ str cl, cl)
  328. -- | Sets the timeout for the streamlined IO target.
  329. setTimeout :: Monad m => Int -> Streamline m ()
  330. setTimeout t = Streamline $ \cl -> return ((), cl{timeout=t})
  331. -- | Sets echo of the streamlines IO target.
  332. -- If echo is set, all the data read an written to the target
  333. -- will be echoed in stdout, with ">" and "<" markers indicating
  334. -- what is read and written.
  335. setEcho :: Monad m => Bool -> Streamline m ()
  336. setEcho e = Streamline $ \cl -> return ((), cl{echo=e})
  337. lineWithEol :: A.Parser (ByteString, ByteString)
  338. lineWithEol = do
  339. l <- A.scan False lineScanner
  340. r <- A.takeByteString
  341. return (l, r)
  342. lineScanner :: Bool -> Word8 -> Maybe Bool
  343. lineScanner False c
  344. | c == (fromIntegral . C.ord $ '\n') = Just True
  345. | otherwise = Just False
  346. lineScanner True _ = Nothing
  347. eofError :: MonadIO m => String -> m a
  348. eofError msg = liftIO . ioError $ mkIOError eofErrorType msg Nothing Nothing
  349. textScanner :: [Word8] -> ([[Word8]] -> Word8 -> IOScannerState [[Word8]])
  350. textScanner [] = \_ _ -> Finished
  351. textScanner t@(c:_) = scanner
  352. where
  353. scanner st c'
  354. | c == c' = popStacks c' $ t:st
  355. | otherwise = popStacks c' st
-- | Advances every match in progress by one input byte.
--
-- Each element of the list holds the bytes one candidate match still
-- expects. A candidate whose next expected byte differs from the input
-- is dropped; one that matches keeps its remaining bytes; one that
-- matches on its last byte completes, turning the result into 'LastPass'.
popStacks :: Word8 -> [[Word8]] -> IOScannerState [[Word8]]
-- No candidates left to advance: keep scanning.
popStacks _ [] = Running []
-- A candidate that expects nothing more stops the scan.
popStacks _ ([]:_) = Finished
popStacks h ((h':hh):ss)
  -- Last expected byte matched: this candidate is complete, so it is
  -- removed and the overall result becomes LastPass.
  | h == h' && null hh = case popStacks h ss of
      Finished -> Finished
      LastPass ss' -> LastPass $ ss'
      Running ss' -> LastPass $ ss'
  -- Byte matched but more is expected: keep the candidate's tail,
  -- preserving whatever verdict the other candidates produced.
  | h == h' = case popStacks h ss of
      Finished -> Finished
      LastPass ss' -> LastPass $ hh:ss'
      Running ss' -> Running $ hh:ss'
  -- Mismatch: drop this candidate.
  | otherwise = popStacks h ss