1 {-# LANGUAGE ScopedTypeVariables #-}
2 {-# LANGUAGE TypeFamilies #-}
3 {-# LANGUAGE ViewPatterns #-}
4 module Gargantext.Utils.Jobs.Internal (
6 -- * Internals for testing
10 import Control.Concurrent
11 import Control.Concurrent.Async
12 import Control.Exception
15 import Control.Monad.Except
16 import Data.Aeson (ToJSON)
17 import Data.Foldable (toList)
19 import Data.Kind (Type)
20 import Data.Sequence (Seq)
21 import qualified Data.Sequence as Seq
25 import Gargantext.Utils.Jobs.Map
26 import Gargantext.Utils.Jobs.Monad
28 import qualified Data.Text as T
29 import qualified Servant.Client as C
30 import qualified Servant.Job.Async as SJ
31 import qualified Servant.Job.Client as SJ
32 import qualified Servant.Job.Types as SJ
-- Type signature and definition of 'serveJobsAPI'.
-- NOTE(review): the line carrying the name of the signature and some of the
-- argument lines are not visible in this excerpt; only the visible parts are
-- described below.
35 :: ( Ord t, Exception e, MonadError e m
36 , MonadJob m t (Seq event) output
37 , ToJSON e, ToJSON event, ToJSON output
-- Builds a 'JobHandle' from a job identifier and a logging function.
40 => (SJ.JobID 'SJ.Safe -> LoggerM m event -> JobHandle m)
-- The job action: given an environment, a job handle and the input, it runs
-- in IO and yields either an error or the job output.
44 -> (env -> JobHandle m -> input -> IO (Either e output))
45 -> SJ.AsyncJobsServerT' ctI ctO callback event input output m
-- The server is three handlers combined with ':<|>':
--   1. job creation with the fixed value 'SJ.JobInput undefined Nothing',
--   2. job creation with a caller-supplied 'SJ.JobInput',
--   3. the per-job API built by 'serveJobAPI'.
46 serveJobsAPI newJobHandle getenv t joberr f
-- NOTE(review): the first field of 'SJ.JobInput' is 'undefined' here.
-- Presumably that field is the job input (newJob reads 'SJ.job_input'); if
-- so, any job action that forces its input will crash on this route --
-- TODO confirm.
47 = newJob newJobHandle getenv t f (SJ.JobInput undefined Nothing)
48 :<|> newJob newJobHandle getenv t f
49 :<|> serveJobAPI t joberr
-- Type signature and definition of 'serveJobAPI': the handlers that operate
-- on one existing job, addressed by an unchecked ('SJ.Unsafe) job identifier.
-- NOTE(review): several lines of this definition are not visible in this
-- excerpt; only the visible parts are described.
52 :: forall (m :: Type -> Type) e t event output.
53 (Ord t, MonadError e m, MonadJob m t (Seq event) output)
56 -> SJ.JobID 'SJ.Unsafe
57 -> SJ.AsyncJobServerT event output m
-- Kill handler: 'killJob' takes a limit and an offset, hence 'wrap''.
58 serveJobAPI t joberr jid' = wrap' (killJob t)
-- Wait handler: 'waitJob' takes no limit/offset, hence plain 'wrap'.
60 :<|> wrap (waitJob joberr)
-- Type of the handler accepted by the local helper 'wrap' (presumably; the
-- line naming the helper is not visible): it receives the checked job
-- identifier and the job entry found for it.
64 (SJ.JobID 'SJ.Safe -> JobEntry (SJ.JobID 'SJ.Safe) (Seq event) output -> m a)
-- Check the incoming identifier; 'handleIDError' is given 'joberr' so that a
-- failed check can be reported as an 'e'.
67 jid <- handleIDError joberr (checkJID jid')
-- Look the job up; a missing job is reported as 'joberr UnknownJob'.
68 job <- maybe (throwError $ joberr UnknownJob) pure =<< findJob jid
-- Adapter for handlers that additionally take a limit and an offset.
71 wrap' g limit offset = wrap (g limit offset)
-- Type signature and definition of 'newJob': queue a new job and return its
-- initial status.
-- NOTE(review): several lines of this definition are not visible in this
-- excerpt (among them the bindings of 'je', 'env', 'jId', 'inp' and 'f'');
-- only the visible parts are described.
74 :: ( Ord t, Exception e, MonadJob m t (Seq event) output
75 , ToJSON e, ToJSON event, ToJSON output
-- Builds a 'JobHandle' from a job identifier and a logging function.
78 => (SJ.JobID 'SJ.Safe -> LoggerM m event -> JobHandle m)
-- The job action, run in IO, yielding either an error or the output.
81 -> (env -> JobHandle m -> input -> IO (Either e output))
-- The job input together with its optional callbacks.
82 -> SJ.JobInput callbacks input
83 -> m (SJ.JobStatus 'SJ.Safe event)
84 newJob newJobHandle getenv jobkind f input = do
-- 'postCallback' sends the message 'm' to every callback URL carried by the
-- input, using a servant client built from the manager in 'je' and the
-- URL's base URL. 'forM_' discards the results of these client calls.
87 let postCallback m = forM_ (input ^. SJ.job_callback) $ \url ->
88 C.runClientM (SJ.clientMCallback m)
89 (C.mkClientEnv (jeManager je) (url ^. SJ.base_url))
-- 'pushLog' first forwards the log value to the callbacks as a channel
-- event. NOTE(review): what it then does with 'logF' is not visible here.
91 pushLog logF = \w -> do
92 postCallback (SJ.mkChanEvent w)
-- Run the job action. The handle's logger wraps each event in a singleton
-- 'Seq' before passing it to 'pushLog'.
96 r <- f env (newJobHandle jId (liftIO . pushLog logF . Seq.singleton)) inp
-- On failure: notify the callbacks of the error, then rethrow it in IO.
98 Left e -> postCallback (SJ.mkChanError e) >> throwIO e
-- On success: notify the callbacks of the result, then return it.
99 Right a -> postCallback (SJ.mkChanResult a) >> return a
-- Queue the job under 'jobkind' with the input extracted from the request.
101 jid <- queueJob jobkind (input ^. SJ.job_input) f'
-- A freshly queued job is reported as pending, with an empty log list.
102 return (SJ.JobStatus jid [] SJ.IsPending Nothing)
-- Type signature and definition of 'pollJob': report the current status of a
-- job together with its logs.
-- NOTE(review): some argument lines and the header of the last case branch
-- are not visible in this excerpt; only the visible parts are described.
105 :: MonadJob m t (Seq event) output
109 -> JobEntry (SJ.JobID 'SJ.Safe) (Seq event) output
110 -> m (SJ.JobStatus 'SJ.Safe event)
111 pollJob limit offset jid je = do
-- Derive (logs, status, optional error text) from the job's task state.
112 (logs, status, merr) <- case jTask je of
-- Still queued: no logs, status pending.
113 QueuedJ _ -> pure (mempty, SJ.IsPending, Nothing)
-- Running: read the logs collected so far, status running.
114 RunningJ rj -> (,,) <$> liftIO (rjGetLog rj)
115 <*> pure SJ.IsRunning
-- NOTE(review): the branch binding 'r' is not visible (presumably the
-- finished-job case). A 'Left' maps to failure with the shown error as
-- text; a 'Right' maps to finished with no error text.
118 let st = either (const SJ.IsFailure) (const SJ.IsFinished) r
119 me = either (Just . T.pack . show) (const Nothing) r
-- Build the status; 'limit' and 'offset' are passed on to 'SJ.jobStatus'
-- together with the logs converted to a list.
121 pure $ SJ.jobStatus jid limit offset (toList logs) status merr
-- Type signature and definition of 'waitJob': wait for a job's result and
-- return it as a 'SJ.JobOutput', or throw the job's failure.
-- NOTE(review): several lines of this definition are not visible in this
-- excerpt (among them the case branch headers and the binding of 'm');
-- only the visible parts are described.
124 :: (MonadError e m, MonadJob m t (Seq event) output)
127 -> JobEntry (SJ.JobID 'SJ.Safe) (Seq event) output
128 -> m (SJ.JobOutput output)
129 waitJob joberr jid je = do
-- Obtain the job's final result 'r', depending on the task state.
130 r <- case jTask je of
-- Presumably the queued case: poll until the job is running or done.
133 erj <- waitTilRunning
-- The job finished while we were polling: use its result directly.
135 Left res -> return res
-- The job is now running: block until it is done; its logs are ignored.
137 (res, _logs) <- liftIO (waitJobDone jid rj m)
-- Presumably the already-running case: same blocking wait, logs ignored.
141 (res, _logs) <- liftIO (waitJobDone jid rj m)
-- Already done: take the stored result.
143 DoneJ _ls res -> return res
-- A 'Left' is thrown as 'joberr (JobException ..)'; a 'Right' is returned
-- wrapped in 'SJ.JobOutput'.
144 either (throwError . joberr . JobException) (pure . SJ.JobOutput) r
-- 'waitTilRunning' looks the job up again and inspects its current state.
146 where waitTilRunning =
147 findJob jid >>= \mjob -> case mjob of
-- NOTE(review): 'killJob' in this file calls 'removeJob'; if that removes
-- the entry 'findJob' looks up, this branch may be reachable while a waiter
-- is polling -- TODO confirm.
148 Nothing -> error "impossible"
149 Just je' -> case jTask je' of
-- Presumably the still-queued case: sleep, then (not visible here) retry.
151 liftIO $ threadDelay 50000 -- wait 50ms
-- Running: hand back the running job so the caller can wait on it.
153 RunningJ rj -> return (Right rj)
-- Done: hand back the final result.
154 DoneJ _ls res -> return (Left res)
-- Type signature and definition of 'killJob': stop a job and report its
-- final status together with its logs.
-- NOTE(review): some argument lines and all case branch headers are not
-- visible in this excerpt; only the visible parts are described.
157 :: (Ord t, MonadJob m t (Seq event) output)
162 -> JobEntry (SJ.JobID 'SJ.Safe) (Seq event) output
163 -> m (SJ.JobStatus 'SJ.Safe event)
164 killJob t limit offset jid je = do
-- Derive (logs, status, optional error text) from the job's task state.
165 (logs, status, merr) <- case jTask je of
-- Presumably the queued case: no logs, status killed.
168 return (mempty, SJ.IsKilled, Nothing)
-- Presumably the running case: cancel the job's async, read the logs it
-- produced, remove the job, and report it as killed.
170 liftIO $ cancel (rjAsync rj)
171 lgs <- liftIO (rjGetLog rj)
172 removeJob False t jid
173 return (lgs, SJ.IsKilled, Nothing)
-- NOTE(review): the branch binding 'r' is not visible (presumably the
-- finished-job case). A 'Left' maps to failure with the shown error as
-- text; a 'Right' maps to finished with no error text. The job is removed
-- here as well.
175 let st = either (const SJ.IsFailure) (const SJ.IsFinished) r
176 me = either (Just . T.pack . show) (const Nothing) r
177 removeJob False t jid
-- Build the status; 'limit' and 'offset' are passed on to 'SJ.jobStatus'
-- together with the logs converted to a list.
179 pure $ SJ.jobStatus jid limit offset (toList logs) status merr