RudolphLiu子丙
所有项目
解决方案已上线

工业场景 MQTT 消息总线设计方案

面向工业物联网的 MQTT 架构设计文档,涵盖主题规范、QoS 策略选型、安全认证、高可用集群,以及与 OPC-UA 的协议桥接方案。

MQTTEMQXOPC-UANode.jsDockerTLS

这是一份在实际工业项目中沉淀下来的设计规范,主要解决「工厂里有几十台设备,如何把数据统一汇聚、分发和管理」的问题。

为什么选 MQTT

工业现场的网络环境通常不稳定(Wi-Fi 信号弱、4G 时断时续),设备算力有限(MCU 级别)。MQTT 的优势在于:

  • 轻量:最小报文头仅 2 字节,适合低带宽场景
  • 发布/订阅解耦:设备只管发,消费方随时可加,系统扩展无需改动设备固件
  • QoS 三级:可根据数据重要性选择投递保障级别
  • 遗嘱消息(LWT):设备掉线时自动触发告警

主题命名规范

{tenant}/{site}/{area}/{device_id}/{category}/{metric}
层级示例说明
tenantacme租户标识(多租户场景)
sitefactory-a工厂/站点
arealine-3产线/区域
device_idplc-001设备唯一 ID
categorytelemetry / event / cmd数据类别
metrictemperature具体测点

完整示例:

acme/factory-a/line-3/plc-001/telemetry/temperature
acme/factory-a/line-3/plc-001/event/alarm
acme/factory-a/line-3/plc-001/cmd/setpoint

命令下发和遥测上报使用不同的 category,避免混淆,也便于 ACL 权限控制(设备只能发布 telemetry,不能发布 cmd)。

QoS 策略

场景QoS原因
高频遥测(温度、压力,1s/次)0丢几个点无所谓,不需要重传开销
过程报警(越限、故障)1至少投递一次,允许重复
控制指令下发2必须恰好执行一次,不能重复

安全认证

生产环境必须启用:

# 生成 CA 和服务端证书
openssl genrsa -out ca.key 4096
openssl req -new -x509 -days 3650 -key ca.key -out ca.crt
 
openssl genrsa -out server.key 2048
openssl req -new -key server.key -out server.csr
openssl x509 -req -days 365 -in server.csr -CA ca.crt -CAkey ca.key -out server.crt

EMQX 配置启用双向 TLS + 用户名密码认证:

listeners.ssl.default {
  bind = "0.0.0.0:8883"
  ssl_options {
    cacertfile = "etc/certs/ca.crt"
    certfile   = "etc/certs/server.crt"
    keyfile    = "etc/certs/server.key"
    verify     = verify_peer  # 强制验证客户端证书
  }
}

设备级 ACL 通过 EMQX 的 Built-in Database 管理,每台设备只能 pub/sub 自己的主题前缀。

与 OPC-UA 的桥接

工厂里的 PLC 和 SCADA 系统通常只支持 OPC-UA,需要一个协议网关将 OPC-UA 数据转成 MQTT:

PLC (OPC-UA Server)
    
OPC-UA  MQTT Gateway (Node.js)
    
MQTT Broker (EMQX)
    
数据平台 / 云端

网关核心逻辑:

import { OPCUAClient, AttributeIds } from 'node-opcua'
import mqtt from 'mqtt'
 
const opcClient = OPCUAClient.create({ endpointMustExist: false })
const mqttClient = mqtt.connect('mqtts://broker:8883', { /* tls options */ })
 
// 订阅 OPC-UA 节点变化,转发到 MQTT
subscription.monitor({
  nodeId: 'ns=2;s=Line3.PLC001.Temperature',
  attributeId: AttributeIds.Value,
}, (dataValue) => {
  mqttClient.publish(
    'acme/factory-a/line-3/plc-001/telemetry/temperature',
    JSON.stringify({ value: dataValue.value.value, ts: Date.now() }),
    { qos: 0 }
  )
})

高可用部署

生产环境建议 EMQX 3 节点集群,前置 HAProxy 做 TCP 负载均衡:

                    ┌─ emqx-node-1
Client  HAProxy ──┤─ emqx-node-2
                    └─ emqx-node-3

节点间通过 Erlang distribution 协议同步路由表和会话状态。单节点故障,已连接的客户端会在 keep-alive 超时后自动重连到其他节点,订阅关系不丢失。

监控与告警

关键指标通过 EMQX Dashboard + Prometheus + Grafana 监控:

  • 在线连接数
  • 消息吞吐量(msg/s)
  • 消息堆积(某个主题的积压量)
  • 节点 CPU / 内存

消息堆积超阈值时 PagerDuty 告警,这通常意味着消费方处理能力不足。