1 {-# LANGUAGE ScopedTypeVariables #-}
4 import Control.Concurrent
5 import Control.Concurrent.Async
6 import Control.Concurrent.STM
13 import Gargantext.Utils.Jobs
14 import Gargantext.Utils.Jobs.Map
15 import Gargantext.Utils.Jobs.Monad
16 import Gargantext.Utils.Jobs.Queue (applyPrios, defaultPrios)
17 import Gargantext.Utils.Jobs.Settings
18 import Gargantext.Utils.Jobs.State
-- | Kinds of job submitted to the queue by the tests in this file.
data JobT
  = A
  | B
  deriving (Eq, Ord, Show, Enum, Bounded)
-- | Per-job-type tally; 'inc' and 'dec' move the field matching a 'JobT'.
data Counts = Counts
  { countAs :: Int -- ^ tally for jobs of type 'A'
  , countBs :: Int -- ^ tally for jobs of type 'B'
  }
-- | Add one to the tally that corresponds to the given job type.
inc :: JobT -> Counts -> Counts
inc jobType tally = case jobType of
  A -> tally { countAs = countAs tally + 1 }
  B -> tally { countBs = countBs tally + 1 }

-- | Subtract one from the tally that corresponds to the given job type.
dec :: JobT -> Counts -> Counts
dec jobType tally = case jobType of
  A -> tally { countAs = countAs tally - 1 }
  B -> tally { countBs = countBs tally - 1 }
32 -- max runners = 2 with default settings
-- Scenario: four 5-second jobs of type A are queued; the expectations below
-- check that only two run at a time and that they start in submission order.
-- NOTE(review): every expectation relies on wall-clock sleeps, so this can
-- fail spuriously on a heavily loaded machine.
34 let settings = defaultJobSettings k
35 st :: JobsState JobT [String] () <- newJobsState settings defaultPrios
36 runningJs <- newTVarIO [] -- labels ("Job #n") of the jobs executing right now
-- Job action: record this job's label on start, sleep 5s, then remove the label.
-- NOTE(review): modifyTVar is lazy; modifyTVar' would avoid leaving thunks in the TVar.
38 atomically $ modifyTVar runningJs (\xs -> ("Job #" ++ show num) : xs)
39 -- putStrLn $ "Job #" ++ show num ++ " started"
40 threadDelay (5 * 1000000) -- 5s
41 -- putStrLn $ "Job #" ++ show num ++ " done"
42 atomically $ modifyTVar runningJs (\xs -> filter (/=("Job #" ++ show num)) xs)
43 jobs = [ (n, j n) | n <- [1..4] ]
-- Submit all four jobs with job type A.
44 jids <- forM jobs $ \(i, f) -> do
45 -- putStrLn ("Submitting job #" ++ show i)
46 pushJob A () f settings st
47 threadDelay 10000 -- 10ms
-- First snapshot: only the first two jobs should be running.
48 r1 <- readTVarIO runningJs
49 -- putStrLn ("Jobs running: " ++ show r1)
50 sort r1 `shouldBe` ["Job #1", "Job #2"]
-- 6s later the first pair (5s each) has finished and the second pair has taken over.
51 threadDelay (6 * 1000000) -- 6s
52 r2 <- readTVarIO runningJs
53 sort r2 `shouldBe` ["Job #3", "Job #4"]
-- Final snapshot, taken after the second pair has had time to finish.
54 threadDelay (5 * 1000000) -- 5s
55 r3 <- readTVarIO runningJs
-- Scenario: jobs of both types are queued with B prioritised; the expectations
-- below check that the B jobs occupy the runners first, then the A jobs.
-- NOTE(review): every expectation relies on wall-clock sleeps, so this can
-- fail spuriously on a heavily loaded machine.
60 let settings = defaultJobSettings k
61 st :: JobsState JobT [String] () <- newJobsState settings $
62 applyPrios [(B, 10)] defaultPrios -- B has higher priority
63 runningJs <- newTVarIO (Counts 0 0) -- jobs currently executing, tallied per job type
-- Job action: bump the tally for its type on start, sleep 5s, then decrement it.
-- NOTE(review): modifyTVar is lazy; modifyTVar' would avoid leaving thunks in the TVar.
64 let j num jobt _inp l = do
65 atomically $ modifyTVar runningJs (inc jobt)
66 -- putStrLn $ "Job #" ++ show num ++ " started"
67 threadDelay (5 * 1000000) -- 5s
68 -- putStrLn $ "Job #" ++ show num ++ " done"
69 atomically $ modifyTVar runningJs (dec jobt)
-- Each entry is (job number, job type, job action).
70 jobs = [ (0, A, j 0 A)
-- Submit every job under its own job type.
75 jids <- forM jobs $ \(i, t, f) -> do
76 -- putStrLn ("Submitting job #" ++ show i)
77 pushJob t () f settings st
78 threadDelay 10000 -- 10ms
-- First snapshot: both running jobs should be of type B.
79 r1 <- readTVarIO runningJs
80 r1 `shouldBe` (Counts 0 2)
-- 6s later the B jobs (5s each) have finished and two A jobs are running.
81 threadDelay (6 * 1000000) -- 6s
82 r2 <- readTVarIO runningJs
83 r2 `shouldBe` (Counts 2 0)
-- After another 5s nothing should be running any more.
84 threadDelay (5 * 1000000) -- 5s
85 r3 <- readTVarIO runningJs
86 r3 `shouldBe` (Counts 0 0)
89 -- max runners = 2 with default settings
-- Scenario: a job whose action fails; the stored job entry is then inspected
-- and its result is expected to be a Left.
91 let settings = defaultJobSettings k
92 st :: JobsState JobT [String] () <- newJobsState settings defaultPrios
-- Job action: read a file and print its contents.
-- NOTE(review): presumably "/doesntexist.txt" is absent so readFile throws — confirm.
94 (\_inp _log -> readFile "/doesntexist.txt" >>= putStrLn)
-- Look the job up by id in the state's job map.
97 mjob <- lookupJob jid (jobsData st)
-- NOTE(review): error "boo" gives no context if the job is missing; a descriptive message would help.
99 Nothing -> error "boo"
100 Just je -> case jTask je of
-- DoneJ is presumably the finished state; its result must be a Left — confirm against the jobs library.
101 DoneJ _ r -> isLeft r `shouldBe` True
-- Test group for the job queue: one case per behaviour named below.
108 describe "job queue" $ do
109 it "respects max runners limit" $
111 it "respects priorities" $
113 it "can handle exceptions" $