]> Git — Sourcephile - gargantext.git/blob - src/Gargantext/Utils/Jobs/State.hs
fix the synchronic clustering
[gargantext.git] / src / Gargantext / Utils / Jobs / State.hs
1 module Gargantext.Utils.Jobs.State where
2
3 import Gargantext.Utils.Jobs.Map
4 import Gargantext.Utils.Jobs.Queue
5 import Gargantext.Utils.Jobs.Settings
6
7 import Control.Concurrent.Async
8 import Control.Concurrent.STM
9 import Control.Monad
10 import qualified Data.List as List
11 import Data.Map.Strict (Map)
12 import Data.Maybe
13 import Data.Ord
14 import Data.Proxy
15 import Data.Time.Clock
16 import Prelude
17
18 import qualified Data.Map.Strict as Map
19 import qualified Servant.Job.Core as SJ
20 import qualified Servant.Job.Types as SJ
21
22 type IDGenerator = TVar Int
23
24 data JobsState t w a = JobsState
25 { jobsData :: JobMap (SJ.JobID 'SJ.Safe) w a
26 , jobsQ :: Queue t (SJ.JobID 'SJ.Safe)
27 , jobsIdGen :: IDGenerator
28 , jsGC :: Async ()
29 , jsRunners :: [Async ()]
30 }
31
32 nextID :: UTCTime -> JobSettings -> JobsState t w a -> STM (SJ.JobID 'SJ.Safe)
33 nextID now js st = do
34 n <- stateTVar (jobsIdGen st) $ \i -> (i, i+1)
35 pure $ SJ.newID (Proxy :: Proxy "job") (jsSecretKey js) now n
36
37 newJobsState
38 :: forall t w a.
39 (EnumBounded t, Monoid w)
40 => JobSettings
41 -> Map t Prio
42 -> IO (JobsState t w a)
43 newJobsState js prios = do
44 jmap <- newJobMap
45 idgen <- newTVarIO 0
46 (q, runners) <- newQueueWithRunners (jsNumRunners js) prios (picker jmap) $ \jid -> do
47 mje <- lookupJob jid jmap
48 case mje of
49 Nothing -> return ()
50 Just je -> case jTask je of
51 QueuedJ qj -> do
52 rj <- runJob jid qj jmap js
53 (_res, _logs) <- waitJobDone jid rj jmap
54 return ()
55 _ -> return ()
56 putStrLn $ "Starting " ++ show (jsNumRunners js) ++ " job runners."
57 gcAsync <- async $ gcThread js jmap
58 runnersAsyncs <- traverse async runners
59 return (JobsState jmap q idgen gcAsync runnersAsyncs)
60
61 where picker
62 :: JobMap (SJ.JobID 'SJ.Safe) w a
63 -> Picker (SJ.JobID 'SJ.Safe)
64 picker (JobMap jmap) xs = do
65 jinfos <- fmap catMaybes . forM xs $ \(jid, popjid) -> do
66 mje <- Map.lookup jid <$> readTVar jmap
67 case mje of
68 Nothing -> return Nothing
69 Just je -> return $ Just (jid, popjid, jRegistered je)
70 let (jid, popjid, _) = List.minimumBy (comparing _3) jinfos
71 return (jid, popjid)
72
73 _3 (_, _, c) = c
74
75 pushJob
76 :: Ord t
77 => t
78 -> a
79 -> (SJ.JobID 'SJ.Safe -> a -> Logger w -> IO r)
80 -> JobSettings
81 -> JobsState t w r
82 -> IO (SJ.JobID 'SJ.Safe)
83 pushJob jobkind input f js st = do
84 now <- getCurrentTime
85 atomically $ pushJobWithTime now jobkind input f js st
86
87 pushJobWithTime
88 :: Ord t
89 => UTCTime
90 -> t
91 -> a
92 -> (SJ.JobID 'SJ.Safe -> a -> Logger w -> IO r)
93 -> JobSettings
94 -> JobsState t w r
95 -> STM (SJ.JobID 'SJ.Safe)
96 pushJobWithTime now jobkind input f js st@(JobsState jmap jqueue _idgen _ _) = do
97 jid <- nextID now js st
98 _je <- addJobEntry now jid input f jmap
99 addQueue jobkind jid jqueue
100 pure jid