在之前的博客中我们介绍了dds的大致功能,与组成结构。本篇博文主要介绍的是xtypes.分为理论和实际运用两部分.理论主要用于梳理hzy大佬的知识,对于某些一带而过的部分作出更为详细的阐释,并在之后通过实际案例便于理解。案例分为普通发布订阅模式与rpc模式。原博客地址:https://zhuanlan.zhihu.com/p/700132625
目录
xtypes是什么?
xtypes是 DDS(Data Distribution Service) 的一个扩展,提供了一种动态和静态数据类型管理机制.以数据为中心是DDS与其他消息中间件的一个重要的区别。它类似于ros的.msg文件但是更为强大。xtypes使得DDS表现的像能够理解业务数据一样。在hzy大佬的博客中总结了以下几点特性:
自定义类型相关的发送/接收接口
即提交给DDS和从DDS中获取的是主题关联的自定义数据结构对象。
- 优势
- 序列化/反序列化的工作从应用下沉到中间件,由中间件考虑端序/对齐/不同语言类型的转换;
- 类型检查,在编译期即可检查出部分问题;
- 劣势
- 使用复杂,即便是简单的收发也需要IDL编译器编译支持代码;
数据筛选
DDS提供类似于数据库的实时数据存储与查询的功能,包括:
-
- 将主题数据按照key值组织,比如订阅端可以仅读取特定key值的数据;
- 内容过滤,即订阅端可以配置只关心某个成员范围之间的值,DDS将自动过滤不属于这个范围的主题数据;
类型规范是不同DDS产品互联互通的基础
- 产品遵循相同的规范使得能够支持的数据类型互认;
- 数据样本序列化方式规范使得A厂家的DDS产品序列化的数据可以由B厂家的DDS产品反序列化还原成相同类型的样本数据;
我们来理解一下是什么意思,首先自定义类型的发送/接收接口是什么意思?fastdds支持两种模式,静态模式需要对应的idl文件通过fastddsgen生成.hpp与xxxtypes.hpp文件。同时xtpes也支持动态类型来发送和接受数据此时无需idl文件.
静态模式
我们来简单看一下普通的静态模式的idl文件是如何编写的:
module state_and_error {
// 错误码请求
@extensibility(MUTABLE)
struct ErrorCode {
string code; // 错误码 (如 "1001", "1002")
};
// 错误码解析响应
@extensibility(MUTABLE)
struct ErrorCodeReply {
string description; // 错误码的解析描述
string suggestion; // 修复建议
};
// 错误处理接口
interface ErrorHandle {
ErrorCodeReply analyze_error(in ErrorCode error);
};
}
上面的module就类似于C++里面的namespace,里面还有个state碍于篇幅我就没放进来,看个原理就可以了。他这里面的消息单元就是用类似结构体的方式来进行编写的。extensibility这些后面会讲到,它用于支持数据扩展性。包括后面的interface,这些都会在后面的篇幅中讲到。这里看完了静态模式,我们来观看一下动态模式是如何编写的:
#include <fastdds/xtypes/dynamic_types/DynamicTypeBuilder.hpp>
#include <fastdds/xtypes/dynamic_types/DynamicData.hpp>
// 1. 创建动态数据类型
DynamicTypeBuilder* builder = DynamicTypeBuilderFactory::get_instance()->create_struct_builder();
builder->add_member(0, "id", DynamicTypeBuilderFactory::get_instance()->create_int32_type());
builder->add_member(1, "name", DynamicTypeBuilderFactory::get_instance()->create_string_type());
DynamicType_ptr myType = builder->build();
// 2. 创建 `DynamicData` 数据对象
DynamicData* myData = DynamicDataFactory::get_instance()->create_data(myType);
myData->set_int32_value(0, 42);
myData->set_string_value(1, "Example");
// 3. 发送数据
dds_writer->write(myData);
这个就相当于一个写在idl文件一个写在了程序里但是他们序列化都需要fastcdr支持。以下是静态和动态的一个对比表:
对比项 | 静态 xtypes (IDL 编译) |
动态 xtypes (运行时创建) |
---|---|---|
定义方式 | 通过 .idl 文件定义 |
运行时动态定义 |
是否需要编译 IDL | ✅ 需要 | ❌ 不需要 |
数据结构变化 | ❌ 不能在运行时修改 | ✅ 运行时可修改 |
类型检查 | ✅ 编译期检查 | ❌ 运行时检查 |
适合的应用场景 | 实时性高、结构固定 | 结构不固定、跨 DDS 版本兼容 |
序列化方式 | DDS CDR(默认高效) | 可用 JSON、CBOR、DDS CDR |
性能 | 更快(直接访问编译好的类型) | 稍慢(需要运行时解析类型) |
ROS 2适配性 | ✅ 是 ROS 2 默认方式 | ❌ 目前 ROS 2 不支持动态 xtypes |
我这边建议使用静态模式,因为对于rpc模式来说,动态模式并不支持interface,并且他在传递性能上较动态模式更弱。但是如果你的数据结构是 “动态的” ,在运行时种类随时可能变化时,动态模式也是较好的选择。但是有mutable其实也可以用静态的。
优势
- 优势
- 序列化/反序列化的工作从应用下沉到中间件,由中间件考虑端序/对齐/不同语言类型的转换;
- 类型检查,在编译期即可检查出部分问题;
这一部分是什么意思呢?在前面我们说了他的序列化是由fastcdr中间件完成的,对于我们程序编写就不用考虑序列化问题,但这也存在一个问题。比如说如果没有自定义序列化插件,将 Protobuf之类的序列化方式转换为 DDS 兼容的数据格式,那么他就不支持其他序列化协议。这种耦合有其好处也有其坏处。像ros1这种没有将序列化下沉到中间件而是用应用层来处理的,就可以通过sfinea机制来让他兼容protobuf.有好有坏吧。类型检查这些也不必多说,常规操作。
劣势
- 劣势
- 使用复杂,即便是简单的收发也需要IDL编译器编译支持代码;
这个怎么说呢,就是常用的静态 xtypes
使用复杂,即使只是简单的消息传输,也需要 IDL 编译。而且他编译器还挺搞的,dds版本很多有些编译器支持这种dds但是不支持其他dds。有些时候有些数据结构他最新的,自己版本的编译器又不支持。升级上去,可能自己的代码有些编译就会报错。建议用稳定的就行了别折腾了。
下面的两种,在下文中会有提及,这里就不展开讲了。
类型描述
2.1. 类型描述
类型描述定义开发语言无关的各种类型的语言以及结构,具体包含的类型参见上图,协议中规定DDS主题能够关联的数据类型只包括:结构体struct以及联合体union,其他类型则作为这两种聚合类型的成员。
除了常规的类型/成员定义外,类型系统中还为类型或者成员添加了一些标签来提供额外的信息,常见的几个标签参见下表。
标签 | 作用对象 | 说明 |
---|---|---|
Extensibility | 类型 | 用于表明该类型的可扩展性,详见2.2. |
Nested | 类型 | 是否直接关联到DDS主题 |
key | 成员 | 表明成员是否为键值 |
optional | 成员 | 表明成员是否为可选 |
id | 成员 | 指定成员的唯一ID |
bound | string/sequence/map成员 | 表明变长结构的长度上界,主要用于空间管理 |
在我前面的例子中我们可以看到我只写了拓展性,因为这些其实都不是必填的,他们都是一些可选条件。如果我们要加上限制的话,我们可以这样写,看实际需要来写吧。
module state_and_error {
@extensibility(APPENDABLE)
struct State {
@key int32 status; // `status` 作为唯一标识
@id(1) double current_x;
@id(2) double current_y;
@id(3) double current_theta;
@optional double linear_velocity; // 这个成员是可选的
@optional double angular_velocity; // 这个成员也是可选的
@bound(255) string feedback_message; // 限制字符串最大长度为 255
};
};
下面来详细讲一下,这些标签。
Nested
他是一个类型标签(Annotation),它用于指示该类型是否可以直接用作 DDS 主题(Topic),或者它是否只能作为其他数据类型的成员。如果一个struct或union被标记为 @Nested,它不能直接作为 DDS 主题(Topic)发布或订阅,只能作为其他 struct 的成员来使用了。如果不加@Nested,默认情况下struct可以直接作为 DDS 主题使用。以下是代码案例:
struct Position {
double x;
double y;
double z;
};
@Nested
struct State {
int32 status;
Position pos; // `@Nested` 使 `State` 只能作为 `struct` 的成员
};
key
在 DDS 里,DDS 通过@key识别数据实例(Instance),@key
相同的数据会被认为是同一个对象,可以更新,不是新的消息。如果你不加@key,DDS 认为你的数据是无状态的消息流(类似于 UDP 广播),而如果你加了@key,DDS 就会把数据当作唯一标识的实例(类似数据库的主键)。这句话怎么理解呢?当没有加@key的时候:
struct SensorData {
int32 id;
double temperature;
};
DDS 认为所有SensorData消息是“独立的消息流”,不会追踪id是否重复。每个消息就像 UDP 广播,没有“实例管理”机制,接收方无法分辨两个数据是否属于同一个传感器。
加@key:
struct SensorData {
@key int32 id; // 传感器的唯一标识
double temperature;
};
DDS 现在认为id相同的数据是同一个“实例”,它会:
- 缓存最后一次收到的
id = 1
的数据(类似数据库的UPDATE
)。也就是说如果State
结构体有 @keyid
,那么 DDS 会按id
分别存储不同的实例。如果 DDS 订阅者(Subscriber)已经收到id = 1
的数据,再次收到id = 1
的新数据时,DDS 只会 更新id = 1
的数据,不会新增新的条目。 - 自动删除旧数据(可以配置数据历史策略)。DDS 允许你配置“数据历史策略”(History QoS),决定保留多少条历史记录。如果配置KEEP_LAST(1) DDS 只会保存每个id的最新数据,旧数据会自动被删除。如果配置KEEP_ALL DDS 会保留所有历史数据,不删除。
- 允许
QueryCondition
进行实例查询,比如“只订阅id = 2
的数据”。
这里展示一下怎么配置只保留最新的
DataReaderQos qos;
qos.history().kind = KEEP_LAST_HISTORY_QOS;
qos.history().depth = 1; // 只保留最新的一条数据
reader->set_qos(qos);
为什么 @key 重要?
如果你加了@key,DDS 知道哪些数据属于同一个实例,可以做增量更新,而不是简单的消息广播。这句话就是说
- 如果你加了@key,DDS 就会按照 key(通常是
id
)来管理数据。 - @key 让 DDS 认为
id
相同的数据是同一个对象的“状态更新”,可以进行增量更新(类似数据库的UPDATA)。
如果你不加@key,每个消息都是“独立的”,无法做基于 ID 的筛选、历史记录管理或 QoS 策略。但是如果@key类型相同,其他类型不同,如果拓展性没有设置mutable那么就会报错。
id
用于mutable可扩展性模式,确保新旧版本字段顺序不同也能正确解析数据。不会影响实例管理。如果不加@id,DDS 解析数据时只能按字段顺序匹配,无法正确解析字段新增、删除或重排的情况。这句话怎么理解呢?因为拓展性的mutable允许添加新的数据,那么就需要@id确保新旧版本的数据结构,即使字段顺序不同,DDS 仍然可以正确解析,而不会误解数据格式。如果不加 @id,DDS 只能按照字段的顺序解析数据,这意味着:如果字段的顺序改变,旧版本可能解析错字段,导致数据错误。如果字段被删除或新增,旧版本可能会崩溃或丢弃数据。这样,即使新版本的数据结构发生了变化,旧版本仍然可以解析它能识别的字段,不会因字段顺序变化而导致错误!
举个例子
//旧数据
@extensibility(MUTABLE)
struct State {
int32 status;
double x;
double y;
};
//新数据
@extensibility(MUTABLE)
struct State {
int32 status;
double y; // ⚠️ 位置发生变化!
double x; // ⚠️ 位置发生变化!
};
这样就会出问题,但是如果加了@id呢?
@extensibility(MUTABLE)
struct State {
@id(1) int32 status;
@id(2) double x;
@id(3) double y;
};
@extensibility(MUTABLE)
struct State {
@id(1) int32 status;
@id(3) double y; // 位置变化了,但 `@id(3)` 让 DDS 知道它是 `y`
@id(2) double x; // 位置变化了,但 `@id(2)` 让 DDS 知道它是 `x`
};
这样就没问题了
optional
他是在旧版本里面使用的,但是现在有拓展性的mutable,就没那么重要了。但是如果某个字段在新版本中可能为空,但旧版本的解析器不允许null值,optional让新系统的发布者可以选择是否发送该字段,避免影响旧系统。optional允许你在不影响旧版本的情况下逐步添加新功能。也就是说大部分时间是没用的。
Extensibility
这一部分hzy大佬讲的非常详细,引用他的原文即可。需要了解更多dds知识的可以去上面博客去看看原博客,写的很不错。但是注意大佬写的是DDS规范,规范是一个宽泛的概念,各版本的dds具体实现可能略有不同。
DDS可扩展性分为3种,详见下表,为什么取名叫“类型演进”,因为基于APPENDABLE/MUTABLE可扩展性类型,原有系统无需做任何的代码、配置的修改,即可与新的系统(使用迭代后的新的数据类型)进行数据交互。
可扩展性 | 说明 |
---|---|
FINAL | 不可扩展,类型结构必须完全一致才能相互交换数据,用于保护已有系统。 |
APPENDABLE | 可追加,这种类型是默认的类型,新的类型是基于老的类型在后面添加成员得到,这种模式下新老数据结构关联的主题能够相互交换数据。 |
MUTABLE | 可随意变换,新的类型可将老的类型重新排序组合以及添加新的成员得到,这种模式下新老数据结构关联的主题能够相互交换数据。 |
FINAL可扩展性示意图
上图中下面蓝色部分代表已有运行系统,上面的橙色部分代表新建的系统,新建的发布/订阅应用将位置信息从原有的2个坐标修改为3个坐标,此时由于原有系统设置为FINAL的保护状态,新的应用无法集成到老的系统中去。
APPENDABLE可扩展性示意图
上图中下面蓝色部分代表已有运行系统,上面的橙色部分代表新建的系统,新建的发布/订阅应用将位置信息从原有的2个坐标修改为3个坐标,此时由于类型系统设置为APPENDABLE可扩展状态,老的应用不修改任何的配置以及代码,即可把新的发布/订阅应用集成到原有的系统中,老的订阅者(右下)将接收到新的发布者发布的数据,其中多出的z成员将被忽略,而新的订阅者应用(左上)将接收到老的发布端者发布的数据,其中缺少的z成员将赋予默认的值。
MUABLE可扩展性示意图
上图中下面蓝色部分代表已有运行系统,上面的橙色部分代表新建的系统,新建的发布/订阅应用将位置信息将原有的x、y坐标打乱并在中间插入一个新的成员z,此时由于类型系统设置为MUTABLE可扩展状态,老的应用不修改任何的配置以及代码,即可把新的发布/订阅应用集成到原有的系统中,老的订阅者(右下)将接收到新的发布者发布的数据,其中多出的z成员将被忽略,而新的订阅者应用(左上)将接收到老的发布端者发布的数据,其中缺少的z成员将赋予默认的值。
介绍到这里可能会产生一个疑问:既然能够支持MUTABLE类型,那所有的类型都设计成可变的类型,系统的可扩展性不就可以得到保证吗,为什么还需要支持前面两个类型?答案总结在下面的这张不同类型的优劣势中,不同类型可扩展性实现的关键技术在数据序列化中介绍。
可扩展性 | 优势 | 劣势 |
---|---|---|
FINAL | 1、首先是安全,类似于Java里面把一个类声明为final禁止其他类型继承扩展;2、固定结构下数据序列化/反序列化效率高 | 无可扩展性 |
MUTABLE | 具备很好的可扩展性 | 结构可变带来底层序列化/反序列化需要携带更多的额外信息,导致效率变低 |
APPENDABLE | 1、具备一定的可扩展性;2、接近于固定结构序列化/反序列化效率高 | 可扩展性有限 |
基础类型
idl和C++用的基本类型差不多:
类型 | 描述 | 示例 |
---|---|---|
boolean |
布尔值(true 或 false ) |
boolean is_active; |
char |
单个字符(ASCII) | char letter; |
octet |
8-bit 无符号整数 | octet small_value; |
int8 |
8-bit 有符号整数 | int8 small_number; |
uint8 |
8-bit 无符号整数 | uint8 small_number; |
int16 |
16-bit 有符号整数 | int16 medium_number; |
uint16 |
16-bit 无符号整数 | uint16 medium_number; |
int32 |
32-bit 有符号整数 | int32 large_number; |
uint32 |
32-bit 无符号整数 | uint32 large_number; |
int64 |
64-bit 有符号整数 | int64 very_large_number; |
uint64 |
64-bit 无符号整数 | uint64 very_large_number; |
float |
32-bit 单精度浮点数 | float temperature; |
double |
64-bit 双精度浮点数 | double precise_value; |
interface
interface用于 DDS RPC(远程过程调用),类似 ROS 的 Service。后面会详细介绍
容器
FastDDS 的 xtypes
支持容器类型(Collection Types),包括:
sequence<T>
(可变长度序列)array<T, N>
(固定大小数组)map<K, V, N>
(键值对映射,部分 DDS 实现支持)
例子如下:
struct SensorReadings {
sequence<float, 10> temperatures; // 最多存储 10 个温度值
};
SensorReadings data;
data.temperatures().resize(5); // 运行时调整大小
struct Position {
array<float, 3> coordinates; // 3D 坐标 (x, y, z)
};
Position pos;
pos.coordinates()[0] = 1.0;
pos.coordinates()[1] = 2.0;
pos.coordinates()[2] = 3.0;
struct SensorMapping {
map<string<10>, float, 5> sensor_data; // 最多存储 5 个传感器数据
};
SensorMapping mapping;
mapping.sensor_data()["temperature"] = 36.5;
mapping.sensor_data()["humidity"] = 45.0;
使用流程
1.发布订阅模式
我们先写idl文件,然后进入fastddsgen文件夹。
运行命令
./fastddsgen -language C++ path/to/xxx.idl -d path/to/output/
2.命令详细说明
参数 | 作用 | 示例 |
---|---|---|
-language C++ |
指定生成 C++ 代码(默认是 C++) | ./fastddsgen -language C++ xxx.idl |
-d <output_path> |
指定输出目录 | ./fastddsgen -d /home/user/generated_code xxx.idl |
-replace |
覆盖旧文件,重新生成代码 | ./fastddsgen -replace xxx.idl |
-example <OS> |
生成完整示例(可选 Linux, Windows, Mac) | ./fastddsgen -example Linux xxx.idl |
-help |
显示帮助信息 | ./fastddsgen -help |
他会生成一系列代码。在写发布者订阅者的时候,需要.hpp文件与xxxPubSubTypes.hpp.首先需要注册类型,这里就是注册给cdr序列化协议的。
TypeSupport type_support(new Destination::Destination_sitePubSubType());
participant->register_type(type_support);
之后就可以定义数据结构了
Destination::Destination_site data;
data.x(0);
data.y(0);
3.RPC模式
这里先写一个demo
module robot_control {
interface RobotService {
string get_status();
boolean move_to(in double x, in double y, out string response_msg);
};
};
然后我们用
fastddsgen -example C++ robot_service.idl -d /home/user/generated_code/
注意一下,有些版本他是用fastrpcgen来编译idl,需要注意一下。
他会生成
robot_controlRobotServiceProxy.hpp // RPC 客户端(Proxy)
robot_controlRobotServiceServer.hpp // RPC 服务端(Server)
robot_controlRobotServiceImpl.hpp // 需要用户实现的服务逻辑
robot_controlRobotService.cxx // FastDDS RPC 底层实现
robot_controlRobotServicePubSubTypes.hpp // 数据类型支持
server
#include "robot_controlRobotServiceServer.hpp"
class RobotServiceImpl : public robot_control::RobotServiceServer
{
public:
// 实现 get_status() 方法
void get_status(::eprosima::fastdds::dds::StringType& _return) override
{
_return = "Robot is running"; // 返回状态信息
}
// 实现 move_to() 方法,返回是否移动成功
bool move_to(double x, double y, ::eprosima::fastdds::dds::StringType& response_msg) override
{
std::cout << "Moving to: (" << x << ", " << y << ")" << std::endl;
if (x >= 0 && y >= 0) // 只允许正坐标
{
response_msg = "Move successful!";
return true; // 移动成功
}
else
{
response_msg = "Invalid target position.";
return false; // 移动失败
}
}
};
int main()
{
RobotServiceImpl robot_service;
if (robot_service.run())
{
std::cout << "RPC Server is running..." << std::endl;
while (true) { } // 保持运行
}
return 0;
}
在 FastDDS RPC 生成的 C++ 代码中,IDL 里定义的返回值 在生成的 C++ 代码中会 被转换为void,并使用 out 参数_return 传递结果。这是 FastDDS RPC 代码生成的特性,用于避免额外的拷贝,提高性能。
client
#include "robot_controlRobotServiceProxy.hpp"
int main()
{
robot_control::RobotServiceProxy client;
if (client.run())
{
std::cout << "Connected to RPC Server!" << std::endl;
// 远程调用 get_status()
eprosima::fastdds::dds::StringType status;
client.get_status(status);
std::cout << "Robot Status: " << status << std::endl;
// 远程调用 move_to(),获取返回值
eprosima::fastdds::dds::StringType response_msg;
bool result = client.move_to(10.5, 20.8, response_msg);
std::cout << "Move Result: " << (result ? "Success" : "Failure") << std::endl;
std::cout << "Server Response: " << response_msg << std::endl;
client.stop();
}
return 0;
}
在RPC模式下你无需创建主题,域参与者,qos之类的。fastddsrpc内部都会帮你搞定,你只要拥有相同的头文件即可。
普通 DDS 需要手动做的事情 | FastDDS RPC 自动管理 |
---|---|
创建 DomainParticipant |
✅ FastDDS 自动创建 |
定义 Topic |
✅ FastDDS 自动创建 |
创建 Publisher 和 Subscriber |
✅ FastDDS 自动创建 |
管理 Request 和 Reply 的序列化 |
✅ FastDDS 自动管理 |
匹配 Client 和 Server 的 Domain ID |
✅ FastDDS 内部处理 |
但与自动管理并不代表你不能设置,比如:
域ID
client.set_domain_id(5); // 修改 Domain ID
server.set_domain_id(5);
QoS
FastDDS 允许你设置 QoS,控制 RPC 的可靠性、历史记录等。例如:
RELIABLE_RELIABILITY_QOS
(可靠传输,确保请求不丢失)KEEP_LAST_HISTORY_QOS
(保留最近的 N 条历史记录)TRANSIENT_LOCAL_DURABILITY_QOS
(即使Server
断开,Client
仍然能获取数据)
eprosima::fastdds::dds::QoSSettings qos;
qos.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS);
qos.history(eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS);
client.set_qos(qos);
Transport
默认情况下,FastDDS 使用 UDP 进行通信。如果你想强制使用 TCP,可以这样配置:
eprosima::fastdds::dds::TransportConfig transport;
transport.use_tcp(true);
client.set_transport(transport);
Timeout(RPC 调用超时)
如果Client调用Server超时(Server可能崩溃或网络异常),默认 FastDDS 不会一直等待,可以设置超时时间:
client.set_timeout(std::chrono::milliseconds(5000)); // 5 秒超时
如果 5 秒内 Server没有响应,RPC 调用会失败并返回错误。
Topic
client.set_topic_name("MyCustomTopic");
如果你想同时运行多个不同的 RPC 服务,可以用不同的Topic进行隔离
Threading
eprosima::fastdds::dds::ThreadSettings threads;
threads.use_separate_thread(true); // 每个 RPC 请求使用单独线程
client.set_threading(threads);
默认情况下,FastDDS 使用单线程模式,你可以改为多线程,提高吞吐量。如果你的 RPC 请求处理速度较慢,建议开启多线程模式,以支持高并发调用。
FastDDS RPC 可配置参数总结
参数 | 作用 | 示例 |
---|---|---|
Domain ID |
指定 RPC 运行的 DDS 领域 | client.set_domain_id(5); |
QoS |
设置可靠性、持久性 | client.set_qos(qos); |
Transport |
指定 TCP/UDP 传输 | client.set_transport(transport); |
Timeout |
设置调用超时 | client.set_timeout(std::chrono::milliseconds(5000)); |
Topic |
手动指定 Topic 名称 | client.set_topic_name("MyCustomTopic"); |
Threading |
设定是否使用多线程 | client.set_threading(threads); |