上一篇实现了服务的负载均衡,本篇带来链路追踪。
关于链路追踪,大部分都是参考了谷歌的dapper论文:**https://bigbully.github.io/Dapper-translation/**。
通过论文总结,其中span的核心元素为:traceId,name,spanId,parentSpanId,其他则根据自身业务需要来定义即可。
链路追踪核心原理为通过一个全局的traceId作为依据,在调用服务时为每个服务分配spanId,并记录操作名name,在调用RPC服务时将带有这些属性的span随同请求一起传递,并在关键节点将span数据通过尽量小影响原服务性能的方式传递给我们自己的trace采集服务,采集服务存入数据并通过traceId以及spanId和parentSpanId的关系梳理出调用链并做图形化展示。这里给服务器传递span有很多中模式,其中包括:直接通过RPC服务调用,写入本地磁盘通过另外的进程读取磁盘数据写入远程服务器,写入缓存传输,发送消息等等方式,可以根据自己的需要选择,原则上是尽量少的影响服务本身性能。
本篇只带来客户端采集span的过程,trace服务器采集和分析展示链路的过程这里省略。
这里还是贴出github代码地址,想直接看代码的可以直接下载运行:https://github.com/whiteBX/wrpc
首先来看我这里的span定义:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36 1public class Span {
2 /**
3 * 全局唯一id
4 */
5 String traceId;
6 /**
7 * 操作名--此处取方法名
8 */
9 String operationName;
10 /**
11 * 当前spanId
12 */
13 String spanId;
14 /**
15 * 调用方spanId
16 */
17 String parentSpanId;
18 /**
19 * appCode
20 */
21 String appCode;
22 /**
23 * 当前机器ip
24 */
25 String localIp;
26 /**
27 * 目标机器ip
28 */
29 String remoteIp;
30 /**
31 * 时间戳,用于记录访问时间
32 */
33 long timestamp;
34}
35
36
上面是一些我定义的span属性,当然各位可以加一些自己需要用到的,比如exception记录等等,不过原则上这里span要尽量小,如果定义的过大会影响每次请求的传输数据量,对我们的服务性能造成影响。
在我们的comsumer中修改动态代理类,在发起远程调用之前,处理span相关内容:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 1public <T> T getBean(final Class<T> clazz, final String appCode) {
2 return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class[] { clazz }, new InvocationHandler() {
3 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
4 // 获取服务器地址
5 String serverHost = getServer(appCode);
6 Span span = SpanBuilder.buildNewSpan(SpanHolder.get(), method.getName(), serverHost, appCode);
7 //// TODO: 2018/10/25 新启线程发起rpc调用远程链路追踪服务记录追踪日志 此处打日志代替
8 System.out.println("链路追踪,调用远程服务:" + JSON.toJSONString(span));
9 BaseRequestBO baseRequestBO = buildBaseBO(span, clazz.getName(), method, JSON.toJSONString(args[0]));
10 return JSON.parseObject(call(serverHost, JSON.toJSONString(baseRequestBO)), method.getReturnType());
11 }
12 });
13}
14
15
这里注释写的开启新线程调用rpc的方式传输数据,各位可以看需要自行修改,比如写入磁盘通过其他进程读取传输的性能往往会高于这种方式。
这里来看一下上面代码用到的SpanBuilder:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61 1public class SpanBuilder {
2
3 /**
4 * 构造span
5 * @param parentSpan
6 * @return
7 * @throws UnknownHostException
8 */
9 public static Span buildNewSpan(Span parentSpan, String operationName, String serverIp, String appCode) throws UnknownHostException {
10 Span span = new Span();
11 span.setLocalIp(InetAddress.getLocalHost().getHostAddress());
12 if (parentSpan == null) {
13 span.setTraceId(ShortUUIDUtils.nextId());
14 span.setParentSpanId("0");
15 } else {
16 span.setTraceId(parentSpan.getTraceId());
17 span.setParentSpanId(parentSpan.getSpanId());
18 }
19 span.setTimestamp(System.currentTimeMillis());
20 span.setOperationName(operationName);
21 span.setRemoteIp(serverIp);
22 span.setAppCode(appCode);
23 span.setSpanId(ShortUUIDUtils.nextId());
24 return span;
25 }
26
27 /**
28 * 构建新的appCpde的Span
29 * @param span
30 * @param appCode
31 * @return
32 */
33 public static Span rebuildSpan(Span span, String appCode) {
34 Span newSpan = copy(span);
35 newSpan.setAppCode(appCode);
36 return newSpan;
37 }
38
39 /**
40 * 拷贝
41 * @param source
42 * @return
43 */
44 public static Span copy(Span source) {
45 if (source == null) {
46 return null;
47 }
48 Span span = new Span();
49 span.setTraceId(source.getTraceId());
50 span.setOperationName(source.getOperationName());
51 span.setSpanId(source.getSpanId());
52 span.setParentSpanId(source.getParentSpanId());
53 span.setAppCode(source.getAppCode());
54 span.setLocalIp(source.getLocalIp());
55 span.setRemoteIp(source.getRemoteIp());
56 span.setTimestamp(source.getTimestamp());
57 return span;
58 }
59}
60
61
这里其实就是简单的构造span,其中主要是traceId和spanId的生成,我这里用到了一个短码的生成器,这里就不贴代码了,可以自行去github上拉代码来看,或者直接用uuid也是可以的,这里只是需要保证不重复的基础上尽量短一点。
接下来是改造provider端的接收数据处理方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 1public void channelRead(ChannelHandlerContext ctx, Object msg) {
2 System.out.println("服务端收到请求:" + msg);
3 try {
4 // 解析出 类名+方法名+请求参数类型(方法签名)
5 BaseRequestBO baseRequestBO = JSON.parseObject(msg.toString(), BaseRequestBO.class);
6 // 放入span
7 SpanHolder.put(baseRequestBO.getSpan());
8 // 获取注册的服务
9 Object object = ProviderBeanHolder.getBean(baseRequestBO.getClazzName());
10 if (object == null) {
11 System.out.println("服务类未注册:" + baseRequestBO.getClazzName());
12 }
13 // 通过反射调用服务
14 Class paramType = Class.forName(baseRequestBO.getParamTypeName());
15 Method method = object.getClass().getDeclaredMethod(baseRequestBO.getMethodName(), paramType);
16 Object response = method.invoke(object, JSON.parseObject(baseRequestBO.getData(), paramType));
17 // 请求响应
18 ctx.writeAndFlush(JSON.toJSONString(response));
19 Span span = SpanBuilder.rebuildSpan(baseRequestBO.getSpan(), ProviderConstant.APP_CODE);
20 //// TODO: 2018/10/25 新启线程发起rpc调用远程链路追踪服务记录追踪日志 此处打日志代替
21 System.out.println("链路追踪,远程服务响应:" + JSON.toJSONString(span));
22 } catch (Exception e) {
23 System.out.println("服务异常" + e);
24 }
25}
26
27
这里获取到传递来的span,之后放入本地线程变量中记录,在这个服务处理中继续调用别的provider时,comsumer代码中可以取到这一个span,再生成新的span时,这个span的traceId会被沿用,spanId则会被设置成为下一个span的parentSpanId,这样一级一级的传递就形成了调用链。
到这里主要的代码其实就完成了,大家可以直接去github拉代码来运行。这里补充几点:
- 由于span在服务内部通过本地线程变量传递,会造成服务中起新线程时链路会丢失,这里可以通过其他方式来处理,比如存入第三方缓存等其他方式来解决.
- span的采集节点,这里采用了在consumer发起调用前和provider处理完成后的两个节点进行采集,是综合考虑请求成功/超时/异常后的一种方案。各位也可以在别的节点进行采集,比如consumer收到响应、provider收到请求等等节点,或者都收集,然后在trace服务端自行分化处理。