go-kit实践之4:go-kit微服务熔断机制的实现

释放双眼,带上耳机,听听看~!

在微服务架构中,每一个微服务都是一个独立的业务功能单元,而一个应用一般由多个微服务组成,微服务之间的交互是通过RPC(远程过程调用)完成。

比如,我们的应用是微服务A调用微服务B和微服务C来完成的,而微服务B又需要调用微服务D,微服务D又需要调用微服务E。如果在调用的链路上对微服务E的调用,响应时间过长或者服务不可用,那么对微服务D的调用就会占用越来越多的系统资源,进而引起微服务D的系统崩溃,微服务D的不可用,又会连锁反应的引起微服务B崩溃,进而微服务A崩溃,最终导致整个应用不可用。这也就是所谓的“雪崩效应”。

介绍

go-kit 提供了三种熔断

1、 gobreaker

2、 handy

3、 hystrix-go

hystrix用的比较多,我们来介绍下go-kit中hystrix的使用方法

go-kit的hystrix
Middleware的实现
1、 Hystrix返回Middleware 此中间件会在原来的endPoint包一层Hystrix的endPoint

2、 hystrix通过传入的commanName获取对应的Hystrix的设置,并设置run失败时运行的fallback函数为nil

3、 我们也可以自己实现middleware包装endPoint


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
1func Hystrix(commandName string) endpoint.Middleware {  
2   return func(next endpoint.Endpoint) endpoint.Endpoint {  
3      return func(ctx context.Context, request interface{}) (response interface{}, err error) {  
4         var resp interface{}  
5         if err := hystrix.Do(commandName, func() (err error) {  
6            resp, err = next(ctx, request)  
7            return err  
8         }, nil); err != nil {  
9            return nil, err  
10         }  
11         return resp, nil  
12      }  
13   }  
14}
15

客户端hystrix配置
1、Timeout 【请求超时的时间】

2、ErrorPercentThreshold【允许出现的错误比例】

3、SleepWindow【熔断开启多久尝试发起一次请求】

4、MaxConcurrentRequests【允许的最大并发请求数】

5、RequestVolumeThreshold 【波动期内的最小请求数,默认波动期10S】


1
2
3
4
5
6
7
8
9
10
1commandName := "my-endpoint"  
2hystrix.ConfigureCommand(commandName, hystrix.CommandConfig{  
3  Timeout: 1000 * 30,  
4  ErrorPercentThreshold: 1,  
5  SleepWindow: 10000,  
6  MaxConcurrentRequests: 1000,  
7  RequestVolumeThreshold: 5,  
8})
9
10

增加熔断中间件的包装


1
2
3
4
1breakerMw := circuitbreaker.Hystrix(commandName)
2//增加熔断中间件  
3reqEndPoint = breakerMw(reqEndPoint)
4

实例

1、protobuf文件及生成对应的go文件


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
1
2syntax = "proto3";
3
4// 请求书详情的参数结构  book_id 32位整形
5message BookInfoParams {
6    int32 book_id = 1;
7}
8
9
10// 书详情信息的结构   book_name字符串类型
11message BookInfo {
12    int32 book_id = 1;
13    string  book_name = 2;
14}
15
16// 请求书列表的参数结构  page、limit   32位整形
17message BookListParams {
18    int32 page = 1;
19    int32 limit = 2;
20}
21
22
23// 书列表的结构    BookInfo结构数组
24message BookList {
25    repeated BookInfo book_list = 1;
26}
27// 定义 获取书详情  和 书列表服务   入参出参分别为上面所定义的结构
28service BookService {
29    rpc GetBookInfo (BookInfoParams) returns (BookInfo) {}
30    rpc GetBookList (BookListParams) returns (BookList) {}
31}
32

生成对应的go语言代码文件:protoc –go_out=plugins=grpc:. book.proto  (其中:protobuf文件名为:book.proto)

注:由于演示熔断机制,也就是Server出现问题的时候进行熔断,因此本文Server端代码可以不用。

