2 Module : Gargantext.Core.NodeStory
3 Description : Node API generation
4 Copyright : (c) CNRS, 2017-Present
5 License : AGPL + CECILL v3
6 Maintainer : team@gargantext.org
7 Stability : experimental
10 A Node Story is a Map between NodeId and an Archive (with state,
11 version and history) for that node.
A couple of words on how this is implemented.
15 First version used files which stored Archive for each NodeId in a
18 For performance reasons, it is rewritten to use the DB.
20 The table `node_stories` contains two columns: `node_id` and
Next, it was observed that `a_history` in `Archive` takes up a lot of
space. So a new table was created, `node_story_archive_history`, with
columns `node_id`, `ngrams_type_id` and `patch` (plus `ngrams_id` and
`version`, see `insertNodeArchiveHistory`). This is because each
history item is in fact a map from `NgramsType` to `NgramsTablePatch`
(see the `NgramsStatePatch'` type).
Moreover, since `G.A.Ngrams.commitStatePatch` only uses the current
state and the most recent history items, I concluded that it is not
necessary to load the whole history into memory. Instead, the history
is kept in the DB (it is immutable) and only recent changes are added
to `a_history`; that field is cleared whenever the `Archive` is saved.
43 {-# OPTIONS_GHC -fno-warn-orphans #-}
44 {-# LANGUAGE Arrows #-}
45 {-# LANGUAGE ConstraintKinds #-}
46 {-# LANGUAGE QuasiQuotes #-}
47 {-# LANGUAGE TemplateHaskell #-}
49 module Gargantext.Core.NodeStory
57 , HasNodeStoryImmediateSaver
58 , hasNodeStoryImmediateSaver
59 , HasNodeArchiveStoryImmediateSaver
60 , hasNodeArchiveStoryImmediateSaver
64 , initNodeListStoryMock
70 , nse_archive_saver_immediate
73 , getNodesArchiveHistory
82 , runPGSAdvisoryUnlock
83 , runPGSAdvisoryXactLock
90 , archiveStateFromList
92 , fixNodeStoryVersions )
95 -- import Debug.Trace (traceShow)
96 import Codec.Serialise.Class
97 import Control.Concurrent (MVar(), newMVar, modifyMVar_)
98 import Control.Debounce (mkDebounce, defaultDebounceSettings, debounceFreq, debounceAction)
99 import Control.Exception (catch, throw, SomeException(..))
100 import Control.Lens (makeLenses, Getter, (^.), (.~), (%~), _Just, at, traverse, view)
101 import Control.Monad.Except
102 import Control.Monad.Reader
103 import Data.Aeson hiding ((.=), decode)
104 import Data.ByteString.Char8 (hPutStrLn)
105 import Data.HashMap.Strict (HashMap)
106 import Data.Map.Strict (Map)
107 import Data.Maybe (catMaybes)
109 import Data.Pool (Pool, withResource)
110 import Data.Profunctor.Product.TH (makeAdaptorAndInstance)
111 import Data.Semigroup
112 import Database.PostgreSQL.Simple.FromField (FromField(fromField), fromJSONField)
113 import Database.PostgreSQL.Simple.SqlQQ (sql)
114 import Database.PostgreSQL.Simple.Types (Values(..), QualifiedIdentifier(..))
115 import GHC.Generics (Generic)
116 import Gargantext.API.Ngrams.Types
117 import Gargantext.Core.Types (ListId, NodeId(..), NodeType)
118 import Gargantext.Core.Utils.Prefix (unPrefix)
119 import Gargantext.Database.Admin.Config (nodeTypeId)
120 import Gargantext.Database.Prelude (CmdM', HasConnectionPool(..), HasConfig)
121 import Gargantext.Database.Query.Table.Node.Error (HasNodeError())
122 import Gargantext.Database.Schema.Ngrams (NgramsType)
123 import Gargantext.Prelude
124 import Opaleye (DefaultFromField(..), SqlJsonb, fromPGSFromField)
125 import System.IO (stderr)
126 import qualified Data.HashMap.Strict as HashMap
127 import qualified Data.Map.Strict as Map
128 import qualified Data.Map.Strict.Patch as PM
129 import qualified Data.Set as Set
130 import qualified Data.Text as Text
131 import qualified Database.PostgreSQL.Simple as PGS
132 import qualified Gargantext.Database.Query.Table.Ngrams as TableNgrams
134 ------------------------------------------------------------------------
-- | Everything needed to work with node stories: the shared in-memory
-- 'NodeListStory' plus the actions that load and save it.
-- 'readNodeStoryEnv' builds one from a connection pool.
135 data NodeStoryEnv = NodeStoryEnv
-- The cached stories of the loaded nodes.
136 { _nse_var :: !(MVar NodeListStory)
-- NOTE(review): presumably the debounced saver that 'readNodeStoryEnv'
-- obtains from 'mkNodeStorySaver'; confirm.
137 , _nse_saver :: !(IO ())
-- Writes the cached stories to the DB right away ('writeNodeStories').
138 , _nse_saver_immediate :: !(IO ())
-- Inserts each node's in-memory history into node_story_archive_history
-- and returns the story with its history cleared.
139 , _nse_archive_saver_immediate :: !(NodeListStory -> IO NodeListStory)
-- Loads the stories of the given nodes into the shared MVar (via
-- 'nodeStoryVar') and returns that MVar.
140 , _nse_getter :: !([NodeId] -> IO (MVar NodeListStory))
141 --, _nse_cleaner :: !(IO ()) -- every 12 hours: cleans the repos of unused NodeStories
142 -- , _nse_lock :: !FileLock -- TODO (it depends on the option: if with database or file only)
-- | Constraint bundle for monads that work with node stories.
--
-- It includes (at least) 'CmdM'', 'HasNodeStoryEnv' and 'HasConnectionPool'.
-- The latter is what lets 'currentVersion' and 'fixNodeStoryVersions' use
-- @view connPool@.
146 type HasNodeStory env err m = ( CmdM' env err m
149 , HasNodeStoryEnv env
151 , HasConnectionPool env
-- | Environments that expose a complete 'NodeStoryEnv'.
--
-- Having one implies the narrower 'HasNodeStoryVar' and
-- 'HasNodeStorySaver' capabilities.
class (HasNodeStoryVar env, HasNodeStorySaver env) => HasNodeStoryEnv env where
  hasNodeStory :: Getter env NodeStoryEnv

-- | Environments exposing a function from node ids to the 'MVar' that
-- holds their 'NodeListStory' (the shape of '_nse_getter').
class HasNodeStoryVar env where
  hasNodeStoryVar :: Getter env ([NodeId] -> IO (MVar NodeListStory))

-- | Environments exposing an @IO ()@ action that saves node stories
-- (the shape of '_nse_saver').
class HasNodeStorySaver env where
  hasNodeStorySaver :: Getter env (IO ())

-- | Environments exposing the immediate saver
-- (the shape of '_nse_saver_immediate').
class HasNodeStoryImmediateSaver env where
  hasNodeStoryImmediateSaver :: Getter env (IO ())

-- | Environments exposing the immediate archive-history saver
-- (the shape of '_nse_archive_saver_immediate').
class HasNodeArchiveStoryImmediateSaver env where
  hasNodeArchiveStoryImmediateSaver :: Getter env (NodeListStory -> IO NodeListStory)
171 ------------------------------------------------------------------------
-- | A node story: maps each 'NodeId' to the 'Archive' (version, state and
-- recent history) of that node.
--
-- TODO: generalize to any 'NodeType'; only lists ('NodeListStory') are
-- implemented for now.
newtype NodeStory s p = NodeStory { _unNodeStory :: Map NodeId (Archive s p) }
  deriving (Generic, Show)

-- The instances below rely on the 'Generic'-based default methods.
instance (Serialise s, Serialise p) => Serialise (NodeStory s p)
instance (FromJSON s, FromJSON p) => FromJSON (NodeStory s p)
instance (ToJSON s, ToJSON p) => ToJSON (NodeStory s p)
-- | Archive of a single node. It holds:
--
-- * its version ('_a_version');
-- * its current state ('_a_state');
-- * its recent patches ('_a_history'), most recent first, which have not
--   yet been written to node_story_archive_history.
--
-- The full history lives in the DB (see 'insertNodeArchiveHistory').
184 data Archive s p = Archive
185 { _a_version :: !Version
188 -- first patch in the list is the most recent
189 -- We use `take` in `commitStatePatch`, that's why.
191 -- History is immutable, we just insert things on top of existing
194 -- We don't need to store the whole history in memory, this
195 -- structure holds only recent history, the one that will be
196 -- inserted to the DB.
198 deriving (Generic, Show)
-- | Binary serialisation of archives via the 'Generic' default methods.
instance (Serialise s, Serialise p) => Serialise (Archive s p)

-- | The node story used for lists.
type NodeListStory = NodeStory NgramsState' NgramsStatePatch'

-- | State of a list: one ngrams table per 'TableNgrams.NgramsType'.
type NgramsState' = Map TableNgrams.NgramsType NgramsTableMap

-- | A patch of a list state: one table patch per 'TableNgrams.NgramsType'.
type NgramsStatePatch' = PatchMap TableNgrams.NgramsType NgramsTablePatch

instance Serialise NgramsStatePatch'
-- | Decode a list 'Archive' from a JSON column using its 'FromJSON'
-- instance ('fromJSONField').
208 instance FromField (Archive NgramsState' NgramsStatePatch')
210 fromField = fromJSONField
-- | Let Opaleye read a jsonb column as a list 'Archive'. This reuses the
-- postgresql-simple 'FromField' decoder through 'fromPGSFromField'.
211 instance DefaultFromField SqlJsonb (Archive NgramsState' NgramsStatePatch')
213 defaultFromField = fromPGSFromField
-- | Merge two 'NgramsState''s key by key.
--
-- The plain @(<>)@ on the outer 'Map' is a left-biased union: for an
-- 'TableNgrams.NgramsType' present in both arguments it would keep only
-- the left table and drop the right one entirely. Here the two tables are
-- combined with their own @(<>)@ instead. For the inner maps that is again
-- a left-biased union, now on individual terms.
-- (https://hackage.haskell.org/package/containers-0.6.6/docs/Data-Map-Internal.html#v:union)
combineState :: NgramsState' -> NgramsState' -> NgramsState'
combineState lhs rhs = Map.unionWith (<>) lhs rhs
-- | @a1 <> a2@ keeps the version of @a2@ and puts the history of @a2@ in
-- front of that of @a1@ (@p' <> p@). When @a2@ is the newer archive, this
-- preserves the most-recent-first order of '_a_history'.
-- NOTE: states are not concatenated here; 'getNodeStory' merges states
-- explicitly with 'combineState'.
222 instance (Semigroup s, Semigroup p) => Semigroup (Archive s p) where
223 (<>) (Archive { _a_history = p }) (Archive { _a_version = v'
225 , _a_history = p' }) =
226 Archive { _a_version = v'
228 , _a_history = p' <> p }
-- | The empty archive has version 0.
229 instance (Monoid s, Semigroup p) => Monoid (Archive s p) where
230 mempty = Archive { _a_version = 0
-- | Aeson options shared by the 'Archive' JSON instances: the generic
-- encoding with field names processed by 'unPrefix' for the @_a_@ prefix.
archiveJsonOptions :: Options
archiveJsonOptions = unPrefix "_a_"

instance (FromJSON s, FromJSON p) => FromJSON (Archive s p) where
  parseJSON = genericParseJSON archiveJsonOptions

instance (ToJSON s, ToJSON p) => ToJSON (Archive s p) where
  toJSON     = genericToJSON archiveJsonOptions
  toEncoding = genericToEncoding archiveJsonOptions
239 ------------------------------------------------------------------------
-- | A story holding exactly one node, @ni@, whose archive is 'initArchive'.
initNodeStory :: (Monoid s, Semigroup p) => NodeId -> NodeStory s p
initNodeStory ni = NodeStory (Map.fromList [(ni, initArchive)])
-- | Initial (empty) archive for a node.
-- NOTE(review): presumably defined as 'mempty' (version 0); confirm.
243 initArchive :: (Monoid s, Semigroup p) => Archive s p
-- | A mock 'NodeListStory' (presumably for tests). It contains a single
-- list node with a version-0 archive whose state holds the elements of
-- 'mockTable' under 'TableNgrams.NgramsTerms'. Each element is keyed by
-- its ngrams and converted with 'ngramsElementToRepo'.
-- NOTE(review): the value of nodeListId is bound in the local where
-- clause; confirm it.
246 initNodeListStoryMock :: NodeListStory
247 initNodeListStoryMock = NodeStory $ Map.singleton nodeListId archive
250 archive = Archive { _a_version = 0
251 , _a_state = ngramsTableMap
253 ngramsTableMap = Map.singleton TableNgrams.NgramsTerms
255 [ (n ^. ne_ngrams, ngramsElementToRepo n)
256 | n <- mockTable ^. _NgramsTable
259 ------------------------------------------------------------------------
260 ------------------------------------------------------------------------
-- Template Haskell splices split the module into declaration groups. The
-- lenses generated here (e.g. 'unNodeStory', 'nse_archive_saver_immediate')
-- are therefore only in scope for code that comes after these splices.
261 -- | Lenses at the bottom of the file because Template Haskell would reorder order of execution in others cases
262 makeLenses ''NodeStoryEnv
263 makeLenses ''NodeStory
266 ----------------------------------------------------------------------
-- | Row type polymorphic in each column type.
-- NOTE(review): the type parameters presumably map to the node_stories
-- columns node_id, version, ngrams_type_id, ngrams_id and
-- ngrams_repo_element (the columns used by the SQL in this module).
-- Confirm this.
267 data NodeStoryPoly nid v ngtid ngid nre =
268 NodeStoryDB { node_id :: !nid
270 , ngrams_type_id :: !ngtid
272 , ngrams_repo_element :: !nre }
-- | A node id paired with an archive value, polymorphic in both.
275 data NodeStoryArchivePoly nid a =
276 NodeStoryArchiveDB { a_node_id :: !nid
-- Generate the product-profunctor adaptors (pNodeStory, pNodeArchiveStory)
-- and Default instances for the two row types.
280 $(makeAdaptorAndInstance "pNodeStory" ''NodeStoryPoly)
281 $(makeAdaptorAndInstance "pNodeArchiveStory" ''NodeStoryArchivePoly)
-- Opaleye column-type aliases, kept commented out for reference.
283 -- type NodeStoryWrite = NodeStoryPoly (Column SqlInt4) (Column SqlInt4) (Column SqlInt4) (Column SqlInt4) (Column SqlJsonb)
284 -- type NodeStoryRead = NodeStoryPoly (Column SqlInt4) (Column SqlInt4) (Column SqlInt4) (Column SqlInt4) (Column SqlJsonb)
286 -- type NodeStoryArchiveWrite = NodeStoryArchivePoly (Column SqlInt4) (Column SqlJsonb)
287 -- type NodeStoryArchiveRead = NodeStoryArchivePoly (Column SqlInt4) (Column SqlJsonb)
-- | The concrete archive type used for lists.
289 type ArchiveList = Archive NgramsState' NgramsStatePatch'
-- | Run a statement with 'PGS.execute' and return the number of affected
-- rows.
--
-- On failure, the statement rendered with its parameters is written to
-- stderr and the original exception is re-thrown unchanged. Callers, and
-- 'PGS.withTransaction' around them, therefore see the real database
-- error (e.g. a 'PGS.SqlError') rather than a generic panic.
runPGSExecute :: (PGS.ToRow q)
              => PGS.Connection -> PGS.Query -> q -> IO Int64
runPGSExecute c qs a = catch (PGS.execute c qs a) printError
  where
    printError err@(SomeException _) = do
      -- Rendering the statement is only a diagnostic aid. If it fails too
      -- (bad parameters, broken connection), ignore that secondary error
      -- so it cannot mask the one we re-throw.
      catch (PGS.formatQuery c qs a >>= hPutStrLn stderr)
            (\(SomeException _) -> pure ())
      throw err
-- | Run a statement once per parameter row with 'PGS.executeMany' and
-- return the number of affected rows.
--
-- On failure, the statement rendered with all its rows is written to
-- stderr and the original exception is re-thrown unchanged. It is not
-- replaced by a panic, so callers and 'PGS.withTransaction' see the real
-- database error.
runPGSExecuteMany :: (PGS.ToRow q)
                  => PGS.Connection -> PGS.Query -> [q] -> IO Int64
runPGSExecuteMany c qs a = catch (PGS.executeMany c qs a) printError
  where
    printError err@(SomeException _) = do
      -- Best-effort logging: a failure while rendering must not hide the
      -- exception that actually occurred.
      catch (PGS.formatMany c qs a >>= hPutStrLn stderr)
            (\(SomeException _) -> pure ())
      throw err
-- | Run a query with 'PGS.query' and return the decoded rows.
--
-- On failure, the query rendered with its parameters is written to stderr
-- and the original exception is re-thrown unchanged. Rendering is best
-- effort: if 'PGS.formatQuery' itself fails (for instance because of the
-- same parameter mismatch that made the query fail, or a broken
-- connection), that secondary error is ignored. It can therefore never
-- mask the error the caller should see.
runPGSQuery :: (PGS.FromRow r, PGS.ToRow q)
            => PGS.Connection -> PGS.Query -> q -> IO [r]
runPGSQuery c q a = catch (PGS.query c q a) printError
  where
    printError err@(SomeException _) = do
      catch (PGS.formatQuery c q a >>= hPutStrLn stderr)
            (\(SomeException _) -> pure ())
      throw err
-- | Acquire a session-level PostgreSQL advisory lock on the given key
-- (pg_advisory_lock), waiting until it is available. The lock is held
-- until it is explicitly unlocked or the session ends.
320 runPGSAdvisoryLock :: PGS.Connection -> Int -> IO ()
321 runPGSAdvisoryLock c id = do
322 _ <- runPGSQuery c [sql| SELECT pg_advisory_lock(?) |]
323 (PGS.Only id) :: IO [PGS.Only ()]
-- | Release a session-level advisory lock (pg_advisory_unlock). The Bool
-- returned by PostgreSQL (false when the lock was not held) is discarded.
326 runPGSAdvisoryUnlock :: PGS.Connection -> Int -> IO ()
327 runPGSAdvisoryUnlock c id = do
328 _ <- runPGSQuery c [sql| SELECT pg_advisory_unlock(?) |]
329 (PGS.Only id) :: IO [PGS.Only Bool]
-- | Acquire a transaction-level advisory lock (pg_advisory_xact_lock).
-- PostgreSQL releases it automatically when the current transaction ends,
-- so call it inside one (as 'upsertNodeStories' does, within
-- 'PGS.withTransaction').
332 runPGSAdvisoryXactLock :: PGS.Connection -> Int -> IO ()
333 runPGSAdvisoryXactLock c id = do
334 _ <- runPGSQuery c [sql| SELECT pg_advisory_xact_lock(?) |]
335 (PGS.Only id) :: IO [PGS.Only ()]
-- | Whether the nodes table has a row with the given id. The query returns
-- at most one row containing true.
338 nodeExists :: PGS.Connection -> NodeId -> IO Bool
339 nodeExists c nId = (== [PGS.Only True])
340 <$> runPGSQuery c [sql| SELECT true FROM nodes WHERE id = ? LIMIT 1 |]
-- | Ids of all nodes whose typename column equals the 'nodeTypeId' of the
-- given 'NodeType'.
343 getNodesIdWithType :: PGS.Connection -> NodeType -> IO [NodeId]
344 getNodesIdWithType c nt = do
345 ns <- runPGSQuery c query (PGS.Only $ nodeTypeId nt)
346 pure $ map (\(PGS.Only nId) -> NodeId nId) ns
349 query = [sql| SELECT id FROM nodes WHERE typename = ? |]
-- | Archived history of patches for the given nodes, read from
-- node_story_archive_history and joined with ngrams to recover the terms.
--
-- The node ids are sent as a VALUES list typed int4. Each result row
-- becomes one list element: the node id paired with a singleton map from
-- its ngrams type to a one-element list holding a singleton term/patch
-- map. Rows come in descending (version, history row id) order.
352 -- /!\ This function uses a hard-coded parameter
353 -- which depends on the Ngrams List Flow:
354 -- `version > 5` is hard-coded because, by default, the
355 -- first version of the history of manual changes is 6.
356 getNodesArchiveHistory :: PGS.Connection
358 -> IO [(NodeId, (Map NgramsType [HashMap NgramsTerm NgramsPatch]))]
359 getNodesArchiveHistory c nodesId = do
360 as <- runPGSQuery c query (PGS.Only $ Values fields nodesId)
361 :: IO [(Int, TableNgrams.NgramsType, NgramsTerm, NgramsPatch)]
363 pure $ map (\(nId, ngramsType, terms, patch)
365 , Map.singleton ngramsType [HashMap.singleton terms patch]
370 fields = [QualifiedIdentifier Nothing "int4"]
372 query = [sql| WITH nodes_id(nid) as (?)
373 SELECT node_id, ngrams_type_id, terms, patch
374 FROM node_story_archive_history
375 JOIN ngrams ON ngrams.id = ngrams_id
376 JOIN nodes_id n ON node_id = n.nid
378 ORDER BY (version, node_story_archive_history.id) DESC
-- | Look up the id of the ngrams row with the given term text
-- (one parameter: the term).
381 ngramsIdQuery :: PGS.Query
382 ngramsIdQuery = [sql| SELECT id FROM ngrams WHERE terms = ? |]
-- | Store history patches of a node in node_story_archive_history.
--
-- Patches are processed in list order, and every row is written with the
-- same given version. Each patch is handled as follows:
--
-- * It is flattened into (node, ngrams type, term, patch) tuples.
-- * The ngrams id of every term is looked up with 'ngramsIdQuery', one
--   query per term. Terms without an ngrams row are silently dropped
--   ('catMaybes').
-- * The remaining rows are sent in a single 'runPGSExecuteMany'. The
--   INSERT ... SELECT is filtered on the node existing in nodes (see the
--   linked StackOverflow answer).
--
-- NOTE(review): the per-term lookup costs one round-trip per term; a join
-- against ngrams inside the INSERT would avoid that.
385 insertNodeArchiveHistory :: PGS.Connection -> NodeId -> Version -> [NgramsStatePatch'] -> IO ()
386 insertNodeArchiveHistory _ _ _ [] = pure ()
387 insertNodeArchiveHistory c nodeId version (h:hs) = do
388 let tuples = mconcat $ (\(nType, NgramsTablePatch patch) ->
390 (nodeId, nType, term, p)) <$> PM.toList patch) <$> PM.toList h :: [(NodeId, TableNgrams.NgramsType, NgramsTerm, NgramsPatch)]
391 tuplesM <- mapM (\(nId, nType, term, patch) -> do
392 ngrams <- runPGSQuery c ngramsIdQuery (PGS.Only term)
393 pure $ (\(PGS.Only termId) -> (nId, nType, termId, term, patch)) <$> (headMay ngrams)
394 ) tuples :: IO [Maybe (NodeId, TableNgrams.NgramsType, Int, NgramsTerm, NgramsPatch)]
395 _ <- runPGSExecuteMany c query $ ((\(nId, nType, termId, _term, patch) -> (nId, nType, termId, patch, version)) <$> catMaybes tuplesM)
396 _ <- insertNodeArchiveHistory c nodeId version hs
399 -- https://stackoverflow.com/questions/39224438/postgresql-insert-if-foreign-key-exists
401 query = [sql| INSERT INTO node_story_archive_history(node_id, ngrams_type_id, ngrams_id, patch, version)
402 SELECT node_id, ngrams_type_id, ngrams_id, patch::jsonb, version FROM (
403 VALUES (?, ?, ?, ?, ?)
404 ) AS i(node_id, ngrams_type_id, ngrams_id, patch, version)
406 SELECT * FROM nodes where nodes.id = node_id
-- | Load the story of one node from node_stories.
--
-- Processing steps:
--
-- * Every row (one per ngrams type and term) becomes a one-entry 'Archive'.
-- * All rows must carry the same version; otherwise this panics.
-- * The rows are folded with @combine@, which merges states with
--   'combineState' and takes the version from the rows.
--
-- The result is always a singleton map keyed by the node id. A node
-- without rows yields 'mempty' (version 0).
--
-- NOTE(review): 'foldl' is lazy here; 'foldl'' would avoid building a chain
-- of thunks for nodes with many rows.
409 getNodeStory :: PGS.Connection -> NodeId -> IO NodeListStory
410 getNodeStory c nId@(NodeId nodeId) = do
411 --res <- withResource pool $ \c -> runSelect c query :: IO [NodeStoryPoly NodeId Version Int Int NgramsRepoElement]
412 res <- runPGSQuery c nodeStoriesQuery (PGS.Only nodeId) :: IO [(Version, TableNgrams.NgramsType, NgramsTerm, NgramsRepoElement)]
413 -- We have multiple rows with same node_id and different (ngrams_type_id, ngrams_id).
414 -- Need to create a map: {<node_id>: {<ngrams_type_id>: {<term>: <data>}}}
415 let dbData = map (\(version, ngramsType, ngrams, ngrams_repo_element) ->
416 Archive { _a_version = version
418 , _a_state = Map.singleton ngramsType $ Map.singleton ngrams ngrams_repo_element }) res
419 -- NOTE Sanity check: all versions in the DB should be the same
420 -- TODO Maybe redesign the DB so that `node_stories` has only
421 -- `node_id`, `version` and there is a M2M table
422 -- `node_stories_ngrams` without the `version` column? Then we would
423 -- have `version` in only one place.
426 let versionsS = Set.fromList $ map (\a -> a ^. a_version) dbData
427 if Set.size versionsS > 1 then
428 panic $ Text.pack $ "[getNodeStory] versions for " <> show nodeId <> " differ! " <> show versionsS
433 pure $ NodeStory $ Map.singleton nId $ foldl combine mempty dbData
435 -- NOTE (<>) for Archive doesn't concatenate states, so we have to use `combine`
436 combine a1 a2 = a1 & a_state %~ combineState (a2 ^. a_state)
437 & a_version .~ (a2 ^. a_version) -- version should be updated from list, not taken from the empty Archive
-- | Rows of node_stories for one node (the single parameter), with the term
-- text of each ngrams_id taken from ngrams.
439 nodeStoriesQuery :: PGS.Query
440 nodeStoriesQuery = [sql| SELECT version, ngrams_type_id, terms, ngrams_repo_element
442 JOIN ngrams ON ngrams.id = ngrams_id
-- | Flat form of an 'NgramsState'': one (ngrams type, term, repo element)
-- triple per entry.
type ArchiveStateList = [(TableNgrams.NgramsType, NgramsTerm, NgramsRepoElement)]

-- | The keys of an 'ArchiveStateList': (ngrams type, term) pairs.
type ArchiveStateSet = Set.Set (TableNgrams.NgramsType, NgramsTerm)

-- | Flatten an archive state (a @Map NgramsType (Map NgramsTerm
-- NgramsRepoElement)@) into a list, ordered by ngrams type, then term.
archiveStateToList :: NgramsState' -> ArchiveStateList
archiveStateToList s =
  [ (nt, term, nre)
  | (nt, table) <- Map.toList s
  , (term, nre) <- Map.toList table
  ]

-- | Rebuild an archive state from its flat form. When the same
-- (ngrams type, term) occurs more than once, the later entry wins.
archiveStateFromList :: ArchiveStateList -> NgramsState'
archiveStateFromList l =
  Map.fromListWith (<>) [ (nt, Map.singleton term nre) | (nt, term, nre) <- l ]

-- | The (ngrams type, term) keys present in an 'ArchiveStateList'.
archiveStateSet :: ArchiveStateList -> ArchiveStateSet
archiveStateSet lst = Set.fromList [ (nt, term) | (nt, term, _) <- lst ]

-- | Keep, in their original order, only the entries whose
-- (ngrams type, term) key belongs to the given set.
archiveStateListFilterFromSet :: ArchiveStateSet -> ArchiveStateList -> ArchiveStateList
archiveStateListFilterFromSet wanted lst =
  [ entry | entry@(nt, term, _) <- lst, Set.member (nt, term) wanted ]
463 -- | This function inserts whole new node story and archive for given node_id.
-- For every (ngrams type, term, repo element) of the archive state it looks
-- up the term's ngrams id. When that id is found, it sends one INSERT into
-- node_stories.
--
-- NOTE(review): each row is sent as a 5-tuple (node id, version, ngrams
-- type, ngrams id, repo element), but the INSERT column list and the alias
-- i(...) name only four columns and no version. Compare with
-- 'insertArchiveStateList', which does insert version. Check that the
-- VALUES part matches the parameters.
-- Also note that 'upsertNodeStories' only calls this when 'getNodeStory'
-- returns no entry for the node, which 'getNodeStory' never does (it always
-- returns a singleton map), so this path may be unreachable.
464 insertNodeStory :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
465 insertNodeStory c (NodeId nId) a = do
466 mapM_ (\(ngramsType, ngrams, ngramsRepoElement) -> do
467 termIdM <- runPGSQuery c ngramsIdQuery (PGS.Only ngrams) :: IO [PGS.Only Int64]
468 case headMay termIdM of
470 Just (PGS.Only termId) -> runPGSExecuteMany c query [(nId, a ^. a_version, ngramsType, termId, ngramsRepoElement)]) $ archiveStateToList $ a ^. a_state
471 -- runInsert c $ insert ngramsType ngrams ngramsRepoElement) $ archiveStateToList _a_state
474 -- https://stackoverflow.com/questions/39224438/postgresql-insert-if-foreign-key-exists
476 query = [sql| INSERT INTO node_stories(node_id, ngrams_type_id, ngrams_id, ngrams_repo_element)
479 ) AS i(node_id, ngrams_type_id, ngrams_id, ngrams_repo_element)
481 SELECT * FROM nodes where nodes.id = node_id
483 -- insert ngramsType ngrams ngramsRepoElement =
484 -- Insert { iTable = nodeStoryTable
485 -- , iRows = [NodeStoryDB { node_id = sqlInt4 nId
486 -- , version = sqlInt4 _a_version
487 -- , ngrams_type_id = sqlInt4 $ TableNgrams.ngramsTypeId ngramsType
489 -- , ngrams_repo_element = sqlValueJSONB ngramsRepoElement
491 -- , iReturning = rCount
492 -- , iOnConflict = Nothing }
-- | Insert the given entries into node_stories for a node, one statement
-- per entry, all with the given version. The ngrams id is resolved from the
-- term inside the statement. A row is only inserted when both the term (in
-- ngrams) and the node (in nodes) exist; otherwise nothing happens.
494 insertArchiveStateList :: PGS.Connection -> NodeId -> Version -> ArchiveStateList -> IO ()
495 insertArchiveStateList c nodeId version as = do
496 mapM_ (\(nt, n, nre) -> runPGSExecute c query (nodeId, version, nt, nre, n)) as
499 query = [sql| WITH s as (SELECT ? as sid, ? sversion, ? sngrams_type_id, ngrams.id as sngrams_id, ?::jsonb as srepo FROM ngrams WHERE terms = ?)
500 INSERT INTO node_stories(node_id, version, ngrams_type_id, ngrams_id, ngrams_repo_element)
501 SELECT s.sid, s.sversion, s.sngrams_type_id, s.sngrams_id, s.srepo from s s join nodes n on s.sid = n.id
-- | Delete the node_stories rows of a node that match each entry's ngrams
-- type and term (the repo element is ignored), one statement per entry.
504 deleteArchiveStateList :: PGS.Connection -> NodeId -> ArchiveStateList -> IO ()
505 deleteArchiveStateList c nodeId as = do
506 mapM_ (\(nt, n, _) -> runPGSExecute c query (nodeId, nt, n)) as
509 query = [sql| DELETE FROM node_stories
510 WHERE node_id = ? AND ngrams_type_id = ? AND ngrams_id IN (SELECT id FROM ngrams WHERE terms = ?) |]
-- | Set the repo element and the version of the node_stories rows matching
-- each entry's node, ngrams type and term, one statement per entry.
512 updateArchiveStateList :: PGS.Connection -> NodeId -> Version -> ArchiveStateList -> IO ()
513 updateArchiveStateList c nodeId version as = do
514 let params = (\(nt, n, nre) -> (nre, version, nodeId, nt, n)) <$> as
515 --q <- PGS.format c query params
516 --printDebug "[updateArchiveList] query" q
517 mapM_ (runPGSExecute c query) params
520 query = [sql| UPDATE node_stories
521 SET ngrams_repo_element = ?, version = ?
522 WHERE node_id = ? AND ngrams_type_id = ? AND ngrams_id IN (SELECT id FROM ngrams WHERE terms = ?) |]
524 -- | This function updates the node story and archive for given node_id.
-- It diffs the current and new archive states, keyed by (ngrams type, term),
-- and applies the difference to node_stories:
--
-- * entries only in the new state are inserted;
-- * entries only in the current state are deleted;
-- * entries in both whose repo element changed are updated.
--
-- Inserts and updates carry the new archive's version. 'upsertNodeStories'
-- calls this inside a transaction holding an advisory lock on the node.
525 updateNodeStory :: PGS.Connection -> NodeId -> ArchiveList -> ArchiveList -> IO ()
526 updateNodeStory c nodeId@(NodeId _nId) currentArchive newArchive = do
529 -- 0. We assume we're inside an advisory lock
531 -- 1. Find differences (inserts/updates/deletes)
532 let currentList = archiveStateToList $ currentArchive ^. a_state
533 let newList = archiveStateToList $ newArchive ^. a_state
534 let currentSet = archiveStateSet currentList
535 let newSet = archiveStateSet newList
537 printDebug "[updateNodeStory] new - current = " $ Set.difference newSet currentSet
538 let inserts = archiveStateListFilterFromSet (Set.difference newSet currentSet) newList
539 -- printDebug "[updateNodeStory] inserts" inserts
541 printDebug "[updateNodeStory] current - new" $ Set.difference currentSet newSet
542 let deletes = archiveStateListFilterFromSet (Set.difference currentSet newSet) currentList
543 -- printDebug "[updateNodeStory] deletes" deletes
545 -- updates: keys present in both states whose repo element differs
546 let commonSet = Set.intersection currentSet newSet
547 let commonNewList = archiveStateListFilterFromSet commonSet newList
548 let commonCurrentList = archiveStateListFilterFromSet commonSet currentList
549 let updates = Set.toList $ Set.difference (Set.fromList commonNewList) (Set.fromList commonCurrentList)
550 printDebug "[updateNodeStory] updates" $ Text.unlines $ (Text.pack . show) <$> updates
552 -- 2. Perform inserts/deletes/updates
553 --printDebug "[updateNodeStory] applying insert" ()
554 insertArchiveStateList c nodeId (newArchive ^. a_version) inserts
555 --printDebug "[updateNodeStory] insert applied" ()
556 --TODO Use currentArchive ^. a_version in delete and report error
557 -- if entries with (node_id, ngrams_type_id, ngrams_id) but
558 -- different version are found.
559 deleteArchiveStateList c nodeId deletes
560 --printDebug "[updateNodeStory] delete applied" ()
561 updateArchiveStateList c nodeId (newArchive ^. a_version) updates
562 --printDebug "[updateNodeStory] update applied" ()
566 -- update = Update { uTable = nodeStoryTable
567 -- , uUpdateWith = updateEasy (\(NodeStoryDB { node_id }) ->
568 -- NodeStoryDB { archive = sqlValueJSONB $ Archive { _a_history = emptyHistory
571 -- , uWhere = (\row -> node_id row .== sqlInt4 nId)
572 -- , uReturning = rCount }
574 -- nodeStoryRemove :: Pool PGS.Connection -> NodeId -> IO Int64
575 -- nodeStoryRemove pool (NodeId nId) = withResource pool $ \c -> runDelete c delete
577 -- delete = Delete { dTable = nodeStoryTable
578 -- , dWhere = (\row -> node_id row .== sqlInt4 nId)
579 -- , dReturning = rCount }
-- | Write one node's archive to node_stories.
--
-- Everything runs in one transaction guarded by a transaction-level
-- advisory lock keyed by the node id, so concurrent upserts of the same
-- node are serialised. Steps:
--
-- * The current story is read back with 'getNodeStory'.
-- * The new archive is diffed against it ('updateNodeStory'), or inserted
--   ('insertNodeStory').
-- * The versions of the node's rows are aligned with 'fixNodeStoryVersion'.
--
-- NOTE(review): 'getNodeStory' always returns a singleton map keyed by the
-- node id ('mempty' when there are no rows). The lookup below therefore
-- always succeeds, and new nodes go through 'updateNodeStory' against an
-- empty state rather than through 'insertNodeStory'.
581 upsertNodeStories :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
582 upsertNodeStories c nodeId@(NodeId nId) newArchive = do
583 printDebug "[upsertNodeStories] START nId" nId
584 PGS.withTransaction c $ do
585 printDebug "[upsertNodeStories] locking nId" nId
586 runPGSAdvisoryXactLock c nId
588 (NodeStory m) <- getNodeStory c nodeId
589 case Map.lookup nodeId m of
591 _ <- insertNodeStory c nodeId newArchive
593 Just currentArchive -> do
594 _ <- updateNodeStory c nodeId currentArchive newArchive
597 -- 3. Now we need to set versions of all node state to be the same
598 fixNodeStoryVersion c nodeId newArchive
600 printDebug "[upsertNodeStories] STOP nId" nId
-- | Align the version of a node's node_stories rows with the new archive.
-- It runs one UPDATE per ngrams type present in the new state, with
-- parameters (new version, node id, ngrams type).
-- NOTE(review): presumably the statement sets version for all rows of
-- that node and ngrams type; confirm against the full SQL.
602 fixNodeStoryVersion :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
603 fixNodeStoryVersion c nodeId newArchive = do
604 let ngramsTypes = Map.keys $ newArchive ^. a_state
605 mapM_ (\nt -> runPGSExecute c query (newArchive ^. a_version, nodeId, nt)) ngramsTypes
608 query = [sql|UPDATE node_stories
611 AND ngrams_type_id = ?|]
-- | Persist every node of the story with one 'upsertNodeStories' call per
-- node, in ascending 'NodeId' order.
writeNodeStories :: PGS.Connection -> NodeListStory -> IO ()
writeNodeStories c (NodeStory nls) =
  sequence_ [ upsertNodeStories c nodeId archive | (nodeId, archive) <- Map.toList nls ]
617 -- | Returns a `NodeListStory`, updating the given one for given `NodeId`
-- With no story given, the node's story is simply loaded with
-- 'getNodeStory'. When the node's story is loaded from the DB into an
-- existing story, the two maps are merged with 'Map.union'. That union is
-- left-biased: entries already present in the given story win over the
-- freshly loaded ones.
618 nodeStoryInc :: PGS.Connection -> Maybe NodeListStory -> NodeId -> IO NodeListStory
619 nodeStoryInc c Nothing nId = getNodeStory c nId
620 nodeStoryInc c (Just ns@(NodeStory nls)) nId = do
621 case Map.lookup nId nls of
623 (NodeStory nls') <- getNodeStory c nId
624 pure $ NodeStory $ Map.union nls nls'
-- | Load the stories of the given nodes, extending an optional
-- already-loaded story.
--
-- Nodes are added left to right with 'nodeStoryInc'. Without a starting
-- story, the first node is fetched with 'getNodeStory' and used as the
-- start. With neither a start nor any node, the result is empty.
nodeStoryIncs :: PGS.Connection -> Maybe NodeListStory -> [NodeId] -> IO NodeListStory
nodeStoryIncs c mStart nIds = case (mStart, nIds) of
  (Nothing, [])             -> pure (NodeStory Map.empty)
  (Nothing, firstId : rest) ->
    getNodeStory c firstId >>= \start -> nodeStoryIncs c (Just start) rest
  (Just start, _)           -> foldM addNode start nIds
  where
    addNode acc nId = nodeStoryInc c (Just acc) nId
634 -- nodeStoryDec :: Pool PGS.Connection -> NodeListStory -> NodeId -> IO NodeListStory
635 -- nodeStoryDec pool ns@(NodeStory nls) ni = do
636 -- case Map.lookup ni nls of
638 -- _ <- nodeStoryRemove pool ni
641 -- let ns' = Map.filterWithKey (\k _v -> k /= ni) nls
642 -- _ <- nodeStoryRemove pool ni
643 -- pure $ NodeStory ns'
644 ------------------------------------
-- | Build a 'NodeStoryEnv' over a connection pool.
--
-- * The cache starts with no node loaded (@nodeStoryVar pool Nothing []@).
-- * The immediate saver locks the cache MVar first and only then takes a
--   pool connection to run 'writeNodeStories'.
-- * The archive saver writes each node's in-memory history to
--   node_story_archive_history, oldest patch first (the history list is
--   most-recent-first, hence the 'reverse'). It returns the story with its
--   history cleared and does not touch the cache MVar itself.
-- * 'mkNodeStorySaver' wraps the immediate saver into a debounced one.
-- * The getter extends the same MVar via 'nodeStoryVar'.
646 readNodeStoryEnv :: Pool PGS.Connection -> IO NodeStoryEnv
647 readNodeStoryEnv pool = do
648 mvar <- nodeStoryVar pool Nothing []
649 let saver_immediate = modifyMVar_ mvar $ \ns -> do
650 withResource pool $ \c -> do
651 --printDebug "[mkNodeStorySaver] will call writeNodeStories, ns" ns
652 writeNodeStories c ns
654 let archive_saver_immediate ns@(NodeStory nls) = withResource pool $ \c -> do
655 mapM_ (\(nId, a) -> do
656 insertNodeArchiveHistory c nId (a ^. a_version) $ reverse $ a ^. a_history
658 pure $ clearHistory ns
659 saver <- mkNodeStorySaver saver_immediate
660 -- let saver = modifyMVar_ mvar $ \mv -> do
661 -- writeNodeStories pool mv
662 -- printDebug "[readNodeStoryEnv] saver" mv
663 -- let mv' = clearHistory mv
664 -- printDebug "[readNodeStoryEnv] saver, cleared" mv'
666 pure $ NodeStoryEnv { _nse_var = mvar
668 , _nse_saver_immediate = saver_immediate
669 , _nse_archive_saver_immediate = archive_saver_immediate
670 , _nse_getter = nodeStoryVar pool (Just mvar)
-- | Get an MVar holding the stories of the given nodes.
--
-- Without an existing MVar, the stories are loaded with 'nodeStoryIncs'
-- (presumably into a fresh MVar). Otherwise the given MVar is extended in
-- place with 'nodeStoryIncs'.
--
-- NOTE(review): the second equation takes a pool connection before it waits
-- on the MVar. That is the opposite order from the savers in
-- 'readNodeStoryEnv', which lock the MVar first as the note in
-- 'mkNodeStorySaver' recommends. As a result, a pool connection can stay
-- checked out while another thread holds the MVar.
673 nodeStoryVar :: Pool PGS.Connection
674 -> Maybe (MVar NodeListStory)
676 -> IO (MVar NodeListStory)
677 nodeStoryVar pool Nothing nIds = do
678 state <- withResource pool $ \c -> nodeStoryIncs c Nothing nIds
680 nodeStoryVar pool (Just mv) nIds = do
681 _ <- withResource pool
682 $ \c -> modifyMVar_ mv
683 $ \nsl -> nodeStoryIncs c (Just nsl) nIds
686 -- Debounce is useful since it could delay the saving to some later
687 -- time, asynchronously and we keep operating on memory only.
688 -- mkNodeStorySaver :: Pool PGS.Connection -> MVar NodeListStory -> IO (IO ())
689 -- mkNodeStorySaver pool mvns = do
-- | Wrap a saver action with Control.Debounce. Calls to the returned action
-- within the debounce period are coalesced into runs of @saver@; see the
-- auto-update documentation for the exact edge semantics.
-- debounceFreq is expressed in microseconds; @second@ below is 10^6 of them.
690 mkNodeStorySaver :: IO () -> IO (IO ())
691 mkNodeStorySaver saver = mkDebounce settings
693 settings = defaultDebounceSettings
694 { debounceAction = saver
696 -- -- NOTE: Lock MVar first, then use resource pool.
697 -- -- Otherwise we could wait for MVar, while
698 -- -- blocking the pool connection.
699 -- modifyMVar_ mvns $ \ns -> do
700 -- withResource pool $ \c -> do
701 -- --printDebug "[mkNodeStorySaver] will call writeNodeStories, ns" ns
702 -- writeNodeStories c ns
703 -- pure $ clearHistory ns
704 --withMVar mvns (\ns -> printDebug "[mkNodeStorySaver] debounce nodestory" ns)
705 , debounceFreq = 1*minute
708 second = 10^(6 :: Int)
-- | Empty the in-memory history of every node in the story; versions and
-- states are kept. The archive saver in 'readNodeStoryEnv' applies this
-- after writing the history to the DB.
710 clearHistory :: NodeListStory -> NodeListStory
711 clearHistory (NodeStory ns) = NodeStory $ ns & (traverse . a_history) .~ emptyHistory
713 emptyHistory = [] :: [NgramsStatePatch']
-- | Version of the given list as currently stored in the DB.
--
-- The story is read afresh with 'getNodeStory', not from the in-memory
-- cache. 'getNodeStory' returns a story keyed by @listId@, so the lookup
-- below targets that single entry.
currentVersion :: (HasNodeStory env err m) => ListId -> m Version
currentVersion listId = do
  pool  <- view connPool
  story <- withResource pool $ \conn -> liftBase (getNodeStory conn listId)
  pure (view (unNodeStory . at listId . _Just . a_version) story)
722 -- mkNodeStorySaver :: MVar NodeListStory -> Cmd err (Cmd err ())
723 -- mkNodeStorySaver mvns = mkDebounce settings
725 -- settings = defaultDebounceSettings
726 -- { debounceAction = withMVar mvns (\ns -> writeNodeStories ns)
727 -- , debounceFreq = 1 * minute
728 -- -- , debounceEdge = trailingEdge -- Trigger on the trailing edge
730 -- minute = 60 * second
731 -- second = 10^(6 :: Int)
734 -----------------------------------------
-- | Maintenance task: make all node_stories rows of a node and ngrams type
-- share the same version.
--
-- Everything runs in one transaction. For every node id, and for each of
-- Authors, Institutes, Sources and NgramsTerms, it reads the maximum
-- stored version. When there is one, it writes that version back with
-- updateVerQuery (presumably to all rows of that node and ngrams type;
-- confirm against the full SQL). It panics when the max-version query does
-- not return exactly one row.
--
-- NOTE(review): the node ids come from a query whose only WHERE condition
-- is the bound parameter True, i.e. every row of nodes is visited.
736 fixNodeStoryVersions :: (HasNodeStory env err m) => m ()
737 fixNodeStoryVersions = do
738 pool <- view connPool
739 _ <- withResource pool $ \c -> liftBase $ PGS.withTransaction c $ do
740 nIds <- runPGSQuery c [sql| SELECT id FROM nodes WHERE ? |] (PGS.Only True) :: IO [PGS.Only Int64]
741 printDebug "[fixNodeStoryVersions] nIds" nIds
742 mapM_ (\(PGS.Only nId) -> do
743 printDebug "[fixNodeStoryVersions] nId" nId
744 updateVer c TableNgrams.Authors nId
746 updateVer c TableNgrams.Institutes nId
748 updateVer c TableNgrams.Sources nId
750 updateVer c TableNgrams.NgramsTerms nId
-- Maximum version stored for a (node id, ngrams type) pair.
756 maxVerQuery :: PGS.Query
757 maxVerQuery = [sql| SELECT max(version)
760 AND ngrams_type_id = ? |]
-- Set the version of the rows of a (node id, ngrams type) pair.
761 updateVerQuery :: PGS.Query
762 updateVerQuery = [sql| UPDATE node_stories
765 AND ngrams_type_id = ? |]
-- Align one (node id, ngrams type) pair on its maximum version; no-op when
-- the pair has no rows (max is NULL).
766 updateVer :: PGS.Connection -> TableNgrams.NgramsType -> Int64 -> IO ()
767 updateVer c ngramsType nId = do
768 maxVer <- runPGSQuery c maxVerQuery (nId, ngramsType) :: IO [PGS.Only (Maybe Int64)]
771 [PGS.Only Nothing] -> pure ()
772 [PGS.Only (Just maxVersion)] -> do
773 _ <- runPGSExecute c updateVerQuery (maxVersion, nId, ngramsType)
775 _ -> panic "Should get only 1 result!"