module Gargantext.API.Node.Corpus.New
where
+import Conduit
import Control.Lens hiding (elements, Empty)
import Data.Aeson
import Data.Aeson.TH (deriveJSON)
import qualified Data.ByteString.Base64 as BSB64
+import Data.Conduit.Internal (zipSources)
import Data.Either
import Data.Maybe (fromMaybe)
import Data.Swagger
-- import Test.QuickCheck (elements)
import Test.QuickCheck.Arbitrary
-import Gargantext.Prelude
-
import Gargantext.API.Admin.Orchestrator.Types (JobLog(..), AsyncJobs, ScraperEvent(..), scst_events)
import Gargantext.API.Admin.Types (HasSettings)
-import Gargantext.API.Job (jobLogSuccess, jobLogFailTotal, jobLogFailTotalWithMessage)
-import Gargantext.API.Node.Corpus.New.File
+import Gargantext.API.Job (addEvent, jobLogSuccess, jobLogFailTotal)
+import Gargantext.API.Node.Corpus.New.Types
import Gargantext.API.Node.Corpus.Searx
import Gargantext.API.Node.Corpus.Types
import Gargantext.API.Node.Types
import Gargantext.Core (Lang(..){-, allLangs-})
import Gargantext.Core.Text.List.Social (FlowSocialListWith(..))
-import qualified Gargantext.Core.Text.Corpus.API as API
-import qualified Gargantext.Core.Text.Corpus.Parsers as Parser (FileFormat(..), parseFormat)
import Gargantext.Core.Types.Individu (User(..))
import Gargantext.Core.Utils.Prefix (unPrefix, unPrefixSwagger)
import Gargantext.Database.Action.Flow (flowCorpus, getDataText, flowDataText, TermType(..){-, allDataOrigins-})
import Gargantext.Database.Query.Table.Node (getNodeWith)
import Gargantext.Database.Query.Table.Node.UpdateOpaleye (updateHyperdata)
import Gargantext.Database.Schema.Node (node_hyperdata)
-import qualified Gargantext.Database.GargDB as GargDB
+import Gargantext.Prelude
import Gargantext.Prelude.Config (gc_max_docs_parsers)
+import qualified Gargantext.Core.Text.Corpus.API as API
+import qualified Gargantext.Core.Text.Corpus.Parsers as Parser (FileType(..), parseFormatC)
+import qualified Gargantext.Database.GargDB as GargDB
------------------------------------------------------------------------
{-
data Query = Query { query_query :: Text
------------------------------------------------------------------------
-- TODO WithQuery also has a corpus id
+
+
addToCorpusWithQuery :: FlowCmdM env err m
=> User
-> CorpusId
Just Web -> do
printDebug "[addToCorpusWithQuery] processing web request" datafield
- _ <- triggerSearxSearch cid q l
+ _ <- triggerSearxSearch user cid q l logStatus
pure JobLog { _scst_succeeded = Just 3
, _scst_failed = Just 0
-- TODO if cid is folder -> create Corpus
-- if cid is corpus -> add to corpus
-- if cid is root -> create corpus in Private
- txts <- mapM (\db -> getDataText db (Multi l) q maybeLimit) [database2origin dbs]
-
- logStatus JobLog { _scst_succeeded = Just 2
- , _scst_failed = Just 0
- , _scst_remaining = Just $ 1 + length txts
+ printDebug "[G.A.N.C.New] getDataText with query" q
+ eTxts <- mapM (\db -> getDataText db (Multi l) q maybeLimit) [database2origin dbs]
+
+ let lTxts = lefts eTxts
+ printDebug "[G.A.N.C.New] lTxts" lTxts
+ case lTxts of
+ [] -> do
+ let txts = rights eTxts
+ -- TODO Sum lengths of each txt elements
+ logStatus $ JobLog { _scst_succeeded = Just 2
+ , _scst_failed = Just 0
+ , _scst_remaining = Just $ 1 + length txts
+ , _scst_events = Just []
+ }
+
+ cids <- mapM (\txt -> do
+ flowDataText user txt (Multi l) cid Nothing logStatus) txts
+ printDebug "corpus id" cids
+ printDebug "sending email" ("xxxxxxxxxxxxxxxxxxxxx" :: Text)
+ sendMail user
+ -- TODO ...
+ pure JobLog { _scst_succeeded = Just 3
+ , _scst_failed = Just 0
+ , _scst_remaining = Just 0
+ , _scst_events = Just []
+ }
+
+ (err:_) -> do
+ printDebug "Error: " err
+ let jl = addEvent "ERROR" (T.pack $ show err) $
+ JobLog { _scst_succeeded = Just 2
+ , _scst_failed = Just 1
+ , _scst_remaining = Just 0
, _scst_events = Just []
}
-
- cids <- mapM (\txt -> flowDataText user txt (Multi l) cid Nothing logStatus) txts
- printDebug "corpus id" cids
- printDebug "sending email" ("xxxxxxxxxxxxxxxxxxxxx" :: Text)
- sendMail user
- -- TODO ...
- pure JobLog { _scst_succeeded = Just 3
- , _scst_failed = Just 0
- , _scst_remaining = Just 0
- , _scst_events = Just []
- }
+ logStatus jl
+ pure jl
type AddWithForm = Summary "Add with FormUrlEncoded to corpus endpoint"
-> (JobLog -> m ())
-> JobLog
-> m JobLog
-addToCorpusWithForm user cid (NewWithForm ft d l _n) logStatus jobLog = do
+addToCorpusWithForm user cid (NewWithForm ft ff d l _n) logStatus jobLog = do
printDebug "[addToCorpusWithForm] Parsing corpus: " cid
printDebug "[addToCorpusWithForm] fileType" ft
+ printDebug "[addToCorpusWithForm] fileFormat" ff
logStatus jobLog
+ limit' <- view $ hasConfig . gc_max_docs_parsers
+ let limit = fromIntegral limit' :: Integer
let
- parse = case ft of
- CSV_HAL -> Parser.parseFormat Parser.CsvHal
- CSV -> Parser.parseFormat Parser.CsvGargV3
- WOS -> Parser.parseFormat Parser.WOS
- PresseRIS -> Parser.parseFormat Parser.RisPresse
- ZIP -> Parser.parseFormat Parser.ZIP
+ parseC = case ft of
+ CSV_HAL -> Parser.parseFormatC Parser.CsvHal
+ CSV -> Parser.parseFormatC Parser.CsvGargV3
+ WOS -> Parser.parseFormatC Parser.WOS
+ PresseRIS -> Parser.parseFormatC Parser.RisPresse
-- TODO granularity of the logStatus
- let data' = case ft of
- ZIP -> case BSB64.decode $ TE.encodeUtf8 d of
+ let data' = case ff of
+ Plain -> cs d
+ ZIP -> case BSB64.decode $ TE.encodeUtf8 d of
Left err -> panic $ T.pack "[addToCorpusWithForm] error decoding base64: " <> T.pack err
Right decoded -> decoded
- _ -> cs d
- eDocs <- liftBase $ parse data'
- case eDocs of
- Right docs' -> do
+ eDocsC <- liftBase $ parseC ff data'
+ case eDocsC of
+ Right (mCount, docsC) -> do
-- TODO Add progress (jobStatus) update for docs - this is a
-- long action
- limit' <- view $ hasConfig . gc_max_docs_parsers
- let limit = fromIntegral limit'
- if length docs' > limit then do
- printDebug "[addToCorpusWithForm] number of docs exceeds the limit" (show $ length docs')
- let panicMsg' = [ "[addToCorpusWithForm] number of docs ("
- , show $ length docs'
- , ") exceeds the MAX_DOCS_PARSERS limit ("
- , show limit
- , ")" ]
- let panicMsg = T.concat $ T.pack <$> panicMsg'
- logStatus $ jobLogFailTotalWithMessage panicMsg jobLog
- panic panicMsg
- else
- pure ()
- let docs = splitEvery 500 $ take limit docs'
-
- printDebug "Parsing corpus finished : " cid
- logStatus jobLog2
-
- printDebug "Starting extraction : " cid
+
+ let docsC' = zipSources (yieldMany [1..]) docsC
+ .| mapMC (\(idx, doc) ->
+ if idx > limit then do
+ --printDebug "[addToCorpusWithForm] number of docs exceeds the limit" (show limit)
+ let panicMsg' = [ "[addToCorpusWithForm] number of docs "
+ , "exceeds the MAX_DOCS_PARSERS limit ("
+ , show limit
+ , ")" ]
+ let panicMsg = T.concat $ T.pack <$> panicMsg'
+ --logStatus $ jobLogFailTotalWithMessage panicMsg jobLog
+ panic panicMsg
+ else
+ pure doc)
+ .| mapC toHyperdataDocument
+
+ --printDebug "Parsing corpus finished : " cid
+ --logStatus jobLog2
+
+ --printDebug "Starting extraction : " cid
-- TODO granularity of the logStatus
+ printDebug "flowCorpus with lang" l
+
_cid' <- flowCorpus user
(Right [cid])
(Multi $ fromMaybe EN l)
Nothing
- (map (map toHyperdataDocument) docs)
+ --(Just $ fromIntegral $ length docs, docsC')
+ (mCount, transPipe liftBase docsC') -- TODO fix number of docs
+ --(map (map toHyperdataDocument) docs)
logStatus
printDebug "Extraction finished : " cid
printDebug "sending email" ("xxxxxxxxxxxxxxxxxxxxx" :: Text)
- sendMail user
+ -- TODO uncomment this
+ --sendMail user
logStatus jobLog3
- pure $ jobLog3
+ pure jobLog3
Left e -> do
printDebug "[addToCorpusWithForm] parse error" e