]> Git — Sourcephile - gargantext.git/blob - src/Gargantext/Utils/Jobs/Internal.hs
[VERSION] +1 to 0.0.6.9.9.3
[gargantext.git] / src / Gargantext / Utils / Jobs / Internal.hs
1 {-# LANGUAGE ScopedTypeVariables #-}
2 {-# LANGUAGE TypeFamilies #-}
3 {-# LANGUAGE ViewPatterns #-}
4 module Gargantext.Utils.Jobs.Internal (
5 serveJobsAPI
6 -- * Internals for testing
7 , newJob
8 ) where
9
10 import Control.Concurrent
11 import Control.Concurrent.Async
12 import Control.Exception
13 import Control.Lens
14 import Control.Monad
15 import Control.Monad.Except
16 import Data.Aeson (ToJSON)
17 import Data.Foldable (toList)
18 import Data.Monoid
19 import Data.Kind (Type)
20 import Data.Sequence (Seq)
21 import qualified Data.Sequence as Seq
22 import Prelude
23 import Servant.API
24
25 import Gargantext.Utils.Jobs.Map
26 import Gargantext.Utils.Jobs.Monad
27
28 import qualified Data.Text as T
29 import qualified Servant.Client as C
30 import qualified Servant.Job.Async as SJ
31 import qualified Servant.Job.Client as SJ
32 import qualified Servant.Job.Types as SJ
33
34 serveJobsAPI
35 :: ( Ord t, Exception e, MonadError e m
36 , MonadJob m t (Seq event) output
37 , ToJSON e, ToJSON event, ToJSON output
38 , Foldable callback
39 )
40 => (SJ.JobID 'SJ.Safe -> LoggerM m event -> JobHandle m)
41 -> m env
42 -> t
43 -> (JobError -> e)
44 -> (env -> JobHandle m -> input -> IO (Either e output))
45 -> SJ.AsyncJobsServerT' ctI ctO callback event input output m
46 serveJobsAPI newJobHandle getenv t joberr f
47 = newJob newJobHandle getenv t f (SJ.JobInput undefined Nothing)
48 :<|> newJob newJobHandle getenv t f
49 :<|> serveJobAPI t joberr
50
51 serveJobAPI
52 :: forall (m :: Type -> Type) e t event output.
53 (Ord t, MonadError e m, MonadJob m t (Seq event) output)
54 => t
55 -> (JobError -> e)
56 -> SJ.JobID 'SJ.Unsafe
57 -> SJ.AsyncJobServerT event output m
58 serveJobAPI t joberr jid' = wrap' (killJob t)
59 :<|> wrap' pollJob
60 :<|> wrap (waitJob joberr)
61
62 where wrap
63 :: forall a.
64 (SJ.JobID 'SJ.Safe -> JobEntry (SJ.JobID 'SJ.Safe) (Seq event) output -> m a)
65 -> m a
66 wrap g = do
67 jid <- handleIDError joberr (checkJID jid')
68 job <- maybe (throwError $ joberr UnknownJob) pure =<< findJob jid
69 g jid job
70
71 wrap' g limit offset = wrap (g limit offset)
72
73 newJob
74 :: ( Ord t, Exception e, MonadJob m t (Seq event) output
75 , ToJSON e, ToJSON event, ToJSON output
76 , Foldable callbacks
77 )
78 => (SJ.JobID 'SJ.Safe -> LoggerM m event -> JobHandle m)
79 -> m env
80 -> t
81 -> (env -> JobHandle m -> input -> IO (Either e output))
82 -> SJ.JobInput callbacks input
83 -> m (SJ.JobStatus 'SJ.Safe event)
84 newJob newJobHandle getenv jobkind f input = do
85 je <- getJobEnv
86 env <- getenv
87 let postCallback m = forM_ (input ^. SJ.job_callback) $ \url ->
88 C.runClientM (SJ.clientMCallback m)
89 (C.mkClientEnv (jeManager je) (url ^. SJ.base_url))
90
91 pushLog logF = \w -> do
92 postCallback (SJ.mkChanEvent w)
93 logF w
94
95 f' jId inp logF = do
96 r <- f env (newJobHandle jId (liftIO . pushLog logF . Seq.singleton)) inp
97 case r of
98 Left e -> postCallback (SJ.mkChanError e) >> throwIO e
99 Right a -> postCallback (SJ.mkChanResult a) >> return a
100
101 jid <- queueJob jobkind (input ^. SJ.job_input) f'
102 return (SJ.JobStatus jid [] SJ.IsPending Nothing)
103
104 pollJob
105 :: MonadJob m t (Seq event) output
106 => Maybe SJ.Limit
107 -> Maybe SJ.Offset
108 -> SJ.JobID 'SJ.Safe
109 -> JobEntry (SJ.JobID 'SJ.Safe) (Seq event) output
110 -> m (SJ.JobStatus 'SJ.Safe event)
111 pollJob limit offset jid je = do
112 (logs, status, merr) <- case jTask je of
113 QueuedJ _ -> pure (mempty, SJ.IsPending, Nothing)
114 RunningJ rj -> (,,) <$> liftIO (rjGetLog rj)
115 <*> pure SJ.IsRunning
116 <*> pure Nothing
117 DoneJ ls r ->
118 let st = either (const SJ.IsFailure) (const SJ.IsFinished) r
119 me = either (Just . T.pack . show) (const Nothing) r
120 in pure (ls, st, me)
121 -- /NOTE/: We need to be careful with the ordering of the logs here:
122 -- we want to return the logs ordered from the newest to the oldest,
123 -- because the API will use 'limit' to show only the newest ones,
124 -- taking 'limit' of them from the front of the list.
125 --
126 -- Due to the fact we do not force any 'Ord' constraint on an 'event' type,
127 -- and it would be inefficient to reverse the list here, it's important
128 -- that the concrete implementation of 'rjGetLog' returns the logs in the
129 -- correct order.
130 pure $ SJ.jobStatus jid limit offset (toList logs) status merr
131
132 waitJob
133 :: (MonadError e m, MonadJob m t (Seq event) output)
134 => (JobError -> e)
135 -> SJ.JobID 'SJ.Safe
136 -> JobEntry (SJ.JobID 'SJ.Safe) (Seq event) output
137 -> m (SJ.JobOutput output)
138 waitJob joberr jid je = do
139 r <- case jTask je of
140 QueuedJ _qj -> do
141 m <- getJobsMap
142 erj <- waitTilRunning
143 case erj of
144 Left res -> return res
145 Right rj -> do
146 (res, _logs) <- liftIO (waitJobDone jid rj m)
147 return res
148 RunningJ rj -> do
149 m <- getJobsMap
150 (res, _logs) <- liftIO (waitJobDone jid rj m)
151 return res
152 DoneJ _ls res -> return res
153 either (throwError . joberr . JobException) (pure . SJ.JobOutput) r
154
155 where waitTilRunning =
156 findJob jid >>= \mjob -> case mjob of
157 Nothing -> error "impossible"
158 Just je' -> case jTask je' of
159 QueuedJ _qj -> do
160 liftIO $ threadDelay 50000 -- wait 50ms
161 waitTilRunning
162 RunningJ rj -> return (Right rj)
163 DoneJ _ls res -> return (Left res)
164
165 killJob
166 :: (Ord t, MonadJob m t (Seq event) output)
167 => t
168 -> Maybe SJ.Limit
169 -> Maybe SJ.Offset
170 -> SJ.JobID 'SJ.Safe
171 -> JobEntry (SJ.JobID 'SJ.Safe) (Seq event) output
172 -> m (SJ.JobStatus 'SJ.Safe event)
173 killJob t limit offset jid je = do
174 (logs, status, merr) <- case jTask je of
175 QueuedJ _ -> do
176 removeJob True t jid
177 return (mempty, SJ.IsKilled, Nothing)
178 RunningJ rj -> do
179 liftIO $ cancel (rjAsync rj)
180 lgs <- liftIO (rjGetLog rj)
181 removeJob False t jid
182 return (lgs, SJ.IsKilled, Nothing)
183 DoneJ lgs r -> do
184 let st = either (const SJ.IsFailure) (const SJ.IsFinished) r
185 me = either (Just . T.pack . show) (const Nothing) r
186 removeJob False t jid
187 pure (lgs, st, me)
188 -- /NOTE/: Same proviso as in 'pollJob' applies here.
189 pure $ SJ.jobStatus jid limit offset (toList logs) status merr