Streamline.hs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385
  1. {-# LANGUAGE OverloadedStrings #-}
  2. {- |
  3. Streamline exports a monad that, given a uniform IO target, emulates
  4. character stream IO using high performance block IO.
  5. -}
  6. module System.IO.Uniform.Streamline (
  7. -- * Basic Type
  8. Streamline,
  9. -- * Running streamline targets
  10. -- ** Single pass runners
  11. withClient,
  12. withServer,
  13. withTarget,
  14. -- ** Several pass runner
  15. StreamlineState,
  16. streamline,
  17. resume,
  18. close,
  19. remaining,
  20. -- * Sending and receiving data
  21. send,
  22. send',
  23. recieveLine,
  24. recieveLine',
  25. recieveN,
  26. recieveN',
  27. -- ** Running a parser
  28. runAttoparsec,
  29. runAttoparsecAndReturn,
  30. -- ** Scanning the input
  31. runScanner,
  32. runScanner',
  33. scan,
  34. scan',
  35. recieveTill,
  36. recieveTill',
  37. -- * Behavior settings
  38. startTls,
  39. isSecure,
  40. setTimeout,
  41. echoTo,
  42. setEcho
  43. ) where
  44. import System.IO (stdout, Handle)
  45. import qualified System.IO.Uniform as S
  46. import qualified System.IO.Uniform.Network as N
  47. import qualified System.IO.Uniform.Std as Std
  48. import System.IO.Uniform (UniformIO, SomeIO(..), TlsSettings)
  49. import System.IO.Uniform.Streamline.Scanner
  50. import Data.Default.Class
  51. import Control.Monad.Trans.Class
  52. import Control.Monad (ap)
  53. import Control.Monad.IO.Class
  54. import System.IO.Error
  55. import Data.ByteString (ByteString)
  56. import qualified Data.ByteString as BS
  57. import qualified Data.ByteString.Lazy as LBS
  58. import Data.Word8 (Word8)
  59. import Data.IP (IP)
  60. import qualified Data.Attoparsec.ByteString as A
  -- | Internal state threaded through a 'Streamline' computation.
  data StreamlineState = StreamlineState
    { str :: SomeIO
      -- ^ The IO target all reads and writes go to.
    , timeout :: Int
      -- ^ Timeout setting; updated by 'setTimeout'.
    , buff :: ByteString
      -- ^ Data already read from the target but not yet consumed.
    , isEOF :: Bool
      -- ^ Set once a read from the target has returned no data.
    , echo :: Maybe Handle
      -- ^ Handle that all traffic is copied to, if any.
    }
  instance Default StreamlineState where
    -- The default state targets standard IO, starts with an empty buffer,
    -- is not at EOF and has echo disabled.
    def = StreamlineState
      { str = SomeIO Std.StdIO
      , timeout = defaultTimeout
      , buff = BS.empty
      , isEOF = False
      , echo = Nothing
      }
  -- | Monad that emulates character stream IO over block IO.
  --
  -- A computation is a function from the current 'StreamlineState' to a
  -- result paired with the updated state, run in the underlying monad @m@.
  newtype Streamline m a = Streamline
    { withTarget' :: StreamlineState -> m (a, StreamlineState)
    }
  -- | Size argument passed to the target on every block read (see 'readF').
  blockSize :: Int
  blockSize = 4 * 1024
  -- | Initial value of the 'timeout' field of a fresh state.
  -- NOTE(review): presumably microseconds, i.e. ten minutes -- confirm
  -- against the code that consumes the timeout.
  defaultTimeout :: Int
  defaultTimeout = 600 * 1000000
  -- | Reads one block (of at most 'blockSize', as requested from the target)
  -- from the state's IO target.
  --
  -- When an echo handle is set, the data read is also written to that
  -- handle, preceded by a @"<"@ marker.
  readF :: MonadIO m => StreamlineState -> m ByteString
  readF cl = liftIO $ do
    chunk <- S.uRead (str cl) blockSize
    -- Echo happens after the read, so the marker only shows up once data arrived.
    case echo cl of
      Nothing -> return ()
      Just h -> BS.hPutStr h "<" >> BS.hPutStr h chunk
    return chunk
  -- | Writes a strict ByteString to the state's IO target.
  --
  -- When an echo handle is set, the data is first written to that handle,
  -- preceded by a @">"@ marker, and only then sent to the target.
  writeF :: MonadIO m => StreamlineState -> ByteString -> m ()
  writeF cl payload = liftIO $ do
    case echo cl of
      Nothing -> return ()
      Just h -> BS.hPutStr h ">" >> BS.hPutStr h payload
    S.uPut (str cl) payload
  -- | @withServer serverIP port f@
  --
  -- Connects to the given server port, runs @f@ over the connection, closes
  -- the connection and returns the result of @f@.
  --
  -- No exception handling is done here: if @f@ aborts with an exception the
  -- close step is skipped.
  withServer :: MonadIO m => IP -> Int -> Streamline m a -> m a
  withServer host port f = do
    conn <- liftIO (N.connectTo host port)
    let start = def {str = SomeIO conn}
    (result, _) <- withTarget' f start
    liftIO (S.uClose conn)
    return result
  -- | @withClient boundPort f@
  --
  -- Accepts a connection at the bound port, runs @f@ (which is given the
  -- peer's address and port) over it, closes the connection and returns the
  -- result of @f@.
  --
  -- No exception handling is done here: if @f@ aborts with an exception the
  -- close step is skipped.
  withClient :: MonadIO m => N.BoundedPort -> (IP -> Int -> Streamline m a) -> m a
  withClient port f = do
    conn <- liftIO (N.accept port)
    (peerIp, peerPort) <- liftIO (N.getPeer conn)
    let start = def {str = SomeIO conn}
    (result, _) <- withTarget' (f peerIp peerPort) start
    liftIO (S.uClose conn)
    return result
  {- |
  @withTarget someIO f@

  Runs @f@ in a Streamline monad that does its IO on @someIO@, starting from
  the default state, and returns only the result of @f@. The final state is
  discarded and the target is not closed by this function.
  -}
  withTarget :: (Monad m, UniformIO a) => a -> Streamline m b -> m b
  withTarget s f =
    withTarget' f (def {str = SomeIO s}) >>= \(r, _) -> return r
  {- |
  Runs @f@ in a Streamline monad over the given target, returning the result
  together with the final state, so that execution can be continued later
  with 'resume'.

  A state obtained from this function must eventually be closed, either
  explicitly with 'close' or implicitly with 'remaining'.
  -}
  streamline :: (Monad m, UniformIO a) => a -> Streamline m b -> m (b, StreamlineState)
  streamline s f = withTarget' f start
    where
      -- Fresh default state pointed at the caller's target.
      start = def {str = SomeIO s}
  {- |
  Continues execution in a Streamline monad from a state previously returned
  by 'streamline' or by another 'resume' call.
  -}
  resume :: Monad m => StreamlineState -> Streamline m b -> m (b, StreamlineState)
  resume saved action = withTarget' action saved
  instance Monad m => Monad (Streamline m) where
    -- Injects a value, leaving the state untouched.
    return x = Streamline (\st -> return (x, st))
    -- Runs the first action, then hands its result and the updated state
    -- to the continuation.
    m >>= k = Streamline $ \st ->
      withTarget' m st >>= \(x, st') -> withTarget' (k x) st'
  instance Monad m => Functor (Streamline m) where
    -- Applies the function to the result, passing the updated state through.
    fmap f m = Streamline $ \st ->
      withTarget' m st >>= \(x, st') -> return (f x, st')
  instance (Functor m, Monad m) => Applicative (Streamline m) where
    -- Same as the Monad instance's 'return': yield the value, keep the state.
    pure x = Streamline (\st -> return (x, st))
    -- Sequencing is delegated to the Monad instance.
    (<*>) = ap
  instance MonadTrans Streamline where
    -- Runs an action of the underlying monad without touching the state.
    lift action = Streamline $ \st ->
      action >>= \a -> return (a, st)
  instance MonadIO m => MonadIO (Streamline m) where
    -- Lifts the IO action into the underlying monad first, then into Streamline.
    liftIO io = lift (liftIO io)
  -- | Sends a strict ByteString over the IO target.
  -- The state is left unchanged.
  send :: MonadIO m => ByteString -> Streamline m ()
  send r = Streamline $ \cl -> writeF cl r >> return ((), cl)
  -- | Sends a lazy ByteString over the IO target, one chunk at a time.
  -- The state is left unchanged.
  send' :: MonadIO m => LBS.ByteString -> Streamline m ()
  send' r = Streamline $ \cl ->
    mapM_ (writeF cl) (LBS.toChunks r) >> return ((), cl)
  {- |
  Very much like Attoparsec's runScanner:

  @runScanner initial_state scanner@

  Receives data, running the scanner on each byte and using the state the
  scanner returns as the input state for the next byte, until the scanner
  signals that it is done.

  Returns the scanned data as a strict ByteString, together with the final
  scanner state. This is 'runScanner'' with the result made strict.
  -}
  runScanner :: MonadIO m => s -> IOScanner s -> Streamline m (ByteString, s)
  runScanner state scanner =
    runScanner' state scanner >>= \(lazyText, final) ->
      return (LBS.toStrict lazyText, final)
  -- | Equivalent to 'runScanner', but returns a lazy ByteString assembled
  -- from the chunks that were scanned.
  --
  -- Throws an EOF 'IOError' when the state is already at end of input. If the
  -- target reaches end of input while scanning, the data collected so far is
  -- returned and the state is marked as EOF.
  runScanner' :: MonadIO m => s -> IOScanner s -> Streamline m (LBS.ByteString, s)
  runScanner' state scanner = Streamline $ \start -> do
      (chunks, final, end) <- go start state
      return ((LBS.fromChunks chunks, final), end)
    where
      -- go :: StreamlineState -> s -> m ([ByteString], s, StreamlineState)
      go d acc
        | isEOF d = eofError "System.IO.Uniform.Streamline.scan'"
        | BS.null pending = do
            -- Nothing buffered: fetch another block from the target.
            fresh <- readF d
            if BS.null fresh
              then return ([], acc, d {isEOF = True})
              else go (d {buff = fresh}) acc
        | otherwise =
            case walk scanner acc 0 (BS.unpack pending) of
              -- The whole buffer belongs to the match; keep scanning new input.
              AllInput acc' -> do
                (rest, accEnd, dEnd) <- go (d {buff = BS.empty}) acc'
                return (pending : rest, accEnd, dEnd)
              -- The scanner stopped inside the buffer; the tail stays buffered.
              SplitAt n acc' ->
                let (matched, leftover) = BS.splitAt n pending
                in return ([matched], acc', d {buff = leftover})
        where
          pending = buff d
      -- Steps the scanner over the unpacked bytes, counting how many were
      -- accepted. The ByteString itself is only split afterwards, by index.
      walk :: (s -> Word8 -> IOScannerState s) -> s -> Int -> [Word8] -> ScanResult s
      walk _ s0 _ [] = AllInput s0
      walk step s0 i (w : ws) = case step s0 w of
        -- Current byte is rejected: split before it, keep the previous state.
        Finished -> SplitAt i s0
        -- Current byte is the last accepted one: split right after it.
        LastPass s1 -> SplitAt (i + 1) s1
        Running s1 -> walk step s1 (i + 1) ws
  -- | Outcome of running a scanner over one buffer.
  data ScanResult s
    = SplitAt Int s
      -- ^ The scanner stopped: number of bytes to take from the buffer, and the final scanner state.
    | AllInput s
      -- ^ The scanner accepted the whole buffer and is still running with this state.
  -- | Equivalent to 'runScanner', but discards the final scanner state and
  -- returns only the scanned (strict) ByteString.
  scan :: MonadIO m => s -> IOScanner s -> Streamline m ByteString
  scan state scanner = fmap fst (runScanner state scanner)
  -- | Equivalent to 'runScanner'', but discards the final scanner state and
  -- returns only the scanned (lazy) ByteString.
  scan' :: MonadIO m => s -> IOScanner s -> Streamline m LBS.ByteString
  scan' state scanner = fmap fst (runScanner' state scanner)
  -- | Receives data until the next line feed (so both @\\n@ and @\\r\\n@ line
  -- endings terminate the read), via 'recieveTill'.
  recieveLine :: MonadIO m => Streamline m ByteString
  recieveLine = recieveTill lineFeed
    where
      lineFeed = "\n"
  -- | Lazy version of 'recieveLine': same terminator, but the result is a
  -- lazy ByteString obtained through 'recieveTill''.
  recieveLine' :: MonadIO m => Streamline m LBS.ByteString
  recieveLine' = recieveTill' lineFeed
    where
      lineFeed = "\n"
  -- | Receives the given number of bytes as a strict ByteString.
  -- This is 'recieveN'' with the result made strict.
  recieveN :: MonadIO m => Int -> Streamline m ByteString
  recieveN n = fmap LBS.toStrict (recieveN' n)
  -- | Lazy version of 'recieveN'.
  --
  -- Returns an empty string for a non-positive count. Otherwise collects
  -- exactly @n@ bytes, reading further blocks from the target as needed, and
  -- throws an EOF 'IOError' if the input ends before @n@ bytes are available.
  recieveN' :: MonadIO m => Int -> Streamline m LBS.ByteString
  recieveN' n
    | n <= 0 = return LBS.empty
    | otherwise = Streamline $ \cl -> do
        (chunks, cl') <- collect cl n
        return (LBS.fromChunks chunks, cl')
    where
      -- collect state wanted: gathers 'wanted' bytes as a list of chunks.
      collect d wanted
        | isEOF d = eofError "System.IO.Uniform.Streamline.recieveN"
        | BS.null have = do
            -- Buffer exhausted: read another block; an empty read marks EOF.
            fresh <- readF d
            collect (d {buff = fresh, isEOF = BS.null fresh}) wanted
        | wanted <= BS.length have =
            -- Enough buffered: take what is needed, keep the rest buffered.
            let (taken, kept) = BS.splitAt wanted have
            in return ([taken], d {buff = kept})
        | otherwise = do
            -- Use up the whole buffer and go on for the missing bytes.
            (more, d') <- collect (d {buff = BS.empty}) (wanted - BS.length have)
            return (have : more, d')
        where
          have = buff d
  -- | Receives data until it matches the argument.
  -- Returns all of it, including the matching data, as a strict ByteString.
  recieveTill :: MonadIO m => ByteString -> Streamline m ByteString
  recieveTill = fmap LBS.toStrict . recieveTill'
  -- | Lazy version of 'recieveTill'.
  --
  -- Runs 'scan'' with a 'textScanner' built from the bytes of the terminator,
  -- starting from an empty scanner state.
  recieveTill' :: MonadIO m => ByteString -> Streamline m LBS.ByteString
  recieveTill' terminator = scan' [] (textScanner (BS.unpack terminator))
  -- | Wraps the streamlined IO target in TLS and continues streamlining over
  -- the new, wrapped target.
  --
  -- Any data still sitting in the read buffer is dropped when the switch
  -- happens.
  startTls :: MonadIO m => TlsSettings -> Streamline m ()
  startTls st = Streamline $ \cl -> do
    secured <- liftIO (S.startTls st (str cl))
    return ((), cl {str = SomeIO secured, buff = BS.empty})
  -- | Runs an Attoparsec parser over the data read from the streamlined IO
  -- target. Returns both the string consumed by the parser and the parser
  -- result ('Left' with the parser's message on failure).
  --
  -- Throws an EOF 'IOError' when the state is already at end of input.
  runAttoparsecAndReturn :: MonadIO m => A.Parser a -> Streamline m (ByteString, Either String a)
  runAttoparsecAndReturn p = Streamline $ \cl ->
      if isEOF cl
        then eofError "System.IO.Uniform.Streamline.runAttoparsecAndReturn"
        else do
          (cl', consumed, outcome) <- liftIO (drive cl (A.parse p (buff cl)))
          return ((consumed, outcome), cl')
    where
      -- While the parser is running, the state's buffer accumulates every byte
      -- handed to it, so the consumed prefix can be cut out at the end.
      drive :: StreamlineState -> A.Result a -> IO (StreamlineState, ByteString, Either String a)
      drive st (A.Fail rest _ msg) = return (st {buff = rest}, used st rest, Left msg)
      drive st (A.Done rest val) = return (st {buff = rest}, used st rest, Right val)
      drive st (A.Partial k) = do
        -- The parser wants more input; an empty read is fed to it as well
        -- and marks the state as EOF.
        chunk <- readF st
        let st' = st {buff = BS.append (buff st) chunk, isEOF = BS.null chunk}
        drive st' (k chunk)
      -- Everything in the accumulated buffer except the unconsumed remainder.
      used st rest = BS.take (BS.length (buff st) - BS.length rest) (buff st)
  -- | Runs an Attoparsec parser over the data read from the streamlined IO
  -- target, returning the parser result ('Left' with the parser's message on
  -- failure). Input the parser did not consume stays in the buffer.
  --
  -- Throws an EOF 'IOError' when the state is already at end of input.
  runAttoparsec :: MonadIO m => A.Parser a -> Streamline m (Either String a)
  runAttoparsec p = Streamline $ \cl ->
      if isEOF cl
        then eofError "System.IO.Uniform.Streamline.runAttoparsec"
        else do
          (cl', outcome) <- liftIO (drive cl (A.parse p (buff cl)))
          return (outcome, cl')
    where
      drive :: StreamlineState -> A.Result a -> IO (StreamlineState, Either String a)
      drive st (A.Fail rest _ msg) = return (st {buff = rest}, Left msg)
      drive st (A.Done rest val) = return (st {buff = rest}, Right val)
      drive st (A.Partial k) = do
        -- The parser wants more input; an empty read is fed to it as well
        -- and marks the state as EOF.
        chunk <- readF st
        drive (st {buff = chunk, isEOF = BS.null chunk}) (k chunk)
  -- | Indicates whether transport layer security is being used, as reported
  -- by the current IO target.
  isSecure :: Monad m => Streamline m Bool
  isSecure = Streamline $ \cl -> return (S.isSecure (str cl), cl)
  -- | Sets the timeout for the streamlined IO target by storing it in the
  -- state's 'timeout' field.
  setTimeout :: Monad m => Int -> Streamline m ()
  setTimeout t = Streamline (\cl -> return ((), cl {timeout = t}))
  -- | Turns echo of the streamlined IO target on or off.
  --
  -- While echo is on, all data read from and written to the target is also
  -- copied to stdout: data read is preceded by @"<"@ and data written by
  -- @">"@.
  setEcho :: Monad m => Bool -> Streamline m ()
  setEcho e = Streamline $ \cl ->
    case e of
      True -> return ((), cl {echo = Just stdout})
      False -> return ((), cl {echo = Nothing})
  {- |
  Sets the echo handle of the streamlined IO target.

  While a handle is set, all data read from and written to the target is also
  copied to that handle: data read is preceded by @"<"@ and data written by
  @">"@.

  Passing 'Nothing' disables echo.
  -}
  echoTo :: Monad m => Maybe Handle -> Streamline m ()
  echoTo h = Streamline (\cl -> return ((), cl {echo = h}))
  -- | Throws an end-of-file 'IOError' whose location is the given string
  -- (callers pass the name of the failing function).
  eofError :: MonadIO m => String -> m a
  eofError msg = liftIO (ioError failure)
    where
      failure = mkIOError eofErrorType msg Nothing Nothing
  {- |
  Closes the IO target of a streamline state, releasing any used resource.
  -}
  close :: MonadIO m => StreamlineState -> m ()
  close st = liftIO (S.uClose (str st))
  {- |
  Retrieves the remaining contents of a streamline state: whatever is still
  buffered, followed by everything that can still be read from the target.
  The target is closed once end of input is reached.
  -}
  remaining :: MonadIO m => StreamlineState -> m LBS.ByteString
  remaining st
    | isEOF st = close st >> return LBS.empty
    | BS.null pending = do
        -- Nothing buffered: read the next block; an empty read marks EOF.
        fresh <- readF st
        remaining (st {buff = fresh, isEOF = BS.null fresh})
    | otherwise = do
        -- Emit the buffered data in front of whatever else is still to come.
        rest <- remaining (st {buff = BS.empty})
        return (LBS.append (LBS.fromStrict pending) rest)
    where
      pending = buff st
  332. | otherwise = do
  333. dt' <- remaining st{buff=BS.empty}
  334. return $ LBS.append (LBS.fromStrict . buff $ st) dt'