8 #include "Publisher.hpp"     9 #include "checkpointing/CheckpointEntryDataStore.hpp"    10 #include "include/pv_common.h"    11 #include "utils/PVAssert.hpp"    15 Publisher::Publisher(MPIBlock 
const &mpiBlock, 
PVLayerCube *cube, 
int numLevels, 
bool isSparse) {
    16    this->mLayerCube = cube;
    18    int const numBuffers = cube->loc.nbatch;
    19    int const numItems   = cube->numItems / numBuffers; 
    21    store = 
new DataStore(numBuffers, numItems, numLevels, isSparse);
    23    mBorderExchanger = 
new BorderExchange(mpiBlock, cube->loc);
    25    mpiRequestsBuffer = 
new RingBuffer<std::vector<MPI_Request>>(numLevels, 1);
    26    for (
int l = 0; l < numLevels; l++) {
    27       auto *v = mpiRequestsBuffer->getBuffer(l, 0);
    29       v->reserve((NUM_NEIGHBORHOOD - 1) * numBuffers);
    33 Publisher::~Publisher() {
    34    for (
int l = 0; l < mpiRequestsBuffer->getNumLevels(); l++) {
    37    delete mpiRequestsBuffer;
    39    delete mBorderExchanger;
    42 void Publisher::checkpointDataStore(
    43       Checkpointer *checkpointer,
    44       char const *objectName,
    45       char const *bufferName) {
    46    bool registerSucceeded = checkpointer->registerCheckpointEntry(
    47          std::make_shared<CheckpointEntryDataStore>(
    48                objectName, bufferName, checkpointer->getMPIBlock(), store, &mLayerCube->loc),
    52 void Publisher::updateAllActiveIndices() {
    53    if (store->isSparse()) {
    54       for (
int l = 0; l < store->getNumLevels(); l++) {
    55          updateActiveIndices(l);
    62    return store->
createCube(mLayerCube->loc, delay);
    65 void Publisher::updateActiveIndices(
int delay) {
    66    if (store->isSparse()) {
    67       for (
int b = 0; b < store->getNumBuffers(); b++) {
    69          if (*store->numActiveBuffer(b, delay) < 0L) {
    70             store->updateActiveIndices(b, delay);
    72          pvAssert(*store->numActiveBuffer(b, delay) >= 0L);
    83    size_t dataSize = mLayerCube->numItems * 
sizeof(float);
    85    float const *sendBuf = mLayerCube->data;
    86    float *recvBuf       = recvBuffer(0); 
    88    memcpy(recvBuf, sendBuf, dataSize);
    89    exchangeBorders(&mLayerCube->loc, 0);
    90    store->setLastUpdateTime(0 , lastUpdateTime);
    92    for (
int b = 0; b < store->getNumBuffers(); b++) {
    93       store->markActiveIndicesOutOfSync(b, 0);
   102    if (store->getNumLevels() > 1) {
   103       float *recvBuf  = recvBuffer(0); 
   104       size_t dataSize = mLayerCube->numItems * 
sizeof(float);
   105       memcpy(recvBuf, recvBuffer(0 , 1), dataSize);
   106       store->setLastUpdateTime(0 , lastUpdateTime);
   107       updateActiveIndices(0); 
   111 int Publisher::exchangeBorders(
const PVLayerLoc *loc, 
int delay ) {
   112    PVHalo const *halo = &loc->halo;
   113    if (halo->lt == 0 && halo->rt == 0 && halo->dn == 0 && halo->up == 0) {
   116    int status = PV_SUCCESS;
   119    auto *requestsVector = mpiRequestsBuffer->getBuffer(delay, 0);
   120    pvAssert(requestsVector->empty());
   126    int exchangeVectorSize = 2 * (mBorderExchanger->getNumNeighbors() - 1);
   127    for (
int b = 0; b < loc->nbatch; b++) {
   129       pvAssert(requestsVector->size() == b * exchangeVectorSize);
   131       float *data = recvBuffer(b, delay);
   132       std::vector<MPI_Request> batchElementMPIRequest{};
   133       mBorderExchanger->exchange(data, batchElementMPIRequest);
   134       pvAssert(batchElementMPIRequest.size() == exchangeVectorSize);
   135       requestsVector->insert(
   136             requestsVector->end(), batchElementMPIRequest.begin(), batchElementMPIRequest.end());
   137       pvAssert(requestsVector->size() == (b + 1) * exchangeVectorSize);
   145 int Publisher::isExchangeFinished(
int delay ) {
   147    auto *requestsVector = mpiRequestsBuffer->getBuffer(delay, 0);
   148    if (requestsVector->empty()) {
   153       MPI_Testall((
int)requestsVector->size(), requestsVector->data(), &test, MPI_STATUSES_IGNORE);
   155          requestsVector->clear();
   156          updateActiveIndices(delay);
   158       isReady = (bool)test;
   169          "[%2d]: waiting for data, num_requests==%d\n", mBorderExchanger->getRank(), numRemote);
   171 #endif // DEBUG_OUTPUT   173    auto *requestsVector = mpiRequestsBuffer->getBuffer(delay, 0);
   174    if (!requestsVector->empty()) {
   175       mBorderExchanger->wait(*requestsVector);
   176       pvAssert(requestsVector->empty());
   178    updateActiveIndices(delay);
   183 void Publisher::increaseTimeLevel() {
   184    wait(mpiRequestsBuffer->getNumLevels() - 1);
   185    mpiRequestsBuffer->newLevel();
   186    store->newLevelIndex();
 PVLayerCube createCube(int delay=0)
PVLayerCube createCube(PVLayerLoc const &loc, int delay)
int publish(double lastUpdateTime)
void copyForward(double lastUpdateTime)