Git — Sourcephile - gargantext.git/blob - src/Gargantext/Utils/Jobs/Monad.hs
[FIX] Clean Text before sending it to NLP micro services + tests + clean code for...
[gargantext.git] / src / Gargantext / Utils / Jobs / Monad.hs
1 {-# LANGUAGE MultiWayIf, FunctionalDependencies, MultiParamTypeClasses, TypeFamilies #-}
2 module Gargantext.Utils.Jobs.Monad (
3 -- * Types and classes
4 JobEnv(..)
5 , NumRunners
6 , JobError(..)
7
8 , MonadJob(..)
9
10 -- * Tracking jobs status
11 , MonadJobStatus(..)
12
13 -- * Functions
14 , newJobEnv
15 , defaultJobSettings
16 , genSecret
17 , getJobsSettings
18 , getJobsState
19 , getJobsMap
20 , getJobsQueue
21 , queueJob
22 , findJob
23 , checkJID
24 , withJob
25 , handleIDError
26 , removeJob
27 ) where
28
29 import Gargantext.Utils.Jobs.Settings
30 import Gargantext.Utils.Jobs.Map
31 import Gargantext.Utils.Jobs.Queue
32 import Gargantext.Utils.Jobs.State
33
34 import Control.Concurrent.STM
35 import Control.Exception
36 import Control.Monad.Except
37 import Control.Monad.Reader
38 import Data.Kind (Type)
39 import Data.Map.Strict (Map)
40 import Data.Time.Clock
41 import qualified Data.Text as T
42 import Network.HTTP.Client (Manager)
43 import Prelude
44
45 import qualified Servant.Job.Core as SJ
46 import qualified Servant.Job.Types as SJ
47
-- | Environment shared by the job-handling functions of this module.
--
-- The three type parameters are passed straight through to 'JobsState'.
data JobEnv t w a = JobEnv
  { jeSettings :: JobSettings
    -- ^ Job settings (see 'defaultJobSettings' for a ready-made value).
  , jeState :: JobsState t w a
    -- ^ Jobs state; the job map and the job queue are both read from it.
  , jeManager :: Manager
    -- ^ HTTP client 'Manager' (from "Network.HTTP.Client").
  }
53
-- | Build a 'JobEnv' from job settings, a priority map and an HTTP 'Manager'.
--
-- The jobs state is created by 'newJobsState' from the same settings and
-- priorities; the settings and the manager are stored unchanged.
newJobEnv
  :: (EnumBounded t, Monoid w)
  => JobSettings
  -> Map t Prio
  -> Manager
  -> IO (JobEnv t w a)
newJobEnv settings priorities manager = do
  state <- newJobsState settings priorities
  pure (JobEnv settings state manager)
61
-- | Number of job runners, as stored in 'jsNumRunners' by 'defaultJobSettings'.
type NumRunners = Int
63
-- | Default 'JobSettings' for the given number of runners and secret key.
--
-- Job timeout and ID timeout are both 30 minutes; the GC period is 1 minute.
defaultJobSettings :: NumRunners -> SJ.SecretKey -> JobSettings
defaultJobSettings numRunners k =
  JobSettings
    { jsNumRunners = numRunners
    , jsJobTimeout = minutes 30
    , jsIDTimeout = minutes 30
    , jsGcPeriod = minutes 1
    , jsSecretKey = k
    }
  where
    -- Durations are stored as a count of 60ths of a minute (i.e. seconds).
    -- The explicit polymorphic signature lets each field pick its own
    -- numeric type, exactly as the literal arithmetic would.
    minutes :: Num n => n -> n
    minutes n = n * 60
72
-- | Generate a new 'SJ.SecretKey' by delegating to 'SJ.generateSecretKey'.
genSecret :: IO SJ.SecretKey
genSecret = SJ.generateSecretKey
75
-- | Monads that can perform 'IO' and give access to a 'JobEnv'.
--
-- The functional dependency @m -> t w a@ states that the monad determines
-- the three type parameters of its 'JobEnv'.
class MonadIO m => MonadJob m t w a | m -> t w a where
  -- | Retrieve the job environment.
  getJobEnv :: m (JobEnv t w a)
78
-- | A 'ReaderT' carrying a 'JobEnv' over any 'MonadIO' base monad: the job
-- environment is the reader's own context.
instance MonadIO m => MonadJob (ReaderT (JobEnv t w a) m) t w a where
  getJobEnv = ask
81
-- | The 'JobSettings' stored in the current 'JobEnv'.
getJobsSettings :: MonadJob m t w a => m JobSettings
getJobsSettings = fmap jeSettings getJobEnv
84
-- | The 'JobsState' stored in the current 'JobEnv'.
getJobsState :: MonadJob m t w a => m (JobsState t w a)
getJobsState = fmap jeState getJobEnv
87
-- | The job map ('jobsData') of the current jobs state.
getJobsMap :: MonadJob m t w a => m (JobMap (SJ.JobID 'SJ.Safe) w a)
getJobsMap = fmap jobsData getJobsState
90
-- | The job queue ('jobsQ') of the current jobs state.
getJobsQueue :: MonadJob m t w a => m (Queue t (SJ.JobID 'SJ.Safe))
getJobsQueue = fmap jobsQ getJobsState
93
-- | Submit a job of the given kind.
--
-- The job kind, the input and the job action are handed to 'pushJob'
-- together with the current settings and jobs state; the job ID produced by
-- 'pushJob' is returned.
queueJob
  :: (MonadJob m t w a, Ord t)
  => t
  -> i
  -> (SJ.JobID 'SJ.Safe -> i -> Logger w -> IO a)
  -> m (SJ.JobID 'SJ.Safe)
queueJob jobkind input f = do
  -- Settings are fetched before the state, then the resulting IO action runs.
  push <- pushJob jobkind input f <$> getJobsSettings <*> getJobsState
  liftIO push
104
-- | Look up a job entry by its (already validated) ID in the current job
-- map, via 'lookupJob'.
findJob
  :: MonadJob m t w a
  => SJ.JobID 'SJ.Safe
  -> m (Maybe (JobEntry (SJ.JobID 'SJ.Safe) w a))
findJob jid = getJobsMap >>= liftIO . lookupJob jid
112
-- | Errors reported while validating a job ID or handling a job.
data JobError
  = -- | The type name carried by the ID is not @"job"@ (see 'checkJID').
    InvalidIDType
  | -- | The ID's timestamp plus 'jsIDTimeout' lies in the past (see 'checkJID').
    IDExpired
  | -- | The MAC carried by the ID does not match the one recomputed with the
    -- secret key (see 'checkJID').
    InvalidMacID
  | -- | NOTE(review): not produced by any function in this module;
    -- presumably raised by callers when no job matches an ID — confirm.
    UnknownJob
  | -- | Wraps an arbitrary exception. NOTE(review): not produced by any
    -- function in this module; presumably an exception thrown by a job — confirm.
    JobException SomeException
  deriving Show
120
-- | Validate an untrusted job ID against the current time and job settings.
--
-- The checks run in this order, and the first failure wins:
--
-- 1. the type name must be @"job"@, otherwise 'InvalidIDType';
-- 2. the current time must not be later than the ID's timestamp plus
--    'jsIDTimeout' seconds, otherwise 'IDExpired';
-- 3. the MAC carried by the ID must equal 'SJ.macID' recomputed with
--    'jsSecretKey', otherwise 'InvalidMacID'.
--
-- On success the same ID is rebuilt and returned as a safe ID.
checkJID
  :: MonadJob m t w a
  => SJ.JobID 'SJ.Unsafe
  -> m (Either JobError (SJ.JobID 'SJ.Safe))
checkJID (SJ.PrivateID tn n t d) = do
  now <- liftIO getCurrentTime
  settings <- getJobsSettings
  return (validate now settings)
  where
    -- Pure decision, given the clock reading and the settings.
    validate now settings
      | tn /= "job" = Left InvalidIDType
      | now > expiry settings = Left IDExpired
      | d /= SJ.macID tn (jsSecretKey settings) t n = Left InvalidMacID
      | otherwise = Right (SJ.PrivateID tn n t d)

    -- Instant after which the ID is no longer accepted.
    expiry settings = addUTCTime (fromIntegral (jsIDTimeout settings)) t
132
-- | Validate a job ID, look the job up and run an action on it.
--
-- * @Left e@ when 'checkJID' rejects the ID;
-- * @Right Nothing@ when the ID is valid but 'findJob' finds no entry;
-- * @Right (Just r)@ with the result of the action otherwise.
withJob
  :: MonadJob m t w a
  => SJ.JobID 'SJ.Unsafe
  -> (SJ.JobID 'SJ.Safe -> JobEntry (SJ.JobID 'SJ.Safe) w a -> m r)
  -> m (Either JobError (Maybe r))
withJob jid f = checkJID jid >>= either (return . Left) onValidID
  where
    -- 'traverse' over 'Maybe' runs the action only when an entry was found.
    onValidID safeJid = Right <$> (findJob safeJid >>= traverse (f safeJid))
147
-- | Run an action that may report a 'JobError' and turn that error into a
-- 'throwError' in the surrounding monad, using the supplied conversion.
-- A successful result is returned as is.
handleIDError
  :: MonadError e m
  => (JobError -> e)
  -> m (Either JobError a)
  -> m a
handleIDError toE act = act >>= either (throwError . toE) return
156
-- | Remove a job from the job map and, when it is still queued, from the
-- job queue as well. Both removals happen in a single STM transaction.
removeJob
  :: (Ord t, MonadJob m t w a)
  => Bool -- is it queued (and we have to remove jid from queue)
  -> t
  -> SJ.JobID 'SJ.Safe
  -> m ()
removeJob queued t jid = do
  queue <- getJobsQueue
  jobs <- getJobsMap
  liftIO (atomically (dropJob queue jobs))
  where
    -- The queue is only touched when the caller says the job is queued.
    dropJob queue jobs
      | queued = deleteQueue t jid queue >> deleteJob jid jobs
      | otherwise = deleteJob jid jobs
170
171 --
172 -- Tracking jobs status
173 --
174
-- | A monad to query for the status of a particular job /and/ submit updates for in-progress jobs.
class MonadJobStatus m where

  -- | Type family for the concrete 'JobHandle' that is associated with a job
  -- when it starts; it can be used to query the job's completion status.
  -- Each environment can decide what this looks like.
  type JobHandle m :: Type

  -- NOTE(review): 'JobType' and 'JobOutputType' are not referenced by any
  -- method of this class; presumably the job kind and the job's result type
  -- -- confirm against the instances.
  type JobType m :: Type
  type JobOutputType m :: Type

  -- | The type of the status events returned by 'getLatestJobStatus' and
  -- logged by the tracer given to 'withTracer'.
  type JobEventType m :: Type

  -- | Retrieves the latest 'JobEventType' from the underlying monad. It can be
  -- used to query the latest status for a particular job, given its 'JobHandle' as input.
  getLatestJobStatus :: JobHandle m -> m (JobEventType m)

  -- | Adds an extra \"tracer\" that logs events to the passed action. Produces
  -- a new 'JobHandle'.
  withTracer :: Logger (JobEventType m) -> JobHandle m -> (JobHandle m -> m a) -> m a

  -- Creating events

  -- | Start tracking a new 'JobEventType' with 'n' remaining steps.
  markStarted :: Int -> JobHandle m -> m ()

  -- | Mark 'n' steps of the job as succeeded, while simultaneously subtracting this number
  -- from the remaining steps.
  markProgress :: Int -> JobHandle m -> m ()

  -- | Mark 'n' steps of the job as failed, while simultaneously subtracting this number
  -- from the remaining steps. Attach an optional error message to the failure.
  markFailure :: Int -> Maybe T.Text -> JobHandle m -> m ()

  -- | Finish tracking a job by marking all the remaining steps as succeeded.
  markComplete :: JobHandle m -> m ()

  -- | Finish tracking a job by marking all the remaining steps as failed. Attach an optional
  -- message to the failure.
  markFailed :: Maybe T.Text -> JobHandle m -> m ()