]> Git — Sourcephile - gargantext.git/blob - src/Gargantext/Core/NodeStory.hs
Merge branch 'dev' into 141-dev-node-stories-db-optimization
[gargantext.git] / src / Gargantext / Core / NodeStory.hs
1 {-|
2 Module : Gargantext.Core.NodeStory
3 Description : Node API generation
4 Copyright : (c) CNRS, 2017-Present
5 License : AGPL + CECILL v3
6 Maintainer : team@gargantext.org
7 Stability : experimental
8 Portability : POSIX
9
10 A Node Story is a Map between NodeId and an Archive (with state,
11 version and history) for that node.
12
A few words on how this is implemented.
14
15 First version used files which stored Archive for each NodeId in a
16 separate .cbor file.
17
18 For performance reasons, it is rewritten to use the DB.
19
20 The table `node_stories` contains two columns: `node_id` and
21 `archive`.
22
23 Next, it was observed that `a_history` in `Archive` takes much
24 space. So a new table was created, `node_story_archive_history` with
25 columns: `node_id`, `ngrams_type_id`, `patch`. This is because each
26 history item is in fact a map from `NgramsType` to `NgramsTablePatch`
27 (see the `NgramsStatePatch'` type).
28
29 Moreover, since in ~G.A.Ngrams.commitStatePatch~ we use current state
30 only, with only recent history items, I concluded that it is not
31 necessary to load whole history into memory. Instead, it is kept in DB
32 (history is immutable) and only recent changes are added to
33 `a_history`. Then that record is cleared whenever `Archive` is saved.
34
35 Please note that
36
TODO:
- remove
- filter
- load the lists
41 -}
42
43 {-# OPTIONS_GHC -fno-warn-orphans #-}
44 {-# LANGUAGE Arrows #-}
45 {-# LANGUAGE ConstraintKinds #-}
46 {-# LANGUAGE QuasiQuotes #-}
47 {-# LANGUAGE TemplateHaskell #-}
48
49 module Gargantext.Core.NodeStory
50 ( HasNodeStory
51 , HasNodeStoryEnv
52 , hasNodeStory
53 , HasNodeStoryVar
54 , hasNodeStoryVar
55 , HasNodeStorySaver
56 , hasNodeStorySaver
57 , HasNodeStoryImmediateSaver
58 , hasNodeStoryImmediateSaver
59 , NodeStory(..)
60 , NgramsStatePatch'
61 , NodeListStory
62 , initNodeListStoryMock
63 , NodeStoryEnv(..)
64 , initNodeStory
65 , nse_getter
66 , nse_saver
67 , nse_saver_immediate
68 , nse_var
69 , unNodeStory
70 , getNodeArchiveHistory
71 , Archive(..)
72 , initArchive
73 , insertArchiveList
74 , deleteArchiveList
75 , updateArchiveList
76 , a_history
77 , a_state
78 , a_version
79 , nodeExists
80 , runPGSQuery
81 , runPGSAdvisoryLock
82 , runPGSAdvisoryUnlock
83 , runPGSAdvisoryXactLock
84 , getNodesIdWithType
85 , readNodeStoryEnv
86 , upsertNodeStories
87 , getNodeStory
88 , nodeStoriesQuery
89 , currentVersion )
90 where
91
92 -- import Debug.Trace (traceShow)
93 import Control.Debounce (mkDebounce, defaultDebounceSettings, debounceFreq, debounceAction)
94 import Codec.Serialise.Class
95 import Control.Concurrent (MVar(), newMVar, modifyMVar_)
96 import Control.Exception (catch, throw, SomeException(..))
97 import Control.Lens (makeLenses, Getter, (^.), (.~), (%~), _Just, at, traverse, view)
98 import Control.Monad.Except
99 import Control.Monad.Reader
100 import Data.Aeson hiding ((.=), decode)
101 import Data.ByteString.Char8 (hPutStrLn)
102 import Data.Map.Strict (Map)
103 import Data.Maybe (catMaybes)
104 import Data.Monoid
105 import Data.Pool (Pool, withResource)
106 import Data.Semigroup
107 import Database.PostgreSQL.Simple.SqlQQ (sql)
108 import Database.PostgreSQL.Simple.FromField (FromField(fromField), fromJSONField)
109 import Data.Profunctor.Product.TH (makeAdaptorAndInstance)
110 import GHC.Generics (Generic)
111 import Gargantext.API.Ngrams.Types
112 import Gargantext.Core.Types (ListId, NodeId(..), NodeType)
113 import Gargantext.Core.Utils.Prefix (unPrefix)
114 import Gargantext.Database.Prelude (CmdM', HasConnectionPool(..), HasConfig)
115 import Gargantext.Database.Query.Table.Node.Error (HasNodeError())
116 import Gargantext.Prelude
117 import Opaleye (DefaultFromField(..), SqlJsonb, fromPGSFromField)
118 import System.IO (stderr)
119 import qualified Data.Map.Strict as Map
120 import qualified Data.Map.Strict.Patch as PM
121 import qualified Data.Set as Set
122 import qualified Data.Text as Text
123 import qualified Database.PostgreSQL.Simple as PGS
124 import qualified Gargantext.Database.Query.Table.Ngrams as TableNgrams
125
126 ------------------------------------------------------------------------
-- | Runtime environment for node stories: the shared in-memory state plus
-- the actions used to persist and extend it.
data NodeStoryEnv = NodeStoryEnv
  { -- | Shared in-memory node list story.
    _nse_var             :: !(MVar NodeListStory)
    -- | Debounced save action (built by 'mkNodeStorySaver').
  , _nse_saver           :: !(IO ())
    -- | Save action that writes to the DB right away.
  , _nse_saver_immediate :: !(IO ())
    -- | Makes sure the given node ids are loaded, then returns the shared variable.
  , _nse_getter          :: [NodeId] -> IO (MVar NodeListStory)
    --, _nse_cleaner :: !(IO ()) -- every 12 hours: cleans the repos of unused NodeStories
    -- , _nse_lock :: !FileLock -- TODO (it depends on the option: if with database or file only)
  }
  deriving (Generic)
136
-- | Constraint bundle required by monadic code that works with node stories.
type HasNodeStory env err m =
  ( CmdM' env err m
  , MonadReader env m
  , MonadError err m
  , HasNodeStoryEnv env
  , HasConfig env
  , HasConnectionPool env
  , HasNodeError err
  )
145
-- | Environments exposing a full 'NodeStoryEnv'.
class (HasNodeStoryVar env, HasNodeStorySaver env) => HasNodeStoryEnv env where
  hasNodeStory :: Getter env NodeStoryEnv

-- | Environments exposing the node story getter (node ids in, shared 'MVar' out).
class HasNodeStoryVar env where
  hasNodeStoryVar :: Getter env ([NodeId] -> IO (MVar NodeListStory))

-- | Environments exposing the node story save action.
class HasNodeStorySaver env where
  hasNodeStorySaver :: Getter env (IO ())

-- | Environments exposing the immediate node story save action.
class HasNodeStoryImmediateSaver env where
  hasNodeStoryImmediateSaver :: Getter env (IO ())
158
159 ------------------------------------------------------------------------
160
-- | Node Story for each NodeType; the key of the 'Map' is the 'NodeId'.
--
-- TODO: generalize for any NodeType; we start with NodeList, which is
-- implemented already.
newtype NodeStory s p = NodeStory
  { _unNodeStory :: Map NodeId (Archive s p)
  }
  deriving (Generic, Show)

instance (FromJSON s, FromJSON p) => FromJSON (NodeStory s p)

instance (ToJSON s, ToJSON p) => ToJSON (NodeStory s p)

instance (Serialise s, Serialise p) => Serialise (NodeStory s p)
171
-- | Versioned state of a node together with its recent patches.
data Archive s p = Archive
  { _a_version :: !Version
  , _a_state   :: !s
  , _a_history :: ![p]
    -- ^ The first patch in the list is the most recent one; this is why
    -- `take` is used in `commitStatePatch`.
    --
    -- History is immutable: new items are only ever put on top of the
    -- existing list.
    --
    -- The whole history is not kept in memory. This field holds only the
    -- recent history, i.e. the part that will be inserted into the DB.
  }
  deriving (Generic, Show)

instance (Serialise s, Serialise p) => Serialise (Archive s p)
189
190
-- | Node story specialised to ngrams lists.
type NodeListStory = NodeStory NgramsState' NgramsStatePatch'

-- | Ngrams table per ngrams type.
type NgramsState' = Map TableNgrams.NgramsType NgramsTableMap

-- | One history item: a table patch per ngrams type.
type NgramsStatePatch' = PatchMap TableNgrams.NgramsType NgramsTablePatch

instance Serialise NgramsStatePatch'

-- An archive stored in a JSON(B) column is decoded through its 'FromJSON' instance.
instance FromField (Archive NgramsState' NgramsStatePatch') where
  fromField = fromJSONField

instance DefaultFromField SqlJsonb (Archive NgramsState' NgramsStatePatch') where
  defaultFromField = fromPGSFromField
202
-- | Merge two `NgramsState'` values. The structure is
-- (Map NgramsType (Map ...)) and the default `(<>)` on the outer map is
-- left-biased
-- (https://hackage.haskell.org/package/containers-0.6.6/docs/Data-Map-Internal.html#v:union),
-- which would drop the whole inner map of the right argument. Here the
-- inner maps of a shared ngrams type are merged with `(<>)` instead.
combineState :: NgramsState' -> NgramsState' -> NgramsState'
combineState left right = Map.unionWith (<>) left right
209
-- | Version and state are taken from the right-hand archive; the histories
-- are concatenated with the right-hand one first.
instance (Semigroup s, Semigroup p) => Semigroup (Archive s p) where
  Archive { _a_history = olderHistory } <> Archive { _a_version = newVersion
                                                   , _a_state   = newState
                                                   , _a_history = newerHistory } =
    Archive newVersion newState (newerHistory <> olderHistory)

-- | Empty archive: version 0, empty state, no history.
--
-- NOTE(review): because `(<>)` takes version and state from its right
-- argument, @x <> mempty@ resets them; only @mempty <> x@ yields @x@.
instance (Monoid s, Semigroup p) => Monoid (Archive s p) where
  mempty = Archive 0 mempty []

-- JSON field names are the record field names without the "_a_" prefix.
instance (FromJSON s, FromJSON p) => FromJSON (Archive s p) where
  parseJSON = genericParseJSON (unPrefix "_a_")

instance (ToJSON s, ToJSON p) => ToJSON (Archive s p) where
  toJSON     = genericToJSON (unPrefix "_a_")
  toEncoding = genericToEncoding (unPrefix "_a_")
226
227 ------------------------------------------------------------------------
-- | Node story holding a single, empty archive for the given node.
initNodeStory :: (Monoid s, Semigroup p) => NodeId -> NodeStory s p
initNodeStory ni = NodeStory (Map.singleton ni initArchive)

-- | The empty archive (same as 'mempty').
initArchive :: (Monoid s, Semigroup p) => Archive s p
initArchive = mempty
233
-- | Mock node list story: a single archive for node id 0, at version 0, whose
-- state holds the entries of `mockTable` under `NgramsTerms`.
initNodeListStoryMock :: NodeListStory
initNodeListStoryMock = NodeStory (Map.singleton mockListId mockArchive)
  where
    mockListId  = 0
    mockArchive = Archive
      { _a_version = 0
      , _a_state   = Map.singleton TableNgrams.NgramsTerms mockTerms
      , _a_history = []
      }
    mockTerms = Map.fromList
      [ (element ^. ne_ngrams, ngramsElementToRepo element)
      | element <- mockTable ^. _NgramsTable
      ]
246
247 ------------------------------------------------------------------------
248
249
250 ------------------------------------------------------------------------
-- Lenses are generated only here, after the declarations above: a Template
-- Haskell splice ends the current declaration group, so placing these
-- splices earlier would change what the surrounding code can refer to.
$(makeLenses ''NodeStoryEnv)
$(makeLenses ''NodeStory)
$(makeLenses ''Archive)
255
256 -----------------------------------------
257
258
-- | Polymorphic row type whose fields are named after the columns of the
-- `node_stories` table.
data NodeStoryPoly nid v ngtid ngid nre = NodeStoryDB
  { node_id             :: nid
  , version             :: v
  , ngrams_type_id      :: ngtid
  , ngrams_id           :: ngid
  , ngrams_repo_element :: nre
  }
  deriving (Eq)

-- | Polymorphic row type pairing a node id with an archive value.
data NodeStoryArchivePoly nid a = NodeStoryArchiveDB
  { a_node_id :: nid
  , archive   :: a
  }
  deriving (Eq)

$(makeAdaptorAndInstance "pNodeStory" ''NodeStoryPoly)
$(makeAdaptorAndInstance "pNodeArchiveStory" ''NodeStoryArchivePoly)
274
275 -- type NodeStoryWrite = NodeStoryPoly (Column SqlInt4) (Column SqlInt4) (Column SqlInt4) (Column SqlInt4) (Column SqlJsonb)
276 -- type NodeStoryRead = NodeStoryPoly (Column SqlInt4) (Column SqlInt4) (Column SqlInt4) (Column SqlInt4) (Column SqlJsonb)
277
278 -- type NodeStoryArchiveWrite = NodeStoryArchivePoly (Column SqlInt4) (Column SqlJsonb)
279 -- type NodeStoryArchiveRead = NodeStoryArchivePoly (Column SqlInt4) (Column SqlJsonb)
280
-- | Archive specialised to ngrams lists (state and patches per ngrams type).
type ArchiveList = Archive NgramsState' NgramsStatePatch'
282
283 -- DB stuff
284
-- | Run a single statement and return the number of affected rows.
--
-- On failure the formatted query is printed to stderr and the original
-- exception is rethrown unchanged, exactly like 'runPGSQuery' does.
-- (Previously `panic` was called first, which raised a different exception
-- and made the rethrow unreachable.)
runPGSExecute :: (PGS.ToRow q) => PGS.Connection -> PGS.Query -> q -> IO Int64
runPGSExecute c qs a = catch (PGS.execute c qs a) printError
  where
    printError (SomeException e) = do
      q' <- PGS.formatQuery c qs a
      hPutStrLn stderr q'
      throw (SomeException e)
292
-- | Run a multi-row statement and return the number of affected rows.
--
-- On failure the formatted query is printed to stderr and the original
-- exception is rethrown unchanged, exactly like 'runPGSQuery' does.
-- (Previously `panic` was called first, which raised a different exception
-- and made the rethrow unreachable.)
runPGSExecuteMany :: (PGS.ToRow q) => PGS.Connection -> PGS.Query -> [q] -> IO Int64
runPGSExecuteMany c qs a = catch (PGS.executeMany c qs a) printError
  where
    printError (SomeException e) = do
      q' <- PGS.formatMany c qs a
      hPutStrLn stderr q'
      throw (SomeException e)
300
-- | Run a query and return its rows. If it fails, the formatted query is
-- written to stderr before the exception is rethrown.
runPGSQuery :: (PGS.FromRow r, PGS.ToRow q) => PGS.Connection -> PGS.Query -> q -> IO [r]
runPGSQuery c q a = PGS.query c q a `catch` logAndRethrow
  where
    logAndRethrow (SomeException err) = do
      formatted <- PGS.formatQuery c q a
      hPutStrLn stderr formatted
      throw (SomeException err)
308
-- | Call `pg_advisory_lock` with the given key; the result row is discarded.
runPGSAdvisoryLock :: PGS.Connection -> Int -> IO ()
runPGSAdvisoryLock c lockKey =
  (runPGSQuery c [sql| SELECT pg_advisory_lock(?) |] (PGS.Only lockKey) :: IO [PGS.Only ()])
    >> pure ()
313
-- | Call `pg_advisory_unlock` with the given key; the boolean result is discarded.
runPGSAdvisoryUnlock :: PGS.Connection -> Int -> IO ()
runPGSAdvisoryUnlock c lockKey =
  (runPGSQuery c [sql| SELECT pg_advisory_unlock(?) |] (PGS.Only lockKey) :: IO [PGS.Only Bool])
    >> pure ()
318
-- | Call `pg_advisory_xact_lock` with the given key; the result row is discarded.
runPGSAdvisoryXactLock :: PGS.Connection -> Int -> IO ()
runPGSAdvisoryXactLock c lockKey =
  (runPGSQuery c [sql| SELECT pg_advisory_xact_lock(?) |] (PGS.Only lockKey) :: IO [PGS.Only ()])
    >> pure ()
323
-- | Whether a row with the given id exists in the `nodes` table.
nodeExists :: PGS.Connection -> NodeId -> IO Bool
nodeExists c nId = do
  rows <- runPGSQuery c [sql| SELECT true FROM nodes WHERE id = ? LIMIT 1 |] (PGS.Only nId)
  pure (rows == [PGS.Only True])
327
-- | Ids of all rows in `nodes` whose `typename` matches the given node type.
getNodesIdWithType :: PGS.Connection -> NodeType -> IO [NodeId]
getNodesIdWithType c nt = do
  rows <- runPGSQuery c selectIds (PGS.Only nt)
  pure [ NodeId nId | PGS.Only nId <- rows ]
  where
    selectIds :: PGS.Query
    selectIds = [sql| SELECT id FROM nodes WHERE typename = ? |]
335
336
337
338 -- nodeStoryTable :: Table NodeStoryRead NodeStoryWrite
339 -- nodeStoryTable =
340 -- Table "node_stories"
341 -- ( pNodeStory NodeStoryDB { node_id = tableField "node_id"
342 -- , version = tableField "version"
343 -- , ngrams_type_id = tableField "ngrams_type_id"
344 -- , ngrams_id = tableField "ngrams_id"
345 -- , ngrams_repo_element = tableField "ngrams_repo_element"
346 -- } )
347
348 -- nodeStoryArchiveTable :: Table NodeStoryArchiveRead NodeStoryArchiveWrite
349 -- nodeStoryArchiveTable =
350 -- Table "node_story_archive_history"
351 -- ( pNodeArchiveStory NodeStoryArchiveDB { a_node_id = tableField "node_id"
352 -- , archive = tableField "archive" } )
353
354 -- nodeStorySelect :: Select NodeStoryRead
355 -- nodeStorySelect = selectTable nodeStoryTable
356
-- | Load the stored history of a node, one singleton state patch per row.
--
-- NOTE "first patch in the _a_history list is the most recent": rows are
-- ordered by (version, id) descending.
getNodeArchiveHistory :: PGS.Connection -> NodeId -> IO [NgramsStatePatch']
getNodeArchiveHistory c nodeId = do
  rows <- runPGSQuery c query (PGS.Only nodeId) :: IO [(TableNgrams.NgramsType, NgramsTerm, NgramsPatch)]
  pure
    [ fst $ PM.singleton ngramsType (NgramsTablePatch $ fst $ PM.singleton terms patch)
    | (ngramsType, terms, patch) <- rows
    ]
  where
    query :: PGS.Query
    query = [sql| SELECT ngrams_type_id, terms, patch
                    FROM node_story_archive_history
                    JOIN ngrams ON ngrams.id = ngrams_id
                    WHERE node_id = ?
                    ORDER BY (version, node_story_archive_history.id) DESC |]
369
-- | Look up the id(s) of the `ngrams` row(s) with the given terms.
ngramsIdQuery :: PGS.Query
ngramsIdQuery =
  [sql| SELECT id FROM ngrams WHERE terms = ? |]
372
373
-- | Store history items for a node, one state patch after the other, in the
-- order given. Every (ngrams type, term, patch) entry becomes one row in
-- `node_story_archive_history`; entries whose term has no row in `ngrams`
-- are skipped.
insertNodeArchiveHistory :: PGS.Connection -> NodeId -> Version -> [NgramsStatePatch'] -> IO ()
insertNodeArchiveHistory c nodeId archiveVersion history = mapM_ insertPatch history
  where
    insertPatch :: NgramsStatePatch' -> IO ()
    insertPatch statePatch = do
      let entries =
            [ (nodeId, nType, term, p)
            | (nType, NgramsTablePatch patch) <- PM.toList statePatch
            , (term, p) <- PM.toList patch
            ] :: [(NodeId, TableNgrams.NgramsType, NgramsTerm, NgramsPatch)]
      resolved <- mapM resolveTerm entries
      _ <- runPGSExecuteMany c query
             [ (nId, nType, termId, patch, archiveVersion)
             | (nId, nType, termId, _term, patch) <- catMaybes resolved
             ]
      pure ()

    -- Attach the id of the term's `ngrams` row, if there is one.
    resolveTerm
      :: (NodeId, TableNgrams.NgramsType, NgramsTerm, NgramsPatch)
      -> IO (Maybe (NodeId, TableNgrams.NgramsType, Int, NgramsTerm, NgramsPatch))
    resolveTerm (nId, nType, term, patch) = do
      found <- runPGSQuery c ngramsIdQuery (PGS.Only term)
      pure $ (\(PGS.Only termId) -> (nId, nType, termId, term, patch)) <$> headMay found

    query :: PGS.Query
    query = [sql| INSERT INTO node_story_archive_history(node_id, ngrams_type_id, ngrams_id, patch, version) VALUES (?, ?, ?, ?, ?) |]
391
-- | Load the current state of a node from `node_stories`.
--
-- The table has several rows per node_id, one per (ngrams_type_id,
-- ngrams_id). Each row is turned into a one-entry archive and all of them
-- are folded into a single archive, giving
-- {<node_id>: {<ngrams_type_id>: {<ngrams_id>: <data>}}}.
-- The history is left empty. The result always has an entry for the
-- requested node, even when no rows were found.
getNodeStory :: PGS.Connection -> NodeId -> IO NodeListStory
getNodeStory c nId@(NodeId nodeId) = do
  rows <- runPGSQuery c nodeStoriesQuery (PGS.Only nodeId) :: IO [(Version, TableNgrams.NgramsType, NgramsTerm, NgramsRepoElement)]
  -- NOTE When concatenating, check that the same version is for all states
  let merged = foldl absorb mempty (rowToArchive <$> rows)
  pure $ NodeStory $ Map.singleton nId merged
  where
    rowToArchive (rowVersion, ngramsType, term, repoElement) =
      Archive { _a_version = rowVersion
              , _a_history = []
              , _a_state   = Map.singleton ngramsType $ Map.singleton term repoElement }

    -- NOTE (<>) for Archive doesn't concatenate states, so the states are
    -- merged explicitly. The version is taken from the row, not from the
    -- empty starting archive.
    absorb acc row = acc & a_state %~ combineState (row ^. a_state)
                         & a_version .~ (row ^. a_version)
409
-- | Select version, ngrams type, terms and repo element of every
-- `node_stories` row of one node (joined with `ngrams` to get the terms).
nodeStoriesQuery :: PGS.Query
nodeStoriesQuery =
  [sql| SELECT version, ngrams_type_id, terms, ngrams_repo_element
          FROM node_stories
          JOIN ngrams ON ngrams.id = ngrams_id
          WHERE node_id = ? |]
415
-- | Flat form of an archive state: one (type, term, element) triple per entry.
type ArchiveStateList = [(TableNgrams.NgramsType, NgramsTerm, NgramsRepoElement)]
417
-- | Flatten an archive state (a Map NgramsType (Map NgramsTerm
-- NgramsRepoElement)) into a list of triples, in map order.
archiveStateAsList :: NgramsState' -> ArchiveStateList
archiveStateAsList s =
  [ (nt, n, nre)
  | (nt, ntm) <- Map.toList s
  , (n, nre)  <- Map.toList ntm
  ]
422
-- | Rebuild an archive state from its flat list form; entries that share an
-- ngrams type are merged with `(<>)`.
archiveStateFromList :: ArchiveStateList -> NgramsState'
archiveStateFromList l =
  Map.fromListWith (<>) [ (nt, Map.singleton t nre) | (nt, t, nre) <- l ]
425
-- | Insert a whole new node story for the given node_id: one `node_stories`
-- row per (ngrams type, term) entry of the archive state, all carrying the
-- archive's version. Entries whose term has no row in `ngrams` are skipped.
--
-- The statement lists the `version` column so that its five placeholders
-- match the five parameters supplied per row (it used to list four columns
-- while five values were passed).
insertNodeStory :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
insertNodeStory c (NodeId nId) a =
  mapM_ insertRow (archiveStateAsList $ a ^. a_state)
  where
    insertRow :: (TableNgrams.NgramsType, NgramsTerm, NgramsRepoElement) -> IO Int64
    insertRow (ngramsType, ngrams, ngramsRepoElement) = do
      termIdM <- runPGSQuery c ngramsIdQuery (PGS.Only ngrams) :: IO [PGS.Only Int64]
      case headMay termIdM of
        -- unknown term: nothing to insert
        Nothing -> pure 0
        Just (PGS.Only termId) ->
          runPGSExecuteMany c query [(nId, a ^. a_version, ngramsType, termId, ngramsRepoElement)]

    query :: PGS.Query
    query = [sql| INSERT INTO node_stories(node_id, version, ngrams_type_id, ngrams_id, ngrams_repo_element) VALUES (?, ?, ?, ?, ?) |]
440 -- insert ngramsType ngrams ngramsRepoElement =
441 -- Insert { iTable = nodeStoryTable
442 -- , iRows = [NodeStoryDB { node_id = sqlInt4 nId
443 -- , version = sqlInt4 _a_version
444 -- , ngrams_type_id = sqlInt4 $ TableNgrams.ngramsTypeId ngramsType
445 -- , ngrams_id = ...
446 -- , ngrams_repo_element = sqlValueJSONB ngramsRepoElement
447 -- }]
448 -- , iReturning = rCount
449 -- , iOnConflict = Nothing }
450
-- | Insert one `node_stories` row per entry of the archive state, using the
-- archive's version. The ngrams id is resolved inside the statement from
-- the term text.
insertArchiveList :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
insertArchiveList c nodeId a = mapM_ insertEntry entries
  where
    entries = archiveStateAsList $ a ^. a_state

    insertEntry (nt, n, nre) = runPGSExecute c query (nodeId, a ^. a_version, nt, nre, n)

    query :: PGS.Query
    query = [sql| INSERT INTO node_stories(node_id, version, ngrams_type_id, ngrams_id, ngrams_repo_element)
                  SELECT ?, ?, ?, ngrams.id, ? FROM ngrams WHERE terms = ? |]
460
-- | Delete the `node_stories` rows of the given node that correspond to the
-- entries of the archive state (matched by ngrams type and term text).
--
-- The statement has exactly three placeholders, matching the
-- (node id, ngrams type, term) parameters. The former leading
-- `WITH (...) AS ngrams` clause was not valid SQL and introduced a fourth
-- placeholder that was never supplied; it was also unused, since the
-- sub-select already resolves the ngrams id.
deleteArchiveList :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
deleteArchiveList c nodeId a =
  mapM_ (\(nt, n, _) -> runPGSExecute c query (nodeId, nt, n)) (archiveStateAsList $ a ^. a_state)
  where
    query :: PGS.Query
    query = [sql| DELETE FROM node_stories
                  WHERE node_id = ? AND ngrams_type_id = ? AND ngrams_id IN (SELECT id FROM ngrams WHERE terms = ?) |]
471
-- | Overwrite `ngrams_repo_element` of the `node_stories` rows matching each
-- entry of the archive state (by node id, ngrams type and term text).
updateArchiveList :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
updateArchiveList c nodeId a =
  mapM_ (runPGSExecute c query)
    [ (nre, nodeId, nt, n) | (nt, n, nre) <- archiveStateAsList $ a ^. a_state ]
  where
    query :: PGS.Query
    query = [sql| UPDATE node_stories
                  SET ngrams_repo_element = ?
                  WHERE node_id = ? AND ngrams_type_id = ? AND ngrams_id IN (SELECT id FROM ngrams WHERE terms = ?) |]
484
-- | Update the stored node story of the given node_id so that it matches
-- @newArchive@, given the currently stored @currentArchive@.
--
-- The difference between the two states is applied as inserts, deletes and
-- updates, in that order. We assume we're inside an advisory lock.
updateNodeStory :: PGS.Connection -> NodeId -> ArchiveList -> ArchiveList -> IO ()
updateNodeStory c nodeId currentArchive newArchive = do
  -- 1. Find differences (inserts/updates/deletes)
  let currentList = archiveStateAsList $ currentArchive ^. a_state
      newList     = archiveStateAsList $ newArchive ^. a_state

      entryKey (nt, n, _) = (nt, n)
      -- Key sets are built once, not once per filtered element.
      currentKeys  = Set.fromList $ entryKey <$> currentList
      newKeys      = Set.fromList $ entryKey <$> newList
      currentElems = Set.fromList currentList

      -- inserts: keys present in new but not in current
      inserts = filter (\e -> Set.notMember (entryKey e) currentKeys) newList
      -- deletes: keys present in current but not in new
      deletes = filter (\e -> Set.notMember (entryKey e) newKeys) currentList
      -- updates: keys present in both whose element changed. Freshly
      -- inserted keys are excluded: they already carry the new element, so
      -- an UPDATE would only rewrite the same value.
      updates = filter (\e -> Set.member (entryKey e) currentKeys
                           && Set.notMember e currentElems) newList

      -- Wrap a list of entries into an archive at the new version.
      asArchive entries = Archive { _a_version = newArchive ^. a_version
                                  , _a_history = []
                                  , _a_state   = archiveStateFromList entries }

  -- 2. Perform inserts/deletes/updates
  printDebug "[updateNodeStory] applying insert" ()
  insertArchiveList c nodeId $ asArchive inserts
  printDebug "[updateNodeStory] insert applied" ()
  --TODO Use currentArchive ^. a_version in delete and report error
  -- if entries with (node_id, ngrams_type_id, ngrams_id) but
  -- different version are found.
  deleteArchiveList c nodeId $ asArchive deletes
  printDebug "[updateNodeStory] delete applied" ()
  updateArchiveList c nodeId $ asArchive updates
  printDebug "[updateNodeStory] update applied" ()

  pure ()
526 -- where
527 -- update = Update { uTable = nodeStoryTable
528 -- , uUpdateWith = updateEasy (\(NodeStoryDB { node_id }) ->
529 -- NodeStoryDB { archive = sqlValueJSONB $ Archive { _a_history = emptyHistory
530 -- , ..}
531 -- , .. })
532 -- , uWhere = (\row -> node_id row .== sqlInt4 nId)
533 -- , uReturning = rCount }
534
535 -- nodeStoryRemove :: Pool PGS.Connection -> NodeId -> IO Int64
536 -- nodeStoryRemove pool (NodeId nId) = withResource pool $ \c -> runDelete c delete
537 -- where
538 -- delete = Delete { dTable = nodeStoryTable
539 -- , dWhere = (\row -> node_id row .== sqlInt4 nId)
540 -- , dReturning = rCount }
541
-- | Persist one archive: inside a transaction, take a transaction-level
-- advisory lock keyed by the node id, append the archive's history, then
-- insert or update the stored state.
--
-- NOTE(review): 'getNodeStory' always returns an entry for the requested
-- node, so the insert branch looks unreachable — confirm.
upsertNodeStories :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
upsertNodeStories c nodeId@(NodeId nId) newArchive = do
  printDebug "[upsertNodeStories] START nId" nId
  PGS.withTransaction c $ do
    printDebug "[upsertNodeStories] locking nId" nId
    runPGSAdvisoryXactLock c nId

    -- Whether it's an insert or an update, the history can be stored already.
    -- NOTE: It is assumed that the most recent change is the first in the
    -- list, so these are saved in reverse order.
    let oldestFirst = reverse (newArchive ^. a_history)
    insertNodeArchiveHistory c nodeId (newArchive ^. a_version) oldestFirst

    NodeStory stored <- getNodeStory c nodeId
    case Map.lookup nodeId stored of
      Nothing             -> insertNodeStory c nodeId newArchive
      Just currentArchive -> updateNodeStory c nodeId currentArchive newArchive

  printDebug "[upsertNodeStories] STOP nId" nId
564
-- | Persist every archive of the node list story, one node after the other.
writeNodeStories :: PGS.Connection -> NodeListStory -> IO ()
writeNodeStories c (NodeStory nls) =
  mapM_ (\(nId, a) -> upsertNodeStories c nId a) (Map.toList nls)
569
-- | Return a `NodeListStory` that contains the given `NodeId`. Without an
-- existing story, the node is loaded from the DB. With one, the node is
-- loaded only if it is missing; entries already present are kept as is.
nodeStoryInc :: PGS.Connection -> Maybe NodeListStory -> NodeId -> IO NodeListStory
nodeStoryInc c Nothing nId = getNodeStory c nId
nodeStoryInc c (Just ns@(NodeStory nls)) nId
  | Map.member nId nls = pure ns
  | otherwise = do
      NodeStory loaded <- getNodeStory c nId
      pure $ NodeStory $ Map.union nls loaded
579
-- | Like 'nodeStoryInc' for a list of node ids. With no story and no ids,
-- the result is an empty story.
nodeStoryIncs :: PGS.Connection -> Maybe NodeListStory -> [NodeId] -> IO NodeListStory
nodeStoryIncs c mStory nIds =
  case (mStory, nIds) of
    (Just story, _)        -> foldM (\acc nId -> nodeStoryInc c (Just acc) nId) story nIds
    (Nothing, [])          -> pure $ NodeStory $ Map.empty
    (Nothing, nId : rest)  -> do
      firstStory <- getNodeStory c nId
      nodeStoryIncs c (Just firstStory) rest
586
587 -- nodeStoryDec :: Pool PGS.Connection -> NodeListStory -> NodeId -> IO NodeListStory
588 -- nodeStoryDec pool ns@(NodeStory nls) ni = do
589 -- case Map.lookup ni nls of
590 -- Nothing -> do
591 -- _ <- nodeStoryRemove pool ni
592 -- pure ns
593 -- Just _ -> do
594 -- let ns' = Map.filterWithKey (\k _v -> k /= ni) nls
595 -- _ <- nodeStoryRemove pool ni
596 -- pure $ NodeStory ns'
597 ------------------------------------
598
-- | Build the node story environment: an initially empty shared story, a
-- debounced saver, an immediate saver and a getter that loads nodes on
-- demand into the shared story.
readNodeStoryEnv :: Pool PGS.Connection -> IO NodeStoryEnv
readNodeStoryEnv pool = do
  mvar  <- nodeStoryVar pool Nothing []
  saver <- mkNodeStorySaver pool mvar
  pure NodeStoryEnv
    { _nse_var             = mvar
    , _nse_saver           = saver
    , _nse_saver_immediate = saveNow mvar
    , _nse_getter          = nodeStoryVar pool (Just mvar)
    }
  where
    -- Write everything to the DB while holding the MVar, then drop the
    -- in-memory history that has just been stored.
    saveNow mvar = modifyMVar_ mvar $ \ns -> do
      withResource pool (\c -> writeNodeStories c ns)
      pure (clearHistory ns)
618
-- | Obtain the shared node story variable with the given nodes loaded.
--
-- Without an existing variable, a new one is created from the DB state of
-- the given nodes. With an existing one, missing nodes are loaded into it
-- and the same variable is returned.
nodeStoryVar :: Pool PGS.Connection -> Maybe (MVar NodeListStory) -> [NodeId] -> IO (MVar NodeListStory)
nodeStoryVar pool Nothing nIds = do
  state <- withResource pool $ \c -> nodeStoryIncs c Nothing nIds
  newMVar state
nodeStoryVar pool (Just mv) nIds = do
  -- NOTE: Lock the MVar first, then take a pool connection — the same order
  -- as the savers. Taking the connection first could leave every pool
  -- connection waiting for the MVar while a saver holding the MVar waits
  -- for a connection.
  modifyMVar_ mv $ \nsl ->
    withResource pool $ \c -> nodeStoryIncs c (Just nsl) nIds
  pure mv
626
-- | Build the debounced save action.
--
-- Debounce is useful since it could delay the saving to some later time,
-- asynchronously, while we keep operating on memory only.
mkNodeStorySaver :: Pool PGS.Connection -> MVar NodeListStory -> IO (IO ())
mkNodeStorySaver pool mvns =
  mkDebounce defaultDebounceSettings
    { debounceAction = saveAndClear
    , debounceFreq   = oneMinute
    }
  where
    -- NOTE: Lock MVar first, then use resource pool. Otherwise we could
    -- wait for MVar, while blocking the pool connection.
    saveAndClear = modifyMVar_ mvns $ \ns -> do
      withResource pool (\c -> writeNodeStories c ns)
      pure (clearHistory ns)

    -- debounceFreq is expressed in microseconds
    oneMinute = 1 * 60 * oneSecond
    oneSecond = 10 ^ (6 :: Int)
647
-- | Empty the in-memory history of every archive in the story.
clearHistory :: NodeListStory -> NodeListStory
clearHistory (NodeStory ns) = NodeStory (dropHistory <$> ns)
  where
    dropHistory arch = arch & a_history .~ ([] :: [NgramsStatePatch'])
652
-- | Version of the given list, read from the DB through 'getNodeStory'
-- (the in-memory story is not consulted).
currentVersion :: (HasNodeStory env err m) => ListId -> m Version
currentVersion listId = do
  pool  <- view connPool
  story <- withResource pool $ \c -> liftBase $ getNodeStory c listId
  let versionOfList = unNodeStory . at listId . _Just . a_version
  pure $ story ^. versionOfList
658
659
660 -- mkNodeStorySaver :: MVar NodeListStory -> Cmd err (Cmd err ())
661 -- mkNodeStorySaver mvns = mkDebounce settings
662 -- where
663 -- settings = defaultDebounceSettings
664 -- { debounceAction = withMVar mvns (\ns -> writeNodeStories ns)
665 -- , debounceFreq = 1 * minute
666 -- -- , debounceEdge = trailingEdge -- Trigger on the trailing edge
667 -- }
668 -- minute = 60 * second
669 -- second = 10^(6 :: Int)
670
671
672 -----------------------------------------