]> Git — Sourcephile - gargantext.git/blob - src/Gargantext/Core/NodeStory.hs
[FIX] NodeWrites parsing first fix
[gargantext.git] / src / Gargantext / Core / NodeStory.hs
1 {-|
2 Module : Gargantext.Core.NodeStory
3 Description : Node API generation
4 Copyright : (c) CNRS, 2017-Present
5 License : AGPL + CECILL v3
6 Maintainer : team@gargantext.org
7 Stability : experimental
8 Portability : POSIX
9
10 A Node Story is a Map between NodeId and an Archive (with state,
11 version and history) for that node.
12
13 Couple of words on how this is implemented.
14
15 First version used files which stored Archive for each NodeId in a
16 separate .cbor file.
17
18 For performance reasons, it is rewritten to use the DB.
19
20 The table `node_stories` contains two columns: `node_id` and
21 `archive`.
22
23 Next, it was observed that `a_history` in `Archive` takes much
24 space. So a new table was created, `node_story_archive_history` with
25 columns: `node_id`, `ngrams_type_id`, `patch`. This is because each
26 history item is in fact a map from `NgramsType` to `NgramsTablePatch`
27 (see the `NgramsStatePatch'` type).
28
29 Moreover, since in `G.A.Ngrams.commitStatePatch` we use current state
30 only, with only recent history items, I concluded that it is not
31 necessary to load whole history into memory. Instead, it is kept in DB
32 (history is immutable) and only recent changes are added to
33 `a_history`. Then that record is cleared whenever `Archive` is saved.
34
35 Please note that
36
37 TODO:
38 - remove
39 - filter
40 - charger les listes
41 -}
42
43 {-# OPTIONS_GHC -fno-warn-orphans #-}
44 {-# LANGUAGE Arrows #-}
45 {-# LANGUAGE ConstraintKinds #-}
46 {-# LANGUAGE QuasiQuotes #-}
47 {-# LANGUAGE TemplateHaskell #-}
48
49 module Gargantext.Core.NodeStory
50 ( HasNodeStory
51 , HasNodeStoryEnv
52 , hasNodeStory
53 , HasNodeStoryVar
54 , hasNodeStoryVar
55 , HasNodeStorySaver
56 , hasNodeStorySaver
57 , HasNodeStoryImmediateSaver
58 , hasNodeStoryImmediateSaver
59 , HasNodeArchiveStoryImmediateSaver
60 , hasNodeArchiveStoryImmediateSaver
61 , NodeStory(..)
62 , NgramsStatePatch'
63 , NodeListStory
64 , initNodeListStoryMock
65 , NodeStoryEnv(..)
66 , initNodeStory
67 , nse_getter
68 , nse_saver
69 , nse_saver_immediate
70 , nse_archive_saver_immediate
71 , nse_var
72 , unNodeStory
73 , getNodesArchiveHistory
74 , Archive(..)
75 , initArchive
76 , a_history
77 , a_state
78 , a_version
79 , nodeExists
80 , runPGSQuery
81 , runPGSAdvisoryLock
82 , runPGSAdvisoryUnlock
83 , runPGSAdvisoryXactLock
84 , getNodesIdWithType
85 , readNodeStoryEnv
86 , upsertNodeStories
87 , getNodeStory
88 , nodeStoriesQuery
89 , currentVersion
90 , archiveStateFromList
91 , archiveStateToList
92 , fixNodeStoryVersions )
93 where
94
95 -- import Debug.Trace (traceShow)
96 import Codec.Serialise.Class
97 import Control.Concurrent (MVar(), newMVar, modifyMVar_)
98 import Control.Debounce (mkDebounce, defaultDebounceSettings, debounceFreq, debounceAction)
99 import Control.Exception (catch, throw, SomeException(..))
100 import Control.Lens (makeLenses, Getter, (^.), (.~), (%~), _Just, at, traverse, view)
101 import Control.Monad.Except
102 import Control.Monad.Reader
103 import Data.Aeson hiding ((.=), decode)
104 import Data.ByteString.Char8 (hPutStrLn)
105 import Data.HashMap.Strict (HashMap)
106 import Data.Map.Strict (Map)
107 import Data.Maybe (catMaybes)
108 import Data.Monoid
109 import Data.Pool (Pool, withResource)
110 import Data.Profunctor.Product.TH (makeAdaptorAndInstance)
111 import Data.Semigroup
112 import Database.PostgreSQL.Simple.FromField (FromField(fromField), fromJSONField)
113 import Database.PostgreSQL.Simple.SqlQQ (sql)
114 import Database.PostgreSQL.Simple.Types (Values(..), QualifiedIdentifier(..))
115 import GHC.Generics (Generic)
116 import Gargantext.API.Ngrams.Types
117 import Gargantext.Core.Types (ListId, NodeId(..), NodeType)
118 import Gargantext.Core.Utils.Prefix (unPrefix)
119 import Gargantext.Database.Admin.Config (nodeTypeId)
120 import Gargantext.Database.Prelude (CmdM', HasConnectionPool(..), HasConfig)
121 import Gargantext.Database.Query.Table.Node.Error (HasNodeError())
122 import Gargantext.Database.Schema.Ngrams (NgramsType)
123 import Gargantext.Prelude
124 import Opaleye (DefaultFromField(..), SqlJsonb, fromPGSFromField)
125 import System.IO (stderr)
126 import qualified Data.HashMap.Strict as HashMap
127 import qualified Data.Map.Strict as Map
128 import qualified Data.Map.Strict.Patch as PM
129 import qualified Data.Set as Set
130 import qualified Data.Text as Text
131 import qualified Database.PostgreSQL.Simple as PGS
132 import qualified Gargantext.Database.Query.Table.Ngrams as TableNgrams
133
134 ------------------------------------------------------------------------
-- | Environment of the node-story subsystem: the shared in-memory state
-- plus the actions used to (re)load it and to persist it.
data NodeStoryEnv = NodeStoryEnv
  { _nse_var                     :: !(MVar NodeListStory)
    -- ^ Shared, mutable collection of node stories.
  , _nse_saver                   :: !(IO ())
    -- ^ Save action; 'readNodeStoryEnv' builds it with 'mkNodeStorySaver'.
  , _nse_saver_immediate         :: !(IO ())
    -- ^ Save action that is not wrapped by 'mkNodeStorySaver'.
  , _nse_archive_saver_immediate :: !(NodeListStory -> IO NodeListStory)
    -- ^ Stores the history of the given stories and returns them with
    -- their history cleared.
  , _nse_getter                  :: !([NodeId] -> IO (MVar NodeListStory))
    -- ^ Makes sure the given nodes are loaded and returns the shared variable.
  --, _nse_cleaner :: !(IO ()) -- every 12 hours: cleans the repos of unused NodeStories
  -- , _nse_lock  :: !FileLock -- TODO (it depends on the option: if with database or file only)
  }
  deriving (Generic)
145
-- | Constraint bundle required by code that works with node stories:
-- command monad, reader/error capabilities, and access to the node-story
-- environment, the configuration and the connection pool.
type HasNodeStory env err m =
  ( CmdM' env err m
  , MonadReader env m
  , MonadError err m
  , HasNodeStoryEnv env
  , HasConfig env
  , HasConnectionPool env
  , HasNodeError err
  )
154
-- | Environments that expose a complete 'NodeStoryEnv'.
class (HasNodeStoryVar env, HasNodeStorySaver env) => HasNodeStoryEnv env where
  hasNodeStory :: Getter env NodeStoryEnv

-- | Environments that expose the node-story getter.
class HasNodeStoryVar env where
  hasNodeStoryVar :: Getter env ([NodeId] -> IO (MVar NodeListStory))

-- | Environments that expose the save action.
class HasNodeStorySaver env where
  hasNodeStorySaver :: Getter env (IO ())

-- | Environments that expose the immediate save action.
class HasNodeStoryImmediateSaver env where
  hasNodeStoryImmediateSaver :: Getter env (IO ())

-- | Environments that expose the immediate archive (history) save action.
class HasNodeArchiveStoryImmediateSaver env where
  hasNodeArchiveStoryImmediateSaver :: Getter env (NodeListStory -> IO NodeListStory)
170
171 ------------------------------------------------------------------------
172
-- | A node story maps each 'NodeId' to its 'Archive'.
--
-- TODO: generalize for any NodeType; for now only NodeList is implemented.
newtype NodeStory s p = NodeStory
  { _unNodeStory :: Map NodeId (Archive s p)
  }
  deriving (Generic, Show)

-- JSON and CBOR instances are the generic defaults.
instance (FromJSON s, FromJSON p)   => FromJSON  (NodeStory s p)
instance (ToJSON s, ToJSON p)       => ToJSON    (NodeStory s p)
instance (Serialise s, Serialise p) => Serialise (NodeStory s p)
183
-- | State of one node together with its version and recent history.
data Archive s p = Archive
  { _a_version :: !Version
  , _a_state   :: !s
  , _a_history :: ![p]
    -- ^ The first patch in the list is the most recent one (that is why
    -- `commitStatePatch` can use `take`).
    --
    -- History is immutable: new items are only ever put on top of the
    -- existing list.
    --
    -- The whole history is not kept in memory; this field holds only the
    -- recent part, i.e. the part that still has to be inserted into the DB.
  }
  deriving (Generic, Show)

-- CBOR instance is the generic default.
instance (Serialise s, Serialise p) => Serialise (Archive s p)
201
202
-- | Node story specialised to ngrams lists.
type NodeListStory = NodeStory NgramsState' NgramsStatePatch'

-- | Ngrams tables indexed by ngrams type.
type NgramsState' = Map TableNgrams.NgramsType NgramsTableMap

-- | Ngrams table patches indexed by ngrams type.
type NgramsStatePatch' = PatchMap TableNgrams.NgramsType NgramsTablePatch

instance Serialise NgramsStatePatch'

-- | An archive is read from a PostgreSQL JSON field.
instance FromField (Archive NgramsState' NgramsStatePatch') where
  fromField = fromJSONField

-- | Opaleye decoding reuses the postgresql-simple 'FromField' instance.
instance DefaultFromField SqlJsonb (Archive NgramsState' NgramsStatePatch') where
  defaultFromField = fromPGSFromField
214
-- | Merge two `NgramsState'` values.
--
-- The state is a nested map (Map NgramsType (Map ...)) and the default
-- `(<>)` on the outer map is left-biased
-- (https://hackage.haskell.org/package/containers-0.6.6/docs/Data-Map-Internal.html#v:union),
-- so for an ngrams type present on both sides the inner maps are combined
-- with `(<>)` instead of the right one being dropped.
combineState :: NgramsState' -> NgramsState' -> NgramsState'
combineState left right = Map.unionWith (<>) left right
221
-- | Version and state are taken from the right-hand archive; the histories
-- are concatenated with the right-hand one first.
instance (Semigroup s, Semigroup p) => Semigroup (Archive s p) where
  Archive _ _ olderHistory <> Archive version' state' newerHistory =
    Archive { _a_version = version'
            , _a_state   = state'
            , _a_history = newerHistory <> olderHistory
            }

-- | The empty archive: version 0, empty state, no history.
instance (Monoid s, Semigroup p) => Monoid (Archive s p) where
  mempty = Archive 0 mempty []

-- | JSON field names are derived from the record fields via @unPrefix "_a_"@.
instance (FromJSON s, FromJSON p) => FromJSON (Archive s p) where
  parseJSON = genericParseJSON (unPrefix "_a_")

instance (ToJSON s, ToJSON p) => ToJSON (Archive s p) where
  toJSON     = genericToJSON (unPrefix "_a_")
  toEncoding = genericToEncoding (unPrefix "_a_")
238
239 ------------------------------------------------------------------------
-- | Node story holding a single, empty archive for the given node.
initNodeStory :: (Monoid s, Semigroup p) => NodeId -> NodeStory s p
initNodeStory ni = NodeStory (Map.singleton ni initArchive)
242
-- | The empty archive (same value as 'mempty').
initArchive :: (Monoid s, Semigroup p) => Archive s p
initArchive = mempty
245
-- | Mock node-list story: a single archive stored under node id 0, at
-- version 0 and with no history, whose `NgramsTerms` table is filled from
-- `mockTable`.
initNodeListStoryMock :: NodeListStory
initNodeListStoryMock = NodeStory (Map.singleton mockListId mockArchive)
  where
    mockListId  = 0
    mockArchive = Archive
      { _a_version = 0
      , _a_state   = Map.singleton TableNgrams.NgramsTerms mockTerms
      , _a_history = []
      }
    mockTerms = Map.fromList
      [ (element ^. ne_ngrams, ngramsElementToRepo element)
      | element <- mockTable ^. _NgramsTable
      ]
258
259 ------------------------------------------------------------------------
260 ------------------------------------------------------------------------
-- | Lens generation is kept below the declarations above on purpose:
-- a Template Haskell splice splits the module into declaration groups, so
-- moving these splices would change which definitions can see each other.
makeLenses ''NodeStoryEnv
makeLenses ''NodeStory
makeLenses ''Archive
265
266 ----------------------------------------------------------------------
-- | Polymorphic record with one field per column used for a node-story row.
data NodeStoryPoly nid v ngtid ngid nre = NodeStoryDB
  { node_id             :: !nid
  , version             :: !v
  , ngrams_type_id      :: !ngtid
  , ngrams_id           :: !ngid
  , ngrams_repo_element :: !nre
  }
  deriving (Eq)

-- | Polymorphic record pairing a node id with an archive value.
data NodeStoryArchivePoly nid a = NodeStoryArchiveDB
  { a_node_id :: !nid
  , archive   :: !a
  }
  deriving (Eq)

-- Product-profunctor adaptors and instances for the two records above.
$(makeAdaptorAndInstance "pNodeStory" ''NodeStoryPoly)
$(makeAdaptorAndInstance "pNodeArchiveStory" ''NodeStoryArchivePoly)
282
283 -- type NodeStoryWrite = NodeStoryPoly (Column SqlInt4) (Column SqlInt4) (Column SqlInt4) (Column SqlInt4) (Column SqlJsonb)
284 -- type NodeStoryRead = NodeStoryPoly (Column SqlInt4) (Column SqlInt4) (Column SqlInt4) (Column SqlInt4) (Column SqlJsonb)
285
286 -- type NodeStoryArchiveWrite = NodeStoryArchivePoly (Column SqlInt4) (Column SqlJsonb)
287 -- type NodeStoryArchiveRead = NodeStoryArchivePoly (Column SqlInt4) (Column SqlJsonb)
288
-- | Archive specialised to ngrams lists (the element type of 'NodeListStory').
type ArchiveList = Archive NgramsState' NgramsStatePatch'
290
291 -- DB stuff
292
-- | Run a statement with 'PGS.execute' and return the number of affected
-- rows.
--
-- On failure the formatted query is written to stderr and the original
-- exception is re-thrown, exactly as 'runPGSQuery' does.  (The handler used
-- to call `panic`, which replaced the original exception with a different
-- one and made the re-throw that followed unreachable.)
runPGSExecute :: (PGS.ToRow q)
              => PGS.Connection -> PGS.Query -> q -> IO Int64
runPGSExecute c qs a = catch (PGS.execute c qs a) printError
  where
    printError (SomeException e) = do
      q' <- PGS.formatQuery c qs a
      hPutStrLn stderr q'
      -- Re-throw the exception we caught so callers can still match on it.
      throw (SomeException e)
301
-- | Run a multi-row statement with 'PGS.executeMany' and return the number
-- of affected rows.
--
-- On failure the formatted query is written to stderr and the original
-- exception is re-thrown, exactly as 'runPGSQuery' does.  (The handler used
-- to call `panic`, which replaced the original exception with a different
-- one and made the re-throw that followed unreachable.)
runPGSExecuteMany :: (PGS.ToRow q)
                  => PGS.Connection -> PGS.Query -> [q] -> IO Int64
runPGSExecuteMany c qs a = catch (PGS.executeMany c qs a) printError
  where
    printError (SomeException e) = do
      -- 'PGS.formatMany' is the multi-row counterpart of 'PGS.formatQuery'.
      q' <- PGS.formatMany c qs a
      hPutStrLn stderr q'
      -- Re-throw the exception we caught so callers can still match on it.
      throw (SomeException e)
310
-- | Run a query with 'PGS.query'.  If it throws, the formatted query is
-- written to stderr and the exception is re-thrown.
runPGSQuery :: (PGS.FromRow r, PGS.ToRow q)
            => PGS.Connection -> PGS.Query -> q -> IO [r]
runPGSQuery c q a = PGS.query c q a `catch` reportAndRethrow
  where
    reportAndRethrow (SomeException err) =
      PGS.formatQuery c q a >>= hPutStrLn stderr >> throw (SomeException err)
319
-- | Call @pg_advisory_lock@ with the given key; the result is discarded.
runPGSAdvisoryLock :: PGS.Connection -> Int -> IO ()
runPGSAdvisoryLock c lockKey = do
  _rows <- runPGSQuery c [sql| SELECT pg_advisory_lock(?) |] (PGS.Only lockKey)
             :: IO [PGS.Only ()]
  pure ()
325
-- | Call @pg_advisory_unlock@ with the given key; the boolean it returns is
-- discarded.
runPGSAdvisoryUnlock :: PGS.Connection -> Int -> IO ()
runPGSAdvisoryUnlock c lockKey = do
  _rows <- runPGSQuery c [sql| SELECT pg_advisory_unlock(?) |] (PGS.Only lockKey)
             :: IO [PGS.Only Bool]
  pure ()
331
-- | Call @pg_advisory_xact_lock@ with the given key; the result is
-- discarded.
runPGSAdvisoryXactLock :: PGS.Connection -> Int -> IO ()
runPGSAdvisoryXactLock c lockKey = do
  _rows <- runPGSQuery c [sql| SELECT pg_advisory_xact_lock(?) |] (PGS.Only lockKey)
             :: IO [PGS.Only ()]
  pure ()
337
-- | 'True' when the @nodes@ table has a row with the given id.
nodeExists :: PGS.Connection -> NodeId -> IO Bool
nodeExists c nId = do
  rows <- runPGSQuery c [sql| SELECT true FROM nodes WHERE id = ? LIMIT 1 |]
                      (PGS.Only nId)
  pure $ rows == [PGS.Only True]
342
-- | Ids of all nodes whose @typename@ equals the id of the given node type.
getNodesIdWithType :: PGS.Connection -> NodeType -> IO [NodeId]
getNodesIdWithType c nt = do
  rows <- runPGSQuery c selectIds (PGS.Only $ nodeTypeId nt)
  pure [ NodeId nId | PGS.Only nId <- rows ]
  where
    selectIds :: PGS.Query
    selectIds = [sql| SELECT id FROM nodes WHERE typename = ? |]
350
351
-- | History patches stored for the given nodes, one result entry per row,
-- ordered by (version, history row id) descending.
--
-- /!\ This function uses a hard-coded parameter which depends on the
-- Ngrams List Flow: only rows with version > 5 are returned, because by
-- default the first version of the history of manual changes is 6.
getNodesArchiveHistory :: PGS.Connection
                       -> [NodeId]
                       -> IO [(NodeId, (Map NgramsType [HashMap NgramsTerm NgramsPatch]))]
getNodesArchiveHistory c nodesId = do
  rows <- runPGSQuery c historyQuery (PGS.Only $ Values idColumnTypes nodesId)
            :: IO [(Int, TableNgrams.NgramsType, NgramsTerm, NgramsPatch)]
  pure [ ( NodeId nId
         , Map.singleton ngramsType [HashMap.singleton terms patch]
         )
       | (nId, ngramsType, terms, patch) <- rows
       ]
  where
    -- column type of the VALUES list that carries the node ids
    idColumnTypes = [QualifiedIdentifier Nothing "int4"]

    historyQuery :: PGS.Query
    historyQuery = [sql| WITH nodes_id(nid) as (?)
                         SELECT node_id, ngrams_type_id, terms, patch
                         FROM node_story_archive_history
                         JOIN ngrams ON ngrams.id = ngrams_id
                         JOIN nodes_id n ON node_id = n.nid
                         WHERE version > 5
                         ORDER BY (version, node_story_archive_history.id) DESC
                       |]
380
-- | Look up the id of an ngram by its terms.
ngramsIdQuery :: PGS.Query
ngramsIdQuery =
  [sql| SELECT id FROM ngrams WHERE terms = ? |]
383
384
-- | Insert history patches for a node into @node_story_archive_history@.
--
-- The patches are processed in list order.  Each patch is flattened into
-- one row per (ngrams type, term); the term is resolved to its id in the
-- @ngrams@ table and rows whose term is unknown are skipped.
--
-- The statement only inserts rows whose node and ngram both exist.  The
-- guard used to be @NOT EXISTS@; since the term id has just been read from
-- @ngrams@, that filtered out every row for an existing node, so no history
-- was stored.
-- NOTE(review): this assumes @ngrams@ and @nodes@ have no @ngrams_id@ /
-- @node_id@ columns of their own — confirm against the schema.
insertNodeArchiveHistory :: PGS.Connection -> NodeId -> Version -> [NgramsStatePatch'] -> IO ()
insertNodeArchiveHistory _ _ _ [] = pure ()
insertNodeArchiveHistory c nodeId version (h:hs) = do
  let tuples = [ (nodeId, nType, term, p)
               | (nType, NgramsTablePatch patch) <- PM.toList h
               , (term, p) <- PM.toList patch
               ] :: [(NodeId, TableNgrams.NgramsType, NgramsTerm, NgramsPatch)]
  -- Resolve every term to its ngrams id; 'Nothing' when the term is unknown.
  tuplesM <- mapM (\(nId, nType, term, patch) -> do
                      ngrams <- runPGSQuery c ngramsIdQuery (PGS.Only term)
                      pure $ (\(PGS.Only termId) -> (nId, nType, termId, term, patch)) <$> (headMay ngrams)
                  ) tuples :: IO [Maybe (NodeId, TableNgrams.NgramsType, Int, NgramsTerm, NgramsPatch)]
  _ <- runPGSExecuteMany c query
         [ (nId, nType, termId, patch, version)
         | (nId, nType, termId, _term, patch) <- catMaybes tuplesM
         ]
  insertNodeArchiveHistory c nodeId version hs
  where
    -- https://dba.stackexchange.com/questions/265554/how-to-check-other-table-for-value-during-insert
    query :: PGS.Query
    query = [sql| INSERT INTO node_story_archive_history(node_id, ngrams_type_id, ngrams_id, patch, version)
                  SELECT node_id, ngrams_type_id, ngrams_id, patch::jsonb, version FROM (
                    VALUES (?, ?, ?, ?, ?)
                  ) AS i(node_id, ngrams_type_id, ngrams_id, patch, version)
                  WHERE EXISTS (
                    SELECT FROM ngrams
                    CROSS JOIN nodes
                    WHERE ngrams.id = i.ngrams_id
                    AND nodes.id = i.node_id
                  )|]
412
-- | Load the story of one node from the @node_stories@ table.
--
-- The table has one row per (ngrams type, ngram) of the node.  Every row is
-- turned into a one-element archive and the archives are folded into a
-- single one, so the result always contains exactly one entry, for the
-- requested node (an empty archive when there are no rows).
getNodeStory :: PGS.Connection -> NodeId -> IO NodeListStory
getNodeStory c nId@(NodeId nodeId) = do
  rows <- runPGSQuery c nodeStoriesQuery (PGS.Only nodeId)
            :: IO [(Version, TableNgrams.NgramsType, NgramsTerm, NgramsRepoElement)]
  -- Shape we are building: {<node_id>: {<ngrams_type_id>: {<ngrams_id>: <data>}}}
  let rowArchives =
        [ Archive { _a_version = rowVersion
                  , _a_history = []
                  , _a_state   = Map.singleton ngramsType (Map.singleton term repoElement)
                  }
        | (rowVersion, ngramsType, term, repoElement) <- rows
        ]
  -- NOTE Sanity check: all versions in the DB should be the same.
  -- TODO Maybe redesign the DB so that `node_stories` has only `node_id`,
  -- `version` and there is a M2M table `node_stories_ngrams` without the
  -- `version` column?  Then `version` would live in only one place.

  {-
  let versionsS = Set.fromList $ map (\a -> a ^. a_version) rowArchives
  if Set.size versionsS > 1 then
    panic $ Text.pack $ "[getNodeStory] versions for " <> show nodeId <> " differ! " <> show versionsS
  else
    pure ()
  -}

  pure $ NodeStory $ Map.singleton nId $ foldl mergeRow mempty rowArchives
  where
    -- NOTE (<>) for Archive doesn't concatenate states, hence this helper.
    -- The version is taken from the row, not from the empty start value.
    mergeRow acc row = acc & a_state %~ combineState (row ^. a_state)
                           & a_version .~ (row ^. a_version)
442
-- | Select version, ngrams type, terms and repo element of every
-- @node_stories@ row of one node.
nodeStoriesQuery :: PGS.Query
nodeStoriesQuery =
  [sql| SELECT version, ngrams_type_id, terms, ngrams_repo_element
        FROM node_stories
        JOIN ngrams ON ngrams.id = ngrams_id
        WHERE node_id = ? |]
448
-- | Flat form of an archive state: one entry per (ngrams type, term).
type ArchiveStateList = [(TableNgrams.NgramsType, NgramsTerm, NgramsRepoElement)]

-- | The (ngrams type, term) keys of an 'ArchiveStateList'.
type ArchiveStateSet = Set.Set (TableNgrams.NgramsType, NgramsTerm)
451
-- | Flatten an archive state (a `Map NgramsType (Map NgramsTerm
-- NgramsRepoElement)`) into a list of triples.
archiveStateToList :: NgramsState' -> ArchiveStateList
archiveStateToList s =
  [ (nt, n, nre)
  | (nt, ntm) <- Map.toList s
  , (n, nre)  <- Map.toList ntm
  ]
456
-- | Rebuild an archive state from its flat list form; entries with the
-- same ngrams type have their singleton maps combined with `(<>)`.
archiveStateFromList :: ArchiveStateList -> NgramsState'
archiveStateFromList l =
  Map.fromListWith (<>) [ (nt, Map.singleton t nre) | (nt, t, nre) <- l ]
459
-- | Set of the (ngrams type, term) keys occurring in the list.
archiveStateSet :: ArchiveStateList -> ArchiveStateSet
archiveStateSet lst = Set.fromList [ (nt, term) | (nt, term, _) <- lst ]
462
-- | Keep only the entries whose (ngrams type, term) key is in the set.
archiveStateListFilterFromSet :: ArchiveStateSet -> ArchiveStateList -> ArchiveStateList
archiveStateListFilterFromSet wanted entries =
  [ entry | entry@(nt, term, _) <- entries, Set.member (nt, term) wanted ]
466
-- | This function inserts a whole new node story for the given node id:
-- one @node_stories@ row per (ngrams type, term) of the archive state, all
-- at the archive's version.  Terms that are not in the @ngrams@ table are
-- skipped.
--
-- Fixes compared to the previous statement:
--
--   * five values are passed per row (node id, version, ngrams type, ngram
--     id, repo element) but the statement had only four placeholders and no
--     @version@ column, so it could not be formatted;
--   * the repo element is cast to @jsonb@, as the other inserts of this
--     module do;
--   * the guard is @EXISTS@ (insert only when node and ngram exist) instead
--     of the inverted @NOT EXISTS@.
--     NOTE(review): assumes @ngrams@ / @nodes@ have no @ngrams_id@ /
--     @node_id@ columns of their own — confirm against the schema.
insertNodeStory :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
insertNodeStory c (NodeId nId) a =
  mapM_ insertEntry $ archiveStateToList $ a ^. a_state
  where
    insertEntry (ngramsType, ngrams, ngramsRepoElement) = do
      termIdM <- runPGSQuery c ngramsIdQuery (PGS.Only ngrams) :: IO [PGS.Only Int64]
      case headMay termIdM of
        -- unknown term: nothing to insert
        Nothing -> pure 0
        Just (PGS.Only termId) ->
          runPGSExecuteMany c query
            [(nId, a ^. a_version, ngramsType, termId, ngramsRepoElement)]

    -- https://dba.stackexchange.com/questions/265554/how-to-check-other-table-for-value-during-insert
    query :: PGS.Query
    query = [sql| INSERT INTO node_stories(node_id, version, ngrams_type_id, ngrams_id, ngrams_repo_element)
                  SELECT node_id, version, ngrams_type_id, ngrams_id, ngrams_repo_element::jsonb FROM (
                    VALUES (?, ?, ?, ?, ?)
                  ) AS i(node_id, version, ngrams_type_id, ngrams_id, ngrams_repo_element)
                  WHERE EXISTS (
                    SELECT FROM ngrams
                    CROSS JOIN nodes
                    WHERE ngrams.id = i.ngrams_id
                    AND nodes.id = i.node_id
                  )|]
490 -- insert ngramsType ngrams ngramsRepoElement =
491 -- Insert { iTable = nodeStoryTable
492 -- , iRows = [NodeStoryDB { node_id = sqlInt4 nId
493 -- , version = sqlInt4 _a_version
494 -- , ngrams_type_id = sqlInt4 $ TableNgrams.ngramsTypeId ngramsType
495 -- , ngrams_id = ...
496 -- , ngrams_repo_element = sqlValueJSONB ngramsRepoElement
497 -- }]
498 -- , iReturning = rCount
499 -- , iOnConflict = Nothing }
500
-- | Insert one @node_stories@ row per entry, at the given version.  The
-- statement resolves the ngram id from the term and joins on @nodes@.
insertArchiveStateList :: PGS.Connection -> NodeId -> Version -> ArchiveStateList -> IO ()
insertArchiveStateList c nodeId version entries = mapM_ insertEntry entries
  where
    insertEntry (nt, n, nre) = runPGSExecute c query (nodeId, version, nt, nre, n)

    query :: PGS.Query
    query = [sql| WITH s as (SELECT ? as sid, ? sversion, ? sngrams_type_id, ngrams.id as sngrams_id, ?::jsonb as srepo FROM ngrams WHERE terms = ?)
                  INSERT INTO node_stories(node_id, version, ngrams_type_id, ngrams_id, ngrams_repo_element)
                  SELECT s.sid, s.sversion, s.sngrams_type_id, s.sngrams_id, s.srepo from s s join nodes n on s.sid = n.id
                |]
510
-- | Delete the @node_stories@ row of every entry, identified by node id,
-- ngrams type and term.
deleteArchiveStateList :: PGS.Connection -> NodeId -> ArchiveStateList -> IO ()
deleteArchiveStateList c nodeId entries = mapM_ deleteEntry entries
  where
    deleteEntry (nt, n, _) = runPGSExecute c query (nodeId, nt, n)

    query :: PGS.Query
    query = [sql| DELETE FROM node_stories
                  WHERE node_id = ? AND ngrams_type_id = ? AND ngrams_id IN (SELECT id FROM ngrams WHERE terms = ?) |]
518
-- | Set repo element and version of the @node_stories@ row of every entry,
-- identified by node id, ngrams type and term.
updateArchiveStateList :: PGS.Connection -> NodeId -> Version -> ArchiveStateList -> IO ()
updateArchiveStateList c nodeId version entries =
  --q <- PGS.format c query params
  --printDebug "[updateArchiveList] query" q
  mapM_ (runPGSExecute c query) params
  where
    params = [ (nre, version, nodeId, nt, n) | (nt, n, nre) <- entries ]

    query :: PGS.Query
    query = [sql| UPDATE node_stories
                  SET ngrams_repo_element = ?, version = ?
                  WHERE node_id = ? AND ngrams_type_id = ? AND ngrams_id IN (SELECT id FROM ngrams WHERE terms = ?) |]
530
-- | This function updates the stored node story of the given node so that
-- it matches the new archive.
--
-- STEPS
--
-- 0. We assume we're inside an advisory lock.
--
-- 1. Find the differences between the current and the new state, keyed by
--    (ngrams type, term): inserts, deletes and updates.
--
-- 2. Perform the inserts, then the deletes, then the updates.
updateNodeStory :: PGS.Connection -> NodeId -> ArchiveList -> ArchiveList -> IO ()
updateNodeStory c nodeId@(NodeId _nId) currentArchive newArchive = do
  let currentList = archiveStateToList $ currentArchive ^. a_state
      newList     = archiveStateToList $ newArchive ^. a_state
      currentSet  = archiveStateSet currentList
      newSet      = archiveStateSet newList

      addedKeys   = Set.difference newSet currentSet
      removedKeys = Set.difference currentSet newSet
      commonKeys  = Set.intersection currentSet newSet

      inserts = archiveStateListFilterFromSet addedKeys newList
      deletes = archiveStateListFilterFromSet removedKeys currentList

      -- updates are the entries present on both sides whose new triple is
      -- not among the current triples
      commonNewList     = archiveStateListFilterFromSet commonKeys newList
      commonCurrentList = archiveStateListFilterFromSet commonKeys currentList
      updates = Set.toList $
        Set.difference (Set.fromList commonNewList) (Set.fromList commonCurrentList)

  printDebug "[updateNodeStory] new - current = " addedKeys
  -- printDebug "[updateNodeStory] inserts" inserts
  printDebug "[updateNodeStory] current - new" removedKeys
  -- printDebug "[updateNodeStory] deletes" deletes
  printDebug "[updateNodeStory] updates" $ Text.unlines $ (Text.pack . show) <$> updates

  insertArchiveStateList c nodeId (newArchive ^. a_version) inserts
  --TODO Use currentArchive ^. a_version in delete and report error
  -- if entries with (node_id, ngrams_type_id, ngrams_id) but
  -- different version are found.
  deleteArchiveStateList c nodeId deletes
  updateArchiveStateList c nodeId (newArchive ^. a_version) updates
572 -- where
573 -- update = Update { uTable = nodeStoryTable
574 -- , uUpdateWith = updateEasy (\(NodeStoryDB { node_id }) ->
575 -- NodeStoryDB { archive = sqlValueJSONB $ Archive { _a_history = emptyHistory
576 -- , ..}
577 -- , .. })
578 -- , uWhere = (\row -> node_id row .== sqlInt4 nId)
579 -- , uReturning = rCount }
580
581 -- nodeStoryRemove :: Pool PGS.Connection -> NodeId -> IO Int64
582 -- nodeStoryRemove pool (NodeId nId) = withResource pool $ \c -> runDelete c delete
583 -- where
584 -- delete = Delete { dTable = nodeStoryTable
585 -- , dWhere = (\row -> node_id row .== sqlInt4 nId)
586 -- , dReturning = rCount }
587
-- | Store the archive of one node.  Inside a transaction, and under a
-- transaction-level advisory lock keyed by the node id, the stored story is
-- read back and then either inserted or updated; finally the versions of
-- the node's rows are aligned with the new archive.
upsertNodeStories :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
upsertNodeStories c nodeId@(NodeId nId) newArchive = do
  printDebug "[upsertNodeStories] START nId" nId
  PGS.withTransaction c $ do
    printDebug "[upsertNodeStories] locking nId" nId
    runPGSAdvisoryXactLock c nId

    NodeStory stored <- getNodeStory c nodeId
    case Map.lookup nodeId stored of
      Nothing             -> insertNodeStory c nodeId newArchive
      Just currentArchive -> updateNodeStory c nodeId currentArchive newArchive

    -- 3. Now we need to set versions of all node state to be the same
    fixNodeStoryVersion c nodeId newArchive

  printDebug "[upsertNodeStories] STOP nId" nId
608
-- | For every ngrams type present in the archive state, set the version of
-- all @node_stories@ rows of that node and type to the archive's version.
fixNodeStoryVersion :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
fixNodeStoryVersion c nodeId newArchive =
  mapM_ setVersion (Map.keys $ newArchive ^. a_state)
  where
    setVersion nt = runPGSExecute c query (newArchive ^. a_version, nodeId, nt)

    query :: PGS.Query
    query = [sql|UPDATE node_stories
                 SET version = ?
                 WHERE node_id = ?
                 AND ngrams_type_id = ?|]
619
-- | Upsert every archive of the story, in ascending node-id order.
writeNodeStories :: PGS.Connection -> NodeListStory -> IO ()
writeNodeStories c (NodeStory nls) = mapM_ upsertOne (Map.toList nls)
  where
    upsertOne (nId, a) = upsertNodeStories c nId a
623
-- | Returns a `NodeListStory` that contains the given `NodeId`.  Without a
-- story the node is loaded from the DB; with a story the node is loaded
-- and added only when it is not already present.
nodeStoryInc :: PGS.Connection -> Maybe NodeListStory -> NodeId -> IO NodeListStory
nodeStoryInc c Nothing nId = getNodeStory c nId
nodeStoryInc c (Just ns@(NodeStory nls)) nId
  | Map.member nId nls = pure ns
  | otherwise = do
      NodeStory loaded <- getNodeStory c nId
      pure $ NodeStory $ Map.union nls loaded
633
-- | Like 'nodeStoryInc' for a list of nodes.  Without a starting story and
-- without nodes the result is the empty story.
nodeStoryIncs :: PGS.Connection -> Maybe NodeListStory -> [NodeId] -> IO NodeListStory
nodeStoryIncs _ Nothing []       = pure $ NodeStory $ Map.empty
nodeStoryIncs c Nothing (ni:ns)  = getNodeStory c ni >>= \first -> nodeStoryIncs c (Just first) ns
nodeStoryIncs c (Just nls) ns    = foldM addNode nls ns
  where
    addNode acc n = nodeStoryInc c (Just acc) n
640
641 -- nodeStoryDec :: Pool PGS.Connection -> NodeListStory -> NodeId -> IO NodeListStory
642 -- nodeStoryDec pool ns@(NodeStory nls) ni = do
643 -- case Map.lookup ni nls of
644 -- Nothing -> do
645 -- _ <- nodeStoryRemove pool ni
646 -- pure ns
647 -- Just _ -> do
648 -- let ns' = Map.filterWithKey (\k _v -> k /= ni) nls
649 -- _ <- nodeStoryRemove pool ni
650 -- pure $ NodeStory ns'
651 ------------------------------------
652
-- | Build the node-story environment: a fresh (empty) shared variable and
-- the save actions working on it.
readNodeStoryEnv :: Pool PGS.Connection -> IO NodeStoryEnv
readNodeStoryEnv pool = do
  mvar <- nodeStoryVar pool Nothing []
  let -- Write all stories to the DB; the shared state is left unchanged.
      persistStories = modifyMVar_ mvar $ \ns ->
        withResource pool $ \c -> do
          --printDebug "[mkNodeStorySaver] will call writeNodeStories, ns" ns
          writeNodeStories c ns
          pure ns

      -- Insert the history of every archive (list reversed first) and
      -- return the stories with their history cleared.
      persistHistory ns@(NodeStory nls) = withResource pool $ \c -> do
        mapM_ (\(nId, a) ->
                 insertNodeArchiveHistory c nId (a ^. a_version) (reverse $ a ^. a_history)
              ) (Map.toList nls)
        pure $ clearHistory ns
  saver <- mkNodeStorySaver persistStories
  -- let saver = modifyMVar_ mvar $ \mv -> do
  --       writeNodeStories pool mv
  --       printDebug "[readNodeStoryEnv] saver" mv
  --       let mv' = clearHistory mv
  --       printDebug "[readNodeStoryEnv] saver, cleared" mv'
  --       return mv'
  pure $ NodeStoryEnv
    { _nse_var                     = mvar
    , _nse_saver                   = saver
    , _nse_saver_immediate         = persistStories
    , _nse_archive_saver_immediate = persistHistory
    , _nse_getter                  = nodeStoryVar pool (Just mvar)
    }
679
-- | Make sure the given nodes are loaded and return the shared variable.
--
-- Without an existing variable a new one is created from the loaded
-- stories.  With an existing variable the missing nodes are loaded into it
-- in place and the same variable is returned.
--
-- Lock ordering: the MVar is taken first and a pool connection second, the
-- same order the save action built in 'readNodeStoryEnv' uses.  Taking the
-- connection first would let threads waiting for the MVar hold pool
-- connections while the MVar holder waits for a connection.
nodeStoryVar :: Pool PGS.Connection
             -> Maybe (MVar NodeListStory)
             -> [NodeId]
             -> IO (MVar NodeListStory)
nodeStoryVar pool Nothing nIds = do
  state <- withResource pool $ \c -> nodeStoryIncs c Nothing nIds
  newMVar state
nodeStoryVar pool (Just mv) nIds = do
  modifyMVar_ mv $ \nsl ->
    withResource pool $ \c -> nodeStoryIncs c (Just nsl) nIds
  pure mv
692
693 -- Debounce is useful since it could delay the saving to some later
694 -- time, asynchronously and we keep operating on memory only.
695 -- mkNodeStorySaver :: Pool PGS.Connection -> MVar NodeListStory -> IO (IO ())
696 -- mkNodeStorySaver pool mvns = do
-- | Wrap a save action with 'mkDebounce', using a frequency of one minute
-- (expressed in microseconds).
mkNodeStorySaver :: IO () -> IO (IO ())
mkNodeStorySaver saver =
  mkDebounce defaultDebounceSettings
    { debounceAction = saver
      -- NOTE for save actions passed in here: lock the MVar first, then use
      -- the resource pool.  Otherwise we could wait for the MVar while
      -- blocking a pool connection.
      --withMVar mvns (\ns -> printDebug "[mkNodeStorySaver] debounce nodestory" ns)
    , debounceFreq   = 1 * oneMinute
    }
  where
    oneSecond = 10 ^ (6 :: Int)
    oneMinute = 60 * oneSecond
716
-- | Reset the history of every archive of the story to the empty list.
clearHistory :: NodeListStory -> NodeListStory
clearHistory (NodeStory ns) = NodeStory (dropHistory <$> ns)
  where
    dropHistory :: ArchiveList -> ArchiveList
    dropHistory a = a & a_history .~ []
721
-- | Version of the given list, read from the DB through 'getNodeStory'.
currentVersion :: (HasNodeStory env err m) => ListId -> m Version
currentVersion listId =
  view connPool >>= \pool -> do
    nls <- withResource pool (\c -> liftBase (getNodeStory c listId))
    pure (nls ^. unNodeStory . at listId . _Just . a_version)
727
728
729 -- mkNodeStorySaver :: MVar NodeListStory -> Cmd err (Cmd err ())
730 -- mkNodeStorySaver mvns = mkDebounce settings
731 -- where
732 -- settings = defaultDebounceSettings
733 -- { debounceAction = withMVar mvns (\ns -> writeNodeStories ns)
734 -- , debounceFreq = 1 * minute
735 -- -- , debounceEdge = trailingEdge -- Trigger on the trailing edge
736 -- }
737 -- minute = 60 * second
738 -- second = 10^(6 :: Int)
739
740
741 -----------------------------------------
742
-- | For every node, and for each of the ngrams types Authors, Institutes,
-- Sources and NgramsTerms, set the version of all its @node_stories@ rows
-- to the maximum version found among them.  Everything runs in a single
-- transaction.
fixNodeStoryVersions :: (HasNodeStory env err m) => m ()
fixNodeStoryVersions = do
  pool <- view connPool
  _ <- withResource pool $ \c -> liftBase $ PGS.withTransaction c $ do
    nIds <- runPGSQuery c [sql| SELECT id FROM nodes WHERE ? |] (PGS.Only True) :: IO [PGS.Only Int64]
    printDebug "[fixNodeStoryVersions] nIds" nIds
    mapM_ (fixNode c) nIds
  pure ()
  where
    fixNode :: PGS.Connection -> PGS.Only Int64 -> IO ()
    fixNode c (PGS.Only nId) = do
      printDebug "[fixNodeStoryVersions] nId" nId
      mapM_ (\ngramsType -> updateVer c ngramsType nId)
            [ TableNgrams.Authors
            , TableNgrams.Institutes
            , TableNgrams.Sources
            , TableNgrams.NgramsTerms
            ]

    maxVerQuery :: PGS.Query
    maxVerQuery = [sql| SELECT max(version)
                        FROM node_stories
                        WHERE node_id = ?
                        AND ngrams_type_id = ? |]

    updateVerQuery :: PGS.Query
    updateVerQuery = [sql| UPDATE node_stories
                           SET version = ?
                           WHERE node_id = ?
                           AND ngrams_type_id = ? |]

    -- Align the versions of one (node, ngrams type) pair; nothing is done
    -- when the pair has no rows.
    updateVer :: PGS.Connection -> TableNgrams.NgramsType -> Int64 -> IO ()
    updateVer c ngramsType nId = do
      maxVer <- runPGSQuery c maxVerQuery (nId, ngramsType) :: IO [PGS.Only (Maybe Int64)]
      case maxVer of
        []                           -> pure ()
        [PGS.Only Nothing]           -> pure ()
        [PGS.Only (Just maxVersion)] ->
          runPGSExecute c updateVerQuery (maxVersion, nId, ngramsType) >> pure ()
        _                            -> panic "Should get only 1 result!"