Git — Sourcephile - haskell/symantic-http.git/blob - test/Hspec/Client/Stream.hs
Add streaming support through pipes
[haskell/symantic-http.git] / test / Hspec / Client / Stream.hs
1 {-# LANGUAGE OverloadedStrings #-}
2 {-# LANGUAGE NoMonomorphismRestriction #-}
3 {-# OPTIONS_GHC -Wno-missing-signatures #-}
4 module Hspec.Client.Stream where
5
6 import Control.Monad (when)
7 import Data.Bool
8 import Data.Either (Either(..))
9 import Data.Eq (Eq(..))
10 import Data.Foldable (null)
11 import Data.Functor ((<$))
12 import Data.Int (Int)
13 import Data.Ord (Ord(..))
14 import Data.String (String)
15 import Data.Tuple (snd)
16 import Prelude (Num(..))
17 import qualified Control.Concurrent as Concurrent
18 import qualified Control.Monad.Classes as MC
19 import qualified Network.Wai.Handler.Warp as Warp
20 import qualified Pipes as P
21 import qualified Pipes.Prelude as P
22 {-
23 import qualified Pipes.Safe as Ps
24 import qualified Pipes.Safe.Prelude as Ps
25 import qualified Pipes.ByteString as Pbs
26 -}
27
28 import Symantic.HTTP
29 import Symantic.HTTP.Utils
30 import Symantic.HTTP.Pipes ()
31 import Hspec.Utils
32 import Hspec.Client.Server
33
-- | Declares that a pipes 'P.Proxy' is not itself a handler for the
-- @'MC.EffExec' 'IO'@ effect.
--
-- NOTE(review): presumably this is what lets 'liftIO' be used inside the
-- pipes below by delegating to the monad underneath the 'P.Proxy';
-- confirm against the monad-classes documentation of 'MC.CanDo'.
type instance MC.CanDo (P.Proxy a' a b' b m) (MC.EffExec IO) =
  'False
35
-- | The streaming API shared by the test client and the test server.
--
-- Alternatives, in order (combined with '<!>'):
--
-- * @slow/\<n\>@: responds with a newline-framed stream of 'Int's
--   whose producer returns @()@.
-- * @wait@: responds with a newline-framed stream of 'Int's.
-- * @cat@: takes a newline-framed stream of 'Int's as request body
--   and responds with a newline-framed stream of 'Int's.
-- * @netstring/slow/\<n\>@ and @netstring/cat@: the same shapes as
--   @slow@ and @cat@, but netstring-framed.
api =
  "slow"
    </> capture @Int "n"
    <.> getStream @(P.Producer Int IO ()) @'[PlainText] @NewlineFraming
  <!> "wait"
    </> getStream @(P.Producer Int IO String) @'[PlainText] @NewlineFraming
  <!> "cat"
    </> bodyStream @(P.Producer Int IO String) @'[PlainText] @NewlineFraming
    <.> getStream @(P.Producer Int IO String) @'[PlainText] @NewlineFraming
  <!> "netstring" </> netstringApi
  where
    -- Endpoints nested below the "netstring" path segment.
    netstringApi =
      "slow"
        </> capture @Int "n"
        <.> getStream @(P.Producer Int IO String) @'[PlainText] @NetstringFraming
      <!> "cat"
        </> bodyStream @(P.Producer Int IO String) @'[PlainText] @NetstringFraming
        <.> getStream @(P.Producer Int IO String) @'[PlainText] @NetstringFraming
53
-- | Client functions obtained from @'client' 'api'@.
--
-- The pattern is destructured with ':!:' in the same order and with the
-- same nesting as the alternatives of 'api', so each name binds the
-- client for the corresponding endpoint.
client_slow
  :!: client_wait
  :!: client_cat
  :!: ( client_netstring_slow
          :!: client_netstring_cat
      )
  = client api
61
-- | Server for 'api'.
--
-- The handlers are combined with ':!:' in the same order and nesting as
-- the alternatives of 'api'. The argument @v@ is handed to 'sendAck' by
-- the @wait@ and both @cat@ handlers.
srv v = server api $
  route_slow
    :!: route_wait
    :!: route_cat
    :!: ( route_netstring_slow
            :!: route_netstring_cat
        )
  where
    -- Argument given to 'Concurrent.threadDelay' after each element
    -- of the "slow" routes.
    pauseMicros = 100000

    -- Counts from 1 to @n@, pausing after every element.
    route_slow n =
      return $ P.for (fastPipe () n) $ \item -> do
        P.yield item
        liftIO $ Concurrent.threadDelay pauseMicros

    -- Counts from 1 to 9, passing each element through 'sendAck'.
    route_wait = return $ fastPipe "" 9 P.>-> sendAck v

    -- Streams the request body back, passing each element through 'sendAck'.
    route_cat (ServerBodyStreamArg body) =
      return $ ("RequestBody failed" <$ body) P.>-> sendAck v

    -- Same as 'route_slow', but the producer returns "" instead of ().
    route_netstring_slow n =
      return $ P.for (fastPipe "" n) $ \item -> do
        P.yield item
        liftIO $ Concurrent.threadDelay pauseMicros

    -- Same as 'route_cat', for the netstring-framed endpoint.
    route_netstring_cat (ServerBodyStreamArg body) =
      return $ ("RequestBody failed" <$ body) P.>-> sendAck v

    -- Producer yielding 1, 2, ..., @n@ without delay, then returning @r@.
    fastPipe r n = P.unfoldr step 0
      where
        -- (debug) putStrLn $ "fastPipe: i:"<>show (i+1)
        step i
          | i < n = return $ Right (i + 1, i + 1)
          | otherwise = return $ Left r
86
-- | Runs 'srv' with Warp on port 8080, using a freshly created empty
-- 'Concurrent.MVar' as its acknowledgement channel.
warp :: IO ()
warp = Concurrent.newEmptyMVar >>= Warp.run 8080 . srv
91
-- | Pipe that checks each incoming element against the value taken
-- from the given 'Concurrent.MVar'.
--
-- For every element awaited from upstream, one value is taken from
-- @recv@. If both are equal the element is yielded downstream and the
-- loop continues; otherwise the loop stops and the pipe returns
-- @"checkAck failed"@.
checkAck ::
  Concurrent.MVar Int ->
  P.Pipe Int Int IO String
checkAck recv = "checkAck failed" <$ verify
  where
    verify = do
      -- liftIO $ putStrLn $ "checkAck: await"
      expected <- P.await
      -- liftIO $ putStrLn $ "checkAck: exp:"<>show expected
      acked <- liftIO $ Concurrent.takeMVar recv
      -- liftIO $ putStrLn $ "checkAck: got:"<>show acked
      when (acked == expected) $ do
        P.yield expected
        verify
105
-- | Pipe that forwards each incoming element and then publishes it
-- into the given 'Concurrent.MVar'.
--
-- Each element is yielded downstream first and only then put into
-- @send@. The inner loop recurses unconditionally, so it never
-- completes on its own and the @"sendAck failed"@ result is not
-- produced by the loop finishing normally.
sendAck ::
  Concurrent.MVar Int ->
  P.Pipe Int Int IO String
sendAck send = "sendAck failed" <$ relay
  where
    relay = do
      item <- P.await
      P.yield item
      -- liftIO $ putStrLn $ "sendAck: exp:"<>show item
      liftIO $ Concurrent.putMVar send item
      -- liftIO $ putStrLn $ "sendAck: sent:"<>show item
      relay
117
-- | Test-suite for streaming through pipes.
--
-- Each example gets its own test server (started by @startServer@ and
-- stopped with 'killTestServer') together with the
-- 'Concurrent.MVar' that was given to 'srv'.
hspec :: IO [TestTree]
hspec = testSpecs $ describe "Pipes" $
  before startServer $
  after (killTestServer . snd) $ do
    it "can slowly count down" $ \(_v, TestServer{..}) ->
      (`shouldReturn` Right [1..4]) $
        runClientStream env (client_slow 4) collect
    it "can check failing count down" $ \(v, TestServer{..}) ->
      (`shouldReturn` Right (Left "checkAck failed")) $
        runClientStream env client_wait $ \stream ->
          runPipe $ stream P.>-> P.map (* 2) P.>-> checkAck v
    it "can check each count down" $ \(v, TestServer{..}) ->
      (`shouldReturn` Right (Right [1..9])) $
        runClientStream env client_wait $ \stream ->
          runPipe $ stream P.>-> checkAck v
    it "can re-stream the request's body" $ \(v, TestServer{..}) ->
      (`shouldReturn` Right (Right [0..99])) $
        runClientStream env (client_cat $ ClientBodyStreamArg $
          ("each" <$ P.each [0..99])) $ \stream ->
            runPipe $ stream P.>-> checkAck v
    describe "NetString" $ do
      it "can slowly count down" $ \(_v, TestServer{..}) ->
        (`shouldReturn` Right (Right [1..4])) $
          runClientStream env (client_netstring_slow 4) $ \stream ->
            runPipe stream
      it "can re-stream the request's body" $ \(v, TestServer{..}) ->
        (`shouldReturn` Right (Right [0..99])) $
          runClientStream env (client_netstring_cat $ ClientBodyStreamArg $
            ("each" <$ P.each [0..99])) $ \stream ->
              runPipe $ stream P.>-> checkAck v
  where
    -- NOTE: used with 'before' not 'beforeAll' because
    -- 'v' must be reset after testing an expected failure.
    startServer = do
      v <- Concurrent.newEmptyMVar
      (v,) <$> runTestServer (srv v)

    -- Collects every element of a producer returning () into a list.
    collect producer = P.next producer >>= \case
      Left () -> return []
      Right (x, rest) -> (x :) <$> collect rest
156
-- | Runs a producer to completion and collects what it yielded.
--
-- Returns @'Right' xs@ with the yielded elements in order when the
-- producer's final 'String' is empty, and @'Left' err@ (discarding the
-- elements) when it is not.
runPipe :: Monad m => P.Producer a m String -> m (Either String [a])
runPipe producer = do
  step <- P.next producer
  case step of
    Left err
      | null err -> return (Right [])
      | otherwise -> return (Left err)
    Right (x, rest) -> do
      collected <- runPipe rest
      return ((x :) <$> collected)