34 #include <BESInternalError.h>
35 #include <BESContextManager.h>
38 #include "CurlHandlePool.h"
39 #include "DmrppRequestHandler.h"
41 const string debug =
"dmrpp";
47 const std::string Chunk::tracking_context =
"cloudydap";
62 size_t chunk_write_data(
void *buffer,
size_t size,
size_t nmemb,
void *data)
64 Chunk *c_ptr = reinterpret_cast<Chunk*>(data);
70 unsigned long long bytes_read = c_ptr->get_bytes_read();
71 size_t nbytes = size * nmemb;
74 assert(bytes_read + nbytes <= c_ptr->get_rbuf_size());
76 memcpy(c_ptr->get_rbuf() + bytes_read, buffer, nbytes);
78 c_ptr->set_bytes_read(bytes_read + nbytes);
93 void inflate(
char *dest,
unsigned int dest_len,
char *src,
unsigned int src_len)
105 memset(&z_strm, 0,
sizeof(z_strm));
106 z_strm.next_in = (Bytef *) src;
107 z_strm.avail_in = src_len;
108 z_strm.next_out = (Bytef *) dest;
109 z_strm.avail_out = dest_len;
112 if (Z_OK != inflateInit(&z_strm))
113 throw BESError(
"Failed to initialize inflate software.", BES_INTERNAL_ERROR, __FILE__, __LINE__);
119 status = inflate(&z_strm, Z_SYNC_FLUSH);
122 if (Z_STREAM_END == status)
break;
125 if (Z_OK != status) {
126 (void) inflateEnd(&z_strm);
127 throw BESError(
"Failed to inflate data chunk.", BES_INTERNAL_ERROR, __FILE__, __LINE__);
134 if (0 == z_strm.avail_out) {
135 throw BESError(
"Data buffer is not big enough for uncompressed data.", BES_INTERNAL_ERROR, __FILE__, __LINE__);
142 if (NULL == (new_outbuf = H5MM_realloc(outbuf, nalloc))) {
143 (void) inflateEnd(&z_strm);
144 HGOTO_ERROR(H5E_RESOURCE, H5E_NOSPACE, 0,
"memory allocation failed for inflate decompression")
149 z_strm.next_out = (
unsigned char*) outbuf + z_strm.total_out;
150 z_strm.avail_out = (uInt) (nalloc - z_strm.total_out);
154 }
while (status == Z_OK);
157 (void) inflateEnd(&z_strm);
185 void unshuffle(
char *dest,
const char *src,
unsigned int src_size,
unsigned int width)
187 unsigned int elems = src_size / width;
190 if (!(width > 1 && elems > 1)) {
191 memcpy(dest, const_cast<char*>(src), src_size);
195 char *_src = const_cast<char*>(src);
199 for (
unsigned int i = 0; i < width; i++) {
211 size_t duffs_index = (elems + 7) / 8;
214 assert(0 &&
"This Should never be executed!");
219 #define DUFF_GUTS *_dest = *_src++; _dest += width;
236 }
while (--duffs_index > 0);
244 size_t leftover = src_size % width;
249 _dest -= (width - 1);
250 memcpy((
void*) _dest, (
void*) _src, leftover);
268 void Chunk::set_position_in_array(
const string &pia)
270 if (pia.empty())
return;
272 if (d_chunk_position_in_array.size()) d_chunk_position_in_array.clear();
276 if (pia.find(
'[') == string::npos || pia.find(
']') == string::npos || pia.length() < 3)
277 throw BESInternalError(
"while parsing a DMR++, chunk position string malformed", __FILE__, __LINE__);
279 if (pia.find_first_not_of(
"[]1234567890,") != string::npos)
280 throw BESInternalError(
"while parsing a DMR++, chunk position string illegal character(s)", __FILE__, __LINE__);
283 istringstream iss(pia.substr(1, pia.length()-2));
287 while (!iss.eof() ) {
290 d_chunk_position_in_array.push_back(i);
304 void Chunk::set_position_in_array(
const std::vector<unsigned int> &pia)
306 if (pia.size() == 0)
return;
308 if (d_chunk_position_in_array.size()) d_chunk_position_in_array.clear();
310 d_chunk_position_in_array = pia;
320 string Chunk::get_curl_range_arg_string()
323 range << d_offset <<
"-" << d_offset + d_size - 1;
338 void Chunk::add_tracking_query_param()
354 string aws_s3_url_https(
"https://s3.amazonaws.com/");
355 string aws_s3_url_http(
"http://s3.amazonaws.com/");
358 if (d_data_url.find(aws_s3_url_https) == 0 || d_data_url.find(aws_s3_url_http) == 0) {
361 string cloudydap_context_value = BESContextManager::TheManager()->
get_context(tracking_context, found);
363 d_query_marker.append(
"?").append(tracking_context).append(
"=").append(cloudydap_context_value);
380 void *inflate_chunk(
void *arg_list)
382 inflate_chunk_args *args = reinterpret_cast<inflate_chunk_args*>(arg_list);
385 args->chunk->inflate_chunk(args->deflate, args->shuffle, args->chunk_size, args->elem_width);
409 void Chunk::inflate_chunk(
bool deflate,
bool shuffle,
unsigned int chunk_size,
unsigned int elem_width)
425 chunk_size *= elem_width;
428 char *dest =
new char[chunk_size];
430 inflate(dest, chunk_size, get_rbuf(), get_rbuf_size());
432 set_rbuf(dest, chunk_size);
442 char *dest =
new char[get_rbuf_size()];
444 unshuffle(dest, get_rbuf(), get_rbuf_size(), elem_width);
445 set_rbuf(dest, get_rbuf_size());
453 d_is_inflated =
true;
455 #if 0 // This was handy during development for debugging. Keep it for awhile (year or two) before we drop it ndp - 01/18/17
457 unsigned long long chunk_buf_size = get_rbuf_size();
458 dods_float32 *vals = (dods_float32 *) get_rbuf();
460 (*os) << std::fixed << std::setfill(
'_') << std::setw(10) << std::setprecision(0);
461 (*os) <<
"DmrppArray::"<< __func__ <<
"() - Chunk[" << i <<
"]: " << endl;
462 for(
unsigned long long k=0; k< chunk_buf_size/prototype()->width(); k++) {
463 (*os) << vals[k] <<
", " << ((k==0)|((k+1)%10)?
"":
"\n");
478 void Chunk::read_chunk()
481 BESDEBUG(
"dmrpp",
"Chunk::"<< __func__ <<
"() - Already been read! Returning." << endl);
487 dmrpp_easy_handle *handle = DmrppRequestHandler::curl_handle_pool->get_easy_handle(
this);
493 DmrppRequestHandler::curl_handle_pool->release_handle(handle);
496 if (get_size() != get_bytes_read()) {
498 oss <<
"Wrong number of bytes read for chunk; read: " << get_bytes_read() <<
", expected: " << get_size();
515 void Chunk::dump(ostream &oss)
const
518 oss <<
"[ptr='" << (
void *)
this <<
"']";
519 oss <<
"[data_url='" << d_data_url <<
"']";
520 oss <<
"[offset=" << d_offset <<
"]";
521 oss <<
"[size=" << d_size <<
"]";
522 oss <<
"[chunk_position_in_array=(";
523 for (
unsigned long i = 0; i < d_chunk_position_in_array.size(); i++) {
525 oss << d_chunk_position_in_array[i];
528 oss <<
"[is_read=" << d_is_read <<
"]";
529 oss <<
"[is_inflated=" << d_is_inflated <<
"]";
532 string Chunk::to_string()
const
534 std::ostringstream oss;