在接入前,先自定义一组传输对象,而不是原来传输一个Invocation,返回则是一个String,这里需要用到netty的编解码器,当然这里可以用netty已经实现好的对象编解码、第三方的Probuff编解码器,而如果想要实现自己的传输协议,就要继承netty的编解码,他可以用来自定义解析对象以及封装对象,这里对对象处理采用本来是采用gson,但是有很多bug,便用了另一种进行序列化,这样自定义协议的话,可以做很多其他的处理,比如协议的魔数、格式校验等等,自定义协议的处理
定一个序列化接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25 1/**
2 * @author: lele
3 * @date: 2019/11/19 下午5:15
4 * 对象-字节转换接口
5 */
6public interface RpcSerializer {
7 /**
8 * 序列化
9 * @param target
10 * @return
11 */
12 byte[] serialize(Object target);
13
14 /**
15 * 反序列化
16 * @param target
17 * @param clazz
18 * @param <T>
19 * @return
20 * @throws Exception
21 */
22 <T> T deserialize(byte[] target,Class<T> clazz) throws Exception;
23}
24
25
具体的序列化实现类,需要引入protostuff-stuff,protostuff-runtime依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54 1public class ProtobufSerializer {
2 private static Map<Class, Schema> schemaMap = new HashMap<Class, Schema>();
3
4 // objenesis是一个小型Java类库用来实例化一个特定class的对象。
5 private static Objenesis objenesis = new ObjenesisStd(true);
6
7 // 存储模式对象映射
8 private static Schema getSchema(Class cls) {
9 Schema schema = schemaMap.get(cls);
10 if (null == schema) {
11 schema = RuntimeSchema.createFrom(cls);
12 if (null != schema) {
13 schemaMap.put(cls, schema);
14 }
15 }
16 return schema;
17 }
18
19 public byte[] serialize(Object target) {
20 LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
21 Class cls = target.getClass();
22
23 try {
24 Schema schema = getSchema(cls);
25 byte[] bytes = ProtobufIOUtil.toByteArray(target, schema, buffer);
26 return bytes;
27 } catch (Exception e) {
28 throw new IllegalStateException(e);
29 } finally {
30 buffer.clear();
31 }
32 }
33
34 public <T> T deserialize(byte[] target, Class<T> clazz) throws Exception {
35 try {
36 T instance = objenesis.newInstance(clazz);
37 Schema schema = getSchema(clazz);
38 ProtobufIOUtil.mergeFrom(target, instance, schema);
39 return instance;
40 } catch (Exception e) {
41 throw new IllegalStateException(e);
42 }
43 }
44
45//单例
46 private static class Holder {
47 private static final ProtobufSerializer j = new ProtobufSerializer();
48 }
49
50 public static ProtobufSerializer getInstance() {
51 return ProtobufSerializer.Holder.j;
52 }
53}
54
然后定义请求/返回的对象,这里定义的requestID用于标记该次请求,作用往后再谈
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 1/**
2 * @author: lele
3 * @date: 2019/11/15 下午7:01
4 * 封装调用方所想调用的远程方法信息
5 */
6@Data
7@AllArgsConstructor
8public class RpcRequest {
9
10 private String requestId;
11
12 private String interfaceName;
13
14 private String methodName;
15
16 private Object[] params;
17 //防止重载
18 private Class[] paramsTypes;
19}
20
21
22
23/**
24 * @author: lele
25 * @date: 2019/11/19 下午5:12
26 */
27@Data
28public class RpcResponse {
29 private String requestId;
30
31 private Object result;
32
33 private String error;
34}
35
定义处理对象的编解码器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67 1/**
2 * @author: lele
3 * @date: 2019/11/19 下午5:16
4 * 把字节转为实体类,如byte->request供后续处理
5 */
6public class RpcDecoder extends ByteToMessageDecoder {
7 //处理的对象
8 private Class<?> target;
9
10 private ProtobufSerializer serializer=ProtobufSerializer.getInstance();
11
12 public RpcDecoder(Class<?> target) {
13 this.target = target;
14 }
15
16 /**
17 * byte转实体
18 */
19 @Override
20 protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> list) throws Exception {
21 System.out.println("收到字节");
22 //如果小于一个int的长度,不作处理
23 if (byteBuf.readableBytes() < 4) {
24 return;
25 }
26 //获取数据长度
27 int dataLength = byteBuf.readInt();
28 //写入byte数组
29 byte[] data = new byte[dataLength];
30 byteBuf.readBytes(data);
31 //解码转成对象
32 Object res = serializer.deserialize(data, target);
33 //给后面的handler处理
34 list.add(res);
35
36 }
37}
38
39
40
41/**
42 * @author: lele
43 * @date: 2019/11/19 下午5:16
44 * 把实体变为字节,可用于req->byte、response->byte
45 */
46public class RpcEncoder extends MessageToByteEncoder {
47 //处理的对象
48 private Class<?> entity;
49 ProtobufSerializer serializer=ProtobufSerializer.getInstance();
50
51 public RpcEncoder(Class<?> entity) {
52 this.entity = entity;
53 }
54 @Override
55 protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
56 System.out.println("转成字节");
57 if (entity.equals(o.getClass())) {
58 byte[] data=serializer.serialize(o);
59 //写入消息长度,这里还可以写入版本号、魔数等协议信息
60 byteBuf.writeInt(data.length);
61 //写入消息主体
62 byteBuf.writeBytes(data);
63 }
64 }
65}
66
67
这样,传输对象及其序列化、相应的编解码器就写好了,把原来自带的对象编解码器替换掉即可
而以前那种{interfaceName:{url:具体实现类}的zk存储方式不适合具体的业务处理,以接口为粒度不方便管理,而且现有的设计是每请求一次,就会新建一个client,不能复用,造成资源浪费,然后client如果保持和以接口获取的服务进行链接,当请求的接口时,client就无法复用,比如下图,client可以与serverA保持链接后,访问interfaceC,interfaceA都是可以的,如果调用interfaceB,就不能保持请求
但如果以client和对应服务来写,则不会出现上述情况,也可以保持链接以复用
最后设计为/服务名/url的格式,这次更改的结果是,相应的获取可用地址、注册服务的格式也要进行更改。
接下来就是接入spring了,有了上面的基础,这时候我们可以仿feign的功能,把某一个包下带有某个注解的接口注入spring中,同时为这些接口生成代理,当执行这些接口的方法时,进行动态代理访问远端接口
定义作用在要代理的接口上,这里的name为服务名,zk则在注册服务改为——/服务名/ip,服务端通过传来的接口名通过反射获取类,或者通过给spring托管获取其class(下面server端就是这样处理)
1
2
3
4
5
6
7
8
9
10
11
12
13 1/**
2 * @author: lele
3 * @date: 2019/11/20 下午2:44
4 */
5@Retention(RetentionPolicy.RUNTIME)
6@Target(ElementType.TYPE)
7
8//用于接口上,name为服务名,zk则在注册服务改为 服务名/ip,服务端通过传来的接口名通过反射获取类,或者通过给spring托管获取其class
9public @interface RpcStudyClient {
10
11 String name();
12}
13
流程为:自定义一个可以动态注册bean的类,然后通过获取所要扫描的包,添加扫描条件,符合扫描条件之后,再复写判断是否为接口,是的话对其进行注册,注册时通过对象工厂来定制化生成bean,这里是加入了动态代理拦截其方法进行远程调用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140 1
2/**
3 * @author: lele
4 * @date: 2019/11/20 下午3:06
5 * 第一个接口获取注册bean能力,第二个接口获取类加载器,仿feignregister写法
6 */
7
8public class RpcStudyClientRegisty implements ImportBeanDefinitionRegistrar, BeanClassLoaderAware {
9
10
11 private ClassLoader classLoader;
12
13 @Override
14 public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
15
16
17 if (DemoApplication.mode == 1) {
18 //获取指定路径中注解bean定义扫描器
19 ClassPathScanningCandidateComponentProvider scanner = getScanner();
20 //获取扫描的包,通过enable那个注解的属性
21 Set<String> basePackages = getBasePackages(importingClassMetadata);
22 //添加过滤规则,属于rpcstudyclient的加入,excludeFilter则是排除
23 scanner.addIncludeFilter(new AnnotationTypeFilter(RpcStudyClient.class));
24
25 Set<BeanDefinition> candidateBeans = new HashSet<>();
26 //获取符合条件的bean
27 for (String basePackage : basePackages) {
28 Set<BeanDefinition> candidateComponents = scanner.findCandidateComponents(basePackage);
29 candidateBeans.addAll(candidateComponents);
30 }
31 //spring中用BeanDefintion来表示bean,这里判断bean类型是否合适,合适就注册
32 for (BeanDefinition candidateBean : candidateBeans) {
33 //如果bean还没有注册
34 if (!registry.containsBeanDefinition(candidateBean.getBeanClassName())) {
35 //判读是否含有注解
36 if (candidateBean instanceof AnnotatedBeanDefinition) {
37 //存储该类信息的bean,methodMetadata(方法),AnnotationMetadata(里面也包括methodMetadata,可以获取注解,类信息等等)
38 AnnotatedBeanDefinition annotatedBeanDefinition = (AnnotatedBeanDefinition) candidateBean;
39 //获取bean的类信息
40 AnnotationMetadata annotationMetadata = annotatedBeanDefinition.getMetadata();
41 //判断其否为接口
42 Assert.isTrue(annotationMetadata.isInterface(), "@RpcStudeyClient注解只能用在接口上");
43 Map<String, Object> attributes = annotationMetadata.getAnnotationAttributes(RpcStudyClient.class.getCanonicalName());
44
45 this.registerRpcClient(registry, annotationMetadata, attributes);
46 }
47 }
48 }
49 }
50
51 }
52
53 //注册bean
54 private void registerRpcClient(BeanDefinitionRegistry registry,
55 AnnotationMetadata annotationMetadata, Map<String, Object> attributes) {
56
57 //获取bean类名
58 String className = annotationMetadata.getClassName();
59 //使用自定义的对象工厂定制化生成bean
60 BeanDefinitionBuilder definition = BeanDefinitionBuilder
61 .genericBeanDefinition(RpcStudyClientFactoryBean.class);
62 //设置根据类型的注入方式
63 definition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE);
64
65 definition.addPropertyValue("type", className);
66 String name = attributes.get("name") == null ? "" : (String) (attributes.get("name"));
67
68 String alias = name + "RpcStudyClient";
69 //获取bean基类
70 AbstractBeanDefinition beanDefinition = definition.getBeanDefinition();
71 //防止其他有实现,设置此实现为首要
72 beanDefinition.setPrimary(true);
73 BeanDefinitionHolder holder = new BeanDefinitionHolder(beanDefinition, className,
74 new String[]{alias});
75 //注册bean
76 BeanDefinitionReaderUtils.registerBeanDefinition(holder, registry);
77 }
78
79
80 //复写bean扫描的判断
81 protected ClassPathScanningCandidateComponentProvider getScanner() {
82 return new ClassPathScanningCandidateComponentProvider(false
83 ) {
84 @Override
85 protected boolean isCandidateComponent(AnnotatedBeanDefinition beanDefinition) {
86 //存放注解相关信息,具备了class、注解的信息
87 AnnotationMetadata metadata = beanDefinition.getMetadata();
88 //是否是独立能创建对象的,比如class、内部类、静态内部类
89 if (metadata.isIndependent()) {
90 //用于过滤注解为@RpcClient的注解
91 if (metadata.isInterface() &&
92 metadata.getInterfaceNames().length == 1 &&
93 Annotation.class.getName().equals(metadata.getInterfaceNames()[0])) {
94 try {
95 Class<?> target = ClassUtils.forName(metadata.getClassName(),
96 RpcStudyClientRegisty.this.classLoader);
97 return !target.isAnnotation();
98 } catch (Exception ex) {
99 this.logger.error(
100 "Could not load target class: " + beanDefinition.getMetadata().getClassName(), ex);
101 }
102 }
103 return true;
104 }
105 return false;
106
107 }
108 };
109 }
110
111 @Override
112 public void setBeanClassLoader(ClassLoader classLoader) {
113 this.classLoader = classLoader;
114 }
115
116 //获取需要扫描的包位置
117 protected Set<String> getBasePackages(AnnotationMetadata importingClassMetadata) {
118 Map<String, Object> attributes = importingClassMetadata
119 .getAnnotationAttributes(EnableRpcStudyClient.class.getCanonicalName());
120 String[] scanPackages = (String[]) attributes.get("basePackages");
121 Set<String> basePackages = new HashSet<>();
122
123 if (scanPackages.length > 0) {
124 //扫描指定包
125 for (String pkg : scanPackages) {
126 if (StringUtils.hasText(pkg)) {
127 basePackages.add(pkg);
128 }
129 }
130 } else {
131 //扫描主入口所在的包
132 basePackages.add(
133 ClassUtils.getPackageName(importingClassMetadata.getClassName()));
134 }
135 return basePackages;
136 }
137
138}
139
140
处理接口的FactoryBean,为每个接口生成代理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 1/**
2 * @author: lele
3 * @date: 2019/11/20 下午3:08
4 * bean工厂类,这里为接口代理其方法
5 */
6
7@Data
8@EqualsAndHashCode(callSuper = false)
9public class RpcStudyClientFactoryBean implements FactoryBean<Object> {
10 private Class<?> type;
11 @Override
12 public Object getObject() throws Exception {
13 return ProxyFactory.getProxy(this.type);
14 }
15
16 @Override
17 public Class<?> getObjectType() {
18 return this.type;
19 }
20}
21
22
把上面的注册配置类注入到该注解
1
2
3
4
5
6
7
8
9
10
11
12
13 1/**
2 * @author: lele
3 * @date: 2019/11/20 下午2:42
4 */
5@Retention(RetentionPolicy.RUNTIME)
6@Target(ElementType.TYPE)
7@Import(RpcStudyClientRegisty.class)
8public @interface EnableRpcStudyClient {
9 //扫描的包,如果为空,根据启动类所在的包名扫描
10 String[] basePackages() default {};
11}
12
13
需要扫描的接口上添加RpcStudyClient注解
1
2
3
4
5
6
7 1@RpcStudyClient(name="user")
2public interface HelloService {
3 String sayHello(String userName);
4 String qq();
5}
6
7
启动类上添加@EnableRpcStudyClient注解指定要扫描的包,这样就完成了client端的处理
然后到server端注册具体的服务实现类,定义一个spring会扫描有该注解的类
1
2
3
4
5
6
7
8
9 1@Target({ElementType.TYPE})
2@Retention(RetentionPolicy.RUNTIME)
3@Component
4public @interface RpcStudyService {
5 // 用来指定实现的接口
6 Class<?> value();
7}
8
9
服务注册类,核心是通过getBeanWithAnnonation来获取相应的bean,然后存储在map里面,并且在初始化后注册本实例的ip地址,netty的handler则通过这个map直接获取实例来执行相应的方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45 1@Component
2@Data
3public class RpcStudyRegister implements InitializingBean,ApplicationContextAware {
4 public static Map<String,Object> serviceMap;
5 @Value("${spring.application.name}")
6 private String name;
7
8 @Value("${rpcstudy.port}")
9 private Integer port;
10
11
12
13 @Override
14 public void afterPropertiesSet() throws Exception {
15 if(DemoApplication.mode==0){
16 String hostAddress = InetAddress.getLocalHost().getHostName();
17 URL url=new URL(hostAddress,port);
18 Protocol server= ProtocolFactory.netty();
19 //注册服务
20 ZkRegister.register(name,url);
21 server.start(url);
22 }
23
24 }
25
26 @Override
27 public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
28 //把有RpcStudyService注解的bean添加到map里面,key为该注解的接口
29 if(DemoApplication.mode==0){
30 Map<String, Object> beans = applicationContext.getBeansWithAnnotation(RpcStudyService.class);
31 if (beans != null && beans.size() > 0) {
32 serviceMap = new HashMap<>(beans.size());
33 for (Object o : beans.values()) {
34 RpcStudyService rpcService = o.getClass().getAnnotation(RpcStudyService.class);
35 String interfaceName = rpcService.value().getName();
36 serviceMap.put(interfaceName, o);
37 }
38 serviceMap = Collections.unmodifiableMap(serviceMap);
39
40 }
41 }
42
43 }
44}
45
这样当本地的netty获取class时,可以通过该map从传来的request的interface为参数来获取相应的实现类
这里由于没有做模块划分,而server和client也需要分开,所以在启动类上面定义一个mode变量,mode为0时为client端,只启动tomcat服务,mode为1时是server端只启动netty,可以看到上面的client/server的register类在相应模式下才做处理,启动client(手动改mode=1)和server(手动改mode=0),访问localhost:8080/t2,得到下面的结果
client:
server: