]> Git — Sourcephile - gargantext.git/blob - src/Gargantext/Database/Action/Flow.hs
[conduit] implement conduit for Hal, Pubmed
[gargantext.git] / src / Gargantext / Database / Action / Flow.hs
1 {-|
2 Module : Gargantext.Database.Action.Flow
3 Description : Database Flow
4 Copyright : (c) CNRS, 2017-Present
5 License : AGPL + CECILL v3
6 Maintainer : team@gargantext.org
7 Stability : experimental
8 Portability : POSIX
9
10 -- TODO-ACCESS:
11 -- check userId CanFillUserCorpus userCorpusId
12 -- check masterUserId CanFillMasterCorpus masterCorpusId
13
14 -- TODO-ACCESS: check uId CanInsertDoc pId && checkDocType nodeType
15 -- TODO-EVENTS: InsertedNodes
16 -}
17
18 {-# OPTIONS_GHC -fno-warn-orphans #-}
19
20 {-# LANGUAGE ConstrainedClassMethods #-}
21 {-# LANGUAGE ConstraintKinds #-}
22 {-# LANGUAGE InstanceSigs #-}
23 {-# LANGUAGE ScopedTypeVariables #-}
24 {-# LANGUAGE TemplateHaskell #-}
25
26 module Gargantext.Database.Action.Flow -- (flowDatabase, ngrams2list)
27 ( DataText(..)
28 , getDataText
29 , flowDataText
30 , flow
31
32 , flowCorpusFile
33 , flowCorpus
34 , flowAnnuaire
35 , insertMasterDocs
36 , saveDocNgramsWith
37
38 , getOrMkRoot
39 , getOrMk_RootWithCorpus
40 , TermType(..)
41 , DataOrigin(..)
42 , allDataOrigins
43
44 , do_api
45 , indexAllDocumentsWithPosTag
46 )
47 where
48
49 import Conduit
50 import Control.Lens ((^.), view, _Just, makeLenses)
51 import Data.Aeson.TH (deriveJSON)
52 import Data.Conduit.Internal (zipSources)
53 import Data.Either
54 import Data.HashMap.Strict (HashMap)
55 import Data.Hashable (Hashable)
56 import Data.List (concat)
57 import Data.Map (Map, lookup)
58 import Data.Maybe (catMaybes)
59 import Data.Monoid
60 import Data.Swagger
61 import qualified Data.Text as T
62 import Data.Traversable (traverse)
63 import Data.Tuple.Extra (first, second)
64 import GHC.Generics (Generic)
65 import Servant.Client (ClientError)
66 import System.FilePath (FilePath)
67 import qualified Data.HashMap.Strict as HashMap
68 import qualified Gargantext.Data.HashMap.Strict.Utils as HashMap
69 import qualified Data.Map as Map
70
71 import Gargantext.API.Admin.Orchestrator.Types (JobLog(..))
72 import Gargantext.Core (Lang(..), PosTagAlgo(..))
73 import Gargantext.Core.Ext.IMT (toSchoolName)
74 import Gargantext.Core.Ext.IMTUser (readFile_Annuaire)
75 import Gargantext.Core.Flow.Types
76 import Gargantext.Core.Text
77 import Gargantext.Core.Text.Corpus.Parsers (parseFile, FileFormat)
78 import Gargantext.Core.Text.List (buildNgramsLists)
79 import Gargantext.Core.Text.List.Group.WithStem ({-StopSize(..),-} GroupParams(..))
80 import Gargantext.Core.Text.List.Social (FlowSocialListWith)
81 import Gargantext.Core.Text.Terms
82 import Gargantext.Core.Text.Terms.Mono.Stem.En (stemIt)
83 import Gargantext.Core.Types (POS(NP))
84 import Gargantext.Core.Types.Individu (User(..))
85 import Gargantext.Core.Types.Main
86 import Gargantext.Core.Utils.Prefix (unPrefix, unPrefixSwagger)
87 import Gargantext.Database.Action.Flow.List
88 import Gargantext.Database.Action.Flow.Types
89 import Gargantext.Database.Action.Flow.Utils (insertDocNgrams, DocumentIdWithNgrams(..))
90 import Gargantext.Database.Action.Search (searchDocInDatabase)
91 import Gargantext.Database.Admin.Config (userMaster, corpusMasterName)
92 import Gargantext.Database.Action.Metrics (updateNgramsOccurrences)
93 import Gargantext.Database.Admin.Types.Hyperdata
94 import Gargantext.Database.Admin.Types.Node -- (HyperdataDocument(..), NodeType(..), NodeId, UserId, ListId, CorpusId, RootId, MasterCorpusId, MasterUserId)
95 import Gargantext.Database.Prelude
96 import Gargantext.Database.Query.Table.ContextNodeNgrams2
97 import Gargantext.Database.Query.Table.Ngrams
98 import Gargantext.Database.Query.Table.Node
99 import Gargantext.Database.Query.Table.Node.Document.Insert -- (insertDocuments, ReturnId(..), addUniqIdsDoc, addUniqIdsContact, ToDbData(..))
100 import Gargantext.Database.Query.Table.Node.Error (HasNodeError(..))
101 import Gargantext.Database.Query.Table.NodeNgrams (listInsertDb , getCgramsId)
102 import Gargantext.Database.Query.Tree.Root (getOrMkRoot, getOrMk_RootWithCorpus)
103 import Gargantext.Database.Schema.Node (NodePoly(..), node_id)
104 import Gargantext.Database.Types
105 import Gargantext.Prelude
106 import Gargantext.Prelude.Crypto.Hash (Hash)
107 import qualified Gargantext.Core.Text.Corpus.API as API
108 import qualified Gargantext.Database.Query.Table.Node.Document.Add as Doc (add)
109 import qualified Prelude as Prelude
110
111 ------------------------------------------------------------------------
112 -- Imports for upgrade function
113 import Gargantext.Database.Query.Tree.Root (getRootId)
114 import Gargantext.Database.Query.Tree (findNodesId)
115 import qualified Data.List as List
116 ------------------------------------------------------------------------
117 -- TODO use internal with API name (could be old data)
-- | Where the documents of a flow come from.  Both constructors carry the
-- external API concerned in the shared '_do_api' field.
data DataOrigin
  = InternalOrigin { _do_api :: API.ExternalAPIs }
    -- 'getDataText' answers this origin by searching the master corpus.
  | ExternalOrigin { _do_api :: API.ExternalAPIs }
    -- 'getDataText' answers this origin by querying the API with 'API.get'.
  -- TODO Web
  deriving (Generic, Eq)
122
-- Lenses and JSON instances for 'DataOrigin', generated by Template Haskell.
-- The JSON derivation and the Swagger schema below are both configured with
-- the same "_do_" field prefix.
$(makeLenses ''DataOrigin)
$(deriveJSON (unPrefix "_do_") ''DataOrigin)

instance ToSchema DataOrigin
  where
    declareNamedSchema =
      genericDeclareNamedSchema (unPrefixSwagger "_do_")
127
-- | Every 'DataOrigin': all the internal origins first, then all the external
-- ones, each group following the order of 'API.externalAPIs'.
allDataOrigins :: [DataOrigin]
allDataOrigins =
  [ mkOrigin api
  | mkOrigin <- [InternalOrigin, ExternalOrigin]
  , api      <- API.externalAPIs
  ]
131
132 ---------------
-- | Documents handed to 'flowDataText'.
data DataText
  = DataOld ![NodeId]
    -- Identifiers of documents that are already stored.
  | DataNew !(Maybe Integer, ConduitT () HyperdataDocument IO ())
    -- An optional document count (used for progress reporting in 'flow')
    -- paired with a conduit producing the documents.
  -- Former shape of the second constructor: DataNew ![[HyperdataDocument]]
136
137 -- TODO use the split parameter in config file
-- | Fetch the documents matching a query from the given origin.
--
-- * 'ExternalOrigin': the query, the language of the term type and the limit
--   are forwarded to 'API.get'; a successful answer is wrapped in 'DataNew'
--   and a client error is returned as 'Left'.
-- * 'InternalOrigin': the stemmed query is searched in the corpus of the
--   master user and the ids found are returned as 'DataOld'.  The term type
--   and the limit are ignored on this path.
getDataText :: FlowCmdM env err m
            => DataOrigin
            -> TermType Lang
            -> API.Query
            -> Maybe API.Limit
            -> m (Either ClientError DataText)
getDataText origin termType query limit =
  case origin of
    ExternalOrigin api ->
      liftBase $ fmap DataNew <$> API.get api (_tt_lang termType) query limit

    InternalOrigin _ -> do
      (_, _, masterCorpusId) <-
        getOrMk_RootWithCorpus (UserName userMaster) (Left "") noHyperdata
      hits <- searchDocInDatabase masterCorpusId (stemIt query)
      pure (Right (DataOld (map fst hits)))
  where
    noHyperdata :: Maybe HyperdataCorpus
    noHyperdata = Nothing
155
156 -------------------------------------------------------------------------------
-- | Flow a 'DataText' into the corpus identified by the given 'CorpusId'.
--
-- 'DataOld' ids are handed directly to 'flowCorpusUser' (the logging callback
-- is not used on that path).  'DataNew' documents go through 'flowCorpus',
-- with the 'IO' conduit lifted into the flow monad.
flowDataText :: forall env err m.
                ( FlowCmdM env err m
                )
                => User
                -> DataText
                -> TermType Lang
                -> CorpusId
                -> Maybe FlowSocialListWith
                -> (JobLog -> m ())
                -> m CorpusId
flowDataText user dataText termType corpusId mfslw logStatus =
  case dataText of
    DataOld docIds ->
      flowCorpusUser (_tt_lang termType) user target noHyperdata docIds mfslw
    DataNew (mLen, docsC) ->
      flowCorpus user target termType mfslw (mLen, transPipe liftBase docsC) logStatus
  where
    target :: Either CorpusName [CorpusId]
    target = Right [corpusId]

    noHyperdata :: Maybe HyperdataCorpus
    noHyperdata = Nothing
172
173 ------------------------------------------------------------------------
174 -- TODO use proxy
-- | Read the contacts of an annuaire file and run 'flow' on them.
--
-- The whole file is read into a list first (TODO: Conduit for file); its
-- length is passed along as the expected number of documents.  No social
-- list parameter is given to 'flow'.
flowAnnuaire :: (FlowCmdM env err m)
             => User
             -> Either CorpusName [CorpusId]
             -> (TermType Lang)
             -> FilePath
             -> (JobLog -> m ())
             -> m AnnuaireId
flowAnnuaire user corpus termType filePath logStatus = do
  contacts <- liftBase (readFile_Annuaire filePath :: IO [HyperdataContact])
  let total = fromIntegral (length contacts) :: Integer
  flow noHyperdata user corpus termType Nothing (Just total, yieldMany contacts) logStatus
  where
    noHyperdata :: Maybe HyperdataAnnuaire
    noHyperdata = Nothing
186
187 ------------------------------------------------------------------------
-- | Parse a file in the given format and flow its documents into a corpus.
--
-- NOTE: the 'Limit' argument (meant to limit the number of docs, for dev
-- purpose) is currently ignored: every parsed document is flowed.
-- A parse failure aborts with 'panic'.
flowCorpusFile :: (FlowCmdM env err m)
           => User
           -> Either CorpusName [CorpusId]
           -> Limit -- Limit the number of docs (for dev purpose)
           -> TermType Lang -> FileFormat -> FilePath
           -> Maybe FlowSocialListWith
           -> (JobLog -> m ())
           -> m CorpusId
flowCorpusFile user corpus _limit termType format path mfslw logStatus = do
  parseResult <- liftBase (parseFile format path)
  case parseResult of
    Left parseErr ->
      panic ("Error: " <> T.pack parseErr)
    Right parsed ->
      let total = fromIntegral (length parsed) :: Integer
          docsC = yieldMany parsed .| mapC toHyperdataDocument
       in flowCorpus user corpus termType mfslw (Just total, docsC) logStatus
204
205 ------------------------------------------------------------------------
-- | 'flow' specialised to a corpus: no corpus hyperdata is supplied (the
-- first argument of 'flow' is 'Nothing' at type 'Maybe' 'HyperdataCorpus').
--
-- TODO improve the needed type to create/update a corpus
-- (For now, Either is enough)
flowCorpus :: (FlowCmdM env err m, FlowCorpus a)
           => User
           -> Either CorpusName [CorpusId]
           -> TermType Lang
           -> Maybe FlowSocialListWith
           -> (Maybe Integer, ConduitT () a m ())
           -> (JobLog -> m ())
           -> m CorpusId
flowCorpus = flow noCorpusHyperdata
  where
    noCorpusHyperdata :: Maybe HyperdataCorpus
    noCorpusHyperdata = Nothing
217
218
-- | Insert a stream of documents into the master corpus, one document at a
-- time, then attach every resulting id to the user corpus with
-- 'flowCorpusUser'.
--
-- The first component of the pair is the expected number of documents.  When
-- it is known, the logging callback is invoked after each document with the
-- number of documents processed so far and the number still remaining; when
-- it is 'Nothing' no progress is reported.
flow :: forall env err m a c.
        ( FlowCmdM env err m
        , FlowCorpus a
        , MkCorpus c
        )
        => Maybe c
        -> User
        -> Either CorpusName [CorpusId]
        -> TermType Lang
        -> Maybe FlowSocialListWith
        -> (Maybe Integer, ConduitT () a m ())
        -> (JobLog -> m ())
        -> m CorpusId
flow c u cn la mfslw (mLength, docsC) logStatus = do
  -- TODO if public insertMasterDocs else insertUserDocs
  idss <- runConduit $
            zipSources (yieldMany [1..]) docsC
              .| mapMC insertDoc
              .| sinkList
  flowCorpusUser (la ^. tt_lang) u cn c (concat idss) mfslw

  where
    -- Insert one document (paired with its 1-based position in the stream)
    -- and report progress.  All the ids returned by 'insertMasterDocs' are
    -- kept: taking the head of that list would crash on an empty result.
    insertDoc :: (Integer, a) -> m [NodeId]
    insertDoc (idx, doc) = do
      ids <- insertMasterDocs c la [doc]
      case mLength of
        Nothing  -> pure ()
        Just len ->
          -- idx starts at 1, so idx documents have been processed here and
          -- succeeded + remaining adds up to len.
          logStatus JobLog { _scst_succeeded = Just $ fromIntegral idx
                           , _scst_failed    = Just 0
                           , _scst_remaining = Just $ fromIntegral $ len - idx
                           , _scst_events    = Just []
                           }
      pure ids
262
263
264
265 ------------------------------------------------------------------------
-- | Attach documents (given by id) to a user's corpus and build the ngrams
-- lists of that corpus.
--
-- Steps, in order: get or create the user's root and corpus; make sure the
-- NodeTexts node then the list node exist; add the documents to the corpus;
-- get or create the master user's corpus; build the ngrams lists and store
-- them in the user's list; make sure the master list, the dashboard node and
-- the graph node exist; update the ngrams occurrences.  The id of the user's
-- corpus is returned.
flowCorpusUser :: ( FlowCmdM env err m
                  , MkCorpus c
                  )
               => Lang
               -> User
               -> Either CorpusName [CorpusId]
               -> Maybe c
               -> [NodeId]
               -> Maybe FlowSocialListWith
               -> m CorpusId
flowCorpusUser lang user corpusName ctype docIds mfslw = do
  -- User flow: NodeTexts comes first, the list node second.
  (userId, _, userCorpusId) <- getOrMk_RootWithCorpus user corpusName ctype
  _ <- insertDefaultNodeIfNotExists NodeTexts userCorpusId userId
  userListId <- getOrMkList userCorpusId userId

  -- TODO: check if present already, ignore
  _ <- Doc.add userCorpusId docIds

  -- User list flow
  (masterUserId, _, masterCorpusId) <-
    getOrMk_RootWithCorpus (UserName userMaster) (Left "") ctype

  let groupParams = GroupWithPosTag lang CoreNLP HashMap.empty
  ngramsLists <- buildNgramsLists user userCorpusId masterCorpusId mfslw groupParams

  _ <- flowList_DbRepo userListId ngramsLists
  _ <- getOrMkList masterCorpusId masterUserId

  -- User graph flow
  _ <- insertDefaultNodeIfNotExists NodeDashboard userCorpusId userId
  _ <- insertDefaultNodeIfNotExists NodeGraph userCorpusId userId

  _ <- updateNgramsOccurrences userCorpusId (Just userListId)

  pure userCorpusId
314
315
-- | Insert documents into the corpus of the master user, extract their
-- ngrams and save those in the master list.  Returns the ids produced by
-- 'insertDocs'.
--
-- TODO
-- create a corpus with database name (CSV or PubMed)
-- add documents to the corpus (create node_node link)
-- this will enable global database monitoring
insertMasterDocs :: ( FlowCmdM env err m
                    , FlowCorpus a
                    , MkCorpus   c
                    )
                 => Maybe c
                 -> TermType Lang
                 -> [a]
                 -> m [DocId]
insertMasterDocs c lang hs = do
  (masterUserId, _, masterCorpusId) <-
    getOrMk_RootWithCorpus (UserName userMaster) (Left corpusMasterName) c

  (docIds, indexedDocs) <-
    insertDocs masterUserId masterCorpusId (map (toNode masterUserId masterCorpusId) hs)
  _ <- Doc.add masterCorpusId docIds

  -- Ngrams of each indexed document, then regrouped per ngram:
  -- ngram -> ngrams type -> document id -> count
  docsWithNgrams <-
    documentIdWithNgrams (extractNgramsT (withLang lang indexedDocs)) indexedDocs
  let ngramsByDoc :: HashMap ExtractedNgrams (Map NgramsType (Map NodeId Int))
      ngramsByDoc = mapNodeIdNgrams docsWithNgrams

  masterListId <- getOrMkList masterCorpusId masterUserId
  _ <- saveDocNgramsWith masterListId ngramsByDoc

  pure docIds
345
-- | Save the ngrams extracted from documents under the given list.
--
-- The extracted ngrams are inserted first, then the list entries
-- ('listInsertDb'), then one 'ContextNodeNgrams2' row per
-- (document, ngrams type, ngram) triple for which 'getCgramsId' finds an id,
-- weighted by the count converted to 'Double'.  The older 'insertDocNgrams'
-- table is still filled as well (to be removed).
saveDocNgramsWith :: ( FlowCmdM env err m)
                  => ListId
                  -> HashMap ExtractedNgrams (Map NgramsType (Map NodeId Int))
                  -> m ()
saveDocNgramsWith lId mapNgramsDocs' = do
  terms2id <- insertExtractedNgrams (HashMap.keys mapNgramsDocs')

  let byNgrams = HashMap.mapKeys extracted2ngrams mapNgramsDocs'
      listEntries =
        [ (_ngramsTerms ngrams, Map.keys byType)
        | (ngrams, byType) <- HashMap.toList byNgrams
        ]

  -- new
  cgramsIds <- listInsertDb lId toNodeNgramsW' listEntries

  let contextRows =
        [ ContextNodeNgrams2 nId cgramsId (fromIntegral weight :: Double)
        | (ngrams, byType)   <- HashMap.toList byNgrams
        , (ngramsType, byDoc) <- Map.toList byType
        , (nId, weight)       <- Map.toList byDoc
        , Just cgramsId       <- [getCgramsId cgramsIds ngramsType (_ngramsTerms ngrams)]
        ]
  _ <- insertContextNodeNgrams2 contextRows

  -- to be removed
  _ <- insertDocNgrams lId (HashMap.mapKeys (indexNgrams terms2id) byNgrams)

  pure ()
376
377
378 ------------------------------------------------------------------------
379 -- TODO Type NodeDocumentUnicised
-- | Insert documents under a corpus for a user.
--
-- Each document first gets its unique id ('addUniqId').  The result pairs
-- the ids of every row returned by 'insertDb' with the documents that were
-- flagged as inserted (see 'toInserted'), indexed by their id.  The returned
-- ids are also added to the corpus with 'Doc.add'.
insertDocs :: ( FlowCmdM env err m
              -- , FlowCorpus a
              , FlowInsertDB a
              )
              => UserId
              -> CorpusId
              -> [a]
              -> m ([ContextId], [Indexed ContextId a])
insertDocs uId cId hs = do
  returned <- insertDb uId cId hashedDocs
  let contextIds = map reId returned
  _ <- Doc.add cId contextIds
  pure (contextIds, mergeData (toInserted returned) docsByHash)
  where
    hashedDocs = map addUniqId hs
    docsByHash = Map.fromList (map viewUniqId' hashedDocs)
397
398
399 ------------------------------------------------------------------------
-- | Pair a document with its unique hash.  Panics when the document has no
-- hash, so it must only be applied after the hash has been set.
viewUniqId' :: UniqId a
            => a
            -> (Hash, a)
viewUniqId' doc =
  case view uniqId doc of
    Just docHash -> (docHash, doc)
    Nothing      -> panic "[ERROR] Database.Flow.toInsert"
406
407
-- | Index by unique hash the returned rows that are flagged as inserted
-- ('reInserted'); rows not flagged are dropped.  When several kept rows share
-- a hash, the last one wins ('Map.fromList' semantics).
toInserted :: [ReturnId]
           -> Map Hash ReturnId
toInserted returnIds =
  -- 'reInserted' is already a Bool: no need to compare it with True.
  Map.fromList [ (reUniqId r, r) | r <- returnIds, reInserted r ]
413
-- | Join documents with their returned rows on the hash: each document whose
-- hash is present in the first map is indexed with the id of the matching
-- row ('reId'); documents without a match are dropped.  The result follows
-- the key order of the second map.
mergeData :: Map Hash ReturnId
          -> Map Hash a
          -> [Indexed NodeId a]
mergeData rs docsByHash =
  [ Indexed (reId returned) doc
  | (sha, doc)    <- Map.toList docsByHash
  , Just returned <- [lookup sha rs]
  ]
422
423 ------------------------------------------------------------------------
424 ------------------------------------------------------------------------
425 ------------------------------------------------------------------------
-- | Run the extraction function on the content of every indexed document,
-- pairing each document with what was extracted from it.
documentIdWithNgrams :: HasNodeError err
                     => (a
                     -> Cmd err (HashMap b (Map NgramsType Int)))
                     -> [Indexed NodeId a]
                     -> Cmd err [DocumentIdWithNgrams a b]
documentIdWithNgrams f = traverse withNgrams
  where
    withNgrams indexedDoc =
      DocumentIdWithNgrams indexedDoc <$> f (_unIndex indexedDoc)
436
437
-- | Regroup per-document ngrams counts per ngram:
-- ngram -> ngrams type -> document id -> count.
-- Counts landing on the same (ngram, type, document) key are added up.
--
-- TODO check optimization
mapNodeIdNgrams :: (Ord b, Hashable b)
                => [DocumentIdWithNgrams a b]
                -> HashMap b
                       (Map NgramsType
                            (Map NodeId Int)
                       )
mapNodeIdNgrams docs = HashMap.unionsWith mergeTypes (map perDocument docs)
  where
    -- Merge the per-type maps, then the per-document maps, adding counts.
    mergeTypes = Map.unionWith (Map.unionWith (+))

    -- Tag every count of one document with the id of that document.
    perDocument :: DocumentIdWithNgrams a b
                -> HashMap b (Map NgramsType (Map NodeId Int))
    perDocument doc =
      fmap (fmap (Map.singleton (_index (documentWithId doc)))) (documentNgrams doc)
452
453
454 ------------------------------------------------------------------------
-- | Ngrams of a contact: a single 'Authors' ngram built from the last name,
-- or from the text "Nothing" when no last name is found.  Keys go through
-- 'cleanExtractedNgrams' 255.  The term type is not used.
instance ExtractNgramsT HyperdataContact where
  extractNgramsT _lang contact =
    pure (HashMap.mapKeys (cleanExtractedNgrams 255) authorNgrams)
    where
      lastNames =
        maybe ["Nothing"] (\name -> [name])
              (view (hc_who . _Just . cw_lastName) contact)

      authorNgrams =
        HashMap.fromList
          [ (SimpleNgrams (text2ngrams name), Map.singleton Authors 1)
          | name <- lastNames
          ]
467
468
-- | Ngrams of a document: its source, its institutes and its authors (the
-- last two split on ", "; each field defaults to the text "Nothing" when
-- absent) as simple ngrams, plus the terms extracted from the document text
-- as enriched ngrams.  Every ngram gets a count of 1 and keys go through
-- 'cleanExtractedNgrams' 255.
instance ExtractNgramsT HyperdataDocument where
  extractNgramsT :: TermType Lang
                 -> HyperdataDocument
                 -> Cmd err (HashMap ExtractedNgrams (Map NgramsType Int))
  extractNgramsT lang doc = do
    termGroups <- liftBase (extractTerms lang (hasText doc))
    let enriched = map (enrichedTerms (lang ^. tt_lang) CoreNLP NP) (concat termGroups)
        ngrams   = HashMap.fromList $ concat
          [ [ (SimpleNgrams source, Map.singleton Sources     1) ]
          , [ (SimpleNgrams i     , Map.singleton Institutes  1) | i <- institutes ]
          , [ (SimpleNgrams a     , Map.singleton Authors     1) | a <- authors    ]
          , [ (EnrichedNgrams t   , Map.singleton NgramsTerms 1) | t <- enriched   ]
          ]
    pure (HashMap.mapKeys (cleanExtractedNgrams 255) ngrams)
    where
      source =
        text2ngrams (maybe "Nothing" identity (_hd_source doc))

      institutes =
        map text2ngrams
            (maybe ["Nothing"] (map toSchoolName . T.splitOn ", ") (_hd_institutes doc))

      authors =
        map text2ngrams
            (maybe ["Nothing"] (T.splitOn ", ") (_hd_authors doc))
501
-- | Ngrams of a node are the ngrams of the value held in its last field.
instance (ExtractNgramsT a, HasText a) => ExtractNgramsT (Node a) where
  extractNgramsT lang node =
    case node of
      Node _ _ _ _ _ _ _ hyperdata -> extractNgramsT lang hyperdata
505
-- | The text of a node is the text of the value held in its last field.
instance HasText a => HasText (Node a) where
  hasText node =
    case node of
      Node _ _ _ _ _ _ _ hyperdata -> hasText hyperdata
509
510
511
-- | Upgrade function (TODO put elsewhere): re-extract and insert the ngrams
-- of every document found under the corpora of the master user, in batches
-- of 1000 documents.
-- Suppose all documents are English (this is the case actually)
indexAllDocumentsWithPosTag :: FlowCmdM env err m
                            => m ()
indexAllDocumentsWithPosTag = do
  masterRootId    <- getRootId (UserName userMaster)
  masterCorpusIds <- findNodesId masterRootId [NodeCorpus]
  docsPerCorpus   <- mapM getDocumentsWithParentId masterCorpusIds
  let batches = splitEvery 1000 (List.concat docsPerCorpus)
  _ <- mapM extractInsert batches
  pure ()
523
-- | Extract the ngrams of a batch of document nodes, with the term type
-- derived from @Multi EN@, and insert the extracted ngrams.  Only the
-- ngrams themselves (the keys of the extraction map) are inserted here.
extractInsert :: FlowCmdM env err m
              => [Node HyperdataDocument] -> m ()
extractInsert docs = do
  docsWithNgrams <- documentIdWithNgrams (extractNgramsT termType) indexedDocs
  _ <- insertExtractedNgrams (HashMap.keys (mapNodeIdNgrams docsWithNgrams))
  pure ()
  where
    indexedDocs = [ Indexed (view node_id doc) doc | doc <- docs ]
    termType    = withLang (Multi EN) indexedDocs
534
535