The parameters that must be configured before a Kafka Producer can send messages are: bootstrap.servers, key.serializer, and value.serializer. Serialization takes place after the interceptors have run and before the partition is assigned.
First, let's look at a sample program showing how an ordinary Kafka Producer is written:
```java
import java.util.Date;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerJavaDemo {
    public static final String brokerList = "192.168.0.2:9092,192.168.0.3:9092,192.168.0.4:9092";
    public static final String topic = "hidden-topic";

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("client.id", "hidden-producer-client-id-1");
        properties.put("bootstrap.servers", brokerList);

        Producer<String, String> producer = new KafkaProducer<String, String>(properties);

        while (true) {
            String message = "kafka_message-" + new Date().getTime() + "-edited by hidden.zhu";
            ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, message);
            try {
                Future<RecordMetadata> future = producer.send(producerRecord, new Callback() {
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        System.out.print(metadata.offset() + " ");
                        System.out.print(metadata.topic() + " ");
                        System.out.println(metadata.partition());
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
            try {
                TimeUnit.MILLISECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
```
The client used here is not the Scala client of the 0.8.x.x era, but the new Kafka Producer written in Java. The corresponding Maven dependency is:
```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.0</version>
</dependency>
```
The program above uses the client's built-in org.apache.kafka.common.serialization.StringSerializer. Besides this serializer for the String type, the client also ships serializers for ByteArray, ByteBuffer, Bytes, Double, Integer, and Long. They all implement the org.apache.kafka.common.serialization.Serializer interface, which defines three methods:
- public void configure(Map<String, ?> configs, boolean isKey): configures the serializer.
- public byte[] serialize(String topic, T data): performs the serialization.
- public void close(): closes the serializer. This is usually an empty method; if you do implement it, you must make sure it is idempotent, because KafkaProducer may call it more than once.
Now let's look at the concrete implementation of org.apache.kafka.common.serialization.StringSerializer in Kafka. The source is as follows:
```java
public class StringSerializer implements Serializer<String> {
    private String encoding = "UTF8";

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
        Object encodingValue = configs.get(propertyName);
        if (encodingValue == null)
            encodingValue = configs.get("serializer.encoding");
        if (encodingValue != null && encodingValue instanceof String)
            encoding = (String) encodingValue;
    }

    @Override
    public byte[] serialize(String topic, String data) {
        try {
            if (data == null)
                return null;
            else
                return data.getBytes(encoding);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
        }
    }

    @Override
    public void close() {
        // nothing to do
    }
}
```
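The encoding lookup that configure performs above can be exercised in isolation. The following is a stand-alone sketch (no Kafka classes involved; the class and method names are hypothetical) that replicates the same lookup order — the key- or value-specific property first, then the shared serializer.encoding, then the UTF-8 default:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone replica of the encoding lookup in
// StringSerializer.configure, for illustration only.
public class EncodingLookupDemo {
    static String resolveEncoding(Map<String, ?> configs, boolean isKey) {
        // 1. key.serializer.encoding / value.serializer.encoding takes priority
        String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
        Object encodingValue = configs.get(propertyName);
        // 2. fall back to the shared serializer.encoding
        if (encodingValue == null)
            encodingValue = configs.get("serializer.encoding");
        // 3. otherwise keep the default
        return (encodingValue instanceof String) ? (String) encodingValue : "UTF8";
    }

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("serializer.encoding", "UTF-16");
        configs.put("value.serializer.encoding", "GBK");
        System.out.println(resolveEncoding(configs, true));   // key side: falls back to serializer.encoding
        System.out.println(resolveEncoding(configs, false));  // value side: specific property wins
    }
}
```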
First look at configure(Map<String, ?> configs, boolean isKey) in StringSerializer: it determines the character encoding used for serialization, trying key.serializer.encoding or value.serializer.encoding first, then falling back to serializer.encoding, and finally to the default UTF-8. The serialize method itself is straightforward: it converts the String into a byte[] using that encoding.

If the serializers that ship with the client cannot meet your needs, you can implement your own for a custom type. Suppose we want to send messages of the following Company type:
```java
public class Company {
    private String name;
    private String address;
    // Getters, setters, constructors & toString omitted
}
```
Next we implement a Serializer for the Company type, namely the DemoSerializer in the code below.
```java
package com.hidden.client;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Map;

import org.apache.kafka.common.serialization.Serializer;

public class DemoSerializer implements Serializer<Company> {
    public void configure(Map<String, ?> configs, boolean isKey) {}

    public byte[] serialize(String topic, Company data) {
        if (data == null) {
            return null;
        }
        byte[] name, address;
        try {
            if (data.getName() != null) {
                name = data.getName().getBytes("UTF-8");
            } else {
                name = new byte[0];
            }
            if (data.getAddress() != null) {
                address = data.getAddress().getBytes("UTF-8");
            } else {
                address = new byte[0];
            }
            // length-prefixed layout: [name length][name bytes][address length][address bytes]
            ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + name.length + address.length);
            buffer.putInt(name.length);
            buffer.put(name);
            buffer.putInt(address.length);
            buffer.put(address);
            return buffer.array();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return new byte[0];
    }

    public void close() {}
}
```
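The length-prefixed layout that DemoSerializer writes can be verified without any Kafka classes at all. Here is a stand-alone sketch (class and method names are hypothetical) of the encoding together with the symmetric decoding that a consumer-side deserializer would perform:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-alone demo of the length-prefixed layout used by
// DemoSerializer: [name length][name bytes][address length][address bytes].
public class CompanyCodecDemo {
    static byte[] encode(String name, String address) {
        byte[] n = name == null ? new byte[0] : name.getBytes(StandardCharsets.UTF_8);
        byte[] a = address == null ? new byte[0] : address.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + n.length + a.length);
        buffer.putInt(n.length).put(n).putInt(a.length).put(a);
        return buffer.array();
    }

    // The mirror image: read each length prefix, then that many bytes.
    static String[] decode(byte[] data) {
        ByteBuffer buffer = ByteBuffer.wrap(data);
        byte[] n = new byte[buffer.getInt()];
        buffer.get(n);
        byte[] a = new byte[buffer.getInt()];
        buffer.get(a);
        return new String[]{ new String(n, StandardCharsets.UTF_8),
                             new String(a, StandardCharsets.UTF_8) };
    }

    public static void main(String[] args) {
        byte[] bytes = encode("hidden.cooperation", "Shanghai, China");
        String[] fields = decode(bytes);
        System.out.println(fields[0] + " / " + fields[1]);
    }
}
```

A matching Deserializer<Company> on the consumer side would apply exactly this decode logic in its deserialize method.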
To use it, simply change the value.serializer property in the Kafka Producer config, like so:
```java
properties.put("value.serializer", "com.hidden.client.DemoSerializer");
// Remember to change the corresponding String types to Company as well, e.g.:
// Producer<String, Company> producer = new KafkaProducer<String, Company>(properties);
// Company company = new Company();
// company.setName("hidden.cooperation-" + new Date().getTime());
// company.setAddress("Shanghai, China");
// ProducerRecord<String, Company> producerRecord = new ProducerRecord<String, Company>(topic, company);
```
The example only changes value.serializer, but key.serializer works in exactly the same way, so if you genuinely need to, there is no reason not to customize it as well.