2、Client端代码


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
1package main
2
3import (
4   "MyKit"
5   "context"
6   "fmt"
7   "github.com/afex/hystrix-go/hystrix"
8   "github.com/go-kit/kit/circuitbreaker"
9   "github.com/go-kit/kit/endpoint"
10  "github.com/go-kit/kit/log"
11  "github.com/go-kit/kit/sd"
12  "github.com/go-kit/kit/sd/etcdv3"
13  "github.com/go-kit/kit/sd/lb"
14  "google.golang.org/grpc"
15  "io"
16  "time"
17)
18
19func main() {
20
21  var (
22      //注册中心地址
23      etcdServer = "127.0.0.1:2379"
24      //监听的服务前缀
25      prefix = "/services/book/"
26      ctx    = context.Background()
27  )
28  //对hystrix进行配置
29  commandName:="my_endpoint"
30  hystrix.ConfigureCommand(commandName,hystrix.CommandConfig{
31      Timeout:1000*3, //超时
32      MaxConcurrentRequests:100, //最大并发的请求数
33      RequestVolumeThreshold:5,//请求量阈值
34      SleepWindow:10000, //熔断开启多久尝试发起一次请求
35      ErrorPercentThreshold:1, //误差阈值百分比
36  })
37  breakerMw:=circuitbreaker.Hystrix(commandName) //定义熔断器中间件
38  options := etcdv3.ClientOptions{
39      DialTimeout:   time.Second * 3,
40      DialKeepAlive: time.Second * 3,
41  }
42  //连接注册中心
43  client, err := etcdv3.NewClient(ctx, []string{etcdServer}, options)
44  if err != nil {
45      panic(err)
46  }
47  logger := log.NewNopLogger()
48  //创建实例管理器, 此管理器会Watch监听etc中prefix的目录变化更新缓存的服务实例数据
49  instancer, err := etcdv3.NewInstancer(client, prefix, logger)
50  if err != nil {
51      panic(err)
52  }
53  //创建端点管理器, 此管理器根据Factory和监听的到实例创建endPoint并订阅instancer的变化动态更新Factory创建的endPoint
54  endpointer := sd.NewEndpointer(instancer, reqFactory, logger) //reqFactory自定义的函数,主要用于端点层(endpoint)接受并显示数据
55  //创建负载均衡器
56  balancer := lb.NewRoundRobin(endpointer)
57
58  /**
59  我们可以通过负载均衡器直接获取请求的endPoint,发起请求
60  reqEndPoint,_ := balancer.Endpoint()
61  */
62
63  /**
64  也可以通过retry定义尝试次数进行请求
65  */
66  reqEndPoint := lb.Retry(3, 100*time.Second, balancer) //请求次数为3,时间为10S(时间需要多于服务器限流时间3s)
67
68  //增加熔断中间件
69  reqEndPoint=breakerMw(reqEndPoint)
70
71  //现在我们可以通过 endPoint 发起请求了
72  req := struct{}{}
73  for i:=0;i<20;i++ {  //发生20次请求
74      ctx=context.Background()
75      if _, err = reqEndPoint(ctx, req); err != nil {
76          //panic(err)
77          fmt.Println("当前时间: ", time.Now().Format("2006-01-02 15:04:05.99"),"\t第",i+1,"次")
78          fmt.Println(err)
79          time.Sleep(1*time.Second)
80      }
81  }
82}
83
84//通过传入的 实例地址  创建对应的请求endPoint
85func reqFactory(instanceAddr string) (endpoint.Endpoint, io.Closer, error) {
86  return func(ctx context.Context, request interface{}) (interface{}, error) {
87      conn, err := grpc.Dial(instanceAddr, grpc.WithInsecure())
88      if err != nil {
89          fmt.Println(err)
90          panic("connect error")
91      }
92      defer conn.Close()
93      bookClient := book.NewBookServiceClient(conn)
94      bi, _ := bookClient.GetBookInfo(context.Background(), &book.BookInfoParams{BookId: 1})
95      fmt.Println("获取书籍详情")
96      fmt.Println("bookId: 1", " => ", "bookName:", bi.BookName)
97      fmt.Println("请求服务成功: ", instanceAddr,"当前时间为:",time.Now().Format("2006-01-02 15:04:05.99"))
98      /*bl, _ := bookClient.GetBookList(context.Background(), &book.BookListParams{Page: 1, Limit: 10})
99      fmt.Println("获取书籍列表")
100     for _, b := range bl.BookList {
101         fmt.Println("bookId:", b.BookId, " => ", "bookName:", b.BookName)
102     }*/
103     return nil, nil
104 }, nil, nil
105}
106
107

3、运行及分析

直接运行Client端(不用启动etcd、Server),效果如下:

通过上面的输出记录可以验证我们的配置:

1、 前5条波动期内的错误,没有触发circuit开启(RequestVolumeThreshold:5,//请求量阈值)

2、 circuit开启后请求熔断生效(输出内容:hystrix: circuit open)

3、 circuit开启10S后,SleepWindow测试发起请求设置生效(第16次输出的内容;设置:SleepWindow:10000, //熔断开启多久尝试发起一次请求)

给TA打赏
共{{data.count}}人
人已打赏
安全网络

CDN安全市场到2022年价值76.3亿美元

2018-2-1 18:02:50

安全运维

Linux(内核剖析):06---进程之线程的实现

2021-8-18 16:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索