1、熔断器的解释
熔断器(circuit breaker)是借鉴了电气元件中的词语。在电气元件中,当电路设备的负载过高时,熔断器就会自动断开,从而保护冰箱和电视机等电气设备不至于烧坏。在微服务可靠性中,是指当服务A依赖的B服务不可用时,服务A可以采取不依赖于B的做法,提供有损的服务,从而降低B服务不可用对于整个业务的影响。
如下图所示。在步骤1)- 3)中,当熔断器发现服务B两次调用都超时时,判定服务B已经不可用了,于是,熔断器打开,服务A调用服务B时,将不再请求服务B,而是由熔断器返回一个降级方案的结果。关于降级,可以是某个默认的兜底结果。接下来,在步骤4) - 5)中,熔断器会主动向服务B发送PING命令,从而判断服务B是否已经恢复,如果恢复了,则熔断器断开,向服务B发出请求。

那么,我们为什么要考虑熔断呢?
熔断是与微服务相关的。在微服务中,每个服务都拆分得很细,因此,一个产品逻辑会涉及到调用多个外部服务。比如,我们的服务B1依赖于C1和C2,给A1和A2提供服务。
如果服务C1出现问题,比如,fullgc严重,导致对外响应很慢。那么,服务B1就会有很多线程处于等待的状态,耗尽线程池资源。那么,会导致对于C2的请求也出现问题。那么,就会导致我们的服务B1无法正常对外提供服务了。如果服务A1和A2都没有采取保护措施,也会导致服务A1和A2不可用。如下图所示。
->
->

即某个依赖的服务出现问题,会导致整个链路上游的服务都出现问题,称之为服务雪崩效应。
通过熔断,可以更好的保护我们自身的服务。比如,我们的服务B1当发现服务C1总是超时时,则放弃对于服务C1的依赖,线程不再一直等待,而是返回一个降级结果。这样,防止了问题的雪崩效应。
->

