]> Git — Sourcephile - gargantext.git/blob - src/Gargantext/Core/NodeStory.hs
[nodeStory] migration script gets repo filepath from env
[gargantext.git] / src / Gargantext / Core / NodeStory.hs
1 {-|
2 Module : Gargantext.Core.NodeStory
3 Description : Node Story storage (per-node archives of state, version and history)
4 Copyright : (c) CNRS, 2017-Present
5 License : AGPL + CECILL v3
6 Maintainer : team@gargantext.org
7 Stability : experimental
8 Portability : POSIX
9
10 A Node Story is a Map between NodeId and an Archive (with state,
11 version and history) for that node.
12
13 Couple of words on how this is implemented.
14
15 First version used files which stored Archive for each NodeId in a
16 separate .cbor file.
17
18 For performance reasons, it is rewritten to use the DB.
19
20 The table `node_stories` contains two columns: `node_id` and
21 `archive`.
22
23 Next, it was observed that `a_history` in `Archive` takes much
24 space. So a new table was created, `node_story_archive_history` with
25 columns: `node_id`, `ngrams_type_id`, `patch`. This is because each
26 history item is in fact a map from `NgramsType` to `NgramsTablePatch`
27 (see the `NgramsStatePatch'` type).
28
29 Moreover, since in ~G.A.Ngrams.commitStatePatch~ we use current state
30 only, with only recent history items, I concluded that it is not
31 necessary to load the whole history into memory. Instead, it is kept in DB
32 (history is immutable) and only recent changes are added to
33 `a_history`. Then that record is cleared whenever `Archive` is saved.
34
35 Please note that history is immutable: patches are only ever appended to it.
36
37 TODO:
38 - remove
39 - filter
40 - load the lists (charger les listes)
41 -}
42
43 {-# OPTIONS_GHC -fno-warn-orphans #-}
44 {-# LANGUAGE Arrows #-}
45 {-# LANGUAGE ConstraintKinds #-}
46 {-# LANGUAGE QuasiQuotes #-}
47 {-# LANGUAGE TemplateHaskell #-}
48
49 module Gargantext.Core.NodeStory
50 ( HasNodeStory
51 , HasNodeStoryEnv
52 , hasNodeStory
53 , HasNodeStoryVar
54 , hasNodeStoryVar
55 , HasNodeStorySaver
56 , hasNodeStorySaver
57 , NodeStory(..)
58 , NgramsStatePatch'
59 , NodeListStory
60 , initNodeListStoryMock
61 , NodeStoryEnv(..)
62 , initNodeStory
63 , nse_getter
64 , nse_saver
65 , nse_var
66 , unNodeStory
67 , getNodeArchiveHistory
68 , Archive(..)
69 , initArchive
70 , a_history
71 , a_state
72 , a_version
73 , nodeExists
74 , getNodesIdWithType
75 , readNodeStoryEnv
76 , upsertNodeArchive
77 , getNodeStory )
78 where
79
80 -- import Debug.Trace (traceShow)
81 import Control.Debounce (mkDebounce, defaultDebounceSettings, debounceFreq, debounceAction)
82 import Codec.Serialise.Class
83 import Control.Arrow (returnA)
84 import Control.Concurrent (MVar(), withMVar, newMVar, modifyMVar_)
85 import Control.Exception (catch, throw, SomeException(..))
86 import Control.Lens (makeLenses, Getter, (^.), (.~), traverse)
87 import Control.Monad.Except
88 import Control.Monad.Reader
89 import Data.Aeson hiding ((.=), decode)
90 import Data.ByteString.Char8 (hPutStrLn)
91 import Data.Map.Strict (Map)
92 import Data.Maybe (mapMaybe)
93 import Data.Monoid
94 import Data.Pool (Pool, withResource)
95 import Data.Semigroup
96 import qualified Database.PostgreSQL.Simple as PGS
97 import Database.PostgreSQL.Simple.SqlQQ (sql)
98 import Database.PostgreSQL.Simple.FromField (FromField(fromField), fromJSONField)
99 import Data.Profunctor.Product.TH (makeAdaptorAndInstance)
100 import GHC.Generics (Generic)
101 import Gargantext.API.Ngrams.Types
102 import Gargantext.Core.Types (NodeId(..), NodeType)
103 import Gargantext.Core.Utils.Prefix (unPrefix)
104 import Gargantext.Database.Admin.Config (nodeTypeId)
105 import Gargantext.Database.Prelude (CmdM', HasConnectionPool, HasConfig)
106 import Gargantext.Database.Query.Table.Node.Error (HasNodeError())
107 import Gargantext.Prelude
108 import Opaleye (Column, DefaultFromField(..), Insert(..), Select, SqlInt4, SqlJsonb, Table, Update(..), (.==), fromPGSFromField, rCount, restrict, runInsert, runSelect, runUpdate, selectTable, sqlInt4, sqlValueJSONB, tableField, updateEasy)
109 import Opaleye.Internal.Table (Table(..))
110 import System.IO (stderr)
111 import qualified Data.Map.Strict as Map
112 import qualified Data.Map.Strict.Patch as PM
113 import qualified Gargantext.Database.Query.Table.Ngrams as TableNgrams
114
115 ------------------------------------------------------------------------
-- | Runtime environment giving access to the in-memory node stories.
data NodeStoryEnv = NodeStoryEnv
  { _nse_var    :: !(MVar NodeListStory)
    -- ^ Shared in-memory story of the loaded nodes.
  , _nse_saver  :: !(IO ())
    -- ^ Action persisting the in-memory story to the DB
    -- ('readNodeStoryEnv' builds it with a debounce).
  , _nse_getter :: [NodeId] -> IO (MVar NodeListStory)
    -- ^ Returns the story 'MVar' after making sure the given nodes are in it.
  -- , _nse_cleaner :: !(IO ()) -- every 12 hours: cleans the repos of unused NodeStories
  -- , _nse_lock :: !FileLock -- TODO (it depends on the option: if with database or file only)
  }
  deriving (Generic)
124
-- | Constraints required by code that reads or updates node stories from
-- within the application monad.
type HasNodeStory env err m =
  ( CmdM' env err m
  , MonadReader env m
  , MonadError err m
  , HasNodeStoryEnv env
  , HasConfig env
  , HasConnectionPool env
  , HasNodeError err
  )

-- | Environments that carry a whole 'NodeStoryEnv'.
class (HasNodeStoryVar env, HasNodeStorySaver env) => HasNodeStoryEnv env where
  hasNodeStory :: Getter env NodeStoryEnv

-- | Environments giving access to a function that returns the story 'MVar'
-- for the given nodes.
class HasNodeStoryVar env where
  hasNodeStoryVar :: Getter env ([NodeId] -> IO (MVar NodeListStory))

-- | Environments giving access to an @IO ()@ action, presumably the saver
-- of 'NodeStoryEnv' (TODO confirm against the instances).
class HasNodeStorySaver env where
  hasNodeStorySaver :: Getter env (IO ())
143
144 ------------------------------------------------------------------------
145
{- | Node Story for each NodeType: maps every 'NodeId' to its 'Archive'.

TODO: generalize for any NodeType; let's start with NodeList, which is
implemented already.
-}
newtype NodeStory s p = NodeStory { _unNodeStory :: Map NodeId (Archive s p) }
  deriving (Generic, Show)

-- Generic-based instances.
instance (FromJSON s, FromJSON p) => FromJSON (NodeStory s p)
instance (ToJSON s, ToJSON p) => ToJSON (NodeStory s p)
instance (Serialise s, Serialise p) => Serialise (NodeStory s p)
156
-- | A node's state together with its version and its recent history.
data Archive s p = Archive
  { _a_version :: !Version
    -- ^ Version of the archive.
  , _a_state   :: !s
    -- ^ Current state.
  , _a_history :: ![p]
    -- ^ Recent patches, most recent first (we use `take` in
    -- `commitStatePatch`, that's why). History is immutable: we only
    -- insert on top of the existing list. The whole history is not kept
    -- in memory; this holds only the recent part, the one that will be
    -- inserted into the DB.
  }
  deriving (Generic, Show)

instance (Serialise s, Serialise p) => Serialise (Archive s p)
174
175
-- | Node stories of ngrams lists.
type NodeListStory = NodeStory NgramsState' NgramsStatePatch'

-- | Ngrams tables, one per ngrams type.
type NgramsState' = Map TableNgrams.NgramsType NgramsTableMap

-- | A patch on 'NgramsState'', one table patch per ngrams type.
type NgramsStatePatch' = PatchMap TableNgrams.NgramsType NgramsTablePatch

instance Serialise NgramsStatePatch'

-- Archives are read from a JSON(B) column through their 'FromJSON' instance.
instance FromField (Archive NgramsState' NgramsStatePatch') where
  fromField = fromJSONField

instance DefaultFromField SqlJsonb (Archive NgramsState' NgramsStatePatch') where
  defaultFromField = fromPGSFromField
187
-- TODO Semigroup instance for unions
-- TODO check this

-- | Right-biased combination: the result has the version and state of the
-- right operand, and its history is the right history followed by the left
-- one.
--
-- NOTE(review): @x <> mempty@ replaces the version and state of @x@ with
-- those of 'mempty', so the right-identity law of 'Monoid' does not hold.
-- The @Semigroup s@ / @Semigroup p@ constraints are unused by the body.
instance (Semigroup s, Semigroup p) => Semigroup (Archive s p) where
  Archive { _a_history = lhsHistory }
    <> Archive { _a_version = rhsVersion, _a_state = rhsState, _a_history = rhsHistory }
    = Archive { _a_version = rhsVersion
              , _a_state   = rhsState
              , _a_history = rhsHistory <> lhsHistory }

-- | Version 0, empty state, no history.
instance (Monoid s, Semigroup p) => Monoid (Archive s p) where
  mempty = Archive { _a_version = 0, _a_state = mempty, _a_history = [] }
202
-- | JSON codecs for 'Archive', with field names derived by 'unPrefix' @"_a_"@.
instance (FromJSON s, FromJSON p) => FromJSON (Archive s p) where
  parseJSON = genericParseJSON (unPrefix "_a_")

instance (ToJSON s, ToJSON p) => ToJSON (Archive s p) where
  toJSON     = genericToJSON (unPrefix "_a_")
  toEncoding = genericToEncoding (unPrefix "_a_")
209
210 ------------------------------------------------------------------------
-- | A story holding only the given node, with an empty archive.
initNodeStory :: (Monoid s, Semigroup p) => NodeId -> NodeStory s p
initNodeStory nodeId = NodeStory (Map.singleton nodeId initArchive)

-- | The empty archive ('mempty'): version 0, empty state, no history.
initArchive :: (Monoid s, Semigroup p) => Archive s p
initArchive = mempty
216
-- | Mock story for node 0, at version 0 with no history, whose
-- 'TableNgrams.NgramsTerms' table is built from 'mockTable'.
initNodeListStoryMock :: NodeListStory
initNodeListStoryMock = NodeStory (Map.singleton mockNodeId mockArchive)
  where
    mockNodeId = 0
    mockArchive = Archive { _a_version = 0
                          , _a_state   = Map.singleton TableNgrams.NgramsTerms termsTable
                          , _a_history = [] }
    termsTable = Map.fromList [ (n ^. ne_ngrams, ngramsElementToRepo n)
                              | n <- mockTable ^. _NgramsTable ]
229
230 ------------------------------------------------------------------------
231
232
233 ------------------------------------------------------------------------
-- Lenses for the records declared above. They are generated here, after the
-- type declarations, because a top-level Template Haskell splice splits the
-- module into declaration groups: code placed before the splice cannot refer
-- to the names it generates.
makeLenses ''NodeStoryEnv
makeLenses ''NodeStory
makeLenses ''Archive
238
239 -----------------------------------------
240
241
-- | A row of the @node_stories@ table (see 'nodeStoryTable'). It is polymorphic
-- in its field types so the same record serves for Opaleye columns
-- ('NodeStoryRead' / 'NodeStoryWrite') and for decoded Haskell values (e.g.
-- @NodeStoryPoly NodeId ArchiveQ@ in 'getNodeStory').
data NodeStoryPoly a b = NodeStoryDB { node_id :: a
                                     , archive :: b }
  deriving (Eq)

-- | The archive type stored in the @archive@ column.
type ArchiveQ = Archive NgramsState' NgramsStatePatch'

-- Read and write column types are currently identical.
type NodeStoryWrite = NodeStoryPoly (Column SqlInt4) (Column SqlJsonb)
type NodeStoryRead = NodeStoryPoly (Column SqlInt4) (Column SqlJsonb)

-- Generates the 'pNodeStory' product-profunctor adaptor used by 'nodeStoryTable'.
$(makeAdaptorAndInstance "pNodeStory" ''NodeStoryPoly)
252
253
-- | Runs 'PGS.executeMany' on a connection taken from the pool and returns
-- the number of affected rows. Exceptions are re-thrown unchanged.
runPGSExecuteMany :: (PGS.ToRow q) => Pool PGS.Connection -> PGS.Query -> [q] -> IO Int64
runPGSExecuteMany pool qs a =
  withResource pool $ \conn ->
    PGS.executeMany conn qs a `catch` rethrowPGS
  where
    -- TODO: log the failing query, as 'runPGSQuery' does.
    rethrowPGS (SomeException err) = throw (SomeException err)
261
-- | Runs 'PGS.query' on a connection taken from the pool. On failure, the
-- query with its parameters substituted is printed to stderr, then the
-- exception is re-thrown.
runPGSQuery :: (PGS.FromRow r, PGS.ToRow q) => Pool PGS.Connection -> PGS.Query -> q -> IO [r]
runPGSQuery pool q a = withResource pool queryOnConn
  where
    queryOnConn conn = PGS.query conn q a `catch` reportAndRethrow conn
    reportAndRethrow conn (SomeException err) = do
      PGS.formatQuery conn q a >>= hPutStrLn stderr
      throw (SomeException err)
269
-- | Tells whether the @nodes@ table has a row with the given id. The extra
-- @AND ?@ is bound to 'True' (presumably so the parameters can be passed as a
-- pair rather than with 'PGS.Only').
nodeExists :: Pool PGS.Connection -> NodeId -> IO Bool
nodeExists pool nId = do
  rows <- runPGSQuery pool [sql|SELECT true FROM nodes WHERE id = ? AND ? |] (nId, True)
  pure (rows == [PGS.Only True])
273
-- | Ids of all rows of @nodes@ whose @typename@ matches the given node type.
getNodesIdWithType :: Pool PGS.Connection -> NodeType -> IO [NodeId]
getNodesIdWithType pool nt =
  fmap (\(PGS.Only nId) -> NodeId nId) <$> runPGSQuery pool query (nodeTypeId nt, True)
  where
    query :: PGS.Query
    query = [sql|SELECT id FROM nodes WHERE typename = ? AND ? |]
281
282
283
-- | The @node_stories@ table, with a @node_id@ column and a JSONB @archive@
-- column.
nodeStoryTable :: Table NodeStoryRead NodeStoryWrite
nodeStoryTable = Table "node_stories" $ pNodeStory $
  NodeStoryDB { node_id = tableField "node_id"
              , archive = tableField "archive" }

-- | Selects every row of 'nodeStoryTable'.
nodeStorySelect :: Select NodeStoryRead
nodeStorySelect = selectTable nodeStoryTable
292
-- | Reads the stored history of a node: each row of
-- @node_story_archive_history@ becomes a single-type patch. Rows with an
-- unknown @ngrams_type_id@ are skipped.
--
-- TODO Check ordering, "first patch in the _a_history list is the most recent":
-- the query has no ORDER BY, so SQL does not guarantee any row order here.
getNodeArchiveHistory :: Pool PGS.Connection -> NodeId -> IO [NgramsStatePatch']
getNodeArchiveHistory pool nodeId = do
  rows <- runPGSQuery pool query (nodeId, True)
  pure [ fst $ PM.singleton ntId patch
       | (rawTypeId, patch) <- rows
       , Just ntId <- [TableNgrams.fromNgramsTypeId rawTypeId] ]
  where
    query :: PGS.Query
    query = [sql|SELECT ngrams_type_id, patch FROM node_story_archive_history WHERE node_id = ? AND ? |]
302
-- | Appends history items to @node_story_archive_history@, one row per
-- (item, ngrams type) pair, in the order the items are given.
--
-- All rows go through a single 'PGS.executeMany' call: one pooled connection,
-- one round trip and one atomic statement. The previous version issued one
-- statement per history item, which could leave the history half-written.
insertNodeArchiveHistory :: Pool PGS.Connection -> NodeId -> [NgramsStatePatch'] -> IO ()
insertNodeArchiveHistory _ _ [] = pure ()
insertNodeArchiveHistory pool nodeId hs = do
  _ <- runPGSExecuteMany pool query rows
  pure ()
  where
    rows = [ (nodeId, TableNgrams.ngramsTypeId nType, patch)
           | h <- hs
           , (nType, patch) <- PM.toList h ]
    query :: PGS.Query
    query = [sql| INSERT INTO node_story_archive_history(node_id, ngrams_type_id, patch) VALUES (?, ?, ?) |]
312
-- | Loads the story of one node from @node_stories@. The returned archives
-- have an empty '_a_history': only the state and version are needed in
-- memory, while the full history stays in the DB (see
-- 'getNodeArchiveHistory'). Duplicate rows for the node are merged with
-- '(<>)'. If the node has no row, the result is an empty story.
getNodeStory :: Pool PGS.Connection -> NodeId -> IO NodeListStory
getNodeStory pool (NodeId nodeId) = do
  res <- withResource pool $ \c -> runSelect c query :: IO [NodeStoryPoly NodeId ArchiveQ]
  -- Dropping the history is a pure transformation: no need for mapM in IO.
  pure $ NodeStory $ Map.fromListWith (<>) $ map dropHistory res
  where
    dropHistory :: NodeStoryPoly NodeId ArchiveQ -> (NodeId, ArchiveQ)
    dropHistory (NodeStoryDB { node_id = nId, archive = stored }) =
      (nId, stored { _a_history = [] })

    query :: Select NodeStoryRead
    query = proc () -> do
      row@(NodeStoryDB node_id _) <- nodeStorySelect -< ()
      restrict -< node_id .== sqlInt4 nodeId
      returnA -< row
329
-- | Inserts a @node_stories@ row holding the archive with its history
-- removed, then appends that history to @node_story_archive_history@.
-- Returns the number of inserted @node_stories@ rows.
insertNodeArchive :: Pool PGS.Connection -> NodeId -> ArchiveQ -> IO Int64
insertNodeArchive pool nodeId@(NodeId nId) (Archive {..}) = do
  inserted <- withResource pool $ \c -> runInsert c insertion
  -- The most recent patch is first in _a_history; reversing it stores the
  -- patches oldest first.
  insertNodeArchiveHistory pool nodeId (reverse _a_history)
  pure inserted
  where
    storedArchive = Archive { _a_history = [] :: [NgramsStatePatch'], .. }
    insertion = Insert { iTable      = nodeStoryTable
                       , iRows       = [ NodeStoryDB { node_id = sqlInt4 nId
                                                     , archive = sqlValueJSONB storedArchive } ]
                       , iReturning  = rCount
                       , iOnConflict = Nothing }
345
-- | Overwrites the @archive@ column of the node's @node_stories@ row with the
-- archive minus its history, then appends that history to
-- @node_story_archive_history@. Returns the number of updated rows.
--
-- NOTE(review): the history is appended even when no row was updated.
updateNodeArchive :: Pool PGS.Connection -> NodeId -> ArchiveQ -> IO Int64
updateNodeArchive pool nodeId@(NodeId nId) (Archive {..}) = do
  updated <- withResource pool $ \c -> runUpdate c modification
  -- The most recent patch is first in _a_history; reversing it stores the
  -- patches oldest first.
  insertNodeArchiveHistory pool nodeId (reverse _a_history)
  pure updated
  where
    storedArchive = Archive { _a_history = [] :: [NgramsStatePatch'], .. }
    modification = Update { uTable      = nodeStoryTable
                          , uUpdateWith = updateEasy (\row -> row { archive = sqlValueJSONB storedArchive })
                          , uWhere      = \row -> node_id row .== sqlInt4 nId
                          , uReturning  = rCount }
361
362 -- nodeStoryRemove :: Pool PGS.Connection -> NodeId -> IO Int64
363 -- nodeStoryRemove pool (NodeId nId) = withResource pool $ \c -> runDelete c delete
364 -- where
365 -- delete = Delete { dTable = nodeStoryTable
366 -- , dWhere = (\row -> node_id row .== sqlInt4 nId)
367 -- , dReturning = rCount }
368
-- | Saves a node's archive. The @node_stories@ row is updated when one exists
-- and inserted otherwise; in both cases the archive's history is appended to
-- @node_story_archive_history@. Returns the number of written
-- @node_stories@ rows.
--
-- Existence is checked with a lightweight @SELECT ... LIMIT 1@ instead of
-- loading and JSON-decoding the whole stored archive through 'getNodeStory'.
--
-- NOTE(review): the check and the write are separate statements, so two
-- concurrent upserts of the same new node may both take the insert branch.
upsertNodeArchive :: Pool PGS.Connection -> NodeId -> ArchiveQ -> IO Int64
upsertNodeArchive pool nId a = do
  rows <- runPGSQuery pool existsQuery (nId, True) :: IO [PGS.Only Bool]
  if null rows
    then insertNodeArchive pool nId a
    else updateNodeArchive pool nId a
  where
    existsQuery :: PGS.Query
    existsQuery = [sql|SELECT true FROM node_stories WHERE node_id = ? AND ? LIMIT 1 |]
375
-- | Upserts the archive of every node of the story, one node after another.
-- Results are discarded with 'mapM_' rather than collected into a list.
writeNodeStories :: Pool PGS.Connection -> NodeListStory -> IO ()
writeNodeStories pool (NodeStory nls) =
  mapM_ (\(nId, a) -> upsertNodeArchive pool nId a) (Map.toList nls)
380
-- | Makes sure the given node is in the story. Without a story, the node's
-- story is loaded from the DB. With a story that lacks the node, the node is
-- loaded and merged in. A node already present is not reloaded.
nodeStoryInc :: Pool PGS.Connection -> Maybe NodeListStory -> NodeId -> IO NodeListStory
nodeStoryInc pool mStory nId = case mStory of
  Nothing -> getNodeStory pool nId
  Just ns@(NodeStory nls)
    | Map.member nId nls -> pure ns
    | otherwise -> do
        NodeStory loaded <- getNodeStory pool nId
        pure $ NodeStory $ Map.union nls loaded
390
-- | Adds every given node to the story with 'nodeStoryInc'. Without a story,
-- starts from the first node's story (or an empty story if there are no
-- nodes).
nodeStoryIncs :: Pool PGS.Connection -> Maybe NodeListStory -> [NodeId] -> IO NodeListStory
nodeStoryIncs pool mStory nIds = case (mStory, nIds) of
  (Nothing, []) -> pure $ NodeStory Map.empty
  (Nothing, firstId : otherIds) -> do
    seeded <- getNodeStory pool firstId
    nodeStoryIncs pool (Just seeded) otherIds
  (Just story, _) -> foldM (\acc n -> nodeStoryInc pool (Just acc) n) story nIds
397
398 -- nodeStoryDec :: Pool PGS.Connection -> NodeListStory -> NodeId -> IO NodeListStory
399 -- nodeStoryDec pool ns@(NodeStory nls) ni = do
400 -- case Map.lookup ni nls of
401 -- Nothing -> do
402 -- _ <- nodeStoryRemove pool ni
403 -- pure ns
404 -- Just _ -> do
405 -- let ns' = Map.filterWithKey (\k _v -> k /= ni) nls
406 -- _ <- nodeStoryRemove pool ni
407 -- pure $ NodeStory ns'
408 ------------------------------------
409
-- | Builds a 'NodeStoryEnv' with three parts:
--
-- * an in-memory story that starts empty;
-- * a debounced saver (see 'mkNodeStorySaver') writing it to the DB;
-- * a getter that loads the requested nodes into that same story.
readNodeStoryEnv :: Pool PGS.Connection -> IO NodeStoryEnv
readNodeStoryEnv pool = do
  storyVar <- nodeStoryVar pool Nothing []
  saver <- mkNodeStorySaver pool storyVar
  pure $ NodeStoryEnv { _nse_var    = storyVar
                      , _nse_saver  = saver
                      , _nse_getter = nodeStoryVar pool (Just storyVar) }
423
-- | Returns an 'MVar' whose story contains the given nodes. Without an
-- existing 'MVar', a fresh one is created from the nodes' stories.
-- Otherwise the nodes are loaded into the given 'MVar', which is returned.
nodeStoryVar :: Pool PGS.Connection -> Maybe (MVar NodeListStory) -> [NodeId] -> IO (MVar NodeListStory)
nodeStoryVar pool Nothing nIds = nodeStoryIncs pool Nothing nIds >>= newMVar
nodeStoryVar pool (Just mv) nIds = do
  modifyMVar_ mv (\nsl -> nodeStoryIncs pool (Just nsl) nIds)
  pure mv
431
-- | Creates a debounced action that writes the in-memory node stories to the
-- DB and then clears their in-memory history. Debouncing delays the save to
-- a later time, asynchronously, while we keep operating on memory only.
--
-- Writing, logging and clearing all happen inside one 'modifyMVar_'.
-- Previously they were three separate MVar operations, so a patch committed
-- by another thread between the DB write and the clear was dropped without
-- ever being saved. If the write throws, 'modifyMVar_' keeps the previous
-- contents, so the unsaved history is not cleared.
--
-- NOTE(review): the debug line prints the whole story, which may be large.
mkNodeStorySaver :: Pool PGS.Connection -> MVar NodeListStory -> IO (IO ())
mkNodeStorySaver pool mvns = mkDebounce settings
  where
    settings = defaultDebounceSettings
      { debounceAction = modifyMVar_ mvns saveAndClear
      , debounceFreq = 1*minute
      }
    saveAndClear ns = do
      writeNodeStories pool ns
      printDebug "[mkNodeStorySaver] debounce nodestory" ns
      pure $ clearHistory ns
    -- debounceFreq is expressed in microseconds.
    minute = 60*second
    second = 10^(6 :: Int)
446
-- | Empties the in-memory history of every archive of the story, keeping
-- their versions and states.
clearHistory :: NodeListStory -> NodeListStory
clearHistory (NodeStory ns) = NodeStory (fmap withoutHistory ns)
  where
    withoutHistory arch = arch { _a_history = [] }
451
452 -- mkNodeStorySaver :: MVar NodeListStory -> Cmd err (Cmd err ())
453 -- mkNodeStorySaver mvns = mkDebounce settings
454 -- where
455 -- settings = defaultDebounceSettings
456 -- { debounceAction = withMVar mvns (\ns -> writeNodeStories ns)
457 -- , debounceFreq = 1 * minute
458 -- -- , debounceEdge = trailingEdge -- Trigger on the trailing edge
459 -- }
460 -- minute = 60 * second
461 -- second = 10^(6 :: Int)
462
463
464 -----------------------------------------