Iterate over ConcurrentHashMap when deleting records

I want to cycle through ConcurrentHashMap periodically when deleting records, for example:

 for (Iterator<Entry<Integer, Integer>> iter = map.entrySet().iterator(); iter.hasNext(); ) { Entry<Integer, Integer> entry = iter.next(); // do something iter.remove(); } 

The problem is that another thread may update or change values โ€‹โ€‹during iteration. If this happens, these updates may be lost forever, because my thread only sees stale values โ€‹โ€‹during iteration, but remove() removes the live record.

After some consideration, I came up with this workaround:

 map.forEach((key, value) -> { // delete if value is up to date, otherwise leave for next round if (map.remove(key, value)) { // do something } }); 

One of the problems is that it will not modify mutable values โ€‹โ€‹that do not implement equals() (e.g. AtomicInteger ). Is there a better way to safely delete while changing?

+8
java multithreading concurrenthashmap
source share
3 answers

Your workaround works, but there is one potential scenario. If some entries have constant updates, map.remove (key, value) will never return true until the update is complete.

If you are using JDK8, here is my solution

 for (Iterator<Entry<Integer, Integer>> iter = map.entrySet().iterator(); iter.hasNext(); ) { Entry<Integer, Integer> entry = iter.next(); Map.compute(entry.getKey(), (k, v) -> f(v)); //do something for prevValue } .... private Integer prevValue; private Integer f(Integer v){ prevValue = v; return null; } 

compute () will apply f (v) to the value and in our case assigns the value to the global variable and deletes the record.

According to Javadok, he is atomic.

An attempt to calculate a mapping for the specified key and its current displayed value (or null if there is no current mapping). All method invocation is performed atomically. Some attempts to perform update operations on this map by other threads may be blocked while the calculation is in progress, so the calculation should be short and simple and should not try to update any other mappings of this Map.

+1
source share

Your workaround is actually very good. There are other objects on top of which you can build a somewhat similar solution (for example, using computeIfPresent() values โ€‹โ€‹and tombstones), but they have their own warnings, and I used them in several different use cases.

As for using a type that does not implement equals() for map values, you can use your own wrapper on top of the corresponding type. This is the easiest way to introduce custom semantics for equality of objects in atom replace / delete operations provided by ConcurrentMap .

Update

Here is a sketch that shows how you can build on top of the ConcurrentMap.remove(Object key, Object value) API:

  • Define the shell type on top of the mutable type that you use for the values, also defining your own equals() method built on top of the current mutable value.
  • In BiConsumer (the lambda that you switch to forEach ), create a deep copy of the value (like your new shell type) and follow your logic to determine if that value should be deleted on the copy.
  • If the value needs to be removed, call remove(myKey, myValueCopy) .
    • If it was necessary to remove(myKey, myValueCopy) while calculating any number of simultaneous changes, remove(myKey, myValueCopy) will return false (banning ABA problems, which are a separate topic).

Here are some examples to illustrate this:

 import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; public class Playground { private static class AtomicIntegerWrapper { private final AtomicInteger value; AtomicIntegerWrapper(int value) { this.value = new AtomicInteger(value); } public void set(int value) { this.value.set(value); } public int get() { return this.value.get(); } @Override public boolean equals(Object obj) { if (this == obj) { return true; } if (!(obj instanceof AtomicIntegerWrapper)) { return false; } AtomicIntegerWrapper other = (AtomicIntegerWrapper) obj; if (other.value.get() == this.value.get()) { return true; } return false; } public static AtomicIntegerWrapper deepCopy(AtomicIntegerWrapper wrapper) { int wrapped = wrapper.get(); return new AtomicIntegerWrapper(wrapped); } } private static final ConcurrentMap<Integer, AtomicIntegerWrapper> MAP = new ConcurrentHashMap<>(); private static final int NUM_THREADS = 3; public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 10; ++i) { MAP.put(i, new AtomicIntegerWrapper(1)); } Thread.sleep(1); for (int i = 0; i < NUM_THREADS; ++i) { new Thread(() -> { Random rnd = new Random(); while (!MAP.isEmpty()) { MAP.forEach((key, value) -> { AtomicIntegerWrapper elem = MAP.get(key); if (elem == null) { System.out.println("Oops..."); } else if (elem.get() == 1986) { elem.set(1); } else if ((rnd.nextInt() & 128) == 0) { elem.set(1986); } }); } }).start(); } Thread.sleep(1); new Thread(() -> { Random rnd = new Random(); while (!MAP.isEmpty()) { MAP.forEach((key, value) -> { AtomicIntegerWrapper elem = AtomicIntegerWrapper.deepCopy(MAP.get(key)); if (elem.get() == 1986) { try { Thread.sleep(10); } catch (Exception e) {} boolean replaced = MAP.remove(key, elem); if (!replaced) { System.out.println("Bailed out!"); } else { System.out.println("Replaced!"); } } }); } }).start(); } } 

You will see printouts of "Bailed out!" mixed with "Replaced!". (the deletion was successful, since there were no parallel updates you care about), and the calculation will stop at some point.

  • If you delete the custom equals() method and continue to use the copy, you will see an endless stream of "Bailed out!" because the copy is never considered equal to the value on the map.
  • If you are not using a copy, you will not see "Bailed out!" printed, and you run into a problem that you explain - the values โ€‹โ€‹are deleted regardless of the simultaneous changes.
+1
source share

Let's look at what options you have.

  • Create your own container class using the isUpdated() operation and use your own traversal method.

  • If your map contains only a few elements, and you often repeat the map compared to the put / delete operation. It might be a good choice to use CopyOnWriteArrayList CopyOnWriteArrayList<Entry<Integer, Integer>> lookupArray = ...;

  • Another option is to implement your own CopyOnWriteMap

     public class CopyOnWriteMap<K, V> implements Map<K, V>{ private volatile Map<K, V> currentMap; public V put(K key, V value) { synchronized (this) { Map<K, V> newOne = new HashMap<K, V>(this.currentMap); V val = newOne.put(key, value); this.currentMap = newOne; // atomic operation return val; } } public V remove(Object key) { synchronized (this) { Map<K, V> newOne = new HashMap<K, V>(this.currentMap); V val = newOne.remove(key); this.currentMap = newOne; // atomic operation return val; } } [...] } 

There is a negative side effect. If you use the "Copy-on-Write" collection, your updates will never be lost, but you can see the previous deleted record again.

Worst case: deleted recording will be restored every time if the card is copied.

0
source share

All Articles