Process items in a queue as they are added inside a while loop

I have a method that listens for UDP packets in a while loop. I want to parse the packets as they arrive using a method in another class, and run several different kinds of parsing and analysis on each packet in another part of the application. I think it would be better if the PacketParser methods processed a queue outside the loop. Is it possible to simply add packets to a Queue as they arrive and have another part of the application watch the queue and act on each element as it arrives, while the original loop keeps listening for packets and adding them to the queue? I would like a separate function to monitor the queue and process the packets. Is there anything in Java for monitoring a Queue or Stack? Is there a better way to do this?

    public void read(String multicastIpAddress, int multicastPortNumber) {
        PacketParser parser = new PacketParser(logger);
        InetAddress multicastAddress = null;
        MulticastSocket multicastSocket = null;
        final int PortNumber = multicastPortNumber;
        try {
            multicastAddress = InetAddress.getByName(multicastIpAddress);
            multicastSocket = new MulticastSocket(PortNumber);
            String hostname = InetAddress.getLocalHost().getHostName();
            byte[] buffer = new byte[8192];
            multicastSocket.joinGroup(multicastAddress);
            System.out.println("Listening from " + hostname + " at " + multicastAddress.getHostName());
            int numberOfPackets = 0;
            while (true) {
                numberOfPackets++;
                DatagramPacket datagramPacket = new DatagramPacket(buffer, buffer.length);
                multicastSocket.receive(datagramPacket);
                // add to queue for another function to process the packets
            }
        } catch (SocketException socketException) {
            System.out.println("Socket exception " + socketException);
        } catch (IOException exception) {
            System.out.println("Exception " + exception);
        } finally {
            if (multicastSocket != null) {
                try {
                    multicastSocket.leaveGroup(multicastAddress);
                    multicastSocket.close();
                } catch (IOException exception) {
                    System.out.println(exception.toString());
                }
            }
        }
    }
1 answer

Okay, so I read up on the producer-consumer pattern and figured it out. Here is what I did.

In essence, the producer-consumer pattern involves three things: a producer, a consumer, and a shared queue. In this context, PacketReader is the producer: it receives network packets and puts them on the shared queue. PacketParser is the consumer: it processes the packets in the shared queue. So I created an instance of LinkedBlockingQueue and passed this shared queue to the producer instance (PacketReader) and the consumer instance (PacketParser). The producer and consumer instances are then each passed to a Thread instance. Finally, I call the start() method on each thread.

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    public class Main {
        public static void main(String[] args) {
            // Shared queue: the reader puts packets in, the parser takes them out
            BlockingQueue<Packet> queue = new LinkedBlockingQueue<>();
            ILogger logger = Injector.getLogger();

            Thread reader = new Thread(new PacketReader(logger, queue, "", 49410));
            Thread parser = new Thread(new PacketParser(logger, queue));

            reader.start();
            parser.start();
        }
    }
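The classes in this answer depend on types that are not shown here (ILogger, Injector, Packet), so they cannot be run as-is. As a rough, self-contained sketch of the same wiring, the following uses plain integers in place of packets. The class name `MiniProducerConsumer` and the method `run` are made up for illustration and are not part of the original code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical miniature of the reader/parser setup, with Integers standing in for packets.
class MiniProducerConsumer {

    /** Runs one producer and one consumer over a shared queue and returns what the consumer received. */
    static List<Integer> run(int count) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        List<Integer> received = new ArrayList<>();

        // Producer: plays the role of PacketReader
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) {
                    queue.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Consumer: plays the role of PacketParser; take() waits until an item is available
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    received.add(queue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        consumer.start();
        producer.start();
        try {
            // join() also makes the consumer's writes to 'received' visible to this thread
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(5)); // prints [1, 2, 3, 4, 5]
    }
}
```

With a single producer the queue preserves insertion order, so the consumer sees the items in the order they were put.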

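The reason for using LinkedBlockingQueue is that put() blocks the calling thread while the queue is full, and take() blocks while the queue is empty. Note that a LinkedBlockingQueue created without a capacity argument is effectively unbounded (its capacity is Integer.MAX_VALUE), so put() only blocks in practice if you pass a capacity to the constructor. The producer and consumer classes must implement the Runnable interface, whose single method run() takes no parameters.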
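To make the full and empty cases concrete without hanging the program, here is a small sketch that uses the timed variants offer(e, timeout, unit) and poll(timeout, unit), which give up after a timeout where put() and take() would keep waiting. The class `BoundedQueueDemo`, its method names, the capacity of 2 and the 50 ms timeout are all arbitrary choices for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of the original answer.
class BoundedQueueDemo {

    /** Returns true if a timed offer() on a full queue gives up without inserting. */
    static boolean offerTimesOutWhenFull() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(2); // bounded: capacity 2
        try {
            queue.put("a");
            queue.put("b");
            // A plain put("c") would wait here until a consumer removed an element.
            return !queue.offer("c", 50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /** Returns true if a timed poll() on an empty queue gives up and returns null. */
    static boolean pollTimesOutWhenEmpty() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        try {
            // A plain take() would wait here until a producer added an element.
            return queue.poll(50, TimeUnit.MILLISECONDS) == null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(offerTimesOutWhenFull());
        System.out.println(pollTimesOutWhenEmpty());
    }
}
```

A bounded queue is worth considering if packets can arrive faster than they are parsed, since it limits how much memory the backlog can use.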

Consumer class

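    import java.util.concurrent.BlockingQueue;

    public class PacketParser implements Runnable {
        private final ILogger logger;
        private final BlockingQueue<Packet> queue;
        // volatile so that a stop() call from another thread is seen by the loop
        private volatile boolean running = true;

        public PacketParser(ILogger logger, BlockingQueue<Packet> queue) {
            this.logger = logger;
            this.queue = queue;
        }

        public void stop() {
            running = false;
        }

        @Override
        public void run() {
            while (running) {
                try {
                    Packet packet = queue.take(); // blocks until a packet is available
                    parse(packet);
                } catch (InterruptedException exception) {
                    logger.Log("Parser interrupted: " + exception);
                    Thread.currentThread().interrupt(); // restore the interrupt flag
                    running = false;                    // and leave the loop
                }
            }
        }

        private void parse(Packet packet) {
            // Parsing logic is not shown in the original answer.
        }
    }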
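One thing to be aware of: stop() only flips a flag, so a parser that is blocked in take() on an empty queue will not notice it until another packet arrives. A common alternative is to stop the thread by interrupting it, since take() throws InterruptedException when the waiting thread is interrupted. The sketch below shows that idea with strings instead of packets. `InterruptibleConsumer` and its methods are hypothetical names, and this is just one possible approach, not necessarily how the original application shuts down.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical consumer that is stopped via Thread.interrupt() instead of a flag.
class InterruptibleConsumer implements Runnable {
    private final BlockingQueue<String> queue;
    private final AtomicInteger processed = new AtomicInteger();

    InterruptibleConsumer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    int processedCount() {
        return processed.get();
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                String item = queue.take();  // throws InterruptedException if interrupted while waiting
                processed.incrementAndGet(); // stand-in for real parsing of 'item'
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag, then fall through and exit
        }
    }

    /** Starts a consumer on an empty queue, interrupts it, and reports whether its thread exited. */
    static boolean stopsWhenInterrupted() {
        InterruptibleConsumer consumer = new InterruptibleConsumer(new LinkedBlockingQueue<>());
        Thread thread = new Thread(consumer);
        thread.start();
        thread.interrupt(); // wakes the thread even though the queue is empty
        try {
            thread.join(2000); // wait up to 2 seconds for it to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !thread.isAlive();
    }

    public static void main(String[] args) {
        System.out.println(stopsWhenInterrupted());
    }
}
```

In the Main class this would mean keeping the Thread reference for the parser and calling interrupt() on it when the application shuts down.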

Producer class

    import java.io.IOException;
    import java.net.DatagramPacket;
    import java.net.InetAddress;
    import java.net.MulticastSocket;
    import java.net.SocketException;
    import java.util.concurrent.BlockingQueue;

    public class PacketReader implements Runnable {
        private final ILogger logger;
        private final BlockingQueue<Packet> queue;
        private final String multicastIpAddress;
        private final int multicastPortNumber;
        // volatile so that a stop() call from another thread is seen by the loop
        private volatile boolean running = true;

        public PacketReader(ILogger logger, BlockingQueue<Packet> queue,
                            String multicastIpAddress, int multicastPortNumber) {
            this.logger = logger;
            this.queue = queue;
            this.multicastIpAddress = multicastIpAddress;
            this.multicastPortNumber = multicastPortNumber;
        }

        public void stop() {
            running = false;
        }

        @Override
        public void run() {
            InetAddress multicastAddress = null;
            MulticastSocket multicastSocket = null;
            try {
                multicastAddress = InetAddress.getByName(multicastIpAddress);
                multicastSocket = new MulticastSocket(multicastPortNumber);
                String hostname = InetAddress.getLocalHost().getHostName();
                multicastSocket.joinGroup(multicastAddress);
                System.out.println("Listening from " + hostname + " at " + multicastAddress.getHostName());
                int numberOfPackets = 0;
                while (running) {
                    numberOfPackets++;
                    // Fresh buffer per packet: queued packets are read later on another thread,
                    // so a shared buffer could be overwritten by the next receive()
                    byte[] buffer = new byte[8192];
                    DatagramPacket datagramPacket = new DatagramPacket(buffer, buffer.length);
                    multicastSocket.receive(datagramPacket);
                    // put() waits if the queue is bounded and currently full
                    queue.put(new Packet(numberOfPackets, datagramPacket));
                }
            } catch (InterruptedException interruptedException) {
                Thread.currentThread().interrupt(); // restore the interrupt flag and exit
            } catch (SocketException socketException) {
                System.out.println("Socket exception " + socketException);
            } catch (IOException exception) {
                System.out.println("Exception " + exception);
            } finally {
                if (multicastSocket != null) {
                    try {
                        multicastSocket.leaveGroup(multicastAddress);
                        multicastSocket.close();
                    } catch (IOException exception) {
                        System.out.println(exception.toString());
                    }
                }
            }
        }
    }
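The Packet class itself is not shown in this answer. Because the reader hands each packet to another thread, it is safest if a packet owns a copy of its bytes rather than holding a reference to a receive buffer that may be reused. The following is only a guess at what such a class could look like: the constructor signature matches the `new Packet(numberOfPackets, datagramPacket)` call above, but the field and accessor names are assumptions.

```java
import java.net.DatagramPacket;
import java.util.Arrays;

// Hypothetical Packet class; the original answer does not show its implementation.
final class Packet {
    private final int sequenceNumber;
    private final byte[] payload;

    Packet(int sequenceNumber, DatagramPacket datagram) {
        this.sequenceNumber = sequenceNumber;
        // Copy only the bytes that were actually received, so later writes
        // to the receive buffer cannot change this packet's contents.
        this.payload = Arrays.copyOfRange(
                datagram.getData(),
                datagram.getOffset(),
                datagram.getOffset() + datagram.getLength());
    }

    int getSequenceNumber() {
        return sequenceNumber;
    }

    byte[] getPayload() {
        return payload.clone(); // defensive copy keeps the packet immutable
    }
}
```

With a class like this, the parser can work on getPayload() at its own pace regardless of what the reader does with its buffer afterwards.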