cover_image

Project Reactor 响应式编程的探索与实践

苏彬彬 之家技术
2022年10月19日 10:00

关注“之家技术”,获取更多技术干货

图片

总篇175篇 2022年第49篇


1. 前言 


在前后端分离和服务化的环境下,通常需要一个API网关统一对接C端、B端和运营等前端项目。这个API网关的主要作用有两个:

一是为所有前端项目提供统一的入口,然后按照规则转发到后端服务上,之所以多一层转发,是因为服务化后的项目与前端是多对多的关系,一个前端项目可能会访问多个后端的项目,一个后端项目也可能对多个前端项目提供服务。
二是对所有的前端项目进行统一鉴权,鉴权后将用户ID写入请求头并转发给后端项目,所有的后端项目都是无会话状态的,只需要根据ID对外提供接口。


Spring Cloud Gateway 和 Netflix Zuul 都是这样一个API网关项目,能够按照业务规则实现请求的转发功能,也可以将统一鉴权很好的集成进去。最开始使用的 Zuul,和其他业务项目一样基于 Web Servlet 技术栈。随着 Spring 不断的升级,后来的项目也就从 Zuul 改成了 Spring Cloud Gateway。由于 Spring Cloud Gateway 构建在 Web Reactive技术栈之上,因此相关的业务就开始使用 WebFlux、WebClient 和 Reactor。


2. Reactor 是什么 


Reactor 是一个类库,是响应式编程范式的一个实现,主要用来开发非阻塞、异步应用,是 Spring 响应式生态系统的基础,同样的类库还有 RxJava。响应式编程范式定义了一组接口,作为观察者设计模式的扩展。核心接口如下:

interface Publisher<T{

    void subscribe(Subscriber<? super T> s);
}

interface Subscriber<T{

    void onSubscribe(Subscription s);

    void onNext(T t);

    void onError(Throwable t);

    void onComplete();
}

interface Subscription {

    void request(long n);

    void cancel();
}
Reactor 实现了这组接口,同时还提供了丰富的操作符,用于快速构建  Publisher 。在业务开发中,主要使用 Reactor 操作符,按照数据流的形式组合起来实现业务逻辑,就像下面这样:
wechatWebClient.getUserInfo(suiteId,code)
        .filter(it->!StringUtils.isEmpty(it.getCorpId()))
        .switchIfEmpty(createUnauthorizedError("xxx"))
        .doOnNext(token::setUserInfoResp)
        .flatMap(it->wechatWebClient.getCorpInfo(suiteId,it.getCorpId()))
        .filter(CorpInfoResp::isAuthenticated)
        .switchIfEmpty(createUnauthorizedError("xxx"))
        .doOnNext(it->token.setAgentId(it.getAuthorization().getAgentId()))
        .flatMap(it->accountWebClient.getAccountByCorpId(it.getCorpId()))
        .filter(it->!it.expired())
        .switchIfEmpty(createUnauthorizedError("xxx"))
        .doOnNext(it->token.setCompanyId(String.valueOf(it.getId())))
        .thenReturn(RespBody.ok());


3. 为什么要使用 Reactor 

使用 Reactor 的主要原因是能够提高资源的利用效率,减少硬件资源的占用。响应式编程能用更少的资源做更多的工作,用更少的实例满足更多的并发连接。

传统的开发方式使用阻塞代码编写程序,在出现性能问题之前,这种做法很好,一旦程序涉及大量的I/O操作(例如网络调用和数据库请求),就会浪费系统资源,因为很多线程在等待数据时处于空闲状态。并且并发越大,问题就越明显。

通过编写异步、非阻塞代码,可以有效的解决这些问题,然而JDK提供的异步API比较难用,因此也就催生了像 Reactor 这种响应式库。Reactor 这类响应式库与JDK的异步API相比,大大降低了异步编程的难度,同时还关注了代码的可组合性和可读性,并且可以使用丰富的运算符将数据作为流进行操作。


4. Reactor 实践 


下面示例使用 Groovy 语法描述:


4.1

创建元素

