PetaVision  Alpha
Publisher.cpp
1 /*
2  * Publisher.cpp
3  *
4  * Created on: Jul 19, 2016
5  * Author: pschultz
6  */
7 
8 #include "Publisher.hpp"
9 #include "checkpointing/CheckpointEntryDataStore.hpp"
10 #include "include/pv_common.h"
11 #include "utils/PVAssert.hpp"
12 
13 namespace PV {
14 
15 Publisher::Publisher(MPIBlock const &mpiBlock, PVLayerCube *cube, int numLevels, bool isSparse) {
16  this->mLayerCube = cube;
17 
18  int const numBuffers = cube->loc.nbatch;
19  int const numItems = cube->numItems / numBuffers; // number of items in one batch element.
20 
21  store = new DataStore(numBuffers, numItems, numLevels, isSparse);
22 
23  mBorderExchanger = new BorderExchange(mpiBlock, cube->loc);
24 
25  mpiRequestsBuffer = new RingBuffer<std::vector<MPI_Request>>(numLevels, 1);
26  for (int l = 0; l < numLevels; l++) {
27  auto *v = mpiRequestsBuffer->getBuffer(l, 0);
28  v->clear();
29  v->reserve((NUM_NEIGHBORHOOD - 1) * numBuffers);
30  }
31 }
32 
33 Publisher::~Publisher() {
34  for (int l = 0; l < mpiRequestsBuffer->getNumLevels(); l++) {
35  wait(l);
36  }
37  delete mpiRequestsBuffer;
38  delete store;
39  delete mBorderExchanger;
40 }
41 
42 void Publisher::checkpointDataStore(
43  Checkpointer *checkpointer,
44  char const *objectName,
45  char const *bufferName) {
46  bool registerSucceeded = checkpointer->registerCheckpointEntry(
47  std::make_shared<CheckpointEntryDataStore>(
48  objectName, bufferName, checkpointer->getMPIBlock(), store, &mLayerCube->loc),
49  false /*not constant*/);
50 }
51 
52 void Publisher::updateAllActiveIndices() {
53  if (store->isSparse()) {
54  for (int l = 0; l < store->getNumLevels(); l++) {
55  updateActiveIndices(l);
56  }
57  }
58 }
59 
61  wait(delay);
62  return store->createCube(mLayerCube->loc, delay);
63 }
64 
65 void Publisher::updateActiveIndices(int delay) {
66  if (store->isSparse()) {
67  for (int b = 0; b < store->getNumBuffers(); b++) {
68  // Active indicies stored as local extended values
69  if (*store->numActiveBuffer(b, delay) < 0L) {
70  store->updateActiveIndices(b, delay);
71  }
72  pvAssert(*store->numActiveBuffer(b, delay) >= 0L);
73  }
74  }
75 }
76 
77 int Publisher::publish(double lastUpdateTime) {
78  //
79  // Everyone publishes border region to neighbors even if no subscribers.
80  // This means that everyone should wait as well.
81  //
82 
83  size_t dataSize = mLayerCube->numItems * sizeof(float);
84 
85  float const *sendBuf = mLayerCube->data;
86  float *recvBuf = recvBuffer(0); // Grab all of the buffer, allocated continuously
87 
88  memcpy(recvBuf, sendBuf, dataSize);
89  exchangeBorders(&mLayerCube->loc, 0);
90  store->setLastUpdateTime(0 /*bufferId*/, lastUpdateTime);
91 
92  for (int b = 0; b < store->getNumBuffers(); b++) {
93  store->markActiveIndicesOutOfSync(b, 0);
94  }
95  // Updating active indices is done after MPI wait in HyPerCol
96  // to avoid race condition because exchangeBorders mpi is async
97 
98  return PV_SUCCESS;
99 }
100 
101 void Publisher::copyForward(double lastUpdateTime) {
102  if (store->getNumLevels() > 1) {
103  float *recvBuf = recvBuffer(0); // Grab all of the buffer, allocated continuously
104  size_t dataSize = mLayerCube->numItems * sizeof(float);
105  memcpy(recvBuf, recvBuffer(0 /*bufferId*/, 1), dataSize);
106  store->setLastUpdateTime(0 /*bufferId*/, lastUpdateTime);
107  updateActiveIndices(0); // alternately, could copy active indices forward as well.
108  }
109 }
110 
111 int Publisher::exchangeBorders(const PVLayerLoc *loc, int delay /*default 0*/) {
112  PVHalo const *halo = &loc->halo;
113  if (halo->lt == 0 && halo->rt == 0 && halo->dn == 0 && halo->up == 0) {
114  return PV_SUCCESS;
115  }
116  int status = PV_SUCCESS;
117 
118 #ifdef PV_USE_MPI
119  auto *requestsVector = mpiRequestsBuffer->getBuffer(delay, 0);
120  pvAssert(requestsVector->empty());
121 
122  // Loop through batch.
123  // The loop over batch elements probably belongs inside
124  // BorderExchange::exchange(), but for this to happen, exchange() would need
125  // to know how its data argument is organized with respect to batching.
126  int exchangeVectorSize = 2 * (mBorderExchanger->getNumNeighbors() - 1);
127  for (int b = 0; b < loc->nbatch; b++) {
128  // don't send interior
129  pvAssert(requestsVector->size() == b * exchangeVectorSize);
130 
131  float *data = recvBuffer(b, delay);
132  std::vector<MPI_Request> batchElementMPIRequest{};
133  mBorderExchanger->exchange(data, batchElementMPIRequest);
134  pvAssert(batchElementMPIRequest.size() == exchangeVectorSize);
135  requestsVector->insert(
136  requestsVector->end(), batchElementMPIRequest.begin(), batchElementMPIRequest.end());
137  pvAssert(requestsVector->size() == (b + 1) * exchangeVectorSize);
138  }
139 
140 #endif // PV_USE_MPI
141 
142  return status;
143 }
144 
145 int Publisher::isExchangeFinished(int delay /* default 0*/) {
146  bool isReady;
147  auto *requestsVector = mpiRequestsBuffer->getBuffer(delay, 0);
148  if (requestsVector->empty()) {
149  isReady = true;
150  }
151  else {
152  int test;
153  MPI_Testall((int)requestsVector->size(), requestsVector->data(), &test, MPI_STATUSES_IGNORE);
154  if (test) {
155  requestsVector->clear();
156  updateActiveIndices(delay);
157  }
158  isReady = (bool)test;
159  }
160  return isReady;
161 }
162 
166 int Publisher::wait(int delay /*default 0*/) {
167 #ifdef DEBUG_OUTPUT
168  InfoLog().printf(
169  "[%2d]: waiting for data, num_requests==%d\n", mBorderExchanger->getRank(), numRemote);
170  InfoLog().flush();
171 #endif // DEBUG_OUTPUT
172 
173  auto *requestsVector = mpiRequestsBuffer->getBuffer(delay, 0);
174  if (!requestsVector->empty()) {
175  mBorderExchanger->wait(*requestsVector);
176  pvAssert(requestsVector->empty());
177  }
178  updateActiveIndices(delay);
179 
180  return 0;
181 }
182 
183 void Publisher::increaseTimeLevel() {
184  wait(mpiRequestsBuffer->getNumLevels() - 1);
185  mpiRequestsBuffer->newLevel();
186  store->newLevelIndex();
187 }
188 
189 } /* namespace PV */
PVLayerCube createCube(int delay=0)
Definition: Publisher.cpp:60
int wait(int delay=0)
Definition: Publisher.cpp:166
PVLayerCube createCube(PVLayerLoc const &loc, int delay)
Definition: DataStore.cpp:68
int publish(double lastUpdateTime)
Definition: Publisher.cpp:77
void copyForward(double lastUpdateTime)
Definition: Publisher.cpp:101