流水账逻辑
(数据补全、通知逻辑等),这个时候我们通常会采用异步化的方式去处理从而加快响应速度。与此同时,伴随着上下游依赖的服务变多,对应的可能也会产生一系列的问题包括不限于问题难以排查
、性能难以保证
等。在闲鱼也不例外:补齐的串行操作
(这里指串行去实现多次网络io)存在性能瓶颈,严重影响到了接口rt。
逻辑单元化程度不好,逻辑"自由飞翔"导致代码可读性较差,单元逻辑相关指标也难以衡量。
可监控:每个单元逻辑生命周期内的所有状态和行为(成功,失败,rt等关键指标)都在监控范围内。
容灾性: 有fallback机制,当逻辑单元内出现大量异常(通常指的是超时)的时候能堪大用。
零成本接入:接入使用方便,做到拆箱即用。
该逻辑单元L执行时间为3
a任务执行成功1次耗时为1, b任务执行成功1次耗时为2, c任务执行成功1次耗时为3
该逻辑单元L执行时间为4
a任务执行成功1次耗时为3, b任务执行失败, c任务执行失败
b和c线程在失败后线程停止, 不继续占用线程池资源
超时场景
展开当前服务获取对应的LogicActor
LogicActor获取当前的所有UnitActor(a, b, c)
使用 tell
方式分别向a, b, c发消息, 注意这个过程是异步的
a正常返回, b和c均超时
不管超时与否LogicActor都会去countDown当前的任务
当所有任务都完成的时候就回去combine数据
最后返回结果数据
消息驱动.
不需要额外的线程池管理 & 异常容错.
优雅停止正在执行的worker(PoinsonPill).
熟悉akka的人可以较为容易的上手.
为了达到目标需要一层封装, 并且也不容易.
大大增加系统复杂度.
对于不熟悉scala/akka的人来说简直是灾难.
scala/akka的学习成本直接导致了"零成本接入"的目标无法达成, 故放弃.
定义三个task
通过rxjava去实现异步化处理
给一个固定大小为10的线程池去处理
reduce中去做累加操作
latch做一个统一卡点
这边的timeout起到了整体卡点的效果, 但是并不知道timeout是哪个业务导致的
响应式编程
代码量精简
屏蔽线程池的操作
封装了 #timeout
和 #onErrorReturn
方法, 已存在超时处理模块无需二度封装
为了达到目标需要一层封装
存在超时处理模块, 但是业务单元的植入会破坏rxjava原有的封装且同时改造难度较大
虽然满足"容灾性"( onErrorReturn
), 但是无法做到"可监控", 因为超时之后我们没有办法知道是哪个业务超时了, 故放弃.
集团内部针对全局的 traceId
进行封装(ThreadLocal), 使用线程池会 丢失上下文
, 导致对应的日志无法被追踪到.
存在 bizCode
概念, 方便监控逻辑单元以及逻辑单元内的每一个操作.
Callable
和 Runnable
执行超时时, 需要将其停止, 让其不继续占用线程池资源资源.CountDownLatch: java.util.concurrent.CountDownLatch#await(long, java.util.concurrent.TimeUnit)超时不会抛出异常只能通过boolean去判断是否超时, 意味着不能通过短路的异常流方式去处理.
CyclicBarrier: java.util.concurrent.CyclicBarrier#await(long, java.util.concurrent.TimeUnit)对于异常的封装并不友好, 例
现在有四个线程a, b, c, d. 超时时间均2s. 其中d线程执行了3s, 此时会发生下面的情况
第一个完成任务进入barrier.await的会抛出timeout异常, 同时其他四个任务都会抛出broken barrier
超时任务最后会完成后续的动作, 持续占用资源
Semaphore: 不满足使用条件, 不过多赘述.
Future: java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit), futureList会串行读取, 导致T>=Max(t1,t2,t3)
提交future到对应的list中
通过遍历去获取对应的结果
获取到结果之后去做累加的操作
最后输出结果和执行的时间
虽然上面三个任务是并发执行, 但由于 future.get(long,TimeUnit)
是阻塞, timeout会在此处失效. 如上图, 最后耗时可能是3+2+1=6s
.
ConcurrentCallable是对外使用的类
BizBaseCallable继承了Callable, 封装了内部的上下文和对应的业务单元
BizCountDownLatch为了解决业务超时监控的问题, 加上了业务单元池的属性
BizBaseCallable封装了ctx表示上下文, 保证在多线程的情况下上下文也不会丢失(针对集团内部的traceId的兼容)
对应的bizSet实际上是一个未完成的业务池子, 完成一个业务去掉对应的业务单元
重写countDown方法, 通过移除业务单元去表示 已完成
await超时的时候抛出异常, 模拟短路环境, 上层统一处理
超时异常中丢出还未完成的任务
监控逻辑
中包含了超时逻辑的处理Callable获取返回值的时候, get超时时长给 0
: 该完成的都完成了, 没完成的就超时了.
通过future.cancel去中断正在执行的callable
默认返回fallback(兜底操作), fallback默认实现为 renturnnull;
.
这里需要塞一个默认值, 这样可以方便外层调用的时候通过 list.forEach(l->deal(l))
去处理列表逻辑
4000L表示该任务的超时时间为4000ms, 该逻辑单元取名为"l"
往组件中添加任务a, b, c
最后展示了获取到数据之后的处理
a结束后会在 countDownLatch
中移除对应的单元
超时的时候会强行结束b和c
可监控
, 容灾性
, 零成本接入
的异步化组件.