Fawkes API  Fawkes Development Version
event_trigger_manager.cpp
1 /***************************************************************************
2  * event_trigger_manager.cpp - Manager to realize triggers on events in the robot memory
3  *
4  *
5  * Created: 3:53:46 PM 2016
6  * Copyright 2016 Frederik Zwilling
7  ****************************************************************************/
8 
9 /* This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  * GNU Library General Public License for more details.
18  *
19  * Read the full text in the LICENSE.GPL file in the doc directory.
20  */
21 
22 #include "event_trigger_manager.h"
23 
24 #ifdef USE_TIMETRACKER
25 # include <utils/time/tracker.h>
26 #endif
27 #include <plugins/mongodb/utils.h>
28 #include <utils/time/tracker_macros.h>
29 
30 #include <boost/bind.hpp>
31 #include <bsoncxx/json.hpp>
32 #include <mongocxx/exception/operation_exception.hpp>
33 #include <mongocxx/exception/query_exception.hpp>
34 
35 using namespace fawkes;
36 using namespace mongocxx;
37 
38 /** @class EventTriggerManager event_trigger_manager.h
39  * Manager to realize triggers on events in the robot memory
40  * @author Frederik Zwilling
41  */
42 
43 /**
44  * Constructor for class managing EventTriggers
45  * @param logger Logger
46  * @param config Configuration
47  * @param mongo_connection_manager MongoDBConnCreator
48  */
50  Configuration * config,
51  MongoDBConnCreator *mongo_connection_manager)
52 : cfg_debug_(false)
53 {
54  logger_ = logger;
55  config_ = config;
56  mongo_connection_manager_ = mongo_connection_manager;
57 
58  con_local_ = mongo_connection_manager_->create_client("robot-memory-local");
59  if (config_->exists("/plugins/mongodb/clients/robot-memory-distributed/enabled")
60  && config_->get_bool("/plugins/mongodb/clients/robot-memory-distributed/enabled")) {
61  con_replica_ = mongo_connection_manager_->create_client("robot-memory-distributed");
62  }
63 
64  // create connections to running mongod instances because only there
65  std::string local_db = config_->get_string("/plugins/robot-memory/database");
66  dbnames_local_.push_back(local_db);
67  dbnames_distributed_ = config_->get_strings("/plugins/robot-memory/distributed-db-names");
68 
69  mutex_ = new Mutex();
70 
71  try {
72  cfg_debug_ = config->get_bool("/plugins/robot-memory/more-debug-output");
73  } catch (...) {
74  }
75 #ifdef USE_TIMETRACKER
76  tt_ = new fawkes::TimeTracker();
77  ttc_trigger_loop_ = tt_->add_class("RM Trigger Trigger Loop");
78  ttc_callback_loop_ = tt_->add_class("RM Trigger Callback Loop");
79  ttc_callback_ = tt_->add_class("RM Trigger Single Callback");
80  ttc_reinit_ = tt_->add_class("RM Trigger Reinit");
81 #endif
82 }
83 
84 EventTriggerManager::~EventTriggerManager()
85 {
86  for (EventTrigger *trigger : triggers) {
87  delete trigger;
88  }
89  mongo_connection_manager_->delete_client(con_local_);
90  mongo_connection_manager_->delete_client(con_replica_);
91  delete mutex_;
92 #ifdef USE_TIMETRACKER
93  delete tt_;
94 #endif
95 }
96 
97 void
98 EventTriggerManager::check_events()
99 {
100  //lock to be thread safe (e.g. registration during checking)
101  MutexLocker lock(mutex_);
102 
103  TIMETRACK_START(ttc_trigger_loop_);
104  for (EventTrigger *trigger : triggers) {
105  bool ok = true;
106  try {
107  auto next = trigger->change_stream.begin();
108  TIMETRACK_START(ttc_callback_loop_);
109  while (next != trigger->change_stream.end()) {
110  //logger_->log_warn(name.c_str(), "Triggering: %s", bsoncxx::to_json(*next).c_str());
111  //actually call the callback function
112  TIMETRACK_START(ttc_callback_);
113  trigger->callback(*next);
114  next++;
115  TIMETRACK_END(ttc_callback_);
116  }
117  TIMETRACK_END(ttc_callback_loop_);
118  } catch (operation_exception &e) {
119  logger_->log_error(name.c_str(), "Error while reading the change stream");
120  ok = false;
121  }
122  // TODO Do we still need to check whether the cursor is dead?
123  // (with old driver: (!ok || trigger->oplog_cursor->isDead()))
124  if (!ok) {
125  TIMETRACK_START(ttc_reinit_);
126  if (cfg_debug_)
127  logger_->log_debug(name.c_str(), "Tailable Cursor is dead, requerying");
128  //check if collection is local or replicated
129  client *con;
130  if (std::find(dbnames_distributed_.begin(),
131  dbnames_distributed_.end(),
132  get_db_name(trigger->ns_db))
133  != dbnames_distributed_.end()) {
134  con = con_replica_;
135  } else {
136  con = con_local_;
137  }
138  auto db_coll_pair = split_db_collection_string(trigger->ns);
139  auto collection = con->database(db_coll_pair.first)[db_coll_pair.second];
140  try {
141  trigger->change_stream = create_change_stream(collection, trigger->filter_query.view());
142  } catch (mongocxx::query_exception &e) {
143  logger_->log_error(name.c_str(),
144  "Failed to create change stream, broken trigger for collection %s: %s",
145  trigger->ns.c_str(),
146  e.what());
147  }
148  TIMETRACK_END(ttc_reinit_);
149  }
150  }
151  TIMETRACK_END(ttc_trigger_loop_);
152 #ifdef USE_TIMETRACKER
153  if (++tt_loopcount_ % 5 == 0) {
154  tt_->print_to_stdout();
155  }
156 #endif
157 }
158 
159 /**
160  * Remove a previously registered trigger
161  * @param trigger Pointer to the trigger to remove
162  */
163 void
165 {
166  triggers.remove(trigger);
167  delete trigger;
168 }
169 
170 change_stream
171 EventTriggerManager::create_change_stream(mongocxx::collection &coll, bsoncxx::document::view query)
172 {
173  // TODO Allow non-empty pipelines
174  // @body We used to have a regular mongodb query as input to the oplog, but
175  // now this needs to be a pipeline. Adapt the change stream creation and the
176  // robot-memory API so we also accept a non-empty pipeline.
177  if (!query.empty()) {
178  throw fawkes::Exception("Non-empty queries are not implemented!");
179  }
180  mongocxx::options::change_stream opts;
181  opts.full_document("updateLookup");
182  opts.max_await_time(std::chrono::milliseconds(1));
183  auto res = coll.watch(opts);
184  // Go to end of change stream to get new updates from then on.
185  auto it = res.begin();
186  while (std::next(it) != res.end()) {}
187 
188  return res;
189 }
190 
191 /** Split database name from namespace.
192  * @param ns namespace, format db.collection
193  * @return db part of @p ns
194  */
195 std::string
196 EventTriggerManager::get_db_name(const std::string &ns)
197 {
198  std::string::size_type dot_pos = ns.find(".");
199  if (dot_pos == std::string::npos) {
200  return "";
201  } else {
202  return ns.substr(0, dot_pos);
203  }
204 }
EventTriggerManager::remove_trigger
void remove_trigger(EventTrigger *trigger)
Remove a previously registered trigger.
Definition: event_trigger_manager.cpp:163
fawkes::MongoDBConnCreator::create_client
virtual mongocxx::client * create_client(const std::string &config_name="")=0
Create a new MongoDB client.
fawkes::Mutex
Definition: mutex.h:36
EventTriggerManager::EventTriggerManager
EventTriggerManager(fawkes::Logger *logger, fawkes::Configuration *config, fawkes::MongoDBConnCreator *mongo_connection_manager)
Constructor for class managing EventTriggers.
Definition: event_trigger_manager.cpp:48
fawkes::MongoDBConnCreator
Definition: mongodb_conncreator.h:40
fawkes::Configuration::get_bool
virtual bool get_bool(const char *path)=0
fawkes::MutexLocker
Definition: mutex_locker.h:37
EventTrigger
Definition: event_trigger.h:30
fawkes::Configuration
Definition: config.h:68
fawkes::Logger::log_error
virtual void log_error(const char *component, const char *format,...)=0
fawkes::Logger
Definition: logger.h:40
EventTriggerManager::get_db_name
static std::string get_db_name(const std::string &ns)
Split database name from namespace.
Definition: event_trigger_manager.cpp:195
fawkes
fawkes::Configuration::get_strings
virtual std::vector< std::string > get_strings(const char *path)=0
fawkes::TimeTracker
Definition: tracker.h:40
fawkes::Configuration::get_string
virtual std::string get_string(const char *path)=0
fawkes::MongoDBConnCreator::delete_client
virtual void delete_client(mongocxx::client *client)=0
Delete a client.
fawkes::Configuration::exists
virtual bool exists(const char *path)=0
fawkes::Logger::log_debug
virtual void log_debug(const char *component, const char *format,...)=0
fawkes::Exception
Definition: exception.h:39