]> Git — Sourcephile - gargantext.git/blob - src/Gargantext/Utils/Jobs/Internal.hs
impl: fix breaking changes with morpheus-graphql-core >=0.25
[gargantext.git] / src / Gargantext / Utils / Jobs / Internal.hs
1 {-# LANGUAGE ScopedTypeVariables #-}
2 {-# LANGUAGE TypeFamilies #-}
3 {-# LANGUAGE ViewPatterns #-}
4 module Gargantext.Utils.Jobs.Internal (
5 serveJobsAPI
6 -- * Internals for testing
7 , newJob
8 ) where
9
10 import Control.Concurrent
11 import Control.Concurrent.Async
12 import Control.Exception
13 import Control.Lens
14 import Control.Monad
15 import Control.Monad.Except
16 import Data.Aeson (ToJSON)
17 import Data.Foldable (toList)
18 import Data.Monoid
19 import Data.Kind (Type)
20 import Data.Sequence (Seq)
21 import qualified Data.Sequence as Seq
22 import Prelude
23 import Servant.API
24
25 import Gargantext.Utils.Jobs.Map
26 import Gargantext.Utils.Jobs.Monad
27
28 import qualified Data.Text as T
29 import qualified Servant.Client as C
30 import qualified Servant.Job.Async as SJ
31 import qualified Servant.Job.Client as SJ
32 import qualified Servant.Job.Types as SJ
33
34 serveJobsAPI
35 :: ( Ord t, Exception e, MonadError e m
36 , MonadJob m t (Seq event) output
37 , ToJSON e, ToJSON event, ToJSON output
38 , Foldable callback
39 , MimeRender JSON e
40 , MimeRender JSON output
41 )
42 => (SJ.JobID 'SJ.Safe -> LoggerM m event -> JobHandle m)
43 -> m env
44 -> t
45 -> (JobError -> e)
46 -> (env -> JobHandle m -> input -> IO (Either e output))
47 -> SJ.AsyncJobsServerT' ctI ctO callback event input output m
48 serveJobsAPI newJobHandle getenv t joberr f
49 = newJob newJobHandle getenv t f (SJ.JobInput undefined Nothing)
50 :<|> newJob newJobHandle getenv t f
51 :<|> serveJobAPI t joberr
52
53 serveJobAPI
54 :: forall (m :: Type -> Type) e t event output.
55 (Ord t, MonadError e m, MonadJob m t (Seq event) output)
56 => t
57 -> (JobError -> e)
58 -> SJ.JobID 'SJ.Unsafe
59 -> SJ.AsyncJobServerT event output m
60 serveJobAPI t joberr jid' = wrap' (killJob t)
61 :<|> wrap' pollJob
62 :<|> wrap (waitJob joberr)
63
64 where wrap
65 :: forall a.
66 (SJ.JobID 'SJ.Safe -> JobEntry (SJ.JobID 'SJ.Safe) (Seq event) output -> m a)
67 -> m a
68 wrap g = do
69 jid <- handleIDError joberr (checkJID jid')
70 job <- maybe (throwError $ joberr UnknownJob) pure =<< findJob jid
71 g jid job
72
73 wrap' g limit offset = wrap (g limit offset)
74
75 newJob
76 :: ( Ord t, Exception e, MonadJob m t (Seq event) output
77 , MimeRender JSON e
78 , MimeRender JSON output
79 , ToJSON e, ToJSON event, ToJSON output
80 , Foldable callbacks
81 )
82 => (SJ.JobID 'SJ.Safe -> LoggerM m event -> JobHandle m)
83 -> m env
84 -> t
85 -> (env -> JobHandle m -> input -> IO (Either e output))
86 -> SJ.JobInput callbacks input
87 -> m (SJ.JobStatus 'SJ.Safe event)
88 newJob newJobHandle getenv jobkind f input = do
89 je <- getJobEnv
90 env <- getenv
91 let postCallback m = forM_ (input ^. SJ.job_callback) $ \url ->
92 C.runClientM (SJ.clientMCallback m)
93 (C.mkClientEnv (jeManager je) (url ^. SJ.base_url))
94
95 pushLog logF = \w -> do
96 postCallback (SJ.mkChanEvent w)
97 logF w
98
99 f' jId inp logF = do
100 r <- f env (newJobHandle jId (liftIO . pushLog logF . Seq.singleton)) inp
101 case r of
102 Left e -> postCallback (SJ.mkChanError e) >> throwIO e
103 Right a -> postCallback (SJ.mkChanResult a) >> return a
104
105 jid <- queueJob jobkind (input ^. SJ.job_input) f'
106 return (SJ.JobStatus jid [] SJ.IsPending Nothing)
107
108 pollJob
109 :: MonadJob m t (Seq event) output
110 => Maybe SJ.Limit
111 -> Maybe SJ.Offset
112 -> SJ.JobID 'SJ.Safe
113 -> JobEntry (SJ.JobID 'SJ.Safe) (Seq event) output
114 -> m (SJ.JobStatus 'SJ.Safe event)
115 pollJob limit offset jid je = do
116 (logs, status, merr) <- case jTask je of
117 QueuedJ _ -> pure (mempty, SJ.IsPending, Nothing)
118 RunningJ rj -> (,,) <$> liftIO (rjGetLog rj)
119 <*> pure SJ.IsRunning
120 <*> pure Nothing
121 DoneJ ls r ->
122 let st = either (const SJ.IsFailure) (const SJ.IsFinished) r
123 me = either (Just . T.pack . show) (const Nothing) r
124 in pure (ls, st, me)
125 -- /NOTE/: We need to be careful with the ordering of the logs here:
126 -- we want to return the logs ordered from the newest to the oldest,
127 -- because the API will use 'limit' to show only the newest ones,
128 -- taking 'limit' of them from the front of the list.
129 --
130 -- Due to the fact we do not force any 'Ord' constraint on an 'event' type,
131 -- and it would be inefficient to reverse the list here, it's important
132 -- that the concrete implementation of 'rjGetLog' returns the logs in the
133 -- correct order.
134 pure $ SJ.jobStatus jid limit offset (toList logs) status merr
135
136 waitJob
137 :: (MonadError e m, MonadJob m t (Seq event) output)
138 => (JobError -> e)
139 -> SJ.JobID 'SJ.Safe
140 -> JobEntry (SJ.JobID 'SJ.Safe) (Seq event) output
141 -> m (SJ.JobOutput output)
142 waitJob joberr jid je = do
143 r <- case jTask je of
144 QueuedJ _qj -> do
145 m <- getJobsMap
146 erj <- waitTilRunning
147 case erj of
148 Left res -> return res
149 Right rj -> do
150 (res, _logs) <- liftIO (waitJobDone jid rj m)
151 return res
152 RunningJ rj -> do
153 m <- getJobsMap
154 (res, _logs) <- liftIO (waitJobDone jid rj m)
155 return res
156 DoneJ _ls res -> return res
157 either (throwError . joberr . JobException) (pure . SJ.JobOutput) r
158
159 where waitTilRunning =
160 findJob jid >>= \mjob -> case mjob of
161 Nothing -> error "impossible"
162 Just je' -> case jTask je' of
163 QueuedJ _qj -> do
164 liftIO $ threadDelay 50000 -- wait 50ms
165 waitTilRunning
166 RunningJ rj -> return (Right rj)
167 DoneJ _ls res -> return (Left res)
168
169 killJob
170 :: (Ord t, MonadJob m t (Seq event) output)
171 => t
172 -> Maybe SJ.Limit
173 -> Maybe SJ.Offset
174 -> SJ.JobID 'SJ.Safe
175 -> JobEntry (SJ.JobID 'SJ.Safe) (Seq event) output
176 -> m (SJ.JobStatus 'SJ.Safe event)
177 killJob t limit offset jid je = do
178 (logs, status, merr) <- case jTask je of
179 QueuedJ _ -> do
180 removeJob True t jid
181 return (mempty, SJ.IsKilled, Nothing)
182 RunningJ rj -> do
183 liftIO $ cancel (rjAsync rj)
184 lgs <- liftIO (rjGetLog rj)
185 removeJob False t jid
186 return (lgs, SJ.IsKilled, Nothing)
187 DoneJ lgs r -> do
188 let st = either (const SJ.IsFailure) (const SJ.IsFinished) r
189 me = either (Just . T.pack . show) (const Nothing) r
190 removeJob False t jid
191 pure (lgs, st, me)
192 -- /NOTE/: Same proviso as in 'pollJob' applies here.
193 pure $ SJ.jobStatus jid limit offset (toList logs) status merr