上一篇实现了服务的限流,本篇来实现服务的熔断。
首先还是贴出github代码地址,想直接看代码的可以直接下载运行:https://github.com/whiteBX/wrpc
在现在的微服务架构下,由于服务众多,调用链路长,很可能其中某个服务有时会出现异常导致服务不可用,例如发布导致bug、机房网络问题等,这种时候如果没有一种保护机制的话,很容易引起服务雪崩。这个时候就引入了服务熔断,本篇就来实现一个简单的熔断器。
首先我们定义一下熔断的状态,如下图:
此图中标注了我们熔断器的三种状态:全开 半开 关闭。
他们的流转过程为:
- 初始时为关闭状态。
- 当遇到错误到我们预设的阈值比例后转换为全开状态。
- 经过一定时间后,熔断器变为半开状态。
- 半开状态时允许通过一个请求,此请求成功则转为关闭状态,失败则变为全开状态。
接下来看代码实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119 1public class CircuitUtil {
2
3 // 达到默认请求基数才判断开启熔断
4 private static final int DEFAULT_FAIL_COUNT = 5;
5 // 半开转换为全开时间(毫秒数)
6 private static final long DEFAULT_HALF_OPEN_TRANSFER_TIME = 10000;
7 // 默认失败比例值开启熔断
8 private static final double DEFAULT_FAIL_RATE = 0.8D;
9 // 计数 pair左边成功,右边失败
10 private Map<String, Pair<AtomicInteger, AtomicInteger>> counter = new ConcurrentHashMap<>();
11 // 熔断器当前状态
12 private volatile CircuitStatusEnum status = CircuitStatusEnum.CLOSE;
13 // 最后一次处于全开状态的时间
14 private volatile long timestamp;
15 private final Semaphore semaphore = new Semaphore(1);
16
17 /**
18 * 简易熔断流程
19 * 1:判断是否打开熔断,打开则直接返回指定信息
20 * 2:执行逻辑,成功失败都进行标记 markSuccess markFail
21 *
22 * @param caller
23 * @return
24 * @throws Throwable
25 */
26 public String doCircuit(String methodName, Caller caller, String serverHost, String param) throws Throwable {
27 if (isOpen()) {
28 return "{\"code\":-1,\"message\":\"circuit break\"}";
29 }
30 String result;
31 result = caller.call(serverHost, param);
32 if ("exception".equals(result)) {
33 markFail(methodName);
34 return "{\"code\":-1,\"message\":\"exception request\"}";
35 }
36 markSuccess(methodName);
37 return result;
38 }
39
40 /**
41 * 判断熔断是否打开 全开状态是判断是否转为半开并放过一个请求
42 *
43 * @return
44 */
45 private boolean isOpen() {
46 boolean openFlag = true;
47 // 关闭
48 if (status.equals(CircuitStatusEnum.CLOSE)) {
49 openFlag = false;
50 }
51 // 全开
52 if (status.equals(CircuitStatusEnum.OPEN)) {
53 // 未到半开时间,返回打开
54 if (System.currentTimeMillis() - timestamp < DEFAULT_HALF_OPEN_TRANSFER_TIME) {
55 return openFlag;
56 }
57 // 已到半开时间,改为半开状态,通过一个请求
58 if (semaphore.tryAcquire()) {
59 status = CircuitStatusEnum.HALF_OPEN;
60 timestamp = System.currentTimeMillis();
61 openFlag = false;
62 semaphore.release();
63 }
64 }
65 return openFlag;
66 }
67
68 /**
69 * 标记成功
70 * 1.半开状态,成功一次转换为关闭状态
71 * 2.其他情况增加成功记录次数
72 *
73 * @param operation
74 */
75 private void markSuccess(String operation) {
76 Pair<AtomicInteger, AtomicInteger> pair = counter.get(operation);
77 if (pair == null) {
78 counter.put(operation, new Pair<>(new AtomicInteger(), new AtomicInteger()));
79 }
80 // 半开状态,成功一次转换为关闭状态
81 if (status == CircuitStatusEnum.HALF_OPEN) {
82 status = CircuitStatusEnum.CLOSE;
83 counter.put(operation, new Pair<>(new AtomicInteger(), new AtomicInteger()));
84 } else {
85 counter.get(operation).getKey().incrementAndGet();
86 }
87 }
88
89 /**
90 * 标记失败
91 * 1.半开状态,失败一次回退到打开状态
92 * 2.其他状态判断错误比例决定是否打开熔断
93 *
94 * @param operation
95 */
96 private void markFail(String operation) {
97 // 半开状态失败变更为全开,否则计数判断
98 if (status == CircuitStatusEnum.HALF_OPEN) {
99 status = CircuitStatusEnum.OPEN;
100 timestamp = System.currentTimeMillis();
101 } else {
102 Pair<AtomicInteger, AtomicInteger> pair = counter.get(operation);
103 if (pair == null) {
104 counter.put(operation, new Pair<>(new AtomicInteger(), new AtomicInteger()));
105 pair = counter.get(operation);
106 }
107 int failCount = pair.getValue().incrementAndGet();
108 int successCount = pair.getKey().get();
109 int totalCount = failCount + successCount;
110 double failRate = (double) failCount / (double) totalCount;
111 if (totalCount >= DEFAULT_FAIL_COUNT && failRate > DEFAULT_FAIL_RATE) {
112 status = CircuitStatusEnum.OPEN;
113 timestamp = System.currentTimeMillis();
114 }
115 }
116 }
117}
118
119
然后是改造在我们的RPC框架中引入熔断,修改类RPCConsumer的如下方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 1 public <T> T getBean(final Class<T> clazz, final String appCode) {
2 return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, new InvocationHandler() {
3 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
4 // 获取服务器地址
5 String serverHost = getServer(appCode);
6 Span span = SpanBuilder.buildNewSpan(SpanHolder.get(), method.getName(), serverHost, appCode);
7 //// TODO: 2018/10/25 新启线程发起rpc调用远程链路追踪服务记录追踪日志 此处打日志代替
8 System.out.println("链路追踪,调用远程服务:" + JSON.toJSONString(span));
9 BaseRequestBO baseRequestBO = buildBaseBO(span, clazz.getName(), method, JSON.toJSONString(args[0]));
10 String result = circuitUtil.doCircuit(method.getName(), remoteCaller, serverHost, JSON.toJSONString(baseRequestBO));
11 return JSON.parseObject(result, method.getReturnType());
12 }
13 });
14 }
15
16
其中的remoteCaller是我们本次修改的抽象出的远程调用,代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45 1public interface Caller {
2
3 /**
4 * 调用
5 * @param serverHost
6 * @param param
7 * @return
8 */
9 String call(String serverHost, String param) ;
10}
11public class RemoteCaller implements Caller {
12 /**
13 * netty客户端
14 */
15 private NettyClient nettyClient = new NettyClient();
16
17 /**
18 * 远程调用
19 *
20 * @param serverHost
21 * @param param
22 * @return
23 */
24 @Override
25 public String call(String serverHost, String param) {
26 try {
27 if (serverHost == null) {
28 System.out.println("远程调用错误:当前无服务提供者");
29 return "{\"code\":404,\"message\":\"no provider\"}";
30 }
31 // 连接netty,请求并接收响应
32 RpcClientNettyHandler clientHandler = new RpcClientNettyHandler();
33 clientHandler.setParam(param);
34 nettyClient.initClient(serverHost, clientHandler);
35 String result = clientHandler.process();
36 System.out.println(MessageFormat.format("调用服务器:{0},请求参数:{1},响应参数:{2}", serverHost, param, result));
37 return result;
38 } catch (Exception e) {
39 System.out.println("远程服务调用失败:" + e);
40 return "error";
41 }
42 }
43}
44
45
接下来修改HelloServiceImpl的实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 1public class HelloServiceImpl implements HelloService {
2
3 @Override
4 public HelloResponse hello(HelloRequest request) {
5 System.out.println("服务端收到请求,序列号:" + request.getSeq());
6 if (request.getSeq() < 0) {
7 throw new RuntimeException("seq error");
8 }
9 HelloResponse response = new HelloResponse();
10 response.setCode(200);
11 response.setMessage("success:" + request.getSeq());
12 return response;
13 }
14}
15
16
此处加入在传入的seq为负值时抛出异常。接下来启动服务器和客户端进行调用:
1
2
3
4
5
6 1客户端传入-1时返回:{"code":-1,"message":"exception request"}
2连续五次后返回:{"code":-1,"message":"circuit break"}
3此时处于熔断状态,即便传入正值1,也会返回:{"code":-1,"message":"circuit break"}
4等10S后在传入1时返回正常,服务恢复可正常使用。
5
6
在熔断后我们可以做相应的降级处理,比如一个远程调用,在发现对方服务响应大量超时时,我们可以将对方熔断,之后降级成为替代方案去继续执行我们的方法,就不会引起我们自己的服务也不可用了,达到我们自己服务的高可用的目的。
上述的代码都在github上了,各位可以下载后用docker启动一个zookeeper即可运行。