]> Git — Sourcephile - gargantext.git/blob - tests/queue/Main.hs
[FIX]
[gargantext.git] / tests / queue / Main.hs
1 {-# LANGUAGE DeriveGeneric #-}
2 {-# LANGUAGE ScopedTypeVariables #-}
3 {-# LANGUAGE TypeApplications #-}
4 {-# LANGUAGE TypeFamilies #-}
5 {-# LANGUAGE NumericUnderscores #-}
6 module Main where
7
8 import Control.Concurrent
9 import qualified Control.Concurrent.Async as Async
10 import Control.Concurrent.STM
11 import Control.Exception
12 import Control.Monad
13 import Control.Monad.Reader
14 import Control.Monad.Except
15 import Data.Maybe
16 import Data.Either
17 import Data.List
18 import Data.Sequence (Seq, (|>), fromList)
19 import Data.Time
20 import GHC.Stack
21 import Prelude
22 import System.IO.Unsafe
23 import Network.HTTP.Client.TLS (newTlsManager)
24 import Network.HTTP.Client (Manager)
25 import Test.Hspec
26 import qualified Servant.Job.Types as SJ
27 import qualified Servant.Job.Core as SJ
28
29 import Gargantext.Utils.Jobs.Internal (newJob)
30 import Gargantext.Utils.Jobs.Map
31 import Gargantext.Utils.Jobs.Monad hiding (withJob)
32 import Gargantext.Utils.Jobs.Queue (applyPrios, defaultPrios)
33 import Gargantext.Utils.Jobs.State
34 import Gargantext.API.Prelude
35 import Gargantext.API.Admin.EnvTypes as EnvTypes
36 import Gargantext.API.Admin.Orchestrator.Types
37
-- | The kinds of job pushed by these tests. The constructor is what the
-- priority table is keyed on, and what 'inc' / 'dec' dispatch on.
data JobT
  = A
  | B
  | C
  | D
  deriving (Eq, Ord, Show, Enum, Bounded)
43
-- | The schedule picked up by the orchestrator: the job kinds in the order
-- in which they were appended by 'addJobToSchedule'.
newtype JobSchedule = JobSchedule
  { _JobSchedule :: Seq JobT
  }
  deriving (Eq, Show)
46
-- | Append @jobt@ to the end of the schedule stored in @mvar@.
addJobToSchedule :: JobT -> MVar JobSchedule -> IO ()
addJobToSchedule jobt mvar =
  modifyMVar_ mvar (pure . appendJob)
  where
    appendJob (JobSchedule picked) = JobSchedule (picked |> jobt)
52
-- | A pair of counters, one for jobs of kind 'A' and one for kind 'B'.
data Counts = Counts
  { countAs :: Int
  , countBs :: Int
  }
  deriving (Eq, Show)
55
-- | Bump ('inc') or drop ('dec') the counter matching the job kind.
-- Kinds 'C' and 'D' have no counter and leave 'Counts' untouched.
inc, dec :: JobT -> Counts -> Counts
inc jobt cs = case jobt of
  A -> cs { countAs = countAs cs + 1 }
  B -> cs { countBs = countBs cs + 1 }
  C -> cs
  D -> cs
dec jobt cs = case jobt of
  A -> cs { countAs = countAs cs - 1 }
  B -> cs { countBs = countBs cs - 1 }
  C -> cs
  D -> cs
65
-- | Delays, in microseconds (these are passed to 'threadDelay').
--
-- 'jobDuration' is how long each simulated job sleeps. 'initialDelay' is
-- how long the tests wait after pushing jobs before their first check.
jobDuration, initialDelay :: Int
jobDuration  = 100000
initialDelay = 20000
69
-- | With two runners, four jobs pushed at once run two at a time: jobs #1
-- and #2 first, then #3 and #4, and nothing is running once all are done.
testMaxRunners :: IO ()
testMaxRunners = do
  secret <- genSecret
  -- max runners = 2 with default settings
  let settings = defaultJobSettings 2 secret
  st :: JobsState JobT [String] () <- newJobsState settings defaultPrios
  running <- newTVarIO []
  let label :: Int -> String
      label n = "Job #" ++ show n
      job n _jHandle _inp _l = do
        atomically $ modifyTVar running (label n :)
        threadDelay jobDuration
        atomically $ modifyTVar running (filter (/= label n))
  forM_ [1 .. 4] $ \n -> pushJob A () (job n) settings st

  threadDelay initialDelay
  firstWave <- sort <$> readTVarIO running
  firstWave `shouldBe` ["Job #1", "Job #2"]

  threadDelay jobDuration
  secondWave <- sort <$> readTVarIO running
  secondWave `shouldBe` ["Job #3", "Job #4"]

  threadDelay jobDuration
  leftover <- readTVarIO running
  leftover `shouldBe` []
92
-- | With a single runner, jobs queued together execute in descending
-- priority order: B (10), D (5), C (1), and finally A, which is given no
-- custom priority.
testPrios :: IO ()
testPrios = do
  secret <- genSecret
  -- A single runner makes the execution order observable: no two runners
  -- compete for the queued jobs.
  let settings = defaultJobSettings 1 secret
      customPrios = [(B, 10), (C, 1), (D, 5)] -- B has the highest priority
  st :: JobsState JobT [String] () <-
    newJobsState settings (applyPrios customPrios defaultPrios)
  picked <- newMVar (JobSchedule mempty)
  let record jobt _jHandle _inp _l = addJobToSchedule jobt picked
      jobs = [ (t, record t) | t <- [A, C, B, D] ]

  -- All pushes happen in one STM transaction, so every job is already in
  -- the queue by the time 'popQueue' gets called.
  now <- getCurrentTime
  atomically $ forM_ jobs $ \(t, f) -> void $ pushJobWithTime now t () f settings st

  -- Wait longer than the jobs take overall, so they have all finished
  -- before the schedule is inspected.
  threadDelay jobDuration
  schedule <- readMVar picked
  schedule `shouldBe` JobSchedule (fromList [B, D, C, A])
120
-- | A job whose body throws (here: reading a file that does not exist) must
-- end up recorded as finished ('DoneJ') with a 'Left' result.
--
-- If the job cannot be found, or has not finished after 'initialDelay',
-- the test fails through 'expectationFailure' with a message saying which,
-- instead of an uninformative 'error'.
testExceptions :: IO ()
testExceptions = do
  k <- genSecret
  let settings = defaultJobSettings 2 k
  st :: JobsState JobT [String] () <- newJobsState settings defaultPrios
  jid <- pushJob A ()
           (\_jHandle _inp _log -> readFile "/doesntexist.txt" >>= putStrLn)
           settings st
  threadDelay initialDelay
  mjob <- lookupJob jid (jobsData st)
  case mjob of
    Nothing ->
      expectationFailure "testExceptions: the pushed job was not found in the jobs map"
    Just je -> case jTask je of
      DoneJ _ r -> isLeft r `shouldBe` True
      _ ->
        expectationFailure "testExceptions: the job had not finished (no DoneJ) after initialDelay"
137
-- | Two runners and five jobs pushed in the order A, A, B, A, A with equal
-- priority. One job-duration apart, the test checks that the B is picked
-- as soon as the first two As finish, i.e. jobs are taken in insertion
-- order regardless of their kind.
testFairness :: IO ()
testFairness = do
  secret <- genSecret
  let settings = defaultJobSettings 2 secret
  st :: JobsState JobT [String] () <- newJobsState settings defaultPrios
  running <- newTVarIO (Counts 0 0)
  let job jobt _jHandle _inp _l = do
        atomically $ modifyTVar running (inc jobt)
        threadDelay jobDuration
        atomically $ modifyTVar running (dec jobt)
  forM_ [A, A, B, A, A] $ \t -> pushJob t () (job t) settings st

  threadDelay initialDelay
  readTVarIO running >>= (`shouldBe` Counts 2 0)

  threadDelay jobDuration
  -- MOST IMPORTANT CHECK: the B got picked after the two As, because it was
  -- inserted right after them and has equal priority.
  readTVarIO running >>= (`shouldBe` Counts 1 1)

  threadDelay jobDuration
  readTVarIO running >>= (`shouldBe` Counts 1 0)

  threadDelay jobDuration
  readTVarIO running >>= (`shouldBe` Counts 0 0)
170
-- | A newtype over @GargM Env GargError@, so that the tests can give it
-- their own 'MonadJob' and 'MonadJobStatus' instances.
newtype MyDummyMonad a = MyDummyMonad
  { _MyDummyMonad :: GargM Env GargError a
  }
  deriving (Functor, Applicative, Monad, MonadIO, MonadReader Env)
174
-- | The job environment is read through the wrapped 'GargM' computation.
instance MonadJob MyDummyMonad GargJob (Seq JobLog) JobLog where
  getJobEnv = MyDummyMonad $ getJobEnv
177
-- | Status operations are forwarded to the @GargM Env GargError@ instance by
-- wrapping the result in 'MyDummyMonad'. The exception is 'withTracer',
-- which ignores its first argument and applies the continuation to the
-- handle directly.
instance MonadJobStatus MyDummyMonad where
  type JobHandle     MyDummyMonad = EnvTypes.ConcreteJobHandle GargError
  type JobType       MyDummyMonad = GargJob
  type JobOutputType MyDummyMonad = JobLog
  type JobEventType  MyDummyMonad = JobLog

  getLatestJobStatus       = MyDummyMonad . getLatestJobStatus
  withTracer _ jh k        = k jh
  markStarted n            = MyDummyMonad . markStarted n
  markProgress steps       = MyDummyMonad . markProgress steps
  markFailure steps mb_msg = MyDummyMonad . markFailure steps mb_msg
  markComplete             = MyDummyMonad . markComplete
  markFailed mb_msg        = MyDummyMonad . markFailed mb_msg
191
-- | Run a 'MyDummyMonad' action against @env@. A 'GargError' produced by the
-- action is rethrown in 'IO' with 'throwIO'.
runMyDummyMonad :: Env -> MyDummyMonad a -> IO a
runMyDummyMonad env (MyDummyMonad action) =
  runExceptT (runReaderT action env) >>= either throwIO pure
198
-- | The TLS 'Manager' used by 'newTestEnv' (both for the jobs environment
-- and for the 'Env' itself).
--
-- It is created once at top level with 'unsafePerformIO'. The NOINLINE
-- pragma keeps GHC from inlining the binding, which could otherwise
-- duplicate the call and build more than one manager.
testTlsManager :: Manager
testTlsManager = unsafePerformIO $ newTlsManager
{-# NOINLINE testTlsManager #-}
202
-- | 'shouldBe' lifted into any 'MonadIO' (such as 'MyDummyMonad').
shouldBeE :: (MonadIO m, HasCallStack, Show a, Eq a) => a -> a -> m ()
shouldBeE actual expected = liftIO $ actual `shouldBe` expected
205
-- | Submit @f@ as a job through 'newJob' and return the status 'newJob'
-- gives back.
--
-- The job body runs @f@ on the job handle and the unit input, then reports
-- the latest status recorded for that handle.
withJob :: Env
        -> (JobHandle MyDummyMonad -> () -> MyDummyMonad ())
        -> IO (SJ.JobStatus 'SJ.Safe JobLog)
withJob env f =
  runMyDummyMonad env . MyDummyMonad $
    -- The job type is irrelevant to these tests; any 'GargJob' will do.
    newJob @_ @GargError mkJobHandle (pure env) RecomputeGraphJob
      (\_ jobHandle inp ->
         runMyDummyMonad env (Right <$> (f jobHandle inp >> getLatestJobStatus jobHandle)))
      (SJ.JobInput () Nothing)
213
-- | 'withJob', discarding the returned job status.
withJob_ :: Env
         -> (JobHandle MyDummyMonad -> () -> MyDummyMonad ())
         -> IO ()
withJob_ env = void . withJob env
218
-- | Build an 'Env' for the job-status tests.
--
-- Only the HTTP manager and the jobs environment (two runners, default
-- priorities) are real. Every other field is an 'error' thunk whose message
-- names the field, so forcing one by accident is easy to diagnose.
newTestEnv :: IO Env
newTestEnv = do
  secret <- genSecret
  jobEnv <- newJobEnv (defaultJobSettings 2 secret) defaultPrios testTlsManager
  pure $ Env
    { _env_settings  = notNeeded "env_settings"
    , _env_logger    = notNeeded "env_logger"
    , _env_pool      = notNeeded "env_pool"
    , _env_nodeStory = notNeeded "env_nodeStory"
    , _env_manager   = testTlsManager
    , _env_self_url  = notNeeded "self_url"
    , _env_scrapers  = notNeeded "scrapers"
    , _env_jobs      = jobEnv
    , _env_config    = notNeeded "config"
    , _env_mail      = notNeeded "mail"
    , _env_nlp       = notNeeded "nlp"
    }
  where
    notNeeded :: HasCallStack => String -> a
    notNeeded field =
      error (field ++ " not needed, but forced somewhere (check StrictData)")
237
-- | Inside a job, 'getLatestJobStatus' reflects the marks made so far.
-- The remaining count is 'Nothing' before 'markStarted', 10 after
-- @markStarted 10@, and 5 after @markProgress 5@.
testFetchJobStatus :: IO ()
testFetchJobStatus = do
  myEnv <- newTestEnv
  evts <- newMVar []

  withJob_ myEnv $ \hdl _input -> do
    beforeStart <- getLatestJobStatus hdl

    -- now let's log something
    markStarted 10 hdl
    afterStart <- getLatestJobStatus hdl
    markProgress 5 hdl
    afterProgress <- getLatestJobStatus hdl

    liftIO $ modifyMVar_ evts (\xs -> pure (beforeStart : afterStart : afterProgress : xs))

  -- Give the job time to run, then check what it recorded.
  threadDelay 500_000
  statuses <- readMVar evts
  map _scst_remaining statuses `shouldBe` [Nothing, Just 10, Just 5]
258
-- | Two jobs run concurrently on the same 'Env'. Each must see only its own
-- @markStarted@ total (100 and 50 respectively) through its own handle.
testFetchJobStatusNoContention :: IO ()
testFetchJobStatusNoContention = do
  myEnv <- newTestEnv

  evts1 <- newMVar []
  evts2 <- newMVar []

  -- Start a job that marks @total@ steps and records the status it then sees.
  let startAndRecord total evts = withJob_ myEnv $ \hdl _input -> do
        markStarted total hdl
        status <- getLatestJobStatus hdl
        liftIO $ modifyMVar_ evts (pure . (status :))

  Async.forConcurrently_ [(100, evts1), (50, evts2)] (uncurry startAndRecord)
  threadDelay 500_000
  -- Check the events
  readMVar evts1 >>= \actual -> map _scst_remaining actual `shouldBe` [Just 100]
  readMVar evts2 >>= \actual -> map _scst_remaining actual `shouldBe` [Just 50]
283
-- | Drive one job handle through every marking primitive and check the
-- 'JobLog' observed after each step.
--
-- As asserted below:
--
-- * @markStarted n@ resets the log to 0 succeeded, 0 failed, @n@ remaining
--   and no events;
-- * @markProgress k@ moves @k@ steps from remaining to succeeded;
-- * @markFailure k mb_msg@ moves @k@ steps from remaining to failed, and
--   appends an @ERROR@ event when @mb_msg@ is 'Just';
-- * @markComplete@ moves all remaining steps to succeeded;
-- * @markFailed (Just msg)@ moves all remaining steps to failed and appends
--   an @ERROR@ event with @msg@.
testMarkProgress :: IO ()
testMarkProgress = do
  myEnv <- newTestEnv
  evts <- newMVar []

  withJob_ myEnv $ \hdl _input -> do
    markStarted 10 hdl
    jl0 <- getLatestJobStatus hdl
    markProgress 1 hdl
    jl1 <- getLatestJobStatus hdl
    markFailure 1 Nothing hdl
    jl2 <- getLatestJobStatus hdl
    markFailure 1 (Just "boom") hdl
    jl3 <- getLatestJobStatus hdl
    markComplete hdl
    jl4 <- getLatestJobStatus hdl
    markStarted 5 hdl
    markProgress 1 hdl
    jl5 <- getLatestJobStatus hdl
    markFailed (Just "kaboom") hdl
    jl6 <- getLatestJobStatus hdl
    liftIO $ modifyMVar_ evts (const (pure [jl0, jl1, jl2, jl3, jl4, jl5, jl6]))

  threadDelay 500_000

  -- Compare the whole list in one assertion instead of pattern-binding
  -- seven elements. If the job did not run in time, or recorded a different
  -- number of statuses, this fails with a readable diff rather than an
  -- opaque pattern-match failure.
  jobLogs <- readMVar evts
  jobLogs `shouldBe`
    [ -- after markStarted 10
      JobLog { _scst_succeeded = Just 0
             , _scst_failed    = Just 0
             , _scst_remaining = Just 10
             , _scst_events    = Just []
             }
      -- after markProgress 1
    , JobLog { _scst_succeeded = Just 1
             , _scst_failed    = Just 0
             , _scst_remaining = Just 9
             , _scst_events    = Just []
             }
      -- after markFailure 1 Nothing
    , JobLog { _scst_succeeded = Just 1
             , _scst_failed    = Just 1
             , _scst_remaining = Just 8
             , _scst_events    = Just []
             }
      -- after markFailure 1 (Just "boom")
    , JobLog { _scst_succeeded = Just 1
             , _scst_failed    = Just 2
             , _scst_remaining = Just 7
             , _scst_events    = Just [
                 ScraperEvent { _scev_message = Just "boom"
                              , _scev_level   = Just "ERROR"
                              , _scev_date    = Nothing }
                 ]
             }
      -- after markComplete
    , JobLog { _scst_succeeded = Just 8
             , _scst_failed    = Just 2
             , _scst_remaining = Just 0
             , _scst_events    = Just [
                 ScraperEvent { _scev_message = Just "boom"
                              , _scev_level   = Just "ERROR"
                              , _scev_date    = Nothing }
                 ]
             }
      -- after markStarted 5 and markProgress 1
    , JobLog { _scst_succeeded = Just 1
             , _scst_failed    = Just 0
             , _scst_remaining = Just 4
             , _scst_events    = Just []
             }
      -- after markFailed (Just "kaboom")
    , JobLog { _scst_succeeded = Just 1
             , _scst_failed    = Just 4
             , _scst_remaining = Just 0
             , _scst_events    = Just [
                 ScraperEvent { _scev_message = Just "kaboom"
                              , _scev_level   = Just "ERROR"
                              , _scev_date    = Nothing }
                 ]
             }
    ]
358
-- | Entry point: runs the job-queue and job-status suites under hspec.
main :: IO ()
main = hspec $ do
  describe "job queue" $ do
    it "respects max runners limit" testMaxRunners
    it "respects priorities" testPrios
    it "can handle exceptions" testExceptions
    it "fairly picks equal-priority-but-different-kind jobs" testFairness
  describe "job status update and tracking" $ do
    it "can fetch the latest job status" testFetchJobStatus
    it "can spin two separate jobs and track their status separately" testFetchJobStatusNoContention
    it "marking stuff behaves as expected" testMarkProgress