1 module Gargantext.Utils.Jobs.State where
3 import Gargantext.Utils.Jobs.Map
4 import Gargantext.Utils.Jobs.Queue
5 import Gargantext.Utils.Jobs.Settings
7 import Control.Concurrent.Async
8 import Control.Concurrent.STM
11 import Data.Map.Strict (Map)
15 import Data.Time.Clock
18 import qualified Data.Map.Strict as Map
19 import qualified Servant.Job.Core as SJ
20 import qualified Servant.Job.Types as SJ
-- | Mutable 'Int' counter held in a 'TVar'. It is stored in the 'jobsIdGen'
-- field of 'JobsState' and read/incremented by 'nextID' to obtain the
-- integer that is passed to 'SJ.newID'.
22 type IDGenerator = TVar Int
-- | State shared by the job machinery in this module: the job map, the
-- queue of job IDs, the ID counter and the runner asyncs.
--
-- NOTE(review): not every field is visible in this excerpt. The constructor
-- is applied to five arguments in 'newJobsState' and matched against five
-- fields in 'pushJobWithTime', so at least one field (apparently the one
-- receiving the GC thread's 'Async') and the closing brace are not shown.
24 data JobsState t w a = JobsState
-- Job entries, looked up by job ID (see 'picker' and the runner callback).
25 { jobsData :: JobMap (SJ.JobID 'SJ.Safe) w a
-- Queue of job IDs; 'pushJobWithTime' adds new IDs to it via 'addQueue'.
26 , jobsQ :: Queue t (SJ.JobID 'SJ.Safe)
-- Counter consumed by 'nextID'.
27 , jobsIdGen :: IDGenerator
-- Asyncs of the queue runners; presumably filled with the @runnersAsyncs@
-- started in 'newJobsState' (last constructor argument there) -- confirm.
29 , jsRunners :: [Async ()]
-- | Produce a fresh job ID inside 'STM'.
--
-- Reads the current value of the 'jobsIdGen' counter, writes that value
-- plus one back, and passes the value that was read to 'SJ.newID' together
-- with the @"job"@ proxy, the settings' 'jsSecretKey' and the given time.
--
-- NOTE(review): the equation head binding @now@, @js@ and @st@ is not among
-- the visible lines; the call @nextID now js st@ in 'pushJobWithTime'
-- indicates they are the three arguments in signature order.
32 nextID :: UTCTime -> JobSettings -> JobsState t w a -> STM (SJ.JobID 'SJ.Safe)
-- 'stateTVar' returns the first tuple component (the old counter value) and
-- stores the second (old value + 1) in the TVar.
34 n <- stateTVar (jobsIdGen st) $ \i -> (i, i+1)
35 pure $ SJ.newID (Proxy :: Proxy "job") (jsSecretKey js) now n
39 (EnumBounded t, Monoid w)
42 -> IO (JobsState t w a)
-- Build a 'JobsState' from the job settings @js@ and the priorities @prios@,
-- starting the background threads it needs:
--
--   * a queue plus runner actions, from 'newQueueWithRunners';
--   * one async running 'gcThread';
--   * one async per runner action.
--
-- NOTE(review): the start of the type signature and the lines that create
-- @jmap@ and @idgen@ are not visible in this excerpt.
43 newJobsState js prios = do
-- The queue is created with 'jsNumRunners' runners, the given priorities,
-- 'picker' (closed over the job map) as selection function, and a callback
-- that is invoked with a job ID.
46 (q, runners) <- newQueueWithRunners (jsNumRunners js) prios (picker jmap) $ \jid -> do
-- Callback: look the job up in the map, then branch on the entry's task.
-- (The @case@ scrutinee line, the @Nothing@ branch and the pattern binding
-- @qj@ are not visible here.)
47 mje <- lookupJob jid jmap
50 Just je -> case jTask je of
-- Start the job, then block until it is done; result and logs are discarded.
52 rj <- runJob jid qj jmap js
53 (_res, _logs) <- waitJobDone jid rj jmap
56 putStrLn $ "Starting " ++ show (jsNumRunners js) ++ " job runners."
-- NOTE(review): nothing visible in this function waits on or links these
-- asyncs; whether failures in them are observed depends on code not shown.
57 gcAsync <- async $ gcThread js jmap
58 runnersAsyncs <- traverse async runners
59 return (JobsState jmap q idgen gcAsync runnersAsyncs)
62 :: JobMap (SJ.JobID 'SJ.Safe) w a
63 -> Picker (SJ.JobID 'SJ.Safe)
-- Selection function handed to 'newQueueWithRunners'. Given candidate pairs
-- @(jid, popjid)@, it looks each @jid@ up in the job map, drops candidates
-- that have no entry, and selects the candidate whose 'jRegistered' value
-- is smallest (presumably the earliest-registered job -- confirm the type
-- of 'jRegistered').
--
-- NOTE(review): the @case@ scrutinee line and the final result line are not
-- visible in this excerpt.
64 picker (JobMap jmap) xs = do
-- Collect (jid, popjid, jRegistered) for every candidate present in the map.
65 jinfos <- fmap catMaybes . forM xs $ \(jid, popjid) -> do
66 mje <- Map.lookup jid <$> readTVar jmap
68 Nothing -> return Nothing
69 Just je -> return $ Just (jid, popjid, jRegistered je)
-- NOTE(review): if this is the standard 'minimumBy', it is partial -- when
-- no candidate is found in the map, @jinfos@ is empty and this throws.
-- Confirm that callers guarantee at least one live candidate.
70 let (jid, popjid, _) = minimumBy (comparing _3) jinfos
79 -> (SJ.JobID 'SJ.Safe -> a -> Logger w -> IO r)
82 -> IO (SJ.JobID 'SJ.Safe)
-- 'IO' entry point for submitting a job: runs 'pushJobWithTime' as a single
-- 'atomically' transaction, passing all of its arguments through.
--
-- NOTE(review): the line binding @now@ is not visible in this excerpt;
-- presumably it is the current time -- confirm.
83 pushJob jobkind input f js st = do
85 atomically $ pushJobWithTime now jobkind input f js st
92 -> (SJ.JobID 'SJ.Safe -> a -> Logger w -> IO r)
95 -> STM (SJ.JobID 'SJ.Safe)
-- 'STM' part of job submission, with the timestamp supplied by the caller.
-- It allocates an ID, registers a job entry under it and enqueues the ID.
--
-- NOTE(review): the start of the type signature and any line after
-- 'addQueue' (e.g. the one yielding the job ID) are not visible here.
96 pushJobWithTime now jobkind input f js st@(JobsState jmap jqueue _idgen _ _) = do
-- Fresh ID from the state's counter.
97 jid <- nextID now js st
-- Register the job in the job map; the returned entry is not used.
98 _je <- addJobEntry now jid input f jmap
-- Enqueue the ID under the given job kind.
99 addQueue jobkind jid jqueue