Bridge++  Version 1.4.4
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
fieldIO_LIME_Parallel.cpp
Go to the documentation of this file.
1 
14 // this code only makes sense in MPI environment.
15 #ifdef USE_MPI
16 
#include "fieldIO_LIME_Parallel.h"

#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <list>
#include <vector>
19 
using Bridge::vout;

// class name used as a tag in log messages (e.g. in finalize()).
const std::string FieldIO_LIME_Parallel::class_name = "FieldIO_LIME_Parallel";
23 
24 //====================================================================
25 // private definitions
26 
namespace {
// ILDG metadata
// printf-style template for the "ildg-format" XML record; the
// placeholders are filled with the floating-point precision in bits
// (%zu) and the global lattice extents lx, ly, lz, lt (%d each).
  const char ildg_metadata_template[] =
    "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
    "<ildgFormat xmlns=\"http://www.lqcd.org/ildg\"\n"
    " xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"\n"
    " xsi:schemaLocation=\"http://www.lqcd.org/ildg http://www.lqcd.org/ildg/filefmt.xsd\">\n"
    " <version> 1.0 </version>\n"
    " <field> su3gauge </field>\n"
    " <precision> %zu </precision>\n"
    " <lx> %d </lx> <ly> %d </ly> <lz> %d </lz> <lt> %d </lt>\n"
    "</ildgFormat>";

// LIME header format

// magic number that opens every LIME record header
#define LIME_MAGIC ((uint32_t)0x456789ab)
// message-begin / message-end flags carried in LIME_header::bitfield
#define MB_MASK ((uint16_t)0x8000)
#define ME_MASK ((uint16_t)0x4000)

  // on-disk layout of a LIME record header; multi-byte fields are stored
  // big-endian (converted in read_lime_header()/write_lime_header()).
  struct LIME_header
  {
    uint32_t magic;
    uint16_t version;
    uint16_t bitfield;
    uint64_t length;
    char     type[128];
  };

// local header info
  // in-memory summary of one record: payload offset and length in the
  // file plus the record type string.
  struct LIME_record_info
  {
    uint64_t offset;
    uint64_t length;
    char     type[128];
  };

  // a LIME message is a list of records; a file is a list of messages.
  typedef std::list<LIME_record_info>  LIME_message_info;
  typedef std::list<LIME_message_info> LIME_file_info;

  // forward declarations of the helpers defined at the end of this file
  int traverse(MPI_File& fh, LIME_file_info& file_info);
  int read_lime_header(MPI_File& fh, LIME_header& header);
  int read_lime_content(MPI_File& fh, const MPI_Offset offset, char *buf, const size_t length);
  int find_record_offset(const LIME_file_info& file_info, const char *type, MPI_Offset& pos);

  int report_file_info(const LIME_file_info& file_info);

