Bridge++ Version 1.5.4
fieldIO_LIME_Parallel.cpp
// This code only makes sense in an MPI environment.
#ifdef USE_MPI

#include "fieldIO_LIME_Parallel.h"
#include <list>

using Bridge::vout;
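
// Usage sketch (illustration only, not part of this file; the format
// object and field variable shown are assumptions based on the FieldIO
// interface this class implements):
//
//   FieldIO_LIME_Parallel fio(format);   // format: an IO_Format::Format*
//   fio.read_file(&U, "conf.lime");      // collective read into Field U
//   fio.write_file(&U, "conf_out.lime"); // collective write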

const std::string FieldIO_LIME_Parallel::class_name = "FieldIO_LIME_Parallel";

//====================================================================
// private definitions

namespace {
// ILDG metadata
  const char ildg_metadata_template[] =
    "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
    "<ildgFormat xmlns=\"http://www.lqcd.org/ildg\"\n"
    "            xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"\n"
    "            xsi:schemaLocation=\"http://www.lqcd.org/ildg http://www.lqcd.org/ildg/filefmt.xsd\">\n"
    "  <version> 1.0 </version>\n"
    "  <field> su3gauge </field>\n"
    "  <precision> %zu </precision>\n"
    "  <lx> %d </lx> <ly> %d </ly> <lz> %d </lz> <lt> %d </lt>\n"
    "</ildgFormat>";

// LIME header format

#define LIME_MAGIC ((uint32_t)0x456789ab)
#define MB_MASK    ((uint16_t)0x8000)
#define ME_MASK    ((uint16_t)0x4000)

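// A LIME record starts with a 144-byte header (4-byte magic, 2-byte
// version, 2-byte flag field carrying the message-begin/end bits selected
// by MB_MASK/ME_MASK, 8-byte data length, 128-byte type string), stored
// big-endian on disk. The payload that follows is padded to an 8-byte
// boundary; the records between an MB flag and an ME flag form one message.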
  struct LIME_header
  {
    uint32_t magic;
    uint16_t version;
    uint16_t bitfield;
    uint64_t length;
    char     type[128];
  };

// local header info
  struct LIME_record_info
  {
    uint64_t offset;
    uint64_t length;
    char     type[128];
  };

  typedef std::list<LIME_record_info>  LIME_message_info;
  typedef std::list<LIME_message_info> LIME_file_info;

  int traverse(MPI_File& fh, LIME_file_info& file_info);
  int read_lime_header(MPI_File& fh, LIME_header& header);
  int read_lime_content(MPI_File& fh, const MPI_Offset offset, char *buf, const size_t length);
  int find_record_offset(const LIME_file_info& file_info, const char *type, MPI_Offset& pos);

  int report_file_info(const LIME_file_info& file_info);

  size_t write_lime_header(MPI_File& fh, const char *type, const size_t length, const uint16_t flag);
  size_t write_lime_record(MPI_File& fh, const char *type, const char *content, const size_t length, const uint16_t flag);
}

//====================================================================
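// Read strategy: the primary rank alone traverses the LIME record
// structure to locate the "ildg-binary-data" record, broadcasts its byte
// offset, and then every rank reads its own sublattice collectively
// through the subarray file view built in initialize().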
void FieldIO_LIME_Parallel::read_file(Field *u, const std::string filename)
{
  static const char _function_name[] = "FieldIO_LIME_Parallel::read_file";

  initialize();

  MPI_File fh;
  int      ret;

  const int nin_file = m_format->nin();
  const int nex_file = m_format->nex();

  // Field::element_type *buf = new Field::element_type [m_nvol*m_nvector];
  double *buf = new double [nin_file * nex_file * m_nvol];
  if (!buf) {
    vout.crucial(m_vl, "Error at %s: allocate buffer failed.\n", _function_name);
    exit(EXIT_FAILURE);
  }

  ret = MPI_File_open(Communicator_impl::world(), const_cast<char *>(filename.c_str()), MPI_MODE_RDONLY, MPI_INFO_NULL, &fh);
  if (ret) {
    vout.crucial(m_vl, "Error at %s: MPI_File_open failed.\n", _function_name);
    exit(EXIT_FAILURE);
  }

  MPI_Offset pos = 0;

  // process records
  if (Communicator::is_primary()) {
    LIME_file_info file_info;

    traverse(fh, file_info);

    report_file_info(file_info);

    if (!find_record_offset(file_info, "ildg-binary-data", pos)) {
      vout.crucial(m_vl, "Error at %s: binary data record not found.\n", _function_name);
      exit(EXIT_FAILURE);
    }
  }

  Communicator::Base::broadcast(sizeof(off_t), &pos, 0);

  ret = MPI_File_set_view(fh, pos, m_type_vector, m_type_tiled, const_cast<char *>("native"), MPI_INFO_NULL);
  if (ret) {
    vout.crucial(m_vl, "Error at %s: MPI_File_set_view failed.\n", _function_name);
    exit(EXIT_FAILURE);
  }

  ret = MPI_File_read_all(fh, (void *)buf, m_nvol * nex_file, m_type_vector, MPI_STATUS_IGNORE);
  if (ret) {
    vout.crucial(m_vl, "Error at %s: MPI_File_read_all failed.\n", _function_name);
    exit(EXIT_FAILURE);
  }

  ret = MPI_File_close(&fh);
  if (ret) {
    vout.crucial(m_vl, "Error at %s: MPI_File_close failed.\n", _function_name);
    exit(EXIT_FAILURE);
  }

  if (!is_bigendian()) {
    // convert_endian(buf, sizeof(Field::element_type), m_nvol*m_nvector);
    convert_endian(buf, sizeof(double), m_nvol * nin_file * nex_file);
  }

  // unpack buffer
  double *p = buf;

  for (int j = 0; j < nex_file; ++j) {
    for (int isite = 0; isite < m_nvol; ++isite) {
      for (int i = 0; i < nin_file; ++i) {
        int s, t;
        m_format->file_to_field(s, t, i, j);

        u->set(s, isite, t, *p++);
      }
    }
  }

  delete [] buf;

  finalize();
}


//====================================================================
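// Write strategy: every rank packs and endian-converts its local data
// first; the primary rank writes the ildg-format metadata record and the
// ildg-binary-data header, broadcasts the resulting data offset, and all
// ranks then write collectively through the shared subarray view. The
// binary data record is padded to an 8-byte boundary afterwards.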
void FieldIO_LIME_Parallel::write_file(Field *u, const std::string filename)
{
  static const char _function_name[] = "FieldIO_LIME_Parallel::write_file";

  initialize();

  const int nin_file = m_format->nin();
  const int nex_file = m_format->nex();

  // Field::element_type *buf = new Field::element_type [m_nvol*m_nvector];
  double *buf = new double [nin_file * nex_file * m_nvol];
  if (!buf) {
    vout.crucial(m_vl, "Error at %s: allocate buffer failed.\n", _function_name);
    exit(EXIT_FAILURE);
  }

  size_t data_length = sizeof(double) * nin_file * nex_file * CommonParameters::Lvol();

  // pack buffer
  double *p = buf;

  for (int j = 0; j < nex_file; ++j) {
    for (int isite = 0; isite < m_nvol; ++isite) {
      for (int i = 0; i < nin_file; ++i) {
        int s, t;
        m_format->file_to_field(s, t, i, j);

        *p++ = u->cmp(s, isite, t);
      }
    }
  }

  if (!is_bigendian()) {
    // convert_endian(buf, sizeof(Field::element_type), m_nvol*m_nvector);
    convert_endian(buf, sizeof(double), nin_file * nex_file * m_nvol);
  }

  MPI_File fh;
  int      ret;

  ret = MPI_File_open(Communicator_impl::world(), const_cast<char *>(filename.c_str()), MPI_MODE_CREATE | MPI_MODE_WRONLY, MPI_INFO_NULL, &fh);
  if (ret) {
    vout.crucial(m_vl, "Error at %s: MPI_File_open failed.\n", _function_name);
    exit(EXIT_FAILURE);
  }

  MPI_Offset pos = 0;

  if (Communicator::is_primary()) {
    // metadata
    char metadata[2048];
    sprintf(metadata, ildg_metadata_template,
            sizeof(double) * 8, /* bit width */
            CommonParameters::Lx(), CommonParameters::Ly(),
            CommonParameters::Lz(), CommonParameters::Lt());

    pos += write_lime_record(fh, "ildg-format", metadata, strlen(metadata), MB_MASK);

    // content: write header
    pos += write_lime_header(fh, "ildg-binary-data", data_length, ME_MASK);
  }

  Communicator::Base::broadcast(sizeof(off_t), &pos, 0);

  // content: write data

  ret = MPI_File_set_view(fh, pos, m_type_vector, m_type_tiled, const_cast<char *>("native"), MPI_INFO_NULL);
  if (ret) {
    vout.crucial(m_vl, "Error at %s: MPI_File_set_view failed.\n", _function_name);
    exit(EXIT_FAILURE);
  }

  ret = MPI_File_write_all(fh, (void *)buf, m_nvol * nex_file, m_type_vector, MPI_STATUS_IGNORE);
  if (ret) {
    vout.crucial(m_vl, "Error at %s: MPI_File_write_all failed.\n", _function_name);
    exit(EXIT_FAILURE);
  }

  // content: padding if needed
  if (data_length % 8 > 0) {
    size_t padding_size = (8 - data_length % 8) % 8;

    const char blank[8] = "";
    ret = MPI_File_write_at(fh, pos + data_length, const_cast<char *>(blank), padding_size, MPI_BYTE, MPI_STATUS_IGNORE);
    if (ret) {
      vout.crucial(m_vl, "Error at %s: write padding failed.\n", _function_name);
      exit(EXIT_FAILURE);
    }

    vout.general(m_vl, "%s: padding %lu bytes added.\n", _function_name, padding_size);
  }

  ret = MPI_File_close(&fh);
  if (ret) {
    vout.crucial(m_vl, "Error at %s: MPI_File_close failed.\n", _function_name);
    exit(EXIT_FAILURE);
  }

  delete [] buf;

  finalize();
}


//====================================================================
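// initialize() builds the two MPI derived datatypes used by the collective
// read/write above; it is idempotent and is called from both read_file()
// and write_file().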
int FieldIO_LIME_Parallel::initialize()
{
  static const char _function_name[] = "FieldIO_LIME_Parallel::initialize";

  // store verbose level to private parameter.
  vl = m_vl;

  if (m_is_initialized) return EXIT_SUCCESS;

  const int nin_file = m_format->nin();
  const int nex_file = m_format->nex();

  if ((nin_file == 0) || (nex_file == 0)) {
    vout.crucial(m_vl, "%s: incompatible file format.\n", _function_name);
    exit(EXIT_FAILURE);
  }


  const int ndim = CommonParameters::Ndim();

  int *global_dims = new int[ndim];
  global_dims[0] = CommonParameters::Lx();
  global_dims[1] = CommonParameters::Ly();
  global_dims[2] = CommonParameters::Lz();
  global_dims[3] = CommonParameters::Lt();

  int *local_dims = new int[ndim];
  local_dims[0] = CommonParameters::Nx();
  local_dims[1] = CommonParameters::Ny();
  local_dims[2] = CommonParameters::Nz();
  local_dims[3] = CommonParameters::Nt();

  m_nvol = 1;
  for (int i = 0; i < ndim; ++i) {
    m_nvol *= local_dims[i];
  }

  int *grid_pos = new int[ndim];
  for (int i = 0; i < ndim; ++i) {
    grid_pos[i] = Communicator::ipe(i);
  }

  int *starts = new int[ndim];
  for (int i = 0; i < ndim; ++i) {
    starts[i] = local_dims[i] * grid_pos[i];
  }

  int ret = 0;

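  // Two derived datatypes describe the file layout: m_type_vector packs the
  // nin_file doubles of one site into a single element, and m_type_tiled
  // selects this rank's local block out of the global lattice of such
  // elements (MPI_ORDER_FORTRAN, so x runs fastest).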
// MPI_Datatype m_type_vector;
// ret = MPI_Type_contiguous(sizeof(Field::element_type)*nin_file, MPI_BYTE, &m_type_vector);
  ret = MPI_Type_contiguous(sizeof(double) * nin_file, MPI_BYTE, &m_type_vector);
  if (ret) {
    vout.general(m_vl, "%s: MPI_Type_contiguous failed.\n", _function_name);
    exit(EXIT_FAILURE);
  }

  ret = MPI_Type_commit(&m_type_vector);
  if (ret) {
    vout.general(m_vl, "%s: MPI_Type_commit failed.\n", _function_name);
    exit(EXIT_FAILURE);
  }

// MPI_Datatype m_type_tiled;
  ret = MPI_Type_create_subarray(ndim, global_dims, local_dims, starts, MPI_ORDER_FORTRAN, m_type_vector, &m_type_tiled);
  if (ret) {
    vout.general(m_vl, "%s: MPI_Type_create_subarray failed.\n", _function_name);
    exit(EXIT_FAILURE);
  }

  ret = MPI_Type_commit(&m_type_tiled);
  if (ret) {
    vout.general(m_vl, "%s: MPI_Type_commit failed.\n", _function_name);
    exit(EXIT_FAILURE);
  }

  m_is_initialized = true;

  delete [] starts;
  delete [] grid_pos;
  delete [] local_dims;
  delete [] global_dims;

  vout.detailed(m_vl, "FieldIO_LIME_Parallel via MPI I/O initialize done.\n");

  return EXIT_SUCCESS;
}


//====================================================================
int FieldIO_LIME_Parallel::finalize()
{
  static const char _function_name[] = "FieldIO_LIME_Parallel::finalize";

  if (!m_is_initialized) return EXIT_SUCCESS;

  int ret;

  ret = MPI_Type_free(&m_type_tiled);
  if (ret) {
    vout.general(m_vl, "%s: MPI_Type_free failed.\n", _function_name);
    exit(EXIT_FAILURE);
  }

  ret = MPI_Type_free(&m_type_vector);
  if (ret) {
    vout.general(m_vl, "%s: MPI_Type_free failed.\n", _function_name);
    exit(EXIT_FAILURE);
  }

  m_is_initialized = false;

  vout.detailed(m_vl, "%s via MPI I/O finalize done.\n", class_name.c_str());

  return EXIT_SUCCESS;
}


//====================================================================
namespace {
//--------------------------------------------------------------------
  int read_lime_header(MPI_File& fh, LIME_header& header)
  {
    MPI_Status status;

    int ret = MPI_File_read(fh, (void *)&header, sizeof(LIME_header), MPI_BYTE, &status);

    // check the I/O status before inspecting the transfer count.
    if (ret) {
      vout.crucial(vl, "%s: io error.\n", __func__);
      return -1;
    }

    int count;
    MPI_Get_count(&status, MPI_BYTE, &count);
    if (count != sizeof(LIME_header)) { // data length short: end of file.
      return 1;
    }

    if (FieldIO::is_bigendian() == false) {
      FieldIO::convert_endian(&header.magic, 4, 1);
      FieldIO::convert_endian(&header.version, 2, 1);
      FieldIO::convert_endian(&header.bitfield, 2, 1);
      FieldIO::convert_endian(&header.length, 8, 1);
    }

    if (header.magic != LIME_MAGIC) {
      vout.crucial(vl, "not a lime header.\n");
      return -1;
    }

    return 0;
  }


//====================================================================
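// Walk the file record by record: read each header, skip the 8-byte-padded
// payload, and group the records into messages via the MB/ME flag bits.
// E.g. a record with length 601 is followed by 7 padding bytes, so the next
// header starts on an 8-byte boundary.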
  int traverse(MPI_File& fh, LIME_file_info& file_info)
  {
    // go to the beginning of the file
    MPI_File_seek(fh, 0, MPI_SEEK_SET);

    MPI_Offset pos = 0;

    LIME_message_info message_info;

    while (true)
    {
      LIME_header header;
      int         stat = read_lime_header(fh, header);

//    if (stat == -1) {        // bad input
//      return -1;
//    } else if (stat == 1) {  // end of file
//      break;
//    } else if (stat != 0) {
//      // unknown status.
//      return -1;
//    }

//    vout.detailed(vl, "read_lime_header: stat = %d, pos = %lu\n", stat, pos);

      if (stat != 0) {
        break;
      }

      // read ok
      pos += sizeof(LIME_header);

      LIME_record_info record_info;

      // both structs are 144 bytes: header.length and header.type land in
      // record_info.length and record_info.type; offset is fixed up next.
      memcpy((void *)&record_info, (void *)&header, sizeof(LIME_record_info));
      record_info.offset = pos;

      // padding (0-7)
      size_t padding_size = (8 - header.length % 8) % 8;

      // seek to next record
      // MPI_File_seek(fh, header.length + padding_size, MPI_SEEK_CUR);
      // pos += header.length + padding_size;

      // *** workaround for openmpi-2.x
      pos += header.length + padding_size;
      MPI_File_seek(fh, pos, MPI_SEEK_SET);

      // store record info
      if ((header.bitfield & MB_MASK) == MB_MASK) {
        message_info.clear();
      }

      message_info.push_back(record_info);

      if ((header.bitfield & ME_MASK) == ME_MASK) {
        file_info.push_back(message_info);
//      message_info.clear();
      }
    }

    return 0;
  }


//====================================================================
  int find_record_offset(const LIME_file_info& file_info, const char *type, MPI_Offset& pos)
  {
    bool is_found = false;

    for (LIME_file_info::const_iterator p = file_info.begin(); p != file_info.end(); ++p) {
      for (LIME_message_info::const_iterator q = p->begin(); q != p->end(); ++q) {
        if (strncmp(q->type, type, strlen(type)) == 0) {
          is_found = true;
          pos      = q->offset;
          break;
        }
      }
    }

    return is_found ? 1 : 0;
  }


//====================================================================
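// Fetch the payload of the first record whose type matches (e.g. the
// "ildg-format" metadata) into a string; returns 1 if found, 0 otherwise.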
  int read_record_content(MPI_File& fh, const LIME_file_info& file_info, const char *type, std::string& content)
  {
    bool             is_found = false;
    LIME_record_info info;

    for (LIME_file_info::const_iterator p = file_info.begin(); p != file_info.end(); ++p) {
      for (LIME_message_info::const_iterator q = p->begin(); q != p->end(); ++q) {
        if (strncmp(q->type, type, strlen(type)) == 0) {
          is_found = true;
          info     = *q;
          break;
        }
      }
    }

    if (!is_found) {
      return 0;
    }

    MPI_Status status;
    char       *buf = new char [info.length + 1];
    MPI_File_read_at(fh, info.offset, buf, info.length, MPI_BYTE, &status);

    int count;
    MPI_Get_count(&status, MPI_BYTE, &count);

    if (count != info.length) {
      vout.crucial(vl, "Error at %s: read error. content length mismatch.\n", __func__);
      exit(EXIT_FAILURE);
    }

    buf[info.length] = '\0';  // payload is not null-terminated on disk.
    content = std::string(buf);

    delete [] buf;

    return 1;
  }


//====================================================================
  int report_file_info(const LIME_file_info& file_info)
  {
    Bridge::VerboseLevel vlo = vl;

    vl = Bridge::DETAILED;  // raise verbosity while reporting; restored below.

    for (LIME_file_info::const_iterator p = file_info.begin(); p != file_info.end(); ++p) {
      vout.detailed(vl, "Message:\n");

      for (LIME_message_info::const_iterator q = p->begin(); q != p->end(); ++q) {
        vout.detailed(vl, "\tRecord:\n");
        vout.detailed(vl, "\t\toffset = %lu\n", q->offset);
        vout.detailed(vl, "\t\tsize   = %lu\n", q->length);
        vout.detailed(vl, "\t\ttype   = %s\n", q->type);
      }
    }

    vl = vlo;

    return 0;
  }


//====================================================================
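// Fill a LIME_header, convert it to the on-disk big-endian layout, and
// write it at the current file pointer; returns the number of bytes
// written (sizeof(LIME_header)) or 0 on failure.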
  size_t write_lime_header(MPI_File& fh, const char *type, const size_t length, const uint16_t flag)
  {
    LIME_header header;

    memset(&header, 0, sizeof(LIME_header));

    header.magic    = LIME_MAGIC;
    header.version  = (uint16_t)1;
    header.bitfield = flag;
    strncpy(header.type, type, 128);
    header.length = length;

    if (FieldIO::is_bigendian() == false) {
      FieldIO::convert_endian(&header.magic, 4, 1);
      FieldIO::convert_endian(&header.version, 2, 1);
      FieldIO::convert_endian(&header.bitfield, 2, 1);
      FieldIO::convert_endian(&header.length, 8, 1);
    }

    MPI_Status status;
    int        ret = MPI_File_write(fh, (void *)&header, sizeof(LIME_header), MPI_BYTE, &status);

    if (ret) {
      vout.crucial(vl, "%s: write header failed.\n", __func__);
      return 0;
    }

    return sizeof(LIME_header);  // length written.
  }


//====================================================================
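// Write a complete record: header, content, and zero padding up to the
// next 8-byte boundary; returns the total number of bytes written or 0
// on failure.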
  size_t write_lime_record(MPI_File& fh, const char *type, const char *content, const size_t length, const uint16_t flag)
  {
    const char blank[8] = "";

    if (write_lime_header(fh, type, length, flag) == 0) {
      return 0;
    }

    const size_t padding_size = (8 - length % 8) % 8;

    MPI_Status status;
    int        ret = MPI_File_write(fh, const_cast<char *>(content), length, MPI_BYTE, &status);
    if (ret) {
      vout.crucial(vl, "%s: write content failed.\n", __func__);
      return 0;
    }

    if (padding_size > 0) {
      ret = MPI_File_write(fh, const_cast<char *>(blank), padding_size, MPI_BYTE, &status);
      if (ret) {
        vout.crucial(vl, "%s: write padding failed.\n", __func__);
        return 0;
      }
    }

    return sizeof(LIME_header) + length + padding_size;
  }


//--------------------------------------------------------------------
} // unnamed namespace
#endif /* USE_MPI */

//====================================================================
//============================================================END=====