PetaVision  Alpha
CheckpointableFileStream.cpp
1 #include "CheckpointableFileStream.hpp"
2 
3 namespace PV {
4 
6  string const &path,
7  bool newFile,
8  Checkpointer *checkpointer,
9  string const &objName,
10  bool verifyWrites) {
11  initialize(path, newFile, checkpointer, objName, verifyWrites);
12 }
13 
15  string const &path,
16  bool newFile,
17  Checkpointer *checkpointer,
18  string const &objName) {
19  initialize(path, newFile, checkpointer, objName, checkpointer->doesVerifyWrites());
20 }
21 
22 void CheckpointableFileStream::initialize(
23  string const &path,
24  bool newFile,
25  Checkpointer *checkpointer,
26  string const &objName,
27  bool verifyWrites) {
28  FatalIf(
29  checkpointer->getMPIBlock()->getRank() != 0,
30  "CheckpointableFileStream (path \"%s\") called by non-root process.\n",
31  path.c_str());
32  string fullPath = checkpointer->makeOutputPathFilename(path);
33 
34  bool createFile = newFile;
35  if (!newFile) {
36  // Test if file exists. If not, issue a warning and set createFile to true.
37  struct stat statbuf;
38  if (PV_stat(fullPath.c_str(), &statbuf) != 0) {
39  if (errno == ENOENT) {
40  WarnLog().printf(
41  "%s: file \"%s\" does not exist. Creating new file.\n",
42  getDescription_c(),
43  fullPath.c_str());
44  createFile = true;
45  }
46  else {
47  Fatal().printf(
48  "%s: error checking whether file \"%s\" exists: %s \n",
49  getDescription_c(),
50  fullPath.c_str(),
51  strerror(errno));
52  }
53  }
54  }
55  if (createFile) {
56  char fullPathCopy[fullPath.size() + 1];
57  std::memcpy(fullPathCopy, fullPath.c_str(), fullPath.size());
58  fullPathCopy[fullPath.size()] = '\0';
59  char *dirName = dirname(fullPathCopy);
60  ensureDirExists(checkpointer->getMPIBlock(), dirName);
61  FileStream fileStream(fullPath.c_str(), std::ios_base::out, verifyWrites);
62  }
63 
64  mObjName = objName;
65  setDescription();
66  setOutStream(mFStream);
67  openFile(fullPath.c_str(), std::ios_base::in | std::ios_base::out, verifyWrites);
68  updateFilePos();
69  registerData(checkpointer);
70 }
71 
72 void CheckpointableFileStream::setDescription() {
73  description = "CheckpointableFileStream \"";
74  description.append(mObjName).append("\"");
75 }
76 
77 Response::Status CheckpointableFileStream::respond(std::shared_ptr<BaseMessage const> message) {
78  auto status = Response::NO_ACTION;
79  if (message == nullptr) {
80  return status;
81  }
82  else if (
83  ProcessCheckpointReadMessage const *castMessage =
84  dynamic_cast<ProcessCheckpointReadMessage const *>(message.get())) {
85  status = respondProcessCheckpointRead(castMessage);
86  }
87  return status;
88 }
89 
90 Response::Status CheckpointableFileStream::respondProcessCheckpointRead(
91  ProcessCheckpointReadMessage const *message) {
92  syncFilePos();
93  return Response::SUCCESS;
94 }
95 
96 Response::Status CheckpointableFileStream::registerData(Checkpointer *checkpointer) {
97  auto status = CheckpointerDataInterface::registerData(checkpointer);
98  if (!Response::completed(status)) {
99  return status;
100  }
101  checkpointer->registerCheckpointData<long>(
102  mObjName, string("FileStreamRead"), &mFileReadPos, (std::size_t)1, false, false);
103  checkpointer->registerCheckpointData<long>(
104  mObjName, string("FileStreamWrite"), &mFileWritePos, (std::size_t)1, false, false);
105  return Response::SUCCESS;
106 }
107 
108 // When restoring from checkpoint, the variables mFileReadPos and mFileWritePos
109 // will be modified. This method checks to see if this has happened, and seeks
110 // to the correct location in the files.
111 void CheckpointableFileStream::syncFilePos() {
112  if (mFileReadPos != getInPos()) {
113  setInPos(mFileReadPos, true);
114  }
115  if (mFileWritePos != getOutPos()) {
116  setOutPos(mFileWritePos, true);
117  }
118 }
119 
120 void CheckpointableFileStream::updateFilePos() {
121  mFileReadPos = getInPos();
122  mFileWritePos = getOutPos();
123 }
124 
125 void CheckpointableFileStream::write(void const *data, long length) {
126  syncFilePos();
127  FileStream::write(data, length);
128  updateFilePos();
129 }
130 
131 void CheckpointableFileStream::read(void *data, long length) {
132  syncFilePos();
133  FileStream::read(data, length);
134  updateFilePos();
135 }
136 
137 void CheckpointableFileStream::setOutPos(long pos, bool fromBeginning) {
138  if (!fromBeginning) {
139  syncFilePos();
140  }
141  FileStream::setOutPos(pos, fromBeginning);
142  updateFilePos();
143 }
144 
145 void CheckpointableFileStream::setInPos(long pos, bool fromBeginning) {
146  if (!fromBeginning) {
147  syncFilePos();
148  }
149  FileStream::setInPos(pos, fromBeginning);
150  updateFilePos();
151 }
152 } // end namespace PV
CheckpointableFileStream(string const &path, bool newFile, Checkpointer *checkpointer, string const &objName, bool verifyWrites)
static bool completed(Status &a)
Definition: Response.hpp:49
int getRank() const
Definition: MPIBlock.hpp:100
std::string makeOutputPathFilename(std::string const &path)