]> Git — Sourcephile - gargantext.git/blob - src/Gargantext/API/Node/Corpus/New.hs
introduce and use a flexible job queue system
[gargantext.git] / src / Gargantext / API / Node / Corpus / New.hs
1 {-|
2 Module : Gargantext.API.Node.Corpus.New
3 Description : New corpus API
4 Copyright : (c) CNRS, 2017-Present
5 License : AGPL + CECILL v3
6 Maintainer : team@gargantext.org
7 Stability : experimental
8 Portability : POSIX
9
10 New corpus means either:
11 - new corpus
12 - new data in existing corpus
13 -}
14
15 {-# LANGUAGE TemplateHaskell #-}
16 {-# LANGUAGE TypeOperators #-}
17
18 module Gargantext.API.Node.Corpus.New
19 where
20
21 import Conduit
22 import Control.Lens hiding (elements, Empty)
23 import Data.Aeson
24 import Data.Aeson.TH (deriveJSON)
25 import qualified Data.ByteString.Base64 as BSB64
26 import Data.Conduit.Internal (zipSources)
27 import Data.Either
28 import Data.Maybe (fromMaybe)
29 import Data.Swagger
30 import Data.Text (Text)
31 import qualified Data.Text as T
32 import GHC.Generics (Generic)
33 import Servant
34 import Servant.Job.Utils (jsonOptions)
35 -- import Servant.Multipart
36 import qualified Data.Text.Encoding as TE
37 -- import Test.QuickCheck (elements)
38 import Test.QuickCheck.Arbitrary
39
40 import Gargantext.API.Admin.Orchestrator.Types (JobLog(..), AsyncJobs, ScraperEvent(..), scst_events)
41 import Gargantext.API.Admin.Types (HasSettings)
42 import Gargantext.API.Job (addEvent, jobLogSuccess, jobLogFailTotal)
43 import Gargantext.API.Node.Corpus.New.Types
44 import Gargantext.API.Node.Corpus.Searx
45 import Gargantext.API.Node.Corpus.Types
46 import Gargantext.API.Node.Types
47 import Gargantext.Core (Lang(..){-, allLangs-})
48 import Gargantext.Core.Text.List.Social (FlowSocialListWith(..))
49 import Gargantext.Core.Types.Individu (User(..))
50 import Gargantext.Core.Utils.Prefix (unPrefix, unPrefixSwagger)
51 import Gargantext.Database.Action.Flow (flowCorpus, getDataText, flowDataText, TermType(..){-, allDataOrigins-})
52 import Gargantext.Database.Action.Flow.Types (FlowCmdM)
53 import Gargantext.Database.Action.Mail (sendMail)
54 import Gargantext.Database.Action.Node (mkNodeWithParent)
55 import Gargantext.Database.Action.User (getUserId)
56 import Gargantext.Database.Admin.Types.Hyperdata
57 import Gargantext.Database.Admin.Types.Node (CorpusId, NodeType(..), UserId)
58 import Gargantext.Database.Prelude (hasConfig)
59 import Gargantext.Database.Query.Table.Node (getNodeWith)
60 import Gargantext.Database.Query.Table.Node.UpdateOpaleye (updateHyperdata)
61 import Gargantext.Database.Schema.Node (node_hyperdata)
62 import Gargantext.Prelude
63 import Gargantext.Prelude.Config (gc_max_docs_parsers)
64 import qualified Gargantext.Core.Text.Corpus.API as API
65 import qualified Gargantext.Core.Text.Corpus.Parsers as Parser (FileType(..), parseFormatC)
66 import qualified Gargantext.Database.GargDB as GargDB
67 ------------------------------------------------------------------------
68 {-
69 data Query = Query { query_query :: Text
70 , query_node_id :: Int
71 , query_lang :: Lang
72 , query_databases :: [DataOrigin]
73 }
74 deriving (Eq, Generic)
75
76 deriveJSON (unPrefix "query_") 'Query
77
78 instance Arbitrary Query where
79 arbitrary = elements [ Query q n la fs
80 | q <- ["honeybee* AND collapse"
81 ,"covid 19"
82 ]
83 , n <- [0..10]
84 , la <- allLangs
85 , fs <- take 3 $ repeat allDataOrigins
86 ]
87
88 instance ToSchema Query where
89 declareNamedSchema = genericDeclareNamedSchema (unPrefixSwagger "query_")
90 -}
91
92 ------------------------------------------------------------------------
93
94 {-
95 type Api = PostApi
96 :<|> GetApi
97
98 type PostApi = Summary "New Corpus endpoint"
99 :> ReqBody '[JSON] Query
100 :> Post '[JSON] CorpusId
101 type GetApi = Get '[JSON] ApiInfo
102 -}
103
104 -- | TODO manage several apis
105 -- TODO-ACCESS
106 -- TODO this is only the POST
107 {-
108 api :: (FlowCmdM env err m) => UserId -> Query -> m CorpusId
109 api uid (Query q _ as) = do
110 cId <- case head as of
111 Nothing -> flowCorpusSearchInDatabase (UserDBId uid) EN q
112 Just API.All -> flowCorpusSearchInDatabase (UserDBId uid) EN q
113 Just a -> do
114 docs <- liftBase $ API.get a q (Just 1000)
115 cId' <- flowCorpus (UserDBId uid) (Left q) (Multi EN) [docs]
116 pure cId'
117
118 pure cId
119 -}
120
121 ------------------------------------------------
122 -- TODO use this route for Client implementation
-- | List of external APIs, as served by 'info'.
data ApiInfo = ApiInfo
  { api_info :: [API.ExternalAPIs]
  }
  deriving (Generic)

-- | A random 'ApiInfo' wraps an arbitrary list of 'API.ExternalAPIs'.
instance Arbitrary ApiInfo where
  arbitrary = fmap ApiInfo arbitrary

-- JSON instances are generated by Template Haskell with the @unPrefix ""@
-- options.
deriveJSON (unPrefix "") 'ApiInfo

-- | Swagger schema: relies on the class's default implementation.
instance ToSchema ApiInfo
131
-- | Return the external APIs known to the backend.
--
-- The 'UserId' argument is ignored: every caller receives
-- 'API.externalAPIs' wrapped in an 'ApiInfo'.
info :: FlowCmdM env err m => UserId -> m ApiInfo
info _ = pure (ApiInfo API.externalAPIs)
134
135 ------------------------------------------------------------------------
136 ------------------------------------------------------------------------
-- | Request body of the 'AddWithQuery' job, consumed by
-- 'addToCorpusWithQuery'.
data WithQuery = WithQuery
  { _wq_query        :: !Text
    -- ^ Query text forwarded to the data source (or to the web search).
  , _wq_databases    :: !Database
    -- ^ Source database; converted with 'database2origin' before fetching.
  , _wq_datafield    :: !(Maybe Datafield)
    -- ^ @Just Web@ routes the request to the Searx search.
  , _wq_lang         :: !Lang
    -- ^ Language used for the search and for term extraction.
  , _wq_node_id      :: !Int
    -- ^ Not read by 'addToCorpusWithQuery'.
  , _wq_flowListWith :: !FlowSocialListWith
    -- ^ Only logged by 'addToCorpusWithQuery'.
  }
  deriving (Generic)

makeLenses ''WithQuery

-- | JSON decoding with the @jsonOptions "_wq_"@ field options.
instance FromJSON WithQuery where
  parseJSON = genericParseJSON (jsonOptions "_wq_")

-- | JSON encoding with the @jsonOptions "_wq_"@ field options.
instance ToJSON WithQuery where
  toJSON = genericToJSON (jsonOptions "_wq_")

-- | Swagger schema built with the @unPrefixSwagger "_wq_"@ options.
instance ToSchema WithQuery where
  declareNamedSchema = genericDeclareNamedSchema $ unPrefixSwagger "_wq_"
154
155 ------------------------------------------------------------------------
156
-- | Route @corpus/<corpus_id>/query@: asynchronous job API taking a JSON
-- 'WithQuery' and using 'JobLog' as its status type.
type AddWithQuery =
  Summary "Add with Query to corpus endpoint"
    :> "corpus"
    :> Capture "corpus_id" CorpusId
    :> "query"
    :> AsyncJobs JobLog '[JSON] WithQuery JobLog
162
163 {-
164 type AddWithFile = Summary "Add with MultipartData to corpus endpoint"
165 :> "corpus"
166 :> Capture "corpus_id" CorpusId
167 :> "add"
168 :> "file"
169 :> MultipartForm Mem (MultipartData Mem)
170 :> QueryParam "fileType" FileType
171 :> "async"
172 :> AsyncJobs JobLog '[JSON] () JobLog
173 -}
174
175
176 ------------------------------------------------------------------------
177 -- TODO WithQuery also has a corpus id
178
179
-- | Populate the corpus @cid@ from the result of a query.
--
-- Progress is pushed through @logStatus@; the final status is returned.
--
-- * When the datafield is @Just Web@, the query is handed to
--   'triggerSearxSearch' and a 3\/0\/0 (succeeded\/failed\/remaining) log
--   is returned.
-- * Otherwise the texts are fetched with 'getDataText' (bounded by
--   @maybeLimit@) and each one is inserted with 'flowDataText'; a mail is
--   then sent with 'sendMail'.
-- * If fetching yields an error, the first error is reported as an
--   @ERROR@ event on a 2\/1\/0 log, which is both logged and returned.
addToCorpusWithQuery :: FlowCmdM env err m
                     => User
                     -> CorpusId
                     -> WithQuery
                     -> Maybe Integer
                     -> (JobLog -> m ())
                     -> m JobLog
addToCorpusWithQuery user cid
                     (WithQuery queryText database datafield lang _nodeId flowWith)
                     maybeLimit logStatus = do
  -- TODO ...
  logStatus (statusOf 0 0 3)
  printDebug "[addToCorpusWithQuery] (cid, dbs)" (cid, database)
  printDebug "[addToCorpusWithQuery] datafield" datafield
  printDebug "[addToCorpusWithQuery] flowListWith" flowWith

  case datafield of
    Just Web -> do
      printDebug "[addToCorpusWithQuery] processing web request" datafield
      _ <- triggerSearxSearch user cid queryText lang logStatus
      pure (statusOf 3 0 0)

    _ -> do
      -- TODO add cid
      -- TODO if cid is folder -> create Corpus
      --      if cid is corpus -> add to corpus
      --      if cid is root   -> create corpus in Private
      printDebug "[G.A.N.C.New] getDataText with query" queryText
      let fetch origin = getDataText origin (Multi lang) queryText maybeLimit
      fetched <- mapM fetch [database2origin database]

      let failures = lefts fetched
      printDebug "[G.A.N.C.New] lTxts" failures
      case failures of
        (firstErr : _) -> do
          printDebug "Error: " firstErr
          let failedLog = addEvent "ERROR" (T.pack (show firstErr)) (statusOf 2 1 0)
          logStatus failedLog
          pure failedLog

        [] -> do
          let txts = rights fetched
          -- TODO sum the lengths of each element of txts
          logStatus (statusOf 2 0 (1 + length txts))

          let insertTxt txt = flowDataText user txt (Multi lang) cid Nothing logStatus
          cids <- mapM insertTxt txts
          printDebug "corpus id" cids
          printDebug "sending email" ("xxxxxxxxxxxxxxxxxxxxx" :: Text)
          sendMail user
          -- TODO ...
          pure (statusOf 3 0 0)
  where
    -- Job log with the given succeeded/failed/remaining counters and an
    -- empty event list.
    statusOf succeeded failed remaining =
      JobLog { _scst_succeeded = Just succeeded
             , _scst_failed    = Just failed
             , _scst_remaining = Just remaining
             , _scst_events    = Just []
             }
256
257
-- | Route @corpus/<corpus_id>/add/form/async@: asynchronous job API taking
-- a form-url-encoded 'NewWithForm' and using 'JobLog' as its status type.
type AddWithForm =
  Summary "Add with FormUrlEncoded to corpus endpoint"
    :> "corpus"
    :> Capture "corpus_id" CorpusId
    :> "add"
    :> "form"
    :> "async"
    :> AsyncJobs JobLog '[FormUrlEncoded] NewWithForm JobLog
265
-- | Parse an uploaded document set and insert it into the corpus @cid@.
--
-- The parser is chosen from the form's file type. The payload is used as
-- is for the @Plain@ format and base64-decoded for @ZIP@. Parsed documents
-- are streamed into 'flowCorpus'; the stream panics as soon as it yields
-- more documents than the configured @gc_max_docs_parsers@ limit.
--
-- @logStatus@ receives the incoming @jobLog@ first, then the final status.
-- The returned 'JobLog' is always the last status that was logged: two
-- 'jobLogSuccess' steps on success, or 'jobLogFailTotal' carrying the
-- parse error as an @ERROR@ event on failure.
--
-- Panics on invalid base64 input and when the document limit is exceeded.
addToCorpusWithForm :: (FlowCmdM env err m)
                    => User
                    -> CorpusId
                    -> NewWithForm
                    -> (JobLog -> m ())
                    -> JobLog
                    -> m JobLog
addToCorpusWithForm user cid (NewWithForm ft ff d l _n) logStatus jobLog = do
  printDebug "[addToCorpusWithForm] Parsing corpus: " cid
  printDebug "[addToCorpusWithForm] fileType" ft
  printDebug "[addToCorpusWithForm] fileFormat" ff
  logStatus jobLog
  limit' <- view $ hasConfig . gc_max_docs_parsers
  let limit = fromIntegral limit' :: Integer
  let
    parseC = case ft of
      CSV_HAL   -> Parser.parseFormatC Parser.CsvHal
      CSV       -> Parser.parseFormatC Parser.CsvGargV3
      WOS       -> Parser.parseFormatC Parser.WOS
      PresseRIS -> Parser.parseFormatC Parser.RisPresse

  -- TODO granularity of the logStatus
  let data' = case ff of
        Plain -> cs d
        ZIP   -> case BSB64.decode $ TE.encodeUtf8 d of
          Left err      -> panic $ T.pack "[addToCorpusWithForm] error decoding base64: " <> T.pack err
          Right decoded -> decoded
  eDocsC <- liftBase $ parseC ff data'
  case eDocsC of
    Right (mCount, docsC) -> do
      -- TODO Add progress (jobStatus) update for docs - this is a
      -- long action

      -- Number the documents from 1 so the limit can be enforced while
      -- streaming, without materialising the whole document list.
      let docsC' = zipSources (yieldMany [1..]) docsC
                   .| mapMC (\(idx, doc) ->
                        if idx > limit then do
                          --printDebug "[addToCorpusWithForm] number of docs exceeds the limit" (show limit)
                          let panicMsg' = [ "[addToCorpusWithForm] number of docs "
                                          , "exceeds the MAX_DOCS_PARSERS limit ("
                                          , show limit
                                          , ")" ]
                          let panicMsg = T.concat $ T.pack <$> panicMsg'
                          --logStatus $ jobLogFailTotalWithMessage panicMsg jobLog
                          panic panicMsg
                        else
                          pure doc)
                   .| mapC toHyperdataDocument

      --printDebug "Parsing corpus finished : " cid
      --logStatus jobLog2

      --printDebug "Starting extraction : " cid
      -- TODO granularity of the logStatus
      printDebug "flowCorpus with (corpus_id, lang)" (cid, l)

      _cid' <- flowCorpus user
                 (Right [cid])
                 (Multi $ fromMaybe EN l)
                 Nothing
                 --(Just $ fromIntegral $ length docs, docsC')
                 (mCount, transPipe liftBase docsC') -- TODO fix number of docs
                 --(map (map toHyperdataDocument) docs)
                 logStatus

      printDebug "Extraction finished : " cid
      printDebug "sending email" ("xxxxxxxxxxxxxxxxxxxxx" :: Text)
      -- TODO uncomment this
      --sendMail user

      logStatus jobLog3
      pure jobLog3
    Left e -> do
      printDebug "[addToCorpusWithForm] parse error" e

      let evt = ScraperEvent { _scev_message = Just $ T.pack e
                             , _scev_level   = Just "ERROR"
                             , _scev_date    = Nothing }
          -- Build the failed log once, so the status that is published
          -- and the status that is returned are the same value; the
          -- returned log used to drop the error event.
          -- NOTE(review): the event is only appended when the event list
          -- of jobLogE is a 'Just' - confirm 'jobLogFailTotal' keeps it so.
          jobLogE' = over (scst_events . _Just) (<> [evt]) jobLogE

      logStatus jobLogE'
      pure jobLogE'
  where
    jobLog2 = jobLogSuccess jobLog
    jobLog3 = jobLogSuccess jobLog2
    jobLogE = jobLogFailTotal jobLog
350
351 {-
352 addToCorpusWithFile :: FlowCmdM env err m
353 => CorpusId
354 -> MultipartData Mem
355 -> Maybe FileType
356 -> (JobLog -> m ())
357 -> m JobLog
358 addToCorpusWithFile cid input filetype logStatus = do
359 logStatus JobLog { _scst_succeeded = Just 10
360 , _scst_failed = Just 2
361 , _scst_remaining = Just 138
362 , _scst_events = Just []
363 }
364 printDebug "addToCorpusWithFile" cid
365 _h <- postUpload cid filetype input
366
367 pure JobLog { _scst_succeeded = Just 137
368 , _scst_failed = Just 13
369 , _scst_remaining = Just 0
370 , _scst_events = Just []
371 }
372 -}
373
374
375
-- | Route @corpus/<corpus_id>/add/file/async@: asynchronous job API taking
-- a form-url-encoded 'NewWithFile' and using 'JobLog' as its status type.
type AddWithFile =
  Summary "Add with FileUrlEncoded to corpus endpoint"
    :> "corpus"
    :> Capture "corpus_id" CorpusId
    :> "add"
    :> "file"
    :> "async"
    :> AsyncJobs JobLog '[FormUrlEncoded] NewWithFile JobLog
383
-- | Store an uploaded file and attach it to the corpus @cid@ as a file node.
--
-- The file is written with 'GargDB.writeFile', then a 'NodeFile' child of
-- @cid@ is created for the user. When exactly one node id comes back, its
-- hyperdata is updated with the file name and the saved path; any other
-- result is silently ignored. A mail is sent with 'sendMail' at the end.
--
-- @logStatus@ receives the initial 0\/0\/1 (succeeded\/failed\/remaining)
-- status; the returned 'JobLog' is 1\/0\/0.
addToCorpusWithFile :: (HasSettings env, FlowCmdM env err m)
                    => User
                    -> CorpusId
                    -> NewWithFile
                    -> (JobLog -> m ())
                    -> m JobLog
addToCorpusWithFile user cid nwf@(NewWithFile _d _l fName) logStatus = do
  printDebug "[addToCorpusWithFile] Uploading file to corpus: " cid
  logStatus (uploadLog 0 1)

  savedPath <- GargDB.writeFile nwf
  printDebug "[addToCorpusWithFile] File saved as: " savedPath

  ownerId    <- getUserId user
  createdIds <- mkNodeWithParent NodeFile (Just cid) ownerId fName

  case createdIds of
    [fileNodeId] -> do
      fileNode <- getNodeWith fileNodeId (Proxy :: Proxy HyperdataFile)
      let hyperdata' = (fileNode ^. node_hyperdata)
                         { _hff_name = fName
                         , _hff_path = T.pack savedPath
                         }
      _ <- updateHyperdata fileNodeId hyperdata'
      printDebug "[addToCorpusWithFile] Created node with id: " fileNodeId
    _ -> pure ()

  printDebug "[addToCorpusWithFile] File upload to corpus finished: " cid

  printDebug "sending email" ("xxxxxxxxxxxxxxxxxxxxx" :: Text)
  sendMail user

  pure (uploadLog 1 0)
  where
    -- Job log with no failure, no event, and the given
    -- succeeded/remaining counters.
    uploadLog succeeded remaining =
      JobLog { _scst_succeeded = Just succeeded
             , _scst_failed    = Just 0
             , _scst_remaining = Just remaining
             , _scst_events    = Just []
             }
425