[gargantext.ini] add BACKEND_NAME
[gargantext.git] / src / Gargantext / Database / Action / Flow.hs
index 6cc0f9388b32ad88862b9faa18cb307504dab7c6..80397d1bc0a66013f262bfc34800ea33223c3f6e 100644 (file)
@@ -26,6 +26,7 @@ Portability : POSIX
 module Gargantext.Database.Action.Flow -- (flowDatabase, ngrams2list)
   ( DataText(..)
   , getDataText
+  , getDataText_Debug
   , flowDataText
   , flow
 
@@ -46,8 +47,10 @@ module Gargantext.Database.Action.Flow -- (flowDatabase, ngrams2list)
   )
     where
 
+import Conduit
 import Control.Lens ((^.), view, _Just, makeLenses)
 import Data.Aeson.TH (deriveJSON)
+import Data.Conduit.Internal (zipSources)
 import Data.Either
 import Data.HashMap.Strict (HashMap)
 import Data.Hashable (Hashable)
@@ -60,10 +63,13 @@ import qualified Data.Text as T
 import Data.Traversable (traverse)
 import Data.Tuple.Extra (first, second)
 import GHC.Generics (Generic)
+import Servant.Client (ClientError)
 import System.FilePath (FilePath)
 import qualified Data.HashMap.Strict as HashMap
 import qualified Gargantext.Data.HashMap.Strict.Utils as HashMap
 import qualified Data.Map as Map
+import qualified Data.Conduit.List as CL
+import qualified Data.Conduit      as C
 
 import Gargantext.API.Admin.Orchestrator.Types (JobLog(..))
 import Gargantext.Core (Lang(..), PosTagAlgo(..))
@@ -71,7 +77,7 @@ import Gargantext.Core.Ext.IMT (toSchoolName)
 import Gargantext.Core.Ext.IMTUser (readFile_Annuaire)
 import Gargantext.Core.Flow.Types
 import Gargantext.Core.Text
-import Gargantext.Core.Text.Corpus.Parsers (parseFile, FileFormat)
+import Gargantext.Core.Text.Corpus.Parsers (parseFile, FileFormat, FileType)
 import Gargantext.Core.Text.List (buildNgramsLists)
 import Gargantext.Core.Text.List.Group.WithStem ({-StopSize(..),-} GroupParams(..))
 import Gargantext.Core.Text.List.Social (FlowSocialListWith)
@@ -86,15 +92,16 @@ import Gargantext.Database.Action.Flow.Types
 import Gargantext.Database.Action.Flow.Utils (insertDocNgrams, DocumentIdWithNgrams(..))
 import Gargantext.Database.Action.Search (searchDocInDatabase)
 import Gargantext.Database.Admin.Config (userMaster, corpusMasterName)
+import Gargantext.Database.Action.Metrics (updateNgramsOccurrences)
 import Gargantext.Database.Admin.Types.Hyperdata
 import Gargantext.Database.Admin.Types.Node -- (HyperdataDocument(..), NodeType(..), NodeId, UserId, ListId, CorpusId, RootId, MasterCorpusId, MasterUserId)
 import Gargantext.Database.Prelude
+import Gargantext.Database.Query.Table.ContextNodeNgrams2
 import Gargantext.Database.Query.Table.Ngrams
 import Gargantext.Database.Query.Table.Node
 import Gargantext.Database.Query.Table.Node.Document.Insert -- (insertDocuments, ReturnId(..), addUniqIdsDoc, addUniqIdsContact, ToDbData(..))
 import Gargantext.Database.Query.Table.Node.Error (HasNodeError(..))
 import Gargantext.Database.Query.Table.NodeNgrams (listInsertDb , getCgramsId)
-import Gargantext.Database.Query.Table.ContextNodeNgrams2
 import Gargantext.Database.Query.Tree.Root (getOrMkRoot, getOrMk_RootWithCorpus)
 import Gargantext.Database.Schema.Node (NodePoly(..), node_id)
 import Gargantext.Database.Types
@@ -102,6 +109,7 @@ import Gargantext.Prelude
 import Gargantext.Prelude.Crypto.Hash (Hash)
 import qualified Gargantext.Core.Text.Corpus.API as API
 import qualified Gargantext.Database.Query.Table.Node.Document.Add  as Doc  (add)
+import qualified Prelude
 
 ------------------------------------------------------------------------
 -- Imports for upgrade function
@@ -126,7 +134,15 @@ allDataOrigins = map InternalOrigin API.externalAPIs
 
 ---------------
 data DataText = DataOld ![NodeId]
