23 #include "mongodb_log_pcl_thread.h"
26 #include <core/threading/mutex_locker.h>
27 #include <utils/time/wait.h>
31 #include <mongocxx/client.hpp>
32 #include <mongocxx/exception/operation_exception.hpp>
33 #include <mongocxx/gridfs/uploader.hpp>
37 using namespace mongocxx;
67 cfg_storage_interval_ =
config->
get_float(
"/plugins/mongodb-log/pointclouds/storage-interval");
69 cfg_chunk_size_ = 2097152;
71 cfg_chunk_size_ =
config->
get_uint(
"/plugins/mongodb-log/pointclouds/chunk-size");
76 cfg_flush_after_write_ =
false;
78 cfg_flush_after_write_ =
config->
get_uint(
"/plugins/mongodb-log/pointclouds/flush-after-write");
82 std::vector<std::string> includes;
87 std::vector<std::string> excludes;
95 gridfs_ = mongodb_->database(database_).gridfs_bucket();
102 std::vector<std::string>::iterator p;
103 std::vector<std::string>::iterator f;
104 for (p = pcls.begin(); p != pcls.end(); ++p) {
105 bool include = includes.empty();
107 for (f = includes.begin(); f != includes.end(); ++f) {
108 if (fnmatch(f->c_str(), p->c_str(), 0) != FNM_NOMATCH) {
116 for (f = excludes.begin(); f != excludes.end(); ++f) {
117 if (fnmatch(f->c_str(), p->c_str(), 0) != FNM_NOMATCH) {
131 std::string topic_name = std::string(
"PointClouds.") + *p;
133 while ((pos = topic_name.find_first_of(
" -", pos)) != std::string::npos) {
134 topic_name.replace(pos, 1,
"_");
138 pi.topic_name = topic_name;
142 std::string frame_id;
143 unsigned int width, height;
146 adapter_->
get_info(*p, width, height, frame_id, is_dense, fieldinfo);
147 pi.msg.header.frame_id = frame_id;
148 pi.msg.width = width;
149 pi.msg.height = height;
150 pi.msg.is_dense = is_dense;
151 pi.msg.fields.clear();
152 pi.msg.fields.resize(fieldinfo.size());
153 for (
unsigned int i = 0; i < fieldinfo.size(); ++i) {
154 pi.msg.fields[i].name = fieldinfo[i].name;
155 pi.msg.fields[i].offset = fieldinfo[i].offset;
156 pi.msg.fields[i].datatype = fieldinfo[i].datatype;
157 pi.msg.fields[i].count = fieldinfo[i].count;
163 wait_ =
new TimeWait(
clock, cfg_storage_interval_ * 1000000.);
164 mutex_ =
new Mutex();
188 std::map<std::string, PointCloudInfo>::iterator p;
189 unsigned int num_stored = 0;
190 for (p = pcls_.begin(); p != pcls_.end(); ++p) {
191 PointCloudInfo &pi = p->second;
192 std::string frame_id;
193 unsigned int width, height;
195 size_t point_size, num_points;
198 p->first, frame_id, width, height, time, &point_data, point_size, num_points);
199 size_t data_size = point_size /
sizeof(uint8_t) * num_points;
201 if (pi.last_sent != time) {
206 using namespace bsoncxx::builder;
207 basic::document document;
208 std::stringstream
name;
210 auto uploader = gridfs_.open_upload_stream(
name.str());
211 uploader.write((uint8_t *)point_data, data_size);
212 auto result = uploader.close();
213 document.append(basic::kvp(
"timestamp", static_cast<int64_t>(time.
in_msec())));
214 document.append(basic::kvp(
"pointcloud", [&](basic::sub_document subdoc) {
215 subdoc.append(basic::kvp(
"frame_id", pi.msg.header.frame_id));
216 subdoc.append(basic::kvp(
"is_dense", pi.msg.is_dense));
217 subdoc.append(basic::kvp(
"width", static_cast<int32_t>(width)));
218 subdoc.append(basic::kvp(
"height", static_cast<int32_t>(height)));
219 subdoc.append(basic::kvp(
"point_size", static_cast<int32_t>(point_size)));
220 subdoc.append(basic::kvp(
"num_points", static_cast<int32_t>(num_points)));
222 subdoc.append(basic::kvp(
"data", result.id()));
223 subdoc.append(basic::kvp(
"field_info", [pi](basic::sub_array array) {
224 for (uint i = 0; i < pi.msg.fields.size(); i++) {
225 basic::document field_info_doc;
226 field_info_doc.append(basic::kvp(
"name", pi.msg.fields[i].name));
227 field_info_doc.append(
228 basic::kvp(
"offset", static_cast<int32_t>(pi.msg.fields[i].offset)));
229 field_info_doc.append(
230 basic::kvp(
"datatype", static_cast<int32_t>(pi.msg.fields[i].datatype)));
231 field_info_doc.append(
232 basic::kvp(
"count", static_cast<int32_t>(pi.msg.fields[i].count)));
233 array.append(field_info_doc);
239 mongodb_->database(database_)[pi.topic_name].insert_one(document.view());
241 }
catch (operation_exception &e) {
243 "Failed to insert into %s: %s",
249 float diff = (end - &
start) * 1000.;
251 "Stored point cloud %s (time %li) in %.1f ms",
265 "Stored %u of %zu point clouds in %.1f ms",
268 (loop_end - &loop_start) * 1000.);
270 if (cfg_flush_after_write_) {
272 using namespace bsoncxx::builder;
273 basic::document flush_cmd;
274 flush_cmd.append(basic::kvp(
"fsync", 1));
275 flush_cmd.append(basic::kvp(
"async", 1));
276 auto reply =
mongodb_client->database(
"admin").run_command(flush_cmd.view());
277 if (reply.view()[
"ok"].get_double() != 1) {
280 reply.view()[
"errmsg"].get_utf8().value.to_string().c_str());