0°

SpringCloud微服务知识整理五:服务容错保护 Spring Cloud Hystrix

什么是Spring Cloud Hystrix

在微服务架构中,我们将系统拆分为很多个服务,各个服务之间通过注册与订阅的方式相互依赖,由于各个服务都是在各自的进程中运行,就有可能由于网络原因或者服务自身的问题导致调用故障或延迟,随着服务的积压,可能会导致服务崩溃。为了解决这一系列的问题,断路器等一系列服务保护机制出现了。

Spring Cloud Hystrix 实现了断路器、线路隔离等一系列服务保护功能。
它也是基于 Netflix 的开源框架 Hystrix 实现的,该框架的目标在于通过控制那些访问远程系统、服务和第三方库的节点,从而对延迟和故障提供更强大的容错能力
Hystrix 具备服务降级、服务熔断、线程和信号隔离、请求缓存、请求合并以及服务监控等强大功能。

一、快速入门

先搭建一个简单系统:
eureka-server 工程:服务注册中心,端口为8082。
hello-service 工程:HELLO-SERVICE 的服务单元,两个实例启动端口分别为 2221 和 2222.
ribbon-consumer 工程:使用 Ribbon 实现的服务消费者,端口为 3333
在这里插入图片描述
1、在 ribbon-consumer 工程的 pom.xml 的 dependency 节点中引入 spring-cloud-starter-hystrix 依赖:


1
2
3
4
5
6
1<dependency>
2  <groupId>org.springframework.cloud</groupId>
3  <artifactId>spring-cloud-starter-hystrix</artifactId>
4</dependency>
5
6

2、在 ribbon-consumer 工程的主类上使用 @EnableCircuitBreaker 注解开启断路器功能:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
1@EnableCircuitBreaker
2@EnableDiscoveryClient
3@SpringBootApplication
4   public class DemoApplication {
5       @Bean
6       @LoadBalanced
7       RestTemplate restTemplate(){
8           return new RestTemplate();
9       }
10    public static void main(String[] args) {
11        SpringApplication.run(DemoApplication.class, args);
12    }
13}
14
15

还可以使用 Spring Cloud 应用中的 @SpringCloudApplication 注解来修饰主类,该注解的具体定义如下。可以看到,该注解中包含了上述所引用的三个注解,这意味着一个 Spring Cloud 标准应用应包含服务发现以及断路器

3、改造服务消费方式,新增 HelloService 类,注入 RestTemplate 实例。然后,将在 ConsumerController 中对 RestTemplate 的使用迁移到 helloService 函数中,最后,在 helloService 函数上增加 @HystrixCommand 注解来指定回调方法。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
1@Service
2public class HelloService {
3
4    @Autowired
5    RestTemplate restTemplate;
6
7    @HystrixCommand(fallbackMethod = "helloFallback")
8    public String helloService(){
9        return restTemplate.getForEntity("http://hello-service/index",
10                String.class).getBody();
11    }
12
13    public String helloFallback(){
14        return "error";
15    }
16}
17
18

4、修改 ConsumerController 类, 注入上面实现的 HelloService 实例,并在 helloConsumer 中进行调用:


1
2
3
4
5
6
7
8
9
10
11
12
13
1@RestController
2public class ConsumerController {
3
4    @Autowired
5    HelloService helloService;
6
7    @RequestMapping(value = "ribbon-consumer", method = RequestMethod.GET)
8    public String helloConsumer(){
9        return helloService.helloService();
10    }
11}
12
13

5、对断路器实现的服务回调逻辑进行验证,重新启动之前关闭的 2221 端口的 hello-service,确保此时服务注册中心、两个 hello-service 和 ribbon-consumer 均已启动,再次访问 http://localhost:3333/ribbon-consumer 可以轮询两个 hello-serive 并返回一些文字信息。此时断开其中任意一个端口的 hello-service,再次访问,当轮询到关闭的端口服务时,输出内容为 error ,不再是之前的提示信息。

二、原理分析

