在分布式游戏服务器系统中,消息处理队列主要解决问题就是解耦系统中的业务,使得每个系统看起来功能比较单一,而且解决一些全服数据共享等问题。
通常我们知道kafka是作为消息队列比较火的一种方式,其实还有(Active MQ,Rabbit MQ,Zero MQ)个人觉得kafka比较好用点,哈哈,习惯吧。
同样我们来复习kafka基础。
kafka基础。
的内容来介绍kafka的基本安装。
首先我们要去kafka官方网站上下载kafka依赖包。https://kafka.apache.org/
下载下来之后的到安装包解压。
当然我编写了一个启动脚本,内容很简单
1
2
3
4 1@echo off
2.\bin\windows\kafka-server-start.bat .\config\server.properties
3
4
运行kafka需要启动zookeeper,所以需要确保机器上有可用的zookeeper。关于zookeeper,可以阅读我上篇文章所写的内容。https://blog.csdn.net/baidu_23086307/article/details/82769234
进入conf目录。使用文本编译器编辑service.properties文件,修改log.dirs熟悉,修改为你自己的路径。
例如我的路径为这个我就修改成这了。
然后保存文件。
然后我们进入bin目录,里面有个Windows目录,我们执行如下命令,
kafka-topics.bat –create –zookeeper 0.0.0.0:2181 –replication-factor 1 –partitions 1 –topic defaultTopic
创建一个topic中心。有关kafka的topic,请查看这个大大的说明。这就不详细说明了。
https://blog.csdn.net/u013256816/article/details/79303825
不要关闭窗口,然后我们回到kafka目录,打开cmd执行命令:
1
2
3
4 1@echo off
2.\bin\windows\kafka-server-start.bat .\config\server.properties
3
4
当然我更喜欢把它编写成一个bat脚本。方便与下次直接使用。然后我们启动kafka服务
大概看到这样的说明kafka启动没有问题。否者启动出现各种问题,需要读者自己排查问题。
整合kafka实现分布式消息队列
万事具备,我们就要将kafka使用到我们的业务场景中来了。在游戏服务器中,我们通常用kafka做一些全局的东西。比如全服聊天,全服活动之类的任务。也可以用于任务系统,解耦每一部分的逻辑。
我们的系统采用netty+spring整合的游戏服务器系统。当然spring也对kafka进行封装了。所以我们在工程中需要添加kafka的依赖。
1
2
3
4
5
6
7
8
9
10
11
12
13
14 1<!--kafka-->
2 <dependency>
3 <groupId>org.springframework.integration</groupId>
4 <artifactId>spring-integration-kafka</artifactId>
5 <version>2.0.0.RELEASE</version>
6 <exclusions>
7 <exclusion>
8 <groupId>log4j</groupId>
9 <artifactId>log4j</artifactId>
10 </exclusion>
11 </exclusions>
12 </dependency>
13
14
添加完maven依赖后我们就可以使用spring提供的kafka进行封装了。首先我们需要一个kafka中心服务器的配置文件。
简单的配置了kafka的基础属性
game-kafka.properties
1
2
3
4
5
6 1kafka.taskThreadSize=4
2kafka.corePoolSize=4
3kafka.maximumPoolSize=4
4kafka.keepAliveTime=5000
5
6
然后我们添加kafka的consumer配置,application-kafka-consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41 1<!-- 定義consumer的參數 -->
2<bean id="consumerProperties" class="java.util.HashMap">
3 <constructor-arg>
4 <map>
5 <!-- 配置kafka的broke -->
6 <entry key="bootstrap.servers" value="0.0.0.0:9092"/>
7 <!-- 配置組-->
8 <entry key="group.id" value="0"/>
9 <entry key="enable.auto.commit" value="true"/>
10 <entry key="auto.commit.interval.ms" value="1000"/>
11 <entry key="session.timeout.ms" value="30000"/>
12 <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
13 <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
14 </map>
15 </constructor-arg>
16</bean>
17
18<!-- 創建consumerFactory bean -->
19<bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
20 <constructor-arg ref="consumerProperties"/>
21</bean>
22
23<!-- 實際執行消息消費的類 -->
24<bean id="messageListenerConsumerService"
25 class="com.twjitm.core.common.kafka.NettyKafkaConsumerListener"/>
26
27<!-- 消費者容器配置信息 -->
28<bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
29 <!-- 重要!配置topic -->
30 <constructor-arg value="defaultTopic"/>
31 <property name="messageListener" ref="messageListenerConsumerService"/>
32</bean>
33
34<!-- 創建kafka template bean,使用的時候,只需要注入這個bean,即可使用template的send消息方法 -->
35<bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer"
36 init-method="doStart">
37 <constructor-arg ref="consumerFactory"/>
38 <constructor-arg ref="containerProperties"/>
39</bean>
40
41
1
2 1 然后在添加一个kafka的producer。application-kafka-producer.xml <?xml version="1.0" encoding="UTF-8"?>
2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38 1<!--bootstrap.servers 消費者提供服務器:example:0.0.0.1:9092,10.0.1.73:9092,10.0.1.74:9092,10.0.1.75:9092-->
2<!-- 定義producer的參數 -->
3<bean id="producerProperties" class="java.util.HashMap">
4 <constructor-arg>
5 <map>
6
7 <entry key="bootstrap.servers"
8 value="0.0.0.0:9092"/>
9 <entry key="group.id" value="0"/>
10 <entry key="retries" value="1"/>
11 <entry key="batch.size" value="16384"/>
12 <entry key="linger.ms" value="1"/>
13 <entry key="buffer.memory" value="33554432"/>
14 <!--序列化方式-->
15 <entry key="key.serializer"
16 value="org.apache.kafka.common.serialization.StringSerializer"/>
17 <entry key="value.serializer"
18 value="org.apache.kafka.common.serialization.StringSerializer"/>
19 </map>
20
21 </constructor-arg>
22</bean>
23
24<!-- 創建kafka template需要使用的producer factory bean -->
25<bean id="producerFactory"
26 class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
27 <constructor-arg ref="producerProperties"/>
28</bean>
29
30<!-- 創建kafka template bean,使用的時候,只需要注入這個bean,即可使用template的send消息方法 -->
31<bean id="KafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
32 <constructor-arg ref="producerFactory"/>
33 <constructor-arg name="autoFlush" value="true"/>
34 <property name="defaultTopic" value="defaultTopic"/>
35 <!--<property name="producerListener" ref="producerListener"/>-->
36</bean>
37
38
1
2 1 具体详细配置正如代码中的注释一样,配置好kafka的基础属性,整合到spring 的bean对象中,使用spring容器来管理这些bean对象。省去了对象管理的工作。
2
程序实现:
我们需要定义一个kafka抽象任务,每个任务其实就是想kafka里面发送消息。属于producer范畴。AbstractKafkaPushTask .java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52 1package com.twjitm.core.common.kafka;
2
3/**
4 * @author twjitm - [Created on 2018-09-04 21:31]
5 */
6public abstract class AbstractKafkaPushTask {
7 private KafkaTaskType taskType;
8 public Object value;
9
10 public AbstractKafkaPushTask(KafkaTaskType taskType) {
11 this.taskType = taskType;
12 }
13
14 public KafkaTaskType getTaskType() {
15 return taskType;
16 }
17
18
19 public Object getValue() {
20 return value;
21 }
22
23 /**
24 * 需要將消息值保存到這裏面,值如何獲得由子類自己實心
25 *
26 * @param value
27 */
28 public abstract void setValue(Object value);
29}
30
31kafka消息监听器:消息消费者.
32
33package com.twjitm.core.common.kafka;
34
35import org.apache.kafka.clients.consumer.ConsumerRecord;
36import org.springframework.kafka.listener.MessageListener;
37
38/**
39 * kafka 消息消費者
40 *
41 * @author twjitm- [Created on 2018-09-04 16:12]
42 */
43public class NettyKafkaConsumerListener implements MessageListener<String, String> {
44 @Override
45 public void onMessage(ConsumerRecord<String, String> consumerRecord) {
46
47 System.out.println(consumerRecord);
48 consumerRecord.value();
49 }
50}
51
52
在这我们对消息做简单的处理,但是在实际项目开发中我们还得更具消息的具体类型做不同的业务处理,因此我们可以将消息的具体应用做出自己业务逻辑处理。
消息监听器继承MessageListener,而spring对这个接口做了处理,所以能够监听到消息的到来。
有了消息消费者,我们需要编写一个消息提供者,消息提供者负责将消息发布到kafka消息队列中。实现代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127 1package com.twjitm.core.common.kafka;
2
3import com.alibaba.fastjson.JSON;
4import com.twjitm.core.common.config.global.GlobalConstants;
5import com.twjitm.core.common.config.global.KafkaConfig;
6import com.twjitm.core.common.service.IService;
7import com.twjitm.core.spring.SpringServiceManager;
8import com.twjitm.threads.thread.NettyThreadNameFactory;
9import org.slf4j.Logger;
10import org.slf4j.LoggerFactory;
11import org.springframework.kafka.core.KafkaTemplate;
12import org.springframework.kafka.support.SendResult;
13import org.springframework.stereotype.Service;
14import org.springframework.util.concurrent.ListenableFuture;
15import org.springframework.util.concurrent.ListenableFutureCallback;
16
17import javax.annotation.Resource;
18import java.util.concurrent.*;
19
20/**
21 * kafka 消息提供者
22 *
23 * @author twjitm- [Created on 2018-09-04 16:17]
24 */
25@Service
26public class NettyKafkaProducerListener implements IService {
27 private Logger logger = LoggerFactory.getLogger(NettyKafkaProducerListener.class);
28 @Resource
29 private KafkaTemplate<String, String> kafkaTemplate;
30
31 /**
32 * 消息處理線程池
33 */
34 private volatile ExecutorService executorService;
35
36 private boolean run = true;
37 /**
38 * 消息隊列
39 */
40 private BlockingQueue<AbstractKafkaPushTask> queue;
41
42 /**
43 * 將消息發送給kafka中心
44 *
45 * @param abstractKafkaPushTask
46 */
47 private void sendMessage(AbstractKafkaPushTask abstractKafkaPushTask) {
48 String type = abstractKafkaPushTask.getTaskType().getTypeName();
49 String value = JSON.toJSONString(abstractKafkaPushTask.getValue());
50 //本身send方法就是一個異步執行方法
51 ListenableFuture<SendResult<String, String>> result =
52 kafkaTemplate.sendDefault(type, value);
53 /**
54 * 添加回調監聽
55 */
56 result.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
57 @Override
58 public void onFailure(Throwable throwable) {
59 logger.error("KAFKA MESSAGE SEND FAIL", throwable);
60 }
61
62 @Override
63 public void onSuccess(SendResult<String, String> kafkaTaskTypeObjectSendResult) {
64 logger.info("KAFKA MESSAGE SEND SUCCESS");
65 }
66 });
67
68
69 }
70
71 /**
72 * 將任務存放到隊列中
73 *
74 * @param task
75 */
76 public void put(AbstractKafkaPushTask task) {
77 try {
78 queue.put(task);
79 } catch (InterruptedException e) {
80 e.printStackTrace();
81 }
82 }
83
84 @Override
85 public String getId() {
86 return NettyKafkaProducerListener.class.getSimpleName();
87 }
88
89 @Override
90 public void startup() throws Exception {
91 run = true;
92 queue = new LinkedBlockingQueue<>();
93 NettyThreadNameFactory factory = new NettyThreadNameFactory(GlobalConstants.Thread.GAME_KAFKA_TASK_EXECUTOR);
94 //開啓線程數量
95 KafkaConfig kafkaConfig = SpringServiceManager.getSpringLoadService().getNettyGameServiceConfigService().getKafkaConfig();
96 ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(kafkaConfig.getCorePoolSize(), kafkaConfig.getMaximumPoolSize(), kafkaConfig.getKeepAliveTime(), TimeUnit.SECONDS, new LinkedBlockingQueue<>(), factory);
97 executorService = poolExecutor;
98 for (int i = 0; i < 4; i++) {
99 executorService.execute(new Worker());
100 }
101 }
102
103 @Override
104 public void shutdown() throws Exception {
105 logger.info("STOP KAFKA EXECUTOR");
106 run = false;
107 executorService.shutdown();
108 }
109
110 private class Worker implements Runnable {
111
112 @Override
113 public void run() {
114 try {
115 while (run) {
116 sendMessage(queue.take());
117 }
118 } catch (InterruptedException e) {
119 e.printStackTrace();
120 }
121 }
122 }
123
124
125}
126
127
封装看一个消息本地消息队列,将上层调用过来的消息存放到队列中,再有队列tack到kafka中。这样保证每个进程进来的消息都是有顺序的。
测试:
使用spring整合kafka使用起来说是特别方便的。说以我们编写一个测试类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30 1package com.twjitm.kafka;
2
3import com.twjitm.TestSpring;
4import com.twjitm.core.common.kafka.KafkaTaskType;
5import com.twjitm.core.common.kafka.NettyKafkaProducerListener;
6
7import javax.annotation.Resource;
8
9/**
10 * @author twjitm- [Created on 2018-09-05 12:25]
11 */
12
13public class TestKafka {
14 @Resource
15 static NettyKafkaProducerListener nettyKafkaProducerListener;
16
17 public static void main(String[] args) {
18 TestSpring.initSpring();
19 test();
20 }
21
22 public static void test() {
23 WordChatTask task = new WordChatTask(KafkaTaskType.WORLD_CHAT);
24 task.setValue("hello,world");
25 nettyKafkaProducerListener.put(task);
26 }
27
28}
29
30
创建线程成功:
发送消息成功:
接收消息成功:
相关代码已经提交到github,欢迎各位大大star