Rxjava

Posted by ooftf on April 12, 2021

在没有使用RxJava时我们遇到了什么问题

  • 回调式网络请求会降低代码可读性
  • 切换线程十分麻烦
  • 网络请求如果操作不当会导致内存泄漏,处理内存泄漏的方式比较麻烦(静态类+弱引用 和 将取消事件传递到导致泄漏的引用层,解除引用)

为什么要使用RxJava

RxJava是如何解决上述问题的

RxJava还能做什么

  • 各种操作符(十分强大)
  • 计时器

    什么是RxJava

    字面意思

    Reactive Extensions
    r = rective 函数响应式编程 异步数据流
    x = Extensions

    RxJava 中各种元素介绍

    可观察对象

  • io.reactivex.Flowable: 0..N flows, supporting Reactive-Streams and backpressure
  • io.reactivex.Observable: 0..N flows, no backpressure,
  • io.reactivex.Single: a flow of exactly 1 item or an error,
  • io.reactivex.Completable: a flow without items but only a completion or error signal,
  • io.reactivex.Maybe: a flow with no items, exactly one item or an error.

    操作符

    创建observable

  • just
  • create 异步
  • fromIterable 集合
  • fromArray 数组
  • interval 定时器
  • intervalRange 定时器 固定次数
  • timer 延迟执行
  • defer 将获取数据流的操作延迟到订阅时刻
  • fromCallback 将获取数据流的操作延迟到订阅时刻和defer的区别在于这个只能是一次事件

操作 Observable

  • map 一个数据转为另一个数据
  • flatMap 一个onNext数据 转换成一个 Observable :需要第一个数据源得到后异步获取
  • concatMap 和flatMap 类似,只不过只有当 上一个执行完onComplete 才会执行下一个
  • compose 一个Observable 转换成另一个Observable ;可以将一些“操作步骤”进行封装
  • toList:将多个onNext 生成一个 Single 类似切面编程
  • doOnSubscribe 在 onSubscribe 之前做一些动作
  • doOnNext: 再onNext 之前做一些动作
  • doOnComplete 在 onComplete之前做一些动作
  • doAfterSubscribe
  • doAfterNext 类似
  • doAfterComplete

线程切换

observeOn: 切换observeOn 之后的线程 subscribeOn 切换observeOn 之前线程;如果不存在observeOn就是所有线程;当出现多个subscribeOn的时候只有第一个生效

合并observable

  • zip 合并后onNext相加
  • concat 第一个执行完onComplete 才会执行第二个
  • merge 按照onNext的时间顺序 交叉合并

订阅者

  • Observer
  • Consumer

    RxJava 实际应用

  • 数据变换处理
  • 多层网络请求

    RxJava中可能存在的一些疑惑

    1
    2
    3
    4
    5
    6
    
    Observable.create(new ObservableOnSubscribe<Object>() {
      @Override
      public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
    
      }
    });
    

    中subscribe什么时候执行? observable.subscribe()的时候

线程问题 在不主动切换线程的情况下 获取数据的线程就是 emitter.onNext 所在线程 订阅线程(ObservableOnSubscribe.subscribe)所在线程 在没有主动切换线程的情况下就是是创建Observable所在线程

问题:在下面情况下 observeOn 控制的是哪一层 ,doOnNext 又是控制的哪一层

1
2
3
4
5
Observable.just()
    .flatmap
    .faltmap
    .faltmap
    .observeOn

结论:

  • doOnNext 作用的是当前Observable,doOnSubscribe作用于整个流程的最开始(最初的Observable)
  • observeOn 作用于observeOn之后的部分

问题在 observeOn subscribeOn 后 doOnNext doOnSubscribe doOnError 是在哪个线程

RxJava 和传统观察者模式有什么不同

RxJava 是数据流操作 定义的是数据之间的变换关系
观察者模式,通常是基于事件

  • 观察者:只有先订阅才能接收到事件;RxJava 订阅后才发射数据
  • 观察者同一事件能被所有订阅者接收,而在RxJava中一个observer对应一个emitter,一个emitter发射出去的数据只能被一个observer接收

PublishSubject 更类似于传统观察者模式

多次订阅一个Observable 事件是分开的,因为每次订阅都会生成不同的订阅器
多次订阅PublishSubject 事件是相同的,可以多次订阅,类似传统观察者模式,但是onError和onComplete会阻断后续事件

使用RxJava曾遇到的问题(需要注意的问题)

  • subscribe(Consumer())这种订阅没有做异常处理,如果发生异常会导致应用崩溃
  • RxJava只能捕获当前同步异常,如下情况是无法通过RxJava捕获的
    1
    2
    3
    4
    5
    
        Observable.create {
               Thread{
                    exception
               }
       }
    
  • 如果 disposable.dispose 之后 还执行了emitter.onError()就会因为异常没有捕获而导致应用崩溃,所以emitter使用之前需要判断一下emitter.isDisposed
  • doOnError 不能防止异常外抛
  • doOn… 等事件中要做防内存泄漏问题,RxLifecycle disposable 并不能防止这种形式的内存泄漏,具体操作可以参照 hi-http
  • 如果doOn事件中需要在主线中操作,要在doOn之前调用线程切换

RxJava资料链接

  • https://github.com/ReactiveX/RxJava
  • https://github.com/ReactiveX/RxAndroid 切换主线程
  • https://github.com/JakeWharton/RxBinding 对Android View事件的扩展,
  • https://github.com/tbruyelle/RxPermissions Android 权限管理
  • https://github.com/trello/RxLifecycle 生命周期管理,及时取消订阅
  • https://github.com/AndroidKnife/RxBus 事件总线
  • https://github.com/JakeWharton/RxRelay RxJava 在发射 onError或者onComplete数据后将不能在发射后续数据,RxRelay就是为了解决这个问题而产生的

    RxJava问题讨论

  • 在合并多个Observer的时候线程切换是什么结果
  • onNext 时间抛出异常会中断数据流吗

    TODO

  • blocking
  • retry
  • repeat
  • throttle 节流
  • debounce 去抖
  • throttleFirst
  • sample
  • retryWhen onComplete事件