Fawkes API  Fawkes Development Version
mongodb_log_pcl_thread.cpp
1 
2 /***************************************************************************
3  * mongodb_log_pcl_thread.cpp - Thread to log point clouds to MongoDB
4  *
5  * Created: Mon Nov 07 02:58:40 2011
6  * Copyright 2011-2017 Tim Niemueller [www.niemueller.de]
7  * 2012 Bastian Klingen
8  ****************************************************************************/
9 
10 /* This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18  * GNU Library General Public License for more details.
19  *
20  * Read the full text in the LICENSE.GPL file in the doc directory.
21  */
22 
23 #include "mongodb_log_pcl_thread.h"
24 
25 // Fawkes
26 #include <core/threading/mutex_locker.h>
27 #include <utils/time/wait.h>
28 
29 // from MongoDB
30 #include <fnmatch.h>
31 #include <mongocxx/client.hpp>
32 #include <mongocxx/exception/operation_exception.hpp>
33 #include <mongocxx/gridfs/uploader.hpp>
34 #include <unistd.h>
35 
36 using namespace fawkes;
37 using namespace mongocxx;
38 
39 /** @class MongoLogPointCloudThread "mongodb_log_pcl_thread.h"
40  * Thread to store point clouds to MongoDB.
41  * @author Tim Niemueller
42  * @author Bastian Klingen
43  */
44 
45 /** Constructor. */
47 : Thread("MongoLogPointCloudThread", Thread::OPMODE_CONTINUOUS), MongoDBAspect("default")
48 {
50 }
51 
52 /** Destructor. */
54 {
55 }
56 
57 void
59 {
60  database_ = "fflog";
61  try {
62  database_ = config->get_string("/plugins/mongodb-log/database");
63  } catch (Exception &e) {
64  logger->log_info(name(), "No database configured, writing to %s", database_.c_str());
65  }
66 
67  cfg_storage_interval_ = config->get_float("/plugins/mongodb-log/pointclouds/storage-interval");
68 
69  cfg_chunk_size_ = 2097152; // 2 MB
70  try {
71  cfg_chunk_size_ = config->get_uint("/plugins/mongodb-log/pointclouds/chunk-size");
72  } catch (Exception &e) {
73  } // ignored, use default
74  logger->log_info(name(), "Chunk size: %u", cfg_chunk_size_);
75 
76  cfg_flush_after_write_ = false;
77  try {
78  cfg_flush_after_write_ = config->get_uint("/plugins/mongodb-log/pointclouds/flush-after-write");
79  } catch (Exception &e) {
80  } // ignored, use default
81 
82  std::vector<std::string> includes;
83  try {
84  includes = config->get_strings("/plugins/mongodb-log/pointclouds/includes");
85  } catch (Exception &e) {
86  } // ignored, no include rules
87  std::vector<std::string> excludes;
88 
89  try {
90  excludes = config->get_strings("/plugins/mongodb-log/pointclouds/excludes");
91  } catch (Exception &e) {
92  } // ignored, no include rules
93 
94  mongodb_ = mongodb_client;
95  gridfs_ = mongodb_->database(database_).gridfs_bucket();
96  //gridfs_->setChunkSize(cfg_chunk_size_);
97 
98  adapter_ = new PointCloudAdapter(pcl_manager, logger);
99 
100  std::vector<std::string> pcls = pcl_manager->get_pointcloud_list();
101 
102  std::vector<std::string>::iterator p;
103  std::vector<std::string>::iterator f;
104  for (p = pcls.begin(); p != pcls.end(); ++p) {
105  bool include = includes.empty();
106  if (!include) {
107  for (f = includes.begin(); f != includes.end(); ++f) {
108  if (fnmatch(f->c_str(), p->c_str(), 0) != FNM_NOMATCH) {
109  logger->log_debug(name(), "Include match %s to %s", f->c_str(), p->c_str());
110  include = true;
111  break;
112  }
113  }
114  }
115  if (include) {
116  for (f = excludes.begin(); f != excludes.end(); ++f) {
117  if (fnmatch(f->c_str(), p->c_str(), 0) != FNM_NOMATCH) {
118  logger->log_debug(name(), "Exclude match %s to %s", f->c_str(), p->c_str());
119  include = false;
120  break;
121  }
122  }
123  }
124  if (!include) {
125  logger->log_info(name(), "Excluding point cloud %s", p->c_str());
126  continue;
127  }
128 
129  PointCloudInfo pi;
130 
131  std::string topic_name = std::string("PointClouds.") + *p;
132  size_t pos = 0;
133  while ((pos = topic_name.find_first_of(" -", pos)) != std::string::npos) {
134  topic_name.replace(pos, 1, "_");
135  pos = pos + 1;
136  }
137 
138  pi.topic_name = topic_name;
139 
140  logger->log_info(name(), "MongoLog of point cloud %s", p->c_str());
141 
142  std::string frame_id;
143  unsigned int width, height;
144  bool is_dense;
146  adapter_->get_info(*p, width, height, frame_id, is_dense, fieldinfo);
147  pi.msg.header.frame_id = frame_id;
148  pi.msg.width = width;
149  pi.msg.height = height;
150  pi.msg.is_dense = is_dense;
151  pi.msg.fields.clear();
152  pi.msg.fields.resize(fieldinfo.size());
153  for (unsigned int i = 0; i < fieldinfo.size(); ++i) {
154  pi.msg.fields[i].name = fieldinfo[i].name;
155  pi.msg.fields[i].offset = fieldinfo[i].offset;
156  pi.msg.fields[i].datatype = fieldinfo[i].datatype;
157  pi.msg.fields[i].count = fieldinfo[i].count;
158  }
159 
160  pcls_[*p] = pi;
161  }
162 
163  wait_ = new TimeWait(clock, cfg_storage_interval_ * 1000000.);
164  mutex_ = new Mutex();
165 }
166 
167 bool
169 {
170  mutex_->lock();
171  return true;
172 }
173 
174 void
176 {
177  delete adapter_;
178  delete wait_;
179  delete mutex_;
180 }
181 
182 void
184 {
185  MutexLocker lock(mutex_);
186  fawkes::Time loop_start(clock);
187  wait_->mark_start();
188  std::map<std::string, PointCloudInfo>::iterator p;
189  unsigned int num_stored = 0;
190  for (p = pcls_.begin(); p != pcls_.end(); ++p) {
191  PointCloudInfo &pi = p->second;
192  std::string frame_id;
193  unsigned int width, height;
194  void * point_data;
195  size_t point_size, num_points;
196  fawkes::Time time;
197  adapter_->get_data(
198  p->first, frame_id, width, height, time, &point_data, point_size, num_points);
199  size_t data_size = point_size / sizeof(uint8_t) * num_points;
200 
201  if (pi.last_sent != time) {
202  pi.last_sent = time;
203 
205 
206  using namespace bsoncxx::builder;
207  basic::document document;
208  std::stringstream name;
209  name << pi.topic_name << "_" << time.in_msec();
210  auto uploader = gridfs_.open_upload_stream(name.str());
211  uploader.write((uint8_t *)point_data, data_size);
212  auto result = uploader.close();
213  document.append(basic::kvp("timestamp", static_cast<int64_t>(time.in_msec())));
214  document.append(basic::kvp("pointcloud", [&](basic::sub_document subdoc) {
215  subdoc.append(basic::kvp("frame_id", pi.msg.header.frame_id));
216  subdoc.append(basic::kvp("is_dense", pi.msg.is_dense));
217  subdoc.append(basic::kvp("width", static_cast<int32_t>(width)));
218  subdoc.append(basic::kvp("height", static_cast<int32_t>(height)));
219  subdoc.append(basic::kvp("point_size", static_cast<int32_t>(point_size)));
220  subdoc.append(basic::kvp("num_points", static_cast<int32_t>(num_points)));
221  // TODO: We store the ID, is this correct?
222  subdoc.append(basic::kvp("data", result.id()));
223  subdoc.append(basic::kvp("field_info", [pi](basic::sub_array array) {
224  for (uint i = 0; i < pi.msg.fields.size(); i++) {
225  basic::document field_info_doc;
226  field_info_doc.append(basic::kvp("name", pi.msg.fields[i].name));
227  field_info_doc.append(
228  basic::kvp("offset", static_cast<int32_t>(pi.msg.fields[i].offset)));
229  field_info_doc.append(
230  basic::kvp("datatype", static_cast<int32_t>(pi.msg.fields[i].datatype)));
231  field_info_doc.append(
232  basic::kvp("count", static_cast<int32_t>(pi.msg.fields[i].count)));
233  array.append(field_info_doc);
234  }
235  }));
236  }));
237 
238  try {
239  mongodb_->database(database_)[pi.topic_name].insert_one(document.view());
240  ++num_stored;
241  } catch (operation_exception &e) {
242  logger->log_warn(this->name(),
243  "Failed to insert into %s: %s",
244  collection_.c_str(),
245  e.what());
246  }
247 
248  fawkes::Time end(clock);
249  float diff = (end - &start) * 1000.;
250  logger->log_debug(this->name(),
251  "Stored point cloud %s (time %li) in %.1f ms",
252  p->first.c_str(),
253  time.in_msec(),
254  diff);
255 
256  } else {
257  logger->log_debug(this->name(), "Point cloud %s did not change", p->first.c_str());
258  //adapter_->close(p->first);
259  }
260  }
261  mutex_->unlock();
262  // -1 to subtract "NO PARENT" pseudo cache
263  fawkes::Time loop_end(clock);
264  logger->log_debug(name(),
265  "Stored %u of %zu point clouds in %.1f ms",
266  num_stored,
267  pcls_.size(),
268  (loop_end - &loop_start) * 1000.);
269 
270  if (cfg_flush_after_write_) {
271  // flush database
272  using namespace bsoncxx::builder;
273  basic::document flush_cmd;
274  flush_cmd.append(basic::kvp("fsync", 1));
275  flush_cmd.append(basic::kvp("async", 1));
276  auto reply = mongodb_client->database("admin").run_command(flush_cmd.view());
277  if (reply.view()["ok"].get_double() != 1) {
278  logger->log_warn(name(),
279  "fsync error: %s",
280  reply.view()["errmsg"].get_utf8().value.to_string().c_str());
281  }
282  }
283  wait_->wait();
284 }
fawkes::Mutex::lock
void lock()
Lock this mutex.
Definition: mutex.cpp:91
MongoLogPointCloudThread::prepare_finalize_user
virtual bool prepare_finalize_user()
Prepare finalization user implementation.
Definition: mongodb_log_pcl_thread.cpp:168
fawkes::Time::in_msec
long in_msec() const
Convert the stored time into milli-seconds.
Definition: time.cpp:233
fawkes::MongoDBAspect
Definition: mongodb.h:41
fawkes::Thread::set_prepfin_conc_loop
void set_prepfin_conc_loop(bool concurrent=true)
Set concurrent execution of prepare_finalize() and loop().
Definition: thread.cpp:720
fawkes::Mutex
Definition: mutex.h:36
PointCloudAdapter::V_PointFieldInfo
std::vector< PointFieldInfo > V_PointFieldInfo
Vector of PointFieldInfo.
Definition: pcl_adapter.h:65
fawkes::MongoDBAspect::mongodb_client
mongocxx::client * mongodb_client
Definition: mongodb.h:57
fawkes::Logger::log_info
virtual void log_info(const char *component, const char *format,...)=0
fawkes::MutexLocker
Definition: mutex_locker.h:37
fawkes::Thread::name
const char * name() const
Definition: thread.h:99
fawkes::ClockAspect::clock
Clock * clock
Definition: clock.h:53
fawkes::Mutex::unlock
void unlock()
Unlock the mutex.
Definition: mutex.cpp:135
MongoLogPointCloudThread::init
virtual void init()
Initialize the thread.
Definition: mongodb_log_pcl_thread.cpp:58
fawkes::LoggingAspect::logger
Logger * logger
Definition: logging.h:50
fawkes::TimeWait::wait
void wait()
Wait until minimum loop time has been reached.
Definition: wait.cpp:82
fawkes
MongoLogPointCloudThread::MongoLogPointCloudThread
MongoLogPointCloudThread()
Constructor.
Definition: mongodb_log_pcl_thread.cpp:46
fawkes::Logger::log_warn
virtual void log_warn(const char *component, const char *format,...)=0
fawkes::Configuration::get_strings
virtual std::vector< std::string > get_strings(const char *path)=0
fawkes::TimeWait::mark_start
void mark_start()
Mark start of loop.
Definition: wait.cpp:72
fawkes::ConfigurableAspect::config
Configuration * config
Definition: configurable.h:50
fawkes::PointCloudManager::get_pointcloud_list
std::vector< std::string > get_pointcloud_list() const
Get list of point cloud IDs.
Definition: pointcloud_manager.cpp:95
fawkes::Time
Definition: time.h:96
PointCloudAdapter
Definition: pcl_adapter.h:37
fawkes::Configuration::get_float
virtual float get_float(const char *path)=0
fawkes::Thread
Definition: thread.h:44
fawkes::TimeWait
Definition: wait.h:36
PointCloudAdapter::get_data
void get_data(const std::string &id, std::string &frame_id, unsigned int &width, unsigned int &height, fawkes::Time &time, void **data_ptr, size_t &point_size, size_t &num_points)
Get current data of point cloud.
Definition: pcl_adapter.cpp:183
PointCloudAdapter::get_info
void get_info(const std::string &id, unsigned int &width, unsigned int &height, std::string &frame_id, bool &is_dense, V_PointFieldInfo &pfi)
Get info about point cloud.
Definition: pcl_adapter.cpp:141
fawkes::Thread::start
void start(bool wait=true)
Call this method to start the thread.
Definition: thread.cpp:503
fawkes::Configuration::get_uint
virtual unsigned int get_uint(const char *path)=0
MongoLogPointCloudThread::finalize
virtual void finalize()
Finalize the thread.
Definition: mongodb_log_pcl_thread.cpp:175
fawkes::Configuration::get_string
virtual std::string get_string(const char *path)=0
MongoLogPointCloudThread::loop
virtual void loop()
Code to execute in the thread.
Definition: mongodb_log_pcl_thread.cpp:183
fawkes::PointCloudAspect::pcl_manager
PointCloudManager * pcl_manager
Manager to distribute and access point clouds.
Definition: pointcloud.h:46
MongoLogPointCloudThread::~MongoLogPointCloudThread
virtual ~MongoLogPointCloudThread()
Destructor.
Definition: mongodb_log_pcl_thread.cpp:53
fawkes::Logger::log_debug
virtual void log_debug(const char *component, const char *format,...)=0
fawkes::Exception
Definition: exception.h:39