2 module Gargantext.Utils.Jobs.Map (
26 import Control.Concurrent
27 import Control.Concurrent.Async
28 import Control.Concurrent.STM
29 import Control.Exception
31 import Data.Map.Strict (Map)
32 import Data.Time.Clock
35 import qualified Data.Map.Strict as Map
37 import Gargantext.Utils.Jobs.Settings
39 -- | (Mutable) 'Map' containing job id -> job info mapping.
40 newtype JobMap jid w a = JobMap
41 { jobMap :: TVar (Map jid (JobEntry jid w a))
44 -- | Information associated to a job ID
45 data JobEntry jid w a = JobEntry
48 , jTimeoutAfter :: Maybe UTCTime
49 , jRegistered :: UTCTime
50 , jStarted :: Maybe UTCTime
51 , jEnded :: Maybe UTCTime
54 -- | A job computation, which has a different representation depending on the
57 -- A queued job consists of the input to the computation and the computation.
58 -- A running job consists of an 'Async' as well as an action to get the current logs.
59 -- A done job consists of the result of the computation and the final logs.
61 = QueuedJ (QueuedJob w a)
62 | RunningJ (RunningJob w a)
63 | DoneJ w (Either SomeException a)
-- | A job that has not been started yet: some input value together with the
-- computation that will consume it. The computation also receives a
-- 'Logger' for messages of type @w@ and yields an @r@ in 'IO'.
--
-- The type of the input does not appear in the type of 'QueuedJob', so
-- queued jobs with different input types can be stored side by side.
data QueuedJob w r where
  QueuedJob
    :: input
    -> (input -> Logger w -> IO r)
    -> QueuedJob w r
71 -- | A running job points to the async computation for the job and provides a
72 -- function to peek at the current logs.
73 data RunningJob w a = RunningJob
78 -- | A @'Logger' w@ is a function that can do something with "messages" of type
80 type Logger w = w -> IO ()
-- | Create a fresh 'JobMap' whose underlying 'TVar' holds an empty 'Map'.
newJobMap :: IO (JobMap jid w a)
newJobMap = fmap JobMap (newTVarIO Map.empty)
85 -- | Lookup a job by ID
90 -> IO (Maybe (JobEntry jid w a))
91 lookupJob jid (JobMap mvar) = Map.lookup jid <$> readTVarIO mvar
93 -- | Ready to use GC thread
94 gcThread :: Ord jid => JobSettings -> JobMap jid w a -> IO ()
95 gcThread js (JobMap mvar) = go
98 candidateEntries <- Map.filter (expired now) <$> readTVarIO mvar
99 forM_ candidateEntries $ \je -> do
100 mrunningjob <- atomically $ do
102 RunningJ rj -> modifyTVar' mvar (Map.delete (jID je))
108 threadDelay (jsGcPeriod js * 1000000)
111 expired now jobentry = case jTimeoutAfter jobentry of
-- | Build a 'Logger' backed by a 'TVar': each message handed to the logger is
-- appended with '<>' to the right of the value currently stored in the
-- variable, within a single STM transaction.
jobLog :: Semigroup w => TVar w -> Logger w -- w -> IO ()
jobLog accVar msg = atomically (modifyTVar' accVar (<> msg))
119 -- | Generating new 'JobEntry's.
124 -> (jid -> a -> Logger w -> IO r)
126 -> IO (JobEntry jid w r)
127 addJobEntry jid input f (JobMap mvar) = do
128 now <- getCurrentTime
131 , jTask = QueuedJ (QueuedJob input (f jid))
133 , jTimeoutAfter = Nothing
137 atomically $ modifyTVar' mvar (Map.insert jid je)
-- | Remove the entry stored under the given job ID from the 'JobMap'.
--
-- This is an 'STM' action, so the caller chooses the transaction it runs in.
deleteJob :: Ord jid => jid -> JobMap jid w a -> STM ()
deleteJob jid (JobMap jobsVar) = modifyTVar' jobsVar dropEntry
  where
    dropEntry = Map.delete jid
144 :: (Ord jid, Monoid w)
149 -> IO (RunningJob w a)
150 runJob jid qj (JobMap mvar) js = do
152 now <- getCurrentTime
153 atomically $ modifyTVar' mvar $
154 flip Map.adjust jid $ \je ->
155 je { jTask = RunningJ rj
156 , jStarted = Just now
157 , jTimeoutAfter = Just $ addUTCTime (fromIntegral (jsJobTimeout js)) now
166 -> IO (Either SomeException a, w)
167 waitJobDone jid rj (JobMap mvar) = do
169 now <- getCurrentTime
171 atomically $ modifyTVar' mvar $
172 flip Map.adjust jid $ \je ->
173 je { jEnded = Just now, jTask = DoneJ logs r }
-- | Start a queued job.
--
-- A fresh 'TVar' initialised to 'mempty' collects the @w@ messages the job
-- emits through its 'Logger'. The job's computation is launched with 'async',
-- and the resulting 'RunningJob' pairs that 'Async' with an action that reads
-- the logs accumulated so far.
runJ :: Monoid w => QueuedJob w a -> IO (RunningJob w a)
runJ (QueuedJob input task) = do
  logVar <- newTVarIO mempty
  worker <- async (task input (jobLog logVar))
  pure $ RunningJob worker (readTVarIO logVar)
-- | Block until the running job's 'Async' has finished, returning either the
-- exception it ended with or its result (see 'waitCatch').
waitJ :: RunningJob w a -> IO (Either SomeException a)
waitJ (RunningJob worker _logs) = waitCatch worker
-- | Check, via 'poll', whether a running job has finished yet. The outcome
-- (exception or result) is wrapped in 'Just' once it is available.
pollJ :: RunningJob w a -> IO (Maybe (Either SomeException a))
pollJ (RunningJob worker _logs) = poll worker
-- | Stop a running job by calling 'cancel' on its underlying 'Async'.
killJ :: RunningJob w a -> IO ()
killJ (RunningJob worker _logs) = cancel worker