]> Git — Sourcephile - gargantext.git/blob - src/Gargantext/Utils/Jobs/Map.hs
Replace `Dual` Monoid with `Seq` in Job API
[gargantext.git] / src / Gargantext / Utils / Jobs / Map.hs
1 {-# LANGUAGE GADTs #-}
2 module Gargantext.Utils.Jobs.Map (
3 -- * Types
4 JobMap(..)
5 , JobEntry(..)
6 , J(..)
7 , QueuedJob(..)
8 , RunningJob(..)
9 , Logger
10
11 -- * Functions
12 , newJobMap
13 , lookupJob
14 , gcThread
15 , jobLog
16 , addJobEntry
17 , deleteJob
18 , runJob
19 , waitJobDone
20 , runJ
21 , waitJ
22 , pollJ
23 , killJ
24 ) where
25
26 import Control.Concurrent
27 import Control.Concurrent.Async
28 import Control.Concurrent.STM
29 import Control.Exception
30 import Control.Monad
31 import Data.Map.Strict (Map)
32 import Data.Time.Clock
33 import Prelude
34
35 import qualified Data.Map.Strict as Map
36
37 import Gargantext.Utils.Jobs.Settings
38
-- | Shared, mutable registry of jobs.
--
-- Wraps a 'TVar' holding a strict 'Map' from job identifiers to their
-- 'JobEntry', so every read or update of the registry goes through STM.
newtype JobMap jid w a = JobMap
  { jobMap :: TVar (Map jid (JobEntry jid w a))
    -- ^ The underlying transactional variable.
  }
43
-- | Everything the job map records about a single job.
data JobEntry jid w a = JobEntry
  { jID           :: jid
    -- ^ Identifier of the job (also the key it is stored under).
  , jTask         :: J w a
    -- ^ The job itself, in its queued, running or done form.
  , jTimeoutAfter :: Maybe UTCTime
    -- ^ Deadline after which 'gcThread' considers the job expired;
    --   'Nothing' until 'runJob' sets it.
  , jRegistered   :: UTCTime
    -- ^ When the entry was created by 'addJobEntry'.
  , jStarted      :: Maybe UTCTime
    -- ^ When the job was started by 'runJob', if it has been.
  , jEnded        :: Maybe UTCTime
    -- ^ When 'waitJobDone' observed the job finishing, if it has.
  }
53
-- | The three life-cycle stages of a job, each carrying the data relevant
-- to that stage.
data J w a where
  -- | Not started yet: the input together with the computation to run on it.
  QueuedJ  :: QueuedJob w a -> J w a
  -- | In flight: the 'Async' handle plus an action to read the logs so far.
  RunningJ :: RunningJob w a -> J w a
  -- | Finished: the final logs and either the result or the exception that
  --   ended the job.
  DoneJ    :: w -> Either SomeException a -> J w a
64
-- | A job that has not been started: some input and the computation that
-- will consume it.
--
-- The input type does not appear in the type of 'QueuedJob', so jobs with
-- different input types can live in the same container.
data QueuedJob w r where
  QueuedJob :: input -> (input -> Logger w -> IO r) -> QueuedJob w r
70
-- | Handle on a job that has been started.
data RunningJob w a = RunningJob
  { rjAsync  :: Async a
    -- ^ The asynchronous computation executing the job.
  , rjGetLog :: IO w
    -- ^ Action returning the logs accumulated so far.
  }
77
-- | A sink for log messages: an 'IO' action that consumes one message of
-- type @w@.
type Logger w = w -> IO ()
81
-- | Create a fresh job map containing no entries.
newJobMap :: IO (JobMap jid w a)
newJobMap = do
  entries <- newTVarIO Map.empty
  pure (JobMap entries)
84
-- | Fetch the entry currently stored for a job identifier, if any.
--
-- The map is read outside of any larger transaction, so the result is a
-- snapshot that may already be outdated when the caller inspects it.
lookupJob
  :: Ord jid
  => jid
  -> JobMap jid w a
  -> IO (Maybe (JobEntry jid w a))
lookupJob jid (JobMap mvar) = do
  entries <- readTVarIO mvar
  pure (Map.lookup jid entries)
92
-- | Garbage-collection loop for timed-out jobs; meant to be run in its own
-- thread, as it never returns.
--
-- On every pass it takes a snapshot of the map, picks the entries whose
-- 'jTimeoutAfter' deadline has passed, and for each of them removes the
-- entry and cancels the job, provided the job is still running. It then
-- sleeps for 'jsGcPeriod' seconds ('threadDelay' takes microseconds) and
-- starts over.
--
-- The snapshot is only used to find candidates. The decision to delete and
-- kill is taken on the entry as it is /inside/ the transaction; otherwise a
-- job that finished (and was recorded as 'DoneJ' by 'waitJobDone') between
-- the snapshot and the transaction would have its result silently dropped
-- from the map.
gcThread :: Ord jid => JobSettings -> JobMap jid w a -> IO ()
gcThread js (JobMap mvar) = go
  where
    go = do
      now <- getCurrentTime
      candidateEntries <- Map.filter (expired now) <$> readTVarIO mvar
      forM_ candidateEntries $ \je -> do
        mrunningjob <- atomically $ do
          -- Re-read the entry: the snapshot may be stale by now.
          mcurrent <- Map.lookup (jID je) <$> readTVar mvar
          case mcurrent of
            Just current
              | expired now current
              , RunningJ rj <- jTask current -> do
                  modifyTVar' mvar (Map.delete (jID je))
                  return (Just rj)
            -- Entry gone, no longer expired, or not running: leave it alone.
            _ -> return Nothing
        -- Cancel outside the transaction; 'killJ' performs IO.
        forM_ mrunningjob killJ
      threadDelay (jsGcPeriod js * 1000000)
      go

    -- An entry is expired once its deadline (if any) is not in the future.
    expired now jobentry = case jTimeoutAfter jobentry of
      Just t -> now >= t
      _      -> False
114
-- | Build a 'Logger' that appends each message to the value held in the
-- given 'TVar', using the 'Semigroup' operation (new messages go on the
-- right).
jobLog :: Semigroup w => TVar w -> Logger w
jobLog logvar msg = atomically (modifyTVar' logvar (<> msg))
118
-- | Register a new, not-yet-started job under the given identifier and
-- return the entry that was stored.
--
-- The computation receives the job identifier as its first argument. The
-- entry is stamped with the current time as its registration time and has
-- no timeout, start time or end time yet. An existing entry with the same
-- identifier is replaced.
addJobEntry
  :: Ord jid
  => jid
  -> a
  -> (jid -> a -> Logger w -> IO r)
  -> JobMap jid w r
  -> IO (JobEntry jid w r)
addJobEntry jid input f (JobMap mvar) = do
  registeredAt <- getCurrentTime
  let queued = QueuedJ (QueuedJob input (f jid))
      entry = JobEntry
        { jID           = jid
        , jTask         = queued
        , jTimeoutAfter = Nothing
        , jRegistered   = registeredAt
        , jStarted      = Nothing
        , jEnded        = Nothing
        }
  atomically (modifyTVar' mvar (Map.insert jid entry))
  pure entry
139
-- | Remove the entry for a job identifier from the map, as an 'STM' action
-- so it can be composed into a larger transaction.
--
-- Only the map entry is removed; nothing here cancels the job itself.
deleteJob :: Ord jid => jid -> JobMap jid w a -> STM ()
deleteJob jid (JobMap mvar) = modifyTVar' mvar dropEntry
  where
    dropEntry = Map.delete jid
142
-- | Start a queued job and record that fact in the job map.
--
-- The job is launched first; the entry stored under the identifier (if
-- there is one) is then switched to 'RunningJ', stamped with the start time
-- and given a deadline of start time plus 'jsJobTimeout' seconds. The
-- handle of the running job is returned.
runJob
  :: (Ord jid, Monoid w)
  => jid
  -> QueuedJob w a
  -> JobMap jid w a
  -> JobSettings
  -> IO (RunningJob w a)
runJob jid qj (JobMap mvar) js = do
  running <- runJ qj
  startedAt <- getCurrentTime
  let deadline = addUTCTime (fromIntegral (jsJobTimeout js)) startedAt
      markRunning je = je
        { jTask         = RunningJ running
        , jStarted      = Just startedAt
        , jTimeoutAfter = Just deadline
        }
  atomically (modifyTVar' mvar (Map.adjust markRunning jid))
  pure running
160
-- | Block until a running job finishes, then record its outcome.
--
-- Once the job has ended (normally or with an exception), the logs are read
-- one last time and the entry stored under the identifier (if there is one)
-- is switched to 'DoneJ' and stamped with the end time. Returns the outcome
-- together with the final logs.
waitJobDone
  :: Ord jid
  => jid
  -> RunningJob w a
  -> JobMap jid w a
  -> IO (Either SomeException a, w)
waitJobDone jid rj (JobMap mvar) = do
  outcome <- waitJ rj
  endedAt <- getCurrentTime
  finalLogs <- rjGetLog rj
  let markDone je = je
        { jEnded = Just endedAt
        , jTask  = DoneJ finalLogs outcome
        }
  atomically (modifyTVar' mvar (Map.adjust markDone jid))
  pure (outcome, finalLogs)
175
-- | Launch a queued job.
--
-- A fresh log accumulator (starting at 'mempty') is created, the job's
-- computation is spawned asynchronously with a 'Logger' that appends to
-- that accumulator, and the resulting handle is paired with an action that
-- reads the accumulator.
runJ :: Monoid w => QueuedJob w a -> IO (RunningJob w a)
runJ (QueuedJob input compute) = do
  logVar <- newTVarIO mempty
  worker <- async (compute input (jobLog logVar))
  pure RunningJob
    { rjAsync  = worker
    , rjGetLog = readTVarIO logVar
    }
184
-- | Block until the job has ended, returning its result or the exception
-- that terminated it.
waitJ :: RunningJob w a -> IO (Either SomeException a)
waitJ rj = waitCatch (rjAsync rj)
188
-- | Check, without blocking, whether the job has ended: 'Nothing' while it
-- is still running, otherwise its result or the exception that ended it.
pollJ :: RunningJob w a -> IO (Maybe (Either SomeException a))
pollJ rj = poll (rjAsync rj)
192
-- | Stop a running job by cancelling its asynchronous computation.
killJ :: RunningJob w a -> IO ()
killJ rj = cancel (rjAsync rj)