]> Git — Sourcephile - gargantext.git/blob - src/Gargantext/Utils/Jobs/Internal.hs
Merge branch 'dev' into 175-dev-doc-table-count
[gargantext.git] / src / Gargantext / Utils / Jobs / Internal.hs
1 {-# LANGUAGE ScopedTypeVariables #-}
2 {-# LANGUAGE TypeFamilies #-}
3 {-# LANGUAGE ViewPatterns #-}
4 module Gargantext.Utils.Jobs.Internal (
5 serveJobsAPI
6 -- * Internals for testing
7 , newJob
8 ) where
9
10 import Control.Concurrent
11 import Control.Concurrent.Async
12 import Control.Exception
13 import Control.Lens
14 import Control.Monad
15 import Control.Monad.Except
16 import Data.Aeson (ToJSON)
17 import Data.Foldable (toList)
18 import Data.Monoid
19 import Data.Kind (Type)
20 import Data.Sequence (Seq)
21 import qualified Data.Sequence as Seq
22 import Prelude
23 import Servant.API
24
25 import Gargantext.Utils.Jobs.Map
26 import Gargantext.Utils.Jobs.Monad
27
28 import qualified Data.Text as T
29 import qualified Servant.Client as C
30 import qualified Servant.Job.Async as SJ
31 import qualified Servant.Job.Client as SJ
32 import qualified Servant.Job.Types as SJ
33
-- | Assemble the servant handlers for all asynchronous jobs of kind @t@.
--
-- The result combines three handlers with ':<|>', in this order:
--
-- 1. create a job from a fixed 'SJ.JobInput' that carries no callback and
--    no usable input (presumably the route without a request body —
--    confirm against the servant-job API type);
-- 2. create a job from a caller-supplied 'SJ.JobInput';
-- 3. the per-job handlers built by 'serveJobAPI'.
serveJobsAPI
  :: ( Ord t, Exception e, MonadError e m
     , MonadJob m t (Seq event) output
     , ToJSON e, ToJSON event, ToJSON output
     , Foldable callback
     )
  => m env
  -- ^ Action producing the environment passed to the job function.
  -> t
  -- ^ Job kind under which new jobs are queued.
  -> (JobError -> e)
  -- ^ Embeds job-management errors into the handler error type.
  -> (env -> JobHandle m event -> input -> IO (Either e output))
  -- ^ The job to run; 'Left' marks a failure, 'Right' carries the output.
  -> SJ.AsyncJobsServerT' ctI ctO callback event input output m
serveJobsAPI getenv t joberr f
     = newJob getenv t f (SJ.JobInput noInput Nothing)
  :<|> newJob getenv t f
  :<|> serveJobAPI t joberr
  where
    -- There is no input value to hand over in the first handler. Keep the
    -- placeholder a bottom (as before), but make the failure self-explanatory
    -- should a job function ever force it.
    noInput =
      error "serveJobsAPI: job was started without an input, so its input must not be evaluated"
49
-- | Handlers operating on one already-created job, identified by an
-- unchecked job ID.
--
-- The three handlers are, in order: kill ('killJob'), poll ('pollJob') and
-- wait ('waitJob'). Each of them first validates the ID and looks the job
-- up; an ID rejected by 'checkJID' is reported through 'handleIDError', and
-- an ID with no matching job fails with @joberr UnknownJob@.
serveJobAPI
  :: forall (m :: Type -> Type) e t event output.
     (Ord t, MonadError e m, MonadJob m t (Seq event) output)
  => t
  -- ^ Job kind, forwarded to 'killJob'.
  -> (JobError -> e)
  -- ^ Embeds job-management errors into the handler error type.
  -> SJ.JobID 'SJ.Unsafe
  -- ^ Job ID as received, not yet validated.
  -> SJ.AsyncJobServerT event output m
serveJobAPI t joberr unsafeJid
     = withJob' (killJob t)
  :<|> withJob' pollJob
  :<|> withJob (waitJob joberr)
  where
    -- Validate the ID, fetch the matching entry, then run the continuation.
    withJob
      :: forall a.
         (SJ.JobID 'SJ.Safe -> JobEntry (SJ.JobID 'SJ.Safe) (Seq event) output -> m a)
      -> m a
    withJob k = do
      safeJid <- handleIDError joberr (checkJID unsafeJid)
      found   <- findJob safeJid
      case found of
        Nothing    -> throwError (joberr UnknownJob)
        Just entry -> k safeJid entry

    -- Same as 'withJob', for handlers that take a limit and an offset first.
    withJob' k limit offset = withJob (k limit offset)
71
-- | Queue a new job of the given kind and report it as pending.
--
-- The queued action runs the supplied job function with a 'JobHandle' whose
-- logger first posts the event to every callback URL of the 'SJ.JobInput'
-- and then hands it to the queue-provided logger. When the job function
-- returns, its outcome is posted to the callbacks as well: a 'Left' is
-- posted as an error and rethrown with 'throwIO', a 'Right' is posted as a
-- result and returned.
--
-- The value returned by each callback request is discarded.
newJob
  :: ( Ord t, Exception e, MonadJob m t (Seq event) output
     , ToJSON e, ToJSON event, ToJSON output
     , Foldable callbacks
     )
  => m env
  -- ^ Action producing the environment passed to the job function.
  -> t
  -- ^ Job kind under which the job is queued.
  -> (env -> JobHandle m event -> input -> IO (Either e output))
  -- ^ The job to run.
  -> SJ.JobInput callbacks input
  -- ^ Job input together with the callback URLs to notify.
  -> m (SJ.JobStatus 'SJ.Safe event)
newJob getenv jobkind f input = do
  jobEnv <- getJobEnv
  env    <- getenv
  let -- Post one message to every registered callback URL.
      notify msg = forM_ (input ^. SJ.job_callback) $ \url ->
        C.runClientM
          (SJ.clientMCallback msg)
          (C.mkClientEnv (jeManager jobEnv) (url ^. SJ.base_url))

      -- Notify the callbacks about a log entry, then record it.
      pushLog logF w = notify (SJ.mkChanEvent w) >> logF w

      -- Continuations for the two possible job outcomes.
      onFailure e = notify (SJ.mkChanError e) >> throwIO e
      onSuccess a = notify (SJ.mkChanResult a) >> return a

      -- The action actually handed to the queue.
      runJob jId inp logF =
        f env (mkJobHandle jId (liftIO . pushLog logF . Seq.singleton)) inp
          >>= either onFailure onSuccess

  jid <- queueJob jobkind (input ^. SJ.job_input) runJob
  return (SJ.JobStatus jid [] SJ.IsPending Nothing)
101
-- | Report the current status of a job without modifying it.
--
-- * queued: no logs, 'SJ.IsPending';
-- * running: the logs read through 'rjGetLog', 'SJ.IsRunning';
-- * done: the stored logs, with 'SJ.IsFinished' for a 'Right' outcome, or
--   'SJ.IsFailure' plus the 'show'n error for a 'Left' one.
--
-- The limit and offset are passed on unchanged to 'SJ.jobStatus'.
pollJob
  :: MonadJob m t (Seq event) output
  => Maybe SJ.Limit
  -> Maybe SJ.Offset
  -> SJ.JobID 'SJ.Safe
  -> JobEntry (SJ.JobID 'SJ.Safe) (Seq event) output
  -> m (SJ.JobStatus 'SJ.Safe event)
pollJob limit offset jid je = do
  (logs, status, merr) <- snapshot
  pure $ SJ.jobStatus jid limit offset (toList logs) status merr
  where
    -- Logs, state and optional error message for the job's current task.
    snapshot = case jTask je of
      QueuedJ _ ->
        pure (mempty, SJ.IsPending, Nothing)
      RunningJ rj -> do
        soFar <- liftIO (rjGetLog rj)
        pure (soFar, SJ.IsRunning, Nothing)
      DoneJ stored outcome ->
        pure ( stored
             , either (const SJ.IsFailure) (const SJ.IsFinished) outcome
             , either (Just . T.pack . show) (const Nothing) outcome
             )
120
-- | Block until the job has finished and return its output.
--
-- * A queued job is polled every 50ms until it is running or done, then
--   waited on.
-- * A running job is waited on via 'waitJobDone'.
-- * A finished job yields its stored outcome immediately.
--
-- A 'Left' outcome is rethrown as @joberr (JobException err)@. If the job
-- can no longer be found while it is being polled (for instance after
-- 'killJob' removed it while still queued), the call fails with
-- @joberr UnknownJob@ rather than crashing.
waitJob
  :: (MonadError e m, MonadJob m t (Seq event) output)
  => (JobError -> e)
  -- ^ Embeds job-management errors into the handler error type.
  -> SJ.JobID 'SJ.Safe
  -> JobEntry (SJ.JobID 'SJ.Safe) (Seq event) output
  -> m (SJ.JobOutput output)
waitJob joberr jid je = do
  r <- case jTask je of
    QueuedJ _qj -> do
      jobsMap <- getJobsMap
      erj <- waitTilRunning
      case erj of
        Left res -> return res
        Right rj -> awaitDone jobsMap rj
    RunningJ rj -> do
      jobsMap <- getJobsMap
      awaitDone jobsMap rj
    DoneJ _ls res -> return res
  either (throwError . joberr . JobException) (pure . SJ.JobOutput) r
  where
    -- Wait for a running job to complete; its logs are not needed here.
    awaitDone jobsMap rj = do
      (res, _logs) <- liftIO (waitJobDone jid rj jobsMap)
      return res

    -- Poll the job until it has left the queue. 'Left' means it already
    -- finished, 'Right' hands back the running job.
    waitTilRunning = do
      mjob <- findJob jid
      case mjob of
        -- The entry vanished between two polls: report it as an ordinary
        -- "unknown job" error instead of calling 'error'.
        Nothing -> throwError (joberr UnknownJob)
        Just je' -> case jTask je' of
          QueuedJ _qj -> do
            liftIO $ threadDelay 50000 -- wait 50ms
            waitTilRunning
          RunningJ rj -> return (Right rj)
          DoneJ _ls res -> return (Left res)
153
-- | Stop a job, remove it, and report its final status.
--
-- * queued: the job is removed and reported as 'SJ.IsKilled' with no logs;
-- * running: its async is cancelled, the logs are read, the job is removed
--   and reported as 'SJ.IsKilled';
-- * done: the job is removed and reported with its stored logs and its real
--   outcome ('SJ.IsFinished', or 'SJ.IsFailure' plus the 'show'n error).
--
-- NOTE(review): 'removeJob' receives 'True' only for queued jobs —
-- presumably a flag telling it the job is still in the queue; confirm
-- against its definition.
killJob
  :: (Ord t, MonadJob m t (Seq event) output)
  => t
  -> Maybe SJ.Limit
  -> Maybe SJ.Offset
  -> SJ.JobID 'SJ.Safe
  -> JobEntry (SJ.JobID 'SJ.Safe) (Seq event) output
  -> m (SJ.JobStatus 'SJ.Safe event)
killJob t limit offset jid je = do
  (logs, status, merr) <- terminate
  pure $ SJ.jobStatus jid limit offset (toList logs) status merr
  where
    -- Perform the removal appropriate to the job's state and collect what
    -- is needed to build the status report.
    terminate = case jTask je of
      QueuedJ _ -> do
        removeJob True t jid
        return (mempty, SJ.IsKilled, Nothing)
      RunningJ rj -> do
        -- Cancel first, then read whatever was logged up to that point.
        collected <- liftIO (cancel (rjAsync rj) >> rjGetLog rj)
        removeJob False t jid
        return (collected, SJ.IsKilled, Nothing)
      DoneJ stored outcome -> do
        removeJob False t jid
        pure ( stored
             , either (const SJ.IsFailure) (const SJ.IsFinished) outcome
             , either (Just . T.pack . show) (const Nothing) outcome
             )