Changing a hash map from one thread and reading from multiple threads

I have a class in which I populate a liveSocketsByDatacenter map from a single background thread every 30 seconds. I also have a getNextSocket method, called by multiple reader threads to get an available live socket, which reads the same map to get this information.

    public class SocketManager {
        private static final Random random = new Random();
        private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = new HashMap<>();
        private final ZContext ctx = new ZContext();

        // Lazy Loaded Singleton Pattern
        private static class Holder {
            private static final SocketManager instance = new SocketManager();
        }

        public static SocketManager getInstance() {
            return Holder.instance;
        }

        private SocketManager() {
            connectToZMQSockets();
            scheduler.scheduleAtFixedRate(new Runnable() {
                public void run() {
                    updateLiveSockets();
                }
            }, 30, 30, TimeUnit.SECONDS);
        }

        private void connectToZMQSockets() {
            Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;
            for (Map.Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) {
                List<SocketHolder> addedColoSockets = connect(entry.getKey(), entry.getValue(), ZMQ.PUSH);
                liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets);
            }
        }

        private List<SocketHolder> connect(Datacenters colo, List<String> addresses, int socketType) {
            List<SocketHolder> socketList = new ArrayList<>();
            for (String address : addresses) {
                try {
                    Socket client = ctx.createSocket(socketType);
                    // Set random identity to make tracing easier
                    String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt());
                    client.setIdentity(identity.getBytes(ZMQ.CHARSET));
                    client.setTCPKeepAlive(1);
                    client.setSendTimeOut(7);
                    client.setLinger(0);
                    client.connect(address);
                    SocketHolder zmq = new SocketHolder(client, ctx, address, true);
                    socketList.add(zmq);
                } catch (Exception ex) {
                    // log error
                }
            }
            return socketList;
        }

        // this method will be called by multiple threads to get the next live socket
        public Optional<SocketHolder> getNextSocket() {
            Optional<SocketHolder> liveSocket = Optional.absent();
            List<Datacenters> dcs = Datacenters.getOrderedDatacenters();
            for (Datacenters dc : dcs) {
                liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc));
                if (liveSocket.isPresent()) {
                    break;
                }
            }
            return liveSocket;
        }

        private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
            if (!CollectionUtils.isEmpty(listOfEndPoints)) {
                Collections.shuffle(listOfEndPoints);
                for (SocketHolder obj : listOfEndPoints) {
                    if (obj.isLive()) {
                        return Optional.of(obj);
                    }
                }
            }
            return Optional.absent();
        }

        private void updateLiveSockets() {
            Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;
            for (Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) {
                List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
                List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
                for (SocketHolder liveSocket : liveSockets) {
                    Socket socket = liveSocket.getSocket();
                    String endpoint = liveSocket.getEndpoint();
                    Map<byte[], byte[]> holder = populateMap();
                    boolean status = SendToSocket.getInstance().execute(3, holder, socket);
                    boolean isLive = (status) ? true : false;
                    SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
                    liveUpdatedSockets.add(zmq);
                }
                liveSocketsByDatacenter.put(entry.getKey(), liveUpdatedSockets);
            }
        }
    }

As you can see in my class above:

  • From a single background thread that runs every 30 seconds, I populate the liveSocketsByDatacenter map with all the live sockets.
  • From multiple reader threads, I call the getNextSocket method to get an available live socket, which uses the liveSocketsByDatacenter map to get the necessary information.

Is my code above thread safe, and will all reader threads see liveSocketsByDatacenter accurately? Since I modify the liveSocketsByDatacenter map every 30 seconds from one background thread while many reader threads call the getNextSocket method, I'm not sure whether I did something wrong.

It looks like there might be a thread safety issue in my getLiveSocket method, since every reader gets a shared ArrayList out of the map and shuffles it. There may also be a few more places that I have missed. What is the best way to fix these thread safety issues in my code?

If there is a better way to rewrite this, I am also open to it.

+4
4 answers

To be thread safe, your code must synchronize every access to all shared mutable state.

Here you share liveSocketsByDatacenter, an instance of HashMap, which is a non-thread-safe Map implementation that can potentially be read (by updateLiveSockets and getNextSocket) and modified (by connectToZMQSockets and updateLiveSockets) concurrently without any synchronization, which is already enough to make your code unsafe. Moreover, the values of this Map are instances of ArrayList, a non-thread-safe List implementation, which can also be read (by getNextSocket and updateLiveSockets) and modified (by getLiveSocket, more precisely by Collections.shuffle) concurrently.

A simple way to fix your 2 thread safety issues could be:

  1. Use a ConcurrentHashMap instead of a HashMap for your liveSocketsByDatacenter variable, as it is a natively thread-safe Map implementation.
  2. Put an unmodifiable version of your ArrayList instances as the values of your map using Collections.unmodifiableList(List<? extends T> list); your lists can then no longer be modified through the map, so sharing them between threads is safe.

For instance:

    liveSocketsByDatacenter.put(
        entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets)
    );

  3. Rewrite your getLiveSocket method so that it does not call Collections.shuffle directly on your list. For example, you could shuffle only the list of live sockets instead of all sockets, or use a copy of your list (for example, new ArrayList<>(listOfEndPoints)) instead of the list itself.

For instance:

    private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
        if (!CollectionUtils.isEmpty(listOfEndPoints)) {
            // The list of live sockets
            List<SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size());
            for (SocketHolder obj : listOfEndPoints) {
                if (obj.isLive()) {
                    liveOnly.add(obj);
                }
            }
            if (!liveOnly.isEmpty()) {
                // The list is not empty, so we shuffle it and return the first element
                Collections.shuffle(liveOnly);
                return Optional.of(liveOnly.get(0));
            }
        }
        return Optional.absent();
    }
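As a variation on the rewrite above, the random choice can also be made by index, without shuffling anything. This is only a minimal sketch under assumptions: Endpoint is a hypothetical stand-in for the question's SocketHolder, LiveSocketPicker and pickLive are made-up names, and java.util.Optional is used instead of Guava's Optional to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical stand-in for the question's SocketHolder, reduced to what the picker needs.
final class Endpoint {
    private final String address;
    private final boolean live;

    Endpoint(String address, boolean live) {
        this.address = address;
        this.live = live;
    }

    String getAddress() { return address; }

    boolean isLive() { return live; }
}

final class LiveSocketPicker {
    private LiveSocketPicker() {}

    // Only reads the shared list; the filtered copy is local to the calling thread.
    static Optional<Endpoint> pickLive(List<Endpoint> endpoints) {
        if (endpoints == null || endpoints.isEmpty()) {
            return Optional.empty();
        }
        List<Endpoint> liveOnly = new ArrayList<>(endpoints.size());
        for (Endpoint endpoint : endpoints) {
            if (endpoint.isLive()) {
                liveOnly.add(endpoint);
            }
        }
        if (liveOnly.isEmpty()) {
            return Optional.empty();
        }
        // ThreadLocalRandom avoids contention on a single shared Random instance.
        int index = ThreadLocalRandom.current().nextInt(liveOnly.size());
        return Optional.of(liveOnly.get(index));
    }
}
```

Reading the shared list this way is only safe if no other thread modifies that list at the same time, which is what the unmodifiable lists from point 2 are meant to guarantee.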

For #1, as you seem to read your map often and modify it rarely (only once every 30 seconds), you could consider rebuilding the map and then sharing an immutable version of it (using Collections.unmodifiableMap(Map<? extends K,? extends V> m)) every 30 seconds. This approach is very efficient in a mostly-read scenario, since readers no longer pay the price of any synchronization mechanism to access the contents of the map.

Then your code will look like this:

    // Your variable is no longer final; it is now volatile, which guarantees
    // that a new map published by the updater thread is visible to every
    // thread that reads the field afterwards
    private volatile Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter
        = Collections.unmodifiableMap(new HashMap<>());

    private void connectToZMQSockets() {
        Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;
        // The map in which I put all the live sockets
        Map<Datacenters, List<SocketHolder>> liveSockets = new HashMap<>();
        for (Map.Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) {
            List<SocketHolder> addedColoSockets = connect(
                entry.getKey(), entry.getValue(), ZMQ.PUSH
            );
            liveSockets.put(entry.getKey(), Collections.unmodifiableList(addedColoSockets));
        }
        // Set the new content of my map as an unmodifiable map
        this.liveSocketsByDatacenter = Collections.unmodifiableMap(liveSockets);
    }

    public Optional<SocketHolder> getNextSocket() {
        // For the sake of consistency make sure to use the same map instance
        // in the whole implementation of my method by getting my entries
        // from the local variable instead of the member variable
        Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = this.liveSocketsByDatacenter;
        ...
    }
    ...
    // Added the modifier synchronized to prevent concurrent modification;
    // it is needed because to build the new map we first need to get the
    // old one, so both must be done atomically to prevent consistency issues
    private synchronized void updateLiveSockets() {
        // Initialize my new map with the current map content
        Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
            new HashMap<>(this.liveSocketsByDatacenter);
        Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;
        for (Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) {
            ...
            // Replace the entry in the local copy, which is the map published below
            liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets));
        }
        // Set the new content of my map as an unmodifiable map
        this.liveSocketsByDatacenter = Collections.unmodifiableMap(liveSocketsByDatacenter);
    }

Your liveSocketsByDatacenter field could also be of type AtomicReference<Map<Datacenters, List<SocketHolder>>>. The field would then be final, and your map would still be stored in a volatile variable, but inside the AtomicReference class.

The previous code would then become:

    private final AtomicReference<Map<Datacenters, List<SocketHolder>>> liveSocketsByDatacenter
        = new AtomicReference<>(Collections.unmodifiableMap(new HashMap<>()));
    ...
    private void connectToZMQSockets() {
        ...
        // Update the map content
        this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(liveSockets));
    }

    public Optional<SocketHolder> getNextSocket() {
        // For the sake of consistency make sure to use the same map instance
        // in the whole implementation of my method by getting my entries
        // from the local variable instead of the member variable
        Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = this.liveSocketsByDatacenter.get();
        ...
    }

    // Added the modifier synchronized to prevent concurrent modification;
    // it is needed because to build the new map we first need to get the
    // old one, so both must be done atomically to prevent consistency issues
    private synchronized void updateLiveSockets() {
        // Initialize my new map with the current map content
        Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
            new HashMap<>(this.liveSocketsByDatacenter.get());
        ...
        // Update the map content
        this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(liveSocketsByDatacenter));
    }
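Because the snippets above elide parts of the class, here is a small self-contained sketch of the same "build a new map, then publish it" idea. It rests on simplifying assumptions: String keys and values stand in for Datacenters and SocketHolder, and SnapshotRegistry, publish and get are hypothetical names that do not appear in the question.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified registry: String keys and values stand in for
// Datacenters and SocketHolder from the question.
final class SnapshotRegistry {
    private final AtomicReference<Map<String, List<String>>> snapshot =
        new AtomicReference<>(Collections.<String, List<String>>emptyMap());

    // Called by the updater thread: build a fresh map, then publish it in one step.
    void publish(Map<String, List<String>> latest) {
        Map<String, List<String>> copy = new HashMap<>();
        for (Map.Entry<String, List<String>> entry : latest.entrySet()) {
            // Defensive copy, so later changes to the caller's lists cannot leak in.
            copy.put(entry.getKey(),
                Collections.unmodifiableList(new ArrayList<>(entry.getValue())));
        }
        snapshot.set(Collections.unmodifiableMap(copy));
    }

    // Called by any number of reader threads; no locking is involved.
    List<String> get(String key) {
        // Read the reference once and work on that snapshot only.
        Map<String, List<String>> current = snapshot.get();
        List<String> values = current.get(key);
        if (values == null) {
            return Collections.emptyList();
        }
        return values;
    }
}
```

A reader that needs a random order would copy the returned list before shuffling, since the published lists reject modification with an UnsupportedOperationException.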
+1

It seems that you can safely use ConcurrentHashMap here instead of the usual HashMap , and it should work.

In your current approach, using a regular HashMap, you need to synchronize the following methods:

getNextSocket, connectToZMQSockets and updateLiveSockets (everywhere you update or read the HashMap), for example by adding the synchronized keyword to these methods or by using another lock on a monitor that is common to all of them. This is not because of ConcurrentModificationException, but because without synchronization the reading threads may see stale values.
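The common-monitor idea can be sketched as follows. This is an illustration under assumptions rather than a drop-in replacement: String keys and values stand in for Datacenters and SocketHolder, and LockedSocketMap, update and snapshotOf are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified holder: String keys and values stand in for
// Datacenters and SocketHolder from the question.
final class LockedSocketMap {
    // One monitor shared by every method that touches the map.
    private final Object lock = new Object();
    private final Map<String, List<String>> socketsByDatacenter = new HashMap<>();

    // Writer side: replaces the list for one datacenter while holding the lock.
    void update(String datacenter, List<String> sockets) {
        List<String> copy = new ArrayList<>(sockets);
        synchronized (lock) {
            socketsByDatacenter.put(datacenter, copy);
        }
    }

    // Reader side: takes the same lock, so it sees the most recent update,
    // and returns a private copy that the caller may shuffle freely.
    List<String> snapshotOf(String datacenter) {
        synchronized (lock) {
            List<String> sockets = socketsByDatacenter.get(datacenter);
            if (sockets == null) {
                return new ArrayList<>();
            }
            return new ArrayList<>(sockets);
        }
    }
}
```

The trade-off is that every reader briefly contends for the same lock, which may matter when getNextSocket is called very frequently.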

There is also a problem with concurrent modification in getLiveSocket. One of the easiest ways to avoid it is to copy listOfEndPoints into a new list before shuffling, for example:

    private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> endPoints) {
        // Check first: the map lookup may return null, and new ArrayList<>(null) would throw
        if (!CollectionUtils.isEmpty(endPoints)) {
            // Shuffle a private copy instead of the shared list
            List<SocketHolder> listOfEndPoints = new ArrayList<SocketHolder>(endPoints);
            Collections.shuffle(listOfEndPoints);
            for (SocketHolder obj : listOfEndPoints) {
                if (obj.isLive()) {
                    return Optional.of(obj);
                }
            }
        }
        return Optional.absent();
    }
+1

As you can read in detail, e.g. here, if multiple threads access a hash map concurrently and at least one of them modifies the map structurally, it must be synchronized externally to avoid an inconsistent view of its contents. So, to be thread safe, you should use either Java's Collections.synchronizedMap() or a ConcurrentHashMap.

    // synchronizedMap
    private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
        Collections.synchronizedMap(new HashMap<Datacenters, List<SocketHolder>>());

or

    // ConcurrentHashMap
    private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
        new ConcurrentHashMap<Datacenters, List<SocketHolder>>();
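Either declaration protects only the map operations themselves; the ArrayList values stored in the map remain shared between threads. One hedged sketch of how the two concerns could be combined, assuming String keys and values in place of Datacenters and SocketHolder (ConcurrentSocketMap, replace and shuffledCopy are hypothetical names):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical, simplified holder: String keys and values stand in for
// Datacenters and SocketHolder from the question.
final class ConcurrentSocketMap {
    private final ConcurrentMap<String, List<String>> socketsByDatacenter =
        new ConcurrentHashMap<>();

    // The map handles concurrent put/get; the stored list is a private,
    // unmodifiable copy, so no thread can change it after publication.
    void replace(String datacenter, List<String> sockets) {
        socketsByDatacenter.put(datacenter,
            Collections.unmodifiableList(new ArrayList<>(sockets)));
    }

    // Readers shuffle their own copy, never the list stored in the map.
    List<String> shuffledCopy(String datacenter) {
        List<String> current = socketsByDatacenter.get(datacenter);
        if (current == null) {
            return new ArrayList<>();
        }
        List<String> copy = new ArrayList<>(current);
        Collections.shuffle(copy);
        return copy;
    }
}
```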

Since the values for each key are modified and read highly concurrently by different threads, you should also have a look at the Producer-Consumer principle, e.g. here.

+1

Using ConcurrentHashMap should make your code thread safe. Alternatively, use synchronized methods to access an existing hashmap.

-1
