Git — Sourcephile - gargantext.git/blob - src/Gargantext/Utils/Jobs/API.hs
Merge remote-tracking branch 'origin/flexible-job-queue' into dev
[gargantext.git] / src / Gargantext / Utils / Jobs / API.hs
1 {-# LANGUAGE TypeFamilies, ScopedTypeVariables #-}
2 module Gargantext.Utils.Jobs.API where
3
4 import Control.Concurrent
5 import Control.Concurrent.Async
6 import Control.Exception
7 import Control.Lens
8 import Control.Monad
9 import Control.Monad.Except
10 import Data.Aeson (ToJSON)
11 import Data.Monoid
12 import Prelude
13 import Servant.API
14
15 import Gargantext.Utils.Jobs.Map
16 import Gargantext.Utils.Jobs.Monad
17
18 import qualified Data.Text as T
19 import qualified Servant.Client as C
20 import qualified Servant.Job.Async as SJ
21 import qualified Servant.Job.Client as SJ
22 import qualified Servant.Job.Types as SJ
23
-- | Assemble the asynchronous jobs server for one kind of job out of three
-- alternatives: two ways of starting a job through 'newJob' and the
-- per-job handlers of 'serveJobAPI'.
serveJobsAPI
  :: ( Ord t, Exception e, MonadError e m
     , MonadJob m t (Dual [event]) output
     , ToJSON e, ToJSON event, ToJSON output
     , Foldable callback
     )
  => m env
  -- ^ Action yielding the environment handed to the job function.
  -> t
  -- ^ Job kind, forwarded to 'newJob' and 'serveJobAPI'.
  -> (JobError -> e)
  -- ^ Embeds a 'JobError' into the handler's error type.
  -> (env -> input -> Logger event -> IO (Either e output))
  -- ^ The job function itself.
  -> SJ.AsyncJobsServerT' ctI ctO callback event input output m
serveJobsAPI getenv t joberr f =
  startWithPlaceholder :<|> startFromInput :<|> perJobHandlers
  where
    -- NOTE(review): the job input is 'undefined' and there is no callback;
    -- a job function that forces its input when started this way will
    -- crash. Confirm this alternative is never meant to run real work.
    startWithPlaceholder = newJob getenv t f (SJ.JobInput undefined Nothing)

    -- Start a job from a caller-provided 'SJ.JobInput'.
    startFromInput = newJob getenv t f

    -- Kill / poll / wait handlers for an existing job id.
    perJobHandlers = serveJobAPI t joberr
39
-- | Handlers for a single job addressed by an unchecked job id: kill, poll
-- and wait, in that order.
--
-- Every handler first validates the id with 'checkJID' (failures go through
-- 'handleIDError') and looks the job up with 'findJob'; a missing job is
-- reported as @joberr UnknownJob@.
serveJobAPI
  :: forall (m :: * -> *) e t event output.
     (Ord t, MonadError e m, MonadJob m t (Dual [event]) output)
  => t
  -> (JobError -> e)
  -> SJ.JobID 'SJ.Unsafe
  -> SJ.AsyncJobServerT event output m
serveJobAPI t joberr unsafeJid =
       withPaging (killJob t)
  :<|> withPaging pollJob
  :<|> withJob (waitJob joberr)
  where
    -- Validate the id, fetch the job entry, then run the handler on both.
    withJob
      :: forall a.
         (SJ.JobID 'SJ.Safe -> JobEntry (SJ.JobID 'SJ.Safe) (Dual [event]) output -> m a)
      -> m a
    withJob handler = do
      safeJid <- handleIDError joberr (checkJID unsafeJid)
      found <- findJob safeJid
      case found of
        Nothing    -> throwError (joberr UnknownJob)
        Just entry -> handler safeJid entry

    -- Same as 'withJob' for handlers that take limit and offset first.
    withPaging handler limit offset = withJob (handler limit offset)
