云原生环境下Kubernetes集成物联网实时消息引擎工具的实践与探索


一、EMQX 简介

EMQX 是一款开源的高性能 分布式 MQTT 消息服务器,专为物联网(IoT)场景设计,支持海量设备连接与消息路由。其核心功能包括:

  1. 协议支持

  2. MQTT v3.1/v3.1.1/v5.0:标准物联网通信协议。
  3. 其他协议:CoAP、LwM2M、WebSocket、STOMP 等。
  4. 多语言 SDK:支持 Java、Python、Go 等客户端开发。
  5. 核心特性

  6. 高并发:单节点支持百万级 MQTT 连接。
  7. 低延迟:微秒级消息路由(基于 Erlang/OTP 的高效调度)。
  8. 集群扩展:水平扩展节点,自动负载均衡。
  9. 规则引擎:通过 SQL 实时处理消息(如转发到 Kafka、数据库)。
  10. 企业级安全:TLS 加密、ACL 权限控制、LDAP/MySQL 认证集成。
  11. 适用场景

  12. 物联网设备管理(如智能家居、车联网)。
  13. 实时数据采集与分发(如传感器数据、日志流)。
  14. 消息中间件(连接应用系统与边缘设备)。
  15. 版本区别

  16. 开源版(EMQX):基础功能免费,适合中小规模。
  17. 企业版(EMQX Enterprise):支持更多协议、SLA 保障与高级功能(如数据桥接、离线消息)。

图片转载


二、在 Kubernetes 中部署 EMQX 集群

