RxJava是Java对于反应式编程的一个实现框架,是一个基于事件的、提供实现强大且优雅的异步调用程序的代码库。18年以来,由淘宝技术部发起的应用架构升级项目,希望通过反应式架构、全异步化的改造,提升系统整体性能和机器资源利用率,减少网络延时,资源的重复使用,并为业务快速创新提供敏捷的架构支撑。在闲鱼的基础链路诸如商品批量更新、订单批量查询等,都利用了RxJava的异步编程能力。
不过,RxJava是入门容易精通难,一不小心遍地坑。今天来一起看下RxJava的使用方式、基本原理、注意事项。
让我们先看下,使用RxJava之前,我们曾经写过的回调代码存在的痛点。
当我们的应用需要处理用户事件、异步调用时,随着流式事件的复杂性和处理逻辑的复杂性的增加,代码的实现难度将爆炸式增长。比如我们有时需要处理多个事件流的组合、处理事件流的异常或超时、在事件流结束后做清理工作等,如果需要我们从零实现,势必要小心翼翼地处理回调、监听、并发等很多棘手问题。
还有一个被称作“回调地狱”的问题,描述的是代码的不可读性。
Observable<Object> observable = Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(@NotNull ObservableEmitter<Object> emitter) throws Exception {
System.out.println(Thread.currentThread().getName() + "----TestRx.subscribe");
List<UserDo> result = userService.getAllUser();
for (UserDo st : result) {emitter.onNext(st);}
}
});
Observable<String> map = observable.map(s -> s.toString());
// 创建订阅关系
map.subscribe(o -> System.out.println(Thread.currentThread().getName() + "----sub1 = " + o)/*更新到ui*/);
map.subscribe(o -> System.out.println(Thread.currentThread().getName() + "----sub2 = " + o)/*写缓存*/,
e-> System.out.println("e = " + e)),
()->System.out.println("finish")));
Observable
.fromCallable(() -> {
System.out.println(Thread.currentThread().getName() + "----observable fromCallable");
Thread.sleep(1000); // imitate expensive computation
return "event";
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.single())
.map(i->{
System.out.println(Thread.currentThread().getName() + "----observable map");
return i;
})
.observeOn(Schedulers.newThread())
.subscribe(str -> System.out.println(Thread.currentThread().getName() + "----inputStr=" + str));
System.out.println(Thread.currentThread().getName() + "----end");
Thread.sleep(2000); // <--- wait for the flow to finish. In RxJava the default Schedulers run on daemon threads
map:见Code 2.2,一对一转换,如同很多流式编程api一样,将上游的每个元素转化成另一个元素
flatMap:一对多转换,将上游的每个元素转化成0到多个元素。类比Java8:Stream.flatMap内返回的是stream,Observerable.flatMap内返回的是Observerable。注意,本方法非常强大,很多api底层都是基于此方法。并且由于flatMap返回的多个Observerable是相互独立的,可以基于这个特点,实现并发。
merge:将两个事件流合并成一个时间流,合并后的事件流的顺序,与上流两个流中元素到来的时间顺序一致。
zip:逐个接收上游多个流的每个元素,并且一对一的组合起来,转换后发送给下游。示例见code3.1
//第一个流每1秒输出一个偶数
Observable<Long> even = Observable.interval(1000, TimeUnit.MILLISECONDS).map(i -> i * 2L);
//第二个流每3秒输出一个奇数
Observable<Long> odd = Observable.interval(3000, TimeUnit.MILLISECONDS).map(i -> i * 2L + 1);
//zip也可以传入多个流,这里只传入了两个
Observable.zip(even, odd, (e, o) -> e + "," + o).forEach(x -> {
System.out.println("observer = " + x);
});
/* 输出如下,可以看到,当某个流有元素到来时,会等待其他所有流都有元素到达时,才会合并处理然后发给下游
observer = 0,1
observer = 2,3
observer = 4,5
observer = 6,7
...
*/
// Single是只返回一个元素的Observable的实现类
Single<String> ob1 = Single.fromCallable(() -> {
System.out.println(Thread.currentThread().getName() + "----observable 1");
TimeUnit.SECONDS.sleep(3);
return userService.queryById(1).getName();
});
Single<String> ob2 = Single.fromCallable(() -> {
System.out.println(Thread.currentThread().getName() + "----observable 2");
TimeUnit.SECONDS.sleep(1);
return userService.queryById(1).getName();
});
String s = Single.zip(ob1, ob2,
(e, o) -> {System.out.println(e + "++++" + o);
create :最原始的create和subscribe,其他创建方法都基于此
// 返回的子类是ObservableCreate
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("event");
emitter.onNext("event2");
emitter.onComplete();
}
});
// 订阅observable
observable.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println(Thread.currentThread().getName() + " ,TestRx.onSubscribe");
}
@Override
public void onNext(String s) {
System.out.println(Thread.currentThread().getName() + " ,s = " + s);
}
@Override
public void onError(Throwable e) {}
@Override
public void onComplete() {
System.out.println(Thread.currentThread().getName() + " ,TestRx.onComplete");
}
});
just :Observable.just("e1","e2"); 简单的创建一个Observable,发出指定的n个元素。
interval:code 3.1已给出示例,创建一个按一定间隔不断产生元素的Observable,默认执行在Schedulers.comutation()提供的线程池中
defer:产生一个延迟创建的Observable。有点绕:Observable.create等创建出来的被观察者虽然是延迟执行的,只有有人订阅的时候才会真正开始生成数据。但是创建Observable的方法却是立即执行的。而 Observable.defer方法会在有人订阅的时候才开始创建Observable。如代码Code3.4
public String myFun() {
String now = new Date().toString();
System.out.println("myFun = " + now);
return now;
}
public void testDefer(){
// 该代码会立即执行myFun()
Observable<String> ob1 = Observable.just(myFun());
// 该代码会在产生订阅时,才会调用myFun(), 可类比Java8的Supplier接口
Observable<String> ob2 = Observable.defer(() -> Observable.just(myFun()) );
}
fromCallable :产生一个延迟创建的Observable,简化的defer方法。Observable.fromCallable(() -> myFun()) 等同于Observable.defer(() -> Observable.just(myFun()) );
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
//onAssembly默认直接返回ObservableCreate
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) {... } catch (Throwable e) {... }
}
//ObserverableCreate实现的subscribeActual方法
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent); //source是ObservableOnSubscribe,即我们写的生产元素的代码
} catch (Throwable ex) {...}
}
public void onNext(T t) {
if (t == null) {...}
if (!isDisposed()) { observer.onNext(t); }
}
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
//ObservableMap的subscribeActual
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
@Override
public void onNext(T t) {
if (done) {return; }
if (sourceMode != NONE) { actual.onNext(null);return;}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {...}
actual.onNext(v);
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
//observeOn(Scheduler) 返回ObservableObserveOn(继承自Observable)
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
// Observable的subscribe方法最终会调用到ObservableObserveOn.subscribeActual方法
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
//创建一个ObserveOnObserver包装了原观察者、worker,把它订阅到source(原observable)
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
public void onNext(T t) {
if (done) { return; }
if (sourceMode != QueueDisposable.ASYNC) { queue.offer(t);}
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this); //this是ObserveOnObserver,他同样实现了Runable
}
}
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal(); //最终会调用actual.onNext(v) , 即调用被封装的下游观察者,v是emmiter
}
}
处理UI事件
异步响应和处理IO结果
事件或数据 是由无法控制的生产者推送过来的
组合接收到的事件
Observable.create(emitter->{
new Thread(()->emitter.onNext("a1")).start();
new Thread(()->emitter.onNext("a2")).start();
})
Observable ob1 = Observable.create(e->new Thread(()->e.onNext("a1")).start());
Observable ob2 = Observable.create(e->new Thread(()->e.onNext("a2")).start());
Observable ob3 = Observable.merge(ob1,ob2);