(2)EII消息总线(EII Message Bus)发布和订阅数据实操

发布于:2022-12-19 ⋅ 阅读:(539) ⋅ 点赞:(0)

1. EII Message Bus简介

本文详细介绍了EII Message Bus原理以及实操案例。

实操本文案例前,用户须安装好EII 3.0,并跑通了EII自带的PCBA缺陷检测案例和温度监控案例。EII的下载与安装,请参考上一篇文章:英特尔工业边缘洞见平台(EII)安装指南

1.1 ZeroMQ简介

EII的架构图如下,EII Message Bus作为EII的消息中间件,可支持EII各个模块之间的通讯,同时也能支持EII模块与外部程序的数据交换。
请添加图片描述

EII Message Bus是基于ZeroMQ开发的,并且EII模块中对于EII Message Bus的很多配置选项都和ZeroMQ的性质相关,所以我们有必要先介绍一下ZeroMQ。

ZeroMQ (官网: https://zeromq.org/) 是一个开源的通用消息库,它提供类似于socket编程的API基于不同的网络协议实现多种通讯模式。例如,使用ZeroMQ同一套API可以实现进程内(in-process),进程间(IPC),以及跨网络的通讯(TCP/multicast),另外ZeroMQ支持Request-Reply,Pub-Sub,Pull-Push等通讯模式。

ZeroMQ名字里虽然有MQ (Message Queue),但是它本质上并不是一个消息队列,而是一个消息内核 (Message Library)。所以,它和RabbitMQ, RocketMQ, Kafka等产品还是有一些区别的。ZeroMQ作为一个非常轻量化的消息内核,提供去中心化的,低延迟的通讯,可以很方便地集成在不同的系统中。

EII Message Bus对ZeroMQ的Request-Reply和Pub-Sub这两种通讯方式进行了封装,我们这里重点介绍一下。

首先是Request-Reply模式,该模式类似于传统Server-Client模式,即由Client(Request)端先发出请求,Server(Reply)端收到请求进行回复,属于一问一答式的通讯。
在这里插入图片描述

Pub-Sub模式,即发布-订阅模式,这种模式在很多通讯协议中都有被使用到(如MQTT)。它主要用于1对多的通讯场景中。当一个发布者发布消息后,可以有多个订阅者订阅消息,同时一个订阅者也可以订阅多个发布者的消息。
在这里插入图片描述

1.2 配置EII Message Bus

接下来我们介绍,在EII功能模块中,如何配置EII Message Bus的通讯接口呢?我们以EII自带的PCBA缺陷检测案例为例进行介绍。

在PCBA缺陷检测案例中,由VideoIngestion模块进行图像的采集,然后数据传递给VideoAnalytics模块进行分析,分析结果最后传递给Visualizer模块进行显示。
在这里插入图片描述

  • 首先,我们介绍VideoIngestion模块的发布接口,该接口用于VideoIngestion模块采集到图像数据后,将数据发布出去。查看VideoIngestion模块的配置文件"VideoIngestion/config.json",在"interfaces"字段中找到"Publishers"的定义,具体如下:
"Publishers": [
   {
        "Name": "default",
        "Type": "zmq_ipc",
        "EndPoint": "/EII/sockets",
        "Topics": [
            "camera1_stream"
        ],
        "AllowedClients": [
            "VideoAnalytics", "Visualizer", "WebVisualizer", "TLSRemoteAgent", "RestDataExport"
        ]
    }
]

字段具体含义如下:
- “Name"字段可任意填写。
- “Type"字段为"zmq_ipc”,表示采用ZeroMQ的进程间(IPC)的通讯方式。
- “EndPoint"字段为”/EII/sockets”,表示存放通讯媒介socket文件的存放地址。
- “Topics"字段为"camera1_stream”。Pub-Sub模式一般都是通过主题(topic)名字来实现消息的订阅。
- "AllowedClients"字段是一个简单的安全保护,只有这里定义的模块,才有权限订阅VideoIngestion模块发布的数据。

  • 接着,来看VideoAnalytics模块的订阅接口,该接口用于从VideoIngestion订阅数据。查看VideoAnalytics模块的配置文件"VideoAnalytics/config.json",在"interfaces"字段中找到"Subscribers"的定义,具体如下:
"Subscribers": [
   {
        "Name": "default",
        "Type": "zmq_ipc",
        "EndPoint": "/EII/sockets",
        "PublisherAppName": "VideoIngestion",
        "Topics": [
            "camera1_stream"
        ],
        "zmq_recv_hwm": 50
    }
],

可以看到VideoAnalytics模块的Subscriber接口定义和VideoIngestion模块的Publisher接口定义是对应的。
- “Name"字段可任意填写。
- “Type"字段为"zmq_ipc”。与VideoIngestion模块的Publisher接口保持一致。
- “EndPoint"字段为”/EII/sockets”。与VideoIngestion模块的Publisher接口保持一致。
- “Topics"字段为"camera1_stream”。与VideoIngestion模块的Publisher接口保持一致。
- "PublisherAppName"字段填写VideoIngestion模块的名字,该名字的定义可以去文件"VideoIngestion/docker-compose.yml"中,"environment"字段下的"AppName"变量查看。
- "zmq_recv_hwm"字段是ZeroMQ的一个设置选项,指的是通过ZeroMQ收发的消息,在内存中排队队列的长度大小。

  • 接着,来看VideoAnalytics模块的发布接口,VideoAnalytics模块从VideoIngestion模块接收数据,然后在模块内部进行分析处理,处理结果再经由发布接口发布出去。仍然是在配置文件"VideoAnalytics/config.json",在"interfaces"字段中找到"Publishers"的定义,具体如下:
"Publishers": [
    {
        "Name": "default",
        "Type": "zmq_tcp",
        "EndPoint": "0.0.0.0:65013",
        "Topics": [
            "camera1_stream_results"
        ],
        "AllowedClients": [
            "*"
        ]
    }
]

相比于VideoIngestion的发布接口,这里有的些许不同,我们来具体看一下:
- “Name"字段仍然可以任意填写。
- “Type"字段为"zmq_tcp”。这里和VideoIngestion模块就不一样了,VideoIngestion定义的是"zmq_ipc”,表示采用ZeroMQ的进程间(IPC)的通讯方式,这种方式只能实现同一台机器上进程之间的通讯。而VideoAnalytics这里定义的是"zmq_tcp",表示采用的是ZeroMQ的TCP通讯方式,可以实现跨机器,跨网络的通讯。VideoAnalytics模块的发布接口采用TCP方式的好处是,可以支持远端的机器远程查看处理结果。
- “EndPoint"字段为"0.0.0.0:65013”。由于这里采用了"zmq_tcp"的通讯方式,所以"EndPoint"需要配置成TCP通讯时,需要用到的IP地址 + 端口号Port的形式。
- “Topics"字段为"camera1_stream_results”,用于发布处理结果。
- “AllowedClients"字段是”*"。表示任意模块,都可以订阅VideoAnalytics模块发布的处理结果。

  • 最后,我们来看看Visualizer模块的订阅接口。到这里,大家应该比较清楚了,Visualizer模块是从VideoAnalytics模块订阅处理结果来进行可视化,所以它的订阅接口的定义需要和VideoAnalytics模块的发布模块对应。
"Subscribers": [
    {
        "Name": "default",
        "Type": "zmq_tcp",
        "EndPoint": "ia_video_analytics:65013",
        "PublisherAppName": "VideoAnalytics",
        "Topics": [
            "camera1_stream_results"
        ]
    },
    {
        "Name": "default",
        "Type": "zmq_tcp",
        "EndPoint": "ia_python_safety_gear_analytics:65019",
        "PublisherAppName": "PySafetyGearAnalytics",
        "Topics": [
            "py_safety_gear_stream_results"
        ]
    },
    {
        "Name": "default",
        "Type": "zmq_tcp",
        "EndPoint": "ia_native_safety_gear_analytics:65017",
        "PublisherAppName": "NativeSafetyGearAnalytics",
        "Topics": [
            "native_safety_gear_stream_results"
        ]
    }
]

我们可以看到,这里定义了多个订阅接口。其中后面两个是和其他案例相关,这里我们只需查看第一个。
和VideoAnalytics模块的发布接口对照来看:
- “Type"字段为"zmq_tcp”。与VideoAnalytics模块的Publisher接口保持一致。
- “EndPoint"字段为"ia_video_analytics:65013”。与VideoAnalytics模块的Publisher接口保持一致。在docker容器的使用中,可以使用容器service名字来代替容器的IP地址,可以在文件"VideoAnalytics/docker-compose.yml"第25行查看容器service名字。
- "PublisherAppName"字段填写VideoAnalytics模块的名字,该名字的定义可以去文件"VideoAnalytics/docker-compose.yml"中,"environment"字段下的"AppName"变量查看。
- “Topics"字段为"camera1_stream_results”,与VideoAnalytics模块的Publisher接口保持一致。

以上,我们介绍EII Message Bus在EII各个模块中的配置方式,接下来我将介绍实操案例,大家可以在实际操作中,来感受EII Message Bus的用法。

2. EII Message Bus测试模块

2.1 查看测试模块的配置

EII提供了多种编程语言的Samples模块,可以用于测试EII Message Bus的基本功能。我们以Python为例,Python的案例程序位于目录"Samples/publisher/python"(发布模块),和目录"Samples/subscriber/python"(订阅模块)。

  • 我们首先来看,发布模块和订阅模块之间的通信接口是如何定义的。

    1. 先看发布模块的配置文件"Samples/publisher/python/ubuntu/config.json",其中定义了发布接口"Publishers"。
    "Publishers": [
       {
            "Name": "default",
            "Type": "zmq_tcp",
            "EndPoint": "0.0.0.0:65020",
            "AllowedClients": [
                "*"
            ],
            "Topics": [
                "camera1_stream_results",
                "camera2_stream_results_metaonly"
            ]
        }
    ]
    
    1. 接着来看订阅模块的配置文件"Samples/subscriber/python/ubuntu/config.json",其中定义了订阅接口"Subscribers"。
    "Subscribers": [
       {
            "Name": "default",
            "Type": "zmq_tcp",
            "EndPoint": "ia_ubuntu_python_sample_pub:65020",
            "PublisherAppName": "UbuntuPythonSamplePub",
            "Topics": [
                "camera1_stream_results"
            ]
        }
    ]
    

    通过第一章节的介绍,大家应该发布接口和订阅接口如何定义已经很熟悉了。这里就不再详细解释了。

  • 接着,我们来看功能模块中其他的配置文件。在EII的功能模块中,有3个文件是必备的,分别是config.json, Dockerfile, docker-compose.yml。config.json用于对模块的功能,通讯接口等进行配置,Dockerfile和docker-compose.yml对于接触过Docker容器技术的用户来说,应该不陌生。Dockerfile用于构建功能模块的docker镜像,docker-compose.yml则用于实现对多个容器运行的编排,其中可以定义环境变量,文件挂载等信息。发布模块和订阅模块的Dockerfile和docker-compose.yml文件的细节,这里就不详细解释了,大家可自行查看。

  • 从两个模块的Dockerfile的末尾可以发现,两个模块的入口均是main.py脚本。

ENTRYPOINT ["python3", "-u", "main.py"]
  • 以发布模块的main.py为入口,可以看到在发布模块的publisher.py脚本中,实现了读取接口配置"pub_ctx.get_msgbus_config()“,创建’publisher’接口"msgbus_pub.new_publisher()”,并通过"publisher.publish()"发布数据的功能。

  • 同样,以订阅模块的main.py为入口,可以看到在订阅模块的subscriber.py脚本中,实现了读取接口配置"sub_ctx.get_msgbus_config()“,创建’subscriber’接口"msgbus_sub.new_subscriber()”,并通过"subscriber.recv()"接收数据的功能。

2.2 运行测试模块

接下来,我们将测试模块跑起来。

  • 我们将在EII源码目录"~/eii/IEdgeInsights"目录下进行操作。

  • 首先创建一个模块编排文件"build/usecases/test-emb.yml",并将"publisher"和"subscriber"模块添加进来,"ConfigMgrAgent"模块用于配置文件管理,默认必须添加。

AppContexts:
 - ConfigMgrAgent
 - Samples/publisher/python/ubuntu
 - Samples/subscriber/python/ubuntu
  • 编译模块配置信息
$ cd ~/eii/IEdgeInsights/build
$ sudo -E python3 builder.py -f usecases/test-emb.yml
  • 编译模块docker镜像。

    • (For PRC网络) 修改源信息,加速国内网络环境下的build docker镜像过程。
    # 文件Samples/publisher/python/ubuntu/docker-compose.yml第34行,"github"修改为"gitee",参考如下:
    第34行:
    PKG_SRC: https://github.com/open-edge-insights/eii-manifests/releases/download/v3.0
    修改为:
    PKG_SRC: https://gitee.com/open-edge-insights/eii-manifests/releases/download/v3.0
    
    # 文件Samples/publisher/python/ubuntu/Dockerfile第91行,pip源修改为清华源,参考如下:
    第91行:
    pip3 install -r requirements.txt; \
    修改为:
    pip3 install -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple; \
    
    # 文件Samples/subscriber/python/ubuntu/docker-compose.yml第34行,"github"修改为"gitee",参考如下:
    第34行:
    PKG_SRC: https://github.com/open-edge-insights/eii-manifests/releases/download/v3.0
    修改为:
    PKG_SRC: https://gitee.com/open-edge-insights/eii-manifests/releases/download/v3.0
    
    # 文件Samples/subscriber/python/ubuntu/Dockerfile第91行,pip源修改为清华源,参考如下:
    第91行:
    pip3 install -r requirements.txt; \
    修改为:
    pip3 install -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple; \
    
    • 执行编译docker镜像命令。(由于ConfigMgrAgent docker镜像在先前PCBA案例中已经build好了,所以这里可以不用重复编译,只需要编译"publisher"和"subscriber"模块)
    $ docker-compose -f docker-compose-build.yml build ia_ubuntu_python_sample_pub ia_ubuntu_python_sample_sub
    
  • 模块docker镜像编译成功后(可通过"docker images"命令查看),启动EII软件栈。

$ ./eii_start.sh
  • EII软件栈启动成功后,可通过"docker ps"命令查看容器运行情况。
$ docker ps
  • 查看发布模块日志。
# 执行docker logs命令,查看发布模块的日志
$ docker logs -f ia_ubuntu_python_sample_pub

发布模块日志信息如下。

[INFO] Msg published by publisher :  '{'integer': 123, 'floating': 55.5, 'string': 'test', 'boolean': True, 'empty': None, 'obj': {'test': {'test2': 'hello'}, 'test3': 'world'}, 'arr': ['test', 123]}'
[INFO] Received request from client: {'int': 42, 'float': 55.5, 'bool': 1, 'str': 'Hello, World!'}
[INFO] Msg published by publisher :  '{'integer': 123, 'floating': 55.5, 'string': 'test', 'boolean': True, 'empty': None, 'obj': {'test': {'test2': 'hello'}, 'test3': 'world'}, 'arr': ['test', 123]}'
[INFO] Received request from client: {'int': 42, 'float': 55.5, 'bool': 1, 'str': 'Hello, World!'}
[INFO] Msg published by publisher :  '{'integer': 123, 'floating': 55.5, 'string': 'test', 'boolean': True, 'empty': None, 'obj': {'test': {'test2': 'hello'}, 'test3': 'world'}, 'arr': ['test', 123]}'
[INFO] Received request from client: {'int': 42, 'float': 55.5, 'bool': 1, 'str': 'Hello, World!'}
  • 打开另外一个命令行窗口,查看订阅模块的日志。
# 执行docker logs命令,查看订阅模块的日志
$ docker logs -f ia_ubuntu_python_sample_sub

订阅模块日志信息如下。

[INFO] RECEIVED by subscriber : {'empty': None, 'string': 'test', 'obj': {'test3': 'world', 'test': {'test2': 'hello'}}, 'integer': 123, 'floating': 55.5, 'arr': ['test', 123], 'boolean': 1}
[INFO] Sending request {'int': 42, 'float': 55.5, 'str': 'Hello, World!', 'bool': True}
[INFO] Waiting for response
[INFO] Received response: {'int': 42, 'float': 55.5, 'bool': 1, 'str': 'Hello, World!'}
[INFO] RECEIVED by subscriber : {'empty': None, 'string': 'test', 'obj': {'test3': 'world', 'test': {'test2': 'hello'}}, 'integer': 123, 'floating': 55.5, 'arr': ['test', 123], 'boolean': 1}
[INFO] Sending request {'int': 42, 'float': 55.5, 'str': 'Hello, World!', 'bool': True}
[INFO] Waiting for response
[INFO] Received response: {'int': 42, 'float': 55.5, 'bool': 1, 'str': 'Hello, World!'}
[INFO] RECEIVED by subscriber : {'empty': None, 'string': 'test', 'obj': {'test3': 'world', 'test': {'test2': 'hello'}}, 'integer': 123, 'floating': 55.5, 'arr': ['test', 123], 'boolean': 1}
[INFO] Sending request {'int': 42, 'float': 55.5, 'str': 'Hello, World!', 'bool': True}
[INFO] Waiting for response
[INFO] Received response: {'int': 42, 'float': 55.5, 'bool': 1, 'str': 'Hello, World!'}

大家可能已经发现了日志里还有client和server的打印信息,这是因为测试模块中还带有client和server的通信代码。接下来,我们将client和server部分注释掉,重新再运行一遍。

2.3 只运行Pub-Sub通信部分

  • 首先,将client和server部分注释掉。
    • 打开文件"Samples/publisher/python/main.py",参照如下内容,将"server"部分注释掉,只保留"publisher"。
    def main():
        t1 = threading.Thread(target=publisher.start_publisher)
        # t2 = threading.Thread(target=server.start_server)
        t1.start()
        # t2.start()
        t1.join()
        # t2.join()
    
    • 接着打开文件"Samples/subscriber/python/main.py",参照如下内容,将"client"部分注释掉,只保留"subscriber"。
    def main():
        t1 = threading.Thread(target=subscriber.start_subscriber,)
        # t2 = threading.Thread(target=client.start_client,)
        t1.start()
        # t2.start()
        t1.join()
        # t2.join()
    
  • 重新编译和运行。
    • 编译模块配置信息
    $ cd ~/eii/IEdgeInsights/build
    $ sudo -E python3 builder.py -f usecases/test-emb.yml
    
    • 编译docker镜像。
    $ docker-compose -f docker-compose-build.yml build ia_ubuntu_python_sample_pub ia_ubuntu_python_sample_sub
    
    • 启动EII软件栈。
    $ ./eii_start.sh
    
    • 查看发布模块日志。
    $ docker logs -f ia_ubuntu_python_sample_pub
    
    发布模块日志信息如下。
    [INFO] Msg published by publisher :  '{'integer': 123, 'floating': 55.5, 'string': 'test', 'boolean': True, 'empty': None, 'obj': {'test': {'test2': 'hello'}, 'test3': 'world'}, 'arr': ['test', 123]}'
    [INFO] Msg published by publisher :  '{'integer': 123, 'floating': 55.5, 'string': 'test', 'boolean': True, 'empty': None, 'obj': {'test': {'test2': 'hello'}, 'test3': 'world'}, 'arr': ['test', 123]}'
    [INFO] Msg published by publisher :  '{'integer': 123, 'floating': 55.5, 'string': 'test', 'boolean': True, 'empty': None, 'obj': {'test': {'test2': 'hello'}, 'test3': 'world'}, 'arr': ['test', 123]}'
    
    此时日志信息就比较清晰了,只剩下"publisher"发送的信息。
    • 打开另外一个命令行窗口,查看订阅模块的日志。
    $ docker logs -f ia_ubuntu_python_sample_sub
    
    订阅模块日志信息如下。
    [INFO] RECEIVED by subscriber : {'empty': None, 'string': 'test', 'obj': {'test3': 'world', 'test': {'test2': 'hello'}}, 'integer': 123, 'floating': 55.5, 'arr': ['test', 123], 'boolean': 1}
    [INFO] RECEIVED by subscriber : {'empty': None, 'string': 'test', 'obj': {'test3': 'world', 'test': {'test2': 'hello'}}, 'integer': 123, 'floating': 55.5, 'arr': ['test', 123], 'boolean': 1}
    [INFO] RECEIVED by subscriber : {'empty': None, 'string': 'test', 'obj': {'test3': 'world', 'test': {'test2': 'hello'}}, 'integer': 123, 'floating': 55.5, 'arr': ['test', 123], 'boolean': 1}
    
  • 想要关闭EII软件栈,在"build"目录下执行"docker-compose down"命令即可。
$ cd ~/eii/IEdgeInsights/build
$ docker-compose down

通过以上内容,想必大家对EII Message Bus的Publisher和Subscriber通信方式,以及EII功能模块的编译和启动流程有了基本了解。接下来我们结合EII的时序数据模块来进一步学习一下EII Message Bus的使用。

3. 通过EII Message Bus传递数据到EII时序栈中

通过上述实操,我们使用EII Message Bus实现了数据的发布和订阅。
不过大家可能发现了,测试模块中发送的数据是写死的,没有变化 。在接下来的测试中,我们将对发送的数据做一些修改,来让测试变得有趣一点。
此外我们还要将测试数据通过EII Message Bus导入到EII的时序栈中(Telegraf, InfluxDB, Grafana, Kapacitor)来实现数据的存储,分析和可视化。

3.1 对发送数据进行修改

  • 首先,我们修改发布程序,让"publisher"程序发布随机的温度数据。参考如下内容,修改"publisher"程序"Samples/publisher/python/publisher.py"。让"publisher"程序发布10到30之间随机的温度数据。
# 第51-60行,对于meta变量的定义:
meta = {
    'integer': 123,
    'floating': 55.5,
    'string': 'test',
    'boolean': True,
    'empty': None,
    'obj': {'test': {'test2': 'hello'}, 'test3': 'world'},
    'arr': ['test', 123]
}
# 修改为:
meta = {
    'temperature': random.uniform(10,30)
}

# 第27行,导入'random'库
import random

3.2 配置EII时序栈

接下来,我们将引入EII的时序栈相关的模块。

  • 首先配置Telegraf模块。Telegraf和"subscriber"模块类似,需要从"publisher"模块订阅数据。所以,我们参考"subscriber"模块的config.json文件来配置Telegraf的config.json文件“Telegraf/config.json”。Telegraf最终的配置信息如下,其中"config"字段,增加了"temperature_sub"定义,“interface"字段增加了"Subscribers"定义(注意"Subscribers"中的"Name"字段也为"temperature_sub”)。
{
    "config": {
        "cert_type": ["zmq", "pem"],
        "influxdb": {
            "dbname": "datain"
        },
	    "publisher": {
            "measurements": ["*"],
            "profiling": "false"
        },
        "temperature_sub": {
            "topics_info": [
                "camera1_stream_results:temperature"
            ],
            "queue_len": 10,
            "num_worker": 1,
            "profiling": "false"
        }
    },
    "interfaces": {
        "Publishers": [
            {
                "Name": "publisher",
                "Type": "zmq_tcp",
                "EndPoint": "0.0.0.0:65077",
                "Topics": [
                    "*"
                ],
                "AllowedClients": [
                    "*"
                ]
            }
        ],
        "Subscribers": [
            {
                "Name": "temperature_sub",
                "Type": "zmq_tcp",
                "EndPoint": "ia_ubuntu_python_sample_pub:65020",
                "PublisherAppName": "UbuntuPythonSamplePub",
                "Topics": [
                    "camera1_stream_results"
                ]
            }
        ]
    }
}
  • 接着我们来配置Telegraf的插件配置文件"Telegraf/config/Telegraf/Telegraf_devmode.conf" (检查IEdgeInsights/build/.env文件中"DEV_MODE"是否为true,为true表示"开发模块",将使用Telegraf_devmode.conf配置文件,否则将使用Telegraf.conf配置文件),在该文件的3961行,添加如下内容。注意这里的"instance_name"也为"temperature_sub"。
