什么是Spring Cloud Hystrix
在微服务架构中,我们将系统拆分为很多个服务,各个服务之间通过注册与订阅的方式相互依赖,由于各个服务都是在各自的进程中运行,就有可能由于网络原因或者服务自身的问题导致调用故障或延迟,随着服务的积压,可能会导致服务崩溃。为了解决这一系列的问题,断路器等一系列服务保护机制出现了。
Spring Cloud Hystrix 实现了断路器、线路隔离等一系列服务保护功能。
它也是基于 Netflix 的开源框架 Hystrix 实现的,该框架的目标在于通过控制那些访问远程系统、服务和第三方库的节点,从而对延迟和故障提供更强大的容错能力。
Hystrix 具备服务降级、服务熔断、线程和信号隔离、请求缓存、请求合并以及服务监控等强大功能。
一、快速入门
先搭建一个简单系统:
eureka-server 工程:服务注册中心,端口为8082。
hello-service 工程:HELLO-SERVICE 的服务单元,两个实例启动端口分别为 2221 和 2222.
ribbon-consumer 工程:使用 Ribbon 实现的服务消费者,端口为 3333
1、在 ribbon-consumer 工程的 pom.xml 的 dependency 节点中引入 spring-cloud-starter-hystrix 依赖:
1
2
3
4
5
6 1<dependency>
2 <groupId>org.springframework.cloud</groupId>
3 <artifactId>spring-cloud-starter-hystrix</artifactId>
4</dependency>
5
6
2、在 ribbon-consumer 工程的主类上使用 @EnableCircuitBreaker 注解开启断路器功能:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 1@EnableCircuitBreaker
2@EnableDiscoveryClient
3@SpringBootApplication
4 public class DemoApplication {
5 @Bean
6 @LoadBalanced
7 RestTemplate restTemplate(){
8 return new RestTemplate();
9 }
10 public static void main(String[] args) {
11 SpringApplication.run(DemoApplication.class, args);
12 }
13}
14
15
还可以使用 Spring Cloud 应用中的 @SpringCloudApplication 注解来修饰主类,该注解的具体定义如下。可以看到,该注解中包含了上述所引用的三个注解,这意味着一个 Spring Cloud 标准应用应包含服务发现以及断路器。
3、改造服务消费方式,新增 HelloService 类,注入 RestTemplate 实例。然后,将在 ConsumerController 中对 RestTemplate 的使用迁移到 helloService 函数中,最后,在 helloService 函数上增加 @HystrixCommand 注解来指定回调方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 1@Service
2public class HelloService {
3
4 @Autowired
5 RestTemplate restTemplate;
6
7 @HystrixCommand(fallbackMethod = "helloFallback")
8 public String helloService(){
9 return restTemplate.getForEntity("http://hello-service/index",
10 String.class).getBody();
11 }
12
13 public String helloFallback(){
14 return "error";
15 }
16}
17
18
4、修改 ConsumerController 类, 注入上面实现的 HelloService 实例,并在 helloConsumer 中进行调用:
1
2
3
4
5
6
7
8
9
10
11
12
13 1@RestController
2public class ConsumerController {
3
4 @Autowired
5 HelloService helloService;
6
7 @RequestMapping(value = "ribbon-consumer", method = RequestMethod.GET)
8 public String helloConsumer(){
9 return helloService.helloService();
10 }
11}
12
13
5、对断路器实现的服务回调逻辑进行验证,重新启动之前关闭的 2221 端口的 hello-service,确保此时服务注册中心、两个 hello-service 和 ribbon-consumer 均已启动,再次访问 http://localhost:3333/ribbon-consumer 可以轮询两个 hello-serive 并返回一些文字信息。此时断开其中任意一个端口的 hello-service,再次访问,当轮询到关闭的端口服务时,输出内容为 error ,不再是之前的提示信息。
二、原理分析
工作流程:
1、创建HystrixCommand或者HystrixObservableCommand对象。用来表示对依赖服务的请求操作。使用的是命令模式。
其中HystrixCommand返回的是单个操作结果,HystrixObservableCommand返回多个结果
2、命令执行:共有4中方法执行命令。
execute() : 同步执行,从依赖的服务里返回单个结果或抛出异常
queue():异步执行,直接返回一个Future对象
observe():返回observable对象,代表了多个结果,是一个Hot Observable
toObservable():返回Observable对象,但是是一个 Cold Observable
3、结果是否被缓存。如果已经启用了缓存功能,且被命中,那么缓存就会直接以Observable对象返回
4、断路器是否已打开。没有命中缓存,在执行命令前会检查断路器是否已打开。断路器已打开,直接执行fallback。断路器关闭,继续往下执行
5、线程池And信号量Or请求队列是否已被占满 如果与该命令有关的线程池和请求队列,或者信号量已经被占满,就直接执行fallback
6、执行HystrixObservableCommand.construct () 或 HystrixCommand.run() 方法。如果设置了当前执行时间超过了设置的timeout,则当前处理线程会抛出一个TimeoutyException,如果命令不在自身线程里执行,就会通过单独的计时线程来抛出异常,Hystrix会直接执行fallback逻辑,并忽略run或construct的返回值。
7、计算断路器的健康值。
8、fallback处理。
9、返回成功的响应。
断路器原理:
依赖隔离:
Hystrix 使用 “舱壁模式” 实现线程池的隔离,它为每一个依赖服务创建一个独立的线程池,就算某个依赖服务出现延迟过高的情况,也不会拖慢其他的依赖服务。
三、使用详解
1、创建请求命令
继承方式实现HystrixCommand
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 1public class UserCommand extends HystrixCommand<User> {
2
3 private RestTemplate restTemplate;
4
5 private Long id;
6
7 public UserCommand(Setter setter, RestTemplate restTemplate, Long id) {
8 super(setter);
9 this.restTemplate = restTemplate;
10 this.id = id;
11 }
12
13 @Override
14 protected User run() throws Exception {
15 return restTemplate.getForObject("http://USER-SERVICE/users/{1}", User.class, id);
16 }
17
18}
19
20
通过上面实现的UserCommand,我们即可以实现请求的同步执行也可以实现异步执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 1public User getUserById(Long id) {
2 HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("userKey");
3 com.netflix.hystrix.HystrixCommand.Setter setter = com.netflix.hystrix.HystrixCommand.Setter.withGroupKey(groupKey);
4 UserCommand userCommand = new UserCommand(setter, restTemplate, id);
5 // 同步执行获取结果
6// return userCommand.execute();
7 // 异步执行获取结果
8 Future<User> future = userCommand.queue();
9 try {
10 return future.get();
11 } catch (Exception e) {
12 logger.info("获取结果发生异常", e);
13 }
14 return null;
15 }
16
17
注解方式使用HystrixCommand
同步:
1
2
3
4
5
6 1 @HystrixCommand
2 public User findUserById(Long id) {
3 return restTemplate.getForObject("http://USER-SERVICE/users/{1}", User.class, id);
4 }
5
6
异步:
1
2
3
4
5
6
7
8
9
10
11 1 @HystrixCommand
2 public Future<User> asyncFindUserFutureById(Long id) {
3 return new AsyncResult<User>() {
4 @Override
5 public User invoke() {
6 return restTemplate.getForObject("http://USER-SERVICE/users/{1}", User.class, id);
7 }
8 };
9 }
10
11
响应执行
还可以将HystrixCommand通过Observable来实现响应式执行方式。通过调用observe()和toObservable()可以返回Observable对象。
1
2
3
4 1Observable<User> observe = userCommand.observe();
2Observable<User> observe = userCommand.toObservable();
3
4
前者返回的是一个Hot Observable,该命令会在observe调用的时候立即执行,当Observable每次被订阅的时候都会重放它的行为。
后者返回的是一个Cold Observable,toObservable()执行之后,命令不会被立即执行,只有当所有订阅者都订阅他之后才会执行。
HystrixCommand具备了observe()和toObservable()的功能,但是它的实现有一定的局限性,它返回的Observable只能发射一次数据,所以Hystrix提供了另外的一个特殊命令封装HysrtixObservableCommand,通过命令可以发射多次的Observable。
响应执行自定义命令
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25 1public class UserObservableCommand extends HystrixObservableCommand<User> {
2
3 private RestTemplate restTemplate;
4
5 private Long id;
6
7 public UserObservableCommand (Setter setter, RestTemplate restTemplate, Long id) {
8 super(setter);
9 this.restTemplate = restTemplate;
10 this.id = id;
11 }
12
13 @Override
14 protected Observable<User> construct() {
15 return Observable.create(subscriber -> {
16 if (!subscriber.isUnsubscribed()) {
17 User user = restTemplate.getForObject("http://USER-SERVICE/users/{1}", User.class, id);
18 subscriber.onNext(user);
19 subscriber.onCompleted();
20 }
21 });
22 }
23}
24
25
响应执行使用注解@HystrixCommand
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 1@Service
2public class HelloService {
3
4 private static final Logger logger = LoggerFactory.getLogger(HelloService.class);
5
6 @Autowired
7 private RestTemplate restTemplate;
8
9 @HystrixCommand
10 public Observable<User> observableGetUserId(Long id) {
11 return Observable.create(subscriber -> {
12 if (!subscriber.isUnsubscribed()) {
13 User user = restTemplate.getForObject("http://USER-SERVICE/users/{1}", User.class, id);
14 subscriber.onNext(user);
15 subscriber.onCompleted();
16 }
17 });
18 }
19
20}
21
22
使用@HystrixCommand注解实现响应式命令,可以通过observableExecutionMode参数来控制是使用observe()还是toObservable()的执行方式。该参数有下面两种设置方式:
@HystrixCommand(observableExecutionMode = ObservableExecutionMode.EAGER): EAGER是该参数的模式值,表示使用observe()执行方式。
@HystrixCommand(observableExecutionMode = ObservableExecutionMode.LAZY): 表示使用toObservable()执行方式。
2.定义服务降级
fallback是Hystrix命令执行失败时使用得后备方法,用来实现服务降级处理逻辑。
继承方式:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 1public class UserCommand extends HystrixCommand<User> {
2
3 private RestTemplate restTemplate;
4
5 private Long id;
6
7 public UserCommand(Setter setter,RestTemplate restTemplate,Long id){
8 super(setter);
9 this.restTemplate = restTemplate;
10 this.id = id;
11 }
12
13 @Override
14 protected User run() throws Exception {
15 return restTemplate.getForObject("Http://user-service/hello/user/{1}",User.class,id);
16 }
17
18 @Override
19 protected User getFallback(){
20 return new User();
21 }
22}
23
24
注解方式(含多次降级):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 1
2public class UserService {
3
4 @Autowired
5 private RestTemplate restTemplate;
6
7 @HystrixCommand(fallbackMethod = "defaultUser")
8 public User getUserById(Long id){
9 return restTemplate.getForObject("Http://user-service/hello/user/{1}",User.class,id);
10 }
11
12 @HystrixCommand(fallbackMethod = "defaultUserSec")
13 public User defaultUser(){
14 //此处可能是另外一个网络请求获取,也有可能失败
15 return new User("First Fallback");
16 }
17
18 public User dedalutUserSec(String id,Throwable e){
19 return new User("Second Fallback");
20 }
21}
22
23
3.异常处理
异常传播
在HystrixCommand实现的run()方法中抛出异常时,除了HystrixBadRequestException之外,其他异常均会被Hystrix认为命令执行失败并触发服务降级的处理逻辑,所以当需要在命令执行中抛出不触发服务降级的异常时来选择它。
在使用注解配置实现Hystrix命令时,可以忽略指定的异常类型,只需要通过设置@HystrixCommand注解的ignoreExceptions参数,如下:
1
2
3
4
5
6 1 @HystrixCommand(fallbackMethod = "getDefaultUser", ignoreExceptions = NullPointerException.class)
2 public User findUserById(Long id) {
3 return restTemplate.getForObject("http://USER-SERVICE/users/{1}", User.class, id);
4 }
5
6
异常获取
在继承实现Hystrix命令时,可以在getFallback()方法中通过getExecutionException()方法来获取具体的异常,然后通过判断来进入不同的处理逻辑。
在注解配置方式中,在fallback实现方法的参数中增加Throwable e对象的定义,这样在方法内部就可以获取触发服务降级的具体异常内容。
4.命令名称、分组和线程池划分
继承实现自定义命令
先调用了withGroupKey来设置命令组名,然后才通过调用andCommandKey来设置命令名。
还提供HystrixThreadPoolKey来对线程池进行设置,通过它可以实现更细粒度的线程池划分。
1
2
3
4
5
6
7 1public UserCommand(RestTemplate restTemplate, Long id) {
2 super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("GroupName"))
3 .andCommandKey(HystrixCommandKey.Factory.asKey("CommandName"))
4 .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ThreadPoolKey")));
5 }
6
7
在没有指定HystrixThreadPoolKey的情况下,会使用命令组的方式来划分线程池。通常情况下,我们尽量使用HystrixThreadPoolKey来指定线程池的划分。 因为多个不同的命令可能从业务逻辑上来看属于同一个组,但是往往从实现本身上需要跟其他命令来进行隔离。
使用注解时只需要设置注解的commandKey、groupKey以及threadPoolKey属性即可,他分别表示了命令名称、分组以及线程池划分。
1
2
3
4
5
6 1@HystrixCommand(fallbackMethod = "getDefaultUser", ignoreExceptions = NullPointerException.class,commandKey = "findUserById", groupKey = "UserGroup", threadPoolKey = "findUserByIdThread")
2public User findUserById(Long id) {
3 return restTemplate.getForObject("http://USER-SERVICE/users/{1}", User.class, id);
4}
5
6
5.请求缓存
在高并发的场景之下,Hystrix中提供了请求缓存的功能,可以很方便的开启和使用请求缓存来优化系统,达到减轻高并发时的请求线程消耗、降低请求响应时间的效果。
开启:实现HystrixCommand时重载getCacheKey()
清理:HystrixRequsetCache.clear()
1
2
3
4
5
6
7
8
9
10
11 1@Override
2public String getCacheKey() {
3 return String.valueOf(id);
4}
5 public static void flushRequestCache(Long id){
6 HystrixRequestCache.getInstance(
7 HystrixCommandKey.Factory.asKey("test"), HystrixConcurrencyStrategyDefault.getInstance())
8 .clear(String.valueOf(id));
9}
10
11
使用注解实现:
@CacheResult
@CacheRemove
@CacheKey
6.请求合并
在高并发情况下,通信次数的增加会导致总的通信时间增加,同时,线程池的资源也是有限的,高并发环境会导致有大量的线程处于等待状态,进而导致响应延迟,为了解决这些问题,我们需要Hystrix的请求合并。
1、假设微服务提供了两个接口,一个请求单个一个请求一个批量列表,服务消费端定义两个接口,一个是单个请求,第二个是批量请求,并且用restTemplate实现了远程调用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 1public class UserServiceImpl implements UserService {
2
3 @Autowired
4 private RestTemplate restTemplate;
5
6
7 @Override
8 public User find(Long id) {
9 return restTemplate.getForObject
10 ("http//user-service/users/{1}",User.class,id);
11 }
12
13 @Override
14 public List<User> findAll(List<Long> ids) {
15 return restTemplate.getForObject
16 ("http//user-service/users?ids={1}",List.class, StringUtils.join(ids,","));
17 }
18}
19
20
2、为请求合并实现一个批量请求
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 1public class UserBatchCommand extends HystrixCommand<List<User>> {
2
3 UserService userService;
4 List<Long> userIds;
5
6 public UserBatchCommand(UserService userService,List<Long> userIds){
7 super(Setter.withGroupKey(asKey("userBatchCommand")));
8 this.userIds = userIds;
9 this.userService = userService;
10 }
11
12 @Override
13 protected List<User> run() throws Exception {
14 return userService.findAll(userIds);
15 }
16}
17
18
3、继承HystrixCollapser实现请求合并器:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37 1public class UserCollapseCommand extends HystrixCollapser<List<User>,User,Long> {
2
3 private UserService userService;
4
5 private Long userId;
6
7 public UserCollapseCommand(UserService userService,Long userId){
8 super(Setter.withCollapserKey(HystrixCollapserKey.Factory.asKey("userBatchCommand")).andCollapserPropertiesDefaults(
9 HystrixCollapserProperties.Setter().withTimerDelayInMilliseconds(100)//设置时间延迟属性
10 ));
11 this.userService = userService;
12 this.userId = userId;
13 }
14
15 @Override
16 public Long getRequestArgument() {
17 return userId;//返回给定的单个请求参数
18 }
19
20 @Override
21 protected HystrixCommand<List<User>> createCommand(Collection<CollapsedRequest<User, Long>> collection) {
22 List<Long> userIds = new ArrayList<>(collection.size());
23 userIds.addAll(collection.stream().map(CollapsedRequest::getArgument).collect(Collectors.toList()));
24 return new UserBatchCommand(userService,userIds);
25 }
26
27 @Override
28 protected void mapResponseToRequests(List<User> users, Collection<CollapsedRequest<User, Long>> collection) {
29 int count = 0;
30 for (CollapsedRequest<User,Long> collapsedRequest:collection){
31 User user = users.get(count++);
32 collapsedRequest.setResponse(user);
33 }
34 }
35}
36
37
通过注解形式来实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 1@Service
2public class UserService {
3
4 @Autowired
5 RestTemplate restTemplate;
6
7 @HystrixCollapser(batchMethod = "findAll",collapserProperties = {@HystrixProperty(name="timerDelayInMilliseconds",value = "100")})
8 public User find(Long id){
9 return null;
10 }
11
12 @HystrixCommand
13 public List<User> findAll(List<Long> ids){
14 return restTemplate.getForObject("http://user-service/user?ids={1}",List.class, StringUtils.join(ids,","));
15 }
16
17
四、属性详解
继承方法:Setter对象设置
注解方法:command-Properties
类型:
Execution:控制HystrixCommand.run() 的如何执行
Fallback: 控制HystrixCommand.getFallback() 如何执行
Circuit Breaker: 控制断路器的行为
Metrics: 捕获和HystrixCommand 和 HystrixObservableCommand 执行信息相关的配置属性
Request Context:设置请求上下文的属性
Collapser Properties:设置请求合并的属性
Thread Pool Properties:设置线程池的属性
五、Hystrix仪表盘与Turbine集群监控
度量指标都是HystrixCommand和HystrixObservableCommand实例在执行过程中记录的重要信息,它们除了在Hystrix断路器实现中使用之外,对于系统运维的帮助也很大。
度量指标会以“滚动时间窗”与“桶”结合的方式进行汇总,并在内存中驻留一段时间,以供内部或外部查询使用,Hystrix仪表盘就是这些指标内容的消费者之一。
不管是监控单体应用还是Turbine集群监控,我们都需要一个Hystrix Dashboard,单独创建一个新的工程专门用来做Hystrix Dashboard。
具体搭建略
在实际应用中,我们要监控的应用往往是一个集群,这个时候我们就得采取Turbine集群监控了。Turbine有一个重要的功能就是汇聚监控信息,并将汇聚到的监控信息提供给Hystrix Dashboard来集中展示和监控。
具体搭建略