Spring Boot 2.0.4 集成 Kafka 2.0.0。
项目源码地址:https://gitee.com/lilyssh/high-concurrency
一、简介
kafka是一种高吞吐量的分布式发布订阅消息系统。kafka对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)成为broker。无论是kafka集群,还是producer和consumer都依赖于zookeeper来保证系统可用性集群保存一些meta信息。
Kafka的安装请参考文章:Kafka的安装与使用。
二、使用方法
(1)添加依赖
1
2
3
4
5
6
7 1<dependency>
2 <groupId>org.springframework.kafka</groupId>
3 <artifactId>spring-kafka</artifactId>
4 <version>2.1.10.RELEASE</version>
5</dependency>
6
7
(2)在application.yml中添加配置
官方文档说只要配置两个必要项就可以了,spring.kafka.consumer.group-id和spring.kafka.consumer.auto-offset-reset。此处对其他配置稍作解释。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 1spring:
2 kafka:
3 # 指定kafka代理地址,brokers集群。
4 bootstrap-servers: ssh.qianxunclub.com:9092
5 producer:
6 # 发送失败重试次数。
7 retries: 0
8 # 每次批量发送消息的数量 批处理条数:当多个记录被发送到同一个分区时,生产者会尝试将记录合并到更少的请求中。这有助于客户端和服务器的性能。
9 batch-size: 16384
10 # 32MB的批处理缓冲区。
11 buffer-memory: 33554432
12 # 指定消息key和消息体的编解码方式。
13 key-serializer: org.apache.kafka.common.serialization.StringSerializer
14 value-serializer: org.apache.kafka.common.serialization.StringSerializer
15 consumer:
16 # 消费者群组ID,发布-订阅模式,即如果一个生产者,多个消费者都要消费,那么需要定义自己的群组,同一群组内的消费者只有一个能消费到消息。
17 group-id: kafka_order_group
18 auto-offset-reset: earliest
19 # 如果为true,消费者的偏移量将在后台定期提交。
20 enable-auto-commit: true
21 # 自动提交周期
22 auto-commit-interval: 100
23 # 指定消息key和消息体的编解码方式。
24 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
25 value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
26
27
(3)消息发送类
1
2
3 1package cn.lilyssh.common.kafka.provider;
2
3
import com
.google
.gson
.Gson
;
import com
.google
.gson
.GsonBuilder
;
import lombok
.extern
.slf4j
.Slf4j
;
import org
.springframework
.beans
.factory
.annotation
.Autowired
;
import org
.springframework
.stereotype
.Component
;
import org
.springframework
.kafka
.core
.KafkaTemplate
;
@Component
@Slf4j
public
class
KafkaSender
{
@Autowired
private KafkaTemplate
<String
,String
kafkaTemplate
;
private Gson gson
new
GsonBuilder
(
)
.
create
(
)
;
//发送消息方法
public
void
send
(String topic
,String key
,Object message
)
{
kafkaTemplate
.
send
(topic
,key
,gson
.
toJson
(message
)
)
;
log
.
info
(
"+++++++++++++++++++++ message = {}"
, gson
.
toJson
(message
)
)
;
}
}
此处关键代码为kafkaTemplate.send(),参数topic是Kafka里的topic,这个topic在 Java程序中是不需要提前在Kafka中设置的,因为它会在发送的时候自动创建你设置的topic, gson.toJson(message)是消息内容。
(4)在下单业务中调用消息发送
1
2
3 1package cn.lilyssh.order.provider.service;
2
3
import cn
.lilyssh
.common
.kafka
.provider
.KafkaSender
;
import cn
.lilyssh
.order
.api
.model
.request
.OrderInsertReq
;
import cn
.lilyssh
.order
.api
.service
.OrderServiceApi
;
import cn
.lilyssh
.order
.provider
.dao
.entity
.OrderEntity
;
import com
.alibaba
.dubbo
.config
.annotation
.Service
;
import lombok
.AllArgsConstructor
;
import lombok
.extern
.slf4j
.Slf4j
;
import org
.springframework
.beans
.BeanUtils
;
import org
.springframework
.stereotype
.Component
;
import org
.springframework
.util
.StringUtils
;
import java
.math
.BigDecimal
;
import java
.util
.*
;
@Slf4j
@Service
@Component
@AllArgsConstructor
public
class
OrderService
implements
OrderServiceApi
{
private KafkaSender kafkaSender
;
/**
* 保存到kafka
* @param orderInsertReq
* @return
*/
@Override
public
void
saveByKafka
(OrderInsertReq orderInsertReq
)
{
OrderEntity orderEntity
new
OrderEntity
(
)
;
//直接写入数据库太慢,引起dubbo超时,导致调用多次,此处需要改造成kafka异步写入。
BeanUtils
.
copyProperties
(orderInsertReq
,orderEntity
)
;
kafkaSender
.
send
(
“placeOrder”
, orderEntity
.
getUserId
(
)
.
toString
(
)
, orderEntity
)
;
}
}
(5)消息接收类
1
2
3 1package cn.lilyssh.order.provider.kafka.consumer;
2
3
import cn
.lilyssh
.order
.provider
.dao
.entity
.OrderEntity
;
import cn
.lilyssh
.order
.provider
.dao
.repository
.OrderRepository
;
import com
.google
.gson
.Gson
;
import lombok
.AllArgsConstructor
;
import lombok
.extern
.slf4j
.Slf4j
;
import org
.apache
.kafka
.clients
.consumer
.ConsumerRecord
;
import org
.springframework
.kafka
.annotation
.KafkaListener
;
import org
.springframework
.stereotype
.Component
;
import java
.util
.Optional
;
@Component
@Slf4j
@AllArgsConstructor
public
class
KafkaReceiver
{
1
2
3
4
5
6
7
8
9
10
11
12
13 1<span class="token keyword">private</span> OrderRepository orderRepository<span class="token punctuation">;</span>
2<span class="token comment">/**
3 * 监听下单
4 */</span>
5<span class="token annotation punctuation">@KafkaListener</span><span class="token punctuation">(</span>topics <span class="token operator">=</span> <span class="token punctuation">{</span><span class="token string">"placeOrder"</span><span class="token punctuation">}</span><span class="token punctuation">)</span>
6<span class="token keyword">public</span> <span class="token keyword">void</span> <span class="token function">listen</span><span class="token punctuation">(</span>String orderEntityStr<span class="token punctuation">)</span> <span class="token punctuation">{</span>
7 log<span class="token punctuation">.</span><span class="token function">info</span><span class="token punctuation">(</span><span class="token string">"------------------ orderEntityStr ="</span> <span class="token operator">+</span> orderEntityStr<span class="token punctuation">)</span><span class="token punctuation">;</span>
8 Gson gs <span class="token operator">=</span> <span class="token keyword">new</span> <span class="token class-name">Gson</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
9 OrderEntity orderEntity <span class="token operator">=</span> gs<span class="token punctuation">.</span><span class="token function">fromJson</span><span class="token punctuation">(</span>orderEntityStr<span class="token punctuation">,</span>OrderEntity<span class="token punctuation">.</span><span class="token keyword">class</span><span class="token punctuation">)</span><span class="token punctuation">;</span><span class="token comment">//把JSON字符串转为对象</span>
10 orderRepository<span class="token punctuation">.</span><span class="token function">save</span><span class="token punctuation">(</span>orderEntity<span class="token punctuation">)</span><span class="token punctuation">;</span>
11<span class="token punctuation">}</span>
12
13
}
接收消息直接用@KafkaListener注解即可,并在监听中设置监听的topic,topics是一个数组所以是可以绑定多个主题的,如@KafkaListener(topics = {“topicA”,“topicB”})。这里的topic需要和消息发送类 KafkaSender.java中设置的topic一致。
spring.kafka.bootstrap-servers 后面设置你安装的Kafka的机器IP地址和端口号9092。
三、启动Kafka服务
1
2
3 1bin/kafka-server-start.sh config/server.properties
2
3
千万注意: 记得将你的虚拟机或者服务器关闭防火墙或者开启Kafka的端口9092。
四、测试
启动order-provider,调用下单接口,可以看到下单成功。
我们来看下Kafka中的topic列表:
1
2
3 1bin/kafka-topics.sh --list --zookeeper localhost:2181
2
3
会看到:
1
2
3
4 1__consumer_offsets
2placeOrder
3
4
接下来,我们来测试下kafka的消费能力。
我们把 OrderService 改造一下:
1
2
3 1package cn.lilyssh.order.provider.service;
2
3
import cn
.lilyssh
.common
.kafka
.provider
.KafkaSender
;
import cn
.lilyssh
.order
.api
.model
.request
.OrderInsertReq
;
import cn
.lilyssh
.order
.api
.service
.OrderServiceApi
;
import cn
.lilyssh
.order
.provider
.dao
.entity
.OrderEntity
;
import com
.alibaba
.dubbo
.config
.annotation
.Service
;
import lombok
.AllArgsConstructor
;
import lombok
.extern
.slf4j
.Slf4j
;
import org
.springframework
.beans
.BeanUtils
;
import org
.springframework
.stereotype
.Component
;
import org
.springframework
.util
.StringUtils
;
import java
.math
.BigDecimal
;
import java
.util
.*
;
@Slf4j
@Service
@Component
@AllArgsConstructor
public
class
OrderService
implements
OrderServiceApi
{
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49 1<span class="token keyword">private</span> KafkaSender kafkaSender<span class="token punctuation">;</span>
2
3<span class="token comment">/**
4 * 每两秒,新建五百个下单线程,一分钟后停止,查看kafka每秒是否能消费250条数据。
5 */</span>
6<span class="token annotation punctuation">@Override</span>
7<span class="token keyword">public</span> <span class="token keyword">void</span> <span class="token function">saveByKafka</span><span class="token punctuation">(</span>OrderInsertReq orderInsertReq<span class="token punctuation">)</span><span class="token punctuation">{</span>
8 OrderEntity orderEntity<span class="token operator">=</span><span class="token keyword">new</span> <span class="token class-name">OrderEntity</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
9 <span class="token comment">//直接写入数据库太慢,引起dubbo超时,导致调用多次,此处需要改造成kafka异步写入。</span>
10 BeanUtils<span class="token punctuation">.</span><span class="token function">copyProperties</span><span class="token punctuation">(</span>orderInsertReq<span class="token punctuation">,</span>orderEntity<span class="token punctuation">)</span><span class="token punctuation">;</span>
11 System<span class="token punctuation">.</span>out<span class="token punctuation">.</span><span class="token function">println</span><span class="token punctuation">(</span><span class="token string">"预备备!开始!"</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
12 Timer timer <span class="token operator">=</span> <span class="token keyword">new</span> <span class="token class-name">Timer</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
13 timer<span class="token punctuation">.</span><span class="token function">schedule</span><span class="token punctuation">(</span><span class="token keyword">new</span> <span class="token class-name">MyTask</span><span class="token punctuation">(</span>timer<span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token number">0</span><span class="token punctuation">,</span> <span class="token number">2000</span><span class="token punctuation">)</span><span class="token punctuation">;</span> <span class="token comment">//任务等待0秒后开始执行,之后每2秒执行一次</span>
14<span class="token punctuation">}</span>
15<span class="token comment">//任务:每次新建五百个下单线程。</span>
16<span class="token keyword">class</span> <span class="token class-name">MyTask</span> <span class="token keyword">extends</span> <span class="token class-name">TimerTask</span> <span class="token punctuation">{</span>
17 <span class="token keyword">private</span> Timer timer<span class="token punctuation">;</span>
18 <span class="token keyword">public</span> <span class="token function">MyTask</span><span class="token punctuation">(</span>Timer timer<span class="token punctuation">)</span> <span class="token punctuation">{</span>
19 <span class="token keyword">this</span><span class="token punctuation">.</span>timer <span class="token operator">=</span> timer<span class="token punctuation">;</span>
20 <span class="token punctuation">}</span>
21
22 <span class="token keyword">int</span> second <span class="token operator">=</span> <span class="token number">0</span><span class="token punctuation">;</span>
23 <span class="token keyword">public</span> <span class="token keyword">void</span> <span class="token function">run</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
24 System<span class="token punctuation">.</span>out<span class="token punctuation">.</span><span class="token function">println</span><span class="token punctuation">(</span><span class="token string">"~~~第"</span><span class="token operator">+</span>second<span class="token operator">+</span><span class="token string">"秒~~~"</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
25 <span class="token keyword">for</span> <span class="token punctuation">(</span><span class="token keyword">int</span> i <span class="token operator">=</span> <span class="token number">0</span><span class="token punctuation">;</span> i <span class="token operator"><</span> <span class="token number">500</span><span class="token punctuation">;</span> i<span class="token operator">++</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
26 AddOrder addOrder<span class="token operator">=</span><span class="token keyword">new</span> <span class="token class-name">AddOrder</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
27 Thread thread<span class="token operator">=</span><span class="token keyword">new</span> <span class="token class-name">Thread</span><span class="token punctuation">(</span>addOrder<span class="token punctuation">)</span><span class="token punctuation">;</span>
28 thread<span class="token punctuation">.</span><span class="token function">start</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
29 <span class="token punctuation">}</span>
30 second<span class="token operator">++</span><span class="token punctuation">;</span>
31 <span class="token keyword">if</span><span class="token punctuation">(</span> second <span class="token operator">==</span> <span class="token number">30</span><span class="token punctuation">)</span><span class="token punctuation">{</span>
32 <span class="token keyword">this</span><span class="token punctuation">.</span>timer<span class="token punctuation">.</span><span class="token function">cancel</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
33 System<span class="token punctuation">.</span>out<span class="token punctuation">.</span><span class="token function">println</span><span class="token punctuation">(</span><span class="token string">"#### 程序结束 ####"</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
34 <span class="token punctuation">}</span>
35 <span class="token punctuation">}</span>
36<span class="token punctuation">}</span>
37<span class="token comment">//下单线程</span>
38<span class="token keyword">class</span> <span class="token class-name">AddOrder</span> <span class="token keyword">implements</span> <span class="token class-name">Runnable</span><span class="token punctuation">{</span>
39 <span class="token annotation punctuation">@Override</span>
40 <span class="token keyword">public</span> <span class="token keyword">void</span> <span class="token function">run</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
41 OrderEntity orderEntity <span class="token operator">=</span> <span class="token keyword">new</span> <span class="token class-name">OrderEntity</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
42 orderEntity<span class="token punctuation">.</span><span class="token function">setUserId</span><span class="token punctuation">(</span><span class="token number">753</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
43 orderEntity<span class="token punctuation">.</span><span class="token function">setPayment</span><span class="token punctuation">(</span><span class="token keyword">new</span> <span class="token class-name">BigDecimal</span><span class="token punctuation">(</span><span class="token number">928.23</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
44 orderEntity<span class="token punctuation">.</span><span class="token function">setCreateTime</span><span class="token punctuation">(</span><span class="token keyword">new</span> <span class="token class-name">Date</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
45 kafkaSender<span class="token punctuation">.</span><span class="token function">send</span><span class="token punctuation">(</span><span class="token string">"placeOrder"</span><span class="token punctuation">,</span> orderEntity<span class="token punctuation">.</span><span class="token function">getUserId</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token function">toString</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">,</span> orderEntity<span class="token punctuation">)</span><span class="token punctuation">;</span>
46 <span class="token punctuation">}</span>
47<span class="token punctuation">}</span>
48
49
}