2 Module : Gargantext.Database.Flow
3 Description : Database Flow
4 Copyright : (c) CNRS, 2017-Present
5 License : AGPL + CECILL v3
6 Maintainer : team@gargantext.org
7 Stability : experimental
11 -- check userId CanFillUserCorpus userCorpusId
12 -- check masterUserId CanFillMasterCorpus masterCorpusId
14 -- TODO-ACCESS: check uId CanInsertDoc pId && checkDocType nodeType
15 -- TODO-EVENTS: InsertedNodes
18 {-# OPTIONS_GHC -fno-warn-orphans #-}
20 {-# LANGUAGE ConstrainedClassMethods #-}
21 {-# LANGUAGE ConstraintKinds #-}
22 {-# LANGUAGE InstanceSigs #-}
23 {-# LANGUAGE ScopedTypeVariables #-}
24 {-# LANGUAGE TemplateHaskell #-}
26 module Gargantext.Database.Action.Flow -- (flowDatabase, ngrams2list)
41 , getOrMk_RootWithCorpus
47 , indexAllDocumentsWithPosTag
52 import Control.Lens ((^.), view, _Just, makeLenses, over, traverse)
53 import Data.Aeson.TH (deriveJSON)
54 import Data.Conduit.Internal (zipSources)
56 import Data.HashMap.Strict (HashMap)
57 import Data.Hashable (Hashable)
58 import Data.List (concat)
59 import Data.Map (Map, lookup)
60 import Data.Maybe (catMaybes)
63 import qualified Data.Text as T
64 import Data.Tuple.Extra (first, second)
65 import GHC.Generics (Generic)
66 import Servant.Client (ClientError)
67 import System.FilePath (FilePath)
68 import qualified Data.HashMap.Strict as HashMap
69 import qualified Gargantext.Data.HashMap.Strict.Utils as HashMap
70 import qualified Data.Map as Map
71 import qualified Data.Conduit.List as CL
72 import qualified Data.Conduit as C
74 import Gargantext.API.Admin.Orchestrator.Types (JobLog(..))
75 import Gargantext.Core (Lang(..), PosTagAlgo(..))
76 import Gargantext.Core.Ext.IMT (toSchoolName)
77 import Gargantext.Core.Ext.IMTUser (readFile_Annuaire)
78 import Gargantext.Core.Flow.Types
79 import Gargantext.Core.Text
80 import Gargantext.Core.Text.Corpus.Parsers (parseFile, FileFormat, FileType)
81 import Gargantext.Core.Text.List (buildNgramsLists)
82 import Gargantext.Core.Text.List.Group.WithStem ({-StopSize(..),-} GroupParams(..))
83 import Gargantext.Core.Text.List.Social (FlowSocialListWith)
84 import Gargantext.Core.Text.Terms
85 import Gargantext.Core.Text.Terms.Mono.Stem.En (stemIt)
86 import Gargantext.Core.Types (POS(NP), TermsCount)
87 import Gargantext.Core.Types.Individu (User(..))
88 import Gargantext.Core.Types.Main
89 import Gargantext.Core.Utils (addTuples)
90 import Gargantext.Core.Utils.Prefix (unPrefix, unPrefixSwagger)
91 import Gargantext.Database.Action.Flow.List
92 import Gargantext.Database.Action.Flow.Types
93 import Gargantext.Database.Action.Flow.Utils (insertDocNgrams, DocumentIdWithNgrams(..))
94 import Gargantext.Database.Action.Search (searchDocInDatabase)
95 import Gargantext.Database.Admin.Config (userMaster, corpusMasterName)
96 import Gargantext.Database.Action.Metrics (updateNgramsOccurrences)
97 import Gargantext.Database.Admin.Types.Hyperdata
98 import Gargantext.Database.Admin.Types.Node -- (HyperdataDocument(..), NodeType(..), NodeId, UserId, ListId, CorpusId, RootId, MasterCorpusId, MasterUserId)
99 import Gargantext.Database.Prelude
100 import Gargantext.Database.Query.Table.ContextNodeNgrams2
101 import Gargantext.Database.Query.Table.Ngrams
102 import Gargantext.Database.Query.Table.Node
103 import Gargantext.Database.Query.Table.Node.Document.Insert -- (insertDocuments, ReturnId(..), addUniqIdsDoc, addUniqIdsContact, ToDbData(..))
104 import Gargantext.Database.Query.Table.Node.Error (HasNodeError(..))
105 import Gargantext.Database.Query.Table.NodeNgrams (listInsertDb , getCgramsId)
106 import Gargantext.Database.Query.Tree.Root (getOrMkRoot, getOrMk_RootWithCorpus)
107 import Gargantext.Database.Schema.Node (NodePoly(..), node_id)
108 import Gargantext.Database.Types
109 import Gargantext.Prelude
110 import Gargantext.Prelude.Crypto.Hash (Hash)
111 import qualified Gargantext.Core.Text.Corpus.API as API
112 import qualified Gargantext.Database.Query.Table.Node.Document.Add as Doc (add)
113 import qualified Prelude
115 ------------------------------------------------------------------------
116 -- Imports for upgrade function
117 import Gargantext.Database.Query.Tree.Root (getRootId)
118 import Gargantext.Database.Query.Tree (findNodesId)
119 import qualified Data.List as List
120 ------------------------------------------------------------------------
121 -- TODO use internal with API name (could be old data)
-- | Where the data for a corpus comes from: either Gargantext's own
-- database ("internal") or an external API. Both constructors carry the
-- API descriptor in '_do_api'.
-- NOTE(review): some lines of this declaration group are elided in this
-- view (original numbering is non-contiguous); comments only.
122 data DataOrigin = InternalOrigin { _do_api :: API.ExternalAPIs }
123 | ExternalOrigin { _do_api :: API.ExternalAPIs }
125 deriving (Generic, Eq)
-- Lenses and JSON instances generated via TemplateHaskell, using the
-- "_do_" field prefix convention.
127 makeLenses ''DataOrigin
128 deriveJSON (unPrefix "_do_") ''DataOrigin
-- Swagger schema strips the "_do_" prefix from field names.
129 instance ToSchema DataOrigin where
130 declareNamedSchema = genericDeclareNamedSchema (unPrefixSwagger "_do_")
-- | Every known data origin: one 'InternalOrigin' and one
-- 'ExternalOrigin' entry per available external API.
132 allDataOrigins :: [DataOrigin]
133 allDataOrigins = map InternalOrigin API.externalAPIs
134 <> map ExternalOrigin API.externalAPIs
-- | Text data to be flowed into a corpus: either node ids of documents
-- already present in the database ('DataOld'), or a stream of new
-- documents together with an optional expected count ('DataNew').
137 data DataText = DataOld ![NodeId]
138 | DataNew !(Maybe Integer, ConduitT () HyperdataDocument IO ())
139 --- | DataNew ![[HyperdataDocument]]
141 -- Show instance is not possible because of IO
-- | Debug helper: print a 'DataText'. For 'DataNew' the whole conduit is
-- consumed into a list before printing (do not use on huge streams).
142 printDataText :: DataText -> IO ()
143 printDataText (DataOld xs) = putStrLn $ show xs
144 printDataText (DataNew (maybeInt, conduitData)) = do
145 res <- C.runConduit (conduitData .| CL.consume)
146 putStrLn $ show (maybeInt, res)
148 -- TODO use the split parameter in config file
-- | Fetch text data for a query: from an external API ('ExternalOrigin')
-- or by searching the master corpus in the database ('InternalOrigin').
-- NOTE(review): part of the type signature and some arguments to
-- getOrMk_RootWithCorpus are elided in this view — do not infer the full
-- arity from what is visible here.
149 getDataText :: FlowCmdM env err m
154 -> m (Either ClientError DataText)
155 getDataText (ExternalOrigin api) la q li = liftBase $ do
156 eRes <- API.get api (_tt_lang la) q li
157 pure $ DataNew <$> eRes
-- Internal origin: find existing documents via full-text search on the
-- stemmed query, in the master user's corpus.
159 getDataText (InternalOrigin _) _la q _li = do
160 (_masterUserId, _masterRootId, cId) <- getOrMk_RootWithCorpus
161 (UserName userMaster)
163 (Nothing :: Maybe HyperdataCorpus)
164 ids <- map fst <$> searchDocInDatabase cId (stemIt q)
165 pure $ Right $ DataOld ids
-- | Debug variant of 'getDataText': fetch and immediately print the
-- result (or the client error) to stdout.
-- NOTE(review): the signature tail and the `case result of` header are
-- elided in this view.
167 getDataText_Debug :: FlowCmdM env err m
173 getDataText_Debug a l q li = do
174 result <- getDataText a l q li
176 Left err -> liftBase $ putStrLn $ show err
177 Right res -> liftBase $ printDataText res
180 -------------------------------------------------------------------------------
-- | Flow a 'DataText' into the given corpus: existing documents
-- ('DataOld') go straight to 'flowCorpusUser'; new documents ('DataNew')
-- are streamed through 'flowCorpus'.
-- NOTE(review): most of the signature and the `where` keyword are elided
-- in this view.
181 flowDataText :: forall env err m.
188 -> Maybe FlowSocialListWith
191 flowDataText u (DataOld ids) tt cid mfslw _ = flowCorpusUser (_tt_lang tt) u (Right [cid]) corpusType ids mfslw
193 corpusType = (Nothing :: Maybe HyperdataCorpus)
-- New documents: lift the IO conduit into the command monad.
194 flowDataText u (DataNew (mLen, txtC)) tt cid mfslw logStatus =
195 flowCorpus u (Right [cid]) tt mfslw (mLen, (transPipe liftBase txtC)) logStatus
197 ------------------------------------------------------------------------
-- | Flow an annuaire (directory of contacts) from a file into the
-- database: reads 'HyperdataContact's from the file, then runs the
-- generic 'flow' with an annuaire hyperdata type.
-- NOTE(review): part of the signature is elided in this view.
199 flowAnnuaire :: (FlowCmdM env err m)
201 -> Either CorpusName [CorpusId]
206 flowAnnuaire u n l filePath logStatus = do
207 -- TODO Conduit for file
-- Whole file is read into memory; the length gives the progress total.
208 docs <- liftBase $ ((readFile_Annuaire filePath) :: IO [HyperdataContact])
209 flow (Nothing :: Maybe HyperdataAnnuaire) u n l Nothing (Just $ fromIntegral $ length docs, yieldMany docs) logStatus
211 ------------------------------------------------------------------------
-- | Flow a corpus from a local file: parse it with the given file
-- type/format, convert each parsed document to 'HyperdataDocument' and
-- feed the stream to 'flowCorpus'. Parse failures abort via 'panic'.
-- NOTE(review): parts of the signature and the `case eParsed of` header
-- are elided in this view; the limit parameter `_l` is currently unused.
212 flowCorpusFile :: (FlowCmdM env err m)
214 -> Either CorpusName [CorpusId]
215 -> Limit -- Limit the number of docs (for dev purpose)
220 -> Maybe FlowSocialListWith
223 flowCorpusFile u n _l la ft ff fp mfslw logStatus = do
224 eParsed <- liftBase $ parseFile ft ff fp
227 flowCorpus u n la mfslw (Just $ fromIntegral $ length parsed, yieldMany parsed .| mapC toHyperdataDocument) logStatus
228 --let docs = splitEvery 500 $ take l parsed
229 --flowCorpus u n la mfslw (yieldMany $ map (map toHyperdataDocument) docs) logStatus
230 Left e -> panic $ "Error: " <> T.pack e
232 ------------------------------------------------------------------------
233 -- | TODO improve the needed type to create/update a corpus
234 -- (For now, Either is enough)
-- 'flowCorpus' is 'flow' specialised to a corpus (as opposed to an
-- annuaire): the hyperdata type argument is fixed to 'HyperdataCorpus'.
-- NOTE(review): parts of the signature are elided in this view.
235 flowCorpus :: (FlowCmdM env err m, FlowCorpus a)
237 -> Either CorpusName [CorpusId]
239 -> Maybe FlowSocialListWith
240 -> (Maybe Integer, ConduitT () a m ())
243 flowCorpus = flow (Nothing :: Maybe HyperdataCorpus)
-- | Generic ingestion pipeline: stream documents in, insert each one as
-- a master document (reporting progress through 'logStatus'), then hand
-- the collected ids to 'flowCorpusUser'.
-- NOTE(review): large parts of the signature, the conduit pipeline after
-- zipSources, and the `where`/`len` bindings are elided in this view.
246 flow :: forall env err m a c.
253 -> Either CorpusName [CorpusId]
255 -> Maybe FlowSocialListWith
256 -> (Maybe Integer, ConduitT () a m ())
259 flow c u cn la mfslw (mLength, docsC) logStatus = do
260 -- TODO if public insertMasterDocs else insertUserDocs
-- Pair each document with its 1-based index so progress can be logged.
261 ids <- runConduit $ zipSources (yieldMany [1..]) docsC
264 -- ids <- traverse (\(idx, doc) -> do
265 -- id <- insertMasterDocs c la doc
266 -- logStatus JobLog { _scst_succeeded = Just $ 1 + idx
267 -- , _scst_failed = Just 0
268 -- , _scst_remaining = Just $ length docs - idx
269 -- , _scst_events = Just []
272 -- ) (zip [1..] docs)
273 flowCorpusUser (la ^. tt_lang) u cn c ids mfslw
-- Insert one document into the master corpus and log job progress.
-- `len` is presumably derived from mLength — its binding is elided here.
276 insertDoc :: (Integer, a) -> m NodeId
277 insertDoc (idx, doc) = do
278 id <- insertMasterDocs c la [doc]
282 logStatus JobLog { _scst_succeeded = Just $ fromIntegral $ 1 + idx
283 , _scst_failed = Just 0
284 , _scst_remaining = Just $ fromIntegral $ len - idx
285 , _scst_events = Just []
-- insertMasterDocs returns a list; a single doc yields a single id.
287 pure $ Prelude.head id
291 ------------------------------------------------------------------------
-- | Build the user-side corpus around already-inserted documents:
-- create default child nodes (texts, list, graph, dashboard), attach the
-- document ids, build the ngrams lists against the master corpus and
-- refresh ngrams occurrence counts.
-- NOTE(review): most of the signature and some trailing lines (e.g. the
-- final `pure`) are elided in this view.
292 flowCorpusUser :: ( FlowCmdM env err m
297 -> Either CorpusName [CorpusId]
300 -> Maybe FlowSocialListWith
302 flowCorpusUser l user corpusName ctype ids mfslw = do
-- User's root and corpus nodes (created if missing).
304 (userId, _rootId, userCorpusId) <- getOrMk_RootWithCorpus user corpusName ctype
305 -- NodeTexts is first
306 _tId <- insertDefaultNodeIfNotExists NodeTexts userCorpusId userId
307 -- printDebug "NodeTexts: " tId
309 -- NodeList is second
310 listId <- getOrMkList userCorpusId userId
311 -- _cooc <- insertDefaultNode NodeListCooc listId userId
312 -- TODO: check if present already, ignore
-- Link the inserted documents to the user corpus.
313 _ <- Doc.add userCorpusId ids
315 -- printDebug "Node Text Ids:" tId
-- Master corpus used as the reference for ngrams list building.
318 (masterUserId, _masterRootId, masterCorpusId)
319 <- getOrMk_RootWithCorpus (UserName userMaster) (Left "") ctype
321 --let gp = (GroupParams l 2 3 (StopSize 3))
322 -- Here the PosTagAlgo should be chosen according to the Lang
323 let gp = GroupWithPosTag l CoreNLP HashMap.empty
324 ngs <- buildNgramsLists user userCorpusId masterCorpusId mfslw gp
326 -- printDebug "flowCorpusUser:ngs" ngs
328 _userListId <- flowList_DbRepo listId ngs
329 _mastListId <- getOrMkList masterCorpusId masterUserId
330 -- _ <- insertOccsUpdates userCorpusId mastListId
331 -- printDebug "userListId" userListId
333 _ <- insertDefaultNodeIfNotExists NodeGraph userCorpusId userId
334 _ <- insertDefaultNodeIfNotExists NodeDashboard userCorpusId userId
335 --_ <- mkPhylo userCorpusId userId
337 -- _ <- mkAnnuaire rootUserId userId
338 _ <- updateNgramsOccurrences userCorpusId (Just listId)
-- | Insert documents into the master user's corpus, extract their ngrams
-- and persist the ngrams/document associations on the master list.
-- NOTE(review): large parts of the signature, the binding head of
-- mapNgramsDocs', and the function's final expression are elided in this
-- view.
343 insertMasterDocs :: ( FlowCmdM env err m
351 insertMasterDocs c lang hs = do
352 (masterUserId, _, masterCorpusId) <- getOrMk_RootWithCorpus (UserName userMaster) (Left corpusMasterName) c
353 (ids', documentsWithId) <- insertDocs masterUserId masterCorpusId (map (toNode masterUserId Nothing) hs )
354 _ <- Doc.add masterCorpusId ids'
356 -- create a corpus with database name (CSV or PubMed)
357 -- add documents to the corpus (create node_node link)
358 -- this will enable global database monitoring
360 -- maps :: IO Map Ngrams (Map NgramsType (Map NodeId Int))
-- Ngrams extracted per document, keyed by ngrams then type then node.
361 mapNgramsDocs' :: HashMap ExtractedNgrams (Map NgramsType (Map NodeId (Int, TermsCount)))
363 <$> documentIdWithNgrams
364 (extractNgramsT $ withLang lang documentsWithId)
367 lId <- getOrMkList masterCorpusId masterUserId
368 -- _ <- saveDocNgramsWith lId mapNgramsDocs'
369 _ <- saveDocNgramsWith lId mapNgramsDocs'
371 -- _cooc <- insertDefaultNode NodeListCooc lId masterUserId
-- | Persist a document/ngrams map on the given list: insert the ngrams
-- themselves, register them on the list (node_ngrams), then write the
-- weighted context/node/ngrams links and the doc-ngrams index.
-- NOTE(review): parts of the signature and some intermediate lines are
-- elided in this view.
374 saveDocNgramsWith :: (FlowCmdM env err m)
376 -> HashMap ExtractedNgrams (Map NgramsType (Map NodeId (Int, TermsCount)))
378 saveDocNgramsWith lId mapNgramsDocs' = do
379 --printDebug "[saveDocNgramsWith] mapNgramsDocs'" mapNgramsDocs'
-- Drop the TermsCount component; only the Int weight is kept here.
380 let mapNgramsDocsNoCount = over (traverse . traverse . traverse) fst mapNgramsDocs'
381 terms2id <- insertExtractedNgrams $ HashMap.keys mapNgramsDocsNoCount
383 let mapNgramsDocs = HashMap.mapKeys extracted2ngrams mapNgramsDocs'
-- Register (ngrams term, ngrams types) pairs on the list.
386 mapCgramsId <- listInsertDb lId toNodeNgramsW'
387 $ map (first _ngramsTerms . second Map.keys)
388 $ HashMap.toList mapNgramsDocs
390 --printDebug "saveDocNgramsWith" mapCgramsId
-- One ContextNodeNgrams2 row per (context, ngrams-type, term) triple,
-- weighted by w; entries with no cgrams id are silently dropped.
392 _return <- insertContextNodeNgrams2
393 $ catMaybes [ ContextNodeNgrams2 <$> Just nId
394 <*> (getCgramsId mapCgramsId ngrams_type (_ngramsTerms terms''))
395 <*> Just (fromIntegral w :: Double)
396 | (terms'', mapNgramsTypes) <- HashMap.toList mapNgramsDocs
397 , (ngrams_type, mapNodeIdWeight) <- Map.toList mapNgramsTypes
398 , (nId, (w, _cnt)) <- Map.toList mapNodeIdWeight
402 _ <- insertDocNgrams lId $ HashMap.mapKeys (indexNgrams terms2id) mapNgramsDocs
407 ------------------------------------------------------------------------
408 -- TODO Type NodeDocumentUnicised
-- | Insert documents under a parent node, de-duplicated via their uniq
-- id hash; returns the new context ids and the documents paired with
-- their ids.
-- NOTE(review): parts of the signature and the `let`/`where` header
-- before newIds' are elided in this view.
409 insertDocs :: ( FlowCmdM env err m
416 -> m ([ContextId], [Indexed ContextId a])
417 insertDocs uId cId hs = do
418 let docs = map addUniqId hs
419 newIds <- insertDb uId Nothing docs
420 -- printDebug "newIds" newIds
422 newIds' = map reId newIds
-- Match inserted rows back to their documents by uniq-id hash.
423 documentsWithId = mergeData (toInserted newIds) (Map.fromList $ map viewUniqId' docs)
424 _ <- Doc.add cId newIds'
425 pure (newIds', documentsWithId)
428 ------------------------------------------------------------------------
-- | Pair a document with its uniq-id hash; panics if the hash is absent
-- (documents are expected to have passed through addUniqId first).
-- NOTE(review): the signature tail and `where` header are elided here.
429 viewUniqId' :: UniqId a
432 viewUniqId' d = maybe err (\h -> (h,d)) (view uniqId d)
434 err = panic "[ERROR] Database.Flow.toInsert"
-- | Index the rows that were actually inserted by their uniq-id hash,
-- discarding rows the DB reported as not inserted (duplicates).
-- NOTE(review): the signature tail and the equation head are elided in
-- this view.
437 toInserted :: [ReturnId]
440 Map.fromList . map (\r -> (reUniqId r, r) )
441 . filter (\r -> reInserted r == True)
-- | Join inserted-row ids with their source documents by uniq-id hash;
-- documents whose hash has no matching ReturnId are dropped.
-- NOTE(review): part of the signature and the tail of toDocumentWithId
-- are elided in this view.
443 mergeData :: Map Hash ReturnId
445 -> [Indexed NodeId a]
446 mergeData rs = catMaybes . map toDocumentWithId . Map.toList
448 toDocumentWithId (sha,hpd) =
449 Indexed <$> fmap reId (lookup sha rs)
452 ------------------------------------------------------------------------
453 ------------------------------------------------------------------------
454 ------------------------------------------------------------------------
-- | Run an ngrams-extraction function over each indexed document,
-- pairing every document with its extracted ngrams.
-- NOTE(review): part of the signature (the function argument's head) and
-- the body of toDocumentIdWithNgrams are partially elided in this view.
455 documentIdWithNgrams :: HasNodeError err
457 -> Cmd err (HashMap b (Map NgramsType Int, TermsCount)))
458 -> [Indexed NodeId a]
459 -> Cmd err [DocumentIdWithNgrams a b]
460 documentIdWithNgrams f = traverse toDocumentIdWithNgrams
462 toDocumentIdWithNgrams d = do
464 pure $ DocumentIdWithNgrams d e
467 -- | TODO check optimization
-- Merge per-document ngrams maps into one map keyed by ngrams, then
-- ngrams type, then node id, summing (weight, count) pairs on collision.
-- NOTE(review): parts of the signature and the `where` header are elided
-- in this view.
468 mapNodeIdNgrams :: (Ord b, Hashable b)
469 => [DocumentIdWithNgrams a b]
472 (Map NodeId (Int, TermsCount))
474 mapNodeIdNgrams = HashMap.unionsWith (Map.unionWith (Map.unionWith addTuples)) . fmap f
476 -- | NOTE We are somehow multiplying 'TermsCount' here: If the
477 -- same ngrams term has different ngrams types, the 'TermsCount'
478 -- for it (which is the number of times the terms appears in a
479 -- document) is copied over to all its types.
480 f :: DocumentIdWithNgrams a b
481 -> HashMap b (Map NgramsType (Map NodeId (Int, TermsCount)))
482 f d = fmap (\(ngramsTypeMap, cnt) -> fmap (\i -> Map.singleton nId (i, cnt)) ngramsTypeMap) $ documentNgrams d
484 nId = _index $ documentWithId d
487 ------------------------------------------------------------------------
-- | Ngrams extraction for contacts: only the last name is extracted, as
-- an Authors ngrams with weight 1. Terms are truncated to 255 chars.
-- NOTE(review): the instance `where`, the extract equation head, and a
-- binding for hc' are elided in this view.
488 instance ExtractNgramsT HyperdataContact
490 extractNgramsT l hc = HashMap.mapKeys (cleanExtractedNgrams 255) <$> extract l hc
492 extract :: TermType Lang -> HyperdataContact
493 -> Cmd err (HashMap ExtractedNgrams (Map NgramsType Int, TermsCount))
-- Missing last name falls back to the literal string "Nothing".
495 let authors = map text2ngrams
496 $ maybe ["Nothing"] (\a -> [a])
497 $ view (hc_who . _Just . cw_lastName) hc'
499 pure $ HashMap.fromList $ [(SimpleNgrams a', (Map.singleton Authors 1, 1)) | a' <- authors ]
-- | Ngrams extraction for documents: source, institutes and authors are
-- taken from metadata fields (split on ", "), and terms are extracted
-- from the document text then POS-tagged. All keys are truncated to 255
-- characters.
-- NOTE(review): the instance `where`, several field accessors (the
-- `maybe` fallbacks' third arguments) and parts of the term pipeline are
-- elided in this view.
502 instance ExtractNgramsT HyperdataDocument
504 extractNgramsT :: TermType Lang
506 -> Cmd err (HashMap ExtractedNgrams (Map NgramsType Int, TermsCount))
507 extractNgramsT lang hd = HashMap.mapKeys (cleanExtractedNgrams 255) <$> extractNgramsT' lang hd
509 extractNgramsT' :: TermType Lang
511 -> Cmd err (HashMap ExtractedNgrams (Map NgramsType Int, TermsCount))
512 extractNgramsT' lang' doc = do
-- Missing metadata fields fall back to the literal string "Nothing".
513 let source = text2ngrams
514 $ maybe "Nothing" identity
517 institutes = map text2ngrams
518 $ maybe ["Nothing"] (map toSchoolName . (T.splitOn ", "))
521 authors = map text2ngrams
522 $ maybe ["Nothing"] (T.splitOn ", ")
-- Terms from the body text, enriched with CoreNLP POS tags (NP).
525 termsWithCounts' <- map (\(t, cnt) -> (enrichedTerms (lang' ^. tt_lang) CoreNLP NP t, cnt))
527 <$> liftBase (extractTerms lang' $ hasText doc)
-- Metadata ngrams carry weight 1 and count 1; extracted terms keep
-- their occurrence counts.
529 pure $ HashMap.fromList
530 $ [(SimpleNgrams source, (Map.singleton Sources 1, 1)) ]
531 <> [(SimpleNgrams i', (Map.singleton Institutes 1, 1)) | i' <- institutes ]
532 <> [(SimpleNgrams a', (Map.singleton Authors 1, 1)) | a' <- authors ]
533 <> [(EnrichedNgrams t', (Map.singleton NgramsTerms 1, cnt')) | (t', cnt') <- termsWithCounts' ]
-- Both instances delegate straight to the node's hyperdata payload.
-- NOTE(review): the instance `where` keywords are elided in this view.
535 instance (ExtractNgramsT a, HasText a) => ExtractNgramsT (Node a)
537 extractNgramsT l (Node { _node_hyperdata = h }) = extractNgramsT l h
539 instance HasText a => HasText (Node a)
541 hasText (Node { _node_hyperdata = h }) = hasText h
545 -- | TODO putelsewhere
546 -- | Upgrade function
547 -- Suppose all documents are English (this is the case actually)
-- Re-index every document under the master user's corpora, extracting
-- and inserting ngrams in batches of 1000 documents.
-- NOTE(review): the signature tail and the function's final expression
-- are elided in this view.
548 indexAllDocumentsWithPosTag :: FlowCmdM env err m
550 indexAllDocumentsWithPosTag = do
551 rootId <- getRootId (UserName userMaster)
552 corpusIds <- findNodesId rootId [NodeCorpus]
553 docs <- List.concat <$> mapM getDocumentsWithParentId corpusIds
-- Batching keeps memory bounded for large corpora.
554 _ <- mapM extractInsert (splitEvery 1000 docs)
557 extractInsert :: FlowCmdM env err m
558 => [Node HyperdataDocument] -> m ()
559 extractInsert docs = do
560 let documentsWithId = map (\doc -> Indexed (doc ^. node_id) doc) docs
561 mapNgramsDocs' <- mapNodeIdNgrams
562 <$> documentIdWithNgrams
563 (extractNgramsT $ withLang (Multi EN) documentsWithId)
565 _ <- insertExtractedNgrams $ HashMap.keys mapNgramsDocs'