{-|
Module      : Gargantext.Core.NodeStory
Description : Node API generation
Copyright   : (c) CNRS, 2017-Present
License     : AGPL + CECILL v3
Maintainer  : team@gargantext.org
Stability   : experimental
Portability : POSIX

A Node Story is a Map between NodeId and an Archive (with state,
version and history) for that node.

A couple of words on how this is implemented.

The first version used files, storing the Archive for each NodeId in a
separate .cbor file.

For performance reasons, it was rewritten to use the DB.

The table `node_stories` contains two columns: `node_id` and
`archive`.

Next, it was observed that `a_history` in `Archive` takes up a lot of
space. So a new table was created, `node_story_archive_history`, with
columns: `node_id`, `ngrams_type_id`, `patch`. This is because each
history item is in fact a map from `NgramsType` to `NgramsTablePatch`
(see the `NgramsStatePatch'` type).

Moreover, since ~G.A.Ngrams.commitStatePatch~ uses only the current
state together with recent history items, it is not necessary to load
the whole history into memory. Instead, history is kept in the DB (it
is immutable) and only recent changes are added to `a_history`. That
field is then cleared whenever the `Archive` is saved.
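
To make the flow concrete, here is a rough sketch of the write cycle
(illustration only; `writeNodeStories` and `clearHistory` are defined
below in this module):

> -- assuming an open connection `c` and a node story `ns :: NodeListStory`
> writeNodeStories c ns        -- upserts the state and the recent history into the DB
> let ns' = clearHistory ns    -- the in-memory history can then be dropped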

TODO:
- remove
- filter
- load the lists
-}

{-# OPTIONS_GHC -fno-warn-orphans #-}
{-# LANGUAGE Arrows #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE TemplateHaskell #-}

module Gargantext.Core.NodeStory
  ( HasNodeStory
  , HasNodeStoryEnv
  , hasNodeStory
  , HasNodeStoryVar
  , hasNodeStoryVar
  , HasNodeStorySaver
  , hasNodeStorySaver
  , NodeStory(..)
  , NgramsStatePatch'
  , NodeListStory
  , initNodeListStoryMock
  , NodeStoryEnv(..)
  , initNodeStory
  , nse_getter
  , nse_saver
  , nse_var
  , unNodeStory
  , getNodeArchiveHistory
  , Archive(..)
  , initArchive
  , insertArchiveList
  , deleteArchiveList
  , updateArchiveList
  , a_history
  , a_state
  , a_version
  , nodeExists
  , runPGSQuery
  , runPGSAdvisoryLock
  , runPGSAdvisoryUnlock
  , runPGSAdvisoryXactLock
  , getNodesIdWithType
  , readNodeStoryEnv
  , upsertNodeStories
  , getNodeStory
  , nodeStoriesQuery
  , currentVersion )
where

-- import Debug.Trace (traceShow)
import Control.Debounce (mkDebounce, defaultDebounceSettings, debounceFreq, debounceAction)
import Codec.Serialise.Class
import Control.Concurrent (MVar(), newMVar, modifyMVar_)
import Control.Exception (catch, throw, SomeException(..))
import Control.Lens (makeLenses, Getter, (^.), (.~), (%~), _Just, at, traverse, view)
import Control.Monad.Except
import Control.Monad.Reader
import Data.Aeson hiding ((.=), decode)
import Data.ByteString.Char8 (hPutStrLn)
import Data.Map.Strict (Map)
import Data.Maybe (catMaybes)
import Data.Monoid
import Data.Pool (Pool, withResource)
import Data.Semigroup
import Database.PostgreSQL.Simple.SqlQQ (sql)
import Database.PostgreSQL.Simple.FromField (FromField(fromField), fromJSONField)
import Data.Profunctor.Product.TH (makeAdaptorAndInstance)
import GHC.Generics (Generic)
import Gargantext.API.Ngrams.Types
import Gargantext.Core.Types (ListId, NodeId(..), NodeType)
import Gargantext.Core.Utils.Prefix (unPrefix)
import Gargantext.Database.Prelude (CmdM', HasConnectionPool(..), HasConfig)
import Gargantext.Database.Query.Table.Node.Error (HasNodeError())
import Gargantext.Prelude
import Opaleye (DefaultFromField(..), SqlJsonb, fromPGSFromField)
import System.IO (stderr)
import qualified Data.Map.Strict as Map
import qualified Data.Map.Strict.Patch as PM
import qualified Data.Set as Set
import qualified Data.Text as Text
import qualified Database.PostgreSQL.Simple as PGS
import qualified Gargantext.Database.Query.Table.Ngrams as TableNgrams

------------------------------------------------------------------------
data NodeStoryEnv = NodeStoryEnv
  { _nse_var    :: !(MVar NodeListStory)
  , _nse_saver  :: !(IO ())
  , _nse_getter :: [NodeId] -> IO (MVar NodeListStory)
  --, _nse_cleaner :: !(IO ()) -- every 12 hours: cleans the repos of unused NodeStories
  -- , _nse_lock  :: !FileLock -- TODO (it depends on the option: if with database or file only)
  }
  deriving (Generic)

type HasNodeStory env err m = ( CmdM' env err m
                              , MonadReader env m
                              , MonadError err m
                              , HasNodeStoryEnv env
                              , HasConfig env
                              , HasConnectionPool env
                              , HasNodeError err
                              )

class (HasNodeStoryVar env, HasNodeStorySaver env)
  => HasNodeStoryEnv env where
    hasNodeStory :: Getter env NodeStoryEnv

class HasNodeStoryVar env where
  hasNodeStoryVar :: Getter env ([NodeId] -> IO (MVar NodeListStory))

class HasNodeStorySaver env where
  hasNodeStorySaver :: Getter env (IO ())
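
-- A minimal sketch of how an application environment could implement the
-- classes above (the `Env` type and its field are hypothetical; `to` is
-- Control.Lens.to):
--
-- data Env = Env { _env_nodeStory :: !NodeStoryEnv {- , ... -} }
--
-- instance HasNodeStoryEnv Env where
--   hasNodeStory = to _env_nodeStory
-- instance HasNodeStoryVar Env where
--   hasNodeStoryVar = to (_nse_getter . _env_nodeStory)
-- instance HasNodeStorySaver Env where
--   hasNodeStorySaver = to (_nse_saver . _env_nodeStory)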

------------------------------------------------------------------------

{- | Node Story for each NodeType where the Key of the Map is NodeId
   TODO : generalize for any NodeType, let's start with NodeList which
   is implemented already
-}
newtype NodeStory s p = NodeStory { _unNodeStory :: Map NodeId (Archive s p) }
  deriving (Generic, Show)

instance (FromJSON s, FromJSON p) => FromJSON (NodeStory s p)
instance (ToJSON s, ToJSON p) => ToJSON (NodeStory s p)
instance (Serialise s, Serialise p) => Serialise (NodeStory s p)

data Archive s p = Archive
  { _a_version :: !Version
  , _a_state   :: !s
  , _a_history :: ![p]
    -- first patch in the list is the most recent
    -- We use `take` in `commitStatePatch`, that's why.

    -- History is immutable, we just insert things on top of the
    -- existing list.

    -- We don't need to store the whole history in memory, this
    -- structure holds only recent history, the one that will be
    -- inserted to the DB.
  }
  deriving (Generic, Show)

instance (Serialise s, Serialise p) => Serialise (Archive s p)


type NodeListStory = NodeStory NgramsState' NgramsStatePatch'

type NgramsState' = Map TableNgrams.NgramsType NgramsTableMap
type NgramsStatePatch' = PatchMap TableNgrams.NgramsType NgramsTablePatch
instance Serialise NgramsStatePatch'
instance FromField (Archive NgramsState' NgramsStatePatch')
  where
    fromField = fromJSONField
instance DefaultFromField SqlJsonb (Archive NgramsState' NgramsStatePatch')
  where
    defaultFromField = fromPGSFromField

-- | Combine `NgramsState'`. This is because the structure is (Map
-- NgramsType (Map ...)) and the default `(<>)` operator is
-- left-biased
-- (https://hackage.haskell.org/package/containers-0.6.6/docs/Data-Map-Internal.html#v:union)
combineState :: NgramsState' -> NgramsState' -> NgramsState'
combineState = Map.unionWith (<>)
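
-- A minimal sketch of why `unionWith` is needed here (plain maps stand in
-- for `NgramsState'`; keys and values are made up):
--
--   Map.singleton 1 (Map.singleton "a" True) <> Map.singleton 1 (Map.singleton "b" True)
--     == Map.singleton 1 (Map.singleton "a" True)   -- left-biased: "b" is lost
--
--   Map.unionWith (<>) (Map.singleton 1 (Map.singleton "a" True))
--                      (Map.singleton 1 (Map.singleton "b" True))
--     == Map.singleton 1 (Map.fromList [("a", True), ("b", True)])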

-- TODO Semigroup instance for unions
-- TODO check this
instance (Semigroup s, Semigroup p) => Semigroup (Archive s p) where
  (<>) (Archive { _a_history = p }) (Archive { _a_version = v'
                                             , _a_state = s'
                                             , _a_history = p' }) =
    Archive { _a_version = v'
            , _a_state = s'
            , _a_history = p' <> p }
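
-- Shown schematically (history as a list of patch names), the current
-- definition behaves like this:
--
--   Archive 1 s ["old"] <> Archive 2 s' ["new"]  ==  Archive 2 s' ["new", "old"]
--
-- i.e. the right-hand version and state win, and the right-hand (more
-- recent) history is prepended, keeping the most recent patch first.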

instance (Monoid s, Semigroup p) => Monoid (Archive s p) where
  mempty = Archive { _a_version = 0
                   , _a_state = mempty
                   , _a_history = [] }

instance (FromJSON s, FromJSON p) => FromJSON (Archive s p) where
  parseJSON = genericParseJSON $ unPrefix "_a_"

instance (ToJSON s, ToJSON p) => ToJSON (Archive s p) where
  toJSON     = genericToJSON     $ unPrefix "_a_"
  toEncoding = genericToEncoding $ unPrefix "_a_"

------------------------------------------------------------------------
initNodeStory :: (Monoid s, Semigroup p) => NodeId -> NodeStory s p
initNodeStory ni = NodeStory $ Map.singleton ni initArchive

initArchive :: (Monoid s, Semigroup p) => Archive s p
initArchive = mempty

initNodeListStoryMock :: NodeListStory
initNodeListStoryMock = NodeStory $ Map.singleton nodeListId archive
  where
    nodeListId = 0
    archive = Archive { _a_version = 0
                      , _a_state = ngramsTableMap
                      , _a_history = [] }
    ngramsTableMap = Map.singleton TableNgrams.NgramsTerms
                   $ Map.fromList
                     [ (n ^. ne_ngrams, ngramsElementToRepo n)
                     | n <- mockTable ^. _NgramsTable
                     ]

------------------------------------------------------------------------


------------------------------------------------------------------------
-- | Lenses at the bottom of the file because Template Haskell would otherwise reorder the order of execution in some cases
makeLenses ''NodeStoryEnv
makeLenses ''NodeStory
makeLenses ''Archive

-----------------------------------------


data NodeStoryPoly nid v ngtid ngid nre =
  NodeStoryDB { node_id :: nid
              , version :: v
              , ngrams_type_id :: ngtid
              , ngrams_id :: ngid
              , ngrams_repo_element :: nre }
  deriving (Eq)

data NodeStoryArchivePoly nid a =
  NodeStoryArchiveDB { a_node_id :: nid
                     , archive :: a }
  deriving (Eq)

$(makeAdaptorAndInstance "pNodeStory" ''NodeStoryPoly)
$(makeAdaptorAndInstance "pNodeArchiveStory" ''NodeStoryArchivePoly)

-- type NodeStoryWrite = NodeStoryPoly (Column SqlInt4) (Column SqlInt4) (Column SqlInt4) (Column SqlInt4) (Column SqlJsonb)
-- type NodeStoryRead = NodeStoryPoly (Column SqlInt4) (Column SqlInt4) (Column SqlInt4) (Column SqlInt4) (Column SqlJsonb)

-- type NodeStoryArchiveWrite = NodeStoryArchivePoly (Column SqlInt4) (Column SqlJsonb)
-- type NodeStoryArchiveRead = NodeStoryArchivePoly (Column SqlInt4) (Column SqlJsonb)

type ArchiveList = Archive NgramsState' NgramsStatePatch'

-- DB stuff

runPGSExecute :: (PGS.ToRow q) => PGS.Connection -> PGS.Query -> q -> IO Int64
runPGSExecute c qs a = catch (PGS.execute c qs a) printError
  where
    printError (SomeException e) = do
      --q' <- PGS.formatQuery c qs a
      --hPutStrLn stderr q'
      throw (SomeException e)

runPGSExecuteMany :: (PGS.ToRow q) => PGS.Connection -> PGS.Query -> [q] -> IO Int64
runPGSExecuteMany c qs a = catch (PGS.executeMany c qs a) printError
  where
    printError (SomeException e) = do
      --q' <- PGS.formatQuery c qs a
      --hPutStrLn stderr q'
      throw (SomeException e)

runPGSQuery :: (PGS.FromRow r, PGS.ToRow q) => PGS.Connection -> PGS.Query -> q -> IO [r]
runPGSQuery c q a = catch (PGS.query c q a) printError
  where
    printError (SomeException e) = do
      q' <- PGS.formatQuery c q a
      hPutStrLn stderr q'
      throw (SomeException e)

runPGSAdvisoryLock :: PGS.Connection -> Int -> IO ()
runPGSAdvisoryLock c id = do
  _ <- runPGSQuery c [sql| SELECT pg_advisory_lock(?) |] (PGS.Only id) :: IO [PGS.Only ()]
  pure ()

runPGSAdvisoryUnlock :: PGS.Connection -> Int -> IO ()
runPGSAdvisoryUnlock c id = do
  _ <- runPGSQuery c [sql| SELECT pg_advisory_unlock(?) |] (PGS.Only id) :: IO [PGS.Only Bool]
  pure ()

runPGSAdvisoryXactLock :: PGS.Connection -> Int -> IO ()
runPGSAdvisoryXactLock c id = do
  _ <- runPGSQuery c [sql| SELECT pg_advisory_xact_lock(?) |] (PGS.Only id) :: IO [PGS.Only ()]
  pure ()

nodeExists :: PGS.Connection -> NodeId -> IO Bool
nodeExists c nId = (== [PGS.Only True])
  <$> runPGSQuery c [sql| SELECT true FROM nodes WHERE id = ? LIMIT 1 |] (PGS.Only nId)

getNodesIdWithType :: PGS.Connection -> NodeType -> IO [NodeId]
getNodesIdWithType c nt = do
  ns <- runPGSQuery c query (PGS.Only nt)
  pure $ map (\(PGS.Only nId) -> NodeId nId) ns
  where
    query :: PGS.Query
    query = [sql| SELECT id FROM nodes WHERE typename = ? |]



-- nodeStoryTable :: Table NodeStoryRead NodeStoryWrite
-- nodeStoryTable =
--   Table "node_stories"
--     ( pNodeStory NodeStoryDB { node_id = tableField "node_id"
--                              , version = tableField "version"
--                              , ngrams_type_id = tableField "ngrams_type_id"
--                              , ngrams_id = tableField "ngrams_id"
--                              , ngrams_repo_element = tableField "ngrams_repo_element"
--                              } )

-- nodeStoryArchiveTable :: Table NodeStoryArchiveRead NodeStoryArchiveWrite
-- nodeStoryArchiveTable =
--   Table "node_story_archive_history"
--     ( pNodeArchiveStory NodeStoryArchiveDB { a_node_id = tableField "node_id"
--                                            , archive = tableField "archive" } )

-- nodeStorySelect :: Select NodeStoryRead
-- nodeStorySelect = selectTable nodeStoryTable

-- TODO Check ordering, "first patch in the _a_history list is the most recent"
getNodeArchiveHistory :: PGS.Connection -> NodeId -> IO [NgramsStatePatch']
getNodeArchiveHistory c nodeId = do
  as <- runPGSQuery c query (PGS.Only nodeId) :: IO [(TableNgrams.NgramsType, NgramsTerm, NgramsPatch)]
  pure $ (\(ngramsType, terms, patch) -> fst $ PM.singleton ngramsType (NgramsTablePatch $ fst $ PM.singleton terms patch)) <$> as
  where
    query :: PGS.Query
    query = [sql| SELECT ngrams_type_id, terms, patch
                  FROM node_story_archive_history
                  JOIN ngrams ON ngrams.id = ngrams_id
                  WHERE node_id = ? |]

ngramsIdQuery :: PGS.Query
ngramsIdQuery = [sql| SELECT id FROM ngrams WHERE terms = ? |]


-- | Insert the given history patches into `node_story_archive_history`,
-- resolving each term to its `ngrams` id. Terms missing from the `ngrams`
-- table are silently skipped.
insertNodeArchiveHistory :: PGS.Connection -> NodeId -> [NgramsStatePatch'] -> IO ()
insertNodeArchiveHistory _ _ [] = pure ()
insertNodeArchiveHistory c nodeId (h:hs) = do
  let tuples = mconcat $ (\(nType, (NgramsTablePatch patch)) ->
                            (\(term, p) ->
                               (nodeId, nType, term, p)) <$> PM.toList patch) <$> PM.toList h :: [(NodeId, TableNgrams.NgramsType, NgramsTerm, NgramsPatch)]
  tuplesM <- mapM (\(nId, nType, term, patch) -> do
                      ngrams <- runPGSQuery c ngramsIdQuery (PGS.Only term)
                      pure $ (\(PGS.Only termId) -> (nId, nType, termId, term, patch)) <$> (headMay ngrams)
                  ) tuples :: IO [Maybe (NodeId, TableNgrams.NgramsType, Int, NgramsTerm, NgramsPatch)]
  _ <- runPGSExecuteMany c query $ ((\(nId, nType, termId, _term, patch) -> (nId, nType, termId, patch)) <$> (catMaybes tuplesM))
  _ <- insertNodeArchiveHistory c nodeId hs
  pure ()
  where
    query :: PGS.Query
    query = [sql| INSERT INTO node_story_archive_history(node_id, ngrams_type_id, ngrams_id, patch) VALUES (?, ?, ?, ?) |]

getNodeStory :: PGS.Connection -> NodeId -> IO NodeListStory
getNodeStory c nId@(NodeId nodeId) = do
  --res <- withResource pool $ \c -> runSelect c query :: IO [NodeStoryPoly NodeId Version Int Int NgramsRepoElement]
  res <- runPGSQuery c nodeStoriesQuery (PGS.Only nodeId) :: IO [(Version, TableNgrams.NgramsType, NgramsTerm, NgramsRepoElement)]
  -- We have multiple rows with same node_id and different (ngrams_type_id, ngrams_id).
  -- Need to create a map: {<node_id>: {<ngrams_type_id>: {<ngrams_id>: <data>}}}
  let dbData = map (\(version, ngramsType, ngrams, ngrams_repo_element) ->
                      Archive { _a_version = version
                              , _a_history = []
                              , _a_state = Map.singleton ngramsType $ Map.singleton ngrams ngrams_repo_element }) res
  -- TODO (<>) for Archive doesn't concatenate states!
  -- NOTE When concatenating, check that the same version is for all states
  pure $ NodeStory $ Map.singleton nId $ foldl combine mempty dbData
  --pure $ NodeStory $ Map.fromListWith (<>) $ (\(NodeStoryDB nId a) -> (nId, a)) <$> res
  where
    -- query :: Select NodeStoryRead
    -- query = proc () -> do
    --   row@(NodeStoryDB node_id _) <- nodeStorySelect -< ()
    --   restrict -< node_id .== sqlInt4 nodeId
    --   returnA -< row
    combine a1 a2 = a1 & a_state %~ combineState (a2 ^. a_state)
                       & a_version .~ (a2 ^. a_version) -- version should be updated from list, not taken from the empty Archive

nodeStoriesQuery :: PGS.Query
nodeStoriesQuery = [sql| SELECT version, ngrams_type_id, terms, ngrams_repo_element
                         FROM node_stories
                         JOIN ngrams ON ngrams.id = ngrams_id
                         WHERE node_id = ? |]

type ArchiveStateList = [(TableNgrams.NgramsType, NgramsTerm, NgramsRepoElement)]

-- Functions to convert archive state (which is a Map NgramsType (Map
-- NgramsTerm NgramsRepoElement)) to/from a flat list
archiveStateAsList :: NgramsState' -> ArchiveStateList
archiveStateAsList s = mconcat $ (\(nt, ntm) -> (\(n, nre) -> (nt, n, nre)) <$> Map.toList ntm) <$> Map.toList s

archiveStateFromList :: ArchiveStateList -> NgramsState'
archiveStateFromList l = Map.fromListWith (<>) $ (\(nt, t, nre) -> (nt, Map.singleton t nre)) <$> l
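
-- For illustration, with made-up values nt, t1, t2, e1, e2 (shapes only,
-- assuming t1 < t2 in the Map ordering):
--
--   archiveStateAsList (Map.fromList [(nt, Map.fromList [(t1, e1), (t2, e2)])])
--     == [(nt, t1, e1), (nt, t2, e2)]
--
--   archiveStateFromList [(nt, t1, e1), (nt, t2, e2)]
--     == Map.fromList [(nt, Map.fromList [(t1, e1), (t2, e2)])]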

-- | This function inserts a whole new node story and archive for the given node_id.
insertNodeStory :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
insertNodeStory c (NodeId nId) a = do
  _ <- mapM (\(ngramsType, ngrams, ngramsRepoElement) -> do
                termIdM <- runPGSQuery c ngramsIdQuery (PGS.Only ngrams) :: IO [PGS.Only Int64]
                case headMay termIdM of
                  Nothing -> pure 0
                  Just (PGS.Only termId) -> runPGSExecuteMany c query [(nId, a ^. a_version, ngramsType, termId, ngramsRepoElement)]) $ archiveStateAsList $ a ^. a_state
  -- runInsert c $ insert ngramsType ngrams ngramsRepoElement) $ archiveStateAsList _a_state

  pure ()
  where
    query :: PGS.Query
    query = [sql| INSERT INTO node_stories(node_id, version, ngrams_type_id, ngrams_id, ngrams_repo_element) VALUES (?, ?, ?, ?, ?) |]
    -- insert ngramsType ngrams ngramsRepoElement =
    --   Insert { iTable = nodeStoryTable
    --          , iRows = [NodeStoryDB { node_id = sqlInt4 nId
    --                                 , version = sqlInt4 _a_version
    --                                 , ngrams_type_id = sqlInt4 $ TableNgrams.ngramsTypeId ngramsType
    --                                 , ngrams_id = ...
    --                                 , ngrams_repo_element = sqlValueJSONB ngramsRepoElement
    --                                 }]
    --          , iReturning = rCount
    --          , iOnConflict = Nothing }

insertArchiveList :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
insertArchiveList c nodeId a = do
  _ <- runPGSExecuteMany c query $ (\(nt, n, nre) -> (nodeId, a ^. a_version, nt, nre, n)) <$> (archiveStateAsList $ a ^. a_state)
  pure ()
  where
    query :: PGS.Query
    query = [sql| INSERT INTO node_stories(node_id, version, ngrams_type_id, ngrams_id, ngrams_repo_element)
                  SELECT ?, ?, ?, ngrams.id, ? FROM ngrams WHERE terms = ? |]

deleteArchiveList :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
deleteArchiveList c nodeId a = do
  _ <- runPGSExecuteMany c query $ (\(nt, n, _) -> (nodeId, nt, n)) <$> (archiveStateAsList $ a ^. a_state)
  pure ()
  where
    query :: PGS.Query
    query = [sql| DELETE FROM node_stories
                  WHERE node_id = ? AND ngrams_type_id = ? AND ngrams_id IN (SELECT id FROM ngrams WHERE terms = ?) |]

updateArchiveList :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
updateArchiveList c nodeId a = do
  let params = (\(nt, n, nre) -> (nre, nodeId, nt, n)) <$> (archiveStateAsList $ a ^. a_state)
  --q <- PGS.format c query params
  --printDebug "[updateArchiveList] query" q
  _ <- mapM (\p -> runPGSExecute c query p) params
  pure ()
  where
    query :: PGS.Query
    query = [sql| UPDATE node_stories
                  SET ngrams_repo_element = ?
                  WHERE node_id = ? AND ngrams_type_id = ? AND ngrams_id IN (SELECT id FROM ngrams WHERE terms = ?) |]

-- | This function updates the node story and archive for the given node_id.
updateNodeStory :: PGS.Connection -> NodeId -> ArchiveList -> ArchiveList -> IO ()
updateNodeStory c nodeId@(NodeId _nId) currentArchive newArchive = do
  -- STEPS

  -- 0. We assume we're inside an advisory lock

  -- 1. Find differences (inserts/updates/deletes)
  let currentList = archiveStateAsList $ currentArchive ^. a_state
  let newList = archiveStateAsList $ newArchive ^. a_state
  let currentSet = Set.fromList $ (\(nt, n, _) -> (nt, n)) <$> currentList
  let newSet = Set.fromList $ (\(nt, n, _) -> (nt, n)) <$> newList

  let inserts = filter (\(nt, n, _) -> Set.member (nt, n) $ Set.difference newSet currentSet) newList
  printDebug "[updateNodeStory] inserts" inserts
  let deletes = filter (\(nt, n, _) -> Set.member (nt, n) $ Set.difference currentSet newSet) currentList
  printDebug "[updateNodeStory] deletes" deletes

  -- updates are the entries that are in `new` but not in `current`, i.e. new
  -- or changed (ngrams type, term, repo element) triples
  let updates = Set.toList $ Set.difference (Set.fromList newList) (Set.fromList currentList)
  printDebug "[updateNodeStory] updates" $ Text.unlines $ (Text.pack . show) <$> updates

  -- 2. Perform inserts/deletes/updates
  insertArchiveList c nodeId $ Archive { _a_version = newArchive ^. a_version
                                       , _a_history = []
                                       , _a_state = archiveStateFromList inserts }
  printDebug "[updateNodeStory] insert applied" ()
  -- TODO Use currentArchive ^. a_version in delete and report error
  -- if entries with (node_id, ngrams_type_id, ngrams_id) but
  -- different version are found.
  deleteArchiveList c nodeId $ Archive { _a_version = newArchive ^. a_version
                                       , _a_history = []
                                       , _a_state = archiveStateFromList deletes }
  printDebug "[updateNodeStory] delete applied" ()
  updateArchiveList c nodeId $ Archive { _a_version = newArchive ^. a_version
                                       , _a_history = []
                                       , _a_state = archiveStateFromList updates }
  printDebug "[updateNodeStory] update applied" ()

  pure ()
  -- where
  --   update = Update { uTable = nodeStoryTable
  --                   , uUpdateWith = updateEasy (\(NodeStoryDB { node_id }) ->
  --                                                 NodeStoryDB { archive = sqlValueJSONB $ Archive { _a_history = emptyHistory
  --                                                                                                 , ..}
  --                                                             , .. })
  --                   , uWhere = (\row -> node_id row .== sqlInt4 nId)
  --                   , uReturning = rCount }

-- nodeStoryRemove :: Pool PGS.Connection -> NodeId -> IO Int64
-- nodeStoryRemove pool (NodeId nId) = withResource pool $ \c -> runDelete c delete
--   where
--     delete = Delete { dTable = nodeStoryTable
--                     , dWhere = (\row -> node_id row .== sqlInt4 nId)
--                     , dReturning = rCount }

upsertNodeStories :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
upsertNodeStories c nodeId@(NodeId nId) newArchive = do
  printDebug "[upsertNodeStories] START nId" nId
  PGS.withTransaction c $ do
    printDebug "[upsertNodeStories] locking nId" nId
    runPGSAdvisoryXactLock c nId

    -- whether it's an insert or an update, we can insert the node archive history already
    -- NOTE: It is assumed that the most recent change is the first in the
    -- list, so we save these in reverse order
    insertNodeArchiveHistory c nodeId $ reverse $ newArchive ^. a_history

    (NodeStory m) <- getNodeStory c nodeId
    case Map.lookup nodeId m of
      Nothing -> do
        _ <- insertNodeStory c nodeId newArchive
        pure ()
      Just currentArchive -> do
        _ <- updateNodeStory c nodeId currentArchive newArchive
        pure ()

    printDebug "[upsertNodeStories] STOP nId" nId

-- | Upsert the state and recent history of every node in the given `NodeListStory`.
writeNodeStories :: PGS.Connection -> NodeListStory -> IO ()
writeNodeStories c (NodeStory nls) = do
  _ <- mapM (\(nId, a) -> upsertNodeStories c nId a) $ Map.toList nls
  pure ()

-- | Returns a `NodeListStory`, updating the given one for the given `NodeId`
nodeStoryInc :: PGS.Connection -> Maybe NodeListStory -> NodeId -> IO NodeListStory
nodeStoryInc c Nothing nId = getNodeStory c nId
nodeStoryInc c (Just ns@(NodeStory nls)) nId = do
  case Map.lookup nId nls of
    Nothing -> do
      (NodeStory nls') <- getNodeStory c nId
      pure $ NodeStory $ Map.union nls nls'
    Just _ -> pure ns

nodeStoryIncs :: PGS.Connection -> Maybe NodeListStory -> [NodeId] -> IO NodeListStory
nodeStoryIncs _ Nothing [] = pure $ NodeStory $ Map.empty
nodeStoryIncs c (Just nls) ns = foldM (\m n -> nodeStoryInc c (Just m) n) nls ns
nodeStoryIncs c Nothing (ni:ns) = do
  m <- getNodeStory c ni
  nodeStoryIncs c (Just m) ns

-- nodeStoryDec :: Pool PGS.Connection -> NodeListStory -> NodeId -> IO NodeListStory
-- nodeStoryDec pool ns@(NodeStory nls) ni = do
--   case Map.lookup ni nls of
--     Nothing -> do
--       _ <- nodeStoryRemove pool ni
--       pure ns
--     Just _ -> do
--       let ns' = Map.filterWithKey (\k _v -> k /= ni) nls
--       _ <- nodeStoryRemove pool ni
--       pure $ NodeStory ns'
------------------------------------

readNodeStoryEnv :: Pool PGS.Connection -> IO NodeStoryEnv
readNodeStoryEnv pool = do
  mvar <- nodeStoryVar pool Nothing []
  saver <- mkNodeStorySaver pool mvar
  -- let saver = modifyMVar_ mvar $ \mv -> do
  --       writeNodeStories pool mv
  --       printDebug "[readNodeStoryEnv] saver" mv
  --       let mv' = clearHistory mv
  --       printDebug "[readNodeStoryEnv] saver, cleared" mv'
  --       return mv'
  pure $ NodeStoryEnv { _nse_var = mvar
                      , _nse_saver = saver
                      , _nse_getter = nodeStoryVar pool (Just mvar) }
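
-- A sketch of typical use of the environment (the node ids are made up and
-- `readMVar` comes from Control.Concurrent):
--
-- env <- readNodeStoryEnv pool
-- var <- _nse_getter env [NodeId 1, NodeId 2]  -- loads those nodes into the shared MVar
-- ns  <- readMVar var                          -- read the current NodeListStory
-- _nse_saver env                               -- schedule a (debounced) save to the DB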

nodeStoryVar :: Pool PGS.Connection -> Maybe (MVar NodeListStory) -> [NodeId] -> IO (MVar NodeListStory)
nodeStoryVar pool Nothing nIds = do
  state <- withResource pool $ \c -> nodeStoryIncs c Nothing nIds
  newMVar state
nodeStoryVar pool (Just mv) nIds = do
  _ <- withResource pool $ \c -> modifyMVar_ mv $ \nsl -> (nodeStoryIncs c (Just nsl) nIds)
  pure mv

-- Debounce is useful since it delays the saving to some later time,
-- asynchronously, so that we keep operating on memory only.
mkNodeStorySaver :: Pool PGS.Connection -> MVar NodeListStory -> IO (IO ())
mkNodeStorySaver pool mvns = mkDebounce settings
  where
    settings = defaultDebounceSettings
                 { debounceAction = do
                     -- NOTE: Lock MVar first, then use resource pool.
                     -- Otherwise we could wait for MVar, while
                     -- blocking the pool connection.
                     modifyMVar_ mvns $ \ns -> do
                       withResource pool $ \c -> do
                         --printDebug "[mkNodeStorySaver] will call writeNodeStories, ns" ns
                         writeNodeStories c ns
                         pure $ clearHistory ns
                     --withMVar mvns (\ns -> printDebug "[mkNodeStorySaver] debounce nodestory" ns)
                 , debounceFreq = 1*minute
                 }
    minute = 60*second
    second = 10^(6 :: Int)
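
-- A sketch of how the debounced saver is typically triggered from a monad
-- with access to the environment (the `saveNodeStory` helper is hypothetical
-- and assumes a `MonadBase IO m` constraint to lift the `IO ()` action):
--
-- saveNodeStory :: (MonadReader env m, HasNodeStorySaver env, MonadBase IO m) => m ()
-- saveNodeStory = do
--   saver <- view hasNodeStorySaver
--   liftBase saver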

clearHistory :: NodeListStory -> NodeListStory
clearHistory (NodeStory ns) = NodeStory $ ns & (traverse . a_history) .~ emptyHistory
  where
    emptyHistory = [] :: [NgramsStatePatch']

currentVersion :: (HasNodeStory env err m) => ListId -> m Version
currentVersion listId = do
  pool <- view connPool
  nls <- withResource pool $ \c -> liftBase $ getNodeStory c listId
  pure $ nls ^. unNodeStory . at listId . _Just . a_version


-- mkNodeStorySaver :: MVar NodeListStory -> Cmd err (Cmd err ())
-- mkNodeStorySaver mvns = mkDebounce settings
--   where
--     settings = defaultDebounceSettings
--                  { debounceAction = withMVar mvns (\ns -> writeNodeStories ns)
--                  , debounceFreq = 1 * minute
--                  -- , debounceEdge = trailingEdge -- Trigger on the trailing edge
--                  }
--     minute = 60 * second
--     second = 10^(6 :: Int)


-----------------------------------------