PetaVision  Alpha
Checkpointer.cpp
1 /*
2  * Checkpointer.cpp
3  *
4  * Created on Sep 28, 2016
5  * Author: Pete Schultz
6  */
7 
8 #include "Checkpointer.hpp"
9 
10 #include "checkpointing/CheckpointingMessages.hpp"
11 #include <cerrno>
12 #include <climits>
13 // #include <cmath>
14 // #include <cstring>
15 #include <fts.h>
16 #include <signal.h>
17 #include <sys/stat.h>
18 #include <sys/types.h>
19 // #include <unistd.h>
20 #define DEFAULT_OUTPUT_PATH "output"
21 
22 namespace PV {
23 
24 Checkpointer::Checkpointer(
25  std::string const &name,
26  MPIBlock const *globalMPIBlock,
27  Arguments const *arguments)
28  : mName(name) {
29  initMPIBlock(globalMPIBlock, arguments);
30  initBlockDirectoryName();
31 
32  mOutputPath = arguments->getStringArgument("OutputPath");
33  mWarmStart = arguments->getBooleanArgument("Restart");
34  mCheckpointReadDirectory = arguments->getStringArgument("CheckpointReadDirectory");
35  if (!mCheckpointReadDirectory.empty()) {
36  extractCheckpointReadDirectory();
37  }
38 
39  mTimeInfoCheckpointEntry = std::make_shared<CheckpointEntryData<Checkpointer::TimeInfo>>(
40  std::string("timeinfo"), mMPIBlock, &mTimeInfo, (size_t)1, true /*broadcast*/);
41  // This doesn't get put into mCheckpointRegistry because we handle the timeinfo separately.
42  mCheckpointTimer = new Timer(mName.c_str(), "column", "checkpoint");
43  registerTimer(mCheckpointTimer);
44 }
45 
46 Checkpointer::~Checkpointer() {
47  free(mCheckpointWriteDir);
48  free(mCheckpointWriteTriggerModeString);
49  free(mCheckpointWriteWallclockUnit);
50  free(mLastCheckpointDir);
51  free(mInitializeFromCheckpointDir);
52  delete mCheckpointTimer;
53  delete mMPIBlock;
54 }
55 
56 void Checkpointer::initMPIBlock(MPIBlock const *globalMPIBlock, Arguments const *arguments) {
57  pvAssert(mMPIBlock == nullptr);
58  int cellNumRows = arguments->getIntegerArgument("CheckpointCellNumRows");
59  int cellNumColumns = arguments->getIntegerArgument("CheckpointCellNumColumns");
60  int cellBatchDimension = arguments->getIntegerArgument("CheckpointCellBatchDimension");
61  // If using batching, mCheckpointReadDir might be a comma-separated list of directories
62  mMPIBlock = new MPIBlock(
63  globalMPIBlock->getComm(),
64  globalMPIBlock->getNumRows(),
65  globalMPIBlock->getNumColumns(),
66  globalMPIBlock->getBatchDimension(),
67  cellNumRows,
68  cellNumColumns,
69  cellBatchDimension);
70 }
71 
72 void Checkpointer::initBlockDirectoryName() {
73  mBlockDirectoryName.clear();
74  if (mMPIBlock->getGlobalNumRows() != mMPIBlock->getNumRows()
75  or mMPIBlock->getGlobalNumColumns() != mMPIBlock->getNumColumns()
76  or mMPIBlock->getGlobalBatchDimension() != mMPIBlock->getBatchDimension()) {
77  int const blockColumnIndex = mMPIBlock->getStartColumn() / mMPIBlock->getNumColumns();
78  int const blockRowIndex = mMPIBlock->getStartRow() / mMPIBlock->getNumRows();
79  int const blockBatchIndex = mMPIBlock->getStartBatch() / mMPIBlock->getBatchDimension();
80  mBlockDirectoryName.append("block_");
81  mBlockDirectoryName.append("col" + std::to_string(blockColumnIndex));
82  mBlockDirectoryName.append("row" + std::to_string(blockRowIndex));
83  mBlockDirectoryName.append("elem" + std::to_string(blockBatchIndex));
84  }
85 }
86 
87 std::string Checkpointer::makeOutputPathFilename(std::string const &path) {
88  FatalIf(path[0] == '/', "makeOutputPathFilename called with absolute path argument\n");
89  std::string fullPath(mOutputPath);
90  if (!mBlockDirectoryName.empty()) {
91  fullPath.append("/").append(mBlockDirectoryName);
92  }
93  fullPath.append("/").append(path);
94  return fullPath;
95 }
96 
97 void Checkpointer::ioParams(enum ParamsIOFlag ioFlag, PVParams *params) {
98  ioParamsFillGroup(ioFlag, params);
99 
100  // If WarmStart is set and CheckpointWrite is false, CheckpointReadDirectory
101  // is LastCheckpointDir. If WarmStart is set and CheckpointWrite is true, we
102  // CheckpointReadDirectory is the last checkpoint in CheckpointWriteDir.
103  // Hence we cannot set CheckpointReadDirectory until we've read the params.
104  if (mWarmStart and ioFlag == PARAMS_IO_READ) {
105  // Arguments class should prevent -r and -c from both being set.
106  pvAssert(mCheckpointReadDirectory.empty());
107  if (mCheckpointWriteFlag) {
108  // Set mCheckpointReadDirectory to the last checkpoint in the CheckpointWrite directory.
109  findWarmStartDirectory();
110  }
111  else {
112  mCheckpointReadDirectory = mLastCheckpointDir;
113  }
114  }
115 }
116 
117 void Checkpointer::ioParamsFillGroup(enum ParamsIOFlag ioFlag, PVParams *params) {
118  ioParam_outputPath(ioFlag, params);
119  ioParam_verifyWrites(ioFlag, params);
120  ioParam_checkpointWrite(ioFlag, params);
121  ioParam_checkpointWriteDir(ioFlag, params);
122  ioParam_checkpointWriteTriggerMode(ioFlag, params);
123  ioParam_checkpointWriteStepInterval(ioFlag, params);
124  ioParam_checkpointWriteTimeInterval(ioFlag, params);
125  ioParam_checkpointWriteClockInterval(ioFlag, params);
126  ioParam_checkpointWriteClockUnit(ioFlag, params);
127  ioParam_checkpointIndexWidth(ioFlag, params);
128  ioParam_suppressNonplasticCheckpoints(ioFlag, params);
129  ioParam_deleteOlderCheckpoints(ioFlag, params);
130  ioParam_numCheckpointsKept(ioFlag, params);
131  ioParam_lastCheckpointDir(ioFlag, params);
132  ioParam_initializeFromCheckpointDir(ioFlag, params);
133 }
134 
135 void Checkpointer::ioParam_verifyWrites(enum ParamsIOFlag ioFlag, PVParams *params) {
136  params->ioParamValue(ioFlag, mName.c_str(), "verifyWrites", &mVerifyWrites, mVerifyWrites);
137 }
138 
139 void Checkpointer::ioParam_outputPath(enum ParamsIOFlag ioFlag, PVParams *params) {
140  // If mOutputPath is set in the configuration, it overrides params file.
141  switch (ioFlag) {
142  case PARAMS_IO_READ:
143  if (mOutputPath.empty()) {
144  if (params->stringPresent(mName.c_str(), "outputPath")) {
145  mOutputPath = std::string(params->stringValue(mName.c_str(), "outputPath"));
146  }
147  else {
148  mOutputPath = std::string(DEFAULT_OUTPUT_PATH);
149  if (getMPIBlock()->getGlobalRank() == 0) {
150  WarnLog().printf(
151  "Output path specified neither in command line nor in params file.\n"
152  "Output path set to default \"%s\"\n",
153  DEFAULT_OUTPUT_PATH);
154  }
155  }
156  }
157  break;
158  case PARAMS_IO_WRITE: params->writeParamString("outputPath", mOutputPath.c_str()); break;
159  default: break;
160  }
161 }
162 
163 void Checkpointer::ioParam_checkpointWrite(enum ParamsIOFlag ioFlag, PVParams *params) {
164  params->ioParamValue(
165  ioFlag, mName.c_str(), "checkpointWrite", &mCheckpointWriteFlag, mCheckpointWriteFlag);
166 }
167 
168 void Checkpointer::ioParam_checkpointWriteDir(enum ParamsIOFlag ioFlag, PVParams *params) {
169  pvAssert(!params->presentAndNotBeenRead(mName.c_str(), "checkpointWrite"));
170  if (mCheckpointWriteFlag) {
171  params->ioParamStringRequired(
172  ioFlag, mName.c_str(), "checkpointWriteDir", &mCheckpointWriteDir);
173  if (ioFlag == PARAMS_IO_READ) {
174  ensureDirExists(mMPIBlock, mCheckpointWriteDir);
175  }
176  }
177 }
178 
179 void Checkpointer::ioParam_checkpointWriteTriggerMode(enum ParamsIOFlag ioFlag, PVParams *params) {
180  pvAssert(!params->presentAndNotBeenRead(mName.c_str(), "checkpointWrite"));
181  if (mCheckpointWriteFlag) {
182  params->ioParamString(
183  ioFlag,
184  mName.c_str(),
185  "checkpointWriteTriggerMode",
186  &mCheckpointWriteTriggerModeString,
187  "step");
188  if (ioFlag == PARAMS_IO_READ) {
189  pvAssert(mCheckpointWriteTriggerModeString);
190  if (!strcmp(mCheckpointWriteTriggerModeString, "step")
191  || !strcmp(mCheckpointWriteTriggerModeString, "Step")
192  || !strcmp(mCheckpointWriteTriggerModeString, "STEP")) {
193  mCheckpointWriteTriggerMode = STEP;
194  registerCheckpointData(
195  mName,
196  std::string("nextCheckpointStep"),
197  &mNextCheckpointStep,
198  (std::size_t)1,
199  true /*broadcast*/,
200  false /*not constant entire run*/);
201  }
202  else if (
203  !strcmp(mCheckpointWriteTriggerModeString, "time")
204  || !strcmp(mCheckpointWriteTriggerModeString, "Time")
205  || !strcmp(mCheckpointWriteTriggerModeString, "TIME")) {
206  mCheckpointWriteTriggerMode = SIMTIME;
207  registerCheckpointData(
208  mName,
209  std::string("nextCheckpointTime"),
210  &mNextCheckpointSimtime,
211  (std::size_t)1,
212  true /*broadcast*/,
213  false /*not constant entire run*/);
214  }
215  else if (
216  !strcmp(mCheckpointWriteTriggerModeString, "clock")
217  || !strcmp(mCheckpointWriteTriggerModeString, "Clock")
218  || !strcmp(mCheckpointWriteTriggerModeString, "CLOCK")) {
219  mCheckpointWriteTriggerMode = WALLCLOCK;
220  }
221  else {
222  if (mMPIBlock->getRank() == 0) {
223  ErrorLog() << "Parameter group \"" << mName << "\" checkpointWriteTriggerMode \""
224  << mCheckpointWriteTriggerModeString << "\" is not recognized.\n";
225  }
226  MPI_Barrier(mMPIBlock->getComm());
227  exit(EXIT_FAILURE);
228  }
229  }
230  }
231 }
232 
233 void Checkpointer::ioParam_checkpointWriteStepInterval(enum ParamsIOFlag ioFlag, PVParams *params) {
234  pvAssert(!params->presentAndNotBeenRead(mName.c_str(), "checkpointWrite"));
235  if (mCheckpointWriteFlag) {
236  pvAssert(!params->presentAndNotBeenRead(mName.c_str(), "checkpointWriteTriggerMode"));
237  if (mCheckpointWriteTriggerMode == STEP) {
238  params->ioParamValue(
239  ioFlag,
240  mName.c_str(),
241  "checkpointWriteStepInterval",
242  &mCheckpointWriteStepInterval,
243  mCheckpointWriteStepInterval);
244  }
245  }
246 }
247 
248 void Checkpointer::ioParam_checkpointWriteTimeInterval(enum ParamsIOFlag ioFlag, PVParams *params) {
249  pvAssert(!params->presentAndNotBeenRead(mName.c_str(), "checkpointWrite"));
250  if (mCheckpointWriteFlag) {
251  pvAssert(!params->presentAndNotBeenRead(mName.c_str(), "checkpointWriteTriggerMode"));
252  if (mCheckpointWriteTriggerMode == SIMTIME) {
253  params->ioParamValue(
254  ioFlag,
255  mName.c_str(),
256  "checkpointWriteTimeInterval",
257  &mCheckpointWriteSimtimeInterval,
258  mCheckpointWriteSimtimeInterval);
259  }
260  }
261 }
262 
264  enum ParamsIOFlag ioFlag,
265  PVParams *params) {
266  assert(!params->presentAndNotBeenRead(mName.c_str(), "checkpointWrite"));
267  if (mCheckpointWriteFlag) {
268  pvAssert(!params->presentAndNotBeenRead(mName.c_str(), "checkpointWriteTriggerMode"));
269  if (mCheckpointWriteTriggerMode == WALLCLOCK) {
270  params->ioParamValueRequired(
271  ioFlag,
272  mName.c_str(),
273  "checkpointWriteClockInterval",
274  &mCheckpointWriteWallclockInterval);
275  }
276  }
277 }
278 
279 void Checkpointer::ioParam_checkpointWriteClockUnit(enum ParamsIOFlag ioFlag, PVParams *params) {
280  pvAssert(!params->presentAndNotBeenRead(mName.c_str(), "checkpointWrite"));
281  if (mCheckpointWriteFlag) {
282  pvAssert(!params->presentAndNotBeenRead(mName.c_str(), "checkpointWriteTriggerMode"));
283  if (mCheckpointWriteTriggerMode == WALLCLOCK) {
284  assert(
285  !params->presentAndNotBeenRead(
286  mName.c_str(), "checkpointWriteTriggerClockInterval"));
287  params->ioParamString(
288  ioFlag,
289  mName.c_str(),
290  "checkpointWriteClockUnit",
291  &mCheckpointWriteWallclockUnit,
292  "seconds");
293  if (ioFlag == PARAMS_IO_READ) {
294  pvAssert(mCheckpointWriteWallclockUnit);
295  for (size_t n = 0; n < strlen(mCheckpointWriteWallclockUnit); n++) {
296  mCheckpointWriteWallclockUnit[n] = tolower(mCheckpointWriteWallclockUnit[n]);
297  }
298  if (!strcmp(mCheckpointWriteWallclockUnit, "second")
299  || !strcmp(mCheckpointWriteWallclockUnit, "seconds")
300  || !strcmp(mCheckpointWriteWallclockUnit, "sec")
301  || !strcmp(mCheckpointWriteWallclockUnit, "s")) {
302  free(mCheckpointWriteWallclockUnit);
303  mCheckpointWriteWallclockUnit = strdup("seconds");
304  mCheckpointWriteWallclockIntervalSeconds = mCheckpointWriteWallclockInterval;
305  }
306  else if (
307  !strcmp(mCheckpointWriteWallclockUnit, "minute")
308  || !strcmp(mCheckpointWriteWallclockUnit, "minutes")
309  || !strcmp(mCheckpointWriteWallclockUnit, "min")
310  || !strcmp(mCheckpointWriteWallclockUnit, "m")) {
311  free(mCheckpointWriteWallclockUnit);
312  mCheckpointWriteWallclockUnit = strdup("minutes");
313  mCheckpointWriteWallclockIntervalSeconds =
314  mCheckpointWriteWallclockInterval * (time_t)60;
315  }
316  else if (
317  !strcmp(mCheckpointWriteWallclockUnit, "hour")
318  || !strcmp(mCheckpointWriteWallclockUnit, "hours")
319  || !strcmp(mCheckpointWriteWallclockUnit, "hr")
320  || !strcmp(mCheckpointWriteWallclockUnit, "h")) {
321  free(mCheckpointWriteWallclockUnit);
322  mCheckpointWriteWallclockUnit = strdup("hours");
323  mCheckpointWriteWallclockIntervalSeconds =
324  mCheckpointWriteWallclockInterval * (time_t)3600;
325  }
326  else if (
327  !strcmp(mCheckpointWriteWallclockUnit, "day")
328  || !strcmp(mCheckpointWriteWallclockUnit, "days")) {
329  free(mCheckpointWriteWallclockUnit);
330  mCheckpointWriteWallclockUnit = strdup("days");
331  mCheckpointWriteWallclockIntervalSeconds =
332  mCheckpointWriteWallclockInterval * (time_t)86400;
333  }
334  else {
335  if (mMPIBlock->getRank() == 0) {
336  ErrorLog().printf(
337  "checkpointWriteClockUnit \"%s\" is unrecognized. Use \"seconds\", "
338  "\"minutes\", \"hours\", or \"days\".\n",
339  mCheckpointWriteWallclockUnit);
340  }
341  MPI_Barrier(mMPIBlock->getComm());
342  exit(EXIT_FAILURE);
343  }
344  FatalIf(
345  mCheckpointWriteWallclockUnit == nullptr,
346  "Error in global rank %d process converting checkpointWriteClockUnit: %s\n",
347  mMPIBlock->getRank(),
348  strerror(errno));
349  }
350  }
351  }
352 }
353 
354 void Checkpointer::ioParam_deleteOlderCheckpoints(enum ParamsIOFlag ioFlag, PVParams *params) {
355  assert(!params->presentAndNotBeenRead(mName.c_str(), "checkpointWrite"));
356  if (mCheckpointWriteFlag) {
357  params->ioParamValue(
358  ioFlag,
359  mName.c_str(),
360  "deleteOlderCheckpoints",
361  &mDeleteOlderCheckpoints,
362  false /*default value*/);
363  }
364 }
365 
366 void Checkpointer::ioParam_numCheckpointsKept(enum ParamsIOFlag ioFlag, PVParams *params) {
367  pvAssert(!params->presentAndNotBeenRead(mName.c_str(), "checkpointWrite"));
368  if (mCheckpointWriteFlag) {
369  pvAssert(!params->presentAndNotBeenRead(mName.c_str(), "deleteOlderCheckpoints"));
370  if (mDeleteOlderCheckpoints) {
371  params->ioParamValue(ioFlag, mName.c_str(), "numCheckpointsKept", &mNumCheckpointsKept, 1);
372  if (ioFlag == PARAMS_IO_READ && mNumCheckpointsKept <= 0) {
373  if (mMPIBlock->getRank() == 0) {
374  ErrorLog() << "HyPerCol \"" << mName
375  << "\": numCheckpointsKept must be positive (value was "
376  << mNumCheckpointsKept << ")\n";
377  }
378  MPI_Barrier(mMPIBlock->getComm());
379  exit(EXIT_FAILURE);
380  }
381  if (ioFlag == PARAMS_IO_READ) {
382  if (mNumCheckpointsKept < 0) {
383  if (mMPIBlock->getRank() == 0) {
384  ErrorLog() << "HyPerCol \"" << mName
385  << "\": numCheckpointsKept must be positive (value was "
386  << mNumCheckpointsKept << ")\n";
387  }
388  MPI_Barrier(mMPIBlock->getComm());
389  exit(EXIT_FAILURE);
390  }
391  if (mOldCheckpointDirectories.size() != 0) {
392  WarnLog() << "ioParamsFillGroup called after list of old checkpoint directories was "
393  "created. Reinitializing.\n";
394  }
395  mOldCheckpointDirectories.resize(mNumCheckpointsKept, "");
396  mOldCheckpointDirectoriesIndex = 0;
397  }
398  }
399  }
400 }
401 
402 void Checkpointer::ioParam_checkpointIndexWidth(enum ParamsIOFlag ioFlag, PVParams *params) {
403  assert(!params->presentAndNotBeenRead(mName.c_str(), "checkpointWrite"));
404  if (mCheckpointWriteFlag) {
405  params->ioParamValue(
406  ioFlag,
407  mName.c_str(),
408  "checkpointIndexWidth",
409  &mCheckpointIndexWidth,
410  mCheckpointIndexWidth);
411  }
412 }
413 
415  enum ParamsIOFlag ioFlag,
416  PVParams *params) {
417  assert(!params->presentAndNotBeenRead(mName.c_str(), "checkpointWrite"));
418  if (mCheckpointWriteFlag) {
419  params->ioParamValue(
420  ioFlag,
421  mName.c_str(),
422  "suppressNonplasticCheckpoints",
423  &mSuppressNonplasticCheckpoints,
424  mSuppressNonplasticCheckpoints);
425  }
426 }
427 
428 void Checkpointer::ioParam_lastCheckpointDir(enum ParamsIOFlag ioFlag, PVParams *params) {
429  assert(!params->presentAndNotBeenRead(mName.c_str(), "checkpointWrite"));
430  if (!mCheckpointWriteFlag) {
431  params->ioParamStringRequired(
432  ioFlag, mName.c_str(), "lastCheckpointDir", &mLastCheckpointDir);
433  }
434 }
435 
436 void Checkpointer::ioParam_initializeFromCheckpointDir(enum ParamsIOFlag ioFlag, PVParams *params) {
437  params->ioParamString(
438  ioFlag,
439  mName.c_str(),
440  "initializeFromCheckpointDir",
441  &mInitializeFromCheckpointDir,
442  "",
443  true);
444  if (ioFlag == PARAMS_IO_READ and mInitializeFromCheckpointDir != nullptr
445  and mInitializeFromCheckpointDir[0] != '\0') {
446  verifyDirectory(mInitializeFromCheckpointDir, "InitializeFromCheckpointDir.\n");
447  }
448 }
449 
450 void Checkpointer::provideFinalStep(long int finalStep) {
451  if (mCheckpointIndexWidth < 0) {
452  mWidthOfFinalStepNumber = (int)std::floor(std::log10((float)finalStep)) + 1;
453  }
454 }
455 
457  mObserverTable.addObject(observer->getDescription(), observer);
458 }
459 
460 bool Checkpointer::registerCheckpointEntry(
461  std::shared_ptr<CheckpointEntry> checkpointEntry,
462  bool constantEntireRun) {
463  if (mSuppressNonplasticCheckpoints && constantEntireRun) {
464  return true;
465  }
466  std::string const &name = checkpointEntry->getName();
467  for (auto &c : mCheckpointRegistry) {
468  if (c->getName() == checkpointEntry->getName()) {
469  return false;
470  }
471  }
472  mCheckpointRegistry.push_back(checkpointEntry);
473  return true;
474 }
475 
476 void Checkpointer::registerTimer(Timer const *timer) { mTimers.push_back(timer); }
477 
478 void Checkpointer::readNamedCheckpointEntry(
479  std::string const &objName,
480  std::string const &dataName,
481  bool constantEntireRun) {
482  std::string checkpointEntryName(objName);
483  if (!(objName.empty() || dataName.empty())) {
484  checkpointEntryName.append("_");
485  }
486  checkpointEntryName.append(dataName);
487  readNamedCheckpointEntry(checkpointEntryName, constantEntireRun);
488 }
489 
490 void Checkpointer::readNamedCheckpointEntry(
491  std::string const &checkpointEntryName,
492  bool constantEntireRun) {
493  if (mSuppressNonplasticCheckpoints and constantEntireRun) {
494  return;
495  }
496  std::string checkpointDirectory = generateBlockPath(mInitializeFromCheckpointDir);
497  for (auto &c : mCheckpointRegistry) {
498  if (c->getName() == checkpointEntryName) {
499  double timestamp = 0.0; // not used
500  c->read(checkpointDirectory, &timestamp);
501  return;
502  }
503  }
504  Fatal() << "initializeFromCheckpoint failed to find checkpointEntryName " << checkpointEntryName
505  << "\n";
506 }
507 
508 void Checkpointer::findWarmStartDirectory() {
509  char warmStartDirectoryBuffer[PV_PATH_MAX];
510  if (mMPIBlock->getRank() == 0) {
511  if (mCheckpointWriteFlag) {
512  // Look for largest indexed Checkpointnnnnnn directory in checkpointWriteDir
513  pvAssert(mCheckpointWriteDir);
514  std::string cpDirString = mCheckpointWriteDir;
515  if (cpDirString.c_str()[cpDirString.length() - 1] != '/') {
516  cpDirString += "/";
517  }
518  struct stat statbuf;
519  int statstatus = PV_stat(cpDirString.c_str(), &statbuf);
520  if (statstatus == 0) {
521  if (statbuf.st_mode & S_IFDIR) {
522  char *dirs[] = {mCheckpointWriteDir, nullptr};
523  FTS *fts = fts_open(dirs, FTS_LOGICAL, nullptr);
524  FTSENT *ftsent = fts_read(fts);
525  bool found = false;
526  long int cpIndex = LONG_MIN;
527  std::string indexedDir;
528  for (ftsent = fts_children(fts, 0); ftsent != nullptr; ftsent = ftsent->fts_link) {
529  if (ftsent->fts_statp->st_mode & S_IFDIR) {
530  long int x;
531  int k = sscanf(ftsent->fts_name, "Checkpoint%ld", &x);
532  if (x > cpIndex) {
533  cpIndex = x;
534  indexedDir = ftsent->fts_name;
535  found = true;
536  }
537  }
538  }
539  FatalIf(
540  !found,
541  "restarting but checkpointWriteFlag is set and "
542  "checkpointWriteDir directory \"%s\" does not have any "
543  "checkpoints\n",
544  mCheckpointWriteDir);
545  mCheckpointReadDirectory = cpDirString;
546  mCheckpointReadDirectory.append(indexedDir);
547  }
548  else {
549  Fatal().printf(
550  "checkpoint read directory \"%s\" is "
551  "not a directory.\n",
552  mCheckpointWriteDir);
553  }
554  }
555  else if (errno == ENOENT) {
556  Fatal().printf(
557  "restarting but neither Last nor checkpointWriteDir "
558  "directory \"%s\" exists.\n",
559  mCheckpointWriteDir);
560  }
561  }
562  else {
563  pvAssert(mLastCheckpointDir);
564  FatalIf(
565  mLastCheckpointDir[0] == '\0',
566  "Restart flag set, but unable to determine restart directory.\n");
567  mCheckpointReadDirectory = strdup(mLastCheckpointDir);
568  }
569  FatalIf(
570  mCheckpointReadDirectory.size() >= PV_PATH_MAX,
571  "Restart flag set, but inferred checkpoint read directory is too long (%zu "
572  "bytes).\n",
573  mCheckpointReadDirectory.size());
574  memcpy(
575  warmStartDirectoryBuffer,
576  mCheckpointReadDirectory.c_str(),
577  mCheckpointReadDirectory.size());
578  warmStartDirectoryBuffer[mCheckpointReadDirectory.size()] = '\0';
579  }
580  MPI_Bcast(warmStartDirectoryBuffer, PV_PATH_MAX, MPI_CHAR, 0, mMPIBlock->getComm());
581  if (mMPIBlock->getRank() != 0) {
582  mCheckpointReadDirectory = warmStartDirectoryBuffer;
583  }
584 }
585 
586 void Checkpointer::readStateFromCheckpoint() {
587  if (getInitializeFromCheckpointDir() and getInitializeFromCheckpointDir()[0]) {
588  notify(
589  mObserverTable,
590  std::make_shared<ReadStateFromCheckpointMessage<Checkpointer>>(this),
591  mMPIBlock->getRank() == 0 /*printFlag*/);
592  }
593 }
594 
596  std::vector<std::string> checkpointReadDirs;
597  checkpointReadDirs.reserve(mMPIBlock->getBatchDimension());
598  std::size_t dirStart = (std::size_t)0;
599  while (dirStart < mCheckpointReadDirectory.size()) {
600  std::size_t dirStop = mCheckpointReadDirectory.find(':', dirStart);
601  if (dirStop == std::string::npos) {
602  dirStop = mCheckpointReadDirectory.size();
603  }
604  checkpointReadDirs.push_back(mCheckpointReadDirectory.substr(dirStart, dirStop - dirStart));
605  FatalIf(
606  checkpointReadDirs.size() > (std::size_t)mMPIBlock->getBatchDimension(),
607  "Checkpoint read parsing error: Too many colon separated "
608  "checkpoint read directories. "
609  "Only specify %d checkpoint directories.\n",
610  mMPIBlock->getBatchDimension());
611  dirStart = dirStop + 1;
612  }
613  // Make sure number matches up
614  int const count = (int)checkpointReadDirs.size();
615  FatalIf(
616  count != mMPIBlock->getBatchDimension() && count != 1,
617  "Checkpoint read parsing error: Not enough colon separated "
618  "checkpoint read directories. "
619  "Running with %d batch MPIs but only %d colon separated checkpoint "
620  "directories.\n",
621  mMPIBlock->getBatchDimension(),
622  count);
623  // Grab the directory for this rank and use as mCheckpointReadDir
624  int const checkpointIndex = count == 1 ? 0 : mMPIBlock->getBatchIndex();
625  std::string dirString = expandLeadingTilde(checkpointReadDirs[checkpointIndex].c_str());
626  mCheckpointReadDirectory = dirString.c_str();
627  pvAssert(!mCheckpointReadDirectory.empty());
628 
629  if (getMPIBlock()->getGlobalRank() == 0) {
630  InfoLog().printf(
631  "Setting CheckpointReadDirectory to %s.\n",
632  mMPIBlock->getGlobalRank(),
633  mCheckpointReadDirectory.c_str());
634  }
635 }
636 
637 void Checkpointer::checkpointRead(double *simTimePointer, long int *currentStepPointer) {
638  verifyDirectory(mCheckpointReadDirectory.c_str(), "CheckpointReadDirectory");
639  std::string checkpointReadDirectory = generateBlockPath(mCheckpointReadDirectory);
640  double readTime;
641  for (auto &c : mCheckpointRegistry) {
642  c->read(checkpointReadDirectory, &readTime);
643  }
644  mTimeInfoCheckpointEntry->read(checkpointReadDirectory.c_str(), &readTime);
645  if (simTimePointer) {
646  *simTimePointer = mTimeInfo.mSimTime;
647  }
648  if (currentStepPointer) {
649  *currentStepPointer = mTimeInfo.mCurrentCheckpointStep;
650  }
651  notify(
652  mObserverTable,
653  std::make_shared<ProcessCheckpointReadMessage const>(checkpointReadDirectory),
654  mMPIBlock->getRank() == 0 /*printFlag*/);
655 }
656 
657 void Checkpointer::checkpointWrite(double simTime) {
658  mTimeInfo.mSimTime = simTime;
659  // set mSimTime here so that it is available in routines called by checkpointWrite.
660  if (!mCheckpointWriteFlag) {
661  return;
662  }
663  bool isScheduled = scheduledCheckpoint(); // Is a checkpoint scheduled to occur here?
664  // If there is both a SIGUSR1 and scheduled checkpoint, we call checkpointWriteSignal
665  // but not checkpointNow, because SIGUSR1-generated checkpoints shouldn't be deleted.
666  if (receivedSignal()) {
667  checkpointWriteSignal();
668  }
669  else if (isScheduled) {
670  checkpointNow();
671  }
672  mTimeInfo.mCurrentCheckpointStep++;
673  // increment step number here so that initial conditions correspond to step zero, etc.
674 }
675 
677  int checkpointSignal;
678  if (mMPIBlock->getGlobalRank() == 0) {
679  int sigstatus = PV_SUCCESS;
680  sigset_t pollusr1;
681 
682  sigstatus = sigpending(&pollusr1);
683  assert(sigstatus == 0);
684  checkpointSignal = sigismember(&pollusr1, SIGUSR1);
685  assert(checkpointSignal == 0 || checkpointSignal == 1);
686  if (checkpointSignal) {
687  sigstatus = sigemptyset(&pollusr1);
688  assert(sigstatus == 0);
689  sigstatus = sigaddset(&pollusr1, SIGUSR1);
690  assert(sigstatus == 0);
691  int result = 0;
692  sigwait(&pollusr1, &result);
693  assert(result == SIGUSR1);
694  }
695  }
696  MPI_Bcast(&checkpointSignal, 1 /*count*/, MPI_INT, 0, mMPIBlock->getGlobalComm());
697  return (checkpointSignal != 0);
698 }
699 
701  bool isScheduled = false;
702  switch (mCheckpointWriteTriggerMode) {
703  case NONE:
704  // Only NONE if checkpointWrite is off, in which case this method should not get called
705  pvAssert(0);
706  break;
707  case STEP: isScheduled = scheduledStep(); break;
708  case SIMTIME: isScheduled = scheduledSimTime(); break;
709  case WALLCLOCK: isScheduled = scheduledWallclock(); break;
710  default: pvAssert(0); break;
711  }
712  return isScheduled;
713 }
714 
716  bool isScheduled = false;
717  pvAssert(mCheckpointWriteStepInterval > 0);
718  if (mTimeInfo.mCurrentCheckpointStep % mCheckpointWriteStepInterval == 0) {
719  mNextCheckpointStep = mTimeInfo.mCurrentCheckpointStep + mCheckpointWriteStepInterval;
720  isScheduled = true;
721  }
722  return isScheduled;
723 }
724 
726  bool isScheduled = false;
727  if (mTimeInfo.mSimTime >= mNextCheckpointSimtime) {
728  mNextCheckpointSimtime += mCheckpointWriteSimtimeInterval;
729  isScheduled = true;
730  }
731  return isScheduled;
732 }
733 
735  bool isScheduled = false;
736  std::time_t currentTime;
737  if (mMPIBlock->getGlobalRank() == 0) {
738  currentTime = std::time(nullptr);
739  }
740  MPI_Bcast(&currentTime, sizeof(currentTime), MPI_CHAR, 0, mMPIBlock->getComm());
741  if (currentTime == (std::time_t)(-1)) {
742  throw;
743  }
744  double elapsed = std::difftime(currentTime, mLastCheckpointWallclock);
745  if (elapsed >= mCheckpointWriteWallclockInterval) {
746  isScheduled = true;
747  mLastCheckpointWallclock = currentTime;
748  }
749  return isScheduled;
750 }
751 
753  InfoLog().printf(
754  "Global rank %d: checkpointing in response to SIGUSR1 at time %f.\n",
755  mMPIBlock->getGlobalRank(),
756  mTimeInfo.mSimTime);
757  std::string checkpointDirectory = makeCheckpointDirectoryFromCurrentStep();
758  checkpointToDirectory(checkpointDirectory);
759 }
760 
761 std::string Checkpointer::makeCheckpointDirectoryFromCurrentStep() {
762  std::stringstream checkpointDirStream;
763  checkpointDirStream << mCheckpointWriteDir << "/Checkpoint";
764  int fieldWidth = mCheckpointIndexWidth < 0 ? mWidthOfFinalStepNumber : mCheckpointIndexWidth;
765  checkpointDirStream.fill('0');
766  checkpointDirStream.width(fieldWidth);
767  checkpointDirStream << mTimeInfo.mCurrentCheckpointStep;
768  std::string checkpointDirectory = checkpointDirStream.str();
769  return checkpointDirectory;
770 }
771 
773  std::string checkpointDirectory = makeCheckpointDirectoryFromCurrentStep();
774  if (checkpointDirectory != mCheckpointReadDirectory) {
775  /* Note: the strcmp isn't perfect, since there are multiple ways to specify a path that
776  * points to the same directory. Should use realpath, but that breaks under OS X. */
777  if (mMPIBlock->getGlobalRank() == 0) {
778  InfoLog() << "Checkpointing to \"" << checkpointDirectory
779  << "\", simTime = " << mTimeInfo.mSimTime << "\n";
780  }
781  }
782  else {
783  if (mMPIBlock->getGlobalRank() == 0) {
784  InfoLog().printf(
785  "Skipping checkpoint to \"%s\","
786  " which would clobber the checkpointRead checkpoint.\n",
787  checkpointDirectory.c_str());
788  }
789  return;
790  }
791  checkpointToDirectory(checkpointDirectory);
792 
793  if (mDeleteOlderCheckpoints) {
794  rotateOldCheckpoints(checkpointDirectory);
795  }
796 }
797 
798 void Checkpointer::checkpointToDirectory(std::string const &directory) {
799  std::string checkpointDirectory = generateBlockPath(directory);
800  mCheckpointTimer->start();
801  if (mMPIBlock->getRank() == 0) {
802  InfoLog() << "Checkpointing to directory \"" << checkpointDirectory
803  << "\" at simTime = " << mTimeInfo.mSimTime << "\n";
804  struct stat timeinfostat;
805  std::string timeinfoFilename(checkpointDirectory);
806  timeinfoFilename.append("/timeinfo.bin");
807  int statstatus = stat(timeinfoFilename.c_str(), &timeinfostat);
808  if (statstatus == 0) {
809  WarnLog() << "Checkpoint directory \"" << checkpointDirectory
810  << "\" has existing timeinfo.bin, which is now being deleted.\n";
811  mTimeInfoCheckpointEntry->remove(checkpointDirectory);
812  }
813  }
814  notify(
815  mObserverTable,
816  std::make_shared<PrepareCheckpointWriteMessage const>(checkpointDirectory),
817  mMPIBlock->getRank() == 0 /*printFlag*/);
818  ensureDirExists(mMPIBlock, checkpointDirectory.c_str());
819  for (auto &c : mCheckpointRegistry) {
820  c->write(checkpointDirectory, mTimeInfo.mSimTime, mVerifyWrites);
821  }
822  mTimeInfoCheckpointEntry->write(checkpointDirectory, mTimeInfo.mSimTime, mVerifyWrites);
823  mCheckpointTimer->stop();
824  mCheckpointTimer->start();
825  writeTimers(checkpointDirectory);
826  mCheckpointTimer->stop();
827  if (mMPIBlock->getRank() == 0) {
828  InfoLog().printf("checkpointWrite complete. simTime = %f\n", mTimeInfo.mSimTime);
829  InfoLog().flush();
830  }
831 }
832 
833 void Checkpointer::finalCheckpoint(double simTime) {
834  mTimeInfo.mSimTime = simTime;
835  if (mCheckpointWriteFlag) {
836  checkpointNow();
837  }
838  else if (mLastCheckpointDir != nullptr && mLastCheckpointDir[0] != '\0') {
839  checkpointToDirectory(std::string(mLastCheckpointDir));
840  }
841 }
842 
843 void Checkpointer::rotateOldCheckpoints(std::string const &newCheckpointDirectory) {
844  std::string &oldestCheckpointDir = mOldCheckpointDirectories[mOldCheckpointDirectoriesIndex];
845  if (!oldestCheckpointDir.empty()) {
846  if (mMPIBlock->getRank() == 0) {
847  std::string targetDirectory = generateBlockPath(oldestCheckpointDir);
848  struct stat lcp_stat;
849  int statstatus = stat(targetDirectory.c_str(), &lcp_stat);
850  if (statstatus != 0 || !(lcp_stat.st_mode & S_IFDIR)) {
851  if (statstatus == 0) {
852  ErrorLog().printf(
853  "Failed to delete older checkpoint: failed to stat \"%s\": %s.\n",
854  targetDirectory.c_str(),
855  strerror(errno));
856  }
857  else {
858  ErrorLog().printf(
859  "Deleting older checkpoint: \"%s\" exists but is not a directory.\n",
860  targetDirectory.c_str());
861  }
862  }
863  sync();
864  std::string rmrf_string("");
865  rmrf_string = rmrf_string + "rm -r '" + targetDirectory + "'";
866  int rmrf_result = system(rmrf_string.c_str());
867  if (rmrf_result != 0) {
868  WarnLog().printf(
869  "unable to delete older checkpoint \"%s\": rm command returned %d\n",
870  targetDirectory.c_str(),
871  WEXITSTATUS(rmrf_result));
872  }
873  }
874  MPI_Barrier(mMPIBlock->getGlobalComm());
875  if (mMPIBlock->getGlobalRank() == 0) {
876  sync();
877  struct stat oldcp_stat;
878  int statstatus = stat(oldestCheckpointDir.c_str(), &oldcp_stat);
879  if (statstatus == 0 && (oldcp_stat.st_mode & S_IFDIR)) {
880  int rmdirstatus = rmdir(oldestCheckpointDir.c_str());
881  if (rmdirstatus) {
882  ErrorLog().printf(
883  "Unable to delete older checkpoint \"%s\": rmdir command returned %d "
884  "(%s)\n",
885  oldestCheckpointDir.c_str(),
886  errno,
887  std::strerror(errno));
888  }
889  }
890  }
891  }
892  mOldCheckpointDirectories[mOldCheckpointDirectoriesIndex] = newCheckpointDirectory;
893  mOldCheckpointDirectoriesIndex++;
894  if (mOldCheckpointDirectoriesIndex == mNumCheckpointsKept) {
895  mOldCheckpointDirectoriesIndex = 0;
896  }
897 }
898 
899 void Checkpointer::writeTimers(PrintStream &stream) const {
900  for (auto timer : mTimers) {
901  timer->fprint_time(stream);
902  }
903 }
904 
905 std::string Checkpointer::generateBlockPath(std::string const &baseDirectory) {
906  std::string path(baseDirectory);
907  if (!mBlockDirectoryName.empty()) {
908  path.append("/").append(mBlockDirectoryName);
909  }
910  return path;
911 }
912 
913 void Checkpointer::verifyDirectory(char const *directory, std::string const &description) {
914  int status = PV_SUCCESS;
915  if (mMPIBlock->getRank() == 0) {
916  if (directory == nullptr || directory[0] == '\0') {
917  ErrorLog() << "Checkpointer \"" << mName << "\": " << description << " is not set.\n";
918  status = PV_FAILURE;
919  }
920  struct stat directoryStat;
921  int statResult = stat(expandLeadingTilde(directory).c_str(), &directoryStat);
922  if (statResult != 0) {
923  ErrorLog() << "Checkpointer \"" << mName << "\": checking status of " << description
924  << " \"" << directory << "\" returned error \"" << strerror(errno) << "\".\n";
925  status = PV_FAILURE;
926  }
927  bool isDirectory = S_ISDIR(directoryStat.st_mode);
928  if (!isDirectory) {
929  ErrorLog() << "Checkpointer \"" << mName << "\": " << description << " \"" << directory
930  << " is not a directory.\n";
931  }
932  if (status) {
933  exit(EXIT_FAILURE);
934  }
935  }
936 }
937 
938 void Checkpointer::writeTimers(std::string const &directory) {
939  if (mMPIBlock->getRank() == 0) {
940  std::string timerpathstring = directory;
941  timerpathstring += "/";
942  timerpathstring += "timers.txt";
943 
944  const char *timerpath = timerpathstring.c_str();
945  FileStream timerstream(timerpath, std::ios_base::out, mVerifyWrites);
946  writeTimers(timerstream);
947  }
948 }
949 
950 std::string const Checkpointer::mDefaultOutputPath = "output";
951 } // namespace PV
void ioParam_suppressNonplasticCheckpoints(enum ParamsIOFlag ioFlag, PVParams *params)
void checkpointWriteSignal()
void ioParam_checkpointWriteClockUnit(enum ParamsIOFlag ioFlag, PVParams *params)
checkpointWriteClockUnit: If checkpointWrite on clock, specifies the units used in checkpointWrite...
void rotateOldCheckpoints(std::string const &newCheckpointDirectory)
virtual void addObserver(Observer *observer) override
void ioParam_deleteOlderCheckpoints(enum ParamsIOFlag ioFlag, PVParams *params)
deleteOlderCheckpoints: If checkpointWrite, specifies if the run should delete older checkpoints when...
void ioParam_checkpointWriteTriggerMode(enum ParamsIOFlag ioFlag, PVParams *params)
mCheckpointWriteTriggerMode: If checkpointWrite is set, specifies the method to checkpoint.
virtual void ioParam_verifyWrites(enum ParamsIOFlag ioFlag, PVParams *params)
verifyWrites: If true, calls to FileStream::write are checked by opening the file in read mode and re...
void ioParam_initializeFromCheckpointDir(enum ParamsIOFlag ioFlag, PVParams *params)
initializeFromCheckpointDir: Sets directory used by Checkpointer::initializeFromCheckpoint(). Layers and connections use this directory if they set their initializeFromCheckpointFlag parameter.
bool scheduledCheckpoint()
int getGlobalRank() const
Definition: MPIBlock.hpp:105
virtual void ioParam_outputPath(enum ParamsIOFlag ioFlag, PVParams *params)
mOutputPath: Specifies the absolute or relative output path of the run
void ioParam_checkpointWriteDir(enum ParamsIOFlag ioFlag, PVParams *params)
checkpointWriteDir: If checkpointWrite is set, specifies the output checkpoint directory.
void ioParam_checkpointWriteStepInterval(enum ParamsIOFlag ioFlag, PVParams *params)
checkpointWriteStepInterval: If checkpointWrite on step, specifies the number of steps between checkp...
void checkpointToDirectory(std::string const &checkpointDirectory)
void ioParam_lastCheckpointDir(enum ParamsIOFlag ioFlag, PVParams *params)
lastCheckpointDir: If checkpointWrite is not set, this required parameter specifies the directory to ...
void ioParam_checkpointIndexWidth(enum ParamsIOFlag ioFlag, PVParams *params)
If checkpointWrite is true, checkpointIndexWidth specifies the minimum width for the step number appe...
void ioParam_checkpointWrite(enum ParamsIOFlag ioFlag, PVParams *params)
checkpointWrite: Flag to determine if the run writes checkpoints.
void ioParam_numCheckpointsKept(enum ParamsIOFlag ioFlag, PVParams *params)
mNumCheckpointsKept: If mDeleteOlderCheckpoints is set, keep this many checkpoints before deleting th...
void ioParam_checkpointWriteTimeInterval(enum ParamsIOFlag ioFlag, PVParams *params)
checkpointWriteTimeInterval: If checkpointWrite on time, specifies the amount of simulation time betwe...
void extractCheckpointReadDirectory()
void ioParam_checkpointWriteClockInterval(enum ParamsIOFlag ioFlag, PVParams *params)
checkpointWriteClockInterval: If checkpointWrite on clock, specifies the amount of clock time between ...
std::string makeOutputPathFilename(std::string const &path)