[nodeStory] add version to node_story_archive_history
[gargantext.git] / src / Gargantext / Core / NodeStory.hs
index 4fbb6ce09530cdc5cfcd5edaecdfdfd7a5cb6275..e3e89a7d56baac4033d0243fd2dfd8cf3b30bb02 100644 (file)
@@ -10,6 +10,30 @@ Portability : POSIX
 A Node Story is a Map between NodeId and an Archive (with state,
 version and history) for that node.
 
+Couple of words on how this is implemented.
+
+First version used files which stored Archive for each NodeId in a
+separate .cbor file.
+
+For performance reasons, it is rewritten to use the DB.
+
+The table `node_stories` contains two columns: `node_id` and
+`archive`.
+
+Next, it was observed that `a_history` in `Archive` takes much
+space. So a new table was created, `node_story_archive_history` with
+columns: `node_id`, `ngrams_type_id`, `patch`. This is because each
+history item is in fact a map from `NgramsType` to `NgramsTablePatch`
+(see the `NgramsStatePatch'` type).
+
+Moreover, since in ~G.A.Ngrams.commitStatePatch~ we use current state
+only, with only recent history items, I concluded that it is not
+necessary to load whole history into memory. Instead, it is kept in DB
+(history is immutable) and only recent changes are added to
+`a_history`. Then that record is cleared whenever `Archive` is saved.
+
+Please note that
+
 TODO:
 - remove
 - filter
