1 {-# LANGUAGE DeriveGeneric #-}
2 {-# LANGUAGE ScopedTypeVariables #-}
3 {-# LANGUAGE TypeApplications #-}
4 {-# LANGUAGE TypeFamilies #-}
5 {-# LANGUAGE NumericUnderscores #-}
8 import Control.Concurrent
9 import qualified Control.Concurrent.Async as Async
10 import Control.Concurrent.STM
11 import Control.Exception
13 import Control.Monad.Reader
14 import Control.Monad.Except
18 import Data.Sequence (Seq, (|>), fromList)
22 import System.IO.Unsafe
23 import Network.HTTP.Client.TLS (newTlsManager)
24 import Network.HTTP.Client (Manager)
26 import qualified Servant.Job.Types as SJ
27 import qualified Servant.Job.Core as SJ
29 import Gargantext.Utils.Jobs.Internal (newJob)
30 import Gargantext.Utils.Jobs.Map
31 import Gargantext.Utils.Jobs.Monad hiding (withJob)
32 import Gargantext.Utils.Jobs.Queue (applyPrios, defaultPrios)
33 import Gargantext.Utils.Jobs.State
34 import Gargantext.API.Prelude
35 import Gargantext.API.Admin.EnvTypes as EnvTypes
36 import Gargantext.API.Admin.Orchestrator.Types
42 deriving (Eq, Ord, Show, Enum, Bounded)
-- | The order in which the orchestrator actually picked up jobs.
--
-- Each job appends its own kind to the right end of the sequence when it
-- runs (see 'addJobToSchedule'). The leftmost element is therefore the
-- first job that was picked, and tests compare the whole sequence against
-- an expected ordering.
newtype JobSchedule = JobSchedule
  { _JobSchedule :: Seq JobT
  }
  deriving (Eq, Show)
47 addJobToSchedule :: JobT -> MVar JobSchedule -> IO ()
48 addJobToSchedule jobt mvar = do
49 modifyMVar_ mvar $ \js -> do
50 let js' = js { _JobSchedule = _JobSchedule js |> jobt }
53 data Counts = Counts { countAs :: Int, countBs :: Int }
56 inc, dec :: JobT -> Counts -> Counts
57 inc A cs = cs { countAs = countAs cs + 1 }
58 inc B cs = cs { countBs = countBs cs + 1 }
61 dec A cs = cs { countAs = countAs cs - 1 }
62 dec B cs = cs { countBs = countBs cs - 1 }
66 jobDuration, initialDelay :: Int
70 testMaxRunners :: IO ()
72 -- max runners = 2 with default settings
74 let settings = defaultJobSettings 2 k
75 st :: JobsState JobT [String] () <- newJobsState settings defaultPrios
76 runningJs <- newTVarIO []
77 let j num _jHandle _inp _l = do
78 atomically $ modifyTVar runningJs (\xs -> ("Job #" ++ show num) : xs)
79 threadDelay jobDuration
80 atomically $ modifyTVar runningJs (\xs -> filter (/=("Job #" ++ show num)) xs)
81 jobs = [ j n | n <- [1..4::Int] ]
82 _jids <- forM jobs $ \f -> pushJob A () f settings st
83 threadDelay initialDelay
84 r1 <- readTVarIO runningJs
85 sort r1 `shouldBe` ["Job #1", "Job #2"]
86 threadDelay jobDuration
87 r2 <- readTVarIO runningJs
88 sort r2 `shouldBe` ["Job #3", "Job #4"]
89 threadDelay jobDuration
90 r3 <- readTVarIO runningJs
96 -- Use a single runner, so that we can check the order of execution
97 -- without worrying about the runners competing with each other.
98 let settings = defaultJobSettings 1 k
99 prios = [(B, 10), (C, 1), (D, 5)]
100 st :: JobsState JobT [String] () <- newJobsState settings $
101 applyPrios prios defaultPrios -- B has the highest priority
102 pickedSchedule <- newMVar (JobSchedule mempty)
103 let j jobt _jHandle _inp _l = addJobToSchedule jobt pickedSchedule
110 -- Push all the jobs in the same STM transaction, so that they are all stored in the queue by
111 -- the time 'popQueue' gets called.
112 now <- getCurrentTime
113 atomically $ forM_ jobs $ \(t, f) -> void $ pushJobWithTime now t () f settings st
115 -- wait for the jobs to finish, waiting for more than the total duration,
116 -- so that we are sure that all jobs have finished, then check the schedule.
117 threadDelay jobDuration
118 finalSchedule <- readMVar pickedSchedule
119 finalSchedule `shouldBe` JobSchedule (fromList [B, D, C, A])
121 testExceptions :: IO ()
124 let settings = defaultJobSettings 2 k
125 st :: JobsState JobT [String] () <- newJobsState settings defaultPrios
127 (\_jHandle _inp _log -> readFile "/doesntexist.txt" >>= putStrLn)
129 threadDelay initialDelay
130 mjob <- lookupJob jid (jobsData st)
132 Nothing -> error "boo"
133 Just je -> case jTask je of
134 DoneJ _ r -> isLeft r `shouldBe` True
138 testFairness :: IO ()
141 let settings = defaultJobSettings 2 k
142 st :: JobsState JobT [String] () <- newJobsState settings defaultPrios
143 runningJs <- newTVarIO (Counts 0 0)
144 let j jobt _jHandle _inp _l = do
145 atomically $ modifyTVar runningJs (inc jobt)
146 threadDelay jobDuration
147 atomically $ modifyTVar runningJs (dec jobt)
154 _jids <- forM jobs $ \(t, f) -> do
155 pushJob t () f settings st
156 threadDelay initialDelay
157 r1 <- readTVarIO runningJs
158 r1 `shouldBe` (Counts 2 0)
159 threadDelay jobDuration
160 r2 <- readTVarIO runningJs
161 r2 `shouldBe` (Counts 1 1) -- MOST IMPORTANT CHECK: the B got picked after the
162 -- two As, because it was inserted right after them
163 -- and has equal priority.
164 threadDelay jobDuration
165 r3 <- readTVarIO runningJs
166 r3 `shouldBe` (Counts 1 0)
167 threadDelay jobDuration
168 r4 <- readTVarIO runningJs
169 r4 `shouldBe` (Counts 0 0)
-- | A newtype wrapper around @GargM Env GargError@ that the job-status
-- tests run in.
--
-- Wrapping lets this module give it its own 'MonadJob' and
-- 'MonadJobStatus' instances. 'Functor', 'Applicative', 'Monad', 'MonadIO'
-- and @MonadReader Env@ are all derived from the wrapped monad.
newtype MyDummyMonad a = MyDummyMonad
  { _MyDummyMonad :: GargM Env GargError a
  }
  deriving (Functor, Applicative, Monad, MonadIO, MonadReader Env)
-- | Job-environment access for 'MyDummyMonad'.
--
-- Runs the wrapped monad's 'getJobEnv' and re-wraps the action in the
-- 'MyDummyMonad' constructor.
instance MonadJob MyDummyMonad GargJob (Seq JobLog) JobLog where
  getJobEnv =
    MyDummyMonad getJobEnv
-- | Job-status operations for 'MyDummyMonad'.
--
-- Every status operation is forwarded to the wrapped
-- @GargM Env GargError@ implementation, and the resulting action is
-- re-wrapped in 'MyDummyMonad'.
--
-- Tracing is not forwarded: 'withTracer' ignores the tracer it is given
-- and simply runs the continuation on the original handle.
instance MonadJobStatus MyDummyMonad where
  type JobHandle     MyDummyMonad = EnvTypes.ConcreteJobHandle GargError
  type JobType       MyDummyMonad = GargJob
  type JobOutputType MyDummyMonad = JobLog
  type JobEventType  MyDummyMonad = JobLog

  getLatestJobStatus          = MyDummyMonad . getLatestJobStatus
  withTracer _tracer hdl cont = cont hdl
  markStarted total           = MyDummyMonad . markStarted total
  markProgress steps          = MyDummyMonad . markProgress steps
  markFailure steps mbMsg     = MyDummyMonad . markFailure steps mbMsg
  markComplete                = MyDummyMonad . markComplete
  markFailed mbMsg            = MyDummyMonad . markFailed mbMsg
192 runMyDummyMonad :: Env -> MyDummyMonad a -> IO a
193 runMyDummyMonad env m = do
194 res <- runExceptT . flip runReaderT env $ _MyDummyMonad m
-- | The TLS-capable HTTP 'Manager' placed into the test environment.
--
-- 'newTlsManager' is an 'IO' action, but the environment record needs a
-- plain 'Manager' value, so it is run once with 'unsafePerformIO'.
--
-- The NOINLINE pragma keeps this a single shared top-level value. Without
-- it, GHC may inline the definition and run 'newTlsManager' again at each
-- use site.
{-# NOINLINE testTlsManager #-}
testTlsManager :: Manager
testTlsManager = unsafePerformIO newTlsManager
-- | 'shouldBe' lifted into any 'MonadIO'.
--
-- This lets assertions be written directly inside a job's monad. The
-- 'HasCallStack' constraint forwards the caller's location to 'shouldBe'.
shouldBeE :: (MonadIO m, HasCallStack, Show a, Eq a) => a -> a -> m ()
shouldBeE actual expected = liftIO $ actual `shouldBe` expected
207 -> (JobHandle MyDummyMonad -> () -> MyDummyMonad ())
208 -> IO (SJ.JobStatus 'SJ.Safe JobLog)
209 withJob env f = runMyDummyMonad env $ MyDummyMonad $
210 -- the job type doesn't matter in our tests, we use a random one, as long as it's of type 'GargJob'.
211 newJob @_ @GargError mkJobHandle (pure env) RecomputeGraphJob (\_ hdl input ->
212 runMyDummyMonad env $ (Right <$> (f hdl input >> getLatestJobStatus hdl))) (SJ.JobInput () Nothing)
215 -> (JobHandle MyDummyMonad -> () -> MyDummyMonad ())
217 withJob_ env f = void (withJob env f)
222 let settings = defaultJobSettings 2 k
223 myEnv <- newJobEnv settings defaultPrios testTlsManager
225 { _env_settings = error "env_settings not needed, but forced somewhere (check StrictData)"
226 , _env_logger = error "env_logger not needed, but forced somewhere (check StrictData)"
227 , _env_pool = error "env_pool not needed, but forced somewhere (check StrictData)"
228 , _env_nodeStory = error "env_nodeStory not needed, but forced somewhere (check StrictData)"
229 , _env_manager = testTlsManager
230 , _env_self_url = error "self_url not needed, but forced somewhere (check StrictData)"
231 , _env_scrapers = error "scrapers not needed, but forced somewhere (check StrictData)"
233 , _env_config = error "config not needed, but forced somewhere (check StrictData)"
234 , _env_mail = error "mail not needed, but forced somewhere (check StrictData)"
235 , _env_nlp = error "nlp not needed, but forced somewhere (check StrictData)"
238 testFetchJobStatus :: IO ()
239 testFetchJobStatus = do
243 withJob_ myEnv $ \hdl _input -> do
244 mb_status <- getLatestJobStatus hdl
246 -- now let's log something
248 mb_status' <- getLatestJobStatus hdl
250 mb_status'' <- getLatestJobStatus hdl
252 liftIO $ modifyMVar_ evts (\xs -> pure $ mb_status : mb_status' : mb_status'' : xs)
257 readMVar evts >>= \expected -> map _scst_remaining expected `shouldBe` [Nothing, Just 10, Just 5]
259 testFetchJobStatusNoContention :: IO ()
260 testFetchJobStatusNoContention = do
266 let job1 = \() -> withJob_ myEnv $ \hdl _input -> do
268 mb_status <- getLatestJobStatus hdl
269 liftIO $ modifyMVar_ evts1 (\xs -> pure $ mb_status : xs)
272 let job2 = \() -> withJob_ myEnv $ \hdl _input -> do
274 mb_status <- getLatestJobStatus hdl
275 liftIO $ modifyMVar_ evts2 (\xs -> pure $ mb_status : xs)
278 Async.forConcurrently_ [job1, job2] ($ ())
281 readMVar evts1 >>= \expected -> map _scst_remaining expected `shouldBe` [Just 100]
282 readMVar evts2 >>= \expected -> map _scst_remaining expected `shouldBe` [Just 50]
284 testMarkProgress :: IO ()
285 testMarkProgress = do
289 withJob_ myEnv $ \hdl _input -> do
291 jl0 <- getLatestJobStatus hdl
293 jl1 <- getLatestJobStatus hdl
294 markFailure 1 Nothing hdl
295 jl2 <- getLatestJobStatus hdl
296 markFailure 1 (Just "boom") hdl
297 jl3 <- getLatestJobStatus hdl
299 jl4 <- getLatestJobStatus hdl
302 jl5 <- getLatestJobStatus hdl
303 markFailed (Just "kaboom") hdl
304 jl6 <- getLatestJobStatus hdl
305 liftIO $ modifyMVar_ evts (const (pure [jl0, jl1, jl2, jl3, jl4, jl5, jl6]))
308 [jl0, jl1, jl2, jl3, jl4, jl5, jl6] <- readMVar evts
310 -- Check the events are what we expect
311 jl0 `shouldBe` JobLog { _scst_succeeded = Just 0
312 , _scst_failed = Just 0
313 , _scst_remaining = Just 10
314 , _scst_events = Just []
316 jl1 `shouldBe` JobLog { _scst_succeeded = Just 1
317 , _scst_failed = Just 0
318 , _scst_remaining = Just 9
319 , _scst_events = Just []
321 jl2 `shouldBe` JobLog { _scst_succeeded = Just 1
322 , _scst_failed = Just 1
323 , _scst_remaining = Just 8
324 , _scst_events = Just []
326 jl3 `shouldBe` JobLog { _scst_succeeded = Just 1
327 , _scst_failed = Just 2
328 , _scst_remaining = Just 7
329 , _scst_events = Just [
330 ScraperEvent { _scev_message = Just "boom"
331 , _scev_level = Just "ERROR"
332 , _scev_date = Nothing }
335 jl4 `shouldBe` JobLog { _scst_succeeded = Just 8
336 , _scst_failed = Just 2
337 , _scst_remaining = Just 0
338 , _scst_events = Just [
339 ScraperEvent { _scev_message = Just "boom"
340 , _scev_level = Just "ERROR"
341 , _scev_date = Nothing }
344 jl5 `shouldBe` JobLog { _scst_succeeded = Just 1
345 , _scst_failed = Just 0
346 , _scst_remaining = Just 4
347 , _scst_events = Just []
349 jl6 `shouldBe` JobLog { _scst_succeeded = Just 1
350 , _scst_failed = Just 4
351 , _scst_remaining = Just 0
352 , _scst_events = Just [
353 ScraperEvent { _scev_message = Just "kaboom"
354 , _scev_level = Just "ERROR"
355 , _scev_date = Nothing }
361 describe "job queue" $ do
362 it "respects max runners limit" $
364 it "respects priorities" $
366 it "can handle exceptions" $
368 it "fairly picks equal-priority-but-different-kind jobs" $
370 describe "job status update and tracking" $ do
371 it "can fetch the latest job status" $
373 it "can spin two separate jobs and track their status separately" $
374 testFetchJobStatusNoContention
375 it "marking stuff behaves as expected" $