本指南将详细介绍如何创建两个 ROS2 Python 节点(一个客户端和一个服务器),通过发布/订阅机制实现相互通信,完成数字计算请求与结果返回的功能。我们将使用 std_msgs/String
消息类型来传递数据。
1. 创建 ROS2 Python 包
首先,使用 ros2 pkg create
命令创建两个独立的 Python 包,分别用于客户端和服务器。
# 创建客户端包
ros2 pkg create calculator_client_py \
--build-type ament_python \
--dependencies rclpy \
--node-name calculator_client_node
# 创建服务器包
ros2 pkg create calculator_server_py \
--build-type ament_python \
--dependencies rclpy \
--node-name calculator_server_node
命令选项说明:
--build-type ament_python
:指定该包为 Python 包,并使用 ament 构建系统。--dependencies rclpy
:声明依赖rclpy
库,这是 Python 版本的 ROS2 客户端库。--node-name
:在包内创建一个名为calculator_client_node.py
或calculator_server_node.py
的基础节点文件。
2. 实现客户端节点 (calculator_client_node.py
)
客户端负责接收用户输入,将计算请求(两个数字和一个运算符)打包成字符串消息,并发布到 calculation_request
话题。同时,它订阅 calculation_response
话题以接收服务器的计算结果。
[calculator_client_node.py]
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class CalculatorClient(Node):
def __init__(self):
super().__init__('calculator_client')
# 创建发布者,发布请求到 'calculation_request' 主题
self.publisher_ = self.create_publisher(String, 'calculation_request', 10)
# 创建订阅者,订阅来自 'calculation_response' 主题的响应
self.subscription = self.create_subscription(
String,
'calculation_response',
self.listener_callback,
10)
self.subscription # 防止未使用变量警告
def listener_callback(self, msg):
# 回调函数:当收到计算结果时被调用
self.get_logger().info(f'Received result: {msg.data}')
def send_request(self, num1, operator, num2):
# 发送请求函数
msg = String()
# 格式: "num1,operator,num2"
msg.data = f"{num1},{operator},{num2}"
self.publisher_.publish(msg)
self.get_logger().info(f'Sent request: {num1} {operator} {num2}')
def main(args=None):
rclpy.init(args=args)
calculator_client = CalculatorClient()
print("Supported operations: +, -, *, /")
try:
while rclpy.ok():
# 获取用户输入
user_input = input("Enter calculation (e.g., '3 + 5') or 'exit' to quit: ")
if user_input.lower() == 'exit':
break
try:
parts = user_input.split()
if len(parts) != 3:
print("Invalid input format. Please use 'number operator number'.")
continue
num1_str, operator, num2_str = parts
# 验证运算符
if operator not in ['+', '-', '*', '/']:
print(f"Unsupported operator '{operator}'. Please use +, -, *, or /.")
continue
# 尝试转换为浮点数
num1 = float(num1_str)
num2 = float(num2_str)
# 发送请求
calculator_client.send_request(num1, operator, num2)
except ValueError:
print("Invalid input. Please ensure numbers are valid.")
continue
# 旋转一次以处理可能的回调
rclpy.spin_once(calculator_client, timeout_sec=0.1)
except KeyboardInterrupt:
print("\nShutting down client...")
finally:
calculator_client.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
3. 实现服务器节点 (calculator_server_node.py
)
服务器节点订阅 calculation_request
话题。当收到请求时,它解析消息,执行计算,并将结果(或错误信息)发布到 calculation_response
话题。
[calculator_server_node.py]
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class CalculatorServer(Node):
def __init__(self):
super().__init__('calculator_server')
# 创建订阅者,订阅 'calculation_request' 主题的请求
self.subscription = self.create_subscription(
String,
'calculation_request',
self.listener_callback,
10)
self.subscription # 防止未使用变量警告
# 创建发布者,发布响应到 'calculation_response' 主题
self.publisher_ = self.create_publisher(String, 'calculation_response', 10)
def listener_callback(self, msg):
# 回调函数:当收到计算请求时被调用
try:
# 解析收到的数据 "num1,operator,num2"
data_str = msg.data
num1_str, operator, num2_str = data_str.split(',')
num1 = float(num1_str)
num2 = float(num2_str)
# 执行计算
if operator == '+':
result = num1 + num2
elif operator == '-':
result = num1 - num2
elif operator == '*':
result = num1 * num2
elif operator == '/':
if num2 == 0:
raise ZeroDivisionError("Division by zero is not allowed.")
result = num1 / num2
else:
# 这在客户端已经检查过,但作为服务端健壮性考虑
raise ValueError(f"Unsupported operator: {operator}")
# 准备并发布响应
response_msg = String()
response_msg.data = f"{num1} {operator} {num2} = {result}"
self.publisher_.publish(response_msg)
self.get_logger().info(f'Processed request: {num1} {operator} {num2} = {result}')
except ValueError as e:
# 处理解析错误或不支持的运算符
error_msg = String()
error_msg.data = f"Error: Invalid request data '{msg.data}'. Details: {e}"
self.publisher_.publish(error_msg)
self.get_logger().warn(f'Failed to process request: {msg.data}. Error: {e}')
except ZeroDivisionError as e:
# 处理除零错误
error_msg = String()
error_msg.data = f"Error: {e}"
self.publisher_.publish(error_msg)
self.get_logger().warn(f'Calculation error for request: {msg.data}. Error: {e}')
except Exception as e: # 捕获其他未预期的错误
error_msg = String()
error_msg.data = f"Unexpected error occurred: {e}"
self.publisher_.publish(error_msg)
self.get_logger().error(f'Unexpected error processing request: {msg.data}. Error: {e}')
def main(args=None):
rclpy.init(args=args)
calculator_server = CalculatorServer()
try:
print("Calculator server is running... Waiting for requests.")
# 保持节点运行,等待消息
rclpy.spin(calculator_server)
except KeyboardInterrupt:
print("\nShutting down server...")
finally:
calculator_server.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Additional Notes:
- The created node will need manual modification to implement the actual calculator service logic
- After creation, you would typically:
- Edit the node file to implement service callbacks
- Add message/service dependencies if needed
- Build with
colcon build
- Source the workspace
- Run with
ros2 run calculator_server_py calculator_server_node
4. 构建与运行
构建包:
colcon build --packages-select calculator_client_py calculator_server_py
激活环境:
source install/setup.bash
启动节点:
- 在一个终端中启动服务器(必须先启动):
- 在一个终端中启动服务器(必须先启动):
ros2 run calculator_server_py calculator_server_node
- 在另一个终端中启动客户端:
ros2 run calculator_client_py calculator_client_node
交互: 在客户端终端输入类似
3 + 5
的表达式,服务器会计算并将结果返回,客户端会显示结果。
启动客户端Node
启动服务端Node