module Gargantext.Database.Action.Flow -- (flowDatabase, ngrams2list)
( DataText(..)
, getDataText
+ , getDataText_Debug
, flowDataText
, flow
)
where
+import Conduit
import Control.Lens ((^.), view, _Just, makeLenses)
import Data.Aeson.TH (deriveJSON)
+import Data.Conduit.Internal (zipSources)
import Data.Either
import Data.HashMap.Strict (HashMap)
import Data.Hashable (Hashable)
import Data.Traversable (traverse)
import Data.Tuple.Extra (first, second)
import GHC.Generics (Generic)
+import Servant.Client (ClientError)
import System.FilePath (FilePath)
import qualified Data.HashMap.Strict as HashMap
import qualified Gargantext.Data.HashMap.Strict.Utils as HashMap
import qualified Data.Map as Map
+import qualified Data.Conduit.List as CL
+import qualified Data.Conduit as C
import Gargantext.API.Admin.Orchestrator.Types (JobLog(..))
import Gargantext.Core (Lang(..), PosTagAlgo(..))
import Gargantext.Core.Ext.IMTUser (readFile_Annuaire)
import Gargantext.Core.Flow.Types
import Gargantext.Core.Text
-import Gargantext.Core.Text.Corpus.Parsers (parseFile, FileFormat)
+import Gargantext.Core.Text.Corpus.Parsers (parseFile, FileFormat, FileType)
import Gargantext.Core.Text.List (buildNgramsLists)
import Gargantext.Core.Text.List.Group.WithStem ({-StopSize(..),-} GroupParams(..))
import Gargantext.Core.Text.List.Social (FlowSocialListWith)
import Gargantext.Database.Action.Flow.Utils (insertDocNgrams, DocumentIdWithNgrams(..))
import Gargantext.Database.Action.Search (searchDocInDatabase)
import Gargantext.Database.Admin.Config (userMaster, corpusMasterName)
+import Gargantext.Database.Action.Metrics (updateNgramsOccurrences)
import Gargantext.Database.Admin.Types.Hyperdata
import Gargantext.Database.Admin.Types.Node -- (HyperdataDocument(..), NodeType(..), NodeId, UserId, ListId, CorpusId, RootId, MasterCorpusId, MasterUserId)
import Gargantext.Database.Prelude
+import Gargantext.Database.Query.Table.ContextNodeNgrams2
import Gargantext.Database.Query.Table.Ngrams
import Gargantext.Database.Query.Table.Node
import Gargantext.Database.Query.Table.Node.Document.Insert -- (insertDocuments, ReturnId(..), addUniqIdsDoc, addUniqIdsContact, ToDbData(..))
import Gargantext.Database.Query.Table.Node.Error (HasNodeError(..))
import Gargantext.Database.Query.Table.NodeNgrams (listInsertDb , getCgramsId)
-import Gargantext.Database.Query.Table.ContextNodeNgrams2
import Gargantext.Database.Query.Tree.Root (getOrMkRoot, getOrMk_RootWithCorpus)
import Gargantext.Database.Schema.Node (NodePoly(..), node_id)
import Gargantext.Database.Types
import Gargantext.Prelude.Crypto.Hash (Hash)
import qualified Gargantext.Core.Text.Corpus.API as API
import qualified Gargantext.Database.Query.Table.Node.Document.Add as Doc (add)
+import qualified Prelude
------------------------------------------------------------------------
-- Imports for upgrade function
---------------
+-- | Documents selected for insertion into a corpus.
data DataText = DataOld ![NodeId]
- | DataNew ![[HyperdataDocument]]
+ -- ^ Ids of documents already stored in the database (e.g. the result
+ --   of 'searchDocInDatabase').
+ | DataNew !(Maybe Integer, ConduitT () HyperdataDocument IO ())
+ -- ^ Stream of documents to insert, paired with the expected number of
+ --   documents when it is known; 'flowDataText' hands the count on to
+ --   'flowCorpus', where it drives progress reporting.
+
+-- | Print a 'DataText' to stdout, for debugging.
+--
+-- 'DataText' has no 'Show' instance because 'DataNew' carries a conduit
+-- whose documents can only be observed by running it in 'IO'. This runs
+-- that conduit to completion and prints the collected documents together
+-- with the optional expected count.
+printDataText :: DataText -> IO ()
+printDataText dt = case dt of
+  DataOld nodeIds ->
+    putStrLn $ show nodeIds
+  DataNew (mCount, source) -> do
+    docs <- C.runConduit $ source .| CL.consume
+    putStrLn $ show (mCount, docs)
-- TODO use the split parameter in config file
getDataText :: FlowCmdM env err m
-> TermType Lang
-> API.Query
-> Maybe API.Limit
- -> m DataText
-getDataText (ExternalOrigin api) la q li = liftBase $ DataNew
- <$> splitEvery 500
- <$> API.get api (_tt_lang la) q li
+ -> m (Either ClientError DataText)
+getDataText (ExternalOrigin api) la q li = liftBase $ do
+ eRes <- API.get api (_tt_lang la) q li
+ pure $ DataNew <$> eRes
getDataText (InternalOrigin _) _la q _li = do
(_masterUserId, _masterRootId, cId) <- getOrMk_RootWithCorpus
(Left "")
(Nothing :: Maybe HyperdataCorpus)
ids <- map fst <$> searchDocInDatabase cId (stemIt q)
- pure $ DataOld ids
+ pure $ Right $ DataOld ids
+
+-- | Debugging helper: run 'getDataText' and print to stdout either the
+-- client error or the fetched 'DataText' (via 'printDataText').
+getDataText_Debug :: FlowCmdM env err m
+                  => DataOrigin
+                  -> TermType Lang
+                  -> API.Query
+                  -> Maybe API.Limit
+                  -> m ()
+getDataText_Debug origin termType q mLimit =
+  getDataText origin termType q mLimit
+    >>= liftBase . either (putStrLn . show) printDataText
+
-------------------------------------------------------------------------------
-flowDataText :: ( FlowCmdM env err m
+flowDataText :: forall env err m.
+ ( FlowCmdM env err m
)
=> User
-> DataText
flowDataText u (DataOld ids) tt cid mfslw _ = flowCorpusUser (_tt_lang tt) u (Right [cid]) corpusType ids mfslw
where
corpusType = (Nothing :: Maybe HyperdataCorpus)
-flowDataText u (DataNew txt) tt cid mfslw logStatus = flowCorpus u (Right [cid]) tt mfslw txt logStatus
+flowDataText u (DataNew (mLen, txtC)) tt cid mfslw logStatus =
+ flowCorpus u (Right [cid]) tt mfslw (mLen, (transPipe liftBase txtC)) logStatus
------------------------------------------------------------------------
-- TODO use proxy
-> (JobLog -> m ())
-> m AnnuaireId
flowAnnuaire u n l filePath logStatus = do
- docs <- liftBase $ (( splitEvery 500 <$> readFile_Annuaire filePath) :: IO [[HyperdataContact]])
- flow (Nothing :: Maybe HyperdataAnnuaire) u n l Nothing docs logStatus
+ -- TODO Conduit for file
+ docs <- liftBase $ ((readFile_Annuaire filePath) :: IO [HyperdataContact])
+ flow (Nothing :: Maybe HyperdataAnnuaire) u n l Nothing (Just $ fromIntegral $ length docs, yieldMany docs) logStatus
------------------------------------------------------------------------
+-- | Parse a corpus file and insert its documents into a user corpus.
+--
+-- Only the first @l@ parsed documents are kept (the 'Limit' is meant for
+-- development use). A parse failure aborts with 'panic'.
flowCorpusFile :: (FlowCmdM env err m)
=> User
-> Either CorpusName [CorpusId]
-> Limit -- Limit the number of docs (for dev purpose)
- -> TermType Lang -> FileFormat -> FilePath
+ -> TermType Lang
+ -> FileType
+ -> FileFormat
+ -> FilePath
-> Maybe FlowSocialListWith
-> (JobLog -> m ())
-> m CorpusId
-flowCorpusFile u n l la ff fp mfslw logStatus = do
- eParsed <- liftBase $ parseFile ff fp
+flowCorpusFile u n l la ft ff fp mfslw logStatus = do
+  eParsed <- liftBase $ parseFile ft ff fp
case eParsed of
Right parsed -> do
- let docs = splitEvery 500 $ take l parsed
- flowCorpus u n la mfslw (map (map toHyperdataDocument) docs) logStatus
- Left e -> panic $ "Error: " <> (T.pack e)
+      -- Honour the limit (it was silently ignored) and compute the
+      -- progress total on the limited list so it matches what is streamed.
+      let docs = take l parsed
+      flowCorpus u n la mfslw
+        (Just $ fromIntegral $ length docs, yieldMany docs .| mapC toHyperdataDocument)
+        logStatus
+    Left e -> panic $ "Error: " <> T.pack e
------------------------------------------------------------------------
-- | TODO improve the needed type to create/update a corpus
-> Either CorpusName [CorpusId]
-> TermType Lang
-> Maybe FlowSocialListWith
- -> [[a]]
+ -> (Maybe Integer, ConduitT () a m ())
-> (JobLog -> m ())
-> m CorpusId
flowCorpus = flow (Nothing :: Maybe HyperdataCorpus)
-flow :: ( FlowCmdM env err m
+flow :: forall env err m a c.
+ ( FlowCmdM env err m
, FlowCorpus a
, MkCorpus c
)
-> Either CorpusName [CorpusId]
-> TermType Lang
-> Maybe FlowSocialListWith
- -> [[a]]
+ -> (Maybe Integer, ConduitT () a m ())
-> (JobLog -> m ())
-> m CorpusId
-flow c u cn la mfslw docs logStatus = do
+flow c u cn la mfslw (mLength, docsC) logStatus = do
-- TODO if public insertMasterDocs else insertUserDocs
- ids <- traverse (\(idx, doc) -> do
- id <- insertMasterDocs c la doc
- logStatus JobLog { _scst_succeeded = Just $ 1 + idx
- , _scst_failed = Just 0
- , _scst_remaining = Just $ length docs - idx
- , _scst_events = Just []
- }
- pure id
- ) (zip [1..] docs)
- flowCorpusUser (la ^. tt_lang) u cn c (concat ids) mfslw
-
+  -- Stream the documents, numbering them from 1 for progress reporting,
+  -- and flatten the id lists returned by each insertion.
+  ids <- runConduit $ zipSources (yieldMany [1..]) docsC
+                   .| concatMapMC insertDoc
+                   .| sinkList
+  flowCorpusUser (la ^. tt_lang) u cn c ids mfslw
+  where
+    -- | Insert one document (tagged with its 1-based stream position) into
+    -- the master corpus. When the total count @mLength@ is known, report
+    -- progress through @logStatus@.
+    --
+    -- Every id returned by 'insertMasterDocs' is kept. Taking only the
+    -- head would crash on an empty result.
+    --
+    -- NOTE(review): each call resolves the master corpus again and inserts
+    -- a single document. The previous code inserted batches of 500, so
+    -- batching may be worth restoring for large corpora.
+    insertDoc :: (Integer, a) -> m [NodeId]
+    insertDoc (idx, doc) = do
+      docIds <- insertMasterDocs c la [doc]
+      case mLength of
+        Nothing  -> pure ()
+        Just len -> logStatus JobLog
+          { _scst_succeeded = Just $ fromIntegral idx
+          , _scst_failed    = Just 0
+          -- Clamp at 0 in case the stream yields more documents than
+          -- were announced.
+          , _scst_remaining = Just $ fromIntegral $ max 0 (len - idx)
+          , _scst_events    = Just []
+          }
+      pure docIds
+
------------------------------------------------------------------------
-- User Flow
(userId, _rootId, userCorpusId) <- getOrMk_RootWithCorpus user corpusName ctype
-- NodeTexts is first
- _tId <- insertDefaultNode NodeTexts userCorpusId userId
+ _tId <- insertDefaultNodeIfNotExists NodeTexts userCorpusId userId
-- printDebug "NodeTexts: " tId
-- NodeList is second
(masterUserId, _masterRootId, masterCorpusId)
<- getOrMk_RootWithCorpus (UserName userMaster) (Left "") ctype
- --let gp = (GroupParams l 2 3 (StopSize 3))
- let gp = GroupWithPosTag l CoreNLP HashMap.empty
- ngs <- buildNgramsLists user userCorpusId masterCorpusId mfslw gp
+ --let gp = (GroupParams l 2 3 (StopSize 3))
+ -- Here the PosTagAlgo should be chosen according to the Lang
+ let gp = GroupWithPosTag l CoreNLP HashMap.empty
+ ngs <- buildNgramsLists user userCorpusId masterCorpusId mfslw gp
+
+ -- printDebug "flowCorpusUser:ngs" ngs
_userListId <- flowList_DbRepo listId ngs
_mastListId <- getOrMkList masterCorpusId masterUserId
-- _ <- insertOccsUpdates userCorpusId mastListId
-- printDebug "userListId" userListId
-- User Graph Flow
- _ <- insertDefaultNode NodeDashboard userCorpusId userId
- _ <- insertDefaultNode NodeGraph userCorpusId userId
+ _ <- insertDefaultNodeIfNotExists NodeGraph userCorpusId userId
+ _ <- insertDefaultNodeIfNotExists NodeDashboard userCorpusId userId
--_ <- mkPhylo userCorpusId userId
-- Annuaire Flow
-- _ <- mkAnnuaire rootUserId userId
+ _ <- updateNgramsOccurrences userCorpusId (Just listId)
+
pure userCorpusId
-> m [DocId]
insertMasterDocs c lang hs = do
(masterUserId, _, masterCorpusId) <- getOrMk_RootWithCorpus (UserName userMaster) (Left corpusMasterName) c
- (ids', documentsWithId) <- insertDocs masterUserId masterCorpusId (map (toNode masterUserId masterCorpusId) hs )
+ (ids', documentsWithId) <- insertDocs masterUserId masterCorpusId (map (toNode masterUserId Nothing) hs )
_ <- Doc.add masterCorpusId ids'
-- TODO
-- create a corpus with database name (CSV or PubMed)
-> m ()
saveDocNgramsWith lId mapNgramsDocs' = do
terms2id <- insertExtractedNgrams $ HashMap.keys mapNgramsDocs'
- let mapNgramsDocs = HashMap.mapKeys extracted2ngrams mapNgramsDocs'
+ --printDebug "terms2id" terms2id
- -- to be removed
- let indexedNgrams = HashMap.mapKeys (indexNgrams terms2id) mapNgramsDocs
+ let mapNgramsDocs = HashMap.mapKeys extracted2ngrams mapNgramsDocs'
-- new
mapCgramsId <- listInsertDb lId toNodeNgramsW'
$ map (first _ngramsTerms . second Map.keys)
$ HashMap.toList mapNgramsDocs
+ --printDebug "saveDocNgramsWith" mapCgramsId
-- insertDocNgrams
_return <- insertContextNodeNgrams2
$ catMaybes [ ContextNodeNgrams2 <$> Just nId
, (ngrams_type, mapNodeIdWeight) <- Map.toList mapNgramsTypes
, (nId, w) <- Map.toList mapNodeIdWeight
]
+
-- to be removed
- _ <- insertDocNgrams lId indexedNgrams
+ _ <- insertDocNgrams lId $ HashMap.mapKeys (indexNgrams terms2id) mapNgramsDocs
pure ()
=> UserId
-> CorpusId
-> [a]
- -> m ([DocId], [Indexed NodeId a])
+ -> m ([ContextId], [Indexed ContextId a])
insertDocs uId cId hs = do
let docs = map addUniqId hs
- newIds <- insertDb uId cId docs
+ newIds <- insertDb uId Nothing docs
-- printDebug "newIds" newIds
let
newIds' = map reId newIds
documentsWithId
_ <- insertExtractedNgrams $ HashMap.keys mapNgramsDocs'
pure ()
-
-