Thread Safety with ConcurrentHashMap

I have the following class. I am using ConcurrentHashMap. I have many streams writing to cards, and a timer that saves data on the card every 5 minutes. I manage to achieve thread safety using putIfAbsent () when I write entries on the map. However, when I read it and then delete all the records using the clear () method, I do not want any other thread record to appear until Im in the process of reading the contents of the map and then deleting them. Obviously, my code is not thread safe even with a synchronized (lock) {}, b / c stream, which owns the lock in the saveEntries () file, is not necessarily the same stream that writes my cards to the log () method ! If I lock all the code in log () with the same lock object!

I was wondering if there is another way to achieve thread safety without forcing synchronization with an external lock? Any help is appreciated.

public class Logging { private static Logging instance; private static final String vendor1 = "vendor1"; private static final String vendor2 = "vendor2"; private static long delay = 5 * 60 * 1000; private ConcurrentMap<String, Event> vendor1Calls = new ConcurrentHashMap<String, Event>(); private ConcurrentMap<String, Event> vendor2Calls = new ConcurrentHashMap<String, Event>(); private Timer timer; private final Object lock = new Object(); private Logging(){ timer = new Timer(); timer.schedule(new TimerTask() { public void run() { try { saveEntries(); } catch (Throwable t) { timer.cancel(); timer.purge(); } } }, 0, delay); } public static synchronized Logging getInstance(){ if (instance == null){ instance = new Logging(); } return instance; } public void log(){ ConcurrentMap<String, Event> map; String key = ""; if (vendor1.equalsIgnoreCase(engine)){ map = vendor1Calls; }else if(vendor2.equalsIgnoreCase(engine)){ map = vendor2Calls; }else{ return; } key = service + "." + method; // It would be the code if I use a regular HashMap instead of ConcurrentHashMap /*Event event = map.get(key); // Map does not contain this service.method, create an Event for the first time. if(event == null){ event = new Event(); map.put(key, event); // Map already contains this key, just adjust the numbers. }else{ // Modify the object fields }*/ //} // Make it thread-safe using CHM Event newEvent = new Event(); Event existingEvent= map.putIfAbsent(key, newEvent); if(existingEvent!=null && existingEvent!=newEvent){ // Modify the object fields } private void saveEntries(){ Map<String, List<Event>> engineCalls = null; try { engineCalls = new HashMap<String, List<Event>>(); List<Event> events = null; // How can I achieve therad safety here w/o applying any lock? //synchronized(lock){ if(!vendor1Calls.isEmpty()){ events = new ArrayList<Event>(); events.addAll(vendor1Calls.values()); engineCalls.put(vendor1, events); vendor1Calls.clear(); } if(!vendor2Calls.isEmpty()){ events = new ArrayList<Event>(); events.addAll(vendor2Calls.values()); engineCalls.put(vendor2, events); vendor2Calls.clear(); } //} // logICalls() saves the events in the DB. DBHandle.logCalls(engineCalls); } catch (Throwable t) { } finally { if(engineCalls!=null){ engineCalls.clear(); } } } 

}

+6
source share
5 answers

However, when I read it and then delete all the entries using the clear () method, I do not want any other stream to write to the card while Im in the process of reading the contents of the card and then deleting it.

I think that you are trying to say that you really do not need to strictly block cards. Instead, you really care about losing any log entries between vender1Calls.values ​​() and vendor1Calls.clear (), right?

In this case, I can imagine that you can replace

 events.addAll(vendor1Calls.values()); vendor1Calls.clear(); 

with this in saveEntries:

 for (Iterator<Event> iter = vendor1Calls.values().iterator(); iter.hasNext(); ) { Event e = iter.next(); events.add(e); iter.remove(); } 

Thus, you only delete events that you have added to the event list. You can still write vendor1Calls to the map while saveEntries () is still running, but the iterator skips the added values.

+3
source

Without any external synchronization, you cannot achieve this with CHM. The returned Iterator views are poorly matched, which means that the contents of the Map may change while you actually iterate over it.

It seems you need to use Collections.synchronizedMap to get the functionality you need.

Edit to make my point clearer:

To achieve this using synchronizedMap First you need to synchronize on the card, and then you can repeat or copy the contents to another card and then clear it.

