Streamline.hs 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384
  1. {-# LANGUAGE OverloadedStrings, TypeFamilies, MultiParamTypeClasses, FlexibleInstances, UndecidableInstances #-}
  2. {- |
  3. Streamline exports a monad that, given an uniform IO target, emulates
  4. character stream IO using high performance block IO.
  5. -}
  6. module System.IO.Uniform.Streamline (
  7. -- * Basic Type
  8. Streamline,
  9. -- * Running streamline targets
  10. -- ** Single pass runners
  11. withClient,
  12. withServer,
  13. withTarget,
  14. -- ** Interruptible support
  15. inStreamlineCtx,
  16. peelStreamlineCtx,
  17. closeTarget,
  18. -- * Sending and recieving data
  19. send,
  20. send',
  21. recieveLine,
  22. recieveLine',
  23. recieveN,
  24. recieveN',
  25. -- ** Running a parser
  26. runAttoparsec,
  27. runAttoparsecAndReturn,
  28. -- ** Scanning the input
  29. runScanner,
  30. runScanner',
  31. scan,
  32. scan',
  33. recieveTill,
  34. recieveTill',
  35. -- * Behavior settings
  36. startTls,
  37. isSecure,
  38. transformTarget,
  39. echoTo,
  40. setEcho
  41. ) where
  42. import System.IO (stdout, Handle)
  43. import qualified System.IO.Uniform as S
  44. import qualified System.IO.Uniform.Network as N
  45. import qualified System.IO.Uniform.Std as Std
  46. import System.IO.Uniform (UniformIO, SomeIO(..), TlsSettings)
  47. import System.IO.Uniform.Streamline.Scanner
  48. import Data.Default.Class
  49. import Control.Monad.Trans.Class
  50. import Control.Monad.Trans.Interruptible
  51. import Control.Monad.Trans.Control
  52. import Control.Monad (ap, liftM)
  53. import Control.Monad.Base
  54. import Control.Monad.IO.Class
  55. import System.IO.Error
  56. import Data.ByteString (ByteString)
  57. import qualified Data.ByteString as BS
  58. import qualified Data.ByteString.Lazy as LBS
  59. import Data.Word8 (Word8)
  60. import Data.IP (IP)
  61. import qualified Data.Attoparsec.ByteString as A
  62. -- | Internal state for a Streamline monad
  63. data StreamlineState = StreamlineState {str :: SomeIO, buff :: ByteString, isEOF :: Bool, echo :: Maybe Handle}
  64. instance Default StreamlineState where
  65. -- | Will open StdIO
  66. def = StreamlineState (SomeIO Std.StdIO) BS.empty False Nothing
  67. -- | Monad that emulates character stream IO over block IO.
  68. newtype Streamline m a = Streamline {withTarget' :: StreamlineState -> m (a, StreamlineState)}
  69. blockSize :: Int
  70. blockSize = 4096
  71. readF :: MonadIO m => StreamlineState -> m ByteString
  72. readF cl = case echo cl of
  73. Just h -> do
  74. l <- liftIO $ S.uRead (str cl) blockSize
  75. liftIO $ BS.hPutStr h "< "
  76. liftIO $ BS.hPutStr h l
  77. return l
  78. Nothing -> liftIO $ S.uRead (str cl) blockSize
  79. writeF :: MonadIO m => StreamlineState -> ByteString -> m ()
  80. writeF cl l = case echo cl of
  81. Just h -> do
  82. liftIO $ BS.hPutStr h "> "
  83. liftIO $ BS.hPutStr h l
  84. liftIO $ S.uPut (str cl) l
  85. Nothing -> liftIO $ S.uPut (str cl) l
  86. -- | > withServer f serverIP port
  87. --
  88. -- Connects to the given server port, runs f, and closes the connection.
  89. withServer :: MonadIO m => IP -> Int -> Streamline m a -> m a
  90. withServer host port f = do
  91. ds <- liftIO $ N.connectTo host port
  92. (ret, _) <- withTarget' f def{str=SomeIO ds}
  93. liftIO $ S.uClose ds
  94. return ret
  95. -- | > withClient f boundPort
  96. --
  97. -- Accepts a connection at the bound port, runs f and closes the connection.
  98. withClient :: MonadIO m => N.BoundedPort -> (IP -> Int -> Streamline m a) -> m a
  99. withClient port f = do
  100. ds <- liftIO $ N.accept port
  101. (peerIp, peerPort) <- liftIO $ N.getPeer ds
  102. (ret, _) <- withTarget' (f peerIp peerPort) def{str=SomeIO ds}
  103. liftIO $ S.uClose ds
  104. return ret
  105. {- |
  106. > withTarget f someIO
  107. Runs f wrapped on a Streamline monad that does IO on someIO.
  108. -}
  109. withTarget :: (Monad m, UniformIO a) => a -> Streamline m b -> m b
  110. withTarget s f = do
  111. (r, _) <- withTarget' f def{str=SomeIO s}
  112. return r
  113. instance Monad m => Monad (Streamline m) where
  114. --return :: (Monad m) => a -> Streamline m a
  115. return x = Streamline $ \cl -> return (x, cl)
  116. --(>>=) :: Monad m => Streamline m a -> (a -> Streamline m b) -> Streamline m b
  117. a >>= b = Streamline $ \cl -> do
  118. (x, cl') <- withTarget' a cl
  119. withTarget' (b x) cl'
  120. instance Monad m => Functor (Streamline m) where
  121. --fmap :: (a -> b) -> Streamline m a -> Streamline m b
  122. fmap f m = Streamline $ \cl -> do
  123. (x, cl') <- withTarget' m cl
  124. return (f x, cl')
  125. instance (Functor m, Monad m) => Applicative (Streamline m) where
  126. pure = return
  127. (<*>) = ap
  128. instance MonadTrans Streamline where
  129. --lift :: Monad m => m a -> Streamline m a
  130. lift x = Streamline $ \cl -> do
  131. a <- x
  132. return (a, cl)
  133. instance MonadIO m => MonadIO (Streamline m) where
  134. liftIO = lift . liftIO
  135. -- | Sends data over the IO target.
  136. send :: MonadIO m => ByteString -> Streamline m ()
  137. send r = Streamline $ \cl -> do
  138. writeF cl r
  139. return ((), cl)
  140. -- | Sends data from a lazy byte string
  141. send' :: MonadIO m => LBS.ByteString -> Streamline m ()
  142. send' r = Streamline $ \cl -> do
  143. let dd = LBS.toChunks r
  144. mapM_ (writeF cl) dd
  145. return ((), cl)
  146. {- |
  147. Very much like Attoparsec's runScanner:
  148. > runScanner scanner initial_state
  149. Recieves data, running the scanner on each byte,
  150. using the scanner result as initial state for the
  151. next byte, and stopping when the scanner returns
  152. Nothing.
  153. Returns the scanned ByteString.
  154. -}
  155. runScanner :: MonadIO m => s -> IOScanner s -> Streamline m (ByteString, s)
  156. runScanner state scanner = do
  157. (rt, st) <- runScanner' state scanner
  158. return (LBS.toStrict rt, st)
  159. -- | Equivalent to runScanner, but returns a strict, completely
  160. -- evaluated ByteString.
  161. runScanner' :: MonadIO m => s -> IOScanner s -> Streamline m (LBS.ByteString, s)
  162. runScanner' state scanner = Streamline $ \d ->
  163. do
  164. (tx, st, d') <- in_scan d state
  165. return ((LBS.fromChunks tx, st), d')
  166. where
  167. --in_scan :: StreamlineState -> s -> m ([ByteString], s, StreamlineState)
  168. in_scan d st
  169. | isEOF d = eofError "System.IO.Uniform.Streamline.scan'"
  170. | BS.null (buff d) = do
  171. dt <- readF d
  172. if BS.null dt
  173. then return ([], st, d{isEOF=True})
  174. else in_scan d{buff=dt} st
  175. | otherwise = case sscan scanner st 0 (BS.unpack . buff $ d) of
  176. AllInput st' -> do
  177. (tx', st'', d') <- in_scan d{buff=""} st'
  178. return (buff d:tx', st'', d')
  179. SplitAt n st' -> let
  180. (r, i) = BS.splitAt n (buff d)
  181. in return ([r], st', d{buff=i})
  182. -- I'll avoid rebuilding a list on high level code. The ByteString functions are way better.
  183. sscan :: (s -> Word8 -> IOScannerState s) -> s -> Int -> [Word8] -> ScanResult s
  184. sscan _ s0 _ [] = AllInput s0
  185. sscan s s0 i (w:ww) = case s s0 w of
  186. Finished -> SplitAt i s0
  187. LastPass s1 -> SplitAt (i+1) s1
  188. Running s1 -> sscan s s1 (i+1) ww
  189. data ScanResult s = SplitAt Int s | AllInput s
  190. -- | Equivalent to runScanner, but discards the final state
  191. scan :: MonadIO m => s -> IOScanner s -> Streamline m ByteString
  192. scan state scanner = fst <$> runScanner state scanner
  193. -- | Equivalent to runScanner', but discards the final state
  194. scan' :: MonadIO m => s -> IOScanner s -> Streamline m LBS.ByteString
  195. scan' state scanner = fst <$> runScanner' state scanner
  196. -- | Recieves data untill the next end of line (\n or \r\n)
  197. recieveLine :: MonadIO m => Streamline m ByteString
  198. recieveLine = recieveTill "\n"
  199. -- | Lazy version of recieveLine
  200. recieveLine' :: MonadIO m => Streamline m LBS.ByteString
  201. recieveLine' = recieveTill' "\n"
  202. -- | Recieves the given number of bytes.
  203. recieveN :: MonadIO m => Int -> Streamline m ByteString
  204. recieveN n = LBS.toStrict <$> recieveN' n
  205. -- | Lazy version of recieveN
  206. recieveN' :: MonadIO m => Int -> Streamline m LBS.ByteString
  207. recieveN' n | n <= 0 = return ""
  208. | otherwise = Streamline $ \cl ->
  209. do
  210. (tt, cl') <- recieve cl n
  211. return (LBS.fromChunks tt, cl')
  212. where
  213. recieve d b
  214. | isEOF d = eofError "System.IO.Uniform.Streamline.recieveN"
  215. | BS.null . buff $ d = do
  216. dt <- readF d
  217. recieve d{buff=dt}{isEOF=BS.null dt} b
  218. | b <= (BS.length . buff $ d) = let
  219. (r, dt) = BS.splitAt b $ buff d
  220. in return ([r], d{buff=dt})
  221. | otherwise = do
  222. (r, d') <- recieve d{buff=""} $ b - (BS.length . buff $ d)
  223. return (buff d : r, d')
  224. -- | Recieves data until it matches the argument.
  225. -- Returns all of it, including the matching data.
  226. recieveTill :: MonadIO m => ByteString -> Streamline m ByteString
  227. recieveTill t = LBS.toStrict <$> recieveTill' t
  228. -- | Lazy version of recieveTill
  229. recieveTill' :: MonadIO m => ByteString -> Streamline m LBS.ByteString
  230. recieveTill' = recieve . BS.unpack
  231. where
  232. recieve t' = scan' [] (textScanner t')
  233. -- | Wraps the streamlined IO target on TLS, streamlining
  234. -- the new wrapper afterwads.
  235. startTls :: MonadIO m => TlsSettings -> Streamline m ()
  236. startTls st = Streamline $ \cl -> do
  237. ds' <- liftIO $ S.startTls st $ str cl
  238. return ((), cl{str=SomeIO ds'}{buff=""})
  239. -- | Runs an Attoparsec parser over the data read from the
  240. -- streamlined IO target. Returns both the parser
  241. -- result and the string consumed by it.
  242. runAttoparsecAndReturn :: MonadIO m => A.Parser a -> Streamline m (ByteString, Either String a)
  243. runAttoparsecAndReturn p = Streamline $ \cl ->
  244. if isEOF cl
  245. then eofError "System.IO.Uniform.Streamline.runAttoparsecAndReturn"
  246. else do
  247. let c = A.parse p $ buff cl
  248. (cl', i, a) <- liftIO $ continueResult cl c
  249. return ((i, a), cl')
  250. where
  251. continueResult :: StreamlineState -> A.Result a -> IO (StreamlineState, ByteString, Either String a)
  252. -- tx eof ds
  253. continueResult cl c = case c of
  254. A.Fail i _ msg -> return (cl{buff=i}, BS.take (BS.length (buff cl) - BS.length i) (buff cl), Left msg)
  255. A.Done i r -> return (cl{buff=i}, BS.take (BS.length (buff cl) - BS.length i) (buff cl), Right r)
  256. A.Partial c' -> do
  257. d <- readF cl
  258. let cl' = cl{buff=BS.append (buff cl) d}{isEOF=BS.null d}
  259. continueResult cl' (c' d)
  260. -- | Runs an Attoparsec parser over the data read from the
  261. -- streamlined IO target. Returning the parser result.
  262. runAttoparsec :: MonadIO m => A.Parser a -> Streamline m (Either String a)
  263. runAttoparsec p = Streamline $ \cl ->
  264. if isEOF cl
  265. then eofError "System.IO.Uniform.Streamline.runAttoparsec"
  266. else do
  267. let c = A.parse p $ buff cl
  268. (cl', a) <- liftIO $ continueResult cl c
  269. return (a, cl')
  270. where
  271. continueResult :: StreamlineState -> A.Result a -> IO (StreamlineState, Either String a)
  272. continueResult cl c = case c of
  273. A.Fail i _ msg -> return (cl{buff=i}, Left msg)
  274. A.Done i r -> return (cl{buff=i}, Right r)
  275. A.Partial c' -> do
  276. d <- readF cl
  277. let eof' = BS.null d
  278. continueResult cl{buff=d}{isEOF=eof'} (c' d)
  279. -- | Indicates whether transport layer security is being used.
  280. isSecure :: Monad m => Streamline m Bool
  281. isSecure = Streamline $ \cl -> return (S.isSecure $ str cl, cl)
  282. -- | Sets echo of the streamlines IO target.
  283. -- If echo is set, all the data read an written to the target
  284. -- will be echoed in stdout, with ">" and "<" markers indicating
  285. -- what is read and written.
  286. setEcho :: Monad m => Bool -> Streamline m ()
  287. setEcho e = Streamline $ \cl ->
  288. if e then return ((), cl{echo=Just stdout}) else return ((), cl{echo=Nothing})
  289. {- |
  290. Replaces the enclosed target with the result of the given transformation.
  291. Discards all buffered data in the process.
  292. -}
  293. transformTarget :: (UniformIO a, Monad m) => (SomeIO -> a) -> Streamline m ()
  294. transformTarget w = Streamline $ \cl -> return ((), cl{str = SomeIO . w . str $ cl})
  295. {- |
  296. Sets echo of the streamlined IO target.
  297. If echo is set, all the data read an written to the target
  298. will be echoed to the handle, with ">" and "<" markers indicating
  299. what is read and written.
  300. Setting to Nothing will disable echo.
  301. -}
  302. echoTo :: Monad m => Maybe Handle -> Streamline m ()
  303. echoTo h = Streamline $ \cl -> return ((), cl{echo=h})
  304. eofError :: MonadIO m => String -> m a
  305. eofError msg = liftIO . ioError $ mkIOError eofErrorType msg Nothing Nothing
  306. instance Interruptible Streamline where
  307. type RSt Streamline a = (a, StreamlineState)
  308. resume f (a, st) = withTarget' (f a) st
  309. -- | Creates a Streamline interrutible context
  310. inStreamlineCtx :: UniformIO io => io -> a -> RSt Streamline a
  311. inStreamlineCtx io a = (a, def{str = SomeIO io})
  312. -- | Closes the target of a streamline state, releasing any resource.
  313. closeTarget :: MonadIO m => Streamline m ()
  314. closeTarget = Streamline $ \st -> do
  315. liftIO . S.uClose . str $ st
  316. return ((), st)
  317. -- | Removes a Streamline interruptible context
  318. peelStreamlineCtx :: RSt Streamline a -> (a, SomeIO)
  319. peelStreamlineCtx (a, dt) = (a, str dt)
  320. instance MonadTransControl Streamline where
  321. type StT Streamline a = (a, StreamlineState)
  322. liftWith f = Streamline $ \s ->
  323. liftM (\x -> (x, s))
  324. (f $ \t -> withTarget' t s)
  325. restoreT = Streamline . const
  326. instance MonadBase b m => MonadBase b (Streamline m) where
  327. liftBase = liftBaseDefault
  328. instance MonadBaseControl b m => MonadBaseControl b (Streamline m) where
  329. type StM (Streamline m) a = ComposeSt Streamline m a
  330. liftBaseWith = defaultLiftBaseWith
  331. restoreM = defaultRestoreM