  size_t write_lime_header(MPI_File& fh, const char *type, const size_t length, const uint16_t flag);
  size_t write_lime_record(MPI_File& fh, const char *type, const char *content, const size_t length, const uint16_t flag);

}
78 
79 //====================================================================
void FieldIO_LIME_Parallel::read_file(Field *u, string filename)
{
  // Read a gauge configuration from a LIME/ILDG file with collective MPI
  // I/O: the primary rank parses the LIME record structure to locate the
  // "ildg-binary-data" record, the payload offset is broadcast, and every
  // rank then reads its own sublattice through a subarray file view and
  // unpacks it into the Field.
  static const char _function_name[] = "FieldIO_LIME_Parallel::read_file";

  initialize();

  MPI_File fh;
  int ret;

  int nin_file = m_format->nin();
  int nex_file = m_format->nex();

  // Field::element_type *buf = new Field::element_type [m_nvol*m_nvector];
  // buffer holding this rank's share of the data, in file layout
  // (NB. plain operator new throws on failure, so the null check below is
  // effectively dead; kept as in the original)
  double *buf = new double [nin_file * nex_file * m_nvol];
  if (!buf) {
    vout.crucial(m_vl, "Error at %s: allocate buffer failed.\n", _function_name);
    exit(EXIT_FAILURE);
  }

#ifdef USE_BGNET
  ret = MPI_File_open(MPI_COMM_WORLD, const_cast<char *>(filename.c_str()), MPI_MODE_RDONLY, MPI_INFO_NULL, &fh);
#else
  ret = MPI_File_open(Communicator_impl::world(), const_cast<char *>(filename.c_str()), MPI_MODE_RDONLY, MPI_INFO_NULL, &fh);
#endif
  if (ret) {
    vout.crucial(m_vl, "Error at %s: MPI_File_open failed.\n", _function_name);
    exit(EXIT_FAILURE);
  }

  MPI_Offset pos = 0;

  // process records: only the primary rank scans the LIME structure
  if (Communicator::is_primary()) {
    LIME_file_info file_info;

    traverse(fh, file_info);

    report_file_info(file_info);

    if (!find_record_offset(file_info, "ildg-binary-data", pos)) {
      vout.crucial(m_vl, "Error at %s: binary data record not found.\n", _function_name);
      exit(EXIT_FAILURE);
    }
  }

  // It would be better to use MPI communicator here (even when BGNET is available),
  // since the mixture of MPI/BGNET is buggy in BG/Q
  // NOTE(review): pos is an MPI_Offset but the byte count is taken from
  // off_t -- same width on common platforms; confirm for exotic targets.
#ifdef USE_BGNET
  MPI_Bcast(&pos, sizeof(off_t), MPI_BYTE, 0, MPI_COMM_WORLD);
#else
  Communicator::Base::broadcast(sizeof(off_t), &pos, 0);
#endif

  // view the file from pos as a lattice of per-site vectors so that the
  // collective read deposits exactly this rank's sublattice into buf
  ret = MPI_File_set_view(fh, pos, m_type_vector, m_type_tiled, const_cast<char *>("native"), MPI_INFO_NULL);
  if (ret) {
    vout.crucial(m_vl, "Error at %s: MPI_File_set_view failed.\n", _function_name);
    exit(EXIT_FAILURE);
  }

  ret = MPI_File_read_all(fh, (void *)buf, m_nvol * nex_file, m_type_vector, MPI_STATUS_IGNORE);
  if (ret) {
    vout.crucial(m_vl, "Error at %s: MPI_File_read_all failed.\n", _function_name);
    exit(EXIT_FAILURE);
  }

  ret = MPI_File_close(&fh);
  if (ret) {
    vout.crucial(m_vl, "Error at %s: MPI_File_close failed.\n", _function_name);
    exit(EXIT_FAILURE);
  }

  // ILDG binary data is stored big-endian; convert on little-endian hosts
  if (!is_bigendian()) {
// convert_endian(buf, sizeof(Field::element_type), m_nvol*m_nvector);
    convert_endian(buf, sizeof(double), m_nvol * nin_file * nex_file);
  }

  // unpack buffer: walk in file order and map each element back to its
  // location in the Field through the format object
  double *p = buf;

  for (int j = 0; j < nex_file; ++j) {
    for (int isite = 0; isite < m_nvol; ++isite) {
      for (int i = 0; i < nin_file; ++i) {
        int s, t;
        m_format->file_to_field(s, t, i, j);

        u->set(s, isite, t, *p++);
      }
    }
  }

  delete [] buf;

  finalize();
}
174 
175 
176 //====================================================================
177 void FieldIO_LIME_Parallel::write_file(Field *u, string filename)
178 {
179  static const char _function_name[] = "FieldIO_LIME_Parallel::write_file";
180 
181  initialize();
182 
183  int nin_file = m_format->nin();
184  int nex_file = m_format->nex();
185 
186  // Field::element_type *buf = new Field::element_type [m_nvol*m_nvector];
187  double *buf = new double [nin_file * nex_file * m_nvol];
188  if (!buf) {
189  vout.crucial(m_vl, "Error at %s: allocate buffer failed.\n", _function_name);
190  exit(EXIT_FAILURE);
191  }
192 
193  size_t data_length = sizeof(double) * nin_file * nex_file * CommonParameters::Lvol();
194 
195  // pack buffer
196  double *p = buf;
197 
198  for (int j = 0; j < nex_file; ++j) {
199  for (int isite = 0; isite < m_nvol; ++isite) {
200  for (int i = 0; i < nin_file; ++i) {
201  int s, t;
202  m_format->file_to_field(s, t, i, j);
203 
204  *p++ = u->cmp(s, isite, t);
205  }
206  }
207  }
208 
209  if (!is_bigendian()) {
210  // convert_endian(buf, sizeof(Field::element_type), m_nvol*m_nvector);
211  convert_endian(buf, sizeof(double), nin_file * nex_file * m_nvol);
212  }
213 
214  MPI_File fh;
215  int ret;
216 
217 #ifdef USE_BGNET
218  ret = MPI_File_open(MPI_COMM_WORLD, const_cast<char *>(filename.c_str()), MPI_MODE_CREATE | MPI_MODE_WRONLY, MPI_INFO_NULL, &fh);
219 #else
220  ret = MPI_File_open(Communicator_impl::world(), const_cast<char *>(filename.c_str()), MPI_MODE_CREATE | MPI_MODE_WRONLY, MPI_INFO_NULL, &fh);
221 #endif
222  if (ret) {
223  vout.crucial(m_vl, "Error at %s: MPI_File_open failed.\n", _function_name);
224  exit(EXIT_FAILURE);
225  }
226 
227  MPI_Offset pos = 0;
228 
229  if (Communicator::is_primary()) {
230  // metadata
231  char metadata[2048];
232  sprintf(metadata, ildg_metadata_template,
233  sizeof(double) * 8, /* bit width */
238 
239  pos += write_lime_record(fh, "ildg-format", metadata, strlen(metadata), MB_MASK);
240 
241  // content: write header
242  pos += write_lime_header(fh, "ildg-binary-data", data_length, ME_MASK);
243  }
244 
245  // It would be better to use MPI communicator here (even when BGNET is available),
246  // since the mixture of MPI/BGNET is buggy in BG/Q
247 #ifdef USE_BGNET
248  MPI_Bcast(&pos, sizeof(off_t), MPI_BYTE, 0, MPI_COMM_WORLD);
249 #else
250  Communicator::Base::broadcast(sizeof(off_t), &pos, 0);
251 #endif
252 
253  // content: write data
254 
255  ret = MPI_File_set_view(fh, pos, m_type_vector, m_type_tiled, const_cast<char *>("native"), MPI_INFO_NULL);
256  if (ret) {
257  vout.crucial(m_vl, "Error at %s: MPI_File_set_view failed.\n", _function_name);
258  exit(EXIT_FAILURE);
259  }
260 
261  ret = MPI_File_write_all(fh, (void *)buf, m_nvol * nex_file, m_type_vector, MPI_STATUS_IGNORE);
262  if (ret) {
263  vout.crucial(m_vl, "Error at %s: MPI_File_write_all failed.\n", _function_name);
264  exit(EXIT_FAILURE);
265  }
266 
267  // content: padding if needed
268  if (data_length % 8 > 0) {
269  size_t padding_size = (8 - data_length % 8) % 8;
270 
271  const char blank[8] = "";
272  ret = MPI_File_write_at(fh, pos + data_length, const_cast<char *>(blank), padding_size, MPI_BYTE, MPI_STATUS_IGNORE);
273  if (ret) {
274  vout.crucial(vl, "Error at %s: write padding failed.\n", _function_name);
275  exit(EXIT_FAILURE);
276  }
277 
278  vout.general(m_vl, "%s: padding %lu bytes added.\n", _function_name, padding_size);
279  }
280 
281  ret = MPI_File_close(&fh);
282  if (ret) {
283  vout.crucial(m_vl, "Error at %s: MPI_File_close failed.\n", _function_name);
284  exit(EXIT_FAILURE);
285  }
286 
287  delete [] buf;
288 
289  finalize();
290 }
291 
292 
293 //====================================================================
294 int FieldIO_LIME_Parallel::initialize()
295 {
296  static const char _function_name[] = "FieldIO_LIME_Parallel::initialize";
297 
298  // store verbose level to private parameter.
299  vl = m_vl;
300 
301  if (m_is_initialized) return EXIT_SUCCESS;
302 
303  int nin_file = m_format->nin();
304  int nex_file = m_format->nex();
305 
306  if (nin_file == 0 || nex_file == 0) {
307  vout.crucial(m_vl, "%s: incompatible file format.\n", _function_name);
308  exit(EXIT_FAILURE);
309  }
310 
311 
312  const int ndim = CommonParameters::Ndim();
313 
314  int *global_dims = new int[ndim];
315  global_dims[0] = CommonParameters::Lx();
316  global_dims[1] = CommonParameters::Ly();
317  global_dims[2] = CommonParameters::Lz();
318  global_dims[3] = CommonParameters::Lt();
319 
320  int *local_dims = new int[ndim];
321  local_dims[0] = CommonParameters::Nx();
322  local_dims[1] = CommonParameters::Ny();
323  local_dims[2] = CommonParameters::Nz();
324  local_dims[3] = CommonParameters::Nt();
325 
326  m_nvol = 1;
327  for (int i = 0; i < ndim; ++i) {
328  m_nvol *= local_dims[i];
329  }
330 
331  int *grid_pos = new int[ndim];
332  for (int i = 0; i < ndim; ++i) {
333  grid_pos[i] = Communicator::ipe(i);
334  }
335 
336  int *starts = new int[ndim];
337  for (int i = 0; i < ndim; ++i) {
338  starts[i] = local_dims[i] * grid_pos[i];
339  }
340 
341  int ret = 0;
342 
343 // MPI_Datatype m_type_vector;
344 // ret = MPI_Type_contiguous(sizeof(Field::element_type)*nin_file, MPI_BYTE, &m_type_vector);
345  ret = MPI_Type_contiguous(sizeof(double) * nin_file, MPI_BYTE, &m_type_vector);
346  if (ret) {
347  vout.general(m_vl, "%s: MPI_Type_Contiguous failed.\n", _function_name);
348  exit(EXIT_FAILURE);
349  }
350 
351  ret = MPI_Type_commit(&m_type_vector);
352  if (ret) {
353  vout.general(m_vl, "%s: MPI_Type_commit failed.\n", _function_name);
354  exit(EXIT_FAILURE);
355  }
356 
357 // MPI_Datatype m_type_tiled;
358  ret = MPI_Type_create_subarray(ndim, global_dims, local_dims, starts, MPI_ORDER_FORTRAN, m_type_vector, &m_type_tiled);
359  if (ret) {
360  vout.general(m_vl, "%s: MPI_Type_create_subarray failed.\n", _function_name);
361  exit(EXIT_FAILURE);
362  }
363 
364  ret = MPI_Type_commit(&m_type_tiled);
365  if (ret) {
366  vout.general(m_vl, "%s: MPI_Type_commit failed.\n", _function_name);
367  exit(EXIT_FAILURE);
368  }
369 
370  m_is_initialized = true;
371 
372  delete [] starts;
373  delete [] grid_pos;
374  delete [] local_dims;
375  delete [] global_dims;
376 
377  vout.detailed(m_vl, "FieldIO_LIME_Parallel via MPI I/O initialize done.\n");
378 
379  return EXIT_SUCCESS;
380 }
381 
382 
383 //====================================================================
384 int FieldIO_LIME_Parallel::finalize()
385 {
386  static const char _function_name[] = "FieldIO_LIME_Parallel::finalize";
387 
388  if (!m_is_initialized) return EXIT_SUCCESS;
389 
390  int ret;
391 
392  ret = MPI_Type_free(&m_type_tiled);
393  if (ret) {
394  vout.general(m_vl, "%s: MPI_Type_free failed.\n", _function_name);
395  exit(EXIT_FAILURE);
396  }
397 
398  ret = MPI_Type_free(&m_type_vector);
399  if (ret) {
400  vout.general(m_vl, "%s: MPI_Type_free failed.\n", _function_name);
401  exit(EXIT_FAILURE);
402  }
403 
404  m_is_initialized = false;
405 
406  vout.detailed(m_vl, "%s via MPI I/O finalize done.\n", class_name.c_str());
407 
408  return EXIT_SUCCESS;
409 }
410 
411 
412 //====================================================================
413 namespace {
414 //--------------------------------------------------------------------
415  int read_lime_header(MPI_File& fh, LIME_header& header)
416  {
417  MPI_Status status;
418 
419  int ret = MPI_File_read(fh, (void *)&header, sizeof(LIME_header), MPI_BYTE, &status);
420 
421  int count;
422 
423  MPI_Get_count(&status, MPI_BYTE, &count);
424  if (count != sizeof(LIME_header)) { // data length short. end of file.
425  return 1;
426  }
427 
428  if (ret) {
429  vout.crucial(vl, "%s: io error.\n", __func__);
430  return -1;
431  }
432 
433  if (FieldIO::is_bigendian() == false) {
434  FieldIO::convert_endian(&header.magic, 4, 1);
435  FieldIO::convert_endian(&header.version, 2, 1);
436  FieldIO::convert_endian(&header.bitfield, 2, 1);
437  FieldIO::convert_endian(&header.length, 8, 1);
438  }
439 
440  if (header.magic != LIME_MAGIC) {
441  vout.crucial(vl, "not lime header.\n");
442  return -1;
443  }
444 
445  return 0;
446  }
447 
448 
449 //====================================================================
  int traverse(MPI_File& fh, LIME_file_info& file_info)
  {
    // Scan the file from the beginning and build the list of messages and
    // records (payload offset/length and type) in file_info. The scan
    // ends when read_lime_header() reports a nonzero status (end of file
    // or error); in either case 0 is returned.

    // go to the beginning of the file
    MPI_File_seek(fh, 0, MPI_SEEK_SET);

    MPI_Offset pos = 0;

    LIME_message_info message_info;

    while (true)
    {
      LIME_header header;
      int stat = read_lime_header(fh, header);

// if (stat == -1) { // bad input
// return -1;
// } else if (stat == 1) { // end of file
// break;
// } else if (stat != 0) {
// // unknown status.
// return -1;
// }

// vout.detailed(vl, "read_lime_header: stat = %d, pos = %lu\n", stat, pos);

      // any nonzero status (EOF or error) terminates the traversal
      if (stat != 0) {
        break;
      }

      // read ok: pos now points at the record payload
      pos += sizeof(LIME_header);

      LIME_record_info record_info;

      // length and type occupy the same byte offsets in LIME_header and
      // LIME_record_info, so the raw copy transfers them; the offset
      // field (which receives other header bytes) is overwritten next
      memcpy((void *)&record_info, (void *)&header, sizeof(LIME_record_info));
      record_info.offset = pos;

      // padding (0-7): payloads are aligned to 8-byte boundaries
      size_t padding_size = (8 - header.length % 8) % 8;

      // seek to next record
      // MPI_File_seek(fh, header.length + padding_size, MPI_SEEK_CUR);
      // pos += header.length + padding_size;

      // *** workaround for openmpi-2.x: seek absolutely, not relatively
      pos += header.length + padding_size;
      MPI_File_seek(fh, pos, MPI_SEEK_SET);

      // store record info: the MB flag starts a new message, the ME flag
      // closes it and commits it to file_info
      if ((header.bitfield & MB_MASK) == MB_MASK) {
        message_info.clear();
      }

      message_info.push_back(record_info);

      if ((header.bitfield & ME_MASK) == ME_MASK) {
        file_info.push_back(message_info);
// message_info.clear();
      }
    }

    return 0;
  }
513 
514 
515 //====================================================================
516  int find_record_offset(const LIME_file_info& file_info, const char *type, MPI_Offset& pos)
517  {
518  bool is_found = false;
519 
520  for (LIME_file_info::const_iterator p = file_info.begin(); p != file_info.end(); ++p) {
521  for (LIME_message_info::const_iterator q = p->begin(); q != p->end(); ++q) {
522  if (strncmp(q->type, type, strlen(type)) == 0) {
523  is_found = true;
524  pos = q->offset;
525  break;
526  }
527  }
528  }
529 
530  return is_found ? 1 : 0;
531  }
532 
533 
534 //====================================================================
535  int read_record_content(MPI_File& fh, const LIME_file_info& file_info, const char *type, std::string& content)
536  {
537  bool is_found = false;
538  LIME_record_info info;
539 
540  for (LIME_file_info::const_iterator p = file_info.begin(); p != file_info.end(); ++p) {
541  for (LIME_message_info::const_iterator q = p->begin(); q != p->end(); ++q) {
542  if (strncmp(q->type, type, strlen(type)) == 0) {
543  is_found = true;
544  info = *q;
545  break;
546  }
547  }
548  }
549 
550  if (!is_found) {
551  return 0;
552  }
553 
554  MPI_Status status;
555  char *buf = new char [info.length + 1];
556  MPI_File_read_at(fh, info.offset, buf, info.length, MPI_BYTE, &status);
557 
558  int count;
559  MPI_Get_count(&status, MPI_BYTE, &count);
560 
561  if (count != info.length) {
562  vout.crucial(vl, "Error at %s: read error. content length mismatch.\n", __func__);
563  exit(EXIT_FAILURE);
564  }
565 
566  content = std::string(buf);
567 
568  return 1;
569  }
570 
571 
572 //====================================================================
  int report_file_info(const LIME_file_info& file_info)
  {
    // Dump the discovered LIME structure (offset, size, and type of every
    // record, grouped by message) at "detailed" verbosity.
    Bridge::VerboseLevel vlo = vl;  // save the file-scope verbose level
    // NOTE(review): a line is missing from this listing here; it
    // presumably adjusts vl before reporting, since vl is restored below.
    // Confirm against the repository source.

    for (LIME_file_info::const_iterator p = file_info.begin(); p != file_info.end(); ++p) {
      vout.detailed(vl, "Message:\n");

      for (LIME_message_info::const_iterator q = p->begin(); q != p->end(); ++q) {
        vout.detailed(vl, "\tRecord:\n");
        vout.detailed(vl, "\t\toffset = %lu\n", q->offset);
        vout.detailed(vl, "\t\tsize = %lu\n", q->length);
        vout.detailed(vl, "\t\ttype = %s\n", q->type);
      }
    }

    vl = vlo;  // restore the verbose level

    return 0;
  }
594 
595 
596 //====================================================================
597  size_t write_lime_header(MPI_File& fh, const char *type, const size_t length, const uint16_t flag)
598  {
599  LIME_header header;
600 
601  memset(&header, 0, sizeof(LIME_header));
602 
603  header.magic = LIME_MAGIC;
604  header.version = (uint16_t)1;
605  header.bitfield = flag;
606  strncpy(header.type, type, 128);
607  header.length = length;
608 
609  if (FieldIO::is_bigendian() == false) {
610  FieldIO::convert_endian(&header.magic, 4, 1);
611  FieldIO::convert_endian(&header.version, 2, 1);
612  FieldIO::convert_endian(&header.bitfield, 2, 1);
613  FieldIO::convert_endian(&header.length, 8, 1);
614  }
615 
616  MPI_Status status;
617  int ret = MPI_File_write(fh, (void *)&header, sizeof(LIME_header), MPI_BYTE, &status);
618 
619  if (ret) {
620  vout.crucial(vl, "%s: write header failed.\n", __func__);
621  return 0;
622  }
623 
624  return sizeof(LIME_header); // length written.
625  }
626 
627 
628 //====================================================================
629  size_t write_lime_record(MPI_File& fh, const char *type, const char *content, const size_t length, const uint16_t flag)
630  {
631  const char blank[8] = "";
632 
633  if (write_lime_header(fh, type, length, flag) == 0) {
634  return 0;
635  }
636 
637  size_t padding_size = (8 - length % 8) % 8;
638 
639  MPI_Status status;
640  int ret = MPI_File_write(fh, const_cast<char *>(content), length, MPI_BYTE, &status);
641  if (ret) {
642  vout.crucial(vl, "%s: write content failed.\n", __func__);
643  return 0;
644  }
645 
646  if (padding_size > 0) {
647  ret = MPI_File_write(fh, const_cast<char *>(blank), padding_size, MPI_BYTE, &status);
648  if (ret) {
649  vout.crucial(vl, "%s: write padding failed.\n", __func__);
650  return 0;
651  }
652  }
653 
654  return sizeof(LIME_header) + length + padding_size;
655  }
656 
657 
658 //--------------------------------------------------------------------
659 } // unnamed namespace
660 #endif /* USE_MPI */
661 
662 //====================================================================
663 //============================================================END=====
BridgeIO vout
Definition: bridgeIO.cpp:495
static const std::string class_name
Definition: fieldIO.h:56
void detailed(const char *format,...)
Definition: bridgeIO.cpp:212
void set(const int jin, const int site, const int jex, double v)
Definition: field.h:164
void general(const char *format,...)
Definition: bridgeIO.cpp:195
virtual void file_to_field(int &s, int &t, const int i, const int j) const =0
virtual int nex() const =0
Container of Field-type object.
Definition: field.h:39
virtual int nin() const =0
double cmp(const int jin, const int site, const int jex) const
Definition: field.h:132
void read_file(Field *v, std::string filename)
read data from file.
static int broadcast(size_t size, void *data, int sender)
static int ipe(const int dir)
logical coordinate of current proc.
static int Lvol()
static bool is_bigendian()
Definition: fieldIO.cpp:203
void write_file(Field *v, std::string filename)
write data to file.
const IO_Format::Format * m_format
Definition: fieldIO.h:62
void crucial(const char *format,...)
Definition: bridgeIO.cpp:178
Bridge::VerboseLevel vl
Definition: checker.cpp:18
VerboseLevel
Definition: bridgeIO.h:42
static void convert_endian(void *buf, size_t size, size_t nmemb)
convert the byte order (endianness) of nmemb elements of the given size in buf.
Definition: fieldIO.cpp:225
static bool is_primary()
check if the present node is primary in small communicator.
Bridge::VerboseLevel m_vl
Definition: fieldIO.h:64