]> Git — Sourcephile - gargantext.git/blob - src/Gargantext/Utils/Jobs/Monad.hs
Merge remote-tracking branch 'origin/adinapoli/issue-185-job-api' into dev-merge
[gargantext.git] / src / Gargantext / Utils / Jobs / Monad.hs
1 {-# LANGUAGE MultiWayIf, FunctionalDependencies, MultiParamTypeClasses, TypeFamilies #-}
2 module Gargantext.Utils.Jobs.Monad (
3 -- * Types and classes
4 JobEnv(..)
5 , NumRunners
6 , JobError(..)
7 , JobHandle -- opaque
8
9 , MonadJob(..)
10
11 -- * Tracking jobs status
12 , MonadJobStatus(..)
13 , getLatestJobStatus
14 , updateJobProgress
15
16 -- * Functions
17 , newJobEnv
18 , defaultJobSettings
19 , genSecret
20 , getJobsSettings
21 , getJobsState
22 , getJobsMap
23 , getJobsQueue
24 , queueJob
25 , findJob
26 , checkJID
27 , withJob
28 , handleIDError
29 , removeJob
30 , mkJobHandle
31 , jobHandleLogger
32 ) where
33
34 import Gargantext.Utils.Jobs.Settings
35 import Gargantext.Utils.Jobs.Map
36 import Gargantext.Utils.Jobs.Queue
37 import Gargantext.Utils.Jobs.State
38
39 import Control.Concurrent.STM
40 import Control.Exception
41 import Control.Monad.Except
42 import Control.Monad.Reader
43 import Data.Functor ((<&>))
44 import Data.Kind (Type)
45 import Data.Map.Strict (Map)
46 import Data.Sequence (Seq, viewr, ViewR(..))
47 import Data.Time.Clock
48 import Network.HTTP.Client (Manager)
49 import Prelude
50
51 import qualified Servant.Job.Core as SJ
52 import qualified Servant.Job.Types as SJ
53
54 data JobEnv t w a = JobEnv
55 { jeSettings :: JobSettings
56 , jeState :: JobsState t w a
57 , jeManager :: Manager
58 }
59
60 newJobEnv
61 :: (EnumBounded t, Monoid w)
62 => JobSettings
63 -> Map t Prio
64 -> Manager
65 -> IO (JobEnv t w a)
66 newJobEnv js prios mgr = JobEnv js <$> newJobsState js prios <*> pure mgr
67
68 type NumRunners = Int
69
70 defaultJobSettings :: NumRunners -> SJ.SecretKey -> JobSettings
71 defaultJobSettings numRunners k = JobSettings
72 { jsNumRunners = numRunners
73 , jsJobTimeout = 30 * 60 -- 30 minutes
74 , jsIDTimeout = 30 * 60 -- 30 minutes
75 , jsGcPeriod = 1 * 60 -- 1 minute
76 , jsSecretKey = k
77 }
78
79 genSecret :: IO SJ.SecretKey
80 genSecret = SJ.generateSecretKey
81
82 class MonadIO m => MonadJob m t w a | m -> t w a where
83 getJobEnv :: m (JobEnv t w a)
84
85 instance MonadIO m => MonadJob (ReaderT (JobEnv t w a) m) t w a where
86 getJobEnv = ask
87
88 getJobsSettings :: MonadJob m t w a => m JobSettings
89 getJobsSettings = jeSettings <$> getJobEnv
90
91 getJobsState :: MonadJob m t w a => m (JobsState t w a)
92 getJobsState = jeState <$> getJobEnv
93
94 getJobsMap :: MonadJob m t w a => m (JobMap (SJ.JobID 'SJ.Safe) w a)
95 getJobsMap = jobsData <$> getJobsState
96
97 getJobsQueue :: MonadJob m t w a => m (Queue t (SJ.JobID 'SJ.Safe))
98 getJobsQueue = jobsQ <$> getJobsState
99
100 queueJob
101 :: (MonadJob m t w a, Ord t)
102 => t
103 -> i
104 -> (SJ.JobID 'SJ.Safe -> i -> Logger w -> IO a)
105 -> m (SJ.JobID 'SJ.Safe)
106 queueJob jobkind input f = do
107 js <- getJobsSettings
108 st <- getJobsState
109 liftIO (pushJob jobkind input f js st)
110
111 findJob
112 :: MonadJob m t w a
113 => SJ.JobID 'SJ.Safe
114 -> m (Maybe (JobEntry (SJ.JobID 'SJ.Safe) w a))
115 findJob jid = do
116 jmap <- getJobsMap
117 liftIO $ lookupJob jid jmap
118
119 data JobError
120 = InvalidIDType
121 | IDExpired
122 | InvalidMacID
123 | UnknownJob
124 | JobException SomeException
125 deriving Show
126
127 checkJID
128 :: MonadJob m t w a
129 => SJ.JobID 'SJ.Unsafe
130 -> m (Either JobError (SJ.JobID 'SJ.Safe))
131 checkJID (SJ.PrivateID tn n t d) = do
132 now <- liftIO getCurrentTime
133 js <- getJobsSettings
134 if | tn /= "job" -> return (Left InvalidIDType)
135 | now > addUTCTime (fromIntegral $ jsIDTimeout js) t -> return (Left IDExpired)
136 | d /= SJ.macID tn (jsSecretKey js) t n -> return (Left InvalidMacID)
137 | otherwise -> return $ Right (SJ.PrivateID tn n t d)
138
139 withJob
140 :: MonadJob m t w a
141 => SJ.JobID 'SJ.Unsafe
142 -> (SJ.JobID 'SJ.Safe -> JobEntry (SJ.JobID 'SJ.Safe) w a -> m r)
143 -> m (Either JobError (Maybe r))
144 withJob jid f = do
145 r <- checkJID jid
146 case r of
147 Left e -> return (Left e)
148 Right jid' -> do
149 mj <- findJob jid'
150 case mj of
151 Nothing -> return (Right Nothing)
152 Just j -> Right . Just <$> f jid' j
153
154 handleIDError
155 :: MonadError e m
156 => (JobError -> e)
157 -> m (Either JobError a)
158 -> m a
159 handleIDError toE act = act >>= \r -> case r of
160 Left err -> throwError (toE err)
161 Right a -> return a
162
163 removeJob
164 :: (Ord t, MonadJob m t w a)
165 => Bool -- is it queued (and we have to remove jid from queue)
166 -> t
167 -> SJ.JobID 'SJ.Safe
168 -> m ()
169 removeJob queued t jid = do
170 q <- getJobsQueue
171 m <- getJobsMap
172 liftIO . atomically $ do
173 when queued $
174 deleteQueue t jid q
175 deleteJob jid m
176
177 --
178 -- Tracking jobs status
179 --
180
181 -- | An opaque handle that abstracts over the concrete identifier for
182 -- a job. The constructor for this type is deliberately not exported.
183 data JobHandle m event = JobHandle {
184 _jh_id :: !(SJ.JobID 'SJ.Safe)
185 , _jh_logger :: LoggerM m event
186 }
187
188 -- | Creates a new 'JobHandle', given its underlying 'JobID' and the logging function to
189 -- be used to report the status.
190 mkJobHandle :: SJ.JobID 'SJ.Safe -> LoggerM m event -> JobHandle m event
191 mkJobHandle jId = JobHandle jId
192
193 jobHandleLogger :: JobHandle m event -> LoggerM m event
194 jobHandleLogger (JobHandle _ lgr) = lgr
195
196 -- | A monad to query for the status of a particular job /and/ submit updates for in-progress jobs.
197 class MonadJob m (JobType m) (Seq (JobEventType m)) (JobOutputType m) => MonadJobStatus m where
198 type JobType m :: Type
199 type JobOutputType m :: Type
200 type JobEventType m :: Type
201
202 --
203 -- Tracking jobs status API
204 --
205
206 -- | Retrevies the latest 'JobEventType' from the underlying monad. It can be
207 -- used to query the latest status for a particular job, given its 'JobHandle' as input.
208 getLatestJobStatus :: MonadJobStatus m => JobHandle m (JobEventType m) -> m (Maybe (JobEventType m))
209 getLatestJobStatus (JobHandle jId _) = do
210 mb_jb <- findJob jId
211 case mb_jb of
212 Nothing -> pure Nothing
213 Just j -> case jTask j of
214 QueuedJ _ -> pure Nothing
215 RunningJ rj -> liftIO (rjGetLog rj) <&>
216 \lgs -> case viewr lgs of
217 EmptyR -> Nothing
218 _ :> l -> Just l
219 DoneJ lgs _ -> pure $ case viewr lgs of
220 EmptyR -> Nothing
221 _ :> l -> Just l
222
223 updateJobProgress :: (Monoid (JobEventType m), MonadJobStatus m)
224 => JobHandle m (JobEventType m)
225 -- ^ The handle that uniquely identifies this job.
226 -> (JobEventType m -> JobEventType m)
227 -- ^ A /pure/ function to update the 'JobEventType'. The input
228 -- is the /latest/ event, i.e. the current progress status. If
229 -- this is the first time we report progress and therefore there
230 -- is no previous progress status, this function will be applied
231 -- over 'mempty', thus the 'Monoid' constraint.
232 -> m ()
233 updateJobProgress hdl@(JobHandle _jId logStatus) updateJobStatus = do
234 latestStatus <- getLatestJobStatus hdl
235 case latestStatus of
236 Nothing -> logStatus (updateJobStatus mempty)
237 Just s -> logStatus (updateJobStatus s)