8 #include "Publisher.hpp" 9 #include "checkpointing/CheckpointEntryDataStore.hpp" 10 #include "include/pv_common.h" 11 #include "utils/PVAssert.hpp" 15 Publisher::Publisher(MPIBlock
const &mpiBlock,
PVLayerCube *cube,
int numLevels,
bool isSparse) {
// Constructor: stores the layer cube pointer and allocates the three helper
// objects used by this class -- the DataStore, the BorderExchange, and a ring
// buffer of MPI request vectors.
16 this->mLayerCube = cube;
// The cube's total item count is divided by the batch size to get the
// per-buffer item count. Integer division: assumes nbatch is nonzero and
// divides numItems evenly -- TODO confirm.
18 int const numBuffers = cube->loc.nbatch;
19 int const numItems = cube->numItems / numBuffers;
21 store =
new DataStore(numBuffers, numItems, numLevels, isSparse);
23 mBorderExchanger =
new BorderExchange(mpiBlock, cube->loc);
// Ring buffer constructed with numLevels levels and a second argument of 1;
// every access in this file uses buffer index 0.
25 mpiRequestsBuffer =
new RingBuffer<std::vector<MPI_Request>>(numLevels, 1);
// Pre-reserve capacity in the request vector of each level.
// NOTE(review): exchangeBorders() appends 2 * (getNumNeighbors() - 1)
// requests per batch element. If NUM_NEIGHBORHOOD equals getNumNeighbors(),
// this reserve covers only half of that -- confirm whether the factor of 2
// was meant to be included here.
26 for (
int l = 0; l < numLevels; l++) {
27 auto *v = mpiRequestsBuffer->getBuffer(l, 0);
29 v->reserve((NUM_NEIGHBORHOOD - 1) * numBuffers);
// Destructor: loops over every level of the MPI request ring buffer, then
// deletes the ring buffer and the border exchanger.
// NOTE(review): the loop body is not visible in this excerpt.
33 Publisher::~Publisher() {
34 for (
int l = 0; l < mpiRequestsBuffer->getNumLevels(); l++) {
// NOTE(review): the constructor also allocates `store` with new, but no
// `delete store` is visible among these lines -- confirm it is released.
37 delete mpiRequestsBuffer;
39 delete mBorderExchanger;
// Registers this publisher's data store with the given checkpointer.
//   checkpointer - receives the new checkpoint entry
//   objectName   - forwarded to the CheckpointEntryDataStore constructor
//   bufferName   - forwarded to the CheckpointEntryDataStore constructor
42 void Publisher::checkpointDataStore(
43 Checkpointer *checkpointer,
44 char const *objectName,
45 char const *bufferName) {
// The entry wraps `store` together with the checkpointer's MPI block and the
// layer cube's loc. The rest of the registerCheckpointEntry call and any use
// of registerSucceeded are not visible in this excerpt.
46 bool registerSucceeded = checkpointer->registerCheckpointEntry(
47 std::make_shared<CheckpointEntryDataStore>(
48 objectName, bufferName, checkpointer->getMPIBlock(), store, &mLayerCube->loc),
// For a sparse store, calls updateActiveIndices(l) for each level index l in
// [0, store->getNumLevels()). No other work is visible in this excerpt.
52 void Publisher::updateAllActiveIndices() {
53 if (store->isSparse()) {
54 for (
int l = 0; l < store->getNumLevels(); l++) {
55 updateActiveIndices(l);
// Tail of a function whose header is not visible in this excerpt (presumably
// Publisher::createCube(int delay) -- TODO confirm). Returns whatever
// store->createCube yields for the layer cube's loc and the given delay.
62 return store->
createCube(mLayerCube->loc, delay);
// For a sparse store, brings the active indices of every buffer at the given
// delay level up to date.
//   delay - level index passed through to the store
65 void Publisher::updateActiveIndices(
int delay) {
66 if (store->isSparse()) {
67 for (
int b = 0; b < store->getNumBuffers(); b++) {
// A negative active count triggers a recomputation; presumably that is how
// markActiveIndicesOutOfSync flags a stale buffer -- TODO confirm.
69 if (*store->numActiveBuffer(b, delay) < 0L) {
70 store->updateActiveIndices(b, delay);
// The active count is asserted to be non-negative at this point.
72 pvAssert(*store->numActiveBuffer(b, delay) >= 0L);
// Body of a function whose header is not visible in this excerpt; it reads
// `lastUpdateTime` (presumably Publisher::publish -- TODO confirm).
// Copies the layer cube's data into the store, calls exchangeBorders for
// delay 0, records the update time, and marks the active indices as stale.
83 size_t dataSize = mLayerCube->numItems *
sizeof(float);
85 float const *sendBuf = mLayerCube->data;
86 float *recvBuf = recvBuffer(0);
// Copies numItems floats from the cube into the buffer from recvBuffer(0).
88 memcpy(recvBuf, sendBuf, dataSize);
89 exchangeBorders(&mLayerCube->loc, 0);
90 store->setLastUpdateTime(0 , lastUpdateTime);
// Mark every buffer at delay 0 so that its active indices get recomputed.
92 for (
int b = 0; b < store->getNumBuffers(); b++) {
93 store->markActiveIndicesOutOfSync(b, 0);
// Body of a function whose header is not visible in this excerpt; it reads
// `lastUpdateTime` (presumably Publisher::copyForward -- TODO confirm).
// NOTE(review): the closing brace of this `if` is not visible, so which of
// the statements below it guards cannot be determined from here.
102 if (store->getNumLevels() > 1) {
103 float *recvBuf = recvBuffer(0);
104 size_t dataSize = mLayerCube->numItems *
sizeof(float);
// Copies numItems floats from recvBuffer(0, 1) into recvBuffer(0); the second
// argument is presumably the delay level, as in exchangeBorders -- TODO confirm.
105 memcpy(recvBuf, recvBuffer(0 , 1), dataSize);
106 store->setLastUpdateTime(0 , lastUpdateTime);
107 updateActiveIndices(0);
// Issues a border exchange for every batch element at the given delay level
// and collects the resulting MPI requests in that level's request vector.
//   loc   - layer geometry; its halo and nbatch fields are read here
//   delay - level index selecting both the data buffer and the request vector
// The return statement is not visible in this excerpt; `status` is
// initialized to PV_SUCCESS.
111 int Publisher::exchangeBorders(
const PVLayerLoc *loc,
int delay ) {
112 PVHalo const *halo = &loc->halo;
// All four halo widths are zero. The body of this branch is not visible;
// presumably it returns early because there is nothing to exchange -- TODO confirm.
113 if (halo->lt == 0 && halo->rt == 0 && halo->dn == 0 && halo->up == 0) {
116 int status = PV_SUCCESS;
// The request vector for this level must be empty before new requests are added.
119 auto *requestsVector = mpiRequestsBuffer->getBuffer(delay, 0);
120 pvAssert(requestsVector->empty());
// Number of requests each exchange() call is expected to produce.
126 int exchangeVectorSize = 2 * (mBorderExchanger->getNumNeighbors() - 1);
127 for (
int b = 0; b < loc->nbatch; b++) {
// NOTE(review): these assertions compare size_t against int expressions.
129 pvAssert(requestsVector->size() == b * exchangeVectorSize);
131 float *data = recvBuffer(b, delay);
// Requests for this batch element are gathered in a temporary vector, checked
// for the expected count, then appended to the level's request vector.
132 std::vector<MPI_Request> batchElementMPIRequest{};
133 mBorderExchanger->exchange(data, batchElementMPIRequest);
134 pvAssert(batchElementMPIRequest.size() == exchangeVectorSize);
135 requestsVector->insert(
136 requestsVector->end(), batchElementMPIRequest.begin(), batchElementMPIRequest.end());
137 pvAssert(requestsVector->size() == (b + 1) * exchangeVectorSize);
// Tests, without blocking, whether the requests stored for the given delay
// level have completed.
//   delay - level index selecting the request vector
// The declarations of `test` and `isReady` and the return statement are not
// visible in this excerpt.
145 int Publisher::isExchangeFinished(
int delay ) {
147 auto *requestsVector = mpiRequestsBuffer->getBuffer(delay, 0);
// No requests stored for this level; the body of this branch is not visible.
148 if (requestsVector->empty()) {
// MPI_Testall writes its completion flag into `test`; statuses are ignored.
153 MPI_Testall((
int)requestsVector->size(), requestsVector->data(), &test, MPI_STATUSES_IGNORE);
// NOTE(review): the condition guarding the next two statements is not visible
// (presumably they run only when `test` is set -- TODO confirm).
155 requestsVector->clear();
156 updateActiveIndices(delay);
158 isReady = (bool)test;
// Tail of a function whose header is not visible in this excerpt; it reads
// `delay` (presumably Publisher::wait(int delay) -- TODO confirm).
// The first line holds the arguments of a message inside a DEBUG_OUTPUT
// conditional; the call they belong to is not visible.
169 "[%2d]: waiting for data, num_requests==%d\n", mBorderExchanger->getRank(), numRemote);
171 #endif // DEBUG_OUTPUT 173 auto *requestsVector = mpiRequestsBuffer->getBuffer(delay, 0);
// If requests are stored for this level, hand them to the border exchanger's
// wait(); the vector is asserted to be empty afterwards.
174 if (!requestsVector->empty()) {
175 mBorderExchanger->wait(*requestsVector);
176 pvAssert(requestsVector->empty());
178 updateActiveIndices(delay);
// Advances the request ring buffer and the data store by one level.
183 void Publisher::increaseTimeLevel() {
// Wait on the highest level index first; presumably that slot is the one
// newLevel() is about to reuse -- TODO confirm against RingBuffer.
184 wait(mpiRequestsBuffer->getNumLevels() - 1);
185 mpiRequestsBuffer->newLevel();
186 store->newLevelIndex();
PVLayerCube createCube(int delay=0)
PVLayerCube createCube(PVLayerLoc const &loc, int delay)
int publish(double lastUpdateTime)
void copyForward(double lastUpdateTime)