{-|
Module      : Gargantext.Core.NodeStory
Description : Node Story
Copyright   : (c) CNRS, 2017-Present
License     : AGPL + CECILL v3
Maintainer  : team@gargantext.org
Stability   : experimental
Portability : POSIX

A Node Story is a Map between NodeId and an Archive (with state,
version and history) for that node.

A couple of words on how this is implemented.

The first version used files: the Archive for each NodeId was stored in a
separate .cbor file.

For performance reasons, it was rewritten to use the DB.

The first design of the table `node_stories` had two columns: `node_id`
and `archive`.

Next, it was observed that `a_history` in `Archive` takes up a lot of
space. So a new table, `node_story_archive_history`, was created with
columns: `node_id`, `ngrams_type_id`, `ngrams_id`, `patch`. This is because
each history item is in fact a map from `NgramsType` to `NgramsTablePatch`
(see the `NgramsStatePatch'` type).

Moreover, since ~G.A.Ngrams.commitStatePatch~ uses only the current state,
together with the recent history items, it is not necessary to load the
whole history into memory. Instead, it is kept in the DB
(history is immutable) and only recent changes are accumulated in
`a_history`. That field is then cleared whenever the `Archive` is saved.
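
As a rough sketch of that flow (see `mkNodeStorySaver` and `clearHistory`
below), the debounced saver does, schematically:

@
modifyMVar_ mvns $ \ns -> withResource pool $ \c -> do
  writeNodeStories c ns   -- upsert state and insert pending history items
  pure $ clearHistory ns  -- then drop the in-memory history
@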

Please note that

TODO:
- remove
- filter
- load the lists
-}

{-# OPTIONS_GHC -fno-warn-orphans #-}
{-# LANGUAGE Arrows #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE TemplateHaskell #-}

module Gargantext.Core.NodeStory
  ( HasNodeStory
  , HasNodeStoryEnv
  , hasNodeStory
  , HasNodeStoryVar
  , hasNodeStoryVar
  , HasNodeStorySaver
  , hasNodeStorySaver
  , NodeStory(..)
  , NgramsStatePatch'
  , NodeListStory
  , initNodeListStoryMock
  , NodeStoryEnv(..)
  , initNodeStory
  , nse_getter
  , nse_saver
  , nse_var
  , unNodeStory
  , getNodeArchiveHistory
  , Archive(..)
  , initArchive
  , insertArchiveList
  , deleteArchiveList
  , updateArchiveList
  , a_history
  , a_state
  , a_version
  , nodeExists
  , runPGSQuery
  , runPGSAdvisoryLock
  , runPGSAdvisoryUnlock
  , runPGSAdvisoryXactLock
  , getNodesIdWithType
  , readNodeStoryEnv
  , upsertNodeStories
  , getNodeStory
  , nodeStoriesQuery
  , currentVersion )
where

-- import Debug.Trace (traceShow)
import Control.Debounce (mkDebounce, defaultDebounceSettings, debounceFreq, debounceAction)
import Codec.Serialise.Class
import Control.Concurrent (MVar(), newMVar, modifyMVar_)
import Control.Exception (catch, throw, SomeException(..))
import Control.Lens (makeLenses, Getter, (^.), (.~), (%~), _Just, at, traverse, view)
import Control.Monad.Except
import Control.Monad.Reader
import Data.Aeson hiding ((.=), decode)
import Data.ByteString.Char8 (hPutStrLn)
import Data.Map.Strict (Map)
import Data.Maybe (catMaybes)
import Data.Monoid
import Data.Pool (Pool, withResource)
import Data.Semigroup
import Database.PostgreSQL.Simple.SqlQQ (sql)
import Database.PostgreSQL.Simple.FromField (FromField(fromField), fromJSONField)
import Data.Profunctor.Product.TH (makeAdaptorAndInstance)
import GHC.Generics (Generic)
import Gargantext.API.Ngrams.Types
import Gargantext.Core.Types (ListId, NodeId(..), NodeType)
import Gargantext.Core.Utils.Prefix (unPrefix)
import Gargantext.Database.Prelude (CmdM', HasConnectionPool(..), HasConfig)
import Gargantext.Database.Query.Table.Node.Error (HasNodeError())
import Gargantext.Prelude
import Opaleye (DefaultFromField(..), SqlJsonb, fromPGSFromField)
import System.IO (stderr)
import qualified Data.Map.Strict as Map
import qualified Data.Map.Strict.Patch as PM
import qualified Data.Set as Set
import qualified Database.PostgreSQL.Simple as PGS
import qualified Gargantext.Database.Query.Table.Ngrams as TableNgrams

------------------------------------------------------------------------
data NodeStoryEnv = NodeStoryEnv
  { _nse_var    :: !(MVar NodeListStory)
  , _nse_saver  :: !(IO ())
  , _nse_getter :: [NodeId] -> IO (MVar NodeListStory)
  --, _nse_cleaner :: !(IO ()) -- every 12 hours: cleans the repos of unused NodeStories
  -- , _nse_lock  :: !FileLock -- TODO (it depends on the option: if with database or file only)
  }
  deriving (Generic)

type HasNodeStory env err m = ( CmdM' env err m
                              , MonadReader env m
                              , MonadError err m
                              , HasNodeStoryEnv env
                              , HasConfig env
                              , HasConnectionPool env
                              , HasNodeError err
                              )

class (HasNodeStoryVar env, HasNodeStorySaver env)
  => HasNodeStoryEnv env where
    hasNodeStory :: Getter env NodeStoryEnv

class HasNodeStoryVar env where
  hasNodeStoryVar :: Getter env ([NodeId] -> IO (MVar NodeListStory))

class HasNodeStorySaver env where
  hasNodeStorySaver :: Getter env (IO ())

------------------------------------------------------------------------

{- | Node Story for each NodeType where the Key of the Map is NodeId
     TODO : generalize for any NodeType, let's start with NodeList which
     is implemented already
-}
newtype NodeStory s p = NodeStory { _unNodeStory :: Map NodeId (Archive s p) }
  deriving (Generic, Show)

instance (FromJSON s, FromJSON p) => FromJSON (NodeStory s p)
instance (ToJSON s, ToJSON p) => ToJSON (NodeStory s p)
instance (Serialise s, Serialise p) => Serialise (NodeStory s p)

data Archive s p = Archive
  { _a_version :: !Version
  , _a_state   :: !s
  , _a_history :: ![p]
    -- first patch in the list is the most recent
    -- We use `take` in `commitStatePatch`, that's why.

    -- History is immutable, we just insert things on top of existing
    -- list.

    -- We don't need to store the whole history in memory, this
    -- structure holds only recent history, the one that will be
    -- inserted to the DB.
  }
  deriving (Generic, Show)

instance (Serialise s, Serialise p) => Serialise (Archive s p)


type NodeListStory = NodeStory NgramsState' NgramsStatePatch'

type NgramsState'      = Map TableNgrams.NgramsType NgramsTableMap
type NgramsStatePatch' = PatchMap TableNgrams.NgramsType NgramsTablePatch
instance Serialise NgramsStatePatch'
instance FromField (Archive NgramsState' NgramsStatePatch')
  where
    fromField = fromJSONField
instance DefaultFromField SqlJsonb (Archive NgramsState' NgramsStatePatch')
  where
    defaultFromField = fromPGSFromField

-- | Combine two `NgramsState'` maps. We need `unionWith` here because the
-- structure is (Map NgramsType (Map ...)) and the default `(<>)` operator
-- is left-biased
-- (https://hackage.haskell.org/package/containers-0.6.6/docs/Data-Map-Internal.html#v:union)
combineState :: NgramsState' -> NgramsState' -> NgramsState'
combineState = Map.unionWith (<>)
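-- A small illustration with plain maps (not real `NgramsTableMap` values,
-- just to show the bias): the left-biased `Map.union` drops the right-hand
-- inner map entirely, whereas `unionWith (<>)` merges the inner maps:
--
--   Map.union          (Map.fromList [(1, Map.fromList [("a", 1)])])
--                      (Map.fromList [(1, Map.fromList [("b", 2)])])
--     == Map.fromList [(1, Map.fromList [("a", 1)])]
--
--   Map.unionWith (<>) (Map.fromList [(1, Map.fromList [("a", 1)])])
--                      (Map.fromList [(1, Map.fromList [("b", 2)])])
--     == Map.fromList [(1, Map.fromList [("a", 1), ("b", 2)])]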

instance (Semigroup s, Semigroup p) => Semigroup (Archive s p) where
  (<>) (Archive { _a_history = p }) (Archive { _a_version = v'
                                             , _a_state = s'
                                             , _a_history = p' }) =
    Archive { _a_version = v'
            , _a_state = s'
            , _a_history = p' <> p }
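-- Note (descriptive comment): `a1 <> a2` keeps the version and state of
-- `a2` and puts `a2`'s history in front of `a1`'s in `_a_history`, so the
-- patches of the right (newer) archive stay at the front of the list.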
instance (Monoid s, Semigroup p) => Monoid (Archive s p) where
  mempty = Archive { _a_version = 0
                   , _a_state = mempty
                   , _a_history = [] }
instance (FromJSON s, FromJSON p) => FromJSON (Archive s p) where
  parseJSON = genericParseJSON $ unPrefix "_a_"
instance (ToJSON s, ToJSON p) => ToJSON (Archive s p) where
  toJSON     = genericToJSON $ unPrefix "_a_"
  toEncoding = genericToEncoding $ unPrefix "_a_"

------------------------------------------------------------------------
initNodeStory :: (Monoid s, Semigroup p) => NodeId -> NodeStory s p
initNodeStory ni = NodeStory $ Map.singleton ni initArchive

initArchive :: (Monoid s, Semigroup p) => Archive s p
initArchive = mempty
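-- For instance, `initNodeStory (NodeId 5) :: NodeListStory` is a story with
-- a single empty archive for that node: version 0, empty state, empty
-- history (this is just `mempty` for `Archive`, as defined above).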

initNodeListStoryMock :: NodeListStory
initNodeListStoryMock = NodeStory $ Map.singleton nodeListId archive
  where
    nodeListId = 0
    archive = Archive { _a_version = 0
                      , _a_state = ngramsTableMap
                      , _a_history = [] }
    ngramsTableMap = Map.singleton TableNgrams.NgramsTerms
                   $ Map.fromList
                     [ (n ^. ne_ngrams, ngramsElementToRepo n)
                     | n <- mockTable ^. _NgramsTable
                     ]

------------------------------------------------------------------------


------------------------------------------------------------------------
-- | Lenses at the bottom of the file because Template Haskell would change the order of execution in other cases
makeLenses ''NodeStoryEnv
makeLenses ''NodeStory
makeLenses ''Archive

-----------------------------------------


data NodeStoryPoly nid v ngtid ngid nre =
  NodeStoryDB { node_id             :: nid
              , version             :: v
              , ngrams_type_id      :: ngtid
              , ngrams_id           :: ngid
              , ngrams_repo_element :: nre }
  deriving (Eq)

data NodeStoryArchivePoly nid a =
  NodeStoryArchiveDB { a_node_id :: nid
                     , archive   :: a }
  deriving (Eq)

$(makeAdaptorAndInstance "pNodeStory" ''NodeStoryPoly)
$(makeAdaptorAndInstance "pNodeArchiveStory" ''NodeStoryArchivePoly)

-- type NodeStoryWrite = NodeStoryPoly (Column SqlInt4) (Column SqlInt4) (Column SqlInt4) (Column SqlInt4) (Column SqlJsonb)
-- type NodeStoryRead = NodeStoryPoly (Column SqlInt4) (Column SqlInt4) (Column SqlInt4) (Column SqlInt4) (Column SqlJsonb)

-- type NodeStoryArchiveWrite = NodeStoryArchivePoly (Column SqlInt4) (Column SqlJsonb)
-- type NodeStoryArchiveRead = NodeStoryArchivePoly (Column SqlInt4) (Column SqlJsonb)

type ArchiveList = Archive NgramsState' NgramsStatePatch'

-- DB stuff

runPGSExecute :: (PGS.ToRow q) => PGS.Connection -> PGS.Query -> q -> IO Int64
runPGSExecute c qs a = catch (PGS.execute c qs a) printError
  where
    printError (SomeException e) = do
      --q' <- PGS.formatQuery c qs a
      --hPutStrLn stderr q'
      throw (SomeException e)

runPGSExecuteMany :: (PGS.ToRow q) => PGS.Connection -> PGS.Query -> [q] -> IO Int64
runPGSExecuteMany c qs a = catch (PGS.executeMany c qs a) printError
  where
    printError (SomeException e) = do
      --q' <- PGS.formatQuery c qs a
      --hPutStrLn stderr q'
      throw (SomeException e)

runPGSQuery :: (PGS.FromRow r, PGS.ToRow q) => PGS.Connection -> PGS.Query -> q -> IO [r]
runPGSQuery c q a = catch (PGS.query c q a) printError
  where
    printError (SomeException e) = do
      q' <- PGS.formatQuery c q a
      hPutStrLn stderr q'
      throw (SomeException e)

runPGSAdvisoryLock :: PGS.Connection -> Int -> IO ()
runPGSAdvisoryLock c id = do
  _ <- runPGSQuery c [sql| SELECT pg_advisory_lock(?) |] (PGS.Only id) :: IO [PGS.Only ()]
  pure ()

runPGSAdvisoryUnlock :: PGS.Connection -> Int -> IO ()
runPGSAdvisoryUnlock c id = do
  _ <- runPGSQuery c [sql| SELECT pg_advisory_unlock(?) |] (PGS.Only id) :: IO [PGS.Only Bool]
  pure ()

runPGSAdvisoryXactLock :: PGS.Connection -> Int -> IO ()
runPGSAdvisoryXactLock c id = do
  _ <- runPGSQuery c [sql| SELECT pg_advisory_xact_lock(?) |] (PGS.Only id) :: IO [PGS.Only ()]
  pure ()
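-- A usage sketch (this mirrors `upsertNodeStories` below): take the
-- transaction-scoped lock keyed by the node id before touching its rows;
-- the lock is released automatically when the transaction ends.
--
--   PGS.withTransaction c $ do
--     runPGSAdvisoryXactLock c nId
--     ...  -- safe to modify node_stories rows for nId here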

nodeExists :: PGS.Connection -> NodeId -> IO Bool
nodeExists c nId = (== [PGS.Only True])
  <$> runPGSQuery c [sql| SELECT true FROM nodes WHERE id = ? LIMIT 1 |] (PGS.Only nId)

getNodesIdWithType :: PGS.Connection -> NodeType -> IO [NodeId]
getNodesIdWithType c nt = do
  ns <- runPGSQuery c query (PGS.Only nt)
  pure $ map (\(PGS.Only nId) -> NodeId nId) ns
  where
    query :: PGS.Query
    query = [sql| SELECT id FROM nodes WHERE typename = ? |]
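-- For example, to fetch all list nodes (assuming the `NodeList` constructor
-- of `NodeType`, which is what the node-story machinery operates on):
--
--   listIds <- getNodesIdWithType c NodeList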



-- nodeStoryTable :: Table NodeStoryRead NodeStoryWrite
-- nodeStoryTable =
--   Table "node_stories"
--     ( pNodeStory NodeStoryDB { node_id             = tableField "node_id"
--                              , version             = tableField "version"
--                              , ngrams_type_id      = tableField "ngrams_type_id"
--                              , ngrams_id           = tableField "ngrams_id"
--                              , ngrams_repo_element = tableField "ngrams_repo_element"
--                              } )

-- nodeStoryArchiveTable :: Table NodeStoryArchiveRead NodeStoryArchiveWrite
-- nodeStoryArchiveTable =
--   Table "node_story_archive_history"
--     ( pNodeArchiveStory NodeStoryArchiveDB { a_node_id = tableField "node_id"
--                                            , archive   = tableField "archive" } )

-- nodeStorySelect :: Select NodeStoryRead
-- nodeStorySelect = selectTable nodeStoryTable

-- TODO Check ordering, "first patch in the _a_history list is the most recent"
getNodeArchiveHistory :: PGS.Connection -> NodeId -> IO [NgramsStatePatch']
getNodeArchiveHistory c nodeId = do
  as <- runPGSQuery c query (PGS.Only nodeId) :: IO [(TableNgrams.NgramsType, NgramsTerm, NgramsPatch)]
  pure $ (\(ngramsType, terms, patch) -> fst $ PM.singleton ngramsType (NgramsTablePatch $ fst $ PM.singleton terms patch)) <$> as
  where
    query :: PGS.Query
    query = [sql| SELECT ngrams_type_id, terms, patch
                  FROM node_story_archive_history
                  JOIN ngrams ON ngrams.id = ngrams_id
                  WHERE node_id = ? |]

ngramsIdQuery :: PGS.Query
ngramsIdQuery = [sql| SELECT id FROM ngrams WHERE terms = ? |]


insertNodeArchiveHistory :: PGS.Connection -> NodeId -> [NgramsStatePatch'] -> IO ()
insertNodeArchiveHistory _ _ [] = pure ()
insertNodeArchiveHistory c nodeId (h:hs) = do
  let tuples = mconcat $ (\(nType, (NgramsTablePatch patch)) ->
                           (\(term, p) ->
                              (nodeId, nType, term, p)) <$> PM.toList patch) <$> PM.toList h :: [(NodeId, TableNgrams.NgramsType, NgramsTerm, NgramsPatch)]
  tuplesM <- mapM (\(nId, nType, term, patch) -> do
                      ngrams <- runPGSQuery c ngramsIdQuery (PGS.Only term)
                      pure $ (\(PGS.Only termId) -> (nId, nType, termId, term, patch)) <$> (headMay ngrams)
                  ) tuples :: IO [Maybe (NodeId, TableNgrams.NgramsType, Int, NgramsTerm, NgramsPatch)]
  _ <- runPGSExecuteMany c query $ ((\(nId, nType, termId, _term, patch) -> (nId, nType, termId, patch)) <$> (catMaybes tuplesM))
  _ <- insertNodeArchiveHistory c nodeId hs
  pure ()
  where

    query :: PGS.Query
    query = [sql| INSERT INTO node_story_archive_history(node_id, ngrams_type_id, ngrams_id, patch) VALUES (?, ?, ?, ?) |]

getNodeStory :: PGS.Connection -> NodeId -> IO NodeListStory
getNodeStory c nId@(NodeId nodeId) = do
  --res <- withResource pool $ \c -> runSelect c query :: IO [NodeStoryPoly NodeId Version Int Int NgramsRepoElement]
  res <- runPGSQuery c nodeStoriesQuery (PGS.Only nodeId) :: IO [(Version, TableNgrams.NgramsType, NgramsTerm, NgramsRepoElement)]
  -- We have multiple rows with same node_id and different (ngrams_type_id, ngrams_id).
  -- Need to create a map: {<node_id>: {<ngrams_type_id>: {<ngrams_id>: <data>}}}
  let dbData = map (\(version, ngramsType, ngrams, ngrams_repo_element) ->
                      Archive { _a_version = version
                              , _a_history = []
                              , _a_state = Map.singleton ngramsType $ Map.singleton ngrams ngrams_repo_element }) res
  -- NOTE When concatenating, check that the same version is for all states
  pure $ NodeStory $ Map.singleton nId $ foldl combine mempty dbData
  --pure $ NodeStory $ Map.fromListWith (<>) $ (\(NodeStoryDB nId a) -> (nId, a)) <$> res
  where
    -- NOTE (<>) for Archive doesn't concatenate states, so we have to use `combine`
    combine a1 a2 = a1 & a_state %~ combineState (a2 ^. a_state)
                       & a_version .~ (a2 ^. a_version) -- version should be updated from list, not taken from the empty Archive

nodeStoriesQuery :: PGS.Query
nodeStoriesQuery = [sql| SELECT version, ngrams_type_id, terms, ngrams_repo_element
                         FROM node_stories
                         JOIN ngrams ON ngrams.id = ngrams_id
                         WHERE node_id = ? |]

type ArchiveStateList = [(TableNgrams.NgramsType, NgramsTerm, NgramsRepoElement)]

-- Functions to convert archive state (which is a Map NgramsType (Map
-- NgramsTerm NgramsRepoElement)) to/from a flat list
archiveStateAsList :: NgramsState' -> ArchiveStateList
archiveStateAsList s = mconcat $ (\(nt, ntm) -> (\(n, nre) -> (nt, n, nre)) <$> Map.toList ntm) <$> Map.toList s

archiveStateFromList :: ArchiveStateList -> NgramsState'
archiveStateFromList l = Map.fromListWith (<>) $ (\(nt, t, nre) -> (nt, Map.singleton t nre)) <$> l
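-- Schematically (with placeholder `NgramsRepoElement` values reA, reB):
--
--   archiveStateAsList (Map.fromList [(TableNgrams.NgramsTerms, Map.fromList [("a", reA), ("b", reB)])])
--     == [(TableNgrams.NgramsTerms, "a", reA), (TableNgrams.NgramsTerms, "b", reB)]
--
-- and `archiveStateFromList . archiveStateAsList` rebuilds the original map,
-- since `fromListWith (<>)` regroups the singletons per `NgramsType`.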

-- | This function inserts a whole new node story and archive for the given node_id.
insertNodeStory :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
insertNodeStory c (NodeId nId) a = do
  _ <- mapM (\(ngramsType, ngrams, ngramsRepoElement) -> do
               termIdM <- runPGSQuery c ngramsIdQuery (PGS.Only ngrams) :: IO [PGS.Only Int64]
               case headMay termIdM of
                 Nothing -> pure 0
                 Just (PGS.Only termId) -> runPGSExecuteMany c query [(nId, a ^. a_version, ngramsType, termId, ngramsRepoElement)]) $ archiveStateAsList $ a ^. a_state
  -- runInsert c $ insert ngramsType ngrams ngramsRepoElement) $ archiveStateAsList _a_state

  pure ()
  where
    query :: PGS.Query
    query = [sql| INSERT INTO node_stories(node_id, version, ngrams_type_id, ngrams_id, ngrams_repo_element) VALUES (?, ?, ?, ?, ?) |]
    -- insert ngramsType ngrams ngramsRepoElement =
    --   Insert { iTable = nodeStoryTable
    --          , iRows  = [NodeStoryDB { node_id = sqlInt4 nId
    --                                  , version = sqlInt4 _a_version
    --                                  , ngrams_type_id = sqlInt4 $ TableNgrams.ngramsTypeId ngramsType
    --                                  , ngrams_id = ...
    --                                  , ngrams_repo_element = sqlValueJSONB ngramsRepoElement
    --                                  }]
    --          , iReturning = rCount
    --          , iOnConflict = Nothing }

insertArchiveList :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
insertArchiveList c nodeId a = do
  _ <- mapM_ (\(nt, n, nre) -> runPGSExecute c query (nodeId, a ^. a_version, nt, nre, n)) (archiveStateAsList $ a ^. a_state)
  --_ <- runPGSExecuteMany c query $ (\(nt, n, nre) -> (nodeId, a ^. a_version, nt, nre, n)) <$> (archiveStateAsList $ a ^. a_state)
  pure ()
  where
    query :: PGS.Query
    query = [sql| INSERT INTO node_stories(node_id, version, ngrams_type_id, ngrams_id, ngrams_repo_element)
                  SELECT ?, ?, ?, ngrams.id, ? FROM ngrams WHERE terms = ? |]

deleteArchiveList :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
deleteArchiveList c nodeId a = do
  _ <- mapM_ (\(nt, n, _) -> runPGSExecute c query (nodeId, nt, n)) (archiveStateAsList $ a ^. a_state)
  --_ <- runPGSExecuteMany c query $ (\(nt, n, _) -> (nodeId, nt, n)) <$> (archiveStateAsList $ a ^. a_state)
  pure ()
  where
    query :: PGS.Query
    query = [sql| DELETE FROM node_stories
                  WHERE node_id = ? AND ngrams_type_id = ?
                    AND ngrams_id IN (SELECT id FROM ngrams WHERE terms = ?) |]

updateArchiveList :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
updateArchiveList c nodeId a = do
  let params = (\(nt, n, nre) -> (nre, nodeId, nt, n)) <$> (archiveStateAsList $ a ^. a_state)
  --q <- PGS.format c query params
  --printDebug "[updateArchiveList] query" q
  _ <- mapM (\p -> runPGSExecute c query p) params
  pure ()
  where
    query :: PGS.Query
    query = [sql| UPDATE node_stories
                  SET ngrams_repo_element = ?
                  WHERE node_id = ? AND ngrams_type_id = ? AND ngrams_id IN (SELECT id FROM ngrams WHERE terms = ?) |]

-- | This function updates the node story and archive for the given node_id.
updateNodeStory :: PGS.Connection -> NodeId -> ArchiveList -> ArchiveList -> IO ()
updateNodeStory c nodeId@(NodeId _nId) currentArchive newArchive = do
  -- STEPS

  -- 0. We assume we're inside an advisory lock

  -- 1. Find differences (inserts/updates/deletes)
  let currentList = archiveStateAsList $ currentArchive ^. a_state
  let newList = archiveStateAsList $ newArchive ^. a_state
  let currentSet = Set.fromList $ (\(nt, n, _) -> (nt, n)) <$> currentList
  let newSet = Set.fromList $ (\(nt, n, _) -> (nt, n)) <$> newList

  let inserts = filter (\(nt, n, _) -> Set.member (nt, n) $ Set.difference newSet currentSet) newList
  --printDebug "[updateNodeStory] inserts" inserts
  let deletes = filter (\(nt, n, _) -> Set.member (nt, n) $ Set.difference currentSet newSet) currentList
  --printDebug "[updateNodeStory] deletes" deletes

  -- updates are the things that are in new but not in current
  let updates = Set.toList $ Set.difference (Set.fromList newList) (Set.fromList currentList)
  --printDebug "[updateNodeStory] updates" $ Text.unlines $ (Text.pack . show) <$> updates

  -- 2. Perform inserts/deletes/updates
  printDebug "[updateNodeStory] applying insert" ()
  insertArchiveList c nodeId $ Archive { _a_version = newArchive ^. a_version
                                       , _a_history = []
                                       , _a_state = archiveStateFromList inserts }
  printDebug "[updateNodeStory] insert applied" ()
  --TODO Use currentArchive ^. a_version in delete and report error
  -- if entries with (node_id, ngrams_type_id, ngrams_id) but
  -- different version are found.
  deleteArchiveList c nodeId $ Archive { _a_version = newArchive ^. a_version
                                       , _a_history = []
                                       , _a_state = archiveStateFromList deletes }
  printDebug "[updateNodeStory] delete applied" ()
  updateArchiveList c nodeId $ Archive { _a_version = newArchive ^. a_version
                                       , _a_history = []
                                       , _a_state = archiveStateFromList updates }
  printDebug "[updateNodeStory] update applied" ()

  pure ()
  -- where
  --   update = Update { uTable      = nodeStoryTable
  --                   , uUpdateWith = updateEasy (\(NodeStoryDB { node_id }) ->
  --                       NodeStoryDB { archive = sqlValueJSONB $ Archive { _a_history = emptyHistory
  --                                                                       , ..}
  --                                   , .. })
  --                   , uWhere      = (\row -> node_id row .== sqlInt4 nId)
  --                   , uReturning  = rCount }

-- nodeStoryRemove :: Pool PGS.Connection -> NodeId -> IO Int64
-- nodeStoryRemove pool (NodeId nId) = withResource pool $ \c -> runDelete c delete
--   where
--     delete = Delete { dTable     = nodeStoryTable
--                     , dWhere     = (\row -> node_id row .== sqlInt4 nId)
--                     , dReturning = rCount }

upsertNodeStories :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
upsertNodeStories c nodeId@(NodeId nId) newArchive = do
  printDebug "[upsertNodeStories] START nId" nId
  PGS.withTransaction c $ do
    printDebug "[upsertNodeStories] locking nId" nId
    runPGSAdvisoryXactLock c nId

    -- Whether it's an insert or an update, we can insert the node archive history already.
    -- NOTE: It is assumed that the most recent change is the first in the
    -- list, so we save these in reverse order
    insertNodeArchiveHistory c nodeId $ reverse $ newArchive ^. a_history

    (NodeStory m) <- getNodeStory c nodeId
    case Map.lookup nodeId m of
      Nothing -> do
        _ <- insertNodeStory c nodeId newArchive
        pure ()
      Just currentArchive -> do
        _ <- updateNodeStory c nodeId currentArchive newArchive
        pure ()

  printDebug "[upsertNodeStories] STOP nId" nId

writeNodeStories :: PGS.Connection -> NodeListStory -> IO ()
writeNodeStories c (NodeStory nls) = do
  _ <- mapM (\(nId, a) -> upsertNodeStories c nId a) $ Map.toList nls
  pure ()

-- | Returns a `NodeListStory`, updating the given one for the given `NodeId`
nodeStoryInc :: PGS.Connection -> Maybe NodeListStory -> NodeId -> IO NodeListStory
nodeStoryInc c Nothing nId = getNodeStory c nId
nodeStoryInc c (Just ns@(NodeStory nls)) nId = do
  case Map.lookup nId nls of
    Nothing -> do
      (NodeStory nls') <- getNodeStory c nId
      pure $ NodeStory $ Map.union nls nls'
    Just _ -> pure ns

nodeStoryIncs :: PGS.Connection -> Maybe NodeListStory -> [NodeId] -> IO NodeListStory
nodeStoryIncs _ Nothing [] = pure $ NodeStory $ Map.empty
nodeStoryIncs c (Just nls) ns = foldM (\m n -> nodeStoryInc c (Just m) n) nls ns
nodeStoryIncs c Nothing (ni:ns) = do
  m <- getNodeStory c ni
  nodeStoryIncs c (Just m) ns

-- nodeStoryDec :: Pool PGS.Connection -> NodeListStory -> NodeId -> IO NodeListStory
-- nodeStoryDec pool ns@(NodeStory nls) ni = do
--   case Map.lookup ni nls of
--     Nothing -> do
--       _ <- nodeStoryRemove pool ni
--       pure ns
--     Just _ -> do
--       let ns' = Map.filterWithKey (\k _v -> k /= ni) nls
--       _ <- nodeStoryRemove pool ni
--       pure $ NodeStory ns'
------------------------------------

readNodeStoryEnv :: Pool PGS.Connection -> IO NodeStoryEnv
readNodeStoryEnv pool = do
  mvar <- nodeStoryVar pool Nothing []
  saver <- mkNodeStorySaver pool mvar
  -- let saver = modifyMVar_ mvar $ \mv -> do
  --       writeNodeStories pool mv
  --       printDebug "[readNodeStoryEnv] saver" mv
  --       let mv' = clearHistory mv
  --       printDebug "[readNodeStoryEnv] saver, cleared" mv'
  --       return mv'
  pure $ NodeStoryEnv { _nse_var = mvar
                      , _nse_saver = saver
                      , _nse_getter = nodeStoryVar pool (Just mvar) }
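-- A minimal usage sketch (assuming some connection pool `pool` and list id
-- `listId` built elsewhere, e.g. with Data.Pool):
--
--   env <- readNodeStoryEnv pool
--   mv  <- _nse_getter env [listId]   -- load the stories for `listId` into the MVar
--   _nse_saver env                    -- schedule a debounced write-back to the DB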

nodeStoryVar :: Pool PGS.Connection -> Maybe (MVar NodeListStory) -> [NodeId] -> IO (MVar NodeListStory)
nodeStoryVar pool Nothing nIds = do
  state <- withResource pool $ \c -> nodeStoryIncs c Nothing nIds
  newMVar state
nodeStoryVar pool (Just mv) nIds = do
  _ <- withResource pool $ \c -> modifyMVar_ mv $ \nsl -> (nodeStoryIncs c (Just nsl) nIds)
  pure mv

-- Debouncing is useful here since it delays the saving to some later time,
-- asynchronously, so that we keep operating on memory only.
mkNodeStorySaver :: Pool PGS.Connection -> MVar NodeListStory -> IO (IO ())
mkNodeStorySaver pool mvns = mkDebounce settings
  where
    settings = defaultDebounceSettings
                 { debounceAction = do
                     -- NOTE: Lock MVar first, then use resource pool.
                     -- Otherwise we could wait for MVar, while
                     -- blocking the pool connection.
                     modifyMVar_ mvns $ \ns -> do
                       withResource pool $ \c -> do
                         --printDebug "[mkNodeStorySaver] will call writeNodeStories, ns" ns
                         writeNodeStories c ns
                         pure $ clearHistory ns
                     --withMVar mvns (\ns -> printDebug "[mkNodeStorySaver] debounce nodestory" ns)
                 , debounceFreq = 1*minute
                 }
    minute = 60*second
    second = 10^(6 :: Int)

clearHistory :: NodeListStory -> NodeListStory
clearHistory (NodeStory ns) = NodeStory $ ns & (traverse . a_history) .~ emptyHistory
  where
    emptyHistory = [] :: [NgramsStatePatch']

currentVersion :: (HasNodeStory env err m) => ListId -> m Version
currentVersion listId = do
  pool <- view connPool
  nls <- withResource pool $ \c -> liftBase $ getNodeStory c listId
  pure $ nls ^. unNodeStory . at listId . _Just . a_version


-- mkNodeStorySaver :: MVar NodeListStory -> Cmd err (Cmd err ())
-- mkNodeStorySaver mvns = mkDebounce settings
--   where
--     settings = defaultDebounceSettings
--                  { debounceAction = withMVar mvns (\ns -> writeNodeStories ns)
--                  , debounceFreq = 1 * minute
--                  -- , debounceEdge = trailingEdge -- Trigger on the trailing edge
--                  }
--     minute = 60 * second
--     second = 10^(6 :: Int)


-----------------------------------------