Kafka 中使用 Avro 序列化框架(二):使用 Twitter 的 Bijection 类库实现 avro 的序列化与反序列化

释放双眼,带上耳机,听听看~!

使用传统的 avro API 自定义序列化类和反序列化类比较麻烦,需要根据 schema 生成实体类,需要调用 avro 的 API 实现 对象到 byte[] 和 byte[] 到对象的转化,而那些方法看上去比较繁琐,幸运的是,Twitter 开源的类库 Bijection 对传统的 Avro API 进行了封装了和优化,让我们可以方便的实现以上操作。

1. 添加 Bijection 类库的依赖,并新建一个 schema 文件

Bijection 类库的依赖如下:


1
2
3
4
5
6
7
1
2<dependency>
3    <groupId>com.twitter</groupId>
4    <artifactId>bijection-avro_2.11</artifactId>
5    <version>0.9.6</version>
6</dependency>
7

在 maven 工程的 resources 目录下新建一个 schema 文件,名称为"user.json",因为我们不用 avro 生成实体类的方式,所以定义一个普通的 json 文件来描述 schema 即可,另外,在 json 文件中,也不需要"namespace": "packageName"这个限定生成实体类的包名的参数,本文使用的 json 文件内容如下:


1
2
3
4
5
6
7
8
9
10
11
1
2{
3    "type": "record",
4    "name": "User",
5    "fields": [
6        {"name": "id", "type": "int"},
7        {"name": "name",  "type": "string"},
8        {"name": "age", "type": "int"}
9    ]
10}
11

2. KafkaProducer 使用 Bijection 类库发送序列化后的消息


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
1
2package com.bonc.rdpe.kafka110.producer;
3
4import java.io.BufferedReader;
5import java.io.File;
6import java.io.FileReader;
7import java.util.Properties;
8
9import org.apache.avro.Schema;
10import org.apache.avro.generic.GenericData;
11import org.apache.avro.generic.GenericRecord;
12import org.apache.kafka.clients.producer.KafkaProducer;
13import org.apache.kafka.clients.producer.Producer;
14import org.apache.kafka.clients.producer.ProducerRecord;
15
16import com.twitter.bijection.Injection;
17import com.twitter.bijection.avro.GenericAvroCodecs;
18
19/**
20 * @Title BijectionProducer.java
21 * @Description KafkaProducer 使用 Bijection 类库发送序列化后的消息
22 * @Author YangYunhe
23 * @Date 2018-06-22 10:42:06
24 */
25public class BijectionProducer {
26
27    public static void main(String[] args) throws Exception {
28        
29        String schemaFilePath = BijectionProducer.class.getClassLoader().getResource("user.json").getPath();
30        FileReader fr = new FileReader(new File(schemaFilePath));
31        BufferedReader br = new BufferedReader(fr);
32        StringBuilder sb = new StringBuilder();
33        String line;
34        while((line = br.readLine()) != null) {
35            sb.append(line).append("\n");
36        }
37        String schemaStr = sb.toString();
38        br.close();
39        fr.close();
40        
41        Properties props = new Properties();
42        props.put("bootstrap.servers", "192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094");
43        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
44        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
45
46        Schema.Parser parser = new Schema.Parser();
47        Schema schema = parser.parse(schemaStr);
48        Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);
49        
50        Producer<String, byte[]> producer = new KafkaProducer<>(props);
51        
52        for (int i = 0; i < 100; i++) {
53            GenericData.Record avroRecord = new GenericData.Record(schema);
54            avroRecord.put("id", i);
55            avroRecord.put("name", "name" + i);
56            avroRecord.put("age", 22);
57            byte[] avroRecordBytes = recordInjection.apply(avroRecord);
58            ProducerRecord<String, byte[]> record = new ProducerRecord<>("dev3-yangyunhe-topic001", avroRecordBytes);
59            producer.send(record);
60            Thread.sleep(1000);
61        }
62        producer.close();
63    }
64}
65

3. KafkaConsumer 使用 Bijection 类库来反序列化消息


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
1
2package com.bonc.rdpe.kafka110.consumer;
3
4import java.io.BufferedReader;
5import java.io.File;
6import java.io.FileReader;
7import java.util.Collections;
8import java.util.Properties;
9
10import org.apache.avro.Schema;
11import org.apache.avro.generic.GenericRecord;
12import org.apache.kafka.clients.consumer.ConsumerRecord;
13import org.apache.kafka.clients.consumer.ConsumerRecords;
14import org.apache.kafka.clients.consumer.KafkaConsumer;
15
16import com.bonc.rdpe.kafka110.producer.BijectionProducer;
17import com.twitter.bijection.Injection;
18import com.twitter.bijection.avro.GenericAvroCodecs;
19
20/**
21 * @Title BijectionConsumer.java
22 * @Description KafkaConsumer 使用 Bijection 类库来反序列化消息
23 * @Author YangYunhe
24 * @Date 2018-06-22 11:10:29
25 */
26public class BijectionConsumer {
27    
28    public static void main(String[] args) throws Exception {
29        
30        String schemaFilePath = BijectionProducer.class.getClassLoader().getResource("user.json").getPath();
31        FileReader fr = new FileReader(new File(schemaFilePath));
32        BufferedReader br = new BufferedReader(fr);
33        StringBuilder sb = new StringBuilder();
34        String line;
35        while((line = br.readLine()) != null) {
36            sb.append(line).append("\n");
37        }
38        String schemaStr = sb.toString();
39        br.close();
40        fr.close();
41        
42        Properties props = new Properties();
43        props.put("bootstrap.servers", "192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094");
44        props.put("group.id", "dev3-yangyunhe-group001");
45        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
46        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
47        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
48
49        consumer.subscribe(Collections.singletonList("dev3-yangyunhe-topic001"));
50        Schema.Parser parser = new Schema.Parser();
51        Schema schema = parser.parse(schemaStr);
52        Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);
53        
54        try {
55            while(true) {
56                ConsumerRecords<String, byte[]> records = consumer.poll(1000);
57                for (ConsumerRecord<String, byte[]> record : records) {
58                    GenericRecord genericRecord = recordInjection.invert(record.value()).get();
59                    System.out.println("value = [user.id = " + genericRecord.get("id") + ", " +
60                            "user.name = " + genericRecord.get("name") + ", " +
61                            "user.age = " + genericRecord.get("age") + "], " +
62                            "partition = " + record.partition() + ", " +
63                            "offset = " + record.offset());
64                }
65            }
66        } finally {
67            consumer.close();
68        }
69    }
70}
71

4. 测试结果

先运行 KafkaConsumer,没有输出 当运行 KakfaProducer 后,KakfaConsumer 控制台输出:


1
2
3
4
5
6
7
8
9
1
2value = [user.id = 0, user.name = name0, user.age = 22], partition = 2, offset = 662
3value = [user.id = 1, user.name = name1, user.age = 22], partition = 1, offset = 663
4value = [user.id = 2, user.name = name2, user.age = 22], partition = 0, offset = 663
5value = [user.id = 3, user.name = name3, user.age = 22], partition = 2, offset = 663
6value = [user.id = 4, user.name = name4, user.age = 22], partition = 1, offset = 664
7
8......
9

给TA打赏
共{{data.count}}人
人已打赏
安全网络

CDN安全市场到2022年价值76.3亿美元

2018-2-1 18:02:50

病毒疫情

福建省新型冠状病毒肺炎疫情情况

2020-4-10 8:23:00

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索