{-|
Module      : Gargantext.Database.Flow
Description : Database Flow
Copyright   : (c) CNRS, 2017-Present
License     : AGPL + CECILL v3
Maintainer  : team@gargantext.org
Stability   : experimental
Portability : POSIX
-}
-- TODO-ACCESS:
--   check userId       CanFillUserCorpus   userCorpusId
--   check masterUserId CanFillMasterCorpus masterCorpusId
--
-- TODO-ACCESS: check uId CanInsertDoc pId && checkDocType nodeType
-- TODO-EVENTS: InsertedNodes
{-# OPTIONS_GHC -fno-warn-orphans #-}

{-# LANGUAGE ConstrainedClassMethods #-}
{-# LANGUAGE ConstraintKinds         #-}
{-# LANGUAGE InstanceSigs            #-}
{-# LANGUAGE ScopedTypeVariables     #-}
{-# LANGUAGE TemplateHaskell         #-}
module Gargantext.Database.Action.Flow -- (flowDatabase, ngrams2list)
  ( DataText(..)
  , getDataText
  , getDataText_Debug
  , flowDataText
  , flow

  , flowCorpusFile
  , flowCorpus
  , flowCorpusUser
  , flowAnnuaire
  , insertMasterDocs
  , saveDocNgramsWith

  , getOrMkRoot
  , getOrMk_RootWithCorpus
  , TermType(..)
  , DataOrigin(..)
  , allDataOrigins

  , do_api
  , indexAllDocumentsWithPosTag
  )
    where
import Conduit
import Control.Lens ((^.), view, _Just, makeLenses, over, traverse)
import Control.Monad.Reader (MonadReader)
import Data.Aeson.TH (deriveJSON)
import Data.Conduit.Internal (zipSources)
import qualified Data.Conduit.List as CList
import Data.HashMap.Strict (HashMap)
import Data.Hashable (Hashable)
import Data.List (concat)
import Data.Map.Strict (Map, lookup)
import Data.Maybe (catMaybes)
import Data.Swagger (ToSchema(..), genericDeclareNamedSchema)
import qualified Data.Text as T
import Data.Tuple.Extra (first, second)
import GHC.Generics (Generic)
import Servant.Client (ClientError)
import System.FilePath (FilePath)
import qualified Data.HashMap.Strict as HashMap
import qualified Gargantext.Data.HashMap.Strict.Utils as HashMap
import qualified Data.Map.Strict as Map
import qualified Data.Conduit.List as CL
import qualified Data.Conduit as C
import Gargantext.Core (Lang(..), PosTagAlgo(..))
-- import Gargantext.Core.Ext.IMT (toSchoolName)
import Gargantext.Core.Ext.IMTUser (readFile_Annuaire)
import Gargantext.Core.Flow.Types
import Gargantext.Core.NLP (nlpServerGet)
import Gargantext.Core.Text
import Gargantext.Core.Text.Corpus.Parsers (parseFile, FileFormat, FileType, splitOn)
import Gargantext.Core.Text.List (buildNgramsLists)
import Gargantext.Core.Text.List.Group.WithStem ({-StopSize(..),-} GroupParams(..))
import Gargantext.Core.Text.List.Social (FlowSocialListWith(..))
import Gargantext.Core.Text.Terms
import Gargantext.Core.Text.Terms.Mono.Stem.En (stemIt)
import Gargantext.Core.Types (POS(NP), TermsCount)
import Gargantext.Core.Types.Individu (User(..))
import Gargantext.Core.Types.Main
import Gargantext.Core.Types.Query (Limit)
import Gargantext.Core.Utils (addTuples)
import Gargantext.Core.Utils.Prefix (unPrefix, unPrefixSwagger)
import Gargantext.Database.Action.Flow.List
import Gargantext.Database.Action.Flow.Types
import Gargantext.Database.Action.Flow.Utils (insertDocNgrams, DocumentIdWithNgrams(..))
import Gargantext.Database.Action.Search (searchDocInDatabase)
import Gargantext.Database.Admin.Config (userMaster, corpusMasterName)
import Gargantext.Database.Action.Metrics (updateNgramsOccurrences)
import Gargantext.Database.Admin.Types.Hyperdata
import Gargantext.Database.Admin.Types.Node -- (HyperdataDocument(..), NodeType(..), NodeId, UserId, ListId, CorpusId, RootId, MasterCorpusId, MasterUserId)
import Gargantext.Database.Prelude
import Gargantext.Database.Query.Table.ContextNodeNgrams2
import Gargantext.Database.Query.Table.Ngrams
import Gargantext.Database.Query.Table.Node
import Gargantext.Database.Query.Table.Node.Document.Insert -- (insertDocuments, ReturnId(..), addUniqIdsDoc, addUniqIdsContact, ToDbData(..))
import Gargantext.Database.Query.Table.Node.Error (HasNodeError(..))
import Gargantext.Database.Query.Table.NodeNgrams (listInsertDb , getCgramsId)
import Gargantext.Database.Query.Tree.Root (getOrMkRoot, getOrMk_RootWithCorpus)
import Gargantext.Database.Schema.Node (NodePoly(..), node_id)
import Gargantext.Database.Types
import Gargantext.Prelude
import Gargantext.Prelude.Crypto.Hash (Hash)
import Gargantext.Utils.Jobs (JobHandle, MonadJobStatus(..))
import qualified Gargantext.Core.Text.Corpus.API as API
import qualified Gargantext.Database.Query.Table.Node.Document.Add as Doc (add)
--import qualified Prelude
------------------------------------------------------------------------
-- Imports for upgrade function
import Gargantext.Database.Query.Tree.Root (getRootId)
import Gargantext.Database.Query.Tree (findNodesId)
import qualified Data.List as List
------------------------------------------------------------------------
-- TODO use internal with API name (could be old data)
data DataOrigin = InternalOrigin { _do_api :: API.ExternalAPIs }
                | ExternalOrigin { _do_api :: API.ExternalAPIs }
  deriving (Generic, Eq)

makeLenses ''DataOrigin
deriveJSON (unPrefix "_do_") ''DataOrigin
instance ToSchema DataOrigin where
  declareNamedSchema = genericDeclareNamedSchema (unPrefixSwagger "_do_")
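
-- A quick check of the JSON generated for 'DataOrigin' (hypothetical GHCi
-- session; assumes aeson's default sum encoding together with 'unPrefix'
-- stripping the "_do_" prefix from record fields):
--
-- > encode (InternalOrigin API.PubMed)
-- > -- "{\"tag\":\"InternalOrigin\",\"api\":\"PubMed\"}"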
allDataOrigins :: ( MonadReader env m
                  , HasConfig env) => m [DataOrigin]
allDataOrigins = do
  ext <- API.externalAPIs
  pure $ map InternalOrigin ext
      <> map ExternalOrigin ext
data DataText = DataOld ![NodeId]
              | DataNew !(Maybe Integer, ConduitT () HyperdataDocument IO ())
              --- | DataNew ![[HyperdataDocument]]

-- | A 'Show' instance is not possible because of the IO conduit in 'DataNew',
-- hence this debug printer.
printDataText :: DataText -> IO ()
printDataText (DataOld xs) = putStrLn $ show xs
printDataText (DataNew (maybeInt, conduitData)) = do
  res <- C.runConduit (conduitData .| CL.consume)
  putStrLn $ show (maybeInt, res)
-- TODO use the split parameter in config file
getDataText :: FlowCmdM env err m
            => DataOrigin
            -> TermType Lang
            -> API.Query
            -> Maybe API.Limit
            -> m (Either ClientError DataText)
getDataText (ExternalOrigin api) la q li = liftBase $ do
  eRes <- API.get api (_tt_lang la) q li
  pure $ DataNew <$> eRes
getDataText (InternalOrigin _) _la q _li = do
  (_masterUserId, _masterRootId, cId) <- getOrMk_RootWithCorpus
                                           (UserName userMaster)
                                           (Left "")
                                           (Nothing :: Maybe HyperdataCorpus)
  ids <- map fst <$> searchDocInDatabase cId (stemIt q)
  pure $ Right $ DataOld ids
getDataText_Debug :: FlowCmdM env err m
                  => DataOrigin
                  -> TermType Lang
                  -> API.Query
                  -> Maybe API.Limit
                  -> m ()
getDataText_Debug a l q li = do
  result <- getDataText a l q li
  case result of
    Left err  -> liftBase $ putStrLn $ show err
    Right res -> liftBase $ printDataText res
-------------------------------------------------------------------------------
flowDataText :: forall env err m.
                ( FlowCmdM env err m
                , MonadJobStatus m
                )
             => User
             -> DataText
             -> TermType Lang
             -> CorpusId
             -> Maybe FlowSocialListWith
             -> JobHandle m
             -> m CorpusId
flowDataText u (DataOld ids) tt cid mfslw _ = do
  (_userId, userCorpusId, listId) <- createNodes u (Right [cid]) corpusType
  _ <- Doc.add userCorpusId ids
  flowCorpusUser (_tt_lang tt) u userCorpusId listId corpusType mfslw
  where
    corpusType = (Nothing :: Maybe HyperdataCorpus)
flowDataText u (DataNew (mLen, txtC)) tt cid mfslw jobHandle =
  flowCorpus u (Right [cid]) tt mfslw (mLen, transPipe liftBase txtC) jobHandle
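
-- Note: 'transPipe liftBase' re-bases the IO conduit carried by 'DataNew'
-- into the 'm' monad expected by 'flowCorpus', so both the 'DataOld' and
-- 'DataNew' branches end up on the same ingestion path.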
------------------------------------------------------------------------
flowAnnuaire :: (FlowCmdM env err m, MonadJobStatus m)
             => User
             -> Either CorpusName [CorpusId]
             -> TermType Lang
             -> FilePath
             -> JobHandle m
             -> m AnnuaireId
flowAnnuaire u n l filePath jobHandle = do
  -- TODO Conduit for file
  docs <- liftBase (readFile_Annuaire filePath :: IO [HyperdataContact])
  flow (Nothing :: Maybe HyperdataAnnuaire) u n l Nothing (Just $ fromIntegral $ length docs, yieldMany docs) jobHandle
------------------------------------------------------------------------
flowCorpusFile :: (FlowCmdM env err m, MonadJobStatus m)
               => User
               -> Either CorpusName [CorpusId]
               -> Limit -- Limit the number of docs (for dev purpose)
               -> TermType Lang
               -> FileType
               -> FileFormat
               -> FilePath
               -> Maybe FlowSocialListWith
               -> JobHandle m
               -> m CorpusId
flowCorpusFile u n _l la ft ff fp mfslw jobHandle = do
  eParsed <- liftBase $ parseFile ft ff fp
  case eParsed of
    Right parsed -> do
      flowCorpus u n la mfslw (Just $ fromIntegral $ length parsed, yieldMany parsed .| mapC toHyperdataDocument) jobHandle
      --let docs = splitEvery 500 $ take l parsed
      --flowCorpus u n la mfslw (yieldMany $ map (map toHyperdataDocument) docs) logStatus
    Left e -> panic $ "Error: " <> T.pack e
------------------------------------------------------------------------
-- | TODO improve the needed type to create/update a corpus
-- (For now, Either is enough)
flowCorpus :: (FlowCmdM env err m, FlowCorpus a, MonadJobStatus m)
           => User
           -> Either CorpusName [CorpusId]
           -> TermType Lang
           -> Maybe FlowSocialListWith
           -> (Maybe Integer, ConduitT () a m ())
           -> JobHandle m
           -> m CorpusId
flowCorpus = flow (Nothing :: Maybe HyperdataCorpus)
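
-- A typical call (hypothetical names: 'docs' is a parsed document list and
-- 'jobHandle' an active job handle):
--
-- > flowCorpus (UserName "alice") (Left "my-corpus") (Multi EN) Nothing
-- >            (Just $ fromIntegral $ length docs, yieldMany docs) jobHandle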
flow :: forall env err m a c.
        ( FlowCmdM env err m
        , FlowCorpus a
        , MkCorpus c
        , MonadJobStatus m
        )
     => Maybe c
     -> User
     -> Either CorpusName [CorpusId]
     -> TermType Lang
     -> Maybe FlowSocialListWith
     -> (Maybe Integer, ConduitT () a m ())
     -> JobHandle m
     -> m CorpusId
flow c u cn la mfslw (mLength, docsC) jobHandle = do
  (_userId, userCorpusId, listId) <- createNodes u cn c
  -- TODO if public insertMasterDocs else insertUserDocs
  _ <- runConduit $ zipSources (yieldMany [1..]) docsC
                 .| CList.chunksOf 100
                 .| mapMC insertDocs'
                 .| mapM_C (\ids' -> do
                              _ <- Doc.add userCorpusId ids'
                              pure ())
  _ <- flowCorpusUser (la ^. tt_lang) u userCorpusId listId c mfslw
  -- ids <- traverse (\(idx, doc) -> do
  --                     id <- insertMasterDocs c la doc
  --                     logStatus JobLog { _scst_succeeded = Just $ 1 + idx
  --                                      , _scst_failed    = Just 0
  --                                      , _scst_remaining = Just $ length docs - idx
  --                                      , _scst_events    = Just []
  --                                      }
  --                     pure id
  --                 ) (zip [1..] docs)
  --printDebug "[flow] calling flowCorpusUser" (0 :: Int)

  --flowCorpusUser (la ^. tt_lang) u cn c ids mfslw
  pure userCorpusId

  where
    insertDocs' :: [(Integer, a)] -> m [NodeId]
    insertDocs' [] = pure []
    insertDocs' docs = do
      -- printDebug "[flow] calling insertDoc, ([idx], mLength) = " (fst <$> docs, mLength)
      ids <- insertMasterDocs c la (snd <$> docs)
      let maxIdx = maximum (fst <$> docs)
      case mLength of
        Nothing -> pure ()
        Just _len -> do
          let succeeded = fromIntegral (1 + maxIdx)
          -- let remaining = fromIntegral (len - maxIdx)
          -- Reconstruct the correct update state by using 'markStarted' and the other primitives.
          -- We do this slightly awkward arithmetic such that when we call 'markProgress' we reduce
          -- the number of 'remaining' by exactly '1 + maxIdx', and we will end up with a 'JobLog'
          -- looking like this:
          -- JobLog
          -- { _scst_succeeded = Just $ fromIntegral $ 1 + maxIdx
          -- , _scst_failed    = Just 0
          -- , _scst_remaining = Just $ fromIntegral $ len - maxIdx
          -- , _scst_events    = Just []
          -- }
          -- markStarted (remaining + succeeded) jobHandle
          markProgress succeeded jobHandle
      pure ids
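
    -- Worked example (hypothetical numbers): with 250 documents streamed in
    -- chunks of 100, the zipped indices per chunk are [1..100], [101..200]
    -- and [201..250]; 'maxIdx' is therefore 100, 200 and 250, and
    -- 'markProgress' is called with 101, 201 and 251 respectively.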
------------------------------------------------------------------------
createNodes :: ( FlowCmdM env err m
               , MkCorpus c
               )
            => User
            -> Either CorpusName [CorpusId]
            -> Maybe c
            -> m (UserId, CorpusId, ListId)
createNodes user corpusName ctype = do
  -- User Flow
  (userId, _rootId, userCorpusId) <- getOrMk_RootWithCorpus user corpusName ctype
  -- NodeTexts is first
  _tId <- insertDefaultNodeIfNotExists NodeTexts userCorpusId userId
  -- printDebug "NodeTexts: " tId

  -- NodeList is second
  listId <- getOrMkList userCorpusId userId

  -- User Graph Flow
  _ <- insertDefaultNodeIfNotExists NodeGraph userCorpusId userId
  _ <- insertDefaultNodeIfNotExists NodeDashboard userCorpusId userId

  pure (userId, userCorpusId, listId)
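
-- | Build the user's ngrams lists (unless the social-list setting is
-- 'NoList'), attach them to the user list and make sure the master list
-- exists, then refresh the ngrams occurrence counts of the corpus.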
flowCorpusUser :: ( FlowCmdM env err m
                  , MkCorpus c
                  )
               => Lang
               -> User
               -> CorpusId
               -> ListId
               -> Maybe c
               -> Maybe FlowSocialListWith
               -> m CorpusId
flowCorpusUser l user userCorpusId listId ctype mfslw = do
  server <- view (nlpServerGet l)
  -- User List Flow
  (masterUserId, _masterRootId, masterCorpusId)
    <- getOrMk_RootWithCorpus (UserName userMaster) (Left "") ctype

  --let gp = (GroupParams l 2 3 (StopSize 3))
  -- Here the PosTagAlgo should be chosen according to the Lang
  _ <- case mfslw of
         (Just (NoList _)) -> do
           -- printDebug "Do not build list" mfslw
           pure ()
         _ -> do
           ngs <- buildNgramsLists user userCorpusId masterCorpusId mfslw
                    $ GroupWithPosTag l server HashMap.empty

           -- printDebug "flowCorpusUser:ngs" ngs

           _userListId <- flowList_DbRepo listId ngs
           _mastListId <- getOrMkList masterCorpusId masterUserId
           pure ()
  -- _ <- insertOccsUpdates userCorpusId mastListId
  -- printDebug "userListId" userListId
  --_ <- mkPhylo userCorpusId userId
  -- Annuaire Flow
  -- _ <- mkAnnuaire rootUserId userId
  _ <- updateNgramsOccurrences userCorpusId (Just listId)

  pure userCorpusId
insertMasterDocs :: ( FlowCmdM env err m
                    , FlowCorpus a
                    , MkCorpus c
                    )
                 => Maybe c
                 -> TermType Lang
                 -> [a]
                 -> m [DocId]
insertMasterDocs c lang hs = do
  (masterUserId, _, masterCorpusId) <- getOrMk_RootWithCorpus (UserName userMaster) (Left corpusMasterName) c
  (ids', documentsWithId) <- insertDocs masterUserId masterCorpusId (map (toNode masterUserId Nothing) hs)
  _ <- Doc.add masterCorpusId ids'
  -- TODO
  -- create a corpus with database name (CSV or PubMed)
  -- add documents to the corpus (create node_node link)
  -- this will enable global database monitoring

  -- maps :: IO Map Ngrams (Map NgramsType (Map NodeId Int))
  mapNgramsDocs' :: HashMap ExtractedNgrams (Map NgramsType (Map NodeId (Int, TermsCount)))
                <- mapNodeIdNgrams
               <$> documentIdWithNgrams
                   (extractNgramsT $ withLang lang documentsWithId)
                   documentsWithId

  lId <- getOrMkList masterCorpusId masterUserId
  _ <- saveDocNgramsWith lId mapNgramsDocs'

  -- _cooc <- insertDefaultNode NodeListCooc lId masterUserId
  pure ids'
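
-- | Persist the extracted ngrams of a list in three steps: insert the raw
-- ngrams, insert the node_ngrams rows for the list, then link every
-- (context, ngrams, type) triple with its weight via
-- 'insertContextNodeNgrams2'.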
saveDocNgramsWith :: (FlowCmdM env err m)
                  => ListId
                  -> HashMap ExtractedNgrams (Map NgramsType (Map NodeId (Int, TermsCount)))
                  -> m ()
saveDocNgramsWith lId mapNgramsDocs' = do
  --printDebug "[saveDocNgramsWith] mapNgramsDocs'" mapNgramsDocs'
  let mapNgramsDocsNoCount = over (traverse . traverse . traverse) fst mapNgramsDocs'
  terms2id <- insertExtractedNgrams $ HashMap.keys mapNgramsDocsNoCount

  let mapNgramsDocs = HashMap.mapKeys extracted2ngrams mapNgramsDocs'

  mapCgramsId <- listInsertDb lId toNodeNgramsW'
               $ map (first _ngramsTerms . second Map.keys)
               $ HashMap.toList mapNgramsDocs

  --printDebug "saveDocNgramsWith" mapCgramsId
  let ngrams2insert = catMaybes [ ContextNodeNgrams2 <$> Just nId
                                                     <*> getCgramsId mapCgramsId ngrams_type (_ngramsTerms terms'')
                                                     <*> Just (fromIntegral w :: Double)
                                | (terms'', mapNgramsTypes)      <- HashMap.toList mapNgramsDocs
                                , (ngrams_type, mapNodeIdWeight) <- Map.toList mapNgramsTypes
                                , (nId, (w, _cnt))               <- Map.toList mapNodeIdWeight
                                ]
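  -- One row per (context, ngrams, type) triple; when 'getCgramsId' cannot
  -- resolve an id the whole row is 'Nothing' and 'catMaybes' drops it.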
  -- printDebug "Ngrams2Insert" ngrams2insert
  _return <- insertContextNodeNgrams2 ngrams2insert

  _ <- insertDocNgrams lId $ HashMap.mapKeys (indexNgrams terms2id) mapNgramsDocs

  pure ()
------------------------------------------------------------------------
-- TODO Type NodeDocumentUnicised
insertDocs :: ( FlowCmdM env err m
              -- , FlowCorpus a
              , FlowInsertDB a
              , HasNodeError err
              )
           => UserId
           -> CorpusId
           -> [a]
           -> m ([ContextId], [Indexed ContextId a])
insertDocs uId cId hs = do
  let docs = map addUniqId hs
  newIds <- insertDb uId Nothing docs
  -- printDebug "newIds" newIds
  let
    newIds' = map reId newIds
    documentsWithId = mergeData (toInserted newIds) (Map.fromList $ map viewUniqId' docs)
  _ <- Doc.add cId newIds'
  pure (newIds', documentsWithId)
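
-- Each document is given a content hash by 'addUniqId'; 'insertDb' returns
-- one 'ReturnId' per row (carrying whether it was newly inserted), and
-- 'mergeData' joins the returned ids back to the documents through that hash.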
------------------------------------------------------------------------
viewUniqId' :: UniqId a
            => a
            -> (Hash, a)
viewUniqId' d = maybe err (\h -> (h, d)) (view uniqId d)
  where
    err = panic "[ERROR] Database.Flow.toInsert"
toInserted :: [ReturnId]
           -> Map Hash ReturnId
toInserted =
  Map.fromList . map    (\r -> (reUniqId r, r))
               . filter reInserted
mergeData :: Map Hash ReturnId
          -> Map Hash a
          -> [Indexed NodeId a]
mergeData rs = catMaybes . map toDocumentWithId . Map.toList
  where
    toDocumentWithId (sha, hpd) =
      Indexed <$> fmap reId (lookup sha rs)
              <*> Just hpd
------------------------------------------------------------------------
------------------------------------------------------------------------
------------------------------------------------------------------------
documentIdWithNgrams :: HasNodeError err
                     => (a
                     -> Cmd err (HashMap b (Map NgramsType Int, TermsCount)))
                     -> [Indexed NodeId a]
                     -> Cmd err [DocumentIdWithNgrams a b]
documentIdWithNgrams f = traverse toDocumentIdWithNgrams
  where
    toDocumentIdWithNgrams d = do
      e <- f $ _unIndex d
      pure $ DocumentIdWithNgrams d e
-- | TODO check optimization
mapNodeIdNgrams :: (Ord b, Hashable b)
                => [DocumentIdWithNgrams a b]
                -> HashMap b
                       (Map NgramsType
                            (Map NodeId (Int, TermsCount))
                       )
mapNodeIdNgrams = HashMap.unionsWith (Map.unionWith (Map.unionWith addTuples)) . fmap f
  where
    -- NOTE We are somehow multiplying 'TermsCount' here: if the
    -- same ngrams term has different ngrams types, the 'TermsCount'
    -- for it (which is the number of times the term appears in a
    -- document) is copied over to all its types.
    f :: DocumentIdWithNgrams a b
      -> HashMap b (Map NgramsType (Map NodeId (Int, TermsCount)))
    f d = fmap (\(ngramsTypeMap, cnt) -> fmap (\i -> Map.singleton nId (i, cnt)) ngramsTypeMap) $ documentNgrams d
      where
        nId = _index $ documentWithId d
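
-- Example (hypothetical values): if term @t@ occurs 3 times in document 42
-- and was extracted with both 'NgramsTerms' and 'Authors' (weight 1 each),
-- 'f' yields @t -> {NgramsTerms -> {42 -> (1, 3)}, Authors -> {42 -> (1, 3)}}@;
-- 'addTuples' then sums both weight and count when the same term shows up
-- in several documents.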
------------------------------------------------------------------------
instance ExtractNgramsT HyperdataContact
  where
    extractNgramsT l hc = HashMap.mapKeys (cleanExtractedNgrams 255) <$> extract l hc
      where
        extract :: TermType Lang -> HyperdataContact
                -> Cmd err (HashMap ExtractedNgrams (Map NgramsType Int, TermsCount))
        extract _l hc' = do
          let authors = map text2ngrams
                      $ maybe ["Nothing"] (\a -> [a])
                      $ view (hc_who . _Just . cw_lastName) hc'

          pure $ HashMap.fromList $ [(SimpleNgrams a', (Map.singleton Authors 1, 1)) | a' <- authors]
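
-- For a contact whose last name is "Doe" this produces the single entry
-- @SimpleNgrams "Doe" -> (Authors -> 1, 1)@; when the name is missing the
-- literal "Nothing" is indexed instead.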
instance ExtractNgramsT HyperdataDocument
  where
    extractNgramsT :: TermType Lang
                   -> HyperdataDocument
                   -> Cmd err (HashMap ExtractedNgrams (Map NgramsType Int, TermsCount))
    extractNgramsT lang hd = HashMap.mapKeys (cleanExtractedNgrams 255) <$> extractNgramsT' lang hd
      where
        extractNgramsT' :: TermType Lang
                        -> HyperdataDocument
                        -> Cmd err (HashMap ExtractedNgrams (Map NgramsType Int, TermsCount))
        extractNgramsT' lang' doc = do
          let source = text2ngrams
                     $ maybe "Nothing" identity
                     $ _hd_source doc

              institutes = map text2ngrams
                         $ maybe ["Nothing"] (splitOn Institutes (doc ^. hd_bdd))
                         $ _hd_institutes doc

              authors = map text2ngrams
                      $ maybe ["Nothing"] (splitOn Authors (doc ^. hd_bdd))
                      $ _hd_authors doc

          ncs <- view (nlpServerGet $ lang' ^. tt_lang)

          termsWithCounts' <- map (\(t, cnt) -> (enrichedTerms (lang' ^. tt_lang) CoreNLP NP t, cnt))
                          <$> concat
                          <$> liftBase (extractTerms ncs lang' $ hasText doc)

          pure $ HashMap.fromList
               $  [(SimpleNgrams source, (Map.singleton Sources 1, 1))]
               <> [(SimpleNgrams   i', (Map.singleton Institutes  1, 1))    | i' <- institutes ]
               <> [(SimpleNgrams   a', (Map.singleton Authors     1, 1))    | a' <- authors    ]
               <> [(EnrichedNgrams t', (Map.singleton NgramsTerms 1, cnt')) | (t', cnt') <- termsWithCounts' ]
instance (ExtractNgramsT a, HasText a) => ExtractNgramsT (Node a)
  where
    extractNgramsT l (Node { _node_hyperdata = h }) = extractNgramsT l h

instance HasText a => HasText (Node a)
  where
    hasText (Node { _node_hyperdata = h }) = hasText h
-- | TODO put elsewhere
-- Upgrade function
-- Suppose all documents are English (this is the case actually)
indexAllDocumentsWithPosTag :: FlowCmdM env err m
                            => m ()
indexAllDocumentsWithPosTag = do
  rootId    <- getRootId (UserName userMaster)
  corpusIds <- findNodesId rootId [NodeCorpus]
  docs      <- List.concat <$> mapM getDocumentsWithParentId corpusIds
  _ <- mapM extractInsert (splitEvery 1000 docs)
  pure ()
extractInsert :: FlowCmdM env err m
              => [Node HyperdataDocument] -> m ()
extractInsert docs = do
  let documentsWithId = map (\doc -> Indexed (doc ^. node_id) doc) docs
  mapNgramsDocs' <- mapNodeIdNgrams
                <$> documentIdWithNgrams
                    (extractNgramsT $ withLang (Multi EN) documentsWithId)
                    documentsWithId
  _ <- insertExtractedNgrams $ HashMap.keys mapNgramsDocs'
  pure ()
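
-- Note: this upgrade path only (re)inserts the extracted ngrams themselves
-- (the keys of 'mapNgramsDocs''); it does not rebuild the per-document
-- ngrams links.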