2 module Gargantext.Utils.Jobs.Map (
27 import Control.Concurrent
28 import Control.Concurrent.Async
29 import Control.Concurrent.STM
30 import Control.Exception
32 import Data.Map.Strict (Map)
33 import Data.Time.Clock
36 import qualified Data.Map.Strict as Map
38 import Gargantext.Utils.Jobs.Settings
40 -- | (Mutable) 'Map' containing job id -> job info mapping.
41 newtype JobMap jid w a = JobMap
42 { jobMap :: TVar (Map jid (JobEntry jid w a))
45 -- | Information associated to a job ID
46 data JobEntry jid w a = JobEntry
49 , jTimeoutAfter :: Maybe UTCTime
50 , jRegistered :: UTCTime
51 , jStarted :: Maybe UTCTime
52 , jEnded :: Maybe UTCTime
55 -- | A job computation, which has a different representation depending on the
58 -- A queued job consists of the input to the computation and the computation.
59 -- A running job consists of an 'Async' as well as an action to get the current logs.
60 -- A done job consists of the result of the computation and the final logs.
62 = QueuedJ (QueuedJob w a)
63 | RunningJ (RunningJob w a)
64 | DoneJ w (Either SomeException a)
-- | An unexecuted job: an input paired with the computation to run on it.
--
-- The input type @a@ does not appear in the resulting @'QueuedJob' w r@
-- type (it is "hidden"), which makes it possible to store jobs with
-- different input types together.
data QueuedJob w r where
  -- | Pairs the job input with the action that consumes it. The action is
  -- also handed a 'Logger' for values of type @w@ and yields an @r@ in 'IO'.
  QueuedJob :: a -> (a -> Logger w -> IO r) -> QueuedJob w r
72 -- | A running job points to the async computation for the job and provides a
73 -- function to peek at the current logs.
74 data RunningJob w a = RunningJob
-- | Polymorphic logger over any monad @m@: a function that consumes a
-- message of type @w@ and returns only a unit result in @m@.
type LoggerM m w = w -> m ()
-- | A @'Logger' w@ is a function that can do something with "messages" of type
-- @w@ in 'IO'; it is 'LoggerM' specialised to 'IO', i.e. @w -> IO ()@.
type Logger w = LoggerM IO w
-- | Create a fresh 'JobMap' whose backing 'TVar' initially holds an empty map.
newJobMap :: IO (JobMap jid w a)
newJobMap = do
  entries <- newTVarIO Map.empty
  pure (JobMap entries)
89 -- | Lookup a job by ID
94 -> IO (Maybe (JobEntry jid w a))
95 lookupJob jid (JobMap mvar) = Map.lookup jid <$> readTVarIO mvar
97 -- | Ready to use GC thread
98 gcThread :: Ord jid => JobSettings -> JobMap jid w a -> IO ()
99 gcThread js (JobMap mvar) = go
101 now <- getCurrentTime
102 candidateEntries <- Map.filter (expired now) <$> readTVarIO mvar
103 forM_ candidateEntries $ \je -> do
104 mrunningjob <- atomically $ do
106 RunningJ rj -> modifyTVar' mvar (Map.delete (jID je))
112 threadDelay (jsGcPeriod js * 1000000)
115 expired now jobentry = case jTimeoutAfter jobentry of
-- | Build a 'Logger' that accumulates monoidal values in a 'TVar'.
--
-- Every logged value is appended on the right of the current contents with
-- '<>', each in its own STM transaction.
jobLog :: Semigroup w => TVar w -> Logger w -- w -> IO ()
jobLog logvar w = atomically (modifyTVar' logvar appendMsg)
  where
    appendMsg acc = acc <> w
123 -- | Generating new 'JobEntry's.
128 -> (jid -> a -> Logger w -> IO r)
130 -> IO (JobEntry jid w r)
131 addJobEntry jid input f (JobMap mvar) = do
132 now <- getCurrentTime
135 , jTask = QueuedJ (QueuedJob input (f jid))
137 , jTimeoutAfter = Nothing
141 atomically $ modifyTVar' mvar (Map.insert jid je)
-- | Remove the entry stored under the given job ID from the job map.
--
-- Runs in 'STM', so it can be combined with other transactional updates.
deleteJob :: Ord jid => jid -> JobMap jid w a -> STM ()
deleteJob jid (JobMap entries) = modifyTVar' entries dropEntry
  where
    dropEntry = Map.delete jid
148 :: (Ord jid, Monoid w)
153 -> IO (RunningJob w a)
154 runJob jid qj (JobMap mvar) js = do
156 now <- getCurrentTime
157 atomically $ modifyTVar' mvar $
158 flip Map.adjust jid $ \je ->
159 je { jTask = RunningJ rj
160 , jStarted = Just now
161 , jTimeoutAfter = Just $ addUTCTime (fromIntegral (jsJobTimeout js)) now
170 -> IO (Either SomeException a, w)
171 waitJobDone jid rj (JobMap mvar) = do
173 now <- getCurrentTime
175 atomically $ modifyTVar' mvar $
176 flip Map.adjust jid $ \je ->
177 je { jEnded = Just now, jTask = DoneJ logs r }
-- | Start a queued job.
--
-- Allocates an empty log accumulator, launches the job's action
-- asynchronously with a logger that appends to that accumulator, and returns
-- a 'RunningJob' made of the 'Async' handle and an action that reads the
-- logs accumulated so far.
runJ :: Monoid w => QueuedJob w a -> IO (RunningJob w a)
runJ (QueuedJob input task) = do
  logVar <- newTVarIO mempty
  worker <- async (task input (jobLog logVar))
  pure (RunningJob worker (readTVarIO logVar))
-- | Block until the job's async action finishes.
--
-- An exception raised by the job is handed back as a 'Left' instead of being
-- rethrown in the caller.
waitJ :: RunningJob w a -> IO (Either SomeException a)
waitJ job = case job of
  RunningJob worker _ -> waitCatch worker
-- | Non-blocking check on a running job.
--
-- Yields 'Nothing' while the job is still running, otherwise its outcome
-- ('Left' for an exception, 'Right' for a result).
pollJ :: RunningJob w a -> IO (Maybe (Either SomeException a))
pollJ job = case job of
  RunningJob worker _ -> poll worker
-- | Stop a running job by cancelling its underlying async action.
killJ :: RunningJob w a -> IO ()
killJ job = case job of
  RunningJob worker _ -> cancel worker