Streamline.hs 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455
  1. {-# LANGUAGE OverloadedStrings #-}
  2. {- |
  3. Streamline exports a monad that, given a uniform IO target, emulates
  4. character stream IO using high-performance block IO.
  5. -}
  6. module System.IO.Uniform.Streamline (
  7. -- * Basic Type
  8. Streamline,
  9. -- * Running streamline targets
  10. -- ** Single pass runners
  11. withClient,
  12. withServer,
  13. withTarget,
  14. -- ** Several pass runner
  15. StreamlineState,
  16. streamline,
  17. resume,
  18. close,
  19. remaining,
  20. -- * Sending and receiving data
  21. send,
  22. send',
  23. recieveLine,
  24. recieveLine',
  25. recieveN,
  26. recieveN',
  27. -- ** Running a parser
  28. runAttoparsec,
  29. runAttoparsecAndReturn,
  30. -- ** Scanning the input
  31. runScanner,
  32. runScanner',
  33. scan,
  34. scan',
  35. recieveTill,
  36. recieveTill',
  37. -- ** Deprecated functions
  38. lazyRecieveLine,
  39. lazyRecieveN,
  40. -- * Behavior settings
  41. startTls,
  42. isSecure,
  43. setTimeout,
  44. echoTo,
  45. setEcho
  46. ) where
  47. import System.IO (stdout, Handle)
  48. import qualified System.IO.Uniform as S
  49. import qualified System.IO.Uniform.Network as N
  50. import qualified System.IO.Uniform.Std as Std
  51. import System.IO.Uniform (UniformIO, SomeIO(..), TlsSettings)
  52. import System.IO.Uniform.Streamline.Scanner
  53. import Data.Default.Class
  54. import Control.Monad.Trans.Class
  55. import Control.Monad (ap)
  56. import Control.Monad.IO.Class
  57. import System.IO.Error
  58. import Data.ByteString (ByteString)
  59. import qualified Data.ByteString as BS
  60. import qualified Data.ByteString.Lazy as LBS
  61. import Data.Word8 (Word8)
  62. import Data.IP (IP)
  63. import qualified Data.Char as C
  64. import qualified Data.Attoparsec.ByteString as A
  65. -- | Internal state for a Streamline monad
  66. data StreamlineState = StreamlineState {str :: SomeIO, timeout :: Int, buff :: ByteString, isEOF :: Bool, echo :: Maybe Handle}
  67. instance Default StreamlineState where
  68. -- | Will open StdIO
  69. def = StreamlineState (SomeIO Std.StdIO) defaultTimeout BS.empty False Nothing
  70. -- | Monad that emulates character stream IO over block IO.
  71. newtype Streamline m a = Streamline {withTarget' :: StreamlineState -> m (a, StreamlineState)}
  72. blockSize :: Int
  73. blockSize = 4096
  74. defaultTimeout :: Int
  75. defaultTimeout = 1000000 * 600
  76. readF :: MonadIO m => StreamlineState -> m ByteString
  77. readF cl = case echo cl of
  78. Just h -> do
  79. l <- liftIO $ S.uRead (str cl) blockSize
  80. liftIO $ BS.hPutStr h "<"
  81. liftIO $ BS.hPutStr h l
  82. return l
  83. Nothing -> liftIO $ S.uRead (str cl) blockSize
  84. writeF :: MonadIO m => StreamlineState -> ByteString -> m ()
  85. writeF cl l = case echo cl of
  86. Just h -> do
  87. liftIO $ BS.hPutStr h ">"
  88. liftIO $ BS.hPutStr h l
  89. liftIO $ S.uPut (str cl) l
  90. Nothing -> liftIO $ S.uPut (str cl) l
  91. -- | withServer f serverIP port
  92. --
  93. -- Connects to the given server port, runs f, and closes the connection.
  94. withServer :: MonadIO m => IP -> Int -> Streamline m a -> m a
  95. withServer host port f = do
  96. ds <- liftIO $ N.connectTo host port
  97. (ret, _) <- withTarget' f def{str=SomeIO ds}
  98. liftIO $ S.uClose ds
  99. return ret
  100. -- | withClient f boundPort
  101. --
  102. -- Accepts a connection at the bound port, runs f and closes the connection.
  103. withClient :: MonadIO m => N.BoundedPort -> (IP -> Int -> Streamline m a) -> m a
  104. withClient port f = do
  105. ds <- liftIO $ N.accept port
  106. (peerIp, peerPort) <- liftIO $ N.getPeer ds
  107. (ret, _) <- withTarget' (f peerIp peerPort) def{str=SomeIO ds}
  108. liftIO $ S.uClose ds
  109. return ret
  110. {- |
  111. withTarget f someIO
  112. Runs f wrapped on a Streamline monad that does IO on someIO.
  113. -}
  114. withTarget :: (Monad m, UniformIO a) => a -> Streamline m b -> m b
  115. withTarget s f = do
  116. (r, _) <- withTarget' f def{str=SomeIO s}
  117. return r
  118. {- |
  119. Run f wrapped on a Streamline monad, returning the final state in a way that
  120. can be continued with "resume".
  121. If run with this function, the state must be closed, explicitly with "close" or
  122. implicitly with "remaining".
  123. -}
  124. streamline :: (Monad m, UniformIO a) => a -> Streamline m b -> m (b, StreamlineState)
  125. streamline s f = withTarget' f def{str=SomeIO s}
  126. {- |
  127. Continues the execution of functions on a Streamline monad comming from
  128. "start" or another "resume" call.
  129. -}
  130. resume :: Monad m => StreamlineState -> Streamline m b -> m (b, StreamlineState)
  131. resume dt f = withTarget' f dt
  132. instance Monad m => Monad (Streamline m) where
  133. --return :: (Monad m) => a -> Streamline m a
  134. return x = Streamline $ \cl -> return (x, cl)
  135. --(>>=) :: Monad m => Streamline m a -> (a -> Streamline m b) -> Streamline m b
  136. a >>= b = Streamline $ \cl -> do
  137. (x, cl') <- withTarget' a cl
  138. withTarget' (b x) cl'
  139. instance Monad m => Functor (Streamline m) where
  140. --fmap :: (a -> b) -> Streamline m a -> Streamline m b
  141. fmap f m = Streamline $ \cl -> do
  142. (x, cl') <- withTarget' m cl
  143. return (f x, cl')
  144. instance (Functor m, Monad m) => Applicative (Streamline m) where
  145. pure = return
  146. (<*>) = ap
  147. instance MonadTrans Streamline where
  148. --lift :: Monad m => m a -> Streamline m a
  149. lift x = Streamline $ \cl -> do
  150. a <- x
  151. return (a, cl)
  152. instance MonadIO m => MonadIO (Streamline m) where
  153. liftIO = lift . liftIO
  154. -- | Sends data over the IO target.
  155. send :: MonadIO m => ByteString -> Streamline m ()
  156. send r = Streamline $ \cl -> do
  157. writeF cl r
  158. return ((), cl)
  159. -- | Sends data from a lazy byte string
  160. send' :: MonadIO m => LBS.ByteString -> Streamline m ()
  161. send' r = Streamline $ \cl -> do
  162. let dd = LBS.toChunks r
  163. mapM (writeF cl) dd
  164. return ((), cl)
  165. {- |
  166. Very much like Attoparsec's runScanner:
  167. runScanner scanner initial_state
  168. Recieves data, running the scanner on each byte,
  169. using the scanner result as initial state for the
  170. next byte, and stopping when the scanner returns
  171. Nothing.
  172. Returns the scanned ByteString.
  173. -}
  174. runScanner :: MonadIO m => s -> IOScanner s -> Streamline m (ByteString, s)
  175. runScanner state scanner = do
  176. (rt, st) <- runScanner' state scanner
  177. return (LBS.toStrict rt, st)
  178. -- | Equivalent to runScanner, but returns a strict, completely
  179. -- evaluated ByteString.
  180. runScanner' :: MonadIO m => s -> IOScanner s -> Streamline m (LBS.ByteString, s)
  181. runScanner' state scanner = Streamline $ \d ->
  182. do
  183. (tx, st, d') <- in_scan d state
  184. return ((LBS.fromChunks tx, st), d')
  185. where
  186. --in_scan :: StreamlineState -> s -> m ([ByteString], s, StreamlineState)
  187. in_scan d st
  188. | isEOF d = eofError "System.IO.Uniform.Streamline.scan'"
  189. | BS.null (buff d) = do
  190. dt <- readF d
  191. if BS.null dt
  192. then return ([], st, d{isEOF=True})
  193. else in_scan d{buff=dt} st
  194. | otherwise = case sscan scanner st 0 (BS.unpack . buff $ d) of
  195. AllInput st' -> do
  196. (tx', st'', d') <- in_scan d{buff=""} st'
  197. return (buff d:tx', st'', d')
  198. SplitAt n st' -> let
  199. (r, i) = BS.splitAt n (buff d)
  200. in return ([r], st', d{buff=i})
  201. -- I'll avoid rebuilding a list on high level code. The ByteString functions are way better.
  202. sscan :: (s -> Word8 -> IOScannerState s) -> s -> Int -> [Word8] -> ScanResult s
  203. sscan _ s0 _ [] = AllInput s0
  204. sscan s s0 i (w:ww) = case s s0 w of
  205. Finished -> SplitAt i s0
  206. LastPass s1 -> SplitAt (i+1) s1
  207. Running s1 -> sscan s s1 (i+1) ww
  208. data ScanResult s = SplitAt Int s | AllInput s
  209. -- | Equivalent to runScanner, but discards the final state
  210. scan :: MonadIO m => s -> IOScanner s -> Streamline m ByteString
  211. scan state scanner = fst <$> runScanner state scanner
  212. -- | Equivalent to runScanner', but discards the final state
  213. scan' :: MonadIO m => s -> IOScanner s -> Streamline m LBS.ByteString
  214. scan' state scanner = fst <$> runScanner' state scanner
  215. -- | Recieves data untill the next end of line (\n or \r\n)
  216. recieveLine :: MonadIO m => Streamline m ByteString
  217. recieveLine = recieveTill "\n"
  218. -- | Lazy version of recieveLine
  219. recieveLine' :: MonadIO m => Streamline m LBS.ByteString
  220. recieveLine' = recieveTill' "\n"
  221. -- | Use recieveLine'.
  222. lazyRecieveLine :: MonadIO m => Streamline m [ByteString]
  223. {-# DEPRECATED #-}
  224. lazyRecieveLine = Streamline $ \cl -> lazyRecieveLine' cl
  225. where
  226. lazyRecieveLine' :: MonadIO m => StreamlineState -> m ([ByteString], StreamlineState)
  227. lazyRecieveLine' cl' =
  228. if isEOF cl'
  229. then eofError "System.IO.Uniform.Streamline.lazyRecieveLine"
  230. else
  231. if BS.null $ buff cl'
  232. then do
  233. dt <- readF cl'
  234. lazyRecieveLine' cl'{buff=dt}{isEOF=BS.null dt}
  235. else do
  236. let l = A.parseOnly lineWithEol $ buff cl'
  237. case l of
  238. Left _ -> do
  239. l' <- readF cl'
  240. (ret, cl'') <- lazyRecieveLine' cl'{buff=l'}{isEOF=BS.null l'}
  241. return ((buff cl') : ret, cl'')
  242. Right (ret, dt) -> return ([ret], cl'{buff=dt})
  243. -- | Recieves the given number of bytes.
  244. recieveN :: MonadIO m => Int -> Streamline m ByteString
  245. recieveN n = LBS.toStrict <$> recieveN' n
  246. -- | Lazy version of recieveN
  247. recieveN' :: MonadIO m => Int -> Streamline m LBS.ByteString
  248. recieveN' n | n <= 0 = return ""
  249. | otherwise = Streamline $ \cl ->
  250. do
  251. (tt, cl') <- recieve cl n
  252. return (LBS.fromChunks tt, cl')
  253. where
  254. recieve d b
  255. | isEOF d = eofError "System.IO.Uniform.Streamline.lazyRecieveN"
  256. | BS.null . buff $ d = do
  257. dt <- readF d
  258. recieve d{buff=dt}{isEOF=BS.null dt} b
  259. | b <= (BS.length . buff $ d) = let
  260. (r, dt) = BS.splitAt b $ buff d
  261. in return ([r], d{buff=dt})
  262. | otherwise = do
  263. (r, d') <- recieve d{buff=""} $ b - (BS.length . buff $ d)
  264. return (buff d : r, d')
  265. -- | Use recieveN'.
  266. lazyRecieveN :: (Functor m, MonadIO m) => Int -> Streamline m [ByteString]
  267. {-# DEPRECATED #-}
  268. lazyRecieveN n' = Streamline $ \cl' -> lazyRecieveN' cl' n'
  269. where
  270. lazyRecieveN' :: (Functor m, MonadIO m) => StreamlineState -> Int -> m ([ByteString], StreamlineState)
  271. lazyRecieveN' cl n =
  272. if isEOF cl
  273. then eofError "System.IO.Uniform.Streamline.lazyRecieveN"
  274. else
  275. if BS.null (buff cl)
  276. then do
  277. b <- readF cl
  278. let eof = BS.null b
  279. let cl' = cl{buff=b}{isEOF=eof}
  280. lazyRecieveN' cl' n
  281. else
  282. if n <= BS.length (buff cl)
  283. then let
  284. ret = [BS.take n (buff cl)]
  285. buff' = BS.drop n (buff cl)
  286. in return (ret, cl{buff=buff'})
  287. else let
  288. cl' = cl{buff=""}
  289. b = buff cl
  290. in fmap (appFst b) $ lazyRecieveN' cl' (n - BS.length b)
  291. appFst :: a -> ([a], b) -> ([a], b)
  292. appFst a (l, b) = (a:l, b)
  293. -- | Recieves data until it matches the argument.
  294. -- Returns all of it, including the matching data.
  295. recieveTill :: MonadIO m => ByteString -> Streamline m ByteString
  296. recieveTill t = LBS.toStrict <$> recieveTill' t
  297. -- | Lazy version of recieveTill
  298. recieveTill' :: MonadIO m => ByteString -> Streamline m LBS.ByteString
  299. recieveTill' t = recieve . BS.unpack $ t
  300. where
  301. recieve t' = scan' [] (textScanner t')
  302. -- | Wraps the streamlined IO target on TLS, streamlining
  303. -- the new wrapper afterwads.
  304. startTls :: MonadIO m => TlsSettings -> Streamline m ()
  305. startTls st = Streamline $ \cl -> do
  306. ds' <- liftIO $ S.startTls st $ str cl
  307. return ((), cl{str=SomeIO ds'}{buff=""})
  308. -- | Runs an Attoparsec parser over the data read from the
  309. -- streamlined IO target. Returns both the parser
  310. -- result and the string consumed by it.
  311. runAttoparsecAndReturn :: MonadIO m => A.Parser a -> Streamline m (ByteString, Either String a)
  312. runAttoparsecAndReturn p = Streamline $ \cl ->
  313. if isEOF cl
  314. then eofError "System.IO.Uniform.Streamline.runAttoparsecAndReturn"
  315. else do
  316. let c = A.parse p $ buff cl
  317. (cl', i, a) <- liftIO $ continueResult cl c
  318. return ((i, a), cl')
  319. where
  320. continueResult :: StreamlineState -> A.Result a -> IO (StreamlineState, ByteString, (Either String a))
  321. -- tx eof ds
  322. continueResult cl c = case c of
  323. A.Fail i _ msg -> return (cl{buff=i}, BS.take (BS.length (buff cl) - BS.length i) (buff cl), Left msg)
  324. A.Done i r -> return (cl{buff=i}, BS.take (BS.length (buff cl) - BS.length i) (buff cl), Right r)
  325. A.Partial c' -> do
  326. d <- readF cl
  327. let cl' = cl{buff=BS.append (buff cl) d}{isEOF=BS.null d}
  328. continueResult cl' (c' d)
  329. -- | Runs an Attoparsec parser over the data read from the
  330. -- streamlined IO target. Returning the parser result.
  331. runAttoparsec :: MonadIO m => A.Parser a -> Streamline m (Either String a)
  332. runAttoparsec p = Streamline $ \cl ->
  333. if isEOF cl
  334. then eofError "System.IO.Uniform.Streamline.runAttoparsec"
  335. else do
  336. let c = A.parse p $ buff cl
  337. (cl', a) <- liftIO $ continueResult cl c
  338. return (a, cl')
  339. where
  340. continueResult :: StreamlineState -> A.Result a -> IO (StreamlineState, (Either String a))
  341. continueResult cl c = case c of
  342. A.Fail i _ msg -> return (cl{buff=i}, Left msg)
  343. A.Done i r -> return (cl{buff=i}, Right r)
  344. A.Partial c' -> do
  345. d <- readF cl
  346. let eof' = BS.null d
  347. continueResult cl{buff=d}{isEOF=eof'} (c' d)
  348. -- | Indicates whether transport layer security is being used.
  349. isSecure :: Monad m => Streamline m Bool
  350. isSecure = Streamline $ \cl -> return (S.isSecure $ str cl, cl)
  351. -- | Sets the timeout for the streamlined IO target.
  352. setTimeout :: Monad m => Int -> Streamline m ()
  353. setTimeout t = Streamline $ \cl -> return ((), cl{timeout=t})
  354. -- | Sets echo of the streamlines IO target.
  355. -- If echo is set, all the data read an written to the target
  356. -- will be echoed in stdout, with ">" and "<" markers indicating
  357. -- what is read and written.
  358. setEcho :: Monad m => Bool -> Streamline m ()
  359. setEcho e = Streamline $ \cl ->
  360. if e then return ((), cl{echo=Just stdout}) else return ((), cl{echo=Nothing})
  361. {- |
  362. Sets echo of the streamlined IO target.
  363. If echo is set, all the data read an written to the target
  364. will be echoed to the handle, with ">" and "<" markers indicating
  365. what is read and written.
  366. Setting to Nothing will disable echo.
  367. -}
  368. echoTo :: Monad m => Maybe Handle -> Streamline m ()
  369. echoTo h = Streamline $ \cl -> return ((), cl{echo=h})
  370. lineWithEol :: A.Parser (ByteString, ByteString)
  371. lineWithEol = do
  372. l <- A.scan False lineScanner
  373. r <- A.takeByteString
  374. return (l, r)
  375. eofError :: MonadIO m => String -> m a
  376. eofError msg = liftIO . ioError $ mkIOError eofErrorType msg Nothing Nothing
  377. lineScanner :: Bool -> Word8 -> Maybe Bool
  378. lineScanner False c
  379. | c == (fromIntegral . C.ord $ '\n') = Just True
  380. | otherwise = Just False
  381. lineScanner True _ = Nothing
  382. {- |
  383. Closes the target of a streamline state, releasing any used resource.
  384. -}
  385. close :: MonadIO m => StreamlineState -> m ()
  386. close st = liftIO . S.uClose . str $ st
  387. {- |
  388. Retrieves the remaining contents of a streamline state, closing it afterwards.
  389. -}
  390. remaining :: MonadIO m => StreamlineState -> m LBS.ByteString
  391. remaining st =
  392. if isEOF st then close st >> return LBS.empty
  393. else
  394. if BS.null . buff $ st
  395. then do
  396. dt <- readF st
  397. remaining st{buff=dt}{isEOF=BS.null dt}
  398. else do
  399. dt' <- remaining st{buff=BS.empty}
  400. return $ LBS.append (LBS.fromStrict . buff $ st) dt'