1. 部署架构
  • StatefulSet:确保每个 EMQX 节点有稳定的网络标识(Pod 名称)和持久化存储。
  • Headless Service:用于节点间发现与通信。
  • ConfigMap:配置 EMQX 集群参数与规则引擎。
  • 2. 部署步骤

    前提条件

  • Kubernetes 集群(1.20+)。
  • 已安装 kubectl 和 Helm(可选)。

  • 步骤 1:创建 Namespace

    kubectl create ns emqx
    

    步骤 2:编写部署文件(YAML)

    # emqx-cluster.yaml
    ---
    # 配置 EMQX 参数
    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: emqx-config
      namespace: emqx
    data:
      emqx.conf: |
        cluster {
          discovery = k8s
          name = emqx-cluster
          k8s {
            apiserver = "https://kubernetes.default.svc:443"
            service_name = emqx-headless
            namespace = emqx
            address_type = hostname
            app_name = emqx
          }
        }
        # 其他配置(如监听端口、认证)
        listeners.tcp.default {
          bind = "0.0.0.0:1883"
          max_connections = 1000000
        }
    ---
    # 定义 Headless Service(节点发现)
    apiVersion: v1
    kind: Service
    metadata:
      name: emqx-headless
      namespace: emqx
    spec:
      clusterIP: None
      ports:
        - name: mqtt
          port: 1883
          targetPort: 1883
        - name: dashboard
          port: 18083
          targetPort: 18083
      selector:
        app: emqx
    ---
    # 部署 EMQX StatefulSet
    apiVersion: apps/v1
    kind: StatefulSet
    metadata:
      name: emqx
      namespace: emqx
    spec:
      serviceName: emqx-headless
      replicas: 3
      selector:
        matchLabels:
          app: emqx
      template:
        metadata:
          labels:
            app: emqx
        spec:
          containers:
          - name: emqx
            image: emqx/emqx:5.4.0
            env:
            - name: EMQX_NODE_NAME
              value: "emqx@$(POD_NAME).emqx-headless.emqx.svc.cluster.local"
            - name: POD_NAME
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
            ports:
            - containerPort: 1883  # MQTT
            - containerPort: 8883  # MQTT/SSL
            - containerPort: 8083  # WebSocket
            - containerPort: 18083  # Dashboard
            volumeMounts:
            - name: emqx-data
              mountPath: /opt/emqx/data
            - name: config
              mountPath: /opt/emqx/etc/emqx.conf
              subPath: emqx.conf
          volumes:
          - name: config
            configMap:
              name: emqx-config
      volumeClaimTemplates:
      - metadata:
          name: emqx-data
        spec:
          accessModes: [ "ReadWriteOnce" ]
          storageClassName: "standard"  # 根据集群存储类修改
          resources:
            requests:
              storage: 10Gi
    ---
    # 对外暴露 Dashboard 和 MQTT 服务
    apiVersion: v1
    kind: Service
    metadata:
      name: emqx-loadbalancer
      namespace: emqx
    spec:
      type: LoadBalancer  # 或 NodePort/Ingress
      ports:
        - name: mqtt
          port: 1883
          targetPort: 1883
        - name: dashboard
          port: 18083
          targetPort: 18083
      selector:
        app: emqx
    

    步骤 3:应用配置

    kubectl apply -f emqx-cluster.yaml
    

    步骤 4:验证部署

    # 检查 Pod 状态
    kubectl get pods -n emqx -l app=emqx
    
    # 查看集群节点状态(需进入任意 Pod)
    kubectl exec -it emqx-0 -n emqx -- emqx_ctl cluster status
    

    三、EMQX 集群性能测试

    1. 测试工具选择
  • emqtt-bench:EMQX 官方压测工具(基于 Erlang,适合 MQTT 协议)。
  • JMeter + MQTT 插件:图形化界面,支持复杂场景。
  • Xmeter:云原生压测平台(企业级)。
  • 2. 使用 emqtt-bench 进行测试

    步骤 1:安装 emqtt-bench

    git clone https://github.com/emqx/emqtt-bench.git
    cd emqtt-bench
    make
    

    步骤 2:模拟 10 万设备连接

    # 启动 1000 个客户端,每个发布 1 条消息/秒,持续 60 秒
    ./emqtt_bench conn -c 100000 -h <EMQX-Service-IP> -p 1883 -t 60 --interval 10
    

    步骤 3:测试消息吞吐量

    # 启动 100 个发布者和 100 个订阅者
    ./emqtt_bench sub -c 100 -h <EMQX-Service-IP> -p 1883 -t bench/#
    ./emqtt_bench pub -c 100 -h <EMQX-Service-IP> -p 1883 -t bench/msg -m "hello" -s 256 -q 1
    

    步骤 4:监控指标

  • EMQX Dashboardhttp://<LB-IP>:18083):查看连接数、消息速率、节点负载。
  • Prometheus + Grafana:集成 EMQX 的监控指标(如 emqx_cluster_nodes{node="emqx@node1", state="running"})。
  • 3. 关键性能指标
  • 连接建立速率(connections/sec):每秒成功建立的 MQTT 连接数。
  • 消息吞吐量(msg/sec):每秒发布和消费的消息数量。
  • 端到端延迟(P99 Latency):99% 的消息从发布到订阅的延迟时间。
  • 资源利用率:CPU、内存、网络带宽占用。
  • 4. 优化建议
  • 调整 EMQX 参数
    # emqx.conf
    listeners.tcp.default {
      max_connections = 1000000
      acceptor_pool_size = 16  # 根据 CPU 核数调整
    }
    
  • Kubernetes 资源配置
  • 为 EMQX Pod 分配足够 CPU/内存(如 4 CPU + 8GiB)。
  • 使用本地 SSD 存储(减少 I/O 延迟)。
  • 网络优化
  • 启用 MQTT over TLS(避免明文传输开销)。
  • 使用高性能 CNI 插件(如 Cilium)。

  • 四、实战示例:车联网数据采集

    场景

  • 10 万辆车辆每 5 秒上报 GPS 数据(消息大小 1KB)。
  • EMQX 集群将数据转发至 Kafka 供实时分析。
  • 部署与测试

    1. 部署 EMQX 集群(如上述 YAML 配置)。
    2. 配置规则引擎(将数据转发到 Kafka):
      SELECT 
          payload.lat as latitude,
          payload.lng as longitude,
          clientid as vehicle_id
      FROM "vehicle/gps"
      
    3. 压测:使用 emqtt-bench 模拟 10 万客户端,验证吞吐量与延迟。
    4. 结果
    5. 连接数稳定在 10 万,无断开。
    6. 消息吞吐量 ≥ 20k msg/sec,P99 延迟 < 50ms。

    五、总结

    通过 Kubernetes 部署 EMQX 集群,您可以快速构建高可用、弹性扩展的物联网消息平台。结合性能测试工具,可验证集群在真实负载下的表现,并根据结果优化资源配置与 EMQX 参数。典型场景包括:

  • 智能工厂:设备状态实时监控。
  • 智慧城市:海量传感器数据聚合。
  • 实时通信:聊天应用、游戏服务器。
  • 扩展阅读

  • EMQX 官方文档
  • Kubernetes StatefulSet 最佳实践
  • 作者:demonlg0112

    物联沃分享整理
    物联沃-IOTWORD物联网 » 云原生环境下Kubernetes集成物联网实时消息引擎工具的实践与探索

    发表回复