Fawkes API  Fawkes Development Version
robot_memory.cpp
1 /***************************************************************************
2  * robot_memory.cpp - Class for storing and querying information in the RobotMemory
3  *
4  * Created: Aug 23, 2016 1:34:32 PM 2016
5  * Copyright 2016 Frederik Zwilling
6  * 2017 Tim Niemueller [www.niemueller.de]
7  ****************************************************************************/
8 
9 /* This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  * GNU Library General Public License for more details.
18  *
19  * Read the full text in the LICENSE.GPL file in the doc directory.
20  */
21 
22 #include "robot_memory.h"
23 
24 #include <core/threading/mutex.h>
25 #include <core/threading/mutex_locker.h>
26 #include <interfaces/RobotMemoryInterface.h>
27 #include <plugins/mongodb/utils.h>
28 #include <utils/misc/string_conversions.h>
29 #include <utils/misc/string_split.h>
30 #include <utils/system/hostinfo.h>
31 #ifdef USE_TIMETRACKER
32 # include <utils/time/tracker.h>
33 #endif
34 #include <utils/time/tracker_macros.h>
35 
36 #include <bsoncxx/builder/basic/document.hpp>
37 #include <chrono>
38 #include <mongocxx/client.hpp>
39 #include <mongocxx/exception/operation_exception.hpp>
40 #include <mongocxx/read_preference.hpp>
41 #include <string>
42 #include <thread>
43 
44 using namespace fawkes;
45 using namespace mongocxx;
46 using namespace bsoncxx;
47 
48 /** @class RobotMemory "robot_memory.h"
49  * Access to the robot memory based on mongodb.
50  * Using this class, you can query/insert/remove/update information in
51  * the robot memory. Furthermore, you can register trigger to get
52  * notified when something was changed in the robot memory matching
53  * your query and you can access computables, which are on demand
54  * computed information, by registering the computables and then
55  * querying as if the information would already be in the database.
56  * @author Frederik Zwilling
57  */
58 
59 /**
60  * Robot Memory Constructor with objects of the thread
61  * @param config Fawkes config
62  * @param logger Fawkes logger
63  * @param clock Fawkes clock
64  * @param mongo_connection_manager MongoDBConnCreator to create client connections to the shared and local db
65  * @param blackboard Fawkes blackboard
66  */
68  fawkes::Logger * logger,
69  fawkes::Clock * clock,
70  fawkes::MongoDBConnCreator *mongo_connection_manager,
71  fawkes::BlackBoard * blackboard)
72 {
73  config_ = config;
74  logger_ = logger;
75  clock_ = clock;
76  mongo_connection_manager_ = mongo_connection_manager;
77  blackboard_ = blackboard;
78  mongodb_client_local_ = nullptr;
79  mongodb_client_distributed_ = nullptr;
80  debug_ = false;
81 }
82 
83 RobotMemory::~RobotMemory()
84 {
85  mongo_connection_manager_->delete_client(mongodb_client_local_);
86  mongo_connection_manager_->delete_client(mongodb_client_distributed_);
87  delete trigger_manager_;
88  blackboard_->close(rm_if_);
89 #ifdef USE_TIMETRACKER
90  delete tt_;
91 #endif
92 }
93 
94 void
95 RobotMemory::init()
96 {
97  //load config values
98  log("Started RobotMemory");
99  default_collection_ = "robmem.test";
100  try {
101  default_collection_ = config_->get_string("/plugins/robot-memory/default-collection");
102  } catch (Exception &) {
103  }
104  try {
105  debug_ = config_->get_bool("/plugins/robot-memory/more-debug-output");
106  } catch (Exception &) {
107  }
108  database_name_ = "robmem";
109  try {
110  database_name_ = config_->get_string("/plugins/robot-memory/database");
111  } catch (Exception &) {
112  }
113  distributed_dbs_ = config_->get_strings("/plugins/robot-memory/distributed-db-names");
114  cfg_startup_grace_period_ = 10;
115  try {
116  cfg_startup_grace_period_ = config_->get_uint("/plugins/robot-memory/startup-grace-period");
117  } catch (Exception &) {
118  } // ignored, use default
119 
120  cfg_coord_database_ = config_->get_string("/plugins/robot-memory/coordination/database");
121  cfg_coord_mutex_collection_ =
122  config_->get_string("/plugins/robot-memory/coordination/mutex-collection");
123 
124  using namespace std::chrono_literals;
125 
126  //initiate mongodb connections:
127  log("Connect to local mongod");
128  unsigned int startup_tries = 0;
129  for (; startup_tries < cfg_startup_grace_period_ * 2; ++startup_tries) {
130  // TODO if the last try fails, the client remains uninitialized
131  try {
132  mongodb_client_local_ = mongo_connection_manager_->create_client("robot-memory-local");
133  break;
134  } catch (fawkes::Exception &) {
135  logger_->log_info(name_, "Waiting for local");
136  std::this_thread::sleep_for(500ms);
137  }
138  }
139 
140  if (config_->exists("/plugins/mongodb/clients/robot-memory-distributed/enabled")
141  && config_->get_bool("/plugins/mongodb/clients/robot-memory-distributed/enabled")) {
142  distributed_ = true;
143  log("Connect to distributed mongod");
144  for (startup_tries = 0; startup_tries < cfg_startup_grace_period_ * 2; ++startup_tries) {
145  // TODO if the last try fails, the client remains uninitialized
146  try {
147  mongodb_client_distributed_ =
148  mongo_connection_manager_->create_client("robot-memory-distributed");
149  break;
150  } catch (fawkes::Exception &) {
151  logger_->log_info(name_, "Waiting for distributed");
152  std::this_thread::sleep_for(500ms);
153  }
154  }
155  }
156 
157  //init blackboard interface
158  rm_if_ = blackboard_->open_for_writing<RobotMemoryInterface>(
159  config_->get_string("/plugins/robot-memory/interface-name").c_str());
160  rm_if_->set_error("");
161  rm_if_->set_result("");
162  rm_if_->write();
163 
164  //Setup event trigger and computables manager
165  trigger_manager_ = new EventTriggerManager(logger_, config_, mongo_connection_manager_);
166  computables_manager_ = new ComputablesManager(config_, this);
167 
168  log_deb("Initialized RobotMemory");
169 
170 #ifdef USE_TIMETRACKER
171  tt_ = new TimeTracker();
172  tt_loopcount_ = 0;
173  ttc_events_ = tt_->add_class("RobotMemory Events");
174  ttc_cleanup_ = tt_->add_class("RobotMemory Cleanup");
175 #endif
176 }
177 
178 void
179 RobotMemory::loop()
180 {
181  TIMETRACK_START(ttc_events_);
182  trigger_manager_->check_events();
183  TIMETRACK_END(ttc_events_);
184  TIMETRACK_START(ttc_cleanup_);
185  computables_manager_->cleanup_computed_docs();
186  TIMETRACK_END(ttc_cleanup_);
187 #ifdef USE_TIMETRACKER
188  if (++tt_loopcount_ % 5 == 0) {
189  tt_->print_to_stdout();
190  }
191 #endif
192 }
193 
194 /**
195  * Query information from the robot memory.
196  * @param query The query returned documents have to match (essentially a BSONObj)
197  * @param collection_name The database and collection to query as string (e.g. robmem.worldmodel)
198  * @param query_options Optional options to use to query the database
199  * @return Cursor to get the documents from, NULL for invalid query
200  */
201 cursor
202 RobotMemory::query(document::view query,
203  const std::string & collection_name,
204  mongocxx::options::find query_options)
205 {
206  collection collection = get_collection(collection_name);
207  log_deb(std::string("Executing Query " + to_json(query) + " on collection " + collection_name));
208 
209  //check if computation on demand is necessary and execute Computables
210  computables_manager_->check_and_compute(query, collection_name);
211 
212  //lock (mongo_client not thread safe)
213  MutexLocker lock(mutex_);
214 
215  //actually execute query
216  try {
217  return collection.find(query, query_options);
218  } catch (mongocxx::operation_exception &e) {
219  std::string error =
220  std::string("Error for query ") + to_json(query) + "\n Exception: " + e.what();
221  log(error, "error");
222  throw;
223  }
224 }
225 
226 /**
227  * Aggregation call on the robot memory.
228  * @param pipeline Series of commands defining the aggregation
229  * @param collection The database and collection to query as string (e.g. robmem.worldmodel)
230  * @return Result object
231  */
232 bsoncxx::document::value
233 RobotMemory::aggregate(const std::vector<bsoncxx::document::view> &pipeline,
234  const std::string & collection)
235 {
236  /*
237  client *mongodb_client = get_mongodb_client(collection);
238  log_deb(std::string("Executing Aggregation on collection " + collection));
239 
240  //TODO: check if computation on demand is necessary and execute Computables
241  // that might be complicated because you need to build a query to check against from the fields mentioned in the different parts of the pipeline
242  // A possible solution might be forcing the user to define the $match oject seperately and using it as query to check computables
243 
244  //lock (mongo_client not thread safe)
245  MutexLocker lock(mutex_);
246 
247  //actually execute aggregation as command (in more modern mongo-cxx versions there should be an easier way with a proper aggregate function)
248  BSONObj res;
249  //get db and collection name
250  size_t point_pos = collection.find(".");
251  if (point_pos == std::string::npos) {
252  logger_->log_error(name_, "Collection %s needs to start with 'dbname.'", collection.c_str());
253  return fromjson("{}");
254  }
255  std::string db = collection.substr(0, point_pos);
256  std::string col = collection.substr(point_pos + 1);
257  try {
258  mongodb_client->runCommand(db, BSON("aggregate" << col << "pipeline" << pipeline), res);
259  } catch (DBException &e) {
260  std::string error = std::string("Error for aggregation ") + "\n Exception: " + e.toString();
261  log(error, "error");
262  return fromjson("{}");
263  }
264  return res;
265  */
266  throw Exception("Not implemented");
267 }
268 
269 /**
270  * Inserts a document into the robot memory
271  * @param doc A view of the document to insert
272  * @param collection_name The database and collection to use as string (e.g. robmem.worldmodel)
273  * @return 1: Success 0: Error
274  */
275 int
276 RobotMemory::insert(bsoncxx::document::view doc, const std::string &collection_name)
277 {
278  collection collection = get_collection(collection_name);
279  log_deb(std::string("Inserting " + to_json(doc) + " into collection " + collection_name));
280  //lock (mongo_client not thread safe)
281  MutexLocker lock(mutex_);
282  //actually execute insert
283  try {
284  collection.insert_one(doc);
285  } catch (mongocxx::operation_exception &e) {
286  std::string error = "Error for insert " + to_json(doc) + "\n Exception: " + e.what();
287  log_deb(error, "error");
288  return 0;
289  }
290  //return success
291  return 1;
292 }
293 
294 /** Create an index on a collection.
295  * @param keys The keys document
296  * @param collection_name The database and collection to use as string (e.g. robmem.worldmodel)
297  * @param unique true to create unique index
298  * @return 1: Success 0: Error
299  */
300 int
301 RobotMemory::create_index(bsoncxx::document::view keys,
302  const std::string & collection_name,
303  bool unique)
304 {
305  collection collection = get_collection(collection_name);
306 
307  log_deb(std::string("Creating index " + to_json(keys) + " on collection " + collection_name));
308 
309  //lock (mongo_client not thread safe)
310  MutexLocker lock(mutex_);
311 
312  //actually execute insert
313  try {
314  using namespace bsoncxx::builder::basic;
315  collection.create_index(keys, make_document(kvp("unique", unique)));
316  } catch (operation_exception &e) {
317  std::string error = "Error when creating index " + to_json(keys) + "\n Exception: " + e.what();
318  log_deb(error, "error");
319  return 0;
320  }
321  //return success
322  return 1;
323 }
324 
325 /**
326  * Inserts all document of a vector into the robot memory
327  * @param docs The vector of BSON documents as views
328  * @param collection_name The database and collection to use as string (e.g. robmem.worldmodel)
329  * @return 1: Success 0: Error
330  */
331 int
332 RobotMemory::insert(std::vector<bsoncxx::document::view> docs, const std::string &collection_name)
333 {
334  collection collection = get_collection(collection_name);
335  std::string insert_string = "[";
336  for (auto &&doc : docs) {
337  insert_string += to_json(doc) + ",\n";
338  }
339  insert_string += "]";
340 
341  log_deb(std::string("Inserting vector of documents " + insert_string + " into collection "
342  + collection_name));
343 
344  //lock (mongo_client not thread safe)
345  MutexLocker lock(mutex_);
346 
347  //actually execute insert
348  try {
349  collection.insert_many(docs);
350  } catch (operation_exception &e) {
351  std::string error = "Error for insert " + insert_string + "\n Exception: " + e.what();
352  log_deb(error, "error");
353  return 0;
354  }
355  //return success
356  return 1;
357 }
358 
359 /**
360  * Inserts a document into the robot memory
361  * @param obj_str The document as json string
362  * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
363  * @return 1: Success 0: Error
364  */
365 int
366 RobotMemory::insert(const std::string &obj_str, const std::string &collection)
367 {
368  return insert(from_json(obj_str), collection);
369 }
370 
371 /**
372  * Updates documents in the robot memory
373  * @param query The query defining which documents to update
374  * @param update What to change in these documents
375  * @param collection_name The database and collection to use as string (e.g. robmem.worldmodel)
376  * @param upsert Should the update document be inserted if the query returns no documents?
377  * @return 1: Success 0: Error
378  */
379 int
380 RobotMemory::update(const bsoncxx::document::view &query,
381  const bsoncxx::document::view &update,
382  const std::string & collection_name,
383  bool upsert)
384 {
385  collection collection = get_collection(collection_name);
386  log_deb(std::string("Executing Update " + to_json(update) + " for query " + to_json(query)
387  + " on collection " + collection_name));
388 
389  //lock (mongo_client not thread safe)
390  MutexLocker lock(mutex_);
391 
392  //actually execute update
393  try {
394  collection.update_many(query,
395  builder::basic::make_document(
396  builder::basic::kvp("$set", builder::concatenate(update))),
397  options::update().upsert(upsert));
398  } catch (operation_exception &e) {
399  log_deb(std::string("Error for update " + to_json(update) + " for query " + to_json(query)
400  + "\n Exception: " + e.what()),
401  "error");
402  return 0;
403  }
404  //return success
405  return 1;
406 }
407 
408 /**
409  * Updates documents in the robot memory
410  * @param query The query defining which documents to update
411  * @param update_str What to change in these documents as json string
412  * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
413  * @param upsert Should the update document be inserted if the query returns no documents?
414  * @return 1: Success 0: Error
415  */
416 int
417 RobotMemory::update(const bsoncxx::document::view &query,
418  const std::string & update_str,
419  const std::string & collection,
420  bool upsert)
421 {
422  return update(query, from_json(update_str), collection, upsert);
423 }
424 
425 /** Atomically update and retrieve document.
426  * @param filter The filter defining the document to update.
427  * If multiple match takes the first one.
428  * @param update Update statement. May only contain update operators!
429  * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
430  * @param upsert Should the update document be inserted if the query returns no documents?
431  * @param return_new return the document before (false) or after (true) the update has been applied?
432  * @return document, depending on @p return_new either before or after the udpate has been applied.
433  */
434 document::value
435 RobotMemory::find_one_and_update(const document::view &filter,
436  const document::view &update,
437  const std::string & collection_name,
438  bool upsert,
439  bool return_new)
440 {
441  collection collection = get_collection(collection_name);
442 
443  log_deb(std::string("Executing findOneAndUpdate " + to_json(update) + " for filter "
444  + to_json(filter) + " on collection " + collection_name));
445 
446  MutexLocker lock(mutex_);
447 
448  try {
449  auto res =
450  collection.find_one_and_update(filter,
451  update,
452  options::find_one_and_update().upsert(upsert).return_document(
453  return_new ? options::return_document::k_after
454  : options::return_document::k_before));
455  if (res) {
456  return *res;
457  } else {
458  std::string error = "Error for update " + to_json(update) + " for query " + to_json(filter)
459  + "FindOneAndUpdate unexpectedly did not return a document";
460  log_deb(error, "warn");
461  return bsoncxx::builder::basic::make_document(bsoncxx::builder::basic::kvp("error", error));
462  }
463  } catch (operation_exception &e) {
464  std::string error = "Error for update " + to_json(update) + " for query " + to_json(filter)
465  + "\n Exception: " + e.what();
466  log_deb(error, "error");
467  return bsoncxx::builder::basic::make_document(bsoncxx::builder::basic::kvp("error", error));
468  }
469 }
470 
471 /**
472  * Remove documents from the robot memory
473  * @param query Which documents to remove
474  * @param collection_name The database and collection to use as string (e.g. robmem.worldmodel)
475  * @return 1: Success 0: Error
476  */
477 int
478 RobotMemory::remove(const bsoncxx::document::view &query, const std::string &collection_name)
479 {
480  //lock (mongo_client not thread safe)
481  MutexLocker lock(mutex_);
482  collection collection = get_collection(collection_name);
483  log_deb(std::string("Executing Remove " + to_json(query) + " on collection " + collection_name));
484  //actually execute remove
485  try {
486  collection.delete_many(query);
487  } catch (operation_exception &e) {
488  log_deb(std::string("Error for query " + to_json(query) + "\n Exception: " + e.what()),
489  "error");
490  return 0;
491  }
492  //return success
493  return 1;
494 }
495 
496 /**
497  * Performs a MapReduce operation on the robot memory (https://docs.mongodb.com/manual/core/map-reduce/)
498  * @param query Which documents to use for the map step
499  * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
500  * @param js_map_fun Map function in JavaScript as string
501  * @param js_reduce_fun Reduce function in JavaScript as string
502  * @return BSONObj containing the result
503  */
504 bsoncxx::document::value
505 RobotMemory::mapreduce(const bsoncxx::document::view &query,
506  const std::string & collection,
507  const std::string & js_map_fun,
508  const std::string & js_reduce_fun)
509 {
510  throw Exception("Not implemented");
511  /*
512  mongo::DBClientBase *mongodb_client = get_mongodb_client(collection);
513  MutexLocker lock(mutex_);
514  log_deb(std::string("Executing MapReduce " + query.toString() + " on collection " + collection
515  + " map: " + js_map_fun + " reduce: " + js_reduce_fun));
516  return mongodb_client->mapreduce(collection, js_map_fun, js_reduce_fun, query);
517  */
518 }
519 
520 /**
521  * Performs an aggregation operation on the robot memory (https://docs.mongodb.com/v3.2/reference/method/db.collection.aggregate/)
522  * @param pipeline A sequence of data aggregation operations or stages. See the https://docs.mongodb.com/v3.2/reference/operator/aggregation-pipeline/ for details
523  * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
524  * @return Cursor to get the documents from, NULL for invalid pipeline
525  */
526 cursor
527 RobotMemory::aggregate(bsoncxx::document::view pipeline, const std::string &collection)
528 {
529  throw Exception("Not implemented");
530  /**
531  mongo::DBClientBase *mongodb_client = get_mongodb_client(collection);
532  MutexLocker lock(mutex_);
533  log_deb(std::string("Executing Aggregation pipeline: " + pipeline.toString() + " on collection "
534  + collection));
535 
536  QResCursor cursor;
537  try {
538  cursor = mongodb_client->aggregate(collection, pipeline);
539  } catch (DBException &e) {
540  std::string error =
541  std::string("Error for query ") + pipeline.toString() + "\n Exception: " + e.toString();
542  log(error, "error");
543  return NULL;
544  }
545  return cursor;
546  */
547 }
548 
549 /**
550  * Drop (= remove) a whole collection and all documents inside it
551  * @param collection_name The database and collection to use as string (e.g. robmem.worldmodel)
552  * @return 1: Success 0: Error
553  */
554 int
555 RobotMemory::drop_collection(const std::string &collection_name)
556 {
557  MutexLocker lock(mutex_);
558  collection collection = get_collection(collection_name);
559  log_deb("Dropping collection " + collection_name);
560  collection.drop();
561  return 1;
562 }
563 
564 /**
565  * Remove the whole database of the robot memory and all documents inside
566  * @return 1: Success 0: Error
567  */
568 int
570 {
571  //lock (mongo_client not thread safe)
572  MutexLocker lock(mutex_);
573 
574  log_deb("Clearing whole robot memory");
575  mongodb_client_local_->database(database_name_).drop();
576  return 1;
577 }
578 
579 /**
580  * Restore a previously dumped collection from a directory
581  * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
582  * @param directory Directory of the dump
583  * @return 1: Success 0: Error
584  */
585 int
586 RobotMemory::restore_collection(const std::string &collection, const std::string &directory)
587 {
588  std::string coll{std::move(collection)};
589  drop_collection(coll);
590 
591  //lock (mongo_client not thread safe)
592  MutexLocker lock(mutex_);
593 
594  //resolve path to restore
595  if (coll.find(".") == std::string::npos) {
596  log(std::string("Unable to restore collection" + coll), "error");
597  log(std::string("Specify collection like 'db.collection'"), "error");
598  return 0;
599  }
600  std::string path = StringConversions::resolve_path(directory) + "/"
601  + coll.replace(coll.find("."), 1, "/") + ".bson";
602  log_deb(std::string("Restore collection " + collection + " from " + path), "warn");
603 
604  //call mongorestore from folder with initial restores
605  std::string command = "/usr/bin/mongorestore --dir " + path + " --host=127.0.0.1 --quiet";
606  log_deb(std::string("Restore command: " + command), "warn");
607  FILE *bash_output = popen(command.c_str(), "r");
608 
609  //check if output is ok
610  if (!bash_output) {
611  log(std::string("Unable to restore collection" + coll), "error");
612  return 0;
613  }
614  std::string output_string = "";
615  char buffer[100];
616  while (!feof(bash_output)) {
617  if (fgets(buffer, 100, bash_output) == NULL) {
618  break;
619  }
620  output_string += buffer;
621  }
622  pclose(bash_output);
623  if (output_string.find("Failed") != std::string::npos) {
624  log(std::string("Unable to restore collection" + coll), "error");
625  log_deb(output_string, "error");
626  return 0;
627  }
628  return 1;
629 }
630 
631 /**
632  * Dump (= save) a collection to the filesystem to restore it later
633  * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
634  * @param directory Directory to dump the collection to
635  * @return 1: Success 0: Error
636  */
637 int
638 RobotMemory::dump_collection(const std::string &collection, const std::string &directory)
639 {
640  //lock (mongo_client not thread safe)
641  MutexLocker lock(mutex_);
642 
643  //resolve path to dump to
644  if (collection.find(".") == std::string::npos) {
645  log(std::string("Unable to dump collection" + collection), "error");
646  log(std::string("Specify collection like 'db.collection'"), "error");
647  return 0;
648  }
649  std::string path = StringConversions::resolve_path(directory);
650  log_deb(std::string("Dump collection " + collection + " into " + path), "warn");
651 
652  //call mongorestore from folder with initial restores
653  std::vector<std::string> split = str_split(collection, '.');
654  std::string command = "/usr/bin/mongodump --out=" + path + " --db=" + split[0]
655  + " --collection=" + split[1] + " --host=127.0.0.1 --quiet";
656  log_deb(std::string("Dump command: " + command), "warn");
657  FILE *bash_output = popen(command.c_str(), "r");
658  //check if output is ok
659  if (!bash_output) {
660  log(std::string("Unable to dump collection" + collection), "error");
661  return 0;
662  }
663  std::string output_string = "";
664  char buffer[100];
665  while (!feof(bash_output)) {
666  if (fgets(buffer, 100, bash_output) == NULL) {
667  break;
668  }
669  output_string += buffer;
670  }
671  pclose(bash_output);
672  if (output_string.find("Failed") != std::string::npos) {
673  log(std::string("Unable to dump collection" + collection), "error");
674  log_deb(output_string, "error");
675  return 0;
676  }
677  return 1;
678 }
679 
680 void
681 RobotMemory::log(const std::string &what, const std::string &info)
682 {
683  if (!info.compare("error"))
684  logger_->log_error(name_, "%s", what.c_str());
685  else if (!info.compare("warn"))
686  logger_->log_warn(name_, "%s", what.c_str());
687  else if (!info.compare("debug"))
688  logger_->log_debug(name_, "%s", what.c_str());
689  else
690  logger_->log_info(name_, "%s", what.c_str());
691 }
692 
693 void
694 RobotMemory::log_deb(const std::string &what, const std::string &level)
695 {
696  if (debug_) {
697  log(what, level);
698  }
699 }
700 
701 void
702 RobotMemory::log_deb(const bsoncxx::document::view &query,
703  const std::string & what,
704  const std::string & level)
705 {
706  if (debug_) {
707  log(query, what, level);
708  }
709 }
710 
711 void
712 RobotMemory::log(const bsoncxx::document::view &query,
713  const std::string & what,
714  const std::string & level)
715 {
716  log(what + " " + to_json(query), level);
717 }
718 
719 /** Check if the given database is a distributed database
720  * @param dbcollection A database collection name pair of the form <dbname>.<collname>
721  * @return true iff the database is distributed database
722  */
723 bool
724 RobotMemory::is_distributed_database(const std::string &dbcollection)
725 {
726  return std::find(distributed_dbs_.begin(),
727  distributed_dbs_.end(),
728  split_db_collection_string(dbcollection).first)
729  != distributed_dbs_.end();
730 }
731 
732 /**
733  * Get the mongodb client associated with the collection (eighter the local or distributed one)
734  * @param collection The collection name in the form "<dbname>.<collname>"
735  * @return A pointer to the client for the database with name <dbname>
736  */
737 client *
738 RobotMemory::get_mongodb_client(const std::string &collection)
739 {
740  if (!distributed_) {
741  return mongodb_client_local_;
742  }
743  if (is_distributed_database(collection)) {
744  return mongodb_client_distributed_;
745  } else {
746  return mongodb_client_local_;
747  }
748 }
749 
750 /**
751  * Get the collection object referred to by the given string.
752  * @param dbcollection The name of the collection in the form <dbname>.<collname>
753  * @return The respective collection object
754  */
755 
756 collection
757 RobotMemory::get_collection(const std::string &dbcollection)
758 {
759  auto db_coll_pair = split_db_collection_string(dbcollection);
760  client *client;
761  if (is_distributed_database(dbcollection)) {
762  client = mongodb_client_distributed_;
763  } else {
764  client = mongodb_client_local_;
765  }
766  return client->database(db_coll_pair.first)[db_coll_pair.second];
767 }
768 
769 /**
770  * Remove a previously registered trigger
771  * @param trigger Pointer to the trigger to remove
772  */
773 void
775 {
776  trigger_manager_->remove_trigger(trigger);
777 }
778 
779 /**
780  * Remove previously registered computable
781  * @param computable The computable to remove
782  */
783 void
785 {
786  computables_manager_->remove_computable(computable);
787 }
788 
789 /** Explicitly create a mutex.
790  * This is an optional step, a mutex is also created automatically when trying
791  * to acquire the lock for the first time. Adding it explicitly may increase
792  * visibility, e.g., in the database. Use it for mutexes which are locked
793  * only very infrequently.
794  * @param name mutex name
795  * @return true if operation was successful, false on failure
796  */
797 bool
798 RobotMemory::mutex_create(const std::string &name)
799 {
800  client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
801  using namespace bsoncxx::builder;
802  basic::document insert_doc{};
803  insert_doc.append(basic::kvp("$currentDate", [](basic::sub_document subdoc) {
804  subdoc.append(basic::kvp("lock-time", true));
805  }));
806  insert_doc.append(basic::kvp("_id", name));
807  insert_doc.append(basic::kvp("locked", false));
808  try {
809  MutexLocker lock(mutex_);
810  collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
811  auto write_concern = mongocxx::write_concern();
812  write_concern.majority(std::chrono::milliseconds(0));
813  collection.insert_one(insert_doc.view(), options::insert().write_concern(write_concern));
814  return true;
815  } catch (operation_exception &e) {
816  logger_->log_info(name_, "Failed to create mutex %s: %s", name.c_str(), e.what());
817  return false;
818  }
819 }
820 
821 /** Destroy a mutex.
822  * The mutex is erased from the database. This is done disregarding it's current
823  * lock state.
824  * @param name mutex name
825  * @return true if operation was successful, false on failure
826  */
827 bool
828 RobotMemory::mutex_destroy(const std::string &name)
829 {
830  client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
831  using namespace bsoncxx::builder;
832  basic::document destroy_doc;
833  destroy_doc.append(basic::kvp("_id", name));
834  try {
835  MutexLocker lock(mutex_);
836  collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
837  auto write_concern = mongocxx::write_concern();
838  write_concern.majority(std::chrono::milliseconds(0));
839  collection.delete_one(destroy_doc.view(),
840  options::delete_options().write_concern(write_concern));
841  return true;
842  } catch (operation_exception &e) {
843  logger_->log_info(name_, "Failed to destroy mutex %s: %s", name.c_str(), e.what());
844  return false;
845  }
846 }
847 
848 /** Try to acquire a lock for a mutex.
849  * This will access the database and atomically find and update (or
850  * insert) a mutex lock. If the mutex has not been created it is added
851  * automatically. If the lock cannot be acquired the function also
852  * returns immediately. There is no blocked waiting for the lock.
853  * @param name mutex name
854  * @param identity string to set as lock-holder
855  * @param force true to force acquisition of the lock, i.e., even if
856  * the lock has already been acquired take ownership (steal the lock).
857  * @return true if operation was successful, false on failure
858  */
859 bool
860 RobotMemory::mutex_try_lock(const std::string &name, const std::string &identity, bool force)
861 {
862  client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
863 
864  std::string locked_by{identity};
865  if (identity.empty()) {
866  HostInfo host_info;
867  locked_by = host_info.name();
868  }
869 
870  // here we can add an $or to implement lock timeouts
871  using namespace bsoncxx::builder;
872  basic::document filter_doc;
873  filter_doc.append(basic::kvp("_id", name));
874  if (!force) {
875  filter_doc.append(basic::kvp("locked", false));
876  }
877 
878  basic::document update_doc;
879  update_doc.append(basic::kvp("$currentDate", [](basic::sub_document subdoc) {
880  subdoc.append(basic::kvp("lock-time", true));
881  }));
882  update_doc.append(basic::kvp("$set", [locked_by](basic::sub_document subdoc) {
883  subdoc.append(basic::kvp("locked", true));
884  subdoc.append(basic::kvp("locked-by", locked_by));
885  }));
886  try {
887  MutexLocker lock(mutex_);
888  collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
889  auto write_concern = mongocxx::write_concern();
890  write_concern.majority(std::chrono::milliseconds(0));
891  auto new_doc =
892  collection.find_one_and_update(filter_doc.view(),
893  update_doc.view(),
894  options::find_one_and_update()
895  .upsert(true)
896  .return_document(options::return_document::k_after)
897  .write_concern(write_concern));
898 
899  if (!new_doc) {
900  return false;
901  }
902  auto new_view = new_doc->view();
903  return (new_view["locked-by"].get_utf8().value.to_string() == locked_by
904  && new_view["locked"].get_bool());
905 
906  } catch (operation_exception &e) {
907  logger_->log_error(name_, "Mongo OperationException: %s", e.what());
908  try {
909  // TODO is this extrac check still needed?
910  basic::document check_doc;
911  check_doc.append(basic::kvp("_id", name));
912  check_doc.append(basic::kvp("locked", true));
913  check_doc.append(basic::kvp("locked-by", locked_by));
914  MutexLocker lock(mutex_);
915  collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
916  auto res = collection.find_one(check_doc.view());
917  logger_->log_info(name_, "Checking whether mutex was acquired succeeded");
918  if (res) {
919  logger_->log_warn(name_,
920  "Exception during try-lock for %s, "
921  "but mutex was still acquired",
922  name.c_str());
923  } else {
924  logger_->log_info(name_,
925  "Exception during try-lock for %s, "
926  "and mutex was not acquired",
927  name.c_str());
928  }
929  return static_cast<bool>(res);
930  } catch (operation_exception &e) {
931  logger_->log_error(name_,
932  "Mongo OperationException while handling "
933  "the first exception: %s",
934  e.what());
935  return false;
936  }
937  }
938 }
939 
940 /** Try to acquire a lock for a mutex.
941  * This will access the database and atomically find and update (or
942  * insert) a mutex lock. If the mutex has not been created it is added
943  * automatically. If the lock cannot be acquired the function also
944  * returns immediately. There is no blocked waiting for the lock.
945  * @param name mutex name
946  * @param force true to force acquisition of the lock, i.e., even if
947  * the lock has already been acquired take ownership (steal the lock).
948  * @return true if operation was successful, false on failure
949  */
950 bool
951 RobotMemory::mutex_try_lock(const std::string &name, bool force)
952 {
953  return mutex_try_lock(name, "", force);
954 }
955 
956 /** Release lock on mutex.
957  * @param name mutex name
958  * @param identity string to set as lock-holder
959  * @return true if operation was successful, false on failure
960  */
961 bool
962 RobotMemory::mutex_unlock(const std::string &name, const std::string &identity)
963 {
964  client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
965 
966  std::string locked_by{identity};
967  if (identity.empty()) {
968  HostInfo host_info;
969  locked_by = host_info.name();
970  }
971 
972  using namespace bsoncxx::builder;
973  // here we can add an $or to implement lock timeouts
974  basic::document filter_doc;
975  filter_doc.append(basic::kvp("_id", name));
976  filter_doc.append(basic::kvp("locked-by", locked_by));
977 
978  basic::document update_doc;
979  update_doc.append(basic::kvp("$set", [](basic::sub_document subdoc) {
980  subdoc.append(basic::kvp("locked", false));
981  }));
982  update_doc.append(basic::kvp("$unset", [](basic::sub_document subdoc) {
983  subdoc.append(basic::kvp("locked-by", true));
984  subdoc.append(basic::kvp("lock-time", true));
985  }));
986 
987  try {
988  MutexLocker lock(mutex_);
989  collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
990  auto write_concern = mongocxx::write_concern();
991  write_concern.majority(std::chrono::milliseconds(0));
992  auto new_doc =
993  collection.find_one_and_update(filter_doc.view(),
994  update_doc.view(),
995  options::find_one_and_update()
996  .upsert(true)
997  .return_document(options::return_document::k_after)
998  .write_concern(write_concern));
999  if (!new_doc) {
1000  return false;
1001  }
1002  return new_doc->view()["locked"].get_bool();
1003  } catch (operation_exception &e) {
1004  return false;
1005  }
1006 }
1007 
1008 /** Renew a mutex.
1009  * Renewing means updating the lock timestamp to the current time to
1010  * avoid expiration. Note that the lock must currently be held by
1011  * the given identity.
1012  * @param name mutex name
1013  * @param identity string to set as lock-holder (defaults to hostname
1014  * if empty)
1015  * @return true if operation was successful, false on failure
1016  */
1017 bool
1018 RobotMemory::mutex_renew_lock(const std::string &name, const std::string &identity)
1019 {
1020  client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
1021 
1022  std::string locked_by{identity};
1023  if (identity.empty()) {
1024  HostInfo host_info;
1025  locked_by = host_info.name();
1026  }
1027 
1028  using namespace bsoncxx::builder;
1029  // here we can add an $or to implement lock timeouts
1030  basic::document filter_doc;
1031  filter_doc.append(basic::kvp("_id", name));
1032  filter_doc.append(basic::kvp("locked", true));
1033  filter_doc.append(basic::kvp("locked-by", locked_by));
1034 
1035  // we set all data, even the data which is not actually modified, to
1036  // make it easier to process the update in triggers.
1037  basic::document update_doc;
1038  update_doc.append(basic::kvp("$currentDate", [](basic::sub_document subdoc) {
1039  subdoc.append(basic::kvp("lock-time", true));
1040  }));
1041  update_doc.append(basic::kvp("$set", [locked_by](basic::sub_document subdoc) {
1042  subdoc.append(basic::kvp("locked", true));
1043  subdoc.append(basic::kvp("locked-by", locked_by));
1044  }));
1045 
1046  try {
1047  MutexLocker lock(mutex_);
1048  collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
1049  auto write_concern = mongocxx::write_concern();
1050  write_concern.majority(std::chrono::milliseconds(0));
1051  auto new_doc =
1052  collection.find_one_and_update(filter_doc.view(),
1053  update_doc.view(),
1054  options::find_one_and_update()
1055  .upsert(false)
1056  .return_document(options::return_document::k_after)
1057  .write_concern(write_concern));
1058  return static_cast<bool>(new_doc);
1059  } catch (operation_exception &e) {
1060  logger_->log_warn(name_, "Renewing lock on mutex %s failed: %s", name.c_str(), e.what());
1061  return false;
1062  }
1063 }
1064 
1065 /** Setup time-to-live index for mutexes.
1066  * Setting up a time-to-live index for mutexes enables automatic
1067  * expiration through the database. Note, however, that the documents
1068  * are expired only every 60 seconds. This has two consequences:
1069  * - max_age_sec lower than 60 seconds cannot be achieved
1070  * - locks may be held for up to just below 60 seconds longer than
1071  * configured, i.e., if the mutex had not yet expired when the
1072  * background tasks runs.
1073  * @param max_age_sec maximum age of locks in seconds
1074  * @return true if operation was successful, false on failure
1075  */
1076 bool
1077 RobotMemory::mutex_setup_ttl(float max_age_sec)
1078 {
1079  MutexLocker lock(mutex_);
1080 
1081  client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
1082 
1083  auto keys = builder::basic::make_document(builder::basic::kvp("lock-time", true));
1084 
1085  try {
1086  collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
1087  collection.create_index(keys.view(),
1088  builder::basic::make_document(
1089  builder::basic::kvp("expireAfterSeconds", max_age_sec)));
1090  } catch (operation_exception &e) {
1091  logger_->log_warn(name_, "Creating TTL index failed: %s", e.what());
1092  return false;
1093  }
1094  return true;
1095 }
1096 
1097 /** Expire old locks on mutexes.
1098  * This will update the database and set all mutexes to unlocked for
1099  * which the lock-time is older than the given maximum age.
1100  * @param max_age_sec maximum age of locks in seconds
1101  * @return true if operation was successful, false on failure
1102  */
1103 bool
1104 RobotMemory::mutex_expire_locks(float max_age_sec)
1105 {
1106  client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
1107 
1108  using std::chrono::high_resolution_clock;
1109  using std::chrono::milliseconds;
1110  using std::chrono::time_point;
1111  using std::chrono::time_point_cast;
1112 
1113  auto max_age_ms = milliseconds(static_cast<unsigned long int>(std::floor(max_age_sec * 1000)));
1114  time_point<high_resolution_clock, milliseconds> expire_before =
1115  time_point_cast<milliseconds>(high_resolution_clock::now()) - max_age_ms;
1116  types::b_date expire_before_mdb(expire_before);
1117 
1118  // here we can add an $or to implement lock timeouts
1119  using namespace bsoncxx::builder;
1120  basic::document filter_doc;
1121  filter_doc.append(basic::kvp("locked", true));
1122  filter_doc.append(basic::kvp("lock-time", [expire_before_mdb](basic::sub_document subdoc) {
1123  subdoc.append(basic::kvp("$lt", expire_before_mdb));
1124  }));
1125 
1126  basic::document update_doc;
1127  update_doc.append(basic::kvp("$set", [](basic::sub_document subdoc) {
1128  subdoc.append(basic::kvp("locked", false));
1129  }));
1130  update_doc.append(basic::kvp("$unset", [](basic::sub_document subdoc) {
1131  subdoc.append(basic::kvp("locked-by", true));
1132  subdoc.append(basic::kvp("lock-time", true));
1133  }));
1134 
1135  try {
1136  MutexLocker lock(mutex_);
1137  collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
1138  auto write_concern = mongocxx::write_concern();
1139  write_concern.majority(std::chrono::milliseconds(0));
1140  collection.update_many(filter_doc.view(),
1141  update_doc.view(),
1142  options::update().write_concern(write_concern));
1143  return true;
1144  } catch (operation_exception &e) {
1145  log(std::string("Failed to expire locks: " + std::string(e.what())), "error");
1146  return false;
1147  }
1148 }
RobotMemory::create_index
int create_index(bsoncxx::document::view keys, const std::string &collection="", bool unique=false)
Create an index on a collection.
Definition: robot_memory.cpp:300
RobotMemory::mutex_expire_locks
bool mutex_expire_locks(float max_age_sec)
Expire old locks on mutexes.
Definition: robot_memory.cpp:1103
RobotMemory::remove_trigger
void remove_trigger(EventTrigger *trigger)
Remove a previously registered trigger.
Definition: robot_memory.cpp:773
RobotMemory::insert
int insert(bsoncxx::document::view, const std::string &collection="")
Inserts a document into the robot memory.
Definition: robot_memory.cpp:275
ComputablesManager
Definition: computables_manager.h:45
fawkes::MongoDBConnCreator
Definition: mongodb_conncreator.h:40
fawkes::MutexLocker
Definition: mutex_locker.h:37
fawkes::BlackBoard
Definition: blackboard.h:48
EventTrigger
Definition: event_trigger.h:30
fawkes::str_split
static std::vector< std::string > str_split(const std::string &s, char delim='/')
Split string by delimiter.
Definition: string_split.h:45
RobotMemory::mutex_renew_lock
bool mutex_renew_lock(const std::string &name, const std::string &identity)
Renew a mutex.
Definition: robot_memory.cpp:1017
RobotMemory::aggregate
bsoncxx::document::value aggregate(const std::vector< bsoncxx::document::view > &pipeline, const std::string &collection="")
Aggregation call on the robot memory.
Definition: robot_memory.cpp:232
fawkes::Configuration
Definition: config.h:68
RobotMemory::RobotMemory
RobotMemory(fawkes::Configuration *config, fawkes::Logger *logger, fawkes::Clock *clock, fawkes::MongoDBConnCreator *mongo_connection_manager, fawkes::BlackBoard *blackboard)
Robot Memory Constructor with objects of the thread.
Definition: robot_memory.cpp:66
fawkes::StringConversions::resolve_path
static std::string resolve_path(std::string s)
Resolves path-string with @...@ tags.
Definition: string_conversions.cpp:264
RobotMemory::remove_computable
void remove_computable(Computable *computable)
Remove previously registered computable.
Definition: robot_memory.cpp:783
fawkes::HostInfo::name
const char * name()
Get full hostname.
Definition: hostinfo.cpp:104
RobotMemory::restore_collection
int restore_collection(const std::string &collection, const std::string &directory="@CONFDIR@/robot-memory")
Restore a previously dumped collection from a directory.
Definition: robot_memory.cpp:585
RobotMemory::mapreduce
bsoncxx::document::value mapreduce(const bsoncxx::document::view &query, const std::string &collection, const std::string &js_map_fun, const std::string &js_reduce_fun)
Performs a MapReduce operation on the robot memory (https://docs.mongodb.com/manual/core/map-reduce/)
Definition: robot_memory.cpp:504
RobotMemory::mutex_create
bool mutex_create(const std::string &name)
Explicitly create a mutex.
Definition: robot_memory.cpp:797
fawkes::Logger
Definition: logger.h:40
fawkes
RobotMemory::mutex_try_lock
bool mutex_try_lock(const std::string &name, bool force=false)
Try to acquire a lock for a mutex.
Definition: robot_memory.cpp:950
EventTriggerManager
Definition: event_trigger_manager.h:44
RobotMemory::remove
int remove(const bsoncxx::document::view &query, const std::string &collection="")
Remove documents from the robot memory.
Definition: robot_memory.cpp:477
RobotMemory::drop_collection
int drop_collection(const std::string &collection)
Drop (= remove) a whole collection and all documents inside it.
Definition: robot_memory.cpp:554
RobotMemory::clear_memory
int clear_memory()
Remove the whole database of the robot memory and all documents inside.
Definition: robot_memory.cpp:568
RobotMemory::mutex_setup_ttl
bool mutex_setup_ttl(float max_age_sec)
Setup time-to-live index for mutexes.
Definition: robot_memory.cpp:1076
RobotMemory::mutex_destroy
bool mutex_destroy(const std::string &name)
Destroy a mutex.
Definition: robot_memory.cpp:827
RobotMemory::find_one_and_update
bsoncxx::document::value find_one_and_update(const bsoncxx::document::view &filter, const bsoncxx::document::view &update, const std::string &collection, bool upsert=false, bool return_new=true)
Atomically update and retrieve document.
Definition: robot_memory.cpp:434
fawkes::TimeTracker
Definition: tracker.h:40
fawkes::HostInfo
Definition: hostinfo.h:30
RobotMemory::query
mongocxx::cursor query(bsoncxx::document::view query, const std::string &collection_name="", mongocxx::options::find query_options=mongocxx::options::find())
Query information from the robot memory.
Definition: robot_memory.cpp:201
Computable
Definition: computable.h:30
RobotMemory::dump_collection
int dump_collection(const std::string &collection, const std::string &directory="@CONFDIR@/robot-memory")
Dump (= save) a collection to the filesystem to restore it later.
Definition: robot_memory.cpp:637
RobotMemory::update
int update(const bsoncxx::document::view &query, const bsoncxx::document::view &update, const std::string &collection="", bool upsert=false)
Updates documents in the robot memory.
Definition: robot_memory.cpp:379
RobotMemory::mutex_unlock
bool mutex_unlock(const std::string &name, const std::string &identity)
Release lock on mutex.
Definition: robot_memory.cpp:961
fawkes::Clock
Definition: clock.h:39
fawkes::Exception
Definition: exception.h:39