Fawkes API  Fawkes Development Version
notifier.cpp
1 
2 /***************************************************************************
3  * notifier.cpp - BlackBoard notifier
4  *
5  * Created: Mon Mar 03 23:28:18 2008
6  * Copyright 2006-2008 Tim Niemueller [www.niemueller.de]
7  *
8  ****************************************************************************/
9 
10 /* This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version. A runtime exception applies to
14  * this software (see LICENSE.GPL_WRE file mentioned below for details).
15  *
16  * This program is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19  * GNU Library General Public License for more details.
20  *
21  * Read the full text in the LICENSE.GPL_WRE file in the doc directory.
22  */
23 
24 #include <blackboard/blackboard.h>
25 #include <blackboard/interface_listener.h>
26 #include <blackboard/interface_observer.h>
27 #include <blackboard/internal/notifier.h>
28 #include <core/threading/mutex.h>
29 #include <core/threading/mutex_locker.h>
30 #include <core/utils/lock_hashmap.h>
31 #include <core/utils/lock_hashset.h>
32 #include <interface/interface.h>
33 #include <logging/liblogger.h>
34 
35 #include <algorithm>
36 #include <cstdlib>
37 #include <cstring>
38 #include <fnmatch.h>
39 #include <functional>
40 
41 namespace fawkes {
42 
43 /** @class BlackBoardNotifier <blackboard/internal/notifier.h>
44  * BlackBoard notifier.
45  * This class is used by the BlackBoard to notify listeners and observers
46  * of changes.
47  *
48  * @author Tim Niemueller
49  */
50 
51 /** Constructor. */
53 {
54  bbil_writer_events_ = 0;
55  bbil_writer_mutex_ = new Mutex();
56 
57  bbil_reader_events_ = 0;
58  bbil_reader_mutex_ = new Mutex();
59 
60  bbil_data_events_ = 0;
61  bbil_data_mutex_ = new Mutex();
62 
63  bbil_messages_events_ = 0;
64  bbil_messages_mutex_ = new Mutex();
65 
66  bbio_events_ = 0;
67  bbio_mutex_ = new Mutex();
68 }
69 
70 /** Destructor */
72 {
73  delete bbil_writer_mutex_;
74  delete bbil_reader_mutex_;
75  delete bbil_data_mutex_;
76  delete bbil_messages_mutex_;
77 
78  delete bbio_mutex_;
79 }
80 
81 /** Register BB event listener.
82  * @param listener BlackBoard event listener to register
83  * @param flag concatenation of flags denoting which queue entries should be
84  * processed
85  */
86 void
89 {
90  update_listener(listener, flag);
91 }
92 
93 /** Update BB event listener.
94  * @param listener BlackBoard event listener to update subscriptions of
95  * @param flag concatenation of flags denoting which queue entries should be
96  * processed
97  */
98 void
101 {
102  const BlackBoardInterfaceListener::InterfaceQueue &queue = listener->bbil_acquire_queue();
103 
104  BlackBoardInterfaceListener::InterfaceQueue::const_iterator i = queue.begin();
105 
106  for (i = queue.begin(); i != queue.end(); ++i) {
107  switch (i->type) {
109  if (flag & BlackBoard::BBIL_FLAG_DATA) {
110  proc_listener_maybe_queue(i->op,
111  i->interface,
112  listener,
113  bbil_data_mutex_,
114  bbil_data_events_,
115  bbil_data_,
116  bbil_data_queue_,
117  "data");
118  }
119  break;
121  if (flag & BlackBoard::BBIL_FLAG_MESSAGES) {
122  proc_listener_maybe_queue(i->op,
123  i->interface,
124  listener,
125  bbil_messages_mutex_,
126  bbil_messages_events_,
127  bbil_messages_,
128  bbil_messages_queue_,
129  "messages");
130  }
131  break;
133  if (flag & BlackBoard::BBIL_FLAG_READER) {
134  proc_listener_maybe_queue(i->op,
135  i->interface,
136  listener,
137  bbil_reader_mutex_,
138  bbil_reader_events_,
139  bbil_reader_,
140  bbil_reader_queue_,
141  "reader");
142  }
143  break;
145  if (flag & BlackBoard::BBIL_FLAG_WRITER) {
146  proc_listener_maybe_queue(i->op,
147  i->interface,
148  listener,
149  bbil_writer_mutex_,
150  bbil_writer_events_,
151  bbil_writer_,
152  bbil_writer_queue_,
153  "writer");
154  }
155  break;
156  default: break;
157  }
158  }
159 
160  listener->bbil_release_queue(flag);
161 }
162 
163 void
164 BlackBoardNotifier::proc_listener_maybe_queue(bool op,
165  Interface * interface,
166  BlackBoardInterfaceListener *listener,
167  Mutex * mutex,
168  unsigned int & events,
169  BBilMap & map,
170  BBilQueue & queue,
171  const char * hint)
172 {
173  MutexLocker lock(mutex);
174  if (events > 0) {
175  LibLogger::log_warn("BlackBoardNotifier",
176  "%s interface "
177  "listener %s for %s events (queued)",
178  op ? "Registering" : "Unregistering",
179  listener->bbil_name(),
180  hint);
181 
182  queue_listener(op, interface, listener, queue);
183  } else {
184  if (op) { // add
185  add_listener(interface, listener, map);
186  } else {
187  remove_listener(interface, listener, map);
188  }
189  }
190 }
191 
192 /** Unregister BB interface listener.
193  * This will remove the given BlackBoard interface listener from any
194  * event that it was previously registered for.
195  * @param listener BlackBoard event listener to remove
196  */
197 void
198 BlackBoardNotifier::unregister_listener(BlackBoardInterfaceListener *listener)
199 {
200  const BlackBoardInterfaceListener::InterfaceMaps maps = listener->bbil_acquire_maps();
201 
202  BlackBoardInterfaceListener::InterfaceMap::const_iterator i;
203  for (i = maps.data.begin(); i != maps.data.end(); ++i) {
204  proc_listener_maybe_queue(false,
205  i->second,
206  listener,
207  bbil_data_mutex_,
208  bbil_data_events_,
209  bbil_data_,
210  bbil_data_queue_,
211  "data");
212  }
213 
214  for (i = maps.messages.begin(); i != maps.messages.end(); ++i) {
215  proc_listener_maybe_queue(false,
216  i->second,
217  listener,
218  bbil_messages_mutex_,
219  bbil_messages_events_,
220  bbil_messages_,
221  bbil_messages_queue_,
222  "messages");
223  }
224 
225  for (i = maps.reader.begin(); i != maps.reader.end(); ++i) {
226  proc_listener_maybe_queue(false,
227  i->second,
228  listener,
229  bbil_reader_mutex_,
230  bbil_reader_events_,
231  bbil_reader_,
232  bbil_reader_queue_,
233  "reader");
234  }
235 
236  for (i = maps.writer.begin(); i != maps.writer.end(); ++i) {
237  proc_listener_maybe_queue(false,
238  i->second,
239  listener,
240  bbil_writer_mutex_,
241  bbil_writer_events_,
242  bbil_writer_,
243  bbil_writer_queue_,
244  "writer");
245  }
246 
247  listener->bbil_release_maps();
248 }
249 
250 /** Add listener for specified map.
251  * @param listener interface listener for events
252  * @param im map of interfaces to listen for
253  * @param ilmap internal map to add listener to
254  */
255 void
256 BlackBoardNotifier::add_listener(Interface * interface,
257  BlackBoardInterfaceListener *listener,
258  BBilMap & ilmap)
259 {
260  std::pair<BBilMap::iterator, BBilMap::iterator> ret = ilmap.equal_range(interface->uid());
261 
262  BBilMap::value_type v = std::make_pair(interface->uid(), listener);
263  BBilMap::iterator f = std::find(ret.first, ret.second, v);
264 
265  if (f == ret.second) {
266  ilmap.insert(std::make_pair(interface->uid(), listener));
267  }
268 }
269 
270 void
271 BlackBoardNotifier::remove_listener(Interface * interface,
272  BlackBoardInterfaceListener *listener,
273  BBilMap & ilmap)
274 {
275  std::pair<BBilMap::iterator, BBilMap::iterator> ret = ilmap.equal_range(interface->uid());
276  for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
277  if (j->second == listener) {
278  ilmap.erase(j);
279  break;
280  }
281  }
282 }
283 
284 bool
285 BlackBoardNotifier::is_in_queue(bool op,
286  BBilQueue & queue,
287  const char * uid,
288  BlackBoardInterfaceListener *bbil)
289 {
290  BBilQueue::iterator q;
291  for (q = queue.begin(); q != queue.end(); ++q) {
292  if ((q->op == op) && (q->uid == uid) && (q->listener == bbil)) {
293  return true;
294  }
295  }
296  return false;
297 }
298 
299 void
300 BlackBoardNotifier::queue_listener(bool op,
301  Interface * interface,
302  BlackBoardInterfaceListener *listener,
303  BBilQueue & queue)
304 {
305  BBilQueueEntry qe = {op, interface->uid(), interface, listener};
306  queue.push_back(qe);
307 }
308 
309 /** Register BB interface observer.
310  * @param observer BlackBoard interface observer to register
311  */
312 void
313 BlackBoardNotifier::register_observer(BlackBoardInterfaceObserver *observer)
314 {
315  bbio_mutex_->lock();
316  if (bbio_events_ > 0) {
317  bbio_queue_.push_back(std::make_pair(1, observer));
318  } else {
319  add_observer(observer, observer->bbio_get_observed_create(), bbio_created_);
320  add_observer(observer, observer->bbio_get_observed_destroy(), bbio_destroyed_);
321  }
322  bbio_mutex_->unlock();
323 }
324 
325 void
326 BlackBoardNotifier::add_observer(BlackBoardInterfaceObserver * observer,
328  BBioMap & bbiomap)
329 {
331  its->lock();
332  for (i = its->begin(); i != its->end(); ++i) {
333  bbiomap[i->first].push_back(make_pair(observer, i->second));
334  }
335  its->unlock();
336 }
337 
338 /** Remove observer from map.
339  * @param iomap interface observer map to remove the observer from
340  * @param observer observer to remove
341  */
342 void
343 BlackBoardNotifier::remove_observer(BBioMap &iomap, BlackBoardInterfaceObserver *observer)
344 {
345  BBioMapIterator i, tmp;
346 
347  i = iomap.begin();
348  while (i != iomap.end()) {
349  BBioListIterator j = i->second.begin();
350  while (j != i->second.end()) {
351  if (j->first == observer) {
352  j = i->second.erase(j);
353  } else {
354  ++j;
355  }
356  }
357  if (i->second.empty()) {
358  tmp = i;
359  ++i;
360  iomap.erase(tmp);
361  } else {
362  ++i;
363  }
364  }
365 }
366 
367 /** Unregister BB interface observer.
368  * This will remove the given BlackBoard event listener from any event that it was
369  * previously registered for.
370  * @param observer BlackBoard event listener to remove
371  */
372 void
373 BlackBoardNotifier::unregister_observer(BlackBoardInterfaceObserver *observer)
374 {
375  MutexLocker lock(bbio_mutex_);
376  if (bbio_events_ > 0) {
377  BBioQueueEntry e = std::make_pair((unsigned int)0, observer);
378  BBioQueue::iterator re;
379  while ((re = find_if(bbio_queue_.begin(),
380  bbio_queue_.end(),
381  bind2nd(std::not_equal_to<BBioQueueEntry>(), e)))
382  != bbio_queue_.end()) {
383  // if there is an entry in the register queue, remove it!
384  if (re->second == observer) {
385  bbio_queue_.erase(re);
386  }
387  }
388  bbio_queue_.push_back(std::make_pair(0, observer));
389 
390  } else {
391  remove_observer(bbio_created_, observer);
392  remove_observer(bbio_destroyed_, observer);
393  }
394 }
395 
396 /** Notify that an interface has been created.
397  * @param type type of the interface
398  * @param id ID of the interface
399  */
400 void
401 BlackBoardNotifier::notify_of_interface_created(const char *type, const char *id) throw()
402 {
403  bbio_mutex_->lock();
404  bbio_events_ += 1;
405  bbio_mutex_->unlock();
406 
407  BBioMapIterator lhmi;
408  BBioListIterator i, l;
409  for (lhmi = bbio_created_.begin(); lhmi != bbio_created_.end(); ++lhmi) {
410  if (fnmatch(lhmi->first.c_str(), type, 0) != 0)
411  continue;
412 
413  BBioList &list = lhmi->second;
414  for (i = list.begin(); i != list.end(); ++i) {
415  BlackBoardInterfaceObserver *bbio = i->first;
416  for (std::list<std::string>::iterator pi = i->second.begin(); pi != i->second.end(); ++pi) {
417  if (fnmatch(pi->c_str(), id, 0) == 0) {
418  bbio->bb_interface_created(type, id);
419  break;
420  }
421  }
422  }
423  }
424 
425  bbio_mutex_->lock();
426  bbio_events_ -= 1;
427  process_bbio_queue();
428  bbio_mutex_->unlock();
429 }
430 
431 /** Notify that an interface has been destroyed.
432  * @param type type of the interface
433  * @param id ID of the interface
434  */
435 void
436 BlackBoardNotifier::notify_of_interface_destroyed(const char *type, const char *id) throw()
437 {
438  bbio_mutex_->lock();
439  bbio_events_ += 1;
440  bbio_mutex_->unlock();
441 
442  BBioMapIterator lhmi;
443  BBioListIterator i, l;
444  for (lhmi = bbio_destroyed_.begin(); lhmi != bbio_destroyed_.end(); ++lhmi) {
445  if (fnmatch(lhmi->first.c_str(), type, 0) != 0)
446  continue;
447 
448  BBioList &list = (*lhmi).second;
449  for (i = list.begin(); i != list.end(); ++i) {
450  BlackBoardInterfaceObserver *bbio = i->first;
451  for (std::list<std::string>::iterator pi = i->second.begin(); pi != i->second.end(); ++pi) {
452  if (fnmatch(pi->c_str(), id, 0) == 0) {
453  bbio->bb_interface_destroyed(type, id);
454  break;
455  }
456  }
457  }
458  }
459 
460  bbio_mutex_->lock();
461  bbio_events_ -= 1;
462  process_bbio_queue();
463  bbio_mutex_->unlock();
464 }
465 
466 void
467 BlackBoardNotifier::process_bbio_queue()
468 {
469  if (!bbio_queue_.empty()) {
470  if (bbio_events_ > 0) {
471  return;
472  } else {
473  while (!bbio_queue_.empty()) {
474  BBioQueueEntry &e = bbio_queue_.front();
475  if (e.first) { // register
476  add_observer(e.second, e.second->bbio_get_observed_create(), bbio_created_);
477  add_observer(e.second, e.second->bbio_get_observed_destroy(), bbio_destroyed_);
478  } else { // unregister
479  remove_observer(bbio_created_, e.second);
480  remove_observer(bbio_destroyed_, e.second);
481  }
482  bbio_queue_.pop_front();
483  }
484  }
485  }
486 }
487 
488 /** Notify that writer has been added.
489  * @param interface the interface for which the event happened. It is not necessarily the
490  * instance which caused the event, but it must have the same mem serial.
491  * @param event_instance_serial the instance serial of the interface that caused the event
492  * @see BlackBoardInterfaceListener::bb_interface_writer_added()
493  */
494 void
495 BlackBoardNotifier::notify_of_writer_added(const Interface *interface,
496  unsigned int event_instance_serial) throw()
497 {
498  bbil_writer_mutex_->lock();
499  bbil_writer_events_ += 1;
500  bbil_writer_mutex_->unlock();
501 
502  const char * uid = interface->uid();
503  std::pair<BBilMap::iterator, BBilMap::iterator> ret = bbil_writer_.equal_range(uid);
504  for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
505  BlackBoardInterfaceListener *bbil = j->second;
506  if (!is_in_queue(/* remove op*/ false, bbil_writer_queue_, uid, bbil)) {
507  Interface *bbil_iface = bbil->bbil_writer_interface(uid);
508  if (bbil_iface != NULL) {
509  bbil->bb_interface_writer_added(bbil_iface, event_instance_serial);
510  } else {
511  LibLogger::log_warn("BlackBoardNotifier",
512  "BBIL[%s] registered for writer events "
513  "(open) for '%s' but has no such interface",
514  bbil->bbil_name(),
515  uid);
516  }
517  }
518  }
519 
520  bbil_writer_mutex_->lock();
521  bbil_writer_events_ -= 1;
522  process_writer_queue();
523  bbil_writer_mutex_->unlock();
524 }
525 
526 /** Notify that writer has been removed.
527  * @param interface interface for which the writer has been removed
528  * @param event_instance_serial instance serial of the interface that caused the event
529  * @see BlackBoardInterfaceListener::bb_interface_writer_removed()
530  */
531 void
532 BlackBoardNotifier::notify_of_writer_removed(const Interface *interface,
533  unsigned int event_instance_serial) throw()
534 {
535  bbil_writer_mutex_->lock();
536  bbil_writer_events_ += 1;
537  bbil_writer_mutex_->unlock();
538 
539  const char * uid = interface->uid();
540  std::pair<BBilMap::iterator, BBilMap::iterator> ret = bbil_writer_.equal_range(uid);
541  for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
542  BlackBoardInterfaceListener *bbil = j->second;
543  if (!is_in_queue(/* remove op*/ false, bbil_data_queue_, uid, bbil)) {
544  Interface *bbil_iface = bbil->bbil_writer_interface(uid);
545  if (bbil_iface != NULL) {
546  bbil->bb_interface_writer_removed(bbil_iface, event_instance_serial);
547  } else {
548  LibLogger::log_warn("BlackBoardNotifier",
549  "BBIL[%s] registered for writer events "
550  "(close) for '%s' but has no such interface",
551  bbil->bbil_name(),
552  uid);
553  }
554  }
555  }
556 
557  bbil_writer_mutex_->lock();
558  bbil_writer_events_ -= 1;
559  process_writer_queue();
560  bbil_writer_mutex_->unlock();
561 }
562 
563 void
564 BlackBoardNotifier::process_writer_queue()
565 {
566  if (!bbil_writer_queue_.empty()) {
567  if (bbil_writer_events_ > 0) {
568  return;
569  } else {
570  while (!bbil_writer_queue_.empty()) {
571  BBilQueueEntry &e = bbil_writer_queue_.front();
572  if (e.op) { // register
573  add_listener(e.interface, e.listener, bbil_writer_);
574  } else { // unregister
575  remove_listener(e.interface, e.listener, bbil_writer_);
576  }
577  bbil_writer_queue_.pop_front();
578  }
579  }
580  }
581 }
582 
583 /** Notify that reader has been added.
584  * @param interface interface for which the reader has been added
585  * @param event_instance_serial instance serial of the interface that caused the event
586  * @see BlackBoardInterfaceListener::bb_interface_reader_added()
587  */
588 void
589 BlackBoardNotifier::notify_of_reader_added(const Interface *interface,
590  unsigned int event_instance_serial) throw()
591 {
592  bbil_reader_mutex_->lock();
593  bbil_reader_events_ += 1;
594  bbil_reader_mutex_->unlock();
595 
596  const char * uid = interface->uid();
597  std::pair<BBilMap::iterator, BBilMap::iterator> ret = bbil_reader_.equal_range(uid);
598  for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
599  BlackBoardInterfaceListener *bbil = j->second;
600  if (!is_in_queue(/* remove op*/ false, bbil_reader_queue_, uid, bbil)) {
601  Interface *bbil_iface = bbil->bbil_reader_interface(uid);
602  if (bbil_iface != NULL) {
603  bbil->bb_interface_reader_added(bbil_iface, event_instance_serial);
604  } else {
605  LibLogger::log_warn("BlackBoardNotifier",
606  "BBIL[%s] registered for reader events "
607  "(open) for '%s' but has no such interface",
608  bbil->bbil_name(),
609  uid);
610  }
611  }
612  }
613 
614  bbil_reader_mutex_->lock();
615  bbil_reader_events_ -= 1;
616  process_reader_queue();
617  bbil_reader_mutex_->unlock();
618 }
619 
620 /** Notify that reader has been removed.
621  * @param interface interface for which the reader has been removed
622  * @param event_instance_serial instance serial of the interface that caused the event
623  * @see BlackBoardInterfaceListener::bb_interface_reader_removed()
624  */
625 void
626 BlackBoardNotifier::notify_of_reader_removed(const Interface *interface,
627  unsigned int event_instance_serial) throw()
628 {
629  bbil_reader_mutex_->lock();
630  bbil_reader_events_ += 1;
631  bbil_reader_mutex_->unlock();
632 
633  const char * uid = interface->uid();
634  std::pair<BBilMap::iterator, BBilMap::iterator> ret = bbil_reader_.equal_range(uid);
635  for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
636  BlackBoardInterfaceListener *bbil = j->second;
637  if (!is_in_queue(/* remove op*/ false, bbil_data_queue_, uid, bbil)) {
638  Interface *bbil_iface = bbil->bbil_reader_interface(uid);
639  if (bbil_iface != NULL) {
640  bbil->bb_interface_reader_removed(bbil_iface, event_instance_serial);
641  } else {
642  LibLogger::log_warn("BlackBoardNotifier",
643  "BBIL[%s] registered for reader events "
644  "(close) for '%s' but has no such interface",
645  bbil->bbil_name(),
646  uid);
647  }
648  }
649  }
650 
651  bbil_reader_mutex_->lock();
652  bbil_reader_events_ -= 1;
653  process_reader_queue();
654  bbil_reader_mutex_->unlock();
655 }
656 
657 void
658 BlackBoardNotifier::process_reader_queue()
659 {
660  if (!bbil_reader_queue_.empty()) {
661  if (bbil_reader_events_ > 0) {
662  return;
663  } else {
664  while (!bbil_reader_queue_.empty()) {
665  BBilQueueEntry &e = bbil_reader_queue_.front();
666  if (e.op) { // register
667  add_listener(e.interface, e.listener, bbil_reader_);
668  } else { // unregister
669  remove_listener(e.interface, e.listener, bbil_reader_);
670  }
671  bbil_reader_queue_.pop_front();
672  }
673  }
674  }
675 }
676 
677 /** Notify of data change.
678  * Notify all subscribers of the given interface of a data change.
679  * This also influences logging and sending data over the network so it is
680  * mandatory to call this function! The interface base class write method does
681  * that for you.
682  * @param interface interface whose subscribers to notify
683  * @see Interface::write()
684  * @see BlackBoardInterfaceListener::bb_interface_data_changed()
685  */
686 void
687 BlackBoardNotifier::notify_of_data_change(const Interface *interface)
688 {
689  bbil_data_mutex_->lock();
690  bbil_data_events_ += 1;
691  bbil_data_mutex_->unlock();
692 
693  const char * uid = interface->uid();
694  std::pair<BBilMap::iterator, BBilMap::iterator> ret = bbil_data_.equal_range(uid);
695  for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
696  BlackBoardInterfaceListener *bbil = j->second;
697  if (!is_in_queue(/* remove op*/ false, bbil_data_queue_, uid, bbil)) {
698  Interface *bbil_iface = bbil->bbil_data_interface(uid);
699  if (bbil_iface != NULL) {
700  bbil->bb_interface_data_changed(bbil_iface);
701  } else {
702  LibLogger::log_warn("BlackBoardNotifier",
703  "BBIL[%s] registered for data change events "
704  "for '%s' but has no such interface",
705  bbil->bbil_name(),
706  uid);
707  }
708  }
709  }
710 
711  bbil_data_mutex_->lock();
712  bbil_data_events_ -= 1;
713  if (!bbil_data_queue_.empty()) {
714  if (bbil_data_events_ == 0) {
715  while (!bbil_data_queue_.empty()) {
716  BBilQueueEntry &e = bbil_data_queue_.front();
717  if (e.op) { // register
718  add_listener(e.interface, e.listener, bbil_data_);
719  } else { // unregister
720  remove_listener(e.interface, e.listener, bbil_data_);
721  }
722  bbil_data_queue_.pop_front();
723  }
724  }
725  }
726  bbil_data_mutex_->unlock();
727 }
728 
729 /** Notify of message received
730  * Notify all subscribers of the given interface of an incoming message
731  * This also influences logging and sending data over the network so it is
732  * mandatory to call this function! The interface base class write method does
733  * that for you.
734  * @param interface interface whose subscribers to notify
735  * @param message message which is being received
736  * @return false if any listener returned false, true otherwise
737  * @see BlackBoardInterfaceListener::bb_interface_message_received()
738  */
739 bool
740 BlackBoardNotifier::notify_of_message_received(const Interface *interface, Message *message)
741 {
742  bbil_messages_mutex_->lock();
743  bbil_messages_events_ += 1;
744  bbil_messages_mutex_->unlock();
745 
746  bool enqueue = true;
747 
748  const char * uid = interface->uid();
749  std::pair<BBilMap::iterator, BBilMap::iterator> ret = bbil_messages_.equal_range(uid);
750  for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
751  BlackBoardInterfaceListener *bbil = j->second;
752  if (!is_in_queue(/* remove op*/ false, bbil_messages_queue_, uid, bbil)) {
753  Interface *bbil_iface = bbil->bbil_message_interface(uid);
754  if (bbil_iface != NULL) {
755  bool abort = !bbil->bb_interface_message_received(bbil_iface, message);
756  if (abort) {
757  enqueue = false;
758  break;
759  }
760  } else {
761  LibLogger::log_warn("BlackBoardNotifier",
762  "BBIL[%s] registered for message events "
763  "for '%s' but has no such interface",
764  bbil->bbil_name(),
765  uid);
766  }
767  }
768  }
769 
770  bbil_messages_mutex_->lock();
771  bbil_messages_events_ -= 1;
772  if (!bbil_messages_queue_.empty()) {
773  if (bbil_messages_events_ == 0) {
774  while (!bbil_messages_queue_.empty()) {
775  BBilQueueEntry &e = bbil_messages_queue_.front();
776  if (e.op) { // register
777  add_listener(e.interface, e.listener, bbil_messages_);
778  } else { // unregister
779  remove_listener(e.interface, e.listener, bbil_messages_);
780  }
781  bbil_messages_queue_.pop_front();
782  }
783  }
784  }
785  bbil_messages_mutex_->unlock();
786 
787  return enqueue;
788 }
789 
790 } // end namespace fawkes
fawkes::Mutex::lock
void lock()
Lock this mutex.
Definition: mutex.cpp:91
fawkes::BlackBoardInterfaceListener::InterfaceQueue
std::list< QueueEntry > InterfaceQueue
Queue of additions/removal of interfaces.
Definition: interface_listener.h:67
fawkes::BlackBoardInterfaceObserver::bb_interface_destroyed
virtual void bb_interface_destroyed(const char *type, const char *id)
BlackBoard interface destroyed notification.
Definition: interface_observer.cpp:110
fawkes::BlackBoard::BBIL_FLAG_MESSAGES
consider message received events
Definition: blackboard.h:92
fawkes::BlackBoard::BBIL_FLAG_DATA
consider data events
Definition: blackboard.h:91
fawkes::Mutex
Definition: mutex.h:36
fawkes::BlackBoardInterfaceListener::bb_interface_message_received
virtual bool bb_interface_message_received(Interface *interface, Message *message)
BlackBoard message received notification.
Definition: interface_listener.cpp:149
fawkes::BlackBoardInterfaceListener::DATA
Data changed event entry.
Definition: interface_listener.h:52
fawkes::BlackBoardNotifier::~BlackBoardNotifier
virtual ~BlackBoardNotifier()
Destructor.
Definition: notifier.cpp:75
fawkes::BlackBoardInterfaceListener
Definition: interface_listener.h:45
fawkes::BlackBoardInterfaceObserver::ObservedInterfaceLockMap
LockMap< std::string, std::list< std::string > > ObservedInterfaceLockMap
Type for lockable interface type hash sets.
Definition: interface_observer.h:56
fawkes::BlackBoardInterfaceListener::bbil_reader_interface
Interface * bbil_reader_interface(const char *iuid)
Get interface instance for given UID.
Definition: interface_listener.cpp:498
fawkes::Mutex::unlock
void unlock()
Unlock the mutex.
Definition: mutex.cpp:135
fawkes::BlackBoardNotifier::register_listener
void register_listener(BlackBoardInterfaceListener *listener, BlackBoard::ListenerRegisterFlag flag)
Register BB event listener.
Definition: notifier.cpp:91
fawkes::BlackBoardNotifier::notify_of_message_received
bool notify_of_message_received(const Interface *interface, Message *message)
Notify of message received Notify all subscribers of the given interface of an incoming message This ...
Definition: notifier.cpp:744
fawkes::BlackBoard::BBIL_FLAG_READER
consider reader events
Definition: blackboard.h:93
fawkes::BlackBoardInterfaceListener::bbil_message_interface
Interface * bbil_message_interface(const char *iuid)
Get interface instance for given UID.
Definition: interface_listener.cpp:486
fawkes::BlackBoardNotifier::notify_of_reader_removed
void notify_of_reader_removed(const Interface *interface, unsigned int event_instance_serial)
Notify that reader has been removed.
Definition: notifier.cpp:630
fawkes::BlackBoardInterfaceListener::bb_interface_reader_added
virtual void bb_interface_reader_added(Interface *interface, unsigned int instance_serial)
A reading instance has been opened for a watched interface.
Definition: interface_listener.cpp:163
fawkes::BlackBoardNotifier::register_observer
void register_observer(BlackBoardInterfaceObserver *observer)
Register BB interface observer.
Definition: notifier.cpp:317
fawkes::BlackBoardInterfaceListener::READER
Reader event entry.
Definition: interface_listener.h:54
fawkes::BlackBoardInterfaceListener::bb_interface_writer_added
virtual void bb_interface_writer_added(Interface *interface, unsigned int instance_serial)
A writing instance has been opened for a watched interface.
Definition: interface_listener.cpp:189
fawkes::LibLogger::log_warn
static void log_warn(const char *component, const char *format,...)
Log warning message.
Definition: liblogger.cpp:160
fawkes
fawkes::BlackBoard::ListenerRegisterFlag
ListenerRegisterFlag
Flags to constrain listener registration/updates.
Definition: blackboard.h:90
fawkes::BlackBoardInterfaceListener::bb_interface_data_changed
virtual void bb_interface_data_changed(Interface *interface)
BlackBoard data changed notification.
Definition: interface_listener.cpp:127
fawkes::BlackBoardInterfaceObserver
Definition: interface_observer.h:40
fawkes::Interface
Definition: interface.h:77
fawkes::BlackBoardNotifier::notify_of_reader_added
void notify_of_reader_added(const Interface *interface, unsigned int event_instance_serial)
Notify that reader has been added.
Definition: notifier.cpp:593
fawkes::BlackBoardNotifier::unregister_observer
void unregister_observer(BlackBoardInterfaceObserver *observer)
Unregister BB interface observer.
Definition: notifier.cpp:377
fawkes::BlackBoardInterfaceListener::bbil_writer_interface
Interface * bbil_writer_interface(const char *iuid)
Get interface instance for given UID.
Definition: interface_listener.cpp:510
fawkes::BlackBoardNotifier::notify_of_writer_added
void notify_of_writer_added(const Interface *interface, unsigned int event_instance_serial)
Notify that writer has been added.
Definition: notifier.cpp:499
fawkes::BlackBoardInterfaceListener::WRITER
Writer event entry.
Definition: interface_listener.h:55
fawkes::BlackBoardNotifier::BlackBoardNotifier
BlackBoardNotifier()
Constructor.
Definition: notifier.cpp:56
fawkes::BlackBoard::BBIL_FLAG_WRITER
consider writer events
Definition: blackboard.h:94
fawkes::BlackBoardNotifier::notify_of_interface_created
void notify_of_interface_created(const char *type, const char *id)
Notify that an interface has been created.
Definition: notifier.cpp:405
fawkes::BlackBoardNotifier::notify_of_data_change
void notify_of_data_change(const Interface *interface)
Notify of data change.
Definition: notifier.cpp:691
fawkes::BlackBoardInterfaceListener::bb_interface_writer_removed
virtual void bb_interface_writer_removed(Interface *interface, unsigned int instance_serial)
A writing instance has been closed for a watched interface.
Definition: interface_listener.cpp:202
fawkes::BlackBoardNotifier::unregister_listener
void unregister_listener(BlackBoardInterfaceListener *listener)
Unregister BB interface listener.
Definition: notifier.cpp:202
fawkes::BlackBoardNotifier::update_listener
void update_listener(BlackBoardInterfaceListener *listener, BlackBoard::ListenerRegisterFlag flag)
Update BB event listener.
Definition: notifier.cpp:103
fawkes::BlackBoardInterfaceListener::bbil_data_interface
Interface * bbil_data_interface(const char *iuid)
Get interface instance for given UID.
Definition: interface_listener.cpp:474
fawkes::BlackBoardInterfaceListener::bb_interface_reader_removed
virtual void bb_interface_reader_removed(Interface *interface, unsigned int instance_serial)
A reading instance has been closed for a watched interface.
Definition: interface_listener.cpp:176
fawkes::BlackBoardInterfaceObserver::bb_interface_created
virtual void bb_interface_created(const char *type, const char *id)
BlackBoard interface created notification.
Definition: interface_observer.cpp:95
fawkes::BlackBoardInterfaceListener::MESSAGES
Message received event entry.
Definition: interface_listener.h:53
fawkes::BlackBoardNotifier::notify_of_interface_destroyed
void notify_of_interface_destroyed(const char *type, const char *id)
Notify that an interface has been destroyed.
Definition: notifier.cpp:440
fawkes::BlackBoardInterfaceListener::bbil_name
const char * bbil_name() const
Get BBIL name.
Definition: interface_listener.cpp:116
fawkes::BlackBoardNotifier::notify_of_writer_removed
void notify_of_writer_removed(const Interface *interface, unsigned int event_instance_serial)
Notify that writer has been removed.
Definition: notifier.cpp:536
fawkes::BlackBoardInterfaceObserver::ObservedInterfaceLockMapIterator
ObservedInterfaceLockMap::iterator ObservedInterfaceLockMapIterator
Type for iterator of lockable interface type hash sets.
Definition: interface_observer.h:59