正如硬币总是有两面,分布式系统让我们不再受单机性能和单点失败的限制的同时,也提高了系统的复杂性和失败的概率。就一台机器而言,可能能够无故障地持续运行一年,然而对一个庞大的集群来说,很多时候都处在有机器故障的状态。而比起机器本身,网络或者应用本身出现问题的概率往往更大。
另外,以一条典型的微服务调用链来说,如果下游故障导致无法对上游的请求作出响应,它的上游服务的特定请求都会被挂起直到超时,在请求比较多的时候,甚至会耗尽本来没有故障的上层服务的资源,从而使得上层服务也发生故障,层层递进,最后导致整个系统崩溃。
举个具体点的例子,一个依赖 30 个微服务的应用,我们假设它依赖的服务都有不错的可用性: 99.99%。如果我们不对系统的调用做任何的弹性设计,那么算下来这个应用的可用性只有 99.7%,每个月这个服务都会有两小时的不可用时间。
因此,为了提高自身服务的可用性,防止连锁失败,在通过网络调用其他服务的时候,需要考虑到下游服务故障的情况,并做相应的弹力 (Resiliency) 设计。
弹力也可以被认为是容错性,也即是系统容忍错误的能力。在这方面,有一些经典的设计模式,包括:
本篇文章要介绍的 Hystrix 正是 Netflix 应用了这些模式设计出的为不可靠的远程调用以及第三方库赋予弹性的库。在失败不可避免的复杂分布式环境中,它能够有效地阻止连锁故障,提高系统的可用性。
Hystrix 在 2012 年末由 Netflix 作为他们对分布式系统弹性设计的成果开源,见 Introducing Hystrix for Resilience Engineering[1]。在开源后,Hystrix 也加入到 Spring Cloud Netflix 中,得到了广泛的使用。
Hystrix 具备如下特性:
在 2018 年,Netflix 宣布 Hystrix 正式进入维护阶段,不再添加新的功能,并推荐新的项目使用受 Hystrix 启发,更加轻量、更多采用函数式调用的 resilience4j[2]。Netflix 自身则转向研究根据应用的实时性能适应性调整的concurrency-limits[3](这个项目最后一次更新在 2019 年,基本可以认为是停止了)。
然而,Hystrix 作为成熟且经过生产检验的弹性调用库,并不会因此失去价值。它的思想和设计被其他许多工具采用,使用上也十分稳定。在瓴岳现有的服务中,也有很多 Hystrix 的使用场景,在使用中也针对我们的需求对 Hystrix 进行了一些扩展。
接下来,我们先从 Hystrix 的概念和原理出发,并讲述我们对 Hystrix 进行的扩展以及使用经验。
Hystrix 的执行流程在 Hystrix: How it Works[4] 上有很详细的说明,这里简单复述一下以便下文展开,对 Hystrix 执行流程已经很熟悉的同学可以跳过。
当一个请求进入,并调用通过 Hystrix 进行保护的下游依赖,Hystrix 的执行流程如下:
构建 HystrixCommand
或 HystrixObservableCommand
。
Hystrix 中采用 Command Pattern[5] 来包装对下游依赖的请求。在 Command 的实现中包含对下游进行调用的逻辑,而每一次调用则会构建一个新的 Command 实例。
根据调用产生的结果是单个还是多个,用户可以选择是继承 HystrixCommand
还是 HystrixObservableCommand
。
执行上一步创建的 Command 实例。
根据 Command 的类型 (HystrixCommand
/ HystrixObservableCommand
) 以及执行方式 (同步 / 异步 / 即时 / 延后),选择如下方法中的一种来执行 Command:
execute()
: 仅适用于 HystrixCommand
,同步阻塞执行,并返回下游返回的响应或抛出异常,是 queue().get()
的 shortcut。queue()
: 仅适用于 HystrixCommand
,异步执行,返回 Future
对象,是 toObservable().toBlocking().toFuture()
的 shortcut。observe()
: 订阅发送响应的 Observable
对象,并返回该 Observable
的镜像。(即时执行)toObservable()
: 返回发送响应的 Observable
对象,仅当用户主动订阅后才开始执行具体逻辑并返回响应。(延时执行)可以发现,其实四种执行方法都是基于 Observable
的实现。
如果对 Observable
不了解的话,可以看下 ReactiveX 相关的资料,或者其在 Java 中的实现 RxJava,其在 Hystrix 的实现中被大量使用。简单来说,是基于数据流的发布-订阅模式,Observable
则是一个可供订阅的发布源。
K value = command.execute();
Future<K> fValue = command.queue();
Observable<K> ohValue = command.observe(); //hot observable
Observable<K> ocValue = command.toObservable(); //cold observable
判断是否有启用响应缓存。
如果有可用的缓存,将在这一步直接返回缓存中的值。
判断断路器是否打开。
若断路器打开,则不会继续执行 Command,而是直接去尝试获取 Fallback。
若断路器关闭,则继续执行。
判断线程池 / 排队队列 / 信号量 是否已经被占满。
根据 Command 采用的隔离策略 (后面会详细说),如果正在进行的请求数已满,则放弃执行,尝试获取 Fallback。
进行实际调用。
触发具体的调用实现:HystrixCommand.run()
或 HystrixObservableCommand.construct()
。如果调用超过了配置的超时时间,会抛出一个 TimeoutException
,随后和抛出其他除了 HystrixBadRequestException 的异常一样进入获取 Fallback 的流程。
对于具体执行调用并由于超时等原因阻塞住的线程,Hystrix 只能够尝试进行打断。但是由于大多数的 Java HTTP client 并不会响应 InterruptedException,可能线程还是会继续阻塞直到连接超时,导致线程池占满。因此,用户最好在代码中对 InterruptedException 进行处理,以及为 http 客户端配置合理的超时时间。
如果调用正常执行 (没有出现超时或异常),Hystrix 则在写日志、记录监控信息后返回。
计算线路健康程度
根据新取得的监控信息,判断是否要打开或关闭断路器。
获取 Fallback。
在上述提到的数种情况下不执行具体调用或者调用失败,Hystrix 均会尝试获取 Fallback 响应,也就是调用用户实现的 HystrixCommand.getFallback()
或 HystrixObservableCommand.resumeWithFallback()
。Fallback 顾名思义是一种降级的举措,所以用户尽量应该让这一步不会失败。如果恰巧获取 Fallback 也是网络调用,则需要通过 HystrixCommand
或 HystrixObservableCommand
再包一层。
如果用户没有实现 Fallback 方法或者 Fallback 本身抛出异常,则 Hystrix 会返回直接发送 onError
通知的 Observable
实例。下面是四种调用方式在没有实现 Fallback 或者 Fallback 抛出异常时的行为:
execute()
- 抛出异常queue()
- 返回 Future
,但是在调用 get()
方法时会抛出异常observe()
- 返回 Observable
实例,当被订阅时会马上结束并调用订阅者的 onError
方法toObservable()
- 同上通过 Observable
的方式,返回调用成功的响应。根据不同的调用方式,Observable
可能会被转换。
execute()
- 通过和 queue()
相同的方式获取 Future
对象,并通过调用 get()
来获取最底层的 Observable
发送的单个响应queue()
- 将 Observable
转化为 BlockingObservable
以转化为 Future
并返回observe()
- 订阅 Observable
使得执行马上开始,并返回一个在用户订阅后可以回放 Observable
的镜像 Observable
。toObservable()
- 返回原样的 Observable
,仅在用户进行 subscribe
之后才开始执行。HystrixCommand
和 HystrixObservableCommand
为 Hystrix 主要对外暴露的两个类,使用 Command Pattern 对可能有风险的函数功能(通常是通过网络调用下游服务)进行包装,为其提供错误及延迟容忍、性能数据收集、断路器以及隔离功能。
除了基本的调用逻辑外,它们还会包含对应的线程池、断路器、监控数据收集等部件。如果顺着执行流程全部展开讲的话未免过于冗长,而且不太清晰。在这里我们把 Hystrix 按照功能,划分为几个模块,分别展开说明,最后再来看 HystrixCommand
和 HystrixObservableCommand
的流程,会比较清晰。
其中,Caching 和 Collapser 相对其他四个模块不是那么重要,本文将不会展开细说,有兴趣的小伙伴可以自行了解。
Hystrix 是基于配置运行的,在各个阶段都有很多配置项,而且有不少需要根据线上的运行情况进行调整,例如超时时间,线程数等。如果使用不可实时改变的静态配置,那么调整起来会十分困难。因此,Hystrix 一大特性就是支持动态修改配置,在接口和使用方面都做了相应的支持。
默认情况下,它使用自家的 Archaius
作为动态配置源,不过用户可以切换为自己的动态配置源实现。
在最外层,Hystrix 根据配置的用途划分为几个配置类,例如 HystrixCommandProperties
, HystrixThreadPoolProperties
就分别对应 Command 相关的配置以及线程池相关的配置。里面为各个特定的配置都提供了 getter,返回的是泛型接口实例 HystrixProperty<T>
。这样的设计是提供了足够的灵活性,让配置提供方自由实现具体的配置获取逻辑。在默认实现中,就通过多层组合 (composite) 的方式实现了链式优先级动态配置。
以 HystrixCommandProperties
为例,其结构如图:
在获取配置时,会返回 HystrixProperty
接口的实例,具体的实现类为 ChainHystrixProperty
,底层为 ChainProperty
,其中以类似链表的形式连接几个 HystrixDynamicProperty
,对应同一个配置不同优先级的 key。 例如,hystrix.command.xxx.xxx
的优先级要比 hystrix.command.default.xxx.xxx
要高,仅在没有找到前者的时候会使用后者。 而 HystrixDynamicProperty
又是从 HystrixDynamicProperties
获取的,也就是上文提到的动态配置源,可在 HystrixPlugins
中进行配置,默认会使用 HystrixDynamicPropertiesArchaius
也就是基于 Archaius 的实现。
Archaius
本身也是支持动态修改配置的库,也有对 Spring 的支持,对 Spring Boot 应用来说,只要修改 Spring 的配置就能够修改 Archaius 的配置,可以实现大部分配置的线上实时修改。
不过,也不是所有的配置都支持动态修改,具体可以看下官方仓库 wiki 中的 configuration 章节。
断路是 Hystrix 除了隔离外的另一个重要功能。 默认情况下,当一段时间内有一定数量的请求失败和超时数量达到一定百分比,可以触发断路,之后一段时间内新进入的请求会直接失败,不会请求故障的下游服务。过去一段时间后,Hystrix 再通过放行一个请求的方式检查下游是否恢复正常,如果已恢复,则停止断路,放行后续请求,否则重复上述断路流程。
断路的功能不但能够帮助上游服务在下游故障时不浪费过多资源请求下游,可以快速返回,而且能够给下游留出一些喘息空间,有利于下游及时恢复。
在代码中,断路器会实现 HystrixCircuitBreaker
接口:
public interface HystrixCircuitBreaker {
// 是否允许执行
boolean allowRequest();
// 断路器是否处在打开状态
boolean isOpen();
// 在半打开状态,表示测试请求调用成功
void markSuccess();
// 在半打开状态,表示测试请求调用失败
void markNonSuccess();
// 在 Command 开始执行时调用
boolean attemptExecution();
}
在 Hystrix 代码中,断路器默认实现为 HystrixCircuitBreakerImpl
。上面所说的断路器行为,其实都是特指默认实现下的行为。如果用户有特定的需求,完全可以通过自己实现断路器达成。Command 只通过外部暴露的 allowRequest
和 isOpen
等部分方法和断路器实例交互,内部可以实现多种多样的逻辑。在我们的实践中,我们发现默认的断路器实现使用单个请求进行下游是否恢复健康的判断是不够全面的,有可能会碰到单个请求恰好成功,之后放行大量请求导致发生超时的尴尬情况。因此,我们实现了具备阶梯恢复特性的断路器用于替换流量较大的 Command 下的默认断路器实现。在此我们先介绍下默认实现。
在默认断路器实现下有三种断路器状态: CLOSED
, OPEN
和 HALF_OPEN
。 CLOSED
代表当前断路器关闭,请求正常通过;OPEN
则代表断路器打开,一定时间内请求不可通过;HALF_OPEN
代表断路器打开一段时间后,放行了一个请求到下游,待结果返回。 断路器会被并发调用,因此需要保证状态的转变是并发安全的。断路器使用 AtomicReference
保存当前的状态,当需要进行状态变更时使用 CAS 进行修改。
那么,断路器如何在不同的状态间进行调整呢?后面介绍线程池的时候,会提到过基于滑动窗口和桶的监控信息统计类 HystrixThreadPoolMetrics
,其实 Command 也有相似的统计类 HystrixCommandMetrics
,它们都是 HystrixMetrics
的实现类,机制非常相似。断路器会订阅 HystrixCommandMetrics
,在滑动窗口发生滚动的时候根据最新窗口内的请求量和成功率判断是否要将断路器的状态从关闭改为打开。
// 断路器订阅 HystrixCommandMetrics 的回调方法
public void onNext(HealthCounts hc) {
// 是否到达判断是否断路的最低请求量,否则跳过,因为请求量少的时候通过成功百分比来判断不准确
if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
} else {
if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
// 错误率没有达到上限,无需进行操作
} else {
// 错误率超过上限,通过 CAS 将断路器状态调整为 OPEN,并记录时间用于后续休眠时间的判断
if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
circuitOpened.set(System.currentTimeMillis());
}
}
}
}
假设这里由于错误率太高,断路器打开,那么在用户配置的休眠窗口内 (circuitBreakerSleepWindowInMilliseconds),它将持续拒绝进入的请求。过了这个窗口,则在下一个请求进入时将状态修改为 HALF_OPEN
,具体执行逻辑在 attemptExecution
方法内:
if (isAfterSleepWindow()) {
//only the first request after sleep window should execute
//if the executing command succeeds, the status will transition to CLOSED
//if the executing command fails, the status will transition to OPEN
//if the executing command gets unsubscribed, the status will transition to OPEN
if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
return true;
} else {
return false;
}
} else {
return false;
}
这个请求在执行完成后,通过调用 markSuccess
或者 markNonSuccess
回调方法,决定断路器是关闭还是重新打开。
整体状态图如下:
到这里,断路器的默认行为与实现就比较清晰了。
在我们的实践中,我们发现默认的断路器实现有两个不足:
HystrixPlugins
里面的 EventNotifier
可以对断开的时间作通知,但是我们希望能够对恢复的事件也作通知,把断路器的状态变化接入到我们的预警系统中。对于第一个问题,我们实现了阶梯恢复的断路器 HystrixSteppingRecoverCircuitBreaker
。它在默认的断路器行为上做了扩展,在放行单个请求并成功后,进入阶梯恢复状态,根据百分比逐步放行越来越多的请求到下游。如果放行的请求成功率达到要求,则继续提高放行请求的百分比直到 100% 恢复。由于它基于成功率进行判断,因此对调用量有一定的要求,适合放在流量较大的接口使用。部分代码如下:
public class HystrixSteppingRecoverCircuitBreaker implements HystrixCircuitBreaker {
enum Status {
CLOSED,
OPEN,
HALF_OPEN_SINGLE,
HALF_OPEN_STEPPING; // 添加 STEPPING 状态
}
// ...
@Override
public boolean allowRequest() {
if (properties.circuitBreakerForceOpen().get()) {
return false;
}
if (properties.circuitBreakerForceClosed().get()) {
return true;
}
if (circuitOpened.get() == -1) {
return true;
}
if (status.get() == Status.HALF_OPEN_STEPPING) {
// 处于 STEPPING 状态时,放行当前 step 百分比的请求
return ThreadLocalRandom.current().nextFloat() < stepper.currentStep();
}
if (isAfterSleepWindow() && status.compareAndSet(Status.OPEN, Status.HALF_OPEN_SINGLE)) {
write(Status.OPEN, Status.HALF_OPEN_SINGLE);
return true;
}
return false;
}
@Override
public void markSuccess() {
// 在 HALF_OPEN_SINGLE 状态,放行单个请求成功后,进入 stepping 阶梯恢复状态
toSteppingIfSingle();
}
// ...
// 状态转换方法
private void onNext(HealthCounts healthCounts) {
if (status.get() == Status.HALF_OPEN_STEPPING && isSteppingRequestVolumeSufficient(healthCounts)) {
if (isSuccessRateSufficient(healthCounts)) {
if (stepper.nextStep() == 1) {
toCloseIfStepping();
}
return;
}
toOpenIfStepping();
}
if (isRequestVolumeSufficient(healthCounts)
&& !isSuccessRateSufficient(healthCounts)
&& status.compareAndSet(Status.CLOSED, Status.OPEN)) {
circuitOpened.set(System.currentTimeMillis());
write(Status.CLOSED, Status.OPEN);
}
}
private void toSteppingIfSingle() {
if (status.compareAndSet(Status.HALF_OPEN_SINGLE, Status.HALF_OPEN_STEPPING)) {
resubscribe();
write(Status.HALF_OPEN_SINGLE, Status.HALF_OPEN_STEPPING);
}
}
private void toCloseIfStepping() {
if (status.compareAndSet(Status.HALF_OPEN_STEPPING, Status.CLOSED)) {
resubscribe();
stepper.resetStep();
circuitOpened.set(-1L);
write(Status.HALF_OPEN_STEPPING, Status.CLOSED);
}
}
private void toOpenIfStepping() {
if (status.compareAndSet(Status.HALF_OPEN_STEPPING, Status.OPEN)) {
stepper.resetStep();
circuitOpened.set(System.currentTimeMillis());
write(Status.HALF_OPEN_STEPPING, Status.OPEN);
}
}
// 实现 stepper,管理放行百分比
static class Stepper {
private AtomicInteger currentStepPos = new AtomicInteger(0);
private List<Double> steps;
// ...
private double currentStep() {
int index = currentStepPos.get();
return isStepEnd(index) ? 1 : steps.get(index);
}
private double nextStep() {
int index = currentStepPos.incrementAndGet();
if (isStepEnd(index)) {
return 1;
}
return steps.get(index);
}
private boolean isStepEnd(int index) {
return index >= steps.size() || steps.get(index) == 1;
}
private void resetStep() {
currentStepPos.set(0);
}
}
}
对于第二个问题,我们创建了一个 Observable
订阅流负责存储断路器状态变更的事件,在断路器状态发生变化时写入。在此之上实现 Callback 机制,可以让用户为某个 Command 注册 callback,对该 command 的事件进行订阅,并通过 composition 的方式实现多层订阅。
前面提到过,Hystrix 使用 Bulkhead 模式提供容错功能。简单来说,就是把系统依赖但是却互相不相关的服务调用所使用到的资源隔离开,这样在一个下游服务故障的时候,不会导致整个服务的资源都被占据。
Hystrix 为不同的应用场景提供两种隔离级别:Thread
和 Semaphore
。
线程隔离
其中,Netflix 中使用最广泛并且推荐使用的是 Thread
。Thread
隔离级别很好理解,就是让调用在另外的线程中执行,并且相关的调用都使用一个线程池中的线程。
这样做的好处如下:
然而,线程池的大小需要怎么设置呢?大多数场景下,默认的 10 个线程就能足够了。如果想要进一步调整的话,官方给出了一条简单有效的公式:
requests per second at peak when healthy × 99th percentile latency in seconds + some breathing room
峰值 qps * P99 响应时间 + 适当数量的额外缓冲线程
举个简单的例子,对于一个峰值每秒调用 30 次,p99 响应时间为 0.2s 的接口,可算出需要 30 * 0.2 + 4 = 10 个线程。不过在实际应用中还是需要具体情况具体分析,对于一些数值方差较大的接口,这个公式就不太适用了。
当然,线程隔离也不是银弹。业务线程将具体调用提交到线程池到执行完成,就需要付出任务排队、线程池调度、上下文切换的开销。Netflix 也考虑到这一点,并做了对应的测试。对于一个每秒被请求 60 次的接口,使用线程隔离在 P50、P90、P99 的开销分别为 0ms、3ms 和 9ms。
考虑到线程隔离带来的优势,这样的开销对于大多数的接口来说往往是可以接受的。
信号量隔离
但是,如果你的接口响应时间非常小,无法接受线程隔离带来的开销,且信任该接口能够很快返回的话,则可以使用 Semaphore
隔离级别。原因是使用信号量隔离自然就无法像线程隔离一样在出现超时的时候直接返回,而是需要等待客户端的阻塞结束。 在 Hystrix 中,command 的执行以及 Fallback 都支持使用 Semaphore
。将 execution.isolation.strategy
配置为 SEMAPHORE
即可将默认的 THREAD
隔离级别改为信号量隔离。根据接口的响应时间以及单位时间内的调用次数,你可以根据和计算线程数相似的方式计算出可允许并发执行的数量。
Command 在初始化时,会向 HystrixThreadPool.Factory
工厂类传入自身的 ThreadPoolKey (默认为 groupKey)。一个 ThreadPoolKey 对应一个线程池,工厂会先在 ConcurrentHashMap 缓存中检查是否已经创建,如果已经创建就直接返回,如果没有则进行创建。
static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) {
String key = threadPoolKey.name();
// 若已创建过,则返回缓存的线程池
HystrixThreadPool previouslyCached = threadPools.get(key);
if (previouslyCached != null) {
return previouslyCached;
}
// 尚未创建,进入同步块进行创建。这里的同步块主要是防止重复创建线程池
synchronized (HystrixThreadPool.class) {
if (!threadPools.containsKey(key)) {
threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
}
}
return threadPools.get(key);
}
HystrixThreadPool
是一个简单的接口,有个获取 ExecutorService
的方法、rxJava 用到的 Scheduler 的方法另外就是一些记录 metrics 的方法。具体的实现类是 HystrixThreadPoolDefault
。它的实现比较简单,因为它把创建线程池的任务转交给 HystrixPlugins
中的 HystrixConcurrencyStrategy
。它主要负责的是为外部提供获取线程池、scheduler 的接口,以及根据配置变更对线程池做调整,记录线程池的监控信息。
/* package */static class HystrixThreadPoolDefault implements HystrixThreadPool {
private static final Logger logger = LoggerFactory.getLogger(HystrixThreadPoolDefault.class);
private final HystrixThreadPoolProperties properties;
private final BlockingQueue<Runnable> queue;
private final ThreadPoolExecutor threadPool;
private final HystrixThreadPoolMetrics metrics;
private final int queueSize;
public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
this.queueSize = properties.maxQueueSize().get();
this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
// 通过 concurrencyStrategy 创建线程池,这是用户可扩展的插件
concurrencyStrategy.getThreadPool(threadPoolKey, properties),
properties);
this.threadPool = this.metrics.getThreadPool();
this.queue = this.threadPool.getQueue();
/* strategy: HystrixMetricsPublisherThreadPool */
HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties);
}
// ...
这里简单说一下 HystrixConcurrencyStrategy
。它是负责控制并发相关策略的插件,可以在 HystrixPlugins
中进行配置成自主实现的子类,默认系统实现为 HystrixConcurrencyStrategyDefault
。HystrixConcurrencyStrategy
中可重写的方法如下:
// 控制如何根据配置创建线程池
ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixProperty<Integer> corePoolSize, HystrixProperty<Integer> maximumPoolSize, HystrixProperty<Integer> keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue);
ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties);
// 控制如何创建阻塞队列
BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize);
// 控制是否对 Callable 进行处理,例如存储上下文信息
<T> Callable<T> wrapCallable(Callable<T> callable);
// 控制获取请求上下文
<T> HystrixRequestVariable<T> getRequestVariable(final HystrixRequestVariableLifecycle<T> rv);
我们后续也有对它进行扩展,实现在业务请求线程和 Hystrix 执行线程之间传递链路追踪信息的功能。
HystrixThreadPoolMetrics
则会以基于配置的滑动窗口 + 桶的形式统计线程池的相关监控信息,并输出到用户配置的 HystrixMetricsPublisher 中。这里先略过,因为线程池的 metrics 只是一部分,后面还有其他的 metrics,它们的统计和输出方式都是相似的,可以后面一起说。
信号量的对应接口为 TryableSemaphore
,接口定义也很简单:
interface TryableSemaphore {
// 尝试获取信号量,若返回 true 为成功获取
public abstract boolean tryAcquire();
// 释放已经获得的信号量
public abstract void release();
// 获取已经占用的数量
public abstract int getNumberOfPermitsUsed();
}
它有两个实现类,一个是在 Command 使用信号量作为隔离级别是使用的 TryableSemaphoreActual
,还有一个是在 Command 使用线程作为隔离级别时使用的不作任何限制的 TryableSemaphoreNoOp
。这里简单说下 TrableSemaphoreActual
。它实现了可动态调整的 counter-based semaphore。因为它不需要阻塞,并且需要实现动态调整并发上限的功能,因此它没有使用 Java concurrent 包中已有的 Semaphore 实现,而是使用 AtomicInteger 自己实现了一个,代码也很简单,引入了一个可以动态变更的 HystrixProperty 作为上限,每次请求获取时自增 counter,如果达到上限则返回 false 并自减。
class TryableSemaphoreActual implements TryableSemaphore {
protected final HystrixProperty<Integer> numberOfPermits;
private final AtomicInteger count = new AtomicInteger(0);
@Override
public boolean tryAcquire() {
int currentCount = count.incrementAndGet();
if (currentCount > numberOfPermits.get()) {
count.decrementAndGet();
return false;
} else {
return true;
}
}
// ...
}
在 HystrixCommand
和 HystrixObservableCommand
运行期间,会产生许多运行数据如延迟、结果、排队时间等。这些数据单独来看或者是聚合起来都对用户了解系统的运行情况并做调整非常有用。下面是 Command 一边执行一边记录运行数据的示意图:
这些产生的 Metrics 在内存中存储一定时间,便于进行查询和导出。
Metrics 在顶层根据类别分为了三个类,用于更新以及获取监控数据。包括 HystrixCommandMetrics
, Hystrix ThreadPoolMetrics
以及 HystrixCollapserMetrics
。
它们底层实现相似,使用桶结构存放单位时间内的数据,并保存一定时间窗口内的桶,以此提供滑动时间窗口内的 metrics 统计。每过一个单位时间,会删除最老的桶,添加最新完成统计的桶,并新开一个桶统计下一个单位时间的数据。实现基于 RxJava 的 Observable 订阅流及其 window 函数,阅读起来会比较繁琐,并且逻辑已经比较清晰了,这里就不再展开说明。这里给出一张结构图,有兴趣的同学可以自行翻看源码。
尽管看起来很复杂,不过用户并不需要管这些,只需要实现 Hystrix 提供的 HystrixMetricsPublisher
即可按照自己的需求将运行数据导出。HystrixMetricsPublisher
是个抽象类,包含三个方法:
public abstract class HystrixMetricsPublisher {
// 返回用于上报 CommandMetrics 的 HystrixMetricsPublisherCommand
HystrixMetricsPublisherCommand getMetricsPublisherForCommand(HystrixCommandKey commandKey, HystrixCommandGroupKey commandGroupKey, HystrixCommandMetrics metrics, HystrixCircuitBreaker circuitBreaker, HystrixCommandProperties properties);
// 返回用于上报 ThreadPoolMetrics 的 HystrixMetricsPublisherThreadPool
HystrixMetricsPublisherThreadPool getMetricsPublisherForThreadPool(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolMetrics metrics, HystrixThreadPoolProperties properties);
// 返回用于上报 CollapserMetrics 的 HystrixMetricsPublisherCollapser
HystrixMetricsPublisherCollapser getMetricsPublisherForCollapser(HystrixCollapserKey collapserKey, HystrixCollapserMetrics metrics, HystrixCollapserProperties properties);
}
分别返回用来上报不同 Metrics 的实现。不过其实点进去看这些接口的定义会比较 confused,因为里面都是只有一个 initialize
方法,例如 HystrixMetricsPublisherThreadPool
:
public interface HystrixMetricsPublisherThreadPool {
void initialize();
}
实际上,Hystrix 提供了多种 Metrics 订阅流供下游订阅,用户提供的实现类就是应该订阅这些流然后输出到外部源。获取流的 api 如下:
Class | Method | Return Type |
---|---|---|
HystrixCommandStartStream |
observe() |
Observable |
HystrixCommandCompletionStream |
observe() |
Observable |
HystrixThreadPoolStartStream |
observe() |
Observable |
HystrixThreadPoolCompletionStream |
observe() |
Observable |
HystrixCollapserEventStream |
observe() |
Observable |
HystrixRequestEventsStream |
observe() |
Observable |
在这些基础的流之上,Hystrix 也提供了一些常见的聚合流,直接提供大多数用户需要的聚合后的监控数据,如 RollingCommandEventCounterStream
。
用户需要做的就是在 HystrixMetricsPublisher
中初始化 HystrixMetricsPublisherCommand
等实现类,创建需要的流以及初始化外部源。在 initialize 方法中,确认订阅关系,开始消费数据。
在实践中,我们实现了 HystrixMetricsPublisher
将 metrics 写入到 Influxdb 中,方便在 Grafana 图表中进行查看。
以导出 Command Metrics 的 HystrixPublisherCommand
为例:
public class HystrixYqgMetricsPublisherCommand implements HystrixMetricsPublisherCommand {
private final HystrixCommandKey commandKey;
private final HystrixCommandGroupKey commandGroupKey;
private final Observable<CachedHistograms> bucketedHistogram;
private final MetricsReporter metricsReporter;
public HystrixYqgMetricsPublisherCommand(HystrixCommandKey commandKey, HystrixCommandGroupKey commandGroupKey, HystrixCircuitBreaker circuitBreaker, HystrixCommandProperties properties, MetricsReporter metricsReporter) {
this.commandKey = commandKey;
this.commandGroupKey = commandGroupKey;
this.metricsReporter = metricsReporter;
final int metricWindow = properties.metricsRollingStatisticalWindowInMilliseconds().get();
final int numCounterBuckets = properties.metricsRollingStatisticalWindowBuckets().get();
final int bucketSizeInMs = metricWindow / numCounterBuckets;
final HystrixCommandCompletionStream stream = HystrixCommandCompletionStream.getInstance(commandKey);
this.bucketedHistogram = getBucketedHistogram(stream, bucketSizeInMs);
}
private Observable<CachedHistograms> getBucketedHistogram(final HystrixCommandCompletionStream stream, final int bucketSizeInMs) {
return stream
.observe()
.observeOn(Schedulers.computation())
.window(bucketSizeInMs, TimeUnit.MILLISECONDS)
.flatMap(bucket -> bucket.reduce(new Histograms(), this::addValuesToBucket))
.map(CachedHistograms::backedBy);
}
@Override
public void initialize() {
this.bucketedHistogram.subscribe(cachedHistograms ->
// metrics reporter 负责将数据写入到 Influxdb
metricsReporter.writeCommandMetrics(commandGroupKey.name(), commandKey.name(), cachedHistograms));
}
private Histograms addValuesToBucket(Histograms initialDistribution, HystrixCommandCompletion event) {
for (HystrixEventType eventType: HystrixEventType.values()) {
switch (eventType) {
case EXCEPTION_THROWN: break;
default:
initialDistribution.counters[eventType.ordinal()] += event.getEventCounts().getCount(eventType);
break;
}
}
if (event.didCommandExecute() && event.getTotalLatency() > -1 && event.getExecutionLatency() > -1) {
initialDistribution.totalLatency.recordValue(event.getTotalLatency());
initialDistribution.executeLatency.recordValue(event.getExecutionLatency());
initialDistribution.queueLatency.recordValue(event.getTotalLatency() - event.getExecutionLatency());
}
return initialDistribution;
}
}
在 Grafana 上查看服务中不同 Command 调用量的图表:
不过,你也可以直接使用 hystrix-metrics-event-stream[6] 来使用官方的 dashboard。
另外的一些细节信息可以在官方的 metrics wiki[7] 中找到。
到这里,Hystrix 的主要部件就讲得差不多了。结合上面所说的执行流程,相信能够对 Hystrix 的内部运行机制有一些了解。
在 Hystrix 的常见用法中,有直接使用,和同一技术栈下的 Feign[8] 结合使用,以及在 Spring Boot 中集成使用。
用户可以通过实现 HystrixCommand
或者 HystrixObservableCommand
并进行配置使用,最原始不过也很灵活。
例如:
public class CommandHelloWorld extends HystrixCommand<String> {
private final String name;
public CommandHelloWorld(String name) {
super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
this.name = name;
}
@Override
protected String run() {
// a real example would do work like a network call here
return "Hello " + name + "!";
}
}
String s = new CommandHelloWorld("Bob").execute();
Future<String> s = new CommandHelloWorld("Bob").queue();
Observable<String> s = new CommandHelloWorld("Bob").observe();
Feign 是 Netflix 下的模板化 http 工具,用户只需要编写接口和注解,feign 运行时生成对应的 http 调用代码,底层可以指定不同的客户端以及 load balancer 等。
通过 HystrixFeign[9] 模块,能够将具体的调用嵌套在 Hystrix command 中。以下是使用 HystrixFeign 构建具备 Hystrix 保护的客户端,并配置 command key、fallback 的使用样例:
GitHub github = HystrixFeign.builder()
.setterFactory(commandKeyIsRequestLine)
.target(GitHub.class, "https://api.github.com", fallback);
不过和 Feign 使用的时候需要注意错误处理的问题。对于一些非功能性的业务错误,需要包装为 HystrixBadRequestException,让 Hystrix 不要统计到错误中造成断路。
通过在项目中引入依赖 spring-cloud-starter-hystrix
,就能够方便地在 Bean 中使用 @HystrixCommand
注解令方法在 Hystrix 下执行,如
@Service
public class GreetingService {
@HystrixCommand(fallbackMethod = "defaultGreeting")
public String getGreeting(String username) {
return new RestTemplate()
.getForObject("http://localhost:9090/greeting/{username}",
String.class, username);
}
private String defaultGreeting(String username) {
return "Hello User!";
}
}
在前面有提到过,Hystrix 现在处于维护状态,新的用户可以转向类似的库 resilience4j。 Netflix 在 Hystrix 进入维护状态后开发的项目 concurrency limit[10],是适应性根据服务延时进行流控的库,在近几年都没有进行更新,很可能已经放弃了。不过对用户透明的流控以及适应性调整免去繁琐配置也将是我们努力的方向。
目前有一些基于 Sidecar 实现的流控解决方案,例如 Istio[11],能够对微服务架构下的服务间调用赋予弹性。尽管功能上可能没有 Hystrix 丰富,但是好处是简单透明。
本文从弹性调用出发,介绍了常用的 Hystrix 库的执行流程、主要部件功能与实现、常见使用模式等,结合了公司内部实践中对 Hystrix 进行的改造,希望能对理解 Hystrix 有所帮助。
[1]
Introducing Hystrix for Resilience Engineering: https://netflixtechblog.com/introducing-hystrix-for-resilience-engineering-13531c1ab362[2]
resilience4j: https://github.com/resilience4j/resilience4j[3]
concurrency-limits: https://github.com/Netflix/concurrency-limits[4]
Hystrix: How it Works: https://github.com/Netflix/Hystrix/wiki/How-it-Works[5]
Command Pattern: https://en.wikipedia.org/wiki/Command_pattern[6]
hystrix-metrics-event-stream: https://github.com/Netflix/Hystrix/tree/master/hystrix-contrib/hystrix-metrics-event-stream[7]
metrics wiki: https://github.com/Netflix/Hystrix/wiki/Metrics-and-Monitoring[8]
Feign: https://github.com/OpenFeign/feign[9]
HystrixFeign: https://github.com/OpenFeign/feign/blob/master/hystrix[10]
concurrency limit: https://github.com/Netflix/concurrency-limits[11]
Istio: https://istio.io/latest/zh/docs/concepts/traffic-management/