【RPC】一步一步实现基于netty+zookeeper的RPC框架(四)

释放双眼,带上耳机,听听看~!

上一篇实现了服务的负载均衡,本篇带来链路追踪。

关于链路追踪,大部分都是参考了谷歌的dapper论文:**https://bigbully.github.io/Dapper-translation/**。

    通过论文总结,其中span的核心元素为:traceId,name,spanId,parentSpanId,其他则根据自身业务需要来定义即可。

    链路追踪核心原理为通过一个全局的traceId作为依据,在调用服务时为每个服务分配spanId,并记录操作名name,在调用RPC服务时将带有这些属性的span随同请求一起传递,并在关键节点将span数据通过尽量小影响原服务性能的方式传递给我们自己的trace采集服务,采集服务存入数据并通过traceId以及spanId和parentSpanId的关系梳理出调用链并做图形化展示。这里给服务器传递span有很多中模式,其中包括:直接通过RPC服务调用,写入本地磁盘通过另外的进程读取磁盘数据写入远程服务器,写入缓存传输,发送消息等等方式,可以根据自己的需要选择,原则上是尽量少的影响服务本身性能。

    本篇只带来客户端采集span的过程,trace服务器采集和分析展示链路的过程这里省略。

这里还是贴出github代码地址,想直接看代码的可以直接下载运行:https://github.com/whiteBX/wrpc

首先来看我这里的span定义:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
1public class Span {
2    /**
3     * 全局唯一id
4     */
5    String traceId;
6    /**
7     * 操作名--此处取方法名
8     */
9    String operationName;
10    /**
11     * 当前spanId
12     */
13    String spanId;
14    /**
15     * 调用方spanId
16     */
17    String parentSpanId;
18    /**
19     * appCode
20     */
21    String appCode;
22    /**
23     * 当前机器ip
24     */
25    String localIp;
26    /**
27     * 目标机器ip
28     */
29    String remoteIp;
30    /**
31     * 时间戳,用于记录访问时间
32     */
33    long   timestamp;
34}
35
36

上面是一些我定义的span属性,当然各位可以加一些自己需要用到的,比如exception记录等等,不过原则上这里span要尽量小,如果定义的过大会影响每次请求的传输数据量,对我们的服务性能造成影响。

在我们的comsumer中修改动态代理类,在发起远程调用之前,处理span相关内容:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
1public <T> T getBean(final Class<T> clazz, final String appCode) {
2    return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class[] { clazz }, new InvocationHandler() {
3        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
4            // 获取服务器地址
5            String serverHost = getServer(appCode);
6            Span span = SpanBuilder.buildNewSpan(SpanHolder.get(), method.getName(), serverHost, appCode);
7            //// TODO: 2018/10/25 新启线程发起rpc调用远程链路追踪服务记录追踪日志 此处打日志代替
8            System.out.println("链路追踪,调用远程服务:" + JSON.toJSONString(span));
9            BaseRequestBO baseRequestBO = buildBaseBO(span, clazz.getName(), method, JSON.toJSONString(args[0]));
10            return JSON.parseObject(call(serverHost, JSON.toJSONString(baseRequestBO)), method.getReturnType());
11        }
12    });
13}
14
15

这里注释写的开启新线程调用rpc的方式传输数据,各位可以看需要自行修改,比如写入磁盘通过其他进程读取传输的性能往往会高于这种方式。