简单的理解 Reactor 实际就两个核心类 Mono 和  Flux,他们都实现了 Publisher 接口,Mono 包含0-1个元素,Flux 包含0-N个元素,List 不一定要使用 Flux。

def mono = Mono.just(["foo""bar""foobar"])
        .map({ it.collect { item -> item.toUpperCase() } })

mono.subscribe({ println("the mono data is $it") })

def flux = Flux.fromIterable(["foo""bar""foobar"])
        .map({ it.toUpperCase() })

flux.subscribe({ println("the flux data is $it") })
the mono data is [FOO, BAR, FOOBAR]
the flux data is FOO
the flux data is BAR
the flux data is FOOBAR

just 传递的是对象,在声明时会立即初始化, defer 传递的是函数,只有在订阅后才执行,无论何时都应该优先使用 defer。

def data = "foo"
def pub1 = Mono.just(data)
data = "bar"

pub1.subscribe({ println("just data is $it") })

def pub2 = Mono.defer({ Mono.just(data) })
data = "foobar"

pub2.subscribe({ println("defer data is $it") })
just data is foo
defer data is foobar


4.2 

转换元素

map 同步转换元素, flatMap 异步转换元素,前者直接在内存中操作,后者大多情况会有I/O。

Mono.just("foobar")
        .map({ it.size() })
        .subscribe({ println("map result is $it") })

Mono.just("foobar")
        .flatMap({ Mono.just(it.size()) })
        .subscribe({ println("flatMap result is $it") })
map result is 6
flatMap result is 6


4.3

附加行为

按照响应式编程范式,在订阅后正常会相继执行以下操作:

- Subscriber.onSubscribe()

- Subscription.request()

- Subscriber.onNext()

- Subscriber.onComplete()


doOn* 这组附加行为就是在执行到这些阶段时被调用,例如  doOnNext 是在执行完  Subscriber.onNext() 后触发


附加行为不改变序列的元素,是在订阅执行到某个阶段时,触发一些自定义的行为:

Flux.fromIterable([345])
        .doOnNext({ println("init value is $it") })
        .map({ it * 3 })
        .doOnNext({ println("map value is $it") })
        .doOnComplete({ println("flux completed") })
        .subscribe()
init value is 3
map value is 9
init value is 4
map value is 12
init value is 5
map value is 15
flux completed

 log 可以说是一个特殊的附加行为,可以输出订阅后每个阶段的执行过程:

Flux.fromIterable([345])
        .log()
        .map({ it * 3 })
        .log()
        .subscribe()
reactor.Flux.Iterable.1 - | onSubscribe([Synchronous Fuseable] FluxIterable.IterableSubscription)
reactor.Flux.MapFuseable.2 - | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
reactor.Flux.MapFuseable.2 - | request(unbounded)
reactor.Flux.Iterable.1 - | request(unbounded)
reactor.Flux.Iterable.1 - | onNext(3)
reactor.Flux.MapFuseable.2 - | onNext(9)
reactor.Flux.Iterable.1 - | onNext(4)
reactor.Flux.MapFuseable.2 - | onNext(12)
reactor.Flux.Iterable.1 - | onNext(5)
reactor.Flux.MapFuseable.2 - | onNext(15)
reactor.Flux.Iterable.1 - | onComplete()
reactor.Flux.MapFuseable.2 - | onComplete()


4.4

切换序列

 then 结束上一个序列后,切换到下一个序列,两个序列没有必要的相关性时使用。比如执行完某项任务后返回一个成功。

Mono.just("foo")
        .doOnNext({ println("do something $it") })
        .then(Mono.just("success"))
        .subscribe({ println("then result is $it") })
do something foo
then result is success

thenReturn 是 then 的简化操作。

Mono.just("bar")
        .doOnNext({ println("do something $it") })
        .thenReturn("success")
        .subscribe({ println("then return is $it") })
do something bar
then return is success

switchIfEmpty 当前序列没有元素时,切换到另一个序列。

Flux.fromIterable([345])
        .filter({ it > 5 })
        .switchIfEmpty(Flux.just(6))
        .map({ it + 1 })
        .subscribe({ println("switchIfEmpty result is $it") })
