《从环境搭建到服务集成:深入浅出Springboot整合MQTT》
介绍
Eclipse Mosquitto是一个开源消息代理,实现了MQTT协议版本3.1和3.1.1。Mosquitto轻量,适用于低功耗单板计算机到完整服务器的所有设备。Mosquitto项目还提供了用于实现MQTT客户端的C库以及非常受欢迎的mosquitto_pub和mosquitto_sub命令行MQTT客户端。
Ubuntu安装mosquitto
1.安装mosquitto
sudo apt-get install mosquitto
2.查看mosquitto服务状态
sudo service mosquitto status
3.开启/停止 mosquitto服务
sudo service mosquitto start
sudo service mosquitto stop
4.配置mosquitto服务器用户验证
①进入mosquitto目录
cd /etc/mosquitto/
②修改mosquitto.conf
sudo vim mosquitto.conf
③mosquitto配置用户验证信息
#不允许匿名
allow_anonymous false
#配置用户密码文件
password_file /etc/mosquitto/pwfile
#开放端口,允许所有IP访问
listener 1883 0.0.0.0
④配置用户登录名及密码
sudo mosquitto_passwd -c /etc/mosquitto/pwfile 用户名
用户名填写自己想要的名称,然后输入两遍密码,用户名和密码就添加成功了
Springboot 集成 MQTT服务
1.在Pom.xml里导入mqtt的jar包
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
2.在application.yml配置文件里配置MQTT服务器的连接信息
mqtt:
userName: admin
passWord: 123456
host: tcp://192.168.145.128:1883
topics: /txt/a,/txt/b,/txt/c
3.新建一个初始化MQTT连接的class
①通过Value注解从配置文件获取MQTT配置信息
//MQTT服务器用户名
@Value("${mqtt.userName}")
private String userName;
//MQTT服务器密码
@Value("${mqtt.passWord}")
private String passWord;
//MQTT服务器的IP地址和端口
@Value("${mqtt.host}")
private String host;
//所要订阅的Topic列表,topic列表通过逗号进行分割
@Value("${mqtt.topics}")
private List<String> sub_Topic;
②配置MQTT连接信息
/**
* 配置MQTT连接信息
* @return
*/
public MqttConnectOptions getOptions(){
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
//是否清除Session
mqttConnectOptions.setCleanSession(false);
//MQTT服务器用户名
mqttConnectOptions.setUserName(userName);
//MQTT服务器密码
mqttConnectOptions.setPassword(passWord.toCharArray());
//MQTT连接超时时间,10s
mqttConnectOptions.setConnectionTimeout(10);
//设置MQTT心跳时间,20s
mqttConnectOptions.setKeepAliveInterval(20);
return mqttConnectOptions;
}
③连接MQTT服务端
/**
* 连接MQTT服务端
*/
public void connect(){
//防止重复创建MQTTClient实例
if (mqttClient == null){
try {
mqttClient = new MqttClient(host,clientId,new MemoryPersistence());
//MqttDataService是接收订阅topic数据推送的回调
mqttClient.setCallback(new MqttDataService());
} catch (MqttException e) {
e.printStackTrace();
}
}
MqttConnectOptions options = getOptions();
//判断拦截状态
if (!mqttClient.isConnected()){
try {
mqttClient.connect(options);
log.info("[MQTT] 连接成功!");
} catch (MqttException e) {
e.printStackTrace();
}
}else {
try {
mqttClient.disconnect();
mqttClient.connect(options);
log.info("[MQTT] 连接成功!");
} catch (MqttException e) {
e.printStackTrace();
}
}
}
④发送消息
/**
* 发送数据
*/
public void publish(String topic, String content) throws MqttException {
MqttMessage message = new MqttMessage(content.getBytes());
try{
mqttClient.publish(topic, message);
}catch (Exception e){
e.printStackTrace();
}
}
⑤断线重连
/**
*断线重连
*/
public void reConnect() throws MqttException {
if (null != mqttClient){
MqttConnectOptions options = getOptions();
mqttClient.connect(options);
log.info("[MQTT] 重连成功!");
}
}
⑥启动程序并订阅Topic
/**
* 启动程序并订阅Topic
* @PostConstruct 注解的作用是服务初始化启动完成后会主动执行有该注解的方法
*/
@PostConstruct
public void start(){
log.info("MQTT连接数据初始化成功,开始连接....");
try {
connect();
for (int i = 0;i<sub_Topic.size();i++){
mqttClient.subscribe(sub_Topic.get(i),1);
}
} catch (MqttException e) {
e.printStackTrace();
}
}
⑦MqttConfigService.class
package com.example.mqtttest.service;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.List;
import java.util.UUID;
@Service
@Slf4j
public class MqttConfigService {
//mqtt连接
private MqttClient mqttClient;
//MQTT服务器用户名
@Value("${mqtt.userName}")
private String userName;
//MQTT服务器密码
@Value("${mqtt.passWord}")
private String passWord;
//MQTT服务器的IP地址和端口
@Value("${mqtt.host}")
private String host;
//所要订阅的Topic列表,topic列表通过逗号进行分割
@Value("${mqtt.topics}")
private List<String> sub_Topic;
private String clientId = createUUID();
/**
* 配置MQTT连接信息
* @return
*/
public MqttConnectOptions getOptions(){
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
//是否清除Session
mqttConnectOptions.setCleanSession(false);
//MQTT服务器用户名
mqttConnectOptions.setUserName(userName);
//MQTT服务器密码
mqttConnectOptions.setPassword(passWord.toCharArray());
//MQTT连接超时时间,10s
mqttConnectOptions.setConnectionTimeout(10);
//设置MQTT心跳时间,20s
mqttConnectOptions.setKeepAliveInterval(20);
return mqttConnectOptions;
}
/**
* 连接MQTT服务端
*/
public void connect(){
//防止重复创建MQTTClient实例
if (mqttClient == null){
try {
mqttClient = new MqttClient(host,clientId,new MemoryPersistence());
//MqttDataService是接收订阅topic数据推送的回调
mqttClient.setCallback(new MqttDataService());
} catch (MqttException e) {
e.printStackTrace();
}
}
MqttConnectOptions options = getOptions();
//判断拦截状态
if (!mqttClient.isConnected()){
try {
mqttClient.connect(options);
log.info("[MQTT] 连接成功!");
} catch (MqttException e) {
e.printStackTrace();
}
}else {
try {
mqttClient.disconnect();
mqttClient.connect(options);
log.info("[MQTT] 连接成功!");
} catch (MqttException e) {
e.printStackTrace();
}
}
}
/**
* 启动程序并订阅Topic
* @PostConstruct 注解的作用是服务初始化启动完成后会主动执行有该注解的方法
*/
@PostConstruct
public void start(){
log.info("MQTT连接数据初始化成功,开始连接....");
try {
connect();
for (int i = 0;i<sub_Topic.size();i++){
mqttClient.subscribe(sub_Topic.get(i),1);
}
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 发送数据
*/
public void publish(String topic, String content) throws MqttException {
MqttMessage message = new MqttMessage(content.getBytes());
try{
mqttClient.publish(topic, message);
}catch (Exception e){
e.printStackTrace();
}
}
/**
*断线重连
*/
public void reConnect() throws MqttException {
if (null != mqttClient){
MqttConnectOptions options = getOptions();
mqttClient.connect(options);
log.info("[MQTT] 重连成功!");
}
}
public static String createUUID(){
UUID uuid = UUID.randomUUID();
String uuidStr = String.valueOf(uuid);
String uuidStr2 = uuidStr.replaceAll("-","");
return uuidStr2;
}
}
4.实现MqttCallback回调类
package com.example.mqtttest.service;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
/**
* 实现MqttCallback回调类
*/
@Slf4j
public class MqttDataService implements MqttCallback {
private MqttConfigService mqttConfigService = ActivitiConfig.getBean(MqttConfigService.class);
/**
* 与MQTT服务器连接断开后的回调
* @param throwable
*/
@Override
public void connectionLost(Throwable throwable) {
// 连接丢失后,一般在这里面进行重连
log.info("[MQTT] 连接断开,10S之后尝试重连...");
while (true){
try {
Thread.sleep(10000);
mqttConfigService.reConnect();
break;
}catch (Exception e){
e.printStackTrace();
continue;
}
}
}
/**
* 接收所订阅Topic推送的消息
* @param topic
* @param mqttMessage
* @throws Exception
*/
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
String result = mqttMessage.toString();
log.info("#####接收到的数据:"+result);
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
}
5.所订阅Topic推送的数据
#####接收到的数据:
{
"msg": "hello"
}
6.使用publish方法发送数据到指定的Topic
①编写测试方法
@Test
public void test(){
mqttConfigService.publish("/test/d","数据发送成功");
}
②使用MQTT客户端工具订阅该Topic:/test/d,客户端工具接收到我们发送的数据
7.MqttDataService的接收消息方法如果接收的数据处理抛出异常,会导致MQTT连接异常断开
①处理数据的方法抛出异常
②MQTT连接异常退出
③解决方法:采用异步的方法,将数据交由其他线程处理
该例子仅作测试,正常应采用线程池方式去做异步处理。
8.不通过@Autowired或@Resource注解获取JavaBean的方式
package com.example.mqtttest.service;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ActivitiConfig implements ApplicationContextAware {
static ApplicationContext applicationContext;
public static <T> T getBean(Class<T> clazz) {
return applicationContext.getBean(clazz);
}
@Override
public void setApplicationContext(ApplicationContext arg0) throws BeansException {
if (applicationContext == null){
applicationContext = arg0;
}
}
}