Error handling for zipped observables

My use case: I get a list of permalinks, and for each permalink I need to issue two REST requests that each return part of the data. When both requests return, I want to combine their results and do something with them (here, print them). I want to do this with the zip operator. Here is my current code (along with mocks for the library I'm using):

    public class Main {
        public static void main(String[] args) {
            ContentManager cm = new ContentManager();
            Observable
                .from(cm.getPermalinks(10))
                .flatMap(permalink -> Observable.zip(
                    Observable.<Content>create(subscriber ->
                        cm.getDataByPermalink(permalink, new SubscribingRestCallback(subscriber))),
                    Observable.<Content>create(subscriber ->
                        cm.getStreamByPermalink(permalink, new SubscribingRestCallback(subscriber))),
                    (dataContent, streamUrlContent) -> {
                        if (dataContent == null || streamUrlContent == null) {
                            System.err.println("not zipping " + dataContent + " and " + streamUrlContent);
                            return Observable.empty();
                        }
                        return new Content(dataContent.permalink, dataContent.logoUrl, streamUrlContent.streamUrl);
                    }))
                .subscribe(System.out::println);
        }
    }

    class SubscribingRestCallback implements RestCallback {
        private final Subscriber<? super Content> subscriber;

        public SubscribingRestCallback(Subscriber<? super Content> subscriber) {
            this.subscriber = subscriber;
        }

        @Override
        public void onSuccess(Content content) {
            subscriber.onNext(content);
            subscriber.onCompleted();
        }

        @Override
        public void onFailure(int code, String message) {
            System.err.println(message);
            subscriber.onNext(null);
            subscriber.onCompleted();
        }
    }

    public class Content {
        public final String permalink;
        public final String logoUrl;
        public final String streamUrl;

        public Content(String permalink, String logoUrl, String streamUrl) {
            this.permalink = permalink;
            this.logoUrl = logoUrl;
            this.streamUrl = streamUrl;
        }

        @Override
        public String toString() {
            return String.format("Content [%s, %s, %s]", permalink, logoUrl, streamUrl);
        }
    }

    public interface RestCallback {
        void onSuccess(Content content);
        void onFailure(int code, String message);
    }

    class ContentManager {
        private final Random random = new Random();

        public List<String> getPermalinks(int n) {
            List<String> permalinks = new ArrayList<>(n);
            for (int i = 1; i <= n; ++i) {
                permalinks.add("perma_" + i);
            }
            return permalinks;
        }

        public void getDataByPermalink(String permalink, RestCallback callback) {
            getByPermalink(permalink, callback, false);
        }

        public void getStreamByPermalink(String permalink, RestCallback callback) {
            getByPermalink(permalink, callback, true);
        }

        private void getByPermalink(String permalink, RestCallback callback, boolean stream) {
            // simulate network latency and unordered results
            new Thread(() -> {
                try {
                    Thread.sleep(random.nextInt(1000) + 200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                if (random.nextInt(100) < 95) {
                    String logoUrl;
                    String streamUrl;
                    if (stream) {
                        logoUrl = null;
                        streamUrl = "http://" + permalink + "/stream";
                    } else {
                        logoUrl = "http://" + permalink + "/logo.png";
                        streamUrl = null;
                    }
                    callback.onSuccess(new Content(permalink, logoUrl, streamUrl));
                } else {
                    callback.onFailure(-1, permalink + " data failure");
                }
            }).start();
        }
    }

In general it works, but I don't like the error handling in this implementation. The REST requests may fail, in which case the onFailure method calls subscriber.onNext(null) so that zip always has something to work with (one request may fail while the other does not, and I don't know which one failed). Then, in the zip function, I need an if that checks that both values are non-null (the code dereferences both, so it would otherwise fail on a null partial Content).

I would like to filter out the null values with a filter operator somewhere, if possible. Or is there a better way than emitting null for the failure case that still works with the zip function?

rx-java
1 answer

First of all, the correct way to notify a Subscriber of an error is to call subscriber.onError:

    class SubscribingRestCallback implements RestCallback {
        private final Subscriber<? super Content> subscriber;

        public SubscribingRestCallback(Subscriber<? super Content> subscriber) {
            this.subscriber = subscriber;
        }

        @Override
        public void onSuccess(Content content) {
            subscriber.onNext(content);
            subscriber.onCompleted();
        }

        @Override
        public void onFailure(int code, String message) {
            subscriber.onError(new Exception(message));
        }
    }
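To see why signalling a failure as an error composes better than emitting null, here is a rough JDK-only analogy. It uses java.util.concurrent.CompletableFuture instead of RxJava, so it is a sketch of the idea rather than the RxJava API: the class name CallbackToFuture, the helper methods completing and fetchBoth, and the String-based RestCallback are all hypothetical stand-ins for the types in the question.

```java
import java.util.concurrent.CompletableFuture;

public class CallbackToFuture {
    // Hypothetical, simplified stand-in for the question's RestCallback.
    interface RestCallback {
        void onSuccess(String content);
        void onFailure(int code, String message);
    }

    // Adapter: success completes the future, failure completes it exceptionally.
    // This plays the role that subscriber.onError plays in the RxJava version.
    static RestCallback completing(CompletableFuture<String> future) {
        return new RestCallback() {
            @Override
            public void onSuccess(String content) {
                future.complete(content);
            }

            @Override
            public void onFailure(int code, String message) {
                future.completeExceptionally(new Exception(message));
            }
        };
    }

    // Simulates both requests for one permalink; streamFails forces the second one to fail.
    public static CompletableFuture<String> fetchBoth(String permalink, boolean streamFails) {
        CompletableFuture<String> data = new CompletableFuture<>();
        CompletableFuture<String> stream = new CompletableFuture<>();
        completing(data).onSuccess(permalink + "/logo.png");
        if (streamFails) {
            completing(stream).onFailure(-1, permalink + " stream failure");
        } else {
            completing(stream).onSuccess(permalink + "/stream");
        }
        // The combiner runs only when both parts succeeded, so it needs no null checks.
        return data.thenCombine(stream, (logo, url) -> logo + " + " + url);
    }

    public static void main(String[] args) {
        System.out.println(fetchBoth("perma_1", false).join());                    // perma_1/logo.png + perma_1/stream
        System.out.println(fetchBoth("perma_2", true).isCompletedExceptionally()); // true
    }
}
```

The point carried over to the RxJava code is that the combining function is never invoked for a failed pair, so the null check inside it disappears.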

Even if you do not want the entire stream to fail, you still need to call subscriber.onError(). The error can then be handled downstream with an error-handling operator. One of them is onErrorResumeNext:

    Observable
        .from(cm.getPermalinks(10))
        .flatMap(permalink -> Observable.zip(
            Observable.<Content>create(subscriber ->
                cm.getDataByPermalink(permalink, new SubscribingRestCallback(subscriber))),
            Observable.<Content>create(subscriber ->
                cm.getStreamByPermalink(permalink, new SubscribingRestCallback(subscriber))),
            (dataContent, streamUrlContent) -> {
                return new Content(dataContent.permalink, dataContent.logoUrl, streamUrlContent.streamUrl);
            }).onErrorResumeNext(Observable.empty()))
        .subscribe(System.out::println);

EDIT

I have a final question: if you look at my zip function, I return Observable.empty() if the two objects cannot be zipped, and otherwise I return a Content. This seems wrong. How should I handle such error conditions in the zip function?

Yes, returning Observable.empty() from the zip function is completely wrong. Throwing an exception from the zip function seems to be the best solution:

    Observable
        .from(cm.getPermalinks(10))
        .flatMap(permalink -> Observable.zip(
            Observable.<Content>create(subscriber ->
                cm.getDataByPermalink(permalink, new SubscribingRestCallback(subscriber))),
            Observable.<Content>create(subscriber ->
                cm.getStreamByPermalink(permalink, new SubscribingRestCallback(subscriber))),
            (dataContent, streamUrlContent) -> {
                // isDataValid is not defined here; it stands for whatever validation you need
                if (!isDataValid(dataContent, streamUrlContent)) {
                    throw new RuntimeException("Something went wrong.");
                }
                return new Content(dataContent.permalink, dataContent.logoUrl, streamUrlContent.streamUrl);
            }).onErrorResumeNext(Observable.empty()))
        .subscribe(System.out::println);
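The pattern above has one recovery point per pair: a failed source and an exception thrown from the combining function both end up on the same error path, and the pair is simply skipped. As a hedged illustration, the sketch below reproduces that shape with the JDK's CompletableFuture rather than RxJava. The names CombineAndRecover, combine, zipOrEmpty and demo are hypothetical, and an empty string stands in for "invalid data".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

public class CombineAndRecover {
    // Combines the two parts and throws when the pair is unusable,
    // mirroring the exception thrown from the zip function.
    static String combine(String logoUrl, String streamUrl) {
        if (logoUrl.isEmpty() || streamUrl.isEmpty()) {
            throw new IllegalStateException("Something went wrong.");
        }
        return logoUrl + " + " + streamUrl;
    }

    // Single recovery point: any error, whether from a source or from combine(),
    // becomes Optional.empty(). This is only a rough counterpart of
    // onErrorResumeNext(Observable.empty()).
    static CompletableFuture<Optional<String>> zipOrEmpty(
            CompletableFuture<String> data, CompletableFuture<String> stream) {
        return data.thenCombine(stream, CombineAndRecover::combine)
                .handle((content, error) -> error == null
                        ? Optional.of(content)
                        : Optional.<String>empty());
    }

    public static List<String> demo() {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new Exception("perma_2 data failure"));

        List<CompletableFuture<Optional<String>>> pairs = new ArrayList<>();
        // Both parts arrive and are valid.
        pairs.add(zipOrEmpty(CompletableFuture.completedFuture("logo1"),
                             CompletableFuture.completedFuture("stream1")));
        // One source failed.
        pairs.add(zipOrEmpty(failed, CompletableFuture.completedFuture("stream2")));
        // Both parts arrive, but combine() rejects them.
        pairs.add(zipOrEmpty(CompletableFuture.completedFuture("logo3"),
                             CompletableFuture.completedFuture("")));

        List<String> results = new ArrayList<>();
        for (CompletableFuture<Optional<String>> pair : pairs) {
            pair.join().ifPresent(results::add); // skipped pairs contribute nothing
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [logo1 + stream1]
    }
}
```

Only the first pair survives; the other two are dropped without any null ever reaching the combining function.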