[[inputs.eii_msgbus]]
    instance_name = "temperature_sub"
    data_format = "json"
    json_strict = true
  • 然后我们在"Telegraf/docker-compose.yml"文件中,将配置文件挂载到容器中,使得稍后我们无需重新编译docker镜像。参考如下内容,在文件第72行添加挂载信息。
volumes:
  - "vol_temp_telegraf:/tmp/"
  - "vol_eii_socket:${SOCKET_DIR}"
  - ./Certificates/Telegraf:/run/secrets/Telegraf:ro
  - ./Certificates/rootca/cacert.pem:/run/secrets/rootca/cacert.pem:ro
  - "../Telegraf/config/Telegraf/Telegraf_devmode.conf:/etc/Telegraf/Telegraf/Telegraf_devmode.conf"

到这里呢,我就配置好了"Telegraf"模块,该模块将通过“Telegraf/config.json”中的配置的EII Message Bus接口,实现从Samples “publisher"模块订阅数据,然后通过"Telegraf/config/Telegraf/Telegraf_devmode.conf"中"eii_msgbus"输入插件,将数据注入到Telegraf模块中。那么有输入"input”,就要有输出"output"。

  • Telegraf模块已经默认配置好了一些输出"outputs"插件。例如,将数据输出到InfluxDB数据库。在文件"Telegraf/config/Telegraf/Telegraf_devmode.conf"中搜索"outputs.influxdb",定位到"influxdb"输出插件的位置。可以看到这里配置了InfluxDB数据库的地址,账号和密码。(这里我们不用做额外修改,使用默认配置即可)
