Browse Source

Added scanning functions to Streamline, homogenized the apostrophe (') use, and moved the function to the last argument at Streamline creation.

Marcos Dumay de Medeiros 8 years ago
parent
commit
b1912f069b
5 changed files with 231 additions and 82 deletions
  1. 7 7
      src/System/IO/Uniform/ByteString.hs
  2. 192 46
      src/System/IO/Uniform/Streamline.hs
  3. 28 25
      test/Blocking.hs
  4. 1 1
      test/Targets.hs
  5. 3 3
      uniform-io.cabal

+ 7 - 7
src/System/IO/Uniform/ByteString.hs

@@ -45,11 +45,11 @@ instance UniformIO ByteStringIO where
   startTls _ a = return a
   isSecure _ = True
 
--- | withByteStringIO input f
+-- | withByteStringIO' input f
 --   Runs f with a ByteStringIO that has the given input, returns f's output and
 --   the ByteStringIO output.
-withByteStringIO :: ByteString -> (ByteStringIO -> IO a) -> IO (a, LBS.ByteString)
-withByteStringIO input f = do
+withByteStringIO' :: ByteString -> (ByteStringIO -> IO a) -> IO (a, LBS.ByteString)
+withByteStringIO' input f = do
   ivar <- newMVar (input, False)
   ovar <- newMVar . BSBuild.byteString $ BS.empty
   let bsio = ByteStringIO ivar ovar
@@ -57,8 +57,8 @@ withByteStringIO input f = do
   out <- takeMVar . bsiooutput $ bsio
   return (a, BSBuild.toLazyByteString out)
 
--- | The same as withByteStringIO, but returns an strict ByteString
-withByteStringIO' :: ByteString -> (ByteStringIO -> IO a) -> IO (a, ByteString)
-withByteStringIO' input f = do
-  (a, t) <- withByteStringIO input f
+-- | The same as withByteStringIO', but returns an strict ByteString
+withByteStringIO :: ByteString -> (ByteStringIO -> IO a) -> IO (a, ByteString)
+withByteStringIO input f = do
+  (a, t) <- withByteStringIO' input f
   return (a, LBS.toStrict t)

+ 192 - 46
src/System/IO/Uniform/Streamline.hs

@@ -3,7 +3,33 @@
 -- |
 -- Streamline exports a monad that, given an uniform IO target, emulates
 -- character tream IO using high performance block IO.
-module System.IO.Uniform.Streamline (Streamline, withClient, withServer, withTarget, send, send', receiveLine, lazyRecieveLine, lazyReceiveN, startTls, runAttoparsec, runAttoparsecAndReturn, isSecure, setTimeout, setEcho) where
+module System.IO.Uniform.Streamline (
+  Streamline,
+  IOScannerState,
+  withClient,
+  withServer,
+  withTarget,
+  send,
+  send',
+  recieveLine,
+  recieveLine',
+  lazyRecieveLine,
+  recieveN,
+  recieveN',
+  lazyRecieveN,
+  recieveTill,
+  recieveTill',
+  startTls,
+  runAttoparsec,
+  runAttoparsecAndReturn,
+  isSecure,
+  setTimeout,
+  setEcho,
+  runScanner,
+  runScanner',
+  scan,
+  scan'
+  ) where
 
 import qualified System.IO.Uniform as S
 import qualified System.IO.Uniform.Network as N
@@ -19,6 +45,7 @@ import qualified Data.ByteString as BS
 import qualified Data.ByteString.Lazy as LBS
 import Data.Word8 (Word8)
 import Data.IP (IP)
+import qualified Data.Char as C
 
 import qualified Data.Attoparsec.ByteString as A
 
@@ -51,8 +78,8 @@ writeF cl l = if echo cl
 -- | withServer f serverIP port
 --
 --  Connects to the given server port, runs f, and closes the connection.
-withServer :: MonadIO m => Streamline m a -> IP -> Int -> m a
-withServer f host port = do
+withServer :: MonadIO m => IP -> Int -> Streamline m a -> m a
+withServer host port f = do
   ds <- liftIO $ N.connectTo host port
   (ret, _) <- withTarget' f $ Data (SomeIO ds) defaultTimeout "" False False
   liftIO $ S.uClose ds
@@ -61,8 +88,8 @@ withServer f host port = do
 -- | withClient f boundPort
 --
 --  Accepts a connection at the bound port, runs f and closes the connection.
-withClient :: MonadIO m => (IP -> Int -> Streamline m a) -> N.BoundedPort -> m a
-withClient f port = do
+withClient :: MonadIO m => N.BoundedPort -> (IP -> Int -> Streamline m a) -> m a
+withClient port f = do
   ds <- liftIO $ N.accept port
   (peerIp, peerPort) <- liftIO $ N.getPeer ds
   (ret, _) <- withTarget' (f peerIp peerPort) $ Data (SomeIO ds) defaultTimeout "" False False
@@ -72,8 +99,8 @@ withClient f port = do
 -- | withTarget f someIO
 --
 --  Runs f wrapped on a Streamline monad that does IO on nomeIO.
-withTarget :: (MonadIO m, UniformIO a) => Streamline m b -> a -> m b
-withTarget f s = do  
+withTarget :: (MonadIO m, UniformIO a) => a -> Streamline m b -> m b
+withTarget s f = do  
   (ret, _) <- withTarget' f $ Data (SomeIO s) defaultTimeout "" False False
   return ret
 
@@ -117,57 +144,151 @@ send' r = Streamline $ \cl -> do
   mapM (writeF cl) dd
   return ((), cl)
 
--- | Receives a line from the streamlined IO target.
-receiveLine :: MonadIO m => Streamline m ByteString
-receiveLine = do
-  l <- runAttoparsec parseLine
-  case l of
-    Left _ -> return ""
-    Right l' -> return l'
+-- | State of an IO scanner.
+--   Differently from a parser scanner, an IO scanner
+--   must deal with blocking behavior.
+data IOScannerState a =
+  -- | A scanner returns Finished when the current input is not
+  --   part of the result, and the scanning must stop before this
+  --   input.
+  Finished |
+  -- | A scanner returns LastPass when the current input is the
+  --   last one of the result, and the scanning must stop before
+  --   after this input, without consuming more data.
+  LastPass a |
+  -- | A scanner returns Running when the current input is part
+  --   of the result, and the scanning must continue.
+  Running a
+
+-- | Equivalent to runScanner', but returns a strict, completely
+--   evaluated ByteString.
+runScanner :: MonadIO m => s -> (s -> Word8 -> IOScannerState s) -> Streamline m (ByteString, s)
+runScanner state scanner = do
+  (rt, st) <- runScanner' state scanner
+  return (LBS.toStrict rt, st)
+
+{- |
+Very much like Attoparsec's runScanner:
+
+runScanner' scanner initial_state
+
+Recieves data, running the scanner on each byte,
+using the scanner result as initial state for the
+next byte, and stopping when the scanner returns
+Nothing.
+
+Returns the scanned ByteString.
+ -} 
+runScanner' :: MonadIO m => s -> (s -> Word8 -> IOScannerState s) -> Streamline m (LBS.ByteString, s)
+runScanner' state scanner = Streamline $ \d ->
+  do
+    (tx, st, d') <- in_scan d state
+    return ((LBS.fromChunks tx, st), d')
+  where
+    --in_scan :: Data -> s -> m ([ByteString], s, Data)
+    in_scan d st
+      | isEOF d = eofError "System.IO.Uniform.Streamline.scan'"
+      | BS.null (buff d) = do
+        dt <- readF d
+        if BS.null dt
+          then return ([], st, d{isEOF=True})
+          else in_scan d{buff=dt} st
+      | otherwise = case sscan scanner st 0 (BS.unpack . buff $ d) of
+        AllInput st' -> do
+          (tx', st'', d') <- in_scan d{buff=""} st'
+          return (buff d:tx', st'', d')
+        SplitAt n st' -> let
+          (r, i) = BS.splitAt n (buff d)
+          in return ([r], st', d{buff=i})
+    -- I'll avoid rebuilding a list on high level code. The ByteString functions are way better.
+    sscan :: (s -> Word8 -> IOScannerState s) -> s -> Int -> [Word8] -> ScanResult s
+    sscan _ s0 _ [] = AllInput s0
+    sscan s s0 i (w:ww) = case s s0 w of
+      Finished -> SplitAt i s0
+      LastPass s1 -> SplitAt (i+1) s1
+      Running s1 -> sscan s s1 (i+1) ww
+
+data ScanResult s = SplitAt Int s | AllInput s
+
+
+-- | Equivalent to runScanner, but dischards the final state
+scan :: MonadIO m => s -> (s -> Word8 -> IOScannerState s) -> Streamline m ByteString
+scan state scanner = fst <$> runScanner state scanner
+
+-- | Equivalent to runScanner', but dischards the final state
+scan' :: MonadIO m => s -> (s -> Word8 -> IOScannerState s) -> Streamline m LBS.ByteString
+scan' state scanner = fst <$> runScanner' state scanner
+
+-- | Recieves data untill the next end of line (\n or \r\n)
+recieveLine :: MonadIO m => Streamline m ByteString
+recieveLine = recieveTill "\n"
     
--- | Receives a line from the streamlined IO target,
---  but breaks the line on reasonably sized chuncks, and
---  reads them lazyly, so that IO can be done in constant
---  memory space.
+-- | Lazy version of recieveLine
+recieveLine' :: MonadIO m => Streamline m LBS.ByteString
+recieveLine' = recieveTill' "\n"
+
+-- | Use recieveLine'.
 lazyRecieveLine :: MonadIO m => Streamline m [ByteString]
-lazyRecieveLine = Streamline $ \cl -> lazyReceiveLine' cl
+{-# DEPRECATED #-}
+lazyRecieveLine = Streamline $ \cl -> lazyRecieveLine' cl
   where
-    lazyReceiveLine' :: MonadIO m => Data -> m ([ByteString], Data)
-    lazyReceiveLine' cl' = 
+    lazyRecieveLine' :: MonadIO m => Data -> m ([ByteString], Data)
+    lazyRecieveLine' cl' = 
       if isEOF cl'
-      then eofError "System.IO.Uniform.Streamline.lazyReceiveLine"
+      then eofError "System.IO.Uniform.Streamline.lazyRecieveLine"
       else
         if BS.null $ buff cl'
         then do
           dt <- readF cl'
-          lazyReceiveLine' cl'{buff=dt}{isEOF=BS.null dt}
+          lazyRecieveLine' cl'{buff=dt}{isEOF=BS.null dt}
         else do
           let l = A.parseOnly lineWithEol $ buff cl'
           case l of
             Left _ -> do
               l' <- readF cl'
-              (ret, cl'') <- lazyReceiveLine' cl'{buff=l'}{isEOF=BS.null l'}
+              (ret, cl'') <- lazyRecieveLine' cl'{buff=l'}{isEOF=BS.null l'}
               return ((buff cl') : ret, cl'')
             Right (ret, dt) -> return ([ret], cl'{buff=dt})
 
--- | lazyReceiveN n
---  Receives n bytes of data from the streamlined IO target,
---  but breaks the data on reasonably sized chuncks, and reads
---  them lazyly, so that IO can be done in constant memory space.
-lazyReceiveN :: (Functor m, MonadIO m) => Int -> Streamline m [ByteString]
-lazyReceiveN n' = Streamline $ \cl' -> lazyReceiveN' cl' n'
+-- | Recieves the given number of bytes.
+recieveN :: MonadIO m => Int -> Streamline m ByteString
+recieveN n = LBS.toStrict <$> recieveN' n
+
+-- | Lazy version of recieveN
+recieveN' :: MonadIO m => Int -> Streamline m LBS.ByteString
+recieveN' n = Streamline $ \cl ->
+  do
+    (tt, cl') <- recieve cl n
+    return (LBS.fromChunks tt, cl')
+  where
+    recieve d b
+      | isEOF d = eofError "System.IO.Uniform.Streamline.lazyRecieveN"
+      | BS.null . buff $ d = do
+        dt <- readF d
+        recieve d{buff=dt}{isEOF=BS.null dt} b
+      | b <= (BS.length . buff $ d) = let
+        (r, dt) = BS.splitAt b $ buff d
+        in return ([r], d{buff=dt})
+      | otherwise = do
+        (r, d') <- recieve d{buff=""} $ b - (BS.length . buff $ d)
+        return (buff d : r, d')
+
+-- | Use recieveN'.
+lazyRecieveN :: (Functor m, MonadIO m) => Int -> Streamline m [ByteString]
+{-# DEPRECATED #-}
+lazyRecieveN n' = Streamline $ \cl' -> lazyRecieveN' cl' n'
   where
-    lazyReceiveN' :: (Functor m, MonadIO m) => Data -> Int -> m ([ByteString], Data)
-    lazyReceiveN' cl n =
+    lazyRecieveN' :: (Functor m, MonadIO m) => Data -> Int -> m ([ByteString], Data)
+    lazyRecieveN' cl n =
       if isEOF cl
-      then eofError "System.IO.Uniform.Streamline.lazyReceiveN"
+      then eofError "System.IO.Uniform.Streamline.lazyRecieveN"
       else
         if BS.null (buff cl)
         then do
           b <- readF cl
           let eof = BS.null b
           let cl' = cl{buff=b}{isEOF=eof}
-          lazyReceiveN' cl' n
+          lazyRecieveN' cl' n
         else
           if n <= BS.length (buff cl)
           then let
@@ -177,10 +298,21 @@ lazyReceiveN n' = Streamline $ \cl' -> lazyReceiveN' cl' n'
           else let
             cl' = cl{buff=""}
             b = buff cl
-            in fmap (appFst b) $ lazyReceiveN' cl' (n - BS.length b)
+            in fmap (appFst b) $ lazyRecieveN' cl' (n - BS.length b)
     appFst :: a -> ([a], b) -> ([a], b)
     appFst a (l, b) = (a:l, b)
 
+-- | Recieves data until it matches the argument.
+--   Returns all of it, including the matching data.
+recieveTill :: MonadIO m => ByteString -> Streamline m ByteString
+recieveTill t = LBS.toStrict <$> recieveTill' t
+
+-- | Lazy version of recieveTill
+recieveTill' :: MonadIO m => ByteString -> Streamline m LBS.ByteString
+recieveTill' t = recieve . BS.unpack $ t
+  where
+    recieve t' = scan' [] (textScanner t')
+
 -- | Wraps the streamlined IO target on TLS, streamlining
 --  the new wrapper afterwads.
 startTls :: MonadIO m => TlsSettings -> Streamline m ()
@@ -245,12 +377,6 @@ setTimeout t = Streamline $ \cl -> return ((), cl{timeout=t})
 setEcho :: Monad m => Bool -> Streamline m ()
 setEcho e = Streamline $ \cl -> return ((), cl{echo=e})
 
-parseLine :: A.Parser ByteString
-parseLine = do
-  l <- A.takeTill isEol
-  (A.word8 13 >> A.word8 10) <|>  A.word8 10
-  return l
-  
 lineWithEol :: A.Parser (ByteString, ByteString)
 lineWithEol = do
   l <- A.scan False lineScanner
@@ -258,11 +384,31 @@ lineWithEol = do
   return (l, r)
   
 lineScanner :: Bool -> Word8 -> Maybe Bool
-lineScanner False c = Just $ isEol c
-lineScanner True c = if isEol c then Just True else Nothing
-
-isEol :: Word8 -> Bool
-isEol c = c == 13 || c == 10
+lineScanner False c 
+  | c == (fromIntegral . C.ord $ '\n') = Just True
+  | otherwise = Just False
+lineScanner True _ = Nothing
 
 eofError :: MonadIO m => String -> m a
 eofError msg = liftIO . ioError $ mkIOError eofErrorType msg Nothing Nothing
+
+textScanner :: [Word8] -> ([[Word8]] -> Word8 -> IOScannerState [[Word8]])
+textScanner [] = \_ _ -> Finished
+textScanner t@(c:_) = scanner
+  where
+    scanner st c'
+      | c == c' = popStacks c' $ t:st
+      | otherwise = popStacks c' st
+    popStacks :: Word8 -> [[Word8]] -> IOScannerState [[Word8]]
+    popStacks _ [] = Running []
+    popStacks _ ([]:_) = Finished
+    popStacks h ((h':hh):ss)
+      | h == h' && null hh = case popStacks h ss of
+        Finished -> Finished
+        LastPass ss' -> LastPass $ ss'
+        Running ss' -> LastPass $ ss'
+      | h == h' = case popStacks h ss of
+        Finished -> Finished
+        LastPass ss' -> LastPass $ hh:ss'
+        Running ss' -> Running $ hh:ss'
+      | otherwise = popStacks h ss

+ 28 - 25
test/Blocking.hs

@@ -1,6 +1,6 @@
 {-# LANGUAGE OverloadedStrings #-}
 
-module Blocking (tests) where
+module Blocking where
 
 import Distribution.TestSuite
 import Base (simpleTest)
@@ -13,11 +13,12 @@ import qualified Data.ByteString as BS
 import qualified Data.ByteString.Char8 as C8
 import qualified Data.Attoparsec.ByteString as A
 --import Control.Monad.IO.Class (liftIO)
+--import Debug.Trace
 
 tests :: IO [Test]
 tests = return [
   simpleTest "recieveLine"
-  (successTimeout "A test\n" (restoreLine S.receiveLine)),
+  (successTimeout "A test\n" (S.recieveLine)),
   simpleTest "runAttoparsec with successful parser"
   (successTimeout "abcde" (parseBS (A.string "abcde"))),
   simpleTest "runAttoparsec with failed parser"
@@ -25,7 +26,9 @@ tests = return [
   simpleTest "lazyRecieveLine"
   (successTimeout "Another test\n" (concatLine S.lazyRecieveLine)),
   simpleTest "lazyReceiveN"
-  (failTimeout "abcde" (concatLine (S.lazyReceiveN 5)))
+  (failTimeout "abcde" (concatLine (S.lazyRecieveN 5))),
+  simpleTest "recieveTill"
+  (failTimeout "abcde" (restoreLine $ S.recieveTill "de"))
   ]
 
 parseBS :: A.Parser ByteString -> S.Streamline IO ByteString
@@ -50,18 +53,18 @@ concatLine f = do
 successTimeout :: ByteString -> S.Streamline IO ByteString -> IO Progress
 successTimeout txt f = do
   recv <- bindPort 8888
-  forkIO $ S.withClient (\_ _ -> do
-                            l <- f
-                            S.send l
-                            return ()
-                        ) recv
-  r' <- timeout 1000000 $ S.withServer (do
-                                     S.send txt
-                                     t <- f
-                                     if t == txt
-                                       then return . Finished $ Pass
-                                       else return . Finished . Fail . C8.unpack $ t
-                                 ) "127.0.0.1" 8888
+  forkIO $ S.withClient recv $ \_ _ ->
+    do
+      l <- f
+      S.send l
+      return ()
+  r' <- timeout 1000000 $ S.withServer "127.0.0.1" 8888 $
+        do
+          S.send txt
+          t <- f
+          if t == txt
+            then return . Finished $ Pass
+            else return . Finished . Fail $ "Strings differ: " -- ++ show txt ++ " <> " ++ show t
   closePort recv
   case r' of
     Just r -> return r
@@ -72,16 +75,16 @@ successTimeout txt f = do
 failTimeout :: ByteString -> S.Streamline IO ByteString -> IO Progress
 failTimeout txt f = do
   recv <- bindPort 8888
-  forkIO $ S.withClient (\_ _ -> do
-                            f
-                            S.send "\n"
-                            return ()
-                        ) recv
-  r' <- timeout 1000000 $ S.withServer (do
-                                     S.send txt
-                                     S.receiveLine
-                                     return . Finished $ Pass
-                                 ) "127.0.0.1" 8888
+  forkIO $ S.withClient recv $ \_ _ ->
+    do
+      f
+      S.send "\n"
+      return ()
+  r' <- timeout 1000000 $ S.withServer "127.0.0.1" 8888 $
+        do
+          S.send txt
+          S.recieveLine
+          return . Finished $ Pass
   closePort recv
   case r' of
     Just r -> return r

+ 1 - 1
test/Targets.hs

@@ -89,7 +89,7 @@ testTls = do
 testBS :: IO Progress
 testBS = do
   let dt = "Some data to test ByteString"
-  (len, echo) <- withByteStringIO' dt (
+  (len, echo) <- withByteStringIO dt (
     \io -> let
       count = countAndEcho io :: Int -> ByteString -> IO Int
       in mapOverInput io 2 count 0

+ 3 - 3
uniform-io.cabal

@@ -10,7 +10,7 @@ name:                uniform-io
 -- PVP summary:      +-+------- breaking API changes
 --                   | | +----- non-breaking API additions
 --                   | | | +--- code changes with no API change
-version:    1.0.1.0
+version:    1.1.0.0
 
 -- A short (one-line) description of the package.
 synopsis:   Uniform IO over files, network, anything.
@@ -70,7 +70,7 @@ source-repository head
 source-repository this
   type:     git
   location: https://sealgram.com/git/haskell/uniform-io
-  tag:   1.0.1.0
+  tag:   1.1.0.0
 
 library
   -- Modules exported by the library.
@@ -104,7 +104,7 @@ library
       network >=2.4 && <3.0,
       transformers >=0.3 && <1.0,
       word8 >=0.1 && <1.0,
-      attoparsec >=0.10 && <1.0,
+      attoparsec >=0.13.0.1 && <1.0,
       data-default-class >= 0.0.1 && <1.0
   
   -- Directories containing source files.