物联网实际应用第七篇:利用MQTT模拟单向通信

文章目录

  • 概要
  • 发布方(模拟设备)
  • 引入pom
  • 回调MqttCallback
  • 发布的方法
  • 订阅方(模拟服务器)
  • 引入pom
  • 回调MqttCallback
  • 订阅的方法
  • 测试结果
  • 实际应用
  • 概要

    两个springboot项目,一个作为发布方,一个作为订阅方,模拟设备向服务器发送mqtt请求上报消息,示例使用的是emqx官网的免费mqtt服务器,实际开发中可以使用搭建的mqtt服务器

    发布方(模拟设备)

    引入pom

     <dependency>
        <groupId>org.eclipse.paho</groupId>
         <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
         <version>1.2.5</version>
     </dependency>
    

    回调MqttCallback

    import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.MqttCallback;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    
    public class OnMessageCallback implements MqttCallback {
        @Override
        public void connectionLost(Throwable cause) {
        }
        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
        }
        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
        }
    }
    

    发布的方法

    
    import com.sci.web.system.controller.dev.OnMessageCallback;
    import io.swagger.annotations.*;
    import org.eclipse.paho.client.mqttv3.*;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    @RestController
    @RequestMapping("/system/send")
    public class TGlzzMenuController {
    
    
        @ApiOperation("发送mqtt消息")
        @GetMapping("/sendMqtt")
        public void sendMqtt() {
            String pubTopic = "publish111/topic";
            String content = "Hello World";
            int qos = 2;
    //      emqx免费服务器
            String broker = "tcp://broker.emqx.io:1883";
            String clientId = "publishClient11";
            MemoryPersistence persistence = new MemoryPersistence();
    
            try {
                MqttClient client = new MqttClient(broker, clientId, persistence);
    
                // MQTT 连接选项
                MqttConnectOptions connOpts = new MqttConnectOptions();
    //            connOpts.setUserName("test111");
    //            connOpts.setPassword("test111".toCharArray());
                // 保留会话
    
                // 设置回调
                client.setCallback(new OnMessageCallback() {
                    @Override
                    public void connectionLost(Throwable cause) {
                    }
    
                    @Override
                    public void messageArrived(String topic, MqttMessage message) throws Exception {
                    }
    
                    @Override
                    public void deliveryComplete(IMqttDeliveryToken token) {
                        System.out.println("发送成功");
                    }
                });
    
                // 建立连接
                client.connect(connOpts);
                // 消息发布所需参数
                MqttMessage message = new MqttMessage(content.getBytes());
                message.setQos(qos);
                client.publish(pubTopic, message);
    
            } catch (MqttException me) {
    //          这里有失败原因
                me.printStackTrace();
            }
        }
    }
    
    

    订阅方(模拟服务器)

    引入pom

      <dependency>
                <groupId>org.eclipse.paho</groupId>
                <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
                <version>1.2.5</version>
            </dependency>
    

    回调MqttCallback

    
    import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.MqttCallback;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    
    public class OnMessageCallback implements MqttCallback {
        @Override
        public void connectionLost(Throwable cause) {
        }
    
        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
        }
    
        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
        }
    }
    

    订阅的方法

    
    import org.eclipse.paho.client.mqttv3.*;
    import org.springframework.stereotype.Component;
    import javax.annotation.PostConstruct;
    
    @Component
    public class MqttClientConfiguration{
        
        private MqttClient client;
    //  emqx免费服务器
        private String broker = "tcp://broker.emqx.io:1883";
        private String clientId = "receiveClient11";
        //  订阅主题
        private String subscribeTopic = "publish111/topic";
        private MqttConnectOptions connOpts;
    
        @PostConstruct
        public void connect() {
            try {
                client = new MqttClient(broker, clientId);
    
    //          MQTT 连接选项
                connOpts = new MqttConnectOptions();
    //          connOpts.setUserName("test111");
    //          connOpts.setPassword("test111".toCharArray());
    //          保留会话
                connOpts.setCleanSession(true);
    
    //          建立连接
                client.connect(connOpts);
    //          设置回调
                client.setCallback(new OnMessageCallback() {
                    @Override
                    public void connectionLost(Throwable cause) {
                        // 连接丢失后,一般在这里面进行重连
                        System.out.println("连接断开,可以做重连");
                    }
                    @Override
                    public void messageArrived(String topic, MqttMessage message) throws Exception {
                        // subscribe后得到的消息会执行到这里面
                        System.out.println("接收消息主题:" + topic);
                        System.out.println("接收消息内容:" + new String(message.getPayload()));
                    }
                    @Override
                    public void deliveryComplete(IMqttDeliveryToken token) {
                    }
                });
    //          订阅
                client.subscribe(subscribeTopic);
            } catch (MqttException me) {
    //          这里有失败原因
                me.printStackTrace();
            }
        }
    }
    

    测试结果

  • 发布方
  • 订阅方
  • 实际应用

    在订阅方的回调中把收到的数据解析存起来

    作者:uutale

    物联沃分享整理
    物联沃-IOTWORD物联网 » 物联网实际应用第七篇:利用MQTT模拟单向通信

    发表回复