1.
序列
化和反序列化
网络中都是以字节序列的形式来传输数据的,因此在发送消息时需要先将对象序列化转换为字节序列,然后将获得的字节序列发送出去,消息接收方接收到字节序列后将之反序列化获得传输的对象,从收发双方来看就如同直接发送和接收了对象一样。
******2.**第三方依赖
本例使用目前最新版的kryo-serializers 0.36用于序列化
1
2
3
4
5
6 1<dependency>
2 <groupId>de.javakaffee</groupId>
3 <artifactId>kryo-serializers</artifactId>
4 <version>0.36</version>
5</dependency>
6
1
2 1使用maven直接在pom.xml中添加上面的依赖即可
2
3.序列化和反序列化的实现
序列化和反序列化的对象在本例中只有两种:1.客户端向服务端发出的调用请求RpcRequest 2.服务端向客户端返回的调用结果RpcResponse
(1).RpcResponse
从RPC服务端传回给客户端的某次调用请求的结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52 1package com.maigo.rpc.context;
2
3public class RpcResponse
4{
5 private int id;
6 private Object result;
7 private Throwable throwable;
8 private boolean isInvokeSuccess;
9
10 public int getId()
11 {
12 return id;
13 }
14
15 public void setId(int id)
16 {
17 this.id = id;
18 }
19
20 public Object getResult()
21 {
22 return result;
23 }
24
25 public void setResult(Object result)
26 {
27 this.result = result;
28 }
29
30 public Throwable getThrowable()
31 {
32 return throwable;
33 }
34
35 public void setThrowable(Throwable throwable)
36 {
37 this.throwable = throwable;
38 }
39
40 public boolean isInvokeSuccess()
41 {
42 return isInvokeSuccess;
43 }
44
45 public void setInvokeSuccess(boolean isInvokeSuccess)
46 {
47 this.isInvokeSuccess = isInvokeSuccess;
48 }
49
50}
51
52
1
2 1RpcResponse中的id对应着该次请求的RpcRequest中的id,isInvokeSuccess表示调用中是否有异常抛出,result和throwable分别表示调用结果和调用过程抛出的异常。
2
(2).RpcRequest
已在上一节中给出
(3).KryoSerializer
实际负责序列化和反序列化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52 1package com.maigo.rpc.serializer;
2
3import java.io.IOException;
4
5import com.esotericsoftware.kryo.Kryo;
6import com.esotericsoftware.kryo.io.Input;
7import com.esotericsoftware.kryo.io.Output;
8
9import io.netty.buffer.ByteBuf;
10import io.netty.buffer.ByteBufInputStream;
11import io.netty.buffer.ByteBufOutputStream;
12
13public class KryoSerializer
14{
15 private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
16
17 public static void serialize(Object object, ByteBuf byteBuf)
18 {
19 Kryo kryo = KryoHolder.get();
20 int startIdx = byteBuf.writerIndex();
21 ByteBufOutputStream byteOutputStream = new ByteBufOutputStream(byteBuf);
22 try
23 {
24 byteOutputStream.write(LENGTH_PLACEHOLDER);
25 Output output = new Output(1024*4, -1);
26 output.setOutputStream(byteOutputStream);
27 kryo.writeClassAndObject(output, object);
28
29 output.flush();
30 output.close();
31
32 int endIdx = byteBuf.writerIndex();
33
34 byteBuf.setInt(startIdx, endIdx - startIdx - 4);
35 }
36 catch (IOException e)
37 {
38 e.printStackTrace();
39 }
40 }
41
42 public static Object deserialize(ByteBuf byteBuf)
43 {
44 if(byteBuf == null)
45 return null;
46
47 Input input = new Input(new ByteBufInputStream(byteBuf));
48 Kryo kryo = KryoHolder.get();
49 return kryo.readClassAndObject(input);
50 }
51}
52
1
2 1serialize()将一个对象通过kryo序列化并写入ByteBuf中,注意到在头部预留了4个字节用于写入长度信息。deserialize()将ByteBuf中的内容反序列化还原出传输的对象。其中序列化和反序列化均用到了kryo对象,该对象是从KryoHolder中通过get()拿到的。
2
(4).
KryoHolder
由于kryo对象是线程不安全的,当有多个netty的channel同时连接时,各channel是可能工作在不同的线程上的(netty中一个IO线程可以对应多个channel,而一个channel只能对应一个线程,详细可以参考netty线程模型),若共用同一个kryo对象会出现并发问题,因此用ThreadLocal在每个线程保留一个各自的kryo对象,保证不会大量创建kryo对象的同时避免了并发问题
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 1package com.maigo.rpc.serializer;
2
3import com.esotericsoftware.kryo.Kryo;
4
5public class KryoHolder
6{
7 private static ThreadLocal<Kryo> threadLocalKryo = new ThreadLocal<Kryo>()
8 {
9 protected Kryo initialValue()
10 {
11 Kryo kryo = new KryoReflectionFactory();
12
13 return kryo;
14 };
15 };
16
17 public static Kryo get()
18 {
19 return threadLocalKryo.get();
20 }
21}
22
23
1
2 1可见,最终用于序列化和反序列化的kryo对象是通过new KryoReflectionFactory()创建的。
2
(5).
KryoReflectionFactory
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96 1package com.maigo.rpc.serializer;
2
3import java.lang.reflect.InvocationHandler;
4import java.net.URI;
5import java.util.Arrays;
6import java.util.BitSet;
7import java.util.Collection;
8import java.util.Collections;
9import java.util.Date;
10import java.util.EnumMap;
11import java.util.EnumSet;
12import java.util.GregorianCalendar;
13import java.util.Map;
14import java.util.UUID;
15import java.util.regex.Pattern;
16
17import com.esotericsoftware.kryo.Serializer;
18import com.maigo.rpc.context.RpcRequest;
19import com.maigo.rpc.context.RpcResponse;
20
21import de.javakaffee.kryoserializers.ArraysAsListSerializer;
22import de.javakaffee.kryoserializers.BitSetSerializer;
23import de.javakaffee.kryoserializers.CollectionsEmptyListSerializer;
24import de.javakaffee.kryoserializers.CollectionsEmptyMapSerializer;
25import de.javakaffee.kryoserializers.CollectionsEmptySetSerializer;
26import de.javakaffee.kryoserializers.CollectionsSingletonListSerializer;
27import de.javakaffee.kryoserializers.CollectionsSingletonMapSerializer;
28import de.javakaffee.kryoserializers.CollectionsSingletonSetSerializer;
29import de.javakaffee.kryoserializers.CopyForIterateCollectionSerializer;
30import de.javakaffee.kryoserializers.CopyForIterateMapSerializer;
31import de.javakaffee.kryoserializers.DateSerializer;
32import de.javakaffee.kryoserializers.EnumMapSerializer;
33import de.javakaffee.kryoserializers.EnumSetSerializer;
34import de.javakaffee.kryoserializers.GregorianCalendarSerializer;
35import de.javakaffee.kryoserializers.JdkProxySerializer;
36import de.javakaffee.kryoserializers.KryoReflectionFactorySupport;
37import de.javakaffee.kryoserializers.RegexSerializer;
38import de.javakaffee.kryoserializers.SubListSerializers;
39import de.javakaffee.kryoserializers.SynchronizedCollectionsSerializer;
40import de.javakaffee.kryoserializers.URISerializer;
41import de.javakaffee.kryoserializers.UUIDSerializer;
42import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer;
43
44public class KryoReflectionFactory extends KryoReflectionFactorySupport
45{
46 public KryoReflectionFactory()
47 {
48 setRegistrationRequired(false);
49 setReferences(true);
50 register(RpcRequest.class, new RpcRequestSerializer());
51 register(RpcResponse.class, new RpcResponseSerializer());
52 register(Arrays.asList("").getClass(), new ArraysAsListSerializer());
53 register(Collections.EMPTY_LIST.getClass(), new CollectionsEmptyListSerializer());
54 register(Collections.EMPTY_MAP.getClass(), new CollectionsEmptyMapSerializer());
55 register(Collections.EMPTY_SET.getClass(), new CollectionsEmptySetSerializer());
56 register(Collections.singletonList("").getClass(), new CollectionsSingletonListSerializer());
57 register(Collections.singleton("").getClass(), new CollectionsSingletonSetSerializer());
58 register(Collections.singletonMap("", "").getClass(), new CollectionsSingletonMapSerializer());
59 register(Pattern.class, new RegexSerializer());
60 register(BitSet.class, new BitSetSerializer());
61 register(URI.class, new URISerializer());
62 register(UUID.class, new UUIDSerializer());
63 register(GregorianCalendar.class, new GregorianCalendarSerializer());
64 register(InvocationHandler.class, new JdkProxySerializer());
65 UnmodifiableCollectionsSerializer.registerSerializers(this);
66 SynchronizedCollectionsSerializer.registerSerializers(this);
67 }
68
69 @Override
70 @SuppressWarnings({"rawtypes", "unchecked"})
71 public Serializer<?> getDefaultSerializer(Class clazz)
72 {
73 if(EnumSet.class.isAssignableFrom(clazz))
74 return new EnumSetSerializer();
75
76 if(EnumMap.class.isAssignableFrom(clazz))
77 return new EnumMapSerializer();
78
79 if(Collection.class.isAssignableFrom(clazz))
80 return new CopyForIterateCollectionSerializer();
81
82 if(Map.class.isAssignableFrom(clazz))
83 return new CopyForIterateMapSerializer();
84
85 if(Date.class.isAssignableFrom(clazz))
86 return new DateSerializer( clazz );
87
88 if (SubListSerializers.ArrayListSubListSerializer.canSerialize(clazz)
89 || SubListSerializers.JavaUtilSubListSerializer.canSerialize(clazz))
90 return SubListSerializers.createFor(clazz);
91
92 return super.getDefaultSerializer(clazz);
93 }
94}
95
96
导入的包非常多,主要完成的功能是给大量类类型注册其对应的Serializer。setRegistrationRequired()设置是否只能序列化已注册的类,此处必须设置为false,因为RPC请求和回应中都可能包含用户自定义的类,这些类显然是不可能在kryo中注册过的。setReferences()若设置成false在序列化Exception时似乎有问题,此处维持打开(默认也是打开)。注意到给RpcRequest.class和RpcResponse.class分别注册了对应的
Serializer为RpcRequestSerializer和RpcResponseSerializer。这是由于kryo对未注册的类序列化后的格式是
1
2 1x01 x00 <(string)className> <(byte)id> <(Object)objectFieldValue ordered by fieldName>
2
里面包含类的全类名,导致序列化后的字节序列很长,故应该实现一个自定义的
Serializer用于已知类型的序列化和反序列化缩短序列化后的字节序列。
(6).
RpcRequestSerializer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36 1package com.maigo.rpc.serializer;
2
3import com.esotericsoftware.kryo.Kryo;
4import com.esotericsoftware.kryo.Serializer;
5import com.esotericsoftware.kryo.io.Input;
6import com.esotericsoftware.kryo.io.Output;
7import com.maigo.rpc.context.RpcRequest;
8
9public class RpcRequestSerializer extends Serializer<RpcRequest>
10{
11 @Override
12 public void write(Kryo kryo, Output output, RpcRequest object)
13 {
14 output.writeInt(object.getId());
15 output.writeByte(object.getMethodName().length());
16 output.write(object.getMethodName().getBytes());
17 kryo.writeClassAndObject(output, object.getArgs());
18 }
19
20 @Override
21 public RpcRequest read(Kryo kryo, Input input, Class<RpcRequest> type)
22 {
23 RpcRequest rpcRequest = null;
24 int id = input.readInt();
25 byte methodLength = input.readByte();
26 byte[] methodBytes = input.readBytes(methodLength);
27 String methodName = new String(methodBytes);
28 Object[] args = (Object[])kryo.readClassAndObject(input);
29
30 rpcRequest = new RpcRequest(id, methodName, args);
31
32 return rpcRequest;
33 }
34}
35
36
write()中按顺序往output中写入id,调用方法名的长度和调用方法名的字节数组,最后是调用方法的参数列表,由于不知道参数的确切类型,此处调用传进的kryo对象的writeClassAndObject()方法对参数进行序列化。
read()中按照相同的顺序读出值并根据这些值构建出一个RpcRequest对象并返回。
(7).
RpcResponseSerializer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37 1package com.maigo.rpc.serializer;
2
3import com.esotericsoftware.kryo.Kryo;
4import com.esotericsoftware.kryo.Serializer;
5import com.esotericsoftware.kryo.io.Input;
6import com.esotericsoftware.kryo.io.Output;
7import com.maigo.rpc.context.RpcResponse;
8
9public class RpcResponseSerializer extends Serializer<RpcResponse>
10{
11 @Override
12 public void write(Kryo kryo, Output output, RpcResponse object)
13 {
14 output.writeInt(object.getId());
15 output.writeBoolean(object.isInvokeSuccess());
16 if(object.isInvokeSuccess())
17 kryo.writeClassAndObject(output, object.getResult());
18 else
19 kryo.writeClassAndObject(output, object.getThrowable());
20 }
21
22 @Override
23 public RpcResponse read(Kryo kryo, Input input, Class<RpcResponse> type)
24 {
25 RpcResponse rpcResponse = new RpcResponse();
26 rpcResponse.setId(input.readInt());
27 rpcResponse.setInvokeSuccess(input.readBoolean());
28 if(rpcResponse.isInvokeSuccess())
29 rpcResponse.setResult(kryo.readClassAndObject(input));
30 else
31 rpcResponse.setThrowable((Throwable)kryo.readClassAndObject(input));
32
33 return rpcResponse;
34 }
35}
36
37
类似RpcRequestSerializer,不再赘述 ****
4.测试
测试内容与下一节的Netty网络传输一同测试