1 module Gargantext.Utils.Jobs.State where
3 import Gargantext.Utils.Jobs.Map
4 import Gargantext.Utils.Jobs.Queue
5 import Gargantext.Utils.Jobs.Settings
7 import Control.Concurrent.Async
8 import Control.Concurrent.STM
10 import qualified Data.List as List
11 import Data.Map.Strict (Map)
15 import Data.Time.Clock
18 import qualified Data.Map.Strict as Map
19 import qualified Servant.Job.Core as SJ
20 import qualified Servant.Job.Types as SJ
22 type IDGenerator = TVar Int
24 data JobsState t w a = JobsState
25 { jobsData :: JobMap (SJ.JobID 'SJ.Safe) w a
26 , jobsQ :: Queue t (SJ.JobID 'SJ.Safe)
27 , jobsIdGen :: IDGenerator
29 , jsRunners :: [Async ()]
32 nextID :: UTCTime -> JobSettings -> JobsState t w a -> STM (SJ.JobID 'SJ.Safe)
34 n <- stateTVar (jobsIdGen st) $ \i -> (i, i+1)
35 pure $ SJ.newID (Proxy :: Proxy "job") (jsSecretKey js) now n
39 (EnumBounded t, Monoid w)
42 -> IO (JobsState t w a)
43 newJobsState js prios = do
46 (q, runners) <- newQueueWithRunners (jsNumRunners js) prios (picker jmap) $ \jid -> do
47 mje <- lookupJob jid jmap
50 Just je -> case jTask je of
52 rj <- runJob jid qj jmap js
53 (_res, _logs) <- waitJobDone jid rj jmap
56 putStrLn $ "Starting " ++ show (jsNumRunners js) ++ " job runners."
57 gcAsync <- async $ gcThread js jmap
58 runnersAsyncs <- traverse async runners
59 return (JobsState jmap q idgen gcAsync runnersAsyncs)
62 :: JobMap (SJ.JobID 'SJ.Safe) w a
63 -> Picker (SJ.JobID 'SJ.Safe)
64 picker (JobMap jmap) xs = do
65 jinfos <- fmap catMaybes . forM xs $ \(jid, popjid) -> do
66 mje <- Map.lookup jid <$> readTVar jmap
68 Nothing -> return Nothing
69 Just je -> return $ Just (jid, popjid, jRegistered je)
70 let (jid, popjid, _) = List.minimumBy (comparing _3) jinfos
79 -> (SJ.JobID 'SJ.Safe -> a -> Logger w -> IO r)
82 -> IO (SJ.JobID 'SJ.Safe)
83 pushJob jobkind input f js st = do
85 atomically $ pushJobWithTime now jobkind input f js st
92 -> (SJ.JobID 'SJ.Safe -> a -> Logger w -> IO r)
95 -> STM (SJ.JobID 'SJ.Safe)
96 pushJobWithTime now jobkind input f js st@(JobsState jmap jqueue _idgen _ _) = do
97 jid <- nextID now js st
98 _je <- addJobEntry now jid input f jmap
99 addQueue jobkind jid jqueue