bes  Updated for version 3.20.5
CurlHandlePool.cc
1 // -*- mode: c++; c-basic-offset:4 -*-
2 
3 // This file is part of the BES
4 
5 // Copyright (c) 2018 OPeNDAP, Inc.
6 // Author: James Gallagher<jgallagher@opendap.org>
7 //
8 // This library is free software; you can redistribute it and/or
9 // modify it under the terms of the GNU Lesser General Public
10 // License as published by the Free Software Foundation; either
11 // version 2.1 of the License, or (at your option) any later version.
12 //
13 // This library is distributed in the hope that it will be useful,
14 // but WITHOUT ANY WARRANTY; without even the implied warranty of
15 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 // Lesser General Public License for more details.
17 //
18 // You should have received a copy of the GNU Lesser General Public
19 // License along with this library; if not, write to the Free Software
20 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
21 //
22 // You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.
23 
24 #include "config.h"
25 
26 #include <string>
27 #include <sstream>
28 #include <iomanip>
29 
30 #include <cstring>
31 #include <unistd.h>
32 
33 #include <curl/curl.h>
34 
35 #if HAVE_CURL_MULTI_H
36 #include <curl/multi.h>
37 #endif
38 
39 #include "util.h" // long_to_string()
40 
41 #include "BESLog.h"
42 #include "BESDebug.h"
43 #include "BESInternalError.h"
44 #include "BESForbiddenError.h"
45 #include "WhiteList.h"
46 
47 #include "DmrppRequestHandler.h"
48 #include "DmrppCommon.h"
49 #include "CurlHandlePool.h"
50 #include "Chunk.h"
51 
52 #define KEEP_ALIVE 1 // Reuse libcurl easy handles (1) or not (0).
53 
54 #define CURL_VERBOSE 0
55 
/// Upper bound, in milliseconds, passed to each curl_multi_wait() call (30 seconds).
static const int MAX_WAIT_MSECS = 30 * 1000;

/// Number of attempts made for one HTTP/S transfer before giving up (Amazon's suggestion).
static const unsigned int retry_limit = 10;

/// Delay before the first retry. The value is handed to usleep(), so it is in
/// microseconds: 1000 us is one millisecond. The delay doubles after each retry.
static const unsigned int initial_retry_time = 1000;

/// NOTE(review): presumably a BESDEBUG context name; not referenced in the code shown here.
static const std::string dmrpp_3 = "dmrpp:3";
60 
61 using namespace dmrpp;
62 using namespace std;
63 using namespace bes;
64 
65 Lock::Lock(pthread_mutex_t &lock) : m_mutex(lock)
66  {
67  int status = pthread_mutex_lock(&m_mutex);
68  if (status != 0) throw BESInternalError("Could not lock in CurlHandlePool", __FILE__, __LINE__);
69  }
70 
71 Lock::~Lock()
72  {
73  int status = pthread_mutex_unlock(&m_mutex);
74  if (status != 0)
75  ERROR("Could not unlock in CurlHandlePool");
76  }
77 
81 static string
82 curl_error_msg(CURLcode res, char *errbuf)
83 {
84  ostringstream oss;
85  size_t len = strlen(errbuf);
86  if (len) {
87  oss << errbuf;
88  oss << " (code: " << (int)res << ")";
89  }
90  else {
91  oss << curl_easy_strerror(res) << "(result: " << res << ")";
92  }
93 
94  return oss.str();
95 }
96 
102 #if 0
/**
 * @brief Format a block of bytes as a hex dump.
 *
 * The first line is "<text>, <size as 10 decimal digits> bytes (0x<size as 8
 * hex digits>)". Each following line covers up to 16 bytes: a 4-digit hex
 * offset, the bytes as two-digit hex values, then the same bytes as
 * characters with anything outside 0x20..0x7f shown as '.'. A short final row
 * is padded so the character column stays aligned.
 *
 * @param text Label written at the start of the first line.
 * @param ptr The bytes to dump; only read when size is greater than zero.
 * @param size Number of bytes at ptr.
 * @return The formatted dump. Every line, including the last, ends in '\n'.
 */