switchIfEmpty result is 7


4.5

错误处理

error 抛出异常 onErrorResume 捕获异常做后续处理。

Flux.fromIterable([345])
        .filter({ it > 5 })
        .switchIfEmpty(Mono.error(new RuntimeException("value too small")))
        .onErrorResume({
            println("on error resume ${it.getMessage()}")
            Flux.fromIterable([678])
        })
        .subscribe({ println("flux value is $it") })
on error resume value too small
flux value is 6
flux value is 7
flux value is 8
onErrorReturn捕获异常并返回值, 是  onErrorResume 的简化操作。
Mono.defer({ Mono.just(null) })
        .onErrorReturn(NullPointerException, "foobar")
        .subscribe({ println("on error return $it") })
on error return foobar
onErrorMap 捕获异常并重新抛出另一个异常。
Mono.defer({ Mono.just(null) })
        .onErrorMap(NullPointerException, {
            new RuntimeException("null pointer")
        })
        .doOnError({
            println("error message ${it.getMessage()}")
        })
        .subscribe()
error message null pointer


4.6

传递上下文

在响应式编程中, 一个线程可以大致同时处理多个异步序列, 一个异步序列也可以从一个线程切换到另一个线程。像 ThreadLocal 这种允许将数据与线程相关联的方式,在响应式编程中是不适用的, Reactor 提供了一个高级特性 Context 来解决这个问题:

Mono.just("foo")
        .flatMap({
            Mono.subscriberContext()
                    .map({ ctx -> it + ctx.get("key") })
        }).subscriberContext({ Context.of("key""bar") })
        .subscribe({ println("context result is $it") })
context result is foobar
subscriberContext 的链是从下往上流动的, 所以需要先读取后写入。
Mono.just("foo")
        .subscriberContext({ Context.of("key""bar") })
        .flatMap({
            Mono.subscriberContext()
                    .map({ ctx -> it + ctx.getOrDefault("key""") })
        })
        .subscribe({ println("context result is $it") })
context result is foo
多次向 Context 中写入相同 Key 时, 按照顺序读取其下最接近的。
Mono.just("hello")
        .flatMap({
            Mono.subscriberContext()
                .map({ ctx -> it + " " + ctx.get("key") })
        })
        .subscriberContext({ it.put("key""foo") })
        .flatMap({
            Mono.subscriberContext()
                .map({ ctx -> it + ctx.get("key") })
        })
        .subscriberContext({ it.put("key""bar") })
        .subscribe({ println("context result is $it") })
context result is hello foobar


4.7

单元测试

Reactor 无法进行常规的断言,因此提供了专门的测试类库,使用 StepVerifier 测试场景。

when:
def mono = Mono.just("foo")
        .map({ it + "bar" })

then:

StepVerifier.create(mono)
        .expectNext("foobar")
        .verifyComplete()
下面使用 expectNext 断言元素,使用 expectError 断言异常,只有在调用 verify... 之后才执行验证,不调用不执行。
when:
def flux = Flux.just("thing1""thing2")
        .concatWith(Mono.error(new RuntimeException()))

then:

StepVerifier.create(flux)
        .expectNext("thing1")
        .expectNext("thing2")
        .expectError()
        .verify()


5. 使用 Reactor 的注意事项 


5.1

代码为什么没有执行


下面 { it + "bar" } 不会被执行,操作符的链式调用每次都会返回一个新创建的对象。

def mono = Mono.just("foo")

mono.map({ it + "bar" })

mono.subscribe({ println("the result is $it") })
the result is foo
下面 println() 不会被执行,Mono 不订阅不执行,最外层的 subscribe 不会传播到 doOnNext 的 Mono 里面,同理 doOn* 都不应该执行 Publisher 相关的代码。
Mono.just("foo1")
        .doOnNext({ item ->
            Mono.fromSupplier({ println("do something $item") })
        }).subscribe()
