1 {-# LANGUAGE OverloadedStrings #-}
2 {-# LANGUAGE NoMonomorphismRestriction #-}
3 {-# OPTIONS_GHC -Wno-missing-signatures #-}
4 module Hspec.Pipes where
6 import Control.Monad (when)
8 import Data.Either (Either(..))
9 import Data.Eq (Eq(..))
10 import Data.Foldable (null)
11 import Data.Functor ((<$))
13 import Data.Ord (Ord(..))
14 import Data.String (String)
15 import Data.Tuple (snd)
16 import Prelude (Num(..))
17 import qualified Control.Concurrent as Concurrent
18 import qualified Control.Monad.Classes as MC
19 import qualified Network.Wai.Handler.Warp as Warp
20 import qualified Pipes as P
21 import qualified Pipes.Prelude as P
23 import qualified Pipes.Safe as Ps
24 import qualified Pipes.Safe.Prelude as Ps
25 import qualified Pipes.ByteString as Pbs
29 import Symantic.HTTP.Client
30 import Symantic.HTTP.Server
31 import Symantic.HTTP.Pipes ()
33 import Hspec.Utils.Server
36 "slow" </> capture @Int "n"
37 <.> getStream @(P.Producer Int IO ())
41 "wait" </> getStream @(P.Producer Int IO String)
45 "cat" </> bodyStream @(P.Producer Int IO String)
48 <.> getStream @(P.Producer Int IO String)
52 "netstring" </> ("slow" </> capture @Int "n"
53 <.> getStream @(P.Producer Int IO String)
57 "cat" </> bodyStream @(P.Producer Int IO String)
60 <.> getStream @(P.Producer Int IO String)
68 :!:( client_netstring_slow
69 :!: client_netstring_cat
77 :!:( route_netstring_slow
78 :!: route_netstring_cat
82 return $ P.for (fastPipe () n) $ \a -> do
84 MC.exec @IO $ Concurrent.threadDelay 100000
85 route_wait = return $ fastPipe "" 9 P.>-> sendAck v
86 route_cat (ServerBodyStreamArg b) =
87 return $ ("RequestBody failed" <$ b) P.>-> sendAck v
88 route_netstring_slow n =
89 return $ P.for (fastPipe "" n) $ \a -> do
91 MC.exec @IO $ Concurrent.threadDelay 100000
92 route_netstring_cat (ServerBodyStreamArg b) =
93 return $ ("RequestBody failed" <$ b) P.>-> sendAck v
94 fastPipe r n = (`P.unfoldr` 0) $ \i -> do
95 -- putStrLn $ "fastPipe: i:"<>show (i+1)
96 return $ if i < n then Right (i+1,i+1) else Left r
100 v <- Concurrent.newEmptyMVar
101 Warp.run 8080 $ srv v
104 Concurrent.MVar Int ->
105 P.Pipe Int Int IO String
106 checkAck recv = "checkAck failed" <$ go
108 -- MC.exec @IO $ putStrLn $ "checkAck: await"
110 -- MC.exec @IO $ putStrLn $ "checkAck: exp:"<>show exp
111 got <- MC.exec @IO $ Concurrent.takeMVar recv
112 -- MC.exec @IO $ putStrLn $ "checkAck: got:"<>show got
113 when (got == exp) $ do
118 Concurrent.MVar Int ->
119 P.Pipe Int Int IO String
120 sendAck send = "sendAck failed" <$ go
124 -- MC.exec @IO $ putStrLn $ "sendAck: exp:"<>show got
125 MC.exec @IO $ Concurrent.putMVar send got
126 -- MC.exec @IO $ putStrLn $ "sendAck: sent:"<>show got
129 hspec :: IO [TestTree]
130 hspec = testSpecs $ describe "Pipes" $
132 -- NOTE: use 'before' not 'beforeAll' because
133 -- 'v' must be reset after testing an expected failure.
134 v <- Concurrent.newEmptyMVar
135 (v,) <$> runTestServer (srv v)) $
136 after (killTestServer . snd) $ do
137 it "can slowly count down" $ \(_v, TestServer{..}) -> do
138 (`shouldReturn` Right [1..4]) $
139 runClientStream env (client_slow 4) $
140 let go p = P.next p >>= \case
142 Right (a,next) -> (a :) <$> go next in
144 it "can check failing count down" $ \(v, TestServer{..}) ->
145 (`shouldReturn` Right (Left "checkAck failed")) $
146 runClientStream env client_wait $ \pipe ->
147 runPipe $ pipe P.>-> P.map (* 2) P.>-> checkAck v
148 it "can check each count down" $ \(v, TestServer{..}) ->
149 (`shouldReturn` Right (Right [1..9])) $
150 runClientStream env client_wait $ \pipe ->
151 runPipe $ pipe P.>-> checkAck v
152 it "can re-stream the request's body" $ \(v, TestServer{..}) ->
153 (`shouldReturn` Right (Right [0..99])) $
154 runClientStream env (client_cat $ ClientBodyStreamArg $
155 ("each" <$ P.each [0..99])) $ \pipe ->
156 runPipe $ pipe P.>-> checkAck v
157 describe "NetString" $ do
158 it "can slowly count down" $ \(_v, TestServer{..}) -> do
159 (`shouldReturn` Right (Right [1..4])) $
160 runClientStream env (client_netstring_slow 4) $ \pipe ->
162 it "can re-stream the request's body" $ \(v, TestServer{..}) ->
163 (`shouldReturn` Right (Right [0..99])) $
164 runClientStream env (client_netstring_cat $ ClientBodyStreamArg $
165 ("each" <$ P.each [0..99])) $ \pipe ->
166 runPipe $ pipe P.>-> checkAck v
168 runPipe :: Monad m => P.Producer a m String -> m (Either String [a])
171 go p = P.next p >>= \case
172 Left err | null err -> return $ Right []
173 | otherwise -> return $ Left err
174 Right (a,next) -> ((a :) <$>) <$> go next