Git — Sourcephile - gargantext.git/blob - tests/queue/Main.hs
Merge remote-tracking branch 'origin/adinapoli/issue-198' into dev-merge
[gargantext.git] / tests / queue / Main.hs
1 {-# LANGUAGE DeriveGeneric #-}
2 {-# LANGUAGE ScopedTypeVariables #-}
3 {-# LANGUAGE TypeApplications #-}
4 {-# LANGUAGE TypeFamilies #-}
5 {-# LANGUAGE NumericUnderscores #-}
6 module Main where
7
8 import Control.Concurrent
9 import qualified Control.Concurrent.Async as Async
10 import Control.Concurrent.STM
11 import Control.Exception
12 import Control.Monad
13 import Control.Monad.Reader
14 import Control.Monad.Except
15 import Data.Maybe
16 import Data.Either
17 import Data.List
18 import Data.Sequence (Seq, (|>), fromList)
19 import GHC.Stack
20 import Prelude
21 import System.IO.Unsafe
22 import Network.HTTP.Client.TLS (newTlsManager)
23 import Network.HTTP.Client (Manager)
24 import Test.Hspec
25 import qualified Servant.Job.Types as SJ
26 import qualified Servant.Job.Core as SJ
27
28 import Gargantext.Utils.Jobs.Internal (newJob)
29 import Gargantext.Utils.Jobs.Map
30 import Gargantext.Utils.Jobs.Monad hiding (withJob)
31 import Gargantext.Utils.Jobs.Queue (applyPrios, defaultPrios)
32 import Gargantext.Utils.Jobs.State
33 import Gargantext.API.Prelude
34 import Gargantext.API.Admin.EnvTypes as EnvTypes
35 import Gargantext.API.Admin.Orchestrator.Types
36
-- | The job kinds used to exercise the queue in these tests.
data JobT
  = A
  | B
  | C
  | D
  deriving (Eq, Ord, Show, Enum, Bounded)
42
-- | This type models the schedule picked up by the orchestrator: job kinds
-- are stored in the order in which they were appended (see
-- 'addJobToSchedule').
newtype JobSchedule = JobSchedule
  { _JobSchedule :: Seq JobT
  }
  deriving (Eq, Show)
45
-- | Append the given job kind to the end of the schedule held in the 'MVar'.
addJobToSchedule :: JobT -> MVar JobSchedule -> IO ()
addJobToSchedule jobt mvar =
  modifyMVar_ mvar (pure . appendJob)
  where
    -- Snoc the new job onto whatever has been recorded so far.
    appendJob (JobSchedule picked) = JobSchedule (picked |> jobt)
51
-- | A tally of 'A' and 'B' jobs, adjusted through 'inc' and 'dec'.
data Counts = Counts
  { countAs :: Int -- ^ tally for jobs of kind 'A'
  , countBs :: Int -- ^ tally for jobs of kind 'B'
  }
  deriving (Eq, Show)
54
-- | 'inc' raises and 'dec' lowers the tally matching the given job kind by
-- one. Only 'A' and 'B' are tracked: 'C' and 'D' leave the counts untouched.
inc, dec :: JobT -> Counts -> Counts
inc jobt cs = case jobt of
  A -> cs { countAs = countAs cs + 1 }
  B -> cs { countBs = countBs cs + 1 }
  C -> cs
  D -> cs
dec jobt cs = case jobt of
  A -> cs { countAs = countAs cs - 1 }
  B -> cs { countBs = countBs cs - 1 }
  C -> cs
  D -> cs
64
-- | How long each simulated job sleeps. The value is handed to
-- 'threadDelay', so it is expressed in microseconds.
jobDuration :: Int
jobDuration = 100_000

-- | How long the tests wait after submitting jobs before taking their first
-- sample (also handed to 'threadDelay', so in microseconds).
initialDelay :: Int
initialDelay = 20_000
68
-- | Checks that the queue never runs more jobs at once than the configured
-- number of runners: four jobs are pushed while only two runners are
-- available, and the set of running jobs is sampled once per batch.
testMaxRunners :: IO ()
testMaxRunners = do
  -- max runners = 2 with default settings
  k <- genSecret
  let settings = defaultJobSettings 2 k
  st :: JobsState JobT [String] () <- newJobsState settings defaultPrios
  runningJs <- newTVarIO []
  let jobName num = "Job #" ++ show num
      -- Each job registers itself as running, sleeps for 'jobDuration' and
      -- then removes itself again. The strict 'modifyTVar'' avoids piling up
      -- unevaluated updates inside the 'TVar'.
      j num _jHandle _inp _l = do
        atomically $ modifyTVar' runningJs (jobName num :)
        threadDelay jobDuration
        atomically $ modifyTVar' runningJs (filter (/= jobName num))
      jobs = [ j n | n <- [1..4::Int] ]
  -- The job identifiers are not needed, so the results are discarded.
  forM_ jobs $ \f -> pushJob A () f settings st
  -- First sample: shortly after submission only the first two jobs run.
  threadDelay initialDelay
  r1 <- readTVarIO runningJs
  sort r1 `shouldBe` ["Job #1", "Job #2"]
  -- Second sample: one job duration later the remaining two have taken over.
  threadDelay jobDuration
  r2 <- readTVarIO runningJs
  sort r2 `shouldBe` ["Job #3", "Job #4"]
  -- Third sample: after another job duration nothing is left running.
  threadDelay jobDuration
  r3 <- readTVarIO runningJs
  r3 `shouldBe` []
91
-- | Checks that jobs are picked according to the configured priorities
-- rather than in the order in which they were submitted.
testPrios :: IO ()
testPrios = do
  k <- genSecret
  let settings = defaultJobSettings 2 k
      prios = [(B, 10), (C, 1), (D, 5)]
      -- Job kinds without an explicit priority get no head start.
      runningDelta job = maybe 0 (* 1000) (lookup job prios)
  st :: JobsState JobT [String] () <- newJobsState settings $
    applyPrios prios defaultPrios -- B has the highest priority
  pickedSchedule <- newMVar (JobSchedule mempty)
  let j jobt _jHandle _inp _l = do
        -- Simulate the running time of a job, then add it to the schedule.
        -- The running time shrinks in proportion to the job's priority:
        -- jobs are pushed sequentially, so we have to account for the
        -- submission time.
        threadDelay $ jobDuration - runningDelta jobt
        addJobToSchedule jobt pickedSchedule
  -- Submission order is deliberately different from the priority order.
  forM_ [A, C, B, D] $ \t -> void $ pushJob t () (j t) settings st
  -- Wait for longer than the total duration, so that we are sure that all
  -- jobs have finished, then check the schedule.
  threadDelay (5 * jobDuration)
  finalSchedule <- readMVar pickedSchedule
  finalSchedule `shouldBe` JobSchedule (fromList [B, D, C, A])
119
-- | Checks that a job whose action throws is still recorded as done, with
-- the failure captured as a 'Left' result.
testExceptions :: IO ()
testExceptions = do
  k <- genSecret
  let settings = defaultJobSettings 2 k
  st :: JobsState JobT [String] () <- newJobsState settings defaultPrios
  -- Reading a file that does not exist makes the job throw.
  jid <- pushJob A ()
           (\_jHandle _inp _log -> readFile "/doesntexist.txt" >>= putStrLn)
           settings st
  threadDelay initialDelay
  mjob <- lookupJob jid (jobsData st)
  case mjob of
    Nothing ->
      expectationFailure "the pushed job could not be found in the jobs map"
    Just je -> case jTask je of
      DoneJ _ r -> isLeft r `shouldBe` True
      _ ->
        expectationFailure
          "expected the failing job to be done (DoneJ) after the initial delay"
136
-- | Checks that jobs of different kinds but equal priority are picked in
-- submission order: a 'B' pushed after two 'A's must run before the 'A's
-- that were pushed after it.
testFairness :: IO ()
testFairness = do
  k <- genSecret
  let settings = defaultJobSettings 2 k
  st :: JobsState JobT [String] () <- newJobsState settings defaultPrios
  runningJs <- newTVarIO (Counts 0 0)
  let -- Each job bumps the tally for its kind while it runs. The strict
      -- 'modifyTVar'' avoids piling up unevaluated updates in the 'TVar'.
      j jobt _jHandle _inp _l = do
        atomically $ modifyTVar' runningJs (inc jobt)
        threadDelay jobDuration
        atomically $ modifyTVar' runningJs (dec jobt)
      jobs = [ (A, j A)
             , (A, j A)
             , (B, j B)
             , (A, j A)
             , (A, j A)
             ]
  -- The job identifiers are not needed, so the results are discarded.
  forM_ jobs $ \(t, f) -> pushJob t () f settings st
  threadDelay initialDelay
  r1 <- readTVarIO runningJs
  r1 `shouldBe` Counts 2 0
  threadDelay jobDuration
  r2 <- readTVarIO runningJs
  r2 `shouldBe` Counts 1 1 -- MOST IMPORTANT CHECK: the B got picked after the
                           -- two As, because it was inserted right after them
                           -- and has equal priority.
  threadDelay jobDuration
  r3 <- readTVarIO runningJs
  r3 `shouldBe` Counts 1 0
  threadDelay jobDuration
  r4 <- readTVarIO runningJs
  r4 `shouldBe` Counts 0 0
169
-- | A thin wrapper around 'GargM' for which this module supplies its own
-- 'MonadJob' and 'MonadJobStatus' instances.
newtype MyDummyMonad a = MyDummyMonad
  { _MyDummyMonad :: GargM Env GargError a
  }
  deriving (Functor, Applicative, Monad, MonadIO, MonadReader Env)
173
-- | The job environment is obtained from the wrapped 'GargM' computation.
instance MonadJob MyDummyMonad GargJob (Seq JobLog) JobLog where
  getJobEnv =
    MyDummyMonad getJobEnv
176
-- | Job-status tracking for 'MyDummyMonad': every operation except
-- 'withTracer' is forwarded to the wrapped 'GargM' computation.
instance MonadJobStatus MyDummyMonad where
  type JobHandle     MyDummyMonad = EnvTypes.ConcreteJobHandle GargError
  type JobType       MyDummyMonad = GargJob
  type JobOutputType MyDummyMonad = JobLog
  type JobEventType  MyDummyMonad = JobLog

  getLatestJobStatus hdl = MyDummyMonad $ getLatestJobStatus hdl

  -- The first argument is ignored: the continuation is simply run on the
  -- handle it was given.
  withTracer _ hdl k = k hdl

  markStarted steps hdl = MyDummyMonad $ markStarted steps hdl
  markProgress steps hdl = MyDummyMonad $ markProgress steps hdl
  markFailure steps mbMsg hdl = MyDummyMonad $ markFailure steps mbMsg hdl
  markComplete hdl = MyDummyMonad $ markComplete hdl
  markFailed mbMsg hdl = MyDummyMonad $ markFailed mbMsg hdl
190
-- | Run a 'MyDummyMonad' action in 'IO' against the given environment.
-- An error returned by the underlying computation is rethrown as an
-- exception.
runMyDummyMonad :: Env -> MyDummyMonad a -> IO a
runMyDummyMonad env m =
  runExceptT (runReaderT (_MyDummyMonad m) env) >>= either throwIO pure
197
-- | A TLS manager created once and shared by the test environments.
--
-- It is built with 'unsafePerformIO'; the @NOINLINE@ pragma keeps the
-- binding from being inlined, so the creating action is not duplicated at
-- each use site.
{-# NOINLINE testTlsManager #-}
testTlsManager :: Manager
testTlsManager =
  unsafePerformIO newTlsManager
201
-- | 'shouldBe' lifted into any 'MonadIO', so that expectations can be
-- stated from inside a job.
shouldBeE :: (MonadIO m, HasCallStack, Show a, Eq a) => a -> a -> m ()
shouldBeE actual expected =
  liftIO $ actual `shouldBe` expected
204
-- | Submit the given action as a new job in the given environment and
-- return the job status produced by 'newJob'. The job itself runs the
-- action and then reports its latest status as the job's result.
withJob :: Env
        -> (JobHandle MyDummyMonad -> () -> MyDummyMonad ())
        -> IO (SJ.JobStatus 'SJ.Safe JobLog)
withJob env f =
  runMyDummyMonad env . MyDummyMonad $
    newJob @_ @GargError
      mkJobHandle
      (pure env)
      -- the job type doesn't matter in our tests, we use a random one, as
      -- long as it's of type 'GargJob'.
      RecomputeGraphJob
      (\_ hdl input -> runMyDummyMonad env $ do
          f hdl input
          Right <$> getLatestJobStatus hdl)
      (SJ.JobInput () Nothing)
212
-- | Like 'withJob', but discards the resulting job status.
withJob_ :: Env
         -> (JobHandle MyDummyMonad -> () -> MyDummyMonad ())
         -> IO ()
withJob_ env = void . withJob env
217
-- | Build an 'Env' in which only the job environment and the HTTP manager
-- are real. Every other field is an 'error' thunk naming the field, so an
-- unexpected access is reported clearly.
newTestEnv :: IO Env
newTestEnv = do
  k <- genSecret
  myEnv <- newJobEnv (defaultJobSettings 2 k) defaultPrios testTlsManager
  pure Env
    { _env_settings  = notNeeded "env_settings"
    , _env_logger    = notNeeded "env_logger"
    , _env_pool      = notNeeded "env_pool"
    , _env_nodeStory = notNeeded "env_nodeStory"
    , _env_manager   = testTlsManager
    , _env_self_url  = notNeeded "self_url"
    , _env_scrapers  = notNeeded "scrapers"
    , _env_jobs      = myEnv
    , _env_config    = notNeeded "config"
    , _env_mail      = notNeeded "mail"
    , _env_nlp       = notNeeded "nlp"
    }
  where
    -- Placeholder for a field these tests are not supposed to touch.
    notNeeded :: String -> a
    notNeeded field =
      error (field ++ " not needed, but forced somewhere (check StrictData)")
236
-- | Checks that the latest job status reflects each update: nothing
-- remaining before the job is started, then the remaining count after
-- 'markStarted' and after 'markProgress'.
testFetchJobStatus :: IO ()
testFetchJobStatus = do
  myEnv <- newTestEnv
  evts <- newMVar []

  withJob_ myEnv $ \hdl _input -> do
    beforeStart <- getLatestJobStatus hdl

    -- now let's log something
    markStarted 10 hdl
    afterStart <- getLatestJobStatus hdl
    markProgress 5 hdl
    afterProgress <- getLatestJobStatus hdl

    -- Hand the three observations over to the test thread, oldest first.
    liftIO $ modifyMVar_ evts (pure . ([beforeStart, afterStart, afterProgress] ++))

  threadDelay 500_000
  -- Check the events
  statuses <- readMVar evts
  map _scst_remaining statuses `shouldBe` [Nothing, Just 10, Just 5]
257
-- | Checks that two jobs started concurrently in the same environment each
-- see their own status, not the other job's.
testFetchJobStatusNoContention :: IO ()
testFetchJobStatusNoContention = do
  myEnv <- newTestEnv

  evts1 <- newMVar []
  evts2 <- newMVar []

  -- Start a job with the given number of steps and record, in the given
  -- 'MVar', the status it reports right afterwards.
  let trackedJob steps evts = withJob_ myEnv $ \hdl _input -> do
        markStarted steps hdl
        status <- getLatestJobStatus hdl
        liftIO $ modifyMVar_ evts (pure . (status :))

  Async.forConcurrently_ [trackedJob 100 evts1, trackedJob 50 evts2] id
  threadDelay 500_000
  -- Check the events
  statuses1 <- readMVar evts1
  map _scst_remaining statuses1 `shouldBe` [Just 100]
  statuses2 <- readMVar evts2
  map _scst_remaining statuses2 `shouldBe` [Just 50]
282
-- | Checks the job log after each kind of status update: starting,
-- progress, failures with and without a message, completion, and marking
-- the whole job as failed.
testMarkProgress :: IO ()
testMarkProgress = do
  myEnv <- newTestEnv
  evts <- newMVar []

  withJob_ myEnv $ \hdl _input -> do
    markStarted 10 hdl
    jl0 <- getLatestJobStatus hdl
    markProgress 1 hdl
    jl1 <- getLatestJobStatus hdl
    markFailure 1 Nothing hdl
    jl2 <- getLatestJobStatus hdl
    markFailure 1 (Just "boom") hdl
    jl3 <- getLatestJobStatus hdl
    markComplete hdl
    jl4 <- getLatestJobStatus hdl
    -- Restart the same handle with a fresh count before failing it.
    markStarted 5 hdl
    markProgress 1 hdl
    jl5 <- getLatestJobStatus hdl
    markFailed (Just "kaboom") hdl
    jl6 <- getLatestJobStatus hdl
    liftIO $ modifyMVar_ evts (const (pure [jl0, jl1, jl2, jl3, jl4, jl5, jl6]))

  threadDelay 500_000
  recorded <- readMVar evts

  -- Match totally: if the job has not stored its seven snapshots yet, fail
  -- with an explicit message rather than a pattern-match error.
  case recorded of
    [jl0, jl1, jl2, jl3, jl4, jl5, jl6] -> do
      -- Check the events are what we expect
      jl0 `shouldBe` JobLog { _scst_succeeded = Just 0
                            , _scst_failed    = Just 0
                            , _scst_remaining = Just 10
                            , _scst_events    = Just []
                            }
      jl1 `shouldBe` JobLog { _scst_succeeded = Just 1
                            , _scst_failed    = Just 0
                            , _scst_remaining = Just 9
                            , _scst_events    = Just []
                            }
      jl2 `shouldBe` JobLog { _scst_succeeded = Just 1
                            , _scst_failed    = Just 1
                            , _scst_remaining = Just 8
                            , _scst_events    = Just []
                            }
      jl3 `shouldBe` JobLog { _scst_succeeded = Just 1
                            , _scst_failed    = Just 2
                            , _scst_remaining = Just 7
                            , _scst_events    = Just [
                                ScraperEvent { _scev_message = Just "boom"
                                             , _scev_level   = Just "ERROR"
                                             , _scev_date    = Nothing }
                                ]
                            }
      jl4 `shouldBe` JobLog { _scst_succeeded = Just 8
                            , _scst_failed    = Just 2
                            , _scst_remaining = Just 0
                            , _scst_events    = Just [
                                ScraperEvent { _scev_message = Just "boom"
                                             , _scev_level   = Just "ERROR"
                                             , _scev_date    = Nothing }
                                ]
                            }
      jl5 `shouldBe` JobLog { _scst_succeeded = Just 1
                            , _scst_failed    = Just 0
                            , _scst_remaining = Just 4
                            , _scst_events    = Just []
                            }
      jl6 `shouldBe` JobLog { _scst_succeeded = Just 1
                            , _scst_failed    = Just 4
                            , _scst_remaining = Just 0
                            , _scst_events    = Just [
                                ScraperEvent { _scev_message = Just "kaboom"
                                             , _scev_level   = Just "ERROR"
                                             , _scev_date    = Nothing }
                                ]
                            }
    _ ->
      expectationFailure $
        "expected 7 recorded job statuses, but found "
          ++ show (length recorded)
          ++ " (did the job finish in time?)"
357
-- | Entry point: runs the queue-scheduling specs followed by the
-- job-status specs.
main :: IO ()
main = hspec $ do
  describe "job queue" $ do
    it "respects max runners limit" testMaxRunners
    it "respects priorities" testPrios
    it "can handle exceptions" testExceptions
    it "fairly picks equal-priority-but-different-kind jobs" testFairness
  describe "job status update and tracking" $ do
    it "can fetch the latest job status" testFetchJobStatus
    it "can spin two separate jobs and track their status separately"
      testFetchJobStatusNoContention
    it "marking stuff behaves as expected" testMarkProgress