How to get the last item of a sequence when calling rx.Observable.sample()?

I am downloading a file from a remote URL and using RxJava to report the download progress. The file writer Observable emits a sequence of DownloadProgress objects. Because it emits many items, I use Observable.sample() to throttle them. This works well: UI updates arrive at a steady rate and there are no backpressure problems, but the final progress update is almost always skipped.

I would like to receive the last item in the Observable sequence so that I can update the UI with the final progress. What is the best way to ensure that the last item in an Observable sequence is always emitted?

Observable<Response> fileReader =
        Rx.okHttpGetRequest(url);
OkHttpResponseWriter fileWriter =
        Rx.okHttpResponseWriter(outFile);

Subscription subscription = fileReader.flatMap(fileWriter)
        .sample(1, TimeUnit.SECONDS)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Subscriber<DownloadProgress>() {
            @Override
            public void onCompleted() {}
            @Override
            public void onError(Throwable e) {}
            @Override
            public void onNext(DownloadProgress progress) {
                // I want to receive an update every one second
                // I also want to always receive the last progress update
            }
        });

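One option is to use buffer instead of sample and take the last element of each emitted list. When the source completes, the remaining partial buffer is still emitted, so the final item is not lost.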

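private Subscriber<List<Integer>> loggingSubscriber2 = new SimpleSubscriber<List<Integer>>() {
    @Override
    public void onNext(List<Integer> integers) {
        // A time window with no emissions yields an empty list, so guard before indexing.
        if (!integers.isEmpty()) {
            Log.v(TAG, String.valueOf(integers.get(integers.size() - 1)));
        }
    }
};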

private void startObservableTests() {
    Observable<Integer> fileObserver = Observable.create(
            new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    for (int i = 1; i <= 9; i++) {
                        subscriber.onNext(i);
                        try {
                            Thread.sleep(100);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    subscriber.onCompleted();
                }
            }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());

    fileObserver.buffer(500, TimeUnit.MILLISECONDS).subscribe(loggingSubscriber2);
}
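
To make the difference concrete, here is a minimal, dependency-free sketch that models the two behaviours on the same test sequence (1 to 9, one item every 100 ms, 500 ms period). It is not RxJava code: the class and method names are hypothetical, the "sample" model reproduces the behaviour reported in the question (the item pending at completion is dropped), and the "buffer" model assumes the final partial buffer is flushed on completion.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, not part of RxJava.
final class SampleVsBufferModel {

    // Models the sampling reported in the question: at every tick, emit the
    // newest item seen since the previous tick; an item still pending when
    // the source completes is dropped.
    static List<Integer> sampleDroppingTail(long[] timesMs, int[] items, long periodMs) {
        return run(timesMs, items, periodMs, false);
    }

    // Models buffer(period) followed by "take the last element of each list",
    // assuming the final partial buffer is flushed on completion.
    static List<Integer> bufferTakeLast(long[] timesMs, int[] items, long periodMs) {
        return run(timesMs, items, periodMs, true);
    }

    private static List<Integer> run(long[] timesMs, int[] items, long periodMs,
                                     boolean flushOnComplete) {
        List<Integer> out = new ArrayList<>();
        long nextTick = periodMs;
        Integer pending = null;
        for (int i = 0; i < items.length; i++) {
            // Fire every tick that elapsed before this item arrived.
            while (timesMs[i] >= nextTick) {
                if (pending != null) {
                    out.add(pending);
                    pending = null;
                }
                nextTick += periodMs;
            }
            pending = items[i];
        }
        // The source is assumed to complete before the next tick fires.
        if (flushOnComplete && pending != null) {
            out.add(pending);
        }
        return out;
    }

    public static void main(String[] args) {
        long[] times = new long[9];
        int[] items = new int[9];
        for (int i = 0; i < 9; i++) {
            times[i] = i * 100L; // one item every 100 ms, as in the test above
            items[i] = i + 1;
        }
        System.out.println(sampleDroppingTail(times, items, 500)); // [5]
        System.out.println(bufferTakeLast(times, items, 500));     // [5, 9]
    }
}
```

In this model the only difference between the two variants is whether the pending item is flushed when the source completes, which is exactly the property the question asks for.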

Another option is to keep sample and request the last item once the sequence completes:

private void startObservableTests() {

    Observable<Integer> fileObserver = Observable.create(
            new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    for (int i = 1; i <= 9; i++) {
                        subscriber.onNext(i);
                        try {
                            Thread.sleep(100);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    subscriber.onCompleted();
                }
            }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());


    fileObserver.sample(500, TimeUnit.MILLISECONDS).subscribe(new SuperSubscriber(fileObserver));
}

private class SuperSubscriber extends Subscriber<Integer> {

    Observable<Integer> obs;

    public SuperSubscriber(Observable<Integer> fileObserver) {
        obs = fileObserver;
    }

    @Override
    public void onCompleted() {
        // Note: fileObserver is cold, so this second subscription runs the
        // source again from the start before last() can emit.
        obs.last().subscribe(new SimpleSubscriber<Integer>() {
            @Override
            public void onNext(Integer o) {
                Log.v(TAG, "final value was " + o);
            }
        });
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onNext(Integer o) {
        Log.v(TAG, "got a " + o);
    }
}

Output:

10-01 15:18:36.893 V/TAG  (26129): got a 5
10-01 15:18:38.214 V/TAG  (26129): final value was 9
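
Because onCompleted() above subscribes to fileObserver a second time, the source is executed again, which for a real download would mean repeating the request. A possible alternative is to remember the newest item as it passes through and forward it on completion. The sketch below shows that idea in plain Java; ThrottleKeepLast is a hypothetical helper, not an RxJava class, and it throttles on item arrival rather than on a timer, so the intermediate values differ slightly from what sample would pick.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper (not an RxJava class): forwards at most one item per
// period and always forwards the final item when the source completes.
final class ThrottleKeepLast<T> {
    private final long periodMs;
    private final Consumer<T> sink;
    private long nextEmitAtMs;
    private T pending;          // newest item not yet forwarded
    private boolean hasPending;

    ThrottleKeepLast(long periodMs, long startMs, Consumer<T> sink) {
        this.periodMs = periodMs;
        this.sink = sink;
        this.nextEmitAtMs = startMs + periodMs;
    }

    // Call for every item; the clock value is passed in so the logic is
    // deterministic and easy to test.
    synchronized void onNext(T item, long nowMs) {
        pending = item;
        hasPending = true;
        if (nowMs >= nextEmitAtMs) {
            flush();
            nextEmitAtMs = nowMs + periodMs;
        }
    }

    // Call once when the source completes: forwards the last item if it has
    // not been forwarded yet, without touching the source again.
    synchronized void onCompleted() {
        if (hasPending) {
            flush();
        }
    }

    private void flush() {
        sink.accept(pending);
        pending = null;
        hasPending = false;
    }

    public static void main(String[] args) {
        List<Integer> seen = new ArrayList<>();
        ThrottleKeepLast<Integer> throttle = new ThrottleKeepLast<>(500, 0, seen::add);
        for (int i = 0; i < 9; i++) {
            throttle.onNext(i + 1, i * 100L); // items 1..9, one every 100 ms
        }
        throttle.onCompleted();
        System.out.println(seen); // [6, 9]
    }
}
```

In an Rx pipeline such a helper would have to sit upstream of any sampling operator, since a subscriber placed after sample never sees the dropped items.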

Thanks, @Travis, that pointed me in the right direction. Instead of buffer, I used window together with switchOnNext:

Observable<Response> fileReader =
    Rx.okHttpGetRequest(fileInfo.getUrl());
OkHttpResponseWriter fileWriter =
    Rx.okHttpResponseWriter(outFile, totalBytesRead);
Subscription subscription = Observable.switchOnNext(fileReader
        .flatMap(fileWriter)
        .window(1, TimeUnit.SECONDS))
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Subscriber<DownloadProgress>() {
            @Override
            public void onCompleted() {}
            @Override
            public void onError(Throwable e) {}
            @Override
            public void onNext(DownloadProgress progress) {
                // always emits the most recent DownloadProgress, even the last one!
            }
        });

How it works:

  • fileReader sends a request to the URL and emits the HTTP Response
  • fileReader.flatMap(fileWriter) takes the Response and writes a stream of bytes to disk, emitting a sequence of DownloadProgress objects
  • window(1, TimeUnit.SECONDS) groups the DownloadProgress objects emitted during each one-second interval and re-emits each group as an Observable<DownloadProgress>
  • Observable.switchOnNext() subscribes to the most recently emitted Observable<DownloadProgress> and relays its items, so the last DownloadProgress object in the sequence is delivered