First object in a Set<Future<Object>> that satisfies a predicate

Abstract idea

I want to get the first value produced by a set of futures that satisfies a given predicate.

If a satisfactory value is found, all other futures should be canceled. If all futures have completed and no value satisfies the predicate, execution should stop (by returning a default value or throwing an exception).

Specific example

public boolean isThereVacantHotelRooms(Set<URL> hotelApiUrls) {
    // returns true if any provided server responds with a number larger than 0
}

I am looking for a clean way to implement the above in Java 8 (external libraries are fine). I tried implementing it with CompletableFuture as well as RxJava, but both felt very non-idiomatic for this problem, and I ended up with a lot of ugly code.
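For reference, the behaviour described above can be sketched with plain CompletableFuture roughly as follows. This is only a minimal sketch under assumptions, not a definitive implementation: the class name FirstMatch, the method firstMatching and the defaultValue parameter are hypothetical names introduced for illustration, and a future that fails is simply treated as "no match".

```java
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

// Hypothetical helper class, not part of any library.
final class FirstMatch {

    /**
     * Returns a future that completes with the first value satisfying the
     * predicate. If every input future finishes (normally or exceptionally)
     * without a match, the returned future completes with defaultValue.
     */
    static <T> CompletableFuture<T> firstMatching(
            Collection<CompletableFuture<T>> futures,
            Predicate<? super T> predicate,
            T defaultValue) {

        CompletableFuture<T> result = new CompletableFuture<>();
        if (futures.isEmpty()) {
            result.complete(defaultValue);
            return result;
        }

        // Counts the futures that have not finished yet.
        AtomicInteger remaining = new AtomicInteger(futures.size());

        for (CompletableFuture<T> future : futures) {
            future.whenComplete((value, error) -> {
                // Assumption: a failed (or cancelled) future counts as "no match".
                if (error == null && predicate.test(value)) {
                    result.complete(value);
                }
                // The last future to finish supplies the default;
                // complete() is a no-op if a match was already recorded.
                if (remaining.decrementAndGet() == 0) {
                    result.complete(defaultValue);
                }
            });
        }

        // Once the outcome is known, cancel whatever is still pending.
        result.whenComplete((value, error) ->
                futures.forEach(f -> f.cancel(true)));

        return result;
    }
}
```

For the hotel example, the call could look like firstMatching(roomCounts, n -> n > 0, 0).join() > 0, where roomCounts is an assumed collection of CompletableFuture<Integer>. Note that CompletableFuture.cancel only marks the future as cancelled; it does not interrupt the underlying HTTP request, so real cancellation would still need support from the HTTP client.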


With RxJava, something like this:

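// One Observable per hotel API endpoint
List<Observable<HotelInfo>> hotels = new ArrayList<>();
for (URL u : urls) {
    Observable<HotelInfo> hotelInfo = networkAPI.askHotel(u);
    hotels.add(hotelInfo);
}
Observable.merge(hotels)            // emit results as they arrive
    .filter(h -> h.vacancy > 0)     // keep only hotels with vacant rooms
    .take(1)                        // stop after the first match; the remaining sources are unsubscribed
    .subscribe(h -> System.out.println("Winner: " + h), Throwable::printStackTrace);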

Here is a version using bayou's Async API:

// throw exception for failure
public void checkVacantHotelRooms(Set<URL> hotelApiUrls) throws Exception
{
    checkVacantHotelRoomsAsync(hotelApiUrls)  // Async<Void>
        .timeout(Duration.ofSeconds(10))      // cancel on 10s
        .sync();    //  async -> sync
}

public Async<Void> checkVacantHotelRoomsAsync(Set<URL> hotelApiUrls)
{
    Stream<Async<Void>> resultStream = hotelApiUrls.stream()    // Stream<URL>
        .map(this::getResponseBodyAsync)                        // Stream<Async<String>>
        .map(asyncBody->asyncBody.map(this::checkResponse));    // Stream<Async<Void>>

    return AsyncBundle.anyOf(resultStream); 
    // succeeds if one result succeeds; others will be cancelled
}

Void checkResponse(String responseBody) throws Exception
{
    if(responseBody.contains("something"))
        return (Void)null;
    throw new Exception("none in this hotel");
}

-----

HttpClient httpClient = new HttpClient();
int maxBodyLength = 1000;

Async<String> getResponseBodyAsync(URL url)
{
    Async<HttpResponse> asyncResponse = httpClient.doGet(url.toExternalForm());
    return asyncResponse.then(resp->resp.bodyString(maxBodyLength));
}