]> Git — Sourcephile - gargantext.git/blob - src/Gargantext/Core/NodeStory.hs
[nodeStory] add immediate saver
[gargantext.git] / src / Gargantext / Core / NodeStory.hs
1 {-|
2 Module : Gargantext.Core.NodeStory
3 Description : Node API generation
4 Copyright : (c) CNRS, 2017-Present
5 License : AGPL + CECILL v3
6 Maintainer : team@gargantext.org
7 Stability : experimental
8 Portability : POSIX
9
10 A Node Story is a Map between NodeId and an Archive (with state,
11 version and history) for that node.
12
13 A few words on how this is implemented.
14
15 First version used files which stored Archive for each NodeId in a
16 separate .cbor file.
17
18 For performance reasons, it was rewritten to use the DB.
19
20 The table `node_stories` contains two columns: `node_id` and
21 `archive`.
22
23 Next, it was observed that `a_history` in `Archive` takes much
24 space. So a new table was created, `node_story_archive_history` with
25 columns: `node_id`, `ngrams_type_id`, `patch`. This is because each
26 history item is in fact a map from `NgramsType` to `NgramsTablePatch`
27 (see the `NgramsStatePatch'` type).
28
29 Moreover, since in ~G.A.Ngrams.commitStatePatch~ we use current state
30 only, with only recent history items, I concluded that it is not
31 necessary to load whole history into memory. Instead, it is kept in DB
32 (history is immutable) and only recent changes are added to
33 `a_history`. Then that record is cleared whenever `Archive` is saved.
34
35 Please note that
36
37 TODO:
38 - remove
39 - filter
40 - load the lists
41 -}
42
43 {-# OPTIONS_GHC -fno-warn-orphans #-}
44 {-# LANGUAGE Arrows #-}
45 {-# LANGUAGE ConstraintKinds #-}
46 {-# LANGUAGE QuasiQuotes #-}
47 {-# LANGUAGE TemplateHaskell #-}
48
49 module Gargantext.Core.NodeStory
50 ( HasNodeStory
51 , HasNodeStoryEnv
52 , hasNodeStory
53 , HasNodeStoryVar
54 , hasNodeStoryVar
55 , HasNodeStorySaver
56 , hasNodeStorySaver
57 , HasNodeStoryImmediateSaver
58 , hasNodeStoryImmediateSaver
59 , NodeStory(..)
60 , NgramsStatePatch'
61 , NodeListStory
62 , initNodeListStoryMock
63 , NodeStoryEnv(..)
64 , initNodeStory
65 , nse_getter
66 , nse_saver
67 , nse_saver_immediate
68 , nse_var
69 , unNodeStory
70 , getNodeArchiveHistory
71 , Archive(..)
72 , initArchive
73 , insertArchiveList
74 , deleteArchiveList
75 , updateArchiveList
76 , a_history
77 , a_state
78 , a_version
79 , nodeExists
80 , runPGSQuery
81 , runPGSAdvisoryLock
82 , runPGSAdvisoryUnlock
83 , runPGSAdvisoryXactLock
84 , getNodesIdWithType
85 , readNodeStoryEnv
86 , upsertNodeStories
87 , getNodeStory
88 , nodeStoriesQuery
89 , currentVersion )
90 where
91
92 -- import Debug.Trace (traceShow)
93 import Control.Debounce (mkDebounce, defaultDebounceSettings, debounceFreq, debounceAction)
94 import Codec.Serialise.Class
95 import Control.Concurrent (MVar(), newMVar, modifyMVar_)
96 import Control.Exception (catch, throw, SomeException(..))
97 import Control.Lens (makeLenses, Getter, (^.), (.~), (%~), _Just, at, traverse, view)
98 import Control.Monad.Except
99 import Control.Monad.Reader
100 import Data.Aeson hiding ((.=), decode)
101 import Data.ByteString.Char8 (hPutStrLn)
102 import Data.Map.Strict (Map)
103 import Data.Maybe (catMaybes)
104 import Data.Monoid
105 import Data.Pool (Pool, withResource)
106 import Data.Semigroup
107 import Database.PostgreSQL.Simple.SqlQQ (sql)
108 import Database.PostgreSQL.Simple.FromField (FromField(fromField), fromJSONField)
109 import Data.Profunctor.Product.TH (makeAdaptorAndInstance)
110 import GHC.Generics (Generic)
111 import Gargantext.API.Ngrams.Types
112 import Gargantext.Core.Types (ListId, NodeId(..), NodeType)
113 import Gargantext.Core.Utils.Prefix (unPrefix)
114 import Gargantext.Database.Prelude (CmdM', HasConnectionPool(..), HasConfig)
115 import Gargantext.Database.Query.Table.Node.Error (HasNodeError())
116 import Gargantext.Prelude
117 import Opaleye (DefaultFromField(..), SqlJsonb, fromPGSFromField)
118 import System.IO (stderr)
119 import qualified Data.Map.Strict as Map
120 import qualified Data.Map.Strict.Patch as PM
121 import qualified Data.Set as Set
122 import qualified Database.PostgreSQL.Simple as PGS
123 import qualified Gargantext.Database.Query.Table.Ngrams as TableNgrams
124
125 ------------------------------------------------------------------------
-- | Runtime environment for node stories: the shared in-memory state
-- together with the actions that load it from, and persist it to, the
-- database.
data NodeStoryEnv = NodeStoryEnv
  { _nse_var :: !(MVar NodeListStory)
    -- ^ Shared in-memory node stories.

  , _nse_saver :: !(IO ())
    -- ^ Saving action; 'readNodeStoryEnv' fills it with the debounced
    -- saver built by 'mkNodeStorySaver'.

  , _nse_saver_immediate :: !(IO ())
    -- ^ Saving action that 'readNodeStoryEnv' builds without debouncing:
    -- it writes the stories and then clears their in-memory history.

  , _nse_getter :: [NodeId] -> IO (MVar NodeListStory)
    -- ^ Makes sure the stories of the given nodes are present in the
    -- shared variable and returns that variable.

  --, _nse_cleaner :: !(IO ()) -- every 12 hours: cleans the repos of unused NodeStories
  -- , _nse_lock :: !FileLock -- TODO (it depends on the option: if with database or file only)
  }
  deriving (Generic)
135
-- | Constraint bundle for monadic code that works with node stories:
-- the environment must provide a 'NodeStoryEnv', the configuration and
-- a connection pool, and the error type must be able to carry node
-- errors.
type HasNodeStory env err m =
  ( CmdM' env err m
  , MonadReader env m
  , MonadError err m
  , HasNodeStoryEnv env
  , HasConfig env
  , HasConnectionPool env
  , HasNodeError err
  )
144
-- | Environments from which a complete 'NodeStoryEnv' can be read.
class (HasNodeStoryVar env, HasNodeStorySaver env) => HasNodeStoryEnv env where
  hasNodeStory :: Getter env NodeStoryEnv

-- | Environments exposing a function that, given node ids, yields the
-- 'MVar' holding the node stories.
class HasNodeStoryVar env where
  hasNodeStoryVar :: Getter env ([NodeId] -> IO (MVar NodeListStory))

-- | Environments exposing a saving action.
class HasNodeStorySaver env where
  hasNodeStorySaver :: Getter env (IO ())

-- | Environments exposing an immediate saving action.
-- Note that this class is not a superclass of 'HasNodeStoryEnv'.
class HasNodeStoryImmediateSaver env where
  hasNodeStoryImmediateSaver :: Getter env (IO ())
157
158 ------------------------------------------------------------------------
159
-- | Node story for each node type: a map from 'NodeId' to that node's
-- 'Archive'.
--
-- TODO: generalize for any NodeType; only NodeList is implemented so far.
newtype NodeStory s p = NodeStory
  { _unNodeStory :: Map NodeId (Archive s p)
  }
  deriving (Generic, Show)

-- JSON and CBOR instances are derived generically.
instance (FromJSON s, FromJSON p) => FromJSON (NodeStory s p)

instance (ToJSON s, ToJSON p) => ToJSON (NodeStory s p)

instance (Serialise s, Serialise p) => Serialise (NodeStory s p)
170
-- | State of one node together with its version and the recent part of
-- its patch history.
data Archive s p = Archive
  { _a_version :: !Version
  , _a_state :: !s
  , _a_history :: ![p]
    -- ^ Recent patches, most recent first (`commitStatePatch` relies on
    -- this ordering because it uses `take`).
    --
    -- History is immutable: new patches are only ever put on top of the
    -- existing list.
    --
    -- The whole history is not kept in memory; this field holds only the
    -- recent part, i.e. the patches that still have to be inserted into
    -- the DB.
  }
  deriving (Generic, Show)

instance (Serialise s, Serialise p) => Serialise (Archive s p)
188
189
-- | Node stories specialised to ngrams lists.
type NodeListStory = NodeStory NgramsState' NgramsStatePatch'

-- | State of a list: one ngrams table per ngrams type.
type NgramsState' = Map TableNgrams.NgramsType NgramsTableMap

-- | Patch of a list state: one table patch per ngrams type.
type NgramsStatePatch' = PatchMap TableNgrams.NgramsType NgramsTablePatch

instance Serialise NgramsStatePatch'

-- An archive stored in a JSON(B) column is decoded through its
-- 'FromJSON' instance.
instance FromField (Archive NgramsState' NgramsStatePatch') where
  fromField = fromJSONField

instance DefaultFromField SqlJsonb (Archive NgramsState' NgramsStatePatch') where
  defaultFromField = fromPGSFromField
201
-- | Merge two `NgramsState'` values.
--
-- The state is a nested map (Map NgramsType (Map ...)). The default
-- `(<>)` on the outer map is a left-biased union
-- (https://hackage.haskell.org/package/containers-0.6.6/docs/Data-Map-Internal.html#v:union),
-- which would drop the whole inner map of the right operand whenever
-- both sides share an ngrams type. Here the inner maps of a shared key
-- are combined with `(<>)` instead.
combineState :: NgramsState' -> NgramsState' -> NgramsState'
combineState left right = Map.unionWith (<>) left right
208
-- | Version and state are taken from the right operand; only the
-- histories are concatenated, with the right operand's patches placed
-- in front of the left operand's.
instance (Semigroup s, Semigroup p) => Semigroup (Archive s p) where
  Archive _ _ leftHistory <> Archive rightVersion rightState rightHistory =
    Archive rightVersion rightState (rightHistory <> leftHistory)
-- | The empty archive: version 0, empty state, no history.
instance (Monoid s, Semigroup p) => Monoid (Archive s p) where
  mempty = Archive 0 mempty []
-- JSON (de)serialisation is generic, with the @_a_@ field prefix handled
-- by 'unPrefix'.
instance (FromJSON s, FromJSON p) => FromJSON (Archive s p) where
  parseJSON = genericParseJSON (unPrefix "_a_")

instance (ToJSON s, ToJSON p) => ToJSON (Archive s p) where
  toJSON = genericToJSON (unPrefix "_a_")
  toEncoding = genericToEncoding (unPrefix "_a_")
225
226 ------------------------------------------------------------------------
-- | A node story containing a single, empty archive for the given node.
initNodeStory :: (Monoid s, Semigroup p) => NodeId -> NodeStory s p
initNodeStory nodeId = NodeStory (Map.singleton nodeId initArchive)
229
-- | The empty archive (the 'Monoid' identity: version 0, empty state,
-- no history).
initArchive :: (Monoid s, Semigroup p) => Archive s p
initArchive = mempty
232
-- | Mock node story: a single archive, stored under node id 0, whose
-- state holds the entries of 'mockTable' under the 'NgramsTerms' type.
initNodeListStoryMock :: NodeListStory
initNodeListStoryMock = NodeStory (Map.singleton 0 mockArchive)
  where
    mockArchive =
      Archive
        { _a_version = 0
        , _a_state = Map.singleton TableNgrams.NgramsTerms mockTerms
        , _a_history = []
        }

    -- Every element of the mock table, keyed by its ngrams term.
    mockTerms =
      Map.fromList
        [ (element ^. ne_ngrams, ngramsElementToRepo element)
        | element <- mockTable ^. _NgramsTable
        ]
245
246 ------------------------------------------------------------------------
247
248
249 ------------------------------------------------------------------------
-- Lenses are generated here, after the type declarations: a Template
-- Haskell splice ends the current declaration group, so code above a
-- splice cannot refer to anything declared or generated below it.
$(makeLenses ''NodeStoryEnv)
$(makeLenses ''NodeStory)
$(makeLenses ''Archive)
254
255 -----------------------------------------
256
257
-- | Polymorphic row shape with the columns @node_id@, @version@,
-- @ngrams_type_id@, @ngrams_id@ and @ngrams_repo_element@.
data NodeStoryPoly nid v ngtid ngid nre = NodeStoryDB
  { node_id :: nid
  , version :: v
  , ngrams_type_id :: ngtid
  , ngrams_id :: ngid
  , ngrams_repo_element :: nre
  }
  deriving (Eq)

-- | Polymorphic row shape pairing a node id with an archive.
data NodeStoryArchivePoly nid a = NodeStoryArchiveDB
  { a_node_id :: nid
  , archive :: a
  }
  deriving (Eq)

-- Product-profunctor adaptors and instances for both row types.
$(makeAdaptorAndInstance "pNodeStory" ''NodeStoryPoly)
$(makeAdaptorAndInstance "pNodeArchiveStory" ''NodeStoryArchivePoly)
273
274 -- type NodeStoryWrite = NodeStoryPoly (Column SqlInt4) (Column SqlInt4) (Column SqlInt4) (Column SqlInt4) (Column SqlJsonb)
275 -- type NodeStoryRead = NodeStoryPoly (Column SqlInt4) (Column SqlInt4) (Column SqlInt4) (Column SqlInt4) (Column SqlJsonb)
276
277 -- type NodeStoryArchiveWrite = NodeStoryArchivePoly (Column SqlInt4) (Column SqlJsonb)
278 -- type NodeStoryArchiveRead = NodeStoryArchivePoly (Column SqlInt4) (Column SqlJsonb)
279
-- | Archive of an ngrams list: ngrams state plus ngrams state patches.
type ArchiveList = Archive NgramsState' NgramsStatePatch'
281
282 -- DB stuff
283
-- | Run one parameterised statement through 'PGS.execute'.
--
-- Any exception is caught and rethrown unchanged; the handler only
-- exists as a hook where the formatted query can be logged while
-- debugging.
runPGSExecute :: (PGS.ToRow q) => PGS.Connection -> PGS.Query -> q -> IO Int64
runPGSExecute conn stmt params = PGS.execute conn stmt params `catch` rethrow
  where
    rethrow (SomeException err) = do
      --q' <- PGS.formatQuery c qs a
      --hPutStrLn stderr q'
      throw (SomeException err)
291
-- | Run one parameterised statement for a list of rows through
-- 'PGS.executeMany'.
--
-- Any exception is caught and rethrown unchanged; the handler only
-- exists as a hook where the formatted query can be logged while
-- debugging.
runPGSExecuteMany :: (PGS.ToRow q) => PGS.Connection -> PGS.Query -> [q] -> IO Int64
runPGSExecuteMany conn stmt rows = PGS.executeMany conn stmt rows `catch` rethrow
  where
    rethrow (SomeException err) = do
      --q' <- PGS.formatQuery c qs a
      --hPutStrLn stderr q'
      throw (SomeException err)
299
-- | Run a parameterised query through 'PGS.query'.
--
-- When the query throws, the formatted query is printed to stderr and
-- the exception is rethrown.
runPGSQuery :: (PGS.FromRow r, PGS.ToRow q) => PGS.Connection -> PGS.Query -> q -> IO [r]
runPGSQuery conn stmt params = PGS.query conn stmt params `catch` logAndRethrow
  where
    logAndRethrow (SomeException err) = do
      rendered <- PGS.formatQuery conn stmt params
      hPutStrLn stderr rendered
      throw (SomeException err)
307
-- | Call @pg_advisory_lock@ with the given key.
runPGSAdvisoryLock :: PGS.Connection -> Int -> IO ()
runPGSAdvisoryLock conn lockKey = do
  _rows <- acquire
  pure ()
  where
    acquire :: IO [PGS.Only ()]
    acquire = runPGSQuery conn [sql| SELECT pg_advisory_lock(?) |] (PGS.Only lockKey)
312
-- | Call @pg_advisory_unlock@ with the given key. The boolean that
-- PostgreSQL returns is discarded.
runPGSAdvisoryUnlock :: PGS.Connection -> Int -> IO ()
runPGSAdvisoryUnlock conn lockKey = do
  _rows <- release
  pure ()
  where
    release :: IO [PGS.Only Bool]
    release = runPGSQuery conn [sql| SELECT pg_advisory_unlock(?) |] (PGS.Only lockKey)
317
-- | Call @pg_advisory_xact_lock@ with the given key.
runPGSAdvisoryXactLock :: PGS.Connection -> Int -> IO ()
runPGSAdvisoryXactLock conn lockKey = do
  _rows <- acquire
  pure ()
  where
    acquire :: IO [PGS.Only ()]
    acquire = runPGSQuery conn [sql| SELECT pg_advisory_xact_lock(?) |] (PGS.Only lockKey)
322
-- | Whether the @nodes@ table has a row with the given id.
nodeExists :: PGS.Connection -> NodeId -> IO Bool
nodeExists conn nodeId = do
  rows <- runPGSQuery conn [sql| SELECT true FROM nodes WHERE id = ? LIMIT 1 |] (PGS.Only nodeId)
  pure (rows == [PGS.Only True])
326
-- | Ids of all rows of the @nodes@ table whose @typename@ matches the
-- given node type.
getNodesIdWithType :: PGS.Connection -> NodeType -> IO [NodeId]
getNodesIdWithType conn nodeType = do
  rows <- runPGSQuery conn byTypeQuery (PGS.Only nodeType)
  pure [ NodeId rawId | PGS.Only rawId <- rows ]
  where
    byTypeQuery :: PGS.Query
    byTypeQuery = [sql| SELECT id FROM nodes WHERE typename = ? |]
334
335
336
337 -- nodeStoryTable :: Table NodeStoryRead NodeStoryWrite
338 -- nodeStoryTable =
339 -- Table "node_stories"
340 -- ( pNodeStory NodeStoryDB { node_id = tableField "node_id"
341 -- , version = tableField "version"
342 -- , ngrams_type_id = tableField "ngrams_type_id"
343 -- , ngrams_id = tableField "ngrams_id"
344 -- , ngrams_repo_element = tableField "ngrams_repo_element"
345 -- } )
346
347 -- nodeStoryArchiveTable :: Table NodeStoryArchiveRead NodeStoryArchiveWrite
348 -- nodeStoryArchiveTable =
349 -- Table "node_story_archive_history"
350 -- ( pNodeArchiveStory NodeStoryArchiveDB { a_node_id = tableField "node_id"
351 -- , archive = tableField "archive" } )
352
353 -- nodeStorySelect :: Select NodeStoryRead
354 -- nodeStorySelect = selectTable nodeStoryTable
355
-- | Read the stored history of a node from @node_story_archive_history@.
--
-- Each row becomes one single-entry state patch. Rows are ordered by
-- descending version, in line with the rule that the first patch in the
-- `_a_history` list is the most recent.
getNodeArchiveHistory :: PGS.Connection -> NodeId -> IO [NgramsStatePatch']
getNodeArchiveHistory conn nodeId = do
  rows <- runPGSQuery conn historyQuery (PGS.Only nodeId)
  pure (map toStatePatch rows)
  where
    -- Wrap one (type, term, patch) row into a patch of the whole state.
    toStatePatch :: (TableNgrams.NgramsType, NgramsTerm, NgramsPatch) -> NgramsStatePatch'
    toStatePatch (ngramsType, term, patch) =
      fst $ PM.singleton ngramsType tablePatch
      where
        tablePatch = NgramsTablePatch $ fst $ PM.singleton term patch

    historyQuery :: PGS.Query
    historyQuery = [sql| SELECT ngrams_type_id, terms, patch
                         FROM node_story_archive_history
                         JOIN ngrams ON ngrams.id = ngrams_id
                         WHERE node_id = ?
                         ORDER BY version DESC |]
368
-- | Look up the id of an ngram by its terms (one parameter: the terms).
ngramsIdQuery :: PGS.Query
ngramsIdQuery = [sql| SELECT id FROM ngrams WHERE terms = ? |]
371
372
-- | Append history patches to @node_story_archive_history@, one patch
-- after the other in list order, all tagged with the given version.
--
-- Every (ngrams type, term, patch) entry of a patch becomes one row.
-- The term is first resolved to its ngrams id; entries whose term is not
-- found in @ngrams@ are skipped.
insertNodeArchiveHistory :: PGS.Connection -> NodeId -> Version -> [NgramsStatePatch'] -> IO ()
insertNodeArchiveHistory _ _ _ [] = pure ()
insertNodeArchiveHistory conn nodeId ver (statePatch : olderPatches) = do
  resolved <- mapM resolveTerm entries
  _ <- runPGSExecuteMany conn insertQuery
         [ (nodeId, nType, termId, patch, ver)
         | Just (nType, termId, patch) <- resolved
         ]
  insertNodeArchiveHistory conn nodeId ver olderPatches
  where
    -- The patch flattened into one entry per (type, term).
    entries :: [(TableNgrams.NgramsType, NgramsTerm, NgramsPatch)]
    entries =
      [ (nType, term, patch)
      | (nType, NgramsTablePatch tablePatch) <- PM.toList statePatch
      , (term, patch) <- PM.toList tablePatch
      ]

    -- Replace the term by its ngrams id, if the term is known.
    resolveTerm :: (TableNgrams.NgramsType, NgramsTerm, NgramsPatch)
                -> IO (Maybe (TableNgrams.NgramsType, Int, NgramsPatch))
    resolveTerm (nType, term, patch) = do
      ids <- runPGSQuery conn ngramsIdQuery (PGS.Only term)
      pure $ (\(PGS.Only termId) -> (nType, termId, patch)) <$> headMay ids

    insertQuery :: PGS.Query
    insertQuery = [sql| INSERT INTO node_story_archive_history(node_id, ngrams_type_id, ngrams_id, patch, version) VALUES (?, ?, ?, ?, ?) |]
390
-- | Load the story of one node from @node_stories@.
--
-- The table holds several rows per node, one per (ngrams type, ngrams).
-- Each row is turned into a one-entry archive and all of them are folded
-- into a single archive, so the result always has exactly one key: the
-- requested node. With no rows at all, that key maps to the empty
-- archive.
getNodeStory :: PGS.Connection -> NodeId -> IO NodeListStory
getNodeStory conn nId@(NodeId rawId) = do
  rows <- runPGSQuery conn nodeStoriesQuery (PGS.Only rawId)
  pure $ NodeStory $ Map.singleton nId $ foldl combine mempty (map rowToArchive rows)
  where
    rowToArchive :: (Version, TableNgrams.NgramsType, NgramsTerm, NgramsRepoElement) -> ArchiveList
    rowToArchive (rowVersion, ngramsType, term, repoElement) =
      Archive
        { _a_version = rowVersion
        , _a_history = []
        , _a_state = Map.singleton ngramsType (Map.singleton term repoElement)
        }

    -- NOTE (<>) for Archive doesn't concatenate states, so the states
    -- are merged with `combineState`. The version is taken from the row
    -- being added, not from the accumulator (which starts as the empty
    -- archive).
    -- NOTE nothing checks that all rows carry the same version.
    combine :: ArchiveList -> ArchiveList -> ArchiveList
    combine acc row =
      (a_version .~ (row ^. a_version))
        ((a_state %~ combineState (row ^. a_state)) acc)
408
-- | All state rows of one node (one parameter: the node id), with the
-- ngrams id resolved to its terms.
nodeStoriesQuery :: PGS.Query
nodeStoriesQuery =
  [sql| SELECT version, ngrams_type_id, terms, ngrams_repo_element
        FROM node_stories
        JOIN ngrams ON ngrams.id = ngrams_id
        WHERE node_id = ? |]
414
-- | Flat form of an archive state: one entry per (ngrams type, term).
type ArchiveStateList = [(TableNgrams.NgramsType, NgramsTerm, NgramsRepoElement)]

-- | Flatten an archive state (a Map NgramsType (Map NgramsTerm
-- NgramsRepoElement)) into a list, in ascending key order.
archiveStateAsList :: NgramsState' -> ArchiveStateList
archiveStateAsList st =
  [ (ngramsType, term, repoElement)
  | (ngramsType, terms) <- Map.toList st
  , (term, repoElement) <- Map.toList terms
  ]
421
-- | Rebuild an archive state from its flat form. Entries that share an
-- ngrams type are gathered into one inner map.
archiveStateFromList :: ArchiveStateList -> NgramsState'
archiveStateFromList entries =
  Map.fromListWith (<>)
    [ (ngramsType, Map.singleton term repoElement)
    | (ngramsType, term, repoElement) <- entries
    ]
424
-- | Insert a whole new node story (every entry of the archive state) for
-- the given node id.
--
-- Each term is first resolved to its ngrams id; entries whose term is
-- not found in @ngrams@ are skipped. Every inserted row is tagged with
-- the archive's version.
--
-- The statement names the @version@ column explicitly so that the five
-- supplied parameters match five placeholders (postgresql-simple rejects
-- a statement whose placeholder count differs from the parameter count).
insertNodeStory :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
insertNodeStory c (NodeId nId) a =
  mapM_ insertEntry (archiveStateAsList (a ^. a_state))
  where
    insertEntry (ngramsType, ngrams, ngramsRepoElement) = do
      termIds <- runPGSQuery c ngramsIdQuery (PGS.Only ngrams) :: IO [PGS.Only Int64]
      case headMay termIds of
        -- Unknown term: nothing to insert for this entry.
        Nothing -> pure 0
        Just (PGS.Only termId) ->
          runPGSExecute c query (nId, a ^. a_version, ngramsType, termId, ngramsRepoElement)

    query :: PGS.Query
    query = [sql| INSERT INTO node_stories(node_id, version, ngrams_type_id, ngrams_id, ngrams_repo_element) VALUES (?, ?, ?, ?, ?) |]
439 -- insert ngramsType ngrams ngramsRepoElement =
440 -- Insert { iTable = nodeStoryTable
441 -- , iRows = [NodeStoryDB { node_id = sqlInt4 nId
442 -- , version = sqlInt4 _a_version
443 -- , ngrams_type_id = sqlInt4 $ TableNgrams.ngramsTypeId ngramsType
444 -- , ngrams_id = ...
445 -- , ngrams_repo_element = sqlValueJSONB ngramsRepoElement
446 -- }]
447 -- , iReturning = rCount
448 -- , iOnConflict = Nothing }
449
-- | Insert every entry of the archive state into @node_stories@, one
-- statement per entry, tagged with the archive's version.
--
-- The ngrams id is looked up inside the statement itself (by terms), so
-- an entry whose term is not in @ngrams@ inserts no row.
insertArchiveList :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
insertArchiveList conn nodeId a =
  mapM_ insertEntry (archiveStateAsList (a ^. a_state))
  where
    insertEntry (ngramsType, term, repoElement) =
      runPGSExecute conn insertQuery (nodeId, a ^. a_version, ngramsType, repoElement, term)

    insertQuery :: PGS.Query
    insertQuery = [sql| INSERT INTO node_stories(node_id, version, ngrams_type_id, ngrams_id, ngrams_repo_element)
                        SELECT ?, ?, ?, ngrams.id, ? FROM ngrams WHERE terms = ? |]
459
-- | Delete from @node_stories@ every (ngrams type, term) entry of the
-- given archive state, for the given node. One statement per entry; the
-- repo elements and the version of the archive are not used.
--
-- The statement takes exactly the three supplied parameters (node id,
-- ngrams type, terms). The earlier form started with a malformed,
-- unused @WITH (...) AS ngrams@ clause that added a fourth placeholder.
deleteArchiveList :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
deleteArchiveList c nodeId a =
  mapM_ deleteEntry (archiveStateAsList (a ^. a_state))
  where
    deleteEntry (nt, n, _) = runPGSExecute c query (nodeId, nt, n)

    query :: PGS.Query
    query = [sql| DELETE FROM node_stories
                  WHERE node_id = ? AND ngrams_type_id = ? AND ngrams_id IN (SELECT id FROM ngrams WHERE terms = ?) |]
470
-- | Overwrite @ngrams_repo_element@ in @node_stories@ for every entry of
-- the archive state, one statement per entry. Only that column is set;
-- the stored version is left untouched.
updateArchiveList :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
updateArchiveList conn nodeId a =
  mapM_ updateEntry (archiveStateAsList (a ^. a_state))
  where
    --q <- PGS.format c query params
    --printDebug "[updateArchiveList] query" q
    updateEntry (ngramsType, term, repoElement) =
      runPGSExecute conn updateQuery (repoElement, nodeId, ngramsType, term)

    updateQuery :: PGS.Query
    updateQuery = [sql| UPDATE node_stories
                        SET ngrams_repo_element = ?
                        WHERE node_id = ? AND ngrams_type_id = ? AND ngrams_id IN (SELECT id FROM ngrams WHERE terms = ?) |]
483
-- | Bring the stored state of a node from @currentArchive@ to
-- @newArchive@.
--
-- It is assumed that the caller already holds an advisory lock.
--
-- The two states are flattened and compared:
--
-- * inserts: entries whose (type, term) key exists only in the new state;
-- * deletes: entries whose (type, term) key exists only in the current state;
-- * updates: entries of the new state that do not occur, as whole
--   (type, term, element) triples, in the current state. This set also
--   contains the entries listed under inserts.
--
-- They are applied in that order: insert, delete, update.
updateNodeStory :: PGS.Connection -> NodeId -> ArchiveList -> ArchiveList -> IO ()
updateNodeStory c nodeId@(NodeId _nId) currentArchive newArchive = do
  printDebug "[updateNodeStory] applying insert" ()
  insertArchiveList c nodeId (archiveOf inserts)
  printDebug "[updateNodeStory] insert applied" ()
  --TODO Use currentArchive ^. a_version in delete and report error
  -- if entries with (node_id, ngrams_type_id, ngrams_id) but
  -- different version are found.
  deleteArchiveList c nodeId (archiveOf deletes)
  printDebug "[updateNodeStory] delete applied" ()
  updateArchiveList c nodeId (archiveOf updates)
  printDebug "[updateNodeStory] update applied" ()
  where
    currentList = archiveStateAsList (currentArchive ^. a_state)
    newList = archiveStateAsList (newArchive ^. a_state)

    keyOf (ngramsType, term, _) = (ngramsType, term)
    currentKeys = Set.fromList (map keyOf currentList)
    newKeys = Set.fromList (map keyOf newList)

    addedKeys = Set.difference newKeys currentKeys
    removedKeys = Set.difference currentKeys newKeys

    inserts = filter (\entry -> Set.member (keyOf entry) addedKeys) newList
    --printDebug "[updateNodeStory] inserts" inserts
    deletes = filter (\entry -> Set.member (keyOf entry) removedKeys) currentList
    --printDebug "[updateNodeStory] deletes" deletes

    -- updates are the things that are in new but not in current
    updates = Set.toList (Set.difference (Set.fromList newList) (Set.fromList currentList))
    --printDebug "[updateNodeStory] updates" $ Text.unlines $ (Text.pack . show) <$> updates

    -- An archive carrying the new version, no history and just the
    -- given entries as its state.
    archiveOf :: ArchiveStateList -> ArchiveList
    archiveOf entries =
      Archive
        { _a_version = newArchive ^. a_version
        , _a_history = []
        , _a_state = archiveStateFromList entries
        }
525 -- where
526 -- update = Update { uTable = nodeStoryTable
527 -- , uUpdateWith = updateEasy (\(NodeStoryDB { node_id }) ->
528 -- NodeStoryDB { archive = sqlValueJSONB $ Archive { _a_history = emptyHistory
529 -- , ..}
530 -- , .. })
531 -- , uWhere = (\row -> node_id row .== sqlInt4 nId)
532 -- , uReturning = rCount }
533
534 -- nodeStoryRemove :: Pool PGS.Connection -> NodeId -> IO Int64
535 -- nodeStoryRemove pool (NodeId nId) = withResource pool $ \c -> runDelete c delete
536 -- where
537 -- delete = Delete { dTable = nodeStoryTable
538 -- , dWhere = (\row -> node_id row .== sqlInt4 nId)
539 -- , dReturning = rCount }
540
-- | Persist the archive of one node inside a single transaction.
--
-- Within the transaction, after taking a transaction-level advisory
-- lock keyed by the node id:
--
-- 1. the in-memory history is appended to the history table;
-- 2. the stored state is loaded and brought up to date with the new
--    archive (or inserted as a whole when the lookup finds nothing).
upsertNodeStories :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
upsertNodeStories conn nodeId@(NodeId rawId) newArchive = do
  printDebug "[upsertNodeStories] START nId" rawId
  PGS.withTransaction conn inTransaction
  printDebug "[upsertNodeStories] STOP nId" rawId
  where
    inTransaction :: IO ()
    inTransaction = do
      printDebug "[upsertNodeStories] locking nId" rawId
      runPGSAdvisoryXactLock conn rawId

      -- whether it's insert or update, we can insert node archive history already
      -- NOTE: It is assumed that the most recent change is the first in the
      -- list, so we save these in reverse order
      insertNodeArchiveHistory conn nodeId (newArchive ^. a_version) (reverse (newArchive ^. a_history))

      NodeStory stored <- getNodeStory conn nodeId
      case Map.lookup nodeId stored of
        Nothing -> insertNodeStory conn nodeId newArchive
        Just currentArchive -> updateNodeStory conn nodeId currentArchive newArchive
563
-- | Persist every archive of the node story, one node after the other
-- in ascending node-id order.
writeNodeStories :: PGS.Connection -> NodeListStory -> IO ()
writeNodeStories conn (NodeStory archives) =
  mapM_ (\(nodeId, nodeArchive) -> upsertNodeStories conn nodeId nodeArchive) (Map.toList archives)
568
-- | Make sure the story of the given node is part of the node story.
--
-- Without an existing story, the node is simply loaded from the DB. With
-- one, the DB is only consulted when the node is missing; entries that
-- are already present are never overwritten.
nodeStoryInc :: PGS.Connection -> Maybe NodeListStory -> NodeId -> IO NodeListStory
nodeStoryInc conn Nothing nId = getNodeStory conn nId
nodeStoryInc conn (Just known@(NodeStory cached)) nId =
  if Map.member nId cached
    then pure known
    else do
      NodeStory loaded <- getNodeStory conn nId
      pure (NodeStory (Map.union cached loaded))
578
-- | Make sure the stories of all the given nodes are part of the node
-- story, loading the missing ones from the DB in list order. Without an
-- existing story and without node ids, the result is the empty story.
nodeStoryIncs :: PGS.Connection -> Maybe NodeListStory -> [NodeId] -> IO NodeListStory
nodeStoryIncs conn mStory nIds =
  case (mStory, nIds) of
    (Just story, _) -> foldM addNode story nIds
    (Nothing, []) -> pure (NodeStory Map.empty)
    (Nothing, firstId : otherIds) -> do
      seed <- getNodeStory conn firstId
      foldM addNode seed otherIds
  where
    addNode acc nId = nodeStoryInc conn (Just acc) nId
585
586 -- nodeStoryDec :: Pool PGS.Connection -> NodeListStory -> NodeId -> IO NodeListStory
587 -- nodeStoryDec pool ns@(NodeStory nls) ni = do
588 -- case Map.lookup ni nls of
589 -- Nothing -> do
590 -- _ <- nodeStoryRemove pool ni
591 -- pure ns
592 -- Just _ -> do
593 -- let ns' = Map.filterWithKey (\k _v -> k /= ni) nls
594 -- _ <- nodeStoryRemove pool ni
595 -- pure $ NodeStory ns'
596 ------------------------------------
597
-- | Build the node-story environment on top of a connection pool.
--
-- The shared variable starts with an empty story (no node is loaded up
-- front). Two savers are provided: the debounced one from
-- 'mkNodeStorySaver' and an immediate one that writes the stories and
-- clears their history as soon as it is run.
readNodeStoryEnv :: Pool PGS.Connection -> IO NodeStoryEnv
readNodeStoryEnv pool = do
  storyVar <- nodeStoryVar pool Nothing []
  debouncedSaver <- mkNodeStorySaver pool storyVar
  pure NodeStoryEnv
    { _nse_var = storyVar
    , _nse_saver = debouncedSaver
    , _nse_saver_immediate = saveNow storyVar
    , _nse_getter = nodeStoryVar pool (Just storyVar)
    }
  where
    -- Lock the variable first, then borrow a connection.
    saveNow :: MVar NodeListStory -> IO ()
    saveNow var = modifyMVar_ var $ \stories -> do
      --printDebug "[mkNodeStorySaver] will call writeNodeStories, ns" ns
      withResource pool $ \conn -> writeNodeStories conn stories
      pure (clearHistory stories)
617
-- | Provide the shared variable holding the node stories, making sure
-- the stories of the given nodes are loaded.
--
-- * Without an existing variable, the stories are loaded from the DB and
--   a fresh variable is created.
-- * With an existing variable, the missing stories are added to it in
--   place and that same variable is returned.
--
-- In the second case the variable is locked first and the pool
-- connection is borrowed afterwards. This is the same order the savers
-- use, so a getter holding a connection can no longer wait on a saver
-- that holds the variable and is itself waiting for a connection.
nodeStoryVar :: Pool PGS.Connection -> Maybe (MVar NodeListStory) -> [NodeId] -> IO (MVar NodeListStory)
nodeStoryVar pool Nothing nIds = do
  state <- withResource pool $ \c -> nodeStoryIncs c Nothing nIds
  newMVar state
nodeStoryVar pool (Just mv) nIds = do
  modifyMVar_ mv $ \nsl ->
    withResource pool $ \c -> nodeStoryIncs c (Just nsl) nIds
  pure mv
625
-- | Build the debounced saver.
--
-- Debouncing is useful since it can delay the saving to some later time,
-- asynchronously, while we keep operating on memory only. The frequency
-- is set to one minute (expressed in microseconds).
--
-- When it runs, the saver writes all stories to the DB and then clears
-- their in-memory history.
mkNodeStorySaver :: Pool PGS.Connection -> MVar NodeListStory -> IO (IO ())
mkNodeStorySaver pool mvns =
  mkDebounce defaultDebounceSettings
    { debounceAction = flushStories
    , debounceFreq = oneMinute
    }
  where
    -- NOTE: Lock MVar first, then use resource pool.
    -- Otherwise we could wait for MVar, while
    -- blocking the pool connection.
    flushStories :: IO ()
    flushStories = modifyMVar_ mvns $ \stories -> do
      --printDebug "[mkNodeStorySaver] will call writeNodeStories, ns" ns
      withResource pool $ \conn -> writeNodeStories conn stories
      pure (clearHistory stories)
      --withMVar mvns (\ns -> printDebug "[mkNodeStorySaver] debounce nodestory" ns)

    oneMinute :: Int
    oneMinute = 1 * (60 * oneSecond)

    oneSecond :: Int
    oneSecond = 10 ^ (6 :: Int)
646
-- | Empty the in-memory history of every archive; versions and states
-- are kept as they are.
clearHistory :: NodeListStory -> NodeListStory
clearHistory (NodeStory archives) = NodeStory (dropHistory <$> archives)
  where
    dropHistory = a_history .~ noHistory
    noHistory = [] :: [NgramsStatePatch']
651
-- | Version of the given list as currently stored in the DB: the story
-- is read with 'getNodeStory' through a pooled connection and the
-- version of the list's archive is returned.
currentVersion :: (HasNodeStory env err m) => ListId -> m Version
currentVersion listId = do
  pool <- view connPool
  story <- withResource pool $ \conn -> liftBase (getNodeStory conn listId)
  pure (story ^. unNodeStory . at listId . _Just . a_version)
657
658
659 -- mkNodeStorySaver :: MVar NodeListStory -> Cmd err (Cmd err ())
660 -- mkNodeStorySaver mvns = mkDebounce settings
661 -- where
662 -- settings = defaultDebounceSettings
663 -- { debounceAction = withMVar mvns (\ns -> writeNodeStories ns)
664 -- , debounceFreq = 1 * minute
665 -- -- , debounceEdge = trailingEdge -- Trigger on the trailing edge
666 -- }
667 -- minute = 60 * second
668 -- second = 10^(6 :: Int)
669
670
671 -----------------------------------------