go-kit实践之5:go-kit微服务请求跟踪实现

释放双眼,带上耳机,听听看~!

一、介绍

go-kit 提供了两种tracing请求跟踪

1、opentracing【跟踪标准】

2、zipkin【zipkin的go封装】 
我们下面来介绍下zipkin在go-kit中的使用方法。

二、zipkin安装启动

1、ZipKin入门介绍

Zipkin是一款开源的分布式实时数据追踪系统(Distributed Tracking System),基于 Google Dapper的论文设计而来,由 Twitter 公司开发贡献。其主要功能是聚集来自各个异构系统的实时监控数据。分布式跟踪系统还有其他比较成熟的实现,例如:Naver的Pinpoint、Apache的HTrace、阿里的鹰眼Tracing、京东的Hydra、新浪的Watchman,美团点评的CAT,skywalking等。

2、ZipKin架构

ZipKin可以分为两部分,一部分是zipkin server,用来作为数据的采集存储、数据分析与展示;zipkin client是zipkin基于不同的语言及框架封装的一些列客户端工具,这些工具完成了追踪数据的生成与上报功能,架构如下:

Zipkin Server主要包括四个模块:
(1)Collector 接收或收集各应用传输的数据
(2)Storage 存储接受或收集过来的数据,当前支持Memory,MySQL,Cassandra,ElasticSearch等,默认存储在内存中。
(3)API(Query) 负责查询Storage中存储的数据,提供简单的JSON API获取数据,主要提供给web UI使用
(4)Web 提供简单的web界面
服务追踪流程如下:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
1┌─────────────┐ ┌───────────────────────┐  ┌─────────────┐  ┌──────────────────┐
2│ User Code   │ │ Trace Instrumentation │  │ Http Client │  │ Zipkin Collector │
3└─────────────┘ └───────────────────────┘  └─────────────┘  └──────────────────┘
4       │                 │                         │                 │
5           ┌─────────┐
6       │ ──┤GET /foo ├─▶ │ ────┐                   │                 │
7           └─────────┘         │ record tags
8       │                 │ ◀───┘                   │                 │
9                           ────┐
10       │                 │     │ add trace headers │                 │
11                           ◀───┘
12       │                 │ ────┐                   │                 │
13                               │ record timestamp
14       │                 │ ◀───┘                   │                 │
15                             ┌─────────────────┐
16       │                 │ ──┤GET /foo         ├─▶ │                 │
17                             │X-B3-TraceId: aa │     ────┐
18       │                 │   │X-B3-SpanId: 6b  │   │     │           │
19                             └─────────────────┘         │ invoke
20       │                 │                         │     │ request   │
21                                                         │
22       │                 │                         │     │           │
23                                 ┌────────┐          ◀───┘
24       │                 │ ◀─────┤200 OK  ├─────── │                 │
25                           ────┐ └────────┘
26       │                 │     │ record duration   │                 │
27            ┌────────┐     ◀───┘
28       │ ◀──┤200 OK  ├── │                         │                 │
29            └────────┘       ┌────────────────────────────────┐
30       │                 │ ──┤ asynchronously report span     ├────▶ │
31                             │                                │
32                             │{                               │
33                             │  "traceId": "aa",              │
34                             │  "id": "6b",                   │
35                             │  "name": "get",                │
36                             │  "timestamp": 1483945573944000,│
37                             │  "duration": 386000,           │
38                             │  "annotations": [              │
39                             │--snip--                        │
40                             └────────────────────────────────┘
41
42

Instrumented client和server是分别使用了ZipKin Client的服务,Zipkin Client会根据配置将追踪数据发送到Zipkin Server中进行数据存储、分析和展示。

3、ZipKin几个概念

在追踪日志中,有几个基本概念spanId、traceId、parentId

traceId:用来确定一个追踪链的16字符长度的字符串,在某个追踪链中保持不变。

spanId:区域Id,在一个追踪链中spanId可能存在多个,每个spanId用于表明在某个服务中的身份,也是16字符长度的字符串。

parentId:在跨服务调用者的spanId会传递给被调用者,被调用者会将调用者的spanId作为自己的parentId,然后自己再生成spanId。
go-kit实践之5:go-kit微服务请求跟踪实现

4、window安装zipkin

(1)下载:

下载地址:https://search.maven.org/remote_content?g=io.zipkin.java&a=zipkin-server&v=LATEST&c=exec

(2)运行:

进入到下载的文件路径下,执行:java -jar xxx  (注:xxx:为下载的文件名,譬如下载的版本为:zipkin-server-2.12.8-exec.jar,则执行:java -jar zipkin-server-2.12.8-exec.jar 。
特别说明:Java-jar 需要安装jdk)

(3)访问:zipkin Server 运行后默认的访问地址:http://localhost:9411

go-kit实践之5:go-kit微服务请求跟踪实现

(4)调用链分析

go-kit实践之5:go-kit微服务请求跟踪实现

三、go-kit的zipkin

1、核心代码说明

服务端trace:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
1//创建zipkin上报管理器
2reporter := http.NewReporter("http://localhost:9411/api/v2/spans")
3
4//运行结束,关闭上报管理器的for-select协程
5defer reporter.Close()  
6
7//创建trace跟踪器
8zkTracer, err := opzipkin.NewTracer(reporter)  
9
10//添加grpc请求的before after finalizer 事件对应要处理的trace操作方法
11zkServerTrace := zipkin.GRPCServerTrace(zkTracer)  
12
13//通过options的方式运行trace
14bookListHandler := grpctransport.NewServer(  
15   bookListEndPoint,  
16   decodeRequest,  
17   encodeResponse,  
18   zkServerTrace,  
19)
20
21

客户端trance:

与服务端trace的区别在于kitzipkin.GRPCClientTrace


1
2
3
4
5
6
7
1reporter := http.NewReporter("http://localhost:9411/api/v2/spans")  
2defer reporter.Close()  
3
4zkTracer, err := opzipkin.NewTracer(reporter)  
5zkClientTrace := zipkin.GRPCClientTrace(zkTracer)
6
7

可以通过span组装span结构树


1
2
3
4
1parentSpan := zkTracer.StartSpan("bookCaller")  
2defer parentSpan.Flush()  
3ctx = opzipkin.NewContext(context.Background(), parentSpan)
4

2、实例

1、protobuf文件及生成对应的go文件


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
1
2syntax = "proto3";
3
4// 请求书详情的参数结构  book_id 32位整形
5message BookInfoParams {
6    int32 book_id = 1;
7}
8
9
10// 书详情信息的结构   book_name字符串类型
11message BookInfo {
12    int32 book_id = 1;
13    string  book_name = 2;
14}
15
16// 请求书列表的参数结构  page、limit   32位整形
17message BookListParams {
18    int32 page = 1;
19    int32 limit = 2;
20}
21
22
23// 书列表的结构    BookInfo结构数组
24message BookList {
25    repeated BookInfo book_list = 1;
26}
27// 定义 获取书详情  和 书列表服务   入参出参分别为上面所定义的结构
28service BookService {
29    rpc GetBookInfo (BookInfoParams) returns (BookInfo) {}
30    rpc GetBookList (BookListParams) returns (BookList) {}
31}
32

生成对应的go语言代码文件:protoc –go_out=plugins=grpc:. book.proto  (其中:protobuf文件名为:book.proto)

2、Server端代码


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
1package main
2
3import (
4   "MyKit"
5   "context"
6   "github.com/go-kit/kit/endpoint"
7   "github.com/go-kit/kit/log"
8   "github.com/go-kit/kit/ratelimit"
9   "github.com/go-kit/kit/sd/etcdv3"
10  "github.com/go-kit/kit/tracing/zipkin"
11  grpctransport "github.com/go-kit/kit/transport/grpc"
12  opzipkin "github.com/openzipkin/zipkin-go"
13  "github.com/openzipkin/zipkin-go/reporter/http"
14  "golang.org/x/time/rate"
15  "google.golang.org/grpc"
16  "math/rand"
17  "net"
18  "time"
19)
20
21type BookServer struct {
22  bookListHandler grpctransport.Handler
23  bookInfoHandler grpctransport.Handler
24}
25
26//通过grpc调用GetBookInfo时,GetBookInfo只做数据透传, 调用BookServer中对应Handler.ServeGRPC转交给go-kit处理
27func (s *BookServer) GetBookInfo(ctx context.Context, in *book.BookInfoParams) (*book.BookInfo, error) {
28  _, rsp, err := s.bookInfoHandler.ServeGRPC(ctx, in)
29  if err != nil {
30      return nil, err
31  }
32  return rsp.(*book.BookInfo), err
33}
34
35//通过grpc调用GetBookList时,GetBookList只做数据透传, 调用BookServer中对应Handler.ServeGRPC转交给go-kit处理
36func (s *BookServer) GetBookList(ctx context.Context, in *book.BookListParams) (*book.BookList, error) {
37  _, rsp, err := s.bookListHandler.ServeGRPC(ctx, in)
38  if err != nil {
39      return nil, err
40  }
41  return rsp.(*book.BookList), err
42}
43
44//创建bookList的EndPoint
45func makeGetBookListEndpoint() endpoint.Endpoint {
46  return func(ctx context.Context, request interface{}) (interface{}, error) {
47      rand.Seed(time.Now().Unix())
48      randInt := rand.Int63n(200)
49      time.Sleep(time.Duration(randInt) * time.Millisecond)
50      //请求列表时返回 书籍列表
51      bl := new(book.BookList)
52      bl.BookList = append(bl.BookList, &book.BookInfo{BookId: 1, BookName: "Go入门到精通"})
53      bl.BookList = append(bl.BookList, &book.BookInfo{BookId: 2, BookName: "微服务入门到精通"})
54      bl.BookList = append(bl.BookList, &book.BookInfo{BookId: 2, BookName: "区块链入门到精通"})
55      return bl, nil
56  }
57}
58
59//创建bookInfo的EndPoint
60func makeGetBookInfoEndpoint() endpoint.Endpoint {
61  return func(ctx context.Context, request interface{}) (interface{}, error) {
62      rand.Seed(time.Now().Unix())
63      randInt := rand.Int63n(200)
64      time.Sleep(time.Duration(randInt) * time.Microsecond)
65      //请求详情时返回 书籍信息
66      req := request.(*book.BookInfoParams)
67      b := new(book.BookInfo)
68      b.BookId = req.BookId
69      b.BookName = "Go入门系列"
70      return b, nil
71  }
72}
73
74func decodeRequest(_ context.Context, req interface{}) (interface{}, error) {
75  return req, nil
76}
77
78func encodeResponse(_ context.Context, rsp interface{}) (interface{}, error) {
79  return rsp, nil
80}
81
82func main() {
83
84  var (
85      //etcd服务地址
86      etcdServer = "127.0.0.1:2379"
87      //服务的信息目录
88      prefix = "/services/book/"
89      //当前启动服务实例的地址
90      instance = "127.0.0.1:50051"
91      //服务实例注册的路径
92      key = prefix + instance
93      //服务实例注册的val
94      value = instance
95      ctx   = context.Background()
96      //服务监听地址
97      serviceAddress = ":50051"
98  )
99
100 //etcd的连接参数
101 options := etcdv3.ClientOptions{
102     DialTimeout:   time.Second * 3,
103     DialKeepAlive: time.Second * 3,
104 }
105 //创建etcd连接
106 client, err := etcdv3.NewClient(ctx, []string{etcdServer}, options)
107 if err != nil {
108     panic(err)
109 }
110
111 // 创建注册器
112 registrar := etcdv3.NewRegistrar(client, etcdv3.Service{
113     Key:   key,
114     Value: value,
115 }, log.NewNopLogger())
116
117 // 注册器启动注册
118 registrar.Register()
119 //启动追踪
120 reporter := http.NewReporter("http://localhost:9411/api/v2/spans") //追踪地址
121 defer reporter.Close()
122 zkTracer, err := opzipkin.NewTracer(reporter)     //实例化追踪器
123 zkServerTrace := zipkin.GRPCServerTrace(zkTracer) //追踪器Server端
124 bookServer := new(BookServer)
125 bookListEndPoint := makeGetBookListEndpoint()
126 //创建限流器 1r/s  limiter := rate.NewLimiter(rate.Every(time.Second * 1), 100000)
127 //通过DelayingLimiter中间件,在bookListEndPoint的外层再包裹一层限流的endPoint
128 limiter := rate.NewLimiter(rate.Every(time.Second*1), 1) //限流1秒,临牌数:1
129 bookListEndPoint = ratelimit.NewDelayingLimiter(limiter)(bookListEndPoint)
130
131 bookListHandler := grpctransport.NewServer(
132     bookListEndPoint,
133     decodeRequest,
134     encodeResponse,
135     zkServerTrace, //添加追踪
136 )
137 bookServer.bookListHandler = bookListHandler
138
139 bookInfoEndPoint := makeGetBookInfoEndpoint()
140 //通过DelayingLimiter中间件,在bookListEndPoint的外层再包裹一层限流的endPoint
141 bookInfoEndPoint = ratelimit.NewDelayingLimiter(limiter)(bookInfoEndPoint)
142 bookInfoHandler := grpctransport.NewServer(
143     bookInfoEndPoint,
144     decodeRequest,
145     encodeResponse,
146     zkServerTrace,
147 )
148 bookServer.bookInfoHandler = bookInfoHandler
149
150 ls, _ := net.Listen("tcp", serviceAddress)
151 gs := grpc.NewServer(grpc.UnaryInterceptor(grpctransport.Interceptor))
152 book.RegisterBookServiceServer(gs, bookServer)
153 gs.Serve(ls)
154}
155
156

3、Client端代码


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
1package main
2
3import (
4   "MyKit"
5   "context"
6   "fmt"
7   "github.com/afex/hystrix-go/hystrix"
8   "github.com/go-kit/kit/circuitbreaker"
9   "github.com/go-kit/kit/endpoint"
10  "github.com/go-kit/kit/log"
11  "github.com/go-kit/kit/sd"
12  "github.com/go-kit/kit/sd/etcdv3"
13  "github.com/go-kit/kit/sd/lb"
14  "github.com/go-kit/kit/tracing/zipkin"
15  grpctransport "github.com/go-kit/kit/transport/grpc"
16  opzipkin "github.com/openzipkin/zipkin-go"
17  "github.com/openzipkin/zipkin-go/reporter/http"
18  "google.golang.org/grpc"
19  "io"
20  "time"
21)
22
23func main() {
24  commandName := "my-endpoint"
25  hystrix.ConfigureCommand(commandName, hystrix.CommandConfig{
26      Timeout:                1000 * 30,
27      ErrorPercentThreshold:  1,
28      SleepWindow:            10000,
29      MaxConcurrentRequests:  1000,
30      RequestVolumeThreshold: 5,
31  })
32  breakerMw := circuitbreaker.Hystrix(commandName)
33
34  var (
35      //注册中心地址
36      etcdServer = "127.0.0.1:2379"
37      //监听的服务前缀
38      prefix = "/services/book/"
39      ctx    = context.Background()
40  )
41  options := etcdv3.ClientOptions{
42      DialTimeout:   time.Second * 3,
43      DialKeepAlive: time.Second * 3,
44  }
45  //连接注册中心
46  client, err := etcdv3.NewClient(ctx, []string{etcdServer}, options)
47  if err != nil {
48      panic(err)
49  }
50  logger := log.NewNopLogger()
51  //创建实例管理器, 此管理器会Watch监听etc中prefix的目录变化更新缓存的服务实例数据
52  instancer, err := etcdv3.NewInstancer(client, prefix, logger)
53  if err != nil {
54      panic(err)
55  }
56  //创建端点管理器, 此管理器根据Factory和监听的到实例创建endPoint并订阅instancer的变化动态更新Factory创建的endPoint
57  endpointer := sd.NewEndpointer(instancer, reqFactory, logger)
58  //创建负载均衡器
59  balancer := lb.NewRoundRobin(endpointer)
60
61  /**
62  我们可以通过负载均衡器直接获取请求的endPoint,发起请求
63  reqEndPoint,_ := balancer.Endpoint()
64  */
65  /**
66  也可以通过retry定义尝试次数进行请求
67  */
68  reqEndPoint := lb.Retry(3, 100*time.Second, balancer)
69
70  //增加熔断中间件
71  reqEndPoint = breakerMw(reqEndPoint)
72  //现在我们可以通过 endPoint 发起请求了
73
74  req := struct{}{}
75  for i := 1; i <= 30; i++ {
76      if _, err = reqEndPoint(ctx, req); err != nil {
77          fmt.Println(err)
78      }
79  }
80}
81
82//通过传入的 实例地址  创建对应的请求endPoint
83func reqFactory(instanceAddr string) (endpoint.Endpoint, io.Closer, error) {
84  return func(ctx context.Context, request interface{}) (interface{}, error) {
85      fmt.Println("请求服务: ", instanceAddr, "当前时间: ", time.Now().Format("2006-01-02 15:04:05.99"))
86      conn, err := grpc.Dial(instanceAddr, grpc.WithInsecure())
87      if err != nil {
88          fmt.Println(err)
89          panic("connect error")
90      }
91
92      //追踪设置
93      reporter := http.NewReporter("http://localhost:9411/api/v2/spans") //追踪地址
94      defer reporter.Close()
95
96      zkTracer, err := opzipkin.NewTracer(reporter)     //新建追踪器
97      zkClientTrace := zipkin.GRPCClientTrace(zkTracer) //启动追踪器Client端
98
99      bookInfoRequest := grpctransport.NewClient(
100         conn,
101         "BookService",
102         "GetBookInfo",
103         func(_ context.Context, in interface{}) (interface{}, error) { return nil, nil },
104         func(_ context.Context, out interface{}) (interface{}, error) {
105             return out, nil
106         },
107         book.BookInfo{},
108         zkClientTrace, //追踪客户端
109     ).Endpoint()
110
111     bookListRequest := grpctransport.NewClient(
112         conn,
113         "BookService",
114         "GetBookList",
115         func(_ context.Context, in interface{}) (interface{}, error) { return nil, nil },
116         func(_ context.Context, out interface{}) (interface{}, error) {
117             return out, nil
118         },
119         book.BookList{},
120         zkClientTrace,
121     ).Endpoint()
122
123     parentSpan := zkTracer.StartSpan("bookCaller")
124     defer parentSpan.Flush()
125
126     ctx = opzipkin.NewContext(ctx, parentSpan)
127     infoRet, _ := bookInfoRequest(ctx, request)
128     bi := infoRet.(*book.BookInfo)
129     fmt.Println("获取书籍详情")
130     fmt.Println("bookId: 1", " => ", "bookName:", bi.BookName)
131
132     listRet, _ := bookListRequest(ctx, request)
133     bl := listRet.(*book.BookList)
134     fmt.Println("获取书籍列表")
135     for _, b := range bl.BookList {
136         fmt.Println("bookId:", b.BookId, " => ", "bookName:", b.BookName)
137     }
138
139     return nil, nil
140 }, nil, nil
141}
142
143

3、运行

1、启动etcd   

2、启动zipkin

3、运行Server端

4、运行Client端

go-kit实践之5:go-kit微服务请求跟踪实现

5、查看zipkin中的记录

访问http://localhost:9411/zipkin/ 

go-kit实践之5:go-kit微服务请求跟踪实现

之后就可以看到记录的数据。

go-kit实践之5:go-kit微服务请求跟踪实现

 

给TA打赏
共{{data.count}}人
人已打赏
安全网络

CDN安全市场到2022年价值76.3亿美元

2018-2-1 18:02:50

安全经验

LAMP环境搭建

2021-11-28 16:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索