下面 println() 可以执行,但是对外层的订阅者来说,就丧失了对内的控制权,因此不建议有内部订阅。
Mono.just("bar")
        .doOnNext({ item ->
            Mono.fromSupplier({ println("do something $item") })
                .subscribe()
        }).subscribe()
do something bar
为了将订阅在内部传播下去,正确的方法应该使用  flatMap()
Mono.just("foo2")
        .flatMap({ item ->
            Mono.fromSupplier({ println("do something $item") })
        }).subscribe()
do something foo2


5.2

NULL问题


 下面 onErrorReturn 是无效的,Mono.just() 不接收空值,在构建阶段就会抛出异常,而捕获异常是在订阅阶段发生的。

when:
Mono.just(null)
        .onErrorReturn("foo")
        .subscribe({ println("the result is $it") })

then:

thrown(NullPointerException)
 下面 onErrorReturn 是有效的,这就是上文中提到的为什么要优先使用 defer 的原因,同样的异常会在订阅执行后被捕获。
when:
Mono.defer({ Mono.just(null) })
.onErrorReturn("bar")
.subscribe({ println("the result is $it") })

then:
noExceptionThrown()
the result is bar
如果没有明确的表示,所有的调用都不能传递  NULL,即便是可以传递空的 justOrEmpty 也是判断空后返回 Mono.empty(),下面的 justOrEmpty 和 empty 是等价的。
Mono.justOrEmpty(null)
        .log()
        .subscribe()

Mono.empty()
        .log()
        .subscribe()
reactor.Mono.Empty.1 - onSubscribe([Fuseable] Operators.EmptySubscription)
reactor.Mono.Empty.1 - request(unbounded)
reactor.Mono.Empty.1 - onComplete()
reactor.Mono.Empty.2 - onSubscribe([Fuseable] Operators.EmptySubscription)
reactor.Mono.Empty.2 - request(unbounded)
reactor.Mono.Empty.2 - onComplete()
因此 Reactor 的设计原则是不要在序列中传递空值,任何 NULL 参数都会抛出 NPE。
Mono.just("foo")
        .map({ null })
        .subscribe()
Caused by: java.lang.NullPointerException: The mapper returned a null value.
at java.util.Objects.requireNonNull(Objects.java:228)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:107)
at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2393)


5.3

通过异常信息无法定位问题


Reactor 默认没有启用调试功能,通过堆栈跟踪信息排查和定位问题比较困难。

Caused by: java.lang.IndexOutOfBoundsException: Source emitted more than one item
at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:127)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmitScalar(FluxFlatMap.java:481)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:414)
at reactor.core.publisher.FluxRange$RangeSubscription.slowPath(FluxRange.java:154)
at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:109)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:364)
at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:68)

下面是启用了调试功能的堆栈跟踪信息,可以明确是哪一行代码出现了问题。

Caused by: java.lang.IndexOutOfBoundsException: Source emitted more than one item
at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:127)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.MonoSingle] :
reactor.core.publisher.Flux.single(Flux.java:7887)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
Error has been observed at the following site(s):
|_ Flux.single ⇢ at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
|_ Mono.map ⇢ at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
|_ Mono.filter ⇢ at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

Reactor 提供了三种用于增强调试信息的方法:

1.全局启用调试功能 Hooks.onOperatorDebug() 这是最简单的方式也是最慢的;

2.在调用链中使用 checkpoint() 手动激活堆栈跟踪;

3.使用生产级别的全局调试类库 reactor-tools。


6. 总结 


在实践过程中,由于编程方式的不同,使用声明式编程不论是编码、调试和维护,相比命令式编程门槛要高一些。因此梳理了一些在使用过程中容易产生疑惑的问题,希望对其他初学者有所帮助。


作者简介

图片


苏彬彬

 经销商技术部-商业资源团队

 2017年加入汽车之家经销商事业部,目前主要负责事业部内创新商业项目的研发工作,热衷于业内新技术的探索与实践。


图片

阅读更多:


▼ 关注「之家技术」,获取更多技术干货 

图片

修改于2022年10月19日
继续滑动看下一个
之家技术
向上滑动看下一个