2 Module : Gargantext.Database.Action.Flow
3 Description : Database Flow
4 Copyright : (c) CNRS, 2017-Present
5 License : AGPL + CECILL v3
6 Maintainer : team@gargantext.org
7 Stability : experimental
11 -- check userId CanFillUserCorpus userCorpusId
12 -- check masterUserId CanFillMasterCorpus masterCorpusId
14 -- TODO-ACCESS: check uId CanInsertDoc pId && checkDocType nodeType
15 -- TODO-EVENTS: InsertedNodes
18 {-# OPTIONS_GHC -fno-warn-orphans #-}
20 {-# LANGUAGE ConstrainedClassMethods #-}
21 {-# LANGUAGE ConstraintKinds #-}
22 {-# LANGUAGE InstanceSigs #-}
23 {-# LANGUAGE ScopedTypeVariables #-}
24 {-# LANGUAGE TemplateHaskell #-}
26 module Gargantext.Database.Action.Flow -- (flowDatabase, ngrams2list)
41 , getOrMk_RootWithCorpus
47 , indexAllDocumentsWithPosTag
52 import Control.Lens ((^.), view, _Just, makeLenses, over, traverse)
53 import Control.Monad.Reader (MonadReader)
54 import Data.Aeson.TH (deriveJSON)
55 import Data.Conduit.Internal (zipSources)
56 import qualified Data.Conduit.List as CList
58 import Data.HashMap.Strict (HashMap)
59 import Data.Hashable (Hashable)
60 import Data.List (concat)
61 import Data.Map.Strict (Map, lookup)
62 import Data.Maybe (catMaybes)
65 import qualified Data.Text as T
66 import Data.Tuple.Extra (first, second)
67 import GHC.Generics (Generic)
68 import System.FilePath (FilePath)
69 import qualified Data.HashMap.Strict as HashMap
70 import qualified Gargantext.Data.HashMap.Strict.Utils as HashMap
71 import qualified Data.Map.Strict as Map
72 import qualified Data.Conduit.List as CL
73 import qualified Data.Conduit as C
75 import Gargantext.Core (Lang(..), PosTagAlgo(..))
76 -- import Gargantext.Core.Ext.IMT (toSchoolName)
77 import Gargantext.Core.Ext.IMTUser (readFile_Annuaire)
78 import Gargantext.Core.Flow.Types
79 import Gargantext.Core.NLP (nlpServerGet)
80 import Gargantext.Core.Text
81 import Gargantext.Core.Text.Corpus.Parsers (parseFile, FileFormat, FileType, splitOn)
82 import Gargantext.Core.Text.List (buildNgramsLists)
83 import Gargantext.Core.Text.List.Group.WithStem ({-StopSize(..),-} GroupParams(..))
84 import Gargantext.Core.Text.List.Social (FlowSocialListWith(..))
85 import Gargantext.Core.Text.Terms
86 import Gargantext.Core.Text.Terms.Mono.Stem.En (stemIt)
87 import Gargantext.Core.Types (POS(NP), TermsCount)
88 import Gargantext.Core.Types.Individu (User(..))
89 import Gargantext.Core.Types.Main
90 import Gargantext.Core.Types.Query (Limit)
91 import Gargantext.Core.Utils (addTuples)
92 import Gargantext.Core.Utils.Prefix (unPrefix, unPrefixSwagger)
93 import Gargantext.Database.Action.Flow.List
94 import Gargantext.Database.Action.Flow.Types
95 import Gargantext.Database.Action.Flow.Utils (insertDocNgrams, DocumentIdWithNgrams(..))
96 import Gargantext.Database.Action.Search (searchDocInDatabase)
97 import Gargantext.Database.Admin.Config (userMaster, corpusMasterName)
98 import Gargantext.Database.Action.Metrics (updateNgramsOccurrences)
99 import Gargantext.Database.Admin.Types.Hyperdata
100 import Gargantext.Database.Admin.Types.Node -- (HyperdataDocument(..), NodeType(..), NodeId, UserId, ListId, CorpusId, RootId, MasterCorpusId, MasterUserId)
101 import Gargantext.Database.Prelude
102 import Gargantext.Database.Query.Table.ContextNodeNgrams2
103 import Gargantext.Database.Query.Table.Ngrams
104 import Gargantext.Database.Query.Table.Node
105 import Gargantext.Database.Query.Table.Node.Document.Insert -- (insertDocuments, ReturnId(..), addUniqIdsDoc, addUniqIdsContact, ToDbData(..))
106 import Gargantext.Database.Query.Table.Node.Error (HasNodeError(..))
107 import Gargantext.Database.Query.Table.NodeNgrams (listInsertDb , getCgramsId)
108 import Gargantext.Database.Query.Tree.Root (getOrMkRoot, getOrMk_RootWithCorpus)
109 import Gargantext.Database.Schema.Node (NodePoly(..), node_id)
110 import Gargantext.Database.Types
111 import Gargantext.Prelude
112 import Gargantext.Prelude.Crypto.Hash (Hash)
113 import Gargantext.Utils.Jobs (JobHandle, MonadJobStatus(..))
114 import qualified Gargantext.Core.Text.Corpus.API as API
115 import qualified Gargantext.Database.Query.Table.Node.Document.Add as Doc (add)
116 --import qualified Prelude
118 ------------------------------------------------------------------------
119 -- Imports for upgrade function
120 import Gargantext.Database.Query.Tree.Root (getRootId)
121 import Gargantext.Database.Query.Tree (findNodesId)
122 import qualified Data.List as List
123 ------------------------------------------------------------------------
124 -- TODO use internal with API name (could be old data)
-- | Origin of the data handled by a flow. Both visible constructors wrap an
-- 'API.ExternalAPIs' value, exposed through the shared '_do_api' field.
125 data DataOrigin = InternalOrigin { _do_api :: API.ExternalAPIs }
126 | ExternalOrigin { _do_api :: API.ExternalAPIs }
128 deriving (Generic, Eq)
-- Template Haskell: generate lenses for the fields of 'DataOrigin'.
130 makeLenses ''DataOrigin
-- JSON instances are derived with the options returned by @unPrefix "_do_"@
-- (presumably this drops the "_do_" field prefix; confirm in
-- Gargantext.Core.Utils.Prefix).
131 deriveJSON (unPrefix "_do_") ''DataOrigin
-- Swagger schema, derived generically with the options returned by
-- @unPrefixSwagger "_do_"@.
132 instance ToSchema DataOrigin where
133 declareNamedSchema = genericDeclareNamedSchema (unPrefixSwagger "_do_")
-- | Every data origin derivable from 'API.externalAPIs': each API is listed
-- once wrapped in 'InternalOrigin' and once wrapped in 'ExternalOrigin',
-- with all the internal origins first.
135 allDataOrigins :: ( MonadReader env m
136 , HasConfig env) => m [DataOrigin]
138 ext <- API.externalAPIs
140 pure $ map InternalOrigin ext
141 <> map ExternalOrigin ext
-- | Documents handed to a flow.
--
-- * 'DataOld': a list of 'NodeId's.
-- * 'DataNew': a conduit producing 'HyperdataDocument's in 'IO', paired with
--   an optional 'Integer' (bound as @mLen@ by the consumer in this file;
--   presumably the expected number of documents, to be confirmed).
--
-- Both payloads are strict fields.
144 data DataText = DataOld ![NodeId]
145 | DataNew !(Maybe Integer, ConduitT () HyperdataDocument IO ())
146 --- | DataNew ![[HyperdataDocument]]
148 -- Show instance is not possible because of IO
-- | Debug printer for 'DataText'.
--
-- For 'DataOld' the node ids are shown as-is. For 'DataNew' the conduit is
-- run to completion, every produced document is collected into a list, and
-- that list is shown together with the optional length.
printDataText :: DataText -> IO ()
printDataText dataText =
  case dataText of
    DataOld nodeIds ->
      putStrLn (show nodeIds)
    DataNew (mLen, source) -> do
      -- Draining the source is the only way to display its contents.
      collected <- C.runConduit (source .| CL.consume)
      putStrLn (show (mLen, collected))
155 -- TODO use the split parameter in config file
-- | Fetch the documents matching a query from the given 'DataOrigin'.
--
-- * 'ExternalOrigin': calls 'API.get' (lifted with 'liftBase') with the
--   language taken from the term type ('_tt_lang'), the query and the limit.
--   A successful result is wrapped in 'DataNew'; an error is passed through
--   as 'Left'.
-- * 'InternalOrigin': the term type and the limit are ignored. The corpus
--   returned by 'getOrMk_RootWithCorpus' for 'userMaster' is searched with
--   the stemmed raw query ('stemIt'), and the first components of the search
--   results are returned as 'DataOld'. This branch always returns 'Right'.
156 getDataText :: FlowCmdM env err m
161 -> m (Either API.GetCorpusError DataText)
162 getDataText (ExternalOrigin api) la q li = do
163 -- cfg <- view $ hasConfig
164 -- DEPRECATED: Use apiKey per user instead (not the global one)
165 eRes <- liftBase $ API.get api (_tt_lang la) q li
166 pure $ DataNew <$> eRes
168 getDataText (InternalOrigin _) _la q _li = do
169 (_masterUserId, _masterRootId, cId) <- getOrMk_RootWithCorpus
170 (UserName userMaster)
172 (Nothing :: Maybe HyperdataCorpus)
173 ids <- map fst <$> searchDocInDatabase cId (stemIt $ API.getRawQuery q)
174 pure $ Right $ DataOld ids
-- | Debugging helper: runs 'getDataText' with the same four arguments and
-- prints the outcome, either the shown error ('Left') or the data via
-- 'printDataText' ('Right').
176 getDataText_Debug :: FlowCmdM env err m
182 getDataText_Debug a l q li = do
183 result <- getDataText a l q li
185 Left err -> liftBase $ putStrLn $ show err
186 Right res -> liftBase $ printDataText res
189 -------------------------------------------------------------------------------
-- | Run the flow for a 'DataText' into the corpus @cid@ (passed on as
-- @Right [cid]@).
--
-- * 'DataOld': creates the user nodes with 'createNodes', attaches the given
--   ids to the user corpus with 'Doc.add', then runs 'flowCorpusUser'. The
--   last argument (the job handle) is not used in this branch.
-- * 'DataNew': lifts the 'IO' conduit into the current monad with
--   @transPipe liftBase@ and delegates to 'flowCorpus'.
190 flowDataText :: forall env err m.
198 -> Maybe FlowSocialListWith
201 flowDataText u (DataOld ids) tt cid mfslw _ = do
202 (_userId, userCorpusId, listId) <- createNodes u (Right [cid]) corpusType
203 _ <- Doc.add userCorpusId ids
204 flowCorpusUser (_tt_lang tt) u userCorpusId listId corpusType mfslw
-- Type witness selecting 'HyperdataCorpus' as the corpus hyperdata.
206 corpusType = (Nothing :: Maybe HyperdataCorpus)
207 flowDataText u (DataNew (mLen, txtC)) tt cid mfslw jobHandle =
208 flowCorpus u (Right [cid]) tt mfslw (mLen, (transPipe liftBase txtC)) jobHandle
210 ------------------------------------------------------------------------
-- | Flow for an annuaire file: reads every 'HyperdataContact' from
-- @filePath@ with 'readFile_Annuaire' (the whole list is read before the
-- flow starts, see the TODO), then calls 'flow' with a 'HyperdataAnnuaire'
-- type witness, no 'FlowSocialListWith', the number of contacts and a
-- conduit yielding them.
212 flowAnnuaire :: (FlowCmdM env err m, MonadJobStatus m)
214 -> Either CorpusName [CorpusId]
219 flowAnnuaire u n l filePath jobHandle = do
220 -- TODO Conduit for file
221 docs <- liftBase $ ((readFile_Annuaire filePath) :: IO [HyperdataContact])
222 flow (Nothing :: Maybe HyperdataAnnuaire) u n l Nothing (Just $ fromIntegral $ length docs, yieldMany docs) jobHandle
224 ------------------------------------------------------------------------
-- | Parse a file with 'parseFile' and feed the parsed documents, converted
-- with 'toHyperdataDocument', to 'flowCorpus' together with their count.
--
-- The 'Limit' argument is bound as @_l@ and is not used by the visible code.
-- On a parse error ('Left') the function calls 'panic' with the message
-- prefixed by "Error: ".
225 flowCorpusFile :: (FlowCmdM env err m, MonadJobStatus m)
227 -> Either CorpusName [CorpusId]
228 -> Limit -- Limit the number of docs (for dev purpose)
233 -> Maybe FlowSocialListWith
236 flowCorpusFile u n _l la ft ff fp mfslw jobHandle = do
237 eParsed <- liftBase $ parseFile ft ff fp
240 flowCorpus u n la mfslw (Just $ fromIntegral $ length parsed, yieldMany parsed .| mapC toHyperdataDocument) jobHandle
241 --let docs = splitEvery 500 $ take l parsed
242 --flowCorpus u n la mfslw (yieldMany $ map (map toHyperdataDocument) docs) logStatus
243 Left e -> panic $ "Error: " <> T.pack e
245 ------------------------------------------------------------------------
246 -- | TODO improve the needed type to create/update a corpus
247 -- (For now, Either is enough)
--
-- 'flowCorpus' is 'flow' with its first argument fixed to a
-- @Maybe HyperdataCorpus@ type witness ('Nothing'); every other argument is
-- passed through unchanged.
248 flowCorpus :: (FlowCmdM env err m, FlowCorpus a, MonadJobStatus m)
250 -> Either CorpusName [CorpusId]
252 -> Maybe FlowSocialListWith
253 -> (Maybe Integer, ConduitT () a m ())
256 flowCorpus = flow (Nothing :: Maybe HyperdataCorpus)
-- | Generic flow over a stream of documents.
--
-- Visible steps:
--
-- 1. 'createNodes' creates (or fetches) the user corpus and its list.
-- 2. The document conduit is zipped with indices starting at 1, grouped into
--    chunks of 100, and each resulting batch of ids is attached to the user
--    corpus with 'Doc.add'. The stage that turns a chunk of documents into
--    ids is not visible in this excerpt (presumably the local @insertDocs'@;
--    to be confirmed).
-- 3. 'flowCorpusUser' is run for the user corpus.
--
-- The local @insertDocs'@ inserts a chunk through 'insertMasterDocs' and then
-- reports progress with 'markProgress', using @1 + maxIdx@ where @maxIdx@ is
-- the largest index in the chunk. The empty chunk is handled separately, so
-- 'maximum' is only applied to a non-empty list.
259 flow :: forall env err m a c.
267 -> Either CorpusName [CorpusId]
269 -> Maybe FlowSocialListWith
270 -> (Maybe Integer, ConduitT () a m ())
273 flow c u cn la mfslw (mLength, docsC) jobHandle = do
274 (_userId, userCorpusId, listId) <- createNodes u cn c
275 -- TODO if public insertMasterDocs else insertUserDocs
276 _ <- runConduit $ zipSources (yieldMany [1..]) docsC
277 .| CList.chunksOf 100
279 .| mapM_C (\ids' -> do
280 _ <- Doc.add userCorpusId ids'
284 _ <- flowCorpusUser (la ^. tt_lang) u userCorpusId listId c mfslw
286 -- ids <- traverse (\(idx, doc) -> do
287 -- id <- insertMasterDocs c la doc
288 -- logStatus JobLog { _scst_succeeded = Just $ 1 + idx
289 -- , _scst_failed = Just 0
290 -- , _scst_remaining = Just $ length docs - idx
291 -- , _scst_events = Just []
294 -- ) (zip [1..] docs)
295 --printDebug "[flow] calling flowCorpusUser" (0 :: Int)
297 --flowCorpusUser (la ^. tt_lang) u cn c ids mfslw
300 insertDocs' :: [(Integer, a)] -> m [NodeId]
301 insertDocs' [] = pure []
302 insertDocs' docs = do
303 -- printDebug "[flow] calling insertDoc, ([idx], mLength) = " (fst <$> docs, mLength)
304 ids <- insertMasterDocs c la (snd <$> docs)
305 let maxIdx = maximum (fst <$> docs)
310 let succeeded = fromIntegral (1 + maxIdx)
311 -- let remaining = fromIntegral (len - maxIdx)
312 -- Reconstruct the correct update state by using 'markStarted' and the other primitives.
313 -- We do this slightly awkward arithmetic such that when we call 'markProgress' we reduce
314 -- the number of 'remaining' of exactly '1 + maxIdx', and we will end up with a 'JobLog'
315 -- looking like this:
317 -- { _scst_succeeded = Just $ fromIntegral $ 1 + maxIdx
318 -- , _scst_failed = Just 0
319 -- , _scst_remaining = Just $ fromIntegral $ len - maxIdx
320 -- , _scst_events = Just []
322 -- markStarted (remaining + succeeded) jobHandle
323 markProgress succeeded jobHandle
329 ------------------------------------------------------------------------
-- | Prepare the node tree of a user corpus.
--
-- Gets or creates the user's root and corpus with 'getOrMk_RootWithCorpus',
-- then, in this order: calls 'insertDefaultNodeIfNotExists' for 'NodeTexts',
-- gets or makes the list with 'getOrMkList', and calls
-- 'insertDefaultNodeIfNotExists' for 'NodeGraph' and 'NodeDashboard'.
--
-- Returns @(userId, userCorpusId, listId)@.
330 createNodes :: ( FlowCmdM env err m
334 -> Either CorpusName [CorpusId]
336 -> m (UserId, CorpusId, ListId)
337 createNodes user corpusName ctype = do
339 (userId, _rootId, userCorpusId) <- getOrMk_RootWithCorpus user corpusName ctype
340 -- NodeTexts is first
341 _tId <- insertDefaultNodeIfNotExists NodeTexts userCorpusId userId
342 -- printDebug "NodeTexts: " tId
344 -- NodeList is second
345 listId <- getOrMkList userCorpusId userId
348 _ <- insertDefaultNodeIfNotExists NodeGraph userCorpusId userId
349 _ <- insertDefaultNodeIfNotExists NodeDashboard userCorpusId userId
351 pure (userId, userCorpusId, listId)
-- | User-side part of the flow, run once the documents are attached.
--
-- Visible steps: looks up the NLP server for the language ('nlpServerGet'),
-- gets or creates the master corpus of 'userMaster' (corpus name @Left ""@),
-- builds the ngrams lists with 'buildNgramsLists' using
-- @GroupWithPosTag l server HashMap.empty@, stores them on the user list with
-- 'flowList_DbRepo', gets or makes the master list, and finally calls
-- 'updateNgramsOccurrences' for the user corpus and list.
--
-- A @Just (NoList _)@ value of @mfslw@ is matched explicitly; the lines that
-- delimit the branches are not visible in this excerpt, so which of the steps
-- above are skipped in that case must be confirmed against the full source.
354 flowCorpusUser :: ( FlowCmdM env err m
362 -> Maybe FlowSocialListWith
364 flowCorpusUser l user userCorpusId listId ctype mfslw = do
365 server <- view (nlpServerGet l)
367 (masterUserId, _masterRootId, masterCorpusId)
368 <- getOrMk_RootWithCorpus (UserName userMaster) (Left "") ctype
370 --let gp = (GroupParams l 2 3 (StopSize 3))
371 -- Here the PosTagAlgo should be chosen according to the Lang
373 (Just (NoList _)) -> do
374 -- printDebug "Do not build list" mfslw
377 ngs <- buildNgramsLists user userCorpusId masterCorpusId mfslw
378 $ GroupWithPosTag l server HashMap.empty
380 -- printDebug "flowCorpusUser:ngs" ngs
382 _userListId <- flowList_DbRepo listId ngs
383 _mastListId <- getOrMkList masterCorpusId masterUserId
385 -- _ <- insertOccsUpdates userCorpusId mastListId
386 -- printDebug "userListId" userListId
387 --_ <- mkPhylo userCorpusId userId
389 -- _ <- mkAnnuaire rootUserId userId
390 _ <- updateNgramsOccurrences userCorpusId (Just listId)
-- | Insert documents into the master corpus and index their ngrams.
--
-- Gets or creates the master corpus ('userMaster', 'corpusMasterName'),
-- inserts the documents as nodes with 'insertDocs', extracts ngrams from the
-- inserted documents ('documentIdWithNgrams' with 'extractNgramsT'), and
-- saves them against the master list with 'saveDocNgramsWith'.
--
-- NOTE(review): 'insertDocs' already calls @Doc.add cId newIds'@ with the
-- same corpus id and the same ids it returns, so the 'Doc.add' below looks
-- redundant; confirm before removing.
395 insertMasterDocs :: ( FlowCmdM env err m
403 insertMasterDocs c lang hs = do
404 (masterUserId, _, masterCorpusId) <- getOrMk_RootWithCorpus (UserName userMaster) (Left corpusMasterName) c
405 (ids', documentsWithId) <- insertDocs masterUserId masterCorpusId (map (toNode masterUserId Nothing) hs )
406 _ <- Doc.add masterCorpusId ids'
408 -- create a corpus with database name (CSV or PubMed)
409 -- add documents to the corpus (create node_node link)
410 -- this will enable global database monitoring
412 -- maps :: IO Map Ngrams (Map NgramsType (Map NodeId Int))
413 mapNgramsDocs' :: HashMap ExtractedNgrams (Map NgramsType (Map NodeId (Int, TermsCount)))
415 <$> documentIdWithNgrams
416 (extractNgramsT $ withLang lang documentsWithId)
419 lId <- getOrMkList masterCorpusId masterUserId
420 -- _ <- saveDocNgramsWith lId mapNgramsDocs'
421 _ <- saveDocNgramsWith lId mapNgramsDocs'
423 -- _cooc <- insertDefaultNode NodeListCooc lId masterUserId
-- | Persist the ngrams extracted from documents against the list @lId@.
--
-- Visible steps:
--
-- 1. The extracted ngrams (the keys of the map) are inserted with
--    'insertExtractedNgrams', giving @terms2id@. The count-free copy
--    @mapNgramsDocsNoCount@ is only used to obtain these keys.
-- 2. Keys are converted with 'extracted2ngrams'; the counts are kept in
--    @mapNgramsDocs@.
-- 3. The (term, ngrams types) pairs are inserted with 'listInsertDb'.
-- 4. One 'ContextNodeNgrams2' row is built per (term, type, node) triple for
--    which 'getCgramsId' returns an id (other triples are dropped by
--    'catMaybes'). The weight is the first tuple component converted to
--    'Double'; the 'TermsCount' is ignored here.
-- 5. The rows are inserted with 'insertContextNodeNgrams2', and the document
--    ngrams with 'insertDocNgrams' after re-keying through
--    @indexNgrams terms2id@.
426 saveDocNgramsWith :: (FlowCmdM env err m)
428 -> HashMap ExtractedNgrams (Map NgramsType (Map NodeId (Int, TermsCount)))
430 saveDocNgramsWith lId mapNgramsDocs' = do
431 --printDebug "[saveDocNgramsWith] mapNgramsDocs'" mapNgramsDocs'
432 let mapNgramsDocsNoCount = over (traverse . traverse . traverse) fst mapNgramsDocs'
433 terms2id <- insertExtractedNgrams $ HashMap.keys mapNgramsDocsNoCount
435 let mapNgramsDocs = HashMap.mapKeys extracted2ngrams mapNgramsDocs'
438 mapCgramsId <- listInsertDb lId toNodeNgramsW'
439 $ map (first _ngramsTerms . second Map.keys)
440 $ HashMap.toList mapNgramsDocs
442 --printDebug "saveDocNgramsWith" mapCgramsId
444 let ngrams2insert = catMaybes [ ContextNodeNgrams2 <$> Just nId
445 <*> (getCgramsId mapCgramsId ngrams_type (_ngramsTerms terms''))
446 <*> Just (fromIntegral w :: Double)
447 | (terms'', mapNgramsTypes) <- HashMap.toList mapNgramsDocs
448 , (ngrams_type, mapNodeIdWeight) <- Map.toList mapNgramsTypes
449 , (nId, (w, _cnt)) <- Map.toList mapNodeIdWeight
451 -- printDebug "Ngrams2Insert" ngrams2insert
452 _return <- insertContextNodeNgrams2 ngrams2insert
455 _ <- insertDocNgrams lId $ HashMap.mapKeys (indexNgrams terms2id) mapNgramsDocs
460 ------------------------------------------------------------------------
461 -- TODO Type NodeDocumentUnicised
-- | Insert documents for user @uId@ and attach them to corpus @cId@.
--
-- Each document gets a unique id with 'addUniqId' and is inserted with
-- 'insertDb'. All returned ids (@newIds'@) are attached to @cId@ with
-- 'Doc.add'.
--
-- Returns those ids together with the documents indexed by their id. The
-- second component goes through 'toInserted', which keeps only the
-- 'ReturnId's flagged as inserted, so it may hold fewer elements than the
-- first.
462 insertDocs :: ( FlowCmdM env err m
469 -> m ([ContextId], [Indexed ContextId a])
470 insertDocs uId cId hs = do
471 let docs = map addUniqId hs
472 newIds <- insertDb uId Nothing docs
473 -- printDebug "newIds" newIds
475 newIds' = map reId newIds
476 documentsWithId = mergeData (toInserted newIds) (Map.fromList $ map viewUniqId' docs)
477 _ <- Doc.add cId newIds'
478 pure (newIds', documentsWithId)
481 ------------------------------------------------------------------------
-- | Pair a document with the value found through its 'uniqId' lens.
-- Calls 'panic' when that value is 'Nothing'.
482 viewUniqId' :: UniqId a
485 viewUniqId' d = maybe err (\h -> (h,d)) (view uniqId d)
487 err = panic "[ERROR] Database.Flow.toInsert"
-- | Keep only the 'ReturnId's whose 'reInserted' flag is 'True' and index
-- them in a map keyed by 'reUniqId'.
490 toInserted :: [ReturnId]
493 Map.fromList . map (\r -> (reUniqId r, r) )
494 . filter (\r -> reInserted r == True)
-- | Join documents keyed by hash with the 'ReturnId's in @rs@.
-- Each hash of the second map is looked up in @rs@; entries without a match
-- are dropped ('catMaybes'), the others are wrapped in 'Indexed' using the
-- matching 'reId'.
496 mergeData :: Map Hash ReturnId
498 -> [Indexed NodeId a]
499 mergeData rs = catMaybes . map toDocumentWithId . Map.toList
501 toDocumentWithId (sha,hpd) =
502 Indexed <$> fmap reId (lookup sha rs)
505 ------------------------------------------------------------------------
506 ------------------------------------------------------------------------
507 ------------------------------------------------------------------------
-- | Traverse the indexed documents and build one 'DocumentIdWithNgrams' per
-- document, pairing the document @d@ with @e@. The line that binds @e@ is
-- not visible in this excerpt (presumably the result of applying @f@ to the
-- document; to be confirmed).
508 documentIdWithNgrams :: HasNodeError err
510 -> Cmd err (HashMap b (Map NgramsType Int, TermsCount)))
511 -> [Indexed NodeId a]
512 -> Cmd err [DocumentIdWithNgrams a b]
513 documentIdWithNgrams f = traverse toDocumentIdWithNgrams
515 toDocumentIdWithNgrams d = do
517 pure $ DocumentIdWithNgrams d e
520 -- | TODO check optimization
--
-- Merge the ngrams of all documents into one map: ngram -> ngrams type ->
-- node id -> (weight, terms count). Each document is first turned into such
-- a map by @f@; the maps are then merged with 'HashMap.unionsWith', and
-- tuples that collide on the same ngram, type and node id are combined with
-- 'addTuples'.
521 mapNodeIdNgrams :: (Ord b, Hashable b)
522 => [DocumentIdWithNgrams a b]
525 (Map NodeId (Int, TermsCount))
527 mapNodeIdNgrams = HashMap.unionsWith (Map.unionWith (Map.unionWith addTuples)) . fmap f
529 -- | NOTE We are somehow multiplying 'TermsCount' here: If the
530 -- same ngrams term has different ngrams types, the 'TermsCount'
531 -- for it (which is the number of times the terms appears in a
532 -- document) is copied over to all its types.
533 f :: DocumentIdWithNgrams a b
534 -> HashMap b (Map NgramsType (Map NodeId (Int, TermsCount)))
535 f d = fmap (\(ngramsTypeMap, cnt) -> fmap (\i -> Map.singleton nId (i, cnt)) ngramsTypeMap) $ documentNgrams d
-- Node id of the document the ngrams were extracted from.
537 nId = _index $ documentWithId d
540 ------------------------------------------------------------------------
-- | Ngrams extraction for contacts. The only ngrams produced are 'Authors',
-- taken from the contact's last name (@hc_who . _Just . cw_lastName@). When
-- that value is absent the literal text "Nothing" is used instead. Every
-- author ngram gets weight 1 and count 1, and the keys are post-processed
-- with @cleanExtractedNgrams 255@.
541 instance ExtractNgramsT HyperdataContact
543 extractNgramsT l hc = HashMap.mapKeys (cleanExtractedNgrams 255) <$> extract l hc
545 extract :: TermType Lang -> HyperdataContact
546 -> Cmd err (HashMap ExtractedNgrams (Map NgramsType Int, TermsCount))
548 let authors = map text2ngrams
549 $ maybe ["Nothing"] (\a -> [a])
550 $ view (hc_who . _Just . cw_lastName) hc'
552 pure $ HashMap.fromList $ [(SimpleNgrams a', (Map.singleton Authors 1, 1)) | a' <- authors ]
-- | Ngrams extraction for documents. Four kinds of ngrams are produced:
--
-- * one 'Sources' ngram;
-- * 'Institutes' and 'Authors' ngrams, split with 'splitOn' according to the
--   document's @hd_bdd@;
-- * 'NgramsTerms', obtained from 'extractTerms' on @hasText doc@ using the
--   NLP server of the language, and enriched with 'enrichedTerms' ('CoreNLP',
--   'NP').
--
-- For the first three kinds the literal "Nothing" is the fallback when the
-- underlying optional value is absent (the field accessors are on lines not
-- visible in this excerpt). They get weight 1 and count 1; terms get weight
-- 1 and keep their extracted count. Keys are post-processed with
-- @cleanExtractedNgrams 255@.
--
-- NOTE(review): the result is built with 'HashMap.fromList', so if the same
-- key occurs more than once only one entry survives; confirm this is
-- intended.
555 instance ExtractNgramsT HyperdataDocument
557 extractNgramsT :: TermType Lang
559 -> Cmd err (HashMap ExtractedNgrams (Map NgramsType Int, TermsCount))
560 extractNgramsT lang hd = HashMap.mapKeys (cleanExtractedNgrams 255) <$> extractNgramsT' lang hd
562 extractNgramsT' :: TermType Lang
564 -> Cmd err (HashMap ExtractedNgrams (Map NgramsType Int, TermsCount))
565 extractNgramsT' lang' doc = do
566 let source = text2ngrams
567 $ maybe "Nothing" identity
570 institutes = map text2ngrams
571 $ maybe ["Nothing"] (splitOn Institutes (doc^. hd_bdd))
574 authors = map text2ngrams
575 $ maybe ["Nothing"] (splitOn Authors (doc^. hd_bdd))
578 ncs <- view (nlpServerGet $ lang' ^. tt_lang)
580 termsWithCounts' <- map (\(t, cnt) -> (enrichedTerms (lang' ^. tt_lang) CoreNLP NP t, cnt))
582 <$> liftBase (extractTerms ncs lang' $ hasText doc)
584 pure $ HashMap.fromList
585 $ [(SimpleNgrams source, (Map.singleton Sources 1, 1)) ]
586 <> [(SimpleNgrams i', (Map.singleton Institutes 1, 1)) | i' <- institutes ]
587 <> [(SimpleNgrams a', (Map.singleton Authors 1, 1)) | a' <- authors ]
588 <> [(EnrichedNgrams t', (Map.singleton NgramsTerms 1, cnt')) | (t', cnt') <- termsWithCounts' ]
-- | Ngrams of a 'Node' are the ngrams of its hyperdata: the call is
-- forwarded to the instance of the wrapped type.
590 instance (ExtractNgramsT a, HasText a) => ExtractNgramsT (Node a)
592 extractNgramsT l (Node { _node_hyperdata = h }) = extractNgramsT l h
-- | The text of a 'Node' is the text of its hyperdata.
594 instance HasText a => HasText (Node a)
596 hasText (Node { _node_hyperdata = h }) = hasText h
600 -- | TODO put elsewhere
601 -- | Upgrade function
602 -- Suppose all documents are English (this is the case actually)
--
-- Collects the documents of every 'NodeCorpus' found under the root of
-- 'userMaster' ('getDocumentsWithParentId' per corpus, concatenated into one
-- list), then runs 'extractInsert' on them in groups produced by
-- @splitEvery 1000@.
603 indexAllDocumentsWithPosTag :: FlowCmdM env err m
605 indexAllDocumentsWithPosTag = do
606 rootId <- getRootId (UserName userMaster)
607 corpusIds <- findNodesId rootId [NodeCorpus]
608 docs <- List.concat <$> mapM getDocumentsWithParentId corpusIds
609 _ <- mapM extractInsert (splitEvery 1000 docs)
-- | Extract the ngrams of a batch of document nodes and insert them.
-- Each node is indexed by its 'node_id', ngrams are extracted with the
-- language fixed to @Multi EN@, and the keys of the resulting map are
-- inserted with 'insertExtractedNgrams'. Any statement following that
-- insertion is not visible in this excerpt.
612 extractInsert :: FlowCmdM env err m
613 => [Node HyperdataDocument] -> m ()
614 extractInsert docs = do
615 let documentsWithId = map (\doc -> Indexed (doc ^. node_id) doc) docs
616 mapNgramsDocs' <- mapNodeIdNgrams
617 <$> documentIdWithNgrams
618 (extractNgramsT $ withLang (Multi EN) documentsWithId)
620 _ <- insertExtractedNgrams $ HashMap.keys mapNgramsDocs'