2 Module : Gargantext.Database.Flow
3 Description : Database Flow
4 Copyright : (c) CNRS, 2017-Present
5 License : AGPL + CECILL v3
6 Maintainer : team@gargantext.org
7 Stability : experimental
11 -- check userId CanFillUserCorpus userCorpusId
12 -- check masterUserId CanFillMasterCorpus masterCorpusId
14 -- TODO-ACCESS: check uId CanInsertDoc pId && checkDocType nodeType
15 -- TODO-EVENTS: InsertedNodes
18 {-# OPTIONS_GHC -fno-warn-orphans #-}
20 {-# LANGUAGE ConstrainedClassMethods #-}
21 {-# LANGUAGE ConstraintKinds #-}
22 {-# LANGUAGE InstanceSigs #-}
23 {-# LANGUAGE ScopedTypeVariables #-}
24 {-# LANGUAGE TemplateHaskell #-}
26 module Gargantext.Database.Action.Flow -- (flowDatabase, ngrams2list)
40 , getOrMk_RootWithCorpus
46 , indexAllDocumentsWithPosTag
51 import Control.Lens ((^.), view, _Just, makeLenses, over, traverse)
52 import Data.Aeson.TH (deriveJSON)
53 import Data.Conduit.Internal (zipSources)
55 import Data.HashMap.Strict (HashMap)
56 import Data.Hashable (Hashable)
57 import Data.List (concat)
58 import Data.Map (Map, lookup)
59 import Data.Maybe (catMaybes)
62 import qualified Data.Text as T
63 import Data.Tuple.Extra (first, second)
64 import GHC.Generics (Generic)
65 import Servant.Client (ClientError)
66 import System.FilePath (FilePath)
67 import qualified Data.HashMap.Strict as HashMap
68 import qualified Gargantext.Data.HashMap.Strict.Utils as HashMap
69 import qualified Data.Map as Map
70 import qualified Data.Conduit.List as CL
71 import qualified Data.Conduit as C
73 import Gargantext.API.Admin.Orchestrator.Types (JobLog(..))
74 import Gargantext.Core (Lang(..), PosTagAlgo(..))
75 import Gargantext.Core.Ext.IMT (toSchoolName)
76 import Gargantext.Core.Ext.IMTUser (readFile_Annuaire)
77 import Gargantext.Core.Flow.Types
78 import Gargantext.Core.Text
79 import Gargantext.Core.Text.Corpus.Parsers (parseFile, FileFormat, FileType)
80 import Gargantext.Core.Text.List (buildNgramsLists)
81 import Gargantext.Core.Text.List.Group.WithStem ({-StopSize(..),-} GroupParams(..))
82 import Gargantext.Core.Text.List.Social (FlowSocialListWith)
83 import Gargantext.Core.Text.Terms
84 import Gargantext.Core.Text.Terms.Mono.Stem.En (stemIt)
85 import Gargantext.Core.Types (POS(NP), TermsCount)
86 import Gargantext.Core.Types.Individu (User(..))
87 import Gargantext.Core.Types.Main
88 import Gargantext.Core.Utils (addTuples)
89 import Gargantext.Core.Utils.Prefix (unPrefix, unPrefixSwagger)
90 import Gargantext.Database.Action.Flow.List
91 import Gargantext.Database.Action.Flow.Types
92 import Gargantext.Database.Action.Flow.Utils (insertDocNgrams, DocumentIdWithNgrams(..))
93 import Gargantext.Database.Action.Search (searchDocInDatabase)
94 import Gargantext.Database.Admin.Config (userMaster, corpusMasterName)
95 import Gargantext.Database.Action.Metrics (updateNgramsOccurrences)
96 import Gargantext.Database.Admin.Types.Hyperdata
97 import Gargantext.Database.Admin.Types.Node -- (HyperdataDocument(..), NodeType(..), NodeId, UserId, ListId, CorpusId, RootId, MasterCorpusId, MasterUserId)
98 import Gargantext.Database.Prelude
99 import Gargantext.Database.Query.Table.ContextNodeNgrams2
100 import Gargantext.Database.Query.Table.Ngrams
101 import Gargantext.Database.Query.Table.Node
102 import Gargantext.Database.Query.Table.Node.Document.Insert -- (insertDocuments, ReturnId(..), addUniqIdsDoc, addUniqIdsContact, ToDbData(..))
103 import Gargantext.Database.Query.Table.Node.Error (HasNodeError(..))
104 import Gargantext.Database.Query.Table.NodeNgrams (listInsertDb , getCgramsId)
105 import Gargantext.Database.Query.Tree.Root (getOrMkRoot, getOrMk_RootWithCorpus)
106 import Gargantext.Database.Schema.Node (NodePoly(..), node_id)
107 import Gargantext.Database.Types
108 import Gargantext.Prelude
109 import Gargantext.Prelude.Crypto.Hash (Hash)
110 import qualified Gargantext.Core.Text.Corpus.API as API
111 import qualified Gargantext.Database.Query.Table.Node.Document.Add as Doc (add)
112 import qualified Prelude
114 ------------------------------------------------------------------------
115 -- Imports for upgrade function
116 import Gargantext.Database.Query.Tree.Root (getRootId)
117 import Gargantext.Database.Query.Tree (findNodesId)
118 import qualified Data.List as List
119 ------------------------------------------------------------------------
120 -- TODO use internal with API name (could be old data)
-- | Provenance of a document set: 'InternalOrigin' points at an API whose
--   results are already stored in the local/master database, 'ExternalOrigin'
--   at an API that must be queried over the network.
--   NOTE(review): original lines 123/125 are missing from this extract;
--   confirm against VCS that no constructor or deriving clause is lost.
121 data DataOrigin = InternalOrigin { _do_api :: API.ExternalAPIs }
122 | ExternalOrigin { _do_api :: API.ExternalAPIs }
124 deriving (Generic, Eq)
-- Template Haskell: lenses for the record fields, plus JSON instances
-- with the "_do_" field prefix stripped.
126 makeLenses ''DataOrigin
127 deriveJSON (unPrefix "_do_") ''DataOrigin
-- | Swagger schema derived generically, stripping the "_do_" record prefix
--   so the schema field names match the JSON produced by 'deriveJSON'.
128 instance ToSchema DataOrigin where
129 declareNamedSchema = genericDeclareNamedSchema (unPrefixSwagger "_do_")
-- | Every possible 'DataOrigin': an 'InternalOrigin' and an
--   'ExternalOrigin' for each supported external API, internal ones first.
allDataOrigins :: [DataOrigin]
allDataOrigins = [ origin api | origin <- [InternalOrigin, ExternalOrigin]
                              , api    <- API.externalAPIs ]
-- | Result of 'getDataText': either the node ids of documents already in
--   the database ('DataOld'), or an optional expected document count plus
--   a conduit producing freshly fetched documents ('DataNew').
136 data DataText = DataOld ![NodeId]
137 | DataNew !(Maybe Integer, ConduitT () HyperdataDocument IO ())
138 --- | DataNew ![[HyperdataDocument]]
140 -- Show instance is not possible because of IO
-- | Debug helper: dump a 'DataText' to stdout.  A 'Show' instance is not
--   possible because 'DataNew' carries an 'IO' conduit, so the conduit is
--   drained into a list before printing.
printDataText :: DataText -> IO ()
printDataText (DataOld nodeIds) =
  putStrLn (show nodeIds)
printDataText (DataNew (mCount, docsC)) =
  C.runConduit (docsC .| CL.consume) >>= \docs ->
    putStrLn (show (mCount, docs))
147 -- TODO use the split parameter in config file
-- | Fetch the documents matching a query, either from an external API or
--   from the master corpus already in the database.
--   NOTE(review): signature lines 149-152 are missing from this extract.
148 getDataText :: FlowCmdM env err m
153 -> m (Either ClientError DataText)
-- External origin: query the API in IO and wrap the stream as 'DataNew'.
154 getDataText (ExternalOrigin api) la q li = liftBase $ do
155 eRes <- API.get api (_tt_lang la) q li
156 pure $ DataNew <$> eRes
-- Internal origin: search the master corpus with the stemmed query and
-- return matching ids as 'DataOld'; this path cannot fail with ClientError.
-- NOTE(review): line 161 (an argument to getOrMk_RootWithCorpus, likely the
-- corpus name) is missing from this extract.
158 getDataText (InternalOrigin _) _la q _li = do
159 (_masterUserId, _masterRootId, cId) <- getOrMk_RootWithCorpus
160 (UserName userMaster)
162 (Nothing :: Maybe HyperdataCorpus)
163 ids <- map fst <$> searchDocInDatabase cId (stemIt q)
164 pure $ Right $ DataOld ids
-- | Debug wrapper around 'getDataText': print either the client error or
--   the result instead of returning it.
--   NOTE(review): the 'case result of' scrutinee line (174) is missing
--   from this extract.
166 getDataText_Debug :: FlowCmdM env err m
172 getDataText_Debug a l q li = do
173 result <- getDataText a l q li
175 Left err -> liftBase $ putStrLn $ show err
176 Right res -> liftBase $ printDataText res
179 -------------------------------------------------------------------------------
-- | Flow a 'DataText' into the given corpus: already-stored documents are
--   wired in via 'flowCorpusUser'; new documents are inserted through
--   'flowCorpus'.  NOTE(review): several signature lines are missing from
--   this extract.
180 flowDataText :: forall env err m.
187 -> Maybe FlowSocialListWith
190 flowDataText u (DataOld ids) tt cid mfslw _ = flowCorpusUser (_tt_lang tt) u (Right [cid]) corpusType ids mfslw
-- Nothing fixes the hyperdata type to the default HyperdataCorpus.
192 corpusType = (Nothing :: Maybe HyperdataCorpus)
193 flowDataText u (DataNew (mLen, txtC)) tt cid mfslw logStatus =
-- hoist the IO conduit into the command monad before flowing it
194 flowCorpus u (Right [cid]) tt mfslw (mLen, (transPipe liftBase txtC)) logStatus
196 ------------------------------------------------------------------------
-- | Import a whole annuaire (contacts) from a file into a corpus of
--   'HyperdataContact'.  The file is read eagerly, so the document count
--   is known up front for progress reporting.
198 flowAnnuaire :: (FlowCmdM env err m)
200 -> Either CorpusName [CorpusId]
205 flowAnnuaire u n l filePath logStatus = do
206 -- TODO Conduit for file
207 docs <- liftBase $ ((readFile_Annuaire filePath) :: IO [HyperdataContact])
208 flow (Nothing :: Maybe HyperdataAnnuaire) u n l Nothing (Just $ fromIntegral $ length docs, yieldMany docs) logStatus
210 ------------------------------------------------------------------------
-- | Parse a local corpus file and flow the parsed documents into the
--   corpus; on parse failure the whole job panics with the parser error.
--   NOTE(review): the limit parameter (_l) is currently ignored — see the
--   commented-out splitEvery code below.
211 flowCorpusFile :: (FlowCmdM env err m)
213 -> Either CorpusName [CorpusId]
214 -> Limit -- Limit the number of docs (for dev purpose)
219 -> Maybe FlowSocialListWith
222 flowCorpusFile u n _l la ft ff fp mfslw logStatus = do
223 eParsed <- liftBase $ parseFile ft ff fp
226 flowCorpus u n la mfslw (Just $ fromIntegral $ length parsed, yieldMany parsed .| mapC toHyperdataDocument) logStatus
227 --let docs = splitEvery 500 $ take l parsed
228 --flowCorpus u n la mfslw (yieldMany $ map (map toHyperdataDocument) docs) logStatus
229 Left e -> panic $ "Error: " <> T.pack e
231 ------------------------------------------------------------------------
232 -- | TODO improve the needed type to create/update a corpus
233 -- (For now, Either is enough)
-- | Specialisation of 'flow' for corpora: the hyperdata type is fixed to
--   'HyperdataCorpus'.
234 flowCorpus :: (FlowCmdM env err m, FlowCorpus a)
236 -> Either CorpusName [CorpusId]
238 -> Maybe FlowSocialListWith
239 -> (Maybe Integer, ConduitT () a m ())
242 flowCorpus = flow (Nothing :: Maybe HyperdataCorpus)
-- | Generic document flow: insert each document from the conduit into the
--   master corpus (logging progress per document), then wire the resulting
--   ids into the user corpus via 'flowCorpusUser'.
--   NOTE(review): several lines are missing from this extract, including
--   the rest of the conduit pipeline after 'zipSources' and the 'where'
--   header introducing 'insertDoc' — confirm against VCS.
245 flow :: forall env err m a c.
252 -> Either CorpusName [CorpusId]
254 -> Maybe FlowSocialListWith
255 -> (Maybe Integer, ConduitT () a m ())
258 flow c u cn la mfslw (mLength, docsC) logStatus = do
259 -- TODO if public insertMasterDocs else insertUserDocs
-- pair each document with a 1-based index for progress reporting
260 ids <- runConduit $ zipSources (yieldMany [1..]) docsC
263 -- ids <- traverse (\(idx, doc) -> do
264 -- id <- insertMasterDocs c la doc
265 -- logStatus JobLog { _scst_succeeded = Just $ 1 + idx
266 -- , _scst_failed = Just 0
267 -- , _scst_remaining = Just $ length docs - idx
268 -- , _scst_events = Just []
271 -- ) (zip [1..] docs)
272 flowCorpusUser (la ^. tt_lang) u cn c ids mfslw
-- | Insert one indexed document into the master docs and log progress.
275 insertDoc :: (Integer, a) -> m NodeId
276 insertDoc (idx, doc) = do
277 id <- insertMasterDocs c la [doc]
281 logStatus JobLog { _scst_succeeded = Just $ fromIntegral $ 1 + idx
282 , _scst_failed = Just 0
283 , _scst_remaining = Just $ fromIntegral $ len - idx
284 , _scst_events = Just []
-- insertMasterDocs was given a singleton list, so head does not crash
-- here, but a total pattern match would be safer
286 pure $ Prelude.head id
290 ------------------------------------------------------------------------
-- | Attach inserted documents to the user's corpus and build everything
--   around them: default child nodes (texts, list, graph, dashboard), the
--   ngrams lists (user + master), and the occurrence counts.
--   NOTE(review): several signature and body lines are missing from this
--   extract — confirm the full definition against VCS.
291 flowCorpusUser :: ( FlowCmdM env err m
296 -> Either CorpusName [CorpusId]
299 -> Maybe FlowSocialListWith
301 flowCorpusUser l user corpusName ctype ids mfslw = do
-- resolve (or create) the user's root and corpus nodes
303 (userId, _rootId, userCorpusId) <- getOrMk_RootWithCorpus user corpusName ctype
304 -- NodeTexts is first
305 _tId <- insertDefaultNodeIfNotExists NodeTexts userCorpusId userId
306 -- printDebug "NodeTexts: " tId
308 -- NodeList is second
309 listId <- getOrMkList userCorpusId userId
310 -- _cooc <- insertDefaultNode NodeListCooc listId userId
311 -- TODO: check if present already, ignore
-- link the documents to the user corpus (node_node rows)
312 _ <- Doc.add userCorpusId ids
314 -- printDebug "Node Text Ids:" tId
-- the master corpus is needed to build the ngrams lists
317 (masterUserId, _masterRootId, masterCorpusId)
318 <- getOrMk_RootWithCorpus (UserName userMaster) (Left "") ctype
320 --let gp = (GroupParams l 2 3 (StopSize 3))
321 -- Here the PosTagAlgo should be chosen according to the Lang
322 let gp = GroupWithPosTag l CoreNLP HashMap.empty
323 ngs <- buildNgramsLists user userCorpusId masterCorpusId mfslw gp
325 -- printDebug "flowCorpusUser:ngs" ngs
327 _userListId <- flowList_DbRepo listId ngs
328 _mastListId <- getOrMkList masterCorpusId masterUserId
329 -- _ <- insertOccsUpdates userCorpusId mastListId
330 -- printDebug "userListId" userListId
-- remaining default children of the corpus
332 _ <- insertDefaultNodeIfNotExists NodeGraph userCorpusId userId
333 _ <- insertDefaultNodeIfNotExists NodeDashboard userCorpusId userId
334 --_ <- mkPhylo userCorpusId userId
336 -- _ <- mkAnnuaire rootUserId userId
337 _ <- updateNgramsOccurrences userCorpusId (Just listId)
-- | Insert documents into the master user's corpus and index their
--   extracted ngrams against the master list.
--   NOTE(review): the signature lines 343-349 and the final return
--   expression (after line 370) are missing from this extract.
342 insertMasterDocs :: ( FlowCmdM env err m
350 insertMasterDocs c lang hs = do
351 (masterUserId, _, masterCorpusId) <- getOrMk_RootWithCorpus (UserName userMaster) (Left corpusMasterName) c
352 (ids', documentsWithId) <- insertDocs masterUserId masterCorpusId (map (toNode masterUserId Nothing) hs )
-- link the new documents to the master corpus (node_node rows)
353 _ <- Doc.add masterCorpusId ids'
355 -- create a corpus with database name (CSV or PubMed)
356 -- add documents to the corpus (create node_node link)
357 -- this will enable global database monitoring
359 -- maps :: IO Map Ngrams (Map NgramsType (Map NodeId Int))
360 mapNgramsDocs' :: HashMap ExtractedNgrams (Map NgramsType (Map NodeId (Int, TermsCount)))
362 <$> documentIdWithNgrams
363 (extractNgramsT $ withLang lang documentsWithId)
366 lId <- getOrMkList masterCorpusId masterUserId
367 -- _ <- saveDocNgramsWith lId mapNgramsDocs'
368 _ <- saveDocNgramsWith lId mapNgramsDocs'
370 -- _cooc <- insertDefaultNode NodeListCooc lId masterUserId
-- | Persist extracted ngrams for a list: insert the ngrams themselves,
--   the per-list node_ngrams rows, and the context/ngrams links carrying
--   the occurrence weights.
373 saveDocNgramsWith :: (FlowCmdM env err m)
375 -> HashMap ExtractedNgrams (Map NgramsType (Map NodeId (Int, TermsCount)))
377 saveDocNgramsWith lId mapNgramsDocs' = do
378 --printDebug "[saveDocNgramsWith] mapNgramsDocs'" mapNgramsDocs'
-- drop the TermsCount from the innermost pairs, keeping only the weight
379 let mapNgramsDocsNoCount = over (traverse . traverse . traverse) fst mapNgramsDocs'
380 terms2id <- insertExtractedNgrams $ HashMap.keys mapNgramsDocsNoCount
382 let mapNgramsDocs = HashMap.mapKeys extracted2ngrams mapNgramsDocs'
385 mapCgramsId <- listInsertDb lId toNodeNgramsW'
386 $ map (first _ngramsTerms . second Map.keys)
387 $ HashMap.toList mapNgramsDocs
389 --printDebug "saveDocNgramsWith" mapCgramsId
-- one row per (term, ngrams type, context) triple, weighted by w;
-- triples whose cgrams id cannot be resolved are silently dropped
391 _return <- insertContextNodeNgrams2
392 $ catMaybes [ ContextNodeNgrams2 <$> Just nId
393 <*> (getCgramsId mapCgramsId ngrams_type (_ngramsTerms terms''))
394 <*> Just (fromIntegral w :: Double)
395 | (terms'', mapNgramsTypes) <- HashMap.toList mapNgramsDocs
396 , (ngrams_type, mapNodeIdWeight) <- Map.toList mapNgramsTypes
397 , (nId, (w, _cnt)) <- Map.toList mapNodeIdWeight
401 _ <- insertDocNgrams lId $ HashMap.mapKeys (indexNgrams terms2id) mapNgramsDocs
406 ------------------------------------------------------------------------
407 -- TODO Type NodeDocumentUnicised
-- | Insert documents under a parent node, deduplicating them via a uniq
--   hash; returns the new context ids together with the documents paired
--   with their ids.
408 insertDocs :: ( FlowCmdM env err m
415 -> m ([ContextId], [Indexed ContextId a])
416 insertDocs uId cId hs = do
417 let docs = map addUniqId hs
418 newIds <- insertDb uId Nothing docs
419 -- printDebug "newIds" newIds
421 newIds' = map reId newIds
-- join the returned ids back onto the documents via their uniq hash
422 documentsWithId = mergeData (toInserted newIds) (Map.fromList $ map viewUniqId' docs)
423 _ <- Doc.add cId newIds'
424 pure (newIds', documentsWithId)
427 ------------------------------------------------------------------------
-- | Pair a document with its uniq-id hash; panics if the hash is absent
--   (callers pass documents that went through 'addUniqId' first).
428 viewUniqId' :: UniqId a
431 viewUniqId' d = maybe err (\h -> (h,d)) (view uniqId d)
433 err = panic "[ERROR] Database.Flow.toInsert"
-- | Keep only the ids that were actually inserted, keyed by uniq hash.
436 toInserted :: [ReturnId]
439 Map.fromList . map (\r -> (reUniqId r, r) )
440 . filter (\r -> reInserted r == True)
-- | Join inserted 'ReturnId's with their documents by hash; documents
--   whose hash is not in the map are silently dropped (via catMaybes).
442 mergeData :: Map Hash ReturnId
444 -> [Indexed NodeId a]
445 mergeData rs = catMaybes . map toDocumentWithId . Map.toList
447 toDocumentWithId (sha,hpd) =
448 Indexed <$> fmap reId (lookup sha rs)
451 ------------------------------------------------------------------------
452 ------------------------------------------------------------------------
453 ------------------------------------------------------------------------
-- | Run the given ngrams extractor over each indexed document, pairing
--   every document with its extracted ngrams.
454 documentIdWithNgrams :: HasNodeError err
456 -> Cmd err (HashMap b (Map NgramsType Int, TermsCount)))
457 -> [Indexed NodeId a]
458 -> Cmd err [DocumentIdWithNgrams a b]
459 documentIdWithNgrams f = traverse toDocumentIdWithNgrams
461 toDocumentIdWithNgrams d = do
463 pure $ DocumentIdWithNgrams d e
466 -- | TODO check optimization
-- | Merge per-document ngrams into one map keyed by ngram, then ngrams
--   type, then document id, summing weights and counts on collision
--   (via 'addTuples').
467 mapNodeIdNgrams :: (Ord b, Hashable b)
468 => [DocumentIdWithNgrams a b]
471 (Map NodeId (Int, TermsCount))
473 mapNodeIdNgrams = HashMap.unionsWith (Map.unionWith (Map.unionWith addTuples)) . fmap f
475 -- | NOTE We are somehow multiplying 'TermsCount' here: If the
476 -- same ngrams term has different ngrams types, the 'TermsCount'
477 -- for it (which is the number of times the terms appears in a
478 -- document) is copied over to all its types.
479 f :: DocumentIdWithNgrams a b
480 -> HashMap b (Map NgramsType (Map NodeId (Int, TermsCount)))
481 f d = fmap (\(ngramsTypeMap, cnt) -> fmap (\i -> Map.singleton nId (i, cnt)) ngramsTypeMap) $ documentNgrams d
483 nId = _index $ documentWithId d
486 ------------------------------------------------------------------------
-- | Contacts contribute only their last name, tagged as an Authors ngram
--   with weight 1; missing names become the literal ngram "Nothing".
487 instance ExtractNgramsT HyperdataContact
-- ngrams longer than 255 characters are cleaned/truncated
489 extractNgramsT l hc = HashMap.mapKeys (cleanExtractedNgrams 255) <$> extract l hc
491 extract :: TermType Lang -> HyperdataContact
492 -> Cmd err (HashMap ExtractedNgrams (Map NgramsType Int, TermsCount))
494 let authors = map text2ngrams
495 $ maybe ["Nothing"] (\a -> [a])
496 $ view (hc_who . _Just . cw_lastName) hc'
498 pure $ HashMap.fromList $ [(SimpleNgrams a', (Map.singleton Authors 1, 1)) | a' <- authors ]
-- | Documents contribute their source, institutes and authors as simple
--   ngrams (weight 1 each), plus the terms extracted from the full text
--   as enriched ngrams carrying their occurrence counts.
501 instance ExtractNgramsT HyperdataDocument
503 extractNgramsT :: TermType Lang
505 -> Cmd err (HashMap ExtractedNgrams (Map NgramsType Int, TermsCount))
-- ngrams longer than 255 characters are cleaned/truncated
506 extractNgramsT lang hd = HashMap.mapKeys (cleanExtractedNgrams 255) <$> extractNgramsT' lang hd
508 extractNgramsT' :: TermType Lang
510 -> Cmd err (HashMap ExtractedNgrams (Map NgramsType Int, TermsCount))
511 extractNgramsT' lang' doc = do
-- absent metadata fields become the literal ngram "Nothing"
512 let source = text2ngrams
513 $ maybe "Nothing" identity
516 institutes = map text2ngrams
517 $ maybe ["Nothing"] (map toSchoolName . (T.splitOn ", "))
520 authors = map text2ngrams
521 $ maybe ["Nothing"] (T.splitOn ", ")
-- POS-tag the extracted terms with CoreNLP, keeping noun phrases (NP)
524 termsWithCounts' <- map (\(t, cnt) -> (enrichedTerms (lang' ^. tt_lang) CoreNLP NP t, cnt))
526 <$> liftBase (extractTerms lang' $ hasText doc)
528 pure $ HashMap.fromList
529 $ [(SimpleNgrams source, (Map.singleton Sources 1, 1)) ]
530 <> [(SimpleNgrams i', (Map.singleton Institutes 1, 1)) | i' <- institutes ]
531 <> [(SimpleNgrams a', (Map.singleton Authors 1, 1)) | a' <- authors ]
532 <> [(EnrichedNgrams t', (Map.singleton NgramsTerms 1, cnt')) | (t', cnt') <- termsWithCounts' ]
-- | Delegate ngrams extraction to the node's hyperdata payload.
534 instance (ExtractNgramsT a, HasText a) => ExtractNgramsT (Node a)
536 extractNgramsT l (Node { _node_hyperdata = h }) = extractNgramsT l h
-- | Delegate text access to the node's hyperdata payload.
538 instance HasText a => HasText (Node a)
540 hasText (Node { _node_hyperdata = h }) = hasText h
544 -- | TODO putelsewhere
545 -- | Upgrade function
546 -- Suppose all documents are English (this is the case actually)
-- | Re-index every document of every corpus under the master user,
--   processing in batches of 1000 to bound memory use.
--   NOTE(review): the final return expression (after line 553) is missing
--   from this extract.
547 indexAllDocumentsWithPosTag :: FlowCmdM env err m
549 indexAllDocumentsWithPosTag = do
550 rootId <- getRootId (UserName userMaster)
551 corpusIds <- findNodesId rootId [NodeCorpus]
552 docs <- List.concat <$> mapM getDocumentsWithParentId corpusIds
553 _ <- mapM extractInsert (splitEvery 1000 docs)
-- | Extract ngrams from one batch of documents (assumed multilingual
--   English, per the upgrade function above) and insert them.
--   NOTE(review): this definition appears to continue past the end of
--   this extract — the remainder is not visible here.
556 extractInsert :: FlowCmdM env err m
557 => [Node HyperdataDocument] -> m ()
558 extractInsert docs = do
-- index each document by its own node id
559 let documentsWithId = map (\doc -> Indexed (doc ^. node_id) doc) docs
560 mapNgramsDocs' <- mapNodeIdNgrams
561 <$> documentIdWithNgrams
562 (extractNgramsT $ withLang (Multi EN) documentsWithId)
564 _ <- insertExtractedNgrams $ HashMap.keys mapNgramsDocs'