工作流程:
在这里插入图片描述
1、创建HystrixCommand或者HystrixObservableCommand对象。用来表示对依赖服务的请求操作。使用的是命令模式。
其中HystrixCommand返回的是单个操作结果,HystrixObservableCommand返回多个结果

2、命令执行:共有4中方法执行命令。
execute() : 同步执行,从依赖的服务里返回单个结果或抛出异常
queue():异步执行,直接返回一个Future对象
observe():返回observable对象,代表了多个结果,是一个Hot Observable
toObservable():返回Observable对象,但是是一个 Cold Observable

3、结果是否被缓存。如果已经启用了缓存功能,且被命中,那么缓存就会直接以Observable对象返回

4、断路器是否已打开。没有命中缓存,在执行命令前会检查断路器是否已打开。断路器已打开,直接执行fallback。断路器关闭,继续往下执行

5、线程池And信号量Or请求队列是否已被占满 如果与该命令有关的线程池和请求队列,或者信号量已经被占满,就直接执行fallback

6、执行HystrixObservableCommand.construct () 或 HystrixCommand.run() 方法。如果设置了当前执行时间超过了设置的timeout,则当前处理线程会抛出一个TimeoutyException,如果命令不在自身线程里执行,就会通过单独的计时线程来抛出异常,Hystrix会直接执行fallback逻辑,并忽略run或construct的返回值。

7、计算断路器的健康值。

8、fallback处理。

9、返回成功的响应。

断路器原理:
在这里插入图片描述
依赖隔离:
Hystrix 使用 “舱壁模式” 实现线程池的隔离,它为每一个依赖服务创建一个独立的线程池,就算某个依赖服务出现延迟过高的情况,也不会拖慢其他的依赖服务。

三、使用详解

1、创建请求命令

继承方式实现HystrixCommand


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
1public class UserCommand extends HystrixCommand<User> {
2
3    private RestTemplate restTemplate;
4
5    private Long id;
6
7    public UserCommand(Setter setter, RestTemplate restTemplate, Long id) {
8        super(setter);
9        this.restTemplate = restTemplate;
10        this.id = id;
11    }
12    
13    @Override
14    protected User run() throws Exception {
15        return restTemplate.getForObject("http://USER-SERVICE/users/{1}", User.class, id);
16    }
17
18}
19
20

通过上面实现的UserCommand,我们即可以实现请求的同步执行也可以实现异步执行


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
1public User getUserById(Long id) {
2        HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("userKey");
3        com.netflix.hystrix.HystrixCommand.Setter setter = com.netflix.hystrix.HystrixCommand.Setter.withGroupKey(groupKey);
4        UserCommand userCommand = new UserCommand(setter, restTemplate, id);
5        // 同步执行获取结果
6//        return userCommand.execute();
7        // 异步执行获取结果
8        Future<User> future = userCommand.queue();
9        try {
10            return future.get();
11        } catch (Exception e) {
12            logger.info("获取结果发生异常", e);
13        }
14        return null;
15    }
16
17

注解方式使用HystrixCommand
同步:


1
2
3
4
5
6
1 @HystrixCommand
2    public User findUserById(Long id) {
3        return restTemplate.getForObject("http://USER-SERVICE/users/{1}", User.class, id);
4    }
5
6

异步:


1
2
3
4
5
6
7
8
9
10
11
1 @HystrixCommand
2    public Future<User> asyncFindUserFutureById(Long id) {
3        return new AsyncResult<User>() {
4            @Override
5            public User invoke() {
6                return restTemplate.getForObject("http://USER-SERVICE/users/{1}", User.class, id);
7            }
8        };
9    }
10
11

响应执行
还可以将HystrixCommand通过Observable来实现响应式执行方式。通过调用observe()和toObservable()可以返回Observable对象。


1
2
3
4
1Observable<User> observe = userCommand.observe();
2Observable<User> observe = userCommand.toObservable();
3
4

前者返回的是一个Hot Observable,该命令会在observe调用的时候立即执行,当Observable每次被订阅的时候都会重放它的行为。

