Fawkes API  Fawkes Development Version
peer.cpp
1 /***************************************************************************
2  * peer.cpp - Protobuf stream protocol - broadcast peer
3  *
4  * Created: Mon Feb 04 17:19:17 2013
5  * Copyright 2013 Tim Niemueller [www.niemueller.de]
6  ****************************************************************************/
7 
8 /* Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * - Redistributions of source code must retain the above copyright
13  * notice, this list of conditions and the following disclaimer.
14  * - Redistributions in binary form must reproduce the above copyright
15  * notice, this list of conditions and the following disclaimer in
16  * the documentation and/or other materials provided with the
17  * distribution.
18  * - Neither the name of the authors nor the names of its contributors
19  * may be used to endorse or promote products derived from this
20  * software without specific prior written permission.
21  *
22  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
26  * COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
27  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
28  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
29  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
30  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
31  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
33  * OF THE POSSIBILITY OF SUCH DAMAGE.
34  */
35 
#include <protobuf_comm/crypto.h>
#include <protobuf_comm/peer.h>

#include <boost/lexical_cast.hpp>
#include <ifaddrs.h>

#include <memory>
#include <new>
41 
42 using namespace boost::asio;
43 using namespace boost::system;
44 
45 namespace protobuf_comm {
46 
/** @class ProtobufBroadcastPeer <protobuf_comm/peer.h>
 * Communicate by broadcasting protobuf messages.
 * This class allows communicating via UDP by broadcasting messages to the
 * network.
 * @author Tim Niemueller
 */
53 
54 /** Constructor.
55  * @param address IPv4 broadcast address to send to
56  * @param port IPv4 UDP port to listen on and to send to
57  */
58 ProtobufBroadcastPeer::ProtobufBroadcastPeer(const std::string address, unsigned short port)
59 : io_service_(),
60  resolver_(io_service_),
61  socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), port)),
62  resolve_retry_timer_(io_service_)
63 {
64  message_register_ = new MessageRegister();
65  own_message_register_ = true;
66  ctor(address, port);
67 }
68 
69 /** Testing constructor.
70  * This constructor listens and sends to different ports. It can be used to
71  * send and receive on the same host or even from within the same process.
72  * It is most useful for communication tests.
73  * @param address IPv4 address to send to
74  * @param send_to_port IPv4 UDP port to send data to
75  * @param recv_on_port IPv4 UDP port to receive data on
76  */
77 ProtobufBroadcastPeer::ProtobufBroadcastPeer(const std::string address,
78  unsigned short send_to_port,
79  unsigned short recv_on_port)
80 : io_service_(),
81  resolver_(io_service_),
82  socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), recv_on_port)),
83  resolve_retry_timer_(io_service_)
84 {
85  message_register_ = new MessageRegister();
86  own_message_register_ = true;
87  ctor(address, send_to_port);
88 }
89 
90 /** Constructor.
91  * @param address IPv4 broadcast address to send to
92  * @param port IPv4 UDP port to listen on and to send to
93  * @param proto_path list of file system paths where to look for proto files
94  */
95 ProtobufBroadcastPeer::ProtobufBroadcastPeer(const std::string address,
96  unsigned short port,
97  std::vector<std::string> &proto_path)
98 : io_service_(),
99  resolver_(io_service_),
100  socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), port)),
101  resolve_retry_timer_(io_service_)
102 {
103  message_register_ = new MessageRegister(proto_path);
104  own_message_register_ = true;
105  ctor(address, port);
106 }
107 
108 /** Testing constructor.
109  * This constructor listens and sends to different ports. It can be used to
110  * send and receive on the same host or even from within the same process.
111  * It is most useful for communication tests.
112  * @param address IPv4 address to send to
113  * @param send_to_port IPv4 UDP port to send data to
114  * @param recv_on_port IPv4 UDP port to receive data on
115  * @param proto_path list of file system paths where to look for proto files
116  */
117 ProtobufBroadcastPeer::ProtobufBroadcastPeer(const std::string address,
118  unsigned short send_to_port,
119  unsigned short recv_on_port,
120  std::vector<std::string> &proto_path)
121 : io_service_(),
122  resolver_(io_service_),
123  socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), recv_on_port)),
124  resolve_retry_timer_(io_service_)
125 {
126  message_register_ = new MessageRegister(proto_path);
127  own_message_register_ = true;
128  ctor(address, send_to_port);
129 }
130 
131 /** Constructor.
132  * @param address IPv4 broadcast address to send to
133  * @param port IPv4 UDP port to listen on and to send to
134  * @param mr message register to query for message types
135  */
136 ProtobufBroadcastPeer::ProtobufBroadcastPeer(const std::string address,
137  unsigned short port,
138  MessageRegister * mr)
139 : io_service_(),
140  resolver_(io_service_),
141  socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), port)),
142  resolve_retry_timer_(io_service_),
143  message_register_(mr),
144  own_message_register_(false)
145 {
146  ctor(address, port);
147 }
148 
149 /** Constructor with encryption.
150  * @param address IPv4 broadcast address to send to
151  * @param send_to_port IPv4 UDP port to send data to
152  * @param recv_on_port IPv4 UDP port to receive data on
153  * @param crypto_key encryption key for messages
154  * @param cipher cipher to use for encryption
155  */
156 ProtobufBroadcastPeer::ProtobufBroadcastPeer(const std::string address,
157  unsigned short send_to_port,
158  unsigned short recv_on_port,
159  const std::string crypto_key,
160  const std::string cipher)
161 : io_service_(),
162  resolver_(io_service_),
163  socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), recv_on_port)),
164  resolve_retry_timer_(io_service_)
165 {
166  ctor(address, send_to_port, crypto_key, cipher);
167  message_register_ = new MessageRegister();
168  own_message_register_ = true;
169 }
170 
171 /** Constructor with encryption.
172  * @param address IPv4 broadcast address to send to
173  * @param send_to_port IPv4 UDP port to send data to
174  * @param recv_on_port IPv4 UDP port to receive data on
175  * @param mr message register to query for message types
176  * @param crypto_key encryption key for messages
177  * @param cipher cipher to use for encryption
178  */
179 ProtobufBroadcastPeer::ProtobufBroadcastPeer(const std::string address,
180  unsigned short send_to_port,
181  unsigned short recv_on_port,
182  MessageRegister * mr,
183  const std::string crypto_key,
184  const std::string cipher)
185 : io_service_(),
186  resolver_(io_service_),
187  socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), recv_on_port)),
188  resolve_retry_timer_(io_service_),
189  message_register_(mr),
190  own_message_register_(false)
191 {
192  ctor(address, send_to_port, crypto_key, cipher);
193 }
194 
195 /** Constructor with encryption.
196  * @param address IPv4 broadcast address to send to
197  * @param port IPv4 UDP port to listen on and to send to
198  * @param crypto_key encryption key for messages
199  * @param cipher cipher to use for encryption
200  */
201 ProtobufBroadcastPeer::ProtobufBroadcastPeer(const std::string address,
202  unsigned short port,
203  const std::string crypto_key,
204  const std::string cipher)
205 : io_service_(),
206  resolver_(io_service_),
207  socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), port)),
208  resolve_retry_timer_(io_service_)
209 {
210  ctor(address, port, crypto_key, cipher);
211  message_register_ = new MessageRegister();
212  own_message_register_ = true;
213 }
214 
215 /** Constructor with encryption.
216  * @param address IPv4 broadcast address to send to
217  * @param port IPv4 UDP port to listen on and to send to
218  * @param mr message register to query for message types
219  * @param crypto_key encryption key for messages
220  * @param cipher cipher to use for encryption
221  */
222 ProtobufBroadcastPeer::ProtobufBroadcastPeer(const std::string address,
223  unsigned short port,
224  MessageRegister * mr,
225  const std::string crypto_key,
226  const std::string cipher)
227 : io_service_(),
228  resolver_(io_service_),
229  socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), port)),
230  resolve_retry_timer_(io_service_),
231  message_register_(mr),
232  own_message_register_(false)
233 {
234  ctor(address, port, crypto_key, cipher);
235 }
236 
237 /** Testing constructor.
238  * This constructor listens and sends to different ports. It can be used to
239  * send and receive on the same host or even from within the same process.
240  * It is most useful for communication tests.
241  * @param address IPv4 address to send to
242  * @param send_to_port IPv4 UDP port to send data to
243  * @param recv_on_port IPv4 UDP port to receive data on
244  * @param mr message register to query for message types
245  * @param header_version which frame header version to send, use with caution
246  */
247 ProtobufBroadcastPeer::ProtobufBroadcastPeer(const std::string address,
248  unsigned short send_to_port,
249  unsigned short recv_on_port,
250  MessageRegister * mr,
251  frame_header_version_t header_version)
252 : io_service_(),
253  resolver_(io_service_),
254  socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), recv_on_port)),
255  resolve_retry_timer_(io_service_),
256  message_register_(mr),
257  own_message_register_(false)
258 {
259  ctor(address, send_to_port, "", "", header_version);
260 }
261 
262 /** Constructor helper.
263  * @param address hostname/address to send to
264  * @param send_to_port UDP port to send messages to
265  * @param crypto_key encryption key for messages
266  * @param cipher cipher to use for encryption
267  * @þaram header_version which frame header version to send, use with caution
268  */
269 void
270 ProtobufBroadcastPeer::ctor(const std::string & address,
271  unsigned int send_to_port,
272  const std::string crypto_key,
273  const std::string cipher,
274  frame_header_version_t header_version)
275 {
276  filter_self_ = true;
277  crypto_ = false;
278  crypto_enc_ = NULL;
279  crypto_dec_ = NULL;
280  frame_header_version_ = header_version;
281 
282  send_to_address_ = address;
283  send_to_port_ = send_to_port;
284 
285  in_data_size_ = max_packet_length;
286  in_data_ = malloc(in_data_size_);
287  enc_in_data_ = NULL;
288 
289  socket_.set_option(socket_base::broadcast(true));
290  socket_.set_option(socket_base::reuse_address(true));
291  determine_local_endpoints();
292 
293  outbound_ready_ = outbound_active_ = false;
294  start_resolve();
295 
296  if (!crypto_key.empty())
297  setup_crypto(crypto_key, cipher);
298 
299  start_recv();
300  asio_thread_ = std::thread(&ProtobufBroadcastPeer::run_asio, this);
301 }
302 
303 /** Destructor. */
305 {
306  resolve_retry_timer_.cancel();
307  if (asio_thread_.joinable()) {
308  io_service_.stop();
309  asio_thread_.join();
310  }
311  free(in_data_);
312  if (enc_in_data_)
313  free(enc_in_data_);
314  if (own_message_register_) {
315  delete message_register_;
316  }
317 
318  delete crypto_enc_;
319  delete crypto_dec_;
320 }
321 
322 /** Setup encryption.
323  * After this call communication will be encrypted. Note that the first
324  * received message might be considered invalid because we are still
325  * listening for plain text messages. To avoid this use the constructor
326  * which takes the encryption key as parameter.
327  * @param key encryption key
328  * @param cipher cipher to use for encryption
329  * @see BufferEncryptor for supported ciphers
330  */
331 void
332 ProtobufBroadcastPeer::setup_crypto(const std::string &key, const std::string &cipher)
333 {
334  if (frame_header_version_ == PB_FRAME_V1) {
335  throw std::runtime_error("Crypto support only available with V2+ frame header");
336  }
337 
338  delete crypto_enc_;
339  delete crypto_dec_;
340  crypto_enc_ = NULL;
341  crypto_dec_ = NULL;
342  crypto_ = false;
343  crypto_buf_ = false;
344 
345  if (key != "" && cipher != "") {
346  crypto_enc_ = new BufferEncryptor(key, cipher);
347 
348  if (!enc_in_data_) {
349  // this depends on the cipher, but nothing is two times the incoming buffer...
350  enc_in_data_size_ = 2 * in_data_size_;
351  enc_in_data_ = malloc(enc_in_data_size_);
352  }
353 
354  crypto_dec_ = new BufferDecryptor(key);
355  crypto_ = true;
356  crypto_buf_ = false;
357  }
358 }
359 
360 void
361 ProtobufBroadcastPeer::determine_local_endpoints()
362 {
363  struct ifaddrs *ifap;
364  if (getifaddrs(&ifap) == 0) {
365  for (struct ifaddrs *iter = ifap; iter != NULL; iter = iter->ifa_next) {
366  if (iter->ifa_addr == NULL)
367  continue;
368  if (iter->ifa_addr->sa_family == AF_INET) {
369  boost::asio::ip::address_v4 addr(
370  ntohl(reinterpret_cast<sockaddr_in *>(iter->ifa_addr)->sin_addr.s_addr));
371 
372  local_endpoints_.push_back(
373  boost::asio::ip::udp::endpoint(addr, socket_.local_endpoint().port()));
374  }
375  }
376  freeifaddrs(ifap);
377  }
378  local_endpoints_.sort();
379 }
380 
381 /** Set if to filter out own messages.
382  * @param filter true to filter out own messages, false to receive them
383  */
384 void
386 {
387  filter_self_ = filter;
388 }
389 
390 /** ASIO thread runnable. */
391 void
392 ProtobufBroadcastPeer::run_asio()
393 {
394 #if BOOST_ASIO_VERSION > 100409
395  while (!io_service_.stopped()) {
396 #endif
397  usleep(0);
398  io_service_.reset();
399  io_service_.run();
400 #if BOOST_ASIO_VERSION > 100409
401  }
402 #endif
403 }
404 
405 void
406 ProtobufBroadcastPeer::handle_resolve(const boost::system::error_code &err,
407  ip::udp::resolver::iterator endpoint_iterator)
408 {
409  if (!err) {
410  std::lock_guard<std::mutex> lock(outbound_mutex_);
411  outbound_ready_ = true;
412  outbound_endpoint_ = endpoint_iterator->endpoint();
413  } else {
414  sig_send_error_("Resolving endpoint failed, retrying");
415  resolve_retry_timer_.expires_from_now(boost::posix_time::seconds(2));
416  resolve_retry_timer_.async_wait(boost::bind(&ProtobufBroadcastPeer::retry_resolve, this, _1));
417  }
418  start_send();
419 }
420 
421 void
422 ProtobufBroadcastPeer::retry_resolve(const boost::system::error_code &ec)
423 {
424  if (!ec)
425  start_resolve();
426 }
427 
428 void
429 ProtobufBroadcastPeer::start_resolve()
430 {
431  ip::udp::resolver::query query(send_to_address_, boost::lexical_cast<std::string>(send_to_port_));
432  resolver_.async_resolve(query,
433  boost::bind(&ProtobufBroadcastPeer::handle_resolve,
434  this,
435  boost::asio::placeholders::error,
436  boost::asio::placeholders::iterator));
437 }
438 
439 void
440 ProtobufBroadcastPeer::handle_recv(const boost::system::error_code &error, size_t bytes_rcvd)
441 {
442  const size_t expected_min_size = (frame_header_version_ == PB_FRAME_V1)
443  ? sizeof(frame_header_v1_t)
444  : (sizeof(frame_header_t) + sizeof(message_header_t));
445 
446  if (!error && bytes_rcvd >= expected_min_size) {
447  frame_header_t frame_header;
448  size_t header_size;
449  if (frame_header_version_ == PB_FRAME_V1) {
450  frame_header_v1_t *frame_header_v1 = static_cast<frame_header_v1_t *>(in_data_);
451  frame_header.header_version = PB_FRAME_V1;
452  frame_header.cipher = PB_ENCRYPTION_NONE;
453  frame_header.payload_size = frame_header_v1->payload_size;
454  header_size = sizeof(frame_header_v1_t);
455  } else {
456  memcpy(&frame_header, crypto_buf_ ? enc_in_data_ : in_data_, sizeof(frame_header_t));
457  header_size = sizeof(frame_header_t);
458 
459  if (crypto_buf_) {
460  sig_rcvd_raw_(in_endpoint_,
461  frame_header,
462  (unsigned char *)enc_in_data_ + sizeof(frame_header_t),
463  bytes_rcvd - sizeof(frame_header_t));
464  } else {
465  sig_rcvd_raw_(in_endpoint_,
466  frame_header,
467  (unsigned char *)in_data_ + sizeof(frame_header_t),
468  bytes_rcvd - sizeof(frame_header_t));
469  }
470 
471  if (sig_rcvd_.num_slots() > 0) {
472  if (!crypto_buf_ && (frame_header.cipher != PB_ENCRYPTION_NONE)) {
473  sig_recv_error_(in_endpoint_, "Received encrypted message but encryption is disabled");
474  } else if (crypto_buf_ && (frame_header.cipher == PB_ENCRYPTION_NONE)) {
475  sig_recv_error_(in_endpoint_, "Received plain text message but encryption is enabled");
476  } else {
477  if (crypto_buf_ && (frame_header.cipher != PB_ENCRYPTION_NONE)) {
478  // we need to decrypt first
479  try {
480  memcpy(in_data_, enc_in_data_, sizeof(frame_header_t));
481  size_t to_decrypt = bytes_rcvd - sizeof(frame_header_t);
482  bytes_rcvd =
483  crypto_dec_->decrypt(frame_header.cipher,
484  (unsigned char *)enc_in_data_ + sizeof(frame_header_t),
485  to_decrypt,
486  (unsigned char *)in_data_ + sizeof(frame_header_t),
487  in_data_size_);
488  frame_header.payload_size = htonl(bytes_rcvd);
489  bytes_rcvd += sizeof(frame_header_t);
490  } catch (std::runtime_error &e) {
491  sig_recv_error_(in_endpoint_, std::string("Decryption fail: ") + e.what());
492  bytes_rcvd = 0;
493  }
494  }
495  }
496  } // else nobody cares about deserialized message
497  }
498 
499  size_t payload_size = ntohl(frame_header.payload_size);
500 
501  if (sig_rcvd_.num_slots() > 0) {
502  if (bytes_rcvd == (header_size + payload_size)) {
503  if (!filter_self_
504  || !std::binary_search(local_endpoints_.begin(),
505  local_endpoints_.end(),
506  in_endpoint_)) {
507  void * data;
508  message_header_t message_header;
509 
510  if (frame_header_version_ == PB_FRAME_V1) {
511  frame_header_v1_t *frame_header_v1 = static_cast<frame_header_v1_t *>(in_data_);
512  message_header.component_id = frame_header_v1->component_id;
513  message_header.msg_type = frame_header_v1->msg_type;
514  data = (char *)in_data_ + sizeof(frame_header_v1_t);
515  // message register expects payload size to include message header
516  frame_header.payload_size =
517  htonl(ntohl(frame_header.payload_size) + sizeof(message_header_t));
518  } else {
519  message_header_t *msg_header =
520  static_cast<message_header_t *>((void *)((char *)in_data_ + sizeof(frame_header_t)));
521  message_header.component_id = msg_header->component_id;
522  message_header.msg_type = msg_header->msg_type;
523  data = (char *)in_data_ + sizeof(frame_header_t) + sizeof(message_header_t);
524  }
525 
526  uint16_t comp_id = ntohs(message_header.component_id);
527  uint16_t msg_type = ntohs(message_header.msg_type);
528 
529  try {
530  std::shared_ptr<google::protobuf::Message> m =
531  message_register_->deserialize(frame_header, message_header, data);
532 
533  sig_rcvd_(in_endpoint_, comp_id, msg_type, m);
534  } catch (std::runtime_error &e) {
535  sig_recv_error_(in_endpoint_, std::string("Deserialization fail: ") + e.what());
536  }
537  }
538  } else {
539  sig_recv_error_(in_endpoint_, "Invalid number of bytes received");
540  }
541  } // else nobody cares (no one registered to signal)
542 
543  } else {
544  sig_recv_error_(in_endpoint_, "General receiving error or truncated message");
545  }
546 
547  start_recv();
548 }
549 
550 void
551 ProtobufBroadcastPeer::handle_sent(const boost::system::error_code &error,
552  size_t bytes_transferred,
553  QueueEntry * entry)
554 {
555  delete entry;
556 
557  {
558  std::lock_guard<std::mutex> lock(outbound_mutex_);
559  outbound_active_ = false;
560  }
561 
562  if (error) {
563  sig_send_error_("Sending message failed");
564  }
565 
566  start_send();
567 }
568 
569 /** Send a message to other peers.
570  * @param component_id ID of the component to address
571  * @param msg_type numeric message type
572  * @param m message to send
573  */
574 void
575 ProtobufBroadcastPeer::send(uint16_t component_id, uint16_t msg_type, google::protobuf::Message &m)
576 {
577  QueueEntry *entry = new QueueEntry();
578  message_register_->serialize(component_id,
579  msg_type,
580  m,
581  entry->frame_header,
582  entry->message_header,
583  entry->serialized_message);
584 
585  if (entry->serialized_message.size() > max_packet_length) {
586  throw std::runtime_error("Serialized message too big");
587  }
588 
589  if (frame_header_version_ == PB_FRAME_V1) {
593 
594  entry->buffers[0] = boost::asio::buffer(&entry->frame_header_v1, sizeof(frame_header_v1_t));
595  entry->buffers[1] = boost::asio::const_buffer();
596  } else {
597  entry->buffers[0] = boost::asio::buffer(&entry->frame_header, sizeof(frame_header_t));
598  entry->buffers[1] = boost::asio::buffer(&entry->message_header, sizeof(message_header_t));
599  }
600  entry->buffers[2] = boost::asio::buffer(entry->serialized_message);
601 
602  {
603  std::lock_guard<std::mutex> lock(outbound_mutex_);
604  outbound_queue_.push(entry);
605  }
606  start_send();
607 }
608 
609 /** Send a raw message.
610  * The message is sent as-is (frame_header appended by message data) over the wire.
611  * @param frame_header frame header to prepend, must be completely and properly
612  * setup.
613  * @param data data buffer, maybe encrypted (if indicated in frame header)
614  * @param data_size size in bytes of @p data
615  */
616 void
618  const void * data,
619  size_t data_size)
620 {
621  QueueEntry *entry = new QueueEntry();
622  entry->frame_header = frame_header;
623  entry->serialized_message = std::string(reinterpret_cast<const char *>(data), data_size);
624 
625  entry->buffers[0] = boost::asio::buffer(&entry->frame_header, sizeof(frame_header_t));
626  entry->buffers[1] = boost::asio::const_buffer();
627  entry->buffers[2] = boost::asio::buffer(entry->serialized_message);
628 
629  {
630  std::lock_guard<std::mutex> lock(outbound_mutex_);
631  outbound_queue_.push(entry);
632  }
633  start_send();
634 }
635 
636 /** Send a message to other peers.
637  * @param component_id ID of the component to address
638  * @param msg_type numeric message type
639  * @param m message to send
640  */
641 void
642 ProtobufBroadcastPeer::send(uint16_t component_id,
643  uint16_t msg_type,
644  std::shared_ptr<google::protobuf::Message> m)
645 {
646  send(component_id, msg_type, *m);
647 }
648 
649 /** Send a message to other peers.
650  * @param m Message to send, the message must have an CompType enum type to
651  * specify component ID and message type.
652  */
653 void
654 ProtobufBroadcastPeer::send(std::shared_ptr<google::protobuf::Message> m)
655 {
656  send(*m);
657 }
658 
659 /** Send a message to other peers.
660  * @param m Message to send, the message must have an CompType enum type to
661  * specify component ID and message type.
662  */
663 void
664 ProtobufBroadcastPeer::send(google::protobuf::Message &m)
665 {
666  const google::protobuf::Descriptor * desc = m.GetDescriptor();
667  const google::protobuf::EnumDescriptor *enumdesc = desc->FindEnumTypeByName("CompType");
668  if (!enumdesc) {
669  throw std::logic_error("Message does not have CompType enum");
670  }
671  const google::protobuf::EnumValueDescriptor *compdesc = enumdesc->FindValueByName("COMP_ID");
672  const google::protobuf::EnumValueDescriptor *msgtdesc = enumdesc->FindValueByName("MSG_TYPE");
673  if (!compdesc || !msgtdesc) {
674  throw std::logic_error("Message CompType enum hs no COMP_ID or MSG_TYPE value");
675  }
676  int comp_id = compdesc->number();
677  int msg_type = msgtdesc->number();
678  if (comp_id < 0 || comp_id > std::numeric_limits<uint16_t>::max()) {
679  throw std::logic_error("Message has invalid COMP_ID");
680  }
681  if (msg_type < 0 || msg_type > std::numeric_limits<uint16_t>::max()) {
682  throw std::logic_error("Message has invalid MSG_TYPE");
683  }
684 
685  send(comp_id, msg_type, m);
686 }
687 
688 void
689 ProtobufBroadcastPeer::start_recv()
690 {
691  crypto_buf_ = crypto_;
692  socket_.async_receive_from(boost::asio::buffer(crypto_ ? enc_in_data_ : in_data_, in_data_size_),
693  in_endpoint_,
694  boost::bind(&ProtobufBroadcastPeer::handle_recv,
695  this,
696  boost::asio::placeholders::error,
697  boost::asio::placeholders::bytes_transferred));
698 }
699 
700 void
701 ProtobufBroadcastPeer::start_send()
702 {
703  std::lock_guard<std::mutex> lock(outbound_mutex_);
704  if (outbound_queue_.empty() || outbound_active_ || !outbound_ready_)
705  return;
706 
707  outbound_active_ = true;
708 
709  QueueEntry *entry = outbound_queue_.front();
710  outbound_queue_.pop();
711 
712  if (crypto_) {
713  size_t plain_size =
714  boost::asio::buffer_size(entry->buffers[1]) + boost::asio::buffer_size(entry->buffers[2]);
715  size_t enc_size = crypto_enc_->encrypted_buffer_size(plain_size);
716 
717  std::string plain_buf = std::string(plain_size, '\0');
718 
719  plain_buf.replace(0,
720  boost::asio::buffer_size(entry->buffers[1]),
721  boost::asio::buffer_cast<const char *>(entry->buffers[1]),
722  boost::asio::buffer_size(entry->buffers[1]));
723 
724  plain_buf.replace(boost::asio::buffer_size(entry->buffers[1]),
725  boost::asio::buffer_size(entry->buffers[2]),
726  boost::asio::buffer_cast<const char *>(entry->buffers[2]),
727  boost::asio::buffer_size(entry->buffers[2]));
728 
729  entry->encrypted_message.resize(enc_size);
730  crypto_enc_->encrypt(plain_buf, entry->encrypted_message);
731 
732  entry->frame_header.payload_size = htonl(entry->encrypted_message.size());
733  entry->frame_header.cipher = crypto_enc_->cipher_id();
734  entry->buffers[1] = boost::asio::buffer(entry->encrypted_message);
735  entry->buffers[2] = boost::asio::const_buffer();
736  }
737 
738  socket_.async_send_to(entry->buffers,
739  outbound_endpoint_,
740  boost::bind(&ProtobufBroadcastPeer::handle_sent,
741  this,
742  boost::asio::placeholders::error,
743  boost::asio::placeholders::bytes_transferred,
744  entry));
745 }
746 
747 } // end namespace protobuf_comm
protobuf_comm::MessageRegister::serialize
void serialize(uint16_t component_id, uint16_t msg_type, const google::protobuf::Message &msg, frame_header_t &frame_header, message_header_t &message_header, std::string &data)
Serialize a message.
Definition: message_register.cpp:274
protobuf_comm::QueueEntry::frame_header_v1
frame_header_v1_t frame_header_v1
Frame header (network byte order), never encrypted.
Definition: queue_entry.h:64
protobuf_comm::frame_header_v1_t::payload_size
uint32_t payload_size
payload size in bytes
Definition: frame_header.h:125
protobuf_comm::QueueEntry::message_header
message_header_t message_header
Message header (network byte order)
Definition: queue_entry.h:65
protobuf_comm::ProtobufBroadcastPeer::~ProtobufBroadcastPeer
~ProtobufBroadcastPeer()
Destructor.
Definition: peer.cpp:303
protobuf_comm::MessageRegister
Definition: message_register.h:64
protobuf_comm::frame_header_v1_t::msg_type
uint16_t msg_type
message type
Definition: frame_header.h:123
protobuf_comm::BufferEncryptor::encrypted_buffer_size
size_t encrypted_buffer_size(size_t plain_length)
Get required size for an encrypted buffer of the given plain text length.
Definition: crypto.cpp:151
protobuf_comm::QueueEntry
Outgoing queue entry.
Definition: queue_entry.h:49
protobuf_comm::ProtobufBroadcastPeer::max_packet_length
maximum packet length in bytes
Definition: peer.h:63
protobuf_comm::ProtobufBroadcastPeer::ProtobufBroadcastPeer
ProtobufBroadcastPeer(const std::string address, unsigned short port)
Constructor.
Definition: peer.cpp:57
protobuf_comm::QueueEntry::buffers
std::array< boost::asio::const_buffer, 3 > buffers
outgoing buffers
Definition: queue_entry.h:66
protobuf_comm::message_header_t
Network message header.
Definition: frame_header.h:100
protobuf_comm::ProtobufBroadcastPeer::send_raw
void send_raw(const frame_header_t &frame_header, const void *data, size_t data_size)
Send a raw message.
Definition: peer.cpp:616
protobuf_comm::frame_header_t
Network framing header.
Definition: frame_header.h:74
protobuf_comm::MessageRegister::deserialize
std::shared_ptr< google::protobuf::Message > deserialize(frame_header_t &frame_header, message_header_t &message_header, void *data)
Deserialize message.
Definition: message_register.cpp:314
protobuf_comm::ProtobufBroadcastPeer::set_filter_self
void set_filter_self(bool filter)
Set if to filter out own messages.
Definition: peer.cpp:384
protobuf_comm::message_header_t::component_id
uint16_t component_id
component id
Definition: frame_header.h:103
protobuf_comm::ProtobufBroadcastPeer::setup_crypto
void setup_crypto(const std::string &key, const std::string &cipher)
Setup encryption.
Definition: peer.cpp:331
protobuf_comm::BufferDecryptor
Definition: crypto.h:81
protobuf_comm::message_header_t::msg_type
uint16_t msg_type
message type
Definition: frame_header.h:105
protobuf_comm::frame_header_v1_t
Old network message framing header.
Definition: frame_header.h:118
protobuf_comm::QueueEntry::frame_header
frame_header_t frame_header
Frame header (network byte order), never encrypted.
Definition: queue_entry.h:63
protobuf_comm::BufferDecryptor::decrypt
size_t decrypt(int cipher, const void *enc, size_t enc_size, void *plain, size_t plain_size)
Decrypt a buffer.
Definition: crypto.cpp:224
protobuf_comm::frame_header_t::payload_size
uint32_t payload_size
payload size in bytes includes message and header, not IV
Definition: frame_header.h:87
protobuf_comm::ProtobufBroadcastPeer::send
void send(uint16_t component_id, uint16_t msg_type, google::protobuf::Message &m)
Send a message to other peers.
Definition: peer.cpp:574
protobuf_comm::BufferEncryptor::encrypt
void encrypt(const std::string &plain, std::string &enc)
Encrypt a buffer.
Definition: crypto.cpp:100
protobuf_comm::QueueEntry::serialized_message
std::string serialized_message
serialized protobuf message
Definition: queue_entry.h:61
protobuf_comm::frame_header_v1_t::component_id
uint16_t component_id
component id
Definition: frame_header.h:121
protobuf_comm::BufferEncryptor::cipher_id
int cipher_id() const
Get cipher ID.
Definition: crypto.h:67
protobuf_comm::BufferEncryptor
Definition: crypto.h:52