2 module Gargantext.Utils.Jobs.Map where
4 import Control.Concurrent
5 import Control.Concurrent.Async
6 import Control.Concurrent.STM
7 import Control.Exception
9 import Data.Map.Strict (Map)
10 import Data.Time.Clock
13 import qualified Data.Map.Strict as Map
15 import Gargantext.Utils.Jobs.Settings
17 -- | (Mutable) 'Map' containing job id -> job info mapping.
18 newtype JobMap jid w a = JobMap
19 { jobMap :: TVar (Map jid (JobEntry jid w a))
22 -- | Information associated to a job ID
23 data JobEntry jid w a = JobEntry
26 , jTimeoutAfter :: Maybe UTCTime
27 , jRegistered :: UTCTime
28 , jStarted :: Maybe UTCTime
29 , jEnded :: Maybe UTCTime
32 -- | A job computation, which has a different representation depending on the
35 -- A queued job consists of the input to the computation and the computation.
36 -- A running job consists of an 'Async' as well as an action to get the current logs.
37 -- A done job consists of the result of the computation and the final logs.
39 = QueuedJ (QueuedJob w a)
40 | RunningJ (RunningJob w a)
41 | DoneJ w (Either SomeException a)
43 -- | An unexecuted job is an input paired with a computation
44 -- to run with it. Input type is "hidden" to
45 -- be able to store different job types together.
46 data QueuedJob w r where
47 QueuedJob :: a -> (a -> Logger w -> IO r) -> QueuedJob w r
49 -- | A running job points to the async computation for the job and provides a
50 -- function to peek at the current logs.
51 data RunningJob w a = RunningJob
56 -- | A @'Logger' w@ is a function that can do something with "messages" of type
58 type Logger w = w -> IO ()
60 newJobMap :: IO (JobMap jid w a)
61 newJobMap = JobMap <$> newTVarIO Map.empty
63 -- | Lookup a job by ID
68 -> IO (Maybe (JobEntry jid w a))
69 lookupJob jid (JobMap mvar) = Map.lookup jid <$> readTVarIO mvar
71 -- | Ready to use GC thread
72 gcThread :: Ord jid => JobSettings -> JobMap jid w a -> IO ()
73 gcThread js (JobMap mvar) = go
76 candidateEntries <- Map.filter (expired now) <$> readTVarIO mvar
77 forM_ candidateEntries $ \je -> do
78 mrunningjob <- atomically $ do
80 RunningJ rj -> modifyTVar' mvar (Map.delete (jID je))
86 threadDelay (jsGcPeriod js * 1000000)
89 expired now jobentry = case jTimeoutAfter jobentry of
93 -- | Make a 'Logger' that 'mappend's monoidal values in a 'TVar'.
94 jobLog :: Semigroup w => TVar w -> Logger w -- w -> IO ()
95 jobLog logvar = \w -> atomically $ modifyTVar' logvar (\old_w -> old_w <> w)
97 -- | Generating new 'JobEntry's.
102 -> (a -> Logger w -> IO r)
104 -> IO (JobEntry jid w r)
105 addJobEntry jid input f (JobMap mvar) = do
106 now <- getCurrentTime
109 , jTask = QueuedJ (QueuedJob input f)
111 , jTimeoutAfter = Nothing
115 atomically $ modifyTVar' mvar (Map.insert jid je)
118 deleteJob :: Ord jid => jid -> JobMap jid w a -> STM ()
119 deleteJob jid (JobMap mvar) = modifyTVar' mvar (Map.delete jid)
122 :: (Ord jid, Monoid w)
127 -> IO (RunningJob w a)
128 runJob jid qj (JobMap mvar) js = do
130 now <- getCurrentTime
131 atomically $ modifyTVar' mvar $
132 flip Map.adjust jid $ \je ->
133 je { jTask = RunningJ rj
134 , jStarted = Just now
135 , jTimeoutAfter = Just $ addUTCTime (fromIntegral (jsJobTimeout js)) now
144 -> IO (Either SomeException a, w)
145 waitJobDone jid rj (JobMap mvar) = do
147 now <- getCurrentTime
149 atomically $ modifyTVar' mvar $
150 flip Map.adjust jid $ \je ->
151 je { jEnded = Just now, jTask = DoneJ logs r }
154 -- | Turn a queued job into a running job by setting up the logging of @w@s and
155 -- firing up the async action.
156 runJ :: Monoid w => QueuedJob w a -> IO (RunningJob w a)
157 runJ (QueuedJob a f) = do
158 logs <- newTVarIO mempty
159 act <- async $ f a (jobLog logs)
160 let readLogs = readTVarIO logs
161 return (RunningJob act readLogs)
163 -- | Wait for a running job to return (blocking).
164 waitJ :: RunningJob w a -> IO (Either SomeException a)
165 waitJ (RunningJob act _) = waitCatch act
167 -- | Poll a running job to see if it's done.
168 pollJ :: RunningJob w a -> IO (Maybe (Either SomeException a))
169 pollJ (RunningJob act _) = poll act
171 -- | Kill a running job by cancelling the action.
172 killJ :: RunningJob w a -> IO ()
173 killJ (RunningJob act _) = cancel act