Spring Boot 整合 MQTT(物联网)
1、添加 MQTT 依赖
<!-- Spring Integration's MQTT module. No <version> is declared here, so the version
     must come from dependency management elsewhere in the POM
     (presumably the Spring Boot parent/BOM; confirm). -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
2、创建 MQTT 客户端工厂
管理 MQTT 客户端的创建和销毁
/**
 * Static factory that creates connected Paho {@link MqttClient} instances and tears them down.
 *
 * <p>The broker URL, credentials and connection options are fixed by the constants below.
 */
public class MqttClientFactory {
    private static final String BROKER_URL = "tcp://127.0.0.1:1883";
    private static final String CLIENT_ID_PREFIX = "mqtt-client-";

    // NOTE(review): credentials are hard-coded in source; they should be moved to
    // external configuration before this is used outside a local demo.
    private static final String USER_NAME = "admin";
    private static final String PASSWORD = "123456";

    // Paho measures the connection timeout in SECONDS. The previous literal 3000 therefore
    // meant 50 minutes; 3 seconds is assumed to be what was intended (3000 ms).
    private static final int CONNECTION_TIMEOUT_SECONDS = 3;
    private static final int KEEP_ALIVE_SECONDS = 20;

    /**
     * Creates a client whose id is {@code "mqtt-client-" + clientIdSuffix} and connects it to
     * the broker with a clean session.
     *
     * @param clientIdSuffix suffix appended to the fixed client-id prefix
     * @return a client on which {@code connect} has completed successfully
     * @throws MqttException if the client cannot be created or the connection attempt fails;
     *     in the latter case the half-built client is closed before the exception propagates
     */
    public static MqttClient createClient(String clientIdSuffix) throws MqttException {
        String clientId = CLIENT_ID_PREFIX + clientIdSuffix;
        MqttClient mqttClient = new MqttClient(BROKER_URL, clientId);

        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setCleanSession(true);
        connOpts.setUserName(USER_NAME);
        connOpts.setPassword(PASSWORD.toCharArray());
        connOpts.setConnectionTimeout(CONNECTION_TIMEOUT_SECONDS);
        connOpts.setKeepAliveInterval(KEEP_ALIVE_SECONDS);

        try {
            mqttClient.connect(connOpts);
        } catch (MqttException e) {
            // Release the client's resources instead of leaking them on a failed connect.
            try {
                mqttClient.close();
            } catch (MqttException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        return mqttClient;
    }

    /**
     * Disconnects the client if it is still connected and then closes it.
     *
     * <p>{@code close()} is called even when the client is already disconnected (for example
     * after the connection was lost), so its resources are always released.
     *
     * @param client the client to destroy; {@code null} is ignored
     * @throws MqttException if disconnecting or closing fails
     */
    public static void destroyClient(MqttClient client) throws MqttException {
        if (client == null) {
            return;
        }
        if (client.isConnected()) {
            client.disconnect();
        }
        client.close();
    }
}
3、创建 MQTT 客户端服务
管理 MQTT 客户端的生命周期,包括连接、订阅、发布、断开连接以及重连逻辑
@Slf4j
@Service
public class MqttClientService {
private final ConcurrentMap<String, MqttClient> clientMap = new ConcurrentHashMap<>();
public void connectAndSub(String[] topic, String clientId) throws MqttException {
MqttClient client = MqttClientFactory.createClient(clientId);
clientMap.put(clientId, client);
client.subscribe(topic);
client.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable throwable) {
log.info("连接丢失后的重连");
tryReconnect(topic, clientId);
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
log.info("处理接收到的消息");
log.info("topic:{},mqttMessage:{}", topic, new String(mqttMessage.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// 发布完成回调
log.info("主题发布成功");
}
});
}
public void publish(String clientId, String topic, String message) throws MqttException {
MqttClient client = clientMap.get(clientId);
if (client != null && client.isConnected()) {
MqttMessage mqttMessage = new MqttMessage(message.getBytes());
mqttMessage.setQos(2);
client.publish(topic, mqttMessage);
}
}
/**
* 断开、销毁连接
* @param clientId
*/
public void disConnectAndDestroy(String clientId){
MqttClient client = clientMap.remove(clientId);
try {
MqttClientFactory.destroyClient(client);
} catch (MqttException e) {
throw new RuntimeException(e);
}
}
private void tryReconnect(String[] topic, String clientId){
// 尝试重连逻辑,可以加入重试次数限制和延时
try {
connectAndSub(topic, clientId);
} catch (MqttException e) {
throw new RuntimeException(e);
}
}
}
4、在 Spring Boot 应用中使用 MQTT 客户端服务
// Injected MqttClientService; use it to call connectAndSub, publish and disConnectAndDestroy.
@Resource
private MqttClientService mqttClientService;
作者:一只编程菜鸟