Kafka 中使用 Avro 序列化组件(三):Confluent Schema Registry

释放双眼,带上耳机,听听看~!

1. schema 注册表

无论是使用传统的Avro API自定义序列化类和反序列化类还是使用Twitter的Bijection类库实现Avro的序列化与反序列化,这两种方法都有一个缺点:在每条Kafka记录里都嵌入了schema,这会让记录的大小成倍地增加。但是不管怎样,在读取记录时仍然需要用到整个 schema,所以要先找到 schema。有没有什么方法可以让数据共用一个schema?

我们遵循通用的结构模式并使用"schema注册表"来达到目的。"schema注册表"的原理如下:

把所有写入数据需要用到的 schema 保存在注册表里,然后在记录里引用 schema 的 ID。负责读取数据的应用程序使用 ID 从注册表里拉取 schema 来反序列化记录。序列化器和反序列化器分别负责处理 schema 的注册和拉取。

schema注册表并不属于Kafka,现在已经有一些开源的schema 注册表实现。比如本文要讨论的Confluent Schema Registry。

2. 案例说明

现有 schema 文件 user.json,其中内容如下:


1
2
3
4
5
6
7
8
9
10
11
1
2{
3    "type": "record",
4    "name": "User",
5    "fields": [
6        {"name": "id", "type": "int"},
7        {"name": "name",  "type": "string"},
8        {"name": "age", "type": "int"}
9    ]
10}
11

需求:把这个 schema 中的内容注册到 Confluent Schema Registry 中,Kafka Producer 和 Kafka Consumer 通过识别 Confluent Schema Registry 中的 schema 内容来序列化和反序列化。

3. 实操步骤

(1) 启动 Confluent Schema Registry 服务


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
1
2# Confluent Schema Registry 服务的访问IP和端口
3listeners=http://192.168.42.89:8081
4
5# Kafka集群所使用的zookeeper地址,如果不配置,会使用Confluent内置的Zookeeper地址(localhost:2181)
6kafkastore.connection.url=192.168.42.89:2181/kafka-1.1.0-cluster
7
8# Kafka集群的地址(上一个参数和这个参数配置一个就可以了)
9# kafkastore.bootstrap.servers=192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094
10
11# 存储 schema 的 topic
12kafkastore.topic=_schemas
13
14# 其余保持默认即可
15
  • 启动 Confluent Schema Registry


1
2
3
4
5
1
2[root@confluent confluent-4.1.1]# bin/schema-registry-start etc/schema-registry/schema-registry.properties
3# 省略一些内容......
4[2018-06-22 16:10:26,442] INFO Server started, listening for requests... (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain:45)
5

(2) 注册 User 的 schema 注册到对应的 topic 下

  • 首先把原来的 schema 文件加上 "schema" 标记


1
2
3
4
5
6
7
8
9
10
11
12
13
1
2{
3    "schema": "{
4        "type": "record",
5        "name": "User",
6        "fields": [
7            {"name": "id", "type": "int"},
8            {"name": "name",  "type": "string"},
9            {"name": "age", "type": "int"}
10        ]
11    }"
12}
13
  • 部分"需要转义:


