]> Git — Sourcephile - gargantext.git/blob - src/Gargantext/Database/Action/Flow.hs
Merge remote-tracking branch 'origin/131-dev-ngrams-table-db-connection-2' into dev...
[gargantext.git] / src / Gargantext / Database / Action / Flow.hs
1 {-|
Module      : Gargantext.Database.Action.Flow
3 Description : Database Flow
4 Copyright : (c) CNRS, 2017-Present
5 License : AGPL + CECILL v3
6 Maintainer : team@gargantext.org
7 Stability : experimental
8 Portability : POSIX
9
10 -- TODO-ACCESS:
11 -- check userId CanFillUserCorpus userCorpusId
12 -- check masterUserId CanFillMasterCorpus masterCorpusId
13
14 -- TODO-ACCESS: check uId CanInsertDoc pId && checkDocType nodeType
15 -- TODO-EVENTS: InsertedNodes
16 -}
17
18 {-# OPTIONS_GHC -fno-warn-orphans #-}
19
20 {-# LANGUAGE ConstrainedClassMethods #-}
21 {-# LANGUAGE ConstraintKinds #-}
22 {-# LANGUAGE InstanceSigs #-}
23 {-# LANGUAGE ScopedTypeVariables #-}
24 {-# LANGUAGE TemplateHaskell #-}
25
26 module Gargantext.Database.Action.Flow -- (flowDatabase, ngrams2list)
27 ( DataText(..)
28 , getDataText
29 , getDataText_Debug
30 , flowDataText
31 , flow
32
33 , flowCorpusFile
34 , flowCorpus
35 , flowAnnuaire
36 , insertMasterDocs
37 , saveDocNgramsWith
38
39 , getOrMkRoot
40 , getOrMk_RootWithCorpus
41 , TermType(..)
42 , DataOrigin(..)
43 , allDataOrigins
44
45 , do_api
46 , indexAllDocumentsWithPosTag
47 )
48 where
49
50 import Conduit
51 import Control.Lens ((^.), view, _Just, makeLenses)
52 import Data.Aeson.TH (deriveJSON)
53 import Data.Conduit.Internal (zipSources)
54 import Data.Either
55 import Data.HashMap.Strict (HashMap)
56 import Data.Hashable (Hashable)
57 import Data.List (concat)
58 import Data.Map (Map, lookup)
59 import Data.Maybe (catMaybes)
60 import Data.Monoid
61 import Data.Swagger
62 import qualified Data.Text as T
63 import Data.Traversable (traverse)
64 import Data.Tuple.Extra (first, second)
65 import GHC.Generics (Generic)
66 import Servant.Client (ClientError)
67 import System.FilePath (FilePath)
68 import qualified Data.HashMap.Strict as HashMap
69 import qualified Gargantext.Data.HashMap.Strict.Utils as HashMap
70 import qualified Data.Map as Map
71 import qualified Data.Conduit.List as CL
72 import qualified Data.Conduit as C
73
74 import Gargantext.API.Admin.Orchestrator.Types (JobLog(..))
75 import Gargantext.Core (Lang(..), PosTagAlgo(..))
76 import Gargantext.Core.Ext.IMT (toSchoolName)
77 import Gargantext.Core.Ext.IMTUser (readFile_Annuaire)
78 import Gargantext.Core.Flow.Types
79 import Gargantext.Core.Text
80 import Gargantext.Core.Text.Corpus.Parsers (parseFile, FileFormat, FileType)
81 import Gargantext.Core.Text.List (buildNgramsLists)
82 import Gargantext.Core.Text.List.Group.WithStem ({-StopSize(..),-} GroupParams(..))
83 import Gargantext.Core.Text.List.Social (FlowSocialListWith)
84 import Gargantext.Core.Text.Terms
85 import Gargantext.Core.Text.Terms.Mono.Stem.En (stemIt)
86 import Gargantext.Core.Types (POS(NP))
87 import Gargantext.Core.Types.Individu (User(..))
88 import Gargantext.Core.Types.Main
89 import Gargantext.Core.Utils.Prefix (unPrefix, unPrefixSwagger)
90 import Gargantext.Database.Action.Flow.List
91 import Gargantext.Database.Action.Flow.Types
92 import Gargantext.Database.Action.Flow.Utils (insertDocNgrams, DocumentIdWithNgrams(..))
93 import Gargantext.Database.Action.Search (searchDocInDatabase)
94 import Gargantext.Database.Admin.Config (userMaster, corpusMasterName)
95 import Gargantext.Database.Action.Metrics (updateNgramsOccurrences)
96 import Gargantext.Database.Admin.Types.Hyperdata
97 import Gargantext.Database.Admin.Types.Node -- (HyperdataDocument(..), NodeType(..), NodeId, UserId, ListId, CorpusId, RootId, MasterCorpusId, MasterUserId)
98 import Gargantext.Database.Prelude
99 import Gargantext.Database.Query.Table.ContextNodeNgrams2
100 import Gargantext.Database.Query.Table.Ngrams
101 import Gargantext.Database.Query.Table.Node
102 import Gargantext.Database.Query.Table.Node.Document.Insert -- (insertDocuments, ReturnId(..), addUniqIdsDoc, addUniqIdsContact, ToDbData(..))
103 import Gargantext.Database.Query.Table.Node.Error (HasNodeError(..))
104 import Gargantext.Database.Query.Table.NodeNgrams (listInsertDb , getCgramsId)
105 import Gargantext.Database.Query.Tree.Root (getOrMkRoot, getOrMk_RootWithCorpus)
106 import Gargantext.Database.Schema.Node (NodePoly(..), node_id)
107 import Gargantext.Database.Types
108 import Gargantext.Prelude
109 import Gargantext.Prelude.Crypto.Hash (Hash)
110 import qualified Gargantext.Core.Text.Corpus.API as API
111 import qualified Gargantext.Database.Query.Table.Node.Document.Add as Doc (add)
112 import qualified Prelude
113
114 ------------------------------------------------------------------------
115 -- Imports for upgrade function
116 import Gargantext.Database.Query.Tree.Root (getRootId)
117 import Gargantext.Database.Query.Tree (findNodesId)
118 import qualified Data.List as List
119 ------------------------------------------------------------------------
120 -- TODO use internal with API name (could be old data)
-- | Provenance of a 'DataText': either data already stored in this
-- Gargantext database ('InternalOrigin') or data fetched from one of
-- the external document APIs ('ExternalOrigin').  Both constructors
-- carry the 'API.ExternalAPIs' tag naming the source.
data DataOrigin = InternalOrigin { _do_api :: API.ExternalAPIs }
                | ExternalOrigin { _do_api :: API.ExternalAPIs }
                -- TODO Web
  deriving (Generic, Eq)

makeLenses ''DataOrigin
deriveJSON (unPrefix "_do_") ''DataOrigin
instance ToSchema DataOrigin where
  declareNamedSchema = genericDeclareNamedSchema (unPrefixSwagger "_do_")
130
-- | Every possible 'DataOrigin': one 'InternalOrigin' and one
-- 'ExternalOrigin' per known external API (internal ones first).
allDataOrigins :: [DataOrigin]
allDataOrigins = [ origin api
                 | origin <- [InternalOrigin, ExternalOrigin]
                 , api    <- API.externalAPIs
                 ]
134
---------------
-- | Documents to be flowed into a corpus: either references to
-- documents already present in the database ('DataOld'), or a stream
-- of new documents ('DataNew') together with an optional total count
-- used for progress reporting.
data DataText = DataOld ![NodeId]
              | DataNew !(Maybe Integer, ConduitT () HyperdataDocument IO ())
--- | DataNew ![[HyperdataDocument]]
139
-- | Debug helper: dump a 'DataText' to stdout.  A 'Show' instance is
-- not possible because 'DataNew' carries an 'IO' conduit, which must
-- be run ('CL.consume') to observe the documents.
printDataText :: DataText -> IO ()
printDataText (DataOld nodeIds) = putStrLn $ show nodeIds
printDataText (DataNew (mCount, docsC)) = do
  docs <- C.runConduit (docsC .| CL.consume)
  putStrLn $ show (mCount, docs)
146
-- TODO use the split parameter in config file
-- | Fetch a 'DataText' from the given origin.
--
-- * 'ExternalOrigin': query the external API ('API.get') with the
--   given language, query and limit; may fail with a 'ClientError'.
-- * 'InternalOrigin': search the master user's corpus for documents
--   matching the stemmed query; always returns 'Right'.
getDataText :: FlowCmdM env err m
            => DataOrigin
            -> TermType Lang
            -> API.Query
            -> Maybe API.Limit
            -> m (Either ClientError DataText)
getDataText (ExternalOrigin api) la q li = liftBase $ do
  eRes <- API.get api (_tt_lang la) q li
  pure $ DataNew <$> eRes

getDataText (InternalOrigin _) _la q _li = do
  (_masterUserId, _masterRootId, cId) <- getOrMk_RootWithCorpus
                                           (UserName userMaster)
                                           (Left "")
                                           (Nothing :: Maybe HyperdataCorpus)
  ids <- map fst <$> searchDocInDatabase cId (stemIt q)
  pure $ Right $ DataOld ids
165
-- | Like 'getDataText' but prints the result (or the client error)
-- to stdout instead of returning it.  Debug-only helper.
getDataText_Debug :: FlowCmdM env err m
                  => DataOrigin
                  -> TermType Lang
                  -> API.Query
                  -> Maybe API.Limit
                  -> m ()
getDataText_Debug origin lang query limit = do
  eText <- getDataText origin lang query limit
  liftBase $ case eText of
    Left  err -> putStrLn $ show err
    Right dt  -> printDataText dt
177
178
-------------------------------------------------------------------------------
-- | Flow a 'DataText' into an existing corpus.
--
-- * 'DataOld': documents are already in the database; only attach
--   them to the corpus and rebuild the lists ('flowCorpusUser').
-- * 'DataNew': stream the new documents through the full 'flowCorpus'.
flowDataText :: forall env err m.
                ( FlowCmdM env err m
                )
             => User
             -> DataText
             -> TermType Lang
             -> CorpusId
             -> Maybe FlowSocialListWith
             -> (JobLog -> m ())
             -> m CorpusId
flowDataText u (DataOld ids) tt cid mfslw _ = flowCorpusUser (_tt_lang tt) u (Right [cid]) corpusType ids mfslw
  where
    corpusType = (Nothing :: Maybe HyperdataCorpus)
flowDataText u (DataNew (mLen, txtC)) tt cid mfslw logStatus =
  flowCorpus u (Right [cid]) tt mfslw (mLen, (transPipe liftBase txtC)) logStatus
195
196 ------------------------------------------------------------------------
-- TODO use proxy
-- | Flow a whole annuaire (contacts directory) file into the
-- database.  The file is read eagerly into a list of
-- 'HyperdataContact' (TODO: stream it with a conduit), then fed
-- through the generic 'flow'.
flowAnnuaire :: (FlowCmdM env err m)
             => User
             -> Either CorpusName [CorpusId]
             -> (TermType Lang)
             -> FilePath
             -> (JobLog -> m ())
             -> m AnnuaireId
flowAnnuaire u n l filePath logStatus = do
  -- TODO Conduit for file
  docs <- liftBase $ ((readFile_Annuaire filePath) :: IO [HyperdataContact])
  flow (Nothing :: Maybe HyperdataAnnuaire) u n l Nothing (Just $ fromIntegral $ length docs, yieldMany docs) logStatus
209
210 ------------------------------------------------------------------------
-- | Parse a corpus file from disk and flow the parsed documents into
-- the database.  Panics when the file cannot be parsed.
-- NOTE(review): the 'Limit' argument is currently unused (see the
-- commented-out code below) — confirm whether it should be honoured.
flowCorpusFile :: (FlowCmdM env err m)
               => User
               -> Either CorpusName [CorpusId]
               -> Limit -- Limit the number of docs (for dev purpose)
               -> TermType Lang
               -> FileType
               -> FileFormat
               -> FilePath
               -> Maybe FlowSocialListWith
               -> (JobLog -> m ())
               -> m CorpusId
flowCorpusFile u n _l la ft ff fp mfslw logStatus = do
  eParsed <- liftBase $ parseFile ft ff fp
  case eParsed of
    Right parsed -> do
      flowCorpus u n la mfslw (Just $ fromIntegral $ length parsed, yieldMany parsed .| mapC toHyperdataDocument) logStatus
      --let docs = splitEvery 500 $ take l parsed
      --flowCorpus u n la mfslw (yieldMany $ map (map toHyperdataDocument) docs) logStatus
    Left e -> panic $ "Error: " <> T.pack e
230
231 ------------------------------------------------------------------------
-- | TODO improve the needed type to create/update a corpus
-- (For now, Either is enough)
-- | 'flow' specialised to corpora: no custom corpus hyperdata.
flowCorpus :: (FlowCmdM env err m, FlowCorpus a)
           => User
           -> Either CorpusName [CorpusId]
           -> TermType Lang
           -> Maybe FlowSocialListWith
           -> (Maybe Integer, ConduitT () a m ())
           -> (JobLog -> m ())
           -> m CorpusId
flowCorpus = flow (Nothing :: Maybe HyperdataCorpus)
243
244
-- | Generic document flow: stream documents into the master corpus
-- one by one (reporting progress when a total count is known), then
-- attach the inserted documents to the user's corpus and build its
-- lists ('flowCorpusUser').
flow :: forall env err m a c.
        ( FlowCmdM env err m
        , FlowCorpus a
        , MkCorpus c
        )
     => Maybe c
     -> User
     -> Either CorpusName [CorpusId]
     -> TermType Lang
     -> Maybe FlowSocialListWith
     -> (Maybe Integer, ConduitT () a m ())
     -> (JobLog -> m ())
     -> m CorpusId
flow c u cn la mfslw (mLength, docsC) logStatus = do
  -- TODO if public insertMasterDocs else insertUserDocs
  -- Number each incoming document (1-based) so progress can be logged.
  ids <- runConduit $ zipSources (yieldMany [1..]) docsC
                   .| mapMC insertDoc
                   .| sinkList
  flowCorpusUser (la ^. tt_lang) u cn c ids mfslw
  where
    -- Insert one document into the master corpus and report progress
    -- through 'logStatus' when the total document count is known.
    insertDoc :: (Integer, a) -> m NodeId
    insertDoc (idx, doc) = do
      docIds <- insertMasterDocs c la [doc]
      case mLength of
        Nothing  -> pure ()
        Just len ->
          logStatus JobLog { _scst_succeeded = Just $ fromIntegral $ 1 + idx
                           , _scst_failed    = Just 0
                           , _scst_remaining = Just $ fromIntegral $ len - idx
                           , _scst_events    = Just []
                           }
      -- One document in, so exactly one id is expected back; fail
      -- loudly (instead of the previous partial 'Prelude.head') if
      -- 'insertMasterDocs' returns none.
      case docIds of
        (docId:_) -> pure docId
        []        -> panic "[G.D.A.Flow.flow] insertMasterDocs returned no id"
287
288
289
290 ------------------------------------------------------------------------
-- | Attach freshly inserted documents to the user's corpus and build
-- its satellite nodes (texts, list, dashboard, graph).
--
-- The statement order matters: NodeTexts first, then the list, then
-- the documents are linked ('Doc.add'), then the ngrams lists are
-- built from both the user and the master corpus, and finally the
-- ngrams occurrences are refreshed.
flowCorpusUser :: ( FlowCmdM env err m
                  , MkCorpus c
                  )
               => Lang
               -> User
               -> Either CorpusName [CorpusId]
               -> Maybe c
               -> [NodeId]
               -> Maybe FlowSocialListWith
               -> m CorpusId
flowCorpusUser l user corpusName ctype ids mfslw = do
  -- User Flow
  (userId, _rootId, userCorpusId) <- getOrMk_RootWithCorpus user corpusName ctype
  -- NodeTexts is first
  _tId <- insertDefaultNodeIfNotExists NodeTexts userCorpusId userId
  -- printDebug "NodeTexts: " tId

  -- NodeList is second
  listId <- getOrMkList userCorpusId userId
  -- _cooc <- insertDefaultNode NodeListCooc listId userId
  -- TODO: check if present already, ignore
  _ <- Doc.add userCorpusId ids

  -- printDebug "Node Text Ids:" tId

  -- User List Flow
  (masterUserId, _masterRootId, masterCorpusId)
    <- getOrMk_RootWithCorpus (UserName userMaster) (Left "") ctype

  --let gp = (GroupParams l 2 3 (StopSize 3))
  -- Here the PosTagAlgo should be chosen according the Lang
  let gp = GroupWithPosTag l CoreNLP HashMap.empty
  ngs <- buildNgramsLists user userCorpusId masterCorpusId mfslw gp

  -- printDebug "flowCorpusUser:ngs" ngs

  _userListId <- flowList_DbRepo listId ngs
  _mastListId <- getOrMkList masterCorpusId masterUserId
  -- _ <- insertOccsUpdates userCorpusId mastListId
  -- printDebug "userListId" userListId
  -- User Graph Flow
  _ <- insertDefaultNodeIfNotExists NodeDashboard userCorpusId userId
  _ <- insertDefaultNodeIfNotExists NodeGraph userCorpusId userId
  --_ <- mkPhylo  userCorpusId userId
  -- Annuaire Flow
  -- _ <- mkAnnuaire  rootUserId userId
  _ <- updateNgramsOccurrences userCorpusId (Just listId)

  pure userCorpusId
340
341
-- | Insert documents into the master corpus (owned by 'userMaster')
-- and extract + store their ngrams on the master list.  Returns the
-- ids of the inserted documents so the caller can also attach them
-- to a user corpus.
insertMasterDocs :: ( FlowCmdM env err m
                    , FlowCorpus a
                    , MkCorpus c
                    )
                 => Maybe c
                 -> TermType Lang
                 -> [a]
                 -> m [DocId]
insertMasterDocs c lang hs = do
  (masterUserId, _, masterCorpusId) <- getOrMk_RootWithCorpus (UserName userMaster) (Left corpusMasterName) c
  (ids', documentsWithId) <- insertDocs masterUserId masterCorpusId (map (toNode masterUserId Nothing) hs )
  _ <- Doc.add masterCorpusId ids'
  -- TODO
  -- create a corpus with database name (CSV or PubMed)
  -- add documents to the corpus (create node_node link)
  -- this will enable global database monitoring

  -- maps :: IO Map Ngrams (Map NgramsType (Map NodeId Int))
  mapNgramsDocs' :: HashMap ExtractedNgrams (Map NgramsType (Map NodeId Int))
                <- mapNodeIdNgrams
                <$> documentIdWithNgrams
                    (extractNgramsT $ withLang lang documentsWithId)
                    documentsWithId

  lId <- getOrMkList masterCorpusId masterUserId
  _ <- saveDocNgramsWith lId mapNgramsDocs'

  -- _cooc <- insertDefaultNode NodeListCooc lId masterUserId
  pure ids'
371
-- | Persist an extracted ngrams map on the given list: insert the
-- ngrams themselves, register them on the list ('listInsertDb'), and
-- link each (context, ngram, type) triple with its weight.
saveDocNgramsWith :: ( FlowCmdM env err m)
                  => ListId
                  -> HashMap ExtractedNgrams (Map NgramsType (Map NodeId Int))
                  -> m ()
saveDocNgramsWith lId mapNgramsDocs' = do
  terms2id <- insertExtractedNgrams $ HashMap.keys mapNgramsDocs'
  --printDebug "terms2id" terms2id

  let mapNgramsDocs = HashMap.mapKeys extracted2ngrams mapNgramsDocs'

  -- new
  mapCgramsId <- listInsertDb lId toNodeNgramsW'
               $ map (first _ngramsTerms . second Map.keys)
               $ HashMap.toList mapNgramsDocs

  --printDebug "saveDocNgramsWith" mapCgramsId
  -- insertDocNgrams
  -- The only 'Maybe' in the triple is the cgrams id lookup; triples
  -- whose ngram has no cgrams id are silently dropped ('catMaybes').
  _return <- insertContextNodeNgrams2
           $ catMaybes [ ContextNodeNgrams2 nId
                           <$> getCgramsId mapCgramsId ngrams_type (_ngramsTerms terms'')
                           <*> Just (fromIntegral w :: Double)
                       | (terms'', mapNgramsTypes) <- HashMap.toList mapNgramsDocs
                       , (ngrams_type, mapNodeIdWeight) <- Map.toList mapNgramsTypes
                       , (nId, w) <- Map.toList mapNodeIdWeight
                       ]

  -- to be removed
  _ <- insertDocNgrams lId $ HashMap.mapKeys (indexNgrams terms2id) mapNgramsDocs

  pure ()
402
403
404 ------------------------------------------------------------------------
-- TODO Type NodeDocumentUnicised
-- | Insert documents into the given corpus, deduplicated by their
-- content hash ('addUniqId').  Returns the new context ids together
-- with the documents indexed by those ids; documents whose insertion
-- was refused by the database are dropped by 'mergeData'.
insertDocs :: ( FlowCmdM env err m
              -- , FlowCorpus a
              , FlowInsertDB a
              )
           => UserId
           -> CorpusId
           -> [a]
           -> m ([ContextId], [Indexed ContextId a])
insertDocs uId cId hs = do
  let docs = map addUniqId hs
  newIds <- insertDb uId Nothing docs
  -- printDebug "newIds" newIds
  let
    newIds' = map reId newIds
    documentsWithId = mergeData (toInserted newIds) (Map.fromList $ map viewUniqId' docs)
  _ <- Doc.add cId newIds'
  pure (newIds', documentsWithId)
423
424
425 ------------------------------------------------------------------------
-- | Pair a value with its unique content hash; panics when the value
-- has no hash (i.e. 'addUniqId' was not applied first).
viewUniqId' :: UniqId a
            => a
            -> (Hash, a)
viewUniqId' d =
  case view uniqId d of
    Just h  -> (h, d)
    Nothing -> panic "[ERROR] Database.Flow.toInsert"
432
433
-- | Index the database return ids by their unique hash, keeping only
-- the rows that were actually inserted.
toInserted :: [ReturnId]
           -> Map Hash ReturnId
toInserted =
  Map.fromList . map (\r -> (reUniqId r, r))
               . filter reInserted
439
-- | Join documents (indexed by hash) with their database return ids:
-- every document whose hash appears in the insert results is tagged
-- with its new 'NodeId'; the others are dropped.
mergeData :: Map Hash ReturnId
          -> Map Hash a
          -> [Indexed NodeId a]
mergeData rs = catMaybes . map toDocumentWithId . Map.toList
  where
    toDocumentWithId (hash, hpd) =
      (\ret -> Indexed (reId ret) hpd) <$> lookup hash rs
448
449 ------------------------------------------------------------------------
450 ------------------------------------------------------------------------
451 ------------------------------------------------------------------------
-- | Run an ngrams-extraction action over each indexed document,
-- pairing every document with its extracted ngrams.
documentIdWithNgrams :: HasNodeError err
                     => (a
                     -> Cmd err (HashMap b (Map NgramsType Int)))
                     -> [Indexed NodeId a]
                     -> Cmd err [DocumentIdWithNgrams a b]
documentIdWithNgrams f = traverse toDocumentIdWithNgrams
  where
    toDocumentIdWithNgrams d =
      DocumentIdWithNgrams d <$> f (_unIndex d)
462
463
-- | TODO check optimization
-- | Merge per-document ngrams maps into a single map indexed by
-- ngram, then ngram type, then document ('NodeId'), summing the
-- occurrence counts on collision.
mapNodeIdNgrams :: (Ord b, Hashable b)
                => [DocumentIdWithNgrams a b]
                -> HashMap b
                       (Map NgramsType
                            (Map NodeId Int)
                       )
mapNodeIdNgrams = HashMap.unionsWith merge . map perDocument
  where
    merge = Map.unionWith (Map.unionWith (+))
    -- Tag every count of one document with that document's NodeId.
    perDocument :: DocumentIdWithNgrams a b
                -> HashMap b (Map NgramsType (Map NodeId Int))
    perDocument d = fmap (fmap (Map.singleton docId)) (documentNgrams d)
      where
        docId = _index $ documentWithId d
478
479
480 ------------------------------------------------------------------------
-- | Ngrams extraction for contacts: the only ngrams produced are the
-- contact's last name, tagged as 'Authors' (falling back to the
-- literal "Nothing" when absent), truncated to 255 characters.
instance ExtractNgramsT HyperdataContact
  where
    extractNgramsT l hc = HashMap.mapKeys (cleanExtractedNgrams 255) <$> extract l hc
      where
        extract :: TermType Lang -> HyperdataContact
                -> Cmd err (HashMap ExtractedNgrams (Map NgramsType Int))
        extract _l hc' = do
          let authors = map text2ngrams
                      $ maybe ["Nothing"] (\a -> [a])
                      $ view (hc_who . _Just . cw_lastName) hc'

          pure $ HashMap.fromList $ [(SimpleNgrams a', Map.singleton Authors 1) | a' <- authors ]
493
494
-- | Ngrams extraction for documents: source, institutes and authors
-- are taken verbatim from the metadata fields (falling back to the
-- literal "Nothing" when absent), while terms are extracted from the
-- document text and enriched with POS tags (CoreNLP, noun phrases).
-- All keys are truncated to 255 characters.
instance ExtractNgramsT HyperdataDocument
  where
    extractNgramsT :: TermType Lang
                   -> HyperdataDocument
                   -> Cmd err (HashMap ExtractedNgrams (Map NgramsType Int))
    extractNgramsT lang hd = HashMap.mapKeys (cleanExtractedNgrams 255) <$> extractNgramsT' lang hd
      where
        extractNgramsT' :: TermType Lang
                        -> HyperdataDocument
                        -> Cmd err (HashMap ExtractedNgrams (Map NgramsType Int))
        extractNgramsT' lang' doc = do
          let source = text2ngrams
                     $ maybe "Nothing" identity
                     $ _hd_source doc

              institutes = map text2ngrams
                         $ maybe ["Nothing"] (map toSchoolName . (T.splitOn ", "))
                         $ _hd_institutes doc

              authors = map text2ngrams
                      $ maybe ["Nothing"] (T.splitOn ", ")
                      $ _hd_authors doc

          -- Single composed fmap instead of the previous chained
          -- 'map f <$> concat <$> …' (hlint: avoid double fmap).
          terms' <- map (enrichedTerms (lang' ^. tt_lang) CoreNLP NP) . concat
                    <$> liftBase (extractTerms lang' $ hasText doc)

          pure $ HashMap.fromList
               $  [(SimpleNgrams source, Map.singleton Sources 1) ]
               <> [(SimpleNgrams i', Map.singleton Institutes 1) | i' <- institutes ]
               <> [(SimpleNgrams a', Map.singleton Authors 1) | a' <- authors ]
               <> [(EnrichedNgrams t', Map.singleton NgramsTerms 1) | t' <- terms' ]
527
-- | Extracting ngrams from a 'Node' delegates to its hyperdata.
instance (ExtractNgramsT a, HasText a) => ExtractNgramsT (Node a)
  where
    extractNgramsT l (Node _ _ _ _ _ _ _ h) = extractNgramsT l h
531
-- | The text of a 'Node' is the text of its hyperdata.
instance HasText a => HasText (Node a)
  where
    hasText (Node _ _ _ _ _ _ _ h) = hasText h
535
536
537
-- | TODO putelsewhere
-- | Upgrade function: re-extract POS-tagged ngrams for every document
-- in every corpus under the master user's tree, in batches of 1000.
-- Suppose all documents are English (this is the case actually).
indexAllDocumentsWithPosTag :: FlowCmdM env err m
                            => m ()
indexAllDocumentsWithPosTag = do
  rootId <- getRootId (UserName userMaster)
  corpusIds <- findNodesId rootId [NodeCorpus]
  docs <- List.concat <$> mapM getDocumentsWithParentId corpusIds
  -- mapM_ instead of '_ <- mapM …; pure ()' (results are discarded).
  mapM_ extractInsert (splitEvery 1000 docs)
549
-- | Extract ngrams from a batch of documents (tagged as multi-lingual
-- English) and insert them into the ngrams table.  Helper for the
-- 'indexAllDocumentsWithPosTag' upgrade function.
extractInsert :: FlowCmdM env err m
              => [Node HyperdataDocument] -> m ()
extractInsert docs = do
  let documentsWithId = map (\doc -> Indexed (doc ^. node_id) doc) docs
  mapNgramsDocs' <- mapNodeIdNgrams
                <$> documentIdWithNgrams
                    (extractNgramsT $ withLang (Multi EN) documentsWithId)
                    documentsWithId
  _ <- insertExtractedNgrams $ HashMap.keys mapNgramsDocs'
  pure ()