Kafka消息序列化和反序列化(上)

释放双眼,带上耳机,听听看~!

Kafka Producer在发送消息时必须配置的参数为:bootstrap.servers、key.serializer、value.serializer。序列化操作是在拦截器(Interceptor)执行之后并且在分配分区(partitions)之前执行的。

首先我们通过一段示例代码来看下普通情况下Kafka Producer如何编写:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
1public class ProducerJavaDemo {
2    public static final String brokerList = "192.168.0.2:9092,192.168.0.3:9092,192.168.0.4:9092";
3    public static final String topic = "hidden-topic";
4
5    public static void main(String[] args) {
6        Properties properties = new Properties();
7        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
8        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
9        properties.put("client.id", "hidden-producer-client-id-1");
10        properties.put("bootstrap.servers", brokerList);
11
12        Producer<String,String> producer = new KafkaProducer<String,String>(properties);
13
14        while (true) {
15            String message = "kafka_message-" + new Date().getTime() + "-edited by hidden.zhu";
16            ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic,message);
17            try {
18                Future<RecordMetadata> future =  producer.send(producerRecord, new Callback() {
19                    public void onCompletion(RecordMetadata metadata, Exception exception) {
20                        System.out.print(metadata.offset()+"    ");
21                        System.out.print(metadata.topic()+"    ");
22                        System.out.println(metadata.partition());
23                    }
24                });
25            } catch (Exception e) {
26                e.printStackTrace();
27            }
28            try {
29                TimeUnit.MILLISECONDS.sleep(10);
30            } catch (InterruptedException e) {
31                e.printStackTrace();
32            }
33        }
34    }
35}
36

这里采用的客户端不是0.8.x.x时代的Scala版本,而是Java编写的新Kafka Producer, 相应的Maven依赖如下:


1
2
3
4
5
6
1<dependency>
2    <groupId>org.apache.kafka</groupId>
3    <artifactId>kafka-clients</artifactId>
4    <version>1.0.0</version>
5</dependency>
6

上面的程序中使用的是Kafka客户端自带的org.apache.kafka.common.serialization.StringSerializer,除了用于String类型的序列化器之外还有:ByteArray、ByteBuffer、Bytes、Double、Integer、Long这几种类型,它们都实现了org.apache.kafka.common.serialization.Serializer接口,此接口有三种方法:

  1. public void configure(Map<String, ?> configs, boolean isKey):用来配置当前类。
  2. public byte[] serialize(String topic, T data):用来执行序列化。
  3. public void close():用来关闭当前序列化器。一般情况下这个方法都是个空方法,如果实现了此方法,必须确保此方法的幂等性,因为这个方法很可能会被KafkaProducer调用多次。

下面我们来看看Kafka中org.apache.kafka.common.serialization.StringSerializer的具体实现,源码如下:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
1public class StringSerializer implements Serializer&lt;String&gt; {
2    private String encoding = &quot;UTF8&quot;;
3
4    @Override
5    public void configure(Map&lt;String, ?&gt; configs, boolean isKey) {
6        String propertyName = isKey ? &quot;key.serializer.encoding&quot; : &quot;value.serializer.encoding&quot;;
7        Object encodingValue = configs.get(propertyName);
8        if (encodingValue == null)
9            encodingValue = configs.get(&quot;serializer.encoding&quot;);
10        if (encodingValue != null &amp;&amp; encodingValue instanceof String)
11            encoding = (String) encodingValue;
12    }
13
14    @Override
15    public byte[] serialize(String topic, String data) {
16        try {
17            if (data == null)
18                return null;
19            else
20                return data.getBytes(encoding);
21        } catch (UnsupportedEncodingException e) {
22            throw new SerializationException(&quot;Error when serializing string to byte[] due to unsupported encoding &quot; + encoding);
23        }
24    }
25
26    @Override
27    public void close() {
28        // nothing to do
29    }
30}
31

首先看下StringSerializer中的configure(Map


1
2
3
4
5
6
1public class Company {
2    private String name;
3    private String address;
4    //省略Getter, Setter, Constructor &amp; toString方法
5}
6

接下去我们来实现Company类型的Serializer,即下面代码示例中的DemoSerializer。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
1package com.hidden.client;
2public class DemoSerializer implements Serializer&lt;Company&gt; {
3    public void configure(Map&lt;String, ?&gt; configs, boolean isKey) {}
4    public byte[] serialize(String topic, Company data) {
5        if (data == null) {
6            return null;
7        }
8        byte[] name, address;
9        try {
10            if (data.getName() != null) {
11                name = data.getName().getBytes(&quot;UTF-8&quot;);
12            } else {
13                name = new byte[0];
14            }
15            if (data.getAddress() != null) {
16                address = data.getAddress().getBytes(&quot;UTF-8&quot;);
17            } else {
18                address = new byte[0];
19            }
20            ByteBuffer buffer = ByteBuffer.allocate(4+4+name.length + address.length);
21            buffer.putInt(name.length);
22            buffer.put(name);
23            buffer.putInt(address.length);
24            buffer.put(address);
25            return buffer.array();
26        } catch (UnsupportedEncodingException e) {
27            e.printStackTrace();
28        }
29        return new byte[0];
30    }
31    public void close() {}
32}
33

使用时只需要在Kafka Producer的config中修改value.serializer属性即可,示例如下:


1
2
3
4
5
6
7
8
1properties.put(&quot;value.serializer&quot;, &quot;com.hidden.client.DemoSerializer&quot;);
2//记得也要将相应的String类型改为Company类型,如:
3//Producer&lt;String,Company&gt; producer = new KafkaProducer&lt;String,Company&gt;(properties);
4//Company company = new Company();
5//company.setName(&quot;hidden.cooperation-&quot; + new Date().getTime());
6//company.setAddress(&quot;Shanghai, China&quot;);
7//ProducerRecord&lt;String, Company&gt; producerRecord = new ProducerRecord&lt;String, Company&gt;(topic,company);
8

示例中只修改了value.serializer,而key.serializer和value.serializer没有什么区别,如果有真实需要,修改以下也未尝不可。

给TA打赏
共{{data.count}}人
人已打赏
安全网络

CDN安全市场到2022年价值76.3亿美元

2018-2-1 18:02:50

安全网络

12. Dubbo原理解析-注册中心之基于dubbo协议的简单注册中心实现

2021-8-18 16:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索