Convert String to array into objects in observable

I am trying to use CloseableHttpAsyncClient to read from an endpoint, line up a string in Object (using javax.json), and then convert the array to an object in separate components:

 CloseableHttpAsyncClient client = HttpAsyncClientBuilder.create().setDefaultCredentialsProvider(provider).build(); client.start(); Observable<ObservableHttpResponse> observable = ObservableHttp.createRequest(HttpAsyncMethods.createGet(uri), client) .toObservable(); Observable<JsonArray> shareable = observable.flatMap(response -> response.getContent().map(bb -> { String stringVal = new String(bb); StringReader reader = new StringReader(stringVal); JsonObject jobj = Json.createReader(reader).readObject(); return jobj.getJsonArray("elements"); })).share(); 

I need to get a Json Array, then filter out the array objects:

 Observable<JsonObject> firstStream = shareable.filter(item -> item.getString("type").equals("TYPE_1")); Observable<JsonObject> secondStream = shareable.filter(item -> item.getString("type").equals("TYPE_2")); Observable<JsonObject> thirdStream = shareable.filter(item -> item.getString("type").equals("TYPE_3")); 

How to convert Observable<JsonArray> to ObservableJsonObject> ?

Since this is async, I cannot use forEach to create some kind of array to buffer data.

UPDATE:

Thus, finding use of CloseableHttpAsyncClient may not be the best solution for what I'm trying to achieve. I realized this morning (at the heart of all things) that I am trying to process data asynchronously, then to make asynchronous calls.

Ideally, calling CloseableHttpClient (synchronizing) and passing data to Observable for filtering would be a more ideal approach (I don't need the first call to manage more than one HTTP call).

  CloseableHttpClient client = HttpClientBuilder.create().setDefaultCredentialsProvider(provider).build(); StringBuffer result = new StringBuffer(); try { HttpGet request = new HttpGet(uri); HttpResponse response = client.execute(request); BufferedReader rd = new BufferedReader( new InputStreamReader(response.getEntity().getContent())); String line; while ((line = rd.readLine()) != null) { result.append(line); } } catch(ClientProtocolException cpe) { } catch(IOException ioe) { } StringReader reader = new StringReader(result.toString()); JsonObject jobj = Json.createReader(reader).readObject(); JsonArray elements = jobj.getJsonArray("elements"); List<JsonObject> objects = elements.getValuesAs(JsonObject.class); Observable<JsonObject> shareable = Observable.from(objects).share(); Observable<JsonObject> firstStream = shareable.filter(item -> item.getString("type").equals("TYPE_1")); Observable<JsonObject> secondStream = shareable.filter(item -> item.getString("type").equals("TYPE_2")); Observable<JsonObject> thirdStream = shareable.filter(item -> item.getString("type").equals("TYPE_3")); firstStream.subscribe(record -> { //connect to SOTS/Facebook and store the results ByteArrayOutputStream baos = new ByteArrayOutputStream(); Json.createWriter(baos).writeObject(record); System.out.println(baos.toString()); }); secondStream.subscribe(record -> { ByteArrayOutputStream baos = new ByteArrayOutputStream(); Json.createWriter(baos).writeObject(record); System.out.println(baos.toString()); }); thirdStream.subscribe(record -> { ByteArrayOutputStream baos = new ByteArrayOutputStream(); Json.createWriter(baos).writeObject(record); System.out.println(baos.toString()); }); 
+5
source share
2 answers

Try this code:

  String myjson = "{\"elements\": [{\"text\":\"Obj1\"},{\"text\":\"Obj2\"}, {\"text\":\"Obj3\"}]}"; Observable.just(myjson) .map(jsonStr -> new StringReader(myjson)) .map(reader -> Json.createReader(reader).readObject()) .map(jobj -> jobj.getJsonArray("elements")) .map(elements -> elements.toArray(new JsonObject[elements.size()])) .flatMap(jsonObjects -> Observable.from(jsonObjects)) .subscribe( (jsonObject) -> System.out.println(jsonObject.getString("text")), throwable -> throwable.printStackTrace(), () -> System.out.println("On complete")); 

Result:

07-22 12: 19: 22.362 8032-8032 / com.mediamanagment.app I / System.out: Obj1
07-22 12: 19: 22.362 8032-8032 / com.mediamanagment.app I / System.out: Obj2
07-22 12: 19: 22.362 8032-8032 / com.mediamanagment.app I / System.out: Obj3

Note:
You should use this dependency:

 compile 'org.glassfish:javax.json:1.0.4' 

Instead of this:

 compile 'javax.json:javax.json-api:1.0' 

If you use 'javax.json:javax.json-api:1.0' , you will get javax.json.JsonException: Provider org.glassfish.json.JsonProviderImpl not found in step:

 .map(reader -> Json.createReader(reader).readObject()) 

In this case, use 'org.glassfish:javax.json:1.0.4'

UPDATE: Also, instead of

 .flatMap(jsonObjects -> Observable.from(jsonObjects)) 

You can use flatMapIterable( ) :

 .flatMapIterable(jsonObjects -> jsonObjects) 
+2
source

You can use another call to flatMap() instead of the map() call you use. Then use Observable.create() to emit JsonObject s

 Observable<JsonObject> shareable = observable .flatMap(response -> response.getContent() .flatMap(bb -> { String stringVal = new String(bb); StringReader reader = new StringReader(stringVal); JsonObject jobj = Json.createReader(reader).readObject(); JsonArray elements = jobj.getJsonArray("elements"); return Observable.create(subscriber -> { for (int i = 0; i < elements.length(); i++) { subscriber.onNext(elements.getJSONObject(i)); } subscriber.onCompleted(); }); })) .share(); 
0
source

All Articles