24 #include <blackboard/blackboard.h>
25 #include <blackboard/interface_listener.h>
26 #include <blackboard/interface_observer.h>
27 #include <blackboard/internal/notifier.h>
28 #include <core/threading/mutex.h>
29 #include <core/threading/mutex_locker.h>
30 #include <core/utils/lock_hashmap.h>
31 #include <core/utils/lock_hashset.h>
32 #include <interface/interface.h>
33 #include <logging/liblogger.h>
54 bbil_writer_events_ = 0;
55 bbil_writer_mutex_ =
new Mutex();
57 bbil_reader_events_ = 0;
58 bbil_reader_mutex_ =
new Mutex();
60 bbil_data_events_ = 0;
61 bbil_data_mutex_ =
new Mutex();
63 bbil_messages_events_ = 0;
64 bbil_messages_mutex_ =
new Mutex();
67 bbio_mutex_ =
new Mutex();
73 delete bbil_writer_mutex_;
74 delete bbil_reader_mutex_;
75 delete bbil_data_mutex_;
76 delete bbil_messages_mutex_;
104 BlackBoardInterfaceListener::InterfaceQueue::const_iterator i = queue.begin();
106 for (i = queue.begin(); i != queue.end(); ++i) {
110 proc_listener_maybe_queue(i->op,
122 proc_listener_maybe_queue(i->op,
125 bbil_messages_mutex_,
126 bbil_messages_events_,
128 bbil_messages_queue_,
134 proc_listener_maybe_queue(i->op,
146 proc_listener_maybe_queue(i->op,
160 listener->bbil_release_queue(flag);
164 BlackBoardNotifier::proc_listener_maybe_queue(
bool op,
165 Interface * interface,
166 BlackBoardInterfaceListener *listener,
168 unsigned int & events,
173 MutexLocker lock(mutex);
177 "listener %s for %s events (queued)",
178 op ?
"Registering" :
"Unregistering",
179 listener->bbil_name(),
182 queue_listener(op, interface, listener, queue);
185 add_listener(interface, listener, map);
187 remove_listener(interface, listener, map);
200 const BlackBoardInterfaceListener::InterfaceMaps maps = listener->bbil_acquire_maps();
202 BlackBoardInterfaceListener::InterfaceMap::const_iterator i;
203 for (i = maps.data.begin(); i != maps.data.end(); ++i) {
204 proc_listener_maybe_queue(
false,
214 for (i = maps.messages.begin(); i != maps.messages.end(); ++i) {
215 proc_listener_maybe_queue(
false,
218 bbil_messages_mutex_,
219 bbil_messages_events_,
221 bbil_messages_queue_,
225 for (i = maps.reader.begin(); i != maps.reader.end(); ++i) {
226 proc_listener_maybe_queue(
false,
236 for (i = maps.writer.begin(); i != maps.writer.end(); ++i) {
237 proc_listener_maybe_queue(
false,
247 listener->bbil_release_maps();
256 BlackBoardNotifier::add_listener(Interface * interface,
257 BlackBoardInterfaceListener *listener,
260 std::pair<BBilMap::iterator, BBilMap::iterator> ret = ilmap.equal_range(interface->uid());
262 BBilMap::value_type v = std::make_pair(interface->uid(), listener);
263 BBilMap::iterator f = std::find(ret.first, ret.second, v);
265 if (f == ret.second) {
266 ilmap.insert(std::make_pair(interface->uid(), listener));
271 BlackBoardNotifier::remove_listener(Interface * interface,
272 BlackBoardInterfaceListener *listener,
275 std::pair<BBilMap::iterator, BBilMap::iterator> ret = ilmap.equal_range(interface->uid());
276 for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
277 if (j->second == listener) {
285 BlackBoardNotifier::is_in_queue(
bool op,
288 BlackBoardInterfaceListener *bbil)
290 BBilQueue::iterator q;
291 for (q = queue.begin(); q != queue.end(); ++q) {
292 if ((q->op == op) && (q->uid == uid) && (q->listener == bbil)) {
300 BlackBoardNotifier::queue_listener(
bool op,
301 Interface * interface,
302 BlackBoardInterfaceListener *listener,
305 BBilQueueEntry qe = {op, interface->uid(), interface, listener};
316 if (bbio_events_ > 0) {
317 bbio_queue_.push_back(std::make_pair(1, observer));
319 add_observer(observer, observer->bbio_get_observed_create(), bbio_created_);
320 add_observer(observer, observer->bbio_get_observed_destroy(), bbio_destroyed_);
326 BlackBoardNotifier::add_observer(BlackBoardInterfaceObserver * observer,
332 for (i = its->begin(); i != its->end(); ++i) {
333 bbiomap[i->first].push_back(make_pair(observer, i->second));
343 BlackBoardNotifier::remove_observer(BBioMap &iomap, BlackBoardInterfaceObserver *observer)
345 BBioMapIterator i, tmp;
348 while (i != iomap.end()) {
349 BBioListIterator j = i->second.begin();
350 while (j != i->second.end()) {
351 if (j->first == observer) {
352 j = i->second.erase(j);
357 if (i->second.empty()) {
375 MutexLocker lock(bbio_mutex_);
376 if (bbio_events_ > 0) {
377 BBioQueueEntry e = std::make_pair((
unsigned int)0, observer);
378 BBioQueue::iterator re;
379 while ((re = find_if(bbio_queue_.begin(),
381 bind2nd(std::not_equal_to<BBioQueueEntry>(), e)))
382 != bbio_queue_.end()) {
384 if (re->second == observer) {
385 bbio_queue_.erase(re);
388 bbio_queue_.push_back(std::make_pair(0, observer));
391 remove_observer(bbio_created_, observer);
392 remove_observer(bbio_destroyed_, observer);
405 bbio_mutex_->unlock();
407 BBioMapIterator lhmi;
408 BBioListIterator i, l;
409 for (lhmi = bbio_created_.begin(); lhmi != bbio_created_.end(); ++lhmi) {
410 if (fnmatch(lhmi->first.c_str(), type, 0) != 0)
413 BBioList &list = lhmi->second;
414 for (i = list.begin(); i != list.end(); ++i) {
416 for (std::list<std::string>::iterator pi = i->second.begin(); pi != i->second.end(); ++pi) {
417 if (fnmatch(pi->c_str(), id, 0) == 0) {
427 process_bbio_queue();
428 bbio_mutex_->unlock();
440 bbio_mutex_->unlock();
442 BBioMapIterator lhmi;
443 BBioListIterator i, l;
444 for (lhmi = bbio_destroyed_.begin(); lhmi != bbio_destroyed_.end(); ++lhmi) {
445 if (fnmatch(lhmi->first.c_str(), type, 0) != 0)
448 BBioList &list = (*lhmi).second;
449 for (i = list.begin(); i != list.end(); ++i) {
451 for (std::list<std::string>::iterator pi = i->second.begin(); pi != i->second.end(); ++pi) {
452 if (fnmatch(pi->c_str(), id, 0) == 0) {
462 process_bbio_queue();
463 bbio_mutex_->unlock();
467 BlackBoardNotifier::process_bbio_queue()
469 if (!bbio_queue_.empty()) {
470 if (bbio_events_ > 0) {
473 while (!bbio_queue_.empty()) {
474 BBioQueueEntry &e = bbio_queue_.front();
476 add_observer(e.second, e.second->bbio_get_observed_create(), bbio_created_);
477 add_observer(e.second, e.second->bbio_get_observed_destroy(), bbio_destroyed_);
479 remove_observer(bbio_created_, e.second);
480 remove_observer(bbio_destroyed_, e.second);
482 bbio_queue_.pop_front();
496 unsigned int event_instance_serial)
throw()
498 bbil_writer_mutex_->lock();
499 bbil_writer_events_ += 1;
500 bbil_writer_mutex_->unlock();
502 const char * uid = interface->uid();
503 std::pair<BBilMap::iterator, BBilMap::iterator> ret = bbil_writer_.equal_range(uid);
504 for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
506 if (!is_in_queue(
false, bbil_writer_queue_, uid, bbil)) {
508 if (bbil_iface != NULL) {
512 "BBIL[%s] registered for writer events "
513 "(open) for '%s' but has no such interface",
520 bbil_writer_mutex_->lock();
521 bbil_writer_events_ -= 1;
522 process_writer_queue();
523 bbil_writer_mutex_->unlock();
533 unsigned int event_instance_serial)
throw()
535 bbil_writer_mutex_->lock();
536 bbil_writer_events_ += 1;
537 bbil_writer_mutex_->unlock();
539 const char * uid = interface->uid();
540 std::pair<BBilMap::iterator, BBilMap::iterator> ret = bbil_writer_.equal_range(uid);
541 for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
543 if (!is_in_queue(
false, bbil_data_queue_, uid, bbil)) {
545 if (bbil_iface != NULL) {
549 "BBIL[%s] registered for writer events "
550 "(close) for '%s' but has no such interface",
557 bbil_writer_mutex_->lock();
558 bbil_writer_events_ -= 1;
559 process_writer_queue();
560 bbil_writer_mutex_->unlock();
564 BlackBoardNotifier::process_writer_queue()
566 if (!bbil_writer_queue_.empty()) {
567 if (bbil_writer_events_ > 0) {
570 while (!bbil_writer_queue_.empty()) {
571 BBilQueueEntry &e = bbil_writer_queue_.front();
573 add_listener(e.interface, e.listener, bbil_writer_);
575 remove_listener(e.interface, e.listener, bbil_writer_);
577 bbil_writer_queue_.pop_front();
590 unsigned int event_instance_serial)
throw()
592 bbil_reader_mutex_->lock();
593 bbil_reader_events_ += 1;
594 bbil_reader_mutex_->unlock();
596 const char * uid = interface->uid();
597 std::pair<BBilMap::iterator, BBilMap::iterator> ret = bbil_reader_.equal_range(uid);
598 for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
600 if (!is_in_queue(
false, bbil_reader_queue_, uid, bbil)) {
602 if (bbil_iface != NULL) {
606 "BBIL[%s] registered for reader events "
607 "(open) for '%s' but has no such interface",
614 bbil_reader_mutex_->lock();
615 bbil_reader_events_ -= 1;
616 process_reader_queue();
617 bbil_reader_mutex_->unlock();
627 unsigned int event_instance_serial)
throw()
629 bbil_reader_mutex_->lock();
630 bbil_reader_events_ += 1;
631 bbil_reader_mutex_->unlock();
633 const char * uid = interface->uid();
634 std::pair<BBilMap::iterator, BBilMap::iterator> ret = bbil_reader_.equal_range(uid);
635 for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
637 if (!is_in_queue(
false, bbil_data_queue_, uid, bbil)) {
639 if (bbil_iface != NULL) {
643 "BBIL[%s] registered for reader events "
644 "(close) for '%s' but has no such interface",
651 bbil_reader_mutex_->lock();
652 bbil_reader_events_ -= 1;
653 process_reader_queue();
654 bbil_reader_mutex_->unlock();
658 BlackBoardNotifier::process_reader_queue()
660 if (!bbil_reader_queue_.empty()) {
661 if (bbil_reader_events_ > 0) {
664 while (!bbil_reader_queue_.empty()) {
665 BBilQueueEntry &e = bbil_reader_queue_.front();
667 add_listener(e.interface, e.listener, bbil_reader_);
669 remove_listener(e.interface, e.listener, bbil_reader_);
671 bbil_reader_queue_.pop_front();
689 bbil_data_mutex_->
lock();
690 bbil_data_events_ += 1;
693 const char * uid = interface->uid();
694 std::pair<BBilMap::iterator, BBilMap::iterator> ret = bbil_data_.equal_range(uid);
695 for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
697 if (!is_in_queue(
false, bbil_data_queue_, uid, bbil)) {
699 if (bbil_iface != NULL) {
703 "BBIL[%s] registered for data change events "
704 "for '%s' but has no such interface",
711 bbil_data_mutex_->
lock();
712 bbil_data_events_ -= 1;
713 if (!bbil_data_queue_.empty()) {
714 if (bbil_data_events_ == 0) {
715 while (!bbil_data_queue_.empty()) {
716 BBilQueueEntry &e = bbil_data_queue_.front();
718 add_listener(e.interface, e.listener, bbil_data_);
720 remove_listener(e.interface, e.listener, bbil_data_);
722 bbil_data_queue_.pop_front();
726 bbil_data_mutex_->
unlock();
742 bbil_messages_mutex_->
lock();
743 bbil_messages_events_ += 1;
748 const char * uid = interface->uid();
749 std::pair<BBilMap::iterator, BBilMap::iterator> ret = bbil_messages_.equal_range(uid);
750 for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
752 if (!is_in_queue(
false, bbil_messages_queue_, uid, bbil)) {
754 if (bbil_iface != NULL) {
762 "BBIL[%s] registered for message events "
763 "for '%s' but has no such interface",
770 bbil_messages_mutex_->
lock();
771 bbil_messages_events_ -= 1;
772 if (!bbil_messages_queue_.empty()) {
773 if (bbil_messages_events_ == 0) {
774 while (!bbil_messages_queue_.empty()) {
775 BBilQueueEntry &e = bbil_messages_queue_.front();
777 add_listener(e.interface, e.listener, bbil_messages_);
779 remove_listener(e.interface, e.listener, bbil_messages_);
781 bbil_messages_queue_.pop_front();
785 bbil_messages_mutex_->
unlock();