1 {-# LANGUAGE ScopedTypeVariables #-}
2 {-# LANGUAGE TypeFamilies #-}
3 {-# LANGUAGE ViewPatterns #-}
4 module Gargantext.Utils.Jobs.Internal (
6 -- * Internals for testing
10 import Control.Concurrent
11 import Control.Concurrent.Async
12 import Control.Exception
15 import Control.Monad.Except
16 import Data.Aeson (ToJSON)
17 import Data.Foldable (toList)
19 import Data.Kind (Type)
20 import Data.Sequence (Seq)
21 import qualified Data.Sequence as Seq
25 import Gargantext.Utils.Jobs.Map
26 import Gargantext.Utils.Jobs.Monad
28 import qualified Data.Text as T
29 import qualified Servant.Client as C
30 import qualified Servant.Job.Async as SJ
31 import qualified Servant.Job.Client as SJ
32 import qualified Servant.Job.Types as SJ
35 :: ( Ord t, Exception e, MonadError e m
36 , MonadJob m t (Seq event) output
37 , ToJSON e, ToJSON event, ToJSON output
40 , MimeRender JSON output
42 => (SJ.JobID 'SJ.Safe -> LoggerM m event -> JobHandle m)
46 -> (env -> JobHandle m -> input -> IO (Either e output))
47 -> SJ.AsyncJobsServerT' ctI ctO callback event input output m
48 serveJobsAPI newJobHandle getenv t joberr f
49 = newJob newJobHandle getenv t f (SJ.JobInput undefined Nothing)
50 :<|> newJob newJobHandle getenv t f
51 :<|> serveJobAPI t joberr
54 :: forall (m :: Type -> Type) e t event output.
55 (Ord t, MonadError e m, MonadJob m t (Seq event) output)
58 -> SJ.JobID 'SJ.Unsafe
59 -> SJ.AsyncJobServerT event output m
60 serveJobAPI t joberr jid' = wrap' (killJob t)
62 :<|> wrap (waitJob joberr)
66 (SJ.JobID 'SJ.Safe -> JobEntry (SJ.JobID 'SJ.Safe) (Seq event) output -> m a)
69 jid <- handleIDError joberr (checkJID jid')
70 job <- maybe (throwError $ joberr UnknownJob) pure =<< findJob jid
73 wrap' g limit offset = wrap (g limit offset)
76 :: ( Ord t, Exception e, MonadJob m t (Seq event) output
78 , MimeRender JSON output
79 , ToJSON e, ToJSON event, ToJSON output
82 => (SJ.JobID 'SJ.Safe -> LoggerM m event -> JobHandle m)
85 -> (env -> JobHandle m -> input -> IO (Either e output))
86 -> SJ.JobInput callbacks input
87 -> m (SJ.JobStatus 'SJ.Safe event)
88 newJob newJobHandle getenv jobkind f input = do
91 let postCallback m = forM_ (input ^. SJ.job_callback) $ \url ->
92 C.runClientM (SJ.clientMCallback m)
93 (C.mkClientEnv (jeManager je) (url ^. SJ.base_url))
95 pushLog logF = \w -> do
96 postCallback (SJ.mkChanEvent w)
100 r <- f env (newJobHandle jId (liftIO . pushLog logF . Seq.singleton)) inp
102 Left e -> postCallback (SJ.mkChanError e) >> throwIO e
103 Right a -> postCallback (SJ.mkChanResult a) >> return a
105 jid <- queueJob jobkind (input ^. SJ.job_input) f'
106 return (SJ.JobStatus jid [] SJ.IsPending Nothing)
109 :: MonadJob m t (Seq event) output
113 -> JobEntry (SJ.JobID 'SJ.Safe) (Seq event) output
114 -> m (SJ.JobStatus 'SJ.Safe event)
115 pollJob limit offset jid je = do
116 (logs, status, merr) <- case jTask je of
117 QueuedJ _ -> pure (mempty, SJ.IsPending, Nothing)
118 RunningJ rj -> (,,) <$> liftIO (rjGetLog rj)
119 <*> pure SJ.IsRunning
122 let st = either (const SJ.IsFailure) (const SJ.IsFinished) r
123 me = either (Just . T.pack . show) (const Nothing) r
125 -- /NOTE/: We need to be careful with the ordering of the logs here:
126 -- we want to return the logs ordered from the newest to the oldest,
127 -- because the API will use 'limit' to show only the newest ones,
128 -- taking 'limit' of them from the front of the list.
130 -- Due to the fact we do not force any 'Ord' constraint on an 'event' type,
131 -- and it would be inefficient to reverse the list here, it's important
132 -- that the concrete implementation of 'rjGetLog' returns the logs in the
134 pure $ SJ.jobStatus jid limit offset (toList logs) status merr
137 :: (MonadError e m, MonadJob m t (Seq event) output)
140 -> JobEntry (SJ.JobID 'SJ.Safe) (Seq event) output
141 -> m (SJ.JobOutput output)
142 waitJob joberr jid je = do
143 r <- case jTask je of
146 erj <- waitTilRunning
148 Left res -> return res
150 (res, _logs) <- liftIO (waitJobDone jid rj m)
154 (res, _logs) <- liftIO (waitJobDone jid rj m)
156 DoneJ _ls res -> return res
157 either (throwError . joberr . JobException) (pure . SJ.JobOutput) r
159 where waitTilRunning =
160 findJob jid >>= \mjob -> case mjob of
161 Nothing -> error "impossible"
162 Just je' -> case jTask je' of
164 liftIO $ threadDelay 50000 -- wait 50ms
166 RunningJ rj -> return (Right rj)
167 DoneJ _ls res -> return (Left res)
170 :: (Ord t, MonadJob m t (Seq event) output)
175 -> JobEntry (SJ.JobID 'SJ.Safe) (Seq event) output
176 -> m (SJ.JobStatus 'SJ.Safe event)
177 killJob t limit offset jid je = do
178 (logs, status, merr) <- case jTask je of
181 return (mempty, SJ.IsKilled, Nothing)
183 liftIO $ cancel (rjAsync rj)
184 lgs <- liftIO (rjGetLog rj)
185 removeJob False t jid
186 return (lgs, SJ.IsKilled, Nothing)
188 let st = either (const SJ.IsFailure) (const SJ.IsFinished) r
189 me = either (Just . T.pack . show) (const Nothing) r
190 removeJob False t jid
192 -- /NOTE/: Same proviso as in 'pollJob' applies here.
193 pure $ SJ.jobStatus jid limit offset (toList logs) status merr