2 Module : Gargantext.Database.Flow
3 Description : Database Flow
4 Copyright : (c) CNRS, 2017-Present
5 License : AGPL + CECILL v3
6 Maintainer : team@gargantext.org
7 Stability : experimental
11 -- check userId CanFillUserCorpus userCorpusId
12 -- check masterUserId CanFillMasterCorpus masterCorpusId
14 -- TODO-ACCESS: check uId CanInsertDoc pId && checkDocType nodeType
15 -- TODO-EVENTS: InsertedNodes
18 {-# OPTIONS_GHC -fno-warn-orphans #-}
20 {-# LANGUAGE ConstrainedClassMethods #-}
21 {-# LANGUAGE ConstraintKinds #-}
22 {-# LANGUAGE InstanceSigs #-}
23 {-# LANGUAGE ScopedTypeVariables #-}
24 {-# LANGUAGE TemplateHaskell #-}
26 module Gargantext.Database.Action.Flow -- (flowDatabase, ngrams2list)
41 , getOrMk_RootWithCorpus
47 , indexAllDocumentsWithPosTag
52 import Control.Lens ((^.), view, _Just, makeLenses, over, traverse)
53 import Data.Aeson.TH (deriveJSON)
54 import Data.Conduit.Internal (zipSources)
55 import qualified Data.Conduit.List as CList
57 import Data.HashMap.Strict (HashMap)
58 import Data.Hashable (Hashable)
59 import Data.List (concat)
60 import Data.Map.Strict (Map, lookup)
61 import Data.Maybe (catMaybes)
64 import qualified Data.Text as T
65 import Data.Tuple.Extra (first, second)
66 import GHC.Generics (Generic)
67 import System.FilePath (FilePath)
68 import qualified Data.HashMap.Strict as HashMap
69 import qualified Gargantext.Data.HashMap.Strict.Utils as HashMap
70 import qualified Data.Map.Strict as Map
71 import qualified Data.Conduit.List as CL
72 import qualified Data.Conduit as C
74 import Gargantext.Core (Lang(..), PosTagAlgo(..))
75 -- import Gargantext.Core.Ext.IMT (toSchoolName)
76 import Gargantext.Core.Ext.IMTUser (readFile_Annuaire)
77 import Gargantext.Core.Flow.Types
78 import Gargantext.Core.NLP (nlpServerGet)
79 import Gargantext.Core.Text
80 import Gargantext.Core.Text.Corpus.Parsers (parseFile, FileFormat, FileType, splitOn)
81 import Gargantext.Core.Text.List (buildNgramsLists)
82 import Gargantext.Core.Text.List.Group.WithStem ({-StopSize(..),-} GroupParams(..))
83 import Gargantext.Core.Text.List.Social (FlowSocialListWith(..))
84 import Gargantext.Core.Text.Terms
85 import Gargantext.Core.Text.Terms.Mono.Stem.En (stemIt)
86 import Gargantext.Core.Types (POS(NP), TermsCount)
87 import Gargantext.Core.Types.Individu (User(..))
88 import Gargantext.Core.Types.Main
89 import Gargantext.Core.Types.Query (Limit)
90 import Gargantext.Core.Utils (addTuples)
91 import Gargantext.Core.Utils.Prefix (unPrefix, unPrefixSwagger)
92 import Gargantext.Database.Action.Flow.List
93 import Gargantext.Database.Action.Flow.Types
94 import Gargantext.Database.Action.Flow.Utils (insertDocNgrams, DocumentIdWithNgrams(..))
95 import Gargantext.Database.Action.Search (searchDocInDatabase)
96 import Gargantext.Database.Admin.Config (userMaster, corpusMasterName)
97 import Gargantext.Database.Action.Metrics (updateNgramsOccurrences)
98 import Gargantext.Database.Admin.Types.Hyperdata
99 import Gargantext.Database.Admin.Types.Node -- (HyperdataDocument(..), NodeType(..), NodeId, UserId, ListId, CorpusId, RootId, MasterCorpusId, MasterUserId)
100 import Gargantext.Database.Prelude
101 import Gargantext.Database.Query.Table.ContextNodeNgrams2
102 import Gargantext.Database.Query.Table.Ngrams
103 import Gargantext.Database.Query.Table.Node
104 import Gargantext.Database.Query.Table.Node.Document.Insert -- (insertDocuments, ReturnId(..), addUniqIdsDoc, addUniqIdsContact, ToDbData(..))
105 import Gargantext.Database.Query.Table.Node.Error (HasNodeError(..))
106 import Gargantext.Database.Query.Table.NodeNgrams (listInsertDb , getCgramsId)
107 import Gargantext.Database.Query.Tree.Root (getOrMkRoot, getOrMk_RootWithCorpus)
108 import Gargantext.Database.Schema.Node (NodePoly(..), node_id)
109 import Gargantext.Database.Types
110 import Gargantext.Prelude
111 import Gargantext.Prelude.Crypto.Hash (Hash)
112 import Gargantext.Utils.Jobs (JobHandle, MonadJobStatus(..))
113 import qualified Gargantext.Core.Text.Corpus.API as API
114 import qualified Gargantext.Database.Query.Table.Node.Document.Add as Doc (add)
115 --import qualified Prelude
117 ------------------------------------------------------------------------
118 -- Imports for upgrade function
119 import Gargantext.Database.Query.Tree.Root (getRootId)
120 import Gargantext.Database.Query.Tree (findNodesId)
121 import qualified Data.List as List
122 ------------------------------------------------------------------------
123 -- TODO use internal with API name (could be old data)
-- | Provenance of corpus data: 'InternalOrigin' tags data already held in
-- Gargantext's own (master) database, 'ExternalOrigin' tags data fetched
-- from a remote search API.  Both record the 'API.ExternalAPIs' backend
-- they relate to in the '_do_api' field.
-- NOTE(review): some original lines are elided from this excerpt, so the
-- deriving list may be incomplete here.
124 data DataOrigin = InternalOrigin { _do_api :: API.ExternalAPIs }
125 | ExternalOrigin { _do_api :: API.ExternalAPIs }
127 deriving (Generic, Eq)
-- Template-Haskell derivations: a '_do_api' lens, and JSON (de)serialisation
-- keyed on field names with the "_do_" prefix stripped.  The Swagger schema
-- strips the same prefix so the API docs match the JSON wire format.
129 makeLenses ''DataOrigin
130 deriveJSON (unPrefix "_do_") ''DataOrigin
131 instance ToSchema DataOrigin where
132 declareNamedSchema = genericDeclareNamedSchema (unPrefixSwagger "_do_")
-- | Every known data origin: each external API backend appears twice,
-- first wrapped as an 'InternalOrigin', then as an 'ExternalOrigin'
-- (internal origins come first in the resulting list).
allDataOrigins :: [DataOrigin]
allDataOrigins = concatMap tagAll [InternalOrigin, ExternalOrigin]
  where
    -- Apply one origin constructor to every known external API backend.
    tagAll mkOrigin = map mkOrigin API.externalAPIs
-- | The documents to flow into a corpus.  'DataOld' references documents
-- already stored in the database by their 'NodeId's; 'DataNew' carries a
-- conduit producing freshly fetched 'HyperdataDocument's, paired with an
-- optional expected document count (used for job progress reporting —
-- NOTE(review): inferred from usage in 'flow'; confirm).
138 data DataText = DataOld ![NodeId]
139 | DataNew !(Maybe Integer, ConduitT () HyperdataDocument IO ())
140 --- | DataNew ![[HyperdataDocument]]
142 -- Show instance is not possible because of IO
-- | Debug helper: dump a 'DataText' to stdout.
--
-- For 'DataNew' this drains the entire conduit into memory ('CL.consume')
-- before printing, so only use it on small streams.
printDataText :: DataText -> IO ()
printDataText (DataOld xs) = print xs
printDataText (DataNew (maybeInt, conduitData)) = do
  -- Run the conduit to completion and collect every produced document.
  res <- C.runConduit (conduitData .| CL.consume)
  print (maybeInt, res)
149 -- TODO use the split parameter in config file
-- | Fetch the documents matching a query from the requested 'DataOrigin'.
--
-- * 'ExternalOrigin': calls the remote API ('API.get') with the query's
--   language, returning the stream wrapped as 'DataNew' (or the API error).
-- * 'InternalOrigin': searches the master user's corpus in the local
--   database ('searchDocInDatabase') on the stemmed raw query and returns
--   the matching node ids as 'DataOld'.
--
-- NOTE(review): several argument lines of the signature are elided from
-- this excerpt; judging by the clauses, 'la' is the term/language setting,
-- 'q' the query and 'li' an optional limit — confirm against the full file.
150 getDataText :: FlowCmdM env err m
155 -> m (Either API.GetCorpusError DataText)
156 getDataText (ExternalOrigin api) la q li = do
157 cfg <- view $ hasConfig
158 eRes <- liftBase $ API.get cfg api (_tt_lang la) q li
-- Wrap a successful fetch as DataNew; propagate GetCorpusError otherwise.
159 pure $ DataNew <$> eRes
-- Internal search: only the master corpus id is needed; the API tag,
-- language and limit arguments are ignored.
161 getDataText (InternalOrigin _) _la q _li = do
162 (_masterUserId, _masterRootId, cId) <- getOrMk_RootWithCorpus
163 (UserName userMaster)
165 (Nothing :: Maybe HyperdataCorpus)
-- Stem the raw query before searching, keep only the document ids.
166 ids <- map fst <$> searchDocInDatabase cId (stemIt $ API.getRawQuery q)
167 pure $ Right $ DataOld ids
-- | Debug variant of 'getDataText': runs the same fetch, then prints either
-- the error or the resulting 'DataText' (via 'printDataText') instead of
-- returning it.  NOTE(review): the scrutinee line of the case expression is
-- elided from this excerpt.
169 getDataText_Debug :: FlowCmdM env err m
175 getDataText_Debug a l q li = do
176 result <- getDataText a l q li
178 Left err -> liftBase $ putStrLn $ show err
179 Right res -> liftBase $ printDataText res
182 -------------------------------------------------------------------------------
-- | Flow a 'DataText' into the corpus identified by 'cid'.
--
-- * 'DataOld': the documents already exist, so just create the corpus
--   nodes, link the existing documents to the user corpus ('Doc.add') and
--   run the user-side list building ('flowCorpusUser').
-- * 'DataNew': hand the document conduit to 'flowCorpus', which inserts
--   the documents (in the master corpus) as it goes and reports progress
--   on the job handle.
--
-- NOTE(review): parts of the signature are elided in this excerpt.
183 flowDataText :: forall env err m.
191 -> Maybe FlowSocialListWith
194 flowDataText u (DataOld ids) tt cid mfslw _ = do
195 (_userId, userCorpusId, listId) <- createNodes u (Right [cid]) corpusType
196 _ <- Doc.add userCorpusId ids
197 flowCorpusUser (_tt_lang tt) u userCorpusId listId corpusType mfslw
-- No hyperdata to attach: the corpus type is an untyped Nothing.
199 corpusType = (Nothing :: Maybe HyperdataCorpus)
-- Lift the IO conduit into the command monad before flowing it.
200 flowDataText u (DataNew (mLen, txtC)) tt cid mfslw jobHandle =
201 flowCorpus u (Right [cid]) tt mfslw (mLen, (transPipe liftBase txtC)) jobHandle
203 ------------------------------------------------------------------------
-- | Flow a directory ("annuaire") of contacts from a file into the database.
-- The whole file is read eagerly into a list of 'HyperdataContact' (hence
-- the TODO about switching to a conduit), then handed to the generic 'flow'
-- with the exact document count for progress reporting.
205 flowAnnuaire :: (FlowCmdM env err m, MonadJobStatus m)
207 -> Either CorpusName [CorpusId]
212 flowAnnuaire u n l filePath jobHandle = do
213 -- TODO Conduit for file
214 docs <- liftBase $ ((readFile_Annuaire filePath) :: IO [HyperdataContact])
215 flow (Nothing :: Maybe HyperdataAnnuaire) u n l Nothing (Just $ fromIntegral $ length docs, yieldMany docs) jobHandle
217 ------------------------------------------------------------------------
-- | Flow a corpus from a local file: parse it with the given file type and
-- format, convert each parsed record to a 'HyperdataDocument', and hand the
-- resulting stream to 'flowCorpus'.  A parse failure aborts with 'panic'.
-- NOTE(review): the 'case eParsed of' / 'Right parsed ->' lines are elided
-- from this excerpt; the '_l' limit parameter is currently unused (see the
-- commented-out splitEvery/take code below).
218 flowCorpusFile :: (FlowCmdM env err m, MonadJobStatus m)
220 -> Either CorpusName [CorpusId]
221 -> Limit -- Limit the number of docs (for dev purpose)
226 -> Maybe FlowSocialListWith
229 flowCorpusFile u n _l la ft ff fp mfslw jobHandle = do
230 eParsed <- liftBase $ parseFile ft ff fp
233 flowCorpus u n la mfslw (Just $ fromIntegral $ length parsed, yieldMany parsed .| mapC toHyperdataDocument) jobHandle
234 --let docs = splitEvery 500 $ take l parsed
235 --flowCorpus u n la mfslw (yieldMany $ map (map toHyperdataDocument) docs) logStatus
236 Left e -> panic $ "Error: " <> T.pack e
238 ------------------------------------------------------------------------
239 -- | TODO improve the needed type to create/update a corpus
240 -- (For now, Either is enough)
-- 'flowCorpus' is 'flow' specialised to a corpus with no hyperdata
-- attached (the corpus-type argument is fixed to Nothing).
-- NOTE(review): parts of the signature are elided in this excerpt.
241 flowCorpus :: (FlowCmdM env err m, FlowCorpus a, MonadJobStatus m)
243 -> Either CorpusName [CorpusId]
245 -> Maybe FlowSocialListWith
246 -> (Maybe Integer, ConduitT () a m ())
249 flowCorpus = flow (Nothing :: Maybe HyperdataCorpus)
-- | The central ingestion pipeline: create the corpus nodes, stream the
-- documents in chunks of 100 into the master corpus (linking each chunk to
-- the user corpus and reporting job progress), then build the user-side
-- ngrams lists via 'flowCorpusUser'.
-- NOTE(review): several signature and pipeline lines are elided from this
-- excerpt (e.g. the step between chunking and 'mapM_C' that presumably
-- calls the local 'insertDocs''), so the exact conduit wiring cannot be
-- fully confirmed here.
252 flow :: forall env err m a c.
260 -> Either CorpusName [CorpusId]
262 -> Maybe FlowSocialListWith
263 -> (Maybe Integer, ConduitT () a m ())
266 flow c u cn la mfslw (mLength, docsC) jobHandle = do
267 (_userId, userCorpusId, listId) <- createNodes u cn c
268 -- TODO if public insertMasterDocs else insertUserDocs
-- Pair every document with a 1-based index (used for progress arithmetic),
-- then process the stream in chunks of 100 documents.
269 _ <- runConduit $ zipSources (yieldMany [1..]) docsC
270 .| CList.chunksOf 100
-- Link each inserted chunk's ids to the user corpus.
272 .| mapM_C (\ids' -> do
273 _ <- Doc.add userCorpusId ids'
-- After all documents are in, build the term lists for the user corpus.
277 _ <- flowCorpusUser (la ^. tt_lang) u userCorpusId listId c mfslw
279 -- ids <- traverse (\(idx, doc) -> do
280 -- id <- insertMasterDocs c la doc
281 -- logStatus JobLog { _scst_succeeded = Just $ 1 + idx
282 -- , _scst_failed = Just 0
283 -- , _scst_remaining = Just $ length docs - idx
284 -- , _scst_events = Just []
287 -- ) (zip [1..] docs)
288 --printDebug "[flow] calling flowCorpusUser" (0 :: Int)
290 --flowCorpusUser (la ^. tt_lang) u cn c ids mfslw
-- Local helper: insert one indexed chunk into the master corpus and mark
-- job progress based on the highest index seen so far.
293 insertDocs' :: [(Integer, a)] -> m [NodeId]
294 insertDocs' [] = pure []
295 insertDocs' docs = do
296 -- printDebug "[flow] calling insertDoc, ([idx], mLength) = " (fst <$> docs, mLength)
297 ids <- insertMasterDocs c la (snd <$> docs)
298 let maxIdx = maximum (fst <$> docs)
303 let succeeded = fromIntegral (1 + maxIdx)
304 -- let remaining = fromIntegral (len - maxIdx)
305 -- Reconstruct the correct update state by using 'markStarted' and the other primitives.
306 -- We do this slightly awkward arithmetic such that when we call 'markProgress' we reduce
307 -- the number of 'remaining' of exactly '1 + maxIdx', and we will end up with a 'JobLog'
308 -- looking like this:
310 -- { _scst_succeeded = Just $ fromIntegral $ 1 + maxIdx
311 -- , _scst_failed = Just 0
312 -- , _scst_remaining = Just $ fromIntegral $ len - maxIdx
313 -- , _scst_events = Just []
315 -- markStarted (remaining + succeeded) jobHandle
316 markProgress succeeded jobHandle
322 ------------------------------------------------------------------------
-- | Ensure the node skeleton for a corpus exists and return the ids needed
-- downstream: the owning user, the corpus node and its (default) list node.
-- Also creates, idempotently, the auxiliary NodeTexts, NodeGraph and
-- NodeDashboard children of the corpus.
-- NOTE(review): parts of the constraint list / signature are elided here.
323 createNodes :: ( FlowCmdM env err m
327 -> Either CorpusName [CorpusId]
329 -> m (UserId, CorpusId, ListId)
330 createNodes user corpusName ctype = do
-- Resolve (or create) the user's root and the corpus node itself.
332 (userId, _rootId, userCorpusId) <- getOrMk_RootWithCorpus user corpusName ctype
333 -- NodeTexts is first
334 _tId <- insertDefaultNodeIfNotExists NodeTexts userCorpusId userId
335 -- printDebug "NodeTexts: " tId
337 -- NodeList is second
338 listId <- getOrMkList userCorpusId userId
-- Graph and dashboard nodes are created but their ids are not needed here.
341 _ <- insertDefaultNodeIfNotExists NodeGraph userCorpusId userId
342 _ <- insertDefaultNodeIfNotExists NodeDashboard userCorpusId userId
344 pure (userId, userCorpusId, listId)
-- | Post-insertion, user-side processing of a corpus: build the ngrams
-- lists (unless the social-list setting says not to), store them in the
-- user's list node, ensure the master list exists, and refresh the ngrams
-- occurrence counts.
-- NOTE(review): the 'case mfslw of' line and some alternatives are elided
-- from this excerpt, so the exact branching on 'Just (NoList _)' vs the
-- list-building path cannot be fully read here.
347 flowCorpusUser :: ( FlowCmdM env err m
355 -> Maybe FlowSocialListWith
357 flowCorpusUser l user userCorpusId listId ctype mfslw = do
-- Pick the NLP server configured for this language.
358 server <- view (nlpServerGet l)
-- The master corpus is the reference against which user lists are built.
360 (masterUserId, _masterRootId, masterCorpusId)
361 <- getOrMk_RootWithCorpus (UserName userMaster) (Left "") ctype
363 --let gp = (GroupParams l 2 3 (StopSize 3))
364 -- Here the PosTagAlgo should be chosen according to the Lang
366 (Just (NoList _)) -> do
367 -- printDebug "Do not build list" mfslw
-- Build the grouped ngrams lists with POS tagging for this language.
370 ngs <- buildNgramsLists user userCorpusId masterCorpusId mfslw
371 $ GroupWithPosTag l server HashMap.empty
373 -- printDebug "flowCorpusUser:ngs" ngs
375 _userListId <- flowList_DbRepo listId ngs
376 _mastListId <- getOrMkList masterCorpusId masterUserId
378 -- _ <- insertOccsUpdates userCorpusId mastListId
379 -- printDebug "userListId" userListId
380 --_ <- mkPhylo userCorpusId userId
382 -- _ <- mkAnnuaire rootUserId userId
-- Keep occurrence metrics in sync with the freshly flowed list.
383 _ <- updateNgramsOccurrences userCorpusId (Just listId)
-- | Insert documents into the master user's corpus: create the document
-- nodes, link them to the master corpus, extract their ngrams (per the
-- given language), and persist the ngrams/document associations in the
-- master list.  Returns the inserted ids (NOTE(review): the final 'pure'
-- line is elided from this excerpt; confirm the return value).
388 insertMasterDocs :: ( FlowCmdM env err m
396 insertMasterDocs c lang hs = do
397 (masterUserId, _, masterCorpusId) <- getOrMk_RootWithCorpus (UserName userMaster) (Left corpusMasterName) c
398 (ids', documentsWithId) <- insertDocs masterUserId masterCorpusId (map (toNode masterUserId Nothing) hs )
399 _ <- Doc.add masterCorpusId ids'
401 -- create a corpus with database name (CSV or PubMed)
402 -- add documents to the corpus (create node_node link)
403 -- this will enable global database monitoring
405 -- maps :: IO Map Ngrams (Map NgramsType (Map NodeId Int))
-- Extract ngrams per document and aggregate them into one nested map:
-- ngrams -> ngrams type -> document node -> (weight, terms count).
406 mapNgramsDocs' :: HashMap ExtractedNgrams (Map NgramsType (Map NodeId (Int, TermsCount)))
408 <$> documentIdWithNgrams
409 (extractNgramsT $ withLang lang documentsWithId)
412 lId <- getOrMkList masterCorpusId masterUserId
413 -- _ <- saveDocNgramsWith lId mapNgramsDocs'
414 _ <- saveDocNgramsWith lId mapNgramsDocs'
416 -- _cooc <- insertDefaultNode NodeListCooc lId masterUserId
-- | Persist a document/ngrams map under a given list: insert the extracted
-- ngrams themselves, register them as node-ngrams rows for the list, then
-- insert the weighted context/node/ngrams associations.
-- NOTE(review): some signature and interior lines are elided here.
419 saveDocNgramsWith :: (FlowCmdM env err m)
421 -> HashMap ExtractedNgrams (Map NgramsType (Map NodeId (Int, TermsCount)))
423 saveDocNgramsWith lId mapNgramsDocs' = do
424 --printDebug "[saveDocNgramsWith] mapNgramsDocs'" mapNgramsDocs'
-- Drop the TermsCount component (keep only the weight) before inserting
-- the raw ngrams: the count is not needed for the terms table itself.
425 let mapNgramsDocsNoCount = over (traverse . traverse . traverse) fst mapNgramsDocs'
426 terms2id <- insertExtractedNgrams $ HashMap.keys mapNgramsDocsNoCount
-- Re-key from ExtractedNgrams to plain Ngrams for the remaining steps.
428 let mapNgramsDocs = HashMap.mapKeys extracted2ngrams mapNgramsDocs'
-- Register each (term, ngrams types) pair as node-ngrams rows of the list.
431 mapCgramsId <- listInsertDb lId toNodeNgramsW'
432 $ map (first _ngramsTerms . second Map.keys)
433 $ HashMap.toList mapNgramsDocs
435 --printDebug "saveDocNgramsWith" mapCgramsId
-- Build the weighted (context, node-ngrams) rows; entries whose cgrams id
-- cannot be resolved are silently dropped by catMaybes.
437 let ngrams2insert = catMaybes [ ContextNodeNgrams2 <$> Just nId
438 <*> (getCgramsId mapCgramsId ngrams_type (_ngramsTerms terms''))
439 <*> Just (fromIntegral w :: Double)
440 | (terms'', mapNgramsTypes) <- HashMap.toList mapNgramsDocs
441 , (ngrams_type, mapNodeIdWeight) <- Map.toList mapNgramsTypes
442 , (nId, (w, _cnt)) <- Map.toList mapNodeIdWeight
444 -- printDebug "Ngrams2Insert" ngrams2insert
445 _return <- insertContextNodeNgrams2 ngrams2insert
-- Finally store the per-document ngrams, re-keyed by their inserted ids.
448 _ <- insertDocNgrams lId $ HashMap.mapKeys (indexNgrams terms2id) mapNgramsDocs
453 ------------------------------------------------------------------------
454 -- TODO Type NodeDocumentUnicised
-- | Insert documents for a user under a corpus node: tag each document with
-- a uniqueness hash ('addUniqId'), insert them ('insertDb'), link the new
-- ids to the corpus ('Doc.add'), and return both the raw ids and the
-- documents paired with their assigned ids (via 'mergeData'/'toInserted').
-- NOTE(review): the signature's argument lines and a 'let'/'where' line are
-- elided from this excerpt.
455 insertDocs :: ( FlowCmdM env err m
462 -> m ([ContextId], [Indexed ContextId a])
463 insertDocs uId cId hs = do
464 let docs = map addUniqId hs
465 newIds <- insertDb uId Nothing docs
466 -- printDebug "newIds" newIds
468 newIds' = map reId newIds
-- Join the DB-returned ids with the documents by their uniqueness hash.
469 documentsWithId = mergeData (toInserted newIds) (Map.fromList $ map viewUniqId' docs)
470 _ <- Doc.add cId newIds'
471 pure (newIds', documentsWithId)
474 ------------------------------------------------------------------------
-- | Pair a document with its uniqueness hash, panicking if the document has
-- no hash set (i.e. 'addUniqId' was not applied first).
-- NOTE(review): the full signature and the 'where' line are elided here.
475 viewUniqId' :: UniqId a
478 viewUniqId' d = maybe err (\h -> (h,d)) (view uniqId d)
480 err = panic "[ERROR] Database.Flow.toInsert"
-- | Index the successfully inserted 'ReturnId's by their uniqueness hash,
-- discarding rows the database reports as not inserted.
-- NOTE(review): the result-type line of the signature is elided here.
483 toInserted :: [ReturnId]
486 Map.fromList . map (\r -> (reUniqId r, r) )
487 . filter (\r -> reInserted r == True)
-- | Join documents (keyed by uniqueness hash) with their database
-- 'ReturnId's: a document whose hash has no matching inserted id is dropped
-- ('catMaybes').  NOTE(review): one signature line and part of the local
-- helper are elided from this excerpt.
489 mergeData :: Map Hash ReturnId
491 -> [Indexed NodeId a]
492 mergeData rs = catMaybes . map toDocumentWithId . Map.toList
-- Look up the node id assigned to this hash; Nothing if it was not inserted.
494 toDocumentWithId (sha,hpd) =
495 Indexed <$> fmap reId (lookup sha rs)
498 ------------------------------------------------------------------------
499 ------------------------------------------------------------------------
500 ------------------------------------------------------------------------
-- | Run an ngrams-extraction action over each indexed document, pairing
-- every document with its extracted ngrams as a 'DocumentIdWithNgrams'.
-- NOTE(review): interior lines (the binding of 'e' inside the helper) are
-- elided from this excerpt.
501 documentIdWithNgrams :: HasNodeError err
503 -> Cmd err (HashMap b (Map NgramsType Int, TermsCount)))
504 -> [Indexed NodeId a]
505 -> Cmd err [DocumentIdWithNgrams a b]
506 documentIdWithNgrams f = traverse toDocumentIdWithNgrams
508 toDocumentIdWithNgrams d = do
510 pure $ DocumentIdWithNgrams d e
513 -- | TODO check optimization
-- Aggregate per-document ngrams into one nested map keyed by ngrams value:
-- ngrams -> ngrams type -> document node -> (weight, terms count), summing
-- component-wise ('addTuples') when the same key appears in several
-- documents.
514 mapNodeIdNgrams :: (Ord b, Hashable b)
515 => [DocumentIdWithNgrams a b]
518 (Map NodeId (Int, TermsCount))
520 mapNodeIdNgrams = HashMap.unionsWith (Map.unionWith (Map.unionWith addTuples)) . fmap f
522 -- | NOTE We are somehow multiplying 'TermsCount' here: If the
523 -- same ngrams term has different ngrams types, the 'TermsCount'
524 -- for it (which is the number of times the terms appears in a
525 -- document) is copied over to all its types.
526 f :: DocumentIdWithNgrams a b
527 -> HashMap b (Map NgramsType (Map NodeId (Int, TermsCount)))
-- Rewrap one document's ngrams as singleton node-id maps under its own id.
528 f d = fmap (\(ngramsTypeMap, cnt) -> fmap (\i -> Map.singleton nId (i, cnt)) ngramsTypeMap) $ documentNgrams d
530 nId = _index $ documentWithId d
533 ------------------------------------------------------------------------
-- | Ngrams extraction for contacts: the only ngrams taken from a contact is
-- its last name (defaulting to the literal "Nothing" when absent), recorded
-- under the Authors ngrams type.  Extracted terms are truncated to 255
-- characters via 'cleanExtractedNgrams'.
-- NOTE(review): the 'where' line and part of 'extract' are elided here.
534 instance ExtractNgramsT HyperdataContact
536 extractNgramsT l hc = HashMap.mapKeys (cleanExtractedNgrams 255) <$> extract l hc
538 extract :: TermType Lang -> HyperdataContact
539 -> Cmd err (HashMap ExtractedNgrams (Map NgramsType Int, TermsCount))
541 let authors = map text2ngrams
542 $ maybe ["Nothing"] (\a -> [a])
543 $ view (hc_who . _Just . cw_lastName) hc'
-- Each author term counts once under the Authors type.
545 pure $ HashMap.fromList $ [(SimpleNgrams a', (Map.singleton Authors 1, 1)) | a' <- authors ]
-- | Ngrams extraction for documents: collects the source, institutes and
-- authors as simple ngrams (splitting multi-valued fields on the
-- database-specific separator), and runs term extraction over the document
-- text via the configured NLP server, enriching the resulting terms with
-- POS information.  All keys are truncated to 255 characters.
-- NOTE(review): the 'where' line and several field-accessor lines are
-- elided from this excerpt.
548 instance ExtractNgramsT HyperdataDocument
550 extractNgramsT :: TermType Lang
552 -> Cmd err (HashMap ExtractedNgrams (Map NgramsType Int, TermsCount))
553 extractNgramsT lang hd = HashMap.mapKeys (cleanExtractedNgrams 255) <$> extractNgramsT' lang hd
555 extractNgramsT' :: TermType Lang
557 -> Cmd err (HashMap ExtractedNgrams (Map NgramsType Int, TermsCount))
558 extractNgramsT' lang' doc = do
-- Missing metadata fields fall back to the literal "Nothing".
559 let source = text2ngrams
560 $ maybe "Nothing" identity
-- Institutes/authors are split on the separator of the document's source
-- database (hd_bdd), since different upstreams use different separators.
563 institutes = map text2ngrams
564 $ maybe ["Nothing"] (splitOn Institutes (doc^. hd_bdd))
567 authors = map text2ngrams
568 $ maybe ["Nothing"] (splitOn Authors (doc^. hd_bdd))
-- Term extraction runs against the NLP server for the target language.
571 ncs <- view (nlpServerGet $ lang' ^. tt_lang)
573 termsWithCounts' <- map (\(t, cnt) -> (enrichedTerms (lang' ^. tt_lang) CoreNLP NP t, cnt))
575 <$> liftBase (extractTerms ncs lang' $ hasText doc)
-- Assemble the final map: metadata fields count 1, extracted terms carry
-- their occurrence count under the NgramsTerms type.
577 pure $ HashMap.fromList
578 $ [(SimpleNgrams source, (Map.singleton Sources 1, 1)) ]
579 <> [(SimpleNgrams i', (Map.singleton Institutes 1, 1)) | i' <- institutes ]
580 <> [(SimpleNgrams a', (Map.singleton Authors 1, 1)) | a' <- authors ]
581 <> [(EnrichedNgrams t', (Map.singleton NgramsTerms 1, cnt')) | (t', cnt') <- termsWithCounts' ]
-- | Extracting ngrams from a Node simply delegates to its hyperdata payload.
583 instance (ExtractNgramsT a, HasText a) => ExtractNgramsT (Node a)
585 extractNgramsT l (Node { _node_hyperdata = h }) = extractNgramsT l h
-- | A Node's text is the text of its hyperdata payload.
587 instance HasText a => HasText (Node a)
589 hasText (Node { _node_hyperdata = h }) = hasText h
593 -- | TODO put elsewhere
594 -- | Upgrade function
595 -- Suppose all documents are English (this is the case actually)
-- Re-index every document of every corpus under the master user's root,
-- processing them in batches of 1000 via 'extractInsert'.
596 indexAllDocumentsWithPosTag :: FlowCmdM env err m
598 indexAllDocumentsWithPosTag = do
599 rootId <- getRootId (UserName userMaster)
600 corpusIds <- findNodesId rootId [NodeCorpus]
601 docs <- List.concat <$> mapM getDocumentsWithParentId corpusIds
-- Batch to bound memory and transaction size per extraction round.
602 _ <- mapM extractInsert (splitEvery 1000 docs)
-- | Extract and persist the ngrams of one batch of documents, indexing each
-- document by its node id and extracting with the multilingual-English
-- setting (see the assumption documented on 'indexAllDocumentsWithPosTag').
-- NOTE(review): this definition continues past the end of the excerpt.
605 extractInsert :: FlowCmdM env err m
606 => [Node HyperdataDocument] -> m ()
607 extractInsert docs = do
608 let documentsWithId = map (\doc -> Indexed (doc ^. node_id) doc) docs
609 mapNgramsDocs' <- mapNodeIdNgrams
610 <$> documentIdWithNgrams
611 (extractNgramsT $ withLang (Multi EN) documentsWithId)
613 _ <- insertExtractedNgrams $ HashMap.keys mapNgramsDocs'