Git — Sourcephile - gargantext.git/blob - src/Gargantext/Utils/Jobs/State.hs
introduce and use a flexible job queue system
[gargantext.git] / src / Gargantext / Utils / Jobs / State.hs
1 module Gargantext.Utils.Jobs.State where
2
3 import Gargantext.Utils.Jobs.Map
4 import Gargantext.Utils.Jobs.Queue
5 import Gargantext.Utils.Jobs.Settings
6
7 import Control.Concurrent.Async
8 import Control.Concurrent.STM
9 import Data.Map (Map)
10 import Data.Proxy
11 import Data.Time.Clock
12 import Prelude
13
14 -- import qualified Data.Map as Map
15 import qualified Servant.Job.Core as SJ
16 import qualified Servant.Job.Types as SJ
17
-- | Shared, mutable integer counter. 'nextID' reads and increments it
-- atomically each time it builds a job identifier.
type IDGenerator = TVar Int
19
-- | Runtime state of the job subsystem.
--
-- Type parameters: @t@ is the type of the job kinds used to enqueue job
-- IDs, @w@ and @a@ are passed through to the underlying 'JobMap'.
data JobsState t w a = JobsState
  { jobsData  :: JobMap (SJ.JobID 'SJ.Safe) w a
    -- ^ Job entries, looked up by job ID.
  , jobsQ     :: Queue t (SJ.JobID 'SJ.Safe)
    -- ^ Queue of job IDs.
  , jobsIdGen :: IDGenerator
    -- ^ Counter consumed by 'nextID'.
  , jsGC      :: Async ()
    -- ^ Handle of the background 'gcThread' started by 'newJobsState'.
  , jsRunners :: [Async ()]
    -- ^ Handles of the queue runners started by 'newJobsState'.
  }
27
-- | Produce a fresh job identifier.
--
-- Reads the current time, then atomically post-increments the counter in
-- 'jobsIdGen' (the value /before/ the increment is the one used), and
-- finally hands the secret key from the settings, the timestamp and that
-- counter value to 'SJ.newID'.
nextID :: JobSettings -> JobsState t w a -> IO (SJ.JobID 'SJ.Safe)
nextID js st = do
  timestamp <- getCurrentTime
  counter <- atomically (stateTVar (jobsIdGen st) bump)
  pure (SJ.newID (Proxy :: Proxy "job") (jsSecretKey js) timestamp counter)
  where
    -- Return the current counter value and store its successor.
    bump current = (current, current + 1)
33
-- | Build a fresh 'JobsState' and start its background threads.
--
-- Creates an empty job map and an ID counter starting at 0, builds the
-- queue together with @jsNumRunners js@ runner actions, prints a startup
-- message, then spawns 'gcThread' and every runner with 'async'. The
-- resulting handles are stored in the returned state.
newJobsState
  :: (EnumBounded t, Monoid w)
  => JobSettings
  -> Map t Prio
  -> IO (JobsState t w a)
newJobsState js prios = do
  jobMap <- newJobMap
  counter <- newTVarIO 0
  let workerCount = jsNumRunners js
  (queue, runnerActions) <- newQueueWithRunners workerCount prios $ \jid -> do
    entry <- lookupJob jid jobMap
    -- Only an entry whose task is still 'QueuedJ' gets run; a missing
    -- entry or any other task state is ignored.
    case jTask <$> entry of
      Just (QueuedJ queued) -> do
        running <- runJob jid queued jobMap js
        (_result, _logs) <- waitJobDone jid running jobMap
        pure ()
      _ -> pure ()
  putStrLn ("Starting " ++ show workerCount ++ " job runners.")
  gcHandle <- async (gcThread js jobMap)
  runnerHandles <- traverse async runnerActions
  pure (JobsState jobMap queue counter gcHandle runnerHandles)
56
-- | Submit a new job.
--
-- Generates an ID with 'nextID', adds an entry for the given input and
-- handler to the job map via 'addJobEntry', pushes the ID onto the queue
-- under the given job kind via 'addQueue', and returns the ID.
pushJob
  :: Ord t
  => t                        -- ^ job kind under which the ID is enqueued
  -> a                        -- ^ input passed to 'addJobEntry' with the handler
  -> (a -> Logger w -> IO r)  -- ^ handler stored alongside the input
  -> JobSettings
  -> JobsState t w r
  -> IO (SJ.JobID 'SJ.Safe)
pushJob jobkind input f js st = do
  jid <- nextID js st
  _entry <- addJobEntry jid input f (jobsData st)
  addQueue jobkind jid (jobsQ st)
  pure jid