# Configuration for sending metrics to InfluxDB
[[outputs.influxdb]]
  ## The full HTTP or UDP URL for your InfluxDB instance.
  ##
  ## Multiple URLs can be specified for a single cluster, only ONE of the
  ## urls will be written to each interval.
  # urls = ["unix:///var/run/influxdb.sock"]
  # urls = ["udp://127.0.0.1:8089"]
   urls = ["http://$INFLUX_SERVER:$INFLUXDB_PORT"]

  ## The target database for metrics; will be created as needed.
   database = "$INFLUXDB_DBNAME"
  ## If true, no CREATE DATABASE queries will be sent.  Set to true when using
  ## Telegraf with a user without permissions to create databases or when the
  ## database already exists.
   skip_database_creation = true

  ## HTTP Basic Auth
   username = "$INFLUXDB_USERNAME"
   password = "$INFLUXDB_PASSWORD"
  • InfluxDB和Grafana模块使用默认配置即可,我们也不用做额外的修改。

  • 以上,我们就配置好了EII的Telegraf, InfluxDB, Grafana模块,可以实现数据的存储和可视化,我们暂时先不引入Kapacitor模块,先来测试数据的存储和可视化功能。

  • 接着,我们来配置模块编排文件"build/usecases/test-emb.yml",将Telegraf, InfluxDB, Grafana这三个模块的模块名,添加到模块编排文件中。

