第一章 ROS基础
1.1 ROS核心概念
1.1.1 节点(Node)
节点是ROS中执行计算的进程。每个节点都是一个独立的可执行文件,通过ROS通信机制相互连接。
Python示例:
#!/usr/bin/env python
import rospy
def main():
# 初始化节点
rospy.init_node('my_node', anonymous=True)
rospy.loginfo("节点启动成功")
# 保持节点运行
rospy.spin()
if __name__ == '__main__':
main()
C++示例:
#include <ros/ros.h>
int main(int argc, char **argv) {
// 初始化节点
ros::init(argc, argv, "my_node");
ros::NodeHandle nh;
ROS_INFO("节点启动成功");
ros::spin();
return 0;
}
1.1.2 话题(Topic)
话题是节点间传递消息的总线,采用发布/订阅模式。多个节点可以同时发布或订阅同一话题。
Publisher示例(Python):
#!/usr/bin/env python
import rospy
from std_msgs.msg import String
def talker():
rospy.init_node('talker', anonymous=True)
pub = rospy.Publisher('chatter', String, queue_size=10)
rate = rospy.Rate(10) # 10Hz
while not rospy.is_shutdown():
hello_str = "hello world %s" % rospy.get_time()
rospy.loginfo(hello_str)
pub.publish(hello_str)
rate.sleep()
if __name__ == '__main__':
try:
talker()
except rospy.ROSInterruptException:
pass
Subscriber示例(Python):
#!/usr/bin/env python
import rospy
from std_msgs.msg import String
def callback(data):
rospy.loginfo("接收到数据: %s", data.data)
def listener():
rospy.init_node('listener', anonymous=True)
rospy.Subscriber("chatter", String, callback)
rospy.spin()
if __name__ == '__main__':
listener()
C++版本:
#include <ros/ros.h>
#include <std_msgs/String.h>
void chatterCallback(const std_msgs::String::ConstPtr& msg) {
ROS_INFO("接收到: [%s]", msg->data.c_str());
}
int main(int argc, char **argv) {
ros::init(argc, argv, "listener");
ros::NodeHandle n;
ros::Subscriber sub = n.subscribe("chatter", 1000, chatterCallback);
ros::spin();
return 0;
}
1.1.3 服务(Service)
服务提供请求/响应模式的同步通信机制,适用于偶发的双向通信。
服务定义(AddTwoInts.srv):
int64 a
int64 b
---
int64 sum
服务端(Python):
#!/usr/bin/env python
from rospy_tutorials.srv import AddTwoInts, AddTwoIntsResponse
import rospy
def handle_add_two_ints(req):
result = req.a + req.b
rospy.loginfo("返回 [%s + %s = %s]" % (req.a, req.b, result))
return AddTwoIntsResponse(result)
def add_two_ints_server():
rospy.init_node('add_two_ints_server')
s = rospy.Service('add_two_ints', AddTwoInts, handle_add_two_ints)
rospy.loginfo("准备添加两个整数")
rospy.spin()
if __name__ == "__main__":
add_two_ints_server()
客户端(Python):
#!/usr/bin/env python
import sys
import rospy
from rospy_tutorials.srv import AddTwoInts
def add_two_ints_client(x, y):
rospy.wait_for_service('add_two_ints')
try:
add_two_ints = rospy.ServiceProxy('add_two_ints', AddTwoInts)
resp = add_two_ints(x, y)
return resp.sum
except rospy.ServiceException as e:
print("服务调用失败: %s" % e)
if __name__ == "__main__":
if len(sys.argv) == 3:
x = int(sys.argv[1])
y = int(sys.argv[2])
else:
x, y = 1, 2
result = add_two_ints_client(x, y)
print("%s + %s = %s" % (x, y, result))
1.1.4 动作(Action)
动作用于长时间运行的任务,提供目标、反馈和结果三种消息类型。
Action定义(Fibonacci.action):
# 目标
int32 order
---
# 结果
int32[] sequence
---
# 反馈
int32[] sequence
Action服务端(Python):
#!/usr/bin/env python
import rospy
import actionlib
from actionlib_tutorials.msg import FibonacciAction, FibonacciFeedback, FibonacciResult
class FibonacciActionServer:
def __init__(self):
self.server = actionlib.SimpleActionServer(
'fibonacci', FibonacciAction, self.execute, False)
self.server.start()
def execute(self, goal):
r = rospy.Rate(1)
feedback = FibonacciFeedback()
result = FibonacciResult()
feedback.sequence = [0, 1]
for i in range(1, goal.order):
if self.server.is_preempt_requested():
self.server.set_preempted()
return
feedback.sequence.append(
feedback.sequence[i] + feedback.sequence[i-1])
self.server.publish_feedback(feedback)
r.sleep()
result.sequence = feedback.sequence
self.server.set_succeeded(result)
if __name__ == '__main__':
rospy.init_node('fibonacci_server')
server = FibonacciActionServer()
rospy.spin()
1.1.5 参数服务器
参数服务器用于存储和检索运行时参数。
#!/usr/bin/env python
import rospy
rospy.init_node('param_demo')
# 设置参数
rospy.set_param('robot_name', 'my_robot')
rospy.set_param('max_speed', 1.5)
# 获取参数
robot_name = rospy.get_param('robot_name', 'default_robot')
max_speed = rospy.get_param('max_speed', 1.0)
# 检查参数是否存在
if rospy.has_param('robot_name'):
rospy.loginfo("参数存在: %s", robot_name)
# 删除参数
rospy.delete_param('robot_name')
1.2 工作空间与功能包
1.2.1 创建工作空间
# 创建工作空间
mkdir -p ~/catkin_ws/src
cd ~/catkin_ws/
# 初始化工作空间
catkin_make
# 或使用catkin build(推荐)
sudo apt install python-catkin-tools
catkin build
# 设置环境变量
source devel/setup.bash
echo "source ~/catkin_ws/devel/setup.bash" >> ~/.bashrc
1.2.2 创建功能包
cd ~/catkin_ws/src
catkin_create_pkg my_robot rospy roscpp std_msgs geometry_msgs
package.xml示例:
<?xml version="1.0"?>
<package format="2">
<name>my_robot</name>
<version>0.1.0</version>
<description>机器人控制功能包</description>
<maintainer email="user@example.com">Your Name</maintainer>
<license>MIT</license>
<buildtool_depend>catkin</buildtool_depend>
<depend>rospy</depend>
<depend>roscpp</depend>
<depend>std_msgs</depend>
<depend>geometry_msgs</depend>
<depend>sensor_msgs</depend>
</package>
CMakeLists.txt核心内容:
cmake_minimum_required(VERSION 3.0.2)
project(my_robot)
find_package(catkin REQUIRED COMPONENTS
rospy
roscpp
std_msgs
geometry_msgs
)
catkin_package(
CATKIN_DEPENDS rospy roscpp std_msgs geometry_msgs
)
include_directories(
${catkin_INCLUDE_DIRS}
)
# C++节点
add_executable(my_node src/my_node.cpp)
target_link_libraries(my_node ${catkin_LIBRARIES})
# Python脚本安装
catkin_install_python(PROGRAMS
scripts/my_script.py
DESTINATION ${CATKIN_PACKAGE_BIN_DESTINATION}
)
1.2.3 自定义消息类型
创建消息文件(msg/CustomMsg.msg):
Header header
string robot_id
float64 speed
float64 angle
int32 battery_level
修改package.xml:
<build_depend>message_generation</build_depend>
<exec_depend>message_runtime</exec_depend>
修改CMakeLists.txt:
find_package(catkin REQUIRED COMPONENTS
rospy
roscpp
std_msgs
message_generation
)
add_message_files(
FILES
CustomMsg.msg
)
generate_messages(
DEPENDENCIES
std_msgs
)
catkin_package(
CATKIN_DEPENDS message_runtime
)
使用自定义消息:
#!/usr/bin/env python
import rospy
from my_robot.msg import CustomMsg
def publisher():
rospy.init_node('custom_pub')
pub = rospy.Publisher('custom_topic', CustomMsg, queue_size=10)
rate = rospy.Rate(10)
while not rospy.is_shutdown():
msg = CustomMsg()
msg.header.stamp = rospy.Time.now()
msg.robot_id = "robot_01"
msg.speed = 1.5
msg.angle = 0.5
msg.battery_level = 80
pub.publish(msg)
rate.sleep()
if __name__ == '__main__':
publisher()
1.3 常用命令行工具
1.3.1 核心命令
# 节点管理
rosnode list # 列出所有节点
rosnode info /node_name # 查看节点信息
rosnode kill /node_name # 终止节点
# 话题管理
rostopic list # 列出所有话题
rostopic echo /topic_name # 显示话题内容
rostopic info /topic_name # 查看话题信息
rostopic hz /topic_name # 显示发布频率
rostopic pub /topic_name type data # 发布消息
# 服务管理
rosservice list # 列出所有服务
rosservice call /service_name args # 调用服务
rosservice info /service_name # 查看服务信息
# 参数管理
rosparam list # 列出所有参数
rosparam get /param_name # 获取参数值
rosparam set /param_name value # 设置参数值
rosparam dump file.yaml # 导出参数到文件
rosparam load file.yaml # 从文件加载参数
# 消息类型
rosmsg show message_type # 显示消息定义
rosmsg list # 列出所有消息类型
1.4 Launch文件
1.4.1 基础Launch文件
launch/robot.launch:
<launch>
<!-- 参数定义 -->
<arg name="robot_name" default="my_robot"/>
<arg name="use_sim_time" default="false"/>
<!-- 设置参数服务器 -->
<param name="robot_name" value="$(arg robot_name)"/>
<param name="use_sim_time" value="$(arg use_sim_time)"/>
<!-- 加载参数文件 -->
<rosparam file="$(find my_robot)/config/params.yaml" command="load"/>
<!-- 启动节点 -->
<node name="talker" pkg="my_robot" type="talker.py" output="screen"/>
<!-- 节点重映射 -->
<node name="listener" pkg="my_robot" type="listener.py">
<remap from="chatter" to="my_chatter"/>
</node>
<!-- 节点组 -->
<group ns="robot1">
<node name="driver" pkg="my_robot" type="driver" respawn="true"/>
</group>
<!-- Include其他launch文件 -->
<include file="$(find my_robot)/launch/sensors.launch">
<arg name="lidar_enabled" value="true"/>
</include>
</launch>
1.4.2 高级Launch配置
<launch>
<!-- 条件启动 -->
<arg name="enable_camera" default="true"/>
<node if="$(arg enable_camera)" name="camera" pkg="usb_cam" type="usb_cam_node"/>
<!-- 机器分布式启动 -->
<machine name="robot" address="192.168.1.100" user="robot" password="pwd"/>
<node name="controller" pkg="my_robot" type="controller" machine="robot"/>
<!-- 环境变量 -->
<env name="ROSCONSOLE_CONFIG_FILE" value="$(find my_robot)/config/rosconsole.config"/>
<!-- 日志级别 -->
<node name="debug_node" pkg="my_robot" type="node" output="screen"
launch-prefix="gdb -ex run --args"/>
</launch>
1.5 rqt工具集
1.5.1 rqt_graph
查看节点和话题的连接关系:
rosrun rqt_graph rqt_graph
1.5.2 rqt_plot
实时绘制话题数据:
rosrun rqt_plot rqt_plot /turtle1/pose/x /turtle1/pose/y
1.5.3 rqt_console
查看日志信息:
rosrun rqt_console rqt_console
rosrun rqt_logger_level rqt_logger_level
1.5.4 其他常用工具
# rqt完整工具集
rqt
# 图像查看
rosrun rqt_image_view rqt_image_view
# 服务调用工具
rosrun rqt_service_caller rqt_service_caller
# 参数配置
rosrun rqt_reconfigure rqt_reconfigure
1.6 ROS vs ROS2 对比
1.6.1 核心差异
| 特性 | ROS | ROS2 |
|---|---|---|
| 通信机制 | 自定义TCPROS/UDPROS | DDS(数据分发服务) |
| 实时性 | 不支持 | 支持实时系统 |
| 跨平台 | 主要Linux | Linux/Windows/macOS |
| 多机器人 | 需要手动配置 | 原生支持 |
| 安全性 | 无加密 | 支持DDS安全 |
| Python版本 | Python 2.7/3 | Python 3 |
| 生命周期 | 无标准 | 节点生命周期管理 |
1.6.2 DDS通信层
ROS2基于DDS实现发布/订阅模式:
// ROS2 C++ Publisher
#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/string.hpp>
class MinimalPublisher : public rclcpp::Node {
public:
MinimalPublisher() : Node("minimal_publisher") {
publisher_ = this->create_publisher<std_msgs::msg::String>("topic", 10);
timer_ = this->create_wall_timer(
std::chrono::milliseconds(500),
std::bind(&MinimalPublisher::timer_callback, this));
}
private:
void timer_callback() {
auto message = std_msgs::msg::String();
message.data = "Hello, ROS2!";
publisher_->publish(message);
}
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_;
rclcpp::TimerBase::SharedPtr timer_;
};
int main(int argc, char * argv[]) {
rclcpp::init(argc, argv);
rclcpp::spin(std::make_shared<MinimalPublisher>());
rclcpp::shutdown();
return 0;
}
1.6.3 实时性支持
ROS2支持实时控制:
# ROS2 Python示例
import rclpy
from rclpy.node import Node
from rclpy.executors import MultiThreadedExecutor
from rclpy.callback_groups import ReentrantCallbackGroup
class RealtimeNode(Node):
def __init__(self):
super().__init__('realtime_node')
self.cb_group = ReentrantCallbackGroup()
self.timer = self.create_timer(
0.001, # 1kHz
self.timer_callback,
callback_group=self.cb_group
)
def timer_callback(self):
# 实时控制逻辑
pass
def main():
rclpy.init()
node = RealtimeNode()
executor = MultiThreadedExecutor()
executor.add_node(node)
executor.spin()
rclpy.shutdown()
1.7 高频面试题
问题1: ROS的通信机制有哪些?各自的应用场景是什么?
答案:
ROS主要有三种通信机制:
话题(Topic) - 发布/订阅模式
- 异步通信,单向数据流
- 适用场景:传感器数据流、状态信息、持续数据传输
- 特点:多对多通信,发布者和订阅者解耦
服务(Service) - 请求/响应模式
- 同步通信,双向通信
- 适用场景:偶发的短时任务、查询操作、触发式操作
- 特点:一对一通信,客户端等待响应
动作(Action) - 目标/反馈/结果模式
- 异步通信,支持中途取消
- 适用场景:长时间运行的任务、需要反馈的操作
- 特点:提供执行状态、中间反馈、可抢占
选择建议:
- 传感器数据用Topic
- 短时请求用Service
- 导航、抓取等长任务用Action
问题2: 如何调试ROS节点间通信问题?
答案:
调试步骤:
- 检查节点运行状态
rosnode list
rosnode info /node_name
- 检查话题连接
rostopic list
rostopic info /topic_name
rqt_graph # 可视化节点关系
- 监听话题数据
rostopic echo /topic_name
rostopic hz /topic_name # 检查发布频率
- 检查消息类型匹配
rostopic type /topic_name
rosmsg show message_type
- 使用日志调试
rospy.logdebug("调试信息")
rospy.loginfo("普通信息")
rospy.logwarn("警告信息")
rospy.logerr("错误信息")
- 使用rqt_console查看日志
rosrun rqt_console rqt_console
问题3: Launch文件中的remap、param、arg有什么区别?
答案:
remap(重映射)
- 功能:改变话题、服务、参数的名称
- 作用域:仅影响特定节点
- 示例:
<node name="camera" pkg="usb_cam" type="usb_cam_node"> <remap from="image" to="camera/image_raw"/> </node>param(参数)
- 功能:在参数服务器上设置参数
- 作用域:全局可访问
- 示例:
<param name="robot_name" value="my_robot"/> <param name="max_speed" value="1.5"/>arg(参数)
- 功能:launch文件内部变量
- 作用域:仅在launch文件内有效,不上传到参数服务器
- 示例:
<arg name="use_sim" default="false"/> <param name="use_sim_time" value="$(arg use_sim)"/>
核心区别:
- arg是launch文件变量,不可在节点中访问
- param是ROS参数,可通过rospy.get_param()访问
- remap改变话题名称映射关系
问题4: 自定义消息类型的完整流程是什么?
答案:
完整流程:
- 创建msg文件
mkdir msg
# 创建 msg/RobotStatus.msg
- 定义消息内容
Header header
string robot_id
float64 battery_voltage
float64 speed
geometry_msgs/Pose pose
- 修改package.xml
<build_depend>message_generation</build_depend>
<exec_depend>message_runtime</exec_depend>
- 修改CMakeLists.txt
find_package(catkin REQUIRED COMPONENTS
rospy
std_msgs
geometry_msgs
message_generation
)
add_message_files(
FILES
RobotStatus.msg
)
generate_messages(
DEPENDENCIES
std_msgs
geometry_msgs
)
catkin_package(
CATKIN_DEPENDS message_runtime
)
- 编译
cd ~/catkin_ws
catkin_make
source devel/setup.bash
- 验证
rosmsg show my_robot/RobotStatus
- 使用
from my_robot.msg import RobotStatus
msg = RobotStatus()
msg.robot_id = "robot_01"
问题5: ROS2相比ROS有哪些重要改进?
答案:
主要改进:
通信层改进
- 使用DDS替代自定义协议
- 支持多种DDS实现(FastDDS、CycloneDDS等)
- 更好的服务质量(QoS)控制
实时性支持
- 支持实时操作系统(RTOS)
- 确定性执行
- 更低的延迟
安全性增强
- DDS安全插件支持
- 认证和加密
- 访问控制
跨平台
- 原生支持Windows、macOS
- 更好的嵌入式支持
节点生命周期
- 标准化的启动/关闭流程
- 状态管理(Configuring、Active、Inactive等)
组件化
- 支持节点组合
- 更灵活的部署
Python 3支持
- 放弃Python 2.7
- 现代化的Python开发
迁移建议:
- 新项目优先考虑ROS2
- 现有项目可以使用ros1_bridge桥接
- 实时控制、多机器人项目强烈推荐ROS2