1 {-# LANGUAGE TypeFamilies, ScopedTypeVariables #-}
2 module Gargantext.Utils.Jobs.API where
4 import Control.Concurrent
5 import Control.Concurrent.Async
6 import Control.Exception
9 import Control.Monad.Except
10 import Data.Aeson (ToJSON)
12 import Data.Kind (Type)
16 import Gargantext.Utils.Jobs.Map
17 import Gargantext.Utils.Jobs.Monad
19 import qualified Data.Text as T
20 import qualified Servant.Client as C
21 import qualified Servant.Job.Async as SJ
22 import qualified Servant.Job.Client as SJ
23 import qualified Servant.Job.Types as SJ
-- serveJobsAPI: assembles the top-level async-jobs server
-- (SJ.AsyncJobsServerT') from three handlers joined with (:<|>).
-- NOTE(review): several signature lines (the leading arguments) are not
-- visible in this extract; only the job-function argument and the result
-- type can be read here.
26 :: ( Ord t, Exception e, MonadError e m
27 , MonadJob m t (Dual [event]) output
28 , ToJSON e, ToJSON event, ToJSON output
-- The job function: receives an env, the job input and a Logger for events,
-- and yields either an error `e` or the job's output, in IO.
34 -> (env -> input -> Logger event -> IO (Either e output))
35 -> SJ.AsyncJobsServerT' ctI ctO callback event input output m
36 serveJobsAPI getenv t joberr f
-- First handler: creates a job from a fixed SJ.JobInput whose first field is
-- `undefined` and whose second field is Nothing.
-- NOTE(review): newJob passes `input ^. SJ.job_input` to queueJob; if that
-- lens reads this first field, forcing it would crash at runtime -- confirm
-- this route is intentional.
37 = newJob getenv t f (SJ.JobInput undefined Nothing)
-- Second handler: creates a job from the SJ.JobInput supplied by the caller.
38 :<|> newJob getenv t f
-- Third handler: per-job sub-API (takes the job id, see serveJobAPI).
39 :<|> serveJobAPI t joberr
-- serveJobAPI: per-job server (SJ.AsyncJobServerT) for a client-supplied,
-- not yet validated job id (SJ.JobID 'SJ.Unsafe).
-- NOTE(review): some signature lines and one handler line are not visible in
-- this extract.
42 :: forall (m :: Type -> Type) e t event output.
43 (Ord t, MonadError e m, MonadJob m t (Dual [event]) output)
46 -> SJ.JobID 'SJ.Unsafe
47 -> SJ.AsyncJobServerT event output m
-- Kill handler: killJob additionally takes limit/offset, hence wrap'.
48 serveJobAPI t joberr jid' = wrap' (killJob t)
-- NOTE(review): a handler between the kill and wait handlers sits on a line
-- that is not visible here.
-- Wait handler: takes no limit/offset, hence plain wrap.
50 :<|> wrap (waitJob joberr)
-- wrap (its name line is not visible): adapts a handler that expects a
-- validated ('SJ.Safe) job id together with the matching JobEntry.
54 (SJ.JobID 'SJ.Safe -> JobEntry (SJ.JobID 'SJ.Safe) (Dual [event]) output -> m a)
-- Validate the unsafe id; failures are routed through handleIDError/joberr.
57 jid <- handleIDError joberr (checkJID jid')
-- Look the job up; a missing entry is reported as `joberr UnknownJob`.
58 job <- maybe (throwError $ joberr UnknownJob) pure =<< findJob jid
-- wrap': same as wrap, for handlers that first take limit and offset.
61 wrap' g limit offset = wrap (g limit offset)
-- newJob: queues a new job of kind `jobkind` that runs `f` on the input
-- carried by the given SJ.JobInput, and returns an initial pending status.
-- NOTE(review): the lines binding `je`, `env`, `inp`, `logF` and `f'` are not
-- visible in this extract; comments below describe only what can be read.
64 :: ( Ord t, Exception e, MonadJob m t (Dual [event]) output
65 , ToJSON e, ToJSON event, ToJSON output
70 -> (env -> input -> Logger event -> IO (Either e output))
71 -> SJ.JobInput callbacks input
72 -> m (SJ.JobStatus 'SJ.Safe event)
73 newJob getenv jobkind f input = do
-- postCallback: for the callback URL in `input ^. SJ.job_callback` (if any),
-- run the callback client call against a client env built from
-- `jeManager je` and the URL's base_url.
76 let postCallback m = forM_ (input ^. SJ.job_callback) $ \url ->
77 C.runClientM (SJ.clientMCallback m)
78 (C.mkClientEnv (jeManager je) (url ^. SJ.base_url))
-- Post an event `e` to the callback (the enclosing definition is not visible).
81 postCallback (SJ.mkChanEvent e)
-- Run the job function; the Logger it receives wraps each event as a
-- singleton `Dual [event]` and hands it to `pushLog logF`.
85 r <- f env inp (pushLog logF . Dual . (:[]))
-- On failure: post the error to the callback, then rethrow it in IO.
87 Left e -> postCallback (SJ.mkChanError e) >> throwIO e
-- On success: post the result to the callback, then return it.
88 Right a -> postCallback (SJ.mkChanResult a) >> return a
-- Queue the wrapped job function f' with the actual job input.
90 jid <- queueJob jobkind (input ^. SJ.job_input) f'
-- Report the new job as pending, with an empty log list and no error.
91 return (SJ.JobStatus jid [] SJ.IsPending Nothing)
-- pollJob: builds the current SJ.JobStatus of a job from its JobEntry without
-- waiting for it; `limit` and `offset` are passed on to SJ.jobStatus.
-- NOTE(review): some signature lines and the header of the third case branch
-- are not visible in this extract.
94 :: MonadJob m t (Dual [event]) output
98 -> JobEntry (SJ.JobID 'SJ.Safe) (Dual [event]) output
99 -> m (SJ.JobStatus 'SJ.Safe event)
100 pollJob limit offset jid je = do
-- Derive (logs, status, optional error text) from the task state.
101 (Dual logs, status, merr) <- case jTask je of
-- Still queued: empty logs (mempty), pending, no error.
102 QueuedJ _ -> pure (mempty, SJ.IsPending, Nothing)
-- Running: read the logs via rjGetLog; status is running.
103 RunningJ rj -> (,,) <$> liftIO (rjGetLog rj)
104 <*> pure SJ.IsRunning
-- Remaining branch (its header, which binds `r`, is not visible):
-- a Left result maps to IsFailure, a Right result to IsFinished.
107 let st = either (const SJ.IsFailure) (const SJ.IsFinished) r
-- Error text is the `show` of the Left value; Nothing on success.
108 me = either (Just . T.pack . show) (const Nothing) r
110 pure $ SJ.jobStatus jid limit offset logs status merr
-- waitJob: obtains the job's result and returns it as SJ.JobOutput, or throws
-- `joberr (JobException e)` through MonadError when the result is a Left.
-- NOTE(review): several branch headers and the binding of `m` are not visible
-- in this extract.
113 :: (MonadError e m, MonadJob m t (Dual [event]) output)
116 -> JobEntry (SJ.JobID 'SJ.Safe) (Dual [event]) output
117 -> m (SJ.JobOutput output)
118 waitJob joberr jid je = do
119 r <- case jTask je of
-- Branch header not visible (presumably the queued case -- TODO confirm):
-- poll until the job has left the queue.
122 erj <- waitTilRunning
-- Job was already done when polling stopped: use its result directly.
124 Left res -> return res
-- Job is running: wait for completion; the returned logs are discarded.
126 (res, _logs) <- liftIO (waitJobDone jid rj m)
-- Same wait as above, in another branch whose header is not visible.
130 (res, _logs) <- liftIO (waitJobDone jid rj m)
-- Already done: use the stored result.
132 DoneJ _ls res -> return res
133 either (throwError . joberr . JobException) (pure . SJ.JobOutput) r
-- waitTilRunning: re-reads the job entry and yields Right for a running job
-- or Left with the result for a finished one.
135 where waitTilRunning =
136 findJob jid >>= \mjob -> case mjob of
-- NOTE(review): crashes with `error` if the job is no longer found.
137 Nothing -> error "impossible"
138 Just je' -> case jTask je' of
-- Branch header and the step after the delay are not visible (presumably the
-- still-queued case, retrying after the pause -- TODO confirm).
140 liftIO $ threadDelay 50000 -- wait 50ms
142 RunningJ rj -> return (Right rj)
143 DoneJ _ls res -> return (Left res)
-- killJob: stops/removes a job depending on its task state and returns the
-- resulting SJ.JobStatus; `limit` and `offset` are passed on to SJ.jobStatus.
-- NOTE(review): the case-branch headers and some signature lines are not
-- visible in this extract; the meaning of the False flag given to removeJob
-- cannot be determined from here.
146 :: (Ord t, MonadJob m t (Dual [event]) output)
151 -> JobEntry (SJ.JobID 'SJ.Safe) (Dual [event]) output
152 -> m (SJ.JobStatus 'SJ.Safe event)
153 killJob t limit offset jid je = do
154 (Dual logs, status, merr) <- case jTask je of
-- First visible branch (header and any preceding steps not visible):
-- reports IsKilled with empty logs and no error.
157 return (mempty, SJ.IsKilled, Nothing)
-- Branch using `rj` (presumably the running case -- TODO confirm): cancel the
-- job's async, read its logs, remove the job, report IsKilled.
159 liftIO $ cancel (rjAsync rj)
160 lgs <- liftIO (rjGetLog rj)
161 removeJob False t jid
162 return (lgs, SJ.IsKilled, Nothing)
-- Branch using `r` (header not visible): a Left result maps to IsFailure, a
-- Right result to IsFinished, the error text is the `show` of the Left value;
-- the job is then removed.
164 let st = either (const SJ.IsFailure) (const SJ.IsFinished) r
165 me = either (Just . T.pack . show) (const Nothing) r
166 removeJob False t jid
168 pure $ SJ.jobStatus jid limit offset logs status merr