{-|
Module      : Gargantext.Core.NodeStory
Description : Node API generation
Copyright   : (c) CNRS, 2017-Present
License     : AGPL + CECILL v3
Maintainer  : team@gargantext.org
Stability   : experimental
Portability : POSIX

A Node Story is a Map between NodeId and an Archive (with state,
version and history) for that node.
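
For instance, the story of a single list node is, conceptually (a
sketch; `listId`, `ngramsState` and `recentPatches` are placeholders):

> NodeStory $ Map.singleton listId
>     (Archive { _a_version = 3             -- current version of the state
>              , _a_state   = ngramsState   -- current ngrams state
>              , _a_history = recentPatches -- most recent patch first
>              })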
A couple of words on how this is implemented.

The first version used files, storing the Archive for each NodeId in
a separate file.

For performance reasons, it was rewritten to use the DB.
The table `node_stories` contains two columns: `node_id` and
`archive`.

Next, it was observed that `a_history` in `Archive` takes up a lot of
space. So a new table, `node_story_archive_history`, was created with
columns: `node_id`, `ngrams_type_id`, `patch`. This is because each
history item is in fact a map from `NgramsType` to `NgramsTablePatch`
(see the `NgramsStatePatch'` type).
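
An approximate sketch of the two tables (the authoritative DDL lives
in the database schema/migrations, not in this module):

> CREATE TABLE node_stories (
>   node_id        INTEGER NOT NULL REFERENCES nodes(id),
>   archive        JSONB
> );
>
> CREATE TABLE node_story_archive_history (
>   node_id        INTEGER NOT NULL REFERENCES nodes(id),
>   ngrams_type_id INTEGER,
>   patch          JSONB
> );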
Moreover, since ~G.A.Ngrams.commitStatePatch~ uses only the current
state together with recent history items, I concluded that it is not
necessary to load the whole history into memory. Instead, it is kept
in the DB (history is immutable) and only recent changes are added to
`a_history`. That field is then cleared whenever the `Archive` is
saved (see `mkNodeStorySaver` and `clearHistory` below).
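
So, roughly, the save path is (a sketch; see `mkNodeStorySaver` and
`clearHistory` for the real code):

> saveAndClear ns = do
>   writeNodeStories pool ns    -- persist state and recent history
>   pure (clearHistory ns)      -- then drop recent history from memory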
-}

{-# OPTIONS_GHC -fno-warn-orphans #-}
{-# LANGUAGE Arrows #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE TemplateHaskell #-}
module Gargantext.Core.NodeStory
  ( NodeStoryEnv(..)
  , NodeStory(..)
  , NodeListStory
  , initNodeListStoryMock
  , getNodeArchiveHistory
  , readNodeStoryEnv )
where
-- import Debug.Trace (traceShow)
import Control.Debounce (mkDebounce, defaultDebounceSettings, debounceFreq, debounceAction)
import Codec.Serialise.Class
import Control.Arrow (returnA)
import Control.Concurrent (MVar(), withMVar, newMVar, modifyMVar_)
import Control.Exception (catch, throw, SomeException(..))
import Control.Lens (makeLenses, Getter, (^.), (.~), traverse)
import Control.Monad.Except
import Control.Monad.Reader
import Data.Aeson hiding ((.=), decode)
import Data.ByteString.Char8 (hPutStrLn)
import Data.Map.Strict (Map)
import Data.Maybe (mapMaybe)
import Data.Pool (Pool, withResource)
import qualified Database.PostgreSQL.Simple as PGS
import Database.PostgreSQL.Simple.SqlQQ (sql)
import Database.PostgreSQL.Simple.FromField (FromField(fromField), fromJSONField)
import Data.Profunctor.Product.TH (makeAdaptorAndInstance)
import GHC.Generics (Generic)
import Gargantext.API.Ngrams.Types
import Gargantext.Core.Types (NodeId(..), NodeType)
import Gargantext.Core.Utils.Prefix (unPrefix)
import Gargantext.Database.Admin.Config (nodeTypeId)
import Gargantext.Database.Prelude (CmdM', HasConnectionPool, HasConfig)
import Gargantext.Database.Query.Table.Node.Error (HasNodeError())
import Gargantext.Prelude
import Opaleye (Column, DefaultFromField(..), Insert(..), Select, SqlInt4, SqlJsonb, Table, Update(..), (.==), fromPGSFromField, rCount, restrict, runInsert, runSelect, runUpdate, selectTable, sqlInt4, sqlValueJSONB, tableField, updateEasy)
import Opaleye.Internal.Table (Table(..))
import System.IO (stderr)
import qualified Data.Map.Strict as Map
import qualified Data.Map.Strict.Patch as PM
import qualified Gargantext.Database.Query.Table.Ngrams as TableNgrams
------------------------------------------------------------------------
data NodeStoryEnv = NodeStoryEnv
  { _nse_var    :: !(MVar NodeListStory)
  , _nse_saver  :: !(IO ())
  , _nse_getter :: [NodeId] -> IO (MVar NodeListStory)
  --, _nse_cleaner :: !(IO ()) -- every 12 hours: cleans the repos of unused NodeStories
  -- , _nse_lock  :: !FileLock -- TODO (it depends on the option: if with database or file only)
  }
  deriving (Generic)
type HasNodeStory env err m = ( CmdM' env err m
                              , MonadReader env m
                              , MonadError err m
                              , HasNodeStoryEnv env
                              , HasConfig env
                              , HasConnectionPool env
                              , HasNodeError err
                              )
class (HasNodeStoryVar env, HasNodeStorySaver env)
  => HasNodeStoryEnv env where
    hasNodeStory :: Getter env NodeStoryEnv

class HasNodeStoryVar env where
  hasNodeStoryVar :: Getter env ([NodeId] -> IO (MVar NodeListStory))

class HasNodeStorySaver env where
  hasNodeStorySaver :: Getter env (IO ())
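
-- A sketch of how an application environment could implement these
-- classes (`Env` and `_env_nodeStory` below are hypothetical, not part
-- of this module; `to` is Control.Lens's getter constructor):
--
-- > data Env = Env { _env_nodeStory :: !NodeStoryEnv }
-- >
-- > instance HasNodeStoryEnv Env where
-- >   hasNodeStory = to _env_nodeStory
-- > instance HasNodeStoryVar Env where
-- >   hasNodeStoryVar = to (_nse_getter . _env_nodeStory)
-- > instance HasNodeStorySaver Env where
-- >   hasNodeStorySaver = to (_nse_saver . _env_nodeStory)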
------------------------------------------------------------------------

{- | Node Story for each NodeType where the Key of the Map is NodeId
   TODO : generalize for any NodeType, let's start with NodeList which
   is already implemented.
-}
newtype NodeStory s p = NodeStory { _unNodeStory :: Map NodeId (Archive s p) }
  deriving (Generic, Show)
instance (FromJSON s, FromJSON p) => FromJSON (NodeStory s p)
instance (ToJSON s, ToJSON p) => ToJSON (NodeStory s p)
instance (Serialise s, Serialise p) => Serialise (NodeStory s p)
data Archive s p = Archive
  { _a_version :: !Version
  , _a_state   :: !s
  , _a_history :: ![p]
    -- first patch in the list is the most recent
    -- We use `take` in `commitStatePatch`, that's why.
    --
    -- History is immutable, we just insert things on top of existing
    -- history.
    --
    -- We don't need to store the whole history in memory, this
    -- structure holds only recent history, the one that will be
    -- inserted into the DB.
  }
  deriving (Generic, Show)

instance (Serialise s, Serialise p) => Serialise (Archive s p)
type NodeListStory = NodeStory NgramsState' NgramsStatePatch'

type NgramsState' = Map TableNgrams.NgramsType NgramsTableMap
type NgramsStatePatch' = PatchMap TableNgrams.NgramsType NgramsTablePatch
instance Serialise NgramsStatePatch'

instance FromField (Archive NgramsState' NgramsStatePatch')
  where
    fromField = fromJSONField

instance DefaultFromField SqlJsonb (Archive NgramsState' NgramsStatePatch')
  where
    defaultFromField = fromPGSFromField
-- TODO Semigroup instance for unions
instance (Semigroup s, Semigroup p) => Semigroup (Archive s p) where
  (<>) (Archive { _a_history = p }) (Archive { _a_version = v'
                                             , _a_state = s'
                                             , _a_history = p' }) =
    Archive { _a_version = v'
            , _a_state = s'
            , _a_history = p' <> p }
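
-- A sketch of the merge semantics: version and state come from the
-- right-hand archive, histories are concatenated, most recent first:
--
-- > Archive 1 s [p2, p1] <> Archive 2 s' [p3]  ==  Archive 2 s' [p3, p2, p1]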
instance (Monoid s, Semigroup p) => Monoid (Archive s p) where
  mempty = Archive { _a_version = 0
                   , _a_state = mempty
                   , _a_history = [] }
instance (FromJSON s, FromJSON p) => FromJSON (Archive s p) where
  parseJSON = genericParseJSON $ unPrefix "_a_"

instance (ToJSON s, ToJSON p) => ToJSON (Archive s p) where
  toJSON = genericToJSON $ unPrefix "_a_"
  toEncoding = genericToEncoding $ unPrefix "_a_"
------------------------------------------------------------------------
initNodeStory :: (Monoid s, Semigroup p) => NodeId -> NodeStory s p
initNodeStory ni = NodeStory $ Map.singleton ni initArchive

initArchive :: (Monoid s, Semigroup p) => Archive s p
initArchive = mempty
initNodeListStoryMock :: NodeListStory
initNodeListStoryMock = NodeStory $ Map.singleton nodeListId archive
  where
    nodeListId = 0
    archive = Archive { _a_version = 0
                      , _a_state = ngramsTableMap
                      , _a_history = [] }
    ngramsTableMap = Map.singleton TableNgrams.NgramsTerms
                   $ Map.fromList
                   [ (n ^. ne_ngrams, ngramsElementToRepo n)
                   | n <- mockTable ^. _NgramsTable
                   ]
------------------------------------------------------------------------
------------------------------------------------------------------------
-- | Lenses are defined down here because Template Haskell would
-- otherwise reorder the execution of declarations.
makeLenses ''NodeStoryEnv
makeLenses ''NodeStory
makeLenses ''Archive

-----------------------------------------
data NodeStoryPoly a b = NodeStoryDB { node_id :: a
                                     , archive :: b }
  deriving (Eq)

type ArchiveQ = Archive NgramsState' NgramsStatePatch'

type NodeStoryWrite = NodeStoryPoly (Column SqlInt4) (Column SqlJsonb)
type NodeStoryRead = NodeStoryPoly (Column SqlInt4) (Column SqlJsonb)

$(makeAdaptorAndInstance "pNodeStory" ''NodeStoryPoly)
runPGSExecuteMany :: (PGS.ToRow q) => Pool PGS.Connection -> PGS.Query -> [q] -> IO Int64
runPGSExecuteMany pool qs a = withResource pool $ \c -> catch (PGS.executeMany c qs a) (printError c)
  where
    printError _c (SomeException e) = do
      --q' <- PGS.formatQuery c qs a
      --hPutStrLn stderr q'
      throw (SomeException e)
runPGSQuery :: (PGS.FromRow r, PGS.ToRow q) => Pool PGS.Connection -> PGS.Query -> q -> IO [r]
runPGSQuery pool q a = withResource pool $ \c -> catch (PGS.query c q a) (printError c)
  where
    printError c (SomeException e) = do
      q' <- PGS.formatQuery c q a
      hPutStrLn stderr q'
      throw (SomeException e)
nodeExists :: Pool PGS.Connection -> NodeId -> IO Bool
nodeExists pool nId = (== [PGS.Only True])
  <$> runPGSQuery pool [sql|SELECT true FROM nodes WHERE id = ? AND ? |] (nId, True)
getNodesIdWithType :: Pool PGS.Connection -> NodeType -> IO [NodeId]
getNodesIdWithType pool nt = do
  ns <- runPGSQuery pool query (nodeTypeId nt, True)
  pure $ map (\(PGS.Only nId) -> NodeId nId) ns
  where
    query = [sql|SELECT id FROM nodes WHERE typename = ? AND ? |]
nodeStoryTable :: Table NodeStoryRead NodeStoryWrite
nodeStoryTable =
  Table "node_stories"
    ( pNodeStory NodeStoryDB { node_id = tableField "node_id"
                             , archive = tableField "archive" } )

nodeStorySelect :: Select NodeStoryRead
nodeStorySelect = selectTable nodeStoryTable
-- TODO Check ordering: "first patch in the _a_history list is the most recent"
getNodeArchiveHistory :: Pool PGS.Connection -> NodeId -> IO [NgramsStatePatch']
getNodeArchiveHistory pool nodeId = do
  as <- runPGSQuery pool query (nodeId, True)
  let asTuples = mapMaybe (\(ngrams_type_id, patch) -> (\ntId -> (ntId, patch)) <$> (TableNgrams.fromNgramsTypeId ngrams_type_id)) as
  pure $ (\(ntId, patch) -> fst $ PM.singleton ntId patch) <$> asTuples
  where
    query = [sql|SELECT ngrams_type_id, patch FROM node_story_archive_history WHERE node_id = ? AND ? |]
insertNodeArchiveHistory :: Pool PGS.Connection -> NodeId -> [NgramsStatePatch'] -> IO ()
insertNodeArchiveHistory _ _ [] = pure ()
insertNodeArchiveHistory pool nodeId (h:hs) = do
  _ <- runPGSExecuteMany pool query $ (\(nType, patch) -> (nodeId, TableNgrams.ngramsTypeId nType, patch)) <$> (PM.toList h)
  _ <- insertNodeArchiveHistory pool nodeId hs
  pure ()
  where
    query = [sql| INSERT INTO node_story_archive_history(node_id, ngrams_type_id, patch) VALUES (?, ?, ?) |]
getNodeStory :: Pool PGS.Connection -> NodeId -> IO NodeListStory
getNodeStory pool (NodeId nodeId) = do
  res <- withResource pool $ \c -> runSelect c query :: IO [NodeStoryPoly NodeId ArchiveQ]
  withArchive <- mapM (\(NodeStoryDB { node_id = nId, archive = Archive { .. } }) -> do
                          --a <- getNodeArchiveHistory pool nId
                          let a = [] :: [NgramsStatePatch']
                          -- Don't read the whole history: only the state and the most
                          -- recent changes are needed.
                          pure (nId, Archive { _a_history = a, .. })) res
  pure $ NodeStory $ Map.fromListWith (<>) withArchive
  --pure $ NodeStory $ Map.fromListWith (<>) $ (\(NodeStoryDB nId a) -> (nId, a)) <$> res
  where
    query :: Select NodeStoryRead
    query = proc () -> do
      row@(NodeStoryDB node_id _) <- nodeStorySelect -< ()
      restrict -< node_id .== sqlInt4 nodeId
      returnA -< row
insertNodeArchive :: Pool PGS.Connection -> NodeId -> ArchiveQ -> IO Int64
insertNodeArchive pool nodeId@(NodeId nId) (Archive {..}) = do
  ret <- withResource pool $ \c -> runInsert c insert
  -- NOTE: It is assumed that the most recent change is the first in the
  -- list, so we save these in reverse order
  insertNodeArchiveHistory pool nodeId $ reverse _a_history
  pure ret
  where
    emptyHistory = [] :: [NgramsStatePatch']
    insert = Insert { iTable = nodeStoryTable
                    , iRows = [NodeStoryDB { node_id = sqlInt4 nId
                                           , archive = sqlValueJSONB $ Archive { _a_history = emptyHistory
                                                                               , .. } }]
                    , iReturning = rCount
                    , iOnConflict = Nothing }
updateNodeArchive :: Pool PGS.Connection -> NodeId -> ArchiveQ -> IO Int64
updateNodeArchive pool nodeId@(NodeId nId) (Archive {..}) = do
  ret <- withResource pool $ \c -> runUpdate c update
  -- NOTE: It is assumed that the most recent change is the first in the
  -- list, so we save these in reverse order
  insertNodeArchiveHistory pool nodeId $ reverse _a_history
  pure ret
  where
    emptyHistory = [] :: [NgramsStatePatch']
    update = Update { uTable = nodeStoryTable
                    , uUpdateWith = updateEasy (\(NodeStoryDB { node_id }) ->
                                                  NodeStoryDB { node_id
                                                              , archive = sqlValueJSONB $ Archive { _a_history = emptyHistory
                                                                                                  , .. } })
                    , uWhere = (\row -> node_id row .== sqlInt4 nId)
                    , uReturning = rCount }
-- nodeStoryRemove :: Pool PGS.Connection -> NodeId -> IO Int64
-- nodeStoryRemove pool (NodeId nId) = withResource pool $ \c -> runDelete c delete
--   where
--     delete = Delete { dTable = nodeStoryTable
--                     , dWhere = (\row -> node_id row .== sqlInt4 nId)
--                     , dReturning = rCount }
upsertNodeArchive :: Pool PGS.Connection -> NodeId -> ArchiveQ -> IO Int64
upsertNodeArchive pool nId a = do
  (NodeStory m) <- getNodeStory pool nId
  case Map.lookup nId m of
    Nothing -> insertNodeArchive pool nId a
    Just _  -> updateNodeArchive pool nId a
writeNodeStories :: Pool PGS.Connection -> NodeListStory -> IO ()
writeNodeStories pool (NodeStory nls) = do
  _ <- mapM (\(nId, a) -> upsertNodeArchive pool nId a) $ Map.toList nls
  pure ()
-- | Returns a `NodeListStory`, updating the given one for the given `NodeId`
nodeStoryInc :: Pool PGS.Connection -> Maybe NodeListStory -> NodeId -> IO NodeListStory
nodeStoryInc pool Nothing nId = getNodeStory pool nId
nodeStoryInc pool (Just ns@(NodeStory nls)) nId = do
  case Map.lookup nId nls of
    Nothing -> do
      (NodeStory nls') <- getNodeStory pool nId
      pure $ NodeStory $ Map.union nls nls'
    Just _ -> pure ns
nodeStoryIncs :: Pool PGS.Connection -> Maybe NodeListStory -> [NodeId] -> IO NodeListStory
nodeStoryIncs _ Nothing [] = pure $ NodeStory $ Map.empty
nodeStoryIncs pool (Just nls) ns = foldM (\m n -> nodeStoryInc pool (Just m) n) nls ns
nodeStoryIncs pool Nothing (ni:ns) = do
  m <- getNodeStory pool ni
  nodeStoryIncs pool (Just m) ns
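
-- For example (a sketch; `listId1` and `listId2` are hypothetical node
-- ids), this loads and merges the stories of two nodes into a single
-- `NodeListStory`:
--
-- > nodeStoryIncs pool Nothing [listId1, listId2]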
-- nodeStoryDec :: Pool PGS.Connection -> NodeListStory -> NodeId -> IO NodeListStory
-- nodeStoryDec pool ns@(NodeStory nls) ni = do
--   case Map.lookup ni nls of
--     Nothing -> do
--       _ <- nodeStoryRemove pool ni
--       pure ns
--     Just _ -> do
--       let ns' = Map.filterWithKey (\k _v -> k /= ni) nls
--       _ <- nodeStoryRemove pool ni
--       pure $ NodeStory ns'
------------------------------------
readNodeStoryEnv :: Pool PGS.Connection -> IO NodeStoryEnv
readNodeStoryEnv pool = do
  mvar <- nodeStoryVar pool Nothing []
  saver <- mkNodeStorySaver pool mvar
  -- let saver = modifyMVar_ mvar $ \mv -> do
  --       writeNodeStories pool mv
  --       printDebug "[readNodeStoryEnv] saver" mv
  --       let mv' = clearHistory mv
  --       printDebug "[readNodeStoryEnv] saver, cleared" mv'
  --       pure mv'
  pure $ NodeStoryEnv { _nse_var = mvar
                      , _nse_saver = saver
                      , _nse_getter = nodeStoryVar pool (Just mvar) }
nodeStoryVar :: Pool PGS.Connection -> Maybe (MVar NodeListStory) -> [NodeId] -> IO (MVar NodeListStory)
nodeStoryVar pool Nothing nIds = do
  state <- nodeStoryIncs pool Nothing nIds
  newMVar state
nodeStoryVar pool (Just mv) nIds = do
  modifyMVar_ mv $ \nsl -> nodeStoryIncs pool (Just nsl) nIds
  pure mv
-- Debouncing is useful here since it delays the saving to some later
-- time, asynchronously, so that we keep operating on memory only.
mkNodeStorySaver :: Pool PGS.Connection -> MVar NodeListStory -> IO (IO ())
mkNodeStorySaver pool mvns = mkDebounce settings
  where
    settings = defaultDebounceSettings
                 { debounceAction = do
                     withMVar mvns (\ns -> writeNodeStories pool ns)
                     withMVar mvns (\ns -> printDebug "[mkNodeStorySaver] debounce nodestory" ns)
                     modifyMVar_ mvns $ \ns -> pure $ clearHistory ns
                 , debounceFreq = 1 * minute }
    minute = 60 * second
    second = 10^(6 :: Int)
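
-- A sketch of a typical call site for the debounced saver, assuming an
-- environment with a `HasNodeStorySaver` instance (`view` is from
-- Control.Lens and is not imported by this module):
--
-- > saveNodeStory :: (MonadReader env m, HasNodeStorySaver env, MonadIO m) => m ()
-- > saveNodeStory = do
-- >   saver <- view hasNodeStorySaver
-- >   liftIO saver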
clearHistory :: NodeListStory -> NodeListStory
clearHistory (NodeStory ns) = NodeStory $ ns & (traverse . a_history) .~ emptyHistory
  where
    emptyHistory = [] :: [NgramsStatePatch']
-- mkNodeStorySaver :: MVar NodeListStory -> Cmd err (Cmd err ())
-- mkNodeStorySaver mvns = mkDebounce settings
--   where
--     settings = defaultDebounceSettings
--                  { debounceAction = withMVar mvns (\ns -> writeNodeStories ns)
--                  , debounceFreq = 1 * minute
--                  -- , debounceEdge = trailingEdge -- Trigger on the trailing edge
--                  }
--     minute = 60 * second
--     second = 10^(6 :: Int)

-----------------------------------------