项目目录如下
启动类如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 1import org.eclipse.paho.client.mqttv3.MqttException;
2import org.springframework.boot.SpringApplication;
3import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
4import org.springframework.boot.autoconfigure.SpringBootApplication;
5
6@EnableAutoConfiguration(exclude = {
7 org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration.class
8})
9@SpringBootApplication
10public class MqttApplication {
11
12 public static void main(String[] args) throws MqttException {
13 SpringApplication.run(MqttApplication.class, args);
14 //订阅主题,之后控制台打印消息。证明整合成功
15 // MqttPushClient.getInstance().subscribe("topic1");
16 }
17}
18
19
20
MqttController
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 1package com.example.mqtt.mqtt;
2
3import org.eclipse.paho.client.mqttv3.MqttException;
4import org.springframework.http.HttpStatus;
5import org.springframework.http.ResponseEntity;
6import org.springframework.web.bind.annotation.GetMapping;
7import org.springframework.web.bind.annotation.RequestParam;
8import org.springframework.web.bind.annotation.ResponseBody;
9import org.springframework.web.bind.annotation.RestController;
10
11/**
12 * MQTT消息发送
13 */
14@RestController
15public class MqttController {
16
17
18 /**
19 * 发送MQTT消息
20 *
21 * @param message 消息内容
22 * @return 返回
23 */
24
25 @ResponseBody
26 @GetMapping(value = "/mqtt")
27 public ResponseEntity<String> sendMqtt(@RequestParam(value = "msg") String message) throws MqttException {
28 String kdTopic = "topic1";
29 MqttPushClient.getInstance().publish(kdTopic, "稍微来点鸡血");
30 return new ResponseEntity<>("OK", HttpStatus.OK);
31 }
32}
33
34
35
MqttPushClient
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134 1import lombok.extern.slf4j.Slf4j;
2import org.apache.commons.lang3.StringUtils;
3import org.eclipse.paho.client.mqttv3.*;
4import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
5import org.springframework.beans.factory.annotation.Autowired;
6import org.springframework.context.annotation.Bean;
7import org.springframework.context.annotation.Configuration;
8
9
10@Slf4j
11@Configuration
12public class MqttPushClient {
13
14 private String url = "tcp://127.0.0.1:61613";
15 private String clientId = "mqttProducer";
16
17 @Autowired
18 MqttConfig config;
19 private static final byte[] WILL_DATA;
20
21 static {
22 WILL_DATA = "offline".getBytes();
23 }
24
25 private static volatile MqttPushClient mqttPushClient = null;
26
27 @Bean
28 public static MqttPushClient getInstance() throws MqttException {
29
30 if (null == mqttPushClient) {
31 synchronized (MqttPushClient.class) {
32 if (null == mqttPushClient) {
33 mqttPushClient = new MqttPushClient();
34 }
35 }
36
37 }
38 return mqttPushClient;
39 }
40
41
/**
 * Creates the client and immediately connects it to the broker.
 *
 * @throws MqttException declared by {@code connect()}
 */
public MqttPushClient() throws MqttException {
    this.connect();
}
45
46 private MqttClient client;
47
48 private void connect() throws MqttException {
49 try {
50 client = new MqttClient(url, clientId, new MemoryPersistence());
51 MqttConnectOptions options = new MqttConnectOptions();
52 // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,
53 // 这里设置为true表示每次连接到服务器都以新的身份连接
54 options.setCleanSession(true);
55 // 设置连接的用户名
56 options.setUserName("admin");
57 // 设置连接的密码
58 options.setPassword("password".toCharArray());
59 options.setServerURIs(StringUtils.split("tcp://127.0.0.1:61613", ","));
60 // 设置超时时间 单位为秒
61 options.setConnectionTimeout(100);
62 // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线,但这个方法并没有重连的机制
63 options.setKeepAliveInterval(20);
64 // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
65 options.setWill("willTopic", WILL_DATA, 2, false);
66 client.setCallback(new PushCallback());
67 client.connect(options);
68 } catch (MqttException e) {
69 e.printStackTrace();
70 }
71 }
72
73 /**
74 * 发布,默认qos为0,非持久化
75 *
76 * @param topic
77 * @param pushMessage
78 */
79 public void publish(String topic, String pushMessage) {
80 publish(0, false, topic, pushMessage);
81 }
82
83 /**
84 * 发布
85 *
86 * @param qos
87 * @param retained
88 * @param topic
89 * @param pushMessage
90 */
91 public void publish(int qos, boolean retained, String topic, String pushMessage) {
92 MqttMessage message = new MqttMessage();
93 message.setQos(qos);
94 message.setRetained(retained);
95 message.setPayload(pushMessage.getBytes());
96 MqttTopic mTopic = client.getTopic(topic);
97 if (null == mTopic) {
98 log.error("topic not exist");
99 }
100 MqttDeliveryToken token;
101 try {
102 token = mTopic.publish(message);
103 token.waitForCompletion();
104 } catch (MqttPersistenceException e) {
105 e.printStackTrace();
106 } catch (MqttException e) {
107 e.printStackTrace();
108 }
109 }
110
111 /**
112 * 订阅某个主题,qos默认为0
113 *
114 * @param topic
115 */
116 public void subscribe(String topic) {
117 subscribe(topic, 0);
118 }
119
120 /**
121 * 订阅某个主题
122 *
123 * @param topic
124 * @param qos
125 */
126 public void subscribe(String topic, int qos) {
127 try {
128 client.subscribe(topic, qos);
129 } catch (MqttException e) {
130 e.printStackTrace();
131 }
132 }
133
134
PushCallback
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33 1import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
2import org.eclipse.paho.client.mqttv3.MqttCallback;
3import org.eclipse.paho.client.mqttv3.MqttException;
4import org.eclipse.paho.client.mqttv3.MqttMessage;
5
6
7public class PushCallback implements MqttCallback {
8 @Override
9 public void connectionLost(Throwable cause) {
10 System.out.println("连接断开,可以做重连");
11 }
12
13 @Override
14 public void messageArrived(String topic, MqttMessage message) throws Exception {
15 System.out.println("接收消息主题 : " + topic);
16 System.out.println("接收消息Qos : " + message.getQos());
17 System.out.println("接收消息内容 : " + new String(message.getPayload()));
18
19 }
20
21 @Override
22 public void deliveryComplete(IMqttDeliveryToken token) {
23 System.out.println(token);
24 try {
25 System.out.println(token.getMessage());
26 } catch (MqttException e) {
27 e.printStackTrace();
28 }
29 }
30}
31
32
33
pom
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87 1<?xml version="1.0" encoding="UTF-8"?>
2<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4 <modelVersion>4.0.0</modelVersion>
5 <parent>
6 <groupId>org.springframework.boot</groupId>
7 <artifactId>spring-boot-starter-parent</artifactId>
8 <version>2.1.7.RELEASE</version>
9 <relativePath/> <!-- lookup parent from repository -->
10 </parent>
11 <groupId>com.example</groupId>
12 <artifactId>mqtt</artifactId>
13 <version>0.0.1-SNAPSHOT</version>
14 <name>mqtt</name>
15 <description>Demo project for Spring Boot</description>
16
17 <properties>
18 <java.version>1.8</java.version>
19 </properties>
20
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- MQTT-related dependencies -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-integration</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-stream</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-mqtt</artifactId>
    </dependency>
    <!-- Not declared explicitly; presumably the Paho client arrives transitively
         through spring-integration-mqtt (verify with mvn dependency:tree).
    <dependency>
        <groupId>org.eclipse.paho</groupId>
        <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
        <version>1.2.0</version>
    </dependency>-->
    <!-- No explicit version: the Spring Boot parent manages it. Lombok is only
         needed at compile time, hence optional. -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.springframework.integration/spring-integration-jmx -->
    <!-- https://mvnrepository.com/artifact/org.springframework/spring-jmx -->

    <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
    <!-- No explicit version: the Spring Boot parent manages it. -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-security</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-aop</artifactId>
    </dependency>
</dependencies>
74
75 <build>
76 <plugins>
77 <plugin>
78 <groupId>org.springframework.boot</groupId>
79 <artifactId>spring-boot-maven-plugin</artifactId>
80 </plugin>
81 </plugins>
82 </build>
83
84</project>
85
86
87
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86 1package com.example.mqtt.mqtt;
2
3
4import lombok.extern.slf4j.Slf4j;
5import org.eclipse.paho.client.mqttv3.*;
6import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
7import org.springframework.beans.factory.annotation.Value;
8import org.springframework.stereotype.Component;
9import javax.annotation.PostConstruct;
10
11@Slf4j
12@Component
13public class ReportMqtt implements MqttCallback {
14 @Value("${mqtt.url}")
15 public String HOST;
16 @Value("${mqtt.consumer.defaultTopic}")
17 public String TOPIC;
18 @Value("${mqtt.username}")
19 private String name;
20 @Value("${mqtt.password}")
21 private String passWord;
22
23 private MqttClient client;
24 private MqttConnectOptions options;
25
26 //clientId不能重复所以这里我设置为系统时间
27 String clientid = String.valueOf(System.currentTimeMillis());
28
29 @PostConstruct
30 public void result() {
31 try {
32 // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
33 client = new MqttClient(HOST, clientid, new MemoryPersistence());
34 // MQTT的连接设置
35 options = new MqttConnectOptions();
36 // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
37 options.setCleanSession(true);
38 // 设置连接的用户名
39 options.setUserName(name);
40 // 设置连接的密码
41 options.setPassword(passWord.toCharArray());
42 // 设置超时时间 单位为秒
43 options.setConnectionTimeout(10);
44 // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
45 options.setKeepAliveInterval(3600);
46 // 设置回调
47 client.setCallback(this);
48 client.connect(options);
49 //订阅消息
50 int[] Qos = {1};
51 String[] topic1 = {TOPIC};
52 client.subscribe(topic1, Qos);
53
54 } catch (Exception e) {
55 log.info("ReportMqtt客户端连接异常,异常信息:" + e);
56 }
57
58 }
59
60 @Override
61 public void connectionLost(Throwable throwable) {
62 try {
63 log.info("程序出现异常,DReportMqtt断线!正在重新连接...:");
64 client.close();
65 this.result();
66 log.info("ReportMqtt重新连接成功");
67 } catch (MqttException e) {
68 log.info(e.getMessage());
69 }
70 }
71
72 @Override
73 public void messageArrived(String topic, MqttMessage message) {
74 log.info("接收消息主题:" + topic);
75 log.info("接收消息Qos:" + message.getQos());
76 log.info("接收消息内容 :" + new String(message.getPayload()));
77
78 }
79
80 @Override
81 public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
82 log.info("消息发送成功");
83 }
84}
85
86
配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25 1# src/main/resources/config/mqtt.properties
2##################
3# MQTT 配置
4##################
5# 用户名
6mqtt.username=admin
7# 密码
8mqtt.password=password
9# 推送信息的连接地址,如果有多个,用逗号隔开,如:tcp://127.0.0.1:61613,tcp://192.168.1.61:61613
10mqtt.url=tcp://127.0.0.1:61613
11##################
12# MQTT 生产者
13# 连接服务器默认客户端ID
14mqtt.producer.clientId=mqttProducer
15# 默认的推送主题,实际可在调用接口时指定
16mqtt.producer.defaultTopic=topic1
17##################
18# MQTT 消费者
19##################
20# 连接服务器默认客户端ID
21mqtt.consumer.clientId=mqttConsumer
22# 默认的接收主题,可以订阅多个Topic,逗号分隔
23mqtt.consumer.defaultTopic=topic1
24
25
运行截图
项目中遇到如下问题
加入spring权限jar包即可
mqtt后台地址
http://127.0.0.1:61680/console/index.html#
然后启动项目看下结果,是否有producer
访问项目url,地址如下:http://localhost:8080/mqtt?msg=来点鸡血
后续完善,许多漏洞。