bes  Updated for version 3.20.5
Chunk.cc
1 // -*- mode: c++; c-basic-offset:4 -*-
2 
3 // This file is part of the BES
4 
5 // Copyright (c) 2016 OPeNDAP, Inc.
6 // Author: Nathan Potter <ndp@opendap.org>
7 //
8 // This library is free software; you can redistribute it and/or
9 // modify it under the terms of the GNU Lesser General Public
10 // License as published by the Free Software Foundation; either
11 // version 2.1 of the License, or (at your option) any later version.
12 //
13 // This library is distributed in the hope that it will be useful,
14 // but WITHOUT ANY WARRANTY; without even the implied warranty of
15 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 // Lesser General Public License for more details.
17 //
18 // You should have received a copy of the GNU Lesser General Public
19 // License along with this library; if not, write to the Free Software
20 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
21 //
22 // You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.
23 
24 #include "config.h"
25 
26 #include <sstream>
27 #include <cstdlib>
28 #include <cstring>
29 #include <cassert>
30 
31 #include <zlib.h>
32 
33 #include <BESDebug.h>
34 #include <BESInternalError.h>
35 #include <BESContextManager.h>
36 
37 #include "Chunk.h"
38 #include "CurlHandlePool.h"
39 #include "DmrppRequestHandler.h"
40 
// Debug context name; it matches the context string passed to BESDEBUG in this
// file. Spelled std::string because the using-directive for namespace std only
// appears further down, so an unqualified 'string' here would depend on some
// earlier header leaking one.
const std::string debug = "dmrpp";
42 
43 using namespace std;
44 
45 namespace dmrpp {
46 
47 const std::string Chunk::tracking_context = "cloudydap";
48 
62 size_t chunk_write_data(void *buffer, size_t size, size_t nmemb, void *data)
63 {
64  Chunk *c_ptr = reinterpret_cast<Chunk*>(data);
65 
66  // rbuf: |******++++++++++----------------------|
67  // ^ ^ bytes_read + nbytes
68  // | bytes_read
69 
70  unsigned long long bytes_read = c_ptr->get_bytes_read();
71  size_t nbytes = size * nmemb;
72 
73  // If this fails, the code will write beyond the buffer.
74  assert(bytes_read + nbytes <= c_ptr->get_rbuf_size());
75 
76  memcpy(c_ptr->get_rbuf() + bytes_read, buffer, nbytes);
77 
78  c_ptr->set_bytes_read(bytes_read + nbytes);
79 
80  return nbytes;
81 }
82 
93 void inflate(char *dest, unsigned int dest_len, char *src, unsigned int src_len)
94 {
95  /* Sanity check */
96  assert(src_len > 0);
97  assert(src);
98  assert(dest_len > 0);
99  assert(dest);
100 
101  /* Input; uncompress */
102  z_stream z_strm; /* zlib parameters */
103 
104  /* Set the uncompression parameters */
105  memset(&z_strm, 0, sizeof(z_strm));
106  z_strm.next_in = (Bytef *) src;
107  z_strm.avail_in = src_len;
108  z_strm.next_out = (Bytef *) dest;
109  z_strm.avail_out = dest_len;
110 
111  /* Initialize the uncompression routines */
112  if (Z_OK != inflateInit(&z_strm))
113  throw BESError("Failed to initialize inflate software.", BES_INTERNAL_ERROR, __FILE__, __LINE__);
114 
115  /* Loop to uncompress the buffer */
116  int status = Z_OK;
117  do {
118  /* Uncompress some data */
119  status = inflate(&z_strm, Z_SYNC_FLUSH);
120 
121  /* Check if we are done uncompressing data */
122  if (Z_STREAM_END == status) break; /*done*/
123 
124  /* Check for error */
125  if (Z_OK != status) {
126  (void) inflateEnd(&z_strm);
127  throw BESError("Failed to inflate data chunk.", BES_INTERNAL_ERROR, __FILE__, __LINE__);
128  }
129  else {
130  /* If we're not done and just ran out of buffer space, it's an error.
131  * The HDF5 library code would extend the buffer as needed, but for
132  * this handler, we always know the size of the uncompressed chunk.
133  */
134  if (0 == z_strm.avail_out) {
135  throw BESError("Data buffer is not big enough for uncompressed data.", BES_INTERNAL_ERROR, __FILE__, __LINE__);
136 #if 0
137  /* Here's how to extend the buffer if needed. This might be useful someday... */
138  void *new_outbuf; /* Pointer to new output buffer */
139 
140  /* Allocate a buffer twice as big */
141  nalloc *= 2;
142  if (NULL == (new_outbuf = H5MM_realloc(outbuf, nalloc))) {
143  (void) inflateEnd(&z_strm);
144  HGOTO_ERROR(H5E_RESOURCE, H5E_NOSPACE, 0, "memory allocation failed for inflate decompression")
145  } /* end if */
146  outbuf = new_outbuf;
147 
148  /* Update pointers to buffer for next set of uncompressed data */
149  z_strm.next_out = (unsigned char*) outbuf + z_strm.total_out;
150  z_strm.avail_out = (uInt) (nalloc - z_strm.total_out);
151 #endif
152  } /* end if */
153  } /* end else */
154  } while (status == Z_OK);
155 
156  /* Finish uncompressing the stream */
157  (void) inflateEnd(&z_strm);
158 }
159 
160 // #define this to enable the duff's device loop unrolling code.
161 // jhrg 1/19/17
162 #define DUFFS_DEVICE
163 
/**
 * @brief Reverse a byte shuffle.
 *
 * The shuffled input holds byte 0 of every element, then byte 1 of every
 * element, and so on, followed by any bytes that did not make up a whole
 * element. This writes the elements to \arg dest with their bytes
 * re-interleaved and copies the trailing bytes unchanged.
 *
 * When \arg width is 0 or 1, or when \arg src holds fewer than two whole
 * elements, there is nothing to reorder and the input is copied verbatim.
 *
 * @param dest Destination buffer; must hold at least \arg src_size bytes
 * @param src Shuffled source bytes
 * @param src_size Number of bytes in \arg src
 * @param width Size of one element in bytes
 */
void unshuffle(char *dest, const char *src, unsigned int src_size, unsigned int width)
{
    // Guard the division: a width of 0 would otherwise divide by zero.
    // Integer division rounds down, so elems counts whole elements only.
    const unsigned int elems = (width > 1) ? src_size / width : 0;

    /* Don't do anything for 0/1-byte elements, or "fractional" elements */
    if (elems <= 1) {
        memcpy(dest, src, src_size);
        return;
    }

    const char *in = src; // read cursor; advances through src sequentially

    /* For byte position i, scatter the next elems source bytes to
     * dest[i], dest[i + width], dest[i + 2*width], ... */
    for (unsigned int i = 0; i < width; ++i) {
#ifndef DUFFS_DEVICE
        for (unsigned int j = 0; j < elems; ++j) {
            dest[i + j * width] = *in++;
        }
#else /* DUFFS_DEVICE */
        char *out = dest + i;

        // Duff's device: the switch jumps into the middle of the unrolled loop
        // to copy the (elems % 8) odd bytes first; every later pass copies 8.
        // elems % 8 is always in [0, 7], so no default label is needed.
#define DUFF_GUTS *out = *in++; out += width;
        size_t passes = (elems + 7) / 8;
        switch (elems % 8) {
        case 0:
            do {
                DUFF_GUTS
        case 7:
                DUFF_GUTS
        case 6:
                DUFF_GUTS
        case 5:
                DUFF_GUTS
        case 4:
                DUFF_GUTS
        case 3:
                DUFF_GUTS
        case 2:
                DUFF_GUTS
        case 1:
                DUFF_GUTS
            } while (--passes > 0);
        } /* end switch */
#undef DUFF_GUTS // keep the helper macro from leaking into the rest of the file
#endif /* DUFFS_DEVICE */
    } /* end for i = 0 to width */

    /* Bytes beyond the last whole element were not shuffled; copy them as-is.
     * At this point 'in' is src + elems * width. */
    const unsigned int leftover = src_size % width;
    if (leftover > 0) {
        memcpy(dest + elems * width, in, leftover);
    }
}
254 
268 void Chunk::set_position_in_array(const string &pia)
269 {
270  if (pia.empty()) return;
271 
272  if (d_chunk_position_in_array.size()) d_chunk_position_in_array.clear();
273 
274  // Assume input is [x,y,...,z] where x, ..., are integers; modest syntax checking
275  // [1] is a minimal 'position in array' string.
276  if (pia.find('[') == string::npos || pia.find(']') == string::npos || pia.length() < 3)
277  throw BESInternalError("while parsing a DMR++, chunk position string malformed", __FILE__, __LINE__);
278 
279  if (pia.find_first_not_of("[]1234567890,") != string::npos)
280  throw BESInternalError("while parsing a DMR++, chunk position string illegal character(s)", __FILE__, __LINE__);
281 
282  // string off []; iss holds x,y,...,z
283  istringstream iss(pia.substr(1, pia.length()-2));
284 
285  char c;
286  unsigned int i;
287  while (!iss.eof() ) {
288  iss >> i; // read an integer
289 
290  d_chunk_position_in_array.push_back(i);
291 
292  iss >> c; // read a separator (,)
293  }
294 }
295 
304 void Chunk::set_position_in_array(const std::vector<unsigned int> &pia)
305 {
306  if (pia.size() == 0) return;
307 
308  if (d_chunk_position_in_array.size()) d_chunk_position_in_array.clear();
309 
310  d_chunk_position_in_array = pia;
311 }
312 
320 string Chunk::get_curl_range_arg_string()
321 {
322  ostringstream range; // range-get needs a string arg for the range
323  range << d_offset << "-" << d_offset + d_size - 1;
324  return range.str();
325 }
326 
338 void Chunk::add_tracking_query_param()
339 {
354  string aws_s3_url_https("https://s3.amazonaws.com/");
355  string aws_s3_url_http("http://s3.amazonaws.com/");
356 
357  // Is it an AWS S3 access? (y.find(x) returns 0 when y starts with x)
358  if (d_data_url.find(aws_s3_url_https) == 0 || d_data_url.find(aws_s3_url_http) == 0) {
359  // Yup, headed to S3.
360  bool found = false;
361  string cloudydap_context_value = BESContextManager::TheManager()->get_context(tracking_context, found);
362  if (found) {
363  d_query_marker.append("?").append(tracking_context).append("=").append(cloudydap_context_value);
364  }
365  }
366 }
367 
#if 0

/**
 * @brief Thread entry point that runs Chunk::inflate_chunk(). Currently compiled out.
 *
 * Takes ownership of the argument block and deletes it before the thread
 * exits. A BESError raised while inflating is copied to the heap and handed
 * back as the thread's exit value; on success the exit value is NULL.
 *
 * @param arg_list Pointer to a heap-allocated inflate_chunk_args
 */
void *inflate_chunk(void *arg_list)
{
    inflate_chunk_args *job = reinterpret_cast<inflate_chunk_args*>(arg_list);
    BESError *failure = NULL;

    try {
        job->chunk->inflate_chunk(job->deflate, job->shuffle, job->chunk_size, job->elem_width);
    }
    catch (BESError &error) {
        // Free the arguments first, then copy the error for whoever joins
        // this thread.
        delete job;
        job = NULL;
        failure = new BESError(error);
    }

    delete job; // no-op when the catch block already released it
    pthread_exit(failure);
}
#endif
396 
397 
409 void Chunk::inflate_chunk(bool deflate, bool shuffle, unsigned int chunk_size, unsigned int elem_width)
410 {
411  // This code is pretty naive - there are apparently a number of
412  // different ways HDF5 can compress data, and it does also use a scheme
413  // where several algorithms can be applied in sequence. For now, get
414  // simple zlib deflate working.jhrg 1/15/17
415  // Added support for shuffle. Assuming unshuffle always is applied _after_
416  // inflating the data (reversing the shuffle --> deflate process). It is
417  // possible that data could just be deflated or shuffled (because we
418  // have test data are use only shuffle). jhrg 1/20/17
419  // The file that implements the deflate filter is H5Zdeflate.c in the hdf5 source.
420  // The file that implements the shuffle filter is H5Zshuffle.c.
421 
422  if (d_is_inflated)
423  return;
424 
425  chunk_size *= elem_width;
426 
427  if (deflate) {
428  char *dest = new char[chunk_size];
429  try {
430  inflate(dest, chunk_size, get_rbuf(), get_rbuf_size());
431  // This replaces (and deletes) the original read_buffer with dest.
432  set_rbuf(dest, chunk_size);
433  }
434  catch (...) {
435  delete[] dest;
436  throw;
437  }
438  }
439 
440  if (shuffle) {
441  // The internal buffer is chunk's full size at this point.
442  char *dest = new char[get_rbuf_size()];
443  try {
444  unshuffle(dest, get_rbuf(), get_rbuf_size(), elem_width);
445  set_rbuf(dest, get_rbuf_size());
446  }
447  catch (...) {
448  delete[] dest;
449  throw;
450  }
451  }
452 
453  d_is_inflated = true;
454 
455 #if 0 // This was handy during development for debugging. Keep it for awhile (year or two) before we drop it ndp - 01/18/17
456  if(BESDebug::IsSet("dmrpp")) {
457  unsigned long long chunk_buf_size = get_rbuf_size();
458  dods_float32 *vals = (dods_float32 *) get_rbuf();
459  ostream *os = BESDebug::GetStrm();
460  (*os) << std::fixed << std::setfill('_') << std::setw(10) << std::setprecision(0);
461  (*os) << "DmrppArray::"<< __func__ <<"() - Chunk[" << i << "]: " << endl;
462  for(unsigned long long k=0; k< chunk_buf_size/prototype()->width(); k++) {
463  (*os) << vals[k] << ", " << ((k==0)|((k+1)%10)?"":"\n");
464  }
465  }
466 #endif
467 }
468 
478 void Chunk::read_chunk()
479 {
480  if (d_is_read) {
481  BESDEBUG("dmrpp", "Chunk::"<< __func__ <<"() - Already been read! Returning." << endl);
482  return;
483  }
484 
485  set_rbuf_to_size();
486 
487  dmrpp_easy_handle *handle = DmrppRequestHandler::curl_handle_pool->get_easy_handle(this);
488  if (!handle)
489  throw BESInternalError("No more libcurl handles.", __FILE__, __LINE__);
490 
491  handle->read_data(); // throws BESInternalError if error
492 
493  DmrppRequestHandler::curl_handle_pool->release_handle(handle);
494 
495  // If the expected byte count was not read, it's an error.
496  if (get_size() != get_bytes_read()) {
497  ostringstream oss;
498  oss << "Wrong number of bytes read for chunk; read: " << get_bytes_read() << ", expected: " << get_size();
499  throw BESInternalError(oss.str(), __FILE__, __LINE__);
500  }
501 
502  d_is_read = true;
503 }
504 
515 void Chunk::dump(ostream &oss) const
516 {
517  oss << "Chunk";
518  oss << "[ptr='" << (void *)this << "']";
519  oss << "[data_url='" << d_data_url << "']";
520  oss << "[offset=" << d_offset << "]";
521  oss << "[size=" << d_size << "]";
522  oss << "[chunk_position_in_array=(";
523  for (unsigned long i = 0; i < d_chunk_position_in_array.size(); i++) {
524  if (i) oss << ",";
525  oss << d_chunk_position_in_array[i];
526  }
527  oss << ")]";
528  oss << "[is_read=" << d_is_read << "]";
529  oss << "[is_inflated=" << d_is_inflated << "]";
530 }
531 
532 string Chunk::to_string() const
533 {
534  std::ostringstream oss;
535  dump(oss);
536  return oss.str();
537 }
538 
539 } // namespace dmrpp
540 
dmrpp::dmrpp_easy_handle
Bundle a libcurl easy handle to other information.
Definition: CurlHandlePool.h:61
BESContextManager::get_context
virtual string get_context(const string &name, bool &found)
retrieve the value of the specified context from the BES
Definition: BESContextManager.cc:73
dmrpp::dmrpp_easy_handle::read_data
void read_data()
This is the read_data() method for serial transfers.
Definition: CurlHandlePool.cc:274
BESDebug::IsSet
static bool IsSet(const std::string &flagName)
see if the debug context flagName is set to true
Definition: BESDebug.h:157
BESInternalError
exception thrown if internal error encountered
Definition: BESInternalError.h:43
BESDebug::GetStrm
static std::ostream * GetStrm()
return the debug stream
Definition: BESDebug.h:176
BESError
Abstract exception class for the BES with basic string message.
Definition: BESError.h:58