后者返回的是一个Cold Observable,toObservable()执行之后,命令不会被立即执行,只有当所有订阅者都订阅他之后才会执行。

HystrixCommand具备了observe()和toObservable()的功能,但是它的实现有一定的局限性,它返回的Observable只能发射一次数据,所以Hystrix提供了另外的一个特殊命令封装HysrtixObservableCommand,通过命令可以发射多次的Observable。

响应执行自定义命令


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
1public class UserObservableCommand extends HystrixObservableCommand<User> {
2
3    private RestTemplate restTemplate;
4
5    private Long id;
6
7    public UserObservableCommand (Setter setter, RestTemplate restTemplate, Long id) {
8        super(setter);
9        this.restTemplate = restTemplate;
10        this.id = id;
11    }
12
13    @Override
14    protected Observable<User> construct() {
15        return Observable.create(subscriber -> {
16            if (!subscriber.isUnsubscribed()) {
17                User user = restTemplate.getForObject("http://USER-SERVICE/users/{1}", User.class, id);
18                subscriber.onNext(user);
19                subscriber.onCompleted();
20            }
21        });
22    }
23}
24
25

响应执行使用注解@HystrixCommand


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
1@Service
2public class HelloService {
3
4    private static final Logger logger = LoggerFactory.getLogger(HelloService.class);
5
6    @Autowired
7    private RestTemplate restTemplate;
8
9    @HystrixCommand
10    public Observable<User> observableGetUserId(Long id) {
11        return Observable.create(subscriber -> {
12            if (!subscriber.isUnsubscribed()) {
13                User user = restTemplate.getForObject("http://USER-SERVICE/users/{1}", User.class, id);
14                subscriber.onNext(user);
15                subscriber.onCompleted();
16            }
17        });
18    }
19
20}
21
22

使用@HystrixCommand注解实现响应式命令,可以通过observableExecutionMode参数来控制是使用observe()还是toObservable()的执行方式。该参数有下面两种设置方式:

@HystrixCommand(observableExecutionMode = ObservableExecutionMode.EAGER): EAGER是该参数的模式值,表示使用observe()执行方式。
@HystrixCommand(observableExecutionMode = ObservableExecutionMode.LAZY): 表示使用toObservable()执行方式。

2.定义服务降级

fallback是Hystrix命令执行失败时使用得后备方法,用来实现服务降级处理逻辑。
继承方式:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
1public class UserCommand extends HystrixCommand<User> {
2
3    private RestTemplate restTemplate;
4
5    private Long id;
6
7    public UserCommand(Setter setter,RestTemplate restTemplate,Long id){
8        super(setter);
9        this.restTemplate = restTemplate;
10        this.id = id;
11    }
12
13    @Override
14    protected User run() throws Exception {
15        return restTemplate.getForObject("Http://user-service/hello/user/{1}",User.class,id);
16    }
17
18    @Override
19    protected User getFallback(){
20        return new User();
21    }
22}
23
24

注解方式(含多次降级):


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
1
2public class UserService {
3
4    @Autowired
5    private RestTemplate restTemplate;
6
7    @HystrixCommand(fallbackMethod = "defaultUser")
8    public User getUserById(Long id){
9        return restTemplate.getForObject("Http://user-service/hello/user/{1}",User.class,id);
10    }
11    
12    @HystrixCommand(fallbackMethod = "defaultUserSec")
13    public User defaultUser(){
14          //此处可能是另外一个网络请求获取,也有可能失败
15          return new User("First Fallback");
16  }
17 
18    public  User dedalutUserSec(String id,Throwable e){
19         return new User("Second Fallback");
20    }
21}
22
23

3.异常处理

异常传播
在HystrixCommand实现的run()方法中抛出异常时,除了HystrixBadRequestException之外,其他异常均会被Hystrix认为命令执行失败并触发服务降级的处理逻辑,所以当需要在命令执行中抛出不触发服务降级的异常时来选择它。

