Streamline.hs 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418
  1. {-# LANGUAGE OverloadedStrings, TypeFamilies, MultiParamTypeClasses, FlexibleInstances, UndecidableInstances #-}
  2. {- |
  3. Streamline exports a monad that, given an uniform IO target, emulates
  4. character stream IO using high performance block IO.
  5. -}
  6. module System.IO.Uniform.Streamline (
  7. -- * Basic Type
  8. Streamline,
  9. -- * Running streamline targets
  10. -- ** Single pass runners
  11. withClient,
  12. withServer,
  13. withTarget,
  14. -- ** Interruptible support
  15. inStreamlineCtx,
  16. peelStreamlineCtx,
  17. closeTarget,
  18. -- * Sending and recieving data
  19. send,
  20. send',
  21. recieveLine,
  22. recieveLine',
  23. recieveN,
  24. recieveN',
  25. -- ** Running a parser
  26. runAttoparsec,
  27. runAttoparsecAndReturn,
  28. -- ** Scanning the input
  29. runScanner,
  30. runScanner',
  31. scan,
  32. scan',
  33. recieveTill,
  34. recieveTill',
  35. -- * Behavior changing and status
  36. startTls,
  37. isSecure,
  38. isEOF,
  39. transformTarget,
  40. limitInput,
  41. echoTo,
  42. setEcho
  43. ) where
  44. import System.IO (stdout, Handle)
  45. import qualified System.IO.Uniform as S
  46. import qualified System.IO.Uniform.Network as N
  47. import qualified System.IO.Uniform.Std as Std
  48. import System.IO.Uniform (UniformIO, SomeIO(..), TlsSettings)
  49. import System.IO.Uniform.Streamline.Scanner
  50. import Data.Default.Class
  51. import Control.Monad.Trans.Class
  52. import Control.Monad.Trans.Interruptible
  53. import Control.Monad.Trans.Control
  54. import Control.Monad
  55. import Control.Monad.Base
  56. import Control.Monad.IO.Class
  57. import System.IO.Error
  58. import Data.ByteString (ByteString)
  59. import qualified Data.ByteString as BS
  60. import qualified Data.ByteString.Lazy as LBS
  61. import Data.Word8 (Word8)
  62. import Data.IP (IP)
  63. import qualified Data.Attoparsec.ByteString as A
  64. import Debug.Trace
  65. -- | Internal state for a Streamline monad
  66. data StreamlineState = StreamlineState {str :: SomeIO, buff :: ByteString, targetEOF :: Bool, echo :: Maybe Handle, inLimit :: Int}
  67. instance Default StreamlineState where
  68. -- | Will open StdIO
  69. def = StreamlineState (SomeIO Std.StdIO) BS.empty False Nothing (-1)
  70. -- | Monad that emulates character stream IO over block IO.
  71. newtype Streamline m a = Streamline {withTarget' :: StreamlineState -> m (a, StreamlineState)}
  72. blockSize :: Int
  73. blockSize = 4096
  74. readF :: MonadIO m => Streamline m ()
  75. readF = -- Must try just not to read more than the limit, actual limiting is done by takeBuff
  76. Streamline $ \cl -> if not . BS.null . buff $ cl then return ((), cl)
  77. else do
  78. let lim = inLimit cl
  79. sz = if lim < 0 then blockSize
  80. else if lim <= blockSize then lim
  81. else blockSize
  82. l <- liftIO $ S.uRead (str cl) sz
  83. let cl' = cl{
  84. buff = l,
  85. targetEOF = BS.null l
  86. }
  87. case echo cl of
  88. Just h -> do
  89. liftIO $ BS.hPutStr h "< "
  90. liftIO $ BS.hPutStr h l
  91. Nothing -> return ()
  92. return ((), cl')
  93. -- | Takes the buffer for processing
  94. takeBuff :: MonadIO m => Streamline m ByteString
  95. takeBuff = do
  96. readF
  97. Streamline $ \cl ->
  98. let lim = inLimit cl
  99. eof = targetEOF cl
  100. b = buff cl
  101. in if lim < 0 then return (b, cl{buff=""})
  102. else let (r, b') = BS.splitAt lim b
  103. in return (r, cl{
  104. buff = b',
  105. inLimit = lim - BS.length r
  106. })
  107. -- | Pushes remaining data back into the buffer
  108. pushBuff :: Monad m => ByteString -> Streamline m ()
  109. pushBuff dt = Streamline $ \cl -> let
  110. lim = inLimit cl
  111. b = buff cl
  112. newb = BS.append dt b
  113. newl = if lim < 0 then lim else lim + BS.length dt
  114. in return ((), cl{buff=newb, inLimit=newl})
  115. writeF :: MonadIO m => ByteString -> Streamline m ()
  116. writeF l = Streamline $ \cl -> case echo cl of
  117. Just h -> do
  118. liftIO $ BS.hPutStr h "> "
  119. liftIO $ BS.hPutStr h l
  120. liftIO $ S.uPut (str cl) l
  121. return ((), cl)
  122. Nothing -> liftIO $ S.uPut (str cl) l >> return ((), cl)
  123. -- | > withServer f serverIP port
  124. --
  125. -- Connects to the given server port, runs f, and closes the connection.
  126. withServer :: MonadIO m => IP -> Int -> Streamline m a -> m a
  127. withServer host port f = do
  128. ds <- liftIO $ N.connectTo host port
  129. (ret, _) <- withTarget' f def{str=SomeIO ds}
  130. liftIO $ S.uClose ds
  131. return ret
  132. -- | > withClient f boundPort
  133. --
  134. -- Accepts a connection at the bound port, runs f and closes the connection.
  135. withClient :: MonadIO m => N.BoundedPort -> (IP -> Int -> Streamline m a) -> m a
  136. withClient port f = do
  137. ds <- liftIO $ N.accept port
  138. (peerIp, peerPort) <- liftIO $ N.getPeer ds
  139. (ret, _) <- withTarget' (f peerIp peerPort) def{str=SomeIO ds}
  140. liftIO $ S.uClose ds
  141. return ret
  142. {- |
  143. > withTarget f someIO
  144. Runs f wrapped on a Streamline monad that does IO on someIO.
  145. -}
  146. withTarget :: (Monad m, UniformIO a) => a -> Streamline m b -> m b
  147. withTarget s f = do
  148. (r, _) <- withTarget' f def{str=SomeIO s}
  149. return r
  150. instance Monad m => Monad (Streamline m) where
  151. --return :: (Monad m) => a -> Streamline m a
  152. return x = Streamline $ \cl -> return (x, cl)
  153. --(>>=) :: Monad m => Streamline m a -> (a -> Streamline m b) -> Streamline m b
  154. a >>= b = Streamline $ \cl -> do
  155. (x, cl') <- withTarget' a cl
  156. withTarget' (b x) cl'
  157. instance Monad m => Functor (Streamline m) where
  158. --fmap :: (a -> b) -> Streamline m a -> Streamline m b
  159. fmap f m = Streamline $ \cl -> do
  160. (x, cl') <- withTarget' m cl
  161. return (f x, cl')
  162. instance Monad m => Applicative (Streamline m) where
  163. pure = return
  164. (<*>) = ap
  165. instance MonadTrans Streamline where
  166. --lift :: Monad m => m a -> Streamline m a
  167. lift x = Streamline $ \cl -> do
  168. a <- x
  169. return (a, cl)
  170. instance MonadIO m => MonadIO (Streamline m) where
  171. liftIO = lift . liftIO
  172. -- | Sends data over the IO target.
  173. send :: MonadIO m => ByteString -> Streamline m ()
  174. send r = writeF r
  175. -- | Sends data from a lazy byte string
  176. send' :: MonadIO m => LBS.ByteString -> Streamline m ()
  177. send' r = do
  178. let dd = LBS.toChunks r
  179. mapM_ writeF dd
  180. {- |
  181. Very much like Attoparsec's runScanner:
  182. > runScanner scanner initial_state
  183. Recieves data, running the scanner on each byte,
  184. using the scanner result as initial state for the
  185. next byte, and stopping when the scanner returns
  186. Nothing.
  187. Returns the scanned ByteString.
  188. -}
  189. runScanner :: MonadIO m => s -> IOScanner s -> Streamline m (ByteString, s)
  190. runScanner state scanner = do
  191. (rt, st) <- runScanner' state scanner
  192. return (LBS.toStrict rt, st)
  193. -- | Equivalent to runScanner, but returns a lazy ByteString
  194. runScanner' :: MonadIO m => s -> IOScanner s -> Streamline m (LBS.ByteString, s)
  195. runScanner' state scanner = do
  196. (tx, st) <- in_scan state
  197. return (LBS.fromChunks tx, st)
  198. where
  199. --in_scan :: MonadIO m => s -> Streamline m ([ByteString], s)
  200. in_scan st = do
  201. d <- takeBuff
  202. if BS.null d then return ([], st)
  203. else case sscan scanner st 0 $ BS.unpack d of
  204. AllInput st' -> do
  205. (tx', st'') <- in_scan st'
  206. return (d:tx', st'')
  207. SplitAt n st' -> do
  208. let (r, i) = BS.splitAt n d
  209. pushBuff i
  210. return ([r], st')
  211. -- I'll avoid rebuilding a list on high level code. The ByteString functions are way better.
  212. sscan :: (s -> Word8 -> IOScannerState s) -> s -> Int -> [Word8] -> ScanResult s
  213. sscan _ s0 _ [] = AllInput s0
  214. sscan s s0 i (w:ww) = case s s0 w of
  215. Finished -> SplitAt i s0
  216. LastPass s1 -> SplitAt (i+1) s1
  217. Running s1 -> sscan s s1 (i+1) ww
  218. data ScanResult s = SplitAt Int s | AllInput s
  219. -- | Equivalent to runScanner, but discards the final state
  220. scan :: MonadIO m => s -> IOScanner s -> Streamline m ByteString
  221. scan state scanner = fst <$> runScanner state scanner
  222. -- | Equivalent to runScanner', but discards the final state
  223. scan' :: MonadIO m => s -> IOScanner s -> Streamline m LBS.ByteString
  224. scan' state scanner = fst <$> runScanner' state scanner
  225. -- | Recieves data untill the next end of line (\n or \r\n)
  226. recieveLine :: MonadIO m => Streamline m ByteString
  227. recieveLine = recieveTill "\n"
  228. -- | Lazy version of recieveLine
  229. recieveLine' :: MonadIO m => Streamline m LBS.ByteString
  230. recieveLine' = recieveTill' "\n"
  231. {- |
  232. Recieves the given number of bytes, or less in case of end of file.
  233. -}
  234. recieveN :: MonadIO m => Int -> Streamline m ByteString
  235. recieveN n = LBS.toStrict <$> recieveN' n
  236. -- | Lazy version of recieveN
  237. recieveN' :: MonadIO m => Int -> Streamline m LBS.ByteString
  238. recieveN' n = LBS.fromChunks <$> recieve n
  239. where
  240. recieve sz
  241. | sz <= 0 = return []
  242. | otherwise = do
  243. d <- takeBuff
  244. if BS.null d then return []
  245. else do
  246. let (h, t) = BS.splitAt sz d
  247. sz' = sz - BS.length h
  248. unless (BS.null t) $ pushBuff t
  249. r <- recieve sz'
  250. return $ h : r
  251. -- | Recieves data until it matches the argument.
  252. -- Returns all of it, including the matching data.
  253. recieveTill :: MonadIO m => ByteString -> Streamline m ByteString
  254. recieveTill t = LBS.toStrict <$> recieveTill' t
  255. -- | Lazy version of recieveTill
  256. recieveTill' :: MonadIO m => ByteString -> Streamline m LBS.ByteString
  257. recieveTill' = recieve . BS.unpack
  258. where
  259. recieve t' = scan' [] (textScanner t')
  260. -- | Wraps the streamlined IO target on TLS, streamlining
  261. -- the new wrapper afterwads.
  262. startTls :: MonadIO m => TlsSettings -> Streamline m ()
  263. startTls st = Streamline $ \cl -> do
  264. ds' <- liftIO $ S.startTls st $ str cl
  265. return ((), cl{str=SomeIO ds'}{buff=""})
  266. -- | Runs an Attoparsec parser over the data read from the
  267. -- streamlined IO target. Returns both the parser
  268. -- result and the string consumed by it.
  269. runAttoparsecAndReturn :: MonadIO m => A.Parser a -> Streamline m (ByteString, Either String a)
  270. runAttoparsecAndReturn p = do
  271. d <- takeBuff
  272. let c = A.parse p d
  273. continueResult c d [d]
  274. where
  275. continueResult c d dd = case c of
  276. A.Fail i _ msg -> do
  277. pushBuff $ BS.concat (reverse dd) `BS.append` i
  278. return (BS.take (BS.length d - BS.length i) d, Left msg)
  279. A.Done i r -> do
  280. pushBuff i
  281. return (BS.concat (reverse dd) `BS.append`
  282. BS.take (BS.length d - BS.length i) d,
  283. Right r)
  284. A.Partial c' -> do
  285. dt <- takeBuff
  286. continueResult (c' dt) dt $ dt:dd
  287. -- | Runs an Attoparsec parser over the data read from the
  288. -- streamlined IO target. Returning the parser result.
  289. runAttoparsec :: MonadIO m => A.Parser a -> Streamline m (Either String a)
  290. runAttoparsec p = snd <$> runAttoparsecAndReturn p
  291. -- | Indicates whether transport layer security is being used.
  292. isSecure :: Monad m => Streamline m Bool
  293. isSecure = Streamline $ \cl -> return (S.isSecure $ str cl, cl)
  294. {- |
  295. True if the input is at the end of stream.
  296. If the input is limited, will be true when either the limit is reached,
  297. or the underlining target was at actual end of stream. In case of EOF
  298. due to reaching the limit, changing the limit will immediately make this
  299. return False again.
  300. -}
  301. isEOF :: Monad m => Streamline m Bool
  302. isEOF = Streamline $ \cl -> return (targetEOF cl || inLimit cl == 0, cl)
  303. -- | Sets echo of the streamlines IO target.
  304. -- If echo is set, all the data read an written to the target
  305. -- will be echoed in stdout, with ">" and "<" markers indicating
  306. -- what is read and written.
  307. setEcho :: Monad m => Bool -> Streamline m ()
  308. setEcho e = Streamline $ \cl ->
  309. if e then return ((), cl{echo=Just stdout}) else return ((), cl{echo=Nothing})
  310. {- |
  311. Replaces the enclosed target with the result of the given transformation.
  312. Discards all buffered data in the process.
  313. -}
  314. transformTarget :: (UniformIO a, Monad m) => (SomeIO -> a) -> Streamline m ()
  315. transformTarget w = Streamline $ \cl -> return ((), cl{str = SomeIO . w . str $ cl})
  316. {- |
  317. Limits the input to the given number of bytes, emulating an end of file after them.
  318. If the limit is negative, the input will not be limited.
  319. -}
  320. limitInput :: Monad m => Int -> Streamline m ()
  321. limitInput n = Streamline $ \cl -> return ((), cl{inLimit = n})
  322. {- |
  323. Sets echo of the streamlined IO target.
  324. If echo is set, all the data read an written to the target
  325. will be echoed to the handle, with ">" and "<" markers indicating
  326. what is read and written.
  327. Setting to Nothing will disable echo.
  328. -}
  329. echoTo :: Monad m => Maybe Handle -> Streamline m ()
  330. echoTo h = Streamline $ \cl -> return ((), cl{echo=h})
  331. eofError :: MonadIO m => String -> m a
  332. eofError msg = liftIO . ioError $ mkIOError eofErrorType msg Nothing Nothing
  333. instance Interruptible Streamline where
  334. type RSt Streamline a = (a, StreamlineState)
  335. resume f (a, st) = withTarget' (f a) st
  336. -- | Creates a Streamline interrutible context
  337. inStreamlineCtx :: UniformIO io => io -> a -> RSt Streamline a
  338. inStreamlineCtx io a = (a, def{str = SomeIO io})
  339. -- | Closes the target of a streamline state, releasing any resource.
  340. closeTarget :: MonadIO m => Streamline m ()
  341. closeTarget = Streamline $ \st -> do
  342. liftIO . S.uClose . str $ st
  343. return ((), st)
  344. -- | Removes a Streamline interruptible context
  345. peelStreamlineCtx :: RSt Streamline a -> (a, SomeIO)
  346. peelStreamlineCtx (a, dt) = (a, str dt)
  347. instance MonadTransControl Streamline where
  348. type StT Streamline a = (a, StreamlineState)
  349. liftWith f = Streamline $ \s ->
  350. liftM (\x -> (x, s))
  351. (f $ \t -> withTarget' t s)
  352. restoreT = Streamline . const
  353. instance MonadBase b m => MonadBase b (Streamline m) where
  354. liftBase = liftBaseDefault
  355. instance MonadBaseControl b m => MonadBaseControl b (Streamline m) where
  356. type StM (Streamline m) a = ComposeSt Streamline m a
  357. liftBaseWith = defaultLiftBaseWith
  358. restoreM = defaultRestoreM