Producer-consumer problem when streaming an array of data from a web server

The description of the producer-consumer pattern states that:

"2) The producer does not need to know who the consumer is or how many consumers there are. The same is true for the consumer."

My problem is that I have an array of data that I need to deliver to clients of the web server as soon as possible. Clients may arrive in the middle of the calculation, and several clients may request the array at different times. Once the calculation completes, the result is cached and can simply be read.

Example use case: during the calculation, I want to serve each element of the array as soon as it is available. I can't use a BlockingQueue, because if a second client starts requesting the array after the first one has already called .take() on the first half of it, the second client misses half the data! I need a BlockingQueue where you don't have to take(), but can instead just read(int index).

Solution? I have many records in my array, so I don't want to use CopyOnWriteArrayList. The Vector class should work, but would it be inefficient? Is it preferable to use a ThreadSafeList, such as this, and just add a waitForElement() function? I just don't want to reinvent the wheel, and I prefer crowd-tested solutions for multithreaded problems...

+6
2 answers

As far as I understand, you need to broadcast data to subscribers/clients. Here are some approaches I know of.

  • Pure Java solution: each client has its own BlockingQueue, and every time you send a message, you put it in each queue.

     for (BlockingQueue client : clients) {
         client.put(msg);
     }
  • RxJava provides a reactive approach. Clients are subscribers: when you emit a message, the subscribers are notified, and they can cancel their subscription.

     Observable<String> observable = Observable.create(sub -> {
         String[] msgs = {"msg1", "msg2", "msg3"};
         for (String msg : msgs) {
             if (!sub.isUnsubscribed()) {
                 sub.onNext(msg);
             }
         }
         if (!sub.isUnsubscribed()) { // completes
             sub.onCompleted();
         }
     });

    Now several subscribers can receive messages.

     observable.subscribe(System.out::println);
     observable.subscribe(System.out::println);

    Observables are somewhat functional: subscribers can select just what they need.

     observable.filter(msg -> msg.equals("msg2"))
               .map(String::length)
               .subscribe(msgLength -> {
                   System.out.println(msgLength); // or do something useful
               });
  • Akka provides broadcast routers
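The first option above (one queue per client) could be sketched roughly as follows. This is only a minimal illustration, not a tested library: the `Broadcaster` class and its `publish`/`subscribe` methods are hypothetical names, and the replay of earlier messages to late subscribers is an added assumption meant to address clients that join in the middle of the calculation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper, not part of any library: fans each message out to
// one unbounded queue per client and replays history to late subscribers.
public class Broadcaster<T> {
    private final List<T> history = new ArrayList<>();
    private final List<BlockingQueue<T>> clients = new ArrayList<>();

    // Producer side: record the message, then hand it to every client queue.
    public synchronized void publish(T msg) {
        history.add(msg);
        for (BlockingQueue<T> client : clients) {
            client.add(msg); // does not block: the queues are unbounded
        }
    }

    // Consumer side: a new client first receives everything published so far.
    public synchronized BlockingQueue<T> subscribe() {
        BlockingQueue<T> queue = new LinkedBlockingQueue<>(history);
        clients.add(queue);
        return queue;
    }
}
```

Each client then calls take() on its own queue, so one client consuming an element never hides it from another. The cost is that every element is referenced once per client queue in addition to the shared history.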

+2

This is not a completely trivial problem, but it is not too difficult to solve.

Assume your producer is an imperative program: it generates the data piece by piece, adding each piece to the cache, and the process ends either successfully or with an error.

The cache must expose this interface so that the producer can insert data into it.

 public class Cache
     public void add(byte[] bytes)
     public void finish(boolean error)

Each consumer gets a new view from the cache; a view is a blocking data source.

 public class Cache
     public View newView()

 public class View
     // return null for EOF
     public byte[] read() throws Exception

Here is a simple implementation

 import java.util.ArrayList;

 public class Cache {
     final Object lock = new Object();

     static final int INIT = 0, DONE = 1, ERROR = 2;
     int state = INIT;

     ArrayList<byte[]> list = new ArrayList<>();

     // producer: append a piece and wake up waiting readers
     public void add(byte[] bytes) {
         synchronized (lock) {
             list.add(bytes);
             lock.notifyAll();
         }
     }

     // producer: mark the end of the data, successfully or with an error
     public void finish(boolean error) {
         synchronized (lock) {
             state = error ? ERROR : DONE;
             lock.notifyAll();
         }
     }

     public View newView() {
         return new View();
     }

     public class View {
         int index; // position of this consumer in the list

         // return null for EOF
         public byte[] read() throws Exception {
             synchronized (lock) {
                 // block until a new piece arrives or the producer finishes
                 while (state == INIT && index == list.size())
                     lock.wait();

                 if (state == ERROR)
                     throw new Exception();

                 if (index < list.size())
                     return list.get(index++);

                 assert state == DONE && index == list.size();
                 return null;
             }
         }
     }
 }

It can be slightly optimized. Most importantly, once state is DONE, consumers no longer need synchronized; a simple volatile read is enough, which can be achieved by declaring state as volatile.
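As a rough sketch of that optimization, the variant below adds a lock-free fast path to read(). The class name `VolatileCache` is hypothetical, and the sketch assumes that the producer never calls add() after finish() and that each View is used by a single consumer thread.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the cache with a volatile state field (class name is hypothetical).
public class VolatileCache {
    private static final int INIT = 0, DONE = 1, ERROR = 2;

    private final Object lock = new Object();
    private final List<byte[]> list = new ArrayList<>();
    private volatile int state = INIT;

    public void add(byte[] bytes) {
        synchronized (lock) {
            list.add(bytes);
            lock.notifyAll();
        }
    }

    // Assumption: add() is never called after finish().
    public void finish(boolean error) {
        synchronized (lock) {
            state = error ? ERROR : DONE; // volatile write after all adds
            lock.notifyAll();
        }
    }

    public View newView() {
        return new View();
    }

    // Assumption: each View is read by one consumer thread only.
    public class View {
        private int index;

        // Returns the next piece, or null for EOF.
        public byte[] read() throws Exception {
            if (state == DONE) {
                // Fast path: the list is no longer modified, so no lock is needed.
                return index < list.size() ? list.get(index++) : null;
            }
            synchronized (lock) {
                while (state == INIT && index == list.size()) {
                    lock.wait();
                }
                if (state == ERROR) {
                    throw new Exception("producer failed");
                }
                return index < list.size() ? list.get(index++) : null;
            }
        }
    }
}
```

The fast path relies on the volatile write in finish() happening after every add(): a consumer that observes DONE is then guaranteed to see the complete list.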

0
