The following synchronization techniques are used:
- The map with the live data sits behind an AtomicReference, which allows the map to be switched safely.
- The updateLiveSockets() method is synchronized (implicitly on this), which prevents two threads from switching the map at the same time.
- You take a local reference to the map when using it, to avoid mixing versions if the switch happens during the getNextSocket() method.
Is it thread safe as it is now?
Thread safety always depends on whether there is proper synchronization on shared mutable data. In this case, the shared mutable data is the map of datacenters to their list of SocketHolders.
Keeping the map in an AtomicReference and taking a local copy of the reference before use is sufficient synchronization on the map. Your methods take one version of the map and work with that, while switching versions is thread safe due to the nature of AtomicReference. The same could have been achieved by just making the member field for the map volatile, since all you do is update the reference (you do not perform any check-then-act operations on it).
As scheduleAtFixedRate() guarantees that the passed Runnable will not run concurrently with itself, the synchronized on updateLiveSockets() is not needed; however, it also does no real harm.
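The volatile alternative mentioned above could look roughly like the following. This is a minimal sketch, not the original class: `VolatileSnapshot`, `publish` and `lookup` are hypothetical names, and plain strings stand in for the `Datacenters` and `SocketHolder` types.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical holder used for illustration only; not part of the original class.
class VolatileSnapshot {
    // volatile makes each newly published map visible to all reader threads.
    private volatile Map<String, List<String>> liveByDatacenter = Collections.emptyMap();

    // Writer: build a complete new map first, then publish it with a single write.
    void publish(Map<String, List<String>> fresh) {
        liveByDatacenter = Collections.unmodifiableMap(new HashMap<>(fresh));
    }

    // Reader: read the field once into a local variable and use only the local,
    // so a concurrent publish() cannot mix two versions within one call.
    List<String> lookup(String datacenter) {
        Map<String, List<String>> snapshot = liveByDatacenter;
        List<String> sockets = snapshot.get(datacenter);
        return sockets == null ? Collections.emptyList() : sockets;
    }
}
```

This works only because the writer replaces the reference and never updates it based on its previous value; a compare-and-set style update would still call for AtomicReference.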
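The non-overlap guarantee can be observed with a small experiment. The sketch below uses a hypothetical `FixedRateOverlapDemo` class and arbitrary timings: it schedules a task that takes longer than its period on a pool with several threads and records the highest number of simultaneous executions.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; names and timings are chosen for illustration.
class FixedRateOverlapDemo {
    // Runs a deliberately slow task at a fast rate and returns the highest
    // number of simultaneous executions that was observed.
    static int maxConcurrentRuns() {
        // Several threads are available, so any overlap would be possible in principle.
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(4);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();
        Runnable slowTask = () -> {
            int now = running.incrementAndGet();
            maxSeen.accumulateAndGet(now, Math::max);
            try {
                Thread.sleep(50); // five times longer than the 10 ms period
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                running.decrementAndGet();
            }
        };
        scheduler.scheduleAtFixedRate(slowTask, 0, 10, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(300);
            scheduler.shutdownNow();
            scheduler.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return maxSeen.get();
    }

    public static void main(String[] args) {
        System.out.println("max concurrent runs: " + maxConcurrentRuns());
    }
}
```

Late executions are simply delayed rather than started in parallel, which is why the update method does not need its own lock as long as it is only invoked through this one scheduled task.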
So this class is thread safe as it is.
However, it is not clear whether a SocketHolder may be used by multiple threads at the same time. As it is, this class only tries to minimize concurrent use of SocketHolders by picking a random live one (there is no need to shuffle the entire array to select one random index, though). It does nothing to actually prevent concurrent use.
Can it be made more efficient?
I believe it can. Looking at the updateLiveSockets() method, it seems to build exactly the same map, except that the SocketHolders may have different values for the isLive flag. This leads me to conclude that, rather than switching the entire map, I just want to switch each of the lists in the map. And to change entries in a map in a thread-safe manner, I can just use ConcurrentHashMap.
If I use a ConcurrentHashMap and switch the values in the map rather than the map itself, I can get rid of the AtomicReference.
To change the mapping, I can just build the new list and put it straight into the map. This is more efficient because I publish data sooner and create fewer objects, while my synchronization just builds on ready-made components, which benefits readability.
Here is my version (some less relevant parts are omitted for brevity):
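The per-key replacement described above can be tried in isolation with simplified stand-in types. In this sketch `Endpoint`, `PerKeyReplacement`, `register`, `markDead` and `liveCount` are hypothetical names, and string keys replace the `Datacenters` type; it only illustrates the idea of swapping one list at a time.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, immutable stand-in for SocketHolder.
final class Endpoint {
    final String address;
    final boolean live;

    Endpoint(String address, boolean live) {
        this.address = address;
        this.live = live;
    }
}

// Hypothetical class showing per-key list replacement; not the original code.
class PerKeyReplacement {
    private final Map<String, List<Endpoint>> liveByDatacenter = new ConcurrentHashMap<>();

    // Initial population: every endpoint starts out live.
    void register(String datacenter, String... addresses) {
        List<Endpoint> list = new ArrayList<>();
        for (String address : addresses) {
            list.add(new Endpoint(address, true));
        }
        liveByDatacenter.put(datacenter, Collections.unmodifiableList(list));
    }

    // Rebuild the list for one datacenter and swap it in with a single put().
    // Readers see either the old list or the new one, never a half-built list.
    void markDead(String datacenter, String deadAddress) {
        List<Endpoint> current = liveByDatacenter.get(datacenter);
        if (current == null) {
            return; // unknown datacenter, nothing to update
        }
        List<Endpoint> updated = new ArrayList<>(current.size());
        for (Endpoint e : current) {
            boolean stillLive = e.live && !e.address.equals(deadAddress);
            updated.add(new Endpoint(e.address, stillLive));
        }
        liveByDatacenter.put(datacenter, Collections.unmodifiableList(updated));
    }

    // Counts the live endpoints in whichever list is currently mapped.
    int liveCount(String datacenter) {
        List<Endpoint> list = liveByDatacenter.get(datacenter);
        int count = 0;
        if (list != null) {
            for (Endpoint e : list) {
                if (e.live) {
                    count++;
                }
            }
        }
        return count;
    }
}
```

The lists themselves are never modified after being put into the map, which is what makes it safe for readers to iterate over them without further locking.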
    public class SocketManager {
        private static final Random random = new Random();
        private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // use ConcurrentHashMap
        private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = new ConcurrentHashMap<>();
        private final ZContext ctx = new ZContext();

        // ...

        private SocketManager() {
            connectToZMQSockets();
            scheduler.scheduleAtFixedRate(this::updateLiveSockets, 30, 30, TimeUnit.SECONDS);
        }

        // during startup, make the connections and populate once
        private void connectToZMQSockets() {
            Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
            for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
                List<SocketHolder> addedColoSockets = connect(entry.getValue(), ZMQ.PUSH);
                // we can put it straight into the map
                liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets);
            }
        }

        // ...

        // this method will be called by multiple threads to get the next live socket
        // is there any concurrency or thread safety issue or race condition here?
        public Optional<SocketHolder> getNextSocket() {
            for (Datacenters dc : Datacenters.getOrderedDatacenters()) {
                // no more need for a local copy: ConcurrentHashMap makes sure
                // I get the latest mapped List<SocketHolder>
                Optional<SocketHolder> liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc));
                if (liveSocket.isPresent()) {
                    return liveSocket;
                }
            }
            return Optional.absent();
        }

        // is there any concurrency or thread safety issue or race condition here?
        private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
            if (!CollectionUtils.isEmpty(listOfEndPoints)) {
                // The list of live sockets
                List<SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size());
                for (SocketHolder obj : listOfEndPoints) {
                    if (obj.isLive()) {
                        liveOnly.add(obj);
                    }
                }
                if (!liveOnly.isEmpty()) {
                    // The list is not empty, so just pick one element at random
                    return Optional.of(liveOnly.get(random.nextInt(liveOnly.size())));
                }
            }
            return Optional.absent();
        }

        // no need to make this synchronized
        private void updateLiveSockets() {
            Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;

            for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
                List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
                List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
                for (SocketHolder liveSocket : liveSockets) { // LINE A
                    Socket socket = liveSocket.getSocket();
                    String endpoint = liveSocket.getEndpoint();
                    Map<byte[], byte[]> holder = populateMap();
                    Message message = new Message(holder, Partition.COMMAND);

                    boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket);
                    boolean isLive = (status) ? true : false;

                    SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
                    liveUpdatedSockets.add(zmq);
                }
                // just put it straight into the map, the mapping will be updated in a thread safe manner
                liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets));
            }
        }
    }