36 #include <protobuf_comm/crypto.h>
37 #include <protobuf_comm/peer.h>
39 #include <boost/lexical_cast.hpp>
42 using namespace boost::asio;
43 using namespace boost::system;
45 namespace protobuf_comm {
// Constructor overload taking an address and a single port.
// The resolver and the retry timer are attached to io_service_, and the UDP
// socket is constructed with an IPv4 endpoint on `port`.
// NOTE(review): the rest of the body (e.g. how `address` is consumed) is not
// visible in this excerpt.
58 ProtobufBroadcastPeer::ProtobufBroadcastPeer(
const std::string address,
unsigned short port)
60 resolver_(io_service_),
61 socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), port)),
62 resolve_retry_timer_(io_service_)
// This overload flags the message register as owned by the peer.
65 own_message_register_ =
true;
// Fragment of a constructor overload with separate send and receive ports
// (its opening line is not part of this excerpt).
// The socket is constructed with an IPv4 endpoint on recv_on_port, while
// send_to_port is forwarded to ctor().
78 unsigned short send_to_port,
79 unsigned short recv_on_port)
81 resolver_(io_service_),
82 socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), recv_on_port)),
83 resolve_retry_timer_(io_service_)
// The message register is flagged as owned by the peer.
86 own_message_register_ =
true;
// Common initialisation; the crypto/header arguments are not passed here.
87 ctor(address, send_to_port);
// Fragment of a constructor overload that additionally takes `proto_path`
// (its opening lines are not part of this excerpt).
// NOTE(review): how proto_path is consumed is not visible here.
97 std::vector<std::string> &proto_path)
99 resolver_(io_service_),
// A single `port` is used for the socket's IPv4 endpoint.
100 socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), port)),
101 resolve_retry_timer_(io_service_)
// The message register is flagged as owned by the peer.
104 own_message_register_ =
true;
// Fragment of a constructor overload taking separate send/receive ports and
// `proto_path` (its opening line is not part of this excerpt).
// NOTE(review): how proto_path is consumed is not visible here.
118 unsigned short send_to_port,
119 unsigned short recv_on_port,
120 std::vector<std::string> &proto_path)
122 resolver_(io_service_),
// The socket uses recv_on_port for its IPv4 endpoint.
123 socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), recv_on_port)),
124 resolve_retry_timer_(io_service_)
// The message register is flagged as owned by the peer.
127 own_message_register_ =
true;
// Common initialisation with the destination port.
128 ctor(address, send_to_port);
// Member-initialiser fragment of a constructor overload that receives a
// message register `mr` (its signature is not part of this excerpt).
140 resolver_(io_service_),
141 socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), port)),
142 resolve_retry_timer_(io_service_),
// The supplied register is stored and flagged as NOT owned; elsewhere in
// this file message_register_ is deleted only when own_message_register_
// is set.
143 message_register_(mr),
144 own_message_register_(false)
// Fragment of a constructor overload with separate send/receive ports plus
// a crypto key and cipher name (its opening line is not part of this
// excerpt).
157 unsigned short send_to_port,
158 unsigned short recv_on_port,
159 const std::string crypto_key,
160 const std::string cipher)
162 resolver_(io_service_),
// The socket uses recv_on_port for its IPv4 endpoint.
163 socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), recv_on_port)),
164 resolve_retry_timer_(io_service_)
// Common initialisation, forwarding the key and cipher.
166 ctor(address, send_to_port, crypto_key, cipher);
// The ownership flag is assigned after ctor() has been called.
168 own_message_register_ =
true;
// Fragment of a constructor overload with separate send/receive ports, an
// externally supplied message register `mr`, and crypto key/cipher (the
// opening line and the `mr` parameter declaration are not part of this
// excerpt).
180 unsigned short send_to_port,
181 unsigned short recv_on_port,
183 const std::string crypto_key,
184 const std::string cipher)
186 resolver_(io_service_),
187 socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), recv_on_port)),
188 resolve_retry_timer_(io_service_),
// The supplied register is stored and flagged as not owned by the peer.
189 message_register_(mr),
190 own_message_register_(false)
// Common initialisation, forwarding the key and cipher.
192 ctor(address, send_to_port, crypto_key, cipher);
// Fragment of a constructor overload using one `port` for both the socket
// endpoint and the ctor() call, plus crypto key/cipher (its opening lines
// are not part of this excerpt).
203 const std::string crypto_key,
204 const std::string cipher)
206 resolver_(io_service_),
207 socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), port)),
208 resolve_retry_timer_(io_service_)
// Common initialisation, forwarding the key and cipher.
210 ctor(address, port, crypto_key, cipher);
// The ownership flag is assigned after ctor() has been called.
212 own_message_register_ =
true;
// Fragment of a constructor overload using one `port`, an externally
// supplied message register `mr`, and crypto key/cipher (its opening lines
// are not part of this excerpt).
225 const std::string crypto_key,
226 const std::string cipher)
228 resolver_(io_service_),
229 socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), port)),
230 resolve_retry_timer_(io_service_),
// The supplied register is stored and flagged as not owned by the peer.
231 message_register_(mr),
232 own_message_register_(false)
// Common initialisation, forwarding the key and cipher.
234 ctor(address, port, crypto_key, cipher);
// Fragment of a constructor overload that lets the caller choose the frame
// header version (the opening line and the `mr` parameter declaration are
// not part of this excerpt).
248 unsigned short send_to_port,
249 unsigned short recv_on_port,
251 frame_header_version_t header_version)
253 resolver_(io_service_),
254 socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), recv_on_port)),
255 resolve_retry_timer_(io_service_),
// The supplied register is stored and flagged as not owned by the peer.
256 message_register_(mr),
257 own_message_register_(false)
// Common initialisation with an empty key and empty cipher name; ctor()
// only acts on the key when it is non-empty.
259 ctor(address, send_to_port,
"",
"", header_version);
// Common initialisation helper called from the constructors.
//
// Parameters:
//   address        stored as send_to_address_
//   send_to_port   stored as send_to_port_
//   crypto_key     checked for emptiness below (the guarded body is not
//                  visible in this excerpt)
//   cipher         not referenced in the visible lines
//   header_version stored as frame_header_version_
270 ProtobufBroadcastPeer::ctor(
const std::string & address,
271 unsigned int send_to_port,
272 const std::string crypto_key,
273 const std::string cipher,
274 frame_header_version_t header_version)
// Record the frame header version and the destination.
280 frame_header_version_ = header_version;
282 send_to_address_ = address;
283 send_to_port_ = send_to_port;
// Allocate the receive buffer.
// NOTE(review): no check of the malloc result is visible in this excerpt.
286 in_data_ = malloc(in_data_size_);
// Socket options.
// NOTE(review): the constructors build socket_ with an endpoint, so these
// options are applied to a socket that has already been given its local
// endpoint; confirm reuse_address still has the intended effect.
289 socket_.set_option(socket_base::broadcast(
true));
290 socket_.set_option(socket_base::reuse_address(
true));
// Collect the local endpoints (handle_recv runs a binary_search over them).
291 determine_local_endpoints();
// Nothing can be sent until the outbound endpoint is known and idle.
293 outbound_ready_ = outbound_active_ =
false;
// Crypto-related work happens only for a non-empty key (body not visible).
296 if (!crypto_key.empty())
// Start the thread that executes run_asio() on this object.
300 asio_thread_ = std::thread(&ProtobufBroadcastPeer::run_asio,
this);
// Teardown fragment (presumably from the destructor; the enclosing signature
// is not part of this excerpt; TODO confirm).
// Cancel any pending resolve retry.
306 resolve_retry_timer_.cancel();
// Thread handling is guarded by joinable(); the guarded body is not visible.
307 if (asio_thread_.joinable()) {
// The message register is deleted only when this peer owns it.
314 if (own_message_register_) {
315 delete message_register_;
// Crypto setup fragment (the enclosing signature is not part of this
// excerpt; presumably the routine that installs key and cipher; TODO
// confirm).
// Encryption is rejected for V1 frame headers.
334 if (frame_header_version_ == PB_FRAME_V1) {
335 throw std::runtime_error(
"Crypto support only available with V2+ frame header");
// Further setup requires both a key and a cipher name (body not visible).
345 if (key !=
"" && cipher !=
"") {
// The buffer for encrypted input is sized at twice the plain receive buffer.
// NOTE(review): no check of the malloc result is visible in this excerpt.
350 enc_in_data_size_ = 2 * in_data_size_;
351 enc_in_data_ = malloc(enc_in_data_size_);
// Fills local_endpoints_ with one UDP endpoint per IPv4 interface address
// reported by getifaddrs(), each paired with the socket's local port, and
// sorts the container.
// NOTE(review): no freeifaddrs(ifap) call is visible in this excerpt;
// confirm the list returned by getifaddrs() is released.
361 ProtobufBroadcastPeer::determine_local_endpoints()
363 struct ifaddrs *ifap;
364 if (getifaddrs(&ifap) == 0) {
365 for (
struct ifaddrs *iter = ifap; iter != NULL; iter = iter->ifa_next) {
// Entries without an address are tested for here (the action taken is not
// visible in this excerpt).
366 if (iter->ifa_addr == NULL)
// Only IPv4 addresses are considered.
368 if (iter->ifa_addr->sa_family == AF_INET) {
// s_addr is passed through ntohl() before constructing the address_v4.
369 boost::asio::ip::address_v4 addr(
370 ntohl(reinterpret_cast<sockaddr_in *>(iter->ifa_addr)->sin_addr.s_addr));
372 local_endpoints_.push_back(
373 boost::asio::ip::udp::endpoint(addr, socket_.local_endpoint().port()));
// Sorted order matters: handle_recv uses std::binary_search on this range.
378 local_endpoints_.sort();
// Setter fragment (the enclosing signature is not part of this excerpt):
// stores the caller-supplied `filter` value in filter_self_.
387 filter_self_ = filter;
// Thread entry point started from ctor() via asio_thread_.
// NOTE(review): the statements inside and after the loop are not visible in
// this excerpt.
392 ProtobufBroadcastPeer::run_asio()
// For Boost.Asio versions above 100409 the work is wrapped in a loop that
// continues until io_service_ reports stopped().
394 #if BOOST_ASIO_VERSION > 100409
395 while (!io_service_.stopped()) {
400 #if BOOST_ASIO_VERSION > 100409
// Completion handler for the asynchronous resolve issued by start_resolve().
//
// Parameters:
//   err                error code reported by the resolver
//   endpoint_iterator  iterator over the resolved endpoints
// NOTE(review): the condition separating the two paths below is not visible
// in this excerpt; presumably it tests `err`.
406 ProtobufBroadcastPeer::handle_resolve(
const boost::system::error_code &err,
407 ip::udp::resolver::iterator endpoint_iterator)
// Path 1: mark the outbound side ready and store the first resolved
// endpoint, both under outbound_mutex_.
410 std::lock_guard<std::mutex> lock(outbound_mutex_);
411 outbound_ready_ =
true;
412 outbound_endpoint_ = endpoint_iterator->endpoint();
// Path 2: report the failure and schedule retry_resolve() in two seconds.
414 sig_send_error_(
"Resolving endpoint failed, retrying");
415 resolve_retry_timer_.expires_from_now(boost::posix_time::seconds(2));
416 resolve_retry_timer_.async_wait(boost::bind(&ProtobufBroadcastPeer::retry_resolve,
this, _1));
// Timer callback registered by handle_resolve() on resolve_retry_timer_.
// `ec` is the error code delivered by the timer wait.
// NOTE(review): the body is not visible in this excerpt.
422 ProtobufBroadcastPeer::retry_resolve(
const boost::system::error_code &ec)
// Starts an asynchronous resolve of the destination; the result is delivered
// to handle_resolve().
429 ProtobufBroadcastPeer::start_resolve()
// The port is converted to text because the query takes it as a string.
431 ip::udp::resolver::query query(send_to_address_, boost::lexical_cast<std::string>(send_to_port_));
432 resolver_.async_resolve(query,
433 boost::bind(&ProtobufBroadcastPeer::handle_resolve,
435 boost::asio::placeholders::error,
436 boost::asio::placeholders::iterator));
// Completion handler for the asynchronous receive issued by start_recv().
//
// Parameters:
//   error       error code reported by the receive operation
//   bytes_rcvd  number of bytes placed in the receive buffer
//
// Visible flow: validate the minimum size, rebuild the frame header, emit the
// raw-data signal, decrypt when encryption is active, then deserialize the
// message and emit sig_rcvd_. Failures are reported through sig_recv_error_.
// NOTE(review): many lines of this function (branch conditions, closing
// braces, some call arguments) are not part of this excerpt.
440 ProtobufBroadcastPeer::handle_recv(
const boost::system::error_code &error,
size_t bytes_rcvd)
// Smallest acceptable datagram: a V1 frame header, or a frame header plus
// message header for later versions.
442 const size_t expected_min_size = (frame_header_version_ == PB_FRAME_V1)
443 ?
sizeof(frame_header_v1_t)
444 : (
sizeof(frame_header_t) +
sizeof(message_header_t));
446 if (!error && bytes_rcvd >= expected_min_size) {
447 frame_header_t frame_header;
// V1: synthesise a frame_header_t from the V1 header found in in_data_;
// the cipher is set to "none".
449 if (frame_header_version_ == PB_FRAME_V1) {
450 frame_header_v1_t *frame_header_v1 = static_cast<frame_header_v1_t *>(in_data_);
451 frame_header.header_version = PB_FRAME_V1;
452 frame_header.cipher = PB_ENCRYPTION_NONE;
453 frame_header.payload_size = frame_header_v1->payload_size;
454 header_size =
sizeof(frame_header_v1_t);
// Otherwise: copy the header from whichever buffer this receive used
// (crypto_buf_ is set from crypto_ in start_recv()).
456 memcpy(&frame_header, crypto_buf_ ? enc_in_data_ : in_data_,
sizeof(frame_header_t));
457 header_size =
sizeof(frame_header_t);
// Raw-data signal, once for each buffer variant.
// NOTE(review): both visible calls skip sizeof(frame_header_t) rather than
// header_size; confirm this is intended for V1 frames.
460 sig_rcvd_raw_(in_endpoint_,
462 (
unsigned char *)enc_in_data_ +
sizeof(frame_header_t),
463 bytes_rcvd -
sizeof(frame_header_t));
465 sig_rcvd_raw_(in_endpoint_,
467 (
unsigned char *)in_data_ +
sizeof(frame_header_t),
468 bytes_rcvd -
sizeof(frame_header_t));
// Decoding work is only done when someone is connected to sig_rcvd_.
471 if (sig_rcvd_.num_slots() > 0) {
// A mismatch between the local encryption state and the frame's cipher
// field is reported as a receive error.
472 if (!crypto_buf_ && (frame_header.cipher != PB_ENCRYPTION_NONE)) {
473 sig_recv_error_(in_endpoint_,
"Received encrypted message but encryption is disabled");
474 }
else if (crypto_buf_ && (frame_header.cipher == PB_ENCRYPTION_NONE)) {
475 sig_recv_error_(in_endpoint_,
"Received plain text message but encryption is enabled");
// Encrypted frame with encryption enabled: copy the header into in_data_
// and decrypt the remainder from enc_in_data_ into in_data_ behind it.
477 if (crypto_buf_ && (frame_header.cipher != PB_ENCRYPTION_NONE)) {
480 memcpy(in_data_, enc_in_data_,
sizeof(frame_header_t));
481 size_t to_decrypt = bytes_rcvd -
sizeof(frame_header_t);
483 crypto_dec_->
decrypt(frame_header.cipher,
484 (
unsigned char *)enc_in_data_ +
sizeof(frame_header_t),
486 (
unsigned char *)in_data_ +
sizeof(frame_header_t),
// NOTE(review): presumably bytes_rcvd was reassigned from the decrypt
// result on a line not shown here; verify, since the payload size is
// taken from it before the header size is added back.
488 frame_header.payload_size = htonl(bytes_rcvd);
489 bytes_rcvd +=
sizeof(frame_header_t);
490 }
catch (std::runtime_error &e) {
491 sig_recv_error_(in_endpoint_, std::string(
"Decryption fail: ") + e.what());
// The payload size is stored in network byte order in the header.
499 size_t payload_size = ntohl(frame_header.payload_size);
501 if (sig_rcvd_.num_slots() > 0) {
// The datagram must consist of exactly header plus payload.
502 if (bytes_rcvd == (header_size + payload_size)) {
// Lookup in the sorted local endpoint list.
// NOTE(review): the first operand of this condition and the searched value
// are not visible; presumably this implements the filter_self_ check.
504 || !std::binary_search(local_endpoints_.begin(),
505 local_endpoints_.end(),
508 message_header_t message_header;
// V1 carries component id and message type inside the frame header itself.
510 if (frame_header_version_ == PB_FRAME_V1) {
511 frame_header_v1_t *frame_header_v1 = static_cast<frame_header_v1_t *>(in_data_);
512 message_header.component_id = frame_header_v1->component_id;
513 message_header.msg_type = frame_header_v1->msg_type;
514 data = (
char *)in_data_ +
sizeof(frame_header_v1_t);
// The payload size is grown by the size of a message header.
// NOTE(review): which branch this statement belongs to cannot be determined
// from this excerpt.
516 frame_header.payload_size =
517 htonl(ntohl(frame_header.payload_size) +
sizeof(message_header_t));
// Later versions: the message header follows the frame header in in_data_,
// and the payload follows the message header.
519 message_header_t *msg_header =
520 static_cast<message_header_t *>((
void *)((
char *)in_data_ +
sizeof(frame_header_t)));
521 message_header.component_id = msg_header->component_id;
522 message_header.msg_type = msg_header->msg_type;
523 data = (
char *)in_data_ +
sizeof(frame_header_t) +
sizeof(message_header_t);
// Ids are converted to host byte order for the signal arguments.
526 uint16_t comp_id = ntohs(message_header.component_id);
527 uint16_t msg_type = ntohs(message_header.msg_type);
// Deserialize through the message register and hand the message to the
// subscribers; std::runtime_error is turned into a receive error.
530 std::shared_ptr<google::protobuf::Message> m =
531 message_register_->
deserialize(frame_header, message_header, data);
533 sig_rcvd_(in_endpoint_, comp_id, msg_type, m);
534 }
catch (std::runtime_error &e) {
535 sig_recv_error_(in_endpoint_, std::string(
"Deserialization fail: ") + e.what());
// Reported when bytes_rcvd does not equal header plus payload size.
539 sig_recv_error_(in_endpoint_,
"Invalid number of bytes received");
// Reported when the receive failed or the datagram was below the minimum.
544 sig_recv_error_(in_endpoint_,
"General receiving error or truncated message");
// Completion handler for the asynchronous send issued by start_send().
//
// Parameters:
//   error              error code reported by the send operation
//   bytes_transferred  number of bytes sent
// NOTE(review): the remaining parameter(s) and the branch conditions are not
// visible in this excerpt.
551 ProtobufBroadcastPeer::handle_sent(
const boost::system::error_code &error,
552 size_t bytes_transferred,
// Clear the in-flight flag under outbound_mutex_; start_send() refuses to
// start a new send while outbound_active_ is set.
558 std::lock_guard<std::mutex> lock(outbound_mutex_);
559 outbound_active_ =
false;
// Failure notification for subscribers of sig_send_error_.
563 sig_send_error_(
"Sending message failed");
// Fragment of a send routine (its signature is not part of this excerpt):
// serialize a message through the message register and queue it for sending.
578 message_register_->
serialize(component_id,
// Size guard; the condition itself is not visible in this excerpt.
586 throw std::runtime_error(
"Serialized message too big");
// V1 frames: the second buffer slot is left empty.
589 if (frame_header_version_ == PB_FRAME_V1) {
595 entry->
buffers[1] = boost::asio::const_buffer();
// Enqueue under outbound_mutex_.
603 std::lock_guard<std::mutex> lock(outbound_mutex_);
604 outbound_queue_.push(entry);
// Fragment of a raw-send routine (its signature is not part of this
// excerpt): the caller's bytes are copied into the queue entry and queued.
// `data` is reinterpreted as chars; data_size gives the number of bytes.
623 entry->
serialized_message = std::string(reinterpret_cast<const char *>(data), data_size);
// The second buffer slot is left empty.
626 entry->
buffers[1] = boost::asio::const_buffer();
// Enqueue under outbound_mutex_.
630 std::lock_guard<std::mutex> lock(outbound_mutex_);
631 outbound_queue_.push(entry);
// Fragment of the send overload taking a shared_ptr to the message (its
// opening lines are not part of this excerpt).
644 std::shared_ptr<google::protobuf::Message> m)
// Forwards to the reference-taking overload.
// NOTE(review): `m` is dereferenced with no null check visible here.
646 send(component_id, msg_type, *m);
// Fragment of the send overload that derives component id and message type
// from the message itself (its signature is not part of this excerpt).
// The message's descriptor must contain an enum named "CompType" with values
// "COMP_ID" and "MSG_TYPE"; std::logic_error is thrown otherwise.
666 const google::protobuf::Descriptor * desc = m.GetDescriptor();
667 const google::protobuf::EnumDescriptor *enumdesc = desc->FindEnumTypeByName(
"CompType");
// Thrown when the enum is missing (the check itself is not visible here).
669 throw std::logic_error(
"Message does not have CompType enum");
671 const google::protobuf::EnumValueDescriptor *compdesc = enumdesc->FindValueByName(
"COMP_ID");
672 const google::protobuf::EnumValueDescriptor *msgtdesc = enumdesc->FindValueByName(
"MSG_TYPE");
// NOTE(review): the message below contains the typo "hs" (for "has"). It is
// a runtime string and is left unchanged here.
673 if (!compdesc || !msgtdesc) {
674 throw std::logic_error(
"Message CompType enum hs no COMP_ID or MSG_TYPE value");
// Both numbers must fit into an unsigned 16-bit value.
676 int comp_id = compdesc->number();
677 int msg_type = msgtdesc->number();
678 if (comp_id < 0 || comp_id > std::numeric_limits<uint16_t>::max()) {
679 throw std::logic_error(
"Message has invalid COMP_ID");
681 if (msg_type < 0 || msg_type > std::numeric_limits<uint16_t>::max()) {
682 throw std::logic_error(
"Message has invalid MSG_TYPE");
// Forward to the overload taking explicit ids.
685 send(comp_id, msg_type, m);
// Issues the next asynchronous receive; completion is delivered to
// handle_recv().
689 ProtobufBroadcastPeer::start_recv()
// Record the current value of crypto_ in crypto_buf_; handle_recv() uses
// crypto_buf_ to decide which buffer holds the received datagram.
691 crypto_buf_ = crypto_;
// Receive into enc_in_data_ when crypto_ is set, otherwise into in_data_.
// The buffer length passed is in_data_size_ in both cases.
692 socket_.async_receive_from(boost::asio::buffer(crypto_ ? enc_in_data_ : in_data_, in_data_size_),
694 boost::bind(&ProtobufBroadcastPeer::handle_recv,
696 boost::asio::placeholders::error,
697 boost::asio::placeholders::bytes_transferred));
701 ProtobufBroadcastPeer::start_send()
703 std::lock_guard<std::mutex> lock(outbound_mutex_);
704 if (outbound_queue_.empty() || outbound_active_ || !outbound_ready_)
707 outbound_active_ =
true;
709 QueueEntry *entry = outbound_queue_.front();
710 outbound_queue_.pop();
714 boost::asio::buffer_size(entry->buffers[1]) + boost::asio::buffer_size(entry->buffers[2]);
717 std::string plain_buf = std::string(plain_size,
'\0');
720 boost::asio::buffer_size(entry->buffers[1]),
721 boost::asio::buffer_cast<const char *>(entry->buffers[1]),
722 boost::asio::buffer_size(entry->buffers[1]));
724 plain_buf.replace(boost::asio::buffer_size(entry->buffers[1]),
725 boost::asio::buffer_size(entry->buffers[2]),
726 boost::asio::buffer_cast<const char *>(entry->buffers[2]),
727 boost::asio::buffer_size(entry->buffers[2]));
729 entry->encrypted_message.resize(enc_size);
730 crypto_enc_->
encrypt(plain_buf, entry->encrypted_message);
732 entry->frame_header.payload_size = htonl(entry->encrypted_message.size());
733 entry->frame_header.cipher = crypto_enc_->
cipher_id();
734 entry->buffers[1] = boost::asio::buffer(entry->encrypted_message);
735 entry->buffers[2] = boost::asio::const_buffer();
738 socket_.async_send_to(entry->buffers,
740 boost::bind(&ProtobufBroadcastPeer::handle_sent,
742 boost::asio::placeholders::error,
743 boost::asio::placeholders::bytes_transferred,