AppContexts:
- ConfigMgrAgent
- Samples/publisher/python/ubuntu
- Samples/subscriber/python/ubuntu
- Telegraf
- InfluxDBConnector
- Grafana

3.3 运行存储+可视化案例

  • 完成以上配置后,我们来启动整个软件栈。

    • 编译模块配置信息
    $ cd ~/eii/IEdgeInsights/build
    $ sudo -E python3 builder.py -f usecases/test-emb.yml
    
    • 编译docker镜像('publisher’模块修改过源码,所以需要重新编译,其他模块只是配置文件的修改,无需重新编译)。
    $ docker-compose -f docker-compose-build.yml build ia_ubuntu_python_sample_pub
    
    • 启动EII软件栈。
    $ ./eii_start.sh
    
    • 查看发布模块日志。
    $ docker logs -f ia_ubuntu_python_sample_pub
    

    发布模块日志信息如下。

    [INFO] Msg published by publisher :  '{'temperature': 13.908131420251822}'
    [INFO] Msg published by publisher :  '{'temperature': 24.90195310641654}'
    [INFO] Msg published by publisher :  '{'temperature': 20.590006255570245}'
    

    此时日志打印的是10-30之间随机的温度数据。

    • 打开另外一个命令行窗口,查看订阅模块的日志。
    $ docker logs -f ia_ubuntu_python_sample_sub
    

    订阅模块日志信息如下,说明成功收到了数据。

    [INFO] RECEIVED by subscriber : {'temperature': 13.908131420251822}
    [INFO] RECEIVED by subscriber : {'temperature': 24.90195310641654}
    [INFO] RECEIVED by subscriber : {'temperature': 20.590006255570245}
    
  • 接下来,我们来查看一下,温度数据是否成功导入了EII的时序栈呢?

    • 首先查看数据是否存入了InfluxDB数据库。
    # 进入InfluxDB容器内部
    $ docker exec -it ia_influxdbconnector /bin/bash
    # 登录influx的数据库datain
    $ influx -username "admin" -password "admin123" -database "datain"
    # 查看数据表
    $ show measurements
    # 查询temperature数据表数据(前10条)
    $ select * from temperature limit 10
    
    • 若成功查询到数据,则说明数据成功存入InfluxDB数据库。
    > select * from temperature limit 10
    name: temperature
    time                host        temperature
    ----                ----        -----------
    1662958358010377742 ia_telegraf 13.423975245848652
    1662958359011378683 ia_telegraf 16.121738058536394
    1662958360011835195 ia_telegraf 29.126776840391607
    1662958361012156050 ia_telegraf 13.657501242332737
    1662958362012814971 ia_telegraf 17.99415435616606
    1662958363013936581 ia_telegraf 28.59169796074449
    1662958364014790541 ia_telegraf 29.09044813076643
    1662958365015604115 ia_telegraf 12.120675429378629
    1662958366016797502 ia_telegraf 26.1021303770315
    1662958367017682936 ia_telegraf 17.839165649354236
    
  • 在Grafana界面中,可视化数据。

    • 打开浏览器,输入网址"localhost:3000"登录Grafana界面 (默认账号: admin, 密码: admin),第一次登录,提示修改密码。
      请添加图片描述

    • 点击左边栏"Create" -> “Dashboard”,创建一个新的仪表板。
      请添加图片描述

    • 然后点击"Add an empty panel",添加一个新图表。
      请添加图片描述

    • 在弹出的界面中,“Data source"选择"InfluxDB”,“measurement"选择"temperature”,“field"选择"temperature”。
      请添加图片描述

    • 此时在上方图表,将看到温度数据的呈现。
      请添加图片描述

    • 点击右上角"apply",则会保存该图表并返回仪表盘。将时间窗口调整至显示最近5分钟的数据。
      请添加图片描述

    • 仪表盘刷新时间调整为每5秒刷新一次。
      请添加图片描述

    • 此时,我们将会看到不断刷新的温度数据(10-30之间)。
      请添加图片描述

    • 到这里,我们就完成了数据的可视化。接下来可以保存该仪表板。点击右上角"Save dashboard"。
      请添加图片描述

    • 输入仪表板名字,并点击"Save"。
      请添加图片描述

  • 若想关闭EII软件栈,执行如下命令即可。

$ cd ~/eii/IEdgeInsights/build
$ docker-compose down

3.4 增加数据分析模块

这一小节,我们将Kapacitor模块也加进来,来实现一些简单的数据处理。
例如,我们在采集到10到30度之间的温度数据之后,将20到25之间的数据过滤掉,实现一个简单的数据过滤。

接下来,我们开始实操。

3.4.1 配置Kapacitor模块

  • 首先,我们创建一个TICKScript (Kapacitor特有的用来编写规则的脚本语言),用来实现过滤算法。在目录"Kapacitor/tick_scripts"下,创建脚本"temp_filter.tick",并输入如下内容。
dbrp "datain"."autogen"

var data0 = stream
        |from()
                .database('datain')
                .retentionPolicy('autogen')
                .measurement('temperature')
                .where(lambda: "temperature" < 20 OR "temperature" > 25)
        |influxDBOut()
                .buffer(0)
                .database('datain')
                .measurement('temp_filter')
                .retentionPolicy('autogen')

该脚本实现了从数据库"datain"的数据表"temperature"中,读取数据,并只将字段"temperature"小于20或者大于25的提取出来,接着再将提取出来的数据,写回数据库"datain"的另外一个表"temp_filter"中。

TICKScript脚本的详细语法,可参考Kapacitor官方文档:https://docs.influxdata.com/kapacitor/v1.5/tick/syntax/

  • 接着,我们让Kapacitor模块引用该脚本。在配置文件"Kapacitor/config.json"中,添加"temp_filter.tick"脚本,参考如下:
{
    "config": {
        "cert_type": ["zmq", "pem"],
        "influxdb": {},
        "task": [
           {
              "task_name": "temp_filter",
              "tick_script": "temp_filter.tick"
           }
        ]
    },
    "interfaces": {}
}
  • 然后,我们将脚本挂载到容器内,使得我们无需重新编译Kapacitor的docker镜像。在文件"Kapacitor/docker-compose.yml"中,添加如下文件挂载信息 (参考最后一行):
    volumes:
       - "vol_temp_kapacitor:/tmp/"
       - "vol_eii_socket:${SOCKET_DIR}"
       - "vol_dev_shm:/dev/shm"
       - ./Certificates/Kapacitor:/run/secrets/Kapacitor:ro
       - ./Certificates/rootca/cacert.pem:/run/secrets/rootca/cacert.pem:ro
       - ../Kapacitor/tick_scripts:/EII/tick_scripts
  • 最后,我们将Kapacitor模块也添加到模块编排文件"build/usecases/test-emb.yml"中。
AppContexts:
- ConfigMgrAgent
- Samples/publisher/python/ubuntu
- Samples/subscriber/python/ubuntu
- Telegraf
- InfluxDBConnector
- Grafana
- Kapacitor

3.4.2 启动EII软件栈

配置完毕后,我们来启动EII软件栈。

  • 编译模块配置信息
$ cd ~/eii/IEdgeInsights/build
$ sudo -E python3 builder.py -f usecases/test-emb.yml
  • 启动EII软件栈。
$ ./eii_start.sh
  • 打开浏览器,输入网址"localhost:3000"登录Grafana界面 (默认账号: admin, 密码: admin),大家会发现这里仍然提示需要修改密码,暂且忽略,我们在后面解答。
    请添加图片描述

  • 点击左边栏"Create" -> “Dashboard”,创建一个新的仪表板。
    请添加图片描述

  • 然后点击"Add an empty panel",添加一个新图表。
    请添加图片描述

  • 在弹出的界面中,“Data source"选择"InfluxDB”,点击"measurement",大家这里可能看到,"measurement"里出现了"temperature"表和"temp_filter"表。这里"temperature"表里存放的即是10-30原始温度数据,而"temp_filter"表里存放的则是已经过滤掉20-25温度的数据。
    请添加图片描述

  • 接着,我们参考先前的步骤,分别创建一个显示原始温度数据的图表,以及一个显示过滤后温度的图表。最终结果如下图所示,通过对比,可以很明显看出过滤算法的效果。
    请添加图片描述

  • 若想关闭EII软件栈,执行如下命令即可。

$ cd ~/eii/IEdgeInsights/build
$ docker-compose down

以上,我们即实现了通过EII Message Bus发送数据,由Telegraf模块接收数据并导入InfluxDB模块进行存储,然后由Kapacitor模块进行分析过滤,最后由Grafana模块进行数据可视化的完整流程。

后记

  • 通过本文,我们介绍了EII Message Bus的原理,并结合EII时序栈(Telegraf, InfluxDB, Grafana, Kapacitor)进行了案例演示,实现了简单的数据采集,存储,可视化和分析。希望对大家了解EII有所帮助。
  • 大家在实操过程中,可能发现了,当EII软件栈重启时,InfluxDB中的数据,以及Grafana模块中保存的图表数据都不见了,包括Grafana设置的用户密码也重置了。这些问题,我们将放在下一篇文章中解答。敬请关注。

网站公告

今日签到

点亮在社区的每一天
去签到