static
std::string dump(const char *text, unsigned char *ptr, size_t size)
{
    const size_t width = 0x10;

    std::ostringstream oss;
    oss << std::setfill('0');

    // Mirrors "%s, %10.10ld bytes (0x%8.8lx)\n"
    oss << text << ", " << std::dec << std::setw(10) << (long)size
        << " bytes (0x" << std::hex << std::setw(8) << (long)size << ")\n";

    for (size_t i = 0; i < size; i += width) {
        // Mirrors "%4.4lx: "
        oss << std::hex << std::setw(4) << (unsigned long)i << ": ";

        // Hex on the left. The cast matters: streaming an unsigned char
        // prints the character itself, not its numeric value.
        for (size_t c = 0; c < width; ++c) {
            if (i + c < size)
                oss << std::setw(2) << static_cast<unsigned int>(ptr[i + c]) << ' ';
            else
                oss << "   ";
        }

        // Characters on the right
        for (size_t c = 0; c < width && i + c < size; ++c) {
            const unsigned char b = ptr[i + c];
            oss << static_cast<char>((b >= 0x20 && b < 0x80) ? b : '.');
        }

        oss << '\n';
    }

    return oss.str();
}
142 #endif
143 
144 #if CURL_VERBOSE
145 
150 static
151 int curl_trace(CURL */*handle*/, curl_infotype type, char *data, size_t /*size*/, void */*userp*/)
152 {
153  switch (type) {
154  // print info
155  case CURLINFO_TEXT: {
156  string text = data;
157  size_t pos;
158  while((pos = text.find('\n')) != string::npos)
159  text = text.substr(0, pos);
160  LOG("libcurl == Info: " << text << endl);
161  }
162 
163  // print nothing for these
164  case CURLINFO_DATA_IN:
165  case CURLINFO_SSL_DATA_IN:
166 
167  case CURLINFO_DATA_OUT:
168  case CURLINFO_SSL_DATA_OUT:
169 
170  case CURLINFO_HEADER_IN:
171  case CURLINFO_HEADER_OUT:
172  default: /* in case a new one is introduced to shock us */
173  return 0;
174  }
175 }
176 #endif
177 
179 {
180  d_handle = curl_easy_init();
181  if (!d_handle) throw BESInternalError("Could not allocate CURL handle", __FILE__, __LINE__);
182 
183  CURLcode res;
184 
185  if (CURLE_OK != (res = curl_easy_setopt(d_handle, CURLOPT_ERRORBUFFER, d_errbuf)))
186  throw BESInternalError(string("CURL Error: ").append(curl_easy_strerror(res)), __FILE__, __LINE__);
187 
188 #if CURL_VERBOSE
189  // Information is output only when the log is in verbose mode and the code is
190  // built using --enable-developer
191  if (CURLE_OK != (res = curl_easy_setopt(d_handle, CURLOPT_DEBUGFUNCTION, curl_trace)))
192  throw BESInternalError(string("CURL Error: ").append(curl_error_msg(res, d_errbuf)), __FILE__, __LINE__);
193  // Many tests fail with this option, but it's still useful to see how connections
194  // are treated. jhrg 10/2/18
195  if (CURLE_OK != (res = curl_easy_setopt(d_handle, CURLOPT_VERBOSE, 1L)))
196  throw BESInternalError(string("CURL Error: ").append(curl_error_msg(res, d_errbuf)), __FILE__, __LINE__);
197 #endif
198 
199  // Pass all data to the 'write_data' function
200  if (CURLE_OK != (res = curl_easy_setopt(d_handle, CURLOPT_WRITEFUNCTION, chunk_write_data)))
201  throw BESInternalError(string("CURL Error: ").append(curl_error_msg(res, d_errbuf)), __FILE__, __LINE__);
202 
203 #ifdef CURLOPT_TCP_KEEPALIVE
204  /* enable TCP keep-alive for this transfer */
205  if (CURLE_OK != (res = curl_easy_setopt(d_handle, CURLOPT_TCP_KEEPALIVE, 1L)))
206  throw BESInternalError(string("CURL Error: ").append(curl_error_msg(res)), __FILE__, __LINE__);
207 #endif
208 
209 #ifdef CURLOPT_TCP_KEEPIDLE
210  /* keep-alive idle time to 120 seconds */
211  if (CURLE_OK != (res = curl_easy_setopt(d_handle, CURLOPT_TCP_KEEPIDLE, 120L)))
212  throw BESInternalError(string("CURL Error: ").append(curl_error_msg(res)), __FILE__, __LINE__);
213 #endif
214 
215 #ifdef CURLOPT_TCP_KEEPINTVL
216  /* interval time between keep-alive probes: 120 seconds */
217  if (CURLE_OK != (res = curl_easy_setopt(d_handle, CURLOPT_TCP_KEEPINTVL, 120L)))
218  throw BESInternalError(string("CURL Error: ").append(curl_error_msg(res)), __FILE__, __LINE__)
219 #endif
220 
221  d_in_use = false;
222  d_url = "";
223  d_chunk = 0;
224 }
225 
226 dmrpp_easy_handle::~dmrpp_easy_handle()
227 {
228  curl_easy_cleanup(d_handle);
229 }
230 
239 static bool evaluate_curl_response(CURL* eh)
240 {
241  long http_code = 0;
242  CURLcode res = curl_easy_getinfo(eh, CURLINFO_RESPONSE_CODE, &http_code);
243  if (CURLE_OK != res) {
244  throw BESInternalError(string("Error getting HTTP response code: ").append(curl_error_msg(res, (char*)"")), __FILE__, __LINE__);
245  }
246 
247  // Newer Apache servers return 206 for range requests. jhrg 8/8/18
248  switch (http_code) {
249  case 200: // OK
250  case 206: // Partial content - this is to be expected since we use range gets
251  // cases 201-205 are things we should probably reject, unless we add more
252  // comprehensive HTTP/S processing here. jhrg 8/8/18
253  return true;
254 
255  case 500: // Internal server error
256  case 503: // Service Unavailable
257  case 504: // Gateway Timeout
258  return false;
259 
260  default: {
261  ostringstream oss;
262  oss << "HTTP status error: Expected an OK status, but got: ";
263  oss << http_code;
264  throw BESInternalError(oss.str(), __FILE__, __LINE__);
265  }
266  }
267 }
268 
275 {
276  // Treat HTTP/S requests specially; retry some kinds of failures.
277  if (d_url.find("https://") == 0 || d_url.find("http://") == 0) {
278  unsigned int tries = 0;
279  bool success = true;
280  unsigned int retry_time = initial_retry_time;
281 
282  // Perform the request
283  do {
284  CURLcode curl_code = curl_easy_perform(d_handle);
285  ++tries;
286 
287  if (CURLE_OK != curl_code) {
288  throw BESInternalError(string("Data transfer error: ").append(curl_error_msg(curl_code, d_errbuf)),
289  __FILE__, __LINE__);
290  }
291 
292  success = evaluate_curl_response(d_handle);
293 
294  if (!success) {
295  if (tries == retry_limit) {
296  throw BESInternalError(
297  string("Data transfer error: Number of re-tries to S3 exceeded: ").append(
298  curl_error_msg(curl_code, d_errbuf)), __FILE__, __LINE__);
299  }
300  else {
301  LOG("HTTP transfer 500 error, will retry (trial " << tries << " for: " << d_url << ").");
302  usleep(retry_time);
303  retry_time *= 2;
304  }
305  }
306  } while (!success);
307  }
308  else {
309  CURLcode curl_code = curl_easy_perform(d_handle);
310  if (CURLE_OK != curl_code) {
311  throw BESInternalError(string("Data transfer error: ").append(curl_error_msg(curl_code, d_errbuf)),
312  __FILE__, __LINE__);
313  }
314  }
315 
316  d_chunk->set_is_read(true);
317 }
318 
326 struct dmrpp_multi_handle::multi_handle {
327 #if HAVE_CURL_MULTI_API
328  CURLM *curlm;
329 #else
330  std::vector<dmrpp_easy_handle *> ehandles;
331 #endif
332 };
333 
334 dmrpp_multi_handle::dmrpp_multi_handle()
335 {
336  p_impl = new multi_handle;
337 #if HAVE_CURL_MULTI_API
338  p_impl->curlm = curl_multi_init();
339 #endif
340 }
341 
342 dmrpp_multi_handle::~dmrpp_multi_handle()
343 {
344 #if HAVE_CURL_MULTI_API
345  curl_multi_cleanup(p_impl->curlm);
346 #endif
347  delete p_impl;
348 }
349 
359 {
360 #if HAVE_CURL_MULTI_API
361  curl_multi_add_handle(p_impl->curlm, eh->d_handle);
362 #else
363  p_impl->ehandles.push_back(eh);
364 #endif
365 }
366 
367 // This is only used if we don't have the Multi API and have to use pthreads.
368 // jhrg 8/27/18
369 #if !HAVE_CURL_MULTI_API
370 static void *easy_handle_read_data(void *handle)
371 {
372  dmrpp_easy_handle *eh = reinterpret_cast<dmrpp_easy_handle*>(handle);
373 
374  try {
375  eh->read_data();
376  pthread_exit(NULL);
377  }
378  catch (BESError &e) {
379  string *error = new string(e.get_verbose_message());
380  pthread_exit(error);
381  }
382 }
383 #endif
384 
398 {
399 #if HAVE_CURL_MULTI_API
400  // Use the libcurl Multi API here. Alternate version follows...
401 
402  int still_running = 0;
403  CURLMcode mres = curl_multi_perform(p_impl->curlm, &still_running);
404  if (mres != CURLM_OK)
405  throw BESInternalError(string("Could not initiate data read: ").append(curl_multi_strerror(mres)), __FILE__,
406  __LINE__);
407 
408  do {
409  int numfds = 0;
410  mres = curl_multi_wait(p_impl->curlm, NULL, 0, MAX_WAIT_MSECS, &numfds);
411  if (mres != CURLM_OK)
412  throw BESInternalError(string("Could not wait on data read: ").append(curl_multi_strerror(mres)), __FILE__,
413  __LINE__);
414 
415  mres = curl_multi_perform(p_impl->curlm, &still_running);
416  if (mres != CURLM_OK)
417  throw BESInternalError(string("Could not iterate data read: ").append(curl_multi_strerror(mres)), __FILE__,
418  __LINE__);
419 
420  } while (still_running);
421 
422  CURLMsg *msg = 0;
423  int msgs_left = 0;
424  while ((msg = curl_multi_info_read(p_impl->curlm, &msgs_left))) {
425  if (msg->msg == CURLMSG_DONE) {
426  CURL *eh = msg->easy_handle;
427 
428  CURLcode res = msg->data.result;
429  if (res != CURLE_OK)
430  throw BESInternalError(string("Error HTTP: ").append(curl_easy_strerror(res)), __FILE__, __LINE__);
431 
432  // Note: 'eh' is the easy handle returned by culr_multi_info_read(),
433  // but in it's private field is our dmrpp_easy_handle object. We need
434  // both to mark this data read operation as complete.
436  res = curl_easy_getinfo(eh, CURLINFO_PRIVATE, &dmrpp_easy_handle);
437  if (res != CURLE_OK)
438  throw BESInternalError(string("Could not access easy handle: ").append(curl_easy_strerror(res)), __FILE__, __LINE__);
439 
440  // This code has to work with both http/s: and file: protocols. Here we check the
441  // HTTP status code. If the protocol is not HTTP, we assume since msg->data.result
442  // returned CURLE_OK, that the transfer worked. jhrg 5/1/18
443  if (dmrpp_easy_handle->d_url.find("http://") == 0 || dmrpp_easy_handle->d_url.find("https://") == 0) {
444  evaluate_curl_response(eh);
445  }
446 
447  // If we are here, the request was successful.
448  dmrpp_easy_handle->d_chunk->set_is_read(true); // Set the is_read() property for chunk here.
449 
450  // NB: Remove the handle from the CURLM* and _then_ call release_handle()
451  // so that the KEEP_ALIVE 0 (off) works. Calling delete on the dmrpp_easy_handle
452  // will invalidate 'eh', so call that after removing 'eh'.
453  mres = curl_multi_remove_handle(p_impl->curlm, eh);
454  if (mres != CURLM_OK)
455  throw BESInternalError(string("Could not remove libcurl handle: ").append(curl_multi_strerror(mres)), __FILE__, __LINE__);
456 
457  DmrppRequestHandler::curl_handle_pool->release_handle(dmrpp_easy_handle);
458  }
459  else { // != CURLMSG_DONE
460  throw BESInternalError("Error getting HTTP or FILE responses.", __FILE__, __LINE__);
461  }
462  }
463 #else
464  // Start the processing pipelines using pthreads - there is no Multi API
465 
466  pthread_t threads[p_impl->ehandles.size()];
467  unsigned int num_threads = 0;
468  try {
469  for (unsigned int i = 0; i < p_impl->ehandles.size(); ++i) {
470  int status = pthread_create(&threads[i], NULL, easy_handle_read_data, (void*) p_impl->ehandles[i]);
471  if (status == 0) {
472  ++num_threads;
473  }
474  else {
475  ostringstream oss("Could not start process_one_chunk_unconstrained thread for chunk ", std::ios::ate);
476  oss << i << ": " << strerror(status);
477  throw BESInternalError(oss.str(), __FILE__, __LINE__);
478  }
479  }
480 
481  // Now join the child threads.
482  for (unsigned int i = 0; i < num_threads; ++i) {
483  string *error;
484  int status = pthread_join(threads[i], (void**) &error);
485  if (status != 0) {
486  ostringstream oss("Could not join process_one_chunk_unconstrained thread for chunk ", std::ios::ate);
487  oss << i << ": " << strerror(status);
488  throw BESInternalError(oss.str(), __FILE__, __LINE__);
489  }
490  else if (error != 0) {
491  BESInternalError e(*error, __FILE__, __LINE__);
492  delete error;
493  throw e;
494  }
495  }
496  }
497  catch(...) {
498  join_threads(threads, num_threads);
499  throw;
500  }
501 
502  // Now remove the easy_handles, mimicking the behavior when using the real Multi API
503  p_impl->ehandles.clear();
504 #endif
505 }
506 
507 CurlHandlePool::CurlHandlePool() : d_multi_handle(0)
508 {
509  d_max_easy_handles = DmrppRequestHandler::d_max_parallel_transfers;
510  d_multi_handle = new dmrpp_multi_handle();
511 
512  for (unsigned int i = 0; i < d_max_easy_handles; ++i) {
513  d_easy_handles.push_back(new dmrpp_easy_handle());
514  }
515 
516  if (pthread_mutex_init(&d_get_easy_handle_mutex, 0) != 0)
517  throw BESInternalError("Could not initialize mutex in CurlHandlePool", __FILE__, __LINE__);
518 }
519 
536 {
537  Lock lock(d_get_easy_handle_mutex); // RAII
538 
539  dmrpp_easy_handle *handle = 0;
540  for (vector<dmrpp_easy_handle *>::iterator i = d_easy_handles.begin(), e = d_easy_handles.end(); i != e; ++i) {
541  if (!(*i)->d_in_use)
542  handle = *i;
543  }
544 
545  if (handle) {
546  // Once here, d_easy_handle holds a CURL* we can use.
547  handle->d_in_use = true;
548  handle->d_url = chunk->get_data_url();
549 
550  // Here we check to make sure that the we are only going to
551  // access an approved location with this easy_handle
552  if(!WhiteList::get_white_list()->is_white_listed(handle->d_url)){
553  string msg = "ERROR!! The chunk url " + handle->d_url + " does not match any white-list rule. ";
554  throw BESForbiddenError(msg ,__FILE__,__LINE__);
555  }
556 
557  handle->d_chunk = chunk;
558 
559  CURLcode res = curl_easy_setopt(handle->d_handle, CURLOPT_URL, chunk->get_data_url().c_str());
560  if (res != CURLE_OK) throw BESInternalError(string("HTTP Error setting URL: ").append(curl_error_msg(res, handle->d_errbuf)), __FILE__, __LINE__);
561 
562  // get the offset to offset + size bytes
563  if (CURLE_OK != (res = curl_easy_setopt(handle->d_handle, CURLOPT_RANGE, chunk->get_curl_range_arg_string().c_str())))
564  throw BESInternalError(string("HTTP Error setting Range: ").append(curl_error_msg(res, handle->d_errbuf)), __FILE__,
565  __LINE__);
566 
567  // Pass this to write_data as the fourth argument
568  if (CURLE_OK != (res = curl_easy_setopt(handle->d_handle, CURLOPT_WRITEDATA, reinterpret_cast<void*>(chunk))))
569  throw BESInternalError(string("CURL Error setting chunk as data buffer: ").append(curl_error_msg(res, handle->d_errbuf)),
570  __FILE__, __LINE__);
571 
572  // store the easy_handle so that we can call release_handle in multi_handle::read_data()
573  if (CURLE_OK != (res = curl_easy_setopt(handle->d_handle, CURLOPT_PRIVATE, reinterpret_cast<void*>(handle))))
574  throw BESInternalError(string("CURL Error setting easy_handle as private data: ").append(curl_error_msg(res, handle->d_errbuf)), __FILE__,
575  __LINE__);
576  }
577 
578  return handle;
579 }
580 
588 {
589  // In get_easy_handle, it's possible that d_in_use could be false and d_chunk
590  // could not be set to 0 (because a separate thread could be running these
591  // methods). In that case, the thread running get_easy_handle could set d_chunk,
592  // and then this thread could clear it (... unlikely, but an optimizing compiler is
593  // free to reorder statements so long as they don't alter the function's behavior).
594  // Timing tests indicate this lock does not cost anything that can be measured.
595  // jhrg 8/21/18
596  Lock lock(d_get_easy_handle_mutex);
597 
598 #if KEEP_ALIVE
599  handle->d_url = "";
600  handle->d_chunk = 0;
601  handle->d_in_use = false;
602 #else
603  // This is to test the effect of libcurl Keep Alive support
604  // Find the handle; erase from the vector; delete; allocate a new handle and push it back on
605  for (std::vector<dmrpp_easy_handle *>::iterator i = d_easy_handles.begin(), e = d_easy_handles.end(); i != e; ++i) {
606  if (*i == handle) {
607  BESDEBUG("dmrpp:5", "Found a handle match for the " << i - d_easy_handles.begin() << "th easy handle." << endl);
608  delete handle;
609  *i = new dmrpp_easy_handle();
610  break;
611  }
612  }
613 #endif
614 }
dmrpp::Chunk
Definition: Chunk.h:43
dmrpp::dmrpp_easy_handle
Bundle a libcurl easy handle to other information.
Definition: CurlHandlePool.h:61
dmrpp::dmrpp_easy_handle::read_data
void read_data()
This is the read_data() method for serial transfers.
Definition: CurlHandlePool.cc:274
dmrpp::dmrpp_easy_handle::dmrpp_easy_handle
dmrpp_easy_handle()
Allocate a libcurl easy handle and configure it for reading chunk data.
Definition: CurlHandlePool.cc:178
dmrpp::dmrpp_multi_handle::read_data
void read_data()
The read_data() method for parallel transfers.
Definition: CurlHandlePool.cc:397
dmrpp::dmrpp_multi_handle
Encapsulate a libcurl multi handle.
Definition: CurlHandlePool.h:82
BESForbiddenError
error thrown if the BES is not allowed to access the resource requested
Definition: BESForbiddenError.h:40
BESInternalError
exception thrown if internal error encountered
Definition: BESInternalError.h:43
dmrpp::dmrpp_multi_handle::add_easy_handle
void add_easy_handle(dmrpp_easy_handle *eh)
Add an Easy Handle to a Multi Handle object.
Definition: CurlHandlePool.cc:358
dmrpp::Chunk::get_curl_range_arg_string
virtual std::string get_curl_range_arg_string()
Returns a curl range argument. libcurl requires a string argument for range-get activities,...
Definition: Chunk.cc:320
dmrpp::Chunk::get_data_url
virtual std::string get_data_url() const
Get the data url string for this Chunk's data block.
Definition: Chunk.h:177
dmrpp::CurlHandlePool::get_easy_handle
dmrpp_easy_handle * get_easy_handle(Chunk *chunk)
Definition: CurlHandlePool.cc:535
dmrpp::Lock
Definition: CurlHandlePool.h:41
BESError
Abstract exception class for the BES with basic string message.
Definition: BESError.h:58
dmrpp::CurlHandlePool::release_handle
void release_handle(dmrpp_easy_handle *h)
Definition: CurlHandlePool.cc:587