关注“之家技术”,获取更多技术干货
总篇175篇 2022年第49篇
1. 前言
在前后端分离和服务化的环境下,通常需要一个API网关统一对接C端、B端和运营等前端项目。这个API网关的主要作用有两个:
Spring Cloud Gateway 和 Netflix Zuul 都是这样一个API网关项目,能够按照业务规则实现请求的转发功能,也可以将统一鉴权很好的集成进去。最开始使用的 Zuul,和其他业务项目一样基于 Web Servlet 技术栈。随着 Spring 不断的升级,后来的项目也就从 Zuul 改成了 Spring Cloud Gateway。由于 Spring Cloud Gateway 构建在 Web Reactive技术栈之上,因此相关的业务就开始使用 WebFlux、WebClient 和 Reactor。
2. Reactor 是什么
Reactor 是一个类库,是响应式编程范式的一个实现,主要用来开发非阻塞、异步应用,是 Spring 响应式生态系统的基础,同样的类库还有 RxJava。响应式编程范式定义了一组接口,作为观察者设计模式的扩展。核心接口如下:
interface Publisher<T> {
void subscribe(Subscriber<? super T> s);
}
interface Subscriber<T> {
void onSubscribe(Subscription s);
void onNext(T t);
void onError(Throwable t);
void onComplete();
}
interface Subscription {
void request(long n);
void cancel();
}
wechatWebClient.getUserInfo(suiteId,code)
.filter(it->!StringUtils.isEmpty(it.getCorpId()))
.switchIfEmpty(createUnauthorizedError("xxx"))
.doOnNext(token::setUserInfoResp)
.flatMap(it->wechatWebClient.getCorpInfo(suiteId,it.getCorpId()))
.filter(CorpInfoResp::isAuthenticated)
.switchIfEmpty(createUnauthorizedError("xxx"))
.doOnNext(it->token.setAgentId(it.getAuthorization().getAgentId()))
.flatMap(it->accountWebClient.getAccountByCorpId(it.getCorpId()))
.filter(it->!it.expired())
.switchIfEmpty(createUnauthorizedError("xxx"))
.doOnNext(it->token.setCompanyId(String.valueOf(it.getId())))
.thenReturn(RespBody.ok());
3. 为什么要使用 Reactor
使用 Reactor 的主要原因是能够提高资源的利用效率,减少硬件资源的占用。响应式编程能用更少的资源做更多的工作,用更少的实例满足更多的并发连接。
传统的开发方式使用阻塞代码编写程序,在出现性能问题之前,这种做法很好,一旦程序涉及大量的I/O操作(例如网络调用和数据库请求),就会浪费系统资源,因为很多线程在等待数据时处于空闲状态。并且并发越大,问题就越明显。
通过编写异步、非阻塞代码,可以有效的解决这些问题,然而JDK提供的异步API比较难用,因此也就催生了像 Reactor 这种响应式库。Reactor 这类响应式库与JDK的异步API相比,大大降低了异步编程的难度,同时还关注了代码的可组合性和可读性,并且可以使用丰富的运算符将数据作为流进行操作。
4. Reactor 实践
下面示例使用 Groovy 语法描述:
4.1
创建元素
简单的理解 Reactor 实际就两个核心类 Mono 和 Flux,他们都实现了 Publisher 接口,Mono 包含0-1个元素,Flux 包含0-N个元素,List 不一定要使用 Flux。
def mono = Mono.just(["foo", "bar", "foobar"])
.map({ it.collect { item -> item.toUpperCase() } })
mono.subscribe({ println("the mono data is $it") })
def flux = Flux.fromIterable(["foo", "bar", "foobar"])
.map({ it.toUpperCase() })
flux.subscribe({ println("the flux data is $it") })
the mono data is [FOO, BAR, FOOBAR]
the flux data is FOO
the flux data is BAR
the flux data is FOOBAR
just 传递的是对象,在声明时会立即初始化, defer 传递的是函数,只有在订阅后才执行,无论何时都应该优先使用 defer。
def data = "foo"
def pub1 = Mono.just(data)
data = "bar"
pub1.subscribe({ println("just data is $it") })
def pub2 = Mono.defer({ Mono.just(data) })
data = "foobar"
pub2.subscribe({ println("defer data is $it") })
just data is foo
defer data is foobar
4.2
转换元素
map 同步转换元素, flatMap 异步转换元素,前者直接在内存中操作,后者大多情况会有I/O。
Mono.just("foobar")
.map({ it.size() })
.subscribe({ println("map result is $it") })
Mono.just("foobar")
.flatMap({ Mono.just(it.size()) })
.subscribe({ println("flatMap result is $it") })
map result is 6
flatMap result is 6
4.3
附加行为
按照响应式编程范式,在订阅后正常会相继执行以下操作:
- Subscriber.onSubscribe()
- Subscription.request()
- Subscriber.onNext()
- Subscriber.onComplete()
doOn* 这组附加行为就是在执行到这些阶段时被调用,例如 doOnNext 是在执行完 Subscriber.onNext() 后触发
附加行为不改变序列的元素,是在订阅执行到某个阶段时,触发一些自定义的行为:
Flux.fromIterable([3, 4, 5])
.doOnNext({ println("init value is $it") })
.map({ it * 3 })
.doOnNext({ println("map value is $it") })
.doOnComplete({ println("flux completed") })
.subscribe()
init value is 3
map value is 9
init value is 4
map value is 12
init value is 5
map value is 15
flux completed
log 可以说是一个特殊的附加行为,可以输出订阅后每个阶段的执行过程:
Flux.fromIterable([3, 4, 5])
.log()
.map({ it * 3 })
.log()
.subscribe()
reactor.Flux.Iterable.1 - | onSubscribe([Synchronous Fuseable] FluxIterable.IterableSubscription)
reactor.Flux.MapFuseable.2 - | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
reactor.Flux.MapFuseable.2 - | request(unbounded)
reactor.Flux.Iterable.1 - | request(unbounded)
reactor.Flux.Iterable.1 - | onNext(3)
reactor.Flux.MapFuseable.2 - | onNext(9)
reactor.Flux.Iterable.1 - | onNext(4)
reactor.Flux.MapFuseable.2 - | onNext(12)
reactor.Flux.Iterable.1 - | onNext(5)
reactor.Flux.MapFuseable.2 - | onNext(15)
reactor.Flux.Iterable.1 - | onComplete()
reactor.Flux.MapFuseable.2 - | onComplete()
4.4
切换序列
then 结束上一个序列后,切换到下一个序列,两个序列没有必要的相关性时使用。比如执行完某项任务后返回一个成功。
Mono.just("foo")
.doOnNext({ println("do something $it") })
.then(Mono.just("success"))
.subscribe({ println("then result is $it") })
do something foo
then result is success
thenReturn 是 then 的简化操作。
Mono.just("bar")
.doOnNext({ println("do something $it") })
.thenReturn("success")
.subscribe({ println("then return is $it") })
do something bar
then return is success
switchIfEmpty 当前序列没有元素时,切换到另一个序列。
Flux.fromIterable([3, 4, 5])
.filter({ it > 5 })
.switchIfEmpty(Flux.just(6))
.map({ it + 1 })
.subscribe({ println("switchIfEmpty result is $it") })
switchIfEmpty result is 7
4.5
错误处理
error 抛出异常 onErrorResume 捕获异常做后续处理。
Flux.fromIterable([3, 4, 5])
.filter({ it > 5 })
.switchIfEmpty(Mono.error(new RuntimeException("value too small")))
.onErrorResume({
println("on error resume ${it.getMessage()}")
Flux.fromIterable([6, 7, 8])
})
.subscribe({ println("flux value is $it") })
on error resume value too small
flux value is 6
flux value is 7
flux value is 8
onErrorReturn捕获异常并返回值, 是 onErrorResume 的简化操作。Mono.defer({ Mono.just(null) })
.onErrorReturn(NullPointerException, "foobar")
.subscribe({ println("on error return $it") })
on error return foobar
onErrorMap 捕获异常并重新抛出另一个异常。Mono.defer({ Mono.just(null) })
.onErrorMap(NullPointerException, {
new RuntimeException("null pointer")
})
.doOnError({
println("error message ${it.getMessage()}")
})
.subscribe()
error message null pointer
4.6
传递上下文
在响应式编程中, 一个线程可以大致同时处理多个异步序列, 一个异步序列也可以从一个线程切换到另一个线程。像 ThreadLocal 这种允许将数据与线程相关联的方式,在响应式编程中是不适用的, Reactor 提供了一个高级特性 Context 来解决这个问题:
Mono.just("foo")
.flatMap({
Mono.subscriberContext()
.map({ ctx -> it + ctx.get("key") })
}).subscriberContext({ Context.of("key", "bar") })
.subscribe({ println("context result is $it") })
context result is foobar
Mono.just("foo")
.subscriberContext({ Context.of("key", "bar") })
.flatMap({
Mono.subscriberContext()
.map({ ctx -> it + ctx.getOrDefault("key", "") })
})
.subscribe({ println("context result is $it") })
context result is foo
Mono.just("hello")
.flatMap({
Mono.subscriberContext()
.map({ ctx -> it + " " + ctx.get("key") })
})
.subscriberContext({ it.put("key", "foo") })
.flatMap({
Mono.subscriberContext()
.map({ ctx -> it + ctx.get("key") })
})
.subscriberContext({ it.put("key", "bar") })
.subscribe({ println("context result is $it") })
context result is hello foobar
4.7
单元测试
Reactor 无法进行常规的断言,因此提供了专门的测试类库,使用 StepVerifier 测试场景。
when:
def mono = Mono.just("foo")
.map({ it + "bar" })
then:
StepVerifier.create(mono)
.expectNext("foobar")
.verifyComplete()
when:
def flux = Flux.just("thing1", "thing2")
.concatWith(Mono.error(new RuntimeException()))
then:
StepVerifier.create(flux)
.expectNext("thing1")
.expectNext("thing2")
.expectError()
.verify()
5. 使用 Reactor 的注意事项
5.1
代码为什么没有执行
下面 { it + "bar" } 不会被执行,操作符的链式调用每次都会返回一个新创建的对象。
def mono = Mono.just("foo")
mono.map({ it + "bar" })
mono.subscribe({ println("the result is $it") })
the result is foo
Mono.just("foo1")
.doOnNext({ item ->
Mono.fromSupplier({ println("do something $item") })
}).subscribe()
Mono.just("bar")
.doOnNext({ item ->
Mono.fromSupplier({ println("do something $item") })
.subscribe()
}).subscribe()
do something bar
Mono.just("foo2")
.flatMap({ item ->
Mono.fromSupplier({ println("do something $item") })
}).subscribe()
do something foo2
5.2
NULL问题
下面 onErrorReturn 是无效的,Mono.just() 不接收空值,在构建阶段就会抛出异常,而捕获异常是在订阅阶段发生的。
when:
Mono.just(null)
.onErrorReturn("foo")
.subscribe({ println("the result is $it") })
then:
thrown(NullPointerException)
when:
Mono.defer({ Mono.just(null) })
.onErrorReturn("bar")
.subscribe({ println("the result is $it") })
then:
noExceptionThrown()
the result is bar
Mono.justOrEmpty(null)
.log()
.subscribe()
Mono.empty()
.log()
.subscribe()
reactor.Mono.Empty.1 - onSubscribe([Fuseable] Operators.EmptySubscription)
reactor.Mono.Empty.1 - request(unbounded)
reactor.Mono.Empty.1 - onComplete()
reactor.Mono.Empty.2 - onSubscribe([Fuseable] Operators.EmptySubscription)
reactor.Mono.Empty.2 - request(unbounded)
reactor.Mono.Empty.2 - onComplete()
Mono.just("foo")
.map({ null })
.subscribe()
Caused by: java.lang.NullPointerException: The mapper returned a null value.
at java.util.Objects.requireNonNull(Objects.java:228)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:107)
at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2393)
5.3
通过异常信息无法定位问题
Reactor 默认没有启用调试功能,通过堆栈跟踪信息排查和定位问题比较困难。
Caused by: java.lang.IndexOutOfBoundsException: Source emitted more than one item
at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:127)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmitScalar(FluxFlatMap.java:481)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:414)
at reactor.core.publisher.FluxRange$RangeSubscription.slowPath(FluxRange.java:154)
at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:109)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:364)
at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:68)
下面是启用了调试功能的堆栈跟踪信息,可以明确是哪一行代码出现了问题。
Caused by: java.lang.IndexOutOfBoundsException: Source emitted more than one item
at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:127)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.MonoSingle] :
reactor.core.publisher.Flux.single(Flux.java:7887)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
Error has been observed at the following site(s):
|_ Flux.single ⇢ at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
|_ Mono.map ⇢ at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
|_ Mono.filter ⇢ at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
Reactor 提供了三种用于增强调试信息的方法:
1.全局启用调试功能 Hooks.onOperatorDebug() 这是最简单的方式也是最慢的;
2.在调用链中使用 checkpoint() 手动激活堆栈跟踪;
3.使用生产级别的全局调试类库 reactor-tools。
6. 总结
在实践过程中,由于编程方式的不同,使用声明式编程不论是编码、调试和维护,相比命令式编程门槛要高一些。因此梳理了一些在使用过程中容易产生疑惑的问题,希望对其他初学者有所帮助。
作者简介
苏彬彬
■ 经销商技术部-商业资源团队
■ 2017年加入汽车之家经销商事业部,目前主要负责事业部内创新商业项目的研发工作,热衷于业内新技术的探索与实践。
阅读更多:
▼ 关注「之家技术」,获取更多技术干货 ▼