ros_broidge使用示例
1. 安装与基本通信
sudo apt-get install ros-noetic-rosbridge-*
ros_bridge使用的是websocket的通信机制
# -*-coding:utf-8-*-
import asyncio
import websockets
import json
async def connect_to_ros_bridge():
uri = "ws://127.0.0.1:8080"
async with websockets.connect(uri) as websocket:
# 构造要发送的ROS消息
ros_message = {
"op": "subscribe",
"topic": "/scan"
}
# 发送消息给ROS Bridge
await websocket.send(json.dumps(ros_message))
while True:
# 接收来自ROS Bridge的消息
response = await websocket.recv()
# 处理收到的消息
data = json.loads(response)
# 在这里处理数据,可以根据需要提取信息
print(data)
# 启动连接
asyncio.get_event_loop().run_until_complete(connect_to_ros_bridge())
此处为内网穿透工具的使用 ,将远程的服务器ROS映射到本地端口
2. service方式通信
import asyncio
import websockets
import json
async def call_ros_service():
# 定义ROS Bridge的WebSocket地址
ros_bridge_url = "ws://your_ros_bridge_url"
async with websockets.connect(ros_bridge_url) as websocket:
# 构造要发送的ROS服务请求
ros_service_request = {
"op": "call_service",
"service": "/move_base/NavfnROS/make_plan",
"args": {
"start": {
"header": {
"stamp": {
"secs": 0,
"nsecs": 0
},
"frame_id": "map"
},
"pose": {
"position": {
"x": -1.0,
"y": 2.0,
"z": 0.0
},
"orientation": {
"x": 0.0,
"y": 0.0,
"z": 0.0,
"w": 1.0
}
}
},
"goal": {
"header": {
"stamp": {
"secs": 0,
"nsecs": 0
},
"frame_id": "map"
},
"pose": {
"position": {
"x": 1.0,
"y": 4.0,
"z": 0.0
},
"orientation": {
"x": 0.0,
"y": 0.0,
"z": 0.0,
"w": 1.0
}
}
}
}
}
# 发送服务请求给ROS Bridge
await websocket.send(json.dumps(ros_service_request))
# 接收来自ROS Bridge的服务响应
response = await websocket.recv()
# 处理服务响应
data = json.loads(response)
if data.get("op") == "service_response":
service_response = data.get("values")
print("Service Response:", service_response)
else:
print("Received an unexpected message:", data)
# 启动WebSocket连接和服务调用
asyncio.get_event_loop().run_until_complete(call_ros_service())
拿到了
本文含有隐藏内容,请 开通VIP 后查看