使用传统的 avro API 自定义序列化类和反序列化类比较麻烦,需要根据 schema 生成实体类,需要调用 avro 的 API 实现 对象到 byte[] 和 byte[] 到对象的转化,而那些方法看上去比较繁琐,幸运的是,Twitter 开源的类库 Bijection 对传统的 Avro API 进行了封装和优化,让我们可以方便地实现以上操作。
1. 添加 Bijection 类库的依赖,并新建一个 schema 文件
Bijection 类库的依赖如下:
<dependency>
    <groupId>com.twitter</groupId>
    <artifactId>bijection-avro_2.11</artifactId>
    <version>0.9.6</version>
</dependency>
在 maven 工程的 resources 目录下新建一个 schema 文件,名称为"user.json",因为我们不用 avro 生成实体类的方式,所以定义一个普通的 json 文件来描述 schema 即可,另外,在 json 文件中,也不需要"namespace": "packageName"这个限定生成实体类的包名的参数,本文使用的 json 文件内容如下:
{
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "name", "type": "string"},
        {"name": "age", "type": "int"}
    ]
}
2. KafkaProducer 使用 Bijection 类库发送序列化后的消息
package com.bonc.rdpe.kafka110.producer;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;

/**
 * @Title BijectionProducer.java
 * @Description KafkaProducer sends Avro-serialized messages using the Bijection library
 * @Author YangYunhe
 * @Date 2018-06-22 10:42:06
 */
public class BijectionProducer {

    public static void main(String[] args) throws Exception {

        // Load the Avro schema text from the classpath resource "user.json".
        String schemaStr = readSchema("user.json");

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Values are raw Avro-encoded bytes, so the byte-array serializer is used.
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(schemaStr);
        // Bijection codec: converts GenericRecord <-> byte[] for this schema.
        Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

        Producer<String, byte[]> producer = new KafkaProducer<>(props);
        try {
            for (int i = 0; i < 100; i++) {
                GenericData.Record avroRecord = new GenericData.Record(schema);
                avroRecord.put("id", i);
                avroRecord.put("name", "name" + i);
                avroRecord.put("age", 22);
                byte[] avroRecordBytes = recordInjection.apply(avroRecord);
                ProducerRecord<String, byte[]> record =
                        new ProducerRecord<>("dev3-yangyunhe-topic001", avroRecordBytes);
                producer.send(record);
                Thread.sleep(1000);
            }
        } finally {
            // Close in finally so buffered records are flushed even if
            // send() or sleep() throws (the original skipped close on error).
            producer.close();
        }
    }

    /**
     * Reads a classpath resource into a String, one line at a time,
     * re-appending '\n' after each line.
     *
     * NOTE(review): FileReader uses the platform default charset; the schema
     * file is assumed to be ASCII/UTF-8-compatible — confirm if non-ASCII
     * schema files are possible.
     *
     * @param resourceName resource file name on the classpath
     * @return the file content
     * @throws Exception if the resource cannot be read
     */
    private static String readSchema(String resourceName) throws Exception {
        String schemaFilePath =
                BijectionProducer.class.getClassLoader().getResource(resourceName).getPath();
        StringBuilder sb = new StringBuilder();
        // try-with-resources guarantees the readers are closed even when
        // readLine() throws (the original leaked them on exception).
        try (BufferedReader br = new BufferedReader(new FileReader(new File(schemaFilePath)))) {
            String line;
            while ((line = br.readLine()) != null) {
                sb.append(line).append("\n");
            }
        }
        return sb.toString();
    }
}
3. KafkaConsumer 使用 Bijection 类库来反序列化消息
package com.bonc.rdpe.kafka110.consumer;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.Collections;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;

/**
 * @Title BijectionConsumer.java
 * @Description KafkaConsumer deserializes messages using the Bijection library
 * @Author YangYunhe
 * @Date 2018-06-22 11:10:29
 */
public class BijectionConsumer {

    public static void main(String[] args) throws Exception {

        // Load the same Avro schema the producer used; both sides must agree
        // on the schema for binary decoding to succeed.
        String schemaStr = readSchema("user.json");

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094");
        props.put("group.id", "dev3-yangyunhe-group001");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Values arrive as raw Avro bytes; Bijection does the decoding below.
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);

        consumer.subscribe(Collections.singletonList("dev3-yangyunhe-topic001"));
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(schemaStr);
        // Bijection codec: converts GenericRecord <-> byte[] for this schema.
        Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

        try {
            while (true) {
                ConsumerRecords<String, byte[]> records = consumer.poll(1000);
                for (ConsumerRecord<String, byte[]> record : records) {
                    // invert() decodes the Avro bytes back into a GenericRecord.
                    GenericRecord genericRecord = recordInjection.invert(record.value()).get();
                    System.out.println("value = [user.id = " + genericRecord.get("id") + ", " +
                            "user.name = " + genericRecord.get("name") + ", " +
                            "user.age = " + genericRecord.get("age") + "], " +
                            "partition = " + record.partition() + ", " +
                            "offset = " + record.offset());
                }
            }
        } finally {
            consumer.close();
        }
    }

    /**
     * Reads a classpath resource into a String, one line at a time,
     * re-appending '\n' after each line.
     *
     * Uses this class's own classloader instead of importing
     * BijectionProducer just to locate the resource (the original created an
     * unnecessary cross-package dependency for that).
     *
     * NOTE(review): FileReader uses the platform default charset; the schema
     * file is assumed to be ASCII/UTF-8-compatible — confirm if non-ASCII
     * schema files are possible.
     *
     * @param resourceName resource file name on the classpath
     * @return the file content
     * @throws Exception if the resource cannot be read
     */
    private static String readSchema(String resourceName) throws Exception {
        String schemaFilePath =
                BijectionConsumer.class.getClassLoader().getResource(resourceName).getPath();
        StringBuilder sb = new StringBuilder();
        // try-with-resources guarantees the readers are closed even when
        // readLine() throws (the original leaked them on exception).
        try (BufferedReader br = new BufferedReader(new FileReader(new File(schemaFilePath)))) {
            String line;
            while ((line = br.readLine()) != null) {
                sb.append(line).append("\n");
            }
        }
        return sb.toString();
    }
}
4. 测试结果
先运行 KafkaConsumer,没有输出。当运行 KafkaProducer 后,KafkaConsumer 控制台输出:
value = [user.id = 0, user.name = name0, user.age = 22], partition = 2, offset = 662
value = [user.id = 1, user.name = name1, user.age = 22], partition = 1, offset = 663
value = [user.id = 2, user.name = name2, user.age = 22], partition = 0, offset = 663
value = [user.id = 3, user.name = name3, user.age = 22], partition = 2, offset = 663
value = [user.id = 4, user.name = name4, user.age = 22], partition = 1, offset = 664

......