Module : Gargantext.Database.Action.Flow
3 Description : Database Flow
4 Copyright : (c) CNRS, 2017-Present
5 License : AGPL + CECILL v3
6 Maintainer : team@gargantext.org
7 Stability : experimental
11 -- check userId CanFillUserCorpus userCorpusId
12 -- check masterUserId CanFillMasterCorpus masterCorpusId
14 -- TODO-ACCESS: check uId CanInsertDoc pId && checkDocType nodeType
15 -- TODO-EVENTS: InsertedNodes
18 {-# OPTIONS_GHC -fno-warn-orphans #-}
20 {-# LANGUAGE ConstrainedClassMethods #-}
21 {-# LANGUAGE ConstraintKinds #-}
22 {-# LANGUAGE InstanceSigs #-}
23 {-# LANGUAGE ScopedTypeVariables #-}
24 {-# LANGUAGE TemplateHaskell #-}
26 module Gargantext.Database.Action.Flow -- (flowDatabase, ngrams2list)
40 , getOrMk_RootWithCorpus
46 , indexAllDocumentsWithPosTag
51 import Control.Lens ((^.), view, _Just, makeLenses)
52 import Data.Aeson.TH (deriveJSON)
53 import Data.Conduit.Internal (zipSources)
55 import Data.HashMap.Strict (HashMap)
56 import Data.Hashable (Hashable)
57 import Data.List (concat)
58 import Data.Map (Map, lookup)
59 import Data.Maybe (catMaybes)
62 import qualified Data.Text as T
63 import Data.Traversable (traverse)
64 import Data.Tuple.Extra (first, second)
65 import GHC.Generics (Generic)
66 import Servant.Client (ClientError)
67 import System.FilePath (FilePath)
68 import qualified Data.HashMap.Strict as HashMap
69 import qualified Gargantext.Data.HashMap.Strict.Utils as HashMap
70 import qualified Data.Map as Map
71 import qualified Data.Conduit.List as CL
72 import qualified Data.Conduit as C
74 import Gargantext.API.Admin.Orchestrator.Types (JobLog(..))
75 import Gargantext.Core (Lang(..), PosTagAlgo(..))
76 import Gargantext.Core.Ext.IMT (toSchoolName)
77 import Gargantext.Core.Ext.IMTUser (readFile_Annuaire)
78 import Gargantext.Core.Flow.Types
79 import Gargantext.Core.Text
80 import Gargantext.Core.Text.Corpus.Parsers (parseFile, FileFormat, FileType)
81 import Gargantext.Core.Text.List (buildNgramsLists)
82 import Gargantext.Core.Text.List.Group.WithStem ({-StopSize(..),-} GroupParams(..))
83 import Gargantext.Core.Text.List.Social (FlowSocialListWith)
84 import Gargantext.Core.Text.Terms
85 import Gargantext.Core.Text.Terms.Mono.Stem.En (stemIt)
86 import Gargantext.Core.Types (POS(NP))
87 import Gargantext.Core.Types.Individu (User(..))
88 import Gargantext.Core.Types.Main
89 import Gargantext.Core.Utils.Prefix (unPrefix, unPrefixSwagger)
90 import Gargantext.Database.Action.Flow.List
91 import Gargantext.Database.Action.Flow.Types
92 import Gargantext.Database.Action.Flow.Utils (insertDocNgrams, DocumentIdWithNgrams(..))
93 import Gargantext.Database.Action.Search (searchDocInDatabase)
94 import Gargantext.Database.Admin.Config (userMaster, corpusMasterName)
95 import Gargantext.Database.Action.Metrics (updateNgramsOccurrences)
96 import Gargantext.Database.Admin.Types.Hyperdata
97 import Gargantext.Database.Admin.Types.Node -- (HyperdataDocument(..), NodeType(..), NodeId, UserId, ListId, CorpusId, RootId, MasterCorpusId, MasterUserId)
98 import Gargantext.Database.Prelude
99 import Gargantext.Database.Query.Table.ContextNodeNgrams2
100 import Gargantext.Database.Query.Table.Ngrams
101 import Gargantext.Database.Query.Table.Node
102 import Gargantext.Database.Query.Table.Node.Document.Insert -- (insertDocuments, ReturnId(..), addUniqIdsDoc, addUniqIdsContact, ToDbData(..))
103 import Gargantext.Database.Query.Table.Node.Error (HasNodeError(..))
104 import Gargantext.Database.Query.Table.NodeNgrams (listInsertDb , getCgramsId)
105 import Gargantext.Database.Query.Tree.Root (getOrMkRoot, getOrMk_RootWithCorpus)
106 import Gargantext.Database.Schema.Node (NodePoly(..), node_id)
107 import Gargantext.Database.Types
108 import Gargantext.Prelude
109 import Gargantext.Prelude.Crypto.Hash (Hash)
110 import qualified Gargantext.Core.Text.Corpus.API as API
111 import qualified Gargantext.Database.Query.Table.Node.Document.Add as Doc (add)
112 import qualified Prelude
114 ------------------------------------------------------------------------
115 -- Imports for upgrade function
116 import Gargantext.Database.Query.Tree.Root (getRootId)
117 import Gargantext.Database.Query.Tree (findNodesId)
118 import qualified Data.List as List
119 ------------------------------------------------------------------------
120 -- TODO use internal with API name (could be old data)
-- | Provenance of incoming data: produced inside Gargantext
--   ('InternalOrigin') or fetched from an external service
--   ('ExternalOrigin').  Both constructors carry the API they refer to.
data DataOrigin = InternalOrigin { _do_api :: API.ExternalAPIs }
                | ExternalOrigin { _do_api :: API.ExternalAPIs }
  deriving (Generic, Eq)

-- Lenses for '_do_api'; JSON instances drop the "_do_" field prefix.
makeLenses ''DataOrigin
deriveJSON (unPrefix "_do_") ''DataOrigin
-- | Swagger schema derived generically, stripping the "_do_" field prefix
--   so the API documentation matches the JSON encoding above.
instance ToSchema DataOrigin where
  declareNamedSchema = genericDeclareNamedSchema (unPrefixSwagger "_do_")
-- | Every external API, listed once as an internal origin and once as an
--   external origin (internal origins first).
allDataOrigins :: [DataOrigin]
allDataOrigins = [ mkOrigin api
                 | mkOrigin <- [InternalOrigin, ExternalOrigin]
                 , api      <- API.externalAPIs
                 ]
-- | Texts flowing into the database: either documents already stored
--   (referenced by 'NodeId') or new documents streamed by a conduit,
--   paired with an optional total count used for progress reporting.
data DataText = DataOld ![NodeId]
              | DataNew !(Maybe Integer, ConduitT () HyperdataDocument IO ())
-- | DataNew ![[HyperdataDocument]]
-- Show instance is not possible because of IO
-- | Debug helper: print a 'DataText' to stdout.  For 'DataNew' the whole
--   conduit is drained into memory before printing.
printDataText :: DataText -> IO ()
printDataText dataText =
  case dataText of
    DataOld nodeIds ->
      putStrLn $ show nodeIds
    DataNew (mCount, docsC) -> do
      collected <- C.runConduit (docsC .| CL.consume)
      putStrLn $ show (mCount, collected)
-- TODO use the split parameter in config file
-- | Fetch text data for an origin: external origins query the remote API;
--   internal origins search the local master corpus.
-- NOTE(review): several signature/argument lines are missing from this
-- excerpt — verify against the full source.
getDataText :: FlowCmdM env err m
            -> m (Either ClientError DataText)
getDataText (ExternalOrigin api) la q li = liftBase $ do
  -- Remote query in the target language; failures surface as ClientError.
  eRes <- API.get api (_tt_lang la) q li
  pure $ DataNew <$> eRes
getDataText (InternalOrigin _) _la q _li = do
  (_masterUserId, _masterRootId, cId) <- getOrMk_RootWithCorpus
                                           (UserName userMaster)
                                           (Nothing :: Maybe HyperdataCorpus)
  -- Full-text search over the master corpus; keep only the node ids.
  ids <- map fst <$> searchDocInDatabase cId (stemIt q)
  pure $ Right $ DataOld ids
-- | Debug wrapper around 'getDataText': performs the same fetch but
--   prints the outcome (error or data) instead of returning it.
getDataText_Debug :: FlowCmdM env err m
getDataText_Debug a l q li = do
  result <- getDataText a l q li
  -- NOTE(review): the 'case result of' scrutinee line is missing here.
    Left err  -> liftBase $ putStrLn $ show err
    Right res -> liftBase $ printDataText res
-------------------------------------------------------------------------------
-- | Flow a 'DataText' into corpus @cid@: already-stored documents
--   ('DataOld') are attached via 'flowCorpusUser'; streamed documents
--   ('DataNew') go through 'flowCorpus'.
flowDataText :: forall env err m.
             -> Maybe FlowSocialListWith
flowDataText u (DataOld ids) tt cid mfslw _ = flowCorpusUser (_tt_lang tt) u (Right [cid]) corpusType ids mfslw
    -- No specific corpus hyperdata for the user corpus.
    corpusType = (Nothing :: Maybe HyperdataCorpus)
flowDataText u (DataNew (mLen, txtC)) tt cid mfslw logStatus =
  flowCorpus u (Right [cid]) tt mfslw (mLen, (transPipe liftBase txtC)) logStatus
------------------------------------------------------------------------
-- | Flow a directory ("annuaire") file of contacts into the database;
--   the whole file is read eagerly, then streamed through 'flow'.
flowAnnuaire :: (FlowCmdM env err m)
             -> Either CorpusName [CorpusId]
flowAnnuaire u n l filePath logStatus = do
  -- TODO Conduit for file
  docs <- liftBase $ ((readFile_Annuaire filePath) :: IO [HyperdataContact])
  -- Length is known up-front, so pass it along for progress reporting.
  flow (Nothing :: Maybe HyperdataAnnuaire) u n l Nothing (Just $ fromIntegral $ length docs, yieldMany docs) logStatus
------------------------------------------------------------------------
-- | Parse a corpus file and flow the parsed documents into the corpus.
--   Parse errors abort with 'panic'.
-- NOTE(review): the 'case eParsed of' branch head is missing from this
-- excerpt; the Limit argument (_l) is currently unused.
flowCorpusFile :: (FlowCmdM env err m)
               -> Either CorpusName [CorpusId]
               -> Limit -- Limit the number of docs (for dev purpose)
               -> Maybe FlowSocialListWith
flowCorpusFile u n _l la ft ff fp mfslw logStatus = do
  eParsed <- liftBase $ parseFile ft ff fp
      flowCorpus u n la mfslw (Just $ fromIntegral $ length parsed, yieldMany parsed .| mapC toHyperdataDocument) logStatus
      --let docs = splitEvery 500 $ take l parsed
      --flowCorpus u n la mfslw (yieldMany $ map (map toHyperdataDocument) docs) logStatus
    Left e -> panic $ "Error: " <> T.pack e
------------------------------------------------------------------------
-- | TODO improve the needed type to create/update a corpus
-- (For now, Either is enough)
-- Flow documents into a user corpus; delegates to 'flow' with no
-- specific corpus hyperdata.
flowCorpus :: (FlowCmdM env err m, FlowCorpus a)
           -> Either CorpusName [CorpusId]
           -> Maybe FlowSocialListWith
           -> (Maybe Integer, ConduitT () a m ())
flowCorpus = flow (Nothing :: Maybe HyperdataCorpus)
-- | Generic flow: stream documents into the master corpus, logging
--   progress per document, then attach the inserted ids to the user
--   corpus via 'flowCorpusUser'.
flow :: forall env err m a c.
     -> Either CorpusName [CorpusId]
     -> Maybe FlowSocialListWith
     -> (Maybe Integer, ConduitT () a m ())
flow c u cn la mfslw (mLength, docsC) logStatus = do
  -- TODO if public insertMasterDocs else insertUserDocs
  -- Pair each streamed document with its 1-based index for progress.
  zipSources (yieldMany [1..]) docsC
  -- ids <- traverse (\(idx, doc) -> do
  --                       id <- insertMasterDocs c la doc
  --                       logStatus JobLog { _scst_succeeded = Just $ 1 + idx
  --                                        , _scst_failed = Just 0
  --                                        , _scst_remaining = Just $ length docs - idx
  --                                        , _scst_events = Just []
  --                 ) (zip [1..] docs)
  flowCorpusUser (la ^. tt_lang) u cn c ids mfslw
    -- Insert a single document as a master doc and report progress.
    insertDoc :: (Integer, a) -> m NodeId
    insertDoc (idx, doc) = do
      id <- insertMasterDocs c la [doc]
      logStatus JobLog { _scst_succeeded = Just $ fromIntegral $ 1 + idx
                       , _scst_failed = Just 0
                       , _scst_remaining = Just $ fromIntegral $ len - idx
                       , _scst_events = Just []
      -- NOTE(review): 'Prelude.head' is partial — safe only if
      -- insertMasterDocs never returns an empty list; confirm.
      pure $ Prelude.head id
------------------------------------------------------------------------
-- | Attach inserted documents to the user's corpus tree: create the
--   default child nodes (texts, list, dashboard, graph), build the
--   ngrams lists against the master corpus, and refresh occurrences.
flowCorpusUser :: ( FlowCmdM env err m
                -> Either CorpusName [CorpusId]
                -> Maybe FlowSocialListWith
flowCorpusUser l user corpusName ctype ids mfslw = do
  -- User's root + corpus are created on demand.
  (userId, _rootId, userCorpusId) <- getOrMk_RootWithCorpus user corpusName ctype
  -- NodeTexts is first
  _tId <- insertDefaultNodeIfNotExists NodeTexts userCorpusId userId
  -- printDebug "NodeTexts: " tId
  -- NodeList is second
  listId <- getOrMkList userCorpusId userId
  -- _cooc <- insertDefaultNode NodeListCooc listId userId
  -- TODO: check if present already, ignore
  _ <- Doc.add userCorpusId ids
  -- printDebug "Node Text Ids:" tId
  -- The master corpus is shared; ngrams lists are built against it.
  (masterUserId, _masterRootId, masterCorpusId)
    <- getOrMk_RootWithCorpus (UserName userMaster) (Left "") ctype
  --let gp = (GroupParams l 2 3 (StopSize 3))
  -- Here the PosTagAlgo should be chosen according the Lang
  let gp = GroupWithPosTag l CoreNLP HashMap.empty
  ngs <- buildNgramsLists user userCorpusId masterCorpusId mfslw gp
  -- printDebug "flowCorpusUser:ngs" ngs
  _userListId <- flowList_DbRepo listId ngs
  _mastListId <- getOrMkList masterCorpusId masterUserId
  -- _ <- insertOccsUpdates userCorpusId mastListId
  -- printDebug "userListId" userListId
  _ <- insertDefaultNodeIfNotExists NodeDashboard userCorpusId userId
  _ <- insertDefaultNodeIfNotExists NodeGraph userCorpusId userId
  --_ <- mkPhylo userCorpusId userId
  -- _ <- mkAnnuaire rootUserId userId
  _ <- updateNgramsOccurrences userCorpusId (Just listId)
-- | Insert documents into the master user's corpus and index their
--   extracted ngrams there.
-- NOTE(review): parts of the signature and body are missing from this
-- excerpt — verify against the full source.
insertMasterDocs :: ( FlowCmdM env err m
insertMasterDocs c lang hs = do
  (masterUserId, _, masterCorpusId) <- getOrMk_RootWithCorpus (UserName userMaster) (Left corpusMasterName) c
  (ids', documentsWithId) <- insertDocs masterUserId masterCorpusId (map (toNode masterUserId masterCorpusId) hs )
  _ <- Doc.add masterCorpusId ids'
  -- create a corpus with database name (CSV or PubMed)
  -- add documents to the corpus (create node_node link)
  -- this will enable global database monitoring
  -- maps :: IO Map Ngrams (Map NgramsType (Map NodeId Int))
  mapNgramsDocs' :: HashMap ExtractedNgrams (Map NgramsType (Map NodeId Int))
    <$> documentIdWithNgrams
        (extractNgramsT $ withLang lang documentsWithId)
  lId <- getOrMkList masterCorpusId masterUserId
  _ <- saveDocNgramsWith lId mapNgramsDocs'
  -- _cooc <- insertDefaultNode NodeListCooc lId masterUserId
-- | Persist extracted ngrams for a list: insert the ngrams themselves,
--   the list/ngrams links, and the per-context weights.
saveDocNgramsWith :: ( FlowCmdM env err m)
                  -> HashMap ExtractedNgrams (Map NgramsType (Map NodeId Int))
saveDocNgramsWith lId mapNgramsDocs' = do
  -- Insert raw ngrams, obtaining their database ids.
  terms2id <- insertExtractedNgrams $ HashMap.keys mapNgramsDocs'
  --printDebug "terms2id" terms2id
  let mapNgramsDocs = HashMap.mapKeys extracted2ngrams mapNgramsDocs'
  -- Create (list, ngrams) rows keyed by (term, ngrams types).
  mapCgramsId <- listInsertDb lId toNodeNgramsW'
              $ map (first _ngramsTerms . second Map.keys)
              $ HashMap.toList mapNgramsDocs
  --printDebug "saveDocNgramsWith" mapCgramsId
  -- Link every context (document) to its ngrams with a Double weight;
  -- entries with no cgrams id are silently dropped via catMaybes.
  _return <- insertContextNodeNgrams2
           $ catMaybes [ ContextNodeNgrams2 <$> Just nId
                                            <*> (getCgramsId mapCgramsId ngrams_type (_ngramsTerms terms''))
                                            <*> Just (fromIntegral w :: Double)
                       | (terms'', mapNgramsTypes) <- HashMap.toList mapNgramsDocs
                       , (ngrams_type, mapNodeIdWeight) <- Map.toList mapNgramsTypes
                       , (nId, w) <- Map.toList mapNodeIdWeight
  -- Record doc-ngrams occurrences keyed by the indexed ngrams ids.
  _ <- insertDocNgrams lId $ HashMap.mapKeys (indexNgrams terms2id) mapNgramsDocs
------------------------------------------------------------------------
-- TODO Type NodeDocumentUnicised
-- | Insert documents (deduplicated by unique hash) under node @cId@ and
--   return their context ids together with the documents indexed by id.
insertDocs :: ( FlowCmdM env err m
               -> m ([ContextId], [Indexed ContextId a])
insertDocs uId cId hs = do
  -- Tag every document with its unique hash before insertion.
  let docs = map addUniqId hs
  newIds <- insertDb uId cId docs
  -- printDebug "newIds" newIds
      newIds' = map reId newIds
      -- Join DB return-ids back to the original documents by hash.
      documentsWithId = mergeData (toInserted newIds) (Map.fromList $ map viewUniqId' docs)
  _ <- Doc.add cId newIds'
  pure (newIds', documentsWithId)
------------------------------------------------------------------------
-- | Pair a document with its unique hash; panics if the hash is absent
--   (documents are expected to have gone through 'addUniqId' first).
viewUniqId' :: UniqId a
viewUniqId' d = maybe err (\h -> (h,d)) (view uniqId d)
    -- Reached only if a document was built without a uniq id.
    err = panic "[ERROR] Database.Flow.toInsert"
-- | Index the successfully inserted rows by their unique hash,
--   discarding rows the database did not actually insert.
toInserted :: [ReturnId]
  Map.fromList . map (\r -> (reUniqId r, r) )
               . filter (\r -> reInserted r == True)  -- NOTE(review): '== True' is redundant (hlint)
-- | Join database return-ids with the original documents by hash,
--   dropping documents whose hash has no matching 'ReturnId'.
mergeData :: Map Hash ReturnId
          -> [Indexed NodeId a]
mergeData rs = catMaybes . map toDocumentWithId . Map.toList
    -- Nothing when the hash lookup misses (document not inserted).
    toDocumentWithId (sha,hpd) =
      Indexed <$> fmap reId (lookup sha rs)
451 ------------------------------------------------------------------------
452 ------------------------------------------------------------------------
453 ------------------------------------------------------------------------
------------------------------------------------------------------------
-- | Run the ngrams-extraction function over every indexed document,
--   pairing each document with its extracted ngrams.
documentIdWithNgrams :: HasNodeError err
                     -> Cmd err (HashMap b (Map NgramsType Int)))
                     -> [Indexed NodeId a]
                     -> Cmd err [DocumentIdWithNgrams a b]
documentIdWithNgrams f = traverse toDocumentIdWithNgrams
    -- Apply the extractor to one document, keeping the document alongside.
    toDocumentIdWithNgrams d = do
      pure $ DocumentIdWithNgrams d e
-- | TODO check optimization
-- Merge per-document ngrams maps into one map keyed by ngram, summing
-- counts per (NgramsType, NodeId).
mapNodeIdNgrams :: (Ord b, Hashable b)
                => [DocumentIdWithNgrams a b]
mapNodeIdNgrams = HashMap.unionsWith (Map.unionWith (Map.unionWith (+))) . fmap f
    f :: DocumentIdWithNgrams a b
      -> HashMap b (Map NgramsType (Map NodeId Int))
    -- Tag every count with the originating document's node id.
    f d = fmap (fmap (Map.singleton nId)) $ documentNgrams d
        nId = _index $ documentWithId d
------------------------------------------------------------------------
-- | Contacts contribute only their last name, filed under 'Authors'.
instance ExtractNgramsT HyperdataContact
  -- Truncate every extracted ngram to 255 characters before indexing.
  extractNgramsT l hc = HashMap.mapKeys (cleanExtractedNgrams 255) <$> extract l hc
      extract :: TermType Lang -> HyperdataContact
              -> Cmd err (HashMap ExtractedNgrams (Map NgramsType Int))
        -- Fall back to the literal "Nothing" when no last name is set.
        let authors = map text2ngrams
                    $ maybe ["Nothing"] (\a -> [a])
                    $ view (hc_who . _Just . cw_lastName) hc'
        pure $ HashMap.fromList $ [(SimpleNgrams a', Map.singleton Authors 1) | a' <- authors ]
-- | Documents contribute their source, institutes, authors and the
--   terms extracted from their text.
instance ExtractNgramsT HyperdataDocument
  extractNgramsT :: TermType Lang
                 -> Cmd err (HashMap ExtractedNgrams (Map NgramsType Int))
  -- Truncate every extracted ngram to 255 characters before indexing.
  extractNgramsT lang hd = HashMap.mapKeys (cleanExtractedNgrams 255) <$> extractNgramsT' lang hd
      extractNgramsT' :: TermType Lang
                      -> Cmd err (HashMap ExtractedNgrams (Map NgramsType Int))
      extractNgramsT' lang' doc = do
        -- Missing fields fall back to the literal "Nothing";
        -- multi-valued fields are split on ", ".
        let source = text2ngrams
                   $ maybe "Nothing" identity
            institutes = map text2ngrams
                       $ maybe ["Nothing"] (map toSchoolName . (T.splitOn ", "))
            authors = map text2ngrams
                    $ maybe ["Nothing"] (T.splitOn ", ")
        -- Terms are POS-tagged with CoreNLP, keeping noun phrases (NP).
        terms' <- map (enrichedTerms (lang' ^. tt_lang) CoreNLP NP)
                  <$> liftBase (extractTerms lang' $ hasText doc)
        pure $ HashMap.fromList
             $ [(SimpleNgrams source, Map.singleton Sources 1) ]
            <> [(SimpleNgrams i', Map.singleton Institutes 1) | i' <- institutes ]
            <> [(SimpleNgrams a', Map.singleton Authors 1) | a' <- authors ]
            <> [(EnrichedNgrams t', Map.singleton NgramsTerms 1) | t' <- terms' ]
-- | A 'Node' delegates ngrams extraction to its hyperdata payload.
instance (ExtractNgramsT a, HasText a) => ExtractNgramsT (Node a)
  extractNgramsT l (Node _ _ _ _ _ _ _ h) = extractNgramsT l h
-- | A 'Node' delegates 'hasText' to its hyperdata payload.
instance HasText a => HasText (Node a)
  hasText (Node _ _ _ _ _ _ _ h) = hasText h
-- | TODO putelsewhere
-- | Upgrade function
-- Suppose all documents are English (this is the case actually)
-- Re-extract and re-insert ngrams for every document under the master
-- user's corpora.
indexAllDocumentsWithPosTag :: FlowCmdM env err m
indexAllDocumentsWithPosTag = do
  rootId <- getRootId (UserName userMaster)
  corpusIds <- findNodesId rootId [NodeCorpus]
  docs <- List.concat <$> mapM getDocumentsWithParentId corpusIds
  -- Process in batches of 1000 to bound memory use on large corpora.
  _ <- mapM extractInsert (splitEvery 1000 docs)
552 extractInsert :: FlowCmdM env err m
553 => [Node HyperdataDocument] -> m ()
554 extractInsert docs = do
555 let documentsWithId = map (\doc -> Indexed (doc ^. node_id) doc) docs
556 mapNgramsDocs' <- mapNodeIdNgrams
557 <$> documentIdWithNgrams
558 (extractNgramsT $ withLang (Multi EN) documentsWithId)
560 _ <- insertExtractedNgrams $ HashMap.keys mapNgramsDocs'