1. EII Message Bus简介
本文详细介绍了EII Message Bus原理以及实操案例。
实操本文案例前,用户须安装好EII 3.0,并跑通了EII自带的PCBA缺陷检测案例和温度监控案例。EII的下载与安装,请参考上一篇文章:英特尔工业边缘洞见平台(EII)安装指南。
1.1 ZeroMQ简介
EII的架构图如下,EII Message Bus作为EII的消息中间件,可支持EII各个模块之间的通讯,同时也能支持EII模块与外部程序的数据交换。
EII Message Bus是基于ZeroMQ开发的,并且EII模块中对于EII Message Bus的很多配置选项都和ZeroMQ的性质相关,所以我们有必要先介绍一下ZeroMQ。
ZeroMQ (官网: https://zeromq.org/) 是一个开源的通用消息库,它提供类似于socket编程的API基于不同的网络协议实现多种通讯模式。例如,使用ZeroMQ同一套API可以实现进程内(in-process),进程间(IPC),以及跨网络的通讯(TCP/multicast),另外ZeroMQ支持Request-Reply,Pub-Sub,Pull-Push等通讯模式。
ZeroMQ名字里虽然有MQ (Message Queue),但是它本质上并不是一个消息队列,而是一个消息内核 (Message Library)。所以,它和RabbitMQ, RocketMQ, Kafka等产品还是有一些区别的。ZeroMQ作为一个非常轻量化的消息内核,提供去中心化的,低延迟的通讯,可以很方便地集成在不同的系统中。
EII Message Bus对ZeroMQ的Request-Reply和Pub-Sub这两种通讯方式进行了封装,我们这里重点介绍一下。
首先是Request-Reply模式,该模式类似于传统Server-Client模式,即由Client(Request)端先发出请求,Server(Reply)端收到请求进行回复,属于一问一答式的通讯。
Pub-Sub模式,即发布-订阅模式,这种模式在很多通讯协议中都有被使用到(如MQTT)。它主要用于1对多的通讯场景中。当一个发布者发布消息后,可以有多个订阅者订阅消息,同时一个订阅者也可以订阅多个发布者的消息。
1.2 配置EII Message Bus
接下来我们介绍,在EII功能模块中,如何配置EII Message Bus的通讯接口呢?我们以EII自带的PCBA缺陷检测案例为例进行介绍。
在PCBA缺陷检测案例中,由VideoIngestion模块进行图像的采集,然后数据传递给VideoAnalytics模块进行分析,分析结果最后传递给Visualizer模块进行显示。
- 首先,我们介绍VideoIngestion模块的发布接口,该接口用于VideoIngestion模块采集到图像数据后,将数据发布出去。查看VideoIngestion模块的配置文件"VideoIngestion/config.json",在"interfaces"字段中找到"Publishers"的定义,具体如下:
"Publishers": [
{
"Name": "default",
"Type": "zmq_ipc",
"EndPoint": "/EII/sockets",
"Topics": [
"camera1_stream"
],
"AllowedClients": [
"VideoAnalytics", "Visualizer", "WebVisualizer", "TLSRemoteAgent", "RestDataExport"
]
}
]
字段具体含义如下:
- “Name"字段可任意填写。
- “Type"字段为"zmq_ipc”,表示采用ZeroMQ的进程间(IPC)的通讯方式。
- “EndPoint"字段为”/EII/sockets”,表示存放通讯媒介socket文件的存放地址。
- “Topics"字段为"camera1_stream”。Pub-Sub模式一般都是通过主题(topic)名字来实现消息的订阅。
- "AllowedClients"字段是一个简单的安全保护,只有这里定义的模块,才有权限订阅VideoIngestion模块发布的数据。
- 接着,来看VideoAnalytics模块的订阅接口,该接口用于从VideoIngestion订阅数据。查看VideoAnalytics模块的配置文件"VideoAnalytics/config.json",在"interfaces"字段中找到"Subscribers"的定义,具体如下:
"Subscribers": [
{
"Name": "default",
"Type": "zmq_ipc",
"EndPoint": "/EII/sockets",
"PublisherAppName": "VideoIngestion",
"Topics": [
"camera1_stream"
],
"zmq_recv_hwm": 50
}
],
可以看到VideoAnalytics模块的Subscriber接口定义和VideoIngestion模块的Publisher接口定义是对应的。
- “Name"字段可任意填写。
- “Type"字段为"zmq_ipc”。与VideoIngestion模块的Publisher接口保持一致。
- “EndPoint"字段为”/EII/sockets”。与VideoIngestion模块的Publisher接口保持一致。
- “Topics"字段为"camera1_stream”。与VideoIngestion模块的Publisher接口保持一致。
- "PublisherAppName"字段填写VideoIngestion模块的名字,该名字的定义可以去文件"VideoIngestion/docker-compose.yml"中,"environment"字段下的"AppName"变量查看。
- "zmq_recv_hwm"字段是ZeroMQ的一个设置选项,指的是通过ZeroMQ收发的消息,在内存中排队队列的长度大小。
- 接着,来看VideoAnalytics模块的发布接口,VideoAnalytics模块从VideoIngestion模块接收数据,然后在模块内部进行分析处理,处理结果再经由发布接口发布出去。仍然是在配置文件"VideoAnalytics/config.json",在"interfaces"字段中找到"Publishers"的定义,具体如下:
"Publishers": [
{
"Name": "default",
"Type": "zmq_tcp",
"EndPoint": "0.0.0.0:65013",
"Topics": [
"camera1_stream_results"
],
"AllowedClients": [
"*"
]
}
]
相比于VideoIngestion的发布接口,这里有的些许不同,我们来具体看一下:
- “Name"字段仍然可以任意填写。
- “Type"字段为"zmq_tcp”。这里和VideoIngestion模块就不一样了,VideoIngestion定义的是"zmq_ipc”,表示采用ZeroMQ的进程间(IPC)的通讯方式,这种方式只能实现同一台机器上进程之间的通讯。而VideoAnalytics这里定义的是"zmq_tcp",表示采用的是ZeroMQ的TCP通讯方式,可以实现跨机器,跨网络的通讯。VideoAnalytics模块的发布接口采用TCP方式的好处是,可以支持远端的机器远程查看处理结果。
- “EndPoint"字段为"0.0.0.0:65013”。由于这里采用了"zmq_tcp"的通讯方式,所以"EndPoint"需要配置成TCP通讯时,需要用到的IP地址 + 端口号Port的形式。
- “Topics"字段为"camera1_stream_results”,用于发布处理结果。
- “AllowedClients"字段是”*"。表示任意模块,都可以订阅VideoAnalytics模块发布的处理结果。
- 最后,我们来看看Visualizer模块的订阅接口。到这里,大家应该比较清楚了,Visualizer模块是从VideoAnalytics模块订阅处理结果来进行可视化,所以它的订阅接口的定义需要和VideoAnalytics模块的发布模块对应。
"Subscribers": [
{
"Name": "default",
"Type": "zmq_tcp",
"EndPoint": "ia_video_analytics:65013",
"PublisherAppName": "VideoAnalytics",
"Topics": [
"camera1_stream_results"
]
},
{
"Name": "default",
"Type": "zmq_tcp",
"EndPoint": "ia_python_safety_gear_analytics:65019",
"PublisherAppName": "PySafetyGearAnalytics",
"Topics": [
"py_safety_gear_stream_results"
]
},
{
"Name": "default",
"Type": "zmq_tcp",
"EndPoint": "ia_native_safety_gear_analytics:65017",
"PublisherAppName": "NativeSafetyGearAnalytics",
"Topics": [
"native_safety_gear_stream_results"
]
}
]
我们可以看到,这里定义了多个订阅接口。其中后面两个是和其他案例相关,这里我们只需查看第一个。
和VideoAnalytics模块的发布接口对照来看:
- “Type"字段为"zmq_tcp”。与VideoAnalytics模块的Publisher接口保持一致。
- “EndPoint"字段为"ia_video_analytics:65013”。与VideoAnalytics模块的Publisher接口保持一致。在docker容器的使用中,可以使用容器service名字来代替容器的IP地址,可以在文件"VideoAnalytics/docker-compose.yml"第25行查看容器service名字。
- "PublisherAppName"字段填写VideoAnalytics模块的名字,该名字的定义可以去文件"VideoAnalytics/docker-compose.yml"中,"environment"字段下的"AppName"变量查看。
- “Topics"字段为"camera1_stream_results”,与VideoAnalytics模块的Publisher接口保持一致。
以上,我们介绍EII Message Bus在EII各个模块中的配置方式,接下来我将介绍实操案例,大家可以在实际操作中,来感受EII Message Bus的用法。
2. EII Message Bus测试模块
2.1 查看测试模块的配置
EII提供了多种编程语言的Samples模块,可以用于测试EII Message Bus的基本功能。我们以Python为例,Python的案例程序位于目录"Samples/publisher/python"(发布模块),和目录"Samples/subscriber/python"(订阅模块)。
我们首先来看,发布模块和订阅模块之间的通信接口是如何定义的。
- 先看发布模块的配置文件"Samples/publisher/python/ubuntu/config.json",其中定义了发布接口"Publishers"。
"Publishers": [ { "Name": "default", "Type": "zmq_tcp", "EndPoint": "0.0.0.0:65020", "AllowedClients": [ "*" ], "Topics": [ "camera1_stream_results", "camera2_stream_results_metaonly" ] } ]
- 接着来看订阅模块的配置文件"Samples/subscriber/python/ubuntu/config.json",其中定义了订阅接口"Subscribers"。
"Subscribers": [ { "Name": "default", "Type": "zmq_tcp", "EndPoint": "ia_ubuntu_python_sample_pub:65020", "PublisherAppName": "UbuntuPythonSamplePub", "Topics": [ "camera1_stream_results" ] } ]
通过第一章节的介绍,大家应该发布接口和订阅接口如何定义已经很熟悉了。这里就不再详细解释了。
接着,我们来看功能模块中其他的配置文件。在EII的功能模块中,有3个文件是必备的,分别是config.json, Dockerfile, docker-compose.yml。config.json用于对模块的功能,通讯接口等进行配置,Dockerfile和docker-compose.yml对于接触过Docker容器技术的用户来说,应该不陌生。Dockerfile用于构建功能模块的docker镜像,docker-compose.yml则用于实现对多个容器运行的编排,其中可以定义环境变量,文件挂载等信息。发布模块和订阅模块的Dockerfile和docker-compose.yml文件的细节,这里就不详细解释了,大家可自行查看。
从两个模块的Dockerfile的末尾可以发现,两个模块的入口均是main.py脚本。
ENTRYPOINT ["python3", "-u", "main.py"]
以发布模块的main.py为入口,可以看到在发布模块的publisher.py脚本中,实现了读取接口配置"pub_ctx.get_msgbus_config()“,创建’publisher’接口"msgbus_pub.new_publisher()”,并通过"publisher.publish()"发布数据的功能。
同样,以订阅模块的main.py为入口,可以看到在订阅模块的subscriber.py脚本中,实现了读取接口配置"sub_ctx.get_msgbus_config()“,创建’subscriber’接口"msgbus_sub.new_subscriber()”,并通过"subscriber.recv()"接收数据的功能。
2.2 运行测试模块
接下来,我们将测试模块跑起来。
我们将在EII源码目录"~/eii/IEdgeInsights"目录下进行操作。
首先创建一个模块编排文件"build/usecases/test-emb.yml",并将"publisher"和"subscriber"模块添加进来,"ConfigMgrAgent"模块用于配置文件管理,默认必须添加。
AppContexts:
- ConfigMgrAgent
- Samples/publisher/python/ubuntu
- Samples/subscriber/python/ubuntu
- 编译模块配置信息
$ cd ~/eii/IEdgeInsights/build
$ sudo -E python3 builder.py -f usecases/test-emb.yml
编译模块docker镜像。
- (For PRC网络) 修改源信息,加速国内网络环境下的build docker镜像过程。
# 文件Samples/publisher/python/ubuntu/docker-compose.yml第34行,"github"修改为"gitee",参考如下: 第34行: PKG_SRC: https://github.com/open-edge-insights/eii-manifests/releases/download/v3.0 修改为: PKG_SRC: https://gitee.com/open-edge-insights/eii-manifests/releases/download/v3.0 # 文件Samples/publisher/python/ubuntu/Dockerfile第91行,pip源修改为清华源,参考如下: 第91行: pip3 install -r requirements.txt; \ 修改为: pip3 install -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple; \ # 文件Samples/subscriber/python/ubuntu/docker-compose.yml第34行,"github"修改为"gitee",参考如下: 第34行: PKG_SRC: https://github.com/open-edge-insights/eii-manifests/releases/download/v3.0 修改为: PKG_SRC: https://gitee.com/open-edge-insights/eii-manifests/releases/download/v3.0 # 文件Samples/subscriber/python/ubuntu/Dockerfile第91行,pip源修改为清华源,参考如下: 第91行: pip3 install -r requirements.txt; \ 修改为: pip3 install -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple; \
- 执行编译docker镜像命令。(由于ConfigMgrAgent docker镜像在先前PCBA案例中已经build好了,所以这里可以不用重复编译,只需要编译"publisher"和"subscriber"模块)
$ docker-compose -f docker-compose-build.yml build ia_ubuntu_python_sample_pub ia_ubuntu_python_sample_sub
模块docker镜像编译成功后(可通过"docker images"命令查看),启动EII软件栈。
$ ./eii_start.sh
- EII软件栈启动成功后,可通过"docker ps"命令查看容器运行情况。
$ docker ps
- 查看发布模块日志。
# 执行docker logs命令,查看发布模块的日志
$ docker logs -f ia_ubuntu_python_sample_pub
发布模块日志信息如下。
[INFO] Msg published by publisher : '{'integer': 123, 'floating': 55.5, 'string': 'test', 'boolean': True, 'empty': None, 'obj': {'test': {'test2': 'hello'}, 'test3': 'world'}, 'arr': ['test', 123]}'
[INFO] Received request from client: {'int': 42, 'float': 55.5, 'bool': 1, 'str': 'Hello, World!'}
[INFO] Msg published by publisher : '{'integer': 123, 'floating': 55.5, 'string': 'test', 'boolean': True, 'empty': None, 'obj': {'test': {'test2': 'hello'}, 'test3': 'world'}, 'arr': ['test', 123]}'
[INFO] Received request from client: {'int': 42, 'float': 55.5, 'bool': 1, 'str': 'Hello, World!'}
[INFO] Msg published by publisher : '{'integer': 123, 'floating': 55.5, 'string': 'test', 'boolean': True, 'empty': None, 'obj': {'test': {'test2': 'hello'}, 'test3': 'world'}, 'arr': ['test', 123]}'
[INFO] Received request from client: {'int': 42, 'float': 55.5, 'bool': 1, 'str': 'Hello, World!'}
- 打开另外一个命令行窗口,查看订阅模块的日志。
# 执行docker logs命令,查看订阅模块的日志
$ docker logs -f ia_ubuntu_python_sample_sub
订阅模块日志信息如下。
[INFO] RECEIVED by subscriber : {'empty': None, 'string': 'test', 'obj': {'test3': 'world', 'test': {'test2': 'hello'}}, 'integer': 123, 'floating': 55.5, 'arr': ['test', 123], 'boolean': 1}
[INFO] Sending request {'int': 42, 'float': 55.5, 'str': 'Hello, World!', 'bool': True}
[INFO] Waiting for response
[INFO] Received response: {'int': 42, 'float': 55.5, 'bool': 1, 'str': 'Hello, World!'}
[INFO] RECEIVED by subscriber : {'empty': None, 'string': 'test', 'obj': {'test3': 'world', 'test': {'test2': 'hello'}}, 'integer': 123, 'floating': 55.5, 'arr': ['test', 123], 'boolean': 1}
[INFO] Sending request {'int': 42, 'float': 55.5, 'str': 'Hello, World!', 'bool': True}
[INFO] Waiting for response
[INFO] Received response: {'int': 42, 'float': 55.5, 'bool': 1, 'str': 'Hello, World!'}
[INFO] RECEIVED by subscriber : {'empty': None, 'string': 'test', 'obj': {'test3': 'world', 'test': {'test2': 'hello'}}, 'integer': 123, 'floating': 55.5, 'arr': ['test', 123], 'boolean': 1}
[INFO] Sending request {'int': 42, 'float': 55.5, 'str': 'Hello, World!', 'bool': True}
[INFO] Waiting for response
[INFO] Received response: {'int': 42, 'float': 55.5, 'bool': 1, 'str': 'Hello, World!'}
大家可能已经发现了日志里还有client和server的打印信息,这是因为测试模块中还带有client和server的通信代码。接下来,我们将client和server部分注释掉,重新再运行一遍。
2.3 只运行Pub-Sub通信部分
- 首先,将client和server部分注释掉。
- 打开文件"Samples/publisher/python/main.py",参照如下内容,将"server"部分注释掉,只保留"publisher"。
def main(): t1 = threading.Thread(target=publisher.start_publisher) # t2 = threading.Thread(target=server.start_server) t1.start() # t2.start() t1.join() # t2.join()
- 接着打开文件"Samples/subscriber/python/main.py",参照如下内容,将"client"部分注释掉,只保留"subscriber"。
def main(): t1 = threading.Thread(target=subscriber.start_subscriber,) # t2 = threading.Thread(target=client.start_client,) t1.start() # t2.start() t1.join() # t2.join()
- 重新编译和运行。
- 编译模块配置信息
$ cd ~/eii/IEdgeInsights/build $ sudo -E python3 builder.py -f usecases/test-emb.yml
- 编译docker镜像。
$ docker-compose -f docker-compose-build.yml build ia_ubuntu_python_sample_pub ia_ubuntu_python_sample_sub
- 启动EII软件栈。
$ ./eii_start.sh
- 查看发布模块日志。
发布模块日志信息如下。$ docker logs -f ia_ubuntu_python_sample_pub
此时日志信息就比较清晰了,只剩下"publisher"发送的信息。[INFO] Msg published by publisher : '{'integer': 123, 'floating': 55.5, 'string': 'test', 'boolean': True, 'empty': None, 'obj': {'test': {'test2': 'hello'}, 'test3': 'world'}, 'arr': ['test', 123]}' [INFO] Msg published by publisher : '{'integer': 123, 'floating': 55.5, 'string': 'test', 'boolean': True, 'empty': None, 'obj': {'test': {'test2': 'hello'}, 'test3': 'world'}, 'arr': ['test', 123]}' [INFO] Msg published by publisher : '{'integer': 123, 'floating': 55.5, 'string': 'test', 'boolean': True, 'empty': None, 'obj': {'test': {'test2': 'hello'}, 'test3': 'world'}, 'arr': ['test', 123]}'
- 打开另外一个命令行窗口,查看订阅模块的日志。
订阅模块日志信息如下。$ docker logs -f ia_ubuntu_python_sample_sub
[INFO] RECEIVED by subscriber : {'empty': None, 'string': 'test', 'obj': {'test3': 'world', 'test': {'test2': 'hello'}}, 'integer': 123, 'floating': 55.5, 'arr': ['test', 123], 'boolean': 1} [INFO] RECEIVED by subscriber : {'empty': None, 'string': 'test', 'obj': {'test3': 'world', 'test': {'test2': 'hello'}}, 'integer': 123, 'floating': 55.5, 'arr': ['test', 123], 'boolean': 1} [INFO] RECEIVED by subscriber : {'empty': None, 'string': 'test', 'obj': {'test3': 'world', 'test': {'test2': 'hello'}}, 'integer': 123, 'floating': 55.5, 'arr': ['test', 123], 'boolean': 1}
- 想要关闭EII软件栈,在"build"目录下执行"docker-compose down"命令即可。
$ cd ~/eii/IEdgeInsights/build
$ docker-compose down
通过以上内容,想必大家对EII Message Bus的Publisher和Subscriber通信方式,以及EII功能模块的编译和启动流程有了基本了解。接下来我们结合EII的时序数据模块来进一步学习一下EII Message Bus的使用。
3. 通过EII Message Bus传递数据到EII时序栈中
通过上述实操,我们使用EII Message Bus实现了数据的发布和订阅。
不过大家可能发现了,测试模块中发送的数据是写死的,没有变化 。在接下来的测试中,我们将对发送的数据做一些修改,来让测试变得有趣一点。
此外我们还要将测试数据通过EII Message Bus导入到EII的时序栈中(Telegraf, InfluxDB, Grafana, Kapacitor)来实现数据的存储,分析和可视化。
3.1 对发送数据进行修改
- 首先,我们修改发布程序,让"publisher"程序发布随机的温度数据。参考如下内容,修改"publisher"程序"Samples/publisher/python/publisher.py"。让"publisher"程序发布10到30之间随机的温度数据。
# 第51-60行,对于meta变量的定义:
meta = {
'integer': 123,
'floating': 55.5,
'string': 'test',
'boolean': True,
'empty': None,
'obj': {'test': {'test2': 'hello'}, 'test3': 'world'},
'arr': ['test', 123]
}
# 修改为:
meta = {
'temperature': random.uniform(10,30)
}
# 第27行,导入'random'库
import random
3.2 配置EII时序栈
接下来,我们将引入EII的时序栈相关的模块。
- 首先配置Telegraf模块。Telegraf和"subscriber"模块类似,需要从"publisher"模块订阅数据。所以,我们参考"subscriber"模块的config.json文件来配置Telegraf的config.json文件“Telegraf/config.json”。Telegraf最终的配置信息如下,其中"config"字段,增加了"temperature_sub"定义,“interface"字段增加了"Subscribers"定义(注意"Subscribers"中的"Name"字段也为"temperature_sub”)。
{
"config": {
"cert_type": ["zmq", "pem"],
"influxdb": {
"dbname": "datain"
},
"publisher": {
"measurements": ["*"],
"profiling": "false"
},
"temperature_sub": {
"topics_info": [
"camera1_stream_results:temperature"
],
"queue_len": 10,
"num_worker": 1,
"profiling": "false"
}
},
"interfaces": {
"Publishers": [
{
"Name": "publisher",
"Type": "zmq_tcp",
"EndPoint": "0.0.0.0:65077",
"Topics": [
"*"
],
"AllowedClients": [
"*"
]
}
],
"Subscribers": [
{
"Name": "temperature_sub",
"Type": "zmq_tcp",
"EndPoint": "ia_ubuntu_python_sample_pub:65020",
"PublisherAppName": "UbuntuPythonSamplePub",
"Topics": [
"camera1_stream_results"
]
}
]
}
}
- 接着我们来配置Telegraf的插件配置文件"Telegraf/config/Telegraf/Telegraf_devmode.conf" (检查IEdgeInsights/build/.env文件中"DEV_MODE"是否为true,为true表示"开发模块",将使用Telegraf_devmode.conf配置文件,否则将使用Telegraf.conf配置文件),在该文件的3961行,添加如下内容。注意这里的"instance_name"也为"temperature_sub"。
[[inputs.eii_msgbus]]
instance_name = "temperature_sub"
data_format = "json"
json_strict = true
- 然后我们在"Telegraf/docker-compose.yml"文件中,将配置文件挂载到容器中,使得稍后我们无需重新编译docker镜像。参考如下内容,在文件第72行添加挂载信息。
volumes:
- "vol_temp_telegraf:/tmp/"
- "vol_eii_socket:${SOCKET_DIR}"
- ./Certificates/Telegraf:/run/secrets/Telegraf:ro
- ./Certificates/rootca/cacert.pem:/run/secrets/rootca/cacert.pem:ro
- "../Telegraf/config/Telegraf/Telegraf_devmode.conf:/etc/Telegraf/Telegraf/Telegraf_devmode.conf"
到这里呢,我就配置好了"Telegraf"模块,该模块将通过“Telegraf/config.json”中的配置的EII Message Bus接口,实现从Samples “publisher"模块订阅数据,然后通过"Telegraf/config/Telegraf/Telegraf_devmode.conf"中"eii_msgbus"输入插件,将数据注入到Telegraf模块中。那么有输入"input”,就要有输出"output"。
- Telegraf模块已经默认配置好了一些输出"outputs"插件。例如,将数据输出到InfluxDB数据库。在文件"Telegraf/config/Telegraf/Telegraf_devmode.conf"中搜索"outputs.influxdb",定位到"influxdb"输出插件的位置。可以看到这里配置了InfluxDB数据库的地址,账号和密码。(这里我们不用做额外修改,使用默认配置即可)
# Configuration for sending metrics to InfluxDB
[[outputs.influxdb]]
## The full HTTP or UDP URL for your InfluxDB instance.
##
## Multiple URLs can be specified for a single cluster, only ONE of the
## urls will be written to each interval.
# urls = ["unix:///var/run/influxdb.sock"]
# urls = ["udp://127.0.0.1:8089"]
urls = ["http://$INFLUX_SERVER:$INFLUXDB_PORT"]
## The target database for metrics; will be created as needed.
database = "$INFLUXDB_DBNAME"
## If true, no CREATE DATABASE queries will be sent. Set to true when using
## Telegraf with a user without permissions to create databases or when the
## database already exists.
skip_database_creation = true
## HTTP Basic Auth
username = "$INFLUXDB_USERNAME"
password = "$INFLUXDB_PASSWORD"
InfluxDB和Grafana模块使用默认配置即可,我们也不用做额外的修改。
以上,我们就配置好了EII的Telegraf, InfluxDB, Grafana模块,可以实现数据的存储和可视化,我们暂时先不引入Kapacitor模块,先来测试数据的存储和可视化功能。
接着,我们来配置模块编排文件"build/usecases/test-emb.yml",将Telegraf, InfluxDB, Grafana这三个模块的模块名,添加到模块编排文件中。
AppContexts:
- ConfigMgrAgent
- Samples/publisher/python/ubuntu
- Samples/subscriber/python/ubuntu
- Telegraf
- InfluxDBConnector
- Grafana
3.3 运行存储+可视化案例
完成以上配置后,我们来启动整个软件栈。
- 编译模块配置信息
$ cd ~/eii/IEdgeInsights/build $ sudo -E python3 builder.py -f usecases/test-emb.yml
- 编译docker镜像('publisher’模块修改过源码,所以需要重新编译,其他模块只是配置文件的修改,无需重新编译)。
$ docker-compose -f docker-compose-build.yml build ia_ubuntu_python_sample_pub
- 启动EII软件栈。
$ ./eii_start.sh
- 查看发布模块日志。
$ docker logs -f ia_ubuntu_python_sample_pub
发布模块日志信息如下。
[INFO] Msg published by publisher : '{'temperature': 13.908131420251822}' [INFO] Msg published by publisher : '{'temperature': 24.90195310641654}' [INFO] Msg published by publisher : '{'temperature': 20.590006255570245}'
此时日志打印的是10-30之间随机的温度数据。
- 打开另外一个命令行窗口,查看订阅模块的日志。
$ docker logs -f ia_ubuntu_python_sample_sub
订阅模块日志信息如下,说明成功收到了数据。
[INFO] RECEIVED by subscriber : {'temperature': 13.908131420251822} [INFO] RECEIVED by subscriber : {'temperature': 24.90195310641654} [INFO] RECEIVED by subscriber : {'temperature': 20.590006255570245}
接下来,我们来查看一下,温度数据是否成功导入了EII的时序栈呢?
- 首先查看数据是否存入了InfluxDB数据库。
# 进入InfluxDB容器内部 $ docker exec -it ia_influxdbconnector /bin/bash # 登录influx的数据库datain $ influx -username "admin" -password "admin123" -database "datain" # 查看数据表 $ show measurements # 查询temperature数据表数据(前10条) $ select * from temperature limit 10
- 若成功查询到数据,则说明数据成功存入InfluxDB数据库。
> select * from temperature limit 10 name: temperature time host temperature ---- ---- ----------- 1662958358010377742 ia_telegraf 13.423975245848652 1662958359011378683 ia_telegraf 16.121738058536394 1662958360011835195 ia_telegraf 29.126776840391607 1662958361012156050 ia_telegraf 13.657501242332737 1662958362012814971 ia_telegraf 17.99415435616606 1662958363013936581 ia_telegraf 28.59169796074449 1662958364014790541 ia_telegraf 29.09044813076643 1662958365015604115 ia_telegraf 12.120675429378629 1662958366016797502 ia_telegraf 26.1021303770315 1662958367017682936 ia_telegraf 17.839165649354236
在Grafana界面中,可视化数据。
打开浏览器,输入网址"localhost:3000"登录Grafana界面 (默认账号: admin, 密码: admin),第一次登录,提示修改密码。
点击左边栏"Create" -> “Dashboard”,创建一个新的仪表板。
然后点击"Add an empty panel",添加一个新图表。
在弹出的界面中,“Data source"选择"InfluxDB”,“measurement"选择"temperature”,“field"选择"temperature”。
此时在上方图表,将看到温度数据的呈现。
点击右上角"apply",则会保存该图表并返回仪表盘。将时间窗口调整至显示最近5分钟的数据。
仪表盘刷新时间调整为每5秒刷新一次。
此时,我们将会看到不断刷新的温度数据(10-30之间)。
到这里,我们就完成了数据的可视化。接下来可以保存该仪表板。点击右上角"Save dashboard"。
输入仪表板名字,并点击"Save"。
若想关闭EII软件栈,执行如下命令即可。
$ cd ~/eii/IEdgeInsights/build
$ docker-compose down
3.4 增加数据分析模块
这一小节,我们将Kapacitor模块也加进来,来实现一些简单的数据处理。
例如,我们在采集到10到30度之间的温度数据之后,将20到25之间的数据过滤掉,实现一个简单的数据过滤。
接下来,我们开始实操。
3.4.1 配置Kapacitor模块
- 首先,我们创建一个TICKScript (Kapacitor特有的用来编写规则的脚本语言),用来实现过滤算法。在目录"Kapacitor/tick_scripts"下,创建脚本"temp_filter.tick",并输入如下内容。
dbrp "datain"."autogen"
var data0 = stream
|from()
.database('datain')
.retentionPolicy('autogen')
.measurement('temperature')
.where(lambda: "temperature" < 20 OR "temperature" > 25)
|influxDBOut()
.buffer(0)
.database('datain')
.measurement('temp_filter')
.retentionPolicy('autogen')
该脚本实现了从数据库"datain"的数据表"temperature"中,读取数据,并只将字段"temperature"小于20或者大于25的提取出来,接着再将提取出来的数据,写回数据库"datain"的另外一个表"temp_filter"中。
TICKScript脚本的详细语法,可参考Kapacitor官方文档:https://docs.influxdata.com/kapacitor/v1.5/tick/syntax/
- 接着,我们让Kapacitor模块引用该脚本。在配置文件"Kapacitor/config.json"中,添加"temp_filter.tick"脚本,参考如下:
{
"config": {
"cert_type": ["zmq", "pem"],
"influxdb": {},
"task": [
{
"task_name": "temp_filter",
"tick_script": "temp_filter.tick"
}
]
},
"interfaces": {}
}
- 然后,我们将脚本挂载到容器内,使得我们无需重新编译Kapacitor的docker镜像。在文件"Kapacitor/docker-compose.yml"中,添加如下文件挂载信息 (参考最后一行):
volumes:
- "vol_temp_kapacitor:/tmp/"
- "vol_eii_socket:${SOCKET_DIR}"
- "vol_dev_shm:/dev/shm"
- ./Certificates/Kapacitor:/run/secrets/Kapacitor:ro
- ./Certificates/rootca/cacert.pem:/run/secrets/rootca/cacert.pem:ro
- ../Kapacitor/tick_scripts:/EII/tick_scripts
- 最后,我们将Kapacitor模块也添加到模块编排文件"build/usecases/test-emb.yml"中。
AppContexts:
- ConfigMgrAgent
- Samples/publisher/python/ubuntu
- Samples/subscriber/python/ubuntu
- Telegraf
- InfluxDBConnector
- Grafana
- Kapacitor
3.4.2 启动EII软件栈
配置完毕后,我们来启动EII软件栈。
- 编译模块配置信息
$ cd ~/eii/IEdgeInsights/build
$ sudo -E python3 builder.py -f usecases/test-emb.yml
- 启动EII软件栈。
$ ./eii_start.sh
打开浏览器,输入网址"localhost:3000"登录Grafana界面 (默认账号: admin, 密码: admin),大家会发现这里仍然提示需要修改密码,暂且忽略,我们在后面解答。
点击左边栏"Create" -> “Dashboard”,创建一个新的仪表板。
然后点击"Add an empty panel",添加一个新图表。
在弹出的界面中,“Data source"选择"InfluxDB”,点击"measurement",大家这里可能看到,"measurement"里出现了"temperature"表和"temp_filter"表。这里"temperature"表里存放的即是10-30原始温度数据,而"temp_filter"表里存放的则是已经过滤掉20-25温度的数据。
接着,我们参考先前的步骤,分别创建一个显示原始温度数据的图表,以及一个显示过滤后温度的图表。最终结果如下图所示,通过对比,可以很明显看出过滤算法的效果。
若想关闭EII软件栈,执行如下命令即可。
$ cd ~/eii/IEdgeInsights/build
$ docker-compose down
以上,我们即实现了通过EII Message Bus发送数据,由Telegraf模块接收数据并导入InfluxDB模块进行存储,然后由Kapacitor模块进行分析过滤,最后由Grafana模块进行数据可视化的完整流程。
后记
- 通过本文,我们介绍了EII Message Bus的原理,并结合EII时序栈(Telegraf, InfluxDB, Grafana, Kapacitor)进行了案例演示,实现了简单的数据采集,存储,可视化和分析。希望对大家了解EII有所帮助。
- 大家在实操过程中,可能发现了,当EII软件栈重启时,InfluxDB中的数据,以及Grafana模块中保存的图表数据都不见了,包括Grafana设置的用户密码也重置了。这些问题,我们将放在下一篇文章中解答。敬请关注。