I need advice on the data structure to use as an atom change log.
I am trying to implement the following algorithm. There is a stream of incoming changes updating the memory card. In a Haskell-like pseudo-code, this
update :: DataSet -> SomeListOf Change -> Change -> STM (DataSet, SomeListOf Change) update dataSet existingChanges newChange = do ... return (dataSet, existingChanges ++ [newChange])
where the DataSet is a map (currently it is a map from a package of stm containers, https://hackage.haskell.org/package/stm-containers-0.2.10/docs/STMContainers-Map.html ). All "update" is called from an arbitrary number of threads. Some of the changes may be rejected due to the semantics of the domain; I use throwSTM to throw away the effect of the transaction. In case of successful commit, "newChange" is added to the list.
There is a separate thread that calls the following function:
flush :: STM (DataSet, SomeListOf Change) -> IO ()
this function should take the current snapshot of the DataSet along with the list of changes (it should have a consistent pair) and dump it into the file system, i.e.
flush data = do (dataSet, changes) <- atomically $ readTVar data_
I need advice on the data structure used for "SomeListOf Change." I do not want to use [Change] because it is "too streamlined" and I am afraid that there will be too many conflicts, which will cause the whole transaction to retry. Please correct me if I am wrong.
I can't use Set ( https://hackage.haskell.org/package/stm-containers-0.2.10/docs/STMContainers-Set.html ) because I still need to keep some order, for example. transaction procedure. I could use TChan for it, and it seems like a good match (the transaction order is exactly the same), but I don’t know how to implement the “flush” function so that it provides a consistent view of the entire change log with the DataSet.
The current implementation of this is here https://github.com/lolepezy/rpki-pub-server/blob/add-storage/src/RRDP/Repo.hs in the applyActionsToState and rrdpSyncThread functions, respectively. It uses TChan and seems to be doing it wrong.
Thanks in advance.
Update: A reasonable answer seems such that
type SomeListOf c = TChan [c] update :: DataSet -> TChan [Change] -> Change -> STM DataSet update dataSet existingChanges newChange = do ... writeTChan changeChan $ reverse (newChange : existingChanges) return dataSet flush data_ = do (dataSet, changes) <- atomically $ (,) <$> readTVar data_ <*> readTChan changeChan
But I'm still not sure if this is a neat decision to pass the whole list as a channel element.