1 module Gargantext.Utils.Jobs.State where
3 import Gargantext.Utils.Jobs.Map
4 import Gargantext.Utils.Jobs.Queue
5 import Gargantext.Utils.Jobs.Settings
7 import Control.Concurrent.Async
8 import Control.Concurrent.STM
11 import Data.Map.Strict (Map)
15 import Data.Time.Clock
18 import qualified Data.Map.Strict as Map
19 import qualified Servant.Job.Core as SJ
20 import qualified Servant.Job.Types as SJ
22 type IDGenerator = TVar Int
24 data JobsState t w a = JobsState
25 { jobsData :: JobMap (SJ.JobID 'SJ.Safe) w a
26 , jobsQ :: Queue t (SJ.JobID 'SJ.Safe)
27 , jobsIdGen :: IDGenerator
29 , jsRunners :: [Async ()]
32 nextID :: JobSettings -> JobsState t w a -> IO (SJ.JobID 'SJ.Safe)
35 n <- atomically $ stateTVar (jobsIdGen st) $ \i -> (i, i+1)
36 return $ SJ.newID (Proxy :: Proxy "job") (jsSecretKey js) now n
40 (EnumBounded t, Monoid w)
43 -> IO (JobsState t w a)
44 newJobsState js prios = do
47 (q, runners) <- newQueueWithRunners (jsNumRunners js) prios (picker jmap) $ \jid -> do
48 mje <- lookupJob jid jmap
51 Just je -> case jTask je of
53 rj <- runJob jid qj jmap js
54 (_res, _logs) <- waitJobDone jid rj jmap
57 putStrLn $ "Starting " ++ show (jsNumRunners js) ++ " job runners."
58 gcAsync <- async $ gcThread js jmap
59 runnersAsyncs <- traverse async runners
60 return (JobsState jmap q idgen gcAsync runnersAsyncs)
63 :: JobMap (SJ.JobID 'SJ.Safe) w a
64 -> Picker (SJ.JobID 'SJ.Safe)
65 picker (JobMap jmap) xs = do
66 jinfos <- fmap catMaybes . forM xs $ \(jid, popjid) -> do
67 mje <- Map.lookup jid <$> readTVar jmap
69 Nothing -> return Nothing
70 Just je -> return $ Just (jid, popjid, jRegistered je)
71 let (jid, popjid, _) = minimumBy (comparing _3) jinfos
79 -> (a -> Logger w -> IO r)
82 -> IO (SJ.JobID 'SJ.Safe)
83 pushJob jobkind input f js st@(JobsState jmap jqueue _idgen _ _) = do
85 _je <- addJobEntry jid input f jmap
86 addQueue jobkind jid jqueue