1 {-# LANGUAGE ScopedTypeVariables #-}
2 {-# LANGUAGE TypeFamilies #-}
3 {-# LANGUAGE ViewPatterns #-}
4 module Gargantext.Utils.Jobs.Internal (
6 -- * Internals for testing
10 import Control.Concurrent
11 import Control.Concurrent.Async
12 import Control.Exception
15 import Control.Monad.Except
16 import Data.Aeson (ToJSON)
17 import Data.Foldable (toList)
19 import Data.Kind (Type)
20 import Data.Sequence (Seq)
21 import qualified Data.Sequence as Seq
25 import Gargantext.Utils.Jobs.Map
26 import Gargantext.Utils.Jobs.Monad
28 import qualified Data.Text as T
29 import qualified Servant.Client as C
30 import qualified Servant.Job.Async as SJ
31 import qualified Servant.Job.Client as SJ
32 import qualified Servant.Job.Types as SJ
35 :: ( Ord t, Exception e, MonadError e m
36 , MonadJob m t (Seq event) output
37 , ToJSON e, ToJSON event, ToJSON output
40 => (SJ.JobID 'SJ.Safe -> LoggerM m event -> JobHandle m)
44 -> (env -> JobHandle m -> input -> IO (Either e output))
45 -> SJ.AsyncJobsServerT' ctI ctO callback event input output m
46 serveJobsAPI newJobHandle getenv t joberr f
47 = newJob newJobHandle getenv t f (SJ.JobInput undefined Nothing)
48 :<|> newJob newJobHandle getenv t f
49 :<|> serveJobAPI t joberr
52 :: forall (m :: Type -> Type) e t event output.
53 (Ord t, MonadError e m, MonadJob m t (Seq event) output)
56 -> SJ.JobID 'SJ.Unsafe
57 -> SJ.AsyncJobServerT event output m
58 serveJobAPI t joberr jid' = wrap' (killJob t)
60 :<|> wrap (waitJob joberr)
64 (SJ.JobID 'SJ.Safe -> JobEntry (SJ.JobID 'SJ.Safe) (Seq event) output -> m a)
67 jid <- handleIDError joberr (checkJID jid')
68 job <- maybe (throwError $ joberr UnknownJob) pure =<< findJob jid
71 wrap' g limit offset = wrap (g limit offset)
74 :: ( Ord t, Exception e, MonadJob m t (Seq event) output
75 , ToJSON e, ToJSON event, ToJSON output
78 => (SJ.JobID 'SJ.Safe -> LoggerM m event -> JobHandle m)
81 -> (env -> JobHandle m -> input -> IO (Either e output))
82 -> SJ.JobInput callbacks input
83 -> m (SJ.JobStatus 'SJ.Safe event)
84 newJob newJobHandle getenv jobkind f input = do
87 let postCallback m = forM_ (input ^. SJ.job_callback) $ \url ->
88 C.runClientM (SJ.clientMCallback m)
89 (C.mkClientEnv (jeManager je) (url ^. SJ.base_url))
91 pushLog logF = \w -> do
92 postCallback (SJ.mkChanEvent w)
96 r <- f env (newJobHandle jId (liftIO . pushLog logF . Seq.singleton)) inp
98 Left e -> postCallback (SJ.mkChanError e) >> throwIO e
99 Right a -> postCallback (SJ.mkChanResult a) >> return a
101 jid <- queueJob jobkind (input ^. SJ.job_input) f'
102 return (SJ.JobStatus jid [] SJ.IsPending Nothing)
105 :: MonadJob m t (Seq event) output
109 -> JobEntry (SJ.JobID 'SJ.Safe) (Seq event) output
110 -> m (SJ.JobStatus 'SJ.Safe event)
111 pollJob limit offset jid je = do
112 (logs, status, merr) <- case jTask je of
113 QueuedJ _ -> pure (mempty, SJ.IsPending, Nothing)
114 RunningJ rj -> (,,) <$> liftIO (rjGetLog rj)
115 <*> pure SJ.IsRunning
118 let st = either (const SJ.IsFailure) (const SJ.IsFinished) r
119 me = either (Just . T.pack . show) (const Nothing) r
121 -- /NOTE/: We need to be careful with the ordering of the logs here:
122 -- we want to return the logs ordered from the newest to the oldest,
123 -- because the API will use 'limit' to show only the newest ones,
124 -- taking 'limit' of them from the front of the list.
126 -- Due to the fact we do not force any 'Ord' constraint on an 'event' type,
127 -- and it would be inefficient to reverse the list here, it's important
128 -- that the concrete implementation of 'rjGetLog' returns the logs in the
130 pure $ SJ.jobStatus jid limit offset (toList logs) status merr
133 :: (MonadError e m, MonadJob m t (Seq event) output)
136 -> JobEntry (SJ.JobID 'SJ.Safe) (Seq event) output
137 -> m (SJ.JobOutput output)
138 waitJob joberr jid je = do
139 r <- case jTask je of
142 erj <- waitTilRunning
144 Left res -> return res
146 (res, _logs) <- liftIO (waitJobDone jid rj m)
150 (res, _logs) <- liftIO (waitJobDone jid rj m)
152 DoneJ _ls res -> return res
153 either (throwError . joberr . JobException) (pure . SJ.JobOutput) r
155 where waitTilRunning =
156 findJob jid >>= \mjob -> case mjob of
157 Nothing -> error "impossible"
158 Just je' -> case jTask je' of
160 liftIO $ threadDelay 50000 -- wait 50ms
162 RunningJ rj -> return (Right rj)
163 DoneJ _ls res -> return (Left res)
166 :: (Ord t, MonadJob m t (Seq event) output)
171 -> JobEntry (SJ.JobID 'SJ.Safe) (Seq event) output
172 -> m (SJ.JobStatus 'SJ.Safe event)
173 killJob t limit offset jid je = do
174 (logs, status, merr) <- case jTask je of
177 return (mempty, SJ.IsKilled, Nothing)
179 liftIO $ cancel (rjAsync rj)
180 lgs <- liftIO (rjGetLog rj)
181 removeJob False t jid
182 return (lgs, SJ.IsKilled, Nothing)
184 let st = either (const SJ.IsFailure) (const SJ.IsFinished) r
185 me = either (Just . T.pack . show) (const Nothing) r
186 removeJob False t jid
188 -- /NOTE/: Same proviso as in 'pollJob' applies here.
189 pure $ SJ.jobStatus jid limit offset (toList logs) status merr