一、引言
本文将详细介绍如何使用EMQ X规则引擎的MQTT数据桥接功能,接收由MQTT客户端发送的数据,并将其实时插入到时序数据库IoTDB中。EMQ X作为一个大规模扩展、可弹性伸缩的开源云原生分布式物联网消息中间件,能够高效可靠地处理海量物联网设备的并发连接。而IoTDB作为Apache的顶级项目,以其轻量级架构、高性能和高可用性,满足了工业IoT领域中海量数据存储、高吞吐量数据写入和复杂数据查询分析的需求。
二、准备工作
1. 软件与环境
- 操作系统:Mac OSX
- IoTDB:Binary包(Server),版本0.12.4
- MQTT Broker:emqx开源版4.3.11
- MQTT客户端软件:MQTTX v1.6.0
2. IoTDB安装与配置
- 从IoTDB官方页面下载IoTDB Server(单机版)的二进制包,解压后进入解压目录。
- 修改
conf/iotdb-engine.properties
文件,启用MQTT服务并设置相关参数:enable_mqtt_service=true mqtt_host=0.0.0.0 mqtt_port=2883 # 避免与emqx的端口冲突 mqtt_handler_pool_size=1 mqtt_payload_formatter=json mqtt_max_message_size=1048576
- 设置
virtual_storage_group_num
为机器核数以增加写入并行度。 - 使用
./sbin/start-server.sh
命令启动IoTDB服务端。 - 打开新的命令行终端窗口,使用
./sbin/start-cli.sh
命令启动IoTDB的shell工具。
3. 下载与启动EMQ X
- 从EMQ X官方下载页面获取EMQ X Broker软件包。
- 解压并启动EMQ X:
unzip -q emqx-macos-4.3.11-amd64.zip cd emqx ./bin/emqx console
三、配置规则
1. 创建规则
- 使用浏览器打开EMQ X的Dashboard,在规则引擎页面创建一条规则,SQL语句如下:
SELECT clientid, now_timestamp('millisecond') as now_ts_ms, payload.bar as bar FROM "t/#"
- 在页面底部添加「桥接数据到MQTT Broker」动作,并创建MQTT Bridge资源:
- 远程Broker地址:
127.0.0.1:2883
- 客户端Id、用户名、密码:
root
- 远程Broker地址:
- 返回动作创建页面,填写更多动作参数:
- 主题:
foo
- 消息内容模板:
{ "device": "root.sg.${clientid}", "timestamp": ${now_ts_ms}, "measurements": ["bar"], "values": [${bar}] }
- 主题:
- 点击「确认」保存动作配置,并完成规则的创建。
四、测试与验证
- 使用MQTTX客户端工具发送消息给EMQ X:
- Client ID:
abc
- 主题:
t/1
- 消息内容:
{ "bar": 0.2 }
- Client ID:
- 在EMQ X Dashboard的规则引擎页面观察规则的命中次数,确认规则被触发。
- 在IoTDB客户端窗口使用SQL语句查询数据,验证数据是否成功插入:
SHOW TIMESERIES root.sg.abc SELECT * FROM root.sg.abc
五、结语
通过本文的介绍,我们成功实现了通过EMQ X规则引擎功能将消息持久化到IoTDB时序数据库。在实际生产场景中,这一组合方案能够高效地处理物联网设备的并发连接,灵活处理业务功能,并将设备发送的消息持久化到IoTDB数据库,进而实现大数据分析、可视化展示等功能。EMQ X + IoTDB的组合是一个简洁、高效、易扩展且高可用的服务端集成方案,适用于物联网设备管理和数据处理场景。