1 {-# LANGUAGE TypeFamilies, ScopedTypeVariables #-}
2 module Gargantext.Utils.Jobs.API where
4 import Control.Concurrent
5 import Control.Concurrent.Async
6 import Control.Exception
9 import Control.Monad.Except
10 import Data.Aeson (ToJSON)
15 import Gargantext.Utils.Jobs.Map
16 import Gargantext.Utils.Jobs.Monad
18 import qualified Data.Text as T
19 import qualified Servant.Client as C
20 import qualified Servant.Job.Async as SJ
21 import qualified Servant.Job.Client as SJ
22 import qualified Servant.Job.Types as SJ
-- serveJobsAPI: type signature (partially shown; the embedded line numbers
-- skip, so some argument types are not visible here) followed by its body.
25 :: ( Ord t, Exception e, MonadError e m
26 , MonadJob m t (Dual [event]) output
27 , ToJSON e, ToJSON event, ToJSON output
-- The job action: given an environment, the job input and a logger for
-- events, it runs in IO and yields either an error `e` or the `output`.
33 -> (env -> input -> Logger event -> IO (Either e output))
34 -> SJ.AsyncJobsServerT' ctI ctO callback event input output m
-- The server is three handlers combined with `:<|>`:
--   1. `newJob` applied to a fixed `SJ.JobInput undefined Nothing`,
--   2. `newJob` taking its job input from the caller,
--   3. the per-job handlers built by `serveJobAPI`.
35 serveJobsAPI getenv t joberr f
-- NOTE(review): the first field of this JobInput is `undefined`; if that
-- field is the job payload and it is ever forced, this handler will crash --
-- confirm against the SJ.JobInput definition and the intended route.
36 = newJob getenv t f (SJ.JobInput undefined Nothing)
37 :<|> newJob getenv t f
38 :<|> serveJobAPI t joberr
-- serveJobAPI: type signature (partially shown) and body of the handlers
-- that operate on a single job identified by an unchecked job ID.
41 :: forall (m :: * -> *) e t event output.
42 (Ord t, MonadError e m, MonadJob m t (Dual [event]) output)
-- The job ID arrives tagged 'SJ.Unsafe; it is checked inside `wrap` below.
45 -> SJ.JobID 'SJ.Unsafe
46 -> SJ.AsyncJobServerT event output m
-- Handlers are combined with `:<|>`; `killJob` and `waitJob` are visible
-- here (a line between them is missing from this excerpt).
47 serveJobAPI t joberr jid' = wrap' (killJob t)
49 :<|> wrap (waitJob joberr)
-- Type of the handler accepted by `wrap`: it receives the checked
-- ('SJ.Safe) job ID and the job's entry.
53 (SJ.JobID 'SJ.Safe -> JobEntry (SJ.JobID 'SJ.Safe) (Dual [event]) output -> m a)
-- Check the incoming ID with `checkJID`; `handleIDError joberr` handles a
-- failed check.
56 jid <- handleIDError joberr (checkJID jid')
-- Look the job up with `findJob`; when it yields Nothing, raise
-- `joberr UnknownJob` through `throwError`.
57 job <- maybe (throwError $ joberr UnknownJob) pure =<< findJob jid
-- `wrap'` adapts handlers that take `limit` and `offset` before the job ID
-- and entry, by applying those two arguments and delegating to `wrap`.
60 wrap' g limit offset = wrap (g limit offset)
-- newJob: type signature (partially shown) and body. Queues a new job and
-- returns its initial status.
63 :: ( Ord t, Exception e, MonadJob m t (Dual [event]) output
64 , ToJSON e, ToJSON event, ToJSON output
-- The job action: environment, job input and event logger in; an IO action
-- producing either an error `e` or the `output`.
69 -> (env -> input -> Logger event -> IO (Either e output))
70 -> SJ.JobInput callbacks input
71 -> m (SJ.JobStatus 'SJ.Safe event)
72 newJob getenv jobkind f input = do
-- `postCallback m` iterates (with `forM_`) over the `job_callback` field of
-- the job input and, for each URL found, runs `SJ.clientMCallback m` against
-- a client environment built from `jeManager je` and the URL's `base_url`.
-- (`je` is bound on a line not visible in this excerpt.)
75 let postCallback m = forM_ (input ^. SJ.job_callback) $ \url ->
76 C.runClientM (SJ.clientMCallback m)
77 (C.mkClientEnv (jeManager je) (url ^. SJ.base_url))
-- Post an event `e` to the callback as a channel event.
80 postCallback (SJ.mkChanEvent e)
-- Run the user-supplied action; the logger it receives wraps each event as a
-- singleton list inside `Dual` and hands it to `pushLog logF`.
84 r <- f env inp (pushLog logF . Dual . (:[]))
-- On failure: post the error to the callback, then rethrow it with `throwIO`.
86 Left e -> postCallback (SJ.mkChanError e) >> throwIO e
-- On success: post the result to the callback and return it.
87 Right a -> postCallback (SJ.mkChanResult a) >> return a
-- Queue the wrapped action `f'` under `jobkind` with the input's `job_input`
-- field, and report the new job with an empty log list and a pending state.
89 jid <- queueJob jobkind (input ^. SJ.job_input) f'
90 return (SJ.JobStatus jid [] SJ.IsPending Nothing)
-- pollJob: type signature (partially shown) and body. Builds the current
-- status of a job from its entry.
93 :: MonadJob m t (Dual [event]) output
97 -> JobEntry (SJ.JobID 'SJ.Safe) (Dual [event]) output
98 -> m (SJ.JobStatus 'SJ.Safe event)
99 pollJob limit offset jid je = do
-- Derive (logs, status, optional error text) from the task state of the entry.
100 (Dual logs, status, merr) <- case jTask je of
-- Queued: no logs, pending status, no error.
101 QueuedJ _ -> pure (mempty, SJ.IsPending, Nothing)
-- Running: read the current logs through `rjGetLog`; status is running.
102 RunningJ rj -> (,,) <$> liftIO (rjGetLog rj)
103 <*> pure SJ.IsRunning
-- For a result `r` (its binding is on a line not visible here): a Left maps
-- to a failure status and its `show`n text as the error; a Right maps to a
-- finished status and no error.
106 let st = either (const SJ.IsFailure) (const SJ.IsFinished) r
107 me = either (Just . T.pack . show) (const Nothing) r
-- Assemble the response with `SJ.jobStatus`, passing `limit` and `offset`
-- along with the logs.
109 pure $ SJ.jobStatus jid limit offset logs status merr
-- waitJob: type signature (partially shown) and body. Obtains the final
-- result of a job and returns its output, or raises the job's failure.
112 :: (MonadError e m, MonadJob m t (Dual [event]) output)
115 -> JobEntry (SJ.JobID 'SJ.Safe) (Dual [event]) output
116 -> m (SJ.JobOutput output)
117 waitJob joberr jid je = do
-- Obtain the result `r` according to the task state of the entry. Several
-- case-alternative heads are missing from this excerpt.
118 r <- case jTask je of
-- `waitTilRunning` yields either an already-available result (Left) or a
-- running job handle (Right).
121 erj <- waitTilRunning
123 Left res -> return res
-- Wait for the running job to complete via `waitJobDone`; the logs it
-- returns are discarded. (`m` is bound on a line not visible here.)
125 (res, _logs) <- liftIO (waitJobDone jid rj m)
129 (res, _logs) <- liftIO (waitJobDone jid rj m)
-- Already done: use the stored result.
131 DoneJ _ls res -> return res
-- A Left result is raised as `joberr (JobException ...)` via `throwError`;
-- a Right result is wrapped in `SJ.JobOutput`.
132 either (throwError . joberr . JobException) (pure . SJ.JobOutput) r
-- `waitTilRunning` looks the job up again with `findJob` and inspects its
-- task state.
134 where waitTilRunning =
135 findJob jid >>= \mjob -> case mjob of
-- NOTE(review): this calls `error` if the job is no longer found; confirm the
-- job cannot be removed (e.g. by `removeJob`) while a waiter is polling.
136 Nothing -> error "impossible"
137 Just je' -> case jTask je' of
-- Presumably the still-queued branch: pause before checking again (the
-- surrounding lines are not visible) -- TODO confirm.
139 liftIO $ threadDelay 50000 -- wait 50ms
141 RunningJ rj -> return (Right rj)
142 DoneJ _ls res -> return (Left res)
-- killJob: type signature (partially shown) and body. Stops or discards a
-- job and reports its resulting status.
145 :: (Ord t, MonadJob m t (Dual [event]) output)
150 -> JobEntry (SJ.JobID 'SJ.Safe) (Dual [event]) output
151 -> m (SJ.JobStatus 'SJ.Safe event)
152 killJob t limit offset jid je = do
-- Derive (logs, status, optional error text) from the task state of the
-- entry. The case-alternative heads are missing from this excerpt.
153 (Dual logs, status, merr) <- case jTask je of
-- No logs, killed status, no error (presumably the queued branch -- TODO
-- confirm against the full file).
156 return (mempty, SJ.IsKilled, Nothing)
-- Running job: cancel its async task, read the logs collected so far, remove
-- the job with `removeJob False t jid`, and report it as killed.
158 liftIO $ cancel (rjAsync rj)
159 lgs <- liftIO (rjGetLog rj)
160 removeJob False t jid
161 return (lgs, SJ.IsKilled, Nothing)
-- For a result `r` (its binding is on a line not visible here): a Left maps
-- to a failure status and its `show`n text as the error; a Right maps to a
-- finished status and no error. The job is then removed.
163 let st = either (const SJ.IsFailure) (const SJ.IsFinished) r
164 me = either (Just . T.pack . show) (const Nothing) r
165 removeJob False t jid
-- Assemble the response with `SJ.jobStatus`, passing `limit` and `offset`
-- along with the logs.
167 pure $ SJ.jobStatus jid limit offset logs status merr