-              | DataNew ![[HyperdataDocument]]
+              | DataNew !(Maybe Integer, ConduitT () HyperdataDocument IO ())
+              --- | DataNew ![[HyperdataDocument]]
+
+-- Show instance is not possible because of IO
+printDataText :: DataText -> IO ()
+printDataText (DataOld xs) = putStrLn $ show xs
+printDataText (DataNew (maybeInt, conduitData)) = do
+  res <- C.runConduit (conduitData .| CL.consume)
+  putStrLn $ show (maybeInt, res)
 
 -- TODO use the split parameter in config file
 getDataText :: FlowCmdM env err m
@@ -134,10 +150,10 @@ getDataText :: FlowCmdM env err m
             -> TermType Lang
             -> API.Query
             -> Maybe API.Limit
-            -> m DataText
-getDataText (ExternalOrigin api) la q li = liftBase $ DataNew
-                                  <$> splitEvery 500
-                                  <$> API.get api (_tt_lang la) q li
+            -> m (Either ClientError DataText)
+getDataText (ExternalOrigin api) la q li = liftBase $ do
+  eRes <- API.get api (_tt_lang la) q li
+  pure $ DataNew <$> eRes
 
 getDataText (InternalOrigin _) _la q _li = do
   (_masterUserId, _masterRootId, cId) <- getOrMk_RootWithCorpus
@@ -145,10 +161,24 @@ getDataText (InternalOrigin _) _la q _li = do
                                            (Left "")
                                            (Nothing :: Maybe HyperdataCorpus)
   ids <-  map fst <$> searchDocInDatabase cId (stemIt q)
-  pure $ DataOld ids
+  pure $ Right $ DataOld ids
+
+getDataText_Debug :: FlowCmdM env err m
+            => DataOrigin
+            -> TermType Lang
+            -> API.Query
+            -> Maybe API.Limit
+            -> m ()
+getDataText_Debug a l q li = do
+  result <- getDataText a l q li
+  case result of
+    Left  err -> liftBase $ putStrLn $ show err
+    Right res -> liftBase $ printDataText res
+
 
 -------------------------------------------------------------------------------
