Go to the documentation of this file.
18 #ifndef GAZEBO_TRANSPORT_NODE_HH_
19 #define GAZEBO_TRANSPORT_NODE_HH_
22 #include <boost/bind.hpp>
23 #include <boost/enable_shared_from_this.hpp>
39 class GZ_TRANSPORT_VISIBLE PublishTask :
public tbb::task
45 const google::protobuf::Message &_message)
48 this->msg = _message.New();
49 this->msg->CopyFrom(_message);
54 public: tbb::task *execute()
56 this->pub->WaitForConnection();
57 this->pub->Publish(*this->msg,
true);
58 this->pub->SendMessage();
68 private: google::protobuf::Message *msg;
78 class GZ_TRANSPORT_VISIBLE
Node :
79 public boost::enable_shared_from_this<Node>
85 public:
virtual ~
Node();
95 public:
void Init(
const std::string &_space =
"");
108 public:
bool TryInit(
115 public:
bool IsInitialized()
const;
122 public: std::string GetTopicNamespace()
const;
127 public: std::string DecodeTopicName(
const std::string &_topic);
132 public: std::string EncodeTopicName(
const std::string &_topic);
136 public:
unsigned int GetId()
const;
140 public:
void ProcessPublishers();
143 public:
void ProcessIncoming();
148 public:
bool HasLatchedSubscriber(
const std::string &_topic)
const;
157 public:
template<
typename M>
159 const google::protobuf::Message &_message)
162 PublishTask *task =
new(tbb::task::allocate_root())
163 PublishTask(pub, _message);
165 tbb::task::enqueue(*task);
176 public:
template<
typename M>
178 unsigned int _queueLimit = 1000,
181 std::string decodedTopic = this->DecodeTopicName(_topic);
184 decodedTopic, _queueLimit, _hzRate);
186 boost::mutex::scoped_lock lock(this->publisherMutex);
187 publisher->SetNode(shared_from_this());
188 this->publishers.push_back(publisher);
200 public:
template<
typename M,
typename T>
202 void(T::*_fp)(
const boost::shared_ptr<M const> &), T *_obj,
203 bool _latching =
false)
206 std::string decodedTopic = this->DecodeTopicName(_topic);
207 ops.template Init<M>(decodedTopic, shared_from_this(), _latching);
210 boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
218 result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
229 public:
template<
typename M>
231 void(*_fp)(
const boost::shared_ptr<M const> &),
232 bool _latching =
false)
235 std::string decodedTopic = this->DecodeTopicName(_topic);
236 ops.template Init<M>(decodedTopic, shared_from_this(), _latching);
239 boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
240 this->callbacks[decodedTopic].push_back(
247 result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
261 void(T::*_fp)(
const std::string &), T *_obj,
262 bool _latching =
false)
265 std::string decodedTopic = this->DecodeTopicName(_topic);
266 ops.
Init(decodedTopic, shared_from_this(), _latching);
269 boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
277 result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
290 void(*_fp)(
const std::string &),
bool _latching =
false)
293 std::string decodedTopic = this->DecodeTopicName(_topic);
294 ops.
Init(decodedTopic, shared_from_this(), _latching);
297 boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
298 this->callbacks[decodedTopic].push_back(
305 result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
314 public:
bool HandleData(
const std::string &_topic,
315 const std::string &_msg);
321 public:
bool HandleMessage(
const std::string &_topic,
MessagePtr _msg);
329 public:
void InsertLatchedMsg(
const std::string &_topic,
330 const std::string &_msg);
338 public:
void InsertLatchedMsg(
const std::string &_topic,
344 public: std::string GetMsgType(
const std::string &_topic)
const;
351 public:
void RemoveCallback(
const std::string &_topic,
unsigned int _id);
364 private:
bool PrivateInit(
const std::string &_space,
366 const bool _fallbackToDefault);
368 private: std::string topicNamespace;
369 private: std::vector<PublisherPtr> publishers;
370 private: std::vector<PublisherPtr>::iterator publishersIter;
371 private:
static unsigned int idCounter;
372 private:
unsigned int id;
374 private:
typedef std::list<CallbackHelperPtr> Callback_L;
375 private:
typedef std::map<std::string, Callback_L> Callback_M;
376 private: Callback_M callbacks;
377 private: std::map<std::string, std::list<std::string> > incomingMsgs;
380 private: std::map<std::string, std::list<MessagePtr> > incomingMsgsLocal;
382 private: boost::mutex publisherMutex;
383 private: boost::mutex publisherDeleteMutex;
384 private: boost::recursive_mutex incomingMutex;
388 private: boost::recursive_mutex processIncomingMutex;
390 private:
bool initialized;
void ProcessIncoming()
Process incoming messages.
SubscriberPtr Subscribe(const std::string &_topic, void(*_fp)(const boost::shared_ptr< M const > &), bool _latching=false)
Subscribe to a topic using a bare function as the callback.
Definition: Node.hh:230
Options for a subscription.
Definition: SubscribeOptions.hh:35
virtual ~Node()
Destructor.
Forward declarations for the common classes.
Definition: Animation.hh:26
boost::shared_ptr< google::protobuf::Message > MessagePtr
Definition: TransportTypes.hh:45
A Time class that can be used to hold wall time or sim time, stored as seconds and nanoseconds.
Definition: Time.hh:44
#define NULL
Definition: CommonTypes.hh:31
boost::shared_ptr< Publisher > PublisherPtr
Definition: TransportTypes.hh:49
bool HandleMessage(const std::string &_topic, MessagePtr _msg)
Handle incoming msg.
std::string GetMsgType(const std::string &_topic) const
Get the message type for a topic.
Forward declarations for transport.
bool HandleData(const std::string &_topic, const std::string &_msg)
Handle incoming data.
std::string EncodeTopicName(const std::string &_topic)
Encode a topic name.
void InsertLatchedMsg(const std::string &_topic, const std::string &_msg)
Add a latched message to the node for publication.
std::string DecodeTopicName(const std::string &_topic)
Decode a topic name.
boost::shared_ptr< Subscriber > SubscriberPtr
Definition: TransportTypes.hh:53
void Publish(const std::string &_topic, const google::protobuf::Message &_message)
A convenience function for a one-time publication of a message.
Definition: Node.hh:158
SubscriberPtr Subscribe(const std::string &_topic, void(*_fp)(const std::string &), bool _latching=false)
Subscribe to a topic using a bare function as the callback.
Definition: Node.hh:289
A node can advertise and subscribe to topics, publish on advertised topics, and listen to subscribed topics.
Definition: Node.hh:78
unsigned int GetId() const
Get the unique ID of the node.
GAZEBO_VISIBLE void Init(google::protobuf::Message &_message, const std::string &_id="")
Initialize a message.
bool TryInit(const common::Time &_maxWait=common::Time(1, 0))
Try to initialize the node to use the global namespace, and specify the maximum wait time.
Callback helper Template.
Definition: CallbackHelper.hh:111
static TopicManager * Instance()
Get an instance of the singleton.
Definition: SingletonT.hh:36
void RemoveCallback(const std::string &_topic, unsigned int _id)
void Init(const std::string &_space="")
Init the node.
void Init(const std::string &_topic, NodePtr _node, bool _latching)
Initialize the options.
Definition: SubscribeOptions.hh:48
void ProcessPublishers()
Process all publishers, which has each publisher send its most recent message over the wire.
transport::PublisherPtr Advertise(const std::string &_topic, unsigned int _queueLimit=1000, double _hzRate=0)
Advertise a topic.
Definition: Node.hh:177
SubscriberPtr Subscribe(const std::string &_topic, void(T::*_fp)(const std::string &), T *_obj, bool _latching=false)
Subscribe to a topic using a class method as the callback.
Definition: Node.hh:260
boost::shared_ptr< CallbackHelper > CallbackHelperPtr
boost shared pointer to transport::CallbackHelper
Definition: CallbackHelper.hh:105
std::string GetTopicNamespace() const
Get the topic namespace for this node.
bool HasLatchedSubscriber(const std::string &_topic) const
Return true if a subscriber on a specific topic is latched.
SubscriberPtr Subscribe(const std::string &_topic, void(T::*_fp)(const boost::shared_ptr< M const > &), T *_obj, bool _latching=false)
Subscribe to a topic using a class method as the callback.
Definition: Node.hh:201
void Fini()
Finalize the node.
bool IsInitialized() const
Check if this Node has been initialized.
Used to connect publishers to subscribers, where the subscriber wants the raw data from the publisher...
Definition: CallbackHelper.hh:177