HiHuo
首页
博客
手册
工具
关于
首页
博客
手册
工具
关于
  • 技术面试完全指南

    • 技术面试完全指南
    • 8年面试官告诉你:90%的简历在第一轮就被刷掉了
    • 刷了500道LeetCode,终于明白大厂算法面试到底考什么
    • 高频算法题精讲-双指针与滑动窗口
    • 03-高频算法题精讲-二分查找与排序
    • 04-高频算法题精讲-树与递归
    • 05-高频算法题精讲-图与拓扑排序
    • 06-高频算法题精讲-动态规划
    • Go面试必问:一道GMP问题,干掉90%的候选人
    • 08-数据库面试高频题
    • 09-分布式系统面试题
    • 10-Kubernetes与云原生面试题
    • 11-系统设计面试方法论
    • 前端面试高频题
    • AI 与机器学习面试题
    • 行为面试与软技能

第一章 ROS基础

1.1 ROS核心概念

1.1.1 节点(Node)

节点是ROS中执行计算的进程。每个节点都是一个独立的可执行文件,通过ROS通信机制相互连接。

Python示例:

#!/usr/bin/env python
import rospy

def main():
    # 初始化节点
    rospy.init_node('my_node', anonymous=True)
    rospy.loginfo("节点启动成功")

    # 保持节点运行
    rospy.spin()

if __name__ == '__main__':
    main()

C++示例:

#include <ros/ros.h>

int main(int argc, char **argv) {
    // 初始化节点
    ros::init(argc, argv, "my_node");
    ros::NodeHandle nh;

    ROS_INFO("节点启动成功");
    ros::spin();
    return 0;
}

1.1.2 话题(Topic)

话题是节点间传递消息的总线,采用发布/订阅模式。多个节点可以同时发布或订阅同一话题。

Publisher示例(Python):

#!/usr/bin/env python
import rospy
from std_msgs.msg import String

def talker():
    rospy.init_node('talker', anonymous=True)
    pub = rospy.Publisher('chatter', String, queue_size=10)
    rate = rospy.Rate(10)  # 10Hz

    while not rospy.is_shutdown():
        hello_str = "hello world %s" % rospy.get_time()
        rospy.loginfo(hello_str)
        pub.publish(hello_str)
        rate.sleep()

if __name__ == '__main__':
    try:
        talker()
    except rospy.ROSInterruptException:
        pass

Subscriber示例(Python):

#!/usr/bin/env python
import rospy
from std_msgs.msg import String

def callback(data):
    rospy.loginfo("接收到数据: %s", data.data)

def listener():
    rospy.init_node('listener', anonymous=True)
    rospy.Subscriber("chatter", String, callback)
    rospy.spin()

if __name__ == '__main__':
    listener()

C++版本:

#include <ros/ros.h>
#include <std_msgs/String.h>

void chatterCallback(const std_msgs::String::ConstPtr& msg) {
    ROS_INFO("接收到: [%s]", msg->data.c_str());
}

int main(int argc, char **argv) {
    ros::init(argc, argv, "listener");
    ros::NodeHandle n;

    ros::Subscriber sub = n.subscribe("chatter", 1000, chatterCallback);
    ros::spin();
    return 0;
}

1.1.3 服务(Service)

服务提供请求/响应模式的同步通信机制,适用于偶发的双向通信。

服务定义(AddTwoInts.srv):

int64 a
int64 b
---
int64 sum

服务端(Python):

#!/usr/bin/env python
from rospy_tutorials.srv import AddTwoInts, AddTwoIntsResponse
import rospy

def handle_add_two_ints(req):
    result = req.a + req.b
    rospy.loginfo("返回 [%s + %s = %s]" % (req.a, req.b, result))
    return AddTwoIntsResponse(result)

def add_two_ints_server():
    rospy.init_node('add_two_ints_server')
    s = rospy.Service('add_two_ints', AddTwoInts, handle_add_two_ints)
    rospy.loginfo("准备添加两个整数")
    rospy.spin()

if __name__ == "__main__":
    add_two_ints_server()

客户端(Python):

