]> Git — Sourcephile - gargantext.git/blob - src/Gargantext/Utils/Jobs/Map.hs
Cleaner Jobs API
[gargantext.git] / src / Gargantext / Utils / Jobs / Map.hs
1 {-# LANGUAGE GADTs #-}
2 module Gargantext.Utils.Jobs.Map (
3 -- * Types
4 JobMap(..)
5 , JobEntry(..)
6 , J(..)
7 , QueuedJob(..)
8 , RunningJob(..)
9 , LoggerM
10 , Logger
11
12 -- * Functions
13 , newJobMap
14 , lookupJob
15 , gcThread
16 , jobLog
17 , addJobEntry
18 , deleteJob
19 , runJob
20 , waitJobDone
21 , runJ
22 , waitJ
23 , pollJ
24 , killJ
25 ) where
26
27 import Control.Concurrent
28 import Control.Concurrent.Async
29 import Control.Concurrent.STM
30 import Control.Exception
31 import Control.Monad
32 import Data.Map.Strict (Map)
33 import Data.Time.Clock
34 import Prelude
35
36 import qualified Data.Map.Strict as Map
37
38 import Gargantext.Utils.Jobs.Settings
39
-- | Shared, mutable table of jobs: a 'TVar' holding a 'Map' from job
-- identifiers to the 'JobEntry' recorded for each of them.
newtype JobMap jid w a = JobMap
  { jobMap :: TVar (Map jid (JobEntry jid w a))
    -- ^ The transactional variable that stores the table.
  }
44
-- | Everything the job map records about a single job.
data JobEntry jid w a = JobEntry
  { jID           :: jid
    -- ^ Identifier of the job (also its key in the 'JobMap').
  , jTask         :: J w a
    -- ^ The computation itself, in its current state (queued, running or
    -- done).
  , jTimeoutAfter :: Maybe UTCTime
    -- ^ Instant compared against the current time by 'gcThread'.
    -- 'addJobEntry' initialises it to 'Nothing'; 'runJob' fills it in.
  , jRegistered   :: UTCTime
    -- ^ When the entry was created by 'addJobEntry'.
  , jStarted      :: Maybe UTCTime
    -- ^ When the job was started; set by 'runJob'.
  , jEnded        :: Maybe UTCTime
    -- ^ When the job was observed to have finished; set by 'waitJobDone'.
  }
54
-- | The state of a job's computation. Each stage of a job's life carries
-- different data, hence one constructor per stage.
data J w a
  = QueuedJ (QueuedJob w a)
    -- ^ Not started yet: the input together with the computation to run on it.
  | RunningJ (RunningJob w a)
    -- ^ In progress: the 'Async' handle plus an action returning the logs
    -- accumulated so far.
  | DoneJ w (Either SomeException a)
    -- ^ Finished: the final logs and either the exception that ended the job
    -- or its result.
65
-- | A job that has not been run yet: an input value bundled with the
-- function that will consume it. The input type @a@ is existentially
-- quantified, so jobs taking different inputs can be stored side by side as
-- long as they agree on the log type @w@ and the result type @r@.
data QueuedJob w r where
  QueuedJob :: a -> (a -> Logger w -> IO r) -> QueuedJob w r
71
-- | Handle on a job that has been started.
data RunningJob w a = RunningJob
  { rjAsync  :: Async a
    -- ^ The asynchronous computation executing the job.
  , rjGetLog :: IO w
    -- ^ Action that returns the logs the job has produced so far.
  }
78
-- | A logging callback that consumes a message of type @w@ in an arbitrary
-- monad @m@.
type LoggerM m w = w -> m ()
81
-- | 'LoggerM' specialised to 'IO': a callback that handles "messages" of
-- type @w@ by performing some IO.
type Logger w = LoggerM IO w
85
-- | Allocate a fresh 'JobMap' that contains no jobs.
newJobMap :: IO (JobMap jid w a)
newJobMap = do
  table <- newTVarIO Map.empty
  pure (JobMap table)
88
-- | Fetch the entry currently stored for a job identifier, if any.
--
-- The result is a snapshot of the map taken when the action runs.
lookupJob
  :: Ord jid
  => jid
  -> JobMap jid w a
  -> IO (Maybe (JobEntry jid w a))
lookupJob jid (JobMap table) = do
  entries <- readTVarIO table
  pure (Map.lookup jid entries)
96
-- | Ready to use GC loop; it never returns, so run it in its own thread.
--
-- On every iteration it:
--
--   1. takes a snapshot of the job map and keeps the entries whose
--      'jTimeoutAfter' is at or before the current time;
--   2. for each such entry, atomically removes it from the map if it is
--      /still/ a running, expired job, and then cancels the job;
--   3. sleeps for @'jsGcPeriod' js * 1000000@ microseconds.
--
-- Entries that are queued or done are never removed by this loop.
gcThread :: Ord jid => JobSettings -> JobMap jid w a -> IO ()
gcThread js (JobMap mvar) = go
  where
    go = do
      now <- getCurrentTime
      candidateEntries <- Map.filter (expired now) <$> readTVarIO mvar
      forM_ candidateEntries $ \je -> do
        mrunningjob <- atomically $ do
          -- The snapshot may already be stale: the job can have completed
          -- (and been turned into a 'DoneJ' holding its result) or the id
          -- can have been re-registered since it was taken. Re-read the
          -- entry inside the transaction and only act on what is there now,
          -- so that a finished job's result is never thrown away.
          current <- Map.lookup (jID je) <$> readTVar mvar
          case current of
            Just cur
              | RunningJ rj <- jTask cur
              , expired now cur -> do
                  modifyTVar' mvar (Map.delete (jID je))
                  return (Just rj)
            _ -> return Nothing
        -- Cancel outside of the transaction: 'killJ' is an IO action.
        forM_ mrunningjob killJ
      threadDelay (jsGcPeriod js * 1000000)
      go

    -- An entry is expired once its timeout instant has been reached; entries
    -- without a timeout never expire.
    expired now jobentry = case jTimeoutAfter jobentry of
      Just t -> now >= t
      _      -> False
118
-- | Build a 'Logger' backed by a 'TVar': every message is appended (with
-- '<>') to the right of the value currently held in the variable.
jobLog :: Semigroup w => TVar w -> Logger w -- w -> IO ()
jobLog logvar w = atomically (modifyTVar' logvar (<> w))
122
-- | Register a new job in the map, in the queued state.
--
-- The entry is stamped with the current time as its registration time, has
-- no timeout, start or end time yet, and is stored under the given id
-- (replacing whatever was stored under that id before). The freshly created
-- entry is returned.
addJobEntry
  :: Ord jid
  => jid
  -> a
  -> (jid -> a -> Logger w -> IO r)
  -> JobMap jid w r
  -> IO (JobEntry jid w r)
addJobEntry jid input f (JobMap table) = do
  registeredAt <- getCurrentTime
  let entry = JobEntry
        { jID           = jid
        , jTask         = QueuedJ (QueuedJob input (f jid))
        , jTimeoutAfter = Nothing
        , jRegistered   = registeredAt
        , jStarted      = Nothing
        , jEnded        = Nothing
        }
  entry <$ atomically (modifyTVar' table (Map.insert jid entry))
143
-- | Remove the entry stored under the given job id, if there is one.
--
-- This is an 'STM' action, so it can be combined with other transactional
-- updates before being run with 'atomically'.
deleteJob :: Ord jid => jid -> JobMap jid w a -> STM ()
deleteJob jid (JobMap table) = modifyTVar' table dropEntry
  where
    dropEntry = Map.delete jid
146
-- | Start a queued job and record that fact in the job map.
--
-- The job is launched with 'runJ'; afterwards the entry stored under @jid@
-- (if any) is switched to the running state, stamped with the current time
-- as start time, and given a timeout instant of that time plus
-- 'jsJobTimeout'. The handle on the running job is returned.
runJob
  :: (Ord jid, Monoid w)
  => jid
  -> QueuedJob w a
  -> JobMap jid w a
  -> JobSettings
  -> IO (RunningJob w a)
runJob jid qj (JobMap table) js = do
  running   <- runJ qj
  startedAt <- getCurrentTime
  let deadline = addUTCTime (fromIntegral (jsJobTimeout js)) startedAt
      markRunning entry = entry
        { jTask         = RunningJ running
        , jStarted      = Just startedAt
        , jTimeoutAfter = Just deadline
        }
  atomically (modifyTVar' table (Map.adjust markRunning jid))
  pure running
164
-- | Block until a running job terminates, then record its outcome.
--
-- Once the job has returned (or failed), its logs are read and the entry
-- stored under @jid@ (if any) is switched to the done state and stamped with
-- the current time as end time. Returns the outcome together with the logs.
waitJobDone
  :: Ord jid
  => jid
  -> RunningJob w a
  -> JobMap jid w a
  -> IO (Either SomeException a, w)
waitJobDone jid rj (JobMap table) = do
  outcome   <- waitJ rj
  endedAt   <- getCurrentTime
  finalLogs <- rjGetLog rj
  let markDone entry = entry
        { jEnded = Just endedAt
        , jTask  = DoneJ finalLogs outcome
        }
  atomically (modifyTVar' table (Map.adjust markDone jid))
  pure (outcome, finalLogs)
179
-- | Launch a queued job.
--
-- A fresh log variable initialised to 'mempty' is created, the job's
-- computation is started asynchronously with a 'Logger' that appends to that
-- variable, and the resulting handle exposes both the 'Async' and an action
-- reading the variable.
runJ :: Monoid w => QueuedJob w a -> IO (RunningJob w a)
runJ (QueuedJob input compute) = do
  logVar <- newTVarIO mempty
  worker <- async (compute input (jobLog logVar))
  pure RunningJob
    { rjAsync  = worker
    , rjGetLog = readTVarIO logVar
    }
188
-- | Block until a running job finishes, returning either the exception it
-- died with or its result.
waitJ :: RunningJob w a -> IO (Either SomeException a)
waitJ RunningJob { rjAsync = task } = waitCatch task
192
-- | Check, without blocking, whether a running job has finished: 'Nothing'
-- while it is still running, otherwise its outcome.
pollJ :: RunningJob w a -> IO (Maybe (Either SomeException a))
pollJ RunningJob { rjAsync = task } = poll task
196
-- | Stop a running job by calling 'cancel' on its underlying 'Async'.
killJ :: RunningJob w a -> IO ()
killJ RunningJob { rjAsync = task } = cancel task