PetaVision  Alpha
fileio.cpp
1 /*
2  * fileio.cpp
3  *
4  * Created on: Oct 21, 2009
5  * Author: Craig Rasmussen
6  */
7 
8 #include "fileio.hpp"
9 #include "connections/weight_conversions.hpp"
10 #include "structures/Buffer.hpp"
11 #include "utils/BufferUtilsMPI.hpp"
12 #include "utils/PVLog.hpp"
13 #include "utils/conversions.h"
14 
15 #include <assert.h>
16 #include <iostream>
17 
18 #undef DEBUG_OUTPUT
19 
20 namespace PV {
21 
22 // Unused function timeToParams was removed Mar 10, 2017.
23 // Unused function timeFromParams was removed Mar 15, 2017.
24 // Unused function pv_sizeof was removed Mar 15, 2017.
25 // Unused function pv_sizeof_patch was removed Mar 15, 2017.
26 
27 PV_Stream *PV_fopen(const char *path, const char *mode, bool verifyWrites) {
28  if (mode == NULL) {
29  ErrorLog().printf("PV_fopen: mode argument must be a string (path was \"%s\").\n", path);
30  errno = EINVAL;
31  return NULL;
32  }
33  char *realPath = strdup(expandLeadingTilde(path).c_str());
34  long filepos = 0L;
35  long filelength = 0L;
36  if (mode[0] == 'r' || mode[0] == 'a') {
37  struct stat statbuf;
38  int statstatus = stat(realPath, &statbuf);
39  if (statstatus == 0) {
40  filelength = (long)statbuf.st_size;
41  if (mode[0] == 'a') {
42  filepos = filelength;
43  }
44  }
45  else if (errno != ENOENT) {
46  Fatal().printf(
47  "PV_fopen: unable to stat \"%s\" with mode \"%s\": %s\n",
48  realPath,
49  mode,
50  strerror(errno));
51  }
52  }
53  int fopencounts = 0;
54  PV_Stream *streampointer = NULL;
55  FILE *fp = NULL;
56  while (fp == NULL) {
57  errno = 0;
58  fp = fopen(realPath, mode);
59  if (fp != NULL)
60  break;
61  fopencounts++;
62  WarnLog().printf(
63  "fopen failure for \"%s\" on attempt %d: %s\n", realPath, fopencounts, strerror(errno));
64  if (fopencounts < MAX_FILESYSTEMCALL_TRIES) {
65  sleep(1);
66  }
67  else {
68  break;
69  }
70  }
71  if (fp == NULL) {
72  ErrorLog().printf(
73  "PV_fopen: exceeded MAX_FILESYSTEMCALL_TRIES = %d attempting to open \"%s\"\n",
74  MAX_FILESYSTEMCALL_TRIES,
75  realPath);
76  }
77  else {
78  if (fopencounts > 0) {
79  WarnLog().printf("fopen succeeded for \"%s\" on attempt %d\n", realPath, fopencounts + 1);
80  }
81  streampointer = (PV_Stream *)calloc(1, sizeof(PV_Stream));
82  if (streampointer != NULL) {
83  streampointer->name = strdup(realPath);
84  streampointer->mode = strdup(mode);
85  streampointer->fp = fp;
86  streampointer->filepos = filepos;
87  streampointer->filelength = filelength;
88  streampointer->isfile = 1;
89  streampointer->verifyWrites = verifyWrites;
90  }
91  else {
92  ErrorLog().printf("PV_fopen failure for \"%s\": %s\n", realPath, strerror(errno));
93  fclose(fp);
94  }
95  }
96  free(realPath);
97  return streampointer;
98 }
99 
100 int PV_stat(const char *path, struct stat *buf) {
101  // Call stat library function, trying up to MAX_FILESYSTEMCALL_TRIES times if an error is
102  // returned.
103  // If an error results on all MAX_FILESYSTEMCALL_TRIES times, returns -1 (the error return value)
104  // for stat()
105  // and errno is the error of the last attempt.
106  char *realPath = strdup(expandLeadingTilde(path).c_str());
107  int attempt = 0;
108  int retval = -1;
109  while (retval != 0) {
110  errno = 0;
111  retval = stat(realPath, buf);
112  if (retval == 0)
113  break;
114  attempt++;
115  WarnLog().printf(
116  "stat() failure for \"%s\" on attempt %d: %s\n", path, attempt, strerror(errno));
117  if (attempt < MAX_FILESYSTEMCALL_TRIES) {
118  sleep(1);
119  }
120  else {
121  break;
122  }
123  }
124  if (retval != 0) {
125  ErrorLog().printf(
126  "PV_stat exceeded MAX_FILESYSTEMCALL_TRIES = %d for \"%s\"\n",
127  MAX_FILESYSTEMCALL_TRIES,
128  path);
129  }
130  free(realPath);
131  return retval;
132 }
133 
134 long int PV_ftell_primitive(PV_Stream *pvstream) {
135  // Calls ftell() and returns value ftell returns, but doesn't compare or change stream's fpos
136  int ftellcounts = 0;
137  long filepos = -1;
138  while (filepos < 0) {
139  errno = 0;
140  filepos = ftell(pvstream->fp);
141  if (filepos >= 0)
142  break;
143  ftellcounts++;
144  WarnLog().printf(
145  "ftell failure for \"%s\" on attempt %d: %s\n",
146  pvstream->name,
147  ftellcounts,
148  strerror(errno));
149  if (ftellcounts < MAX_FILESYSTEMCALL_TRIES) {
150  sleep(1);
151  }
152  else {
153  break;
154  }
155  }
156  if (filepos < 0) {
157  ErrorLog().printf(
158  "PV_ftell failure for \"%s\": MAX_FILESYSTEMCALL_TRIES = %d exceeded\n",
159  pvstream->name,
160  MAX_FILESYSTEMCALL_TRIES);
161  }
162  else if (ftellcounts > 0) {
163  WarnLog().printf(
164  "PV_ftell succeeded for \"%s\" on attempt %d", pvstream->name, ftellcounts + 1);
165  }
166  return filepos;
167 }
168 
169 long int getPV_StreamFilepos(PV_Stream *pvstream) { return pvstream->filepos; }
170 
171 // Use getPV_StreamFilepos instead of PV_ftell whenever possible, since NMC cluster's ftell is
172 // currently unreliable
173 long int PV_ftell(PV_Stream *pvstream) {
174  long int filepos = PV_ftell_primitive(pvstream);
175  if (pvstream->filepos != filepos) {
176  WarnLog().printf(
177  "ftell for \"%s\" returned %ld instead of the expected %ld\n",
178  pvstream->name,
179  filepos,
180  pvstream->filepos);
181  }
182  return filepos;
183 }
184 
185 int PV_fseek(PV_Stream *pvstream, long offset, int whence) {
186  int fseekcounts = 0;
187  int fseekstatus = -1;
188  while (fseekstatus != 0) {
189  errno = 0;
190  fseekstatus = fseek(pvstream->fp, offset, whence);
191  if (fseekstatus == 0)
192  break;
193  fseekcounts++;
194  WarnLog().printf(
195  "fseek failure for \"%s\" on attempt %d: %s\n",
196  pvstream->name,
197  fseekcounts,
198  strerror(errno));
199  if (fseekcounts < MAX_FILESYSTEMCALL_TRIES) {
200  sleep(1);
201  }
202  else {
203  break;
204  }
205  }
206  if (fseekstatus != 0) {
207  ErrorLog().printf(
208  "PV_fseek failure for \"%s\": MAX_FILESYSTEMCALL_TRIES = %d exceeded\n",
209  pvstream->name,
210  MAX_FILESYSTEMCALL_TRIES);
211  }
212  else if (fseekcounts > 0) {
213  WarnLog().printf(
214  "PV_fseek succeeded for \"%s\" on attempt %d\n", pvstream->name, fseekcounts + 1);
215  }
216  if (pvstream->mode[0] != 'a') {
217  switch (whence) {
218  case SEEK_SET: pvstream->filepos = offset; break;
219  case SEEK_CUR: pvstream->filepos += offset; break;
220  case SEEK_END: pvstream->filepos = pvstream->filelength + offset; break;
221  default: assert(0); break;
222  }
223  }
224  return fseekstatus;
225 }
226 
257 size_t
258 PV_fwrite(const void *RESTRICT ptr, size_t size, size_t nitems, PV_Stream *RESTRICT pvstream) {
259  assert(ferror(pvstream->fp) == 0);
260  int fwritecounts = 0;
261  size_t writesize = nitems * size;
262  size_t charswritten = (size_t)0;
263  const char *RESTRICT curptr = (const char *RESTRICT)ptr;
264  long int fpos = pvstream->filepos;
265  if (fpos < 0) {
266  Fatal().printf(
267  "PV_fwrite error: unable to determine file position of \"%s\". Fatal error\n",
268  pvstream->name);
269  }
270  long int ftellresult = ftell(pvstream->fp);
271  if (pvstream->isfile && fpos != ftellresult) {
272  Fatal().printf(
273  "PV_fwrite error for \"%s\": fpos = %ld but ftell() returned %ld\n",
274  pvstream->name,
275  fpos,
276  ftellresult);
277  exit(EXIT_FAILURE);
278  }
279  bool hasfailed = false;
280  for (int fwritecounts = 1; fwritecounts <= MAX_FILESYSTEMCALL_TRIES; fwritecounts++) {
281  charswritten = fwrite(ptr, 1UL, writesize, pvstream->fp);
282  if (charswritten == writesize) {
283  if (hasfailed) {
284  clearerr(pvstream->fp);
285  WarnLog().printf(
286  "fwrite succeeded for \"%s\" on attempt %d.\n", pvstream->name, fwritecounts);
287  }
288  break;
289  }
290  else {
291  hasfailed = true;
292  WarnLog(fwriteFailure);
293  fwriteFailure.printf(
294  "fwrite failure for \"%s\" on attempt %d. Return value %zu instead of %zu. ",
295  pvstream->name,
296  fwritecounts,
297  charswritten,
298  writesize);
299  if (ferror(pvstream->fp)) {
300  fwriteFailure.printf(" Error: %s\n", strerror(errno));
301  }
302  if (fwritecounts < MAX_FILESYSTEMCALL_TRIES) {
303  fwriteFailure.printf("Retrying.\n");
304  sleep(1);
305  int fseekstatus = fseek(pvstream->fp, fpos, SEEK_SET);
306  if (fseekstatus != 0) {
307  Fatal().printf(
308  "PV_fwrite error: Unable to reset file position of \"%s\". Fatal error: %s\n",
309  pvstream->name,
310  strerror(errno));
311  }
312  long int ftellreturn = ftell(pvstream->fp);
313  if (fpos != ftellreturn) {
314  Fatal().printf(
315  "PV_fwrite error: attempted to reset file position of \"%s\" to %ld, but "
316  "ftell() returned %ld. Fatal error.\n",
317  pvstream->name,
318  fpos,
319  ftellreturn);
320  }
321  }
322  else {
323  ErrorLog().printf("MAX_FILESYSTEMCALL_TRIES exceeded.\n");
324  return (size_t)0;
325  }
326  }
327  }
328  if (pvstream->verifyWrites && pvstream->isfile) {
329  fflush(pvstream->fp);
330  int status = PV_SUCCESS;
331  PV_Stream *readStream = PV_fopen(pvstream->name, "r", false /*verifyWrites*/);
332  if (readStream == NULL) {
333  ErrorLog().printf(
334  "PV_fwrite verification: unable to open \"%s\" for reading: %s\n",
335  pvstream->name,
336  strerror(errno));
337  status = PV_FAILURE;
338  }
339  if (status == PV_SUCCESS) {
340  if (fseek(readStream->fp, pvstream->filepos, SEEK_SET) != 0) {
341  ErrorLog().printf(
342  "PV_fwrite verification: unable to verify \"%s\" write of %zu chars from "
343  "position %ld: %s\n",
344  pvstream->name,
345  writesize,
346  pvstream->filepos,
347  strerror(errno));
348  status = PV_FAILURE;
349  }
350  }
351  char *read_buffer = NULL;
352  if (status == PV_SUCCESS) {
353  read_buffer = (char *)malloc(writesize);
354  if (read_buffer == NULL) {
355  ErrorLog().printf(
356  "PV_fwrite verification: unable to create readback buffer of size %zu to verify "
357  "\"%s\"\n",
358  writesize,
359  pvstream->name);
360  status = PV_FAILURE;
361  }
362  }
363  if (status == PV_SUCCESS) {
364  for (size_t n = 0; n < writesize; n++) {
365  read_buffer[n] = ~((char *)ptr)[n];
366  } // Make sure read_buffer is different from ptr before reading
367  }
368  if (status == PV_SUCCESS) {
369  size_t numread = fread(read_buffer, (size_t)1, writesize, readStream->fp);
370  if (numread != writesize) {
371  ErrorLog().printf(
372  "PV_fwrite verification: unable to read into readback buffer for \"%s\": fread "
373  "returned %zu instead of %zu\n",
374  pvstream->name,
375  numread,
376  writesize);
377  status = PV_FAILURE;
378  }
379  }
380  if (status == PV_SUCCESS) {
381  status = memcmp(ptr, read_buffer, writesize) == 0 ? PV_SUCCESS : PV_FAILURE;
382  if (status != PV_SUCCESS) {
383  size_t badcount = 0;
384  for (size_t n = 0; n < writesize; n++) {
385  badcount += (((char *)ptr)[n] != read_buffer[n]);
386  }
387  ErrorLog().printf(
388  "PV_fwrite verification: readback of %zu bytes from \"%s\" starting at position "
389  "%zu failed: %zu bytes disagree.\n",
390  writesize,
391  pvstream->name,
392  pvstream->filepos,
393  badcount);
394  }
395  }
396  free(read_buffer);
397  if (readStream) {
398  PV_fclose(readStream);
399  readStream = NULL;
400  }
401  if (status != PV_SUCCESS) {
402  fseek(pvstream->fp, pvstream->filepos, SEEK_SET);
403  return 0;
404  }
405  }
406  pvstream->filepos += writesize;
407  return nitems;
408 }
409 
410 size_t PV_fread(void *RESTRICT ptr, size_t size, size_t nitems, PV_Stream *RESTRICT pvstream) {
411  int freadcounts = 0;
412  size_t readsize = nitems * size;
413  size_t stilltoread = readsize;
414  char *RESTRICT curptr = (char *RESTRICT)ptr;
415  long int fpos = pvstream->filepos;
416  clearerr(pvstream->fp);
417  if (fpos < 0) {
418  Fatal().printf(
419  "PV_fread error: unable to determine file position of \"%s\". Fatal error\n",
420  pvstream->name);
421  }
422  while (stilltoread != 0UL) {
423  size_t charsread_thispass = fread(curptr, 1UL, stilltoread, pvstream->fp);
424  stilltoread -= charsread_thispass;
425  pvstream->filepos += charsread_thispass;
426  if (stilltoread == 0UL) {
427  if (freadcounts > 0) {
428  WarnLog().printf(
429  "fread succeeded for \"%s\" on attempt %d.\n", pvstream->name, freadcounts + 1);
430  }
431  break;
432  }
433  else {
434  if (feof(pvstream->fp)) {
435  WarnLog().printf(
436  "fread failure for \"%s\": end of file reached with %lu characters still "
437  "unread.\n",
438  pvstream->name,
439  stilltoread);
440  break;
441  }
442  }
443  curptr += charsread_thispass;
444  freadcounts++;
445  if (freadcounts < MAX_FILESYSTEMCALL_TRIES) {
446  WarnLog().printf(
447  "fread failure for \"%s\" on attempt %d. %lu bytes read; %lu bytes still to read "
448  "so far.\n",
449  pvstream->name,
450  freadcounts,
451  charsread_thispass,
452  stilltoread);
453  sleep(1);
454  }
455  else {
456  ErrorLog().printf(
457  "PV_fread failure for \"%s\": MAX_FILESYSTEMCALL_TRIES = %d exceeded, and %lu bytes "
458  "of %lu read.\n",
459  pvstream->name,
460  MAX_FILESYSTEMCALL_TRIES,
461  readsize - stilltoread,
462  readsize);
463  break;
464  }
465  }
466  return (readsize - stilltoread) / size;
467 }
468 
469 int PV_fclose(PV_Stream *pvstream) {
470  int status = PV_SUCCESS;
471  if (pvstream) {
472  if (pvstream->fp && pvstream->isfile) {
473  status = fclose(pvstream->fp);
474  pvstream->fp = NULL;
475  if (status != 0) {
476  ErrorLog().printf("fclose failure for \"%s\": %s", pvstream->name, strerror(errno));
477  }
478  }
479  free(pvstream->name);
480  free(pvstream->mode);
481  free(pvstream);
482  pvstream = NULL;
483  }
484  return status;
485 }
486 
487 int checkDirExists(MPIBlock const *mpiBlock, const char *dirname, struct stat *pathstat) {
488  // check if the given directory name exists for the rank zero process
489  // the return value is zero if a successful stat(2) call and the error
490  // if unsuccessful. pathstat contains the result of the buffer from the stat call.
491  // The rank zero process is the only one that calls stat();
492  // nonzero rank processes return PV_SUCCESS immediately.
493  pvAssert(pathstat);
494 
495  int rank = mpiBlock->getRank();
496  if (rank != 0) {
497  return 0;
498  }
499  int status;
500  int errorcode;
501  char *expandedDirName = strdup(expandLeadingTilde(dirname).c_str());
502  status = stat(dirname, pathstat);
503  free(expandedDirName);
504  return status ? errno : 0;
505 }
506 
507 static inline int makeDirectory(char const *dir) {
508  mode_t dirmode = S_IRWXU | S_IRWXG | S_IRWXO;
509  int status = 0;
510 
511  char *workingDir = strdup(dir);
512  FatalIf(workingDir == nullptr, "makeDirectory: unable to duplicate path \"%s\".", dir);
513 
514  int len = strlen(workingDir);
515  if (workingDir[len - 1] == '/')
516  workingDir[len - 1] = '\0';
517 
518  for (char *p = workingDir + 1; *p; p++)
519  if (*p == '/') {
520  *p = '\0';
521  status |= mkdir(workingDir, dirmode);
522  if (status != 0 && errno != EEXIST) {
523  return status;
524  }
525  *p = '/';
526  }
527  status |= mkdir(workingDir, dirmode);
528  if (errno == EEXIST) {
529  status = 0;
530  }
531  return status;
532 }
533 
534 void ensureDirExists(MPIBlock const *mpiBlock, char const *dirname) {
535  // If rank zero, see if path exists, and try to create it if it doesn't.
536  // If not rank zero, the routine does nothing.
537  int rank = mpiBlock->getRank();
538  struct stat pathstat;
539  int resultcode = checkDirExists(mpiBlock, dirname, &pathstat);
540 
541  if (resultcode == 0) { // mOutputPath exists; now check if it's a directory.
542  FatalIf(
543  rank == 0 && !(pathstat.st_mode & S_IFDIR),
544  "Path \"%s\" exists but is not a directory\n",
545  dirname);
546  }
547  else if (resultcode == ENOENT /* No such file or directory */) {
548  if (rank == 0) {
549  InfoLog().printf("Directory \"%s\" does not exist; attempting to create\n", dirname);
550 
551  // Try up to 5 times until it works
552  int const numAttempts = 5;
553  for (int attemptNum = 0; attemptNum < numAttempts; attemptNum++) {
554  int mkdirstatus = makeDirectory(dirname);
555  if (mkdirstatus != 0) {
556  if (attemptNum == numAttempts - 1) {
557  Fatal().printf(
558  "Directory \"%s\" could not be created: %s; Exiting\n",
559  dirname,
560  strerror(errno));
561  }
562  else {
563  getOutputStream().flush();
564  WarnLog().printf(
565  "Directory \"%s\" could not be created: %s; Retrying %d out of %d\n",
566  dirname,
567  strerror(errno),
568  attemptNum + 1,
569  numAttempts);
570  sleep(1);
571  }
572  }
573  else {
574  break;
575  }
576  }
577  }
578  }
579  else {
580  if (rank == 0) {
581  ErrorLog().printf(
582  "Error checking status of directory \"%s\": %s\n", dirname, strerror(resultcode));
583  }
584  exit(EXIT_FAILURE);
585  }
586 }
587 
588 // Unused function getNumGlobalPatches was removed Mar 15, 2017.
589 // Instead, use calcNumberOfPatches in utils/BufferUtilsPvp.*
590 
591 // Unused function pvp_open_read_file was removed Mar 23, 2017. Instead, construct a FileStream.
592 // Unused function pvp_open_write_file was removed Mar 10, 2017. Instead, construct a FileStream.
593 // Unused function pvp_close_file was removed Mar 23, 2017.
594 // Unused function pvp_check_file_header was removed Mar 15, 2017.
595 // Unused functions pvp_read_header and pvp_write_header were removed Mar 15, 2017.
596 // Unused function pvp_set_activity_params was removed Jan 26, 2017.
597 // Unused function pvp_set_weight_params was removed Jan 26, 2017.
598 // Unused function pvp_set_nonspiking_act_params was removed Feb 21, 2017.
599 // Unused function pvp_set_nonspiking_sparse_act_params was removed Feb 21, 2017.
600 // Unused function alloc_params was removed Feb 21, 2017.
601 
602 // writeActivity and writeActivitySparse removed Feb 17, 2017.
603 // Corresponding HyPerLayer methods now use BufferUtils routines
604 // gatherActivity and scatterActivity were also removed.
605 // Use BufferUtils::gather and BufferUtils::scatter instead.
606 
607 // readWeights was removed Mar 15, 2017. Use the WeightsFileIO class instead.
608 
609 int pv_text_write_patch(
610  PrintStream *outStream,
611  Patch const *patch,
612  float *data,
613  int nf,
614  int sx,
615  int sy,
616  int sf) {
617  int f, i, j;
618 
619  const int nx = (int)patch->nx;
620  const int ny = (int)patch->ny;
621 
622  assert(outStream != NULL);
623 
624  for (f = 0; f < nf; f++) {
625  for (j = 0; j < ny; j++) {
626  for (i = 0; i < nx; i++) {
627  outStream->printf("%7.5f ", (double)data[i * sx + j * sy + f * sf]);
628  }
629  outStream->printf("\n");
630  }
631  outStream->printf("\n");
632  }
633 
634  return 0;
635 }
636 
637 } // namespace PV