基于Netty的RPC简单框架实现(三):Kryo实现序列化

释放双眼,带上耳机,听听看~!

1.
序列

化和反序列化

网络中都是以字节序列的形式来传输数据的,因此在发送消息时需要先将对象序列化转换为字节序列,然后将获得的字节序列发送出去,消息接收方接收到字节序列后将之反序列化获得传输的对象,从收发双方来看就如同直接发送和接收了对象一样。


******2.**第三方依赖

本例使用目前最新版的kryo-serializers 0.36用于序列化


1
2
3
4
5
6
1<dependency>
2    <groupId>de.javakaffee</groupId>
3    <artifactId>kryo-serializers</artifactId>
4    <version>0.36</version>
5</dependency>
6

1
2
1使用maven直接在pom.xml中添加上面的依赖即可  
2

3.序列化和反序列化的实现

序列化和反序列化的对象在本例中只有两种:1.客户端向服务端发出的调用请求RpcRequest 2.服务端向客户端返回的调用结果RpcResponse

(1).RpcResponse

从RPC服务端传回给客户端的某次调用请求的结果


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
1package com.maigo.rpc.context;
2
3public class RpcResponse
4{ 
5   private int id;
6   private Object result;
7   private Throwable throwable;
8   private boolean isInvokeSuccess;
9  
10  public int getId()
11  {
12      return id;
13  }
14 
15  public void setId(int id)
16  {
17      this.id = id;
18  }
19 
20  public Object getResult()
21  {
22      return result;
23  }
24 
25  public void setResult(Object result)
26  {
27      this.result = result;
28  }
29 
30  public Throwable getThrowable()
31  {
32      return throwable;
33  }
34 
35  public void setThrowable(Throwable throwable)
36  {
37      this.throwable = throwable;
38  }
39 
40  public boolean isInvokeSuccess()
41  {
42      return isInvokeSuccess;
43  }
44 
45  public void setInvokeSuccess(boolean isInvokeSuccess)
46  {
47      this.isInvokeSuccess = isInvokeSuccess;
48  }
49
50}
51
52

1
2
1RpcResponse中的id对应着该次请求的RpcRequest中的id,isInvokeSuccess表示调用中是否有异常抛出,result和throwable分别表示调用结果和调用过程抛出的异常。
2

(2).RpcRequest

已在上一节中给出

(3).KryoSerializer

实际负责序列化和反序列化


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
1package com.maigo.rpc.serializer;
2
3import java.io.IOException;
4
5import com.esotericsoftware.kryo.Kryo;
6import com.esotericsoftware.kryo.io.Input;
7import com.esotericsoftware.kryo.io.Output;
8
9import io.netty.buffer.ByteBuf;
10import io.netty.buffer.ByteBufInputStream;
11import io.netty.buffer.ByteBufOutputStream;
12
13public class KryoSerializer
14{
15  private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
16 
17  public static void serialize(Object object, ByteBuf byteBuf)
18  {
19      Kryo kryo = KryoHolder.get();
20      int startIdx = byteBuf.writerIndex();
21        ByteBufOutputStream byteOutputStream = new ByteBufOutputStream(byteBuf);
22        try
23        {
24          byteOutputStream.write(LENGTH_PLACEHOLDER);
25          Output output = new Output(1024*4, -1);
26          output.setOutputStream(byteOutputStream);
27          kryo.writeClassAndObject(output, object);
28         
29          output.flush();
30          output.close();
31         
32          int endIdx = byteBuf.writerIndex();
33
34          byteBuf.setInt(startIdx, endIdx - startIdx - 4);
35      }
36        catch (IOException e)
37        {
38          e.printStackTrace();
39      }
40  }
41
42  public static Object deserialize(ByteBuf byteBuf)
43  {
44      if(byteBuf == null)
45            return null;
46     
47        Input input = new Input(new ByteBufInputStream(byteBuf));
48        Kryo kryo = KryoHolder.get();
49        return kryo.readClassAndObject(input);
50  }  
51}
52

1
2
1serialize()将一个对象通过kryo序列化并写入ByteBuf中,注意到在头部预留了4个字节用于写入长度信息。deserialize()将ByteBuf中的内容反序列化还原出传输的对象。其中序列化和反序列化均用到了kryo对象,该对象是从KryoHolder中通过get()拿到的。
2

(4).
KryoHolder

由于kryo对象是线程不安全的,当有多个netty的channel同时连接时,各channel是可能工作在不同的线程上的(netty中一个IO线程可以对应多个channel,而一个channel只能对应一个线程,详细可以参考netty线程模型),若共用同一个kryo对象会出现并发问题,因此用ThreadLocal在每个线程保留一个各自的kryo对象,保证不会大量创建kryo对象的同时避免了并发问题


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
1package com.maigo.rpc.serializer;
2
3import com.esotericsoftware.kryo.Kryo;
4
5public class KryoHolder
6{
7   private static ThreadLocal<Kryo> threadLocalKryo = new ThreadLocal<Kryo>()
8   {
9       protected Kryo initialValue()
10      {
11          Kryo kryo = new KryoReflectionFactory();
12                     
13          return kryo;
14      };
15  };
16 
17  public static Kryo get()
18  {
19      return threadLocalKryo.get();
20  }
21}
22
23

