22 #include "pcl_db_store_thread.h"
24 #include <blackboard/utils/on_message_waker.h>
25 #include <interfaces/PclDatabaseStoreInterface.h>
26 #include <pcl_utils/pcl_adapter.h>
29 #include <bsoncxx/builder/basic/document.hpp>
30 #include <mongocxx/client.hpp>
31 #include <mongocxx/exception/operation_exception.hpp>
32 #include <mongocxx/gridfs/bucket.hpp>
33 #include <mongocxx/gridfs/uploader.hpp>
35 #define CFG_PREFIX "/perception/pcl-db/"
38 using namespace mongocxx;
88 if (store_if_->msgq_empty())
91 if (PclDatabaseStoreInterface::StoreMessage *msg = store_if_->msgq_first_safe(msg)) {
92 store_if_->set_final(
false);
93 store_if_->set_msgid(msg->id());
94 store_if_->set_error(
"");
97 std::string msg_database = msg->database();
98 std::string msg_collection = msg->collection();
102 std::string database = (msg_database !=
"") ? msg_database : cfg_database_;
103 std::string collection = database +
".";
104 if (msg_collection ==
"") {
105 collection +=
"pcls";
106 }
else if (msg_collection.compare(0, 3,
"fs.") == 0) {
107 errmsg =
"Passed in collection uses GridFS namespace";
110 collection += msg->collection();
114 store_pointcloud(msg->pcl_id(), database, collection, errmsg);
116 store_if_->set_error(errmsg.c_str());
117 store_if_->set_final(
true);
122 store_if_->msgq_pop();
126 PointCloudDBStoreThread::store_pointcloud(std::string pcl_id,
127 std::string database,
128 std::string collection,
132 errmsg =
"PointCloud does not exist";
136 std::string frame_id;
137 unsigned int width, height;
140 size_t point_size, num_points;
155 size_t data_size = point_size * num_points;
159 std::stringstream
name;
161 auto uploader = gridfs.open_upload_stream(
name.str());
162 uploader.write(static_cast<uint8_t *>(point_data), data_size);
163 auto result = uploader.close();
164 using namespace bsoncxx::builder;
165 basic::document document;
166 document.append(basic::kvp(
"timestamp", static_cast<int64_t>(time.
in_msec())));
167 document.append(basic::kvp(
"pointcloud", [&](basic::sub_document subdoc) {
168 subdoc.append(basic::kvp(
"frame_id", frame_id));
169 subdoc.append(basic::kvp(
"is_dense", is_dense));
170 subdoc.append(basic::kvp(
"width", static_cast<int64_t>(width)));
171 subdoc.append(basic::kvp(
"height", static_cast<int64_t>(height)));
172 subdoc.append(basic::kvp(
"point_size", static_cast<int64_t>(point_size)));
173 subdoc.append(basic::kvp(
"num_points", static_cast<int64_t>(num_points)));
174 subdoc.append(basic::kvp(
"data", [&](basic::sub_document datadoc) {
175 datadoc.append(basic::kvp(
"id", result.id()));
176 datadoc.append(basic::kvp(
"filename",
name.str()));
178 subdoc.append(basic::kvp(
"field_info", [fieldinfo](basic::sub_array fi_array) {
179 for (
auto fi : fieldinfo) {
180 basic::document fi_doc;
181 fi_doc.append(basic::kvp(
"name", fi.name));
182 fi_doc.append(basic::kvp(
"offset", static_cast<int64_t>(fi.offset)));
183 fi_doc.append(basic::kvp(
"datatype", fi.datatype));
184 fi_doc.append(basic::kvp(
"count", static_cast<int64_t>(fi.count)));
190 mongodb_client->database(database)[collection].insert_one(document.view());
191 }
catch (mongocxx::operation_exception &e) {
192 logger->
log_warn(this->
name(),
"Failed to insert into %s: %s", collection.c_str(), e.what());