这是一份在实际工业项目中沉淀下来的设计规范,主要解决「工厂里有几十台设备,如何把数据统一汇聚、分发和管理」的问题。
为什么选 MQTT
工业现场的网络环境通常不稳定(Wi-Fi 信号弱、4G 时断时续),设备算力有限(MCU 级别)。MQTT 的优势在于:
- 轻量:最小报文头仅 2 字节,适合低带宽场景
- 发布/订阅解耦:设备只管发,消费方随时可加,系统扩展无需改动设备固件
- QoS 三级:可根据数据重要性选择投递保障级别
- 遗嘱消息(LWT):设备掉线时自动触发告警
主题命名规范
{tenant}/{site}/{area}/{device_id}/{category}/{metric}
| 层级 | 示例 | 说明 |
|---|---|---|
| tenant | acme | 租户标识(多租户场景) |
| site | factory-a | 工厂/站点 |
| area | line-3 | 产线/区域 |
| device_id | plc-001 | 设备唯一 ID |
| category | telemetry / event / cmd | 数据类别 |
| metric | temperature | 具体测点 |
完整示例:
acme/factory-a/line-3/plc-001/telemetry/temperature
acme/factory-a/line-3/plc-001/event/alarm
acme/factory-a/line-3/plc-001/cmd/setpoint
命令下发和遥测上报使用不同的 category,避免混淆,也便于 ACL 权限控制(设备只能发布 telemetry,不能发布 cmd)。
QoS 策略
| 场景 | QoS | 原因 |
|---|---|---|
| 高频遥测(温度、压力,1s/次) | 0 | 丢几个点无所谓,不需要重传开销 |
| 过程报警(越限、故障) | 1 | 至少投递一次,允许重复 |
| 控制指令下发 | 2 | 必须恰好执行一次,不能重复 |
安全认证
生产环境必须启用:
# 生成 CA 和服务端证书
openssl genrsa -out ca.key 4096
openssl req -new -x509 -days 3650 -key ca.key -out ca.crt
openssl genrsa -out server.key 2048
openssl req -new -key server.key -out server.csr
openssl x509 -req -days 365 -in server.csr -CA ca.crt -CAkey ca.key -out server.crtEMQX 配置启用双向 TLS + 用户名密码认证:
listeners.ssl.default {
bind = "0.0.0.0:8883"
ssl_options {
cacertfile = "etc/certs/ca.crt"
certfile = "etc/certs/server.crt"
keyfile = "etc/certs/server.key"
verify = verify_peer # 强制验证客户端证书
}
}设备级 ACL 通过 EMQX 的 Built-in Database 管理,每台设备只能 pub/sub 自己的主题前缀。
与 OPC-UA 的桥接
工厂里的 PLC 和 SCADA 系统通常只支持 OPC-UA,需要一个协议网关将 OPC-UA 数据转成 MQTT:
PLC (OPC-UA Server)
↓
OPC-UA → MQTT Gateway (Node.js)
↓
MQTT Broker (EMQX)
↓
数据平台 / 云端
网关核心逻辑:
import { OPCUAClient, AttributeIds } from 'node-opcua'
import mqtt from 'mqtt'
const opcClient = OPCUAClient.create({ endpointMustExist: false })
const mqttClient = mqtt.connect('mqtts://broker:8883', { /* tls options */ })
// 订阅 OPC-UA 节点变化,转发到 MQTT
subscription.monitor({
nodeId: 'ns=2;s=Line3.PLC001.Temperature',
attributeId: AttributeIds.Value,
}, (dataValue) => {
mqttClient.publish(
'acme/factory-a/line-3/plc-001/telemetry/temperature',
JSON.stringify({ value: dataValue.value.value, ts: Date.now() }),
{ qos: 0 }
)
})高可用部署
生产环境建议 EMQX 3 节点集群,前置 HAProxy 做 TCP 负载均衡:
┌─ emqx-node-1
Client → HAProxy ──┤─ emqx-node-2
└─ emqx-node-3
节点间通过 Erlang distribution 协议同步路由表和会话状态。单节点故障,已连接的客户端会在 keep-alive 超时后自动重连到其他节点,订阅关系不丢失。
监控与告警
关键指标通过 EMQX Dashboard + Prometheus + Grafana 监控:
- 在线连接数
- 消息吞吐量(msg/s)
- 消息堆积(某个主题的积压量)
- 节点 CPU / 内存
消息堆积超阈值时 PagerDuty 告警,这通常意味着消费方处理能力不足。