1
2
3
4
5
6
7
8
9
10
11
12
13
1
2{
3    "schema": "{
4        \"type\": \"record\",
5        \"name\": \"User\",
6        \"fields\": [
7            {\"name\": \"id\", \"type\": \"int\"},
8            {\"name\": \"name\",  \"type\": \"string\"},
9            {\"name\": \"age\", \"type\": \"int\"}
10        ]
11    }"
12}
13
  • 注册 schema 的命令如下


1
2
3
4
5
1
2curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
3--data '' \
4http://192.168.42.89:8081/subjects/dev3-yangyunhe-topic001-value/versions
5

说明: <1> ''之间需要填写schema字符串 <2> 我用来测试的 topic 为 dev3-yangyunhe-topic001,而且我只对 Kafka 的 value 进行 avro 的序列化,所以注册的地址为http://192.168.42.89:8081/subjects/dev3-yangyunhe-topic001-value/versions <3> http://192.168.42.89:8081需要根据自己的配置进行修改

  • 把转义后 schema 填充到 –data ''的两个单引号中


1
2
3
4
5
1
2curl -X POST -H &quot;Content-Type: application/vnd.schemaregistry.v1+json&quot; \
3--data &#x27;{&quot;schema&quot;: &quot;{\&quot;type\&quot;: \&quot;record\&quot;, \&quot;name\&quot;: \&quot;User\&quot;, \&quot;fields\&quot;: [{\&quot;name\&quot;: \&quot;id\&quot;, \&quot;type\&quot;: \&quot;int\&quot;}, {\&quot;name\&quot;: \&quot;name\&quot;,  \&quot;type\&quot;: \&quot;string\&quot;}, {\&quot;name\&quot;: \&quot;age\&quot;, \&quot;type\&quot;: \&quot;int\&quot;}]}&quot;}&#x27; \
4http://192.168.42.89:8081/subjects/dev3-yangyunhe-topic001-value/versions
5
  • 注册成功会返回这个 schema 的 ID


1
2
3
1
2{&quot;id&quot;:102}
3

(3) 在 maven 工程中引入 Confluent Schema Registry 相关的 jar 包

这些 jar 包在 maven 仓库中下载不到,需要自己手动添加到集群中,confluent-4.1.1 解压后,其 share/java/目录下有 confluent 各个组件的 jar 包:

我们需要 confluent-common 目录下的common-config-4.1.1.jar、common-utils-4.1.1.jar和全部以jackson开头的 jar 包以及 kafka-serde-tools 目录下的kafka-schema-registry-client-4.1.1.jar和kafka-avro-serializer-4.1.1.jar,关于如何添加本地的 jar 包到 java 工程中,本文不再赘述。

(4) Kafka Producer 发送数据


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
1
2package com.bonc.rdpe.kafka110.producer;
3
4import java.util.Properties;
5import java.util.Random;
6
7import org.apache.avro.Schema;
8import org.apache.avro.generic.GenericData;
9import org.apache.avro.generic.GenericRecord;
10import org.apache.kafka.clients.producer.KafkaProducer;
11import org.apache.kafka.clients.producer.Producer;
12import org.apache.kafka.clients.producer.ProducerRecord;
13
14/**
15 * @Title ConfluentProducer.java
16 * @Description 使用Confluent实现的Schema Registry服务来发送Avro序列化后的对象
17 * @Author YangYunhe
18 * @Date 2018-06-25 10:49:19
19 */
20public class ConfluentProducer {
21    
22    public static final String USER_SCHEMA = &quot;{\&quot;type\&quot;: \&quot;record\&quot;, \&quot;name\&quot;: \&quot;User\&quot;, &quot; +
23            &quot;\&quot;fields\&quot;: [{\&quot;name\&quot;: \&quot;id\&quot;, \&quot;type\&quot;: \&quot;int\&quot;}, &quot; +
24            &quot;{\&quot;name\&quot;: \&quot;name\&quot;,  \&quot;type\&quot;: \&quot;string\&quot;}, {\&quot;name\&quot;: \&quot;age\&quot;, \&quot;type\&quot;: \&quot;int\&quot;}]}&quot;;
25    
26    public static void main(String[] args) throws Exception {
27        
28        Properties props = new Properties();
29        props.put(&quot;bootstrap.servers&quot;, &quot;192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094&quot;);
30        props.put(&quot;key.serializer&quot;, &quot;org.apache.kafka.common.serialization.StringSerializer&quot;);
31        // 使用Confluent实现的KafkaAvroSerializer
32        props.put(&quot;value.serializer&quot;, &quot;io.confluent.kafka.serializers.KafkaAvroSerializer&quot;);
33        // 添加schema服务的地址,用于获取schema
34        props.put(&quot;schema.registry.url&quot;, &quot;http://192.168.42.89:8081&quot;);
35
36        Producer&lt;String, GenericRecord&gt; producer = new KafkaProducer&lt;&gt;(props);
37        
38        Schema.Parser parser = new Schema.Parser();
39        Schema schema = parser.parse(USER_SCHEMA);
40        
41        Random rand = new Random();
42        int id = 0;
43
44        while(id &lt; 100) {
45            id++;
46            String name = &quot;name&quot; + id;
47            int age = rand.nextInt(40) + 1;
48            GenericRecord user = new GenericData.Record(schema);
49            user.put(&quot;id&quot;, id);
50            user.put(&quot;name&quot;, name);
51            user.put(&quot;age&quot;, age);
52            
53            ProducerRecord&lt;String, GenericRecord&gt; record = new ProducerRecord&lt;&gt;(&quot;dev3-yangyunhe-topic001&quot;, user);
54            
55            producer.send(record);
56            Thread.sleep(1000);
57        }
58
59        producer.close();
60
61    }
62
63}
64

(5) Kafka Consumer 消费数据


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
1
2package com.bonc.rdpe.kafka110.consumer;
3
4import java.util.Collections;
5import java.util.Properties;
6
7import org.apache.avro.generic.GenericRecord;
8import org.apache.kafka.clients.consumer.ConsumerRecord;
9import org.apache.kafka.clients.consumer.ConsumerRecords;
10import org.apache.kafka.clients.consumer.KafkaConsumer;
11
12/**
13 * @Title ConfluentConsumer.java
14 * @Description 使用Confluent实现的Schema Registry服务来消费Avro序列化后的对象
15 * @Author YangYunhe
16 * @Date 2018-06-25 11:42:21
17 */
18public class ConfluentConsumer {
19
20    public static void main(String[] args) throws Exception {
21
22        Properties props = new Properties();
23        props.put(&quot;bootstrap.servers&quot;, &quot;192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094&quot;);
24        props.put(&quot;group.id&quot;, &quot;dev3-yangyunhe-group001&quot;);
25        props.put(&quot;key.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
26        // 使用Confluent实现的KafkaAvroDeserializer
27        props.put(&quot;value.deserializer&quot;, &quot;io.confluent.kafka.serializers.KafkaAvroDeserializer&quot;);
28        // 添加schema服务的地址,用于获取schema
29        props.put(&quot;schema.registry.url&quot;, &quot;http://192.168.42.89:8081&quot;);
30        KafkaConsumer&lt;String, GenericRecord&gt; consumer = new KafkaConsumer&lt;&gt;(props);
31
32        consumer.subscribe(Collections.singletonList(&quot;dev3-yangyunhe-topic001&quot;));
33
34        try {
35            while (true) {
36                ConsumerRecords&lt;String, GenericRecord&gt; records = consumer.poll(1000);
37                for (ConsumerRecord&lt;String, GenericRecord&gt; record : records) {
38                    GenericRecord user = record.value();
39                    System.out.println(&quot;value = [user.id = &quot; + user.get(&quot;id&quot;) + &quot;, &quot; + &quot;user.name = &quot;
40                            + user.get(&quot;name&quot;) + &quot;, &quot; + &quot;user.age = &quot; + user.get(&quot;age&quot;) + &quot;], &quot;
41                            + &quot;partition = &quot; + record.partition() + &quot;, &quot; + &quot;offset = &quot; + record.offset());
42                }
43            }
44        } finally {
45            consumer.close();
46        }
47    }
48}
49

(6) 测试结果

Kafka Consumer 的控制台输出内容如下:


1
2
3
4
5
6
7
8
9
1
2value = [user.id = 1, user.name = name1, user.age = 20], partition = 1, offset = 696
3value = [user.id = 2, user.name = name2, user.age = 27], partition = 0, offset = 696
4value = [user.id = 3, user.name = name3, user.age = 35], partition = 2, offset = 695
5value = [user.id = 4, user.name = name4, user.age = 7], partition = 1, offset = 697
6value = [user.id = 5, user.name = name5, user.age = 34], partition = 0, offset = 697
7
8......
9

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

给TA打赏
共{{data.count}}人
人已打赏
安全网络

CDN安全市场到2022年价值76.3亿美元

2018-2-1 18:02:50

安全漏洞

预警Apache Solr 多个中高危漏洞(CVE-2021-27905等)

2021-2-26 11:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索