RxJava 源码分析

Posted by OOFTF Blog on July 27, 2021

代码使用的是我手写代码,逻辑和 RxJava 一致,细节可能不同。

手写 RxJava 项目地址 私有仓库

主流程分析

Observable

1
2
3
4
5
6
7
8
9
10
11
abstract class Observable<T> {
    fun subscribe(observable: Observer<T>) {
        subscribeActual(observable)
    }
    protected abstract fun subscribeActual(observer:Observer<T>)
    companion object {
        fun <T> create(oos:ObservableOnSubscribe<T>): Observable<T> {
            return ObservableCreate<T>(oos)
        }
    }
}

总结:

  • Observable 是被观察对象,提供 subscribe 用于“被订阅”
  • 对外提供静态方法 create ,用于创建 Observable 对象;
  • create 方法的入参是 ObservableOnSubscribe 可以被称作为数据源,其作用是使用发射器 Emitter 发射数据
  • 最终返回的是 ObservableCreate 对象,各个组件的粘合就是在 ObservableCreate 中完成的

ObservableCreate

1
2
3
4
5
6
7
8
9
class ObservableCreate<T>(val source: ObservableOnSubscribe<T>) : Observable<T>() {

    override fun subscribeActual(observer: Observer<T>) {
        val emitter = CreateEmitter(observer)
        observer.onSubscribe(emitter)
        source.subscribe(emitter)
    }

}

总结:

  • 当使用 subscribe 订阅事件的时候,就会调用 subscribeActual
  • subscribeActual 内部首先创建 CreateEmitter 对象,触发 Observer.onSubscribe 事件
  • 调用 ObservableOnSubscribe.subscribe 方法,传入 Emitter 发射器,ObservableOnSubscribe.subscribe 内部会使用 Emitter 发射数据

CreateEmitter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class CreateEmitter<T>(val observer: Observer<T>) : Emitter<T>, Disposable {
    var disposed = false
    override fun onError(e: Throwable) {
        if (!isDisposed()) {
            observer.onError(e)
            dispose()
        }
    }

    override fun onNext(item: T) {
        if (!isDisposed()) {
            observer.onNext(item)
        }

    }

    override fun onComplete() {
        if (!isDisposed()) {
            observer.onCompleted()
            dispose()
        }

    }

    override fun dispose() {
        disposed = true
    }

    override fun isDisposed(): Boolean {
        return disposed
    }

}
  • CreateEmitter 用于将 onNext、onError、onComplete 等事件经过 isDisposed 判断后转交给 Observer

RxJava 内存泄漏

通过 CreateEmitter 代码我们可以知道 Disposable.disposed 只是防止 Observer 接收事件,CreateEmitter 仍然持有 Observer 的对象。但是 ObservableOnSubscribe 内部实现可以通过 setDisposable 或者 setCancellable 监听 disposed 方法的调用,切断 CreateEmitter 和 ObservableOnSubscribe 的引用关系 因此我们通过 ObservableOnSubscribe 创建延迟 Observable 对象的时候要注意通过 setDisposable 或者 setCancellable 在 disposed 方法中 切断 CreateEmitter 和 ObservableOnSubscribe 引用关系,这样才能解决内存泄漏问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 错误写法
class ErrorObservableOnSubscribe : ObservableOnSubscribe<String> {
    override fun subscribe(emitter: ObservableEmitter<String>) {
        Handler().postDelayed({
            if (!emitter.isDisposed) {
                emitter.onNext("")
                emitter.onComplete()
            }
        }, 10000000)
    }
}

// 正确写法
class RightObservableOnSubscribe : ObservableOnSubscribe<String> {
    val handler = Handler()
    override fun subscribe(emitter: ObservableEmitter<String>) {
        val run = Runnable {
            if (!emitter.isDisposed) {
                emitter.onNext("")
                emitter.onComplete()
            }
        }
        handler.postDelayed(run, 10000000)
        emitter.setCancellable {
            handler.removeCallbacks(run)
        }
    }
}

以 map 为代表分析操作符(手写代码,非源代码)

  • 添加方法 Observable.map
  • 添加类 ObservableMap:此处使用装饰器模式,对上游原 Observable 进行封装拓展,重写 subscribeActual 订阅上游 Observable
  • 添加类 ObserverMap 从上游 Observable 拿到数据,经过 mapper 转换,传输到下游 observer 中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
fun <R> Observable.map(mapper: (T) -> R): Observable<R> {
    return ObservableMap(this, mapper)
}

class ObservableMap<T, R>(val upObservable: Observable<T>, val mapper: (T) -> R) : Observable<R>() {

