RxJava: InterruptedIOException

I wrote some code to download a file from a server while updating a progress bar. The download code runs on the Schedulers.io() thread, and the UI is updated on AndroidSchedulers.mainThread(). My program terminates as soon as the download starts. Here is my code:

    Observable
            .create(new Observable.OnSubscribe<String>() {
                @Override
                public void call(Subscriber<? super String> subscriber) {
                    try {
                        Response response = getResponse(url);
                        if (response != null && response.isSuccessful()) {
                            InputStream is = response.body().byteStream();
                            subscriber.onNext(response.body().contentLength()); // init progress
                            File storedFile = Utils.getStoredFile(context, filePath);
                            OutputStream os = new FileOutputStream(storedFile);
                            byte[] buffer = new byte[1024];
                            int len;
                            while ((len = is.read(buffer)) != -1) {
                                // write data
                                os.write(buffer, 0, len);
                                count += len;
                                subscriber.onNext(count); // update progress
                            }
                            if (!subscriber.isUnsubscribed()) {
                                subscriber.onCompleted();
                            }
                            os.close();
                            is.close();
                            response.body().close();
                        }
                    } catch (InterruptedException e) {
                        subscriber.onError(e);
                    }
                }
            })
            .subscribeOn(Schedulers.io()) // io and network operation
            .observeOn(AndroidSchedulers.mainThread()) // UI view update operation
            .subscribe(new Observer<Long>() {
                @Override
                public void onCompleted() {
                    Log.d(TAG, "onCompleted -> " + Thread.currentThread().getName());
                }

                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "onError -> " + e.getMessage());
                }

                @Override
                public void onNext(Long progress) {
                    Log.d(TAG, "onNext -> " + Thread.currentThread().getName());
                    Log.d(TAG, "onNext progress -> " + progress);
                    // here update view in ui thread
                }
            });

And here is the error text:

    java.io.InterruptedIOException: thread interrupted
        at okio.Timeout.throwIfReached(Timeout.java:145)
        at okio.Okio$2.read(Okio.java:136)
        at okio.AsyncTimeout$2.read(AsyncTimeout.java:211)
        at okio.RealBufferedSource.read(RealBufferedSource.java:50)
        at com.squareup.okhttp.internal.http.HttpConnection$FixedLengthSource.read(HttpConnection.java:418)
        at okio.RealBufferedSource$1.read(RealBufferedSource.java:371)
        at java.io.InputStream.read(InputStream.java:163)
        at com.eldorado.rxfiledownloaddemo.presenter.Presenter$1.call(Presenter.java:74)
        at com.eldorado.rxfiledownloaddemo.presenter.Presenter$1.call(Presenter.java:52)
        at rx.Observable.unsafeSubscribe(Observable.java:8098)
        at rx.internal.operators.OperatorSubscribeOn$1$1.call(OperatorSubscribeOn.java:62)
        at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executor
        at java.util.concurrent.FutureTask.run(FutureTask.java:23
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:153)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:267)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1080)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:573)
        at java.lang.Thread.run(Thread.java:841)
1 answer

observeOn is applied to the Observable returned by Observable.create, but inside it you create a new observable on another thread. As a result, your pipeline never hands its emissions over to the main thread. I think your code is more complicated than it needs to be for what you want to achieve.
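To make the thread handoff concrete, here is a minimal sketch that does not use RxJava at all. It uses plain java.util.concurrent executors as stand-ins for the two schedulers: the copy loop runs on one thread (roughly the role of subscribeOn), and every progress event is delivered on a second thread (roughly the role of observeOn). The class name SchedulerHandoffSketch, the thread names "io-thread" and "ui-thread", and the in-memory streams that replace the HTTP response and the file are all assumptions made for illustration; this is not how RxJava is implemented internally.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;

// Hypothetical illustration class; not part of RxJava or of the question's project.
class SchedulerHandoffSketch {

    /** Copies in to out and reports the running byte count after each chunk. */
    static long copy(InputStream in, OutputStream out, int bufferSize, LongConsumer onProgress)
            throws IOException {
        byte[] buffer = new byte[bufferSize];
        long count = 0;
        int len;
        while ((len = in.read(buffer)) != -1) {
            out.write(buffer, 0, len);
            count += len;
            onProgress.accept(count);
        }
        return count;
    }

    /**
     * Runs the copy on "io-thread" and delivers every event on "ui-thread".
     * Returns the events in the order the ui thread recorded them.
     */
    static List<String> run(byte[] payload, int bufferSize) {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        ExecutorService ui = Executors.newSingleThreadExecutor(r -> new Thread(r, "ui-thread"));
        List<String> events = new ArrayList<>(); // only the ui thread writes to this list

        io.execute(() -> {
            // In-memory streams stand in for the network body and the target file.
            try (InputStream in = new ByteArrayInputStream(payload);
                 OutputStream out = new ByteArrayOutputStream()) {
                copy(in, out, bufferSize, count ->
                        ui.execute(() -> events.add(Thread.currentThread().getName() + " onNext " + count)));
                ui.execute(() -> events.add(Thread.currentThread().getName() + " onCompleted"));
            } catch (IOException e) {
                ui.execute(() -> events.add(Thread.currentThread().getName() + " onError " + e));
            }
        });

        try {
            // Wait for the producer first, so all ui tasks are queued before ui shuts down.
            io.shutdown();
            io.awaitTermination(5, TimeUnit.SECONDS);
            ui.shutdown();
            ui.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return events;
    }

    public static void main(String[] args) {
        run(new byte[2500], 1024).forEach(System.out::println);
    }
}
```

The point of the sketch is that the code doing the blocking read never touches the consumer directly: it only posts events to the other executor, so the consumer always observes them on its own thread and in order.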

Just in case, here is an example that may help you understand how Schedulers work:

https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/scheduler/ObservableAsynchronous.java
