33 #include <curl/curl.h>
36 #include <curl/multi.h>
43 #include "BESInternalError.h"
44 #include "BESForbiddenError.h"
45 #include "WhiteList.h"
47 #include "DmrppRequestHandler.h"
48 #include "DmrppCommon.h"
49 #include "CurlHandlePool.h"
// Compile-time switches and tuning constants for this translation unit.
52 #define KEEP_ALIVE 1 // Reuse libcurl easy handles (1) or not (0).
// Presumably set to 1 to enable libcurl's verbose/debug tracing; the guard that
// tests this macro is not visible in this excerpt -- TODO confirm.
54 #define CURL_VERBOSE 0
// Timeout passed to curl_multi_wait() in the multi-handle read loop (30*1000,
// named as milliseconds).
56 static const int MAX_WAIT_MSECS = 30*1000;
// Compared against the try counter in the HTTP retry code; reaching it yields the
// "Number of re-tries to S3 exceeded" error.
57 static const unsigned int retry_limit = 10;
// Starting value of retry_time in the HTTP retry code. Units are not shown in this
// excerpt -- TODO confirm against the sleep call that consumes it.
58 static const unsigned int initial_retry_time = 1000;
// Looks like a BESDEBUG context key (another call in this file uses "dmrpp:5");
// its uses are not visible here -- TODO confirm.
59 static const string dmrpp_3 =
"dmrpp:3";
61 using namespace dmrpp;
// Lock the given pthread mutex.
// @param lock Mutex to acquire; bound to the member m_mutex.
// @throws BESInternalError if pthread_mutex_lock() returns a non-zero status.
65 Lock::Lock(pthread_mutex_t &lock) : m_mutex(lock)
67 int status = pthread_mutex_lock(&m_mutex);
68 if (status != 0)
throw BESInternalError(
"Could not lock in CurlHandlePool", __FILE__, __LINE__);
// Fragment of the unlock path; the enclosing signature is not visible in this
// excerpt (presumably Lock::~Lock -- TODO confirm). Unlocks m_mutex and reports a
// failure through ERROR() instead of throwing; the status test that guards the
// ERROR() call is not shown here.
73 int status = pthread_mutex_unlock(&m_mutex);
75 ERROR(
"Could not unlock in CurlHandlePool");
// Build a human-readable message for a libcurl error code.
// @param res    The CURLcode reported by libcurl.
// @param errbuf NUL-terminated error text buffer. Its length is taken with
//               strlen(), so it must not be NULL. Callers in this file pass the
//               buffer registered via CURLOPT_ERRORBUFFER or an empty string.
// @return Text that callers append to a std::string (the return-type line is not
//         visible in this excerpt).
82 curl_error_msg(CURLcode res,
char *errbuf)
85 size_t len = strlen(errbuf);
// First message form: the numeric code in parentheses. Presumably emitted after
// errbuf's own text when len is non-zero -- the branch is not visible here.
88 oss <<
" (code: " << (int)res <<
")";
// Second message form: libcurl's generic description of res plus the numeric
// result. Presumably the fallback when errbuf is empty -- TODO confirm.
91 oss << curl_easy_strerror(res) <<
"(result: " << res <<
")";
// Format a block of bytes as a dump: a header line, then rows of `width` bytes
// each followed by a printable-character column.
// @param text Label written at the start of the header line.
// @param ptr  Bytes to dump.
// @param size Number of bytes at ptr.
// @return The formatted text.
104 string dump(
const char *text,
unsigned char *ptr,
size_t size)
// 16 bytes per output row.
108 unsigned int width=0x10;
// Header: label, then size in decimal (field width 10) immediately followed by
// size again after switching the stream to base 16. std::setbase is sticky, so
// integers written later (the row offsets below) are also printed in hex.
111 oss << text <<
", " << std::setw(10) << (long)size << std::setbase(16) << (long)size << endl;
113 for(i=0; i<size; i+= width) {
// Row offset from the start of the buffer.
114 oss << std::setw(4) << (long)i;
// NOTE(review): ptr[i+c] is an unsigned char, so operator<< inserts it as a raw
// character rather than as two hex digits -- confirm this is intended. This loop's
// condition has no i+c < size test; a bounds check may sit on a line that is not
// visible in this excerpt -- TODO confirm.
118 for(c = 0; c < width; c++) {
120 oss << std::setw(2) << ptr[i+c];
// Character column: bytes in [0x20, 0x80) are shown as-is, everything else as '.'.
130 for(c = 0; (c < width) && (i+c < size); c++) {
131 char x = (ptr[i+c] >= 0x20 && ptr[i+c] < 0x80) ? ptr[i+c] :
'.';
133 oss << std::setw(1) << x;
// libcurl debug callback; this file registers it with CURLOPT_DEBUGFUNCTION.
// The easy handle, the data size and the user pointer are unnamed parameters and
// therefore unused.
// @param type Kind of debug record libcurl is reporting.
// @param data Payload of the record.
// @return An int status; the returned value is not visible in this excerpt.
151 int curl_trace(CURL *, curl_infotype type,
char *data,
size_t ,
void *)
155 case CURLINFO_TEXT: {
// Cut the text at its first newline (after which no newline remains), then log it.
158 while((pos = text.find(
'\n')) != string::npos)
159 text = text.substr(0, pos);
160 LOG(
"libcurl == Info: " << text << endl);
// The handling of the remaining record types is not visible in this excerpt.
164 case CURLINFO_DATA_IN:
165 case CURLINFO_SSL_DATA_IN:
167 case CURLINFO_DATA_OUT:
168 case CURLINFO_SSL_DATA_OUT:
170 case CURLINFO_HEADER_IN:
171 case CURLINFO_HEADER_OUT:
// Fragment that creates and configures one libcurl easy handle. The enclosing
// signature is not visible in this excerpt (presumably the dmrpp_easy_handle
// constructor -- TODO confirm). Each failed step throws BESInternalError.
180 d_handle = curl_easy_init();
181 if (!d_handle)
throw BESInternalError(
"Could not allocate CURL handle", __FILE__, __LINE__);
// Register d_errbuf to receive libcurl's detailed error text. This check formats
// its message with curl_easy_strerror(); the later ones use curl_error_msg() with
// d_errbuf.
185 if (CURLE_OK != (res = curl_easy_setopt(d_handle, CURLOPT_ERRORBUFFER, d_errbuf)))
186 throw BESInternalError(
string(
"CURL Error: ").append(curl_easy_strerror(res)), __FILE__, __LINE__);
// Route libcurl debug output through curl_trace and switch verbose mode on.
// NOTE(review): presumably compiled only when CURL_VERBOSE is set; the guard is
// not visible in this excerpt -- TODO confirm.
191 if (CURLE_OK != (res = curl_easy_setopt(d_handle, CURLOPT_DEBUGFUNCTION, curl_trace)))
192 throw BESInternalError(
string(
"CURL Error: ").append(curl_error_msg(res, d_errbuf)), __FILE__, __LINE__);
195 if (CURLE_OK != (res = curl_easy_setopt(d_handle, CURLOPT_VERBOSE, 1L)))
196 throw BESInternalError(
string(
"CURL Error: ").append(curl_error_msg(res, d_errbuf)), __FILE__, __LINE__);
// Received data is handed to chunk_write_data (defined outside this excerpt).
200 if (CURLE_OK != (res = curl_easy_setopt(d_handle, CURLOPT_WRITEFUNCTION, chunk_write_data)))
201 throw BESInternalError(
string(
"CURL Error: ").append(curl_error_msg(res, d_errbuf)), __FILE__, __LINE__);
// TCP keep-alive: enable it, with 120 for both the idle time and the probe interval.
// NOTE(review): the three throws below call curl_error_msg() with one argument,
// although it is declared earlier in this file with two (CURLcode, char*), and the
// last throw has no terminating ';'. That can only build if these #ifdef branches
// are never compiled. libcurl declares CURLOPT_* names as enum values rather than
// preprocessor macros, in which case #ifdef on them is always false and keep-alive
// is never configured -- TODO confirm against the targeted libcurl, then fix the
// guard (e.g. a LIBCURL_VERSION_NUM test) and the calls.
203 #ifdef CURLOPT_TCP_KEEPALIVE
205 if (CURLE_OK != (res = curl_easy_setopt(d_handle, CURLOPT_TCP_KEEPALIVE, 1L)))
206 throw BESInternalError(
string(
"CURL Error: ").append(curl_error_msg(res)), __FILE__, __LINE__);
209 #ifdef CURLOPT_TCP_KEEPIDLE
211 if (CURLE_OK != (res = curl_easy_setopt(d_handle, CURLOPT_TCP_KEEPIDLE, 120L)))
212 throw BESInternalError(
string(
"CURL Error: ").append(curl_error_msg(res)), __FILE__, __LINE__);
215 #ifdef CURLOPT_TCP_KEEPINTVL
217 if (CURLE_OK != (res = curl_easy_setopt(d_handle, CURLOPT_TCP_KEEPINTVL, 120L)))
218 throw BESInternalError(
string(
"CURL Error: ").append(curl_error_msg(res)), __FILE__, __LINE__)
// Release the libcurl easy handle held in d_handle.
226 dmrpp_easy_handle::~dmrpp_easy_handle()
228 curl_easy_cleanup(d_handle);
// Inspect the HTTP response code of a finished transfer.
// @param eh The libcurl easy handle whose transfer has completed.
// @return A bool that the single-handle read path stores as its `success` flag.
//         Which status codes map to true or false is not visible in this excerpt;
//         the retry log message elsewhere mentions a "500 error" -- TODO confirm.
// @throws BESInternalError if the response code cannot be read. An unexpected
//         status presumably also throws, using the "HTTP status error" text below;
//         that throw is not visible here.
239 static bool evaluate_curl_response(CURL* eh)
242 CURLcode res = curl_easy_getinfo(eh, CURLINFO_RESPONSE_CODE, &http_code);
243 if (CURLE_OK != res) {
// No per-handle error buffer is available here, so an empty string is passed.
244 throw BESInternalError(
string(
"Error getting HTTP response code: ").append(curl_error_msg(res, (
char*)
"")), __FILE__, __LINE__);
262 oss <<
"HTTP status error: Expected an OK status, but got: ";
// Fragment of the single-handle transfer. The enclosing signature is not visible
// in this excerpt (presumably dmrpp_easy_handle::read_data -- TODO confirm).
// URLs that begin with "https://" or "http://" take a path that checks the HTTP
// response and may retry; other URLs presumably take the second, plain
// curl_easy_perform() further down.
277 if (d_url.find(
"https://") == 0 || d_url.find(
"http://") == 0) {
278 unsigned int tries = 0;
// Retry delay starts at initial_retry_time; how it grows is not visible here.
280 unsigned int retry_time = initial_retry_time;
284 CURLcode curl_code = curl_easy_perform(d_handle);
// A transport-level failure is fatal and is not retried.
287 if (CURLE_OK != curl_code) {
288 throw BESInternalError(
string(
"Data transfer error: ").append(curl_error_msg(curl_code, d_errbuf)),
// The HTTP status decides whether the attempt succeeded.
292 success = evaluate_curl_response(d_handle);
// Give up once the number of tries reaches retry_limit.
295 if (tries == retry_limit) {
297 string(
"Data transfer error: Number of re-tries to S3 exceeded: ").append(
298 curl_error_msg(curl_code, d_errbuf)), __FILE__, __LINE__);
301 LOG(
"HTTP transfer 500 error, will retry (trial " << tries <<
" for: " << d_url <<
").");
// Second perform call with no response evaluation or retry visible; presumably
// the branch for non-HTTP URLs (the `else` is not visible here).
309 CURLcode curl_code = curl_easy_perform(d_handle);
310 if (CURLE_OK != curl_code) {
311 throw BESInternalError(
string(
"Data transfer error: ").append(curl_error_msg(curl_code, d_errbuf)),
// Mark the chunk as read.
316 d_chunk->set_is_read(
true);
// Implementation struct behind dmrpp_multi_handle's p_impl pointer.
// With HAVE_CURL_MULTI_API set it also carries members that are not visible in
// this excerpt (other code uses p_impl->curlm).
326 struct dmrpp_multi_handle::multi_handle {
327 #if HAVE_CURL_MULTI_API
// Easy handles queued for the next transfer; emptied again at the end of the
// multi-handle read code.
330 std::vector<dmrpp_easy_handle *> ehandles;
// Allocate the implementation struct and, when libcurl's multi API is available,
// create the libcurl multi handle.
334 dmrpp_multi_handle::dmrpp_multi_handle()
336 p_impl =
new multi_handle;
337 #if HAVE_CURL_MULTI_API
// NOTE(review): no check of curl_multi_init()'s result is visible here.
338 p_impl->curlm = curl_multi_init();
// Tear down the libcurl multi handle when the multi API is in use.
// NOTE(review): p_impl is allocated with `new` in the constructor; the matching
// delete is not visible in this excerpt -- confirm it exists.
342 dmrpp_multi_handle::~dmrpp_multi_handle()
344 #if HAVE_CURL_MULTI_API
345 curl_multi_cleanup(p_impl->curlm);
// Fragment that queues an easy handle `eh` for a later transfer. The enclosing
// signature is not visible in this excerpt (presumably
// dmrpp_multi_handle::add_easy_handle -- TODO confirm).
// With the multi API the handle is attached to the libcurl multi handle; the
// push_back onto ehandles presumably belongs to the non-multi branch (the
// #else/#endif lines are not visible here).
360 #if HAVE_CURL_MULTI_API
// NOTE(review): the CURLMcode returned by curl_multi_add_handle() is not checked.
361 curl_multi_add_handle(p_impl->curlm, eh->d_handle);
363 p_impl->ehandles.push_back(eh);
// Thread entry point used when libcurl's multi API is unavailable; it is the
// start routine passed to pthread_create() in the multi-handle read code.
// @param handle Opaque pointer; the caller passes a dmrpp_easy_handle*.
369 #if !HAVE_CURL_MULTI_API
370 static void *easy_handle_read_data(
void *handle)
// On an exception `e`, its verbose message is copied into a heap-allocated string.
// Presumably that pointer becomes the thread's exit value and is collected by
// pthread_join() in the caller, which must then delete it -- TODO confirm.
379 string *error =
new string(e.get_verbose_message());
// Fragment that runs all queued transfers. The enclosing signature is not visible
// in this excerpt (presumably dmrpp_multi_handle::read_data -- TODO confirm).
// With HAVE_CURL_MULTI_API the transfers are driven through the libcurl multi
// handle; the pthread code further down presumably belongs to the #else branch
// (the #else/#endif lines are not visible here).
399 #if HAVE_CURL_MULTI_API
// Start the transfers; still_running receives the number still in progress.
402 int still_running = 0;
403 CURLMcode mres = curl_multi_perform(p_impl->curlm, &still_running);
404 if (mres != CURLM_OK)
405 throw BESInternalError(
string(
"Could not initiate data read: ").append(curl_multi_strerror(mres)), __FILE__,
// Wait for activity for at most MAX_WAIT_MSECS, then let libcurl make progress;
// repeated until no transfer is still running.
410 mres = curl_multi_wait(p_impl->curlm, NULL, 0, MAX_WAIT_MSECS, &numfds);
411 if (mres != CURLM_OK)
412 throw BESInternalError(
string(
"Could not wait on data read: ").append(curl_multi_strerror(mres)), __FILE__,
415 mres = curl_multi_perform(p_impl->curlm, &still_running);
416 if (mres != CURLM_OK)
417 throw BESInternalError(
string(
"Could not iterate data read: ").append(curl_multi_strerror(mres)), __FILE__,
420 }
while (still_running);
// Drain the per-transfer completion messages.
424 while ((msg = curl_multi_info_read(p_impl->curlm, &msgs_left))) {
425 if (msg->msg == CURLMSG_DONE) {
426 CURL *eh = msg->easy_handle;
428 CURLcode res = msg->data.result;
// The conditions guarding the next two throws are not visible in this excerpt.
430 throw BESInternalError(
string(
"Error HTTP: ").append(curl_easy_strerror(res)), __FILE__, __LINE__);
438 throw BESInternalError(
string(
"Could not access easy handle: ").append(curl_easy_strerror(res)), __FILE__, __LINE__);
// NOTE(review): the bool result is not used on this line, unlike the single-handle
// path which stores it to decide on a retry -- confirm that is intended.
444 evaluate_curl_response(eh);
// Detach the finished easy handle from the multi handle.
453 mres = curl_multi_remove_handle(p_impl->curlm, eh);
454 if (mres != CURLM_OK)
455 throw BESInternalError(
string(
"Could not remove libcurl handle: ").append(curl_multi_strerror(mres)), __FILE__, __LINE__);
// Presumably the branch for a message that is not CURLMSG_DONE -- TODO confirm.
460 throw BESInternalError(
"Error getting HTTP or FILE responses.", __FILE__, __LINE__);
// pthread variant: one thread per queued easy handle.
// NOTE(review): an array sized by a run-time value is a variable-length array,
// which is a compiler extension in C++; a std::vector<pthread_t> would be portable.
466 pthread_t threads[p_impl->ehandles.size()];
// Count of threads to join; the increment is not visible in this excerpt.
467 unsigned int num_threads = 0;
469 for (
unsigned int i = 0; i < p_impl->ehandles.size(); ++i) {
470 int status = pthread_create(&threads[i], NULL, easy_handle_read_data, (
void*) p_impl->ehandles[i]);
// NOTE(review): the message names process_one_chunk_unconstrained, but the thread
// routine started above is easy_handle_read_data.
475 ostringstream oss(
"Could not start process_one_chunk_unconstrained thread for chunk ", std::ios::ate);
476 oss << i <<
": " << strerror(status);
// Join the started threads; `error` receives each thread's exit value.
482 for (
unsigned int i = 0; i < num_threads; ++i) {
484 int status = pthread_join(threads[i], (
void**) &error);
486 ostringstream oss(
"Could not join process_one_chunk_unconstrained thread for chunk ", std::ios::ate);
487 oss << i <<
": " << strerror(status);
// A non-null exit value signals that the thread reported an error.
490 else if (error != 0) {
// Presumably error-path cleanup that joins any threads still outstanding before
// the exception propagates -- the surrounding try/catch is not visible here.
498 join_threads(threads, num_threads);
// Forget the queued easy handles now that their transfers are done.
503 p_impl->ehandles.clear();
// Construct the pool: take the pool size from the request handler's
// d_max_parallel_transfers setting and initialise the mutex that guards handle
// hand-out.
// @throws BESInternalError if the mutex cannot be initialised.
507 CurlHandlePool::CurlHandlePool() : d_multi_handle(0)
509 d_max_easy_handles = DmrppRequestHandler::d_max_parallel_transfers;
// Runs once per pool slot; the loop body (presumably creating one
// dmrpp_easy_handle per slot) is not visible in this excerpt.
512 for (
unsigned int i = 0; i < d_max_easy_handles; ++i) {
516 if (pthread_mutex_init(&d_get_easy_handle_mutex, 0) != 0)
517 throw BESInternalError(
"Could not initialize mutex in CurlHandlePool", __FILE__, __LINE__);
// Fragment that hands out a pooled easy handle configured for `chunk`. The
// enclosing signature is not visible in this excerpt (presumably
// CurlHandlePool::get_easy_handle -- TODO confirm).
// The pool mutex is locked for the rest of the enclosing scope.
537 Lock lock(d_get_easy_handle_mutex);
// Scan the pool; the selection test (presumably "not in use") is not visible here.
540 for (vector<dmrpp_easy_handle *>::iterator i = d_easy_handles.begin(), e = d_easy_handles.end(); i != e; ++i) {
// NOTE(review): the handle is marked in-use before the white-list check and the
// curl_easy_setopt() calls below, which can throw. Whether the flag is reset on
// those error paths is not visible in this excerpt -- confirm, otherwise the slot
// stays marked busy.
547 handle->d_in_use =
true;
// Reject URLs that match no white-list rule.
552 if(!WhiteList::get_white_list()->is_white_listed(handle->d_url)){
553 string msg =
"ERROR!! The chunk url " + handle->d_url +
" does not match any white-list rule. ";
557 handle->d_chunk = chunk;
// Point the transfer at the chunk's data URL.
559 CURLcode res = curl_easy_setopt(handle->d_handle, CURLOPT_URL, chunk->
get_data_url().c_str());
560 if (res != CURLE_OK)
throw BESInternalError(
string(
"HTTP Error setting URL: ").append(curl_error_msg(res, handle->d_errbuf)), __FILE__, __LINE__);
// Error path of the range setup; the setopt call itself is not visible here.
564 throw BESInternalError(
string(
"HTTP Error setting Range: ").append(curl_error_msg(res, handle->d_errbuf)), __FILE__,
// The chunk is passed as the user pointer for libcurl's write callback.
568 if (CURLE_OK != (res = curl_easy_setopt(handle->d_handle, CURLOPT_WRITEDATA, reinterpret_cast<void*>(chunk))))
569 throw BESInternalError(
string(
"CURL Error setting chunk as data buffer: ").append(curl_error_msg(res, handle->d_errbuf)),
// Stash the owning dmrpp_easy_handle in the libcurl handle's private slot.
573 if (CURLE_OK != (res = curl_easy_setopt(handle->d_handle, CURLOPT_PRIVATE, reinterpret_cast<void*>(handle))))
574 throw BESInternalError(
string(
"CURL Error setting easy_handle as private data: ").append(curl_error_msg(res, handle->d_errbuf)), __FILE__,
596 Lock lock(d_get_easy_handle_mutex);
601 handle->d_in_use =
false;
605 for (std::vector<dmrpp_easy_handle *>::iterator i = d_easy_handles.begin(), e = d_easy_handles.end(); i != e; ++i) {
607 BESDEBUG(
"dmrpp:5",
"Found a handle match for the " << i - d_easy_handles.begin() <<
"th easy handle." << endl);