2 Module : Gargantext.Database.Flow
3 Description : Database Flow
4 Copyright : (c) CNRS, 2017-Present
5 License : AGPL + CECILL v3
6 Maintainer : team@gargantext.org
7 Stability : experimental
11 -- check userId CanFillUserCorpus userCorpusId
12 -- check masterUserId CanFillMasterCorpus masterCorpusId
14 -- TODO-ACCESS: check uId CanInsertDoc pId && checkDocType nodeType
15 -- TODO-EVENTS: InsertedNodes
18 {-# OPTIONS_GHC -fno-warn-orphans #-}
20 {-# LANGUAGE ConstrainedClassMethods #-}
21 {-# LANGUAGE ConstraintKinds #-}
22 {-# LANGUAGE InstanceSigs #-}
23 {-# LANGUAGE ScopedTypeVariables #-}
24 {-# LANGUAGE TemplateHaskell #-}
26 module Gargantext.Database.Action.Flow -- (flowDatabase, ngrams2list)
39 , getOrMk_RootWithCorpus
45 , indexAllDocumentsWithPosTag
50 import Control.Lens ((^.), view, _Just, makeLenses)
51 import Data.Aeson.TH (deriveJSON)
52 import Data.Conduit.Internal (zipSources)
54 import Data.HashMap.Strict (HashMap)
55 import Data.Hashable (Hashable)
56 import Data.List (concat)
57 import Data.Map (Map, lookup)
58 import Data.Maybe (catMaybes)
61 import qualified Data.Text as T
62 import Data.Traversable (traverse)
63 import Data.Tuple.Extra (first, second)
64 import GHC.Generics (Generic)
65 import Servant.Client (ClientError)
66 import System.FilePath (FilePath)
67 import qualified Data.HashMap.Strict as HashMap
68 import qualified Gargantext.Data.HashMap.Strict.Utils as HashMap
69 import qualified Data.Map as Map
71 import Gargantext.API.Admin.Orchestrator.Types (JobLog(..))
72 import Gargantext.Core (Lang(..), PosTagAlgo(..))
73 import Gargantext.Core.Ext.IMT (toSchoolName)
74 import Gargantext.Core.Ext.IMTUser (readFile_Annuaire)
75 import Gargantext.Core.Flow.Types
76 import Gargantext.Core.Text
77 import Gargantext.Core.Text.Corpus.Parsers (parseFile, FileFormat, FileType)
78 import Gargantext.Core.Text.List (buildNgramsLists)
79 import Gargantext.Core.Text.List.Group.WithStem ({-StopSize(..),-} GroupParams(..))
80 import Gargantext.Core.Text.List.Social (FlowSocialListWith)
81 import Gargantext.Core.Text.Terms
82 import Gargantext.Core.Text.Terms.Mono.Stem.En (stemIt)
83 import Gargantext.Core.Types (POS(NP))
84 import Gargantext.Core.Types.Individu (User(..))
85 import Gargantext.Core.Types.Main
86 import Gargantext.Core.Utils.Prefix (unPrefix, unPrefixSwagger)
87 import Gargantext.Database.Action.Flow.List
88 import Gargantext.Database.Action.Flow.Types
89 import Gargantext.Database.Action.Flow.Utils (insertDocNgrams, DocumentIdWithNgrams(..))
90 import Gargantext.Database.Action.Search (searchDocInDatabase)
91 import Gargantext.Database.Admin.Config (userMaster, corpusMasterName)
92 import Gargantext.Database.Action.Metrics (updateNgramsOccurrences)
93 import Gargantext.Database.Admin.Types.Hyperdata
94 import Gargantext.Database.Admin.Types.Node -- (HyperdataDocument(..), NodeType(..), NodeId, UserId, ListId, CorpusId, RootId, MasterCorpusId, MasterUserId)
95 import Gargantext.Database.Prelude
96 import Gargantext.Database.Query.Table.ContextNodeNgrams2
97 import Gargantext.Database.Query.Table.Ngrams
98 import Gargantext.Database.Query.Table.Node
99 import Gargantext.Database.Query.Table.Node.Document.Insert -- (insertDocuments, ReturnId(..), addUniqIdsDoc, addUniqIdsContact, ToDbData(..))
100 import Gargantext.Database.Query.Table.Node.Error (HasNodeError(..))
101 import Gargantext.Database.Query.Table.NodeNgrams (listInsertDb , getCgramsId)
102 import Gargantext.Database.Query.Tree.Root (getOrMkRoot, getOrMk_RootWithCorpus)
103 import Gargantext.Database.Schema.Node (NodePoly(..), node_id)
104 import Gargantext.Database.Types
105 import Gargantext.Prelude
106 import Gargantext.Prelude.Crypto.Hash (Hash)
107 import qualified Gargantext.Core.Text.Corpus.API as API
108 import qualified Gargantext.Database.Query.Table.Node.Document.Add as Doc (add)
109 import qualified Prelude
111 ------------------------------------------------------------------------
112 -- Imports for upgrade function
113 import Gargantext.Database.Query.Tree.Root (getRootId)
114 import Gargantext.Database.Query.Tree (findNodesId)
115 import qualified Data.List as List
116 ------------------------------------------------------------------------
117 -- TODO use internal with API name (could be old data)
118 data DataOrigin = InternalOrigin { _do_api :: API.ExternalAPIs }
119 | ExternalOrigin { _do_api :: API.ExternalAPIs }
121 deriving (Generic, Eq)
123 makeLenses ''DataOrigin
124 deriveJSON (unPrefix "_do_") ''DataOrigin
125 instance ToSchema DataOrigin where
126 declareNamedSchema = genericDeclareNamedSchema (unPrefixSwagger "_do_")
128 allDataOrigins :: [DataOrigin]
129 allDataOrigins = map InternalOrigin API.externalAPIs
130 <> map ExternalOrigin API.externalAPIs
133 data DataText = DataOld ![NodeId]
134 | DataNew !(Maybe Integer, ConduitT () HyperdataDocument IO ())
135 -- | DataNew ![[HyperdataDocument]]
137 -- TODO use the split parameter in config file
138 getDataText :: FlowCmdM env err m
143 -> m (Either ClientError DataText)
144 getDataText (ExternalOrigin api) la q li = liftBase $ do
145 eRes <- API.get api (_tt_lang la) q li
146 pure $ DataNew <$> eRes
148 getDataText (InternalOrigin _) _la q _li = do
149 (_masterUserId, _masterRootId, cId) <- getOrMk_RootWithCorpus
150 (UserName userMaster)
152 (Nothing :: Maybe HyperdataCorpus)
153 ids <- map fst <$> searchDocInDatabase cId (stemIt q)
154 pure $ Right $ DataOld ids
156 -------------------------------------------------------------------------------
157 flowDataText :: forall env err m.
164 -> Maybe FlowSocialListWith
167 flowDataText u (DataOld ids) tt cid mfslw _ = flowCorpusUser (_tt_lang tt) u (Right [cid]) corpusType ids mfslw
169 corpusType = (Nothing :: Maybe HyperdataCorpus)
170 flowDataText u (DataNew (mLen, txtC)) tt cid mfslw logStatus =
171 flowCorpus u (Right [cid]) tt mfslw (mLen, (transPipe liftBase txtC)) logStatus
173 ------------------------------------------------------------------------
175 flowAnnuaire :: (FlowCmdM env err m)
177 -> Either CorpusName [CorpusId]
182 flowAnnuaire u n l filePath logStatus = do
183 -- TODO Conduit for file
184 docs <- liftBase $ ((readFile_Annuaire filePath) :: IO [HyperdataContact])
185 flow (Nothing :: Maybe HyperdataAnnuaire) u n l Nothing (Just $ fromIntegral $ length docs, yieldMany docs) logStatus
187 ------------------------------------------------------------------------
188 flowCorpusFile :: (FlowCmdM env err m)
190 -> Either CorpusName [CorpusId]
191 -> Limit -- Limit the number of docs (for dev purpose)
196 -> Maybe FlowSocialListWith
199 flowCorpusFile u n _l la ft ff fp mfslw logStatus = do
200 eParsed <- liftBase $ parseFile ft ff fp
203 flowCorpus u n la mfslw (Just $ fromIntegral $ length parsed, yieldMany parsed .| mapC toHyperdataDocument) logStatus
204 --let docs = splitEvery 500 $ take l parsed
205 --flowCorpus u n la mfslw (yieldMany $ map (map toHyperdataDocument) docs) logStatus
206 Left e -> panic $ "Error: " <> T.pack e
208 ------------------------------------------------------------------------
209 -- | TODO improve the needed type to create/update a corpus
210 -- (For now, Either is enough)
211 flowCorpus :: (FlowCmdM env err m, FlowCorpus a)
213 -> Either CorpusName [CorpusId]
215 -> Maybe FlowSocialListWith
216 -> (Maybe Integer, ConduitT () a m ())
219 flowCorpus = flow (Nothing :: Maybe HyperdataCorpus)
222 flow :: forall env err m a c.
229 -> Either CorpusName [CorpusId]
231 -> Maybe FlowSocialListWith
232 -> (Maybe Integer, ConduitT () a m ())
235 flow c u cn la mfslw (mLength, docsC) logStatus = do
236 -- TODO if public insertMasterDocs else insertUserDocs
238 zipSources (yieldMany [1..]) docsC
241 -- ids <- traverse (\(idx, doc) -> do
242 -- id <- insertMasterDocs c la doc
243 -- logStatus JobLog { _scst_succeeded = Just $ 1 + idx
244 -- , _scst_failed = Just 0
245 -- , _scst_remaining = Just $ length docs - idx
246 -- , _scst_events = Just []
249 -- ) (zip [1..] docs)
250 flowCorpusUser (la ^. tt_lang) u cn c ids mfslw
253 insertDoc :: (Integer, a) -> m NodeId
254 insertDoc (idx, doc) = do
255 id <- insertMasterDocs c la [doc]
259 logStatus JobLog { _scst_succeeded = Just $ fromIntegral $ 1 + idx
260 , _scst_failed = Just 0
261 , _scst_remaining = Just $ fromIntegral $ len - idx
262 , _scst_events = Just []
264 pure $ Prelude.head id
268 ------------------------------------------------------------------------
269 flowCorpusUser :: ( FlowCmdM env err m
274 -> Either CorpusName [CorpusId]
277 -> Maybe FlowSocialListWith
279 flowCorpusUser l user corpusName ctype ids mfslw = do
281 (userId, _rootId, userCorpusId) <- getOrMk_RootWithCorpus user corpusName ctype
282 -- NodeTexts is first
283 _tId <- insertDefaultNodeIfNotExists NodeTexts userCorpusId userId
284 -- printDebug "NodeTexts: " tId
286 -- NodeList is second
287 listId <- getOrMkList userCorpusId userId
288 -- _cooc <- insertDefaultNode NodeListCooc listId userId
289 -- TODO: check if present already, ignore
290 _ <- Doc.add userCorpusId ids
292 -- printDebug "Node Text Ids:" tId
295 (masterUserId, _masterRootId, masterCorpusId)
296 <- getOrMk_RootWithCorpus (UserName userMaster) (Left "") ctype
298 --let gp = (GroupParams l 2 3 (StopSize 3))
299 let gp = GroupWithPosTag l CoreNLP HashMap.empty
300 ngs <- buildNgramsLists user userCorpusId masterCorpusId mfslw gp
302 -- printDebug "flowCorpusUser:ngs" ngs
304 _userListId <- flowList_DbRepo listId ngs
305 _mastListId <- getOrMkList masterCorpusId masterUserId
306 -- _ <- insertOccsUpdates userCorpusId mastListId
307 -- printDebug "userListId" userListId
309 _ <- insertDefaultNodeIfNotExists NodeDashboard userCorpusId userId
310 _ <- insertDefaultNodeIfNotExists NodeGraph userCorpusId userId
311 --_ <- mkPhylo userCorpusId userId
313 -- _ <- mkAnnuaire rootUserId userId
314 _ <- updateNgramsOccurrences userCorpusId (Just listId)
319 insertMasterDocs :: ( FlowCmdM env err m
327 insertMasterDocs c lang hs = do
328 (masterUserId, _, masterCorpusId) <- getOrMk_RootWithCorpus (UserName userMaster) (Left corpusMasterName) c
329 (ids', documentsWithId) <- insertDocs masterUserId masterCorpusId (map (toNode masterUserId masterCorpusId) hs )
330 _ <- Doc.add masterCorpusId ids'
332 -- create a corpus with database name (CSV or PubMed)
333 -- add documents to the corpus (create node_node link)
334 -- this will enable global database monitoring
336 -- maps :: IO Map Ngrams (Map NgramsType (Map NodeId Int))
337 mapNgramsDocs' :: HashMap ExtractedNgrams (Map NgramsType (Map NodeId Int))
339 <$> documentIdWithNgrams
340 (extractNgramsT $ withLang lang documentsWithId)
343 lId <- getOrMkList masterCorpusId masterUserId
344 _ <- saveDocNgramsWith lId mapNgramsDocs'
346 -- _cooc <- insertDefaultNode NodeListCooc lId masterUserId
349 saveDocNgramsWith :: ( FlowCmdM env err m)
351 -> HashMap ExtractedNgrams (Map NgramsType (Map NodeId Int))
353 saveDocNgramsWith lId mapNgramsDocs' = do
354 terms2id <- insertExtractedNgrams $ HashMap.keys mapNgramsDocs'
355 --printDebug "terms2id" terms2id
357 let mapNgramsDocs = HashMap.mapKeys extracted2ngrams mapNgramsDocs'
360 mapCgramsId <- listInsertDb lId toNodeNgramsW'
361 $ map (first _ngramsTerms . second Map.keys)
362 $ HashMap.toList mapNgramsDocs
364 --printDebug "saveDocNgramsWith" mapCgramsId
366 _return <- insertContextNodeNgrams2
367 $ catMaybes [ ContextNodeNgrams2 <$> Just nId
368 <*> (getCgramsId mapCgramsId ngrams_type (_ngramsTerms terms''))
369 <*> Just (fromIntegral w :: Double)
370 | (terms'', mapNgramsTypes) <- HashMap.toList mapNgramsDocs
371 , (ngrams_type, mapNodeIdWeight) <- Map.toList mapNgramsTypes
372 , (nId, w) <- Map.toList mapNodeIdWeight
376 _ <- insertDocNgrams lId $ HashMap.mapKeys (indexNgrams terms2id) mapNgramsDocs
381 ------------------------------------------------------------------------
382 -- TODO Type NodeDocumentUnicised
383 insertDocs :: ( FlowCmdM env err m
390 -> m ([ContextId], [Indexed ContextId a])
391 insertDocs uId cId hs = do
392 let docs = map addUniqId hs
393 newIds <- insertDb uId cId docs
394 -- printDebug "newIds" newIds
396 newIds' = map reId newIds
397 documentsWithId = mergeData (toInserted newIds) (Map.fromList $ map viewUniqId' docs)
398 _ <- Doc.add cId newIds'
399 pure (newIds', documentsWithId)
402 ------------------------------------------------------------------------
403 viewUniqId' :: UniqId a
406 viewUniqId' d = maybe err (\h -> (h,d)) (view uniqId d)
408 err = panic "[ERROR] Database.Flow.toInsert"
411 toInserted :: [ReturnId]
414 Map.fromList . map (\r -> (reUniqId r, r) )
415 . filter (\r -> reInserted r == True)
417 mergeData :: Map Hash ReturnId
419 -> [Indexed NodeId a]
420 mergeData rs = catMaybes . map toDocumentWithId . Map.toList
422 toDocumentWithId (sha,hpd) =
423 Indexed <$> fmap reId (lookup sha rs)
426 ------------------------------------------------------------------------
427 ------------------------------------------------------------------------
428 ------------------------------------------------------------------------
429 documentIdWithNgrams :: HasNodeError err
431 -> Cmd err (HashMap b (Map NgramsType Int)))
432 -> [Indexed NodeId a]
433 -> Cmd err [DocumentIdWithNgrams a b]
434 documentIdWithNgrams f = traverse toDocumentIdWithNgrams
436 toDocumentIdWithNgrams d = do
438 pure $ DocumentIdWithNgrams d e
441 -- | TODO check optimization
442 mapNodeIdNgrams :: (Ord b, Hashable b)
443 => [DocumentIdWithNgrams a b]
448 mapNodeIdNgrams = HashMap.unionsWith (Map.unionWith (Map.unionWith (+))) . fmap f
450 f :: DocumentIdWithNgrams a b
451 -> HashMap b (Map NgramsType (Map NodeId Int))
452 f d = fmap (fmap (Map.singleton nId)) $ documentNgrams d
454 nId = _index $ documentWithId d
457 ------------------------------------------------------------------------
458 instance ExtractNgramsT HyperdataContact
460 extractNgramsT l hc = HashMap.mapKeys (cleanExtractedNgrams 255) <$> extract l hc
462 extract :: TermType Lang -> HyperdataContact
463 -> Cmd err (HashMap ExtractedNgrams (Map NgramsType Int))
465 let authors = map text2ngrams
466 $ maybe ["Nothing"] (\a -> [a])
467 $ view (hc_who . _Just . cw_lastName) hc'
469 pure $ HashMap.fromList $ [(SimpleNgrams a', Map.singleton Authors 1) | a' <- authors ]
472 instance ExtractNgramsT HyperdataDocument
474 extractNgramsT :: TermType Lang
476 -> Cmd err (HashMap ExtractedNgrams (Map NgramsType Int))
477 extractNgramsT lang hd = HashMap.mapKeys (cleanExtractedNgrams 255) <$> extractNgramsT' lang hd
479 extractNgramsT' :: TermType Lang
481 -> Cmd err (HashMap ExtractedNgrams (Map NgramsType Int))
482 extractNgramsT' lang' doc = do
483 let source = text2ngrams
484 $ maybe "Nothing" identity
487 institutes = map text2ngrams
488 $ maybe ["Nothing"] (map toSchoolName . (T.splitOn ", "))
491 authors = map text2ngrams
492 $ maybe ["Nothing"] (T.splitOn ", ")
495 terms' <- map (enrichedTerms (lang' ^. tt_lang) CoreNLP NP)
497 <$> liftBase (extractTerms lang' $ hasText doc)
499 pure $ HashMap.fromList
500 $ [(SimpleNgrams source, Map.singleton Sources 1) ]
501 <> [(SimpleNgrams i', Map.singleton Institutes 1) | i' <- institutes ]
502 <> [(SimpleNgrams a', Map.singleton Authors 1) | a' <- authors ]
503 <> [(EnrichedNgrams t', Map.singleton NgramsTerms 1) | t' <- terms' ]
505 instance (ExtractNgramsT a, HasText a) => ExtractNgramsT (Node a)
507 extractNgramsT l (Node _ _ _ _ _ _ _ h) = extractNgramsT l h
509 instance HasText a => HasText (Node a)
511 hasText (Node _ _ _ _ _ _ _ h) = hasText h
515 -- | TODO putelsewhere
516 -- | Upgrade function
517 -- Suppose all documents are English (this is the case actually)
518 indexAllDocumentsWithPosTag :: FlowCmdM env err m
520 indexAllDocumentsWithPosTag = do
521 rootId <- getRootId (UserName userMaster)
522 corpusIds <- findNodesId rootId [NodeCorpus]
523 docs <- List.concat <$> mapM getDocumentsWithParentId corpusIds
524 _ <- mapM extractInsert (splitEvery 1000 docs)
527 extractInsert :: FlowCmdM env err m
528 => [Node HyperdataDocument] -> m ()
529 extractInsert docs = do
530 let documentsWithId = map (\doc -> Indexed (doc ^. node_id) doc) docs
531 mapNgramsDocs' <- mapNodeIdNgrams
532 <$> documentIdWithNgrams
533 (extractNgramsT $ withLang (Multi EN) documentsWithId)
535 _ <- insertExtractedNgrams $ HashMap.keys mapNgramsDocs'