Fawkes API  Fawkes Development Version
metrics_thread.cpp
1 /***************************************************************************
2  * metrics_thread.cpp - Metrics exporter for Prometheus plugin
3  *
4  * Created: Sat May 06 19:44:55 2017 (German Open 2017)
5  * Copyright 2017 Tim Niemueller [www.niemueller.de]
6  ****************************************************************************/
7 
8 /* This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License as published by
10  * the Free Software Foundation; either version 2 of the License, or
11  * (at your option) any later version.
12  *
13  * This program is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16  * GNU Library General Public License for more details.
17  *
18  * Read the full text in the LICENSE.GPL file in the doc directory.
19  */
20 
21 #include "metrics_thread.h"
22 
23 #include "metrics_processor.h"
24 
25 #include <core/threading/mutex_locker.h>
26 #include <interfaces/MetricCounterInterface.h>
27 #include <interfaces/MetricGaugeInterface.h>
28 #include <interfaces/MetricHistogramInterface.h>
29 #include <interfaces/MetricUntypedInterface.h>
30 #include <utils/misc/string_split.h>
31 #include <webview/url_manager.h>
32 
33 #include <algorithm>
34 #include <chrono>
35 #include <functional>
36 
37 using namespace fawkes;
38 
39 #define CFG_PREFIX "/metrics/"
40 #define URL_PREFIX "/metrics"
41 
42 /** @class MetricsThread "metrics_thread.h"
43  * Thread to export metrics for Prometheus.
44  * @author Tim Niemueller
45  */
46 
47 /** Constructor. */
49 : Thread("MetricsThread", Thread::OPMODE_WAITFORWAKEUP),
50  BlockedTimingAspect(BlockedTimingAspect::WAKEUP_HOOK_POST_LOOP),
51  AspectProviderAspect(&metrics_aspect_inifin_),
52  BlackBoardInterfaceListener("MetricsThread")
53 {
54 }
55 
56 /** Destructor. */
58 {
59 }
60 
61 void
63 {
64  metrics_aspect_inifin_.set_manager(this);
65 
66  bbio_add_observed_create("MetricFamilyInterface", "*");
68 
69  MutexLocker lock(metric_bbs_.mutex());
70  std::list<MetricFamilyInterface *> ifaces =
71  blackboard->open_multiple_for_reading<MetricFamilyInterface>("*");
72 
73  for (auto &i : ifaces) {
74  logger->log_info(name(), "Got metric family %s", i->id());
75  i->read();
76  MetricFamilyBB mfbb{.metric_family = i, .metric_type = i->metric_type()};
77  metric_bbs_[i->id()] = mfbb;
78 
79  if (!conditional_open(i->id(), metric_bbs_[i->id()])) {
81  }
82  }
83 
85 
86  lock.unlock();
87 
88  imf_loop_count_ = std::make_shared<io::prometheus::client::MetricFamily>();
89  imf_loop_count_->set_name("fawkes_loop_count");
90  imf_loop_count_->set_help("Number of Fawkes main loop iterations");
91  imf_loop_count_->set_type(io::prometheus::client::COUNTER);
92  imf_loop_count_->add_metric();
93  internal_metrics_.push_back(imf_loop_count_);
94 
95  imf_metrics_requests_ = std::make_shared<io::prometheus::client::MetricFamily>();
96  imf_metrics_requests_->set_name("fawkes_metrics_requests");
97  imf_metrics_requests_->set_help("Number of requests for metrics");
98  imf_metrics_requests_->set_type(io::prometheus::client::COUNTER);
99  imf_metrics_requests_->add_metric();
100  internal_metrics_.push_back(imf_metrics_requests_);
101 
102  try {
103  std::vector<float> buckets_le =
104  config->get_floats("/metrics/internal/metrics_requests/buckets");
105 
106  if (!buckets_le.empty()) {
107  std::sort(buckets_le.begin(), buckets_le.end());
108 
109  imf_metrics_proctime_ = std::make_shared<io::prometheus::client::MetricFamily>();
110  imf_metrics_proctime_->set_name("fawkes_metrics_proctime");
111  imf_metrics_proctime_->set_help("Time required to process metrics");
112  imf_metrics_proctime_->set_type(io::prometheus::client::HISTOGRAM);
113  auto m = imf_metrics_proctime_->add_metric();
114  auto h = m->mutable_histogram();
115  for (float &b : buckets_le) {
116  h->add_bucket()->set_upper_bound(b);
117  }
118  internal_metrics_.push_back(imf_metrics_proctime_);
119  }
120  } catch (Exception &e) {
121  logger->log_warn(name(),
122  "Internal metric metrics_proctime bucket bounds not configured, disabling");
123  }
124 
125  metrics_suppliers_.push_back(this);
126 
127  req_proc_ = new MetricsRequestProcessor(this, logger, URL_PREFIX);
128  webview_url_manager->add_handler(WebRequest::METHOD_GET,
129  URL_PREFIX,
131  req_proc_,
132  std::placeholders::_1));
133 }
134 
135 void
137 {
138  webview_url_manager->remove_handler(WebRequest::METHOD_GET, URL_PREFIX);
139  delete req_proc_;
140 }
141 
142 void
144 {
145  imf_loop_count_->mutable_metric(0)->mutable_counter()->set_value(
146  imf_loop_count_->metric(0).counter().value() + 1);
147 }
148 
149 void
150 MetricsThread::bb_interface_created(const char *type, const char *id) throw()
151 {
152  MutexLocker lock(metric_bbs_.mutex());
153  MetricFamilyInterface *mfi;
154  try {
155  mfi = blackboard->open_for_reading<MetricFamilyInterface>(id);
156  logger->log_info(name(), "Opened %s:%s", type, id);
157  } catch (Exception &e) {
158  // ignored
159  logger->log_warn(name(), "Failed to open %s:%s: %s", type, id, e.what_no_backtrace());
160  return;
161  }
162 
163  try {
164  bbil_add_reader_interface(mfi);
165  bbil_add_writer_interface(mfi);
166  bbil_add_data_interface(mfi);
167  blackboard->update_listener(this);
168  } catch (Exception &e) {
169  logger->log_warn(name(), "Failed to register for %s:%s: %s", type, id, e.what());
170  try {
171  bbil_remove_reader_interface(mfi);
172  bbil_remove_writer_interface(mfi);
173  blackboard->update_listener(this);
174  blackboard->close(mfi);
175  } catch (Exception &e) {
176  logger->log_error(
177  name(), "Failed to deregister %s:%s during error recovery: %s", type, id, e.what());
178  }
179  return;
180  }
181  MetricFamilyBB mfbb{.metric_family = mfi, .metric_type = mfi->metric_type()};
182  metric_bbs_[id] = mfbb;
183 }
184 
185 std::list<io::prometheus::client::MetricFamily>
186 MetricsThread::metrics()
187 {
188  std::chrono::high_resolution_clock::time_point proc_start =
189  std::chrono::high_resolution_clock::now();
190 
191  imf_metrics_requests_->mutable_metric(0)->mutable_counter()->set_value(
192  imf_metrics_requests_->metric(0).counter().value() + 1);
193 
194  std::list<io::prometheus::client::MetricFamily> rv;
195 
196  MutexLocker lock(metric_bbs_.mutex());
197  for (auto &mbbp : metric_bbs_) {
198  auto &mfbb = mbbp.second;
199 
200  io::prometheus::client::MetricFamily mf;
201  mfbb.metric_family->read();
202  mf.set_name(mfbb.metric_family->name());
203  mf.set_help(mfbb.metric_family->help());
204 
205  switch (mfbb.metric_type) {
206  case MetricFamilyInterface::COUNTER:
207  mf.set_type(io::prometheus::client::COUNTER);
208  for (const auto &d : mfbb.data) {
209  d.counter->read();
210  io::prometheus::client::Metric *m = mf.add_metric();
211  parse_labels(d.counter->labels(), m);
212  m->mutable_counter()->set_value(d.counter->value());
213  }
214  break;
215 
216  case MetricFamilyInterface::GAUGE:
217  mf.set_type(io::prometheus::client::GAUGE);
218  for (const auto &d : mfbb.data) {
219  d.gauge->read();
220  io::prometheus::client::Metric *m = mf.add_metric();
221  parse_labels(d.gauge->labels(), m);
222  m->mutable_gauge()->set_value(d.gauge->value());
223  }
224  break;
225 
226  case MetricFamilyInterface::UNTYPED:
227  mf.set_type(io::prometheus::client::UNTYPED);
228  for (const auto &d : mfbb.data) {
229  d.untyped->read();
230  io::prometheus::client::Metric *m = mf.add_metric();
231  parse_labels(d.untyped->labels(), m);
232  m->mutable_untyped()->set_value(d.untyped->value());
233  }
234  break;
235 
236  case MetricFamilyInterface::HISTOGRAM:
237  mf.set_type(io::prometheus::client::HISTOGRAM);
238  for (const auto &d : mfbb.data) {
239  d.histogram->read();
240  io::prometheus::client::Metric *m = mf.add_metric();
241  parse_labels(d.histogram->labels(), m);
242  io::prometheus::client::Histogram *h = m->mutable_histogram();
243  h->set_sample_count(d.histogram->sample_count());
244  h->set_sample_sum(d.histogram->sample_sum());
245  for (unsigned int i = 0; i < d.histogram->bucket_count(); ++i) {
246  io::prometheus::client::Bucket *b = h->add_bucket();
247  b->set_cumulative_count(d.histogram->bucket_cumulative_count(i));
248  b->set_upper_bound(d.histogram->bucket_upper_bound(i));
249  }
250  }
251  break;
252 
253  case MetricFamilyInterface::NOT_INITIALIZED:
254  // ignore
255  break;
256  }
257  rv.push_back(std::move(mf));
258  }
259 
260  if (imf_metrics_proctime_) {
261  std::chrono::high_resolution_clock::time_point proc_end =
262  std::chrono::high_resolution_clock::now();
263  const std::chrono::duration<double> proc_diff = proc_end - proc_start;
264  for (int i = 0; i < imf_metrics_proctime_->metric(0).histogram().bucket_size(); ++i) {
265  io::prometheus::client::Histogram *h =
266  imf_metrics_proctime_->mutable_metric(0)->mutable_histogram();
267  if (proc_diff.count() < h->bucket(i).upper_bound()) {
268  io::prometheus::client::Bucket *b = h->mutable_bucket(i);
269  b->set_cumulative_count(b->cumulative_count() + 1);
270  h->set_sample_count(h->sample_count() + 1);
271  h->set_sample_sum(h->sample_sum() + proc_diff.count());
272  break;
273  }
274  }
275  }
276 
277  for (auto &im : internal_metrics_) {
278  rv.push_back(std::move(*im));
279  }
280 
281  return rv;
282 }
283 
284 std::list<io::prometheus::client::MetricFamily>
285 MetricsThread::all_metrics()
286 {
287  std::list<io::prometheus::client::MetricFamily> metrics;
288 
289  for (auto &s : metrics_suppliers_) {
290  metrics.splice(metrics.begin(), std::move(s->metrics()));
291  }
292 
293  return metrics;
294 }
295 
296 void
297 MetricsThread::add_supplier(MetricsSupplier *supplier)
298 {
299  MutexLocker lock(metrics_suppliers_.mutex());
300  auto i = std::find(metrics_suppliers_.begin(), metrics_suppliers_.end(), supplier);
301  if (i == metrics_suppliers_.end()) {
302  metrics_suppliers_.push_back(supplier);
303  }
304 }
305 
306 void
307 MetricsThread::remove_supplier(MetricsSupplier *supplier)
308 {
309  MutexLocker lock(metrics_suppliers_.mutex());
310  auto i = std::find(metrics_suppliers_.begin(), metrics_suppliers_.end(), supplier);
311  if (i != metrics_suppliers_.end()) {
312  metrics_suppliers_.erase(i);
313  }
314 }
315 
317 MetricsThread::metrics_suppliers() const
318 {
319  return metrics_suppliers_;
320 }
321 
322 void
323 MetricsThread::parse_labels(const std::string &labels, io::prometheus::client::Metric *m)
324 {
325  std::vector<std::string> labelv = str_split(labels, ',');
326  for (const std::string &l : labelv) {
327  std::vector<std::string> key_value = str_split(l, '=');
328  if (key_value.size() == 2) {
329  io::prometheus::client::LabelPair *lp = m->add_label();
330  lp->set_name(key_value[0]);
331  lp->set_value(key_value[1]);
332  } else {
333  logger->log_warn(name(), "Invalid label '%s'", l.c_str());
334  }
335  }
336 }
337 
338 void
339 MetricsThread::bb_interface_writer_removed(fawkes::Interface *interface,
340  unsigned int instance_serial) throw()
341 {
342  conditional_close(interface);
343 }
344 
345 void
346 MetricsThread::bb_interface_reader_removed(fawkes::Interface *interface,
347  unsigned int instance_serial) throw()
348 {
349  conditional_close(interface);
350 }
351 
352 void
353 MetricsThread::bb_interface_data_changed(fawkes::Interface *interface) throw()
354 {
355  MetricFamilyInterface *mfi = dynamic_cast<MetricFamilyInterface *>(interface);
356  if (!mfi)
357  return;
358  if (!mfi->has_writer())
359  return;
360 
361  mfi->read();
362  if (mfi->metric_type() == MetricFamilyInterface::NOT_INITIALIZED) {
363  logger->log_warn(name(),
364  "Got data changed event for %s which is not yet initialized",
365  mfi->uid());
366  return;
367  }
368 
369  MutexLocker lock(metric_bbs_.mutex());
370  if (metric_bbs_.find(mfi->id()) == metric_bbs_.end()) {
371  logger->log_warn(name(), "Got data changed event for %s which is not registered", mfi->uid());
372  return;
373  }
374 
375  metric_bbs_[mfi->id()].metric_type = mfi->metric_type();
376  if (conditional_open(mfi->id(), metric_bbs_[mfi->id()])) {
377  bbil_remove_data_interface(mfi);
378  blackboard->update_listener(this);
379  }
380 }
381 
382 bool
383 MetricsThread::conditional_open(const std::string &id, MetricFamilyBB &mfbb)
384 {
385  mfbb.metric_family->read();
386 
387  std::string data_id_pattern = id + "/*";
388 
389  switch (mfbb.metric_type) {
390  case MetricFamilyInterface::COUNTER: {
391  std::list<MetricCounterInterface *> ifaces =
392  blackboard->open_multiple_for_reading<MetricCounterInterface>(data_id_pattern.c_str());
393  if (ifaces.empty())
394  return false;
395  std::transform(ifaces.begin(),
396  ifaces.end(),
397  std::back_inserter(mfbb.data),
398  [](MetricCounterInterface *iface) {
399  return MetricFamilyData{.counter = iface};
400  });
401  } break;
402 
403  case MetricFamilyInterface::GAUGE: {
404  std::list<MetricGaugeInterface *> ifaces =
405  blackboard->open_multiple_for_reading<MetricGaugeInterface>(data_id_pattern.c_str());
406  if (ifaces.empty())
407  return false;
408  std::transform(ifaces.begin(),
409  ifaces.end(),
410  std::back_inserter(mfbb.data),
411  [](MetricGaugeInterface *iface) { return MetricFamilyData{.gauge = iface}; });
412  } break;
413 
414  case MetricFamilyInterface::UNTYPED: {
415  std::list<MetricUntypedInterface *> ifaces =
416  blackboard->open_multiple_for_reading<MetricUntypedInterface>(data_id_pattern.c_str());
417  if (ifaces.empty())
418  return false;
419  std::transform(ifaces.begin(),
420  ifaces.end(),
421  std::back_inserter(mfbb.data),
422  [](MetricUntypedInterface *iface) {
423  return MetricFamilyData{.untyped = iface};
424  });
425  } break;
426 
427  case MetricFamilyInterface::HISTOGRAM: {
428  std::list<MetricHistogramInterface *> ifaces =
429  blackboard->open_multiple_for_reading<MetricHistogramInterface>(data_id_pattern.c_str());
430  if (ifaces.empty())
431  return false;
432  std::transform(ifaces.begin(),
433  ifaces.end(),
434  std::back_inserter(mfbb.data),
435  [](MetricHistogramInterface *iface) {
436  return MetricFamilyData{.histogram = iface};
437  });
438  } break;
439 
440  case MetricFamilyInterface::NOT_INITIALIZED:
441  logger->log_info(name(), "Metric family %s not yet initialized", id.c_str());
442  return false;
443  }
444 
445  logger->log_info(name(), "Initialized metric %s", id.c_str());
446  return true;
447 }
448 
449 void
450 MetricsThread::conditional_close(Interface *interface) throw()
451 {
452  MetricFamilyInterface *mfi = dynamic_cast<MetricFamilyInterface *>(interface);
453  if (!mfi)
454  return;
455 
456  MutexLocker lock(metric_bbs_.mutex());
457 
458  if (metric_bbs_.find(mfi->id()) == metric_bbs_.end()) {
459  logger->log_warn(name(), "Called to close %s whic was not opened", mfi->uid());
460  return;
461  }
462 
463  logger->log_info(name(), "Last on metric family %s, closing", interface->id());
464  auto &mfbb(metric_bbs_[mfi->id()]);
465 
466  switch (mfbb.metric_type) {
467  case MetricFamilyInterface::COUNTER:
468  std::for_each(mfbb.data.begin(), mfbb.data.end(), [this](auto &d) {
469  this->blackboard->close(d.counter);
470  });
471  break;
472 
473  case MetricFamilyInterface::GAUGE:
474  std::for_each(mfbb.data.begin(), mfbb.data.end(), [this](auto &d) {
475  this->blackboard->close(d.gauge);
476  });
477  break;
478  case MetricFamilyInterface::UNTYPED:
479  std::for_each(mfbb.data.begin(), mfbb.data.end(), [this](auto &d) {
480  this->blackboard->close(d.untyped);
481  });
482  break;
483  case MetricFamilyInterface::HISTOGRAM:
484  std::for_each(mfbb.data.begin(), mfbb.data.end(), [this](auto &d) {
485  this->blackboard->close(d.histogram);
486  });
487  break;
488  case MetricFamilyInterface::NOT_INITIALIZED: bbil_remove_data_interface(mfi); break;
489  }
490 
491  metric_bbs_.erase(mfi->id());
492  lock.unlock();
493 
494  std::string uid = interface->uid();
495  try {
496  bbil_remove_reader_interface(interface);
497  bbil_remove_writer_interface(interface);
498  blackboard->update_listener(this);
499  blackboard->close(interface);
500  } catch (Exception &e) {
501  logger->log_error(name(), "Failed to unregister or close %s: %s", uid.c_str(), e.what());
502  }
503 }
fawkes::MultiLogger::log_error
virtual void log_error(const char *component, const char *format,...)
Definition: multi.cpp:241
fawkes::WebUrlManager::add_handler
void add_handler(WebRequest::Method method, const std::string &path, Handler handler)
Add a request processor.
Definition: url_manager.cpp:58
fawkes::LockMap::mutex
RefPtr< Mutex > mutex() const
Get access to the internal mutex.
Definition: lock_map.h:137
fawkes::BlackBoard::register_listener
virtual void register_listener(BlackBoardInterfaceListener *listener, ListenerRegisterFlag flag=BBIL_FLAG_ALL)
Register BB event listener.
Definition: blackboard.cpp:188
fawkes::MetricsSupplier
Definition: metrics_supplier.h:34
fawkes::Logger::log_info
virtual void log_info(const char *component, const char *format,...)=0
fawkes::MutexLocker
Definition: mutex_locker.h:37
fawkes::LockList< MetricsSupplier * >
fawkes::BlackBoardInterfaceListener
Definition: interface_listener.h:45
fawkes::WebUrlManager::remove_handler
void remove_handler(WebRequest::Method method, const std::string &path)
Remove a request processor.
Definition: url_manager.cpp:88
fawkes::AspectProviderAspect
Definition: aspect_provider.h:39
fawkes::BlockedTimingAspect
Definition: blocked_timing.h:54
fawkes::Thread::name
const char * name() const
Definition: thread.h:99
fawkes::BlackBoardInterfaceObserver::bbio_add_observed_create
void bbio_add_observed_create(const char *type_pattern, const char *id_pattern="*")
Add interface creation type to watch list.
Definition: interface_observer.cpp:123
MetricsThread::MetricsThread
MetricsThread()
Constructor.
Definition: metrics_thread.cpp:47
fawkes::MultiLogger::log_warn
virtual void log_warn(const char *component, const char *format,...)
Definition: multi.cpp:220
fawkes::Configuration::get_floats
virtual std::vector< float > get_floats(const char *path)=0
fawkes::LoggingAspect::logger
Logger * logger
Definition: logging.h:50
fawkes::BlackBoard::close
virtual void close(Interface *interface)=0
fawkes
fawkes::Logger::log_warn
virtual void log_warn(const char *component, const char *format,...)=0
fawkes::BlackBoard::register_observer
virtual void register_observer(BlackBoardInterfaceObserver *observer)
Register BB interface observer.
Definition: blackboard.cpp:228
fawkes::Interface
Definition: interface.h:77
fawkes::ConfigurableAspect::config
Configuration * config
Definition: configurable.h:50
MetricsRequestProcessor
Definition: metrics_processor.h:33
fawkes::Exception::what_no_backtrace
virtual const char * what_no_backtrace() const
Get primary string (does not implicitly print the back trace).
Definition: exception.cpp:662
fawkes::Exception::what
virtual const char * what() const
Get primary string.
Definition: exception.cpp:638
fawkes::Thread
Definition: thread.h:44
fawkes::WebviewAspect::webview_url_manager
WebUrlManager * webview_url_manager
Webview request processor manager.
Definition: webview.h:53
MetricsThread::loop
virtual void loop()
Code to execute in the thread.
Definition: metrics_thread.cpp:142
fawkes::BlackBoardAspect::blackboard
BlackBoard * blackboard
Definition: blackboard.h:47
fawkes::BlackBoard::open_for_reading
virtual Interface * open_for_reading(const char *interface_type, const char *identifier, const char *owner=NULL)=0
fawkes::BlackBoard::open_multiple_for_reading
virtual std::list< Interface * > open_multiple_for_reading(const char *type_pattern, const char *id_pattern="*", const char *owner=NULL)=0
MetricsThread::finalize
virtual void finalize()
Finalize the thread.
Definition: metrics_thread.cpp:135
MetricsThread::init
virtual void init()
Initialize the thread.
Definition: metrics_thread.cpp:61
MetricsThread::~MetricsThread
virtual ~MetricsThread()
Destructor.
Definition: metrics_thread.cpp:56
fawkes::BlackBoardInterfaceListener::bbil_add_data_interface
void bbil_add_data_interface(Interface *interface)
Add an interface to the data modification watch list.
Definition: interface_listener.cpp:236
fawkes::BlackBoard::update_listener
virtual void update_listener(BlackBoardInterfaceListener *listener, ListenerRegisterFlag flag=BBIL_FLAG_ALL)
Update BB event listener.
Definition: blackboard.cpp:200
MetricsRequestProcessor::process_request
fawkes::WebReply * process_request(const fawkes::WebRequest *request)
Process request.
Definition: metrics_processor.cpp:70
fawkes::MetricsAspectIniFin::set_manager
void set_manager(MetricsManager *supplier_mgr)
Set Metrics environment manager.
Definition: metrics_inifin.cpp:84
fawkes::MultiLogger::log_info
virtual void log_info(const char *component, const char *format,...)
Definition: multi.cpp:199
fawkes::Exception
Definition: exception.h:39