Kafka消息序列化和反序列化(下)

释放双眼,带上耳机,听听看~!

接上一篇:Kafka消息序列化和反序列化(上)。

有序列化就会有反序列化,反序列化的操作是在Kafka Consumer中完成的,使用起来只需要配置一下key.deserializer和value.deseriaizer。对应上面自定义的Company类型的Deserializer就需要实现org.apache.kafka.common.serialization.Deserializer接口,这个接口同样有三个方法:

  1. public void configure(Map<String, ?> configs, boolean isKey):用来配置当前类。
  2. public byte[] serialize(String topic, T data):用来执行反序列化。如果data为null建议处理的时候直接返回null而不是抛出一个异常。
  3. public void close():用来关闭当前序列化器。

下面就来看一下DemoSerializer对应的反序列化的DemoDeserializer,详细代码如下:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
1public class DemoDeserializer implements Deserializer&lt;Company&gt; {
2    public void configure(Map&lt;String, ?&gt; configs, boolean isKey) {}
3    public Company deserialize(String topic, byte[] data) {
4        if (data == null) {
5            return null;
6        }
7        if (data.length &lt; 8) {
8            throw new SerializationException(&quot;Size of data received by DemoDeserializer is shorter than expected!&quot;);
9        }
10        ByteBuffer buffer = ByteBuffer.wrap(data);
11        int nameLen, addressLen;
12        String name, address;
13        nameLen = buffer.getInt();
14        byte[] nameBytes = new byte[nameLen];
15        buffer.get(nameBytes);
16        addressLen = buffer.getInt();
17        byte[] addressBytes = new byte[addressLen];
18        buffer.get(addressLen);
19        try {
20            name = new String(nameBytes, &quot;UTF-8&quot;);
21            address = new String(addressBytes, &quot;UTF-8&quot;);
22        } catch (UnsupportedEncodingException e) {
23            throw new SerializationException(&quot;Error occur when deserializing!&quot;);
24        }
25        return new Company(name,address);
26    }
27    public void close() {}
28}
29

有些读者可能对新版的Consumer不是很熟悉,这里顺带着举一个完整的消费示例,并以DemoDeserializer作为消息Value的反序列化器。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
1Properties properties = new Properties();
2properties.put(&quot;bootstrap.servers&quot;, brokerList);
3properties.put(&quot;group.id&quot;, consumerGroup);
4properties.put(&quot;session.timeout.ms&quot;, 10000);
5properties.put(&quot;key.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
6properties.put(&quot;value.deserializer&quot;, &quot;com.hidden.client.DemoDeserializer&quot;);
7properties.put(&quot;client.id&quot;, &quot;hidden-consumer-client-id-zzh-2&quot;);
8KafkaConsumer&lt;String, Company&gt; consumer = new KafkaConsumer&lt;String, Company&gt;(properties);
9consumer.subscribe(Arrays.asList(topic));
10try {
11    while (true) {
12        ConsumerRecords&lt;String, Company&gt; records = consumer.poll(100);
13        for (ConsumerRecord&lt;String, Company&gt; record : records) {
14            String info = String.format(&quot;topic=%s, partition=%s, offset=%d, consumer=%s, country=%s&quot;,
15                    record.topic(), record.partition(), record.offset(), record.key(), record.value());
16            System.out.println(info);
17        }
18        consumer.commitAsync(new OffsetCommitCallback() {
19            public void onComplete(Map&lt;TopicPartition, OffsetAndMetadata&gt; offsets, Exception exception) {
20                if (exception != null) {
21                    String error = String.format(&quot;Commit failed for offsets {}&quot;, offsets, exception);
22                    System.out.println(error);
23                }
24            }
25        });
26    }
27} finally {
28    consumer.close();
29}
30

有些时候自定义的类型还可以和Avro、ProtoBuf等联合使用,而且这样更加的方便快捷,比如我们将前面Company的Serializer和Deserializer用Protostuff包装一下,由于篇幅限制,笔者这里只罗列出对应的serialize和deserialize方法,详细参考如下:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
1public byte[] serialize(String topic, Company data) {
2    if (data == null) {
3        return null;
4    }
5    Schema schema = (Schema) RuntimeSchema.getSchema(data.getClass());
6    LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
7    byte[] protostuff = null;
8    try {
9        protostuff = ProtostuffIOUtil.toByteArray(data, schema, buffer);
10    } catch (Exception e) {
11        throw new IllegalStateException(e.getMessage(), e);
12    } finally {
13        buffer.clear();
14    }
15    return protostuff;
16}
17
18public Company deserialize(String topic, byte[] data) {
19    if (data == null) {
20        return null;
21    }
22    Schema schema = RuntimeSchema.getSchema(Company.class);
23    Company ans = new Company();
24    ProtostuffIOUtil.mergeFrom(data, ans, schema);
25    return ans;
26}
27

如果Company的字段很多,我们使用Protostuff进一步封装一下的方式就显得简洁很多。不过这个不是最主要的,而最主要的是经过Protostuff包装之后,这个Serializer和Deserializer可以向前兼容(新加字段采用默认值)和向后兼容(忽略新加字段),这个特性Avro和Protobuf也都具备。

自定义的类型有一个不得不面对的问题就是Kafka Producer和Kafka Consumer之间的序列化和反序列化的兼容性,试想对于StringSerializer来说,Kafka Consumer可以顺其自然的采用StringDeserializer,不过对于Company这种专用类型,某个服务使用DemoSerializer进行了序列化之后,那么下游的消费者服务必须也要实现对应的DemoDeserializer。再者,如果上游的Company类型改变,下游也需要跟着重新实现一个新的DemoSerializer,这个后面所面临的难题可想而知。所以,如无特殊需要,笔者不建议使用自定义的序列化和反序列化器;如有业务需要,也要使用通用的Avro、Protobuf、Protostuff等序列化工具包装,尽可能的实现得更加通用且向前后兼容。

题外话,对于Kafka的“深耕者”Confluent来说,还有其自身的一套序列化和反序列化解决方案(io.confluent.kafka.serializer.KafkaAvroSerializer),GitHub上有相关资料,读者如有兴趣可以自行扩展学习。

给TA打赏
共{{data.count}}人
人已打赏
安全运维

jmeter连接Mysql数据库测试性能初探

2021-12-11 11:36:11

安全运维

Ubuntu上NFS的安装配置

2021-12-19 17:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索