Git — Sourcephile - gargantext.git/blob - src/Gargantext/Utils/Jobs/API.hs
[FIX] numRunners to 1 by default for now
[gargantext.git] / src / Gargantext / Utils / Jobs / API.hs
1 {-# LANGUAGE TypeFamilies, ScopedTypeVariables #-}
2 module Gargantext.Utils.Jobs.API where
3
4 import Control.Concurrent
5 import Control.Concurrent.Async
6 import Control.Exception
7 import Control.Lens
8 import Control.Monad
9 import Control.Monad.Except
10 import Data.Aeson (ToJSON)
11 import Data.Monoid
12 import Data.Kind (Type)
13 import Prelude
14 import Servant.API
15
16 import Gargantext.Utils.Jobs.Map
17 import Gargantext.Utils.Jobs.Monad
18
19 import qualified Data.Text as T
20 import qualified Servant.Client as C
21 import qualified Servant.Job.Async as SJ
22 import qualified Servant.Job.Client as SJ
23 import qualified Servant.Job.Types as SJ
24
-- | Assemble the handlers of the whole asynchronous-jobs API for jobs of
--   kind @t@.
--
--   Three handlers are combined with ':<|>', in this order:
--
--   1. one that queues a job from an input built in place as
--      @SJ.JobInput undefined Nothing@;
--   2. one that queues a job from the 'SJ.JobInput' it is given;
--   3. the per-job handlers produced by 'serveJobAPI'.
--
--   NOTE(review): the first handler uses @undefined@ as the job input, so a
--   job function that forces its input when started through that handler
--   will hit a bottom value. Confirm this endpoint is only used with jobs
--   that ignore their input.
serveJobsAPI
  :: ( Ord t, Exception e, MonadError e m
     , MonadJob m t (Dual [event]) output
     , ToJSON e, ToJSON event, ToJSON output
     , Foldable callback
     )
  => m env              -- ^ action yielding the environment given to the job function
  -> t                  -- ^ kind of the jobs served
  -> (JobError -> e)    -- ^ embeds job errors into the handler's error type
  -> (env -> input -> Logger event -> IO (Either e output))
                        -- ^ the job function itself
  -> SJ.AsyncJobsServerT' ctI ctO callback event input output m
serveJobsAPI getenv t joberr f =
  queueWithoutInput :<|> queueWithInput :<|> serveJobAPI t joberr
  where
    -- Kept as two separate bindings: each one instantiates the callback
    -- container of 'newJob' differently.
    queueWithoutInput = newJob getenv t f (SJ.JobInput undefined Nothing)
    queueWithInput    = newJob getenv t f
40
-- | Handlers acting on one already-submitted job: kill, poll and wait,
--   combined with ':<|>' in that order.
--
--   Every handler first validates the client-supplied (unsafe) job
--   identifier with 'checkJID' (failures go through 'handleIDError'), then
--   looks the job up with 'findJob'. A job that cannot be found is reported
--   as @joberr UnknownJob@ via 'throwError'.
serveJobAPI
  :: forall (m :: Type -> Type) e t event output.
     (Ord t, MonadError e m, MonadJob m t (Dual [event]) output)
  => t                      -- ^ kind of the job
  -> (JobError -> e)        -- ^ embeds job errors into the handler's error type
  -> SJ.JobID 'SJ.Unsafe    -- ^ identifier as received from the client
  -> SJ.AsyncJobServerT event output m
serveJobAPI t joberr jid' = killHandler :<|> pollHandler :<|> waitHandler
  where
    killHandler limit offset = withJob (killJob t limit offset)
    pollHandler limit offset = withJob (pollJob limit offset)
    waitHandler              = withJob (waitJob joberr)

    -- Resolve the unsafe identifier to a checked one plus its job entry,
    -- then hand both to the continuation.
    withJob
      :: forall a.
         (SJ.JobID 'SJ.Safe -> JobEntry (SJ.JobID 'SJ.Safe) (Dual [event]) output -> m a)
      -> m a
    withJob k = do
      jid   <- handleIDError joberr (checkJID jid')
      found <- findJob jid
      case found of
        Nothing  -> throwError (joberr UnknownJob)
        Just job -> k jid job
62
-- | Queue a new job of the given kind and return its initial status
--   (pending, with an empty log).
--
--   The job function is wrapped so that, for every callback URL found in the
--   job input, a message is posted:
--
--   * each time the job logs an event;
--   * when the job ends with @Left e@ (the error is then rethrown with
--     'throwIO');
--   * when the job ends with @Right a@ (the value is then returned).
--
--   The results of the callback requests themselves are discarded.
newJob
  :: ( Ord t, Exception e, MonadJob m t (Dual [event]) output
     , ToJSON e, ToJSON event, ToJSON output
     , Foldable callbacks
     )
  => m env                          -- ^ action yielding the job environment
  -> t                              -- ^ kind of the job
  -> (env -> input -> Logger event -> IO (Either e output))
                                    -- ^ the job function
  -> SJ.JobInput callbacks input    -- ^ job input together with its callbacks
  -> m (SJ.JobStatus 'SJ.Safe event)
newJob getenv jobkind f input = do
  je  <- getJobEnv
  env <- getenv
  let -- Post one message to every registered callback URL.
      notify msg = forM_ (input ^. SJ.job_callback) $ \url ->
        C.runClientM
          (SJ.clientMCallback msg)
          (C.mkClientEnv (jeManager je) (url ^. SJ.base_url))

      -- Notify the callbacks first, then record the entry in the job's log.
      notifyThenLog logF entry = do
        notify (SJ.mkChanEvent entry)
        logF entry

      -- The job as actually queued: same computation as @f@, with callback
      -- notifications added around logging and termination.
      runJob inp logF = do
        outcome <- f env inp (\ev -> notifyThenLog logF (Dual [ev]))
        either
          (\err -> notify (SJ.mkChanError err) >> throwIO err)
          (\res -> notify (SJ.mkChanResult res) >> return res)
          outcome

  jid <- queueJob jobkind (input ^. SJ.job_input) runJob
  pure (SJ.JobStatus jid [] SJ.IsPending Nothing)
92
-- | Report the current status of a job without affecting it.
--
--   * queued job: pending, empty log, no error;
--   * running job: running, with the log read from the running job;
--   * finished job: its stored log, and either a failure status with the
--     'show'n error or a finished status with no error.
--
--   The limit and offset are passed on to 'SJ.jobStatus' together with the
--   log.
pollJob
  :: MonadJob m t (Dual [event]) output
  => Maybe SJ.Limit
  -> Maybe SJ.Offset
  -> SJ.JobID 'SJ.Safe
  -> JobEntry (SJ.JobID 'SJ.Safe) (Dual [event]) output
  -> m (SJ.JobStatus 'SJ.Safe event)
pollJob limit offset jid je = do
  (Dual logs, status, merr) <- case jTask je of
    QueuedJ _ ->
      pure (mempty, SJ.IsPending, Nothing)
    RunningJ rj -> do
      soFar <- liftIO (rjGetLog rj)
      pure (soFar, SJ.IsRunning, Nothing)
    DoneJ finalLogs outcome ->
      pure ( finalLogs
           , either (const SJ.IsFailure) (const SJ.IsFinished) outcome
           , either (Just . T.pack . show) (const Nothing) outcome
           )
  pure (SJ.jobStatus jid limit offset logs status merr)
111
-- | Block until the job has finished and return its output.
--
--   * A job that is already done yields its stored result immediately.
--   * A running job is awaited with 'waitJobDone'.
--   * A queued job is polled (every 50ms) until it is running or done, then
--     handled as above.
--
--   A job that ended with an exception is reported as
--   @joberr (JobException ex)@ via 'throwError'.
--
--   If the job can no longer be found while waiting for it to leave the
--   queue (for instance because it was removed by 'killJob' in the
--   meantime), @joberr UnknownJob@ is thrown instead of crashing with a
--   pure 'error'.
waitJob
  :: (MonadError e m, MonadJob m t (Dual [event]) output)
  => (JobError -> e)        -- ^ embeds job errors into the handler's error type
  -> SJ.JobID 'SJ.Safe      -- ^ identifier of the job to wait for
  -> JobEntry (SJ.JobID 'SJ.Safe) (Dual [event]) output
                            -- ^ entry of that job as found by the caller
  -> m (SJ.JobOutput output)
waitJob joberr jid je = do
  r <- case jTask je of
    QueuedJ _qj -> do
      m   <- getJobsMap
      erj <- waitTilRunning
      case erj of
        Left res -> return res
        Right rj -> awaitDone m rj
    RunningJ rj -> do
      m <- getJobsMap
      awaitDone m rj
    DoneJ _ls res -> return res
  either (throwError . joberr . JobException) (pure . SJ.JobOutput) r

  where
    -- Delay between two looks at a job that is still queued (microseconds).
    pollIntervalMicros :: Int
    pollIntervalMicros = 50000 -- 50ms

    -- Wait for a running job to complete; its logs are not needed here.
    awaitDone m rj = do
      (res, _logs) <- liftIO (waitJobDone jid rj m)
      return res

    -- Re-read the job entry until it has left the queue. Yields the running
    -- job ('Right') or, if it already finished, its result ('Left').
    waitTilRunning =
      findJob jid >>= \mjob -> case mjob of
        -- The entry may have been removed since the caller looked it up.
        Nothing  -> throwError (joberr UnknownJob)
        Just je' -> case jTask je' of
          QueuedJ _qj -> do
            liftIO $ threadDelay pollIntervalMicros
            waitTilRunning
          RunningJ rj   -> return (Right rj)
          DoneJ _ls res -> return (Left res)
144
-- | Kill a job (if it has not finished yet), remove it with 'removeJob',
--   and report its last known status.
--
--   * queued job: removed; reported as killed with an empty log;
--   * running job: its async is 'cancel'led, its log is read, then it is
--     removed; reported as killed with that log;
--   * finished job: removed; reported as failed (with the 'show'n error) or
--     finished, together with its stored log.
--
--   NOTE(review): 'removeJob' is called with @True@ only for queued jobs —
--   presumably the flag says whether the job is still in the queue; confirm
--   against its definition.
killJob
  :: (Ord t, MonadJob m t (Dual [event]) output)
  => t
  -> Maybe SJ.Limit
  -> Maybe SJ.Offset
  -> SJ.JobID 'SJ.Safe
  -> JobEntry (SJ.JobID 'SJ.Safe) (Dual [event]) output
  -> m (SJ.JobStatus 'SJ.Safe event)
killJob t limit offset jid je = do
  (Dual logs, status, merr) <- case jTask je of
    QueuedJ _ ->
      removeJob True t jid >> pure (mempty, SJ.IsKilled, Nothing)
    RunningJ rj -> do
      -- Cancel first, then read whatever was logged up to that point.
      lastLogs <- liftIO (cancel (rjAsync rj) >> rjGetLog rj)
      removeJob False t jid
      pure (lastLogs, SJ.IsKilled, Nothing)
    DoneJ finalLogs outcome -> do
      removeJob False t jid
      pure ( finalLogs
           , either (const SJ.IsFailure) (const SJ.IsFinished) outcome
           , either (Just . T.pack . show) (const Nothing) outcome
           )
  pure (SJ.jobStatus jid limit offset logs status merr)