]> Git — Sourcephile - gargantext.git/blob - src/Gargantext/Core/NodeStory.hs
[VERSION] +1 to 0.0.6.1
[gargantext.git] / src / Gargantext / Core / NodeStory.hs
1 {-|
2 Module : Gargantext.Core.NodeStory
3 Description : Node API generation
4 Copyright : (c) CNRS, 2017-Present
5 License : AGPL + CECILL v3
6 Maintainer : team@gargantext.org
7 Stability : experimental
8 Portability : POSIX
9
10 A Node Story is a Map between NodeId and an Archive (with state,
11 version and history) for that node.
12
13 Couple of words on how this is implemented.
14
15 First version used files which stored Archive for each NodeId in a
16 separate .cbor file.
17
18 For performance reasons, it is rewritten to use the DB.
19
20 The table `node_stories` contains two columns: `node_id` and
21 `archive`.
22
23 Next, it was observed that `a_history` in `Archive` takes much
24 space. So a new table was created, `node_story_archive_history` with
25 columns: `node_id`, `ngrams_type_id`, `patch`. This is because each
26 history item is in fact a map from `NgramsType` to `NgramsTablePatch`
27 (see the `NgramsStatePatch'` type).
28
29 Moreover, since in ~G.A.Ngrams.commitStatePatch~ we use current state
30 only, with only recent history items, I concluded that it is not
31 necessary to load whole history into memory. Instead, it is kept in DB
32 (history is immutable) and only recent changes are added to
33 `a_history`. Then that record is cleared whenever `Archive` is saved.
34
35 Please note that
36
37 TODO:
38 - remove
39 - filter
40 - charger les listes
41 -}
42
43 {-# OPTIONS_GHC -fno-warn-orphans #-}
44 {-# LANGUAGE Arrows #-}
45 {-# LANGUAGE ConstraintKinds #-}
46 {-# LANGUAGE QuasiQuotes #-}
47 {-# LANGUAGE TemplateHaskell #-}
48
49 module Gargantext.Core.NodeStory
50 ( HasNodeStory
51 , HasNodeStoryEnv
52 , hasNodeStory
53 , HasNodeStoryVar
54 , hasNodeStoryVar
55 , HasNodeStorySaver
56 , hasNodeStorySaver
57 , NodeStory(..)
58 , NgramsStatePatch'
59 , NodeListStory
60 , initNodeListStoryMock
61 , NodeStoryEnv(..)
62 , initNodeStory
63 , nse_getter
64 , nse_saver
65 , nse_var
66 , unNodeStory
67 , getNodeArchiveHistory
68 , Archive(..)
69 , initArchive
70 , a_history
71 , a_state
72 , a_version
73 , nodeExists
74 , getNodesIdWithType
75 , readNodeStoryEnv
76 , upsertNodeArchive
77 , getNodeStory )
78 where
79
80 -- import Debug.Trace (traceShow)
81 import Control.Debounce (mkDebounce, defaultDebounceSettings, debounceFreq, debounceAction)
82 import Codec.Serialise.Class
83 import Control.Arrow (returnA)
84 import Control.Concurrent (MVar(), withMVar, newMVar, modifyMVar_)
85 import Control.Exception (catch, throw, SomeException(..))
86 import Control.Lens (makeLenses, Getter, (^.), (.~), traverse)
87 import Control.Monad.Except
88 import Control.Monad.Reader
89 import Data.Aeson hiding ((.=), decode)
90 import Data.ByteString.Char8 (hPutStrLn)
91 import Data.Map.Strict (Map)
92 import Data.Maybe (catMaybes, mapMaybe)
93 import Data.Monoid
94 import Data.Pool (Pool, withResource)
95 import Data.Semigroup
96 import qualified Database.PostgreSQL.Simple as PGS
97 import Database.PostgreSQL.Simple.SqlQQ (sql)
98 import Database.PostgreSQL.Simple.FromField (FromField(fromField), fromJSONField)
99 import Data.Profunctor.Product.TH (makeAdaptorAndInstance)
100 import GHC.Generics (Generic)
101 import Gargantext.API.Ngrams.Types
102 import Gargantext.Core.Types (NodeId(..), NodeType)
103 import Gargantext.Core.Utils.Prefix (unPrefix)
104 import Gargantext.Database.Admin.Config (nodeTypeId)
105 import Gargantext.Database.Prelude (CmdM', HasConnectionPool, HasConfig)
106 import Gargantext.Database.Query.Table.Node.Error (HasNodeError())
107 import Gargantext.Prelude
108 import Opaleye (Column, DefaultFromField(..), Insert(..), Select, SqlInt4, SqlJsonb, Table, Update(..), (.==), fromPGSFromField, rCount, restrict, runInsert, runSelect, runUpdate, selectTable, sqlInt4, sqlValueJSONB, tableField, updateEasy)
109 import Opaleye.Internal.Table (Table(..))
110 import System.IO (stderr)
111 import qualified Data.Map.Strict as Map
112 import qualified Data.Map.Strict.Patch as PM
113 import qualified Gargantext.Database.Query.Table.Ngrams as TableNgrams
114
115 ------------------------------------------------------------------------
116 data NodeStoryEnv = NodeStoryEnv
117 { _nse_var :: !(MVar NodeListStory)
118 , _nse_saver :: !(IO ())
119 , _nse_getter :: [NodeId] -> IO (MVar NodeListStory)
120 --, _nse_cleaner :: !(IO ()) -- every 12 hours: cleans the repos of unused NodeStories
121 -- , _nse_lock :: !FileLock -- TODO (it depends on the option: if with database or file only)
122 }
123 deriving (Generic)
124
125 type HasNodeStory env err m = ( CmdM' env err m
126 , MonadReader env m
127 , MonadError err m
128 , HasNodeStoryEnv env
129 , HasConfig env
130 , HasConnectionPool env
131 , HasNodeError err
132 )
133
134 class (HasNodeStoryVar env, HasNodeStorySaver env)
135 => HasNodeStoryEnv env where
136 hasNodeStory :: Getter env NodeStoryEnv
137
138 class HasNodeStoryVar env where
139 hasNodeStoryVar :: Getter env ([NodeId] -> IO (MVar NodeListStory))
140
141 class HasNodeStorySaver env where
142 hasNodeStorySaver :: Getter env (IO ())
143
144 ------------------------------------------------------------------------
145
146 {- | Node Story for each NodeType where the Key of the Map is NodeId
147 TODO : generalize for any NodeType, let's start with NodeList which
148 is implemented already
149 -}
150 newtype NodeStory s p = NodeStory { _unNodeStory :: Map NodeId (Archive s p) }
151 deriving (Generic, Show)
152
153 instance (FromJSON s, FromJSON p) => FromJSON (NodeStory s p)
154 instance (ToJSON s, ToJSON p) => ToJSON (NodeStory s p)
155 instance (Serialise s, Serialise p) => Serialise (NodeStory s p)
156
157 data Archive s p = Archive
158 { _a_version :: !Version
159 , _a_state :: !s
160 , _a_history :: ![p]
161 -- first patch in the list is the most recent
162 -- We use `take` in `commitStatePatch`, that's why.
163
164 -- History is immutable, we just insert things on top of existing
165 -- list.
166
167 -- We don't need to store the whole history in memory, this
168 -- structure holds only recent history, the one that will be
169 -- inserted to the DB.
170 }
171 deriving (Generic, Show)
172
173 instance (Serialise s, Serialise p) => Serialise (Archive s p)
174
175
176 type NodeListStory = NodeStory NgramsState' NgramsStatePatch'
177
178 type NgramsState' = Map TableNgrams.NgramsType NgramsTableMap
179 type NgramsStatePatch' = PatchMap TableNgrams.NgramsType NgramsTablePatch
180 instance Serialise NgramsStatePatch'
181 instance FromField (Archive NgramsState' NgramsStatePatch')
182 where
183 fromField = fromJSONField
184 instance DefaultFromField SqlJsonb (Archive NgramsState' NgramsStatePatch')
185 where
186 defaultFromField = fromPGSFromField
187
188 -- TODO Semigroup instance for unions
189 -- TODO check this
190 instance (Semigroup s, Semigroup p) => Semigroup (Archive s p) where
191 (<>) (Archive { _a_history = p }) (Archive { _a_version = v'
192 , _a_state = s'
193 , _a_history = p' }) =
194 Archive { _a_version = v'
195 , _a_state = s'
196 , _a_history = p' <> p }
197
198 instance (Monoid s, Semigroup p) => Monoid (Archive s p) where
199 mempty = Archive { _a_version = 0
200 , _a_state = mempty
201 , _a_history = [] }
202
203 instance (FromJSON s, FromJSON p) => FromJSON (Archive s p) where
204 parseJSON = genericParseJSON $ unPrefix "_a_"
205
206 instance (ToJSON s, ToJSON p) => ToJSON (Archive s p) where
207 toJSON = genericToJSON $ unPrefix "_a_"
208 toEncoding = genericToEncoding $ unPrefix "_a_"
209
210 ------------------------------------------------------------------------
211 initNodeStory :: (Monoid s, Semigroup p) => NodeId -> NodeStory s p
212 initNodeStory ni = NodeStory $ Map.singleton ni initArchive
213
214 initArchive :: (Monoid s, Semigroup p) => Archive s p
215 initArchive = mempty
216
217 initNodeListStoryMock :: NodeListStory
218 initNodeListStoryMock = NodeStory $ Map.singleton nodeListId archive
219 where
220 nodeListId = 0
221 archive = Archive { _a_version = 0
222 , _a_state = ngramsTableMap
223 , _a_history = [] }
224 ngramsTableMap = Map.singleton TableNgrams.NgramsTerms
225 $ Map.fromList
226 [ (n ^. ne_ngrams, ngramsElementToRepo n)
227 | n <- mockTable ^. _NgramsTable
228 ]
229
230 ------------------------------------------------------------------------
231
232
233 ------------------------------------------------------------------------
234 -- | Lenses at the bottom of the file because Template Haskell would reorder order of execution in others cases
235 makeLenses ''NodeStoryEnv
236 makeLenses ''NodeStory
237 makeLenses ''Archive
238
239 -----------------------------------------
240
241
242 data NodeStoryPoly a b = NodeStoryDB { node_id :: a
243 , archive :: b }
244 deriving (Eq)
245
246 type ArchiveQ = Archive NgramsState' NgramsStatePatch'
247
248 type NodeStoryWrite = NodeStoryPoly (Column SqlInt4) (Column SqlJsonb)
249 type NodeStoryRead = NodeStoryPoly (Column SqlInt4) (Column SqlJsonb)
250
251 $(makeAdaptorAndInstance "pNodeStory" ''NodeStoryPoly)
252
253
254 runPGSExecuteMany :: (PGS.ToRow q) => Pool PGS.Connection -> PGS.Query -> [q] -> IO Int64
255 runPGSExecuteMany pool qs a = withResource pool $ \c -> catch (PGS.executeMany c qs a) (printError c)
256 where
257 printError _c (SomeException e) = do
258 --q' <- PGS.formatQuery c qs a
259 --hPutStrLn stderr q'
260 throw (SomeException e)
261
262 runPGSQuery :: (PGS.FromRow r, PGS.ToRow q) => Pool PGS.Connection -> PGS.Query -> q -> IO [r]
263 runPGSQuery pool q a = withResource pool $ \c -> catch (PGS.query c q a) (printError c)
264 where
265 printError c (SomeException e) = do
266 q' <- PGS.formatQuery c q a
267 hPutStrLn stderr q'
268 throw (SomeException e)
269
270 nodeExists :: Pool PGS.Connection -> NodeId -> IO Bool
271 nodeExists pool nId = (== [PGS.Only True])
272 <$> runPGSQuery pool [sql|SELECT true FROM nodes WHERE id = ? AND ? |] (nId, True)
273
274 getNodesIdWithType :: Pool PGS.Connection -> NodeType -> IO [NodeId]
275 getNodesIdWithType pool nt = do
276 ns <- runPGSQuery pool query (nodeTypeId nt, True)
277 pure $ map (\(PGS.Only nId) -> NodeId nId) ns
278 where
279 query :: PGS.Query
280 query = [sql|SELECT id FROM nodes WHERE typename = ? AND ? |]
281
282
283
284 nodeStoryTable :: Table NodeStoryRead NodeStoryWrite
285 nodeStoryTable =
286 Table "node_stories"
287 ( pNodeStory NodeStoryDB { node_id = tableField "node_id"
288 , archive = tableField "archive" } )
289
290 nodeStorySelect :: Select NodeStoryRead
291 nodeStorySelect = selectTable nodeStoryTable
292
293 -- TODO Check ordering, "first patch in the _a_history list is the most recent"
294 getNodeArchiveHistory :: Pool PGS.Connection -> NodeId -> IO [NgramsStatePatch']
295 getNodeArchiveHistory pool nodeId = do
296 as <- runPGSQuery pool query (nodeId, True)
297 let asTuples = mapMaybe (\(ngrams_type_id, ngrams, patch) -> (\ntId -> (ntId, ngrams, patch)) <$> (TableNgrams.fromNgramsTypeId ngrams_type_id)) as
298 pure $ (\(ntId, terms, patch) -> fst $ PM.singleton ntId (NgramsTablePatch $ fst $ PM.singleton terms patch)) <$> asTuples
299 where
300 query :: PGS.Query
301 query = [sql|SELECT ngrams_type_id, terms, patch
302 FROM node_story_archive_history
303 JOIN ngrams ON ngrams.id = ngrams_id
304 WHERE node_id = ? AND ? |]
305
306 insertNodeArchiveHistory :: Pool PGS.Connection -> NodeId -> [NgramsStatePatch'] -> IO ()
307 insertNodeArchiveHistory _ _ [] = pure ()
308 insertNodeArchiveHistory pool nodeId (h:hs) = do
309 let tuples = mconcat $ (\(nType, (NgramsTablePatch patch)) ->
310 (\(term, p) ->
311 (nodeId, TableNgrams.ngramsTypeId nType, term, p)) <$> PM.toList patch) <$> PM.toList h :: [(NodeId, TableNgrams.NgramsTypeId, NgramsTerm, NgramsPatch)]
312 tuplesM <- mapM (\(nId, nTypeId, term, patch) -> do
313 ngrams <- runPGSQuery pool ngramsQuery (term, True)
314 pure $ (\(PGS.Only termId) -> (nId, nTypeId, termId, term, patch)) <$> (headMay ngrams)
315 ) tuples :: IO [Maybe (NodeId, TableNgrams.NgramsTypeId, Int, NgramsTerm, NgramsPatch)]
316 _ <- runPGSExecuteMany pool query $ ((\(nId, nTypeId, termId, _term, patch) -> (nId, nTypeId, termId, patch)) <$> (catMaybes tuplesM))
317 _ <- insertNodeArchiveHistory pool nodeId hs
318 pure ()
319 where
320 ngramsQuery :: PGS.Query
321 ngramsQuery = [sql| SELECT id FROM ngrams WHERE terms = ? AND ? |]
322
323 query :: PGS.Query
324 query = [sql| INSERT INTO node_story_archive_history(node_id, ngrams_type_id, ngrams_id, patch) VALUES (?, ?, ?, ?) |]
325
326 getNodeStory :: Pool PGS.Connection -> NodeId -> IO NodeListStory
327 getNodeStory pool (NodeId nodeId) = do
328 res <- withResource pool $ \c -> runSelect c query :: IO [NodeStoryPoly NodeId ArchiveQ]
329 withArchive <- mapM (\(NodeStoryDB { node_id = nId, archive = Archive { .. } }) -> do
330 --a <- getNodeArchiveHistory pool nId
331 let a = [] :: [NgramsStatePatch']
332 -- Don't read whole history. Only state is needed and most recent changes.
333 pure (nId, Archive { _a_history = a, .. })) res
334 pure $ NodeStory $ Map.fromListWith (<>) withArchive
335 --pure $ NodeStory $ Map.fromListWith (<>) $ (\(NodeStoryDB nId a) -> (nId, a)) <$> res
336 where
337 query :: Select NodeStoryRead
338 query = proc () -> do
339 row@(NodeStoryDB node_id _) <- nodeStorySelect -< ()
340 restrict -< node_id .== sqlInt4 nodeId
341 returnA -< row
342
343 insertNodeArchive :: Pool PGS.Connection -> NodeId -> ArchiveQ -> IO Int64
344 insertNodeArchive pool nodeId@(NodeId nId) (Archive {..}) = do
345 ret <- withResource pool $ \c -> runInsert c insert
346 -- NOTE: It is assumed that the most recent change is the first in the
347 -- list, so we save these in reverse order
348 insertNodeArchiveHistory pool nodeId $ reverse _a_history
349 pure ret
350 where
351 emptyHistory = [] :: [NgramsStatePatch']
352 insert = Insert { iTable = nodeStoryTable
353 , iRows = [NodeStoryDB { node_id = sqlInt4 nId
354 , archive = sqlValueJSONB $ Archive { _a_history = emptyHistory
355 , .. } }]
356 , iReturning = rCount
357 , iOnConflict = Nothing }
358
359 updateNodeArchive :: Pool PGS.Connection -> NodeId -> ArchiveQ -> IO Int64
360 updateNodeArchive pool nodeId@(NodeId nId) (Archive {..}) = do
361 ret <- withResource pool $ \c -> runUpdate c update
362 -- NOTE: It is assumed that the most recent change is the first in the
363 -- list, so we save these in reverse order
364 insertNodeArchiveHistory pool nodeId $ reverse _a_history
365 pure ret
366 where
367 emptyHistory = [] :: [NgramsStatePatch']
368 update = Update { uTable = nodeStoryTable
369 , uUpdateWith = updateEasy (\(NodeStoryDB { node_id }) -> NodeStoryDB { archive = sqlValueJSONB $ Archive { _a_history = emptyHistory
370 , ..}
371 , .. })
372 , uWhere = (\row -> node_id row .== sqlInt4 nId)
373 , uReturning = rCount }
374
375 -- nodeStoryRemove :: Pool PGS.Connection -> NodeId -> IO Int64
376 -- nodeStoryRemove pool (NodeId nId) = withResource pool $ \c -> runDelete c delete
377 -- where
378 -- delete = Delete { dTable = nodeStoryTable
379 -- , dWhere = (\row -> node_id row .== sqlInt4 nId)
380 -- , dReturning = rCount }
381
382 upsertNodeArchive :: Pool PGS.Connection -> NodeId -> ArchiveQ -> IO Int64
383 upsertNodeArchive pool nId a = do
384 (NodeStory m) <- getNodeStory pool nId
385 case Map.lookup nId m of
386 Nothing -> insertNodeArchive pool nId a
387 Just _ -> updateNodeArchive pool nId a
388
389 writeNodeStories :: Pool PGS.Connection -> NodeListStory -> IO ()
390 writeNodeStories pool (NodeStory nls) = do
391 _ <- mapM (\(nId, a) -> upsertNodeArchive pool nId a) $ Map.toList nls
392 pure ()
393
394 -- | Returns a `NodeListStory`, updating the given one for given `NodeId`
395 nodeStoryInc :: Pool PGS.Connection -> Maybe NodeListStory -> NodeId -> IO NodeListStory
396 nodeStoryInc pool Nothing nId = getNodeStory pool nId
397 nodeStoryInc pool (Just ns@(NodeStory nls)) nId = do
398 case Map.lookup nId nls of
399 Nothing -> do
400 (NodeStory nls') <- getNodeStory pool nId
401 pure $ NodeStory $ Map.union nls nls'
402 Just _ -> pure ns
403
404 nodeStoryIncs :: Pool PGS.Connection -> Maybe NodeListStory -> [NodeId] -> IO NodeListStory
405 nodeStoryIncs _ Nothing [] = pure $ NodeStory $ Map.empty
406 nodeStoryIncs pool (Just nls) ns = foldM (\m n -> nodeStoryInc pool (Just m) n) nls ns
407 nodeStoryIncs pool Nothing (ni:ns) = do
408 m <- getNodeStory pool ni
409 nodeStoryIncs pool (Just m) ns
410
411 -- nodeStoryDec :: Pool PGS.Connection -> NodeListStory -> NodeId -> IO NodeListStory
412 -- nodeStoryDec pool ns@(NodeStory nls) ni = do
413 -- case Map.lookup ni nls of
414 -- Nothing -> do
415 -- _ <- nodeStoryRemove pool ni
416 -- pure ns
417 -- Just _ -> do
418 -- let ns' = Map.filterWithKey (\k _v -> k /= ni) nls
419 -- _ <- nodeStoryRemove pool ni
420 -- pure $ NodeStory ns'
421 ------------------------------------
422
423 readNodeStoryEnv :: Pool PGS.Connection -> IO NodeStoryEnv
424 readNodeStoryEnv pool = do
425 mvar <- nodeStoryVar pool Nothing []
426 saver <- mkNodeStorySaver pool mvar
427 -- let saver = modifyMVar_ mvar $ \mv -> do
428 -- writeNodeStories pool mv
429 -- printDebug "[readNodeStoryEnv] saver" mv
430 -- let mv' = clearHistory mv
431 -- printDebug "[readNodeStoryEnv] saver, cleared" mv'
432 -- return mv'
433 pure $ NodeStoryEnv { _nse_var = mvar
434 , _nse_saver = saver
435 , _nse_getter = nodeStoryVar pool (Just mvar) }
436
437 nodeStoryVar :: Pool PGS.Connection -> Maybe (MVar NodeListStory) -> [NodeId] -> IO (MVar NodeListStory)
438 nodeStoryVar pool Nothing nIds = do
439 state <- nodeStoryIncs pool Nothing nIds
440 newMVar state
441 nodeStoryVar pool (Just mv) nIds = do
442 _ <- modifyMVar_ mv $ \nsl -> (nodeStoryIncs pool (Just nsl) nIds)
443 pure mv
444
445 -- Debounce is useful since it could delay the saving to some later
446 -- time, asynchronously and we keep operating on memory only.
447 mkNodeStorySaver :: Pool PGS.Connection -> MVar NodeListStory -> IO (IO ())
448 mkNodeStorySaver pool mvns = mkDebounce settings
449 where
450 settings = defaultDebounceSettings
451 { debounceAction = do
452 withMVar mvns (\ns -> writeNodeStories pool ns)
453 withMVar mvns (\ns -> printDebug "[mkNodeStorySaver] debounce nodestory" ns)
454 modifyMVar_ mvns $ \ns -> pure $ clearHistory ns
455 , debounceFreq = 1*minute
456 }
457 minute = 60*second
458 second = 10^(6 :: Int)
459
460 clearHistory :: NodeListStory -> NodeListStory
461 clearHistory (NodeStory ns) = NodeStory $ ns & (traverse . a_history) .~ emptyHistory
462 where
463 emptyHistory = [] :: [NgramsStatePatch']
464
465 -- mkNodeStorySaver :: MVar NodeListStory -> Cmd err (Cmd err ())
466 -- mkNodeStorySaver mvns = mkDebounce settings
467 -- where
468 -- settings = defaultDebounceSettings
469 -- { debounceAction = withMVar mvns (\ns -> writeNodeStories ns)
470 -- , debounceFreq = 1 * minute
471 -- -- , debounceEdge = trailingEdge -- Trigger on the trailing edge
472 -- }
473 -- minute = 60 * second
474 -- second = 10^(6 :: Int)
475
476
477 -----------------------------------------