Concurrently reading a map while a single background thread regularly modifies it

I have a class in which I populate a map, liveSocketsByDatacenter, from a single background thread every 30 seconds inside the updateLiveSockets() method. I also have a getNextSocket() method, which is called by multiple reader threads to get an available live socket; it uses the same map to get this information.

    public class SocketManager {
        private static final Random random = new Random();
        private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        private final AtomicReference<Map<Datacenters, List<SocketHolder>>> liveSocketsByDatacenter =
                new AtomicReference<>(Collections.unmodifiableMap(new HashMap<>()));
        private final ZContext ctx = new ZContext();

        // Lazy Loaded Singleton Pattern
        private static class Holder {
            private static final SocketManager instance = new SocketManager();
        }

        public static SocketManager getInstance() {
            return Holder.instance;
        }

        private SocketManager() {
            connectToZMQSockets();
            scheduler.scheduleAtFixedRate(new Runnable() {
                public void run() {
                    updateLiveSockets();
                }
            }, 30, 30, TimeUnit.SECONDS);
        }

        // during startup, making a connection and populate once
        private void connectToZMQSockets() {
            Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;
            // The map in which I put all the live sockets
            Map<Datacenters, List<SocketHolder>> updatedLiveSocketsByDatacenter = new HashMap<>();
            for (Map.Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) {
                List<SocketHolder> addedColoSockets = connect(entry.getKey(), entry.getValue(), ZMQ.PUSH);
                updatedLiveSocketsByDatacenter.put(entry.getKey(),
                        Collections.unmodifiableList(addedColoSockets));
            }
            // Update the map content
            this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(updatedLiveSocketsByDatacenter));
        }

        private List<SocketHolder> connect(Datacenters colo, List<String> addresses, int socketType) {
            List<SocketHolder> socketList = new ArrayList<>();
            for (String address : addresses) {
                try {
                    Socket client = ctx.createSocket(socketType);
                    // Set random identity to make tracing easier
                    String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt());
                    client.setIdentity(identity.getBytes(ZMQ.CHARSET));
                    client.setTCPKeepAlive(1);
                    client.setSendTimeOut(7);
                    client.setLinger(0);
                    client.connect(address);

                    SocketHolder zmq = new SocketHolder(client, ctx, address, true);
                    socketList.add(zmq);
                } catch (Exception ex) {
                    // log error
                }
            }
            return socketList;
        }

        // this method will be called by multiple threads to get the next live socket
        // is there any concurrency or thread safety issue or race condition here?
        public Optional<SocketHolder> getNextSocket() {
            // For the sake of consistency make sure to use the same map instance
            // in the whole implementation of my method by getting my entries
            // from the local variable instead of the member variable
            Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
                    this.liveSocketsByDatacenter.get();
            Optional<SocketHolder> liveSocket = Optional.absent();
            List<Datacenters> dcs = Datacenters.getOrderedDatacenters();
            for (Datacenters dc : dcs) {
                liveSocket = getLiveSocketX(liveSocketsByDatacenter.get(dc));
                if (liveSocket.isPresent()) {
                    break;
                }
            }
            return liveSocket;
        }

        // is there any concurrency or thread safety issue or race condition here?
        private Optional<SocketHolder> getLiveSocketX(final List<SocketHolder> endpoints) {
            if (!CollectionUtils.isEmpty(endpoints)) {
                // The list of live sockets
                List<SocketHolder> liveOnly = new ArrayList<>(endpoints.size());
                for (SocketHolder obj : endpoints) {
                    if (obj.isLive()) {
                        liveOnly.add(obj);
                    }
                }
                if (!liveOnly.isEmpty()) {
                    // The list is not empty so we shuffle it and return the first element
                    Collections.shuffle(liveOnly);
                    return Optional.of(liveOnly.get(0));
                }
            }
            return Optional.absent();
        }

        // Added the modifier synchronized to prevent concurrent modification
        // it is needed because to build the new map we first need to get the
        // old one so both must be done atomically to prevent consistency issues
        private synchronized void updateLiveSockets() {
            Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;

            // Initialize my new map with the current map content
            Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
                    new HashMap<>(this.liveSocketsByDatacenter.get());

            for (Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) {
                List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
                List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
                for (SocketHolder liveSocket : liveSockets) { // LINE A
                    Socket socket = liveSocket.getSocket();
                    String endpoint = liveSocket.getEndpoint();
                    Map<byte[], byte[]> holder = populateMap();
                    Message message = new Message(holder, Partition.COMMAND);

                    boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket);
                    boolean isLive = (status) ? true : false;
                    // is there any problem the way I am using `SocketHolder` class?
                    SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
                    liveUpdatedSockets.add(zmq);
                }
                liveSocketsByDatacenter.put(entry.getKey(),
                        Collections.unmodifiableList(liveUpdatedSockets));
            }
            this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(liveSocketsByDatacenter));
        }
    }

As you can see in my class:

  • From a single background thread that runs every 30 seconds, I populate the liveSocketsByDatacenter map with all the live sockets in the updateLiveSockets() method.
  • Then, from multiple threads, I call the getNextSocket() method to get an available live socket; it uses the liveSocketsByDatacenter map to get the required information.

My code works fine without any issues, and I would like to know whether there is a better or more efficient way to write it. I would also like an opinion on thread safety issues or race conditions, if there are any. So far I have not seen any, but I could be wrong.

I am mainly concerned about the updateLiveSockets() and getLiveSocketX() methods. At LINE A, I iterate over liveSockets, which is a List of SocketHolder, then create a new SocketHolder object and add it to another new list. Is that all right?

Note: SocketHolder is an immutable class, and you can ignore the ZeroMQ-specific parts.

+9
java hashmap multithreading thread-safety race-condition
29 Oct '17 at 7:51
2 answers

Your code uses the following synchronization techniques:

  • The map with the live socket data sits behind an atomic reference, which allows the map to be switched safely.
  • The updateLiveSockets() method is synchronized (implicitly on this), which prevents two threads from switching the map at the same time.
  • You take a local reference to the map before using it, to avoid mix-ups if the switch happens during the getNextSocket() method.

Is it thread safe as it is now?

Thread safety always depends on having proper synchronization for shared mutable data. In this case, the shared mutable data is the map from datacenters to their lists of SocketHolders.

Holding the map in an AtomicReference and taking a local copy of the reference before use is sufficient synchronization for the map. Your methods take one version of the map and work with that, while switching versions is thread safe because of the nature of AtomicReference. The same could be achieved by simply making the member field for the map volatile, since all you do is update the reference (you do not perform any check-then-act operations on it).
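To make the check-then-act distinction concrete, here is a minimal sketch rather than anything taken from the question. The class CheckThenActDemo, its String/Integer map and the hammer() helper are hypothetical stand-ins. A blind replacement of the map only needs set() (or a volatile write), whereas an update derived from the previous map, performed by several writers, needs the compare-and-set retry that updateAndGet() provides.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

final class CheckThenActDemo {
    // Hypothetical stand-in: String keys and Integer values instead of Datacenters and socket lists.
    private final AtomicReference<Map<String, Integer>> ref =
            new AtomicReference<>(Collections.<String, Integer>emptyMap());

    // Blind publish: the new value does not depend on the old one, so set() suffices.
    void replaceAll(Map<String, Integer> fresh) {
        ref.set(Collections.unmodifiableMap(new HashMap<>(fresh)));
    }

    // Read-modify-write: the new map is derived from the old one. With several writers
    // this relies on the CAS retry loop inside updateAndGet, otherwise updates could be lost.
    void increment(String key) {
        ref.updateAndGet(old -> {
            Map<String, Integer> copy = new HashMap<>(old);
            copy.merge(key, 1, Integer::sum);
            return Collections.unmodifiableMap(copy);
        });
    }

    int get(String key) {
        return ref.get().getOrDefault(key, 0);
    }

    // Runs `threads` writers that each increment the same key `perThread` times.
    static int hammer(int threads, int perThread) {
        CheckThenActDemo demo = new CheckThenActDemo();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int n = 0; n < perThread; n++) {
                    demo.increment("hits");
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return demo.get("hits");
    }

    public static void main(String[] args) {
        System.out.println(hammer(4, 1000));
    }
}
```

In the question, updateLiveSockets() does derive the new map from the old one, but it has only a single writer thread, so no update can be lost and a plain reference swap is enough.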

Since scheduleAtFixedRate() ensures that the passed Runnable will not run concurrently with itself, the synchronized on updateLiveSockets() is not needed; however, it also does no real harm.
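The non-overlap guarantee can be observed with a small experiment. This is only a sketch under stated assumptions: the class FixedRateOverlapDemo and its parameters are made up for illustration, and a pool of four threads is used deliberately (unlike the single-thread executor in the question) to show that the guarantee comes from scheduleAtFixedRate() itself rather than from the pool size.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class FixedRateOverlapDemo {
    // Schedules a task that takes longer than its period and returns the highest
    // number of simultaneous executions observed during the observation window.
    static int maxConcurrentRuns(long periodMillis, long taskMillis, long observeMillis) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(4);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();
        scheduler.scheduleAtFixedRate(() -> {
            int now = running.incrementAndGet();
            maxSeen.accumulateAndGet(now, Math::max);
            try {
                Thread.sleep(taskMillis); // simulate slow work, e.g. pinging sockets
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                running.decrementAndGet();
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(observeMillis);
            scheduler.shutdownNow();
            scheduler.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            scheduler.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return maxSeen.get();
    }

    public static void main(String[] args) {
        // Period 10 ms, task 50 ms: late runs are delayed instead of overlapping.
        System.out.println(maxConcurrentRuns(10, 50, 300));
    }
}
```

The reported maximum should be 1, because a periodic task is only rescheduled after its previous execution has finished.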

So this class is thread safe as it is.

However, it is unclear whether a SocketHolder can be used by multiple threads at the same time. As it stands, this class only tries to minimize concurrent use of a SocketHolder by picking a random live one (there is no need to shuffle the entire list to select one random element, though). It does nothing to actually prevent concurrent use.
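The remark about shuffling can be illustrated with a minimal sketch. RandomPick and pickRandom are hypothetical names, and plain strings stand in for SocketHolder instances. Drawing one random index is O(1), while Collections.shuffle() touches every element just to use the first one.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;

final class RandomPick {
    private static final Random RANDOM = new Random();

    // Returns one uniformly chosen element; the caller must pass a non-empty list.
    static <T> T pickRandom(List<T> items) {
        return items.get(RANDOM.nextInt(items.size()));
    }

    public static void main(String[] args) {
        // Hypothetical endpoints standing in for live SocketHolder instances.
        List<String> live = Arrays.asList("tcp://a:5555", "tcp://b:5555", "tcp://c:5555");
        System.out.println(pickRandom(live));
    }
}
```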

Can it be made more efficient?

I think so. If you look at the updateLiveSockets() method, it seems to build the exact same map, except that the SocketHolders may have different values for the isLive flag. This leads me to conclude that, rather than switching the entire map, I just want to switch each of the lists in the map. And to change entries in a map in a thread-safe manner, I can just use ConcurrentHashMap.

If I use a ConcurrentHashMap and do not switch the map, but rather the values in the map, I can get rid of the AtomicReference.

To change the mapping, I can just build the new list and put it straight into the map. This is more efficient, as I publish data sooner and create fewer objects, while my synchronization just builds on ready-made components, which benefits readability.

Here is my version (some less relevant parts omitted for brevity):

    public class SocketManager {
        private static final Random random = new Random();
        private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // use ConcurrentHashMap
        private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = new ConcurrentHashMap<>();
        private final ZContext ctx = new ZContext();

        // ...

        private SocketManager() {
            connectToZMQSockets();
            scheduler.scheduleAtFixedRate(this::updateLiveSockets, 30, 30, TimeUnit.SECONDS);
        }

        // during startup, making a connection and populate once
        private void connectToZMQSockets() {
            Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
            for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
                List<SocketHolder> addedColoSockets = connect(entry.getValue(), ZMQ.PUSH);
                // we can put it straight into the map
                liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets);
            }
        }

        // ...

        // this method will be called by multiple threads to get the next live socket
        // is there any concurrency or thread safety issue or race condition here?
        public Optional<SocketHolder> getNextSocket() {
            for (Datacenters dc : Datacenters.getOrderedDatacenters()) {
                // no more need for a local copy: ConcurrentHashMap makes sure
                // I get the latest mapped List<SocketHolder>
                Optional<SocketHolder> liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc));
                if (liveSocket.isPresent()) {
                    return liveSocket;
                }
            }
            return Optional.absent();
        }

        // is there any concurrency or thread safety issue or race condition here?
        private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
            if (!CollectionUtils.isEmpty(listOfEndPoints)) {
                // The list of live sockets
                List<SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size());
                for (SocketHolder obj : listOfEndPoints) {
                    if (obj.isLive()) {
                        liveOnly.add(obj);
                    }
                }
                if (!liveOnly.isEmpty()) {
                    // The list is not empty, so just pick one element at random
                    return Optional.of(liveOnly.get(random.nextInt(liveOnly.size())));
                }
            }
            return Optional.absent();
        }

        // no need to make this synchronized
        private void updateLiveSockets() {
            Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;

            for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
                List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
                List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
                for (SocketHolder liveSocket : liveSockets) { // LINE A
                    Socket socket = liveSocket.getSocket();
                    String endpoint = liveSocket.getEndpoint();
                    Map<byte[], byte[]> holder = populateMap();
                    Message message = new Message(holder, Partition.COMMAND);

                    boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket);
                    boolean isLive = (status) ? true : false;

                    SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
                    liveUpdatedSockets.add(zmq);
                }
                // just put it straight into the map; the mapping will be updated in a thread safe manner
                liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets));
            }
        }
    }
+8
Nov 02 '17 at 0:28

If SocketHolder and Datacenters are immutable, your program looks fine. Here are some minor remarks.

1. Using AtomicReference

AtomicReference<Map<Datacenters, List<SocketHolder>>> liveSocketsByDatacenter

This member variable does not need to be wrapped in an AtomicReference, because you are not performing any atomic CAS operation on it. You can simply declare a volatile Map<Datacenters, List<SocketHolder>> and, when reading, take a local reference to it. That is enough to guarantee an atomic swap of the reference to the new map.
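A minimal sketch of this suggestion, under stated assumptions: VolatileMapHolder, publish() and lookup() are hypothetical names, and String keys and values stand in for Datacenters and SocketHolder. The writer builds a complete new map and publishes it with a single volatile write, and each reader takes one snapshot of the reference and uses only that snapshot.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class VolatileMapHolder {
    // Hypothetical stand-in types: String instead of Datacenters and SocketHolder.
    private volatile Map<String, List<String>> liveByDatacenter = Collections.emptyMap();

    // Writer (the single background thread): copy, wrap, then publish in one volatile write.
    void publish(Map<String, List<String>> fresh) {
        liveByDatacenter = Collections.unmodifiableMap(new HashMap<>(fresh));
    }

    // Reader: read the field once and work on that snapshot for the whole call.
    List<String> lookup(String datacenter) {
        Map<String, List<String>> snapshot = liveByDatacenter;
        List<String> sockets = snapshot.get(datacenter);
        return sockets == null ? Collections.<String>emptyList() : sockets;
    }

    public static void main(String[] args) {
        VolatileMapHolder holder = new VolatileMapHolder();
        Map<String, List<String>> fresh = new HashMap<>();
        fresh.put("dc1", Arrays.asList("tcp://a:5555", "tcp://b:5555"));
        holder.publish(fresh);
        System.out.println(holder.lookup("dc1"));
        System.out.println(holder.lookup("dc2"));
    }
}
```

Because the published map is never mutated after the write, readers that still hold an older snapshot keep seeing a consistent, if slightly stale, view.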

2. Synchronized method

private synchronized void updateLiveSockets()

This method is called from a single thread executor, so there is no need to synchronize it.

3. Some simplifications

  • From your current use of this class, it seems that you could filter out the sockets that are not live in updateLiveSockets(), which avoids filtering every time a client calls getNextSocket().

  • You can replace Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS with Set<Datacenters> datacenters = Utils.SERVERS.keySet() and work with keys.
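The first simplification could look roughly like the sketch below. LiveFilter and Endpoint are hypothetical stand-ins for the updater logic and SocketHolder. One assumption worth stating loudly: the updater would still need to keep the full list of sockets somewhere else, since a socket dropped from the published list could otherwise never be pinged again and come back to life.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class LiveFilter {
    // Hypothetical immutable stand-in for SocketHolder.
    static final class Endpoint {
        final String address;
        final boolean live;

        Endpoint(String address, boolean live) {
            this.address = address;
            this.live = live;
        }
    }

    // Called by the updater after pinging: keeps only the live endpoints, so that
    // readers can pick from the published list without filtering on every call.
    static List<Endpoint> liveOnly(List<Endpoint> checked) {
        List<Endpoint> live = new ArrayList<>(checked.size());
        for (Endpoint endpoint : checked) {
            if (endpoint.live) {
                live.add(endpoint);
            }
        }
        return Collections.unmodifiableList(live);
    }

    public static void main(String[] args) {
        List<Endpoint> checked = Arrays.asList(
                new Endpoint("tcp://a:5555", true),
                new Endpoint("tcp://b:5555", false),
                new Endpoint("tcp://c:5555", true));
        for (Endpoint endpoint : liveOnly(checked)) {
            System.out.println(endpoint.address);
        }
    }
}
```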

4. Java 8

If possible, switch to Java 8. Streams, together with Java 8's Optional, would remove a lot of boilerplate code and make your code much easier to read.
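As a rough idea of what that could look like, here is a sketch using java.util.Optional and streams in place of Guava's Optional and the explicit loops. StreamPick, Endpoint, pickLive() and next() are hypothetical names, and String keys stand in for Datacenters.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.stream.Collectors;

final class StreamPick {
    // Hypothetical immutable stand-in for SocketHolder.
    static final class Endpoint {
        final String address;
        final boolean live;

        Endpoint(String address, boolean live) {
            this.address = address;
            this.live = live;
        }

        boolean isLive() {
            return live;
        }
    }

    // Counterpart of getLiveSocket(): filter the live endpoints, then pick one at random.
    static Optional<Endpoint> pickLive(List<Endpoint> endpoints, Random random) {
        if (endpoints == null) {
            return Optional.empty();
        }
        List<Endpoint> live = endpoints.stream()
                .filter(Endpoint::isLive)
                .collect(Collectors.toList());
        return live.isEmpty()
                ? Optional.empty()
                : Optional.of(live.get(random.nextInt(live.size())));
    }

    // Counterpart of getNextSocket(): the first datacenter, in order, that has a live endpoint.
    static Optional<Endpoint> next(List<String> orderedDatacenters,
                                   Map<String, List<Endpoint>> byDatacenter,
                                   Random random) {
        return orderedDatacenters.stream()
                .map(dc -> pickLive(byDatacenter.get(dc), random))
                .filter(Optional::isPresent)
                .map(Optional::get)
                .findFirst(); // streams are lazy, so later datacenters are not examined
    }

    public static void main(String[] args) {
        Map<String, List<Endpoint>> byDatacenter = new HashMap<>();
        byDatacenter.put("dc1", Arrays.asList(new Endpoint("tcp://a:5555", false)));
        byDatacenter.put("dc2", Arrays.asList(new Endpoint("tcp://b:5555", true)));
        Optional<Endpoint> chosen = next(Arrays.asList("dc1", "dc2"), byDatacenter, new Random());
        System.out.println(chosen.map(e -> e.address).orElse("none"));
    }
}
```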

+4
30 Oct '17 at 10:47


