Fawkes API
Fawkes Development Version
|
22 #include <core/threading/mutex_locker.h>
23 #include <syncpoint/exceptions.h>
24 #include <syncpoint/syncpoint.h>
25 #include <utils/time/time.h>
57 SyncPoint::SyncPoint(
string identifier,
59 uint max_waittime_sec ,
60 uint max_waittime_nsec )
61 : identifier_(identifier),
65 creation_time_(
Time()),
67 mutex_next_wait_(new
Mutex()),
69 mutex_wait_for_one_(new
Mutex()),
71 mutex_wait_for_all_(new
Mutex()),
73 wait_for_all_timer_running_(false),
74 max_waittime_sec_(max_waittime_sec),
75 max_waittime_nsec_(max_waittime_nsec),
77 last_emitter_reset_(
Time(0l))
79 if (identifier.empty()) {
83 if (identifier.compare(0, 1,
"/")) {
90 if (identifier !=
"/" && !identifier.compare(identifier.size() - 1, 1,
"/")) {
96 SyncPoint::~SyncPoint()
150 emit(component,
true);
159 SyncPoint::emit(
const std::string &component,
bool remove_from_pending)
162 if (!emit_locker_.empty()) {
177 if (!emitters_.count(component)) {
186 bool pred_remove_from_pending =
false;
187 if (remove_from_pending) {
188 multiset<string>::iterator it_pending = pending_emitters_.find(component);
189 if (it_pending != pending_emitters_.end()) {
190 pending_emitters_.erase(it_pending);
192 if (last_emitter_reset_ <= predecessor_->last_emitter_reset_) {
193 pred_remove_from_pending =
true;
198 if (pending_emitters_.empty()) {
211 predecessor_->
emit(component, pred_remove_from_pending);
246 std::set<std::string> * watchers;
250 bool * timer_running;
251 string * timer_owner;
253 if (type == WAIT_FOR_ONE) {
258 timer_running = NULL;
259 }
else if (type == WAIT_FOR_ALL) {
279 if (watchers->count(component)) {
288 bool need_to_wait = !emitters_.empty() || type == WAIT_FOR_ONE;
290 watchers->insert(component);
294 if (emit_locker_ == component) {
300 if (type == WAIT_FOR_ONE) {
303 pthread_cleanup_push(cleanup_mutex, mutex_cond);
305 pthread_cleanup_pop(1);
308 handle_default(component, type);
312 if (*timer_running) {
314 pthread_cleanup_push(cleanup_mutex, mutex_cond);
316 pthread_cleanup_pop(1);
318 *timer_running =
true;
319 *timer_owner = component;
320 if (wait_sec != 0 || wait_nsec != 0) {
326 pthread_cleanup_push(cleanup_mutex, mutex_cond);
328 pthread_cleanup_pop(1);
330 *timer_running =
false;
333 handle_default(component, type);
356 wait(component, WAIT_FOR_ONE);
365 wait(component, WAIT_FOR_ALL);
376 wait(component, SyncPoint::WAIT_FOR_ONE, wait_sec, wait_nsec);
387 wait(component, SyncPoint::WAIT_FOR_ALL, wait_sec, wait_nsec);
420 if (emit_locker_.empty()) {
421 emit_locker_ = component;
424 "%s tried to call lock_until_next_wait, "
425 "but %s already did the same. Ignoring.",
427 emit_locker_.c_str());
440 emitters_.insert(component);
441 pending_emitters_.insert(component);
457 multiset<string>::iterator it_emitter = emitters_.find(component);
458 if (it_emitter == emitters_.end()) {
463 if (emit_if_pending && is_pending(component)) {
470 emitters_.erase(it_emitter);
485 return emitters_.count(component) > 0;
505 pair<set<string>::iterator,
bool>
515 std::set<std::string>
530 if (type == WAIT_FOR_ONE) {
532 }
else if (type == WAIT_FOR_ALL) {
568 case SyncPoint::WAIT_FOR_ONE: {
572 case SyncPoint::WAIT_FOR_ALL: {
581 SyncPoint::reset_emitters()
583 last_emitter_reset_ =
Time();
584 pending_emitters_ = emitters_;
588 SyncPoint::is_pending(
string component)
590 return pending_emitters_.count(component) > 0;
594 SyncPoint::handle_default(
string component, WakeupType type)
597 "Thread time limit exceeded while waiting for syncpoint '%s'. "
598 "Time limit: %f sec.",
601 bad_components_.insert(pending_emitters_.begin(), pending_emitters_.end());
602 if (bad_components_.size() > 1) {
603 string bad_components_string =
"";
604 for (set<string>::const_iterator it = bad_components_.begin(); it != bad_components_.end();
606 bad_components_string +=
" " + *it;
608 logger_->
log_warn(component.c_str(),
"bad components:%s", bad_components_string.c_str());
609 }
else if (bad_components_.size() == 1) {
610 logger_->
log_warn(component.c_str(),
"bad component: %s", bad_components_.begin()->c_str());
611 }
else if (type == SyncPoint::WAIT_FOR_ALL) {
612 throw Exception(
"SyncPoints: component %s defaulted, "
613 "but there is no pending emitter. This is probably a bug.",
void lock()
Lock this mutex.
WakeupType
Type to define when a thread wakes up after waiting for a SyncPoint.
Mutex * mutex_next_wait_
Mutex used to allow lock_until_next_wait.
bool watcher_is_waiting(std::string watcher, WakeupType type) const
Check if the given waiter is currently waiting with the given type.
Emit was called by a component which isn't in the watcher set (or a wrong component argument was passed).
Invalid identifier used (i.e. an identifier that the SyncPoint rejects, such as an empty string).
virtual void unwait(const std::string &component)
abort waiting
std::pair< std::set< std::string >::iterator, bool > add_watcher(std::string watcher)
Add a watcher to the watch list.
std::set< std::string > get_watchers() const
virtual void emit(const std::string &component)
send a signal to all waiting threads
WaitCondition * cond_wait_for_one_
WaitCondition which is used for wait_for_one()
void push_back(const Type &val)
Insert an element at the end of the buffer and delete the first element if necessary.
bool wait_for_all_timer_running_
true if the wait for all timer is running
uint max_waittime_nsec_
maximum waiting time in nsecs
std::set< std::string > watchers_
Set of all components which use this SyncPoint.
Mutex * mutex_wait_for_one_
Mutex used for cond_wait_for_one_.
virtual void reltime_wait_for_all(const std::string &component, uint wait_sec, uint wait_nsec)
Wait for all registered emitters for the given time.
CircularBuffer< SyncPointCall > get_wait_calls(WakeupType type=WAIT_FOR_ONE) const
std::set< std::string > watchers_wait_for_all_
Set of all components which are currently waiting on the barrier.
std::string get_identifier() const
const std::string identifier_
The unique identifier of the SyncPoint.
uint max_waittime_sec_
maximum waiting time in secs
Mutex * mutex_wait_for_all_
Mutex used for cond_wait_for_all_.
void unlock()
Unlock the mutex.
bool is_watcher(const std::string &component) const
Check if the given component is a watcher.
virtual void reltime_wait_for_one(const std::string &component, uint wait_sec, uint wait_nsec)
wait for the sync point, but abort after given time
void wait()
Wait for the condition forever.
virtual void log_warn(const char *component, const char *format,...)
WaitCondition * cond_next_wait_
WaitCondition used for lock_until_next_wait.
WaitCondition * cond_wait_for_all_
WaitCondition which is used for wait_for_all()
CircularBuffer< SyncPointCall > wait_for_all_calls_
A buffer of the most recent wait calls of type WAIT_FOR_ALL.
virtual void unregister_emitter(const std::string &component, bool emit_if_pending=true)
unregister as emitter
bool operator==(const SyncPoint &other) const
Equality operator.
CircularBuffer< SyncPointCall > wait_for_one_calls_
A buffer of the most recent wait calls of type WAIT_FOR_ONE.
MultiLogger * logger_
Logger.
void relock()
Lock this mutex, again.
virtual void wait_for_one(const std::string &component)
Wait for a single emitter.
bool reltimed_wait(unsigned int sec, unsigned int nanosec)
Wait with relative timeout.
void wake_all()
Wake up all waiting threads.
A component called wait() but is already waiting.
Emit was called by a component which isn't in the watcher set (or a wrong component argument was passed).
std::set< std::string > watchers_wait_for_one_
Set of all components which are currently waiting for a single emitter.
virtual void register_emitter(const std::string &component)
register as emitter
CircularBuffer< SyncPointCall > emit_calls_
A buffer of the most recent emit calls.
Emit was called on a SyncBarrier, but the calling component is not registered as an emitter.
std::multiset< std::string > get_emitters() const
std::string wait_for_all_timer_owner_
the component that started the wait-for-all timer
bool operator<(const SyncPoint &other) const
Less-than operator.
virtual void wait(const std::string &component, WakeupType=WAIT_FOR_ONE, uint wait_sec=0, uint wait_nsec=0)
wait for the sync point to be emitted by any other component
bool is_emitter(const std::string &component) const
Check if the given component is an emitter.
void unlock()
Unlock the mutex.
void lock_until_next_wait(const std::string &component)
Lock the SyncPoint for emitters until the specified component does the next wait() call.
CircularBuffer< SyncPointCall > get_emit_calls() const
virtual void wait_for_all(const std::string &component)
Wait for all registered emitters.
Mutex * mutex_
Mutex used to protect all member variables.