23 #include "mongodb_log_bb_thread.h"
25 #include <core/threading/mutex_locker.h>
26 #include <plugins/mongodb/aspect/mongodb_conncreator.h>
30 #include <mongocxx/client.hpp>
31 #include <mongocxx/exception/operation_exception.hpp>
33 using namespace mongocxx;
// Glob patterns selecting which interfaces get logged. The code that fills
// this list is on lines not visible here.
66 std::vector<std::string> includes;
// No include pattern present: fall back to the catch-all "*".
76 if (includes.empty()) {
77 includes.push_back(
"*");
80 std::vector<std::string>::iterator i;
81 std::vector<std::string>::iterator e;
// Process each include pattern in turn.
82 for (i = includes.begin(); i != includes.end(); ++i) {
85 std::list<Interface *> current_interfaces =
88 std::list<Interface *>::iterator i; // shadows the outer pattern iterator `i`
89 for (i = current_interfaces.begin(); i != current_interfaces.end(); ++i) {
// Test the interface id against every exclude glob; fnmatch() yields
// FNM_NOMATCH when the pattern does not match. What happens on a match is
// on lines not visible here.
91 for (e = excludes_.begin(); e != excludes_.end(); ++e) {
92 if (fnmatch(e->c_str(), (*i)->id(), 0) != FNM_NOMATCH) {
// Create a listener for this interface and store it under the interface uid.
// NOTE(review): `mc` is defined on a line not shown — presumably a freshly
// created MongoDB client; confirm.
104 listeners_[(*i)->uid()] =
105 new InterfaceListener(
blackboard, *i, mc, database_, collections_,
logger, now_);
// Walk all registered listeners and fetch the MongoDB client each one holds.
// What is done with the client afterwards is on lines not visible here.
117 std::map<std::string, InterfaceListener *>::iterator i;
118 for (i = listeners_.begin(); i != listeners_.end(); ++i) {
119 client *mc = i->second->mongodb_client();
// Check the interface id against every exclude glob; fnmatch() yields
// FNM_NOMATCH when the pattern does not match, so any other result is
// treated as "excluded" and reported at debug level.
137 std::vector<std::string>::iterator e;
138 for (e = excludes_.begin(); e != excludes_.end(); ++e) {
139 if (fnmatch(e->c_str(), id, 0) != FNM_NOMATCH) {
140 logger->
log_debug(name(),
"Ignoring excluded interface '%s::%s'", type,
id);
// Open a reading instance of the interface identified by (type, id).
146 Interface *
interface = blackboard->open_for_reading(type, id);
// Only create a listener if none is registered under this uid yet.
147 if (listeners_.find(interface->uid()) == listeners_.end()) {
148 logger->
log_debug(name(),
"Opening new %s", interface->uid());
// The new listener receives its own client obtained from the connection manager.
149 client *mc = mongodb_connmgr->create_client();
150 listeners_[interface->uid()] =
151 new InterfaceListener(blackboard, interface, mc, database_, collections_, logger, now_);
// NOTE(review): presumably the alternative branch of the check above (the
// branch keyword is on a line not shown): warn and close the handle that
// was just opened.
153 logger->
log_warn(name(),
"Interface %s already opened", interface->uid());
154 blackboard->
close(interface);
// NOTE(review): presumably inside an exception handler whose header is not
// visible here — confirm.
157 logger->
log_warn(name(),
"Failed to open interface %s::%s, exception follows", type,
id);
// Constructor: stores the interface, derives the collection name from the
// interface type and id, rejects a name that is already in use, and registers
// this object as a data listener on the blackboard.
// Only part of the parameter list and body is visible in this excerpt.
171 MongoLogBlackboardThread::InterfaceListener::InterfaceListener(
BlackBoard * blackboard,
174 std::string & database,
183 interface_ = interface;
// Work on a copy of the interface id so it can be rewritten.
189 std::string
id = interface->
id();
// Replace every space and dash in the id with an underscore.
// NOTE(review): `pos` is declared on a line not shown.
191 while ((pos =
id.find_first_of(
" -", pos)) != std::string::npos) {
192 id.replace(pos, 1,
"_");
// Collection name is "<interface type>.<sanitized id>".
195 collection_ = std::string(interface->
type()) +
"." +
id;
// Refuse to log into a collection name that is already present in collections_.
196 if (collections_.find(collection_) != collections_.end()) {
197 throw Exception(
"Collection named %s already used, cannot log %s",
// Subscribe to data changes: add the interface to this listener's data
// interfaces and register the listener with the BBIL_FLAG_DATA flag.
202 bbil_add_data_interface(interface);
203 blackboard_->register_listener(
this, BlackBoard::BBIL_FLAG_DATA);
// Destructor: unregisters this listener from the blackboard. Any further
// cleanup is on lines not visible here.
207 MongoLogBlackboardThread::InterfaceListener::~InterfaceListener()
209 blackboard_->unregister_listener(
this);
// Data-change callback: builds one BSON document holding a timestamp plus the
// interface's field values and inserts it into database_[collection_].
// Failures of the insert are caught and logged as warnings.
// NOTE(review): the dynamic exception specification `throw()` is deprecated
// since C++11 and removed in C++20; `noexcept` is the modern spelling.
// NOTE(review): `key`, `length`, `i` and the typed value pointers (bools, ints,
// floats, doubles, bytes) are defined on lines not visible in this excerpt;
// presumably `i` iterates over the interface fields — confirm.
213 MongoLogBlackboardThread::InterfaceListener::bb_interface_data_changed(
Interface *interface)
throw()
220 using namespace bsoncxx::builder;
221 basic::document document;
// Timestamp taken from now_ in milliseconds, stored as a 64-bit integer.
222 document.append(basic::kvp(
"timestamp", static_cast<int64_t>(now_->in_msec())));
// Fields with more than one element are written as BSON arrays, all others as
// scalars. Each type below has one array form (lambda filling a sub_array)
// and one scalar form.
226 bool is_array = (length > 1);
233 document.append(basic::kvp(key, [bools, length](basic::sub_array subarray) {
234 for (
size_t l = 0; l < length; ++l) {
235 subarray.append(bools[l]);
239 document.append(basic::kvp(key, i.
get_bool()));
246 document.append(basic::kvp(key, [ints, length](basic::sub_array subarray) {
247 for (
size_t l = 0; l < length; ++l) {
248 subarray.append(ints[l]);
252 document.append(basic::kvp(key, i.
get_int8()));
259 document.append(basic::kvp(key, [ints, length](basic::sub_array subarray) {
260 for (
size_t l = 0; l < length; ++l) {
261 subarray.append(ints[l]);
265 document.append(basic::kvp(key, i.
get_uint8()));
272 document.append(basic::kvp(key, [ints, length](basic::sub_array subarray) {
273 for (
size_t l = 0; l < length; ++l) {
274 subarray.append(ints[l]);
278 document.append(basic::kvp(key, i.
get_int16()));
285 document.append(basic::kvp(key, [ints, length](basic::sub_array subarray) {
286 for (
size_t l = 0; l < length; ++l) {
287 subarray.append(ints[l]);
291 document.append(basic::kvp(key, i.
get_uint16()));
298 document.append(basic::kvp(key, [ints, length](basic::sub_array subarray) {
299 for (
size_t l = 0; l < length; ++l) {
300 subarray.append(ints[l]);
304 document.append(basic::kvp(key, i.
get_int32()));
// uint32 values are explicitly widened to int64_t before being appended.
311 document.append(basic::kvp(key, [ints, length](basic::sub_array subarray) {
312 for (
size_t l = 0; l < length; ++l) {
313 subarray.append(static_cast<int64_t>(ints[l]));
317 document.append(basic::kvp(key, static_cast<int64_t>(i.
get_uint32())));
324 document.append(basic::kvp(key, [ints, length](basic::sub_array subarray) {
325 for (
size_t l = 0; l < length; ++l) {
326 subarray.append(ints[l]);
330 document.append(basic::kvp(key, i.
get_int64()));
// uint64 values are cast to int64_t.
// NOTE(review): values above INT64_MAX do not fit and will be stored as
// negative numbers — confirm this is acceptable.
337 document.append(basic::kvp(key, [ints, length](basic::sub_array subarray) {
338 for (
size_t l = 0; l < length; ++l) {
339 subarray.append(static_cast<int64_t>(ints[l]));
343 document.append(basic::kvp(key, static_cast<int64_t>(i.
get_uint64())));
350 document.append(basic::kvp(key, [floats, length](basic::sub_array subarray) {
351 for (
size_t l = 0; l < length; ++l) {
352 subarray.append(floats[l]);
356 document.append(basic::kvp(key, i.
get_float()));
363 document.append(basic::kvp(key, [doubles, length](basic::sub_array subarray) {
364 for (
size_t l = 0; l < length; ++l) {
365 subarray.append(doubles[l]);
369 document.append(basic::kvp(key, i.
get_double()));
378 document.append(basic::kvp(key, [bytes, length](basic::sub_array subarray) {
379 for (
size_t l = 0; l < length; ++l) {
380 subarray.append(bytes[l]);
384 document.append(basic::kvp(key, i.
get_byte()));
391 document.append(basic::kvp(key, [ints, length](basic::sub_array subarray) {
392 for (
size_t l = 0; l < length; ++l) {
393 subarray.append(ints[l]);
397 document.append(basic::kvp(key, i.
get_enum()));
// Write the assembled document. A mongocxx operation_exception and any other
// std::exception are both caught and reported as warnings; the generic handler
// marks its message with "(*)".
403 mongodb_->database(database_)[collection_].insert_one(document.view());
404 }
catch (operation_exception &e) {
406 bbil_name(),
"Failed to log to %s.%s: %s", database_.c_str(), collection_.c_str(), e.what());
407 }
catch (std::exception &e) {
408 logger_->log_warn(bbil_name(),
409 "Failed to log to %s.%s: %s (*)",