{-|
Module      : Gargantext.Database.Action.Flow
Description : Database Flow
Copyright   : (c) CNRS, 2017-Present
License     : AGPL + CECILL v3
Maintainer  : team@gargantext.org
Stability   : experimental
Portability : POSIX

-- TODO-ACCESS:
--   check userId       CanFillUserCorpus   userCorpusId
--   check masterUserId CanFillMasterCorpus masterCorpusId

-- TODO-ACCESS: check uId CanInsertDoc pId && checkDocType nodeType
-- TODO-EVENTS: InsertedNodes
-}

{-# OPTIONS_GHC -fno-warn-orphans #-}

{-# LANGUAGE ConstrainedClassMethods #-}
{-# LANGUAGE ConstraintKinds         #-}
{-# LANGUAGE InstanceSigs            #-}
{-# LANGUAGE ScopedTypeVariables     #-}
{-# LANGUAGE TemplateHaskell         #-}

module Gargantext.Database.Action.Flow -- (flowDatabase, ngrams2list)
  ( DataText(..)
  , getDataText
  , getDataText_Debug
  , flowDataText
  , flow

  , flowCorpusFile
  , flowCorpus
  , flowCorpusUser
  , flowAnnuaire
  , insertMasterDocs
  , saveDocNgramsWith

  , getOrMkRoot
  , getOrMk_RootWithCorpus
  , TermType(..)
  , DataOrigin(..)
  , allDataOrigins

  , do_api
  , indexAllDocumentsWithPosTag
  )
    where

import Conduit
import Control.Lens ((^.), view, _Just, makeLenses, over, traverse)
import Data.Aeson.TH (deriveJSON)
import Data.Conduit.Internal (zipSources)
import Data.Either
import Data.HashMap.Strict (HashMap)
import Data.Hashable (Hashable)
import Data.List (concat)
import Data.Map (Map, lookup)
import Data.Maybe (catMaybes)
import Data.Monoid
import Data.Swagger
import qualified Data.Text as T
import Data.Tuple.Extra (first, second)
import GHC.Generics (Generic)
import Servant.Client (ClientError)
import System.FilePath (FilePath)
import qualified Data.HashMap.Strict as HashMap
import qualified Gargantext.Data.HashMap.Strict.Utils as HashMap
import qualified Data.Map as Map
import qualified Data.Conduit.List as CL
import qualified Data.Conduit as C

import Gargantext.API.Admin.Orchestrator.Types (JobLog(..))
import Gargantext.Core (Lang(..), PosTagAlgo(..))
import Gargantext.Core.Ext.IMT (toSchoolName)
import Gargantext.Core.Ext.IMTUser (readFile_Annuaire)
import Gargantext.Core.Flow.Types
import Gargantext.Core.Text
import Gargantext.Core.Text.Corpus.Parsers (parseFile, FileFormat, FileType)
import Gargantext.Core.Text.List (buildNgramsLists)
import Gargantext.Core.Text.List.Group.WithStem ({-StopSize(..),-} GroupParams(..))
import Gargantext.Core.Text.List.Social (FlowSocialListWith)
import Gargantext.Core.Text.Terms
import Gargantext.Core.Text.Terms.Mono.Stem.En (stemIt)
import Gargantext.Core.Types (POS(NP), TermsCount)
import Gargantext.Core.Types.Individu (User(..))
import Gargantext.Core.Types.Main
import Gargantext.Core.Utils (addTuples)
import Gargantext.Core.Utils.Prefix (unPrefix, unPrefixSwagger)
import Gargantext.Database.Action.Flow.List
import Gargantext.Database.Action.Flow.Types
import Gargantext.Database.Action.Flow.Utils (insertDocNgrams, DocumentIdWithNgrams(..))
import Gargantext.Database.Action.Search (searchDocInDatabase)
import Gargantext.Database.Admin.Config (userMaster, corpusMasterName)
import Gargantext.Database.Action.Metrics (updateNgramsOccurrences)
import Gargantext.Database.Admin.Types.Hyperdata
import Gargantext.Database.Admin.Types.Node -- (HyperdataDocument(..), NodeType(..), NodeId, UserId, ListId, CorpusId, RootId, MasterCorpusId, MasterUserId)
import Gargantext.Database.Prelude
import Gargantext.Database.Query.Table.ContextNodeNgrams2
import Gargantext.Database.Query.Table.Ngrams
import Gargantext.Database.Query.Table.Node
import Gargantext.Database.Query.Table.Node.Document.Insert -- (insertDocuments, ReturnId(..), addUniqIdsDoc, addUniqIdsContact, ToDbData(..))
import Gargantext.Database.Query.Table.Node.Error (HasNodeError(..))
import Gargantext.Database.Query.Table.NodeNgrams (listInsertDb , getCgramsId)
import Gargantext.Database.Query.Tree.Root (getOrMkRoot, getOrMk_RootWithCorpus)
import Gargantext.Database.Schema.Node (NodePoly(..), node_id)
import Gargantext.Database.Types
import Gargantext.Prelude
import Gargantext.Prelude.Crypto.Hash (Hash)
import qualified Gargantext.Core.Text.Corpus.API as API
import qualified Gargantext.Database.Query.Table.Node.Document.Add as Doc (add)
import qualified Prelude

------------------------------------------------------------------------
-- Imports for upgrade function
import Gargantext.Database.Query.Tree.Root (getRootId)
import Gargantext.Database.Query.Tree (findNodesId)
import qualified Data.List as List
------------------------------------------------------------------------
-- TODO use internal with API name (could be old data)
data DataOrigin = InternalOrigin { _do_api :: API.ExternalAPIs }
                | ExternalOrigin { _do_api :: API.ExternalAPIs }
                -- TODO Web
  deriving (Generic, Eq)

makeLenses ''DataOrigin
deriveJSON (unPrefix "_do_") ''DataOrigin
instance ToSchema DataOrigin where
  declareNamedSchema = genericDeclareNamedSchema (unPrefixSwagger "_do_")

allDataOrigins :: [DataOrigin]
allDataOrigins = map InternalOrigin API.externalAPIs
              <> map ExternalOrigin API.externalAPIs

---------------
data DataText = DataOld ![NodeId]
              | DataNew !(Maybe Integer, ConduitT () HyperdataDocument IO ())
--- | DataNew ![[HyperdataDocument]]

-- Show instance is not possible because of IO
printDataText :: DataText -> IO ()
printDataText (DataOld xs) = putStrLn $ show xs
printDataText (DataNew (maybeInt, conduitData)) = do
  res <- C.runConduit (conduitData .| CL.consume)
  putStrLn $ show (maybeInt, res)

-- TODO use the split parameter in config file
getDataText :: FlowCmdM env err m
            => DataOrigin
            -> TermType Lang
            -> API.Query
            -> Maybe API.Limit
            -> m (Either ClientError DataText)
getDataText (ExternalOrigin api) la q li = liftBase $ do
  eRes <- API.get api (_tt_lang la) q li
  pure $ DataNew <$> eRes

getDataText (InternalOrigin _) _la q _li = do
  (_masterUserId, _masterRootId, cId) <- getOrMk_RootWithCorpus
                                           (UserName userMaster)
                                           (Left "")
                                           (Nothing :: Maybe HyperdataCorpus)
  ids <- map fst <$> searchDocInDatabase cId (stemIt q)
  pure $ Right $ DataOld ids

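-- A minimal usage sketch (assuming PubMed is among 'API.externalAPIs' and
-- that 'API.Query' is a plain text query; the surrounding 'FlowCmdM' runner
-- is left abstract):
--
-- > eDt <- getDataText (ExternalOrigin API.PubMed) (Multi EN) "covid" (Just 10)
-- > case eDt of
-- >   Left  err -> liftBase $ putStrLn $ show err
-- >   Right dt  -> liftBase $ printDataText dt
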
getDataText_Debug :: FlowCmdM env err m
                  => DataOrigin
                  -> TermType Lang
                  -> API.Query
                  -> Maybe API.Limit
                  -> m ()
getDataText_Debug a l q li = do
  result <- getDataText a l q li
  case result of
    Left err  -> liftBase $ putStrLn $ show err
    Right res -> liftBase $ printDataText res


-------------------------------------------------------------------------------
flowDataText :: forall env err m.
                ( FlowCmdM env err m
                )
             => User
             -> DataText
             -> TermType Lang
             -> CorpusId
             -> Maybe FlowSocialListWith
             -> (JobLog -> m ())
             -> m CorpusId
flowDataText u (DataOld ids) tt cid mfslw _ = flowCorpusUser (_tt_lang tt) u (Right [cid]) corpusType ids mfslw
  where
    corpusType = (Nothing :: Maybe HyperdataCorpus)
flowDataText u (DataNew (mLen, txtC)) tt cid mfslw logStatus =
  flowCorpus u (Right [cid]) tt mfslw (mLen, (transPipe liftBase txtC)) logStatus

------------------------------------------------------------------------
-- TODO use proxy
flowAnnuaire :: (FlowCmdM env err m)
             => User
             -> Either CorpusName [CorpusId]
             -> (TermType Lang)
             -> FilePath
             -> (JobLog -> m ())
             -> m AnnuaireId
flowAnnuaire u n l filePath logStatus = do
  -- TODO Conduit for file
  docs <- liftBase $ ((readFile_Annuaire filePath) :: IO [HyperdataContact])
  flow (Nothing :: Maybe HyperdataAnnuaire) u n l Nothing (Just $ fromIntegral $ length docs, yieldMany docs) logStatus

------------------------------------------------------------------------
flowCorpusFile :: (FlowCmdM env err m)
               => User
               -> Either CorpusName [CorpusId]
               -> Limit -- Limit the number of docs (for dev purposes; currently ignored)
               -> TermType Lang
               -> FileType
               -> FileFormat
               -> FilePath
               -> Maybe FlowSocialListWith
               -> (JobLog -> m ())
               -> m CorpusId
flowCorpusFile u n _l la ft ff fp mfslw logStatus = do
  eParsed <- liftBase $ parseFile ft ff fp
  case eParsed of
    Right parsed -> do
      flowCorpus u n la mfslw (Just $ fromIntegral $ length parsed, yieldMany parsed .| mapC toHyperdataDocument) logStatus
      --let docs = splitEvery 500 $ take l parsed
      --flowCorpus u n la mfslw (yieldMany $ map (map toHyperdataDocument) docs) logStatus
    Left e -> panic $ "Error: " <> T.pack e

------------------------------------------------------------------------
-- | TODO improve the needed type to create/update a corpus
-- (For now, Either is enough)
flowCorpus :: (FlowCmdM env err m, FlowCorpus a)
           => User
           -> Either CorpusName [CorpusId]
           -> TermType Lang
           -> Maybe FlowSocialListWith
           -> (Maybe Integer, ConduitT () a m ())
           -> (JobLog -> m ())
           -> m CorpusId
flowCorpus = flow (Nothing :: Maybe HyperdataCorpus)

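-- A minimal sketch of calling 'flowCorpus' with an in-memory list of
-- documents (here 'docs :: [HyperdataDocument]' and the no-op logger are
-- illustrative; real callers pass a proper 'JobLog' handler):
--
-- > cid <- flowCorpus (UserName "alice")
-- >                   (Left "my-corpus")
-- >                   (Multi EN)
-- >                   Nothing
-- >                   (Just $ fromIntegral $ length docs, yieldMany docs)
-- >                   (\_jobLog -> pure ())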

flow :: forall env err m a c.
        ( FlowCmdM env err m
        , FlowCorpus a
        , MkCorpus c
        )
     => Maybe c
     -> User
     -> Either CorpusName [CorpusId]
     -> TermType Lang
     -> Maybe FlowSocialListWith
     -> (Maybe Integer, ConduitT () a m ())
     -> (JobLog -> m ())
     -> m CorpusId
flow c u cn la mfslw (mLength, docsC) logStatus = do
  -- TODO if public insertMasterDocs else insertUserDocs
  ids <- runConduit $ zipSources (yieldMany [1..]) docsC
                   .| mapMC insertDoc
                   .| sinkList
  flowCorpusUser (la ^. tt_lang) u cn c ids mfslw

  where
    insertDoc :: (Integer, a) -> m NodeId
    insertDoc (idx, doc) = do
      -- 'insertMasterDocs' is called with a singleton list, so taking the
      -- head of its result is safe here.
      ids' <- insertMasterDocs c la [doc]
      case mLength of
        Nothing -> pure ()
        Just len -> do
          logStatus JobLog { _scst_succeeded = Just $ fromIntegral $ 1 + idx
                           , _scst_failed    = Just 0
                           , _scst_remaining = Just $ fromIntegral $ len - idx
                           , _scst_events    = Just []
                           }
      pure $ Prelude.head ids'

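-- NOTE on progress reporting: 'zipSources (yieldMany [1..]) docsC' pairs each
-- streamed document with its 1-based index, so 'insertDoc' can report
-- progress without first forcing the whole stream into memory. A standalone
-- sketch of the same pattern (illustrative only):
--
-- > runConduit $ zipSources (yieldMany [1 :: Integer ..]) (yieldMany "abc")
-- >           .| mapMC print
-- >           .| sinkNull
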
------------------------------------------------------------------------
flowCorpusUser :: ( FlowCmdM env err m
                  , MkCorpus c
                  )
               => Lang
               -> User
               -> Either CorpusName [CorpusId]
               -> Maybe c
               -> [ContextId]
               -> Maybe FlowSocialListWith
               -> m CorpusId
flowCorpusUser l user corpusName ctype ids mfslw = do
  -- User Flow
  (userId, _rootId, userCorpusId) <- getOrMk_RootWithCorpus user corpusName ctype
  -- NodeTexts is first
  _tId <- insertDefaultNodeIfNotExists NodeTexts userCorpusId userId
  -- printDebug "NodeTexts: " tId

  -- NodeList is second
  listId <- getOrMkList userCorpusId userId
  -- _cooc <- insertDefaultNode NodeListCooc listId userId
  -- TODO: check if present already, ignore
  _ <- Doc.add userCorpusId ids

  -- printDebug "Node Text Ids:" tId

  -- User List Flow
  (masterUserId, _masterRootId, masterCorpusId)
    <- getOrMk_RootWithCorpus (UserName userMaster) (Left "") ctype

  --let gp = (GroupParams l 2 3 (StopSize 3))
  -- Here the PosTagAlgo should be chosen according to the Lang
  let gp = GroupWithPosTag l CoreNLP HashMap.empty
  ngs <- buildNgramsLists user userCorpusId masterCorpusId mfslw gp

  -- printDebug "flowCorpusUser:ngs" ngs

  _userListId <- flowList_DbRepo listId ngs
  _mastListId <- getOrMkList masterCorpusId masterUserId
  -- _ <- insertOccsUpdates userCorpusId mastListId
  -- printDebug "userListId" userListId
  -- User Graph Flow
  _ <- insertDefaultNodeIfNotExists NodeGraph userCorpusId userId
  _ <- insertDefaultNodeIfNotExists NodeDashboard userCorpusId userId
  --_ <- mkPhylo userCorpusId userId
  -- Annuaire Flow
  -- _ <- mkAnnuaire rootUserId userId
  _ <- updateNgramsOccurrences userCorpusId (Just listId)

  pure userCorpusId


insertMasterDocs :: ( FlowCmdM env err m
                    , FlowCorpus a
                    , MkCorpus c
                    )
                 => Maybe c
                 -> TermType Lang
                 -> [a]
                 -> m [DocId]
insertMasterDocs c lang hs = do
  (masterUserId, _, masterCorpusId) <- getOrMk_RootWithCorpus (UserName userMaster) (Left corpusMasterName) c
  (ids', documentsWithId) <- insertDocs masterUserId masterCorpusId (map (toNode masterUserId Nothing) hs)
  _ <- Doc.add masterCorpusId ids'
  -- TODO
  -- create a corpus with database name (CSV or PubMed)
  -- add documents to the corpus (create node_node link)
  -- this will enable global database monitoring

  -- maps :: IO Map Ngrams (Map NgramsType (Map NodeId Int))
  mapNgramsDocs' :: HashMap ExtractedNgrams (Map NgramsType (Map NodeId (Int, TermsCount)))
                <- mapNodeIdNgrams
                   <$> documentIdWithNgrams
                       (extractNgramsT $ withLang lang documentsWithId)
                       documentsWithId

  lId <- getOrMkList masterCorpusId masterUserId
  _ <- saveDocNgramsWith lId mapNgramsDocs'

  -- _cooc <- insertDefaultNode NodeListCooc lId masterUserId
  pure ids'

saveDocNgramsWith :: (FlowCmdM env err m)
                  => ListId
                  -> HashMap ExtractedNgrams (Map NgramsType (Map NodeId (Int, TermsCount)))
                  -> m ()
saveDocNgramsWith lId mapNgramsDocs' = do
  --printDebug "[saveDocNgramsWith] mapNgramsDocs'" mapNgramsDocs'
  let mapNgramsDocsNoCount = over (traverse . traverse . traverse) fst mapNgramsDocs'
  terms2id <- insertExtractedNgrams $ HashMap.keys mapNgramsDocsNoCount

  let mapNgramsDocs = HashMap.mapKeys extracted2ngrams mapNgramsDocs'

  -- new
  mapCgramsId <- listInsertDb lId toNodeNgramsW'
               $ map (first _ngramsTerms . second Map.keys)
               $ HashMap.toList mapNgramsDocs

  --printDebug "saveDocNgramsWith" mapCgramsId
  -- insertDocNgrams
  _return <- insertContextNodeNgrams2
           $ catMaybes [ ContextNodeNgrams2 <$> Just nId
                                            <*> (getCgramsId mapCgramsId ngrams_type (_ngramsTerms terms''))
                                            <*> Just (fromIntegral w :: Double)
                       | (terms'', mapNgramsTypes)      <- HashMap.toList mapNgramsDocs
                       , (ngrams_type, mapNodeIdWeight) <- Map.toList mapNgramsTypes
                       , (nId, (w, _cnt))               <- Map.toList mapNodeIdWeight
                       ]

  -- to be removed
  _ <- insertDocNgrams lId $ HashMap.mapKeys (indexNgrams terms2id) mapNgramsDocs

  pure ()


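-- The list comprehension in 'saveDocNgramsWith' flattens the nested
-- @HashMap ExtractedNgrams (Map NgramsType (Map NodeId (Int, TermsCount)))@
-- into one database row per (ngrams, ngrams type, context) triple. The same
-- flattening on plain maps, as a sketch:
--
-- > rows m = [ (term, typ, nId, w)
-- >          | (term, byType)   <- HashMap.toList m
-- >          , (typ,  byNode)   <- Map.toList byType
-- >          , (nId, (w, _cnt)) <- Map.toList byNode
-- >          ]
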
------------------------------------------------------------------------
-- TODO Type NodeDocumentUnicised
insertDocs :: ( FlowCmdM env err m
              -- , FlowCorpus a
              , FlowInsertDB a
              )
           => UserId
           -> CorpusId
           -> [a]
           -> m ([ContextId], [Indexed ContextId a])
insertDocs uId cId hs = do
  let docs = map addUniqId hs
  newIds <- insertDb uId Nothing docs
  -- printDebug "newIds" newIds
  let
    newIds' = map reId newIds
    documentsWithId = mergeData (toInserted newIds) (Map.fromList $ map viewUniqId' docs)
  _ <- Doc.add cId newIds'
  pure (newIds', documentsWithId)


------------------------------------------------------------------------
viewUniqId' :: UniqId a
            => a
            -> (Hash, a)
viewUniqId' d = maybe err (\h -> (h, d)) (view uniqId d)
  where
    err = panic "[ERROR] Database.Flow.toInsert"


toInserted :: [ReturnId]
           -> Map Hash ReturnId
toInserted =
  Map.fromList . map (\r -> (reUniqId r, r))
               . filter reInserted

mergeData :: Map Hash ReturnId
          -> Map Hash a
          -> [Indexed NodeId a]
mergeData rs = catMaybes . map toDocumentWithId . Map.toList
  where
    toDocumentWithId (sha, hpd) =
      Indexed <$> fmap reId (lookup sha rs)
              <*> Just hpd

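-- A worked sketch of 'toInserted' and 'mergeData' (record syntax avoids
-- relying on field order; the exact 'ReturnId' field set, hashes and ids are
-- illustrative). Given two returned rows of which only "h1" was actually
-- inserted, 'toInserted' keeps the inserted row keyed by its hash, and
-- 'mergeData' re-attaches the original document to its database id via that
-- hash:
--
-- > let r1 = ReturnId { reInserted = True,  reId = 7, reUniqId = "h1" }
-- >     r2 = ReturnId { reInserted = False, reId = 8, reUniqId = "h2" }
-- > toInserted [r1, r2] == Map.fromList [("h1", r1)]
-- > mergeData (toInserted [r1, r2]) (Map.fromList [("h1", d1), ("h2", d2)])
-- >   == [Indexed 7 d1]
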
------------------------------------------------------------------------
------------------------------------------------------------------------
------------------------------------------------------------------------
documentIdWithNgrams :: HasNodeError err
                     => (a
                         -> Cmd err (HashMap b (Map NgramsType Int, TermsCount)))
                     -> [Indexed NodeId a]
                     -> Cmd err [DocumentIdWithNgrams a b]
documentIdWithNgrams f = traverse toDocumentIdWithNgrams
  where
    toDocumentIdWithNgrams d = do
      e <- f $ _unIndex d
      pure $ DocumentIdWithNgrams d e


-- | TODO check optimization
mapNodeIdNgrams :: (Ord b, Hashable b)
                => [DocumentIdWithNgrams a b]
                -> HashMap b
                       (Map NgramsType
                            (Map NodeId (Int, TermsCount))
                       )
mapNodeIdNgrams = HashMap.unionsWith (Map.unionWith (Map.unionWith addTuples)) . fmap f
  where
    -- NOTE We are somehow multiplying 'TermsCount' here: if the same ngrams
    -- term has different ngrams types, the 'TermsCount' for it (which is the
    -- number of times the term appears in a document) is copied over to all
    -- its types.
    f :: DocumentIdWithNgrams a b
      -> HashMap b (Map NgramsType (Map NodeId (Int, TermsCount)))
    f d = fmap (\(ngramsTypeMap, cnt) -> fmap (\i -> Map.singleton nId (i, cnt)) ngramsTypeMap) $ documentNgrams d
      where
        nId = _index $ documentWithId d

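-- A worked sketch of the merge performed above, assuming 'addTuples' sums
-- pairs componentwise (values are (weight, TermsCount); keys and node ids are
-- illustrative):
--
-- > HashMap.unionsWith (Map.unionWith (Map.unionWith addTuples))
-- >   [ HashMap.fromList [("ia", Map.fromList [(NgramsTerms, Map.fromList [(1, (2, 3))])])]
-- >   , HashMap.fromList [("ia", Map.fromList [(NgramsTerms, Map.fromList [(1, (1, 1))])])]
-- >   ]
-- >   == HashMap.fromList [("ia", Map.fromList [(NgramsTerms, Map.fromList [(1, (3, 4))])])]
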
------------------------------------------------------------------------
instance ExtractNgramsT HyperdataContact
  where
    extractNgramsT l hc = HashMap.mapKeys (cleanExtractedNgrams 255) <$> extract l hc
      where
        extract :: TermType Lang -> HyperdataContact
                -> Cmd err (HashMap ExtractedNgrams (Map NgramsType Int, TermsCount))
        extract _l hc' = do
          let authors = map text2ngrams
                      $ maybe ["Nothing"] (\a -> [a])
                      $ view (hc_who . _Just . cw_lastName) hc'

          pure $ HashMap.fromList $ [(SimpleNgrams a', (Map.singleton Authors 1, 1)) | a' <- authors]


instance ExtractNgramsT HyperdataDocument
  where
    extractNgramsT :: TermType Lang
                   -> HyperdataDocument
                   -> Cmd err (HashMap ExtractedNgrams (Map NgramsType Int, TermsCount))
    extractNgramsT lang hd = HashMap.mapKeys (cleanExtractedNgrams 255) <$> extractNgramsT' lang hd
      where
        extractNgramsT' :: TermType Lang
                        -> HyperdataDocument
                        -> Cmd err (HashMap ExtractedNgrams (Map NgramsType Int, TermsCount))
        extractNgramsT' lang' doc = do
          let source = text2ngrams
                     $ maybe "Nothing" identity
                     $ _hd_source doc

              institutes = map text2ngrams
                         $ maybe ["Nothing"] (map toSchoolName . (T.splitOn ", "))
                         $ _hd_institutes doc

              authors = map text2ngrams
                      $ maybe ["Nothing"] (T.splitOn ", ")
                      $ _hd_authors doc

          termsWithCounts' <- map (\(t, cnt) -> (enrichedTerms (lang' ^. tt_lang) CoreNLP NP t, cnt))
                              <$> concat
                              <$> liftBase (extractTerms lang' $ hasText doc)

          pure $ HashMap.fromList
               $  [(SimpleNgrams source, (Map.singleton Sources 1, 1))]
               <> [(SimpleNgrams i', (Map.singleton Institutes 1, 1)) | i' <- institutes]
               <> [(SimpleNgrams a', (Map.singleton Authors 1, 1)) | a' <- authors]
               <> [(EnrichedNgrams t', (Map.singleton NgramsTerms 1, cnt')) | (t', cnt') <- termsWithCounts']

instance (ExtractNgramsT a, HasText a) => ExtractNgramsT (Node a)
  where
    extractNgramsT l (Node { _node_hyperdata = h }) = extractNgramsT l h

instance HasText a => HasText (Node a)
  where
    hasText (Node { _node_hyperdata = h }) = hasText h


544
545 -- | TODO putelsewhere
546 -- | Upgrade function
547 -- Suppose all documents are English (this is the case actually)
548 indexAllDocumentsWithPosTag :: FlowCmdM env err m
549 => m ()
550 indexAllDocumentsWithPosTag = do
551 rootId <- getRootId (UserName userMaster)
552 corpusIds <- findNodesId rootId [NodeCorpus]
553 docs <- List.concat <$> mapM getDocumentsWithParentId corpusIds
554 _ <- mapM extractInsert (splitEvery 1000 docs)
555 pure ()
556
557 extractInsert :: FlowCmdM env err m
558 => [Node HyperdataDocument] -> m ()
559 extractInsert docs = do
560 let documentsWithId = map (\doc -> Indexed (doc ^. node_id) doc) docs
561 mapNgramsDocs' <- mapNodeIdNgrams
562 <$> documentIdWithNgrams
563 (extractNgramsT $ withLang (Multi EN) documentsWithId)
564 documentsWithId
565 _ <- insertExtractedNgrams $ HashMap.keys mapNgramsDocs'
566 pure ()