@@ -17,38 +41,82 @@ TODO:
 -}
 
 {-# OPTIONS_GHC -fno-warn-orphans #-}
-{-# LANGUAGE TemplateHaskell   #-}
+{-# LANGUAGE Arrows #-}
 {-# LANGUAGE ConstraintKinds   #-}
+{-# LANGUAGE QuasiQuotes #-}
+{-# LANGUAGE TemplateHaskell   #-}
 
-module Gargantext.Core.NodeStory where
+module Gargantext.Core.NodeStory
+  ( HasNodeStory
+  , HasNodeStoryEnv
+  , hasNodeStory
+  , HasNodeStoryVar
+  , hasNodeStoryVar
+  , HasNodeStorySaver
+  , hasNodeStorySaver
+  , NodeStory(..)
+  , NgramsStatePatch'
+  , NodeListStory
+  , initNodeListStoryMock
+  , NodeStoryEnv(..)
+  , initNodeStory
+  , nse_getter
+  , nse_saver
+  , nse_var
+  , unNodeStory
+  , getNodeArchiveHistory
+  , Archive(..)
+  , initArchive
+  , insertArchiveList
+  , deleteArchiveList
+  , updateArchiveList
+  , a_history
+  , a_state
+  , a_version
+  , nodeExists
+  , runPGSQuery
+  , runPGSAdvisoryLock
+  , runPGSAdvisoryUnlock
+  , runPGSAdvisoryXactLock
+  , getNodesIdWithType
+  , readNodeStoryEnv
+  , upsertNodeStories
+  , getNodeStory
+  , nodeStoriesQuery
+  , currentVersion )
+where
 
 -- import Debug.Trace (traceShow)
-import Codec.Serialise (serialise, deserialise)
-import Codec.Serialise.Class 
-import Control.Concurrent (MVar(), withMVar, newMVar, modifyMVar_)
 import Control.Debounce (mkDebounce, defaultDebounceSettings, debounceFreq, debounceAction)
-import Control.Lens (makeLenses, Getter, (^.))
+import Codec.Serialise.Class
+import Control.Concurrent (MVar(), newMVar, modifyMVar_)
+import Control.Exception (catch, throw, SomeException(..))
+import Control.Lens (makeLenses, Getter, (^.), (.~), (%~), _Just, at, traverse, view)
 import Control.Monad.Except
 import Control.Monad.Reader
 import Data.Aeson hiding ((.=), decode)
+import Data.ByteString.Char8 (hPutStrLn)
 import Data.Map.Strict (Map)
+import Data.Maybe (catMaybes)
 import Data.Monoid
+import Data.Pool (Pool, withResource)
 import Data.Semigroup
+import Database.PostgreSQL.Simple.SqlQQ (sql)
 import Database.PostgreSQL.Simple.FromField (FromField(fromField), fromJSONField)
+import Data.Profunctor.Product.TH (makeAdaptorAndInstance)
 import GHC.Generics (Generic)
 import Gargantext.API.Ngrams.Types
-import Gargantext.Core.Types (NodeId)
+import Gargantext.Core.Types (ListId, NodeId(..), NodeType)
 import Gargantext.Core.Utils.Prefix (unPrefix)
-import Gargantext.Database.Prelude (CmdM', HasConnectionPool, HasConfig)
+import Gargantext.Database.Prelude (CmdM', HasConnectionPool(..), HasConfig)
 import Gargantext.Database.Query.Table.Node.Error (HasNodeError())
 import Gargantext.Prelude
 import Opaleye (DefaultFromField(..), SqlJsonb, fromPGSFromField)
-import System.Directory (renameFile, createDirectoryIfMissing, doesFileExist, removeFile)
-import System.IO (FilePath, hClose)
-import System.IO.Temp (withTempFile)
-import qualified Data.ByteString.Lazy                   as DBL
-import qualified Data.List                              as List
+import System.IO (stderr)
 import qualified Data.Map.Strict                        as Map
+import qualified Data.Map.Strict.Patch                  as PM
+import qualified Data.Set as Set
+import qualified Database.PostgreSQL.Simple as PGS
 import qualified Gargantext.Database.Query.Table.Ngrams as TableNgrams
 
 ------------------------------------------------------------------------
@@ -81,183 +149,12 @@ class HasNodeStorySaver env where
   hasNodeStorySaver :: Getter env (IO ())
 
 ------------------------------------------------------------------------
-readNodeStoryEnv :: NodeStoryDir -> IO NodeStoryEnv
-readNodeStoryEnv nsd = do
-  mvar  <- nodeStoryVar nsd Nothing []
-  saver <- mkNodeStorySaver nsd mvar
-  pure $ NodeStoryEnv { _nse_var = mvar
-                      , _nse_saver = saver
-                      , _nse_getter = nodeStoryVar nsd (Just mvar) }
-
-------------------------------------------------------------------------
-mkNodeStorySaver :: NodeStoryDir -> MVar NodeListStory -> IO (IO ())
-mkNodeStorySaver nsd mvns = mkDebounce settings
-  where
-    settings = defaultDebounceSettings
-                 { debounceAction = withMVar mvns (writeNodeStories nsd)
-                 , debounceFreq = 1 * minute
---                 , debounceEdge = trailingEdge -- Trigger on the trailing edge
-                 }
-    minute = 60 * second
-    second = 10^(6 :: Int)
-
-nodeStoryVar :: NodeStoryDir
-             -> Maybe (MVar NodeListStory)
-             -> [NodeId]
-             -> IO (MVar NodeListStory)
-nodeStoryVar nsd Nothing ni = nodeStoryIncs nsd Nothing ni >>= newMVar
-nodeStoryVar nsd (Just mv) ni = do
-  _ <- modifyMVar_ mv $ \mv' -> (nodeStoryIncs nsd (Just mv') ni)
-  pure mv
-
-
-nodeStoryInc :: NodeStoryDir -> Maybe NodeListStory -> NodeId -> IO NodeListStory
-nodeStoryInc nsd (Just ns@(NodeStory nls)) ni = do
-  case Map.lookup ni nls of
-    Nothing -> do
-      (NodeStory nls') <- nodeStoryRead nsd ni
-      pure $ NodeStory $ Map.union nls nls'
-    Just _  -> pure ns
-nodeStoryInc nsd Nothing ni = nodeStoryRead nsd ni
-
-
-nodeStoryIncs :: NodeStoryDir
-              -> Maybe NodeListStory
-              -> [NodeId]
-              -> IO NodeListStory
-nodeStoryIncs _ Nothing    []        = pure $ NodeStory $ Map.empty
-nodeStoryIncs nsd (Just nls) ns      = foldM (\m n -> nodeStoryInc nsd (Just m) n) nls ns
-nodeStoryIncs nsd Nothing    (ni:ns) = do
-  m <- nodeStoryRead nsd ni
-  nodeStoryIncs nsd (Just m) ns
-
-
-nodeStoryDec :: NodeStoryDir
-             -> NodeListStory
-             -> NodeId
-             -> IO NodeListStory
-nodeStoryDec nsd ns@(NodeStory nls) ni = do
-  case Map.lookup ni nls of
-    Nothing -> do
-      -- we make sure the corresponding file repo is really removed
-      _ <- nodeStoryRemove nsd ni
-      pure ns
-    Just _  -> do
-      let ns' = Map.filterWithKey (\k _v -> k /= ni) nls
-      _ <- nodeStoryRemove nsd ni
-      pure $ NodeStory ns'
-
--- | TODO lock
-nodeStoryRead :: NodeStoryDir -> NodeId -> IO NodeListStory
-nodeStoryRead nsd ni = do
-  _repoDir <- createDirectoryIfMissing True nsd
-  let nsp = nodeStoryPath nsd ni
-  exists <- doesFileExist nsp
-  if exists
-     then deserialise <$> DBL.readFile nsp
-     else pure (initNodeStory ni)
-
-nodeStoryRemove :: NodeStoryDir -> NodeId -> IO ()
-nodeStoryRemove nsd ni = do
-  let nsp = nodeStoryPath nsd ni
-  exists <- doesFileExist nsp
-  if exists
-     then removeFile nsp
-     else pure ()
-
-
-
-nodeStoryRead_test :: NodeStoryDir -> NodeId -> IO (Maybe [ TableNgrams.NgramsType ])
-nodeStoryRead_test nsd ni = nodeStoryRead nsd ni >>= \n -> pure
-                          $ fmap Map.keys
-                          $ fmap _a_state
-                          $ Map.lookup ni
-                          $ _unNodeStory n
-
-------------------------------------------------------------------------
-type NodeStoryDir = FilePath
-
-writeNodeStories :: NodeStoryDir -> NodeListStory -> IO ()
-writeNodeStories fp nls = do
-  _done <- mapM (writeNodeStory fp) $ splitByNode nls
-  -- printDebug "[writeNodeStories]" done
-  pure ()
-
-writeNodeStory :: NodeStoryDir -> (NodeId, NodeListStory) -> IO ()
-writeNodeStory rdfp (n, ns) = saverAction' rdfp n ns
-
-splitByNode :: NodeListStory -> [(NodeId, NodeListStory)]
-splitByNode (NodeStory m) =
-  List.map (\(n,a) -> (n, NodeStory $ Map.singleton n a)) $ Map.toList m
-
-
-saverAction' :: Serialise a => NodeStoryDir -> NodeId -> a -> IO ()
-saverAction' repoDir nId a = do
-  withTempFile repoDir ((cs $ show nId) <> "-tmp-repo.cbor") $ \fp h -> do
-    -- printDebug "[repoSaverAction]" fp
-    DBL.hPut h $ serialise a
-    hClose h
-    renameFile fp (nodeStoryPath repoDir nId)
-
-nodeStoryPath :: NodeStoryDir -> NodeId -> FilePath
-nodeStoryPath repoDir nId = repoDir <> "/" <> filename
-  where
-    filename = "repo" <> "-" <> (cs $ show nId) <> ".cbor"
-
-
-------------------------------------------------------------------------
--- TODO : repo Migration TODO TESTS
-{-
-repoMigration :: NodeStoryDir -> NgramsRepo -> IO ()
-repoMigration fp r = writeNodeStories fp (repoToNodeListStory r)
-
-repoToNodeListStory :: NgramsRepo -> NodeListStory
-repoToNodeListStory (Repo _v s h) = NodeStory $ Map.fromList ns
-  where
-    s' = ngramsState_migration      s
-    h' = ngramsStatePatch_migration h
-    ns = List.map (\(n,ns')
-                    -> (n, let hs = fromMaybe [] (Map.lookup n h') in
-                               Archive { _a_version = List.length hs
-                                       , _a_state = ns'
-                                       , _a_history = hs }
-                       )
-                  ) $ Map.toList s'
-
-ngramsState_migration :: NgramsState
-                      -> Map NodeId NgramsState'
-ngramsState_migration ns =
-  Map.fromListWith (Map.union) $ 
-  List.concat $
-    map (\(nt, nTable)
-          -> map (\(nid, table)
-                   -> (nid, Map.singleton nt table)
-                 ) $ Map.toList nTable
-        ) $ Map.toList ns
-
-
-ngramsStatePatch_migration :: [NgramsStatePatch]
-                           -> Map NodeId [NgramsStatePatch']
-ngramsStatePatch_migration np' = Map.fromListWith (<>)
-                               $ List.concat
-                               $ map toPatch np'
-  where
-    toPatch :: NgramsStatePatch -> [(NodeId, [NgramsStatePatch'])]
-    toPatch p = 
-      List.concat $
-        map (\(nt, nTable)
-              -> map (\(nid, table)
-                       -> (nid, [fst $ Patch.singleton nt table])
-                     ) $ Patch.toList nTable
-            ) $ Patch.toList p
--}
-------------------------------------------------------------------------
 
 {- | Node Story for each NodeType where the Key of the Map is NodeId
   TODO : generalize for any NodeType, let's start with NodeList which
   is implemented already
 -}
-data NodeStory s p = NodeStory { _unNodeStory :: Map NodeId (Archive s p) }
+newtype NodeStory s p = NodeStory { _unNodeStory :: Map NodeId (Archive s p) }
   deriving (Generic, Show)
 
 instance (FromJSON s, FromJSON p) => FromJSON (NodeStory s p)
@@ -265,10 +162,18 @@ instance (ToJSON s, ToJSON p) => ToJSON (NodeStory s p)
 instance (Serialise s, Serialise p) => Serialise (NodeStory s p)
 
 data Archive s p = Archive
-  { _a_version :: !Version
-  , _a_state   :: !s
-  , _a_history :: ![p]
+  { _a_version           :: !Version
+  , _a_state             :: !s
+  , _a_history           :: ![p]
     -- first patch in the list is the most recent
+    -- We use `take` in `commitStatePatch`, that's why.
+
+    -- History is immutable, we just insert things on top of existing
+    -- list.
+
+    -- We don't need to store the whole history in memory, this
+    -- structure holds only recent history, the one that will be
+    -- inserted to the DB.
   }
   deriving (Generic, Show)
 
@@ -287,44 +192,44 @@ instance DefaultFromField SqlJsonb (Archive NgramsState' NgramsStatePatch')
   where
     defaultFromField = fromPGSFromField
 
--- TODO Semigroup instance for unions
--- TODO check this
+-- | Combine `NgramsState'`. This is because the structure is (Map
+-- NgramsType (Map ...)) and the default `(<>)` operator is
+-- left-biased
+-- (https://hackage.haskell.org/package/containers-0.6.6/docs/Data-Map-Internal.html#v:union)
+combineState :: NgramsState' -> NgramsState' -> NgramsState'
+combineState = Map.unionWith (<>)
+
 instance (Semigroup s, Semigroup p) => Semigroup (Archive s p) where
   (<>) (Archive { _a_history = p }) (Archive { _a_version = v'
                                              , _a_state = s'
-                                             , _a_history = p'}) =
+                                             , _a_history = p' }) =
     Archive { _a_version = v'
             , _a_state = s'
             , _a_history = p' <> p }
-
-instance Monoid (Archive NgramsState' NgramsStatePatch') where
+instance (Monoid s, Semigroup p) => Monoid (Archive s p) where
   mempty = Archive { _a_version = 0
                    , _a_state = mempty
                    , _a_history = [] }
-
 instance (FromJSON s, FromJSON p) => FromJSON (Archive s p) where
   parseJSON = genericParseJSON $ unPrefix "_a_"
-
 instance (ToJSON s, ToJSON p) => ToJSON (Archive s p) where
   toJSON     = genericToJSON     $ unPrefix "_a_"
   toEncoding = genericToEncoding $ unPrefix "_a_"
 
 ------------------------------------------------------------------------
-initNodeStory :: Monoid s => NodeId -> NodeStory s p
+initNodeStory :: (Monoid s, Semigroup p) => NodeId -> NodeStory s p
 initNodeStory ni = NodeStory $ Map.singleton ni initArchive
 
-initArchive :: Monoid s => Archive s p
-initArchive = Archive { _a_version = 0
-                      , _a_state = mempty
-                      , _a_history = [] }
+initArchive :: (Monoid s, Semigroup p) => Archive s p
+initArchive = mempty
 
 initNodeListStoryMock :: NodeListStory
 initNodeListStoryMock = NodeStory $ Map.singleton nodeListId archive
   where
     nodeListId = 0
-    archive        = Archive { _a_version = 0
-                             , _a_state = ngramsTableMap
-                             , _a_history = [] }
+    archive = Archive { _a_version = 0
+                      , _a_state = ngramsTableMap
+                      , _a_history = [] }
     ngramsTableMap = Map.singleton TableNgrams.NgramsTerms
                    $ Map.fromList
                    [ (n ^. ne_ngrams, ngramsElementToRepo n)
@@ -339,3 +244,415 @@ initNodeListStoryMock = NodeStory $ Map.singleton nodeListId archive
 makeLenses ''NodeStoryEnv
 makeLenses ''NodeStory
 makeLenses ''Archive
+
+-----------------------------------------
+
+
+data NodeStoryPoly nid v ngtid ngid nre =
+  NodeStoryDB { node_id             :: nid
+              , version             :: v
+              , ngrams_type_id      :: ngtid
+              , ngrams_id           :: ngid
+              , ngrams_repo_element :: nre }
+  deriving (Eq)
+
+data NodeStoryArchivePoly nid a =
+  NodeStoryArchiveDB { a_node_id :: nid
+                     , archive :: a }
+  deriving (Eq)
+
+$(makeAdaptorAndInstance "pNodeStory" ''NodeStoryPoly)
+$(makeAdaptorAndInstance "pNodeArchiveStory" ''NodeStoryArchivePoly)
+
+-- type NodeStoryWrite = NodeStoryPoly (Column SqlInt4) (Column SqlInt4) (Column SqlInt4) (Column SqlInt4) (Column SqlJsonb)
+-- type NodeStoryRead = NodeStoryPoly (Column SqlInt4) (Column SqlInt4) (Column SqlInt4) (Column SqlInt4) (Column SqlJsonb)
+
+-- type NodeStoryArchiveWrite = NodeStoryArchivePoly (Column SqlInt4) (Column SqlJsonb)
+-- type NodeStoryArchiveRead = NodeStoryArchivePoly (Column SqlInt4) (Column SqlJsonb)
+
+type ArchiveList = Archive NgramsState' NgramsStatePatch'
+
+-- DB stuff
+
+runPGSExecute :: (PGS.ToRow q) => PGS.Connection -> PGS.Query -> q -> IO Int64
+runPGSExecute c qs a = catch (PGS.execute c qs a) printError
+  where
+    printError (SomeException e) = do
+      --q' <- PGS.formatQuery c qs a
+      --hPutStrLn stderr q'
+      throw (SomeException e)
+
+runPGSExecuteMany :: (PGS.ToRow q) => PGS.Connection -> PGS.Query -> [q] -> IO Int64
+runPGSExecuteMany c qs a = catch (PGS.executeMany c qs a) printError
+  where
+    printError (SomeException e) = do
+      --q' <- PGS.formatQuery c qs a
+      --hPutStrLn stderr q'
+      throw (SomeException e)
+
+runPGSQuery :: (PGS.FromRow r, PGS.ToRow q) => PGS.Connection -> PGS.Query -> q -> IO [r]
+runPGSQuery c q a = catch (PGS.query c q a) printError
+  where
+    printError (SomeException e) = do
+      q' <- PGS.formatQuery c q a
+      hPutStrLn stderr q'
+      throw (SomeException e)
+
+runPGSAdvisoryLock :: PGS.Connection -> Int -> IO ()
+runPGSAdvisoryLock c id = do
+  _ <- runPGSQuery c [sql| SELECT pg_advisory_lock(?) |] (PGS.Only id) :: IO [PGS.Only ()]
+  pure ()
+
+runPGSAdvisoryUnlock :: PGS.Connection -> Int -> IO ()
+runPGSAdvisoryUnlock c id = do
+  _ <- runPGSQuery c [sql| SELECT pg_advisory_unlock(?) |] (PGS.Only id) :: IO [PGS.Only Bool]
+  pure ()
+
+runPGSAdvisoryXactLock :: PGS.Connection -> Int -> IO ()
+runPGSAdvisoryXactLock c id = do
+  _ <- runPGSQuery c [sql| SELECT pg_advisory_xact_lock(?) |] (PGS.Only id) :: IO [PGS.Only ()]
+  pure ()
+
+nodeExists :: PGS.Connection -> NodeId -> IO Bool
+nodeExists c nId = (== [PGS.Only True])
+  <$> runPGSQuery c [sql| SELECT true FROM nodes WHERE id = ? LIMIT 1 |] (PGS.Only nId)
+
+getNodesIdWithType :: PGS.Connection -> NodeType -> IO [NodeId]
+getNodesIdWithType c nt = do
+  ns <- runPGSQuery c query (PGS.Only nt)
+  pure $ map (\(PGS.Only nId) -> NodeId nId) ns
+  where
+    query :: PGS.Query
+    query = [sql| SELECT id FROM nodes WHERE typename = ? |]
+
+
+
+-- nodeStoryTable :: Table NodeStoryRead NodeStoryWrite
+-- nodeStoryTable =
+--   Table "node_stories"
+--     ( pNodeStory NodeStoryDB { node_id             = tableField "node_id"
+--                              , version             = tableField "version"
+--                              , ngrams_type_id      = tableField "ngrams_type_id"
+--                              , ngrams_id           = tableField "ngrams_id"
+--                              , ngrams_repo_element = tableField "ngrams_repo_element"
+--                              } )
+
+-- nodeStoryArchiveTable :: Table NodeStoryArchiveRead NodeStoryArchiveWrite
+-- nodeStoryArchiveTable =
+--   Table "node_story_archive_history"
+--     ( pNodeArchiveStory NodeStoryArchiveDB { a_node_id = tableField "node_id"
+--                                            , archive   = tableField "archive" } )
+
+-- nodeStorySelect :: Select NodeStoryRead
+-- nodeStorySelect = selectTable nodeStoryTable
+
+-- NOTE "first patch in the _a_history list is the most recent"
+getNodeArchiveHistory :: PGS.Connection -> NodeId -> IO [NgramsStatePatch']
+getNodeArchiveHistory c nodeId = do
+  as <- runPGSQuery c query (PGS.Only nodeId) :: IO [(TableNgrams.NgramsType, NgramsTerm, NgramsPatch)]
+  pure $ (\(ngramsType, terms, patch) -> fst $ PM.singleton ngramsType (NgramsTablePatch $ fst $ PM.singleton terms patch)) <$> as
+  where
+    query :: PGS.Query
+    query = [sql| SELECT ngrams_type_id, terms, patch
+                    FROM node_story_archive_history
+                    JOIN ngrams ON ngrams.id = ngrams_id
+                    WHERE node_id = ?
+                    ORDER BY version DESC |]
+
+ngramsIdQuery :: PGS.Query
+ngramsIdQuery = [sql| SELECT id FROM ngrams WHERE terms = ? |]
+
+
+insertNodeArchiveHistory :: PGS.Connection -> NodeId -> Version -> [NgramsStatePatch'] -> IO ()
+insertNodeArchiveHistory _ _ _ [] = pure ()
+insertNodeArchiveHistory c nodeId version (h:hs) = do
+  let tuples = mconcat $ (\(nType, (NgramsTablePatch patch)) ->
+                           (\(term, p) ->
+                              (nodeId, nType, term, p)) <$> PM.toList patch) <$> PM.toList h :: [(NodeId, TableNgrams.NgramsType, NgramsTerm, NgramsPatch)]
+  tuplesM <- mapM (\(nId, nType, term, patch) -> do
+                      ngrams <- runPGSQuery c ngramsIdQuery (PGS.Only term)
+                      pure $ (\(PGS.Only termId) -> (nId, nType, termId, term, patch)) <$> (headMay ngrams)
+                      ) tuples :: IO [Maybe (NodeId, TableNgrams.NgramsType, Int, NgramsTerm, NgramsPatch)]
+  _ <- runPGSExecuteMany c query $ ((\(nId, nType, termId, _term, patch) -> (nId, nType, termId, patch, version)) <$> (catMaybes tuplesM))
+  _ <- insertNodeArchiveHistory c nodeId version hs
+  pure ()
+  where
+
+    query :: PGS.Query
+    query = [sql| INSERT INTO node_story_archive_history(node_id, ngrams_type_id, ngrams_id, patch, version) VALUES (?, ?, ?, ?, ?) |]
+
+getNodeStory :: PGS.Connection -> NodeId -> IO NodeListStory
+getNodeStory c nId@(NodeId nodeId) = do
+  --res <- withResource pool $ \c -> runSelect c query :: IO [NodeStoryPoly NodeId Version Int Int NgramsRepoElement]
+  res <- runPGSQuery c nodeStoriesQuery (PGS.Only nodeId) :: IO [(Version, TableNgrams.NgramsType, NgramsTerm, NgramsRepoElement)]
+  -- We have multiple rows with same node_id and different (ngrams_type_id, ngrams_id).
+  -- Need to create a map: {<node_id>: {<ngrams_type_id>: {<ngrams_id>: <data>}}}
+  let dbData = map (\(version, ngramsType, ngrams, ngrams_repo_element) ->
+                      Archive { _a_version = version
+                              , _a_history = []
+                              , _a_state   = Map.singleton ngramsType $ Map.singleton ngrams ngrams_repo_element }) res
+  -- NOTE When concatenating, check that the same version is for all states
+  pure $ NodeStory $ Map.singleton nId $ foldl combine mempty dbData
+  --pure $ NodeStory $ Map.fromListWith (<>) $ (\(NodeStoryDB nId a) -> (nId, a)) <$> res
+  where
+    -- NOTE (<>) for Archive doesn't concatenate states, so we have to use `combine`
+    combine a1 a2 = a1 & a_state %~ combineState (a2 ^. a_state)
+                       & a_version .~ (a2 ^. a_version)  -- version should be updated from list, not taken from the empty Archive
+
+nodeStoriesQuery :: PGS.Query
+nodeStoriesQuery = [sql| SELECT version, ngrams_type_id, terms, ngrams_repo_element
+                           FROM node_stories
+                           JOIN ngrams ON ngrams.id = ngrams_id
+                           WHERE node_id = ? |]
+
+type ArchiveStateList = [(TableNgrams.NgramsType, NgramsTerm, NgramsRepoElement)]
+
+-- Functions to convert archive state (which is a Map NgramsType (Map
+-- NgramsTerm NgramsRepoElement)) to/from a flat list
+archiveStateAsList :: NgramsState' -> ArchiveStateList
+archiveStateAsList s = mconcat $ (\(nt, ntm) -> (\(n, nre) -> (nt, n, nre)) <$> Map.toList ntm) <$> Map.toList s
+
+archiveStateFromList :: ArchiveStateList -> NgramsState'
+archiveStateFromList l = Map.fromListWith (<>) $ (\(nt, t, nre) -> (nt, Map.singleton t nre)) <$> l
+
+-- | This function inserts whole new node story and archive for given node_id.
+insertNodeStory :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
+insertNodeStory c (NodeId nId) a = do
+  _ <- mapM (\(ngramsType, ngrams, ngramsRepoElement) -> do
+                termIdM <- runPGSQuery c ngramsIdQuery (PGS.Only ngrams) :: IO [PGS.Only Int64]
+                case headMay termIdM of
+                  Nothing -> pure 0
+                  Just (PGS.Only termId) -> runPGSExecuteMany c query [(nId, a ^. a_version, ngramsType, termId, ngramsRepoElement)]) $ archiveStateAsList $ a ^. a_state
+             -- runInsert c $ insert ngramsType ngrams ngramsRepoElement) $ archiveStateAsList _a_state
+
+  pure ()
+  where
+    query :: PGS.Query
+    query = [sql| INSERT INTO node_stories(node_id, ngrams_type_id, ngrams_id, ngrams_repo_element) VALUES (?, ?, ?, ?) |]
+    -- insert ngramsType ngrams ngramsRepoElement =
+    --   Insert { iTable      = nodeStoryTable
+    --          , iRows       = [NodeStoryDB { node_id = sqlInt4 nId
+    --                                       , version = sqlInt4 _a_version
+    --                                       , ngrams_type_id = sqlInt4 $ TableNgrams.ngramsTypeId ngramsType
+    --                                       , ngrams_id = ...
+    --                                       , ngrams_repo_element = sqlValueJSONB ngramsRepoElement
+    --                                       }]
+    --          , iReturning  = rCount
+    --          , iOnConflict = Nothing }
+
+insertArchiveList :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
+insertArchiveList c nodeId a = do
+  _ <- mapM_ (\(nt, n, nre) -> runPGSExecute c query (nodeId, a ^. a_version, nt, nre, n)) (archiveStateAsList $ a ^. a_state)
+  --_ <- runPGSExecuteMany c query $ (\(nt, n, nre) -> (nodeId, a ^. a_version, nt, nre, n)) <$> (archiveStateAsList $ a ^. a_state)
+  pure ()
+  where
+    query :: PGS.Query
+    query = [sql| INSERT INTO node_stories(node_id, version, ngrams_type_id, ngrams_id, ngrams_repo_element)
+                    SELECT ?, ?, ?, ngrams.id, ? FROM ngrams WHERE terms = ? |]
+
+deleteArchiveList :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
+deleteArchiveList c nodeId a = do
+  _ <- mapM_ (\(nt, n, _) -> runPGSExecute c query (nodeId, nt, n)) (archiveStateAsList $ a ^. a_state)
+  --_ <- runPGSExecuteMany c query $ (\(nt, n, _) -> (nodeId, nt, n)) <$> (archiveStateAsList $ a ^. a_state)
+  pure ()
+  where
+    query :: PGS.Query
+    query = [sql| WITH (SELECT id FROM ngrams WHERE terms = ?) AS ngrams
+                  DELETE FROM node_stories
+                    WHERE node_id = ? AND ngrams_type_id = ? AND ngrams_id IN (SELECT id FROM ngrams WHERE terms = ?) |]
+
+updateArchiveList :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
+updateArchiveList c nodeId a = do
+  let params = (\(nt, n, nre) -> (nre, nodeId, nt, n)) <$> (archiveStateAsList $ a ^. a_state)
+  --q <- PGS.format c query params
+  --printDebug "[updateArchiveList] query" q
+  _ <- mapM (\p -> runPGSExecute c query p) params
+  pure ()
+  where
+    query :: PGS.Query
+    query = [sql| UPDATE node_stories
+                    SET ngrams_repo_element = ?
+                    WHERE node_id = ? AND ngrams_type_id = ? AND ngrams_id IN (SELECT id FROM ngrams WHERE terms = ?) |]
+
+-- | This function updates the node story and archive for given node_id.
+updateNodeStory :: PGS.Connection -> NodeId -> ArchiveList -> ArchiveList -> IO ()
+updateNodeStory c nodeId@(NodeId _nId) currentArchive newArchive = do
+  -- STEPS
+
+  -- 0. We assume we're inside an advisory lock
+
+  -- 1. Find differences (inserts/updates/deletes)
+  let currentList = archiveStateAsList $ currentArchive ^. a_state
+  let newList = archiveStateAsList $ newArchive ^. a_state
+  let currentSet = Set.fromList $ (\(nt, n, _) -> (nt, n)) <$> currentList
+  let newSet = Set.fromList $ (\(nt, n, _) -> (nt, n)) <$> newList
+
+  let inserts = filter (\(nt, n, _) -> Set.member (nt, n) $ Set.difference newSet currentSet) newList
+  --printDebug "[updateNodeStory] inserts" inserts
+  let deletes = filter (\(nt, n, _) -> Set.member (nt, n) $ Set.difference currentSet newSet) currentList
+  --printDebug "[updateNodeStory] deletes" deletes
+
+  -- updates are the things that are in new but not in current
+  let updates = Set.toList $ Set.difference (Set.fromList newList) (Set.fromList currentList)
+  --printDebug "[updateNodeStory] updates" $ Text.unlines $ (Text.pack . show) <$> updates
+
+  -- 2. Perform inserts/deletes/updates
+  printDebug "[updateNodeStory] applying insert" ()
+  insertArchiveList c nodeId $ Archive { _a_version = newArchive ^. a_version
+                                       , _a_history = []
+                                       , _a_state = archiveStateFromList inserts }
+  printDebug "[updateNodeStory] insert applied" ()
+    --TODO Use currentArchive ^. a_version in delete and report error
+  -- if entries with (node_id, ngrams_type_id, ngrams_id) but
+  -- different version are found.
+  deleteArchiveList c nodeId $ Archive { _a_version = newArchive ^. a_version
+                                       , _a_history = []
+                                       , _a_state = archiveStateFromList deletes }
+  printDebug "[updateNodeStory] delete applied" ()
+  updateArchiveList c nodeId $ Archive { _a_version = newArchive ^. a_version
+                                       , _a_history = []
+                                       , _a_state = archiveStateFromList updates }
+  printDebug "[updateNodeStory] update applied" ()
+
+  pure ()
+  -- where
+  --   update = Update { uTable      = nodeStoryTable
+  --                   , uUpdateWith = updateEasy (\(NodeStoryDB { node_id }) ->
+  --                                                 NodeStoryDB { archive = sqlValueJSONB $ Archive { _a_history = emptyHistory
+  --                                                                                                                               , ..}
+  --                                                                                           , .. })
+  --                   , uWhere      = (\row -> node_id row .== sqlInt4 nId)
+  --                   , uReturning  = rCount }
+
+-- nodeStoryRemove :: Pool PGS.Connection -> NodeId -> IO Int64
+-- nodeStoryRemove pool (NodeId nId) = withResource pool $ \c -> runDelete c delete
+--   where
+--     delete = Delete { dTable     = nodeStoryTable
+--                     , dWhere     = (\row -> node_id row .== sqlInt4 nId)
+--                     , dReturning = rCount }
+
+upsertNodeStories :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
+upsertNodeStories c nodeId@(NodeId nId) newArchive = do
+  printDebug "[upsertNodeStories] START nId" nId
+  PGS.withTransaction c $ do
+    printDebug "[upsertNodeStories] locking nId" nId
+    runPGSAdvisoryXactLock c nId
+
+    -- whether it's insert or update, we can insert node archive history already
+    -- NOTE: It is assumed that the most recent change is the first in the
+    -- list, so we save these in reverse order
+    insertNodeArchiveHistory c nodeId (newArchive ^. a_version) $ reverse $ newArchive ^. a_history
+
+    (NodeStory m) <- getNodeStory c nodeId
+    case Map.lookup nodeId m of
+      Nothing -> do
+        _ <- insertNodeStory c nodeId newArchive
+        pure ()
+      Just currentArchive  -> do
+        _ <- updateNodeStory c nodeId currentArchive newArchive
+        pure ()
+
+    printDebug "[upsertNodeStories] STOP nId" nId
+
+writeNodeStories :: PGS.Connection -> NodeListStory -> IO ()
+writeNodeStories c (NodeStory nls) = do
+  _ <- mapM (\(nId, a) -> upsertNodeStories c nId a) $ Map.toList nls
+  pure ()
+
+-- | Returns a `NodeListStory`, updating the given one for given `NodeId`
+nodeStoryInc :: PGS.Connection -> Maybe NodeListStory -> NodeId -> IO NodeListStory
+nodeStoryInc c Nothing nId = getNodeStory c nId
+nodeStoryInc c (Just ns@(NodeStory nls)) nId = do
+  case Map.lookup nId nls of
+    Nothing -> do
+      (NodeStory nls') <- getNodeStory c nId
+      pure $ NodeStory $ Map.union nls nls'
+    Just _ -> pure ns
+
+nodeStoryIncs :: PGS.Connection -> Maybe NodeListStory -> [NodeId] -> IO NodeListStory
+nodeStoryIncs _ Nothing [] = pure $ NodeStory $ Map.empty
+nodeStoryIncs c (Just nls) ns = foldM (\m n -> nodeStoryInc c (Just m) n) nls ns
+nodeStoryIncs c Nothing (ni:ns) = do
+  m <- getNodeStory c ni
+  nodeStoryIncs c (Just m) ns
+
+-- nodeStoryDec :: Pool PGS.Connection -> NodeListStory -> NodeId -> IO NodeListStory
+-- nodeStoryDec pool ns@(NodeStory nls) ni = do
+--   case Map.lookup ni nls of
+--     Nothing -> do
+--       _ <- nodeStoryRemove pool ni
+--       pure ns
+--     Just _ -> do
+--       let ns' = Map.filterWithKey (\k _v -> k /= ni) nls
+--       _ <- nodeStoryRemove pool ni
+--       pure $ NodeStory ns'
+------------------------------------
+
+readNodeStoryEnv :: Pool PGS.Connection -> IO NodeStoryEnv
+readNodeStoryEnv pool = do
+  mvar <- nodeStoryVar pool Nothing []
+  saver <- mkNodeStorySaver pool mvar
+  -- let saver = modifyMVar_ mvar $ \mv -> do
+  --       writeNodeStories pool mv
+  --       printDebug "[readNodeStoryEnv] saver" mv
+  --       let mv' = clearHistory mv
+  --       printDebug "[readNodeStoryEnv] saver, cleared" mv'
+  --       return mv'
+  pure $ NodeStoryEnv { _nse_var    = mvar
+                      , _nse_saver  = saver
+                      , _nse_getter = nodeStoryVar pool (Just mvar) }
+
+nodeStoryVar :: Pool PGS.Connection -> Maybe (MVar NodeListStory) -> [NodeId] -> IO (MVar NodeListStory)
+nodeStoryVar pool Nothing nIds = do
+  state <- withResource pool $ \c -> nodeStoryIncs c Nothing nIds
+  newMVar state
+nodeStoryVar pool (Just mv) nIds = do
+  _ <- withResource pool $ \c -> modifyMVar_ mv $ \nsl -> (nodeStoryIncs c (Just nsl) nIds)
+  pure mv
+
+-- Debounce is useful since it could delay the saving to some later
+-- time, asynchronously and we keep operating on memory only.
+mkNodeStorySaver :: Pool PGS.Connection -> MVar NodeListStory -> IO (IO ())
+mkNodeStorySaver pool mvns = mkDebounce settings
+  where
+    settings = defaultDebounceSettings
+                 { debounceAction = do
+                     -- NOTE: Lock MVar first, then use resource pool.
+                     -- Otherwise we could wait for MVar, while
+                     -- blocking the pool connection.
+                     modifyMVar_ mvns $ \ns -> do
+                       withResource pool $ \c -> do
+                         --printDebug "[mkNodeStorySaver] will call writeNodeStories, ns" ns
+                         writeNodeStories c ns
+                         pure $ clearHistory ns
+                     --withMVar mvns (\ns -> printDebug "[mkNodeStorySaver] debounce nodestory" ns)
+                 , debounceFreq = 1*minute
+                 }
+    minute = 60*second
+    second = 10^(6 :: Int)
+
+clearHistory :: NodeListStory -> NodeListStory
+clearHistory (NodeStory ns) = NodeStory $ ns & (traverse . a_history) .~ emptyHistory
+  where
+    emptyHistory = [] :: [NgramsStatePatch']
+
+currentVersion :: (HasNodeStory env err m) => ListId -> m Version
+currentVersion listId = do
+  pool <- view connPool
+  nls <- withResource pool $ \c -> liftBase $ getNodeStory c listId
+  pure $ nls ^. unNodeStory . at listId . _Just . a_version
+
+
+-- mkNodeStorySaver :: MVar NodeListStory -> Cmd err (Cmd err ())
+-- mkNodeStorySaver mvns = mkDebounce settings
+--   where
+--     settings = defaultDebounceSettings
+--                  { debounceAction = withMVar mvns (\ns -> writeNodeStories ns)
+--                  , debounceFreq = 1 * minute
+-- --                 , debounceEdge = trailingEdge -- Trigger on the trailing edge
+--                  }
+--     minute = 60 * second
+--     second = 10^(6 :: Int)
+
+
+-----------------------------------------