Springboot整合Mqtt(物联网)

1、添加 MQTT 依赖

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>

2、创建 MQTT 客户端工厂

管理 MQTT 客户端的创建和销毁

public class MqttClientFactory {
    private static final String BROKER_URL = "tcp://127.0.0.1:1883";
    private static final String CLIENT_ID_PREFIX = "mqtt-client-";

    public static MqttClient createClient(String clientIdSuffix) throws MqttException {
        String clientId = CLIENT_ID_PREFIX + clientIdSuffix;
        MqttClient mqttClient = new MqttClient(BROKER_URL, clientId);
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setCleanSession(true);
        connOpts.setUserName("admin");
        connOpts.setPassword("123456".toCharArray());
        connOpts.setConnectionTimeout(3000);
        connOpts.setKeepAliveInterval(20);
        mqttClient.connect(connOpts);
        return mqttClient;
    }

    public static void destroyClient(MqttClient client) throws MqttException{
        if (client != null && client.isConnected()) {
            client.disconnect();
            client.close();
        }
    }
}

3、创建 MQTT 客户端服务

管理 MQTT 客户端的生命周期,包括连接、订阅、发布、断开连接以及重连逻辑

@Slf4j
@Service
public class MqttClientService {

    private final ConcurrentMap<String, MqttClient> clientMap = new ConcurrentHashMap<>();

    public void connectAndSub(String[] topic, String clientId) throws MqttException {
        MqttClient client = MqttClientFactory.createClient(clientId);
        clientMap.put(clientId, client);
        client.subscribe(topic);
        client.setCallback(new MqttCallback() {
            @Override
            public void connectionLost(Throwable throwable) {
                log.info("连接丢失后的重连");
                tryReconnect(topic, clientId);
            }

            @Override
            public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
                log.info("处理接收到的消息");
                log.info("topic:{},mqttMessage:{}", topic, new String(mqttMessage.getPayload()));
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
                // 发布完成回调
                log.info("主题发布成功");
            }
        });
    }

    public void publish(String clientId, String topic, String message) throws MqttException {
        MqttClient client = clientMap.get(clientId);
        if (client != null && client.isConnected()) {
            MqttMessage mqttMessage = new MqttMessage(message.getBytes());
            mqttMessage.setQos(2);
            client.publish(topic, mqttMessage);
        }
    }

    /**
     * 断开、销毁连接
     * @param clientId
     */
    public void disConnectAndDestroy(String clientId){
        MqttClient client = clientMap.remove(clientId);
        try {
            MqttClientFactory.destroyClient(client);
        } catch (MqttException e) {
            throw new RuntimeException(e);
        }
    }

    private void tryReconnect(String[] topic, String clientId){
        // 尝试重连逻辑,可以加入重试次数限制和延时
        try {
            connectAndSub(topic, clientId);
        } catch (MqttException e) {
            throw new RuntimeException(e);
        }
    }

}

4、在 Spring Boot 应用中使用 MQTT 客户端服务

    @Resource
    private MqttClientService mqttClientService;

作者:一只编程菜鸟

物联沃分享整理
物联沃-IOTWORD物联网 » Springboot整合Mqtt(物联网)

发表回复