1 {-# LANGUAGE DeriveGeneric #-}
2 {-# LANGUAGE ScopedTypeVariables #-}
3 {-# LANGUAGE TypeApplications #-}
4 {-# LANGUAGE TypeFamilies #-}
5 {-# LANGUAGE NumericUnderscores #-}
6 module Utils.Jobs (test) where
8 import Control.Concurrent
9 import qualified Control.Concurrent.Async as Async
10 import Control.Concurrent.STM
11 import Control.Exception
13 import Control.Monad.Reader
14 import Control.Monad.Except
18 import Data.Sequence (Seq, (|>), fromList)
22 import System.IO.Unsafe
23 import Network.HTTP.Client.TLS (newTlsManager)
24 import Network.HTTP.Client (Manager)
26 import qualified Servant.Job.Types as SJ
27 import qualified Servant.Job.Core as SJ
29 import Gargantext.Utils.Jobs.Internal (newJob)
30 import Gargantext.Utils.Jobs.Map
31 import Gargantext.Utils.Jobs.Monad hiding (withJob)
32 import Gargantext.Utils.Jobs.Queue (applyPrios, defaultPrios)
33 import Gargantext.Utils.Jobs.State
34 import Gargantext.API.Prelude
35 import Gargantext.API.Admin.EnvTypes as EnvTypes
36 import Gargantext.API.Admin.Orchestrator.Types
43 deriving (Eq, Ord, Show, Enum, Bounded)
-- | The job kinds in the order in which the runners actually executed them.
-- Jobs append their own kind to it while running, and tests then compare
-- the accumulated sequence against the ordering they expect.
newtype JobSchedule = JobSchedule
  { _JobSchedule :: Seq JobT
  } deriving (Show, Eq)
48 addJobToSchedule :: JobT -> MVar JobSchedule -> IO ()
49 addJobToSchedule jobt mvar = do
50 modifyMVar_ mvar $ \js -> do
51 let js' = js { _JobSchedule = _JobSchedule js |> jobt }
-- | Two plain 'Int' counters. Presumably one counts 'A' jobs and one counts
-- 'B' jobs; TODO confirm, because its usage is not visible in this section.
data Counts = Counts
  { countAs :: Int
  , countBs :: Int
  }
57 jobDuration, initialDelay :: Int
61 testMaxRunners :: IO ()
63 -- max runners = 2 with default settings
65 let settings = defaultJobSettings 2 k
66 st :: JobsState JobT [String] () <- newJobsState settings defaultPrios
67 runningJs <- newTVarIO []
68 let j num _jHandle _inp _l = do
69 atomically $ modifyTVar runningJs (\xs -> ("Job #" ++ show num) : xs)
70 threadDelay jobDuration
71 atomically $ modifyTVar runningJs (\xs -> filter (/=("Job #" ++ show num)) xs)
72 jobs = [ j n | n <- [1..4::Int] ]
73 _jids <- forM jobs $ \f -> pushJob A () f settings st
74 threadDelay initialDelay
75 r1 <- readTVarIO runningJs
76 sort r1 `shouldBe` ["Job #1", "Job #2"]
77 threadDelay jobDuration
78 r2 <- readTVarIO runningJs
79 sort r2 `shouldBe` ["Job #3", "Job #4"]
80 threadDelay jobDuration
81 r3 <- readTVarIO runningJs
87 -- Use a single runner, so that we can check the order of execution
88 -- without worrying about the runners competing with each other.
89 let settings = defaultJobSettings 1 k
90 prios = [(B, 10), (C, 1), (D, 5)]
91 st :: JobsState JobT [String] () <- newJobsState settings $
92 applyPrios prios defaultPrios -- B has the highest priority
93 pickedSchedule <- newMVar (JobSchedule mempty)
94 let j jobt _jHandle _inp _l = addJobToSchedule jobt pickedSchedule
101 -- Push all the jobs in the same STM transaction, so that they are all stored in the queue by
102 -- the time 'popQueue' gets called.
103 now <- getCurrentTime
104 atomically $ forM_ jobs $ \(t, f) -> void $ pushJobWithTime now t () f settings st
106 -- wait for the jobs to finish, waiting for more than the total duration,
107 -- so that we are sure that all jobs have finished, then check the schedule.
108 threadDelay jobDuration
109 finalSchedule <- readMVar pickedSchedule
110 finalSchedule `shouldBe` JobSchedule (fromList [B, D, C, A])
112 testExceptions :: IO ()
115 let settings = defaultJobSettings 2 k
116 st :: JobsState JobT [String] () <- newJobsState settings defaultPrios
118 (\_jHandle _inp _log -> readFile "/doesntexist.txt" >>= putStrLn)
120 threadDelay initialDelay
121 mjob <- lookupJob jid (jobsData st)
123 Nothing -> error "boo"
124 Just je -> case jTask je of
125 DoneJ _ r -> isLeft r `shouldBe` True
129 testFairness :: IO ()
132 let settings = defaultJobSettings 1 k
133 st :: JobsState JobT [String] () <- newJobsState settings defaultPrios
134 pickedSchedule <- newMVar (JobSchedule mempty)
135 let j jobt _jHandle _inp _l = addJobToSchedule jobt pickedSchedule
142 time <- getCurrentTime
  -- In this scenario we simulate two types of jobs, all with
  -- the same level of priority: our queue implementation
  -- will behave as a classic FIFO, taking into account the
147 atomically $ forM_ (zip [0,2 ..] jobs) $ \(timeDelta, (t, f)) -> void $
148 pushJobWithTime (addUTCTime (fromInteger timeDelta) time) t () f settings st
150 threadDelay jobDuration
151 finalSchedule <- readMVar pickedSchedule
152 finalSchedule `shouldBe` JobSchedule (fromList [A, A, B, A, A])
-- | Test-only wrapper around @'GargM' 'Env' 'GargError'@. Every class below
-- is obtained from the wrapped monad, either by deriving or by
-- re-wrapping the underlying operation.
newtype MyDummyMonad a = MyDummyMonad
  { _MyDummyMonad :: GargM Env GargError a
  } deriving (Functor, Applicative, Monad, MonadReader Env, MonadIO)

-- | Job environment access, delegated to the wrapped monad.
instance MonadJob MyDummyMonad GargJob (Seq JobLog) JobLog where
  getJobEnv = MyDummyMonad getJobEnv

-- | Job status tracking for the test monad.
instance MonadJobStatus MyDummyMonad where
  type JobHandle     MyDummyMonad = EnvTypes.ConcreteJobHandle GargError
  type JobType       MyDummyMonad = GargJob
  type JobOutputType MyDummyMonad = JobLog
  type JobEventType  MyDummyMonad = JobLog

  -- Each status operation runs the 'GargM' version and re-wraps the result.
  getLatestJobStatus    = MyDummyMonad . getLatestJobStatus
  markStarted total     = MyDummyMonad . markStarted total
  markProgress steps    = MyDummyMonad . markProgress steps
  markFailure steps msg = MyDummyMonad . markFailure steps msg
  markComplete          = MyDummyMonad . markComplete
  markFailed msg        = MyDummyMonad . markFailed msg

  -- The tracer argument is ignored; the action just receives the handle.
  withTracer _ hdl action = action hdl
176 runMyDummyMonad :: Env -> MyDummyMonad a -> IO a
177 runMyDummyMonad env m = do
178 res <- runExceptT . flip runReaderT env $ _MyDummyMonad m
{-# NOINLINE testTlsManager #-}
-- | A single TLS 'Manager' shared by the tests that need one.
--
-- The manager is created through 'unsafePerformIO' at top level. The
-- NOINLINE pragma stops GHC from inlining the expression at use sites,
-- which could otherwise create more than one manager.
testTlsManager :: Manager
testTlsManager = unsafePerformIO newTlsManager
188 -> (JobHandle MyDummyMonad -> () -> MyDummyMonad ())
189 -> IO (SJ.JobStatus 'SJ.Safe JobLog)
190 withJob env f = runMyDummyMonad env $ MyDummyMonad $
  -- The job type doesn't matter in our tests; we use an arbitrary one, as long as it's of type 'GargJob'.
192 newJob @_ @GargError mkJobHandle (pure env) RecomputeGraphJob (\_ hdl input ->
193 runMyDummyMonad env $ (Right <$> (f hdl input >> getLatestJobStatus hdl))) (SJ.JobInput () Nothing)
196 -> (JobHandle MyDummyMonad -> () -> MyDummyMonad ())
198 withJob_ env f = void (withJob env f)
203 let settings = defaultJobSettings 1 k
204 myEnv <- newJobEnv settings defaultPrios testTlsManager
206 { _env_settings = error "env_settings not needed, but forced somewhere (check StrictData)"
207 , _env_logger = error "env_logger not needed, but forced somewhere (check StrictData)"
208 , _env_pool = error "env_pool not needed, but forced somewhere (check StrictData)"
209 , _env_nodeStory = error "env_nodeStory not needed, but forced somewhere (check StrictData)"
210 , _env_manager = testTlsManager
211 , _env_self_url = error "self_url not needed, but forced somewhere (check StrictData)"
212 , _env_scrapers = error "scrapers not needed, but forced somewhere (check StrictData)"
214 , _env_config = error "config not needed, but forced somewhere (check StrictData)"
215 , _env_mail = error "mail not needed, but forced somewhere (check StrictData)"
216 , _env_nlp = error "nlp not needed, but forced somewhere (check StrictData)"
219 testFetchJobStatus :: IO ()
220 testFetchJobStatus = do
224 withJob_ myEnv $ \hdl _input -> do
225 mb_status <- getLatestJobStatus hdl
227 -- now let's log something
229 mb_status' <- getLatestJobStatus hdl
231 mb_status'' <- getLatestJobStatus hdl
233 liftIO $ modifyMVar_ evts (\xs -> pure $ mb_status : mb_status' : mb_status'' : xs)
238 readMVar evts >>= \expected -> map _scst_remaining expected `shouldBe` [Nothing, Just 10, Just 5]
240 testFetchJobStatusNoContention :: IO ()
241 testFetchJobStatusNoContention = do
247 let job1 = \() -> withJob_ myEnv $ \hdl _input -> do
249 mb_status <- getLatestJobStatus hdl
250 liftIO $ modifyMVar_ evts1 (\xs -> pure $ mb_status : xs)
253 let job2 = \() -> withJob_ myEnv $ \hdl _input -> do
255 mb_status <- getLatestJobStatus hdl
256 liftIO $ modifyMVar_ evts2 (\xs -> pure $ mb_status : xs)
259 Async.forConcurrently_ [job1, job2] ($ ())
262 readMVar evts1 >>= \expected -> map _scst_remaining expected `shouldBe` [Just 100]
263 readMVar evts2 >>= \expected -> map _scst_remaining expected `shouldBe` [Just 50]
265 testMarkProgress :: IO ()
266 testMarkProgress = do
268 evts <- newTBQueueIO 7
269 let getStatus hdl = do
270 liftIO $ threadDelay 100_000
271 st <- getLatestJobStatus hdl
272 liftIO $ atomically $ writeTBQueue evts st
274 allEventsArrived <- isFullTBQueue evts
275 if allEventsArrived then flushTBQueue evts else retry
277 withJob_ myEnv $ \hdl _input -> do
284 markFailure 1 Nothing hdl
287 markFailure 1 (Just "boom") hdl
297 markFailed (Just "kaboom") hdl
301 [jl0, jl1, jl2, jl3, jl4, jl5, jl6] <- atomically readAllEvents
303 -- Check the events are what we expect
304 jl0 `shouldBe` JobLog { _scst_succeeded = Just 0
305 , _scst_failed = Just 0
306 , _scst_remaining = Just 10
307 , _scst_events = Just []
309 jl1 `shouldBe` JobLog { _scst_succeeded = Just 1
310 , _scst_failed = Just 0
311 , _scst_remaining = Just 9
312 , _scst_events = Just []
314 jl2 `shouldBe` JobLog { _scst_succeeded = Just 1
315 , _scst_failed = Just 1
316 , _scst_remaining = Just 8
317 , _scst_events = Just []
319 jl3 `shouldBe` JobLog { _scst_succeeded = Just 1
320 , _scst_failed = Just 2
321 , _scst_remaining = Just 7
322 , _scst_events = Just [
323 ScraperEvent { _scev_message = Just "boom"
324 , _scev_level = Just "ERROR"
325 , _scev_date = Nothing }
328 jl4 `shouldBe` JobLog { _scst_succeeded = Just 8
329 , _scst_failed = Just 2
330 , _scst_remaining = Just 0
331 , _scst_events = Just [
332 ScraperEvent { _scev_message = Just "boom"
333 , _scev_level = Just "ERROR"
334 , _scev_date = Nothing }
337 jl5 `shouldBe` JobLog { _scst_succeeded = Just 1
338 , _scst_failed = Just 0
339 , _scst_remaining = Just 4
340 , _scst_events = Just []
342 jl6 `shouldBe` JobLog { _scst_succeeded = Just 1
343 , _scst_failed = Just 4
344 , _scst_remaining = Just 0
345 , _scst_events = Just [
346 ScraperEvent { _scev_message = Just "kaboom"
347 , _scev_level = Just "ERROR"
348 , _scev_date = Nothing }
354 describe "job queue" $ do
355 it "respects max runners limit" $
357 it "respects priorities" $
359 it "can handle exceptions" $
361 it "fairly picks equal-priority-but-different-kind jobs" $
363 describe "job status update and tracking" $ do
364 it "can fetch the latest job status" $
366 it "can spin two separate jobs and track their status separately" $
367 testFetchJobStatusNoContention
368 it "marking stuff behaves as expected" $