PresynapticPerspectiveConvolveDelivery.cpp
/*
 * PresynapticPerspectiveConvolveDelivery.cpp
 *
 *  Created on: Aug 24, 2017
 *      Author: Pete Schultz
 */

#include "PresynapticPerspectiveConvolveDelivery.hpp"
#include "columns/HyPerCol.hpp"

namespace PV {

PresynapticPerspectiveConvolveDelivery::PresynapticPerspectiveConvolveDelivery(
      char const *name,
      HyPerCol *hc) {
   initialize(name, hc);
}

PresynapticPerspectiveConvolveDelivery::PresynapticPerspectiveConvolveDelivery() {}

PresynapticPerspectiveConvolveDelivery::~PresynapticPerspectiveConvolveDelivery() {}

int PresynapticPerspectiveConvolveDelivery::initialize(char const *name, HyPerCol *hc) {
   return BaseObject::initialize(name, hc);
}

void PresynapticPerspectiveConvolveDelivery::setObjectType() {
   mObjectType = "PresynapticPerspectiveConvolveDelivery";
}

int PresynapticPerspectiveConvolveDelivery::ioParamsFillGroup(enum ParamsIOFlag ioFlag) {
   int status = HyPerDelivery::ioParamsFillGroup(ioFlag);
   return status;
}

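// receiveGpu is always false for this class; the receiveGpu = true case is handled by the
// PresynapticPerspectiveGPUDelivery class.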
void PresynapticPerspectiveConvolveDelivery::ioParam_receiveGpu(enum ParamsIOFlag ioFlag) {
   mReceiveGpu = false; // If it's true, we should be using a different class.
}

Response::Status PresynapticPerspectiveConvolveDelivery::communicateInitInfo(
      std::shared_ptr<CommunicateInitInfoMessage const> message) {
   auto status = HyPerDelivery::communicateInitInfo(message);
   if (!Response::completed(status)) {
      return status;
   }
   // HyPerDelivery::communicateInitInfo() postpones until mWeightsPair communicates.
   pvAssert(mWeightsPair and mWeightsPair->getInitInfoCommunicatedFlag());
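   // Delivering from the presynaptic perspective indexes the weights by presynaptic patch,
   // so ask the WeightsPair to provide its presynaptic-perspective weights.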
   mWeightsPair->needPre();
   return Response::SUCCESS;
}

Response::Status PresynapticPerspectiveConvolveDelivery::allocateDataStructures() {
   auto status = HyPerDelivery::allocateDataStructures();
   if (!Response::completed(status)) {
      return status;
   }
   allocateThreadGSyn();
   return Response::SUCCESS;
}

void PresynapticPerspectiveConvolveDelivery::allocateThreadGSyn() {
   // If multithreaded, allocate a GSyn buffer for each thread, to avoid collisions.
   int const numThreads = parent->getNumThreads();
   if (numThreads > 1) {
      mThreadGSyn.resize(numThreads);
      // mThreadGSyn is only a buffer for one batch element. We're threading over presynaptic
      // neuron index, not batch element; so batch elements will be processed serially.
      for (auto &th : mThreadGSyn) {
         th.resize(mPostLayer->getNumNeurons());
      }
   }
}

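// deliver() accumulates each presynaptic neuron's activity, scaled by its weight patch, into
// the postsynaptic GSyn channel. For each axonal arbor the activity is read from the publisher
// with that arbor's delay; the loops then run over batch elements, patch rows (y), and
// presynaptic neurons. With OpenMP, each thread accumulates into its own mThreadGSyn buffer,
// and the per-thread buffers are summed back into GSyn after each batch element.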
void PresynapticPerspectiveConvolveDelivery::deliver() {
   // Check if we need to update based on connection's channel
   if (getChannelCode() == CHANNEL_NOUPDATE) {
      return;
   }
   float *postChannel = mPostLayer->getChannel(getChannelCode());
   pvAssert(postChannel);

   PVLayerLoc const *preLoc = mPreLayer->getLayerLoc();
   PVLayerLoc const *postLoc = mPostLayer->getLayerLoc();
   Weights *weights = mWeightsPair->getPreWeights();

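   // Presynaptic activity is indexed in extended (halo-padded) coordinates; the postsynaptic
   // GSyn buffer is indexed in restricted coordinates.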
   int const nxPreExtended = preLoc->nx + preLoc->halo.lt + preLoc->halo.rt;
   int const nyPreExtended = preLoc->ny + preLoc->halo.dn + preLoc->halo.up;
   int const numPreExtended = nxPreExtended * nyPreExtended * preLoc->nf;

   int const numPostRestricted = postLoc->nx * postLoc->ny * postLoc->nf;

   int nbatch = preLoc->nbatch;
   pvAssert(nbatch == postLoc->nbatch);

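   // sy advances one row within the restricted postsynaptic buffer; syw advances one row
   // within a weight patch.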
   const int sy = postLoc->nx * postLoc->nf; // stride in restricted layer
   const int syw = weights->getGeometry()->getPatchStrideY(); // stride in patch

   bool const preLayerIsSparse = mPreLayer->getSparseFlag();

   int numAxonalArbors = mArborList->getNumAxonalArbors();
   for (int arbor = 0; arbor < numAxonalArbors; arbor++) {
      int delay = mArborList->getDelay(arbor);
      PVLayerCube activityCube = mPreLayer->getPublisher()->createCube(delay);

      for (int b = 0; b < nbatch; b++) {
         size_t batchOffset = b * numPreExtended;
         float *activityBatch = activityCube.data + batchOffset;
         float *gSynPatchHeadBatch = postChannel + b * numPostRestricted;
         SparseList<float>::Entry const *activeIndicesBatch = NULL;
         if (preLayerIsSparse) {
            activeIndicesBatch =
                  (SparseList<float>::Entry *)activityCube.activeIndices + batchOffset;
         }

         int numNeurons =
               preLayerIsSparse ? activityCube.numActive[b] : mPreLayer->getNumExtended();

#ifdef PV_USE_OPENMP_THREADS
         // Clear each thread's gsyn buffer.
         if (!mThreadGSyn.empty()) {
#pragma omp parallel for schedule(static)
            for (int ti = 0; ti < parent->getNumThreads(); ++ti) {
               for (int ni = 0; ni < numPostRestricted; ++ni) {
                  mThreadGSyn[ti][ni] = 0.0;
               }
            }
         }
#endif

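         // gSynPatchStart[kPreExt] is the offset, within the restricted postsynaptic buffer,
         // of the start of presynaptic neuron kPreExt's patch.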
         std::size_t const *gSynPatchStart = weights->getGeometry()->getGSynPatchStart().data();
         if (!preLayerIsSparse) {
            for (int y = 0; y < weights->getPatchSizeY(); y++) {
#ifdef PV_USE_OPENMP_THREADS
#pragma omp parallel for schedule(guided)
#endif
               for (int idx = 0; idx < numNeurons; idx++) {
                  int kPreExt = idx;

                  // Weight
                  Patch const *patch = &weights->getPatch(kPreExt);

                  if (y >= patch->ny) {
                     continue;
                  }

                  // Activity
                  float a = activityBatch[kPreExt];
                  if (a == 0.0f) {
                     continue;
                  }
                  a *= mDeltaTimeFactor;

                  // gSyn
                  float *gSynPatchHead = gSynPatchHeadBatch;

#ifdef PV_USE_OPENMP_THREADS
                  if (!mThreadGSyn.empty()) {
                     gSynPatchHead = mThreadGSyn[omp_get_thread_num()].data();
                  }
#endif // PV_USE_OPENMP_THREADS

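                  // The patch may be shrunken at the layer boundary; patch->offset, patch->nx,
                  // and patch->ny select the portion of the full patch that lands inside the
                  // postsynaptic restricted layer.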
                  float *postPatchStart = &gSynPatchHead[gSynPatchStart[kPreExt]];

                  const int nk = patch->nx * weights->getPatchSizeF();
                  float const *weightDataHead = weights->getDataFromPatchIndex(arbor, kPreExt);
                  float const *weightDataStart = &weightDataHead[patch->offset];

                  float *v = postPatchStart + y * sy;
                  float const *weightValues = weightDataStart + y * syw;
                  for (int k = 0; k < nk; k++) {
                     v[k] += a * weightValues[k];
                  }
               }
            }
         }
         else { // Sparse, use the stored activity / index pairs
            int const nyp = weights->getPatchSizeY();
            for (int y = 0; y < nyp; y++) {
#ifdef PV_USE_OPENMP_THREADS
#pragma omp parallel for schedule(guided)
#endif
               for (int idx = 0; idx < numNeurons; idx++) {
                  int kPreExt = activeIndicesBatch[idx].index;

                  // Weight
                  Patch const *patch = &weights->getPatch(kPreExt);

                  if (y >= patch->ny) {
                     continue;
                  }

                  // Activity
                  float a = activeIndicesBatch[idx].value;
                  if (a == 0.0f) {
                     continue;
                  }
                  a *= mDeltaTimeFactor;

                  // gSyn
                  float *gSynPatchHead = gSynPatchHeadBatch;

#ifdef PV_USE_OPENMP_THREADS
                  if (!mThreadGSyn.empty()) {
                     gSynPatchHead = mThreadGSyn[omp_get_thread_num()].data();
                  }
#endif // PV_USE_OPENMP_THREADS

                  float *postPatchStart = &gSynPatchHead[gSynPatchStart[kPreExt]];

                  const int nk = patch->nx * weights->getPatchSizeF();
                  float const *weightDataHead = weights->getDataFromPatchIndex(arbor, kPreExt);
                  float const *weightDataStart = &weightDataHead[patch->offset];

                  float *v = postPatchStart + y * sy;
                  float const *weightValues = weightDataStart + y * syw;
                  for (int k = 0; k < nk; k++) {
                     v[k] += a * weightValues[k];
                  }
               }
            }
         }
#ifdef PV_USE_OPENMP_THREADS
         // Accumulate back into gSyn. Should this be done in HyPerLayer where it can be done
         // once, as opposed to once per connection?
         if (!mThreadGSyn.empty()) {
            float *gSynPatchHead = gSynPatchHeadBatch;
            int numNeurons = mPostLayer->getNumNeurons();
            for (int ti = 0; ti < parent->getNumThreads(); ti++) {
               float *onethread = mThreadGSyn[ti].data();
// Looping over neurons is thread safe
#pragma omp parallel for
               for (int ni = 0; ni < numNeurons; ni++) {
                  gSynPatchHead[ni] += onethread[ni];
               }
            }
         }
#endif // PV_USE_OPENMP_THREADS
      }
   }
#ifdef PV_USE_CUDA
   // The CPU updated GSyn, so GSyn on the GPU now needs to be updated.
   mPostLayer->setUpdatedDeviceGSynFlag(true);
#endif // PV_USE_CUDA
}

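// deliverUnitInput() is structured like the dense branch of deliver(), but treats every
// presynaptic neuron's activity as 1, so recvBuffer accumulates the total weight (times
// mDeltaTimeFactor) that each postsynaptic neuron could receive.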
void PresynapticPerspectiveConvolveDelivery::deliverUnitInput(float *recvBuffer) {
   PVLayerLoc const *postLoc = mPostLayer->getLayerLoc();
   Weights *weights = mWeightsPair->getPreWeights();

   int const numPostRestricted = postLoc->nx * postLoc->ny * postLoc->nf;

   int nbatch = postLoc->nbatch;

   const int sy = postLoc->nx * postLoc->nf; // stride in restricted layer
   const int syw = weights->getGeometry()->getPatchStrideY(); // stride in patch

   int numAxonalArbors = mArborList->getNumAxonalArbors();
   for (int arbor = 0; arbor < numAxonalArbors; arbor++) {
      for (int b = 0; b < nbatch; b++) {
         float *recvBatch = recvBuffer + b * numPostRestricted;
         SparseList<float>::Entry const *activeIndicesBatch = NULL;

         int numNeurons = mPreLayer->getNumExtended();

#ifdef PV_USE_OPENMP_THREADS
         // Clear each thread's gsyn buffer.
         if (!mThreadGSyn.empty()) {
#pragma omp parallel for schedule(static)
            for (int ti = 0; ti < parent->getNumThreads(); ++ti) {
               for (int ni = 0; ni < numPostRestricted; ++ni) {
                  mThreadGSyn[ti][ni] = 0.0;
               }
            }
         }
#endif

         std::size_t const *gSynPatchStart = weights->getGeometry()->getGSynPatchStart().data();
         for (int y = 0; y < weights->getPatchSizeY(); y++) {
#ifdef PV_USE_OPENMP_THREADS
#pragma omp parallel for schedule(guided)
#endif
            for (int idx = 0; idx < numNeurons; idx++) {
               int kPreExt = idx;

               // Weight
               Patch const *patch = &weights->getPatch(kPreExt);

               if (y >= patch->ny) {
                  continue;
               }

               // gSyn
               float *recvPatchHead = recvBatch;

#ifdef PV_USE_OPENMP_THREADS
               if (!mThreadGSyn.empty()) {
                  recvPatchHead = mThreadGSyn[omp_get_thread_num()].data();
               }
#endif // PV_USE_OPENMP_THREADS

               float *postPatchStart = &recvPatchHead[gSynPatchStart[kPreExt]];

               const int nk = patch->nx * weights->getPatchSizeF();
               float const *weightDataHead = weights->getDataFromPatchIndex(arbor, kPreExt);
               float const *weightDataStart = &weightDataHead[patch->offset];

               float *v = postPatchStart + y * sy;
               float const *weightValues = weightDataStart + y * syw;
               for (int k = 0; k < nk; k++) {
                  v[k] += mDeltaTimeFactor * weightValues[k];
               }
            }
         }
#ifdef PV_USE_OPENMP_THREADS
         // Accumulate back into gSyn. Should this be done in HyPerLayer where it can be done
         // once, as opposed to once per connection?
         if (!mThreadGSyn.empty()) {
            float *recvPatchHead = recvBatch;
            int numNeurons = mPostLayer->getNumNeurons();
            for (int ti = 0; ti < parent->getNumThreads(); ti++) {
               float *onethread = mThreadGSyn[ti].data();
// Looping over neurons is thread safe
#pragma omp parallel for
               for (int ni = 0; ni < numNeurons; ni++) {
                  recvPatchHead[ni] += onethread[ni];
               }
            }
         }
#endif // PV_USE_OPENMP_THREADS
      }
   }
}

} // end namespace PV