[GQL] Add team leader to query
[gargantext.git] / src / Gargantext / API / Node / Corpus / New.hs
index d0aee0e2924a1455eb5c640de3ddfecea7fcea46..3268aace491d12a284c6e9f5e39e9a0b6c2a88a4 100644 (file)
@@ -18,10 +18,12 @@ New corpus means either:
 module Gargantext.API.Node.Corpus.New
       where
 
+import Conduit
 import Control.Lens hiding (elements, Empty)
 import Data.Aeson
 import Data.Aeson.TH (deriveJSON)
 import qualified Data.ByteString.Base64 as BSB64
+import Data.Conduit.Internal (zipSources)
 import Data.Either
 import Data.Maybe (fromMaybe)
 import Data.Swagger
@@ -35,19 +37,15 @@ import qualified Data.Text.Encoding as TE
 -- import Test.QuickCheck (elements)
 import Test.QuickCheck.Arbitrary
 
-import Gargantext.Prelude
-
 import Gargantext.API.Admin.Orchestrator.Types (JobLog(..), AsyncJobs, ScraperEvent(..), scst_events)
 import Gargantext.API.Admin.Types (HasSettings)
-import Gargantext.API.Job (jobLogSuccess, jobLogFailTotal, jobLogFailTotalWithMessage)
-import Gargantext.API.Node.Corpus.New.File
+import Gargantext.API.Job (addEvent, jobLogSuccess, jobLogFailTotal)
+import Gargantext.API.Node.Corpus.New.Types
 import Gargantext.API.Node.Corpus.Searx
 import Gargantext.API.Node.Corpus.Types
 import Gargantext.API.Node.Types
 import Gargantext.Core (Lang(..){-, allLangs-})
 import Gargantext.Core.Text.List.Social (FlowSocialListWith(..))
-import qualified Gargantext.Core.Text.Corpus.API as API
-import qualified Gargantext.Core.Text.Corpus.Parsers as Parser (FileFormat(..), parseFormat)
 import Gargantext.Core.Types.Individu (User(..))
 import Gargantext.Core.Utils.Prefix (unPrefix, unPrefixSwagger)
 import Gargantext.Database.Action.Flow (flowCorpus, getDataText, flowDataText, TermType(..){-, allDataOrigins-})
@@ -61,8 +59,11 @@ import Gargantext.Database.Prelude (hasConfig)
 import Gargantext.Database.Query.Table.Node (getNodeWith)
 import Gargantext.Database.Query.Table.Node.UpdateOpaleye (updateHyperdata)
 import Gargantext.Database.Schema.Node (node_hyperdata)
-import qualified Gargantext.Database.GargDB as GargDB
+import Gargantext.Prelude
 import Gargantext.Prelude.Config (gc_max_docs_parsers)