 Map map = Collections.synchronizedMap(new HashMap()); public void work(){ Map local = new HashMap(); synchronized(map){ local.putAll(map); map.clear(); } //do work on local instance } 

Instead of a local instance, as I mentioned, you can iterate + remove similar to @Kevin Jin's answer.

+3
source

The atomic version of this example is shown in this thread (using only functions in ConcurrentMap).

0
source

It would be best to use ReadWriteLock , but since you specifically state that you do not want to use any locks (bit. ConcurrentHashMap will probably use them internally), you can try the following.

Use the AtomicReference for each of your cards, and when the time comes to register their contents, use getAndSet to replace the old card with a new empty one.

Now you have a map with exclusive use, thanks to which you can iterate and clean as much as you want. Unfortunately, there is one small problem (which will be used to get rid of the lock), and if another thread is in the process of adding to the card at a time when you change it with an empty one. You could add a delay at this point hoping to wait a long time until another thread completes what it does. There may be some internal ConcurrentHashMap functionality that you can use to wait until everyone is done with it.

0
source

The following code uses a persistent map from functional java . It uses more memory, but (AFAIK :) is safe for use by multiple threads. The only mutable value in AtomicReference and it is updated by comparison and setting. The map and event are immutable and therefore thread safe. In addition, instead of clearing the card, I am replacing the link to it.

 import fj.F; import fj.Ord; import fj.data.TreeMap; import java.util.*; import java.util.concurrent.atomic.AtomicReference; public class Logging { // Event is immutable private static class Event { // updates are done by creating new values Event update(String key) { return new Event(); } } // Refactored code pertaining to one vendor into a separate class. private static class EngineLogger { public final String vendor; private final AtomicReference<TreeMap<String, Event>> mapRef = new AtomicReference<TreeMap<String, Event>>(TreeMap.<String, Event>empty(Ord.stringOrd)); private EngineLogger(String vendor) { this.vendor = vendor; } public void log(String service, String method) { final String key = service + "." + method; boolean updated = false; while (! updated) { // get the current value of the map TreeMap<String, Event> currMap = mapRef.get(); // create an updated value of the map, which is the current map plus info about the new key TreeMap<String, Event> newMap = currMap.update(key, new F<Event, Event>() { @Override public Event f(Event event) { // Modify the object fields of event, if the map contains the key return event.update(key); } // create a new event if the map does not contain the key }, new Event()); // compare-and-set the new value in .. repeat until this succeeds updated = mapRef.compareAndSet(currMap, newMap); } } public List<Event> reset() { /* replace the reference with a new map */ TreeMap<String, Event> oldMap = mapRef.getAndSet(TreeMap.<String, Event>empty(Ord.stringOrd)); /* use the old map to generate the list */ return new ArrayList<Event>(oldMap.toMutableMap().values()); } } private static Logging instance; private static long delay = 5 * 60 * 1000; private final Timer timer; private final EngineLogger vendor1 = new EngineLogger("vendor1"); private final EngineLogger vendor2 = new EngineLogger("vendor2"); private Logging() { timer = new Timer(); timer.schedule(new TimerTask() { public void run() { try { saveEntries(); } catch (Throwable t) { timer.cancel(); timer.purge(); } } }, 0, delay); } public static synchronized Logging getInstance() { if (instance == null) { instance = new Logging(); } return instance; } public void log(String engine, String service, String method) { if (vendor1.vendor.equals(engine)) { vendor1.log(service, method); } else if (vendor2.vendor.equals(engine)) { vendor2.log(service, method); } } private void saveEntries() { Map<String, List<Event>> engineCalls = new HashMap<String, List<Event>>(); engineCalls.put(vendor1.vendor, vendor1.reset()); engineCalls.put(vendor2.vendor, vendor2.reset()); DBHandle.logCalls(engineCalls); } } 
0
source

Source: https://habr.com/ru/post/923706/


All Articles