2、使用hystrix提供的熔断器
根据hystrix在github上的介绍,
Hystrix is a latency and fault tolerance library designed to isolate points of access to remote systems, services and 3rd party libraries, stop cascading failure and enable resilience in complex distributed systems where failure is inevitable.
意思是,hystrix是一个容错、容延迟的工具包,用来隔离本地服务对于远程服务和第三方包的依赖,提高系统的稳定性,防止连锁灾难发生。更进一步的解释是,hystrix可以用来防止外部依赖超时或者外部依赖错误对于本地系统的影响。
虽然开源的hystrix不再维护,但是,作为一个著名的容错降级的工具,还是值得看一看的。下面看一个hystrix提供的熔断的例子。
2.1 使用hystrix
1)引入jar包
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-core</artifactId>
<version>1.5.4</version>
</dependency>
<dependency>
<groupId>com.netflix.rxjava</groupId>
<artifactId>rxjava-core</artifactId>
<version>0.20.7</version>
</dependency>
rxjava-core提供了事件驱动的编程能力,封装了观察者模式,翻遍用户采用异步编程的方式。hystrix的实现依赖于rxjava-core这个包。
2)假设存在一个远程服务
public class RemoteServiceSimulator {
private long wait;
public RemoteServiceSimulator(long wait) {
this.wait = wait;
}
public String execute() throws InterruptedException {
Thread.sleep(wait);
return "success";
}
}
3)接下来,是本地的服务,调用远程服务。
在本地服务中,有两个值得注意的地方。
- 本地服务需要继承 HystrixCommand
,覆盖其run()方法和getFallback()方法。run()方法是正常的业务逻辑,getFallback()是降级业务逻辑。 - 需要有一个构造器,来调用父类的有参构造器。因为,父类中不存在无参构造器。
可见,使用hystrix的熔断器还是很简单的。只需要继承 HystrixCommand 并覆盖指定的方法就可以了。
public class ClientServiceCommand extends HystrixCommand<String> {
private RemoteServiceSimulator remoteServiceSimulator;
public ClientServiceCommand(Setter config, RemoteServiceSimulator remoteServiceSimulator) {
super(config);
this.remoteServiceSimulator = remoteServiceSimulator;
}
@Override
protected String run() throws Exception {
return remoteServiceSimulator.execute();
}
@Override
protected String getFallback() {
return "fallback";
}
}
4)测试类。
public class CircuitBreakerCommandTest {
@Test
public void RemoteServiceCommandTest() throws InterruptedException {
HystrixCommand.Setter config = HystrixCommand.Setter
.withGroupKey(HystrixCommandGroupKey.Factory.asKey("RemoteServiceCircuitBreakerGroup"))
// 设置客户端调用服务端的超时时间
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(1000))
// 设置隔离的线程池的参数
.andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter()
.withCoreSize(1).withMaxQueueSize(1).withQueueSizeRejectionThreshold(1))
// 设置断路器的特性。开启断路器功能,设置断路器4000ms后自动关闭,在线程上进行隔离
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withCircuitBreakerEnabled(true)
.withCircuitBreakerRequestVolumeThreshold(1).withCircuitBreakerSleepWindowInMilliseconds(4000)
.withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD));
// 前两个因为服务端超时失败,导致断路器打开,第三个也走了降级
assertThat(new ClientServiceCommand(config, new RemoteServiceSimulator(2000)).execute(), equalTo("fallback"));
assertThat(new ClientServiceCommand(config, new RemoteServiceSimulator(2000)).execute(), equalTo("fallback"));
assertThat(new ClientServiceCommand(config, new RemoteServiceSimulator(500)).execute(), equalTo("fallback"));
Thread.sleep(5000);
// 5000ms后断路器自动关闭,走了远程方法调用
assertThat(new ClientServiceCommand(config, new RemoteServiceSimulator(500)).execute(), equalTo("success"));
}
}
在测试类代码中,主要是调用 ClientServiceCommand 的方法。在测试类代码中,存在很多hystrix熔断器和线程池的配置,是为了方便测试,这些配置在实际使用时应该随着本地服务一起配置的。
2.2 hystrix的属性配置
hystrix的属性配置大致可以分为:3个key的配置和2个properties的配置。
- 3个key的配置
commandKey:该命令的key。我们知道hystrix使用命令模式,每个业务方法都是一个命令。因此,该commandKey标志了唯一该命令(业务方法)的唯一性。默认会取被注解的方法名
groupKey:一组command的集合对应的key,便于分组管理不同的command。默认取类的名字。
threadPoolKey:用来标志一个线程池。如果没有配置,则取groupKey。因此,若不配置,则同一个类内的方法在共用同一个线程池。
- 2个properties的配置
commandProperties: 包括隔离策略的配置、统计器的配置和熔断器的配置。
threadPoolProperties:若隔离策略为线程池隔离,则需要设置线程池的属性,包括coreSize(核心线程池大小),maximumSize(线程池中线程的最大数量,默认值是 10),maxQueueSize(作业队列的最大值)
关于属性配置,具体见 https://zhenbianshu.github.io/2018/09/hystrix_configuration_analysis.html
2.3 hystrix提供的功能
上面看完了hystrix提供的熔断器使用方式,那么,hystrix还提供哪些功能呢?hystrix提供的功能有:
1)请求熔断
当hystrix客户端请求后端服务失败次数比例超过一定的阈值(默认50%)时,熔断器就会打开,之后的请求就不会请求后端服务了。当熔断器打开超过一定之间后(默认5s),就会切换到半开路的状态。断路器的状态变化如下图所示。处于半开的熔断器,会对服务进行ping,如果服务端返回成功,说明服务端已经恢复正常,则熔断器关闭;否则,熔断器切换到打开,等待下一个时间窗。
2)服务降级
熔断器打开的时候,一般就会走服务降级了。服务降级的结果可以是用户实现的getFallback()方法或者来自于缓存的结果。
3)资源隔离
hystrix中,可以通过多个线程池来实现资源隔离。比如,线程池A和线程池B,各50个线程。线程池A中有远程调用x,线程池B中有远程调用y。当远程调用x大量超时或阻塞,耗尽了线程池A中的线程时,只会影响线程池A中的其他调用,不会影响线程池B中的远程调用。
4)请求结果缓存
大致的意思就是,对于请求参数相同的远程调用,hystrix会对请求结果进行缓存。比如,请求userId=1第一次进来时,拿到结果后,就会进行缓存。当下次userId=1再次进来时,就会从缓存中读取结果。
注意:缓存的有效期需要用户自己考虑。如果缓存有效期太长,当服务端数据更改后,则返回给客户端的数据会有延迟
5)请求合并
如果我们调用一个远程服务N次,那么,hystrix会做优化,改为批量调用远程服务。当然,批量调用远程服务的方法,需要用户自己实现。
3、hystrix熔断器的实现和设计原则
hystrix的运行流程如下图所示。
(图片来自于网络)
1)用户继承HystrixCommand,创建一个Command,需要实现run()和getFallback()方法。或者继承HystrixObservableCommand,实现construct()方法。hystrix使用命令模式来处理用户的请求,即用户的每个业务操作,都需要封装成一个Command提交给hystrix处理。
public class ClientServiceCommand extends HystrixCommand<String> {
private RemoteServiceSimulator remoteServiceSimulator;
public ClientServiceCommand(Setter setter, RemoteServiceSimulator remoteServiceSimulator) {
super(setter);
this.remoteServiceSimulator = remoteServiceSimulator;
}
@Override
protected String run() throws Exception {
return remoteServiceSimulator.execute();
}
@Override
protected String getFallback() {
return "fallback";
}
}
2)在运行时,HystrixCommand的execute()是同步方法,实际上会调用其queue()这个异步方法。而最终,他们都依赖于toObservable()这个方法。如图中的蓝色虚线箭头所示。
3)接下来,会去hystrix缓存中取数据。若缓存中存在数据,则返回;否则,进入下一步。见如下代码。
final boolean requestCacheEnabled = isRequestCachingEnabled(); // 是否开启了缓存并且存在cache key
final String cacheKey = getCacheKey();
/* try from cache first */
if (requestCacheEnabled) {
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
if (fromCache != null) {
// 缓存的数据不为空
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
}
}
代码中会判断是否开启了缓存功能,并且cacheKey不为空,则会尝试去缓存中取数据。
4)查看熔断器是否打开。若熔断器已经打开,则调用用户实现的getFallback()这个降级方法。用户实现了降级方法,则返回降级结果;若没有实现,则抛出异常UnsupportedOperationException(“No fallback available.”)
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
// mark that we're starting execution on the ExecutionHook
// if this hook throws an exception, then a fast-fail occurs with no fallback. No state is left inconsistent
executionHook.onStart(_cmd);
/* determine if we're allowed to execute */
if (circuitBreaker.allowRequest()) { // 熔断器是否允许该请求,allowRequest里面会判断熔断器是否打开
final TryableSemaphore executionSemaphore = getExecutionSemaphore();
final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
final Action0 singleSemaphoreRelease = new Action0() {
@Override
public void call() {
if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
executionSemaphore.release();
}
}
};
...
}
}
circuitBreaker用来判断熔断器是否打开。
5)若熔断器未打开,则看信号量或者线程池是否会拒绝请求进入。若拒绝请求,则走降级方法
6)若信号量或者线程池不拒绝请求,则执行业务逻辑方法。
@Override
public Subscription schedule(Action0 action) {
if (threadPool != null) {
if (!threadPool.isQueueSpaceAvailable()) {
throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
}
}
// 在线程池中执行提交的命令(业务逻辑)
return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action));
}
7)执行业务逻辑方法时,无论执行结果成功还是失败,都会被记录下来,作为熔断器是否打开的依据。
若执行失败,或者其他原因,都会调用getFallbackOrThrowException()这个方法。方法可以传入不同的事件类型,标志着不同的原因,比如,超时事件、执行失败、熔断器熔断、线程池拒绝。addEvent()方法中,会对不同的事件类型进行计数。
private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType,
final String message, final Exception originalException) {
final HystrixRequestContext requestContext = HystrixRequestContext.getContextForCurrentThread();
long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
// record the executionResult
// do this before executing fallback so it can be queried from within getFallback (see See https://github.com/Netflix/Hystrix/pull/144)
executionResult = executionResult.addEvent((int) latency, eventType); // 将该次失败事件进行统计
...
}
失败时,不同的事件类型如下图所示。

