2 Module : Gargantext.Database.Action.Flow
3 Description : Database Flow
4 Copyright : (c) CNRS, 2017-Present
5 License : AGPL + CECILL v3
6 Maintainer : team@gargantext.org
7 Stability : experimental
11 -- check userId CanFillUserCorpus userCorpusId
12 -- check masterUserId CanFillMasterCorpus masterCorpusId
14 -- TODO-ACCESS: check uId CanInsertDoc pId && checkDocType nodeType
15 -- TODO-EVENTS: InsertedNodes
18 {-# OPTIONS_GHC -fno-warn-orphans #-}
20 {-# LANGUAGE ConstrainedClassMethods #-}
21 {-# LANGUAGE ConstraintKinds #-}
22 {-# LANGUAGE InstanceSigs #-}
23 {-# LANGUAGE ScopedTypeVariables #-}
24 {-# LANGUAGE TemplateHaskell #-}
26 module Gargantext.Database.Action.Flow -- (flowDatabase, ngrams2list)
41 , getOrMk_RootWithCorpus
47 , indexAllDocumentsWithPosTag
52 import Control.Lens ((^.), view, _Just, makeLenses, over, traverse)
53 import Control.Monad.Reader (MonadReader)
54 import Data.Aeson.TH (deriveJSON)
55 import Data.Conduit.Internal (zipSources)
56 import qualified Data.Conduit.List as CList
58 import Data.HashMap.Strict (HashMap)
59 import Data.Hashable (Hashable)
60 import Data.List (concat)
61 import Data.Map (Map, lookup)
62 import Data.Maybe (catMaybes)
65 import qualified Data.Text as T
66 import Data.Tuple.Extra (first, second)
67 import GHC.Generics (Generic)
68 import Servant.Client (ClientError)
69 import System.FilePath (FilePath)
70 import qualified Data.HashMap.Strict as HashMap
71 import qualified Gargantext.Data.HashMap.Strict.Utils as HashMap
72 import qualified Data.Map as Map
73 import qualified Data.Conduit.List as CL
74 import qualified Data.Conduit as C
76 import Gargantext.API.Admin.Orchestrator.Types (JobLog(..))
77 import Gargantext.Core (Lang(..), PosTagAlgo(..))
78 import Gargantext.Core.Ext.IMT (toSchoolName)
79 import Gargantext.Core.Ext.IMTUser (readFile_Annuaire)
80 import Gargantext.Core.Flow.Types
81 import Gargantext.Core.Text
82 import Gargantext.Core.Text.Corpus.Parsers (parseFile, FileFormat, FileType)
83 import Gargantext.Core.Text.List (buildNgramsLists)
84 import Gargantext.Core.Text.List.Group.WithStem ({-StopSize(..),-} GroupParams(..))
85 import Gargantext.Core.Text.List.Social (FlowSocialListWith(..))
86 import Gargantext.Core.Text.Terms
87 import Gargantext.Core.Text.Terms.Mono.Stem.En (stemIt)
88 import Gargantext.Core.Types (POS(NP), TermsCount)
89 import Gargantext.Core.Types.Individu (User(..))
90 import Gargantext.Core.Types.Main
91 import Gargantext.Core.Utils (addTuples)
92 import Gargantext.Core.Utils.Prefix (unPrefix, unPrefixSwagger)
93 import Gargantext.Database.Action.Flow.List
94 import Gargantext.Database.Action.Flow.Types
95 import Gargantext.Database.Action.Flow.Utils (insertDocNgrams, DocumentIdWithNgrams(..))
96 import Gargantext.Database.Action.Search (searchDocInDatabase)
97 import Gargantext.Database.Admin.Config (userMaster, corpusMasterName)
98 import Gargantext.Database.Action.Metrics (updateNgramsOccurrences)
99 import Gargantext.Database.Admin.Types.Hyperdata
100 import Gargantext.Database.Admin.Types.Node -- (HyperdataDocument(..), NodeType(..), NodeId, UserId, ListId, CorpusId, RootId, MasterCorpusId, MasterUserId)
101 import Gargantext.Database.Prelude
102 import Gargantext.Database.Query.Table.ContextNodeNgrams2
103 import Gargantext.Database.Query.Table.Ngrams
104 import Gargantext.Database.Query.Table.Node
105 import Gargantext.Database.Query.Table.Node.Document.Insert -- (insertDocuments, ReturnId(..), addUniqIdsDoc, addUniqIdsContact, ToDbData(..))
106 import Gargantext.Database.Query.Table.Node.Error (HasNodeError(..))
107 import Gargantext.Database.Query.Table.NodeNgrams (listInsertDb , getCgramsId)
108 import Gargantext.Database.Query.Tree.Root (getOrMkRoot, getOrMk_RootWithCorpus)
109 import Gargantext.Database.Schema.Node (NodePoly(..), node_id)
110 import Gargantext.Database.Types
111 import Gargantext.Prelude
112 import Gargantext.Prelude.Crypto.Hash (Hash)
113 import qualified Gargantext.Core.Text.Corpus.API as API
114 import qualified Gargantext.Database.Query.Table.Node.Document.Add as Doc (add)
115 --import qualified Prelude
117 ------------------------------------------------------------------------
118 -- Imports for upgrade function
119 import Gargantext.Database.Query.Tree.Root (getRootId)
120 import Gargantext.Database.Query.Tree (findNodesId)
121 import qualified Data.List as List
122 ------------------------------------------------------------------------
123 -- TODO use internal with API name (could be old data)
-- | Origin of the documents handled by the flow.  Both constructors carry
-- the external API concerned in the shared '_do_api' field.
data DataOrigin
  = -- | Handled by 'getDataText' through a search in the local database.
    InternalOrigin { _do_api :: API.ExternalAPIs }
  | -- | Handled by 'getDataText' through a query to the given external API.
    ExternalOrigin { _do_api :: API.ExternalAPIs }
  deriving (Generic, Eq)

-- Lenses and JSON instances are generated with Template Haskell; the "_do_"
-- field prefix is passed to 'unPrefix' for the JSON instances.
makeLenses ''DataOrigin
deriveJSON (unPrefix "_do_") ''DataOrigin

-- Swagger schema derived generically, with the same "_do_" prefix passed to
-- 'unPrefixSwagger'.
instance ToSchema DataOrigin where
  declareNamedSchema =
    genericDeclareNamedSchema (unPrefixSwagger "_do_")
134 allDataOrigins :: ( MonadReader env m
135 , HasConfig env) => m [DataOrigin]
137 ext <- API.externalAPIs
139 pure $ map InternalOrigin ext
140 <> map ExternalOrigin ext
-- | Text data handed to the flow functions.
data DataText
  = -- | Identifiers of nodes (as produced by the database search in
    -- 'getDataText').
    DataOld ![NodeId]
  | -- | A stream of documents running in 'IO', paired with an optional
    -- 'Integer' (presumably the expected number of documents -- TODO confirm).
    DataNew !(Maybe Integer, ConduitT () HyperdataDocument IO ())
    -- Former shape, kept for reference: DataNew ![[HyperdataDocument]]
-- | Print the content of a 'DataText'.
--
-- 'DataText' cannot have a 'Show' instance because its stream runs in 'IO',
-- so the 'DataNew' case first drains the conduit into a list and then prints
-- that list together with the optional count.
printDataText :: DataText -> IO ()
printDataText dataText = case dataText of
  DataOld nodeIds ->
    putStrLn (show nodeIds)
  DataNew (mCount, source) -> do
    -- Consuming the source runs every effect the stream performs.
    docs <- C.runConduit (source .| CL.consume)
    putStrLn (show (mCount, docs))
154 -- TODO use the split parameter in config file
155 getDataText :: FlowCmdM env err m
160 -> m (Either ClientError DataText)
161 getDataText (ExternalOrigin api) la q li = liftBase $ do
162 eRes <- API.get api (_tt_lang la) q li
163 pure $ DataNew <$> eRes
165 getDataText (InternalOrigin _) _la q _li = do
166 (_masterUserId, _masterRootId, cId) <- getOrMk_RootWithCorpus
167 (UserName userMaster)
169 (Nothing :: Maybe HyperdataCorpus)
170 ids <- map fst <$> searchDocInDatabase cId (stemIt q)
171 pure $ Right $ DataOld ids
173 getDataText_Debug :: FlowCmdM env err m
179 getDataText_Debug a l q li = do
180 result <- getDataText a l q li
182 Left err -> liftBase $ putStrLn $ show err
183 Right res -> liftBase $ printDataText res
186 -------------------------------------------------------------------------------
187 flowDataText :: forall env err m.
194 -> Maybe FlowSocialListWith
197 flowDataText u (DataOld ids) tt cid mfslw _ = do
198 (_userId, userCorpusId, listId) <- createNodes u (Right [cid]) corpusType
199 _ <- Doc.add userCorpusId ids
200 flowCorpusUser (_tt_lang tt) u userCorpusId listId corpusType mfslw
202 corpusType = (Nothing :: Maybe HyperdataCorpus)
203 flowDataText u (DataNew (mLen, txtC)) tt cid mfslw logStatus =
204 flowCorpus u (Right [cid]) tt mfslw (mLen, (transPipe liftBase txtC)) logStatus
206 ------------------------------------------------------------------------
208 flowAnnuaire :: (FlowCmdM env err m)
210 -> Either CorpusName [CorpusId]
215 flowAnnuaire u n l filePath logStatus = do
216 -- TODO Conduit for file
217 docs <- liftBase $ ((readFile_Annuaire filePath) :: IO [HyperdataContact])
218 flow (Nothing :: Maybe HyperdataAnnuaire) u n l Nothing (Just $ fromIntegral $ length docs, yieldMany docs) logStatus
220 ------------------------------------------------------------------------
221 flowCorpusFile :: (FlowCmdM env err m)
223 -> Either CorpusName [CorpusId]
224 -> Limit -- Limit the number of docs (for dev purpose)
229 -> Maybe FlowSocialListWith
232 flowCorpusFile u n _l la ft ff fp mfslw logStatus = do
233 eParsed <- liftBase $ parseFile ft ff fp
236 flowCorpus u n la mfslw (Just $ fromIntegral $ length parsed, yieldMany parsed .| mapC toHyperdataDocument) logStatus
237 --let docs = splitEvery 500 $ take l parsed
238 --flowCorpus u n la mfslw (yieldMany $ map (map toHyperdataDocument) docs) logStatus
239 Left e -> panic $ "Error: " <> T.pack e
241 ------------------------------------------------------------------------
242 -- | TODO improve the needed type to create/update a corpus
243 -- (For now, Either is enough)
244 flowCorpus :: (FlowCmdM env err m, FlowCorpus a)
246 -> Either CorpusName [CorpusId]
248 -> Maybe FlowSocialListWith
249 -> (Maybe Integer, ConduitT () a m ())
252 flowCorpus = flow (Nothing :: Maybe HyperdataCorpus)
255 flow :: forall env err m a c.
262 -> Either CorpusName [CorpusId]
264 -> Maybe FlowSocialListWith
265 -> (Maybe Integer, ConduitT () a m ())
268 flow c u cn la mfslw (mLength, docsC) logStatus = do
269 (_userId, userCorpusId, listId) <- createNodes u cn c
270 -- TODO if public insertMasterDocs else insertUserDocs
271 _ <- runConduit $ zipSources (yieldMany [1..]) docsC
272 .| CList.chunksOf 100
274 .| mapM_C (\ids' -> do
275 _ <- Doc.add userCorpusId ids'
279 _ <- flowCorpusUser (la ^. tt_lang) u userCorpusId listId c mfslw
281 -- ids <- traverse (\(idx, doc) -> do
282 -- id <- insertMasterDocs c la doc
283 -- logStatus JobLog { _scst_succeeded = Just $ 1 + idx
284 -- , _scst_failed = Just 0
285 -- , _scst_remaining = Just $ length docs - idx
286 -- , _scst_events = Just []
289 -- ) (zip [1..] docs)
290 --printDebug "[flow] calling flowCorpusUser" (0 :: Int)
292 --flowCorpusUser (la ^. tt_lang) u cn c ids mfslw
295 insertDocs' :: [(Integer, a)] -> m [NodeId]
296 insertDocs' [] = pure []
297 insertDocs' docs = do
298 printDebug "[flow] calling insertDoc, ([idx], mLength) = " (fst <$> docs, mLength)
299 ids <- insertMasterDocs c la (snd <$> docs)
300 let maxIdx = maximum (fst <$> docs)
304 logStatus JobLog { _scst_succeeded = Just $ fromIntegral $ 1 + maxIdx
305 , _scst_failed = Just 0
306 , _scst_remaining = Just $ fromIntegral $ len - maxIdx
307 , _scst_events = Just []
313 ------------------------------------------------------------------------
314 createNodes :: ( FlowCmdM env err m
318 -> Either CorpusName [CorpusId]
320 -> m (UserId, CorpusId, ListId)
321 createNodes user corpusName ctype = do
323 (userId, _rootId, userCorpusId) <- getOrMk_RootWithCorpus user corpusName ctype
324 -- NodeTexts is first
325 _tId <- insertDefaultNodeIfNotExists NodeTexts userCorpusId userId
326 -- printDebug "NodeTexts: " tId
328 -- NodeList is second
329 listId <- getOrMkList userCorpusId userId
332 _ <- insertDefaultNodeIfNotExists NodeGraph userCorpusId userId
333 _ <- insertDefaultNodeIfNotExists NodeDashboard userCorpusId userId
335 pure (userId, userCorpusId, listId)
338 flowCorpusUser :: ( FlowCmdM env err m
346 -> Maybe FlowSocialListWith
348 flowCorpusUser l user userCorpusId listId ctype mfslw = do
350 (masterUserId, _masterRootId, masterCorpusId)
351 <- getOrMk_RootWithCorpus (UserName userMaster) (Left "") ctype
353 --let gp = (GroupParams l 2 3 (StopSize 3))
354 -- Here the PosTagAlgo should be chosen according to the Lang
356 (Just (NoList _)) -> do
357 printDebug "Do not build list" mfslw
360 ngs <- buildNgramsLists user userCorpusId masterCorpusId mfslw
361 $ GroupWithPosTag l CoreNLP HashMap.empty
363 -- printDebug "flowCorpusUser:ngs" ngs
365 _userListId <- flowList_DbRepo listId ngs
366 _mastListId <- getOrMkList masterCorpusId masterUserId
368 -- _ <- insertOccsUpdates userCorpusId mastListId
369 -- printDebug "userListId" userListId
370 --_ <- mkPhylo userCorpusId userId
372 -- _ <- mkAnnuaire rootUserId userId
373 _ <- updateNgramsOccurrences userCorpusId (Just listId)
378 insertMasterDocs :: ( FlowCmdM env err m
386 insertMasterDocs c lang hs = do
387 (masterUserId, _, masterCorpusId) <- getOrMk_RootWithCorpus (UserName userMaster) (Left corpusMasterName) c
388 (ids', documentsWithId) <- insertDocs masterUserId masterCorpusId (map (toNode masterUserId Nothing) hs )
389 _ <- Doc.add masterCorpusId ids'
391 -- create a corpus with database name (CSV or PubMed)
392 -- add documents to the corpus (create node_node link)
393 -- this will enable global database monitoring
395 -- maps :: IO Map Ngrams (Map NgramsType (Map NodeId Int))
396 mapNgramsDocs' :: HashMap ExtractedNgrams (Map NgramsType (Map NodeId (Int, TermsCount)))
398 <$> documentIdWithNgrams
399 (extractNgramsT $ withLang lang documentsWithId)
402 lId <- getOrMkList masterCorpusId masterUserId
403 -- _ <- saveDocNgramsWith lId mapNgramsDocs'
404 _ <- saveDocNgramsWith lId mapNgramsDocs'
406 -- _cooc <- insertDefaultNode NodeListCooc lId masterUserId
409 saveDocNgramsWith :: (FlowCmdM env err m)
411 -> HashMap ExtractedNgrams (Map NgramsType (Map NodeId (Int, TermsCount)))
413 saveDocNgramsWith lId mapNgramsDocs' = do
414 --printDebug "[saveDocNgramsWith] mapNgramsDocs'" mapNgramsDocs'
415 let mapNgramsDocsNoCount = over (traverse . traverse . traverse) fst mapNgramsDocs'
416 terms2id <- insertExtractedNgrams $ HashMap.keys mapNgramsDocsNoCount
418 let mapNgramsDocs = HashMap.mapKeys extracted2ngrams mapNgramsDocs'
421 mapCgramsId <- listInsertDb lId toNodeNgramsW'
422 $ map (first _ngramsTerms . second Map.keys)
423 $ HashMap.toList mapNgramsDocs
425 --printDebug "saveDocNgramsWith" mapCgramsId
427 _return <- insertContextNodeNgrams2
428 $ catMaybes [ ContextNodeNgrams2 <$> Just nId
429 <*> (getCgramsId mapCgramsId ngrams_type (_ngramsTerms terms''))
430 <*> Just (fromIntegral w :: Double)
431 | (terms'', mapNgramsTypes) <- HashMap.toList mapNgramsDocs
432 , (ngrams_type, mapNodeIdWeight) <- Map.toList mapNgramsTypes
433 , (nId, (w, _cnt)) <- Map.toList mapNodeIdWeight
437 _ <- insertDocNgrams lId $ HashMap.mapKeys (indexNgrams terms2id) mapNgramsDocs
442 ------------------------------------------------------------------------
443 -- TODO Type NodeDocumentUnicised
444 insertDocs :: ( FlowCmdM env err m
451 -> m ([ContextId], [Indexed ContextId a])
452 insertDocs uId cId hs = do
453 let docs = map addUniqId hs
454 newIds <- insertDb uId Nothing docs
455 -- printDebug "newIds" newIds
457 newIds' = map reId newIds
458 documentsWithId = mergeData (toInserted newIds) (Map.fromList $ map viewUniqId' docs)
459 _ <- Doc.add cId newIds'
460 pure (newIds', documentsWithId)
463 ------------------------------------------------------------------------
464 viewUniqId' :: UniqId a
467 viewUniqId' d = maybe err (\h -> (h,d)) (view uniqId d)
469 err = panic "[ERROR] Database.Flow.toInsert"
472 toInserted :: [ReturnId]
475 Map.fromList . map (\r -> (reUniqId r, r) )
476 . filter (\r -> reInserted r == True)
478 mergeData :: Map Hash ReturnId
480 -> [Indexed NodeId a]
481 mergeData rs = catMaybes . map toDocumentWithId . Map.toList
483 toDocumentWithId (sha,hpd) =
484 Indexed <$> fmap reId (lookup sha rs)
487 ------------------------------------------------------------------------
488 ------------------------------------------------------------------------
489 ------------------------------------------------------------------------
490 documentIdWithNgrams :: HasNodeError err
492 -> Cmd err (HashMap b (Map NgramsType Int, TermsCount)))
493 -> [Indexed NodeId a]
494 -> Cmd err [DocumentIdWithNgrams a b]
495 documentIdWithNgrams f = traverse toDocumentIdWithNgrams
497 toDocumentIdWithNgrams d = do
499 pure $ DocumentIdWithNgrams d e
502 -- | TODO check optimization
503 mapNodeIdNgrams :: (Ord b, Hashable b)
504 => [DocumentIdWithNgrams a b]
507 (Map NodeId (Int, TermsCount))
509 mapNodeIdNgrams = HashMap.unionsWith (Map.unionWith (Map.unionWith addTuples)) . fmap f
511 -- | NOTE We are somehow multiplying 'TermsCount' here: If the
512 -- same ngrams term has different ngrams types, the 'TermsCount'
513 -- for it (which is the number of times the term appears in a
514 -- document) is copied over to all its types.
515 f :: DocumentIdWithNgrams a b
516 -> HashMap b (Map NgramsType (Map NodeId (Int, TermsCount)))
517 f d = fmap (\(ngramsTypeMap, cnt) -> fmap (\i -> Map.singleton nId (i, cnt)) ngramsTypeMap) $ documentNgrams d
519 nId = _index $ documentWithId d
522 ------------------------------------------------------------------------
523 instance ExtractNgramsT HyperdataContact
525 extractNgramsT l hc = HashMap.mapKeys (cleanExtractedNgrams 255) <$> extract l hc
527 extract :: TermType Lang -> HyperdataContact
528 -> Cmd err (HashMap ExtractedNgrams (Map NgramsType Int, TermsCount))
530 let authors = map text2ngrams
531 $ maybe ["Nothing"] (\a -> [a])
532 $ view (hc_who . _Just . cw_lastName) hc'
534 pure $ HashMap.fromList $ [(SimpleNgrams a', (Map.singleton Authors 1, 1)) | a' <- authors ]
537 instance ExtractNgramsT HyperdataDocument
539 extractNgramsT :: TermType Lang
541 -> Cmd err (HashMap ExtractedNgrams (Map NgramsType Int, TermsCount))
542 extractNgramsT lang hd = HashMap.mapKeys (cleanExtractedNgrams 255) <$> extractNgramsT' lang hd
544 extractNgramsT' :: TermType Lang
546 -> Cmd err (HashMap ExtractedNgrams (Map NgramsType Int, TermsCount))
547 extractNgramsT' lang' doc = do
548 let source = text2ngrams
549 $ maybe "Nothing" identity
552 institutes = map text2ngrams
553 $ maybe ["Nothing"] (map toSchoolName . (T.splitOn ", "))
556 authors = map text2ngrams
557 $ maybe ["Nothing"] (T.splitOn ", ")
560 termsWithCounts' <- map (\(t, cnt) -> (enrichedTerms (lang' ^. tt_lang) CoreNLP NP t, cnt))
562 <$> liftBase (extractTerms lang' $ hasText doc)
564 pure $ HashMap.fromList
565 $ [(SimpleNgrams source, (Map.singleton Sources 1, 1)) ]
566 <> [(SimpleNgrams i', (Map.singleton Institutes 1, 1)) | i' <- institutes ]
567 <> [(SimpleNgrams a', (Map.singleton Authors 1, 1)) | a' <- authors ]
568 <> [(EnrichedNgrams t', (Map.singleton NgramsTerms 1, cnt')) | (t', cnt') <- termsWithCounts' ]
570 instance (ExtractNgramsT a, HasText a) => ExtractNgramsT (Node a)
572 extractNgramsT l (Node { _node_hyperdata = h }) = extractNgramsT l h
574 instance HasText a => HasText (Node a)
576 hasText (Node { _node_hyperdata = h }) = hasText h
580 -- | TODO put elsewhere
581 -- | Upgrade function
582 -- Assumes all documents are in English (which is currently the case)
583 indexAllDocumentsWithPosTag :: FlowCmdM env err m
585 indexAllDocumentsWithPosTag = do
586 rootId <- getRootId (UserName userMaster)
587 corpusIds <- findNodesId rootId [NodeCorpus]
588 docs <- List.concat <$> mapM getDocumentsWithParentId corpusIds
589 _ <- mapM extractInsert (splitEvery 1000 docs)
592 extractInsert :: FlowCmdM env err m
593 => [Node HyperdataDocument] -> m ()
594 extractInsert docs = do
595 let documentsWithId = map (\doc -> Indexed (doc ^. node_id) doc) docs
596 mapNgramsDocs' <- mapNodeIdNgrams
597 <$> documentIdWithNgrams
598 (extractNgramsT $ withLang (Multi EN) documentsWithId)
600 _ <- insertExtractedNgrams $ HashMap.keys mapNgramsDocs'