C++高性能通信:了解Iceoryx与零拷贝技术的实现与应用

发布于:2024-07-25 ⋅ 阅读:(168) ⋅ 点赞:(0)

0. 引言

Iceoryx是一个开源的实时通信框架,特别适用于需要高性能和低延迟的嵌入式系统,如自动驾驶系统、机器人控制、航空航天等。

详细介绍请看官网

img

iceoryx 使用真正的零拷贝共享内存方法,允许将数据从发布者传输到订阅者而无需任何副本。这可确保数据传输具有恒定的延迟,无论有效负载的大小是多少。

img
关于进程间通信,Iceoryx与Nanomsg的对比请看从 Nanomsg 到 Iceoryx: 发布-订阅模式的性能对比

1. Iceoryx使用到的零拷贝技术

1.1 零拷贝技术概述

零拷贝是指在数据传输过程中,避免不必要的数据拷贝操作。传统的数据传输通常涉及将数据从一个缓冲区复制到另一个缓冲区,这会产生额外的开销。而零拷贝技术允许数据在不进行拷贝的情况下直接传递到目标缓冲区,从而提高传输效率。

Iceoryx使用共享内存进行进程间通信,将数据放置在共享内存区域,然后通过指针引用实现数据传递,避免了数据的额外复制。

1.2 零拷贝的优势

  1. 减少CPU开销:零拷贝减少了CPU的复制操作,提高了系统性能。
  2. 降低内存占用:由于数据不需要在不同缓冲区之间复制,内存使用更为高效。
  3. 降低传输延迟:数据直接传递,无需复制等待时间。

1.3 Iceoryx零拷贝的实现

Iceoryx通过以下方式实现真正的零拷贝:

  • 利用共享内存技术,预先开辟内存块(chunk),publisher将数据写入。
  • Subscriber通过指针获取chunk中的信息,数据被写入时,subscriber收到一个指针。
  • Iceoryx维护每个chunk的引用记录,确保资源不被浪费。

1.4 信息轮询与信号触发

为了提升数据获取效率,Iceoryx提供两种方式:

  • WaitSet:采用react设计模式,绑定对应的subscribers,数据到来时触发通知。
  • Listener:直接触发用户定制的callback,数据到来时调用回调函数。

2. Iceoryx的核心概念

掌握以下核心概念对于理解和开发基于Iceoryx的通信功能至关重要:

2.1 RouDi (iox-roudi)

定义
RouDi是Iceoryx的中间件守护进程(daemon),负责管理和协调不同应用之间的通信。它是Iceoryx通信框架中的核心组件,所有使用Iceoryx的应用都需要与RouDi建立连接才能进行正常的数据交换。

功能

  • 路由与分发:RouDi作为中心节点,接收来自发布者的数据,并根据订阅者的需求将数据分发到相应的订阅者。
  • 资源管理与优化:管理共享内存资源,确保资源得到合理分配和回收,以提高系统性能和资源利用率。
  • 安全性与隔离:实施必要的安全措施,确保通信过程的安全性,并通过隔离机制防止不同应用之间的数据冲突。

使用
在启动任何使用Iceoryx的应用之前,必须先启动RouDi守护进程。

2.2 Runtime

定义
Runtime是Iceoryx为每个应用提供的运行时环境。在应用启动时,需要初始化其对应的Runtime,以便应用能够接入Iceoryx的通信框架。

功能

  • 初始化:为应用提供必要的初始化步骤,使其能够注册为Iceoryx通信框架的一部分。
  • 资源分配:为应用分配必要的资源,如共享内存段、消息队列等。
  • 通信管理:管理应用与其他参与者(如其他应用、RouDi等)之间的通信。

使用

constexpr char APP_NAME[] = "iox-publisher";
iox::runtime::PoshRuntime::initRuntime(APP_NAME);

2.3 Publisher

定义
Publisher是Iceoryx中的数据发送器,负责将数据发布到指定的Topic上,以便订阅者可以接收。

功能

  • 数据发送:将数据写入共享内存中的指定位置,并通知RouDi该数据已准备好被分发。
  • Topic绑定:Publisher需要与特定的Topic绑定,以便订阅者能够识别并接收其发布的数据。

使用

iox::popo::Publisher<Data> publisher({"Group", "Topic", "Instance"});

2.4 Subscriber

定义
Subscriber是Iceoryx中的数据接收器,负责订阅指定的Topic并接收来自发布者的数据。

