00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #ifndef _REENTRANT
00025 #define _REENTRANT
00026 #endif
00027
00028
00029 #include "UDPSynchronizedClient.h"
00030 #include "SpikeStreamSimulation.h"
00031
00032
00033
00034 #include <pthread.h>
00035
00036
00037 #include <errno.h>
00038 #include <math.h>
00039
00040
00041
00042 #define MESSAGE_BUFFER_SIZE 1048576
00043
00044
00045
00046
00047 #define SPIKE_BUFFER_SIZE 60000
00048
00049
00050 #define STEP_COUNTER_MAX 32768
00051
00052
00053
00054 #define MAX_EXTERNAL_COMPUTE_TIME_US 500000
00055
00056
00057
00058
00059 pthread_mutex_t threadMutex = PTHREAD_MUTEX_INITIALIZER;
00060
00061
00062
00063
00064 void* startThreadFunction(void* udpSyncClient){
00065 ((UDPSynchronizedClient*)udpSyncClient)->run();
00066 }
00067
00068
00069
00070 UDPSynchronizedClient::UDPSynchronizedClient(unsigned int neurGrpWidth, unsigned int neurGrpLength){
00071
00072 neuronGrpWidth = neurGrpWidth;
00073 neuronGrpLength = neurGrpLength;
00074
00075
00076 socketOpen = false;
00077 threadRunning = false;
00078 externalSyncDelay = false;
00079 externalComputeTime_us = 0;
00080 timeStepsNoMessagesCount = 0;
00081
00082
00083 spikeCount = 0;
00084 spikeBuffer = new unsigned int[SPIKE_BUFFER_SIZE];
00085
00086
00087 messageBuffer = new unsigned char[MESSAGE_BUFFER_SIZE];
00088 }
00089
00090
00091
00092 UDPSynchronizedClient::~UDPSynchronizedClient(){
00093 #ifdef MEMORY_DEBUG
00094 cout<<"DESTROYING UDP SYNCHRONIZATION CLIENT"<<endl;
00095 #endif//MEMORY_DEBUG
00096
00097
00098 closeDevice();
00099
00100
00101 delete [] spikeBuffer;
00102 delete [] messageBuffer;
00103 }
00104
00105
00106
00107
00108
00109
00110
00111
00112 bool UDPSynchronizedClient::clientThreadRunning(){
00113 return threadRunning;
00114 }
00115
00116
00117
00118 bool UDPSynchronizedClient::closeDevice(){
00119
00120 bool closeOk = true;
00121
00122
00123 threadRunning = false;
00124
00125
00126 void *thread_result;
00127 int joinResult = pthread_join(clientThread, &thread_result);
00128 if(joinResult != 0){
00129 SpikeStreamSimulation::systemError("UDPSynchronizedClient: Thread join failed");
00130
00131 closeOk = false;
00132 }
00133 else{
00134 SpikeStreamSimulation::systemInfo("UDPSynchronizedClient: Thread successfully rejoined");
00135 }
00136
00137
00138 close(socketHandle);
00139 socketOpen = false;
00140
00141
00142 return closeOk;
00143 }
00144
00145
00146
00147 bool UDPSynchronizedClient::deviceOpen(){
00148 if(socketOpen || threadRunning)
00149 return true;
00150 return false;
00151 }
00152
00153
00154
00155
00156
00157
00158
00159
00160 unsigned int UDPSynchronizedClient::getExternalComputeTime_us(){
00161
00162 ++timeStepsNoMessagesCount;
00163 if(timeStepsNoMessagesCount <= 10)
00164 return externalComputeTime_us;
00165
00166
00167 unsigned int newExternalComputeTime_us = timeStepsNoMessagesCount * externalComputeTime_us;
00168 if(newExternalComputeTime_us < MAX_EXTERNAL_COMPUTE_TIME_US)
00169 return newExternalComputeTime_us;
00170 return MAX_EXTERNAL_COMPUTE_TIME_US;
00171 }
00172
00173
00174
00175 bool UDPSynchronizedClient::getExternalSyncDelay(){
00176 return externalSyncDelay;
00177 }
00178
00179
00180
00181
00182 void UDPSynchronizedClient::lockMutex(){
00183 pthread_mutex_lock( &threadMutex );
00184 }
00185
00186
00187
00188
00189 bool UDPSynchronizedClient::openSocket(string groupAddress, int port){
00190 struct ip_mreq multicastRequest;
00191 struct sockaddr_in multicastAddr;
00192
00193
00194 if ((socketHandle = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0){
00195 SpikeStreamSimulation::systemError("UDPSynchronizedClient: ERROR IN socket");
00196 return false;
00197 }
00198
00199
00200 unsigned int multipleListeners = 1;
00201 if (setsockopt(socketHandle, SOL_SOCKET, SO_REUSEADDR, &multipleListeners, sizeof(multipleListeners)) < 0) {
00202 SpikeStreamSimulation::systemError("UDPSynchronizedClient: ERROR Reusing ADDR failed");
00203 return false;
00204 }
00205
00206
00207 struct timeval tmout;
00208 tmout.tv_sec = 0;
00209 tmout.tv_usec = 500000;
00210 if (setsockopt(socketHandle, SOL_SOCKET, SO_RCVTIMEO, &tmout, sizeof(tmout)) < 0) {
00211 SpikeStreamSimulation::systemError("UDPSynchronizedClient: ERROR SETTING SOCKET TIMEOUT ");
00212 return false;
00213 }
00214
00215
00216 memset(&multicastAddr, 0, sizeof(multicastAddr));
00217 multicastAddr.sin_family = AF_INET;
00218 multicastAddr.sin_addr.s_addr = htonl(INADDR_ANY);
00219 multicastAddr.sin_port = htons(port);
00220
00221
00222
00223 if (bind(socketHandle, (struct sockaddr *) &multicastAddr, sizeof(multicastAddr)) < 0){
00224 SpikeStreamSimulation::systemError("UDPSynchronizedClient: ERROR IN bind");
00225 return false;
00226 }
00227
00228
00229 multicastRequest.imr_multiaddr.s_addr = inet_addr(groupAddress.data());
00230
00231
00232 multicastRequest.imr_interface.s_addr = htonl(INADDR_ANY);
00233
00234
00235 if (setsockopt(socketHandle, IPPROTO_IP, IP_ADD_MEMBERSHIP, (void *) &multicastRequest, sizeof(multicastRequest)) < 0){
00236 SpikeStreamSimulation::systemError("UDPSynchronizedClient: ERROR JOINING MULTICAST");
00237 return false;
00238 }
00239
00240
00241 socketOpen = true;
00242 return true;
00243 }
00244
00245
00246
00247 void UDPSynchronizedClient::run(){
00248
00249 if(!socketOpen){
00250 threadRunning = false;
00251 return;
00252 }
00253
00254
00255 unsigned int interMessageTime_us = 0, externalStepCtr = 0, oldExternalStepCtr = 0;
00256 timeval currentTimeStruct, oldTimeStruct;
00257 oldTimeStruct.tv_sec = 0;
00258 oldTimeStruct.tv_usec = 0;
00259
00260 bool firstMessage = true;
00261 unsigned int addressLength = sizeof(socketAddress);
00262
00263 cout<<"UDPSynchronizedClient: Thread entering main loop"<<endl;
00264
00265
00266 while(threadRunning){
00267
00268 int numBytes= recvfrom(socketHandle, messageBuffer, MESSAGE_BUFFER_SIZE, 0, (struct sockaddr *) &(socketAddress), &addressLength);
00269 if(numBytes < 2) {
00270 if(errno == EAGAIN || errno == EWOULDBLOCK){
00271 #ifdef RECEIVE_DEVICE_DATA_DEBUG
00272 SpikeStreamSimulation::systemInfo("UDPSynchronizedClient thread: Socket timed out");
00273 #endif//RECEIVE_DEVICE_DATA_DEBUG
00274 }
00275 else{
00276 SpikeStreamSimulation::systemError("UDPSynchronizedClient: ERROR IN recvfrom EXITING RUN METHOD.");
00277 threadRunning = false;
00278 }
00279 }
00280 else{
00281
00282 timeStepsNoMessagesCount = 0;
00283
00284 #ifdef RECEIVE_DEVICE_DATA_DEBUG
00285 SpikeStreamSimulation::systemInfo("UDPSynchronizedClient: Message received");
00286 #endif//RECEIVE_DEVICE_DATA_DEBUG
00287
00288
00289 pthread_mutex_lock( &threadMutex );
00290
00291
00292 gettimeofday(¤tTimeStruct, NULL);
00293 if(firstMessage)
00294 oldTimeStruct = currentTimeStruct;
00295
00296
00297 if( (numBytes - 2) % 4 != 0){
00298 SpikeStreamSimulation::systemError("UDPSynchronizedClient: INCORRECT NUMBER OF BYTES IN SPIKE PACKET ", numBytes);
00299 threadRunning = false;
00300 return;
00301 }
00302
00303
00304
00305
00306 if(spikeCount + (( numBytes - 2 ) / 4 ) > ( SPIKE_BUFFER_SIZE / 12 )){
00307 cout<<"UDPSynchronizedClient: Shared memory for spike buffer is full. Resetting spike count. SpikeCount = "<<spikeCount<<"; numBytes/6 = "<<(numBytes/6)<<"; Max spikes = "<<((SPIKE_BUFFER_SIZE - 4) / 8)<<endl;
00308 spikeCount = 0;
00309 }
00310
00311 #ifdef RECEIVE_DEVICE_DATA_DEBUG
00312 cout<<"UDPSynchronizedClient: Message contains "<<numBytes<<" bytes"<<endl;
00313 #endif//RECEIVE_DEVICE_DATA_DEBUG
00314
00315
00316
00317 if(messageBuffer[0] & 1)
00318 externalSyncDelay = true;
00319 else
00320 externalSyncDelay = false;
00321
00322
00323 externalStepCtr = (unsigned char)messageBuffer[1];
00324 externalStepCtr <<= 8;
00325 externalStepCtr += (unsigned char)messageBuffer[0];
00326 externalStepCtr >>= 1;
00327
00328
00329 if(firstMessage){
00330 oldExternalStepCtr = externalStepCtr;
00331 firstMessage = false;
00332 }
00333
00334
00335 int stepCtrChange = 0;
00336 if(externalStepCtr > oldExternalStepCtr)
00337 stepCtrChange = externalStepCtr - oldExternalStepCtr;
00338 else if(oldExternalStepCtr - externalStepCtr > 30000)
00339 stepCtrChange = STEP_COUNTER_MAX - oldExternalStepCtr + externalStepCtr;
00340 oldExternalStepCtr = externalStepCtr;
00341
00342 if(stepCtrChange > 0){
00343
00344 interMessageTime_us = 1000000 * (currentTimeStruct.tv_sec - oldTimeStruct.tv_sec) + ( currentTimeStruct.tv_usec - oldTimeStruct.tv_usec);
00345 oldTimeStruct = currentTimeStruct;
00346
00347
00348 externalComputeTime_us = (unsigned int)rint((double)interMessageTime_us / (double)stepCtrChange);
00349
00350
00351 for(int i=2; i < numBytes; i += 4){
00352
00353
00354 if(messageBuffer[i] > neuronGrpWidth){
00355 SpikeStreamSimulation::systemError("UDPSynchronizedClient: X POSITION OUT OF RANGE", messageBuffer[i]);
00356 threadRunning = false;
00357 return;
00358 }
00359 if(messageBuffer[i+1] > neuronGrpLength){
00360 SpikeStreamSimulation::systemError("UDPSynchronizedClient: Y POSITION OUT OF RANGE", messageBuffer[i+1]);
00361 threadRunning = false;
00362 return;
00363 }
00364
00365
00366 spikeBuffer[spikeCount * 3] = messageBuffer[i];
00367 spikeBuffer[spikeCount * 3 + 1] = messageBuffer[i+1];
00368
00369
00370
00371
00372 unsigned int time = messageBuffer[i+3];
00373 time <<= 8;
00374 time += messageBuffer[i+2];
00375
00376
00377 unsigned int delay = time / 10;
00378 if(delay > NUMBER_OF_DELAY_VALUES)
00379 spikeBuffer[spikeCount * 3 + 2] = 0;
00380 else
00381 spikeBuffer[spikeCount * 3 + 2] = delay;
00382
00383
00384
00385
00386 ++spikeCount;
00387 }
00388 }
00389
00390
00391 pthread_mutex_unlock( &threadMutex );
00392 }
00393 }
00394 SpikeStreamSimulation::systemInfo("UDPSynchronizedClient: Exiting spike client thread");
00395 }
00396
00397
00398
00399 void UDPSynchronizedClient::start(){
00400
00401 threadRunning = true;
00402
00403
00404 int res = pthread_create(&clientThread, NULL, startThreadFunction, (void*)this);
00405 if(res != 0){
00406 SpikeStreamSimulation::systemError("UDPSynchronizedClient: ERROR CREATING CLIENT THREAD");
00407 }
00408 }
00409
00410
00411
00412 void UDPSynchronizedClient::stop(){
00413 threadRunning = false;
00414 }
00415
00416
00417
00418 void UDPSynchronizedClient::unlockMutex(){
00419 pthread_mutex_unlock( &threadMutex );
00420 }
00421
00422