{-|
Module      : Gargantext.Core.NodeStory
Description : Node API generation
Copyright   : (c) CNRS, 2017-Present
License     : AGPL + CECILL v3
Maintainer  : team@gargantext.org
Stability   : experimental

A Node Story is a Map between NodeId and an Archive (with state,
version and history) for that node.

A couple of words on how this is implemented.

The first version used files, storing the Archive for each NodeId in a
separate file.

For performance reasons, it was rewritten to use the DB.

The table `node_stories` contains two columns: `node_id` and
`archive`.

Next, it was observed that `a_history` in `Archive` takes up a lot of
space. So a new table was created, `node_story_archive_history`, with
columns: `node_id`, `ngrams_type_id`, `patch`. This is because each
history item is in fact a map from `NgramsType` to `NgramsTablePatch`
(see the `NgramsStatePatch'` type).
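
For illustration, here is roughly what a single history item looks
like. This is only a sketch mirroring the construction used in
`getNodeArchiveHistory` below; `someTerm` and `somePatch` are
hypothetical values:

> historyItem :: NgramsStatePatch'
> historyItem = fst $ PM.singleton TableNgrams.NgramsTerms
>                   $ NgramsTablePatch
>                   $ fst $ PM.singleton someTerm somePatch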

Moreover, since `G.A.Ngrams.commitStatePatch` uses only the current
state together with the most recent history items, it is not necessary
to load the whole history into memory. Instead, it is kept in the DB
(history is immutable) and only recent changes are accumulated in
`a_history`. That field is cleared whenever the `Archive` is saved.
-}

{-# OPTIONS_GHC -fno-warn-orphans #-}
{-# LANGUAGE Arrows #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE TemplateHaskell #-}

module Gargantext.Core.NodeStory
  ( HasNodeStoryImmediateSaver
  , hasNodeStoryImmediateSaver
  , HasNodeArchiveStoryImmediateSaver
  , hasNodeArchiveStoryImmediateSaver
  , initNodeListStoryMock
  , nse_archive_saver_immediate
  , getNodeArchiveHistory
  , runPGSAdvisoryUnlock
  , runPGSAdvisoryXactLock
  , archiveStateFromList
  , archiveStateToList )
  where

-- import Debug.Trace (traceShow)
import Control.Debounce (mkDebounce, defaultDebounceSettings, debounceFreq, debounceAction)
import Codec.Serialise.Class
import Control.Concurrent (MVar(), newMVar, modifyMVar_)
import Control.Exception (catch, throw, SomeException(..))
import Control.Lens (makeLenses, Getter, (^.), (.~), (%~), _Just, at, traverse, view)
import Control.Monad.Except
import Control.Monad.Reader
import Data.Aeson hiding ((.=), decode)
import Data.ByteString.Char8 (hPutStrLn)
import Data.Map.Strict (Map)
import Data.Maybe (catMaybes)
import Data.Pool (Pool, withResource)
import Data.Semigroup
import Database.PostgreSQL.Simple.SqlQQ (sql)
import Database.PostgreSQL.Simple.FromField (FromField(fromField), fromJSONField)
import Data.Profunctor.Product.TH (makeAdaptorAndInstance)
import GHC.Generics (Generic)
import Gargantext.API.Ngrams.Types
import Gargantext.Core.Types (ListId, NodeId(..), NodeType)
import Gargantext.Core.Utils.Prefix (unPrefix)
import Gargantext.Database.Admin.Config (nodeTypeId)
import Gargantext.Database.Prelude (CmdM', HasConnectionPool(..), HasConfig)
import Gargantext.Database.Query.Table.Node.Error (HasNodeError())
import Gargantext.Prelude
import Opaleye (DefaultFromField(..), SqlJsonb, fromPGSFromField)
import System.IO (stderr)
import qualified Data.Map.Strict as Map
import qualified Data.Map.Strict.Patch as PM
import qualified Data.Set as Set
import qualified Data.Text as Text
import qualified Database.PostgreSQL.Simple as PGS
import qualified Gargantext.Database.Query.Table.Ngrams as TableNgrams

------------------------------------------------------------------------
data NodeStoryEnv = NodeStoryEnv
  { _nse_var    :: !(MVar NodeListStory)
  , _nse_saver  :: !(IO ())
  , _nse_saver_immediate :: !(IO ())
  , _nse_archive_saver_immediate :: !(NodeListStory -> IO NodeListStory)
  , _nse_getter :: [NodeId] -> IO (MVar NodeListStory)
  --, _nse_cleaner :: !(IO ()) -- every 12 hours: cleans the repos of unused NodeStories
  -- , _nse_lock  :: !FileLock -- TODO (it depends on the option: if with database or file only)
  }

type HasNodeStory env err m = ( CmdM' env err m
                              , HasNodeStoryEnv env
                              , HasConnectionPool env
                              )

class (HasNodeStoryVar env, HasNodeStorySaver env)
  => HasNodeStoryEnv env where
    hasNodeStory :: Getter env NodeStoryEnv

class HasNodeStoryVar env where
  hasNodeStoryVar :: Getter env ([NodeId] -> IO (MVar NodeListStory))

class HasNodeStorySaver env where
  hasNodeStorySaver :: Getter env (IO ())

class HasNodeStoryImmediateSaver env where
  hasNodeStoryImmediateSaver :: Getter env (IO ())

class HasNodeArchiveStoryImmediateSaver env where
  hasNodeArchiveStoryImmediateSaver :: Getter env (NodeListStory -> IO NodeListStory)
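
-- A minimal sketch (not part of this module) of how an application
-- environment could satisfy these classes, assuming a hypothetical `MyEnv`
-- record with a `_myEnv_nodeStory :: NodeStoryEnv` field:
--
-- > instance HasNodeStoryEnv MyEnv where
-- >   hasNodeStory = to _myEnv_nodeStory
-- > instance HasNodeStoryVar MyEnv where
-- >   hasNodeStoryVar = to (_nse_getter . _myEnv_nodeStory)
-- > instance HasNodeStorySaver MyEnv where
-- >   hasNodeStorySaver = to (_nse_saver . _myEnv_nodeStory)
-- > instance HasNodeStoryImmediateSaver MyEnv where
-- >   hasNodeStoryImmediateSaver = to (_nse_saver_immediate . _myEnv_nodeStory)
--
-- (`to` comes from Control.Lens and builds a `Getter` from a plain function.)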

------------------------------------------------------------------------
{- | Node Story for each NodeType where the Key of the Map is NodeId
   TODO : generalize for any NodeType, let's start with NodeList which
   is implemented already
-}
newtype NodeStory s p = NodeStory { _unNodeStory :: Map NodeId (Archive s p) }
  deriving (Generic, Show)

instance (FromJSON s, FromJSON p) => FromJSON (NodeStory s p)
instance (ToJSON s, ToJSON p) => ToJSON (NodeStory s p)
instance (Serialise s, Serialise p) => Serialise (NodeStory s p)

data Archive s p = Archive
  { _a_version :: !Version
  , _a_state   :: !s
  , _a_history :: ![p]
    -- first patch in the list is the most recent
    -- We use `take` in `commitStatePatch`, that's why.
    --
    -- History is immutable, we just insert things on top of the
    -- existing history.
    --
    -- We don't need to store the whole history in memory, this
    -- structure holds only recent history, the one that will be
    -- inserted into the DB.
  }
  deriving (Generic, Show)

instance (Serialise s, Serialise p) => Serialise (Archive s p)

type NodeListStory = NodeStory NgramsState' NgramsStatePatch'

type NgramsState' = Map TableNgrams.NgramsType NgramsTableMap
type NgramsStatePatch' = PatchMap TableNgrams.NgramsType NgramsTablePatch
instance Serialise NgramsStatePatch'
instance FromField (Archive NgramsState' NgramsStatePatch')
  where
    fromField = fromJSONField

instance DefaultFromField SqlJsonb (Archive NgramsState' NgramsStatePatch')
  where
    defaultFromField = fromPGSFromField

-- | Combine `NgramsState'`. This is because the structure is (Map
-- NgramsType (Map ...)) and the default `(<>)` operator is the
-- left-biased `Map.union`
-- (https://hackage.haskell.org/package/containers-0.6.6/docs/Data-Map-Internal.html#v:union)
combineState :: NgramsState' -> NgramsState' -> NgramsState'
combineState = Map.unionWith (<>)
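
-- For intuition, a small sketch (hypothetical maps `m1`, `m2` of type
-- `NgramsTableMap`): with the default, left-biased `(<>)` of `Map`, the
-- right-hand inner map for a shared `NgramsType` would be dropped, whereas
-- `combineState` merges the inner maps:
--
-- > Map.singleton TableNgrams.NgramsTerms m1 <> Map.singleton TableNgrams.NgramsTerms m2
-- >   == Map.singleton TableNgrams.NgramsTerms m1
-- > combineState (Map.singleton TableNgrams.NgramsTerms m1) (Map.singleton TableNgrams.NgramsTerms m2)
-- >   == Map.singleton TableNgrams.NgramsTerms (m1 <> m2)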

instance (Semigroup s, Semigroup p) => Semigroup (Archive s p) where
  (<>) (Archive { _a_history = p }) (Archive { _a_version = v'
                                              , _a_state = s'
                                              , _a_history = p' }) =
    Archive { _a_version = v'
            , _a_state = s'
            , _a_history = p' <> p }

instance (Monoid s, Semigroup p) => Monoid (Archive s p) where
  mempty = Archive { _a_version = 0
                   , _a_state = mempty
                   , _a_history = [] }

instance (FromJSON s, FromJSON p) => FromJSON (Archive s p) where
  parseJSON = genericParseJSON $ unPrefix "_a_"

instance (ToJSON s, ToJSON p) => ToJSON (Archive s p) where
  toJSON     = genericToJSON $ unPrefix "_a_"
  toEncoding = genericToEncoding $ unPrefix "_a_"

------------------------------------------------------------------------
initNodeStory :: (Monoid s, Semigroup p) => NodeId -> NodeStory s p
initNodeStory ni = NodeStory $ Map.singleton ni initArchive

initArchive :: (Monoid s, Semigroup p) => Archive s p
initArchive = mempty

initNodeListStoryMock :: NodeListStory
initNodeListStoryMock = NodeStory $ Map.singleton nodeListId archive
  where
    nodeListId = 0
    archive = Archive { _a_version = 0
                      , _a_state = ngramsTableMap
                      , _a_history = [] }
    ngramsTableMap = Map.singleton TableNgrams.NgramsTerms
                   $ Map.fromList
                     [ (n ^. ne_ngrams, ngramsElementToRepo n)
                     | n <- mockTable ^. _NgramsTable
                     ]

------------------------------------------------------------------------
-- | Lenses at the bottom of the file because Template Haskell would
-- reorder the order of execution in other cases
makeLenses ''NodeStoryEnv
makeLenses ''NodeStory
makeLenses ''Archive

----------------------------------------------------------------------
data NodeStoryPoly nid v ngtid ngid nre =
  NodeStoryDB { node_id :: nid
              , version :: v
              , ngrams_type_id :: ngtid
              , ngrams_id :: ngid
              , ngrams_repo_element :: nre }

data NodeStoryArchivePoly nid a =
  NodeStoryArchiveDB { a_node_id :: nid
                     , archive :: a }

$(makeAdaptorAndInstance "pNodeStory" ''NodeStoryPoly)
$(makeAdaptorAndInstance "pNodeArchiveStory" ''NodeStoryArchivePoly)

-- type NodeStoryWrite = NodeStoryPoly (Column SqlInt4) (Column SqlInt4) (Column SqlInt4) (Column SqlInt4) (Column SqlJsonb)
-- type NodeStoryRead = NodeStoryPoly (Column SqlInt4) (Column SqlInt4) (Column SqlInt4) (Column SqlInt4) (Column SqlJsonb)

-- type NodeStoryArchiveWrite = NodeStoryArchivePoly (Column SqlInt4) (Column SqlJsonb)
-- type NodeStoryArchiveRead = NodeStoryArchivePoly (Column SqlInt4) (Column SqlJsonb)

type ArchiveList = Archive NgramsState' NgramsStatePatch'

runPGSExecute :: (PGS.ToRow q) => PGS.Connection -> PGS.Query -> q -> IO Int64
runPGSExecute c qs a = catch (PGS.execute c qs a) printError
  where
    printError (SomeException e) = do
      --q' <- PGS.formatQuery c qs a
      _ <- panic $ Text.pack $ show e
      throw (SomeException e)

runPGSExecuteMany :: (PGS.ToRow q) => PGS.Connection -> PGS.Query -> [q] -> IO Int64
runPGSExecuteMany c qs a = catch (PGS.executeMany c qs a) printError
  where
    printError (SomeException e) = do
      --q' <- PGS.formatQuery c qs a
      _ <- panic $ Text.pack $ show e
      throw (SomeException e)

runPGSQuery :: (PGS.FromRow r, PGS.ToRow q) => PGS.Connection -> PGS.Query -> q -> IO [r]
runPGSQuery c q a = catch (PGS.query c q a) printError
  where
    printError (SomeException e) = do
      q' <- PGS.formatQuery c q a
      hPutStrLn stderr q'
      throw (SomeException e)

runPGSAdvisoryLock :: PGS.Connection -> Int -> IO ()
runPGSAdvisoryLock c id = do
  _ <- runPGSQuery c [sql| SELECT pg_advisory_lock(?) |] (PGS.Only id) :: IO [PGS.Only ()]
  pure ()

runPGSAdvisoryUnlock :: PGS.Connection -> Int -> IO ()
runPGSAdvisoryUnlock c id = do
  _ <- runPGSQuery c [sql| SELECT pg_advisory_unlock(?) |] (PGS.Only id) :: IO [PGS.Only Bool]
  pure ()

runPGSAdvisoryXactLock :: PGS.Connection -> Int -> IO ()
runPGSAdvisoryXactLock c id = do
  _ <- runPGSQuery c [sql| SELECT pg_advisory_xact_lock(?) |] (PGS.Only id) :: IO [PGS.Only ()]
  pure ()

nodeExists :: PGS.Connection -> NodeId -> IO Bool
nodeExists c nId = (== [PGS.Only True])
  <$> runPGSQuery c [sql| SELECT true FROM nodes WHERE id = ? LIMIT 1 |] (PGS.Only nId)

getNodesIdWithType :: PGS.Connection -> NodeType -> IO [NodeId]
getNodesIdWithType c nt = do
  ns <- runPGSQuery c query (PGS.Only $ nodeTypeId nt)
  pure $ map (\(PGS.Only nId) -> NodeId nId) ns
  where
    query = [sql| SELECT id FROM nodes WHERE typename = ? |]

-- nodeStoryTable :: Table NodeStoryRead NodeStoryWrite
-- nodeStoryTable =
--   Table "node_stories"
--     ( pNodeStory NodeStoryDB { node_id = tableField "node_id"
--                              , version = tableField "version"
--                              , ngrams_type_id = tableField "ngrams_type_id"
--                              , ngrams_id = tableField "ngrams_id"
--                              , ngrams_repo_element = tableField "ngrams_repo_element"
--                              } )

-- nodeStoryArchiveTable :: Table NodeStoryArchiveRead NodeStoryArchiveWrite
-- nodeStoryArchiveTable =
--   Table "node_story_archive_history"
--     ( pNodeArchiveStory NodeStoryArchiveDB { a_node_id = tableField "node_id"
--                                            , archive = tableField "archive" } )

-- nodeStorySelect :: Select NodeStoryRead
-- nodeStorySelect = selectTable nodeStoryTable

-- NOTE "first patch in the _a_history list is the most recent"
getNodeArchiveHistory :: PGS.Connection -> NodeId -> IO [NgramsStatePatch']
getNodeArchiveHistory c nodeId = do
  as <- runPGSQuery c query (PGS.Only nodeId) :: IO [(TableNgrams.NgramsType, NgramsTerm, NgramsPatch)]
  pure $ (\(ngramsType, terms, patch) -> fst $ PM.singleton ngramsType (NgramsTablePatch $ fst $ PM.singleton terms patch)) <$> as
  where
    query = [sql| SELECT ngrams_type_id, terms, patch
                  FROM node_story_archive_history
                  JOIN ngrams ON ngrams.id = ngrams_id
                  WHERE node_id = ?
                  ORDER BY (version, node_story_archive_history.id) DESC |]

ngramsIdQuery :: PGS.Query
ngramsIdQuery = [sql| SELECT id FROM ngrams WHERE terms = ? |]

insertNodeArchiveHistory :: PGS.Connection -> NodeId -> Version -> [NgramsStatePatch'] -> IO ()
insertNodeArchiveHistory _ _ _ [] = pure ()
insertNodeArchiveHistory c nodeId version (h:hs) = do
  let tuples = mconcat $ (\(nType, NgramsTablePatch patch) ->
                           (\(term, p) ->
                              (nodeId, nType, term, p)) <$> PM.toList patch) <$> PM.toList h :: [(NodeId, TableNgrams.NgramsType, NgramsTerm, NgramsPatch)]
  tuplesM <- mapM (\(nId, nType, term, patch) -> do
                      ngrams <- runPGSQuery c ngramsIdQuery (PGS.Only term)
                      pure $ (\(PGS.Only termId) -> (nId, nType, termId, term, patch)) <$> (headMay ngrams)
                  ) tuples :: IO [Maybe (NodeId, TableNgrams.NgramsType, Int, NgramsTerm, NgramsPatch)]
  _ <- runPGSExecuteMany c query $ ((\(nId, nType, termId, _term, patch) -> (nId, nType, termId, patch, version)) <$> (catMaybes tuplesM))
  _ <- insertNodeArchiveHistory c nodeId version hs
  pure ()
  where
    query = [sql| INSERT INTO node_story_archive_history(node_id, ngrams_type_id, ngrams_id, patch, version)
                  VALUES (?, ?, ?, ?, ?) |]

getNodeStory :: PGS.Connection -> NodeId -> IO NodeListStory
getNodeStory c nId@(NodeId nodeId) = do
  --res <- withResource pool $ \c -> runSelect c query :: IO [NodeStoryPoly NodeId Version Int Int NgramsRepoElement]
  res <- runPGSQuery c nodeStoriesQuery (PGS.Only nodeId) :: IO [(Version, TableNgrams.NgramsType, NgramsTerm, NgramsRepoElement)]
  -- We have multiple rows with the same node_id and different (ngrams_type_id, ngrams_id).
  -- Need to create a map: {<node_id>: {<ngrams_type_id>: {<ngrams_id>: <data>}}}
  let dbData = map (\(version, ngramsType, ngrams, ngrams_repo_element) ->
                      Archive { _a_version = version
                              , _a_history = []
                              , _a_state = Map.singleton ngramsType $ Map.singleton ngrams ngrams_repo_element }) res
  -- NOTE Sanity check: all versions in the DB should be the same
  -- TODO Maybe redesign the DB so that `node_stories` has only
  -- `node_id`, `version` and there is a M2M table
  -- `node_stories_ngrams` without the `version` column? Then we would
  -- have `version` in only one place.
  let versionsS = Set.fromList $ map (\a -> a ^. a_version) dbData
  if Set.size versionsS > 1 then
    panic $ Text.pack $ "[getNodeStory] versions for " <> show nodeId <> " differ! " <> show versionsS
  else
    pure ()

  pure $ NodeStory $ Map.singleton nId $ foldl combine mempty dbData
  where
    -- NOTE (<>) for Archive doesn't concatenate states, so we have to use `combine`
    combine a1 a2 = a1 & a_state %~ combineState (a2 ^. a_state)
                       & a_version .~ (a2 ^. a_version) -- version should be updated from the list, not taken from the empty Archive
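    -- For intuition (hypothetical rows): two rows (v, NgramsTerms, t1, e1)
    -- and (v, NgramsTerms, t2, e2) fold into a single Archive with
    -- _a_version = v and _a_state = {NgramsTerms: {t1: e1, t2: e2}}.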

nodeStoriesQuery :: PGS.Query
nodeStoriesQuery = [sql| SELECT version, ngrams_type_id, terms, ngrams_repo_element
                         FROM node_stories
                         JOIN ngrams ON ngrams.id = ngrams_id
                         WHERE node_id = ? |]

type ArchiveStateList = [(TableNgrams.NgramsType, NgramsTerm, NgramsRepoElement)]
type ArchiveStateSet = Set.Set (TableNgrams.NgramsType, NgramsTerm)

-- | Functions to convert archive state (which is a
-- `Map NgramsType (Map NgramsTerm NgramsRepoElement)`) to/from a flat list
archiveStateToList :: NgramsState' -> ArchiveStateList
archiveStateToList s = mconcat $ (\(nt, ntm) -> (\(n, nre) -> (nt, n, nre)) <$> Map.toList ntm) <$> Map.toList s

archiveStateFromList :: ArchiveStateList -> NgramsState'
archiveStateFromList l = Map.fromListWith (<>) $ (\(nt, t, nre) -> (nt, Map.singleton t nre)) <$> l
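
-- A quick sketch (hypothetical term `t` and repo element `nre`): the flat
-- list form is what the set-based diffing below works on, and converting
-- there and back preserves the state:
--
-- > archiveStateToList (Map.singleton TableNgrams.NgramsTerms (Map.singleton t nre))
-- >   == [(TableNgrams.NgramsTerms, t, nre)]
-- > archiveStateFromList [(TableNgrams.NgramsTerms, t, nre)]
-- >   == Map.singleton TableNgrams.NgramsTerms (Map.singleton t nre)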

archiveStateSet :: ArchiveStateList -> ArchiveStateSet
archiveStateSet lst = Set.fromList $ (\(nt, term, _) -> (nt, term)) <$> lst

archiveStateListFilterFromSet :: ArchiveStateSet -> ArchiveStateList -> ArchiveStateList
archiveStateListFilterFromSet set =
  filter (\(nt, term, _) -> Set.member (nt, term) set)

-- | This function inserts a whole new node story and archive for the given node_id.
insertNodeStory :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
insertNodeStory c (NodeId nId) a = do
  _ <- mapM (\(ngramsType, ngrams, ngramsRepoElement) -> do
                termIdM <- runPGSQuery c ngramsIdQuery (PGS.Only ngrams) :: IO [PGS.Only Int64]
                case headMay termIdM of
                  Nothing -> pure 0
                  Just (PGS.Only termId) -> runPGSExecuteMany c query [(nId, a ^. a_version, ngramsType, termId, ngramsRepoElement)]) $ archiveStateToList $ a ^. a_state
            -- runInsert c $ insert ngramsType ngrams ngramsRepoElement) $ archiveStateToList _a_state
  pure ()
  where
    query = [sql| INSERT INTO node_stories(node_id, version, ngrams_type_id, ngrams_id, ngrams_repo_element)
                  VALUES (?, ?, ?, ?, ?) |]
    -- insert ngramsType ngrams ngramsRepoElement =
    --   Insert { iTable = nodeStoryTable
    --          , iRows = [NodeStoryDB { node_id = sqlInt4 nId
    --                                 , version = sqlInt4 _a_version
    --                                 , ngrams_type_id = sqlInt4 $ TableNgrams.ngramsTypeId ngramsType
    --                                 , ngrams_repo_element = sqlValueJSONB ngramsRepoElement
    --                                 }]
    --          , iReturning = rCount
    --          , iOnConflict = Nothing }

insertArchiveStateList :: PGS.Connection -> NodeId -> Version -> ArchiveStateList -> IO ()
insertArchiveStateList c nodeId version as = do
  _ <- mapM_ (\(nt, n, nre) -> runPGSExecute c query (nodeId, version, nt, nre, n)) as
  pure ()
  where
    query = [sql| WITH s as (SELECT ? as sid, ? sversion, ? sngrams_type_id, ngrams.id as sngrams_id, ?::jsonb as srepo FROM ngrams WHERE terms = ?)
                  INSERT INTO node_stories(node_id, version, ngrams_type_id, ngrams_id, ngrams_repo_element)
                  SELECT s.sid, s.sversion, s.sngrams_type_id, s.sngrams_id, s.srepo from s s join nodes n on s.sid = n.id |]
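    -- NOTE The CTE `s` above looks up `ngrams_id` for the given term, and
    -- the join on `nodes` means nothing is inserted when the target node
    -- does not exist.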

deleteArchiveStateList :: PGS.Connection -> NodeId -> ArchiveStateList -> IO ()
deleteArchiveStateList c nodeId as = do
  _ <- mapM_ (\(nt, n, _) -> runPGSExecute c query (nodeId, nt, n)) as
  pure ()
  where
    query = [sql| DELETE FROM node_stories
                  WHERE node_id = ? AND ngrams_type_id = ? AND ngrams_id IN (SELECT id FROM ngrams WHERE terms = ?) |]

updateArchiveStateList :: PGS.Connection -> NodeId -> Version -> ArchiveStateList -> IO ()
updateArchiveStateList c nodeId version as = do
  let params = (\(nt, n, nre) -> (nre, version, nodeId, nt, n)) <$> as
  --q <- PGS.format c query params
  --printDebug "[updateArchiveList] query" q
  _ <- mapM (\p -> runPGSExecute c query p) params
  pure ()
  where
    query = [sql| UPDATE node_stories
                  SET ngrams_repo_element = ?, version = ?
                  WHERE node_id = ? AND ngrams_type_id = ? AND ngrams_id IN (SELECT id FROM ngrams WHERE terms = ?) |]

-- | This function updates the node story and archive for the given node_id.
updateNodeStory :: PGS.Connection -> NodeId -> ArchiveList -> ArchiveList -> IO ()
updateNodeStory c nodeId@(NodeId _nId) currentArchive newArchive = do
  -- 0. We assume we're inside an advisory lock

  -- 1. Find differences (inserts/updates/deletes)
  let currentList = archiveStateToList $ currentArchive ^. a_state
  let newList = archiveStateToList $ newArchive ^. a_state
  let currentSet = archiveStateSet currentList
  let newSet = archiveStateSet newList

  printDebug "[updateNodeStory] new - current = " $ Set.difference newSet currentSet
  let inserts = archiveStateListFilterFromSet (Set.difference newSet currentSet) newList
  -- printDebug "[updateNodeStory] inserts" inserts

  printDebug "[updateNodeStory] current - new" $ Set.difference currentSet newSet
  let deletes = archiveStateListFilterFromSet (Set.difference currentSet newSet) currentList
  -- printDebug "[updateNodeStory] deletes" deletes

  -- updates are the entries whose (NgramsType, NgramsTerm) key is in both
  -- current and new, but whose value changed in new
  let commonSet = Set.intersection currentSet newSet
  let commonNewList = archiveStateListFilterFromSet commonSet newList
  let commonCurrentList = archiveStateListFilterFromSet commonSet currentList
  let updates = Set.toList $ Set.difference (Set.fromList commonNewList) (Set.fromList commonCurrentList)
  printDebug "[updateNodeStory] updates" $ Text.unlines $ (Text.pack . show) <$> updates

  -- 2. Perform inserts/deletes/updates
  --printDebug "[updateNodeStory] applying insert" ()
  insertArchiveStateList c nodeId (newArchive ^. a_version) inserts
  --printDebug "[updateNodeStory] insert applied" ()
  --TODO Use currentArchive ^. a_version in delete and report error
  -- if entries with (node_id, ngrams_type_id, ngrams_id) but
  -- different version are found.
  deleteArchiveStateList c nodeId deletes
  --printDebug "[updateNodeStory] delete applied" ()
  updateArchiveStateList c nodeId (newArchive ^. a_version) updates
  --printDebug "[updateNodeStory] update applied" ()

  pure ()
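
  -- For intuition, a hypothetical diff: if the current state contains
  -- entries {A, B} and the new state contains {B', C} (where B' has the
  -- same (NgramsType, NgramsTerm) key as B but a different
  -- NgramsRepoElement), then inserts = [C], deletes = [A] and
  -- updates = [B'].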

  -- update = Update { uTable = nodeStoryTable
  --                 , uUpdateWith = updateEasy (\(NodeStoryDB { node_id }) ->
  --                     NodeStoryDB { archive = sqlValueJSONB $ Archive { _a_history = emptyHistory
  --                 , uWhere = (\row -> node_id row .== sqlInt4 nId)
  --                 , uReturning = rCount }

-- nodeStoryRemove :: Pool PGS.Connection -> NodeId -> IO Int64
-- nodeStoryRemove pool (NodeId nId) = withResource pool $ \c -> runDelete c delete
--   where
--     delete = Delete { dTable = nodeStoryTable
--                     , dWhere = (\row -> node_id row .== sqlInt4 nId)
--                     , dReturning = rCount }

upsertNodeStories :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
upsertNodeStories c nodeId@(NodeId nId) newArchive = do
  printDebug "[upsertNodeStories] START nId" nId
  PGS.withTransaction c $ do
    printDebug "[upsertNodeStories] locking nId" nId
    runPGSAdvisoryXactLock c nId

    (NodeStory m) <- getNodeStory c nodeId
    case Map.lookup nodeId m of
      Nothing -> do
        _ <- insertNodeStory c nodeId newArchive
        pure ()
      Just currentArchive -> do
        _ <- updateNodeStory c nodeId currentArchive newArchive
        pure ()

    -- 3. Now we need to set versions of all node state to be the same
    fixNodeStoryVersion c nodeId newArchive

  printDebug "[upsertNodeStories] STOP nId" nId
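
-- NOTE `PGS.withTransaction` plus `pg_advisory_xact_lock(node_id)` serializes
-- concurrent upserts for the same node; a transaction-level advisory lock is
-- released automatically when the transaction commits or rolls back.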

fixNodeStoryVersion :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
fixNodeStoryVersion c nodeId newArchive = do
  let ngramsTypes = Map.keys $ newArchive ^. a_state
  _ <- mapM_ (\nt -> runPGSExecute c query (newArchive ^. a_version, nodeId, nt)) ngramsTypes
  pure ()
  where
    query = [sql| UPDATE node_stories
                  SET version = ?
                  WHERE node_id = ?
                  AND ngrams_type_id = ? |]

writeNodeStories :: PGS.Connection -> NodeListStory -> IO ()
writeNodeStories c (NodeStory nls) = do
  _ <- mapM (\(nId, a) -> upsertNodeStories c nId a) $ Map.toList nls
  pure ()

-- | Returns a `NodeListStory`, updating the given one for the given `NodeId`
nodeStoryInc :: PGS.Connection -> Maybe NodeListStory -> NodeId -> IO NodeListStory
nodeStoryInc c Nothing nId = getNodeStory c nId
nodeStoryInc c (Just ns@(NodeStory nls)) nId = do
  case Map.lookup nId nls of
    Nothing -> do
      (NodeStory nls') <- getNodeStory c nId
      pure $ NodeStory $ Map.union nls nls'
    Just _ -> pure ns

nodeStoryIncs :: PGS.Connection -> Maybe NodeListStory -> [NodeId] -> IO NodeListStory
nodeStoryIncs _ Nothing [] = pure $ NodeStory $ Map.empty
nodeStoryIncs c Nothing (ni:ns) = do
  m <- getNodeStory c ni
  nodeStoryIncs c (Just m) ns
nodeStoryIncs c (Just nls) ns = foldM (\m n -> nodeStoryInc c (Just m) n) nls ns
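
-- Usage sketch (hypothetical `c :: PGS.Connection` and list ids): load several
-- node stories into one `NodeListStory`, reusing whatever is already in memory:
--
-- > ns  <- nodeStoryIncs c Nothing [listId1]
-- > ns' <- nodeStoryIncs c (Just ns) [listId1, listId2]
-- >        -- only listId2 triggers a DB read; listId1 is already present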

-- nodeStoryDec :: Pool PGS.Connection -> NodeListStory -> NodeId -> IO NodeListStory
-- nodeStoryDec pool ns@(NodeStory nls) ni = do
--   case Map.lookup ni nls of
--     Nothing -> do
--       _ <- nodeStoryRemove pool ni
--       pure ns
--     Just _ -> do
--       let ns' = Map.filterWithKey (\k _v -> k /= ni) nls
--       _ <- nodeStoryRemove pool ni
--       pure $ NodeStory ns'
------------------------------------

readNodeStoryEnv :: Pool PGS.Connection -> IO NodeStoryEnv
readNodeStoryEnv pool = do
  mvar <- nodeStoryVar pool Nothing []
  let saver_immediate = modifyMVar_ mvar $ \ns -> do
        withResource pool $ \c -> do
          --printDebug "[mkNodeStorySaver] will call writeNodeStories, ns" ns
          writeNodeStories c ns
          pure $ clearHistory ns
  let archive_saver_immediate ns@(NodeStory nls) = withResource pool $ \c -> do
        _ <- mapM (\(nId, a) -> do
                      insertNodeArchiveHistory c nId (a ^. a_version) $ reverse $ a ^. a_history
                  ) $ Map.toList nls
        pure $ clearHistory ns
  saver <- mkNodeStorySaver saver_immediate
  -- let saver = modifyMVar_ mvar $ \mv -> do
  --       writeNodeStories pool mv
  --       printDebug "[readNodeStoryEnv] saver" mv
  --       let mv' = clearHistory mv
  --       printDebug "[readNodeStoryEnv] saver, cleared" mv'

  pure $ NodeStoryEnv { _nse_var = mvar
                      , _nse_saver = saver
                      , _nse_saver_immediate = saver_immediate
                      , _nse_archive_saver_immediate = archive_saver_immediate
                      , _nse_getter = nodeStoryVar pool (Just mvar)
                      }
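
-- A rough lifecycle sketch (hypothetical call site): the env is created once
-- at startup; handlers then use the getter and the savers it carries:
--
-- > env <- readNodeStoryEnv pool
-- > mv  <- _nse_getter env [listId]   -- load/refresh the story for listId
-- > _nse_saver env                    -- schedule a debounced save
-- > modifyMVar_ mv (_nse_archive_saver_immediate env)  -- persist history now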

nodeStoryVar :: Pool PGS.Connection
             -> Maybe (MVar NodeListStory)
             -> [NodeId]
             -> IO (MVar NodeListStory)
nodeStoryVar pool Nothing nIds = do
  state <- withResource pool $ \c -> nodeStoryIncs c Nothing nIds
  newMVar state
nodeStoryVar pool (Just mv) nIds = do
  _ <- withResource pool
       $ \c -> modifyMVar_ mv
       $ \nsl -> nodeStoryIncs c (Just nsl) nIds
  pure mv

-- Debounce is useful since it delays the saving to some later time,
-- asynchronously, so that we keep operating on memory only.
-- mkNodeStorySaver :: Pool PGS.Connection -> MVar NodeListStory -> IO (IO ())
-- mkNodeStorySaver pool mvns = do
mkNodeStorySaver :: IO () -> IO (IO ())
mkNodeStorySaver saver = mkDebounce settings
  where
    settings = defaultDebounceSettings
                 { debounceAction = saver
                   -- -- NOTE: Lock MVar first, then use resource pool.
                   -- -- Otherwise we could wait for MVar, while
                   -- -- blocking the pool connection.
                   -- modifyMVar_ mvns $ \ns -> do
                   --   withResource pool $ \c -> do
                   --     --printDebug "[mkNodeStorySaver] will call writeNodeStories, ns" ns
                   --     writeNodeStories c ns
                   --     pure $ clearHistory ns
                   --withMVar mvns (\ns -> printDebug "[mkNodeStorySaver] debounce nodestory" ns)
                 , debounceFreq = 1*minute
                 }
    minute = 60 * second
    second = 10^(6 :: Int)
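
-- Usage sketch (hypothetical `saveAction :: IO ()`): the debounced action can
-- be triggered on every change without hammering the DB; `saveAction` runs at
-- most once per `debounceFreq` window:
--
-- > save <- mkNodeStorySaver saveAction
-- > mapM_ (const save) [1 .. 100 :: Int]   -- many triggers, few actual saves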

clearHistory :: NodeListStory -> NodeListStory
clearHistory (NodeStory ns) = NodeStory $ ns & (traverse . a_history) .~ emptyHistory
  where
    emptyHistory = [] :: [NgramsStatePatch']

currentVersion :: (HasNodeStory env err m) => ListId -> m Version
currentVersion listId = do
  pool <- view connPool
  nls <- withResource pool $ \c -> liftBase $ getNodeStory c listId
  pure $ nls ^. unNodeStory . at listId . _Just . a_version

-- mkNodeStorySaver :: MVar NodeListStory -> Cmd err (Cmd err ())
-- mkNodeStorySaver mvns = mkDebounce settings
--   where
--     settings = defaultDebounceSettings
--                  { debounceAction = withMVar mvns (\ns -> writeNodeStories ns)
--                  , debounceFreq = 1 * minute
--                  -- , debounceEdge = trailingEdge -- Trigger on the trailing edge
--                  }
--     minute = 60 * second
--     second = 10^(6 :: Int)

-----------------------------------------