介绍
go-kit提供了限流模块,该模块采用令牌桶算法实现,其实是封装了一下golang自带的golang.org/x/time/rate包来实现的。
令牌桶
令牌桶这种控制机制基于令牌桶中是否存在令牌来指示什么时候可以发送流量。令牌桶中的每一个令牌都代表一个字节。如果令牌桶中存在令牌,则允许发送流量;而如果令牌桶中不存在令牌,则不允许发送流量。因此,如果突发门限被合理地配置并且令牌桶中有足够的令牌,那么流量就可以以峰值速率发送。
令牌桶算法的基本过程如下:
假如用户配置的平均发送速率为r,则每隔1/r秒一个令牌被加入到桶中;
假设桶最多可以存发b个令牌。如果令牌到达时令牌桶已经满了,那么这个令牌会被丢弃;
当一个n个字节的[数据包]到达时,就从令牌桶中删除n个令牌,并且数据包被发送到网络;
如果令牌桶中少于n个令牌,那么不会删除令牌,并且认为这个数据包在流量限制之外;
两种限流
1、DelayingLimiter【限流延迟访问】
2、ErroringLimiter【限流错误返回】
Middleware
因为endpoint的封装,我们在使用go-kit提供的其它中间件时十分简单。下面就是一个完整的限流延迟中间件
把已有的endPoint外再包一层endPoint,再从最外层向内一层层调用
1
2
3
4
5
6
7
8
9
10
11 1func NewDelayingLimiter(limit Waiter) endpoint.Middleware {
2 return func(next endpoint.Endpoint) endpoint.Endpoint {
3 return func(ctx context.Context, request interface{}) (interface{}, error) {
4 if err := limit.Wait(ctx); err != nil {
5 return nil, err
6 }
7 return next(ctx, request)
8 }
9 }
10}
11
使用延迟限流
在go-kit 实现注册发现与负载均衡(https://blog.csdn.net/weixin_42117918/article/details/89208850)Server处代码做一下更改:
原来代码:
1
2
3
4
5
6
7
8
9 1bookServer := new(BookServer)
2 bookInfoHandler := grpc_transport.NewServer(
3 makeGetBookInfoEndpoint(),
4 decodeRequest,
5 encodeResponse,
6 )
7 bookServer.bookInfoHandler = bookInfoHandler
8
9
更改之后的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13 1bookServer := new(BookServer)
2 bookInfoEndPoint:=makeGetBookInfoEndpoint()
3 //rate路径:golang.org/x/time/rate
4 limiter := rate.NewLimiter(rate.Every(time.Second * 3), 1)//限流3秒,临牌数:1
5 //通过DelayingLimiter中间件,在bookInfoEndPoint 的外层再包裹一层限流的endPoint
6 bookInfoEndPoint = ratelimit.NewDelayingLimiter(limiter)(bookInfoEndPoint)
7 bookInfoHandler := grpc_transport.NewServer(
8 bookInfoEndPoint, //限流的endpoint
9 decodeRequest,
10 encodeResponse,
11 )
12 bookServer.bookInfoHandler = bookInfoHandler
13
完整代码
1、Server端:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138 1package main
2
3import (
4 "MyKit"
5 "context"
6 "fmt"
7 "github.com/go-kit/kit/endpoint"
8 "github.com/go-kit/kit/log"
9 "github.com/go-kit/kit/ratelimit"
10 "github.com/go-kit/kit/sd/etcdv3"
11 grpc_transport "github.com/go-kit/kit/transport/grpc"
12 "golang.org/x/time/rate"
13 "google.golang.org/grpc"
14 "net"
15 "time"
16)
17
18type BookServer struct {
19 bookListHandler grpc_transport.Handler
20 bookInfoHandler grpc_transport.Handler
21}
22
23//一下两个方法实现了 protoc生成go文件对应的接口:
24/*
25// BookServiceServer is the server API for BookService service.
26type BookServiceServer interface {
27 GetBookInfo(context.Context, *BookInfoParams) (*BookInfo, error)
28 GetBookList(context.Context, *BookListParams) (*BookList, error)
29}
30*/
31//通过grpc调用GetBookInfo时,GetBookInfo只做数据透传, 调用BookServer中对应Handler.ServeGRPC转交给go-kit处理
32func (s *BookServer) GetBookInfo(ctx context.Context, in *book.BookInfoParams) (*book.BookInfo, error) {
33
34 _, rsp, err := s.bookInfoHandler.ServeGRPC(ctx, in)
35 if err != nil {
36 return nil, err
37
38 }
39 /*
40 if info,ok:=rsp.(*book.BookInfo);ok {
41 return info,nil
42 }
43 return nil,errors.New("rsp.(*book.BookInfo)断言出错")
44 */
45 return rsp.(*book.BookInfo), err //直接返回断言的结果
46}
47
48//通过grpc调用GetBookList时,GetBookList只做数据透传, 调用BookServer中对应Handler.ServeGRPC转交给go-kit处理
49func (s *BookServer) GetBookList(ctx context.Context, in *book.BookListParams) (*book.BookList, error) {
50 _, rsp, err := s.bookListHandler.ServeGRPC(ctx, in)
51 if err != nil {
52 return nil, err
53 }
54 return rsp.(*book.BookList), err
55}
56
57//创建bookList的EndPoint
58func makeGetBookListEndpoint()endpoint.Endpoint {
59 return func(ctx context.Context, request interface{}) (response interface{}, err error) {
60 b:=new(book.BookList)
61 b.BookList=append(b.BookList,&book.BookInfo{BookId:1,BookName:"Go语言入门到精通"})
62 b.BookList=append(b.BookList,&book.BookInfo{BookId:2,BookName:"微服务入门到精通"})
63 b.BookList=append(b.BookList,&book.BookInfo{BookId:2,BookName:"区块链入门到精通"})
64 return b,nil
65 }
66}
67
68//创建bookInfo的EndPoint
69func makeGetBookInfoEndpoint() endpoint.Endpoint {
70 return func(ctx context.Context, request interface{}) (interface{}, error) {
71 //请求详情时返回 书籍信息
72 req := request.(*book.BookInfoParams)
73 b := new(book.BookInfo)
74 b.BookId = req.BookId
75 b.BookName = "Go入门到精通"
76 return b, nil
77 }
78}
79
80func decodeRequest(_ context.Context, req interface{}) (interface{}, error) {
81 return req, nil
82}
83
84func encodeResponse(_ context.Context, rsp interface{}) (interface{}, error) {
85 return rsp, nil
86}
87
88func main() {
89 var (
90 etcdServer = "127.0.0.1:2379" //etcd服务的IP地址
91 prefix = "/services/book/" //服务的目录
92 ServerInstance = "127.0.0.1:50052" //当前实例Server的地址
93 key = prefix + ServerInstance //服务实例注册的路径
94 value = ServerInstance
95 ctx = context.Background()
96 //服务监听地址
97 serviceAddress = ":50052"
98 )
99 //etcd连接参数
100 option := etcdv3.ClientOptions{DialTimeout: time.Second * 3, DialKeepAlive: time.Second * 3}
101 //创建连接
102 client, err := etcdv3.NewClient(ctx, []string{etcdServer}, option)
103 if err != nil {
104 panic(err)
105 }
106 //创建注册
107 registrar := etcdv3.NewRegistrar(client, etcdv3.Service{Key: key, Value: value}, log.NewNopLogger())
108 registrar.Register() //启动注册服务
109 bookServer := new(BookServer)
110 bookInfoEndPoint:=makeGetBookInfoEndpoint()
111 //rate路径:golang.org/x/time/rate
112 limiter := rate.NewLimiter(rate.Every(time.Second * 3), 1) //限流3秒,临牌数:1
113 //通过DelayingLimiter中间件,在bookInfoEndPoint 的外层再包裹一层限流的endPoint
114 bookInfoEndPoint = ratelimit.NewDelayingLimiter(limiter)(bookInfoEndPoint)
115 bookListHandler := grpc_transport.NewServer(
116 makeGetBookListEndpoint(),
117 decodeRequest,
118 encodeResponse,
119 )
120 bookServer.bookListHandler = bookListHandler
121 bookInfoHandler := grpc_transport.NewServer(
122 bookInfoEndPoint,
123 decodeRequest,
124 encodeResponse,
125 )
126 bookServer.bookInfoHandler = bookInfoHandler
127 listener, err := net.Listen("tcp", serviceAddress) //网络监听,注意对应的包为:"net"
128 if err != nil {
129 fmt.Println(err)
130 return
131 }
132 gs := grpc.NewServer(grpc.UnaryInterceptor(grpc_transport.Interceptor))
133 book.RegisterBookServiceServer(gs, bookServer) //调用protoc生成的代码对应的注册方法
134 gs.Serve(listener) //启动Server
135
136}
137
138
2、Client端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89 1package main
2
3import (
4 "MyKit"
5 "context"
6 "fmt"
7 "github.com/go-kit/kit/endpoint"
8 "github.com/go-kit/kit/log"
9 "github.com/go-kit/kit/sd"
10 "github.com/go-kit/kit/sd/etcdv3"
11 "github.com/go-kit/kit/sd/lb"
12 "google.golang.org/grpc"
13 "io"
14 "time"
15)
16
17func main() {
18
19 var (
20 //注册中心地址
21 etcdServer = "127.0.0.1:2379"
22 //监听的服务前缀
23 prefix = "/services/book/"
24 ctx = context.Background()
25 )
26 options := etcdv3.ClientOptions{
27 DialTimeout: time.Second * 3,
28 DialKeepAlive: time.Second * 3,
29 }
30 //连接注册中心
31 client, err := etcdv3.NewClient(ctx, []string{etcdServer}, options)
32 if err != nil {
33 panic(err)
34 }
35 logger := log.NewNopLogger()
36 //创建实例管理器, 此管理器会Watch监听etc中prefix的目录变化更新缓存的服务实例数据
37 instancer, err := etcdv3.NewInstancer(client, prefix, logger)
38 if err != nil {
39 panic(err)
40 }
41 //创建端点管理器, 此管理器根据Factory和监听的到实例创建endPoint并订阅instancer的变化动态更新Factory创建的endPoint
42 endpointer := sd.NewEndpointer(instancer, reqFactory, logger) //reqFactory自定义的函数,主要用于端点层(endpoint)接受并显示数据
43 //创建负载均衡器
44 balancer := lb.NewRoundRobin(endpointer)
45
46 /**
47 我们可以通过负载均衡器直接获取请求的endPoint,发起请求
48 reqEndPoint,_ := balancer.Endpoint()
49 */
50
51 /**
52 也可以通过retry定义尝试次数进行请求
53 */
54 reqEndPoint := lb.Retry(30, 30*time.Second, balancer) //请求次数为30,时间为30S(时间需要多于服务器限流时间3s)
55
56 //现在我们可以通过 endPoint 发起请求了
57 for i:=0;i<10;i++ { //发送10次请求
58 req := struct{}{}
59 ctx=context.Background()
60 if _, err = reqEndPoint(ctx, req); err != nil {
61 panic(err)
62 }
63 }
64}
65
66//通过传入的 实例地址 创建对应的请求endPoint
67func reqFactory(instanceAddr string) (endpoint.Endpoint, io.Closer, error) {
68 return func(ctx context.Context, request interface{}) (interface{}, error) {
69 conn, err := grpc.Dial(instanceAddr, grpc.WithInsecure())
70 if err != nil {
71 fmt.Println(err)
72 panic("connect error")
73 }
74 defer conn.Close()
75 bookClient := book.NewBookServiceClient(conn)
76 bi, _ := bookClient.GetBookInfo(context.Background(), &book.BookInfoParams{BookId: 1})
77 fmt.Println("获取书籍详情")
78 fmt.Println("bookId: 1", " => ", "bookName:", bi.BookName)
79 fmt.Println("请求服务成功: ", instanceAddr,"当前时间为:",time.Now().Format("2006-01-02 15:04:05.99"))
80 /*bl, _ := bookClient.GetBookList(context.Background(), &book.BookListParams{Page: 1, Limit: 10})
81 fmt.Println("获取书籍列表")
82 for _, b := range bl.BookList {
83 fmt.Println("bookId:", b.BookId, " => ", "bookName:", b.BookName)
84 }*/
85 return nil, nil
86 }, nil, nil
87}
88
89
运行效果及分析
1、启动etcd
2、启动Server
3、启动Client
client端运行效果如下:
可以看到Server对Client端进行了限流,限流时间间隔为:3s。