Calling network services in parallel using RxJava. Is it correct?

The idea is to make 3 network calls in parallel. (I use Google as a service for demo purposes. The following steps, but are not sure if this is the right way or can be simplified. What if I need to combine the responses of all three queries?

public class GoogleSearchRx
{
    public static void main(String args[])
    {
        CountDownLatch latch = new CountDownLatch(3);

        search("RxJava").subscribeOn(Schedulers.io()).subscribe(
                links -> {
                    links.forEach(link -> out.println(currentThreadName() + "\t" + link.text()));
                    latch.countDown();
                },
                e -> {
                    out.println(currentThreadName() + "\t" + "Failed: " + e.getMessage());
                    latch.countDown();
                }
        );

        search("Reactive Extensions").subscribeOn(Schedulers.io()).subscribe(
                links -> {
                    links.forEach(link -> out.println(currentThreadName() + "\t" + link.text()));
                    latch.countDown();
                },
                e -> {
                    out.println(currentThreadName() + "\t" + "Failed: " + e.getMessage());
                    latch.countDown();
                }
        );

        //run the last one on current thread
        search("Erik Meijer").subscribe(
                links -> {
                    links.forEach(link -> out.println(currentThreadName() + "\t" + link.text()));
                    latch.countDown();
                },
                e -> {
                    out.println(currentThreadName() + "\t" + "Failed: " + e.getMessage());
                    latch.countDown();
                }
        );

        try
        {
            latch.await();
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
    }

    public static Observable<Elements> search(String q)
    {
        String google = "http://www.google.com/search?q=";

        String charset = "UTF-8";
        String userAgent = "ExampleBot 1.0 (+http://example.com/bot)"; // Change this to your company name and bot homepage!

        return Observable.create(new Observable.OnSubscribe<Elements>()
        {

            @Override public void call(Subscriber<? super Elements> subscriber)
            {
                out.println(currentThreadName() + "\tOnSubscribe.call");

                try
                {
                    Elements links = Jsoup.connect(google + URLEncoder.encode(q, charset)).timeout(1000).userAgent(userAgent).get().select("li.g>h3>a");
                    subscriber.onNext(links);
                }
                catch (IOException e)
                {
                    subscriber.onError(e);
                }
                subscriber.onCompleted();
            }
        });
    }
}
+4
source share
2 answers

Following the “combine the answers of all three searches” in the part of your question, you can search for Zip .

Observable<Elements> search1 = search("RxJava");
Observable<Elements> search2 = search("Reactive Extensions");
Observable<Elements> search3 = search("Eric Meijer");
Observable.zip(searc1, search2, search3,
            new Func3<Elements, Elements, Elements, Elements>() {
                @Override
                public Elements call(Elements result1, Elements result2, Elements result3) {
                    // Add all the results together...
                    return results;
                }
            }
    ).subscribeOn(Schedulers.io()).subscribe(
            links -> {
                links.forEach(link -> out.println(currentThreadName() + "\t" + link.text()));
                latch.countDown();
            },
            e -> {
                out.println(currentThreadName() + "\t" + "Failed: " + e.getMessage());
                latch.countDown();
            }
    );

, ( ) , .

, zip, 1..N Func1 Func9 FuncN, .

+5

, ( Jsoup), Observable.create ( Rx !)

( "back-of-sapkin", .)

final String google = "http://www.google.com/search?q=";

final String charset = "UTF-8";
final String userAgent = "ExampleBot 1.0 (+http://example.com/bot)"; // ...
Observable.just("RxJava", "Reactive Extensions", "Erik Meijer")
    .flatMap((query) -> Observable.defer(() -> {
        try {
            return Observable.from(Jsoup.connect(google + URLEncoder.encode(query, charset))
                .timeout(1000)
                .userAgent(userAgent)
                .get()
                .select("li.g>h3>a")).subscribeOn(Schedulers.io());
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }))
    .forEach(
        (link) -> out.println(link.text()),
        (e) -> out.println("Failed: " + e.getMessage()));

, , , . toSortedList, , Comparable, Func2, .

+2

All Articles