]> Git — Sourcephile - gargantext.git/blob - src-test/Utils/Jobs.hs
Move tests under a single umbrella (tasty)
[gargantext.git] / src-test / Utils / Jobs.hs
1 {-# LANGUAGE DeriveGeneric #-}
2 {-# LANGUAGE ScopedTypeVariables #-}
3 {-# LANGUAGE TypeApplications #-}
4 {-# LANGUAGE TypeFamilies #-}
5 {-# LANGUAGE NumericUnderscores #-}
6 module Utils.Jobs (test) where
7
8 import Control.Concurrent
9 import qualified Control.Concurrent.Async as Async
10 import Control.Concurrent.STM
11 import Control.Exception
12 import Control.Monad
13 import Control.Monad.Reader
14 import Control.Monad.Except
15 import Data.Maybe
16 import Data.Either
17 import Data.List
18 import Data.Sequence (Seq, (|>), fromList)
19 import Data.Time
20 import GHC.Stack
21 import Prelude
22 import System.IO.Unsafe
23 import Network.HTTP.Client.TLS (newTlsManager)
24 import Network.HTTP.Client (Manager)
25 import Test.Hspec
26 import qualified Servant.Job.Types as SJ
27 import qualified Servant.Job.Core as SJ
28
29 import Gargantext.Utils.Jobs.Internal (newJob)
30 import Gargantext.Utils.Jobs.Map
31 import Gargantext.Utils.Jobs.Monad hiding (withJob)
32 import Gargantext.Utils.Jobs.Queue (applyPrios, defaultPrios)
33 import Gargantext.Utils.Jobs.State
34 import Gargantext.API.Prelude
35 import Gargantext.API.Admin.EnvTypes as EnvTypes
36 import Gargantext.API.Admin.Orchestrator.Types
37
38
-- | The job types used by the queue tests in this module.
data JobT
  = A
  | B
  | C
  | D
  deriving (Eq, Ord, Show, Enum, Bounded)
44
-- | Models the schedule picked up by the orchestrator, as a sequence of
-- job types.
newtype JobSchedule = JobSchedule
  { _JobSchedule :: Seq JobT
  }
  deriving (Eq, Show)
47
-- | Append the given job type to the end of the schedule held in the 'MVar'.
addJobToSchedule :: JobT -> MVar JobSchedule -> IO ()
addJobToSchedule jobt mvar =
  modifyMVar_ mvar (pure . append)
  where
    -- 'JobSchedule' is a newtype, so this pattern match does not force
    -- the schedule.
    append (JobSchedule picked) = JobSchedule (picked |> jobt)
53
-- | A pair of integer counters.
--
-- NOTE(review): presumably tallies of 'A' and 'B' jobs; nothing visible in
-- this module uses the type, so confirm before relying on that reading.
data Counts = Counts
  { countAs :: Int
  , countBs :: Int
  }
  deriving (Eq, Show)
56
-- | Delay, in microseconds (it is handed to 'threadDelay'), that the
-- simulated jobs sleep for and that the tests wait between checks.
jobDuration :: Int
jobDuration = 100000

-- | Delay, in microseconds (it is handed to 'threadDelay'), that the tests
-- wait after pushing jobs before the first check.
initialDelay :: Int
initialDelay = 20000
60
-- | Check that no more than the configured number of runners (two here)
-- execute jobs at the same time.
--
-- Four jobs are pushed. Each one adds its label to a shared 'TVar', sleeps
-- for 'jobDuration', and removes the label again. The 'TVar' is sampled
-- after 'initialDelay' and then twice more, 'jobDuration' apart; the samples
-- are expected to show jobs 1 and 2, then jobs 3 and 4, then nothing.
testMaxRunners :: IO ()
testMaxRunners = do
  -- max runners = 2 with default settings
  k <- genSecret
  let settings = defaultJobSettings 2 k
  st :: JobsState JobT [String] () <- newJobsState settings defaultPrios
  runningJs <- newTVarIO []
  let j num _jHandle _inp _l = do
        -- Build the label once so that insertion and removal cannot drift apart.
        let label = "Job #" ++ show num
        -- Strict updates: do not let unevaluated thunks pile up in the TVar.
        atomically $ modifyTVar' runningJs (label :)
        threadDelay jobDuration
        atomically $ modifyTVar' runningJs (filter (/= label))
      jobs = [ j n | n <- [1 .. 4 :: Int] ]
  -- The job identifiers are not needed here, so they are not collected.
  forM_ jobs $ \f -> pushJob A () f settings st
  threadDelay initialDelay
  r1 <- readTVarIO runningJs
  sort r1 `shouldBe` ["Job #1", "Job #2"]
  threadDelay jobDuration
  r2 <- readTVarIO runningJs
  sort r2 `shouldBe` ["Job #3", "Job #4"]
  threadDelay jobDuration
  r3 <- readTVarIO runningJs
  r3 `shouldBe` []
83
-- | Check that jobs queued together are executed in the order given by
-- their priorities.
testPrios :: IO ()
testPrios = do
  secret <- genSecret
  -- A single runner lets us check the order of execution without worrying
  -- about several runners competing with each other.
  let settings = defaultJobSettings 1 secret
      -- B has the highest priority
      customPrios = applyPrios [(B, 10), (C, 1), (D, 5)] defaultPrios
  st :: JobsState JobT [String] () <- newJobsState settings customPrios
  schedule <- newMVar (JobSchedule mempty)
  let record jobt _jHandle _inp _l = addJobToSchedule jobt schedule
      jobs = [ (t, record t) | t <- [A, C, B, D] ]

  -- Push every job within a single STM transaction, so that all of them are
  -- stored in the queue by the time 'popQueue' gets called.
  now <- getCurrentTime
  atomically . forM_ jobs $ \(t, f) ->
    void (pushJobWithTime now t () f settings st)

  -- Wait longer than the jobs need in total, so that they have all finished,
  -- then check the schedule that was recorded.
  threadDelay jobDuration
  picked <- readMVar schedule
  picked `shouldBe` JobSchedule (fromList [B, D, C, A])
111
-- | Check that a job whose action throws an exception ends up in the
-- 'DoneJ' state carrying a 'Left' result.
--
-- The test fails with a descriptive message when the job cannot be found
-- or is in any other state once 'initialDelay' has elapsed.
testExceptions :: IO ()
testExceptions = do
  k <- genSecret
  let settings = defaultJobSettings 2 k
  st :: JobsState JobT [String] () <- newJobsState settings defaultPrios
  -- The file is not expected to exist, so the job's action throws.
  jid <- pushJob A ()
    (\_jHandle _inp _log -> readFile "/doesntexist.txt" >>= putStrLn)
    settings st
  threadDelay initialDelay
  mjob <- lookupJob jid (jobsData st)
  case mjob of
    Nothing ->
      expectationFailure "testExceptions: the pushed job was not found in the jobs map"
    Just je -> case jTask je of
      DoneJ _ r -> isLeft r `shouldBe` True
      _ ->
        expectationFailure "testExceptions: the job was not in the DoneJ state after initialDelay"
128
-- | Check that jobs of different kinds but equal priority are picked in
-- order of arrival.
testFairness :: IO ()
testFairness = do
  secret <- genSecret
  let settings = defaultJobSettings 1 secret
  st :: JobsState JobT [String] () <- newJobsState settings defaultPrios
  schedule <- newMVar (JobSchedule mempty)
  let record jobt _jHandle _inp _l = addJobToSchedule jobt schedule
      arrivalOrder = [A, A, B, A, A]
      jobs = [ (t, record t) | t <- arrivalOrder ]
  start <- getCurrentTime
  -- In this scenario we simulate two types of jobs that all share the same
  -- level of priority: our queue implementation will behave as a classic
  -- FIFO, taking the time of arrival into account. Each job is stamped
  -- two seconds later than the previous one.
  atomically $ forM_ (zip [0, 2 ..] jobs) $ \(offset, (t, f)) ->
    void $ pushJobWithTime (addUTCTime (fromInteger offset) start) t () f settings st

  threadDelay jobDuration
  picked <- readMVar schedule
  picked `shouldBe` JobSchedule (fromList arrivalOrder)
153
154
-- | A newtype wrapper around @'GargM' 'Env' 'GargError'@, for which this
-- module defines its own 'MonadJob' and 'MonadJobStatus' instances.
newtype MyDummyMonad a = MyDummyMonad
  { _MyDummyMonad :: GargM Env GargError a
  }
  deriving (Functor, Applicative, Monad, MonadIO, MonadReader Env)
158
-- | The job environment is obtained by delegating to the wrapped monad.
instance MonadJob MyDummyMonad GargJob (Seq JobLog) JobLog where
  getJobEnv = MyDummyMonad getJobEnv
161
-- | Every status operation is forwarded to the wrapped monad; the only
-- exception is 'withTracer', which ignores its first argument and simply
-- runs the continuation on the handle it was given.
instance MonadJobStatus MyDummyMonad where
  type JobHandle     MyDummyMonad = EnvTypes.ConcreteJobHandle GargError
  type JobType       MyDummyMonad = GargJob
  type JobOutputType MyDummyMonad = JobLog
  type JobEventType  MyDummyMonad = JobLog

  getLatestJobStatus hdl      = MyDummyMonad $ getLatestJobStatus hdl
  withTracer _ hdl k          = k hdl
  markStarted total hdl       = MyDummyMonad $ markStarted total hdl
  markProgress steps hdl      = MyDummyMonad $ markProgress steps hdl
  markFailure steps mbMsg hdl = MyDummyMonad $ markFailure steps mbMsg hdl
  markComplete hdl            = MyDummyMonad $ markComplete hdl
  markFailed mbMsg hdl        = MyDummyMonad $ markFailed mbMsg hdl
175
-- | Run a 'MyDummyMonad' action in 'IO' against the given environment.
--
-- A 'Left' result from the underlying 'ExceptT' layer is rethrown with
-- 'throwIO'; a 'Right' result is returned as is.
runMyDummyMonad :: Env -> MyDummyMonad a -> IO a
runMyDummyMonad env m =
  runExceptT (runReaderT (_MyDummyMonad m) env) >>= either throwIO pure
182
{-# NOINLINE testTlsManager #-}
-- | A TLS-enabled HTTP 'Manager' defined as a top-level value.
--
-- It is created through 'unsafePerformIO'; the @NOINLINE@ pragma keeps GHC
-- from inlining the definition at its use sites, which could otherwise
-- lead to 'newTlsManager' being run more than once.
testTlsManager :: Manager
testTlsManager = unsafePerformIO newTlsManager
186
-- | Submit the given action as a new job via 'newJob' and return the
-- 'SJ.JobStatus' that 'newJob' produces.
--
-- The job runs the action with its handle and a unit input, then reports
-- the latest job status as its (successful) result.
withJob :: Env
        -> (JobHandle MyDummyMonad -> () -> MyDummyMonad ())
        -> IO (SJ.JobStatus 'SJ.Safe JobLog)
withJob env f =
  runMyDummyMonad env . MyDummyMonad $
    newJob @_ @GargError
      mkJobHandle
      (pure env)
      -- The job type doesn't matter in our tests; any value of type
      -- 'GargJob' will do.
      RecomputeGraphJob
      (\_ hdl input -> runMyDummyMonad env $ do
          f hdl input
          Right <$> getLatestJobStatus hdl)
      (SJ.JobInput () Nothing)
194
-- | Like 'withJob', but discards the returned job status.
withJob_ :: Env
         -> (JobHandle MyDummyMonad -> () -> MyDummyMonad ())
         -> IO ()
withJob_ env f = () <$ withJob env f
199
-- | Build an 'Env' for the job tests.
--
-- Only the job environment (created with a single runner) and the HTTP
-- manager are real values. Every other field is an 'error' thunk whose
-- message names the field, so an unexpected use is easy to trace.
newTestEnv :: IO Env
newTestEnv = do
  secret <- genSecret
  jobEnv <- newJobEnv (defaultJobSettings 1 secret) defaultPrios testTlsManager
  pure Env
    { _env_settings  = error "env_settings not needed, but forced somewhere (check StrictData)"
    , _env_logger    = error "env_logger not needed, but forced somewhere (check StrictData)"
    , _env_pool      = error "env_pool not needed, but forced somewhere (check StrictData)"
    , _env_nodeStory = error "env_nodeStory not needed, but forced somewhere (check StrictData)"
    , _env_manager   = testTlsManager
    , _env_self_url  = error "self_url not needed, but forced somewhere (check StrictData)"
    , _env_scrapers  = error "scrapers not needed, but forced somewhere (check StrictData)"
    , _env_jobs      = jobEnv
    , _env_config    = error "config not needed, but forced somewhere (check StrictData)"
    , _env_mail      = error "mail not needed, but forced somewhere (check StrictData)"
    , _env_nlp       = error "nlp not needed, but forced somewhere (check StrictData)"
    }
218
-- | Check that 'getLatestJobStatus' reflects the most recent update.
--
-- The job reads its status three times: before logging anything, after
-- 'markStarted', and after 'markProgress'. The remaining-steps field of
-- those three readings is then compared with the expected values.
testFetchJobStatus :: IO ()
testFetchJobStatus = do
  env <- newTestEnv
  observed <- newMVar []

  withJob_ env $ \hdl _input -> do
    initial <- getLatestJobStatus hdl

    -- now let's log something
    markStarted 10 hdl
    afterStart <- getLatestJobStatus hdl
    markProgress 5 hdl
    afterProgress <- getLatestJobStatus hdl

    liftIO $ modifyMVar_ observed (pure . ([initial, afterStart, afterProgress] ++))

  threadDelay 500_000
  -- Check the events
  statuses <- readMVar observed
  map _scst_remaining statuses `shouldBe` [Nothing, Just 10, Just 5]
239
-- | Check that two jobs started concurrently in the same environment each
-- see their own status.
--
-- One job is started with 100 steps and the other with 50; each records
-- the status it reads back into its own 'MVar'.
testFetchJobStatusNoContention :: IO ()
testFetchJobStatusNoContention = do
  env <- newTestEnv

  seenByFirst  <- newMVar []
  seenBySecond <- newMVar []

  -- A job that marks itself as started with @total@ steps, reads its status
  -- back, and stores that status in @sink@.
  let reportingJob total sink () = withJob_ env $ \hdl _input -> do
        markStarted total hdl
        status <- getLatestJobStatus hdl
        liftIO $ modifyMVar_ sink (pure . (status :))

  Async.forConcurrently_
    [reportingJob 100 seenByFirst, reportingJob 50 seenBySecond]
    ($ ())
  threadDelay 500_000
  -- Check the events
  firstStatuses <- readMVar seenByFirst
  map _scst_remaining firstStatuses `shouldBe` [Just 100]
  secondStatuses <- readMVar seenBySecond
  map _scst_remaining secondStatuses `shouldBe` [Just 50]
264
-- | Check the job status after each of the @mark*@ operations.
--
-- The job performs a fixed sequence of updates and takes a snapshot of its
-- status after each step of interest. The seven snapshots are then compared,
-- one by one, with the expected 'JobLog' values.
testMarkProgress :: IO ()
testMarkProgress = do
  env <- newTestEnv
  -- The queue holds exactly as many entries as snapshots are taken below,
  -- so a full queue means that every snapshot has arrived.
  snapshots <- newTBQueueIO 7
  let snapshot hdl = do
        liftIO $ threadDelay 100_000
        current <- getLatestJobStatus hdl
        liftIO . atomically $ writeTBQueue snapshots current
      -- Block (via 'retry') until the queue is full, then drain it.
      awaitSnapshots = do
        complete <- isFullTBQueue snapshots
        unless complete retry
        flushTBQueue snapshots
      -- Builders for the expected values.
      jobLog ok ko left evts = JobLog
        { _scst_succeeded = Just ok
        , _scst_failed    = Just ko
        , _scst_remaining = Just left
        , _scst_events    = Just evts
        }
      errorEvent msg = ScraperEvent
        { _scev_message = Just msg
        , _scev_level   = Just "ERROR"
        , _scev_date    = Nothing
        }

  withJob_ env $ \hdl _input -> do
    markStarted 10 hdl
    snapshot hdl

    markProgress 1 hdl
    snapshot hdl

    markFailure 1 Nothing hdl
    snapshot hdl

    markFailure 1 (Just "boom") hdl
    snapshot hdl

    markComplete hdl
    snapshot hdl

    markStarted 5 hdl
    markProgress 1 hdl
    snapshot hdl

    markFailed (Just "kaboom") hdl
    snapshot hdl

  [jl0, jl1, jl2, jl3, jl4, jl5, jl6] <- atomically awaitSnapshots

  -- Check the events are what we expect
  jl0 `shouldBe` jobLog 0 0 10 []
  jl1 `shouldBe` jobLog 1 0 9 []
  jl2 `shouldBe` jobLog 1 1 8 []
  jl3 `shouldBe` jobLog 1 2 7 [errorEvent "boom"]
  jl4 `shouldBe` jobLog 8 2 0 [errorEvent "boom"]
  jl5 `shouldBe` jobLog 1 0 4 []
  jl6 `shouldBe` jobLog 1 4 0 [errorEvent "kaboom"]
351
-- | The hspec tree for this module: one group for the job queue and one
-- for job status updates and tracking.
test :: Spec
test = do
  describe "job queue" $ do
    it "respects max runners limit" testMaxRunners
    it "respects priorities" testPrios
    it "can handle exceptions" testExceptions
    it "fairly picks equal-priority-but-different-kind jobs" testFairness
  describe "job status update and tracking" $ do
    it "can fetch the latest job status" testFetchJobStatus
    it "can spin two separate jobs and track their status separately" testFetchJobStatusNoContention
    it "marking stuff behaves as expected" testMarkProgress