]> Git — Sourcephile - gargantext.git/blob - src-test/Utils/Jobs.hs
Merge remote-tracking branch 'origin/506-dev-tree-search-fix' into dev
[gargantext.git] / src-test / Utils / Jobs.hs
1 {-# LANGUAGE DeriveGeneric #-}
2 {-# LANGUAGE ScopedTypeVariables #-}
3 {-# LANGUAGE TypeApplications #-}
4 {-# LANGUAGE TypeFamilies #-}
5 {-# LANGUAGE NumericUnderscores #-}
6 module Utils.Jobs (test) where
7
8 import Control.Concurrent
9 import qualified Control.Concurrent.Async as Async
10 import Control.Concurrent.STM
11 import Control.Exception
12 import Control.Monad
13 import Control.Monad.Reader
14 import Control.Monad.Except
15 import Data.Maybe
16 import Data.Either
17 import Data.List
18 import Data.Sequence (Seq, (|>), fromList)
19 import Data.Time
20 import Prelude
21 import System.IO.Unsafe
22 import Network.HTTP.Client.TLS (newTlsManager)
23 import Network.HTTP.Client (Manager)
24 import Test.Hspec
25 import qualified Servant.Job.Types as SJ
26 import qualified Servant.Job.Core as SJ
27
28 import Gargantext.Utils.Jobs.Internal (newJob)
29 import Gargantext.Utils.Jobs.Map
30 import Gargantext.Utils.Jobs.Monad hiding (withJob)
31 import Gargantext.Utils.Jobs.Queue (applyPrios, defaultPrios)
32 import Gargantext.Utils.Jobs.State
33 import Gargantext.API.Prelude
34 import Gargantext.API.Admin.EnvTypes as EnvTypes
35 import Gargantext.API.Admin.Orchestrator.Types
36
37
-- | The kinds of job used by the queue tests. The constructors carry no
-- payload: the tests only use them as labels, both to attach priorities to
-- and to record the order in which jobs were run.
data JobT
  = A
  | B
  | C
  | D
  deriving (Eq, Ord, Show, Enum, Bounded)
43
-- | The schedule picked up by the orchestrator: the kinds of the jobs that
-- were run, in the order in which they were recorded (earliest first).
newtype JobSchedule = JobSchedule
  { _JobSchedule :: Seq JobT
  }
  deriving (Eq, Show)
46
-- | Record that a job of the given kind has run, by appending it to the end
-- of the schedule stored in the 'MVar'.
addJobToSchedule :: JobT -> MVar JobSchedule -> IO ()
addJobToSchedule jobt mvar =
  modifyMVar_ mvar (pure . appendJob)
  where
    -- Rebuild the schedule with the new job added as the last element.
    appendJob (JobSchedule picked) = JobSchedule (picked |> jobt)
52
-- | A pair of integer counters.
--
-- NOTE(review): the field names suggest tallies of 'A' and 'B' jobs, but
-- nothing in the visible part of this module uses the type; confirm before
-- relying on that reading.
data Counts = Counts
  { countAs :: Int
  , countBs :: Int
  }
  deriving (Eq, Show)
55
-- | How long a job in 'testMaxRunners' sleeps, in microseconds (the unit
-- taken by 'threadDelay'). The queue tests also use it as the time to wait
-- before inspecting their results.
jobDuration :: Int
jobDuration = 100000

-- | How long, in microseconds, a test waits after pushing jobs before its
-- first look at the state of the queue.
initialDelay :: Int
initialDelay = 20000
59
-- | Check that the queue never runs more jobs at once than the configured
-- number of runners.
--
-- Four jobs are pushed. Each one registers its label in a shared 'TVar',
-- sleeps for 'jobDuration' and then removes its label again. The set of
-- registered labels is sampled three times: shortly after pushing (the first
-- two jobs are expected), one 'jobDuration' later (the last two jobs) and one
-- more 'jobDuration' later (nothing left running).
testMaxRunners :: IO ()
testMaxRunners = do
  -- max runners = 2 with default settings
  k <- genSecret
  let settings = defaultJobSettings 2 k
  st :: JobsState JobT [String] () <- newJobsState settings defaultPrios
  runningJs <- newTVarIO []
  let j num _jHandle _inp _l = do
        let label = "Job #" ++ show num
        -- The strict variant is used so that no chain of unevaluated
        -- updates is left behind in the shared variable.
        atomically $ modifyTVar' runningJs (label :)
        threadDelay jobDuration
        atomically $ modifyTVar' runningJs (filter (/= label))
      jobs = [ j n | n <- [1..4::Int] ]
  _jids <- forM jobs $ \f -> pushJob A () f settings st
  threadDelay initialDelay
  r1 <- readTVarIO runningJs
  sort r1 `shouldBe` ["Job #1", "Job #2"]
  threadDelay jobDuration
  r2 <- readTVarIO runningJs
  sort r2 `shouldBe` ["Job #3", "Job #4"]
  threadDelay jobDuration
  r3 <- readTVarIO runningJs
  r3 `shouldBe` []
82
-- | Check that jobs waiting in the queue are run in decreasing order of
-- priority: every job records its kind in a shared schedule, which is then
-- compared with the expected order.
testPrios :: IO ()
testPrios = do
  secret <- genSecret
  -- Use a single runner, so that we can check the order of execution
  -- without worrying about the runners competing with each other.
  let settings = defaultJobSettings 1 secret
      priorities = [(B, 10), (C, 1), (D, 5)] -- B has the highest priority
  st :: JobsState JobT [String] () <-
    newJobsState settings (applyPrios priorities defaultPrios)
  pickedSchedule <- newMVar (JobSchedule mempty)
  let record jobt _jHandle _inp _l = addJobToSchedule jobt pickedSchedule
      jobs = [ (kind, record kind) | kind <- [A, C, B, D] ]

  -- Push all the jobs in the same STM transaction, so that they are all stored in the queue by
  -- the time 'popQueue' gets called.
  now <- getCurrentTime
  atomically $
    forM_ jobs $ \(kind, action) ->
      void (pushJobWithTime now kind () action settings st)

  -- Wait for longer than the jobs need in total, so that we are sure that
  -- all of them have finished, then check the schedule.
  threadDelay jobDuration
  ranInOrder <- readMVar pickedSchedule
  ranInOrder `shouldBe` JobSchedule (fromList [B, D, C, A])
110
-- | Check that a job which throws an exception is still recorded as done,
-- with a 'Left' result.
--
-- The job tries to read a file that does not exist. After 'initialDelay' the
-- job is looked up again and its task is expected to be a 'DoneJ' carrying a
-- 'Left'; any other outcome fails the test with a message naming what was
-- wrong.
testExceptions :: IO ()
testExceptions = do
  k <- genSecret
  let settings = defaultJobSettings 2 k
  st :: JobsState JobT [String] () <- newJobsState settings defaultPrios
  jid <- pushJob A ()
    (\_jHandle _inp _log -> readFile "/doesntexist.txt" >>= putStrLn)
    settings st
  threadDelay initialDelay
  mjob <- lookupJob jid (jobsData st)
  case mjob of
    Nothing ->
      expectationFailure "the job that was just pushed could not be found in the jobs map"
    Just je -> case jTask je of
      DoneJ _ r -> isLeft r `shouldBe` True
      _ ->
        expectationFailure "the failing job was not marked as done after the initial delay"
127
-- | Check that jobs of different kinds but equal priority are run in order
-- of arrival.
testFairness :: IO ()
testFairness = do
  secret <- genSecret
  let settings = defaultJobSettings 1 secret
  st :: JobsState JobT [String] () <- newJobsState settings defaultPrios
  pickedSchedule <- newMVar (JobSchedule mempty)
  let record jobt _jHandle _inp _l = addJobToSchedule jobt pickedSchedule
      jobs = [ (kind, record kind) | kind <- [A, A, B, A, A] ]
  start <- getCurrentTime
  -- In this scenario we simulate two types of jobs, all with the same level
  -- of priority: our queue implementation will behave as a classic FIFO,
  -- taking into account the time of arrival. Each job is stamped two
  -- seconds later than the one before it.
  let arrivalTimes = [ addUTCTime (fromInteger delta) start | delta <- [0, 2 ..] ]
  atomically $
    forM_ (zip arrivalTimes jobs) $ \(arrival, (kind, action)) ->
      void (pushJobWithTime arrival kind () action settings st)

  threadDelay jobDuration
  ranInOrder <- readMVar pickedSchedule
  ranInOrder `shouldBe` JobSchedule (fromList [A, A, B, A, A])
152
153
-- | A newtype around @'GargM' 'Env' 'GargError'@, used by the tests so that
-- this module can give it its own 'MonadJob' and 'MonadJobStatus' instances.
newtype MyDummyMonad a = MyDummyMonad
  { _MyDummyMonad :: GargM Env GargError a
  }
  deriving (Functor, Applicative, Monad, MonadIO, MonadReader Env)
157
-- | The job environment is the one of the wrapped monad.
instance MonadJob MyDummyMonad GargJob (Seq JobLog) JobLog where
  getJobEnv =
    MyDummyMonad getJobEnv
160
-- | Every status operation is delegated to the instance of the wrapped
-- monad and re-wrapped in 'MyDummyMonad'. The one exception is 'withTracer',
-- which ignores its first argument and simply runs the continuation on the
-- job handle.
instance MonadJobStatus MyDummyMonad where
  type JobHandle      MyDummyMonad = EnvTypes.ConcreteJobHandle GargError
  type JobType        MyDummyMonad = GargJob
  type JobOutputType  MyDummyMonad = JobLog
  type JobEventType   MyDummyMonad = JobLog

  getLatestJobStatus = MyDummyMonad . getLatestJobStatus
  withTracer _ jh n = n jh
  markStarted n = MyDummyMonad . markStarted n
  markProgress steps = MyDummyMonad . markProgress steps
  markFailure steps mb_msg = MyDummyMonad . markFailure steps mb_msg
  markComplete = MyDummyMonad . markComplete
  markFailed mb_msg = MyDummyMonad . markFailed mb_msg
174
-- | Run a 'MyDummyMonad' action in 'IO' against the given environment.
--
-- An error reported by the action is rethrown as an 'IO' exception;
-- otherwise the result of the action is returned.
runMyDummyMonad :: Env -> MyDummyMonad a -> IO a
runMyDummyMonad env m =
  runExceptT (runReaderT (_MyDummyMonad m) env) >>= either throwIO pure
181
-- | A TLS-enabled HTTP manager shared by the tests in this module.
--
-- It is created with 'unsafePerformIO' so that it can be a plain top-level
-- value. The @NOINLINE@ pragma keeps GHC from inlining the definition at its
-- use sites, which could otherwise result in more than one manager being
-- created.
{-# NOINLINE testTlsManager #-}
testTlsManager :: Manager
testTlsManager =
  unsafePerformIO newTlsManager
185
-- | Create a job through 'newJob' that runs the given action, and return the
-- job status handed back by 'newJob'.
--
-- Once the action has run, the job's result is the latest status recorded
-- for its handle.
withJob :: Env
        -> (JobHandle MyDummyMonad -> () -> MyDummyMonad ())
        -> IO (SJ.JobStatus 'SJ.Safe JobLog)
withJob env f =
  runMyDummyMonad env . MyDummyMonad $
    -- the job type doesn't matter in our tests, we use a random one, as long as it's of type 'GargJob'.
    newJob @_ @GargError
      mkJobHandle
      (pure env)
      RecomputeGraphJob
      (\_ hdl input -> runMyDummyMonad env $ do
          f hdl input
          Right <$> getLatestJobStatus hdl)
      (SJ.JobInput () Nothing)
193
-- | Like 'withJob', but discards the returned job status.
withJob_ :: Env
         -> (JobHandle MyDummyMonad -> () -> MyDummyMonad ())
         -> IO ()
withJob_ env = void . withJob env
198
-- | Build an 'Env' that is just complete enough for the job tests.
--
-- Only the job environment (created with single-runner job settings) and
-- the HTTP manager are real. Every other field is an 'error' placeholder
-- whose message names the field, so that an unexpected use is easy to track
-- down.
newTestEnv :: IO Env
newTestEnv = do
  secret <- genSecret
  jobEnv <- newJobEnv (defaultJobSettings 1 secret) defaultPrios testTlsManager
  pure Env
    { _env_settings  = error "env_settings not needed, but forced somewhere (check StrictData)"
    , _env_logger    = error "env_logger not needed, but forced somewhere (check StrictData)"
    , _env_pool      = error "env_pool not needed, but forced somewhere (check StrictData)"
    , _env_nodeStory = error "env_nodeStory not needed, but forced somewhere (check StrictData)"
    , _env_manager   = testTlsManager
    , _env_self_url  = error "self_url not needed, but forced somewhere (check StrictData)"
    , _env_scrapers  = error "scrapers not needed, but forced somewhere (check StrictData)"
    , _env_jobs      = jobEnv
    , _env_config    = error "config not needed, but forced somewhere (check StrictData)"
    , _env_mail      = error "mail not needed, but forced somewhere (check StrictData)"
    , _env_nlp       = error "nlp not needed, but forced somewhere (check StrictData)"
    }
217
-- | Check that the latest job status reflects the marking operations done
-- so far.
--
-- The job takes a snapshot of its status before doing anything, after
-- 'markStarted' and after 'markProgress'; the test then compares the number
-- of remaining steps seen in each snapshot.
testFetchJobStatus :: IO ()
testFetchJobStatus = do
  myEnv <- newTestEnv
  evts <- newMVar []

  withJob_ myEnv $ \hdl _input -> do
    beforeStart <- getLatestJobStatus hdl

    -- now let's log something
    markStarted 10 hdl
    afterStart <- getLatestJobStatus hdl
    markProgress 5 hdl
    afterProgress <- getLatestJobStatus hdl

    -- Store the snapshots in the order in which they were taken.
    liftIO $ modifyMVar_ evts (pure . ([beforeStart, afterStart, afterProgress] ++))

  -- Give the job some time to run before looking at what it recorded.
  threadDelay 500_000
  -- Check the events
  snapshots <- readMVar evts
  map _scst_remaining snapshots `shouldBe` [Nothing, Just 10, Just 5]
238
-- | Check that two jobs started concurrently keep separate statuses.
--
-- Each job marks itself as started with its own number of steps and stores
-- the status it reads back in its own 'MVar'; the test then checks that
-- neither job saw the other's number.
testFetchJobStatusNoContention :: IO ()
testFetchJobStatusNoContention = do
  myEnv <- newTestEnv

  evts1 <- newMVar []
  evts2 <- newMVar []

  -- A job that starts with the given number of steps and stores the status
  -- it observes right afterwards in the given variable.
  let trackedJob steps sink () = withJob_ myEnv $ \hdl _input -> do
        markStarted steps hdl
        status <- getLatestJobStatus hdl
        liftIO $ modifyMVar_ sink (pure . (status :))

  Async.forConcurrently_ [trackedJob 100 evts1, trackedJob 50 evts2] ($ ())
  -- Give the jobs some time to run before looking at what they recorded.
  threadDelay 500_000
  -- Check the events
  seenByFirst <- readMVar evts1
  map _scst_remaining seenByFirst `shouldBe` [Just 100]
  seenBySecond <- readMVar evts2
  map _scst_remaining seenBySecond `shouldBe` [Just 50]
263
-- | Check that every marking operation updates the job log as expected.
--
-- The job interleaves marking calls with snapshots of its latest status.
-- Each snapshot is pushed onto a bounded queue with one slot per snapshot,
-- and the test blocks until the queue is full before comparing the seven
-- snapshots with the expected logs.
testMarkProgress :: IO ()
testMarkProgress = do
  myEnv <- newTestEnv
  evts <- newTBQueueIO 7
  -- 'getStatus' waits a little and then pushes the latest status of the job
  -- onto the queue.
  let getStatus hdl = do
        liftIO $ threadDelay 100_000
        getLatestJobStatus hdl >>= liftIO . atomically . writeTBQueue evts
      -- Retry until the queue is full, then drain it.
      readAllEvents = isFullTBQueue evts >>= check >> flushTBQueue evts

  withJob_ myEnv $ \hdl _input -> do
    markStarted 10 hdl
    getStatus hdl

    markProgress 1 hdl
    getStatus hdl

    markFailure 1 Nothing hdl
    getStatus hdl

    markFailure 1 (Just "boom") hdl
    getStatus hdl

    markComplete hdl
    getStatus hdl

    markStarted 5 hdl
    markProgress 1 hdl
    getStatus hdl

    markFailed (Just "kaboom") hdl
    getStatus hdl

  [jl0, jl1, jl2, jl3, jl4, jl5, jl6] <- atomically readAllEvents

  -- Check the events are what we expect
  jl0 `shouldBe` jobLog 0 0 10 []
  jl1 `shouldBe` jobLog 1 0 9 []
  jl2 `shouldBe` jobLog 1 1 8 []
  jl3 `shouldBe` jobLog 1 2 7 [errorEvent "boom"]
  jl4 `shouldBe` jobLog 8 2 0 [errorEvent "boom"]
  jl5 `shouldBe` jobLog 1 0 4 []
  jl6 `shouldBe` jobLog 1 4 0 [errorEvent "kaboom"]
  where
    -- An expected log, given its succeeded, failed and remaining counts
    -- and its list of events.
    jobLog succeeded failed remaining events =
      JobLog { _scst_succeeded = Just succeeded
             , _scst_failed    = Just failed
             , _scst_remaining = Just remaining
             , _scst_events    = Just events
             }
    -- An expected error event with the given message and no date.
    errorEvent msg =
      ScraperEvent { _scev_message = Just msg
                   , _scev_level   = Just "ERROR"
                   , _scev_date    = Nothing
                   }
350
-- | The hspec entry point of this module: one group for the behaviour of
-- the job queue and one for job status tracking.
test :: Spec
test = do
  describe "job queue" $ do
    it "respects max runners limit" testMaxRunners
    it "respects priorities" testPrios
    it "can handle exceptions" testExceptions
    it "fairly picks equal-priority-but-different-kind jobs" testFairness
  describe "job status update and tracking" $ do
    it "can fetch the latest job status" testFetchJobStatus
    it "can spin two separate jobs and track their status separately" testFetchJobStatusNoContention
    it "marking stuff behaves as expected" testMarkProgress