Fawkes API  Fawkes Development Version
base_thread.cpp
1 
2 /***************************************************************************
3  * base_thread.cpp - FireVision Base Thread
4  *
5  * Created: Tue May 29 16:41:50 2007
6  * Copyright 2006-2009 Tim Niemueller [www.niemueller.de]
7  *
8  ****************************************************************************/
9 
10 /* This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18  * GNU Library General Public License for more details.
19  *
20  * Read the full text in the LICENSE.GPL file in the doc directory.
21  */
22 
23 #include "base_thread.h"
24 
25 #include "acquisition_thread.h"
26 #include "aqt_vision_threads.h"
27 
28 #include <aspect/vision.h>
29 #include <core/exceptions/software.h>
30 #include <core/threading/barrier.h>
31 #include <core/threading/mutex.h>
32 #include <core/threading/mutex_locker.h>
33 #include <core/threading/thread.h>
34 #include <fvcams/cam_exceptions.h>
35 #include <fvcams/control/factory.h>
36 #include <fvcams/factory.h>
37 #include <fvutils/ipc/shm_image.h>
38 #include <fvutils/ipc/shm_lut.h>
39 #include <fvutils/system/camargp.h>
40 #include <logging/logger.h>
41 
42 #include <algorithm>
43 #include <unistd.h>
44 
45 using namespace fawkes;
46 using namespace firevision;
47 
48 /** @class FvBaseThread "base_thread.h"
49  * FireVision base thread.
50  * This implements the functionality of the FvBasePlugin.
51  * @author Tim Niemueller
52  */
53 
54 /** Constructor. */
56 : Thread("FvBaseThread", Thread::OPMODE_WAITFORWAKEUP),
57  BlockedTimingAspect(BlockedTimingAspect::WAKEUP_HOOK_SENSOR_ACQUIRE),
58  VisionMasterAspect(this)
59 {
60  // default to 30 seconds
61  aqt_timeout_ = 30;
62  aqt_barrier_ = new Barrier(1);
63 }
64 
65 /** Destructor. */
67 {
68  delete aqt_barrier_;
69 }
70 
71 void
73 {
74  // wipe all previously existing FireVision shared memory segments
75  // that are orphaned
76  SharedMemoryImageBuffer::cleanup(/* use lister */ false);
77  SharedMemoryLookupTable::cleanup(/* use lister */ false);
78 }
79 
80 void
82 {
83  aqts_.lock();
84  for (ait_ = aqts_.begin(); ait_ != aqts_.end(); ++ait_) {
85  thread_collector->remove(ait_->second);
86  delete ait_->second;
87  }
88  aqts_.clear();
89  aqts_.unlock();
90  owned_controls_.lock();
92  for (i = owned_controls_.begin(); i != owned_controls_.end(); ++i) {
93  delete *i;
94  }
95  owned_controls_.clear();
96  owned_controls_.unlock();
97 }
98 
99 /** Thread loop.
 * One iteration, executed with aqts_ locked throughout:
 *  1. put all acquisition threads on prepare-finalize hold,
 *  2. wake up the acquisition threads running in cyclic mode and wait for
 *     them at the barrier,
 *  3. destroy acquisition threads whose vision thread list has been empty
 *     for longer than aqt_timeout_,
 *  4. process vision threads recorded in started_threads_ (mark them
 *     running, enable capturing and switch the acquisition mode if needed),
 *  5. re-create the barrier if the number of cyclic threads changed,
 *  6. release the prepare-finalize hold again.
 *
 * NOTE(review): listing line 101 (the function signature) and listing
 * line 192 (the declaration of stittmp) are not present in this listing.
 */