-flowDataText :: ( FlowCmdM env err m
+flowDataText :: forall env err m.
+                ( FlowCmdM env err m
                 )
                 => User
                 -> DataText
@@ -160,7 +190,8 @@ flowDataText :: ( FlowCmdM env err m
 flowDataText u (DataOld ids) tt cid mfslw _ = flowCorpusUser (_tt_lang tt) u (Right [cid]) corpusType ids mfslw
   where
     corpusType = (Nothing :: Maybe HyperdataCorpus)
-flowDataText u (DataNew txt) tt cid mfslw logStatus = flowCorpus u (Right [cid]) tt mfslw txt logStatus
+flowDataText u (DataNew (mLen, txtC)) tt cid mfslw logStatus =
+  flowCorpus u (Right [cid]) tt mfslw (mLen, (transPipe liftBase txtC)) logStatus
 
 ------------------------------------------------------------------------
 -- TODO use proxy
@@ -172,25 +203,30 @@ flowAnnuaire :: (FlowCmdM env err m)
              -> (JobLog -> m ())
              -> m AnnuaireId
 flowAnnuaire u n l filePath logStatus = do
-  docs <- liftBase $ (( splitEvery 500 <$> readFile_Annuaire filePath) :: IO [[HyperdataContact]])
-  flow (Nothing :: Maybe HyperdataAnnuaire) u n l Nothing docs logStatus
+  -- TODO Conduit for file
+  docs <- liftBase $ ((readFile_Annuaire filePath) :: IO [HyperdataContact])
+  flow (Nothing :: Maybe HyperdataAnnuaire) u n l Nothing (Just $ fromIntegral $ length docs, yieldMany docs) logStatus
 
 ------------------------------------------------------------------------
 flowCorpusFile :: (FlowCmdM env err m)
            => User
            -> Either CorpusName [CorpusId]
            -> Limit -- Limit the number of docs (for dev purpose)
-           -> TermType Lang -> FileFormat -> FilePath
+           -> TermType Lang
+           -> FileType
+           -> FileFormat
+           -> FilePath
            -> Maybe FlowSocialListWith
            -> (JobLog -> m ())
            -> m CorpusId
-flowCorpusFile u n l la ff fp mfslw logStatus = do
-  eParsed <- liftBase $ parseFile ff fp
+flowCorpusFile u n _l la ft ff fp mfslw logStatus = do
+  eParsed <- liftBase $ parseFile ft ff fp
   case eParsed of
     Right parsed -> do
-      let docs = splitEvery 500 $ take l parsed
-      flowCorpus u n la mfslw (map (map toHyperdataDocument) docs) logStatus
-    Left e       -> panic $ "Error: " <> (T.pack e)
+      flowCorpus u n la mfslw (Just $ fromIntegral $ length parsed, yieldMany parsed .| mapC toHyperdataDocument) logStatus
+      --let docs = splitEvery 500 $ take l parsed
+      --flowCorpus u n la mfslw (yieldMany $ map (map toHyperdataDocument) docs) logStatus
+    Left e       -> panic $ "Error: " <> T.pack e
 
 ------------------------------------------------------------------------
 -- | TODO improve the needed type to create/update a corpus
@@ -200,13 +236,14 @@ flowCorpus :: (FlowCmdM env err m, FlowCorpus a)
            -> Either CorpusName [CorpusId]
            -> TermType Lang
            -> Maybe FlowSocialListWith
-           -> [[a]]
+           -> (Maybe Integer, ConduitT () a m ())
            -> (JobLog -> m ())
            -> m CorpusId
 flowCorpus = flow (Nothing :: Maybe HyperdataCorpus)
 
 
-flow :: ( FlowCmdM env err m
+flow :: forall env err m a c.
+        ( FlowCmdM env err m
         , FlowCorpus a
         , MkCorpus c
         )
@@ -215,23 +252,39 @@ flow :: ( FlowCmdM env err m
         -> Either CorpusName [CorpusId]
         -> TermType Lang
         -> Maybe FlowSocialListWith
-        -> [[a]]
+        -> (Maybe Integer, ConduitT () a m ())
         -> (JobLog -> m ())
         -> m CorpusId
-flow c u cn la mfslw docs logStatus = do
+flow c u cn la mfslw (mLength, docsC) logStatus = do
   -- TODO if public insertMasterDocs else insertUserDocs
-  ids <- traverse (\(idx, doc) -> do
-                      id <- insertMasterDocs c la doc
-                      logStatus JobLog { _scst_succeeded = Just $ 1 + idx
-                                       , _scst_failed    = Just 0
-                                       , _scst_remaining = Just $ length docs - idx
-                                       , _scst_events    = Just []
-                                       }
-                      pure id
-                  ) (zip [1..] docs)
-  flowCorpusUser (la ^. tt_lang) u cn c (concat ids) mfslw
-
+  ids <- runConduit $ zipSources (yieldMany [1..]) docsC
+                   .| mapMC insertDoc
+                   .| sinkList
+--  ids <- traverse (\(idx, doc) -> do
+--                      id <- insertMasterDocs c la doc
+--                      logStatus JobLog { _scst_succeeded = Just $ 1 + idx
+--                                       , _scst_failed    = Just 0
+--                                       , _scst_remaining = Just $ length docs - idx
+--                                       , _scst_events    = Just []
+--                                       }
+--                      pure id
+--                  ) (zip [1..] docs)
+  flowCorpusUser (la ^. tt_lang) u cn c ids mfslw
 
+  where
+    insertDoc :: (Integer, a) -> m NodeId
+    insertDoc (idx, doc) = do
+      id <- insertMasterDocs c la [doc]
+      case mLength of
+        Nothing -> pure ()
+        Just len -> do
+          logStatus JobLog { _scst_succeeded = Just $ fromIntegral $ 1 + idx
+                           , _scst_failed    = Just 0
+                           , _scst_remaining = Just $ fromIntegral $ len - idx
+                           , _scst_events    = Just []
+                           }
+      pure $ Prelude.head id
+      
 
 
 ------------------------------------------------------------------------
@@ -249,7 +302,7 @@ flowCorpusUser l user corpusName ctype ids mfslw = do
   -- User Flow
   (userId, _rootId, userCorpusId) <- getOrMk_RootWithCorpus user corpusName ctype
   -- NodeTexts is first
-  _tId <- insertDefaultNode NodeTexts userCorpusId userId
+  _tId <- insertDefaultNodeIfNotExists NodeTexts userCorpusId userId
   -- printDebug "NodeTexts: " tId
 
   -- NodeList is second
@@ -264,20 +317,25 @@ flowCorpusUser l user corpusName ctype ids mfslw = do
   (masterUserId, _masterRootId, masterCorpusId)
     <- getOrMk_RootWithCorpus (UserName userMaster) (Left "") ctype
 
-  --let gp = (GroupParams l 2 3 (StopSize 3)) 
-  let gp = GroupWithPosTag l CoreNLP HashMap.empty 
-  ngs         <- buildNgramsLists user userCorpusId masterCorpusId mfslw gp
+  --let gp = (GroupParams l 2 3 (StopSize 3))
+  -- Here the PosTagAlgo should be chosen according to the Lang
+  let gp = GroupWithPosTag l CoreNLP HashMap.empty
+  ngs    <- buildNgramsLists user userCorpusId masterCorpusId mfslw gp
+
+  -- printDebug "flowCorpusUser:ngs" ngs
 
   _userListId <- flowList_DbRepo listId ngs
   _mastListId <- getOrMkList masterCorpusId masterUserId
   -- _ <- insertOccsUpdates userCorpusId mastListId
   -- printDebug "userListId" userListId
   -- User Graph Flow
-  _ <- insertDefaultNode NodeDashboard userCorpusId userId
-  _ <- insertDefaultNode NodeGraph     userCorpusId userId
+  _ <- insertDefaultNodeIfNotExists NodeGraph     userCorpusId userId
+  _ <- insertDefaultNodeIfNotExists NodeDashboard userCorpusId userId
   --_ <- mkPhylo  userCorpusId userId
   -- Annuaire Flow
   -- _ <- mkAnnuaire  rootUserId userId
+  _ <- updateNgramsOccurrences userCorpusId (Just listId)
+
   pure userCorpusId
 
 
@@ -291,7 +349,7 @@ insertMasterDocs :: ( FlowCmdM env err m
                  -> m [DocId]
 insertMasterDocs c lang hs  =  do
   (masterUserId, _, masterCorpusId) <- getOrMk_RootWithCorpus (UserName userMaster) (Left corpusMasterName) c
-  (ids', documentsWithId) <- insertDocs masterUserId masterCorpusId (map (toNode masterUserId masterCorpusId) hs )
+  (ids', documentsWithId) <- insertDocs masterUserId masterCorpusId (map (toNode masterUserId Nothing) hs )
   _ <- Doc.add masterCorpusId ids'
   -- TODO
   -- create a corpus with database name (CSV or PubMed)
@@ -317,16 +375,16 @@ saveDocNgramsWith :: ( FlowCmdM env err m)
                   -> m ()
 saveDocNgramsWith lId mapNgramsDocs' = do
   terms2id <- insertExtractedNgrams $ HashMap.keys mapNgramsDocs'
-  let mapNgramsDocs = HashMap.mapKeys extracted2ngrams mapNgramsDocs'
+  --printDebug "terms2id" terms2id
 
-  -- to be removed
-  let indexedNgrams = HashMap.mapKeys (indexNgrams terms2id) mapNgramsDocs
+  let mapNgramsDocs = HashMap.mapKeys extracted2ngrams mapNgramsDocs'
 
   -- new
   mapCgramsId <- listInsertDb lId toNodeNgramsW'
                $ map (first _ngramsTerms . second Map.keys)
                $ HashMap.toList mapNgramsDocs
 
+  --printDebug "saveDocNgramsWith" mapCgramsId
   -- insertDocNgrams
   _return <- insertContextNodeNgrams2
            $ catMaybes [ ContextNodeNgrams2 <$> Just nId
@@ -336,8 +394,9 @@ saveDocNgramsWith lId mapNgramsDocs' = do
                        , (ngrams_type, mapNodeIdWeight) <- Map.toList mapNgramsTypes
                        , (nId, w)                       <- Map.toList mapNodeIdWeight
                        ]
+
   -- to be removed
-  _   <- insertDocNgrams lId indexedNgrams
+  _   <- insertDocNgrams lId $ HashMap.mapKeys (indexNgrams terms2id) mapNgramsDocs
 
   pure ()
 
@@ -351,10 +410,10 @@ insertDocs :: ( FlowCmdM env err m
               => UserId
               -> CorpusId
               -> [a]
-              -> m ([DocId], [Indexed NodeId a])
+              -> m ([ContextId], [Indexed ContextId a])
 insertDocs uId cId hs = do
   let docs = map addUniqId hs
-  newIds <- insertDb uId cId docs
+  newIds <- insertDb uId Nothing docs
   -- printDebug "newIds" newIds
   let
     newIds' = map reId newIds
@@ -498,5 +557,3 @@ extractInsert docs = do
                     documentsWithId
   _ <- insertExtractedNgrams $ HashMap.keys mapNgramsDocs'
   pure ()
-
-