close
当前位置: 物联网在线 > 技术文库 > android >

RxAndroid 2.0 学习笔记

Rxjava 2.x正式版出来已经快两个月了。在之前的项目中也在使用Rx。但却一直没有时间对整个的知识进行梳理,恰好今天抽出时间,也系统的再学习一遍RxJava/RxAndroid

RxJava的使用 一、观察者/被观察者

1、前奏:

在观察者之前就要前提下 backpressure 这个概念。简单来说, backpressure 是在异步场景中,被观察者发送事件速度远快于观察者的处理速度时,告诉被观察者降低发送速度的策略。

2、在2.0中有以下几种观察者

Observable/Observer

Flowable/Subscriber

Single/SingleObserver

Completable/CompletableObserver

Maybe/MaybeObserver

依次的来看一下:

Observable

Observable .just(1, 2, 3) .subscribe(new Observer < Integer > () { @Override public void onSubscribe(Disposable d) {} @Override public void onNext(Integer value) {} @Override public void onError(Throwable e) {} @Override public void onComplete() {} });

这里要提的就是onSubscribe(Disposable d),disposable用于取消订阅。

就用简单的just这个操作符来分析一下。

@SuppressWarnings("unchecked") @SchedulerSupport(SchedulerSupport.NONE) public static < T > Observable < T > just(T item1, T item2, T item3, T item4) { ObjectHelper.requireNonNull(item1, "The first item is null"); ObjectHelper.requireNonNull(item2, "The second item is null"); ObjectHelper.requireNonNull(item3, "The third item is null"); ObjectHelper.requireNonNull(item4, "The fourth item is null"); return fromArray(item1, item2, item3, item4); } @SchedulerSupport(SchedulerSupport.NONE) public static < T > Observable < T > fromArray(T...items) { ObjectHelper.requireNonNull(items, "items is null"); if (items.length == 0) { return empty(); } else if (items.length == 1) { return just(items[0]); } return RxJavaPlugins.onAssembly(new ObservableFromArray < T > (items)); } @Override public void subscribeActual(Observer < ?super T > s) { FromArrayDisposable < T > d = new FromArrayDisposable < T > (s, array); s.onSubscribe(d); if (d.fusionMode) { return; } d.run(); } @Override public void dispose() { disposed = true; } @Override public boolean isDisposed() { return disposed; } void run() { T[] a = array; int n = a.length; for (int i = 0; i < n && !isDisposed(); i++) { T value = a[i]; if (value == null) { actual.onError(new NullPointerException("The " + i + "th element is null")); return; } actual.onNext(value); } if (!isDisposed()) { actual.onComplete(); } }

just实际调用了 fromArray 方法,中创建了 ObservableFromArray 的实例,在这个实例中实现了 Observable 这个接口,在调用 subscribe 方法进行绑定之后,首先调用了 subscribeActual 方法, onSubscribe 就会回调。

在取消绑定是我们可以将Disposable添加到CompositeDisposable中或者直接调用Disposable的dispose() 方法在流的任意位置取消。

此外, 为了简化代码,我使用了Consumer作为观察者(可以当成1.0时候的Action1 、ActionX) subscribe 的返回值就是一个Disposable ( subscribe 的返回值根据传入的参数不同,也有不同)我把这个对象添加到CompositeDisposable,并在中途取消,但发射器仍然会把所有的数据全都发射完。因为LambdaSubscriber(也就是传入Consumer 所构造的观察者)的 dispose 和 isDispose 略有不同,并不是简简单单的true/false, 说实话,我没看懂Consumer的这两个方法干了什么...........尴尬

LambdaSubscriber 瞅瞅 @Override public void dispose() { cancel(); } @Override public boolean isDisposed() { return get() == SubscriptionHelper.CANCELLED; }

Flowable

是2.0之后用的最多的观察者了,他与上一个的区别在于支持背压,也就是说,下游会知道上游有多少数据,所以他Subscriber会是这样

Flowable .just(1, 2, 3, 4) .subscribe(new Subscriber < Integer > () { @Override public void onSubscribe(Subscription s) { s.request(Long.MAX_VALUE); } @Override public void onNext(Integer integer) {} @Override public void onError(Throwable t) {} @Override public void onComplete() {} });

onSubscribe 这个回调传出了一个Subscription, 我们要指定他传出数据的大小, 调用他的 request() 方法。如没有要求可以传入一个Long的最大数值 Long.MAX_VALUE 。

要说明一下,request这个方法若不调用,下游的onNext与OnComplete都不会调用;若你写的数量小于,只会传你的个数,但是不会调用onComplete方法,可以看下 FlowableFromArray 的 slowPath 方法

@Override void slowPath(long r) { long e = 0; T[] arr = array; int f = arr.length; int i = index; Subscriber < ?super T > a = actual; for (;;) { while (e != r && i != f) { if (cancelled) { return; } T t = arr[i]; if (t == null) { a.onError(new NullPointerException("array element is null")); return; } else { a.onNext(t); } e++; i++; } if (i == f) { if (!cancelled) { a.onComplete(); } return; } r = get(); if (e == r) { index = i; r = addAndGet( - e); if (r == 0L) { return; } e = 0L; } } } }

需要if (i == f) f 是这个数据的大小,i是当前发送数据的个数,所以不会调用onComplete

休息一下

这是几种被观察者实现的接口

Observable 接口 ObservableSource

Flowable 接口 Publisher

Single 接口 SingleSource

Completable 接口 CompletableSource

Maybe 接口 MaybeSource


(责任编辑:ioter)

用户喜欢...

Android:学习AIDL,这一篇文章就够了(上)

在决定用这个标题之前甚是忐忑,主要是担心自己对AIDL的理解不够深入,到时候大家看了之后说——你这是什么玩意儿,就这么点东西就敢说够了?简直是坐井观天不知所谓——那样就很尴...


Android:学习AIDL,这一篇文章就够了(下)

上一篇博文介绍了关于AIDL是什么,为什么我们需要AIDL,AIDL的语法以及如何使用AIDL等方面的知识,这一篇博文将顺着上一篇的思路往下走,接着介绍关于AIDL的一些更加深入的知识。强烈建议...


RxAndroid深入理解

现在项目里面大多都已经使用了rxjava, 因此对于很多rxjava的扩展库,也都可以使用在项目里了。 RxAndroid 已经成为标配了,基本只要使用了Rxjava, 你肯定能看见RxAndroid的身影。 使用场景: Obs...