【RPC】一步一步实现基于netty+zookeeper的RPC框架(六)

释放双眼,带上耳机,听听看~!

上一篇实现了服务的限流,本篇来实现服务的熔断。

    首先还是贴出github代码地址,想直接看代码的可以直接下载运行:https://github.com/whiteBX/wrpc

    在现在的微服务架构下,由于服务众多,调用链路长,很可能其中某个服务有时会出现异常导致服务不可用,例如发布导致bug、机房网络问题等,这种时候如果没有一种保护机制的话,很容易引起服务雪崩。这个时候就引入了服务熔断,本篇就来实现一个简单的熔断器。

    首先我们定义一下熔断的状态,如下图:

此图中标注了我们熔断器的三种状态:全开 半开 关闭
他们的流转过程为:

  1. 初始时为关闭状态。
  2. 当遇到错误到我们预设的阈值比例后转换为全开状态。
  3. 经过一定时间后,熔断器变为半开状态。
  4. 半开状态时允许通过一个请求,此请求成功则转为关闭状态,失败则变为全开状态。

接下来看代码实现:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
1public class CircuitUtil {
2
3    // 达到默认请求基数才判断开启熔断
4    private static final int DEFAULT_FAIL_COUNT = 5;
5    // 半开转换为全开时间(毫秒数)
6    private static final long DEFAULT_HALF_OPEN_TRANSFER_TIME = 10000;
7    // 默认失败比例值开启熔断
8    private static final double DEFAULT_FAIL_RATE = 0.8D;
9    // 计数 pair左边成功,右边失败
10    private Map<String, Pair<AtomicInteger, AtomicInteger>> counter = new ConcurrentHashMap<>();
11    // 熔断器当前状态
12    private volatile CircuitStatusEnum status = CircuitStatusEnum.CLOSE;
13    // 最后一次处于全开状态的时间
14    private volatile long timestamp;
15    private final Semaphore semaphore = new Semaphore(1);
16
17    /**
18     * 简易熔断流程
19     * 1:判断是否打开熔断,打开则直接返回指定信息
20     * 2:执行逻辑,成功失败都进行标记 markSuccess markFail
21     *
22     * @param caller
23     * @return
24     * @throws Throwable
25     */
26    public String doCircuit(String methodName, Caller caller, String serverHost, String param) throws Throwable {
27        if (isOpen()) {
28            return "{\"code\":-1,\"message\":\"circuit break\"}";
29        }
30        String result;
31        result = caller.call(serverHost, param);
32        if ("exception".equals(result)) {
33            markFail(methodName);
34            return "{\"code\":-1,\"message\":\"exception request\"}";
35        }
36        markSuccess(methodName);
37        return result;
38    }
39
40    /**
41     * 判断熔断是否打开 全开状态是判断是否转为半开并放过一个请求
42     *
43     * @return
44     */
45    private boolean isOpen() {
46        boolean openFlag = true;
47        // 关闭
48        if (status.equals(CircuitStatusEnum.CLOSE)) {
49            openFlag = false;
50        }
51        // 全开
52        if (status.equals(CircuitStatusEnum.OPEN)) {
53            // 未到半开时间,返回打开
54            if (System.currentTimeMillis() - timestamp < DEFAULT_HALF_OPEN_TRANSFER_TIME) {
55                return openFlag;
56            }
57            // 已到半开时间,改为半开状态,通过一个请求
58            if (semaphore.tryAcquire()) {
59                status = CircuitStatusEnum.HALF_OPEN;
60                timestamp = System.currentTimeMillis();
61                openFlag = false;
62                semaphore.release();
63            }
64        }
65        return openFlag;
66    }
67
68    /**
69     * 标记成功
70     * 1.半开状态,成功一次转换为关闭状态
71     * 2.其他情况增加成功记录次数
72     *
73     * @param operation
74     */
75    private void markSuccess(String operation) {
76        Pair<AtomicInteger, AtomicInteger> pair = counter.get(operation);
77        if (pair == null) {
78            counter.put(operation, new Pair<>(new AtomicInteger(), new AtomicInteger()));
79        }
80        // 半开状态,成功一次转换为关闭状态
81        if (status == CircuitStatusEnum.HALF_OPEN) {
82            status = CircuitStatusEnum.CLOSE;
83            counter.put(operation, new Pair<>(new AtomicInteger(), new AtomicInteger()));
84        } else {
85            counter.get(operation).getKey().incrementAndGet();
86        }
87    }
88
89    /**
90     * 标记失败
91     * 1.半开状态,失败一次回退到打开状态
92     * 2.其他状态判断错误比例决定是否打开熔断
93     *
94     * @param operation
95     */
96    private void markFail(String operation) {
97        // 半开状态失败变更为全开,否则计数判断
98        if (status == CircuitStatusEnum.HALF_OPEN) {
99            status = CircuitStatusEnum.OPEN;
100            timestamp = System.currentTimeMillis();
101        } else {
102            Pair<AtomicInteger, AtomicInteger> pair = counter.get(operation);
103            if (pair == null) {
104                counter.put(operation, new Pair<>(new AtomicInteger(), new AtomicInteger()));
105                pair = counter.get(operation);
106            }
107            int failCount = pair.getValue().incrementAndGet();
108            int successCount = pair.getKey().get();
109            int totalCount = failCount + successCount;
110            double failRate = (double) failCount / (double) totalCount;
111            if (totalCount >= DEFAULT_FAIL_COUNT && failRate > DEFAULT_FAIL_RATE) {
112                status = CircuitStatusEnum.OPEN;
113                timestamp = System.currentTimeMillis();
114            }
115        }
116    }
117}
118
119

