转:rosspin、rosspinOnce及多线程订阅
背景
因为在一些点云处理的程序中,出现多个订阅者订阅同一个topic,由于内部处理的时间不同,最后造成显示界面出现卡顿,现象就是用鼠标拖动点云的视角会感觉非常卡,不顺畅。为此,决定先走一遍官方的多线程系列教程。
https://github.com/ros/ros_tutorials/tree/jade-devel/roscpp_tutorials
http://wiki.ros.org/roscpp_tutorials/Tutorials
关于Spin和SpinOnce的解释来自英文书《A Gentle Introduction to ROS》P59页
什么时候用ros::spin()和ros::spinOnce()呢,如果仅仅只是响应topic,就用ros::spin()。当程序中除了响应回调函数还有其他重复性工作的时候,那就在循环中做那些工作,然后调用ros::spinOnce()。如下面的用spinOnce,循环调用print( )函数。
这里一直在打印while 中的部分,并且处理message queue。
而下面的打印输出不会更新,相当于只执行了一次,但是它会不断处理ROS 的message queue(下图右边的subscriber 开头的打印函数只调用了一次,然后一直响应订阅的消息)。
这里只在一开始进入main的时候调用了一次print函数,其余的都在响应topic 去了。
spinOnce( )较常用的做法是while里放publisher所要发布的msg的赋值处理,然后一直循环发布topic。
- ros::Rate loop_rate(10);
- while (ros::ok())
- {
- std_msgs::String msg;
- std::stringstream ss;
- ss << "hello world " << count;
- msg.data = ss.str();
- chatter_pub.publish(msg);
- ros::spinOnce();
- loop_rate.sleep();
- }
1. customer_callback_processing
自定义消息队列处理线程,关键字:JOIN,在主线程spinOnce进入自定义处理线程。http://wiki.ros.org/roscpp/Overview/Callbacks%20and%20Spinning这里官网给出了补充说明
- #include "ros/ros.h"
- #include "ros/callback_queue.h"
- #include "std_msgs/String.h"
- #include <boost/thread.hpp>
- /**
- * This tutorial demonstrates the use of custom separate callback queues that can be processed
- * independently, whether in different threads or just at different times.
- */
- void chatterCallbackMainQueue(const std_msgs::String::ConstPtr& msg)
- {
- ROS_INFO_STREAM("I heard: [ " << msg->data << "] in thread [" << boost::this_thread::get_id() << "] (Main thread)");
- }
- void chatterCallbackCustomQueue(const std_msgs::String::ConstPtr& msg)
- {
- ROS_INFO_STREAM("I heard: [ " << msg->data << "] in thread [" << boost::this_thread::get_id() << "]");
- }
- /**
- * The custom queue used for one of the subscription callbacks
- */
- ros::CallbackQueue g_queue;
- void callbackThread()
- {
- ROS_INFO_STREAM("Callback thread id=" << boost::this_thread::get_id());
- ros::NodeHandle n;
- while (n.ok())
- {
- g_queue.callAvailable(ros::WallDuration(0.01));
- }
- }
- int main(int argc, char **argv)
- {
- ros::init(argc, argv, "listener_with_custom_callback_processing");
- ros::NodeHandle n;
- /**
- * The SubscribeOptions structure lets you specify a custom queue to use for a specific subscription.
- * You can also set a default queue on a NodeHandle using the NodeHandle::setCallbackQueue() function.
- *
- * AdvertiseOptions and AdvertiseServiceOptions offer similar functionality.
- */
- ros::SubscribeOptions ops = ros::SubscribeOptions::create<std_msgs::String>("chatter", 1000,
- chatterCallbackCustomQueue,
- ros::VoidPtr(), &g_queue);
- ros::Subscriber sub = n.subscribe(ops);
- ros::Subscriber sub2 = n.subscribe("chatter", 1000, chatterCallbackMainQueue);
- boost::thread chatter_thread(callbackThread);
- ROS_INFO_STREAM("Main thread id=" << boost::this_thread::get_id());
- ros::Rate r(1);
- while (n.ok())
- {
- ros::spinOnce();
- r.sleep();
- }
- chatter_thread.join();
- return 0;
- }
2. listenner_multiple_spin
思想:利用一个类的不同成员函数实现同一个topic的不同处理(在一个线程中)。
- #include "ros/ros.h"
- #include "std_msgs/String.h"
- /**
- * This tutorial demonstrates simple receipt of messages over the ROS system, with
- * multiple callbacks for the same topic. See the "listener" tutorial for more
- * information.
- */
- class Listener
- {
- public:
- void chatter1(const std_msgs::String::ConstPtr& msg) { ROS_INFO("chatter1: [%s]", msg->data.c_str()); }
- void chatter2(const std_msgs::String::ConstPtr& msg) { ROS_INFO("chatter2: [%s]", msg->data.c_str()); }
- void chatter3(const std_msgs::String::ConstPtr& msg) { ROS_INFO("chatter3: [%s]", msg->data.c_str()); }
- };
- void chatter4(const std_msgs::String::ConstPtr& msg)
- {
- ROS_INFO("chatter4: [%s]", msg->data.c_str());
- }
- int main(int argc, char **argv)
- {
- ros::init(argc, argv, "listener");
- ros::NodeHandle n;
- Listener l;
- ros::Subscriber sub1 = n.subscribe("chatter", 10, &Listener::chatter1, &l);
- ros::Subscriber sub2 = n.subscribe("chatter", 10, &Listener::chatter2, &l);
- ros::Subscriber sub3 = n.subscribe("chatter", 10, &Listener::chatter3, &l);
- ros::Subscriber sub4 = n.subscribe("chatter", 10, chatter4);
- ros::spin();
- return 0;
- }
3. listener_threaded_spin
思想:利用一个类的不同成员函数实现同一个topic的不同处理(在四个不同线程中)。
- #include "ros/ros.h"
- #include "std_msgs/String.h"
- #include <boost/thread.hpp>
- /**
- * This tutorial demonstrates simple receipt of messages over the ROS system, using
- * a threaded Spinner object to receive callbacks in multiple threads at the same time.
- */
- ros::Duration d(0.01);
- class Listener
- {
- public:
- void chatter1(const std_msgs::String::ConstPtr& msg)
- {
- ROS_INFO_STREAM("chatter1: [" << msg->data << "] [thread=" << boost::this_thread::get_id() << "]");
- d.sleep();
- }
- void chatter2(const std_msgs::String::ConstPtr& msg)
- {
- ROS_INFO_STREAM("chatter2: [" << msg->data << "] [thread=" << boost::this_thread::get_id() << "]");
- d.sleep();
- }
- void chatter3(const std_msgs::String::ConstPtr& msg)
- {
- ROS_INFO_STREAM("chatter3: [" << msg->data << "] [thread=" << boost::this_thread::get_id() << "]");
- d.sleep();
- }
- };
- void chatter4(const std_msgs::String::ConstPtr& msg)
- {
- ROS_INFO_STREAM("chatter4: [" << msg->data << "] [thread=" << boost::this_thread::get_id() << "]");
- d.sleep();
- }
- int main(int argc, char **argv)
- {
- ros::init(argc, argv, "listener");
- ros::NodeHandle n;
- Listener l;
- ros::Subscriber sub1 = n.subscribe("chatter", 10, &Listener::chatter1, &l);
- ros::Subscriber sub2 = n.subscribe("chatter", 10, &Listener::chatter2, &l);
- ros::Subscriber sub3 = n.subscribe("chatter", 10, &Listener::chatter3, &l);
- ros::Subscriber sub4 = n.subscribe("chatter", 10, chatter4);
- /**
- * The MultiThreadedSpinner object allows you to specify a number of threads to use
- * to call callbacks. If no explicit # is specified, it will use the # of hardware
- * threads available on your system. Here we explicitly specify 4 threads.
- */
- ros::MultiThreadedSpinner s(4);
- ros::spin(s);
- return 0;
- }
4. listener_async_spin
因为这里用的是spinOnce,主线程每隔(1/5秒)打印,然后sleep把控制权交给系统,系统然后就把控制权分配到四个子线程,所以结果看到一共是五个线程。- #include "ros/ros.h"
- #include "std_msgs/String.h"
- #include <boost/thread.hpp>
- /**
- * This tutorial demonstrates simple receipt of messages over the ROS system, using
- * an asynchronous Spinner object to receive callbacks in multiple threads at the same time.
- */
- ros::Duration d(0.01);
- class Listener
- {
- public:
- void chatter1(const std_msgs::String::ConstPtr& msg)
- {
- ROS_INFO_STREAM("chatter1: [" << msg->data << "] [thread=" << boost::this_thread::get_id() << "]");
- d.sleep();
- }
- void chatter2(const std_msgs::String::ConstPtr& msg)
- {
- ROS_INFO_STREAM("chatter2: [" << msg->data << "] [thread=" << boost::this_thread::get_id() << "]");
- d.sleep();
- }
- void chatter3(const std_msgs::String::ConstPtr& msg)
- {
- ROS_INFO_STREAM("chatter3: [" << msg->data << "] [thread=" << boost::this_thread::get_id() << "]");
- d.sleep();
- }
- };
- void chatter4(const std_msgs::String::ConstPtr& msg)
- {
- ROS_INFO_STREAM("chatter4: [" << msg->data << "] [thread=" << boost::this_thread::get_id() << "]");
- d.sleep();
- }
- int main(int argc, char **argv)
- {
- ros::init(argc, argv, "listener");
- ros::NodeHandle n;
- Listener l;
- ros::Subscriber sub1 = n.subscribe("chatter", 10, &Listener::chatter1, &l);
- ros::Subscriber sub2 = n.subscribe("chatter", 10, &Listener::chatter2, &l);
- ros::Subscriber sub3 = n.subscribe("chatter", 10, &Listener::chatter3, &l);
- ros::Subscriber sub4 = n.subscribe("chatter", 10, chatter4);
- /**
- * The AsyncSpinner object allows you to specify a number of threads to use
- * to call callbacks. If no explicit # is specified, it will use the # of hardware
- * threads available on your system. Here we explicitly specify 4 threads.
- */
- ros::AsyncSpinner s(4);
- s.start();
- ros::Rate r(5);
- while (ros::ok())
- {
- ROS_INFO_STREAM("Main thread [" << boost::this_thread::get_id() << "].");
- r.sleep();
- }
- return 0;
- }
============2016.11====一次订阅四个topic===============
http://wiki.ros.org/message_filters#ApproximateTime_Policy
https://github.com/tum-vision/rgbd_demo/blob/master/src/example.cpp
- #include <message_filters/subscriber.h>
- #include <message_filters/synchronizer.h>
- #include <message_filters/sync_policies/approximate_time.h>
- #include <sensor_msgs/Image.h>
- #include <sensor_msgs/CameraInfo.h>
- #include <cv_bridge/cv_bridge.h>
- #include <opencv2/imgproc/imgproc.hpp>
- #include <opencv2/highgui/highgui.hpp>
- #include <iostream>
- #include <ros/ros.h>
- #include <pcl_ros/point_cloud.h>
- #include <pcl/point_types.h>
- // Note: for colorized point cloud, use PointXYZRGBA
- typedef pcl::PointCloud<pcl::PointXYZ> PointCloud;
- using namespace std;
- using namespace sensor_msgs;
- using namespace message_filters;
- ros::Publisher pointcloud_pub;
- void callback(const ImageConstPtr& image_color_msg,
- const ImageConstPtr& image_depth_msg,
- const CameraInfoConstPtr& info_color_msg,
- const CameraInfoConstPtr& info_depth_msg)
- {
- // Solve all of perception here...
- cv::Mat image_color = cv_bridge::toCvCopy(image_color_msg)->image;
- cv::Mat image_depth = cv_bridge::toCvCopy(image_depth_msg)->image;
- cvtColor(image_color,image_color, CV_RGB2BGR);
- // colorize the image
- cv::Vec3b color(255,0,0);
- for(int y=0;y<image_color.rows;y++)
- {
- for(int x=0;x<image_color.cols;x++)
- {
- float depth = image_depth.at<short int>(cv::Point(x,y)) / 1000.0;
- if( depth == 0)
- {
- image_color.at<cv::Vec3b>(cv::Point(x,y)) = cv::Vec3b(0,0,255);
- }
- if( depth > 1.00)
- {
- image_color.at<cv::Vec3b>(cv::Point(x,y)) = cv::Vec3b(255,0,0);
- }
- }
- }
- //cout <<"depth[320,240]="<< image_depth.at<short int>(cv::Point(320,240)) / 1000.0 <<"m"<< endl;
- image_color.at<cv::Vec3b>(cv::Point(320,240)) = cv::Vec3b(255,255,255);
- // get camera intrinsics
- float fx = info_depth_msg->K[0];
- float fy = info_depth_msg->K[4];
- float cx = info_depth_msg->K[2];
- float cy = info_depth_msg->K[5];
- //cout << "fx="<< fx << " fy="<<fy<<" cx="<<cx<<" cy="<<cy<<endl;
- // produce a point cloud
- PointCloud::Ptr pointcloud_msg (new PointCloud);
- pointcloud_msg->header = image_depth_msg->header;
- pcl::PointXYZ pt;
- for(int y=0;y<image_color.rows;y+=4)
- {
- for(int x=0;x<image_color.cols;x+=4)
- {
- float depth = image_depth.at<short int>(cv::Point(x,y)) / 1000.0;
- if(depth>0)
- {
- pt.x = (x - cx) * depth / fx;
- pt.y = (y - cy) * depth / fy;
- pt.z = depth;
- //cout << pt.x<<" "<<pt.y<<" "<<pt.z<<endl;
- pointcloud_msg->points.push_back (pt);
- }
- }
- }
- pointcloud_msg->height = 1;
- pointcloud_msg->width = pointcloud_msg->points.size();
- pointcloud_pub.publish (pointcloud_msg);
- cv::imshow("color", image_color);
- cv::waitKey(3);
- }
- int main(int argc, char** argv)
- {
- ros::init(argc, argv, "vision_node");
- ros::NodeHandle nh;
- message_filters::Subscriber<Image> image_color_sub(nh,"/camera/rgb/image_raw", 1);
- message_filters::Subscriber<Image> image_depth_sub(nh,"/camera/depth_registered/image_raw", 1);
- message_filters::Subscriber<CameraInfo> info_color_sub(nh,"/camera/rgb/camera_info", 1);
- message_filters::Subscriber<CameraInfo> info_depth_sub(nh,"/camera/depth_registered/camera_info", 1);
- pointcloud_pub = nh.advertise<PointCloud> ("mypoints", 1);
- typedef sync_policies::ApproximateTime<Image, Image, CameraInfo, CameraInfo> MySyncPolicy;
- Synchronizer<MySyncPolicy> sync(MySyncPolicy(10), image_color_sub, image_depth_sub, info_color_sub, info_depth_sub);
- sync.registerCallback(boost::bind(&callback, _1, _2, _3, _4));
- ros::spin();
- return 0;
- }
5. listener_with_userdata
利用boost_bind向消息回调函数传入多个参数
关于boost_bind以前写过一个用法简介http://blog.****.net/yaked/article/details/44942773
这里是boost::bind(&listener::chatterCallback, this, _1, "User 1" )也即this->chatterCallback( "User1"), _1表示第一个输入参数, 是个占位标记。
- #include "ros/ros.h"
- #include "std_msgs/String.h"
- /**
- * This tutorial demonstrates a simple use of Boost.Bind to pass arbitrary data into a subscription
- * callback. For more information on Boost.Bind see the documentation on the boost homepage,
- * http://www.boost.org/
- */
- class Listener
- {
- public:
- ros::NodeHandle node_handle_;
- ros::V_Subscriber subs_;
- Listener(const ros::NodeHandle& node_handle): node_handle_(node_handle)
- {
- }
- void init()
- {
- subs_.push_back(node_handle_.subscribe<std_msgs::String>("chatter", 1000, boost::bind(&Listener::chatterCallback, this, _1, "User 1")));
- subs_.push_back(node_handle_.subscribe<std_msgs::String>("chatter", 1000, boost::bind(&Listener::chatterCallback, this, _1, "User 2")));
- subs_.push_back(node_handle_.subscribe<std_msgs::String>("chatter", 1000, boost::bind(&Listener::chatterCallback, this, _1, "User 3")));
- }
- void chatterCallback(const std_msgs::String::ConstPtr& msg, std::string user_string)
- {
- ROS_INFO("I heard: [%s] with user string [%s]", msg->data.c_str(), user_string.c_str());
- }
- };
- int main(int argc, char **argv)
- {
- ros::init(argc, argv, "listener_with_userdata");
- ros::NodeHandle n;
- Listener l(n);
- l.init();
- ros::spin();
- return 0;
- }
6. timers
可以参考官网http://wiki.ros.org/roscpp/Overview/Timers
- #include "ros/ros.h"
- /**
- * This tutorial demonstrates the use of timer callbacks.
- */
- void callback1(const ros::TimerEvent&)
- {
- ROS_INFO("Callback 1 triggered");
- }
- void callback2(const ros::TimerEvent&)
- {
- ROS_INFO("Callback 2 triggered");
- }
- int main(int argc, char **argv)
- {
- ros::init(argc, argv, "talker");
- ros::NodeHandle n;
- ros::Timer timer1 = n.createTimer(ros::Duration(0.1), callback1);
- ros::Timer timer2 = n.createTimer(ros::Duration(1.0), callback2);
- ros::spin();
- return 0;
- }
======================2016.07=====定时器带入附加参数===============
- #include "ros/ros.h"
- #include "cstring"
- #include "iostream"
- #include "han_agv/SocketClient.h"
- #include "han_agv/VelEncoder.h"
- using namespace std;
- const float_t PI = 3.14159;
- const int ENCODER_PER_LOOP = 900;
- const float_t WHEEL_RADIUS = 0.075;
- ClientSocket soc;
- // ros_tutorials/turtlesim/tutorials/draw_square.cpp
- void timerCallback(const ros::TimerEvent&, ros::Publisher vel_pub)
- {
- ROS_INFO("Timer callback triggered");
- AGVDATA buffer;
- AGVSENSORS buffer_left_encoder_start;
- AGVSENSORS buffer_right_encoder_start;
- // if(soc.recvMsg(buffer) == 0)
- // {
- // cout << "Failed to receive message." << endl;
- // return;
- // }
- // Reverse the High and Low byte.
- double time_delay = 0.1;
- ros::Duration(time_delay).sleep ();
- buffer.SensorsData.WheelLeft_Encoder[0] = buffer.SensorsData.WheelLeft_Encoder[3];
- buffer.SensorsData.WheelLeft_Encoder[1] = buffer.SensorsData.WheelLeft_Encoder[2];
- buffer.SensorsData.WheelRight_Encoder[0] = buffer.SensorsData.WheelRight_Encoder[3];
- buffer.SensorsData.WheelRight_Encoder[1] = buffer.SensorsData.WheelRight_Encoder[2];
- han_agv::VelEncoder vel;
- vel.left_velocity = 1.0;
- vel.right_velocity = 2.0;
- vel_pub.publish(vel);
- }
- int main(int argc, char** argv)
- {
- ros::init(argc, argv, "TCPDecode_node");
- ros::NodeHandle nh;
- ROS_INFO("create node successfully!");
- ros::Publisher vel_pub = nh.advertise<han_agv::VelEncoder>("HansAGV/encoder_vel", 1);
- ros::Timer decoder_timer = nh.createTimer(ros::Duration(0.1), boost::bind(timerCallback, _1, vel_pub));
- ros::spin();
- }
7. notify_connect
- #include "ros/ros.h"
- #include "std_msgs/String.h"
- #include <sstream>
- /**
- * This tutorial demonstrates how to get a callback when a new subscriber connects
- * to an advertised topic, or a subscriber disconnects.
- */
- uint32_t g_count = 0;
- void chatterConnect(const ros::SingleSubscriberPublisher& pub)
- {
- std_msgs::String msg;
- std::stringstream ss;
- ss << "chatter connect #" << g_count++;
- ROS_INFO("%s", ss.str().c_str());
- msg.data = ss.str();
- pub.publish(msg); // This message will get published only to the subscriber that just connected
- }
- void chatterDisconnect(const ros::SingleSubscriberPublisher&)
- {
- ROS_INFO("chatter disconnect");
- }
- int main(int argc, char **argv)
- {
- ros::init(argc, argv, "notify_connect");
- ros::NodeHandle n;
- /**
- * This version of advertise() optionally takes a connect/disconnect callback
- */
- ros::Publisher pub = n.advertise<std_msgs::String>("chatter", 1000, chatterConnect, chatterDisconnect);
- ros::spin();
- return 0;
- }
断开连接,通知延后了一分多钟。囧囧!!