功能

  • 数据接收:从共享内存中读取发布者发布的数据。
  • Topic绑定:Subscriber需要与特定的Topic绑定,以便接收该Topic上的所有数据。
  • 回调处理:当接收到新数据时,可以触发回调函数来处理数据。

使用

iox::popo::Subscriber<Data> subscriber({"Group", "Topic", "Instance"});

2.5 Topic

定义
Topic是Iceoryx中的数据载体,用于在发布者和订阅者之间传递数据。Publisher将数据发送到指定的Topic,而Subscriber则订阅该Topic以接收数据。

功能

  • 数据传递:作为数据传递的媒介,确保数据能够从发布者正确地传输到订阅者。
  • 命名约定:Topic通过组(Group)、主题(Topic)和实例(Instance)来唯一标识,以便发布者和订阅者能够准确匹配。

使用
在创建Publisher和Subscriber时,需要指定它们要绑定或订阅的Topic名称(包括组、主题和实例)。

3. Iceoryx使用示例

3.1 发布者程序

#include "iceoryx_posh/popo/publisher.hpp"
#include "iceoryx_posh/runtime/posh_runtime.hpp"
#include <chrono>
#include <thread>
#include <iostream>

struct Data {
    char message[128];
};

int main() {
    constexpr char APP_NAME[] = "iox-publisher";
    iox::runtime::PoshRuntime::initRuntime(APP_NAME);
    iox::popo::Publisher<Data> publisher({"Group", "Topic", "Instance"});

    while (true) {
        publisher.loan()
            .and_then([&](auto& sample) {
                std::strcpy(sample->message, "Hello from Publisher");
                sample.publish();
            })
            .or_else([](auto& error) {
                std::cerr << "Loaning sample failed: " << error << std::endl;
            });

        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    }

    return 0;
}

3.2 订阅者程序

#include "iceoryx_posh/popo/subscriber.hpp"
#include "iceoryx_posh/runtime/posh_runtime.hpp"
#include <chrono>
#include <thread>
#include <iostream>

struct Data {
    char message[128];
};

int main() {
    constexpr char APP_NAME[] = "iox-subscriber";
    iox::runtime::PoshRuntime::initRuntime(APP_NAME);
    iox::popo::Subscriber<Data> subscriber({"Group", "Topic", "Instance"});

    while (true) {
        subscriber.take()
            .and_then([&](const auto& sample) {
                std::cout << "Received: " << sample->message << std::endl;
            })
            .or_else([](auto& error) {
                if (error != iox::popo::ChunkReceiveResult::NO_CHUNK_AVAILABLE) {
                    std::cerr << "Taking sample failed: " << error << std::endl;
                }
            });

        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }

    return 0;
}

3.3 编译和运行

确保已安装Iceoryx,并正确配置CMakeLists.txt:

cmake_minimum_required(VERSION 3.5)
project(IceoryxPubSub)

set(CMAKE_CXX_STANDARD 14)
find_package(iceoryx_posh REQUIRED)

add_executable(publisher_iceoryx publisher_iceoryx.cpp)
target_link_libraries(publisher_iceoryx iceoryx_posh::iceoryx_posh iceoryx_posh::iceoryx_posh_roudi_environment)

add_executable(subscriber_iceoryx subscriber_iceoryx.cpp)
target_link_libraries(subscriber_iceoryx iceoryx_posh::iceoryx_posh iceoryx_posh::iceoryx_posh_roudi_environment)

然后在项目根目录创建并运行CMake:

mkdir build
cd build
cmake ..
make

运行RouDi(Iceoryx的守护进程):

iox-roudi &

运行发布者和订阅者程序:

./publisher_iceoryx &
./subscriber_iceoryx &

3.4 压力测试脚本

#include <thread>
#include <vector>
#include <cstdlib>

void run_publisher() {
    system("./publisher_iceoryx");
}

void run_subscriber() {
    system("./subscriber_iceoryx");
}

int main() {
    const int num_publishers = 10;
    const int num_subscribers = 10;
    std::vector<std::thread> threads;

    for (int i = 0; i < num_publishers; ++i) {
        threads.emplace_back(run_publisher);
    }

    for (int i = 0; i < num_subscribers; ++i) {
        threads.emplace_back(run_subscriber);
    }

    for (auto& t : threads) {
        t.join();
    }

    return 0;
}

4. 参考文章

iceoryx源码阅读
iceoryx_github


网站公告

今日签到

点亮在社区的每一天
去签到