-- Git — Sourcephile - gargantext.git/blob - src/Gargantext/Utils/Jobs/Queue.hs
-- [FIX/FEAT] Langs
-- [gargantext.git] / src / Gargantext / Utils / Jobs / Queue.hs
1 {-# LANGUAGE ConstraintKinds, TypeFamilies, ScopedTypeVariables #-}
2 module Gargantext.Utils.Jobs.Queue where
3
4 import Control.Concurrent
5 import Control.Concurrent.STM
6 import Control.Exception
7 import Control.Monad
8 import Data.Function
9 import Data.List
10 import Data.Ord
11 import Data.Maybe
12 import Prelude
13 import System.IO
14
15 import qualified Data.Map as Map
16 import qualified Data.Vector as Vector
17
-- | Constraint synonym for types whose values can be ordered and fully
-- enumerated from 'minBound' to 'maxBound'.
type EnumBounded t =
  ( Ord t
  , Enum t
  , Bounded t
  )
19
-- | A FIFO queue represented by two lists plus a cached element count.
--
-- Elements are taken from the head of the first list and added to the head
-- of the second list; 'normalizeQ' moves the second list (reversed) into the
-- first one when the first one is empty.
data Q a
  = Q
      [a]  -- front: the next element to pop is at the head
      [a]  -- back: the most recently added element is at the head
      !Int -- number of elements held across both lists
21
-- | The queue that holds no elements; its recorded size is 0.
emptyQ :: Q a
emptyQ = Q mempty mempty 0
24
-- | A queue holding exactly the given element, placed in the front list.
singletonQ :: a -> Q a
singletonQ x = Q (x : []) [] 1
27
-- | Append an element at the end of the queue.
--
-- The element is consed onto the back list and the size is incremented.
snocQ :: a -> Q a -> Q a
snocQ new (Q front back n) = Q front (new : back) (n + 1)
30
-- | Make the front list usable for popping.
--
-- When the front list is empty, the back list is reversed and becomes the new
-- front list; otherwise the queue is returned unchanged. The size is never
-- modified.
normalizeQ :: Q a -> Q a
normalizeQ q = case q of
  Q [] back n -> Q (reverse back) [] n
  _ -> q
34
-- | Remove every occurrence of the given element from the queue.
--
-- Both internal lists keep the relative order of their remaining elements,
-- and the size is decreased by the number of elements removed.
deleteQ :: Eq a => a -> Q a -> Q a
deleteQ x (Q front back n) = Q front' back' n'
  where
    -- 'partition' splits each list into the elements equal to @x@ and the rest.
    (hitsFront, front') = partition (== x) front
    (hitsBack, back') = partition (== x) back
    n' = n - length hitsFront - length hitsBack
45
-- | Take the oldest element off the queue.
--
-- Returns the element together with the remaining queue, or 'Nothing' when
-- both internal lists are empty.
popQ :: Q a -> Maybe (a, Q a)
popQ q =
  -- 'normalizeQ' leaves a queue with a non-empty front list untouched, so a
  -- single match on the normalized queue covers both situations.
  case normalizeQ q of
    Q (x : rest) back n -> Just (x, Q rest back (n - 1))
    _ -> Nothing
52
-- | The element count recorded in the queue (no list traversal involved).
sizeQ :: Q a -> Int
sizeQ (Q _front _back n) = n
55
-- | Look at the oldest element without removing it.
--
-- Returns 'Nothing' when the recorded size is 0 or when no element is
-- available after normalization.
peekQ :: Q a -> Maybe a
peekQ q@(Q _ _ n)
  | n == 0 = Nothing
  | otherwise = case normalizeQ q of
      Q front _ _ -> listToMaybe front
61
-- | Discard the oldest element of the queue.
--
-- A queue whose two lists are both empty is returned as an empty queue with
-- its size reset to 0.
dropQ :: Q a -> Q a
dropQ q = case normalizeQ q of
  Q (_ : rest) back n -> Q rest back (n - 1)
  -- After normalization an empty front list means the back list was empty too.
  _ -> Q [] [] 0
66
-- | Priority of a kind of job, as a plain number: kinds with a greater
-- priority get their jobs picked earlier.
type Prio = Int
69
-- | Override entries of a priority map with the given @(kind, priority)@
-- pairs.
--
-- Kinds not mentioned in the list keep their existing priority. When a kind
-- appears several times in the list, its last pair wins.
applyPrios
  :: Ord t
  => [(t, Prio)] -> Map.Map t Prio -> Map.Map t Prio
applyPrios changes prios =
  -- 'Map.fromList' keeps the last value of a duplicated key and 'Map.union'
  -- prefers its left argument, so the changes take precedence over @prios@.
  Map.union (Map.fromList changes) prios
74
-- | A queue holding values of several kinds, the kinds being described by
-- @t@. Each kind has its own sub-queue and its own priority relative to the
-- other kinds, as recorded in the 'queuePrios' field.
data Queue t a = Queue
  { -- | One sub-queue per kind, each stored in its own 'TVar'.
    queueData :: Vector.Vector (TVar (Q a))
  , -- | For each kind, the index of its sub-queue in 'queueData'.
    queueIndices :: Map.Map t Int
  , -- | The priority assigned to each kind.
    queuePrios :: Map.Map t Prio
  }
83
-- | The default priority map for the enumeration of job types @t@: every
-- value from 'minBound' to 'maxBound' is given priority 0.
defaultPrios :: EnumBounded t => Map.Map t Prio
defaultPrios = Map.fromList (zip [minBound .. maxBound] (repeat 0))
87
-- | Create a new queue that'll apply the given priorities.
--
-- One empty sub-queue is allocated for every value of @t@, from 'minBound' to
-- 'maxBound'. Kinds that are absent from the given priority map are given
-- priority 0: 'popQueue' only visits kinds that have an entry in
-- 'queuePrios', so a kind without a priority would accept items through
-- 'addQueue' but never hand them out.
newQueue :: EnumBounded t => Map.Map t Prio -> IO (Queue t a)
newQueue prios = do
  let allTs = [minBound .. maxBound]
      indices = Map.fromList (zip allTs [0 ..])
      -- 'Map.union' prefers its left argument, so priorities supplied by the
      -- caller always win over the fallback of 0.
      allPrios = Map.union prios (Map.fromList [(t, 0) | t <- allTs])
  vars <- Vector.replicateM (Map.size indices) (newTVarIO emptyQ)
  return $ Queue vars indices allPrios
96
-- | Add a new element to the queue, with the given kind.
--
-- The element is appended to the sub-queue registered for @jobkind@ in
-- 'queueIndices'. Calls 'error' when no sub-queue is registered for that
-- kind.
addQueue :: Ord t => t -> a -> Queue t a -> STM ()
addQueue jobkind a q = case Map.lookup jobkind (queueIndices q) of
  -- Strict update: the lazy 'modifyTVar' would leave a growing chain of
  -- unevaluated 'snocQ' applications in the 'TVar' until someone reads it.
  Just i -> modifyTVar' (queueData q Vector.! i) (snocQ a)
  Nothing -> error "addQueue: couldn't find queue for given job kind"
102
-- | Remove every occurrence of an element from the sub-queue of the given
-- kind.
--
-- Calls 'error' when no sub-queue is registered for that kind in
-- 'queueIndices'.
deleteQueue :: (Eq a, Ord t) => t -> a -> Queue t a -> STM ()
deleteQueue jobkind a q = case Map.lookup jobkind (queueIndices q) of
  -- Strict update: the lazy 'modifyTVar' would store an unevaluated 'deleteQ'
  -- application in the 'TVar' instead of the filtered queue.
  Just i -> modifyTVar' (queueData q Vector.! i) (deleteQ a)
  Nothing -> error "deleteQueue: queue type not found?!"
107
-- | Dump the contents of the queue, for debugging purposes.
--
-- Kinds are visited from 'minBound' to 'maxBound'; within a kind, elements
-- are listed oldest first, each tagged with its kind. A kind that has no
-- entry in 'queueIndices' contributes nothing to the result.
debugDumpQueue :: (Enum t, Bounded t, Ord t) => Queue t a -> STM [(t, a)]
debugDumpQueue q = concat <$> traverse dumpKind [minBound .. maxBound]
  where
    dumpKind t = case Map.lookup t (queueIndices q) of
      -- A debugging helper should not crash on a missing index, so the kind
      -- is simply skipped instead of going through a partial 'fromJust'.
      Nothing -> return []
      Just i -> do
        Q xs ys _ <- readTVar (queueData q Vector.! i)
        -- The back list holds the newest element first, hence the 'reverse'.
        return [(t, x) | x <- xs ++ reverse ys]
115
-- | A strategy for choosing between candidate items.
--
-- Each candidate is paired with an 'STM' action; the picker returns the pair
-- it selected.
type Picker a =
  [(a, STM ())] -> STM (a, STM ())
117
-- | Pop one item from the queue, looking at the highest priority kinds first.
--
-- Kinds are grouped by the priority recorded in 'queuePrios' and the groups
-- are visited from the greatest priority down. Within a group, the oldest
-- item of every non-empty sub-queue is a candidate; the given 'Picker'
-- chooses between the candidates and the chosen item is removed from its
-- sub-queue. Everything happens in a single 'atomically' transaction.
-- Returns 'Nothing' when no group has any candidate.
popQueue :: forall a t. Ord t => Picker a -> Queue t a -> IO (Maybe a)
popQueue picker q = atomically $ select prioLevels
  where
    -- TODO: cache this in the 'Queue' data structure?
    -- Kinds grouped by equal priority, greatest priority first.
    prioLevels :: [[(t, Prio)]]
    prioLevels =
      groupBy ((==) `on` snd) . sortOn (Down . snd) $
        Map.toList (queuePrios q)

    -- Try the priority levels in order until one of them yields an item.
    select :: [[(t, Prio)]] -> STM (Maybe a)
    select [] = return Nothing
    select (level : levels) = do
      mres <- selectLevel level
      case mres of
        Nothing -> select levels
        Just res -> pure $ Just res

    -- Pop from one priority level, if any of its sub-queues has an item.
    selectLevel :: [(t, Prio)] -> STM (Maybe a)
    selectLevel xs = do
      let -- Sub-queues of the kinds of this level; kinds without an entry
          -- in 'queueIndices' are ignored.
          queues =
            [ queueData q Vector.! i
            | (t, _) <- xs
            , Just i <- [Map.lookup t (queueIndices q)]
            ]
          -- Pair the oldest item of a sub-queue (if any) with the action
          -- that removes it from that sub-queue.
          go qvar = do
            qu <- readTVar qvar
            return (peekQ qu, modifyTVar' qvar dropQ)
      peeked <- traverse go queues
      case [(a, dropIt) | (Just a, dropIt) <- peeked] of
        [] -> return Nothing
        topItems -> do
          (chosenItem, popItem) <- picker topItems
          popItem
          return (Just chosenItem)
156
-- | A ready-to-use runner that pops the highest priority item off the queue
-- and processes it using the given function.
--
-- The runner loops forever: it pops an item with 'popQueue', runs @f@ on it
-- when there is one, sleeps for 5ms and starts over. A synchronous exception
-- raised by @f@ is reported on 'stderr' and the loop carries on. An
-- asynchronous exception is rethrown, so that the thread executing the
-- runner can still be killed or cancelled while an item is being processed.
queueRunner :: Ord t => Picker a -> (a -> IO ()) -> Queue t a -> IO ()
queueRunner picker f q = go
  where
    go = do
      mres <- popQueue picker q
      case mres of
        Just a -> f a `catch` exc
        Nothing -> return ()
      threadDelay 5000 -- 5ms
      go

    exc :: SomeException -> IO ()
    exc e = case fromException e of
      -- Never swallow asynchronous exceptions: doing so would make the
      -- runner impossible to stop while @f@ is running.
      Just (SomeAsyncException _) -> throwIO e
      Nothing -> hPutStrLn stderr ("Queue runner exception: " ++ show e)
172
-- | Build a queue with the given priorities, together with @n@ runner actions
-- bound to it.
--
-- Every runner is the same 'queueRunner' action applied to the new queue.
-- The runners are only returned, not executed.
newQueueWithRunners
  :: EnumBounded t
  => Int -- ^ number of runners
  -> Map.Map t Prio -- ^ priorities
  -> Picker a -- ^ how to pick between equal priority items
  -> (a -> IO ()) -- ^ what to do with each item
  -> IO (Queue t a, [IO ()])
newQueueWithRunners n prios picker f = withRunners <$> newQueue prios
  where
    withRunners q = (q, replicate n (queueRunner picker f q))