    override fun subscribeActual(observer: Observer<R>) {
        upObservable.subscribe(ObserverMap(observer, mapper))
    }
}

class ObserverMap<U,D>(val observer:Observer<D>,val mapper:(U)->D):Observer<U>,Disposable {
    var upDisposable: Disposable? = null
    override fun onError(t: Throwable) {
        observer.onError(t)
    }

    override fun onNext(t: U) {
        observer.onNext(mapper.invoke(t))
    }

    override fun onCompleted() {
        observer.onCompleted()
    }

    override fun onSubscribe(disposable: Disposable) {
        upDisposable = disposable
        observer.onSubscribe(this)
    }

    override fun dispose() {
        upDisposable?.dispose()
    }

    override fun isDisposed(): Boolean {
        return upDisposable?.isDisposed() == true
    }
}

线程切换分析

线程调度器 Scheduler

1
2
3
4
5
6
interface Scheduler {
    fun createWork():Work
    interface Work{
        fun schedule(run:Runnable)
    }
}

分析:
Scheduler 是线程调度器,其中有个方法 createWork 返回 Work 对象,Work.schedule 方法会在指定线程中调用传入 Runnable 的 run 方法

Observable.observeOn

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
fun Observable.observeOn(scheduler:Scheduler):Observable<T>{
    return ObservableObserveOn(this,scheduler)
}

class ObservableObserveOn<T>(val actual: Observable<T>, val scheduler: Scheduler) :
    Observable<T>() {
    override fun subscribeActual(observer: Observer<T>) {
        actual.subscribe(ObserverObserveOn(observer, scheduler.createWork()))
    }

    class ObserverObserveOn<T>(val downStream: Observer<T>, val work: Scheduler.Work) : Observer<T>,
        Disposable {
        lateinit var disposable: Disposable
        override fun onError(t: Throwable) {
            work.schedule {
                downStream.onError(t)
            }
        }

        override fun onNext(t: T) {
            work.schedule {
                downStream.onNext(t)
            }
        }

        override fun onCompleted() {
            work.schedule {
                downStream.onCompleted()
            }
        }

        override fun onSubscribe(disposable: Disposable) {
            downStream.onSubscribe(this)
        }

        override fun dispose() {
            disposable.dispose()
        }

        override fun isDisposed(): Boolean {
            return disposable.isDisposed()
        }

    }
}

分析:

  • 同样采用装饰器模式新建 ObservableObserveOn 类,在 subscribeActual 方法中利用 ObserverObserveOn 从上游数据获取到数据,然后利用 Work.schedule 将调用下游 Observer 的调用切换到指定线程;
  • 分析代码可知 Observable.observeOn 影响了下游 Observer 的 onError、onCompleted、onNext 方法的调用线程,但是并没有影响 onSubscribe 的调用线程。

Observable.subscribeOn

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
fun Observable.subscribeOn(scheduler:Scheduler):Observable<T>{
    return ObservableSubscribeOn(this,scheduler)
}

class ObservableSubscribeOn<T>(val actual: Observable<T>, val scheduler: Scheduler) :
    Observable<T>() {
    override fun subscribeActual(observer: Observer<T>) {
        val parent = ObserverSubscribeOn(observer)
        // 为了防止 Observer.onSubscribe 受到线程切换的影响,在此处调用 onSubscribe 方法,同时使用 ObserverSubscribeOn 避免下游 Observer.onSubscribe 再次被调用
        observer.onSubscribe(parent)
        scheduler.createWork().schedule {
            actual.subscribe(parent)
        }
    }

    class ObserverSubscribeOn<T>(val downStream: Observer<T>) : Observer<T>, Disposable {
        lateinit var disposable: Disposable
        override fun onError(t: Throwable) {
            downStream.onError(t)
        }

        override fun onNext(t: T) {
            downStream.onNext(t)
        }

        override fun onCompleted() {
            downStream.onCompleted()
        }

        override fun onSubscribe(disposable: Disposable) {
            this.disposable = disposable
        }

        override fun dispose() {
            disposable.dispose()
        }

        override fun isDisposed(): Boolean {
            return disposable.isDisposed()
        }

    }
}    

分析:

  • 同样采用装饰器模式新建 ObservableSubscribeOn ,在 subscribeActual 方法中,使用 scheduler.createWork().schedule 将上游 Observable.subscribe 的调用切换到指定线程
  • 分析代码可知 Observable.subscribeOn 只影响了 上游 Observable.subscribe 方法的调用线程,Observer 的方法都没有受影响
  • 综合 subscribeOn 和 observeOn 的实现代码可知:下游 Observer.onSubscribe 方法的调用不受 subscribeOn 和 observeOn 的影响,与调用 Observable.subscribe 的线程相同