1 /*
2  * HyPerCol.cpp
3  *
4  * Created on: Jul 30, 2008
5  * Author: Craig Rasmussen
6  */
7 
8 #define TIMER_ON
9 #define DEFAULT_DELTA_T 1.0 // time step size (msec)
10 
11 #include "HyPerCol.hpp"
12 #include "columns/Communicator.hpp"
13 #include "columns/Factory.hpp"
14 #include "columns/RandomSeed.hpp"
15 #include "io/PrintStream.hpp"
16 #include "io/io.hpp"
17 #include "pvGitRevision.h"
18 
19 #include <assert.h>
20 #include <cmath>
21 #include <csignal>
22 #include <float.h>
23 #include <fstream>
24 #include <fts.h>
25 #include <libgen.h>
26 #include <limits>
27 #include <memory.h>
28 #include <stdio.h>
29 #include <stdlib.h>
30 #include <string>
31 #include <sys/stat.h>
32 #include <sys/types.h>
33 #include <time.h>
34 #include <unistd.h>
35 
36 #ifdef PV_USE_OPENMP_THREADS
37 #include <omp.h>
38 #endif
39 
40 #ifdef PV_USE_CUDA
41 #include <cuda.h>
42 #include <cudnn.h>
43 #include <map>
44 #endif // PV_USE_CUDA
45 
46 namespace PV {
47 
48 HyPerCol::HyPerCol(PV_Init *initObj) {
49  initialize_base();
50  initialize(initObj);
51 }
52 
53 HyPerCol::~HyPerCol() {
54 #ifdef PV_USE_CUDA
55  finalizeCUDA();
56 #endif // PV_USE_CUDA
57  if (getCommunicator()->globalCommRank() == 0) {
58  PrintStream pStream(getOutputStream());
59  mCheckpointer->writeTimers(pStream);
60  }
61  delete mCheckpointer;
62  mObjectHierarchy.clear(true /*delete the objects in the hierarchy*/);
63  for (auto iterator = mPhaseRecvTimers.begin(); iterator != mPhaseRecvTimers.end();) {
64  delete *iterator;
65  iterator = mPhaseRecvTimers.erase(iterator);
66  }
67 
68  delete mRunTimer;
69  // TODO: Change these old C strings into std::string
70  free(mPrintParamsFilename);
71  free(mName);
72 }
73 
74 int HyPerCol::initialize_base() {
75  // Initialize all member variables to safe values. They will be set to their
76  // actual values in
77  // initialize()
78  mReadyFlag = false;
79  mParamsProcessedFlag = false;
80  mNumPhases = 0;
81  mCheckpointReadFlag = false;
82  mStopTime = 0.0;
83  mDeltaTime = DEFAULT_DELTA_T;
84  mWriteTimeScaleFieldnames = true;
85  mProgressInterval = 1.0;
86  mWriteProgressToErr = false;
87  mOrigStdOut = -1;
88  mOrigStdErr = -1;
89  mLayerStatus = nullptr;
90  mConnectionStatus = nullptr;
91  mPrintParamsFilename = nullptr;
92  mPrintParamsStream = nullptr;
93  mLuaPrintParamsStream = nullptr;
94  mNumXGlobal = 0;
95  mNumYGlobal = 0;
96  mNumBatch = 1;
97  mNumBatchGlobal = 1;
98  mOwnsCommunicator = true;
99  mParams = nullptr;
100  mCommunicator = nullptr;
101  mRunTimer = nullptr;
102  mPhaseRecvTimers.clear();
103  mRandomSeed = 0U;
104  mErrorOnNotANumber = false;
105  mNumThreads = 1;
106 #ifdef PV_USE_CUDA
107  mCudaDevice = nullptr;
108 #endif
109  return PV_SUCCESS;
110 }
111 
112 int HyPerCol::initialize(PV_Init *initObj) {
113  mPVInitObj = initObj;
114  mCommunicator = mPVInitObj->getCommunicator();
115  mParams = mPVInitObj->getParams();
116  if (mParams == nullptr) {
117  if (mCommunicator->globalCommRank() == 0) {
118  ErrorLog() << "HyPerCol::initialize: params have not been set." << std::endl;
119  MPI_Barrier(mCommunicator->communicator());
120  }
121  exit(EXIT_FAILURE);
122  }
123  std::string working_dir = mPVInitObj->getStringArgument("WorkingDirectory");
124  working_dir = expandLeadingTilde(working_dir);
125 
126  int numGroups = mParams->numberOfGroups();
127  std::string paramsFile = initObj->getStringArgument("ParamsFile");
128  if (numGroups == 0) {
129  ErrorLog() << "Params \"" << paramsFile << "\" does not define any groups.\n";
130  return PV_FAILURE;
131  }
132  if (strcmp(mParams->groupKeywordFromIndex(0), "HyPerCol")) {
133  std::string paramsFile = initObj->getStringArgument("ParamsFile");
134  ErrorLog() << "First group in the params file \"" << paramsFile
135  << "\" does not define a HyPerCol.\n";
136  return PV_FAILURE;
137  }
138  mName = strdup(mParams->groupNameFromIndex(0));
139  setDescription();
140 
141  // mNumThreads will not be set or used until HyPerCol::run.
142  // This means that threading cannot happen in the initialization or
143  // communicateInitInfo stages,
144  // but that should not be a problem.
145  char const *programName = mPVInitObj->getProgramName();
146 
147  if (columnId() == 0 && !working_dir.empty()) {
148  int status = chdir(working_dir.c_str());
149  if (status) {
150  Fatal(chdirMessage);
151  chdirMessage.printf("Unable to switch directory to \"%s\"\n", working_dir.c_str());
152  chdirMessage.printf("chdir error: %s\n", strerror(errno));
153  }
154  }
155 
156 #ifdef PV_USE_MPI // Fail if there was a parsing error, but make sure nonroot
157  // processes don't kill
158  // the root process before the root process reaches the syntax error
159  int parsedStatus;
160  int rootproc = 0;
161  if (globalRank() == rootproc) {
162  parsedStatus = this->mParams->getParseStatus();
163  }
164  MPI_Bcast(&parsedStatus, 1, MPI_INT, rootproc, getCommunicator()->globalCommunicator());
165 #else
166  int parsedStatus = this->mParams->getParseStatus();
167 #endif
168  if (parsedStatus != 0) {
169  exit(parsedStatus);
170  }
171 
172  mRandomSeed = mPVInitObj->getUnsignedIntArgument("RandomSeed");
173 
174  mCheckpointer = new Checkpointer(
175  std::string(mName), mCommunicator->getGlobalMPIBlock(), mPVInitObj->getArguments());
176  mCheckpointer->addObserver(this);
177  ioParams(PARAMS_IO_READ);
178  mSimTime = 0.0;
179  mCurrentStep = 0L;
180  mFinalStep = (long int)nearbyint(mStopTime / mDeltaTime);
181  mCheckpointer->provideFinalStep(mFinalStep);
182  mNextProgressTime = 0.0;
183 
184  RandomSeed::instance()->initialize(mRandomSeed);
185  if (getCommunicator()->globalCommRank() == 0) {
186  InfoLog() << "RandomSeed initialized to " << mRandomSeed << ".\n";
187  }
188 
189  mRunTimer = new Timer(mName, "column", "run ");
190  mCheckpointer->registerTimer(mRunTimer);
191  mCheckpointer->registerCheckpointData(
192  mName,
193  "nextProgressTime",
194  &mNextProgressTime,
195  (std::size_t)1,
196  true /*broadcast*/,
197  false /*not constant*/);
198 
199  mCheckpointReadFlag = !mCheckpointer->getCheckpointReadDirectory().empty();
200 
201  // Add layers, connections, etc.
202  for (int k = 1; k < numGroups; k++) { // k = 0 is the HyPerCol itself.
203  const char *kw = mParams->groupKeywordFromIndex(k);
204  const char *name = mParams->groupNameFromIndex(k);
205  if (!strcmp(kw, "HyPerCol")) {
206  if (globalRank() == 0) {
207  std::string paramsFile = initObj->getStringArgument("ParamsFile");
208  ErrorLog() << "Group " << k + 1 << " in params file (\"" << paramsFile
209  << "\") is a HyPerCol; only the first group can be a HyPerCol.\n";
210  return PV_FAILURE;
211  }
212  }
213  else {
214  BaseObject *addedObject = nullptr;
215  try {
216  addedObject = Factory::instance()->createByKeyword(kw, name, this);
217  } catch (std::exception const &e) {
218  Fatal() << e.what() << std::endl;
219  }
220  if (addedObject == nullptr) {
221  ErrorLog().printf("Unable to create %s \"%s\".\n", kw, name);
222  return PV_FAILURE;
223  }
224  addObject(addedObject);
225  }
226  } // for-loop over parameter groups
227  return PV_SUCCESS;
228 }
229 
230 void HyPerCol::setDescription() {
231  description = "HyPerCol \"";
232  description.append(getName()).append("\"");
233 }
234 
235 void HyPerCol::ioParams(enum ParamsIOFlag ioFlag) {
236  ioParamsStartGroup(ioFlag, mName);
237  ioParamsFillGroup(ioFlag);
238  ioParamsFinishGroup(ioFlag);
239 }
240 
241 int HyPerCol::ioParamsStartGroup(enum ParamsIOFlag ioFlag, const char *group_name) {
242  if (ioFlag == PARAMS_IO_WRITE && mCheckpointer->getMPIBlock()->getRank() == 0) {
243  pvAssert(mPrintParamsStream);
244  pvAssert(mLuaPrintParamsStream);
245  const char *keyword = mParams->groupKeywordFromName(group_name);
246  mPrintParamsStream->printf("\n");
247  mPrintParamsStream->printf("%s \"%s\" = {\n", keyword, group_name);
248  mLuaPrintParamsStream->printf("%s = {\n", group_name);
249  mLuaPrintParamsStream->printf("groupType = \"%s\";\n", keyword);
250  }
251  return PV_SUCCESS;
252 }
253 
254 int HyPerCol::ioParamsFillGroup(enum ParamsIOFlag ioFlag) {
255  ioParam_dt(ioFlag);
256  ioParam_stopTime(ioFlag);
257  ioParam_progressInterval(ioFlag);
258  ioParam_writeProgressToErr(ioFlag);
259  mCheckpointer->ioParams(ioFlag, parameters());
260  ioParam_printParamsFilename(ioFlag);
261  ioParam_randomSeed(ioFlag);
262  ioParam_nx(ioFlag);
263  ioParam_ny(ioFlag);
264  ioParam_nBatch(ioFlag);
265  ioParam_errorOnNotANumber(ioFlag);
266 
267  return PV_SUCCESS;
268 }
269 
270 int HyPerCol::ioParamsFinishGroup(enum ParamsIOFlag ioFlag) {
271  if (ioFlag == PARAMS_IO_WRITE && mPrintParamsStream != nullptr) {
272  pvAssert(mLuaPrintParamsStream);
273  mPrintParamsStream->printf("};\n");
274  mLuaPrintParamsStream->printf("};\n\n");
275  }
276  return PV_SUCCESS;
277 }
278 
279 void HyPerCol::ioParam_dt(enum ParamsIOFlag ioFlag) {
280  parameters()->ioParamValue(ioFlag, mName, "dt", &mDeltaTime, mDeltaTime);
281 }
282 
283 void HyPerCol::ioParam_stopTime(enum ParamsIOFlag ioFlag) {
284  parameters()->ioParamValue(ioFlag, mName, "stopTime", &mStopTime, mStopTime);
285 }
286 
287 void HyPerCol::ioParam_progressInterval(enum ParamsIOFlag ioFlag) {
288  parameters()->ioParamValue(
289  ioFlag, mName, "progressInterval", &mProgressInterval, mProgressInterval);
290 }
291 
292 void HyPerCol::ioParam_writeProgressToErr(enum ParamsIOFlag ioFlag) {
293  parameters()->ioParamValue(
294  ioFlag, mName, "writeProgressToErr", &mWriteProgressToErr, mWriteProgressToErr);
295 }
296 
297 void HyPerCol::ioParam_printParamsFilename(enum ParamsIOFlag ioFlag) {
298  parameters()->ioParamString(
299  ioFlag, mName, "printParamsFilename", &mPrintParamsFilename, "pv.params");
300  if (mPrintParamsFilename == nullptr || mPrintParamsFilename[0] == '\0') {
301  if (mCheckpointer->getMPIBlock()->getRank() == 0) {
302  ErrorLog().printf("printParamsFilename cannot be null or the empty string.\n");
303  }
304  MPI_Barrier(mCheckpointer->getMPIBlock()->getComm());
305  exit(EXIT_FAILURE);
306  }
307 }
308 
309 void HyPerCol::ioParam_randomSeed(enum ParamsIOFlag ioFlag) {
310  switch (ioFlag) {
311  // randomSeed can be set on the command line, from the params file, or from
312  // the system clock
313  case PARAMS_IO_READ:
314  // set random seed if it wasn't set in the command line
315  if (!mRandomSeed) {
316  if (mParams->present(mName, "randomSeed")) {
317  mRandomSeed = (unsigned long)mParams->value(mName, "randomSeed");
318  }
319  else {
320  mRandomSeed = seedRandomFromWallClock();
321  }
322  }
323  if (mRandomSeed < RandomSeed::minSeed) {
324  Fatal().printf(
325  "Error: random seed %u is too small. Use a seed of at "
326  "least 10000000.\n",
327  mRandomSeed);
328  }
329  break;
330  case PARAMS_IO_WRITE: parameters()->writeParam("randomSeed", mRandomSeed); break;
331  default: assert(0); break;
332  }
333 }
334 
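// Hedged params-file fragment illustrating the PARAMS_IO_READ branch above
// (the group name and value are made up for this example):
//
//   HyPerCol "column" = {
//      randomSeed = 1234567890;
//   };
//
// If randomSeed is neither on the command line nor in the params file, the
// seed falls back to seedRandomFromWallClock(); a value below
// RandomSeed::minSeed (10000000, per the error message above) is fatal.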
335 void HyPerCol::ioParam_nx(enum ParamsIOFlag ioFlag) {
336  parameters()->ioParamValueRequired(ioFlag, mName, "nx", &mNumXGlobal);
337 }
338 
339 void HyPerCol::ioParam_ny(enum ParamsIOFlag ioFlag) {
340  parameters()->ioParamValueRequired(ioFlag, mName, "ny", &mNumYGlobal);
341 }
342 
343 void HyPerCol::ioParam_nBatch(enum ParamsIOFlag ioFlag) {
344  parameters()->ioParamValue(ioFlag, mName, "nbatch", &mNumBatchGlobal, mNumBatchGlobal);
345  // Make sure the global batch size (nbatch) given in the params file is a
346  // multiple of the number of batch MPI processes (numCommBatches)
347  FatalIf(
348  mNumBatchGlobal % mCommunicator->numCommBatches() != 0,
349  "The total number of batches (%d) must be a multiple of the batch "
350  "width (%d)\n",
351  mNumBatchGlobal,
352  mCommunicator->numCommBatches());
353  mNumBatch = mNumBatchGlobal / mCommunicator->numCommBatches();
354 }
355 
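// Worked example of the batch arithmetic above (illustrative numbers): with
// nbatch = 8 in the params file and 4 MPI batch processes
// (numCommBatches() == 4), the FatalIf passes because 8 % 4 == 0 and each
// batch process gets mNumBatch = 8 / 4 = 2 local batch elements; nbatch = 6
// with the same 4 batch processes would trigger the FatalIf.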
356 void HyPerCol::ioParam_errorOnNotANumber(enum ParamsIOFlag ioFlag) {
357  parameters()->ioParamValue(
358  ioFlag, mName, "errorOnNotANumber", &mErrorOnNotANumber, mErrorOnNotANumber);
359 }
360 
361 void HyPerCol::allocateColumn() {
362  if (mReadyFlag) {
363  return;
364  }
365 
366  setNumThreads(false);
367  // When we call processParams, the communicateInitInfo stage will run, which
368  // can put out a lot of messages.
369  // So if there's a problem with the -t option setting, the error message can
370  // be hard to find.
371  // Instead of printing the error messages here, we will call setNumThreads a
372  // second time after processParams(), and only then print messages.
373 
374  // processParams function does communicateInitInfo stage, sets up adaptive
375  // time step, and prints params
376  pvAssert(mPrintParamsFilename && mPrintParamsFilename[0]);
377  if (mPrintParamsFilename[0] != '/') {
378  std::string printParamsFilename(mPrintParamsFilename);
379  std::string printParamsPath = mCheckpointer->makeOutputPathFilename(printParamsFilename);
380  processParams(printParamsPath.c_str());
381  }
382  else {
383  // If using absolute path, only global rank 0 writes, to avoid collisions.
384  if (mCheckpointer->getMPIBlock()->getGlobalRank() == 0) {
385  processParams(mPrintParamsFilename);
386  }
387  }
388 
389 #ifdef PV_USE_CUDA
390  // Needs to go between CommunicateInitInfo (called by processParams) and
391  // AllocateDataStructures, because the object's mUsingGPUFlag might not get
392  // set until the communicate stage, but the objects will need to know the
393  // CudaDevice in order to allocate GPU memory.
394  std::string const &gpu_devices = mPVInitObj->getStringArgument("GPUDevices");
395  initializeCUDA(gpu_devices);
396 #endif
397 
398  int thread_status =
399  setNumThreads(true /*now, print messages related to setting number of threads*/);
400  MPI_Barrier(mCommunicator->globalCommunicator());
401  if (thread_status != PV_SUCCESS) {
402  exit(EXIT_FAILURE);
403  }
404 
405 #ifdef PV_USE_OPENMP_THREADS
406  pvAssert(mNumThreads > 0); // setNumThreads should fail if it sets
407  // mNumThreads less than or equal to zero
408  omp_set_num_threads(mNumThreads);
409 #endif // PV_USE_OPENMP_THREADS
410 
411  notifyLoop(std::make_shared<AllocateDataMessage>());
412 
413  notifyLoop(std::make_shared<LayerSetMaxPhaseMessage>(&mNumPhases));
414  mNumPhases++;
415 
416  mPhaseRecvTimers.clear();
417  for (int phase = 0; phase < mNumPhases; phase++) {
418  std::string timerTypeString("phRecv");
419  timerTypeString.append(std::to_string(phase));
420  Timer *phaseRecvTimer = new Timer(mName, "column", timerTypeString.c_str());
421  mPhaseRecvTimers.push_back(phaseRecvTimer);
422  mCheckpointer->registerTimer(phaseRecvTimer);
423  }
424 
425  notifyLoop(std::make_shared<RegisterDataMessage<Checkpointer>>(mCheckpointer));
426 
427 #ifdef DEBUG_OUTPUT
428  InfoLog().printf("[%d]: HyPerCol: running...\n", mCommunicator->globalCommRank());
429  InfoLog().flush();
430 #endif
431 
432  // Initialize the state of each object based on the params file,
433  // and then if reading from checkpoint, call the checkpointer.
434  // This needs to happen after initPublishers so that we can initialize
435  // the values in the data stores, and before the layers' publish calls
436  // so that the data in border regions gets copied correctly.
437  notifyLoop(std::make_shared<InitializeStateMessage>());
438  if (mCheckpointReadFlag) {
439  mCheckpointer->checkpointRead(&mSimTime, &mCurrentStep);
440  }
441  else {
442  mCheckpointer->readStateFromCheckpoint();
443  // readStateFromCheckpoint() does nothing if initializeFromCheckpointDir is empty or null.
444  }
445 // Note: ideally, if checkpointReadFlag is set, calling InitializeState should
446 // be unnecessary. However, currently initializeState does some CUDA kernel
447 // initializations that still need to happen when reading from checkpoint.
448 
449 #ifdef PV_USE_CUDA
450  notifyLoop(std::make_shared<CopyInitialStateToGPUMessage>());
451 #endif // PV_USE_CUDA
452 
453  // Initial normalization moved here to facilitate normalizations of groups
454  // of HyPerConns
455  notifyLoop(std::make_shared<ConnectionNormalizeMessage>());
456  notifyLoop(std::make_shared<ConnectionFinalizeUpdateMessage>(mSimTime, mDeltaTime));
457 
458  // publish initial conditions
459  for (int phase = 0; phase < mNumPhases; phase++) {
460  notifyLoop(std::make_shared<LayerPublishMessage>(phase, mSimTime));
461  }
462 
463  // output initial conditions
464  if (!mCheckpointReadFlag) {
465  notifyLoop(std::make_shared<ConnectionOutputMessage>(mSimTime, mDeltaTime));
466  for (int phase = 0; phase < mNumPhases; phase++) {
467  notifyLoop(std::make_shared<LayerOutputStateMessage>(phase, mSimTime));
468  }
469  }
470  mReadyFlag = true;
471 }
472 
473 // typically called by buildandrun via HyPerCol::run()
474 int HyPerCol::run(double stopTime, double dt) {
475  mStopTime = stopTime;
476  mDeltaTime = dt;
477 
478  allocateColumn();
479  getOutputStream().flush();
480 
481  bool dryRunFlag = mPVInitObj->getBooleanArgument("DryRun");
482  if (dryRunFlag) {
483  return PV_SUCCESS;
484  }
485 
486 #ifdef TIMER_ON
487  Clock runClock;
488  runClock.start_clock();
489 #endif
490 
491  advanceTimeLoop(runClock, 10 /*runClockStartingStep*/);
492 
493  notifyLoop(std::make_shared<CleanupMessage>());
494 
495 #ifdef DEBUG_OUTPUT
496  InfoLog().printf("[%d]: HyPerCol: done...\n", mCommunicator->globalCommRank());
497  InfoLog().flush();
498 #endif
499 
500  mCheckpointer->finalCheckpoint(mSimTime);
501 
502 #ifdef TIMER_ON
503  runClock.stop_clock();
504  if (getCommunicator()->globalCommRank() == 0) {
505  runClock.print_elapsed(getOutputStream());
506  }
507 #endif
508 
509  return PV_SUCCESS;
510 }
511 
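// Hedged usage sketch: buildandrun() normally constructs the column and calls
// run(), but a hand-rolled driver would look roughly like the snippet below.
// It assumes the usual three-argument PV_Init constructor
// (&argc, &argv, allowUnrecognizedArguments) and header locations; treat it as
// illustrative rather than the canonical entry point.
//
//   #include <columns/PV_Init.hpp>
//   #include <columns/HyPerCol.hpp>
//
//   int main(int argc, char *argv[]) {
//      PV::PV_Init pv_init(&argc, &argv, false /*allowUnrecognizedArguments*/);
//      PV::HyPerCol hc(&pv_init);
//      // stopTime and dt passed here override the params-file values, as run() above shows.
//      int status = hc.run(10.0 /*stopTime*/, 1.0 /*dt*/);
//      return status == PV_SUCCESS ? EXIT_SUCCESS : EXIT_FAILURE;
//   }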
512 // This routine sets the mNumThreads member variable. It should only be called
513 // by the run() method,
514 // and only inside the !ready if-statement.
515 // TODO: Instead of using the printMessagesFlag, why not use the same flag that
516 int HyPerCol::setNumThreads(bool printMessagesFlag) {
517  bool printMsgs0 = printMessagesFlag && globalRank() == 0;
518  int thread_status = PV_SUCCESS;
519  int num_threads = 0;
520 #ifdef PV_USE_OPENMP_THREADS
521  int max_threads = mPVInitObj->getMaxThreads();
522  int comm_size = mCommunicator->globalCommSize();
523  if (printMsgs0) {
524  InfoLog().printf(
525  "Maximum number of OpenMP threads%s is %d\n"
526  "Number of MPI processes is %d.\n",
527  comm_size == 1 ? "" : " (over all processes)",
528  max_threads,
529  comm_size);
530  }
531  Configuration::IntOptional numThreadsArg = mPVInitObj->getIntOptionalArgument("NumThreads");
532  if (numThreadsArg.mUseDefault) {
533  num_threads = max_threads / comm_size; // integer arithmetic
534  if (num_threads == 0) {
535  num_threads = 1;
536  if (printMsgs0) {
537  WarnLog().printf(
538  "Warning: more MPI processes than available threads. "
539  "Processors may be oversubscribed.\n");
540  }
541  }
542  }
543  else {
544  num_threads = numThreadsArg.mValue;
545  }
546  if (num_threads > 0) {
547  if (printMsgs0) {
548  InfoLog().printf("Number of threads used is %d\n", num_threads);
549  }
550  }
551  else if (num_threads == 0) {
552  thread_status = PV_FAILURE;
553  if (printMsgs0) {
554  ErrorLog().printf(
555  "%s: number of threads must be positive (was set to zero)\n",
556  mPVInitObj->getProgramName());
557  }
558  }
559  else {
560  assert(num_threads < 0);
561  thread_status = PV_FAILURE;
562  if (printMsgs0) {
563  ErrorLog().printf(
564  "%s was compiled with PV_USE_OPENMP_THREADS; "
565  "therefore the \"-t\" argument is "
566  "required.\n",
567  mPVInitObj->getProgramName());
568  }
569  }
570 #else // PV_USE_OPENMP_THREADS
571  Configuration::IntOptional numThreadsArg = mPVInitObj->getIntOptionalArgument("NumThreads");
572  if (numThreadsArg.mUseDefault) {
573  num_threads = 1;
574  if (printMsgs0) {
575  InfoLog().printf("Number of threads used is 1 (compiled without OpenMP).\n");
576  }
577  }
578  else {
579  num_threads = numThreadsArg.mValue;
580  if (num_threads < 0) {
581  num_threads = 1;
582  }
583  if (num_threads != 1) {
584  thread_status = PV_FAILURE;
585  }
586  }
587  if (printMsgs0) {
588  if (thread_status != PV_SUCCESS) {
589  ErrorLog().printf(
590  "%s error: PetaVision must be compiled with "
591  "OpenMP to run with threads.\n",
592  mPVInitObj->getProgramName());
593  }
594  }
595 #endif // PV_USE_OPENMP_THREADS
596  mNumThreads = num_threads;
597  return thread_status;
598 }
599 
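// Example of the default-thread arithmetic above (illustrative numbers): with
// 16 OpenMP threads available and 6 MPI processes, max_threads / comm_size is
// 16 / 6 = 2 threads per process (integer division); with 20 MPI processes the
// quotient is 0, so num_threads is bumped to 1 and the oversubscription
// warning is printed.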
600 int HyPerCol::processParams(char const *path) {
601  if (!mParamsProcessedFlag) {
602  auto const &objectMap = mObjectHierarchy.getObjectMap();
603  notifyLoop(std::make_shared<CommunicateInitInfoMessage>(objectMap));
604  }
605 
606  // Print a cleaned up version of params to the file given by
607  // printParamsFilename
608  parameters()->warnUnread();
609  if (path != nullptr && path[0] != '\0') {
610  outputParams(path);
611  }
612  else {
613  if (globalRank() == 0) {
614  InfoLog().printf(
615  "HyPerCol \"%s\": path for printing parameters file was "
616  "empty or null.\n",
617  mName);
618  }
619  }
620  mParamsProcessedFlag = true;
621  return PV_SUCCESS;
622 }
623 
624 void HyPerCol::advanceTimeLoop(Clock &runClock, int const runClockStartingStep) {
625  // time loop
626  //
627  long int step = 0;
628  while (mSimTime < mStopTime - mDeltaTime / 2.0) {
629  mCheckpointer->checkpointWrite(mSimTime);
630  advanceTime(mSimTime);
631 
632  step += 1;
633 #ifdef TIMER_ON
634  if (step == runClockStartingStep) {
635  runClock.start_clock();
636  }
637 #endif
638 
639  } // end time loop
640 }
641 
642 int HyPerCol::advanceTime(double sim_time) {
643  if (mSimTime >= mNextProgressTime) {
644  mNextProgressTime += mProgressInterval;
645  if (mCommunicator->globalCommRank() == 0) {
646  std::ostream &progressStream = mWriteProgressToErr ? getErrorStream() : getOutputStream();
647  time_t current_time;
648  time(&current_time);
649  progressStream << " time==" << sim_time << " "
650  << ctime(&current_time); // ctime output ends with a newline
651  progressStream.flush();
652  }
653  }
654 
655  mRunTimer->start();
656 
657  // make sure mSimTime is updated even if HyPerCol isn't running time loop
658  // triggerOffset might fail if mSimTime does not advance uniformly because
659  // mSimTime could skip over a trigger event
660  // !!!TODO: fix trigger layer to compute mTimeScale so as not to allow
661  // bypassing trigger event
662  mSimTime = sim_time + mDeltaTime;
663 
664  notifyLoop(std::make_shared<AdaptTimestepMessage>());
665 
666  // At this point all activity from the previous time step has
667  // been delivered to the data store.
668  //
669 
670  int status = PV_SUCCESS;
671 
672  // Each layer's phase establishes a priority for updating
673  for (int phase = 0; phase < mNumPhases; phase++) {
674  notifyLoop(std::make_shared<LayerClearProgressFlagsMessage>());
675 
676  // nonblockingLayerUpdate allows for more concurrency than notifyLoop.
677  bool someLayerIsPending = false;
678  bool someLayerHasActed = false;
679 #ifdef PV_USE_CUDA
680  // Ordering needs to go recvGpu, if(recvGpu and upGpu)update, recvNoGpu,
681  // update rest
682  auto recvMessage = std::make_shared<LayerRecvSynapticInputMessage>(
683  phase,
684  mPhaseRecvTimers.at(phase),
685  true /*recvGpuFlag*/,
686  mSimTime,
687  mDeltaTime,
688  &someLayerIsPending,
689  &someLayerHasActed);
690  auto updateMessage = std::make_shared<LayerUpdateStateMessage>(
691  phase,
692  true /*recvGpuFlag*/,
693  true /*updateGpuFlag*/,
694  mSimTime,
695  mDeltaTime,
696  &someLayerIsPending,
697  &someLayerHasActed);
698  nonblockingLayerUpdate(recvMessage, updateMessage);
699 
700  recvMessage = std::make_shared<LayerRecvSynapticInputMessage>(
701  phase,
702  mPhaseRecvTimers.at(phase),
703  false /*recvGpuFlag*/,
704  mSimTime,
705  mDeltaTime,
706  &someLayerIsPending,
707  &someLayerHasActed);
708  updateMessage = std::make_shared<LayerUpdateStateMessage>(
709  phase,
710  false /*recvGpuFlag*/,
711  false /*updateGpuFlag*/,
712  mSimTime,
713  mDeltaTime,
714  &someLayerIsPending,
715  &someLayerHasActed);
716  nonblockingLayerUpdate(recvMessage, updateMessage);
717 
718  if (getDevice() != nullptr) {
719  getDevice()->syncDevice();
720  }
721 
722  // Update for receiving on cpu and updating on gpu
723  nonblockingLayerUpdate(
724  std::make_shared<LayerUpdateStateMessage>(
725  phase,
726  false /*recvOnGpuFlag*/,
727  true /*updateOnGpuFlag*/,
728  mSimTime,
729  mDeltaTime,
730  &someLayerIsPending,
731  &someLayerHasActed));
732 
733  if (getDevice() != nullptr) {
734  getDevice()->syncDevice();
735  notifyLoop(std::make_shared<LayerCopyFromGpuMessage>(phase, mPhaseRecvTimers.at(phase)));
736  }
737 
738  // Update for gpu recv and non gpu update
739  nonblockingLayerUpdate(
740  std::make_shared<LayerUpdateStateMessage>(
741  phase,
742  true /*recvOnGpuFlag*/,
743  false /*updateOnGpuFlag*/,
744  mSimTime,
745  mDeltaTime,
746  &someLayerIsPending,
747  &someLayerHasActed));
748 #else
749  auto recvMessage = std::make_shared<LayerRecvSynapticInputMessage>(
750  phase,
751  mPhaseRecvTimers.at(phase),
752  mSimTime,
753  mDeltaTime,
754  &someLayerIsPending,
755  &someLayerHasActed);
756  auto updateMessage = std::make_shared<LayerUpdateStateMessage>(
757  phase, mSimTime, mDeltaTime, &someLayerIsPending, &someLayerHasActed);
758  nonblockingLayerUpdate(recvMessage, updateMessage);
759 #endif
760  // Rotate DataStore ring buffers
761  notifyLoop(std::make_shared<LayerAdvanceDataStoreMessage>(phase));
762 
763  // copy activity buffer to DataStore, and do MPI exchange.
764  notifyLoop(std::make_shared<LayerPublishMessage>(phase, mSimTime));
765 
766  // Feb 2, 2017: waiting and updating active indices have been moved into
767  // OutputState and CheckNotANumber, where they are called if needed.
768  notifyLoop(std::make_shared<LayerOutputStateMessage>(phase, mSimTime));
769  if (mErrorOnNotANumber) {
770  notifyLoop(std::make_shared<LayerCheckNotANumberMessage>(phase));
771  }
772  }
773 
774  // update the connections (weights)
775  //
776  notifyLoop(std::make_shared<ConnectionUpdateMessage>(mSimTime, mDeltaTime));
777  notifyLoop(std::make_shared<ConnectionNormalizeMessage>());
778  notifyLoop(std::make_shared<ConnectionFinalizeUpdateMessage>(mSimTime, mDeltaTime));
779  notifyLoop(std::make_shared<ConnectionOutputMessage>(mSimTime, mDeltaTime));
780 
781  mRunTimer->stop();
782 
783  notifyLoop(std::make_shared<ColProbeOutputStateMessage>(mSimTime, mDeltaTime));
784 
785  return status;
786 }
787 
788 void HyPerCol::nonblockingLayerUpdate(
789  std::shared_ptr<LayerUpdateStateMessage const> updateMessage) {
790 
791  *(updateMessage->mSomeLayerIsPending) = true;
792  *(updateMessage->mSomeLayerHasActed) = false;
793 
794  long int idleCounter = 0;
795  while (*(updateMessage->mSomeLayerIsPending)) {
796  *(updateMessage->mSomeLayerIsPending) = false;
797  *(updateMessage->mSomeLayerHasActed) = false;
798  notifyLoop(updateMessage);
799 
800  if (!*(updateMessage->mSomeLayerHasActed)) {
801  idleCounter++;
802  }
803  }
804 
805  if (idleCounter > 1L) {
806  InfoLog() << "t = " << mSimTime << ", phase " << updateMessage->mPhase
807 #ifdef PV_USE_CUDA
808  << ", recvGpu" << updateMessage->mRecvOnGpuFlag << ", updateGpu"
809  << updateMessage->mUpdateOnGpuFlag
810 #endif // PV_USE_CUDA
811  << ", idle count " << idleCounter << "\n";
812  }
813 }
814 
815 void HyPerCol::nonblockingLayerUpdate(
816  std::shared_ptr<LayerRecvSynapticInputMessage const> recvMessage,
817  std::shared_ptr<LayerUpdateStateMessage const> updateMessage) {
818 
819  pvAssert(recvMessage->mSomeLayerIsPending == updateMessage->mSomeLayerIsPending);
820  pvAssert(recvMessage->mSomeLayerHasActed == updateMessage->mSomeLayerHasActed);
821 
822  *(updateMessage->mSomeLayerIsPending) = true;
823  *(updateMessage->mSomeLayerHasActed) = false;
824 
825  long int idleCounter = 0;
826  while (*(recvMessage->mSomeLayerIsPending)) {
827  *(updateMessage->mSomeLayerIsPending) = false;
828  *(updateMessage->mSomeLayerHasActed) = false;
829  notifyLoop(recvMessage);
830  notifyLoop(updateMessage);
831 
832  if (!*(updateMessage->mSomeLayerHasActed)) {
833  idleCounter++;
834  }
835  }
836 
837  if (idleCounter > 1L) {
838  InfoLog() << "t = " << mSimTime << ", phase " << updateMessage->mPhase
839 #ifdef PV_USE_CUDA
840  << ", recvGpu" << updateMessage->mRecvOnGpuFlag << ", updateGpu"
841  << updateMessage->mUpdateOnGpuFlag
842 #endif // PV_USE_CUDA
843  << ", idle count " << idleCounter << "\n";
844  }
845 }
846 
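// Both nonblockingLayerUpdate overloads implement the same polling pattern:
// each pass through the while-loop re-delivers the message(s); a layer still
// waiting for input sets *mSomeLayerIsPending, a layer that did useful work
// sets *mSomeLayerHasActed, and a pass in which no layer acts increments
// idleCounter. An idle count above 1 is logged as a hint that layers spent
// passes waiting on one another.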
847 Response::Status HyPerCol::respond(std::shared_ptr<BaseMessage const> message) {
848  if (auto castMessage = std::dynamic_pointer_cast<PrepareCheckpointWriteMessage const>(message)) {
849  return respondPrepareCheckpointWrite(castMessage);
850  }
851  else {
852  return Response::SUCCESS;
853  }
854 }
855 
856 Response::Status HyPerCol::respondPrepareCheckpointWrite(
857  std::shared_ptr<PrepareCheckpointWriteMessage const> message) {
858  std::string path(message->mDirectory);
859  path.append("/").append("pv.params");
860  outputParams(path.c_str());
861  return Response::SUCCESS;
862 }
863 
864 void HyPerCol::outputParams(char const *path) {
865  assert(path != nullptr && path[0] != '\0');
866  int rank = mCheckpointer->getMPIBlock()->getRank();
867  assert(mPrintParamsStream == nullptr);
868  char *tmp = strdup(path); // duplicate string since dirname() is allowed to modify its argument
869  if (tmp == nullptr) {
870  Fatal().printf("HyPerCol::outputParams unable to allocate memory: %s\n", strerror(errno));
871  }
872  char *containingdir = dirname(tmp);
873  ensureDirExists(mCheckpointer->getMPIBlock(), containingdir);
874  free(tmp);
875  if (rank == 0) {
876  mPrintParamsStream = new FileStream(path, std::ios_base::out, getVerifyWrites());
877  // Get new lua path
878  std::string luaPath(path);
879  luaPath.append(".lua");
880  char luapath[PV_PATH_MAX];
881  mLuaPrintParamsStream =
882  new FileStream(luaPath.c_str(), std::ios_base::out, getVerifyWrites());
883  parameters()->setPrintParamsStream(mPrintParamsStream);
884  parameters()->setPrintLuaStream(mLuaPrintParamsStream);
885 
886  // Params file output
887  outputParamsHeadComments(mPrintParamsStream, "//");
888 
889  // Lua file output
890  outputParamsHeadComments(mLuaPrintParamsStream, "--");
891  // Load util module based on PVPath
892  mLuaPrintParamsStream->printf(
893  "package.path = package.path .. \";\" .. \"" PV_DIR "/../parameterWrapper/?.lua\"\n");
894  mLuaPrintParamsStream->printf("local pv = require \"PVModule\"\n\n");
895  mLuaPrintParamsStream->printf(
896  "NULL = function() end; -- to allow string parameters to be set to NULL\n\n");
897  mLuaPrintParamsStream->printf("-- Base table variable to store\n");
898  mLuaPrintParamsStream->printf("local pvParameters = {\n");
899  }
900 
901  // Parent HyPerCol params
902  ioParams(PARAMS_IO_WRITE);
903 
904  // Splitting this up into five messages for backwards compatibility in preserving the order.
905  // If order preservation is not needed here, it would be better to replace with a single
906  // message that all five types respond to.
907  notifyLoop(std::make_shared<LayerWriteParamsMessage>());
908  notifyLoop(std::make_shared<ConnectionWriteParamsMessage>());
909  notifyLoop(std::make_shared<ColProbeWriteParamsMessage>());
910  notifyLoop(std::make_shared<LayerProbeWriteParamsMessage>());
911  notifyLoop(std::make_shared<ConnectionProbeWriteParamsMessage>());
912 
913  if (rank == 0) {
914  mLuaPrintParamsStream->printf("} --End of pvParameters\n");
915  mLuaPrintParamsStream->printf(
916  "\n-- Print out PetaVision approved parameter file to the console\n");
917  mLuaPrintParamsStream->printf("paramsFileString = pv.createParamsFileString(pvParameters)\n");
918  mLuaPrintParamsStream->printf("io.write(paramsFileString)\n");
919  }
920 
921  if (mPrintParamsStream) {
922  delete mPrintParamsStream;
923  mPrintParamsStream = nullptr;
924  parameters()->setPrintParamsStream(mPrintParamsStream);
925  }
926  if (mLuaPrintParamsStream) {
927  delete mLuaPrintParamsStream;
928  mLuaPrintParamsStream = nullptr;
929  parameters()->setPrintLuaStream(mLuaPrintParamsStream);
930  }
931 }
932 
933 void HyPerCol::outputParamsHeadComments(FileStream *fileStream, char const *commentToken) {
934  time_t t = time(nullptr);
935  fileStream->printf("%s PetaVision, " PV_GIT_REVISION "\n", commentToken);
936  fileStream->printf("%s Run time %s", commentToken, ctime(&t)); // output of ctime contains \n
937 #ifdef PV_USE_MPI
938  MPIBlock const *mpiBlock = mCheckpointer->getMPIBlock();
939 
940  fileStream->printf(
941  "%s Compiled with Open MPI %d.%d.%d (MPI Standard %d.%d).\n",
942  commentToken,
943  OMPI_MAJOR_VERSION,
944  OMPI_MINOR_VERSION,
945  OMPI_RELEASE_VERSION,
946  MPI_VERSION,
947  MPI_SUBVERSION);
948  fileStream->printf(
949  "%s MPI configuration has %d rows, %d columns, and batch dimension %d.\n",
950  commentToken,
951  mpiBlock->getGlobalNumRows(),
952  mpiBlock->getGlobalNumColumns(),
953  mpiBlock->getGlobalBatchDimension());
954  if (mpiBlock->getNumRows() < mpiBlock->getGlobalNumRows()
955  or mpiBlock->getNumColumns() < mpiBlock->getGlobalNumColumns()
956  or mpiBlock->getBatchDimension() < mpiBlock->getGlobalBatchDimension()) {
957  fileStream->printf(
958  "%s CheckpointCells have %d rows, %d columns, and batch dimension %d.\n",
959  commentToken,
960  mpiBlock->getNumRows(),
961  mpiBlock->getNumColumns(),
962  mpiBlock->getBatchDimension());
963  }
964 #else // PV_USE_MPI
965  fileStream->printf("%s Compiled without MPI.\n", commentToken);
966 #endif // PV_USE_MPI
967 #ifdef PV_USE_CUDA
968  int const cudaMajor = CUDA_VERSION / 1000;
969  int const cudaMinor = (CUDA_VERSION % 1000) / 10;
970  int const cudnnMajor = CUDNN_MAJOR;
971  int const cudnnMinor = CUDNN_MINOR;
972  int const cudnnPatch = CUDNN_PATCHLEVEL;
973  fileStream->printf(
974  "%s Compiled with CUDA version %d.%d; cuDNN version %d.%d.%d\n",
975  commentToken,
976  cudaMajor,
977  cudaMinor,
978  cudnnMajor,
979  cudnnMinor,
980  cudnnPatch);
981 #else
982  fileStream->printf("%s Compiled without CUDA.\n", commentToken);
983 #endif
984 #ifdef PV_USE_OPENMP_THREADS
985  std::string openmpVersion;
986  switch (_OPENMP) {
987  case 201511: openmpVersion = "4.5"; break;
988  case 201307: openmpVersion = "4.0"; break;
989  case 201107: openmpVersion = "3.1"; break;
990  case 200805: openmpVersion = "3.0"; break;
991  default: openmpVersion = "is unrecognized"; break;
992  }
993  fileStream->printf(
994  "%s Compiled with OpenMP parallel code, API version %s (%06d) ",
995  commentToken,
996  openmpVersion.c_str(),
997  _OPENMP);
998  if (mNumThreads > 0) {
999  fileStream->printf("and run using %d threads.\n", mNumThreads);
1000  }
1001  else if (mNumThreads == 0) {
1002  fileStream->printf("but number of threads was set to zero (error).\n");
1003  }
1004  else {
1005  fileStream->printf("but the -t option was not specified.\n");
1006  }
1007 #else
1008  fileStream->printf("%s Compiled without OpenMP parallel code ", commentToken);
1009  if (mNumThreads == 1) {
1010  fileStream->printf(".\n");
1011  }
1012  else if (mNumThreads == 0) {
1013  fileStream->printf("but number of threads was set to zero (error).\n");
1014  }
1015  else {
1016  fileStream->printf(
1017  "but number of threads specified was %d instead of 1. (error).\n", mNumThreads);
1018  }
1019 #endif // PV_USE_OPENMP_THREADS
1020  if (mCheckpointReadFlag) {
1021  fileStream->printf(
1022  "%s Started from checkpoint \"%s\"\n",
1023  commentToken,
1024  mCheckpointer->getCheckpointReadDirectory().c_str());
1025  }
1026 }
1027 
1028 int HyPerCol::getAutoGPUDevice() {
1029  int returnGpuIdx = -1;
1030 #ifdef PV_USE_CUDA
1031  int mpiRank = mCommunicator->globalCommRank();
1032  int numMpi = mCommunicator->globalCommSize();
1033  char hostNameStr[PV_PATH_MAX];
1034  gethostname(hostNameStr, PV_PATH_MAX);
1035  size_t hostNameLen = strlen(hostNameStr) + 1; //+1 for null terminator
1036 
1037  // Each rank communicates which host it is on
1038  // Root process
1039  if (mpiRank == 0) {
1040  // Allocate data structure for rank to host
1041  char rankToHost[numMpi][PV_PATH_MAX];
1042  assert(rankToHost);
1043  // Allocate data structure for rank to maxGpu
1044  int rankToMaxGpu[numMpi];
1045  // Allocate final data structure for rank to GPU index
1046  int rankToGpu[numMpi];
1047  assert(rankToGpu);
1048 
1049  for (int rank = 0; rank < numMpi; rank++) {
1050  if (rank == 0) {
1051  strcpy(rankToHost[rank], hostNameStr);
1052  rankToMaxGpu[rank] = PVCuda::CudaDevice::getNumDevices();
1053  }
1054  else {
1055  MPI_Recv(
1056  rankToHost[rank],
1057  PV_PATH_MAX,
1058  MPI_CHAR,
1059  rank,
1060  0,
1061  mCommunicator->globalCommunicator(),
1062  MPI_STATUS_IGNORE);
1063  MPI_Recv(
1064  &(rankToMaxGpu[rank]),
1065  1,
1066  MPI_INT,
1067  rank,
1068  0,
1069  mCommunicator->globalCommunicator(),
1070  MPI_STATUS_IGNORE);
1071  }
1072  }
1073 
1074  // rankToHost now is an array such that the index is the rank, and the value
1075  // is the host
1076  // Convert to a map of vectors, such that the key is the host name and the
1077  // value
1078  // is a vector of mpi ranks that is running on that host
1079  std::map<std::string, std::vector<int>> hostMap;
1080  for (int rank = 0; rank < numMpi; rank++) {
1081  hostMap[std::string(rankToHost[rank])].push_back(rank);
1082  }
1083 
1084  // Determine what gpus to use per mpi
1085  for (auto &host : hostMap) {
1086  std::vector<int> rankVec = host.second;
1087  int numRanksPerHost = rankVec.size();
1088  assert(numRanksPerHost > 0);
1089  // Grab maxGpus of current host
1090  int maxGpus = rankToMaxGpu[rankVec[0]];
1091  // Warnings for overloading/underloading gpus
1092  if (numRanksPerHost != maxGpus) {
1093  WarnLog(assignGpuWarning);
1094  assignGpuWarning.printf(
1095  "HyPerCol::getAutoGPUDevice: Host \"%s\" (rank[s] ", host.first.c_str());
1096  for (int v_i = 0; v_i < numRanksPerHost; v_i++) {
1097  if (v_i != numRanksPerHost - 1) {
1098  assignGpuWarning.printf("%d, ", rankVec[v_i]);
1099  }
1100  else {
1101  assignGpuWarning.printf("%d", rankVec[v_i]);
1102  }
1103  }
1104  assignGpuWarning.printf(
1105  ") is being %s, with %d mpi processes mapped to %d total GPU[s]\n",
1106  numRanksPerHost < maxGpus ? "underloaded" : "overloaded",
1107  numRanksPerHost,
1108  maxGpus);
1109  }
1110 
1111  // Match a rank to a gpu
1112  for (int v_i = 0; v_i < numRanksPerHost; v_i++) {
1113  rankToGpu[rankVec[v_i]] = v_i % maxGpus;
1114  }
1115  }
1116 
1117  // MPI sends to each process to specify which gpu the rank should use
1118  for (int rank = 0; rank < numMpi; rank++) {
1119  InfoLog() << "Rank " << rank << " on host \"" << rankToHost[rank] << "\" ("
1120  << rankToMaxGpu[rank] << " GPU[s]) using GPU index " << rankToGpu[rank] << "\n";
1121  if (rank == 0) {
1122  returnGpuIdx = rankToGpu[rank];
1123  }
1124  else {
1125  MPI_Send(&(rankToGpu[rank]), 1, MPI_INT, rank, 0, mCommunicator->globalCommunicator());
1126  }
1127  }
1128  }
1129  // Non root process
1130  else {
1131  // Send host name
1132  MPI_Send(hostNameStr, hostNameLen, MPI_CHAR, 0, 0, mCommunicator->globalCommunicator());
1133  // Send max gpus for that host
1134  int maxGpu = PVCuda::CudaDevice::getNumDevices();
1135  MPI_Send(&maxGpu, 1, MPI_INT, 0, 0, mCommunicator->globalCommunicator());
1136  // Recv gpu idx
1137  MPI_Recv(
1138  &(returnGpuIdx),
1139  1,
1140  MPI_INT,
1141  0,
1142  0,
1143  mCommunicator->globalCommunicator(),
1144  MPI_STATUS_IGNORE);
1145  }
1146  assert(returnGpuIdx >= 0 && returnGpuIdx < PVCuda::CudaDevice::getNumDevices());
1147 #else
1148  // This function should never be called when not running with GPUs
1149  assert(false);
1150 #endif
1151  return returnGpuIdx;
1152 }
1153 
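// Worked example of the rank-to-GPU assignment above (illustrative): if ranks
// 0-3 all report host "node01" and that host has 2 GPUs, hostMap["node01"] is
// {0, 1, 2, 3}, the overload warning fires (4 processes onto 2 GPUs), and
// v_i % maxGpus maps ranks 0 and 2 to GPU 0 and ranks 1 and 3 to GPU 1.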
1154 #ifdef PV_USE_CUDA
1155 void HyPerCol::initializeCUDA(std::string const &in_device) {
1156  // Don't do anything unless some object needs CUDA.
1157  bool needGPU = false;
1158  auto &objectMap = mObjectHierarchy.getObjectMap();
1159  for (auto &obj : objectMap) {
1160  Observer *observer = obj.second;
1161  BaseObject *object = dynamic_cast<BaseObject *>(observer);
1162  pvAssert(object); // Only addObject(BaseObject*) can change the hierarchy.
1163  if (object->isUsingGPU()) {
1164  needGPU = true;
1165  break;
1166  }
1167  }
1168  if (!needGPU) {
1169  return;
1170  }
1171 
1172  int numMpi = mCommunicator->globalCommSize();
1173  int device;
1174 
1175  // default value
1176  if (in_device.empty()) {
1177  if (getCommunicator()->globalCommRank() == 0) {
1178  InfoLog() << "Auto-assigning GPUs\n";
1179  }
1180  device = getAutoGPUDevice();
1181  }
1182  else {
1183  std::vector<int> deviceVec;
1184  std::stringstream ss(in_device);
1185  std::string stoken;
1186  // Grabs strings from ss into item, separated by commas
1187  while (std::getline(ss, stoken, ',')) {
1188  // Convert stoken to integer
1189  for (auto &ch : stoken) {
1190  if (!isdigit(ch)) {
1191  Fatal().printf(
1192  "Device specification error: %s contains "
1193  "unrecognized characters. Must be "
1194  "comma separated integers greater or equal to 0 "
1195  "with no other characters "
1196  "allowed (including spaces).\n",
1197  in_device.c_str());
1198  }
1199  }
1200  deviceVec.push_back(atoi(stoken.c_str()));
1201  }
1202  // Check length of deviceVec
1203  // Allowed cases are 1 device specified or greater than or equal to number
1204  // of mpi processes
1205  // devices specified
1206  if (deviceVec.size() == 1) {
1207  device = deviceVec[0];
1208  }
1209  else if (deviceVec.size() >= numMpi) {
1210  device = deviceVec[mCommunicator->globalCommRank()];
1211  }
1212  else {
1213  Fatal().printf(
1214  "Device specification error: Number of devices "
1215  "specified (%zu) must be either 1 or "
1216  ">= than number of mpi processes (%d).\n",
1217  deviceVec.size(),
1218  numMpi);
1219  }
1220  InfoLog() << "Global MPI Process " << mCommunicator->globalCommRank() << " using device "
1221  << device << "\n";
1222  }
1223 
1224  int globalSize = mCommunicator->globalCommSize();
1225  for (int r = 0; r < globalSize; r++) {
1226  if (r == globalRank()) {
1227  mCudaDevice = new PVCuda::CudaDevice(device);
1228  }
1229  MPI_Barrier(mCommunicator->globalCommunicator());
1230  }
1231 
1232  // Only print rank for comm rank 0
1233  if (globalRank() == 0) {
1234  mCudaDevice->query_device_info();
1235  }
1236 
1237  // Broadcast the pointer to the CUDA device to the hierarchy
1238  notifyLoop(std::make_shared<SetCudaDeviceMessage>(mCudaDevice));
1239 }
1240 
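// Example of the "GPUDevices" parsing above (illustrative): "0,2" with two MPI
// processes puts global rank 0 on device 0 and rank 1 on device 2; a single
// entry such as "1" puts every rank on device 1; and "0,1" with three
// processes hits the Fatal() branch because the list is neither of length 1
// nor >= the number of MPI processes.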
1241 int HyPerCol::finalizeCUDA() {
1242  delete mCudaDevice;
1243  return 0;
1244 }
1245 
1246 #endif // PV_USE_CUDA
1247 
1248 void HyPerCol::addObject(BaseObject *obj) {
1249  bool succeeded = mObjectHierarchy.addObject(obj->getName(), obj);
1250  FatalIf(!succeeded, "Adding %s failed.\n", getDescription_c());
1251 }
1252 
1253 Observer *HyPerCol::getObjectFromName(std::string const &objectName) const {
1254  auto &objectMap = mObjectHierarchy.getObjectMap();
1255  auto search = objectMap.find(objectName);
1256  return search == objectMap.end() ? nullptr : search->second;
1257 }
1258 
1259 Observer *HyPerCol::getNextObject(Observer const *currentObject) const {
1260  if (mObjectHierarchy.getObjectVector().empty()) {
1261  if (currentObject != nullptr) {
1262  throw std::domain_error("HyPerCol::getNextObject called with empty hierarchy");
1263  }
1264  else {
1265  return nullptr;
1266  }
1267  }
1268  else {
1269  auto objectVector = mObjectHierarchy.getObjectVector();
1270  if (currentObject == nullptr) {
1271  return objectVector[0];
1272  }
1273  else {
1274  for (auto iterator = objectVector.begin(); iterator != objectVector.end(); iterator++) {
1275  Observer *object = *iterator;
1276  if (object == currentObject) {
1277  iterator++;
1278  return iterator == objectVector.end() ? nullptr : *iterator;
1279  }
1280  }
1281  throw std::domain_error("HyPerCol::getNextObject argument not in hierarchy");
1282  }
1283  }
1284 }
1285 
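// Hedged iteration sketch using getNextObject (the caller code is hypothetical):
//
//   for (PV::Observer *obj = hc->getNextObject(nullptr); obj != nullptr;
//        obj = hc->getNextObject(obj)) {
//      // visit obj; the loop walks the hierarchy in insertion order and stops
//      // when getNextObject returns nullptr after the last element.
//   }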
1286 unsigned int HyPerCol::seedRandomFromWallClock() {
1287  unsigned long t = 0UL;
1288  int rootproc = 0;
1289  if (mCommunicator->globalCommRank() == rootproc) {
1290  t = time((time_t *)nullptr);
1291  }
1292  MPI_Bcast(&t, 1, MPI_UNSIGNED, rootproc, mCommunicator->globalCommunicator());
1293  return t;
1294 }
1295 
1296 } // PV namespace