1 {-# LANGUAGE MultiWayIf, FunctionalDependencies, MultiParamTypeClasses, TypeFamilies #-}
2 module Gargantext.Utils.Jobs.Monad (
11 -- * Tracking jobs status
34 import Gargantext.Utils.Jobs.Settings
35 import Gargantext.Utils.Jobs.Map
36 import Gargantext.Utils.Jobs.Queue
37 import Gargantext.Utils.Jobs.State
39 import Control.Concurrent.STM
40 import Control.Exception
41 import Control.Monad.Except
42 import Control.Monad.Reader
43 import Data.Functor ((<&>))
44 import Data.Kind (Type)
45 import Data.Map.Strict (Map)
46 import Data.Sequence (Seq, viewr, ViewR(..))
47 import Data.Time.Clock
48 import Network.HTTP.Client (Manager)
51 import qualified Servant.Job.Core as SJ
52 import qualified Servant.Job.Types as SJ
54 data JobEnv t w a = JobEnv
55 { jeSettings :: JobSettings
56 , jeState :: JobsState t w a
57 , jeManager :: Manager
61 :: (EnumBounded t, Monoid w)
66 newJobEnv js prios mgr = JobEnv js <$> newJobsState js prios <*> pure mgr
70 defaultJobSettings :: NumRunners -> SJ.SecretKey -> JobSettings
71 defaultJobSettings numRunners k = JobSettings
72 { jsNumRunners = numRunners
73 , jsJobTimeout = 30 * 60 -- 30 minutes
74 , jsIDTimeout = 30 * 60 -- 30 minutes
75 , jsGcPeriod = 1 * 60 -- 1 minute
-- | Produce a fresh 'SJ.SecretKey'.
--
-- This is a thin alias for 'SJ.generateSecretKey'; the resulting key has
-- the type expected by the second argument of 'defaultJobSettings'.
genSecret :: IO SJ.SecretKey
genSecret = SJ.generateSecretKey
-- | Monads that carry a 'JobEnv' and can perform 'IO'.
--
-- The functional dependency @m -> t w a@ states that the monad alone
-- determines the three remaining class parameters, so they never need to
-- be disambiguated at call sites.
class MonadIO m => MonadJob m t w a | m -> t w a where
  -- | Retrieve the 'JobEnv' available in the current monad.
  getJobEnv :: m (JobEnv t w a)
85 instance MonadIO m => MonadJob (ReaderT (JobEnv t w a) m) t w a where
-- | Read the 'JobSettings' (the 'jeSettings' field) out of the ambient
-- 'JobEnv'.
getJobsSettings :: MonadJob m t w a => m JobSettings
getJobsSettings = fmap jeSettings getJobEnv
-- | Read the 'JobsState' (the 'jeState' field) out of the ambient 'JobEnv'.
getJobsState :: MonadJob m t w a => m (JobsState t w a)
getJobsState = fmap jeState getJobEnv
-- | Project the 'jobsData' component, a 'JobMap' keyed by safe job IDs,
-- out of the current 'JobsState'.
getJobsMap :: MonadJob m t w a => m (JobMap (SJ.JobID 'SJ.Safe) w a)
getJobsMap = fmap jobsData getJobsState
-- | Project the 'jobsQ' component, a 'Queue' of safe job IDs, out of the
-- current 'JobsState'.
getJobsQueue :: MonadJob m t w a => m (Queue t (SJ.JobID 'SJ.Safe))
getJobsQueue = fmap jobsQ getJobsState
101 :: (MonadJob m t w a, Ord t)
104 -> (SJ.JobID 'SJ.Safe -> i -> Logger w -> IO a)
105 -> m (SJ.JobID 'SJ.Safe)
106 queueJob jobkind input f = do
107 js <- getJobsSettings
109 liftIO (pushJob jobkind input f js st)
114 -> m (Maybe (JobEntry (SJ.JobID 'SJ.Safe) w a))
117 liftIO $ lookupJob jid jmap
124 | JobException SomeException
129 => SJ.JobID 'SJ.Unsafe
130 -> m (Either JobError (SJ.JobID 'SJ.Safe))
-- Validate an untrusted job ID. The checks run in the order below and the
-- first one that fails decides the result:
--
--   1. the type name must be "job"                       -> InvalidIDType
--   2. the ID's timestamp plus 'jsIDTimeout' must not
--      already lie before the current time               -> IDExpired
--   3. the carried MAC must equal 'SJ.macID' recomputed
--      with 'jsSecretKey'                                -> InvalidMacID
--
-- When every check passes, the same four fields are rebuilt as the
-- returned (safe) ID.
checkJID (SJ.PrivateID tn n t d) = do
  now <- liftIO getCurrentTime
  js <- getJobsSettings
  pure (validate now js)
  where
    -- Pure decision logic, separated from the two effectful reads above.
    validate now js
      | tn /= "job" = Left InvalidIDType
      | now > addUTCTime (fromIntegral (jsIDTimeout js)) t = Left IDExpired
      | d /= SJ.macID tn (jsSecretKey js) t n = Left InvalidMacID
      | otherwise = Right (SJ.PrivateID tn n t d)
141 => SJ.JobID 'SJ.Unsafe
142 -> (SJ.JobID 'SJ.Safe -> JobEntry (SJ.JobID 'SJ.Safe) w a -> m r)
143 -> m (Either JobError (Maybe r))
147 Left e -> return (Left e)
151 Nothing -> return (Right Nothing)
152 Just j -> Right . Just <$> f jid' j
157 -> m (Either JobError a)
159 handleIDError toE act = act >>= \r -> case r of
160 Left err -> throwError (toE err)
164 :: (Ord t, MonadJob m t w a)
165 => Bool -- is it queued (and we have to remove jid from queue)
169 removeJob queued t jid = do
172 liftIO . atomically $ do
178 -- Tracking jobs status
181 -- | An opaque handle that abstracts over the concrete identifier for
182 -- a job. The constructor for this type is deliberately not exported.
183 data JobHandle m event = JobHandle {
184 _jh_id :: !(SJ.JobID 'SJ.Safe)
185 , _jh_logger :: LoggerM m event
-- | Build a 'JobHandle' from the 'JobID' it wraps and the logging function
-- that will be used to report the job's status.
mkJobHandle :: SJ.JobID 'SJ.Safe -> LoggerM m event -> JobHandle m event
mkJobHandle jId lgr =
  JobHandle
    { _jh_id = jId
    , _jh_logger = lgr
    }
-- | Extract the logging function stored inside a 'JobHandle'.
jobHandleLogger :: JobHandle m event -> LoggerM m event
jobHandleLogger = _jh_logger
-- | A monad in which the status of a particular job can be queried /and/
-- progress updates for in-progress jobs can be submitted.
--
-- The superclass constraint pins the parameters of 'MonadJob' to the
-- associated types declared here: 'JobType' is the job-kind parameter, a
-- 'Seq' of 'JobEventType' is the log parameter, and 'JobOutputType' is the
-- result parameter.
class
  MonadJob m (JobType m) (Seq (JobEventType m)) (JobOutputType m) =>
  MonadJobStatus m
  where
  -- | Type used as the @t@ parameter of 'MonadJob' for this monad.
  type JobType m :: Type

  -- | Type used as the @a@ parameter of 'MonadJob' for this monad.
  type JobOutputType m :: Type

  -- | Type of a single event; the @w@ parameter of 'MonadJob' is a 'Seq'
  -- of these.
  type JobEventType m :: Type
203 -- Tracking jobs status API
-- | Retrieves the latest 'JobEventType' from the underlying monad. It can be
207 -- used to query the latest status for a particular job, given its 'JobHandle' as input.
208 getLatestJobStatus :: MonadJobStatus m => JobHandle m (JobEventType m) -> m (Maybe (JobEventType m))
209 getLatestJobStatus (JobHandle jId _) = do
212 Nothing -> pure Nothing
213 Just j -> case jTask j of
214 QueuedJ _ -> pure Nothing
215 RunningJ rj -> liftIO (rjGetLog rj) <&>
216 \lgs -> case viewr lgs of
219 DoneJ lgs _ -> pure $ case viewr lgs of
223 updateJobProgress :: (Monoid (JobEventType m), MonadJobStatus m)
224 => JobHandle m (JobEventType m)
225 -- ^ The handle that uniquely identifies this job.
226 -> (JobEventType m -> JobEventType m)
227 -- ^ A /pure/ function to update the 'JobEventType'. The input
228 -- is the /latest/ event, i.e. the current progress status. If
229 -- this is the first time we report progress and therefore there
230 -- is no previous progress status, this function will be applied
231 -- over 'mempty', thus the 'Monoid' constraint.
233 updateJobProgress hdl@(JobHandle _jId logStatus) updateJobStatus = do
234 latestStatus <- getLatestJobStatus hdl
236 Nothing -> logStatus (updateJobStatus mempty)
237 Just s -> logStatus (updateJobStatus s)