]> Git — Sourcephile - gargantext.git/blob - src/Gargantext/Utils/Jobs/Map.hs
[FIX] numRunners to 1 by default for now
[gargantext.git] / src / Gargantext / Utils / Jobs / Map.hs
1 {-# LANGUAGE GADTs #-}
2 module Gargantext.Utils.Jobs.Map where
3
4 import Control.Concurrent
5 import Control.Concurrent.Async
6 import Control.Concurrent.STM
7 import Control.Exception
8 import Control.Monad
9 import Data.Map (Map)
10 import Data.Time.Clock
11 import Prelude
12
13 import qualified Data.Map as Map
14
15 import Gargantext.Utils.Jobs.Settings
16
-- | A mutable table associating each job id with the 'JobEntry' describing
-- that job. The table lives in a 'TVar' so it can be read and updated from
-- several threads through STM.
newtype JobMap jid w a = JobMap
  { jobMap :: TVar (Map jid (JobEntry jid w a))
    -- ^ The transactional variable holding the current table.
  }
21
-- | Everything the job map records about a single job.
data JobEntry jid w a = JobEntry
  { jID           :: jid
    -- ^ Identifier of the job.
  , jTask         :: J w a
    -- ^ The job itself, in its current state (queued, running or done).
  , jTimeoutAfter :: Maybe UTCTime
    -- ^ Deadline after which 'gcThread' treats the entry as expired.
    -- 'Nothing' until 'runJob' sets it.
  , jRegistered   :: UTCTime
    -- ^ Time at which 'addJobEntry' created the entry.
  , jStarted      :: Maybe UTCTime
    -- ^ Time at which 'runJob' started the job, if it has been started.
  , jEnded        :: Maybe UTCTime
    -- ^ Time at which 'waitJobDone' saw the job finish, if it has finished.
  }
31
-- | The computation attached to a job. Its representation follows the job's
-- life cycle:
--
-- * 'QueuedJ': not started yet; carries the input and the function to run
--   on it.
--
-- * 'RunningJ': in flight; carries the 'Async' handle and an action that
--   returns the logs accumulated so far.
--
-- * 'DoneJ': finished; carries the final logs and either the exception that
--   ended the job or its result.
data J w a
  = QueuedJ (QueuedJob w a)
  | RunningJ (RunningJob w a)
  | DoneJ w (Either SomeException a)
42
-- | A job that has not been executed yet: an input value paired with the
-- function that will consume it. The input type does not appear in the
-- resulting type, so jobs taking different kinds of input can be stored
-- side by side.
data QueuedJob w r where
  QueuedJob
    :: a
    -> (a -> Logger w -> IO r)
    -> QueuedJob w r
48
-- | A job that has been started.
data RunningJob w a = RunningJob
  { rjAsync  :: Async a
    -- ^ Handle on the asynchronous computation running the job.
  , rjGetLog :: IO w
    -- ^ Action returning the logs the job has produced so far.
  }
55
-- | A @'Logger' w@ is an 'IO' action that consumes one log message of type
-- @w@.
type Logger w = w -> IO ()
59
-- | Create a fresh 'JobMap' containing no entries.
newJobMap :: IO (JobMap jid w a)
newJobMap = fmap JobMap (newTVarIO Map.empty)
62
-- | Fetch the entry currently stored under the given job id, or 'Nothing'
-- when the map has no such id.
lookupJob :: Ord jid => jid -> JobMap jid w a -> IO (Maybe (JobEntry jid w a))
lookupJob jid (JobMap mvar) = do
  entries <- readTVarIO mvar
  return (Map.lookup jid entries)
70
-- | Garbage-collection loop for a 'JobMap'. It never returns, so it is meant
-- to be run in a thread of its own.
--
-- Each pass:
--
-- 1. collects the ids of the entries whose 'jTimeoutAfter' deadline has
--    passed;
-- 2. for each of them that is still running, removes the entry from the map
--    and cancels the job with 'killJ';
-- 3. sleeps for 'jsGcPeriod' seconds.
--
-- Queued and done entries are never removed by this loop.
gcThread :: Ord jid => JobSettings -> JobMap jid w a -> IO ()
gcThread js (JobMap mvar) = forever $ do
    now <- getCurrentTime
    -- Non-transactional snapshot, only used to pick candidate ids.
    candidates <- Map.keys . Map.filter (expired now) <$> readTVarIO mvar
    forM_ candidates $ \jid -> do
      -- Re-read the entry inside the transaction. The snapshot above may be
      -- stale: the job can finish (and become 'DoneJ') or be removed between
      -- the snapshot and this point, and deciding on the stale entry would
      -- drop the result of a job that completed in time.
      mrunning <- atomically $ do
        entries <- readTVar mvar
        case Map.lookup jid entries of
          Just je
            | expired now je
            , RunningJ rj <- jTask je -> do
                writeTVar mvar $! Map.delete jid entries
                return (Just rj)
          _ -> return Nothing
      -- Cancelling is an IO action, so it happens after the transaction
      -- has committed.
      forM_ mrunning killJ
    -- 'threadDelay' takes microseconds.
    threadDelay (jsGcPeriod js * 1000000)
  where
    -- An entry is expired once its deadline, if any, has been reached.
    expired now jobentry = case jTimeoutAfter jobentry of
      Just t  -> now >= t
      Nothing -> False
92
-- | Build a 'Logger' that accumulates messages in the given 'TVar': every
-- message is appended (with '<>') to the right of the value already stored.
jobLog :: Semigroup w => TVar w -> Logger w
jobLog logvar w = atomically (modifyTVar' logvar (<> w))
96
-- | Register a new, queued job under the given id and return its entry.
--
-- The entry records the current time as its registration time and has no
-- timeout, start time or end time yet. Any entry already stored under the
-- same id is replaced.
addJobEntry
  :: Ord jid
  => jid
  -> a
  -> (a -> Logger w -> IO r)
  -> JobMap jid w r
  -> IO (JobEntry jid w r)
addJobEntry jid input f (JobMap mvar) = do
  registeredAt <- getCurrentTime
  let entry =
        JobEntry
          { jID           = jid
          , jTask         = QueuedJ (QueuedJob input f)
          , jTimeoutAfter = Nothing
          , jRegistered   = registeredAt
          , jStarted      = Nothing
          , jEnded        = Nothing
          }
  entry <$ atomically (modifyTVar' mvar (Map.insert jid entry))
117
-- | Remove the entry stored under the given id, if any. This is an 'STM'
-- action, so it can be combined with other operations in one transaction.
deleteJob :: Ord jid => jid -> JobMap jid w a -> STM ()
deleteJob jid jm = modifyTVar' (jobMap jm) (Map.delete jid)
120
-- | Start a queued job and mark its entry as running.
--
-- The job is launched with 'runJ'. The entry stored under the given id (if
-- there is one) then gets its task set to 'RunningJ', its start time set to
-- the current time, and its timeout set to 'jsJobTimeout' seconds after
-- that. The running job is returned.
runJob
  :: (Ord jid, Monoid w)
  => jid
  -> QueuedJob w a
  -> JobMap jid w a
  -> JobSettings
  -> IO (RunningJob w a)
runJob jid qj (JobMap mvar) js = do
  running   <- runJ qj
  startedAt <- getCurrentTime
  let deadline = addUTCTime (fromIntegral (jsJobTimeout js)) startedAt
      markRunning je =
        je { jTask         = RunningJ running
           , jStarted      = Just startedAt
           , jTimeoutAfter = Just deadline
           }
  atomically (modifyTVar' mvar (Map.adjust markRunning jid))
  return running
138
-- | Block until a running job finishes, then mark its entry as done.
--
-- Once the job has returned or failed, the final logs are read and the entry
-- stored under the given id (if there is one) gets its end time set to the
-- current time and its task set to 'DoneJ'. The outcome of the job is
-- returned together with the final logs.
waitJobDone
  :: Ord jid
  => jid
  -> RunningJob w a
  -> JobMap jid w a
  -> IO (Either SomeException a, w)
waitJobDone jid rj (JobMap mvar) = do
  outcome   <- waitJ rj
  endedAt   <- getCurrentTime
  finalLogs <- rjGetLog rj
  let markDone je = je { jEnded = Just endedAt, jTask = DoneJ finalLogs outcome }
  atomically (modifyTVar' mvar (Map.adjust markDone jid))
  return (outcome, finalLogs)
153
-- | Start a queued job: allocate an empty log accumulator, launch the
-- computation asynchronously with a 'Logger' that appends to it, and return
-- the resulting 'RunningJob'.
runJ :: Monoid w => QueuedJob w a -> IO (RunningJob w a)
runJ (QueuedJob input compute) = do
  logVar <- newTVarIO mempty
  task   <- async (compute input (jobLog logVar))
  return RunningJob
    { rjAsync  = task
    , rjGetLog = readTVarIO logVar
    }
162
-- | Block until the running job terminates, returning its result or the
-- exception it ended with.
waitJ :: RunningJob w a -> IO (Either SomeException a)
waitJ = waitCatch . rjAsync
166
-- | Check, without blocking, whether the running job has terminated:
-- 'Nothing' while it is still running, otherwise its outcome.
pollJ :: RunningJob w a -> IO (Maybe (Either SomeException a))
pollJ = poll . rjAsync
170
-- | Stop a running job by cancelling its 'Async'.
killJ :: RunningJob w a -> IO ()
killJ = cancel . rjAsync