然后是改造在我们的RPC框架中引入熔断,修改类RPCConsumer的如下方法:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
1    public <T> T getBean(final Class<T> clazz, final String appCode) {
2        return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, new InvocationHandler() {
3            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
4                // 获取服务器地址
5                String serverHost = getServer(appCode);
6                Span span = SpanBuilder.buildNewSpan(SpanHolder.get(), method.getName(), serverHost, appCode);
7                //// TODO: 2018/10/25 新启线程发起rpc调用远程链路追踪服务记录追踪日志 此处打日志代替
8                System.out.println("链路追踪,调用远程服务:" + JSON.toJSONString(span));
9                BaseRequestBO baseRequestBO = buildBaseBO(span, clazz.getName(), method, JSON.toJSONString(args[0]));
10                String result = circuitUtil.doCircuit(method.getName(), remoteCaller, serverHost, JSON.toJSONString(baseRequestBO));
11                return JSON.parseObject(result, method.getReturnType());
12            }
13        });
14    }
15
16

其中的remoteCaller是我们本次修改的抽象出的远程调用,代码如下:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
1public interface Caller {
2
3    /**
4     * 调用
5     * @param serverHost
6     * @param param
7     * @return
8     */
9    String call(String serverHost, String param) ;
10}
11public class RemoteCaller implements Caller {
12    /**
13     * netty客户端
14     */
15    private NettyClient nettyClient = new NettyClient();
16
17    /**
18     * 远程调用
19     *
20     * @param serverHost
21     * @param param
22     * @return
23     */
24    @Override
25    public String call(String serverHost, String param) {
26        try {
27            if (serverHost == null) {
28                System.out.println("远程调用错误:当前无服务提供者");
29                return "{\"code\":404,\"message\":\"no provider\"}";
30            }
31            // 连接netty,请求并接收响应
32            RpcClientNettyHandler clientHandler = new RpcClientNettyHandler();
33            clientHandler.setParam(param);
34            nettyClient.initClient(serverHost, clientHandler);
35            String result = clientHandler.process();
36            System.out.println(MessageFormat.format("调用服务器:{0},请求参数:{1},响应参数:{2}", serverHost, param, result));
37            return result;
38        } catch (Exception e) {
39            System.out.println("远程服务调用失败:" + e);
40            return "error";
41        }
42    }
43}
44
45

接下来修改HelloServiceImpl的实现:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
1public class HelloServiceImpl implements HelloService {
2
3    @Override
4    public HelloResponse hello(HelloRequest request) {
5        System.out.println("服务端收到请求,序列号:" + request.getSeq());
6        if (request.getSeq() < 0) {
7            throw new RuntimeException("seq error");
8        }
9        HelloResponse response = new HelloResponse();
10        response.setCode(200);
11        response.setMessage("success:" + request.getSeq());
12        return response;
13    }
14}
15
16

此处加入在传入的seq为负值时抛出异常。接下来启动服务器和客户端进行调用:


1
2
3
4
5
6
1客户端传入-1时返回:{"code":-1,"message":"exception request"}
2连续五次后返回:{"code":-1,"message":"circuit break"}
3此时处于熔断状态,即便传入正值1,也会返回:{"code":-1,"message":"circuit break"}
4等10S后在传入1时返回正常,服务恢复可正常使用。
5
6

在熔断后我们可以做相应的降级处理,比如一个远程调用,在发现对方服务响应大量超时时,我们可以将对方熔断,之后降级成为替代方案去继续执行我们的方法,就不会引起我们自己的服务也不可用了,达到我们自己服务的高可用的目的。

上述的代码都在github上了,各位可以下载后用docker启动一个zookeeper即可运行。

给TA打赏
共{{data.count}}人
人已打赏
安全经验

Docker安装gitlab

2021-10-11 16:36:11

安全经验

安全咨询服务

2022-1-12 14:11:49

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索