Node.hh
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2012 Open Source Robotics Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16 */
17 
18 #ifndef GAZEBO_TRANSPORT_NODE_HH_
19 #define GAZEBO_TRANSPORT_NODE_HH_
20 
21 #ifndef Q_MOC_RUN
22 #include <tbb/task.h>
23 #endif
24 #include <boost/bind.hpp>
25 #include <boost/enable_shared_from_this.hpp>
26 #include <map>
27 #include <list>
28 #include <string>
29 #include <vector>
30 
33 #include "gazebo/util/system.hh"
34 
35 namespace gazebo
36 {
37  namespace transport
38  {
41  class GZ_TRANSPORT_VISIBLE PublishTask : public tbb::task
42  {
46  public: PublishTask(transport::PublisherPtr _pub,
47  const google::protobuf::Message &_message)
48  : pub(_pub)
49  {
50  this->msg = _message.New();
51  this->msg->CopyFrom(_message);
52  }
53 
56  public: tbb::task *execute()
57  {
58  this->pub->WaitForConnection();
59  this->pub->Publish(*this->msg, true);
60  this->pub->SendMessage();
61  delete this->msg;
62  this->pub.reset();
63  return NULL;
64  }
65 
67  private: transport::PublisherPtr pub;
68 
70  private: google::protobuf::Message *msg;
71  };
73 
76 
80  class GZ_TRANSPORT_VISIBLE Node :
81  public boost::enable_shared_from_this<Node>
82  {
84  public: Node();
85 
87  public: virtual ~Node();
88 
97  public: void Init(const std::string &_space ="");
98 
110  public: bool TryInit(
111  const common::Time &_maxWait = common::Time(1, 0));
112 
117  public: bool IsInitialized() const;
118 
120  public: void Fini();
121 
124  public: std::string GetTopicNamespace() const;
125 
129  public: std::string DecodeTopicName(const std::string &_topic);
130 
134  public: std::string EncodeTopicName(const std::string &_topic);
135 
138  public: unsigned int GetId() const;
139 
142  public: void ProcessPublishers();
143 
145  public: void ProcessIncoming();
146 
150  public: bool HasLatchedSubscriber(const std::string &_topic) const;
151 
152 
159  public: template<typename M>
160  void Publish(const std::string &_topic,
161  const google::protobuf::Message &_message)
162  {
163  transport::PublisherPtr pub = this->Advertise<M>(_topic);
164  PublishTask *task = new(tbb::task::allocate_root())
165  PublishTask(pub, _message);
166 
167  tbb::task::enqueue(*task);
168  return;
169  }
170 
178  public: template<typename M>
179  transport::PublisherPtr Advertise(const std::string &_topic,
180  unsigned int _queueLimit = 1000,
181  double _hzRate = 0)
182  {
183  std::string decodedTopic = this->DecodeTopicName(_topic);
184  PublisherPtr publisher =
185  transport::TopicManager::Instance()->Advertise<M>(
186  decodedTopic, _queueLimit, _hzRate);
187 
188  boost::mutex::scoped_lock lock(this->publisherMutex);
189  publisher->SetNode(shared_from_this());
190  this->publishers.push_back(publisher);
191 
192  return publisher;
193  }
194 
201  public: void Publish(const std::string &_topic,
202  const google::protobuf::Message &_message)
203  {
204  transport::PublisherPtr pub = this->Advertise(_topic,
205  _message.GetTypeName());
206  pub->WaitForConnection();
207 
208  pub->Publish(_message, true);
209  }
210 
218  public: transport::PublisherPtr Advertise(const std::string &_topic,
219  const std::string &_msgTypeName,
220  unsigned int _queueLimit = 1000,
221  double _hzRate = 0)
222  {
223  std::string decodedTopic = this->DecodeTopicName(_topic);
224  PublisherPtr publisher =
226  decodedTopic, _msgTypeName, _queueLimit, _hzRate);
227 
228  boost::mutex::scoped_lock lock(this->publisherMutex);
229  publisher->SetNode(shared_from_this());
230  this->publishers.push_back(publisher);
231 
232  return publisher;
233  }
234 
242  public: template<typename M, typename T>
243  SubscriberPtr Subscribe(const std::string &_topic,
244  void(T::*_fp)(const boost::shared_ptr<M const> &), T *_obj,
245  bool _latching = false)
246  {
247  SubscribeOptions ops;
248  std::string decodedTopic = this->DecodeTopicName(_topic);
249  ops.template Init<M>(decodedTopic, shared_from_this(), _latching);
250 
251  {
252  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
253  this->callbacks[decodedTopic].push_back(CallbackHelperPtr(
254  new CallbackHelperT<M>(boost::bind(_fp, _obj, _1), _latching)));
255  }
256 
257  SubscriberPtr result =
258  transport::TopicManager::Instance()->Subscribe(ops);
259 
260  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
261 
262  return result;
263  }
264 
271  public: template<typename M>
272  SubscriberPtr Subscribe(const std::string &_topic,
273  void(*_fp)(const boost::shared_ptr<M const> &),
274  bool _latching = false)
275  {
276  SubscribeOptions ops;
277  std::string decodedTopic = this->DecodeTopicName(_topic);
278  ops.template Init<M>(decodedTopic, shared_from_this(), _latching);
279 
280  {
281  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
282  this->callbacks[decodedTopic].push_back(
283  CallbackHelperPtr(new CallbackHelperT<M>(_fp, _latching)));
284  }
285 
286  SubscriberPtr result =
287  transport::TopicManager::Instance()->Subscribe(ops);
288 
289  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
290 
291  return result;
292  }
293 
301  template<typename T>
302  SubscriberPtr Subscribe(const std::string &_topic,
303  void(T::*_fp)(const std::string &), T *_obj,
304  bool _latching = false)
305  {
306  SubscribeOptions ops;
307  std::string decodedTopic = this->DecodeTopicName(_topic);
308  ops.Init(decodedTopic, shared_from_this(), _latching);
309 
310  {
311  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
312  this->callbacks[decodedTopic].push_back(CallbackHelperPtr(
313  new RawCallbackHelper(boost::bind(_fp, _obj, _1))));
314  }
315 
316  SubscriberPtr result =
317  transport::TopicManager::Instance()->Subscribe(ops);
318 
319  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
320 
321  return result;
322  }
323 
324 
331  SubscriberPtr Subscribe(const std::string &_topic,
332  void(*_fp)(const std::string &), bool _latching = false)
333  {
334  SubscribeOptions ops;
335  std::string decodedTopic = this->DecodeTopicName(_topic);
336  ops.Init(decodedTopic, shared_from_this(), _latching);
337 
338  {
339  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
340  this->callbacks[decodedTopic].push_back(
342  }
343 
344  SubscriberPtr result =
345  transport::TopicManager::Instance()->Subscribe(ops);
346 
347  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
348 
349  return result;
350  }
351 
356  public: bool HandleData(const std::string &_topic,
357  const std::string &_msg);
358 
363  public: bool HandleMessage(const std::string &_topic, MessagePtr _msg);
364 
371  public: void InsertLatchedMsg(const std::string &_topic,
372  const std::string &_msg);
373 
380  public: void InsertLatchedMsg(const std::string &_topic,
381  MessagePtr _msg);
382 
386  public: std::string GetMsgType(const std::string &_topic) const;
387 
393  public: void RemoveCallback(const std::string &_topic, unsigned int _id);
394 
406  private: bool PrivateInit(const std::string &_space,
407  const common::Time &_maxWait,
408  const bool _fallbackToDefault);
409 
410  private: std::string topicNamespace;
411  private: std::vector<PublisherPtr> publishers;
412  private: std::vector<PublisherPtr>::iterator publishersIter;
413  private: static unsigned int idCounter;
414  private: unsigned int id;
415 
416  private: typedef std::list<CallbackHelperPtr> Callback_L;
417  private: typedef std::map<std::string, Callback_L> Callback_M;
418  private: Callback_M callbacks;
419  private: std::map<std::string, std::list<std::string> > incomingMsgs;
420 
422  private: std::map<std::string, std::list<MessagePtr> > incomingMsgsLocal;
423 
424  private: boost::mutex publisherMutex;
425  private: boost::mutex publisherDeleteMutex;
426  private: boost::recursive_mutex incomingMutex;
427 
430  private: boost::recursive_mutex processIncomingMutex;
431 
432  private: bool initialized;
433  };
435  }
436 }
437 #endif
#define NULL
Definition: CommonTypes.hh:31
transport
Definition: ConnectionManager.hh:35
Forward declarations for transport.
static TopicManager * Instance()
Get an instance of the singleton.
Definition: SingletonT.hh:36
A Time class, can be used to hold wall- or sim-time.
Definition: Time.hh:48
Callback helper Template.
Definition: CallbackHelper.hh:112
A node can advertise and subscribe topics, publish on advertised topics and listen to subscribed topi...
Definition: Node.hh:82
void Fini()
Finalize the node.
SubscriberPtr Subscribe(const std::string &_topic, void(*_fp)(const boost::shared_ptr< M const > &), bool _latching=false)
Subscribe to a topic using a bare function as the callback.
Definition: Node.hh:272
std::string GetTopicNamespace() const
Get the topic namespace for this node.
void InsertLatchedMsg(const std::string &_topic, MessagePtr _msg)
Add a latched message to the node for publication.
SubscriberPtr Subscribe(const std::string &_topic, void(T::*_fp)(const std::string &), T *_obj, bool _latching=false)
Subscribe to a topic using a class method as the callback.
Definition: Node.hh:302
transport::PublisherPtr Advertise(const std::string &_topic, const std::string &_msgTypeName, unsigned int _queueLimit=1000, double _hzRate=0)
Advertise a topic.
Definition: Node.hh:218
bool IsInitialized() const
Check if this Node has been initialized.
unsigned int GetId() const
Get the unique ID of the node.
bool HandleMessage(const std::string &_topic, MessagePtr _msg)
Handle incoming msg.
transport::PublisherPtr Advertise(const std::string &_topic, unsigned int _queueLimit=1000, double _hzRate=0)
Advertise a topic.
Definition: Node.hh:179
void ProcessIncoming()
Process incoming messages.
void Publish(const std::string &_topic, const google::protobuf::Message &_message)
A convenience function for a one-time publication of a message.
Definition: Node.hh:201
std::string DecodeTopicName(const std::string &_topic)
Decode a topic name.
bool HasLatchedSubscriber(const std::string &_topic) const
Return true if a subscriber on a specific topic is latched.
void Init(const std::string &_space="")
Init the node.
void InsertLatchedMsg(const std::string &_topic, const std::string &_msg)
Add a latched message to the node for publication.
std::string GetMsgType(const std::string &_topic) const
Get the message type for a topic.
SubscriberPtr Subscribe(const std::string &_topic, void(T::*_fp)(const boost::shared_ptr< M const > &), T *_obj, bool _latching=false)
Subscribe to a topic using a class method as the callback.
Definition: Node.hh:243
virtual ~Node()
Destructor.
bool HandleData(const std::string &_topic, const std::string &_msg)
Handle incoming data.
void RemoveCallback(const std::string &_topic, unsigned int _id)
void ProcessPublishers()
Process all publishers, which has each publisher send it's most recent message over the wire.
void Publish(const std::string &_topic, const google::protobuf::Message &_message)
A convenience function for a one-time publication of a message.
Definition: Node.hh:160
bool TryInit(const common::Time &_maxWait=common::Time(1, 0))
Try to initialize the node to use the global namespace, and specify the maximum wait time.
std::string EncodeTopicName(const std::string &_topic)
Encode a topic name.
SubscriberPtr Subscribe(const std::string &_topic, void(*_fp)(const std::string &), bool _latching=false)
Subscribe to a topic using a bare function as the callback.
Definition: Node.hh:331
Used to connect publishers to subscribers, where the subscriber wants the raw data from the publisher...
Definition: CallbackHelper.hh:178
Options for a subscription.
Definition: SubscribeOptions.hh:36
void Init(const std::string &_topic, NodePtr _node, bool _latching)
Initialize the options.
Definition: SubscribeOptions.hh:48
boost::shared_ptr< CallbackHelper > CallbackHelperPtr
boost shared pointer to transport::CallbackHelper
Definition: CallbackHelper.hh:105
boost::shared_ptr< Subscriber > SubscriberPtr
Definition: TransportTypes.hh:53
boost::shared_ptr< google::protobuf::Message > MessagePtr
Definition: TransportTypes.hh:45
boost::shared_ptr< Publisher > PublisherPtr
Definition: TransportTypes.hh:49
Forward declarations for the common classes.
Definition: Animation.hh:27