100 void
102 {
103  aqts_.lock();
104 
105  try {
106  for (ait_ = aqts_.begin(); ait_ != aqts_.end(); ++ait_) {
107  ait_->second->set_vt_prepfin_hold(true);
108  }
109  } catch (Exception &e) {
     // Could not set the hold on every thread: undo it on all of them,
     // release the lock and skip this iteration.
110  logger->log_warn(name(), "Cannot get prepfin hold status, skipping this loop");
111  for (ait_ = aqts_.begin(); ait_ != aqts_.end(); ++ait_) {
112  ait_->second->set_vt_prepfin_hold(false);
113  }
114  aqts_.unlock();
115  return;
116  }
117 
118  // Wakeup all cyclic acquisition threads and wait for them
119  for (ait_ = aqts_.begin(); ait_ != aqts_.end(); ++ait_) {
120  if (ait_->second->aqtmode() == FvAcquisitionThread::AqtCyclic) {
121  //logger->log_debug(name(), "Waking Thread %s", ait_->second->name());
122  ait_->second->wakeup(aqt_barrier_);
123  }
124  }
125 
     // The base thread takes part in the barrier itself (the barrier is
     // created with "number of cyclic threads + 1").
126  aqt_barrier_->wait();
127 
128  // Check for aqt timeouts
129  for (ait_ = aqts_.begin(); ait_ != aqts_.end();) {
130  if (ait_->second->vision_threads->empty()
131  && (ait_->second->vision_threads->empty_time() > aqt_timeout_)) {
132  logger->log_info(name(), "Acquisition thread %s timed out, destroying", ait_->second->name());
133 
134  thread_collector->remove(ait_->second);
135  delete ait_->second;
     // Post-increment: erase() gets the old position while ait_ has
     // already moved on to the next element.
136  aqts_.erase(ait_++);
137  } else {
138  ++ait_;
139  }
140  }
141 
     // Handle the threads recorded by thread_started() since the last
     // iteration; each entry is erased once it has been processed.
142  started_threads_.lock();
143  fawkes::LockMap<Thread *, FvAcquisitionThread *>::iterator stit = started_threads_.begin();
144  while (stit != started_threads_.end()) {
145  logger->log_info(name(),
146  "Thread %s has been started, %zu",
147  stit->second->name(),
148  started_threads_.size());
149 
150  // if the thread is registered in that aqt mark it running
151  stit->second->vision_threads->set_thread_running(stit->first);
152 
153  if (stit->second->vision_threads->has_cyclic_thread()) {
154  // Make thread actually capture data
155  stit->second->set_enabled(true);
156 
157  if (stit->second->aqtmode() != FvAcquisitionThread::AqtCyclic) {
158  logger->log_info(name(),
159  "Switching acquisition thread %s to cyclic mode",
160  stit->second->name());
161 
     // The mode is changed while the acquisition thread is stopped:
     // cancel and join it, set the mode, then start it again.
162  stit->second->prepare_finalize();
163  stit->second->cancel();
164  stit->second->join();
165  stit->second->set_aqtmode(FvAcquisitionThread::AqtCyclic);
166  stit->second->start();
167  stit->second->cancel_finalize();
168  }
169  } else if (stit->second->vision_threads->has_cont_thread()) {
170  // Make thread actually capture data
171  stit->second->set_enabled(true);
172 
173  if (stit->second->aqtmode() != FvAcquisitionThread::AqtContinuous) {
174  logger->log_info(name(),
175  "Switching acquisition thread %s to continuous mode",
176  stit->second->name());
     // Same stop / change mode / restart sequence as above.
177  stit->second->prepare_finalize();
178  stit->second->cancel();
179  stit->second->join();
180  stit->second->set_aqtmode(FvAcquisitionThread::AqtContinuous);
181  stit->second->start();
182  stit->second->cancel_finalize();
183  }
184  } else {
185  logger->log_warn(name(),
186  "Acquisition thread %s has no threads while we expected some",
187  stit->second->name());
188  // Make thread stop capturing data
189  stit->second->set_enabled(false);
190  }
191 
     // NOTE(review): stittmp is declared on the listing line missing here;
     // presumably a copy of stit taken before the increment - confirm.
193  ++stit;
194  started_threads_.erase(stittmp);
195  }
196  started_threads_.unlock();
197 
198  // Re-create barrier as necessary after _adding_ threads
199  unsigned int num_cyclic_threads = 0;
200  for (ait_ = aqts_.begin(); ait_ != aqts_.end(); ++ait_) {
201  if (ait_->second->vision_threads->has_cyclic_thread()) {
202  ++num_cyclic_threads;
203  }
204  }
205  cond_recreate_barrier(num_cyclic_threads);
206 
     // Release the prepare-finalize hold taken at the top of the loop.
207  for (ait_ = aqts_.begin(); ait_ != aqts_.end(); ++ait_) {
208  ait_->second->set_vt_prepfin_hold(false);
209  }
210 
211  aqts_.unlock();
212 }
213 
214 /** Get vision master.
215  * @return vision master
216  */
217 VisionMaster *
219 {
220  return this;
221 }
222 
223 Camera *
224 FvBaseThread::register_for_camera(const char *camera_string, Thread *thread, colorspace_t cspace)
225 {
226  Camera *c = NULL;
227  aqts_.lock();
228 
229  logger->log_info(name(), "Thread '%s' registers for camera '%s'", thread->name(), camera_string);
230 
231  VisionAspect *vision_thread = dynamic_cast<VisionAspect *>(thread);
232  if (vision_thread == NULL) {
233  throw TypeMismatchException("Thread is not a vision thread");
234  }
235 
236  CameraArgumentParser *cap = new CameraArgumentParser(camera_string);
237  try {
238  std::string id = cap->cam_type() + "." + cap->cam_id();
239  if (aqts_.find(id) != aqts_.end()) {
240  // this camera has already been loaded
241  c = aqts_[id]->camera_instance(cspace,
242  (vision_thread->vision_thread_mode()
243  == VisionAspect::CONTINUOUS));
244 
245  aqts_[id]->vision_threads->add_waiting_thread(thread);
246 
247  } else {
248  Camera *cam = NULL;
249  try {
250  cam = CameraFactory::instance(cap);
251  cam->open();
252  } catch (Exception &e) {
253  delete cam;
254  e.append("Could not open camera");
255  throw;
256  }
257 
258  FvAcquisitionThread *aqt = new FvAcquisitionThread(id.c_str(), cam, logger, clock);
259 
260  c = aqt->camera_instance(cspace,
261  (vision_thread->vision_thread_mode() == VisionAspect::CONTINUOUS));
262 
263  aqt->vision_threads->add_waiting_thread(thread);
264 
265  aqts_[id] = aqt;
266  thread_collector->add(aqt);
267 
268  // no need to recreate barrier, by default aqts operate in continuous mode
269 
270  logger->log_info(name(),
271  "Acquisition thread '%s' started for thread '%s' and camera '%s'",
272  aqt->name(),
273  thread->name(),
274  id.c_str());
275  }
276 
277  thread->add_notification_listener(this);
278 
279  } catch (UnknownCameraTypeException &e) {
280  delete cap;
281  e.append("FvBaseVisionMaster: could not instantiate camera");
282  aqts_.unlock();
283  throw;
284  } catch (Exception &e) {
285  delete cap;
286  e.append("FvBaseVisionMaster: could not open or start camera");
287  aqts_.unlock();
288  throw;
289  }
290 
291  delete cap;
292 
293  aqts_.unlock();
294  return c;
295 }
296 
297 Camera *
298 FvBaseThread::register_for_raw_camera(const char *camera_string, Thread *thread)
299 {
300  Camera * camera = register_for_camera(camera_string, thread, CS_UNKNOWN);
301  CameraArgumentParser cap(camera_string);
302  try {
303  std::string id = cap.cam_type() + "." + cap.cam_id();
304  aqts_.lock();
305  if (aqts_.find(id) != aqts_.end()) {
306  aqts_[id]->raw_subscriber_thread = thread;
307  }
308  aqts_.unlock();
309  } catch (Exception &e) {
310  aqts_.unlock();
311  throw;
312  }
313  return camera;
314 }
315 
317 FvBaseThread::create_camctrl(const char *camera_string)
318 {
319  CameraControl *cc = CameraControlFactory::instance(camera_string);
320  if (cc) {
321  owned_controls_.lock();
322  owned_controls_.push_back(cc);
323  owned_controls_.sort();
324  owned_controls_.unique();
325  owned_controls_.unlock();
326  return cc;
327  } else {
328  throw Exception("Cannot create camera control of desired type");
329  }
330 }
331 
333 FvBaseThread::acquire_camctrl(const char *cam_string)
334 {
335  CameraArgumentParser cap(cam_string);
336  std::string id = cap.cam_type() + "." + cap.cam_id();
337 
338  // Has this camera been loaded?
339  MutexLocker lock(aqts_.mutex());
340  if (aqts_.find(id) != aqts_.end()) {
341  return CameraControlFactory::instance(aqts_[id]->get_camera());
342  } else {
343  return create_camctrl(cam_string);
344  }
345 }
346 
348 FvBaseThread::acquire_camctrl(const char *cam_string, const std::type_info &typeinf)
349 {
350  CameraArgumentParser cap(cam_string);
351  std::string id = cap.cam_type() + "." + cap.cam_id();
352 
353  // Has this camera been loaded?
354  MutexLocker lock(aqts_.mutex());
355  if (aqts_.find(id) != aqts_.end()) {
356  return CameraControlFactory::instance(typeinf, aqts_[id]->get_camera());
357  } else {
358  return create_camctrl(cam_string);
359  }
360 }
361 
362 void
364 {
365  owned_controls_.lock();
367  if ((f = std::find(owned_controls_.begin(), owned_controls_.end(), cc))
368  != owned_controls_.end()) {
369  delete *f;
370  owned_controls_.erase(f);
371  }
372  owned_controls_.unlock();
373 }
374 
375 /** Conditionally re-create barriers.
376  * Re-create barriers if the number of cyclic threads has changed.
377  * @param num_cyclic_threads new number of cyclic threads
378  */
379 void
380 FvBaseThread::cond_recreate_barrier(unsigned int num_cyclic_threads)
381 {
382  if ((num_cyclic_threads + 1) != aqt_barrier_->count()) {
383  delete aqt_barrier_;
384  aqt_barrier_ = new Barrier(num_cyclic_threads + 1); // +1 for base thread
385  }
386 }
387 
/** Unregister a thread from all acquisition threads.
 * With aqts_ locked, removes the thread from the vision thread list of
 * every acquisition thread and clears raw_subscriber_thread where it points
 * to this thread. Acquisition threads left without a cyclic vision thread
 * that are not in continuous mode are switched to continuous mode, and
 * acquisition threads left without any vision thread are disabled. Finally
 * the barrier is re-created if the number of cyclic threads changed.
 *
 * NOTE(review): listing line 389 (the function signature taking the thread)
 * is not present in this listing.
 */
