]> Git — Sourcephile - gargantext.git/blob - src/Gargantext/Utils/Jobs/Map.hs
Merge remote-tracking branch 'origin/adinapoli/issue-198' into dev
[gargantext.git] / src / Gargantext / Utils / Jobs / Map.hs
1 {-# LANGUAGE GADTs #-}
2 module Gargantext.Utils.Jobs.Map (
3 -- * Types
4 JobMap(..)
5 , JobEntry(..)
6 , J(..)
7 , QueuedJob(..)
8 , RunningJob(..)
9 , LoggerM
10 , Logger
11
12 -- * Functions
13 , newJobMap
14 , lookupJob
15 , gcThread
16 , addJobEntry
17 , deleteJob
18 , runJob
19 , waitJobDone
20 , runJ
21 , waitJ
22 , pollJ
23 , killJ
24 ) where
25
26 import Control.Concurrent
27 import Control.Concurrent.Async
28 import Control.Concurrent.STM
29 import Control.Exception
30 import Control.Monad
31 import Data.Map.Strict (Map)
32 import Data.Time.Clock
33 import Prelude
34
35 import qualified Data.Map.Strict as Map
36
37 import Gargantext.Utils.Jobs.Settings
38
-- | The mutable job table: a 'TVar' holding a 'Map' from job IDs to their
-- 'JobEntry'. Every update performed in this module goes through STM.
newtype JobMap jid w a = JobMap
  { jobMap :: TVar (Map jid (JobEntry jid w a))
    -- ^ The transactional variable holding the current table.
  }
43
-- | Everything the job table records about a single job.
data JobEntry jid w a = JobEntry
  { jID           :: jid
    -- ^ The job's identifier ('addJobEntry' also uses it as the map key).
  , jTask         :: J w a
    -- ^ The computation, in its queued, running or done form.
  , jTimeoutAfter :: Maybe UTCTime
    -- ^ Deadline checked by 'gcThread'. It is 'Nothing' until 'runJob'
    -- starts the job.
  , jRegistered   :: UTCTime
    -- ^ Creation time, as passed to 'addJobEntry'.
  , jStarted      :: Maybe UTCTime
    -- ^ When 'runJob' started the job, if it has started.
  , jEnded        :: Maybe UTCTime
    -- ^ When 'waitJobDone' observed completion, if it has finished.
  }
53
-- | The computation attached to a job. Its representation changes as the job
-- moves from queued, to running, to done.
data J w a
  = QueuedJ (QueuedJob w a)
    -- ^ Not started yet: the input together with the computation to run.
  | RunningJ (RunningJob w a)
    -- ^ In progress: the 'Async' handle plus an action that reads the
    -- current logs.
  | DoneJ w (Either SomeException a)
    -- ^ Finished: the final logs, then the outcome (a result, or the
    -- exception the job raised).
64
-- | A job that has not been started. It stores an input of some hidden type
-- @a@ together with the computation to run on it. @a@ does not appear in the
-- result type (it is existentially quantified), so jobs with different input
-- types can be stored together.
data QueuedJob w r where
  QueuedJob
    :: a                          -- the input
    -> (a -> Logger w -> IO r)    -- the computation: input, then a logger
    -> QueuedJob w r
70
-- | A job whose computation has been started.
data RunningJob w a = RunningJob
  { rjAsync  :: Async a
    -- ^ Handle on the asynchronous computation.
  , rjGetLog :: IO w
    -- ^ Reads the logs accumulated so far.
  }
77
-- | A logger that runs in an arbitrary monad @m@: it consumes one message of
-- type @w@ and returns unit.
type LoggerM m w
  = w -> m ()

-- | A 'LoggerM' specialised to 'IO'.
type Logger w
  = LoggerM IO w
84
-- | Allocate a fresh, empty 'JobMap'.
newJobMap :: IO (JobMap jid w a)
newJobMap = do
  emptyTable <- newTVarIO Map.empty
  pure (JobMap emptyTable)
87
-- | Look up a job by its ID.
--
-- The map is read with 'readTVarIO', outside of any larger transaction. The
-- result is therefore only a snapshot. Returns 'Nothing' when no entry exists
-- for the given ID.
lookupJob
  :: Ord jid
  => jid
  -> JobMap jid w a
  -> IO (Maybe (JobEntry jid w a))
lookupJob jid (JobMap mvar) = do
  table <- readTVarIO mvar
  pure (Map.lookup jid table)
95
-- | Ready-to-use garbage-collection loop for a 'JobMap'. It never returns, so
-- run it in its own thread.
--
-- Every @'jsGcPeriod' js@ seconds it collects the entries whose 'jTimeoutAfter'
-- deadline has passed. Each such entry that is still running ('RunningJ') is
-- removed from the map, and its computation is then cancelled with 'killJ'.
-- Queued and finished entries are left untouched.
--
-- NOTE(review): finished ('DoneJ') entries are never removed by this loop.
-- Confirm they are cleaned up elsewhere (e.g. via 'deleteJob').
gcThread :: Ord jid => JobSettings -> JobMap jid w a -> IO ()
gcThread js (JobMap mvar) = go
  where
    go = do
      now <- getCurrentTime
      candidateEntries <- Map.filter (expired now) <$> readTVarIO mvar
      forM_ candidateEntries $ \je -> do
        -- The snapshot above was read outside any transaction and may be
        -- stale. Re-read the entry here so that a job which finished (or was
        -- removed) in the meantime is neither deleted nor has its result
        -- discarded.
        mrunningjob <- atomically $ do
          current <- readTVar mvar
          case Map.lookup (jID je) current of
            Just live
              | RunningJ rj <- jTask live
              , expired now live -> do
                  modifyTVar' mvar (Map.delete (jID je))
                  pure (Just rj)
            _ -> pure Nothing
        -- Cancel outside the transaction: killJ is an IO action.
        forM_ mrunningjob killJ
      -- jsGcPeriod is in seconds; threadDelay expects microseconds.
      threadDelay (jsGcPeriod js * 1000000)
      go

    -- An entry is expired once its deadline (if any) has been reached.
    expired now jobentry = case jTimeoutAfter jobentry of
      Just t -> now >= t
      Nothing -> False
117
-- | Build a 'Logger' that accumulates messages in a 'TVar' using '<>'.
--
-- /IMPORTANT/: each new message is combined on the /left/ of what is already
-- stored (@new <> old@). For list-like @w@, the most recent message therefore
-- comes first. Code that consumes the logs depends on this ordering (see for
-- example @pollJob@, which is not defined in this module).
jobLog :: Semigroup w => TVar w -> Logger w
jobLog logvar w = atomically (modifyTVar' logvar (w <>))
123
-- | Register a new, queued job under @jid@ and return the created entry.
--
-- The computation @f@ is partially applied to @jid@, and the entry starts with
-- no timeout, start time or end time. 'Map.insert' replaces any existing entry
-- with the same ID. The function runs in 'STM', so callers can compose it with
-- other transactional updates.
addJobEntry
  :: Ord jid
  => UTCTime
  -> jid
  -> a
  -> (jid -> a -> Logger w -> IO r)
  -> JobMap jid w r
  -> STM (JobEntry jid w r)
addJobEntry now jid input f (JobMap mvar) = do
  modifyTVar' mvar (Map.insert jid entry)
  pure entry
  where
    entry = JobEntry
      { jID           = jid
      , jTask         = QueuedJ (QueuedJob input (f jid))
      , jTimeoutAfter = Nothing
      , jRegistered   = now
      , jStarted      = Nothing
      , jEnded        = Nothing
      }
144
-- | Remove the entry for a job ID, if one exists. Only the map is changed: a
-- running computation is not cancelled by this function.
deleteJob :: Ord jid => jid -> JobMap jid w a -> STM ()
deleteJob jid table = modifyTVar' (jobMap table) (Map.delete jid)
147
-- | Start a queued job and mark its entry as running.
--
-- The computation is spawned first (via 'runJ'). The entry then records the
-- start time, switches to 'RunningJ', and gets a deadline @'jsJobTimeout' js@
-- seconds later; 'gcThread' checks that deadline. If no entry exists for
-- @jid@, the job still runs but the map is left unchanged ('Map.adjust' is a
-- no-op on missing keys).
runJob
  :: (Ord jid, Monoid w)
  => jid
  -> QueuedJob w a
  -> JobMap jid w a
  -> JobSettings
  -> IO (RunningJob w a)
runJob jid qj (JobMap mvar) js = do
  rj <- runJ qj
  startedAt <- getCurrentTime
  let deadline = addUTCTime (fromIntegral (jsJobTimeout js)) startedAt
      markRunning je = je
        { jTask         = RunningJ rj
        , jStarted      = Just startedAt
        , jTimeoutAfter = Just deadline
        }
  atomically (modifyTVar' mvar (Map.adjust markRunning jid))
  pure rj
165
-- | Block until a running job finishes, record its completion, and return the
-- outcome together with the final logs.
--
-- The entry is updated with the end time and a 'DoneJ' holding the logs and
-- the outcome. An exception raised by the job is returned as 'Left' rather
-- than rethrown. Missing entries are left alone ('Map.adjust').
waitJobDone
  :: Ord jid
  => jid
  -> RunningJob w a
  -> JobMap jid w a
  -> IO (Either SomeException a, w)
waitJobDone jid rj (JobMap mvar) = do
  outcome <- waitJ rj
  finishedAt <- getCurrentTime
  finalLogs <- rjGetLog rj
  let markDone je = je
        { jEnded = Just finishedAt
        , jTask  = DoneJ finalLogs outcome
        }
  atomically (modifyTVar' mvar (Map.adjust markDone jid))
  pure (outcome, finalLogs)
180
-- | Start a queued job.
--
-- Steps:
--
-- 1. Allocate a log 'TVar' initialised to 'mempty'.
-- 2. Spawn the computation with 'async', passing it the input and a 'Logger'
--    that writes into that variable.
-- 3. Return the handle together with an action that reads the current logs.
runJ :: Monoid w => QueuedJob w a -> IO (RunningJob w a)
runJ (QueuedJob input compute) = do
  logVar <- newTVarIO mempty
  worker <- async (compute input (jobLog logVar))
  pure (RunningJob { rjAsync = worker, rjGetLog = readTVarIO logVar })
189
-- | Block until the running job finishes. An exception raised by the job is
-- returned as 'Left' instead of being rethrown ('waitCatch').
waitJ :: RunningJob w a -> IO (Either SomeException a)
waitJ RunningJob{ rjAsync = worker } = waitCatch worker
193
-- | Check, without blocking, whether the running job has finished ('poll').
-- Returns 'Nothing' while it is still in progress, otherwise its outcome.
pollJ :: RunningJob w a -> IO (Maybe (Either SomeException a))
pollJ RunningJob{ rjAsync = worker } = poll worker
197
-- | Stop a running job by cancelling its async computation ('cancel').
killJ :: RunningJob w a -> IO ()
killJ RunningJob{ rjAsync = worker } = cancel worker