PresynapticPerspectiveStochasticDelivery.cpp
/*
 * PresynapticPerspectiveStochasticDelivery.cpp
 *
 *  Created on: Aug 24, 2017
 *      Author: Pete Schultz
 */

#include "PresynapticPerspectiveStochasticDelivery.hpp"
#include "columns/HyPerCol.hpp"

// Note: there is a lot of code duplication between PresynapticPerspectiveConvolveDelivery
// and PresynapticPerspectiveStochasticDelivery.

namespace PV {

PresynapticPerspectiveStochasticDelivery::PresynapticPerspectiveStochasticDelivery(
      char const *name,
      HyPerCol *hc) {
   initialize(name, hc);
}

PresynapticPerspectiveStochasticDelivery::PresynapticPerspectiveStochasticDelivery() {}

PresynapticPerspectiveStochasticDelivery::~PresynapticPerspectiveStochasticDelivery() {
   delete mRandState;
}

int PresynapticPerspectiveStochasticDelivery::initialize(char const *name, HyPerCol *hc) {
   return BaseObject::initialize(name, hc);
}

void PresynapticPerspectiveStochasticDelivery::setObjectType() {
   mObjectType = "PresynapticPerspectiveStochasticDelivery";
}

int PresynapticPerspectiveStochasticDelivery::ioParamsFillGroup(enum ParamsIOFlag ioFlag) {
   int status = HyPerDelivery::ioParamsFillGroup(ioFlag);
   return status;
}

// receiveGpu: always false for this class; the receiveGpu = true case is handled by
// the PresynapticPerspectiveGPUDelivery class.
void PresynapticPerspectiveStochasticDelivery::ioParam_receiveGpu(enum ParamsIOFlag ioFlag) {
   mReceiveGpu = false; // If it's true, we should be using a different class.
}

Response::Status PresynapticPerspectiveStochasticDelivery::communicateInitInfo(
      std::shared_ptr<CommunicateInitInfoMessage const> message) {
   auto status = HyPerDelivery::communicateInitInfo(message);
   if (!Response::completed(status)) {
      return status;
   }
   // HyPerDelivery::communicateInitInfo() postpones until mWeightsPair communicates.
   pvAssert(mWeightsPair and mWeightsPair->getInitInfoCommunicatedFlag());
   mWeightsPair->needPre();
   return Response::SUCCESS;
}

Response::Status PresynapticPerspectiveStochasticDelivery::allocateDataStructures() {
   auto status = HyPerDelivery::allocateDataStructures();
   if (!Response::completed(status)) {
      return status;
   }
   allocateThreadGSyn();
   allocateRandState();
   return Response::SUCCESS;
}

void PresynapticPerspectiveStochasticDelivery::allocateThreadGSyn() {
   // If multithreaded, allocate a GSyn buffer for each thread, to avoid collisions.
   int const numThreads = parent->getNumThreads();
   if (numThreads > 1) {
      mThreadGSyn.resize(numThreads);
      // mThreadGSyn is only a buffer for one batch element. We're threading over presynaptic
      // neuron index, not batch element; so batch elements will be processed serially.
      for (auto &th : mThreadGSyn) {
         th.resize(mPostLayer->getNumNeurons());
      }
   }
}
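
// The sketch below (not part of PetaVision) illustrates the scratch-buffer pattern that
// allocateThreadGSyn() sets up and deliver() uses: each OpenMP thread accumulates into a
// private buffer, and the buffers are then reduced serially into the shared output.
// The names accumulate, target, and contrib are hypothetical stand-ins for illustration.
//
//    #include <cstddef>
//    #include <omp.h>
//    #include <vector>
//
//    void accumulate(std::vector<float> &gSyn,
//                    std::vector<int> const &target,    // post index per pre neuron
//                    std::vector<float> const &contrib) // input per pre neuron
//    {
//       std::vector<std::vector<float>> scratch(
//             omp_get_max_threads(), std::vector<float>(gSyn.size(), 0.0f));
//    #pragma omp parallel for
//       for (int k = 0; k < (int)target.size(); ++k) {
//          // Two pre neurons may map to the same post index; per-thread buffers
//          // make the concurrent += race-free.
//          scratch[omp_get_thread_num()][target[k]] += contrib[k];
//       }
//       for (auto const &buf : scratch) { // serial reduction, as in deliver() below
//          for (std::size_t n = 0; n < gSyn.size(); ++n) {
//             gSyn[n] += buf[n];
//          }
//       }
//    }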

void PresynapticPerspectiveStochasticDelivery::allocateRandState() {
   // One RNG per presynaptic neuron, allocated over the extended (halo-including)
   // frame, since the delivery loops index the RNGs by extended index kPreExt.
   mRandState = new Random(mPreLayer->getLayerLoc(), true /*need RNGs in the extended buffer*/);
}

void PresynapticPerspectiveStochasticDelivery::deliver() {
   // Check if we need to update based on connection's channel
   if (getChannelCode() == CHANNEL_NOUPDATE) {
      return;
   }
   float *postChannel = mPostLayer->getChannel(getChannelCode());
   pvAssert(postChannel);

   PVLayerLoc const *preLoc = mPreLayer->getLayerLoc();
   PVLayerLoc const *postLoc = mPostLayer->getLayerLoc();
   Weights *weights = mWeightsPair->getPreWeights();

   int const nxPreExtended = preLoc->nx + preLoc->halo.lt + preLoc->halo.rt;
   int const nyPreExtended = preLoc->ny + preLoc->halo.dn + preLoc->halo.up;
   int const numPreExtended = nxPreExtended * nyPreExtended * preLoc->nf;

   int const numPostRestricted = postLoc->nx * postLoc->ny * postLoc->nf;

   int nbatch = preLoc->nbatch;
   pvAssert(nbatch == postLoc->nbatch);

   const int sy = postLoc->nx * postLoc->nf; // stride in restricted layer
   const int syw = weights->getGeometry()->getPatchStrideY(); // stride in patch

   bool const preLayerIsSparse = mPreLayer->getSparseFlag();

   int numAxonalArbors = mArborList->getNumAxonalArbors();
   for (int arbor = 0; arbor < numAxonalArbors; arbor++) {
      int delay = mArborList->getDelay(arbor);
      PVLayerCube activityCube = mPreLayer->getPublisher()->createCube(delay);

      for (int b = 0; b < nbatch; b++) {
         size_t batchOffset = b * numPreExtended;
         float *activityBatch = activityCube.data + batchOffset;
         float *gSynPatchHeadBatch = postChannel + b * numPostRestricted;
         SparseList<float>::Entry const *activeIndicesBatch = NULL;
         if (preLayerIsSparse) {
            activeIndicesBatch =
                  (SparseList<float>::Entry *)activityCube.activeIndices + batchOffset;
         }

         int numNeurons =
               preLayerIsSparse ? activityCube.numActive[b] : mPreLayer->getNumExtended();

#ifdef PV_USE_OPENMP_THREADS
         // Clear all thread gsyn buffers.
         if (!mThreadGSyn.empty()) {
#pragma omp parallel for schedule(static)
            for (int ti = 0; ti < parent->getNumThreads(); ++ti) {
               for (int ni = 0; ni < numPostRestricted; ++ni) {
                  mThreadGSyn[ti][ni] = 0.0f;
               }
            }
         }
#endif

         std::size_t const *gSynPatchStart = weights->getGeometry()->getGSynPatchStart().data();
         if (!preLayerIsSparse) {
            for (int y = 0; y < weights->getPatchSizeY(); y++) {
#ifdef PV_USE_OPENMP_THREADS
#pragma omp parallel for schedule(guided)
#endif
               for (int idx = 0; idx < numNeurons; idx++) {
                  int kPreExt = idx;

                  // Weight
                  Patch const *patch = &weights->getPatch(kPreExt);

                  if (y >= patch->ny) {
                     continue;
                  }

                  // Activity
                  float a = activityBatch[kPreExt];
                  if (a == 0.0f) {
                     continue;
                  }
                  a *= mDeltaTimeFactor;

                  // gSyn
                  float *gSynPatchHead = gSynPatchHeadBatch;

#ifdef PV_USE_OPENMP_THREADS
                  if (!mThreadGSyn.empty()) {
                     gSynPatchHead = mThreadGSyn[omp_get_thread_num()].data();
                  }
#endif // PV_USE_OPENMP_THREADS

                  float *postPatchStart = &gSynPatchHead[gSynPatchStart[kPreExt]];

                  const int nk = patch->nx * weights->getPatchSizeF();
                  float const *weightDataHead = weights->getDataFromPatchIndex(arbor, kPreExt);
                  float const *weightDataStart = &weightDataHead[patch->offset];
                  taus_uint4 *rng = mRandState->getRNG(kPreExt);
                  long along = (long)((double)a * cl_random_max());

                  float *v = postPatchStart + y * sy;
                  float const *weightValues = weightDataStart + y * syw;
                  // Each weight in this row is added in full with probability
                  // proportional to a; the expected increment is a * weightValues[k].
                  for (int k = 0; k < nk; k++) {
                     *rng = cl_random_get(*rng);
                     v[k] += (rng->s0 < along) * weightValues[k];
                  }
               }
            }
         }
         else { // Sparse, use the stored activity / index pairs
            int const nyp = weights->getPatchSizeY();
            for (int y = 0; y < nyp; y++) {
#ifdef PV_USE_OPENMP_THREADS
#pragma omp parallel for schedule(guided)
#endif
               for (int idx = 0; idx < numNeurons; idx++) {
                  int kPreExt = activeIndicesBatch[idx].index;

                  // Weight
                  Patch const *patch = &weights->getPatch(kPreExt);

                  if (y >= patch->ny) {
                     continue;
                  }

                  // Activity
                  float a = activeIndicesBatch[idx].value;
                  if (a == 0.0f) {
                     continue;
                  }
                  a *= mDeltaTimeFactor;

                  // gSyn
                  float *gSynPatchHead = gSynPatchHeadBatch;

#ifdef PV_USE_OPENMP_THREADS
                  if (!mThreadGSyn.empty()) {
                     gSynPatchHead = mThreadGSyn[omp_get_thread_num()].data();
                  }
#endif // PV_USE_OPENMP_THREADS

                  float *postPatchStart = &gSynPatchHead[gSynPatchStart[kPreExt]];

                  const int nk = patch->nx * weights->getPatchSizeF();
                  float const *weightDataHead = weights->getDataFromPatchIndex(arbor, kPreExt);
                  float const *weightDataStart = &weightDataHead[patch->offset];
                  taus_uint4 *rng = mRandState->getRNG(kPreExt);
                  long along = (long)((double)a * cl_random_max());

                  float *v = postPatchStart + y * sy;
                  float const *weightValues = weightDataStart + y * syw;
                  for (int k = 0; k < nk; k++) {
                     *rng = cl_random_get(*rng);
                     v[k] += (rng->s0 < along) * weightValues[k];
                  }
               }
            }
         }
#ifdef PV_USE_OPENMP_THREADS
         // Accumulate back into gSyn. Should this be done in HyPerLayer where it can be done once,
         // as opposed to once per connection?
         if (!mThreadGSyn.empty()) {
            float *gSynPatchHead = gSynPatchHeadBatch;
            int numNeurons = mPostLayer->getNumNeurons();
            for (int ti = 0; ti < parent->getNumThreads(); ti++) {
               float *onethread = mThreadGSyn[ti].data();
               // Looping over neurons is thread safe
#pragma omp parallel for
               for (int ni = 0; ni < numNeurons; ni++) {
                  gSynPatchHead[ni] += onethread[ni];
               }
            }
         }
#endif // PV_USE_OPENMP_THREADS
      }
   }
#ifdef PV_USE_CUDA
   // CPU updated GSyn, now need to update GSyn on GPU
   mPostLayer->setUpdatedDeviceGSynFlag(true);
#endif // PV_USE_CUDA
}

void PresynapticPerspectiveStochasticDelivery::deliverUnitInput(float *recvBuffer) {
   PVLayerLoc const *preLoc = mPreLayer->getLayerLoc();
   PVLayerLoc const *postLoc = mPostLayer->getLayerLoc();
   Weights *weights = mWeightsPair->getPreWeights();

   int const numPostRestricted = postLoc->nx * postLoc->ny * postLoc->nf;

   int nbatch = postLoc->nbatch;

   const int sy = postLoc->nx * postLoc->nf; // stride in restricted layer
   const int syw = weights->getGeometry()->getPatchStrideY(); // stride in patch

   int const numAxonalArbors = mArborList->getNumAxonalArbors();
   for (int arbor = 0; arbor < numAxonalArbors; arbor++) {
      int delay = mArborList->getDelay(arbor);
      PVLayerCube activityCube = mPreLayer->getPublisher()->createCube(delay);

      for (int b = 0; b < nbatch; b++) {
         float *recvBatch = recvBuffer + b * numPostRestricted;
         int numNeurons = mPreLayer->getNumExtended();

#ifdef PV_USE_OPENMP_THREADS
         // Clear all thread gsyn buffers.
         if (!mThreadGSyn.empty()) {
#pragma omp parallel for schedule(static)
            for (int ti = 0; ti < parent->getNumThreads(); ++ti) {
               for (int ni = 0; ni < numPostRestricted; ++ni) {
                  mThreadGSyn[ti][ni] = 0.0f;
               }
            }
         }
#endif

         std::size_t const *gSynPatchStart = weights->getGeometry()->getGSynPatchStart().data();
         for (int y = 0; y < weights->getPatchSizeY(); y++) {
#ifdef PV_USE_OPENMP_THREADS
#pragma omp parallel for schedule(guided)
#endif
            for (int idx = 0; idx < numNeurons; idx++) {
               int kPreExt = idx;

               // Weight
               Patch const *patch = &weights->getPatch(kPreExt);

               if (y >= patch->ny) {
                  continue;
               }

               // Activity: unit input, so only the timestep scaling applies.
               float a = mDeltaTimeFactor;

               // gSyn
               float *recvPatchHead = recvBatch;

#ifdef PV_USE_OPENMP_THREADS
               if (!mThreadGSyn.empty()) {
                  recvPatchHead = mThreadGSyn[omp_get_thread_num()].data();
               }
#endif // PV_USE_OPENMP_THREADS

               float *postPatchStart = &recvPatchHead[gSynPatchStart[kPreExt]];

               const int nk = patch->nx * weights->getPatchSizeF();
               float const *weightDataHead = weights->getDataFromPatchIndex(arbor, kPreExt);
               float const *weightDataStart = &weightDataHead[patch->offset];
               taus_uint4 *rng = mRandState->getRNG(kPreExt);
               // Deliver each weight with probability proportional to a, as in deliver().
               long along = (long)((double)a * cl_random_max());

               float *v = postPatchStart + y * sy;
               float const *weightValues = weightDataStart + y * syw;
               for (int k = 0; k < nk; k++) {
                  *rng = cl_random_get(*rng);
                  v[k] += (rng->s0 < along) * weightValues[k];
               }
            }
         }
#ifdef PV_USE_OPENMP_THREADS
         // Accumulate back into gSyn. Should this be done in HyPerLayer where it can be done once,
         // as opposed to once per connection?
         if (!mThreadGSyn.empty()) {
            float *recvPatchHead = recvBatch;
            int numNeurons = mPostLayer->getNumNeurons();
            for (int ti = 0; ti < parent->getNumThreads(); ti++) {
               float *onethread = mThreadGSyn[ti].data();
               // Looping over neurons is thread safe
#pragma omp parallel for
               for (int ni = 0; ni < numNeurons; ni++) {
                  recvPatchHead[ni] += onethread[ni];
               }
            }
         }
#endif // PV_USE_OPENMP_THREADS
      }
   }
}

} // end namespace PV
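
Worked note on the stochastic accumulation: in the delivery loops above, along = a * cl_random_max() and s0 is a uniform draw on [0, cl_random_max()], so for a in [0, 1] the full weight is delivered with probability a, and the delivered input matches the convolution a * w in expectation. Below is a minimal self-contained sketch of the same thresholding idea, with std::mt19937 standing in for PetaVision's taus_uint4 generator (values and names are illustrative only, not PetaVision API):

#include <cstdint>
#include <cstdio>
#include <random>

int main() {
   float const a = 0.25f; // presynaptic activity after mDeltaTimeFactor scaling, in [0, 1]
   float const w = 2.0f;  // synaptic weight

   std::mt19937 rng(42);
   // Same comparison as the delivery loops: a draw below a * (RNG max) delivers w in full.
   auto const threshold = static_cast<std::uint64_t>(
         static_cast<double>(a) * static_cast<double>(std::mt19937::max()));

   double sum = 0.0;
   int const trials = 1000000;
   for (int t = 0; t < trials; ++t) {
      sum += (rng() < threshold) ? w : 0.0f;
   }
   // Prints a mean near a * w = 0.5.
   std::printf("mean delivered = %f, expected = %f\n", sum / trials, (double)a * w);
   return 0;
}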