STM-friendly list as change log

I need advice on which data structure to use as an atomic change log.

I am trying to implement the following algorithm. There is a stream of incoming changes updating an in-memory map. In Haskell-like pseudo-code it looks like this:

  update :: DataSet -> SomeListOf Change -> Change -> STM (DataSet, SomeListOf Change)
  update dataSet existingChanges newChange = do
    ...
    return (dataSet, existingChanges ++ [newChange])

where DataSet is a map (currently the Map from the stm-containers package, https://hackage.haskell.org/package/stm-containers-0.2.10/docs/STMContainers-Map.html ). "update" is called from an arbitrary number of threads. Some of the changes can be rejected because of domain semantics; I use throwSTM for that, to throw away the effects of the transaction. On a successful commit, "newChange" is added to the list.

There is a separate thread that calls the following function:

  flush :: TVar (DataSet, SomeListOf Change) -> IO ()

This function is supposed to take the current snapshot of the DataSet together with the list of changes (it has to be a consistent pair) and dump it to the file system, i.e.

  flush data_ = do
    (dataSet, changes) <- atomically $ readTVar data_
    -- write them both to FS
    -- ...
    atomically $ writeTVar data_ (dataSet, [])

I need advice on the data structure to use for "SomeListOf Change". I do not want to use [Change] because it is "too ordered", and I am afraid there will be too many conflicts, each of which forces the whole transaction to retry. Please correct me if I am wrong.

I can't use Set ( https://hackage.haskell.org/package/stm-containers-0.2.10/docs/STMContainers-Set.html ) because I still need to preserve some order, e.g. the order in which transactions commit. I could use a TChan, and it looks like a good match (it has exactly the order of transaction commits), but I don't know how to implement "flush" so that it gives a consistent view of the whole change log together with the DataSet.

The current implementation is here: https://github.com/lolepezy/rpki-pub-server/blob/add-storage/src/RRDP/Repo.hs , in the functions applyActionsToState and rrdpSyncThread, respectively. It uses a TChan and seems to do it the wrong way.

Thanks in advance.

Update: a reasonable answer seems to be something like this:

  type SomeListOf c = TChan [c]

  update :: DataSet -> TChan [Change] -> Change -> STM DataSet
  update dataSet existingChanges newChange = do
    ...
    writeTChan changeChan $ reverse (newChange : existingChanges)
    return dataSet

  flush data_ = do
    (dataSet, changes) <- atomically $ (,) <$> readTVar data_ <*> readTChan changeChan
    -- write them both to FS
    -- ...

But I'm still not sure whether passing the whole list as a single channel element is a neat solution.

haskell stm
2 answers

I would most likely just go with a list and see how far it takes you performance-wise. That said, keep in mind that both appending to the end of a list and reversing it are O(n) operations, so you should try to avoid them. Perhaps you can simply prepend the incoming changes, like this:

  update dataSet existingChanges newChange = do
    -- ...
    return (dataSet, newChange : existingChanges)

Also, your flush example has the problem that reading and updating the state is not atomic at all: changes committed between the two atomically calls are lost. You must do both in a single atomically call, for example:

  flush data_ = do
    (dataSet, changes) <- atomically $ do
      result@(dataSet', _) <- readTVar data_
      writeTVar data_ (dataSet', [])
      return result
    -- write them both to FS
    -- ...

Then you can simply write them out in reverse order (because changes now holds the elements from newest to oldest), or reverse the list once here if it is important to write them from oldest to newest. If that is important, I would probably go with a data structure that allows O(1) element access, like a good old vector.
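To make the two points above concrete, here is a minimal sketch of the prepend-then-reverse idea. It assumes the whole state lives in one TVar holding a plain Data.Map (not STMContainers.Map); Key, Change, Rejected, takeSnapshot and the "cannot delete a missing key" rule are all made up for illustration and are not taken from the question's code.

```haskell
import Control.Concurrent.STM
import Control.Exception (Exception)
import qualified Data.Map.Strict as Map

-- Hypothetical stand-ins for the question's types.
type Key     = String
data Change  = Put Key Int | Delete Key deriving (Eq, Show)
type DataSet = Map.Map Key Int

-- Hypothetical exception used to reject a change.
data Rejected = Rejected Change deriving Show
instance Exception Rejected

-- | Apply one change and prepend it to the log in the same transaction.
-- A rejected change throws, so neither the map nor the log is modified.
update :: TVar (DataSet, [Change]) -> Change -> STM ()
update state change = do
  (dataSet, changes) <- readTVar state
  dataSet' <- case change of
    Put k v -> return (Map.insert k v dataSet)
    Delete k
      | Map.member k dataSet -> return (Map.delete k dataSet)
      | otherwise            -> throwSTM (Rejected change)  -- made-up domain rule
  writeTVar state (dataSet', change : changes)               -- O(1) prepend

-- | Read the pair and empty the log in a single transaction, so no change
-- can slip in between. The log is reversed once, giving oldest-first order.
takeSnapshot :: TVar (DataSet, [Change]) -> STM (DataSet, [Change])
takeSnapshot state = do
  (dataSet, changes) <- readTVar state
  writeTVar state (dataSet, [])
  return (dataSet, reverse changes)
```

A flush thread would call atomically (takeSnapshot state) and then write the result to disk outside the transaction. Note that with a single TVar every update conflicts with every other one anyway, so this only shows the atomicity and ordering, not the contention behaviour of the real map.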

With a fixed-size vector you obviously have to deal with the problem that it can become "full", which means your writers would have to wait for flush to do its work before they can add new changes. That's why I would personally go with a simple list first and see whether it is sufficient or where it needs to be improved.

PS: A deque may also be a good fit for your problem, but with a fixed size you are forced to deal with the problem that your writers can potentially produce more changes than your reader can flush. A deque can grow indefinitely, but your RAM probably cannot. And a vector has pretty low overhead.
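For what it's worth, the "writers wait when the log is full" behaviour can be tried out with a bounded queue from the stm package instead of a hand-rolled fixed-size vector. This is a swapped-in structure, not what the answer above proposes, and it assumes a version of stm that provides flushTBQueue; appendChange, tryAppendChange and flushLog are illustrative names, and the data set is simplified to a single TVar.

```haskell
import Control.Concurrent.STM

-- | Append a change. writeTBQueue retries while the queue is full, so the
-- writer's transaction blocks until the flusher has drained the log.
appendChange :: TBQueue c -> c -> STM ()
appendChange = writeTBQueue

-- | Non-blocking variant: returns False instead of waiting when full.
tryAppendChange :: TBQueue c -> c -> STM Bool
tryAppendChange q c = (writeTBQueue q c >> return True) `orElse` return False

-- | Read the data set and drain the whole log in one transaction, so the
-- pair is consistent. Changes come back in the order they were written.
flushLog :: TVar d -> TBQueue c -> STM (d, [c])
flushLog dataVar q = (,) <$> readTVar dataVar <*> flushTBQueue q
```

The queue would be created with newTBQueueIO and a capacity of your choice. The capacity is then the knob: too small and writers stall behind the flush thread, too large and you are back to an effectively unbounded log.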


I did a (very simplified) investigation, https://github.com/lolepezy/rpki-pub-server/tree/add-storage/test/changeLog , imitating exactly the kind of load I expect to have. I used the same STMContainers.Map for the data set and a plain list for the change log. To track the number of transaction attempts I used Debug.Trace.trace: the number of lines printed by trace gives the number of attempts, and the number of unique lines printed by trace gives the number of transactions committed.

The result is here ( https://github.com/lolepezy/rpki-pub-server/blob/add-storage/test/changeLog/numbers.txt ). The first column is the number of threads, the second is the total number of change sets generated. The third column is the number of trace calls for the case without a change log, and the last is the number of trace calls with the change log.

Most of the time the change log seems to add some extra retries, but the difference is pretty much insignificant. So I think it is fair to say that any data structure will be good enough, because most of the work is spent updating the map, and that is where most of the retries come from.
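For anyone who wants to repeat this kind of measurement, here is a rough sketch of counting attempts versus commits with plain counters instead of Debug.Trace. It is not the code from the repository above: Counters, countedAtomically and the other names are made up, and it relies on unsafeIOToSTM from GHC.Conc, which is only reasonable here because bumping a counter is harmless if the attempt is later rolled back.

```haskell
import Control.Concurrent.STM
import Data.IORef
import GHC.Conc (unsafeIOToSTM)

-- | Hypothetical pair of counters: transaction attempts and commits.
data Counters = Counters { attempts :: IORef Int, commits :: IORef Int }

newCounters :: IO Counters
newCounters = Counters <$> newIORef 0 <*> newIORef 0

bump :: IORef Int -> IO ()
bump ref = atomicModifyIORef' ref (\n -> (n + 1, ()))

-- | Run a transaction. The attempt counter is bumped every time the
-- transaction body starts (including re-runs after a conflict); the
-- commit counter is bumped once, after atomically has returned.
countedAtomically :: Counters -> STM a -> IO a
countedAtomically c tx = do
  result <- atomically (unsafeIOToSTM (bump (attempts c)) >> tx)
  bump (commits c)
  return result

-- | Returns (attempts, commits); the difference is the number of re-runs.
readCounters :: Counters -> IO (Int, Int)
readCounters c = (,) <$> readIORef (attempts c) <*> readIORef (commits c)
```

Transactions that end in an exception (for example via throwSTM) count as attempts but not as commits, so the difference between the two numbers is only a pure retry count when nothing is rejected.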
