Streamline.hs 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405
  1. {-# LANGUAGE OverloadedStrings, TypeFamilies, MultiParamTypeClasses, FlexibleInstances, UndecidableInstances #-}
  2. {- |
  3. Streamline exports a monad that, given an uniform IO target, emulates
  4. character stream IO using high performance block IO.
  5. -}
  6. module System.IO.Uniform.Streamline (
  7. -- * Basic Type
  8. Streamline,
  9. -- * Running streamline targets
  10. -- ** Single pass runners
  11. withClient,
  12. withServer,
  13. withTarget,
  14. -- ** Interruptible support
  15. inStreamlineCtx,
  16. peelStreamlineCtx,
  17. closeTarget,
  18. -- * Sending and recieving data
  19. send,
  20. send',
  21. recieveLine,
  22. recieveLine',
  23. recieveN,
  24. recieveN',
  25. -- ** Running a parser
  26. runAttoparsec,
  27. runAttoparsecAndReturn,
  28. -- ** Scanning the input
  29. runScanner,
  30. runScanner',
  31. scan,
  32. scan',
  33. recieveTill,
  34. recieveTill',
  35. -- * Behavior settings
  36. startTls,
  37. isSecure,
  38. transformTarget,
  39. limitInput,
  40. echoTo,
  41. setEcho
  42. ) where
  43. import System.IO (stdout, Handle)
  44. import qualified System.IO.Uniform as S
  45. import qualified System.IO.Uniform.Network as N
  46. import qualified System.IO.Uniform.Std as Std
  47. import System.IO.Uniform (UniformIO, SomeIO(..), TlsSettings)
  48. import System.IO.Uniform.Streamline.Scanner
  49. import Data.Default.Class
  50. import Control.Monad.Trans.Class
  51. import Control.Monad.Trans.Interruptible
  52. import Control.Monad.Trans.Control
  53. import Control.Monad
  54. import Control.Monad.Base
  55. import Control.Monad.IO.Class
  56. import System.IO.Error
  57. import Data.ByteString (ByteString)
  58. import qualified Data.ByteString as BS
  59. import qualified Data.ByteString.Lazy as LBS
  60. import Data.Word8 (Word8)
  61. import Data.IP (IP)
  62. import qualified Data.Attoparsec.ByteString as A
  63. -- | Internal state for a Streamline monad
  64. data StreamlineState = StreamlineState {str :: SomeIO, buff :: ByteString, isEOF :: Bool, echo :: Maybe Handle, inLimit :: Int, sentEmpty :: Bool}
  65. instance Default StreamlineState where
  66. -- | Will open StdIO
  67. def = StreamlineState (SomeIO Std.StdIO) BS.empty False Nothing (-1) False
  68. -- | Monad that emulates character stream IO over block IO.
  69. newtype Streamline m a = Streamline {withTarget' :: StreamlineState -> m (a, StreamlineState)}
  70. blockSize :: Int
  71. blockSize = 4096
  72. readF :: MonadIO m => Streamline m ()
  73. readF = -- Must try just not to read more than the limit, actual limiting is done by takeBuff
  74. Streamline $ \cl -> if not . BS.null . buff $ cl then return ((), cl)
  75. else do
  76. let lim = inLimit cl
  77. sz = if lim < 0 then blockSize
  78. else if lim <= blockSize then lim
  79. else blockSize
  80. l <- liftIO $ S.uRead (str cl) sz
  81. let cl' = cl{buff= l}
  82. case echo cl of
  83. Just h -> do
  84. liftIO $ BS.hPutStr h "< "
  85. liftIO $ BS.hPutStr h l
  86. Nothing -> return ()
  87. return ((), cl')
  88. -- | Takes the buffer for processing
  89. takeBuff :: MonadIO m => Streamline m ByteString
  90. takeBuff = do
  91. readF
  92. Streamline $ \cl ->
  93. let lim = inLimit cl
  94. eof = isEOF cl
  95. b = buff cl
  96. in if eof then eofError "System.IO.Uniform.Streamline"
  97. else if lim < 0 then return (b, cl{buff="", isEOF=BS.null b})
  98. else let (r, b') = BS.splitAt lim b
  99. in return (r, cl{
  100. -- EOF is at the real end of file, not on limited input
  101. isEOF = lim /= 0 && (BS.null b || sentEmpty cl),
  102. sentEmpty = BS.null r,
  103. buff = b',
  104. inLimit = lim - BS.length r
  105. })
  106. -- | Pushes remaining data back into the buffer
  107. pushBuff :: Monad m => ByteString -> Streamline m ()
  108. pushBuff dt = Streamline $ \cl -> let
  109. lim = inLimit cl
  110. b = buff cl
  111. newb = BS.append dt b
  112. newl = if lim < 0 then lim else lim + BS.length dt
  113. in return ((), cl{buff=newb, inLimit=newl})
  114. writeF :: MonadIO m => ByteString -> Streamline m ()
  115. writeF l = Streamline $ \cl -> case echo cl of
  116. Just h -> do
  117. liftIO $ BS.hPutStr h "> "
  118. liftIO $ BS.hPutStr h l
  119. liftIO $ S.uPut (str cl) l
  120. return ((), cl)
  121. Nothing -> liftIO $ S.uPut (str cl) l >> return ((), cl)
  122. -- | > withServer f serverIP port
  123. --
  124. -- Connects to the given server port, runs f, and closes the connection.
  125. withServer :: MonadIO m => IP -> Int -> Streamline m a -> m a
  126. withServer host port f = do
  127. ds <- liftIO $ N.connectTo host port
  128. (ret, _) <- withTarget' f def{str=SomeIO ds}
  129. liftIO $ S.uClose ds
  130. return ret
  131. -- | > withClient f boundPort
  132. --
  133. -- Accepts a connection at the bound port, runs f and closes the connection.
  134. withClient :: MonadIO m => N.BoundedPort -> (IP -> Int -> Streamline m a) -> m a
  135. withClient port f = do
  136. ds <- liftIO $ N.accept port
  137. (peerIp, peerPort) <- liftIO $ N.getPeer ds
  138. (ret, _) <- withTarget' (f peerIp peerPort) def{str=SomeIO ds}
  139. liftIO $ S.uClose ds
  140. return ret
  141. {- |
  142. > withTarget f someIO
  143. Runs f wrapped on a Streamline monad that does IO on someIO.
  144. -}
  145. withTarget :: (Monad m, UniformIO a) => a -> Streamline m b -> m b
  146. withTarget s f = do
  147. (r, _) <- withTarget' f def{str=SomeIO s}
  148. return r
  149. instance Monad m => Monad (Streamline m) where
  150. --return :: (Monad m) => a -> Streamline m a
  151. return x = Streamline $ \cl -> return (x, cl)
  152. --(>>=) :: Monad m => Streamline m a -> (a -> Streamline m b) -> Streamline m b
  153. a >>= b = Streamline $ \cl -> do
  154. (x, cl') <- withTarget' a cl
  155. withTarget' (b x) cl'
  156. instance Monad m => Functor (Streamline m) where
  157. --fmap :: (a -> b) -> Streamline m a -> Streamline m b
  158. fmap f m = Streamline $ \cl -> do
  159. (x, cl') <- withTarget' m cl
  160. return (f x, cl')
  161. instance Monad m => Applicative (Streamline m) where
  162. pure = return
  163. (<*>) = ap
  164. instance MonadTrans Streamline where
  165. --lift :: Monad m => m a -> Streamline m a
  166. lift x = Streamline $ \cl -> do
  167. a <- x
  168. return (a, cl)
  169. instance MonadIO m => MonadIO (Streamline m) where
  170. liftIO = lift . liftIO
  171. -- | Sends data over the IO target.
  172. send :: MonadIO m => ByteString -> Streamline m ()
  173. send r = writeF r
  174. -- | Sends data from a lazy byte string
  175. send' :: MonadIO m => LBS.ByteString -> Streamline m ()
  176. send' r = do
  177. let dd = LBS.toChunks r
  178. mapM_ writeF dd
  179. {- |
  180. Very much like Attoparsec's runScanner:
  181. > runScanner scanner initial_state
  182. Recieves data, running the scanner on each byte,
  183. using the scanner result as initial state for the
  184. next byte, and stopping when the scanner returns
  185. Nothing.
  186. Returns the scanned ByteString.
  187. -}
  188. runScanner :: MonadIO m => s -> IOScanner s -> Streamline m (ByteString, s)
  189. runScanner state scanner = do
  190. (rt, st) <- runScanner' state scanner
  191. return (LBS.toStrict rt, st)
  192. -- | Equivalent to runScanner, but returns a lazy ByteString
  193. runScanner' :: MonadIO m => s -> IOScanner s -> Streamline m (LBS.ByteString, s)
  194. runScanner' state scanner = do
  195. (tx, st) <- in_scan state
  196. return (LBS.fromChunks tx, st)
  197. where
  198. --in_scan :: MonadIO m => s -> Streamline m ([ByteString], s)
  199. in_scan st = do
  200. d <- takeBuff
  201. if BS.null d then return ([], st)
  202. else case sscan scanner st 0 $ BS.unpack d of
  203. AllInput st' -> do
  204. (tx', st'') <- in_scan st'
  205. return (d:tx', st'')
  206. SplitAt n st' -> do
  207. let (r, i) = BS.splitAt n d
  208. pushBuff i
  209. return ([r], st')
  210. -- I'll avoid rebuilding a list on high level code. The ByteString functions are way better.
  211. sscan :: (s -> Word8 -> IOScannerState s) -> s -> Int -> [Word8] -> ScanResult s
  212. sscan _ s0 _ [] = AllInput s0
  213. sscan s s0 i (w:ww) = case s s0 w of
  214. Finished -> SplitAt i s0
  215. LastPass s1 -> SplitAt (i+1) s1
  216. Running s1 -> sscan s s1 (i+1) ww
  217. data ScanResult s = SplitAt Int s | AllInput s
  218. -- | Equivalent to runScanner, but discards the final state
  219. scan :: MonadIO m => s -> IOScanner s -> Streamline m ByteString
  220. scan state scanner = fst <$> runScanner state scanner
  221. -- | Equivalent to runScanner', but discards the final state
  222. scan' :: MonadIO m => s -> IOScanner s -> Streamline m LBS.ByteString
  223. scan' state scanner = fst <$> runScanner' state scanner
  224. -- | Recieves data untill the next end of line (\n or \r\n)
  225. recieveLine :: MonadIO m => Streamline m ByteString
  226. recieveLine = recieveTill "\n"
  227. -- | Lazy version of recieveLine
  228. recieveLine' :: MonadIO m => Streamline m LBS.ByteString
  229. recieveLine' = recieveTill' "\n"
  230. {- |
  231. Recieves the given number of bytes, or less in case of end of file.
  232. -}
  233. recieveN :: MonadIO m => Int -> Streamline m ByteString
  234. recieveN n = LBS.toStrict <$> recieveN' n
  235. -- | Lazy version of recieveN
  236. recieveN' :: MonadIO m => Int -> Streamline m LBS.ByteString
  237. recieveN' n = LBS.fromChunks <$> recieve n
  238. where
  239. recieve sz
  240. | sz <= 0 = return []
  241. | otherwise = do
  242. d <- takeBuff
  243. if BS.null d then return []
  244. else do
  245. let (h, t) = BS.splitAt sz d
  246. sz' = sz - BS.length h
  247. unless (BS.null t) $ pushBuff t
  248. r <- recieve sz'
  249. return $ h : r
  250. -- | Recieves data until it matches the argument.
  251. -- Returns all of it, including the matching data.
  252. recieveTill :: MonadIO m => ByteString -> Streamline m ByteString
  253. recieveTill t = LBS.toStrict <$> recieveTill' t
  254. -- | Lazy version of recieveTill
  255. recieveTill' :: MonadIO m => ByteString -> Streamline m LBS.ByteString
  256. recieveTill' = recieve . BS.unpack
  257. where
  258. recieve t' = scan' [] (textScanner t')
  259. -- | Wraps the streamlined IO target on TLS, streamlining
  260. -- the new wrapper afterwads.
  261. startTls :: MonadIO m => TlsSettings -> Streamline m ()
  262. startTls st = Streamline $ \cl -> do
  263. ds' <- liftIO $ S.startTls st $ str cl
  264. return ((), cl{str=SomeIO ds'}{buff=""})
  265. -- | Runs an Attoparsec parser over the data read from the
  266. -- streamlined IO target. Returns both the parser
  267. -- result and the string consumed by it.
  268. runAttoparsecAndReturn :: MonadIO m => A.Parser a -> Streamline m (ByteString, Either String a)
  269. runAttoparsecAndReturn p = do
  270. d <- takeBuff
  271. let c = A.parse p d
  272. continueResult c d [d]
  273. where
  274. continueResult c d dd = case c of
  275. A.Fail i _ msg -> do
  276. pushBuff $ BS.concat (reverse dd) `BS.append` i
  277. return (BS.take (BS.length d - BS.length i) d, Left msg)
  278. A.Done i r -> do
  279. pushBuff i
  280. return (BS.concat (reverse dd) `BS.append`
  281. BS.take (BS.length d - BS.length i) d,
  282. Right r)
  283. A.Partial c' -> do
  284. dt <- takeBuff
  285. continueResult (c' dt) dt $ dt:dd
  286. -- | Runs an Attoparsec parser over the data read from the
  287. -- streamlined IO target. Returning the parser result.
  288. runAttoparsec :: MonadIO m => A.Parser a -> Streamline m (Either String a)
  289. runAttoparsec p = snd <$> runAttoparsecAndReturn p
  290. -- | Indicates whether transport layer security is being used.
  291. isSecure :: Monad m => Streamline m Bool
  292. isSecure = Streamline $ \cl -> return (S.isSecure $ str cl, cl)
  293. -- | Sets echo of the streamlines IO target.
  294. -- If echo is set, all the data read an written to the target
  295. -- will be echoed in stdout, with ">" and "<" markers indicating
  296. -- what is read and written.
  297. setEcho :: Monad m => Bool -> Streamline m ()
  298. setEcho e = Streamline $ \cl ->
  299. if e then return ((), cl{echo=Just stdout}) else return ((), cl{echo=Nothing})
  300. {- |
  301. Replaces the enclosed target with the result of the given transformation.
  302. Discards all buffered data in the process.
  303. -}
  304. transformTarget :: (UniformIO a, Monad m) => (SomeIO -> a) -> Streamline m ()
  305. transformTarget w = Streamline $ \cl -> return ((), cl{str = SomeIO . w . str $ cl})
  306. {- |
  307. Limits the input to the given number of bytes, emulating an end of file after them.
  308. If the limit is negative, the input will not be limited.
  309. -}
  310. limitInput :: Monad m => Int -> Streamline m ()
  311. limitInput n = Streamline $ \cl -> return ((), cl{inLimit = n})
  312. {- |
  313. Sets echo of the streamlined IO target.
  314. If echo is set, all the data read an written to the target
  315. will be echoed to the handle, with ">" and "<" markers indicating
  316. what is read and written.
  317. Setting to Nothing will disable echo.
  318. -}
  319. echoTo :: Monad m => Maybe Handle -> Streamline m ()
  320. echoTo h = Streamline $ \cl -> return ((), cl{echo=h})
  321. eofError :: MonadIO m => String -> m a
  322. eofError msg = liftIO . ioError $ mkIOError eofErrorType msg Nothing Nothing
  323. instance Interruptible Streamline where
  324. type RSt Streamline a = (a, StreamlineState)
  325. resume f (a, st) = withTarget' (f a) st
  326. -- | Creates a Streamline interrutible context
  327. inStreamlineCtx :: UniformIO io => io -> a -> RSt Streamline a
  328. inStreamlineCtx io a = (a, def{str = SomeIO io})
  329. -- | Closes the target of a streamline state, releasing any resource.
  330. closeTarget :: MonadIO m => Streamline m ()
  331. closeTarget = Streamline $ \st -> do
  332. liftIO . S.uClose . str $ st
  333. return ((), st)
  334. -- | Removes a Streamline interruptible context
  335. peelStreamlineCtx :: RSt Streamline a -> (a, SomeIO)
  336. peelStreamlineCtx (a, dt) = (a, str dt)
  337. instance MonadTransControl Streamline where
  338. type StT Streamline a = (a, StreamlineState)
  339. liftWith f = Streamline $ \s ->
  340. liftM (\x -> (x, s))
  341. (f $ \t -> withTarget' t s)
  342. restoreT = Streamline . const
  343. instance MonadBase b m => MonadBase b (Streamline m) where
  344. liftBase = liftBaseDefault
  345. instance MonadBaseControl b m => MonadBaseControl b (Streamline m) where
  346. type StM (Streamline m) a = ComposeSt Streamline m a
  347. liftBaseWith = defaultLiftBaseWith
  348. restoreM = defaultRestoreM