提升研发效率、简化逻辑、规范研发流程一直被技术团队所视为重中之重。而在日常工作学习中,RxJava的出现让我们可以很好的解决以上这几个问题。
通过使用RxJava我们可以做到以下几点
简化逻辑、解耦模块间的依赖关系
完备的操作符来满足绝大多数的场景
封装好的并行异步操作,提升效能
优雅的处理异常
希望通过本文的介绍,可以帮助大家可以对RxJava有一个基本的了解。并且借助RxJava来提升自己的研发效率和了解到反应式函数式编程思想的魅力。
RxJava的通常的解释是通过观察者模式来实现的反应式函数式编程框架,下面就让我们简单的逐一说明这两个概念。
观察者模式是RxJava实现的基本原理,本质上来说就是一个对象发生改变,那么所依赖于他的对象都会收到通知。
反应式函数式编程思想中其实是有两个概念的:反应式、函数式。为了说明这两个概念我们分别同日常常用的命令式编程来做一下对比。
反应式-命令式对比
反应式是基于数据流中的事件触发进行反应处理
命令式是指令序列,即一步一步的告诉计算机应该做什么
函数式-命令式对比
函数式关心的是数据的映射关系
命令式关心的是解决问题的步骤
RxJava通过提供的操作符对数据流进行处理,同时通过背压策略保证了不会因为消费速度过慢而导致的内存溢出问题。
以下是流、操作符、背压这三个概念的详细介绍
流的类型
热流:不管是否有订阅者,它都可能已经开始发布事件了
常见的热流:ConnectableObservable
通过冷流调用publish或者replay方法进行转换,下文中高级操作符有详细介绍
冷流:延迟执行的,并且在有人对其感兴趣时才会开始发布事件
常见的冷流:RxJava通过create或者from等操作符创建的流都是冷流
可以通过热流调用refCount转换,下文中高级操作符有详细介绍
流的创建
RxJava中对于流的创建方式分为手动的调用create方式创建与通过便捷的from等操作符的方式创建
create方式
Observable<Integer> createTest = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
Utils.log("before");
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onNext(3);
Utils.log("after");
}
});
便捷方式
//from
//from操作符可以接收数组、集合等
List<Integer> numbers = new ArrayList<>();
Observable.from(numbers);
//just
Observable.just(1, 2, 3, 4, 5, 6);
//range
//从from开始生成n个整型数字。例如,range(5, 3)将会发布5、6和7,然后正常完成。每个订阅者会接收到一组相同的数字
Observable.range(5, 3);
流的订阅
RxJava中对于流的订阅是通过调用被订阅流的subscribe方法来与观察者创建关系实现的
创建观察者
private Action1<Integer> observer = x -> {
System.out.println(x);
};
创建关系
Observable<Integer> observable = Observable.range(5, 3);
//订阅流,对于冷流来说只有订阅了才会发布事件
observable.subscribe(observer);
简便写法
Observable<Integer> observable = Observable.range(5, 3);
//订阅流,对于冷流来说只有订阅了才会发布事件
observable.subscribe(x -> {
System.out.println(x);
});
RxJava中操作符的强大决定了RxJava能够帮助我们应对绝大多数场景。下面我们将对基础操作符、高级操作符、多数据流操作符这三种操作符进行详细的介绍
基础操作符
map
数据转换,A->B
//原有数据流testObs不会有影响
Observable<Integer> testObs = Observable.just(1, 2, 3, 4, 5);
Observable<String> testObsMap = testObs.map(s -> s + "test");
testObs.subscribe(Utils::log);
testObsMap.subscribe(Utils::log);
filter
数据过滤
Observable<Integer> testObs = Observable.just(1, 2, 3, 4, 5);
Observable<Integer> testObsFilter = testObs.filter(s -> s.equals(1));
testObsFilter.subscribe(Utils::log);
flatMap
嵌套数据展平
异步并行
List<Order> user1Orders = new ArrayList<Order>(){{
add(Order.builder().id("user1-order1").build());
add(Order.builder().id("user1-order2").build());
add(Order.builder().id("user1-order3").build());
}};
List<Order> user2Orders = new ArrayList<Order>(){{
add(Order.builder().id("user2-order1").build());
add(Order.builder().id("user2-order2").build());
add(Order.builder().id("user2-order3").build());
}};
User user1 = User.builder().name("user1").orders(user1Orders).build();
User user2 = User.builder().name("user2").orders(user2Orders).build();
Observable<User> testObs = Observable.just(user1, user2);
//下面的两种写法是等价的,第二种相当于是第一种的便捷式写法
Observable<Order> orderObs = testObs.map(User::getOrders).flatMap(Observable::from);
Observable<Order> orderObs2 = testObs.(User::getOrders);
testObs.subscribe(u -> Utils.log("订阅user流:" + u.getName()));
orderObs.subscribe(o -> Utils.log("订阅order流1:" + o.getId()));
orderObs2.subscribe(o -> Utils.log("订阅order流2:" + o.getId()));
concatMap
嵌套数据展平
同步
写法同flatMap一致
delay
延迟发布
//统一的延迟n间隔时间后发布
Observable<Integer> testObs = Observable.just(1, 2, 3).delay(5000, TimeUnit.MILLISECONDS);
//发射的每一个值延长指定的策略时间后发布
Observable<Integer> testObs2 = Observable.just(1, 2, 3).delay(i -> timer(i, TimeUnit.SECONDS));
testObs.subscribe(i -> Utils.log("testObs:" + i ));
testObs2.subscribe(i -> Utils.log("testObs2:" + i ));
doOnNext
对经过的流进行日志监控操作,不会对数据造成影响
Observable<Integer> testObs = Observable.just(1, 2, 3, 4, 5)
//这里打印的是上一个处理完之后的内容
.doOnNext(s -> Utils.log("first doOnNext: " + s))
.filter(s -> s / 2 == 0)
.doOnNext(s -> Utils.log("second doOnNext: " + s))
.map(s -> s + 10)
.doOnNext(s -> Utils.log("third doOnNext: " + s));;
testObs.subscribe(Utils::log);
doOnError
对经过的流进行错误监控操作,不会对数据造成影响
高级操作符
scan
对每一项做处理,并且将每一项的处理结果输出
Observable<BigInteger> factorials = Observable.range(2, 10).scan(BigInteger.ONE, (big, current) -> big.add(BigInteger.valueOf(current)));
factorials.subscribe(Utils::log);
reduce
对每一项做处理,并且将总处理结果输出
Observable<BigInteger> factorials = Observable.range(2, 10).reduce(BigInteger.ONE, (big, current) -> big.add(BigInteger.valueOf(current)));
factorials.subscribe(Utils::log);
collect
数据收集,每次使用的都是相同的可变累加器
//将数据收集到List中
Observable<List<Integer>> all = Observable.range(2, 10)
.collect(ArrayList::new, List::add);
all.subscribe(Utils::log);
distinct
剔除流中的重复事件,必须要记住到目前为止出现的所有事件,内存不可控
Observable<Integer> obs = Observable.just(1, 2, 3, 1, 2);
//x -> x,如果数据是对象,则此处为对象的某一个属性
Observable<Integer> obs2 = obs.distinct(x -> x);
obs2.subscribe(Utils::log);
distinctUntilChanged
distinctUntilChanged只会当改变时才会发布事件,也就是说它只需要记住上一个事件,内存可控
Observable<Integer> obs = Observable.just(1, 2, 3, 1, 2);
//x -> x,如果数据是对象,则此处为对象的某一个属性
Observable<Integer> obs2 = obs.distinctUntilChanged(x -> x);
obs2.subscribe(Utils::log);
groupBy
groupBy 用来将数据源通过一定的规则进行分组
Order order1 = Order.builder().id("1").build();
Order order2 = Order.builder().id("1").build();
Order order3 = Order.builder().id("2").build();
Order order4 = Order.builder().id("1").build();
Order order5 = Order.builder().id("5").build();
Order order6 = Order.builder().id("1").build();
Order order7 = Order.builder().id("6").build();
Observable<Order> obs = Observable.just(order1, order2, order3, order4, order5, order6, order7);
obs.groupBy(order -> {
if(order.getId().equals("1")){
return 1;
}else if(order.getId().equals("2")){
return 2;
}else {
return 3;
}
}).subscribe(groupedObservable ->
groupedObservable.subscribe(o -> Utils.log("key is " + groupedObservable.getKey() + " value is " + o)));
publish
流转换
将冷流转换为热流
多订阅者相互独立
订阅者只会收到订阅之后发射的数据
ConnectableObservable<Integer> testObs = Observable.just(1, 2, 3, 4, 5).publish();
//多个订阅者使用一个数据源,无论有没有订阅者只要调用了connect方法,都会发射数据
testObs.connect();
replay(n)
流转换
将冷流转换为热流
多订阅者相互独立
订阅者在订阅后可以收到Cold Observable在订阅之前发送的N个数据
用法与publish一致,区别在于需要多传递一个bufferSize参数
refCount
流转换
将热流转换为冷流
Observable<Integer> testObs = Observable.just(1, 2, 3, 4, 5).publish().refCount();
share
建立连接
断开连接
流转换
等价于publish().refCount()
为了简化管理多个订阅者的复杂度
Observable<Integer> testObs = Observable.just(1, 2, 3, 4, 5).share();
多数据流操作符
merge
流拼接
把多个被观察者合并到一个被观察者身上输出,但是可能会让合并的被观察者发射的数据交错。
Observable<Integer> testObs1 = Observable.just(1, 3, 5);
Observable<Integer> testObs2 = Observable.just(2, 4, 8, 9, 6);
Observable<Integer> mergeObs = Observable.merge(testObs1, testObs2);
mergeObs.subscribe(Utils::log);
concat
流拼接
把多个被观察者合并到一个被观察者身上输出,肯定是不会发生数据交错发射的情况。
用法等同于merge
zip
流合并
按照给定的规则将两个被观察者的数据合并
Observable<Integer> testObs1 = Observable.just(1, 2, 3);
Observable<Integer> testObs2 = Observable.just(4, 5, 6);
Observable<Integer> testObsZip1 = Observable.zip(testObs1, testObs2, Integer::sum);
testObsZip1.subscribe(Utils::log);
combineLatest
流合并
任意一个上游流产生事件时,就使用另外一个流最新的已知值
Observable.combineLatest(Observable.interval(17, TimeUnit.MILLISECONDS).map(x -> "S" + x),
Observable.interval(10, TimeUnit.MILLISECONDS).map(x -> "F" + x),
(x, y) -> x + ":" + y)
.forEach(System.out::println);
testLatestFrom
流合并
testLatestFrom会遵从主流的节奏走,但是要控制从属流的发布节奏,主流只会使用从属流的最新的值,其余的会丢弃
Observable<String> slow = Observable.interval(17, TimeUnit.MILLISECONDS).map(x -> "S" + x);
Observable<String> fast = Observable.interval(10, TimeUnit.MILLISECONDS).map(x -> "F" + x);
slow.withLatestFrom(fast, (s, f) -> s + ":" + f).forEach(System.out::println);
amb
流合并
amb会订阅上游所操控的所有的流,并且当第一个流的事件发布后,立即取消对其他所有流的订阅
Observable<String> slow = Observable.interval(17, TimeUnit.MILLISECONDS).map(x -> "S" + x)
.doOnSubscribe(() -> Utils.log("subscribe to S"));
Observable<String> fast = Observable.interval(10, TimeUnit.MILLISECONDS).map(x -> "F" + x)
.delay(300, TimeUnit.MILLISECONDS)
.doOnSubscribe(() -> Utils.log("subscribe to F"))
.doOnUnsubscribe(() -> Utils.log("unsubscribe to F"));
slow.ambWith(fast).subscribe(Utils::log);
在实际的使用场景中,往往可能观察者处理数据的速度要比被观察者生产数据速度慢很多,此时很可能出现OOM现象。对此,RxJava提供了“背压”概念来解决这一问题。
背压概念
上游弹射数据速度大于下游消费数据速度,越积越多,内存溢出的现象
RxJava1.*版本中主要是通过对被观察者数据流进行打包缓存、背压策略、以及"拉取-推送"模式来应对背压现象的
打包缓存
buffer操作符打包到List,需要不断地创建List实例,内存不可控
window操作符打包到Observable,内存可控
此方式是对被订阅者做文章
使用缓存类的操作符将其中一部分打包缓存起来,再一点一点的处理其中的事件
使用的操作符是buffer与window
//buffer
Observable.range(1, 7).buffer(3).subscribe(Utils::log);
//window
Observable<Observable<Integer>> testObj = Observable.range(1, 7).window(3);
testObj.subscribe(x -> x.subscribe(y -> {
System.out.println("------------------");
System.out.println(y);
}));
使用背压策略
在内部会保持一个缓冲
首先会消耗其内部缓冲的数据,只有缓冲几乎消耗殆尽的时候,它才会向上游发起请求
消费者无法处理数据,把数据丢弃了
onBackpressureDrop
onBackpressureBuffer
private Observable<Integer> myRange(int from, int count){
return Observable.unsafeCreate(subscriber -> {
int i = from;
while(i < from + count){
if(!subscriber.isUnsubscribed()){
subscriber.onNext(i++);
}else {
return;
}
}
subscriber.onCompleted();
});
}
myRange(1, 100000000)
.map(Dish::new)
//警告之后会立即输出错误
.onBackpressureBuffer(100, () -> log.warn("buffer full"))
.observeOn(Schedulers.io())
.subscribe(x -> {
log.info("Washing: {}", x);
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
myRange(1, 100000000)
.map(Dish::new)
.onBackpressureDrop(dish -> log.info("Throw away {}", dish))
.observeOn(Schedulers.io())
.subscribe(x -> {
log.info("Washing: {}", x);
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
由推送模式变为了"拉取-推送"模式
SyncOnSubscribe.createStateful
每次消费完再去拉取下一个批次
//这个方法允许返回一个新的状态,而不是改变当前的状态
Observable.OnSubscribe<Long> onSubscribe = SyncOnSubscribe.createStateful(
//生成状态变量,这个变量将会作为参数传递给后续的表达式。
() -> 0L,
//生成下一个值的回调,此时通常会基于当前状态来生成下一个值。这个回调可以自由地改变第一个参数给定的状态。
(cur, observer) -> {
observer.onNext(cur);
return cur + 1;
}
);
Observable<Long> naturals = Observable.unsafeCreate(onSubscribe);
naturals.observeOn(schedulerB).subscribe(x -> {
log.info("Thread-{}-Washing: {}", Thread.currentThread().getName(), x);
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
上文中简要的介绍了一下RxJava的用法,接下来通过一个简单的例子来展示RxJava在数据处理方面对于模块间的接耦的优势。
场景
从mongo中导入数据到mysql中
mongo中的数据是嵌套结构,需要打平到mysql中
部分属性存在数据映射关系
分析
从数据关系层面上看我们分为数据抽取、数据转换与数据处理层
考虑到消费者处理数据能力我们采用"拉取-推送"模式
针对嵌套结构转平行结构我们采用flatMap操作符
针对各种数据映射我们采用map操作符
使用doOnNext与doOnError操作符来记录日志
使用RxJava反应式思想最终我们的整体模块图如下
如果使用往常的方式,我们基本是使用接口或者方法去串流程,底层数据抽取一般会使用循环分页的方式去获取。并且打印日志以及错误处理是分散的。
通过使用RxJava结合此模块图我们可以很清晰、简单的构建出我们的应用来完成需求,并且模块间的耦合性很低。
书籍:《RxJava反应式编程》
官网:https://reactivex.io/
本文中所介绍RxJava相关知识是基于RxJava1.*版本。但是大多数的操作符以及理念在2.0及以上版本也是相通的。
下一篇文章《RxJava2.*-分享》将会详细的说明RxJava2中的操作符以及背压知识。