2 Module : Gargantext.Database.Flow
3 Description : Database Flow
4 Copyright : (c) CNRS, 2017-Present
5 License : AGPL + CECILL v3
6 Maintainer : team@gargantext.org
7 Stability : experimental
11 -- check userId CanFillUserCorpus userCorpusId
12 -- check masterUserId CanFillMasterCorpus masterCorpusId
14 -- TODO-ACCESS: check uId CanInsertDoc pId && checkDocType nodeType
15 -- TODO-EVENTS: InsertedNodes
18 {-# OPTIONS_GHC -fno-warn-orphans #-}
20 {-# LANGUAGE ConstrainedClassMethods #-}
21 {-# LANGUAGE ConstraintKinds #-}
22 {-# LANGUAGE InstanceSigs #-}
23 {-# LANGUAGE ScopedTypeVariables #-}
24 {-# LANGUAGE TemplateHaskell #-}
26 module Gargantext.Database.Action.Flow -- (flowDatabase, ngrams2list)
41 , getOrMk_RootWithCorpus
47 , indexAllDocumentsWithPosTag
52 import Control.Lens ((^.), view, _Just, makeLenses, over, traverse)
53 import Control.Monad.Reader (MonadReader)
54 import Data.Aeson.TH (deriveJSON)
55 import Data.Conduit.Internal (zipSources)
56 import qualified Data.Conduit.List as CList
58 import Data.HashMap.Strict (HashMap)
59 import Data.Hashable (Hashable)
60 import Data.List (concat)
61 import Data.Map.Strict (Map, lookup)
62 import Data.Maybe (catMaybes)
65 import qualified Data.Text as T
66 import Data.Tuple.Extra (first, second)
67 import GHC.Generics (Generic)
68 import Servant.Client (ClientError)
69 import System.FilePath (FilePath)
70 import qualified Data.HashMap.Strict as HashMap
71 import qualified Gargantext.Data.HashMap.Strict.Utils as HashMap
72 import qualified Data.Map.Strict as Map
73 import qualified Data.Conduit.List as CL
74 import qualified Data.Conduit as C
76 import Gargantext.Core (Lang(..), PosTagAlgo(..))
77 -- import Gargantext.Core.Ext.IMT (toSchoolName)
78 import Gargantext.Core.Ext.IMTUser (readFile_Annuaire)
79 import Gargantext.Core.Flow.Types
80 import Gargantext.Core.NLP (nlpServerGet)
81 import Gargantext.Core.Text
82 import Gargantext.Core.Text.Corpus.Parsers (parseFile, FileFormat, FileType, splitOn)
83 import Gargantext.Core.Text.List (buildNgramsLists)
84 import Gargantext.Core.Text.List.Group.WithStem ({-StopSize(..),-} GroupParams(..))
85 import Gargantext.Core.Text.List.Social (FlowSocialListWith(..))
86 import Gargantext.Core.Text.Terms
87 import Gargantext.Core.Text.Terms.Mono.Stem.En (stemIt)
88 import Gargantext.Core.Types (POS(NP), TermsCount)
89 import Gargantext.Core.Types.Individu (User(..))
90 import Gargantext.Core.Types.Main
91 import Gargantext.Core.Utils (addTuples)
92 import Gargantext.Core.Utils.Prefix (unPrefix, unPrefixSwagger)
93 import Gargantext.Database.Action.Flow.List
94 import Gargantext.Database.Action.Flow.Types
95 import Gargantext.Database.Action.Flow.Utils (insertDocNgrams, DocumentIdWithNgrams(..))
96 import Gargantext.Database.Action.Search (searchDocInDatabase)
97 import Gargantext.Database.Admin.Config (userMaster, corpusMasterName)
98 import Gargantext.Database.Action.Metrics (updateNgramsOccurrences)
99 import Gargantext.Database.Admin.Types.Hyperdata
100 import Gargantext.Database.Admin.Types.Node -- (HyperdataDocument(..), NodeType(..), NodeId, UserId, ListId, CorpusId, RootId, MasterCorpusId, MasterUserId)
101 import Gargantext.Database.Prelude
102 import Gargantext.Database.Query.Table.ContextNodeNgrams2
103 import Gargantext.Database.Query.Table.Ngrams
104 import Gargantext.Database.Query.Table.Node
105 import Gargantext.Database.Query.Table.Node.Document.Insert -- (insertDocuments, ReturnId(..), addUniqIdsDoc, addUniqIdsContact, ToDbData(..))
106 import Gargantext.Database.Query.Table.Node.Error (HasNodeError(..))
107 import Gargantext.Database.Query.Table.NodeNgrams (listInsertDb , getCgramsId)
108 import Gargantext.Database.Query.Tree.Root (getOrMkRoot, getOrMk_RootWithCorpus)
109 import Gargantext.Database.Schema.Node (NodePoly(..), node_id)
110 import Gargantext.Database.Types
111 import Gargantext.Prelude
112 import Gargantext.Prelude.Crypto.Hash (Hash)
113 import Gargantext.Utils.Jobs (JobHandle, MonadJobStatus(..))
114 import qualified Gargantext.Core.Text.Corpus.API as API
115 import qualified Gargantext.Database.Query.Table.Node.Document.Add as Doc (add)
116 --import qualified Prelude
118 ------------------------------------------------------------------------
119 -- Imports for upgrade function
120 import Gargantext.Database.Query.Tree.Root (getRootId)
121 import Gargantext.Database.Query.Tree (findNodesId)
122 import qualified Data.List as List
123 ------------------------------------------------------------------------
124 -- TODO use internal with API name (could be old data)
-- | Provenance of a batch of documents: either produced by Gargantext
--   itself ('InternalOrigin') or fetched from an external source
--   ('ExternalOrigin').  Both constructors carry the external-API tag.
--   NOTE(review): this view of the file is missing interior lines
--   around this declaration — verify against the full source.
125 data DataOrigin = InternalOrigin { _do_api :: API.ExternalAPIs }
126 | ExternalOrigin { _do_api :: API.ExternalAPIs }
128 deriving (Generic, Eq)
-- Template-Haskell derivations: lenses plus JSON (de)serialisation
-- with the "_do_" record-field prefix stripped.
130 makeLenses ''DataOrigin
131 deriveJSON (unPrefix "_do_") ''DataOrigin
-- Swagger schema uses the same prefix-stripping convention as the JSON.
132 instance ToSchema DataOrigin where
133 declareNamedSchema = genericDeclareNamedSchema (unPrefixSwagger "_do_")
-- | Every known 'DataOrigin': each configured external API, tagged once
--   as 'InternalOrigin' and once as 'ExternalOrigin'.
--   NOTE(review): the binding's head line (the "allDataOrigins = do")
--   is missing from this view of the file — confirm against full source.
135 allDataOrigins :: ( MonadReader env m
136 , HasConfig env) => m [DataOrigin]
138 ext <- API.externalAPIs
140 pure $ map InternalOrigin ext
141 <> map ExternalOrigin ext
-- | Text data to flow into a corpus: either ids of documents already in
--   the database ('DataOld') or a stream of new documents with an
--   optional expected count ('DataNew').  Fields are strict.
144 data DataText = DataOld ![NodeId]
145 | DataNew !(Maybe Integer, ConduitT () HyperdataDocument IO ())
146 --- | DataNew ![[HyperdataDocument]]
148 -- Show instance is not possible because of IO
-- | Debug helper: print a 'DataText' to stdout.
--
-- 'DataText' cannot have a 'Show' instance because 'DataNew' carries an
-- 'IO' conduit, so for 'DataNew' the conduit is drained into a list
-- first and the (count, documents) pair is printed.
printDataText :: DataText -> IO ()
printDataText (DataOld xs) = print xs
printDataText (DataNew (maybeInt, conduitData)) = do
  res <- C.runConduit (conduitData .| CL.consume)
  print (maybeInt, res)
155 -- TODO use the split parameter in config file
-- | Fetch text data for a query.  External origins call the remote API
--   (returning 'DataNew'); the internal origin searches the master
--   corpus in the database (returning 'DataOld' ids).
--   NOTE(review): several signature lines are missing from this view.
156 getDataText :: FlowCmdM env err m
161 -> m (Either ClientError DataText)
-- External: delegate to the API client; 'ClientError' is propagated in Left.
162 getDataText (ExternalOrigin api) la q li = liftBase $ do
163 eRes <- API.get api (_tt_lang la) q li
164 pure $ DataNew <$> eRes
-- Internal: look the (stemmed) query up in the master user's corpus.
166 getDataText (InternalOrigin _) _la q _li = do
167 (_masterUserId, _masterRootId, cId) <- getOrMk_RootWithCorpus
168 (UserName userMaster)
170 (Nothing :: Maybe HyperdataCorpus)
171 ids <- map fst <$> searchDocInDatabase cId (stemIt q)
172 pure $ Right $ DataOld ids
-- | Debug wrapper around 'getDataText': prints either the client error
--   or the fetched data instead of returning it.
--   NOTE(review): the "case result of" line is missing from this view.
174 getDataText_Debug :: FlowCmdM env err m
180 getDataText_Debug a l q li = do
181 result <- getDataText a l q li
183 Left err -> liftBase $ putStrLn $ show err
184 Right res -> liftBase $ printDataText res
187 -------------------------------------------------------------------------------
-- | Flow a 'DataText' into the corpus @cid@.
--   'DataOld': link the existing document ids to the corpus and rebuild
--   the user lists.  'DataNew': stream the new documents through
--   'flowCorpus'.  NOTE(review): interior signature lines are missing
--   from this view.
188 flowDataText :: forall env err m.
196 -> Maybe FlowSocialListWith
199 flowDataText u (DataOld ids) tt cid mfslw _ = do
200 (_userId, userCorpusId, listId) <- createNodes u (Right [cid]) corpusType
201 _ <- Doc.add userCorpusId ids
202 flowCorpusUser (_tt_lang tt) u userCorpusId listId corpusType mfslw
-- No hyperdata to attach for the corpus type in the DataOld branch.
204 corpusType = (Nothing :: Maybe HyperdataCorpus)
205 flowDataText u (DataNew (mLen, txtC)) tt cid mfslw jobHandle =
206 flowCorpus u (Right [cid]) tt mfslw (mLen, (transPipe liftBase txtC)) jobHandle
208 ------------------------------------------------------------------------
-- | Flow an annuaire (directory of contacts) from a file into the
--   database.  The whole file is read eagerly; the doc count passed to
--   'flow' is therefore exact.
--   NOTE(review): interior signature lines are missing from this view.
210 flowAnnuaire :: (FlowCmdM env err m, MonadJobStatus m)
212 -> Either CorpusName [CorpusId]
217 flowAnnuaire u n l filePath jobHandle = do
218 -- TODO Conduit for file
219 docs <- liftBase $ ((readFile_Annuaire filePath) :: IO [HyperdataContact])
220 flow (Nothing :: Maybe HyperdataAnnuaire) u n l Nothing (Just $ fromIntegral $ length docs, yieldMany docs) jobHandle
222 ------------------------------------------------------------------------
-- | Parse a corpus file and flow the parsed documents into a corpus.
--   On parse failure the whole flow panics with the parser's message.
--   NOTE(review): the limit parameter @_l@ is currently ignored (see
--   the commented-out splitEvery/take code below); interior signature
--   lines are missing from this view.
223 flowCorpusFile :: (FlowCmdM env err m, MonadJobStatus m)
225 -> Either CorpusName [CorpusId]
226 -> Limit -- Limit the number of docs (for dev purpose)
231 -> Maybe FlowSocialListWith
234 flowCorpusFile u n _l la ft ff fp mfslw jobHandle = do
235 eParsed <- liftBase $ parseFile ft ff fp
238 flowCorpus u n la mfslw (Just $ fromIntegral $ length parsed, yieldMany parsed .| mapC toHyperdataDocument) jobHandle
239 --let docs = splitEvery 500 $ take l parsed
240 --flowCorpus u n la mfslw (yieldMany $ map (map toHyperdataDocument) docs) logStatus
241 Left e -> panic $ "Error: " <> T.pack e
243 ------------------------------------------------------------------------
244 -- | TODO improve the needed type to create/update a corpus
245 -- (For now, Either is enough)
-- | Specialisation of 'flow' for corpora: fixes the hyperdata type to
--   'HyperdataCorpus' with no payload.
--   NOTE(review): interior signature lines are missing from this view.
246 flowCorpus :: (FlowCmdM env err m, FlowCorpus a, MonadJobStatus m)
248 -> Either CorpusName [CorpusId]
250 -> Maybe FlowSocialListWith
251 -> (Maybe Integer, ConduitT () a m ())
254 flowCorpus = flow (Nothing :: Maybe HyperdataCorpus)
-- | Main ingestion pipeline: create the user corpus/list nodes, stream
--   the documents in chunks of 100 into the master corpus (linking each
--   chunk to the user corpus), then build the user's ngrams lists.
--   Progress is reported through 'jobHandle'.
--   NOTE(review): this definition has several lines missing from this
--   view (signature, conduit stages, and part of the where-clause) —
--   verify against the full source before changing it.
257 flow :: forall env err m a c.
265 -> Either CorpusName [CorpusId]
267 -> Maybe FlowSocialListWith
268 -> (Maybe Integer, ConduitT () a m ())
271 flow c u cn la mfslw (mLength, docsC) jobHandle = do
272 (_userId, userCorpusId, listId) <- createNodes u cn c
273 -- TODO if public insertMasterDocs else insertUserDocs
-- Pair each doc with its 1-based index, insert in chunks of 100,
-- and link every inserted chunk to the user corpus.
274 _ <- runConduit $ zipSources (yieldMany [1..]) docsC
275 .| CList.chunksOf 100
277 .| mapM_C (\ids' -> do
278 _ <- Doc.add userCorpusId ids'
282 _ <- flowCorpusUser (la ^. tt_lang) u userCorpusId listId c mfslw
284 -- ids <- traverse (\(idx, doc) -> do
285 -- id <- insertMasterDocs c la doc
286 -- logStatus JobLog { _scst_succeeded = Just $ 1 + idx
287 -- , _scst_failed = Just 0
288 -- , _scst_remaining = Just $ length docs - idx
289 -- , _scst_events = Just []
292 -- ) (zip [1..] docs)
293 --printDebug "[flow] calling flowCorpusUser" (0 :: Int)
295 --flowCorpusUser (la ^. tt_lang) u cn c ids mfslw
-- Insert one indexed chunk into the master docs and update job progress.
298 insertDocs' :: [(Integer, a)] -> m [NodeId]
299 insertDocs' [] = pure []
300 insertDocs' docs = do
301 -- printDebug "[flow] calling insertDoc, ([idx], mLength) = " (fst <$> docs, mLength)
302 ids <- insertMasterDocs c la (snd <$> docs)
-- 'maximum' is safe here: the empty-list case is handled by the
-- first equation of insertDocs'.
303 let maxIdx = maximum (fst <$> docs)
-- NOTE(review): 'len' is bound on a line missing from this view —
-- presumably derived from 'mLength'; confirm against full source.
308 let succeeded = fromIntegral (1 + maxIdx)
309 let remaining = fromIntegral (len - maxIdx)
310 -- Reconstruct the correct update state by using 'markStarted' and the other primitives.
311 -- We do this slightly awkward arithmetic such that when we call 'markProgress' we reduce
312 -- the number of 'remaining' of exactly '1 + maxIdx', and we will end up with a 'JobLog'
313 -- looking like this:
315 -- { _scst_succeeded = Just $ fromIntegral $ 1 + maxIdx
316 -- , _scst_failed = Just 0
317 -- , _scst_remaining = Just $ fromIntegral $ len - maxIdx
318 -- , _scst_events = Just []
320 markStarted (remaining + succeeded) jobHandle
321 markProgress succeeded jobHandle
327 ------------------------------------------------------------------------
-- | Create (or fetch) the node scaffolding for a corpus: the corpus
--   itself under the user's root, plus its default Texts, List, Graph
--   and Dashboard child nodes.  Returns (userId, corpusId, listId).
--   NOTE(review): interior signature lines are missing from this view.
328 createNodes :: ( FlowCmdM env err m
332 -> Either CorpusName [CorpusId]
334 -> m (UserId, CorpusId, ListId)
335 createNodes user corpusName ctype = do
-- Create user's corpus if not exists
337 (userId, _rootId, userCorpusId) <- getOrMk_RootWithCorpus user corpusName ctype
338 -- NodeTexts is first
339 _tId <- insertDefaultNodeIfNotExists NodeTexts userCorpusId userId
340 -- printDebug "NodeTexts: " tId
342 -- NodeList is second
343 listId <- getOrMkList userCorpusId userId
346 _ <- insertDefaultNodeIfNotExists NodeGraph userCorpusId userId
347 _ <- insertDefaultNodeIfNotExists NodeDashboard userCorpusId userId
349 pure (userId, userCorpusId, listId)
-- | Build (or skip building) the ngrams lists for the user corpus,
--   insert them into the user and master lists, and refresh the ngrams
--   occurrence counts.  Uses the NLP server selected for language @l@.
--   NOTE(review): many lines of this definition (signature, the 'case'
--   scrutinee on mfslw, several statements) are missing from this view —
--   verify against the full source before changing it.
352 flowCorpusUser :: ( FlowCmdM env err m
360 -> Maybe FlowSocialListWith
362 flowCorpusUser l user userCorpusId listId ctype mfslw = do
363 server <- view (nlpServerGet l)
-- User List Flow
365 (masterUserId, _masterRootId, masterCorpusId)
366 <- getOrMk_RootWithCorpus (UserName userMaster) (Left "") ctype
368 --let gp = (GroupParams l 2 3 (StopSize 3))
369 -- Here the PosTagAlgo should be chosen according to the Lang
-- When the social-list option says "no list", list building is skipped.
371 (Just (NoList _)) -> do
372 -- printDebug "Do not build list" mfslw
375 ngs <- buildNgramsLists user userCorpusId masterCorpusId mfslw
376 $ GroupWithPosTag l server HashMap.empty
378 -- printDebug "flowCorpusUser:ngs" ngs
380 _userListId <- flowList_DbRepo listId ngs
381 _mastListId <- getOrMkList masterCorpusId masterUserId
383 -- _ <- insertOccsUpdates userCorpusId mastListId
384 -- printDebug "userListId" userListId
385 --_ <- mkPhylo userCorpusId userId
-- Annuaire Flow
387 -- _ <- mkAnnuaire rootUserId userId
388 _ <- updateNgramsOccurrences userCorpusId (Just listId)
-- | Insert documents into the master user's corpus, extract their
--   ngrams with the configured language/NLP pipeline, and save the
--   document/ngrams associations under the master list.
--   NOTE(review): the signature, the mapNodeIdNgrams application and
--   the final statement are on lines missing from this view.
393 insertMasterDocs :: ( FlowCmdM env err m
401 insertMasterDocs c lang hs = do
402 (masterUserId, _, masterCorpusId) <- getOrMk_RootWithCorpus (UserName userMaster) (Left corpusMasterName) c
403 (ids', documentsWithId) <- insertDocs masterUserId masterCorpusId (map (toNode masterUserId Nothing) hs )
404 _ <- Doc.add masterCorpusId ids'
406 -- create a corpus with database name (CSV or PubMed)
407 -- add documents to the corpus (create node_node link)
408 -- this will enable global database monitoring
410 -- maps :: IO Map Ngrams (Map NgramsType (Map NodeId Int))
411 mapNgramsDocs' :: HashMap ExtractedNgrams (Map NgramsType (Map NodeId (Int, TermsCount)))
413 <$> documentIdWithNgrams
414 (extractNgramsT $ withLang lang documentsWithId)
417 lId <- getOrMkList masterCorpusId masterUserId
418 -- _ <- saveDocNgramsWith lId mapNgramsDocs'
419 _ <- saveDocNgramsWith lId mapNgramsDocs'
421 -- _cooc <- insertDefaultNode NodeListCooc lId masterUserId
-- | Persist extracted ngrams for a list: insert the ngrams terms
--   themselves, insert the list/ngrams rows, then insert the weighted
--   context-node-ngrams links (dropping the TermsCount component).
--   NOTE(review): several lines of this definition are missing from
--   this view — verify against the full source before changing it.
424 saveDocNgramsWith :: (FlowCmdM env err m)
426 -> HashMap ExtractedNgrams (Map NgramsType (Map NodeId (Int, TermsCount)))
428 saveDocNgramsWith lId mapNgramsDocs' = do
429 --printDebug "[saveDocNgramsWith] mapNgramsDocs'" mapNgramsDocs'
-- Drop the TermsCount from every leaf, keeping only the Int weight.
430 let mapNgramsDocsNoCount = over (traverse . traverse . traverse) fst mapNgramsDocs'
431 terms2id <- insertExtractedNgrams $ HashMap.keys mapNgramsDocsNoCount
-- new
433 let mapNgramsDocs = HashMap.mapKeys extracted2ngrams mapNgramsDocs'
436 mapCgramsId <- listInsertDb lId toNodeNgramsW'
437 $ map (first _ngramsTerms . second Map.keys)
438 $ HashMap.toList mapNgramsDocs
440 --printDebug "saveDocNgramsWith" mapCgramsId
-- Flatten the nested maps into one row per (term, type, context),
-- skipping entries whose cgrams id could not be resolved.
442 let ngrams2insert = catMaybes [ ContextNodeNgrams2 <$> Just nId
443 <*> (getCgramsId mapCgramsId ngrams_type (_ngramsTerms terms''))
444 <*> Just (fromIntegral w :: Double)
445 | (terms'', mapNgramsTypes) <- HashMap.toList mapNgramsDocs
446 , (ngrams_type, mapNodeIdWeight) <- Map.toList mapNgramsTypes
447 , (nId, (w, _cnt)) <- Map.toList mapNodeIdWeight
449 -- printDebug "Ngrams2Insert" ngrams2insert
450 _return <- insertContextNodeNgrams2 ngrams2insert
-- to be removed
453 _ <- insertDocNgrams lId $ HashMap.mapKeys (indexNgrams terms2id) mapNgramsDocs
458 ------------------------------------------------------------------------
459 -- TODO Type NodeDocumentUnicised
-- | Insert documents (made unique via 'addUniqId') under corpus @cId@,
--   link them to the corpus, and return the new context ids together
--   with the inserted documents indexed by id.
--   NOTE(review): interior signature lines and the 'let'/'where' head
--   before line "473" are missing from this view.
460 insertDocs :: ( FlowCmdM env err m
467 -> m ([ContextId], [Indexed ContextId a])
468 insertDocs uId cId hs = do
469 let docs = map addUniqId hs
470 newIds <- insertDb uId Nothing docs
471 -- printDebug "newIds" newIds
473 newIds' = map reId newIds
474 documentsWithId = mergeData (toInserted newIds) (Map.fromList $ map viewUniqId' docs)
475 _ <- Doc.add cId newIds'
476 pure (newIds', documentsWithId)
479 ------------------------------------------------------------------------
-- | Pair a document with its unique hash; panics if the hash is absent.
--   NOTE(review): interior signature lines are missing from this view.
480 viewUniqId' :: UniqId a
483 viewUniqId' d = maybe err (\h -> (h,d)) (view uniqId d)
485 err = panic "[ERROR] Database.Flow.toInsert"
-- | Index the successfully-inserted 'ReturnId's by their unique hash,
--   filtering out rows the database reported as not inserted.
--   NOTE(review): interior lines are missing from this view.
488 toInserted :: [ReturnId]
491 Map.fromList . map (\r -> (reUniqId r, r) )
492 . filter (\r -> reInserted r == True)
-- | Join the inserted-id map (keyed by hash) with the original
--   documents (also keyed by hash), producing 'Indexed' documents.
--   Documents whose hash has no ReturnId are silently dropped
--   (catMaybes).  NOTE(review): interior lines are missing from this view.
494 mergeData :: Map Hash ReturnId
496 -> [Indexed NodeId a]
497 mergeData rs = catMaybes . map toDocumentWithId . Map.toList
499 toDocumentWithId (sha,hpd) =
500 Indexed <$> fmap reId (lookup sha rs)
503 ------------------------------------------------------------------------
504 ------------------------------------------------------------------------
505 ------------------------------------------------------------------------
-- | Run an ngrams-extraction action over each indexed document and pair
--   the document with its extracted ngrams.
--   NOTE(review): interior signature/body lines are missing from this view.
506 documentIdWithNgrams :: HasNodeError err
508 -> Cmd err (HashMap b (Map NgramsType Int, TermsCount)))
509 -> [Indexed NodeId a]
510 -> Cmd err [DocumentIdWithNgrams a b]
511 documentIdWithNgrams f = traverse toDocumentIdWithNgrams
513 toDocumentIdWithNgrams d = do
515 pure $ DocumentIdWithNgrams d e
518 -- | TODO check optimization
-- | Merge per-document ngrams into one map keyed by ngrams term, with
--   per-type, per-node (weight, count) leaves; overlapping leaves are
--   combined with 'addTuples'.
--   NOTE(review): interior signature lines are missing from this view.
519 mapNodeIdNgrams :: (Ord b, Hashable b)
520 => [DocumentIdWithNgrams a b]
523 (Map NodeId (Int, TermsCount))
525 mapNodeIdNgrams = HashMap.unionsWith (Map.unionWith (Map.unionWith addTuples)) . fmap f
527 -- | NOTE We are somehow multiplying 'TermsCount' here: If the
528 -- same ngrams term has different ngrams types, the 'TermsCount'
529 -- for it (which is the number of times the terms appears in a
530 -- document) is copied over to all its types.
531 f :: DocumentIdWithNgrams a b
532 -> HashMap b (Map NgramsType (Map NodeId (Int, TermsCount)))
533 f d = fmap (\(ngramsTypeMap, cnt) -> fmap (\i -> Map.singleton nId (i, cnt)) ngramsTypeMap) $ documentNgrams d
535 nId = _index $ documentWithId d
538 ------------------------------------------------------------------------
-- | Contacts only contribute their last name as an Authors ngram
--   (falling back to the literal "Nothing" when absent).
--   NOTE(review): interior lines (e.g. the 'where' and the extract
--   binding head) are missing from this view.
539 instance ExtractNgramsT HyperdataContact
541 extractNgramsT l hc = HashMap.mapKeys (cleanExtractedNgrams 255) <$> extract l hc
543 extract :: TermType Lang -> HyperdataContact
544 -> Cmd err (HashMap ExtractedNgrams (Map NgramsType Int, TermsCount))
546 let authors = map text2ngrams
547 $ maybe ["Nothing"] (\a -> [a])
548 $ view (hc_who . _Just . cw_lastName) hc'
550 pure $ HashMap.fromList $ [(SimpleNgrams a', (Map.singleton Authors 1, 1)) | a' <- authors ]
-- | Documents contribute Sources, Institutes and Authors ngrams (split
--   according to the document's source database) plus the NLP-extracted
--   terms (tagged NP via CoreNLP) with their counts.
--   NOTE(review): several lines are missing from this view (the 'where',
--   source field access, splitOn arguments) — verify against full source.
553 instance ExtractNgramsT HyperdataDocument
555 extractNgramsT :: TermType Lang
557 -> Cmd err (HashMap ExtractedNgrams (Map NgramsType Int, TermsCount))
-- Extracted ngrams are truncated to 255 characters.
558 extractNgramsT lang hd = HashMap.mapKeys (cleanExtractedNgrams 255) <$> extractNgramsT' lang hd
560 extractNgramsT' :: TermType Lang
562 -> Cmd err (HashMap ExtractedNgrams (Map NgramsType Int, TermsCount))
563 extractNgramsT' lang' doc = do
564 let source = text2ngrams
565 $ maybe "Nothing" identity
568 institutes = map text2ngrams
569 $ maybe ["Nothing"] (splitOn Institutes (doc^. hd_bdd))
572 authors = map text2ngrams
573 $ maybe ["Nothing"] (splitOn Authors (doc^. hd_bdd))
-- Pick the NLP server for the target language, then extract terms.
576 ncs <- view (nlpServerGet $ lang' ^. tt_lang)
578 termsWithCounts' <- map (\(t, cnt) -> (enrichedTerms (lang' ^. tt_lang) CoreNLP NP t, cnt))
580 <$> liftBase (extractTerms ncs lang' $ hasText doc)
582 pure $ HashMap.fromList
583 $ [(SimpleNgrams source, (Map.singleton Sources 1, 1)) ]
584 <> [(SimpleNgrams i', (Map.singleton Institutes 1, 1)) | i' <- institutes ]
585 <> [(SimpleNgrams a', (Map.singleton Authors 1, 1)) | a' <- authors ]
586 <> [(EnrichedNgrams t', (Map.singleton NgramsTerms 1, cnt')) | (t', cnt') <- termsWithCounts' ]
-- | Lift ngrams extraction through the 'Node' wrapper: extract from the
--   hyperdata.  NOTE(review): the 'where' line is missing from this view.
588 instance (ExtractNgramsT a, HasText a) => ExtractNgramsT (Node a)
590 extractNgramsT l (Node { _node_hyperdata = h }) = extractNgramsT l h
-- | Lift 'hasText' through the 'Node' wrapper: delegate to the
--   hyperdata.  NOTE(review): the 'where' line is missing from this view.
592 instance HasText a => HasText (Node a)
594 hasText (Node { _node_hyperdata = h }) = hasText h
598 -- | TODO putelsewhere
599 -- | Upgrade function
600 -- Suppose all documents are English (this is the case actually)
-- | Re-extract and re-insert ngrams for every document of every corpus
--   under the master user's root, in batches of 1000.
--   NOTE(review): the "=> m ()" signature line is missing from this view.
601 indexAllDocumentsWithPosTag :: FlowCmdM env err m
603 indexAllDocumentsWithPosTag = do
604 rootId <- getRootId (UserName userMaster)
605 corpusIds <- findNodesId rootId [NodeCorpus]
606 docs <- List.concat <$> mapM getDocumentsWithParentId corpusIds
607 _ <- mapM extractInsert (splitEvery 1000 docs)
-- | Extract ngrams from a batch of documents (assumed Multi EN, per the
--   upgrade-function note above this block's caller) and insert them.
--   NOTE(review): this definition runs to the end of the visible chunk
--   and at least one interior line is missing — the tail may be cut off.
610 extractInsert :: FlowCmdM env err m
611 => [Node HyperdataDocument] -> m ()
612 extractInsert docs = do
613 let documentsWithId = map (\doc -> Indexed (doc ^. node_id) doc) docs
614 mapNgramsDocs' <- mapNodeIdNgrams
615 <$> documentIdWithNgrams
616 (extractNgramsT $ withLang (Multi EN) documentsWithId)
618 _ <- insertExtractedNgrams $ HashMap.keys mapNgramsDocs'