1 {-# LANGUAGE ScopedTypeVariables #-}
2 {-# LANGUAGE TypeFamilies #-}
3 {-# LANGUAGE ViewPatterns #-}
4 module Gargantext.Utils.Jobs.Internal (
6 -- * Internals for testing
10 import Control.Concurrent
11 import Control.Concurrent.Async
12 import Control.Exception
15 import Control.Monad.Except
16 import Data.Aeson (ToJSON)
17 import Data.Foldable (toList)
19 import Data.Kind (Type)
20 import Data.Sequence (Seq)
21 import qualified Data.Sequence as Seq
25 import Gargantext.Utils.Jobs.Map
26 import Gargantext.Utils.Jobs.Monad
28 import qualified Data.Text as T
29 import qualified Servant.Client as C
30 import qualified Servant.Job.Async as SJ
31 import qualified Servant.Job.Client as SJ
32 import qualified Servant.Job.Types as SJ
35 :: ( Ord t, Exception e, MonadError e m
36 , MonadJob m t (Seq event) output
37 , ToJSON e, ToJSON event, ToJSON output
43 -> (env -> JobHandle m event -> input -> IO (Either e output))
44 -> SJ.AsyncJobsServerT' ctI ctO callback event input output m
45 serveJobsAPI getenv t joberr f
46 = newJob getenv t f (SJ.JobInput undefined Nothing)
47 :<|> newJob getenv t f
48 :<|> serveJobAPI t joberr
51 :: forall (m :: Type -> Type) e t event output.
52 (Ord t, MonadError e m, MonadJob m t (Seq event) output)
55 -> SJ.JobID 'SJ.Unsafe
56 -> SJ.AsyncJobServerT event output m
57 serveJobAPI t joberr jid' = wrap' (killJob t)
59 :<|> wrap (waitJob joberr)
63 (SJ.JobID 'SJ.Safe -> JobEntry (SJ.JobID 'SJ.Safe) (Seq event) output -> m a)
66 jid <- handleIDError joberr (checkJID jid')
67 job <- maybe (throwError $ joberr UnknownJob) pure =<< findJob jid
70 wrap' g limit offset = wrap (g limit offset)
73 :: ( Ord t, Exception e, MonadJob m t (Seq event) output
74 , ToJSON e, ToJSON event, ToJSON output
79 -> (env -> JobHandle m event -> input -> IO (Either e output))
80 -> SJ.JobInput callbacks input
81 -> m (SJ.JobStatus 'SJ.Safe event)
82 newJob getenv jobkind f input = do
85 let postCallback m = forM_ (input ^. SJ.job_callback) $ \url ->
86 C.runClientM (SJ.clientMCallback m)
87 (C.mkClientEnv (jeManager je) (url ^. SJ.base_url))
89 pushLog logF = \w -> do
90 postCallback (SJ.mkChanEvent w)
94 r <- f env (mkJobHandle jId (liftIO . pushLog logF . Seq.singleton)) inp
96 Left e -> postCallback (SJ.mkChanError e) >> throwIO e
97 Right a -> postCallback (SJ.mkChanResult a) >> return a
99 jid <- queueJob jobkind (input ^. SJ.job_input) f'
100 return (SJ.JobStatus jid [] SJ.IsPending Nothing)
103 :: MonadJob m t (Seq event) output
107 -> JobEntry (SJ.JobID 'SJ.Safe) (Seq event) output
108 -> m (SJ.JobStatus 'SJ.Safe event)
109 pollJob limit offset jid je = do
110 (logs, status, merr) <- case jTask je of
111 QueuedJ _ -> pure (mempty, SJ.IsPending, Nothing)
112 RunningJ rj -> (,,) <$> liftIO (rjGetLog rj)
113 <*> pure SJ.IsRunning
116 let st = either (const SJ.IsFailure) (const SJ.IsFinished) r
117 me = either (Just . T.pack . show) (const Nothing) r
119 pure $ SJ.jobStatus jid limit offset (toList logs) status merr
122 :: (MonadError e m, MonadJob m t (Seq event) output)
125 -> JobEntry (SJ.JobID 'SJ.Safe) (Seq event) output
126 -> m (SJ.JobOutput output)
127 waitJob joberr jid je = do
128 r <- case jTask je of
131 erj <- waitTilRunning
133 Left res -> return res
135 (res, _logs) <- liftIO (waitJobDone jid rj m)
139 (res, _logs) <- liftIO (waitJobDone jid rj m)
141 DoneJ _ls res -> return res
142 either (throwError . joberr . JobException) (pure . SJ.JobOutput) r
144 where waitTilRunning =
145 findJob jid >>= \mjob -> case mjob of
146 Nothing -> error "impossible"
147 Just je' -> case jTask je' of
149 liftIO $ threadDelay 50000 -- wait 50ms
151 RunningJ rj -> return (Right rj)
152 DoneJ _ls res -> return (Left res)
155 :: (Ord t, MonadJob m t (Seq event) output)
160 -> JobEntry (SJ.JobID 'SJ.Safe) (Seq event) output
161 -> m (SJ.JobStatus 'SJ.Safe event)
162 killJob t limit offset jid je = do
163 (logs, status, merr) <- case jTask je of
166 return (mempty, SJ.IsKilled, Nothing)
168 liftIO $ cancel (rjAsync rj)
169 lgs <- liftIO (rjGetLog rj)
170 removeJob False t jid
171 return (lgs, SJ.IsKilled, Nothing)
173 let st = either (const SJ.IsFailure) (const SJ.IsFinished) r
174 me = either (Just . T.pack . show) (const Nothing) r
175 removeJob False t jid
177 pure $ SJ.jobStatus jid limit offset (toList logs) status merr