]> Git — Sourcephile - gargantext.git/blob - src/Gargantext/Core/NodeStory.hs
Merge branch 'dev-merge' into dev
[gargantext.git] / src / Gargantext / Core / NodeStory.hs
1 {-|
2 Module : Gargantext.Core.NodeStory
3 Description : Node API generation
4 Copyright : (c) CNRS, 2017-Present
5 License : AGPL + CECILL v3
6 Maintainer : team@gargantext.org
7 Stability : experimental
8 Portability : POSIX
9
10 A Node Story is a Map between NodeId and an Archive (with state,
11 version and history) for that node.
12
13 Couple of words on how this is implemented.
14
15 First version used files which stored Archive for each NodeId in a
16 separate .cbor file.
17
18 For performance reasons, it is rewritten to use the DB.
19
20 The table `node_stories` contains two columns: `node_id` and
21 `archive`.
22
23 Next, it was observed that `a_history` in `Archive` takes much
24 space. So a new table was created, `node_story_archive_history` with
25 columns: `node_id`, `ngrams_type_id`, `patch`. This is because each
26 history item is in fact a map from `NgramsType` to `NgramsTablePatch`
27 (see the `NgramsStatePatch'` type).
28
29 Moreover, since in `G.A.Ngrams.commitStatePatch` we use current state
30 only, with only recent history items, I concluded that it is not
31 necessary to load whole history into memory. Instead, it is kept in DB
32 (history is immutable) and only recent changes are added to
33 `a_history`. Then that record is cleared whenever `Archive` is saved.
34
35 Please note that
36
37 TODO:
38 - remove
39 - filter
40 - load the lists
41 -}
42
43 {-# OPTIONS_GHC -fno-warn-orphans #-}
44 {-# LANGUAGE Arrows #-}
45 {-# LANGUAGE ConstraintKinds #-}
46 {-# LANGUAGE QuasiQuotes #-}
47 {-# LANGUAGE TemplateHaskell #-}
48
49 module Gargantext.Core.NodeStory
50 ( HasNodeStory
51 , HasNodeStoryEnv
52 , hasNodeStory
53 , HasNodeStoryVar
54 , hasNodeStoryVar
55 , HasNodeStorySaver
56 , hasNodeStorySaver
57 , HasNodeStoryImmediateSaver
58 , hasNodeStoryImmediateSaver
59 , HasNodeArchiveStoryImmediateSaver
60 , hasNodeArchiveStoryImmediateSaver
61 , NodeStory(..)
62 , NgramsStatePatch'
63 , NodeListStory
64 , initNodeListStoryMock
65 , NodeStoryEnv(..)
66 , initNodeStory
67 , nse_getter
68 , nse_saver
69 , nse_saver_immediate
70 , nse_archive_saver_immediate
71 , nse_var
72 , unNodeStory
73 , getNodeArchiveHistory
74 , Archive(..)
75 , initArchive
76 , a_history
77 , a_state
78 , a_version
79 , nodeExists
80 , runPGSQuery
81 , runPGSAdvisoryLock
82 , runPGSAdvisoryUnlock
83 , runPGSAdvisoryXactLock
84 , getNodesIdWithType
85 , readNodeStoryEnv
86 , upsertNodeStories
87 , getNodeStory
88 , nodeStoriesQuery
89 , currentVersion
90 , archiveStateFromList
91 , archiveStateToList
92 , fixNodeStoryVersions )
93 where
94
95 -- import Debug.Trace (traceShow)
96 import Control.Debounce (mkDebounce, defaultDebounceSettings, debounceFreq, debounceAction)
97 import Codec.Serialise.Class
98 import Control.Concurrent (MVar(), newMVar, modifyMVar_)
99 import Control.Exception (catch, throw, SomeException(..))
100 import Control.Lens (makeLenses, Getter, (^.), (.~), (%~), _Just, at, traverse, view)
101 import Control.Monad.Except
102 import Control.Monad.Reader
103 import Data.Aeson hiding ((.=), decode)
104 import Data.ByteString.Char8 (hPutStrLn)
105 import Data.Map.Strict (Map)
106 import Data.Maybe (catMaybes)
107 import Data.Monoid
108 import Data.Pool (Pool, withResource)
109 import Data.Semigroup
110 import Database.PostgreSQL.Simple.SqlQQ (sql)
111 import Database.PostgreSQL.Simple.FromField (FromField(fromField), fromJSONField)
112 import Data.Profunctor.Product.TH (makeAdaptorAndInstance)
113 import GHC.Generics (Generic)
114 import Gargantext.API.Ngrams.Types
115 import Gargantext.Core.Types (ListId, NodeId(..), NodeType)
116 import Gargantext.Core.Utils.Prefix (unPrefix)
117 import Gargantext.Database.Admin.Config (nodeTypeId)
118 import Gargantext.Database.Prelude (CmdM', HasConnectionPool(..), HasConfig)
119 import Gargantext.Database.Query.Table.Node.Error (HasNodeError())
120 import Gargantext.Prelude
121 import Opaleye (DefaultFromField(..), SqlJsonb, fromPGSFromField)
122 import System.IO (stderr)
123 import qualified Data.Map.Strict as Map
124 import qualified Data.Map.Strict.Patch as PM
125 import qualified Data.Set as Set
126 import qualified Data.Text as Text
127 import qualified Database.PostgreSQL.Simple as PGS
128 import qualified Gargantext.Database.Query.Table.Ngrams as TableNgrams
129
130 ------------------------------------------------------------------------
-- | Runtime environment for node stories: the shared in-memory state
-- plus the actions used to persist it or to load more nodes into it.
-- See 'readNodeStoryEnv' for how each field is populated.
data NodeStoryEnv = NodeStoryEnv
  { _nse_var :: !(MVar NodeListStory)
  -- ^ Shared mutable node list story.
  , _nse_saver :: !(IO ())
  -- ^ Debounced saving action (built with 'mkNodeStorySaver').
  , _nse_saver_immediate :: !(IO ())
  -- ^ Saving action that writes the current state right away.
  , _nse_archive_saver_immediate :: !(NodeListStory -> IO NodeListStory)
  -- ^ Stores the in-memory history of the given story and returns the
  -- story with its history cleared.
  , _nse_getter :: [NodeId] -> IO (MVar NodeListStory)
  -- ^ Makes sure the given nodes are loaded and returns the shared variable.

  --, _nse_cleaner :: !(IO ()) -- every 12 hours: cleans the repos of unused NodeStories
  -- , _nse_lock :: !FileLock -- TODO (it depends on the option: if with database or file only)
  }
  deriving (Generic)
141
-- | Constraint bundle for computations that work with node stories:
-- a command monad @m@ reading an environment @env@ that provides the
-- node story environment, the configuration and a connection pool, and
-- whose error type @err@ can carry node errors.
type HasNodeStory env err m = ( CmdM' env err m
                              , MonadReader env m
                              , MonadError err m
                              , HasNodeStoryEnv env
                              , HasConfig env
                              , HasConnectionPool env
                              , HasNodeError err
                              )
150
-- | Environments that expose a full 'NodeStoryEnv'. Such environments
-- must also expose the getter and the (debounced) saver individually.
class (HasNodeStoryVar env, HasNodeStorySaver env)
  => HasNodeStoryEnv env where
  hasNodeStory :: Getter env NodeStoryEnv

-- | Environments that expose the action returning the shared
-- 'NodeListStory' variable for a list of node ids.
class HasNodeStoryVar env where
  hasNodeStoryVar :: Getter env ([NodeId] -> IO (MVar NodeListStory))

-- | Environments that expose a saving action.
class HasNodeStorySaver env where
  hasNodeStorySaver :: Getter env (IO ())

-- | Environments that expose an immediate saving action.
class HasNodeStoryImmediateSaver env where
  hasNodeStoryImmediateSaver :: Getter env (IO ())

-- | Environments that expose an immediate archive (history) saving
-- action, which takes a story and returns the story to keep.
class HasNodeArchiveStoryImmediateSaver env where
  hasNodeArchiveStoryImmediateSaver :: Getter env (NodeListStory -> IO NodeListStory)
166
167 ------------------------------------------------------------------------
168
{- | Node Story for each NodeType, where the key of the Map is the NodeId.

TODO: generalize for any NodeType; let's start with NodeList, which
is implemented already.
-}
newtype NodeStory s p = NodeStory { _unNodeStory :: Map NodeId (Archive s p) }
  deriving (Generic, Show)

-- The three instances below rely on the Generic-derived default methods.
instance (FromJSON s, FromJSON p) => FromJSON (NodeStory s p)
instance (ToJSON s, ToJSON p) => ToJSON (NodeStory s p)
instance (Serialise s, Serialise p) => Serialise (NodeStory s p)
179
-- | State of one node together with its version and recent history.
--
-- @s@ is the state type and @p@ the patch type.
data Archive s p = Archive
  { _a_version :: !Version
  -- ^ Version of the state.
  , _a_state :: !s
  -- ^ Current state.
  , _a_history :: ![p]
  -- ^ Recent patches; the first patch in the list is the most recent.
  -- We use `take` in `commitStatePatch`, that's why.

  -- History is immutable, we just insert things on top of the existing
  -- list.

  -- We don't need to store the whole history in memory: this
  -- structure holds only recent history, the part that will be
  -- inserted into the DB.
  }
  deriving (Generic, Show)

-- Uses the Generic-derived default methods.
instance (Serialise s, Serialise p) => Serialise (Archive s p)
197
198
-- | Node story specialised to ngrams lists.
type NodeListStory = NodeStory NgramsState' NgramsStatePatch'

-- | State of a list: one ngrams table per ngrams type.
type NgramsState' = Map TableNgrams.NgramsType NgramsTableMap
-- | One history item: a table patch per ngrams type.
type NgramsStatePatch' = PatchMap TableNgrams.NgramsType NgramsTablePatch
instance Serialise NgramsStatePatch'
-- | An archive column is decoded from its JSON representation.
instance FromField (Archive NgramsState' NgramsStatePatch')
  where
    fromField = fromJSONField
-- | Opaleye decoding of a jsonb column reuses the 'FromField' instance above.
instance DefaultFromField SqlJsonb (Archive NgramsState' NgramsStatePatch')
  where
    defaultFromField = fromPGSFromField
210
-- | Merge two 'NgramsState'' values.
--
-- A plain @(<>)@ on the outer map is left-biased
-- (https://hackage.haskell.org/package/containers-0.6.6/docs/Data-Map-Internal.html#v:union),
-- i.e. for a shared 'TableNgrams.NgramsType' it would keep only the
-- left inner map. Here the inner maps of a shared key are themselves
-- combined with @(<>)@ instead.
combineState :: NgramsState' -> NgramsState' -> NgramsState'
combineState lhs rhs = Map.unionWith (<>) lhs rhs
217
-- | Right-biased combination: version and state are taken from the
-- right-hand archive; only the histories are concatenated, with the
-- right-hand history placed first.
instance (Semigroup s, Semigroup p) => Semigroup (Archive s p) where
  Archive { _a_history = leftHistory } <> Archive rightVersion rightState rightHistory =
    Archive rightVersion rightState (rightHistory <> leftHistory)

-- | The empty archive: version 0, empty state, no history.
--
-- NOTE(review): because '<>' takes version and state from its right
-- argument, @x <> mempty@ resets both; 'mempty' is only a left identity.
instance (Monoid s, Semigroup p) => Monoid (Archive s p) where
  mempty = Archive 0 mempty []
-- | JSON decoding through the Generic representation, using the
-- options produced by @unPrefix "_a_"@.
instance (FromJSON s, FromJSON p) => FromJSON (Archive s p) where
  parseJSON = genericParseJSON (unPrefix "_a_")

-- | JSON encoding through the Generic representation, using the same
-- options as the 'FromJSON' instance.
instance (ToJSON s, ToJSON p) => ToJSON (Archive s p) where
  toJSON     = genericToJSON (unPrefix "_a_")
  toEncoding = genericToEncoding (unPrefix "_a_")
234
235 ------------------------------------------------------------------------
-- | A node story containing a single, empty archive for the given node.
initNodeStory :: (Monoid s, Semigroup p) => NodeId -> NodeStory s p
initNodeStory ni = NodeStory (Map.singleton ni initArchive)
238
-- | The empty archive (version 0, empty state, no history).
initArchive :: (Monoid s, Semigroup p) => Archive s p
initArchive = Archive { _a_version = 0
                      , _a_state   = mempty
                      , _a_history = [] }
241
-- | Mock node list story: a single archive under node id 0, at
-- version 0 and without history, whose 'TableNgrams.NgramsTerms' table
-- is built from 'mockTable'.
initNodeListStoryMock :: NodeListStory
initNodeListStoryMock = NodeStory (Map.singleton mockListId mockArchive)
  where
    mockListId  = 0
    mockArchive = Archive 0 mockState []
    mockState   = Map.singleton TableNgrams.NgramsTerms (Map.fromList mockEntries)
    -- One (term, repo element) pair per element of the mock table.
    mockEntries = map (\n -> (n ^. ne_ngrams, ngramsElementToRepo n))
                      (mockTable ^. _NgramsTable)
254
255 ------------------------------------------------------------------------
256
257
258 ------------------------------------------------------------------------
-- | Lens generation (Template Haskell). A top-level splice ends the
-- current declaration group, so the generated lenses (`a_state`,
-- `a_version`, `nse_var`, ...) are only visible to the definitions that
-- follow; keep these splices after the types above and before the code
-- below that uses the lenses.
makeLenses ''NodeStoryEnv
makeLenses ''NodeStory
makeLenses ''Archive
263
264 ----------------------------------------------------------------------
-- | Polymorphic row type whose field names match the columns used by
-- the @node_stories@ queries in this module.
data NodeStoryPoly nid v ngtid ngid nre =
  NodeStoryDB { node_id :: nid
              , version :: v
              , ngrams_type_id :: ngtid
              , ngrams_id :: ngid
              , ngrams_repo_element :: nre }
  deriving (Eq)

-- | Polymorphic row type pairing a node id with an archive value.
-- NOTE(review): presumably meant for the archive history table; it is
-- only referenced from commented-out Opaleye code here.
data NodeStoryArchivePoly nid a =
  NodeStoryArchiveDB { a_node_id :: nid
                     , archive :: a }
  deriving (Eq)

-- Product-profunctor adaptors (`pNodeStory`, `pNodeArchiveStory`) and
-- their instances for the two row types above.
$(makeAdaptorAndInstance "pNodeStory" ''NodeStoryPoly)
$(makeAdaptorAndInstance "pNodeArchiveStory" ''NodeStoryArchivePoly)
280
281 -- type NodeStoryWrite = NodeStoryPoly (Column SqlInt4) (Column SqlInt4) (Column SqlInt4) (Column SqlInt4) (Column SqlJsonb)
282 -- type NodeStoryRead = NodeStoryPoly (Column SqlInt4) (Column SqlInt4) (Column SqlInt4) (Column SqlInt4) (Column SqlJsonb)
283
284 -- type NodeStoryArchiveWrite = NodeStoryArchivePoly (Column SqlInt4) (Column SqlJsonb)
285 -- type NodeStoryArchiveRead = NodeStoryArchivePoly (Column SqlInt4) (Column SqlJsonb)
286
-- | Archive of an ngrams list: the element type stored in a 'NodeListStory'.
type ArchiveList = Archive NgramsState' NgramsStatePatch'
288
289 -- DB stuff
290
-- | Run a single parameterised statement and return the number of
-- affected rows.
--
-- On failure the fully formatted query is written to @stderr@ and the
-- original exception is re-thrown, exactly like 'runPGSQuery' does.
-- (The previous handler called @panic@ before re-throwing, which made
-- the re-throw unreachable and hid the original exception from callers.)
runPGSExecute :: (PGS.ToRow q) => PGS.Connection -> PGS.Query -> q -> IO Int64
runPGSExecute c qs a = catch (PGS.execute c qs a) printError
  where
    printError (SomeException e) = do
      -- Log what was sent, then let the caller see the real error.
      q' <- PGS.formatQuery c qs a
      hPutStrLn stderr q'
      throw (SomeException e)
298
-- | Run a multi-row statement (see @executeMany@ in postgresql-simple)
-- and return the number of affected rows.
--
-- On failure the fully formatted query is written to @stderr@ and the
-- original exception is re-thrown, in line with 'runPGSQuery'.
-- (The previous handler called @panic@ before re-throwing, which made
-- the re-throw unreachable and hid the original exception from callers.)
runPGSExecuteMany :: (PGS.ToRow q) => PGS.Connection -> PGS.Query -> [q] -> IO Int64
runPGSExecuteMany c qs a = catch (PGS.executeMany c qs a) printError
  where
    printError (SomeException e) = do
      -- Log what was sent, then let the caller see the real error.
      q' <- PGS.formatMany c qs a
      hPutStrLn stderr q'
      throw (SomeException e)
306
-- | Run a parameterised query and return its rows. If anything is
-- thrown, the formatted query is printed to @stderr@ before the
-- exception is re-thrown.
runPGSQuery :: (PGS.FromRow r, PGS.ToRow q) => PGS.Connection -> PGS.Query -> q -> IO [r]
runPGSQuery c q a = PGS.query c q a `catch` logAndRethrow
  where
    logAndRethrow (SomeException err) = do
      formatted <- PGS.formatQuery c q a
      hPutStrLn stderr formatted
      throw (SomeException err)
314
-- | Call @pg_advisory_lock@ with the given key on this connection.
runPGSAdvisoryLock :: PGS.Connection -> Int -> IO ()
runPGSAdvisoryLock c lockId = do
  _rows <- runPGSQuery c lockQuery (PGS.Only lockId) :: IO [PGS.Only ()]
  pure ()
  where
    lockQuery :: PGS.Query
    lockQuery = [sql| SELECT pg_advisory_lock(?) |]
319
-- | Call @pg_advisory_unlock@ with the given key on this connection.
-- The boolean returned by the database is discarded.
runPGSAdvisoryUnlock :: PGS.Connection -> Int -> IO ()
runPGSAdvisoryUnlock c lockId = do
  _rows <- runPGSQuery c unlockQuery (PGS.Only lockId) :: IO [PGS.Only Bool]
  pure ()
  where
    unlockQuery :: PGS.Query
    unlockQuery = [sql| SELECT pg_advisory_unlock(?) |]
324
-- | Call @pg_advisory_xact_lock@ with the given key on this connection.
runPGSAdvisoryXactLock :: PGS.Connection -> Int -> IO ()
runPGSAdvisoryXactLock c lockId = do
  _rows <- runPGSQuery c xactLockQuery (PGS.Only lockId) :: IO [PGS.Only ()]
  pure ()
  where
    xactLockQuery :: PGS.Query
    xactLockQuery = [sql| SELECT pg_advisory_xact_lock(?) |]
329
-- | 'True' when a row with the given id exists in @nodes@.
nodeExists :: PGS.Connection -> NodeId -> IO Bool
nodeExists c nId = do
  rows <- runPGSQuery c [sql| SELECT true FROM nodes WHERE id = ? LIMIT 1 |] (PGS.Only nId)
  pure $ rows == [PGS.Only True]
333
-- | Ids of all rows in @nodes@ whose @typename@ equals the database
-- id of the given node type.
getNodesIdWithType :: PGS.Connection -> NodeType -> IO [NodeId]
getNodesIdWithType c nt = do
  rows <- runPGSQuery c [sql| SELECT id FROM nodes WHERE typename = ? |] (PGS.Only $ nodeTypeId nt)
  pure [ NodeId nId | PGS.Only nId <- rows ]
341
342
343
344 -- nodeStoryTable :: Table NodeStoryRead NodeStoryWrite
345 -- nodeStoryTable =
346 -- Table "node_stories"
347 -- ( pNodeStory NodeStoryDB { node_id = tableField "node_id"
348 -- , version = tableField "version"
349 -- , ngrams_type_id = tableField "ngrams_type_id"
350 -- , ngrams_id = tableField "ngrams_id"
351 -- , ngrams_repo_element = tableField "ngrams_repo_element"
352 -- } )
353
354 -- nodeStoryArchiveTable :: Table NodeStoryArchiveRead NodeStoryArchiveWrite
355 -- nodeStoryArchiveTable =
356 -- Table "node_story_archive_history"
357 -- ( pNodeArchiveStory NodeStoryArchiveDB { a_node_id = tableField "node_id"
358 -- , archive = tableField "archive" } )
359
360 -- nodeStorySelect :: Select NodeStoryRead
361 -- nodeStorySelect = selectTable nodeStoryTable
362
-- | Load the stored history of a node, one single-term patch per row.
--
-- NOTE: "first patch in the _a_history list is the most recent" — the
-- rows are ordered by (version, id) descending to honour that.
getNodeArchiveHistory :: PGS.Connection -> NodeId -> IO [NgramsStatePatch']
getNodeArchiveHistory c nodeId = do
  rows <- runPGSQuery c historyQuery (PGS.Only nodeId) :: IO [(TableNgrams.NgramsType, NgramsTerm, NgramsPatch)]
  pure $ toStatePatch <$> rows
  where
    -- Wrap one (type, term, patch) row into a state patch touching
    -- exactly that term of that ngrams type.
    toStatePatch :: (TableNgrams.NgramsType, NgramsTerm, NgramsPatch) -> NgramsStatePatch'
    toStatePatch (ngramsType, terms, patch) =
      fst $ PM.singleton ngramsType (NgramsTablePatch $ fst $ PM.singleton terms patch)

    historyQuery :: PGS.Query
    historyQuery = [sql| SELECT ngrams_type_id, terms, patch
                         FROM node_story_archive_history
                         JOIN ngrams ON ngrams.id = ngrams_id
                         WHERE node_id = ?
                         ORDER BY (version, node_story_archive_history.id) DESC |]
375
-- | Look up the id(s) of the @ngrams@ row(s) whose @terms@ equal the
-- single query parameter.
ngramsIdQuery :: PGS.Query
ngramsIdQuery = [sql| SELECT id FROM ngrams WHERE terms = ? |]
378
379
-- | Append history items to @node_story_archive_history@.
--
-- The items are processed in list order. Each one is flattened to
-- (ngrams type, term, patch) triples, every term is resolved to its
-- @ngrams.id@, and the resolved rows are inserted with the given
-- version. Terms without a matching @ngrams@ row are skipped.
insertNodeArchiveHistory :: PGS.Connection -> NodeId -> Version -> [NgramsStatePatch'] -> IO ()
insertNodeArchiveHistory c nodeId version history = mapM_ insertItem history
  where
    insertItem :: NgramsStatePatch' -> IO ()
    insertItem statePatch = do
      resolved <- mapM resolveTerm (flatten statePatch)
      _ <- runPGSExecuteMany c query
             [ (nId, nType, termId, patch, version)
             | (nId, nType, termId, patch) <- catMaybes resolved ]
      pure ()

    -- All (type, term, patch) triples of one history item.
    flatten :: NgramsStatePatch' -> [(TableNgrams.NgramsType, NgramsTerm, NgramsPatch)]
    flatten statePatch =
      [ (nType, term, p)
      | (nType, NgramsTablePatch tablePatch) <- PM.toList statePatch
      , (term, p) <- PM.toList tablePatch ]

    -- Resolve the term to the first matching ngrams id, if any.
    resolveTerm :: (TableNgrams.NgramsType, NgramsTerm, NgramsPatch)
                -> IO (Maybe (NodeId, TableNgrams.NgramsType, Int, NgramsPatch))
    resolveTerm (nType, term, patch) = do
      ids <- runPGSQuery c ngramsIdQuery (PGS.Only term)
      pure $ (\(PGS.Only termId) -> (nodeId, nType, termId, patch)) <$> headMay ids

    query :: PGS.Query
    query = [sql| INSERT INTO node_story_archive_history(node_id, ngrams_type_id, ngrams_id, patch, version)
                  VALUES (?, ?, ?, ?, ?) |]
399
-- | Load the stored state of one node as a 'NodeListStory' holding a
-- single archive (with empty history) under that node id.
--
-- The table has several rows per node, one per (ngrams_type_id,
-- ngrams_id); they are folded into
-- @{<ngrams_type_id>: {<ngrams_id>: <data>}}@.
--
-- NOTE: all versions stored for a node should be the same; a sanity
-- check for that used to live here but is currently disabled, so the
-- version of the last row wins (0 when there are no rows).
-- TODO Maybe redesign the DB so that `node_stories` has only
-- `node_id`, `version` and there is a M2M table `node_stories_ngrams`
-- without the `version` column? Then `version` would live in one place.
getNodeStory :: PGS.Connection -> NodeId -> IO NodeListStory
getNodeStory c nId@(NodeId nodeId) = do
  rows <- runPGSQuery c nodeStoriesQuery (PGS.Only nodeId) :: IO [(Version, TableNgrams.NgramsType, NgramsTerm, NgramsRepoElement)]
  let rowArchives =
        [ Archive { _a_version = ver
                  , _a_history = []
                  , _a_state   = Map.singleton ngramsType (Map.singleton term repoElement) }
        | (ver, ngramsType, term, repoElement) <- rows ]
  pure $ NodeStory $ Map.singleton nId $ foldl mergeRow mempty rowArchives
  where
    -- (<>) for Archive does not merge states, hence the explicit merge.
    -- The version must come from the rows, not from the empty seed.
    mergeRow acc row = acc & a_state %~ combineState (row ^. a_state)
                           & a_version .~ (row ^. a_version)
429
-- | Select (version, ngrams type, term, repo element) for every
-- @node_stories@ row of the node given as the single parameter, joining
-- @ngrams@ to turn @ngrams_id@ into the term text.
nodeStoriesQuery :: PGS.Query
nodeStoriesQuery = [sql| SELECT version, ngrams_type_id, terms, ngrams_repo_element
                         FROM node_stories
                         JOIN ngrams ON ngrams.id = ngrams_id
                         WHERE node_id = ? |]
435
-- | Flat representation of an archive state: one triple per term.
type ArchiveStateList = [(TableNgrams.NgramsType, NgramsTerm, NgramsRepoElement)]
-- | The (ngrams type, term) keys of an archive state.
type ArchiveStateSet = Set.Set (TableNgrams.NgramsType, NgramsTerm)
438
-- | Flatten an archive state (a @Map NgramsType (Map NgramsTerm
-- NgramsRepoElement)@) into a list of (type, term, element) triples.
-- See 'archiveStateFromList' for the opposite direction.
archiveStateToList :: NgramsState' -> ArchiveStateList
archiveStateToList s =
  [ (nt, n, nre)
  | (nt, ntm) <- Map.toList s
  , (n, nre)  <- Map.toList ntm ]
443
-- | Rebuild an archive state from a flat list of (type, term, element)
-- triples; triples sharing an ngrams type are merged with @(<>)@.
archiveStateFromList :: ArchiveStateList -> NgramsState'
archiveStateFromList l =
  Map.fromListWith (<>) [ (nt, Map.singleton t nre) | (nt, t, nre) <- l ]
446
-- | The set of (ngrams type, term) keys present in a flat state list.
archiveStateSet :: ArchiveStateList -> ArchiveStateSet
archiveStateSet lst = Set.fromList [ (nt, term) | (nt, term, _) <- lst ]
449
-- | Keep only the entries whose (ngrams type, term) key is in the set.
archiveStateListFilterFromSet :: ArchiveStateSet -> ArchiveStateList -> ArchiveStateList
archiveStateListFilterFromSet set entries =
  [ entry | entry@(nt, term, _) <- entries, Set.member (nt, term) set ]
453
-- | Insert the whole state of an archive for the given node id, one
-- @node_stories@ row per (ngrams type, term). Every row gets the
-- archive's version. Terms without a matching @ngrams@ row are skipped.
--
-- The statement used to list four columns and four placeholders while
-- five parameters were supplied (the version had no column), which
-- cannot be formatted; @version@ is now part of the INSERT, matching
-- 'insertArchiveStateList'.
insertNodeStory :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
insertNodeStory c (NodeId nId) a =
  mapM_ insertElement (archiveStateToList $ a ^. a_state)
  where
    insertElement (ngramsType, ngrams, ngramsRepoElement) = do
      termIdM <- runPGSQuery c ngramsIdQuery (PGS.Only ngrams) :: IO [PGS.Only Int64]
      case headMay termIdM of
        -- Unknown term: nothing to insert, report zero affected rows.
        Nothing -> pure 0
        Just (PGS.Only termId) ->
          runPGSExecuteMany c query [(nId, a ^. a_version, ngramsType, termId, ngramsRepoElement)]

    query :: PGS.Query
    query = [sql| INSERT INTO node_stories(node_id, version, ngrams_type_id, ngrams_id, ngrams_repo_element)
                  VALUES (?, ?, ?, ?, ?) |]
469 -- insert ngramsType ngrams ngramsRepoElement =
470 -- Insert { iTable = nodeStoryTable
471 -- , iRows = [NodeStoryDB { node_id = sqlInt4 nId
472 -- , version = sqlInt4 _a_version
473 -- , ngrams_type_id = sqlInt4 $ TableNgrams.ngramsTypeId ngramsType
474 -- , ngrams_id = ...
475 -- , ngrams_repo_element = sqlValueJSONB ngramsRepoElement
476 -- }]
477 -- , iReturning = rCount
478 -- , iOnConflict = Nothing }
479
-- | Insert the given state entries into @node_stories@ for a node, at
-- the given version, one statement per entry. The statement resolves
-- the term through @ngrams@ and joins @nodes@ on the node id, so rows
-- are only produced when both exist.
insertArchiveStateList :: PGS.Connection -> NodeId -> Version -> ArchiveStateList -> IO ()
insertArchiveStateList c nodeId version as = mapM_ insertEntry as
  where
    -- Parameter order follows the placeholders in the statement.
    insertEntry (nt, n, nre) = runPGSExecute c query (nodeId, version, nt, nre, n)

    query :: PGS.Query
    query = [sql| WITH s as (SELECT ? as sid, ? sversion, ? sngrams_type_id, ngrams.id as sngrams_id, ?::jsonb as srepo FROM ngrams WHERE terms = ?)
                  INSERT INTO node_stories(node_id, version, ngrams_type_id, ngrams_id, ngrams_repo_element)
                  SELECT s.sid, s.sversion, s.sngrams_type_id, s.sngrams_id, s.srepo from s s join nodes n on s.sid = n.id
                  |]
490
-- | Delete the @node_stories@ rows of a node that correspond to the
-- given entries (matched on ngrams type and term), one statement per
-- entry. The repo element of each entry is ignored.
deleteArchiveStateList :: PGS.Connection -> NodeId -> ArchiveStateList -> IO ()
deleteArchiveStateList c nodeId as = mapM_ deleteEntry as
  where
    deleteEntry (nt, n, _) = runPGSExecute c query (nodeId, nt, n)

    query :: PGS.Query
    query = [sql| DELETE FROM node_stories
                  WHERE node_id = ? AND ngrams_type_id = ? AND ngrams_id IN (SELECT id FROM ngrams WHERE terms = ?) |]
499
-- | Overwrite repo element and version of the @node_stories@ rows of a
-- node that correspond to the given entries (matched on ngrams type
-- and term), one statement per entry.
updateArchiveStateList :: PGS.Connection -> NodeId -> Version -> ArchiveStateList -> IO ()
updateArchiveStateList c nodeId version as = mapM_ updateEntry as
  where
    -- Parameter order follows the placeholders in the statement.
    updateEntry (nt, n, nre) = runPGSExecute c query (nre, version, nodeId, nt, n)

    query :: PGS.Query
    query = [sql| UPDATE node_stories
                  SET ngrams_repo_element = ?, version = ?
                  WHERE node_id = ? AND ngrams_type_id = ? AND ngrams_id IN (SELECT id FROM ngrams WHERE terms = ?) |]
512
-- | Bring the stored state of a node from @currentArchive@ to
-- @newArchive@ by diffing the two states.
--
-- We assume the caller already holds an advisory lock.
--
--   * inserts: keys present only in the new state;
--   * deletes: keys present only in the current state;
--   * updates: keys present in both whose entry differs.
--
-- Inserts and updates are written with the new archive's version.
--
-- TODO Use @currentArchive ^. a_version@ in delete and report an error
-- if entries with (node_id, ngrams_type_id, ngrams_id) but a different
-- version are found.
updateNodeStory :: PGS.Connection -> NodeId -> ArchiveList -> ArchiveList -> IO ()
updateNodeStory c nodeId@(NodeId _nId) currentArchive newArchive = do
  printDebug "[updateNodeStory] new - current = " onlyInNew
  printDebug "[updateNodeStory] current - new" onlyInCurrent
  printDebug "[updateNodeStory] updates" $ Text.unlines $ (Text.pack . show) <$> updates

  insertArchiveStateList c nodeId newVersion inserts
  deleteArchiveStateList c nodeId deletes
  updateArchiveStateList c nodeId newVersion updates
  where
    newVersion  = newArchive ^. a_version

    currentList = archiveStateToList $ currentArchive ^. a_state
    newList     = archiveStateToList $ newArchive ^. a_state
    currentSet  = archiveStateSet currentList
    newSet      = archiveStateSet newList

    onlyInNew     = Set.difference newSet currentSet
    onlyInCurrent = Set.difference currentSet newSet
    inBoth        = Set.intersection currentSet newSet

    inserts = archiveStateListFilterFromSet onlyInNew newList
    deletes = archiveStateListFilterFromSet onlyInCurrent currentList
    -- Among the shared keys, keep the new entries that are not
    -- identical to their current counterpart.
    updates = Set.toList $ Set.difference
                (Set.fromList $ archiveStateListFilterFromSet inBoth newList)
                (Set.fromList $ archiveStateListFilterFromSet inBoth currentList)
554 -- where
555 -- update = Update { uTable = nodeStoryTable
556 -- , uUpdateWith = updateEasy (\(NodeStoryDB { node_id }) ->
557 -- NodeStoryDB { archive = sqlValueJSONB $ Archive { _a_history = emptyHistory
558 -- , ..}
559 -- , .. })
560 -- , uWhere = (\row -> node_id row .== sqlInt4 nId)
561 -- , uReturning = rCount }
562
563 -- nodeStoryRemove :: Pool PGS.Connection -> NodeId -> IO Int64
564 -- nodeStoryRemove pool (NodeId nId) = withResource pool $ \c -> runDelete c delete
565 -- where
566 -- delete = Delete { dTable = nodeStoryTable
567 -- , dWhere = (\row -> node_id row .== sqlInt4 nId)
568 -- , dReturning = rCount }
569
-- | Persist the archive of one node inside a transaction, guarded by a
-- transaction-level advisory lock keyed on the node id.
--
-- The stored story is loaded first; when it has an entry for the node
-- the state is diffed and updated, otherwise it is inserted as a whole.
-- Finally the versions of the node's rows are aligned with the new
-- archive's version.
upsertNodeStories :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
upsertNodeStories c nodeId@(NodeId nId) newArchive = do
  printDebug "[upsertNodeStories] START nId" nId
  PGS.withTransaction c $ do
    printDebug "[upsertNodeStories] locking nId" nId
    runPGSAdvisoryXactLock c nId

    NodeStory stored <- getNodeStory c nodeId
    case Map.lookup nodeId stored of
      Just currentArchive -> updateNodeStory c nodeId currentArchive newArchive
      Nothing             -> insertNodeStory c nodeId newArchive

    -- Now we need to set versions of all node state to be the same
    fixNodeStoryVersion c nodeId newArchive

  printDebug "[upsertNodeStories] STOP nId" nId
590
-- | Set the version of every @node_stories@ row of the node to the
-- version of the given archive, for each ngrams type present in the
-- archive's state (one statement per type).
fixNodeStoryVersion :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
fixNodeStoryVersion c nodeId newArchive =
  mapM_ setVersion (Map.keys $ newArchive ^. a_state)
  where
    setVersion nt = runPGSExecute c query (newArchive ^. a_version, nodeId, nt)

    query :: PGS.Query
    query = [sql|UPDATE node_stories
                 SET version = ?
                 WHERE node_id = ?
                 AND ngrams_type_id = ?|]
602
-- | Persist every archive of the story, node by node, in key order.
writeNodeStories :: PGS.Connection -> NodeListStory -> IO ()
writeNodeStories c (NodeStory nls) = mapM_ upsertOne (Map.toList nls)
  where
    upsertOne (nId, a) = upsertNodeStories c nId a
607
-- | Make sure the given node is part of the story.
--
-- Without a story, the node is simply loaded from the DB. With a
-- story, the node is loaded only when it is missing; entries already
-- present are never overwritten.
nodeStoryInc :: PGS.Connection -> Maybe NodeListStory -> NodeId -> IO NodeListStory
nodeStoryInc c Nothing nId = getNodeStory c nId
nodeStoryInc c (Just ns@(NodeStory nls)) nId =
  if Map.member nId nls
    then pure ns
    else do
      NodeStory loaded <- getNodeStory c nId
      pure $ NodeStory $ Map.union nls loaded
617
-- | 'nodeStoryInc' for a list of nodes, threading the story through.
-- Without a starting story, the first node (if any) seeds it; with no
-- story and no nodes the result is the empty story.
nodeStoryIncs :: PGS.Connection -> Maybe NodeListStory -> [NodeId] -> IO NodeListStory
nodeStoryIncs c (Just nls) ns = foldM addNode nls ns
  where
    addNode story nId = nodeStoryInc c (Just story) nId
nodeStoryIncs _ Nothing [] = pure (NodeStory Map.empty)
nodeStoryIncs c Nothing (ni:ns) = do
  seed <- getNodeStory c ni
  nodeStoryIncs c (Just seed) ns
624
625 -- nodeStoryDec :: Pool PGS.Connection -> NodeListStory -> NodeId -> IO NodeListStory
626 -- nodeStoryDec pool ns@(NodeStory nls) ni = do
627 -- case Map.lookup ni nls of
628 -- Nothing -> do
629 -- _ <- nodeStoryRemove pool ni
630 -- pure ns
631 -- Just _ -> do
632 -- let ns' = Map.filterWithKey (\k _v -> k /= ni) nls
633 -- _ <- nodeStoryRemove pool ni
634 -- pure $ NodeStory ns'
635 ------------------------------------
636
-- | Build the node story environment on top of a connection pool.
--
-- The shared variable starts with no node loaded. The immediate saver
-- locks the variable and writes the whole story to the DB, leaving the
-- in-memory story unchanged; the regular saver is its debounced
-- version. The archive saver stores the in-memory history of the story
-- it is given and returns that story with its history cleared.
readNodeStoryEnv :: Pool PGS.Connection -> IO NodeStoryEnv
readNodeStoryEnv pool = do
  mvar <- nodeStoryVar pool Nothing []
  let saveNow = modifyMVar_ mvar $ \ns -> do
        withResource pool $ \c -> writeNodeStories c ns
        pure ns
  debouncedSave <- mkNodeStorySaver saveNow
  pure NodeStoryEnv
    { _nse_var                     = mvar
    , _nse_saver                   = debouncedSave
    , _nse_saver_immediate         = saveNow
    , _nse_archive_saver_immediate = saveHistory
    , _nse_getter                  = nodeStoryVar pool (Just mvar)
    }
  where
    saveHistory ns@(NodeStory nls) = withResource pool $ \c -> do
      mapM_ (saveNodeHistory c) (Map.toList nls)
      pure $ clearHistory ns

    -- The in-memory history is reversed before being handed over for insertion.
    saveNodeHistory c (nId, a) =
      insertNodeArchiveHistory c nId (a ^. a_version) (reverse (a ^. a_history))
663
-- | Obtain the shared story variable with the given nodes loaded.
--
-- Without an existing variable, the nodes are loaded from the DB and a
-- fresh variable is created. With an existing one, missing nodes are
-- loaded into it and the same variable is returned.
--
-- Lock order matters: the variable is locked first and a pool
-- connection is taken second, the same order the saver built in
-- 'readNodeStoryEnv' uses. Taking the connection first (as this
-- function used to) can deadlock once the pool is exhausted: getters
-- hold every connection while waiting for the variable, and the saver
-- holds the variable while waiting for a connection.
nodeStoryVar :: Pool PGS.Connection
             -> Maybe (MVar NodeListStory)
             -> [NodeId]
             -> IO (MVar NodeListStory)
nodeStoryVar pool Nothing nIds = do
  state <- withResource pool $ \c -> nodeStoryIncs c Nothing nIds
  newMVar state
nodeStoryVar pool (Just mv) nIds = do
  modifyMVar_ mv $ \nsl ->
    withResource pool $ \c -> nodeStoryIncs c (Just nsl) nIds
  pure mv
676
-- | Wrap a saving action into a debounced one (frequency: one minute,
-- expressed in microseconds).
--
-- Debouncing lets saving happen later and asynchronously while we keep
-- operating on memory only.
--
-- NOTE: the wrapped action should lock the MVar first and only then
-- use the resource pool. Otherwise we could wait for the MVar while
-- blocking a pool connection.
mkNodeStorySaver :: IO () -> IO (IO ())
mkNodeStorySaver saver =
  mkDebounce defaultDebounceSettings
    { debounceAction = saver
    , debounceFreq   = oneMinute
    }
  where
    oneMinute :: Int
    oneMinute = 1 * (60 * oneSecond)

    oneSecond :: Int
    oneSecond = 10 ^ (6 :: Int)
700
-- | Empty the history of every archive in the story; versions and
-- states are left untouched.
clearHistory :: NodeListStory -> NodeListStory
clearHistory (NodeStory ns) = NodeStory (ns & traverse %~ dropHistory)
  where
    dropHistory :: ArchiveList -> ArchiveList
    dropHistory nodeArchive = nodeArchive & a_history .~ []
705
-- | Version currently stored in the DB for the given list. The story
-- is read from the DB on every call, not from the in-memory variable.
currentVersion :: (HasNodeStory env err m) => ListId -> m Version
currentVersion listId = do
  pool <- view connPool
  NodeStory stories <- withResource pool $ \c -> liftBase $ getNodeStory c listId
  pure $ stories ^. at listId . _Just . a_version
711
712
713 -- mkNodeStorySaver :: MVar NodeListStory -> Cmd err (Cmd err ())
714 -- mkNodeStorySaver mvns = mkDebounce settings
715 -- where
716 -- settings = defaultDebounceSettings
717 -- { debounceAction = withMVar mvns (\ns -> writeNodeStories ns)
718 -- , debounceFreq = 1 * minute
719 -- -- , debounceEdge = trailingEdge -- Trigger on the trailing edge
720 -- }
721 -- minute = 60 * second
722 -- second = 10^(6 :: Int)
723
724
725 -----------------------------------------
726
-- | Maintenance action: for every node and for each of the ngrams
-- types Authors, Institutes, Sources and NgramsTerms, set the version
-- of all matching @node_stories@ rows to the maximum version found
-- among them. Everything runs in a single transaction.
fixNodeStoryVersions :: (HasNodeStory env err m) => m ()
fixNodeStoryVersions = do
  pool <- view connPool
  withResource pool $ \c -> liftBase $ PGS.withTransaction c $ do
    nIds <- runPGSQuery c [sql| SELECT id FROM nodes WHERE ? |] (PGS.Only True) :: IO [PGS.Only Int64]
    printDebug "[fixNodeStoryVersions] nIds" nIds
    mapM_ (fixNode c) nIds
  where
    -- Ngrams types are processed in this order for each node.
    ngramsTypes :: [TableNgrams.NgramsType]
    ngramsTypes = [ TableNgrams.Authors
                  , TableNgrams.Institutes
                  , TableNgrams.Sources
                  , TableNgrams.NgramsTerms ]

    fixNode :: PGS.Connection -> PGS.Only Int64 -> IO ()
    fixNode c (PGS.Only nId) = do
      printDebug "[fixNodeStoryVersions] nId" nId
      mapM_ (\ngramsType -> updateVer c ngramsType nId) ngramsTypes

    maxVerQuery :: PGS.Query
    maxVerQuery = [sql| SELECT max(version)
                        FROM node_stories
                        WHERE node_id = ?
                        AND ngrams_type_id = ? |]

    updateVerQuery :: PGS.Query
    updateVerQuery = [sql| UPDATE node_stories
                           SET version = ?
                           WHERE node_id = ?
                           AND ngrams_type_id = ? |]

    -- Align all rows of (node, ngrams type) on their maximum version;
    -- nothing happens when there is no row or no version at all.
    updateVer :: PGS.Connection -> TableNgrams.NgramsType -> Int64 -> IO ()
    updateVer c ngramsType nId = do
      maxVer <- runPGSQuery c maxVerQuery (nId, ngramsType) :: IO [PGS.Only (Maybe Int64)]
      case maxVer of
        [PGS.Only (Just maxVersion)] -> do
          _ <- runPGSExecute c updateVerQuery (maxVersion, nId, ngramsType)
          pure ()
        [PGS.Only Nothing] -> pure ()
        []                 -> pure ()
        _                  -> panic "Should get only 1 result!"