2 module Gargantext.Utils.Jobs.Map (
26 import Control.Concurrent
27 import Control.Concurrent.Async
28 import Control.Concurrent.STM
29 import Control.Exception
31 import Data.Map.Strict (Map)
32 import Data.Time.Clock
35 import qualified Data.Map.Strict as Map
37 import Gargantext.Utils.Jobs.Settings
39 -- | (Mutable) 'Map' containing job id -> job info mapping.
40 newtype JobMap jid w a = JobMap
41 { jobMap :: TVar (Map jid (JobEntry jid w a))
44 -- | Information associated to a job ID
45 data JobEntry jid w a = JobEntry
48 , jTimeoutAfter :: Maybe UTCTime
49 , jRegistered :: UTCTime
50 , jStarted :: Maybe UTCTime
51 , jEnded :: Maybe UTCTime
54 -- | A job computation, which has a different representation depending on the
57 -- A queued job consists of the input to the computation and the computation.
58 -- A running job consists of an 'Async' as well as an action to get the current logs.
59 -- A done job consists of the result of the computation and the final logs.
61 = QueuedJ (QueuedJob w a)
62 | RunningJ (RunningJob w a)
63 | DoneJ w (Either SomeException a)
65 -- | An unexecuted job is an input paired with a computation
66 -- to run with it. Input type is "hidden" to
67 -- be able to store different job types together.
68 data QueuedJob w r where
69 QueuedJob :: a -> (a -> Logger w -> IO r) -> QueuedJob w r
71 -- | A running job points to the async computation for the job and provides a
72 -- function to peek at the current logs.
73 data RunningJob w a = RunningJob
78 -- | Polymorphic logger over any monad @m@.
79 type LoggerM m w = w -> m ()
81 -- | A @'Logger' w@ is a function that can do something with "messages" of type
83 type Logger w = LoggerM IO w
85 newJobMap :: IO (JobMap jid w a)
86 newJobMap = JobMap <$> newTVarIO Map.empty
88 -- | Lookup a job by ID
93 -> IO (Maybe (JobEntry jid w a))
94 lookupJob jid (JobMap mvar) = Map.lookup jid <$> readTVarIO mvar
96 -- | Ready to use GC thread
97 gcThread :: Ord jid => JobSettings -> JobMap jid w a -> IO ()
98 gcThread js (JobMap mvar) = go
100 now <- getCurrentTime
101 candidateEntries <- Map.filter (expired now) <$> readTVarIO mvar
102 forM_ candidateEntries $ \je -> do
103 mrunningjob <- atomically $ do
105 RunningJ rj -> modifyTVar' mvar (Map.delete (jID je))
111 threadDelay (jsGcPeriod js * 1000000)
114 expired now jobentry = case jTimeoutAfter jobentry of
118 -- | Make a 'Logger' that 'mappend's monoidal values in a 'TVar'.
119 -- /IMPORTANT/: The new value is appended in front. The ordering is important later on
120 -- when consuming logs from the API (see for example 'pollJob').
121 jobLog :: Semigroup w => TVar w -> Logger w -- w -> IO ()
122 jobLog logvar = \w -> atomically $ modifyTVar' logvar (\old_w -> w <> old_w)
124 -- | Generating new 'JobEntry's.
129 -> (jid -> a -> Logger w -> IO r)
131 -> IO (JobEntry jid w r)
132 addJobEntry jid input f (JobMap mvar) = do
133 now <- getCurrentTime
136 , jTask = QueuedJ (QueuedJob input (f jid))
138 , jTimeoutAfter = Nothing
142 atomically $ modifyTVar' mvar (Map.insert jid je)
145 deleteJob :: Ord jid => jid -> JobMap jid w a -> STM ()
146 deleteJob jid (JobMap mvar) = modifyTVar' mvar (Map.delete jid)
149 :: (Ord jid, Monoid w)
154 -> IO (RunningJob w a)
155 runJob jid qj (JobMap mvar) js = do
157 now <- getCurrentTime
158 atomically $ modifyTVar' mvar $
159 flip Map.adjust jid $ \je ->
160 je { jTask = RunningJ rj
161 , jStarted = Just now
162 , jTimeoutAfter = Just $ addUTCTime (fromIntegral (jsJobTimeout js)) now
171 -> IO (Either SomeException a, w)
172 waitJobDone jid rj (JobMap mvar) = do
174 now <- getCurrentTime
176 atomically $ modifyTVar' mvar $
177 flip Map.adjust jid $ \je ->
178 je { jEnded = Just now, jTask = DoneJ logs r }
181 -- | Turn a queued job into a running job by setting up the logging of @w@s and
182 -- firing up the async action.
183 runJ :: Monoid w => QueuedJob w a -> IO (RunningJob w a)
184 runJ (QueuedJob a f) = do
185 logs <- newTVarIO mempty
186 act <- async $ f a (jobLog logs)
187 let readLogs = readTVarIO logs
188 return (RunningJob act readLogs)
190 -- | Wait for a running job to return (blocking).
191 waitJ :: RunningJob w a -> IO (Either SomeException a)
192 waitJ (RunningJob act _) = waitCatch act
194 -- | Poll a running job to see if it's done.
195 pollJ :: RunningJob w a -> IO (Maybe (Either SomeException a))
196 pollJ (RunningJob act _) = poll act
198 -- | Kill a running job by cancelling the action.
199 killJ :: RunningJob w a -> IO ()
200 killJ (RunningJob act _) = cancel act