2 Module : Gargantext.Core.NodeStory
3 Description : Node API generation
4 Copyright : (c) CNRS, 2017-Present
5 License : AGPL + CECILL v3
6 Maintainer : team@gargantext.org
7 Stability : experimental
A Node Story is a Map from each NodeId to the Archive (state, version
and history) of that node.
A few words on how this is implemented.

The first version used files, storing the Archive of each NodeId in a
separate file.
For performance reasons, it was rewritten to use the database.

The table `node_stories` contains two columns: `node_id` and
`archive`.
Next, it was observed that `a_history` in `Archive` takes up a lot of
space. So a new table, `node_story_archive_history`, was created with
columns `node_id`, `ngrams_type_id`, `ngrams_id` and `patch`. Each
history item is in fact a map from `NgramsType` to `NgramsTablePatch`
(see the `NgramsStatePatch'` type); it is stored flattened, one row per
(ngrams type, term, patch) entry, the term being referenced through
`ngrams_id`.
Moreover, since `G.A.Ngrams.commitStatePatch` only uses the current
state together with the most recent history items, it is not necessary
to load the whole history into memory. Instead, it is kept in the DB
(history is immutable) and only recent changes are added to
`a_history`. That field is then cleared whenever the `Archive` is saved.
43 {-# OPTIONS_GHC -fno-warn-orphans #-}
44 {-# LANGUAGE Arrows #-}
45 {-# LANGUAGE ConstraintKinds #-}
46 {-# LANGUAGE QuasiQuotes #-}
47 {-# LANGUAGE TemplateHaskell #-}
49 module Gargantext.Core.NodeStory
60 , initNodeListStoryMock
67 , getNodeArchiveHistory
80 -- import Debug.Trace (traceShow)
81 import Control.Debounce (mkDebounce, defaultDebounceSettings, debounceFreq, debounceAction)
82 import Codec.Serialise.Class
83 import Control.Arrow (returnA)
84 import Control.Concurrent (MVar(), withMVar, newMVar, modifyMVar_)
85 import Control.Exception (catch, throw, SomeException(..))
86 import Control.Lens (makeLenses, Getter, (^.), (.~), traverse)
87 import Control.Monad.Except
88 import Control.Monad.Reader
89 import Data.Aeson hiding ((.=), decode)
90 import Data.ByteString.Char8 (hPutStrLn)
91 import Data.Map.Strict (Map)
92 import Data.Maybe (catMaybes, mapMaybe)
94 import Data.Pool (Pool, withResource)
96 import qualified Database.PostgreSQL.Simple as PGS
97 import Database.PostgreSQL.Simple.SqlQQ (sql)
98 import Database.PostgreSQL.Simple.FromField (FromField(fromField), fromJSONField)
99 import Data.Profunctor.Product.TH (makeAdaptorAndInstance)
100 import GHC.Generics (Generic)
101 import Gargantext.API.Ngrams.Types
102 import Gargantext.Core.Types (NodeId(..), NodeType)
103 import Gargantext.Core.Utils.Prefix (unPrefix)
104 import Gargantext.Database.Admin.Config (nodeTypeId)
105 import Gargantext.Database.Prelude (CmdM', HasConnectionPool, HasConfig)
106 import Gargantext.Database.Query.Table.Node.Error (HasNodeError())
107 import Gargantext.Prelude
108 import Opaleye (Column, DefaultFromField(..), Insert(..), Select, SqlInt4, SqlJsonb, Table, Update(..), (.==), fromPGSFromField, rCount, restrict, runInsert, runSelect, runUpdate, selectTable, sqlInt4, sqlValueJSONB, tableField, updateEasy)
109 import Opaleye.Internal.Table (Table(..))
110 import System.IO (stderr)
111 import qualified Data.Map.Strict as Map
112 import qualified Data.Map.Strict.Patch as PM
113 import qualified Gargantext.Database.Query.Table.Ngrams as TableNgrams
115 ------------------------------------------------------------------------
-- | Environment for node stories: the shared in-memory story ('_nse_var'),
-- the action that saves it ('_nse_saver') and a getter that, given node
-- ids, returns the 'MVar' holding the story ('_nse_getter').
-- NOTE(review): the closing brace (and any deriving clause) of this record
-- are not visible in this listing.
116 data NodeStoryEnv = NodeStoryEnv
117 { _nse_var :: !(MVar NodeListStory)
118 , _nse_saver :: !(IO ())
119 , _nse_getter :: [NodeId] -> IO (MVar NodeListStory)
120 --, _nse_cleaner :: !(IO ()) -- every 12 hours: cleans the repos of unused NodeStories
121 -- , _nse_lock :: !FileLock -- TODO (it depends on the option: if with database or file only)
-- | Constraint bundle for code working with node stories. Among others it
-- requires 'CmdM'', 'HasNodeStoryEnv' and 'HasConnectionPool'.
-- NOTE(review): some members of this constraint tuple (and its closing
-- paren) are not visible in this listing.
125 type HasNodeStory env err m = ( CmdM' env err m
128 , HasNodeStoryEnv env
130 , HasConnectionPool env
-- | Access to the whole 'NodeStoryEnv'. Having it implies access to the
-- story getter and the saver, via the two superclasses.
class ( HasNodeStoryVar   env
      , HasNodeStorySaver env
      ) => HasNodeStoryEnv env where
  hasNodeStory :: Getter env NodeStoryEnv

-- | Access to a function taking node ids to an action that yields the
-- 'MVar' of the 'NodeListStory'.
class HasNodeStoryVar env where
  hasNodeStoryVar :: Getter env ([NodeId] -> IO (MVar NodeListStory))

-- | Access to the action that saves the node stories.
class HasNodeStorySaver env where
  hasNodeStorySaver :: Getter env (IO ())
144 ------------------------------------------------------------------------
{- | Node Story for each NodeType: a map from each 'NodeId' to the
'Archive' (version, state, recent history) of that node.

TODO : generalize for any NodeType, let's start with NodeList which
is implemented already
-}
newtype NodeStory s p = NodeStory { _unNodeStory :: Map NodeId (Archive s p) }
  deriving (Generic, Show)
-- CBOR and JSON encodings of 'NodeStory'. The instance bodies are empty, so
-- they use the classes' Generic-based default methods.
instance (Serialise s, Serialise p) => Serialise (NodeStory s p)
instance (ToJSON s, ToJSON p)       => ToJSON    (NodeStory s p)
instance (FromJSON s, FromJSON p)   => FromJSON  (NodeStory s p)
-- | Versioned state of one node, together with its recent history of
-- patches (the part that will be inserted into the DB).
-- NOTE(review): in this listing the `_a_state` / `_a_history` field
-- declarations and the closing brace are not visible; the rest of the
-- file uses both fields (e.g. `_a_state` in 'initNodeListStoryMock',
-- `_a_history` in 'getNodeStory').
157 data Archive s p = Archive
158 { _a_version :: !Version
161 -- first patch in the list is the most recent
162 -- We use `take` in `commitStatePatch`, that's why.
164 -- History is immutable, we just insert things on top of existing
167 -- We don't need to store the whole history in memory, this
168 -- structure holds only recent history, the one that will be
169 -- inserted to the DB.
171 deriving (Generic, Show)
-- CBOR encoding through the Generic-based default methods.
173 instance (Serialise s, Serialise p) => Serialise (Archive s p)
-- | The concrete story used for ngrams lists: per node, the ngrams tables
-- of every 'TableNgrams.NgramsType', with patches over them as history.
type NodeListStory = NodeStory NgramsState' NgramsStatePatch'

type NgramsState'      = Map      TableNgrams.NgramsType NgramsTableMap
type NgramsStatePatch' = PatchMap TableNgrams.NgramsType NgramsTablePatch

-- Orphan instance (presumably why -fno-warn-orphans is set for this module).
instance Serialise NgramsStatePatch'

-- | Archives are read from the database as JSON, through their 'FromJSON'
-- instance.
instance FromField (Archive NgramsState' NgramsStatePatch')
  where
    fromField = fromJSONField

-- | Lets Opaleye decode a @jsonb@ column directly into an 'Archive'.
instance DefaultFromField SqlJsonb (Archive NgramsState' NgramsStatePatch')
  where
    defaultFromField = fromPGSFromField
-- | Combining two archives keeps the version of the right-hand archive and
-- puts the right-hand history in front of the left-hand one (`p' <> p`).
-- Used by 'getNodeStory' through `Map.fromListWith (<>)`.
-- NOTE(review): the `_a_state` lines of the right-hand pattern and of the
-- result are not visible in this listing. The history is a list, so the
-- `Semigroup p` constraint does not seem to be needed for it; verify.
188 -- TODO Semigroup instance for unions
190 instance (Semigroup s, Semigroup p) => Semigroup (Archive s p) where
191 (<>) (Archive { _a_history = p }) (Archive { _a_version = v'
193 , _a_history = p' }) =
194 Archive { _a_version = v'
196 , _a_history = p' <> p }
-- | The empty archive starts at version 0.
-- NOTE(review): the remaining field lines of `mempty` are not visible in
-- this listing.
198 instance (Monoid s, Semigroup p) => Monoid (Archive s p) where
199 mempty = Archive { _a_version = 0
-- | JSON (de)serialisation of 'Archive' via Generic, with options built by
-- @unPrefix "_a_"@ (presumably stripping that prefix from the field
-- names) for decoding and for both encoding methods.
instance (FromJSON s, FromJSON p) => FromJSON (Archive s p) where
  parseJSON = genericParseJSON (unPrefix "_a_")

instance (ToJSON s, ToJSON p) => ToJSON (Archive s p) where
  toEncoding = genericToEncoding (unPrefix "_a_")
  toJSON     = genericToJSON     (unPrefix "_a_")
210 ------------------------------------------------------------------------
-- | A story containing exactly one node, whose archive is 'initArchive'.
initNodeStory :: (Monoid s, Semigroup p) => NodeId -> NodeStory s p
initNodeStory nodeId = NodeStory (Map.fromList [(nodeId, initArchive)])
-- | The archive a freshly created node story starts with.
-- NOTE(review): the defining equation is not visible in this listing. Its
-- constraints match the 'Monoid' instance of 'Archive', so it is
-- presumably `initArchive = mempty`; confirm.
214 initArchive :: (Monoid s, Semigroup p) => Archive s p
-- | Mock 'NodeListStory' holding a single node (`nodeListId`) at version 0.
-- Its `NgramsTerms` table is built from the elements of 'mockTable', each
-- keyed by its ngrams and converted with 'ngramsElementToRepo'.
-- NOTE(review): the `where`, the `nodeListId` binding, the remaining
-- `Archive` field and the list construction/closing are not visible in
-- this listing.
217 initNodeListStoryMock :: NodeListStory
218 initNodeListStoryMock = NodeStory $ Map.singleton nodeListId archive
221 archive = Archive { _a_version = 0
222 , _a_state = ngramsTableMap
224 ngramsTableMap = Map.singleton TableNgrams.NgramsTerms
226 [ (n ^. ne_ngrams, ngramsElementToRepo n)
227 | n <- mockTable ^. _NgramsTable
230 ------------------------------------------------------------------------
233 ------------------------------------------------------------------------
-- NOTE(review): despite the comment below, these splices are not at the
-- bottom of the file. Template Haskell top-level splices split the module
-- into declaration groups, so the generated lenses can only be used by
-- code placed after the splice. The `a_history` lens used in 'clearHistory'
-- presumably comes from a `makeLenses ''Archive` not visible here; verify.
234 -- | Lenses at the bottom of the file because Template Haskell would reorder order of execution in others cases
235 makeLenses ''NodeStoryEnv
236 makeLenses ''NodeStory
239 -----------------------------------------
-- | Polymorphic row of the node story table, in Opaleye style: `a` is the
-- node id, `b` the archive. It is instantiated with SQL columns for
-- reading/writing and with 'NodeId' / 'ArchiveQ' for decoded rows.
-- NOTE(review): the remaining fields (`archive`) and the end of this
-- declaration are not visible in this listing.
242 data NodeStoryPoly a b = NodeStoryDB { node_id :: a
-- | The archive type actually stored in the table.
246 type ArchiveQ = Archive NgramsState' NgramsStatePatch'
-- Write and read column types are identical: int4 node id, jsonb archive.
248 type NodeStoryWrite = NodeStoryPoly (Column SqlInt4) (Column SqlJsonb)
249 type NodeStoryRead = NodeStoryPoly (Column SqlInt4) (Column SqlJsonb)
-- Generates the `pNodeStory` product-profunctor adaptor used by 'nodeStoryTable'.
251 $(makeAdaptorAndInstance "pNodeStory" ''NodeStoryPoly)
-- | Run 'PGS.executeMany' with a connection taken from the pool, returning
-- its result (the number of affected rows).
--
-- Any exception is re-thrown. Unlike 'runPGSQuery', the failing statement
-- is not printed: 'PGS.formatQuery' takes a single row of parameters, not
-- the whole batch.
runPGSExecuteMany :: (PGS.ToRow q) => Pool PGS.Connection -> PGS.Query -> [q] -> IO Int64
runPGSExecuteMany pool qs a = withResource pool $ \c -> catch (PGS.executeMany c qs a) (printError c)
  where
    printError :: PGS.Connection -> SomeException -> IO Int64
    printError _c (SomeException e) = do
      --q' <- PGS.formatQuery c qs a
      --hPutStrLn stderr q'
      throw (SomeException e)
-- | Run a parameterised query with a connection taken from the pool and
-- return the decoded rows.
--
-- On any exception, the query with its parameters substituted (via
-- 'PGS.formatQuery') is written to stderr, then the exception is re-thrown.
runPGSQuery :: (PGS.FromRow r, PGS.ToRow q) => Pool PGS.Connection -> PGS.Query -> q -> IO [r]
runPGSQuery pool q a = withResource pool $ \c -> catch (PGS.query c q a) (printError c)
  where
    printError c (SomeException e) = do
      q' <- PGS.formatQuery c q a
      hPutStrLn stderr q'
      throw (SomeException e)
270 nodeExists :: Pool PGS.Connection -> NodeId -> IO Bool
271 nodeExists pool nId = (== [PGS.Only True])
272 <$> runPGSQuery pool [sql|SELECT true FROM nodes WHERE id = ? AND ? |] (nId, True)
-- | Ids of all nodes whose @typename@ column equals the database id of the
-- given 'NodeType' (see 'nodeTypeId').
getNodesIdWithType :: Pool PGS.Connection -> NodeType -> IO [NodeId]
getNodesIdWithType pool nt = do
  ns <- runPGSQuery pool query (nodeTypeId nt, True)
  pure $ map (\(PGS.Only nId) -> NodeId nId) ns
  where
    query :: PGS.Query
    query = [sql|SELECT id FROM nodes WHERE typename = ? AND ? |]
-- | Opaleye description of the node story table: an int4 `node_id` column
-- and a jsonb `archive` column.
-- NOTE(review): the lines naming the table are not visible in this
-- listing; the module header refers to it as `node_stories`.
284 nodeStoryTable :: Table NodeStoryRead NodeStoryWrite
287 ( pNodeStory NodeStoryDB { node_id = tableField "node_id"
288 , archive = tableField "archive" } )
-- | Select every row of 'nodeStoryTable'.
290 nodeStorySelect :: Select NodeStoryRead
291 nodeStorySelect = selectTable nodeStoryTable
-- TODO Check ordering, "first patch in the _a_history list is the most recent"
-- NOTE: the query has no ORDER BY, so rows come in whatever order the
-- database returns them.
--
-- | Read the stored history of a node from @node_story_archive_history@,
-- joined with @ngrams@ to recover each term.
--
-- Each row becomes its own 'NgramsStatePatch'' holding a single
-- (ngrams type, term, patch) entry. Rows whose @ngrams_type_id@ is not
-- recognised by 'TableNgrams.fromNgramsTypeId' are dropped.
getNodeArchiveHistory :: Pool PGS.Connection -> NodeId -> IO [NgramsStatePatch']
getNodeArchiveHistory pool nodeId = do
  as <- runPGSQuery pool query (nodeId, True)
  let asTuples = mapMaybe
        (\(ngrams_type_id, ngrams, patch) ->
            (\ntId -> (ntId, ngrams, patch)) <$> TableNgrams.fromNgramsTypeId ngrams_type_id)
        as
  pure $ (\(ntId, terms, patch) ->
            fst $ PM.singleton ntId (NgramsTablePatch $ fst $ PM.singleton terms patch))
         <$> asTuples
  where
    query :: PGS.Query
    query = [sql|SELECT ngrams_type_id, terms, patch
                 FROM node_story_archive_history
                 JOIN ngrams ON ngrams.id = ngrams_id
                 WHERE node_id = ? AND ? |]
-- | Append history patches of a node to `node_story_archive_history`, in
-- list order, one 'runPGSExecuteMany' per patch.
--
-- Each patch is flattened into (node, ngrams type, term, patch) tuples. Each
-- term is then resolved to its `ngrams` id with one SELECT per term. Terms
-- without a matching `ngrams` row are silently skipped (`catMaybes`).
-- NOTE(review): the per-term lookup issues one query per term; batching it
-- would reduce round-trips. Some lines (the inner lambda head, the final
-- `pure ()`, the `where` and the signature of `query`) are not visible in
-- this listing.
306 insertNodeArchiveHistory :: Pool PGS.Connection -> NodeId -> [NgramsStatePatch'] -> IO ()
307 insertNodeArchiveHistory _ _ [] = pure ()
308 insertNodeArchiveHistory pool nodeId (h:hs) = do
309 let tuples = mconcat $ (\(nType, (NgramsTablePatch patch)) ->
311 (nodeId, TableNgrams.ngramsTypeId nType, term, p)) <$> PM.toList patch) <$> PM.toList h :: [(NodeId, TableNgrams.NgramsTypeId, NgramsTerm, NgramsPatch)]
-- Resolve each term to its ngrams id (Nothing when the term is unknown).
312 tuplesM <- mapM (\(nId, nTypeId, term, patch) -> do
313 ngrams <- runPGSQuery pool ngramsQuery (term, True)
314 pure $ (\(PGS.Only termId) -> (nId, nTypeId, termId, term, patch)) <$> (headMay ngrams)
315 ) tuples :: IO [Maybe (NodeId, TableNgrams.NgramsTypeId, Int, NgramsTerm, NgramsPatch)]
-- Insert the resolved rows of this patch, then recurse on the remaining patches.
316 _ <- runPGSExecuteMany pool query $ ((\(nId, nTypeId, termId, _term, patch) -> (nId, nTypeId, termId, patch)) <$> (catMaybes tuplesM))
317 _ <- insertNodeArchiveHistory pool nodeId hs
320 ngramsQuery :: PGS.Query
321 ngramsQuery = [sql| SELECT id FROM ngrams WHERE terms = ? AND ? |]
324 query = [sql| INSERT INTO node_story_archive_history(node_id, ngrams_type_id, ngrams_id, patch) VALUES (?, ?, ?, ?) |]
326 getNodeStory :: Pool PGS.Connection -> NodeId -> IO NodeListStory
327 getNodeStory pool (NodeId nodeId) = do
328 res <- withResource pool $ \c -> runSelect c query :: IO [NodeStoryPoly NodeId ArchiveQ]
329 withArchive <- mapM (\(NodeStoryDB { node_id = nId, archive = Archive { .. } }) -> do
330 --a <- getNodeArchiveHistory pool nId
331 let a = [] :: [NgramsStatePatch']
332 -- Don't read whole history. Only state is needed and most recent changes.
333 pure (nId, Archive { _a_history = a, .. })) res
334 pure $ NodeStory $ Map.fromListWith (<>) withArchive
335 --pure $ NodeStory $ Map.fromListWith (<>) $ (\(NodeStoryDB nId a) -> (nId, a)) <$> res
337 query :: Select NodeStoryRead
338 query = proc () -> do
339 row@(NodeStoryDB node_id _) <- nodeStorySelect -< ()
340 restrict -< node_id .== sqlInt4 nodeId
-- | Insert a new row for the node. The archive is stored as JSONB with an
-- empty history. The in-memory history is instead appended to the history
-- table, reversed (oldest first), through 'insertNodeArchiveHistory'.
-- `ret` is the row count of the insert (`rCount`).
-- NOTE(review): the lines returning `ret`, the `where` and the end of the
-- stored `Archive` record are not visible in this listing.
343 insertNodeArchive :: Pool PGS.Connection -> NodeId -> ArchiveQ -> IO Int64
344 insertNodeArchive pool nodeId@(NodeId nId) (Archive {..}) = do
345 ret <- withResource pool $ \c -> runInsert c insert
346 -- NOTE: It is assumed that the most recent change is the first in the
347 -- list, so we save these in reverse order
348 insertNodeArchiveHistory pool nodeId $ reverse _a_history
351 emptyHistory = [] :: [NgramsStatePatch']
352 insert = Insert { iTable = nodeStoryTable
353 , iRows = [NodeStoryDB { node_id = sqlInt4 nId
354 , archive = sqlValueJSONB $ Archive { _a_history = emptyHistory
356 , iReturning = rCount
357 , iOnConflict = Nothing }
-- | Overwrite the stored archive of an existing node (rows matching
-- `node_id`). The archive is stored as JSONB with an empty history. The
-- in-memory history is instead appended to the history table, reversed
-- (oldest first), through 'insertNodeArchiveHistory'. `ret` is the row count
-- of the update (`rCount`).
-- NOTE(review): the lines returning `ret`, the `where` and the end of the
-- `updateEasy` record are not visible in this listing.
359 updateNodeArchive :: Pool PGS.Connection -> NodeId -> ArchiveQ -> IO Int64
360 updateNodeArchive pool nodeId@(NodeId nId) (Archive {..}) = do
361 ret <- withResource pool $ \c -> runUpdate c update
362 -- NOTE: It is assumed that the most recent change is the first in the
363 -- list, so we save these in reverse order
364 insertNodeArchiveHistory pool nodeId $ reverse _a_history
367 emptyHistory = [] :: [NgramsStatePatch']
368 update = Update { uTable = nodeStoryTable
369 , uUpdateWith = updateEasy (\(NodeStoryDB { node_id }) -> NodeStoryDB { archive = sqlValueJSONB $ Archive { _a_history = emptyHistory
372 , uWhere = (\row -> node_id row .== sqlInt4 nId)
373 , uReturning = rCount }
375 -- nodeStoryRemove :: Pool PGS.Connection -> NodeId -> IO Int64
376 -- nodeStoryRemove pool (NodeId nId) = withResource pool $ \c -> runDelete c delete
378 -- delete = Delete { dTable = nodeStoryTable
379 -- , dWhere = (\row -> node_id row .== sqlInt4 nId)
380 -- , dReturning = rCount }
-- | Insert or update the stored archive of a node.
--
-- Existence is checked with a single-row SELECT on @node_stories@. This
-- avoids fetching and JSON-decoding the whole stored archive just to test
-- whether the node is present.
--
-- NOTE(review): the check and the write are not atomic; two concurrent
-- upserts of a new node could both take the insert branch.
upsertNodeArchive :: Pool PGS.Connection -> NodeId -> ArchiveQ -> IO Int64
upsertNodeArchive pool nId a = do
  exists <- nodeStoryExists
  if exists
    then updateNodeArchive pool nId a
    else insertNodeArchive pool nId a
  where
    -- True when the node already has a row in the node story table.
    nodeStoryExists :: IO Bool
    nodeStoryExists = do
      rows <- runPGSQuery pool existsQuery (nId, True) :: IO [PGS.Only Bool]
      pure (not (null rows))

    existsQuery :: PGS.Query
    existsQuery = [sql|SELECT true FROM node_stories WHERE node_id = ? AND ? |]
-- | Upsert the archive of every node of the story, one node at a time.
-- Per-node results are discarded.
writeNodeStories :: Pool PGS.Connection -> NodeListStory -> IO ()
writeNodeStories pool (NodeStory nls) =
  mapM_ (uncurry (upsertNodeArchive pool)) (Map.toList nls)
-- | Returns a `NodeListStory`, updating the given one for given `NodeId`.
--
-- Without a story, the node's story is simply loaded from the database.
-- With a story, the node is loaded and merged in only when it is not
-- already present, so the in-memory archive of a known node is kept.
nodeStoryInc :: Pool PGS.Connection -> Maybe NodeListStory -> NodeId -> IO NodeListStory
nodeStoryInc pool Nothing nId = getNodeStory pool nId
nodeStoryInc pool (Just ns@(NodeStory nls)) nId =
  case Map.lookup nId nls of
    Just _  -> pure ns
    Nothing -> do
      (NodeStory nls') <- getNodeStory pool nId
      pure $ NodeStory $ Map.union nls nls'
404 nodeStoryIncs :: Pool PGS.Connection -> Maybe NodeListStory -> [NodeId] -> IO NodeListStory
405 nodeStoryIncs _ Nothing [] = pure $ NodeStory $ Map.empty
406 nodeStoryIncs pool (Just nls) ns = foldM (\m n -> nodeStoryInc pool (Just m) n) nls ns
407 nodeStoryIncs pool Nothing (ni:ns) = do
408 m <- getNodeStory pool ni
409 nodeStoryIncs pool (Just m) ns
411 -- nodeStoryDec :: Pool PGS.Connection -> NodeListStory -> NodeId -> IO NodeListStory
412 -- nodeStoryDec pool ns@(NodeStory nls) ni = do
413 -- case Map.lookup ni nls of
415 -- _ <- nodeStoryRemove pool ni
418 -- let ns' = Map.filterWithKey (\k _v -> k /= ni) nls
419 -- _ <- nodeStoryRemove pool ni
420 -- pure $ NodeStory ns'
421 ------------------------------------
-- | Build the 'NodeStoryEnv' with three parts:
--
-- * an in-memory story, initially empty;
-- * a debounced saver for that story (see 'mkNodeStorySaver');
-- * a getter that loads requested nodes into that same 'MVar'.
readNodeStoryEnv :: Pool PGS.Connection -> IO NodeStoryEnv
readNodeStoryEnv pool = do
  mvar <- nodeStoryVar pool Nothing []
  saver <- mkNodeStorySaver pool mvar
  pure $ NodeStoryEnv { _nse_var    = mvar
                      , _nse_saver  = saver
                      , _nse_getter = nodeStoryVar pool (Just mvar) }
-- | Without an 'MVar', load the stories of the given nodes into a new one.
-- With an 'MVar', add to its story the given nodes it does not hold yet
-- (see 'nodeStoryInc'), then return that same 'MVar'.
nodeStoryVar :: Pool PGS.Connection -> Maybe (MVar NodeListStory) -> [NodeId] -> IO (MVar NodeListStory)
nodeStoryVar pool Nothing nIds = do
  state <- nodeStoryIncs pool Nothing nIds
  newMVar state
nodeStoryVar pool (Just mv) nIds = do
  modifyMVar_ mv $ \nsl -> nodeStoryIncs pool (Just nsl) nIds
  pure mv
-- | Debounce is useful since it could delay the saving to some later
-- time, asynchronously and we keep operating on memory only.
--
-- The returned action is debounced with a frequency of one minute. When it
-- fires, the whole story is written to the DB and then its in-memory
-- history is cleared.
--
-- Both steps run inside a single 'modifyMVar_'. Patches added by other
-- threads therefore cannot slip in between the write and the clear, where
-- they would be cleared without ever being saved. If the write throws,
-- 'modifyMVar_' keeps the previous contents, so the history is retained.
mkNodeStorySaver :: Pool PGS.Connection -> MVar NodeListStory -> IO (IO ())
mkNodeStorySaver pool mvns = mkDebounce settings
  where
    settings = defaultDebounceSettings
      { debounceAction = modifyMVar_ mvns $ \ns -> do
          writeNodeStories pool ns
          printDebug "[mkNodeStorySaver] debounce nodestory" ns
          pure $ clearHistory ns
      , debounceFreq = 1*minute
      }
    minute = 60 * second
    second = 10^(6 :: Int)
-- | Empty the in-memory history of every archive of the story; versions
-- and states are left untouched.
clearHistory :: NodeListStory -> NodeListStory
clearHistory (NodeStory ns) = NodeStory $ ns & (traverse . a_history) .~ emptyHistory
  where
    emptyHistory = [] :: [NgramsStatePatch']
465 -- mkNodeStorySaver :: MVar NodeListStory -> Cmd err (Cmd err ())
466 -- mkNodeStorySaver mvns = mkDebounce settings
468 -- settings = defaultDebounceSettings
469 -- { debounceAction = withMVar mvns (\ns -> writeNodeStories ns)
470 -- , debounceFreq = 1 * minute
471 -- -- , debounceEdge = trailingEdge -- Trigger on the trailing edge
473 -- minute = 60 * second
474 -- second = 10^(6 :: Int)
477 -----------------------------------------