]> Git — Sourcephile - gargantext.git/blob - src/Gargantext/Utils/Jobs/State.hs
[FIX] Clean Text before sending it to NLP micro services + tests + clean code for...
[gargantext.git] / src / Gargantext / Utils / Jobs / State.hs
1 module Gargantext.Utils.Jobs.State where
2
3 import Gargantext.Utils.Jobs.Map
4 import Gargantext.Utils.Jobs.Queue
5 import Gargantext.Utils.Jobs.Settings
6
7 import Control.Concurrent.Async
8 import Control.Concurrent.STM
9 import Control.Monad
10 import Data.List
11 import Data.Map.Strict (Map)
12 import Data.Maybe
13 import Data.Ord
14 import Data.Proxy
15 import Data.Time.Clock
16 import Prelude
17
18 import qualified Data.Map.Strict as Map
19 import qualified Servant.Job.Core as SJ
20 import qualified Servant.Job.Types as SJ
21
22 type IDGenerator = TVar Int
23
24 data JobsState t w a = JobsState
25 { jobsData :: JobMap (SJ.JobID 'SJ.Safe) w a
26 , jobsQ :: Queue t (SJ.JobID 'SJ.Safe)
27 , jobsIdGen :: IDGenerator
28 , jsGC :: Async ()
29 , jsRunners :: [Async ()]
30 }
31
32 nextID :: JobSettings -> JobsState t w a -> IO (SJ.JobID 'SJ.Safe)
33 nextID js st = do
34 now <- getCurrentTime
35 n <- atomically $ stateTVar (jobsIdGen st) $ \i -> (i, i+1)
36 return $ SJ.newID (Proxy :: Proxy "job") (jsSecretKey js) now n
37
38 newJobsState
39 :: forall t w a.
40 (EnumBounded t, Monoid w)
41 => JobSettings
42 -> Map t Prio
43 -> IO (JobsState t w a)
44 newJobsState js prios = do
45 jmap <- newJobMap
46 idgen <- newTVarIO 0
47 (q, runners) <- newQueueWithRunners (jsNumRunners js) prios (picker jmap) $ \jid -> do
48 mje <- lookupJob jid jmap
49 case mje of
50 Nothing -> return ()
51 Just je -> case jTask je of
52 QueuedJ qj -> do
53 rj <- runJob jid qj jmap js
54 (_res, _logs) <- waitJobDone jid rj jmap
55 return ()
56 _ -> return ()
57 putStrLn $ "Starting " ++ show (jsNumRunners js) ++ " job runners."
58 gcAsync <- async $ gcThread js jmap
59 runnersAsyncs <- traverse async runners
60 return (JobsState jmap q idgen gcAsync runnersAsyncs)
61
62 where picker
63 :: JobMap (SJ.JobID 'SJ.Safe) w a
64 -> Picker (SJ.JobID 'SJ.Safe)
65 picker (JobMap jmap) xs = do
66 jinfos <- fmap catMaybes . forM xs $ \(jid, popjid) -> do
67 mje <- Map.lookup jid <$> readTVar jmap
68 case mje of
69 Nothing -> return Nothing
70 Just je -> return $ Just (jid, popjid, jRegistered je)
71 let (jid, popjid, _) = minimumBy (comparing _3) jinfos
72 return (jid, popjid)
73
74 _3 (_, _, c) = c
75 pushJob
76 :: Ord t
77 => t
78 -> a
79 -> (SJ.JobID 'SJ.Safe -> a -> Logger w -> IO r)
80 -> JobSettings
81 -> JobsState t w r
82 -> IO (SJ.JobID 'SJ.Safe)
83 pushJob jobkind input f js st@(JobsState jmap jqueue _idgen _ _) = do
84 jid <- nextID js st
85 _je <- addJobEntry jid input f jmap
86 addQueue jobkind jid jqueue
87 return jid