42 #include <D4EnumDefs.h>
43 #include <D4Attributes.h>
48 #include "BESInternalError.h"
51 #include "CurlHandlePool.h"
53 #include "DmrppArray.h"
54 #include "DmrppRequestHandler.h"
57 static const string dmrpp_3 =
"dmrpp:3";
58 static const string dmrpp_4 =
"dmrpp:4";
65 void DmrppArray::_duplicate(
const DmrppArray &)
69 DmrppArray::DmrppArray(
const string &n, BaseType *v) :
70 Array(n, v, true ), DmrppCommon()
74 DmrppArray::DmrppArray(
const string &n,
const string &d, BaseType *v) :
75 Array(n, d, v, true), DmrppCommon()
80 DmrppArray::ptr_duplicate()
82 return new DmrppArray(*
this);
85 DmrppArray::DmrppArray(
const DmrppArray &rhs) :
86 Array(rhs), DmrppCommon(rhs)
92 DmrppArray::operator=(
const DmrppArray &rhs)
94 if (
this == &rhs)
return *
this;
96 dynamic_cast<Array &>(*
this) = rhs;
99 DmrppCommon::m_duplicate_common(rhs);
108 bool DmrppArray::is_projected()
110 for (Dim_iter p = dim_begin(), e = dim_end(); p != e; ++p)
111 if (dimension_size(p,
true) != dimension_size(p,
false))
return true;
134 static unsigned long long get_index(
const vector<unsigned int> &address_in_target,
const vector<unsigned int> &target_shape)
136 assert(address_in_target.size() == target_shape.size());
138 vector<unsigned int>::const_reverse_iterator shape_index = target_shape.rbegin();
139 vector<unsigned int>::const_reverse_iterator index = address_in_target.rbegin(), index_end = address_in_target.rend();
141 unsigned long long multiplier = *shape_index++;
142 unsigned long long offset = *index++;
144 while (index != index_end) {
145 assert(*index < *shape_index);
147 offset += multiplier * *index++;
148 multiplier *= *shape_index++;
163 unsigned long long size = 1;
164 for (Dim_iter dim = dim_begin(), end = dim_end(); dim != end; dim++) {
165 size *= dimension_size(dim, constrained);
178 Dim_iter dim = dim_begin(), edim = dim_end();
179 vector<unsigned int> shape;
183 shape.reserve(edim - dim);
185 for (; dim != edim; dim++) {
186 shape.push_back(dimension_size(dim, constrained));
197 DmrppArray::dimension DmrppArray::get_dimension(
unsigned int i)
199 assert(i <= (dim_end() - dim_begin()));
200 return *(dim_begin() + i);
208 void DmrppArray::insert_constrained_contiguous(Dim_iter dimIter,
unsigned long *target_index, vector<unsigned int> &subsetAddress,
209 const vector<unsigned int> &array_shape,
char *src_buf)
211 BESDEBUG(
"dmrpp",
"DmrppArray::"<< __func__ <<
"() - subsetAddress.size(): " << subsetAddress.size() << endl);
213 unsigned int bytesPerElt = prototype()->width();
215 char *dest_buf = get_buf();
217 unsigned int start = this->dimension_start(dimIter,
true);
218 unsigned int stop = this->dimension_stop(dimIter,
true);
219 unsigned int stride = this->dimension_stride(dimIter,
true);
225 if (dimIter == dim_end() && stride == 1) {
227 subsetAddress.push_back(start);
228 unsigned long start_index = get_index(subsetAddress, array_shape);
229 subsetAddress.pop_back();
231 subsetAddress.push_back(stop);
232 unsigned long stop_index = get_index(subsetAddress, array_shape);
233 subsetAddress.pop_back();
237 for (
unsigned long sourceIndex = start_index; sourceIndex <= stop_index; sourceIndex++) {
238 unsigned long target_byte = *target_index * bytesPerElt;
239 unsigned long source_byte = sourceIndex * bytesPerElt;
241 for (
unsigned long i = 0; i < bytesPerElt; i++) {
242 dest_buf[target_byte++] = src_buf[source_byte++];
248 for (
unsigned int myDimIndex = start; myDimIndex <= stop; myDimIndex += stride) {
251 if (dimIter != dim_end()) {
254 subsetAddress.push_back(myDimIndex);
255 insert_constrained_contiguous(dimIter, target_index, subsetAddress, array_shape, src_buf);
256 subsetAddress.pop_back();
261 subsetAddress.push_back(myDimIndex);
262 unsigned int sourceIndex = get_index(subsetAddress, array_shape);
263 subsetAddress.pop_back();
266 unsigned long target_byte = *target_index * bytesPerElt;
267 unsigned long source_byte = sourceIndex * bytesPerElt;
269 for (
unsigned int i = 0; i < bytesPerElt; i++) {
270 dest_buf[target_byte++] = src_buf[source_byte++];
283 void DmrppArray::read_contiguous()
291 if (chunk_refs.size() != 1)
throw BESInternalError(
string(
"Expected only a single chunk for variable ") + name(), __FILE__, __LINE__);
293 Chunk &chunk = chunk_refs[0];
302 if (!is_projected()) {
303 val2buf(chunk.get_rbuf());
306 vector<unsigned int> array_shape =
get_shape(
false);
309 reserve_value_capacity(
get_size(
true));
310 unsigned long target_index = 0;
311 vector<unsigned int> subset;
313 insert_constrained_contiguous(dim_begin(), &target_index, subset, array_shape, chunk.get_rbuf());
331 unsigned long long DmrppArray::get_chunk_start(
const dimension &thisDim,
unsigned int chunk_origin)
334 unsigned long long first_element_offset = 0;
335 if ((
unsigned) (thisDim.start) < chunk_origin) {
337 if (thisDim.stride != 1) {
339 first_element_offset = (chunk_origin - thisDim.start) % thisDim.stride;
341 if (first_element_offset != 0) {
343 first_element_offset = thisDim.stride - first_element_offset;
348 first_element_offset = thisDim.start - chunk_origin;
351 return first_element_offset;
354 #ifdef USE_READ_SERIAL
376 void DmrppArray::insert_chunk_serial(
unsigned int dim, vector<unsigned int> *target_element_address, vector<unsigned int> *chunk_element_address,
379 BESDEBUG(
"dmrpp", __func__ <<
" dim: "<< dim <<
" BEGIN "<< endl);
382 const vector<unsigned int> &chunk_shape = get_chunk_dimension_sizes();
385 const vector<unsigned int> &chunk_origin = chunk->get_position_in_array();
387 dimension thisDim = this->get_dimension(dim);
390 if ((
unsigned) thisDim.start > (chunk_origin[dim] + chunk_shape[dim]) || (
unsigned) thisDim.stop < chunk_origin[dim]) {
395 unsigned int first_element_offset = get_chunk_start(dim, chunk_origin);
398 if (first_element_offset > chunk_shape[dim]) {
403 unsigned long long end_element = chunk_origin[dim] + chunk_shape[dim] - 1;
404 if ((
unsigned) thisDim.stop < end_element) {
405 end_element = thisDim.stop;
408 unsigned long long chunk_start = first_element_offset;
409 unsigned long long chunk_end = end_element - chunk_origin[dim];
410 vector<unsigned int> constrained_array_shape =
get_shape(
true);
412 unsigned int last_dim = chunk_shape.size() - 1;
413 if (dim == last_dim) {
419 char *source_buffer = chunk->get_rbuf();
420 char *target_buffer = get_buf();
421 unsigned int elem_width = prototype()->width();
423 if (thisDim.stride == 1) {
425 unsigned long long start_element = chunk_origin[dim] + first_element_offset;
427 unsigned long long chunk_constrained_inner_dim_bytes = (end_element - start_element + 1) * elem_width;
430 (*target_element_address)[dim] = (start_element - thisDim.start) / thisDim.stride;
432 (*chunk_element_address)[dim] = first_element_offset;
434 unsigned int target_char_start_index = get_index(*target_element_address, constrained_array_shape) * elem_width;
435 unsigned int chunk_char_start_index = get_index(*chunk_element_address, chunk_shape) * elem_width;
437 memcpy(target_buffer + target_char_start_index, source_buffer + chunk_char_start_index, chunk_constrained_inner_dim_bytes);
441 for (
unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
443 (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;
446 (*chunk_element_address)[dim] = chunk_index;
448 unsigned int target_char_start_index = get_index(*target_element_address, constrained_array_shape) * elem_width;
449 unsigned int chunk_char_start_index = get_index(*chunk_element_address, chunk_shape) * elem_width;
451 memcpy(target_buffer + target_char_start_index, source_buffer + chunk_char_start_index, elem_width);
457 for (
unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
458 (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;
459 (*chunk_element_address)[dim] = chunk_index;
462 insert_chunk_serial(dim + 1, target_element_address, chunk_element_address, chunk);
467 void DmrppArray::read_chunks_serial()
469 BESDEBUG(
"dmrpp", __func__ <<
" for variable '" << name() <<
"' - BEGIN" << endl);
472 if (chunk_refs.size() == 0)
throw BESInternalError(
string(
"Expected one or more chunks for variable ") + name(), __FILE__, __LINE__);
475 reserve_value_capacity(
get_size(
true));
483 for (
unsigned long i = 0; i < chunk_refs.size(); i++) {
484 Chunk &chunk = chunk_refs[i];
486 vector<unsigned int> chunk_source_address(dimensions(), 0);
487 vector<unsigned int> target_element_address = chunk.get_position_in_array();
490 insert_chunk_serial(0, &target_element_address, &chunk_source_address, &chunk);
495 BESDEBUG(
"dmrpp",
"DmrppArray::"<< __func__ <<
"() for " << name() <<
" END"<< endl);
521 DmrppArray::find_needed_chunks(
unsigned int dim, vector<unsigned int> *target_element_address, Chunk *chunk)
523 BESDEBUG(dmrpp_3, __func__ <<
" BEGIN, dim: " << dim << endl);
526 const vector<unsigned int> &chunk_shape = get_chunk_dimension_sizes();
529 const vector<unsigned int> &chunk_origin = chunk->get_position_in_array();
531 dimension thisDim = this->get_dimension(dim);
534 if ((
unsigned) thisDim.start > (chunk_origin[dim] + chunk_shape[dim]) || (
unsigned) thisDim.stop < chunk_origin[dim]) {
539 unsigned long long chunk_start = get_chunk_start(thisDim, chunk_origin[dim]);
542 if (chunk_start > chunk_shape[dim]) {
547 unsigned long long end_element = chunk_origin[dim] + chunk_shape[dim] - 1;
548 if ((
unsigned) thisDim.stop < end_element) {
549 end_element = thisDim.stop;
552 unsigned long long chunk_end = end_element - chunk_origin[dim];
554 unsigned int last_dim = chunk_shape.size() - 1;
555 if (dim == last_dim) {
560 for (
unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
561 (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;
564 Chunk *needed = find_needed_chunks(dim + 1, target_element_address, chunk);
565 if (needed)
return needed;
591 void DmrppArray::insert_chunk(
unsigned int dim, vector<unsigned int> *target_element_address, vector<unsigned int> *chunk_element_address,
592 Chunk *chunk,
const vector<unsigned int> &constrained_array_shape)
595 const vector<unsigned int> &chunk_shape = get_chunk_dimension_sizes();
598 const vector<unsigned int> &chunk_origin = chunk->get_position_in_array();
600 dimension thisDim = this->get_dimension(dim);
603 unsigned long long chunk_start = get_chunk_start(thisDim, chunk_origin[dim]);
606 unsigned long long end_element = chunk_origin[dim] + chunk_shape[dim] - 1;
607 if ((
unsigned) thisDim.stop < end_element) {
608 end_element = thisDim.stop;
611 unsigned long long chunk_end = end_element - chunk_origin[dim];
613 unsigned int last_dim = chunk_shape.size() - 1;
614 if (dim == last_dim) {
615 char *source_buffer = chunk->get_rbuf();
616 char *target_buffer = get_buf();
617 unsigned int elem_width = prototype()->width();
619 if (thisDim.stride == 1) {
621 unsigned long long start_element = chunk_origin[dim] + chunk_start;
623 unsigned long long chunk_constrained_inner_dim_bytes = (end_element - start_element + 1) * elem_width;
626 (*target_element_address)[dim] = (start_element - thisDim.start);
628 (*chunk_element_address)[dim] = chunk_start;
631 unsigned int target_char_start_index = get_index(*target_element_address, constrained_array_shape) * elem_width;
632 unsigned int chunk_char_start_index = get_index(*chunk_element_address, chunk_shape) * elem_width;
634 memcpy(target_buffer + target_char_start_index, source_buffer + chunk_char_start_index, chunk_constrained_inner_dim_bytes);
638 for (
unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
640 (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;
643 (*chunk_element_address)[dim] = chunk_index;
646 unsigned int target_char_start_index = get_index(*target_element_address, constrained_array_shape) * elem_width;
647 unsigned int chunk_char_start_index = get_index(*chunk_element_address, chunk_shape) * elem_width;
649 memcpy(target_buffer + target_char_start_index, source_buffer + chunk_char_start_index, elem_width);
655 for (
unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
656 (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;
657 (*chunk_element_address)[dim] = chunk_index;
660 insert_chunk(dim + 1, target_element_address, chunk_element_address, chunk, constrained_array_shape);
671 void DmrppArray::read_chunks()
674 if (chunk_refs.size() == 0)
throw BESInternalError(
string(
"Expected one or more chunks for variable ") + name(), __FILE__, __LINE__);
678 queue<Chunk *> chunks_to_read;
681 for (vector<Chunk>::iterator c = chunk_refs.begin(), e = chunk_refs.end(); c != e; ++c) {
684 vector<unsigned int> target_element_address = chunk.get_position_in_array();
685 Chunk *needed = find_needed_chunks(0 , &target_element_address, &chunk);
686 if (needed) chunks_to_read.push(needed);
689 reserve_value_capacity(
get_size(
true));
690 vector<unsigned int> constrained_array_shape =
get_shape(
true);
692 BESDEBUG(dmrpp_3,
"d_use_parallel_transfers: " << DmrppRequestHandler::d_use_parallel_transfers << endl);
693 BESDEBUG(dmrpp_3,
"d_max_parallel_transfers: " << DmrppRequestHandler::d_max_parallel_transfers << endl);
695 if (DmrppRequestHandler::d_use_parallel_transfers) {
698 unsigned int max_handles = DmrppRequestHandler::curl_handle_pool->get_max_handles();
699 dmrpp_multi_handle *mhandle = DmrppRequestHandler::curl_handle_pool->get_multi_handle();
702 while (chunks_to_read.size() > 0) {
703 queue<Chunk*> chunks_to_insert;
704 for (
unsigned int i = 0; i < max_handles && chunks_to_read.size() > 0; ++i) {
705 Chunk *chunk = chunks_to_read.front();
706 chunks_to_read.pop();
708 chunk->set_rbuf_to_size();
709 dmrpp_easy_handle *handle = DmrppRequestHandler::curl_handle_pool->get_easy_handle(chunk);
710 if (!handle)
throw BESInternalError(
"No more libcurl handles.", __FILE__, __LINE__);
712 BESDEBUG(dmrpp_3,
"Queuing: " << chunk->to_string() << endl);
713 mhandle->add_easy_handle(handle);
715 chunks_to_insert.push(chunk);
718 mhandle->read_data();
720 while (chunks_to_insert.size() > 0) {
721 Chunk *chunk = chunks_to_insert.front();
722 chunks_to_insert.pop();
726 vector<unsigned int> target_element_address = chunk->get_position_in_array();
727 vector<unsigned int> chunk_source_address(dimensions(), 0);
729 BESDEBUG(dmrpp_3,
"Inserting: " << chunk->to_string() << endl);
730 insert_chunk(0 , &target_element_address, &chunk_source_address, chunk, constrained_array_shape);
737 while (chunks_to_read.size() > 0) {
738 Chunk *chunk = chunks_to_read.front();
739 chunks_to_read.pop();
741 BESDEBUG(dmrpp_3,
"Reading: " << chunk->to_string() << endl);
746 vector<unsigned int> target_element_address = chunk->get_position_in_array();
747 vector<unsigned int> chunk_source_address(dimensions(), 0);
749 BESDEBUG(dmrpp_3,
"Inserting: " << chunk->to_string() << endl);
750 insert_chunk(0 , &target_element_address, &chunk_source_address, chunk, constrained_array_shape);
770 static unsigned long multiplier(
const vector<unsigned int> &shape,
unsigned int k)
772 assert(shape.size() > 1);
773 assert(shape.size() > k + 1);
775 vector<unsigned int>::const_iterator i = shape.begin(), e = shape.end();
777 unsigned long multiplier = *i++;
804 void DmrppArray::insert_chunk_unconstrained(Chunk *chunk,
unsigned int dim,
unsigned long long array_offset,
const vector<unsigned int> &array_shape,
805 unsigned long long chunk_offset,
const vector<unsigned int> &chunk_shape,
const vector<unsigned int> &chunk_origin)
810 dimension thisDim = this->get_dimension(dim);
811 unsigned long long end_element = chunk_origin[dim] + chunk_shape[dim] - 1;
812 if ((
unsigned) thisDim.stop < end_element) {
813 end_element = thisDim.stop;
816 unsigned long long chunk_end = end_element - chunk_origin[dim];
818 unsigned int last_dim = chunk_shape.size() - 1;
819 if (dim == last_dim) {
820 unsigned int elem_width = prototype()->width();
822 array_offset += chunk_origin[dim];
825 unsigned long long chunk_bytes = (end_element - chunk_origin[dim] + 1) * elem_width;
826 char *source_buffer = chunk->get_rbuf();
827 char *target_buffer = get_buf();
828 memcpy(target_buffer + (array_offset * elem_width), source_buffer + (chunk_offset * elem_width), chunk_bytes);
831 unsigned long mc = multiplier(chunk_shape, dim);
832 unsigned long ma = multiplier(array_shape, dim);
835 for (
unsigned int chunk_index = 0 ; chunk_index <= chunk_end; ++chunk_index) {
836 unsigned long long next_chunk_offset = chunk_offset + (mc * chunk_index);
837 unsigned long long next_array_offset = array_offset + (ma * (chunk_index + chunk_origin[dim]));
840 insert_chunk_unconstrained(chunk, dim + 1, next_array_offset, array_shape, next_chunk_offset, chunk_shape, chunk_origin);
853 void process_one_chunk_unconstrained(Chunk *chunk, DmrppArray *array,
const vector<unsigned int> &array_shape,
854 const vector<unsigned int> &chunk_shape)
858 if (array->is_deflate_compression() || array->is_shuffle_compression())
859 chunk->inflate_chunk(array->is_deflate_compression(), array->is_shuffle_compression(), array->get_chunk_size_in_elements(),
860 array->var()->width());
862 array->insert_chunk_unconstrained(chunk, 0, 0, array_shape, 0, chunk_shape, chunk->get_position_in_array());
870 void *one_chunk_unconstrained_thread(
void *arg_list)
872 one_chunk_unconstrained_args *args = reinterpret_cast<one_chunk_unconstrained_args*>(arg_list);
875 process_one_chunk_unconstrained(args->chunk, args->array, args->array_shape, args->chunk_shape);
878 write(args->fds[1], &args->tid,
sizeof(args->tid));
880 pthread_exit(
new string(error.get_verbose_message()));
885 write(args->fds[1], &args->tid,
sizeof(args->tid));
891 void DmrppArray::read_chunks_unconstrained()
894 if (chunk_refs.size() == 0)
throw BESInternalError(
string(
"Expected one or more chunks for variable ") + name(), __FILE__, __LINE__);
899 const vector<unsigned int> array_shape =
get_shape(
true);
901 const vector<unsigned int> chunk_shape = get_chunk_dimension_sizes();
903 BESDEBUG(dmrpp_3, __func__ << endl);
905 BESDEBUG(dmrpp_3,
"d_use_parallel_transfers: " << DmrppRequestHandler::d_use_parallel_transfers << endl);
906 BESDEBUG(dmrpp_3,
"d_max_parallel_transfers: " << DmrppRequestHandler::d_max_parallel_transfers << endl);
908 if (DmrppRequestHandler::d_use_parallel_transfers) {
909 queue<Chunk *> chunks_to_read;
912 for (vector<Chunk>::iterator c = chunk_refs.begin(), e = chunk_refs.end(); c != e; ++c)
913 chunks_to_read.push(&(*c));
917 int status = pipe(fds);
919 throw BESInternalError(
string(
"Could not open a pipe for thread communication: ").append(strerror(errno)), __FILE__, __LINE__);
922 pthread_t threads[DmrppRequestHandler::d_max_parallel_transfers];
925 for (
unsigned int i = 0; i < (
unsigned int)DmrppRequestHandler::d_max_parallel_transfers; ++i) {
926 memset(&threads[i], 0,
sizeof(pthread_t));
930 unsigned int num_threads = 0;
931 for (
unsigned int i = 0; i < (
unsigned int) DmrppRequestHandler::d_max_parallel_transfers && chunks_to_read.size() > 0; ++i) {
932 Chunk *chunk = chunks_to_read.front();
933 chunks_to_read.pop();
936 one_chunk_unconstrained_args *args =
new one_chunk_unconstrained_args(fds, i, chunk,
this, array_shape, chunk_shape);
937 int status = pthread_create(&threads[i], NULL, dmrpp::one_chunk_unconstrained_thread, (
void*) args);
940 BESDEBUG(dmrpp_3,
"started thread: " << i << endl);
943 ostringstream oss(
"Could not start process_one_chunk_unconstrained thread for chunk ", std::ios::ate);
944 oss << i <<
": " << strerror(status);
945 BESDEBUG(dmrpp_3, oss.str());
951 while (num_threads > 0) {
954 int bytes =
::read(fds[0], &tid,
sizeof(tid));
955 if (bytes !=
sizeof(tid))
956 throw BESInternalError(
string(
"Could not read the thread id: ").append(strerror(errno)), __FILE__, __LINE__);
958 if (!(tid < DmrppRequestHandler::d_max_parallel_transfers)) {
959 ostringstream oss(
"Invalid thread id read after thread exit: ", std::ios::ate);
965 int status = pthread_join(threads[tid], (
void**)&error);
967 BESDEBUG(dmrpp_3,
"joined thread: " << (
unsigned int)tid <<
", there are: " << num_threads << endl);
970 ostringstream oss(
"Could not join process_one_chunk_unconstrained thread for chunk ", std::ios::ate);
971 oss << tid <<
": " << strerror(status);
974 else if (error != 0) {
979 else if (chunks_to_read.size() > 0) {
980 Chunk *chunk = chunks_to_read.front();
981 chunks_to_read.pop();
984 one_chunk_unconstrained_args *args =
new one_chunk_unconstrained_args(fds, tid, chunk,
this, array_shape, chunk_shape);
985 int status = pthread_create(&threads[tid], NULL, dmrpp::one_chunk_unconstrained_thread, (
void*) args);
988 oss <<
"Could not start process_one_chunk_unconstrained thread for chunk " << tid <<
": " << strerror(status);
992 BESDEBUG(dmrpp_3,
"started thread: " << (
unsigned int)tid <<
", there are: " << threads << endl);
1003 join_threads(threads, DmrppRequestHandler::d_max_parallel_transfers);
1012 while (chunks_to_read.size() > 0) {
1015 pthread_t threads[DmrppRequestHandler::d_max_parallel_transfers];
1016 unsigned int threads = 0;
1017 for (
unsigned int i = 0; i < (
unsigned int) DmrppRequestHandler::d_max_parallel_transfers && chunks_to_read.size() > 0; ++i) {
1018 Chunk *chunk = chunks_to_read.front();
1019 chunks_to_read.pop();
1021 one_chunk_unconstrained_args *args =
new one_chunk_unconstrained_args(fds, i, chunk,
this, array_shape, chunk_shape);
1023 int status = pthread_create(&threads[i], NULL, dmrpp::one_chunk_unconstrained_thread, (
void*) args);
1028 ostringstream oss(
"Could not start process_one_chunk_unconstrained thread for chunk ", std::ios::ate);
1029 oss << i <<
": " << strerror(status);
1034 for (
unsigned int i = 0; i < threads; ++i) {
1036 int status = pthread_join(threads[i], (
void**) &error);
1038 ostringstream oss(
"Could not join process_one_chunk_unconstrained thread for chunk ", std::ios::ate);
1039 oss << i <<
": " << strerror(status);
1042 else if (error != 0) {
1052 for (vector<Chunk>::iterator c = chunk_refs.begin(), e = chunk_refs.end(); c != e; ++c) {
1054 process_one_chunk_unconstrained(&chunk,
this, array_shape, chunk_shape);
1074 if (read_p())
return true;
1078 if (get_immutable_chunks().size() == 1 || get_chunk_dimension_sizes().empty()) {
1079 BESDEBUG(dmrpp_4,
"Calling read_contiguous() for " << name() << endl);
1083 if (!is_projected()) {
1084 BESDEBUG(dmrpp_4,
"Calling read_chunks_unconstrained() for " << name() << endl);
1085 read_chunks_unconstrained();
1088 BESDEBUG(dmrpp_4,
"Calling read_chunks() for " << name() << endl);
1099 class PrintD4ArrayDimXMLWriter:
public unary_function<Array::dimension&, void> {
1107 PrintD4ArrayDimXMLWriter(XMLWriter &xml,
bool c) :
1108 xml(xml), d_constrained(c)
1112 void operator()(Array::dimension &d)
1118 if (xmlTextWriterStartElement(xml.get_writer(), (
const xmlChar*)
"Dim") < 0)
1119 throw InternalErr(__FILE__, __LINE__,
"Could not write Dim element");
1121 string name = (d.dim) ? d.dim->fully_qualified_name() : d.name;
1124 if (!d_constrained && !name.empty()) {
1125 if (xmlTextWriterWriteAttribute(xml.get_writer(), (
const xmlChar*)
"name", (
const xmlChar*) name.c_str()) < 0)
1126 throw InternalErr(__FILE__, __LINE__,
"Could not write attribute for name");
1128 else if (d.use_sdim_for_slice) {
1129 assert(!name.empty());
1130 if (xmlTextWriterWriteAttribute(xml.get_writer(), (
const xmlChar*)
"name", (
const xmlChar*) name.c_str()) < 0)
1131 throw InternalErr(__FILE__, __LINE__,
"Could not write attribute for name");
1135 size << (d_constrained ? d.c_size : d.size);
1136 if (xmlTextWriterWriteAttribute(xml.get_writer(), (
const xmlChar*)
"size", (
const xmlChar*) size.str().c_str()) < 0)
1137 throw InternalErr(__FILE__, __LINE__,
"Could not write attribute for name");
1140 if (xmlTextWriterEndElement(xml.get_writer()) < 0)
throw InternalErr(__FILE__, __LINE__,
"Could not end Dim element");
1144 class PrintD4ConstructorVarXMLWriter:
public unary_function<BaseType*, void> {
1148 PrintD4ConstructorVarXMLWriter(XMLWriter &xml,
bool c) :
1149 xml(xml), d_constrained(c)
1153 void operator()(BaseType *btp)
1155 btp->print_dap4(xml, d_constrained);
1159 class PrintD4MapXMLWriter:
public unary_function<D4Map*, void> {
1163 PrintD4MapXMLWriter(XMLWriter &xml) :
1168 void operator()(D4Map *m)
1200 if (constrained && !send_p())
return;
1202 if (xmlTextWriterStartElement(xml.get_writer(), (
const xmlChar*) var()->type_name().c_str()) < 0)
1203 throw InternalErr(__FILE__, __LINE__,
"Could not write " + type_name() +
" element");
1205 if (!name().empty())
1206 if (xmlTextWriterWriteAttribute(xml.get_writer(), (
const xmlChar*)
"name", (
const xmlChar*) name().c_str()) < 0)
1207 throw InternalErr(__FILE__, __LINE__,
"Could not write attribute for name");
1210 if (var()->type() == dods_enum_c) {
1211 D4Enum *e = static_cast<D4Enum*>(var());
1212 string path = e->enumeration()->name();
1213 if (e->enumeration()->parent()) {
1215 path = static_cast<D4Group*>(e->enumeration()->parent()->parent())->FQN() + path;
1217 if (xmlTextWriterWriteAttribute(xml.get_writer(), (
const xmlChar*)
"enum", (
const xmlChar*) path.c_str()) < 0)
1218 throw InternalErr(__FILE__, __LINE__,
"Could not write attribute for enum");
1221 if (prototype()->is_constructor_type()) {
1222 Constructor &c = static_cast<Constructor&>(*prototype());
1223 for_each(c.var_begin(), c.var_end(), PrintD4ConstructorVarXMLWriter(xml, constrained));
1228 for_each(dim_begin(), dim_end(), PrintD4ArrayDimXMLWriter(xml, constrained));
1230 attributes()->print_dap4(xml);
1232 for_each(maps()->map_begin(), maps()->map_end(), PrintD4MapXMLWriter(xml));
1238 if (xmlTextWriterEndElement(xml.get_writer()) < 0)
throw InternalErr(__FILE__, __LINE__,
"Could not end " + type_name() +
" element");
1241 void DmrppArray::dump(ostream & strm)
const
1243 strm << BESIndent::LMarg <<
"DmrppArray::" << __func__ <<
"(" << (
void *)
this <<
")" << endl;
1244 BESIndent::Indent();
1245 DmrppCommon::dump(strm);
1247 strm << BESIndent::LMarg <<
"value: " <<
"----" << endl;
1248 BESIndent::UnIndent();