22 #include "mongodb_replicaset_config.h"
24 #include "mongodb_client_config.h"
27 #include <config/config.h>
28 #include <logging/logger.h>
29 #include <utils/time/wait.h>
31 #include <bsoncxx/builder/basic/document.hpp>
34 #include <mongocxx/exception/operation_exception.hpp>
38 using namespace bsoncxx::builder;
// Constructor of the replica set configuration thread. The first line of the
// signature is not part of this view; `cfgname`, used below, is presumably
// the parameter declared there.
57 const std::string &prefix,
58 const std::string &bootstrap_prefix)
59 :
// The base class is set up as a continuous-mode thread named
// "MongoDBReplicaSet".
Thread(
"MongoDBReplicaSet",
Thread::OPMODE_CONTINUOUS),
// The three leader election documents are initialized from
// default-constructed builders, i.e. they start out empty.
60 leader_elec_query_(bsoncxx::builder::basic::document()),
61 leader_elec_query_force_(bsoncxx::builder::basic::document()),
62 leader_elec_update_(bsoncxx::builder::basic::document())
// Make the thread name specific to this configuration.
64 set_name(
"MongoDBReplicaSet|%s", cfgname.c_str());
65 config_name_ = cfgname;
67 bootstrap_prefix_ = bootstrap_prefix;
// Initial status value: member state ERROR, primary state PRIMARY_UNKNOWN.
// The left-hand side of this expression is not visible in this view.
70 ReplicaSetStatus{.member_status = MongoDBManagedReplicaSetInterface::ERROR,
71 .primary_status = MongoDBManagedReplicaSetInterface::PRIMARY_UNKNOWN};
// Initialization fragment: reads the configuration, creates the bootstrap
// client and validates the local client. The enclosing function's signature
// and several conditions are not visible in this view.
//
// Going by its message, this exception rejects starting a disabled manager.
84 throw Exception(
"Replica set manager '%s' cannot be started while disabled",
name());
// Format strings and arguments of two log calls (their heads are not
// visible): the election query and update documents are printed as JSON.
// NOTE(review): the documents are only filled in further below; if these
// calls run first they print the empty initial documents -- confirm.
88 "Bootstrap Query: %s",
89 bsoncxx::to_json(leader_elec_query_.view()).c_str());
91 "Bootstrap Update: %s",
92 bsoncxx::to_json(leader_elec_update_.view()).c_str());
// Name of the client configuration used for bootstrapping, read from the
// configuration key <bootstrap_prefix_>client.
99 std::string bootstrap_client_cfg =
config->
get_string(bootstrap_prefix_ +
"client");
// Arguments of a construction whose head is not visible; the client
// settings live below /plugins/mongodb/clients/<name>.
102 bootstrap_client_cfg,
103 "/plugins/mongodb/clients/" + bootstrap_client_cfg
// Going by its message, raised when the bootstrap client config is disabled.
106 throw Exception(
"%s: bootstrap client configuration '%s' disabled",
108 bootstrap_client_cfg.c_str());
// Take ownership of the newly created bootstrap client.
110 bootstrap_client_.reset(bootstrap_client_config.
create_client());
// Namespace string used for the election collection: <database>.<config name>.
111 bootstrap_ns_ = bootstrap_database_ +
"." + config_name_;
// Default loop interval; lines not shown here may override it.
114 loop_interval_ = 5.0;
// The interval is scaled by 1e6 and truncated to int for TimeWait
// (presumably microseconds -- TODO confirm against TimeWait).
// NOTE(review): allocated with raw new; the matching delete is not visible.
120 timewait_ =
new TimeWait(
clock, (
int)(loop_interval_ * 1000000.));
// Default leader expiration, then the value read from the configuration key
// <prefix_>leader-expiration (any surrounding try/catch is not visible).
122 leader_expiration_ = 10;
124 leader_expiration_ =
config->
get_int(prefix_ +
"leader-expiration");
// Tail of the local client configuration construction (head not visible).
131 "/plugins/mongodb/clients/" + local_client_cfg_ +
"/");
// Going by the messages, the local client configuration must be enabled and
// must use connection mode.
133 throw Exception(
"%s: local client configuration '%s' disabled",
135 local_client_cfg_.c_str());
138 throw Exception(
"%s: local client configuration '%s' mode is not connection",
140 local_client_cfg_.c_str());
// Host:port of the local instance as reported by its client configuration.
142 local_hostport_ = client_config.
hostport();
// Copy the host list (`hostv`, defined on a line not shown) into hosts_.
144 std::copy(hostv.begin(), hostv.end(), std::inserter(hosts_, hosts_.end()));
// The local instance must itself be part of the configured host list.
146 if (std::find(hosts_.begin(), hosts_.end(), local_hostport_) == hosts_.end()) {
147 throw Exception(
"%s host list does not include local client",
name());
150 using namespace bsoncxx::builder::basic;
// Regular election filter: the document of this host while it is not master.
152 auto leader_elec_query_builder = basic::document{};
153 leader_elec_query_builder.append(basic::kvp(
"host", local_hostport_));
154 leader_elec_query_builder.append(basic::kvp(
"master",
false));
155 leader_elec_query_ = leader_elec_query_builder.extract();
// Forced election filter: whichever document currently has master set.
// The key must be appended to the force builder itself; appending to the
// already extracted regular builder would leave this filter empty, and an
// empty filter matches any document.
157 auto leader_elec_query_force_builder = basic::document{};
158 leader_elec_query_force_builder.append(basic::kvp(
"master",
true));
159 leader_elec_query_force_ = leader_elec_query_force_builder.extract();
// Update document applied during leader election:
//   $currentDate: { last_seen: true }
//   $set:         { master: true, host: <local_hostport_> }
// The closing lines of both lambdas are not visible in this view.
161 auto leader_elec_update_builder = basic::document{};
162 leader_elec_update_builder.append(basic::kvp(
"$currentDate", [](basic::sub_document subdoc) {
163 subdoc.append(basic::kvp(
"last_seen",
true));
// `this` is captured so the lambda can read local_hostport_.
165 leader_elec_update_builder.append(basic::kvp(
"$set", [
this](basic::sub_document subdoc) {
166 subdoc.append(basic::kvp(
"master",
true));
167 subdoc.append(basic::kvp(
"host", local_hostport_));
169 leader_elec_update_ = leader_elec_update_builder.extract();
// Create the indexes on the leader election collection.
// The return type and the catch handler body are not visible in this view.
180 MongoDBReplicaSetConfig::bootstrap()
// The collection is looked up by bootstrap_ns_ inside bootstrap_database_.
184 auto database = bootstrap_client_->database(bootstrap_database_);
185 auto collection = database[bootstrap_ns_];
// Plain index on "host".
187 collection.create_index(basic::make_document(basic::kvp(
"host", 1)));
// Unique index on "master".
188 collection.create_index(basic::make_document(basic::kvp(
"master", 1)),
189 basic::make_document(basic::kvp(
"unique",
true)));
// Index on "last_seen" with expireAfterSeconds set to leader_expiration_.
190 collection.create_index(basic::make_document(basic::kvp(
"last_seen", 1)),
191 basic::make_document(
192 basic::kvp(
"expireAfterSeconds", leader_expiration_)));
193 }
catch (mongocxx::operation_exception &e) {
// Try to become leader by upserting this host's election document.
// force: selects leader_elec_query_force_ instead of leader_elec_query_ as
//        the update filter.
// The return type and the success path are not visible in this view.
201 MongoDBReplicaSetConfig::leader_elect(
bool force)
// Majority write concern, constructed with a timeout argument of 0 ms.
204 auto write_concern = mongocxx::write_concern();
205 write_concern.majority(std::chrono::milliseconds(0));
// Upsert: apply leader_elec_update_ to the document matched by the filter.
206 auto result = bootstrap_client_->database(bootstrap_database_)[bootstrap_ns_].update_one(
207 force ? leader_elec_query_force_.view() : leader_elec_query_.view(),
208 leader_elec_update_.view(),
209 mongocxx::options::update().upsert(
true).write_concern(write_concern));
221 }
catch (mongocxx::operation_exception &e) {
// Inspect the raw server error, if the exception carries one.
222 if (boost::optional<bsoncxx::document::value> error = e.raw_server_error()) {
// NOTE(review): "writeErrors" is accessed without a visible existence or
// type check -- confirm the server error always contains this array.
223 bsoncxx::array::view writeErrors = error->view()[
"writeErrors"].get_array();
225 auto num_errors = std::distance(writeErrors.begin(), writeErrors.end());
227 if (num_errors > 0) {
// Only the code of the first write error is extracted.
228 error_code = error->view()[
"writeErrors"][0][
"code"].get_int32();
// A single error with code 11000 (MongoDB's duplicate key error) is not
// reported through this message; anything else is.
230 if (num_errors > 1 || error_code != 11000) {
233 "Leader election failed (%i): %s %s",
236 bsoncxx::to_json(error->view()).c_str());
238 }
else if (is_leader_) {
// No raw server error available: only the exception text can be logged.
243 logger->
log_error(
name(),
"Leader election failed; failed to fetch error code: %s", e.what());
// Give up leadership by deleting one election document matching
// leader_elec_query_, using a majority write concern.
// NOTE(review): in the visible setup code leader_elec_query_ is built with
// "master": false, while the election update sets "master": true -- verify
// that this filter matches the leader's own document.
250 MongoDBReplicaSetConfig::leader_resign()
254 auto write_concern = mongocxx::write_concern();
255 write_concern.majority(std::chrono::milliseconds(0));
256 bootstrap_client_->database(bootstrap_database_)[bootstrap_ns_].delete_one(
257 leader_elec_query_.view(), mongocxx::options::delete_options().write_concern(write_concern));
// Status evaluation fragment; the enclosing function's signature and most
// branch bodies are not visible in this view.
//
// Query the current status; `reply` receives the raw server reply.
274 bsoncxx::document::value reply{bsoncxx::builder::basic::document()};
275 ReplicaSetStatus status = rs_status(reply);
// Without a primary, attempt a regular (non-forced) leader election.
277 if (status.primary_status == MongoDBManagedReplicaSetInterface::NO_PRIMARY) {
279 if (leader_elect(
false)) {
// React to this member's own state.
285 switch (status.member_status) {
286 case MongoDBManagedReplicaSetInterface::PRIMARY:
// Guarded by a comparison with the previously stored member status.
287 if (last_status_.member_status != status.member_status) {
293 case MongoDBManagedReplicaSetInterface::SECONDARY:
294 if (last_status_.member_status != status.member_status) {
298 case MongoDBManagedReplicaSetInterface::ARBITER:
301 case MongoDBManagedReplicaSetInterface::NOT_INITIALIZED:
// Entered for a single-host set, or when leader_elect() (called with its
// default argument) succeeds.
302 if (hosts_.size() == 1 || leader_elect()) {
304 if (hosts_.size() == 1) {
312 case MongoDBManagedReplicaSetInterface::INVALID_CONFIG:
// Arguments of a log call (head not visible): the raw status reply is
// printed for manual inspection.
315 "Invalid configuration, hands-on required\n%s",
316 bsoncxx::to_json(reply.view()).c_str());
// Publish through rs_status_if_ only when the status differs from the last.
321 if (last_status_ != status) {
322 rs_status_if_->set_member_status(status.member_status);
323 rs_status_if_->set_primary_status(status.primary_status);
324 rs_status_if_->set_error_msg(status.error_msg.c_str());
325 rs_status_if_->write();
// Remember the status for the next comparison.
327 last_status_ = status;
// Run replSetGetStatus on the local client and derive the member and primary
// status from the reply.
// reply: out-parameter, assigned the command reply when the command succeeds.
// Returns a ReplicaSetStatus that starts as ERROR / PRIMARY_UNKNOWN.
333 MongoDBReplicaSetConfig::ReplicaSetStatus
334 MongoDBReplicaSetConfig::rs_status(bsoncxx::document::value &reply)
336 ReplicaSetStatus status = {.member_status = MongoDBManagedReplicaSetInterface::ERROR,
337 .primary_status = MongoDBManagedReplicaSetInterface::PRIMARY_UNKNOWN};
339 auto cmd = basic::make_document(basic::kvp(
"replSetGetStatus", 1));
341 reply = local_client_->database(
"admin").run_command(std::move(cmd));
342 }
catch (mongocxx::operation_exception &e) {
// NOTE(review): raw_server_error() is dereferenced without a visible check,
// whereas leader_elect() tests the optional first -- confirm it cannot be
// empty here.
344 auto error_code_element = e.raw_server_error()->view()[
"code"];
// The code is only read when it is present and stored as int32.
345 if (error_code_element && error_code_element.type() == bsoncxx::type::k_int32) {
346 error_code = e.raw_server_error()->view()[
"code"].get_int32();
// Code 94 is mapped to NOT_INITIALIZED.
348 if (error_code == 94 ) {
350 status.member_status = MongoDBManagedReplicaSetInterface::NOT_INITIALIZED;
351 status.error_msg =
"Instance has not received replica set configuration, yet";
352 }
// Code 93 is mapped to INVALID_CONFIG.
// NOTE(review): `reply` is formatted below although run_command threw on
// this path, so it may not contain the server's error document -- confirm.
else if (error_code == 93 ) {
354 "Invalid replica set configuration: %s",
355 bsoncxx::to_json(reply.view()).c_str());
356 status.member_status = MongoDBManagedReplicaSetInterface::INVALID_CONFIG;
357 status.error_msg =
"Invalid replica set configuration: " + bsoncxx::to_json(reply.view());
359 status.error_msg =
"Unknown error";
// Own state defaults to REMOVED until a member entry overrides it.
365 MongoDBManagedReplicaSetInterface::ReplicaSetMemberStatus self_status =
366 MongoDBManagedReplicaSetInterface::REMOVED;
// The reply is only analyzed if it has a "members" array.
367 auto members = reply.view()[
"members"];
368 if (members && members.type() == bsoncxx::type::k_array) {
369 bsoncxx::array::view members_view{members.get_array().value};
// The line that sets have_primary is not visible in this view.
370 bool have_primary =
false;
371 for (bsoncxx::array::element member : members_view) {
372 int state = member[
"state"].get_int32();
// For the entry flagged "self", translate the numeric state:
// 1 -> PRIMARY, 2 -> SECONDARY, 7 -> ARBITER, unlisted -> ERROR. The case
// labels leading to INITIALIZING are not visible.
376 if (member[
"self"] && member[
"self"].get_bool()) {
378 case 1: self_status = MongoDBManagedReplicaSetInterface::PRIMARY;
break;
379 case 2: self_status = MongoDBManagedReplicaSetInterface::SECONDARY;
break;
383 self_status = MongoDBManagedReplicaSetInterface::INITIALIZING;
385 case 7: self_status = MongoDBManagedReplicaSetInterface::ARBITER;
break;
386 default: self_status = MongoDBManagedReplicaSetInterface::ERROR;
break;
390 status.primary_status = have_primary ? MongoDBManagedReplicaSetInterface::HAVE_PRIMARY
391 : MongoDBManagedReplicaSetInterface::NO_PRIMARY;
392 status.member_status = self_status;
// Reply without a members array: logged, and the local self_status is set
// to ERROR.
396 "Received replica set status reply without members, unknown status");
397 self_status = MongoDBManagedReplicaSetInterface::ERROR;
399 }
catch (mongocxx::operation_exception &e) {
// Failures while analyzing the reply are reported as ERROR with the
// exception text.
401 status.member_status = MongoDBManagedReplicaSetInterface::ERROR;
402 status.error_msg = std::string(
"Failed to analyze member info: ") + e.what();
// Initiate the replica set on the local instance by running replSetInitiate
// with an empty configuration document on the admin database.
// The return type and the catch handler body are not visible in this view.
409 MongoDBReplicaSetConfig::rs_init()
412 auto cmd = basic::make_document(basic::kvp(
"replSetInitiate", basic::document{}));
413 bsoncxx::document::value reply{bsoncxx::builder::basic::document()};
415 reply = local_client_->database(
"admin").run_command(std::move(cmd));
// check_mongodb_ok() evaluates the reply; its result selects which of the
// two messages below is logged (the log-call heads are not visible).
416 bool ok = check_mongodb_ok(reply.view());
// Failure message: the reply's "errmsg" string.
419 "RS initialization failed: %s",
420 reply.view()[
"errmsg"].get_utf8().value.to_string().c_str());
// Success message: the full reply as JSON.
423 "RS initialized successfully: %s",
424 bsoncxx::to_json(reply.view()).c_str());
426 }
catch (mongocxx::operation_exception &e) {
// Fetch the current replica set configuration by running replSetGetConfig
// on the admin database of the local client.
// rs_config: out-parameter; the line assigning it is not visible in this view.
// The caller in rs_monitor() tests the result as a boolean.
432 MongoDBReplicaSetConfig::rs_get_config(bsoncxx::document::value &rs_config)
434 auto cmd = basic::make_document(basic::kvp(
"replSetGetConfig", 1));
437 bsoncxx::document::value reply{bsoncxx::builder::basic::document()};
438 reply = local_client_->database(
"admin").run_command(std::move(cmd));
439 bool ok = check_mongodb_ok(reply.view());
// Arguments of a log call (head not visible): the full reply is printed as
// JSON when the database reports an error.
445 "Failed to get RS config: %s (DB error)",
446 bsoncxx::to_json(reply.view()).c_str());
449 }
catch (mongocxx::operation_exception &e) {
// Compare the replica set membership reported in `status_reply` with the
// configured hosts_ and, if members are unresponsive or configured hosts
// have become reachable, submit a forced reconfiguration.
// status_reply: reply of replSetGetStatus; it must contain a "members" array.
// The return type and several branch bodies are not visible in this view.
456 MongoDBReplicaSetConfig::rs_monitor(
const bsoncxx::document::view &status_reply)
458 using namespace std::chrono_literals;
460 std::set<std::string> in_rs, unresponsive, new_alive, members;
// Highest member _id seen so far; new members get ids above it.
461 int last_member_id{0};
462 bsoncxx::array::view members_view{status_reply[
"members"].get_array().value};
463 for (bsoncxx::array::element member : members_view) {
464 std::string member_name = member[
"name"].get_utf8().value.to_string();
465 members.insert(member_name);
466 last_member_id = std::max(
int(member[
"_id"].get_int32()), last_member_id);
// The local member is always counted as part of the replica set.
467 if (member[
"self"] && member[
"self"].get_bool()) {
468 in_rs.insert(member_name);
// Other members: unresponsive if health is not 1 or the last received
// heartbeat is more than 15 s old.
// NOTE(review): the time point is built on high_resolution_clock from the
// date's millisecond value; that clock's epoch is not guaranteed to match
// the date's epoch -- confirm, or consider system_clock.
470 std::chrono::time_point<std::chrono::high_resolution_clock> last_heartbeat_rcvd(
471 std::chrono::milliseconds(member[
"lastHeartbeatRecv"].get_date()));
472 auto now = std::chrono::high_resolution_clock::now();
473 if ((
int(member[
"health"].get_double()) != 1) || (now - last_heartbeat_rcvd) > 15s) {
474 unresponsive.insert(member_name);
476 in_rs.insert(member_name);
// not_member receives a set difference starting from hosts_; the remaining
// input ranges are on lines not visible in this view.
480 std::set<std::string> not_member;
481 std::set_difference(hosts_.begin(),
485 std::inserter(not_member, not_member.end()));
// Probe each such host; presumably reachable ones are collected in
// new_alive (the insertion is not visible).
487 for (
const std::string &h : not_member) {
489 if (check_alive(h)) {
// Reconfigure only if something changed.
497 if (!unresponsive.empty() || !new_alive.empty()) {
// Start from the current configuration.
499 bsoncxx::document::value reply{bsoncxx::builder::basic::document()};
500 if (!rs_get_config(reply)) {
503 auto config = reply.view()[
"config"].get_document().view();
504 using namespace bsoncxx::builder::basic;
// Rebuild the configuration key by key.
506 auto new_config = basic::document{};
507 for (
auto &&key_it =
config.begin(); key_it !=
config.end(); key_it++) {
// "version" is incremented by one.
508 if (key_it->key() ==
"version") {
509 new_config.append(basic::kvp(
"version",
config[
"version"].get_int32() + 1));
511 }
// "members" is rebuilt from the existing entries plus the new hosts.
else if (key_it->key() ==
"members") {
512 bsoncxx::array::view members_view{
config[
"members"].get_array().value};
513 new_config.append(basic::kvp(
"members", [&](basic::sub_array array) {
514 for (bsoncxx::array::element member : members_view) {
515 std::string host = member[
"host"].get_utf8().value.to_string();
// Hosts missing from hosts_ are reported; the log-call head is not visible.
516 if (hosts_.find(host) == hosts_.end()) {
519 "not part of the replica set configuration",
521 }
// Hosts that are not unresponsive are kept with their host and _id.
else if (unresponsive.find(host) == unresponsive.end()) {
524 array.append(basic::make_document(basic::kvp(
"host", host),
525 basic::kvp(
"_id", member[
"_id"].get_value())));
// Newly reachable hosts are added with fresh ids above the highest seen.
530 for (
const std::string &h : new_alive) {
533 basic::make_document(basic::kvp(
"host", h), basic::kvp(
"_id", ++last_member_id)));
// Key/value pair copied over unchanged (presumably the fallback for all
// other keys; the branch head is not visible).
537 new_config.append(basic::kvp(key_it->key(), key_it->get_value()));
// Submit the new configuration as a forced replSetReconfig.
544 auto cmd = basic::document{};
545 cmd.append(basic::kvp(
"replSetReconfig", new_config));
546 cmd.append(basic::kvp(
"force",
true));
549 auto reply = local_client_->database(
"admin").run_command(cmd.view());
551 bool ok = check_mongodb_ok(reply.view());
// Arguments of a log call (head not visible): the reply's "errmsg".
554 "RS reconfig failed: %s (DB error)",
555 reply.view()[
"errmsg"].get_utf8().value.to_string().c_str());
557 }
catch (mongocxx::operation_exception &e) {
// Probe whether the MongoDB instance at `h` is reachable.
// h: host string appended to "mongodb://" to form the connection URI.
// The caller tests the result as a boolean; the return statements and the
// catch handler body are not visible in this view.
564 MongoDBReplicaSetConfig::check_alive(
const std::string &h)
566 using namespace bsoncxx::builder::basic;
// A fresh client is created for this probe.
568 mongocxx::client client{mongocxx::uri{
"mongodb://" + h}};
// Run isMaster on the admin database and evaluate the reply.
569 auto cmd = basic::document{};
570 cmd.append(basic::kvp(
"isMaster", 1));
571 auto reply = client.database(
"admin").run_command(cmd.view());
572 bool ok = check_mongodb_ok(reply.view());
// Logs a warning with the full reply (the condition is not visible).
574 logger->
log_warn(
name(),
"Failed to connect: %s", bsoncxx::to_json(reply.view()).c_str());
577 }
catch (mongocxx::operation_exception &e) {