1 {-# LANGUAGE ConstraintKinds, TypeFamilies, ScopedTypeVariables #-}
2 module Gargantext.Utils.Jobs.Queue where
4 import Control.Concurrent
5 import Control.Concurrent.STM
6 import Control.Exception
15 import qualified Data.Map as Map
16 import qualified Data.Vector as Vector
18 type EnumBounded t = (Ord t, Enum t, Bounded t)
20 data Q a = Q [a] [a] !Int
25 singletonQ :: a -> Q a
26 singletonQ a = Q [a] [] 1
28 snocQ :: a -> Q a -> Q a
29 snocQ a (Q xs ys sz) = Q xs (a:ys) (sz+1)
31 normalizeQ :: Q a -> Q a
32 normalizeQ (Q [] ys sz) = Q (reverse ys) [] sz
35 deleteQ :: Eq a => a -> Q a -> Q a
36 deleteQ x (Q xs ys sz) = Q xs' ys' sz'
37 where (xs_num_x, xs') = go xs (0, [])
38 (ys_num_x, ys') = go ys (0, [])
39 sz' = sz - xs_num_x - ys_num_x
41 go [] (n, bs) = (n, reverse bs)
43 | a == x = go as (n+1, bs)
44 | otherwise = go as (n, a:bs)
46 popQ :: Q a -> Maybe (a, Q a)
47 popQ q@(Q as bs sz) = case as of
48 x:xs -> Just (x, Q xs bs (sz-1))
49 _ -> case normalizeQ q of
50 Q (x:xs) ys sz' -> Just (x, Q xs ys (sz'-1))
56 peekQ :: Q a -> Maybe a
57 peekQ (Q _ _ 0) = Nothing
58 peekQ q = case normalizeQ q of
63 dropQ (Q [] [] _) = Q [] [] 0
64 dropQ (Q (_x:xs) ys sz) = Q xs ys (sz-1)
65 dropQ q@(Q [] _ _) = dropQ (normalizeQ q)
67 -- | A priority is just a number. The greater, the earlier the job will get picked.
72 => [(t, Prio)] -> Map.Map t Prio -> Map.Map t Prio
73 applyPrios changes prios = foldl' (\m (t, p) -> Map.insert t p m) prios changes
75 -- | A queue with different kinds of values, described by @t@, where each
76 -- kind can have a higher or lower priority than other kinds, as described
77 -- by the 'queuePrios' field.
78 data Queue t a = Queue
79 { queueData :: Vector.Vector (TVar (Q a))
80 , queueIndices :: Map.Map t Int -- indices into queueData
81 , queuePrios :: Map.Map t Prio
84 -- | Default priorities for the enumeration of job types @t@: everyone at 0.
85 defaultPrios :: EnumBounded t => Map.Map t Prio
86 defaultPrios = Map.fromList [ (t, 0) | t <- [minBound..maxBound] ]
88 -- | Create a new queue that'll apply the given priorities
89 newQueue :: EnumBounded t => Map.Map t Prio -> IO (Queue t a)
91 let allTs = [ minBound .. maxBound ]
92 indices = Map.fromList (zip allTs [0..])
94 vars <- Vector.replicateM n (newTVarIO emptyQ)
95 return $ Queue vars indices prios
97 -- | Add a new element to the queue, with the given kind.
98 addQueue :: Ord t => t -> a -> Queue t a -> STM ()
99 addQueue jobkind a q = case Map.lookup jobkind (queueIndices q) of
100 Just i -> modifyTVar (queueData q Vector.! i) (snocQ a)
101 Nothing -> error "addQueue: couldn't find queue for given job kind"
103 deleteQueue :: (Eq a, Ord t) => t -> a -> Queue t a -> STM ()
104 deleteQueue jobkind a q = case Map.lookup jobkind (queueIndices q) of
105 Just i -> modifyTVar (queueData q Vector.! i) (deleteQ a)
106 Nothing -> error "deleteQueue: queue type not found?!"
108 -- | Dump the contents of the queue, for debugging purposes.
109 debugDumpQueue :: (Enum t, Bounded t, Ord t) => Queue t a -> STM [(t, a)]
110 debugDumpQueue q = mconcat <$> (forM [minBound..maxBound] $ \t -> do
111 readTVar (queueData q Vector.! (i t)) >>= debugDumpQ t)
113 i t = fromJust $ Map.lookup t (queueIndices q)
114 debugDumpQ t (Q xs ys _) = return $ map (\x -> (t, x)) (xs ++ reverse ys)
116 type Picker a = [(a, STM ())] -> STM (a, STM ())
118 -- | Figure out the candidates for being popped from the various queues.
119 -- We always look at highest priority queues first, and will pick between
120 -- equal priority items of different queues (candidates, elements of the
121 -- returned lists) by choosing the one that was queued first.
122 popQueue :: forall a t. Ord t => Picker a -> Queue t a -> IO (Maybe a)
123 popQueue picker q = atomically $ select prioLevels
125 where -- TODO: cache this in the 'Queue' data structure?
126 prioLevels :: [[(t, Prio)]]
127 prioLevels = groupBy ((==) `on` snd) . sortOn (Down . snd) $
128 Map.toList (queuePrios q)
130 select :: [[(t, Prio)]] -> STM (Maybe a)
131 select [] = return Nothing
132 select (level:levels) = do
133 mres <- selectLevel level
135 Nothing -> select levels
136 Just res -> pure $ Just res
138 selectLevel :: [(t, Prio)] -> STM (Maybe a)
140 let indices = catMaybes $ map (flip Map.lookup (queueIndices q) . fst) xs
141 queues = map (queueData q Vector.!) indices
142 go qvar = readTVar qvar >>= \qu ->
143 return (peekQ qu, modifyTVar' qvar dropQ)
144 mtopItems <- catMaybesFst <$> traverse go queues
146 Nothing -> return Nothing
147 Just [] -> return Nothing
149 (earliestItem, popItem) <- picker topItems
151 return (Just earliestItem)
153 catMaybesFst ((Nothing, _b) : xs) = catMaybesFst xs
154 catMaybesFst ((Just a, b) : xs) = ((a, b) :) <$> catMaybesFst xs
155 catMaybesFst [] = Just []
157 -- | A ready-to-use runner that pops the highest priority item off the queue
158 -- and processes it using the given function.
159 queueRunner :: Ord t => Picker a -> (a -> IO ()) -> Queue t a -> IO ()
160 queueRunner picker f q = go
163 mres <- popQueue picker q
165 Just a -> f a `catch` exc
167 threadDelay 5000 -- 5ms
170 exc :: SomeException -> IO ()
171 exc e = hPutStrLn stderr ("Queue runner exception: " ++ show e)
173 -- | Create a queue and @n@ runner actions for it, with the given priorities
174 -- for the runners to apply when picking a new item.
177 => Int -- ^ number of runners
178 -> Map.Map t Prio -- ^ priorities
179 -> Picker a -- ^ how to pick between equal priority items
180 -> (a -> IO ()) -- ^ what to do with each item
181 -> IO (Queue t a, [IO ()])
182 newQueueWithRunners n prios picker f = do
184 let runners = replicate n (queueRunner picker f q)