Git — Sourcephile - gargantext.git/blob - src/Gargantext/Utils/Jobs/Internal.hs
Pass a JobHandle to the serveJobsAPI continuation
[gargantext.git] / src / Gargantext / Utils / Jobs / Internal.hs
1 {-# LANGUAGE TypeFamilies, ScopedTypeVariables #-}
2 module Gargantext.Utils.Jobs.Internal (
3 serveJobsAPI
4 , JobHandle -- opaque
5 ) where
6
7 import Control.Concurrent
8 import Control.Concurrent.Async
9 import Control.Exception
10 import Control.Lens
11 import Control.Monad
12 import Control.Monad.Except
13 import Data.Aeson (ToJSON)
14 import Data.Monoid
15 import Data.Kind (Type)
16 import Prelude
17 import Servant.API
18
19 import Gargantext.Utils.Jobs.Map
20 import Gargantext.Utils.Jobs.Monad
21
22 import qualified Data.Text as T
23 import qualified Servant.Client as C
24 import qualified Servant.Job.Async as SJ
25 import qualified Servant.Job.Client as SJ
26 import qualified Servant.Job.Types as SJ
27
-- | Opaque reference to a job. It wraps the job's concrete (safe)
-- identifier so that code holding a handle does not depend on how jobs
-- are identified. The data constructor is intentionally not exported.
newtype JobHandle = JobHandle
  { _jh_id :: SJ.JobID 'SJ.Safe -- ^ Identifier of the job behind this handle.
  }
  deriving (Eq, Ord)
33
-- | Build the server for the asynchronous jobs API.
--
-- The result is made of three handlers, combined with ':<|>':
--
--   1. start a job that has no input payload and no callback;
--   2. start a job from a client-supplied 'SJ.JobInput';
--   3. the per-job endpoints provided by 'serveJobAPI'.
--
-- The first handler has no input to give to the job function. If the job
-- function forces its @input@ argument in that case, a descriptive 'error'
-- is raised (rather than an anonymous @undefined@) so the failure can be
-- traced back to this endpoint.
serveJobsAPI
  :: ( Ord t, Exception e, MonadError e m
     , MonadJob m t (Dual [event]) output
     , ToJSON e, ToJSON event, ToJSON output
     , Foldable callback
     )
  => m env
  -- ^ Action producing the environment handed to the job function.
  -> t
  -- ^ Kind of job to queue.
  -> (JobError -> e)
  -- ^ Embeds job-level errors into the server's error type.
  -> (env -> JobHandle -> input -> Logger event -> IO (Either e output))
  -- ^ The job itself.
  -> SJ.AsyncJobsServerT' ctI ctO callback event input output m
serveJobsAPI getenv t joberr f
     = newJob getenv t f (SJ.JobInput missingInput Nothing)
  :<|> newJob getenv t f
  :<|> serveJobAPI t joberr
  where
    -- Placeholder input for the handler that receives no payload; it only
    -- blows up if the job actually evaluates it.
    missingInput =
      error "serveJobsAPI: job started without an input payload tried to read its input"
49
-- | Handlers for the endpoints that act on a single, already submitted job:
-- kill, poll and wait (in that order).
--
-- Every handler first validates the client-supplied identifier with
-- 'checkJID' (errors are translated through 'handleIDError' and @joberr@)
-- and then looks the job up with 'findJob'. A missing job is reported as
-- @joberr UnknownJob@.
serveJobAPI
  :: forall (m :: Type -> Type) e t event output.
     (Ord t, MonadError e m, MonadJob m t (Dual [event]) output)
  => t
  -> (JobError -> e)
  -> SJ.JobID 'SJ.Unsafe
  -> SJ.AsyncJobServerT event output m
serveJobAPI t joberr jid' =
       withLimits (killJob t)
  :<|> withLimits pollJob
  :<|> withJob (waitJob joberr)
  where
    -- Resolve the identifier to a job entry and run the handler on it.
    withJob
      :: forall a.
         (SJ.JobID 'SJ.Safe -> JobEntry (SJ.JobID 'SJ.Safe) (Dual [event]) output -> m a)
      -> m a
    withJob handler = do
      safeId <- handleIDError joberr (checkJID jid')
      found <- findJob safeId
      case found of
        Nothing -> throwError (joberr UnknownJob)
        Just entry -> handler safeId entry

    -- Same as 'withJob', for handlers that first take a limit and an offset.
    withLimits handler limit offset = withJob (handler limit offset)
71
-- | Queue a new job of the given kind and report it as pending.
--
-- The job function is wrapped so that every logged event, as well as the
-- final error or result, is also posted to each callback URL found in the
-- job input. The outcome of those callback requests is discarded. When the
-- job function returns a 'Left', the error is rethrown with 'throwIO' after
-- the callbacks have been notified.
newJob
  :: ( Ord t, Exception e, MonadJob m t (Dual [event]) output
     , ToJSON e, ToJSON event, ToJSON output
     , Foldable callbacks
     )
  => m env
  -> t
  -> (env -> JobHandle -> input -> Logger event -> IO (Either e output))
  -> SJ.JobInput callbacks input
  -> m (SJ.JobStatus 'SJ.Safe event)
newJob getenv jobkind f input = do
  je <- getJobEnv
  env <- getenv
  let -- Send one message to every registered callback.
      notify msg =
        forM_ (input ^. SJ.job_callback) $ \cbUrl ->
          C.runClientM
            (SJ.clientMCallback msg)
            (C.mkClientEnv (jeManager je) (cbUrl ^. SJ.base_url))

      -- Notify the callbacks about an event, then log it.
      notifyAndLog logF evt = notify (SJ.mkChanEvent evt) >> logF evt

      -- The job as it is actually queued: the user function plus
      -- callback notifications.
      runJob jId inp logF =
        f env (JobHandle jId) inp (\evt -> notifyAndLog logF (Dual [evt]))
          >>= \outcome -> case outcome of
                Left err -> notify (SJ.mkChanError err) >> throwIO err
                Right res -> notify (SJ.mkChanResult res) >> return res

  jid <- queueJob jobkind (input ^. SJ.job_input) runJob
  pure $ SJ.JobStatus jid [] SJ.IsPending Nothing
101
-- | Report the current status of a job without modifying it.
--
-- A queued job is pending with no logs, a running job is reported with the
-- logs read through 'rjGetLog', and a finished job is reported as finished
-- or failed together with its stored logs. For a failed job the error is
-- rendered with 'show'.
pollJob
  :: MonadJob m t (Dual [event]) output
  => Maybe SJ.Limit
  -> Maybe SJ.Offset
  -> SJ.JobID 'SJ.Safe
  -> JobEntry (SJ.JobID 'SJ.Safe) (Dual [event]) output
  -> m (SJ.JobStatus 'SJ.Safe event)
pollJob limit offset jid je = do
  (Dual logs, status, merr) <- case jTask je of
    QueuedJ _ -> pure (mempty, SJ.IsPending, Nothing)
    RunningJ rj -> (\soFar -> (soFar, SJ.IsRunning, Nothing)) <$> liftIO (rjGetLog rj)
    DoneJ ls outcome -> pure (ls, finalState outcome, failureText outcome)
  pure (SJ.jobStatus jid limit offset logs status merr)
  where
    -- Status of a job that has already terminated.
    finalState = either (const SJ.IsFailure) (const SJ.IsFinished)
    -- Error message of a failed job, if any.
    failureText = either (Just . T.pack . show) (const Nothing)
120
-- | Block until the job is done, then return its output.
--
-- A job that is still queued is polled every 50ms until it has left the
-- queue, a running job is awaited through 'waitJobDone', and a finished job
-- yields its stored result immediately. If the job ended with an exception
-- @ex@, @joberr (JobException ex)@ is thrown in @m@.
--
-- If the job's entry can no longer be found while waiting for it to leave
-- the queue, @joberr UnknownJob@ is thrown, instead of crashing the handler
-- with a pure 'error' call.
waitJob
  :: (MonadError e m, MonadJob m t (Dual [event]) output)
  => (JobError -> e)
  -- ^ Embeds job-level errors into the caller's error type.
  -> SJ.JobID 'SJ.Safe
  -- ^ Identifier of the job to wait for.
  -> JobEntry (SJ.JobID 'SJ.Safe) (Dual [event]) output
  -- ^ The job's entry as it was when the request was received.
  -> m (SJ.JobOutput output)
waitJob joberr jid je = do
  r <- case jTask je of
    QueuedJ _qj -> do
      m <- getJobsMap
      erj <- waitTilRunning
      case erj of
        -- The job went from queued to done between two polls.
        Left res -> return res
        Right rj -> do
          (res, _logs) <- liftIO (waitJobDone jid rj m)
          return res
    RunningJ rj -> do
      m <- getJobsMap
      (res, _logs) <- liftIO (waitJobDone jid rj m)
      return res
    DoneJ _ls res -> return res
  either (throwError . joberr . JobException) (pure . SJ.JobOutput) r
  where
    -- Poll the job until it is no longer queued. Yields the running job, or
    -- directly the result if the job is already done.
    waitTilRunning =
      findJob jid >>= \mjob -> case mjob of
        -- The entry is gone (presumably removed concurrently, e.g. by a
        -- kill request on the queued job): report it as unknown.
        Nothing -> throwError (joberr UnknownJob)
        Just je' -> case jTask je' of
          QueuedJ _qj -> do
            liftIO $ threadDelay 50000 -- wait 50ms
            waitTilRunning
          RunningJ rj -> return (Right rj)
          DoneJ _ls res -> return (Left res)
153
-- | Kill a job and report its last known status.
--
-- In every case the job is removed with 'removeJob'. A queued job is
-- reported as killed with no logs. A running job has its async cancelled
-- first and is reported as killed with the logs read through 'rjGetLog'.
-- A job that is already done keeps its finished/failed status and its
-- stored logs.
killJob
  :: (Ord t, MonadJob m t (Dual [event]) output)
  => t
  -> Maybe SJ.Limit
  -> Maybe SJ.Offset
  -> SJ.JobID 'SJ.Safe
  -> JobEntry (SJ.JobID 'SJ.Safe) (Dual [event]) output
  -> m (SJ.JobStatus 'SJ.Safe event)
killJob t limit offset jid je = do
  (Dual logs, status, merr) <- case jTask je of
    QueuedJ _ ->
      removeJob True t jid >> return (mempty, SJ.IsKilled, Nothing)
    RunningJ rj -> do
      liftIO (cancel (rjAsync rj))
      collected <- liftIO (rjGetLog rj)
      removeJob False t jid
      return (collected, SJ.IsKilled, Nothing)
    DoneJ lgs outcome -> do
      removeJob False t jid
      pure (lgs, finalState outcome, failureText outcome)
  pure (SJ.jobStatus jid limit offset logs status merr)
  where
    -- Status of a job that had already terminated before the kill request.
    finalState = either (const SJ.IsFailure) (const SJ.IsFinished)
    -- Error message of a failed job, if any.
    failureText = either (Just . T.pack . show) (const Nothing)