Observer Design Pattern

In the observer design pattern, the subject notifies all observers by invoking the update() operation of each observer. One way to do this is

    // Named notifyObservers() here because Object.notify() is final in Java.
    void notifyObservers() {
        for (Observer observer : observers) {
            observer.update(this);
        }
    }

But the problem here is that the observers are updated in sequence, so the update operation of an observer cannot be called until all observers before it have been updated. If one observer has an infinite loop in its update, then all observers after it will never be notified.

Question:

  • Is there any way around this problem?
  • If so, what would be a good example?
+4
7 answers

Classic design patterns do not involve concurrency or threads. You would need to create N threads for N observers. Be careful, though: their interaction with `this` must be thread-safe.
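A minimal sketch of the thread-per-observer idea, under some assumptions: the names `CounterSubject`, `CounterObserver` and `setValueAndNotify` are hypothetical and not from the question, and the subject's state is kept in an `AtomicInteger` so that observers running on other threads can read it safely.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical observer interface for this sketch.
interface CounterObserver {
    void update(CounterSubject subject);
}

// Hypothetical subject. Its state is an AtomicInteger and its observer list
// is a CopyOnWriteArrayList, so both can be touched from several threads.
class CounterSubject {
    private final List<CounterObserver> observers = new CopyOnWriteArrayList<>();
    private final AtomicInteger value = new AtomicInteger();

    void addObserver(CounterObserver observer) {
        observers.add(observer);
    }

    int getValue() {
        return value.get();
    }

    // Changes the state, then starts one thread per observer (N threads for N observers).
    // Returns the started threads so that a caller may join them if it wants to wait.
    List<Thread> setValueAndNotify(int newValue) {
        value.set(newValue);
        List<Thread> threads = new ArrayList<>();
        for (CounterObserver observer : observers) {
            Thread thread = new Thread(() -> observer.update(this));
            thread.setDaemon(true); // a stuck observer should not keep the JVM alive
            thread.start();
            threads.add(thread);
        }
        return threads;
    }
}
```

An observer that never returns only blocks its own thread here; the others still run. The cost is one new thread per observer per notification.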

+10

The problem is the infinite loop, not the one-after-another notifications.

If you wanted things to update at the same time, you would have to fire them off on different threads, in which case each listener would need to synchronize with the others in order to access the object that fired the event.

Complaining that one infinite loop stops the other updates from happening is like complaining that taking a lock and then going into an infinite loop stops others from accessing the locked object: the problem is the infinite loop, not the lock manager.

+20

You can use the java.util.concurrent.Executors.newFixedThreadPool(int nThreads) method and then call the invokeAll method (you can use the overload that takes a timeout to avoid the infinite loop).

You would modify your loop to create a class that is Callable, which takes the "observer" and "this" and then calls the update method in its "call" method.

Take a look at the java.util.concurrent package for more information.

This is a quick and dirty implementation of what I was talking about:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.CopyOnWriteArrayList;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class Main {
        private Main() {
        }

        public static void main(final String[] argv) {
            final Watched watched = new Watched();
            makeWatchers(watched, 10);
            // Watcher 10 sleeps for 10 seconds, so it exceeds the 9-second timeout.
            watched.notifyWatchers(9);
        }

        private static List<Watcher> makeWatchers(final Watched watched, final int count) {
            final List<Watcher> watchers = new ArrayList<Watcher>(count);
            for (int i = 0; i < count; i++) {
                final Watcher watcher = new Watcher(i + 1);
                watched.addWatcher(watcher);
                watchers.add(watcher);
            }
            return watchers;
        }
    }

    class Watched {
        private final List<Watcher> watchers = new ArrayList<Watcher>();

        public void addWatcher(final Watcher watcher) {
            watchers.add(watcher);
        }

        public void notifyWatchers(final int seconds) {
            // Work on a snapshot of the watcher list.
            final List<Watcher> currentWatchers = new CopyOnWriteArrayList<Watcher>(watchers);
            final List<WatcherCallable> callables = new ArrayList<WatcherCallable>(currentWatchers.size());
            for (final Watcher watcher : currentWatchers) {
                callables.add(new WatcherCallable(watcher));
            }

            // One thread per watcher, so a slow watcher cannot delay the others.
            final ExecutorService service = Executors.newFixedThreadPool(callables.size());
            try {
                // Returns when every update has finished or the timeout expires;
                // updates still running at the timeout are cancelled (interrupted).
                service.invokeAll(callables, seconds, TimeUnit.SECONDS);
                // shutdown() must come first, or awaitTermination() only waits out its timeout.
                service.shutdown();
                final boolean value = service.awaitTermination(seconds, TimeUnit.SECONDS);
                System.out.println("done: " + value);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            } finally {
                service.shutdown();
            }
            System.out.println("leaving");
        }

        private class WatcherCallable implements Callable<Void> {
            private final Watcher watcher;

            WatcherCallable(final Watcher w) {
                watcher = w;
            }

            public Void call() {
                watcher.update(Watched.this);
                return null;
            }
        }
    }

    class Watcher {
        private final int value;

        Watcher(final int val) {
            value = val;
        }

        public void update(final Watched watched) {
            try {
                Thread.sleep(value * 1000L);
            } catch (InterruptedException ex) {
                System.out.println(value + " interrupted");
            }
            System.out.println(value + " done");
        }
    }
+5

I would be more concerned about an observer throwing an exception than about it looping endlessly. In that case, your current implementation would not notify the remaining observers.
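One way to guard against this, sketched under the assumption of a hypothetical `SafeSubject` and `SafeObserver` (neither name comes from the question), is to catch exceptions per observer and collect them, so that a failing observer does not stop the loop:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical observer interface for this sketch.
interface SafeObserver {
    void update(SafeSubject subject);
}

// Hypothetical subject that keeps notifying even when an observer throws.
class SafeSubject {
    private final List<SafeObserver> observers = new ArrayList<>();

    void addObserver(SafeObserver observer) {
        observers.add(observer);
    }

    // Notifies every observer and returns the exceptions that were thrown,
    // so the caller can log or rethrow them once everyone has been notified.
    List<RuntimeException> notifyObservers() {
        List<RuntimeException> failures = new ArrayList<>();
        // Iterate over a copy so observers may register or unregister during update().
        for (SafeObserver observer : new ArrayList<>(observers)) {
            try {
                observer.update(this);
            } catch (RuntimeException ex) {
                failures.add(ex); // one failing observer does not stop the rest
            }
        }
        return failures;
    }
}
```

This only addresses exceptions; an observer that never returns still blocks the loop.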

+3

1. Is there a way around this problem?

Yes, make sure that the observer works properly and returns in a timely fashion.

2. Can someone please explain this with an example?

Sure:

    class ObserverImpl implements Observer {
        public void update(Object state) {
            // Remove the infinite loop:
            // while (true) {
            //     doSomething();
            // }

            // and use some kind of control instead:
            int iterationControl = 100;
            int currentIteration = 0;
            while (currentIteration++ < iterationControl) {
                doSomething();
            }
        }

        private void doSomething() {}
    }

This prevents a given loop from going on endlessly (if it makes sense for your case, it runs at most 100 times).

Another mechanism is to start the new task in a second thread, but if it goes into an infinite loop, every notification leaves another thread running forever, and it will eventually consume all of the system memory:

    class ObserverImpl implements Observer {
        public void update(Object state) {
            new Thread(new Runnable() {
                public void run() {
                    while (true) {
                        doSomething();
                    }
                }
            }).start();
        }

        private void doSomething() {}
    }

That will make the observer instance return immediately, but it is only an illusion; what you actually have to do is avoid the infinite loop.

Finally, if your observers work fine but you just want to notify them all sooner, you can take a look at this related question: Call the code after all mouse event listeners are executed.

+2

All observers get notified; that is the only guarantee you get.

If you want to implement some fancy ordering, you can do this:

  • Connect only one observer;
  • this primary observer notifies its friends in an order that you define in code or by some other means.

This takes you away from the classic Observer pattern, in that your listeners are hardwired, but if it is what you need ... do it!
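The single-observer idea could look roughly like this. It is a sketch only; `OrderedObserver`, `DispatchingObserver` and `then` are hypothetical names chosen for illustration:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical observer interface for this sketch.
interface OrderedObserver {
    void update(Object subject);
}

// The only observer registered with the subject. It forwards each
// notification to its delegates in the order in which they were added.
class DispatchingObserver implements OrderedObserver {
    private final List<OrderedObserver> delegates = new ArrayList<>();

    // Appends a delegate; returns this so calls can be chained.
    DispatchingObserver then(OrderedObserver delegate) {
        delegates.add(delegate);
        return this;
    }

    @Override
    public void update(Object subject) {
        for (OrderedObserver delegate : delegates) {
            delegate.update(subject);
        }
    }
}
```

The subject would register one `DispatchingObserver` built with, for example, `new DispatchingObserver().then(a).then(b)`, and the notification order is then fixed by that chain rather than by the subject.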

0

If you have an observer with an "infinite loop", it is no longer really the observer pattern.

You could start a different thread for each observer, but then the observers MUST be prohibited from changing the state of the observed object.

The simplest (and stupidest) method would be to just take your example and make it threaded.

    // Named notifyObservers() because Object.notify() is final in Java.
    void notifyObservers() {
        for (Observer observer : observers) {
            // In a lambda, "this" still refers to the subject, not to the new thread.
            new Thread(() -> observer.update(this)).start();
        }
    }

(this was coded by hand, is untested and probably has a bug or five, and it is a bad idea anyway)

The problem with this is that it will make your machine chug, since it has to allocate a bunch of new threads at once.

So, to fix the problem of all the threads starting at once, use a ThreadPoolExecutor, because it will A) reuse threads and B) can limit the maximum number of threads running.

This does not help in your "Loop forever" case, since each forever loop will permanently occupy one of the threads in your pool.
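A bounded pool might be wired in along these lines. This is a sketch under assumptions: `PooledSubject`, `PooledObserver` and `shutdownAndWait` are hypothetical names, and `Executors.newFixedThreadPool` is used as a convenient way to get a pool with a fixed upper bound on threads.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical observer interface for this sketch.
interface PooledObserver {
    void update(PooledSubject subject);
}

// Hypothetical subject that hands each update to a bounded thread pool.
class PooledSubject {
    private final List<PooledObserver> observers = new CopyOnWriteArrayList<>();
    private final ExecutorService pool;

    // At most maxThreads updates run at once; the rest wait in the pool's queue.
    PooledSubject(int maxThreads) {
        pool = Executors.newFixedThreadPool(maxThreads);
    }

    void addObserver(PooledObserver observer) {
        observers.add(observer);
    }

    // Returns immediately; the updates run on pool threads, which are reused.
    void notifyObservers() {
        for (PooledObserver observer : observers) {
            pool.execute(() -> observer.update(this));
        }
    }

    // Stops accepting new work and waits for queued updates to finish.
    // Returns false on timeout or if the waiting thread was interrupted.
    boolean shutdownAndWait(long timeout, TimeUnit unit) {
        pool.shutdown();
        try {
            return pool.awaitTermination(timeout, unit);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

With `maxThreads` observers stuck in endless loops, every pool thread is occupied and later updates sit in the queue indefinitely, which is the limitation described above.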

Your best bet is to not allow them to loop forever or, if they must, to have them create their own thread.

If you have to support classes that cannot be changed, but you can identify which ones will run quickly and which ones will run "Forever" (which in computer terms, I think, means more than a second or two), then you COULD use a loop like this:

    // Named notifyObservers() because Object.notify() is final in Java.
    void notifyObservers() {
        for (Observer observer : observers) {
            if (willUpdateQuickly(observer)) {
                observer.update(this);
            } else {
                // In a lambda, "this" still refers to the subject, not to the new thread.
                new Thread(() -> observer.update(this)).start();
            }
        }
    }

Hey, if it actually "Loops forever", will it consume a thread for every notification? It really sounds like you may have to spend some more time on your design.

0