#!/usr/bin/env python
import sys
import rospy
from rospy_tutorials.srv import AddTwoInts

def add_two_ints_client(x, y):
    rospy.wait_for_service('add_two_ints')
    try:
        add_two_ints = rospy.ServiceProxy('add_two_ints', AddTwoInts)
        resp = add_two_ints(x, y)
        return resp.sum
    except rospy.ServiceException as e:
        print("服务调用失败: %s" % e)

if __name__ == "__main__":
    if len(sys.argv) == 3:
        x = int(sys.argv[1])
        y = int(sys.argv[2])
    else:
        x, y = 1, 2

    result = add_two_ints_client(x, y)
    print("%s + %s = %s" % (x, y, result))

1.1.4 动作(Action)

动作用于长时间运行的任务,提供目标、反馈和结果三种消息类型。

Action定义(Fibonacci.action):

# 目标
int32 order
---
# 结果
int32[] sequence
---
# 反馈
int32[] sequence

Action服务端(Python):

#!/usr/bin/env python
import rospy
import actionlib
from actionlib_tutorials.msg import FibonacciAction, FibonacciFeedback, FibonacciResult

class FibonacciActionServer:
    def __init__(self):
        self.server = actionlib.SimpleActionServer(
            'fibonacci', FibonacciAction, self.execute, False)
        self.server.start()

    def execute(self, goal):
        r = rospy.Rate(1)
        feedback = FibonacciFeedback()
        result = FibonacciResult()

        feedback.sequence = [0, 1]

        for i in range(1, goal.order):
            if self.server.is_preempt_requested():
                self.server.set_preempted()
                return

            feedback.sequence.append(
                feedback.sequence[i] + feedback.sequence[i-1])
            self.server.publish_feedback(feedback)
            r.sleep()

        result.sequence = feedback.sequence
        self.server.set_succeeded(result)

if __name__ == '__main__':
    rospy.init_node('fibonacci_server')
    server = FibonacciActionServer()
    rospy.spin()

1.1.5 参数服务器

参数服务器用于存储和检索运行时参数。

#!/usr/bin/env python
import rospy

rospy.init_node('param_demo')

# 设置参数
rospy.set_param('robot_name', 'my_robot')
rospy.set_param('max_speed', 1.5)

# 获取参数
robot_name = rospy.get_param('robot_name', 'default_robot')
max_speed = rospy.get_param('max_speed', 1.0)

# 检查参数是否存在
if rospy.has_param('robot_name'):
    rospy.loginfo("参数存在: %s", robot_name)

# 删除参数
rospy.delete_param('robot_name')

1.2 工作空间与功能包

1.2.1 创建工作空间

# 创建工作空间
mkdir -p ~/catkin_ws/src
cd ~/catkin_ws/

# 初始化工作空间
catkin_make

# 或使用catkin build(推荐)
sudo apt install python-catkin-tools
catkin build

# 设置环境变量
source devel/setup.bash
echo "source ~/catkin_ws/devel/setup.bash" >> ~/.bashrc

1.2.2 创建功能包

cd ~/catkin_ws/src
catkin_create_pkg my_robot rospy roscpp std_msgs geometry_msgs

package.xml示例:

<?xml version="1.0"?>
<package format="2">
  <name>my_robot</name>
  <version>0.1.0</version>
  <description>机器人控制功能包</description>
  <maintainer email="user@example.com">Your Name</maintainer>
  <license>MIT</license>

  <buildtool_depend>catkin</buildtool_depend>

  <depend>rospy</depend>
  <depend>roscpp</depend>
  <depend>std_msgs</depend>
  <depend>geometry_msgs</depend>
  <depend>sensor_msgs</depend>
</package>

CMakeLists.txt核心内容:

cmake_minimum_required(VERSION 3.0.2)
project(my_robot)

find_package(catkin REQUIRED COMPONENTS
  rospy
  roscpp
  std_msgs
  geometry_msgs
)

catkin_package(
  CATKIN_DEPENDS rospy roscpp std_msgs geometry_msgs
)

