【RPC】一步一步实现基于netty+zookeeper的RPC框架(三)

释放双眼,带上耳机,听听看~!

上一篇写完咱们已经具备服务注册发现/通信/通过接口调用功能了,本篇带来负载均衡策略。

RPC框架中,负载均衡策略提供,其实就是根据配置,选择不同的负载策略,常见的有随机/轮询/权重几种负载策略,本篇就带大家来实现它。

这里还是贴出github代码地址,想直接看代码的可以直接下载运行:https://github.com/whiteBX/wrpc

首先这里主要用到了设计模式中的策略模式,定义一个负载策略处理器:


1
2
3
4
5
6
7
8
9
10
11
1public interface BalanceProcessor {
2
3    /**
4     * 处理接口
5     * @param urlList
6     * @return
7     */
8    String process(String appCode, List<String> urlList);
9}
10
11

下面来实现轮询模式的策略处理:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
1public class RoundRobinBalanceProcessor implements BalanceProcessor {
2
3    private ConcurrentMap<String, AtomicInteger> counterMap = new ConcurrentHashMap<String, AtomicInteger>();
4
5
6    public String process(String appCode, List<String> urlList) {
7        AtomicInteger counter = counterMap.get(appCode);
8        if (counter == null) {
9            counterMap.putIfAbsent(appCode, new AtomicInteger());
10        }
11        counter = counterMap.get(appCode);
12        int current = getAndIncrement(counter);
13
14        return urlList.get(current % urlList.size());
15    }
16
17    /**
18     * 通过CAS获取不超过最大数的顺序int
19     *
20     * @param ai
21     * @return
22     */
23    private Integer getAndIncrement(AtomicInteger ai) {
24        while (true) {
25            int current = ai.get();
26            int next = current >= Integer.MAX_VALUE ? 0 : current + 1;
27            if (ai.compareAndSet(current, next)) {
28                return current;
29            }
30        }
31    }
32}
33
34

这边代码比较简单,主要用到了java并发包的一些东西,通过一个自增数对数组长度取余即可以实现轮询模式。另外这里默认我们线上的服务器数量不是会经常变化的,所以urlList的内容其实是很少变化的,轮询在大部分时间内是不会被破坏的。

下面来看随机模式,这个很简单:


1
2
3
4
5
6
7
8
1public class RandomProcessor implements BalanceProcessor {
2
3    public String process(String appCode, List<String> urlList) {
4        return urlList.get((int) (Math.random() * urlList.size()));
5    }
6}
7
8

接下来来看策略上下文处理:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
1public class BalanceContext {
2
3    private BalanceProcessor roundRobinProcessor = new RoundRobinBalanceProcessor();
4    private BalanceProcessor randomProcessor = new RandomProcessor();
5
6    /**
7     * 获取URL
8     *
9     * @param urlList
10     * @return
11     */
12    public String getUrl(String appCode, List<String> urlList, BalanceMode balanceMode) {
13        switch (balanceMode) {
14            case ROUND_ROBIN:
15                return roundRobinProcessor.process(appCode, urlList);
16            case RANDOM:
17                return randomProcessor.process(appCode, urlList);
18            default:
19                return null;
20        }
21    }
22}
23
24

然后是改造原有获取url的部分代码,调用我们的策略处理类:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
1/**
2 * 负载上下文处理器
3 */
4private BalanceContext balanceContext = new BalanceContext();
5/** 负载模式,后续引入spring则从配置文件中读取 */
6private static String balanceMode = "RANDOM";
7
8/**
9 * 获取URL
10 *
11 * @param appCode
12 * @return
13 */
14public String getUrl(String appCode) {
15    // 初始化url
16    if (urlList.size() == 0) {
17        initUrlList(appCode);
18    }
19    // 随机返回一条,此处以后优化为负载均衡策略
20    if (urlList.size() > 0) {
21        return balanceContext.getUrl(appCode, urlList, BalanceMode.valueOf(balanceMode));
22    } else {
23        System.out.println("目前没有服务提供者");
24        return null;
25    }
26}
27
28

之后大家可以自行测试一下,采用轮询模式的话,获取到的处理服务器会依次轮询所有注册的服务,而采用随机模式则会随机选中服务器进行处理。这里就不贴测试结果了。

另外由于时间关系这里没有写权重的策略,这里先说一下原理,其实很简单的。

权重模式,根据配置,比如5台机器,配置权重依次为40,10,15,25,10.那么只需要在总数100中去随机一个随机数,命中了1-40则选取机器1,命中了41-50则选取机器2,其他同理。另外这里针对可能临时加机器的情况,配置没有及时更新进去的话,对于新机器可以设置默认权重,比如这时新加了一台机器,默认权重设置为10的话,则当前权重算法变为在总数110中随机,随机到101-110则会选取新增加的机器去处理请求即可。

到这里就实现了RPC框架的负载均衡.

后续将一步一步实现调用链路Trace记录/限流等功能,欢迎持续关注!

给TA打赏
共{{data.count}}人
人已打赏
安全经验

Google Adsense广告CPC(单元点击价格)为什么会减少??

2021-10-11 16:36:11

安全经验

安全咨询服务

2022-1-12 14:11:49

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索