1
2
1可见,最终用于序列化和反序列化的kryo对象是通过new KryoReflectionFactory()创建的。
2

(5).
KryoReflectionFactory


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
1package com.maigo.rpc.serializer;
2
3import java.lang.reflect.InvocationHandler;
4import java.net.URI;
5import java.util.Arrays;
6import java.util.BitSet;
7import java.util.Collection;
8import java.util.Collections;
9import java.util.Date;
10import java.util.EnumMap;
11import java.util.EnumSet;
12import java.util.GregorianCalendar;
13import java.util.Map;
14import java.util.UUID;
15import java.util.regex.Pattern;
16
17import com.esotericsoftware.kryo.Serializer;
18import com.maigo.rpc.context.RpcRequest;
19import com.maigo.rpc.context.RpcResponse;
20
21import de.javakaffee.kryoserializers.ArraysAsListSerializer;
22import de.javakaffee.kryoserializers.BitSetSerializer;
23import de.javakaffee.kryoserializers.CollectionsEmptyListSerializer;
24import de.javakaffee.kryoserializers.CollectionsEmptyMapSerializer;
25import de.javakaffee.kryoserializers.CollectionsEmptySetSerializer;
26import de.javakaffee.kryoserializers.CollectionsSingletonListSerializer;
27import de.javakaffee.kryoserializers.CollectionsSingletonMapSerializer;
28import de.javakaffee.kryoserializers.CollectionsSingletonSetSerializer;
29import de.javakaffee.kryoserializers.CopyForIterateCollectionSerializer;
30import de.javakaffee.kryoserializers.CopyForIterateMapSerializer;
31import de.javakaffee.kryoserializers.DateSerializer;
32import de.javakaffee.kryoserializers.EnumMapSerializer;
33import de.javakaffee.kryoserializers.EnumSetSerializer;
34import de.javakaffee.kryoserializers.GregorianCalendarSerializer;
35import de.javakaffee.kryoserializers.JdkProxySerializer;
36import de.javakaffee.kryoserializers.KryoReflectionFactorySupport;
37import de.javakaffee.kryoserializers.RegexSerializer;
38import de.javakaffee.kryoserializers.SubListSerializers;
39import de.javakaffee.kryoserializers.SynchronizedCollectionsSerializer;
40import de.javakaffee.kryoserializers.URISerializer;
41import de.javakaffee.kryoserializers.UUIDSerializer;
42import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer;
43
44public class KryoReflectionFactory extends KryoReflectionFactorySupport
45{
46  public KryoReflectionFactory()
47  {
48      setRegistrationRequired(false);
49      setReferences(true);
50      register(RpcRequest.class, new RpcRequestSerializer());
51      register(RpcResponse.class, new RpcResponseSerializer());
52        register(Arrays.asList("").getClass(), new ArraysAsListSerializer());
53        register(Collections.EMPTY_LIST.getClass(), new CollectionsEmptyListSerializer());
54        register(Collections.EMPTY_MAP.getClass(), new CollectionsEmptyMapSerializer());
55        register(Collections.EMPTY_SET.getClass(), new CollectionsEmptySetSerializer());
56        register(Collections.singletonList("").getClass(), new CollectionsSingletonListSerializer());
57        register(Collections.singleton("").getClass(), new CollectionsSingletonSetSerializer());
58        register(Collections.singletonMap("", "").getClass(), new CollectionsSingletonMapSerializer());
59        register(Pattern.class, new RegexSerializer());
60        register(BitSet.class, new BitSetSerializer());
61        register(URI.class, new URISerializer());
62        register(UUID.class, new UUIDSerializer());
63        register(GregorianCalendar.class, new GregorianCalendarSerializer());
64        register(InvocationHandler.class, new JdkProxySerializer());
65        UnmodifiableCollectionsSerializer.registerSerializers(this);
66        SynchronizedCollectionsSerializer.registerSerializers(this);
67  }
68 
69  @Override
70  @SuppressWarnings({"rawtypes", "unchecked"})
71  public Serializer<?> getDefaultSerializer(Class clazz)
72  {      
73      if(EnumSet.class.isAssignableFrom(clazz))
74          return new EnumSetSerializer();
75        
76        if(EnumMap.class.isAssignableFrom(clazz))
77            return new EnumMapSerializer();
78        
79        if(Collection.class.isAssignableFrom(clazz))
80            return new CopyForIterateCollectionSerializer();
81        
82        if(Map.class.isAssignableFrom(clazz))
83            return new CopyForIterateMapSerializer();
84        
85        if(Date.class.isAssignableFrom(clazz))
86            return new DateSerializer( clazz );
87        
88        if (SubListSerializers.ArrayListSubListSerializer.canSerialize(clazz)
89              || SubListSerializers.JavaUtilSubListSerializer.canSerialize(clazz))
90          return SubListSerializers.createFor(clazz);    
91        
92        return super.getDefaultSerializer(clazz);
93  }  
94}
95
96