include_directories(
  ${catkin_INCLUDE_DIRS}
)

# C++节点
add_executable(my_node src/my_node.cpp)
target_link_libraries(my_node ${catkin_LIBRARIES})

# Python脚本安装
catkin_install_python(PROGRAMS
  scripts/my_script.py
  DESTINATION ${CATKIN_PACKAGE_BIN_DESTINATION}
)

1.2.3 自定义消息类型

创建消息文件(msg/CustomMsg.msg):

Header header
string robot_id
float64 speed
float64 angle
int32 battery_level

修改package.xml:

<build_depend>message_generation</build_depend>
<exec_depend>message_runtime</exec_depend>

修改CMakeLists.txt:

find_package(catkin REQUIRED COMPONENTS
  rospy
  roscpp
  std_msgs
  message_generation
)

add_message_files(
  FILES
  CustomMsg.msg
)

generate_messages(
  DEPENDENCIES
  std_msgs
)

catkin_package(
  CATKIN_DEPENDS message_runtime
)

使用自定义消息:

#!/usr/bin/env python
import rospy
from my_robot.msg import CustomMsg

def publisher():
    rospy.init_node('custom_pub')
    pub = rospy.Publisher('custom_topic', CustomMsg, queue_size=10)
    rate = rospy.Rate(10)

    while not rospy.is_shutdown():
        msg = CustomMsg()
        msg.header.stamp = rospy.Time.now()
        msg.robot_id = "robot_01"
        msg.speed = 1.5
        msg.angle = 0.5
        msg.battery_level = 80

        pub.publish(msg)
        rate.sleep()

if __name__ == '__main__':
    publisher()

1.3 常用命令行工具

1.3.1 核心命令

# 节点管理
rosnode list                    # 列出所有节点
rosnode info /node_name         # 查看节点信息
rosnode kill /node_name         # 终止节点

# 话题管理
rostopic list                   # 列出所有话题
rostopic echo /topic_name       # 显示话题内容
rostopic info /topic_name       # 查看话题信息
rostopic hz /topic_name         # 显示发布频率
rostopic pub /topic_name type data  # 发布消息

# 服务管理
rosservice list                 # 列出所有服务
rosservice call /service_name args  # 调用服务
rosservice info /service_name   # 查看服务信息

# 参数管理
rosparam list                   # 列出所有参数
rosparam get /param_name        # 获取参数值
rosparam set /param_name value  # 设置参数值
rosparam dump file.yaml         # 导出参数到文件
rosparam load file.yaml         # 从文件加载参数

# 消息类型
rosmsg show message_type        # 显示消息定义
rosmsg list                     # 列出所有消息类型

1.4 Launch文件

1.4.1 基础Launch文件

launch/robot.launch:

<launch>
  <!-- 参数定义 -->
  <arg name="robot_name" default="my_robot"/>
  <arg name="use_sim_time" default="false"/>

  <!-- 设置参数服务器 -->
  <param name="robot_name" value="$(arg robot_name)"/>
  <param name="use_sim_time" value="$(arg use_sim_time)"/>

  <!-- 加载参数文件 -->
  <rosparam file="$(find my_robot)/config/params.yaml" command="load"/>

  <!-- 启动节点 -->
  <node name="talker" pkg="my_robot" type="talker.py" output="screen"/>

  <!-- 节点重映射 -->
  <node name="listener" pkg="my_robot" type="listener.py">
    <remap from="chatter" to="my_chatter"/>
  </node>

  <!-- 节点组 -->
  <group ns="robot1">
    <node name="driver" pkg="my_robot" type="driver" respawn="true"/>
  </group>

  <!-- Include其他launch文件 -->
  <include file="$(find my_robot)/launch/sensors.launch">
    <arg name="lidar_enabled" value="true"/>
  </include>
</launch>

1.4.2 高级Launch配置

