三、python脚本调用moveit api控制机械臂
目录
本文承接此文:
二、mycobot320 pi机械臂gazebo仿真模型加入D435i模型与图像插件,并用opencv做视觉识别
上面两篇文章我们已经完成了gazebo+rvzi联合仿真与D435i视觉识别,理论上我们现在只剩下读取视觉识别结果然后控制机械臂移动到目标位置就完成了。所以这篇文章继续讲述调用moveit api的方法与流程,实现的目标是做机械臂循环扫描的动作。整体项目的思路是机械臂循环扫描,如果扫描到红色物体,则终止扫描动作,开始执行定位动作。所以我们需要运行两个python动作脚本,一个负责循环扫描,一个负责订阅视觉识别的话题,一旦检测到红色色块坐标,立刻终止扫描动作脚本。我们将两个动作脚本分别命名为mycobot_moveit_py_search.py与mycobot_moveit_py_move.py
一、创建功能包
catkin_create_pkg mycobot_moveit_py_ctrl geometry_msgs moveit_commander moveit_msgs rospy std_msgs
mkdir script
touch mycobot_moveit_py_search.py
touch mycobot_moveit_py_move.py
二、编写代码
接下来编写python脚本代码,这里我直接提供,每一句代码的解析都以注释形式写在代码一旁,大家自行学习即可。
#!/usr/bin/env python
import rospy # 导入ROS Python客户端库
import moveit_commander # 导入MoveIt!的Python接口
import geometry_msgs.msg # 导入几何消息类型
from sensor_msgs.msg import JointState # 导入关节状态消息类型
from moveit_commander.conversions import pose_to_list # 导入位姿转换函数
import subprocess # 导入子进程管理模块
joint_velocities = None # 初始化关节速度变量
failure_count = 0 # 初始化失败计数器
def joint_states_callback(msg):
global joint_velocities
joint_velocities = msg.velocity # 更新全局变量joint_velocities为接收到的关节速度
def move_arm_increment(move_group, x_increment, y_increment, z_increment):
global failure_count
end_effector_link = move_group.get_end_effector_link() # 获取末端执行器链接名称
current_pose = move_group.get_current_pose(end_effector_link).pose # 获取当前末端执行器位姿
# 计算新的目标位姿
target_pose = geometry_msgs.msg.Pose()
target_pose.position.x = current_pose.position.x + x_increment
target_pose.position.y = current_pose.position.y + y_increment
target_pose.position.z = current_pose.position.z + z_increment
# 保持当前的姿态
target_pose.orientation = current_pose.orientation
move_group.set_start_state_to_current_state() # 设置起始状态为当前状态
move_group.set_pose_target(target_pose) # 设置目标位姿
while True:
plan = move_group.plan() # 规划路径
if plan[0]:
rospy.loginfo("Plan (pose goal) succeeded")
move_group.execute(plan[1], wait=True) # 执行规划的路径
failure_count = 0 # 重置失败计数器
break
else:
rospy.loginfo("Plan (pose goal) failed")
failure_count += 1
if failure_count >= 3:
rospy.loginfo("Planning failed 3 times, returning to home position and restarting the program.")
move_group.set_named_target("home") # 设置目标为"home"位置
move_group.go(wait=True) # 移动到"home"位置
subprocess.call(["rosrun", "mycobot_moveit_py_crtl", "mycobot_moveit_py_search.py"]) # 重启程序
rospy.signal_shutdown("Restarting the program") # 关闭节点
return
while True:
current_pose = move_group.get_current_pose(end_effector_link).pose # 获取当前末端执行器位姿
if (abs(current_pose.position.x - target_pose.position.x) <= 0.01 and
abs(current_pose.position.y - target_pose.position.y) <= 0.01 and
abs(current_pose.position.z - target_pose.position.z) <= 0.01):
break
else:
rospy.sleep(0.5) # 等待0.5秒
rospy.sleep(1) # 等待1秒
# 检测关节是否静止
while True:
if joint_velocities is not None:
if all(abs(vel) < 0.003 for vel in joint_velocities):
rospy.loginfo("All joints are static")
break
rospy.sleep(0.5) # 等待0.5秒
def main():
moveit_commander.roscpp_initialize([]) # 初始化moveit_commander
rospy.init_node('control_node', anonymous=True) # 初始化ROS节点
group_name = "arm" # 设置机械臂组名
move_group = moveit_commander.MoveGroupCommander(group_name) # 创建MoveGroupCommander对象
reference_frame = "world" # 设置参考坐标系
move_group.set_pose_reference_frame(reference_frame)
move_group.allow_replanning(True) # 允许重新规划
move_group.set_goal_position_tolerance(0.001) # 设置位置容差
move_group.set_goal_orientation_tolerance(0.01) # 设置姿态容差
move_group.set_max_acceleration_scaling_factor(0.1) # 设置最大加速度缩放因子
move_group.set_max_velocity_scaling_factor(0.1) # 设置最大速度缩放因子
move_group.set_planning_time(5.0) # 设置规划时间为5秒
move_group.set_num_planning_attempts(10) # 设置规划尝试次数为10次
# 订阅 /joint_states 主题
rospy.Subscriber('/joint_states', JointState, joint_states_callback)
move_group.set_named_target("home") # 设置目标为"home"位置
move_group.go(wait=True) # 移动到"home"位置
rospy.sleep(1) # 等待1秒
while not rospy.is_shutdown():
x_increment = 0 # 示例增量
y_increment = 0 # 示例增量
z_increment = -0.03 # 示例增量
move_arm_increment(move_group, x_increment, y_increment, z_increment)
x_increment = 0.05 # 示例增量
y_increment = 0 # 示例增量
z_increment = 0 # 示例增量
move_arm_increment(move_group, x_increment, y_increment, z_increment)
x_increment = 0 # 示例增量
y_increment = 0 # 示例增量
z_increment = -0.12 # 示例增量
move_arm_increment(move_group, x_increment, y_increment, z_increment)
x_increment = -0.1 # 示例增量
y_increment = 0 # 示例增量
z_increment = 0 # 示例增量
move_arm_increment(move_group, x_increment, y_increment, z_increment)
x_increment = 0 # 示例增量
y_increment = 0 # 示例增量
z_increment = 0.12 # 示例增量
move_arm_increment(move_group, x_increment, y_increment, z_increment)
x_increment = 0.05 # 示例增量
y_increment = 0 # 示例增量
z_increment = 0 # 示例增量
move_arm_increment(move_group, x_increment, y_increment, z_increment)
x_increment = 0 # 示例增量
y_increment = 0 # 示例增量
z_increment = 0.03 # 示例增量
move_arm_increment(move_group, x_increment, y_increment, z_increment)
move_group.set_named_target("home") # 设置目标为"home"位置
move_group.go(wait=True) # 移动到"home"位置
moveit_commander.roscpp_shutdown() # 关闭moveit_commander
if __name__ == '__main__':
main()
代码整体执行流程:
1.导入必要的ROS和MoveIt!库。
2.初始化全局变量joint_velocities和failure_count。
3. 定义joint_states_callback函数,用于更新关节速度。
4.定义move_arm_increment函数,用于增量移动机械臂,并在规划失败时重置或重启程序。
5.在main函数中:
– 初始化moveit_commander和ROS节点。
– 设置机械臂组和参考坐标系。
– 配置MoveGroupCommander的参数。
– 订阅/joint_states主题以获取关节速度。
– 将机械臂移动到"home"位置。
– 进入一个循环,在循环中增量移动机械臂,并在每次移动后返回"home"位置。
– 在程序结束时关闭moveit_commander。
比较重要的步骤在于每完成一次规划,机械臂到达了目标点后,需要加入目标点与目前点的误差检测、关节静止检测,以保证下一次规划的起始姿态必定与目前姿态相同。这一点在函数move_arm_increment的末尾体现。
#!/usr/bin/env python
import rospy # 导入ROS Python库
from geometry_msgs.msg import PointStamped # 导入PointStamped消息类型
import time # 导入时间库
import subprocess # 导入子进程库
import signal # 导入信号库
import os # 导入操作系统库
import moveit_commander # 导入MoveIt Commander库
import geometry_msgs.msg # 导入geometry_msgs消息库
from sensor_msgs.msg import JointState # 导入JointState消息类型
# 全局变量
joint_velocities = None # 存储关节速度的全局变量
failure_count = 0 # 存储失败计数的全局变量
# 终止mycobot_moveit_py_search.py进程的函数
def terminate_process():
try:
# 查找mycobot_moveit_py_search.py的进程ID
output = subprocess.check_output(["pgrep", "-f", "mycobot_moveit_py_search.py"])
pids = output.decode().strip().split('\n')
for pid in pids:
if pid:
os.kill(int(pid), signal.SIGTERM) # 终止进程
print(f"Terminated process with PID {pid}")
except subprocess.CalledProcessError:
print("No process found with the name mycobot_moveit_py_search.py")
# 自定义函数,处理接收到的xyz增量消息
def custom_function(msg, move_group):
x_increment = msg.point.x # 从消息中提取x增量
y_increment = msg.point.y # 从消息中提取y增量
z_increment = msg.point.z # 从消息中提取z增量
rospy.loginfo(f"Received xyz increments: x={x_increment}, y={y_increment}, z={z_increment}") # 打印xyz增量信息
move_arm_increment(move_group, x_increment, y_increment, z_increment) # 调用move_arm_increment函数
if abs(x_increment) <= 0.01 and abs(y_increment) <= 0.01 and abs(z_increment) <= 0.01:
print("Finsh!, exiting...")
rospy.signal_shutdown("Condition met") # 满足条件时关闭节点
# 主监听函数
def listener():
rospy.init_node('terminate_node', anonymous=True) # 初始化ROS节点
moveit_commander.roscpp_initialize([]) # 初始化moveit_commander
group_name = "arm"
move_group = moveit_commander.MoveGroupCommander(group_name) # 创建MoveGroupCommander对象
rospy.Subscriber('/joint_states', JointState, joint_states_callback) # 订阅 /joint_states 主题
while not rospy.is_shutdown():
try:
msg = rospy.wait_for_message('/red_object_center', PointStamped, timeout=None) # 等待 /red_object_center 主题的消息
terminate_process() # 终止mycobot_moveit_py_search.py进程
while True:
if joint_velocities is not None:
if all(abs(vel) < 0.003 for vel in joint_velocities): # 检测关节是否静止
rospy.loginfo("All joints are static")
break
rospy.sleep(1)
custom_function(msg, move_group) # 执行自定义函数
except rospy.ROSInterruptException:
break
# 关节状态回调函数
def joint_states_callback(msg):
global joint_velocities
joint_velocities = msg.velocity # 更新全局变量joint_velocities
# 移动机械臂的函数
def move_arm_increment(move_group, x_increment, y_increment, z_increment):
global failure_count
end_effector_link = move_group.get_end_effector_link() # 获取末端执行器链接
current_pose = move_group.get_current_pose(end_effector_link).pose # 获取当前位姿
target_pose = geometry_msgs.msg.Pose() # 创建目标位姿对象
target_pose.position.x = current_pose.position.x + x_increment # 计算新的x目标位置
target_pose.position.y = current_pose.position.y + y_increment # 计算新的y目标位置
target_pose.position.z = current_pose.position.z + z_increment # 计算新的z目标位置
target_pose.orientation = current_pose.orientation # 保持当前的姿态
move_group.set_start_state_to_current_state() # 设置起始状态为当前状态
move_group.set_pose_target(target_pose) # 设置目标位姿
while True:
plan = move_group.plan() # 规划路径
if plan[0]:
rospy.loginfo("Plan (pose goal) succeeded")
move_group.execute(plan[1], wait=True) # 执行路径
failure_count = 0 # 重置失败计数器
break
else:
rospy.loginfo("Plan (pose goal) failed")
failure_count += 1
if failure_count >= 3:
rospy.loginfo("Planning failed 3 times, returning to home position and restarting the program.")
move_group.set_named_target("home") # 设置目标位置为home
move_group.go(wait=True) # 移动到home位置
subprocess.call(["rosrun", "mycobot_moveit_py_crtl", "mycobot_moveit_py_search.py"]) # 重启程序
rospy.signal_shutdown("Restarting the program") # 关闭节点
return
while True:
current_pose = move_group.get_current_pose(end_effector_link).pose # 获取当前位姿
if (abs(current_pose.position.x - target_pose.position.x) <= 0.01 and
abs(current_pose.position.y - target_pose.position.y) <= 0.01 and
abs(current_pose.position.z - target_pose.position.z) <= 0.01): # 检查是否到达目标位置
break
else:
rospy.sleep(0.5)
rospy.sleep(1)
while True:
if joint_velocities is not None:
if all(abs(vel) < 0.003 for vel in joint_velocities): # 检测关节是否静止
rospy.loginfo("All joints are static")
break
rospy.sleep(0.5)
if __name__ == '__main__':
listener() # 启动主监听函数
代码运行流程及实现的功能:
1.初始化ROS节点:创建一个名为terminate_node的ROS节点。
2.初始化MoveIt:使用moveit_commander初始化MoveIt,并创建一个MoveGroupCommander对象来控制机械臂。
3.订阅关节状态:订阅/joint_states主题,以获取关节速度信息。
4.监听消息:等待/red_object_center主题的消息,该消息包含xyz增量信息。
5.终止进程:接收到消息后,终止mycobot_moveit_py_search.py进程。
6.检测关节静止:检查所有关节是否静止,确保机械臂稳定。
7.执行自定义函数:调用custom_function函数,处理接收到的xyz增量消息。
8.移动机械臂:根据xyz增量信息,计算新的目标位姿,并规划和执行路径。
9.检测关节静止:再次检查所有关节是否静止,确保机械臂稳定。
10.条件判断:如果xyz增量非常小,关闭节点。
拷贝完后保存退出,然后返回工作空间编译:
catkin_make
三、模型中增加灰色墙体与红色按钮块
touch wall.xacro
<?xml version="1.0"?>
<robot xmlns:xacro="http://www.ros.org/wiki/xacro">
<link name="wall">
<visual>
<origin xyz="0 -0.4 0.5" rpy="0 0 0" />
<geometry >
<box size="1 0.01 1" />
</geometry>
<material name="grey">
<color rgba="0.5 0.5 0.5 1" />
</material>
</visual>
<collision>
<origin xyz="0 -0.4 0.5" rpy="0 0 0" />
<geometry >
<box size="1 0.01 1" />
</geometry>
</collision>
<inertial>
<mass value="0.5"/>
<inertia ixx="0.001" ixy="0.0" ixz="0.0" iyy="0.001" iyz="0.0" izz="0.001"/>
</inertial>
</link>
<joint name="wall_pos" type="fixed">
<parent link="base"/>
<child link="wall"/>
</joint>
<link name="buttom">
<visual>
<origin xyz="-0.09 -0.39 0.5" rpy="0 0 0" />
<geometry >
<box size="0.05 0.01 0.05" />
</geometry>
</visual>
<collision>
<origin xyz="-0.09 -0.39 0.5" rpy="0 0 0" />
<geometry >
<box size="0.05 0.01 0.05" />
</geometry>
</collision>
<inertial>
<mass value="0.5"/>
<inertia ixx="0.001" ixy="0.0" ixz="0.0" iyy="0.001" iyz="0.0" izz="0.001"/>
</inertial>
</link>
<joint name="buttom_pos" type="fixed">
<parent link="wall"/>
<child link="buttom"/>
</joint>
<gazebo reference="wall">
<material>Gazebo/LightGray</material>
</gazebo>
<gazebo reference="buttom">
<material>Gazebo/Red</material>
</gazebo>
</robot>
<xacro:include filename="$(find mycobot_description)/urdf/mycobot_320_pi_2022/wall.xacro"/>
保存退出。
四、执行程序
执行四个程序:联合仿真->视觉识别->移动到目标点->循环扫描
roslaunch mycobot_moveit_config demo_gazebo.launch
rosrun mycobot_moveit_py_image_sub mycobot_moveit_py_image_sub.py
rosrun mycobot_moveit_py_ctrl mycobot_moveit_py_move.py
rosrun mycobot_moveit_py_ctrl mycobot_moveit_py_search.py
最终运行画面:
最终相机准确落在红色块中心,仿真成功。
五、总结
其实这个小项目比较困难的部分是在我第一篇文章讲述的联合仿真环境配置:一、适用于最新moveit版本的mycobot 350 pi 机械臂 gazebo+moveit+rviz联合仿真,后面的动作脚本,视觉识别,以及两者的结合运动反而不是很难写,主要理清楚代码逻辑就好,然而代码逻辑也并不是很难,就是一些api的调用需要多去看官网例程以及多去看ai的代码的解析,所以我后面也没有一句句地解析代码,因为这些网上都有对应的解析。
那么至此,这篇小项目的解析就到此结束了,如果大家有疑问或者觉得我哪里出错的话,请及时在评论区告诉我,谢谢大家!
程序源码:
链接: https://pan.baidu.com/s/1w0HWKm9ZlyDmB6KT7oMCjQ?pwd=diks
作者:WXYANXW