]> Git — Sourcephile - gargantext.git/blob - tests/queue/Main.hs
Remove need for liftBase when using the jobs api
[gargantext.git] / tests / queue / Main.hs
1 {-# LANGUAGE DeriveGeneric #-}
2 {-# LANGUAGE ScopedTypeVariables #-}
3 {-# LANGUAGE TypeApplications #-}
4 {-# LANGUAGE TypeFamilies #-}
5 {-# LANGUAGE NumericUnderscores #-}
6 module Main where
7
8 import Control.Concurrent
9 import qualified Control.Concurrent.Async as Async
10 import Control.Concurrent.STM
11 import Control.Exception
12 import Control.Monad
13 import Control.Monad.Reader
14 import Data.Aeson
15 import Data.Either
16 import Data.List
17 import Data.Sequence (Seq)
18 import GHC.Generics
19 import GHC.Stack
20 import Prelude
21 import System.IO.Unsafe
22 import Network.HTTP.Client.TLS (newTlsManager)
23 import Network.HTTP.Client (Manager)
24 import Test.Hspec
25 import qualified Servant.Job.Types as SJ
26 import qualified Servant.Job.Core as SJ
27
28 import Gargantext.Utils.Jobs.Internal (newJob)
29 import Gargantext.Utils.Jobs.Map
30 import Gargantext.Utils.Jobs.Monad hiding (withJob)
31 import Gargantext.Utils.Jobs.Queue (applyPrios, defaultPrios)
32 import Gargantext.Utils.Jobs.State
33
34 data JobT = A | B deriving (Eq, Ord, Show, Enum, Bounded)
35
36 data Counts = Counts { countAs :: Int, countBs :: Int }
37 deriving (Eq, Show)
38
39 inc, dec :: JobT -> Counts -> Counts
40 inc A cs = cs { countAs = countAs cs + 1 }
41 inc B cs = cs { countBs = countBs cs + 1 }
42 dec A cs = cs { countAs = countAs cs - 1 }
43 dec B cs = cs { countBs = countBs cs - 1 }
44
45 jobDuration, initialDelay :: Int
46 jobDuration = 100000
47 initialDelay = 20000
48
49 testMaxRunners :: IO ()
50 testMaxRunners = do
51 -- max runners = 2 with default settings
52 k <- genSecret
53 let settings = defaultJobSettings 2 k
54 st :: JobsState JobT [String] () <- newJobsState settings defaultPrios
55 runningJs <- newTVarIO []
56 let j num _jHandle _inp _l = do
57 atomically $ modifyTVar runningJs (\xs -> ("Job #" ++ show num) : xs)
58 threadDelay jobDuration
59 atomically $ modifyTVar runningJs (\xs -> filter (/=("Job #" ++ show num)) xs)
60 jobs = [ j n | n <- [1..4::Int] ]
61 _jids <- forM jobs $ \f -> pushJob A () f settings st
62 threadDelay initialDelay
63 r1 <- readTVarIO runningJs
64 sort r1 `shouldBe` ["Job #1", "Job #2"]
65 threadDelay jobDuration
66 r2 <- readTVarIO runningJs
67 sort r2 `shouldBe` ["Job #3", "Job #4"]
68 threadDelay jobDuration
69 r3 <- readTVarIO runningJs
70 r3 `shouldBe` []
71
72 testPrios :: IO ()
73 testPrios = do
74 k <- genSecret
75 let settings = defaultJobSettings 2 k
76 st :: JobsState JobT [String] () <- newJobsState settings $
77 applyPrios [(B, 10)] defaultPrios -- B has higher priority
78 runningJs <- newTVarIO (Counts 0 0)
79 let j jobt _jHandle _inp _l = do
80 atomically $ modifyTVar runningJs (inc jobt)
81 threadDelay jobDuration
82 atomically $ modifyTVar runningJs (dec jobt)
83 jobs = [ (A, j A)
84 , (A, j A)
85 , (B, j B)
86 , (B, j B)
87 ]
88 _jids <- forM jobs $ \(t, f) -> do
89 pushJob t () f settings st
90 threadDelay (2*initialDelay)
91 r1 <- readTVarIO runningJs
92 r1 `shouldBe` (Counts 0 2)
93 threadDelay jobDuration
94 r2 <- readTVarIO runningJs
95 r2 `shouldBe` (Counts 2 0)
96 threadDelay jobDuration
97 r3 <- readTVarIO runningJs
98 r3 `shouldBe` (Counts 0 0)
99
100 testExceptions :: IO ()
101 testExceptions = do
102 k <- genSecret
103 let settings = defaultJobSettings 2 k
104 st :: JobsState JobT [String] () <- newJobsState settings defaultPrios
105 jid <- pushJob A ()
106 (\_jHandle _inp _log -> readFile "/doesntexist.txt" >>= putStrLn)
107 settings st
108 threadDelay initialDelay
109 mjob <- lookupJob jid (jobsData st)
110 case mjob of
111 Nothing -> error "boo"
112 Just je -> case jTask je of
113 DoneJ _ r -> isLeft r `shouldBe` True
114 _ -> error "boo2"
115 return ()
116
117 testFairness :: IO ()
118 testFairness = do
119 k <- genSecret
120 let settings = defaultJobSettings 2 k
121 st :: JobsState JobT [String] () <- newJobsState settings defaultPrios
122 runningJs <- newTVarIO (Counts 0 0)
123 let j jobt _jHandle _inp _l = do
124 atomically $ modifyTVar runningJs (inc jobt)
125 threadDelay jobDuration
126 atomically $ modifyTVar runningJs (dec jobt)
127 jobs = [ (A, j A)
128 , (A, j A)
129 , (B, j B)
130 , (A, j A)
131 , (A, j A)
132 ]
133 _jids <- forM jobs $ \(t, f) -> do
134 pushJob t () f settings st
135 threadDelay initialDelay
136 r1 <- readTVarIO runningJs
137 r1 `shouldBe` (Counts 2 0)
138 threadDelay jobDuration
139 r2 <- readTVarIO runningJs
140 r2 `shouldBe` (Counts 1 1) -- MOST IMPORTANT CHECK: the B got picked after the
141 -- two As, because it was inserted right after them
142 -- and has equal priority.
143 threadDelay jobDuration
144 r3 <- readTVarIO runningJs
145 r3 `shouldBe` (Counts 1 0)
146 threadDelay jobDuration
147 r4 <- readTVarIO runningJs
148 r4 `shouldBe` (Counts 0 0)
149
150 data MyDummyJob
151 = MyDummyJob
152 deriving (Show, Eq, Ord, Enum, Bounded)
153
154 data MyDummyError
155 = SomethingWentWrong JobError
156 deriving (Show)
157
158 instance Exception MyDummyError where
159 toException _ = toException (userError "SomethingWentWrong")
160
161 instance ToJSON MyDummyError where
162 toJSON (SomethingWentWrong _) = String "SomethingWentWrong"
163
164 type Progress = Int
165
166 data MyDummyLog =
167 Step_0 !Progress
168 | Step_1 !Progress
169 deriving (Show, Eq, Ord, Generic)
170
171 instance Monoid MyDummyLog where
172 mempty = Step_0 0
173
174 instance Semigroup MyDummyLog where
175 _ <> _ = error "not needed"
176
177 instance ToJSON MyDummyLog
178
179 newtype MyDummyEnv = MyDummyEnv { _MyDummyEnv :: JobEnv MyDummyJob (Seq MyDummyLog) () }
180
181 newtype MyDummyMonad a =
182 MyDummyMonad { _MyDummyMonad :: ReaderT MyDummyEnv IO a }
183 deriving (Functor, Applicative, Monad, MonadIO, MonadReader MyDummyEnv)
184
185 runMyDummyMonad :: MyDummyEnv -> MyDummyMonad a -> IO a
186 runMyDummyMonad env = flip runReaderT env . _MyDummyMonad
187
188 instance MonadJob MyDummyMonad MyDummyJob (Seq MyDummyLog) () where
189 getJobEnv = asks _MyDummyEnv
190
191 instance MonadJobStatus MyDummyMonad where
192 type JobType MyDummyMonad = MyDummyJob
193 type JobOutputType MyDummyMonad = ()
194 type JobEventType MyDummyMonad = MyDummyLog
195
196 testTlsManager :: Manager
197 testTlsManager = unsafePerformIO newTlsManager
198 {-# NOINLINE testTlsManager #-}
199
200 shouldBeE :: (MonadIO m, HasCallStack, Show a, Eq a) => a -> a -> m ()
201 shouldBeE a b = liftIO (shouldBe a b)
202
203 type TheEnv = JobEnv MyDummyJob (Seq MyDummyLog) ()
204
205 withJob :: TheEnv
206 -> (TheEnv -> JobHandle MyDummyMonad MyDummyLog -> () -> MyDummyMonad (Either MyDummyError ()))
207 -> IO (SJ.JobStatus 'SJ.Safe MyDummyLog)
208 withJob myEnv f = runMyDummyMonad (MyDummyEnv myEnv) $
209 newJob @_ @MyDummyError getJobEnv MyDummyJob (\env hdl input ->
210 runMyDummyMonad (MyDummyEnv myEnv) $ f env hdl input) (SJ.JobInput () Nothing)
211
212 withJob_ :: TheEnv
213 -> (TheEnv -> JobHandle MyDummyMonad MyDummyLog -> () -> MyDummyMonad (Either MyDummyError ()))
214 -> IO ()
215 withJob_ env f = void (withJob env f)
216
217 testFetchJobStatus :: IO ()
218 testFetchJobStatus = do
219 k <- genSecret
220 let settings = defaultJobSettings 2 k
221 myEnv <- newJobEnv settings defaultPrios testTlsManager
222 evts <- newMVar []
223
224 withJob_ myEnv $ \_ hdl _input -> do
225 mb_status <- getLatestJobStatus hdl
226
227 -- now let's log something
228 updateJobProgress hdl (const $ Step_0 20)
229 mb_status' <- getLatestJobStatus hdl
230 updateJobProgress hdl (\(Step_0 x) -> Step_0 (x + 5))
231 mb_status'' <- getLatestJobStatus hdl
232
233 liftIO $ modifyMVar_ evts (\xs -> pure $ mb_status : mb_status' : mb_status'' : xs)
234 pure $ Right ()
235
236 threadDelay 500_000
237 -- Check the events
238 readMVar evts >>= \expected -> expected `shouldBe` [Nothing, Just (Step_0 20), Just (Step_0 25)]
239
240 testFetchJobStatusNoContention :: IO ()
241 testFetchJobStatusNoContention = do
242 k <- genSecret
243 let settings = defaultJobSettings 2 k
244 myEnv <- newJobEnv settings defaultPrios testTlsManager
245
246 evts1 <- newMVar []
247 evts2 <- newMVar []
248
249 let job1 = \() -> withJob_ myEnv $ \_ hdl _input -> do
250 updateJobProgress hdl (const $ Step_1 100)
251 mb_status <- getLatestJobStatus hdl
252 liftIO $ modifyMVar_ evts1 (\xs -> pure $ mb_status : xs)
253 pure $ Right ()
254
255 let job2 = \() -> withJob_ myEnv $ \_ hdl _input -> do
256 updateJobProgress hdl (const $ Step_0 50)
257 mb_status <- getLatestJobStatus hdl
258 liftIO $ modifyMVar_ evts2 (\xs -> pure $ mb_status : xs)
259 pure $ Right ()
260
261 Async.forConcurrently_ [job1, job2] ($ ())
262 threadDelay 500_000
263 -- Check the events
264 readMVar evts1 >>= \expected -> expected `shouldBe` [Just (Step_1 100)]
265 readMVar evts2 >>= \expected -> expected `shouldBe` [Just (Step_0 50)]
266
267 main :: IO ()
268 main = hspec $ do
269 describe "job queue" $ do
270 it "respects max runners limit" $
271 testMaxRunners
272 it "respects priorities" $
273 testPrios
274 it "can handle exceptions" $
275 testExceptions
276 it "fairly picks equal-priority-but-different-kind jobs" $
277 testFairness
278 describe "job status update and tracking" $ do
279 it "can fetch the latest job status" $
280 testFetchJobStatus
281 it "can spin two separate jobs and track their status separately" $
282 testFetchJobStatusNoContention