-- src/Gargantext/Database/Action/Flow.hs
-- NOTE(review): the two lines below were web-scrape residue from a git blob
-- view; commit message was "fix some Conduit wiring, lifting IO conduit to a
-- more generic setting".
1 {-|
2 Module : Gargantext.Database.Flow
3 Description : Database Flow
4 Copyright : (c) CNRS, 2017-Present
5 License : AGPL + CECILL v3
6 Maintainer : team@gargantext.org
7 Stability : experimental
8 Portability : POSIX
9
10 -- TODO-ACCESS:
11 -- check userId CanFillUserCorpus userCorpusId
12 -- check masterUserId CanFillMasterCorpus masterCorpusId
13
14 -- TODO-ACCESS: check uId CanInsertDoc pId && checkDocType nodeType
15 -- TODO-EVENTS: InsertedNodes
16 -}
17
18 {-# OPTIONS_GHC -fno-warn-orphans #-}
19
20 {-# LANGUAGE ConstrainedClassMethods #-}
21 {-# LANGUAGE ConstraintKinds #-}
22 {-# LANGUAGE InstanceSigs #-}
23 {-# LANGUAGE ScopedTypeVariables #-}
24 {-# LANGUAGE TemplateHaskell #-}
25
26 module Gargantext.Database.Action.Flow -- (flowDatabase, ngrams2list)
27 ( DataText(..)
28 , getDataText
29 , flowDataText
30 , flow
31
32 , flowCorpusFile
33 , flowCorpus
34 , flowAnnuaire
35 , insertMasterDocs
36 , saveDocNgramsWith
37
38 , getOrMkRoot
39 , getOrMk_RootWithCorpus
40 , TermType(..)
41 , DataOrigin(..)
42 , allDataOrigins
43
44 , do_api
45 , indexAllDocumentsWithPosTag
46 )
47 where
48
49 import Conduit
50 import Control.Lens ((^.), view, _Just, makeLenses)
51 import Data.Aeson.TH (deriveJSON)
52 import Data.Conduit.Internal (zipSources)
53 import Data.Either
54 import Data.HashMap.Strict (HashMap)
55 import Data.Hashable (Hashable)
56 import Data.List (concat)
57 import Data.Map (Map, lookup)
58 import Data.Maybe (catMaybes)
59 import Data.Monoid
60 import Data.Swagger
61 import qualified Data.Text as T
62 import Data.Traversable (traverse)
63 import Data.Tuple.Extra (first, second)
64 import GHC.Generics (Generic)
65 import System.FilePath (FilePath)
66 import qualified Data.HashMap.Strict as HashMap
67 import qualified Gargantext.Data.HashMap.Strict.Utils as HashMap
68 import qualified Data.Map as Map
69
70 import Gargantext.API.Admin.Orchestrator.Types (JobLog(..))
71 import Gargantext.Core (Lang(..), PosTagAlgo(..))
72 import Gargantext.Core.Ext.IMT (toSchoolName)
73 import Gargantext.Core.Ext.IMTUser (readFile_Annuaire)
74 import Gargantext.Core.Flow.Types
75 import Gargantext.Core.Text
76 import Gargantext.Core.Text.Corpus.Parsers (parseFile, FileFormat)
77 import Gargantext.Core.Text.List (buildNgramsLists)
78 import Gargantext.Core.Text.List.Group.WithStem ({-StopSize(..),-} GroupParams(..))
79 import Gargantext.Core.Text.List.Social (FlowSocialListWith)
80 import Gargantext.Core.Text.Terms
81 import Gargantext.Core.Text.Terms.Mono.Stem.En (stemIt)
82 import Gargantext.Core.Types (POS(NP))
83 import Gargantext.Core.Types.Individu (User(..))
84 import Gargantext.Core.Types.Main
85 import Gargantext.Core.Utils.Prefix (unPrefix, unPrefixSwagger)
86 import Gargantext.Database.Action.Flow.List
87 import Gargantext.Database.Action.Flow.Types
88 import Gargantext.Database.Action.Flow.Utils (insertDocNgrams, DocumentIdWithNgrams(..))
89 import Gargantext.Database.Action.Search (searchDocInDatabase)
90 import Gargantext.Database.Admin.Config (userMaster, corpusMasterName)
91 import Gargantext.Database.Action.Metrics (updateNgramsOccurrences)
92 import Gargantext.Database.Admin.Types.Hyperdata
93 import Gargantext.Database.Admin.Types.Node -- (HyperdataDocument(..), NodeType(..), NodeId, UserId, ListId, CorpusId, RootId, MasterCorpusId, MasterUserId)
94 import Gargantext.Database.Prelude
95 import Gargantext.Database.Query.Table.ContextNodeNgrams2
96 import Gargantext.Database.Query.Table.Ngrams
97 import Gargantext.Database.Query.Table.Node
98 import Gargantext.Database.Query.Table.Node.Document.Insert -- (insertDocuments, ReturnId(..), addUniqIdsDoc, addUniqIdsContact, ToDbData(..))
99 import Gargantext.Database.Query.Table.Node.Error (HasNodeError(..))
100 import Gargantext.Database.Query.Table.NodeNgrams (listInsertDb , getCgramsId)
101 import Gargantext.Database.Query.Tree.Root (getOrMkRoot, getOrMk_RootWithCorpus)
102 import Gargantext.Database.Schema.Node (NodePoly(..), node_id)
103 import Gargantext.Database.Types
104 import Gargantext.Prelude
105 import Gargantext.Prelude.Crypto.Hash (Hash)
106 import qualified Gargantext.Core.Text.Corpus.API as API
107 import qualified Gargantext.Database.Query.Table.Node.Document.Add as Doc (add)
108 import qualified Prelude as Prelude
109
110 ------------------------------------------------------------------------
111 -- Imports for upgrade function
112 import Gargantext.Database.Query.Tree.Root (getRootId)
113 import Gargantext.Database.Query.Tree (findNodesId)
114 import qualified Data.List as List
115 ------------------------------------------------------------------------
116 -- TODO use internal with API name (could be old data)
-- | Provenance of a document set: mined from our own master corpus
--   ('InternalOrigin') or fetched from an external source
--   ('ExternalOrigin'). Both carry the 'API.ExternalAPIs' tag.
data DataOrigin = InternalOrigin { _do_api :: API.ExternalAPIs }
                | ExternalOrigin { _do_api :: API.ExternalAPIs }
                -- TODO Web
  deriving (Generic, Eq)
121
-- Lenses ('do_api') and JSON instances are generated with the "_do_"
-- prefix stripped; the Swagger schema uses the same prefix convention.
makeLenses ''DataOrigin
deriveJSON (unPrefix "_do_") ''DataOrigin
instance ToSchema DataOrigin where
  declareNamedSchema = genericDeclareNamedSchema (unPrefixSwagger "_do_")
126
-- | Every known origin: one 'InternalOrigin' per supported external
--   API, followed by one 'ExternalOrigin' per supported external API.
allDataOrigins :: [DataOrigin]
allDataOrigins = [ mkOrigin api | mkOrigin <- [InternalOrigin, ExternalOrigin]
                                , api      <- API.externalAPIs ]
130
131 ---------------
-- | Input documents for a flow: ids of documents already present in the
--   database ('DataOld'), or a stream of fresh 'HyperdataDocument's
--   produced in IO ('DataNew').
data DataText = DataOld ![NodeId]
              | DataNew !(ConduitT () HyperdataDocument IO ())
              -- | DataNew ![[HyperdataDocument]]
135
136 -- TODO use the split parameter in config file
-- | Resolve a 'DataOrigin' into a 'DataText': external origins stream
--   documents from the corresponding API; the internal origin searches
--   the master corpus for documents matching the (stemmed) query.
getDataText :: FlowCmdM env err m
            => DataOrigin
            -> TermType Lang
            -> API.Query
            -> Maybe API.Limit
            -> m DataText
getDataText (ExternalOrigin api) la q li =
  liftBase $ DataNew <$> API.get api (_tt_lang la) q li
getDataText (InternalOrigin _) _la q _li = do
  (_masterUserId, _masterRootId, corpusId)
    <- getOrMk_RootWithCorpus (UserName userMaster)
                              (Left "")
                              (Nothing :: Maybe HyperdataCorpus)
  DataOld . map fst <$> searchDocInDatabase corpusId (stemIt q)
154
155 -------------------------------------------------------------------------------
-- | Flow a 'DataText' into the given corpus: old document ids go
--   straight to 'flowCorpusUser'; a new document stream goes through
--   'flowCorpus' (the IO conduit is lifted into @m@ with 'transPipe').
flowDataText :: forall env err m.
                ( FlowCmdM env err m
                )
             => User
             -> DataText
             -> TermType Lang
             -> CorpusId
             -> Maybe FlowSocialListWith
             -> (JobLog -> m ())
             -> m CorpusId
flowDataText u (DataOld ids) tt cid mfslw _ =
  flowCorpusUser (_tt_lang tt) u (Right [cid]) (Nothing :: Maybe HyperdataCorpus) ids mfslw
flowDataText u (DataNew txtC) tt cid mfslw logStatus =
  flowCorpus u (Right [cid]) tt mfslw (transPipe liftBase txtC) logStatus
170
171 ------------------------------------------------------------------------
172 -- TODO use proxy
-- | Flow an annuaire (contact directory) file into the database.
flowAnnuaire :: (FlowCmdM env err m)
             => User
             -> Either CorpusName [CorpusId]
             -> (TermType Lang)
             -> FilePath
             -> (JobLog -> m ())
             -> m AnnuaireId
flowAnnuaire u n l filePath logStatus = do
  -- TODO Conduit for file
  contacts <- liftBase (readFile_Annuaire filePath :: IO [HyperdataContact])
  flow (Nothing :: Maybe HyperdataAnnuaire) u n l Nothing (yieldMany contacts) logStatus
184
185 ------------------------------------------------------------------------
-- | Parse a corpus file and flow its documents into a corpus.
--   Panics if the file cannot be parsed.
flowCorpusFile :: (FlowCmdM env err m)
               => User
               -> Either CorpusName [CorpusId]
               -> Limit -- Limit the number of docs (for dev purpose)
               -> TermType Lang -> FileFormat -> FilePath
               -> Maybe FlowSocialListWith
               -> (JobLog -> m ())
               -> m CorpusId
flowCorpusFile u n _l la ff fp mfslw logStatus = do
  eParsed <- liftBase $ parseFile ff fp
  case eParsed of
    Left err ->
      panic $ "Error: " <> (T.pack err)
    Right parsed ->
      flowCorpus u n la mfslw (yieldMany parsed .| mapC toHyperdataDocument) logStatus
202
203 ------------------------------------------------------------------------
204 -- | TODO improve the needed type to create/update a corpus
205 -- (For now, Either is enough)
-- | 'flow' specialised to a user corpus: no corpus hyperdata is
--   supplied, so a default 'HyperdataCorpus' is used downstream.
flowCorpus :: (FlowCmdM env err m, FlowCorpus a)
           => User
           -> Either CorpusName [CorpusId]
           -> TermType Lang
           -> Maybe FlowSocialListWith
           -> ConduitT () a m ()
           -> (JobLog -> m ())
           -> m CorpusId
flowCorpus = flow (Nothing :: Maybe HyperdataCorpus)
215
216
-- | Insert a stream of documents into the master corpus, then build the
--   user-side corpus structure around the resulting ids.
flow :: forall env err m a c.
        ( FlowCmdM env err m
        , FlowCorpus a
        , MkCorpus c
        )
     => Maybe c
     -> User
     -> Either CorpusName [CorpusId]
     -> TermType Lang
     -> Maybe FlowSocialListWith
     -> ConduitT () a m ()
     -> (JobLog -> m ())
     -> m CorpusId
flow c u cn la mfslw docsC _logStatus = do
  -- TODO if public insertMasterDocs else insertUserDocs
  ids <- runConduit $
      zipSources (yieldMany [1..]) docsC
      .| mapMC insertDoc
      .| sinkList
  flowCorpusUser (la ^. tt_lang) u cn c ids mfslw

  where
    -- Insert one document into the master corpus and return its id.
    -- Documents are inserted one at a time so per-document progress
    -- could be reported via _logStatus (currently disabled).
    insertDoc :: (Int, a) -> m NodeId
    insertDoc (_idx, doc) = do
      docIds <- insertMasterDocs c la [doc]
      -- Fix: previously used the partial 'Prelude.head', which crashes
      -- with an uninformative error on an empty result. Fail with an
      -- explicit message instead.
      case docIds of
        (nodeId:_) -> pure nodeId
        []         -> panic "[G.D.A.Flow.flow] insertMasterDocs returned no id"
257
258
259
260 ------------------------------------------------------------------------
-- | Build the user-side corpus around freshly inserted documents:
--   create (or fetch) the user's root and corpus, attach the documents,
--   build and persist the ngrams lists (using the master corpus as
--   reference), and create the default child nodes (texts, dashboard,
--   graph). Returns the user corpus id.
flowCorpusUser :: ( FlowCmdM env err m
                  , MkCorpus c
                  )
               => Lang
               -> User
               -> Either CorpusName [CorpusId]
               -> Maybe c
               -> [NodeId]
               -> Maybe FlowSocialListWith
               -> m CorpusId
flowCorpusUser l user corpusName ctype ids mfslw = do
  -- User Flow
  (userId, _rootId, userCorpusId) <- getOrMk_RootWithCorpus user corpusName ctype
  -- NodeTexts is first
  _tId <- insertDefaultNode NodeTexts userCorpusId userId
  -- printDebug "NodeTexts: " tId

  -- NodeList is second
  listId <- getOrMkList userCorpusId userId
  -- _cooc <- insertDefaultNode NodeListCooc listId userId
  -- TODO: check if present already, ignore
  _ <- Doc.add userCorpusId ids

  -- printDebug "Node Text Ids:" tId

  -- User List Flow: the master corpus serves as the reference corpus
  -- when building the user's ngrams lists below.
  (masterUserId, _masterRootId, masterCorpusId)
    <- getOrMk_RootWithCorpus (UserName userMaster) (Left "") ctype

  --let gp = (GroupParams l 2 3 (StopSize 3))
  -- NOTE(review): ngrams grouping is done by POS tag with CoreNLP for
  -- every language 'l' — confirm this is intended for non-English.
  let gp = GroupWithPosTag l CoreNLP HashMap.empty
  ngs <- buildNgramsLists user userCorpusId masterCorpusId mfslw gp

  -- printDebug "flowCorpusUser:ngs" ngs

  _userListId <- flowList_DbRepo listId ngs
  _mastListId <- getOrMkList masterCorpusId masterUserId
  -- _ <- insertOccsUpdates userCorpusId mastListId
  -- printDebug "userListId" userListId
  -- User Graph Flow
  _ <- insertDefaultNode NodeDashboard userCorpusId userId
  _ <- insertDefaultNode NodeGraph userCorpusId userId
  --_ <- mkPhylo  userCorpusId userId
  -- Annuaire Flow
  -- _ <- mkAnnuaire  rootUserId userId
  _ <- updateNgramsOccurrences userCorpusId (Just listId)

  pure userCorpusId
309
310
-- | Insert documents into the master user's corpus (the shared
--   reference corpus), extract their ngrams and persist them on the
--   master list. Returns the ids of the inserted documents.
insertMasterDocs :: ( FlowCmdM env err m
                    , FlowCorpus a
                    , MkCorpus c
                    )
                 => Maybe c
                 -> TermType Lang
                 -> [a]
                 -> m [DocId]
insertMasterDocs c lang hs = do
  (masterUserId, _, masterCorpusId) <- getOrMk_RootWithCorpus (UserName userMaster) (Left corpusMasterName) c
  (ids', documentsWithId) <- insertDocs masterUserId masterCorpusId (map (toNode masterUserId masterCorpusId) hs )
  _ <- Doc.add masterCorpusId ids'
  -- TODO
  -- create a corpus with database name (CSV or PubMed)
  -- add documents to the corpus (create node_node link)
  -- this will enable global database monitoring

  -- maps :: IO Map Ngrams (Map NgramsType (Map NodeId Int))
  -- Extract the ngrams of every inserted document, indexed by
  -- ngrams -> ngrams type -> document id -> count.
  mapNgramsDocs' :: HashMap ExtractedNgrams (Map NgramsType (Map NodeId Int))
                <- mapNodeIdNgrams
                <$> documentIdWithNgrams
                      (extractNgramsT $ withLang lang documentsWithId)
                      documentsWithId

  lId <- getOrMkList masterCorpusId masterUserId
  _ <- saveDocNgramsWith lId mapNgramsDocs'

  -- _cooc <- insertDefaultNode NodeListCooc lId masterUserId
  pure ids'
340
-- | Persist extracted ngrams for a list: insert the ngrams rows, the
--   list/ngrams associations, and the context-node-ngrams links that
--   carry the per-document weights.
saveDocNgramsWith :: ( FlowCmdM env err m)
                  => ListId
                  -> HashMap ExtractedNgrams (Map NgramsType (Map NodeId Int))
                  -> m ()
saveDocNgramsWith lId mapNgramsDocs' = do
  -- Insert the raw ngrams and get back their database ids.
  terms2id <- insertExtractedNgrams $ HashMap.keys mapNgramsDocs'
  printDebug "terms2id" terms2id

  let mapNgramsDocs = HashMap.mapKeys extracted2ngrams mapNgramsDocs'

  -- new
  mapCgramsId <- listInsertDb lId toNodeNgramsW'
               $ map (first _ngramsTerms . second Map.keys)
               $ HashMap.toList mapNgramsDocs

  printDebug "saveDocNgramsWith" mapCgramsId
  -- insertDocNgrams
  -- One link row per (ngrams, ngrams type, document); tuples whose
  -- cgrams id cannot be resolved are dropped by 'catMaybes'.
  _return <- insertContextNodeNgrams2
           $ catMaybes [ ContextNodeNgrams2 <$> Just nId
                                            <*> (getCgramsId mapCgramsId ngrams_type (_ngramsTerms terms''))
                                            <*> Just (fromIntegral w :: Double)
                       | (terms'', mapNgramsTypes) <- HashMap.toList mapNgramsDocs
                       , (ngrams_type, mapNodeIdWeight) <- Map.toList mapNgramsTypes
                       , (nId, w) <- Map.toList mapNodeIdWeight
                       ]

  -- to be removed
  _ <- insertDocNgrams lId $ HashMap.mapKeys (indexNgrams terms2id) mapNgramsDocs

  pure ()
371
372
373 ------------------------------------------------------------------------
374 -- TODO Type NodeDocumentUnicised
-- | Insert documents (made unique via 'addUniqId') under a corpus and
--   return the new context ids together with the documents indexed by
--   their id.
insertDocs :: ( FlowCmdM env err m
              -- , FlowCorpus a
              , FlowInsertDB a
              )
           => UserId
           -> CorpusId
           -> [a]
           -> m ([ContextId], [Indexed ContextId a])
insertDocs uId cId hs = do
  let uniqDocs = map addUniqId hs
  returned <- insertDb uId cId uniqDocs
  -- printDebug "newIds" newIds
  let contextIds = map reId returned
      indexedDocs = mergeData (toInserted returned) (Map.fromList $ map viewUniqId' uniqDocs)
  _ <- Doc.add cId contextIds
  pure (contextIds, indexedDocs)
392
393
394 ------------------------------------------------------------------------
-- | Pair a value with its unique-id hash; panics when the hash is
--   missing (it is expected to have been set by 'addUniqId').
viewUniqId' :: UniqId a
            => a
            -> (Hash, a)
viewUniqId' d =
  case view uniqId d of
    Just h  -> (h, d)
    Nothing -> panic "[ERROR] Database.Flow.toInsert"
401
402
-- | Index the ids that were actually inserted by their unique hash,
--   dropping the ones the database reported as not inserted.
toInserted :: [ReturnId]
           -> Map Hash ReturnId
toInserted =
  Map.fromList . map (\r -> (reUniqId r, r) )
  -- Idiom fix: 'reInserted r == True' is a redundant comparison;
  -- the Bool field is a predicate already.
  . filter reInserted
408
-- | Join the inserted ids with the original documents by hash.
--   Documents whose hash has no matching 'ReturnId' are dropped.
mergeData :: Map Hash ReturnId
          -> Map Hash a
          -> [Indexed NodeId a]
mergeData inserted = catMaybes . map pairWithId . Map.toList
  where
    pairWithId (sha, hpd) =
      (\ret -> Indexed (reId ret) hpd) <$> lookup sha inserted
417
418 ------------------------------------------------------------------------
419 ------------------------------------------------------------------------
420 ------------------------------------------------------------------------
-- | Run an ngrams-extraction action over each indexed document,
--   pairing every document with its extracted ngrams.
documentIdWithNgrams :: HasNodeError err
                     => (a
                     -> Cmd err (HashMap b (Map NgramsType Int)))
                     -> [Indexed NodeId a]
                     -> Cmd err [DocumentIdWithNgrams a b]
documentIdWithNgrams extract = traverse pairWithNgrams
  where
    pairWithNgrams indexed =
      DocumentIdWithNgrams indexed <$> extract (_unIndex indexed)
431
432
433 -- | TODO check optimization
-- | Merge per-document ngrams maps into one map indexed by ngrams,
--   then ngrams type, then document id, summing counts on collisions.
mapNodeIdNgrams :: (Ord b, Hashable b)
                => [DocumentIdWithNgrams a b]
                -> HashMap b
                       (Map NgramsType
                            (Map NodeId Int)
                       )
mapNodeIdNgrams docs =
  HashMap.unionsWith (Map.unionWith (Map.unionWith (+))) (map perDocument docs)
  where
    -- Tag every count with the id of the document it came from.
    perDocument d =
      HashMap.map (Map.map (Map.singleton docId)) (documentNgrams d)
      where
        docId = _index $ documentWithId d
447
448
449 ------------------------------------------------------------------------
-- | For a contact, only the last name is extracted, as an 'Authors'
--   ngrams with weight 1; ngrams are truncated to 255 characters.
instance ExtractNgramsT HyperdataContact
  where
    extractNgramsT l hc = HashMap.mapKeys (cleanExtractedNgrams 255) <$> extract l hc
      where
        extract :: TermType Lang -> HyperdataContact
                -> Cmd err (HashMap ExtractedNgrams (Map NgramsType Int))
        extract _l hc' = do
          -- Missing last name falls back to the literal "Nothing".
          let authors = map text2ngrams
                     $ maybe ["Nothing"] (\a -> [a])
                     $ view (hc_who . _Just . cw_lastName) hc'

          pure $ HashMap.fromList $ [(SimpleNgrams a', Map.singleton Authors 1) | a' <- authors ]
462
463
-- | Extract ngrams from a document: source, institutes and authors as
--   simple ngrams, plus terms from the document text enriched with POS
--   tags. Ngrams are truncated to 255 characters.
instance ExtractNgramsT HyperdataDocument
  where
    extractNgramsT :: TermType Lang
                   -> HyperdataDocument
                   -> Cmd err (HashMap ExtractedNgrams (Map NgramsType Int))
    extractNgramsT lang hd = HashMap.mapKeys (cleanExtractedNgrams 255) <$> extractNgramsT' lang hd
      where
        extractNgramsT' :: TermType Lang
                        -> HyperdataDocument
                        -> Cmd err (HashMap ExtractedNgrams (Map NgramsType Int))
        extractNgramsT' lang' doc = do
          -- Missing metadata fields fall back to the literal "Nothing";
          -- institutes and authors are comma-separated lists.
          let source = text2ngrams
                     $ maybe "Nothing" identity
                     $ _hd_source doc

              institutes = map text2ngrams
                         $ maybe ["Nothing"] (map toSchoolName . (T.splitOn ", "))
                         $ _hd_institutes doc

              authors = map text2ngrams
                      $ maybe ["Nothing"] (T.splitOn ", ")
                      $ _hd_authors doc

          -- Terms are extracted from the document text in IO and
          -- enriched with CoreNLP POS tags (noun phrases).
          terms' <- map (enrichedTerms (lang' ^. tt_lang) CoreNLP NP)
                 <$> concat
                 <$> liftBase (extractTerms lang' $ hasText doc)

          pure $ HashMap.fromList
               $ [(SimpleNgrams source, Map.singleton Sources 1) ]
               <> [(SimpleNgrams i', Map.singleton Institutes 1) | i' <- institutes ]
               <> [(SimpleNgrams a', Map.singleton Authors 1) | a' <- authors ]
               <> [(EnrichedNgrams t', Map.singleton NgramsTerms 1) | t' <- terms' ]
496
-- | Extraction on a 'Node' delegates to its hyperdata payload.
instance (ExtractNgramsT a, HasText a) => ExtractNgramsT (Node a)
  where
    extractNgramsT l (Node _ _ _ _ _ _ _ h) = extractNgramsT l h
500
-- | A 'Node' has text exactly when its hyperdata payload does.
instance HasText a => HasText (Node a)
  where
    hasText (Node _ _ _ _ _ _ _ h) = hasText h
504
505
506
507 -- | TODO putelsewhere
508 -- | Upgrade function
509 -- Suppose all documents are English (this is the case actually)
-- | Upgrade function: re-extract ngrams (with POS tags) for every
--   document in every corpus under the master user's root, in batches
--   of 1000. Documents are assumed English (see 'extractInsert').
indexAllDocumentsWithPosTag :: FlowCmdM env err m
                            => m ()
indexAllDocumentsWithPosTag = do
  rootId <- getRootId (UserName userMaster)
  corpusIds <- findNodesId rootId [NodeCorpus]
  docs <- List.concat <$> mapM getDocumentsWithParentId corpusIds
  -- Idiom fix: per-batch results are discarded, so use mapM_ instead
  -- of binding mapM's result to a wildcard.
  mapM_ extractInsert (splitEvery 1000 docs)
518
-- | Extract ngrams for a batch of documents and insert them in the
--   database. Language is hard-coded to @Multi EN@ (see caller
--   'indexAllDocumentsWithPosTag').
extractInsert :: FlowCmdM env err m
              => [Node HyperdataDocument] -> m ()
extractInsert docs = do
  -- Index each document by its node id before extraction.
  let documentsWithId = map (\doc -> Indexed (doc ^. node_id) doc) docs
  mapNgramsDocs' <- mapNodeIdNgrams
                <$> documentIdWithNgrams
                      (extractNgramsT $ withLang (Multi EN) documentsWithId)
                      documentsWithId
  _ <- insertExtractedNgrams $ HashMap.keys mapNgramsDocs'
  pure ()
529
530