若执行成功,则调用executeCommandAndObserve方法,将成功的次数加1。
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
final Action1<R> markEmits = new Action1<R>() {
@Override
public void call(R r) {
if (shouldOutputOnNextEvents()) {
executionResult = executionResult.addEvent(HystrixEventType.EMIT);
eventNotifier.markEvent(HystrixEventType.EMIT, commandKey);
}
if (commandIsScalar()) {
long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS); // 记录成功调用次数
circuitBreaker.markSuccess();
}
}
};
...
}
8)若业务方法执行失败或者超时,则走fallback逻辑。
降级时的计数,也是调用getFallbackOrThrowException()这个方法
9)若业务逻辑执行成功,则返回正常结果。
下面来看看hystrix是怎么做线程池隔离的。
用户必须实现一个有参的构造函数,构造函数里面具有一个Setter配置。
public ClientServiceCommand(Setter setter, RemoteServiceSimulator remoteServiceSimulator) {
super(setter); // 调用父类的构造函数来初始化
this.remoteServiceSimulator = remoteServiceSimulator;
}
在使用构造函数创建对象的时候,就会调用到如下的代码。
protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {
this.commandGroup = initGroupKey(group);
this.commandKey = initCommandKey(key, getClass());
this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults);
this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());
this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties);
this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);
// 这里会初始化线程池。
this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);
//Strategies from plugins
this.eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
HystrixMetricsPublisherFactory.createOrRetrievePublisherForCommand(this.commandKey, this.commandGroup, this.metrics, this.circuitBreaker, this.properties);
this.executionHook = initExecutionHook(executionHook);
this.requestCache = HystrixRequestCache.getInstance(this.commandKey, this.concurrencyStrategy);
this.currentRequestLog = initRequestLog(this.properties.requestLogEnabled().get(), this.concurrencyStrategy);
/* fallback semaphore override if applicable */
this.fallbackSemaphoreOverride = fallbackSemaphore;
/* execution semaphore override if applicable */
this.executionSemaphoreOverride = executionSemaphore;
}
最终会调用如下代码。
/* package */static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) {
// get the key to use instead of using the object itself so that if people forget to implement equals/hashcode things will still work
String key = threadPoolKey.name();
// this should find it for all but the first time
HystrixThreadPool previouslyCached = threadPools.get(key);
if (previouslyCached != null) {
return previouslyCached;
}
// if we get here this is the first time so we need to initialize
synchronized (HystrixThreadPool.class) {
// 若threadPools不包含对应的key(这里的key就是Command配置所用到的groupKey),说明之前没有创建该key对应的线程池,那么,就创建一个线程池,保存在ConcurrentHashMap中
if (!threadPools.containsKey(key)) {
threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
}
}
// 若threadPools包含对应的key,说明之前已经创建该key(group)对应的线程池,那么,就从ConcurrentHashMap取出来返回
return threadPools.get(key);
}
从上面的代码中可以看出,在初始化Command时,就会创建对应的线程池,或者采用已经存在的线程池。group和线程池的对应关系保存在一个ConcurrentHashMap中,根据group key来对应不同的线程池,即如果commandA和commandB使用同一个group key,则他们共用同一个线程池。
总结
之前讲过限流,这篇文章讲熔断。限流和熔断都是为了保障我们自己的服务的可用性。当我们的服务有限流时,就不会受到某个客户端的突发流量的影响;当我们的服务有熔断时,我们依赖的服务发生问题,不会对我们的服务造成影响,不会引发雪崩效应。限流和熔断结合,保证了我们的调用方和我们的依赖方发生问题,都不会影响我们自身的服务。
参考
- http://blog.didispace.com/spring-cloud-hystrix-request-collapse/
- https://www.jianshu.com/p/cda7c0366089
- http://www.10tiao.com/html/175/201812/2653550514/1.html
- https://chenyongjun.vip/articles/90?hmsr=toutiao.io&utm_medium=toutiao.io&utm_source=toutiao.io
- https://segmentfault.com/a/1190000005988895
- https://www.baeldung.com/introduction-to-hystrix