1 {-# LANGUAGE DeriveGeneric #-}
2 {-# LANGUAGE ScopedTypeVariables #-}
3 {-# LANGUAGE TypeApplications #-}
4 {-# LANGUAGE TypeFamilies #-}
5 {-# LANGUAGE NumericUnderscores #-}
8 import Control.Concurrent
9 import qualified Control.Concurrent.Async as Async
10 import Control.Concurrent.STM
11 import Control.Exception
13 import Control.Monad.Reader
17 import Data.Sequence (Seq)
21 import System.IO.Unsafe
22 import Network.HTTP.Client.TLS (newTlsManager)
23 import Network.HTTP.Client (Manager)
25 import qualified Servant.Job.Types as SJ
26 import qualified Servant.Job.Core as SJ
28 import Gargantext.Utils.Jobs.Internal (newJob)
29 import Gargantext.Utils.Jobs.Map
30 import Gargantext.Utils.Jobs.Monad hiding (withJob)
31 import Gargantext.Utils.Jobs.Queue (applyPrios, defaultPrios)
32 import Gargantext.Utils.Jobs.State
-- | Job kinds used by the queue tests. Queue priorities and the per-kind
-- running counters are both keyed on this type.
data JobT
  = A
  | B
  deriving (Eq, Ord, Show, Enum, Bounded)
-- | Number of jobs of each kind that are running at a given moment.
data Counts = Counts
  { countAs :: Int -- ^ running jobs of kind 'A'
  , countBs :: Int -- ^ running jobs of kind 'B'
  }
-- | Raise ('inc') or lower ('dec') by one the counter that matches the given
-- job kind; the counter of the other kind is left untouched.
inc, dec :: JobT -> Counts -> Counts
inc kind counts = case kind of
  A -> counts { countAs = countAs counts + 1 }
  B -> counts { countBs = countBs counts + 1 }
dec kind counts = case kind of
  A -> counts { countAs = countAs counts - 1 }
  B -> counts { countBs = countBs counts - 1 }
-- | Timing constants shared by the queue tests. Both are handed to
-- 'threadDelay', so they are expressed in microseconds: @jobDuration@ is how
-- long each dummy job sleeps, @initialDelay@ is the pause between pushing the
-- jobs and the first inspection of what is running. Their values are defined
-- outside the lines visible here.
45 jobDuration, initialDelay :: Int
-- | Pushes four jobs of kind 'A' onto a queue built with
-- @defaultJobSettings 2@ and samples the shared list of running job names at
-- fixed intervals: jobs #1 and #2 are expected first, jobs #3 and #4 one
-- 'jobDuration' later.
--
-- NOTE(review): the checks are purely time-based, so they presumably rely on
-- 'initialDelay' being much shorter than 'jobDuration' — confirm against the
-- constants' definitions. @k@ is bound on a line not visible here.
49 testMaxRunners :: IO ()
51 -- max runners = 2 with default settings
53 let settings = defaultJobSettings 2 k
54 st :: JobsState JobT [String] () <- newJobsState settings defaultPrios
-- Names of the jobs currently executing, shared between the jobs and the test.
55 runningJs <- newTVarIO []
-- Each job registers its name, sleeps for 'jobDuration', then removes its name.
-- NOTE(review): 'modifyTVar' is the lazy variant; consider 'modifyTVar''.
56 let j num _jHandle _inp _l = do
57 atomically $ modifyTVar runningJs (\xs -> ("Job #" ++ show num) : xs)
58 threadDelay jobDuration
59 atomically $ modifyTVar runningJs (\xs -> filter (/=("Job #" ++ show num)) xs)
60 jobs = [ j n | n <- [1..4::Int] ]
-- All four jobs are queued with kind 'A'; the returned ids are not used.
61 _jids <- forM jobs $ \f -> pushJob A () f settings st
62 threadDelay initialDelay
-- First sample: the list is sorted before comparing, so insertion order
-- between the two running jobs does not matter.
63 r1 <- readTVarIO runningJs
64 sort r1 `shouldBe` ["Job #1", "Job #2"]
65 threadDelay jobDuration
-- Second sample, one job duration later.
66 r2 <- readTVarIO runningJs
67 sort r2 `shouldBe` ["Job #3", "Job #4"]
68 threadDelay jobDuration
-- Third sample; the assertion on it is not among the visible lines.
69 r3 <- readTVarIO runningJs
-- Body of the priority test; its signature, header and the definition of
-- @jobs@ are outside the visible lines. The queue is created with kind 'B'
-- given priority 10 on top of the default priorities, and the running counters
-- are sampled three times: only Bs first, then only As, then nothing.
75 let settings = defaultJobSettings 2 k
76 st :: JobsState JobT [String] () <- newJobsState settings $
77 applyPrios [(B, 10)] defaultPrios -- B has higher priority
-- Per-kind count of jobs currently executing.
78 runningJs <- newTVarIO (Counts 0 0)
-- Each job bumps the counter for its kind, sleeps, then decrements it again.
79 let j jobt _jHandle _inp _l = do
80 atomically $ modifyTVar runningJs (inc jobt)
81 threadDelay jobDuration
82 atomically $ modifyTVar runningJs (dec jobt)
-- @jobs@ is a list of (kind, job) pairs defined on lines not visible here.
88 _jids <- forM jobs $ \(t, f) -> do
89 pushJob t () f settings st
-- This test waits twice the initial delay before the first sample.
90 threadDelay (2*initialDelay)
91 r1 <- readTVarIO runningJs
92 r1 `shouldBe` (Counts 0 2)
93 threadDelay jobDuration
94 r2 <- readTVarIO runningJs
95 r2 `shouldBe` (Counts 2 0)
96 threadDelay jobDuration
97 r3 <- readTVarIO runningJs
98 r3 `shouldBe` (Counts 0 0)
-- | Runs a job that tries to read a file that does not exist and, after
-- 'initialDelay', looks the job up again and expects its recorded result to be
-- a 'Left'. Several lines of the definition (header, the push of the job and
-- the @case@ on @mjob@) are not visible here.
100 testExceptions :: IO ()
103 let settings = defaultJobSettings 2 k
104 st :: JobsState JobT [String] () <- newJobsState settings defaultPrios
-- The job body: reading the missing file is what raises the exception.
106 (\_jHandle _inp _log -> readFile "/doesntexist.txt" >>= putStrLn)
108 threadDelay initialDelay
109 mjob <- lookupJob jid (jobsData st)
-- A missing job entry aborts the test outright.
111 Nothing -> error "boo"
112 Just je -> case jTask je of
-- A finished job must carry a 'Left' result.
113 DoneJ _ r -> isLeft r `shouldBe` True
-- | Checks the order in which jobs of different kinds but equal (default)
-- priority are picked: the running counters are sampled four times, one
-- 'jobDuration' apart, and compared against the expected mix of As and Bs.
-- The header and the definition of @jobs@ are outside the visible lines.
117 testFairness :: IO ()
120 let settings = defaultJobSettings 2 k
121 st :: JobsState JobT [String] () <- newJobsState settings defaultPrios
-- Per-kind count of jobs currently executing.
122 runningJs <- newTVarIO (Counts 0 0)
-- Each job bumps the counter for its kind, sleeps, then decrements it again.
123 let j jobt _jHandle _inp _l = do
124 atomically $ modifyTVar runningJs (inc jobt)
125 threadDelay jobDuration
126 atomically $ modifyTVar runningJs (dec jobt)
-- @jobs@ is a list of (kind, job) pairs defined on lines not visible here.
133 _jids <- forM jobs $ \(t, f) -> do
134 pushJob t () f settings st
135 threadDelay initialDelay
-- First sample: two As running.
136 r1 <- readTVarIO runningJs
137 r1 `shouldBe` (Counts 2 0)
138 threadDelay jobDuration
139 r2 <- readTVarIO runningJs
140 r2 `shouldBe` (Counts 1 1) -- MOST IMPORTANT CHECK: the B got picked after the
141 -- two As, because it was inserted right after them
142 -- and has equal priority.
143 threadDelay jobDuration
-- Third sample: a single A left running.
144 r3 <- readTVarIO runningJs
145 r3 `shouldBe` (Counts 1 0)
146 threadDelay jobDuration
-- Final sample: nothing is running any more.
147 r4 <- readTVarIO runningJs
148 r4 `shouldBe` (Counts 0 0)
-- Deriving clause of a data declaration whose header is not visible here
-- (presumably the job-type enumeration used by the status tests — confirm).
152 deriving (Show, Eq, Ord, Enum, Bounded)
-- Constructor of the test error type (matched as a 'MyDummyError' in its
-- ToJSON instance); it wraps a 'JobError'. The declaration header is not
-- visible here.
155 = SomethingWentWrong JobError
-- | Every 'MyDummyError' is turned into the same
-- @userError "SomethingWentWrong"@ exception; the wrapped value is discarded
-- and cannot be recovered from the resulting 'SomeException'.
instance Exception MyDummyError where
  toException = const asUserError
    where
      asUserError = toException (userError "SomethingWentWrong")
-- | Serialises every 'MyDummyError' as the JSON string
-- @"SomethingWentWrong"@, dropping the wrapped payload.
instance ToJSON MyDummyError where
  toJSON err = case err of
    SomethingWentWrong _ -> String "SomethingWentWrong"
-- Deriving clause of a data declaration whose header is not visible here
-- (presumably 'MyDummyLog', whose instances follow — confirm).
169 deriving (Show, Eq, Ord, Generic)
-- Monoid instance for the test log type; its method definitions are on lines
-- not visible here.
171 instance Monoid MyDummyLog where
-- | Placeholder instance, required as the superclass of the 'Monoid'
-- instance. Combining two logs is not supported: '<>' raises an error as soon
-- as its result is evaluated.
instance Semigroup MyDummyLog where
  (<>) _ _ = error "not needed"
-- JSON encoding for 'MyDummyLog'. No methods are given, so the class defaults
-- apply (presumably the Generic-based ones — confirm the type derives Generic).
177 instance ToJSON MyDummyLog
-- | Reader environment of 'MyDummyMonad': a thin wrapper around the job
-- environment specialised to the test job, log and output types.
newtype MyDummyEnv = MyDummyEnv
  { _MyDummyEnv :: JobEnv MyDummyJob (Seq MyDummyLog) ()
  }
-- | Monad the test jobs run in: 'IO' with read-only access to a
-- 'MyDummyEnv'.
newtype MyDummyMonad a = MyDummyMonad
  { _MyDummyMonad :: ReaderT MyDummyEnv IO a
  }
  deriving
    ( Functor
    , Applicative
    , Monad
    , MonadIO
    , MonadReader MyDummyEnv
    )
-- | Run a 'MyDummyMonad' action in 'IO' against the supplied environment.
runMyDummyMonad :: MyDummyEnv -> MyDummyMonad a -> IO a
runMyDummyMonad env action = runReaderT (_MyDummyMonad action) env
-- | The job environment is the one carried in the reader context.
instance MonadJob MyDummyMonad MyDummyJob (Seq MyDummyLog) () where
  getJobEnv = _MyDummyEnv <$> ask
-- | Fixes the job, output and event types associated with 'MyDummyMonad'.
instance MonadJobStatus MyDummyMonad where
  type JobType       MyDummyMonad = MyDummyJob
  type JobOutputType MyDummyMonad = ()
  type JobEventType  MyDummyMonad = MyDummyLog
{-# NOINLINE testTlsManager #-}
-- | HTTP manager (created with 'newTlsManager') shared by the tests.
--
-- It is a top-level value built with 'unsafePerformIO'; the NOINLINE pragma
-- keeps GHC from inlining the binding, so the manager is not re-created at
-- each use site.
testTlsManager :: Manager
testTlsManager = unsafePerformIO newTlsManager
-- | 'shouldBe' lifted into any 'MonadIO', so the expectation can be written
-- directly inside a job's monad. The first argument is compared against the
-- second exactly as 'shouldBe' does.
shouldBeE :: (MonadIO m, HasCallStack, Show a, Eq a) => a -> a -> m ()
shouldBeE actual expected = liftIO $ actual `shouldBe` expected
-- | Shorthand for the concrete job environment used by the status tests:
-- jobs of type 'MyDummyJob', a sequence of 'MyDummyLog' events and unit output.
203 type TheEnv = JobEnv MyDummyJob (Seq MyDummyLog) ()
-- Starts a job through 'newJob' in 'MyDummyMonad', using the given
-- environment and a unit input (@SJ.JobInput () Nothing@). The callback @f@
-- receives the environment, the job handle and the input, and is itself run
-- in 'MyDummyMonad' under the same environment. The result is whatever job
-- status 'newJob' returns. The first line of the signature is not visible here.
206 -> (TheEnv -> JobHandle MyDummyMonad MyDummyLog -> () -> MyDummyMonad (Either MyDummyError ()))
207 -> IO (SJ.JobStatus 'SJ.Safe MyDummyLog)
208 withJob myEnv f = runMyDummyMonad (MyDummyEnv myEnv) $
-- The type applications leave the first type argument to inference and fix
-- the second to 'MyDummyError'.
209 newJob @_ @MyDummyError getJobEnv MyDummyJob (\env hdl input ->
210 runMyDummyMonad (MyDummyEnv myEnv) $ f env hdl input) (SJ.JobInput () Nothing)
-- Variant of 'withJob' that discards the returned job status. Parts of the
-- signature are on lines not visible here.
213 -> (TheEnv -> JobHandle MyDummyMonad MyDummyLog -> () -> MyDummyMonad (Either MyDummyError ()))
215 withJob_ env f = void (withJob env f)
-- | Runs a single job that reads its latest status three times — before any
-- progress update, after setting it to @Step_0 20@ and after adding 5 — and
-- checks the recorded sequence. The creation of @evts@, the binding of @k@
-- and a few other lines are not visible here.
217 testFetchJobStatus :: IO ()
218 testFetchJobStatus = do
220 let settings = defaultJobSettings 2 k
221 myEnv <- newJobEnv settings defaultPrios testTlsManager
224 withJob_ myEnv $ \_ hdl _input -> do
-- Status before anything has been logged.
225 mb_status <- getLatestJobStatus hdl
227 -- now let's log something
228 updateJobProgress hdl (const $ Step_0 20)
229 mb_status' <- getLatestJobStatus hdl
-- NOTE(review): the lambda only matches 'Step_0'; it would fail on any other
-- constructor (a 'Step_1' exists), which cannot happen after the update above.
230 updateJobProgress hdl (\(Step_0 x) -> Step_0 (x + 5))
231 mb_status'' <- getLatestJobStatus hdl
-- The three observations are prepended, in chronological order, to @evts@.
233 liftIO $ modifyMVar_ evts (\xs -> pure $ mb_status : mb_status' : mb_status'' : xs)
-- Despite its name, @expected@ holds the observed statuses read from @evts@.
238 readMVar evts >>= \expected -> expected `shouldBe` [Nothing, Just (Step_0 20), Just (Step_0 25)]
-- | Runs two jobs concurrently against the same environment; each sets its
-- own progress and records the latest status it reads back into its own
-- 'MVar'. The test then expects each job to have seen only its own update.
-- The creation of @evts1@/@evts2@ and the binding of @k@ are not visible here.
240 testFetchJobStatusNoContention :: IO ()
241 testFetchJobStatusNoContention = do
243 let settings = defaultJobSettings 2 k
244 myEnv <- newJobEnv settings defaultPrios testTlsManager
-- First job: reports @Step_1 100@ and stores what it reads back in @evts1@.
249 let job1 = \() -> withJob_ myEnv $ \_ hdl _input -> do
250 updateJobProgress hdl (const $ Step_1 100)
251 mb_status <- getLatestJobStatus hdl
252 liftIO $ modifyMVar_ evts1 (\xs -> pure $ mb_status : xs)
-- Second job: reports @Step_0 50@ and stores what it reads back in @evts2@.
255 let job2 = \() -> withJob_ myEnv $ \_ hdl _input -> do
256 updateJobProgress hdl (const $ Step_0 50)
257 mb_status <- getLatestJobStatus hdl
258 liftIO $ modifyMVar_ evts2 (\xs -> pure $ mb_status : xs)
-- Both jobs are started concurrently by applying each to @()@.
261 Async.forConcurrently_ [job1, job2] ($ ())
-- Despite the name, @expected@ holds the observed statuses.
264 readMVar evts1 >>= \expected -> expected `shouldBe` [Just (Step_1 100)]
265 readMVar evts2 >>= \expected -> expected `shouldBe` [Just (Step_0 50)]
-- Spec tree wiring the tests into two groups: queue behaviour and job status
-- tracking. The enclosing definition's header and the bodies of most @it@
-- entries are on lines not visible here.
269 describe "job queue" $ do
270 it "respects max runners limit" $
272 it "respects priorities" $
274 it "can handle exceptions" $
276 it "fairly picks equal-priority-but-different-kind jobs" $
278 describe "job status update and tracking" $ do
279 it "can fetch the latest job status" $
281 it "can spin two separate jobs and track their status separately" $
282 testFetchJobStatusNoContention