这里来看一下上面代码用到的SpanBuilder:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
1public class SpanBuilder {
2
3    /**
4     * 构造span
5     * @param parentSpan
6     * @return
7     * @throws UnknownHostException
8     */
9    public static Span buildNewSpan(Span parentSpan, String operationName, String serverIp, String appCode) throws UnknownHostException {
10        Span span = new Span();
11        span.setLocalIp(InetAddress.getLocalHost().getHostAddress());
12        if (parentSpan == null) {
13            span.setTraceId(ShortUUIDUtils.nextId());
14            span.setParentSpanId("0");
15        } else {
16            span.setTraceId(parentSpan.getTraceId());
17            span.setParentSpanId(parentSpan.getSpanId());
18        }
19        span.setTimestamp(System.currentTimeMillis());
20        span.setOperationName(operationName);
21        span.setRemoteIp(serverIp);
22        span.setAppCode(appCode);
23        span.setSpanId(ShortUUIDUtils.nextId());
24        return span;
25    }
26
27    /**
28     * 构建新的appCpde的Span
29     * @param span
30     * @param appCode
31     * @return
32     */
33    public static Span rebuildSpan(Span span, String appCode) {
34        Span newSpan = copy(span);
35        newSpan.setAppCode(appCode);
36        return newSpan;
37    }
38
39    /**
40     * 拷贝
41     * @param source
42     * @return
43     */
44    public static Span copy(Span source) {
45        if (source == null) {
46            return null;
47        }
48        Span span = new Span();
49        span.setTraceId(source.getTraceId());
50        span.setOperationName(source.getOperationName());
51        span.setSpanId(source.getSpanId());
52        span.setParentSpanId(source.getParentSpanId());
53        span.setAppCode(source.getAppCode());
54        span.setLocalIp(source.getLocalIp());
55        span.setRemoteIp(source.getRemoteIp());
56        span.setTimestamp(source.getTimestamp());
57        return span;
58    }
59}
60
61

这里其实就是简单的构造span,其中主要是traceId和spanId的生成,我这里用到了一个短码的生成器,这里就不贴代码了,可以自行去github上拉代码来看,或者直接用uuid也是可以的,这里只是需要保证不重复的基础上尽量短一点。

接下来是改造provider端的接收数据处理方法:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
1public void channelRead(ChannelHandlerContext ctx, Object msg) {
2    System.out.println("服务端收到请求:" + msg);
3    try {
4        // 解析出 类名+方法名+请求参数类型(方法签名)
5        BaseRequestBO baseRequestBO = JSON.parseObject(msg.toString(), BaseRequestBO.class);
6        // 放入span
7        SpanHolder.put(baseRequestBO.getSpan());
8        // 获取注册的服务
9        Object object = ProviderBeanHolder.getBean(baseRequestBO.getClazzName());
10        if (object == null) {
11            System.out.println("服务类未注册:" + baseRequestBO.getClazzName());
12        }
13        // 通过反射调用服务
14        Class paramType = Class.forName(baseRequestBO.getParamTypeName());
15        Method method = object.getClass().getDeclaredMethod(baseRequestBO.getMethodName(), paramType);
16        Object response = method.invoke(object, JSON.parseObject(baseRequestBO.getData(), paramType));
17        // 请求响应
18        ctx.writeAndFlush(JSON.toJSONString(response));
19        Span span = SpanBuilder.rebuildSpan(baseRequestBO.getSpan(), ProviderConstant.APP_CODE);
20        //// TODO: 2018/10/25 新启线程发起rpc调用远程链路追踪服务记录追踪日志 此处打日志代替
21        System.out.println("链路追踪,远程服务响应:" + JSON.toJSONString(span));
22    } catch (Exception e) {
23        System.out.println("服务异常" + e);
24    }
25}
26
27

这里获取到传递来的span,之后放入本地线程变量中记录,在这个服务处理中继续调用别的provider时,comsumer代码中可以取到这一个span,再生成新的span时,这个span的traceId会被沿用,spanId则会被设置成为下一个span的parentSpanId,这样一级一级的传递就形成了调用链。

到这里主要的代码其实就完成了,大家可以直接去github拉代码来运行。这里补充几点:

  1. 由于span在服务内部通过本地线程变量传递,会造成服务中起新线程时链路会丢失,这里可以通过其他方式来处理,比如存入第三方缓存等其他方式来解决.
  2. span的采集节点,这里采用了在consumer发起调用前和provider处理完成后的两个节点进行采集,是综合考虑请求成功/超时/异常后的一种方案。各位也可以在别的节点进行采集,比如consumer收到响应、provider收到请求等等节点,或者都收集,然后在trace服务端自行分化处理。

给TA打赏
共{{data.count}}人
人已打赏
安全经验

如何避免Adsense违规封号

2021-10-11 16:36:11

安全经验

安全咨询服务

2022-1-12 14:11:49

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索