After reading up on the producer-consumer pattern I worked it out. Here is what I did.
The producer-consumer pattern has three parts: a producer, a consumer, and a shared queue. Here, PacketReader is the producer: it receives network packets and puts them on the shared queue. PacketParser is the consumer: it processes the packets taken from that queue. So I created a LinkedBlockingQueue instance and passed it to both the producer instance (PacketReader) and the consumer instance (PacketParser). Each of those instances is then wrapped in a Thread, and finally start() is called on each thread.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class Main {
    public static void main(String[] args) {
        // One queue shared by the producer and the consumer
        BlockingQueue<Packet> queue = new LinkedBlockingQueue<>();
        ILogger logger = Injector.getLogger();

        Thread reader = new Thread(new PacketReader(logger, queue, "239.1.1.1", 49410));
        Thread parser = new Thread(new PacketParser(logger, queue));

        reader.start();
        parser.start();
    }
}

I used LinkedBlockingQueue because put() blocks the calling thread while the queue is full and take() blocks while the queue is empty. (A LinkedBlockingQueue created without a capacity argument is effectively unbounded, so put() only ever waits if you pass a capacity to the constructor.) The producer and consumer classes must implement the Runnable interface, whose run() method takes no parameters.
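
To make the blocking behaviour of put() and take() concrete, here is a minimal sketch that is separate from the packet classes in this answer. The class name BoundedQueueDemo, the Integer payload, and the capacities of 2 and 1 are assumptions chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class BoundedQueueDemo {

    // Moves the numbers 1..5 through a queue of capacity 2 and
    // returns them in the order the consuming side received them.
    static List<Integer> runDemo() {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(2);
        List<Integer> consumed = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    queue.put(i); // waits while the queue already holds 2 items
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        try {
            for (int i = 0; i < 5; i++) {
                consumed.add(queue.take()); // waits while the queue is empty
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return consumed;
    }

    // offer() is the non-blocking counterpart of put():
    // it returns false instead of waiting when the queue is full.
    static boolean offerOnFullQueue() {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(1);
        queue.offer(1);
        return queue.offer(2);
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
        System.out.println(offerOnFullQueue());
    }
}
```

With one producer and one consumer the items come out in insertion order, because LinkedBlockingQueue is FIFO. The capacity only limits how far the producer can run ahead of the consumer.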
Consumer class
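
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;

public class PacketParser implements Runnable {
    private ILogger logger;
    private BlockingQueue<Packet> queue;
    // volatile so that a stop() call from another thread is seen by run()
    private volatile boolean running = true;

    public PacketParser(ILogger logger, BlockingQueue<Packet> queue) {
        this.logger = logger;
        this.queue = queue;
    }

    public void stop() {
        running = false;
    }

    public void run() {
        while (running) {
            try {
                // Blocks until a packet is available
                Packet packet = queue.take();
                parse(packet);
            } catch (InterruptedException exception) {
                // getStackTrace() returns an array, so format it explicitly
                logger.Log(Arrays.toString(exception.getStackTrace()));
                // Restore the interrupt flag and leave the loop
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    // parse(Packet) is not shown here
}
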
Producer class

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.MulticastSocket;
import java.net.SocketException;
import java.util.Queue;

public class PacketReader implements Runnable {
    private ILogger logger;
    private final Queue<Packet> queue;
    private String multicastIpAddress;
    private int multicastPortNumber;
    // volatile so that a stop() call from another thread is seen by run()
    private volatile boolean running = true;

    public PacketReader(ILogger logger, Queue<Packet> queue, String multicastIpAddress, int multicastPortNumber) {
        this.logger = logger;
        this.queue = queue;
        this.multicastIpAddress = multicastIpAddress;
        this.multicastPortNumber = multicastPortNumber;
    }

    public void stop() {
        running = false;
    }

    public void run() {
        InetAddress multicastAddress = null;
        MulticastSocket multicastSocket = null;
        try {
            multicastAddress = InetAddress.getByName(multicastIpAddress);
            multicastSocket = new MulticastSocket(multicastPortNumber);
            String hostname = InetAddress.getLocalHost().getHostName();
            multicastSocket.joinGroup(multicastAddress);
            System.out.println("Listening from " + hostname + " at " + multicastAddress.getHostName());
            int numberOfPackets = 0;
            while (running) {
                numberOfPackets++;
                // A fresh buffer per packet, so a queued packet is not overwritten
                // by the next receive() (this matters if Packet keeps a reference
                // to the datagram's data instead of copying it)
                byte[] buffer = new byte[8192];
                DatagramPacket datagramPacket = new DatagramPacket(buffer, buffer.length);
                multicastSocket.receive(datagramPacket);
                Packet packet = new Packet(numberOfPackets, datagramPacket);
                // add() never waits; on a bounded queue it throws if the queue is full
                queue.add(packet);
            }
        } catch (SocketException socketException) {
            System.out.println("Socket exception " + socketException);
        } catch (IOException exception) {
            System.out.println("Exception " + exception);
        } finally {
            if (multicastSocket != null) {
                try {
                    multicastSocket.leaveGroup(multicastAddress);
                    multicastSocket.close();
                } catch (IOException exception) {
                    System.out.println(exception.toString());
                }
            }
        }
    }
}
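
One thing to keep in mind about the stop() methods: they only set a flag, so a consumer that is already waiting inside take() will not notice the change until another item arrives. A common workaround is a "poison pill", a sentinel object that the producer enqueues to tell the consumer to exit. The sketch below is not part of my code above; the class name PoisonPillDemo, the String payload, and the POISON sentinel are hypothetical and chosen only for illustration. With the classes above, the sentinel would be a dedicated Packet instance.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class PoisonPillDemo {

    // Hypothetical sentinel; new String(...) guarantees a unique object,
    // so it can be recognised by identity (==) rather than by content.
    static final String POISON = new String("STOP");

    // Sends "a", "b" and then the sentinel; returns what the consumer handled.
    static List<String> runDemo() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> handled = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String item = queue.take(); // waits for the next item
                    if (item == POISON) {
                        return; // sentinel seen: leave the loop
                    }
                    handled.add(item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            queue.put("a");
            queue.put("b");
            queue.put(POISON); // wakes the consumer even if it is waiting in take()
            consumer.join();   // after join() the consumer's writes are visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

Calling interrupt() on the consumer thread is another option, since take() responds to interruption by throwing InterruptedException.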