2 Module : Gargantext.Core.NodeStory
3 Description : Node API generation
4 Copyright : (c) CNRS, 2017-Present
5 License : AGPL + CECILL v3
6 Maintainer : team@gargantext.org
7 Stability : experimental
10 A Node Story is a Map between NodeId and an Archive (with state,
11 version and history) for that node.
19 {-# OPTIONS_GHC -fno-warn-orphans #-}
20 {-# LANGUAGE Arrows #-}
21 {-# LANGUAGE ConstraintKinds #-}
22 {-# LANGUAGE QuasiQuotes #-}
23 {-# LANGUAGE TemplateHaskell #-}
25 module Gargantext.Core.NodeStory
36 , initNodeListStoryMock
43 , getNodeArchiveHistory
56 -- import Debug.Trace (traceShow)
57 --import Control.Debounce (mkDebounce, defaultDebounceSettings, debounceFreq, debounceAction)
58 import Codec.Serialise.Class
59 import Control.Arrow (returnA)
60 import Control.Concurrent (MVar(), {-withMVar,-} newMVar, modifyMVar_)
61 import Control.Exception (catch, throw, SomeException(..))
62 import Control.Lens (makeLenses, Getter, (^.), (.~), traverse)
63 import Control.Monad.Except
64 import Control.Monad.Reader
65 import Data.Aeson hiding ((.=), decode)
66 import Data.ByteString.Char8 (hPutStrLn)
67 import Data.Map.Strict (Map)
68 import Data.Maybe (mapMaybe)
70 import Data.Pool (Pool, withResource)
72 import qualified Database.PostgreSQL.Simple as PGS
73 import Database.PostgreSQL.Simple.SqlQQ (sql)
74 import Database.PostgreSQL.Simple.FromField (FromField(fromField), fromJSONField)
75 import Data.Profunctor.Product.TH (makeAdaptorAndInstance)
76 import GHC.Generics (Generic)
77 import Gargantext.API.Ngrams.Types
78 import Gargantext.Core.Types (NodeId(..), NodeType)
79 import Gargantext.Core.Utils.Prefix (unPrefix)
80 import Gargantext.Database.Admin.Config (nodeTypeId)
81 import Gargantext.Database.Prelude (CmdM', HasConnectionPool, HasConfig)
82 import Gargantext.Database.Query.Table.Node.Error (HasNodeError())
83 import Gargantext.Prelude
84 import Opaleye (Column, DefaultFromField(..), Insert(..), Select, SqlInt4, SqlJsonb, Table, Update(..), (.==), fromPGSFromField, rCount, restrict, runInsert, runSelect, runUpdate, selectTable, sqlInt4, sqlValueJSONB, tableField, updateEasy)
85 import Opaleye.Internal.Table (Table(..))
86 import System.IO (stderr)
87 import qualified Data.Map.Strict as Map
88 import qualified Data.Map.Strict.Patch as PM
89 import qualified Gargantext.Database.Query.Table.Ngrams as TableNgrams
91 ------------------------------------------------------------------------
92 data NodeStoryEnv = NodeStoryEnv
93 { _nse_var :: !(MVar NodeListStory)
94 , _nse_saver :: !(IO ())
95 , _nse_getter :: [NodeId] -> IO (MVar NodeListStory)
96 --, _nse_cleaner :: !(IO ()) -- every 12 hours: cleans the repos of unused NodeStories
97 -- , _nse_lock :: !FileLock -- TODO (it depends on the option: if with database or file only)
101 type HasNodeStory env err m = ( CmdM' env err m
104 , HasNodeStoryEnv env
106 , HasConnectionPool env
110 class (HasNodeStoryVar env, HasNodeStorySaver env)
111 => HasNodeStoryEnv env where
112 hasNodeStory :: Getter env NodeStoryEnv
114 class HasNodeStoryVar env where
115 hasNodeStoryVar :: Getter env ([NodeId] -> IO (MVar NodeListStory))
117 class HasNodeStorySaver env where
118 hasNodeStorySaver :: Getter env (IO ())
120 ------------------------------------------------------------------------
122 {- | Node Story for each NodeType where the Key of the Map is NodeId
123 TODO : generalize for any NodeType, let's start with NodeList which
124 is implemented already
126 newtype NodeStory s p = NodeStory { _unNodeStory :: Map NodeId (Archive s p) }
127 deriving (Generic, Show)
129 instance (FromJSON s, FromJSON p) => FromJSON (NodeStory s p)
130 instance (ToJSON s, ToJSON p) => ToJSON (NodeStory s p)
131 instance (Serialise s, Serialise p) => Serialise (NodeStory s p)
133 data Archive s p = Archive
134 { _a_version :: !Version
137 -- first patch in the list is the most recent
138 -- We use `take` in `commitStatePatch`, that's why.
140 -- History is immutable, we just insert things on top of existing
143 -- We don't need to store the whole history in memory, this
144 -- structure holds only recent history, the one that will be
145 -- inserted to the DB.
147 deriving (Generic, Show)
149 instance (Serialise s, Serialise p) => Serialise (Archive s p)
152 type NodeListStory = NodeStory NgramsState' NgramsStatePatch'
154 type NgramsState' = Map TableNgrams.NgramsType NgramsTableMap
155 type NgramsStatePatch' = PatchMap TableNgrams.NgramsType NgramsTablePatch
156 instance Serialise NgramsStatePatch'
157 instance FromField (Archive NgramsState' NgramsStatePatch')
159 fromField = fromJSONField
160 instance DefaultFromField SqlJsonb (Archive NgramsState' NgramsStatePatch')
162 defaultFromField = fromPGSFromField
164 -- TODO Semigroup instance for unions
166 instance (Semigroup s, Semigroup p) => Semigroup (Archive s p) where
167 (<>) (Archive { _a_history = p }) (Archive { _a_version = v'
169 , _a_history = p' }) =
170 Archive { _a_version = v'
172 , _a_history = p' <> p }
174 -- instance Monoid (Archive NgramsState' NgramsStatePatch') where
175 instance (Monoid s, Semigroup p) => Monoid (Archive s p) where
176 mempty = Archive { _a_version = 0
180 instance (FromJSON s, FromJSON p) => FromJSON (Archive s p) where
181 parseJSON = genericParseJSON $ unPrefix "_a_"
183 instance (ToJSON s, ToJSON p) => ToJSON (Archive s p) where
184 toJSON = genericToJSON $ unPrefix "_a_"
185 toEncoding = genericToEncoding $ unPrefix "_a_"
187 ------------------------------------------------------------------------
188 initNodeStory :: (Monoid s, Semigroup p) => NodeId -> NodeStory s p
189 initNodeStory ni = NodeStory $ Map.singleton ni initArchive
191 initArchive :: (Monoid s, Semigroup p) => Archive s p
194 initNodeListStoryMock :: NodeListStory
195 initNodeListStoryMock = NodeStory $ Map.singleton nodeListId archive
198 archive = Archive { _a_version = 0
199 , _a_state = ngramsTableMap
201 ngramsTableMap = Map.singleton TableNgrams.NgramsTerms
203 [ (n ^. ne_ngrams, ngramsElementToRepo n)
204 | n <- mockTable ^. _NgramsTable
207 ------------------------------------------------------------------------
210 ------------------------------------------------------------------------
211 -- | Lenses at the bottom of the file because Template Haskell would reorder order of execution in others cases
212 makeLenses ''NodeStoryEnv
213 makeLenses ''NodeStory
216 -----------------------------------------
219 data NodeStoryPoly a b = NodeStoryDB { node_id :: a
223 type ArchiveQ = Archive NgramsState' NgramsStatePatch'
225 type NodeStoryWrite = NodeStoryPoly (Column SqlInt4) (Column SqlJsonb)
226 type NodeStoryRead = NodeStoryPoly (Column SqlInt4) (Column SqlJsonb)
228 $(makeAdaptorAndInstance "pNodeStory" ''NodeStoryPoly)
231 runPGSExecuteMany :: (PGS.ToRow q) => Pool PGS.Connection -> PGS.Query -> [q] -> IO Int64
232 runPGSExecuteMany pool qs a = withResource pool $ \c -> catch (PGS.executeMany c qs a) (printError c)
234 printError _c (SomeException e) = do
235 --q' <- PGS.formatQuery c qs a
236 --hPutStrLn stderr q'
237 throw (SomeException e)
239 runPGSQuery :: (PGS.FromRow r, PGS.ToRow q) => Pool PGS.Connection -> PGS.Query -> q -> IO [r]
240 runPGSQuery pool q a = withResource pool $ \c -> catch (PGS.query c q a) (printError c)
242 printError c (SomeException e) = do
243 q' <- PGS.formatQuery c q a
245 throw (SomeException e)
247 nodeExists :: Pool PGS.Connection -> NodeId -> IO Bool
248 nodeExists pool nId = (== [PGS.Only True])
249 <$> runPGSQuery pool [sql|SELECT true FROM nodes WHERE id = ? AND ? |] (nId, True)
251 getNodesIdWithType :: Pool PGS.Connection -> NodeType -> IO [NodeId]
252 getNodesIdWithType pool nt = do
253 ns <- runPGSQuery pool query (nodeTypeId nt, True)
254 pure $ map (\(PGS.Only nId) -> NodeId nId) ns
257 query = [sql|SELECT id FROM nodes WHERE typename = ? AND ? |]
261 nodeStoryTable :: Table NodeStoryRead NodeStoryWrite
264 ( pNodeStory NodeStoryDB { node_id = tableField "node_id"
265 , archive = tableField "archive" } )
267 nodeStorySelect :: Select NodeStoryRead
268 nodeStorySelect = selectTable nodeStoryTable
270 -- TODO Check ordering, "first patch in the _a_history list is the most recent"
271 getNodeArchiveHistory :: Pool PGS.Connection -> NodeId -> IO [NgramsStatePatch']
272 getNodeArchiveHistory pool nodeId = do
273 as <- runPGSQuery pool query (nodeId, True)
274 let asTuples = mapMaybe (\(ngrams_type_id, patch) -> (\ntId -> (ntId, patch)) <$> (TableNgrams.fromNgramsTypeId ngrams_type_id)) as
275 pure $ (\(ntId, patch) -> fst $ PM.singleton ntId patch) <$> asTuples
278 query = [sql|SELECT ngrams_type_id, patch FROM node_story_archive_history WHERE node_id = ? AND ? |]
280 insertNodeArchiveHistory :: Pool PGS.Connection -> NodeId -> [NgramsStatePatch'] -> IO ()
281 insertNodeArchiveHistory _ _ [] = pure ()
282 insertNodeArchiveHistory pool nodeId (h:hs) = do
283 _ <- runPGSExecuteMany pool query $ (\(nType, patch) -> (nodeId, TableNgrams.ngramsTypeId nType, patch)) <$> (PM.toList h)
284 _ <- insertNodeArchiveHistory pool nodeId hs
288 query = [sql| INSERT INTO node_story_archive_history(node_id, ngrams_type_id, patch) VALUES (?, ?, ?) |]
290 getNodeStory :: Pool PGS.Connection -> NodeId -> IO NodeListStory
291 getNodeStory pool (NodeId nodeId) = do
292 res <- withResource pool $ \c -> runSelect c query :: IO [NodeStoryPoly NodeId ArchiveQ]
293 withArchive <- mapM (\(NodeStoryDB { node_id = nId, archive = Archive { .. } }) -> do
294 --a <- getNodeArchiveHistory pool nId
295 let a = [] :: [NgramsStatePatch']
296 -- Don't read whole history. Only state is needed and most recent changes.
297 pure (nId, Archive { _a_history = a, .. })) res
298 pure $ NodeStory $ Map.fromListWith (<>) withArchive
299 --pure $ NodeStory $ Map.fromListWith (<>) $ (\(NodeStoryDB nId a) -> (nId, a)) <$> res
301 query :: Select NodeStoryRead
302 query = proc () -> do
303 row@(NodeStoryDB node_id _) <- nodeStorySelect -< ()
304 restrict -< node_id .== sqlInt4 nodeId
307 insertNodeArchive :: Pool PGS.Connection -> NodeId -> ArchiveQ -> IO Int64
308 insertNodeArchive pool nodeId@(NodeId nId) (Archive {..}) = do
309 ret <- withResource pool $ \c -> runInsert c insert
310 insertNodeArchiveHistory pool nodeId _a_history
313 emptyHistory = [] :: [NgramsStatePatch']
314 insert = Insert { iTable = nodeStoryTable
315 , iRows = [NodeStoryDB { node_id = sqlInt4 nId
316 , archive = sqlValueJSONB $ Archive { _a_history = emptyHistory
318 , iReturning = rCount
319 , iOnConflict = Nothing }
321 updateNodeArchive :: Pool PGS.Connection -> NodeId -> ArchiveQ -> IO Int64
322 updateNodeArchive pool nodeId@(NodeId nId) (Archive {..}) = do
323 ret <- withResource pool $ \c -> runUpdate c update
324 insertNodeArchiveHistory pool nodeId _a_history
327 emptyHistory = [] :: [NgramsStatePatch']
328 update = Update { uTable = nodeStoryTable
329 , uUpdateWith = updateEasy (\(NodeStoryDB { node_id }) -> NodeStoryDB { archive = sqlValueJSONB $ Archive { _a_history = emptyHistory
332 , uWhere = (\row -> node_id row .== sqlInt4 nId)
333 , uReturning = rCount }
335 -- nodeStoryRemove :: Pool PGS.Connection -> NodeId -> IO Int64
336 -- nodeStoryRemove pool (NodeId nId) = withResource pool $ \c -> runDelete c delete
338 -- delete = Delete { dTable = nodeStoryTable
339 -- , dWhere = (\row -> node_id row .== sqlInt4 nId)
340 -- , dReturning = rCount }
342 upsertNodeArchive :: Pool PGS.Connection -> NodeId -> ArchiveQ -> IO Int64
343 upsertNodeArchive pool nId a = do
344 (NodeStory m) <- getNodeStory pool nId
345 case Map.lookup nId m of
346 Nothing -> insertNodeArchive pool nId a
347 Just _ -> updateNodeArchive pool nId a
349 writeNodeStories :: Pool PGS.Connection -> NodeListStory -> IO ()
350 writeNodeStories pool (NodeStory nls) = do
351 _ <- mapM (\(nId, a) -> upsertNodeArchive pool nId a) $ Map.toList nls
354 -- | Returns a `NodeListStory`, updating the given one for given `NodeId`
355 nodeStoryInc :: Pool PGS.Connection -> Maybe NodeListStory -> NodeId -> IO NodeListStory
356 nodeStoryInc pool Nothing nId = getNodeStory pool nId
357 nodeStoryInc pool (Just ns@(NodeStory nls)) nId = do
358 case Map.lookup nId nls of
360 (NodeStory nls') <- getNodeStory pool nId
361 pure $ NodeStory $ Map.union nls nls'
364 nodeStoryIncs :: Pool PGS.Connection -> Maybe NodeListStory -> [NodeId] -> IO NodeListStory
365 nodeStoryIncs _ Nothing [] = pure $ NodeStory $ Map.empty
366 nodeStoryIncs pool (Just nls) ns = foldM (\m n -> nodeStoryInc pool (Just m) n) nls ns
367 nodeStoryIncs pool Nothing (ni:ns) = do
368 m <- getNodeStory pool ni
369 nodeStoryIncs pool (Just m) ns
371 -- nodeStoryDec :: Pool PGS.Connection -> NodeListStory -> NodeId -> IO NodeListStory
372 -- nodeStoryDec pool ns@(NodeStory nls) ni = do
373 -- case Map.lookup ni nls of
375 -- _ <- nodeStoryRemove pool ni
378 -- let ns' = Map.filterWithKey (\k _v -> k /= ni) nls
379 -- _ <- nodeStoryRemove pool ni
380 -- pure $ NodeStory ns'
381 ------------------------------------
383 readNodeStoryEnv :: Pool PGS.Connection -> IO NodeStoryEnv
384 readNodeStoryEnv pool = do
385 mvar <- nodeStoryVar pool Nothing []
386 -- saver <- mkNodeStorySaver pool mvar
387 let saver = modifyMVar_ mvar $ \mv -> do
388 writeNodeStories pool mv
389 printDebug "[readNodeStoryEnv] saver" mv
390 let mv' = clearHistory mv
391 printDebug "[readNodeStoryEnv] saver, cleared" mv'
393 pure $ NodeStoryEnv { _nse_var = mvar
395 , _nse_getter = nodeStoryVar pool (Just mvar) }
397 nodeStoryVar :: Pool PGS.Connection -> Maybe (MVar NodeListStory) -> [NodeId] -> IO (MVar NodeListStory)
398 nodeStoryVar pool Nothing nIds = do
399 state <- nodeStoryIncs pool Nothing nIds
401 nodeStoryVar pool (Just mv) nIds = do
402 _ <- modifyMVar_ mv $ \nsl -> (nodeStoryIncs pool (Just nsl) nIds)
405 -- TODO No debounce since this is IO stuff.
406 -- debounce is useful since it could delay the saving to some later
407 -- time, asynchronously and we keep operating on memory only.
409 mkNodeStorySaver :: Pool PGS.Connection -> MVar NodeListStory -> IO (IO ())
410 mkNodeStorySaver pool mvns = mkDebounce settings
412 settings = defaultDebounceSettings
413 { debounceAction = do
414 withMVar mvns (\ns -> writeNodeStories pool ns)
415 withMVar mvns (\ns -> printDebug "[mkNodeStorySaver] debounce nodestory" ns)
416 modifyMVar_ mvns $ \ns -> pure $ clearAHistoryToInsert ns
417 , debounceFreq = 1*minute
420 second = 10^(6 :: Int)
423 clearHistory :: NodeListStory -> NodeListStory
424 -- clearHistory (NodeStory ns) =
426 -- NodeStory $ Map.map (\(Archive { .. }) -> Archive { _a_history_to_insert = emptyHistory, .. }) ns
427 clearHistory (NodeStory ns) = NodeStory $ ns & (traverse . a_history) .~ emptyHistory
429 emptyHistory = [] :: [NgramsStatePatch']
431 -- mkNodeStorySaver :: MVar NodeListStory -> Cmd err (Cmd err ())
432 -- mkNodeStorySaver mvns = mkDebounce settings
434 -- settings = defaultDebounceSettings
435 -- { debounceAction = withMVar mvns (\ns -> writeNodeStories ns)
436 -- , debounceFreq = 1 * minute
437 -- -- , debounceEdge = trailingEdge -- Trigger on the trailing edge
439 -- minute = 60 * second
440 -- second = 10^(6 :: Int)
443 -----------------------------------------