1 {-# LANGUAGE MultiWayIf, FunctionalDependencies, MultiParamTypeClasses, TypeFamilies #-}
2 module Gargantext.Utils.Jobs.Monad (
11 -- * Tracking jobs status
33 import Gargantext.Utils.Jobs.Settings
34 import Gargantext.Utils.Jobs.Map
35 import Gargantext.Utils.Jobs.Queue
36 import Gargantext.Utils.Jobs.State
38 import Control.Concurrent.STM
39 import Control.Exception
40 import Control.Monad.Except
41 import Control.Monad.Reader
42 import Data.Functor ((<&>))
43 import Data.Kind (Type)
44 import Data.Map.Strict (Map)
45 import Data.Sequence (Seq, viewr, ViewR(..))
46 import Data.Time.Clock
47 import Network.HTTP.Client (Manager)
50 import qualified Servant.Job.Core as SJ
51 import qualified Servant.Job.Types as SJ
53 data JobEnv t w a = JobEnv
54 { jeSettings :: JobSettings
55 , jeState :: JobsState t w a
56 , jeManager :: Manager
60 :: (EnumBounded t, Monoid w)
-- Builds the jobs state from the settings and the priorities, then packs it
-- together with those same settings and the HTTP manager into a 'JobEnv'.
newJobEnv js prios mgr = mkEnv <$> newJobsState js prios
  where
    -- The settings and the manager are plain values; only the state is effectful.
    mkEnv st = JobEnv js st mgr
69 defaultJobSettings :: NumRunners -> SJ.SecretKey -> JobSettings
70 defaultJobSettings numRunners k = JobSettings
71 { jsNumRunners = numRunners
72 , jsJobTimeout = 30 * 60 -- 30 minutes
73 , jsIDTimeout = 30 * 60 -- 30 minutes
74 , jsGcPeriod = 1 * 60 -- 1 minute
-- | Produces a 'SJ.SecretKey' by delegating to 'SJ.generateSecretKey'.
genSecret :: IO SJ.SecretKey
genSecret = SJ.generateSecretKey
-- | Monads from which a 'JobEnv' can be obtained.
--
-- The functional dependency states that the monad @m@ determines the three
-- remaining parameters @t@, @w@ and @a@ of the environment.
class (MonadIO m) => MonadJob m t w a | m -> t w a where
  -- | Fetch the job environment carried by the monad.
  getJobEnv :: m (JobEnv t w a)
84 instance MonadIO m => MonadJob (ReaderT (JobEnv t w a) m) t w a where
-- | The 'JobSettings' stored in the current job environment.
getJobsSettings :: MonadJob m t w a => m JobSettings
getJobsSettings = fmap jeSettings getJobEnv
-- | The 'JobsState' stored in the current job environment.
getJobsState :: MonadJob m t w a => m (JobsState t w a)
getJobsState = do
  env <- getJobEnv
  pure (jeState env)
-- | The job map ('jobsData') held by the current 'JobsState'.
getJobsMap :: MonadJob m t w a => m (JobMap (SJ.JobID 'SJ.Safe) w a)
getJobsMap = getJobsState <&> jobsData
-- | The job queue ('jobsQ') held by the current 'JobsState'.
getJobsQueue :: MonadJob m t w a => m (Queue t (SJ.JobID 'SJ.Safe))
getJobsQueue = fmap jobsQ getJobsState
100 :: (MonadJob m t w a, Ord t)
103 -> (SJ.JobID 'SJ.Safe -> i -> Logger w -> IO a)
104 -> m (SJ.JobID 'SJ.Safe)
105 queueJob jobkind input f = do
106 js <- getJobsSettings
108 liftIO (pushJob jobkind input f js st)
113 -> m (Maybe (JobEntry (SJ.JobID 'SJ.Safe) w a))
116 liftIO $ lookupJob jid jmap
123 | JobException SomeException
128 => SJ.JobID 'SJ.Unsafe
129 -> m (Either JobError (SJ.JobID 'SJ.Safe))
checkJID (SJ.PrivateID typeName num stamp mac) = do
  now <- liftIO getCurrentTime
  settings <- getJobsSettings
  let -- Instant after which the identifier is no longer accepted.
      deadline = addUTCTime (fromIntegral (jsIDTimeout settings)) stamp
      -- MAC recomputed from the identifier's components and the configured key.
      expectedMac = SJ.macID typeName (jsSecretKey settings) stamp num
      -- Checks are tried top to bottom; the first failing one is reported.
      verdict
        | typeName /= "job" = Left InvalidIDType
        | now > deadline = Left IDExpired
        | mac /= expectedMac = Left InvalidMacID
        -- All checks passed: rewrap the very same components.
        | otherwise = Right (SJ.PrivateID typeName num stamp mac)
  pure verdict
140 => SJ.JobID 'SJ.Unsafe
141 -> (SJ.JobID 'SJ.Safe -> JobEntry (SJ.JobID 'SJ.Safe) w a -> m r)
142 -> m (Either JobError (Maybe r))
146 Left e -> return (Left e)
150 Nothing -> return (Right Nothing)
151 Just j -> Right . Just <$> f jid' j
156 -> m (Either JobError a)
158 handleIDError toE act = act >>= \r -> case r of
159 Left err -> throwError (toE err)
163 :: (Ord t, MonadJob m t w a)
164 => Bool -- is it queued (and we have to remove jid from queue)
168 removeJob queued t jid = do
171 liftIO . atomically $ do
177 -- Tracking jobs status
180 -- | An opaque handle that abstracts over the concrete identifier for
181 -- a job. The constructor for this type is deliberately not exported.
182 data JobHandle event = JobHandle {
183 _jh_id :: !(SJ.JobID 'SJ.Safe)
184 , _jh_logger :: Logger event
-- | Builds a 'JobHandle' out of the job's underlying 'SJ.JobID' and the
-- 'Logger' that is to be used for reporting its status.
mkJobHandle :: SJ.JobID 'SJ.Safe -> Logger event -> JobHandle event
mkJobHandle jobId logger =
  JobHandle
    { _jh_id = jobId
    , _jh_logger = logger
    }
-- | Extracts the 'Logger' stored in a 'JobHandle'.
jobHandleLogger :: JobHandle event -> Logger event
jobHandleLogger handle = _jh_logger handle
-- | A monad to query for the status of a particular job /and/ submit updates
-- for in-progress jobs.
--
-- The superclass constraint ties the three associated types to the parameters
-- of 'MonadJob': the log type is a 'Seq' of 'JobEventType' values.
class
  MonadJob m (JobType m) (Seq (JobEventType m)) (JobOutputType m) =>
  MonadJobStatus m
  where
  -- | Instantiates the @t@ parameter of 'MonadJob'.
  type JobType m :: Type

  -- | Instantiates the @a@ parameter of 'MonadJob'.
  type JobOutputType m :: Type

  -- | Element type of the 'Seq' that instantiates the @w@ parameter of 'MonadJob'.
  type JobEventType m :: Type
-- | 'ReaderT' over a 'JobEnv' whose log type is a 'Seq' of events: each
-- associated type is read straight off the corresponding 'JobEnv' parameter.
instance MonadIO m => MonadJobStatus (ReaderT (JobEnv t (Seq event) a) m) where
  type JobType       (ReaderT (JobEnv t (Seq event) a) m) = t
  type JobOutputType (ReaderT (JobEnv t (Seq event) a) m) = a
  type JobEventType  (ReaderT (JobEnv t (Seq event) a) m) = event
208 -- Tracking jobs status API
211 -- | Retrieves the latest 'JobEventType' from the underlying monad. It can be
212 -- used to query the latest status for a particular job, given its 'JobHandle' as input.
213 getLatestJobStatus :: MonadJobStatus m => JobHandle (JobEventType m) -> m (Maybe (JobEventType m))
214 getLatestJobStatus (JobHandle jId _) = do
217 Nothing -> pure Nothing
218 Just j -> case jTask j of
219 QueuedJ _ -> pure Nothing
220 RunningJ rj -> liftIO (rjGetLog rj) <&>
221 \lgs -> case viewr lgs of
224 DoneJ lgs _ -> pure $ case viewr lgs of