23 #include <arpa/inet.h>
24 #include <blackboard/exceptions.h>
25 #include <blackboard/internal/instance_factory.h>
26 #include <blackboard/internal/notifier.h>
27 #include <blackboard/net/ilist_content.h>
28 #include <blackboard/net/interface_proxy.h>
29 #include <blackboard/net/messages.h>
30 #include <blackboard/remote.h>
31 #include <core/threading/mutex.h>
32 #include <core/threading/mutex_locker.h>
33 #include <core/threading/thread.h>
34 #include <core/threading/wait_condition.h>
35 #include <interface/interface_info.h>
36 #include <netcomm/fawkes/client.h>
37 #include <utils/time/time.h>
62 throw Exception(
"Cannot instantiate RemoteBlackBoard on unconnected client");
70 wait_mutex_ =
new Mutex();
73 inbound_thread_ = NULL;
85 fnc_ =
new FawkesNetworkClient(hostname, port);
96 throw Exception(
"Cannot instantiate RemoteBlackBoard on unconnected client");
101 mutex_ =
new Mutex();
102 instance_factory_ =
new BlackBoardInstanceFactory();
104 wait_mutex_ =
new Mutex();
105 wait_cond_ =
new WaitCondition(wait_mutex_);
107 inbound_thread_ = NULL;
116 delete instance_factory_;
118 for (pit_ = proxies_.begin(); pit_ != proxies_.end(); ++pit_) {
138 RemoteBlackBoard::reopen_interfaces()
141 ipit_ = invalid_proxies_.begin();
142 while (ipit_ != invalid_proxies_.end()) {
144 Interface *iface = (*ipit_)->interface();
147 ipit_ = invalid_proxies_.erase(ipit_);
173 RemoteBlackBoard::open_interface(
const char *type,
174 const char *identifier,
180 throw Exception(
"Cannot instantiate remote interface, connection is dead");
186 throw Exception(
"Cannot call open_interface() from inbound handler");
190 bb_iopen_msg_t *om = (bb_iopen_msg_t *)calloc(1,
sizeof(bb_iopen_msg_t));
191 strncpy(om->type, type, INTERFACE_TYPE_SIZE_ - 1);
192 strncpy(om->id, identifier, INTERFACE_ID_SIZE_ - 1);
193 memcpy(om->hash, iface->hash(), INTERFACE_HASH_SIZE_);
195 FawkesNetworkMessage *omsg =
196 new FawkesNetworkMessage(FAWKES_CID_BLACKBOARD,
197 writer ? MSG_BB_OPEN_FOR_WRITING : MSG_BB_OPEN_FOR_READING,
199 sizeof(bb_iopen_msg_t));
205 && (!m_ || ((m_->
msgid() != MSG_BB_OPEN_SUCCESS) && (m_->
msgid() != MSG_BB_OPEN_FAILURE)))) {
215 throw Exception(
"Connection died while trying to open %s::%s", type, identifier);
218 if (m_->
msgid() == MSG_BB_OPEN_SUCCESS) {
220 BlackBoardInterfaceProxy *proxy =
221 new BlackBoardInterfaceProxy(fnc_, m_,
notifier_, iface, writer);
222 proxies_[proxy->serial()] = proxy;
223 }
else if (m_->
msgid() == MSG_BB_OPEN_FAILURE) {
224 bb_iopenfail_msg_t *fm = m_->
msg<bb_iopenfail_msg_t>();
225 unsigned int error = ntohl(fm->error_code);
229 throw BlackBoardWriterActiveException(identifier, type);
231 throw Exception(
"Hash mismatch for interface %s:%s", type, identifier);
233 throw Exception(
"Type %s unknown (%s::%s)", type, type, identifier);
235 throw BlackBoardWriterActiveException(identifier, type);
237 throw Exception(
"Could not open interface");
246 RemoteBlackBoard::open_interface(
const char *type,
247 const char *identifier,
252 throw Exception(
"Cannot instantiate remote interface, connection is dead");
257 open_interface(type, identifier, owner, writer, iface);
258 }
catch (Exception &e) {
269 return open_interface(type, identifier, owner,
false);
275 return open_interface(type, identifier, owner,
true);
278 std::list<Interface *>
280 const char *id_pattern,
283 std::list<Interface *> rv;
286 for (InterfaceInfoList::iterator i = infl->begin(); i != infl->end(); ++i) {
288 char type[INTERFACE_TYPE_SIZE_ + 1];
289 char id[INTERFACE_ID_SIZE_ + 1];
290 type[INTERFACE_TYPE_SIZE_] = 0;
291 id[INTERFACE_TYPE_SIZE_] = 0;
292 strncpy(type, i->type(), INTERFACE_TYPE_SIZE_);
293 strncpy(
id, i->id(), INTERFACE_ID_SIZE_);
295 if ((fnmatch(type_pattern, type, 0) == FNM_NOMATCH)
296 || (fnmatch(id_pattern,
id, 0) == FNM_NOMATCH)) {
305 for (std::list<Interface *>::iterator j = rv.begin(); j != rv.end(); ++j) {
321 if (interface == NULL)
324 unsigned int serial = interface->
serial();
326 if (proxies_.find(serial) != proxies_.end()) {
327 delete proxies_[serial];
328 proxies_.erase(serial);
349 throw Exception(
"Cannot call list_all() from inbound handler");
358 while (!m_ || (m_->
msgid() != MSG_BB_INTERFACE_LIST)) {
367 BlackBoardInterfaceListContent *bbilc = m_->
msgc<BlackBoardInterfaceListContent>();
368 while (bbilc->has_next()) {
370 bb_iinfo_msg_t *ii = bbilc->next(&iisize);
371 bool has_writer = ii->writer_readers & htonl(0x80000000);
372 unsigned int num_readers = ntohl(ii->writer_readers & htonl(0x7FFFFFFF));
379 std::list<std::string>(),
395 throw Exception(
"Cannot call list() from inbound handler");
402 strncpy(om->
type_pattern, type_pattern, INTERFACE_TYPE_SIZE_ - 1);
403 strncpy(om->
id_pattern, id_pattern, INTERFACE_ID_SIZE_ - 1);
410 while (!m_ || (m_->
msgid() != MSG_BB_INTERFACE_LIST)) {
419 BlackBoardInterfaceListContent *bbilc = m_->
msgc<BlackBoardInterfaceListContent>();
420 while (bbilc->has_next()) {
422 bb_iinfo_msg_t *ii = bbilc->next(&iisize);
423 bool has_writer = ii->writer_readers & htonl(0x80000000);
424 unsigned int num_readers = ntohl(ii->writer_readers & htonl(0x7FFFFFFF));
431 std::list<std::string>(),
458 if (m->cid() == FAWKES_CID_BLACKBOARD) {
459 unsigned int msgid = m->msgid();
461 if (msgid == MSG_BB_DATA_CHANGED) {
462 unsigned int serial = ntohl(((
unsigned int *)m->payload())[0]);
463 if (proxies_.find(serial) != proxies_.end()) {
464 proxies_[serial]->process_data_changed(m);
466 }
else if (msgid == MSG_BB_INTERFACE_MESSAGE) {
467 unsigned int serial = ntohl(((
unsigned int *)m->payload())[0]);
468 if (proxies_.find(serial) != proxies_.end()) {
469 proxies_[serial]->process_interface_message(m);
471 }
else if (msgid == MSG_BB_READER_ADDED) {
473 if (proxies_.find(ntohl(esm->
serial)) != proxies_.end()) {
476 }
else if (msgid == MSG_BB_READER_REMOVED) {
478 if (proxies_.find(ntohl(esm->
serial)) != proxies_.end()) {
481 }
else if (msgid == MSG_BB_WRITER_ADDED) {
483 if (proxies_.find(ntohl(esm->
serial)) != proxies_.end()) {
486 }
else if (msgid == MSG_BB_WRITER_REMOVED) {
488 if (proxies_.find(ntohl(esm->
serial)) != proxies_.end()) {
491 }
else if (msgid == MSG_BB_INTERFACE_CREATED) {
493 notifier_->notify_of_interface_created(em->
type, em->
id);
494 }
else if (msgid == MSG_BB_INTERFACE_DESTROYED) {
496 notifier_->notify_of_interface_destroyed(em->
type, em->
id);
501 wait_cond_->wake_all();
502 wait_mutex_->unlock();
510 inbound_thread_ = NULL;
519 for (pit_ = proxies_.begin(); pit_ != proxies_.end(); ++pit_) {
520 pit_->second->interface()->set_validity(
false);
521 invalid_proxies_.push_back(pit_->second);
525 wait_cond_->wake_all();