Rxjava
# Rxjava
# 根本原理
自己写代码实现rxjava,怎么做? 如何实现: 链式,线程切换,各操作符? 一个类+两个接口即可演示完核心原理:
参考视频教程: https://www.bilibili.com/video/BV19h411172f?p=2
demo代码: https://github.com/hss01248/aop-android/tree/master/rxjavademo/src/main/java/com/hss01248/rxjavademo
public interface MyFunc<R,T> {
R apply(T t);
}
public interface MyObserver<T> {
void onNext(T t);
void onError(Throwable throwable);
void onComplete();
}
2
3
4
5
6
7
8
9
10
11
12
# 类:
# 定义:
public abstract class MyObservable<T> {
public abstract void subscrib(MyObserver<T> observer);
}
2
3
4
# 链式调用之创建:
public static <T> MyObservable<T> create(MyObservable<T> observable){
return observable;
}
2
3
# 变换/Map:
public <R> MyObservable<R> map(MyFunc<R,T> func){
return new MyObservable<R>() {
@Override
public void subscrib(MyObserver<R> observer) {
MyObservable.this.subscrib(new MyObserver<T>() {
@Override
public void onNext(T t) {
//往下执行时,进行函数调用
R r = func.apply(t);
observer.onNext(r);
}
@Override
public void onError(Throwable throwable) {
observer.onError(throwable);
}
@Override
public void onComplete() {
observer.onComplete();
}
});
}
};
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 订阅时切换线程:
/**
* 切换上流线程示例
* @return
*/
public MyObservable<T> subscribOnIO(){
return new MyObservable<T>(){
@Override
public void subscrib(MyObserver<T> observer) {
new Thread(new Runnable() {
@Override
public void run() {
//往上订阅时切换线程
MyObservable.this.subscrib(observer);
}
}).start();
}
};
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 统一切换下游线程:
public MyObservable<T> observerOnMainThread(){
return new MyObservable<T>(){
@Override
public void subscrib(MyObserver<T> observer) {
MyObservable.this.subscrib(new MyObserver<T>() {
@Override
public void onNext(T t) {
new Handler(Looper.getMainLooper()).post(new Runnable() {
@Override
public void run() {
//往下执行时切换线程
observer.onNext(t);
}
});
}
@Override
public void onError(Throwable throwable) {
//同上
}
@Override
public void onComplete() {
//同上
}
});
}
};
}
public MyObservable<T> observerOnBackThread(){
return new MyObservable<T>(){
@Override
public void subscrib(MyObserver<T> observer) {
MyObservable.this.subscrib(new MyObserver<T>() {
@Override
public void onNext(T t) {
new Thread(new Runnable() {
@Override
public void run() {
//往下执行时切换线程
observer.onNext(t);
}
}).start();
}
@Override
public void onError(Throwable throwable) {
//同上
}
@Override
public void onComplete() {
//同上
}
});
}
};
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# subscrib和onNext的调用时机和线程
subscrib什么时候调用
onNext什么时候调用
如下图,先顺序执行,再逆序执行,再顺序执行
比如,下方的调用,aop日志如下:
# 使用:
MyObservable.create(new MyObservable<String>() {
@Override
public void subscrib(MyObserver<String> observer) {
Log.d("create:", "," + Thread.currentThread().getName());
for (int i = 0; i < 10; i++) {
observer.onNext(i + "");
}
}
})//.subscribOnIO() //(打开时为上方截图的调用链)
.map(new MyFunc<Integer, String>() {
@Override
public Integer apply(String s) {
Log.d("map1:", "integer:" + s + "," + Thread.currentThread().getName());
return Integer.parseInt(s) * 2;
}
})
.map(new MyFunc<String, Integer>() {
@Override
public String apply(Integer integer) {
Log.d("map2:", "integer:" + integer + "," + Thread.currentThread().getName());
return integer + "->String";
}
}).subscribOnIO()
.observerOnMainThread()
.subscrib(new MyObserver<String>() {
@Override
public void onNext(String s) {
Log.d("onNext:", "finnaly:" + s + "," + Thread.currentThread().getName());
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void onComplete() {
Log.d("onComplete:", "finnaly: onComplete");
}
});
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
Log:
# 将中间注释打开,也就是多次切换线程:
.subscribOnIO() .observerOnMainThread()
# 子线程
}).map(new MyFunc<Integer, String>() {
@Override
public Integer apply(String s) {
Log.d("map1:", "integer:" + s + "," + Thread.currentThread().getName());
return Integer.parseInt(s) * 2;
}
}).subscribOnIO()//上面使用子线程
.observerOnMainThread()//后续使用主线程
.map(new MyFunc<String, Integer>() {
@Override
public String apply(Integer integer) {
Log.d("map2:", "integer:" + integer + "," + Thread.currentThread().getName());
return integer + "->String";
}
})
.observerOnBackThread()//后续切到子线程
.subscrib(new MyObserver<String>() {
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 一些问题
# 工程化->如何保证使用Rxjava绝对不崩溃?
- 对MyObserver的onNext(),onComplete()进行try-catch, catch后调用onError()
- 如果onError里也崩溃呢? 对onError也try-catch包裹,提供一个全局处理异常的回调,供框架使用者初始化时调用.
# 链式调用流程中,Observable数量的变化方法:
- 一个Observable怎么样能在链式流中变成多个Observable?
- 上面已经变成的多个Observable怎么能够在下游组合成一个Observable?以及怎么组合成一次回调?
# 一些经典函数方法的实现:
//todo
# 学习资料/博客
https://github.com/Carson-Ho/RxJavaLearningMaterial
# 代码示例
# 学习Demo app
https://github.com/KunMinX/RxJava2-Operators-Magician
https://github.com/leeowenowen/rxjava-examples
https://github.com/xinghongfei/Hello-RxJava
# 开发工具
https://github.com/akaita/RxJava2Debug exception中显示真正的原因
https://github.com/davidmoten/rxjava2-extras 各种transformers和工具
https://github.com/akarnokd/RxJavaExtensions
https://github.com/skyNet2017/frodo2 logging RxJava (opens new window) Observables (opens new window) and Subscribers (opens new window) outputs on the logcat
# Android开发套件
https://github.com/Piasy/RxScreenshotDetector
rxjava基于集合的并发
https://www.jianshu.com/p/1e4d194bb782
# 存储和缓存相关库
https://github.com/Gridstone/RxStore
https://github.com/VictorAlbertos/RxCache
# 视频
https://www.bilibili.com/video/BV1Wt411d7N2 Java编程方法论-响应式 之 Rxjava2源码设计实现解读 全集(已完结)
https://www.bilibili.com/video/BV1iT4y137PZ