388 void
390 {
391  aqts_.lock();
392  unsigned int num_cyclic_threads = 0;
393 
394  for (ait_ = aqts_.begin(); ait_ != aqts_.end(); ++ait_) {
395  // Remove thread from all aqts
396  ait_->second->vision_threads->remove_thread(thread);
397 
     // Drop the raw subscriber reference if it refers to this thread.
398  if (ait_->second->raw_subscriber_thread == thread) {
399  ait_->second->raw_subscriber_thread = NULL;
400  }
401 
402  if (ait_->second->vision_threads->has_cyclic_thread()) {
403  ++num_cyclic_threads;
404 
405  } else if (ait_->second->aqtmode() != FvAcquisitionThread::AqtContinuous) {
406  logger->log_info(name(),
407  "Switching acquisition thread %s to continuous mode "
408  "on unregister",
409  ait_->second->name());
410 
     // The mode is changed while the acquisition thread is stopped:
     // cancel and join it, set the mode, then start it again.
411  ait_->second->prepare_finalize();
412  ait_->second->cancel();
413  ait_->second->join();
414  ait_->second->set_aqtmode(FvAcquisitionThread::AqtContinuous);
415  ait_->second->start();
416  ait_->second->cancel_finalize();
417  }
418 
419  if (ait_->second->vision_threads->empty()) {
420  // Make thread stop capturing data
421  logger->log_info(name(),
422  "Disabling capturing on thread %s (no more threads)",
423  ait_->second->name());
424  ait_->second->set_enabled(false);
425  }
426  }
427  // Recreate as necessary after _removing_ threads
428  cond_recreate_barrier(num_cyclic_threads);
429 
430  aqts_.unlock();
431 }
432 
433 bool
435 {
436  aqts_.lock();
437  for (ait_ = aqts_.begin(); ait_ != aqts_.end(); ++ait_) {
438  if (ait_->second->vision_threads->has_waiting_thread(thread)) {
439  started_threads_.lock();
440  started_threads_[thread] = ait_->second;
441  started_threads_.unlock();
442  }
443  }
444  aqts_.unlock();
445 
446  return false;
447 }
448 
449 bool
451 {
452  aqts_.lock();
453  for (ait_ = aqts_.begin(); ait_ != aqts_.end(); ++ait_) {
454  ait_->second->vision_threads->remove_waiting_thread(thread);
455  }
456  aqts_.unlock();
457 
458  return false;
459 }
FvBaseThread::release_camctrl
virtual void release_camctrl(firevision::CameraControl *cc)
Definition: base_thread.cpp:363
fawkes::LockList::lock
virtual void lock() const
Lock list.
Definition: lock_list.h:128
fawkes::ThreadCollector::remove
virtual void remove(ThreadList &tl)=0
fawkes::LockMap
Definition: lock_map.h:39
fawkes::VisionAspect::vision_thread_mode
VisionThreadMode vision_thread_mode()
Get the vision thread mode of this thread.
Definition: vision.cpp:85
fawkes::LockMap::mutex
RefPtr< Mutex > mutex() const
Get access to the internal mutex.
Definition: lock_map.h:137
firevision::CameraArgumentParser::cam_id
std::string cam_id() const
Get camera ID.
Definition: camargp.cpp:132
firevision::CameraControl
Definition: control.h:35
FvBaseThread::register_for_raw_camera
virtual firevision::Camera * register_for_raw_camera(const char *camera_string, fawkes::Thread *thread)
Definition: base_thread.cpp:298
fawkes::VisionMasterAspect
Definition: vision_master.h:34
FvBaseThread::loop
virtual void loop()
Thread loop.
Definition: base_thread.cpp:101
FvAcquisitionThread
Definition: acquisition_thread.h:50
fawkes::Logger::log_info
virtual void log_info(const char *component, const char *format,...)=0
fawkes::MutexLocker
Definition: mutex_locker.h:37
fawkes::LockList
Definition: thread.h:42
FvAcquisitionThread::camera_instance
firevision::Camera * camera_instance(firevision::colorspace_t cspace, bool deep_copy)
Get a camera instance.
Definition: acquisition_thread.cpp:177
firevision::Camera::open
virtual void open()=0
fawkes::BlockedTimingAspect
Definition: blocked_timing.h:54
fawkes::Thread::name
const char * name() const
Definition: thread.h:99
FvBaseThread::thread_init_failed
virtual bool thread_init_failed(fawkes::Thread *thread)
Definition: base_thread.cpp:450
fawkes::LockMap::unlock
void unlock() const
Unlock list.
Definition: lock_map.h:113
fawkes::ClockAspect::clock
Clock * clock
Definition: clock.h:53
fawkes::ThreadCollector::add
virtual void add(ThreadList &tl)=0
FvBaseThread::FvBaseThread
FvBaseThread()
Constructor.
Definition: base_thread.cpp:55
fawkes::Exception::append
void append(const char *format,...)
Append messages to the message list.
Definition: exception.cpp:332
fawkes::TypeMismatchException
Definition: software.h:47
fawkes::LockList::unlock
virtual void unlock() const
Unlock list.
Definition: lock_list.h:142
FvAqtVisionThreads::add_waiting_thread
void add_waiting_thread(fawkes::Thread *thread)
Add a thread in waiting state.
Definition: aqt_vision_threads.cpp:59
fawkes::LoggingAspect::logger
Logger * logger
Definition: logging.h:50
fawkes::ThreadProducerAspect::thread_collector
ThreadCollector * thread_collector
Definition: thread_producer.h:50
FvAcquisitionThread::AqtContinuous
continuous mode, use if there are only continuous threads for this acquisition thread.
Definition: acquisition_thread.h:60
fawkes
FvBaseThread::thread_started
virtual bool thread_started(fawkes::Thread *thread)
Definition: base_thread.cpp:434
firevision::CameraArgumentParser::cam_type
std::string cam_type() const
Get camera type.
Definition: camargp.cpp:121
fawkes::Logger::log_warn
virtual void log_warn(const char *component, const char *format,...)=0
FvBaseThread::acquire_camctrl
virtual firevision::CameraControl * acquire_camctrl(const char *cam_string)
Definition: base_thread.cpp:333
fawkes::Thread::add_notification_listener
void add_notification_listener(ThreadNotificationListener *notification_listener)
Add notification listener.
Definition: thread.cpp:1166
firevision::CameraArgumentParser
Definition: camargp.h:39
FvBaseThread::init
virtual void init()
Initialize the thread.
Definition: base_thread.cpp:72
firevision::UnknownCameraTypeException
Definition: cam_exceptions.h:53
FvBaseThread::register_for_camera
virtual firevision::Camera * register_for_camera(const char *camera_string, fawkes::Thread *thread, firevision::colorspace_t cspace=firevision::YUV422_PLANAR)
Definition: base_thread.cpp:224
FvAcquisitionThread::AqtCyclic
cyclic mode, use if there is at least one cyclic thread for this acquisition thread.
Definition: acquisition_thread.h:58
FvAcquisitionThread::vision_threads
FvAqtVisionThreads * vision_threads
Vision threads assigned to this acquisition thread.
Definition: acquisition_thread.h:86
FvBaseThread::unregister_thread
virtual void unregister_thread(fawkes::Thread *thread)
Definition: base_thread.cpp:389
fawkes::Thread
Definition: thread.h:44
fawkes::Barrier::count
unsigned int count()
Get number of threads this barrier will wait for.
Definition: barrier.cpp:180
FvBaseThread::finalize
virtual void finalize()
Finalize the thread.
Definition: base_thread.cpp:81
firevision::Camera
Definition: camera.h:36
fawkes::LockMap::lock
void lock() const
Lock list.
Definition: lock_map.h:95
FvBaseThread::~FvBaseThread
virtual ~FvBaseThread()
Destructor.
Definition: base_thread.cpp:66
fawkes::Barrier
Definition: barrier.h:35
firevision::VisionMaster
Definition: vision_master.h:42
FvBaseThread::vision_master
virtual firevision::VisionMaster * vision_master()
Get vision master.
Definition: base_thread.cpp:218
fawkes::Barrier::wait
virtual void wait()
Wait for other threads.
Definition: barrier.cpp:157
fawkes::VisionAspect
Definition: vision.h:36
fawkes::Exception
Definition: exception.h:39