云原生环境下Kubernetes集成物联网实时消息引擎工具的实践与探索
一、EMQX 简介
EMQX 是一款开源的高性能 分布式 MQTT 消息服务器,专为物联网(IoT)场景设计,支持海量设备连接与消息路由。其核心功能包括:
-
协议支持:
- MQTT v3.1/v3.1.1/v5.0:标准物联网通信协议。
- 其他协议:CoAP、LwM2M、WebSocket、STOMP 等。
- 多语言 SDK:支持 Java、Python、Go 等客户端开发。
-
核心特性:
- 高并发:单节点支持百万级 MQTT 连接。
- 低延迟:微秒级消息路由(基于 Erlang/OTP 的高效调度)。
- 集群扩展:水平扩展节点,自动负载均衡。
- 规则引擎:通过 SQL 实时处理消息(如转发到 Kafka、数据库)。
- 企业级安全:TLS 加密、ACL 权限控制、LDAP/MySQL 认证集成。
-
适用场景:
- 物联网设备管理(如智能家居、车联网)。
- 实时数据采集与分发(如传感器数据、日志流)。
- 消息中间件(连接应用系统与边缘设备)。
-
版本区别:
- 开源版(EMQX):基础功能免费,适合中小规模。
- 企业版(EMQX Enterprise):支持更多协议、SLA 保障与高级功能(如数据桥接、离线消息)。
二、在 Kubernetes 中部署 EMQX 集群
1. 部署架构
2. 部署步骤
前提条件:
kubectl
和 Helm(可选)。步骤 1:创建 Namespace
kubectl create ns emqx
步骤 2:编写部署文件(YAML)
# emqx-cluster.yaml
---
# 配置 EMQX 参数
apiVersion: v1
kind: ConfigMap
metadata:
name: emqx-config
namespace: emqx
data:
emqx.conf: |
cluster {
discovery = k8s
name = emqx-cluster
k8s {
apiserver = "https://kubernetes.default.svc:443"
service_name = emqx-headless
namespace = emqx
address_type = hostname
app_name = emqx
}
}
# 其他配置(如监听端口、认证)
listeners.tcp.default {
bind = "0.0.0.0:1883"
max_connections = 1000000
}
---
# 定义 Headless Service(节点发现)
apiVersion: v1
kind: Service
metadata:
name: emqx-headless
namespace: emqx
spec:
clusterIP: None
ports:
- name: mqtt
port: 1883
targetPort: 1883
- name: dashboard
port: 18083
targetPort: 18083
selector:
app: emqx
---
# 部署 EMQX StatefulSet
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: emqx
namespace: emqx
spec:
serviceName: emqx-headless
replicas: 3
selector:
matchLabels:
app: emqx
template:
metadata:
labels:
app: emqx
spec:
containers:
- name: emqx
image: emqx/emqx:5.4.0
env:
- name: EMQX_NODE_NAME
value: "emqx@$(POD_NAME).emqx-headless.emqx.svc.cluster.local"
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
ports:
- containerPort: 1883 # MQTT
- containerPort: 8883 # MQTT/SSL
- containerPort: 8083 # WebSocket
- containerPort: 18083 # Dashboard
volumeMounts:
- name: emqx-data
mountPath: /opt/emqx/data
- name: config
mountPath: /opt/emqx/etc/emqx.conf
subPath: emqx.conf
volumes:
- name: config
configMap:
name: emqx-config
volumeClaimTemplates:
- metadata:
name: emqx-data
spec:
accessModes: [ "ReadWriteOnce" ]
storageClassName: "standard" # 根据集群存储类修改
resources:
requests:
storage: 10Gi
---
# 对外暴露 Dashboard 和 MQTT 服务
apiVersion: v1
kind: Service
metadata:
name: emqx-loadbalancer
namespace: emqx
spec:
type: LoadBalancer # 或 NodePort/Ingress
ports:
- name: mqtt
port: 1883
targetPort: 1883
- name: dashboard
port: 18083
targetPort: 18083
selector:
app: emqx
步骤 3:应用配置
kubectl apply -f emqx-cluster.yaml
步骤 4:验证部署
# 检查 Pod 状态
kubectl get pods -n emqx -l app=emqx
# 查看集群节点状态(需进入任意 Pod)
kubectl exec -it emqx-0 -n emqx -- emqx_ctl cluster status
三、EMQX 集群性能测试
1. 测试工具选择
2. 使用 emqtt-bench 进行测试
步骤 1:安装 emqtt-bench
git clone https://github.com/emqx/emqtt-bench.git
cd emqtt-bench
make
步骤 2:模拟 10 万设备连接
# 启动 1000 个客户端,每个发布 1 条消息/秒,持续 60 秒
./emqtt_bench conn -c 100000 -h <EMQX-Service-IP> -p 1883 -t 60 --interval 10
步骤 3:测试消息吞吐量
# 启动 100 个发布者和 100 个订阅者
./emqtt_bench sub -c 100 -h <EMQX-Service-IP> -p 1883 -t bench/#
./emqtt_bench pub -c 100 -h <EMQX-Service-IP> -p 1883 -t bench/msg -m "hello" -s 256 -q 1
步骤 4:监控指标
http://<LB-IP>:18083
):查看连接数、消息速率、节点负载。emqx_cluster_nodes{node="emqx@node1", state="running"}
)。3. 关键性能指标
4. 优化建议
# emqx.conf
listeners.tcp.default {
max_connections = 1000000
acceptor_pool_size = 16 # 根据 CPU 核数调整
}
四、实战示例:车联网数据采集
场景:
部署与测试:
- 部署 EMQX 集群(如上述 YAML 配置)。
- 配置规则引擎(将数据转发到 Kafka):
SELECT payload.lat as latitude, payload.lng as longitude, clientid as vehicle_id FROM "vehicle/gps"
- 压测:使用
emqtt-bench
模拟 10 万客户端,验证吞吐量与延迟。 - 结果:
- 连接数稳定在 10 万,无断开。
- 消息吞吐量 ≥ 20k msg/sec,P99 延迟 < 50ms。
五、总结
通过 Kubernetes 部署 EMQX 集群,您可以快速构建高可用、弹性扩展的物联网消息平台。结合性能测试工具,可验证集群在真实负载下的表现,并根据结果优化资源配置与 EMQX 参数。典型场景包括:
扩展阅读:
作者:demonlg0112