]> Git — Sourcephile - gargantext.git/blob - src/Gargantext/Core/NodeStory.hs
[nodeStory] implement history in the DB
[gargantext.git] / src / Gargantext / Core / NodeStory.hs
1 {-|
2 Module : Gargantext.Core.NodeStory
3 Description : Node API generation
4 Copyright : (c) CNRS, 2017-Present
5 License : AGPL + CECILL v3
6 Maintainer : team@gargantext.org
7 Stability : experimental
8 Portability : POSIX
9
10 A Node Story is a Map between NodeId and an Archive (with state,
11 version and history) for that node.
12
13 TODO:
14 - remove
15 - filter
- load the lists
17 -}
18
19 {-# OPTIONS_GHC -fno-warn-orphans #-}
20 {-# LANGUAGE Arrows #-}
21 {-# LANGUAGE ConstraintKinds #-}
22 {-# LANGUAGE QuasiQuotes #-}
23 {-# LANGUAGE TemplateHaskell #-}
24
25 module Gargantext.Core.NodeStory
26 ( HasNodeStory
27 , HasNodeStoryEnv
28 , hasNodeStory
29 , HasNodeStoryVar
30 , hasNodeStoryVar
31 , HasNodeStorySaver
32 , hasNodeStorySaver
33 , NodeStory(..)
34 , NgramsStatePatch'
35 , NodeListStory
36 , initNodeListStoryMock
37 , NodeStoryEnv(..)
38 , initNodeStory
39 , nse_getter
40 , nse_saver
41 , nse_var
42 , unNodeStory
43 , getNodeArchiveHistory
44 , Archive(..)
45 , initArchive
46 , a_history
47 , a_state
48 , a_version
49 , nodeExists
50 , getNodesIdWithType
51 , readNodeStoryEnv
52 , upsertNodeArchive
53 , getNodeStory )
54 where
55
56 -- import Debug.Trace (traceShow)
57 --import Control.Debounce (mkDebounce, defaultDebounceSettings, debounceFreq, debounceAction)
58 import Codec.Serialise.Class
59 import Control.Arrow (returnA)
60 import Control.Concurrent (MVar(), {-withMVar,-} newMVar, modifyMVar_)
61 import Control.Exception (catch, throw, SomeException(..))
62 import Control.Lens (makeLenses, Getter, (^.), (.~), traverse)
63 import Control.Monad.Except
64 import Control.Monad.Reader
65 import Data.Aeson hiding ((.=), decode)
66 import Data.ByteString.Char8 (hPutStrLn)
67 import Data.Map.Strict (Map)
68 import Data.Maybe (mapMaybe)
69 import Data.Monoid
70 import Data.Pool (Pool, withResource)
71 import Data.Semigroup
72 import qualified Database.PostgreSQL.Simple as PGS
73 import Database.PostgreSQL.Simple.SqlQQ (sql)
74 import Database.PostgreSQL.Simple.FromField (FromField(fromField), fromJSONField)
75 import Data.Profunctor.Product.TH (makeAdaptorAndInstance)
76 import GHC.Generics (Generic)
77 import Gargantext.API.Ngrams.Types
78 import Gargantext.Core.Types (NodeId(..), NodeType)
79 import Gargantext.Core.Utils.Prefix (unPrefix)
80 import Gargantext.Database.Admin.Config (nodeTypeId)
81 import Gargantext.Database.Prelude (CmdM', HasConnectionPool, HasConfig)
82 import Gargantext.Database.Query.Table.Node.Error (HasNodeError())
83 import Gargantext.Prelude
84 import Opaleye (Column, DefaultFromField(..), Insert(..), Select, SqlInt4, SqlJsonb, Table, Update(..), (.==), fromPGSFromField, rCount, restrict, runInsert, runSelect, runUpdate, selectTable, sqlInt4, sqlValueJSONB, tableField, updateEasy)
85 import Opaleye.Internal.Table (Table(..))
86 import System.IO (stderr)
87 import qualified Data.Map.Strict as Map
88 import qualified Data.Map.Strict.Patch as PM
89 import qualified Gargantext.Database.Query.Table.Ngrams as TableNgrams
90
91 ------------------------------------------------------------------------
-- | Runtime handle on the node stories kept in memory.
data NodeStoryEnv = NodeStoryEnv
  { _nse_var :: !(MVar NodeListStory)
    -- ^ Shared, mutable in-memory 'NodeListStory'.
  , _nse_saver :: !(IO ())
    -- ^ Action that persists the content of '_nse_var'. As built by
    -- 'readNodeStoryEnv' it writes the stories to the database and then
    -- clears the in-memory history.
  , _nse_getter :: [NodeId] -> IO (MVar NodeListStory)
    -- ^ Returns the shared variable for the given node ids. As built by
    -- 'readNodeStoryEnv' it first loads the ids that are not yet in memory.
  --, _nse_cleaner :: !(IO ()) -- every 12 hours: cleans the repos of unused NodeStories
  -- , _nse_lock :: !FileLock -- TODO (it depends on the option: if with database or file only)
  }
  deriving (Generic)
100
-- | Constraint bundle for code that works with node stories: an
-- environment @env@ readable from @m@, errors of type @err@ throwable in
-- @m@, plus access to the node story environment, the configuration and
-- the connection pool.
type HasNodeStory env err m =
  ( CmdM' env err m
  , MonadReader env m
  , MonadError err m
  , HasNodeStoryEnv env
  , HasConfig env
  , HasConnectionPool env
  , HasNodeError err
  )
109
-- | Environments from which the whole 'NodeStoryEnv' can be read.
-- Requires the two narrower accessors ('HasNodeStoryVar' and
-- 'HasNodeStorySaver') as superclasses.
class
  (HasNodeStoryVar env, HasNodeStorySaver env) =>
  HasNodeStoryEnv env
  where
  -- | Getter for the complete node story environment.
  hasNodeStory :: Getter env NodeStoryEnv
113
-- | Environments exposing the function that yields the shared
-- 'NodeListStory' variable for a list of node ids.
class HasNodeStoryVar env where
  -- | Getter for the node-ids-to-variable function.
  hasNodeStoryVar :: Getter env ([NodeId] -> IO (MVar NodeListStory))
116
-- | Environments exposing the action that saves the node stories.
class HasNodeStorySaver env where
  -- | Getter for the save action.
  hasNodeStorySaver :: Getter env (IO ())
119
120 ------------------------------------------------------------------------
121
-- | A node story: one 'Archive' per node, keyed by 'NodeId'.
--
-- TODO: generalize for any NodeType; only NodeList is implemented so far.
newtype NodeStory s p = NodeStory
  { _unNodeStory :: Map NodeId (Archive s p)
  }
  deriving (Generic, Show)

-- The three instances below rely on the Generic-based default methods.
instance (FromJSON s, FromJSON p) => FromJSON (NodeStory s p)

instance (ToJSON s, ToJSON p) => ToJSON (NodeStory s p)

instance (Serialise s, Serialise p) => Serialise (NodeStory s p)
132
-- | Versioned state of a node together with its recent patches.
data Archive s p = Archive
  { _a_version :: !Version
    -- ^ Version of the state.
  , _a_state :: !s
    -- ^ Current state.
  , _a_history :: ![p]
    -- ^ Recent patches, most recent first (this ordering is what makes
    -- the `take` in `commitStatePatch` work).
    --
    -- History is immutable: new patches are only ever put on top of the
    -- existing list.
    --
    -- The whole history is not kept in memory; this field only holds the
    -- recent part, the one that will be inserted into the DB.
  }
  deriving (Generic, Show)

-- Generic-derived binary serialisation.
instance (Serialise s, Serialise p) => Serialise (Archive s p)
150
151
-- | Node story specialised to ngrams lists.
type NodeListStory = NodeStory NgramsState' NgramsStatePatch'

-- | Ngrams state of a node: one ngrams table per ngrams type.
type NgramsState' = Map TableNgrams.NgramsType NgramsTableMap

-- | Patch on an 'NgramsState'': one table patch per ngrams type.
type NgramsStatePatch' = PatchMap TableNgrams.NgramsType NgramsTablePatch

-- Instance with default methods only.
instance Serialise NgramsStatePatch'
-- | An ngrams archive is decoded from a database field through its
-- 'FromJSON' instance.
instance FromField (Archive NgramsState' NgramsStatePatch') where
  fromField field rawValue = fromJSONField field rawValue

-- | Opaleye decoding of a @jsonb@ column, delegating to the 'FromField'
-- instance.
instance DefaultFromField SqlJsonb (Archive NgramsState' NgramsStatePatch') where
  defaultFromField = fromPGSFromField
163
-- TODO Semigroup instance for unions
-- TODO check this

-- | Right-biased combination: version and state come from the right-hand
-- archive; the histories are concatenated with the right-hand patches
-- first.
instance (Semigroup s, Semigroup p) => Semigroup (Archive s p) where
  Archive { _a_history = leftPatches } <> right@Archive { _a_history = rightPatches } =
    right { _a_history = rightPatches <> leftPatches }
173
-- instance Monoid (Archive NgramsState' NgramsStatePatch') where

-- | Empty archive: version 0, empty state, no history.
--
-- NOTE(review): the 'Semigroup' instance of 'Archive' keeps the version
-- and state of its right-hand argument, so @x <> mempty@ resets both to
-- the empty values; 'mempty' is therefore only a left identity.
instance (Monoid s, Semigroup p) => Monoid (Archive s p) where
  -- Positional fields: version, state, history.
  mempty = Archive 0 mempty []
179
-- | JSON decoding through the generic parser, using the options returned
-- by @unPrefix "_a_"@.
instance (FromJSON s, FromJSON p) => FromJSON (Archive s p) where
  parseJSON = genericParseJSON (unPrefix "_a_")

-- | JSON encoding with the same options as the 'FromJSON' instance.
instance (ToJSON s, ToJSON p) => ToJSON (Archive s p) where
  toJSON = genericToJSON (unPrefix "_a_")
  toEncoding = genericToEncoding (unPrefix "_a_")
186
187 ------------------------------------------------------------------------
-- | Story holding a single node, associated with the initial archive.
initNodeStory :: (Monoid s, Semigroup p) => NodeId -> NodeStory s p
initNodeStory nodeId = NodeStory (Map.singleton nodeId initArchive)
190
-- | Initial archive: version 0, empty state and no history (the same
-- value as the 'Monoid' instance's 'mempty').
initArchive :: (Monoid s, Semigroup p) => Archive s p
initArchive =
  Archive
    { _a_version = 0
    , _a_state = mempty
    , _a_history = []
    }
193
-- | Mock story: a single node (id 0) at version 0, without history,
-- whose state holds the elements of 'mockTable' under the
-- 'TableNgrams.NgramsTerms' type.
initNodeListStoryMock :: NodeListStory
initNodeListStoryMock = NodeStory (Map.singleton mockNodeId mockArchive)
  where
    mockNodeId = 0
    mockArchive =
      Archive
        { _a_version = 0
        , _a_state = mockState
        , _a_history = []
        }
    -- Every element of the mock table, keyed by its ngrams.
    mockState =
      Map.singleton TableNgrams.NgramsTerms $
        Map.fromList
          [ (element ^. ne_ngrams, ngramsElementToRepo element)
          | element <- mockTable ^. _NgramsTable
          ]
206
207 ------------------------------------------------------------------------
208
209
210 ------------------------------------------------------------------------
-- Lens generation for the record types declared above. The splices are
-- grouped here, after those declarations, because a Template Haskell
-- splice can only refer to declarations that precede it in the module.
$(makeLenses ''NodeStoryEnv)
$(makeLenses ''NodeStory)
$(makeLenses ''Archive)
215
216 -----------------------------------------
217
218
-- | Row of the @node_stories@ table, polymorphic in its two columns so
-- that the same shape serves for Opaleye columns and for Haskell values.
data NodeStoryPoly a b = NodeStoryDB
  { node_id :: a
  , archive :: b
  }
  deriving (Eq)

-- | Archive type stored for ngrams lists.
type ArchiveQ = Archive NgramsState' NgramsStatePatch'

-- | Column types used when writing a row.
type NodeStoryWrite = NodeStoryPoly (Column SqlInt4) (Column SqlJsonb)

-- | Column types used when reading a row.
type NodeStoryRead = NodeStoryPoly (Column SqlInt4) (Column SqlJsonb)

-- Generates the @pNodeStory@ adaptor and the product-profunctor instance.
$(makeAdaptorAndInstance "pNodeStory" ''NodeStoryPoly)
229
230
-- | Run a multi-row statement ('PGS.executeMany') on a connection taken
-- from the pool and return the number of affected rows.
--
-- On any exception the expanded statement is printed on 'stderr', as
-- 'runPGSQuery' does, and the exception is rethrown unchanged.
runPGSExecuteMany :: (PGS.ToRow q) => Pool PGS.Connection -> PGS.Query -> [q] -> IO Int64
runPGSExecuteMany pool qs a = withResource pool $ \c -> catch (PGS.executeMany c qs a) (printError c)
  where
    printError c (SomeException e) = do
      -- Logging is best effort: formatting the statement can fail too,
      -- and that failure must not replace the exception being reported.
      catch (logStatement c) (\(SomeException _) -> pure ())
      throw (SomeException e)
    -- 'PGS.formatMany' is the multi-row counterpart of 'PGS.formatQuery'.
    logStatement c = do
      q' <- PGS.formatMany c qs a
      hPutStrLn stderr q'
238
-- | Run a parameterised query on a connection taken from the pool.
--
-- On any exception the formatted query is printed on 'stderr' and the
-- exception is rethrown.
runPGSQuery :: (PGS.FromRow r, PGS.ToRow q) => Pool PGS.Connection -> PGS.Query -> q -> IO [r]
runPGSQuery pool q a = withResource pool runOn
  where
    runOn conn = PGS.query conn q a `catch` logAndRethrow conn
    logAndRethrow conn (SomeException e) = do
      formatted <- PGS.formatQuery conn q a
      hPutStrLn stderr formatted
      throw (SomeException e)
246
-- | 'True' when the lookup by id in the @nodes@ table yields exactly one
-- row.
nodeExists :: Pool PGS.Connection -> NodeId -> IO Bool
nodeExists pool nId = do
  rows <- runPGSQuery pool [sql|SELECT true FROM nodes WHERE id = ? AND ? |] (nId, True)
  pure (rows == [PGS.Only True])
250
-- | Ids of all the nodes whose @typename@ column equals the database id
-- of the given node type.
getNodesIdWithType :: Pool PGS.Connection -> NodeType -> IO [NodeId]
getNodesIdWithType pool nt = do
  rows <- runPGSQuery pool query (nodeTypeId nt, True)
  pure [NodeId rawId | PGS.Only rawId <- rows]
  where
    query :: PGS.Query
    query = [sql|SELECT id FROM nodes WHERE typename = ? AND ? |]
258
259
260
-- | Opaleye description of the @node_stories@ table, with its @node_id@
-- and @archive@ columns.
nodeStoryTable :: Table NodeStoryRead NodeStoryWrite
nodeStoryTable =
  Table "node_stories" $
    pNodeStory (NodeStoryDB (tableField "node_id") (tableField "archive"))

-- | Select every row of 'nodeStoryTable'.
nodeStorySelect :: Select NodeStoryRead
nodeStorySelect = selectTable nodeStoryTable
269
-- | Patches stored in @node_story_archive_history@ for a node, one
-- single-type patch per row. Rows whose ngrams type id is not recognised
-- by 'TableNgrams.fromNgramsTypeId' are dropped.
--
-- TODO Check ordering, "first patch in the _a_history list is the most recent"
-- (the query has no ORDER BY clause).
getNodeArchiveHistory :: Pool PGS.Connection -> NodeId -> IO [NgramsStatePatch']
getNodeArchiveHistory pool nodeId = do
  rows <- runPGSQuery pool query (nodeId, True)
  pure
    [ fst (PM.singleton ngramsType patch)
    | (typeId, patch) <- rows
    , Just ngramsType <- [TableNgrams.fromNgramsTypeId typeId]
    ]
  where
    query :: PGS.Query
    query = [sql|SELECT ngrams_type_id, patch FROM node_story_archive_history WHERE node_id = ? AND ? |]
279
-- | Append patches to @node_story_archive_history@ for a node.
--
-- The patches are processed in list order; each one is written with a
-- single multi-row insert holding one row per ngrams type it touches.
insertNodeArchiveHistory :: Pool PGS.Connection -> NodeId -> [NgramsStatePatch'] -> IO ()
insertNodeArchiveHistory pool nodeId history =
  case history of
    [] -> pure ()
    current : remaining -> do
      _ <- runPGSExecuteMany pool query (toRow <$> PM.toList current)
      insertNodeArchiveHistory pool nodeId remaining
  where
    toRow (nType, patch) = (nodeId, TableNgrams.ngramsTypeId nType, patch)

    query :: PGS.Query
    query = [sql| INSERT INTO node_story_archive_history(node_id, ngrams_type_id, patch) VALUES (?, ?, ?) |]
289
-- | Load the story of one node from @node_stories@.
--
-- The history of every archive read is replaced by the empty list: the
-- whole history is not loaded, only the state (and version) is needed,
-- recent changes being accumulated in memory afterwards. Should several
-- rows share a node id, their archives are merged with '<>'.
getNodeStory :: Pool PGS.Connection -> NodeId -> IO NodeListStory
getNodeStory pool (NodeId nodeId) = do
  rows <- withResource pool $ \conn -> runSelect conn query :: IO [NodeStoryPoly NodeId ArchiveQ]
  pure $ NodeStory $ Map.fromListWith (<>) (toEntry <$> rows)
  where
    toEntry (NodeStoryDB nId stored) =
      (nId, stored { _a_history = [] :: [NgramsStatePatch'] })

    -- Rows of the table restricted to the requested node id.
    query :: Select NodeStoryRead
    query = proc () -> do
      row@(NodeStoryDB rowNodeId _) <- nodeStorySelect -< ()
      restrict -< rowNodeId .== sqlInt4 nodeId
      returnA -< row
306
-- | Insert a new @node_stories@ row for the node, then append the
-- archive's patches to the history table.
--
-- The archive is stored with an empty history. Returns the row count
-- reported by the insert.
insertNodeArchive :: Pool PGS.Connection -> NodeId -> ArchiveQ -> IO Int64
insertNodeArchive pool nodeId@(NodeId nId) a = do
  inserted <- withResource pool $ \conn -> runInsert conn insert
  insertNodeArchiveHistory pool nodeId (_a_history a)
  pure inserted
  where
    -- The JSON column never carries the history.
    storedArchive = sqlValueJSONB $ a { _a_history = [] :: [NgramsStatePatch'] }
    insert =
      Insert
        { iTable = nodeStoryTable
        , iRows = [NodeStoryDB (sqlInt4 nId) storedArchive]
        , iReturning = rCount
        , iOnConflict = Nothing
        }
320
-- | Overwrite the archive of the node's @node_stories@ row, then append
-- the archive's patches to the history table.
--
-- The archive is stored with an empty history. Returns the row count
-- reported by the update.
updateNodeArchive :: Pool PGS.Connection -> NodeId -> ArchiveQ -> IO Int64
updateNodeArchive pool nodeId@(NodeId nId) a = do
  updated <- withResource pool $ \conn -> runUpdate conn update
  insertNodeArchiveHistory pool nodeId (_a_history a)
  pure updated
  where
    -- The JSON column never carries the history.
    storedArchive = sqlValueJSONB $ a { _a_history = [] :: [NgramsStatePatch'] }
    update =
      Update
        { uTable = nodeStoryTable
        , uUpdateWith = updateEasy (\row -> row { archive = storedArchive })
        , uWhere = \row -> node_id row .== sqlInt4 nId
        , uReturning = rCount
        }
334
335 -- nodeStoryRemove :: Pool PGS.Connection -> NodeId -> IO Int64
336 -- nodeStoryRemove pool (NodeId nId) = withResource pool $ \c -> runDelete c delete
337 -- where
338 -- delete = Delete { dTable = nodeStoryTable
339 -- , dWhere = (\row -> node_id row .== sqlInt4 nId)
340 -- , dReturning = rCount }
341
-- | Store an archive for a node: update the existing row when the node
-- already has a story in the database, insert a new row otherwise.
--
-- Existence is checked by loading the node's story first, in a step
-- separate from the write.
upsertNodeArchive :: Pool PGS.Connection -> NodeId -> ArchiveQ -> IO Int64
upsertNodeArchive pool nId a = do
  NodeStory stored <- getNodeStory pool nId
  if Map.member nId stored
    then updateNodeArchive pool nId a
    else insertNodeArchive pool nId a
348
-- | Upsert every archive of the story, in ascending node id order.
writeNodeStories :: Pool PGS.Connection -> NodeListStory -> IO ()
writeNodeStories pool (NodeStory archives) = do
  _ <- Map.traverseWithKey (upsertNodeArchive pool) archives
  pure ()
353
-- | Make sure the story covers the given node.
--
-- Without a story the node is loaded from the database. With one, it is
-- returned untouched when it already holds the node; otherwise the node
-- is loaded and added, entries already in memory taking precedence.
nodeStoryInc :: Pool PGS.Connection -> Maybe NodeListStory -> NodeId -> IO NodeListStory
nodeStoryInc pool mStory nId =
  case mStory of
    Nothing -> getNodeStory pool nId
    Just story@(NodeStory known) ->
      if Map.member nId known
        then pure story
        else do
          NodeStory loaded <- getNodeStory pool nId
          pure $ NodeStory $ Map.union known loaded
363
-- | 'nodeStoryInc' over a list of nodes, from left to right.
--
-- Without an initial story, the empty list gives the empty story;
-- otherwise the first node is loaded from the database and the remaining
-- ones are added to it.
nodeStoryIncs :: Pool PGS.Connection -> Maybe NodeListStory -> [NodeId] -> IO NodeListStory
nodeStoryIncs pool mStory nIds =
  case (mStory, nIds) of
    (Just story, _) -> foldM addNode story nIds
    (Nothing, []) -> pure (NodeStory Map.empty)
    (Nothing, firstId : otherIds) -> do
      story <- getNodeStory pool firstId
      foldM addNode story otherIds
  where
    addNode story nId = nodeStoryInc pool (Just story) nId
370
371 -- nodeStoryDec :: Pool PGS.Connection -> NodeListStory -> NodeId -> IO NodeListStory
372 -- nodeStoryDec pool ns@(NodeStory nls) ni = do
373 -- case Map.lookup ni nls of
374 -- Nothing -> do
375 -- _ <- nodeStoryRemove pool ni
376 -- pure ns
377 -- Just _ -> do
378 -- let ns' = Map.filterWithKey (\k _v -> k /= ni) nls
379 -- _ <- nodeStoryRemove pool ni
380 -- pure $ NodeStory ns'
381 ------------------------------------
382
-- | Build a 'NodeStoryEnv' on top of the connection pool.
--
-- The shared variable starts with an empty story. The saver writes the
-- stories to the database and then clears their in-memory history, all
-- while holding the variable. The getter loads the requested nodes into
-- that same variable.
readNodeStoryEnv :: Pool PGS.Connection -> IO NodeStoryEnv
readNodeStoryEnv pool = do
  mvar <- nodeStoryVar pool Nothing []
  -- saver <- mkNodeStorySaver pool mvar
  pure $
    NodeStoryEnv
      { _nse_var = mvar
      , _nse_saver = modifyMVar_ mvar persist
      , _nse_getter = nodeStoryVar pool (Just mvar)
      }
  where
    -- Save the stories, then drop the patches that were just written.
    persist story = do
      writeNodeStories pool story
      printDebug "[readNodeStoryEnv] saver" story
      let cleared = clearHistory story
      printDebug "[readNodeStoryEnv] saver, cleared" cleared
      return cleared
396
-- | Shared variable covering the given nodes.
--
-- Without an existing variable, a new one is created from the stories of
-- the nodes. With one, the nodes it lacks are loaded into it and the
-- same variable is returned.
nodeStoryVar :: Pool PGS.Connection -> Maybe (MVar NodeListStory) -> [NodeId] -> IO (MVar NodeListStory)
nodeStoryVar pool mVar nIds =
  case mVar of
    Nothing -> do
      initial <- nodeStoryIncs pool Nothing nIds
      newMVar initial
    Just mv -> do
      modifyMVar_ mv (\story -> nodeStoryIncs pool (Just story) nIds)
      pure mv
404
405 -- TODO No debounce since this is IO stuff.
406 -- debounce is useful since it could delay the saving to some later
407 -- time, asynchronously and we keep operating on memory only.
408 {-
409 mkNodeStorySaver :: Pool PGS.Connection -> MVar NodeListStory -> IO (IO ())
410 mkNodeStorySaver pool mvns = mkDebounce settings
411 where
412 settings = defaultDebounceSettings
413 { debounceAction = do
414 withMVar mvns (\ns -> writeNodeStories pool ns)
415 withMVar mvns (\ns -> printDebug "[mkNodeStorySaver] debounce nodestory" ns)
416 modifyMVar_ mvns $ \ns -> pure $ clearAHistoryToInsert ns
417 , debounceFreq = 1*minute
418 }
419 minute = 60*second
420 second = 10^(6 :: Int)
421 -}
422
-- | Reset the history of every archive of the story to the empty list;
-- versions and states are left untouched.
clearHistory :: NodeListStory -> NodeListStory
clearHistory (NodeStory archives) =
  NodeStory $ (a_history .~ noPatches) <$> archives
  where
    noPatches = [] :: [NgramsStatePatch']
430
431 -- mkNodeStorySaver :: MVar NodeListStory -> Cmd err (Cmd err ())
432 -- mkNodeStorySaver mvns = mkDebounce settings
433 -- where
434 -- settings = defaultDebounceSettings
435 -- { debounceAction = withMVar mvns (\ns -> writeNodeStories ns)
436 -- , debounceFreq = 1 * minute
437 -- -- , debounceEdge = trailingEdge -- Trigger on the trailing edge
438 -- }
439 -- minute = 60 * second
440 -- second = 10^(6 :: Int)
441
442
443 -----------------------------------------