{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE NoMonomorphismRestriction #-}
{-# OPTIONS_GHC -Wno-missing-signatures #-}
-- | Tests of HTTP streaming through @pipes@: a small API is served by a
-- test server and consumed by the generated client, with an 'Concurrent.MVar'
-- used to check that server and client progress in lock-step.
module Hspec.Pipes where

import Control.Monad (when)
import Data.Bool
import Data.Either (Either(..))
import Data.Eq (Eq(..))
import Data.Foldable (null)
import Data.Functor ((<$))
import Data.Int (Int)
import Data.Ord (Ord(..))
import Data.String (String)
import Data.Tuple (snd)
import Prelude (Num(..))
import qualified Control.Concurrent as Concurrent
import qualified Control.Monad.Classes as MC
import qualified Network.Wai.Handler.Warp as Warp
import qualified Pipes as P
import qualified Pipes.Prelude as P
{-
import qualified Pipes.Safe as Ps
import qualified Pipes.Safe.Prelude as Ps
import qualified Pipes.ByteString as Pbs
-}

import Symantic.HTTP
import Symantic.HTTP.Client
import Symantic.HTTP.Server
import Symantic.HTTP.Pipes ()
import Hspec.Utils
import Hspec.Utils.Server

-- | The API under test, shared by 'srv' and by the @client_*@ bindings.
--
-- Routes (alternatives are joined with '<!>', so the handlers and the
-- clients are paired in the same order with ':!:'):
--
-- * @slow/\<n\>@: streams 'Int's, the stream returning @()@.
-- * @wait@: streams 'Int's, the stream returning a 'String'.
-- * @cat@: takes a streamed request body of 'Int's and streams 'Int's back.
-- * @netstring/slow/\<n\>@ and @netstring/cat@: same as @slow@ and @cat@
--   but using @NetstringFraming@ instead of @NewlineFraming@
--   (and a 'String' return for @slow@).
api =
  "slow"
    </> capture @Int "n"
    <.> getStream @(P.Producer Int IO ()) @'[PlainText] @NewlineFraming
  <!>
  "wait"
    </> getStream @(P.Producer Int IO String) @'[PlainText] @NewlineFraming
  <!>
  "cat"
    </> bodyStream @(P.Producer Int IO String) @'[PlainText] @NewlineFraming
    <.> getStream @(P.Producer Int IO String) @'[PlainText] @NewlineFraming
  <!>
  "netstring" </> (
    "slow"
      </> capture @Int "n"
      <.> getStream @(P.Producer Int IO String) @'[PlainText] @NetstringFraming
    <!>
    "cat"
      </> bodyStream @(P.Producer Int IO String) @'[PlainText] @NetstringFraming
      <.> getStream @(P.Producer Int IO String) @'[PlainText] @NetstringFraming
  )

-- | Client functions derived from 'api', one per route, in route order.
client_slow
  :!: client_wait
  :!: client_cat
  :!: ( client_netstring_slow
        :!: client_netstring_cat
      ) = client api

-- | Server for 'api'.
--
-- The argument @v@ is the 'Concurrent.MVar' into which 'sendAck' puts every
-- value streamed by the @wait@ and @cat@ routes, so that a client holding
-- the same 'Concurrent.MVar' can check them with 'checkAck'.
srv v = server api $
  route_slow
    :!: route_wait
    :!: route_cat
    :!: ( route_netstring_slow
          :!: route_netstring_cat
        )
  where
  -- Yields 1..n, pausing 100000 microseconds after each element.
  route_slow n = return $ P.for (fastPipe () n) $ \a -> do
    P.yield a
    MC.exec @IO $ Concurrent.threadDelay 100000
  -- Yields 1..9, acknowledging each element through 'v'.
  route_wait = return $
    fastPipe "" 9 P.>-> sendAck v
  -- Re-streams the request's body, acknowledging each element through 'v'.
  -- The body's own return value is replaced by "RequestBody failed".
  route_cat (ServerBodyStreamArg b) = return $
    ("RequestBody failed" <$ b)
      P.>-> sendAck v
  -- Same as 'route_slow' but the stream returns "" instead of ().
  route_netstring_slow n = return $ P.for (fastPipe "" n) $ \a -> do
    P.yield a
    MC.exec @IO $ Concurrent.threadDelay 100000
  -- Same as 'route_cat', for the netstring-framed route.
  route_netstring_cat (ServerBodyStreamArg b) = return $
    ("RequestBody failed" <$ b)
      P.>-> sendAck v

-- | @fastPipe r n@ produces @1, 2, .., n@ without any delay
-- and then returns @r@. Nothing is produced when @n <= 0@.
fastPipe r n =
  (`P.unfoldr` 0) $ \i -> do
    -- putStrLn $ "fastPipe: i:"<>show (i+1)
    return $ if i < n then Right (i+1,i+1) else Left r

-- | Run 'srv' with Warp on port 8080, with a fresh empty 'Concurrent.MVar'.
warp :: IO ()
warp = do
  v <- Concurrent.newEmptyMVar
  Warp.run 8080 $ srv v

-- | Client-side half of the acknowledgement protocol.
--
-- For each value awaited from upstream, take a value from @recv@:
-- when both are equal, forward the value downstream and continue;
-- otherwise stop, the pipe returning @"checkAck failed"@.
checkAck :: Concurrent.MVar Int -> P.Pipe Int Int IO String
checkAck recv = "checkAck failed" <$ go
  where
  go = do
    -- MC.exec @IO $ putStrLn $ "checkAck: await"
    exp <- P.await
    -- MC.exec @IO $ putStrLn $ "checkAck: exp:"<>show exp
    got <- MC.exec @IO $ Concurrent.takeMVar recv
    -- MC.exec @IO $ putStrLn $ "checkAck: got:"<>show got
    when (got == exp) $ do
      P.yield exp
      go

-- | Server-side half of the acknowledgement protocol.
--
-- Forward each awaited value downstream, then put it into @send@
-- (blocking while @send@ is still full), forever.
-- The @"sendAck failed"@ return value is only there to fix the return type:
-- the loop never returns by itself.
sendAck :: Concurrent.MVar Int -> P.Pipe Int Int IO String
sendAck send = "sendAck failed" <$ go
  where
  go = do
    got <- P.await
    P.yield got
    -- MC.exec @IO $ putStrLn $ "sendAck: exp:"<>show got
    MC.exec @IO $ Concurrent.putMVar send got
    -- MC.exec @IO $ putStrLn $ "sendAck: sent:"<>show got
    go

-- | Test tree exercising every route of 'api' through its client.
-- Each example gets its own 'Concurrent.MVar' and its own test server,
-- which is killed after the example.
hspec :: IO [TestTree]
hspec = testSpecs $ describe "Pipes" $
  before (do
    -- NOTE: use 'before' not 'beforeAll' because
    -- 'v' must be reset after testing an expected failure.
    v <- Concurrent.newEmptyMVar
    (v,) <$> runTestServer (srv v)) $
  after (killTestServer . snd) $ do
    it "can slowly count down" $ \(_v, TestServer{..}) -> do
      (`shouldReturn` Right [1..4]) $
        runClientStream env (client_slow 4) $
          -- This route's stream returns (), so 'runPipe' does not apply.
          let go p =
                P.next p >>= \case
                  Left () -> return []
                  Right (a,next) -> (a :) <$> go next
          in go
    it "can check failing count down" $ \(v, TestServer{..}) ->
      (`shouldReturn` Right (Left "checkAck failed")) $
      runClientStream env client_wait $ \pipe ->
        -- Doubling the received values makes them differ from the acks.
        runPipe $ pipe P.>-> P.map (* 2) P.>-> checkAck v
    it "can check each count down" $ \(v, TestServer{..}) ->
      (`shouldReturn` Right (Right [1..9])) $
      runClientStream env client_wait $ \pipe ->
        runPipe $ pipe P.>-> checkAck v
    it "can re-stream the request's body" $ \(v, TestServer{..}) ->
      (`shouldReturn` Right (Right [0..99])) $
      runClientStream env (client_cat $ ClientBodyStreamArg $
        ("each" <$ P.each [0..99])) $ \pipe ->
        runPipe $ pipe P.>-> checkAck v
    describe "NetString" $ do
      it "can slowly count down" $ \(_v, TestServer{..}) -> do
        (`shouldReturn` Right (Right [1..4])) $
          runClientStream env (client_netstring_slow 4) $ \pipe ->
            runPipe $ pipe
      it "can re-stream the request's body" $ \(v, TestServer{..}) ->
        (`shouldReturn` Right (Right [0..99])) $
        runClientStream env (client_netstring_cat $ ClientBodyStreamArg $
          ("each" <$ P.each [0..99])) $ \pipe ->
          runPipe $ pipe P.>-> checkAck v

-- | Drain a producer into a list.
--
-- Returns @'Right' xs@ with every produced element when the producer
-- returns an empty 'String', and @'Left' err@ (dropping the elements)
-- when it returns a non-empty one.
runPipe :: Monad m => P.Producer a m String -> m (Either String [a])
runPipe = go
  where
  go p =
    P.next p >>= \case
      Left err
        | null err -> return $ Right []
        | otherwise -> return $ Left err
      Right (a,next) -> ((a :) <$>) <$> go next