在使用注解配置实现Hystrix命令时,可以忽略指定的异常类型,只需要通过设置@HystrixCommand注解的ignoreExceptions参数,如下:


1
2
3
4
5
6
1  @HystrixCommand(fallbackMethod = "getDefaultUser", ignoreExceptions = NullPointerException.class)
2    public User findUserById(Long id) {
3        return restTemplate.getForObject("http://USER-SERVICE/users/{1}", User.class, id);
4    }
5
6

异常获取
在继承实现Hystrix命令时,可以在getFallback()方法中通过getExecutionException()方法来获取具体的异常,然后通过判断来进入不同的处理逻辑。

在注解配置方式中,在fallback实现方法的参数中增加Throwable e对象的定义,这样在方法内部就可以获取触发服务降级的具体异常内容。

4.命令名称、分组和线程池划分

继承实现自定义命令
先调用了withGroupKey来设置命令组名,然后才通过调用andCommandKey来设置命令名。
还提供HystrixThreadPoolKey来对线程池进行设置,通过它可以实现更细粒度的线程池划分。


1
2
3
4
5
6
7
1public UserCommand(RestTemplate restTemplate, Long id) {
2        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("GroupName"))
3                .andCommandKey(HystrixCommandKey.Factory.asKey("CommandName"))
4                .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ThreadPoolKey")));
5    }
6
7

在没有指定HystrixThreadPoolKey的情况下,会使用命令组的方式来划分线程池。通常情况下,我们尽量使用HystrixThreadPoolKey来指定线程池的划分。 因为多个不同的命令可能从业务逻辑上来看属于同一个组,但是往往从实现本身上需要跟其他命令来进行隔离。

使用注解时只需要设置注解的commandKey、groupKey以及threadPoolKey属性即可,他分别表示了命令名称、分组以及线程池划分。


1
2
3
4
5
6
1@HystrixCommand(fallbackMethod = "getDefaultUser", ignoreExceptions = NullPointerException.class,commandKey = "findUserById", groupKey = "UserGroup", threadPoolKey = "findUserByIdThread")
2public User findUserById(Long id) {
3        return restTemplate.getForObject("http://USER-SERVICE/users/{1}", User.class, id);
4}
5
6

5.请求缓存

在高并发的场景之下,Hystrix中提供了请求缓存的功能,可以很方便的开启和使用请求缓存来优化系统,达到减轻高并发时的请求线程消耗、降低请求响应时间的效果。

开启:实现HystrixCommand时重载getCacheKey()
清理:HystrixRequsetCache.clear()


1
2
3
4
5
6
7
8
9
10
11
1@Override
2public String getCacheKey() {
3       return String.valueOf(id);
4}
5 public static void flushRequestCache(Long id){
6        HystrixRequestCache.getInstance(
7                HystrixCommandKey.Factory.asKey("test"), HystrixConcurrencyStrategyDefault.getInstance())
8                .clear(String.valueOf(id));
9}
10
11

使用注解实现:
@CacheResult
@CacheRemove
@CacheKey

6.请求合并

在高并发情况下,通信次数的增加会导致总的通信时间增加,同时,线程池的资源也是有限的,高并发环境会导致有大量的线程处于等待状态,进而导致响应延迟,为了解决这些问题,我们需要Hystrix的请求合并。

1、假设微服务提供了两个接口,一个请求单个一个请求一个批量列表,服务消费端定义两个接口,一个是单个请求,第二个是批量请求,并且用restTemplate实现了远程调用。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
1public class UserServiceImpl implements UserService {
2
3    @Autowired
4    private RestTemplate restTemplate;
5
6
7    @Override
8    public User find(Long id) {
9        return restTemplate.getForObject
10                ("http//user-service/users/{1}",User.class,id);
11    }
12
13    @Override
14    public List<User> findAll(List<Long> ids) {
15        return restTemplate.getForObject
16                ("http//user-service/users?ids={1}",List.class, StringUtils.join(ids,","));
17    }
18}
19
20