导入的包非常多,主要完成的功能是给大量类类型注册其对应的Serializer。setRegistrationRequired()设置是否只能序列化已注册的类,此处必须设置为false,因为RPC请求和回应中都可能包含用户自定义的类,这些类显然是不可能在kryo中注册过的。setReferences()若设置成false在序列化Exception时似乎有问题,此处维持打开(默认也是打开)。注意到给RpcRequest.class和RpcResponse.class分别注册了对应的
Serializer为RpcRequestSerializer和RpcResponseSerializer。这是由于kryo对未注册的类序列化后的格式是


1
2
1x01 x00 <(string)className> <(byte)id> <(Object)objectFieldValue ordered by fieldName>
2

里面包含类的全类名,导致序列化后的字节序列很长,故应该实现一个自定义的
Serializer用于已知类型的序列化和反序列化缩短序列化后的字节序列。

(6).
RpcRequestSerializer


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
1package com.maigo.rpc.serializer;
2
3import com.esotericsoftware.kryo.Kryo;
4import com.esotericsoftware.kryo.Serializer;
5import com.esotericsoftware.kryo.io.Input;
6import com.esotericsoftware.kryo.io.Output;
7import com.maigo.rpc.context.RpcRequest;
8
9public class RpcRequestSerializer extends Serializer<RpcRequest>
10{
11  @Override
12  public void write(Kryo kryo, Output output, RpcRequest object)
13  {
14      output.writeInt(object.getId());
15      output.writeByte(object.getMethodName().length());
16      output.write(object.getMethodName().getBytes());
17      kryo.writeClassAndObject(output, object.getArgs());
18  }
19
20  @Override
21  public RpcRequest read(Kryo kryo, Input input, Class<RpcRequest> type)
22  {
23      RpcRequest rpcRequest = null;
24      int id = input.readInt();
25      byte methodLength = input.readByte();
26      byte[] methodBytes = input.readBytes(methodLength);
27      String methodName = new String(methodBytes);
28      Object[] args = (Object[])kryo.readClassAndObject(input);
29     
30      rpcRequest = new RpcRequest(id, methodName, args);
31     
32      return rpcRequest;
33  }
34}
35
36

write()中按顺序往output中写入id,调用方法名的长度和调用方法名的字节数组,最后是调用方法的参数列表,由于不知道参数的确切类型,此处调用传进的kryo对象的writeClassAndObject()方法对参数进行序列化。

read()中按照相同的顺序读出值并根据这些值构建出一个RpcRequest对象并返回。

(7).
RpcResponseSerializer


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
1package com.maigo.rpc.serializer;
2
3import com.esotericsoftware.kryo.Kryo;
4import com.esotericsoftware.kryo.Serializer;
5import com.esotericsoftware.kryo.io.Input;
6import com.esotericsoftware.kryo.io.Output;
7import com.maigo.rpc.context.RpcResponse;
8
9public class RpcResponseSerializer extends Serializer<RpcResponse>
10{
11  @Override
12  public void write(Kryo kryo, Output output, RpcResponse object)
13  {
14      output.writeInt(object.getId());
15      output.writeBoolean(object.isInvokeSuccess());
16      if(object.isInvokeSuccess())
17          kryo.writeClassAndObject(output, object.getResult());
18      else
19          kryo.writeClassAndObject(output, object.getThrowable());
20  }
21
22  @Override
23  public RpcResponse read(Kryo kryo, Input input, Class<RpcResponse> type)
24  {
25      RpcResponse rpcResponse = new RpcResponse();
26      rpcResponse.setId(input.readInt());
27      rpcResponse.setInvokeSuccess(input.readBoolean());
28      if(rpcResponse.isInvokeSuccess())
29          rpcResponse.setResult(kryo.readClassAndObject(input));
30      else
31          rpcResponse.setThrowable((Throwable)kryo.readClassAndObject(input));
32     
33      return rpcResponse;
34  }
35}
36
37

类似RpcRequestSerializer,不再赘述 ****
4.测试

测试内容与下一节的Netty网络传输一同测试

给TA打赏
共{{data.count}}人
人已打赏
安全经验

英文站如何做Google Adsense

2021-10-11 16:36:11

安全经验

安全咨询服务

2022-1-12 14:11:49

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索