1 {-# LANGUAGE TypeFamilies, ScopedTypeVariables #-}
2 module Gargantext.Utils.Jobs.Internal (
7 import Control.Concurrent
8 import Control.Concurrent.Async
9 import Control.Exception
12 import Control.Monad.Except
13 import Data.Aeson (ToJSON)
15 import Data.Kind (Type)
19 import Gargantext.Utils.Jobs.Map
20 import Gargantext.Utils.Jobs.Monad
22 import qualified Data.Text as T
23 import qualified Servant.Client as C
24 import qualified Servant.Job.Async as SJ
25 import qualified Servant.Job.Client as SJ
26 import qualified Servant.Job.Types as SJ
28 -- | An opaque handle that abstracts over the concrete identifier for
29 -- a job. The constructor for this type is deliberately not exported.
31 JobHandle { _jh_id :: SJ.JobID 'SJ.Safe }
35 :: ( Ord t, Exception e, MonadError e m
36 , MonadJob m t (Dual [event]) output
37 , ToJSON e, ToJSON event, ToJSON output
43 -> (env -> JobHandle -> input -> Logger event -> IO (Either e output))
44 -> SJ.AsyncJobsServerT' ctI ctO callback event input output m
45 serveJobsAPI getenv t joberr f
46 = newJob getenv t f (SJ.JobInput undefined Nothing)
47 :<|> newJob getenv t f
48 :<|> serveJobAPI t joberr
51 :: forall (m :: Type -> Type) e t event output.
52 (Ord t, MonadError e m, MonadJob m t (Dual [event]) output)
55 -> SJ.JobID 'SJ.Unsafe
56 -> SJ.AsyncJobServerT event output m
57 serveJobAPI t joberr jid' = wrap' (killJob t)
59 :<|> wrap (waitJob joberr)
63 (SJ.JobID 'SJ.Safe -> JobEntry (SJ.JobID 'SJ.Safe) (Dual [event]) output -> m a)
66 jid <- handleIDError joberr (checkJID jid')
67 job <- maybe (throwError $ joberr UnknownJob) pure =<< findJob jid
70 wrap' g limit offset = wrap (g limit offset)
73 :: ( Ord t, Exception e, MonadJob m t (Dual [event]) output
74 , ToJSON e, ToJSON event, ToJSON output
79 -> (env -> JobHandle -> input -> Logger event -> IO (Either e output))
80 -> SJ.JobInput callbacks input
81 -> m (SJ.JobStatus 'SJ.Safe event)
82 newJob getenv jobkind f input = do
85 let postCallback m = forM_ (input ^. SJ.job_callback) $ \url ->
86 C.runClientM (SJ.clientMCallback m)
87 (C.mkClientEnv (jeManager je) (url ^. SJ.base_url))
90 postCallback (SJ.mkChanEvent e)
94 r <- f env (JobHandle jId) inp (pushLog logF . Dual . (:[]))
96 Left e -> postCallback (SJ.mkChanError e) >> throwIO e
97 Right a -> postCallback (SJ.mkChanResult a) >> return a
99 jid <- queueJob jobkind (input ^. SJ.job_input) f'
100 return (SJ.JobStatus jid [] SJ.IsPending Nothing)
103 :: MonadJob m t (Dual [event]) output
107 -> JobEntry (SJ.JobID 'SJ.Safe) (Dual [event]) output
108 -> m (SJ.JobStatus 'SJ.Safe event)
109 pollJob limit offset jid je = do
110 (Dual logs, status, merr) <- case jTask je of
111 QueuedJ _ -> pure (mempty, SJ.IsPending, Nothing)
112 RunningJ rj -> (,,) <$> liftIO (rjGetLog rj)
113 <*> pure SJ.IsRunning
116 let st = either (const SJ.IsFailure) (const SJ.IsFinished) r
117 me = either (Just . T.pack . show) (const Nothing) r
119 pure $ SJ.jobStatus jid limit offset logs status merr
122 :: (MonadError e m, MonadJob m t (Dual [event]) output)
125 -> JobEntry (SJ.JobID 'SJ.Safe) (Dual [event]) output
126 -> m (SJ.JobOutput output)
127 waitJob joberr jid je = do
128 r <- case jTask je of
131 erj <- waitTilRunning
133 Left res -> return res
135 (res, _logs) <- liftIO (waitJobDone jid rj m)
139 (res, _logs) <- liftIO (waitJobDone jid rj m)
141 DoneJ _ls res -> return res
142 either (throwError . joberr . JobException) (pure . SJ.JobOutput) r
144 where waitTilRunning =
145 findJob jid >>= \mjob -> case mjob of
146 Nothing -> error "impossible"
147 Just je' -> case jTask je' of
149 liftIO $ threadDelay 50000 -- wait 50ms
151 RunningJ rj -> return (Right rj)
152 DoneJ _ls res -> return (Left res)
155 :: (Ord t, MonadJob m t (Dual [event]) output)
160 -> JobEntry (SJ.JobID 'SJ.Safe) (Dual [event]) output
161 -> m (SJ.JobStatus 'SJ.Safe event)
162 killJob t limit offset jid je = do
163 (Dual logs, status, merr) <- case jTask je of
166 return (mempty, SJ.IsKilled, Nothing)
168 liftIO $ cancel (rjAsync rj)
169 lgs <- liftIO (rjGetLog rj)
170 removeJob False t jid
171 return (lgs, SJ.IsKilled, Nothing)
173 let st = either (const SJ.IsFailure) (const SJ.IsFinished) r
174 me = either (Just . T.pack . show) (const Nothing) r
175 removeJob False t jid
177 pure $ SJ.jobStatus jid limit offset logs status merr