<launch>
  <!-- 条件启动 -->
  <arg name="enable_camera" default="true"/>
  <node if="$(arg enable_camera)" name="camera" pkg="usb_cam" type="usb_cam_node"/>

  <!-- 机器分布式启动 -->
  <machine name="robot" address="192.168.1.100" user="robot" password="pwd"/>
  <node name="controller" pkg="my_robot" type="controller" machine="robot"/>

  <!-- 环境变量 -->
  <env name="ROSCONSOLE_CONFIG_FILE" value="$(find my_robot)/config/rosconsole.config"/>

  <!-- 日志级别 -->
  <node name="debug_node" pkg="my_robot" type="node" output="screen"
        launch-prefix="gdb -ex run --args"/>
</launch>

1.5 rqt工具集

1.5.1 rqt_graph

查看节点和话题的连接关系:

rosrun rqt_graph rqt_graph

1.5.2 rqt_plot

实时绘制话题数据:

rosrun rqt_plot rqt_plot /turtle1/pose/x /turtle1/pose/y

1.5.3 rqt_console

查看日志信息:

rosrun rqt_console rqt_console
rosrun rqt_logger_level rqt_logger_level

1.5.4 其他常用工具

# rqt完整工具集
rqt

# 图像查看
rosrun rqt_image_view rqt_image_view

# 服务调用工具
rosrun rqt_service_caller rqt_service_caller

# 参数配置
rosrun rqt_reconfigure rqt_reconfigure

1.6 ROS vs ROS2 对比

1.6.1 核心差异

特性ROSROS2
通信机制自定义TCPROS/UDPROSDDS(数据分发服务)
实时性不支持支持实时系统
跨平台主要LinuxLinux/Windows/macOS
多机器人需要手动配置原生支持
安全性无加密支持DDS安全
Python版本Python 2.7/3Python 3
生命周期无标准节点生命周期管理

1.6.2 DDS通信层

ROS2基于DDS实现发布/订阅模式:

// ROS2 C++ Publisher
#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/string.hpp>

class MinimalPublisher : public rclcpp::Node {
public:
    MinimalPublisher() : Node("minimal_publisher") {
        publisher_ = this->create_publisher<std_msgs::msg::String>("topic", 10);
        timer_ = this->create_wall_timer(
            std::chrono::milliseconds(500),
            std::bind(&MinimalPublisher::timer_callback, this));
    }

private:
    void timer_callback() {
        auto message = std_msgs::msg::String();
        message.data = "Hello, ROS2!";
        publisher_->publish(message);
    }

    rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_;
    rclcpp::TimerBase::SharedPtr timer_;
};

int main(int argc, char * argv[]) {
    rclcpp::init(argc, argv);
    rclcpp::spin(std::make_shared<MinimalPublisher>());
    rclcpp::shutdown();
    return 0;
}

1.6.3 实时性支持

ROS2支持实时控制:

# ROS2 Python示例
import rclpy
from rclpy.node import Node
from rclpy.executors import MultiThreadedExecutor
from rclpy.callback_groups import ReentrantCallbackGroup

class RealtimeNode(Node):
    def __init__(self):
        super().__init__('realtime_node')
        self.cb_group = ReentrantCallbackGroup()

        self.timer = self.create_timer(
            0.001,  # 1kHz
            self.timer_callback,
            callback_group=self.cb_group
        )

    def timer_callback(self):
        # 实时控制逻辑
        pass

def main():
    rclpy.init()
    node = RealtimeNode()
    executor = MultiThreadedExecutor()
    executor.add_node(node)
    executor.spin()
    rclpy.shutdown()

1.7 高频面试题

问题1: ROS的通信机制有哪些?各自的应用场景是什么?

答案:

ROS主要有三种通信机制:

  1. 话题(Topic) - 发布/订阅模式

    • 异步通信,单向数据流
    • 适用场景:传感器数据流、状态信息、持续数据传输
    • 特点:多对多通信,发布者和订阅者解耦
  2. 服务(Service) - 请求/响应模式

    • 同步通信,双向通信
    • 适用场景:偶发的短时任务、查询操作、触发式操作
    • 特点:一对一通信,客户端等待响应
  3. 动作(Action) - 目标/反馈/结果模式

    • 异步通信,支持中途取消
    • 适用场景:长时间运行的任务、需要反馈的操作
    • 特点:提供执行状态、中间反馈、可抢占