61
-- | Queue a new job of the given kind and return its initial, pending
-- status.
--
-- The job function is wrapped so that every logged event, the final error
-- and the final result are each posted to the callback URLs carried by the
-- 'SJ.JobInput' (if any). A 'Left' result is rethrown with 'throwIO' after
-- the error has been posted.
newJob
  :: ( Ord t, Exception e, MonadJob m t (Dual [event]) output
     , ToJSON e, ToJSON event, ToJSON output
     , Foldable callbacks
     )
  => m env
  -> t
  -> (env -> input -> Logger event -> IO (Either e output))
  -> SJ.JobInput callbacks input
  -> m (SJ.JobStatus 'SJ.Safe event)
newJob getenv jobkind f input = do
  jobEnv <- getJobEnv
  env <- getenv
  let -- Send one message to every callback URL; the result of each client
      -- call is discarded.
      notify msg =
        forM_ (input ^. SJ.job_callback) $ \url -> do
          let clientEnv = C.mkClientEnv (jeManager jobEnv) (url ^. SJ.base_url)
          C.runClientM (SJ.clientMCallback msg) clientEnv

      -- Post the event to the callbacks first, then hand it to the logger.
      notifyAndLog sink ev = notify (SJ.mkChanEvent ev) >> sink ev

      -- Run the user's job function, reporting its outcome to the callbacks.
      runJob inp sink = do
        outcome <- f env inp (notifyAndLog sink . Dual . (: []))
        either
          (\err -> notify (SJ.mkChanError err) >> throwIO err)
          (\out -> notify (SJ.mkChanResult out) >> return out)
          outcome

  (\jid -> SJ.JobStatus jid [] SJ.IsPending Nothing)
    <$> queueJob jobkind (input ^. SJ.job_input) runJob
91
-- | Report the current status of a job without waiting for it.
--
-- A queued job is pending with no logs, a running job is reported with the
-- logs read from 'rjGetLog', and a finished job is reported as finished or
-- failed, with the shown error text on failure. Limit and offset are passed
-- through to 'SJ.jobStatus'.
pollJob
  :: MonadJob m t (Dual [event]) output
  => Maybe SJ.Limit
  -> Maybe SJ.Offset
  -> SJ.JobID 'SJ.Safe
  -> JobEntry (SJ.JobID 'SJ.Safe) (Dual [event]) output
  -> m (SJ.JobStatus 'SJ.Safe event)
pollJob limit offset jid je = do
  (Dual logs, status, merr) <- snapshot
  pure $ SJ.jobStatus jid limit offset logs status merr
  where
    -- Logs, state and optional error message for the job's current task.
    snapshot = case jTask je of
      QueuedJ _ ->
        pure (mempty, SJ.IsPending, Nothing)
      RunningJ rj -> do
        lgs <- liftIO (rjGetLog rj)
        pure (lgs, SJ.IsRunning, Nothing)
      DoneJ ls r ->
        pure (ls, finalState r, failureText r)

    finalState r = either (const SJ.IsFailure) (const SJ.IsFinished) r
    failureText r = either (Just . T.pack . show) (const Nothing) r
110
-- | Block until the job has finished and return its output.
--
-- A queued job is polled every 50ms until it is running or done, a running
-- job is awaited with 'waitJobDone', and a finished job returns its stored
-- result immediately. A failed result is raised through 'throwError' as
-- @joberr (JobException ...)@.
--
-- If the job can no longer be found while it is being polled, this raises
-- @joberr UnknownJob@ instead of crashing with 'error'.
waitJob
  :: (MonadError e m, MonadJob m t (Dual [event]) output)
  => (JobError -> e)
  -> SJ.JobID 'SJ.Safe
  -> JobEntry (SJ.JobID 'SJ.Safe) (Dual [event]) output
  -> m (SJ.JobOutput output)
waitJob joberr jid je = do
  r <- case jTask je of
    QueuedJ _qj -> do
      m <- getJobsMap
      erj <- waitTilRunning
      case erj of
        Left res -> return res
        Right rj -> do
          (res, _logs) <- liftIO (waitJobDone jid rj m)
          return res
    RunningJ rj -> do
      m <- getJobsMap
      (res, _logs) <- liftIO (waitJobDone jid rj m)
      return res
    DoneJ _ls res -> return res
  either (throwError . joberr . JobException) (pure . SJ.JobOutput) r

  where
    -- Poll the job entry until it leaves the queue. Yields 'Right' with the
    -- running job, or 'Left' with the result if it is already done.
    waitTilRunning =
      findJob jid >>= \mjob -> case mjob of
        -- The entry can disappear while we wait ('killJob' calls
        -- 'removeJob' on queued jobs), so this is a reportable error
        -- rather than an impossible state.
        Nothing -> throwError (joberr UnknownJob)
        Just je' -> case jTask je' of
          QueuedJ _qj -> do
            liftIO $ threadDelay 50000 -- wait 50ms
            waitTilRunning
          RunningJ rj -> return (Right rj)
          DoneJ _ls res -> return (Left res)
143
-- | Stop a job, remove it, and report its last known status.
--
-- A queued job is removed and reported as killed with no logs. A running
-- job has its async cancelled, its logs read, is then removed and reported
-- as killed. A finished job is removed and reported with its final state
-- (finished or failed, with the shown error text on failure).
--
-- NOTE(review): 'removeJob' gets 'True' only for queued jobs; presumably
-- the flag says whether the job is still in the queue -- confirm against
-- 'removeJob'.
killJob
  :: (Ord t, MonadJob m t (Dual [event]) output)
  => t
  -> Maybe SJ.Limit
  -> Maybe SJ.Offset
  -> SJ.JobID 'SJ.Safe
  -> JobEntry (SJ.JobID 'SJ.Safe) (Dual [event]) output
  -> m (SJ.JobStatus 'SJ.Safe event)
killJob t limit offset jid je = do
  (Dual logs, status, merr) <- terminate
  pure $ SJ.jobStatus jid limit offset logs status merr
  where
    -- Stop and remove the job according to its current task state.
    terminate = case jTask je of
      QueuedJ _ -> do
        removeJob True t jid
        return (mempty, SJ.IsKilled, Nothing)
      RunningJ rj -> do
        -- Cancel first, then read whatever was logged up to that point.
        lgs <- liftIO (cancel (rjAsync rj) >> rjGetLog rj)
        removeJob False t jid
        return (lgs, SJ.IsKilled, Nothing)
      DoneJ lgs r -> do
        removeJob False t jid
        pure (lgs, finalState r, failureText r)

    finalState r = either (const SJ.IsFailure) (const SJ.IsFinished) r
    failureText r = either (Just . T.pack . show) (const Nothing) r