+import qualified Gargantext.Core.Text.Corpus.API as API
+import qualified Gargantext.Core.Text.Corpus.Parsers as Parser (FileType(..), parseFormatC)
+import qualified Gargantext.Database.GargDB as GargDB
 ------------------------------------------------------------------------
 {-
 data Query = Query { query_query      :: Text
@@ -174,6 +175,8 @@ type AddWithFile = Summary "Add with MultipartData to corpus endpoint"
 
 ------------------------------------------------------------------------
 -- TODO WithQuery also has a corpus id
+
+
 addToCorpusWithQuery :: FlowCmdM env err m
                        => User
                        -> CorpusId
@@ -200,7 +203,7 @@ addToCorpusWithQuery user cid (WithQuery { _wq_query = q
     Just Web -> do
       printDebug "[addToCorpusWithQuery] processing web request" datafield
 
-      _ <- triggerSearxSearch cid q l
+      _ <- triggerSearxSearch user cid q l logStatus
 
       pure JobLog { _scst_succeeded = Just 3
                   , _scst_failed    = Just 0
@@ -213,24 +216,43 @@ addToCorpusWithQuery user cid (WithQuery { _wq_query = q
       -- TODO if cid is folder -> create Corpus
       --      if cid is corpus -> add to corpus
       --      if cid is root   -> create corpus in Private
-      txts <- mapM (\db -> getDataText db (Multi l) q maybeLimit) [database2origin dbs]
-  
-      logStatus JobLog { _scst_succeeded = Just 2
-                       , _scst_failed    = Just 0
-                       , _scst_remaining = Just $ 1 + length txts
+      printDebug "[G.A.N.C.New] getDataText with query" q
+      eTxts <- mapM (\db -> getDataText db (Multi l) q maybeLimit) [database2origin dbs]
+
+      let lTxts = lefts eTxts
+      printDebug "[G.A.N.C.New] lTxts" lTxts
+      case lTxts of
+        [] -> do
+          let txts = rights eTxts
+          -- TODO Sum lenghts of each txt elements
+          logStatus $ JobLog { _scst_succeeded = Just 2
+                             , _scst_failed    = Just 0
+                             , _scst_remaining = Just $ 1 + length txts
+                             , _scst_events    = Just []
+                             }
+
+          cids <- mapM (\txt -> do
+                           flowDataText user txt (Multi l) cid Nothing logStatus) txts
+          printDebug "corpus id" cids
+          printDebug "sending email" ("xxxxxxxxxxxxxxxxxxxxx" :: Text)
+          sendMail user
+          -- TODO ...
+          pure JobLog { _scst_succeeded = Just 3
+                      , _scst_failed    = Just 0
+                      , _scst_remaining = Just 0
+                      , _scst_events    = Just []
+                      }
+        
+        (err:_) -> do
+          printDebug "Error: " err
+          let jl = addEvent "ERROR" (T.pack $ show err) $
+                JobLog { _scst_succeeded = Just 2
+                       , _scst_failed    = Just 1
+                       , _scst_remaining = Just 0
                        , _scst_events    = Just []
                        }
-
-      cids <- mapM (\txt -> flowDataText user txt (Multi l) cid Nothing logStatus) txts
-      printDebug "corpus id" cids
-      printDebug "sending email" ("xxxxxxxxxxxxxxxxxxxxx" :: Text)
-      sendMail user
-      -- TODO ...
-      pure JobLog { _scst_succeeded = Just 3
-                  , _scst_failed    = Just 0
-                  , _scst_remaining = Just 0
-                  , _scst_events    = Just []
-                  }
+          logStatus jl
+          pure jl
 
 
 type AddWithForm = Summary "Add with FormUrlEncoded to corpus endpoint"
@@ -248,63 +270,70 @@ addToCorpusWithForm :: (FlowCmdM env err m)
                     -> (JobLog -> m ())
                     -> JobLog
                     -> m JobLog
-addToCorpusWithForm user cid (NewWithForm ft d l _n) logStatus jobLog = do
+addToCorpusWithForm user cid (NewWithForm ft ff d l _n) logStatus jobLog = do
   printDebug "[addToCorpusWithForm] Parsing corpus: " cid
   printDebug "[addToCorpusWithForm] fileType" ft
+  printDebug "[addToCorpusWithForm] fileFormat" ff
   logStatus jobLog
+  limit' <- view $ hasConfig . gc_max_docs_parsers
+  let limit = fromIntegral limit' :: Integer
   let
-    parse = case ft of
-      CSV_HAL   -> Parser.parseFormat Parser.CsvHal
-      CSV       -> Parser.parseFormat Parser.CsvGargV3
-      WOS       -> Parser.parseFormat Parser.WOS
-      PresseRIS -> Parser.parseFormat Parser.RisPresse
-      ZIP       -> Parser.parseFormat Parser.ZIP
+    parseC = case ft of
+      CSV_HAL   -> Parser.parseFormatC Parser.CsvHal
+      CSV       -> Parser.parseFormatC Parser.CsvGargV3
+      WOS       -> Parser.parseFormatC Parser.WOS
+      PresseRIS -> Parser.parseFormatC Parser.RisPresse
   
   -- TODO granularity of the logStatus
-  let data' = case ft of
-        ZIP -> case BSB64.decode $ TE.encodeUtf8 d of
+  let data' = case ff of
+        Plain -> cs d
+        ZIP   -> case BSB64.decode $ TE.encodeUtf8 d of
           Left err -> panic $ T.pack "[addToCorpusWithForm] error decoding base64: " <> T.pack err
           Right decoded -> decoded
-        _   -> cs d
-  eDocs <- liftBase $ parse data'
-  case eDocs of
-    Right docs' -> do
+  eDocsC <- liftBase $ parseC ff data'
+  case eDocsC of
+    Right (mCount, docsC) -> do
       -- TODO Add progress (jobStatus) update for docs - this is a
       -- long action
-      limit' <- view $ hasConfig . gc_max_docs_parsers
-      let limit = fromIntegral limit'
-      if length docs' > limit then do
-        printDebug "[addToCorpusWithForm] number of docs exceeds the limit" (show $ length docs')
-        let panicMsg' = [ "[addToCorpusWithForm] number of docs ("
-                        , show $ length docs'
-                        , ") exceeds the MAX_DOCS_PARSERS limit ("
-                        , show limit
-                        , ")" ]
-        let panicMsg = T.concat $ T.pack <$> panicMsg'
-        logStatus $ jobLogFailTotalWithMessage panicMsg jobLog
-        panic panicMsg
-      else
-        pure ()
-      let docs = splitEvery 500 $ take limit docs'
-
-      printDebug "Parsing corpus finished : " cid
-      logStatus jobLog2
-
-      printDebug "Starting extraction     : " cid
+
+      let docsC' = zipSources (yieldMany [1..]) docsC
+                  .| mapMC (\(idx, doc) ->
+                        if idx > limit then do
+                          --printDebug "[addToCorpusWithForm] number of docs exceeds the limit" (show limit)
+                          let panicMsg' = [ "[addToCorpusWithForm] number of docs "
+                                          , "exceeds the MAX_DOCS_PARSERS limit ("
+                                          , show limit
+                                          , ")" ]
+                          let panicMsg = T.concat $ T.pack <$> panicMsg'
+                          --logStatus $ jobLogFailTotalWithMessage panicMsg jobLog
+                          panic panicMsg
+                        else
+                          pure doc)
+                  .| mapC toHyperdataDocument
+
+      --printDebug "Parsing corpus finished : " cid
+      --logStatus jobLog2
+
+      --printDebug "Starting extraction     : " cid
       -- TODO granularity of the logStatus
+      printDebug "flowCorpus with lang" l
+
       _cid' <- flowCorpus user
                           (Right [cid])
                           (Multi $ fromMaybe EN l)
                           Nothing
-                          (map (map toHyperdataDocument) docs)
+                          --(Just $ fromIntegral $ length docs, docsC')
+                          (mCount, transPipe liftBase docsC') -- TODO fix number of docs
+                          --(map (map toHyperdataDocument) docs)
                           logStatus
 
       printDebug "Extraction finished   : " cid
       printDebug "sending email" ("xxxxxxxxxxxxxxxxxxxxx" :: Text)
-      sendMail user
+      -- TODO uncomment this
+      --sendMail user
 
       logStatus jobLog3
-      pure jobLog3
+      pure jobLog3
     Left e -> do
       printDebug "[addToCorpusWithForm] parse error" e