]> Git — Sourcephile - gargantext.git/blob - src/Gargantext/Utils/Jobs/Queue.hs
[FIX] numRunners to 1 by default for now
[gargantext.git] / src / Gargantext / Utils / Jobs / Queue.hs
1 {-# LANGUAGE ConstraintKinds #-}
2 module Gargantext.Utils.Jobs.Queue where
3
4 import Control.Concurrent
5 import Control.Concurrent.STM
6 import Control.Exception
7 import Data.List
8 import Data.Ord
9 import Data.Maybe
10 import Prelude
11 import System.IO
12
13 import qualified Data.Map as Map
14 import qualified Data.Vector as Vector
15
16 type EnumBounded t = (Ord t, Enum t, Bounded t)
17
18 data Q a = Q [a] [a] !Int
19
20 emptyQ :: Q a
21 emptyQ = Q [] [] 0
22
23 singletonQ :: a -> Q a
24 singletonQ a = Q [a] [] 1
25
26 snocQ :: a -> Q a -> Q a
27 snocQ a (Q xs ys sz) = Q xs (a:ys) (sz+1)
28
29 normalizeQ :: Q a -> Q a
30 normalizeQ (Q [] ys sz) = Q (reverse ys) [] sz
31 normalizeQ q = q
32
33 deleteQ :: Eq a => a -> Q a -> Q a
34 deleteQ x (Q xs ys sz) = Q xs' ys' sz'
35 where (xs_num_x, xs') = go xs (0, [])
36 (ys_num_x, ys') = go ys (0, [])
37 sz' = sz - xs_num_x - ys_num_x
38
39 go [] (n, bs) = (n, reverse bs)
40 go (a:as) (n, bs)
41 | a == x = go as (n+1, bs)
42 | otherwise = go as (n, a:bs)
43
44 popQ :: Q a -> Maybe (a, Q a)
45 popQ q@(Q as bs sz) = case as of
46 x:xs -> Just (x, Q xs bs (sz-1))
47 _ -> case normalizeQ q of
48 Q (x:xs) ys sz' -> Just (x, Q xs ys (sz'-1))
49 _ -> Nothing
50
51 sizeQ :: Q a -> Int
52 sizeQ (Q _ _ sz) = sz
53
54 -- | A priority is just a number. The greater, the earlier the job will get picked.
55 type Prio = Int
56
57 applyPrios
58 :: Ord t
59 => [(t, Prio)] -> Map.Map t Prio -> Map.Map t Prio
60 applyPrios changes prios = foldl' (\m (t, p) -> Map.insert t p m) prios changes
61
62 -- | A queue with different kinds of values, described by @t@, where each
63 -- kind can have a higher or lower priority than other kinds, as described
64 -- by the 'queuePrios' field.
65 data Queue t a = Queue
66 { queueData :: Vector.Vector (TVar (Q a))
67 , queueIndices :: Map.Map t Int -- indices into queueData
68 , queuePrios :: Map.Map t Prio
69 }
70
71 -- | Default priorities for the enumeration of job types @t@: everyone at 0.
72 defaultPrios :: EnumBounded t => Map.Map t Prio
73 defaultPrios = Map.fromList [ (t, 0) | t <- [minBound..maxBound] ]
74
75 -- | Create a new queue that'll apply the given priorities
76 newQueue :: EnumBounded t => Map.Map t Prio -> IO (Queue t a)
77 newQueue prios = do
78 let allTs = [ minBound .. maxBound ]
79 indices = Map.fromList (zip allTs [0..])
80 n = Map.size indices
81 vars <- Vector.replicateM n (newTVarIO emptyQ)
82 return $ Queue vars indices prios
83
84 -- | Add a new element to the queue, with the given kind.
85 addQueue :: Ord t => t -> a -> Queue t a -> IO ()
86 addQueue jobkind a q = case Map.lookup jobkind (queueIndices q) of
87 Just i -> atomically $ modifyTVar (queueData q Vector.! i) (snocQ a)
88 Nothing -> error "addQueue: couldn't find queue for given job kind"
89
90 deleteQueue :: (Eq a, Ord t) => t -> a -> Queue t a -> STM ()
91 deleteQueue jobkind a q = case Map.lookup jobkind (queueIndices q) of
92 Just i -> modifyTVar (queueData q Vector.! i) (deleteQ a)
93 Nothing -> error "deleteQueue: queue type not found?!"
94
95 -- | Try to pop the highest priority item off of the queue, per the priorities
96 -- defined by the @'Map.Map' t 'Prio'@ argument to 'newQueue'.
97 popQueue :: Ord t => Queue t a -> IO (Maybe a)
98 popQueue q = go queues
99
100 where prios = sortOn (Down . snd) $ Map.toList (queuePrios q)
101 indices = flip map prios $ \(t, _prio) ->
102 case Map.lookup t (queueIndices q) of
103 Just i -> i
104 Nothing -> error "popQueue: couldn't find queue index for given job kind"
105 queues = [ queueData q Vector.! i | i <- indices ]
106 go [] = return Nothing
107 go (q1:qs) = do
108 mitem <- atomically $ do
109 qa <- readTVar q1
110 case popQ qa of
111 Just (a, qa') -> writeTVar q1 qa' >> return (Just a)
112 Nothing -> return Nothing
113 case mitem of
114 Nothing -> go qs
115 a -> return a
116
117 -- | A ready-to-use runner that pops the highest priority item off the queue
118 -- and processes it using the given function.
119 queueRunner :: Ord t => (a -> IO ()) -> Queue t a -> IO ()
120 queueRunner f q = go
121
122 where go = do
123 mres <- popQueue q
124 case mres of
125 Just a -> f a `catch` exc
126 Nothing -> return ()
127 threadDelay 5000 -- 5ms
128 go
129
130 exc :: SomeException -> IO ()
131 exc e = hPutStrLn stderr ("Queue runner exception: " ++ show e)
132
133 -- | Create a queue and @n@ runner actions for it, with the given priorities
134 -- for the runners to apply when picking a new item.
135 newQueueWithRunners
136 :: EnumBounded t
137 => Int -- ^ number of runners
138 -> Map.Map t Prio -- ^ priorities
139 -> (a -> IO ()) -- ^ what to do with each item
140 -> IO (Queue t a, [IO ()])
141 newQueueWithRunners n prios f = do
142 q <- newQueue prios
143 let runners = replicate n (queueRunner f q)
144 return (q, runners)