]> Git — Sourcephile - gargantext.git/blob - src/Gargantext/Database/Action/Flow.hs
[text-api] first rewrite using Conduit
[gargantext.git] / src / Gargantext / Database / Action / Flow.hs
1 {-|
2 Module : Gargantext.Database.Flow
3 Description : Database Flow
4 Copyright : (c) CNRS, 2017-Present
5 License : AGPL + CECILL v3
6 Maintainer : team@gargantext.org
7 Stability : experimental
8 Portability : POSIX
9
10 -- TODO-ACCESS:
11 -- check userId CanFillUserCorpus userCorpusId
12 -- check masterUserId CanFillMasterCorpus masterCorpusId
13
14 -- TODO-ACCESS: check uId CanInsertDoc pId && checkDocType nodeType
15 -- TODO-EVENTS: InsertedNodes
16 -}
17
18 {-# OPTIONS_GHC -fno-warn-orphans #-}
19
20 {-# LANGUAGE ConstrainedClassMethods #-}
21 {-# LANGUAGE ConstraintKinds #-}
22 {-# LANGUAGE InstanceSigs #-}
23 {-# LANGUAGE ScopedTypeVariables #-}
24 {-# LANGUAGE TemplateHaskell #-}
25
26 module Gargantext.Database.Action.Flow -- (flowDatabase, ngrams2list)
27 ( DataText(..)
28 , getDataText
29 , flowDataText
30 , flow
31
32 , flowCorpusFile
33 , flowCorpus
34 , flowAnnuaire
35 , insertMasterDocs
36 , saveDocNgramsWith
37
38 , getOrMkRoot
39 , getOrMk_RootWithCorpus
40 , TermType(..)
41 , DataOrigin(..)
42 , allDataOrigins
43
44 , do_api
45 , indexAllDocumentsWithPosTag
46 )
47 where
48
49 import Conduit
50 import Control.Lens ((^.), view, _Just, makeLenses)
51 import Data.Aeson.TH (deriveJSON)
52 import Data.Conduit.Internal (zipSources)
53 import Data.Either
54 import Data.HashMap.Strict (HashMap)
55 import Data.Hashable (Hashable)
56 import Data.List (concat)
57 import Data.Map (Map, lookup)
import Data.Maybe (catMaybes, mapMaybe)
59 import Data.Monoid
60 import Data.Swagger
61 import qualified Data.Text as T
62 import Data.Traversable (traverse)
63 import Data.Tuple.Extra (first, second)
64 import GHC.Generics (Generic)
65 import System.FilePath (FilePath)
66 import qualified Data.HashMap.Strict as HashMap
67 import qualified Gargantext.Data.HashMap.Strict.Utils as HashMap
68 import qualified Data.Map as Map
69
70 import Gargantext.API.Admin.Orchestrator.Types (JobLog(..))
71 import Gargantext.Core (Lang(..), PosTagAlgo(..))
72 import Gargantext.Core.Ext.IMT (toSchoolName)
73 import Gargantext.Core.Ext.IMTUser (readFile_Annuaire)
74 import Gargantext.Core.Flow.Types
75 import Gargantext.Core.Text
76 import Gargantext.Core.Text.Corpus.Parsers (parseFile, FileFormat)
77 import Gargantext.Core.Text.List (buildNgramsLists)
78 import Gargantext.Core.Text.List.Group.WithStem ({-StopSize(..),-} GroupParams(..))
79 import Gargantext.Core.Text.List.Social (FlowSocialListWith)
80 import Gargantext.Core.Text.Terms
81 import Gargantext.Core.Text.Terms.Mono.Stem.En (stemIt)
82 import Gargantext.Core.Types (POS(NP))
83 import Gargantext.Core.Types.Individu (User(..))
84 import Gargantext.Core.Types.Main
85 import Gargantext.Core.Utils.Prefix (unPrefix, unPrefixSwagger)
86 import Gargantext.Database.Action.Flow.List
87 import Gargantext.Database.Action.Flow.Types
88 import Gargantext.Database.Action.Flow.Utils (insertDocNgrams, DocumentIdWithNgrams(..))
89 import Gargantext.Database.Action.Search (searchDocInDatabase)
90 import Gargantext.Database.Admin.Config (userMaster, corpusMasterName)
91 import Gargantext.Database.Action.Metrics (updateNgramsOccurrences)
92 import Gargantext.Database.Admin.Types.Hyperdata
93 import Gargantext.Database.Admin.Types.Node -- (HyperdataDocument(..), NodeType(..), NodeId, UserId, ListId, CorpusId, RootId, MasterCorpusId, MasterUserId)
94 import Gargantext.Database.Prelude
95 import Gargantext.Database.Query.Table.ContextNodeNgrams2
96 import Gargantext.Database.Query.Table.Ngrams
97 import Gargantext.Database.Query.Table.Node
98 import Gargantext.Database.Query.Table.Node.Document.Insert -- (insertDocuments, ReturnId(..), addUniqIdsDoc, addUniqIdsContact, ToDbData(..))
99 import Gargantext.Database.Query.Table.Node.Error (HasNodeError(..))
100 import Gargantext.Database.Query.Table.NodeNgrams (listInsertDb , getCgramsId)
101 import Gargantext.Database.Query.Tree.Root (getOrMkRoot, getOrMk_RootWithCorpus)
102 import Gargantext.Database.Schema.Node (NodePoly(..), node_id)
103 import Gargantext.Database.Types
104 import Gargantext.Prelude
105 import Gargantext.Prelude.Crypto.Hash (Hash)
106 import qualified Gargantext.Core.Text.Corpus.API as API
107 import qualified Gargantext.Database.Query.Table.Node.Document.Add as Doc (add)
108 import qualified Prelude as Prelude
109
110 ------------------------------------------------------------------------
111 -- Imports for upgrade function
112 import Gargantext.Database.Query.Tree.Root (getRootId)
113 import Gargantext.Database.Query.Tree (findNodesId)
114 import qualified Data.List as List
115 ------------------------------------------------------------------------
116 -- TODO use internal with API name (could be old data)
-- | Provenance of documents to flow in: 'InternalOrigin' reads data already
--   stored in the local database (tagged with the API that produced it),
--   while 'ExternalOrigin' queries the remote API (see 'getDataText').
data DataOrigin = InternalOrigin { _do_api :: API.ExternalAPIs }
                | ExternalOrigin { _do_api :: API.ExternalAPIs }
                -- TODO Web
  deriving (Generic, Eq)
121
-- Lenses, JSON (de)serialisation and Swagger schema for 'DataOrigin';
-- the "_do_" record-field prefix is stripped from the serialised names.
makeLenses ''DataOrigin
deriveJSON (unPrefix "_do_") ''DataOrigin
instance ToSchema DataOrigin where
  declareNamedSchema = genericDeclareNamedSchema (unPrefixSwagger "_do_")
126
-- | Every supported data origin: each external API appears once as an
--   internal origin and once as an external one (internal ones first,
--   matching the original concatenation order).
allDataOrigins :: [DataOrigin]
allDataOrigins = [ mk api | mk  <- [InternalOrigin, ExternalOrigin]
                          , api <- API.externalAPIs ]
130
131 ---------------
-- | Documents to flow into a corpus: either references to nodes already in
--   the database ('DataOld') or a stream of freshly fetched documents
--   ('DataNew').  Fields are strict to avoid retaining thunks.
data DataText = DataOld ![NodeId]
              | DataNew !(ConduitT () HyperdataDocument IO ())
              -- | DataNew ![[HyperdataDocument]]
135
-- TODO use the split parameter in config file
-- | Fetch documents for the given origin.  External origins stream results
--   from the remote API; the internal origin searches the master corpus
--   already stored in the database (stemming the query first).
getDataText :: FlowCmdM env err m
            => DataOrigin
            -> TermType Lang
            -> API.Query
            -> Maybe API.Limit
            -> m DataText
getDataText (ExternalOrigin api) la q li =
  DataNew <$> liftBase (API.get api (_tt_lang la) q li)
getDataText (InternalOrigin _) _la q _li = do
  (_masterUserId, _masterRootId, corpusId) <-
    getOrMk_RootWithCorpus (UserName userMaster)
                           (Left "")
                           (Nothing :: Maybe HyperdataCorpus)
  docIds <- map fst <$> searchDocInDatabase corpusId (stemIt q)
  pure $ DataOld docIds
154
155 -------------------------------------------------------------------------------
-- | Flow previously fetched 'DataText' into an existing corpus: old node ids
--   go straight through the user-corpus pipeline, new documents go through
--   the full 'flowCorpus' pipeline.
flowDataText :: ( FlowCmdM env err m
                )
             => User
             -> DataText
             -> TermType Lang
             -> CorpusId
             -> Maybe FlowSocialListWith
             -> (JobLog -> m ())
             -> m CorpusId
flowDataText u (DataOld ids) tt cid mfslw _ =
  flowCorpusUser (_tt_lang tt) u (Right [cid]) (Nothing :: Maybe HyperdataCorpus) ids mfslw
flowDataText u (DataNew txtC) tt cid mfslw logStatus =
  flowCorpus u (Right [cid]) tt mfslw txtC logStatus
169
170 ------------------------------------------------------------------------
-- TODO use proxy
-- | Read an annuaire (directory of contacts) from a file and flow it into
--   the database as 'HyperdataContact's.
flowAnnuaire :: (FlowCmdM env err m)
             => User
             -> Either CorpusName [CorpusId]
             -> (TermType Lang)
             -> FilePath
             -> (JobLog -> m ())
             -> m AnnuaireId
flowAnnuaire u n l filePath logStatus = do
  -- TODO Conduit for file
  contacts <- liftBase (readFile_Annuaire filePath :: IO [HyperdataContact])
  flow (Nothing :: Maybe HyperdataAnnuaire) u n l Nothing (yieldMany contacts) logStatus
183
184 ------------------------------------------------------------------------
-- | Parse a file and flow its documents into a corpus.
--
-- NOTE(review): the 'Limit' parameter is currently unused — the code that
-- consumed it is commented out below — so it is bound as @_l@ to make this
-- explicit; it is kept in the signature for interface compatibility.
flowCorpusFile :: (FlowCmdM env err m)
           => User
           -> Either CorpusName [CorpusId]
           -> Limit -- Limit the number of docs (for dev purpose)
           -> TermType Lang -> FileFormat -> FilePath
           -> Maybe FlowSocialListWith
           -> (JobLog -> m ())
           -> m CorpusId
flowCorpusFile u n _l la ff fp mfslw logStatus = do
  eParsed <- liftBase $ parseFile ff fp
  case eParsed of
    Right parsed -> do
      flowCorpus u n la mfslw (yieldMany parsed .| mapC toHyperdataDocument) logStatus
      --let docs = splitEvery 500 $ take l parsed
      --flowCorpus u n la mfslw (yieldMany $ map (map toHyperdataDocument) docs) logStatus
    Left e       -> panic $ "Error: " <> (T.pack e)
201
202 ------------------------------------------------------------------------
-- | TODO improve the needed type to create/update a corpus
-- (For now, Either is enough)
-- Specialisation of 'flow' for corpora (as opposed to annuaires): no corpus
-- hyperdata is supplied, so 'flow' falls back to defaults.
flowCorpus :: (FlowCmdM env err m, FlowCorpus a)
           => User
           -> Either CorpusName [CorpusId]
           -> TermType Lang
           -> Maybe FlowSocialListWith
           -> ConduitT () a IO ()
           -> (JobLog -> m ())
           -> m CorpusId
flowCorpus = flow (Nothing :: Maybe HyperdataCorpus)
214
215
-- | Generic flow entry point: insert a stream of documents into the master
--   corpus one by one, then build/update the user corpus around the
--   resulting ids with 'flowCorpusUser'.
--
-- Documents are numbered from 1 (via 'zipSources') so that per-document
-- progress could be reported; the JobLog reporting itself is currently
-- disabled (see the commented-out code in 'insertDoc').
flow :: ( FlowCmdM env err m
        , FlowCorpus a
        , MkCorpus c
        )
     => Maybe c
     -> User
     -> Either CorpusName [CorpusId]
     -> TermType Lang
     -> Maybe FlowSocialListWith
     -> ConduitT () a IO ()
     -> (JobLog -> m ())
     -> m CorpusId
flow c u cn la mfslw docsC logStatus = do
  -- TODO if public insertMasterDocs else insertUserDocs
  ids <- liftBase $ runConduit $
      zipSources (yieldMany [1..]) docsC
      .| mapMC insertDoc
      .| sinkList
  flowCorpusUser (la ^. tt_lang) u cn c ids mfslw

  where
    -- Insert a single document into the master corpus and return its id.
    -- The index is only needed by the (disabled) progress reporting.
    insertDoc (_idx, doc) = do
      docIds <- insertMasterDocs c la [doc]
      -- logStatus JobLog { _scst_succeeded = Just $ 1 + _idx
      --                  , _scst_failed    = Just 0
      --                  , _scst_remaining = Just $ length docs - _idx
      --                  , _scst_events    = Just []
      --                  }
      -- Fixed: was @Prelude.head docIds@, which crashes with an opaque
      -- "empty list" error if no id comes back.
      case docIds of
        (i : _) -> pure i
        []      -> panic "[G.D.A.Flow.flow] insertMasterDocs returned no DocId"
254
255
256
257 ------------------------------------------------------------------------
-- | Build (or update) the user-side corpus around already-inserted document
--   ids: create the corpus root, its Texts/List/Dashboard/Graph child nodes,
--   attach the documents, build the ngrams lists (user and master side) and
--   refresh ngrams occurrence counts.  Returns the user corpus id.
--
--   NOTE(review): the sequencing below matters (comments "NodeTexts is
--   first", "NodeList is second") — do not reorder these DB actions.
flowCorpusUser :: ( FlowCmdM env err m
                  , MkCorpus c
                  )
               => Lang
               -> User
               -> Either CorpusName [CorpusId]
               -> Maybe c
               -> [NodeId]
               -> Maybe FlowSocialListWith
               -> m CorpusId
flowCorpusUser l user corpusName ctype ids mfslw = do
  -- User Flow
  (userId, _rootId, userCorpusId) <- getOrMk_RootWithCorpus user corpusName ctype
  -- NodeTexts is first
  _tId <- insertDefaultNode NodeTexts userCorpusId userId
  -- printDebug "NodeTexts: " tId

  -- NodeList is second
  listId <- getOrMkList userCorpusId userId
  -- _cooc <- insertDefaultNode NodeListCooc listId userId
  -- TODO: check if present already, ignore
  _ <- Doc.add userCorpusId ids

  -- printDebug "Node Text Ids:" tId

  -- User List Flow
  (masterUserId, _masterRootId, masterCorpusId)
    <- getOrMk_RootWithCorpus (UserName userMaster) (Left "") ctype

  --let gp = (GroupParams l 2 3 (StopSize 3))
  -- Ngrams grouping strategy: POS-tagging via CoreNLP for the corpus language.
  let gp = GroupWithPosTag l CoreNLP HashMap.empty
  ngs <- buildNgramsLists user userCorpusId masterCorpusId mfslw gp

  -- printDebug "flowCorpusUser:ngs" ngs

  _userListId <- flowList_DbRepo listId ngs
  _mastListId <- getOrMkList masterCorpusId masterUserId
  -- _ <- insertOccsUpdates userCorpusId mastListId
  -- printDebug "userListId" userListId
  -- User Graph Flow
  _ <- insertDefaultNode NodeDashboard userCorpusId userId
  _ <- insertDefaultNode NodeGraph userCorpusId userId
  --_ <- mkPhylo  userCorpusId userId
  -- Annuaire Flow
  -- _ <- mkAnnuaire  rootUserId userId
  _ <- updateNgramsOccurrences userCorpusId (Just listId)

  pure userCorpusId
306
307
-- | Insert documents into the master corpus (shared across all users),
--   extract their ngrams and persist them on the master list.
--   Returns the ids of the inserted documents.
insertMasterDocs :: ( FlowCmdM env err m
                    , FlowCorpus a
                    , MkCorpus c
                    )
                 => Maybe c
                 -> TermType Lang
                 -> [a]
                 -> m [DocId]
insertMasterDocs c lang hs  =  do
  (masterUserId, _, masterCorpusId) <- getOrMk_RootWithCorpus (UserName userMaster) (Left corpusMasterName) c
  (ids', documentsWithId) <- insertDocs masterUserId masterCorpusId (map (toNode masterUserId masterCorpusId) hs )
  _ <- Doc.add masterCorpusId ids'
  -- TODO
  -- create a corpus with database name (CSV or PubMed)
  -- add documents to the corpus (create node_node link)
  -- this will enable global database monitoring

  -- maps :: IO Map Ngrams (Map NgramsType (Map NodeId Int))
  -- Extract ngrams from each inserted document and aggregate them per
  -- ngrams / ngrams type / document node.
  mapNgramsDocs' :: HashMap ExtractedNgrams (Map NgramsType (Map NodeId Int))
                <- mapNodeIdNgrams
               <$> documentIdWithNgrams
                   (extractNgramsT $ withLang lang documentsWithId)
                   documentsWithId

  lId      <- getOrMkList masterCorpusId masterUserId
  _ <- saveDocNgramsWith lId mapNgramsDocs'

  -- _cooc <- insertDefaultNode NodeListCooc lId masterUserId
  pure ids'
337
-- | Persist extracted ngrams for a list: insert the ngrams rows themselves,
--   the list\/ngrams links, and the context\/ngrams links with their weights.
saveDocNgramsWith :: ( FlowCmdM env err m)
                  => ListId
                  -> HashMap ExtractedNgrams (Map NgramsType (Map NodeId Int))
                  -> m ()
saveDocNgramsWith lId mapNgramsDocs' = do
  -- Insert the raw ngrams and get back their database ids.
  terms2id <- insertExtractedNgrams $ HashMap.keys mapNgramsDocs'
  printDebug "terms2id" terms2id

  let mapNgramsDocs = HashMap.mapKeys extracted2ngrams mapNgramsDocs'

  -- new
  mapCgramsId <- listInsertDb lId toNodeNgramsW'
               $ map (first _ngramsTerms . second Map.keys)
               $ HashMap.toList mapNgramsDocs

  printDebug "saveDocNgramsWith" mapCgramsId
  -- insertDocNgrams
  -- Link each (context node, ngrams type, ngrams) triple to its weight.
  -- Fixed idiom: was @ContextNodeNgrams2 <$> Just nId <*> … <*> Just w@;
  -- the redundant @<$> Just@/@<*> Just@ wrapping of the first argument is
  -- dropped.  Triples whose ngrams id cannot be resolved are skipped.
  _return <- insertContextNodeNgrams2
           $ catMaybes [ ContextNodeNgrams2 nId
                           <$> getCgramsId mapCgramsId ngrams_type (_ngramsTerms terms'')
                           <*> Just (fromIntegral w :: Double)
                       | (terms'', mapNgramsTypes)      <- HashMap.toList mapNgramsDocs
                       , (ngrams_type, mapNodeIdWeight) <- Map.toList mapNgramsTypes
                       , (nId, w)                       <- Map.toList mapNodeIdWeight
                       ]

  -- to be removed
  _ <- insertDocNgrams lId $ HashMap.mapKeys (indexNgrams terms2id) mapNgramsDocs

  pure ()
368
369
370 ------------------------------------------------------------------------
-- TODO Type NodeDocumentUnicised
-- | Insert documents under a corpus, de-duplicating via a per-document
--   unique hash, and attach them to the corpus.  Returns the new context
--   ids together with the documents indexed by those ids.
insertDocs :: ( FlowCmdM env err m
--              , FlowCorpus a
              , FlowInsertDB a
              )
           => UserId
           -> CorpusId
           -> [a]
           -> m ([ContextId], [Indexed ContextId a])
insertDocs uId cId hs = do
  -- Stamp each document with its unique hash before insertion.
  let docs = map addUniqId hs
  newIds <- insertDb uId cId docs
  -- printDebug "newIds" newIds
  let
    newIds' = map reId newIds
    -- Re-associate returned ids with their documents via the shared hash.
    documentsWithId = mergeData (toInserted newIds) (Map.fromList $ map viewUniqId' docs)
  _ <- Doc.add cId newIds'
  pure (newIds', documentsWithId)
389
390
391 ------------------------------------------------------------------------
-- | Pair a document with its unique hash; it is a programming error for the
--   hash to be missing at this point (documents are hashed by 'addUniqId'
--   before this is called).
viewUniqId' :: UniqId a
            => a
            -> (Hash, a)
viewUniqId' d =
  case view uniqId d of
    Just h  -> (h, d)
    Nothing -> panic "[ERROR] Database.Flow.toInsert"
398
399
-- | Index insertion results by their unique hash, keeping only the rows
--   that were actually inserted.
--   Fixed idiom: @filter (\r -> reInserted r == True)@ → @filter reInserted@.
toInserted :: [ReturnId]
           -> Map Hash ReturnId
toInserted =
  Map.fromList . map (\r -> (reUniqId r, r) )
               . filter reInserted
405
-- | Join inserted ids with the original documents via their shared hash;
--   documents whose hash has no corresponding 'ReturnId' are dropped.
--   Fixed idiom: @catMaybes . map f@ → @mapMaybe f@.
mergeData :: Map Hash ReturnId
          -> Map Hash a
          -> [Indexed NodeId a]
mergeData rs = mapMaybe toDocumentWithId . Map.toList
  where
    toDocumentWithId (hash, hpd) =
      (\ret -> Indexed (reId ret) hpd) <$> lookup hash rs
414
415 ------------------------------------------------------------------------
416 ------------------------------------------------------------------------
417 ------------------------------------------------------------------------
-- | Run the given ngrams-extraction action over every indexed document,
--   pairing each document with its extracted ngrams.
documentIdWithNgrams :: HasNodeError err
                     => (a
                     -> Cmd err (HashMap b (Map NgramsType Int)))
                     -> [Indexed NodeId a]
                     -> Cmd err [DocumentIdWithNgrams a b]
documentIdWithNgrams extract =
  traverse (\doc -> DocumentIdWithNgrams doc <$> extract (_unIndex doc))
428
429
-- | TODO check optimization
-- Merge per-document ngrams into a single map counting, for each ngrams and
-- each ngrams type, the occurrences per document node; counts for the same
-- (ngrams, type, node) key are summed.
mapNodeIdNgrams :: (Ord b, Hashable b)
                => [DocumentIdWithNgrams a b]
                -> HashMap b
                       (Map NgramsType
                            (Map NodeId Int)
                       )
mapNodeIdNgrams = HashMap.unionsWith (Map.unionWith (Map.unionWith (+))) . fmap byDoc
  where
    -- Tag every count of one document with that document's node id.
    byDoc :: DocumentIdWithNgrams a b
          -> HashMap b (Map NgramsType (Map NodeId Int))
    byDoc d = fmap (Map.singleton docId) <$> documentNgrams d
      where
        docId = _index $ documentWithId d
444
445
446 ------------------------------------------------------------------------
-- | For contacts, only the last name is indexed (as an Authors ngram);
--   a missing name yields the literal placeholder "Nothing".
instance ExtractNgramsT HyperdataContact
  where
    -- Ngrams are truncated to 255 characters before being returned.
    extractNgramsT l hc = HashMap.mapKeys (cleanExtractedNgrams 255) <$> extract l hc
      where
        extract :: TermType Lang -> HyperdataContact
                -> Cmd err (HashMap ExtractedNgrams (Map NgramsType Int))
        extract _l hc' = do
          let authors = map text2ngrams
                     $ maybe ["Nothing"] (\a -> [a])
                     $ view (hc_who . _Just . cw_lastName) hc'

          pure $ HashMap.fromList $ [(SimpleNgrams a', Map.singleton Authors 1) | a' <- authors ]
459
460
-- | Extract ngrams from a document: its source, institutes and authors as
--   simple ngrams, and its text terms (POS-enriched) as terms ngrams.
--   Missing fields yield the literal placeholder "Nothing".
instance ExtractNgramsT HyperdataDocument
  where
    extractNgramsT :: TermType Lang
                   -> HyperdataDocument
                   -> Cmd err (HashMap ExtractedNgrams (Map NgramsType Int))
    -- Ngrams are truncated to 255 characters before being returned.
    extractNgramsT lang hd = HashMap.mapKeys (cleanExtractedNgrams 255) <$> extractNgramsT' lang hd
      where
        extractNgramsT' :: TermType Lang
                        -> HyperdataDocument
                        -> Cmd err (HashMap ExtractedNgrams (Map NgramsType Int))
        extractNgramsT' lang' doc = do
          let source    = text2ngrams
                        $ maybe "Nothing" identity
                        $ _hd_source doc

              -- Institutes are comma-separated and mapped to school names.
              institutes = map text2ngrams
                         $ maybe ["Nothing"] (map toSchoolName . (T.splitOn ", "))
                         $ _hd_institutes doc

              -- Authors are comma-separated.
              authors    = map text2ngrams
                         $ maybe ["Nothing"] (T.splitOn ", ")
                         $ _hd_authors doc

          -- Extract terms from the document text, then enrich each with a
          -- POS tag (noun phrase) for the document language via CoreNLP.
          terms' <- map (enrichedTerms (lang' ^. tt_lang) CoreNLP NP)
                 <$> concat
                 <$> liftBase (extractTerms lang' $ hasText doc)

          pure $ HashMap.fromList
               $  [(SimpleNgrams   source, Map.singleton Sources     1)             ]
               <> [(SimpleNgrams   i',     Map.singleton Institutes  1) | i' <- institutes ]
               <> [(SimpleNgrams   a',     Map.singleton Authors     1) | a' <- authors    ]
               <> [(EnrichedNgrams t',     Map.singleton NgramsTerms 1) | t' <- terms'     ]
493
-- | Delegate ngrams extraction to the node's hyperdata payload
--   (the last field of the 'Node' constructor).
instance (ExtractNgramsT a, HasText a) => ExtractNgramsT (Node a)
  where
    extractNgramsT l (Node _ _ _ _ _ _ _ h) = extractNgramsT l h
497
-- | Delegate text access to the node's hyperdata payload
--   (the last field of the 'Node' constructor).
instance HasText a => HasText (Node a)
  where
    hasText (Node _ _ _ _ _ _ _ h) = hasText h
501
502
503
-- | TODO putelsewhere
-- | Upgrade function
-- Suppose all documents are English (this is the case actually)
-- Re-indexes every document of every corpus under the master user,
-- in batches of 1000 documents.
indexAllDocumentsWithPosTag :: FlowCmdM env err m
                            => m ()
indexAllDocumentsWithPosTag = do
  rootId    <- getRootId (UserName userMaster)
  corpusIds <- findNodesId rootId [NodeCorpus]
  docs      <- List.concat <$> mapM getDocumentsWithParentId corpusIds
  -- Fixed idiom: was @_ <- mapM … ; pure ()@, which built an unused [()].
  mapM_ extractInsert (splitEvery 1000 docs)
515
-- | Extract ngrams from a batch of documents (assumed multilingual English)
--   and insert the extracted ngrams into the database.
extractInsert :: FlowCmdM env err m
              => [Node HyperdataDocument] -> m ()
extractInsert docs = do
  -- Index each document by its own node id.
  let indexedDocs = map (\doc -> Indexed (doc ^. node_id) doc) docs
  ngramsByDoc <- mapNodeIdNgrams
             <$> documentIdWithNgrams
                 (extractNgramsT $ withLang (Multi EN) indexedDocs)
                 indexedDocs
  _ <- insertExtractedNgrams $ HashMap.keys ngramsByDoc
  pure ()
526
527