1 {-# LANGUAGE OverloadedStrings #-}
2 {-# LANGUAGE NoMonomorphismRestriction #-}
3 module Hspec.Server.Stream where
5 import Control.Monad (when)
7 import Data.Either (Either(..))
8 import Data.Eq (Eq(..))
9 import Data.Ord (Ord(..))
10 import Data.Functor ((<$))
12 import Data.Maybe (Maybe(..))
13 import Data.Semigroup (Semigroup(..))
14 import Data.String (String)
15 import Prelude ((+), (-))
16 import System.IO (putStrLn)
17 import Test.Hspec.Wai (liftIO)
18 import Text.Read (readMaybe)
19 import Text.Show (Show(..))
20 import qualified Control.Concurrent as Concurrent
21 import qualified Data.ByteString.Base64 as BS64
22 import qualified Data.ByteString.Lazy as BSL
23 import qualified Data.Text as Text
24 import qualified Data.Text.Encoding as Text
25 import qualified Data.Text.Lazy as TL
26 import qualified Data.Text.Lazy.Encoding as TL
27 import qualified Network.HTTP.Types as HTTP
28 import qualified Network.Wai.Handler.Warp as Warp
29 import qualified Test.Hspec.Wai as Wai
30 import qualified Control.Monad.Classes as MC
36 api = "stream" </> "slow"
38 <.> getStream @(SourceIO Int) @'[PlainText] @NewlineFraming
-- | The application under test: 'api' served with 'route_slow' as its handler.
srv = server api route_slow
-- | Handler for the slow streaming route.
--
-- Prints @/stream/slow/<n>@ to stdout, then returns the stream produced by
-- 'fastSource' with 'delaySource' mapped over its steps via @mapStepT@.
-- NOTE(review): 100000 is the delay handed to 'delaySource'; presumably
-- microseconds (it ends up in @threadDelay@) -- confirm.
route_slow n = MC.exec $ do
  putStrLn requestLine
  return delayedStream
  where
    requestLine = "/stream/slow/" <> show n
    delayedStream = mapStepT (delaySource 100000) (fastSource n)
-- | Stream of the values @1@ up to the given bound, built by handing the
-- enumerated list to @source@.
fastSource upperBound = source [1 .. upperBound]
46 delaySource t = mapYield $ \a next ->
48 -- NOTE: add a delay after each yielded value.
49 Concurrent.threadDelay t
50 return $ delaySource t next
-- | Run 'srv' under Warp on port 8080.
warp = Warp.run listenPort srv
  where
    listenPort = 8080
59 hspec = testSpecs $ describe "Error" $ Wai.with (return srv) $ do
61 it "checks shorter path" $ do
63 `Wai.shouldRespondWith` 404
64 it "checks longer path" $ do
65 Wai.get "/good/path/bad"
66 `Wai.shouldRespondWith` 404
70 type FastAPI = "get" :> Capture "num" Int :> StreamGet NewlineFraming JSON (SourceIO Int)
73 :<|> "slow" :> Capture "num" Int :> StreamGet NewlineFraming JSON (SourceIO Int)
74 -- monad can be ResourceT IO too.
75 :<|> "readme" :> StreamGet NoFraming OctetStream (SourceIO BS.ByteString)
76 -- we can have streaming request body
78 :> StreamBody NoFraming OctetStream (SourceIO BS.ByteString)
79 :> StreamPost NoFraming OctetStream (SourceIO BS.ByteString)
85 server = fast :<|> slow :<|> readme :<|> proxy where
87 putStrLn $ "/get/" ++ show n
91 putStrLn $ "/slow/" ++ show n
96 return (S.readFile "README.md")
102 -- for some reason unfold leaks?
103 fastSource = S.fromStepT . mk where
106 | otherwise = S.Yield m (mk (m - 1))
-- Same stream as 'fastSource', but every yielded element is wrapped in an
-- effect that first calls threadDelay 1000000 and only then yields.
-- Stop, Error, Skip and Effect steps are passed through, with the delay
-- applied recursively to the rest of the stream.
slowSource m = S.mapStepT delay (fastSource m) where
  delay step = case step of
    S.Stop -> S.Stop
    S.Error err -> S.Error err
    S.Skip rest -> S.Skip (delay rest)
    S.Effect action -> S.Effect (delay <$> action)
    S.Yield x rest -> S.Effect $
      S.Yield x (delay rest) <$ threadDelay 1000000
-- | The WAI application: 'server' served against 'api'.
app = serve api server
-- | Client function for the first endpoint of 'FastAPI'.
--
-- @client api@ yields one client per endpoint, joined with @:<|>@; only the
-- first is kept and the remaining three are discarded.
cli :: Client ClientM FastAPI
cli = case client api of
  firstEndpoint :<|> _ :<|> _ :<|> _ -> firstEndpoint
127 putStrLn "Starting cookbook-basic-streaming at http://localhost:8000"
128 port <- fromMaybe 8000 . (>>= readMaybe) <$> lookupEnv "PORT"
130 ("client":ns:_) -> do
131 n <- maybe (fail $ "not a number: " ++ ns) pure $ readMaybe ns
132 mgr <- newManager defaultManagerSettings
133 burl <- parseBaseUrl "http://localhost:8000/"
134 withClientM (cli n) (mkClientEnv mgr burl) $ \me -> case me of
135 Left err -> print err
137 x <- S.unSourceT src (go (0 :: Int))
-- Walk a step stream and count its yielded elements.
-- The accumulator is forced on every call so no thunk chain builds up.
-- An Error step is printed and ends the walk with the count so far.
go !acc step = case step of
  S.Stop -> return acc
  S.Error err -> print err >> return acc
  S.Skip rest -> go acc rest
  S.Effect action -> action >>= go acc
  S.Yield _ rest -> go (acc + 1) rest
147 putStrLn "cabal new-run cookbook-basic-streaming server"
148 putStrLn "cabal new-run cookbook-basic-streaming client 10"
149 putStrLn "time curl -H 'Accept: application/json' localhost:8000/slow/5"