22 #include "mongodb_log_tf_thread.h"
24 #include <core/threading/mutex_locker.h>
25 #include <plugins/mongodb/aspect/mongodb_conncreator.h>
26 #include <tf/time_cache.h>
27 #include <utils/time/wait.h>
29 #include <bsoncxx/builder/basic/document.hpp>
31 #include <mongocxx/client.hpp>
32 #include <mongocxx/exception/operation_exception.hpp>
34 using namespace mongocxx;
// Base-class initialization: the thread is registered under the name
// "MongoLogTransformsThread" and runs in continuous operation mode.
47 :
Thread(
"MongoLogTransformsThread",
Thread::OPMODE_CONTINUOUS),
// Read the name of the transforms collection from the configuration.
71 collection_ =
config->
get_string(
"/plugins/mongodb-log/transforms/collection");
// Report the collection name in use. NOTE(review): the message text suggests
// this is a fallback path for a missing config value — confirm against the
// surrounding control flow.
73 logger->
log_info(
name(),
"No transforms collection configured, using %s", collection_.c_str());
// Prefix the collection name with the database name ("<database>.<collection>").
// NOTE(review): store() later uses collection_ as the argument of
// database(database_)[...]; confirm that a database-qualified name is what
// is intended there.
75 collection_ = database_ +
"." + collection_;
// Interval between storage runs, read from the configuration.
// Presumably in seconds (it is scaled by 1,000,000 for TimeWait below and
// subtracted from timestamps in the loop) — TODO confirm units.
77 cfg_storage_interval_ =
config->
get_float(
"/plugins/mongodb-log/transforms/storage-interval");
// Check for a non-positive storage interval.
79 if (cfg_storage_interval_ <= 0.) {
// Waiter constructed from the clock and the storage interval multiplied by 1e6.
84 wait_ =
new TimeWait(
clock, cfg_storage_interval_ * 1000000.);
// Per-cache time range (start and end) of the transforms handled in this run;
// both vectors are indexed with the same index as the cache list.
108 std::vector<fawkes::Time> tf_range_start;
109 std::vector<fawkes::Time> tf_range_end;
// One slot per cache for the cloned cache data, initialized with
// default-constructed pointers.
114 std::vector<tf::TimeCacheInterfacePtr> copies(caches.size(), tf::TimeCacheInterfacePtr());
116 const size_t n_caches = caches.size();
// Keep the remembered range ends in step with the number of caches;
// entries added by the resize start at time (0, 0).
119 if (last_tf_range_end_.size() != n_caches) {
120 last_tf_range_end_.resize(n_caches,
fawkes::Time(0, 0));
// num_transforms accumulates the list lengths of the cloned caches.
// num_upd_caches presumably counts the caches that had new data — verify
// where it is incremented.
123 unsigned int num_transforms = 0;
124 unsigned int num_upd_caches = 0;
126 for (
size_t i = 0; i < n_caches; ++i) {
// Latest timestamp currently held by this cache.
128 tf_range_end[i] = caches[i]->get_latest_timestamp();
// Only proceed if the latest timestamp differs from the one remembered
// for this cache.
129 if (last_tf_range_end_[i] != tf_range_end[i]) {
// Ignore caches whose latest timestamp is zero.
131 if (!tf_range_end[i].is_zero()) {
// The range starts one storage interval before the latest timestamp,
// but not before the end of the previously remembered range.
132 tf_range_start[i] = tf_range_end[i] - cfg_storage_interval_;
133 if (last_tf_range_end_[i] > tf_range_start[i]) {
134 tf_range_start[i] = last_tf_range_end_[i];
// Clone the cache with the range start as argument (presumably limiting the
// copy to entries from that time on — confirm in tf::TimeCache) and remember
// the new range end for the next run.
137 copies[i] = caches[i]->clone(tf_range_start[i]);
138 last_tf_range_end_[i] = tf_range_end[i];
140 num_transforms += copies[i]->get_list_length();
// Pass the cloned caches together with their time ranges on for storage.
146 store(copies, tf_range_start, tf_range_end);
152 "%u transforms for %u updated frames stored in %.1f ms",
// Time difference between loop end and loop start, scaled by 1000 for the
// millisecond output above (assumes the difference is in seconds — TODO confirm).
155 (loop_end - &loop_start) * 1000.);
// Build one BSON document per cache and insert it into the MongoDB collection.
//
// caches: cache objects whose stored transforms are written out.
// from:   per-cache start of the covered time range (same index as caches).
// to:     per-cache end of the covered time range (same index as caches).
//
// Each document carries the range timestamps in milliseconds, the frame and
// child frame of the first stored transform, and an array "transforms" with
// one sub-document per stored transform.
160 MongoLogTransformsThread::store(std::vector<tf::TimeCacheInterfacePtr> &caches,
161 std::vector<fawkes::Time> & from,
162 std::vector<fawkes::Time> & to)
166 for (
size_t i = 0; i < caches.size(); ++i) {
167 tf::TimeCacheInterfacePtr tc = caches[i];
171 using namespace bsoncxx::builder;
172 basic::document document;
// "timestamp" and "timestamp_from" both hold the range start; all three
// values are millisecond counts stored as 64-bit integers.
174 document.append(basic::kvp(
"timestamp", static_cast<int64_t>(from[i].in_msec())));
175 document.append(basic::kvp(
"timestamp_from", static_cast<int64_t>(from[i].in_msec())));
176 document.append(basic::kvp(
"timestamp_to", static_cast<int64_t>(to[i].in_msec())));
177 const tf::TimeCache::L_TransformStorage &storage = tc->get_storage();
// Check for a cache without stored transforms before front() is used below.
179 if (storage.empty()) {
// Document-level frame names are taken from the first stored transform;
// frame_map translates the stored frame IDs into the values written out.
189 document.append(basic::kvp(
"frame", frame_map[storage.front().frame_id]));
190 document.append(basic::kvp(
"child_frame", frame_map[storage.front().child_frame_id]));
// NOTE(review): storage and frame_map are captured by value, so the closure
// holds copies of both; consider capturing by reference if lifetimes allow.
197 document.append(basic::kvp(
"transforms", [storage, frame_map](basic::sub_array array) {
198 for (
auto s = storage.begin(); s != storage.end(); ++s) {
// One sub-document per stored transform.
214 basic::document tf_doc;
215 tf_doc.append(basic::kvp(
"timestamp", static_cast<int64_t>(s->stamp.in_msec())));
216 tf_doc.append(basic::kvp(
"frame", frame_map[s->frame_id]));
217 tf_doc.append(basic::kvp(
"child_frame", frame_map[s->child_frame_id]));
// Rotation is written as an array in the order x, y, z, w.
218 tf_doc.append(basic::kvp(
"rotation", [s](basic::sub_array rot_array) {
219 rot_array.append(s->rotation.x());
220 rot_array.append(s->rotation.y());
221 rot_array.append(s->rotation.z());
222 rot_array.append(s->rotation.w());
// Translation is written as an array in the order x, y, z.
224 tf_doc.append(basic::kvp(
"translation", [s](basic::sub_array trans_array) {
225 trans_array.append(s->translation.x());
226 trans_array.append(s->translation.y());
227 trans_array.append(s->translation.z());
229 array.append(tf_doc);
// Insert the finished document into collection_ of database_.
234 mongodb_client->database(database_)[collection_].insert_one(document.view());
235 }
// MongoDB operation failures are caught first, then any other std::exception.
catch (operation_exception &e) {
238 }
catch (std::exception &e) {