物联网智能监测-IOT产品设备管理
阿里云IOT平台
在开发智能监测之前,我们必须要先熟悉和掌握阿里云IOT平台的使用及对接
什么是物联网
把所有物品通过信息传感设备与互联网连接起来,进行信息交换,即物物相息,以实现智能化识别和管理。
物联网(英文:Internet of Things,缩写:IoT)起源于传媒领域,是信息科技产业的第三次革命。物联网是指通过信息传感设备,按约定的协议,将任何物体与网络相连接,物体通过信息传播媒介进行信息交换和通信,以实现智能化识别、定位、跟踪、监管等功能。
常见的应用场景:
共享充电宝
充电宝设备接入物联网平台后,可上报充电宝电量和借用状态等信息到物联网平台云端。充电宝用户扫码后,云端低延时向充电宝下发指令,使其弹出。同时,企业运营者能够实时获知充电宝的运行状况。
智能音箱
播报音箱接入物联网平台后,用户扫码完成支付后,将支付金额实时通过音箱,向用户和商家进行语音播报。
智能家居
智能家居技术已经成为当今家庭装潢的一大特色。比如,通过智能灯泡,可以实现远程控制灯光和电视等设备,调节温度和湿度,实现智能化生活。
智能农耕
智能农耕可以通过物联网技术来监测、传输、分析、管理农业生产过程中的信息。比如作物的生长情况、土壤的状况等,以提高农业生产的效率,改善利润率,减少污染,节约农业资源。
智能医疗
在智慧医疗中,可以捕捉人的生理状态信息,例如心跳频率、体力消耗、血压高低等。然后对采集数据进行备份、加工和分析,以便个人或医生快速查询。在物联网平台接入传感器设备,采集人体及周边环境参数的信息,通过数据服务处理数据后,反馈给用户。
IOT简介
产品文档:产品概述_物联网平台(IoT)-阿里云帮助中心
阿里云物联网平台是一个集成了设备管理、数据安全通信、消息订阅和数据服务等能力的一体化平台。向下支持连接海量设备,采集设备数据上云;向上提供云端API,服务端可通过调用云端API将指令下发至设备端,实现远程控制。
我们作为一个开发者,基本的设备与后台调度思路,如下:
开通物联网平台
开通阿里云账号
前往阿里云官网注册账号。如果已有注册账号,请跳过此步骤。
进入阿里云首页后,如果没有阿里云的账户需要先进行注册,才可以进行登录。由于注册较为简单,课程和讲义不在进行体现(注册可以使用多种方式,如淘宝账号、支付宝账号等…)。
需要实名认证和活体认证。
开通物理网平台
登录账号以后,我们可以在产品中搜索物联网平台
打开之后,点击管理控制台
申请公共实例
在IOT中分为了两种实例,一个是公共实例,另外一个是企业实例,不同的实例收费标准和功能是不一样的
公共实例,免费,使用地域为上海,支持同时在线设备数为50个,最多可创建500个设备,消息通信TPS为5条/秒
企业实例,如果公共实例超出了业务需求资源,可以使用企业实例,企业实例可以按照包年包月方式计算
在我们教学阶段,可以申请使用公共实例使用,如下图
注意:地域必须选择上海才能申请公共实例
产品
一旦拥有了公共实例,我们就可以使用临时实例来进行开发,我们先来介绍产品和设备
创建产品
如何在物联网平台创建产品_物联网平台(IoT)-阿里云帮助中心
产品:设备的集合,通常指一组具有相同功能的设备。物联网平台为每个产品颁发全局唯一的ProductKey。
简单说就是某一类产品,比如,手表、大门通道门禁、紧急呼叫报警器、滞留报警器、跌倒报警器
现在我们可以创建产品,找到产品–>创建产品
如下图,输入产品名称,然后选择平台提供好的分类,其他选择默认即可,然后确认创建
在产品列表中也可以查看,刚刚创建的产品
物模型
定义物模型属性、事件和服务_物联网平台(IoT)-阿里云帮助中心
产品创建好之后,可以给产品添加物模型,也就是给产品定义功能。
比如我们刚才创建的手表产品,可以定义功能,功能也可以分为两类,一个是监测手表本身,一个是因为指标数据
手表本身:耗电量,使用时间
指标数据:身体血压、血氧、体温数据
像这些耗电量、血压、血氧数据都属于产品的功能,也叫做物模型
在IOT平台的物模型中,分为了三类:
设备
物联网平台中注册单个设备_物联网平台(IoT)-阿里云帮助中心
产品是设备的集合,通常指一组具有相同功能的设备。创建产品完成后,需在产品下添加设备,获取设备证书。您可在物联网平台上,同时创建一个或多个设备
前提条件:设备是绑定在产品上的,所以必须先创建产品才行
操作步骤:
-
在左侧导航栏,选择设备管理> 设备。
-
在设备页面,单击添加设备。
-
在添加设备对话框中,输入设备信息,单击确认。
在添加设备的时候有三个参数,解释如下:
创建设备成功后,会自动弹出添加完成对话框。您可以查看、复制设备证书信息。设备证书由设备的ProductKey、DeviceName和DeviceSecret组成,是设备与物联网平台进行通信的重要身份认证。
设备数据上报
设备获取设备证书_物联网平台(IoT)-阿里云帮助中心
物理设备可通过两种方式获取物联网平台颁发的设备证书(ProductKey、DeviceName和DeviceSecret):设备厂商在产线上将证书烧录到设备上和设备上电联网后从厂商云服务中获取证书。
我们在开发阶段可以使用联网的电脑,来模拟设备的数据上报,比较简答的方式可以使用node来进行链接上报数据,参考代码如下:
const mqtt = require('aliyun-iot-mqtt');
// 1. 设备身份信息
var options = {
productKey: "j0rk1AN61hM",
deviceName: "watch001",
deviceSecret: "ea94110e5495bb04b0a7b35b9535a50c",
host: "iot-06z00frq8umvkx2.mqtt.iothub.aliyuncs.com"
};// 2. 建立MQTT连接
const client = mqtt.getAliyunIotMqttClient(options);
//订阅云端指令Topic
client.subscribe(`/${options.productKey}/${options.deviceName}/user/get`)
client.subscribe(`/sys/${options.productKey}/${options.deviceName}/thing/event/property/post_reply`)
client.on('message', function (topic, message) {
console.log("topic " + topic)
console.log("message " + message)
})setInterval(function () {
// 3.定时上报温湿度数据
client.publish(`/sys/${options.productKey}/${options.deviceName}/thing/event/property/post`, getPostData(), { qos: 0 });
}, 5 * 1000);var power = 1000;
function getPostData () {
const payloadJson = {
id: Date.now(),
version: "1.0",
params: {
PowerConsumption: power–
},
method: "thing.event.property.post"}
console.log("payloadJson " + JSON.stringify(payloadJson))
return JSON.stringify(payloadJson);
}
把上述代码保存到一个文件夹下,以js为后缀名,如:iot_device_01.js
然后在js所在的文件夹下,打开cmd窗口,执行
node iot_device_01.js
设备启动后,可以在物联网平台查看刚才创建的设备,现在已在线
找到物模型数据,可以看到上报之后的数据
异步处理基础概念
同步和异步
同步(Background Synchronous)是指任务在后台进行处理,但需要等待任务完成后才能继续执行其他操作
异步(Asynchronous)是指任务的提交和执行是相互独立的,任务的执行不会阻塞程序的继续执行
如下图:
同步模式下,任务1完成后才能执行任务2,任务2需要等待任务1的完成。这种顺序执行的方式称为同步。
异步模式下,任务1和任务2可以并行执行,彼此之间相互独立,不需要等待对方的完成。这种并行执行的方式称为异步。
好处:
提高系统的并发性
改善系统的响应性
缺点:
复杂性增加
资源消耗增加
2.2 消息队列的基础概念
生产者:负责将消息发送到消息队列中
消费者:负责从消息队列中获取消息并进行处理
队列:存储消息
broker:负责接收、存储和分发消息的中间件组件,实现了发送者和接收者之间的解耦和异步通信
topic:消息的分类
在IOT中数据流转是这样的,如下图
生产者:设备负责将消息发送到IOT中(队列)
每个产品可以绑定不同的topic来进行消息分类,比如有电视topic、手表topic
IOT本身相当于是一个队列
消费者可以从指定的topic中获取数据
如果有多个消费者都要接收同一类消息,可以设置多个消费者,称为消费者组
什么是AMQP
AMQP全称Advanced Message Queuing Protocol,是一种网络协议,用于在应用程序之间传递消息。它是一种开放标准的消息传递协议,可以在不同的系统之间实现可靠、安全、高效的消息传递。
AMQP协议的实现包括多种消息队列软件,例如RabbitMQ、Apache ActiveMQ、Apache Qpid等。这些软件提供了可靠、高效的消息传递服务,广泛应用于分布式系统、云计算、物联网等领域。
快速使用Apache Qpid软件来接收IOT中的数据,如下图
设备数据消费
在IOT官方文档中,已经提供了对应的接收数据的解决方案,如下链接:
如何将AMQP JMS客户端接入物联网平台接收消息_物联网平台(IoT)-阿里云帮助中心
官网Java SDK接入
导入pom依赖
<!– amqp 1.0 qpid client –>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-jms-client</artifactId>
<version>0.57.0</version>
</dependency>
<!– util for base64–>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.10</version>
</dependency>
下载Demo代码包
下载地址:https://linkkit-export.oss-cn-shanghai.aliyuncs.com/amqp/amqp-demo.zip
接收数据
我们可以修改里面的参数,包含以下几个重要参数:
accessKey 秘钥key
accessSecret 秘钥
consumerGroupId 消费者组
iotInstanceId 公共实例ID
clientId:InetAddress.getLocalHost().getHostAddress(); 获取本机ip作为clientId
package com.aliyun.iotx.demo;
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;import org.apache.commons.codec.binary.Base64;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionListener;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class AmqpClient {
private final static Logger logger = LoggerFactory.getLogger(AmqpClient.class);
/**
* 工程代码泄露可能会导致 AccessKey 泄露,并威胁账号下所有资源的安全性。以下代码示例使用环境变量获取 AccessKey 的方式进行调用,仅供参考
*/
private static String accessKey = "LTAI5tDQKg9F61aJhbmhqVRK";
private static String accessSecret = "LYUKZH7HQGBoD025pmSq0fQsREaOYD";;
private static String consumerGroupId = "eraicKJm98cQR0hHgsxb000100";//iotInstanceId:实例ID。若是2021年07月30日之前(不含当日)开通的公共实例,请填空字符串。
private static String iotInstanceId = "iot-06z00frq8umvkx2";//控制台服务端订阅中消费组状态页客户端ID一栏将显示clientId参数。
//建议使用机器UUID、MAC地址、IP等唯一标识等作为clientId。便于您区分识别不同的客户端。
private static String clientId;static {
try {
clientId = InetAddress.getLocalHost().getHostAddress();
} catch (UnknownHostException e) {
e.printStackTrace();
}
}//${YourHost}为接入域名,请参见AMQP客户端接入说明文档。
private static String host = "iot-06z00frq8umvkx2.amqp.iothub.aliyuncs.com";// 指定单个进程启动的连接数
// 单个连接消费速率有限,请参考使用限制,最大64个连接
// 连接数和消费速率及rebalance相关,建议每500QPS增加一个连接
private static int connectionCount = 4;//业务处理异步线程池,线程池参数可以根据您的业务特点调整,或者您也可以用其他异步方式处理接收到的消息。
private final static ExecutorService executorService = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue(50000));public static void main(String[] args) throws Exception {
List<Connection> connections = new ArrayList<>();//参数说明,请参见AMQP客户端接入说明文档。
for (int i = 0; i < connectionCount; i++) {
long timeStamp = System.currentTimeMillis();
//签名方法:支持hmacmd5、hmacsha1和hmacsha256。
String signMethod = "hmacsha1";//userName组装方法,请参见AMQP客户端接入说明文档。
String userName = clientId + "-" + i + "|authMode=aksign"
+ ",signMethod=" + signMethod
+ ",timestamp=" + timeStamp
+ ",authId=" + accessKey
+ ",iotInstanceId=" + iotInstanceId
+ ",consumerGroupId=" + consumerGroupId
+ "|";
//计算签名,password组装方法,请参见AMQP客户端接入说明文档。
String signContent = "authId=" + accessKey + "×tamp=" + timeStamp;
String password = doSign(signContent, accessSecret, signMethod);
String connectionUrl = "failover:(amqps://" + host + ":5671?amqp.idleTimeout=80000)"
+ "?failover.reconnectDelay=30";Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF", connectionUrl);
hashtable.put("queue.QUEUE", "default");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);
ConnectionFactory cf = (ConnectionFactory)context.lookup("SBCF");
Destination queue = (Destination)context.lookup("QUEUE");
// 创建连接。
Connection connection = cf.createConnection(userName, password);
connections.add(connection);((JmsConnection)connection).addConnectionListener(myJmsConnectionListener);
// 创建会话。
// Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()。
// Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐)。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);connection.start();
// 创建Receiver连接。
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(messageListener);
}logger.info("amqp demo is started successfully, and will exit after 60s ");
// 结束程序运行
Thread.sleep(6000 * 1000);
logger.info("run shutdown");connections.forEach(c-> {
try {
c.close();
} catch (JMSException e) {
logger.error("failed to close connection", e);
}
});executorService.shutdown();
if (executorService.awaitTermination(10, TimeUnit.SECONDS)) {
logger.info("shutdown success");
} else {
logger.info("failed to handle messages");
}
}private static MessageListener messageListener = new MessageListener() {
@Override
public void onMessage(final Message message) {
try {
//1.收到消息之后一定要ACK。
// 推荐做法:创建Session选择Session.AUTO_ACKNOWLEDGE,这里会自动ACK。
// 其他做法:创建Session选择Session.CLIENT_ACKNOWLEDGE,这里一定要调message.acknowledge()来ACK。
// message.acknowledge();
//2.建议异步处理收到的消息,确保onMessage函数里没有耗时逻辑。
// 如果业务处理耗时过程过长阻塞住线程,可能会影响SDK收到消息后的正常回调。
executorService.submit(new Runnable() {
@Override
public void run() {
processMessage(message);
}
});
} catch (Exception e) {
logger.error("submit task occurs exception ", e);
}
}
};/**
* 在这里处理您收到消息后的具体业务逻辑。
*/
private static void processMessage(Message message) {
try {
byte[] body = message.getBody(byte[].class);
String content = new String(body);
String topic = message.getStringProperty("topic");
String messageId = message.getStringProperty("messageId");
logger.info("receive message"
+ ",\n topic = " + topic
+ ",\n messageId = " + messageId
+ ",\n content = " + content);
} catch (Exception e) {
logger.error("processMessage occurs error ", e);
}
}private static JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() {
/**
* 连接成功建立。
*/
@Override
public void onConnectionEstablished(URI remoteURI) {
logger.info("onConnectionEstablished, remoteUri:{}", remoteURI);
}/**
* 尝试过最大重试次数之后,最终连接失败。
*/
@Override
public void onConnectionFailure(Throwable error) {
logger.error("onConnectionFailure, {}", error.getMessage());
}/**
* 连接中断。
*/
@Override
public void onConnectionInterrupted(URI remoteURI) {
logger.info("onConnectionInterrupted, remoteUri:{}", remoteURI);
}/**
* 连接中断后又自动重连上。
*/
@Override
public void onConnectionRestored(URI remoteURI) {
logger.info("onConnectionRestored, remoteUri:{}", remoteURI);
}@Override
public void onInboundMessage(JmsInboundMessageDispatch envelope) {}@Override
public void onSessionClosed(Session session, Throwable cause) {}@Override
public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {}@Override
public void onProducerClosed(MessageProducer producer, Throwable cause) {}
};/**
* 计算签名,password组装方法,请参见AMQP客户端接入说明文档。
*/
private static String doSign(String toSignString, String secret, String signMethod) throws Exception {
SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);
Mac mac = Mac.getInstance(signMethod);
mac.init(signingKey);
byte[] rawHmac = mac.doFinal(toSignString.getBytes());
return Base64.encodeBase64String(rawHmac);
}
}
以上代码启动之后,并不能接收到数据,因为设备并没有绑定topic,所以需要在物联网IOT平台设置topic,也就是消费者的消费者组
第一:找到 消息转发->服务端订阅->消费者组列表
创建一个自己的消费者组
创建好之后可以查看到已经创建好的消费者组,并且自动生成了消费者组id
进入刚刚创建的消费者组,然后点击订阅产品,然后创建订阅
需要选择消费者组与推送消息类型(设备上报数据),如下图
修改demo代码中的消费者组,改为自己创建的消费者组ID
private static String consumerGroupId = "eraicKJm98cQR0hHgsxb000100";
测试
找一个设备进行数据上报
demo代码中绑定对应的消费者组
启动后台代码,可以在日志中查看消费者到的数据
SDK改造
SDK提供好的这个工具类,我们需要改造这个类,改造内容如下:
让spring进行管理和监听,一旦有数据变化之后,就可以马上消费,可以让这个类实现ApplicationRunner接口,重新run方法
可以在项目中自己配置线程池的使用
所有的可变参数,如实例id、accessKey、accessSecret、consumerGroupId这些统一在配置文件中维护
application.yml文件中添加IOT配置
Iot:
aliyun:
accessKeyId: LTAI5tDQKg9F61aJhbmhqVRK
accessKeySecret: LYUKZH7HQGBoD025pmSq0fQsREaOYD
consumerGroupId: DEFAULT_GROUP
regionId: cn-shanghai
iotInstanceId: iot-06z00frq8umvkx2
host: iot-06z00frq8umvkx2.amqp.iothub.aliyuncs.com
添加读取文件配置类
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;@Setter
@Getter
@NoArgsConstructor
@ToString
@Configuration
@ConfigurationProperties(prefix = "zzyl.aliyun")
public class AliIoTConfigProperties {/**
* 访问Key
*/
private String accessKeyId;
/**
* 访问秘钥
*/
private String accessKeySecret;
/**
* 区域id
*/
private String regionId;
/**
* 实例id
*/
private String iotInstanceId;
/**
* 域名
*/
private String host;/**
* 消费组
*/
private String consumerGroupId;}
常见线程池配置类
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;@Configuration
public class ThreadPoolConfig {/**
* 核心线程池大小
*/
private static final int CORE_POOL_SIZE = Runtime.getRuntime().availableProcessors();/**
* 最大可创建的线程数
*/
private static final int MAX_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2;/**
* 队列最大长度
*/
private static final int QUEUE_CAPACITY = 50000;/**
* 线程池维护线程所允许的空闲时间
*/
private static final int KEEP_ALIVE_SECONDS = 60;@Bean
public ExecutorService executorService(){
AtomicInteger c = new AtomicInteger(1);
LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(QUEUE_CAPACITY);
return new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAX_POOL_SIZE,
KEEP_ALIVE_SECONDS,
TimeUnit.MILLISECONDS,
queue,
r -> new Thread(r, "zzyl-pool-" + c.getAndIncrement()),
new ThreadPoolExecutor.DiscardPolicy()
);
}
}
改造之后的AmqpClient
import com.zzyl.properties.AliIoTConfigProperties;
import org.apache.commons.codec.binary.Base64;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionListener;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;
import java.util.concurrent.ExecutorService;@Component
public class AmqpClient implements ApplicationRunner {
private final static Logger logger = LoggerFactory.getLogger(AmqpClient.class);@Autowired
private AliIoTConfigProperties aliIoTConfigProperties;//控制台服务端订阅中消费组状态页客户端ID一栏将显示clientId参数。
//建议使用机器UUID、MAC地址、IP等唯一标识等作为clientId。便于您区分识别不同的客户端。
private static String clientId;static {
try {
clientId = InetAddress.getLocalHost().getHostAddress();
} catch (UnknownHostException e) {
e.printStackTrace();
}
}// 指定单个进程启动的连接数
// 单个连接消费速率有限,请参考使用限制,最大64个连接
// 连接数和消费速率及rebalance相关,建议每500QPS增加一个连接
private static int connectionCount = 64;//业务处理异步线程池,线程池参数可以根据您的业务特点调整,或者您也可以用其他异步方式处理接收到的消息。
@Autowired
private ExecutorService executorService;public void start() throws Exception {
List<Connection> connections = new ArrayList<>();//参数说明,请参见AMQP客户端接入说明文档。
for (int i = 0; i < connectionCount; i++) {
long timeStamp = System.currentTimeMillis();
//签名方法:支持hmacmd5、hmacsha1和hmacsha256。
String signMethod = "hmacsha1";//userName组装方法,请参见AMQP客户端接入说明文档。
String userName = clientId + "-" + i + "|authMode=aksign"
+ ",signMethod=" + signMethod
+ ",timestamp=" + timeStamp
+ ",authId=" + aliIoTConfigProperties.getAccessKeyId()
+ ",iotInstanceId=" + aliIoTConfigProperties.getIotInstanceId()
+ ",consumerGroupId=" + aliIoTConfigProperties.getConsumerGroupId()
+ "|";
//计算签名,password组装方法,请参见AMQP客户端接入说明文档。
String signContent = "authId=" + aliIoTConfigProperties.getAccessKeyId() + "×tamp=" + timeStamp;
String password = doSign(signContent, aliIoTConfigProperties.getAccessKeySecret(), signMethod);
String connectionUrl = "failover:(amqps://" + aliIoTConfigProperties.getHost() + ":5671?amqp.idleTimeout=80000)"
+ "?failover.reconnectDelay=30";Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF", connectionUrl);
hashtable.put("queue.QUEUE", "default");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);
ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
Destination queue = (Destination) context.lookup("QUEUE");
// 创建连接。
Connection connection = cf.createConnection(userName, password);
connections.add(connection);((JmsConnection) connection).addConnectionListener(myJmsConnectionListener);
// 创建会话。
// Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()。
// Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐)。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);connection.start();
// 创建Receiver连接。
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(messageListener);
}logger.info("amqp is started successfully, and will exit after server shutdown ");
}private MessageListener messageListener = message -> {
try {
//异步处理收到的消息,确保onMessage函数里没有耗时逻辑
executorService.submit(() -> processMessage(message));
} catch (Exception e) {
logger.error("submit task occurs exception ", e);
}
};/**
* 在这里处理您收到消息后的具体业务逻辑。
*/
private void processMessage(Message message) {
try {
byte[] body = message.getBody(byte[].class);
String contentStr = new String(body);
String topic = message.getStringProperty("topic");
String messageId = message.getStringProperty("messageId");
logger.info("receive message"
+ ",\n topic = " + topic
+ ",\n messageId = " + messageId
+ ",\n content = " + contentStr);} catch (Exception e) {
logger.error("processMessage occurs error ", e);
}
}private JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() {
/**
* 连接成功建立。
*/
@Override
public void onConnectionEstablished(URI remoteURI) {
logger.info("onConnectionEstablished, remoteUri:{}", remoteURI);
}/**
* 尝试过最大重试次数之后,最终连接失败。
*/
@Override
public void onConnectionFailure(Throwable error) {
logger.error("onConnectionFailure, {}", error.getMessage());
}/**
* 连接中断。
*/
@Override
public void onConnectionInterrupted(URI remoteURI) {
logger.info("onConnectionInterrupted, remoteUri:{}", remoteURI);
}/**
* 连接中断后又自动重连上。
*/
@Override
public void onConnectionRestored(URI remoteURI) {
logger.info("onConnectionRestored, remoteUri:{}", remoteURI);
}@Override
public void onInboundMessage(JmsInboundMessageDispatch envelope) {
}@Override
public void onSessionClosed(Session session, Throwable cause) {
}@Override
public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {
}@Override
public void onProducerClosed(MessageProducer producer, Throwable cause) {
}
};/**
* 计算签名,password组装方法,请参见AMQP客户端接入说明文档。
*/
private static String doSign(String toSignString, String secret, String signMethod) throws Exception {
SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);
Mac mac = Mac.getInstance(signMethod);
mac.init(signingKey);
byte[] rawHmac = mac.doFinal(toSignString.getBytes());
return Base64.encodeBase64String(rawHmac);
}@Override
public void run(ApplicationArguments args) throws Exception {
start();
}
}
设备消息订阅
在接收消息之前,我们需要让设备绑定消费组列表,这样才能通过消费组去接收消息
第一:找到 消息转发->服务端订阅->消费者组列表
目前有一个默认的消费组
第二:创建订阅,让产品与消费组进行关联
在服务端订阅页面的订阅列表页签下,单击创建订阅。
在创建订阅对话框,设置参数后单击确认。
作者:没带耳机