I am trying to use CloseableHttpAsyncClient to read from an endpoint, line up a string in Object (using javax.json), and then convert the array to an object in separate components:
CloseableHttpAsyncClient client = HttpAsyncClientBuilder.create().setDefaultCredentialsProvider(provider).build(); client.start(); Observable<ObservableHttpResponse> observable = ObservableHttp.createRequest(HttpAsyncMethods.createGet(uri), client) .toObservable(); Observable<JsonArray> shareable = observable.flatMap(response -> response.getContent().map(bb -> { String stringVal = new String(bb); StringReader reader = new StringReader(stringVal); JsonObject jobj = Json.createReader(reader).readObject(); return jobj.getJsonArray("elements"); })).share();
I need to get a Json Array, then filter out the array objects:
Observable<JsonObject> firstStream = shareable.filter(item -> item.getString("type").equals("TYPE_1")); Observable<JsonObject> secondStream = shareable.filter(item -> item.getString("type").equals("TYPE_2")); Observable<JsonObject> thirdStream = shareable.filter(item -> item.getString("type").equals("TYPE_3"));
How to convert Observable<JsonArray> to ObservableJsonObject> ?
Since this is async, I cannot use forEach to create some kind of array to buffer data.
UPDATE:
Thus, finding use of CloseableHttpAsyncClient may not be the best solution for what I'm trying to achieve. I realized this morning (at the heart of all things) that I am trying to process data asynchronously, then to make asynchronous calls.
Ideally, calling CloseableHttpClient (synchronizing) and passing data to Observable for filtering would be a more ideal approach (I don't need the first call to manage more than one HTTP call).
CloseableHttpClient client = HttpClientBuilder.create().setDefaultCredentialsProvider(provider).build(); StringBuffer result = new StringBuffer(); try { HttpGet request = new HttpGet(uri); HttpResponse response = client.execute(request); BufferedReader rd = new BufferedReader( new InputStreamReader(response.getEntity().getContent())); String line; while ((line = rd.readLine()) != null) { result.append(line); } } catch(ClientProtocolException cpe) { } catch(IOException ioe) { } StringReader reader = new StringReader(result.toString()); JsonObject jobj = Json.createReader(reader).readObject(); JsonArray elements = jobj.getJsonArray("elements"); List<JsonObject> objects = elements.getValuesAs(JsonObject.class); Observable<JsonObject> shareable = Observable.from(objects).share(); Observable<JsonObject> firstStream = shareable.filter(item -> item.getString("type").equals("TYPE_1")); Observable<JsonObject> secondStream = shareable.filter(item -> item.getString("type").equals("TYPE_2")); Observable<JsonObject> thirdStream = shareable.filter(item -> item.getString("type").equals("TYPE_3")); firstStream.subscribe(record -> {
source share