]> Git — Sourcephile - gargantext.git/blob - src/Gargantext/Utils/Jobs/Queue.hs
Merge branch 'dev' into 193-dev-api-query-dev-fix
[gargantext.git] / src / Gargantext / Utils / Jobs / Queue.hs
1 {-# LANGUAGE ConstraintKinds, TypeFamilies, ScopedTypeVariables #-}
2 module Gargantext.Utils.Jobs.Queue where
3
4 import Control.Concurrent
5 import Control.Concurrent.STM
6 import Control.Exception
7 import Data.Function
8 import Data.List
9 import Data.Ord
10 import Data.Maybe
11 import Prelude
12 import System.IO
13
14 import qualified Data.Map as Map
15 import qualified Data.Vector as Vector
16
-- | Constraint synonym for job-kind types: they must be orderable (they are
-- used as 'Map.Map' keys) and enumerable over a bounded range (every kind is
-- listed with @[minBound .. maxBound]@).
type EnumBounded t = (Ord t, Enum t, Bounded t)
18
-- | A FIFO queue stored as two lists plus a cached element count.
--
-- Elements are taken from the head of the first list. New elements are
-- consed onto the second list (see 'snocQ'), which 'normalizeQ' reverses
-- into the first list once the first list is empty. The strict 'Int' is the
-- total number of elements held in both lists.
data Q a = Q [a] [a] !Int
20
-- | The queue holding no element: both lists empty, size 0.
emptyQ :: Q a
emptyQ = Q mempty mempty 0
23
-- | A queue of size 1 whose only element sits in the front list, ready to
-- be popped.
singletonQ :: a -> Q a
singletonQ x = Q (x : []) [] 1
26
-- | Append an element at the back of the queue: it is consed onto the back
-- list and the cached size grows by one.
snocQ :: a -> Q a -> Q a
snocQ new q =
  case q of
    Q front back n -> Q front (new : back) (n + 1)
29
-- | When the front list is empty, move the (reversed) back list to the
-- front so that the oldest element becomes the head. Any other queue is
-- returned unchanged. The size is never modified.
normalizeQ :: Q a -> Q a
normalizeQ q =
  case q of
    Q [] back n -> Q (reverse back) [] n
    _           -> q
33
-- | Remove every occurrence of the given element from the queue, keeping
-- the relative order of the remaining elements and lowering the cached size
-- by the number of removed occurrences.
deleteQ :: Eq a => a -> Q a -> Q a
deleteQ x (Q xs ys sz) = Q xsKept ysKept (sz - xsGone - ysGone)
  where
    (xsGone, xsKept) = purge xs
    (ysGone, ysKept) = purge ys

    -- Walk one list, returning how many elements matched @x@ together with
    -- the non-matching elements in their original order.
    purge zs = foldr step (0, []) zs

    step a (gone, kept)
      | a == x    = (gone + 1, kept)
      | otherwise = (gone, a : kept)
44
-- | Take the oldest element off the queue, returning it along with the
-- remaining queue, or 'Nothing' when the queue holds no element.
popQ :: Q a -> Maybe (a, Q a)
popQ (Q (x : rest) back n) = Just (x, Q rest back (n - 1))
popQ q =
  -- The front list is empty: refill it from the back list before retrying.
  case normalizeQ q of
    Q (x : rest) back n -> Just (x, Q rest back (n - 1))
    _                   -> Nothing
51
-- | Number of elements in the queue, read from the cached size field.
sizeQ :: Q a -> Int
sizeQ q = case q of
  Q _ _ n -> n
54
-- | Look at the oldest element without removing it; 'Nothing' when the
-- cached size is 0 or when no element can be found.
peekQ :: Q a -> Maybe a
peekQ q@(Q _ _ n)
  | n == 0    = Nothing
  | otherwise =
      -- Normalizing guarantees the oldest element, if any, is the head of
      -- the front list.
      case normalizeQ q of
        Q front _ _ -> listToMaybe front
60
-- | Discard the oldest element of the queue. Dropping from a queue whose
-- two lists are empty yields the empty queue with size 0.
dropQ :: Q a -> Q a
dropQ q =
  case q of
    Q (_ : rest) back n -> Q rest back (n - 1)
    Q [] [] _           -> Q [] [] 0
    -- Front list exhausted but the back list is not: refill, then retry.
    Q [] _ _            -> dropQ (normalizeQ q)
65
-- | A priority is just a number. The greater it is, the earlier jobs of
-- that kind get picked: 'popQueue' visits priority levels in descending
-- order.
type Prio = Int
68
-- | Override entries of a priority map. The changes are applied from left
-- to right, so when a kind appears several times the last occurrence wins;
-- kinds not mentioned keep their current priority.
applyPrios
  :: Ord t
  => [(t, Prio)] -> Map.Map t Prio -> Map.Map t Prio
applyPrios changes prios = foldl' override prios changes
  where
    override acc (kind, prio) = Map.insert kind prio acc
73
-- | A queue with different kinds of values, described by @t@, where each
-- kind can have a higher or lower priority than other kinds, as described
-- by the 'queuePrios' field.
data Queue t a = Queue
  { -- One FIFO sub-queue per kind, each in its own 'TVar' so that it can be
    -- read and updated inside STM transactions.
    queueData :: Vector.Vector (TVar (Q a))
    -- Position of each kind's sub-queue in 'queueData'.
  , queueIndices :: Map.Map t Int -- indices into queueData
    -- Priority of each kind; 'popQueue' only visits the kinds listed here,
    -- highest priority first.
  , queuePrios :: Map.Map t Prio
  }
82
-- | Default priorities for the enumeration of job types @t@: every kind
-- between 'minBound' and 'maxBound' gets priority 0.
defaultPrios :: EnumBounded t => Map.Map t Prio
defaultPrios = Map.fromList (zip [minBound .. maxBound] (repeat 0))
86
-- | Create a new queue that'll apply the given priorities.
--
-- One empty sub-queue is allocated for every kind in
-- @[minBound .. maxBound]@. Kinds that are absent from the supplied map get
-- priority 0: 'popQueue' only visits kinds listed in 'queuePrios', so
-- without this an item added for an unlisted kind would stay in its
-- sub-queue forever.
newQueue :: EnumBounded t => Map.Map t Prio -> IO (Queue t a)
newQueue prios = do
  let allTs = [minBound .. maxBound]
      indices = Map.fromList (zip allTs [0 ..])
      -- 'Map.union' is left-biased: caller-supplied priorities win, the
      -- right-hand map only fills in the kinds the caller did not mention.
      allPrios = Map.union prios (Map.fromList [(t, 0) | t <- allTs])
  vars <- Vector.replicateM (Map.size indices) (newTVarIO emptyQ)
  return $ Queue vars indices allPrios
95
-- | Add a new element to the queue, with the given kind. The element is
-- appended at the back of that kind's sub-queue in its own STM transaction.
--
-- Calls 'error' when the kind has no sub-queue in 'queueIndices'.
addQueue :: Ord t => t -> a -> Queue t a -> IO ()
addQueue jobkind a q = case Map.lookup jobkind (queueIndices q) of
  -- Strict variant, as in 'popQueue': the new queue is evaluated before it
  -- is stored, so no chain of pending 'snocQ' thunks builds up in the TVar.
  Just i -> atomically $ modifyTVar' (queueData q Vector.! i) (snocQ a)
  Nothing -> error "addQueue: couldn't find queue for given job kind"
101
-- | Remove every occurrence of the given element from the sub-queue of the
-- given kind. This is an 'STM' action so that callers can combine it with
-- other updates in a single transaction.
--
-- Calls 'error' when the kind has no sub-queue in 'queueIndices'.
deleteQueue :: (Eq a, Ord t) => t -> a -> Queue t a -> STM ()
deleteQueue jobkind a q = case Map.lookup jobkind (queueIndices q) of
  -- Strict variant, as in 'popQueue': the filtered queue is computed now
  -- rather than left as a thunk for the next reader of the TVar.
  Just i -> modifyTVar' (queueData q Vector.! i) (deleteQ a)
  Nothing -> error "deleteQueue: queue type not found?!"
106
107
-- | Strategy for choosing between candidate items. Each candidate is paired
-- with an 'STM' action; the picker returns the pair it selected. In
-- 'popQueue' the action removes the candidate from its sub-queue, and it is
-- run only for the selected pair.
type Picker a = [(a, STM ())] -> STM (a, STM ())
109
-- | Pop one item from the queue, or return 'Nothing' when no sub-queue
-- listed in 'queuePrios' holds an item.
--
-- Priority levels are visited from the highest 'Prio' to the lowest. In the
-- first level with at least one non-empty sub-queue, the oldest item of
-- each non-empty sub-queue is a candidate, and the supplied 'Picker'
-- decides which candidate wins (for instance the one that was queued
-- first). The winner is removed from its sub-queue. Everything happens in a
-- single STM transaction.
popQueue :: forall a t. Ord t => Picker a -> Queue t a -> IO (Maybe a)
popQueue picker q = atomically $ select prioLevels
  where
    -- Kinds grouped by equal priority, highest priority group first.
    -- TODO: cache this in the 'Queue' data structure?
    prioLevels :: [[(t, Prio)]]
    prioLevels =
      groupBy ((==) `on` snd) . sortOn (Down . snd) $
        Map.toList (queuePrios q)

    -- Try each priority level in turn until one yields an item.
    select :: [[(t, Prio)]] -> STM (Maybe a)
    select [] = return Nothing
    select (level : levels) = do
      mres <- selectLevel level
      case mres of
        Nothing -> select levels
        Just res -> return (Just res)

    -- Gather the candidates of one priority level and let the picker choose.
    selectLevel :: [(t, Prio)] -> STM (Maybe a)
    selectLevel xs = do
      let indices = mapMaybe (flip Map.lookup (queueIndices q) . fst) xs
          queues = map (queueData q Vector.!) indices
      candidates <- catMaybes <$> traverse candidate queues
      case candidates of
        [] -> return Nothing
        _ -> do
          (chosen, popItem) <- picker candidates
          popItem
          return (Just chosen)

    -- The oldest item of a sub-queue, paired with the action that drops it
    -- from that sub-queue; 'Nothing' when the sub-queue is empty.
    candidate :: TVar (Q a) -> STM (Maybe (a, STM ()))
    candidate qvar = do
      qu <- readTVar qvar
      return $ fmap (\top -> (top, modifyTVar' qvar dropQ)) (peekQ qu)
148
-- | A ready-to-use runner that pops the highest priority item off the queue
-- and processes it using the given function.
--
-- The runner loops forever, pausing 5ms between iterations. A synchronous
-- exception raised by the processing function is reported on 'stderr' and
-- the loop carries on with the next item. Asynchronous exceptions (thread
-- kill, cancellation, ...) are rethrown so that the runner can actually be
-- stopped while it is processing an item.
queueRunner :: Ord t => Picker a -> (a -> IO ()) -> Queue t a -> IO ()
queueRunner picker f q = go
  where
    go = do
      mres <- popQueue picker q
      case mres of
        Just a -> f a `catch` exc
        Nothing -> return ()
      threadDelay 5000 -- 5ms
      go

    exc :: SomeException -> IO ()
    exc e = case fromException e of
      -- Never swallow an asynchronous exception: it is a request to stop
      -- this thread, not a failure of the job being processed.
      Just (SomeAsyncException _) -> throwIO e
      Nothing -> hPutStrLn stderr ("Queue runner exception: " ++ show e)
164
-- | Build a fresh queue with the given priorities, together with @n@
-- identical runner actions bound to it. The runners are only returned, not
-- started: it is up to the caller to execute them.
newQueueWithRunners
  :: EnumBounded t
  => Int            -- ^ number of runners
  -> Map.Map t Prio -- ^ priorities
  -> Picker a       -- ^ how to pick between equal priority items
  -> (a -> IO ())   -- ^ what to do with each item
  -> IO (Queue t a, [IO ()])
newQueueWithRunners n prios picker f =
  withRunners <$> newQueue prios
  where
    withRunners q = (q, replicate n (queueRunner picker f q))