Streamline.hs 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456
  1. {-# LANGUAGE OverloadedStrings #-}
{- |
Streamline exports a monad that, given a uniform IO target, emulates
character stream IO using high performance block IO.
-}
  6. module System.IO.Uniform.Streamline (
  7. -- * Basic Type
  8. Streamline,
  9. -- * Running streamline targets
  10. -- ** Single pass runners
  11. withClient,
  12. withServer,
  13. withTarget,
  14. -- ** Several pass runner
  15. StreamlineState,
  16. streamline,
  17. resume,
  18. close,
  19. remaining,
-- * Sending and receiving data
  21. send,
  22. send',
  23. recieveLine,
  24. recieveLine',
  25. recieveN,
  26. recieveN',
  27. -- ** Running a parser
  28. runAttoparsec,
  29. runAttoparsecAndReturn,
  30. -- ** Scanning the input
  31. runScanner,
  32. runScanner',
  33. scan,
  34. scan',
  35. recieveTill,
  36. recieveTill',
  37. -- ** Deprecated functions
  38. lazyRecieveLine,
  39. lazyRecieveN,
  40. -- * Behavior settings
  41. startTls,
  42. isSecure,
  43. setTimeout,
  44. echoTo,
  45. setEcho
  46. ) where
  47. import System.IO (stdout, Handle)
  48. import qualified System.IO.Uniform as S
  49. import qualified System.IO.Uniform.Network as N
  50. import qualified System.IO.Uniform.Std as Std
  51. import System.IO.Uniform (UniformIO, SomeIO(..), TlsSettings)
  52. import System.IO.Uniform.Streamline.Scanner
  53. import Data.Default.Class
  54. import Control.Monad.Trans.Class
  55. import Control.Applicative
  56. import Control.Monad (ap)
  57. import Control.Monad.IO.Class
  58. import System.IO.Error
  59. import Data.ByteString (ByteString)
  60. import qualified Data.ByteString as BS
  61. import qualified Data.ByteString.Lazy as LBS
  62. import Data.Word8 (Word8)
  63. import Data.IP (IP)
  64. import qualified Data.Char as C
  65. import qualified Data.Attoparsec.ByteString as A
  66. -- | Internal state for a Streamline monad
  67. data StreamlineState = StreamlineState {str :: SomeIO, timeout :: Int, buff :: ByteString, isEOF :: Bool, echo :: Maybe Handle}
  68. instance Default StreamlineState where
  69. -- | Will open StdIO
  70. def = StreamlineState (SomeIO Std.StdIO) defaultTimeout BS.empty False Nothing
  71. -- | Monad that emulates character stream IO over block IO.
  72. newtype Streamline m a = Streamline {withTarget' :: StreamlineState -> m (a, StreamlineState)}
  73. blockSize :: Int
  74. blockSize = 4096
  75. defaultTimeout :: Int
  76. defaultTimeout = 1000000 * 600
  77. readF :: MonadIO m => StreamlineState -> m ByteString
  78. readF cl = case echo cl of
  79. Just h -> do
  80. l <- liftIO $ S.uRead (str cl) blockSize
  81. liftIO $ BS.hPutStr h "<"
  82. liftIO $ BS.hPutStr h l
  83. return l
  84. Nothing -> liftIO $ S.uRead (str cl) blockSize
  85. writeF :: MonadIO m => StreamlineState -> ByteString -> m ()
  86. writeF cl l = case echo cl of
  87. Just h -> do
  88. liftIO $ BS.hPutStr h ">"
  89. liftIO $ BS.hPutStr h l
  90. liftIO $ S.uPut (str cl) l
  91. Nothing -> liftIO $ S.uPut (str cl) l
  92. -- | withServer f serverIP port
  93. --
  94. -- Connects to the given server port, runs f, and closes the connection.
  95. withServer :: MonadIO m => IP -> Int -> Streamline m a -> m a
  96. withServer host port f = do
  97. ds <- liftIO $ N.connectTo host port
  98. (ret, _) <- withTarget' f def{str=SomeIO ds}
  99. liftIO $ S.uClose ds
  100. return ret
  101. -- | withClient f boundPort
  102. --
  103. -- Accepts a connection at the bound port, runs f and closes the connection.
  104. withClient :: MonadIO m => N.BoundedPort -> (IP -> Int -> Streamline m a) -> m a
  105. withClient port f = do
  106. ds <- liftIO $ N.accept port
  107. (peerIp, peerPort) <- liftIO $ N.getPeer ds
  108. (ret, _) <- withTarget' (f peerIp peerPort) def{str=SomeIO ds}
  109. liftIO $ S.uClose ds
  110. return ret
  111. {- |
  112. withTarget f someIO
  113. Runs f wrapped on a Streamline monad that does IO on someIO.
  114. -}
  115. withTarget :: (Monad m, UniformIO a) => a -> Streamline m b -> m b
  116. withTarget s f = do
  117. (r, _) <- withTarget' f def{str=SomeIO s}
  118. return r
  119. {- |
  120. Run f wrapped on a Streamline monad, returning the final state in a way that
  121. can be continued with "resume".
  122. If run with this function, the state must be closed, explicitly with "close" or
  123. implicitly with "remaining".
  124. -}
  125. streamline :: (Monad m, UniformIO a) => a -> Streamline m b -> m (b, StreamlineState)
  126. streamline s f = withTarget' f def{str=SomeIO s}
  127. {- |
  128. Continues the execution of functions on a Streamline monad comming from
  129. "start" or another "resume" call.
  130. -}
  131. resume :: Monad m => StreamlineState -> Streamline m b -> m (b, StreamlineState)
  132. resume dt f = withTarget' f dt
  133. instance Monad m => Monad (Streamline m) where
  134. --return :: (Monad m) => a -> Streamline m a
  135. return x = Streamline $ \cl -> return (x, cl)
  136. --(>>=) :: Monad m => Streamline m a -> (a -> Streamline m b) -> Streamline m b
  137. a >>= b = Streamline $ \cl -> do
  138. (x, cl') <- withTarget' a cl
  139. withTarget' (b x) cl'
  140. instance Monad m => Functor (Streamline m) where
  141. --fmap :: (a -> b) -> Streamline m a -> Streamline m b
  142. fmap f m = Streamline $ \cl -> do
  143. (x, cl') <- withTarget' m cl
  144. return (f x, cl')
  145. instance (Functor m, Monad m) => Applicative (Streamline m) where
  146. pure = return
  147. (<*>) = ap
  148. instance MonadTrans Streamline where
  149. --lift :: Monad m => m a -> Streamline m a
  150. lift x = Streamline $ \cl -> do
  151. a <- x
  152. return (a, cl)
  153. instance MonadIO m => MonadIO (Streamline m) where
  154. liftIO = lift . liftIO
  155. -- | Sends data over the IO target.
  156. send :: MonadIO m => ByteString -> Streamline m ()
  157. send r = Streamline $ \cl -> do
  158. writeF cl r
  159. return ((), cl)
  160. -- | Sends data from a lazy byte string
  161. send' :: MonadIO m => LBS.ByteString -> Streamline m ()
  162. send' r = Streamline $ \cl -> do
  163. let dd = LBS.toChunks r
  164. mapM (writeF cl) dd
  165. return ((), cl)
  166. {- |
  167. Very much like Attoparsec's runScanner:
  168. runScanner scanner initial_state
  169. Recieves data, running the scanner on each byte,
  170. using the scanner result as initial state for the
  171. next byte, and stopping when the scanner returns
  172. Nothing.
  173. Returns the scanned ByteString.
  174. -}
  175. runScanner :: MonadIO m => s -> IOScanner s -> Streamline m (ByteString, s)
  176. runScanner state scanner = do
  177. (rt, st) <- runScanner' state scanner
  178. return (LBS.toStrict rt, st)
  179. -- | Equivalent to runScanner, but returns a strict, completely
  180. -- evaluated ByteString.
  181. runScanner' :: MonadIO m => s -> IOScanner s -> Streamline m (LBS.ByteString, s)
  182. runScanner' state scanner = Streamline $ \d ->
  183. do
  184. (tx, st, d') <- in_scan d state
  185. return ((LBS.fromChunks tx, st), d')
  186. where
  187. --in_scan :: StreamlineState -> s -> m ([ByteString], s, StreamlineState)
  188. in_scan d st
  189. | isEOF d = eofError "System.IO.Uniform.Streamline.scan'"
  190. | BS.null (buff d) = do
  191. dt <- readF d
  192. if BS.null dt
  193. then return ([], st, d{isEOF=True})
  194. else in_scan d{buff=dt} st
  195. | otherwise = case sscan scanner st 0 (BS.unpack . buff $ d) of
  196. AllInput st' -> do
  197. (tx', st'', d') <- in_scan d{buff=""} st'
  198. return (buff d:tx', st'', d')
  199. SplitAt n st' -> let
  200. (r, i) = BS.splitAt n (buff d)
  201. in return ([r], st', d{buff=i})
  202. -- I'll avoid rebuilding a list on high level code. The ByteString functions are way better.
  203. sscan :: (s -> Word8 -> IOScannerState s) -> s -> Int -> [Word8] -> ScanResult s
  204. sscan _ s0 _ [] = AllInput s0
  205. sscan s s0 i (w:ww) = case s s0 w of
  206. Finished -> SplitAt i s0
  207. LastPass s1 -> SplitAt (i+1) s1
  208. Running s1 -> sscan s s1 (i+1) ww
  209. data ScanResult s = SplitAt Int s | AllInput s
  210. -- | Equivalent to runScanner, but discards the final state
  211. scan :: MonadIO m => s -> IOScanner s -> Streamline m ByteString
  212. scan state scanner = fst <$> runScanner state scanner
  213. -- | Equivalent to runScanner', but discards the final state
  214. scan' :: MonadIO m => s -> IOScanner s -> Streamline m LBS.ByteString
  215. scan' state scanner = fst <$> runScanner' state scanner
  216. -- | Recieves data untill the next end of line (\n or \r\n)
  217. recieveLine :: MonadIO m => Streamline m ByteString
  218. recieveLine = recieveTill "\n"
  219. -- | Lazy version of recieveLine
  220. recieveLine' :: MonadIO m => Streamline m LBS.ByteString
  221. recieveLine' = recieveTill' "\n"
  222. -- | Use recieveLine'.
  223. lazyRecieveLine :: MonadIO m => Streamline m [ByteString]
  224. {-# DEPRECATED #-}
  225. lazyRecieveLine = Streamline $ \cl -> lazyRecieveLine' cl
  226. where
  227. lazyRecieveLine' :: MonadIO m => StreamlineState -> m ([ByteString], StreamlineState)
  228. lazyRecieveLine' cl' =
  229. if isEOF cl'
  230. then eofError "System.IO.Uniform.Streamline.lazyRecieveLine"
  231. else
  232. if BS.null $ buff cl'
  233. then do
  234. dt <- readF cl'
  235. lazyRecieveLine' cl'{buff=dt}{isEOF=BS.null dt}
  236. else do
  237. let l = A.parseOnly lineWithEol $ buff cl'
  238. case l of
  239. Left _ -> do
  240. l' <- readF cl'
  241. (ret, cl'') <- lazyRecieveLine' cl'{buff=l'}{isEOF=BS.null l'}
  242. return ((buff cl') : ret, cl'')
  243. Right (ret, dt) -> return ([ret], cl'{buff=dt})
  244. -- | Recieves the given number of bytes.
  245. recieveN :: MonadIO m => Int -> Streamline m ByteString
  246. recieveN n = LBS.toStrict <$> recieveN' n
  247. -- | Lazy version of recieveN
  248. recieveN' :: MonadIO m => Int -> Streamline m LBS.ByteString
  249. recieveN' n | n <= 0 = return ""
  250. | otherwise = Streamline $ \cl ->
  251. do
  252. (tt, cl') <- recieve cl n
  253. return (LBS.fromChunks tt, cl')
  254. where
  255. recieve d b
  256. | isEOF d = eofError "System.IO.Uniform.Streamline.lazyRecieveN"
  257. | BS.null . buff $ d = do
  258. dt <- readF d
  259. recieve d{buff=dt}{isEOF=BS.null dt} b
  260. | b <= (BS.length . buff $ d) = let
  261. (r, dt) = BS.splitAt b $ buff d
  262. in return ([r], d{buff=dt})
  263. | otherwise = do
  264. (r, d') <- recieve d{buff=""} $ b - (BS.length . buff $ d)
  265. return (buff d : r, d')
  266. -- | Use recieveN'.
  267. lazyRecieveN :: (Functor m, MonadIO m) => Int -> Streamline m [ByteString]
  268. {-# DEPRECATED #-}
  269. lazyRecieveN n' = Streamline $ \cl' -> lazyRecieveN' cl' n'
  270. where
  271. lazyRecieveN' :: (Functor m, MonadIO m) => StreamlineState -> Int -> m ([ByteString], StreamlineState)
  272. lazyRecieveN' cl n =
  273. if isEOF cl
  274. then eofError "System.IO.Uniform.Streamline.lazyRecieveN"
  275. else
  276. if BS.null (buff cl)
  277. then do
  278. b <- readF cl
  279. let eof = BS.null b
  280. let cl' = cl{buff=b}{isEOF=eof}
  281. lazyRecieveN' cl' n
  282. else
  283. if n <= BS.length (buff cl)
  284. then let
  285. ret = [BS.take n (buff cl)]
  286. buff' = BS.drop n (buff cl)
  287. in return (ret, cl{buff=buff'})
  288. else let
  289. cl' = cl{buff=""}
  290. b = buff cl
  291. in fmap (appFst b) $ lazyRecieveN' cl' (n - BS.length b)
  292. appFst :: a -> ([a], b) -> ([a], b)
  293. appFst a (l, b) = (a:l, b)
  294. -- | Recieves data until it matches the argument.
  295. -- Returns all of it, including the matching data.
  296. recieveTill :: MonadIO m => ByteString -> Streamline m ByteString
  297. recieveTill t = LBS.toStrict <$> recieveTill' t
  298. -- | Lazy version of recieveTill
  299. recieveTill' :: MonadIO m => ByteString -> Streamline m LBS.ByteString
  300. recieveTill' t = recieve . BS.unpack $ t
  301. where
  302. recieve t' = scan' [] (textScanner t')
  303. -- | Wraps the streamlined IO target on TLS, streamlining
  304. -- the new wrapper afterwads.
  305. startTls :: MonadIO m => TlsSettings -> Streamline m ()
  306. startTls st = Streamline $ \cl -> do
  307. ds' <- liftIO $ S.startTls st $ str cl
  308. return ((), cl{str=SomeIO ds'}{buff=""})
  309. -- | Runs an Attoparsec parser over the data read from the
  310. -- streamlined IO target. Returns both the parser
  311. -- result and the string consumed by it.
  312. runAttoparsecAndReturn :: MonadIO m => A.Parser a -> Streamline m (ByteString, Either String a)
  313. runAttoparsecAndReturn p = Streamline $ \cl ->
  314. if isEOF cl
  315. then eofError "System.IO.Uniform.Streamline.runAttoparsecAndReturn"
  316. else do
  317. let c = A.parse p $ buff cl
  318. (cl', i, a) <- liftIO $ continueResult cl c
  319. return ((i, a), cl')
  320. where
  321. continueResult :: StreamlineState -> A.Result a -> IO (StreamlineState, ByteString, (Either String a))
  322. -- tx eof ds
  323. continueResult cl c = case c of
  324. A.Fail i _ msg -> return (cl{buff=i}, BS.take (BS.length (buff cl) - BS.length i) (buff cl), Left msg)
  325. A.Done i r -> return (cl{buff=i}, BS.take (BS.length (buff cl) - BS.length i) (buff cl), Right r)
  326. A.Partial c' -> do
  327. d <- readF cl
  328. let cl' = cl{buff=BS.append (buff cl) d}{isEOF=BS.null d}
  329. continueResult cl' (c' d)
  330. -- | Runs an Attoparsec parser over the data read from the
  331. -- streamlined IO target. Returning the parser result.
  332. runAttoparsec :: MonadIO m => A.Parser a -> Streamline m (Either String a)
  333. runAttoparsec p = Streamline $ \cl ->
  334. if isEOF cl
  335. then eofError "System.IO.Uniform.Streamline.runAttoparsec"
  336. else do
  337. let c = A.parse p $ buff cl
  338. (cl', a) <- liftIO $ continueResult cl c
  339. return (a, cl')
  340. where
  341. continueResult :: StreamlineState -> A.Result a -> IO (StreamlineState, (Either String a))
  342. continueResult cl c = case c of
  343. A.Fail i _ msg -> return (cl{buff=i}, Left msg)
  344. A.Done i r -> return (cl{buff=i}, Right r)
  345. A.Partial c' -> do
  346. d <- readF cl
  347. let eof' = BS.null d
  348. continueResult cl{buff=d}{isEOF=eof'} (c' d)
  349. -- | Indicates whether transport layer security is being used.
  350. isSecure :: Monad m => Streamline m Bool
  351. isSecure = Streamline $ \cl -> return (S.isSecure $ str cl, cl)
  352. -- | Sets the timeout for the streamlined IO target.
  353. setTimeout :: Monad m => Int -> Streamline m ()
  354. setTimeout t = Streamline $ \cl -> return ((), cl{timeout=t})
  355. -- | Sets echo of the streamlines IO target.
  356. -- If echo is set, all the data read an written to the target
  357. -- will be echoed in stdout, with ">" and "<" markers indicating
  358. -- what is read and written.
  359. setEcho :: Monad m => Bool -> Streamline m ()
  360. setEcho e = Streamline $ \cl ->
  361. if e then return ((), cl{echo=Just stdout}) else return ((), cl{echo=Nothing})
  362. {- |
  363. Sets echo of the streamlined IO target.
  364. If echo is set, all the data read an written to the target
  365. will be echoed to the handle, with ">" and "<" markers indicating
  366. what is read and written.
  367. Setting to Nothing will disable echo.
  368. -}
  369. echoTo :: Monad m => Maybe Handle -> Streamline m ()
  370. echoTo h = Streamline $ \cl -> return ((), cl{echo=h})
  371. lineWithEol :: A.Parser (ByteString, ByteString)
  372. lineWithEol = do
  373. l <- A.scan False lineScanner
  374. r <- A.takeByteString
  375. return (l, r)
  376. eofError :: MonadIO m => String -> m a
  377. eofError msg = liftIO . ioError $ mkIOError eofErrorType msg Nothing Nothing
  378. lineScanner :: Bool -> Word8 -> Maybe Bool
  379. lineScanner False c
  380. | c == (fromIntegral . C.ord $ '\n') = Just True
  381. | otherwise = Just False
  382. lineScanner True _ = Nothing
  383. {- |
  384. Closes the target of a streamline state, releasing any used resource.
  385. -}
  386. close :: MonadIO m => StreamlineState -> m ()
  387. close st = liftIO . S.uClose . str $ st
  388. {- |
  389. Retrieves the remaining contents of a streamline state, closing it afterwards.
  390. -}
  391. remaining :: MonadIO m => StreamlineState -> m LBS.ByteString
  392. remaining st =
  393. if isEOF st then close st >> return LBS.empty
  394. else
  395. if BS.null . buff $ st
  396. then do
  397. dt <- readF st
  398. remaining st{buff=dt}{isEOF=BS.null dt}
  399. else do
  400. dt' <- remaining st{buff=BS.empty}
  401. return $ LBS.append (LBS.fromStrict . buff $ st) dt'