什么时候不该使用Rxjava
转成observalbe是简单的,所以这篇文章是关于什么时候从你的api中返回非observalbe。 #### 情况1 小的常量数据集合
下面是最简单的一种不适合生成observalbe的情况。下面是一个每具类。
public enum EmployeeType {
FULL_TIME,
CONTRACTOR,
INTERN
}
如果你想遍历这个枚举类类型,那么以下面这种转成observalbe的方式是不是总是有意义的呢?
Observable<EmployeeType> employeeTypes = Observable.from(Employee.values());
如果你已经在Observable运算符链中,需要使用Observable版本的EmployeeType,那么这么做是有意义的.但我认为这是个列外,通常情况下并非这种情况。
小并且不发生变化的数据集并不适合转化成Observable。从一个api的角度,使用传统的集合的形式是比较好的情况。如果需要使用 Observable 则可以很容易的完成转换。这并不是只适用于枚举类型,可能这是一个极端的例子。但是任何小的常量数据集都适用于这种情况。
情况2 开销比较大的缓存对象
假设你有一个ActionQualifier类,类中有一些开销比较大的正则表达式成员变量。虽然在寻找文字块时,正则表达式很强大,但是运行时进行编译却非常耗时。因此,生成冗余的一个又一个的ActionQualifier对象对性能代价是巨大的。
public final class ActionQualifier {
private final Pattern codeRegexPattern;
private final int actionNumber;
ActionQualifier(String codeRegex, int actionNumber) {
this.codeRegexPattern = Pattern.compile(codeRegex);
this.actionNumber = actionNumber;
}
public boolean qualify(String code) {
return codeRegexPattern.matcher(code).find();
}
public int getActionCode() {
return actionNumber;
}
}
如果你使用RxJava-JDBC从数据库获取数据并得到ActionQualifier实例事件的Observable对象。当有多个订阅者订阅到这个查询数据的 Observable 上的时候,由于每个订阅会都需要重新查询数据库,从而需要重新创建ActionQualifier对象。因此对于每一次的订阅,都会重新生成ActionQualifier实例,而我们知道创建新的 ActionQualifier 是非常耗时的。
Observable<ActionQualifier> actionQualifiers = db
.select("SELECT CODE_REGEX, ACTION_NUMBER FROM ACTION_MAPPING")
.get(rs -> new ActionQualifier(rs.getString("CODE_REGEX"), rs.getInt("ACTION_NUMBER")));
为了避免每个订阅者订阅的时候都查询一次,你可以选择使用 cache() 操作符来缓存数据,并将这些缓存数据传递给订阅者。这是有效的,但这样的话,actionQualifiers 就没法更新了并且可能长期持有,导致虚拟机无法回收该对象。
Observable<ActionQualifier> actionQualifiers = db
.select("SELECT CODE_REGEX, ACTION_NUMBER FROM ACTION_MAPPING")
.get(rs -> new ActionQualifier(rs.getString("CODE_REGEX"), rs.getInt("ACTION_NUMBER")))
.cache();
Dave Moten 提供了一个巧妙的解决方式 ,缓存设定过期时间,然后缓存过期重新订阅时,从数据库获取数据源创建对象。但是最终你可能会问,是否可以把 actionQualifiers 保存在一个 List 中,然后手工的去刷新。你也不必再绑定到Observable上。
List<ActionQualifier> actionQualifiers = db
.select("SELECT CODE_REGEX, ACTION_NUMBER FROM ACTION_MAPPING")
.get(rs -> new ActionQualifier(rs.getString("CODE_REGEX"), rs.getInt("ACTION_NUMBER")))
.toList().toBlocking().first();
当你需要使用 Observable 的时候,则可以很容易的把 List 转换为 Observable :
Observable.from(actionQualifiers).filter(aq -> aq.qualify("TXB.*"));
不论哪种方式,处理大消耗的资源都是一个棘手的问题。有些时候,相比响应式,以状态式的方式管理它们更容易一些。
情况3 简单的“查询”和单步操作
Rxjava的一个比较大的优势是可以使用各种操作符对其持有的事件转化。比如获取这个、过滤那个、反射和减少另外一个。你可以使用少量的代码就可以通过一个链式的操作符来处理大量的工作。
Observable<Product> products = ...
Observable<Int> totalSoldUnits = products
.filter(pd -> pd.getCategoryId() == 3)
.map(pd -> pd.getSoldUnits())
.reduce(0,(x,y) -> x + y)
但是如果你只是需要一个单步操作,比如查询一个id对应的对象。
Observable<Category> category = Category.forId(263);
是不是这样的话,rxjava的优势就没那么大了。可能把,这时候返回一个Category对象,而不是一个Observable对象或许更简单一些,即:
Category category = Category.forId(263);
如果返回的结果有多个 Category 、或者对于 null 返回值你想的到一个空的Observable,则使用 Observable是必须的 。但是在下面的情况4中会看到,这种过度的使用 Observable 会导致更多的模板代码出现。所以对于一个简单查询返回的单值,或者只是一个单步操作,那么最好不考虑使用Observable。
另外,如果你真的想适用Observable,你总是可以之后将其进行转化。
Observable<Category> category = Observable.just(Category.forId(263))
.filter(c -> c != null);
情况4 频繁使用到的属性
让我们扩展下情况3来讨论另外一个点。考虑你有这样一个product类。
public final class Product {
private final int id;
private final String description;
private final int categoryId;
public Product(int id, String description, int categoryId) {
this.id = id;
this.description = description;
this.categoryId = categoryId;
}
public Observable<Category> getCategory() {
return Category.forId(categoryId);
}
}
上面的 getCategory() 返回的是 Observable 。如果你经常使用这个函数,则用起来可能相当麻烦。假设每个 Category 中有个 getGroup() 函数返回一个整数。我们想过滤的到Category的group是5的那些product事件的observable。操作如下:
Observable<Product> products = ...
Observable<Product> productsInGroup5 =
products.flatMap(p -> p.getCategory().filter(c -> c.getGroup() == 5).map(c -> p));
对于一个简单的任务,就需要那么多的操作符。略显复杂。如果我们考虑getCategory() 返回的是 非Observable,这样会更简单一些。
public Category getCategory() {
return Category.forId(categoryId);
}
Observable<Product> productsInGroup5 =
products.filter(p -> p.getCategory().getGroup() == 5);
总结:如果一个类中的某个属性会被频繁的过滤等使用,那么最好考虑不采用Observable方式,特别是对于属性返回的是单值,而不是一系列值的时候。
情况5 获取状态
RxJava 中的数据通常是无状态的。这样可以方便并行处理多个事件或行为,而不是处理一系列的状态。但是在实际的业务逻辑中,通常需要保留一些状态。比如打印价格时候,需要保留历史价格信息:
public final class PricePoint {
private final int id;
private final int productId;
private final BigDecimal price;
private final ImmutableList<BigDecimal> historicalPricePoints;
public PricePoint(int id, int productId, BigDecimal price) {
this.id = id;
this.productId = productId;
this.price = price;
historicalPricePoints = HistoricalPricePoints.forProductId(productId);
}
public ImmutableList<BigDecimal> getHistoricalPricePoints() {
return historicalPricePoints;
}
}
你能响应式的获取历史价格点,当操作不消耗太大时,是可以的。
public final class PricePoint {
private final int id;
private final int productId;
private final BigDecimal price;
public PricePoint(int id, int productId, BigDecimal price) {
this.id = id;
this.productId = productId;
this.price = price;
}
public Observable<BigDecimal> getHistoricalPricePoints() {
return HistoricalPricePoints.forProductId(productId);
}
}
但是如果这个操作是比较消耗资源的,则又回到了第二种情况。并且,如果你想获取在PricePoint创建之后获取historical price points,Observable并不是一个非常好的办法。你需要持有其状态,但是像rxjava这些无状态解决方案并不是很理想。针对这种有状态的数据,还是使用传统的查询方式比较好。
总结
Rxjava确实是一种革新式的编程方式,你需要毫不犹豫的使用它。但是 RxJava 是用来解决比较复杂或者非常复杂情况的,对于简单的情况还是简单处理吧。 上面的一些情况,对于有经验的 RxJava 开发者可能很容易避免这种情况,对于初学者可能还处于 使用 RxJava 的蜜月期,看问题没这么透彻很容易陷入到过度使用 RxJava 的情况。
内存溢出
Observable经常出现内存泄漏溢出,占用过多等情况。现在讨论一下这些问题。
#### 内存溢出案例
在进行retrofit+rxjava进行异步请求网络数据并ui渲染过程中,retrofit生成的Observable对象的生命周期凌驾于 Activity(/Fragment) 之上.
如果当Subscriber订阅Observable后(生成数据/事件、转换数据/事件,然后发射数据/事件),这时候退出界面,当页面销毁后会发生:
- 因为retrofit生成的Observable对象的生命周期凌驾于 Activity(/Fragment) 之上(没有取消订阅的话),所以被订阅的Observable 并没有终止销毁,Observable 保留了 Activity 的 Subscriber 的引用,最终导致 Activity 无法被释放,产生内存泄漏。
- 当 Observer 开始发射数据时,Subscriber 响应进行 UI 处理,会直接抛出异常并退出程式,因为 UI 对象已经被销毁了。
解决方案
手动取消订阅
Observable.subscribe() 的返回值是一个 Subscription 对象。Subscription 类只有两个方法,unsubscribe() 和 isUnsubscribed()。为了防止可能的内存泄露,在你的 Activity 或 Fragment 的 onDestroy 里,用 Subscription.isUnsubscribed() 检查你的 Subscription 是否是 unsubscribed。如果调用了 Subscription.unsubscribe() ,Unsubscribing将会对 items 停止通知给你的 Subscriber,并允许垃圾回收机制释放对象,防止任何 RxJava 造成内存泄露。如果你正在处理多个 Observables 和 Subscribers,所有的 Subscription 对象可以添加到 CompositeSubscription,然后可以使用 CompositeSubscription.unsubscribe() 方法在同一时间进行退订(unsubscribed)。
RxLifecycle
RxLifecycle是一个不错的解决方案。RxLifecycle能够自动补全取消订阅的操纵。从而防止一些特殊情况或遗忘导致的没有取消订阅,从而产生内存泄漏的情况,例如,activty的DESTROY()中取消订阅就采用:
myObservable
.compose(RxLifecycle.bindUntilEvent(lifecycle, ActivityEvent.DESTROY))
.subscribe();
不足之处:因为RxLifecycle 提供的一些组件都是rxactivity、rxfragment此类的,如果我们适用它的组件,需要继承这些activty/fragment,所以其更适合直接在 Activity/Fragment 中处理 Observable。具体怎么自己实现怎么实现还没搞明白。
Observable一些思考问题
问题1 在configuration改变(比如转屏)之后继续之前的Subscription?
比如你使用Retrofit发出了一个REST请求,接着想在listview中展示结果。如果在网络请求的时候用户旋转了屏幕怎么办? 你当然想继续刚才的请求,但是怎么搞? #### 问题2 Observable持有Context导致的内存泄露 这个问题是因为创建subscription的时候,以某种方式持有了context的引用,尤其是当你和view交互的时候,这太容易发生!如果Observable没有及时结束,内存占用就会越来越大。 ### 下一步 基本对retrofit+rxjava大致有了一个了解。 接下来准备以mvp这种模式实现一个retrofit+rxjava+RxLifecycle的一个小demo,看看具体情况。

This work is licensed under a CC A-S 4.0 International License.