]> Git — Sourcephile - gargantext.git/blob - tests/queue/Main.hs
Merge remote-tracking branch 'origin/adinapoli/issue-198' into dev
[gargantext.git] / tests / queue / Main.hs
1 {-# LANGUAGE DeriveGeneric #-}
2 {-# LANGUAGE ScopedTypeVariables #-}
3 {-# LANGUAGE TypeApplications #-}
4 {-# LANGUAGE TypeFamilies #-}
5 {-# LANGUAGE NumericUnderscores #-}
6 module Main where
7
8 import Control.Concurrent
9 import qualified Control.Concurrent.Async as Async
10 import Control.Concurrent.STM
11 import Control.Exception
12 import Control.Monad
13 import Control.Monad.Reader
14 import Control.Monad.Except
15 import Data.Maybe
16 import Data.Either
17 import Data.List
18 import Data.Sequence (Seq, (|>), fromList)
19 import Data.Time
20 import GHC.Stack
21 import Prelude
22 import System.IO.Unsafe
23 import Network.HTTP.Client.TLS (newTlsManager)
24 import Network.HTTP.Client (Manager)
25 import Test.Hspec
26 import qualified Servant.Job.Types as SJ
27 import qualified Servant.Job.Core as SJ
28
29 import Gargantext.Utils.Jobs.Internal (newJob)
30 import Gargantext.Utils.Jobs.Map
31 import Gargantext.Utils.Jobs.Monad hiding (withJob)
32 import Gargantext.Utils.Jobs.Queue (applyPrios, defaultPrios)
33 import Gargantext.Utils.Jobs.State
34 import Gargantext.API.Prelude
35 import Gargantext.API.Admin.EnvTypes as EnvTypes
36 import Gargantext.API.Admin.Orchestrator.Types
37
38
-- | The job kinds used by the queue tests. The derived 'Ord' instance is what
-- lets these values be used as keys by the job queue.
data JobT = A | B | C | D
  deriving (Eq, Ord, Show, Enum, Bounded)
44
-- | Records, in order, the job types that were picked up by the orchestrator.
newtype JobSchedule = JobSchedule
  { _JobSchedule :: Seq JobT
  }
  deriving (Eq, Show)
47
-- | Append the given job type to the end of the schedule stored in the 'MVar'.
addJobToSchedule :: JobT -> MVar JobSchedule -> IO ()
addJobToSchedule jobt mvar = modifyMVar_ mvar (pure . appendJob)
  where
    -- Unwrap the schedule, snoc the new job and wrap it again.
    appendJob (JobSchedule picked) = JobSchedule (picked |> jobt)
53
-- | A pair of counters: 'countAs' is the one adjusted for 'A' jobs and
-- 'countBs' the one adjusted for 'B' jobs.
data Counts = Counts
  { countAs :: Int
  , countBs :: Int
  }
  deriving (Eq, Show)
56
-- | 'inc' adds one to, and 'dec' subtracts one from, the counter matching the
-- given job type. Only 'A' and 'B' have a counter; any other job type leaves
-- the 'Counts' unchanged.
inc, dec :: JobT -> Counts -> Counts
inc jobt cs = case jobt of
  A -> cs { countAs = countAs cs + 1 }
  B -> cs { countBs = countBs cs + 1 }
  _ -> cs
dec jobt cs = case jobt of
  A -> cs { countAs = countAs cs - 1 }
  B -> cs { countBs = countBs cs - 1 }
  _ -> cs
66
-- | Delays, in microseconds, passed to 'threadDelay' by the queue tests:
-- 'jobDuration' is how long a simulated job sleeps, 'initialDelay' is the
-- pause taken before the first inspection of the queue.
jobDuration, initialDelay :: Int
jobDuration  = 100 * 1000 -- 100 ms
initialDelay = 20 * 1000  -- 20 ms
70
-- | Pushes four jobs onto a queue configured with two runners and checks,
-- at three points in time, which jobs are registered as running: first
-- jobs 1 and 2, then jobs 3 and 4, and finally none.
testMaxRunners :: IO ()
testMaxRunners = do
  -- max runners = 2 with default settings
  secret <- genSecret
  let settings = defaultJobSettings 2 secret
  st :: JobsState JobT [String] () <- newJobsState settings defaultPrios
  running <- newTVarIO []
  let label num = "Job #" ++ show num
      -- Each job registers itself, sleeps for 'jobDuration', then unregisters.
      job num _jHandle _inp _l = do
        atomically $ modifyTVar running (label num :)
        threadDelay jobDuration
        atomically $ modifyTVar running (filter (/= label num))
      jobs = map job [1 .. 4 :: Int]
  _jids <- forM jobs $ \f -> pushJob A () f settings st

  -- Shortly after pushing: the first two jobs occupy both runners.
  threadDelay initialDelay
  firstBatch <- readTVarIO running
  sort firstBatch `shouldBe` ["Job #1", "Job #2"]

  -- One job duration later: the remaining two jobs have taken over.
  threadDelay jobDuration
  secondBatch <- readTVarIO running
  sort secondBatch `shouldBe` ["Job #3", "Job #4"]

  -- After another job duration nothing is left running.
  threadDelay jobDuration
  leftovers <- readTVarIO running
  leftovers `shouldBe` []
93
-- | Queues one job of each kind on a single-runner queue with custom
-- priorities and checks that they are executed in the order B, D, C, A.
testPrios :: IO ()
testPrios = do
  secret <- genSecret
  -- Use a single runner, so that we can check the order of execution
  -- without worrying about the runners competing with each other.
  let settings = defaultJobSettings 1 secret
      prios    = [(B, 10), (C, 1), (D, 5)] -- B has the highest priority
  st :: JobsState JobT [String] () <-
    newJobsState settings (applyPrios prios defaultPrios)
  pickedSchedule <- newMVar (JobSchedule mempty)
  let -- A job that does nothing but record that it ran.
      record jobt _jHandle _inp _l = addJobToSchedule jobt pickedSchedule
      jobs = [ (t, record t) | t <- [A, C, B, D] ]

  -- Push all the jobs in the same STM transaction, so that they are all stored in the queue by
  -- the time 'popQueue' gets called.
  now <- getCurrentTime
  atomically $
    forM_ jobs $ \(t, f) ->
      void (pushJobWithTime now t () f settings st)

  -- wait for the jobs to finish, waiting for more than the total duration,
  -- so that we are sure that all jobs have finished, then check the schedule.
  threadDelay jobDuration
  finalSchedule <- readMVar pickedSchedule
  finalSchedule `shouldBe` JobSchedule (fromList [B, D, C, A])
121
-- | Pushes a job whose action reads a file at a path that is expected not to
-- exist, waits 'initialDelay', then checks that the job is recorded as done
-- with a 'Left' result.
--
-- Unexpected states are reported through 'expectationFailure' so that the
-- hspec output says what went wrong.
testExceptions :: IO ()
testExceptions = do
  k <- genSecret
  let settings = defaultJobSettings 2 k
  st :: JobsState JobT [String] () <- newJobsState settings defaultPrios
  jid <- pushJob A ()
           (\_jHandle _inp _log -> readFile "/doesntexist.txt" >>= putStrLn)
           settings st
  threadDelay initialDelay
  mjob <- lookupJob jid (jobsData st)
  case mjob of
    Nothing ->
      expectationFailure "testExceptions: pushed job was not found in the jobs map"
    Just je -> case jTask je of
      DoneJ _ r -> isLeft r `shouldBe` True
      _         ->
        expectationFailure "testExceptions: expected the job to be in the DoneJ state"
138
-- | Queues A, A, B, A, A with default priorities and arrival times two
-- seconds apart, then checks that a single runner executes them in exactly
-- that arrival order.
testFairness :: IO ()
testFairness = do
  secret <- genSecret
  let settings = defaultJobSettings 1 secret
  st :: JobsState JobT [String] () <- newJobsState settings defaultPrios
  pickedSchedule <- newMVar (JobSchedule mempty)
  let -- A job that does nothing but record that it ran.
      record jobt _jHandle _inp _l = addJobToSchedule jobt pickedSchedule
      jobs = [ (t, record t) | t <- [A, A, B, A, A] ]
  time <- getCurrentTime
  -- in this scenario we simulate two types of jobs all with
  -- all the same level of priority: our queue implementation
  -- will behave as a classic FIFO, keeping into account the
  -- time of arrival.
  let arrivals = [ addUTCTime (fromInteger delta) time | delta <- [0, 2 ..] ]
  atomically $
    forM_ (zip arrivals jobs) $ \(arrival, (t, f)) ->
      void (pushJobWithTime arrival t () f settings st)

  threadDelay jobDuration
  finalSchedule <- readMVar pickedSchedule
  finalSchedule `shouldBe` JobSchedule (fromList [A, A, B, A, A])
163
164
-- | A newtype over @'GargM' 'Env' 'GargError'@ used by the job-status tests;
-- its class instances below simply forward to the wrapped monad.
newtype MyDummyMonad a = MyDummyMonad
  { _MyDummyMonad :: GargM Env GargError a
  }
  deriving (Functor, Applicative, Monad, MonadIO, MonadReader Env)
168
-- | Forwards 'getJobEnv' to the wrapped monad.
instance MonadJob MyDummyMonad GargJob (Seq JobLog) JobLog where
  getJobEnv =
    MyDummyMonad getJobEnv
171
-- | Every status operation is forwarded to the wrapped monad, except
-- 'withTracer', which ignores its first argument and just runs the
-- continuation on the handle.
instance MonadJobStatus MyDummyMonad where
  type JobHandle     MyDummyMonad = EnvTypes.ConcreteJobHandle GargError
  type JobType       MyDummyMonad = GargJob
  type JobOutputType MyDummyMonad = JobLog
  type JobEventType  MyDummyMonad = JobLog

  getLatestJobStatus hdl   = MyDummyMonad $ getLatestJobStatus hdl
  withTracer _ hdl k       = k hdl
  markStarted total hdl    = MyDummyMonad $ markStarted total hdl
  markProgress steps hdl   = MyDummyMonad $ markProgress steps hdl
  markFailure steps msg hdl = MyDummyMonad $ markFailure steps msg hdl
  markComplete hdl         = MyDummyMonad $ markComplete hdl
  markFailed msg hdl       = MyDummyMonad $ markFailed msg hdl
185
-- | Run a 'MyDummyMonad' action in 'IO' against the given environment.
-- A 'Left' error produced by the action is rethrown with 'throwIO'.
runMyDummyMonad :: Env -> MyDummyMonad a -> IO a
runMyDummyMonad env m =
  runExceptT (runReaderT (_MyDummyMonad m) env) >>= either throwIO pure
192
-- | A TLS 'Manager' created once via 'unsafePerformIO' and shared by the
-- tests. The NOINLINE pragma is the usual companion of a top-level
-- 'unsafePerformIO': it stops GHC from inlining the definition and thereby
-- duplicating the underlying IO action.
{-# NOINLINE testTlsManager #-}
testTlsManager :: Manager
testTlsManager = unsafePerformIO newTlsManager
196
-- | 'shouldBe' lifted into any 'MonadIO'.
shouldBeE :: (MonadIO m, HasCallStack, Show a, Eq a) => a -> a -> m ()
shouldBeE actual expected = liftIO $ actual `shouldBe` expected
199
-- | Create a new job via 'newJob' whose body runs the supplied callback and
-- then returns the latest job status as the job's (successful) result.
withJob :: Env
        -> (JobHandle MyDummyMonad -> () -> MyDummyMonad ())
        -> IO (SJ.JobStatus 'SJ.Safe JobLog)
withJob env f =
  runMyDummyMonad env . MyDummyMonad $
    -- the job type doesn't matter in our tests, we use a random one, as long as it's of type 'GargJob'.
    newJob @_ @GargError mkJobHandle (pure env) RecomputeGraphJob runInIO (SJ.JobInput () Nothing)
  where
    -- The job callback: run the user action, then report the latest status.
    runInIO _ hdl input = runMyDummyMonad env $ do
      f hdl input
      Right <$> getLatestJobStatus hdl
207
-- | Like 'withJob', but discards the resulting job status.
withJob_ :: Env
         -> (JobHandle MyDummyMonad -> () -> MyDummyMonad ())
         -> IO ()
withJob_ env = void . withJob env
212
-- | Build an 'Env' for the job-status tests. Only the job environment (a
-- single-runner queue with default priorities) and the HTTP manager are
-- real; every other field is an 'error' thunk stating that it is not needed.
newTestEnv :: IO Env
newTestEnv = do
  secret <- genSecret
  jobEnv <- newJobEnv (defaultJobSettings 1 secret) defaultPrios testTlsManager
  pure Env
    { _env_settings  = error "env_settings not needed, but forced somewhere (check StrictData)"
    , _env_logger    = error "env_logger not needed, but forced somewhere (check StrictData)"
    , _env_pool      = error "env_pool not needed, but forced somewhere (check StrictData)"
    , _env_nodeStory = error "env_nodeStory not needed, but forced somewhere (check StrictData)"
    , _env_manager   = testTlsManager
    , _env_self_url  = error "self_url not needed, but forced somewhere (check StrictData)"
    , _env_scrapers  = error "scrapers not needed, but forced somewhere (check StrictData)"
    , _env_jobs      = jobEnv
    , _env_config    = error "config not needed, but forced somewhere (check StrictData)"
    , _env_mail      = error "mail not needed, but forced somewhere (check StrictData)"
    , _env_nlp       = error "nlp not needed, but forced somewhere (check StrictData)"
    }
231
-- | Inside one job, snapshots the latest status three times (before any
-- update, after 'markStarted' 10 and after 'markProgress' 5) and checks the
-- remaining-steps field of each snapshot.
testFetchJobStatus :: IO ()
testFetchJobStatus = do
  env  <- newTestEnv
  evts <- newMVar []

  withJob_ env $ \hdl _input -> do
    beforeStart <- getLatestJobStatus hdl

    -- now let's log something
    markStarted 10 hdl
    afterStart <- getLatestJobStatus hdl
    markProgress 5 hdl
    afterProgress <- getLatestJobStatus hdl

    -- Prepend the three snapshots, in the order they were taken.
    liftIO $ modifyMVar_ evts (pure . ([beforeStart, afterStart, afterProgress] ++))

  threadDelay 500_000
  -- Check the events
  statuses <- readMVar evts
  map _scst_remaining statuses `shouldBe` [Nothing, Just 10, Just 5]
252
-- | Runs two jobs concurrently against the same environment, each started
-- with a different number of steps, and checks that each one observed its
-- own remaining-steps value.
testFetchJobStatusNoContention :: IO ()
testFetchJobStatusNoContention = do
  myEnv <- newTestEnv

  evts1 <- newMVar []
  evts2 <- newMVar []

  let -- A job that marks itself started with @total@ steps and records the
      -- status it sees right afterwards in @sink@.
      trackedJob total sink () = withJob_ myEnv $ \hdl _input -> do
        markStarted total hdl
        status <- getLatestJobStatus hdl
        liftIO $ modifyMVar_ sink (pure . (status :))

  Async.forConcurrently_ [trackedJob 100 evts1, trackedJob 50 evts2] ($ ())
  threadDelay 500_000
  -- Check the events
  seenByJob1 <- readMVar evts1
  map _scst_remaining seenByJob1 `shouldBe` [Just 100]
  seenByJob2 <- readMVar evts2
  map _scst_remaining seenByJob2 `shouldBe` [Just 50]
277
-- | Drives one job through a fixed sequence of status updates, capturing the
-- latest status after each step into a bounded queue of size 7, and then
-- compares the seven captured 'JobLog's with the expected values.
testMarkProgress :: IO ()
testMarkProgress = do
  myEnv <- newTestEnv
  evts  <- newTBQueueIO 7
  let -- Sleep for 100ms, then push the latest job status onto the queue.
      getStatus hdl = do
        liftIO $ threadDelay 100_000
        latest <- getLatestJobStatus hdl
        liftIO $ atomically $ writeTBQueue evts latest
      -- Block (via 'retry') until the queue is full, then drain it.
      readAllEvents = do
        full <- isFullTBQueue evts
        unless full retry
        flushTBQueue evts

  withJob_ myEnv $ \hdl _input -> do
    markStarted 10 hdl
    getStatus hdl

    markProgress 1 hdl
    getStatus hdl

    markFailure 1 Nothing hdl
    getStatus hdl

    markFailure 1 (Just "boom") hdl
    getStatus hdl

    markComplete hdl
    getStatus hdl

    markStarted 5 hdl
    markProgress 1 hdl
    getStatus hdl

    markFailed (Just "kaboom") hdl
    getStatus hdl

  [jl0, jl1, jl2, jl3, jl4, jl5, jl6] <- atomically readAllEvents

  let -- Expected status with the given succeeded / failed / remaining counts.
      status ok ko left evs = JobLog
        { _scst_succeeded = Just ok
        , _scst_failed    = Just ko
        , _scst_remaining = Just left
        , _scst_events    = Just evs
        }
      -- Expected error event carrying the given message.
      failureEvent msg = ScraperEvent
        { _scev_message = Just msg
        , _scev_level   = Just "ERROR"
        , _scev_date    = Nothing
        }

  -- Check the events are what we expect
  jl0 `shouldBe` status 0 0 10 []
  jl1 `shouldBe` status 1 0 9 []
  jl2 `shouldBe` status 1 1 8 []
  jl3 `shouldBe` status 1 2 7 [failureEvent "boom"]
  jl4 `shouldBe` status 8 2 0 [failureEvent "boom"]
  jl5 `shouldBe` status 1 0 4 []
  jl6 `shouldBe` status 1 4 0 [failureEvent "kaboom"]
364
-- | Test-suite entry point: runs the job-queue specs followed by the
-- job-status specs.
main :: IO ()
main = hspec $ do
  describe "job queue" queueSpec
  describe "job status update and tracking" statusSpec
  where
    queueSpec :: Spec
    queueSpec = do
      it "respects max runners limit" testMaxRunners
      it "respects priorities" testPrios
      it "can handle exceptions" testExceptions
      it "fairly picks equal-priority-but-different-kind jobs" testFairness

    statusSpec :: Spec
    statusSpec = do
      it "can fetch the latest job status" testFetchJobStatus
      it "can spin two separate jobs and track their status separately" testFetchJobStatusNoContention
      it "marking stuff behaves as expected" testMarkProgress