代码使用的是我手写代码,逻辑和 RxJava 一致,细节可能不同。
手写 RxJava 项目地址 私有仓库
主流程分析
Observable
1
2
3
4
5
6
7
8
9
10
11
| abstract class Observable<T> {
fun subscribe(observable: Observer<T>) {
subscribeActual(observable)
}
protected abstract fun subscribeActual(observer:Observer<T>)
companion object {
fun <T> create(oos:ObservableOnSubscribe<T>): Observable<T> {
return ObservableCreate<T>(oos)
}
}
}
|
总结:
- Observable 是被观察对象,提供 subscribe 用于“被订阅”
- 对外提供静态方法 create ,用于创建 Observable 对象;
- create 方法的入参是 ObservableOnSubscribe 可以被称作为数据源,其作用是使用发射器 Emitter 发射数据
- 最终返回的是 ObservableCreate 对象,各个组件的粘合就是在 ObservableCreate 中完成的
ObservableCreate
1
2
3
4
5
6
7
8
9
| class ObservableCreate<T>(val source: ObservableOnSubscribe<T>) : Observable<T>() {
override fun subscribeActual(observer: Observer<T>) {
val emitter = CreateEmitter(observer)
observer.onSubscribe(emitter)
source.subscribe(emitter)
}
}
|
总结:
- 当使用 subscribe 订阅事件的时候,就会调用 subscribeActual
- subscribeActual 内部首先创建 CreateEmitter 对象,触发 Observer.onSubscribe 事件
- 调用 ObservableOnSubscribe.subscribe 方法,传入 Emitter 发射器,ObservableOnSubscribe.subscribe 内部会使用 Emitter 发射数据
CreateEmitter
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
| class CreateEmitter<T>(val observer: Observer<T>) : Emitter<T>, Disposable {
var disposed = false
override fun onError(e: Throwable) {
if (!isDisposed()) {
observer.onError(e)
dispose()
}
}
override fun onNext(item: T) {
if (!isDisposed()) {
observer.onNext(item)
}
}
override fun onComplete() {
if (!isDisposed()) {
observer.onCompleted()
dispose()
}
}
override fun dispose() {
disposed = true
}
override fun isDisposed(): Boolean {
return disposed
}
}
|
- CreateEmitter 用于将 onNext、onError、onComplete 等事件经过 isDisposed 判断后转交给 Observer
RxJava 内存泄漏
通过 CreateEmitter 代码我们可以知道 Disposable.disposed 只是防止 Observer 接收事件,CreateEmitter 仍然持有 Observer 的对象。但是
ObservableOnSubscribe 内部实现可以通过 setDisposable 或者 setCancellable 监听 disposed 方法的调用,切断 CreateEmitter 和 ObservableOnSubscribe 的引用关系
因此我们通过 ObservableOnSubscribe 创建延迟 Observable 对象的时候要注意通过 setDisposable 或者 setCancellable 在 disposed 方法中 切断 CreateEmitter 和 ObservableOnSubscribe 引用关系,这样才能解决内存泄漏问题
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
| // 错误写法
class ErrorObservableOnSubscribe : ObservableOnSubscribe<String> {
override fun subscribe(emitter: ObservableEmitter<String>) {
Handler().postDelayed({
if (!emitter.isDisposed) {
emitter.onNext("")
emitter.onComplete()
}
}, 10000000)
}
}
// 正确写法
class RightObservableOnSubscribe : ObservableOnSubscribe<String> {
val handler = Handler()
override fun subscribe(emitter: ObservableEmitter<String>) {
val run = Runnable {
if (!emitter.isDisposed) {
emitter.onNext("")
emitter.onComplete()
}
}
handler.postDelayed(run, 10000000)
emitter.setCancellable {
handler.removeCallbacks(run)
}
}
}
|
以 map 为代表分析操作符(手写代码,非源代码)
- 添加方法 Observable.map
- 添加类 ObservableMap:此处使用装饰器模式,对上游原 Observable 进行封装拓展,重写 subscribeActual 订阅上游 Observable
- 添加类 ObserverMap 从上游 Observable 拿到数据,经过 mapper 转换,传输到下游 observer 中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
| fun <R> Observable.map(mapper: (T) -> R): Observable<R> {
return ObservableMap(this, mapper)
}
class ObservableMap<T, R>(val upObservable: Observable<T>, val mapper: (T) -> R) : Observable<R>() {
override fun subscribeActual(observer: Observer<R>) {
upObservable.subscribe(ObserverMap(observer, mapper))
}
}
class ObserverMap<U,D>(val observer:Observer<D>,val mapper:(U)->D):Observer<U>,Disposable {
var upDisposable: Disposable? = null
override fun onError(t: Throwable) {
observer.onError(t)
}
override fun onNext(t: U) {
observer.onNext(mapper.invoke(t))
}
override fun onCompleted() {
observer.onCompleted()
}
override fun onSubscribe(disposable: Disposable) {
upDisposable = disposable
observer.onSubscribe(this)
}
override fun dispose() {
upDisposable?.dispose()
}
override fun isDisposed(): Boolean {
return upDisposable?.isDisposed() == true
}
}
|
线程切换分析
线程调度器 Scheduler
1
2
3
4
5
6
| interface Scheduler {
fun createWork():Work
interface Work{
fun schedule(run:Runnable)
}
}
|
分析:
Scheduler 是线程调度器,其中有个方法 createWork 返回 Work 对象,Work.schedule 方法会在指定线程中调用传入 Runnable 的 run 方法
Observable.observeOn
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
| fun Observable.observeOn(scheduler:Scheduler):Observable<T>{
return ObservableObserveOn(this,scheduler)
}
class ObservableObserveOn<T>(val actual: Observable<T>, val scheduler: Scheduler) :
Observable<T>() {
override fun subscribeActual(observer: Observer<T>) {
actual.subscribe(ObserverObserveOn(observer, scheduler.createWork()))
}
class ObserverObserveOn<T>(val downStream: Observer<T>, val work: Scheduler.Work) : Observer<T>,
Disposable {
lateinit var disposable: Disposable
override fun onError(t: Throwable) {
work.schedule {
downStream.onError(t)
}
}
override fun onNext(t: T) {
work.schedule {
downStream.onNext(t)
}
}
override fun onCompleted() {
work.schedule {
downStream.onCompleted()
}
}
override fun onSubscribe(disposable: Disposable) {
downStream.onSubscribe(this)
}
override fun dispose() {
disposable.dispose()
}
override fun isDisposed(): Boolean {
return disposable.isDisposed()
}
}
}
|
分析:
- 同样采用装饰器模式新建 ObservableObserveOn 类,在 subscribeActual 方法中利用 ObserverObserveOn 从上游数据获取到数据,然后利用 Work.schedule 将调用下游 Observer 的调用切换到指定线程;
- 分析代码可知 Observable.observeOn 影响了下游 Observer 的 onError、onCompleted、onNext 方法的调用线程,但是并没有影响 onSubscribe 的调用线程。
Observable.subscribeOn
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
| fun Observable.subscribeOn(scheduler:Scheduler):Observable<T>{
return ObservableSubscribeOn(this,scheduler)
}
class ObservableSubscribeOn<T>(val actual: Observable<T>, val scheduler: Scheduler) :
Observable<T>() {
override fun subscribeActual(observer: Observer<T>) {
val parent = ObserverSubscribeOn(observer)
// 为了防止 Observer.onSubscribe 受到线程切换的影响,在此处调用 onSubscribe 方法,同时使用 ObserverSubscribeOn 避免下游 Observer.onSubscribe 再次被调用
observer.onSubscribe(parent)
scheduler.createWork().schedule {
actual.subscribe(parent)
}
}
class ObserverSubscribeOn<T>(val downStream: Observer<T>) : Observer<T>, Disposable {
lateinit var disposable: Disposable
override fun onError(t: Throwable) {
downStream.onError(t)
}
override fun onNext(t: T) {
downStream.onNext(t)
}
override fun onCompleted() {
downStream.onCompleted()
}
override fun onSubscribe(disposable: Disposable) {
this.disposable = disposable
}
override fun dispose() {
disposable.dispose()
}
override fun isDisposed(): Boolean {
return disposable.isDisposed()
}
}
}
|
分析:
- 同样采用装饰器模式新建 ObservableSubscribeOn ,在 subscribeActual 方法中,使用 scheduler.createWork().schedule 将上游 Observable.subscribe 的调用切换到指定线程
- 分析代码可知 Observable.subscribeOn 只影响了 上游 Observable.subscribe 方法的调用线程,Observer 的方法都没有受影响
- 综合 subscribeOn 和 observeOn 的实现代码可知:下游 Observer.onSubscribe 方法的调用不受 subscribeOn 和 observeOn 的影响,与调用 Observable.subscribe 的线程相同