选择建议:

  • 传感器数据用Topic
  • 短时请求用Service
  • 导航、抓取等长任务用Action

问题2: 如何调试ROS节点间通信问题?

答案:

调试步骤:

  1. 检查节点运行状态
rosnode list
rosnode info /node_name
  1. 检查话题连接
rostopic list
rostopic info /topic_name
rqt_graph  # 可视化节点关系
  1. 监听话题数据
rostopic echo /topic_name
rostopic hz /topic_name  # 检查发布频率
  1. 检查消息类型匹配
rostopic type /topic_name
rosmsg show message_type
  1. 使用日志调试
rospy.logdebug("调试信息")
rospy.loginfo("普通信息")
rospy.logwarn("警告信息")
rospy.logerr("错误信息")
  1. 使用rqt_console查看日志
rosrun rqt_console rqt_console

问题3: Launch文件中的remap、param、arg有什么区别?

答案:

  1. remap(重映射)

    • 功能:改变话题、服务、参数的名称
    • 作用域:仅影响特定节点
    • 示例:
    <node name="camera" pkg="usb_cam" type="usb_cam_node">
      <remap from="image" to="camera/image_raw"/>
    </node>
    
  2. param(参数)

    • 功能:在参数服务器上设置参数
    • 作用域:全局可访问
    • 示例:
    <param name="robot_name" value="my_robot"/>
    <param name="max_speed" value="1.5"/>
    
  3. arg(参数)

    • 功能:launch文件内部变量
    • 作用域:仅在launch文件内有效,不上传到参数服务器
    • 示例:
    <arg name="use_sim" default="false"/>
    <param name="use_sim_time" value="$(arg use_sim)"/>
    

核心区别:

  • arg是launch文件变量,不可在节点中访问
  • param是ROS参数,可通过rospy.get_param()访问
  • remap改变话题名称映射关系

问题4: 自定义消息类型的完整流程是什么?

答案:

完整流程:

  1. 创建msg文件
mkdir msg
# 创建 msg/RobotStatus.msg
  1. 定义消息内容
Header header
string robot_id
float64 battery_voltage
float64 speed
geometry_msgs/Pose pose
  1. 修改package.xml
<build_depend>message_generation</build_depend>
<exec_depend>message_runtime</exec_depend>
  1. 修改CMakeLists.txt
find_package(catkin REQUIRED COMPONENTS
  rospy
  std_msgs
  geometry_msgs
  message_generation
)

add_message_files(
  FILES
  RobotStatus.msg
)

generate_messages(
  DEPENDENCIES
  std_msgs
  geometry_msgs
)

catkin_package(
  CATKIN_DEPENDS message_runtime
)
  1. 编译
cd ~/catkin_ws
catkin_make
source devel/setup.bash
  1. 验证
rosmsg show my_robot/RobotStatus
  1. 使用
from my_robot.msg import RobotStatus

msg = RobotStatus()
msg.robot_id = "robot_01"

问题5: ROS2相比ROS有哪些重要改进?

答案:

主要改进:

  1. 通信层改进

    • 使用DDS替代自定义协议
    • 支持多种DDS实现(FastDDS、CycloneDDS等)
    • 更好的服务质量(QoS)控制
  2. 实时性支持

    • 支持实时操作系统(RTOS)
    • 确定性执行
    • 更低的延迟
  3. 安全性增强

    • DDS安全插件支持
    • 认证和加密
    • 访问控制
  4. 跨平台

    • 原生支持Windows、macOS
    • 更好的嵌入式支持
  5. 节点生命周期

    • 标准化的启动/关闭流程
    • 状态管理(Configuring、Active、Inactive等)
  6. 组件化

    • 支持节点组合
    • 更灵活的部署
  7. Python 3支持

    • 放弃Python 2.7
    • 现代化的Python开发

迁移建议:

  • 新项目优先考虑ROS2
  • 现有项目可以使用ros1_bridge桥接
  • 实时控制、多机器人项目强烈推荐ROS2