2、为请求合并实现一个批量请求


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
1public class UserBatchCommand extends HystrixCommand<List<User>> {
2
3    UserService userService;
4    List<Long> userIds;
5
6    public UserBatchCommand(UserService userService,List<Long> userIds){
7        super(Setter.withGroupKey(asKey("userBatchCommand")));
8        this.userIds = userIds;
9        this.userService = userService;
10    }
11
12    @Override
13    protected List<User> run() throws Exception {
14        return userService.findAll(userIds);
15    }
16}
17
18

3、继承HystrixCollapser实现请求合并器:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
1public class UserCollapseCommand extends HystrixCollapser<List<User>,User,Long> {
2
3    private UserService userService;
4
5    private Long userId;
6
7    public UserCollapseCommand(UserService userService,Long userId){
8        super(Setter.withCollapserKey(HystrixCollapserKey.Factory.asKey("userBatchCommand")).andCollapserPropertiesDefaults(
9                HystrixCollapserProperties.Setter().withTimerDelayInMilliseconds(100)//设置时间延迟属性
10        ));
11        this.userService = userService;
12        this.userId = userId;
13    }
14
15    @Override
16    public Long getRequestArgument() {
17        return userId;//返回给定的单个请求参数
18    }
19
20    @Override
21    protected HystrixCommand<List<User>> createCommand(Collection<CollapsedRequest<User, Long>> collection) {
22        List<Long> userIds = new ArrayList<>(collection.size());
23        userIds.addAll(collection.stream().map(CollapsedRequest::getArgument).collect(Collectors.toList()));
24        return new UserBatchCommand(userService,userIds);
25    }
26
27    @Override
28    protected void mapResponseToRequests(List<User> users, Collection<CollapsedRequest<User, Long>> collection) {
29        int count = 0;
30        for (CollapsedRequest<User,Long> collapsedRequest:collection){
31            User user = users.get(count++);
32            collapsedRequest.setResponse(user);
33        }
34    }
35}
36
37

通过注解形式来实现


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
1@Service
2public class UserService {
3
4    @Autowired
5    RestTemplate restTemplate;
6
7    @HystrixCollapser(batchMethod = "findAll",collapserProperties = {@HystrixProperty(name="timerDelayInMilliseconds",value = "100")})
8    public User find(Long id){
9           return null;
10    }
11
12    @HystrixCommand
13    public List<User> findAll(List<Long> ids){
14        return restTemplate.getForObject("http://user-service/user?ids={1}",List.class, StringUtils.join(ids,","));
15    }
16
17

四、属性详解

继承方法:Setter对象设置
注解方法:command-Properties

类型:
Execution:控制HystrixCommand.run() 的如何执行
Fallback: 控制HystrixCommand.getFallback() 如何执行
Circuit Breaker: 控制断路器的行为
Metrics: 捕获和HystrixCommand 和 HystrixObservableCommand 执行信息相关的配置属性
Request Context:设置请求上下文的属性
Collapser Properties:设置请求合并的属性
Thread Pool Properties:设置线程池的属性

五、Hystrix仪表盘与Turbine集群监控

度量指标都是HystrixCommand和HystrixObservableCommand实例在执行过程中记录的重要信息,它们除了在Hystrix断路器实现中使用之外,对于系统运维的帮助也很大。

度量指标会以“滚动时间窗”与“桶”结合的方式进行汇总,并在内存中驻留一段时间,以供内部或外部查询使用,Hystrix仪表盘就是这些指标内容的消费者之一。

在这里插入图片描述

不管是监控单体应用还是Turbine集群监控,我们都需要一个Hystrix Dashboard,单独创建一个新的工程专门用来做Hystrix Dashboard。
具体搭建略

在实际应用中,我们要监控的应用往往是一个集群,这个时候我们就得采取Turbine集群监控了。Turbine有一个重要的功能就是汇聚监控信息,并将汇聚到的监控信息提供给Hystrix Dashboard来集中展示和监控。
在这里插入图片描述
具体搭建略

「点点赞赏,手留余香」

    还没有人赞赏,快来当第一个赞赏的人吧!