Fawkes API  Fawkes Development Version
mongodb_thread.cpp
1 
2 /***************************************************************************
3  * mongodb_thread.cpp - MongoDB Thread
4  *
5  * Created: Sun Dec 05 23:32:13 2010
6  * Copyright 2006-2015 Tim Niemueller [www.niemueller.de]
7  ****************************************************************************/
8 
9 /* This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  * GNU Library General Public License for more details.
18  *
19  * Read the full text in the LICENSE.GPL file in the doc directory.
20  */
21 
22 #include "mongodb_thread.h"
23 
24 #include "mongodb_client_config.h"
25 #include "mongodb_instance_config.h"
26 #include "mongodb_replicaset_config.h"
27 
28 using namespace mongocxx;
29 using namespace fawkes;
30 
31 /** @class MongoDBThread "mongodb_thread.h"
32  * MongoDB Thread.
33  * This thread maintains an active connection to MongoDB and provides an
34  * aspect to access MongoDB to make it convenient for other threads to use
35  * MongoDB.
36  *
37  * @author Tim Niemueller
38  */
39 
40 /** Constructor. */
42 : Thread("MongoDBThread", Thread::OPMODE_WAITFORWAKEUP),
43  AspectProviderAspect(&mongodb_aspect_inifin_),
44  mongodb_aspect_inifin_(this)
45 {
46 }
47 
48 /** Destructor. */
50 {
51 }
52 
53 void
55 {
56  instance_.reset(new instance());
57  logger->log_info(name(), "Init instances");
58  init_instance_configs();
59  logger->log_info(name(), "Init clients");
60  init_client_configs();
61  logger->log_info(name(), "Init RS");
62  init_replicaset_configs();
63 
64  if (client_configs_.empty() && instance_configs_.empty() && replicaset_configs_.empty()) {
65  throw Exception("No enabled MongoDB configurations found");
66  }
67 }
68 
69 void
70 MongoDBThread::init_client_configs()
71 {
72  std::set<std::string> ignored_configs;
73  std::string prefix = "/plugins/mongodb/clients/";
74 
75  std::unique_ptr<Configuration::ValueIterator> i(config->search(prefix.c_str()));
76  while (i->next()) {
77  std::string cfg_name = std::string(i->path()).substr(prefix.length());
78  cfg_name = cfg_name.substr(0, cfg_name.find("/"));
79 
80  if ((client_configs_.find(cfg_name) == client_configs_.end())
81  && (ignored_configs.find(cfg_name) == ignored_configs.end())) {
82  std::string cfg_prefix = prefix + cfg_name + "/";
83 
84  try {
85  auto conf = std::make_shared<MongoDBClientConfig>(config, logger, cfg_name, cfg_prefix);
86  if (conf->is_enabled()) {
87  client_configs_[cfg_name] = conf;
88  logger->log_info(name(), "Added MongoDB client configuration %s", cfg_name.c_str());
89  conf->log(logger, name(), " ");
90  } else {
91  logger->log_info(name(),
92  "Ignoring disabled MongoDB client "
93  "configuration %s",
94  cfg_name.c_str());
95  ignored_configs.insert(cfg_name);
96  }
97  } catch (Exception &e) {
98  logger->log_warn(name(),
99  "Invalid MongoDB client config %s, ignoring, "
100  "exception follows.",
101  cfg_name.c_str());
102  logger->log_warn(name(), e);
103  ignored_configs.insert(cfg_name);
104  }
105  }
106  }
107 }
108 
109 void
110 MongoDBThread::init_instance_configs()
111 {
112  std::set<std::string> ignored_configs;
113  std::string prefix = "/plugins/mongodb/instances/";
114 
115  std::unique_ptr<Configuration::ValueIterator> i(config->search(prefix.c_str()));
116  while (i->next()) {
117  std::string cfg_name = std::string(i->path()).substr(prefix.length());
118  cfg_name = cfg_name.substr(0, cfg_name.find("/"));
119 
120  if ((instance_configs_.find(cfg_name) == instance_configs_.end())
121  && (ignored_configs.find(cfg_name) == ignored_configs.end())) {
122  std::string cfg_prefix = prefix + cfg_name + "/";
123 
124  try {
125  auto conf = std::make_shared<MongoDBInstanceConfig>(config, cfg_name, cfg_prefix);
126  if (conf->is_enabled()) {
127  instance_configs_[cfg_name] = conf;
128  logger->log_info(name(), "Added MongoDB instance configuration %s", cfg_name.c_str());
129  } else {
130  logger->log_info(name(),
131  "Ignoring disabled MongoDB instance "
132  "configuration %s",
133  cfg_name.c_str());
134  ignored_configs.insert(cfg_name);
135  }
136  } catch (Exception &e) {
137  logger->log_warn(name(),
138  "Invalid MongoDB instance config %s, ignoring, "
139  "exception follows.",
140  cfg_name.c_str());
141  logger->log_warn(name(), e);
142  ignored_configs.insert(cfg_name);
143  }
144  }
145  }
146 
147  for (auto c : instance_configs_) {
148  logger->log_info(name(), "Running instance '%s'", c.first.c_str());
149  logger->log_info(name(), " '%s'", c.second->command_line().c_str());
150  thread_collector->add(&*c.second);
151  }
152 }
153 
154 void
155 MongoDBThread::init_replicaset_configs()
156 {
157  std::set<std::string> ignored_configs;
158  std::string prefix = "/plugins/mongodb/replica-sets/managed-sets/";
159 
160  std::string bootstrap_prefix = "/plugins/mongodb/replica-sets/bootstrap-mongodb/";
161  std::string bootstrap_client_cfg = config->get_string(bootstrap_prefix + "client");
162  std::string bootstrap_database = config->get_string(bootstrap_prefix + "database");
163 
164  std::unique_ptr<Configuration::ValueIterator> i(config->search(prefix.c_str()));
165  while (i->next()) {
166  std::string cfg_name = std::string(i->path()).substr(prefix.length());
167  cfg_name = cfg_name.substr(0, cfg_name.find("/"));
168 
169  if ((replicaset_configs_.find(cfg_name) == replicaset_configs_.end())
170  && (ignored_configs.find(cfg_name) == ignored_configs.end())) {
171  std::string cfg_prefix = prefix + cfg_name + "/";
172 
173  try {
174  if (config->get_bool(cfg_prefix + "enabled")) {
175  auto conf =
176  std::make_shared<MongoDBReplicaSetConfig>(cfg_name, cfg_prefix, bootstrap_prefix);
177  replicaset_configs_[cfg_name] = conf;
178  logger->log_info(name(), "Added MongoDB replica set configuration %s", cfg_name.c_str());
179  } else {
180  logger->log_info(name(),
181  "Ignoring disabled MongoDB replica set "
182  "configuration %s",
183  cfg_name.c_str());
184  ignored_configs.insert(cfg_name);
185  }
186  } catch (Exception &e) {
187  logger->log_warn(name(),
188  "Invalid MongoDB replica set config %s, ignoring, "
189  "exception follows.",
190  cfg_name.c_str());
191  logger->log_warn(name(), e);
192  ignored_configs.insert(cfg_name);
193  }
194  }
195  }
196 
197  for (auto c : replicaset_configs_) {
198  logger->log_info(name(), "Running replica set '%s' management", c.first.c_str());
199  thread_collector->add(&*c.second);
200  }
201 }
202 
203 void
205 {
206  for (auto c : instance_configs_) {
207  logger->log_info(name(),
208  "Stopping instance '%s', grace period %u sec",
209  c.first.c_str(),
210  c.second->termination_grace_period());
211  thread_collector->remove(&*c.second);
212  }
213  instance_configs_.clear();
214 
215  for (auto c : replicaset_configs_) {
216  logger->log_info(name(), "Stopping replica set '%s' management", c.first.c_str());
217  thread_collector->remove(&*c.second);
218  }
219  replicaset_configs_.clear();
220 
221  client_configs_.clear();
222 
223  instance_.release();
224 }
225 
226 void
228 {
229 }
230 
231 mongocxx::client *
232 MongoDBThread::create_client(const std::string &config_name)
233 {
234  const std::string cname{config_name.empty() ? "default" : config_name};
235 
236  if (client_configs_.find(cname) != client_configs_.end()) {
237  if (!client_configs_[cname]->is_enabled()) {
238  throw Exception("MongoDB config '%s' is not marked enabled", cname.c_str());
239  }
240  return client_configs_[cname]->create_client();
241  } else {
242  throw Exception("No MongoDB config named '%s' exists", cname.c_str());
243  }
244 }
245 
246 void
247 MongoDBThread::delete_client(mongocxx::client *client)
248 {
249  delete client;
250 }
fawkes::ThreadCollector::remove
virtual void remove(ThreadList &tl)=0
MongoDBThread::finalize
virtual void finalize()
Finalize the thread.
Definition: mongodb_thread.cpp:203
fawkes::Configuration::get_bool
virtual bool get_bool(const char *path)=0
MongoDBThread::~MongoDBThread
virtual ~MongoDBThread()
Destructor.
Definition: mongodb_thread.cpp:48
fawkes::Logger::log_info
virtual void log_info(const char *component, const char *format,...)=0
fawkes::AspectProviderAspect
Definition: aspect_provider.h:39
fawkes::Thread::name
const char * name() const
Definition: thread.h:99
MongoDBThread::loop
virtual void loop()
Code to execute in the thread.
Definition: mongodb_thread.cpp:226
fawkes::ThreadCollector::add
virtual void add(ThreadList &tl)=0
fawkes::Configuration::search
virtual ValueIterator * search(const char *path)=0
MongoDBThread::delete_client
virtual void delete_client(mongocxx::client *client)
Delete a client.
Definition: mongodb_thread.cpp:246
fawkes::LoggingAspect::logger
Logger * logger
Definition: logging.h:50
fawkes::ThreadProducerAspect::thread_collector
ThreadCollector * thread_collector
Definition: thread_producer.h:50
fawkes
fawkes::Logger::log_warn
virtual void log_warn(const char *component, const char *format,...)=0
MongoDBThread::MongoDBThread
MongoDBThread()
Constructor.
Definition: mongodb_thread.cpp:40
fawkes::ConfigurableAspect::config
Configuration * config
Definition: configurable.h:50
MongoDBThread::init
virtual void init()
Initialize the thread.
Definition: mongodb_thread.cpp:53
MongoDBThread::create_client
virtual mongocxx::client * create_client(const std::string &config_name="")
Create a new MongoDB client.
Definition: mongodb_thread.cpp:231
fawkes::Thread
Definition: thread.h:44
fawkes::Configuration::get_string
virtual std::string get_string(const char *path)=0
fawkes::Exception
Definition: exception.h:39