使用EMQ X规则引擎将MQTT数据实时插入时序数据库IoTDB

发布于:2025-06-11 ⋅ 阅读:(54) ⋅ 点赞:(0)
一、引言

本文将详细介绍如何使用EMQ X规则引擎的MQTT数据桥接功能,接收由MQTT客户端发送的数据,并将其实时插入到时序数据库IoTDB中。EMQ X作为一个大规模扩展、可弹性伸缩的开源云原生分布式物联网消息中间件,能够高效可靠地处理海量物联网设备的并发连接。而IoTDB作为Apache的顶级项目,以其轻量级架构、高性能和高可用性,满足了工业IoT领域中海量数据存储、高吞吐量数据写入和复杂数据查询分析的需求。

二、准备工作
1. 软件与环境
  • 操作系统‌:Mac OSX
  • IoTDB‌:Binary包(Server),版本0.12.4
  • MQTT Broker‌:emqx开源版4.3.11
  • MQTT客户端软件‌:MQTTX v1.6.0
2. IoTDB安装与配置
  • IoTDB官方页面下载IoTDB Server(单机版)的二进制包,解压后进入解压目录。
  • 修改conf/iotdb-engine.properties文件,启用MQTT服务并设置相关参数:
    enable_mqtt_service=true
    mqtt_host=0.0.0.0
    mqtt_port=2883  # 避免与emqx的端口冲突
    mqtt_handler_pool_size=1
    mqtt_payload_formatter=json
    mqtt_max_message_size=1048576
  • 设置virtual_storage_group_num为机器核数以增加写入并行度。
  • 使用./sbin/start-server.sh命令启动IoTDB服务端。
  • 打开新的命令行终端窗口,使用./sbin/start-cli.sh命令启动IoTDB的shell工具。
3. 下载与启动EMQ X
  • EMQ X官方下载页面获取EMQ X Broker软件包。
  • 解压并启动EMQ X:
    unzip -q emqx-macos-4.3.11-amd64.zip
    cd emqx
    ./bin/emqx console

三、配置规则
1. 创建规则
  • 使用浏览器打开EMQ X的Dashboard,在规则引擎页面创建一条规则,SQL语句如下:
    SELECT clientid, now_timestamp('millisecond') as now_ts_ms, payload.bar as bar FROM "t/#"
  • 在页面底部添加「桥接数据到MQTT Broker」动作,并创建MQTT Bridge资源:
    • 远程Broker地址:127.0.0.1:2883
    • 客户端Id、用户名、密码:root
  • 返回动作创建页面,填写更多动作参数:
    • 主题:foo
    • 消息内容模板:
      {
        "device": "root.sg.${clientid}",
        "timestamp": ${now_ts_ms},
        "measurements": ["bar"],
        "values": [${bar}]
      }

  • 点击「确认」保存动作配置,并完成规则的创建。
四、测试与验证
  • 使用MQTTX客户端工具发送消息给EMQ X:
    • Client ID:abc
    • 主题:t/1
    • 消息内容:{ "bar": 0.2 }
  • 在EMQ X Dashboard的规则引擎页面观察规则的命中次数,确认规则被触发。
  • 在IoTDB客户端窗口使用SQL语句查询数据,验证数据是否成功插入:
    SHOW TIMESERIES root.sg.abc
    SELECT * FROM root.sg.abc

五、结语

通过本文的介绍,我们成功实现了通过EMQ X规则引擎功能将消息持久化到IoTDB时序数据库。在实际生产场景中,这一组合方案能够高效地处理物联网设备的并发连接,灵活处理业务功能,并将设备发送的消息持久化到IoTDB数据库,进而实现大数据分析、可视化展示等功能。EMQ X + IoTDB的组合是一个简洁、高效、易扩展且高可用的服务端集成方案,适用于物联网设备管理和数据处理场景。