]> Git — Sourcephile - gargantext.git/blob - src/Gargantext/Core/NodeStory.hs
[nodeStory] fixes to saving nodeStory
[gargantext.git] / src / Gargantext / Core / NodeStory.hs
1 {-|
2 Module : Gargantext.Core.NodeStory
3 Description : Node API generation
4 Copyright : (c) CNRS, 2017-Present
5 License : AGPL + CECILL v3
6 Maintainer : team@gargantext.org
7 Stability : experimental
8 Portability : POSIX
9
10 A Node Story is a Map between NodeId and an Archive (with state,
11 version and history) for that node.
12
13 Couple of words on how this is implemented.
14
15 First version used files which stored Archive for each NodeId in a
16 separate .cbor file.
17
18 For performance reasons, it is rewritten to use the DB.
19
The table `node_stories` originally contained two columns: `node_id`
and `archive`. It now stores one row per ngram, with the columns
`node_id`, `version`, `ngrams_type_id`, `ngrams_id` and
`ngrams_repo_element`.

Next, it was observed that `a_history` in `Archive` takes a lot of
space. So a new table was created, `node_story_archive_history`, with
columns `node_id`, `ngrams_type_id`, `ngrams_id`, `patch` and
`version`. This is because each history item is in fact a map from
`NgramsType` to `NgramsTablePatch` (see the `NgramsStatePatch'` type).
28
29 Moreover, since in ~G.A.Ngrams.commitStatePatch~ we use current state
30 only, with only recent history items, I concluded that it is not
31 necessary to load whole history into memory. Instead, it is kept in DB
32 (history is immutable) and only recent changes are added to
33 `a_history`. Then that record is cleared whenever `Archive` is saved.
34
TODO:
- remove
- filter
- load the lists
41 -}
42
43 {-# OPTIONS_GHC -fno-warn-orphans #-}
44 {-# LANGUAGE Arrows #-}
45 {-# LANGUAGE ConstraintKinds #-}
46 {-# LANGUAGE QuasiQuotes #-}
47 {-# LANGUAGE TemplateHaskell #-}
48
49 module Gargantext.Core.NodeStory
50 ( HasNodeStory
51 , HasNodeStoryEnv
52 , hasNodeStory
53 , HasNodeStoryVar
54 , hasNodeStoryVar
55 , HasNodeStorySaver
56 , hasNodeStorySaver
57 , HasNodeStoryImmediateSaver
58 , hasNodeStoryImmediateSaver
59 , NodeStory(..)
60 , NgramsStatePatch'
61 , NodeListStory
62 , initNodeListStoryMock
63 , NodeStoryEnv(..)
64 , initNodeStory
65 , nse_getter
66 , nse_saver
67 , nse_saver_immediate
68 , nse_var
69 , unNodeStory
70 , getNodeArchiveHistory
71 , Archive(..)
72 , initArchive
73 , insertArchiveList
74 , deleteArchiveList
75 , updateArchiveList
76 , a_history
77 , a_state
78 , a_version
79 , nodeExists
80 , runPGSQuery
81 , runPGSAdvisoryLock
82 , runPGSAdvisoryUnlock
83 , runPGSAdvisoryXactLock
84 , getNodesIdWithType
85 , readNodeStoryEnv
86 , upsertNodeStories
87 , getNodeStory
88 , nodeStoriesQuery
89 , currentVersion )
90 where
91
92 -- import Debug.Trace (traceShow)
93 import Control.Debounce (mkDebounce, defaultDebounceSettings, debounceFreq, debounceAction)
94 import Codec.Serialise.Class
95 import Control.Concurrent (MVar(), newMVar, modifyMVar_)
96 import Control.Exception (catch, throw, SomeException(..))
97 import Control.Lens (makeLenses, Getter, (^.), (.~), (%~), _Just, at, traverse, view)
98 import Control.Monad.Except
99 import Control.Monad.Reader
100 import Data.Aeson hiding ((.=), decode)
101 import Data.ByteString.Char8 (hPutStrLn)
102 import Data.Map.Strict (Map)
103 import Data.Maybe (catMaybes)
104 import Data.Monoid
105 import Data.Pool (Pool, withResource)
106 import Data.Semigroup
107 import Database.PostgreSQL.Simple.SqlQQ (sql)
108 import Database.PostgreSQL.Simple.FromField (FromField(fromField), fromJSONField)
109 import Data.Profunctor.Product.TH (makeAdaptorAndInstance)
110 import GHC.Generics (Generic)
111 import Gargantext.API.Ngrams.Types
112 import Gargantext.Core.Types (ListId, NodeId(..), NodeType)
113 import Gargantext.Core.Utils.Prefix (unPrefix)
114 import Gargantext.Database.Admin.Config (nodeTypeId)
115 import Gargantext.Database.Prelude (CmdM', HasConnectionPool(..), HasConfig)
116 import Gargantext.Database.Query.Table.Node.Error (HasNodeError())
117 import Gargantext.Prelude
118 import Opaleye (DefaultFromField(..), SqlJsonb, fromPGSFromField)
119 import System.IO (stderr)
120 import qualified Data.Map.Strict as Map
121 import qualified Data.Map.Strict.Patch as PM
122 import qualified Data.Set as Set
123 import qualified Data.Text as Text
124 import qualified Database.PostgreSQL.Simple as PGS
125 import qualified Gargantext.Database.Query.Table.Ngrams as TableNgrams
126
127 ------------------------------------------------------------------------
-- | Runtime handle on the node stories: the shared in-memory story plus
-- the actions used to load more nodes into it and to persist it.
data NodeStoryEnv = NodeStoryEnv
  { _nse_var             :: !(MVar NodeListStory)
    -- ^ shared in-memory node stories
  , _nse_saver           :: !(IO ())
    -- ^ saver action (in 'readNodeStoryEnv': the debounced one)
  , _nse_saver_immediate :: !(IO ())
    -- ^ saver action (in 'readNodeStoryEnv': the non-debounced one)
  , _nse_getter          :: [NodeId] -> IO (MVar NodeListStory)
    -- ^ loads the stories of the given nodes and returns the shared variable
  -- , _nse_cleaner :: !(IO ()) -- every 12 hours: cleans the repos of unused NodeStories
  -- , _nse_lock    :: !FileLock -- TODO (it depends on the option: if with database or file only)
  }
  deriving Generic
137
-- | Capabilities needed to read and persist node stories: database
-- access and configuration, the node story environment, and the
-- ability to raise node errors.
type HasNodeStory env err m =
  ( MonadReader env m
  , MonadError err m
  , CmdM' env err m
  , HasConnectionPool env
  , HasConfig env
  , HasNodeStoryEnv env
  , HasNodeError err
  )
146
-- | Environments carrying a complete 'NodeStoryEnv'.
class ( HasNodeStoryVar env
      , HasNodeStorySaver env
      ) => HasNodeStoryEnv env where
  hasNodeStory :: Getter env NodeStoryEnv
150
-- | Environments able to load node stories into a shared variable.
class HasNodeStoryVar env where
  hasNodeStoryVar :: Getter env ([NodeId] -> IO (MVar NodeListStory))
153
-- | Environments providing the (possibly delayed) node story saver.
class HasNodeStorySaver env where
  hasNodeStorySaver :: Getter env (IO ())
156
-- | Environments providing a node story saver that writes right away.
class HasNodeStoryImmediateSaver env where
  hasNodeStoryImmediateSaver :: Getter env (IO ())
159
160 ------------------------------------------------------------------------
161
-- | Node Story for a 'NodeType': a map from each 'NodeId' to the
-- 'Archive' (version, state, recent history) of that node.
--
-- TODO: generalize for any NodeType; for now only NodeList is
-- implemented.
newtype NodeStory s p = NodeStory
  { _unNodeStory :: Map NodeId (Archive s p) }
  deriving (Generic, Show)
168
-- Serialisation instances use the default, 'Generic'-based methods.
instance (FromJSON s, FromJSON p)   => FromJSON  (NodeStory s p)
instance (ToJSON s, ToJSON p)       => ToJSON    (NodeStory s p)
instance (Serialise s, Serialise p) => Serialise (NodeStory s p)
172
-- | State of a node together with its version and its recent history.
data Archive s p = Archive
  { _a_version :: !Version
    -- ^ version of the state
  , _a_state   :: !s
    -- ^ current state
  , _a_history :: ![p]
    -- ^ Recent patches only, most recent first (@commitStatePatch@ uses
    -- @take@ on this list). History is immutable: new patches are put on
    -- top of the existing list. The whole history is not kept in memory;
    -- this list only holds the recent part that still has to be inserted
    -- into the DB.
  }
  deriving (Generic, Show)
188
instance (Serialise s, Serialise p) => Serialise (Archive s p)

-- | Node story of lists: ngrams state, patched with @NgramsStatePatch'@.
type NodeListStory = NodeStory NgramsState' NgramsStatePatch'

-- | Ngrams tables of a list, one per ngrams type.
type NgramsState' = Map TableNgrams.NgramsType NgramsTableMap

-- | One history item: a table patch per ngrams type.
type NgramsStatePatch' = PatchMap TableNgrams.NgramsType NgramsTablePatch

instance Serialise NgramsStatePatch'

-- Archives read from the database are decoded from JSON(B).
instance FromField (Archive NgramsState' NgramsStatePatch') where
  fromField = fromJSONField

instance DefaultFromField SqlJsonb (Archive NgramsState' NgramsStatePatch') where
  defaultFromField = fromPGSFromField
203
-- | Merge two ngrams states type by type.
--
-- A plain '(<>)' on the outer 'Map' is a left-biased union
-- (<https://hackage.haskell.org/package/containers-0.6.6/docs/Data-Map-Internal.html#v:union>)
-- and would keep only the left table for every ngrams type present in
-- both. Here the inner tables are merged instead. They are 'Map's too,
-- so on a conflicting term the left argument wins.
combineState :: NgramsState' -> NgramsState' -> NgramsState'
combineState lhs rhs = Map.unionWith (<>) lhs rhs
210
-- | @older <> newer@ keeps the version and the state of @newer@ and puts
-- its history in front of the history of @older@ (most recent first).
-- States are NOT merged; see 'combineState' for that.
instance (Semigroup s, Semigroup p) => Semigroup (Archive s p) where
  (Archive _ _ olderHistory) <> (Archive newVersion newState newerHistory) =
    Archive newVersion newState (newerHistory <> olderHistory)

-- NOTE(review): 'mempty' is only a left identity. @a <> mempty@ resets the
-- version to 0 and the state to 'mempty', because '(<>)' keeps the
-- right-hand version and state.
instance (Monoid s, Semigroup p) => Monoid (Archive s p) where
  mempty = Archive 0 mempty []

instance (FromJSON s, FromJSON p) => FromJSON (Archive s p) where
  parseJSON = genericParseJSON (unPrefix "_a_")

instance (ToJSON s, ToJSON p) => ToJSON (Archive s p) where
  toJSON     = genericToJSON (unPrefix "_a_")
  toEncoding = genericToEncoding (unPrefix "_a_")
227
228 ------------------------------------------------------------------------
-- | Story containing only the given node, with an empty archive.
initNodeStory :: (Monoid s, Semigroup p) => NodeId -> NodeStory s p
initNodeStory nodeId = NodeStory (Map.fromList [(nodeId, initArchive)])

-- | Empty archive ('mempty'): version 0, empty state, no history.
initArchive :: (Monoid s, Semigroup p) => Archive s p
initArchive = mempty
234
-- | Mock story: node 0, version 0, whose 'TableNgrams.NgramsTerms'
-- table is built from 'mockTable'.
initNodeListStoryMock :: NodeListStory
initNodeListStoryMock = NodeStory (Map.singleton mockNodeId mockArchive)
  where
    mockNodeId = 0
    mockArchive = Archive { _a_version = 0
                          , _a_state   = Map.singleton TableNgrams.NgramsTerms termsTable
                          , _a_history = [] }
    termsTable = Map.fromList (toEntry <$> (mockTable ^. _NgramsTable))
    toEntry element = (element ^. ne_ngrams, ngramsElementToRepo element)
247
248 ------------------------------------------------------------------------
249
250
251 ------------------------------------------------------------------------
-- Lens generation. Top-level Template Haskell splices split the module
-- into declaration groups, so the generated lenses can only be used in
-- the code below these splices. This is why the instances above use
-- plain record syntax.
$(makeLenses ''NodeStoryEnv)
$(makeLenses ''NodeStory)
$(makeLenses ''Archive)
256
257 ----------------------------------------------------------------------
-- | Polymorphic row of the @node_stories@ table (one row per node,
-- ngrams type and ngram).
data NodeStoryPoly nid v ngtid ngid nre = NodeStoryDB
  { node_id             :: nid
  , version             :: v
  , ngrams_type_id      :: ngtid
  , ngrams_id           :: ngid
  , ngrams_repo_element :: nre
  }
  deriving Eq

-- | Polymorphic row holding a whole archive for a node.
data NodeStoryArchivePoly nid a = NodeStoryArchiveDB
  { a_node_id :: nid
  , archive   :: a
  }
  deriving Eq

$(makeAdaptorAndInstance "pNodeStory" ''NodeStoryPoly)
$(makeAdaptorAndInstance "pNodeArchiveStory" ''NodeStoryArchivePoly)

-- Opaleye read/write row types (currently unused):
-- type NodeStoryWrite = NodeStoryPoly (Column SqlInt4) (Column SqlInt4) (Column SqlInt4) (Column SqlInt4) (Column SqlJsonb)
-- type NodeStoryRead = NodeStoryPoly (Column SqlInt4) (Column SqlInt4) (Column SqlInt4) (Column SqlInt4) (Column SqlJsonb)
-- type NodeStoryArchiveWrite = NodeStoryArchivePoly (Column SqlInt4) (Column SqlJsonb)
-- type NodeStoryArchiveRead = NodeStoryArchivePoly (Column SqlInt4) (Column SqlJsonb)

-- | Archive of a list's ngrams state.
type ArchiveList = Archive NgramsState' NgramsStatePatch'
281
282 -- DB stuff
283
-- | Run a single 'PGS.execute'.
--
-- On failure, the formatted SQL is written to stderr and the ORIGINAL
-- exception is rethrown. Callers, and 'PGS.withTransaction' (which rolls
-- back and rethrows), therefore see the real database error rather than
-- a generic panic. This matches 'runPGSQuery'.
runPGSExecute :: (PGS.ToRow q) => PGS.Connection -> PGS.Query -> q -> IO Int64
runPGSExecute c qs a = PGS.execute c qs a `catch` logQueryAndRethrow
  where
    logQueryAndRethrow err@(SomeException _) = do
      formatted <- PGS.formatQuery c qs a
      hPutStrLn stderr formatted
      throw err
291
-- | Run a multi-row 'PGS.executeMany'.
--
-- On failure, the formatted multi-row SQL ('PGS.formatMany') is written
-- to stderr and the ORIGINAL exception is rethrown, as in 'runPGSQuery'.
-- Previously the handler panicked and hid the database error.
runPGSExecuteMany :: (PGS.ToRow q) => PGS.Connection -> PGS.Query -> [q] -> IO Int64
runPGSExecuteMany c qs a = PGS.executeMany c qs a `catch` logQueryAndRethrow
  where
    logQueryAndRethrow err@(SomeException _) = do
      formatted <- PGS.formatMany c qs a
      hPutStrLn stderr formatted
      throw err
299
-- | Run a 'PGS.query'. If it fails, the formatted SQL is written to stderr
-- and the exception is rethrown unchanged.
runPGSQuery :: (PGS.FromRow r, PGS.ToRow q) => PGS.Connection -> PGS.Query -> q -> IO [r]
runPGSQuery c q a = PGS.query c q a `catch` logQueryAndRethrow
  where
    logQueryAndRethrow err@(SomeException _) = do
      formatted <- PGS.formatQuery c q a
      hPutStrLn stderr formatted
      throw err
307
-- | Acquire the advisory lock @lockId@ with @pg_advisory_lock@.
-- Release it with 'runPGSAdvisoryUnlock'.
runPGSAdvisoryLock :: PGS.Connection -> Int -> IO ()
runPGSAdvisoryLock c lockId =
  runPGSQuery c [sql| SELECT pg_advisory_lock(?) |] (PGS.Only lockId) >>= ignoreRows
  where
    ignoreRows :: [PGS.Only ()] -> IO ()
    ignoreRows _ = pure ()

-- | Release the advisory lock @lockId@ with @pg_advisory_unlock@.
-- The boolean result is ignored.
runPGSAdvisoryUnlock :: PGS.Connection -> Int -> IO ()
runPGSAdvisoryUnlock c lockId =
  runPGSQuery c [sql| SELECT pg_advisory_unlock(?) |] (PGS.Only lockId) >>= ignoreRows
  where
    ignoreRows :: [PGS.Only Bool] -> IO ()
    ignoreRows _ = pure ()

-- | Acquire the transaction-level advisory lock @lockId@ with
-- @pg_advisory_xact_lock@. PostgreSQL releases it at the end of the
-- current transaction.
runPGSAdvisoryXactLock :: PGS.Connection -> Int -> IO ()
runPGSAdvisoryXactLock c lockId =
  runPGSQuery c [sql| SELECT pg_advisory_xact_lock(?) |] (PGS.Only lockId) >>= ignoreRows
  where
    ignoreRows :: [PGS.Only ()] -> IO ()
    ignoreRows _ = pure ()
322
-- | Whether the @nodes@ table has a row with the given id.
nodeExists :: PGS.Connection -> NodeId -> IO Bool
nodeExists c nId = do
  rows <- runPGSQuery c [sql| SELECT true FROM nodes WHERE id = ? LIMIT 1 |] (PGS.Only nId)
  pure (rows == [PGS.Only True])
326
-- | Ids of all nodes whose @typename@ is the id of the given 'NodeType'.
getNodesIdWithType :: PGS.Connection -> NodeType -> IO [NodeId]
getNodesIdWithType c nt =
  fmap (NodeId . PGS.fromOnly) <$> runPGSQuery c query (PGS.Only (nodeTypeId nt))
  where
    query :: PGS.Query
    query = [sql| SELECT id FROM nodes WHERE typename = ? |]
334
335
336
337 -- nodeStoryTable :: Table NodeStoryRead NodeStoryWrite
338 -- nodeStoryTable =
339 -- Table "node_stories"
340 -- ( pNodeStory NodeStoryDB { node_id = tableField "node_id"
341 -- , version = tableField "version"
342 -- , ngrams_type_id = tableField "ngrams_type_id"
343 -- , ngrams_id = tableField "ngrams_id"
344 -- , ngrams_repo_element = tableField "ngrams_repo_element"
345 -- } )
346
347 -- nodeStoryArchiveTable :: Table NodeStoryArchiveRead NodeStoryArchiveWrite
348 -- nodeStoryArchiveTable =
349 -- Table "node_story_archive_history"
350 -- ( pNodeArchiveStory NodeStoryArchiveDB { a_node_id = tableField "node_id"
351 -- , archive = tableField "archive" } )
352
353 -- nodeStorySelect :: Select NodeStoryRead
354 -- nodeStorySelect = selectTable nodeStoryTable
355
-- | History of a node read from @node_story_archive_history@. It is
-- ordered by (version, row id) descending, i.e. most recent first, like
-- '_a_history'. Each row becomes a patch touching one single term.
getNodeArchiveHistory :: PGS.Connection -> NodeId -> IO [NgramsStatePatch']
getNodeArchiveHistory c nodeId = do
  rows <- runPGSQuery c query (PGS.Only nodeId) :: IO [(TableNgrams.NgramsType, NgramsTerm, NgramsPatch)]
  pure (map rowToPatch rows)
  where
    rowToPatch :: (TableNgrams.NgramsType, NgramsTerm, NgramsPatch) -> NgramsStatePatch'
    rowToPatch (ngramsType, term, patch) =
      let tablePatch = NgramsTablePatch (fst (PM.singleton term patch))
      in  fst (PM.singleton ngramsType tablePatch)

    query :: PGS.Query
    query = [sql| SELECT ngrams_type_id, terms, patch
                  FROM node_story_archive_history
                  JOIN ngrams ON ngrams.id = ngrams_id
                  WHERE node_id = ?
                  ORDER BY (version, node_story_archive_history.id) DESC |]
368
-- | Look up the id of an ngram by its @terms@ text (one parameter).
ngramsIdQuery :: PGS.Query
ngramsIdQuery =
  [sql| SELECT id FROM ngrams WHERE terms = ? |]
371
372
-- | Append patches to @node_story_archive_history@, every row tagged with
-- the given version. Patches are inserted in list order, one batch per
-- patch. Terms that have no row in @ngrams@ are skipped.
insertNodeArchiveHistory :: PGS.Connection -> NodeId -> Version -> [NgramsStatePatch'] -> IO ()
insertNodeArchiveHistory c nodeId version = mapM_ insertPatch
  where
    insertPatch :: NgramsStatePatch' -> IO ()
    insertPatch h = do
      let entries = [ (nType, term, p)
                    | (nType, NgramsTablePatch patch) <- PM.toList h
                    , (term, p) <- PM.toList patch ]
      rows <- catMaybes <$> mapM resolveTerm entries
      _ <- runPGSExecuteMany c query rows
      pure ()

    -- Attach the ngrams id of the term, if the term is known.
    resolveTerm :: (TableNgrams.NgramsType, NgramsTerm, NgramsPatch)
                -> IO (Maybe (NodeId, TableNgrams.NgramsType, Int, NgramsPatch, Version))
    resolveTerm (nType, term, p) = do
      ids <- runPGSQuery c ngramsIdQuery (PGS.Only term)
      pure $ (\(PGS.Only termId) -> (nodeId, nType, termId, p, version)) <$> headMay ids

    query :: PGS.Query
    query = [sql| INSERT INTO node_story_archive_history(node_id, ngrams_type_id, ngrams_id, patch, version)
                  VALUES (?, ?, ?, ?, ?) |]
392
-- | Load the story of one node from @node_stories@.
--
-- The table holds one row per (ngrams type, term); the rows are merged
-- into a single 'Archive' with an empty history. History lives in
-- @node_story_archive_history@; see 'getNodeArchiveHistory'.
--
-- The result always has an entry for the requested node. When no rows are
-- found, that entry is an empty archive at version 0.
--
-- Panics when the rows of the node do not all carry the same version.
getNodeStory :: PGS.Connection -> NodeId -> IO NodeListStory
getNodeStory c nId@(NodeId nodeId) = do
  res <- runPGSQuery c nodeStoriesQuery (PGS.Only nodeId) :: IO [(Version, TableNgrams.NgramsType, NgramsTerm, NgramsRepoElement)]
  -- NOTE Sanity check: all versions in the DB should be the same
  let versionsS = Set.fromList [ v | (v, _, _, _) <- res ]
  if Set.size versionsS > 1
    then panic $ Text.pack $ "[getNodeStory] versions for " <> show nodeId <> " differ! " <> show versionsS
    else pure ()
  -- Build {<ngrams_type>: {<term>: <repo element>}} directly with a strict
  -- map. This avoids a lazy 'foldl' over one singleton 'Archive' per row.
  -- On a duplicated (type, term), the later row wins.
  let storyVersion =
        case Set.lookupMin versionsS of
          Nothing -> 0
          Just v  -> v
      ngramsState =
        Map.fromListWith (<>)
          [ (ngramsType, Map.singleton ngrams nre) | (_, ngramsType, ngrams, nre) <- res ]
  pure $ NodeStory $ Map.singleton nId $
    Archive { _a_version = storyVersion
            , _a_state   = ngramsState
            , _a_history = [] }
416
-- | Rows of @node_stories@ for one node (one parameter: the node id),
-- joined with @ngrams@ to get the term text. Columns: version, ngrams
-- type, term, repo element.
nodeStoriesQuery :: PGS.Query
nodeStoriesQuery =
  [sql| SELECT version, ngrams_type_id, terms, ngrams_repo_element
          FROM node_stories
          JOIN ngrams ON ngrams.id = ngrams_id
         WHERE node_id = ? |]
422
-- | Flat form of an 'NgramsState'': one (ngrams type, term, repo element)
-- triple per term.
type ArchiveStateList = [(TableNgrams.NgramsType, NgramsTerm, NgramsRepoElement)]

-- | Flatten a state, ordered by ngrams type and then by term.
archiveStateAsList :: NgramsState' -> ArchiveStateList
archiveStateAsList s =
  [ (nt, term, nre)
  | (nt, terms) <- Map.toList s
  , (term, nre) <- Map.toList terms ]

-- | Rebuild a state from its flat form. For a duplicated (type, term),
-- the later triple wins.
archiveStateFromList :: ArchiveStateList -> NgramsState'
archiveStateFromList l =
  Map.fromListWith (<>) [ (nt, Map.singleton t nre) | (nt, t, nre) <- l ]
432
-- | Insert every (ngrams type, term) entry of the archive state into
-- @node_stories@ for the given node, tagged with the archive version.
-- The archive history is not written here; see 'insertNodeArchiveHistory'.
--
-- Terms that have no row in the @ngrams@ table are skipped.
insertNodeStory :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
insertNodeStory c (NodeId nId) a =
  mapM_ insertEntry (archiveStateAsList (a ^. a_state))
  where
    insertEntry :: (TableNgrams.NgramsType, NgramsTerm, NgramsRepoElement) -> IO ()
    insertEntry (ngramsType, ngrams, ngramsRepoElement) = do
      termIdM <- runPGSQuery c ngramsIdQuery (PGS.Only ngrams) :: IO [PGS.Only Int64]
      case headMay termIdM of
        Nothing -> pure ()
        Just (PGS.Only termId) -> do
          _ <- runPGSExecute c query (nId, a ^. a_version, ngramsType, termId, ngramsRepoElement)
          pure ()

    -- Five parameters are bound per row, including the version, so the
    -- statement lists five columns and five placeholders.
    query :: PGS.Query
    query = [sql| INSERT INTO node_stories(node_id, version, ngrams_type_id, ngrams_id, ngrams_repo_element)
                  VALUES (?, ?, ?, ?, ?) |]
457
-- | Insert the archive state into @node_stories@, one statement per
-- (ngrams type, term), tagged with the archive version. The term id is
-- resolved inside the statement. Because of the join with @nodes@,
-- nothing is inserted for an unknown node or an unknown term.
insertArchiveList :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
insertArchiveList c nodeId a =
  mapM_ insertRow (archiveStateAsList (a ^. a_state))
  where
    insertRow :: (TableNgrams.NgramsType, NgramsTerm, NgramsRepoElement) -> IO Int64
    insertRow (nt, n, nre) = runPGSExecute c query (nodeId, a ^. a_version, nt, nre, n)

    query :: PGS.Query
    query = [sql| WITH s as (SELECT ? as sid, ? sversion, ? sngrams_type_id, ngrams.id as sngrams_id, ?::jsonb as srepo FROM ngrams WHERE terms = ?)
                  INSERT INTO node_stories(node_id, version, ngrams_type_id, ngrams_id, ngrams_repo_element)
                  SELECT s.sid, s.sversion, s.sngrams_type_id, s.sngrams_id, s.srepo from s s join nodes n on s.sid = n.id
                |]
469
-- | Delete from @node_stories@ the node's rows matching each
-- (ngrams type, term) of the archive state. The repo elements and the
-- version of the given archive are not used.
deleteArchiveList :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
deleteArchiveList c nodeId a =
  mapM_ deleteEntry (archiveStateAsList (a ^. a_state))
  where
    deleteEntry (nt, n, _) = runPGSExecute c query (nodeId, nt, n)

    query :: PGS.Query
    query = [sql| DELETE FROM node_stories
                  WHERE node_id = ? AND ngrams_type_id = ? AND ngrams_id IN (SELECT id FROM ngrams WHERE terms = ?) |]
479
-- | Overwrite the repo element of each (ngrams type, term) of the archive
-- state in @node_stories@, with one UPDATE per entry. The stored version
-- is not changed.
updateArchiveList :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
updateArchiveList c nodeId a =
  mapM_ (runPGSExecute c query . toParams) (archiveStateAsList (a ^. a_state))
  where
    -- Parameter order follows the placeholders: SET value first.
    toParams (nt, n, nre) = (nre, nodeId, nt, n)

    query :: PGS.Query
    query = [sql| UPDATE node_stories
                  SET ngrams_repo_element = ?
                  WHERE node_id = ? AND ngrams_type_id = ? AND ngrams_id IN (SELECT id FROM ngrams WHERE terms = ?) |]
492
-- | Move the @node_stories@ rows of a node from @currentArchive@ to
-- @newArchive@ by applying only the differences between their states:
--
-- * entries only in the new state are inserted,
-- * entries only in the current state are deleted,
-- * entries present in both with a different repo element are updated.
--
-- The caller is assumed to hold the node's advisory lock, as
-- 'upsertNodeStories' does. Written rows carry the new version; the
-- histories of both archives are ignored.
updateNodeStory :: PGS.Connection -> NodeId -> ArchiveList -> ArchiveList -> IO ()
updateNodeStory c nodeId currentArchive newArchive = do
  let currentEntries = entryMap currentArchive
      newEntries     = entryMap newArchive
      inserts = Map.difference newEntries currentEntries
      deletes = Map.difference currentEntries newEntries
      -- Only entries present on both sides whose value changed.
      -- Newly inserted entries need no extra UPDATE.
      updates = Map.mapMaybe changed (Map.intersectionWith (,) currentEntries newEntries)

  insertArchiveList c nodeId (toArchive inserts)
  --TODO Use currentArchive ^. a_version in delete and report error
  -- if entries with (node_id, ngrams_type_id, ngrams_id) but
  -- different version are found.
  deleteArchiveList c nodeId (toArchive deletes)
  updateArchiveList c nodeId (toArchive updates)
  where
    -- State keyed by (ngrams type, term).
    entryMap :: ArchiveList -> Map (TableNgrams.NgramsType, NgramsTerm) NgramsRepoElement
    entryMap arch =
      Map.fromList [ ((nt, n), nre) | (nt, n, nre) <- archiveStateAsList (arch ^. a_state) ]

    changed :: (NgramsRepoElement, NgramsRepoElement) -> Maybe NgramsRepoElement
    changed (old, new) = if old == new then Nothing else Just new

    -- Wrap a set of entries into an archive carrying the new version.
    toArchive :: Map (TableNgrams.NgramsType, NgramsTerm) NgramsRepoElement -> ArchiveList
    toArchive entries =
      Archive { _a_version = newArchive ^. a_version
              , _a_history = []
              , _a_state   = archiveStateFromList [ (nt, n, nre) | ((nt, n), nre) <- Map.toList entries ] }
542
543 -- nodeStoryRemove :: Pool PGS.Connection -> NodeId -> IO Int64
544 -- nodeStoryRemove pool (NodeId nId) = withResource pool $ \c -> runDelete c delete
545 -- where
546 -- delete = Delete { dTable = nodeStoryTable
547 -- , dWhere = (\row -> node_id row .== sqlInt4 nId)
548 -- , dReturning = rCount }
549
-- | Persist the archive of one node inside a single transaction, holding
-- the transaction-level advisory lock keyed by the node id:
--
-- 1. the pending history ('_a_history') is appended to
--    @node_story_archive_history@, oldest first;
-- 2. the stored state is replaced by the new one;
-- 3. the node's rows are stamped with the new version.
upsertNodeStories :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
upsertNodeStories c nodeId@(NodeId nId) newArchive = do
  printDebug "[upsertNodeStories] START nId" nId
  PGS.withTransaction c $ do
    printDebug "[upsertNodeStories] locking nId" nId
    runPGSAdvisoryXactLock c nId

    -- In memory the most recent patch comes first; store oldest first.
    insertNodeArchiveHistory c nodeId (newArchive ^. a_version) (reverse (newArchive ^. a_history))

    NodeStory stories <- getNodeStory c nodeId
    -- NOTE(review): 'getNodeStory' always returns an entry for the node
    -- (an empty archive when it has no rows), so the insert branch is
    -- currently never taken.
    maybe (insertNodeStory c nodeId newArchive)
          (\currentArchive -> updateNodeStory c nodeId currentArchive newArchive)
          (Map.lookup nodeId stories)

    fixNodeStoryVersion c nodeId newArchive

  printDebug "[upsertNodeStories] STOP nId" nId
575
-- | Set the version of the node's @node_stories@ rows to the archive
-- version, for every ngrams type present in the new archive state.
-- Nothing is executed when the state has no ngrams type.
fixNodeStoryVersion :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
fixNodeStoryVersion c nodeId newArchive =
  case Map.keys (newArchive ^. a_state) of
    [] -> pure ()
    ngramsTypes -> do
      -- SQL @IN@ needs a parenthesised list; 'PGS.In' renders one. A bare
      -- value bound to @IN ?@ is rendered without parentheses, which is
      -- invalid SQL.
      _ <- runPGSExecute c query (newArchive ^. a_version, nodeId, PGS.In ngramsTypes)
      pure ()
  where
    query :: PGS.Query
    query = [sql|UPDATE node_stories
                 SET version = ?
                 WHERE node_id = ?
                 AND ngrams_type_id IN ?|]
587
-- | Persist every archive of the story, one node at a time; each node is
-- written in its own transaction by 'upsertNodeStories'.
writeNodeStories :: PGS.Connection -> NodeListStory -> IO ()
writeNodeStories c (NodeStory nls) =
  mapM_ (\(nId, a) -> upsertNodeStories c nId a) (Map.toList nls)
592
-- | Ensure the story of @nId@ is loaded.
--
-- * With no story, only that node is loaded from the database.
-- * A story that already has the node is returned unchanged.
-- * Otherwise the node is loaded and added to the story.
nodeStoryInc :: PGS.Connection -> Maybe NodeListStory -> NodeId -> IO NodeListStory
nodeStoryInc c Nothing nId = getNodeStory c nId
nodeStoryInc c (Just ns@(NodeStory nls)) nId
  | Map.member nId nls = pure ns
  | otherwise = do
      NodeStory loaded <- getNodeStory c nId
      pure (NodeStory (Map.union nls loaded))
602
-- | Load the stories of all given nodes, starting from an optional story.
-- Nodes already present are not reloaded; with nothing to start from and
-- no nodes, the result is an empty story.
nodeStoryIncs :: PGS.Connection -> Maybe NodeListStory -> [NodeId] -> IO NodeListStory
nodeStoryIncs c mStory nIds =
  case (mStory, nIds) of
    (Nothing, []) -> pure (NodeStory Map.empty)
    (Nothing, firstId : rest) ->
      getNodeStory c firstId >>= \loaded -> nodeStoryIncs c (Just loaded) rest
    (Just story, _) ->
      foldM (\acc n -> nodeStoryInc c (Just acc) n) story nIds
609
610 -- nodeStoryDec :: Pool PGS.Connection -> NodeListStory -> NodeId -> IO NodeListStory
611 -- nodeStoryDec pool ns@(NodeStory nls) ni = do
612 -- case Map.lookup ni nls of
613 -- Nothing -> do
614 -- _ <- nodeStoryRemove pool ni
615 -- pure ns
616 -- Just _ -> do
617 -- let ns' = Map.filterWithKey (\k _v -> k /= ni) nls
618 -- _ <- nodeStoryRemove pool ni
619 -- pure $ NodeStory ns'
620 ------------------------------------
621
-- | Build the node story environment:
--
-- * a shared variable, initially holding an empty story;
-- * an immediate saver that writes the whole story and then clears the
--   in-memory history;
-- * a debounced saver wrapping the immediate one ('mkNodeStorySaver');
-- * a getter that loads more nodes into the shared variable.
readNodeStoryEnv :: Pool PGS.Connection -> IO NodeStoryEnv
readNodeStoryEnv pool = do
  mvar <- nodeStoryVar pool Nothing []
  let saveNow :: IO ()
      saveNow = modifyMVar_ mvar $ \ns ->
        -- The MVar is taken first, then a pool connection is borrowed.
        withResource pool $ \c -> do
          writeNodeStories c ns
          pure (clearHistory ns)
  debouncedSave <- mkNodeStorySaver saveNow
  pure $ NodeStoryEnv { _nse_var             = mvar
                      , _nse_saver           = debouncedSave
                      , _nse_saver_immediate = saveNow
                      , _nse_getter          = nodeStoryVar pool (Just mvar) }
642
-- | Return a shared story variable holding at least the stories of the
-- given nodes.
--
-- * Without a variable, the stories are loaded and a new 'MVar' is made.
-- * With a variable, missing stories are loaded into it in place.
--
-- Lock order: the 'MVar' is taken BEFORE a pool connection is borrowed,
-- the same order as the saver built in 'readNodeStoryEnv'. Borrowing the
-- connection first would let callers hold connections while waiting for
-- the 'MVar', while the saver holds the 'MVar' waiting for a connection.
-- That deadlocks once the pool is exhausted.
nodeStoryVar :: Pool PGS.Connection
             -> Maybe (MVar NodeListStory)
             -> [NodeId]
             -> IO (MVar NodeListStory)
nodeStoryVar pool Nothing nIds = do
  state <- withResource pool $ \c -> nodeStoryIncs c Nothing nIds
  newMVar state
nodeStoryVar pool (Just mv) nIds = do
  modifyMVar_ mv $ \nsl ->
    withResource pool $ \c -> nodeStoryIncs c (Just nsl) nIds
  pure mv
655
-- | Wrap a saver into a debounced action with a debounce frequency of one
-- minute. Saving can thus be delayed and run asynchronously, while
-- callers keep working on the in-memory story only.
--
-- NOTE: the given saver should lock the story 'MVar' before borrowing a
-- pool connection. Otherwise it could wait for the 'MVar' while blocking
-- a connection (see 'readNodeStoryEnv').
mkNodeStorySaver :: IO () -> IO (IO ())
mkNodeStorySaver saver =
  mkDebounce defaultDebounceSettings { debounceAction = saver
                                     , debounceFreq   = oneMinute }
  where
    -- 'debounceFreq' is expressed in microseconds.
    oneMinute :: Int
    oneMinute = 60 * 10 ^ (6 :: Int)
679
-- | Empty the in-memory history of every archive. 'readNodeStoryEnv'
-- does this once the story has been written to the database.
clearHistory :: NodeListStory -> NodeListStory
clearHistory (NodeStory ns) = NodeStory (Map.map (a_history .~ []) ns)
684
-- | Version of a list as stored in @node_stories@. This is 0 when the list
-- has no rows, because 'getNodeStory' then returns an empty archive.
--
-- NOTE(review): this reads the database, not the shared in-memory story,
-- so changes not yet flushed by the saver are not reflected.
currentVersion :: (HasNodeStory env err m) => ListId -> m Version
currentVersion listId = do
  pool  <- view connPool
  story <- withResource pool $ \conn -> liftBase (getNodeStory conn listId)
  pure (story ^. unNodeStory . at listId . _Just . a_version)
690
691
692 -- mkNodeStorySaver :: MVar NodeListStory -> Cmd err (Cmd err ())
693 -- mkNodeStorySaver mvns = mkDebounce settings
694 -- where
695 -- settings = defaultDebounceSettings
696 -- { debounceAction = withMVar mvns (\ns -> writeNodeStories ns)
697 -- , debounceFreq = 1 * minute
698 -- -- , debounceEdge = trailingEdge -- Trigger on the trailing edge
699 -- }
700 -- minute = 60 * second
701 -- second = 10^(6 :: Int)
702
703
704 -----------------------------------------