1 module Gargantext.Utils.Jobs.State where
3 import Gargantext.Utils.Jobs.Map
4 import Gargantext.Utils.Jobs.Queue
5 import Gargantext.Utils.Jobs.Settings
7 import Control.Concurrent.Async
8 import Control.Concurrent.STM
11 import Data.Time.Clock
14 -- import qualified Data.Map as Map
15 import qualified Servant.Job.Core as SJ
16 import qualified Servant.Job.Types as SJ
18 type IDGenerator = TVar Int
20 data JobsState t w a = JobsState
21 { jobsData :: JobMap (SJ.JobID 'SJ.Safe) w a
22 , jobsQ :: Queue t (SJ.JobID 'SJ.Safe)
23 , jobsIdGen :: IDGenerator
25 , jsRunners :: [Async ()]
28 nextID :: JobSettings -> JobsState t w a -> IO (SJ.JobID 'SJ.Safe)
31 n <- atomically $ stateTVar (jobsIdGen st) $ \i -> (i, i+1)
32 return $ SJ.newID (Proxy :: Proxy "job") (jsSecretKey js) now n
35 :: (EnumBounded t, Monoid w)
38 -> IO (JobsState t w a)
39 newJobsState js prios = do
42 (q, runners) <- newQueueWithRunners (jsNumRunners js) prios $ \jid -> do
43 mje <- lookupJob jid jmap
46 Just je -> case jTask je of
48 rj <- runJob jid qj jmap js
49 (_res, _logs) <- waitJobDone jid rj jmap
52 putStrLn $ "Starting " ++ show (jsNumRunners js) ++ " job runners."
53 gcAsync <- async $ gcThread js jmap
54 runnersAsyncs <- traverse async runners
55 return (JobsState jmap q idgen gcAsync runnersAsyncs)
61 -> (a -> Logger w -> IO r)
64 -> IO (SJ.JobID 'SJ.Safe)
65 pushJob jobkind input f js st@(JobsState jmap jqueue _idgen _ _) = do
67 _je <- addJobEntry jid input f jmap
68 addQueue jobkind jid jqueue