]> Git — Sourcephile - gargantext.git/blob - src/Gargantext/Utils/Jobs/Internal.hs
Merge branch 'dev' into 193-dev-api-query-dev-fix
[gargantext.git] / src / Gargantext / Utils / Jobs / Internal.hs
1 {-# LANGUAGE ScopedTypeVariables #-}
2 {-# LANGUAGE TypeFamilies #-}
3 {-# LANGUAGE ViewPatterns #-}
4 module Gargantext.Utils.Jobs.Internal (
5 serveJobsAPI
6 -- * Internals for testing
7 , newJob
8 ) where
9
10 import Control.Concurrent
11 import Control.Concurrent.Async
12 import Control.Exception
13 import Control.Lens
14 import Control.Monad
15 import Control.Monad.Except
16 import Data.Aeson (ToJSON)
17 import Data.Foldable (toList)
18 import Data.Monoid
19 import Data.Kind (Type)
20 import Data.Sequence (Seq)
21 import qualified Data.Sequence as Seq
22 import Prelude
23 import Servant.API
24
25 import Gargantext.Utils.Jobs.Map
26 import Gargantext.Utils.Jobs.Monad
27
28 import qualified Data.Text as T
29 import qualified Servant.Client as C
30 import qualified Servant.Job.Async as SJ
31 import qualified Servant.Job.Client as SJ
32 import qualified Servant.Job.Types as SJ
33
34 serveJobsAPI
35 :: ( Ord t, Exception e, MonadError e m
36 , MonadJob m t (Seq event) output
37 , ToJSON e, ToJSON event, ToJSON output
38 , Foldable callback
39 )
40 => (SJ.JobID 'SJ.Safe -> LoggerM m event -> JobHandle m)
41 -> m env
42 -> t
43 -> (JobError -> e)
44 -> (env -> JobHandle m -> input -> IO (Either e output))
45 -> SJ.AsyncJobsServerT' ctI ctO callback event input output m
46 serveJobsAPI newJobHandle getenv t joberr f
47 = newJob newJobHandle getenv t f (SJ.JobInput undefined Nothing)
48 :<|> newJob newJobHandle getenv t f
49 :<|> serveJobAPI t joberr
50
51 serveJobAPI
52 :: forall (m :: Type -> Type) e t event output.
53 (Ord t, MonadError e m, MonadJob m t (Seq event) output)
54 => t
55 -> (JobError -> e)
56 -> SJ.JobID 'SJ.Unsafe
57 -> SJ.AsyncJobServerT event output m
58 serveJobAPI t joberr jid' = wrap' (killJob t)
59 :<|> wrap' pollJob
60 :<|> wrap (waitJob joberr)
61
62 where wrap
63 :: forall a.
64 (SJ.JobID 'SJ.Safe -> JobEntry (SJ.JobID 'SJ.Safe) (Seq event) output -> m a)
65 -> m a
66 wrap g = do
67 jid <- handleIDError joberr (checkJID jid')
68 job <- maybe (throwError $ joberr UnknownJob) pure =<< findJob jid
69 g jid job
70
71 wrap' g limit offset = wrap (g limit offset)
72
73 newJob
74 :: ( Ord t, Exception e, MonadJob m t (Seq event) output
75 , ToJSON e, ToJSON event, ToJSON output
76 , Foldable callbacks
77 )
78 => (SJ.JobID 'SJ.Safe -> LoggerM m event -> JobHandle m)
79 -> m env
80 -> t
81 -> (env -> JobHandle m -> input -> IO (Either e output))
82 -> SJ.JobInput callbacks input
83 -> m (SJ.JobStatus 'SJ.Safe event)
84 newJob newJobHandle getenv jobkind f input = do
85 je <- getJobEnv
86 env <- getenv
87 let postCallback m = forM_ (input ^. SJ.job_callback) $ \url ->
88 C.runClientM (SJ.clientMCallback m)
89 (C.mkClientEnv (jeManager je) (url ^. SJ.base_url))
90
91 pushLog logF = \w -> do
92 postCallback (SJ.mkChanEvent w)
93 logF w
94
95 f' jId inp logF = do
96 r <- f env (newJobHandle jId (liftIO . pushLog logF . Seq.singleton)) inp
97 case r of
98 Left e -> postCallback (SJ.mkChanError e) >> throwIO e
99 Right a -> postCallback (SJ.mkChanResult a) >> return a
100
101 jid <- queueJob jobkind (input ^. SJ.job_input) f'
102 return (SJ.JobStatus jid [] SJ.IsPending Nothing)
103
104 pollJob
105 :: MonadJob m t (Seq event) output
106 => Maybe SJ.Limit
107 -> Maybe SJ.Offset
108 -> SJ.JobID 'SJ.Safe
109 -> JobEntry (SJ.JobID 'SJ.Safe) (Seq event) output
110 -> m (SJ.JobStatus 'SJ.Safe event)
111 pollJob limit offset jid je = do
112 (logs, status, merr) <- case jTask je of
113 QueuedJ _ -> pure (mempty, SJ.IsPending, Nothing)
114 RunningJ rj -> (,,) <$> liftIO (rjGetLog rj)
115 <*> pure SJ.IsRunning
116 <*> pure Nothing
117 DoneJ ls r ->
118 let st = either (const SJ.IsFailure) (const SJ.IsFinished) r
119 me = either (Just . T.pack . show) (const Nothing) r
120 in pure (ls, st, me)
121 pure $ SJ.jobStatus jid limit offset (toList logs) status merr
122
123 waitJob
124 :: (MonadError e m, MonadJob m t (Seq event) output)
125 => (JobError -> e)
126 -> SJ.JobID 'SJ.Safe
127 -> JobEntry (SJ.JobID 'SJ.Safe) (Seq event) output
128 -> m (SJ.JobOutput output)
129 waitJob joberr jid je = do
130 r <- case jTask je of
131 QueuedJ _qj -> do
132 m <- getJobsMap
133 erj <- waitTilRunning
134 case erj of
135 Left res -> return res
136 Right rj -> do
137 (res, _logs) <- liftIO (waitJobDone jid rj m)
138 return res
139 RunningJ rj -> do
140 m <- getJobsMap
141 (res, _logs) <- liftIO (waitJobDone jid rj m)
142 return res
143 DoneJ _ls res -> return res
144 either (throwError . joberr . JobException) (pure . SJ.JobOutput) r
145
146 where waitTilRunning =
147 findJob jid >>= \mjob -> case mjob of
148 Nothing -> error "impossible"
149 Just je' -> case jTask je' of
150 QueuedJ _qj -> do
151 liftIO $ threadDelay 50000 -- wait 50ms
152 waitTilRunning
153 RunningJ rj -> return (Right rj)
154 DoneJ _ls res -> return (Left res)
155
156 killJob
157 :: (Ord t, MonadJob m t (Seq event) output)
158 => t
159 -> Maybe SJ.Limit
160 -> Maybe SJ.Offset
161 -> SJ.JobID 'SJ.Safe
162 -> JobEntry (SJ.JobID 'SJ.Safe) (Seq event) output
163 -> m (SJ.JobStatus 'SJ.Safe event)
164 killJob t limit offset jid je = do
165 (logs, status, merr) <- case jTask je of
166 QueuedJ _ -> do
167 removeJob True t jid
168 return (mempty, SJ.IsKilled, Nothing)
169 RunningJ rj -> do
170 liftIO $ cancel (rjAsync rj)
171 lgs <- liftIO (rjGetLog rj)
172 removeJob False t jid
173 return (lgs, SJ.IsKilled, Nothing)
174 DoneJ lgs r -> do
175 let st = either (const SJ.IsFailure) (const SJ.IsFinished) r
176 me = either (Just . T.pack . show) (const Nothing) r
177 removeJob False t jid
178 pure (lgs, st, me)
179 pure $ SJ.jobStatus jid limit offset (toList logs) status merr